Prechádzať zdrojové kódy

- Provide helpers for pthread_barrier_* functions
- Do destroy the barrier when destroying the job

Cédric Augonnet 14 rokov pred
rodič
commit
879bcb4752

+ 4 - 0
src/common/utils.h

@@ -70,4 +70,8 @@ void _starpu_drop_comments(FILE *f);
 #define PTHREAD_COND_BROADCAST(cond) { int p_ret = pthread_cond_broadcast(cond); if (STARPU_UNLIKELY(p_ret)) { fprintf(stderr, "pthread_cond_broadcast : %s\n", strerror(p_ret)); STARPU_ABORT();}}
 #define PTHREAD_COND_WAIT(cond, mutex) { int p_ret = pthread_cond_wait((cond), (mutex)); if (STARPU_UNLIKELY(p_ret)) { fprintf(stderr, "pthread_cond_wait : %s\n", strerror(p_ret)); STARPU_ABORT();}}
 
+#define PTHREAD_BARRIER_INIT(barrier, attr, count) { int p_ret = pthread_barrier_init((barrier), (attr), (count)); if (STARPU_UNLIKELY(p_ret)) { fprintf(stderr, "pthread_barrier_init : %s\n", strerror(p_ret)); STARPU_ABORT();}}
+#define PTHREAD_BARRIER_DESTROY(barrier) { int p_ret = pthread_barrier_destroy((barrier)); if (STARPU_UNLIKELY(p_ret)) { fprintf(stderr, "pthread_barrier_destroy : %s\n", strerror(p_ret)); STARPU_ABORT();}}
+#define PTHREAD_BARRIER_WAIT(barrier) { int p_ret = pthread_barrier_wait(barrier); if (STARPU_UNLIKELY(!((p_ret == 0) || (p_ret == PTHREAD_BARRIER_SERIAL_THREAD)))) { fprintf(stderr, "pthread_barrier_wait : %s\n", strerror(p_ret)); STARPU_ABORT();}}
+
 #endif // __COMMON_UTILS_H__

+ 6 - 0
src/core/jobs.c

@@ -100,6 +100,12 @@ void _starpu_job_destroy(starpu_job_t j)
 	PTHREAD_COND_DESTROY(&j->sync_cond);
 	PTHREAD_MUTEX_DESTROY(&j->sync_mutex);
 
+	if (j->task_size > 1)
+	{
+		PTHREAD_BARRIER_DESTROY(&j->before_work_barrier);
+		PTHREAD_BARRIER_DESTROY(&j->after_work_barrier);
+	}
+
 	_starpu_cg_list_deinit(&j->job_successors);
 
 	starpu_job_delete(j);

+ 2 - 2
src/core/sched_policy.c

@@ -247,8 +247,8 @@ static int _starpu_push_task_on_specific_worker(struct starpu_task *task, int wo
 		j->combined_workerid = workerid;
 		j->active_task_alias_count = 0;
 
-		pthread_barrier_init(&j->before_work_barrier, NULL, worker_size);
-		pthread_barrier_init(&j->after_work_barrier, NULL, worker_size);
+		PTHREAD_BARRIER_INIT(&j->before_work_barrier, NULL, worker_size);
+		PTHREAD_BARRIER_INIT(&j->after_work_barrier, NULL, worker_size);
 
 		for (i = 0; i < worker_size; i++)
 		{

+ 2 - 2
src/drivers/cpu/driver_cpu.c

@@ -52,7 +52,7 @@ static int execute_job_on_cpu(starpu_job_t j, struct starpu_worker_s *cpu_args,
 	}
 
 	if (is_parallel_task)
-		pthread_barrier_wait(&j->before_work_barrier);
+		PTHREAD_BARRIER_WAIT(&j->before_work_barrier);
 
 	STARPU_TRACE_START_CODELET_BODY(j);
 
@@ -92,7 +92,7 @@ static int execute_job_on_cpu(starpu_job_t j, struct starpu_worker_s *cpu_args,
 	}
 
 	if (is_parallel_task)
-		pthread_barrier_wait(&j->after_work_barrier);
+		PTHREAD_BARRIER_WAIT(&j->after_work_barrier);
 
 	if (rank == 0)
 	{

+ 2 - 2
src/sched_policies/parallel_heft.c

@@ -115,8 +115,8 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 		j->combined_workerid = best_workerid;
 		j->active_task_alias_count = 0;
 
-		pthread_barrier_init(&j->before_work_barrier, NULL, worker_size);
-		pthread_barrier_init(&j->after_work_barrier, NULL, worker_size);
+		PTHREAD_BARRIER_INIT(&j->before_work_barrier, NULL, worker_size);
+		PTHREAD_BARRIER_INIT(&j->after_work_barrier, NULL, worker_size);
 
 		PTHREAD_MUTEX_LOCK(&big_lock);