Ver código fonte

use only one source thread to manage all sink nodes

Corentin Salingue 8 anos atrás
pai
commit
a8ca292382

+ 3 - 0
configure.ac

@@ -1916,6 +1916,9 @@ if test x$build_mpi_master_slave = xyes; then
     CXXLD=mpicxx_path    
 fi
 
+AC_ARG_WITH(mpi-master-slave-multiple-thread, [AS_HELP_STRING([--with-mpiexec-args])],
+	[AC_DEFINE([STARPU_MPI_MASTER_SLAVE_MULTIPLE_THREAD], [1], [Use multiple threads to communicate with slaves])])
+
 AC_MSG_CHECKING(whether the master-slave mode should be enabled)
 AC_MSG_RESULT($build_mpi_master_slave)
 AM_CONDITIONAL([STARPU_USE_MPI_MASTER_SLAVE], [test x$build_mpi_master_slave = xyes])

+ 1 - 1
src/core/topology.c

@@ -78,7 +78,7 @@ static struct _starpu_worker_set cuda_worker_set[STARPU_MAXCUDADEVS];
 static struct _starpu_worker_set mic_worker_set[STARPU_MAXMICDEVS];
 #endif
 #ifdef STARPU_USE_MPI_MASTER_SLAVE
-static struct _starpu_worker_set mpi_worker_set[STARPU_MAXMPIDEVS];
+struct _starpu_worker_set mpi_worker_set[STARPU_MAXMPIDEVS];
 #endif
 
 void *

+ 45 - 0
src/core/workers.c

@@ -811,6 +811,12 @@ static void _starpu_launch_drivers(struct _starpu_machine_config *pconfig)
 
 				worker_set->set_is_initialized = 0;
 
+#ifdef STARPU_MPI_MASTER_SLAVE_MULTIPLE_THREAD
+                /* if MPI has multiple threads supports
+                 * we launch 1 thread per device 
+                 * else 
+                 * we launch one thread for all devices
+                 */
 				STARPU_PTHREAD_CREATE_ON(
 						workerarg->name,
 						&worker_set->worker_thread,
@@ -833,6 +839,7 @@ static void _starpu_launch_drivers(struct _starpu_machine_config *pconfig)
 				STARPU_PTHREAD_MUTEX_UNLOCK(&worker_set->mutex);
 
 				worker_set->started = 1;
+#endif /* STARPU_MPI_MASTER_SLAVE_MULTIPLE_THREAD */
 
 				break;
 #endif /* STARPU_USE_MPI_MASTER_SLAVE */
@@ -842,6 +849,44 @@ static void _starpu_launch_drivers(struct _starpu_machine_config *pconfig)
 		}
 	}
 
+#if defined(STARPU_USE_MPI_MASTER_SLAVE) && !defined(STARPU_MPI_MASTER_SLAVE_MULTIPLE_THREAD)
+    if (pconfig->topology.nmpidevices > 0)
+    {
+        struct _starpu_worker_set * worker_set_zero = &mpi_worker_set[0];
+        struct _starpu_worker * worker_zero = &worker_set_zero->workers[0];
+        STARPU_PTHREAD_CREATE_ON(
+                worker_zero->name,
+                &worker_set_zero->worker_thread,
+                NULL,
+                _starpu_mpi_src_worker,
+                &mpi_worker_set,
+                _starpu_simgrid_get_host_by_worker(worker_zero));
+
+        /* We use the first worker to know if everything are finished */
+#ifdef STARPU_USE_FXT
+        STARPU_PTHREAD_MUTEX_LOCK(&worker_zero->mutex);
+        while (!worker_zero->worker_is_running)
+            STARPU_PTHREAD_COND_WAIT(&worker_zero->started_cond, &worker_zero->mutex);
+        STARPU_PTHREAD_MUTEX_UNLOCK(&worker_zero->mutex);
+#endif
+
+        STARPU_PTHREAD_MUTEX_LOCK(&worker_set_zero->mutex);
+        while (!worker_set_zero->set_is_initialized)
+            STARPU_PTHREAD_COND_WAIT(&worker_set_zero->ready_cond,
+                    &worker_set_zero->mutex);
+        STARPU_PTHREAD_MUTEX_UNLOCK(&worker_set_zero->mutex);
+
+        int mpidevice;
+        for (mpidevice = 0; mpidevice < pconfig->topology.nmpidevices; mpidevice++)
+        {
+            mpi_worker_set[mpidevice].started = 1;
+            mpi_worker_set[mpidevice].worker_thread = mpi_worker_set[0].worker_thread;
+        }
+
+    }
+
+#endif
+
 	for (worker = 0; worker < nworkers; worker++)
 	{
 		struct _starpu_worker *workerarg = &pconfig->workers[worker];

+ 4 - 0
src/core/workers.h

@@ -178,6 +178,10 @@ struct _starpu_worker_set
 	unsigned set_is_initialized;
 };
 
