Explorar o código

- fix a race condition between a pthread_cond wake-up and a resource destruction

Olivier Aumage hai 10 anos
pai
achega
029217b712
Modificáronse 3 ficheiros con 23 adicións e 1 borrado
  1. 15 1
      src/core/sched_ctx.c
  2. 2 0
      src/core/sched_ctx.h
  3. 6 0
      src/drivers/driver_common/driver_common.c

+ 15 - 1
src/core/sched_ctx.c

@@ -506,6 +506,8 @@ struct _starpu_sched_ctx* _starpu_create_sched_ctx(struct starpu_sched_policy *p
 
 		STARPU_PTHREAD_COND_INIT(&sched_ctx->parallel_sect_cond[w], NULL);
 		STARPU_PTHREAD_MUTEX_INIT(&sched_ctx->parallel_sect_mutex[w], NULL);
+		STARPU_PTHREAD_COND_INIT(&sched_ctx->parallel_sect_cond_busy[w], NULL);
+		sched_ctx->busy[w] = 0;
 
 		sched_ctx->master[w] = -1;
 		sched_ctx->parallel_sect[w] = 0;
@@ -801,6 +803,17 @@ void starpu_sched_ctx_set_perf_counters(unsigned sched_ctx_id, void* perf_counte
 static void _starpu_delete_sched_ctx(struct _starpu_sched_ctx *sched_ctx)
 {
 	STARPU_ASSERT(sched_ctx->id != STARPU_NMAX_SCHED_CTXS);
+	struct _starpu_machine_config *config = _starpu_get_machine_config();
+	int nworkers = config->topology.nworkers;
+	int w;
+	for(w = 0; w < nworkers; w++)
+	{
+		STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->parallel_sect_mutex[w]);
+		while (sched_ctx->busy[w]) {
+			STARPU_PTHREAD_COND_WAIT(&sched_ctx->parallel_sect_cond_busy[w], &sched_ctx->parallel_sect_mutex[w]);
+		}
+		STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->parallel_sect_mutex[w]);
+	}
 	if(sched_ctx->sched_policy)
 	{
 		_starpu_deinit_sched_policy(sched_ctx);
@@ -825,7 +838,6 @@ static void _starpu_delete_sched_ctx(struct _starpu_sched_ctx *sched_ctx)
 	hwloc_bitmap_free(sched_ctx->hwloc_workers_set);
 #endif //STARPU_HAVE_HWLOC
 
-	struct _starpu_machine_config *config = _starpu_get_machine_config();
 	STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx_manag);
 	config->topology.nsched_ctxs--;
 	STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx_manag);
@@ -2063,6 +2075,7 @@ static void _starpu_sched_ctx_wake_up_workers(unsigned sched_ctx_id, int master)
 			if((current_worker_id == -1 || workerid != current_worker_id) && sched_ctx->sleeping[workerid])
 			{
 				STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->parallel_sect_mutex[workerid]);
+				fprintf(stderr, "_starpu_sched_ctx_wake_up_workers: sched_ctx_id=%u, master=%d\n", sched_ctx_id, master);
 				STARPU_PTHREAD_COND_SIGNAL(&sched_ctx->parallel_sect_cond[workerid]);
 				STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->parallel_sect_mutex[workerid]);
 				sem_wait(&sched_ctx->wake_up_sem[master]);
@@ -2129,6 +2142,7 @@ static void _starpu_sched_ctx_wake_these_workers_up(unsigned sched_ctx_id, int *
 		if(current_worker_id == -1 || workerid != current_worker_id)
 		{
 			STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->parallel_sect_mutex[workerid]);
+			fprintf(stderr, "_starpu_sched_ctx_wake_these_workers_up: sched_ctx_id=%u, nworkers=%d\n", sched_ctx_id, nworkers);
 			STARPU_PTHREAD_COND_SIGNAL(&sched_ctx->parallel_sect_cond[workerid]);
 			STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->parallel_sect_mutex[workerid]);
 		}

+ 2 - 0
src/core/sched_ctx.h

@@ -134,6 +134,8 @@ struct _starpu_sched_ctx
 	/* conditions variables used when parallel sections are executed in contexts */
 	starpu_pthread_cond_t parallel_sect_cond[STARPU_NMAXWORKERS];
 	starpu_pthread_mutex_t parallel_sect_mutex[STARPU_NMAXWORKERS];
+	starpu_pthread_cond_t parallel_sect_cond_busy[STARPU_NMAXWORKERS];
+	int busy[STARPU_NMAXWORKERS];
 
 	/* boolean indicating that workers should block in order to allow
 	   parallel sections to be executed on their allocated resources */

+ 6 - 0
src/drivers/driver_common/driver_common.c

@@ -360,7 +360,10 @@ struct starpu_task *_starpu_get_worker_task(struct _starpu_worker *worker, int w
 					STARPU_PTHREAD_MUTEX_UNLOCK(&worker->sched_mutex);
 					needed = 0;
 					_starpu_sched_ctx_signal_worker_blocked(sched_ctx->id, workerid);
+					sched_ctx->busy[workerid] = 1;
 					STARPU_PTHREAD_COND_WAIT(&sched_ctx->parallel_sect_cond[workerid], &sched_ctx->parallel_sect_mutex[workerid]);
+					sched_ctx->busy[workerid] = 0;
+					STARPU_PTHREAD_COND_SIGNAL(&sched_ctx->parallel_sect_cond_busy[workerid]);
 					_starpu_sched_ctx_signal_worker_woke_up(sched_ctx->id, workerid);
 					sched_ctx->parallel_sect[workerid] = 0;
 					STARPU_PTHREAD_MUTEX_LOCK(&worker->sched_mutex);
@@ -380,7 +383,10 @@ struct starpu_task *_starpu_get_worker_task(struct _starpu_worker *worker, int w
 //				needed = 0;
 				STARPU_PTHREAD_MUTEX_UNLOCK(&worker->sched_mutex);
 				_starpu_sched_ctx_signal_worker_blocked(sched_ctx->id, workerid);
+				sched_ctx->busy[workerid] = 1;
 				STARPU_PTHREAD_COND_WAIT(&sched_ctx->parallel_sect_cond[workerid], &sched_ctx->parallel_sect_mutex[workerid]);
+				sched_ctx->busy[workerid] = 0;
+				STARPU_PTHREAD_COND_SIGNAL(&sched_ctx->parallel_sect_cond_busy[workerid]);
 				_starpu_sched_ctx_signal_worker_woke_up(sched_ctx->id, workerid);
 				sched_ctx->parallel_sect[workerid] = 0;
 				STARPU_PTHREAD_MUTEX_LOCK(&worker->sched_mutex);