Browse Source

synchronization issue(pop is protected by a single mutex/cond, the one of the worker & eager has a special mutex to avoid several workers to get the same task) ->only eager is fixed,
worker_collection has an iterator passed as parametter (to be fixed in order not to have the starpu_iterator structure public)

Andra Hugo 12 years ago
parent
commit
4146573b69

+ 10 - 7
include/starpu_sched_ctx.h

@@ -28,6 +28,13 @@ extern "C"
 #  warning rename all objects to start with starpu_sched_ctx
 #  warning rename all objects to start with starpu_sched_ctx
 #endif
 #endif
 
 
+//struct starpu_iterator;
+struct starpu_iterator
+{
+	int cursor;
+};
+
+
 /* generic structure used by the scheduling contexts to iterate the workers */
 /* generic structure used by the scheduling contexts to iterate the workers */
 struct starpu_sched_ctx_worker_collection
 struct starpu_sched_ctx_worker_collection
 {
 {
@@ -35,14 +42,12 @@ struct starpu_sched_ctx_worker_collection
 	void *workerids;
 	void *workerids;
 	/* the number of workers in the collection */
 	/* the number of workers in the collection */
 	unsigned nworkers;
 	unsigned nworkers;
-	/* the current cursor of the collection*/
-	pthread_key_t cursor_key;
 	/* the type of structure (WORKER_LIST,...) */
 	/* the type of structure (WORKER_LIST,...) */
 	int type;
 	int type;
 	/* checks if there is another element in collection */
 	/* checks if there is another element in collection */
-	unsigned (*has_next)(struct starpu_sched_ctx_worker_collection *workers);
+	unsigned (*has_next)(struct starpu_sched_ctx_worker_collection *workers, struct starpu_iterator *it);
 	/* return the next element in the collection */
 	/* return the next element in the collection */
-	int (*get_next)(struct starpu_sched_ctx_worker_collection *workers);
+	int (*get_next)(struct starpu_sched_ctx_worker_collection *workers, struct starpu_iterator *it);
 	/* add a new element in the collection */
 	/* add a new element in the collection */
 	int (*add)(struct starpu_sched_ctx_worker_collection *workers, int worker);
 	int (*add)(struct starpu_sched_ctx_worker_collection *workers, int worker);
 	/* remove an element from the collection */
 	/* remove an element from the collection */
@@ -52,9 +57,7 @@ struct starpu_sched_ctx_worker_collection
 	/* free the structure */
 	/* free the structure */
 	void (*deinit)(struct starpu_sched_ctx_worker_collection *workers);
 	void (*deinit)(struct starpu_sched_ctx_worker_collection *workers);
 	/* initialize the cursor if there is one */
 	/* initialize the cursor if there is one */
-	void (*init_cursor)(struct starpu_sched_ctx_worker_collection *workers);
-	/* free the cursor if there is one */
-	void (*deinit_cursor)(struct starpu_sched_ctx_worker_collection *workers);
+	void (*init_iterator)(struct starpu_sched_ctx_worker_collection *workers, struct starpu_iterator *it);
 };
 };
 
 
 /* types of structures the worker collection can implement */
 /* types of structures the worker collection can implement */

+ 20 - 25
sched_ctx_hypervisor/src/hypervisor_policies/policy_tools.c

@@ -28,17 +28,16 @@ static int _compute_priority(unsigned sched_ctx)
 	struct starpu_sched_ctx_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx);
 	struct starpu_sched_ctx_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx);
 	int worker;
 	int worker;
 
 
-	if(workers->init_cursor)
-		workers->init_cursor(workers);
+	struct starpu_iterator it;
+	if(workers->init_iterator)
+		workers->init_iterator(workers, &it);
 
 
-	while(workers->has_next(workers))
+	while(workers->has_next(workers, &it))
 	{
 	{
-		worker = workers->get_next(workers);
+		worker = workers->get_next(workers, &it);
 		total_priority += config->priority[worker];
 		total_priority += config->priority[worker];
 	}
 	}
 
 
-	if (workers->deinit_cursor)
-		workers->deinit_cursor(workers);
 	return total_priority;
 	return total_priority;
 }
 }
 
 
@@ -114,15 +113,16 @@ int* _get_first_workers(unsigned sched_ctx, int *nworkers, enum starpu_archtype
 	int worker;
 	int worker;
 	int considered = 0;
 	int considered = 0;
 
 
-	if(workers->init_cursor)
-		workers->init_cursor(workers);
+	struct starpu_iterator it;
+	if(workers->init_iterator)
+		workers->init_iterator(workers, &it);
 
 
 	for(index = 0; index < *nworkers; index++)
 	for(index = 0; index < *nworkers; index++)
 	{
 	{
-		while(workers->has_next(workers))
+		while(workers->has_next(workers, &it))
 		{
 		{
 			considered = 0;
 			considered = 0;
-			worker = workers->get_next(workers);
+			worker = workers->get_next(workers, &it);
 			enum starpu_archtype curr_arch = starpu_worker_get_type(worker);
 			enum starpu_archtype curr_arch = starpu_worker_get_type(worker);
 			if(arch == STARPU_ANY_WORKER || curr_arch == arch)
 			if(arch == STARPU_ANY_WORKER || curr_arch == arch)
 			{
 			{
@@ -169,8 +169,6 @@ int* _get_first_workers(unsigned sched_ctx, int *nworkers, enum starpu_archtype
 		}
 		}
 	}
 	}
 
 
-	if (workers->deinit_cursor)
-		workers->deinit_cursor(workers);
 
 
 	return curr_workers;
 	return curr_workers;
 }
 }
@@ -183,11 +181,12 @@ unsigned _get_potential_nworkers(struct starpu_sched_ctx_hypervisor_policy_confi
 	unsigned potential_workers = 0;
 	unsigned potential_workers = 0;
 	int worker;
 	int worker;
 
 
-	if(workers->init_cursor)
-		workers->init_cursor(workers);
-	while(workers->has_next(workers))
+	struct starpu_iterator it;
+	if(workers->init_iterator)
+		workers->init_iterator(workers, &it);
+	while(workers->has_next(workers, &it))
 	{
 	{
-		worker = workers->get_next(workers);
+		worker = workers->get_next(workers, &it);
 		enum starpu_archtype curr_arch = starpu_worker_get_type(worker);
 		enum starpu_archtype curr_arch = starpu_worker_get_type(worker);
                 if(arch == STARPU_ANY_WORKER || curr_arch == arch)
                 if(arch == STARPU_ANY_WORKER || curr_arch == arch)
                 {
                 {
@@ -195,8 +194,6 @@ unsigned _get_potential_nworkers(struct starpu_sched_ctx_hypervisor_policy_confi
 				potential_workers++;
 				potential_workers++;
 		}
 		}
 	}
 	}
-	if (workers->deinit_cursor)
-		workers->deinit_cursor(workers);
 
 
 	return potential_workers;
 	return potential_workers;
 }
 }
@@ -307,12 +304,13 @@ static double _get_elapsed_flops(struct starpu_sched_ctx_hypervisor_wrapper* sc_
 	struct starpu_sched_ctx_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sc_w->sched_ctx);
 	struct starpu_sched_ctx_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sc_w->sched_ctx);
         int worker;
         int worker;
 
 
-	if(workers->init_cursor)
-                workers->init_cursor(workers);
+	struct starpu_iterator it;
+	if(workers->init_iterator)
+                workers->init_iterator(workers, &it);
 
 
-        while(workers->has_next(workers))
+        while(workers->has_next(workers, &it))
 	{
 	{
-                worker = workers->get_next(workers);
+                worker = workers->get_next(workers, &it);
                 enum starpu_archtype arch = starpu_worker_get_type(worker);
                 enum starpu_archtype arch = starpu_worker_get_type(worker);
                 if(arch == req_arch)
                 if(arch == req_arch)
                 {
                 {
@@ -321,9 +319,6 @@ static double _get_elapsed_flops(struct starpu_sched_ctx_hypervisor_wrapper* sc_
                 }
                 }
         }
         }
 
 
-	if (workers->deinit_cursor)
-		workers->deinit_cursor(workers);
-
 	return ret_val;
 	return ret_val;
 }
 }
 
 

