Browse Source

mic: comment correction in list.h + added barrier in mic sink

Thibaud Lambert 12 years ago
parent
commit
b9857a47d0

+ 1 - 1
include/starpu_perfmodel.h

@@ -38,7 +38,7 @@ enum starpu_perfmodel_archtype
 	STARPU_CUDA_DEFAULT = STARPU_MAXCPUS,
 	STARPU_OPENCL_DEFAULT = STARPU_CUDA_DEFAULT + STARPU_MAXCUDADEVS,
 	STARPU_MIC_DEFAULT = STARPU_OPENCL_DEFAULT + STARPU_MAXOPENCLDEVS,
-	STARPU_SCC_DEFAULT = STARPU_MIC_DEFAULT + STARPU_MAXMICDEVS
+	STARPU_SCC_DEFAULT = STARPU_MIC_DEFAULT + STARPU_MAXMICDEVS //* STARPU_MAXMICCPUS
 };
 
 #ifdef __STDC_VERSION__

+ 0 - 2
src/Makefile.am

@@ -108,7 +108,6 @@ noinst_HEADERS = 						\
 	drivers/mp_common/mp_common.h				\
 	drivers/mp_common/source_common.h			\
 	drivers/mp_common/sink_common.h				\
-	drivers/mp_common/task_fifo.h				\
 	drivers/cpu/driver_cpu.h				\
 	drivers/cuda/driver_cuda.h				\
 	drivers/opencl/driver_opencl.h				\
@@ -177,7 +176,6 @@ libstarpu_@STARPU_EFFECTIVE_VERSION@_la_SOURCES = 						\
 	sched_policies/parallel_heft.c				\
 	sched_policies/parallel_eager.c				\
 	drivers/driver_common/driver_common.c			\
-	drivers/mp_common/task_fifo.c				\
 	datawizard/memory_nodes.c				\
 	datawizard/write_back.c					\
 	datawizard/coherency.c					\

+ 24 - 24
src/common/list.h

@@ -29,43 +29,43 @@
  *      + pour les itérateurs : FOO
  *  - déclare les accesseurs suivants :
  *     * création d'une cellule
- *   FOO_t      FOO_new(void);
+ *   FOO*       FOO_new(void);
  *     * suppression d'une cellule
- *   void       FOO_delete(FOO_t);
+ *   void       FOO_delete(FOO*);
  *     * création d'une liste (vide)
- *   FOO_list_t FOO_list_new(void);
+ *   FOO_list*  FOO_list_new(void);
  *     * suppression d'une liste
- *   void       FOO_list_delete(FOO_list_t);
+ *   void       FOO_list_delete(FOO_list*);
  *     * teste si une liste est vide
- *   int        FOO_list_empty(FOO_list_t);
+ *   int        FOO_list_empty(FOO_list*);
  *     * retire un élément de la liste
- *   void       FOO_list_erase(FOO_list_t, FOO_t);
+ *   void       FOO_list_erase(FOO_list*, FOO*);
  *     * ajoute une élément en queue de liste
- *   void       FOO_list_push_back(FOO_list_t, FOO_t);
+ *   void       FOO_list_push_back(FOO_list*, FOO*);
  *     * ajoute un élément en tête de list
- *   void       FOO_list_push_front(FOO_list_t, FOO_t);
+ *   void       FOO_list_push_front(FOO_list*, FOO*);
  *     * ajoute la deuxième liste à la fin de la première liste
- *   FOO_t      FOO_list_push_list_back(FOO_list_t, FOO_list_t);
+ *   FOO*       FOO_list_push_list_back(FOO_list*, FOO_list*);
  *     * ajoute la première liste au début de la deuxième liste
- *   FOO_t      FOO_list_push_list_front(FOO_list_t, FOO_list_t);
+ *   FOO*       FOO_list_push_list_front(FOO_list*, FOO_list*);
  *     * retire l'élément en queue de liste
- *   FOO_t      FOO_list_pop_back(FOO_list_t);
+ *   FOO*       FOO_list_pop_back(FOO_list*);
  *     * retire l'élement en tête de liste
- *   FOO_t      FOO_list_pop_front(FOO_list_t);
+ *   FOO*       FOO_list_pop_front(FOO_list*);
  *     * retourne l'élément en queue de liste
- *   FOO_t      FOO_list_back(FOO_list_t);
+ *   FOO*       FOO_list_back(FOO_list*);
  *     * retourne l'élement en tête de liste
- *   FOO_t      FOO_list_front(FOO_list_t);
+ *   FOO*       FOO_list_front(FOO_list*);
  *     * vérifie si la liste chainée est cohérente
- *   int	FOO_list_check(FOO_list_t);
+ *   int	FOO_list_check(FOO_list*);
  *     *
- *   FOO_t      FOO_list_begin(FOO_list_t);
+ *   FOO*       FOO_list_begin(FOO_list*);
  *     *
- *   FOO_t      FOO_list_end(FOO_list_t);
+ *   FOO*       FOO_list_end(FOO_list*);
  *     *
- *   FOO_t      FOO_list_next(FOO_t)
+ *   FOO*       FOO_list_next(FOO*)
  *     *
- *   int        FOO_list_size(FOO_list_t)
+ *   int        FOO_list_size(FOO_list*)
  * *********************************************************
  * Exemples d'utilisation :
  *  - au départ, on a :
@@ -79,16 +79,16 @@
  *      int a;
  *      int b;
  *    );
- *    qui crée les types ma_structure_t et ma_structure_list_t.
+ *    qui crée les types struct ma_structure et struct ma_structure_list.
  *  - allocation d'une liste vide :
- *  ma_structure_list_t l = ma_structure_list_new();
+ *  struct ma_structure_list * l = ma_structure_list_new();
  *  - ajouter un élément 'e' en tête de la liste 'l' :
- *  ma_structure_t e = ma_structure_new();
+ *  ma_structure * e = ma_structure_new();
  *  e->a = 0;
- *  e->b = 1;
+ *  e->b = 0;
  *  ma_structure_list_push_front(l, e);
  *  - itérateur de liste :
- *  ma_structure i;
+ *  ma_structure * i;
  *  for(i  = ma_structure_list_begin(l);
  *      i != ma_structure_list_end(l);
  *      i  = ma_structure_list_next(i))

+ 6 - 1
src/core/combined_workers.c

