Browse Source

mic: parallel tasks (mode spmd)

Thibaud Lambert 11 years ago
parent
commit
49d190e8de

+ 1 - 1
configure.ac

@@ -967,7 +967,7 @@ AC_DEFINE_UNQUOTED(STARPU_MAXMICDEVS, [$nmaxmicdev],
 AC_MSG_CHECKING(maximum number of MIC threads)
 AC_ARG_ENABLE(maxmicthreads, [AS_HELP_STRING([--enable-maxmicthreads=<number>],
 			[maximum number of MIC threads])],
-			nmaxmicthreads=$enableval, nmaxmicthreads=128)
+			nmaxmicthreads=$enableval, nmaxmicthreads=480)
 AC_MSG_RESULT($nmaxmicthread)
 
 AC_DEFINE_UNQUOTED(STARPU_MAXMICCORES, [$nmaxmicthreads],

+ 7 - 0
doc/doxygen/chapters/configure_options.doxy

@@ -270,6 +270,13 @@ fail when copying data asynchronously. When using this implementation,
 it is therefore necessary to disable asynchronous data transfers.
 </dd>
 
+<dt>--enable-maxmicthreads</dt>
+<dd>
+\anchor enable-maxmicthreads
+\addindex __configure__--enable-maxmicthreads
+Specify the maximum number of MIC threads
+</dd>
+
 <dt>--disable-asynchronous-mic-copy</dt>
 <dd>
 \anchor disable-asynchronous-mic-copy

+ 24 - 24
src/common/list.h

@@ -24,48 +24,48 @@
  * *********************************************************
  * LIST_TYPE(FOO, contenu);
  *  - déclare les types suivants
- *      + pour les cellules : FOO
- *      + pour les listes : FOO_list
- *      + pour les itérateurs : FOO
+ *      + pour les cellules : struct FOO
+ *      + pour les listes : struct FOO_list
+ *      + pour les itérateurs : struct FOO
  *  - déclare les accesseurs suivants :
  *     * création d'une cellule
- *   FOO*       FOO_new(void);
+ *   struct FOO*	FOO_new(void);
  *     * suppression d'une cellule
- *   void       FOO_delete(FOO*);
+ *   void		FOO_delete(struct FOO*);
  *     * création d'une liste (vide)
- *   FOO_list*  FOO_list_new(void);
+ *   struct FOO_list*	FOO_list_new(void);
  *     * suppression d'une liste
- *   void       FOO_list_delete(FOO_list*);
+ *   void		FOO_list_delete(struct FOO_list*);
  *     * teste si une liste est vide
- *   int        FOO_list_empty(FOO_list*);
+ *   int		FOO_list_empty(struct FOO_list*);
  *     * retire un élément de la liste
- *   void       FOO_list_erase(FOO_list*, FOO*);
+ *   void		FOO_list_erase(struct FOO_list*, struct FOO*);
  *     * ajoute une élément en queue de liste
- *   void       FOO_list_push_back(FOO_list*, FOO*);
+ *   void		FOO_list_push_back(struct FOO_list*, struct FOO*);
  *     * ajoute un élément en tête de list
- *   void       FOO_list_push_front(FOO_list*, FOO*);
+ *   void		FOO_list_push_front(struct FOO_list*, struct FOO*);
  *     * ajoute la deuxième liste à la fin de la première liste
- *   FOO*       FOO_list_push_list_back(FOO_list*, FOO_list*);
+ *   struct FOO*	FOO_list_push_list_back(struct FOO_list*, struct FOO_list*);
  *     * ajoute la première liste au début de la deuxième liste
- *   FOO*       FOO_list_push_list_front(FOO_list*, FOO_list*);
+ *   struct FOO*	FOO_list_push_list_front(struct FOO_list*, struct FOO_list*);
  *     * retire l'élément en queue de liste
- *   FOO*       FOO_list_pop_back(FOO_list*);
+ *   struct FOO*	FOO_list_pop_back(struct FOO_list*);
  *     * retire l'élement en tête de liste
- *   FOO*       FOO_list_pop_front(FOO_list*);
+ *   struct FOO*	FOO_list_pop_front(struct FOO_list*);
  *     * retourne l'élément en queue de liste
- *   FOO*       FOO_list_back(FOO_list*);
+ *   struct FOO*	FOO_list_back(struct FOO_list*);
  *     * retourne l'élement en tête de liste
- *   FOO*       FOO_list_front(FOO_list*);
+ *   struct FOO*	FOO_list_front(struct FOO_list*);
  *     * vérifie si la liste chainée est cohérente
- *   int	FOO_list_check(FOO_list*);
+ *   int		FOO_list_check(struct FOO_list*);
  *     *
- *   FOO*       FOO_list_begin(FOO_list*);
+ *   struct FOO*	FOO_list_begin(struct FOO_list*);
  *     *
- *   FOO*       FOO_list_end(FOO_list*);
+ *   struct FOO*	FOO_list_end(struct FOO_list*);
  *     *
- *   FOO*       FOO_list_next(FOO*)
+ *   struct FOO*	FOO_list_next(struct FOO*)
  *     *
- *   int        FOO_list_size(FOO_list*)
+ *   int		FOO_list_size(struct FOO_list*)
  * *********************************************************
  * Exemples d'utilisation :
  *  - au départ, on a :
@@ -83,12 +83,12 @@
  *  - allocation d'une liste vide :
  *  struct ma_structure_list * l = ma_structure_list_new();
  *  - ajouter un élément 'e' en tête de la liste 'l' :
- *  ma_structure * e = ma_structure_new();
+ *  struct ma_structure * e = ma_structure_new();
  *  e->a = 0;
  *  e->b = 0;
  *  ma_structure_list_push_front(l, e);
  *  - itérateur de liste :
- *  ma_structure * i;
+ *  struct ma_structure * i;
  *  for(i  = ma_structure_list_begin(l);
  *      i != ma_structure_list_end(l);
  *      i  = ma_structure_list_next(i))

+ 2 - 0
src/core/combined_workers.c

@@ -102,6 +102,8 @@ int starpu_combined_worker_assign_workerid(int nworkers, int workerid_array[])
 	combined_worker->worker_size = nworkers;
 	combined_worker->perf_arch = (enum starpu_perfmodel_archtype) (STARPU_CPU_DEFAULT + nworkers - 1);
 	combined_worker->worker_mask = STARPU_CPU;
+	combined_worker->count = nworkers -1;
+	pthread_mutex_init(&combined_worker->count_mutex,NULL);
 
 	/* We assume that the memory node should either be that of the first
 	 * entry, and it is very likely that every worker in the combination

+ 6 - 11
src/core/detect_combined_workers.c

@@ -223,7 +223,6 @@ static void assign_combinations_without_hwloc(struct starpu_worker_collection* w
 			}
 		}
 	}
-	_STARPU_DEBUG("%d\n",count);
 }
 
 
@@ -266,22 +265,18 @@ static void find_and_assign_combinations_without_hwloc(int *workerids, int nwork
 #ifdef STARPU_USE_MIC
 		else if(worker->arch == STARPU_MIC_WORKER)
 		{
-			if(worker->devid != 0)
+			for(j=0; mic_id[j] != worker->mp_nodeid && mic_id[j] != -1 && j<nb_mics; j++);
+			if(j<nb_mics)
 			{
-				for(j=0; mic_id[j] != worker->mp_nodeid && mic_id[j] != -1 && j<nb_mics; j++);
-				if(j<nb_mics)
+				if(mic_id[j] == -1)
 				{
-					if(mic_id[j] == -1)
-					{
-						mic_id[j] = worker->mp_nodeid;					
-					}
-					mic_workers[j][nmics_table[j]++] = i;
+					mic_id[j] = worker->mp_nodeid;					
 				}
+				mic_workers[j][nmics_table[j]++] = i;
 			}
 		}
-	
-
 #endif /* STARPU_USE_MIC */
+
 	}
 
 

