Thibaud Lambert 12 anni fa
parent
commit
417631bc44

+ 49 - 169
src/drivers/mic/driver_mic_source.c

@@ -110,95 +110,6 @@ void _starpu_mic_clear_kernels(void)
 	}
 }
 
-static int
-_starpu_mic_src_finalize_job (struct _starpu_job *j, struct _starpu_worker *worker)
-{
-	uint32_t mask = 0;
-	int profiling = starpu_profiling_status_get();
-	struct timespec codelet_end;
-
-	_starpu_driver_end_job(worker, j, worker->perf_arch, &codelet_end, 0,
-			       profiling);
-
-	_starpu_driver_update_job_feedback(j, worker, worker->perf_arch,
-					   &j->cl_start, &codelet_end,
-					   profiling);
-
-	_starpu_push_task_output (j, mask);
-
-	_starpu_handle_job_termination(j);
-
-	return 0;
-}
-
-static int
-_starpu_mic_src_process_completed_job (struct _starpu_worker_set *workerset)
-{
-	struct _starpu_mp_node *node = mic_nodes[workerset->workers[0].mp_nodeid];
-	enum _starpu_mp_command answer;
-	void *arg;
-	int arg_size;
-
-	answer = _starpu_mp_common_recv_command (node, &arg, &arg_size);
-	STARPU_ASSERT (answer == STARPU_EXECUTION_COMPLETED);
-
-	void *arg_ptr = arg;
-	int coreid;
-
-	coreid = *(int *) arg_ptr;
-	arg_ptr += sizeof (coreid); // Useless.
-
-	struct _starpu_worker *worker = &workerset->workers[coreid];
-	struct starpu_task *task = worker->current_task;
-	struct _starpu_job *j = _starpu_get_job_associated_to_task (task);
-
-	_starpu_mic_src_finalize_job (j, worker);
-	worker->current_task = NULL;
-
-	return 0;
-}
-
-
-static int _starpu_mic_src_execute_job(struct _starpu_job *j, struct _starpu_worker *args)
-{
-	int ret;
-	uint32_t mask = 0;
-
-	STARPU_ASSERT(j);
-	struct starpu_task *task = j->task;
-
-	//struct timespec codelet_end;
-
-	int profiling = starpu_profiling_status_get();
-	unsigned calibrate_model = 0;
-
-	STARPU_ASSERT(task);
-	struct starpu_codelet *cl = task->cl;
-	STARPU_ASSERT(cl);
-
-	if (cl->model && cl->model->benchmarking)
-		calibrate_model = 1;
-
-	ret = _starpu_fetch_task_input(j, mask);
-	if (ret != 0)
-	{
-		/* there was not enough memory, so the input of
-		 * the codelet cannot be fetched ... put the
-		 * codelet back, and try it later */
-		return -EAGAIN;
-	}
-
-
-	starpu_mic_kernel_t kernel = _starpu_mic_src_get_kernel_from_codelet(j->task->cl, j->nimpl);
-
-	_starpu_driver_start_job (args, j, &j->cl_start, 0, profiling);
-
-	_starpu_src_common_execute_kernel_from_task(mic_nodes[args->mp_nodeid],
-						    (void (*)(void)) kernel, args->devid, task);
-
-	return 0;
-}
-
 int _starpu_mic_src_register_kernel(starpu_mic_func_symbol_t *symbol, const char *func_name)
 {
 	unsigned int func_name_size = (strlen(func_name) + 1) * sizeof(char);
@@ -247,9 +158,11 @@ int _starpu_mic_src_register_kernel(starpu_mic_func_symbol_t *symbol, const char
 	return 0;
 }
 
+
 starpu_mic_kernel_t _starpu_mic_src_get_kernel(starpu_mic_func_symbol_t symbol)
 {
 	int workerid = starpu_worker_get_id();
+	
 	/* This function has to be called in the codelet only, by the thread
 	 * which will handle the task */
 	if (workerid < 0)
@@ -364,6 +277,43 @@ starpu_mic_kernel_t _starpu_mic_src_get_kernel_from_codelet(struct starpu_codele
 	return kernel;
 }
 
+
+
+/* Return the device kernel to run for job j, as a generic function pointer.
+ *
+ * The MIC implementation number j->nimpl of the codelet is tried first: when
+ * the codelet provides one, it is called and its return value is the kernel.
+ * Otherwise the CPU function name of that implementation, if any, is
+ * registered with _starpu_mic_src_register_kernel() and resolved through
+ * _starpu_mic_src_get_kernel().
+ *
+ * STARPU_ASSERT()s that a kernel was found. The node argument is not used.
+ */
+void(* _starpu_mic_src_get_kernel_from_job(const struct _starpu_mp_node *node STARPU_ATTRIBUTE_UNUSED, struct _starpu_job *j))(void)
+{
+	starpu_mic_kernel_t device_kernel = NULL;
+	starpu_mic_func_t mic_func = _starpu_task_get_mic_nth_implementation(j->task->cl, j->nimpl);
+
+	if (mic_func != NULL)
+	{
+		/* The codelet function itself hands back the pointer to execute
+		 * on the device. */
+		device_kernel = mic_func();
+	}
+	else
+	{
+		/* No MIC function in the codelet: fall back on the CPU function
+		 * name. */
+		char *cpu_name = _starpu_task_get_cpu_name_nth_implementation(j->task->cl, j->nimpl);
+
+		if (cpu_name != NULL)
+		{
+			starpu_mic_func_symbol_t sym;
+
+			_starpu_mic_src_register_kernel(&sym, cpu_name);
+			device_kernel = _starpu_mic_src_get_kernel(sym);
+		}
+	}
+
+	STARPU_ASSERT(device_kernel);
+
+	return (void (*)(void)) device_kernel;
+}
+
+
+
 /* Initialize the node structure describing the MIC source.
  */
 void _starpu_mic_src_init(struct _starpu_mp_node *node)
@@ -552,18 +502,20 @@ int _starpu_mic_request_is_complete(struct _starpu_mic_async_event *event)
 	return 1;
 }
 
+
+
 void *_starpu_mic_src_worker(void *arg)
 {
-	struct _starpu_worker_set *args = arg;
+	struct _starpu_worker_set *worker_set = arg;
 	/* As all workers of a set share common data, we just use the first
 	 * one for intializing the following stuffs. */
-	struct _starpu_worker *baseworker = &args->workers[0];
+	struct _starpu_worker *baseworker = &worker_set->workers[0];
 	struct _starpu_machine_config *config = baseworker->config;
 	unsigned baseworkerid = baseworker - config->workers;
 	unsigned mp_nodeid = baseworker->mp_nodeid;
 	unsigned i;
 
-	unsigned memnode = baseworker->memory_node;
+	/* unsigned memnode = baseworker->memory_node; */
 
 	_starpu_worker_init(baseworker, _STARPU_FUT_MIC_KEY);
 
@@ -581,87 +533,15 @@ void *_starpu_mic_src_worker(void *arg)
 	_STARPU_TRACE_WORKER_INIT_END;
 
 	/* tell the main thread that this one is ready */
-	STARPU_PTHREAD_MUTEX_LOCK(&args->mutex);
-	args->set_is_initialized = 1;
-	STARPU_PTHREAD_COND_SIGNAL(&args->ready_cond);
-	STARPU_PTHREAD_MUTEX_UNLOCK(&args->mutex);
+	STARPU_PTHREAD_MUTEX_LOCK(&worker_set->mutex);
+	worker_set->set_is_initialized = 1;
+	STARPU_PTHREAD_COND_SIGNAL(&worker_set->ready_cond);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&worker_set->mutex);
 
-	struct starpu_task **tasks = malloc(sizeof(struct starpu_task *)*args->nworkers);
-	
-	/*main loop*/
-	while (_starpu_machine_is_running())
-	  {
-	    int res;
-	    struct _starpu_job * j;
-
-	    _STARPU_TRACE_START_PROGRESS(memnode);
-	    _starpu_datawizard_progress(memnode, 1);
-	    _STARPU_TRACE_END_PROGRESS(memnode);
-
-	    STARPU_PTHREAD_MUTEX_LOCK(&baseworker->sched_mutex);
-
-	    /* get task for each worker*/
-	    res = _starpu_get_multi_worker_task(args->workers, tasks, args->nworkers);
-	    STARPU_PTHREAD_MUTEX_UNLOCK(&baseworker->sched_mutex);
-
-
-	    /* poll the MIC device for completed jobs.*/
-	    if (_starpu_mic_common_recv_is_ready(mic_nodes[args->workers[0].mp_nodeid]))
-	      _starpu_mic_src_process_completed_job (args);
-	   	    
-
-	    /*if at least one worker have pop a task*/
-	    if(res != 0)
-	      {
-		//printf("\n nb_tasks:%d\n", res);
-		_STARPU_DEBUG("\n nb_tasks:%d\n", res);
-		for(i=0; i<args->nworkers; i++)
-		  {
-		    if(tasks[i] != NULL)
-		      {
-			j = _starpu_get_job_associated_to_task(tasks[i]);
-
-			/* can a MIC device do that task ? */
-			if (!_STARPU_MIC_MAY_PERFORM(j))
-			  {
-			    /* this isn't a mic task */
-			    _starpu_push_task_to_workers(tasks[i]);
-			    continue;
-			  }
-
-			args->workers[i].current_task = j->task;
-
-			res = _starpu_mic_src_execute_job (j, &args->workers[i]);
-		
-			if (res)
-			  {
-			    switch (res)
-			      {
-			      case -EAGAIN:
-				_STARPU_DISP("ouch, Xeon Phi could not actually run task %p, putting it back...\n", tasks[i]);
-				_starpu_push_task_to_workers(tasks[i]);
-				STARPU_ABORT();
-				continue;
-			      default:
-				STARPU_ASSERT(0);
-			      }
-			  }
-		      }
-		  }
-	      }
-	  }
-
-	free(tasks);
+	_starpu_src_common_worker(worker_set, baseworkerid, mic_nodes[mp_nodeid]);
 
 	_STARPU_TRACE_WORKER_DEINIT_START;
 
-	_starpu_handle_all_pending_node_data_requests(memnode);
-
-	/* In case there remains some memory that was automatically
-	 * allocated by StarPU, we release it now. Note that data
-	 * coherency is not maintained anymore at that point ! */
-	_starpu_free_all_automatically_allocated_buffers(memnode);
-
 	_STARPU_TRACE_WORKER_DEINIT_END(_STARPU_FUT_CUDA_KEY);
 
 	return NULL;

+ 2 - 0
src/drivers/mic/driver_mic_source.h

@@ -24,6 +24,7 @@
 
 #include <source/COIProcess_source.h>
 #include <source/COIEngine_source.h>
+#include <core/workers.h>
 
 #include <drivers/mp_common/mp_common.h>
 
@@ -44,6 +45,7 @@ struct _starpu_mic_async_event *event;
 const struct _starpu_mp_node *_starpu_mic_src_get_actual_thread_mp_node();
 const struct _starpu_mp_node *_starpu_mic_src_get_mp_node_from_memory_node(int memory_node);
 
+void(* _starpu_mic_src_get_kernel_from_job(const struct _starpu_mp_node *node STARPU_ATTRIBUTE_UNUSED, struct _starpu_job *j))(void);
 int _starpu_mic_src_register_kernel(starpu_mic_func_symbol_t *symbol, const char *func_name);
 starpu_mic_kernel_t _starpu_mic_src_get_kernel(starpu_mic_func_symbol_t symbol);
 

+ 5 - 1
src/drivers/mp_common/mp_common.c

@@ -56,12 +56,13 @@ struct _starpu_mp_node * STARPU_ATTRIBUTE_MALLOC
 				node->deinit = _starpu_mic_src_deinit;
 				node->report_error = _starpu_mic_src_report_scif_error;
 
-				node->mp_recv_is_ready = NULL;
+				node->mp_recv_is_ready = _starpu_mic_common_recv_is_ready;
 				node->mp_send = _starpu_mic_common_send;
 				node->mp_recv = _starpu_mic_common_recv;
 				node->dt_send = _starpu_mic_common_dt_send;
 				node->dt_recv = _starpu_mic_common_dt_recv;
 
+				node->get_kernel_from_job =_starpu_mic_src_get_kernel_from_job;
 				node->bind_thread = NULL;
 				node->execute = NULL;
 				node->nbcores = NULL;
@@ -89,6 +90,7 @@ struct _starpu_mp_node * STARPU_ATTRIBUTE_MALLOC
 				node->dt_send = _starpu_mic_common_dt_send;
 				node->dt_recv = _starpu_mic_common_dt_recv;
 
+				node->get_kernel_from_job = NULL;
 				node->bind_thread = _starpu_mic_sink_bind_thread;
 				node->execute = _starpu_sink_common_execute;
 				node->nbcores = _starpu_sink_nbcores;
@@ -115,6 +117,7 @@ struct _starpu_mp_node * STARPU_ATTRIBUTE_MALLOC
 				node->dt_send_to_device = NULL;
 				node->dt_recv_from_device = NULL;
 
+				node->get_kernel_from_job =_starpu_scc_src_get_kernel_from_job;
 				node->bind_thread = NULL;
 				node->execute = NULL;
 				node->allocate = NULL;
@@ -138,6 +141,7 @@ struct _starpu_mp_node * STARPU_ATTRIBUTE_MALLOC
 				node->dt_send_to_device = _starpu_scc_sink_send_to_device;
 				node->dt_recv_from_device = _starpu_scc_sink_recv_from_device;
 
+				node->get_kernel_from_job = NULL;
 				node->bind_thread = NULL;
 				node->execute = _starpu_scc_sink_execute;
 				node->allocate = _starpu_sink_common_allocate;

+ 1 - 0
src/drivers/mp_common/mp_common.h

@@ -158,6 +158,7 @@ struct _starpu_mp_node
 	void (*dt_send_to_device)(const struct _starpu_mp_node *, int, void *, int);
 	void (*dt_recv_from_device)(const struct _starpu_mp_node *, int, void *, int);
 
+  void (*(*get_kernel_from_job)(const struct _starpu_mp_node *,struct _starpu_job *))(void);
   void (*bind_thread)(const struct _starpu_mp_node *, cpu_set_t *,int, pthread_t *);
 	void (*execute)(const struct _starpu_mp_node *, void *, int);
 	void (*nbcores)(const struct _starpu_mp_node *);

+ 2 - 11
src/drivers/mp_common/sink_common.c

@@ -77,24 +77,16 @@ static void _starpu_sink_common_lookup(const struct _starpu_mp_node *node,
 	void *dl_handle = dlopen(NULL, RTLD_NOW);
 	func = dlsym(dl_handle, func_name);
 	
-	printf("Looked up %s, got %p\n", func_name, func);
-	_STARPU_DEBUG("Looked up %s, got %p\n", func_name, func);
+	//_STARPU_DEBUG("Looked up %s, got %p\n", func_name, func);
 
 	/* If we couldn't find the function, let's send an error to the host.
 	 * The user probably made a mistake in the name */
 	if (func)
-	  {
-	    printf("\n LOOL UP OK \n");
 	    _starpu_mp_common_send_command(node, STARPU_ANSWER_LOOKUP,
 					       &func, sizeof(func));
-	
-	  }
 	else
-	  {
-	    printf("\n LOOL UP FAIL \n");
 	    _starpu_mp_common_send_command(node, STARPU_ERROR_LOOKUP,
 					       NULL, 0);
-	  }
 }
 
 void _starpu_sink_common_allocate(const struct _starpu_mp_node *mp_node,
@@ -237,7 +229,7 @@ void _starpu_sink_common_worker(void)
 		if(!task_fifo_is_empty(&(node->dead_queue)))
 		  {
 		    struct task * task = node->dead_queue.first;
-		    _STARPU_DEBUG("telling host that we have finished the task %p sur %d.\n", task->kernel, task->coreid);
+		    //_STARPU_DEBUG("telling host that we have finished the task %p sur %d.\n", task->kernel, task->coreid);
 		    _starpu_mp_common_send_command(task->node, STARPU_EXECUTION_COMPLETED,
 								    &(task->coreid), sizeof(task->coreid));
 		    task_fifo_pop(&(node->dead_queue));
@@ -267,7 +259,6 @@ static void* _starpu_sink_thread(void * thread_arg)
 
 static void _starpu_sink_execute_thread(struct task *arg)
 {
-  int j;
   pthread_t thread;
   cpu_set_t cpuset;
   int ret;

+ 231 - 11
src/drivers/mp_common/source_common.c

@@ -19,10 +19,104 @@
 #include <pthread.h>
 
 #include <starpu.h>
+#include <core/task.h>
+#include <core/sched_policy.h>
+
+#include <drivers/driver_common/driver_common.h>
+
+
 #include <datawizard/coherency.h>
 #include <datawizard/interfaces/data_interface.h>
 #include <drivers/mp_common/mp_common.h>
 
+
+/* End-of-job sequence for job j, which was started on behalf of worker.
+ *
+ * Calls, in order: _starpu_driver_end_job(), _starpu_driver_update_job_feedback()
+ * (with j->cl_start and the end timestamp obtained from the previous call),
+ * _starpu_push_task_output() with a zero mask, and
+ * _starpu_handle_job_termination().
+ *
+ * Always returns 0.
+ */
+static int
+_starpu_src_common_finalize_job (struct _starpu_job *j, struct _starpu_worker *worker)
+{
+	struct timespec end_time;
+	const uint32_t output_mask = 0;
+	const int profiling_enabled = starpu_profiling_status_get();
+
+	_starpu_driver_end_job(worker, j, worker->perf_arch, &end_time, 0, profiling_enabled);
+	_starpu_driver_update_job_feedback(j, worker, worker->perf_arch, &j->cl_start, &end_time, profiling_enabled);
+
+	_starpu_push_task_output(j, output_mask);
+	_starpu_handle_job_termination(j);
+
+	return 0;
+}
+
+
+
+/* Handle a STARPU_EXECUTION_COMPLETED notification.
+ *
+ * arg points to the payload of the command; its first int is the index, in
+ * workerset->workers, of the worker whose current task has completed. That
+ * task's job is finalized and the worker's current_task is reset to NULL.
+ *
+ * The index comes from the device, so it is validated before being used to
+ * index the worker array. arg_size is not inspected.
+ *
+ * Always returns 0.
+ */
+static int
+_starpu_src_common_process_completed_job (struct _starpu_worker_set *workerset, void * arg, int arg_size STARPU_ATTRIBUTE_UNUSED)
+{
+	STARPU_ASSERT(arg);
+
+	int coreid = *(int *) arg;
+
+	/* Never index workers[] with an unchecked value received from the sink. */
+	STARPU_ASSERT(coreid >= 0 && (unsigned) coreid < workerset->nworkers);
+
+	struct _starpu_worker *worker = &workerset->workers[coreid];
+	struct starpu_task *task = worker->current_task;
+
+	/* A completion for a worker that has nothing running is a protocol error. */
+	STARPU_ASSERT(task);
+
+	struct _starpu_job *j = _starpu_get_job_associated_to_task (task);
+
+	_starpu_src_common_finalize_job (j, worker);
+	worker->current_task = NULL;
+
+	return 0;
+}
+
+/* Receive commands from node until one that is not
+ * STARPU_EXECUTION_COMPLETED arrives, and return that command.
+ *
+ * Every STARPU_EXECUTION_COMPLETED received in the meantime is passed to
+ * _starpu_src_common_process_completed_job() for the worker set of the
+ * calling worker. On return, *arg and *arg_size are those filled by
+ * _starpu_mp_common_recv_command() for the returned command.
+ */
+enum _starpu_mp_command _starpu_src_common_wait_command_sync(struct _starpu_mp_node *node, 
+							   void ** arg, int* arg_size)
+{
+	struct _starpu_worker_set *set = _starpu_get_worker_struct(starpu_worker_get_id())->set;
+
+	for (;;)
+	{
+		enum _starpu_mp_command cmd = _starpu_mp_common_recv_command(node, arg, arg_size);
+
+		if (cmd != STARPU_EXECUTION_COMPLETED)
+			return cmd;
+
+		/* Completion notification interleaved with the awaited answer. */
+		_starpu_src_common_process_completed_job (set, *arg, *arg_size);
+	}
+}
+
+
+/* Receive one command from baseworker_node and handle it as an
+ * asynchronous notification.
+ *
+ * STARPU_EXECUTION_COMPLETED is forwarded to
+ * _starpu_src_common_process_completed_job() for worker_set. Any other
+ * command is reported on stderr and triggers STARPU_ASSERT(0).
+ */
+void _starpu_src_common_recv_async(struct _starpu_worker_set *worker_set, 
+					  struct _starpu_mp_node * baseworker_node)
+{
+	void *arg;
+	int arg_size;
+	enum _starpu_mp_command answer = _starpu_mp_common_recv_command(baseworker_node, &arg, &arg_size);
+
+	switch (answer)
+	{
+	case STARPU_EXECUTION_COMPLETED:
+		_starpu_src_common_process_completed_job (worker_set, arg, arg_size);
+		break;
+	default:
+		/* Written to stderr and newline-terminated so that the message
+		 * is not left in a stdio buffer when the assertion fires. */
+		fprintf(stderr, "incorrect command %d: unknown command or sync command\n", (int) answer);
+		STARPU_ASSERT(0);
+		break;
+	}
+}
+
+
 int
 _starpu_src_common_sink_nbcores (const struct _starpu_mp_node *node, int *buf)
 {
@@ -61,11 +155,9 @@ int _starpu_src_common_lookup(struct _starpu_mp_node *node,
 	//_STARPU_DEBUG("Looking up %s\n", func_name);
 	_starpu_mp_common_send_command(node, STARPU_LOOKUP, (void *) func_name,
 				       arg_size);
-	answer = _starpu_mp_common_recv_command(node, (void **) &arg,
-						&arg_size);
 
-	//	printf("\n\n\n answer:%d\n\n", (int)answer);
-	
+	answer = _starpu_src_common_wait_command_sync(node, (void **) &arg,
+						&arg_size);
 
 	if (answer == STARPU_ERROR_LOOKUP) {
 		_STARPU_DISP("Error looking up symbol %s\n", func_name);
@@ -144,11 +236,11 @@ int _starpu_src_common_execute_kernel(const struct _starpu_mp_node *node,
 		memcpy(buffer_ptr, cl_arg, cl_arg_size);
 
 	_starpu_mp_common_send_command(node, STARPU_EXECUTE, buffer, buffer_size);
-	enum _starpu_mp_command answer = _starpu_mp_common_recv_command(node, &arg, &arg_size);
+	enum _starpu_mp_command answer = _starpu_src_common_wait_command_sync(node, &arg, &arg_size);
 
 	if (answer == STARPU_ERROR_EXECUTE)
 		return -EINVAL;
-
+	
 	STARPU_ASSERT(answer == STARPU_EXECUTION_SUBMITTED);
 
 	free(buffer);
@@ -268,8 +360,8 @@ int _starpu_src_common_copy_sink_to_sink(const struct _starpu_mp_node *src_node,
 /* 5 functions to determine the executable to run on the device (MIC, SCC,
  * MPI).
  */
-static void _starpu_src_common_cat_3(char *final, const char *first, const char *second,
-										  const char *third)
+static void _starpu_src_common_cat_3(char *final, const char *first, 
+				     const char *second, const char *third)
 {
 	strcpy(final, first);
 	strcat(final, second);
@@ -307,9 +399,9 @@ static int _starpu_src_common_test_suffixes(char *located_file_name, const char
 }
 
 int _starpu_src_common_locate_file(char *located_file_name,
-							const char *env_file_name, const char *env_mic_path,
-							const char *config_file_name, const char *actual_file_name,
-							const char **suffixes)
+				   const char *env_file_name, const char *env_mic_path,
+				   const char *config_file_name, const char *actual_file_name,
+				   const char **suffixes)
 {
 	if (env_file_name != NULL)
 	{
@@ -375,3 +467,131 @@ int _starpu_src_common_locate_file(char *located_file_name,
 
 	return 1;
 }
+
+ 
+
+/* Start job j on the device behind node, on behalf of worker.
+ *
+ * Fetches the task input, asks the node for the kernel through its
+ * get_kernel_from_job callback, records the job start in j->cl_start via
+ * _starpu_driver_start_job() and hands the kernel to
+ * _starpu_src_common_execute_kernel_from_task() with worker->devid.
+ *
+ * Returns 0 after the kernel has been handed over, or -EAGAIN when
+ * _starpu_fetch_task_input() fails (the caller is expected to put the task
+ * back).
+ *
+ * TODO: calibration (cl->model->benchmarking) is not taken into account yet.
+ */
+static int _starpu_src_common_execute_job(struct _starpu_job *j, 
+					  struct _starpu_worker *worker, 
+					  struct _starpu_mp_node * node)
+{
+	STARPU_ASSERT(j);
+	struct starpu_task *task = j->task;
+
+	STARPU_ASSERT(task);
+	STARPU_ASSERT(task->cl);
+
+	/* mp_common.c leaves this callback NULL for sink nodes; only source
+	 * nodes can resolve a kernel. */
+	STARPU_ASSERT(node->get_kernel_from_job);
+
+	int profiling = starpu_profiling_status_get();
+	uint32_t mask = 0;
+
+	if (_starpu_fetch_task_input(j, mask) != 0)
+	{
+		/* there was not enough memory, so the input of
+		 * the codelet cannot be fetched ... put the
+		 * codelet back, and try it later */
+		return -EAGAIN;
+	}
+
+	void (*kernel)(void) = node->get_kernel_from_job(node, j);
+
+	_starpu_driver_start_job(worker, j, &j->cl_start, 0, profiling);
+
+	_starpu_src_common_execute_kernel_from_task(node, kernel,
+						    worker->devid, task);
+
+	return 0;
+}
+
+
+/* Main loop of a source-side driver thread handling a whole worker set.
+ *
+ * While _starpu_machine_is_running(), each iteration:
+ *   - makes the datawizard progress on the memory node of the set;
+ *   - fetches tasks for the workers of the set, under the sched_mutex of
+ *     the first worker;
+ *   - if mp_node has something to receive, handles it with
+ *     _starpu_src_common_recv_async();
+ *   - starts every fetched task through _starpu_src_common_execute_job().
+ *
+ * After the loop, pending data requests of the memory node are handled and
+ * automatically allocated buffers are freed.
+ *
+ * baseworkerid is kept for interface compatibility but is not used: the
+ * caller visible here (_starpu_mic_src_worker) computes it as an index into
+ * config->workers, which is not a valid index into worker_set->workers.
+ */
+void _starpu_src_common_worker(struct _starpu_worker_set * worker_set, 
+			       unsigned baseworkerid, 
+			       struct _starpu_mp_node * mp_node)
+{ 
+	(void) baseworkerid;
+
+	/* All workers of a set share common data: use the first one. */
+	struct _starpu_worker *baseworker = &worker_set->workers[0];
+	unsigned memnode = baseworker->memory_node;
+	struct starpu_task **tasks = malloc(sizeof(*tasks) * worker_set->nworkers);
+
+	STARPU_ASSERT(tasks);
+
+	/*main loop*/
+	while (_starpu_machine_is_running())
+	{
+		int res;
+		unsigned i;
+
+		_STARPU_TRACE_START_PROGRESS(memnode);
+		_starpu_datawizard_progress(memnode, 1);
+		_STARPU_TRACE_END_PROGRESS(memnode);
+
+		/* get task for each worker*/
+		STARPU_PTHREAD_MUTEX_LOCK(&baseworker->sched_mutex);
+		res = _starpu_get_multi_worker_task(worker_set->workers, tasks, worker_set->nworkers);
+		STARPU_PTHREAD_MUTEX_UNLOCK(&baseworker->sched_mutex);
+
+		/* poll the device for completed jobs.*/
+		if (mp_node->mp_recv_is_ready(mp_node))
+		{
+			_starpu_src_common_recv_async(worker_set, mp_node);
+		}
+
+		/* nothing to start unless at least one worker has popped a task */
+		if (res == 0)
+			continue;
+
+		_STARPU_DEBUG(" nb_tasks:%d\n", res);
+		for (i = 0; i < worker_set->nworkers; i++)
+		{
+			struct _starpu_job *j;
+			int ret;
+
+			if (tasks[i] == NULL)
+				continue;
+
+			j = _starpu_get_job_associated_to_task(tasks[i]);
+
+			worker_set->workers[i].current_task = j->task;
+
+			ret = _starpu_src_common_execute_job(j, &worker_set->workers[i], mp_node);
+
+			if (ret == -EAGAIN)
+			{
+				_STARPU_DISP("ouch, Xeon Phi could not actually run task %p, putting it back...\n", tasks[i]);
+				_starpu_push_task_to_workers(tasks[i]);
+				STARPU_ABORT();
+			}
+			else
+			{
+				/* any other non-zero result is unexpected */
+				STARPU_ASSERT(ret == 0);
+			}
+		}
+	}
+	free(tasks);
+
+	_starpu_handle_all_pending_node_data_requests(memnode);
+
+	/* In case there remains some memory that was automatically
+	 * allocated by StarPU, we release it now. Note that data
+	 * coherency is not maintained anymore at that point ! */
+	_starpu_free_all_automatically_allocated_buffers(memnode);
+
+}

+ 13 - 0
src/drivers/mp_common/source_common.h

@@ -21,8 +21,16 @@
 
 #ifdef STARPU_USE_MP
 
+#include <core/sched_policy.h>
+#include <core/task.h>
 #include <drivers/mp_common/mp_common.h>
 
+
+enum _starpu_mp_command _starpu_src_common_wait_command_sync(struct _starpu_mp_node *node, 
+							     void ** arg, int* arg_size);
+void _starpu_src_common_recv_async(struct _starpu_worker_set *worker_set, 
+				   struct _starpu_mp_node * baseworker_node);
+
 int _starpu_src_common_sink_nbcores (const struct _starpu_mp_node *node, int *buf);
 
 int _starpu_src_common_lookup(const struct _starpu_mp_node *node,
@@ -57,6 +65,11 @@ int _starpu_src_common_locate_file(char *located_file_name,
 				   const char *config_file_name, const char *actual_file_name,
 				   const char **suffixes);
 
+void _starpu_src_common_worker(struct _starpu_worker_set * worker_set, 
+			       unsigned baseworkerid, 
+			       struct _starpu_mp_node * node_set);
+
+
 #endif /* STARPU_USE_MP */
 
 

+ 28 - 121
src/drivers/scc/driver_scc_source.c

@@ -60,79 +60,40 @@ static void _starpu_scc_src_deinit_context(int devid)
 
 	_starpu_mp_common_node_destroy(scc_mp_nodes[devid]);
 }
-
-static int _starpu_scc_src_execute_job(struct _starpu_job *j, struct _starpu_worker *args)
+/* Return the device kernel to run for job j, as a generic function pointer.
+ * The SCC implementation j->nimpl of the codelet is tried first, then the
+ * CPU function name of that implementation. STARPU_ASSERT()s that a kernel
+ * was found. The node argument is not used; it is named because a function
+ * definition must name its parameters.
+ */
+void (*_starpu_scc_src_get_kernel_from_job(const struct _starpu_mp_node *node STARPU_ATTRIBUTE_UNUSED, struct _starpu_job *j))(void)
 {
-	int ret;
-	uint32_t mask = 0;
-
-	STARPU_ASSERT(j);
-	struct starpu_task *task = j->task;
-
-	struct timespec codelet_start, codelet_end;
-
-	int profiling = starpu_profiling_status_get();
-	unsigned calibrate_model = 0;
-
-	STARPU_ASSERT(task);
-	struct starpu_codelet *cl = task->cl;
-	STARPU_ASSERT(cl);
-
-	if (cl->model && cl->model->benchmarking)
-		calibrate_model = 1;
-
-	ret = _starpu_fetch_task_input(j, mask);
-	if (ret != 0)
+  starpu_scc_kernel_t kernel = NULL;
+
+  starpu_scc_func_t func = _starpu_task_get_scc_nth_implementation(j->task->cl, j->nimpl);
+  if (func)
+    {
+      /* We execute the function contained in the codelet, it must return a
+       * pointer to the function to execute on the device, either specified
+       * directly by the user or by a call to starpu_scc_get_kernel().
+       */
+      kernel = func();
+    }
+  else
+    {
+      /* If user doesn't define any starpu_scc_func_t in cl->scc_funcs we try to use
+       * cpu_funcs_name.
+       */
+      char *func_name = _starpu_task_get_cpu_name_nth_implementation(j->task->cl, j->nimpl);
+      if (func_name)
 	{
-		/* there was not enough memory, so the input of
-		 * the codelet cannot be fetched ... put the
-		 * codelet back, and try it later */
-		return -EAGAIN;
-	}
-
+	  starpu_scc_func_symbol_t symbol;
 
-	starpu_scc_kernel_t kernel = NULL;
+	  _starpu_scc_src_register_kernel(&symbol, func_name);
 
-	starpu_scc_func_t func = _starpu_task_get_scc_nth_implementation(j->task->cl, j->nimpl);
-	if (func)
-	{
-		/* We execute the function contained in the codelet, it must return a
-		 * pointer to the function to execute on the device, either specified
-		 * directly by the user or by a call to starpu_scc_get_kernel().
-		 */
-		kernel = func();
+	  kernel = _starpu_scc_src_get_kernel(symbol);
 	}
-	else
-	{
-		/* If user doesn't define any starpu_scc_func_t in cl->scc_funcs we try to use
-		 * cpu_funcs_name.
-		 */
-		char *func_name = _starpu_task_get_cpu_name_nth_implementation(j->task->cl, j->nimpl);
-		if (func_name)
-		{
-			starpu_scc_func_symbol_t symbol;
-
-			_starpu_scc_src_register_kernel(&symbol, func_name);
-
-			kernel = _starpu_scc_src_get_kernel(symbol);
-		}
-	}
-	STARPU_ASSERT(kernel);
-
-	_starpu_driver_start_job(args, j, &codelet_start, 0, profiling);
-
-	_starpu_src_common_execute_kernel_from_task(scc_mp_nodes[args->devid], (void (*)(void)) kernel, 0, task);
-
-	_starpu_driver_end_job(args, j, args->perf_arch, &codelet_end, 0, profiling);
-
-	_starpu_driver_update_job_feedback(j, args, args->perf_arch, &codelet_start, &codelet_end, profiling);
-
-	_starpu_push_task_output(j, mask);
+    }
+  STARPU_ASSERT(kernel);
 
-
-	return 0;
+  return (void (*)(void))kernel;
 }
 
+
 void _starpu_scc_src_mp_deinit()
 {
 	_starpu_scc_common_unmap_shared_memory();
@@ -320,7 +281,7 @@ int _starpu_scc_copy_sink_to_sink(void *src, unsigned src_node, void *dst, unsig
 
 void *_starpu_scc_src_worker(void *arg)
 {
-	struct _starpu_worker *args = arg;
+	struct _starpu_worker_set *args = arg;
 
 	int devid = args->devid;
 	int workerid = args->workerid;
@@ -350,64 +311,10 @@ void *_starpu_scc_src_worker(void *arg)
 	STARPU_PTHREAD_COND_SIGNAL(&args->ready_cond);
 	STARPU_PTHREAD_MUTEX_UNLOCK(&args->mutex);
 
-	struct _starpu_job * j;
-	struct starpu_task *task;
-	int res;
-
-	while (_starpu_machine_is_running())
-	{
-		_STARPU_TRACE_START_PROGRESS(memnode);
-		_starpu_datawizard_progress(memnode, 1);
-		_STARPU_TRACE_END_PROGRESS(memnode);
-
-		task = _starpu_get_worker_task(args, workerid, memnode);
-		if (!task)
-			continue;
-
-		j = _starpu_get_job_associated_to_task(task);
-
-		/* can a SCC device do that task ? */
-		if (!_STARPU_SCC_MAY_PERFORM(j))
-		{
-			/* this isn't a SCC task */
-			_starpu_push_task_to_workers(task);
-			continue;
-		}
-
-		_starpu_set_current_task(task);
-		args->current_task = j->task;
-
-		res = _starpu_scc_src_execute_job(j, args);
-
-		_starpu_set_current_task(NULL);
-		args->current_task = NULL;
-
-		if (res)
-		{
-			switch (res)
-			{
-				case -EAGAIN:
-					_STARPU_DISP("ouch, SCC could not actually run task %p, putting it back...\n", task);
-					_starpu_push_task_to_workers(task);
-					STARPU_ABORT();
-					continue;
-				default:
-					STARPU_ASSERT(0);
-			}
-		}
-
-		_starpu_handle_job_termination(j);
-	}
+	_starpu_src_common_worker(args, baseworkerid, scc_mp_nodes[mp_nodeid]);
 
 	_STARPU_TRACE_WORKER_DEINIT_START;
 
-	_starpu_handle_all_pending_node_data_requests(memnode);
-
-	/* In case there remains some memory that was automatically
-	 * allocated by StarPU, we release it now. Note that data
-	 * coherency is not maintained anymore at that point ! */
-	_starpu_free_all_automatically_allocated_buffers(memnode);
-
 	_starpu_scc_src_deinit_context(args->devid);
 
 	_STARPU_TRACE_WORKER_DEINIT_END(_STARPU_FUT_SCC_KEY);

+ 1 - 0
src/drivers/scc/driver_scc_source.h

@@ -29,6 +29,7 @@
 
 void _starpu_scc_src_mp_deinit();
 
+void (*_starpu_scc_src_get_kernel_from_job(const struct _starpu_mp_node *,struct _starpu_job *j))(void);
 int _starpu_scc_src_register_kernel(starpu_scc_func_symbol_t *symbol, const char *func_name);
 starpu_scc_kernel_t _starpu_scc_src_get_kernel(starpu_scc_func_symbol_t symbol);