+ 17 - 22
sched_ctx_hypervisor/src/hypervisor_policies/simple_policy.c

@@ -25,18 +25,16 @@ static int _compute_priority(unsigned sched_ctx)
 
 
 	struct starpu_sched_ctx_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx);
 	struct starpu_sched_ctx_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx);
 	int worker;
 	int worker;
+	
+	starpu_iterator it;
+	if(workers->init_iterator)
+		workers->init_iterator(workers, &it);
 
 
-	if(workers->init_cursor)
-		workers->init_cursor(workers);
-
-	while(workers->has_next(workers))
+	while(workers->has_next(workers, &it))
 	{
 	{
-		worker = workers->get_next(workers);
+		worker = workers->get_next(workers, &it);
 		total_priority += config->priority[worker];
 		total_priority += config->priority[worker];
 	}
 	}
-
-	if (workers->deinit_cursor)
-		workers->deinit_cursor(workers);
 	return total_priority;
 	return total_priority;
 }
 }
 
 
@@ -87,15 +85,16 @@ int* _get_first_workers(unsigned sched_ctx, unsigned *nworkers, enum starpu_arch
 	int worker;
 	int worker;
 	int considered = 0;
 	int considered = 0;
 
 
-	if(workers->init_cursor)
-		workers->init_cursor(workers);
+	starpu_iterator it;
+	if(workers->init_iterator)
+		workers->init_iterator(workers, &it);
 
 
 	for(index = 0; index < *nworkers; index++)
 	for(index = 0; index < *nworkers; index++)
 	{
 	{
-		while(workers->has_next(workers))
+		while(workers->has_next(workers, &it))
 		{
 		{
 			considered = 0;
 			considered = 0;
-			worker = workers->get_next(workers);
+			worker = workers->get_next(workers, &it);
 			enum starpu_archtype curr_arch = starpu_worker_get_type(worker);
 			enum starpu_archtype curr_arch = starpu_worker_get_type(worker);
 			if(arch == 0 || curr_arch == arch)
 			if(arch == 0 || curr_arch == arch)
 			{
 			{
@@ -142,9 +141,6 @@ int* _get_first_workers(unsigned sched_ctx, unsigned *nworkers, enum starpu_arch
 		}
 		}
 	}
 	}
 
 
-	if (workers->deinit_cursor)
-		workers->deinit_cursor(workers);
-
 	return curr_workers;
 	return curr_workers;
 }
 }
 
 
@@ -154,12 +150,13 @@ static unsigned _get_potential_nworkers(struct starpu_sched_ctx_hypervisor_polic
 
 
 	unsigned potential_workers = 0;
 	unsigned potential_workers = 0;
 	int worker;
 	int worker;
-
-	if(workers->init_cursor)
-		workers->init_cursor(workers);
-	while(workers->has_next(workers))
+	
+	starpu_iterator it;
+	if(workers->init_iterator)
+		workers->init_iterator(workers, &it);
+	while(workers->has_next(workers, &it))
 	{
 	{
-		worker = workers->get_next(workers);
+		worker = workers->get_next(workers, &it);
 		enum starpu_archtype curr_arch = starpu_worker_get_type(worker);
 		enum starpu_archtype curr_arch = starpu_worker_get_type(worker);
                 if(arch == 0 || curr_arch == arch)
                 if(arch == 0 || curr_arch == arch)
                 {
                 {
@@ -167,8 +164,6 @@ static unsigned _get_potential_nworkers(struct starpu_sched_ctx_hypervisor_polic
 				potential_workers++;
 				potential_workers++;
 		}
 		}
 	}
 	}
-	if (workers->deinit_cursor)
-		workers->deinit_cursor(workers);
 
 
 	return potential_workers;
 	return potential_workers;
 }
 }

+ 5 - 4
sched_ctx_hypervisor/src/sched_ctx_hypervisor.c

@@ -330,12 +330,13 @@ int get_nworkers_ctx(unsigned sched_ctx, enum starpu_archtype arch)
 	struct starpu_sched_ctx_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx);
 	struct starpu_sched_ctx_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx);
 	int worker;
 	int worker;
 
 
-	if(workers->init_cursor)
-		workers->init_cursor(workers);
+	struct starpu_iterator it;
+	if(workers->init_iterator)
+		workers->init_iterator(workers, &it);
 
 
-	while(workers->has_next(workers))
+	while(workers->has_next(workers, &it))
 	{
 	{
-		worker = workers->get_next(workers);
+		worker = workers->get_next(workers, &it);
 		enum starpu_archtype curr_arch = starpu_worker_get_type(worker);
 		enum starpu_archtype curr_arch = starpu_worker_get_type(worker);
 		if(curr_arch == arch || arch == STARPU_ANY_WORKER)
 		if(curr_arch == arch || arch == STARPU_ANY_WORKER)
 			nworkers_ctx++;
 			nworkers_ctx++;

+ 20 - 29
src/core/sched_ctx.c

@@ -838,8 +838,7 @@ struct starpu_sched_ctx_worker_collection* starpu_sched_ctx_create_worker_collec
 		sched_ctx->workers->remove = worker_list.remove;
 		sched_ctx->workers->remove = worker_list.remove;
 		sched_ctx->workers->init = worker_list.init;
 		sched_ctx->workers->init = worker_list.init;
 		sched_ctx->workers->deinit = worker_list.deinit;
 		sched_ctx->workers->deinit = worker_list.deinit;
-		sched_ctx->workers->init_cursor = worker_list.init_cursor;
-		sched_ctx->workers->deinit_cursor = worker_list.deinit_cursor;
+		sched_ctx->workers->init_iterator = worker_list.init_iterator;
 		sched_ctx->workers->type = WORKER_LIST; 
 		sched_ctx->workers->type = WORKER_LIST; 
 		break;
 		break;
 	}
 	}
