Bladeren bron

notify both the added/removed workers and the workers already/still in the sched_ctx about sched_ctx changes

Olivier Aumage 8 jaren geleden
bovenliggende
commit
2f9c1318e3
2 gewijzigde bestanden met toevoegingen van 81 en 12 verwijderingen
  1. 2 2
      include/starpu_sched_ctx.h
  2. 79 10
      src/core/sched_ctx.c

+ 2 - 2
include/starpu_sched_ctx.h

@@ -43,9 +43,9 @@ unsigned starpu_sched_ctx_create_inside_interval(const char *policy_name, const
 
 void starpu_sched_ctx_register_close_callback(unsigned sched_ctx_id, void (*close_callback)(unsigned sched_ctx_id, void* args), void *args);
 
-void starpu_sched_ctx_add_workers(int *workerids_ctx, int nworkers_ctx, unsigned sched_ctx_id);
+void starpu_sched_ctx_add_workers(int *workerids_ctx, unsigned nworkers_ctx, unsigned sched_ctx_id);
 
-void starpu_sched_ctx_remove_workers(int *workerids_ctx, int nworkers_ctx, unsigned sched_ctx_id);
+void starpu_sched_ctx_remove_workers(int *workerids_ctx, unsigned nworkers_ctx, unsigned sched_ctx_id);
 
 void starpu_sched_ctx_display_workers(unsigned sched_ctx_id, FILE *f);
 

+ 79 - 10
src/core/sched_ctx.c

@@ -50,9 +50,14 @@ static void add_notified_workers(int *workers_to_add, int nworkers_to_add, unsig
 
 static void notify_workers_about_changing_ctx_pending(const unsigned nworkers, const int * const workerids)
 {
+	const int cur_workerid = _starpu_worker_get_id();
 	unsigned i;
 	for (i=0; i<nworkers; i++)
 	{
+		if (starpu_worker_is_combined_worker(workerids[i]))
+			continue;
+		if (workerids[i] == cur_workerid)
+			continue;
 		struct _starpu_worker *worker = _starpu_get_worker_struct(workerids[i]);
 		STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
 		_starpu_worker_enter_changing_ctx_op(worker);
@@ -62,9 +67,14 @@ static void notify_workers_about_changing_ctx_pending(const unsigned nworkers, c
 
 static void notify_workers_about_changing_ctx_done(const unsigned nworkers, const int * const workerids)
 {
+	const int cur_workerid = _starpu_worker_get_id();
 	unsigned i;
 	for (i=0; i<nworkers; i++)
 	{
+		if (starpu_worker_is_combined_worker(workerids[i]))
+			continue;
+		if (workerids[i] == cur_workerid)
+			continue;
 		struct _starpu_worker *worker = _starpu_get_worker_struct(workerids[i]);
 		STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
 		_starpu_worker_leave_changing_ctx_op(worker);
@@ -1333,32 +1343,93 @@ static void add_notified_workers(int *workerids, int nworkers, unsigned sched_ct
 	fetch_tasks_from_empty_ctx_list(sched_ctx);
 }
 
-void starpu_sched_ctx_add_workers(int *workers_to_add, int nworkers_to_add, unsigned sched_ctx_id)
+void starpu_sched_ctx_add_workers(int *workers_to_add, unsigned nworkers_to_add, unsigned sched_ctx_id)
 {
 	STARPU_ASSERT(workers_to_add != NULL && nworkers_to_add > 0);
 	_starpu_check_workers(workers_to_add, nworkers_to_add);
+	int *ctx_workerids = NULL;
+	_starpu_sched_ctx_lock_read(sched_ctx_id);
+	unsigned ctx_nworkers = starpu_sched_ctx_get_workers_list_raw(sched_ctx_id, &ctx_workerids);
+	_starpu_sched_ctx_unlock_read(sched_ctx_id);
+	int cumulated_workerids[ctx_nworkers + nworkers_to_add];
+	memcpy(cumulated_workerids, ctx_workerids, ctx_nworkers*sizeof(cumulated_workerids[0]));
+	unsigned cumulated_nworkers = ctx_nworkers;
+	{
+		unsigned i;
+		for (i=0; i<nworkers_to_add; i++)
+		{
+			unsigned workerid = workers_to_add[i];
+			unsigned duplicate = 0;
+			unsigned j;
+			for (j = 0; j < cumulated_nworkers; j++)
+			{
+				if (cumulated_workerids[j] == workerid)
+				{
+					duplicate = 1;
+					break;
+				}
+			}
+			if (!duplicate)
+			{
+				cumulated_workerids[cumulated_nworkers] = workerid;
+				cumulated_nworkers++;
+			}
+		}
+	}
 
+	/* all workers from the cumulated list must be notified, notifying the
+	 * workers_to_add list is not sufficient because the other workers of
+	 * the context might access the ctx worker list being changed. */
+	notify_workers_about_changing_ctx_pending(cumulated_nworkers, cumulated_workerids);
 	_starpu_sched_ctx_lock_write(sched_ctx_id);
-	notify_workers_about_changing_ctx_pending(nworkers_to_add, workers_to_add);
 	add_notified_workers(workers_to_add, nworkers_to_add, sched_ctx_id);
-	notify_workers_about_changing_ctx_done(nworkers_to_add, workers_to_add);
+	notify_workers_about_changing_ctx_done(cumulated_nworkers, cumulated_workerids);
 	_starpu_sched_ctx_unlock_write(sched_ctx_id);
 }
 
-void starpu_sched_ctx_remove_workers(int *workers_to_remove, int nworkers_to_remove, unsigned sched_ctx_id)
+void starpu_sched_ctx_remove_workers(int *workers_to_remove, unsigned nworkers_to_remove, unsigned sched_ctx_id)
 {
 	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
 
 	_starpu_check_workers(workers_to_remove, nworkers_to_remove);
+	int *ctx_workerids = NULL;
+	_starpu_sched_ctx_lock_read(sched_ctx_id);
+	unsigned ctx_nworkers = starpu_sched_ctx_get_workers_list_raw(sched_ctx_id, &ctx_workerids);
+	_starpu_sched_ctx_unlock_read(sched_ctx_id);
+	int cumulated_workerids[ctx_nworkers + nworkers_to_remove];
+	memcpy(cumulated_workerids, ctx_workerids, ctx_nworkers*sizeof(cumulated_workerids[0]));
+	unsigned cumulated_nworkers = ctx_nworkers;
+	{
+		unsigned i;
+		for (i=0; i<nworkers_to_remove; i++)
+		{
+			unsigned workerid = workers_to_remove[i];
+			unsigned duplicate = 0;
+			unsigned j;
+			for (j = 0; j < cumulated_nworkers; j++)
+			{
+				if (cumulated_workerids[j] == workerid)
+				{
+					duplicate = 1;
+					break;
+				}
+			}
+			if (!duplicate)
+			{
+				cumulated_workerids[cumulated_nworkers] = workerid;
+				cumulated_nworkers++;
+			}
+		}
+	}
 
-	_starpu_sched_ctx_lock_write(sched_ctx_id);
 	/* if the context has not already been deleted */
 	if(sched_ctx->id != STARPU_NMAX_SCHED_CTXS)
 	{
 		int removed_workers[sched_ctx->workers->nworkers];
 		int n_removed_workers = 0;
 
-		notify_workers_about_changing_ctx_pending(nworkers_to_remove, workers_to_remove);
+		notify_workers_about_changing_ctx_pending(cumulated_nworkers, cumulated_workerids);
+		_starpu_sched_ctx_lock_write(sched_ctx_id);
 		_starpu_remove_workers_from_sched_ctx(sched_ctx, workers_to_remove, nworkers_to_remove, removed_workers, &n_removed_workers);
 
 		if(n_removed_workers > 0)
@@ -1366,11 +1437,9 @@ void starpu_sched_ctx_remove_workers(int *workers_to_remove, int nworkers_to_rem
 			_starpu_update_notified_workers_without_ctx(removed_workers, n_removed_workers, sched_ctx_id, 0);
 			set_priority_on_notified_workers(removed_workers, n_removed_workers, sched_ctx_id, 1);
 		}
-		notify_workers_about_changing_ctx_done(nworkers_to_remove, workers_to_remove);
+		notify_workers_about_changing_ctx_done(cumulated_nworkers, cumulated_workerids);
+		_starpu_sched_ctx_unlock_write(sched_ctx_id);
 	}
-	_starpu_sched_ctx_unlock_write(sched_ctx_id);
-
-	return;
 }
 
 int _starpu_nworkers_able_to_execute_task(struct starpu_task *task, struct _starpu_sched_ctx *sched_ctx)