Browse Source

Add starpu_worker_can_execute_task_impl and starpu_worker_can_execute_task_first_impl to optimize getting the working implementations

Samuel Thibault 10 years ago
parent
commit
3a33757343

+ 20 - 0
doc/doxygen/chapters/api/scheduling_policy.doxy

@@ -126,6 +126,26 @@ Check if the worker specified by workerid can execute the codelet.
 Schedulers need to call it before assigning a task to a worker,
 otherwise the task may fail to execute.
 
+\fn int starpu_worker_can_execute_task_impl(unsigned workerid, struct starpu_task *task, unsigned *impl_mask)
+\ingroup API_Scheduling_Policy
+Check if the worker specified by workerid can execute the codelet and returns
+which implementation numbers can be used.
+Schedulers need to call it before assigning a task to a worker,
+otherwise the task may fail to execute.
+This should be preferred rather than calling starpu_worker_can_execute_task for
+each and every implementation. It can also be used with impl_mask == NULL to
+check for at least one implementation without determining which.
+
+\fn int starpu_worker_can_execute_task_first_impl(unsigned workerid, struct starpu_task *task, unsigned *nimpl)
+\ingroup API_Scheduling_Policy
+Check if the worker specified by workerid can execute the codelet and returns
+the first implementation which can be used.
+Schedulers need to call it before assigning a task to a worker,
+otherwise the task may fail to execute.
+This should be preferred rather than calling starpu_worker_can_execute_task for
+each and every implementation. It can also be used with impl_mask == NULL to
+check for at least one implementation without determining which.
+
 \fn uint32_t starpu_task_footprint(struct starpu_perfmodel *model, struct starpu_task *task, struct starpu_perfmodel_arch *arch, unsigned nimpl)
 \ingroup API_Scheduling_Policy
 Returns the footprint for a given task, taking into account user-provided

+ 2 - 0
include/starpu_scheduler.h