@@ -852,17 +851,16 @@ static unsigned _get_workers_list(struct starpu_sched_ctx_worker_collection *wor
 	*workerids = (int*)malloc(workers->nworkers*sizeof(int));
 	*workerids = (int*)malloc(workers->nworkers*sizeof(int));
 	int worker;
 	int worker;
 	int i = 0;
 	int i = 0;
-	if(workers->init_cursor)
-		workers->init_cursor(workers);
+	struct starpu_iterator it;
+	if(workers->init_iterator)
+		workers->init_iterator(workers, &it);
 	
 	
-	while(workers->has_next(workers))
+	while(workers->has_next(workers, &it))
 	{
 	{
-		worker = workers->get_next(workers);
+		worker = workers->get_next(workers, &it);
 		(*workerids)[i++] = worker;
 		(*workerids)[i++] = worker;
 	}
 	}
 	
 	
-	if (workers->deinit_cursor)
-		workers->deinit_cursor(workers);
 	return 0;
 	return 0;
 }
 }
 void starpu_sched_ctx_delete_worker_collection(unsigned sched_ctx_id)
 void starpu_sched_ctx_delete_worker_collection(unsigned sched_ctx_id)
@@ -887,20 +885,18 @@ int starpu_get_workers_of_sched_ctx(unsigned sched_ctx_id, int *pus, enum starpu
 	int worker;
 	int worker;
 
 
 	int npus = 0;
 	int npus = 0;
+	struct starpu_iterator it;
+	if(workers->init_iterator)
+		workers->init_iterator(workers, &it);
 	
 	
-	if(workers->init_cursor)
-		workers->init_cursor(workers);
-	
-	while(workers->has_next(workers))
+	while(workers->has_next(workers, &it))
 	{
 	{
-		worker = workers->get_next(workers);
+		worker = workers->get_next(workers, &it);
 		enum starpu_archtype curr_arch = starpu_worker_get_type(worker);
 		enum starpu_archtype curr_arch = starpu_worker_get_type(worker);
 		if(curr_arch == arch)
 		if(curr_arch == arch)
 			pus[npus++] = worker;
 			pus[npus++] = worker;
 	}
 	}
 	
 	
-	if (workers->deinit_cursor)
-		workers->deinit_cursor(workers);
 	return npus;
 	return npus;
 }
 }
 
 
@@ -929,29 +925,24 @@ unsigned starpu_sched_ctx_get_nshared_workers(unsigned sched_ctx_id, unsigned sc
         int worker, worker2;
         int worker, worker2;
         int shared_workers = 0;
         int shared_workers = 0;
 
 
-        if(workers->init_cursor)
-                workers->init_cursor(workers);
+	struct starpu_iterator it1, it2;
+        if(workers->init_iterator)
+                workers->init_iterator(workers, &it1);
 
 
-        if(workers2->init_cursor)
-                workers2->init_cursor(workers2);
+        if(workers2->init_iterator)
+                workers2->init_iterator(workers2, &it2);
 
 
-        while(workers->has_next(workers))
+        while(workers->has_next(workers, &it1))
         {
         {
-                worker = workers->get_next(workers);
-                while(workers2->has_next(workers2))
+                worker = workers->get_next(workers, &it1);
+                while(workers2->has_next(workers2, &it2))
 		{
 		{
-                        worker2 = workers2->get_next(workers2);
+                        worker2 = workers2->get_next(workers2, &it2);
                         if(worker == worker2)
                         if(worker == worker2)
 				shared_workers++;
 				shared_workers++;
                 }
                 }
         }
         }
 
 
-        if (workers->deinit_cursor)
-                workers->deinit_cursor(workers);
-
-        if (workers2->deinit_cursor)
-                workers2->deinit_cursor(workers2);
-
 	return shared_workers;
 	return shared_workers;
 }
 }
 
 

+ 1 - 0
src/core/sched_ctx.h

@@ -28,6 +28,7 @@
 #define REQ_RESIZE 0
 #define REQ_RESIZE 0
 #define DO_RESIZE 1
 #define DO_RESIZE 1
 
 
+
 /* used when changes (delete, modify) are applyed to contexts */
 /* used when changes (delete, modify) are applyed to contexts */
 _starpu_pthread_mutex_t changing_ctx_mutex[STARPU_NMAX_SCHED_CTXS];
 _starpu_pthread_mutex_t changing_ctx_mutex[STARPU_NMAX_SCHED_CTXS];
 
 

+ 6 - 6
src/core/sched_policy.c

