Nathalie Furmento лет назад: 6
Родитель
Сommit
4f7081ca91

+ 1 - 3
src/core/workers.c

@@ -921,14 +921,12 @@ static void _starpu_launch_drivers(struct _starpu_machine_config *pconfig)
                 STARPU_PTHREAD_MUTEX_LOCK(&worker_set_zero->mutex);
                 while (!worker_set_zero->set_is_initialized)
                         STARPU_PTHREAD_COND_WAIT(&worker_set_zero->ready_cond,
-                                        &worker_set_zero->mutex);
+						 &worker_set_zero->mutex);
                 STARPU_PTHREAD_MUTEX_UNLOCK(&worker_set_zero->mutex);
 
                 worker_set_zero->started = 1;
                 worker_set_zero->worker_thread = mpi_worker_set[0].worker_thread;
-
         }
-
 #endif
 
 	for (worker = 0; worker < nworkers; worker++)

+ 163 - 170
src/drivers/mp_common/mp_common.c

@@ -1,7 +1,7 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  * Copyright (C) 2012,2016,2017                           Inria
- * Copyright (C) 2013-2017                                CNRS
+ * Copyright (C) 2013-2017, 2019                                CNRS
  * Copyright (C) 2013,2015                                Université de Bordeaux
  * Copyright (C) 2013                                     Thibaut Lambert
  *