@@ -56,6 +56,8 @@ void starpu_worker_get_sched_condition(int workerid, starpu_pthread_mutex_t **sc
 int starpu_wakeup_worker(int workerid, starpu_pthread_cond_t *cond, starpu_pthread_mutex_t *mutex);
 
 int starpu_worker_can_execute_task(unsigned workerid, struct starpu_task *task, unsigned nimpl);
+int starpu_worker_can_execute_task_impl(unsigned workerid, struct starpu_task *task, unsigned *impl_mask);
+int starpu_worker_can_execute_task_first_impl(unsigned workerid, struct starpu_task *task, unsigned *nimpl);
 
 int starpu_push_local_task(int workerid, struct starpu_task *task, int back);
 

+ 1 - 1
src/core/sched_ctx.c

@@ -882,7 +882,7 @@ int _starpu_nworkers_able_to_execute_task(struct starpu_task *task, struct _star
 	{
 		worker = workers->get_next(workers, &it);
 		STARPU_ASSERT_MSG(worker < STARPU_NMAXWORKERS, "worker id %d", worker);
-		if (starpu_worker_can_execute_task(worker, task, 0))
+		if (starpu_worker_can_execute_task_first_impl(worker, task, NULL))
 			nworkers++;
 	}
 	STARPU_PTHREAD_RWLOCK_UNLOCK(&changing_ctx_mutex[sched_ctx->id]);

+ 1 - 1
src/core/sched_policy.c

@@ -821,7 +821,7 @@ pick:
 	 * Here, we do not care about what implementation is used.
 	 */
 	worker_id = starpu_worker_get_id();
-	if (!starpu_worker_can_execute_task(worker_id, task, 0))
+	if (!starpu_worker_can_execute_task_first_impl(worker_id, task, NULL))
 		return task;
 
 	node = starpu_worker_get_memory_node(worker_id);

+ 69 - 1
src/core/workers.c

@@ -224,7 +224,7 @@ uint32_t _starpu_can_submit_scc_task(void)
 	return (STARPU_SCC & config.worker_mask);
 }
 
-static int _starpu_can_use_nth_implementation(enum starpu_worker_archtype arch, struct starpu_codelet *cl, unsigned nimpl)
+static inline int _starpu_can_use_nth_implementation(enum starpu_worker_archtype arch, struct starpu_codelet *cl, unsigned nimpl)
 {
 	switch(arch)
 	{
@@ -293,6 +293,74 @@ int starpu_worker_can_execute_task(unsigned workerid, struct starpu_task *task,
 		(!task->cl->can_execute || task->cl->can_execute(workerid, task, nimpl));
 }
 
+int starpu_worker_can_execute_task_impl(unsigned workerid, struct starpu_task *task, unsigned *impl_mask)
+{
+	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
+	unsigned mask;
+	int i;
+	enum starpu_worker_archtype arch;
+	struct starpu_codelet *cl;
+	if(sched_ctx->parallel_sect[workerid]) return 0;
+	/* TODO: check that the task operand sizes will fit on that device */
+	cl = task->cl;
+	if (!(cl->where & config.workers[workerid].worker_mask)) return 0;
+
+	mask = 0;
+	arch = config.workers[workerid].arch;
+	if (!task->cl->can_execute)
+	{
+		for (i = 0; i < STARPU_MAXIMPLEMENTATIONS; i++)
+			if (_starpu_can_use_nth_implementation(arch, cl, i)) {
+				mask |= 1U << i;
+				if (!impl_mask)
+					break;
+			}
+	} else {
+		for (i = 0; i < STARPU_MAXIMPLEMENTATIONS; i++)
+			if (_starpu_can_use_nth_implementation(arch, cl, i)
+			 && (!task->cl->can_execute || task->cl->can_execute(workerid, task, i))) {
+				mask |= 1U << i;
+				if (!impl_mask)
+					break;
+			}
+	}
+	if (impl_mask)
+		*impl_mask = mask;
+	return mask != 0;
+}
+
+int starpu_worker_can_execute_task_first_impl(unsigned workerid, struct starpu_task *task, unsigned *nimpl)
+{
+	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
+	int i;
+	enum starpu_worker_archtype arch;
+	struct starpu_codelet *cl;
+	if(sched_ctx->parallel_sect[workerid]) return 0;
+	/* TODO: check that the task operand sizes will fit on that device */
+	cl = task->cl;
+	if (!(cl->where & config.workers[workerid].worker_mask)) return 0;
+
+	arch = config.workers[workerid].arch;
+	if (!task->cl->can_execute)
+	{
+		for (i = 0; i < STARPU_MAXIMPLEMENTATIONS; i++)
+			if (_starpu_can_use_nth_implementation(arch, cl, i)) {
+				if (nimpl)
+					*nimpl = i;
+				return 1;
+			}
+	} else {
+		for (i = 0; i < STARPU_MAXIMPLEMENTATIONS; i++)
+			if (_starpu_can_use_nth_implementation(arch, cl, i)
+			 && (!task->cl->can_execute || task->cl->can_execute(workerid, task, i))) {
+				if (nimpl)
+					*nimpl = i;
+				return 1;
+			}
+	}
+	return 0;
+}
+
 
 
 int starpu_combined_worker_can_execute_task(unsigned workerid, struct starpu_task *task, unsigned nimpl)

+ 16 - 5
src/sched_policies/deque_modeling_policy_data_aware.c

@@ -406,6 +406,7 @@ static int _dm_push_task(struct starpu_task *task, unsigned prio, unsigned sched
 
 	unsigned best_impl = 0;
 	unsigned nimpl;
+	unsigned impl_mask;
 	struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
 
 	struct starpu_sched_ctx_iterator it;
@@ -421,9 +422,12 @@ static int _dm_push_task(struct starpu_task *task, unsigned prio, unsigned sched
 		/* Sometimes workers didn't take the tasks as early as we expected */
 		double exp_start = STARPU_MAX(fifo->exp_start, starpu_timing_now());
 
+		if (!starpu_worker_can_execute_task_impl(worker, task, &impl_mask))
+			continue;
+
 		for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
 		{
-			if (!starpu_worker_can_execute_task(worker, task, nimpl))
+			if (!(impl_mask & (1U << nimpl)))
 			{
 				/* no one on that queue may execute this task */
 				//			worker_ctx++;
@@ -540,6 +544,7 @@ static void compute_all_performance_predictions(struct starpu_task *task,
 	unsigned worker, worker_ctx = 0;
 
 	unsigned nimpl;
+	unsigned impl_mask;
 
 	starpu_task_bundle_t bundle = task->bundle;
 	struct _starpu_dmda_data *dt = (struct _starpu_dmda_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
@@ -559,9 +564,12 @@ static void compute_all_performance_predictions(struct starpu_task *task,
 		/* Sometimes workers didn't take the tasks as early as we expected */
 		double exp_start = STARPU_MAX(fifo->exp_start, starpu_timing_now());
 
-		for(nimpl  = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
-	 	{
-			if (!starpu_worker_can_execute_task(worker, task, nimpl))
+		if (!starpu_worker_can_execute_task_impl(worker, task, &impl_mask))
+			continue;
+
+		for (nimpl  = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
+		{
+			if (!(impl_mask & (1U << nimpl)))
 			{
 				/* no one on that queue may execute this task */
 				continue;
@@ -711,6 +719,7 @@ static int _dmda_push_task(struct starpu_task *task, unsigned prio, unsigned sch
 	double best_fitness = -1;
 
 	unsigned nimpl;
+	unsigned impl_mask;
 	if (forced_best == -1)
 	{
 		struct starpu_sched_ctx_iterator it;
@@ -719,9 +728,11 @@ static int _dmda_push_task(struct starpu_task *task, unsigned prio, unsigned sch
 		while(workers->has_next_master(workers, &it))
 		{
 			worker = workers->get_next_master(workers, &it);
+			if (!starpu_worker_can_execute_task_impl(worker, task, &impl_mask))
+				continue;
 			for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
 			{
-				if (!starpu_worker_can_execute_task(worker, task, nimpl))
+				if (!(impl_mask & (1U << nimpl)))
 				{
 					/* no one on that queue may execute this task */
 					continue;

+ 9 - 11
src/sched_policies/deque_queues.c

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2010-2011  Université de Bordeaux
+ * Copyright (C) 2010-2011, 2014  Université de Bordeaux
  * Copyright (C) 2010, 2011, 2013  Centre National de la Recherche Scientifique
  * Copyright (C) 2011  Télécom-SudParis
  *
@@ -74,14 +74,13 @@ struct starpu_task *_starpu_deque_pop_task(struct _starpu_deque_jobq *deque_queu
 		unsigned nimpl;
 		STARPU_ASSERT(j);
 
-		for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
-			if (starpu_worker_can_execute_task(workerid, j->task, nimpl))
-			{
-				j->nimpl = nimpl;
-				j = _starpu_job_list_pop_front(deque_queue->jobq);
-				_STARPU_TRACE_JOB_POP(j, 0);
-				return j->task;
-			}
+		if (starpu_worker_can_execute_task_first_impl(workerid, j->task, &nimpl))
+		{
+			j->nimpl = nimpl;
+			j = _starpu_job_list_pop_front(deque_queue->jobq);
+			_STARPU_TRACE_JOB_POP(j, 0);
+			return j->task;
+		}
 	}
 
 	return NULL;
@@ -117,8 +116,7 @@ struct _starpu_job_list *_starpu_deque_pop_every_task(struct _starpu_deque_jobq
 			unsigned nimpl;
 			next_job = _starpu_job_list_next(i);
 
-			for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
-			if (starpu_worker_can_execute_task(workerid, i->task, nimpl))
+			if (starpu_worker_can_execute_task_first_impl(workerid, i->task, &nimpl))
 			{
 				/* this elements can be moved into the new list */
 				new_list_size++;

+ 8 - 10
src/sched_policies/eager_central_policy.c

@@ -106,19 +106,17 @@ static int push_task_eager_policy(struct starpu_task *task)
 			continue;
 #endif
 
-		unsigned nimpl;
-		for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
-			if (starpu_worker_can_execute_task(worker, task, nimpl))
-			{
-				/* It can execute this one, tell him! */
+		if (starpu_worker_can_execute_task_first_impl(worker, task, NULL))
+		{
+			/* It can execute this one, tell him! */
 #ifdef STARPU_NON_BLOCKING_DRIVERS
-				starpu_bitmap_unset(data->waiters, worker);
-				/* We really woke at least somebody, no need to wake somebody else */
-				break;
+			starpu_bitmap_unset(data->waiters, worker);
+			/* We really woke at least somebody, no need to wake somebody else */
+			break;
 #else
-				dowake[worker] = 1;
+			dowake[worker] = 1;
 #endif
-			}
+		}
 	}
 	/* Let the task free */
 	STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);

+ 18 - 23
src/sched_policies/eager_central_priority_policy.c

@@ -154,19 +154,17 @@ static int _starpu_priority_push_task(struct starpu_task *task)
 			continue;
 #endif
 
-		unsigned nimpl;
-		for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
-			if (starpu_worker_can_execute_task(worker, task, nimpl))
-			{
-				/* It can execute this one, tell him! */
+		if (starpu_worker_can_execute_task_first_impl(worker, task, NULL))
+		{
+			/* It can execute this one, tell him! */
 #ifdef STARPU_NON_BLOCKING_DRIVERS
-				starpu_bitmap_unset(data->waiters, worker);
-				/* We really woke at least somebody, no need to wake somebody else */
-				break;
+			starpu_bitmap_unset(data->waiters, worker);
+			/* We really woke at least somebody, no need to wake somebody else */
+			break;
 #else
-				dowake[worker] = 1;
+			dowake[worker] = 1;
 #endif
-			}
+		}
 	}
 	/* Let the task free */
 	STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
@@ -237,20 +235,17 @@ static struct starpu_task *_starpu_priority_pop_task(unsigned sched_ctx_id)
 			{
 				unsigned nimpl;
 				nexttask = starpu_task_list_next(task);
-				for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
+				if (starpu_worker_can_execute_task_first_impl(workerid, task, &nimpl))
 				{
-					if (starpu_worker_can_execute_task(workerid, task, nimpl))
-					{
-						/* there is some task that we can grab */
-						starpu_task_set_implementation(task, nimpl);
-						starpu_task_list_erase(&taskq->taskq[priolevel], task);
-						chosen_task = task;
-						taskq->ntasks[priolevel]--;
-						taskq->total_ntasks--;
-						_STARPU_TRACE_JOB_POP(task, 0);
-						break;
-					} else skipped = 1;
-				}
+					/* there is some task that we can grab */
+					starpu_task_set_implementation(task, nimpl);
+					starpu_task_list_erase(&taskq->taskq[priolevel], task);
+					chosen_task = task;
+					taskq->ntasks[priolevel]--;
+					taskq->total_ntasks--;
+					_STARPU_TRACE_JOB_POP(task, 0);
+					break;
+				} else skipped = 1;
 			}
 		}
 	}

+ 10 - 13
src/sched_policies/fifo_queues.c

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2010-2013  Université de Bordeaux
+ * Copyright (C) 2010-2014  Université de Bordeaux
  * Copyright (C) 2010, 2011, 2013  Centre National de la Recherche Scientifique
  * Copyright (C) 2011  Télécom-SudParis
  *
@@ -141,15 +141,14 @@ struct starpu_task *_starpu_fifo_pop_task(struct _starpu_fifo_taskq *fifo_queue,
 		unsigned nimpl;
 		STARPU_ASSERT(task);
 
-		for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
-			if (starpu_worker_can_execute_task(workerid, task, nimpl))
-			{
-				starpu_task_set_implementation(task, nimpl);
-				starpu_task_list_erase(&fifo_queue->taskq, task);
-				fifo_queue->ntasks--;
-				_STARPU_TRACE_JOB_POP(task, 0);
-				return task;
-			}
+		if (starpu_worker_can_execute_task_first_impl(workerid, task, &nimpl))
+		{
+			starpu_task_set_implementation(task, nimpl);
+			starpu_task_list_erase(&fifo_queue->taskq, task);
+			fifo_queue->ntasks--;
+			_STARPU_TRACE_JOB_POP(task, 0);
+			return task;
+		}
 	}
 
 	return NULL;
@@ -197,8 +196,7 @@ struct starpu_task *_starpu_fifo_pop_every_task(struct _starpu_fifo_taskq *fifo_
 			unsigned nimpl;
 			next_task = task->next;
 
-			for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
-			if (starpu_worker_can_execute_task(workerid, task, nimpl))
+			if (starpu_worker_can_execute_task_first_impl(workerid, task, &nimpl))
 			{
 				/* this elements can be moved into the new list */
 				new_list_size++;
@@ -220,7 +218,6 @@ struct starpu_task *_starpu_fifo_pop_every_task(struct _starpu_fifo_taskq *fifo_
 					task->next = NULL;
 				}
 				starpu_task_set_implementation(task, nimpl);
-				break;
 			}
 
 			task = next_task;

+ 8 - 11
src/sched_policies/random_policy.c

@@ -44,18 +44,15 @@ static int _random_push_task(struct starpu_task *task, unsigned prio)
 	while(workers->has_next(workers, &it))
 	{
                 worker = workers->get_next(workers, &it);
-		int impl = 0;
-		for(impl = 0; impl < STARPU_MAXIMPLEMENTATIONS; impl++)
+		unsigned impl;
+		if(starpu_worker_can_execute_task_first_impl(worker, task, &impl))
 		{
-			if(starpu_worker_can_execute_task(worker, task, impl))
-			{
-				struct starpu_perfmodel_arch* perf_arch = starpu_worker_get_perf_archtype(worker);
-				double speedup = starpu_worker_get_relative_speedup(perf_arch);
-				alpha_sum += speedup;
-				speedup_arr[size] = speedup;
-				worker_arr[size++] = worker;
-				break;
-			}
+			struct starpu_perfmodel_arch* perf_arch = starpu_worker_get_perf_archtype(worker);
+			double speedup = starpu_worker_get_relative_speedup(perf_arch);
+			alpha_sum += speedup;
+			speedup_arr[size] = speedup;
+			worker_arr[size++] = worker;
+			break;
 		}
 	}