Forráskód Böngészése

nested scheduling contexts + worker collection keeping track of blocked workers

Andra Hugo 11 éve
szülő
commit
ab7bd6810c

+ 5 - 0
examples/Makefile.am

@@ -190,6 +190,7 @@ examplebin_PROGRAMS +=				\
 	sched_ctx/dummy_sched_with_ctx		\
 	sched_ctx/prio				\
 	sched_ctx/sched_ctx_without_sched_policy\
+	sched_ctx/nested_sched_ctxs		\
 	worker_collections/worker_tree_example  \
 	worker_collections/worker_list_example  \
 	reductions/dot_product			\
@@ -270,6 +271,7 @@ STARPU_EXAMPLES +=				\
 	sched_ctx/prio				\
 	sched_ctx/dummy_sched_with_ctx		\
 	sched_ctx/sched_ctx_without_sched_policy\
+	sched_ctx/nested_sched_ctx		\
 	worker_collections/worker_tree_example  \
 	worker_collections/worker_list_example  \
 	reductions/dot_product			\
@@ -925,6 +927,9 @@ sched_ctx_parallel_code_CFLAGS = \
 sched_ctx_sched_ctx_without_sched_policy_CFLAGS = \
 	$(AM_CFLAGS) -fopenmp
 
+sched_ctx_nested_sched_ctxs_CFLAGS = \
+	$(AM_CFLAGS) -fopenmp
+
 endif
 
 showcheck:

+ 194 - 0
examples/sched_ctx/nested_sched_ctxs.c

