Parcourir la source

Fix the multiformat interface.

Cyril Roelandt il y a 13 ans
Parent
commit
842e4c9f44

+ 1 - 0
examples/basic_examples/multiformat.c

@@ -79,6 +79,7 @@ static struct starpu_codelet  cl = {
 	.opencl_funcs = {multiformat_scal_opencl_func, NULL},
 #endif
 	.nbuffers = 1,
+	.name = "codelet_real"
 };
 
 /*

+ 4 - 2
examples/basic_examples/multiformat_conversion_codelets.c

@@ -34,13 +34,15 @@ extern void cpu_to_cuda_cuda_func(void *buffers[], void *args);
 struct starpu_codelet cpu_to_cuda_cl = {
 	.where = STARPU_CUDA,
 	.cuda_funcs = {cpu_to_cuda_cuda_func, NULL},
-	.nbuffers = 1
+	.nbuffers = 1,
+	.name = "codelet_cpu_to_cuda"
 };
 
 struct starpu_codelet cuda_to_cpu_cl = {
 	.where = STARPU_CPU,
 	.cpu_funcs = {cuda_to_cpu, NULL},
-	.nbuffers = 1
+	.nbuffers = 1,
+	.name = "codelet_cude_to_cpu"
 };
 #endif
 

+ 2 - 0
include/starpu_task.h

@@ -185,6 +185,8 @@ struct starpu_task
 	struct starpu_task *prev;
 	struct starpu_task *next;
 
+	unsigned int mf_skip;
+
 	/* this is private to StarPU, do not modify. If the task is allocated
 	 * by hand (without starpu_task_create), this field should be set to
 	 * NULL. */

+ 20 - 37
src/core/sched_policy.c

@@ -256,7 +256,10 @@ static int _starpu_push_task_on_specific_worker(struct starpu_task *task, int wo
 					continue;
 
 				conversion_task = _starpu_create_conversion_task(handle, node);
-				_starpu_push_local_task(worker, conversion_task, 0);
+				conversion_task->mf_skip = 1;
+				conversion_task->execute_on_a_specific_worker = 1;
+				conversion_task->workerid = workerid;
+				_starpu_task_submit_conversion_task(conversion_task, workerid);
 				//_STARPU_DEBUG("Pushing a conversion task\n");
 			}
 
@@ -393,10 +396,8 @@ struct starpu_task *_starpu_create_conversion_task(starpu_data_handle_t handle,
 	return conversion_task;
 }
 
