Browse Source

fix race condition between starpu_sched_ctx_delete and pop operations

Olivier Aumage 8 years ago
parent
commit
5efd860ef0

+ 49 - 1
src/core/sched_ctx.c

@@ -125,6 +125,26 @@ static void _starpu_update_workers_without_ctx(int *workerids, int nworkers, int
 	return;
 }
 
+static void _starpu_update_locked_workers_without_ctx(int *workerids, int nworkers, int sched_ctx_id, unsigned now)
+{
+	int i;
+	struct _starpu_worker *worker = NULL;
+
+	for(i = 0; i < nworkers; i++)
+	{
+		worker = _starpu_get_worker_struct(workerids[i]);
+		if(now)
+		{
+			_starpu_worker_gets_out_of_ctx(sched_ctx_id, worker);
+		}
+		else
+		{
+			worker->removed_from_ctx[sched_ctx_id] = 1;
+		}
+	}
+	return;
+}
+
 void starpu_sched_ctx_stop_task_submission()
 {
 	_starpu_exclude_task_from_dag(&stop_submission_task);
@@ -1110,6 +1130,8 @@ void starpu_sched_ctx_delete(unsigned sched_ctx_id)
 
 	int *workerids;
 	unsigned nworkers_ctx = starpu_sched_ctx_get_workers_list(sched_ctx->id, &workerids);
+	int backup_workerids[nworkers_ctx];
+	memcpy(backup_workerids, workerids, nworkers_ctx*sizeof(backup_workerids[0]));
 
 	/*if both of them have all the ressources is pointless*/
 	/*trying to transfer ressources from one ctx to the other*/
@@ -1130,9 +1152,35 @@ void starpu_sched_ctx_delete(unsigned sched_ctx_id)
 			_starpu_sched_ctx_wake_up_workers(sched_ctx_id, 0);
 		/*if btw the mutex release & the mutex lock the context has changed take care to free all
 		  scheduling data before deleting the context */
-		_starpu_update_workers_without_ctx(workerids, nworkers_ctx, sched_ctx_id, 1);
+
+		/* announce upcoming context changes, then wait for transient unlocked operations to
+		 * complete before altering the sched_ctx under sched_mutex protection */
+		unsigned i;
+		for (i=0; i<nworkers_ctx; i++)
+		{
+			struct _starpu_worker *worker = _starpu_get_worker_struct(backup_workerids[i]);
+			STARPU_PTHREAD_MUTEX_LOCK(&worker->sched_mutex);
+			if (worker->state_pop_pending)
+			{
+				worker->state_changing_ctx_waiting = 1;
+				do
+				{
+					STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
+				}
+				while (worker->state_pop_pending);
+				worker->state_changing_ctx_waiting = 0;
+			}
+		}
+
+		_starpu_update_locked_workers_without_ctx(workerids, nworkers_ctx, sched_ctx_id, 1);
 		_starpu_sched_ctx_free_scheduling_data(sched_ctx);
 		_starpu_delete_sched_ctx(sched_ctx);
+
+		for (i=0; i<nworkers_ctx; i++)
+		{
+			struct _starpu_worker *worker = _starpu_get_worker_struct(backup_workerids[i]);
+			STARPU_PTHREAD_MUTEX_UNLOCK(&worker->sched_mutex);
+		}
 	}
 
 	STARPU_PTHREAD_RWLOCK_UNLOCK(&changing_ctx_mutex[sched_ctx_id]);

+ 6 - 0
src/core/sched_policy.c

@@ -850,6 +850,7 @@ pick:
 			{
 				if (sched_ctx->sched_policy && sched_ctx->sched_policy->pop_task)
 				{
+					worker->state_pop_pending = 1;
 					/* Note: we do not push the scheduling state here, because
 					 * otherwise when a worker is idle, we'd keep
 					 * pushing/popping a scheduling state here, while what we
@@ -858,6 +859,11 @@ pick:
 					if (task)
 						_STARPU_TASK_BREAK_ON(task, pop);
 					_starpu_pop_task_end(task);
+					worker->state_pop_pending = 0;
+					if (worker->state_changing_ctx_waiting)
+						/* cond_broadcast is required over cond_signal since
+						 * the condition is share for multiple purpose */
+						STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
 				}
 			}
 