+ 8 - 8
src/core/perfmodel/perfmodel.c

@@ -446,8 +446,8 @@ void _starpu_create_sampling_directory_if_needed(void)
 {
 	if (!directory_existence_was_tested)
 	{
-		char perf_model_dir[256];
-		_starpu_get_perf_model_dir(perf_model_dir, 256);
+		char perf_model_dir[STARPU_NMAXWORKERS];
+		_starpu_get_perf_model_dir(perf_model_dir, STARPU_NMAXWORKERS);
 
 		/* The performance of the codelets are stored in
 		 * $STARPU_PERF_MODEL_DIR/codelets/ while those of the bus are stored in
@@ -461,18 +461,18 @@ void _starpu_create_sampling_directory_if_needed(void)
 
 
 		/* Per-task performance models */
-		char perf_model_dir_codelets[256];
-		_starpu_get_perf_model_dir_codelets(perf_model_dir_codelets, 256);
+		char perf_model_dir_codelets[STARPU_NMAXWORKERS];
+		_starpu_get_perf_model_dir_codelets(perf_model_dir_codelets, STARPU_NMAXWORKERS);
 		_starpu_mkpath_and_check(perf_model_dir_codelets, S_IRWXU);
 
 		/* Performance of the memory subsystem */
-		char perf_model_dir_bus[256];
-		_starpu_get_perf_model_dir_bus(perf_model_dir_bus, 256);
+		char perf_model_dir_bus[STARPU_NMAXWORKERS];
+		_starpu_get_perf_model_dir_bus(perf_model_dir_bus, STARPU_NMAXWORKERS);
 		_starpu_mkpath_and_check(perf_model_dir_bus, S_IRWXU);
 
 		/* Performance debug measurements */
-		char perf_model_dir_debug[256];
-		_starpu_get_perf_model_dir_debug(perf_model_dir_debug, 256);
+		char perf_model_dir_debug[STARPU_NMAXWORKERS];
+		_starpu_get_perf_model_dir_debug(perf_model_dir_debug, STARPU_NMAXWORKERS);
 		_starpu_mkpath_and_check(perf_model_dir_debug, S_IRWXU);
 
 		directory_existence_was_tested = 1;

+ 0 - 1
src/core/workers.c

@@ -438,7 +438,6 @@ static void _starpu_launch_drivers(struct _starpu_machine_config *pconfig)
 		AYU_event(AYU_INIT, 0, (void*) &n);
 	}
 #endif
-
 	for (worker = 0; worker < nworkers; worker++)
 	{
 		struct _starpu_worker *workerarg = &pconfig->workers[worker];

+ 4 - 0
src/core/workers.h

@@ -120,6 +120,10 @@ struct _starpu_combined_worker
 	int worker_size;
 	unsigned memory_node; /* which memory node is associated that worker to ? */
 	int combined_workerid[STARPU_NMAXWORKERS];
+#ifdef STARPU_USE_MIC 
+	int count;
+	pthread_mutex_t count_mutex;
+#endif
 
 #ifdef __GLIBC__
 	cpu_set_t cpu_set;

+ 1 - 1
src/drivers/driver_common/driver_common.c

@@ -274,7 +274,7 @@ int _starpu_get_multi_worker_task(struct _starpu_worker *workers, struct starpu_
 	int is_parallel_task;
 	struct _starpu_combined_worker *combined_worker;
 	/*for each worker*/
-	for (i = 1; i < nworkers; i++)
+	for (i = 0; i < nworkers; i++)
 	{
 		/*if the worker is already executinf a task then */
 		if(workers[i].current_task)

+ 12 - 21
src/drivers/mic/driver_mic_sink.c

@@ -27,9 +27,6 @@
 #include "driver_mic_common.h"
 #include "driver_mic_sink.h"
 
-#define HYPER_THREAD_NUMBER 4
-
-
 /* Initialize the MIC sink, initializing connection to the source
  * and to the other devices (not implemented yet).
  */
@@ -39,7 +36,7 @@ void _starpu_mic_sink_init(struct _starpu_mp_node *node)
 	pthread_t thread, self;
 	cpu_set_t cpuset;
 	pthread_attr_t attr;
-	int i, j, ret;
+	int i, ret;
 	struct arg_sink_thread * arg;
 
 	/*Bind on the first core*/
@@ -48,30 +45,33 @@ void _starpu_mic_sink_init(struct _starpu_mp_node *node)
 	CPU_SET(0,&cpuset);
 	pthread_setaffinity_np(self,sizeof(cpu_set_t),&cpuset);
 
+
 	/* Initialize connection with the source */
 	_starpu_mic_common_accept(&node->mp_connection.mic_endpoint,
 					 STARPU_MIC_SOURCE_PORT_NUMBER);
 
 	_starpu_mic_common_accept(&node->host_sink_dt_connection.mic_endpoint,
 									 STARPU_MIC_SOURCE_DT_PORT_NUMBER);
+	
 	//init the set
 	CPU_ZERO(&cpuset);
 
-	node->nb_cores = COISysGetCoreCount();
+//	node->nb_cores = COISysGetCoreCount();
 
+	node->nb_cores = COISysGetHardwareThreadCount() - COISysGetHardwareThreadCount() / COISysGetCoreCount();
 	node->thread_table = malloc(sizeof(pthread_t)*node->nb_cores);
 
 	node->run_table = malloc(sizeof(struct mp_task *)*node->nb_cores);
 	node->mutex_run_table = malloc(sizeof(pthread_mutex_t)*node->nb_cores);
 
-	node->barrier_list = mp_task_list_new();
-	node->dead_queue = mp_task_list_new();
-	pthread_mutex_init(&node->dead_queue_mutex,NULL);
+	node->barrier_list = mp_barrier_list_new();
+	node->message_queue = mp_message_list_new();
+	pthread_mutex_init(&node->message_queue_mutex,NULL);
 	pthread_mutex_init(&node->barrier_mutex,NULL);
 
 
 	/*for each core init the mutex, the task pointer and launch the thread */
-	for(i=1; i<node->nb_cores; i++)
+	for(i=0; i<node->nb_cores; i++)
 	{
 		node->run_table[i] = NULL;
 
@@ -80,8 +80,7 @@ void _starpu_mic_sink_init(struct _starpu_mp_node *node)
 
 		//init the set
 		CPU_ZERO(&cpuset);
-		for(j=0;j<HYPER_THREAD_NUMBER;j++)
-			CPU_SET(j+i*HYPER_THREAD_NUMBER,&cpuset);
+		CPU_SET(i,&cpuset);
 
 		ret = pthread_attr_init(&attr);
 		STARPU_ASSERT(ret == 0);
@@ -143,13 +142,6 @@ void _starpu_mic_sink_report_error(const char *func, const char *file, const int
 	STARPU_ASSERT(0);
 }
 
-/* Return the number of cores on the callee, a MIC device or Processor Xeon
- */
-unsigned int _starpu_mic_sink_get_nb_core(void)
-{
-	return (unsigned int) COISysGetCoreCount();
-}
-
 /* Allocate memory on the MIC.
  * Memory is register for remote direct access. */
 void _starpu_mic_sink_allocate(const struct _starpu_mp_node *mp_node, void *arg, int arg_size)
@@ -199,15 +191,14 @@ void _starpu_mic_sink_free(const struct _starpu_mp_node *mp_node STARPU_ATTRIBUT
 void _starpu_mic_sink_bind_thread(const struct _starpu_mp_node *mp_node STARPU_ATTRIBUTE_UNUSED, int coreid, int * core_table, int nb_core)
 {
 	cpu_set_t cpuset;
-	int i, j, ret;
+	int i;
 
   	//init the set
 	CPU_ZERO(&cpuset);
 
 	//adding the core to the set
 	for(i=0;i<nb_core;i++)
-		for(j=0;j<HYPER_THREAD_NUMBER;j++)
-			CPU_SET(j+core_table[i]*HYPER_THREAD_NUMBER,&cpuset);
+		CPU_SET(core_table[i],&cpuset);
 
 	pthread_setaffinity_np(((pthread_t*)mp_node->thread_table)[coreid],sizeof(cpu_set_t),&cpuset);
 }

+ 0 - 2
src/drivers/mic/driver_mic_sink.h

@@ -37,8 +37,6 @@ void _starpu_mic_sink_init(struct _starpu_mp_node *node);
 
 void _starpu_mic_sink_deinit(struct _starpu_mp_node *node);
 
-unsigned int _starpu_mic_sink_get_nb_core(void);
-
 void _starpu_mic_sink_allocate(const struct _starpu_mp_node *mp_node, void *arg, int arg_size);
 void _starpu_mic_sink_free(const struct _starpu_mp_node *mp_node STARPU_ATTRIBUTE_UNUSED, void *arg, int arg_size);
 void _starpu_mic_sink_bind_thread(const struct _starpu_mp_node *mp_node STARPU_ATTRIBUTE_UNUSED, int coreid, int * core_table, int nb_core);

+ 1 - 12
src/drivers/mp_common/mp_common.c

@@ -65,13 +65,8 @@ _starpu_mp_common_node_create(enum _starpu_mp_node_kind node_kind,
 		node->get_kernel_from_job =_starpu_mic_src_get_kernel_from_job;
 		node->bind_thread = NULL;
 		node->execute = NULL;
-		node->nbcores = NULL;
 		node->allocate = NULL;
 		node->free = NULL;
-
-		/* A source node is only working on one core,
-		 * there is no need for this function */
-		node->get_nb_core = NULL;
 	}
 	break;
 
@@ -93,11 +88,9 @@ _starpu_mp_common_node_create(enum _starpu_mp_node_kind node_kind,
 		node->get_kernel_from_job = NULL;
 		node->bind_thread = _starpu_mic_sink_bind_thread;
 		node->execute = _starpu_sink_common_execute;
-		node->nbcores = _starpu_sink_nbcores;
 		node->allocate = _starpu_mic_sink_allocate;
 		node->free = _starpu_mic_sink_free;
 
-		node->get_nb_core = _starpu_mic_sink_get_nb_core;
 	}
 	break;
 #endif /* STARPU_USE_MIC */
@@ -118,12 +111,10 @@ _starpu_mp_common_node_create(enum _starpu_mp_node_kind node_kind,
 		node->dt_recv_from_device = NULL;
 
 		node->get_kernel_from_job =_starpu_scc_src_get_kernel_from_job;
-		node->bind_thread = _starpu_scc_sink_bind_thread;
+		node->bind_thread = NULL;
 		node->execute = NULL;
 		node->allocate = NULL;
 		node->free = NULL;
-
-		node->get_nb_core = NULL;
 	}
 	break;
 
@@ -146,8 +137,6 @@ _starpu_mp_common_node_create(enum _starpu_mp_node_kind node_kind,
 		node->execute = _starpu_scc_sink_execute;
 		node->allocate = _starpu_sink_common_allocate;
 		node->free = _starpu_sink_common_free;
-
-		node->get_nb_core = NULL;
 	}
 	break;
 #endif /* STARPU_USE_SCC */

+ 33 - 27
src/drivers/mp_common/mp_common.h

@@ -37,26 +37,6 @@
 
 #define STARPU_MP_COMMON_REPORT_ERROR(node, status)			\
 	(node)->report_error(__starpu_func__, __FILE__, __LINE__, (status))
-LIST_TYPE(mp_barrier,
-		int id;
-		_starpu_pthread_barrier_t barrier;
-	 );
-
-
-LIST_TYPE(mp_task,
-		void (*kernel)(void **, void *);
-		void *interfaces[STARPU_NMAXBUFS]; 
-		void *cl_arg;
-		unsigned coreid;
-		enum starpu_codelet_type type;
-		int is_parallel_task;
-		int combined_workerid;
-		int combined_worker_size;
-		int combined_worker[STARPU_NMAXWORKERS];
-		_starpu_pthread_barrier_t * barrier;
-		struct mp_task * next;
-	 );
-
 enum _starpu_mp_command
 {
 	STARPU_EXIT,
@@ -78,6 +58,7 @@ enum _starpu_mp_command
 	STARPU_ANSWER_SINK_NBCORES,
 	STARPU_EXECUTION_SUBMITTED,
 	STARPU_EXECUTION_COMPLETED,
+	STARPU_PRE_EXECUTION,
 };
 
 enum _starpu_mp_node_kind
@@ -115,13 +96,41 @@ struct _starpu_mp_transfer_command_to_device
 	void *addr;
 };
 
+LIST_TYPE(mp_barrier,
+		int id;
+		_starpu_pthread_barrier_t before_work_barrier;
+		_starpu_pthread_barrier_t after_work_barrier;
+	 );
+
+LIST_TYPE(mp_message,
+		enum _starpu_mp_command type;
+		char buffer[BUFFER_SIZE];
+		int size;
+	 );
+
+struct mp_task 
+{
+	void (*kernel)(void **, void *);
+	void *interfaces[STARPU_NMAXBUFS]; 
+	void *cl_arg;
+	unsigned coreid;
+	enum starpu_codelet_type type;
+	int is_parallel_task;
+	int combined_workerid;
+	int combined_worker_size;
+	int combined_worker[STARPU_NMAXWORKERS];
+ 	struct mp_barrier* mp_barrier;
+};
+
+
 /* Message-passing working node, whether source
  * or sink */
 struct _starpu_mp_node
 {
 	enum _starpu_mp_node_kind kind;
 
-	/*the number of core*/
+	/* Number of cores on the device.
+	 * Must be initialized by the init function. */
 	int nb_cores;
 
 	/* Buffer used for scif data transfers, allocated
@@ -164,8 +173,8 @@ struct _starpu_mp_node
 	void* thread_table;
 
         /*dead queue where the finished kernel are added */
-        struct mp_task_list* dead_queue;
-	pthread_mutex_t dead_queue_mutex;
+        struct mp_message_list* message_queue;
+	pthread_mutex_t message_queue_mutex;
 
 	/*list of barrier for combined worker*/
 	struct mp_barrier_list* barrier_list;
@@ -193,12 +202,9 @@ struct _starpu_mp_node
 
 	void (*(*get_kernel_from_job)(const struct _starpu_mp_node *,struct _starpu_job *))(void);
 	void (*bind_thread)(const struct _starpu_mp_node *, int,int *,int);
-	void (*execute)(const struct _starpu_mp_node *, void *, int);
-	void (*nbcores)(const struct _starpu_mp_node *);
+	void (*execute)(struct _starpu_mp_node *, void *, int);
 	void (*allocate)(const struct _starpu_mp_node *, void *, int);
 	void (*free)(const struct _starpu_mp_node *, void *, int);
-
-	unsigned int (*get_nb_core)(void);
 };
 
 struct _starpu_mp_node * _starpu_mp_common_node_create(enum _starpu_mp_node_kind node_kind, int peer_devid);

+ 121 - 94
src/drivers/mp_common/sink_common.c

@@ -52,7 +52,7 @@ static enum _starpu_mp_node_kind _starpu_sink_common_get_kind(void)
 }
 
 void
-_starpu_sink_nbcores (const struct _starpu_mp_node *node)
+_starpu_sink_common_get_nb_cores (struct _starpu_mp_node *node)
 {
 	// Process packet received from `_starpu_src_common_sink_cores'.
      	_starpu_mp_common_send_command (node, STARPU_ANSWER_SINK_NBCORES,
@@ -167,69 +167,71 @@ void _starpu_sink_common_worker(void)
 
 	/* Create and initialize the node */
 	node = _starpu_mp_common_node_create(node_kind, -1);
-	
+
+	sleep(1);
+
 	while (!exit_starpu)
 	{
 		if(node->mp_recv_is_ready(node))
-		{	
+		{
+
 			command = _starpu_mp_common_recv_command(node, &arg, &arg_size);
 			switch(command)
 			{
-			case STARPU_EXIT:
-				exit_starpu = 1;
-				break;
-			case STARPU_EXECUTE:
-				node->execute(node, arg, arg_size);
-				break;
-			case STARPU_SINK_NBCORES:
-				node->nbcores (node);
-				break;
-			case STARPU_LOOKUP:
-				_starpu_sink_common_lookup(node, (char *) arg);
-				break;
-
-			case STARPU_ALLOCATE:
-				node->allocate(node, arg, arg_size);
-				break;
-
-			case STARPU_FREE:
-				node->free(node, arg, arg_size);
-				break;
-
-			case STARPU_RECV_FROM_HOST:
-				_starpu_sink_common_copy_from_host(node, arg, arg_size);
-				break;
-
-			case STARPU_SEND_TO_HOST:
-				_starpu_sink_common_copy_to_host(node, arg, arg_size);
-				break;
-
-			case STARPU_RECV_FROM_SINK:
-				_starpu_sink_common_copy_from_sink(node, arg, arg_size);
-				break;
-
-			case STARPU_SEND_TO_SINK:
-				_starpu_sink_common_copy_to_sink(node, arg, arg_size);
-				break;
-
-			default:
-				printf("Oops, command %x unrecognized\n", command);
+				case STARPU_EXIT:
+					exit_starpu = 1;
+					break;
+				case STARPU_EXECUTE:
+					node->execute(node, arg, arg_size);
+					break;
+				case STARPU_SINK_NBCORES:
+					_starpu_sink_common_get_nb_cores(node);
+					break;
+				case STARPU_LOOKUP:
+					_starpu_sink_common_lookup(node, (char *) arg);
+					break;
+
+				case STARPU_ALLOCATE:
+					node->allocate(node, arg, arg_size);
+					break;
+
+				case STARPU_FREE:
+					node->free(node, arg, arg_size);
+					break;
+
+				case STARPU_RECV_FROM_HOST:
+					_starpu_sink_common_copy_from_host(node, arg, arg_size);
+					break;
+
+				case STARPU_SEND_TO_HOST:
+					_starpu_sink_common_copy_to_host(node, arg, arg_size);
+					break;
+
+				case STARPU_RECV_FROM_SINK:
+					_starpu_sink_common_copy_from_sink(node, arg, arg_size);
+					break;
+
+				case STARPU_SEND_TO_SINK:
+					_starpu_sink_common_copy_to_sink(node, arg, arg_size);
+					break;
+
+				default:
+					printf("Oops, command %x unrecognized\n", command);
 			}
 		}
-
-		pthread_mutex_lock(&node->dead_queue_mutex);
-		if(!mp_task_list_empty(node->dead_queue))
+		pthread_mutex_lock(&node->message_queue_mutex);
+		if(!mp_message_list_empty(node->message_queue))
 		{
-			struct mp_task * task = mp_task_list_pop_back(node->dead_queue);
-			pthread_mutex_unlock(&node->dead_queue_mutex);
+			struct mp_message * message = mp_message_list_pop_back(node->message_queue);
+			pthread_mutex_unlock(&node->message_queue_mutex);
 			//_STARPU_DEBUG("telling host that we have finished the task %p sur %d.\n", task->kernel, task->coreid);
-			_starpu_mp_common_send_command(node, STARPU_EXECUTION_COMPLETED,
-						       &(task->coreid), sizeof(task->coreid));
-			mp_task_delete(task);
+			_starpu_mp_common_send_command(node, message->type, 
+					&message->buffer, message->size);
+			mp_message_delete(message);
 		}
 		else
 		{
-			pthread_mutex_unlock(&node->dead_queue_mutex);
+			pthread_mutex_unlock(&node->message_queue_mutex);
 		}
 	}
 
@@ -240,13 +242,9 @@ void _starpu_sink_common_worker(void)
 }
 
 
-
-
 void* _starpu_sink_thread(void * thread_arg)
 {
-
 	struct mp_task **task = ((struct arg_sink_thread *)thread_arg)->task;
-	struct mp_task * task_tmp;
 	struct _starpu_mp_node *node = ((struct arg_sink_thread *)thread_arg)->node;
 	pthread_mutex_t * mutex = ((struct arg_sink_thread *)thread_arg)->mutex;
 	int coreid =((struct arg_sink_thread *)thread_arg)->coreid;
@@ -254,33 +252,62 @@ void* _starpu_sink_thread(void * thread_arg)
 	while(1)
 	{
 		pthread_mutex_lock(mutex);
-		if((*task) != NULL)
+		if((*task)->is_parallel_task)
 		{
-			task_tmp = (*task);
-			if(task_tmp->is_parallel_task)
+			STARPU_PTHREAD_BARRIER_WAIT(&(*task)->mp_barrier->before_work_barrier);
+			if((*task)->coreid == (*task)->combined_worker[0])
 			{
-				_STARPU_DEBUG("BARRIER WAIT\n");
-				STARPU_PTHREAD_BARRIER_WAIT(task_tmp->barrier);
-				_STARPU_DEBUG("BARRIER JUMP\n");
+				//init message to tell the host that the execution has begun
+				struct mp_message * message = mp_message_new();
+				message->type = STARPU_PRE_EXECUTION;
+				*(int *) message->buffer = (*task)->combined_workerid;
+				message->size = sizeof((*task)->combined_workerid);
+
+				//append the message to the queue	
+				pthread_mutex_lock(&node->message_queue_mutex);
+				mp_message_list_push_front(node->message_queue,message);
+				pthread_mutex_unlock(&node->message_queue_mutex);
+
+				if((*task)->type == STARPU_FORKJOIN)
+					node->bind_thread(node, coreid, (*task)->combined_worker, (*task)->combined_worker_size);
 			}
-			if(task_tmp->type == STARPU_FORKJOIN && task_tmp->is_parallel_task)
-				node->bind_thread(node, coreid, task_tmp->combined_worker, task_tmp->combined_worker_size);
-
+		}
+		if((*task)->type != STARPU_FORKJOIN || (*task)->coreid == (*task)->combined_worker[0])
+		{
 			//execute the task
-			task_tmp->kernel(task_tmp->interfaces,task_tmp->cl_arg);
+			(*task)->kernel((*task)->interfaces,(*task)->cl_arg);
+		}
+
 
+		if((*task)->is_parallel_task)
+		{
+			STARPU_PTHREAD_BARRIER_WAIT(&(*task)->mp_barrier->after_work_barrier);
+			if((*task)->coreid == (*task)->combined_worker[0])
+			{
+				//erase the barrier from the list
+				pthread_mutex_lock(&node->barrier_mutex);
+				mp_barrier_list_erase(node->barrier_list,(*task)->mp_barrier);
+				pthread_mutex_unlock(&node->barrier_mutex);
+		
+				if((*task)->type == STARPU_FORKJOIN)
+					node->bind_thread(node, coreid, &coreid, 1);
 
-			if(task_tmp->type == STARPU_FORKJOIN && task_tmp->is_parallel_task)
-				node->bind_thread(node, coreid, &coreid, 1);
+			}
+		}
+		//init message to tell the host that the execution is completed
+		struct mp_message * message = mp_message_new();
+		message->type = STARPU_EXECUTION_COMPLETED;
+		message->size = sizeof((*task)->coreid);
+		*(int*) message->buffer = (*task)->coreid;
 
-			(*task) = NULL;
+		free(*task);
+		(*task) = NULL;
 
-			//append the finished task to the dead queue
-			pthread_mutex_lock(&node->dead_queue_mutex);
-			mp_task_list_push_front(node->dead_queue,task_tmp);
-			pthread_mutex_unlock(&node->dead_queue_mutex);
+		//append the message to the queue
+		pthread_mutex_lock(&node->message_queue_mutex);
+		mp_message_list_push_front(node->message_queue,message);
+		pthread_mutex_unlock(&node->message_queue_mutex);
 
-		}
 	}
 	pthread_exit(NULL);
 }
@@ -294,7 +321,7 @@ static void _starpu_sink_common_execute_thread(struct _starpu_mp_node *node, str
 }
 
 /**/
-_starpu_pthread_barrier_t * _starpu_sink_common_get_barrier(struct _starpu_mp_node * node, int cb_workerid, int cb_workersize)
+struct mp_barrier * _starpu_sink_common_get_barrier(struct _starpu_mp_node * node, int cb_workerid, int cb_workersize)
 {
 	struct mp_barrier * b = NULL;
 	pthread_mutex_lock(&node->barrier_mutex);
@@ -304,19 +331,19 @@ _starpu_pthread_barrier_t * _starpu_sink_common_get_barrier(struct _starpu_mp_no
 				b != mp_barrier_list_end(node->barrier_list) && b->id != cb_workerid; 
 				b = mp_barrier_list_next(b));
 
-		if(b->id == cb_workerid)
+		if(b != NULL && b->id == cb_workerid)
 		{
 			pthread_mutex_unlock(&node->barrier_mutex);
-			return &b->barrier;
+			return b;
 		}
 	}
-       	b = mp_barrier_new();
+	b = mp_barrier_new();
 	b->id = cb_workerid;
-
-	STARPU_PTHREAD_BARRIER_INIT(&b->barrier,NULL,cb_workersize);
+	STARPU_PTHREAD_BARRIER_INIT(&b->before_work_barrier,NULL,cb_workersize);
+	STARPU_PTHREAD_BARRIER_INIT(&b->after_work_barrier,NULL,cb_workersize);
 	mp_barrier_list_push_back(node->barrier_list,b);
 	pthread_mutex_unlock(&node->barrier_mutex);
-	return &b->barrier;
+	return b;
 }
 
 
@@ -328,15 +355,15 @@ _starpu_pthread_barrier_t * _starpu_sink_common_get_barrier(struct _starpu_mp_no
  * addresses of the received interfaces
  */
 
-void _starpu_sink_common_execute(const struct _starpu_mp_node *node,
-				 void *arg, int arg_size)
+void _starpu_sink_common_execute(struct _starpu_mp_node *node,
+		void *arg, int arg_size)
 {
-	unsigned id = 0;
-	unsigned nb_interfaces;
+	int id = 0;
+	unsigned nb_interfaces, i;
 
 	void *arg_ptr = arg;
 	struct mp_task *task = malloc(sizeof(struct mp_task));
-	
+
 	task->kernel = *(void(**)(void **, void *)) arg_ptr;
 	arg_ptr += sizeof(task->kernel);
 
@@ -345,7 +372,7 @@ void _starpu_sink_common_execute(const struct _starpu_mp_node *node,
 
 	task->is_parallel_task = *(int *) arg_ptr;
 	arg_ptr += sizeof(task->is_parallel_task);
-	
+
 	if(task->is_parallel_task)
 	{
 		task->combined_workerid= *(int *) arg_ptr;
@@ -353,15 +380,14 @@ void _starpu_sink_common_execute(const struct _starpu_mp_node *node,
 
 		task->combined_worker_size = *(int *) arg_ptr;
 		arg_ptr += sizeof(task->combined_worker_size);
-	
+
 		for (id = 0; id < task->combined_worker_size; id++)
 		{
-			
+
 			task->combined_worker[id] = *(int*) arg_ptr;
 			arg_ptr += sizeof(task->combined_worker[id]);
 		}
-		
-		task->barrier = _starpu_sink_common_get_barrier(node,task->combined_workerid,task->combined_worker_size);
+		task->mp_barrier = _starpu_sink_common_get_barrier(node,task->combined_workerid,task->combined_worker_size);
 	}
 
 	task->coreid = *(unsigned *) arg_ptr;
@@ -374,9 +400,9 @@ void _starpu_sink_common_execute(const struct _starpu_mp_node *node,
 	 * during execution. As in sink-side there is no mean to know which
 	 * kind of interface to expect, the array is composed of unions of
 	 * interfaces, thus we expect the same size anyway */
-	for (id = 0; id < nb_interfaces; id++)
+	for (i = 0; i < nb_interfaces; i++)
 	{
-		task->interfaces[id] = arg_ptr;
+		task->interfaces[i] = arg_ptr;
 		arg_ptr += sizeof(union _starpu_interface);
 	}
 
@@ -386,11 +412,12 @@ void _starpu_sink_common_execute(const struct _starpu_mp_node *node,
 	else
 		task->cl_arg = NULL;
 
-	
+
 	//_STARPU_DEBUG("telling host that we have submitted the task %p.\n", task->kernel);
 	_starpu_mp_common_send_command(node, STARPU_EXECUTION_SUBMITTED,
-				       NULL, 0);
+			NULL, 0);
 
 	//_STARPU_DEBUG("executing the task %p\n", task->kernel);
 	_starpu_sink_common_execute_thread(node, task);	
+
 }

+ 2 - 2
src/drivers/mp_common/sink_common.h

@@ -42,8 +42,8 @@ struct arg_sink_thread
 
 void _starpu_sink_common_worker(void);
 
-void _starpu_sink_common_execute(const struct _starpu_mp_node *node, void *arg, int arg_size);
-void _starpu_sink_nbcores (const struct _starpu_mp_node *node);
+void _starpu_sink_common_execute(struct _starpu_mp_node *node, void *arg, int arg_size);
+void _starpu_sink_nbcores (struct _starpu_mp_node *node);
 
 void _starpu_sink_common_allocate(const struct _starpu_mp_node *mp_node, void *arg, int arg_size);
 void _starpu_sink_common_free(const struct _starpu_mp_node *mp_node STARPU_ATTRIBUTE_UNUSED, void *arg, int arg_size);

+ 58 - 46
src/drivers/mp_common/source_common.c

@@ -31,38 +31,49 @@
 
 
 	static int
-_starpu_src_common_finalize_job (struct _starpu_job *j, struct _starpu_worker *worker)
+_starpu_src_common_finalize_job (struct _starpu_job *j, struct _starpu_worker *worker) 
 {
 	uint32_t mask = 0;
 	int profiling = starpu_profiling_status_get();
 	struct timespec codelet_end;
-
 	_starpu_driver_end_job(worker, j, worker->perf_arch, &codelet_end, 0,
 			profiling);
+	int count = 0;	
+	if(j->task_size > 1)
+	{
+		struct _starpu_combined_worker * cb_worker = _starpu_get_combined_worker_struct(worker->combined_workerid); 
+
+		pthread_mutex_lock(&cb_worker->count_mutex);
+		count = cb_worker->count--;
+		if(count == 0)
+			cb_worker->count = cb_worker->worker_size - 1; 
+		pthread_mutex_unlock(&cb_worker->count_mutex);
+	//	_STARPU_DEBUG("\ncb_workerid:%d, count:%d\n",worker->combined_workerid, count);
+	}
+	if(count == 0)
+	{
 
-	_starpu_driver_update_job_feedback(j, worker, worker->perf_arch,
-			&j->cl_start, &codelet_end,
-			profiling);
+		_starpu_driver_update_job_feedback(j, worker, worker->perf_arch,
+				&j->cl_start, &codelet_end,
+				profiling);
 
-	if(worker->current_rank == 0)
-	{
 		_starpu_push_task_output (j, mask);
-	}
-	_starpu_handle_job_termination(j);
 
+		_starpu_handle_job_termination(j);
+	}
 	return 0;
 }
 
 
 
 	static int
-_starpu_src_common_process_completed_job (struct _starpu_worker_set *workerset, void * arg, int arg_size STARPU_ATTRIBUTE_UNUSED)
+_starpu_src_common_process_completed_job (struct _starpu_worker_set *workerset, void * arg, int arg_size)
 {
-	void *arg_ptr = arg;
 	int coreid;
 
-	coreid = *(int *) arg_ptr;
-	arg_ptr += sizeof (coreid); // Useless.
+	STARPU_ASSERT(sizeof(coreid) == arg_size);	
+	
+	coreid = *(int *) arg;
 
 	struct _starpu_worker *worker = &workerset->workers[coreid];
 	struct starpu_task *task = worker->current_task;
@@ -70,10 +81,8 @@ _starpu_src_common_process_completed_job (struct _starpu_worker_set *workerset,
 
 	struct _starpu_worker * old_worker = _starpu_get_local_worker_key();
 	_starpu_set_local_worker_key(worker);
-	if(worker->current_rank == 0)
-	{
-		_starpu_src_common_finalize_job (j, worker);
-	}
+	
+	_starpu_src_common_finalize_job (j, worker);
 	worker->current_task = NULL;
 
 	_starpu_set_local_worker_key(old_worker);
@@ -81,6 +90,19 @@ _starpu_src_common_process_completed_job (struct _starpu_worker_set *workerset,
 	return 0;
 }
 
+static void _starpu_src_common_pre_exec(void * arg, int arg_size)
+{
+	int cb_workerid, i;
+	STARPU_ASSERT(sizeof(cb_workerid) == arg_size);
+	cb_workerid = *(int *) arg;
+	struct _starpu_combined_worker *combined_worker = _starpu_get_combined_worker_struct(cb_workerid);
+	for(i=0; i < combined_worker->worker_size; i++)
+	{
+		struct _starpu_worker * worker = _starpu_get_worker_struct(combined_worker->combined_workerid[i]);
+		_starpu_set_local_worker_key(worker);
+		_starpu_sched_pre_exec_hook(worker->current_task);
+	}	
+}
 
 /* recv a message and handle asynchrone message
  * return 0 if the message has not been handle (it's certainly mean that it's a synchrone message)
@@ -95,7 +117,10 @@ static int _starpu_src_common_handle_async(const struct _starpu_mp_node *node,
 	switch(*answer) 
 	{
 		case STARPU_EXECUTION_COMPLETED:
-			_starpu_src_common_process_completed_job (worker_set, *arg, *arg_size);
+			_starpu_src_common_process_completed_job(worker_set, *arg, *arg_size);
+			break;
+		case STARPU_PRE_EXECUTION:
+			_starpu_src_common_pre_exec(*arg,*arg_size);
 			break;
 		default:
 			return 0;
@@ -105,7 +130,7 @@ static int _starpu_src_common_handle_async(const struct _starpu_mp_node *node,
 	return 1;
 }
 
-enum _starpu_mp_command _starpu_src_common_wait_command_sync(const struct _starpu_mp_node *node, 
+static enum _starpu_mp_command _starpu_src_common_wait_command_sync(const struct _starpu_mp_node *node, 
 		void ** arg, int* arg_size)
 {
 	enum _starpu_mp_command answer;
@@ -114,12 +139,11 @@ enum _starpu_mp_command _starpu_src_common_wait_command_sync(const struct _starp
 }
 
 
-void _starpu_src_common_recv_async(struct _starpu_mp_node * baseworker_node)
+static void _starpu_src_common_recv_async(struct _starpu_mp_node * baseworker_node)
 {
 	enum _starpu_mp_command answer;
 	void *arg;
 	int arg_size;
-
 	if(!_starpu_src_common_handle_async(baseworker_node,&arg,&arg_size,&answer))
 	{
 		printf("incorrect commande: unknown command or sync command");
@@ -209,9 +233,9 @@ int _starpu_src_common_execute_kernel(const struct _starpu_mp_node *node,
 {
 
 	void *buffer, *buffer_ptr, *arg =NULL;
-	int i, buffer_size = 0, cb_worker_size = 0, arg_size =0;
+	int j, buffer_size = 0, cb_worker_size = 0, arg_size =0;
 	struct _starpu_combined_worker * cb_worker;
-	unsigned devid;
+	unsigned devid ,i;
 
 	buffer_size = sizeof(kernel) + sizeof(coreid) + sizeof(type)
 		+ sizeof(nb_interfaces) + nb_interfaces * sizeof(union _starpu_interface) + sizeof(is_parallel_task);
@@ -255,9 +279,9 @@ int _starpu_src_common_execute_kernel(const struct _starpu_mp_node *node,
 		*(int *) buffer_ptr = cb_worker_size;
 		buffer_ptr += sizeof(cb_worker_size);
 
-		for (i = 0; i < cb_worker_size; i++)
+		for (j = 0; j < cb_worker_size; j++)
 		{
-			int devid = _starpu_get_worker_struct(cb_worker->combined_workerid[i])->devid;
+			int devid = _starpu_get_worker_struct(cb_worker->combined_workerid[j])->devid;
 			*(int *) buffer_ptr = devid;
 			buffer_ptr += sizeof(devid);
 		}
@@ -327,28 +351,18 @@ static int _starpu_src_common_execute(struct _starpu_job *j,
 
 	void (*kernel)(void)  = node->get_kernel_from_job(node,j);
 
+	_starpu_driver_start_job(worker, j, &j->cl_start, 0, profiling);
 
-	if ((worker->current_rank == 0) || (task->cl->type != STARPU_FORKJOIN))
-	{
-
-		_starpu_driver_start_job(worker, j, &j->cl_start, 0, profiling);
 
+	_STARPU_DEBUG("\nworkerid:%d, rank:%d, type:%d,	cb_workerid:%d, task_size:%d\n\n",worker->devid,worker->current_rank,task->cl->type,j->combined_workerid,j->task_size);
 
-		_STARPU_DEBUG("workerid:%d\n",worker->devid);
-		_STARPU_DEBUG("rank:%d\n",worker->current_rank);
-		_STARPU_DEBUG("type:%d\n",task->cl->type);
-		_STARPU_DEBUG("is_parallel_task:%d\n",(j->task_size > 1));
-		_STARPU_DEBUG("cb_workerid:%d\n",j->combined_workerid);
-		_STARPU_DEBUG("j->task_size:%d\n",j->task_size);
-		_STARPU_DEBUG("cb_worker_count:%d\n\n",starpu_combined_worker_get_count());
+	_starpu_src_common_execute_kernel(node, kernel, worker->devid, task->cl->type,
+			(j->task_size > 1),
+			j->combined_workerid, task->handles,
+			task->interfaces, task->cl->nbuffers,
+			task->cl_arg, task->cl_arg_size);
 
-		_starpu_src_common_execute_kernel(node, kernel, worker->devid, task->cl->type,
-				(j->task_size > 1),
-				j->combined_workerid, task->handles,
-				task->interfaces, task->cl->nbuffers,
-				task->cl_arg, task->cl_arg_size);
 
-	}
 	return 0;
 }
 
@@ -577,10 +591,9 @@ void _starpu_src_common_worker(struct _starpu_worker_set * worker_set,
 		_STARPU_TRACE_END_PROGRESS(memnode);
 
 		/* poll the device for completed jobs.*/
-		if (mp_node->mp_recv_is_ready(mp_node))
+		while(mp_node->mp_recv_is_ready(mp_node))
 			_starpu_src_common_recv_async(mp_node);
 
-
 		/* get task for each worker*/
 		res = _starpu_get_multi_worker_task(worker_set->workers, tasks, worker_set->nworkers);
 
@@ -588,8 +601,7 @@ void _starpu_src_common_worker(struct _starpu_worker_set * worker_set,
 		if(res != 0)
 		{
 			unsigned i;
-			//_STARPU_DEBUG(" nb_tasks:%d\n", res);
-			for(i=1; i<worker_set->nworkers; i++)
+			for(i=0; i<worker_set->nworkers; i++)
 			{
 				if(tasks[i] != NULL)
 				{

+ 3 - 3
tests/parallel_tasks/parallel_kernels.c

@@ -21,16 +21,16 @@
 #include <unistd.h>
 #include "../helper.h"
 
-#define N	1
+#define N	1000
 #define VECTORSIZE	1024
 
 void codelet_null(void *descr[], STARPU_ATTRIBUTE_UNUSED void *_args)
 {
 	STARPU_SKIP_IF_VALGRIND;
 
-	int worker_size = starpu_combined_worker_get_size();
+	//int worker_size = starpu_combined_worker_get_size();
 	//STARPU_ASSERT(worker_size > 0);
-	usleep(1000/worker_size);
+	//usleep(1000/worker_size);
 #if 0
 	int id = starpu_worker_get_id();
 	int combined_id = starpu_combined_worker_get_id();

+ 1 - 1
tests/parallel_tasks/spmd_peager.c

@@ -20,7 +20,7 @@
 #include <unistd.h>
 #include "../helper.h"
 
-#define N	1
+#define N	1000	
 #define VECTORSIZE	1024
 
 starpu_data_handle_t v_handle;