@@ -67,9 +67,14 @@ int starpu_combined_worker_assign_workerid(int nworkers, int workerid_array[])
 	{
 		int id = workerid_array[i];
 
+#ifdef STARPU_USE_MIC
+		STARPU_ASSERT(config->workers[id].arch == STARPU_CPU_WORKER || config->workers[id].arch == STARPU_MIC_WORKER);
+		STARPU_ASSERT(config->workers[id].worker_mask == STARPU_CPU || config->workers[id].worker_mask == STARPU_MIC);
+#else/* STARPU_USE_MIC */
 		/* We only combine CPUs */
-		STARPU_ASSERT(config->workers[id].perf_arch == STARPU_CPU_DEFAULT);
+		STARPU_ASSERT(config->workers[id].arch == STARPU_CPU_WORKER);
 		STARPU_ASSERT(config->workers[id].worker_mask == STARPU_CPU);
+#endif /* STARPU_USE_MIC */
 
 		/* We only combine valid "basic" workers */
 		if ((id < 0) || (id >= basic_worker_count))

+ 91 - 25
src/core/detect_combined_workers.c

@@ -191,58 +191,124 @@ static void find_and_assign_combinations_with_hwloc(int *workerids, int nworkers
 
 #else /* STARPU_HAVE_HWLOC */
 
+static void assign_combinations_without_hwloc(struct starpu_worker_collection* worker_collection, int* workers, unsigned n, int min, int max)
+{
+
+	int size,i,count =0;
+	//if the maximun number of worker is already reached
+	if(worker_collection->nworkers >= STARPU_NMAXWORKERS - 1)
+		return;
+
+	for (size = min; size <= max; size *= 2)
+	{
+		unsigned first;
+		for (first = 0; first < n; first += size)
+		{
+			if (first + size <= n)
+			{
+				int found_workerids[size];
+
+				for (i = 0; i < size; i++)
+					found_workerids[i] = workers[first + i];
+
+				/* We register this combination */
+				int newworkerid;
+				newworkerid = starpu_combined_worker_assign_workerid(size, found_workerids);
+				STARPU_ASSERT(newworkerid >= 0);
+				count++;
+				worker_collection->add(worker_collection, newworkerid);
+				//if the maximun number of worker is reached, then return
+				if(worker_collection->nworkers >= STARPU_NMAXWORKERS - 1)
+					return;
+			}
+		}
+	}
+	_STARPU_DEBUG("%d\n",count);
+}
+
+
 static void find_and_assign_combinations_without_hwloc(int *workerids, int nworkers)
 {
+	int i;
+	unsigned j;
 	unsigned sched_ctx_id  = starpu_sched_ctx_get_context();
 	if(sched_ctx_id == STARPU_NMAX_SCHED_CTXS)
 		sched_ctx_id = 0;
-	int min;
-	int max;
+	int min, max, mic_min, mic_max;
 
 	struct starpu_worker_collection* workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
 
 	/* We put the id of all CPU workers in this array */
 	int cpu_workers[STARPU_NMAXWORKERS];
 	unsigned ncpus = 0;
+#ifdef STARPU_USE_MIC
+	unsigned nb_mics = _starpu_get_machine_config()->topology.nmicdevices;
+	unsigned * nmics_table;
+	int * mic_id;
+	int ** mic_workers;
+	mic_id = malloc(sizeof(int)*nb_mics);
+	nmics_table = malloc(sizeof(unsigned)*nb_mics);
+	mic_workers = malloc(sizeof(int*)*nb_mics);
+	for(i=0; i<nb_mics; i++)
+	{
+		mic_id[i] = -1;
+		nmics_table[i] = 0;
+		mic_workers[i] = malloc(sizeof(int)*STARPU_NMAXWORKERS);
+	}
+#endif /* STARPU_USE_MIC */
 
 	struct _starpu_worker *worker;
-	int i;
 	for (i = 0; i < nworkers; i++)
 	{
 		worker = _starpu_get_worker_struct(workerids[i]);
-
-		if (worker->perf_arch == STARPU_CPU_DEFAULT)
+		if (worker->arch == STARPU_CPU_WORKER)
 			cpu_workers[ncpus++] = i;
+#ifdef STARPU_USE_MIC
+		else if(worker->arch == STARPU_MIC_WORKER)
+		{
+			if(worker->devid != 0)
+			{
+				for(j=0; mic_id[j] != worker->mp_nodeid && mic_id[j] != -1 && j<nb_mics; j++);
+				if(j<nb_mics)
+				{
+					if(mic_id[j] == -1)
+					{
+						mic_id[j] = worker->mp_nodeid;					
+					}
+					mic_workers[j][nmics_table[j]++] = i;
+				}
+			}
+		}
+	
+
+#endif /* STARPU_USE_MIC */
 	}
 
+
 	min = starpu_get_env_number("STARPU_MIN_WORKERSIZE");
 	if (min < 2)
 		min = 2;
 	max = starpu_get_env_number("STARPU_MAX_WORKERSIZE");
 	if (max == -1 || max > (int) ncpus)
 		max = ncpus;
-
-	int size;
-	for (size = min; size <= max; size *= 2)
+	
+	assign_combinations_without_hwloc(workers,cpu_workers,ncpus,min,max);
+#ifdef STARPU_USE_MIC
+	mic_min = starpu_get_env_number("STARPU_MIN_WORKERSIZE");
+	if (mic_min < 2)
+		mic_min = 2;
+	for(i=0; i<nb_mics; i++)
 	{
-		unsigned first_cpu;
-		for (first_cpu = 0; first_cpu < ncpus; first_cpu += size)
-		{
-			if (first_cpu + size <= ncpus)
-			{
-				int found_workerids[size];
-
-				for (i = 0; i < size; i++)
-					found_workerids[i] = cpu_workers[first_cpu + i];
-
-				/* We register this combination */
-				int newworkerid;
-				newworkerid = starpu_combined_worker_assign_workerid(size, found_workerids);
-				STARPU_ASSERT(newworkerid >= 0);
-				workers->add(workers, newworkerid);
-			}
-		}
+		mic_max = starpu_get_env_number("STARPU_MAX_WORKERSIZE");
+		if (mic_max == -1 || mic_max > (int) nmics_table[i])
+			mic_max = nmics_table[i];
+		assign_combinations_without_hwloc(workers,mic_workers[i],nmics_table[i],mic_min,mic_max);
+		free(mic_workers[i]);
 	}
+	free(mic_id);
+	free(nmics_table);
+	free(mic_workers);
+#endif /* STARPU_USE_MIC */
 }
 
 #endif /* STARPU_HAVE_HWLOC */

+ 1 - 0
src/core/topology.c

@@ -1264,6 +1264,7 @@ _starpu_init_workers_binding (struct _starpu_machine_config *config, int no_mp_c
 
 		if (is_a_set_of_accelerators)
 		{
+/* TODO: il faudrait changer quand on change de device */
 			if (accelerator_bindid == -1)
 				accelerator_bindid = _starpu_get_next_bindid(config, preferred_binding, npreferred);
 

+ 1 - 1
src/core/workers.c

@@ -1424,7 +1424,7 @@ struct _starpu_combined_worker *_starpu_get_combined_worker_struct(unsigned id)
 {
 	unsigned basic_worker_count = starpu_worker_get_count();
 	
-	_STARPU_DEBUG("basic_worker_count:%d\n",basic_worker_count);
+	//_STARPU_DEBUG("basic_worker_count:%d\n",basic_worker_count);
 
 	STARPU_ASSERT(id >= basic_worker_count);
 	return &config.combined_workers[id - basic_worker_count];

+ 29 - 3
src/drivers/driver_common/driver_common.c

@@ -243,8 +243,11 @@ struct starpu_task *_starpu_get_worker_task(struct _starpu_worker *args, int wor
 int _starpu_get_multi_worker_task(struct _starpu_worker *workers, struct starpu_task ** tasks, int nworkers)
 {
 	int i, count = 0;
+	struct _starpu_job * j;
+	int is_parallel_task;
+	struct _starpu_combined_worker *combined_worker;
 	/*for each worker*/
-	for (i = 1; (i < nworkers); i++)
+	for (i = 1; i < nworkers; i++)
 	{
 		/*if the worker is already executinf a task then */
 		if(workers[i].current_task)
@@ -255,16 +258,39 @@ int _starpu_get_multi_worker_task(struct _starpu_worker *workers, struct starpu_
 		else
 		{
 			STARPU_PTHREAD_MUTEX_LOCK(&workers[i].sched_mutex);
+			_starpu_set_local_worker_key(&workers[i]);
 			tasks[i] = _starpu_pop_task(&workers[i]);
 			STARPU_PTHREAD_MUTEX_UNLOCK(&workers[i].sched_mutex);
 			if(tasks[i] != NULL)
 			{
 				count ++;
-				_starpu_worker_set_status_sleeping(workers[i].workerid);
+				j = _starpu_get_job_associated_to_task(tasks[i]);
+				is_parallel_task = (j->task_size > 1);
+				workers[i].current_task = j->task;
+				/* Get the rank in case it is a parallel task */
+				if (is_parallel_task)
+				{
+
+					STARPU_PTHREAD_MUTEX_LOCK(&j->sync_mutex);
+					workers[i].current_rank = j->active_task_alias_count++;
+					STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
+					
+					combined_worker = _starpu_get_combined_worker_struct(j->combined_workerid);
+					workers[i].combined_workerid = j->combined_workerid;
+					workers[i].worker_size = combined_worker->worker_size;
+				}
+				else
+				{
+					workers[i].combined_workerid = workers[i].workerid;
+					workers[i].worker_size = 1;
+					workers[i].current_rank = 0;
+				}
+
+				_starpu_worker_set_status_wakeup(workers[i].workerid);
 			}
 			else
 			{
-				_starpu_worker_set_status_wakeup(workers[i].workerid);
+				_starpu_worker_set_status_sleeping(workers[i].workerid);
 			}
 		}
 	}

+ 65 - 7
src/drivers/mic/driver_mic_sink.c

@@ -36,18 +36,69 @@
 
 void _starpu_mic_sink_init(struct _starpu_mp_node *node)
 {
-	//unsigned int i;
-	
+	pthread_t thread, self;
+	cpu_set_t cpuset;
+	pthread_attr_t attr;
+	int i, j, ret;
+	struct arg_sink_thread * arg;
+
+	/*Bind on the first core*/
+	self = pthread_self();
+	CPU_ZERO(&cpuset);
+	CPU_SET(0,&cpuset);
+	pthread_setaffinity_np(self,sizeof(cpu_set_t),&cpuset);
+
 	/* Initialize connection with the source */
 	_starpu_mic_common_accept(&node->mp_connection.mic_endpoint,
 					 STARPU_MIC_SOURCE_PORT_NUMBER);
 
 	_starpu_mic_common_accept(&node->host_sink_dt_connection.mic_endpoint,
 									 STARPU_MIC_SOURCE_DT_PORT_NUMBER);
+	//init the set
+	CPU_ZERO(&cpuset);
 
 	node->nb_cores = COISysGetCoreCount();
 
-	_starpu_sink_common_init(node);
+	node->thread_table = malloc(sizeof(pthread_t)*node->nb_cores);
+
+	node->run_table = malloc(sizeof(struct mp_task *)*node->nb_cores);
+	node->mutex_run_table = malloc(sizeof(pthread_mutex_t)*node->nb_cores);
+
+	node->barrier_list = mp_task_list_new();
+	node->dead_queue = mp_task_list_new();
+	pthread_mutex_init(&node->dead_queue_mutex,NULL);
+	pthread_mutex_init(&node->barrier_mutex,NULL);
+
+
+	/*for each core init the mutex, the task pointer and launch the thread */
+	for(i=1; i<node->nb_cores; i++)
+	{
+		node->run_table[i] = NULL;
+
+		pthread_mutex_init(&node->mutex_run_table[i],NULL);
+		pthread_mutex_lock(&node->mutex_run_table[i]);
+
+		//init the set
+		CPU_ZERO(&cpuset);
+		for(j=0;j<HYPER_THREAD_NUMBER;j++)
+			CPU_SET(j+i*HYPER_THREAD_NUMBER,&cpuset);
+
+		ret = pthread_attr_init(&attr);
+		STARPU_ASSERT(ret == 0);
+		ret = pthread_attr_setaffinity_np(&attr, sizeof(cpu_set_t), &cpuset);
+		STARPU_ASSERT(ret == 0);
+
+		/*prepare the argument for the thread*/
+		arg= malloc(sizeof(struct arg_sink_thread));
+		arg->task = &node->run_table[i];
+		arg->coreid = i;
+		arg->node = node;
+		arg->mutex = &node->mutex_run_table[i];
+		
+		ret = pthread_create(&thread, &attr, _starpu_sink_thread, arg);
+		((pthread_t *)node->thread_table)[i] = thread;
+		STARPU_ASSERT(ret == 0);
+	}
 
 	//node->sink_sink_dt_connections = malloc(node->nb_mp_sinks * sizeof(union _starpu_mp_connection));
 
@@ -145,11 +196,18 @@ void _starpu_mic_sink_free(const struct _starpu_mp_node *mp_node STARPU_ATTRIBUT
 
 /* bind the thread to a core
  */
-void _starpu_mic_sink_bind_thread(const struct _starpu_mp_node *mp_node STARPU_ATTRIBUTE_UNUSED, cpu_set_t * cpuset, int coreid)
+void _starpu_mic_sink_bind_thread(const struct _starpu_mp_node *mp_node STARPU_ATTRIBUTE_UNUSED, int coreid, int * core_table, int nb_core)
 {
-	int j, ret;
+	cpu_set_t cpuset;
+	int i, j, ret;
+
+  	//init the set
+	CPU_ZERO(&cpuset);
 
 	//adding the core to the set
-	for(j=0;j<HYPER_THREAD_NUMBER;j++)
-		CPU_SET(j+coreid*HYPER_THREAD_NUMBER,cpuset);
+	for(i=0;i<nb_core;i++)
+		for(j=0;j<HYPER_THREAD_NUMBER;j++)
+			CPU_SET(j+core_table[i]*HYPER_THREAD_NUMBER,&cpuset);
+
+	pthread_setaffinity_np(((pthread_t*)mp_node->thread_table)[coreid],sizeof(cpu_set_t),&cpuset);
 }

+ 1 - 2
src/drivers/mic/driver_mic_sink.h

@@ -41,8 +41,7 @@ unsigned int _starpu_mic_sink_get_nb_core(void);
 
 void _starpu_mic_sink_allocate(const struct _starpu_mp_node *mp_node, void *arg, int arg_size);
 void _starpu_mic_sink_free(const struct _starpu_mp_node *mp_node STARPU_ATTRIBUTE_UNUSED, void *arg, int arg_size);
-void _starpu_mic_sink_bind_thread(const struct _starpu_mp_node *mp_node STARPU_ATTRIBUTE_UNUSED, cpu_set_t * cpuset, int coreid);
-
+void _starpu_mic_sink_bind_thread(const struct _starpu_mp_node *mp_node STARPU_ATTRIBUTE_UNUSED, int coreid, int * core_table, int nb_core);
 #endif /* STARPU_USE_MIC */
 
 

+ 2 - 2
src/drivers/mp_common/mp_common.c

@@ -27,6 +27,8 @@
 #include <drivers/scc/driver_scc_source.h>
 #include <drivers/scc/driver_scc_sink.h>
 
+#include <common/list.h>
+
 /* Allocate and initialize the sink structure, when the function returns
  * all the pointer of functions are linked to the right ones.
  */
@@ -42,8 +44,6 @@ _starpu_mp_common_node_create(enum _starpu_mp_node_kind node_kind,
 
 	node->peer_id = peer_id;
 
-	task_fifo_init(&(node->dead_queue));
-
 	switch(node->kind)
 	{
 #ifdef STARPU_USE_MIC

+ 34 - 8
src/drivers/mp_common/mp_common.h

@@ -21,9 +21,9 @@
 
 #include <starpu.h>
 #include <common/config.h>
-
-#include "task_fifo.h"
-
+#include <common/list.h>
+#include <common/barrier.h>
+#include <common/thread.h>
 #ifdef STARPU_USE_MP
 
 #ifdef STARPU_USE_MIC
@@ -37,7 +37,25 @@
 
 #define STARPU_MP_COMMON_REPORT_ERROR(node, status)			\
 	(node)->report_error(__starpu_func__, __FILE__, __LINE__, (status))
-
+LIST_TYPE(mp_barrier,
+		int id;
+		_starpu_pthread_barrier_t barrier;
+	 );
+
+
+LIST_TYPE(mp_task,
+		void (*kernel)(void **, void *);
+		void *interfaces[STARPU_NMAXBUFS]; 
+		void *cl_arg;
+		unsigned coreid;
+		enum starpu_codelet_type type;
+		int is_parallel_task;
+		int combined_workerid;
+		int combined_worker_size;
+		int combined_worker[STARPU_NMAXWORKERS];
+		_starpu_pthread_barrier_t * barrier;
+		struct mp_task * next;
+	 );
 
 enum _starpu_mp_command
 {
@@ -142,12 +160,20 @@ struct _starpu_mp_node
 	 *  - sink_sink_dt_connections[j] is not initialized for the sink number j. */
 	union _starpu_mp_connection *sink_sink_dt_connections;
 
+	/* table to store pointer of the thread workers*/
+	void* thread_table;
+
         /*dead queue where the finished kernel are added */
-        struct mp_task_fifo dead_queue;
+        struct mp_task_list* dead_queue;
+	pthread_mutex_t dead_queue_mutex;
+
+	/*list of barrier for combined worker*/
+	struct mp_barrier_list* barrier_list;
+	pthread_mutex_t barrier_mutex;
 
-	/**/
+	/*table where worker comme pick task*/
 	struct mp_task ** run_table;
-	pthread_mutex_t * mutex_table;
+	pthread_mutex_t * mutex_run_table;
 
 	/* Node general functions */
 	void (*init)(struct _starpu_mp_node *node);
@@ -166,7 +192,7 @@ struct _starpu_mp_node
 	void (*dt_recv_from_device)(const struct _starpu_mp_node *, int, void *, int);
 
 	void (*(*get_kernel_from_job)(const struct _starpu_mp_node *,struct _starpu_job *))(void);
-	void (*bind_thread)(const struct _starpu_mp_node *, cpu_set_t *,int);
+	void (*bind_thread)(const struct _starpu_mp_node *, int,int *,int);
 	void (*execute)(const struct _starpu_mp_node *, void *, int);
 	void (*nbcores)(const struct _starpu_mp_node *);
 	void (*allocate)(const struct _starpu_mp_node *, void *, int);

+ 88 - 107
src/drivers/mp_common/sink_common.c

@@ -22,22 +22,13 @@
 #include <common/utils.h>
 #include <drivers/mp_common/mp_common.h>
 #include <datawizard/interfaces/data_interface.h>
-
+#include <common/barrier.h>
 #ifdef STARPU_USE_MIC
 #include <common/COISysInfo_common.h>
 #endif
 
 #include "sink_common.h"
 
-#include "task_fifo.h"
-
-struct arg_sink_thread
-{
-	struct mp_task ** task;
-	pthread_mutex_t * mutex;
-	struct _starpu_mp_node *node;
-	int coreid;
-};
 
 /* Return the sink kind of the running process, based on the value of the
  * STARPU_SINK environment variable.
@@ -226,14 +217,19 @@ void _starpu_sink_common_worker(void)
 			}
 		}
 
-		if(!task_fifo_is_empty(&(node->dead_queue)))
+		pthread_mutex_lock(&node->dead_queue_mutex);
+		if(!mp_task_list_empty(node->dead_queue))
 		{
-			struct mp_task * task = node->dead_queue.first;
+			struct mp_task * task = mp_task_list_pop_back(node->dead_queue);
+			pthread_mutex_unlock(&node->dead_queue_mutex);
 			//_STARPU_DEBUG("telling host that we have finished the task %p sur %d.\n", task->kernel, task->coreid);
-			_starpu_mp_common_send_command(task->node, STARPU_EXECUTION_COMPLETED,
+			_starpu_mp_common_send_command(node, STARPU_EXECUTION_COMPLETED,
 						       &(task->coreid), sizeof(task->coreid));
-			task_fifo_pop(&(node->dead_queue));
-			free(task);
+			mp_task_delete(task);
+		}
+		else
+		{
+			pthread_mutex_unlock(&node->dead_queue_mutex);
 		}
 	}
 
@@ -245,90 +241,48 @@ void _starpu_sink_common_worker(void)
 
 
 
-static void* _starpu_sink_thread(void * thread_arg)
+
+void* _starpu_sink_thread(void * thread_arg)
 {
 
 	struct mp_task **task = ((struct arg_sink_thread *)thread_arg)->task;
-	pthread_mutex_t * mutex = ((struct arg_sink_thread *)thread_arg)->mutex;
+	struct mp_task * task_tmp;
 	struct _starpu_mp_node *node = ((struct arg_sink_thread *)thread_arg)->node;
+	pthread_mutex_t * mutex = ((struct arg_sink_thread *)thread_arg)->mutex;
 	int coreid =((struct arg_sink_thread *)thread_arg)->coreid;
-	int i;
-	cpu_set_t base_cpu_set, para_cpu_set;
-	pthread_t thread = pthread_self();
-
-	//init the set
-	CPU_ZERO(&base_cpu_set);
-	node->bind_thread(node, &base_cpu_set, coreid);
 	free(thread_arg);
 	while(1)
 	{
 		pthread_mutex_lock(mutex);
-		if((*task)->type == STARPU_FORKJOIN && (*task)->is_parallel_task)
+		if((*task) != NULL)
 		{
-			//init the set
-			CPU_ZERO(&para_cpu_set);
-
-			for(i=0; i<(*task)->combined_worker_size; i++)
+			task_tmp = (*task);
+			if(task_tmp->is_parallel_task)
 			{
-				node->bind_thread(node, &para_cpu_set, (*task)->combined_worker[i]);
+				_STARPU_DEBUG("BARRIER WAIT\n");
+				STARPU_PTHREAD_BARRIER_WAIT(task_tmp->barrier);
+				_STARPU_DEBUG("BARRIER JUMP\n");
 			}
-			pthread_setaffinity_np(thread,sizeof(cpu_set_t),&para_cpu_set);
-		}
+			if(task_tmp->type == STARPU_FORKJOIN && task_tmp->is_parallel_task)
+				node->bind_thread(node, coreid, task_tmp->combined_worker, task_tmp->combined_worker_size);
 
-		//execute the task
-		(*task)->kernel((*task)->interfaces,(*task)->cl_arg);
-		
-		if((*task)->type == STARPU_FORKJOIN && (*task)->is_parallel_task)
-		{
-			pthread_setaffinity_np(thread,sizeof(cpu_set_t),&base_cpu_set);
-		}
+			//execute the task
+			task_tmp->kernel(task_tmp->interfaces,task_tmp->cl_arg);
 
-		//append the finished task to the dead queue
-		task_fifo_append(&((*task)->node->dead_queue),(*task));	
-		
-	}
-	pthread_exit(NULL);
-}
 
+			if(task_tmp->type == STARPU_FORKJOIN && task_tmp->is_parallel_task)
+				node->bind_thread(node, coreid, &coreid, 1);
 
-void _starpu_sink_common_init(struct _starpu_mp_node *node)
-{
-	pthread_t thread;
-	cpu_set_t cpuset;
-	pthread_attr_t attr;
-	int i, ret;
-	struct arg_sink_thread * arg;
+			(*task) = NULL;
 
-	node->run_table = malloc(sizeof(struct mp_task *)*node->nb_cores);
-	node->mutex_table = malloc(sizeof(pthread_mutex_t)*node->nb_cores);
+			//append the finished task to the dead queue
+			pthread_mutex_lock(&node->dead_queue_mutex);
+			mp_task_list_push_front(node->dead_queue,task_tmp);
+			pthread_mutex_unlock(&node->dead_queue_mutex);
 
-	/*for each core init the mutex, the task pointer and launch the thread */
-	for(i=1; i<node->nb_cores; i++)
-	{
-		node->run_table[i] = NULL;
-		pthread_mutex_init(&node->mutex_table[i], NULL);
-		pthread_mutex_lock(&node->mutex_table[i]);
-
-		//init the set
-		CPU_ZERO(&cpuset);
-
-		/*prepare the cpuset*/
-		node->bind_thread(node, &cpuset, i);
-		ret = pthread_attr_init(&attr);
-		STARPU_ASSERT(ret == 0);
-		ret = pthread_attr_setaffinity_np(&attr, sizeof(cpu_set_t), &cpuset);
-		STARPU_ASSERT(ret == 0);
-
-		/*prepare the argument for the thread*/
-		arg= malloc(sizeof(struct arg_sink_thread));
-		arg->task = &node->run_table[i];
-		arg->mutex = &node->mutex_table[i];
-		arg->coreid = i;
-		arg->node = node;
-		
-		ret = pthread_create(&thread, &attr, _starpu_sink_thread, arg);
-		STARPU_ASSERT(ret == 0);
+		}
 	}
+	pthread_exit(NULL);
 }
 
 static void _starpu_sink_common_execute_thread(struct _starpu_mp_node *node, struct mp_task *task)
@@ -336,9 +290,36 @@ static void _starpu_sink_common_execute_thread(struct _starpu_mp_node *node, str
 	//add the task to the spesific thread
 	node->run_table[task->coreid] = task;
 	//unlock the mutex
-	pthread_mutex_unlock(&node->mutex_table[task->coreid]);
+	pthread_mutex_unlock(&node->mutex_run_table[task->coreid]);
 }
 
+/**/
+_starpu_pthread_barrier_t * _starpu_sink_common_get_barrier(struct _starpu_mp_node * node, int cb_workerid, int cb_workersize)
+{
+	struct mp_barrier * b = NULL;
+	pthread_mutex_lock(&node->barrier_mutex);
+	if(!mp_barrier_list_empty(node->barrier_list))
+	{
+		for(b = mp_barrier_list_begin(node->barrier_list); 
+				b != mp_barrier_list_end(node->barrier_list) && b->id != cb_workerid; 
+				b = mp_barrier_list_next(b));
+
+		if(b->id == cb_workerid)
+		{
+			pthread_mutex_unlock(&node->barrier_mutex);
+			return &b->barrier;
+		}
+	}
+       	b = mp_barrier_new();
+	b->id = cb_workerid;
+
+	STARPU_PTHREAD_BARRIER_INIT(&b->barrier,NULL,cb_workersize);
+	mp_barrier_list_push_back(node->barrier_list,b);
+	pthread_mutex_unlock(&node->barrier_mutex);
+	return &b->barrier;
+}
+
+
 
 /* Receive paquet from _starpu_src_common_execute_kernel in the form below :
  * [Function pointer on sink, number of interfaces, interfaces
@@ -354,38 +335,37 @@ void _starpu_sink_common_execute(const struct _starpu_mp_node *node,
 	unsigned nb_interfaces;
 
 	void *arg_ptr = arg;
-	struct mp_task *thread_arg = malloc(sizeof(struct mp_task));
+	struct mp_task *task = malloc(sizeof(struct mp_task));
 	
-	thread_arg->node = node;
-
-	thread_arg->kernel = *(void(**)(void **, void *)) arg_ptr;
-	arg_ptr += sizeof(thread_arg->kernel);
+	task->kernel = *(void(**)(void **, void *)) arg_ptr;
+	arg_ptr += sizeof(task->kernel);
 
-	thread_arg->type = *(enum starpu_codelet_type *) arg_ptr;
-	arg_ptr += sizeof(thread_arg->type);
+	task->type = *(enum starpu_codelet_type *) arg_ptr;
+	arg_ptr += sizeof(task->type);
 
-	thread_arg->is_parallel_task = *(int *) arg_ptr;
-	arg_ptr += sizeof(thread_arg->is_parallel_task);
+	task->is_parallel_task = *(int *) arg_ptr;
+	arg_ptr += sizeof(task->is_parallel_task);
 	
-
-	//_STARPU_DEBUG("type:%d\n",thread_arg->type);
-
-	if(thread_arg->type == STARPU_FORKJOIN && thread_arg->is_parallel_task)
+	if(task->is_parallel_task)
 	{
-		thread_arg->combined_worker_size = *(int *) arg_ptr;
-		arg_ptr += sizeof(thread_arg->combined_worker_size);
+		task->combined_workerid= *(int *) arg_ptr;
+		arg_ptr += sizeof(task->combined_workerid);
+
+		task->combined_worker_size = *(int *) arg_ptr;
+		arg_ptr += sizeof(task->combined_worker_size);
 	
-		for (id = 0; id < thread_arg->combined_worker_size; id++)
+		for (id = 0; id < task->combined_worker_size; id++)
 		{
 			
-			thread_arg->combined_worker[id] = *(int*) arg_ptr;
-			arg_ptr += sizeof(thread_arg->combined_worker[id]);
+			task->combined_worker[id] = *(int*) arg_ptr;
+			arg_ptr += sizeof(task->combined_worker[id]);
 		}
 		
+		task->barrier = _starpu_sink_common_get_barrier(node,task->combined_workerid,task->combined_worker_size);
 	}
 
-	thread_arg->coreid = *(unsigned *) arg_ptr;
-	arg_ptr += sizeof(thread_arg->coreid);
+	task->coreid = *(unsigned *) arg_ptr;
+	arg_ptr += sizeof(task->coreid);
 
 	nb_interfaces = *(unsigned *) arg_ptr;
 	arg_ptr += sizeof(nb_interfaces);
@@ -396,20 +376,21 @@ void _starpu_sink_common_execute(const struct _starpu_mp_node *node,
 	 * interfaces, thus we expect the same size anyway */
 	for (id = 0; id < nb_interfaces; id++)
 	{
-		thread_arg->interfaces[id] = arg_ptr;
+		task->interfaces[id] = arg_ptr;
 		arg_ptr += sizeof(union _starpu_interface);
 	}
 
 	/* Was cl_arg sent ? */
 	if (arg_size > arg_ptr - arg)
-		thread_arg->cl_arg = arg_ptr;
+		task->cl_arg = arg_ptr;
 	else
-		thread_arg->cl_arg = NULL;
+		task->cl_arg = NULL;
 
-	//_STARPU_DEBUG("telling host that we have submitted the task %p.\n", thread_arg->kernel);
+	
+	//_STARPU_DEBUG("telling host that we have submitted the task %p.\n", task->kernel);
 	_starpu_mp_common_send_command(node, STARPU_EXECUTION_SUBMITTED,
 				       NULL, 0);
 
-	//_STARPU_DEBUG("executing the task %p\n", thread_arg->kernel);
-	_starpu_sink_common_execute_thread(node, thread_arg);	
+	//_STARPU_DEBUG("executing the task %p\n", task->kernel);
+	_starpu_sink_common_execute_thread(node, task);	
 }

+ 9 - 1
src/drivers/mp_common/sink_common.h

@@ -32,6 +32,14 @@ struct _starpu_sink_topology
 	unsigned nb_cpus;
 };
 
+struct arg_sink_thread
+{
+	struct mp_task ** task;
+	struct _starpu_mp_node *node;
+	pthread_mutex_t* mutex;
+	int coreid;
+};
+
 void _starpu_sink_common_worker(void);
 
 void _starpu_sink_common_execute(const struct _starpu_mp_node *node, void *arg, int arg_size);
@@ -40,7 +48,7 @@ void _starpu_sink_nbcores (const struct _starpu_mp_node *node);
 void _starpu_sink_common_allocate(const struct _starpu_mp_node *mp_node, void *arg, int arg_size);
 void _starpu_sink_common_free(const struct _starpu_mp_node *mp_node STARPU_ATTRIBUTE_UNUSED, void *arg, int arg_size);
 
-void _starpu_sink_common_init(struct _starpu_mp_node *node);
+void* _starpu_sink_thread(void * thread_arg);
 
 #endif /* STARPU_USE_MP */
 

+ 106 - 90
src/drivers/mp_common/source_common.c

@@ -30,7 +30,7 @@
 #include <drivers/mp_common/mp_common.h>
 
 
-static int
+	static int
 _starpu_src_common_finalize_job (struct _starpu_job *j, struct _starpu_worker *worker)
 {
 	uint32_t mask = 0;
@@ -38,14 +38,16 @@ _starpu_src_common_finalize_job (struct _starpu_job *j, struct _starpu_worker *w
 	struct timespec codelet_end;
 
 	_starpu_driver_end_job(worker, j, worker->perf_arch, &codelet_end, 0,
-			       profiling);
+			profiling);
 
 	_starpu_driver_update_job_feedback(j, worker, worker->perf_arch,
-					   &j->cl_start, &codelet_end,
-					   profiling);
-
-	_starpu_push_task_output (j, mask);
+			&j->cl_start, &codelet_end,
+			profiling);
 
+	if(worker->current_rank == 0)
+	{
+		_starpu_push_task_output (j, mask);
+	}
 	_starpu_handle_job_termination(j);
 
 	return 0;
@@ -53,7 +55,7 @@ _starpu_src_common_finalize_job (struct _starpu_job *j, struct _starpu_worker *w
 
 
 
-static int
+	static int
 _starpu_src_common_process_completed_job (struct _starpu_worker_set *workerset, void * arg, int arg_size STARPU_ATTRIBUTE_UNUSED)
 {
 	void *arg_ptr = arg;
@@ -66,9 +68,16 @@ _starpu_src_common_process_completed_job (struct _starpu_worker_set *workerset,
 	struct starpu_task *task = worker->current_task;
 	struct _starpu_job *j = _starpu_get_job_associated_to_task (task);
 
-	_starpu_src_common_finalize_job (j, worker);
+	struct _starpu_worker * old_worker = _starpu_get_local_worker_key();
+	_starpu_set_local_worker_key(worker);
+	if(worker->current_rank == 0)
+	{
+		_starpu_src_common_finalize_job (j, worker);
+	}
 	worker->current_task = NULL;
 
+	_starpu_set_local_worker_key(old_worker);
+
 	return 0;
 }
 
@@ -78,25 +87,26 @@ _starpu_src_common_process_completed_job (struct _starpu_worker_set *workerset,
  * return 1 if the message has been handle
  */
 static int _starpu_src_common_handle_async(const struct _starpu_mp_node *node, 
-				    void ** arg, int* arg_size, 
-				    enum _starpu_mp_command *answer)
+		void ** arg, int* arg_size, 
+		enum _starpu_mp_command *answer)
 {
 	struct _starpu_worker_set * worker_set = _starpu_get_worker_struct(starpu_worker_get_id())->set;
 	*answer = _starpu_mp_common_recv_command(node, arg, arg_size);
 	switch(*answer) 
 	{
-	case STARPU_EXECUTION_COMPLETED:
-		_starpu_src_common_process_completed_job (worker_set, *arg, *arg_size);
-		break;
-	default:
-		return 0;
-		break;
+		case STARPU_EXECUTION_COMPLETED:
+			_starpu_src_common_process_completed_job (worker_set, *arg, *arg_size);
+			break;
+		default:
+			return 0;
+			break;
 	}
+
 	return 1;
 }
 
 enum _starpu_mp_command _starpu_src_common_wait_command_sync(const struct _starpu_mp_node *node, 
-							     void ** arg, int* arg_size)
+		void ** arg, int* arg_size)
 {
 	enum _starpu_mp_command answer;
 	while(_starpu_src_common_handle_async(node,arg,arg_size,&answer));
@@ -109,16 +119,16 @@ void _starpu_src_common_recv_async(struct _starpu_mp_node * baseworker_node)
 	enum _starpu_mp_command answer;
 	void *arg;
 	int arg_size;
-  
+
 	if(!_starpu_src_common_handle_async(baseworker_node,&arg,&arg_size,&answer))
 	{
-	printf("incorrect commande: unknown command or sync command");
-	STARPU_ASSERT(0);
+		printf("incorrect commande: unknown command or sync command");
+		STARPU_ASSERT(0);
 	}	
 }
 
 
-int
+	int
 _starpu_src_common_sink_nbcores (const struct _starpu_mp_node *node, int *buf)
 {
 	// Send a request to the sink NODE for the number of cores on it.
@@ -144,7 +154,7 @@ _starpu_src_common_sink_nbcores (const struct _starpu_mp_node *node, int *buf)
  * else it returns -ESPIPE if the function was not found.
  */
 int _starpu_src_common_lookup(struct _starpu_mp_node *node,
-			      void (**func_ptr)(void), const char *func_name)
+		void (**func_ptr)(void), const char *func_name)
 {
 	enum _starpu_mp_command answer;
 	void *arg;
@@ -155,10 +165,10 @@ int _starpu_src_common_lookup(struct _starpu_mp_node *node,
 
 	//_STARPU_DEBUG("Looking up %s\n", func_name);
 	_starpu_mp_common_send_command(node, STARPU_LOOKUP, (void *) func_name,
-				       arg_size);
+			arg_size);
 
 	answer = _starpu_src_common_wait_command_sync(node, (void **) &arg,
-						      &arg_size);
+			&arg_size);
 
 	if (answer == STARPU_ERROR_LOOKUP) 
 	{
@@ -189,13 +199,13 @@ int _starpu_src_common_lookup(struct _starpu_mp_node *node,
  * Data interfaces in task are send to the sink.
  */
 int _starpu_src_common_execute_kernel(const struct _starpu_mp_node *node,
-				      void (*kernel)(void), unsigned coreid,
-				      enum starpu_codelet_type type,
-				      int is_parallel_task, int cb_workerid,
-				      starpu_data_handle_t *handles,
-				      void **interfaces,
-				      unsigned nb_interfaces,
-				      void *cl_arg, size_t cl_arg_size)
+		void (*kernel)(void), unsigned coreid,
+		enum starpu_codelet_type type,
+		int is_parallel_task, int cb_workerid,
+		starpu_data_handle_t *handles,
+		void **interfaces,
+		unsigned nb_interfaces,
+		void *cl_arg, size_t cl_arg_size)
 {
 
 	void *buffer, *buffer_ptr, *arg =NULL;
@@ -205,16 +215,13 @@ int _starpu_src_common_execute_kernel(const struct _starpu_mp_node *node,
 
 	buffer_size = sizeof(kernel) + sizeof(coreid) + sizeof(type)
 		+ sizeof(nb_interfaces) + nb_interfaces * sizeof(union _starpu_interface) + sizeof(is_parallel_task);
-
+	
 	/*if the task is paralle*/
-	if(type == STARPU_FORKJOIN && is_parallel_task)
+	if(is_parallel_task)
 	{
-		_STARPU_DEBUG("\n Parallele\n");
-		_STARPU_DEBUG("type:%d\n",type);
-		_STARPU_DEBUG("cb_workerid:%d\n",cb_workerid);
 		cb_worker = _starpu_get_combined_worker_struct(cb_workerid);
 		cb_worker_size = cb_worker->worker_size;
-		buffer_size = sizeof(cb_worker_size) + cb_worker_size * sizeof(devid);
+		buffer_size += sizeof(cb_workerid) + sizeof(cb_worker_size) + cb_worker_size * sizeof(devid);
 	}
 
 	/* If the user didn't give any cl_arg, there is no need to send it */
@@ -224,7 +231,6 @@ int _starpu_src_common_execute_kernel(const struct _starpu_mp_node *node,
 		buffer_size += cl_arg_size;
 	}
 	
-
 	/* We give to send_command a buffer we just allocated, which contains
 	 * a pointer to the function (sink-side), core on which execute this
 	 * function (sink-side), number of interfaces we send,
@@ -240,9 +246,12 @@ int _starpu_src_common_execute_kernel(const struct _starpu_mp_node *node,
 	*(int *) buffer_ptr = is_parallel_task;
 	buffer_ptr += sizeof(is_parallel_task);
 
-	if(type == STARPU_FORKJOIN && is_parallel_task)
+	if(is_parallel_task)
 	{
 
+		*(int *) buffer_ptr = cb_workerid;
+		buffer_ptr += sizeof(cb_workerid);
+
 		*(int *) buffer_ptr = cb_worker_size;
 		buffer_ptr += sizeof(cb_worker_size);
 
@@ -253,7 +262,7 @@ int _starpu_src_common_execute_kernel(const struct _starpu_mp_node *node,
 			buffer_ptr += sizeof(devid);
 		}
 	}
-		
+
 	*(unsigned *) buffer_ptr = coreid;
 	buffer_ptr += sizeof(coreid);
 
@@ -268,7 +277,7 @@ int _starpu_src_common_execute_kernel(const struct _starpu_mp_node *node,
 	{
 		starpu_data_handle_t handle = handles[i];
 		memcpy (buffer_ptr, interfaces[i],
-			handle->ops->interface_size);
+				handle->ops->interface_size);
 		/* The sink side has no mean to get the type of each
 		 * interface, we use a union to make it generic and permit the
 		 * sink to go through the array */
@@ -283,7 +292,7 @@ int _starpu_src_common_execute_kernel(const struct _starpu_mp_node *node,
 
 	if (answer == STARPU_ERROR_EXECUTE)
 		return -EINVAL;
-	
+
 	STARPU_ASSERT(answer == STARPU_EXECUTION_SUBMITTED);
 
 	free(buffer);
@@ -292,10 +301,10 @@ int _starpu_src_common_execute_kernel(const struct _starpu_mp_node *node,
 }
 
 static int _starpu_src_common_execute(struct _starpu_job *j, 
-				      struct _starpu_worker *worker, 
-				      struct _starpu_mp_node * node)
+		struct _starpu_worker *worker, 
+		struct _starpu_mp_node * node)
 {
-        int ret;
+	int ret;
 	uint32_t mask = 0;
 
 	STARPU_ASSERT(j);
@@ -304,33 +313,42 @@ static int _starpu_src_common_execute(struct _starpu_job *j,
 	int profiling = starpu_profiling_status_get();
 
 	STARPU_ASSERT(task);
-	
-	ret = _starpu_fetch_task_input(j, mask);
-	if (ret != 0)
+	if (worker->current_rank == 0) 
 	{
-		/* there was not enough memory, so the input of
-		 * the codelet cannot be fetched ... put the
-		 * codelet back, and try it later */
-		return -EAGAIN;
+		ret = _starpu_fetch_task_input(j, mask);
+		if (ret != 0)
+		{
+			/* there was not enough memory, so the input of
+			 * the codelet cannot be fetched ... put the
+			 * codelet back, and try it later */
+			return -EAGAIN;
+		}
 	}
 
 	void (*kernel)(void)  = node->get_kernel_from_job(node,j);
 
 
-	_starpu_driver_start_job(worker, j, &j->cl_start, 0, profiling);
+	if ((worker->current_rank == 0) || (task->cl->type != STARPU_FORKJOIN))
+	{
 
-	_STARPU_DEBUG("j->task_size:%d\n",j->task_size);	
-	_STARPU_DEBUG("j->cb_workerid:%d\n",j->combined_workerid);	
+		_starpu_driver_start_job(worker, j, &j->cl_start, 0, profiling);
 
-	_STARPU_DEBUG("cb_worker_count:%d\n",starpu_combined_worker_get_count());
 
+		_STARPU_DEBUG("workerid:%d\n",worker->devid);
+		_STARPU_DEBUG("rank:%d\n",worker->current_rank);
+		_STARPU_DEBUG("type:%d\n",task->cl->type);
+		_STARPU_DEBUG("is_parallel_task:%d\n",(j->task_size > 1));
+		_STARPU_DEBUG("cb_workerid:%d\n",j->combined_workerid);
+		_STARPU_DEBUG("j->task_size:%d\n",j->task_size);
+		_STARPU_DEBUG("cb_worker_count:%d\n\n",starpu_combined_worker_get_count());
 
-	_starpu_src_common_execute_kernel(node, kernel, worker->devid, task->cl->type,
-					  (j->task_size > 1),
-					  j->combined_workerid, task->handles,
-					  task->interfaces, task->cl->nbuffers,
-					  task->cl_arg, task->cl_arg_size);
+		_starpu_src_common_execute_kernel(node, kernel, worker->devid, task->cl->type,
+				(j->task_size > 1),
+				j->combined_workerid, task->handles,
+				task->interfaces, task->cl->nbuffers,
+				task->cl_arg, task->cl_arg_size);
 
+	}
 	return 0;
 }
 
@@ -342,14 +360,14 @@ static int _starpu_src_common_execute(struct _starpu_job *j,
  * else it returns 1 if the allocation fail.
  */
 int _starpu_src_common_allocate(const struct _starpu_mp_node *mp_node,
-				void **addr, size_t size)
+		void **addr, size_t size)
 {
 	enum _starpu_mp_command answer;
 	void *arg;
 	int arg_size;
 
 	_starpu_mp_common_send_command(mp_node, STARPU_ALLOCATE, &size,
-				       sizeof(size));
+			sizeof(size));
 
 	answer = _starpu_mp_common_recv_command(mp_node, &arg, &arg_size);
 
@@ -357,7 +375,7 @@ int _starpu_src_common_allocate(const struct _starpu_mp_node *mp_node,
 		return 1;
 
 	STARPU_ASSERT(answer == STARPU_ANSWER_ALLOCATE &&
-		      arg_size == sizeof(*addr));
+			arg_size == sizeof(*addr));
 
 	memcpy(addr, arg, arg_size);
 
@@ -368,15 +386,15 @@ int _starpu_src_common_allocate(const struct _starpu_mp_node *mp_node,
  * area pointed by ADDR.
  */
 void _starpu_src_common_free(const struct _starpu_mp_node *mp_node,
-			     void *addr)
+		void *addr)
 {
 	_starpu_mp_common_send_command(mp_node, STARPU_FREE, &addr, sizeof(addr));
 }
 
 /* Send SIZE bytes pointed by SRC to DST on the sink linked to the MP_NODE.
- */
+*/
 int _starpu_src_common_copy_host_to_sink(const struct _starpu_mp_node *mp_node,
-					 void *src, void *dst, size_t size)
+		void *src, void *dst, size_t size)
 {
 	struct _starpu_mp_transfer_command cmd = {size, dst};
 
@@ -387,9 +405,9 @@ int _starpu_src_common_copy_host_to_sink(const struct _starpu_mp_node *mp_node,
 }
 
 /* Receive SIZE bytes pointed by SRC on the sink linked to the MP_NODE and store them in DST.
- */
+*/
 int _starpu_src_common_copy_sink_to_host(const struct _starpu_mp_node *mp_node,
-					 void *src, void *dst, size_t size)
+		void *src, void *dst, size_t size)
 {
 	struct _starpu_mp_transfer_command cmd = {size, src};
 
@@ -403,7 +421,7 @@ int _starpu_src_common_copy_sink_to_host(const struct _starpu_mp_node *mp_node,
  * to the sink linked to DST_NODE. The latter store them in DST.
  */
 int _starpu_src_common_copy_sink_to_sink(const struct _starpu_mp_node *src_node,
-					 const struct _starpu_mp_node *dst_node, void *src, void *dst, size_t size)
+		const struct _starpu_mp_node *dst_node, void *src, void *dst, size_t size)
 {
 	enum _starpu_mp_command answer;
 	void *arg;
@@ -433,7 +451,7 @@ int _starpu_src_common_copy_sink_to_sink(const struct _starpu_mp_node *src_node,
  * MPI).
  */
 static void _starpu_src_common_cat_3(char *final, const char *first, 
-				     const char *second, const char *third)
+		const char *second, const char *third)
 {
 	strcpy(final, first);
 	strcat(final, second);
@@ -471,9 +489,9 @@ static int _starpu_src_common_test_suffixes(char *located_file_name, const char
 }
 
 int _starpu_src_common_locate_file(char *located_file_name,
-				   const char *env_file_name, const char *env_mic_path,
-				   const char *config_file_name, const char *actual_file_name,
-				   const char **suffixes)
+		const char *env_file_name, const char *env_mic_path,
+		const char *config_file_name, const char *actual_file_name,
+		const char **suffixes)
 {
 	if (env_file_name != NULL)
 	{
@@ -541,13 +559,13 @@ int _starpu_src_common_locate_file(char *located_file_name,
 }
 
 void _starpu_src_common_worker(struct _starpu_worker_set * worker_set, 
-			       unsigned baseworkerid, 
-			       struct _starpu_mp_node * mp_node)
+		unsigned baseworkerid, 
+		struct _starpu_mp_node * mp_node)
 { 
 	struct _starpu_worker * baseworker = &worker_set->workers[baseworkerid];
 	unsigned memnode = baseworker->memory_node;
 	struct starpu_task **tasks = malloc(sizeof(struct starpu_task *)*worker_set->nworkers);
- 
+
 	/*main loop*/
 	while (_starpu_machine_is_running())
 	{
@@ -561,7 +579,8 @@ void _starpu_src_common_worker(struct _starpu_worker_set * worker_set,
 		/* poll the device for completed jobs.*/
 		if (mp_node->mp_recv_is_ready(mp_node))
 			_starpu_src_common_recv_async(mp_node);
-		
+
+
 		/* get task for each worker*/
 		res = _starpu_get_multi_worker_task(worker_set->workers, tasks, worker_set->nworkers);
 
@@ -575,23 +594,20 @@ void _starpu_src_common_worker(struct _starpu_worker_set * worker_set,
 				if(tasks[i] != NULL)
 				{
 					j = _starpu_get_job_associated_to_task(tasks[i]);
-			
-					worker_set->workers[i].current_task = j->task;
-
+					_starpu_set_local_worker_key(&worker_set->workers[i]);
 					res =  _starpu_src_common_execute(j, &worker_set->workers[i], mp_node);
-		
 					if (res)
 					{
 						switch (res)
 						{
-						case -EAGAIN:
-							_STARPU_DISP("ouch, Xeon Phi could not actually run task %p, putting it back...\n", tasks[i]);
-							_starpu_push_task_to_workers(tasks[i]);
-							STARPU_ABORT();
-							continue;
-							break;
-						default:
-							STARPU_ASSERT(0);
+							case -EAGAIN:
+								_STARPU_DISP("ouch, Xeon Phi could not actually run task %p, putting it back...\n", tasks[i]);
+								_starpu_push_task_to_workers(tasks[i]);
+								STARPU_ABORT();
+								continue;
+								break;
+							default:
+								STARPU_ASSERT(0);
 						}
 					}
 					//_STARPU_DEBUG(" exec fin\n");

+ 0 - 34
src/drivers/mp_common/task_fifo.c

@@ -1,34 +0,0 @@
-#include "task_fifo.h"
-
-void task_fifo_init(struct mp_task_fifo* fifo){
-  fifo->first = fifo->last = NULL;
-  pthread_mutex_init(&(fifo->mutex), NULL);
-}
-
-int task_fifo_is_empty(struct mp_task_fifo* fifo){
-  return fifo->first == NULL;
-}
-
-void task_fifo_append(struct mp_task_fifo* fifo, struct mp_task * task){
-  pthread_mutex_lock(&(fifo->mutex));
-  if(task_fifo_is_empty(fifo)){
-    fifo->first = fifo->last = task;
-  }
-  else{
-    fifo->last->next = task;
-    fifo->last = task;
-  }
-  task->next = NULL;
-  pthread_mutex_unlock(&(fifo->mutex));
-}
-
-void task_fifo_pop(struct mp_task_fifo* fifo){
-  pthread_mutex_lock(&(fifo->mutex));
-  if(!task_fifo_is_empty(fifo)){
-    if(fifo->first == fifo->last)
-      fifo->first = fifo->last = NULL;
-    else
-      fifo->first = fifo->first->next; 
-  }
-  pthread_mutex_unlock(&(fifo->mutex));
-}

+ 0 - 62
src/drivers/mp_common/task_fifo.h

@@ -1,62 +0,0 @@
-/* StarPU --- Runtime system for heterogeneous multicore architectures.
- *
- * Copyright (C) 2012  Inria
- *
- * StarPU is free software; you can redistribute it and/or modify
- * it under the terms of the GNU Lesser General Public License as published by
- * the Free Software Foundation; either version 2.1 of the License, or (at
- * your option) any later version.
- *
- * StarPU is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
- *
- * See the GNU Lesser General Public License in COPYING.LGPL for more details.
- */
-
-#ifndef __TASK_FIFO_H__
-#define __TASK_FIFO_H__
-
-#include <pthread.h>
-
-#include <starpu.h>
-#include <common/config.h>
-
-
-struct mp_task{
-	struct _starpu_mp_node *node;
-	void (*kernel)(void **, void *);
-	void *interfaces[STARPU_NMAXBUFS]; 
-	void *cl_arg;
-	unsigned coreid;
-	enum starpu_codelet_type type;
-	int is_parallel_task;
-	int combined_worker_size;
-	int combined_worker[STARPU_NMAXWORKERS];
-
-	/*the next task of the fifo*/
-	struct mp_task * next;
-};
-
-
-struct mp_task_fifo{
-	/*the first task of the fifo*/
-	struct mp_task * first;
-  
-	/*the last task of the fifo*/
-	struct mp_task * last;
-
-	/*mutex to protect concurrent access on the fifo*/
-	pthread_mutex_t mutex;
-};
-
-
-void task_fifo_init(struct mp_task_fifo* fifo);
-
-int task_fifo_is_empty(struct mp_task_fifo* fifo);
-
-void task_fifo_append(struct mp_task_fifo* fifo, struct mp_task * task);
-
-void task_fifo_pop(struct mp_task_fifo* fifo);
-
-#endif /*__TASK_FIFO_H__*/

+ 4 - 1
src/drivers/scc/driver_scc_sink.c

@@ -34,7 +34,10 @@ void _starpu_scc_sink_init(struct _starpu_mp_node *node)
 	 ****************/
 	node->nb_cores = 1; 
 
-	_starpu_sink_common_init(node);
+	/*****************
+	 *     TODO      *
+	 * init thread   *
+	 *****************/
 }
 
 void _starpu_scc_sink_deinit(struct _starpu_mp_node *node)

+ 16 - 19
src/sched_policies/parallel_eager.c

@@ -29,7 +29,7 @@ struct _starpu_peager_data
         starpu_pthread_mutex_t policy_mutex;
 };
 
-#define STARPU_NMAXCOMBINED_WORKERS 10
+#define STARPU_NMAXCOMBINED_WORKERS 200
 /* XXX instead of 10, we should use some "MAX combination .."*/
 static int possible_combinations_cnt[STARPU_NMAXWORKERS];
 static int possible_combinations[STARPU_NMAXWORKERS][STARPU_NMAXCOMBINED_WORKERS];
@@ -42,17 +42,12 @@ static int possible_combinations_size[STARPU_NMAXWORKERS][STARPU_NMAXCOMBINED_WO
 
 static void peager_add_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
 {
+	_starpu_sched_find_worker_combinations(workerids, nworkers);
 	struct _starpu_peager_data *data = (struct _starpu_peager_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
 	unsigned nbasic_workers = starpu_worker_get_count();
-	unsigned ncombined_workers = starpu_combined_worker_get_count();
+	unsigned ncombined_workers= starpu_combined_worker_get_count();
 	unsigned ntotal_workers = nbasic_workers + ncombined_workers;
-		
-	_starpu_sched_find_worker_combinations(workerids, nworkers);
-
 	unsigned workerid, i;
-	unsigned ncombinedworkers;
-
-	ncombinedworkers = starpu_combined_worker_get_count();
 
 	/* Find the master of each worker. We first assign the worker as its
 	 * own master, and then iterate over the different worker combinations
@@ -69,9 +64,9 @@ static void peager_add_workers(unsigned sched_ctx_id, int *workerids, unsigned n
 	}
 
 
-	for (i = 0; i < ncombinedworkers; i++)
+	for (i = 0; i < ncombined_workers; i++)
 	{
-		workerid = ntotal_workers + i;
+		workerid = nbasic_workers + i;
 
 		/* Note that we ASSUME that the workers are sorted by size ! */
 		int *workers;
@@ -79,20 +74,17 @@ static void peager_add_workers(unsigned sched_ctx_id, int *workerids, unsigned n
 		starpu_combined_worker_get_description(workerid, &size, &workers);
 
 		int master = workers[0];
-
 		int j;
 		for (j = 0; j < size; j++)
 		{
 			if (data->master_id[workers[j]] > master)
 				data->master_id[workers[j]] = master;
-
 			int cnt = possible_combinations_cnt[workers[j]]++;
 			possible_combinations[workers[j]][cnt] = workerid;
 			possible_combinations_size[workers[j]][cnt] = size;
 		}
 	}
 
-
 	for(i = 0; i < nworkers; i++)
 	{
 		workerid = workerids[i];
@@ -178,9 +170,10 @@ static int push_task_peager_policy(struct starpu_task *task)
 	{
 		worker = workers->get_next(workers, &it);
 		int master = data->master_id[worker];
-		/* If this is not a CPU, then the worker simply grabs tasks from the fifo */
-		if (!starpu_worker_is_combined_worker(worker) &&
-				starpu_worker_get_type(worker) != STARPU_CPU_WORKER  || master == worker)
+		/* If this is not a CPU or a MIC, then the worker simply grabs tasks from the fifo */
+		if (!starpu_worker_is_combined_worker(worker) && 
+		    starpu_worker_get_type(worker) != STARPU_MIC_WORKER &&
+		    starpu_worker_get_type(worker) != STARPU_CPU_WORKER  || master == worker)
 		{
 			starpu_pthread_mutex_t *sched_mutex;
 			starpu_pthread_cond_t *sched_cond;
@@ -200,8 +193,8 @@ static struct starpu_task *pop_task_peager_policy(unsigned sched_ctx_id)
 
 	int workerid = starpu_worker_get_id();
 
-	/* If this is not a CPU, then the worker simply grabs tasks from the fifo */
-	if (starpu_worker_get_type(workerid) != STARPU_CPU_WORKER)
+	/* If this is not a CPU or a MIC, then the worker simply grabs tasks from the fifo */
+	if (starpu_worker_get_type(workerid) != STARPU_CPU_WORKER && starpu_worker_get_type(workerid) != STARPU_MIC_WORKER)
 	{
 		struct starpu_task *task = NULL;
 		STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
@@ -211,8 +204,12 @@ static struct starpu_task *pop_task_peager_policy(unsigned sched_ctx_id)
 		return task;
 	}
 
+
 	int master = data->master_id[workerid];
 
+	//_STARPU_DEBUG("workerid:%d, master:%d\n",workerid,master);
+
+
 	if (master == workerid)
 	{
 		/* The worker is a master */
@@ -250,9 +247,9 @@ static struct starpu_task *pop_task_peager_policy(unsigned sched_ctx_id)
 		/* Is this a basic worker or a combined worker ? */
 		int nbasic_workers = (int)starpu_worker_get_count();
 		int is_basic_worker = (best_workerid < nbasic_workers);
-
 		if (is_basic_worker)
 		{
+
 			/* The master is alone */
 			return task;
 		}

+ 3 - 3
tests/parallel_tasks/parallel_kernels.c

@@ -21,7 +21,7 @@
 #include <unistd.h>
 #include "../helper.h"
 
-#define N	1000
+#define N	1
 #define VECTORSIZE	1024
 
 void codelet_null(void *descr[], STARPU_ATTRIBUTE_UNUSED void *_args)
@@ -29,7 +29,7 @@ void codelet_null(void *descr[], STARPU_ATTRIBUTE_UNUSED void *_args)
 	STARPU_SKIP_IF_VALGRIND;
 
 	int worker_size = starpu_combined_worker_get_size();
-	STARPU_ASSERT(worker_size > 0);
+	//STARPU_ASSERT(worker_size > 0);
 	usleep(1000/worker_size);
 #if 0
 	int id = starpu_worker_get_id();
@@ -66,7 +66,7 @@ int main(int argc, char **argv)
 
         struct starpu_conf conf;
 	starpu_conf_init(&conf);
-	conf.sched_policy_name = "pheft";
+	conf.sched_policy_name = "peager";
 	conf.calibrate = 1;
 
 	ret = starpu_init(&conf);

+ 5 - 5
tests/parallel_tasks/parallel_kernels_spmd.c

@@ -21,17 +21,17 @@
 #include <unistd.h>
 #include "../helper.h"
 
-#define N	1000
+#define N	100
 #define VECTORSIZE	1024
 
 void codelet_null(void *descr[], STARPU_ATTRIBUTE_UNUSED void *_args)
 {
 	STARPU_SKIP_IF_VALGRIND;
 
-	int worker_size = starpu_combined_worker_get_size();
-	STARPU_ASSERT(worker_size > 0);
+//	int worker_size = starpu_combined_worker_get_size();
+//	STARPU_ASSERT(worker_size > 0);
 
-	usleep(1000/worker_size);
+//	usleep(1000/worker_size);
 #if 0
 	int id = starpu_worker_get_id();
 	int combined_id = starpu_combined_worker_get_id();
@@ -68,7 +68,7 @@ int main(int argc, char **argv)
 
         struct starpu_conf conf;
 	starpu_conf_init(&conf);
-	conf.sched_policy_name = "pheft";
+	conf.sched_policy_name = "peager";
 	conf.calibrate = 1;
 
 	ret = starpu_init(&conf);

+ 6 - 4
tests/parallel_tasks/spmd_peager.c

@@ -20,7 +20,7 @@
 #include <unistd.h>
 #include "../helper.h"
 
-#define N	1000
+#define N	1
 #define VECTORSIZE	1024
 
 starpu_data_handle_t v_handle;
@@ -30,12 +30,14 @@ void codelet_null(void *descr[], STARPU_ATTRIBUTE_UNUSED void *_args)
 {
 	STARPU_SKIP_IF_VALGRIND;
 
-	int worker_size = starpu_combined_worker_get_size();
-	STARPU_ASSERT(worker_size > 0);
+//	int worker_size = starpu_combined_worker_get_size();
+//	STARPU_ASSERT(worker_size > 0);
 
 //	FPRINTF(stderr, "WORKERSIZE : %d\n", worker_size);
 
-	usleep(1000/worker_size);
+//	usleep(1000/worker_size);
+
+	usleep(1000);
 #if 0
 	int id = starpu_worker_get_id();
 	int combined_id = starpu_combined_worker_get_id();