
Make it clear that sched_ctx-related blocking operations are only about parallel sections, and have nothing to do with workers entering the sleeping state

Olivier Aumage, 8 years ago
parent commit 98729e8d02

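The rename makes explicit that these operations implement the parallel-section protocol behind starpu_sched_ctx_exec_parallel_code(), whose signature appears in the diff below: while the user-provided callback runs, the context's workers are blocked in a parallel section, not put to sleep. A minimal usage sketch, assuming a context id ctx obtained elsewhere and a hypothetical callback name:

	/* Sketch only: run external parallel code (e.g. an OpenMP or MKL region)
	 * on the resources of a scheduling context. */
	static void *run_external_parallel_code(void *arg)
	{
		/* hypothetical body: call into the external parallel runtime here */
		(void) arg;
		return NULL;
	}

	static void example(unsigned ctx)
	{
		void *ret = starpu_sched_ctx_exec_parallel_code(run_external_parallel_code, NULL, ctx);
		(void) ret;
	}
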
+ 1 - 1
include/starpu_worker.h

@@ -127,7 +127,7 @@ struct starpu_tree* starpu_workers_get_tree(void);
 
 unsigned starpu_worker_get_sched_ctx_list(int worker, unsigned **sched_ctx);
 
-unsigned starpu_worker_is_blocked(int workerid);
+unsigned starpu_worker_is_blocked_in_parallel(int workerid);
 
 unsigned starpu_worker_is_slave_somewhere(int workerid);
 

+ 11 - 11
src/core/sched_ctx.c

@@ -38,8 +38,8 @@ static int occupied_sms = 0;
 static unsigned _starpu_get_first_free_sched_ctx(struct _starpu_machine_config *config);
 
 static void _starpu_sched_ctx_put_new_master(unsigned sched_ctx_id);
-static void _starpu_sched_ctx_block_workers(unsigned sched_ctx_id, unsigned all);
-static void _starpu_sched_ctx_unblock_workers(unsigned sched_ctx_id, unsigned all);
+static void _starpu_sched_ctx_block_workers_in_parallel(unsigned sched_ctx_id, unsigned all);
+static void _starpu_sched_ctx_unblock_workers_in_parallel(unsigned sched_ctx_id, unsigned all);
 static void _starpu_sched_ctx_update_parallel_workers_with(unsigned sched_ctx_id);
 static void _starpu_sched_ctx_update_parallel_workers_without(unsigned sched_ctx_id);
 
@@ -1103,7 +1103,7 @@ void starpu_sched_ctx_delete(unsigned sched_ctx_id)
 	if(!_starpu_wait_for_all_tasks_of_sched_ctx(sched_ctx_id))
 	{
 		if(!sched_ctx->sched_policy)
-			_starpu_sched_ctx_unblock_workers(sched_ctx_id, 0);
+			_starpu_sched_ctx_unblock_workers_in_parallel(sched_ctx_id, 0);
 		/*if btw the mutex release & the mutex lock the context has changed take care to free all
 		  scheduling data before deleting the context */
 
@@ -2318,7 +2318,7 @@ void starpu_sched_ctx_list_task_counters_reset_all(struct starpu_task *task, uns
 	}
 }
 
-static void _starpu_sched_ctx_block_workers(unsigned sched_ctx_id, unsigned all)
+static void _starpu_sched_ctx_block_workers_in_parallel(unsigned sched_ctx_id, unsigned all)
 {
 	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
 	int current_worker_id = starpu_worker_get_id();
@@ -2346,7 +2346,7 @@ static void _starpu_sched_ctx_block_workers(unsigned sched_ctx_id, unsigned all)
 		{
 			struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
 			STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
-			_starpu_worker_request_blocking(worker);
+			_starpu_worker_request_blocking_in_parallel(worker);
 			STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
 		}
 		workers_count++;
@@ -2356,7 +2356,7 @@ static void _starpu_sched_ctx_block_workers(unsigned sched_ctx_id, unsigned all)
 		sched_ctx->main_master = -1;
 }
 
-static void _starpu_sched_ctx_unblock_workers(unsigned sched_ctx_id, unsigned all)
+static void _starpu_sched_ctx_unblock_workers_in_parallel(unsigned sched_ctx_id, unsigned all)
 {
 	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
 	int current_worker_id = starpu_worker_get_id();
@@ -2383,7 +2383,7 @@ static void _starpu_sched_ctx_unblock_workers(unsigned sched_ctx_id, unsigned al
 			{
 				struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
 				STARPU_PTHREAD_MUTEX_LOCK(&worker->sched_mutex);
-				_starpu_worker_request_unblocking(worker);
+				_starpu_worker_request_unblocking_in_parallel(worker);
 				STARPU_PTHREAD_MUTEX_UNLOCK(&worker->sched_mutex);
 			}
 		}
@@ -2397,13 +2397,13 @@ static void _starpu_sched_ctx_unblock_workers(unsigned sched_ctx_id, unsigned al
 
 void* starpu_sched_ctx_exec_parallel_code(void* (*func)(void*), void* param, unsigned sched_ctx_id)
 {
-	_starpu_sched_ctx_block_workers(sched_ctx_id, 1);
+	_starpu_sched_ctx_block_workers_in_parallel(sched_ctx_id, 1);
 
 	/* execute parallel code */
 	void* ret = func(param);
 
 	/* wake up starpu workers */
-	_starpu_sched_ctx_unblock_workers(sched_ctx_id, 1);
+	_starpu_sched_ctx_unblock_workers_in_parallel(sched_ctx_id, 1);
 	return ret;
 }
 
@@ -2419,7 +2419,7 @@ static void _starpu_sched_ctx_update_parallel_workers_with(unsigned sched_ctx_id
 
 	if(!sched_ctx->awake_workers)
 	{
-		_starpu_sched_ctx_block_workers(sched_ctx_id, 0);
+		_starpu_sched_ctx_block_workers_in_parallel(sched_ctx_id, 0);
 	}
 }
 
@@ -2435,7 +2435,7 @@ static void _starpu_sched_ctx_update_parallel_workers_without(unsigned sched_ctx
 
 	if(!sched_ctx->awake_workers)
 	{
-		_starpu_sched_ctx_unblock_workers(sched_ctx_id, 0);
+		_starpu_sched_ctx_unblock_workers_in_parallel(sched_ctx_id, 0);
 	}
 }
 

+ 13 - 16
src/core/workers.c

@@ -310,12 +310,11 @@ static inline int _starpu_can_use_nth_implementation(enum starpu_worker_archtype
 	return 0;
 }
 
-/* must be called with sched_mutex locked to protect state_blocked */
+/* must be called with sched_mutex locked to protect state_blocked_in_parallel */
 int starpu_worker_can_execute_task(unsigned workerid, struct starpu_task *task, unsigned nimpl)
 {
 	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
-	/* if the worker is blocked in a parallel ctx don't submit tasks on it */
-	if(worker->state_blocked) return 0;
+	if(worker->state_blocked_in_parallel) return 0;
 
 	/* TODO: check that the task operand sizes will fit on that device */
 	return (task->cl->where & _starpu_config.workers[workerid].worker_mask) &&
@@ -323,12 +322,11 @@ int starpu_worker_can_execute_task(unsigned workerid, struct starpu_task *task,
 		(!task->cl->can_execute || task->cl->can_execute(workerid, task, nimpl));
 }
 
-/* must be called with sched_mutex locked to protect state_blocked */
+/* must be called with sched_mutex locked to protect state_blocked_in_parallel */
 int starpu_worker_can_execute_task_impl(unsigned workerid, struct starpu_task *task, unsigned *impl_mask)
 {
 	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
-	/* if the worker is blocked in a parallel ctx don't submit tasks on it */
-	if(worker->state_blocked) return 0;
+	if(worker->state_blocked_in_parallel) return 0;
 
 	unsigned mask;
 	int i;
@@ -370,8 +368,7 @@ int starpu_worker_can_execute_task_impl(unsigned workerid, struct starpu_task *t
 int starpu_worker_can_execute_task_first_impl(unsigned workerid, struct starpu_task *task, unsigned *nimpl)
 {
 	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
-	/* if the worker is blocked in a parallel ctx don't submit tasks on it */
-	if(worker->state_blocked) return 0;
+	if(worker->state_blocked_in_parallel) return 0;
 	int i;
 	enum starpu_worker_archtype arch;
 	struct starpu_codelet *cl;
@@ -584,12 +581,12 @@ static void _starpu_worker_init(struct _starpu_worker *workerarg, struct _starpu
 	workerarg->state_sched_op_pending = 0;
 	workerarg->state_changing_ctx_waiting = 0;
 	workerarg->state_changing_ctx_notice = 0;
-	workerarg->state_blocked = 0;
-	workerarg->state_block_req = 0;
-	workerarg->state_block_ack = 0;
-	workerarg->state_unblock_req = 0;
-	workerarg->state_unblock_ack = 0;
-	workerarg->block_ref_count = 0;
+	workerarg->state_blocked_in_parallel = 0;
+	workerarg->state_block_in_parallel_req = 0;
+	workerarg->state_block_in_parallel_ack = 0;
+	workerarg->state_unblock_in_parallel_req = 0;
+	workerarg->state_unblock_in_parallel_ack = 0;
+	workerarg->block_in_parallel_ref_count = 0;
 
 	/* cpu_set/hwloc_cpu_set initialized in topology.c */
 }
@@ -1692,9 +1689,9 @@ unsigned starpu_worker_get_count(void)
 	return _starpu_config.topology.nworkers;
 }
 
-unsigned starpu_worker_is_blocked(int workerid)
+unsigned starpu_worker_is_blocked_in_parallel(int workerid)
 {
-	return (unsigned)_starpu_config.workers[workerid].state_blocked;
+	return (unsigned)_starpu_config.workers[workerid].state_blocked_in_parallel;
 }
 
 unsigned starpu_worker_is_slave_somewhere(int workerid)

+ 70 - 70
src/core/workers.h

@@ -90,18 +90,18 @@ LIST_TYPE(_starpu_worker,
 	int state_sched_op_pending:1; /* a task pop is ongoing even though sched_mutex may temporarily be unlocked */
 	int state_changing_ctx_waiting:1; /* a thread is waiting for operations such as pop to complete before acquiring sched_mutex and modifying the worker ctx*/
 	int state_changing_ctx_notice:1; /* the worker ctx is about to change or being changed, wait for flag to be cleared before starting new scheduling operations */
-	int state_blocked:1; /* worker is currently blocked */
-	int state_block_req:1; /* a request for state transition from unblocked to blocked is pending */
-	int state_block_ack:1; /* a block request has been honored */
-	int state_unblock_req:1; /* a request for state transition from blocked to unblocked is pending */
-	int state_unblock_ack:1; /* an unblock request has been honored */
+	int state_blocked_in_parallel:1; /* worker is currently blocked on a parallel section */
+	int state_block_in_parallel_req:1; /* a request for state transition from unblocked to blocked is pending */
+	int state_block_in_parallel_ack:1; /* a block request has been honored */
+	int state_unblock_in_parallel_req:1; /* a request for state transition from blocked to unblocked is pending */
+	int state_unblock_in_parallel_ack:1; /* an unblock request has been honored */
 	 /* cumulative blocking depth
 	  * - =0  worker unblocked
 	  * - >0  worker blocked
 	  * - transition from 0 to 1 triggers a block_req
 	  * - transition from 1 to 0 triggers a unblock_req
 	  */
-	unsigned block_ref_count;
+	unsigned block_in_parallel_ref_count;
 	struct starpu_task_list local_tasks; /* this queue contains tasks that have been explicitely submitted to that queue */
 	struct starpu_task **local_ordered_tasks; /* this queue contains tasks that have been explicitely submitted to that queue with an explicit order */
 	unsigned local_ordered_tasks_size; /* this records the size of local_ordered_tasks */
@@ -627,127 +627,127 @@ struct _starpu_sched_ctx* _starpu_worker_get_ctx_stream(unsigned stream_workerid
 
 /* Must be called with worker's sched_mutex held.
  */
-static inline void _starpu_worker_request_blocking(struct _starpu_worker * const worker)
+static inline void _starpu_worker_request_blocking_in_parallel(struct _starpu_worker * const worker)
 {
 	/* flush pending requests to start on a fresh transaction epoch */
-	while (worker->state_unblock_req)
+	while (worker->state_unblock_in_parallel_req)
 		STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
 
 	/* announce blocking intent */
-	STARPU_ASSERT(worker->block_ref_count < UINT_MAX);
-	worker->block_ref_count++;
+	STARPU_ASSERT(worker->block_in_parallel_ref_count < UINT_MAX);
+	worker->block_in_parallel_ref_count++;
 
-	if (worker->block_ref_count == 1)
+	if (worker->block_in_parallel_ref_count == 1)
 	{
-		/* only the transition from 0 to 1 triggers the block_req */
+		/* only the transition from 0 to 1 triggers the block_in_parallel_req */
 
-		STARPU_ASSERT(!worker->state_blocked);
-		STARPU_ASSERT(!worker->state_block_req);
-		STARPU_ASSERT(!worker->state_block_ack);
-		STARPU_ASSERT(!worker->state_unblock_req);
-		STARPU_ASSERT(!worker->state_unblock_ack);
+		STARPU_ASSERT(!worker->state_blocked_in_parallel);
+		STARPU_ASSERT(!worker->state_block_in_parallel_req);
+		STARPU_ASSERT(!worker->state_block_in_parallel_ack);
+		STARPU_ASSERT(!worker->state_unblock_in_parallel_req);
+		STARPU_ASSERT(!worker->state_unblock_in_parallel_ack);
 
-		/* trigger the block_req */
-		worker->state_block_req = 1;
+		/* trigger the block_in_parallel_req */
+		worker->state_block_in_parallel_req = 1;
 		STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
 
-		/* wait for block_req to be processed */
-		while (!worker->state_block_ack)
+		/* wait for block_in_parallel_req to be processed */
+		while (!worker->state_block_in_parallel_ack)
 			STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
 
-		STARPU_ASSERT(worker->block_ref_count >= 1);
-		STARPU_ASSERT(worker->state_block_req);
-		STARPU_ASSERT(worker->state_blocked);
+		STARPU_ASSERT(worker->block_in_parallel_ref_count >= 1);
+		STARPU_ASSERT(worker->state_block_in_parallel_req);
+		STARPU_ASSERT(worker->state_blocked_in_parallel);
 
-		/* reset block_req state flags */
-		worker->state_block_req = 0;
-		worker->state_block_ack = 0;
+		/* reset block_in_parallel_req state flags */
+		worker->state_block_in_parallel_req = 0;
+		worker->state_block_in_parallel_ack = 0;
 
-		/* broadcast block_req state flags reset */
+		/* broadcast block_in_parallel_req state flags reset */
 		STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
 	}
 }
 
 /* Must be called with worker's sched_mutex held.
  */
-static inline void _starpu_worker_request_unblocking(struct _starpu_worker * const worker)
+static inline void _starpu_worker_request_unblocking_in_parallel(struct _starpu_worker * const worker)
 {
 	/* flush pending requests to start on a fresh transaction epoch */
-	while (worker->state_block_req)
+	while (worker->state_block_in_parallel_req)
 		STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
 
 	/* unblocking may be requested unconditionnally
 	 * thus, check is unblocking is really needed */
-	if (worker->state_blocked)
+	if (worker->state_blocked_in_parallel)
 	{
-		if (worker->block_ref_count == 1)
+		if (worker->block_in_parallel_ref_count == 1)
 		{
-			/* only the transition from 1 to 0 triggers the unblock_req */
+			/* only the transition from 1 to 0 triggers the unblock_in_parallel_req */
 
-			STARPU_ASSERT(!worker->state_block_req);
-			STARPU_ASSERT(!worker->state_block_ack);
-			STARPU_ASSERT(!worker->state_unblock_req);
-			STARPU_ASSERT(!worker->state_unblock_ack);
+			STARPU_ASSERT(!worker->state_block_in_parallel_req);
+			STARPU_ASSERT(!worker->state_block_in_parallel_ack);
+			STARPU_ASSERT(!worker->state_unblock_in_parallel_req);
+			STARPU_ASSERT(!worker->state_unblock_in_parallel_ack);
 
-			/* trigger the unblock_req */
-			worker->state_unblock_req = 1;
+			/* trigger the unblock_in_parallel_req */
+			worker->state_unblock_in_parallel_req = 1;
 			STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
 
-			/* wait for the unblock_req to be processed */
-			while (!worker->state_unblock_ack)
+			/* wait for the unblock_in_parallel_req to be processed */
+			while (!worker->state_unblock_in_parallel_ack)
 				STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
 
-			STARPU_ASSERT(worker->state_unblock_req);
-			STARPU_ASSERT(!worker->state_blocked);
+			STARPU_ASSERT(worker->state_unblock_in_parallel_req);
+			STARPU_ASSERT(!worker->state_blocked_in_parallel);
 
-			/* reset unblock_req state flags */
-			worker->state_unblock_req = 0;
-			worker->state_unblock_ack = 0;
+			/* reset unblock_in_parallel_req state flags */
+			worker->state_unblock_in_parallel_req = 0;
+			worker->state_unblock_in_parallel_ack = 0;
 
-			/* broadcast unblock_req state flags reset */
+			/* broadcast unblock_in_parallel_req state flags reset */
 			STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
 		}
 
 		/* announce unblocking complete */
-		STARPU_ASSERT(worker->block_ref_count > 0);
-		worker->block_ref_count--;
+		STARPU_ASSERT(worker->block_in_parallel_ref_count > 0);
+		worker->block_in_parallel_ref_count--;
 	}
 }
 
 /* Must be called with worker's sched_mutex held.
  */
-static inline void _starpu_worker_process_block_requests(struct _starpu_worker * const worker)
+static inline void _starpu_worker_process_block_in_parallel_requests(struct _starpu_worker * const worker)
 {
-	while (worker->state_block_req)
+	while (worker->state_block_in_parallel_req)
 	{
-		STARPU_ASSERT(!worker->state_blocked);
-		STARPU_ASSERT(!worker->state_block_ack);
-		STARPU_ASSERT(!worker->state_unblock_req);
-		STARPU_ASSERT(!worker->state_unblock_ack);
-		STARPU_ASSERT(worker->block_ref_count > 0);
+		STARPU_ASSERT(!worker->state_blocked_in_parallel);
+		STARPU_ASSERT(!worker->state_block_in_parallel_ack);
+		STARPU_ASSERT(!worker->state_unblock_in_parallel_req);
+		STARPU_ASSERT(!worker->state_unblock_in_parallel_ack);
+		STARPU_ASSERT(worker->block_in_parallel_ref_count > 0);
 		
 		/* enter effective blocked state */
-		worker->state_blocked = 1;
+		worker->state_blocked_in_parallel = 1;
 
-		/* notify block_req processing */
-		worker->state_block_ack = 1;
+		/* notify block_in_parallel_req processing */
+		worker->state_block_in_parallel_ack = 1;
 		STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
 
 		/* block */
-		while (!worker->state_unblock_req)
+		while (!worker->state_unblock_in_parallel_req)
 			STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
 
-		STARPU_ASSERT(worker->state_blocked);
-		STARPU_ASSERT(!worker->state_block_req);
-		STARPU_ASSERT(!worker->state_block_ack);
-		STARPU_ASSERT(!worker->state_unblock_ack);
-		STARPU_ASSERT(worker->block_ref_count > 0);
+		STARPU_ASSERT(worker->state_blocked_in_parallel);
+		STARPU_ASSERT(!worker->state_block_in_parallel_req);
+		STARPU_ASSERT(!worker->state_block_in_parallel_ack);
+		STARPU_ASSERT(!worker->state_unblock_in_parallel_ack);
+		STARPU_ASSERT(worker->block_in_parallel_ref_count > 0);
 
 		/* leave effective blocked state */
-		worker->state_blocked = 0;
+		worker->state_blocked_in_parallel = 0;
 
-		/* notify unblock_req processing */
-		worker->state_unblock_ack = 1;
+		/* notify unblock_in_parallel_req processing */
+		worker->state_unblock_in_parallel_ack = 1;
 		STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
 	}
 }
@@ -759,14 +759,14 @@ static inline void _starpu_worker_process_block_requests(struct _starpu_worker *
 static inline void _starpu_worker_enter_sched_op(struct _starpu_worker * const worker)
 {
 	/* process pending block requests before entering a sched_op region */
-	_starpu_worker_process_block_requests(worker);
+	_starpu_worker_process_block_in_parallel_requests(worker);
 	while (worker->state_changing_ctx_notice)
 	{
 		STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
 
 		/* new block requests may have been triggered during the wait,
 		 * need to check again */
-		_starpu_worker_process_block_requests(worker);
+		_starpu_worker_process_block_in_parallel_requests(worker);
 	}
 
 	/* no block request and no ctx change ahead,

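The helpers renamed above implement a request/acknowledge handshake protected by the worker's sched_mutex: the requesting thread raises a *_req flag and waits for the matching *_ack, while the worker thread honors the request, acknowledges it, and stays blocked until unblocking is requested. A simplified, self-contained sketch of that pattern with generic pthreads and hypothetical names (not the StarPU code itself); both functions assume the caller already holds w->mutex, mirroring the "must be called with sched_mutex held" convention above:

	#include <pthread.h>

	struct toy_worker
	{
		pthread_mutex_t mutex;
		pthread_cond_t cond;
		int block_req, block_ack, unblock_req, unblock_ack, blocked;
	};

	/* requester side: raise the block request and wait until it is honored */
	static void toy_request_blocking(struct toy_worker *w)
	{
		w->block_req = 1;
		pthread_cond_broadcast(&w->cond);
		while (!w->block_ack)
			pthread_cond_wait(&w->cond, &w->mutex);
		w->block_req = 0;
		w->block_ack = 0;	/* reset flags for the next epoch */
	}

	/* worker side: honor a pending block request, then wait to be unblocked */
	static void toy_process_block_requests(struct toy_worker *w)
	{
		while (w->block_req)
		{
			w->blocked = 1;
			w->block_ack = 1;	/* acknowledge the block request */
			pthread_cond_broadcast(&w->cond);
			while (!w->unblock_req)
				pthread_cond_wait(&w->cond, &w->mutex);
			w->blocked = 0;
			w->unblock_ack = 1;	/* acknowledge the unblock request */
			pthread_cond_broadcast(&w->cond);
		}
	}
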
+ 2 - 2
src/sched_policies/work_stealing_policy.c

@@ -135,7 +135,7 @@ static int select_victim_round_robin(struct _starpu_work_stealing_data *ws, unsi
 		ntasks = ws->per_worker[workerids[worker]].queue_array->ntasks;
 
 		if (ntasks && (ws->per_worker[workerids[worker]].busy
-					   || starpu_worker_is_blocked(workerids[worker])))
+					   || starpu_worker_is_blocked_in_parallel(workerids[worker])))
 			break;
 
 		worker = (worker + 1) % nworkers;
@@ -762,7 +762,7 @@ static int lws_select_victim(struct _starpu_work_stealing_data *ws, unsigned sch
 		int neighbor = ws->per_worker[workerid].proxlist[i];
 		int ntasks = ws->per_worker[neighbor].queue_array->ntasks;
 		if (ntasks && (ws->per_worker[neighbor].busy
-					   || starpu_worker_is_blocked(neighbor)))
+					   || starpu_worker_is_blocked_in_parallel(neighbor)))
 			return neighbor;
 	}
 	return -1;

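The policy changes above consume the renamed public query. As a hedged illustration (hypothetical helper, not part of this commit), a policy handing out tasks could skip workers that are currently blocked in a parallel section like this:

	#include <starpu.h>

	/* Sketch: return the first candidate worker that is not blocked in a
	 * parallel section, or -1 if none is available. */
	static int pick_unblocked_worker(const int *workerids, unsigned nworkers)
	{
		unsigned i;
		for (i = 0; i < nworkers; i++)
		{
			if (!starpu_worker_is_blocked_in_parallel(workerids[i]))
				return workerids[i];
		}
		return -1;
	}
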
+ 1 - 1
src/worker_collection/worker_list.c

@@ -284,7 +284,7 @@ static void list_init_iterator_for_parallel_tasks(struct starpu_worker_collectio
 	int nm = 0, nub = 0;
 	for(i = 0;  i < nworkers; i++)
 	{
-		if(!starpu_worker_is_blocked(workerids[i]))
+		if(!starpu_worker_is_blocked_in_parallel(workerids[i]))
 		{
 			((int*)workers->unblocked_workers)[nub++] = workerids[i];
 			if(!it->possibly_parallel) /* don't bother filling the table with masters we won't use it anyway */

+ 2 - 2
src/worker_collection/worker_tree.c

@@ -333,10 +333,10 @@ static void tree_init_iterator_for_parallel_tasks(struct starpu_worker_collectio
 	int nworkers = starpu_worker_get_count();
 	for(i = 0; i < nworkers; i++)
 	{
-		workers->is_unblocked[i] = (workers->present[i] && !starpu_worker_is_blocked(i));
+		workers->is_unblocked[i] = (workers->present[i] && !starpu_worker_is_blocked_in_parallel(i));
 		if(!it->possibly_parallel) /* don't bother filling the table with masters we won't use it anyway */
 			continue;
-		workers->is_master[i] = (workers->present[i] && !starpu_worker_is_blocked(i) && !starpu_worker_is_slave_somewhere(i));
+		workers->is_master[i] = (workers->present[i] && !starpu_worker_is_blocked_in_parallel(i) && !starpu_worker_is_slave_somewhere(i));
 	}
 }