Bläddra i källkod

add reduction support for MPI Master-Slave devices

Corentin Salingue 8 år sedan
förälder
incheckning
92a6491016

+ 43 - 16
src/datawizard/reduction.c

@@ -2,6 +2,7 @@
  *
  * Copyright (C) 2010-2014, 2016  Université de Bordeaux
  * Copyright (C) 2011, 2012, 2013, 2016  CNRS
+ * Copyright (C) 2017  Inria
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -79,6 +80,11 @@ void _starpu_redux_init_data_replicate(starpu_data_handle_t handle, struct _star
 			init_func = _starpu_mic_src_get_kernel_from_codelet(init_cl, 0);
 			break;
 #endif
+#ifdef STARPU_USE_MPI_MASTER_SLAVE
+		case STARPU_MPI_MS_WORKER:
+			init_func = _starpu_mpi_ms_src_get_kernel_from_codelet(init_cl, 0); 
+			break;
+#endif
 			/* TODO: SCC */
 		default:
 			STARPU_ABORT();
@@ -87,24 +93,45 @@ void _starpu_redux_init_data_replicate(starpu_data_handle_t handle, struct _star
 
 	STARPU_ASSERT(init_func);
 
-#ifdef STARPU_USE_MIC
-	if (starpu_worker_get_type(workerid) == STARPU_MIC_WORKER)
+	switch (starpu_worker_get_type(workerid))
 	{
-		struct _starpu_mp_node *node = _starpu_mic_src_get_actual_thread_mp_node();
-		int devid = _starpu_get_worker_struct(workerid)->devid;
-		void * arg;
-		int arg_size;
-		_starpu_src_common_execute_kernel(node,
-						 (void(*)(void))init_func, devid,
-						 STARPU_SEQ, 0, 0, &handle, 
-						 &(replicate->data_interface), 1,
-						 NULL, 0);
-		_starpu_src_common_wait_completed_execution(node,devid,&arg,&arg_size);
-	}
-	else
+#ifdef STARPU_USE_MIC
+		case STARPU_MIC_WORKER:
+		{
+			struct _starpu_mp_node *node = _starpu_mic_src_get_actual_thread_mp_node();
+			int devid = _starpu_get_worker_struct(workerid)->devid;
+			void * arg;
+			int arg_size;
+			_starpu_src_common_execute_kernel(node,
+					(void(*)(void))init_func, devid,
+					STARPU_SEQ, 0, 0, &handle, 
+					&(replicate->data_interface), 1,
+					NULL, 0, 1);
+			_starpu_src_common_wait_completed_execution(node,devid,&arg,&arg_size);
+			break;
+		}
 #endif
-	{
-		init_func(&replicate->data_interface, NULL);
+#ifdef STARPU_USE_MPI_MASTER_SLAVE
+		case STARPU_MPI_MS_WORKER:
+		{
+			struct _starpu_mp_node *node = _starpu_mpi_ms_src_get_actual_thread_mp_node();
+			int devid = _starpu_get_worker_struct(workerid)->devid;
+			void * arg;
+			int arg_size;
+
+			_starpu_src_common_execute_kernel(node,
+					(void(*)(void))init_func, devid,
+					STARPU_SEQ, 0, 0, &handle, 
+					&(replicate->data_interface), 1,
+					NULL, 0 , 1);
+
+			_starpu_src_common_wait_completed_execution(node,devid,&arg,&arg_size);
+			break;
+		}
+#endif
+		default:
+			init_func(&replicate->data_interface, NULL);
+			break;
 	}
 
 	replicate->initialized = 1;

+ 31 - 1
src/drivers/mp_common/mp_common.c

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2012, 2016  INRIA
+ * Copyright (C) 2012, 2016, 2017  INRIA
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -40,8 +40,12 @@ const char *_starpu_mp_common_command_to_string(const int command)
 			return "EXIT";
 		case STARPU_MP_COMMAND_EXECUTE:
 			return "EXECUTE";
+		case STARPU_MP_COMMAND_EXECUTE_DETACHED:
+			return "EXECUTE_DETACHED";
 		case STARPU_MP_COMMAND_ERROR_EXECUTE:
 			return "ERROR_EXECUTE";
+		case STARPU_MP_COMMAND_ERROR_EXECUTE_DETACHED:
+			return "ERROR_EXECUTE_DETACHED";
 		case STARPU_MP_COMMAND_LOOKUP:
 			return "LOOKUP";
 		case STARPU_MP_COMMAND_ANSWER_LOOKUP:
@@ -56,6 +60,7 @@ const char *_starpu_mp_common_command_to_string(const int command)
 			return "ERROR_ALLOCATE";
 		case STARPU_MP_COMMAND_FREE:
 			return "FREE";
+		/* Synchronous send */
 		case STARPU_MP_COMMAND_RECV_FROM_HOST:
 			return "RECV_FROM_HOST";
 		case STARPU_MP_COMMAND_SEND_TO_HOST:
@@ -64,6 +69,24 @@ const char *_starpu_mp_common_command_to_string(const int command)
 			return "RECV_FROM_SINK";
 		case STARPU_MP_COMMAND_SEND_TO_SINK:
 			return "SEND_TO_SINK";
+		/* Asynchronous send */
+		case STARPU_MP_COMMAND_RECV_FROM_HOST_ASYNC:
+			return "RECV_FROM_HOST_ASYNC";
+		case STARPU_MP_COMMAND_RECV_FROM_HOST_ASYNC_COMPLETED:
+			return "RECV_FROM_HOST_ASYNC_COMPLETED";
+		case STARPU_MP_COMMAND_SEND_TO_HOST_ASYNC:
+			return "SEND_TO_HOST_ASYNC";
+		case STARPU_MP_COMMAND_SEND_TO_HOST_ASYNC_COMPLETED:
+			return "SEND_TO_HOST_ASYNC_COMPLETED";
+		case STARPU_MP_COMMAND_RECV_FROM_SINK_ASYNC:
+			return "RECV_FROM_SINK_ASYNC";
+		case STARPU_MP_COMMAND_RECV_FROM_SINK_ASYNC_COMPLETED:
+			return "RECV_FROM_SINK_ASYNC_COMPLETED";
+		case STARPU_MP_COMMAND_SEND_TO_SINK_ASYNC:
+			return "SEND_TO_SINK_ASYNC";
+		case STARPU_MP_COMMAND_SEND_TO_SINK_ASYNC_COMPLETED:
+			return "SEND_TO_SINK_ASYNC_COMPLETED";
+
 		case STARPU_MP_COMMAND_TRANSFER_COMPLETE:
 			return "TRANSFER_COMPLETE";
 		case STARPU_MP_COMMAND_SINK_NBCORES:
@@ -74,6 +97,10 @@ const char *_starpu_mp_common_command_to_string(const int command)
 			return "EXECUTION_SUBMITTED";
 		case STARPU_MP_COMMAND_EXECUTION_COMPLETED:
 			return "EXECUTION_COMPLETED";
+		case STARPU_MP_COMMAND_EXECUTION_DETACHED_SUBMITTED:
+			return "EXECUTION_SUBMITTED_DETACHED";
+		case STARPU_MP_COMMAND_EXECUTION_DETACHED_COMPLETED:
+			return "EXECUTION_DETACHED_COMPLETED";
 		case STARPU_MP_COMMAND_PRE_EXECUTION:
 			return "PRE_EXECUTION";
 		case STARPU_MP_COMMAND_SYNC_WORKERS:
@@ -317,11 +344,13 @@ _starpu_mp_common_node_create(enum _starpu_mp_node_kind node_kind,
 		int i;
 		node->is_running = 1;
 		_STARPU_MALLOC(node->run_table, sizeof(struct mp_task *)*node->nb_cores);
+		_STARPU_MALLOC(node->run_table_detached, sizeof(struct mp_task *)*node->nb_cores);
 		_STARPU_MALLOC(node->sem_run_table, sizeof(sem_t)*node->nb_cores);
 
 		for(i=0; i<node->nb_cores; i++)
 		{
 			node->run_table[i] = NULL;
+			node->run_table_detached[i] = NULL;
 			sem_init(&node->sem_run_table[i],0,0);
 		}
 		mp_barrier_list_init(&node->barrier_list);
@@ -352,6 +381,7 @@ void _starpu_mp_common_node_destroy(struct _starpu_mp_node *node)
 		}
 
 		free(node->run_table);
+		free(node->run_table_detached);
 		free(node->sem_run_table);
 
 		STARPU_PTHREAD_MUTEX_DESTROY(&node->barrier_mutex);

+ 7 - 1
src/drivers/mp_common/mp_common.h

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2012, 2016  INRIA
+ * Copyright (C) 2012, 2016, 2017  INRIA
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -45,7 +45,9 @@ enum _starpu_mp_command
 {
 	STARPU_MP_COMMAND_EXIT,
 	STARPU_MP_COMMAND_EXECUTE,
+	STARPU_MP_COMMAND_EXECUTE_DETACHED,
 	STARPU_MP_COMMAND_ERROR_EXECUTE,
+	STARPU_MP_COMMAND_ERROR_EXECUTE_DETACHED,
 	STARPU_MP_COMMAND_LOOKUP,
 	STARPU_MP_COMMAND_ANSWER_LOOKUP,
 	STARPU_MP_COMMAND_ERROR_LOOKUP,
@@ -73,6 +75,8 @@ enum _starpu_mp_command
 	STARPU_MP_COMMAND_ANSWER_SINK_NBCORES,
 	STARPU_MP_COMMAND_EXECUTION_SUBMITTED,
 	STARPU_MP_COMMAND_EXECUTION_COMPLETED,
+	STARPU_MP_COMMAND_EXECUTION_DETACHED_SUBMITTED,
+	STARPU_MP_COMMAND_EXECUTION_DETACHED_COMPLETED,
 	STARPU_MP_COMMAND_PRE_EXECUTION,
 	STARPU_MP_COMMAND_SYNC_WORKERS,
 };
@@ -142,6 +146,7 @@ struct mp_task
 	enum starpu_codelet_type type;
 	int is_parallel_task;
 	int combined_workerid;
+	int detached;
  	struct mp_barrier* mp_barrier;
 };
 
@@ -229,6 +234,7 @@ struct _starpu_mp_node
 
         /*table where worker comme pick task*/
         struct mp_task ** run_table;
+        struct mp_task ** run_table_detached;
         sem_t * sem_run_table;
 
         /* Node general functions */

+ 34 - 10
src/drivers/mp_common/sink_common.c

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2012, 2016  INRIA
+ * Copyright (C) 2012, 2016, 2017  INRIA
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -370,6 +370,7 @@ void _starpu_sink_common_worker(void)
 				case STARPU_MP_COMMAND_EXIT:
 					exit_starpu = 1;
 					break;
+				case STARPU_MP_COMMAND_EXECUTE_DETACHED:
 				case STARPU_MP_COMMAND_EXECUTE:
 					node->execute(node, arg, arg_size);
 					break;
@@ -552,7 +553,10 @@ static void _starpu_sink_common_execution_completed_message(struct _starpu_mp_no
 {
 	/* Init message to tell the sink that the execution is completed */
 	struct mp_message * message = mp_message_new();
-	message->type = STARPU_MP_COMMAND_EXECUTION_COMPLETED;
+	if (task->detached)
+		message->type = STARPU_MP_COMMAND_EXECUTION_DETACHED_COMPLETED;
+	else
+		message->type = STARPU_MP_COMMAND_EXECUTION_COMPLETED;
 	_STARPU_MALLOC(message->buffer, sizeof(int));
 	*(int*) message->buffer = task->coreid;
 	message->size = sizeof(int);
@@ -590,10 +594,14 @@ static int _starpu_sink_common_get_current_rank(int workerid, struct _starpu_com
 
 /* Execute the task
  */
-static void _starpu_sink_common_execute_kernel(struct _starpu_mp_node *node, int coreid, struct _starpu_worker * worker)
+static void _starpu_sink_common_execute_kernel(struct _starpu_mp_node *node, int coreid, struct _starpu_worker * worker, int detached)
 {
 	struct _starpu_combined_worker * combined_worker = NULL;
-	struct mp_task* task = node->run_table[coreid];
+	struct mp_task* task;
+	if (detached)
+		task = node->run_table_detached[coreid];
+	else
+		task = node->run_table[coreid];
 
 
 	/* If it's a parallel task */
@@ -659,7 +667,10 @@ static void _starpu_sink_common_execute_kernel(struct _starpu_mp_node *node, int
 		}
 	}
 
-	node->run_table[coreid] = NULL;
+	if (detached)
+		node->run_table_detached[coreid] = NULL;
+	else
+		node->run_table[coreid] = NULL;
 
 	/* tell the sink that the execution is completed */
 	_starpu_sink_common_execution_completed_message(node,task);
@@ -698,8 +709,10 @@ void* _starpu_sink_thread(void * thread_arg)
 	{
 		/*Wait there is a task available */
 		sem_wait(&node->sem_run_table[coreid]);
-		if(node->run_table[coreid] != NULL)
-			_starpu_sink_common_execute_kernel(node,coreid,worker);
+		if (node->run_table_detached[coreid] != NULL)
+			_starpu_sink_common_execute_kernel(node, coreid, worker, 1);
+		if (node->run_table[coreid] != NULL)
+			_starpu_sink_common_execute_kernel(node, coreid, worker, 0);
 
 	}
 	starpu_pthread_exit(NULL);
@@ -710,8 +723,12 @@ void* _starpu_sink_thread(void * thread_arg)
 */
 static void _starpu_sink_common_execute_thread(struct _starpu_mp_node *node, struct mp_task *task)
 {
+	int detached = task->detached;
 	/* Add the task to the specific thread */
-	node->run_table[task->coreid] = task;
+	if (detached)
+		node->run_table_detached[task->coreid] = task;
+	else
+		node->run_table[task->coreid] = task;
 	/* Unlock the mutex to wake up the thread which will execute the task */
 	sem_post(&node->sem_run_table[task->coreid]);
 }
@@ -757,6 +774,9 @@ void _starpu_sink_common_execute(struct _starpu_mp_node *node,
 	task->nb_interfaces = *(unsigned *) arg_ptr;
 	arg_ptr += sizeof(task->nb_interfaces);
 
+	task->detached = *(int *) arg_ptr;
+	arg_ptr += sizeof(task->detached);
+
 	_STARPU_MALLOC(task->interfaces, task->nb_interfaces * sizeof(*task->interfaces));
 
 	/* The function needs an array pointing to each interface it needs
@@ -785,8 +805,12 @@ void _starpu_sink_common_execute(struct _starpu_mp_node *node,
 
 
 	//_STARPU_DEBUG("telling host that we have submitted the task %p.\n", task->kernel);
-	_starpu_mp_common_send_command(node, STARPU_MP_COMMAND_EXECUTION_SUBMITTED,
-			NULL, 0);
+	if (task->detached)
+		_starpu_mp_common_send_command(node, STARPU_MP_COMMAND_EXECUTION_DETACHED_SUBMITTED,
+				NULL, 0);
+	else
+		_starpu_mp_common_send_command(node, STARPU_MP_COMMAND_EXECUTION_SUBMITTED,
+				NULL, 0);
 
 	//_STARPU_DEBUG("executing the task %p\n", task->kernel);
 	_starpu_sink_common_execute_thread(node, task);

+ 22 - 8
src/drivers/mp_common/source_common.c

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2012, 2016  INRIA
+ * Copyright (C) 2012, 2016, 2017  INRIA
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -109,6 +109,7 @@ static int _starpu_src_common_process_completed_job(struct _starpu_mp_node *node
 	_starpu_set_local_worker_key(old_worker);
 
 	worker->current_task = NULL;
+
 	return 0;
 }
 
@@ -147,6 +148,9 @@ static int _starpu_src_common_handle_async(struct _starpu_mp_node *node,
                         worker_set = _starpu_get_worker_struct(starpu_worker_get_id())->set;
                         _starpu_src_common_process_completed_job(node, worker_set, arg, arg_size, stored);
                         break;
+                case STARPU_MP_COMMAND_EXECUTION_DETACHED_COMPLETED:
+			_STARPU_ERROR("Detached execution completed should not arrive here... \n"); 
+                        break;
                 case STARPU_MP_COMMAND_PRE_EXECUTION:
                         _starpu_src_common_pre_exec(node, arg,arg_size, stored);
                         break;
@@ -211,6 +215,7 @@ int _starpu_src_common_store_message(struct _starpu_mp_node *node,
 	switch(answer)
 	{
 		case STARPU_MP_COMMAND_EXECUTION_COMPLETED:
+		case STARPU_MP_COMMAND_EXECUTION_DETACHED_COMPLETED:
 		case STARPU_MP_COMMAND_PRE_EXECUTION:
 		{
 			message = mp_message_new();
@@ -284,7 +289,7 @@ static void _starpu_src_common_recv_async(struct _starpu_mp_node * node)
 	{
 		answer = _starpu_mp_common_recv_command (node, arg, arg_size);
 
-		if(answer == STARPU_MP_COMMAND_EXECUTION_COMPLETED)
+		if(answer == STARPU_MP_COMMAND_EXECUTION_DETACHED_COMPLETED)
 		{
 			int coreid;
 			STARPU_ASSERT(sizeof(coreid) == *arg_size);
@@ -393,14 +398,14 @@ int _starpu_src_common_execute_kernel(struct _starpu_mp_node *node,
 		starpu_data_handle_t *handles,
 		void **interfaces,
 		unsigned nb_interfaces,
-		void *cl_arg, size_t cl_arg_size)
+		void *cl_arg, size_t cl_arg_size, int detached)
 {
 	void *buffer, *arg =NULL;
 	uintptr_t buffer_ptr;
 	int buffer_size = 0, arg_size =0;
 	unsigned i;
 
-	buffer_size = sizeof(kernel) + sizeof(coreid) + sizeof(type)
+	buffer_size = sizeof(kernel) + sizeof(coreid) + sizeof(type) + sizeof(detached)
 		+ sizeof(nb_interfaces) + nb_interfaces * sizeof(union _starpu_interface) + sizeof(is_parallel_task);
 
 	/*if the task is parallel*/
@@ -444,6 +449,9 @@ int _starpu_src_common_execute_kernel(struct _starpu_mp_node *node,
 	*(unsigned *) buffer_ptr = nb_interfaces;
 	buffer_ptr += sizeof(nb_interfaces);
 
+	*(int *) buffer_ptr = detached;
+	buffer_ptr += sizeof(detached);
+
 	/* Message-passing execution is a particular case as the codelet is
 	 * executed on a sink with a different memory, whereas a codelet is
 	 * executed on the host part for the other accelerators.
@@ -466,17 +474,23 @@ int _starpu_src_common_execute_kernel(struct _starpu_mp_node *node,
 
         STARPU_PTHREAD_MUTEX_LOCK(&node->connection_mutex);
 
-	_starpu_mp_common_send_command(node, STARPU_MP_COMMAND_EXECUTE, buffer, buffer_size);
+	if (detached)
+		_starpu_mp_common_send_command(node, STARPU_MP_COMMAND_EXECUTE_DETACHED, buffer, buffer_size);
+	else
+		_starpu_mp_common_send_command(node, STARPU_MP_COMMAND_EXECUTE, buffer, buffer_size);
 
 	enum _starpu_mp_command answer = _starpu_src_common_wait_command_sync(node, &arg, &arg_size);
 
-        if (answer == STARPU_MP_COMMAND_ERROR_EXECUTE)
+        if (answer == STARPU_MP_COMMAND_ERROR_EXECUTE_DETACHED)
         {
                 STARPU_PTHREAD_MUTEX_UNLOCK(&node->connection_mutex);
                 return -EINVAL;
         }
 
-	STARPU_ASSERT(answer == STARPU_MP_COMMAND_EXECUTION_SUBMITTED);
+	if (detached)
+		STARPU_ASSERT(answer == STARPU_MP_COMMAND_EXECUTION_DETACHED_SUBMITTED);
+	else
+		STARPU_ASSERT(answer == STARPU_MP_COMMAND_EXECUTION_SUBMITTED);
 
         STARPU_PTHREAD_MUTEX_UNLOCK(&node->connection_mutex);
 
@@ -508,7 +522,7 @@ static int _starpu_src_common_execute(struct _starpu_job *j,
 			(j->task_size > 1),
 			j->combined_workerid, STARPU_TASK_GET_HANDLES(task),
 			_STARPU_TASK_GET_INTERFACES(task), STARPU_TASK_GET_NBUFFERS(task),
-			task->cl_arg, task->cl_arg_size);
+			task->cl_arg, task->cl_arg_size, 0);
 
 
 	return 0;

+ 2 - 2
src/drivers/mp_common/source_common.h

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2012, 2016  INRIA
+ * Copyright (C) 2012, 2016, 2017  INRIA
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -51,7 +51,7 @@ int _starpu_src_common_execute_kernel(const struct _starpu_mp_node *node,
 				      starpu_data_handle_t *handles,
 				      void **interfaces,
 				      unsigned nb_interfaces,
-				      void *cl_arg, size_t cl_arg_size);
+				      void *cl_arg, size_t cl_arg_size, int detached);
 
 
 int _starpu_src_common_copy_host_to_sink_sync(struct _starpu_mp_node *mp_node,

+ 45 - 1
src/drivers/mpi/driver_mpi_source.c

@@ -1,7 +1,7 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  * Copyright (C) 2015  Mathieu Lirzin <mthl@openmailbox.org>
- * Copyright (C) 2016  Inria
+ * Copyright (C) 2016, 2017  Inria
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -49,6 +49,17 @@ struct _starpu_mpi_ms_kernel
  * and receive informations with devices */
 struct _starpu_mp_node *mpi_ms_nodes[STARPU_MAXMPIDEVS];
 
+struct _starpu_mp_node *_starpu_mpi_ms_src_get_actual_thread_mp_node()
+{
+	struct _starpu_worker *actual_worker = _starpu_get_local_worker_key();
+	STARPU_ASSERT(actual_worker);
+
+	int devid = actual_worker->devid;
+	STARPU_ASSERT(devid >= 0 && devid < STARPU_MAXMPIDEVS);
+
+	return mpi_ms_nodes[devid];
+}
+
 void _starpu_mpi_source_init(struct _starpu_mp_node *node)
 {
         _starpu_mpi_common_mp_initialize_src_sink(node);
@@ -198,6 +209,39 @@ starpu_mpi_ms_kernel_t _starpu_mpi_ms_src_get_kernel(starpu_mpi_ms_func_symbol_t
         return kernel->func[devid];
 }
 
+starpu_mpi_ms_kernel_t _starpu_mpi_ms_src_get_kernel_from_codelet(struct starpu_codelet *cl, unsigned nimpl)
+{
+	starpu_mpi_ms_kernel_t kernel = NULL;
+
+	starpu_mpi_ms_func_t func = _starpu_task_get_mpi_ms_nth_implementation(cl, nimpl);
+	if (func)
+	{
+		/* We execute the function contained in the codelet, it must return a
+		 * pointer to the function to execute on the device, either specified
+		 * directly by the user or by a call to starpu_mic_get_func().
+		 */
+		kernel = func();
+	}
+	else
+	{
+		/* If user dont define any starpu_mic_fun_t in cl->mic_func we try to use
+		 * cpu_func_name.
+		 */
+		const char *func_name = _starpu_task_get_cpu_name_nth_implementation(cl, nimpl);
+		if (func_name)
+		{
+			starpu_mpi_ms_func_symbol_t symbol;
+
+			_starpu_mpi_ms_src_register_kernel(&symbol, func_name);
+
+			kernel = _starpu_mpi_ms_src_get_kernel(symbol);
+		}
+	}
+	STARPU_ASSERT_MSG(kernel, "when STARPU_MPI_MS is defined in 'where', mpi_ms_funcs or cpu_funcs_name has to be defined and the function be non-static");
+
+	return kernel;
+}
+
 void(* _starpu_mpi_ms_src_get_kernel_from_job(const struct _starpu_mp_node *node STARPU_ATTRIBUTE_UNUSED, struct _starpu_job *j))(void)
 {
         starpu_mpi_ms_kernel_t kernel = NULL;

+ 4 - 1
src/drivers/mpi/driver_mpi_source.h

@@ -1,7 +1,7 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  * Copyright (C) 2015  Mathieu Lirzin <mthl@openmailbox.org>
- * Copyright (C) 2016  Inria
+ * Copyright (C) 2016, 2017  Inria
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -27,6 +27,7 @@
  * and receive informations with devices */
 extern struct _starpu_mp_node *mpi_ms_nodes[STARPU_MAXMPIDEVS];
 struct _starpu_mp_node *_starpu_mpi_src_get_mp_node_from_memory_node(int memory_node);
+struct _starpu_mp_node *_starpu_mpi_ms_src_get_actual_thread_mp_node();
 
 unsigned _starpu_mpi_src_get_device_count();
 void *_starpu_mpi_src_worker(void *arg);
@@ -45,6 +46,8 @@ int _starpu_mpi_copy_mpi_to_ram_async(void *src, unsigned src_node, void *dst, u
 int _starpu_mpi_copy_ram_to_mpi_async(void *src, unsigned src_node STARPU_ATTRIBUTE_UNUSED, void *dst, unsigned dst_node, size_t size, void * event);
 int _starpu_mpi_copy_sink_to_sink_async(void *src, unsigned src_node, void *dst, unsigned dst_node, size_t size, void * event);
 
+
+starpu_mpi_ms_kernel_t _starpu_mpi_ms_src_get_kernel_from_codelet(struct starpu_codelet *cl, unsigned nimpl);
 void(* _starpu_mpi_ms_src_get_kernel_from_job(const struct _starpu_mp_node *node STARPU_ATTRIBUTE_UNUSED, struct _starpu_job *j))(void);
 
 #endif /* STARPU_USE_MPI_MASTER_SLAVE */

+ 2 - 7
tests/datawizard/increment_init.c

@@ -2,6 +2,7 @@
  *
  * Copyright (C) 2010, 2012-2016  Université de Bordeaux
  * Copyright (C) 2010, 2011, 2012, 2014  CNRS
+ * Copyright (C) 2017  Inria
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -142,14 +143,8 @@ int main(int argc, char **argv)
 {
 	unsigned *pvar = NULL;
 	int ret;
-	struct starpu_conf conf;
 	
-	starpu_conf_init(&conf);
-
-	/* MPI Master Slave does not support Redux */
-	conf.nmpi_ms = 0;
-
-	ret = starpu_init(&conf);
+	ret = starpu_init(NULL);
 	if (ret == -ENODEV) return STARPU_TEST_SKIPPED;
 	STARPU_CHECK_RETURN_VALUE(ret, "starpu_init");
 

+ 2 - 7
tests/datawizard/increment_redux.c

@@ -2,6 +2,7 @@
  *
  * Copyright (C) 2010, 2012-2016  Université de Bordeaux
  * Copyright (C) 2010, 2011, 2012, 2013  CNRS
+ * Copyright (C) 2017  Inria
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -215,18 +216,12 @@ static struct starpu_codelet increment_cl =
 int main(int argc, char **argv)
 {
 	int ret;
-	struct starpu_conf conf;
-
-	starpu_conf_init(&conf);
-
-	/* MPI Master Slave doesn't support redux */
-	conf.nmpi_ms = 0;
 
 	/* Not supported yet */
 	if (starpu_get_env_number_default("STARPU_GLOBAL_ARBITER", 0) > 0)
 		return STARPU_TEST_SKIPPED;
 
-	ret = starpu_initialize(&conf, &argc, &argv);
+	ret = starpu_initialize(NULL, &argc, &argv);
 	if (ret == -ENODEV) return STARPU_TEST_SKIPPED;
 	STARPU_CHECK_RETURN_VALUE(ret, "starpu_init");
 

+ 2 - 7
tests/datawizard/increment_redux_lazy.c

@@ -2,6 +2,7 @@
  *
  * Copyright (C) 2010, 2012-2016  Université de Bordeaux
  * Copyright (C) 2010, 2011, 2012, 2013  CNRS
+ * Copyright (C) 2017  Inria
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -197,18 +198,12 @@ int main(int argc, char **argv)
 {
 	int ret;
 	unsigned *var;
-	struct starpu_conf conf;
-
-        starpu_conf_init(&conf);
-
-	/* MPI Master Slave does not support Redux */
-	conf.nmpi_ms = 0;
 
 	/* Not supported yet */
 	if (starpu_get_env_number_default("STARPU_GLOBAL_ARBITER", 0) > 0)
 		return STARPU_TEST_SKIPPED;
 
-	ret = starpu_initialize(&conf, &argc, &argv);
+	ret = starpu_initialize(NULL, &argc, &argv);
 	if (ret == -ENODEV) return STARPU_TEST_SKIPPED;
 	STARPU_CHECK_RETURN_VALUE(ret, "starpu_init");
 

+ 2 - 7
tests/datawizard/increment_redux_v2.c

@@ -1,6 +1,7 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  * Copyright (C) 2011-2016  Université de Bordeaux
+ * Copyright (C) 2017  Inria
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -231,18 +232,12 @@ struct starpu_codelet increment_cl_redux =
 int main(int argc, char **argv)
 {
 	int ret;
-	struct starpu_conf conf;
-
-	starpu_conf_init(&conf);
-
-	/* MPI Master-Slave doesn't support Redux */
-	conf.nmpi_ms = 0;
 
 	/* Not supported yet */
 	if (starpu_get_env_number_default("STARPU_GLOBAL_ARBITER", 0) > 0)
 		return STARPU_TEST_SKIPPED;
 
-	ret = starpu_initialize(&conf, &argc, &argv);
+	ret = starpu_initialize(NULL, &argc, &argv);
 	if (ret == -ENODEV) return STARPU_TEST_SKIPPED;
 	STARPU_CHECK_RETURN_VALUE(ret, "starpu_init");
 

+ 1 - 1
tests/datawizard/interfaces/multiformat/advanced/multiformat_data_release.c

@@ -2,6 +2,7 @@
  *
  * Copyright (C) 2011-2012  INRIA
  * Copyright (C) 2011, 2012, 2013  CNRS
+ * Copyright (C) 2017  Inria
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -151,7 +152,6 @@ main(int argc, char **argv)
 	conf.ncuda = 1;
 	conf.nopencl = 1;
 	conf.nmic = 1;
-	conf.nmpi_ms = 0;
 	memset(&global_stats, 0, sizeof(global_stats));
 	ret = starpu_initialize(&conf, &argc, &argv);
 	if (ret == -ENODEV || starpu_cpu_worker_get_count() == 0) return STARPU_TEST_SKIPPED;