@@ -0,0 +1,194 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2010-2013  Université de Bordeaux 1
+ * Copyright (C) 2010-2014  Centre National de la Recherche Scientifique
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <starpu.h>
+#include <omp.h>
+
+#ifdef STARPU_QUICK_CHECK
+#define NTASKS 64
+#else
+#define NTASKS 100
+#endif
+
+int tasks_executed[2];
+starpu_pthread_mutex_t mut;
+
+int parallel_code(int sched_ctx)
+{
+	int i;
+	int t = 0;
+	int *cpuids = NULL;
+	int ncpuids = 0;
+	starpu_sched_ctx_get_available_cpuids(sched_ctx, &cpuids, &ncpuids);
+
+//	printf("execute task of %d threads \n", ncpuids);
+#pragma omp parallel num_threads(ncpuids)
+	{
+		starpu_sched_ctx_bind_current_thread_to_cpuid(cpuids[omp_get_thread_num()]);
+// 			printf("cpu = %d ctx%d nth = %d\n", sched_getcpu(), sched_ctx, omp_get_num_threads());
+#pragma omp for
+		for(i = 0; i < NTASKS; i++)
+			t++;
+	}
+
+	free(cpuids);
+	return t;
+}
+
+static void sched_ctx_func(void *descr[] STARPU_ATTRIBUTE_UNUSED, void *arg)
+{
+	int w = starpu_worker_get_id();
+	unsigned sched_ctx = (unsigned)arg;
+	int n = parallel_code(sched_ctx);
+//	printf("w %d executed %d it \n", w, n);
+}
+
+
+static struct starpu_codelet sched_ctx_codelet =
+{
+	.cpu_funcs = {sched_ctx_func, NULL},
+	.cuda_funcs = {NULL},
+	.opencl_funcs = {NULL},
+	.model = NULL,
+	.nbuffers = 0,
+	.name = "sched_ctx"
+};
+
+int main(int argc, char **argv)
+{
+	tasks_executed[0] = 0;
+	tasks_executed[1] = 0;
+	int ntasks = NTASKS;
+	int ret, j, k;
+
+	ret = starpu_init(NULL);
+	if (ret == -ENODEV)
+		return 77;
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_init");
+
+	starpu_pthread_mutex_init(&mut, NULL);
+	int nprocs1 = 1;
+	int nprocs2 = 1;
+	int *procs1, *procs2;
+
+#ifdef STARPU_USE_CPU
+	unsigned ncpus =  starpu_cpu_worker_get_count();
+	procs1 = (int*)malloc(ncpus*sizeof(int));
+	procs2 = (int*)malloc(ncpus*sizeof(int));
+	starpu_worker_get_ids_by_type(STARPU_CPU_WORKER, procs1, ncpus);
+
+	nprocs1 = ncpus/2;
+	nprocs2 =  nprocs1;
+	k = 0;
+	for(j = nprocs1; j < nprocs1+nprocs2; j++)
+		procs2[k++] = j;
+#else
+	procs1 = (int*)malloc(nprocs1*sizeof(int));
+	procs2 = (int*)malloc(nprocs2*sizeof(int));
+	procs1[0] = 0;
+	procs2[0] = 0;
+#endif
+
+	/*create contexts however you want*/
+	unsigned sched_ctx1 = starpu_sched_ctx_create(procs1, nprocs1, "ctx1", STARPU_SCHED_CTX_POLICY_NAME, "eager", 0);
+	unsigned sched_ctx2 = starpu_sched_ctx_create(procs2, nprocs2, "ctx2", STARPU_SCHED_CTX_POLICY_NAME, "dmda", 0);
+
+	/*indicate what to do with the resources when context 2 finishes (it depends on your application)*/
+//	starpu_sched_ctx_set_inheritor(sched_ctx2, sched_ctx1);
+
+	int nprocs3 = nprocs1/2;
+	int nprocs4 = nprocs1/2;
+	int nprocs5 = nprocs2/2;
+	int nprocs6 = nprocs2/2;
+	int procs3[nprocs3];
+	int procs4[nprocs4];
+	int procs5[nprocs5];
+	int procs6[nprocs6];
+
+	k = 0;
+	for(j = 0; j < nprocs3; j++)
+		procs3[k++] = procs1[j];
+	k = 0;
+	for(j = nprocs3; j < nprocs3+nprocs4; j++)
+		procs4[k++] = procs1[j];
+
+	k = 0;
+	for(j = 0; j < nprocs5; j++)
+		procs5[k++] = procs2[j];
+	k = 0;
+	for(j = nprocs5; j < nprocs5+nprocs6; j++)
+		procs6[k++] = procs2[j];
+
+	unsigned sched_ctx3 = starpu_sched_ctx_create(procs3, nprocs3, "ctx3", STARPU_SCHED_CTX_NESTED, sched_ctx1, 0);
+	unsigned sched_ctx4 = starpu_sched_ctx_create(procs4, nprocs4, "ctx4", STARPU_SCHED_CTX_NESTED, sched_ctx1, 0);
+
+	unsigned sched_ctx5 = starpu_sched_ctx_create(procs5, nprocs5, "ctx5", STARPU_SCHED_CTX_NESTED, sched_ctx2, 0);
+	unsigned sched_ctx6 = starpu_sched_ctx_create(procs6, nprocs6, "ctx6", STARPU_SCHED_CTX_NESTED, sched_ctx2, 0);
+
+
+	int i;
+	for (i = 0; i < ntasks; i++)
+	{
+		struct starpu_task *task = starpu_task_create();
+
+		task->cl = &sched_ctx_codelet;
+		task->cl_arg = sched_ctx1;
+		
+		/*submit tasks to context*/
+		ret = starpu_task_submit_to_ctx(task,sched_ctx1);
+
+		STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_submit");
+	}
+
+	for (i = 0; i < ntasks; i++)
+	{
+		struct starpu_task *task = starpu_task_create();
+
+		task->cl = &sched_ctx_codelet;
+		task->cl_arg = sched_ctx2;
+
+		/*submit tasks to context*/
+		ret = starpu_task_submit_to_ctx(task,sched_ctx2);
+
+		STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_submit");
+	}
+
+
+	/* tell starpu when you finished submitting tasks to this context
+	   in order to allow moving resources from this context to the inheritor one
+	   when its corresponding tasks finished executing */
+
+
+
+	/* wait for all tasks at the end*/
+	starpu_task_wait_for_all();
+
+	starpu_sched_ctx_delete(sched_ctx3);
+	starpu_sched_ctx_delete(sched_ctx4);
+
+	starpu_sched_ctx_delete(sched_ctx5);
+	starpu_sched_ctx_delete(sched_ctx6);
+
+	starpu_sched_ctx_delete(sched_ctx1);
+	starpu_sched_ctx_delete(sched_ctx2);
+
+	printf("ctx%d: tasks starpu executed %d out of %d\n", sched_ctx1, tasks_executed[0], NTASKS);
+	printf("ctx%d: tasks starpu executed %d out of %d\n", sched_ctx2, tasks_executed[1], NTASKS);
+	starpu_shutdown();
+
+	return 0;
+}

