Pārlūkot izejas kodu

mic: mode forkjoin spmd (beginning)

Thibaud Lambert 12 gadi atpakaļ
vecāks
revīzija
89de5a4afa

+ 2 - 0
src/core/workers.c

@@ -1421,6 +1421,8 @@ struct _starpu_sched_ctx *_starpu_get_sched_ctx_struct(unsigned id)
 struct _starpu_combined_worker *_starpu_get_combined_worker_struct(unsigned id)
 {
 	unsigned basic_worker_count = starpu_worker_get_count();
+	
+	_STARPU_DEBUG("basic_worker_count:%d\n",basic_worker_count);
 
 	STARPU_ASSERT(id >= basic_worker_count);
 	return &config.combined_workers[id - basic_worker_count];

+ 5 - 4
src/datawizard/reduction.c

@@ -92,10 +92,11 @@ void _starpu_redux_init_data_replicate(starpu_data_handle_t handle, struct _star
 		int arg_size = 0;
 
 		// XXX: give the correct coreid.
-		_starpu_src_common_execute_kernel(node,
-						  (void(*)(void))init_func, 0,
-						  &handle, &(replicate->data_interface), 1,
-						  NULL, 0);
+	       _starpu_src_common_execute_kernel(node,
+						 (void(*)(void))init_func, 0,
+						 STARPU_SEQ, 0, 0, &handle, 
+						 &(replicate->data_interface), 1,
+						 NULL, 0);
 		answer = _starpu_mp_common_recv_command (node, &arg, &arg_size);
 		STARPU_ASSERT (answer == STARPU_EXECUTION_COMPLETED);
 	}

+ 10 - 12
src/drivers/mic/driver_mic_sink.c

@@ -45,6 +45,10 @@ void _starpu_mic_sink_init(struct _starpu_mp_node *node)
 	_starpu_mic_common_accept(&node->host_sink_dt_connection.mic_endpoint,
 									 STARPU_MIC_SOURCE_DT_PORT_NUMBER);
 
+	node->nb_cores = COISysGetCoreCount();
+
+	_starpu_sink_common_init(node);
+
 	//node->sink_sink_dt_connections = malloc(node->nb_mp_sinks * sizeof(union _starpu_mp_connection));
 
 	//for (i = 0; i < (unsigned int)node->devid; ++i)
@@ -141,17 +145,11 @@ void _starpu_mic_sink_free(const struct _starpu_mp_node *mp_node STARPU_ATTRIBUT
 
 /* bind the thread to a core
  */
-void _starpu_mic_sink_bind_thread(const struct _starpu_mp_node *mp_node STARPU_ATTRIBUTE_UNUSED, cpu_set_t * cpuset, int coreid, pthread_t *thread)
+void _starpu_mic_sink_bind_thread(const struct _starpu_mp_node *mp_node STARPU_ATTRIBUTE_UNUSED, cpu_set_t * cpuset, int coreid)
 {
-  int j, ret;
-  //init the set
-  CPU_ZERO(cpuset);
-
-  //adding the core to the set
-  for(j=0;j<HYPER_THREAD_NUMBER;j++)
-    CPU_SET(j+coreid*HYPER_THREAD_NUMBER,cpuset);
-  
-  //affect the thread to the core
-  ret = pthread_setaffinity_np(*thread, sizeof(cpu_set_t), cpuset);
-  STARPU_ASSERT(ret == 0);
+	int j, ret;
+
+	//adding the core to the set
+	for(j=0;j<HYPER_THREAD_NUMBER;j++)
+		CPU_SET(j+coreid*HYPER_THREAD_NUMBER,cpuset);
 }

+ 1 - 1
src/drivers/mic/driver_mic_sink.h

@@ -41,7 +41,7 @@ unsigned int _starpu_mic_sink_get_nb_core(void);
 
 void _starpu_mic_sink_allocate(const struct _starpu_mp_node *mp_node, void *arg, int arg_size);
 void _starpu_mic_sink_free(const struct _starpu_mp_node *mp_node STARPU_ATTRIBUTE_UNUSED, void *arg, int arg_size);
-void _starpu_mic_sink_bind_thread(const struct _starpu_mp_node *mp_node STARPU_ATTRIBUTE_UNUSED, cpu_set_t * cpuset, int coreid, pthread_t *thread);
+void _starpu_mic_sink_bind_thread(const struct _starpu_mp_node *mp_node STARPU_ATTRIBUTE_UNUSED, cpu_set_t * cpuset, int coreid);
 
 #endif /* STARPU_USE_MIC */
 

+ 17 - 10
src/drivers/mp_common/mp_common.h

@@ -35,7 +35,7 @@
 #define STARPU_MP_SRC_NODE 0
 #define STARPU_MP_SINK_NODE(a) ((a) + 1)
 
-#define STARPU_MP_COMMON_REPORT_ERROR(node, status) \
+#define STARPU_MP_COMMON_REPORT_ERROR(node, status)			\
 	(node)->report_error(__starpu_func__, __FILE__, __LINE__, (status))
 
 
@@ -103,6 +103,9 @@ struct _starpu_mp_node
 {
 	enum _starpu_mp_node_kind kind;
 
+	/*the number of core*/
+	int nb_cores;
+
 	/* Buffer used for scif data transfers, allocated
 	 * during node initialization.
 	 * Size : BUFFER_SIZE */
@@ -117,11 +120,8 @@ struct _starpu_mp_node
 	 * This is the devid both for the sink and the host. */
 	int devid;
 
-        /*dead queue*/
-        struct task_fifo dead_queue;
-
 	/* Only MIC use this for now !!
-	*  Is the number ok MIC on the system. */
+	 *  Is the number ok MIC on the system. */
 	unsigned int nb_mp_sinks;
 
 	/* Connection used for command passing between the host thread and the
@@ -142,13 +142,20 @@ struct _starpu_mp_node
 	 *  - sink_sink_dt_connections[j] is not initialized for the sink number j. */
 	union _starpu_mp_connection *sink_sink_dt_connections;
 
+        /*dead queue where the finished kernel are added */
+        struct mp_task_fifo dead_queue;
+
+	/**/
+	struct mp_task ** run_table;
+	pthread_mutex_t * mutex_table;
+
 	/* Node general functions */
 	void (*init)(struct _starpu_mp_node *node);
 	void (*deinit)(struct _starpu_mp_node *node);
 	void (*report_error)(const char *, const char *, const int, const int);
 
-  /* Message passing */
-  int (*mp_recv_is_ready)(const struct _starpu_mp_node *);
+	/* Message passing */
+	int (*mp_recv_is_ready)(const struct _starpu_mp_node *);
 	void (*mp_send)(const struct _starpu_mp_node *, void *, int);
 	void (*mp_recv)(const struct _starpu_mp_node *, void *, int);
 
@@ -158,8 +165,8 @@ struct _starpu_mp_node
 	void (*dt_send_to_device)(const struct _starpu_mp_node *, int, void *, int);
 	void (*dt_recv_from_device)(const struct _starpu_mp_node *, int, void *, int);
 
-  void (*(*get_kernel_from_job)(const struct _starpu_mp_node *,struct _starpu_job *))(void);
-  void (*bind_thread)(const struct _starpu_mp_node *, cpu_set_t *,int, pthread_t *);
+	void (*(*get_kernel_from_job)(const struct _starpu_mp_node *,struct _starpu_job *))(void);
+	void (*bind_thread)(const struct _starpu_mp_node *, cpu_set_t *,int);
 	void (*execute)(const struct _starpu_mp_node *, void *, int);
 	void (*nbcores)(const struct _starpu_mp_node *);
 	void (*allocate)(const struct _starpu_mp_node *, void *, int);
@@ -177,7 +184,7 @@ void _starpu_mp_common_send_command(const struct _starpu_mp_node *node,
 				    void *arg, int arg_size);
 
 enum _starpu_mp_command _starpu_mp_common_recv_command(const struct _starpu_mp_node *node,
-						    void **arg, int *arg_size);
+						       void **arg, int *arg_size);
 
 
 #endif /* STARPU_USE_MP */

+ 119 - 28
src/drivers/mp_common/sink_common.c

@@ -31,6 +31,14 @@
 
 #include "task_fifo.h"
 
+struct arg_sink_thread
+{
+	struct mp_task ** task;
+	pthread_mutex_t * mutex;
+	struct _starpu_mp_node *node;
+	int coreid;
+};
+
 /* Return the sink kind of the running process, based on the value of the
  * STARPU_SINK environment variable.
  * If there is no valid value retrieved, return STARPU_INVALID_KIND
@@ -56,16 +64,8 @@ void
 _starpu_sink_nbcores (const struct _starpu_mp_node *node)
 {
 	// Process packet received from `_starpu_src_common_sink_cores'.
-	int nbcores = 1;
-
-#ifdef STARPU_USE_MIC
-	// XXX I currently only support MIC for now.
-	if (STARPU_MIC_SINK == _starpu_sink_common_get_kind ())
-		nbcores = COISysGetCoreCount();
-#endif
-
-	_starpu_mp_common_send_command (node, STARPU_ANSWER_SINK_NBCORES,
-					&nbcores, sizeof (int));
+     	_starpu_mp_common_send_command (node, STARPU_ANSWER_SINK_NBCORES,
+					&node->nb_cores, sizeof (int));
 }
 
 
@@ -228,7 +228,7 @@ void _starpu_sink_common_worker(void)
 
 		if(!task_fifo_is_empty(&(node->dead_queue)))
 		{
-			struct task * task = node->dead_queue.first;
+			struct mp_task * task = node->dead_queue.first;
 			//_STARPU_DEBUG("telling host that we have finished the task %p sur %d.\n", task->kernel, task->coreid);
 			_starpu_mp_common_send_command(task->node, STARPU_EXECUTION_COMPLETED,
 						       &(task->coreid), sizeof(task->coreid));
@@ -247,28 +247,96 @@ void _starpu_sink_common_worker(void)
 
 static void* _starpu_sink_thread(void * thread_arg)
 {
-	struct task *arg = (struct task *)thread_arg;
-  
-	//execute the task
-	arg->kernel(arg->interfaces,arg->cl_arg);
 
-	//append the finished task to the dead queue
-	task_fifo_append(&(arg->node->dead_queue),arg);
+	struct mp_task **task = ((struct arg_sink_thread *)thread_arg)->task;
+	pthread_mutex_t * mutex = ((struct arg_sink_thread *)thread_arg)->mutex;
+	struct _starpu_mp_node *node = ((struct arg_sink_thread *)thread_arg)->node;
+	int coreid =((struct arg_sink_thread *)thread_arg)->coreid;
+	int i;
+	cpu_set_t base_cpu_set, para_cpu_set;
+	pthread_t thread = pthread_self();
+
+	//init the set
+	CPU_ZERO(&base_cpu_set);
+	node->bind_thread(node, &base_cpu_set, coreid);
+	free(thread_arg);
+	while(1)
+	{
+		pthread_mutex_lock(mutex);
+		if((*task)->type == STARPU_FORKJOIN && (*task)->is_parallel_task)
+		{
+			//init the set
+			CPU_ZERO(&para_cpu_set);
+
+			for(i=0; i<(*task)->combined_worker_size; i++)
+			{
+				node->bind_thread(node, &para_cpu_set, (*task)->combined_worker[i]);
+			}
+			pthread_setaffinity_np(thread,sizeof(cpu_set_t),&para_cpu_set);
+		}
+
+		//execute the task
+		(*task)->kernel((*task)->interfaces,(*task)->cl_arg);
+		
+		if((*task)->type == STARPU_FORKJOIN && (*task)->is_parallel_task)
+		{
+			pthread_setaffinity_np(thread,sizeof(cpu_set_t),&base_cpu_set);
+		}
+
+		//append the finished task to the dead queue
+		task_fifo_append(&((*task)->node->dead_queue),(*task));	
+		
+	}
 	pthread_exit(NULL);
 }
 
-static void _starpu_sink_execute_thread(struct task *arg)
+
+void _starpu_sink_common_init(struct _starpu_mp_node *node)
 {
 	pthread_t thread;
 	cpu_set_t cpuset;
-	int ret;
-  
-	//create the tread
-	ret = pthread_create(&thread, NULL, _starpu_sink_thread, arg);
-	STARPU_ASSERT(ret == 0);
-  
-	//bind the thread on the core coreid
-	arg->node->bind_thread(arg->node, &cpuset, arg->coreid, &thread);
+	pthread_attr_t attr;
+	int i, ret;
+	struct arg_sink_thread * arg;
+
+	node->run_table = malloc(sizeof(struct mp_task *)*node->nb_cores);
+	node->mutex_table = malloc(sizeof(pthread_mutex_t)*node->nb_cores);
+
+	/*for each core init the mutex, the task pointer and launch the thread */
+	for(i=1; i<node->nb_cores; i++)
+	{
+		node->run_table[i] = NULL;
+		pthread_mutex_init(&node->mutex_table[i], NULL);
+		pthread_mutex_lock(&node->mutex_table[i]);
+
+		//init the set
+		CPU_ZERO(&cpuset);
+
+		/*prepare the cpuset*/
+		node->bind_thread(node, &cpuset, i);
+		ret = pthread_attr_init(&attr);
+		STARPU_ASSERT(ret == 0);
+		ret = pthread_attr_setaffinity_np(&attr, sizeof(cpu_set_t), &cpuset);
+		STARPU_ASSERT(ret == 0);
+
+		/*prepare the argument for the thread*/
+		arg= malloc(sizeof(struct arg_sink_thread));
+		arg->task = &node->run_table[i];
+		arg->mutex = &node->mutex_table[i];
+		arg->coreid = i;
+		arg->node = node;
+		
+		ret = pthread_create(&thread, &attr, _starpu_sink_thread, arg);
+		STARPU_ASSERT(ret == 0);
+	}
+}
+
+static void _starpu_sink_common_execute_thread(struct _starpu_mp_node *node, struct mp_task *task)
+{
+	//add the task to the spesific thread
+	node->run_table[task->coreid] = task;
+	//unlock the mutex
+	pthread_mutex_unlock(&node->mutex_table[task->coreid]);
 }
 
 
@@ -286,13 +354,36 @@ void _starpu_sink_common_execute(const struct _starpu_mp_node *node,
 	unsigned nb_interfaces;
 
 	void *arg_ptr = arg;
-	struct task *thread_arg = malloc(sizeof(struct task));
+	struct mp_task *thread_arg = malloc(sizeof(struct mp_task));
 	
 	thread_arg->node = node;
 
 	thread_arg->kernel = *(void(**)(void **, void *)) arg_ptr;
 	arg_ptr += sizeof(thread_arg->kernel);
 
+	thread_arg->type = *(enum starpu_codelet_type *) arg_ptr;
+	arg_ptr += sizeof(thread_arg->type);
+
+	thread_arg->is_parallel_task = *(int *) arg_ptr;
+	arg_ptr += sizeof(thread_arg->is_parallel_task);
+	
+
+	//_STARPU_DEBUG("type:%d\n",thread_arg->type);
+
+	if(thread_arg->type == STARPU_FORKJOIN && thread_arg->is_parallel_task)
+	{
+		thread_arg->combined_worker_size = *(int *) arg_ptr;
+		arg_ptr += sizeof(thread_arg->combined_worker_size);
+	
+		for (id = 0; id < thread_arg->combined_worker_size; id++)
+		{
+			
+			thread_arg->combined_worker[id] = *(int*) arg_ptr;
+			arg_ptr += sizeof(thread_arg->combined_worker[id]);
+		}
+		
+	}
+
 	thread_arg->coreid = *(unsigned *) arg_ptr;
 	arg_ptr += sizeof(thread_arg->coreid);
 
@@ -320,5 +411,5 @@ void _starpu_sink_common_execute(const struct _starpu_mp_node *node,
 				       NULL, 0);
 
 	//_STARPU_DEBUG("executing the task %p\n", thread_arg->kernel);
-	_starpu_sink_execute_thread(thread_arg);	
+	_starpu_sink_common_execute_thread(node, thread_arg);	
 }

+ 1 - 0
src/drivers/mp_common/sink_common.h

@@ -40,6 +40,7 @@ void _starpu_sink_nbcores (const struct _starpu_mp_node *node);
 void _starpu_sink_common_allocate(const struct _starpu_mp_node *mp_node, void *arg, int arg_size);
 void _starpu_sink_common_free(const struct _starpu_mp_node *mp_node STARPU_ATTRIBUTE_UNUSED, void *arg, int arg_size);
 
+void _starpu_sink_common_init(struct _starpu_mp_node *node);
 
 #endif /* STARPU_USE_MP */
 

+ 93 - 59
src/drivers/mp_common/source_common.c

@@ -183,26 +183,47 @@ int _starpu_src_common_lookup(struct _starpu_mp_node *node,
  * [Function pointer on sink, number of interfaces, interfaces
  * (union _starpu_interface), cl_arg]
  */
+/* Launch the execution of the function KERNEL points to on the sink linked
+ * to NODE. Returns 0 in case of success, -EINVAL if kernel is an invalid
+ * pointer.
+ * Data interfaces in task are send to the sink.
+ */
 int _starpu_src_common_execute_kernel(const struct _starpu_mp_node *node,
 				      void (*kernel)(void), unsigned coreid,
+				      enum starpu_codelet_type type,
+				      int is_parallel_task, int cb_workerid,
 				      starpu_data_handle_t *handles,
 				      void **interfaces,
 				      unsigned nb_interfaces,
 				      void *cl_arg, size_t cl_arg_size)
 {
-	unsigned id;
-	void *buffer, *buffer_ptr, *arg = NULL;
-	int buffer_size = 0, arg_size = 0;
+
+	void *buffer, *buffer_ptr, *arg =NULL;
+	int i, buffer_size = 0, cb_worker_size = 0, arg_size =0;
+	struct _starpu_combined_worker * cb_worker;
+	unsigned devid;
+
+	buffer_size = sizeof(kernel) + sizeof(coreid) + sizeof(type)
+		+ sizeof(nb_interfaces) + nb_interfaces * sizeof(union _starpu_interface) + sizeof(is_parallel_task);
+
+	/*if the task is paralle*/
+	if(type == STARPU_FORKJOIN && is_parallel_task)
+	{
+		_STARPU_DEBUG("\n Parallele\n");
+		_STARPU_DEBUG("type:%d\n",type);
+		_STARPU_DEBUG("cb_workerid:%d\n",cb_workerid);
+		cb_worker = _starpu_get_combined_worker_struct(cb_workerid);
+		cb_worker_size = cb_worker->worker_size;
+		buffer_size = sizeof(cb_worker_size) + cb_worker_size * sizeof(devid);
+	}
 
 	/* If the user didn't give any cl_arg, there is no need to send it */
-	buffer_size =
-		sizeof(kernel) + sizeof(coreid) + sizeof(nb_interfaces) +
-		nb_interfaces * sizeof(union _starpu_interface);
 	if (cl_arg)
 	{
 		STARPU_ASSERT(cl_arg_size);
 		buffer_size += cl_arg_size;
 	}
+	
 
 	/* We give to send_command a buffer we just allocated, which contains
 	 * a pointer to the function (sink-side), core on which execute this
@@ -213,6 +234,26 @@ int _starpu_src_common_execute_kernel(const struct _starpu_mp_node *node,
 	*(void(**)(void)) buffer = kernel;
 	buffer_ptr += sizeof(kernel);
 
+	*(enum starpu_codelet_type *) buffer_ptr = type;
+	buffer_ptr += sizeof(type);
+
+	*(int *) buffer_ptr = is_parallel_task;
+	buffer_ptr += sizeof(is_parallel_task);
+
+	if(type == STARPU_FORKJOIN && is_parallel_task)
+	{
+
+		*(int *) buffer_ptr = cb_worker_size;
+		buffer_ptr += sizeof(cb_worker_size);
+
+		for (i = 0; i < cb_worker_size; i++)
+		{
+			int devid = _starpu_get_worker_struct(cb_worker->combined_workerid[i])->devid;
+			*(int *) buffer_ptr = devid;
+			buffer_ptr += sizeof(devid);
+		}
+	}
+		
 	*(unsigned *) buffer_ptr = coreid;
 	buffer_ptr += sizeof(coreid);
 
@@ -223,10 +264,10 @@ int _starpu_src_common_execute_kernel(const struct _starpu_mp_node *node,
 	 * executed on a sink with a different memory, whereas a codelet is
 	 * executed on the host part for the other accelerators.
 	 * Thus we need to send a copy of each interface on the MP device */
-	for (id = 0; id < nb_interfaces; id++)
+	for (i = 0; i < nb_interfaces; i++)
 	{
-		starpu_data_handle_t handle = handles[id];
-		memcpy (buffer_ptr, interfaces[id],
+		starpu_data_handle_t handle = handles[i];
+		memcpy (buffer_ptr, interfaces[i],
 			handle->ops->interface_size);
 		/* The sink side has no mean to get the type of each
 		 * interface, we use a union to make it generic and permit the
@@ -248,23 +289,52 @@ int _starpu_src_common_execute_kernel(const struct _starpu_mp_node *node,
 	free(buffer);
 
 	return 0;
-
 }
 
-/* Launch the execution of the function KERNEL points to on the sink linked
- * to NODE. Returns 0 in case of success, -EINVAL if kernel is an invalid
- * pointer.
- * Data interfaces in task are send to the sink.
- */
-int _starpu_src_common_execute_kernel_from_task(const struct _starpu_mp_node *node,
-						void (*kernel)(void), unsigned coreid,
-						struct starpu_task *task)
+static int _starpu_src_common_execute(struct _starpu_job *j, 
+				      struct _starpu_worker *worker, 
+				      struct _starpu_mp_node * node)
 {
-	return _starpu_src_common_execute_kernel(node, kernel, coreid,
-						 task->handles, task->interfaces, task->cl->nbuffers,
-						 task->cl_arg, task->cl_arg_size);
+        int ret;
+	uint32_t mask = 0;
+
+	STARPU_ASSERT(j);
+	struct starpu_task *task = j->task;
+
+	int profiling = starpu_profiling_status_get();
+
+	STARPU_ASSERT(task);
+	
+	ret = _starpu_fetch_task_input(j, mask);
+	if (ret != 0)
+	{
+		/* there was not enough memory, so the input of
+		 * the codelet cannot be fetched ... put the
+		 * codelet back, and try it later */
+		return -EAGAIN;
+	}
+
+	void (*kernel)(void)  = node->get_kernel_from_job(node,j);
+
+
+	_starpu_driver_start_job(worker, j, &j->cl_start, 0, profiling);
+
+	_STARPU_DEBUG("j->task_size:%d\n",j->task_size);	
+	_STARPU_DEBUG("j->cb_workerid:%d\n",j->combined_workerid);	
+
+	_STARPU_DEBUG("cb_worker_count:%d\n",starpu_combined_worker_get_count());
+
+
+	_starpu_src_common_execute_kernel(node, kernel, worker->devid, task->cl->type,
+					  (j->task_size > 1),
+					  j->combined_workerid, task->handles,
+					  task->interfaces, task->cl->nbuffers,
+					  task->cl_arg, task->cl_arg_size);
+
+	return 0;
 }
 
+
 /* Send a request to the sink linked to the MP_NODE to allocate SIZE bytes on
  * the sink.
  * In case of success, it returns 0 and *ADDR contains the address of the
@@ -470,42 +540,6 @@ int _starpu_src_common_locate_file(char *located_file_name,
 	return 1;
 }
 
- 
-
-static int _starpu_src_common_execute_job(struct _starpu_job *j, 
-					  struct _starpu_worker *worker, 
-					  struct _starpu_mp_node * node)
-{
-        int ret;
-	uint32_t mask = 0;
-
-	STARPU_ASSERT(j);
-	struct starpu_task *task = j->task;
-
-	int profiling = starpu_profiling_status_get();
-
-	STARPU_ASSERT(task);
-	
-	ret = _starpu_fetch_task_input(j, mask);
-	if (ret != 0)
-	{
-		/* there was not enough memory, so the input of
-		 * the codelet cannot be fetched ... put the
-		 * codelet back, and try it later */
-		return -EAGAIN;
-	}
-
-	void (*kernel)(void)  = node->get_kernel_from_job(node,j);
-
-	_starpu_driver_start_job(worker, j, &j->cl_start, 0, profiling);
-
-	_starpu_src_common_execute_kernel_from_task(node, kernel, 
-						    worker->devid, task);	
-
-	return 0;
-}
-
-
 void _starpu_src_common_worker(struct _starpu_worker_set * worker_set, 
 			       unsigned baseworkerid, 
 			       struct _starpu_mp_node * mp_node)
@@ -535,7 +569,7 @@ void _starpu_src_common_worker(struct _starpu_worker_set * worker_set,
 		if(res != 0)
 		{
 			unsigned i;
-			_STARPU_DEBUG(" nb_tasks:%d\n", res);
+			//_STARPU_DEBUG(" nb_tasks:%d\n", res);
 			for(i=1; i<worker_set->nworkers; i++)
 			{
 				if(tasks[i] != NULL)
@@ -544,7 +578,7 @@ void _starpu_src_common_worker(struct _starpu_worker_set * worker_set,
 			
 					worker_set->workers[i].current_task = j->task;
 
-					res =  _starpu_src_common_execute_job(j, &worker_set->workers[i], mp_node);
+					res =  _starpu_src_common_execute(j, &worker_set->workers[i], mp_node);
 		
 					if (res)
 					{

+ 10 - 9
src/drivers/mp_common/source_common.h

@@ -36,21 +36,22 @@ int _starpu_src_common_sink_nbcores (const struct _starpu_mp_node *node, int *bu
 int _starpu_src_common_lookup(const struct _starpu_mp_node *node,
 			      void (**func_ptr)(void), const char *func_name);
 
-int _starpu_src_common_execute_kernel(const struct _starpu_mp_node *node,
-				      void (*kernel)(void), unsigned coreid,
-				      starpu_data_handle_t *handles, void **interfaces, unsigned nb_interfaces,
-				      void *cl_arg, size_t cl_arg_size);
-
-int _starpu_src_common_execute_kernel_from_task(const struct _starpu_mp_node *node,
-						void (*kernel)(void), unsigned coreid,
-						struct starpu_task *task);
-
 int _starpu_src_common_allocate(const struct _starpu_mp_node *mp_node,
 				void **addr, size_t size);
 
 void _starpu_src_common_free(const struct _starpu_mp_node *mp_node,
 			     void *addr);
 
+int _starpu_src_common_execute_kernel(const struct _starpu_mp_node *node,
+				      void (*kernel)(void), unsigned coreid,
+				      enum starpu_codelet_type type,
+				      int is_parallel_task, int cb_workerid,
+				      starpu_data_handle_t *handles,
+				      void **interfaces,
+				      unsigned nb_interfaces,
+				      void *cl_arg, size_t cl_arg_size);
+
+
 int _starpu_src_common_copy_host_to_sink(const struct _starpu_mp_node *mp_node,
 					 void *src, void *dst, size_t size);
 

+ 4 - 4
src/drivers/mp_common/task_fifo.c

@@ -1,15 +1,15 @@
 #include "task_fifo.h"
 
-void task_fifo_init(struct task_fifo* fifo){
+void task_fifo_init(struct mp_task_fifo* fifo){
   fifo->first = fifo->last = NULL;
   pthread_mutex_init(&(fifo->mutex), NULL);
 }
 
-int task_fifo_is_empty(struct task_fifo* fifo){
+int task_fifo_is_empty(struct mp_task_fifo* fifo){
   return fifo->first == NULL;
 }
 
-void task_fifo_append(struct task_fifo* fifo, struct task * task){
+void task_fifo_append(struct mp_task_fifo* fifo, struct mp_task * task){
   pthread_mutex_lock(&(fifo->mutex));
   if(task_fifo_is_empty(fifo)){
     fifo->first = fifo->last = task;
@@ -22,7 +22,7 @@ void task_fifo_append(struct task_fifo* fifo, struct task * task){
   pthread_mutex_unlock(&(fifo->mutex));
 }
 
-void task_fifo_pop(struct task_fifo* fifo){
+void task_fifo_pop(struct mp_task_fifo* fifo){
   pthread_mutex_lock(&(fifo->mutex));
   if(!task_fifo_is_empty(fifo)){
     if(fifo->first == fifo->last)

+ 25 - 21
src/drivers/mp_common/task_fifo.h

@@ -23,36 +23,40 @@
 #include <common/config.h>
 
 
-struct task{
-  struct _starpu_mp_node *node;
-  void (*kernel)(void **, void *);
-  void *interfaces[STARPU_NMAXBUFS]; 
-  void *cl_arg;
-  unsigned coreid;
-
-  /*the next task of the fifo*/
-  struct task * next;
+struct mp_task{
+	struct _starpu_mp_node *node;
+	void (*kernel)(void **, void *);
+	void *interfaces[STARPU_NMAXBUFS]; 
+	void *cl_arg;
+	unsigned coreid;
+	enum starpu_codelet_type type;
+	int is_parallel_task;
+	int combined_worker_size;
+	int combined_worker[STARPU_NMAXWORKERS];
+
+	/*the next task of the fifo*/
+	struct mp_task * next;
 };
 
 
-struct task_fifo{
-  /*the first task of the fifo*/
-  struct task * first;
+struct mp_task_fifo{
+	/*the first task of the fifo*/
+	struct mp_task * first;
   
-  /*the last task of the fifo*/
-  struct task * last;
+	/*the last task of the fifo*/
+	struct mp_task * last;
 
-  /*mutex to protect concurrent access on the fifo*/
-  pthread_mutex_t mutex;
+	/*mutex to protect concurrent access on the fifo*/
+	pthread_mutex_t mutex;
 };
 
 
-void task_fifo_init(struct task_fifo* fifo);
+void task_fifo_init(struct mp_task_fifo* fifo);
 
-int task_fifo_is_empty(struct task_fifo* fifo);
+int task_fifo_is_empty(struct mp_task_fifo* fifo);
 
-void task_fifo_append(struct task_fifo* fifo, struct task * task);
+void task_fifo_append(struct mp_task_fifo* fifo, struct mp_task * task);
 
-void task_fifo_pop(struct task_fifo* fifo);
+void task_fifo_pop(struct mp_task_fifo* fifo);
 
-#endif /* __TASK_FIFO_H__*/
+#endif /*__TASK_FIFO_H__*/

+ 12 - 4
src/drivers/scc/driver_scc_sink.c

@@ -27,6 +27,14 @@
 void _starpu_scc_sink_init(struct _starpu_mp_node *node)
 {
 	node->mp_connection.scc_nodeid = _starpu_scc_common_get_src_node_id();
+
+	/****************
+	 *     TODO     *
+	 * get nb_cores *
+	 ****************/
+	node->nb_cores = 1; 
+
+	_starpu_sink_common_init(node);
 }
 
 void _starpu_scc_sink_deinit(struct _starpu_mp_node *node)
@@ -53,10 +61,10 @@ void _starpu_scc_sink_recv_from_device(const struct _starpu_mp_node *node, int s
 
 void _starpu_scc_sink_bind_thread(const struct _starpu_mp_node *mp_node STARPU_ATTRIBUTE_UNUSED, cpu_set_t * cpuset, int coreid, pthread_t *thread)
 {
-  /****************
-        TODO
-  ****************/
-  STARPU_ASSERT(0);
+	/****************
+	 *     TODO     *
+	 ****************/
+	STARPU_ASSERT(0);
 }
 
 

+ 1 - 1
tests/microbenchs/async_tasks_overhead.c

@@ -31,7 +31,7 @@ static double cumulated_pop = 0.0;
 
 void dummy_func(void *descr[] STARPU_ATTRIBUTE_UNUSED, void *arg STARPU_ATTRIBUTE_UNUSED)
 {
-  usleep(3000);
+	usleep(10000);
 }
 
 static struct starpu_codelet dummy_codelet =

+ 2 - 1
tests/parallel_tasks/explicit_combined_worker.c

@@ -24,7 +24,7 @@
 #define N	1000
 #define VECTORSIZE	1024
 
-static void codelet_null(void *descr[], STARPU_ATTRIBUTE_UNUSED void *_args)
+void codelet_null(void *descr[], STARPU_ATTRIBUTE_UNUSED void *_args)
 {
 	STARPU_SKIP_IF_VALGRIND;
 
@@ -45,6 +45,7 @@ static struct starpu_codelet cl =
 	.cpu_funcs = {codelet_null, NULL},
 	.cuda_funcs = {codelet_null, NULL},
         .opencl_funcs = {codelet_null, NULL},
+	.cpu_funcs_name = {"codelet_null", NULL},
 	.nbuffers = 1,
 	.modes = {STARPU_R}
 };

+ 2 - 1
tests/parallel_tasks/parallel_kernels.c

@@ -24,7 +24,7 @@
 #define N	1000
 #define VECTORSIZE	1024
 
-static void codelet_null(void *descr[], STARPU_ATTRIBUTE_UNUSED void *_args)
+void codelet_null(void *descr[], STARPU_ATTRIBUTE_UNUSED void *_args)
 {
 	STARPU_SKIP_IF_VALGRIND;
 
@@ -51,6 +51,7 @@ static struct starpu_codelet cl =
 	.cpu_funcs = {codelet_null, NULL},
 	.cuda_funcs = {codelet_null, NULL},
         .opencl_funcs = {codelet_null, NULL},
+	.cpu_funcs_name = {"codelet_null", NULL},
 	.model = &model,
 	.nbuffers = 1,
 	.modes = {STARPU_R}

+ 2 - 1
tests/parallel_tasks/parallel_kernels_spmd.c

@@ -24,7 +24,7 @@
 #define N	1000
 #define VECTORSIZE	1024
 
-static void codelet_null(void *descr[], STARPU_ATTRIBUTE_UNUSED void *_args)
+void codelet_null(void *descr[], STARPU_ATTRIBUTE_UNUSED void *_args)
 {
 	STARPU_SKIP_IF_VALGRIND;
 
@@ -53,6 +53,7 @@ static struct starpu_codelet cl =
 	.cpu_funcs = {codelet_null, NULL},
 	.cuda_funcs = {codelet_null, NULL},
         .opencl_funcs = {codelet_null, NULL},
+	.cpu_funcs_name = {"codelet_null", NULL},
 	.model = &model,
 	.nbuffers = 1,
 	.modes = {STARPU_R}

+ 2 - 1
tests/parallel_tasks/spmd_peager.c

@@ -26,7 +26,7 @@
 starpu_data_handle_t v_handle;
 static unsigned *v;
 
-static void codelet_null(void *descr[], STARPU_ATTRIBUTE_UNUSED void *_args)
+void codelet_null(void *descr[], STARPU_ATTRIBUTE_UNUSED void *_args)
 {
 	STARPU_SKIP_IF_VALGRIND;
 
@@ -51,6 +51,7 @@ static struct starpu_codelet cl =
 	.cpu_funcs = {codelet_null, NULL},
 	.cuda_funcs = {codelet_null, NULL},
         .opencl_funcs = {codelet_null, NULL},
+	.cpu_funcs_name = {"codelet_null", NULL},
 	.nbuffers = 1,
 	.modes = {STARPU_R}
 };