Browse Source

add support to defer sched_ctx changes

Olivier Aumage 8 years ago
parent
commit
81afddffd1
4 changed files with 160 additions and 17 deletions
  1. 133 17
      src/core/sched_ctx.c
  2. 10 0
      src/core/sched_ctx.h
  3. 2 0
      src/core/workers.c
  4. 15 0
      src/core/workers.h

+ 133 - 17
src/core/sched_ctx.c

@@ -21,6 +21,12 @@
 #include <stdarg.h>
 #include <core/task.h>
 
+enum _starpu_ctx_change_op
+{
+	ctx_change_invalid = 0,
+	ctx_change_add = 1,
+	ctx_change_remove = 2
+};
 static starpu_pthread_mutex_t sched_ctx_manag = STARPU_PTHREAD_MUTEX_INITIALIZER;
 static starpu_pthread_mutex_t finished_submit_mutex = STARPU_PTHREAD_MUTEX_INITIALIZER;
 static struct starpu_task stop_submission_task = STARPU_TASK_INITIALIZER;
@@ -50,6 +56,7 @@ static void add_notified_workers(int *workers_to_add, int nworkers_to_add, unsig
 
 static void notify_workers_about_changing_ctx_pending(const unsigned nworkers, const int * const workerids)
 {
+	STARPU_ASSERT(!_starpu_worker_sched_op_pending());
 	const int cur_workerid = _starpu_worker_get_id();
 	unsigned i;
 	for (i=0; i<nworkers; i++)
@@ -67,6 +74,7 @@ static void notify_workers_about_changing_ctx_pending(const unsigned nworkers, c
 
 static void notify_workers_about_changing_ctx_done(const unsigned nworkers, const int * const workerids)
 {
+	STARPU_ASSERT(!_starpu_worker_sched_op_pending());
 	const int cur_workerid = _starpu_worker_get_id();
 	unsigned i;
 	for (i=0; i<nworkers; i++)
@@ -1342,6 +1350,38 @@ static void add_notified_workers(int *workerids, int nworkers, unsigned sched_ct
 	fetch_tasks_from_empty_ctx_list(sched_ctx);
 }
 
+static void _defer_ctx_change(int sched_ctx_id, enum _starpu_ctx_change_op op, int nworkers_to_notify, int *workerids_to_notify, int nworkers_to_change, int *workerids_to_change)
+{
+	STARPU_ASSERT(_starpu_worker_sched_op_pending());
+	if (nworkers_to_change == 0)
+		return;
+	int workerid = starpu_worker_get_id_check();
+	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
+	struct _starpu_ctx_change_list *l = &worker->ctx_change_list;
+	struct _starpu_ctx_change *chg = _starpu_ctx_change_new();
+	chg->sched_ctx_id = sched_ctx_id;
+	STARPU_ASSERT(op == ctx_change_add || op == ctx_change_remove);
+	chg->op = op;
+	STARPU_ASSERT(workerids_to_change != NULL);
+	chg->nworkers_to_change = nworkers_to_change;
+	chg->workerids_to_change = malloc(nworkers_to_change * sizeof(chg->workerids_to_change[0]));
+	memcpy(chg->workerids_to_change, workerids_to_change, nworkers_to_change * sizeof(chg->workerids_to_change[0]));
+	if (nworkers_to_notify != 0)
+	{
+		STARPU_ASSERT(workerids_to_notify != NULL);
+		chg->nworkers_to_notify = nworkers_to_notify;
+		chg->workerids_to_notify = malloc(nworkers_to_notify * sizeof(chg->workerids_to_notify[0]));
+		memcpy(chg->workerids_to_notify, workerids_to_notify, nworkers_to_notify * sizeof(chg->workerids_to_notify[0]));
+	}
+	else
+	{
+		STARPU_ASSERT(workerids_to_notify == NULL);
+		chg->nworkers_to_notify = 0;
+		chg->workerids_to_notify = 0;
+	}
+	_starpu_ctx_change_list_push_back(l, chg);
+}
+
 void starpu_sched_ctx_add_workers(int *workers_to_add, unsigned nworkers_to_add, unsigned sched_ctx_id)
 {
 	STARPU_ASSERT(workers_to_add != NULL && nworkers_to_add > 0);
@@ -1379,11 +1419,18 @@ void starpu_sched_ctx_add_workers(int *workers_to_add, unsigned nworkers_to_add,
 	/* all workers from the cumulated list must be notified, notifying the
 	 * workers_to_add list is not sufficient because the other workers of
 	 * the context might access the ctx worker list being changed. */
-	notify_workers_about_changing_ctx_pending(cumulated_nworkers, cumulated_workerids);
-	_starpu_sched_ctx_lock_write(sched_ctx_id);
-	add_notified_workers(workers_to_add, nworkers_to_add, sched_ctx_id);
-	notify_workers_about_changing_ctx_done(cumulated_nworkers, cumulated_workerids);
-	_starpu_sched_ctx_unlock_write(sched_ctx_id);
+	if (_starpu_worker_sched_op_pending())
+	{
+		_defer_ctx_change(sched_ctx_id, ctx_change_add, cumulated_nworkers, cumulated_workerids, nworkers_to_add, workers_to_add);
+	}
+	else
+	{
+		notify_workers_about_changing_ctx_pending(cumulated_nworkers, cumulated_workerids);
+		_starpu_sched_ctx_lock_write(sched_ctx_id);
+		add_notified_workers(workers_to_add, nworkers_to_add, sched_ctx_id);
+		notify_workers_about_changing_ctx_done(cumulated_nworkers, cumulated_workerids);
+		_starpu_sched_ctx_unlock_write(sched_ctx_id);
+	}
 }
 
 void starpu_sched_ctx_add_combined_workers(int *combined_workers_to_add, unsigned n_combined_workers_to_add, unsigned sched_ctx_id)
@@ -1393,6 +1440,21 @@ void starpu_sched_ctx_add_combined_workers(int *combined_workers_to_add, unsigne
 	_starpu_sched_ctx_unlock_write(sched_ctx_id);
 }
 
+static void remove_notified_workers(int *workerids, int nworkers, unsigned sched_ctx_id)
+{
+	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
+	int removed_workers[sched_ctx->workers->nworkers];
+	int n_removed_workers = 0;
+
+	_starpu_remove_workers_from_sched_ctx(sched_ctx, workerids, nworkers, removed_workers, &n_removed_workers);
+
+	if(n_removed_workers > 0)
+	{
+		_starpu_update_notified_workers_without_ctx(removed_workers, n_removed_workers, sched_ctx_id, 0);
+		set_priority_on_notified_workers(removed_workers, n_removed_workers, sched_ctx_id, 1);
+	}
+}
+
 void starpu_sched_ctx_remove_workers(int *workers_to_remove, unsigned nworkers_to_remove, unsigned sched_ctx_id)
 {
 	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
@@ -1431,20 +1493,18 @@ void starpu_sched_ctx_remove_workers(int *workers_to_remove, unsigned nworkers_t
 	/* if the context has not already been deleted */
 	if(sched_ctx->id != STARPU_NMAX_SCHED_CTXS)
 	{
-		int removed_workers[sched_ctx->workers->nworkers];
-		int n_removed_workers = 0;
-
-		notify_workers_about_changing_ctx_pending(cumulated_nworkers, cumulated_workerids);
-		_starpu_sched_ctx_lock_write(sched_ctx_id);
-		_starpu_remove_workers_from_sched_ctx(sched_ctx, workers_to_remove, nworkers_to_remove, removed_workers, &n_removed_workers);
-
-		if(n_removed_workers > 0)
+		if (_starpu_worker_sched_op_pending())
 		{
-			_starpu_update_notified_workers_without_ctx(removed_workers, n_removed_workers, sched_ctx_id, 0);
-			set_priority_on_notified_workers(removed_workers, n_removed_workers, sched_ctx_id, 1);
+			_defer_ctx_change(sched_ctx_id, ctx_change_remove, cumulated_nworkers, cumulated_workerids, nworkers_to_remove, workers_to_remove);
+		}
+		else
+		{
+			notify_workers_about_changing_ctx_pending(cumulated_nworkers, cumulated_workerids);
+			_starpu_sched_ctx_lock_write(sched_ctx_id);
+			remove_notified_workers(workers_to_remove, nworkers_to_remove, sched_ctx_id);
+			notify_workers_about_changing_ctx_done(cumulated_nworkers, cumulated_workerids);
+			_starpu_sched_ctx_unlock_write(sched_ctx_id);
 		}
-		notify_workers_about_changing_ctx_done(cumulated_nworkers, cumulated_workerids);
-		_starpu_sched_ctx_unlock_write(sched_ctx_id);
 	}
 }
 
@@ -2649,3 +2709,59 @@ void *starpu_sched_ctx_get_user_data(unsigned sched_ctx_id)
 	STARPU_ASSERT(sched_ctx != NULL);
 	return sched_ctx->user_data;
 }
+
+void _starpu_worker_apply_deferred_ctx_changes(void)
+{
+	int workerid = starpu_worker_get_id_check();
+	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
+	struct _starpu_ctx_change_list *l = &worker->ctx_change_list;
+	STARPU_ASSERT(!_starpu_worker_sched_op_pending());
+	while (!_starpu_ctx_change_list_empty(l))
+	{
+		struct _starpu_ctx_change *chg = _starpu_ctx_change_list_pop_front(l);
+		STARPU_ASSERT(chg->workerids_to_change != NULL);
+		_starpu_sched_ctx_lock_write(chg->sched_ctx_id);
+
+		if (chg->nworkers_to_notify)
+		{
+			STARPU_ASSERT(chg->workerids_to_notify != NULL);
+			notify_workers_about_changing_ctx_pending(chg->nworkers_to_notify, chg->workerids_to_notify);
+		}
+		else
+		{
+			STARPU_ASSERT(chg->workerids_to_notify == NULL);
+			notify_workers_about_changing_ctx_pending(chg->nworkers_to_change, chg->workerids_to_change);
+		}
+		switch (chg->op)
+		{
+			case ctx_change_add:
+			{
+				add_notified_workers(chg->workerids_to_change, chg->nworkers_to_change, chg->sched_ctx_id);
+			}
+			break;
+			case ctx_change_remove:
+			{
+				remove_notified_workers(chg->workerids_to_change, chg->nworkers_to_change, chg->sched_ctx_id);
+			}
+			break;
+			default:
+			STARPU_ASSERT_MSG(0, "invalid ctx changeg opcode\n");
+		}
+		if (chg->nworkers_to_notify)
+		{
+			notify_workers_about_changing_ctx_done(chg->nworkers_to_notify, chg->workerids_to_notify);
+		}
+		else
+		{
+			notify_workers_about_changing_ctx_done(chg->nworkers_to_change, chg->workerids_to_change);
+		}
+		
+		_starpu_sched_ctx_unlock_write(chg->sched_ctx_id);
+		free(chg->workerids_to_notify);
+		free(chg->workerids_to_change);
+		_starpu_ctx_change_delete(chg);
+	}
+	
+
+}
+

+ 10 - 0
src/core/sched_ctx.h

@@ -159,6 +159,16 @@ struct _starpu_sched_ctx
 	starpu_pthread_t lock_write_owner;
 };
 
+/* per-worker list of deferred ctx_change ops */
+LIST_TYPE(_starpu_ctx_change,
+	int sched_ctx_id;
+	int op;
+	int nworkers_to_notify;
+	int *workerids_to_notify;
+	int nworkers_to_change;
+	int *workerids_to_change;
+);
+
 struct _starpu_machine_config;
 
 /* init sched_ctx_id of all contextes*/

+ 2 - 0
src/core/workers.c

@@ -528,6 +528,7 @@ static void _starpu_worker_init(struct _starpu_worker *workerarg, struct _starpu
 	STARPU_PTHREAD_COND_INIT(&workerarg->sched_cond, NULL);
 	STARPU_PTHREAD_MUTEX_INIT(&workerarg->sched_mutex, NULL);
 	starpu_task_list_init(&workerarg->local_tasks);
+	_starpu_ctx_change_list_init(&workerarg->ctx_change_list);
 	workerarg->local_ordered_tasks = NULL;
 	workerarg->local_ordered_tasks_size = 0;
 	workerarg->current_ordered_task = 0;
@@ -1460,6 +1461,7 @@ out:
 			STARPU_ASSERT(worker->local_ordered_tasks[n] == NULL);
 		_starpu_sched_ctx_list_delete(&worker->sched_ctx_list);
 		free(worker->local_ordered_tasks);
+		STARPU_ASSERT(_starpu_ctx_change_list_empty(&worker->ctx_change_list));
 	}
 }
 

+ 15 - 0
src/core/workers.h

@@ -64,6 +64,8 @@
 
 enum initialization { UNINITIALIZED = 0, CHANGING, INITIALIZED };
 
+struct _starpu_ctx_change_list;
+
 /* This is initialized from in _starpu_worker_init */
 LIST_TYPE(_starpu_worker,
 	struct _starpu_machine_config *config;
@@ -104,6 +106,7 @@ LIST_TYPE(_starpu_worker,
 	  * - transition from 1 to 0 triggers a unblock_req
 	  */
 	unsigned block_in_parallel_ref_count;
+	struct _starpu_ctx_change_list ctx_change_list;
 	struct starpu_task_list local_tasks; /* this queue contains tasks that have been explicitely submitted to that queue */
 	struct starpu_task **local_ordered_tasks; /* this queue contains tasks that have been explicitely submitted to that queue with an explicit order */
 	unsigned local_ordered_tasks_size; /* this records the size of local_ordered_tasks */
@@ -798,10 +801,22 @@ static inline void _starpu_worker_enter_sched_op(struct _starpu_worker * const w
  * Mark the end of a scheduling operation, and notify potential waiters that
  * scheduling context changes can safely be performed again.
  */
+void _starpu_worker_apply_deferred_ctx_changes(void);
 static inline void  _starpu_worker_leave_sched_op(struct _starpu_worker * const worker)
 {
 	worker->state_safe_for_observation = 1;
 	worker->state_sched_op_pending = 0;
+	_starpu_worker_apply_deferred_ctx_changes();
+}
+
+static inline int _starpu_worker_sched_op_pending(void)
+{
+	int workerid = starpu_worker_get_id();
+	if (workerid == -1)
+		return 0;
+	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
+	STARPU_ASSERT(worker != NULL);
+	return worker->state_sched_op_pending;
 }
 
 /* Must be called with worker's sched_mutex held.