
implement a new synchronization scheme for parallel section blocking

Olivier Aumage 8 years ago
parent
commit
691ab4aefb
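
The diff below replaces the per-context parallel_sect[] flags with a request/acknowledge handshake kept entirely in the worker structure, protected by the worker's sched_mutex/sched_cond and reference-counted so that several contexts may block the same worker while only the first block request and the last unblock request perform the actual handshake. The following is a minimal, self-contained model of that scheme, for illustration only; the struct and function names (model_worker, block_worker, unblock_worker, process_block_requests) are hypothetical stand-ins for the StarPU internals touched in this commit, not StarPU API.

/* Minimal model of the block/unblock handshake (illustration only).
 * All fields are protected by sched_mutex; sched_cond is broadcast on
 * every state change because it is shared for several purposes. */
#include <assert.h>
#include <limits.h>
#include <pthread.h>

struct model_worker
{
	pthread_mutex_t sched_mutex;
	pthread_cond_t sched_cond;
	int state_blocked;
	int state_block_req;
	int state_block_ack;
	int state_unblock_req;
	int state_unblock_ack;
	unsigned block_ref_count;
};

/* Requester side: put one worker to sleep
 * (cf. _starpu_sched_ctx_put_workers_to_sleep). */
void block_worker(struct model_worker *w)
{
	pthread_mutex_lock(&w->sched_mutex);
	/* serialize with an unblock handshake still in progress */
	while (w->state_unblock_req)
		pthread_cond_wait(&w->sched_cond, &w->sched_mutex);
	assert(w->block_ref_count < UINT_MAX);
	w->block_ref_count++;
	if (w->block_ref_count == 1)
	{
		/* first requester performs the handshake */
		w->state_block_req = 1;
		pthread_cond_broadcast(&w->sched_cond);
		while (!w->state_block_ack)
			pthread_cond_wait(&w->sched_cond, &w->sched_mutex);
		w->state_block_req = 0;
		w->state_block_ack = 0;
		pthread_cond_broadcast(&w->sched_cond);
	}
	pthread_mutex_unlock(&w->sched_mutex);
}

/* Requester side: wake one worker up
 * (cf. _starpu_sched_ctx_wake_up_workers). */
void unblock_worker(struct model_worker *w)
{
	pthread_mutex_lock(&w->sched_mutex);
	/* serialize with a block handshake still in progress */
	while (w->state_block_req)
		pthread_cond_wait(&w->sched_cond, &w->sched_mutex);
	if (w->state_blocked)
	{
		assert(w->block_ref_count > 0);
		if (w->block_ref_count == 1)
		{
			/* last requester performs the handshake */
			w->state_unblock_req = 1;
			pthread_cond_broadcast(&w->sched_cond);
			while (!w->state_unblock_ack)
				pthread_cond_wait(&w->sched_cond, &w->sched_mutex);
			w->state_unblock_req = 0;
			w->state_unblock_ack = 0;
			pthread_cond_broadcast(&w->sched_cond);
		}
		w->block_ref_count--;
	}
	pthread_mutex_unlock(&w->sched_mutex);
}

/* Worker side: called from the scheduling loop with sched_mutex held
 * (cf. _starpu_worker_process_block_requests). */
void process_block_requests(struct model_worker *w)
{
	while (w->state_block_req)
	{
		w->state_blocked = 1;
		w->state_block_ack = 1;
		pthread_cond_broadcast(&w->sched_cond);

		/* sleep here until an unblock request arrives */
		while (!w->state_unblock_req)
			pthread_cond_wait(&w->sched_cond, &w->sched_mutex);

		w->state_blocked = 0;
		w->state_unblock_ack = 1;
		pthread_cond_broadcast(&w->sched_cond);
	}
}

In this model, as in the diff, blocking becomes a per-worker property rather than a per-context flag, so the driver no longer has to walk its context list, and the reference count ensures the block and unblock handshakes each run only once even when several contexts target the same worker.
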

+ 47 - 62
src/core/sched_ctx.c

@@ -549,13 +549,6 @@ struct _starpu_sched_ctx* _starpu_create_sched_ctx(struct starpu_sched_policy *p
 	}
 
 	sched_ctx->nsub_ctxs = 0;
-
-	int w;
-	for(w = 0; w < nworkers; w++)
-	{
-		sched_ctx->parallel_sect[w] = 0;
-	}
-
 	sched_ctx->parallel_view = 0;
 
   /*init the strategy structs and the worker_collection of the ressources of the context */
@@ -2326,22 +2319,6 @@ void starpu_sched_ctx_list_task_counters_reset_all(struct starpu_task *task, uns
 	}
 }
 
-static unsigned _worker_blocked_in_other_ctx(unsigned sched_ctx_id, int workerid)
-{
-	int s;
-	for(s = 0; s < STARPU_NMAX_SCHED_CTXS; s++)
-	{
-		struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(s);
-		if(sched_ctx && sched_ctx->id > 0 && sched_ctx->id < STARPU_NMAX_SCHED_CTXS && sched_ctx->id != sched_ctx_id)
-		{
-			if(sched_ctx->parallel_sect[workerid])
-				return 1;
-		}
-	}
-	return 0;
-
-}
-
 static void _starpu_sched_ctx_put_workers_to_sleep(unsigned sched_ctx_id, unsigned all, int notified_workers)
 {
 	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
@@ -2349,7 +2326,6 @@ static void _starpu_sched_ctx_put_workers_to_sleep(unsigned sched_ctx_id, unsign
 	int master, temp_master = 0;
 	struct starpu_worker_collection *workers = sched_ctx->workers;
 	struct starpu_sched_ctx_iterator it;
-	unsigned blocked[workers->nworkers];
 	int workers_count = 0;
 
 	/* temporarily put a master if needed */
@@ -2360,26 +2336,6 @@ static void _starpu_sched_ctx_put_workers_to_sleep(unsigned sched_ctx_id, unsign
 	}
 	master = sched_ctx->main_master;
 
-	workers->init_iterator(workers, &it);
-	while(workers->has_next(workers, &it))
-	{
-		int workerid = workers->get_next(workers, &it);
-		blocked[workers_count] = _worker_blocked_in_other_ctx(sched_ctx_id, workerid);
-
-		if(starpu_worker_get_type(workerid) == STARPU_CPU_WORKER
-				&& !sched_ctx->parallel_sect[workerid] && (workerid != master || all))
-		{
-			if (current_worker_id == -1 || workerid != current_worker_id)
-			{
-				struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
-				STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
-				sched_ctx->parallel_sect[workerid] = 1;
-				STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
-			}
-		}
-		workers_count++;
-	}
-
 	workers_count = 0;
 	workers->init_iterator(workers, &it);
 	while(workers->has_next(workers, &it))
@@ -2387,17 +2343,34 @@ static void _starpu_sched_ctx_put_workers_to_sleep(unsigned sched_ctx_id, unsign
 		int workerid = workers->get_next(workers, &it);
 		if(starpu_worker_get_type(workerid) == STARPU_CPU_WORKER
 				&& (workerid != master || all)
-				&& (current_worker_id == -1 || workerid != current_worker_id)
-				&& !blocked[workers_count])
+				&& (current_worker_id == -1 || workerid != current_worker_id))
 		{
 			struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
 			STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
-			if (!notified_workers)
+			while (worker->state_unblock_req)
+				STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
+			STARPU_ASSERT(worker->block_ref_count < UINT_MAX);
+			worker->block_ref_count++;
+			if (worker->block_ref_count == 1)
 			{
-				/* only wait if we are not in the middle of a notified changing ctx op,
-				 * otherwise, the workers will */
-				while (!worker->state_blocked)
+				STARPU_ASSERT(!worker->state_blocked);
+				STARPU_ASSERT(!worker->state_block_req);
+				STARPU_ASSERT(!worker->state_block_ack);
+				STARPU_ASSERT(!worker->state_unblock_req);
+				STARPU_ASSERT(!worker->state_unblock_ack);
+
+				worker->state_block_req = 1;
+				STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
+
+				while (!worker->state_block_ack)
 					STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
+
+				STARPU_ASSERT(worker->block_ref_count >= 1);
+				STARPU_ASSERT(worker->state_block_req);
+				STARPU_ASSERT(worker->state_blocked);
+				worker->state_block_req = 0;
+				worker->state_block_ack = 0;
+				STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
 			}
 			STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
 		}
@@ -2429,27 +2402,39 @@ static void _starpu_sched_ctx_wake_up_workers(unsigned sched_ctx_id, unsigned al
 	{
 		int workerid = workers->get_next(workers, &it);
 		if(starpu_worker_get_type(workerid) == STARPU_CPU_WORKER
-			 && sched_ctx->parallel_sect[workerid] && (workerid != master || all))
+			 && (workerid != master || all))
 		{
 			struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
 			STARPU_PTHREAD_MUTEX_LOCK(&worker->sched_mutex);
-			if((current_worker_id == -1 || workerid != current_worker_id) && worker->state_blocked)
+			if((current_worker_id == -1 || workerid != current_worker_id))
 			{
-				if (!notified_workers && worker->state_wait_ack__blocked)
+				while (worker->state_block_req)
+					STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
+				if (worker->state_blocked)
 				{
-					worker->state_wait_ack__blocked = 0;
-					worker->state_wait_handshake__blocked = 1;
-					/* broadcast is required because sched_cond is shared for multiple purpose */
-					STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
-					do
+					STARPU_ASSERT(worker->block_ref_count > 0);
+					if (worker->block_ref_count == 1)
 					{
-						STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
+						STARPU_ASSERT(!worker->state_block_req);
+						STARPU_ASSERT(!worker->state_block_ack);
+						STARPU_ASSERT(!worker->state_unblock_req);
+						STARPU_ASSERT(!worker->state_unblock_ack);
+
+						worker->state_unblock_req = 1;
+						STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
+
+						while (!worker->state_unblock_ack)
+							STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
+
+						STARPU_ASSERT(worker->state_unblock_req);
+						STARPU_ASSERT(!worker->state_blocked);
+						worker->state_unblock_req = 0;
+						worker->state_unblock_ack = 0;
+						STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
 					}
-					while (worker->state_wait_handshake__blocked);
+					worker->block_ref_count--;
 				}
 			}
-			else
-				sched_ctx->parallel_sect[workerid] = 0;
 			STARPU_PTHREAD_MUTEX_UNLOCK(&worker->sched_mutex);
 		}
 	}

+ 0 - 4
src/core/sched_ctx.h

@@ -127,10 +127,6 @@ struct _starpu_sched_ctx
 	   if not master is -1 */
 	int main_master;
 
-	/* boolean indicating that workers should block in order to allow
-	   parallel sections to be executed on their allocated resources */
-	unsigned parallel_sect[STARPU_NMAXWORKERS];
-
 	/* ctx nesting the current ctx */
 	unsigned nesting_sched_ctx;
 

+ 15 - 10
src/core/workers.c

@@ -310,12 +310,12 @@ static inline int _starpu_can_use_nth_implementation(enum starpu_worker_archtype
 	return 0;
 }
 
+/* must be called with sched_mutex locked to protect state_blocked */
 int starpu_worker_can_execute_task(unsigned workerid, struct starpu_task *task, unsigned nimpl)
 {
-	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
-
+	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
 	/* if the worker is blocked in a parallel ctx don't submit tasks on it */
-	if(sched_ctx->parallel_sect[workerid] ) return 0;
+	if(worker->state_blocked) return 0;
 
 	/* TODO: check that the task operand sizes will fit on that device */
 	return (task->cl->where & _starpu_config.workers[workerid].worker_mask) &&
@@ -323,12 +323,12 @@ int starpu_worker_can_execute_task(unsigned workerid, struct starpu_task *task,
 		(!task->cl->can_execute || task->cl->can_execute(workerid, task, nimpl));
 }
 
+/* must be called with sched_mutex locked to protect state_blocked */
 int starpu_worker_can_execute_task_impl(unsigned workerid, struct starpu_task *task, unsigned *impl_mask)
 {
-	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
-
+	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
 	/* if the worker is blocked in a parallel ctx don't submit tasks on it */
-	if(sched_ctx->parallel_sect[workerid]) return 0;
+	if(worker->state_blocked) return 0;
 
 	unsigned mask;
 	int i;
@@ -366,13 +366,15 @@ int starpu_worker_can_execute_task_impl(unsigned workerid, struct starpu_task *t
 	return mask != 0;
 }
 
+/* must be called with sched_mutex locked to protect state_blocked */
 int starpu_worker_can_execute_task_first_impl(unsigned workerid, struct starpu_task *task, unsigned *nimpl)
 {
-	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
+	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
+	/* if the worker is blocked in a parallel ctx don't submit tasks on it */
+	if(worker->state_blocked) return 0;
 	int i;
 	enum starpu_worker_archtype arch;
 	struct starpu_codelet *cl;
-	if(sched_ctx->parallel_sect[workerid]) return 0;
 	/* TODO: check that the task operand sizes will fit on that device */
 	cl = task->cl;
 	if (!(cl->where & _starpu_config.workers[workerid].worker_mask)) return 0;
@@ -583,8 +585,11 @@ static void _starpu_worker_init(struct _starpu_worker *workerarg, struct _starpu
 	workerarg->state_changing_ctx_waiting = 0;
 	workerarg->state_changing_ctx_notice = 0;
 	workerarg->state_blocked = 0;
-	workerarg->state_wait_ack__blocked = 0;
-	workerarg->state_wait_handshake__blocked = 0;
+	workerarg->state_block_req = 0;
+	workerarg->state_block_ack = 0;
+	workerarg->state_unblock_req = 0;
+	workerarg->state_unblock_ack = 0;
+	workerarg->block_ref_count = 0;
 
 	/* cpu_set/hwloc_cpu_set initialized in topology.c */
 }

+ 38 - 2
src/core/workers.h

@@ -86,8 +86,11 @@ LIST_TYPE(_starpu_worker,
 	int state_changing_ctx_waiting:1; /* a thread is waiting for transient operations such as pop to complete before acquiring sched_mutex and modifying the worker ctx*/
 	int state_changing_ctx_notice:1; /* the worker ctx is about to change or being changed, wait for flag to be cleared before starting new transient scheduling operations */
 	int state_blocked:1;
-	int state_wait_ack__blocked:1;
-	int state_wait_handshake__blocked:1;
+	int state_block_req:1;
+	int state_block_ack:1;
+	int state_unblock_req:1;
+	int state_unblock_ack:1;
+	unsigned block_ref_count;
 	struct starpu_task_list local_tasks; /* this queue contains tasks that have been explicitely submitted to that queue */
 	struct starpu_task **local_ordered_tasks; /* this queue contains tasks that have been explicitely submitted to that queue with an explicit order */
 	unsigned local_ordered_tasks_size; /* this records the size of local_ordered_tasks */
@@ -611,14 +614,47 @@ void _starpu_worker_set_stream_ctx(unsigned workerid, struct _starpu_sched_ctx *
 
 struct _starpu_sched_ctx* _starpu_worker_get_ctx_stream(unsigned stream_workerid);
 
+static inline void _starpu_worker_process_block_requests(struct _starpu_worker * const worker)
+{
+	while (worker->state_block_req)
+	{
+		STARPU_ASSERT(!worker->state_blocked);
+		STARPU_ASSERT(!worker->state_block_ack);
+		STARPU_ASSERT(!worker->state_unblock_req);
+		STARPU_ASSERT(!worker->state_unblock_ack);
+		STARPU_ASSERT(worker->block_ref_count > 0);
+		
+		worker->state_blocked = 1;
+		worker->state_block_ack = 1;
+		STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
+
+		while (!worker->state_unblock_req)
+			STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
+
+		STARPU_ASSERT(worker->state_blocked);
+		STARPU_ASSERT(!worker->state_block_req);
+		STARPU_ASSERT(!worker->state_block_ack);
+		STARPU_ASSERT(!worker->state_unblock_ack);
+		STARPU_ASSERT(worker->block_ref_count > 0);
+
+		worker->state_blocked = 0;
+		worker->state_unblock_ack = 1;
+		STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
+	}
+}
+
 /* Must be called with worker's sched_mutex held.
  * Mark the beginning of a scheduling operation during which the sched_mutex
  * lock may be temporarily released, but the scheduling context of the worker
  * should not be modified */
 static inline void _starpu_worker_enter_transient_sched_op(struct _starpu_worker * const worker)
 {
+	_starpu_worker_process_block_requests(worker);
 	while (worker->state_changing_ctx_notice)
+	{
 		STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
+		_starpu_worker_process_block_requests(worker);
+	}
 	worker->state_sched_op_pending = 1;
 }
 

+ 1 - 72
src/drivers/driver_common/driver_common.c

@@ -340,83 +340,12 @@ static void _starpu_exponential_backoff(struct _starpu_worker *worker)
 struct starpu_task *_starpu_get_worker_task(struct _starpu_worker *worker, int workerid, unsigned memnode STARPU_ATTRIBUTE_UNUSED)
 {
 	STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
-	_starpu_worker_enter_transient_sched_op(worker);
 	struct starpu_task *task;
 	unsigned needed = 1;
 	unsigned executing STARPU_ATTRIBUTE_UNUSED = 0;
 
 	_starpu_worker_set_status_scheduling(workerid);
-	while(needed)
-	{
-		struct _starpu_sched_ctx *sched_ctx = NULL;
-		struct _starpu_sched_ctx_elt *e = NULL;
-		struct _starpu_sched_ctx_list_iterator list_it;
-
-		_starpu_sched_ctx_list_iterator_init(worker->sched_ctx_list, &list_it);
-		while (_starpu_sched_ctx_list_iterator_has_next(&list_it))
-		{
-			e = _starpu_sched_ctx_list_iterator_get_next(&list_it);
-			sched_ctx = _starpu_get_sched_ctx_struct(e->sched_ctx);
-			if(sched_ctx && sched_ctx->id > 0 && sched_ctx->id < STARPU_NMAX_SCHED_CTXS)
-			{
-				if(!sched_ctx->sched_policy)
-					worker->is_slave_somewhere = sched_ctx->main_master != workerid;
-
-				if(sched_ctx->parallel_sect[workerid])
-				{
-					/* don't let the worker sleep with the sched_mutex taken */
-					/* we need it until here bc of the list of ctxs of the workers
-					   that can change in another thread */
-					needed = 0;
-					worker->state_blocked = 1;
-					worker->state_wait_ack__blocked = 1;
-					STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
-					do
-					{
-						STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
-					}
-					while (worker->state_wait_ack__blocked && !worker->state_changing_ctx_waiting);
-					worker->state_blocked = 0;
-					sched_ctx->parallel_sect[workerid] = 0;
-					if (worker->state_wait_handshake__blocked && !worker->state_changing_ctx_waiting)
-					{
-						worker->state_wait_handshake__blocked = 0;
-						STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
-					}
-				}
-			}
-			if(!needed)
-				break;
-		}
-		/* don't worry if the value is not correct (no lock) it will do it next time */
-		if(worker->tmp_sched_ctx != -1)
-		{
-			sched_ctx = _starpu_get_sched_ctx_struct(worker->tmp_sched_ctx);
-			if(sched_ctx->parallel_sect[workerid])
-			{
-//				needed = 0;
-				worker->state_blocked = 1;
-				worker->state_wait_ack__blocked = 1;
-				STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
-				do
-				{
-					STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
-				}
-				while (worker->state_wait_ack__blocked && !worker->state_changing_ctx_waiting);
-				worker->state_blocked = 0;
-				sched_ctx->parallel_sect[workerid] = 0;
-				if (worker->state_wait_handshake__blocked && !worker->state_changing_ctx_waiting)
-				{
-					worker->state_wait_handshake__blocked = 0;
-					STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
-				}
-			}
-		}
-		if (worker->state_changing_ctx_waiting)
-			break;
-		needed = !needed;
-	}
-
+	_starpu_worker_enter_transient_sched_op(worker);
 	if ((worker->pipeline_length == 0 && worker->current_task)
 		|| (worker->pipeline_length != 0 && worker->ntasks))
 		/* This worker is executing something */