+#ifdef STARPU_USE_MPI_MASTER_SLAVE
+extern struct _starpu_worker_set mpi_worker_set[STARPU_MAXMPIDEVS];
+#endif
+
 struct _starpu_machine_topology
 {
 	/* Total number of workers. */

+ 149 - 57
src/drivers/mp_common/source_common.c

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2012  INRIA
+ * Copyright (C) 2012, 2016  INRIA
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -24,9 +24,21 @@
 
 
 #include <datawizard/coherency.h>
+#include <datawizard/memory_nodes.h>
 #include <datawizard/interfaces/data_interface.h>
 #include <drivers/mp_common/mp_common.h>
 
+#if defined(STARPU_USE_MPI_MASTER_SLAVE) && !defined(STARPU_MPI_MASTER_SLAVE_MULTIPLE_THREAD)
+struct starpu_save_thread_env
+{
+    struct starpu_task * current_task;
+    struct _starpu_worker * current_worker;
+    struct _starpu_worker_set * current_worker_set;
+    unsigned * current_mem_node;
+};
+
+struct starpu_save_thread_env save_thread_env[STARPU_MAXMPIDEVS];
+#endif
 
 /* Finalize the execution of a task by a worker*/
 static int _starpu_src_common_finalize_job (struct _starpu_job *j, struct _starpu_worker *worker)
@@ -646,6 +658,23 @@ int _starpu_src_common_locate_file(char *located_file_name,
 	return 1;
 }
 
+
+#if defined(STARPU_USE_MPI_MASTER_SLAVE) && !defined(STARPU_MPI_MASTER_SLAVE_MULTIPLE_THREAD)
+static void _starpu_src_common_switch_env(unsigned old, unsigned new)
+{
+    save_thread_env[old].current_task = starpu_task_get_current();
+    save_thread_env[old].current_worker = STARPU_PTHREAD_GETSPECIFIC(_starpu_worker_key);
+    save_thread_env[old].current_worker_set = STARPU_PTHREAD_GETSPECIFIC(_starpu_worker_set_key);
+    save_thread_env[old].current_mem_node = STARPU_PTHREAD_GETSPECIFIC(_starpu_memory_node_key);
+
+    _starpu_set_current_task(save_thread_env[new].current_task);
+    STARPU_PTHREAD_SETSPECIFIC(_starpu_worker_key, save_thread_env[new].current_worker);
+    STARPU_PTHREAD_SETSPECIFIC(_starpu_worker_set_key, save_thread_env[new].current_worker_set);
+    STARPU_PTHREAD_SETSPECIFIC(_starpu_memory_node_key, save_thread_env[new].current_mem_node);
+}
+#endif
+
+
 /* Send workers to the sink node
  */
 static void _starpu_src_common_send_workers(struct _starpu_mp_node * node, int baseworkerid, int nworkers)
@@ -671,76 +700,139 @@ static void _starpu_src_common_send_workers(struct _starpu_mp_node * node, int b
 	node->dt_send(node, &config->combined_workers,combined_worker_size);
 }
 
-/* Function looping on the source node */
-void _starpu_src_common_worker(struct _starpu_worker_set * worker_set,
-		unsigned baseworkerid,
-		struct _starpu_mp_node * mp_node)
+
+static void _starpu_src_common_worker_internal_work(struct _starpu_worker_set * worker_set, struct _starpu_mp_node * mp_node, struct starpu_task **tasks, unsigned memnode)
 {
-	unsigned memnode = worker_set->workers[0].memory_node;
-	struct starpu_task **tasks = malloc(sizeof(struct starpu_task *)*worker_set->nworkers);
+    int res = 0;
+    struct _starpu_job * j;
 
-	_starpu_src_common_send_workers(mp_node, baseworkerid, worker_set->nworkers);
+    _starpu_may_pause();
 
-	/*main loop*/
-	while (_starpu_machine_is_running())
-	{
-		int res = 0;
-		struct _starpu_job * j;
+#ifdef STARPU_SIMGRID
+    starpu_pthread_wait_reset(&worker_set->workers[0].wait);
+#endif
+
+    _STARPU_TRACE_START_PROGRESS(memnode);
+    res |= __starpu_datawizard_progress(memnode, 1, 1);
+    res |= __starpu_datawizard_progress(STARPU_MAIN_RAM, 1, 1);
+    _STARPU_TRACE_END_PROGRESS(memnode);
 
-		_starpu_may_pause();
+    /* Handle message which have been store */
+    _starpu_src_common_handle_stored_async(mp_node);
+
+    /* poll the device for completed jobs.*/
+    while(mp_node->mp_recv_is_ready(mp_node))
+        _starpu_src_common_recv_async(mp_node);
+
+    /* get task for each worker*/
+    res |= _starpu_get_multi_worker_task(worker_set->workers, tasks, worker_set->nworkers, memnode);
 
 #ifdef STARPU_SIMGRID
-		starpu_pthread_wait_reset(&worker_set->workers[0].wait);
+    if (!res)
+        starpu_pthread_wait_wait(&worker_set->workers[0].wait);
 #endif
 
-		_STARPU_TRACE_START_PROGRESS(memnode);
-		res |= __starpu_datawizard_progress(memnode, 1, 1);
-		res |= __starpu_datawizard_progress(STARPU_MAIN_RAM, 1, 1);
-		_STARPU_TRACE_END_PROGRESS(memnode);
+    /*if at least one worker have pop a task*/
+    if(res != 0)
+    {
+        unsigned i;
+        for(i=0; i<worker_set->nworkers; i++)
+        {
+            if(tasks[i] != NULL)
+            {
+                j = _starpu_get_job_associated_to_task(tasks[i]);
+                _starpu_set_local_worker_key(&worker_set->workers[i]);
+                res =  _starpu_src_common_execute(j, &worker_set->workers[i], mp_node);
+                switch (res)
+                {
+                    case 0:
+                        /* The task task has been launched with no error */
+                        break;
+                    case -EAGAIN:
+                        _STARPU_DISP("ouch, this MP worker could not actually run task %p, putting it back...\n", tasks[i]);
+                        _starpu_push_task_to_workers(tasks[i]);
+                        STARPU_ABORT();
+                        continue;
+                        break;
+                    default:
+                        STARPU_ASSERT(0);
+                }
+            }
+        }
+    }
 
-		/* Handle message which have been store */
-		_starpu_src_common_handle_stored_async(mp_node);
+}
 
-		/* poll the device for completed jobs.*/
-		while(mp_node->mp_recv_is_ready(mp_node))
-			_starpu_src_common_recv_async(mp_node);
 
-		/* get task for each worker*/
-		res |= _starpu_get_multi_worker_task(worker_set->workers, tasks, worker_set->nworkers, memnode);
+#if defined(STARPU_USE_MPI_MASTER_SLAVE) && !defined(STARPU_MPI_MASTER_SLAVE_MULTIPLE_THREAD)
+/* Function looping on the source node */
+void _starpu_src_common_workers_set(struct _starpu_worker_set * worker_set,
+        int ndevices,
+		struct _starpu_mp_node ** mp_node)
+{
+    unsigned memnode[ndevices];
+    unsigned offsetmemnode[ndevices];
+    memset(offsetmemnode, 0, ndevices*sizeof(unsigned));
+
+    unsigned device;
+    int nbworkers = 0;
+    for (device = 0; device < ndevices; device++)
+    {
+        memnode[device] = worker_set[device].workers[0].memory_node;
+        nbworkers += worker_set[device].nworkers;
+        if (device != 0)
+            offsetmemnode[device] += offsetmemnode[device-1];
+        if (device != ndevices -1)
+            offsetmemnode[device+1] += worker_set[device].nworkers;
+    }
+
+	struct starpu_task **tasks = malloc(sizeof(struct starpu_task *)*nbworkers);
+
+    for (device = 0; device < ndevices; device++)
+    {
+        struct _starpu_worker *baseworker = &worker_set[device].workers[0];
+        struct _starpu_machine_config *config = baseworker->config;
+        unsigned baseworkerid = baseworker - config->workers;
+        _starpu_src_common_send_workers(mp_node[device], baseworkerid, worker_set[device].nworkers);
+    }
 
-#ifdef STARPU_SIMGRID
-		if (!res)
-			starpu_pthread_wait_wait(&worker_set->workers[0].wait);
+	/*main loop*/
+	while (_starpu_machine_is_running())
+	{
+        for (device = 0; device < ndevices ; device++)
+        {
+            _starpu_src_common_worker_internal_work(&worker_set[device], mp_node[device], tasks+offsetmemnode[device], memnode[device]);
+            _starpu_src_common_switch_env(device, (device+1)%ndevices);
+        }
+    }
+	free(tasks);
+
+    for (device = 0; device < ndevices; device++)
+        _starpu_handle_all_pending_node_data_requests(memnode[device]);
+
+	/* In case there remains some memory that was automatically
+	 * allocated by StarPU, we release it now. Note that data
+	 * coherency is not maintained anymore at that point ! */
+    for (device = 0; device < ndevices; device++)
+        _starpu_free_all_automatically_allocated_buffers(memnode[device]);
+
+}
 #endif
 
-		/*if at least one worker have pop a task*/
-		if(res != 0)
-		{
-			unsigned i;
-			for(i=0; i<worker_set->nworkers; i++)
-			{
-				if(tasks[i] != NULL)
-				{
-					j = _starpu_get_job_associated_to_task(tasks[i]);
-					_starpu_set_local_worker_key(&worker_set->workers[i]);
-					res =  _starpu_src_common_execute(j, &worker_set->workers[i], mp_node);
-					switch (res)
-					{
-						case 0:
-							/* The task task has been launched with no error */
-							break;
-						case -EAGAIN:
-							_STARPU_DISP("ouch, this MP worker could not actually run task %p, putting it back...\n", tasks[i]);
-							_starpu_push_task_to_workers(tasks[i]);
-							STARPU_ABORT();
-							continue;
-							break;
-						default:
-							STARPU_ASSERT(0);
-					}
-				}
-			}
-		}
+/* Function looping on the source node */
+void _starpu_src_common_worker(struct _starpu_worker_set * worker_set,
+		unsigned baseworkerid,
+		struct _starpu_mp_node * mp_node)
+{
+	unsigned memnode = worker_set->workers[0].memory_node;
+	struct starpu_task **tasks = malloc(sizeof(struct starpu_task *)*worker_set->nworkers);
+
+	_starpu_src_common_send_workers(mp_node, baseworkerid, worker_set->nworkers);
+
+	/*main loop*/
+	while (_starpu_machine_is_running())
+	{
+        _starpu_src_common_worker_internal_work(worker_set, mp_node, tasks, memnode);
 	}
 	free(tasks);
 

+ 6 - 1
src/drivers/mp_common/source_common.h

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2012  INRIA
+ * Copyright (C) 2012, 2016  INRIA
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -75,6 +75,11 @@ void _starpu_src_common_worker(struct _starpu_worker_set * worker_set,
 			       unsigned baseworkerid, 
 			       struct _starpu_mp_node * node_set);
 
+#if defined(STARPU_USE_MPI_MASTER_SLAVE) && !defined(STARPU_MPI_MASTER_SLAVE_MULTIPLE_THREAD)
+void _starpu_src_common_workers_set(struct _starpu_worker_set * worker_set,
+                 int ndevices,
+                 struct _starpu_mp_node ** mp_node);
+#endif
 
 #endif /* STARPU_USE_MP */
 

+ 76 - 37
src/drivers/mpi/driver_mpi_source.c

@@ -193,52 +193,91 @@ unsigned _starpu_mpi_src_get_device_count()
 
 void *_starpu_mpi_src_worker(void *arg)
 {
-    struct _starpu_worker_set *worker_set = arg;
-    /* As all workers of a set share common data, we just use the first
-     *       * one for intializing the following stuffs. */
-    struct _starpu_worker *baseworker = &worker_set->workers[0];
-    struct _starpu_machine_config *config = baseworker->config;
-    unsigned baseworkerid = baseworker - config->workers;
-    unsigned devid = baseworker->devid;
-    unsigned i;
-
-    /* unsigned memnode = baseworker->memory_node; */
-
-    _starpu_driver_start(baseworker, _STARPU_FUT_MPI_KEY, 0);
+#ifndef STARPU_MPI_MASTER_SLAVE_MULTIPLE_THREAD
+    struct _starpu_worker_set *worker_set_mpi = (struct _starpu_worker_set *) arg;
+    int nbsinknodes = _starpu_mpi_src_get_device_count();
+
+    int workersetnum;
+    for (workersetnum = 0; workersetnum < nbsinknodes; workersetnum++)
+    {
+        struct _starpu_worker_set * worker_set = &worker_set_mpi[workersetnum];
+#else
+        struct _starpu_worker_set *worker_set = arg;
+#endif
+
+        /* As all workers of a set share common data, we just use the first
+         *       * one for intializing the following stuffs. */
+        struct _starpu_worker *baseworker = &worker_set->workers[0];
+        struct _starpu_machine_config *config = baseworker->config;
+        unsigned baseworkerid = baseworker - config->workers;
+        unsigned devid = baseworker->devid;
+        unsigned i;
+
+        /* unsigned memnode = baseworker->memory_node; */
+
+        _starpu_driver_start(baseworker, _STARPU_FUT_MPI_KEY, 0);
 #ifdef STARPU_USE_FXT             
-    for (i = 1; i < worker_set->nworkers; i++)
-        _starpu_worker_start(&worker_set->workers[i], _STARPU_FUT_MPI_KEY, 0);
+        for (i = 1; i < worker_set->nworkers; i++)
+            _starpu_worker_start(&worker_set->workers[i], _STARPU_FUT_MPI_KEY, 0);
 #endif          
 
-    // Current task for a thread managing a worker set has no sense.
-    _starpu_set_current_task(NULL);
-
-    for (i = 0; i < config->topology.nmpicores[devid]; i++)
-    {
-        struct _starpu_worker *worker = &config->workers[baseworkerid+i];
-        snprintf(worker->name, sizeof(worker->name), "MPI_MS %d core %u", devid, i);
-        snprintf(worker->short_name, sizeof(worker->short_name), "MPI_MS %d.%u", devid, i);
-    }
-    {
-        char thread_name[16];
-        snprintf(thread_name, sizeof(thread_name), "MPI_MS %d", devid);
-        starpu_pthread_setname(thread_name);
-    }
+        // Current task for a thread managing a worker set has no sense.
+        _starpu_set_current_task(NULL);
+
+        for (i = 0; i < config->topology.nmpicores[devid]; i++)
+        {
+            struct _starpu_worker *worker = &config->workers[baseworkerid+i];
+            snprintf(worker->name, sizeof(worker->name), "MPI_MS %d core %u", devid, i);
+            snprintf(worker->short_name, sizeof(worker->short_name), "MPI_MS %d.%u", devid, i);
+        }
+
+#ifndef STARPU_MPI_MASTER_SLAVE_MULTIPLE_THREAD
+        {
+            char thread_name[16];
+            snprintf(thread_name, sizeof(thread_name), "MPI_MS");
+            starpu_pthread_setname(thread_name);
+        }
+#else
+        {
+            char thread_name[16];
+            snprintf(thread_name, sizeof(thread_name), "MPI_MS %d", devid);
+            starpu_pthread_setname(thread_name);
+        }
+#endif
+
+        for (i = 0; i < worker_set->nworkers; i++)
+        {
+            struct _starpu_worker *worker = &worker_set->workers[i];
+            _STARPU_TRACE_WORKER_INIT_END(worker->workerid);
+        }
+    
+#ifndef STARPU_MPI_MASTER_SLAVE_MULTIPLE_THREAD
+    }  /* for */
 
-    for (i = 0; i < worker_set->nworkers; i++)
+    /* set the worker zero for the main thread */
+    for (workersetnum = 0; workersetnum < nbsinknodes; workersetnum++)
     {
-        struct _starpu_worker *worker = &worker_set->workers[i];
-        _STARPU_TRACE_WORKER_INIT_END(worker->workerid);
+        struct _starpu_worker_set * worker_set = &worker_set_mpi[workersetnum];
+        struct _starpu_worker *baseworker = &worker_set->workers[0];
+#endif
+
+        /* tell the main thread that this one is ready */
+        STARPU_PTHREAD_MUTEX_LOCK(&worker_set->mutex);
+        baseworker->status = STATUS_UNKNOWN;
+        worker_set->set_is_initialized = 1;
+        STARPU_PTHREAD_COND_SIGNAL(&worker_set->ready_cond);
+        STARPU_PTHREAD_MUTEX_UNLOCK(&worker_set->mutex);
+
+#ifndef STARPU_MPI_MASTER_SLAVE_MULTIPLE_THREAD
     }
+#endif
 
-    /* tell the main thread that this one is ready */
-    STARPU_PTHREAD_MUTEX_LOCK(&worker_set->mutex);
-    baseworker->status = STATUS_UNKNOWN;
-    worker_set->set_is_initialized = 1;
-    STARPU_PTHREAD_COND_SIGNAL(&worker_set->ready_cond);
-    STARPU_PTHREAD_MUTEX_UNLOCK(&worker_set->mutex);
 
+#ifndef STARPU_MPI_MASTER_SLAVE_MULTIPLE_THREAD
+    _starpu_src_common_workers_set(worker_set_mpi, nbsinknodes, mpi_ms_nodes);
+#else
     _starpu_src_common_worker(worker_set, baseworkerid, mpi_ms_nodes[devid]);
+#endif
 
     return NULL;