+ 8 - 0
include/starpu_sched_ctx.h

@@ -29,6 +29,7 @@ extern "C"
 #define STARPU_SCHED_CTX_POLICY_MIN_PRIO	 (3<<16)
 #define STARPU_SCHED_CTX_POLICY_MAX_PRIO	 (4<<16)
 #define STARPU_SCHED_CTX_HIERARCHY_LEVEL         (5<<16)
+#define STARPU_SCHED_CTX_NESTED                  (6<<16)
 
 unsigned starpu_sched_ctx_create(int *workerids_ctx, int nworkers_ctx, const char *sched_ctx_name, ...);
 
@@ -127,6 +128,13 @@ int starpu_sched_ctx_book_workers_for_task(unsigned sched_ctx_id, int *workerids
 
 void starpu_sched_ctx_unbook_workers_for_task(unsigned sched_ctx_id, int master);
 
+/* return the first context (child of sched_ctx_id) where the workerid is master */
+unsigned starpu_sched_ctx_worker_is_master_for_child_ctx(int workerid, unsigned sched_ctx_id);
+
+void starpu_sched_ctx_revert_task_counters(unsigned sched_ctx_id, double flops);
+
+void starpu_sched_ctx_move_task_to_ctx(struct starpu_task *task, unsigned sched_ctx);
+
 #ifdef STARPU_USE_SC_HYPERVISOR
 void starpu_sched_ctx_call_pushed_task_cb(int workerid, unsigned sched_ctx_id);
 #endif /* STARPU_USE_SC_HYPERVISOR */

+ 7 - 0
include/starpu_worker.h

@@ -57,10 +57,15 @@ struct starpu_worker_collection
 {
 	void *workerids;
 	unsigned nworkers;
+	void *masters;
+	unsigned nmasters;
 	int present[STARPU_NMAXWORKERS];
+	int is_master[STARPU_NMAXWORKERS];
 	enum starpu_worker_collection_type type;
 	unsigned (*has_next)(struct starpu_worker_collection *workers, struct starpu_sched_ctx_iterator *it);
 	int (*get_next)(struct starpu_worker_collection *workers, struct starpu_sched_ctx_iterator *it);
+	unsigned (*has_next_master)(struct starpu_worker_collection *workers, struct starpu_sched_ctx_iterator *it);
+	int (*get_next_master)(struct starpu_worker_collection *workers, struct starpu_sched_ctx_iterator *it);
 	int (*add)(struct starpu_worker_collection *workers, int worker);
 	int (*remove)(struct starpu_worker_collection *workers, int worker);
 	void (*init)(struct starpu_worker_collection *workers);
@@ -109,6 +114,8 @@ int starpu_worker_get_mp_nodeid(int id);
 struct starpu_tree* starpu_workers_get_tree(void);
 
 unsigned starpu_worker_get_sched_ctx_list(int worker, unsigned **sched_ctx);
+
+unsigned starpu_worker_is_slave(int workerid);
 #ifdef __cplusplus
 }
 #endif

+ 54 - 0
src/core/sched_ctx.c

@@ -533,6 +533,7 @@ unsigned starpu_sched_ctx_create(int *workerids, int nworkers, const char *sched
 	int max_prio = 0;
 	struct starpu_sched_policy *sched_policy = NULL;
 	unsigned hierarchy_level = 0;
+	unsigned nesting_sched_ctx = STARPU_NMAX_SCHED_CTXS;
 
 	va_start(varg_list, sched_ctx_name);
 	while ((arg_type = va_arg(varg_list, int)) != 0)
@@ -561,6 +562,10 @@ unsigned starpu_sched_ctx_create(int *workerids, int nworkers, const char *sched
 		{
 			hierarchy_level = va_arg(varg_list, unsigned);
 		}
+		else if (arg_type == STARPU_SCHED_CTX_NESTED)
+		{
+			nesting_sched_ctx = va_arg(varg_list, unsigned);
+		}
 		else
 		{
 			STARPU_ABORT_MSG("Unrecognized argument %d\n", arg_type);
@@ -572,6 +577,7 @@ unsigned starpu_sched_ctx_create(int *workerids, int nworkers, const char *sched
 	struct _starpu_sched_ctx *sched_ctx = NULL;
 	sched_ctx = _starpu_create_sched_ctx(sched_policy, workerids, nworkers, 0, sched_ctx_name, min_prio_set, min_prio, max_prio_set, max_prio);
 	sched_ctx->hierarchy_level = hierarchy_level;
+	sched_ctx->nesting_sched_ctx = nesting_sched_ctx;
 
 	_starpu_unlock_mutex_if_prev_locked();
 	int *added_workerids;
@@ -1142,6 +1148,8 @@ struct starpu_worker_collection* starpu_sched_ctx_create_worker_collection(unsig
 	case STARPU_WORKER_TREE:
 		sched_ctx->workers->has_next = worker_tree.has_next;
 		sched_ctx->workers->get_next = worker_tree.get_next;
+		sched_ctx->workers->has_next_master = worker_tree.has_next_master;
+		sched_ctx->workers->get_next_master = worker_tree.get_next_master;
 		sched_ctx->workers->add = worker_tree.add;
 		sched_ctx->workers->remove = worker_tree.remove;
 		sched_ctx->workers->init = worker_tree.init;
@@ -1154,6 +1162,8 @@ struct starpu_worker_collection* starpu_sched_ctx_create_worker_collection(unsig
 	default:
 		sched_ctx->workers->has_next = worker_list.has_next;
 		sched_ctx->workers->get_next = worker_list.get_next;
+		sched_ctx->workers->has_next_master = worker_list.has_next_master;
+		sched_ctx->workers->get_next_master = worker_list.get_next_master;
 		sched_ctx->workers->add = worker_list.add;
 		sched_ctx->workers->remove = worker_list.remove;
 		sched_ctx->workers->init = worker_list.init;
@@ -1615,6 +1625,44 @@ void starpu_sched_ctx_bind_current_thread_to_cpuid(unsigned cpuid STARPU_ATTRIBU
 
 }
 
+unsigned starpu_sched_ctx_worker_is_master_for_child_ctx(int workerid, unsigned sched_ctx_id)
+{
+	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
+	struct _starpu_sched_ctx_list *l = NULL;
+	struct _starpu_sched_ctx *sched_ctx = NULL;
+	for (l = worker->sched_ctx_list; l; l = l->next)
+	{ 
+		 sched_ctx = _starpu_get_sched_ctx_struct(l->sched_ctx);
+		if(sched_ctx-> main_master == workerid && sched_ctx->nesting_sched_ctx == sched_ctx_id)
+			return sched_ctx->id;
+	}
+	return STARPU_NMAX_SCHED_CTXS;
+
+}
+
+void starpu_sched_ctx_revert_task_counters(unsigned sched_ctx_id, double flops)
+{
+        _starpu_decrement_nsubmitted_tasks_of_sched_ctx(sched_ctx_id);
+        _starpu_decrement_nready_tasks_of_sched_ctx(sched_ctx_id, flops);
+}
+
+void starpu_sched_ctx_move_task_to_ctx(struct starpu_task *task, unsigned sched_ctx)
+{
+	int workerid = starpu_worker_get_id();
+	struct _starpu_worker *worker  = NULL;
+	if(workerid != -1)
+	{
+		worker = _starpu_get_worker_struct(workerid);
+		STARPU_PTHREAD_MUTEX_UNLOCK(&worker->sched_mutex);
+	}
+
+	task->sched_ctx = sched_ctx;
+	_starpu_task_submit_nodeps(task);
+
+	if(workerid != -1)
+		STARPU_PTHREAD_MUTEX_LOCK(&worker->sched_mutex);
+}
+
 static unsigned _worker_sleeping_in_other_ctx(unsigned sched_ctx_id, int workerid)
 {
 	int s;
@@ -1630,6 +1678,7 @@ static unsigned _worker_sleeping_in_other_ctx(unsigned sched_ctx_id, int workeri
 	return 0;
 
 }
+
 static void _starpu_sched_ctx_get_workers_to_sleep(unsigned sched_ctx_id, int *workerids, int nworkers, int master)
 {
 	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
@@ -1662,6 +1711,8 @@ static void _starpu_sched_ctx_get_workers_to_sleep(unsigned sched_ctx_id, int *w
 
 void _starpu_sched_ctx_signal_worker_blocked(unsigned sched_ctx_id, int workerid)
 {
+	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
+	worker->slave = 1;
 	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
 	int master = sched_ctx->master[workerid];
 	sem_post(&sched_ctx->fall_asleep_sem[master]);
@@ -1676,6 +1727,9 @@ void _starpu_sched_ctx_signal_worker_woke_up(unsigned sched_ctx_id, int workerid
 	sem_post(&sched_ctx->wake_up_sem[master]);
 	sched_ctx->sleeping[workerid] = 0;
 	sched_ctx->master[workerid] = -1;
+	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
+	worker->slave = 0;
+
 	return;
 }
 

+ 3 - 0
src/core/sched_ctx.h

@@ -147,6 +147,9 @@ struct _starpu_sched_ctx
 	/* bool indicating if the workers is sleeping in this ctx */
 	unsigned sleeping[STARPU_NMAXWORKERS];
 
+	/* ctx nesting the current ctx */
+	unsigned nesting_sched_ctx;
+
 };
 
 struct _starpu_machine_config;

+ 6 - 0
src/core/workers.c

@@ -467,6 +467,7 @@ static void _starpu_worker_init(struct _starpu_worker *workerarg, struct _starpu
 	workerarg->reverse_phase[1] = 0;
 	workerarg->pop_ctx_priority = 1;
 	workerarg->sched_mutex_locked = 0;
+	workerarg->slave = 0;
 
 	/* cpu_set/hwloc_cpu_set initialized in topology.c */
 }
@@ -1374,6 +1375,11 @@ unsigned starpu_worker_get_count(void)
 	return config.topology.nworkers;
 }
 
+unsigned starpu_worker_is_slave(int workerid)
+{
+	return config.workers[workerid].slave;
+}
+
 int starpu_worker_get_count_by_type(enum starpu_worker_archtype type)
 {
 	switch (type)

+ 3 - 0
src/core/workers.h

@@ -112,6 +112,9 @@ LIST_TYPE(_starpu_worker,
 	/* flag to know if sched_mutex is locked or not */
 	unsigned sched_mutex_locked;
 
+	/* bool to indicate if the worker is slave in a ctx */
+	unsigned slave;
+
 #ifdef __GLIBC__
 	cpu_set_t cpu_set;
 #endif /* __GLIBC__ */

+ 17 - 10
src/sched_policies/deque_modeling_policy_data_aware.c

@@ -286,6 +286,13 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 	struct _starpu_dmda_data *dt = (struct _starpu_dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
 	/* make sure someone coule execute that task ! */
 	STARPU_ASSERT(best_workerid != -1);
+	unsigned child_sched_ctx = starpu_sched_ctx_worker_is_master_for_child_ctx(best_workerid, sched_ctx_id);
+        if(child_sched_ctx != STARPU_NMAX_SCHED_CTXS)
+        {
+		starpu_sched_ctx_revert_task_counters(sched_ctx_id, task->flops);
+                starpu_sched_ctx_move_task_to_ctx(task, child_sched_ctx);
+                return 0;
+        }
 
 	struct _starpu_fifo_taskq *fifo = dt->queue_array[best_workerid];
 
@@ -405,9 +412,9 @@ static int _dm_push_task(struct starpu_task *task, unsigned prio, unsigned sched
 	if(workers->init_iterator)
 		workers->init_iterator(workers, &it);
 
-	while(workers->has_next(workers, &it))
+	while(workers->has_next_master(workers, &it))
 	{
-		worker = workers->get_next(workers, &it);
+		worker = workers->get_next_master(workers, &it);
 		struct _starpu_fifo_taskq *fifo  = dt->queue_array[worker];
 		unsigned memory_node = starpu_worker_get_memory_node(worker);
 		struct starpu_perfmodel_arch* perf_arch = starpu_worker_get_perf_archtype(worker);
@@ -543,9 +550,9 @@ static void compute_all_performance_predictions(struct starpu_task *task,
 	if(workers->init_iterator)
 		workers->init_iterator(workers, &it);
 
-	while(workers->has_next(workers, &it))
+	while(workers->has_next_master(workers, &it))
 	{
-		worker = workers->get_next(workers, &it);
+		worker = workers->get_next_master(workers, &it);
 
 		struct _starpu_fifo_taskq *fifo = dt->queue_array[worker];
 		struct starpu_perfmodel_arch* perf_arch = starpu_worker_get_perf_archtype(worker);
@@ -692,10 +699,6 @@ static int _dmda_push_task(struct starpu_task *task, unsigned prio, unsigned sch
 
 	double fitness[nworkers_ctx][STARPU_MAXIMPLEMENTATIONS];
 
-	struct starpu_sched_ctx_iterator it;
-	if(workers->init_iterator)
-		workers->init_iterator(workers, &it);
-
 	compute_all_performance_predictions(task,
 					    nworkers_ctx,
 					    local_task_length,
@@ -712,9 +715,13 @@ static int _dmda_push_task(struct starpu_task *task, unsigned prio, unsigned sch
 	unsigned nimpl;
 	if (forced_best == -1)
 	{
-		while(workers->has_next(workers, &it))
+		struct starpu_sched_ctx_iterator it;
+		if(workers->init_iterator)
+			workers->init_iterator(workers, &it);
+
+		while(workers->has_next_master(workers, &it))
 		{
-			worker = workers->get_next(workers, &it);
+			worker = workers->get_next_master(workers, &it);
 			for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
 			{
 				if (!starpu_worker_can_execute_task(worker, task, nimpl))

+ 13 - 2
src/sched_policies/eager_central_policy.c

@@ -94,9 +94,9 @@ static int push_task_eager_policy(struct starpu_task *task)
 	if(workers->init_iterator)
 		workers->init_iterator(workers, &it);
 	
-	while(workers->has_next(workers, &it))
+	while(workers->has_next_master(workers, &it))
 	{
-		worker = workers->get_next(workers, &it);
+		worker = workers->get_next_master(workers, &it);
 
 #ifdef STARPU_NON_BLOCKING_DRIVERS
 		if (!starpu_bitmap_get(data->waiters, worker))
@@ -167,6 +167,17 @@ static struct starpu_task *pop_task_eager_policy(unsigned sched_ctx_id)
 
 	STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
 
+	if(task)
+	{
+		unsigned child_sched_ctx = starpu_sched_ctx_worker_is_master_for_child_ctx(workerid, sched_ctx_id);
+		if(child_sched_ctx != STARPU_NMAX_SCHED_CTXS)
+		{
+			starpu_sched_ctx_revert_task_counters(sched_ctx_id, task->flops);
+			starpu_sched_ctx_move_task_to_ctx(task, child_sched_ctx);
+			return NULL;
+		}
+	}
+
 	return task;
 }
 

+ 65 - 3
src/worker_collection/worker_list.c

@@ -42,6 +42,30 @@ static int list_get_next(struct starpu_worker_collection *workers, struct starpu
 	return ret;
 }
 
+static unsigned list_has_next_master(struct starpu_worker_collection *workers, struct starpu_sched_ctx_iterator *it)
+{
+	int nworkers = workers->nmasters;
+	STARPU_ASSERT(it != NULL);
+
+	unsigned ret = it->cursor < nworkers ;
+
+	if(!ret) it->cursor = 0;
+
+	return ret;
+}
+
+static int list_get_next_master(struct starpu_worker_collection *workers, struct starpu_sched_ctx_iterator *it)
+{
+	int *workerids = (int *)workers->masters;
+	int nworkers = (int)workers->nmasters;
+
+	STARPU_ASSERT_MSG(it->cursor < nworkers, "cursor %d nworkers %d\n", it->cursor, nworkers);
+
+	int ret = workerids[it->cursor++];
+
+	return ret;
+}
+
 static unsigned _worker_belongs_to_ctx(struct starpu_worker_collection *workers, int workerid)
 {
 	int *workerids = (int *)workers->workerids;
@@ -108,9 +132,12 @@ static int list_remove(struct starpu_worker_collection *workers, int worker)
 {
 	int *workerids = (int *)workers->workerids;
 	unsigned nworkers = workers->nworkers;
+
+	int *masters = (int *)workers->masters;
+	unsigned nmasters = workers->nmasters;
 	
-	int found_worker = -1;
 	unsigned i;
+	int found_worker = -1;
 	for(i = 0; i < nworkers; i++)
 	{
 		if(workerids[i] == worker)
@@ -125,13 +152,29 @@ static int list_remove(struct starpu_worker_collection *workers, int worker)
 	if(found_worker != -1)
 		workers->nworkers--;
 
+	int found_master = -1;
+	for(i = 0; i < nmasters; i++)
+	{
+		if(masters[i] == worker)
+		{
+			masters[i] = -1;
+			found_master = worker;
+			break;
+		}
+	}
+
+	_rearange_workerids(masters, nmasters);
+	if(found_master != -1)
+		workers->nmasters--;
+	printf("rem %d\n", found_worker);
 	return found_worker;
 }
 
 static void _init_workers(int *workerids)
 {
 	unsigned i;
-	for(i = 0; i < STARPU_NMAXWORKERS; i++)
+	int nworkers = starpu_worker_get_count();
+	for(i = 0; i < nworkers; i++)
 		workerids[i] = -1;
 	return;
 }
@@ -139,10 +182,14 @@ static void _init_workers(int *workerids)
 static void list_init(struct starpu_worker_collection *workers)
 {
 	int *workerids = (int*)malloc(STARPU_NMAXWORKERS * sizeof(int));
+	int *masters = (int*)malloc(STARPU_NMAXWORKERS * sizeof(int));
 	_init_workers(workerids);
+	_init_workers(masters);
 
 	workers->workerids = (void*)workerids;
 	workers->nworkers = 0;
+	workers->masters = (void*)masters;
+	workers->nmasters = 0;
 
 	return;
 }
@@ -150,17 +197,32 @@ static void list_init(struct starpu_worker_collection *workers)
 static void list_deinit(struct starpu_worker_collection *workers)
 {
 	free(workers->workerids);
+	free(workers->masters);
 }
 
-static void list_init_iterator(struct starpu_worker_collection *workers STARPU_ATTRIBUTE_UNUSED, struct starpu_sched_ctx_iterator *it)
+static void list_init_iterator(struct starpu_worker_collection *workers, struct starpu_sched_ctx_iterator *it)
 {
 	it->cursor = 0;
+
+	int *workerids = (int *)workers->workerids;
+	unsigned nworkers = workers->nworkers;
+	unsigned i;
+	int nm = 0;
+	for(i = 0;  i < nworkers; i++)
+	{
+		if(!starpu_worker_is_slave(workerids[i]))
+			((int*)workers->masters)[nm++] = workerids[i];
+	}
+	workers->nmasters = nm;
+
 }
 
 struct starpu_worker_collection worker_list =
 {
 	.has_next = list_has_next,
 	.get_next = list_get_next,
+	.has_next_master = list_has_next_master,
+	.get_next_master = list_get_next_master,
 	.add = list_add,
 	.remove = list_remove,
 	.init = list_init,

+ 84 - 4
src/worker_collection/worker_tree.c

@@ -89,6 +89,75 @@ static int tree_get_next(struct starpu_worker_collection *workers, struct starpu
 	return ret;
 }
 
+static unsigned tree_has_next_master(struct starpu_worker_collection *workers, struct starpu_sched_ctx_iterator *it)
+{
+	STARPU_ASSERT(it != NULL);
+	if(workers->nworkers == 0)
+		return 0;
+
+	struct starpu_tree *tree = (struct starpu_tree*)workers->workerids;
+	struct starpu_tree *neighbour = starpu_tree_get_neighbour(tree, (struct starpu_tree*)it->value, it->visited, workers->is_master);
+	
+	if(!neighbour)
+	{
+		starpu_tree_reset_visited(tree, it->visited);
+		it->value = NULL;
+		it->possible_value = NULL;
+		return 0;
+	}
+	int id = -1;
+	int workerids[STARPU_NMAXWORKERS];
+	int nworkers = _starpu_worker_get_workerids(neighbour->id, workerids);
+	int w;
+	for(w = 0; w < nworkers; w++)
+	{
+		if(!it->visited[workerids[w]] && workers->is_master[workerids[w]])
+		{
+			id = workerids[w];
+			it->possible_value = neighbour;
+		}
+	}
+
+	STARPU_ASSERT_MSG(id != -1, "bind id (%d) for workerid (%d) not correct", neighbour->id, id);
+
+	return 1;
+}
+
+static int tree_get_next_master(struct starpu_worker_collection *workers, struct starpu_sched_ctx_iterator *it)
+{
+	int ret = -1;
+	
+	struct starpu_tree *tree = (struct starpu_tree *)workers->workerids;
+	struct starpu_tree *neighbour = NULL;
+	if(it->possible_value)
+	{
+		neighbour = it->possible_value;
+		it->possible_value = NULL;
+	}
+	else
+		neighbour = starpu_tree_get_neighbour(tree, (struct starpu_tree*)it->value, it->visited, workers->is_master);
+	
+	STARPU_ASSERT_MSG(neighbour, "no element anymore");
+	
+	
+	int workerids[STARPU_NMAXWORKERS];
+	int nworkers = _starpu_worker_get_workerids(neighbour->id, workerids);
+	int w;
+	for(w = 0; w < nworkers; w++)
+	{
+		if(!it->visited[workerids[w]] && workers->is_master[workerids[w]])
+		{
+			ret = workerids[w];
+			it->visited[workerids[w]] = 1;
+			it->value = neighbour;
+		}
+	}
+	STARPU_ASSERT_MSG(ret != -1, "bind id not correct");
+
+	return ret;
+}
+
+
 static int tree_add(struct starpu_worker_collection *workers, int worker)
 {
 	struct starpu_tree *tree = (struct starpu_tree *)workers->workerids;
@@ -111,6 +180,7 @@ static int tree_remove(struct starpu_worker_collection *workers, int worker)
 	if(workers->present[worker])
 	{
 		workers->present[worker] = 0;
+		workers->is_master[worker] = 0;
 		workers->nworkers--;
 		return worker;
 	}
@@ -122,10 +192,14 @@ static void tree_init(struct starpu_worker_collection *workers)
 {
 	workers->workerids = (void*)starpu_workers_get_tree();
 	workers->nworkers = 0;
-	
+
 	int i;
-	for(i = 0; i < STARPU_NMAXWORKERS; i++)
+	int nworkers = starpu_worker_get_count();
+	for(i = 0; i < nworkers; i++)
+	{
 		workers->present[i] = 0;
+		workers->is_master[i] = 0;
+	}
 	
 	return;
 }
@@ -135,19 +209,25 @@ static void tree_deinit(struct starpu_worker_collection *workers)
 //	free(workers->workerids);
 }
 
-static void tree_init_iterator(struct starpu_worker_collection *workers STARPU_ATTRIBUTE_UNUSED, struct starpu_sched_ctx_iterator *it)
+static void tree_init_iterator(struct starpu_worker_collection *workers, struct starpu_sched_ctx_iterator *it)
 {
 	it->value = NULL;
 	it->possible_value = NULL;
 	int i;
-	for(i = 0; i < STARPU_NMAXWORKERS; i++)
+	int nworkers = starpu_worker_get_count();
+	for(i = 0; i < nworkers; i++)
+	{
+		workers->is_master[i] = (workers->present[i] && !starpu_worker_is_slave(i));
 		it->visited[i] = 0;
+	}
 }
 
 struct starpu_worker_collection worker_tree =
 {
 	.has_next = tree_has_next,
 	.get_next = tree_get_next,
+	.has_next_master = tree_has_next_master,
+	.get_next_master = tree_get_next_master,
 	.add = tree_add,
 	.remove = tree_remove,
 	.init = tree_init,