+ 6 - 4
src/core/workers.c

@@ -2047,7 +2047,7 @@ void starpu_worker_get_sched_condition(int workerid, starpu_pthread_mutex_t **sc
 	*sched_mutex = &_starpu_config.workers[workerid].sched_mutex;
 }
 
-int starpu_wakeup_worker_locked(int workerid, starpu_pthread_cond_t *cond, starpu_pthread_mutex_t *mutex STARPU_ATTRIBUTE_UNUSED)
+int starpu_wakeup_worker_locked(int workerid, starpu_pthread_cond_t *sched_cond, starpu_pthread_mutex_t *mutex STARPU_ATTRIBUTE_UNUSED)
 {
 #ifdef STARPU_SIMGRID
 	starpu_pthread_queue_broadcast(&_starpu_simgrid_task_queue[workerid]);
@@ -2055,17 +2055,19 @@ int starpu_wakeup_worker_locked(int workerid, starpu_pthread_cond_t *cond, starp
 	if (_starpu_config.workers[workerid].status == STATUS_SLEEPING)
 	{
 		_starpu_config.workers[workerid].status = STATUS_WAKING_UP;
-		STARPU_PTHREAD_COND_SIGNAL(cond);
+		/* cond_broadcast is required over cond_signal since
+		 * the condition is share for multiple purpose */
+		STARPU_PTHREAD_COND_BROADCAST(sched_cond);
 		return 1;
 	}
 	return 0;
 }
 
-int starpu_wakeup_worker(int workerid, starpu_pthread_cond_t *cond, starpu_pthread_mutex_t *mutex)
+int starpu_wakeup_worker(int workerid, starpu_pthread_cond_t *sched_cond, starpu_pthread_mutex_t *mutex)
 {
 	int success;
 	STARPU_PTHREAD_MUTEX_LOCK_SCHED(mutex);
-	success = starpu_wakeup_worker_locked(workerid, cond, mutex);
+	success = starpu_wakeup_worker_locked(workerid, sched_cond, mutex);
 	STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(mutex);
 	return success;
 }

+ 2 - 0
src/core/workers.h

@@ -82,6 +82,8 @@ LIST_TYPE(_starpu_worker,
 	unsigned memory_node; /* which memory node is the worker associated with ? */
 	starpu_pthread_cond_t sched_cond; /* condition variable used when the worker waits for tasks. */
         starpu_pthread_mutex_t sched_mutex; /* mutex protecting sched_cond */
+	int state_pop_pending:1; /* a task pop is ongoing even though sched_mutex may temporarily be unlocked */
+	int state_changing_ctx_waiting:1; /* a thread is waiting for transient operations such as pop to complete before acquiring sched_mutex and modifying the worker ctx*/
 	struct starpu_task_list local_tasks; /* this queue contains tasks that have been explicitely submitted to that queue */
 	struct starpu_task **local_ordered_tasks; /* this queue contains tasks that have been explicitely submitted to that queue with an explicit order */
 	unsigned local_ordered_tasks_size; /* this records the size of local_ordered_tasks */

+ 10 - 2
src/drivers/driver_common/driver_common.c

@@ -437,7 +437,11 @@ struct starpu_task *_starpu_get_worker_task(struct _starpu_worker *worker, int w
 		if (_starpu_worker_can_block(memnode, worker)
 			&& !_starpu_sched_ctx_last_worker_awake(worker))
 		{
-			STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
+			do
+			{
+				STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
+			}
+			while (worker->status == STATUS_SLEEPING);
 			STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
 		}
 		else
@@ -580,7 +584,11 @@ int _starpu_get_multi_worker_task(struct _starpu_worker *workers, struct starpu_
 		if (_starpu_worker_can_block(memnode, worker)
 				&& !_starpu_sched_ctx_last_worker_awake(worker))
 		{
-			STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
+			do
+			{
+				STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
+			}
+			while (worker->status == STATUS_SLEEPING);
 			STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
 		}
 		else