Procházet zdrojové kódy

mic: multi thread working

Thibaud Lambert před 12 roky
rodič
revize
32b2a57c47

+ 57 - 35
src/drivers/driver_common/driver_common.c

@@ -95,7 +95,7 @@ void _starpu_driver_end_job(struct _starpu_worker *args, struct _starpu_job *j,
 	}
 
 	if (starpu_top)
-	  _starpu_top_task_ended(task,workerid,codelet_end);
+		_starpu_top_task_ended(task,workerid,codelet_end);
 
 	args->status = STATUS_UNKNOWN;
 }
@@ -129,9 +129,9 @@ void _starpu_driver_update_job_feedback(struct _starpu_job *j, struct _starpu_wo
 			profiling_info->workerid = workerid;
 
 			_starpu_worker_update_profiling_info_executing(workerid, &measured_ts, 1,
-				profiling_info->used_cycles,
-				profiling_info->stall_cycles,
-				profiling_info->power_consumed);
+								       profiling_info->used_cycles,
+								       profiling_info->stall_cycles,
+								       profiling_info->power_consumed);
 			updated =  1;
 		}
 
@@ -150,6 +150,29 @@ void _starpu_driver_update_job_feedback(struct _starpu_job *j, struct _starpu_wo
 	}
 }
 