-
 struct starpu_task *_starpu_pop_task(struct _starpu_worker *worker)
 {
-	int i;
 	struct starpu_task *task;
 
 	/* We can't tell in advance which task will be picked up, so we measure
@@ -406,30 +407,10 @@ struct starpu_task *_starpu_pop_task(struct _starpu_worker *worker)
 	if (profiling)
 		_starpu_clock_gettime(&pop_start_time);
 
+pick:
 	/* perhaps there is some local task to be executed first */
 	task = _starpu_pop_local_task(worker);
-	if (task)
-		goto profiling;
-
-	/*
-	 * The first STARPU_NMAXBUFS elements of queued_tasks[i] are conversion
-	 * tasks for multiformat handles. The last element is the "real" task.
-	 */
-	int worker_id = starpu_worker_get_id();
-	static struct starpu_task *queued_tasks[STARPU_NMAXWORKERS][STARPU_NMAXBUFS+1] = { };
 
-	/* Maybe there is a queued task for this worker */
-pick_from_queued_tasks:
-	for (i = 0; i < STARPU_NMAXBUFS+1; i++)
-	{
-		if (queued_tasks[worker_id][i])
-		{
-			task = queued_tasks[worker_id][i];
-			queued_tasks[worker_id][i] = NULL;
-			goto profiling;
-		}
-	}
-	
 	if (!task && policy.pop_task)
 		task = policy.pop_task();
 
@@ -441,12 +422,13 @@ pick_from_queued_tasks:
 	if (!_starpu_task_uses_multiformat_handles(task))
 		goto profiling;
 
-	/*
-	 * This worker may not be able to execute this task. In this case, we
-	 * should return the task anyway. It will be pushed back almost immediatly.
-	 * This way, we avoid computing and executing the conversions tasks.
-	 * Here, we do not care about what implementation is used.
-	 */
+
+	/* This is either a conversion task, or a regular task for which the
+	 * conversion tasks have already been created and submitted */
+	if (task->mf_skip)
+		goto profiling;
+
+	int worker_id = starpu_worker_get_id();
 	if (!starpu_worker_can_execute_task(worker_id, task, 0))
 		return task;
 
@@ -456,6 +438,7 @@ pick_from_queued_tasks:
 	 * We do have a task that uses multiformat handles. Let's create the 
 	 * required conversion tasks.
 	 */
+	int i;
 	for (i = 0; i < task->cl->nbuffers; i++)
 	{
 		struct starpu_task *conversion_task;
@@ -466,7 +449,10 @@ pick_from_queued_tasks:
 			continue;
 
 		conversion_task = _starpu_create_conversion_task(handle, node);
-		queued_tasks[worker_id][i] = conversion_task;
+		conversion_task->mf_skip = 1;
+		conversion_task->execute_on_a_specific_worker = 1;
+		conversion_task->workerid = worker_id;
+		_starpu_task_submit_conversion_task(conversion_task, worker_id);
 	}
 
 	/*
@@ -475,13 +461,10 @@ pick_from_queued_tasks:
 	for (i = 0; i < task->cl->nbuffers; i++)
 		task->buffers[i].handle->mf_node = node;
 
-	queued_tasks[worker_id][STARPU_NMAXBUFS] = task;
-
-	/* We know there is at least one task in queued_tasks[worker_id]. */
-	goto pick_from_queued_tasks;
-	
+	task->mf_skip = 1;
+	starpu_task_list_push_front(&worker->local_tasks, task);
+	goto pick;
 
-	/* We finally got our task */
 profiling:
 	/* Note that we may get a NULL task in case the scheduler was unlocked
 	 * for some reason. */

+ 47 - 0
src/core/task.c

@@ -392,6 +392,53 @@ int _starpu_task_submit_nodeps(struct starpu_task *task)
 	return ret;
 }
 
+/*
+ * worker->sched_mutex must be locked when calling this function.
+ */
+int _starpu_task_submit_conversion_task(struct starpu_task *task,
+					unsigned int workerid)
+{
+	int ret;
+
+	STARPU_ASSERT(task->cl);
+	STARPU_ASSERT(task->execute_on_a_specific_worker);
+
+	/* We should factorize that */
+	if (task->cl->model)
+		_starpu_load_perfmodel(task->cl->model);
+
+	if (task->cl->power_model)
+		_starpu_load_perfmodel(task->cl->power_model);
+
+	struct _starpu_job *j = _starpu_get_job_associated_to_task(task);
+	_starpu_increment_nsubmitted_tasks();
+	_STARPU_PTHREAD_MUTEX_LOCK(&j->sync_mutex);
+	j->submitted = 1;
+	_starpu_increment_nready_tasks();
+
+	memcpy(j->ordered_buffers, j->task->buffers, task->cl->nbuffers*sizeof(struct starpu_buffer_descr));
+
+        _STARPU_LOG_IN();
+
+	task->status = STARPU_TASK_READY;
+	_starpu_profiling_set_task_push_start_time(task);
+
+	unsigned node = starpu_worker_get_memory_node(workerid);
+	if (starpu_get_prefetch_flag())
+		starpu_prefetch_task_input_on_node(task, node);
+
+	struct _starpu_worker *worker;
+	worker = _starpu_get_worker_struct(workerid);
+	starpu_task_list_push_front(&worker->local_tasks, task);
+
+	_starpu_profiling_set_task_push_end_time(task);
+
+        _STARPU_LOG_OUT();
+	_STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
+
+	return 0;
+}
+
 void starpu_display_codelet_stats(struct starpu_codelet *cl)
 {
 	unsigned worker;

+ 53 - 25
src/sched_policies/heft.c

@@ -212,32 +212,7 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 		starpu_prefetch_task_input_on_node(task, memory_node);
 	}
 
