Browse Source

add discretionary locking in _starpu_sched_ctx_wake_up_workers

Olivier Aumage 8 years ago
parent
commit
95d6bd6ba7
1 changed files with 11 additions and 7 deletions
  1. 11 7
      src/core/sched_ctx.c

+ 11 - 7
src/core/sched_ctx.c

@@ -39,7 +39,7 @@ static unsigned _starpu_get_first_free_sched_ctx(struct _starpu_machine_config *
 
 
 static void _starpu_sched_ctx_put_new_master(unsigned sched_ctx_id);
 static void _starpu_sched_ctx_put_new_master(unsigned sched_ctx_id);
 static void _starpu_sched_ctx_put_workers_to_sleep(unsigned sched_ctx_id, unsigned all);
 static void _starpu_sched_ctx_put_workers_to_sleep(unsigned sched_ctx_id, unsigned all);
-static void _starpu_sched_ctx_wake_up_workers(unsigned sched_ctx_id, unsigned all);
+static void _starpu_sched_ctx_wake_up_workers(unsigned sched_ctx_id, unsigned all, const int workers_locked);
 static void _starpu_sched_ctx_update_parallel_workers_with(unsigned sched_ctx_id);
 static void _starpu_sched_ctx_update_parallel_workers_with(unsigned sched_ctx_id);
 static void _starpu_sched_ctx_update_parallel_workers_without(unsigned sched_ctx_id);
 static void _starpu_sched_ctx_update_parallel_workers_without(unsigned sched_ctx_id);
 
 
@@ -365,7 +365,9 @@ static void _starpu_add_workers_to_existing_sched_ctx(struct _starpu_sched_ctx *
 			worker->removed_from_ctx[sched_ctx->id] = 0;
 			worker->removed_from_ctx[sched_ctx->id] = 0;
 		}
 		}
 	}
 	}
+	lock_workers_for_changing_ctx(nworkers, workerids);
 	_do_starpu_add_workers_to_sched_ctx(sched_ctx, workerids, nworkers);
 	_do_starpu_add_workers_to_sched_ctx(sched_ctx, workerids, nworkers);
+	unlock_workers_for_changing_ctx(nworkers, workerids);
 	if(sched_ctx->sched_policy && sched_ctx->sched_policy->add_workers)
 	if(sched_ctx->sched_policy && sched_ctx->sched_policy->add_workers)
 	{
 	{
 		_STARPU_SCHED_BEGIN;
 		_STARPU_SCHED_BEGIN;
@@ -1145,7 +1147,7 @@ void starpu_sched_ctx_delete(unsigned sched_ctx_id)
 	if(!_starpu_wait_for_all_tasks_of_sched_ctx(sched_ctx_id))
 	if(!_starpu_wait_for_all_tasks_of_sched_ctx(sched_ctx_id))
 	{
 	{
 		if(!sched_ctx->sched_policy)
 		if(!sched_ctx->sched_policy)
-			_starpu_sched_ctx_wake_up_workers(sched_ctx_id, 0);
+			_starpu_sched_ctx_wake_up_workers(sched_ctx_id, 0, 1);
 		/*if btw the mutex release & the mutex lock the context has changed take care to free all
 		/*if btw the mutex release & the mutex lock the context has changed take care to free all
 		  scheduling data before deleting the context */
 		  scheduling data before deleting the context */
 
 
@@ -2444,7 +2446,7 @@ static void _starpu_sched_ctx_put_workers_to_sleep(unsigned sched_ctx_id, unsign
 		sched_ctx->main_master = -1;
 		sched_ctx->main_master = -1;
 }
 }
 
 
-static void _starpu_sched_ctx_wake_up_workers(unsigned sched_ctx_id, unsigned all)
+static void _starpu_sched_ctx_wake_up_workers(unsigned sched_ctx_id, unsigned all, const int workers_locked)
 {
 {
 	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
 	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
 	int current_worker_id = starpu_worker_get_id();
 	int current_worker_id = starpu_worker_get_id();
@@ -2470,9 +2472,11 @@ static void _starpu_sched_ctx_wake_up_workers(unsigned sched_ctx_id, unsigned al
 			if((current_worker_id == -1 || workerid != current_worker_id) && sched_ctx->sleeping[workerid])
 			if((current_worker_id == -1 || workerid != current_worker_id) && sched_ctx->sleeping[workerid])
 			{
 			{
 				struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
 				struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
-				STARPU_PTHREAD_MUTEX_LOCK(&worker->sched_mutex);
+				if (!workers_locked)
+					STARPU_PTHREAD_MUTEX_LOCK(&worker->sched_mutex);
 				STARPU_PTHREAD_COND_SIGNAL(&sched_ctx->parallel_sect_cond[workerid]);
 				STARPU_PTHREAD_COND_SIGNAL(&sched_ctx->parallel_sect_cond[workerid]);
-				STARPU_PTHREAD_MUTEX_UNLOCK(&worker->sched_mutex);
+				if (!workers_locked)
+					STARPU_PTHREAD_MUTEX_UNLOCK(&worker->sched_mutex);
 				sem_wait(&sched_ctx->wake_up_sem[master]);
 				sem_wait(&sched_ctx->wake_up_sem[master]);
 			}
 			}
 			else
 			else
@@ -2494,7 +2498,7 @@ void* starpu_sched_ctx_exec_parallel_code(void* (*func)(void*), void* param, uns
 	void* ret = func(param);
 	void* ret = func(param);
 
 
 	/* wake up starpu workers */
 	/* wake up starpu workers */
-	_starpu_sched_ctx_wake_up_workers(sched_ctx_id, 1);
+	_starpu_sched_ctx_wake_up_workers(sched_ctx_id, 1, 0);
 	return ret;
 	return ret;
 }
 }
 
 
@@ -2526,7 +2530,7 @@ static void _starpu_sched_ctx_update_parallel_workers_without(unsigned sched_ctx
 
 
 	if(!sched_ctx->awake_workers)
 	if(!sched_ctx->awake_workers)
 	{
 	{
-		_starpu_sched_ctx_wake_up_workers(sched_ctx_id, 0);
+		_starpu_sched_ctx_wake_up_workers(sched_ctx_id, 0, 0);
 	}
 	}
 }
 }