Parcourir la source

mic: update parallels tasks (with sync worker and combined worker)

Thibaud Lambert il y a 11 ans
Parent
commit
cd5faf37f8

+ 4 - 0
src/common/barrier_counter.h

@@ -14,6 +14,9 @@
  * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  */
 
+#ifndef __BARRIER_COUNTER_H__
+#define __BARRIER_COUNTER_H__
+
 #include <common/utils.h>
 #include <common/barrier.h>
 
@@ -37,3 +40,4 @@ int _starpu_barrier_counter_increment_until_full_counter(struct _starpu_barrier_
 
 int _starpu_barrier_counter_increment(struct _starpu_barrier_counter *barrier_c);
 
+#endif

+ 2 - 2
src/core/topology.c

@@ -683,8 +683,8 @@ _starpu_init_mp_config (struct _starpu_machine_config *config,
 		if (0 == _starpu_init_mic_node (config, i, &handles[i], &process[i]))
 			topology->nmicdevices++;
 
-	i = 0;
-	for (; i < topology->nmicdevices; i++)
+	
+	for (i = 0; i < topology->nmicdevices; i++)
 		_starpu_init_mic_config (config, user_conf, i);
 #endif
 }

+ 2 - 3
src/drivers/mic/driver_mic_common.c

@@ -37,10 +37,9 @@ void _starpu_mic_common_send(const struct _starpu_mp_node *node, void *msg, int
 }
 
 
-/* 
- *
+/* Tell whether the mic endpoint is ready
+ * return 1 if a message has been received, 0 if no message has been received
  */
-
 int _starpu_mic_common_recv_is_ready(const struct _starpu_mp_node *mp_node)
 {
   struct scif_pollepd pollepd;

+ 8 - 12
src/drivers/mic/driver_mic_sink.c

@@ -31,7 +31,6 @@
 /* Initialize the MIC sink, initializing connection to the source
  * and to the other devices (not implemented yet).
  */
-
 void _starpu_mic_sink_init(struct _starpu_mp_node *node)
 {
 	pthread_t thread, self;
@@ -43,7 +42,7 @@ void _starpu_mic_sink_init(struct _starpu_mp_node *node)
 	/*Bind on the first core*/
 	self = pthread_self();
 	CPU_ZERO(&cpuset);
-	CPU_SET(0,&cpuset);
+	CPU_SET(241,&cpuset);
 	pthread_setaffinity_np(self,sizeof(cpu_set_t),&cpuset);
 
 
@@ -56,9 +55,6 @@ void _starpu_mic_sink_init(struct _starpu_mp_node *node)
 	
 	node->is_running = 1;
 
-	//init the set
-	CPU_ZERO(&cpuset);
-
 	node->nb_cores = COISysGetHardwareThreadCount() - COISysGetHardwareThreadCount() / COISysGetCoreCount();
 	node->thread_table = malloc(sizeof(pthread_t)*node->nb_cores);
 
@@ -67,8 +63,10 @@ void _starpu_mic_sink_init(struct _starpu_mp_node *node)
 
 	node->barrier_list = mp_barrier_list_new();
 	node->message_queue = mp_message_list_new();
-	pthread_mutex_init(&node->message_queue_mutex,NULL);
-	pthread_mutex_init(&node->barrier_mutex,NULL);
+	STARPU_PTHREAD_MUTEX_INIT(&node->message_queue_mutex,NULL);
+	STARPU_PTHREAD_MUTEX_INIT(&node->barrier_mutex,NULL);
+
+	STARPU_PTHREAD_BARRIER_INIT(&node->init_completed_barrier, NULL, node->nb_cores+1);
 
 
 	/*for each core init the mutex, the task pointer and launch the thread */
@@ -89,7 +87,6 @@ void _starpu_mic_sink_init(struct _starpu_mp_node *node)
 
 		/*prepare the argument for the thread*/
 		arg= malloc(sizeof(struct arg_sink_thread));
-		arg->task = &node->run_table[i];
 		arg->coreid = i;
 		arg->node = node;
 		arg->sem = &node->sem_run_table[i];
@@ -114,7 +111,6 @@ void _starpu_mic_sink_init(struct _starpu_mp_node *node)
 
 /* Deinitialize the MIC sink, close all the connections.
  */
-
 void _starpu_mic_sink_deinit(struct _starpu_mp_node *node)
 {
 	
@@ -134,9 +130,9 @@ void _starpu_mic_sink_deinit(struct _starpu_mp_node *node)
 	mp_barrier_list_delete(node->barrier_list);
 	mp_message_list_delete(node->message_queue);
 
-	pthread_mutex_destroy(&node->message_queue_mutex);
-	pthread_mutex_destroy(&node->barrier_mutex);
-	
+	STARPU_PTHREAD_MUTEX_DESTROY(&node->message_queue_mutex);
+	STARPU_PTHREAD_MUTEX_DESTROY(&node->barrier_mutex);
+	STARPU_PTHREAD_BARRIER_DESTROY(&node->init_completed_barrier);
 	//unsigned int i;
 
 	//for (i = 0; i < node->nb_mp_sinks; ++i)

+ 8 - 4
src/drivers/mp_common/mp_common.h

@@ -60,6 +60,7 @@ enum _starpu_mp_command
 	STARPU_EXECUTION_SUBMITTED,
 	STARPU_EXECUTION_COMPLETED,
 	STARPU_PRE_EXECUTION,
+	STARPU_SYNC_WORKERS,
 };
 
 enum _starpu_mp_node_kind
@@ -118,8 +119,6 @@ struct mp_task
 	enum starpu_codelet_type type;
 	int is_parallel_task;
 	int combined_workerid;
-	int combined_worker_size;
-	int combined_worker[STARPU_NMAXWORKERS];
  	struct mp_barrier* mp_barrier;
 };
 
@@ -130,6 +129,8 @@ struct _starpu_mp_node
 {
 	enum _starpu_mp_node_kind kind;
 
+	int baseworkerid;
+
 	/*the number of core on the device
 	 * Must be initialized during init function*/
 	int nb_cores;
@@ -173,16 +174,19 @@ struct _starpu_mp_node
 	 *  - sink_sink_dt_connections[j] is not initialized for the sink number j. */
 	union _starpu_mp_connection *sink_sink_dt_connections;
 
+	/* Barrier the sink threads wait on until the workers have been received from the source */
+	_starpu_pthread_barrier_t init_completed_barrier; 
+	
 	/* table to store pointer of the thread workers*/
 	void* thread_table;
 
         /*list where threads add messages to send to the source node */
         struct mp_message_list* message_queue;
-	pthread_mutex_t message_queue_mutex;
+	starpu_pthread_mutex_t message_queue_mutex;
 
 	/*list of barrier for combined worker*/
 	struct mp_barrier_list* barrier_list;
-	pthread_mutex_t barrier_mutex;
+	starpu_pthread_mutex_t barrier_mutex;
 
 	/*table where worker comme pick task*/
 	struct mp_task ** run_table;

+ 222 - 91
src/drivers/mp_common/sink_common.c

@@ -21,6 +21,8 @@
 #include <drivers/mp_common/mp_common.h>
 #include <datawizard/interfaces/data_interface.h>
 #include <common/barrier.h>
+#include <core/workers.h>
+#include <common/barrier_counter.h>
 #ifdef STARPU_USE_MIC
 #include <common/COISysInfo_common.h>
 #endif
@@ -150,6 +152,81 @@ static void _starpu_sink_common_copy_to_sink(const struct _starpu_mp_node *mp_no
 	mp_node->dt_send_to_device(mp_node, cmd->devid, cmd->addr, cmd->size);
 }
 
+
+/* Receive workers and combined workers and store them into the struct config
+ */
+static void _starpu_sink_common_recv_workers(struct _starpu_mp_node * node, void *arg, int arg_size)
+{
+	/* Retrieve information from the message */
+	STARPU_ASSERT(arg_size == (sizeof(int)*5));
+	void * arg_ptr = arg;
+	int i;
+	
+	int nworkers = *(int *)arg_ptr; 
+	arg_ptr += sizeof(nworkers);
+
+	int worker_size = *(int *)arg_ptr;
+	arg_ptr += sizeof(worker_size);
+
+	int combined_worker_size = *(int *)arg_ptr;
+	arg_ptr += sizeof(combined_worker_size);
+	
+	int baseworkerid = *(int *)arg_ptr;
+	arg_ptr += sizeof(baseworkerid);
+
+	struct _starpu_machine_config *config = _starpu_get_machine_config();
+	config->topology.nworkers = *(int *)arg_ptr;
+
+
+	/* Retrieve workers */
+	struct _starpu_worker * workers = &config->workers[baseworkerid];
+	node->dt_recv(node,workers,worker_size);
+	
+	/* Update workers to have coherent fields */
+	for(i=0; i<nworkers; i++)
+	{
+		workers[i].config = config;
+		starpu_pthread_mutex_init(&workers[i].mutex,NULL);
+		starpu_pthread_mutex_destroy(&workers[i].mutex);
+
+		starpu_pthread_cond_init(&workers[i].started_cond,NULL);
+		starpu_pthread_cond_destroy(&workers[i].started_cond);
+
+		starpu_pthread_cond_init(&workers[i].ready_cond,NULL);
+		starpu_pthread_cond_destroy(&workers[i].ready_cond);
+
+		starpu_pthread_mutex_init(&workers[i].sched_mutex,NULL);
+		starpu_pthread_mutex_destroy(&workers[i].sched_mutex);
+
+		starpu_pthread_cond_init(&workers[i].sched_cond,NULL);
+		starpu_pthread_cond_destroy(&workers[i].sched_cond);
+
+		workers[i].current_task = NULL;
+		workers[i].set = NULL;
+		workers[i].terminated_jobs = NULL;
+		workers[i].sched_ctx = NULL;
+	
+		//_starpu_barrier_counter_init(&workers[i].tasks_barrier, 1);
+		//_starpu_barrier_counter_destroy(&workers[i].tasks_barrier);
+
+		starpu_pthread_mutex_init(&workers[i].parallel_sect_mutex,NULL);
+		starpu_pthread_mutex_destroy(&workers[i].parallel_sect_mutex);
+
+		starpu_pthread_cond_init(&workers[i].parallel_sect_cond,NULL);
+		starpu_pthread_cond_destroy(&workers[i].parallel_sect_cond);
+
+	}
+
+	/* Retrieve combined workers */
+	struct _starpu_combined_worker * combined_workers = config->combined_workers; 
+	node->dt_recv(node, combined_workers, combined_worker_size);
+
+	node->baseworkerid = baseworkerid;
+	STARPU_PTHREAD_BARRIER_WAIT(&node->init_completed_barrier);	
+}
+
+
+
 /* Function looping on the sink, waiting for tasks to execute.
  * If the caller is the host, don't do anything.
  */
@@ -170,6 +247,10 @@ void _starpu_sink_common_worker(void)
 	/* Create and initialize the node */
 	node = _starpu_mp_common_node_create(node_kind, -1);
 
+	starpu_pthread_key_t worker_key;
+	STARPU_PTHREAD_KEY_CREATE(&worker_key, NULL);
+
+
 	while (!exit_starpu)
 	{
 		/* If we have received a message */
@@ -216,6 +297,9 @@ void _starpu_sink_common_worker(void)
 					_starpu_sink_common_copy_to_sink(node, arg, arg_size);
 					break;
 
+				case STARPU_SYNC_WORKERS:
+					_starpu_sink_common_recv_workers(node, arg, arg_size);
+					break;
 				default:
 					printf("Oops, command %x unrecognized\n", command);
 			}
@@ -245,97 +329,187 @@ void _starpu_sink_common_worker(void)
 	exit(0);
 }
 
+
+/* Search for the mp_barrier corresponding to the specified combined worker
+ * and create it if it doesn't exist
+ */
+static struct mp_barrier * _starpu_sink_common_get_barrier(struct _starpu_mp_node * node, int cb_workerid, int cb_workersize)
+{
+	struct mp_barrier * b = NULL;
+	STARPU_PTHREAD_MUTEX_LOCK(&node->barrier_mutex);
+	/* Search if the barrier already exist */
+	for(b = mp_barrier_list_begin(node->barrier_list); 
+			b != mp_barrier_list_end(node->barrier_list) && b->id != cb_workerid; 
+			b = mp_barrier_list_next(b));
+
+	/* If we found the barrier */
+	if(b != NULL)
+	{
+		STARPU_PTHREAD_MUTEX_UNLOCK(&node->barrier_mutex);
+		return b;
+	}
+	else
+	{
+
+		/* Else we create, initialize and add it to the list*/
+		b = mp_barrier_new();
+		b->id = cb_workerid;
+		STARPU_PTHREAD_BARRIER_INIT(&b->before_work_barrier,NULL,cb_workersize);
+		STARPU_PTHREAD_BARRIER_INIT(&b->after_work_barrier,NULL,cb_workersize);
+		mp_barrier_list_push_back(node->barrier_list,b);
+		STARPU_PTHREAD_MUTEX_UNLOCK(&node->barrier_mutex);
+		return b;
+	}
+}
+
+
+/* Erase the mp_barrier corresponding to the specified combined worker
+*/
+static void _starpu_sink_common_erase_barrier(struct _starpu_mp_node * node, struct mp_barrier *barrier)
+{
+	STARPU_PTHREAD_MUTEX_LOCK(&node->barrier_mutex);
+	mp_barrier_list_erase(node->barrier_list,barrier);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&node->barrier_mutex);
+}
+
+/* Append the message given in parameter to the message list
+ */
+static void _starpu_sink_common_append_message(struct _starpu_mp_node *node, struct mp_message * message)
+{
+	STARPU_PTHREAD_MUTEX_LOCK(&node->message_queue_mutex);
+	mp_message_list_push_front(node->message_queue,message);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&node->message_queue_mutex);
+
+}
+/* Append to the message list a "STARPU_PRE_EXECUTION" message
+ */
+static void _starpu_sink_common_pre_execution_message(struct _starpu_mp_node *node, struct mp_task *task)
+{
+	/* Init message to tell the source that the execution has begun */
+	struct mp_message * message = mp_message_new();
+	message->type = STARPU_PRE_EXECUTION;
+	*(int *) message->buffer = task->combined_workerid;
+	message->size = sizeof(task->combined_workerid);
+
+
+	/* Append the message to the queue */	
+	_starpu_sink_common_append_message(node, message);
+
+}
+
+/* Append to the message list a "STARPU_EXECUTION_COMPLETED" message
+ */
+static void _starpu_sink_common_execution_completed_message(struct _starpu_mp_node *node, struct mp_task *task)
+{
+	/* Init message to tell the source that the execution is completed */
+	struct mp_message * message = mp_message_new();
+	message->type = STARPU_EXECUTION_COMPLETED;
+	message->size = sizeof(task->coreid);
+	*(int*) message->buffer = task->coreid;
+
+	/* Append the message to the queue */
+	_starpu_sink_common_append_message(node, message);
+}
+
+
+/* Bind the thread which is running on the specified core to the combined worker */
+static void _starpu_sink_common_bind_to_combined_worker(struct _starpu_mp_node *node, int coreid, struct _starpu_combined_worker * combined_worker)
+{
+	int i;
+	int * bind_set = malloc(sizeof(int)*combined_worker->worker_size);
+	for(i=0;i<combined_worker->worker_size;i++)
+		bind_set[i] = combined_worker->combined_workerid[i] - node->baseworkerid;
+	node->bind_thread(node, coreid, bind_set, combined_worker->worker_size);
+}
+
+
 /* The main function executed by the thread 
  * thread_arg is a structure containing the information needed by the thread
  */
 void* _starpu_sink_thread(void * thread_arg)
 {
 	/* Retrieve the information from the structure */
-	struct mp_task **task = ((struct arg_sink_thread *)thread_arg)->task;
 	struct _starpu_mp_node *node = ((struct arg_sink_thread *)thread_arg)->node;
 	sem_t * sem = ((struct arg_sink_thread *)thread_arg)->sem;
 	int coreid =((struct arg_sink_thread *)thread_arg)->coreid;
 	/* free the structure */
 	free(thread_arg);
 
+	STARPU_PTHREAD_BARRIER_WAIT(&node->init_completed_barrier);	
+	
+	struct _starpu_machine_config *config = _starpu_get_machine_config();
+	struct _starpu_worker * worker = &config->workers[node->baseworkerid + coreid];
+
+	_starpu_set_local_worker_key(worker);
+	struct _starpu_combined_worker * combined_worker; 
 	while(node->is_running)
 	{
 		/*Wait there is a task available */
 		sem_wait(sem);
-
-		if((*task) != NULL)
+		struct mp_task *task = node->run_table[coreid];
+		combined_worker = NULL;
+		if(task != NULL)
 		{
 
 			/* If it's a parallel task */
-			if((*task)->is_parallel_task)
+			if(task->is_parallel_task)
 			{
+				combined_worker = _starpu_get_combined_worker_struct(task->combined_workerid);
 				/* Synchronize with others threads of the combined worker*/
-				STARPU_PTHREAD_BARRIER_WAIT(&(*task)->mp_barrier->before_work_barrier);
+				STARPU_PTHREAD_BARRIER_WAIT(&task->mp_barrier->before_work_barrier);
 
-				/* The first thread of the combined worker 
-				 * tell the sink that the execution has begun
-				 */
-				if((*task)->coreid == (*task)->combined_worker[0])
-				{
-					/* Init message to tell the sink that the execution has begun */
-					struct mp_message * message = mp_message_new();
-					message->type = STARPU_PRE_EXECUTION;
-					*(int *) message->buffer = (*task)->combined_workerid;
-					message->size = sizeof((*task)->combined_workerid);
 
-					/* Append the message to the queue */	
-					pthread_mutex_lock(&node->message_queue_mutex);
-					mp_message_list_push_front(node->message_queue,message);
-					pthread_mutex_unlock(&node->message_queue_mutex);
+				/* The first thread of the combined worker */
+				if(worker->workerid == combined_worker->combined_workerid[0])
+				{
+					/* tell the source that the execution has begun */
+					_starpu_sink_common_pre_execution_message(node,task);
 
 					/* If the mode is FORKJOIN, 
-					 * the first thread binds himself on all core of the combined worker 
-					 */
-					if((*task)->type == STARPU_FORKJOIN)
-						node->bind_thread(node, coreid, (*task)->combined_worker, (*task)->combined_worker_size);
+					 * the first thread binds itself
+					 * to all cores of the combined worker */
+					if(task->type == STARPU_FORKJOIN)
+					{
+						_starpu_sink_common_bind_to_combined_worker(node, coreid, combined_worker);
+					}
 				}
 			}
-			if((*task)->type != STARPU_FORKJOIN || (*task)->coreid == (*task)->combined_worker[0])
+			if(task->type != STARPU_FORKJOIN || worker->workerid == combined_worker->combined_workerid[0])
 			{
+				if(combined_worker != NULL)
+					worker->worker_size = combined_worker->worker_size;
+				else 
+					worker->worker_size = 1;
 				/* execute the task */
-				(*task)->kernel((*task)->interfaces,(*task)->cl_arg);
+				task->kernel(task->interfaces,task->cl_arg);
 			}
 
 			/* If it's a parallel task */
-			if((*task)->is_parallel_task)
+			if(task->is_parallel_task)
 			{
 				/* Synchronize with others threads of the combined worker*/
-				STARPU_PTHREAD_BARRIER_WAIT(&(*task)->mp_barrier->after_work_barrier);
+				STARPU_PTHREAD_BARRIER_WAIT(&task->mp_barrier->after_work_barrier);
 
 				/* The fisrt thread of the combined */
-				if((*task)->coreid == (*task)->combined_worker[0])
+				if(worker->workerid == combined_worker->combined_workerid[0])
 				{
 					/* Erase the barrier from the list */
-					pthread_mutex_lock(&node->barrier_mutex);
-					mp_barrier_list_erase(node->barrier_list,(*task)->mp_barrier);
-					pthread_mutex_unlock(&node->barrier_mutex);
+					_starpu_sink_common_erase_barrier(node,task->mp_barrier);
 
 					/* If the mode is FORKJOIN, 
-					 * the first thread rebinds himself on his own core 
-					 */
-					if((*task)->type == STARPU_FORKJOIN)
+					 * the first thread rebinds itself to its own core */
+					if(task->type == STARPU_FORKJOIN)
 						node->bind_thread(node, coreid, &coreid, 1);
 
 				}
 			}
-			/* Init message to tell the sink that the execution is completed */
-			struct mp_message * message = mp_message_new();
-			message->type = STARPU_EXECUTION_COMPLETED;
-			message->size = sizeof((*task)->coreid);
-			*(int*) message->buffer = (*task)->coreid;
-
-			free(*task);
-			(*task) = NULL;
-
-			/* Append the message to the queue */
-			pthread_mutex_lock(&node->message_queue_mutex);
-			mp_message_list_push_front(node->message_queue,message);
-			pthread_mutex_unlock(&node->message_queue_mutex);
+			
+			node->run_table[coreid] = NULL;
+
+			/* tell the source that the execution is completed */
+			_starpu_sink_common_execution_completed_message(node,task);
+
+			free(task);
 		}
 
 	}
@@ -353,39 +527,6 @@ static void _starpu_sink_common_execute_thread(struct _starpu_mp_node *node, str
 	sem_post(&node->sem_run_table[task->coreid]);
 }
 
-/* Search for the mp_barrier correspondind to the specified combined worker 
- * and create it if it doesn't exist
- */
-static struct mp_barrier * _starpu_sink_common_get_barrier(struct _starpu_mp_node * node, int cb_workerid, int cb_workersize)
-{
-	struct mp_barrier * b = NULL;
-	pthread_mutex_lock(&node->barrier_mutex);
-
-	/* Search if the barrier already exist */
-	for(b = mp_barrier_list_begin(node->barrier_list); 
-			b != mp_barrier_list_end(node->barrier_list) && b->id != cb_workerid; 
-			b = mp_barrier_list_next(b));
-
-	/* If we found the barrier */
-	if(b != NULL && b->id == cb_workerid)
-	{
-		pthread_mutex_unlock(&node->barrier_mutex);
-		return b;
-	}
-	else
-	{
-
-		/* Else we create, initialize and add it to the list*/
-		b = mp_barrier_new();
-		b->id = cb_workerid;
-		STARPU_PTHREAD_BARRIER_INIT(&b->before_work_barrier,NULL,cb_workersize);
-		STARPU_PTHREAD_BARRIER_INIT(&b->after_work_barrier,NULL,cb_workersize);
-		mp_barrier_list_push_back(node->barrier_list,b);
-		pthread_mutex_unlock(&node->barrier_mutex);
-		return b;
-	}
-}
-
 
 
 /* Receive paquet from _starpu_src_common_execute_kernel in the form below :
@@ -398,7 +539,6 @@ static struct mp_barrier * _starpu_sink_common_get_barrier(struct _starpu_mp_nod
 void _starpu_sink_common_execute(struct _starpu_mp_node *node,
 		void *arg, int arg_size)
 {
-	int id = 0;
 	unsigned nb_interfaces, i;
 
 	void *arg_ptr = arg;
@@ -418,16 +558,7 @@ void _starpu_sink_common_execute(struct _starpu_mp_node *node,
 		task->combined_workerid= *(int *) arg_ptr;
 		arg_ptr += sizeof(task->combined_workerid);
 
-		task->combined_worker_size = *(int *) arg_ptr;
-		arg_ptr += sizeof(task->combined_worker_size);
-
-		for (id = 0; id < task->combined_worker_size; id++)
-		{
-
-			task->combined_worker[id] = *(int*) arg_ptr;
-			arg_ptr += sizeof(task->combined_worker[id]);
-		}
-		task->mp_barrier = _starpu_sink_common_get_barrier(node,task->combined_workerid,task->combined_worker_size);
+		task->mp_barrier = _starpu_sink_common_get_barrier(node,task->combined_workerid,_starpu_get_combined_worker_struct(task->combined_workerid)->worker_size);
 	}
 
 	task->coreid = *(unsigned *) arg_ptr;

+ 0 - 1
src/drivers/mp_common/sink_common.h

@@ -34,7 +34,6 @@ struct _starpu_sink_topology
 
 struct arg_sink_thread
 {
-	struct mp_task ** task;
 	struct _starpu_mp_node *node;
 	sem_t* sem;
 	int coreid;

+ 30 - 18
src/drivers/mp_common/source_common.c

@@ -238,9 +238,8 @@ int _starpu_src_common_execute_kernel(const struct _starpu_mp_node *node,
 {
 
 	void *buffer, *buffer_ptr, *arg =NULL;
-	int j, buffer_size = 0, cb_worker_size = 0, arg_size =0;
-	struct _starpu_combined_worker * cb_worker;
-	unsigned devid ,i;
+	int buffer_size = 0, arg_size =0;
+	unsigned i;
 
 	buffer_size = sizeof(kernel) + sizeof(coreid) + sizeof(type)
 		+ sizeof(nb_interfaces) + nb_interfaces * sizeof(union _starpu_interface) + sizeof(is_parallel_task);
@@ -248,9 +247,7 @@ int _starpu_src_common_execute_kernel(const struct _starpu_mp_node *node,
 	/*if the task is paralle*/
 	if(is_parallel_task)
 	{
-		cb_worker = _starpu_get_combined_worker_struct(cb_workerid);
-		cb_worker_size = cb_worker->worker_size;
-		buffer_size += sizeof(cb_workerid) + sizeof(cb_worker_size) + cb_worker_size * sizeof(devid);
+		buffer_size += sizeof(cb_workerid); 
 	}
 
 	/* If the user didn't give any cl_arg, there is no need to send it */
@@ -277,19 +274,8 @@ int _starpu_src_common_execute_kernel(const struct _starpu_mp_node *node,
 
 	if(is_parallel_task)
 	{
-
-		*(int *) buffer_ptr = cb_workerid;
+		*(int *) buffer_ptr = cb_workerid ;
 		buffer_ptr += sizeof(cb_workerid);
-
-		*(int *) buffer_ptr = cb_worker_size;
-		buffer_ptr += sizeof(cb_worker_size);
-
-		for (j = 0; j < cb_worker_size; j++)
-		{
-			int devid = _starpu_get_worker_struct(cb_worker->combined_workerid[j])->devid;
-			*(int *) buffer_ptr = devid;
-			buffer_ptr += sizeof(devid);
-		}
 	}
 
 	*(unsigned *) buffer_ptr = coreid;
@@ -579,6 +565,30 @@ int _starpu_src_common_locate_file(char *located_file_name,
 	return 1;
 }
 
+/* Send workers to the sink node 
+ */
+static void _starpu_src_common_send_workers(struct _starpu_mp_node * node, int baseworkerid, int nworkers)
+{	
+	struct _starpu_machine_config *config = _starpu_get_machine_config();
+	int worker_size = sizeof(struct _starpu_worker)*nworkers;	
+	int combined_worker_size = STARPU_NMAX_COMBINEDWORKERS*sizeof(struct _starpu_combined_worker);
+	int msg[5];
+	msg[0] = nworkers;
+	msg[1] = worker_size;
+	msg[2] = combined_worker_size;
+	msg[3] = baseworkerid;
+	msg[4] = starpu_worker_get_count();
+
+	/* tell the sink node that we will send it all workers */
+	_starpu_mp_common_send_command(node, STARPU_SYNC_WORKERS, 
+			&msg, sizeof(msg));
+
+	/* Send all workers to the sink node */
+	node->dt_send(node,&config->workers[baseworkerid],worker_size);
+
+	/* Send all combined workers to the sink node */
+	node->dt_send(node, &config->combined_workers,combined_worker_size);
+}	
 
 /* Function looping on the source node */
 void _starpu_src_common_worker(struct _starpu_worker_set * worker_set, 
@@ -589,6 +599,8 @@ void _starpu_src_common_worker(struct _starpu_worker_set * worker_set,
 	unsigned memnode = baseworker->memory_node;
 	struct starpu_task **tasks = malloc(sizeof(struct starpu_task *)*worker_set->nworkers);
 
+	_starpu_src_common_send_workers(mp_node, baseworkerid, worker_set->nworkers);
+
 	/*main loop*/
 	while (_starpu_machine_is_running())
 	{

+ 4 - 5
tests/parallel_tasks/spmd_peager.c

@@ -30,14 +30,13 @@ void codelet_null(void *descr[], STARPU_ATTRIBUTE_UNUSED void *_args)
 {
 	STARPU_SKIP_IF_VALGRIND;
 
-//	int worker_size = starpu_combined_worker_get_size();
-//	STARPU_ASSERT(worker_size > 0);
+	int worker_size = starpu_combined_worker_get_size();
+	STARPU_ASSERT(worker_size > 0);
 
-//	FPRINTF(stderr, "WORKERSIZE : %d\n", worker_size);
+	//FPRINTF(stderr, "WORKERSIZE : %d\n", worker_size);
 
-//	usleep(1000/worker_size);
+	usleep(1000/worker_size);
 
-	usleep(1000);
 #if 0
 	int id = starpu_worker_get_id();
 	int combined_id = starpu_combined_worker_get_id();