Bläddra i källkod

Fix destruction of the after_work_barrier: we need to wait for all threads to have left the barrier before it can be destroyed, so use a simple, tiny busy-wait barrier for that

Samuel Thibault 9 år sedan
förälder
incheckning
0bc621df90
5 ändrade filer med 26 tillägg och 5 borttagningar
  1. 1 0
      src/core/jobs.c
  2. 2 1
      src/core/jobs.h
  3. 2 1
      src/core/parallel_task.c
  4. 5 3
      src/core/sched_policy.c
  5. 16 0
      src/drivers/cpu/driver_cpu.c

+ 1 - 0
src/core/jobs.c

@@ -107,6 +107,7 @@ void _starpu_job_destroy(struct _starpu_job *j)
 	{
 		STARPU_PTHREAD_BARRIER_DESTROY(&j->before_work_barrier);
 		STARPU_PTHREAD_BARRIER_DESTROY(&j->after_work_barrier);
+		STARPU_ASSERT(j->after_work_busy_barrier == 0);
 	}
 
 	_starpu_cg_list_deinit(&j->job_successors);

+ 2 - 1
src/core/jobs.h

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2009-2014  Université de Bordeaux
+ * Copyright (C) 2009-2015  Université de Bordeaux
  * Copyright (C) 2010, 2011, 2013, 2014, 2015  CNRS
  * Copyright (C) 2011  Télécom-SudParis
  * Copyright (C) 2014  INRIA
@@ -185,6 +185,7 @@ LIST_TYPE(_starpu_job,
 	/* Parallel workers may have to synchronize before/after the execution of a parallel task. */
 	starpu_pthread_barrier_t before_work_barrier;
 	starpu_pthread_barrier_t after_work_barrier;
+	unsigned after_work_busy_barrier;
 )
 
 /* Create an internal struct _starpu_job *structure to encapsulate the task. */

+ 2 - 1
src/core/parallel_task.c

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2010, 2014  Université de Bordeaux
+ * Copyright (C) 2010, 2014-2015  Université de Bordeaux
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -45,6 +45,7 @@ void starpu_parallel_task_barrier_init_n(struct starpu_task* task, int worker_si
 
 	STARPU_PTHREAD_BARRIER_INIT(&j->before_work_barrier, NULL, worker_size);
 	STARPU_PTHREAD_BARRIER_INIT(&j->after_work_barrier, NULL, worker_size);
+	j->after_work_busy_barrier = worker_size;
 
 	return;
 }

+ 5 - 3
src/core/sched_policy.c

@@ -264,10 +264,8 @@ static int _starpu_push_task_on_specific_worker(struct starpu_task *task, int wo
 		int j;
 		for (j = 0; j < worker_size; j++)
 		{
-			struct starpu_task *alias = starpu_task_dup(task);
 			int subworkerid = combined_workerid[j];
-			alias->destroy = 1;
-			_starpu_push_task_on_specific_worker_notify_sched(alias, _starpu_get_worker_struct(subworkerid), subworkerid, workerid);
+			_starpu_push_task_on_specific_worker_notify_sched(task, _starpu_get_worker_struct(subworkerid), subworkerid, workerid);
 		}
 	}
 
@@ -326,6 +324,7 @@ static int _starpu_push_task_on_specific_worker(struct starpu_task *task, int wo
 
 		STARPU_PTHREAD_BARRIER_INIT(&job->before_work_barrier, NULL, worker_size);
 		STARPU_PTHREAD_BARRIER_INIT(&job->after_work_barrier, NULL, worker_size);
+		job->after_work_busy_barrier = worker_size;
 
 		/* Note: we have to call that early, or else the task may have
 		 * disappeared already */
@@ -335,6 +334,7 @@ static int _starpu_push_task_on_specific_worker(struct starpu_task *task, int wo
 		for (j = 0; j < worker_size; j++)
 		{
 			struct starpu_task *alias = starpu_task_dup(task);
+			alias->destroy = 1;
 
 			worker = _starpu_get_worker_struct(combined_workerid[j]);
 			ret |= _starpu_push_local_task(worker, alias, 0);
@@ -502,6 +502,7 @@ int _starpu_push_task_to_workers(struct starpu_task *task)
 
 				STARPU_PTHREAD_BARRIER_INIT(&job->before_work_barrier, NULL, workers->nworkers);
 				STARPU_PTHREAD_BARRIER_INIT(&job->after_work_barrier, NULL, workers->nworkers);
+				job->after_work_busy_barrier = workers->nworkers;
 				
 				/* Note: we have to call that early, or else the task may have
 				 * disappeared already */
@@ -516,6 +517,7 @@ int _starpu_push_task_to_workers(struct starpu_task *task)
 				{
 					workerid = workers->get_next(workers, &it);
 					struct starpu_task *alias = starpu_task_dup(task);
+					alias->destroy = 1;
 					ret |= _starpu_push_task_on_specific_worker(alias, workerid);
 				}
 			}

+ 16 - 0
src/drivers/cpu/driver_cpu.c

@@ -118,7 +118,23 @@ static int execute_job_on_cpu(struct _starpu_job *j, struct starpu_task *worker_
 	_starpu_driver_end_job(cpu_args, j, perf_arch, &codelet_end, rank, profiling);
 
 	if (is_parallel_task)
+	{
 		STARPU_PTHREAD_BARRIER_WAIT(&j->after_work_barrier);
+		(void) STARPU_ATOMIC_ADD(&j->after_work_busy_barrier, -1);
+		ANNOTATE_HAPPENS_BEFORE(&j->after_work_busy_barrier);
+		if (rank == 0)
+		{
+			/* Wait with a busy barrier for other workers to have
+			 * finished with the blocking barrier before we can
+			 * safely drop the job structure */
+			while (j->after_work_busy_barrier > 0)
+			{
+				STARPU_UYIELD();
+				STARPU_SYNCHRONIZE();
+			}
+			ANNOTATE_HAPPENS_AFTER(&j->after_work_busy_barrier);
+		}
+	}
 
 	if (rank == 0)
 	{