@@ -293,18 +293,18 @@ static int _starpu_nworkers_able_to_execute_task(struct starpu_task *task, struc
 {
 {
 	int worker = -1, nworkers = 0;
 	int worker = -1, nworkers = 0;
 	struct starpu_sched_ctx_worker_collection *workers = sched_ctx->workers;
 	struct starpu_sched_ctx_worker_collection *workers = sched_ctx->workers;
-	if(workers->init_cursor)
-		workers->init_cursor(workers);
+	
+	struct starpu_iterator it;
+	if(workers->init_iterator)
+		workers->init_iterator(workers, &it);
 
 
-	while(workers->has_next(workers))
+	while(workers->has_next(workers, &it))
 	{
 	{
-		worker = workers->get_next(workers);
+		worker = workers->get_next(workers, &it);
 		if (starpu_worker_can_execute_task(worker, task, 0) && starpu_is_ctxs_turn(worker, sched_ctx->id))
 		if (starpu_worker_can_execute_task(worker, task, 0) && starpu_is_ctxs_turn(worker, sched_ctx->id))
 			nworkers++;
 			nworkers++;
 	}
 	}
 
 
-	if (workers->deinit_cursor)
-		workers->deinit_cursor(workers);
 	return nworkers;
 	return nworkers;
 }
 }
 
 

+ 5 - 6
src/core/workers.c

@@ -1289,12 +1289,13 @@ int starpu_worker_get_nids_ctx_free_by_type(enum starpu_archtype type, int *work
 				if(config.sched_ctxs[s].id != STARPU_NMAX_SCHED_CTXS)
 				if(config.sched_ctxs[s].id != STARPU_NMAX_SCHED_CTXS)
 				{
 				{
 					struct starpu_sched_ctx_worker_collection *workers = config.sched_ctxs[s].workers;
 					struct starpu_sched_ctx_worker_collection *workers = config.sched_ctxs[s].workers;
-					if(workers->init_cursor)
-						workers->init_cursor(workers);
+					struct starpu_iterator it;
+					if(workers->init_iterator)
+						workers->init_iterator(workers, &it);
 
 
-					while(workers->has_next(workers))
+					while(workers->has_next(workers, &it))
 					{
 					{
-						worker = workers->get_next(workers);
+						worker = workers->get_next(workers, &it);
 						if(worker == id)
 						if(worker == id)
 						{
 						{
 							found = 1;
 							found = 1;
@@ -1302,8 +1303,6 @@ int starpu_worker_get_nids_ctx_free_by_type(enum starpu_archtype type, int *work
 						}
 						}
 					}
 					}
 
 
-					if (workers->deinit_cursor)
-						workers->deinit_cursor(workers);
 					if(found) break;
 					if(found) break;
 				}
 				}
 			}
 			}

+ 26 - 24
src/sched_policies/deque_modeling_policy_data_aware.c

@@ -359,12 +359,13 @@ static int _dm_push_task(struct starpu_task *task, unsigned prio, unsigned sched
 	unsigned nimpl;
 	unsigned nimpl;
 	struct starpu_sched_ctx_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
 	struct starpu_sched_ctx_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
 
 
-	if(workers->init_cursor)
-		workers->init_cursor(workers);
+	struct starpu_iterator it;
+	if(workers->init_iterator)
+		workers->init_iterator(workers, &it);
 
 
-	while(workers->has_next(workers))
+	while(workers->has_next(workers, &it))
 	{
 	{
-		worker = workers->get_next(workers);
+		worker = workers->get_next(workers, &it);
 		struct _starpu_fifo_taskq *fifo  = dt->queue_array[worker];
 		struct _starpu_fifo_taskq *fifo  = dt->queue_array[worker];
 		unsigned memory_node = starpu_worker_get_memory_node(worker);
 		unsigned memory_node = starpu_worker_get_memory_node(worker);
 		enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(worker);
 		enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(worker);
@@ -445,9 +446,6 @@ static int _dm_push_task(struct starpu_task *task, unsigned prio, unsigned sched
 
 
 	//_STARPU_DEBUG("Scheduler dm: kernel (%u)\n", best_impl);
 	//_STARPU_DEBUG("Scheduler dm: kernel (%u)\n", best_impl);
 
 
-	if (workers->deinit_cursor)
-		workers->deinit_cursor(workers);
-
 	_starpu_get_job_associated_to_task(task)->nimpl = best_impl;
 	_starpu_get_job_associated_to_task(task)->nimpl = best_impl;
 
 
 	/* we should now have the best worker in variable "best" */
 	/* we should now have the best worker in variable "best" */
@@ -481,9 +479,13 @@ static void compute_all_performance_predictions(struct starpu_task *task,
 	struct _starpu_dmda_data *dt = (struct _starpu_dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
 	struct _starpu_dmda_data *dt = (struct _starpu_dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
 	struct starpu_sched_ctx_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
 	struct starpu_sched_ctx_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
 
 
-	while(workers->has_next(workers))
+	struct starpu_iterator it;
+	if(workers->init_iterator)
+		workers->init_iterator(workers, &it);
+
+	while(workers->has_next(workers, &it))
 	{
 	{
-		worker = workers->get_next(workers);
+		worker = workers->get_next(workers, &it);
 		struct _starpu_fifo_taskq *fifo = dt->queue_array[worker];
 		struct _starpu_fifo_taskq *fifo = dt->queue_array[worker];
 		enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(worker);
 		enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(worker);
 		unsigned memory_node = starpu_worker_get_memory_node(worker);
 		unsigned memory_node = starpu_worker_get_memory_node(worker);
@@ -604,27 +606,29 @@ static int _dmda_push_task(struct starpu_task *task, unsigned prio, unsigned sch
 
 
 	double fitness[nworkers_ctx][STARPU_MAXIMPLEMENTATIONS];
 	double fitness[nworkers_ctx][STARPU_MAXIMPLEMENTATIONS];
 
 
-	if(workers->init_cursor)
-		workers->init_cursor(workers);
+	struct starpu_iterator it;
+	if(workers->init_iterator)
+		workers->init_iterator(workers, &it);
 
 
-	compute_all_performance_predictions(task,
-										local_task_length,
-										exp_end,
-										&max_exp_end,
-										&best_exp_end,
-										local_data_penalty,
-										local_power,
-										&forced_best,
-										&forced_impl, sched_ctx_id);
 
 
+	compute_all_performance_predictions(task,
+					    local_task_length,
+					    exp_end,
+					    &max_exp_end,
+					    &best_exp_end,
+					    local_data_penalty,
+					    local_power,
+					    &forced_best,
+					    &forced_impl, sched_ctx_id);
+	
 	double best_fitness = -1;
 	double best_fitness = -1;
 
 
 	unsigned nimpl;
 	unsigned nimpl;
 	if (forced_best == -1)
 	if (forced_best == -1)
 	{
 	{
-		while(workers->has_next(workers))
+		while(workers->has_next(workers, &it))
 		{
 		{
-			worker = workers->get_next(workers);
+			worker = workers->get_next(workers, &it);
 			for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
 			for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
 			{
 			{
 				if (!starpu_worker_can_execute_task(worker, task, nimpl))
 				if (!starpu_worker_can_execute_task(worker, task, nimpl))
@@ -687,8 +691,6 @@ static int _dmda_push_task(struct starpu_task *task, unsigned prio, unsigned sch
 
 
 	if (task->bundle)
 	if (task->bundle)
 		starpu_task_bundle_remove(task->bundle, task);
 		starpu_task_bundle_remove(task->bundle, task);
-        if (workers->deinit_cursor)
-                workers->deinit_cursor(workers);
 
 
 	//_STARPU_DEBUG("Scheduler dmda: kernel (%u)\n", best_impl);
 	//_STARPU_DEBUG("Scheduler dmda: kernel (%u)\n", best_impl);
 	 _starpu_get_job_associated_to_task(task)->nimpl = selected_impl;
 	 _starpu_get_job_associated_to_task(task)->nimpl = selected_impl;

+ 25 - 17
src/sched_policies/eager_central_policy.c

@@ -27,6 +27,7 @@
 struct _starpu_eager_center_policy_data
 struct _starpu_eager_center_policy_data
 {
 {
 	struct _starpu_fifo_taskq *fifo;
 	struct _starpu_fifo_taskq *fifo;
+	_starpu_pthread_mutex_t policy_mutex;
 };
 };
 
 
 static void initialize_eager_center_policy(unsigned sched_ctx_id)
 static void initialize_eager_center_policy(unsigned sched_ctx_id)
@@ -41,6 +42,7 @@ static void initialize_eager_center_policy(unsigned sched_ctx_id)
 	data->fifo =  _starpu_create_fifo();
 	data->fifo =  _starpu_create_fifo();
 
 
 	starpu_sched_ctx_set_policy_data(sched_ctx_id, (void*)data);
 	starpu_sched_ctx_set_policy_data(sched_ctx_id, (void*)data);
+	_STARPU_PTHREAD_MUTEX_INIT(&data->policy_mutex, NULL);
 }
 }
 
 
 static void deinitialize_eager_center_policy(unsigned sched_ctx_id)
 static void deinitialize_eager_center_policy(unsigned sched_ctx_id)
@@ -53,7 +55,7 @@ static void deinitialize_eager_center_policy(unsigned sched_ctx_id)
 	_starpu_destroy_fifo(data->fifo);
 	_starpu_destroy_fifo(data->fifo);
 
 
 	starpu_sched_ctx_delete_worker_collection(sched_ctx_id);
 	starpu_sched_ctx_delete_worker_collection(sched_ctx_id);
-
+	_STARPU_PTHREAD_MUTEX_DESTROY(&data->policy_mutex);
 	free(data);
 	free(data);
 }
 }
 
 
@@ -75,12 +77,13 @@ static int push_task_eager_policy(struct starpu_task *task)
 
 
 	unsigned worker = 0;
 	unsigned worker = 0;
 	struct starpu_sched_ctx_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
 	struct starpu_sched_ctx_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
-	if(workers->init_cursor)
-        workers->init_cursor(workers);
+	struct starpu_iterator it;
+	if(workers->init_iterator)
+		workers->init_iterator(workers, &it);
 
 
-	while(workers->has_next(workers))
+	while(workers->has_next(workers, &it))
 	{
 	{
-		worker = workers->get_next(workers);
+		worker = workers->get_next(workers, &it);
 		_starpu_pthread_mutex_t *sched_mutex;
 		_starpu_pthread_mutex_t *sched_mutex;
 		_starpu_pthread_cond_t *sched_cond;
 		_starpu_pthread_cond_t *sched_cond;
 		starpu_worker_get_sched_condition(worker, &sched_mutex, &sched_cond);
 		starpu_worker_get_sched_condition(worker, &sched_mutex, &sched_cond);
@@ -89,9 +92,9 @@ static int push_task_eager_policy(struct starpu_task *task)
 		
 		
 	ret_val = _starpu_fifo_push_task(data->fifo, task);
 	ret_val = _starpu_fifo_push_task(data->fifo, task);
 
 
-	while(workers->has_next(workers))
+	while(workers->has_next(workers, &it))
 	{
 	{
-		worker = workers->get_next(workers);
+		worker = workers->get_next(workers, &it);
 		_starpu_pthread_mutex_t *sched_mutex;
 		_starpu_pthread_mutex_t *sched_mutex;
 		_starpu_pthread_cond_t *sched_cond;
 		_starpu_pthread_cond_t *sched_cond;
 		starpu_worker_get_sched_condition(worker, &sched_mutex, &sched_cond);
 		starpu_worker_get_sched_condition(worker, &sched_mutex, &sched_cond);
@@ -99,9 +102,6 @@ static int push_task_eager_policy(struct starpu_task *task)
 		_STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
 		_STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
 	}
 	}
 		
 		
-	if (workers->deinit_cursor)
-		workers->deinit_cursor(workers);
-
 	_STARPU_PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
 	_STARPU_PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
 	return ret_val;
 	return ret_val;
 }
 }
@@ -109,12 +109,12 @@ static int push_task_eager_policy(struct starpu_task *task)
 static struct starpu_task *pop_every_task_eager_policy(unsigned sched_ctx_id)
 static struct starpu_task *pop_every_task_eager_policy(unsigned sched_ctx_id)
 {
 {
 	struct _starpu_eager_center_policy_data *data = (struct _starpu_eager_center_policy_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
 	struct _starpu_eager_center_policy_data *data = (struct _starpu_eager_center_policy_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
-    int workerid = starpu_worker_get_id();
-
-    _starpu_pthread_mutex_t *sched_mutex;
-    _starpu_pthread_cond_t *sched_cond;
-    starpu_worker_get_sched_condition(workerid, &sched_mutex, &sched_cond);
-
+	int workerid = starpu_worker_get_id();
+	
+	_starpu_pthread_mutex_t *sched_mutex;
+	_starpu_pthread_cond_t *sched_cond;
+	starpu_worker_get_sched_condition(workerid, &sched_mutex, &sched_cond);
+	
 	_STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
 	_STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
 	struct starpu_task* task = _starpu_fifo_pop_every_task(data->fifo, workerid);
 	struct starpu_task* task = _starpu_fifo_pop_every_task(data->fifo, workerid);
 	_STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
 	_STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
@@ -126,7 +126,15 @@ static struct starpu_task *pop_task_eager_policy(unsigned sched_ctx_id)
 	unsigned workerid = starpu_worker_get_id();
 	unsigned workerid = starpu_worker_get_id();
 	struct _starpu_eager_center_policy_data *data = (struct _starpu_eager_center_policy_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
 	struct _starpu_eager_center_policy_data *data = (struct _starpu_eager_center_policy_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
 
 
-	return _starpu_fifo_pop_task(data->fifo, workerid);
+	struct starpu_task *task = NULL;
+	if(!_starpu_fifo_empty(data->fifo))
+	{
+		_STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
+		 task = _starpu_fifo_pop_task(data->fifo, workerid);
+		_STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
+	}
+		
+	return task;
 }
 }
 
 
 struct starpu_sched_policy _starpu_sched_eager_policy =
 struct starpu_sched_policy _starpu_sched_eager_policy =

+ 18 - 19
src/sched_policies/eager_central_priority_policy.c

@@ -124,16 +124,18 @@ static int _starpu_priority_push_task(struct starpu_task *task)
 	/* wake people waiting for a task */
 	/* wake people waiting for a task */
 	unsigned worker = 0;
 	unsigned worker = 0;
     struct starpu_sched_ctx_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
     struct starpu_sched_ctx_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
-    if(workers->init_cursor)
-        workers->init_cursor(workers);
 
 
-    while(workers->has_next(workers))
+    struct starpu_iterator it;
+    if(workers->init_iterator)
+	    workers->init_iterator(workers, &it);
+
+    while(workers->has_next(workers,&it))
     {
     {
-		worker = workers->get_next(workers);
-        _starpu_pthread_mutex_t *sched_mutex;
-        _starpu_pthread_cond_t *sched_cond;
-        starpu_worker_get_sched_condition(worker, &sched_mutex, &sched_cond);
-        _STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
+	    worker = workers->get_next(workers, &it);
+	    _starpu_pthread_mutex_t *sched_mutex;
+	    _starpu_pthread_cond_t *sched_cond;
+	    starpu_worker_get_sched_condition(worker, &sched_mutex, &sched_cond);
+	    _STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
     }
     }
 
 
 	unsigned priolevel = task->priority - STARPU_MIN_PRIO;
 	unsigned priolevel = task->priority - STARPU_MIN_PRIO;
@@ -142,9 +144,9 @@ static int _starpu_priority_push_task(struct starpu_task *task)
 	taskq->ntasks[priolevel]++;
 	taskq->ntasks[priolevel]++;
 	taskq->total_ntasks++;
 	taskq->total_ntasks++;
 
 
-	while(workers->has_next(workers))
+	while(workers->has_next(workers, &it))
     {
     {
-		worker = workers->get_next(workers);
+	    worker = workers->get_next(workers, &it);
         _starpu_pthread_mutex_t *sched_mutex;
         _starpu_pthread_mutex_t *sched_mutex;
         _starpu_pthread_cond_t *sched_cond;
         _starpu_pthread_cond_t *sched_cond;
 		starpu_worker_get_sched_condition(worker, &sched_mutex, &sched_cond);
 		starpu_worker_get_sched_condition(worker, &sched_mutex, &sched_cond);
@@ -152,9 +154,6 @@ static int _starpu_priority_push_task(struct starpu_task *task)
         _STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
         _STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
     }
     }
 
 
-    if (workers->deinit_cursor)
-		workers->deinit_cursor(workers);
-
 	_STARPU_PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
 	_STARPU_PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
 	return 0;
 	return 0;
 }
 }
@@ -219,12 +218,14 @@ static struct starpu_task *_starpu_priority_pop_task(unsigned sched_ctx_id)
 		/* Notify another worker to do that task */
 		/* Notify another worker to do that task */
 		unsigned worker = 0;
 		unsigned worker = 0;
 		struct starpu_sched_ctx_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
 		struct starpu_sched_ctx_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
-		if(workers->init_cursor)
-			workers->init_cursor(workers);
+
+		struct starpu_iterator it;
+		if(workers->init_iterator)
+			workers->init_iterator(workers, &it);
 		
 		
-		while(workers->has_next(workers))
+		while(workers->has_next(workers, &it))
 		{
 		{
-			worker = workers->get_next(workers);
+			worker = workers->get_next(workers, &it);
 			if(worker != workerid)
 			if(worker != workerid)
 			{
 			{
 				_starpu_pthread_mutex_t *sched_mutex;
 				_starpu_pthread_mutex_t *sched_mutex;
@@ -234,8 +235,6 @@ static struct starpu_task *_starpu_priority_pop_task(unsigned sched_ctx_id)
 			}
 			}
 		}
 		}
 
 
-		if (workers->deinit_cursor)
-			workers->deinit_cursor(workers);
 	}
 	}
 
 
 	return chosen_task;
 	return chosen_task;

+ 8 - 9
src/sched_policies/parallel_eager.c

@@ -164,12 +164,14 @@ static int push_task_peager_policy(struct starpu_task *task)
 	struct _starpu_peager_data *data = (struct _starpu_peager_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
 	struct _starpu_peager_data *data = (struct _starpu_peager_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
 	int worker = 0;
 	int worker = 0;
     struct starpu_sched_ctx_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
     struct starpu_sched_ctx_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
-    if(workers->init_cursor)
-        workers->init_cursor(workers);
 
 
-    while(workers->has_next(workers))
+    struct starpu_iterator it;
+    if(workers->init_iterator)
+	    workers->init_iterator(workers, &it);
+
+    while(workers->has_next(workers, &it))
     {
     {
-        worker = workers->get_next(workers);
+	    worker = workers->get_next(workers, &it);
 		int master = data->master_id[worker];
 		int master = data->master_id[worker];
 		/* If this is not a CPU, then the worker simply grabs tasks from the fifo */
 		/* If this is not a CPU, then the worker simply grabs tasks from the fifo */
 		if (starpu_worker_get_type(worker) != STARPU_CPU_WORKER  || master == worker)
 		if (starpu_worker_get_type(worker) != STARPU_CPU_WORKER  || master == worker)
@@ -184,9 +186,9 @@ static int push_task_peager_policy(struct starpu_task *task)
 
 
 	ret_val = _starpu_fifo_push_task(data->fifo, task);
 	ret_val = _starpu_fifo_push_task(data->fifo, task);
 
 
-	while(workers->has_next(workers))
+	while(workers->has_next(workers, &it))
     {
     {
-		worker = workers->get_next(workers);
+	    worker = workers->get_next(workers, &it);
 		int master = data->master_id[worker];
 		int master = data->master_id[worker];
 		/* If this is not a CPU, then the worker simply grabs tasks from the fifo */
 		/* If this is not a CPU, then the worker simply grabs tasks from the fifo */
 		if (starpu_worker_get_type(worker) != STARPU_CPU_WORKER  || master == worker)
 		if (starpu_worker_get_type(worker) != STARPU_CPU_WORKER  || master == worker)
@@ -199,9 +201,6 @@ static int push_task_peager_policy(struct starpu_task *task)
 		}
 		}
     }
     }
 
 
-    if (workers->deinit_cursor)
-        workers->deinit_cursor(workers);
-
 	_STARPU_PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
 	_STARPU_PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
 
 
 	return ret_val;
 	return ret_val;

+ 9 - 11
src/sched_policies/parallel_heft.c

@@ -274,12 +274,13 @@ static int _parallel_heft_push_task(struct starpu_task *task, unsigned prio, uns
 
 
 	/* A priori, we know all estimations */
 	/* A priori, we know all estimations */
 	int unknown = 0;
 	int unknown = 0;
-	if(workers->init_cursor)
-                workers->init_cursor(workers);
+	struct starpu_iterator it;
+	if(workers->init_iterator)
+                workers->init_iterator(workers, &it);
 
 
-	while(workers->has_next(workers))
+	while(workers->has_next(workers, &it))
         {
         {
-                worker = workers->get_next(workers);
+                worker = workers->get_next(workers, &it);
 
 
 		if(!starpu_worker_is_combined_worker(worker))
 		if(!starpu_worker_is_combined_worker(worker))
 		{
 		{
@@ -298,9 +299,9 @@ static int _parallel_heft_push_task(struct starpu_task *task, unsigned prio, uns
 
 
 	unsigned nimpl;
 	unsigned nimpl;
 	worker_ctx = 0;
 	worker_ctx = 0;
-	while(workers->has_next(workers))
+	while(workers->has_next(workers, &it))
 	{
 	{
-                worker = workers->get_next(workers);
+                worker = workers->get_next(workers, &it);
 
 
 		for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
 		for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
 		{
 		{
@@ -386,9 +387,9 @@ static int _parallel_heft_push_task(struct starpu_task *task, unsigned prio, uns
 	if (forced_best == -1)
 	if (forced_best == -1)
 	{
 	{
 		worker_ctx = 0;
 		worker_ctx = 0;
-		while(workers->has_next(workers))
+		while(workers->has_next(workers, &it))
 		{
 		{
-			worker = workers->get_next(workers);
+			worker = workers->get_next(workers, &it);
 
 
 			for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
 			for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
 			{
 			{
@@ -423,9 +424,6 @@ static int _parallel_heft_push_task(struct starpu_task *task, unsigned prio, uns
 		}
 		}
 	}
 	}
 
 
-        if (workers->deinit_cursor)
-		workers->deinit_cursor(workers);
-
 	STARPU_ASSERT(forced_best != -1 || best != -1);
 	STARPU_ASSERT(forced_best != -1 || best != -1);
 
 
 	if (forced_best != -1)
 	if (forced_best != -1)

+ 7 - 8
src/sched_policies/random_policy.c

@@ -36,12 +36,13 @@ static int _random_push_task(struct starpu_task *task, unsigned prio)
 	unsigned sched_ctx_id = task->sched_ctx;
 	unsigned sched_ctx_id = task->sched_ctx;
 	struct starpu_sched_ctx_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
 	struct starpu_sched_ctx_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
         int worker;
         int worker;
-        if(workers->init_cursor)
-                workers->init_cursor(workers);
+	struct starpu_iterator it;
+        if(workers->init_iterator)
+                workers->init_iterator(workers, &it);
 
 
-        while(workers->has_next(workers))
+        while(workers->has_next(workers, &it))
 	{
 	{
-                worker = workers->get_next(workers);
+                worker = workers->get_next(workers, &it);
 
 
 		enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(worker);
 		enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(worker);
 		alpha_sum += starpu_worker_get_relative_speedup(perf_arch);
 		alpha_sum += starpu_worker_get_relative_speedup(perf_arch);
@@ -51,9 +52,9 @@ static int _random_push_task(struct starpu_task *task, unsigned prio)
 //	_STARPU_DEBUG("my rand is %e\n", random);
 //	_STARPU_DEBUG("my rand is %e\n", random);
 
 
 	double alpha = 0.0;
 	double alpha = 0.0;
-	while(workers->has_next(workers))
+	while(workers->has_next(workers, &it))
         {
         {
-                worker = workers->get_next(workers);
+                worker = workers->get_next(workers, &it);
 
 
 		enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(worker);
 		enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(worker);
 		double worker_alpha = starpu_worker_get_relative_speedup(perf_arch);
 		double worker_alpha = starpu_worker_get_relative_speedup(perf_arch);
@@ -75,8 +76,6 @@ static int _random_push_task(struct starpu_task *task, unsigned prio)
 		AYU_event(AYU_ADDTASKTOQUEUE, _starpu_get_job_associated_to_task(task)->job_id, &id);
 		AYU_event(AYU_ADDTASKTOQUEUE, _starpu_get_job_associated_to_task(task)->job_id, &id);
 	}
 	}
 #endif
 #endif
-	if (workers->deinit_cursor)
-                workers->deinit_cursor(workers);
 
 
 	/* we should now have the best worker in variable "selected" */
 	/* we should now have the best worker in variable "selected" */
 	return starpu_push_local_task(selected, task, prio);
 	return starpu_push_local_task(selected, task, prio);

+ 67 - 29
src/sched_policies/work_stealing_policy.c

@@ -32,8 +32,6 @@ struct _starpu_work_stealing_data
 	 * better decisions about which queue to select when stealing or deferring work
 	 * better decisions about which queue to select when stealing or deferring work
 	 */
 	 */
 	unsigned performed_total;
 	unsigned performed_total;
-	_starpu_pthread_mutex_t sched_mutex;
-	_starpu_pthread_cond_t sched_cond;
 	unsigned last_pop_worker;
 	unsigned last_pop_worker;
 	unsigned last_push_worker;
 	unsigned last_push_worker;
 };
 };
@@ -150,12 +148,13 @@ static unsigned select_victim_overload(unsigned sched_ctx_id)
 
 
 	struct starpu_sched_ctx_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
 	struct starpu_sched_ctx_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
 
 
-        if(workers->init_cursor)
-                workers->init_cursor(workers);
+	struct starpu_iterator it;
+        if(workers->init_iterator)
+                workers->init_iterator(workers, &it);
 
 
-	while(workers->has_next(workers))
+	while(workers->has_next(workers, &it))
         {
         {
-                worker = workers->get_next(workers);
+                worker = workers->get_next(workers, &it);
 		worker_ratio = overload_metric(sched_ctx_id, worker);
 		worker_ratio = overload_metric(sched_ctx_id, worker);
 
 
 		if (worker_ratio > best_ratio)
 		if (worker_ratio > best_ratio)
@@ -165,9 +164,6 @@ static unsigned select_victim_overload(unsigned sched_ctx_id)
 		}
 		}
 	}
 	}
 
 
-	if (workers->deinit_cursor)
-                workers->deinit_cursor(workers);
-
 	return best_worker;
 	return best_worker;
 }
 }
 
 
@@ -192,12 +188,13 @@ static unsigned select_worker_overload(unsigned sched_ctx_id)
 
 
 	struct starpu_sched_ctx_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
 	struct starpu_sched_ctx_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
 
 
-        if(workers->init_cursor)
-                workers->init_cursor(workers);
+	struct starpu_iterator it;
+        if(workers->init_iterator)
+                workers->init_iterator(workers, &it);
 
 
-	while(workers->has_next(workers))
+	while(workers->has_next(workers, &it))
         {
         {
-                worker = workers->get_next(workers);
+                worker = workers->get_next(workers, &it);
 
 
 		worker_ratio = overload_metric(sched_ctx_id, worker);
 		worker_ratio = overload_metric(sched_ctx_id, worker);
 
 
@@ -208,9 +205,6 @@ static unsigned select_worker_overload(unsigned sched_ctx_id)
 		}
 		}
 	}
 	}
 
 
-	if (workers->deinit_cursor)
-                workers->deinit_cursor(workers);
-
 	return best_worker;
 	return best_worker;
 }
 }
 
 
@@ -272,6 +266,25 @@ static struct starpu_task *ws_pop_task(unsigned sched_ctx_id)
 		return task;
 		return task;
 	}
 	}
 
 
+	int worker = 0;
+	struct starpu_sched_ctx_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
+	struct starpu_iterator it;
+	if(workers->init_iterator)
+		workers->init_iterator(workers, &it);
+	
+	while(workers->has_next(workers, &it))
+	{
+		worker = workers->get_next(workers, &it);
+		if(worker != workerid)
+		{
+			_starpu_pthread_mutex_t *sched_mutex;
+			_starpu_pthread_cond_t *sched_cond;
+			starpu_worker_get_sched_condition(worker, &sched_mutex, &sched_cond);
+			_STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
+		}
+	}
+
+
 	/* we need to steal someone's job */
 	/* we need to steal someone's job */
 	unsigned victim = select_victim(sched_ctx_id);
 	unsigned victim = select_victim(sched_ctx_id);
 	struct _starpu_deque_jobq *victimq = ws->queue_array[victim];
 	struct _starpu_deque_jobq *victimq = ws->queue_array[victim];
@@ -288,6 +301,18 @@ static struct starpu_task *ws_pop_task(unsigned sched_ctx_id)
 		victimq->njobs--;
 		victimq->njobs--;
 	}
 	}
 
 
+	while(workers->has_next(workers, &it))
+	{
+		worker = workers->get_next(workers, &it);
+		if(worker != workerid)
+		{
+			_starpu_pthread_mutex_t *sched_mutex;
+			_starpu_pthread_cond_t *sched_cond;
+			starpu_worker_get_sched_condition(worker, &sched_mutex, &sched_cond);
+			_STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
+		}
+	}		
+
 	return task;
 	return task;
 }
 }
 
 
@@ -313,8 +338,22 @@ int ws_push_task(struct starpu_task *task)
                 return ret_val;
                 return ret_val;
         }
         }
 
 
-	_STARPU_PTHREAD_MUTEX_LOCK(&ws->sched_mutex);
-
+	unsigned worker = 0;
+	struct starpu_sched_ctx_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
+	struct starpu_iterator it;
+	if(workers->init_iterator)
+		workers->init_iterator(workers, &it);
+	
+	while(workers->has_next(workers, &it))
+	{
+		worker = workers->get_next(workers, &it);
+		_starpu_pthread_mutex_t *sched_mutex;
+		_starpu_pthread_cond_t *sched_cond;
+		starpu_worker_get_sched_condition(worker, &sched_mutex, &sched_cond);
+		_STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
+	}
+	
+	
 	/* If the current thread is not a worker but
 	/* If the current thread is not a worker but
 	 * the main thread (-1), we find the better one to
 	 * the main thread (-1), we find the better one to
 	 * put task on its queue */
 	 * put task on its queue */
@@ -333,9 +372,16 @@ int ws_push_task(struct starpu_task *task)
 	_starpu_job_list_push_back(deque_queue->jobq, j);
 	_starpu_job_list_push_back(deque_queue->jobq, j);
 	deque_queue->njobs++;
 	deque_queue->njobs++;
 
 
-	_STARPU_PTHREAD_COND_SIGNAL(&ws->sched_cond);
-	_STARPU_PTHREAD_MUTEX_UNLOCK(&ws->sched_mutex);
-
+	while(workers->has_next(workers, &it))
+	{
+		worker = workers->get_next(workers, &it);
+		_starpu_pthread_mutex_t *sched_mutex;
+		_starpu_pthread_cond_t *sched_cond;
+		starpu_worker_get_sched_condition(worker, &sched_mutex, &sched_cond);
+		_STARPU_PTHREAD_COND_SIGNAL(sched_cond);
+		_STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
+	}
+		
         _STARPU_PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
         _STARPU_PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
 
 
 	return 0;
 	return 0;
@@ -358,8 +404,6 @@ static void ws_add_workers(unsigned sched_ctx_id, int *workerids,unsigned nworke
 		 */
 		 */
 		ws->queue_array[workerid]->nprocessed = -1;
 		ws->queue_array[workerid]->nprocessed = -1;
 		ws->queue_array[workerid]->njobs = 0;
 		ws->queue_array[workerid]->njobs = 0;
-
-		starpu_sched_ctx_set_worker_mutex_and_cond(sched_ctx_id, workerid, &ws->sched_mutex, &ws->sched_cond);
 	}
 	}
 }
 }
 
 
@@ -374,7 +418,6 @@ static void ws_remove_workers(unsigned sched_ctx_id, int *workerids, unsigned nw
 	{
 	{
 		workerid = workerids[i];
 		workerid = workerids[i];
 		_starpu_destroy_deque(ws->queue_array[workerid]);
 		_starpu_destroy_deque(ws->queue_array[workerid]);
-		starpu_sched_ctx_set_worker_mutex_and_cond(sched_ctx_id, workerid, NULL, NULL);
 	}
 	}
 }
 }
 
 
@@ -395,9 +438,6 @@ static void initialize_ws_policy(unsigned sched_ctx_id)
 	ws->performed_total = -1;
 	ws->performed_total = -1;
 
 
 	ws->queue_array = (struct _starpu_deque_jobq**)malloc(STARPU_NMAXWORKERS*sizeof(struct _starpu_deque_jobq*));
 	ws->queue_array = (struct _starpu_deque_jobq**)malloc(STARPU_NMAXWORKERS*sizeof(struct _starpu_deque_jobq*));
-
-	_STARPU_PTHREAD_MUTEX_INIT(&ws->sched_mutex, NULL);
-	_STARPU_PTHREAD_COND_INIT(&ws->sched_cond, NULL);
 }
 }
 
 
 static void deinit_ws_policy(unsigned sched_ctx_id)
 static void deinit_ws_policy(unsigned sched_ctx_id)
@@ -405,8 +445,6 @@ static void deinit_ws_policy(unsigned sched_ctx_id)
 	struct _starpu_work_stealing_data *ws = (struct _starpu_work_stealing_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
 	struct _starpu_work_stealing_data *ws = (struct _starpu_work_stealing_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
 
 
 	free(ws->queue_array);
 	free(ws->queue_array);
-	_STARPU_PTHREAD_MUTEX_DESTROY(&ws->sched_mutex);
-	_STARPU_PTHREAD_COND_DESTROY(&ws->sched_cond);
         free(ws);
         free(ws);
         starpu_sched_ctx_delete_worker_collection(sched_ctx_id);
         starpu_sched_ctx_delete_worker_collection(sched_ctx_id);
 }
 }

+ 10 - 26
src/worker_collection/worker_list.c

@@ -1,29 +1,26 @@
 #include <starpu.h>
 #include <starpu.h>
 #include <pthread.h>
 #include <pthread.h>
 
 
-static unsigned list_has_next(struct starpu_sched_ctx_worker_collection *workers)
+static unsigned list_has_next(struct starpu_sched_ctx_worker_collection *workers, struct starpu_iterator *it)
 {
 {
 	int nworkers = (int)workers->nworkers;
 	int nworkers = (int)workers->nworkers;
+	STARPU_ASSERT(it != NULL);
 
 
-	int *cursor = (int*)pthread_getspecific(workers->cursor_key);
+	unsigned ret = it->cursor < nworkers ;
 
 
-	unsigned ret = cursor ? *cursor < nworkers : 0;
-
-	if(!ret && cursor) *cursor = 0;
+	if(!ret) it->cursor = 0;
 
 
 	return ret;
 	return ret;
 }
 }
 
 
-static int list_get_next(struct starpu_sched_ctx_worker_collection *workers)
+static int list_get_next(struct starpu_sched_ctx_worker_collection *workers, struct starpu_iterator *it)
 {
 {
 	int *workerids = (int *)workers->workerids;
 	int *workerids = (int *)workers->workerids;
 	int nworkers = (int)workers->nworkers;
 	int nworkers = (int)workers->nworkers;
 
 
-	int *cursor = (int*)pthread_getspecific(workers->cursor_key);
-
-	STARPU_ASSERT(*cursor < nworkers);
+	STARPU_ASSERT(it->cursor < nworkers);
 
 
-	int ret = workerids[(*cursor)++];
+	int ret = workerids[(it->cursor)++];
 
 
 	return ret;
 	return ret;
 }
 }
@@ -127,29 +124,17 @@ static void* list_init(struct starpu_sched_ctx_worker_collection *workers)
 	int *workerids = (int*)malloc(STARPU_NMAXWORKERS * sizeof(int));
 	int *workerids = (int*)malloc(STARPU_NMAXWORKERS * sizeof(int));
 	_init_workers(workerids);
 	_init_workers(workerids);
 
 
-	pthread_key_create(&workers->cursor_key, NULL);
-
 	return (void*)workerids;
 	return (void*)workerids;
 }
 }
 
 
 static void list_deinit(struct starpu_sched_ctx_worker_collection *workers)
 static void list_deinit(struct starpu_sched_ctx_worker_collection *workers)
 {
 {
 	free(workers->workerids);
 	free(workers->workerids);
-	pthread_key_delete(workers->cursor_key);
-}
-
-static void list_init_cursor(struct starpu_sched_ctx_worker_collection *workers)
-{
-	int *cursor = (int*)malloc(sizeof(int));
-	*cursor = 0;
-	pthread_setspecific(workers->cursor_key, (void*)cursor);
 }
 }
 
 
-static void list_deinit_cursor(struct starpu_sched_ctx_worker_collection *workers)
+static void list_init_iterator(struct starpu_sched_ctx_worker_collection *workers, struct starpu_iterator *it)
 {
 {
-	int *cursor = (int*)pthread_getspecific(workers->cursor_key);
-	*cursor = 0;
-	free(cursor);
+	*((int*)it) = 0;
 }
 }
 
 
 struct starpu_sched_ctx_worker_collection worker_list = {
 struct starpu_sched_ctx_worker_collection worker_list = {
@@ -159,8 +144,7 @@ struct starpu_sched_ctx_worker_collection worker_list = {
 	.remove = list_remove,
 	.remove = list_remove,
 	.init = list_init,
 	.init = list_init,
 	.deinit = list_deinit,
 	.deinit = list_deinit,
-	.init_cursor = list_init_cursor,
-	.deinit_cursor = list_deinit_cursor,
+	.init_iterator = list_init_iterator,
 	.type = WORKER_LIST
 	.type = WORKER_LIST
 };
 };