Przeglądaj źródła

Bug fixed in parallel_heft.

Task aliases are now used by _starpu_sched_pre_exec_hook and _starpu_sched_post_exec_hook instead
of the combined worker task.
Nicolas Collin 13 lat temu
rodzic
commit
415864e912

+ 5 - 1
src/core/jobs.c

@@ -203,7 +203,11 @@ void _starpu_handle_job_termination(struct _starpu_job *j)
 			_starpu_clock_gettime(&task->profiling_info->callback_end_time);
 	}
 
-	_starpu_sched_post_exec_hook(task);
+	/* If the job was executed on a combined worker there is no need for the
+	 * scheduler to process it : the task structure doesn't contain any valuable
+	 * data as it's not linked to an actual worker */
+	if (j->task_size == 1)
+		_starpu_sched_post_exec_hook(task);
 
 	_STARPU_TRACE_TASK_DONE(j);
 

+ 22 - 8
src/drivers/cpu/driver_cpu.c

@@ -26,9 +26,14 @@
 #include "driver_cpu.h"
 #include <core/sched_policy.h>
 
-static int execute_job_on_cpu(struct _starpu_job *j, struct _starpu_worker *cpu_args, int is_parallel_task, int rank, enum starpu_perf_archtype perf_arch)
+/* Actually launch the job on a cpu worker.
+ * Handle binding CPUs on cores.
+ * In the case of a combined worker WORKER_TASK != J->TASK */
+
+static int execute_job_on_cpu(struct _starpu_job *j, struct starpu_task *worker_task, struct _starpu_worker *cpu_args, int rank, enum starpu_perf_archtype perf_arch)
 {
 	int ret;
+	int is_parallel_task = (j->task_size > 1);
 	int profiling = starpu_profiling_status_get();
 	struct timespec codelet_start, codelet_end;
 
@@ -49,8 +54,14 @@ static int execute_job_on_cpu(struct _starpu_job *j, struct _starpu_worker *cpu_
 	}
 
 	if (is_parallel_task)
+	{
 		_STARPU_PTHREAD_BARRIER_WAIT(&j->before_work_barrier);
 
+		/* In the case of a combined worker, the scheduler needs to know
+		 * when each actual worker begins the execution */
+		_starpu_sched_pre_exec_hook(worker_task);
+	}
+
 	/* Give profiling variable */
 	_starpu_driver_start_job(cpu_args, j, &codelet_start, rank, profiling);
 
@@ -163,10 +174,6 @@ void *_starpu_cpu_worker(void *arg)
 		/* Get the rank in case it is a parallel task */
 		if (is_parallel_task)
 		{
-			/* We can release the fake task */
-			STARPU_ASSERT(task != j->task);
-			free(task);
-
 			_STARPU_PTHREAD_MUTEX_LOCK(&j->sync_mutex);
 			rank = j->active_task_alias_count++;
 			_STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
@@ -190,7 +197,7 @@ void *_starpu_cpu_worker(void *arg)
 		_starpu_set_current_task(j->task);
 		cpu_arg->current_task = j->task;
 
-                res = execute_job_on_cpu(j, cpu_arg, is_parallel_task, rank, perf_arch);
+                res = execute_job_on_cpu(j, task, cpu_arg, rank, perf_arch);
 
 		_starpu_set_current_task(NULL);
 		cpu_arg->current_task = NULL;
@@ -207,10 +214,17 @@ void *_starpu_cpu_worker(void *arg)
 			}
 		}
 
+		/* In the case of combined workers, we need to inform the
+		 * scheduler each worker's execution is over.
+		 * Then we free the workers' task alias */
+		if (is_parallel_task)
+		{
+			_starpu_sched_post_exec_hook(task);
+			free(task);
+		}
+
 		if (rank == 0)
 			_starpu_handle_job_termination(j);
-		else
-			_starpu_sched_post_exec_hook(j->task);
         }
 
 	_STARPU_TRACE_WORKER_DEINIT_START

+ 5 - 1
src/drivers/driver_common/driver_common.c

@@ -39,7 +39,11 @@ void _starpu_driver_start_job(struct _starpu_worker *args, struct _starpu_job *j
 	if (cl->model && cl->model->benchmarking)
 		calibrate_model = 1;
 
-	_starpu_sched_pre_exec_hook(task);
+	/* If the job is executed on a combined worker there is no need for the
+	 * scheduler to process it : it doesn't contain any valuable data
+	 * as it's not linked to an actual worker */
+	if (j->task_size == 1)
+		_starpu_sched_pre_exec_hook(task);
 
 	args->status = STATUS_EXECUTING;
 	task->status = STARPU_TASK_RUNNING;

+ 5 - 0
src/sched_policies/parallel_heft.c

@@ -129,6 +129,11 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 		j->task_size = worker_size;
 		j->combined_workerid = best_workerid;
 		j->active_task_alias_count = 0;
+
+		/* This task doesn't belong to an actual worker, it belongs
+		 * to a combined worker and thus the scheduler doesn't care
+		 * of its predicted values which are insignificant */
+		task->predicted = 0;
 		task->predicted_transfer = 0;
 
 		_STARPU_PTHREAD_BARRIER_INIT(&j->before_work_barrier, NULL, worker_size);