+
+
+static void _starpu_worker_set_status_sleeping(int workerid)
+{
+	if (_starpu_worker_get_status(workerid) != STATUS_SLEEPING)
+	{
+		_STARPU_TRACE_WORKER_SLEEP_START;
+		_starpu_worker_restart_sleeping(workerid);
+		_starpu_worker_set_status(workerid, STATUS_SLEEPING);
+	}
+
+}
+
+static void _starpu_worker_set_status_wakeup(int workerid)
+{
+	if (_starpu_worker_get_status(workerid) == STATUS_SLEEPING)
+	{
+		_STARPU_TRACE_WORKER_SLEEP_END;
+		_starpu_worker_stop_sleeping(workerid);
+		_starpu_worker_set_status(workerid, STATUS_UNKNOWN);
+	}
+}
+
 /* Workers may block when there is no work to do at all. */
 struct starpu_task *_starpu_get_worker_task(struct _starpu_worker *args, int workerid, unsigned memnode)
 {
@@ -175,12 +198,7 @@ struct starpu_task *_starpu_get_worker_task(struct _starpu_worker *args, int wor
 		 * driver may go block just after the scheduler got a new task to be
 		 * executed, and thus hanging. */
 
-		if (_starpu_worker_get_status(workerid) != STATUS_SLEEPING)
-		{
-			_STARPU_TRACE_WORKER_SLEEP_START;
-			_starpu_worker_restart_sleeping(workerid);
-			_starpu_worker_set_status(workerid, STATUS_SLEEPING);
-		}
+		_starpu_worker_set_status_sleeping(workerid);
 
 		if (_starpu_worker_can_block(memnode))
 			STARPU_PTHREAD_COND_WAIT(&args->sched_cond, &args->sched_mutex);
@@ -208,12 +226,7 @@ struct starpu_task *_starpu_get_worker_task(struct _starpu_worker *args, int wor
 
 	STARPU_PTHREAD_MUTEX_UNLOCK(&args->sched_mutex);
 
-	if (_starpu_worker_get_status(workerid) == STATUS_SLEEPING)
-	{
-		_STARPU_TRACE_WORKER_SLEEP_END;
-		_starpu_worker_stop_sleeping(workerid);
-		_starpu_worker_set_status(workerid, STATUS_UNKNOWN);
-	}
+	_starpu_worker_set_status_wakeup(workerid);
 
 #ifdef HAVE_AYUDAME_H
 	if (AYU_event)
@@ -229,23 +242,32 @@ struct starpu_task *_starpu_get_worker_task(struct _starpu_worker *args, int wor
 
 int _starpu_get_multi_worker_task(struct _starpu_worker *workers, struct starpu_task ** tasks, int nworkers)
 {
-  int i, count = 0;
-  //_STARPU_DEBUG(" nworkers:%d\n", nworkers);
-
-  for (i = 0 ; (i < nworkers); i++){
-    if(workers[i].current_task)
-      tasks[i] = NULL;
-    else
-      {
-	//_STARPU_DEBUG(" try pop task\n");
-	tasks[i] = _starpu_pop_task(&workers[i]);
-	if(tasks[i] != NULL)
-	  count ++;
-	//else
-	  //_STARPU_DEBUG(" pop task fail\n");
-	  
-      }
-  }
-  //  _STARPU_DEBUG("count:%d\n", count);
-  return count;
+	int i, count = 0;
+	/* Try to pop one task per worker; tasks[i] receives the task or NULL. NOTE(review): starts at index 1, so tasks[0] is never written - confirm this is intended. */
+	for (i = 1; (i < nworkers); i++)
+	{
+		/* if the worker is already executing a task, it gets no new one */
+		if(workers[i].current_task)
+		{
+			tasks[i] = NULL;
+		}
+		/* else try to pop a task, holding the worker's own sched_mutex */
+		else
+		{
+			STARPU_PTHREAD_MUTEX_LOCK(&workers[i].sched_mutex);
+			tasks[i] = _starpu_pop_task(&workers[i]);
+			STARPU_PTHREAD_MUTEX_UNLOCK(&workers[i].sched_mutex);
+			if(tasks[i] != NULL)
+			{
+				count ++;
+				_starpu_worker_set_status_wakeup(workers[i].workerid);
+			}
+			else
+			{
+				_starpu_worker_set_status_sleeping(workers[i].workerid);
+			}
+		}
+	}
+	return count;
 }
+

+ 1 - 1
src/drivers/mic/driver_mic_common.c

@@ -129,7 +129,7 @@ void _starpu_mic_common_accept(scif_epd_t *endpoint, uint16_t port_number)
 	_STARPU_DEBUG("MIC accepting connection on %u...\n", port_number);
 	if ((scif_accept(init_epd, &portID, endpoint, SCIF_ACCEPT_SYNC)) < 0)
 		STARPU_MIC_COMMON_REPORT_SCIF_ERROR(errno);
-	_STARPU_DEBUG("done\n", init_epd);
+	_STARPU_DEBUG("done : %d\n", init_epd);
 
 	scif_close(init_epd);
 }

+ 109 - 109
src/drivers/mp_common/mp_common.c

@@ -31,8 +31,8 @@
  * all the pointer of functions are linked to the right ones.
  */
 struct _starpu_mp_node * STARPU_ATTRIBUTE_MALLOC
-    _starpu_mp_common_node_create(enum _starpu_mp_node_kind node_kind,
-				  int peer_id)
+_starpu_mp_common_node_create(enum _starpu_mp_node_kind node_kind,
+			      int peer_id)
 {
 	struct _starpu_mp_node *node;
 
@@ -47,123 +47,123 @@ struct _starpu_mp_node * STARPU_ATTRIBUTE_MALLOC
 	switch(node->kind)
 	{
 #ifdef STARPU_USE_MIC
-		case STARPU_MIC_SOURCE:
-			{
-				node->nb_mp_sinks = starpu_mic_worker_get_count();
-				node->devid = peer_id;
-
-				node->init = _starpu_mic_src_init;
-				node->deinit = _starpu_mic_src_deinit;
-				node->report_error = _starpu_mic_src_report_scif_error;
-
-				node->mp_recv_is_ready = _starpu_mic_common_recv_is_ready;
-				node->mp_send = _starpu_mic_common_send;
-				node->mp_recv = _starpu_mic_common_recv;
-				node->dt_send = _starpu_mic_common_dt_send;
-				node->dt_recv = _starpu_mic_common_dt_recv;
-
-				node->get_kernel_from_job =_starpu_mic_src_get_kernel_from_job;
-				node->bind_thread = NULL;
-				node->execute = NULL;
-				node->nbcores = NULL;
-				node->allocate = NULL;
-				node->free = NULL;
-
-				/* A source node is only working on one core,
-				 * there is no need for this function */
-				node->get_nb_core = NULL;
-			}
-			break;
-
-		case STARPU_MIC_SINK:
-			{
-				node->devid = atoi(getenv("DEVID"));;
-				node->nb_mp_sinks = atoi(getenv("NB_MIC"));
-
-				node->init = _starpu_mic_sink_init;
-				node->deinit = _starpu_mic_sink_deinit;
-				node->report_error = _starpu_mic_sink_report_error;
-
-				node->mp_recv_is_ready = _starpu_mic_common_recv_is_ready;
-				node->mp_send = _starpu_mic_common_send;
-				node->mp_recv = _starpu_mic_common_recv;
-				node->dt_send = _starpu_mic_common_dt_send;
-				node->dt_recv = _starpu_mic_common_dt_recv;
-
-				node->get_kernel_from_job = NULL;
-				node->bind_thread = _starpu_mic_sink_bind_thread;
-				node->execute = _starpu_sink_common_execute;
-				node->nbcores = _starpu_sink_nbcores;
-				node->allocate = _starpu_mic_sink_allocate;
-				node->free = _starpu_mic_sink_free;
-
-				node->get_nb_core = _starpu_mic_sink_get_nb_core;
-			}
-			break;
+	case STARPU_MIC_SOURCE:
+	{
+		node->nb_mp_sinks = starpu_mic_worker_get_count();
+		node->devid = peer_id;
+
+		node->init = _starpu_mic_src_init;
+		node->deinit = _starpu_mic_src_deinit;
+		node->report_error = _starpu_mic_src_report_scif_error;
+
+		node->mp_recv_is_ready = _starpu_mic_common_recv_is_ready;
+		node->mp_send = _starpu_mic_common_send;
+		node->mp_recv = _starpu_mic_common_recv;
+		node->dt_send = _starpu_mic_common_dt_send;
+		node->dt_recv = _starpu_mic_common_dt_recv;
+
+		node->get_kernel_from_job =_starpu_mic_src_get_kernel_from_job;
+		node->bind_thread = NULL;
+		node->execute = NULL;
+		node->nbcores = NULL;
+		node->allocate = NULL;
+		node->free = NULL;
+
+		/* A source node is only working on one core,
+		 * there is no need for this function */
+		node->get_nb_core = NULL;
+	}
+	break;
+
+	case STARPU_MIC_SINK:
+	{
+		node->devid = atoi(getenv("DEVID"));;
+		node->nb_mp_sinks = atoi(getenv("NB_MIC"));
+
+		node->init = _starpu_mic_sink_init;
+		node->deinit = _starpu_mic_sink_deinit;
+		node->report_error = _starpu_mic_sink_report_error;
+
+		node->mp_recv_is_ready = _starpu_mic_common_recv_is_ready;
+		node->mp_send = _starpu_mic_common_send;
+		node->mp_recv = _starpu_mic_common_recv;
+		node->dt_send = _starpu_mic_common_dt_send;
+		node->dt_recv = _starpu_mic_common_dt_recv;
+
+		node->get_kernel_from_job = NULL;
+		node->bind_thread = _starpu_mic_sink_bind_thread;
+		node->execute = _starpu_sink_common_execute;
+		node->nbcores = _starpu_sink_nbcores;
+		node->allocate = _starpu_mic_sink_allocate;
+		node->free = _starpu_mic_sink_free;
+
+		node->get_nb_core = _starpu_mic_sink_get_nb_core;
+	}
+	break;
 #endif /* STARPU_USE_MIC */
 
 #ifdef STARPU_USE_SCC
-		case STARPU_SCC_SOURCE:
-			{
-				node->init = _starpu_scc_src_init;
-				node->deinit = NULL;
-				node->report_error = _starpu_scc_common_report_rcce_error;
+	case STARPU_SCC_SOURCE:
+	{
+		node->init = _starpu_scc_src_init;
+		node->deinit = NULL;
+		node->report_error = _starpu_scc_common_report_rcce_error;
 				
-				node->mp_recv_is_ready = NULL;
-				node->mp_send = _starpu_scc_common_send;
-				node->mp_recv = _starpu_scc_common_recv;
-				node->dt_send = _starpu_scc_common_send;
-				node->dt_recv = _starpu_scc_common_recv;
-				node->dt_send_to_device = NULL;
-				node->dt_recv_from_device = NULL;
-
-				node->get_kernel_from_job =_starpu_scc_src_get_kernel_from_job;
-				node->bind_thread = NULL;
-				node->execute = NULL;
-				node->allocate = NULL;
-				node->free = NULL;
-
-				node->get_nb_core = NULL;
-			}
-			break;
-
-		case STARPU_SCC_SINK:
-			{
-				node->init = _starpu_scc_sink_init;
-				node->deinit = _starpu_scc_sink_deinit;
-				node->report_error = _starpu_scc_common_report_rcce_error;
-
-				node->mp_recv_is_ready = NULL;
-				node->mp_send = _starpu_scc_common_send;
-				node->mp_recv = _starpu_scc_common_recv;
-				node->dt_send = _starpu_scc_common_send;
-				node->dt_recv = _starpu_scc_common_recv;
-				node->dt_send_to_device = _starpu_scc_sink_send_to_device;
-				node->dt_recv_from_device = _starpu_scc_sink_recv_from_device;
-
-				node->get_kernel_from_job = NULL;
-				node->bind_thread = NULL;
-				node->execute = _starpu_scc_sink_execute;
-				node->allocate = _starpu_sink_common_allocate;
-				node->free = _starpu_sink_common_free;
-
-				node->get_nb_core = NULL;
-			}
-			break;
+		node->mp_recv_is_ready = _starpu_scc_common_recv_is_ready;
+		node->mp_send = _starpu_scc_common_send;
+		node->mp_recv = _starpu_scc_common_recv;
+		node->dt_send = _starpu_scc_common_send;
+		node->dt_recv = _starpu_scc_common_recv;
+		node->dt_send_to_device = NULL;
+		node->dt_recv_from_device = NULL;
+
+		node->get_kernel_from_job =_starpu_scc_src_get_kernel_from_job;
+		node->bind_thread = _starpu_scc_sink_bind_thread;
+		node->execute = NULL;
+		node->allocate = NULL;
+		node->free = NULL;
+
+		node->get_nb_core = NULL;
+	}
+	break;
+
+	case STARPU_SCC_SINK:
+	{
+		node->init = _starpu_scc_sink_init;
+		node->deinit = _starpu_scc_sink_deinit;
+		node->report_error = _starpu_scc_common_report_rcce_error;
+
+		node->mp_recv_is_ready = _starpu_scc_common_recv_is_ready;
+		node->mp_send = _starpu_scc_common_send;
+		node->mp_recv = _starpu_scc_common_recv;
+		node->dt_send = _starpu_scc_common_send;
+		node->dt_recv = _starpu_scc_common_recv;
+		node->dt_send_to_device = _starpu_scc_sink_send_to_device;
+		node->dt_recv_from_device = _starpu_scc_sink_recv_from_device;
+
+		node->get_kernel_from_job = NULL;
+		node->bind_thread = _starpu_scc_sink_bind_thread;
+		node->execute = _starpu_scc_sink_execute;
+		node->allocate = _starpu_sink_common_allocate;
+		node->free = _starpu_sink_common_free;
+
+		node->get_nb_core = NULL;
+	}
+	break;
 #endif /* STARPU_USE_SCC */
 
 #ifdef STARPU_USE_MPI
-		case STARPU_MPI_SOURCE:
-			STARPU_ABORT();
-			break;
+	case STARPU_MPI_SOURCE:
+		STARPU_ABORT();
+		break;
 
-		case STARPU_MPI_SINK:
-			STARPU_ABORT();
-			break;
+	case STARPU_MPI_SINK:
+		STARPU_ABORT();
+		break;
 #endif /* STARPU_USE_MPI */
 
-		default:
-			STARPU_ASSERT(0);
+	default:
+		STARPU_ASSERT(0);
 	}
 
 	/* Let's allocate the buffer, we want it to be big enough to contain

+ 19 - 19
src/drivers/mp_common/mp_common.h

@@ -41,25 +41,25 @@
 
 enum _starpu_mp_command
 {
-	STARPU_EXIT = 0x00,
-	STARPU_EXECUTE = 0x01,
-	STARPU_ERROR_EXECUTE = 0x02,
-	STARPU_LOOKUP = 0X03,
-	STARPU_ANSWER_LOOKUP = 0X04,
-	STARPU_ERROR_LOOKUP = 0X05,
-	STARPU_ALLOCATE = 0x06,
-	STARPU_ANSWER_ALLOCATE = 0x07,
-	STARPU_ERROR_ALLOCATE = 0x08,
-	STARPU_FREE = 0x09,
-	STARPU_RECV_FROM_HOST = 0x10,
-	STARPU_SEND_TO_HOST = 0x11,
-	STARPU_RECV_FROM_SINK = 0x12,
-	STARPU_SEND_TO_SINK = 0x13,
-	STARPU_TRANSFER_COMPLETE = 0x14,
-	STARPU_SINK_NBCORES = 0x15,
-	STARPU_ANSWER_SINK_NBCORES = 0x16,
-	STARPU_EXECUTION_SUBMITTED = 0x17,
-	STARPU_EXECUTION_COMPLETED = 0x18,
+	STARPU_EXIT,
+	STARPU_EXECUTE,
+	STARPU_ERROR_EXECUTE,
+	STARPU_LOOKUP,
+	STARPU_ANSWER_LOOKUP,
+	STARPU_ERROR_LOOKUP,
+	STARPU_ALLOCATE,
+	STARPU_ANSWER_ALLOCATE,
+	STARPU_ERROR_ALLOCATE,
+	STARPU_FREE,
+	STARPU_RECV_FROM_HOST,
+	STARPU_SEND_TO_HOST,
+	STARPU_RECV_FROM_SINK,
+	STARPU_SEND_TO_SINK,
+	STARPU_TRANSFER_COMPLETE,
+	STARPU_SINK_NBCORES,
+	STARPU_ANSWER_SINK_NBCORES,
+	STARPU_EXECUTION_SUBMITTED,
+	STARPU_EXECUTION_COMPLETED,
 };
 
 enum _starpu_mp_node_kind

+ 64 - 64
src/drivers/mp_common/sink_common.c

@@ -55,17 +55,17 @@ static enum _starpu_mp_node_kind _starpu_sink_common_get_kind(void)
 void
 _starpu_sink_nbcores (const struct _starpu_mp_node *node)
 {
-    // Process packet received from `_starpu_src_common_sink_cores'.
-    int nbcores = 1;
+	// Process packet received from `_starpu_src_common_sink_cores'.
+	int nbcores = 1;
 
 #ifdef STARPU_USE_MIC
-    // XXX I currently only support MIC for now.
-    if (STARPU_MIC_SINK == _starpu_sink_common_get_kind ())
-	nbcores = COISysGetCoreCount();
+	// XXX I currently only support MIC for now.
+	if (STARPU_MIC_SINK == _starpu_sink_common_get_kind ())
+		nbcores = COISysGetCoreCount();
 #endif
 
-    _starpu_mp_common_send_command (node, STARPU_ANSWER_SINK_NBCORES,
-				    &nbcores, sizeof (int));
+	_starpu_mp_common_send_command (node, STARPU_ANSWER_SINK_NBCORES,
+					&nbcores, sizeof (int));
 }
 
 
@@ -82,28 +82,28 @@ static void _starpu_sink_common_lookup(const struct _starpu_mp_node *node,
 	/* If we couldn't find the function, let's send an error to the host.
 	 * The user probably made a mistake in the name */
 	if (func)
-	    _starpu_mp_common_send_command(node, STARPU_ANSWER_LOOKUP,
+		_starpu_mp_common_send_command(node, STARPU_ANSWER_LOOKUP,
 					       &func, sizeof(func));
 	else
-	    _starpu_mp_common_send_command(node, STARPU_ERROR_LOOKUP,
+		_starpu_mp_common_send_command(node, STARPU_ERROR_LOOKUP,
 					       NULL, 0);
 }
 
 void _starpu_sink_common_allocate(const struct _starpu_mp_node *mp_node,
 				  void *arg, int arg_size)
 {
-    STARPU_ASSERT(arg_size == sizeof(size_t));
+	STARPU_ASSERT(arg_size == sizeof(size_t));
 
-    void *addr = malloc(*(size_t *)(arg));
+	void *addr = malloc(*(size_t *)(arg));
 
-    /* If the allocation fail, let's send an error to the host.
-     */
-    if (addr)
-	_starpu_mp_common_send_command(mp_node, STARPU_ANSWER_ALLOCATE,
-				       &addr, sizeof(addr));
-    else
-	_starpu_mp_common_send_command(mp_node, STARPU_ERROR_ALLOCATE,
-				       NULL, 0);
+	/* If the allocation fails, let's send an error to the host.
+	 */
+	if (addr)
+		_starpu_mp_common_send_command(mp_node, STARPU_ANSWER_ALLOCATE,
+					       &addr, sizeof(addr));
+	else
+		_starpu_mp_common_send_command(mp_node, STARPU_ERROR_ALLOCATE,
+					       NULL, 0);
 }
 
 void _starpu_sink_common_free(const struct _starpu_mp_node *mp_node STARPU_ATTRIBUTE_UNUSED,
@@ -117,43 +117,43 @@ void _starpu_sink_common_free(const struct _starpu_mp_node *mp_node STARPU_ATTRI
 static void _starpu_sink_common_copy_from_host(const struct _starpu_mp_node *mp_node,
 					       void *arg, int arg_size)
 {
-    STARPU_ASSERT(arg_size == sizeof(struct _starpu_mp_transfer_command));
+	STARPU_ASSERT(arg_size == sizeof(struct _starpu_mp_transfer_command));
 
-    struct _starpu_mp_transfer_command *cmd = (struct _starpu_mp_transfer_command *)arg;
+	struct _starpu_mp_transfer_command *cmd = (struct _starpu_mp_transfer_command *)arg;
 
-    mp_node->dt_recv(mp_node, cmd->addr, cmd->size);
+	mp_node->dt_recv(mp_node, cmd->addr, cmd->size);
 }
 
 static void _starpu_sink_common_copy_to_host(const struct _starpu_mp_node *mp_node,
 					     void *arg, int arg_size)
 {
-    STARPU_ASSERT(arg_size == sizeof(struct _starpu_mp_transfer_command));
+	STARPU_ASSERT(arg_size == sizeof(struct _starpu_mp_transfer_command));
 
-    struct _starpu_mp_transfer_command *cmd = (struct _starpu_mp_transfer_command *)arg;
+	struct _starpu_mp_transfer_command *cmd = (struct _starpu_mp_transfer_command *)arg;
 
-    mp_node->dt_send(mp_node, cmd->addr, cmd->size);
+	mp_node->dt_send(mp_node, cmd->addr, cmd->size);
 }
 
 static void _starpu_sink_common_copy_from_sink(const struct _starpu_mp_node *mp_node,
 					       void *arg, int arg_size)
 {
-    STARPU_ASSERT(arg_size == sizeof(struct _starpu_mp_transfer_command_to_device));
+	STARPU_ASSERT(arg_size == sizeof(struct _starpu_mp_transfer_command_to_device));
 
-    struct _starpu_mp_transfer_command_to_device *cmd = (struct _starpu_mp_transfer_command_to_device *)arg;
+	struct _starpu_mp_transfer_command_to_device *cmd = (struct _starpu_mp_transfer_command_to_device *)arg;
 
-    mp_node->dt_recv_from_device(mp_node, cmd->devid, cmd->addr, cmd->size);
+	mp_node->dt_recv_from_device(mp_node, cmd->devid, cmd->addr, cmd->size);
 
-    _starpu_mp_common_send_command(mp_node, STARPU_TRANSFER_COMPLETE, NULL, 0);
+	_starpu_mp_common_send_command(mp_node, STARPU_TRANSFER_COMPLETE, NULL, 0);
 }
 
 static void _starpu_sink_common_copy_to_sink(const struct _starpu_mp_node *mp_node,
 					     void *arg, int arg_size)
 {
-    STARPU_ASSERT(arg_size == sizeof(struct _starpu_mp_transfer_command_to_device));
+	STARPU_ASSERT(arg_size == sizeof(struct _starpu_mp_transfer_command_to_device));
 
-    struct _starpu_mp_transfer_command_to_device *cmd = (struct _starpu_mp_transfer_command_to_device *)arg;
+	struct _starpu_mp_transfer_command_to_device *cmd = (struct _starpu_mp_transfer_command_to_device *)arg;
 
-    mp_node->dt_send_to_device(mp_node, cmd->devid, cmd->addr, cmd->size);
+	mp_node->dt_send_to_device(mp_node, cmd->devid, cmd->addr, cmd->size);
 }
 
 /* Function looping on the sink, waiting for tasks to execute.
@@ -179,14 +179,14 @@ void _starpu_sink_common_worker(void)
 	
 	while (!exit_starpu)
 	{
-	  if(node->mp_recv_is_ready(node))
-	    {	
-	      command = _starpu_mp_common_recv_command(node, &arg, &arg_size);
-		switch(command)
-		{
-		case STARPU_EXIT:
-		  exit_starpu = 1;
-		  break;
+		if(node->mp_recv_is_ready(node))
+		{	
+			command = _starpu_mp_common_recv_command(node, &arg, &arg_size);
+			switch(command)
+			{
+			case STARPU_EXIT:
+				exit_starpu = 1;
+				break;
 			case STARPU_EXECUTE:
 				node->execute(node, arg, arg_size);
 				break;
@@ -223,18 +223,18 @@ void _starpu_sink_common_worker(void)
 
 			default:
 				printf("Oops, command %x unrecognized\n", command);
+			}
 		}
-	    }
 
 		if(!task_fifo_is_empty(&(node->dead_queue)))
-		  {
-		    struct task * task = node->dead_queue.first;
-		    //_STARPU_DEBUG("telling host that we have finished the task %p sur %d.\n", task->kernel, task->coreid);
-		    _starpu_mp_common_send_command(task->node, STARPU_EXECUTION_COMPLETED,
-								    &(task->coreid), sizeof(task->coreid));
-		    task_fifo_pop(&(node->dead_queue));
-		    free(task);
-		  }
+		{
+			struct task * task = node->dead_queue.first;
+			//_STARPU_DEBUG("telling host that we have finished the task %p sur %d.\n", task->kernel, task->coreid);
+			_starpu_mp_common_send_command(task->node, STARPU_EXECUTION_COMPLETED,
+						       &(task->coreid), sizeof(task->coreid));
+			task_fifo_pop(&(node->dead_queue));
+			free(task);
+		}
 	}
 
 	/* Deinitialize the node and release it */
@@ -247,28 +247,28 @@ void _starpu_sink_common_worker(void)
 
 static void* _starpu_sink_thread(void * thread_arg)
 {
-  struct task *arg = (struct task *)thread_arg;
+	struct task *arg = (struct task *)thread_arg;
   
-  //execute the task
-  arg->kernel(arg->interfaces,arg->cl_arg);
+	//execute the task
+	arg->kernel(arg->interfaces,arg->cl_arg);
 
-  //append the finished task to the dead queue
-  task_fifo_append(&(arg->node->dead_queue),arg);
-  pthread_exit(NULL);
+	//append the finished task to the dead queue
+	task_fifo_append(&(arg->node->dead_queue),arg);
+	pthread_exit(NULL);
 }
 
 static void _starpu_sink_execute_thread(struct task *arg)
 {
-  pthread_t thread;
-  cpu_set_t cpuset;
-  int ret;
+	pthread_t thread;
+	cpu_set_t cpuset;
+	int ret;
   
-  //create the tread
-  ret = pthread_create(&thread, NULL, _starpu_sink_thread, arg);
-  STARPU_ASSERT(ret == 0);
+	//create the thread
+	ret = pthread_create(&thread, NULL, _starpu_sink_thread, arg);
+	STARPU_ASSERT(ret == 0);
   
-  //bind the thread on the core coreid
-  arg->node->bind_thread(arg->node, &cpuset, arg->coreid, &thread);
+	//bind the thread on the core coreid
+	arg->node->bind_thread(arg->node, &cpuset, arg->coreid, &thread);
 }
 
 
@@ -280,7 +280,7 @@ static void _starpu_sink_execute_thread(struct task *arg)
  */
 
 void _starpu_sink_common_execute(const struct _starpu_mp_node *node,
-					void *arg, int arg_size)
+				 void *arg, int arg_size)
 {
 	unsigned id = 0;
 	unsigned nb_interfaces;

+ 112 - 132
src/drivers/mp_common/source_common.c

@@ -72,69 +72,70 @@ _starpu_src_common_process_completed_job (struct _starpu_worker_set *workerset,
 	return 0;
 }
 
-enum _starpu_mp_command _starpu_src_common_wait_command_sync(struct _starpu_mp_node *node, 
-							   void ** arg, int* arg_size)
+
+/* Receive a message and handle it if it is an asynchronous one.
+ * Return 0 if the message has not been handled (which most likely means it is a synchronous message).
+ * Return 1 if the message has been handled.
+ */
+static int _starpu_src_common_handle_async(const struct _starpu_mp_node *node, 
+				    void ** arg, int* arg_size, 
+				    enum _starpu_mp_command *answer)
 {
-  enum _starpu_mp_command answer;
-  int sync_commande = 0;
-  struct _starpu_worker_set * worker_set = _starpu_get_worker_struct(starpu_worker_get_id())->set;
-  
-  while(!sync_commande)
-    {
-      answer = _starpu_mp_common_recv_command(node, arg, arg_size);
-      switch(answer) 
+	struct _starpu_worker_set * worker_set = _starpu_get_worker_struct(starpu_worker_get_id())->set;
+	*answer = _starpu_mp_common_recv_command(node, arg, arg_size);
+	switch(*answer) 
 	{
 	case STARPU_EXECUTION_COMPLETED:
-	  _starpu_src_common_process_completed_job (worker_set, *arg, *arg_size);	  
-	  break;
+		_starpu_src_common_process_completed_job (worker_set, *arg, *arg_size);
+		break;
 	default:
-	  sync_commande = 1;
-	  break;
+		return 0;
+		break;
 	}
-    }
-  return answer;
+	return 1;
+}
+
+enum _starpu_mp_command _starpu_src_common_wait_command_sync(const struct _starpu_mp_node *node, 
+							     void ** arg, int* arg_size)
+{
+	enum _starpu_mp_command answer;
+	while(_starpu_src_common_handle_async(node,arg,arg_size,&answer));
+	return answer;
 }
 
 
- void _starpu_src_common_recv_async(struct _starpu_worker_set *worker_set, 
-					  struct _starpu_mp_node * baseworker_node)
+void _starpu_src_common_recv_async(struct _starpu_mp_node * baseworker_node)
 {
-  enum _starpu_mp_command answer;
-  void *arg;
-  int arg_size;
-  
-  answer = _starpu_mp_common_recv_command(baseworker_node, &arg, &arg_size);
+	enum _starpu_mp_command answer;
+	void *arg;
+	int arg_size;
   
-  switch(answer) {
-    case STARPU_EXECUTION_COMPLETED:
-      _starpu_src_common_process_completed_job (worker_set, arg, arg_size);
-      break;
-    default :
-      printf("incorrect commande: unknown command or sync command");
-      STARPU_ASSERT(0);
-      break;
-    }
+	if(!_starpu_src_common_handle_async(baseworker_node,&arg,&arg_size,&answer))
+	{
+	printf("incorrect commande: unknown command or sync command");
+	STARPU_ASSERT(0);
+	}	
 }
 
 
 int
 _starpu_src_common_sink_nbcores (const struct _starpu_mp_node *node, int *buf)
 {
-    // Send a request to the sink NODE for the number of cores on it.
+	// Send a request to the sink NODE for the number of cores on it.
 
-    enum _starpu_mp_command answer;
-    void *arg;
-    int arg_size = sizeof (int);
+	enum _starpu_mp_command answer;
+	void *arg;
+	int arg_size = sizeof (int);
 
-    _starpu_mp_common_send_command (node, STARPU_SINK_NBCORES, NULL, 0);
+	_starpu_mp_common_send_command (node, STARPU_SINK_NBCORES, NULL, 0);
 
-    answer = _starpu_mp_common_recv_command (node, &arg, &arg_size);
+	answer = _starpu_mp_common_recv_command (node, &arg, &arg_size);
 
-    STARPU_ASSERT (answer == STARPU_ANSWER_SINK_NBCORES && arg_size == sizeof (int));
+	STARPU_ASSERT (answer == STARPU_ANSWER_SINK_NBCORES && arg_size == sizeof (int));
 
-    memcpy (buf, arg, arg_size);
+	memcpy (buf, arg, arg_size);
 
-    return 0;
+	return 0;
 }
 
 /* Send a request to the sink linked to NODE for the pointer to the
@@ -157,9 +158,10 @@ int _starpu_src_common_lookup(struct _starpu_mp_node *node,
 				       arg_size);
 
 	answer = _starpu_src_common_wait_command_sync(node, (void **) &arg,
-						&arg_size);
+						      &arg_size);
 
-	if (answer == STARPU_ERROR_LOOKUP) {
+	if (answer == STARPU_ERROR_LOOKUP) 
+	{
 		_STARPU_DISP("Error looking up symbol %s\n", func_name);
 		return -ESPIPE;
 	}
@@ -176,7 +178,7 @@ int _starpu_src_common_lookup(struct _starpu_mp_node *node,
 	return 0;
 }
 
- /* Send a message to the sink to execute a kernel.
+/* Send a message to the sink to execute a kernel.
  * The message sent has the form below :
  * [Function pointer on sink, number of interfaces, interfaces
  * (union _starpu_interface), cl_arg]
@@ -194,8 +196,8 @@ int _starpu_src_common_execute_kernel(const struct _starpu_mp_node *node,
 
 	/* If the user didn't give any cl_arg, there is no need to send it */
 	buffer_size =
-	    sizeof(kernel) + sizeof(coreid) + sizeof(nb_interfaces) +
-	    nb_interfaces * sizeof(union _starpu_interface);
+		sizeof(kernel) + sizeof(coreid) + sizeof(nb_interfaces) +
+		nb_interfaces * sizeof(union _starpu_interface);
 	if (cl_arg)
 	{
 		STARPU_ASSERT(cl_arg_size);
@@ -258,9 +260,9 @@ int _starpu_src_common_execute_kernel_from_task(const struct _starpu_mp_node *no
 						void (*kernel)(void), unsigned coreid,
 						struct starpu_task *task)
 {
-    return _starpu_src_common_execute_kernel(node, kernel, coreid,
-					     task->handles, task->interfaces, task->cl->nbuffers,
-					     task->cl_arg, task->cl_arg_size);
+	return _starpu_src_common_execute_kernel(node, kernel, coreid,
+						 task->handles, task->interfaces, task->cl->nbuffers,
+						 task->cl_arg, task->cl_arg_size);
 }
 
 /* Send a request to the sink linked to the MP_NODE to allocate SIZE bytes on
@@ -270,14 +272,14 @@ int _starpu_src_common_execute_kernel_from_task(const struct _starpu_mp_node *no
  * else it returns 1 if the allocation fail.
  */
 int _starpu_src_common_allocate(const struct _starpu_mp_node *mp_node,
-								void **addr, size_t size)
+				void **addr, size_t size)
 {
 	enum _starpu_mp_command answer;
 	void *arg;
 	int arg_size;
 
 	_starpu_mp_common_send_command(mp_node, STARPU_ALLOCATE, &size,
-								   sizeof(size));
+				       sizeof(size));
 
 	answer = _starpu_mp_common_recv_command(mp_node, &arg, &arg_size);
 
@@ -285,7 +287,7 @@ int _starpu_src_common_allocate(const struct _starpu_mp_node *mp_node,
 		return 1;
 
 	STARPU_ASSERT(answer == STARPU_ANSWER_ALLOCATE &&
-				  arg_size == sizeof(*addr));
+		      arg_size == sizeof(*addr));
 
 	memcpy(addr, arg, arg_size);
 
@@ -296,7 +298,7 @@ int _starpu_src_common_allocate(const struct _starpu_mp_node *mp_node,
  * area pointed by ADDR.
  */
 void _starpu_src_common_free(const struct _starpu_mp_node *mp_node,
-							 void *addr)
+			     void *addr)
 {
 	_starpu_mp_common_send_command(mp_node, STARPU_FREE, &addr, sizeof(addr));
 }
@@ -304,7 +306,7 @@ void _starpu_src_common_free(const struct _starpu_mp_node *mp_node,
 /* Send SIZE bytes pointed by SRC to DST on the sink linked to the MP_NODE.
  */
 int _starpu_src_common_copy_host_to_sink(const struct _starpu_mp_node *mp_node,
-										 void *src, void *dst, size_t size)
+					 void *src, void *dst, size_t size)
 {
 	struct _starpu_mp_transfer_command cmd = {size, dst};
 
@@ -317,7 +319,7 @@ int _starpu_src_common_copy_host_to_sink(const struct _starpu_mp_node *mp_node,
 /* Receive SIZE bytes pointed by SRC on the sink linked to the MP_NODE and store them in DST.
  */
 int _starpu_src_common_copy_sink_to_host(const struct _starpu_mp_node *mp_node,
-										 void *src, void *dst, size_t size)
+					 void *src, void *dst, size_t size)
 {
 	struct _starpu_mp_transfer_command cmd = {size, src};
 
@@ -331,7 +333,7 @@ int _starpu_src_common_copy_sink_to_host(const struct _starpu_mp_node *mp_node,
  * to the sink linked to DST_NODE. The latter store them in DST.
  */
 int _starpu_src_common_copy_sink_to_sink(const struct _starpu_mp_node *src_node,
-		const struct _starpu_mp_node *dst_node, void *src, void *dst, size_t size)
+					 const struct _starpu_mp_node *dst_node, void *src, void *dst, size_t size)
 {
 	enum _starpu_mp_command answer;
 	void *arg;
@@ -474,31 +476,16 @@ static int _starpu_src_common_execute_job(struct _starpu_job *j,
 					  struct _starpu_worker *worker, 
 					  struct _starpu_mp_node * node)
 {
-
-  /*#################### */
-  /*#################### */
-  /* TODO */
-  /*calibrate_model*/
-  /*#################### */
-  /*#################### */
-
-
-	int ret;
+        int ret;
 	uint32_t mask = 0;
 
 	STARPU_ASSERT(j);
 	struct starpu_task *task = j->task;
 
 	int profiling = starpu_profiling_status_get();
-	unsigned calibrate_model = 0;
 
 	STARPU_ASSERT(task);
-	struct starpu_codelet *cl = task->cl;
-	STARPU_ASSERT(cl);
-
-	if (cl->model && cl->model->benchmarking)
-		calibrate_model = 1;
-
+	
 	ret = _starpu_fetch_task_input(j, mask);
 	if (ret != 0)
 	{
@@ -523,75 +510,68 @@ void _starpu_src_common_worker(struct _starpu_worker_set * worker_set,
 			       unsigned baseworkerid, 
 			       struct _starpu_mp_node * mp_node)
 { 
-  struct _starpu_worker * baseworker = &worker_set->workers[baseworkerid];
-  unsigned memnode = baseworker->memory_node;
-  struct starpu_task **tasks = malloc(sizeof(struct starpu_task *)*worker_set->nworkers);
+	struct _starpu_worker * baseworker = &worker_set->workers[baseworkerid];
+	unsigned memnode = baseworker->memory_node;
+	struct starpu_task **tasks = malloc(sizeof(struct starpu_task *)*worker_set->nworkers);
  
-  /*main loop*/
-  while (_starpu_machine_is_running())
-    {
-      int res;
-      struct _starpu_job * j;
-
-      _STARPU_TRACE_START_PROGRESS(memnode);
-      _starpu_datawizard_progress(memnode, 1);
-      _STARPU_TRACE_END_PROGRESS(memnode);
-
-      STARPU_PTHREAD_MUTEX_LOCK(&baseworker->sched_mutex);
-
-      /* get task for each worker*/
-      res = _starpu_get_multi_worker_task(worker_set->workers, tasks, worker_set->nworkers);
-      STARPU_PTHREAD_MUTEX_UNLOCK(&baseworker->sched_mutex);
-
-
-      /* poll the device for completed jobs.*/
-      if (mp_node->mp_recv_is_ready(mp_node)){
-	//_STARPU_DEBUG(" recv_async\n");
-	_starpu_src_common_recv_async(worker_set,mp_node);
-      }
-      /*if at least one worker have pop a task*/
-      if(res != 0)
+	/*main loop*/
+	while (_starpu_machine_is_running())
 	{
-	  unsigned i;
-	  _STARPU_DEBUG(" nb_tasks:%d\n", res);
-	  for(i=0; i<worker_set->nworkers; i++)
-	    {
-	      if(tasks[i] != NULL)
+		int res;
+		struct _starpu_job * j;
+
+		_STARPU_TRACE_START_PROGRESS(memnode);
+		_starpu_datawizard_progress(memnode, 1);
+		_STARPU_TRACE_END_PROGRESS(memnode);
+
+		/* poll the device for completed jobs.*/
+		if (mp_node->mp_recv_is_ready(mp_node))
+			_starpu_src_common_recv_async(mp_node);
+		
+		/* get task for each worker*/
+		res = _starpu_get_multi_worker_task(worker_set->workers, tasks, worker_set->nworkers);
+
+		/*if at least one worker have pop a task*/
+		if(res != 0)
 		{
-		  //_STARPU_DEBUG(" exec deb\n");
-		  j = _starpu_get_job_associated_to_task(tasks[i]);
-			
+			unsigned i;
+			_STARPU_DEBUG(" nb_tasks:%d\n", res);
+			for(i=1; i<worker_set->nworkers; i++)
+			{
+				if(tasks[i] != NULL)
+				{
+					j = _starpu_get_job_associated_to_task(tasks[i]);
 			
-		  worker_set->workers[i].current_task = j->task;
+					worker_set->workers[i].current_task = j->task;
 
-		  res =  _starpu_src_common_execute_job(j, &worker_set->workers[i], mp_node);
+					res =  _starpu_src_common_execute_job(j, &worker_set->workers[i], mp_node);
 		
-		  if (res)
-		    {
-		      switch (res)
-			{
-			case -EAGAIN:
-			  _STARPU_DISP("ouch, Xeon Phi could not actually run task %p, putting it back...\n", tasks[i]);
-			  _starpu_push_task_to_workers(tasks[i]);
-			  STARPU_ABORT();
-			  continue;
-			  break;
-			default:
-			  STARPU_ASSERT(0);
+					if (res)
+					{
+						switch (res)
+						{
+						case -EAGAIN:
+							_STARPU_DISP("ouch, Xeon Phi could not actually run task %p, putting it back...\n", tasks[i]);
+							_starpu_push_task_to_workers(tasks[i]);
+							STARPU_ABORT();
+							continue;
+							break;
+						default:
+							STARPU_ASSERT(0);
+						}
+					}
+					//_STARPU_DEBUG(" exec fin\n");
+				}
 			}
-		    }
-		  //_STARPU_DEBUG(" exec fin\n");
 		}
-	    }
 	}
-    }
-  free(tasks);
+	free(tasks);
 
-  _starpu_handle_all_pending_node_data_requests(memnode);
+	_starpu_handle_all_pending_node_data_requests(memnode);
 
-  /* In case there remains some memory that was automatically
-   * allocated by StarPU, we release it now. Note that data
-   * coherency is not maintained anymore at that point ! */
-  _starpu_free_all_automatically_allocated_buffers(memnode);
+	/* In case there remains some memory that was automatically
+	 * allocated by StarPU, we release it now. Note that data
+	 * coherency is not maintained anymore at that point ! */
+	_starpu_free_all_automatically_allocated_buffers(memnode);
 
 }

+ 8 - 0
src/drivers/scc/driver_scc_common.c

@@ -172,3 +172,11 @@ void _starpu_scc_common_report_rcce_error(const char *func, const char *file, co
 	fprintf(stderr, "RCCE error in %s (%s:%d): %s\n", func, file, line, error_string); 
 	STARPU_ABORT();
 }
+
+int _starpu_scc_common_recv_is_ready(const struct _starpu_mp_node *mp_node)
+{
+  /***********
+      TODO
+  ************/
+  STARPU_ASSERT(0);
+}

+ 2 - 0
src/drivers/scc/driver_scc_common.h

@@ -44,6 +44,8 @@ void _starpu_scc_common_recv(const struct _starpu_mp_node *node, void *msg, int
 
 void _starpu_scc_common_report_rcce_error(const char *func, const char *file, const int line, const int err_no);
 
+int _starpu_scc_common_recv_is_ready(const struct _starpu_mp_node *mp_node);
+
 #endif /* STARPU_USE_SCC */
 
 

+ 9 - 0
src/drivers/scc/driver_scc_sink.c

@@ -51,6 +51,15 @@ void _starpu_scc_sink_recv_from_device(const struct _starpu_mp_node *node, int s
 		STARPU_MP_COMMON_REPORT_ERROR(node, ret);
 }
 
+void _starpu_scc_sink_bind_thread(const struct _starpu_mp_node *mp_node STARPU_ATTRIBUTE_UNUSED, cpu_set_t * cpuset, int coreid, pthread_t *thread)
+{
+  /****************
+        TODO
+  ****************/
+  STARPU_ASSERT(0);
+}
+
+
 /* arg -> [Function pointer on sink, number of interfaces, interfaces
  * (union _starpu_interface), cl_arg]
  *

+ 2 - 0
src/drivers/scc/driver_scc_sink.h

@@ -30,6 +30,8 @@ void _starpu_scc_sink_deinit(struct _starpu_mp_node *node);
 void _starpu_scc_sink_send_to_device(const struct _starpu_mp_node *node, int dst_devid, void *msg, int len);
 void _starpu_scc_sink_recv_from_device(const struct _starpu_mp_node *node, int src_devid, void *msg, int len);
 
+void _starpu_scc_sink_bind_thread(const struct _starpu_mp_node *mp_node STARPU_ATTRIBUTE_UNUSED, cpu_set_t * cpuset, int coreid, pthread_t *thread);
+
 void _starpu_scc_sink_execute(const struct _starpu_mp_node *node, void *arg, int arg_size);
 
 #endif /* STARPU_USE_SCC */

+ 1 - 0
tests/microbenchs/async_tasks_overhead.c

@@ -31,6 +31,7 @@ static double cumulated_pop = 0.0;
 
 void dummy_func(void *descr[] STARPU_ATTRIBUTE_UNUSED, void *arg STARPU_ATTRIBUTE_UNUSED)
 {
+  usleep(3000);
 }
 
 static struct starpu_codelet dummy_codelet =