Просмотр исходного кода

deal with shared workers between contexts having no starpu scheduling policy (allow the strategy to schedule tasks on not blocked shared workers)

Andra Hugo лет назад: 10
Родитель
Сommit
43f256f1c4

+ 2 - 0
examples/sched_ctx/parallel_tasks_reuse_handle.c

@@ -204,6 +204,8 @@ int main(int argc, char **argv)
 				    0);
 		t->cl_arg_free = 1;
 		t->destroy = 1;
+		t->possibly_parallel = 1;
+
 		ret=starpu_task_submit(t);
 		if (ret == -ENODEV)
 			goto out;

+ 1 - 1
examples/sched_ctx/sched_ctx_without_sched_policy.c

@@ -132,7 +132,7 @@ int main(int argc, char **argv)
 
 		task->cl = &sched_ctx_codelet;
 		task->cl_arg = (void*)(uintptr_t) sched_ctx1;
-
+		
 		/*submit tasks to context*/
 		ret = starpu_task_submit_to_ctx(task,sched_ctx1);
 

+ 9 - 1
include/starpu_worker.h

@@ -51,17 +51,23 @@ enum starpu_worker_collection_type
 	STARPU_WORKER_LIST
 };
 
+
 struct starpu_worker_collection
 {
 	void *workerids;
 	unsigned nworkers;
+	void *unblocked_workers;
+	unsigned nunblocked_workers;
 	void *masters;
 	unsigned nmasters;
 	int present[STARPU_NMAXWORKERS];
+	int is_unblocked[STARPU_NMAXWORKERS];
 	int is_master[STARPU_NMAXWORKERS];
 	enum starpu_worker_collection_type type;
 	unsigned (*has_next)(struct starpu_worker_collection *workers, struct starpu_sched_ctx_iterator *it);
 	int (*get_next)(struct starpu_worker_collection *workers, struct starpu_sched_ctx_iterator *it);
+	unsigned (*has_next_unblocked_worker)(struct starpu_worker_collection *workers, struct starpu_sched_ctx_iterator *it);
+	int (*get_next_unblocked_worker)(struct starpu_worker_collection *workers, struct starpu_sched_ctx_iterator *it);
 	unsigned (*has_next_master)(struct starpu_worker_collection *workers, struct starpu_sched_ctx_iterator *it);
 	int (*get_next_master)(struct starpu_worker_collection *workers, struct starpu_sched_ctx_iterator *it);
 	int (*add)(struct starpu_worker_collection *workers, int worker);
@@ -113,7 +119,9 @@ struct starpu_tree* starpu_workers_get_tree(void);
 
 unsigned starpu_worker_get_sched_ctx_list(int worker, unsigned **sched_ctx);
 
-unsigned starpu_worker_is_slave(int workerid);
+unsigned starpu_worker_is_blocked(int workerid);
+
+unsigned starpu_worker_is_slave_somewhere(int workerid);
 
 char *starpu_worker_get_type_as_string(enum starpu_worker_archtype type);
 

+ 7 - 2
src/core/sched_ctx.c

@@ -1465,6 +1465,8 @@ struct starpu_worker_collection* starpu_sched_ctx_create_worker_collection(unsig
 	case STARPU_WORKER_TREE:
 		sched_ctx->workers->has_next = worker_tree.has_next;
 		sched_ctx->workers->get_next = worker_tree.get_next;
+		sched_ctx->workers->has_next_unblocked_worker = worker_tree.has_next_unblocked_worker;
+		sched_ctx->workers->get_next_unblocked_worker = worker_tree.get_next_unblocked_worker;
 		sched_ctx->workers->has_next_master = worker_tree.has_next_master;
 		sched_ctx->workers->get_next_master = worker_tree.get_next_master;
 		sched_ctx->workers->add = worker_tree.add;
@@ -1479,6 +1481,8 @@ struct starpu_worker_collection* starpu_sched_ctx_create_worker_collection(unsig
 	default:
 		sched_ctx->workers->has_next = worker_list.has_next;
 		sched_ctx->workers->get_next = worker_list.get_next;
+		sched_ctx->workers->has_next_unblocked_worker = worker_list.has_next_unblocked_worker;
+		sched_ctx->workers->get_next_unblocked_worker = worker_list.get_next_unblocked_worker;
 		sched_ctx->workers->has_next_master = worker_list.has_next_master;
 		sched_ctx->workers->get_next_master = worker_list.get_next_master;
 		sched_ctx->workers->add = worker_list.add;
@@ -2046,7 +2050,7 @@ static void _starpu_sched_ctx_get_workers_to_sleep(unsigned sched_ctx_id, int *w
 void _starpu_sched_ctx_signal_worker_blocked(unsigned sched_ctx_id, int workerid)
 {
 	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
-	worker->slave = 1;
+	worker->blocked = 1;
 	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
 	sched_ctx->sleeping[workerid] = 1;
 	int master = sched_ctx->master[workerid];
@@ -2063,7 +2067,7 @@ void _starpu_sched_ctx_signal_worker_woke_up(unsigned sched_ctx_id, int workerid
 	sched_ctx->sleeping[workerid] = 0;
 	sched_ctx->master[workerid] = -1;
 	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
-	worker->slave = 0;
+	worker->blocked = 0;
 
 	return;
 }
@@ -2125,6 +2129,7 @@ void starpu_sched_ctx_get_available_cpuids(unsigned sched_ctx_id, int **cpuids,
 	int workerid;
 
 	workers->init_iterator(workers, &it);
+
 	while(workers->has_next(workers, &it))
 	{
 		workerid = workers->get_next(workers, &it);

+ 1 - 1
src/core/sched_ctx.h

@@ -165,7 +165,7 @@ struct _starpu_sched_ctx
 	   the threads to sleep in order to replace them with other threads or leave
 	   them awake & use them in the parallel code*/
 	unsigned awake_workers;
-	
+
 	/* function called when initializing the scheduler */
 	void (*init_sched)();
 };

+ 9 - 8
src/core/workers.c

@@ -311,11 +311,6 @@ int starpu_worker_can_execute_task_impl(unsigned workerid, struct starpu_task *t
 {
 	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
 
-	/* if the task can't be parallel don't submit it to a ctx */
-	unsigned child_sched_ctx = starpu_sched_ctx_worker_is_master_for_child_ctx(workerid, sched_ctx->id);
-        if(child_sched_ctx != STARPU_NMAX_SCHED_CTXS)
-		if(!task->possibly_parallel) return 0;
-
 	/* if the worker is blocked in a parallel ctx don't submit tasks on it */
 	if(sched_ctx->parallel_sect[workerid]) return 0;
 
@@ -564,7 +559,8 @@ static void _starpu_worker_init(struct _starpu_worker *workerarg, struct _starpu
 	workerarg->reverse_phase[1] = 0;
 	workerarg->pop_ctx_priority = 1;
 	workerarg->sched_mutex_locked = 0;
-	workerarg->slave = 0;
+	workerarg->blocked = 0;
+	workerarg->is_slave_somewhere = 0;
 
 	/* cpu_set/hwloc_cpu_set initialized in topology.c */
 }
@@ -1581,9 +1577,14 @@ unsigned starpu_worker_get_count(void)
 	return config.topology.nworkers;
 }
 
-unsigned starpu_worker_is_slave(int workerid)
+unsigned starpu_worker_is_blocked(int workerid)
+{
+	return config.workers[workerid].blocked;
+}
+
+unsigned starpu_worker_is_slave_somewhere(int workerid)
 {
-	return config.workers[workerid].slave;
+	return config.workers[workerid].is_slave_somewhere;
 }
 
 int starpu_worker_get_count_by_type(enum starpu_worker_archtype type)

+ 4 - 1
src/core/workers.h

@@ -122,8 +122,11 @@ LIST_TYPE(_starpu_worker,
 	/* flag to know if sched_mutex is locked or not */
 	unsigned sched_mutex_locked;
 
+	/* bool to indicate if the worker is blocked in a ctx */
+	unsigned blocked;
+
 	/* bool to indicate if the worker is slave in a ctx */
-	unsigned slave;
+	unsigned is_slave_somewhere;
 
 #ifdef __GLIBC__
 	cpu_set_t cpu_set;

+ 2 - 2
src/drivers/driver_common/driver_common.c

@@ -349,8 +349,8 @@ struct starpu_task *_starpu_get_worker_task(struct _starpu_worker *worker, int w
 			if(sched_ctx && sched_ctx->id > 0 && sched_ctx->id < STARPU_NMAX_SCHED_CTXS)
 			{
 				STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->parallel_sect_mutex[workerid]);
-				if(!sched_ctx->sched_policy && sched_ctx->awake_workers) 
-					worker->slave = sched_ctx->main_master != workerid;
+				if(!sched_ctx->sched_policy)
+					worker->is_slave_somewhere = sched_ctx->main_master != workerid;
 
 				if(sched_ctx->parallel_sect[workerid])
 				{

+ 50 - 5
src/sched_policies/deque_modeling_policy_data_aware.c

@@ -476,8 +476,23 @@ static int _dm_push_task(struct starpu_task *task, unsigned prio, unsigned sched
 	struct starpu_sched_ctx_iterator it;
 
 	workers->init_iterator(workers, &it);
-	while(workers->has_next_master(workers, &it))
+	while(1)
 	{
+		if(task->possibly_parallel)
+		{
+			if(workers->has_next_master(workers, &it))
+				worker = workers->get_next_master(workers, &it);
+			else
+				break;	
+		}
+		else
+		{
+			if(workers->has_next_unblocked_worker(workers, &it))
+				worker = workers->get_next_unblocked_worker(workers, &it);
+			else
+				break;	
+		}
+
 		worker = workers->get_next_master(workers, &it);
 		struct _starpu_fifo_taskq *fifo  = dt->queue_array[worker];
 		unsigned memory_node = starpu_worker_get_memory_node(worker);
@@ -622,9 +637,23 @@ static void compute_all_performance_predictions(struct starpu_task *task,
 	struct starpu_sched_ctx_iterator it;
 
 	workers->init_iterator(workers, &it);
-	while(workers->has_next_master(workers, &it))
+	while(1)
 	{
-		worker = workers->get_next_master(workers, &it);
+		if(task->possibly_parallel)
+		{
+			if(workers->has_next_master(workers, &it))
+				worker = workers->get_next_master(workers, &it);
+			else
+				break;	
+		}
+		else
+		{
+			if(workers->has_next_unblocked_worker(workers, &it))
+				worker = workers->get_next_unblocked_worker(workers, &it);
+			else
+				break;	
+		}
+
 
 		struct _starpu_fifo_taskq *fifo = dt->queue_array[worker];
 		struct starpu_perfmodel_arch* perf_arch = starpu_worker_get_perf_archtype(worker, sched_ctx_id);
@@ -818,11 +847,27 @@ static double _dmda_push_task(struct starpu_task *task, unsigned prio, unsigned
 		struct starpu_sched_ctx_iterator it;
 
 		workers->init_iterator(workers, &it);
-		while(workers->has_next_master(workers, &it))
+		
+		while(1)
 		{
-			worker = workers->get_next_master(workers, &it);
+			if(task->possibly_parallel)
+			{
+				if(workers->has_next_master(workers, &it))
+					worker = workers->get_next_master(workers, &it);
+				else
+					break;	
+			}
+			else
+			{
+				if(workers->has_next_unblocked_worker(workers, &it))
+					worker = workers->get_next_unblocked_worker(workers, &it);
+				else
+					break;	
+			}
+
 			if (!starpu_worker_can_execute_task_impl(worker, task, &impl_mask))
 				continue;
+
 			for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
 			{
 				if (!(impl_mask & (1U << nimpl)))

+ 16 - 2
src/sched_policies/eager_central_policy.c

@@ -96,9 +96,23 @@ static int push_task_eager_policy(struct starpu_task *task)
 #endif
 
 	workers->init_iterator(workers, &it);
-	while(workers->has_next_master(workers, &it))
+	while(1)
 	{
-		worker = workers->get_next_master(workers, &it);
+		if(task->possibly_parallel)
+		{
+			if(workers->has_next_master(workers, &it))
+				worker = workers->get_next_master(workers, &it);
+			else
+				break;	
+		}
+		else
+		{
+			if(workers->has_next_unblocked_worker(workers, &it))
+				worker = workers->get_next_unblocked_worker(workers, &it);
+			else
+				break;	
+		}
+
 
 #ifdef STARPU_NON_BLOCKING_DRIVERS
 		if (!starpu_bitmap_get(data->waiters, worker))

+ 17 - 4
src/sched_policies/eager_central_priority_policy.c

@@ -144,9 +144,22 @@ static int _starpu_priority_push_task(struct starpu_task *task)
 #endif
 
 	workers->init_iterator(workers, &it);
-	while(workers->has_next(workers, &it))
+	while(1)
 	{
-		worker = workers->get_next(workers, &it);
+		if(task->possibly_parallel)
+		{
+			if(workers->has_next_master(workers, &it))
+				worker = workers->get_next_master(workers, &it);
+			else
+				break;	
+		}
+		else
+		{
+			if(workers->has_next_unblocked_worker(workers, &it))
+				worker = workers->get_next(workers, &it);
+			else
+				break;	
+		}
 
 #ifdef STARPU_NON_BLOCKING_DRIVERS
 		if (!starpu_bitmap_get(data->waiters, worker))
@@ -257,9 +270,9 @@ static struct starpu_task *_starpu_priority_pop_task(unsigned sched_ctx_id)
 		struct starpu_sched_ctx_iterator it;
 
 		workers->init_iterator(workers, &it);
-		while(workers->has_next(workers, &it))
+		while(workers->has_next_unblocked_worker(workers, &it))
 		{
-			worker = workers->get_next(workers, &it);
+			worker = workers->get_next_unblocked_worker(workers, &it);
 			if(worker != workerid)
 			{
 #ifdef STARPU_NON_BLOCKING_DRIVERS

+ 56 - 3
src/worker_collection/worker_list.c

@@ -42,6 +42,30 @@ static int list_get_next(struct starpu_worker_collection *workers, struct starpu
 	return ret;
 }
 
+static unsigned list_has_next_unblocked_worker(struct starpu_worker_collection *workers, struct starpu_sched_ctx_iterator *it)
+{
+	int nworkers = workers->nunblocked_workers;
+	STARPU_ASSERT(it != NULL);
+
+	unsigned ret = it->cursor < nworkers ;
+
+	if(!ret) it->cursor = 0;
+
+	return ret;
+}
+
+static int list_get_next_unblocked_worker(struct starpu_worker_collection *workers, struct starpu_sched_ctx_iterator *it)
+{
+	int *workerids = (int *)workers->unblocked_workers;
+	int nworkers = (int)workers->nunblocked_workers;
+
+	STARPU_ASSERT(it->cursor < nworkers);
+
+	int ret = workerids[it->cursor++];
+
+	return ret;
+}
+
 static unsigned list_has_next_master(struct starpu_worker_collection *workers, struct starpu_sched_ctx_iterator *it)
 {
 	int nworkers = workers->nmasters;
@@ -133,6 +157,9 @@ static int list_remove(struct starpu_worker_collection *workers, int worker)
 	int *workerids = (int *)workers->workerids;
 	unsigned nworkers = workers->nworkers;
 
+	int *unblocked_workers = (int *)workers->unblocked_workers;
+	unsigned nunblocked_workers = workers->nunblocked_workers;
+
 	int *masters = (int *)workers->masters;
 	unsigned nmasters = workers->nmasters;
 	
@@ -152,6 +179,21 @@ static int list_remove(struct starpu_worker_collection *workers, int worker)
 	if(found_worker != -1)
 		workers->nworkers--;
 
+	int found_unblocked = -1;
+	for(i = 0; i < nunblocked_workers; i++)
+	{
+		if(unblocked_workers[i] == worker)
+		{
+			unblocked_workers[i] = -1;
+			found_unblocked = worker;
+			break;
+		}
+	}
+
+	_rearange_workerids(unblocked_workers, nunblocked_workers);
+	if(found_unblocked != -1)
+		workers->nunblocked_workers--;
+
 	int found_master = -1;
 	for(i = 0; i < nmasters; i++)
 	{
@@ -182,12 +224,16 @@ static void _init_workers(int *workerids)
 static void list_init(struct starpu_worker_collection *workers)
 {
 	int *workerids = (int*)malloc(STARPU_NMAXWORKERS * sizeof(int));
+	int *unblocked_workers = (int*)malloc(STARPU_NMAXWORKERS * sizeof(int));
 	int *masters = (int*)malloc(STARPU_NMAXWORKERS * sizeof(int));
 	_init_workers(workerids);
+	_init_workers(unblocked_workers);
 	_init_workers(masters);
 
 	workers->workerids = (void*)workerids;
 	workers->nworkers = 0;
+	workers->unblocked_workers = (void*)unblocked_workers;
+	workers->nunblocked_workers = 0;
 	workers->masters = (void*)masters;
 	workers->nmasters = 0;
 
@@ -197,6 +243,7 @@ static void list_init(struct starpu_worker_collection *workers)
 static void list_deinit(struct starpu_worker_collection *workers)
 {
 	free(workers->workerids);
+	free(workers->unblocked_workers);
 	free(workers->masters);
 }
 
@@ -207,11 +254,15 @@ static void list_init_iterator(struct starpu_worker_collection *workers, struct
 	int *workerids = (int *)workers->workerids;
 	unsigned nworkers = workers->nworkers;
 	unsigned i;
-	int nm = 0;
+	int nm = 0, nub = 0;
 	for(i = 0;  i < nworkers; i++)
 	{
-		if(!starpu_worker_is_slave(workerids[i]))
-			((int*)workers->masters)[nm++] = workerids[i];
+		if(!starpu_worker_is_blocked(workerids[i]))
+		{
+			((int*)workers->unblocked_workers)[nub++] = workerids[i];
+			if(!starpu_worker_is_slave_somewhere(workerids[i]))
+				((int*)workers->masters)[nm++] = workerids[i];
+		}
 	}
 	workers->nmasters = nm;
 
@@ -221,6 +272,8 @@ struct starpu_worker_collection worker_list =
 {
 	.has_next = list_has_next,
 	.get_next = list_get_next,
+	.has_next_unblocked_worker = list_has_next_unblocked_worker,
+	.get_next_unblocked_worker = list_get_next_unblocked_worker,
 	.has_next_master = list_has_next_master,
 	.get_next_master = list_get_next_master,
 	.add = list_add,

+ 75 - 2
src/worker_collection/worker_tree.c

@@ -77,7 +77,75 @@ static int tree_get_next(struct starpu_worker_collection *workers, struct starpu
 	int w;
 	for(w = 0; w < nworkers; w++)
 	{
-		if(!it->visited[workerids[w]] && workers->present[workerids[w]])
+		if(!it->visited[workerids[w]] && workers->present[workerids[w]] )
+		{
+			ret = workerids[w];
+			it->visited[workerids[w]] = 1;
+			it->value = neighbour;
+		}
+	}
+	STARPU_ASSERT_MSG(ret != -1, "bind id not correct");
+
+	return ret;
+}
+
+static unsigned tree_has_next_unblocked_worker(struct starpu_worker_collection *workers, struct starpu_sched_ctx_iterator *it)
+{
+	STARPU_ASSERT(it != NULL);
+	if(workers->nworkers == 0)
+		return 0;
+
+	struct starpu_tree *tree = (struct starpu_tree*)workers->workerids;
+	struct starpu_tree *neighbour = starpu_tree_get_neighbour(tree, (struct starpu_tree*)it->value, it->visited, workers->present);
+
+	if(!neighbour)
+	{
+		starpu_tree_reset_visited(tree, it->visited);
+		it->value = NULL;
+		it->possible_value = NULL;
+		return 0;
+	}
+	int id = -1;
+	int workerids[STARPU_NMAXWORKERS];
+	int nworkers = _starpu_worker_get_workerids(neighbour->id, workerids);
+	int w;
+	for(w = 0; w < nworkers; w++)
+	{
+		if(!it->visited[workerids[w]] && workers->present[workerids[w]] && workers->is_unblocked[workerids[w]])
+		{
+			id = workerids[w];
+			it->possible_value = neighbour;
+		}
+	}
+
+	STARPU_ASSERT_MSG(id != -1, "bind id (%d) for workerid (%d) not correct", neighbour->id, id);
+
+	return 1;
+}
+
+static int tree_get_next_unblocked_worker(struct starpu_worker_collection *workers, struct starpu_sched_ctx_iterator *it)
+{
+	int ret = -1;
+
+	struct starpu_tree *tree = (struct starpu_tree *)workers->workerids;
+	struct starpu_tree *neighbour = NULL;
+	if(it->possible_value)
+	{
+		neighbour = it->possible_value;
+		it->possible_value = NULL;
+	}
+	else
+		neighbour = starpu_tree_get_neighbour(tree, (struct starpu_tree*)it->value, it->visited, workers->present);
+
+	STARPU_ASSERT_MSG(neighbour, "no element anymore");
+
+
+	int workerids[STARPU_NMAXWORKERS];
+	int nworkers = _starpu_worker_get_workerids(neighbour->id, workerids);
+	int w;
+	for(w = 0; w < nworkers; w++)
+	{
+		if(!it->visited[workerids[w]] && workers->present[workerids[w]] && workers->is_unblocked[workerids[w]])
 		{
 			ret = workerids[w];
 			it->visited[workerids[w]] = 1;
@@ -176,6 +244,7 @@ static int tree_remove(struct starpu_worker_collection *workers, int worker)
 	if(workers->present[worker])
 	{
 		workers->present[worker] = 0;
+		workers->is_unblocked[worker] = 0;
 		workers->is_master[worker] = 0;
 		workers->nworkers--;
 		return worker;
@@ -194,6 +263,7 @@ static void tree_init(struct starpu_worker_collection *workers)
 	for(i = 0; i < nworkers; i++)
 	{
 		workers->present[i] = 0;
+		workers->is_unblocked[i] = 0;
 		workers->is_master[i] = 0;
 	}
 
@@ -213,7 +283,8 @@ static void tree_init_iterator(struct starpu_worker_collection *workers, struct
 	int nworkers = starpu_worker_get_count();
 	for(i = 0; i < nworkers; i++)
 	{
-		workers->is_master[i] = (workers->present[i] && !starpu_worker_is_slave(i));
+		workers->is_unblocked[i] = (workers->present[i] && !starpu_worker_is_blocked(i));
+		workers->is_master[i] = (workers->present[i] && !starpu_worker_is_blocked(i) && !starpu_worker_is_slave_somewhere(i));
 		it->visited[i] = 0;
 	}
 }
@@ -222,6 +293,8 @@ struct starpu_worker_collection worker_tree =
 {
 	.has_next = tree_has_next,
 	.get_next = tree_get_next,
+	.has_next_unblocked_worker = tree_has_next_unblocked_worker,
+	.get_next_unblocked_worker = tree_get_next_unblocked_worker,
 	.has_next_master = tree_has_next_master,
 	.get_next_master = tree_get_next_master,
 	.add = tree_add,