-	if (!_starpu_task_uses_multiformat_handles(task))
-		goto push_task;
 
-	/*
-	 * Our task uses multiformat handles, which may need to be converted.
-	 */
-	int i;
-	for (i = 0; i < task->cl->nbuffers; i++)
-	{
-		struct starpu_task *conversion_task;
-		starpu_data_handle_t handle;
-
-		handle = task->buffers[i].handle;
-		unsigned int node = starpu_worker_get_memory_node(best_workerid);
-		if (!_starpu_handle_needs_conversion_task(handle, node))
-			continue;
-
-		conversion_task = _starpu_create_conversion_task(handle, node);
-		starpu_push_local_task(best_workerid, conversion_task, prio);
-	}
-
-	unsigned node = starpu_worker_get_memory_node(best_workerid);
-	for (i = 0; i < task->cl->nbuffers; i++)
-		task->buffers[i].handle->mf_node = node;
-
-push_task:
 	//_STARPU_DEBUG("Heft : pushing local task\n");
 	return starpu_push_local_task(best_workerid, task, prio);
 }
@@ -355,6 +330,40 @@ static void compute_all_performance_predictions(struct starpu_task *task,
 	*max_exp_endp = max_exp_end;
 }
 
+static int push_conversion_tasks(struct starpu_task *task, unsigned int workerid)
+{
+	int i, ret;
+	unsigned int node = starpu_worker_get_memory_node(workerid);
+
+	_STARPU_PTHREAD_MUTEX_LOCK(&sched_mutex[workerid]);
+	for (i = 0; i < task->cl->nbuffers; i++)
+	{
+		struct starpu_task *conversion_task;
+		starpu_data_handle_t handle;
+
+		handle = task->buffers[i].handle;
+		if (!_starpu_handle_needs_conversion_task(handle, node))
+			continue;
+
+		conversion_task = _starpu_create_conversion_task(handle, node);
+		conversion_task->execute_on_a_specific_worker = 1;
+		conversion_task->workerid = workerid;
+		conversion_task->mf_skip = 1;
+		ret = _starpu_task_submit_conversion_task(conversion_task, workerid);
+		STARPU_ASSERT(ret == 0);
+	}
+
+	for (i = 0; i < task->cl->nbuffers; i++)
+		task->buffers[i].handle->mf_node = node;
+
+	task->execute_on_a_specific_worker = 1;
+	task->workerid = workerid;
+	task->mf_skip= 1;
+	_STARPU_PTHREAD_MUTEX_UNLOCK(&sched_mutex[workerid]);
+
+	return 0;
+}
+
 static int _heft_push_task(struct starpu_task *task, unsigned prio)
 {
 	unsigned worker, nimpl;
@@ -392,6 +401,16 @@ static int _heft_push_task(struct starpu_task *task, unsigned prio)
 	if (forced_worker != -1)
 	{
 		_starpu_get_job_associated_to_task(task)->nimpl = forced_impl;
+
+		if (_starpu_task_uses_multiformat_handles(task) && !task->mf_skip)
+		{
+			/*
+			 * Our task uses multiformat handles, which may need to be converted.
+			 */
+			push_conversion_tasks(task, forced_worker);
+			prio = 0;
+		}
+
 		return push_task_on_best_worker(task, forced_worker, 0.0, 0.0, prio);
 	}
 
@@ -472,6 +491,15 @@ static int _heft_push_task(struct starpu_task *task, unsigned prio)
 
 	_starpu_get_job_associated_to_task(task)->nimpl = selected_impl;
 
+	if (_starpu_task_uses_multiformat_handles(task) && !task->mf_skip)
+	{
+		/*
+		 * Our task uses multiformat handles, which may need to be converted.
+		 */
+		push_conversion_tasks(task, forced_worker);
+		prio = 0;
+	}
+
 	return push_task_on_best_worker(task, best, model_best, transfer_model_best, prio);
 }