@@ -150,180 +150,177 @@ _starpu_mp_common_node_create(enum _starpu_mp_node_kind node_kind,
 	switch(node->kind)
 	{
 #ifdef STARPU_USE_MIC
-	case STARPU_NODE_MIC_SOURCE:
-	{
-		node->nb_mp_sinks = starpu_mic_worker_get_count();
-		node->devid = peer_id;
-
-		node->init = _starpu_mic_src_init;
-		node->launch_workers= NULL;
-		node->deinit = _starpu_mic_src_deinit;
-		node->report_error = _starpu_mic_src_report_scif_error;
-
-		node->mp_recv_is_ready = _starpu_mic_common_recv_is_ready;
-		node->mp_send = _starpu_mic_common_send;
-		node->mp_recv = _starpu_mic_common_recv;
-		node->dt_send = _starpu_mic_common_dt_send;
-		node->dt_recv = _starpu_mic_common_dt_recv;
-
-		node->get_kernel_from_job =_starpu_mic_src_get_kernel_from_job;
-		node->lookup = NULL;
-		node->bind_thread = NULL;
-		node->execute = NULL;
-		node->allocate = NULL;
-		node->free = NULL;
-	}
-	break;
-
-	case STARPU_NODE_MIC_SINK:
-	{
-		node->devid = atoi(starpu_getenv("_STARPU_MIC_DEVID"));
-		node->nb_mp_sinks = atoi(starpu_getenv("_STARPU_MIC_NB"));
-
-		node->init = _starpu_mic_sink_init;
-		node->launch_workers = _starpu_mic_sink_launch_workers;
-		node->deinit = _starpu_mic_sink_deinit;
-		node->report_error = _starpu_mic_sink_report_error;
-
-		node->mp_recv_is_ready = _starpu_mic_common_recv_is_ready;
-		node->mp_send = _starpu_mic_common_send;
-		node->mp_recv = _starpu_mic_common_recv;
-		node->dt_send = _starpu_mic_common_dt_send;
-		node->dt_recv = _starpu_mic_common_dt_recv;
-
-                node->dt_test = NULL; /* Not used now */
-
-		node->get_kernel_from_job = NULL;
-		node->lookup = _starpu_mic_sink_lookup;
-		node->bind_thread = _starpu_mic_sink_bind_thread;
-		node->execute = _starpu_sink_common_execute;
-		node->allocate = _starpu_mic_sink_allocate;
-		node->free = _starpu_mic_sink_free;
+		case STARPU_NODE_MIC_SOURCE:
+		{
+			node->nb_mp_sinks = starpu_mic_worker_get_count();
+			node->devid = peer_id;
+
+			node->init = _starpu_mic_src_init;
+			node->launch_workers= NULL;
+			node->deinit = _starpu_mic_src_deinit;
+			node->report_error = _starpu_mic_src_report_scif_error;
+
+			node->mp_recv_is_ready = _starpu_mic_common_recv_is_ready;
+			node->mp_send = _starpu_mic_common_send;
+			node->mp_recv = _starpu_mic_common_recv;
+			node->dt_send = _starpu_mic_common_dt_send;
+			node->dt_recv = _starpu_mic_common_dt_recv;
+
+			node->get_kernel_from_job =_starpu_mic_src_get_kernel_from_job;
+			node->lookup = NULL;
+			node->bind_thread = NULL;
+			node->execute = NULL;
+			node->allocate = NULL;
+			node->free = NULL;
+		}
+		break;
 
-	}
-	break;
+		case STARPU_NODE_MIC_SINK:
+		{
+			node->devid = atoi(starpu_getenv("_STARPU_MIC_DEVID"));
+			node->nb_mp_sinks = atoi(starpu_getenv("_STARPU_MIC_NB"));
+
+			node->init = _starpu_mic_sink_init;
+			node->launch_workers = _starpu_mic_sink_launch_workers;
+			node->deinit = _starpu_mic_sink_deinit;
+			node->report_error = _starpu_mic_sink_report_error;
+
+			node->mp_recv_is_ready = _starpu_mic_common_recv_is_ready;
+			node->mp_send = _starpu_mic_common_send;
+			node->mp_recv = _starpu_mic_common_recv;
+			node->dt_send = _starpu_mic_common_dt_send;
+			node->dt_recv = _starpu_mic_common_dt_recv;
+
+			node->dt_test = NULL; /* Not used now */
+
+			node->get_kernel_from_job = NULL;
+			node->lookup = _starpu_mic_sink_lookup;
+			node->bind_thread = _starpu_mic_sink_bind_thread;
+			node->execute = _starpu_sink_common_execute;
+			node->allocate = _starpu_mic_sink_allocate;
+			node->free = _starpu_mic_sink_free;
+		}
+		break;
 #endif /* STARPU_USE_MIC */
 
 #ifdef STARPU_USE_SCC
-	case STARPU_NODE_SCC_SOURCE:
-	{
-		node->init = _starpu_scc_src_init;
-		node->deinit = NULL;
-		node->report_error = _starpu_scc_common_report_rcce_error;
-
-		node->mp_recv_is_ready = _starpu_scc_common_recv_is_ready;
-		node->mp_send = _starpu_scc_common_send;
-		node->mp_recv = _starpu_scc_common_recv;
-		node->dt_send = _starpu_scc_common_send;
-		node->dt_recv = _starpu_scc_common_recv;
-		node->dt_send_to_device = NULL;
-		node->dt_recv_from_device = NULL;
-
-		node->get_kernel_from_job =_starpu_scc_src_get_kernel_from_job;
-		node->lookup = NULL;
-		node->bind_thread = NULL;
-		node->execute = NULL;
-		node->allocate = NULL;
-		node->free = NULL;
-	}
-	break;
+		case STARPU_NODE_SCC_SOURCE:
+		{
+			node->init = _starpu_scc_src_init;
+			node->deinit = NULL;
+			node->report_error = _starpu_scc_common_report_rcce_error;
+
+			node->mp_recv_is_ready = _starpu_scc_common_recv_is_ready;
+			node->mp_send = _starpu_scc_common_send;
+			node->mp_recv = _starpu_scc_common_recv;
+			node->dt_send = _starpu_scc_common_send;
+			node->dt_recv = _starpu_scc_common_recv;
+			node->dt_send_to_device = NULL;
+			node->dt_recv_from_device = NULL;
+
+			node->get_kernel_from_job =_starpu_scc_src_get_kernel_from_job;
+			node->lookup = NULL;
+			node->bind_thread = NULL;
+			node->execute = NULL;
+			node->allocate = NULL;
+			node->free = NULL;
+		}
+		break;
 
-	case STARPU_NODE_SCC_SINK:
-	{
-		node->init = _starpu_scc_sink_init;
-		node->launch_workers = _starpu_scc_sink_launch_workers;
-		node->deinit = _starpu_scc_sink_deinit;
-		node->report_error = _starpu_scc_common_report_rcce_error;
-
-		node->mp_recv_is_ready = _starpu_scc_common_recv_is_ready;
-		node->mp_send = _starpu_scc_common_send;
-		node->mp_recv = _starpu_scc_common_recv;
-		node->dt_send = _starpu_scc_common_send;
-		node->dt_recv = _starpu_scc_common_recv;
-		node->dt_send_to_device = _starpu_scc_sink_send_to_device;
-		node->dt_recv_from_device = _starpu_scc_sink_recv_from_device;
-
-                node->dt_test = NULL /* not used now */
-
-		node->get_kernel_from_job = NULL;
-		node->lookup = _starpu_scc_sink_lookup;
-		node->bind_thread = _starpu_scc_sink_bind_thread;
-		node->execute = _starpu_scc_sink_execute;
-		node->allocate = _starpu_sink_common_allocate;
-		node->free = _starpu_sink_common_free;
-	}
-	break;
+		case STARPU_NODE_SCC_SINK:
+		{
+			node->init = _starpu_scc_sink_init;
+			node->launch_workers = _starpu_scc_sink_launch_workers;
+			node->deinit = _starpu_scc_sink_deinit;
+			node->report_error = _starpu_scc_common_report_rcce_error;
+
+			node->mp_recv_is_ready = _starpu_scc_common_recv_is_ready;
+			node->mp_send = _starpu_scc_common_send;
+			node->mp_recv = _starpu_scc_common_recv;
+			node->dt_send = _starpu_scc_common_send;
+			node->dt_recv = _starpu_scc_common_recv;
+			node->dt_send_to_device = _starpu_scc_sink_send_to_device;
+			node->dt_recv_from_device = _starpu_scc_sink_recv_from_device;
+
+			node->dt_test = NULL /* not used now */
+
+				node->get_kernel_from_job = NULL;
+			node->lookup = _starpu_scc_sink_lookup;
+			node->bind_thread = _starpu_scc_sink_bind_thread;
+			node->execute = _starpu_scc_sink_execute;
+			node->allocate = _starpu_sink_common_allocate;
+			node->free = _starpu_sink_common_free;
+		}
+		break;
 #endif /* STARPU_USE_SCC */
 
 #ifdef STARPU_USE_MPI_MASTER_SLAVE
-	case STARPU_NODE_MPI_SOURCE:
-        {
-                /*
-                   node->nb_mp_sinks = 
-                   node->devid = 
-                   */
-                node->peer_id = (_starpu_mpi_common_get_src_node() <= peer_id ? peer_id+1 : peer_id);
-                node->mp_connection.mpi_remote_nodeid = node->peer_id;
-
-                node->init = _starpu_mpi_source_init;
-                node->launch_workers = NULL;
-                node->deinit = _starpu_mpi_source_deinit;
-                /*     node->report_error = */
-
-                node->mp_recv_is_ready = _starpu_mpi_common_recv_is_ready;
-                node->mp_send = _starpu_mpi_common_mp_send;
-                node->mp_recv = _starpu_mpi_common_mp_recv;
-                node->dt_send = _starpu_mpi_common_send;
-                node->dt_recv = _starpu_mpi_common_recv;
-                node->dt_send_to_device = _starpu_mpi_common_send_to_device;
-                node->dt_recv_from_device = _starpu_mpi_common_recv_from_device;
-
-                node->get_kernel_from_job = _starpu_mpi_ms_src_get_kernel_from_job;
-                node->lookup = NULL;
-                node->bind_thread = NULL;
-                node->execute = NULL;
-                node->allocate = NULL;
-                node->free = NULL;
-        }
-        break;
-
-        case STARPU_NODE_MPI_SINK:
-        {
-                /*
-                   node->nb_mp_sinks = 
-                   node->devid = 
-                   */
-                node->mp_connection.mpi_remote_nodeid = _starpu_mpi_common_get_src_node();
-
-                node->init = _starpu_mpi_sink_init;
-                node->launch_workers = _starpu_mpi_sink_launch_workers;
-                node->deinit = _starpu_mpi_sink_deinit;
-                /*    node->report_error =  */
-
-                node->mp_recv_is_ready = _starpu_mpi_common_recv_is_ready;
-                node->mp_send = _starpu_mpi_common_mp_send;
-                node->mp_recv = _starpu_mpi_common_mp_recv;
-                node->dt_send = _starpu_mpi_common_send;
-                node->dt_recv = _starpu_mpi_common_recv;
-                node->dt_send_to_device = _starpu_mpi_common_send_to_device;
-                node->dt_recv_from_device = _starpu_mpi_common_recv_from_device;
-
-                node->dt_test = _starpu_mpi_common_test_event;
-
-                node->get_kernel_from_job = NULL;
-                node->lookup = _starpu_mpi_sink_lookup;
-                node->bind_thread = _starpu_mpi_sink_bind_thread;
-                node->execute = _starpu_sink_common_execute;
-                node->allocate = _starpu_sink_common_allocate;
-                node->free = _starpu_sink_common_free;
-
-
-        }
+		case STARPU_NODE_MPI_SOURCE:
+		{
+			/*
+			  node->nb_mp_sinks =
+			  node->devid =
+			*/
+			node->peer_id = (_starpu_mpi_common_get_src_node() <= peer_id ? peer_id+1 : peer_id);
+			node->mp_connection.mpi_remote_nodeid = node->peer_id;
+
+			node->init = _starpu_mpi_source_init;
+			node->launch_workers = NULL;
+			node->deinit = _starpu_mpi_source_deinit;
+			/*     node->report_error = */
+
+			node->mp_recv_is_ready = _starpu_mpi_common_recv_is_ready;
+			node->mp_send = _starpu_mpi_common_mp_send;
+			node->mp_recv = _starpu_mpi_common_mp_recv;
+			node->dt_send = _starpu_mpi_common_send;
+			node->dt_recv = _starpu_mpi_common_recv;
+			node->dt_send_to_device = _starpu_mpi_common_send_to_device;
+			node->dt_recv_from_device = _starpu_mpi_common_recv_from_device;
+
+			node->get_kernel_from_job = _starpu_mpi_ms_src_get_kernel_from_job;
+			node->lookup = NULL;
+			node->bind_thread = NULL;
+			node->execute = NULL;
+			node->allocate = NULL;
+			node->free = NULL;
+		}
+		break;
+
+	        case STARPU_NODE_MPI_SINK:
+		{
+			/*
+			  node->nb_mp_sinks =
+			  node->devid =
+			*/
+			node->mp_connection.mpi_remote_nodeid = _starpu_mpi_common_get_src_node();
+
+			node->init = _starpu_mpi_sink_init;
+			node->launch_workers = _starpu_mpi_sink_launch_workers;
+			node->deinit = _starpu_mpi_sink_deinit;
+			/*    node->report_error =  */
+
+			node->mp_recv_is_ready = _starpu_mpi_common_recv_is_ready;
+			node->mp_send = _starpu_mpi_common_mp_send;
+			node->mp_recv = _starpu_mpi_common_mp_recv;
+			node->dt_send = _starpu_mpi_common_send;
+			node->dt_recv = _starpu_mpi_common_recv;
+			node->dt_send_to_device = _starpu_mpi_common_send_to_device;
+			node->dt_recv_from_device = _starpu_mpi_common_recv_from_device;
+
+			node->dt_test = _starpu_mpi_common_test_event;
+
+			node->get_kernel_from_job = NULL;
+			node->lookup = _starpu_mpi_sink_lookup;
+			node->bind_thread = _starpu_mpi_sink_bind_thread;
+			node->execute = _starpu_sink_common_execute;
+			node->allocate = _starpu_sink_common_allocate;
+			node->free = _starpu_sink_common_free;
+		}
 		break;
 #endif /* STARPU_USE_MPI_MASTER_SLAVE */
 
-	default:
-		STARPU_ASSERT(0);
+		default:
+			STARPU_ASSERT(0);
 	}
 
 	/* Let's allocate the buffer, we want it to be big enough to contain
@@ -391,14 +388,11 @@ void _starpu_mp_common_node_destroy(struct _starpu_mp_node *node)
 	}
 
 	free(node->buffer);
-
 	free(node);
 }
 
 /* Send COMMAND to RECIPIENT, along with ARG if ARG_SIZE is non-zero */
-void _starpu_mp_common_send_command(const struct _starpu_mp_node *node,
-				    const enum _starpu_mp_command command,
-				    void *arg, int arg_size)
+void _starpu_mp_common_send_command(const struct _starpu_mp_node *node, const enum _starpu_mp_command command, void *arg, int arg_size)
 {
 	STARPU_ASSERT_MSG(arg_size <= BUFFER_SIZE, "Too much data (%d) for the static MIC buffer (%d), increase BUFFER_SIZE perhaps?", arg_size, BUFFER_SIZE);
 
@@ -424,8 +418,7 @@ void _starpu_mp_common_send_command(const struct _starpu_mp_node *node,
  * However, the data pointed by arg shouldn't be relied on after a new call to
  * STARPU_MP_COMMON_RECV_COMMAND as it might corrupt it.
  */
-enum _starpu_mp_command _starpu_mp_common_recv_command(const struct _starpu_mp_node *node,
-						       void **arg, int *arg_size)
+enum _starpu_mp_command _starpu_mp_common_recv_command(const struct _starpu_mp_node *node, void **arg, int *arg_size)
 {
 	enum _starpu_mp_command command;
 

+ 4 - 5
src/drivers/mp_common/mp_common.h

@@ -1,7 +1,7 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  * Copyright (C) 2012,2016,2017                           Inria
- * Copyright (C) 2015-2017                                CNRS
+ * Copyright (C) 2015-2017, 2019                          CNRS
  * Copyright (C) 2013-2016                                Université de Bordeaux
  * Copyright (C) 2013                                     Thibaut Lambert
  *
@@ -138,7 +138,7 @@ LIST_TYPE(mp_message,
 		int size;
 	 );
 
-struct mp_task 
+struct mp_task
 {
 	void (*kernel)(void **, void *);
 	void **interfaces;
@@ -221,7 +221,7 @@ struct _starpu_mp_node
         struct _starpu_mp_event_list event_list;
 
         /* */
-        starpu_pthread_barrier_t init_completed_barrier; 
+        starpu_pthread_barrier_t init_completed_barrier;
 
         /* table to store pointer of the thread workers*/
         void* thread_table;
@@ -275,8 +275,7 @@ void _starpu_mp_common_send_command(const struct _starpu_mp_node *node,
 				    const enum _starpu_mp_command command,
 				    void *arg, int arg_size);
 
-enum _starpu_mp_command _starpu_mp_common_recv_command(const struct _starpu_mp_node *node,
-						       void **arg, int *arg_size);
+enum _starpu_mp_command _starpu_mp_common_recv_command(const struct _starpu_mp_node *node, void **arg, int *arg_size);
 
 
 #endif /* STARPU_USE_MP */

+ 48 - 97
src/drivers/mp_common/sink_common.c

@@ -1,7 +1,7 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  * Copyright (C) 2012,2016,2017                           Inria
- * Copyright (C) 2013-2017                                CNRS
+ * Copyright (C) 2013-2017,2019                           CNRS
  * Copyright (C) 2013-2017                                Université de Bordeaux
  * Copyright (C) 2013                                     Thibaut Lambert
  *
@@ -59,14 +59,12 @@ static enum _starpu_mp_node_kind _starpu_sink_common_get_kind(void)
 static void _starpu_sink_common_get_nb_cores (struct _starpu_mp_node *node)
 {
 	// Process packet received from `_starpu_src_common_sink_cores'.
-     	_starpu_mp_common_send_command (node, STARPU_MP_COMMAND_ANSWER_SINK_NBCORES,
-					&node->nb_cores, sizeof (int));
+     	_starpu_mp_common_send_command (node, STARPU_MP_COMMAND_ANSWER_SINK_NBCORES, &node->nb_cores, sizeof (int));
 }
 
 /* Send to host the address of the function given in parameter
  */
-static void _starpu_sink_common_lookup(const struct _starpu_mp_node *node,
-				       char *func_name)
+static void _starpu_sink_common_lookup(const struct _starpu_mp_node *node, char *func_name)
 {
 	void (*func)(void);
 	func = node->lookup(node,func_name);
@@ -76,17 +74,14 @@ static void _starpu_sink_common_lookup(const struct _starpu_mp_node *node,
 	/* If we couldn't find the function, let's send an error to the host.
 	 * The user probably made a mistake in the name */
 	if (func)
-		_starpu_mp_common_send_command(node, STARPU_MP_COMMAND_ANSWER_LOOKUP,
-					       &func, sizeof(func));
+		_starpu_mp_common_send_command(node, STARPU_MP_COMMAND_ANSWER_LOOKUP, &func, sizeof(func));
 	else
-		_starpu_mp_common_send_command(node, STARPU_MP_COMMAND_ERROR_LOOKUP,
-					       NULL, 0);
+		_starpu_mp_common_send_command(node, STARPU_MP_COMMAND_ERROR_LOOKUP, NULL, 0);
 }
 
 /* Allocate a memory space and send the address of this space to the host
  */
-void _starpu_sink_common_allocate(const struct _starpu_mp_node *mp_node,
-				  void *arg, int arg_size)
+void _starpu_sink_common_allocate(const struct _starpu_mp_node *mp_node, void *arg, int arg_size)
 {
 	STARPU_ASSERT(arg_size == sizeof(size_t));
 
@@ -96,23 +91,19 @@ void _starpu_sink_common_allocate(const struct _starpu_mp_node *mp_node,
 	/* If the allocation fail, let's send an error to the host.
 	 */
 	if (addr)
-		_starpu_mp_common_send_command(mp_node, STARPU_MP_COMMAND_ANSWER_ALLOCATE,
-					       &addr, sizeof(addr));
+		_starpu_mp_common_send_command(mp_node, STARPU_MP_COMMAND_ANSWER_ALLOCATE, &addr, sizeof(addr));
 	else
-		_starpu_mp_common_send_command(mp_node, STARPU_MP_COMMAND_ERROR_ALLOCATE,
-					       NULL, 0);
+		_starpu_mp_common_send_command(mp_node, STARPU_MP_COMMAND_ERROR_ALLOCATE, NULL, 0);
 }
 
-void _starpu_sink_common_free(const struct _starpu_mp_node *mp_node STARPU_ATTRIBUTE_UNUSED,
-			      void *arg, int arg_size)
+void _starpu_sink_common_free(const struct _starpu_mp_node *mp_node STARPU_ATTRIBUTE_UNUSED, void *arg, int arg_size)
 {
 	STARPU_ASSERT(arg_size == sizeof(void *));
 
 	free(*(void **)(arg));
 }
 
-static void _starpu_sink_common_copy_from_host_sync(const struct _starpu_mp_node *mp_node,
-					       void *arg, int arg_size)
+static void _starpu_sink_common_copy_from_host_sync(const struct _starpu_mp_node *mp_node, void *arg, int arg_size)
 {
 	STARPU_ASSERT(arg_size == sizeof(struct _starpu_mp_transfer_command));
 
@@ -121,9 +112,7 @@ static void _starpu_sink_common_copy_from_host_sync(const struct _starpu_mp_node
         mp_node->dt_recv(mp_node, cmd->addr, cmd->size, NULL);
 }
 
-
-static void _starpu_sink_common_copy_from_host_async(struct _starpu_mp_node *mp_node,
-					       void *arg, int arg_size)
+static void _starpu_sink_common_copy_from_host_async(struct _starpu_mp_node *mp_node, void *arg, int arg_size)
 {
 	STARPU_ASSERT(arg_size == sizeof(struct _starpu_mp_transfer_command));
 
@@ -148,9 +137,7 @@ static void _starpu_sink_common_copy_from_host_async(struct _starpu_mp_node *mp_
         _starpu_mp_event_list_push_back(&mp_node->event_list, sink_event);
 }
 
-
-static void _starpu_sink_common_copy_to_host_sync(const struct _starpu_mp_node *mp_node,
-					     void *arg, int arg_size)
+static void _starpu_sink_common_copy_to_host_sync(const struct _starpu_mp_node *mp_node, void *arg, int arg_size)
 {
 	STARPU_ASSERT(arg_size == sizeof(struct _starpu_mp_transfer_command));
 
@@ -165,15 +152,13 @@ static void _starpu_sink_common_copy_to_host_sync(const struct _starpu_mp_node *
         mp_node->dt_send(mp_node, addr, size, NULL);
 }
 
-
-static void _starpu_sink_common_copy_to_host_async(struct _starpu_mp_node *mp_node,
-					     void *arg, int arg_size)
+static void _starpu_sink_common_copy_to_host_async(struct _starpu_mp_node *mp_node, void *arg, int arg_size)
 {
 	STARPU_ASSERT(arg_size == sizeof(struct _starpu_mp_transfer_command));
 
 	struct _starpu_mp_transfer_command *cmd = (struct _starpu_mp_transfer_command *)arg;
 
-        /* For asynchronous transfers, we need to say dt_send that we are in async mode 
+        /* For asynchronous transfers, we need to say dt_send that we are in async mode
          * but we don't push event on list because we don't need to know if it's finished
          */
         struct _starpu_mp_event * sink_event = _starpu_mp_event_new();
@@ -194,9 +179,7 @@ static void _starpu_sink_common_copy_to_host_async(struct _starpu_mp_node *mp_no
         _starpu_mp_event_list_push_back(&mp_node->event_list, sink_event);
 }
 
-
-static void _starpu_sink_common_copy_from_sink_sync(const struct _starpu_mp_node *mp_node,
-					       void *arg, int arg_size)
+static void _starpu_sink_common_copy_from_sink_sync(const struct _starpu_mp_node *mp_node, void *arg, int arg_size)
 {
 	STARPU_ASSERT(arg_size == sizeof(struct _starpu_mp_transfer_command_to_device));
 
@@ -206,9 +189,7 @@ static void _starpu_sink_common_copy_from_sink_sync(const struct _starpu_mp_node
         _starpu_mp_common_send_command(mp_node, STARPU_MP_COMMAND_TRANSFER_COMPLETE, NULL, 0);
 }
 
-
-static void _starpu_sink_common_copy_from_sink_async(struct _starpu_mp_node *mp_node,
-					       void *arg, int arg_size)
+static void _starpu_sink_common_copy_from_sink_async(struct _starpu_mp_node *mp_node, void *arg, int arg_size)
 {
 	STARPU_ASSERT(arg_size == sizeof(struct _starpu_mp_transfer_command_to_device));
 
@@ -234,9 +215,7 @@ static void _starpu_sink_common_copy_from_sink_async(struct _starpu_mp_node *mp_
         _starpu_mp_event_list_push_back(&mp_node->event_list, sink_event);
 }
 
-
-static void _starpu_sink_common_copy_to_sink_sync(const struct _starpu_mp_node *mp_node,
-					     void *arg, int arg_size)
+static void _starpu_sink_common_copy_to_sink_sync(const struct _starpu_mp_node *mp_node, void *arg, int arg_size)
 {
 	STARPU_ASSERT(arg_size == sizeof(struct _starpu_mp_transfer_command_to_device));
 
@@ -245,15 +224,13 @@ static void _starpu_sink_common_copy_to_sink_sync(const struct _starpu_mp_node *
         mp_node->dt_send_to_device(mp_node, cmd->devid, cmd->addr, cmd->size, NULL);
 }
 
-
-static void _starpu_sink_common_copy_to_sink_async(struct _starpu_mp_node *mp_node,
-					     void *arg, int arg_size)
+static void _starpu_sink_common_copy_to_sink_async(struct _starpu_mp_node *mp_node, void *arg, int arg_size)
 {
 	STARPU_ASSERT(arg_size == sizeof(struct _starpu_mp_transfer_command_to_device));
 
 	struct _starpu_mp_transfer_command_to_device *cmd = (struct _starpu_mp_transfer_command_to_device *)arg;
 
-        /* For asynchronous transfers, we need to say dt_send that we are in async mode 
+        /* For asynchronous transfers, we need to say dt_send that we are in async mode
          * but we don't push event on list because we don't need to know if it's finished
          */
         struct _starpu_mp_event * sink_event = _starpu_mp_event_new();
@@ -275,7 +252,6 @@ static void _starpu_sink_common_copy_to_sink_async(struct _starpu_mp_node *mp_no
         _starpu_mp_event_list_push_back(&mp_node->event_list, sink_event);
 }
 
-
 /* Receive workers and combined workers and store them into the struct config
  */
 static void _starpu_sink_common_recv_workers(struct _starpu_mp_node * node, void *arg, int arg_size)
@@ -300,7 +276,6 @@ static void _starpu_sink_common_recv_workers(struct _starpu_mp_node * node, void
 	struct _starpu_machine_config *config = _starpu_get_machine_config();
 	config->topology.nworkers = *(int *)arg_ptr;
 
-
 	/* Retrieve workers */
 	struct _starpu_worker * workers = &config->workers[baseworkerid];
 	node->dt_recv(node,workers,worker_size, NULL);
@@ -336,8 +311,6 @@ static void _starpu_sink_common_recv_workers(struct _starpu_mp_node * node, void
 	STARPU_PTHREAD_BARRIER_WAIT(&node->init_completed_barrier);
 }
 
-
-
 /* Function looping on the sink, waiting for tasks to execute.
  * If the caller is the host, don't do anything.
  */
@@ -351,9 +324,7 @@ void _starpu_sink_common_worker(void)
 	enum _starpu_mp_node_kind node_kind = _starpu_sink_common_get_kind();
 
 	if (node_kind == STARPU_NODE_INVALID_KIND)
-		_STARPU_ERROR("No valid sink kind retrieved, use the"
-			      "STARPU_SINK environment variable to specify"
-			      "this\n");
+		_STARPU_ERROR("No valid sink kind retrieved, use the STARPU_SINK environment variable to specify this\n");
 
 	/* Create and initialize the node */
 	node = _starpu_mp_common_node_create(node_kind, -1);
@@ -450,23 +421,21 @@ void _starpu_sink_common_worker(void)
 			STARPU_PTHREAD_MUTEX_UNLOCK(&node->message_queue_mutex);
 		}
 
-        if(!_starpu_mp_event_list_empty(&node->event_list))
-        {
-            struct _starpu_mp_event * sink_event = _starpu_mp_event_list_pop_front(&node->event_list);
-            if (node->dt_test(&sink_event->event))
-            {
-                /* send ACK to host */
-                _starpu_mp_common_send_command(node, sink_event->answer_cmd , &sink_event->remote_event, sizeof(sink_event->remote_event));
-                _starpu_mp_event_delete(sink_event);
-            }
-            else
-            {
-                /* try later */
-                 _starpu_mp_event_list_push_back(&node->event_list, sink_event);
-            }
-            
-
-        }
+		if(!_starpu_mp_event_list_empty(&node->event_list))
+		{
+			struct _starpu_mp_event * sink_event = _starpu_mp_event_list_pop_front(&node->event_list);
+			if (node->dt_test(&sink_event->event))
+			{
+				/* send ACK to host */
+				_starpu_mp_common_send_command(node, sink_event->answer_cmd , &sink_event->remote_event, sizeof(sink_event->remote_event));
+				_starpu_mp_event_delete(sink_event);
+			}
+			else
+			{
+				/* try later */
+				_starpu_mp_event_list_push_back(&node->event_list, sink_event);
+			}
+		}
 	}
 
 	STARPU_PTHREAD_KEY_DELETE(worker_key);
@@ -481,7 +450,6 @@ void _starpu_sink_common_worker(void)
 	exit(0);
 }
 
-
 /* Search for the mp_barrier correspondind to the specified combined worker
  * and create it if it doesn't exist
  */
@@ -491,8 +459,8 @@ static struct mp_barrier * _starpu_sink_common_get_barrier(struct _starpu_mp_nod
 	STARPU_PTHREAD_MUTEX_LOCK(&node->barrier_mutex);
 	/* Search if the barrier already exist */
 	for(b = mp_barrier_list_begin(&node->barrier_list);
-			b != mp_barrier_list_end(&node->barrier_list) && b->id != cb_workerid;
-			b = mp_barrier_list_next(b));
+	    b != mp_barrier_list_end(&node->barrier_list) && b->id != cb_workerid;
+	    b = mp_barrier_list_next(b));
 
 	/* If we found the barrier */
 	if(b != NULL)
@@ -514,7 +482,6 @@ static struct mp_barrier * _starpu_sink_common_get_barrier(struct _starpu_mp_nod
 	}
 }
 
-
 /* Erase for the mp_barrier correspondind to the specified combined worker
 */
 static void _starpu_sink_common_erase_barrier(struct _starpu_mp_node * node, struct mp_barrier *barrier)
@@ -531,8 +498,8 @@ static void _starpu_sink_common_append_message(struct _starpu_mp_node *node, str
 	STARPU_PTHREAD_MUTEX_LOCK(&node->message_queue_mutex);
 	mp_message_list_push_front(&node->message_queue,message);
 	STARPU_PTHREAD_MUTEX_UNLOCK(&node->message_queue_mutex);
-
 }
+
 /* Append to the message list a "STARPU_PRE_EXECUTION" message
  */
 static void _starpu_sink_common_pre_execution_message(struct _starpu_mp_node *node, struct mp_task *task)
@@ -544,10 +511,8 @@ static void _starpu_sink_common_pre_execution_message(struct _starpu_mp_node *no
 	*(int *) message->buffer = task->combined_workerid;
 	message->size = sizeof(int);
 
-
 	/* Append the message to the queue */
 	_starpu_sink_common_append_message(node, message);
-
 }
 
 /* Append to the message list a "STARPU_EXECUTION_COMPLETED" message
@@ -568,7 +533,6 @@ static void _starpu_sink_common_execution_completed_message(struct _starpu_mp_no
 	_starpu_sink_common_append_message(node, message);
 }
 
-
 /* Bind the thread which is running on the specified core to the combined worker */
 static void _starpu_sink_common_bind_to_combined_worker(struct _starpu_mp_node *node, int coreid, struct _starpu_combined_worker * combined_worker)
 {
@@ -580,8 +544,6 @@ static void _starpu_sink_common_bind_to_combined_worker(struct _starpu_mp_node *
 	node->bind_thread(node, coreid, bind_set, combined_worker->worker_size);
 }
 
-
-
 /* Get the current rank of the worker in the combined worker
  */
 static int _starpu_sink_common_get_current_rank(int workerid, struct _starpu_combined_worker * combined_worker)
@@ -606,7 +568,6 @@ static void _starpu_sink_common_execute_kernel(struct _starpu_mp_node *node, int
 	else
 		task = node->run_table[coreid];
 
-
 	/* If it's a parallel task */
 	if(task->is_parallel_task)
 	{
@@ -619,7 +580,6 @@ static void _starpu_sink_common_execute_kernel(struct _starpu_mp_node *node, int
 		/* Synchronize with others threads of the combined worker*/
 		STARPU_PTHREAD_BARRIER_WAIT(&task->mp_barrier->before_work_barrier);
 
-
 		/* The first thread of the combined worker */
 		if(worker->current_rank == 0)
 		{
@@ -641,6 +601,7 @@ static void _starpu_sink_common_execute_kernel(struct _starpu_mp_node *node, int
 		worker->combined_workerid = 0;
 		worker->worker_size = 1;
 	}
+
 	if(task->type != STARPU_FORKJOIN || worker->current_rank == 0)
 	{
 		if (_starpu_get_disable_kernels() <= 0)
@@ -686,10 +647,8 @@ static void _starpu_sink_common_execute_kernel(struct _starpu_mp_node *node, int
     	if (task->cl_arg != NULL)
         	free(task->cl_arg);
 	free(task);
-
 }
 
-
 /* The main function executed by the thread
  * thread_arg is a structure containing the information needed by the thread
  */
@@ -721,7 +680,6 @@ void* _starpu_sink_thread(void * thread_arg)
 	starpu_pthread_exit(NULL);
 }
 
-
 /* Add the task to the specific thread and wake him up
 */
 static void _starpu_sink_common_execute_thread(struct _starpu_mp_node *node, struct mp_task *task)
@@ -736,8 +694,6 @@ static void _starpu_sink_common_execute_thread(struct _starpu_mp_node *node, str
 	sem_post(&node->sem_run_table[task->coreid]);
 }
 
-
-
 /* Receive paquet from _starpu_src_common_execute_kernel in the form below :
  * [Function pointer on sink, number of interfaces, interfaces
  * (union _starpu_interface), cl_arg]
@@ -745,8 +701,7 @@ static void _starpu_sink_common_execute_thread(struct _starpu_mp_node *node, str
  * addresses of the received interfaces
  */
 
-void _starpu_sink_common_execute(struct _starpu_mp_node *node,
-		void *arg, int arg_size)
+void _starpu_sink_common_execute(struct _starpu_mp_node *node, void *arg, int arg_size)
 {
 	unsigned i;
 
@@ -800,25 +755,21 @@ void _starpu_sink_common_execute(struct _starpu_mp_node *node,
 
 	/* Was cl_arg sent ? */
 	if (arg_size > arg_ptr - (uintptr_t) arg)
-    {
-        /* Copy cl_arg to prevent overwriting by an other task */
-        unsigned cl_arg_size = arg_size - (arg_ptr - (uintptr_t) arg);
-        _STARPU_MALLOC(task->cl_arg, cl_arg_size);
-        memcpy(task->cl_arg, (void *) arg_ptr, cl_arg_size);
-    }
+	{
+		/* Copy cl_arg to prevent overwriting by an other task */
+		unsigned cl_arg_size = arg_size - (arg_ptr - (uintptr_t) arg);
+		_STARPU_MALLOC(task->cl_arg, cl_arg_size);
+		memcpy(task->cl_arg, (void *) arg_ptr, cl_arg_size);
+	}
 	else
 		task->cl_arg = NULL;
 
-
 	//_STARPU_DEBUG("telling host that we have submitted the task %p.\n", task->kernel);
 	if (task->detached)
-		_starpu_mp_common_send_command(node, STARPU_MP_COMMAND_EXECUTION_DETACHED_SUBMITTED,
-				NULL, 0);
+		_starpu_mp_common_send_command(node, STARPU_MP_COMMAND_EXECUTION_DETACHED_SUBMITTED, NULL, 0);
 	else
-		_starpu_mp_common_send_command(node, STARPU_MP_COMMAND_EXECUTION_SUBMITTED,
-				NULL, 0);
+		_starpu_mp_common_send_command(node, STARPU_MP_COMMAND_EXECUTION_SUBMITTED, NULL, 0);
 
 	//_STARPU_DEBUG("executing the task %p\n", task->kernel);
 	_starpu_sink_common_execute_thread(node, task);
-
 }

+ 65 - 99
src/drivers/mp_common/source_common.c

@@ -1,7 +1,7 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  * Copyright (C) 2012,2016,2017                           Inria
- * Copyright (C) 2013-2017                                CNRS
+ * Copyright (C) 2013-2017,2019                           CNRS
  * Copyright (C) 2013-2015,2017                           Université de Bordeaux
  * Copyright (C) 2013                                     Thibaut Lambert
  *
@@ -75,9 +75,7 @@ static int _starpu_src_common_finalize_job (struct _starpu_job *j, struct _starp
 	/* Finalize the execution */
 	if(count == 0)
 	{
-
-		_starpu_driver_update_job_feedback(j, worker, &worker->perf_arch,
-				profiling);
+		_starpu_driver_update_job_feedback(j, worker, &worker->perf_arch, profiling);
 
 		_starpu_push_task_output (j);
 
@@ -86,7 +84,6 @@ static int _starpu_src_common_finalize_job (struct _starpu_job *j, struct _starp
 	return 0;
 }
 
-
 /* Complete the execution of the job */
 static int _starpu_src_common_process_completed_job(struct _starpu_mp_node *node, struct _starpu_worker_set *workerset, void * arg, int arg_size, int stored)
 {
@@ -138,42 +135,46 @@ static void _starpu_src_common_pre_exec(struct _starpu_mp_node *node, void * arg
  * return 0 if the message has not been handle (it's certainly mean that it's a synchronous message)
  * return 1 if the message has been handle
  */
-static int _starpu_src_common_handle_async(struct _starpu_mp_node *node,
-		void * arg, int arg_size,
-		enum _starpu_mp_command answer, int stored)
+static int _starpu_src_common_handle_async(struct _starpu_mp_node *node, void * arg, int arg_size, enum _starpu_mp_command answer, int stored)
 {
         struct _starpu_worker_set * worker_set = NULL;
         switch(answer)
         {
                 case STARPU_MP_COMMAND_EXECUTION_COMPLETED:
+		{
                         worker_set = _starpu_get_worker_struct(starpu_worker_get_id())->set;
                         _starpu_src_common_process_completed_job(node, worker_set, arg, arg_size, stored);
                         break;
+		}
                 case STARPU_MP_COMMAND_EXECUTION_DETACHED_COMPLETED:
-			_STARPU_ERROR("Detached execution completed should not arrive here... \n"); 
+		{
+			_STARPU_ERROR("Detached execution completed should not arrive here... \n");
                         break;
+		}
                 case STARPU_MP_COMMAND_PRE_EXECUTION:
+		{
                         _starpu_src_common_pre_exec(node, arg,arg_size, stored);
                         break;
+		}
                 case STARPU_MP_COMMAND_RECV_FROM_HOST_ASYNC_COMPLETED:
                 case STARPU_MP_COMMAND_RECV_FROM_SINK_ASYNC_COMPLETED:
-                        {
-                                struct _starpu_async_channel * event = *((struct _starpu_async_channel **) arg);
-                                event->starpu_mp_common_finished_receiver--;
-                                if (!stored)
-                                        STARPU_PTHREAD_MUTEX_UNLOCK(&node->connection_mutex);
-                                break;
-                        }
+		{
+			struct _starpu_async_channel * event = *((struct _starpu_async_channel **) arg);
+			event->starpu_mp_common_finished_receiver--;
+			if (!stored)
+				STARPU_PTHREAD_MUTEX_UNLOCK(&node->connection_mutex);
+			break;
+		}
                 case STARPU_MP_COMMAND_SEND_TO_HOST_ASYNC_COMPLETED:
                 case STARPU_MP_COMMAND_SEND_TO_SINK_ASYNC_COMPLETED:
-                        {
-                                struct _starpu_async_channel * event = *((struct _starpu_async_channel **) arg);
-                                event->starpu_mp_common_finished_sender--;
-                                if (!stored)
-                                        STARPU_PTHREAD_MUTEX_UNLOCK(&node->connection_mutex);
-                                break;
-                        }
-                default:
+		{
+			struct _starpu_async_channel * event = *((struct _starpu_async_channel **) arg);
+			event->starpu_mp_common_finished_sender--;
+			if (!stored)
+				STARPU_PTHREAD_MUTEX_UNLOCK(&node->connection_mutex);
+			break;
+		}
+		default:
                         return 0;
                         break;
         }
@@ -194,8 +195,7 @@ static void _starpu_src_common_handle_stored_async(struct _starpu_mp_node *node)
 		stopped_progress = 1;
 		_STARPU_TRACE_END_PROGRESS(mp_node_memory_node(node));
                 STARPU_PTHREAD_MUTEX_UNLOCK(&node->message_queue_mutex);
-		_starpu_src_common_handle_async(node, message->buffer,
-				message->size, message->type, 1);
+		_starpu_src_common_handle_async(node, message->buffer, message->size, message->type, 1);
 		free(message->buffer);
 		mp_message_delete(message);
                 /* Take it again */
@@ -209,8 +209,7 @@ static void _starpu_src_common_handle_stored_async(struct _starpu_mp_node *node)
 /* Store a message if is asynchronous
  * return 1 if the message has been stored
  * return 0 if the message is unknown or synchrone */
-int _starpu_src_common_store_message(struct _starpu_mp_node *node,
-				     void * arg, int arg_size, enum _starpu_mp_command answer)
+int _starpu_src_common_store_message(struct _starpu_mp_node *node, void * arg, int arg_size, enum _starpu_mp_command answer)
 {
 	switch(answer)
 	{
@@ -250,8 +249,7 @@ int _starpu_src_common_store_message(struct _starpu_mp_node *node,
 }
 
 /* Store all asynchronous messages and return when a synchronous message is received */
-static enum _starpu_mp_command _starpu_src_common_wait_command_sync(struct _starpu_mp_node *node,
-		void ** arg, int* arg_size)
+static enum _starpu_mp_command _starpu_src_common_wait_command_sync(struct _starpu_mp_node *node, void ** arg, int* arg_size)
 {
 	enum _starpu_mp_command answer;
 	int is_sync = 0;
@@ -295,10 +293,9 @@ static void _starpu_src_common_recv_async(struct _starpu_mp_node * node)
 			coreid = *(int *) *arg;
 			if(devid == coreid)
 				completed = 1;
-			else
-				if(!_starpu_src_common_store_message(node, *arg, *arg_size, answer))
-					/* We receive a unknown or asynchronous message  */
-					STARPU_ASSERT(0);
+			else if(!_starpu_src_common_store_message(node, *arg, *arg_size, answer))
+				/* We receive a unknown or asynchronous message  */
+				STARPU_ASSERT(0);
 		}
 		else
 		{
@@ -310,11 +307,9 @@ static void _starpu_src_common_recv_async(struct _starpu_mp_node * node)
 	return answer;
 }
 
-
 /* Send a request to the sink NODE for the number of cores on it. */
 int _starpu_src_common_sink_nbcores (struct _starpu_mp_node *node, int *buf)
 {
-
 	enum _starpu_mp_command answer;
 	void *arg;
 	int arg_size = sizeof (int);
@@ -339,8 +334,7 @@ int _starpu_src_common_sink_nbcores (struct _starpu_mp_node *node, int *buf)
  * In case of success, it returns 0 and FUNC_PTR contains the pointer ;
  * else it returns -ESPIPE if the function was not found.
  */
-int _starpu_src_common_lookup(struct _starpu_mp_node *node,
-		void (**func_ptr)(void), const char *func_name)
+int _starpu_src_common_lookup(struct _starpu_mp_node *node, void (**func_ptr)(void), const char *func_name)
 {
 	enum _starpu_mp_command answer;
 	void *arg;
@@ -355,9 +349,7 @@ int _starpu_src_common_lookup(struct _starpu_mp_node *node,
 	_starpu_mp_common_send_command(node, STARPU_MP_COMMAND_LOOKUP, (void *) func_name,
 			arg_size);
 
-	answer = _starpu_src_common_wait_command_sync(node, (void **) &arg,
-			&arg_size);
-
+	answer = _starpu_src_common_wait_command_sync(node, (void **) &arg, &arg_size);
 
 	if (answer == STARPU_MP_COMMAND_ERROR_LOOKUP)
 	{
@@ -391,13 +383,13 @@ int _starpu_src_common_lookup(struct _starpu_mp_node *node,
  * Data interfaces in task are send to the sink.
  */
 int _starpu_src_common_execute_kernel(struct _starpu_mp_node *node,
-		void (*kernel)(void), unsigned coreid,
-		enum starpu_codelet_type type,
-		int is_parallel_task, int cb_workerid,
-		starpu_data_handle_t *handles,
-		void **interfaces,
-		unsigned nb_interfaces,
-		void *cl_arg, size_t cl_arg_size, int detached)
+				      void (*kernel)(void), unsigned coreid,
+				      enum starpu_codelet_type type,
+				      int is_parallel_task, int cb_workerid,
+				      starpu_data_handle_t *handles,
+				      void **interfaces,
+				      unsigned nb_interfaces,
+				      void *cl_arg, size_t cl_arg_size, int detached)
 {
 	void *buffer, *arg =NULL;
 	uintptr_t buffer_ptr;
@@ -460,8 +452,7 @@ int _starpu_src_common_execute_kernel(struct _starpu_mp_node *node,
 	{
 		starpu_data_handle_t handle = handles[i];
 
-		memcpy ((void*) buffer_ptr, interfaces[i],
-				handle->ops->interface_size);
+		memcpy ((void*) buffer_ptr, interfaces[i], handle->ops->interface_size);
 		/* The sink side has no mean to get the type of each
 		 * interface, we use a union to make it generic and permit the
 		 * sink to go through the array */
@@ -498,11 +489,8 @@ int _starpu_src_common_execute_kernel(struct _starpu_mp_node *node,
 	return 0;
 }
 
-
 /* Get the information and call the function to send to the sink a message to execute the task*/
-static int _starpu_src_common_execute(struct _starpu_job *j,
-		struct _starpu_worker *worker,
-		struct _starpu_mp_node * node)
+static int _starpu_src_common_execute(struct _starpu_job *j, struct _starpu_worker *worker, struct _starpu_mp_node * node)
 {
 	STARPU_ASSERT(j);
 	struct starpu_task *task = j->task;
@@ -518,24 +506,20 @@ static int _starpu_src_common_execute(struct _starpu_job *j,
 	//_STARPU_DEBUG("\nworkerid:%d, rank:%d, type:%d,	cb_workerid:%d, task_size:%d\n\n",worker->devid,worker->current_rank,task->cl->type,j->combined_workerid,j->task_size);
 
 	_starpu_src_common_execute_kernel(node, kernel, worker->subworkerid, task->cl->type,
-			(j->task_size > 1),
-			j->combined_workerid, STARPU_TASK_GET_HANDLES(task),
-			_STARPU_TASK_GET_INTERFACES(task), STARPU_TASK_GET_NBUFFERS(task),
-			task->cl_arg, task->cl_arg_size, 0);
-
-
+					  (j->task_size > 1),
+					  j->combined_workerid, STARPU_TASK_GET_HANDLES(task),
+					  _STARPU_TASK_GET_INTERFACES(task), STARPU_TASK_GET_NBUFFERS(task),
+					  task->cl_arg, task->cl_arg_size, 0);
 	return 0;
 }
 
-
 /* Send a request to the sink linked to the MP_NODE to allocate SIZE bytes on
  * the sink.
  * In case of success, it returns 0 and *ADDR contains the address of the
  * allocated area ;
  * else it returns 1 if the allocation fail.
  */
-int _starpu_src_common_allocate(struct _starpu_mp_node *mp_node,
-		void **addr, size_t size)
+int _starpu_src_common_allocate(struct _starpu_mp_node *mp_node, void **addr, size_t size)
 {
 	enum _starpu_mp_command answer;
 	void *arg;
@@ -554,9 +538,8 @@ int _starpu_src_common_allocate(struct _starpu_mp_node *mp_node,
                 return 1;
         }
 
-	STARPU_ASSERT(answer == STARPU_MP_COMMAND_ANSWER_ALLOCATE &&
-			arg_size == sizeof(*addr));
-    
+	STARPU_ASSERT(answer == STARPU_MP_COMMAND_ANSWER_ALLOCATE && arg_size == sizeof(*addr));
+
 	memcpy(addr, arg, arg_size);
 
         STARPU_PTHREAD_MUTEX_UNLOCK(&mp_node->connection_mutex);
@@ -567,8 +550,7 @@ int _starpu_src_common_allocate(struct _starpu_mp_node *mp_node,
 /* Send a request to the sink linked to the MP_NODE to deallocate the memory
  * area pointed by ADDR.
  */
-void _starpu_src_common_free(struct _starpu_mp_node *mp_node,
-		void *addr)
+void _starpu_src_common_free(struct _starpu_mp_node *mp_node, void *addr)
 {
         STARPU_PTHREAD_MUTEX_LOCK(&mp_node->connection_mutex);
         _starpu_mp_common_send_command(mp_node, STARPU_MP_COMMAND_FREE, &addr, sizeof(addr));
@@ -578,8 +560,7 @@ void _starpu_src_common_free(struct _starpu_mp_node *mp_node,
 /* Send SIZE bytes pointed by SRC to DST on the sink linked to the MP_NODE with a
  * synchronous mode.
  */
-int _starpu_src_common_copy_host_to_sink_sync(struct _starpu_mp_node *mp_node,
-		void *src, void *dst, size_t size)
+int _starpu_src_common_copy_host_to_sink_sync(struct _starpu_mp_node *mp_node, void *src, void *dst, size_t size)
 {
         struct _starpu_mp_transfer_command cmd = {size, dst, NULL};
 
@@ -597,8 +578,7 @@ int _starpu_src_common_copy_host_to_sink_sync(struct _starpu_mp_node *mp_node,
 /* Send SIZE bytes pointed by SRC to DST on the sink linked to the MP_NODE with an
  * asynchronous mode.
  */
-int _starpu_src_common_copy_host_to_sink_async(struct _starpu_mp_node *mp_node,
-		void *src, void *dst, size_t size, void * event)
+int _starpu_src_common_copy_host_to_sink_async(struct _starpu_mp_node *mp_node, void *src, void *dst, size_t size, void * event)
 {
         struct _starpu_mp_transfer_command cmd = {size, dst, event};
 
@@ -622,8 +602,7 @@ int _starpu_src_common_copy_host_to_sink_async(struct _starpu_mp_node *mp_node,
 /* Receive SIZE bytes pointed by SRC on the sink linked to the MP_NODE and store them in DST
  * with a synchronous mode.
  */
-int _starpu_src_common_copy_sink_to_host_sync(struct _starpu_mp_node *mp_node,
-		void *src, void *dst, size_t size)
+int _starpu_src_common_copy_sink_to_host_sync(struct _starpu_mp_node *mp_node, void *src, void *dst, size_t size)
 {
         enum _starpu_mp_command answer;
         void *arg;
@@ -648,8 +627,7 @@ int _starpu_src_common_copy_sink_to_host_sync(struct _starpu_mp_node *mp_node,
 /* Receive SIZE bytes pointed by SRC on the sink linked to the MP_NODE and store them in DST
  * with an asynchronous mode.
  */
-int _starpu_src_common_copy_sink_to_host_async(struct _starpu_mp_node *mp_node,
-		void *src, void *dst, size_t size, void * event)
+int _starpu_src_common_copy_sink_to_host_async(struct _starpu_mp_node *mp_node, void *src, void *dst, size_t size, void * event)
 {
         struct _starpu_mp_transfer_command cmd = {size, src, event};
 
@@ -674,8 +652,7 @@ int _starpu_src_common_copy_sink_to_host_async(struct _starpu_mp_node *mp_node,
  * to the sink linked to DST_NODE. The latter store them in DST with a synchronous
  * mode.
  */
-int _starpu_src_common_copy_sink_to_sink_sync(struct _starpu_mp_node *src_node,
-		struct _starpu_mp_node *dst_node, void *src, void *dst, size_t size)
+int _starpu_src_common_copy_sink_to_sink_sync(struct _starpu_mp_node *src_node, struct _starpu_mp_node *dst_node, void *src, void *dst, size_t size)
 {
         enum _starpu_mp_command answer;
         void *arg;
@@ -723,8 +700,7 @@ int _starpu_src_common_copy_sink_to_sink_sync(struct _starpu_mp_node *src_node,
  * to the sink linked to DST_NODE. The latter store them in DST with an asynchronous
  * mode.
  */
-int _starpu_src_common_copy_sink_to_sink_async(struct _starpu_mp_node *src_node,
-		struct _starpu_mp_node *dst_node, void *src, void *dst, size_t size, void * event)
+int _starpu_src_common_copy_sink_to_sink_async(struct _starpu_mp_node *src_node, struct _starpu_mp_node *dst_node, void *src, void *dst, size_t size, void * event)
 {
         struct _starpu_mp_transfer_command_to_device cmd = {dst_node->peer_id, size, src, event};
 
@@ -744,8 +720,8 @@ int _starpu_src_common_copy_sink_to_sink_async(struct _starpu_mp_node *src_node,
          * to test is they are finished
          */
         struct _starpu_async_channel * async_channel = event;
-        async_channel->polling_node_sender = src_node; 
-        async_channel->polling_node_receiver = dst_node; 
+        async_channel->polling_node_sender = src_node;
+        async_channel->polling_node_receiver = dst_node;
         /* Increase number of ack waited */
         async_channel->starpu_mp_common_finished_receiver++;
         async_channel->starpu_mp_common_finished_sender++;
@@ -770,8 +746,7 @@ int _starpu_src_common_copy_sink_to_sink_async(struct _starpu_mp_node *src_node,
 /* 5 functions to determine the executable to run on the device (MIC, SCC,
  * MPI).
  */
-static void _starpu_src_common_cat_3(char *final, const size_t len, const char *first,
-				     const char *second, const char *third)
+static void _starpu_src_common_cat_3(char *final, const size_t len, const char *first, const char *second, const char *third)
 {
 	snprintf(final, len, "%s%s%s", first, second, third);
 }
@@ -876,7 +851,6 @@ int _starpu_src_common_locate_file(char *located_file_name, size_t len,
 	return 1;
 }
 
-
 #if defined(STARPU_USE_MPI_MASTER_SLAVE) && !defined(STARPU_MPI_MASTER_SLAVE_MULTIPLE_THREAD)
 void _starpu_src_common_init_switch_env(unsigned this)
 {
@@ -899,18 +873,16 @@ static void _starpu_src_common_switch_env(unsigned old, unsigned new)
         save_thread_env[old].current_omp_task = STARPU_PTHREAD_GETSPECIFIC(omp_task_key);
 #endif
 
-
         _starpu_set_current_task(save_thread_env[new].current_task);
         STARPU_PTHREAD_SETSPECIFIC(_starpu_worker_key, save_thread_env[new].current_worker);
         STARPU_PTHREAD_SETSPECIFIC(_starpu_worker_set_key, save_thread_env[new].current_worker_set);
 #ifdef STARPU_OPENMP
         STARPU_PTHREAD_SETSPECIFIC(omp_thread_key, save_thread_env[new].current_omp_thread);
-        STARPU_PTHREAD_SETSPECIFIC(omp_task_key, save_thread_env[new].current_omp_task); 
+        STARPU_PTHREAD_SETSPECIFIC(omp_task_key, save_thread_env[new].current_omp_task);
 #endif
 }
 #endif
 
-
 /* Send workers to the sink node
  */
 static void _starpu_src_common_send_workers(struct _starpu_mp_node * node, int baseworkerid, int nworkers)
@@ -928,8 +900,7 @@ static void _starpu_src_common_send_workers(struct _starpu_mp_node * node, int b
         STARPU_PTHREAD_MUTEX_LOCK(&node->connection_mutex);
 
 	/* tell the sink node that we will send him all workers */
-	_starpu_mp_common_send_command(node, STARPU_MP_COMMAND_SYNC_WORKERS,
-			&msg, sizeof(msg));
+	_starpu_mp_common_send_command(node, STARPU_MP_COMMAND_SYNC_WORKERS, &msg, sizeof(msg));
 
 	/* Send all worker to the sink node */
 	node->dt_send(node,&config->workers[baseworkerid],worker_size, NULL);
@@ -938,7 +909,6 @@ static void _starpu_src_common_send_workers(struct _starpu_mp_node * node, int b
 	node->dt_send(node, &config->combined_workers,combined_worker_size, NULL);
 
         STARPU_PTHREAD_MUTEX_UNLOCK(&node->connection_mutex);
-
 }
 
 static void _starpu_src_common_worker_internal_work(struct _starpu_worker_set * worker_set, struct _starpu_mp_node * mp_node, struct starpu_task **tasks, unsigned memnode)
@@ -1019,7 +989,7 @@ static void _starpu_src_common_worker_internal_work(struct _starpu_worker_set *
                 starpu_pthread_wait_wait(&worker_set->workers[0].wait);
 #endif
 
-    /*if at least one worker have pop a task*/
+	/*if at least one worker have pop a task*/
 	if(res != 0)
 	{
 		for(i=0; i<worker_set->nworkers; i++)
@@ -1040,11 +1010,9 @@ static void _starpu_src_common_worker_internal_work(struct _starpu_worker_set *
 	}
 }
 
-
 #if defined(STARPU_USE_MPI_MASTER_SLAVE) && !defined(STARPU_MPI_MASTER_SLAVE_MULTIPLE_THREAD)
 /* Function looping on the source node */
-void _starpu_src_common_workers_set(struct _starpu_worker_set * worker_set,
-        int ndevices, struct _starpu_mp_node ** mp_node)
+void _starpu_src_common_workers_set(struct _starpu_worker_set * worker_set, int ndevices, struct _starpu_mp_node ** mp_node)
 {
         unsigned memnode[ndevices];
         unsigned offsetmemnode[ndevices];
@@ -1101,9 +1069,7 @@ void _starpu_src_common_workers_set(struct _starpu_worker_set * worker_set,
 #endif
 
 /* Function looping on the source node */
-void _starpu_src_common_worker(struct _starpu_worker_set * worker_set,
-		unsigned baseworkerid,
-		struct _starpu_mp_node * mp_node)
+void _starpu_src_common_worker(struct _starpu_worker_set * worker_set, unsigned baseworkerid, struct _starpu_mp_node * mp_node)
 {
         unsigned memnode = worker_set->workers[0].memory_node;
         struct starpu_task **tasks;
@@ -1119,6 +1085,7 @@ void _starpu_src_common_worker(struct _starpu_worker_set * worker_set,
                 _starpu_src_common_worker_internal_work(worker_set, mp_node, tasks, memnode);
         }
         free(tasks);
+
         _STARPU_TRACE_END_PROGRESS(memnode);
 
         _starpu_handle_all_pending_node_data_requests(memnode);
@@ -1127,5 +1094,4 @@ void _starpu_src_common_worker(struct _starpu_worker_set * worker_set,
          * allocated by StarPU, we release it now. Note that data
          * coherency is not maintained anymore at that point ! */
         _starpu_free_all_automatically_allocated_buffers(memnode);
-
 }

+ 15 - 35
src/drivers/mp_common/source_common.h

@@ -1,7 +1,7 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  * Copyright (C) 2012,2016,2017                           Inria
- * Copyright (C) 2015,2017                                CNRS
+ * Copyright (C) 2015,2017,2019                           CNRS
  * Copyright (C) 2013                                     Université de Bordeaux
  * Copyright (C) 2013                                     Thibaut Lambert
  *
@@ -17,35 +17,27 @@
  * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  */
 
-
 #ifndef __SOURCE_COMMON_H__
 #define __SOURCE_COMMON_H__
 
-
 #ifdef STARPU_USE_MP
 
 #include <core/sched_policy.h>
 #include <core/task.h>
 #include <drivers/mp_common/mp_common.h>
 
-
-enum _starpu_mp_command _starpu_src_common_wait_command_sync(struct _starpu_mp_node *node, 
-							     void ** arg, int* arg_size);
-int _starpu_src_common_store_message(struct _starpu_mp_node *node, 
-		void * arg, int arg_size, enum _starpu_mp_command answer);
+enum _starpu_mp_command _starpu_src_common_wait_command_sync(struct _starpu_mp_node *node, void ** arg, int* arg_size);
+int _starpu_src_common_store_message(struct _starpu_mp_node *node, void * arg, int arg_size, enum _starpu_mp_command answer);
 
 enum _starpu_mp_command _starpu_src_common_wait_completed_execution(struct _starpu_mp_node *node, int devid, void **arg, int * arg_size);
 
-int _starpu_src_common_sink_nbcores (struct _starpu_mp_node *node, int *buf);
+int _starpu_src_common_sink_nbcores(struct _starpu_mp_node *node, int *buf);
 
-int _starpu_src_common_lookup(const struct _starpu_mp_node *node,
-			      void (**func_ptr)(void), const char *func_name);
+int _starpu_src_common_lookup(const struct _starpu_mp_node *node, void (**func_ptr)(void), const char *func_name);
 
-int _starpu_src_common_allocate(const struct _starpu_mp_node *mp_node,
-				void **addr, size_t size);
+int _starpu_src_common_allocate(const struct _starpu_mp_node *mp_node, void **addr, size_t size);
 
-void _starpu_src_common_free(struct _starpu_mp_node *mp_node,
-			     void *addr);
+void _starpu_src_common_free(struct _starpu_mp_node *mp_node, void *addr);
 
 int _starpu_src_common_execute_kernel(const struct _starpu_mp_node *node,
 				      void (*kernel)(void), unsigned coreid,
@@ -56,42 +48,30 @@ int _starpu_src_common_execute_kernel(const struct _starpu_mp_node *node,
 				      unsigned nb_interfaces,
 				      void *cl_arg, size_t cl_arg_size, int detached);
 
+int _starpu_src_common_copy_host_to_sink_sync(struct _starpu_mp_node *mp_node, void *src, void *dst, size_t size);
 
-int _starpu_src_common_copy_host_to_sink_sync(struct _starpu_mp_node *mp_node,
-					 void *src, void *dst, size_t size);
+int _starpu_src_common_copy_sink_to_host_sync(struct _starpu_mp_node *mp_node, void *src, void *dst, size_t size);
 
-int _starpu_src_common_copy_sink_to_host_sync(struct _starpu_mp_node *mp_node,
-					 void *src, void *dst, size_t size);
+int _starpu_src_common_copy_sink_to_sink_sync(struct _starpu_mp_node *src_node, struct _starpu_mp_node *dst_node, void *src, void *dst, size_t size);
 
-int _starpu_src_common_copy_sink_to_sink_sync(struct _starpu_mp_node *src_node,
-					 struct _starpu_mp_node *dst_node, void *src, void *dst, size_t size);
+int _starpu_src_common_copy_host_to_sink_async(struct _starpu_mp_node *mp_node, void *src, void *dst, size_t size, void *event);
 
-int _starpu_src_common_copy_host_to_sink_async(struct _starpu_mp_node *mp_node,
-					 void *src, void *dst, size_t size, void *event);
+int _starpu_src_common_copy_sink_to_host_async(struct _starpu_mp_node *mp_node, void *src, void *dst, size_t size, void *event);
 
-int _starpu_src_common_copy_sink_to_host_async(struct _starpu_mp_node *mp_node,
-					 void *src, void *dst, size_t size, void *event);
-
-int _starpu_src_common_copy_sink_to_sink_async(struct _starpu_mp_node *src_node,
-					 struct _starpu_mp_node *dst_node, void *src, void *dst, size_t size, void *event);
+int _starpu_src_common_copy_sink_to_sink_async(struct _starpu_mp_node *src_node, struct _starpu_mp_node *dst_node, void *src, void *dst, size_t size, void *event);
 
 int _starpu_src_common_locate_file(char *located_file_name, size_t len,
 				   const char *env_file_name, const char *env_mic_path,
 				   const char *config_file_name, const char *actual_file_name,
 				   const char **suffixes);
 
-void _starpu_src_common_worker(struct _starpu_worker_set * worker_set, 
-			       unsigned baseworkerid, 
-			       struct _starpu_mp_node * node_set);
+void _starpu_src_common_worker(struct _starpu_worker_set * worker_set, unsigned baseworkerid, struct _starpu_mp_node * node_set);
 
 #if defined(STARPU_USE_MPI_MASTER_SLAVE) && !defined(STARPU_MPI_MASTER_SLAVE_MULTIPLE_THREAD)
 void _starpu_src_common_init_switch_env(unsigned this);
-void _starpu_src_common_workers_set(struct _starpu_worker_set * worker_set,
-                 int ndevices,
-                 struct _starpu_mp_node ** mp_node);
+void _starpu_src_common_workers_set(struct _starpu_worker_set * worker_set, int ndevices, struct _starpu_mp_node ** mp_node);
 #endif
 
 #endif /* STARPU_USE_MP */
 
-
 #endif /* __SOURCE_COMMON_H__ */

+ 5 - 5
src/drivers/mpi/driver_mpi_common.c

@@ -1,7 +1,7 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  * Copyright (C) 2016,2017                                Inria
- * Copyright (C) 2017                                     CNRS
+ * Copyright (C) 2017, 2019                               CNRS
  * Copyright (C) 2017                                     Université de Bordeaux
  * Copyright (C) 2015                                     Mathieu Lirzin
  *
@@ -162,6 +162,7 @@ void _starpu_mpi_common_send(const struct _starpu_mp_node *node, void *msg, int
 {
         int res;
         int id_proc;
+
         MPI_Comm_rank(MPI_COMM_WORLD, &id_proc);
 
         //_STARPU_MSG("envoi %d B to %d\n", len, node->mp_connection.mpi_remote_nodeid);
@@ -202,13 +203,13 @@ void _starpu_mpi_common_mp_send(const struct _starpu_mp_node *node, void *msg, i
         _starpu_mpi_common_send(node, msg, len, NULL);
 }
 
-
 /* RECV to source node */
 void _starpu_mpi_common_recv(const struct _starpu_mp_node *node, void *msg, int len, void * event)
 {
         int res;
         int id_proc;
         MPI_Status s;
+
         MPI_Comm_rank(MPI_COMM_WORLD, &id_proc);
 
         //_STARPU_MSG("recv %d B from %d in %p\n", len, node->mp_connection.mpi_remote_nodeid, msg);
@@ -258,6 +259,7 @@ void _starpu_mpi_common_send_to_device(const struct _starpu_mp_node *node STARPU
 {
         int res;
         int id_proc;
+
         MPI_Comm_rank(MPI_COMM_WORLD, &id_proc);
 
         //_STARPU_MSG("S_to_D send %d bytes from %d from %p\n", len, dst_devid, msg);
@@ -299,6 +301,7 @@ void _starpu_mpi_common_recv_from_device(const struct _starpu_mp_node *node STAR
 {
         int res;
         int id_proc;
+
         MPI_Comm_rank(MPI_COMM_WORLD, &id_proc);
 
         //_STARPU_MSG("R_to_D nop recv %d bytes from %d\n", len, src_devid);
@@ -405,7 +408,6 @@ int _starpu_mpi_common_test_event(struct _starpu_async_channel * event)
         return !event->starpu_mp_common_finished_sender && !event->starpu_mp_common_finished_receiver;
 }
 
-
 /* - In device to device communications, the first ack received by host
  * is considered as the sender (but it cannot be, in fact, the sender)
  */
@@ -448,8 +450,6 @@ void _starpu_mpi_common_wait_event(struct _starpu_async_channel * event)
         }
 }
 
-
-
 void _starpu_mpi_common_barrier(void)
 {
         int ret = MPI_Barrier(MPI_COMM_WORLD);

+ 6 - 13
src/drivers/mpi/driver_mpi_source.c

@@ -1,7 +1,7 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  * Copyright (C) 2016,2017                                Inria
- * Copyright (C) 2017                                     CNRS
+ * Copyright (C) 2017, 2019                               CNRS
  * Copyright (C) 2017                                     Université de Bordeaux
  * Copyright (C) 2015                                     Mathieu Lirzin
  *
@@ -46,7 +46,6 @@ struct _starpu_mpi_ms_kernel
 	starpu_mpi_ms_kernel_t func[STARPU_MAXMPIDEVS];
 } *kernels;
 
-
 /* Array of structures containing all the informations useful to send
  * and receive informations with devices */
 struct _starpu_mp_node *_starpu_mpi_ms_nodes[STARPU_MAXMPIDEVS];
@@ -114,8 +113,8 @@ int _starpu_mpi_copy_mpi_to_ram_sync(void *src, unsigned src_node, void *dst, un
 int _starpu_mpi_copy_sink_to_sink_sync(void *src, unsigned src_node, void *dst, unsigned dst_node, size_t size)
 {
         return _starpu_src_common_copy_sink_to_sink_sync(_starpu_mpi_src_get_mp_node_from_memory_node(src_node),
-                        _starpu_mpi_src_get_mp_node_from_memory_node(dst_node),
-                        src, dst, size);
+							 _starpu_mpi_src_get_mp_node_from_memory_node(dst_node),
+							 src, dst, size);
 }
 
 int _starpu_mpi_copy_mpi_to_ram_async(void *src, unsigned src_node, void *dst, unsigned dst_node STARPU_ATTRIBUTE_UNUSED, size_t size, void * event)
@@ -133,11 +132,10 @@ int _starpu_mpi_copy_ram_to_mpi_async(void *src, unsigned src_node STARPU_ATTRIB
 int _starpu_mpi_copy_sink_to_sink_async(void *src, unsigned src_node, void *dst, unsigned dst_node, size_t size, void * event)
 {
         return _starpu_src_common_copy_sink_to_sink_async(_starpu_mpi_src_get_mp_node_from_memory_node(src_node),
-                        _starpu_mpi_src_get_mp_node_from_memory_node(dst_node),
-                        src, dst, size, event);
+							  _starpu_mpi_src_get_mp_node_from_memory_node(dst_node),
+							  src, dst, size, event);
 }
 
-
 int starpu_mpi_ms_register_kernel(starpu_mpi_ms_func_symbol_t *symbol, const char *func_name)
 {
         unsigned int func_name_size = (strlen(func_name) + 1) * sizeof(char);
@@ -186,7 +184,6 @@ int starpu_mpi_ms_register_kernel(starpu_mpi_ms_func_symbol_t *symbol, const cha
         return 0;
 }
 
-
 starpu_mpi_ms_kernel_t starpu_mpi_ms_get_kernel(starpu_mpi_ms_func_symbol_t symbol)
 {
         int workerid = starpu_worker_get_id();
@@ -290,7 +287,6 @@ unsigned _starpu_mpi_src_get_device_count()
         nb_mpi_devices = nb_mpi_devices - 1;
 
         return nb_mpi_devices;
-
 }
 
 void *_starpu_mpi_src_worker(void *arg)
@@ -308,7 +304,7 @@ void *_starpu_mpi_src_worker(void *arg)
 #endif
 
                 /* As all workers of a set share common data, we just use the first
-                 *       * one for intializing the following stuffs. */
+		 * one for intializing the following stuffs. */
                 struct _starpu_worker *baseworker = &worker_set->workers[0];
                 struct _starpu_machine_config *config = baseworker->config;
                 unsigned baseworkerid = baseworker - config->workers;
@@ -376,7 +372,6 @@ void *_starpu_mpi_src_worker(void *arg)
         }
 #endif
 
-
 #ifndef STARPU_MPI_MASTER_SLAVE_MULTIPLE_THREAD
         _starpu_src_common_workers_set(worker_set_mpi, nbsinknodes, _starpu_mpi_ms_nodes);
 #else
@@ -384,6 +379,4 @@ void *_starpu_mpi_src_worker(void *arg)
 #endif
 
         return NULL;
-
-
 }