Procházet zdrojové kódy

fix deadlock: we have to lock the worker's sched_mutex when iterating on its list of contexts but unlock it before the worker is going to sleep
ex: remove cuda and opencl impl of the codelet

Andra Hugo před 11 roky
rodič
revize
3c8000976d

+ 2 - 2
examples/sched_ctx/parallel_code.c

@@ -61,8 +61,8 @@ static void sched_ctx_func(void *descr[] STARPU_ATTRIBUTE_UNUSED, void *arg)
 static struct starpu_codelet sched_ctx_codelet =
 {
 	.cpu_funcs = {sched_ctx_func, NULL},
-	.cuda_funcs = {sched_ctx_func, NULL},
-	.opencl_funcs = {sched_ctx_func, NULL},
+	.cuda_funcs = {NULL},
+	.opencl_funcs = {NULL},
 	.model = NULL,
 	.nbuffers = 0,
 	.name = "sched_ctx"

+ 2 - 2
examples/sched_ctx/sched_ctx.c

@@ -36,8 +36,8 @@ static void sched_ctx_func(void *descr[] STARPU_ATTRIBUTE_UNUSED, void *arg STAR
 static struct starpu_codelet sched_ctx_codelet =
 {
 	.cpu_funcs = {sched_ctx_func, NULL},
-	.cuda_funcs = {sched_ctx_func, NULL},
-	.opencl_funcs = {sched_ctx_func, NULL},
+	.cuda_funcs = {NULL},
+	.opencl_funcs = {NULL},
 	.model = NULL,
 	.nbuffers = 0,
 	.name = "sched_ctx"

+ 3 - 3
examples/sched_ctx/sched_ctx_without_sched_policy.c

@@ -59,8 +59,8 @@ static void sched_ctx_func(void *descr[] STARPU_ATTRIBUTE_UNUSED, void *arg)
 static struct starpu_codelet sched_ctx_codelet =
 {
 	.cpu_funcs = {sched_ctx_func, NULL},
-	.cuda_funcs = {sched_ctx_func, NULL},
-	.opencl_funcs = {sched_ctx_func, NULL},
+	.cuda_funcs = { NULL},
+	.opencl_funcs = {NULL},
 	.model = NULL,
 	.nbuffers = 0,
 	.name = "sched_ctx"
@@ -93,7 +93,7 @@ int main(int argc, char **argv)
 	if(ncpus > 1)
 	{
 		nprocs1 = ncpus/2;
-		nprocs2 =  nprocs1;
+		nprocs2 =  ncpus-nprocs1;
 		k = 0;
 		for(j = nprocs1; j < nprocs1+nprocs2; j++)
 			procs2[k++] = j;

+ 6 - 8
src/core/sched_ctx.c

@@ -47,6 +47,8 @@ static void _starpu_worker_gets_into_ctx(unsigned sched_ctx_id, struct _starpu_w
 		worker->nsched_ctxs++;
 	}
 	worker->removed_from_ctx[sched_ctx_id] = 0;
+	if(worker->tmp_sched_ctx == sched_ctx_id)
+		worker->tmp_sched_ctx = -1;
 	return;
 }
 
@@ -167,12 +169,7 @@ static void _starpu_add_workers_to_sched_ctx(struct _starpu_sched_ctx *sched_ctx
 			workers->add(workers, worker);
 			workers_to_add[i] = worker;
 			struct _starpu_worker *str_worker = _starpu_get_worker_struct(worker);
-			if(!str_worker->tmp_sched_ctx_list)
-			{
-				str_worker->tmp_sched_ctx_list = (struct _starpu_sched_ctx_list*)malloc(sizeof(struct _starpu_sched_ctx_list));
-				_starpu_sched_ctx_list_init(str_worker->tmp_sched_ctx_list);
-			}
-			_starpu_sched_ctx_list_add(&str_worker->tmp_sched_ctx_list, sched_ctx->id);
+			str_worker->tmp_sched_ctx = (int)sched_ctx->id;
 
 		}
 	}
@@ -605,8 +602,7 @@ static void _starpu_delete_sched_ctx(struct _starpu_sched_ctx *sched_ctx)
 		free(sched_ctx->sched_policy);
 		sched_ctx->sched_policy = NULL;
 	}
-	else
-		starpu_sched_ctx_unbook_workers_for_task(sched_ctx->id, sched_ctx->main_master);
+	
 
 	STARPU_PTHREAD_MUTEX_DESTROY(&sched_ctx->empty_ctx_mutex);
 	sched_ctx->id = STARPU_NMAX_SCHED_CTXS;
@@ -658,6 +654,8 @@ void starpu_sched_ctx_delete(unsigned sched_ctx_id)
 
 	if(!_starpu_wait_for_all_tasks_of_sched_ctx(sched_ctx_id))
 	{
+		if(!sched_ctx->sched_policy)
+			starpu_sched_ctx_unbook_workers_for_task(sched_ctx->id, sched_ctx->main_master);
 		/*if btw the mutex release & the mutex lock the context has changed take care to free all
 		  scheduling data before deleting the context */
 		_starpu_update_workers_without_ctx(workerids, nworkers_ctx, sched_ctx_id, 1);

+ 1 - 2
src/core/workers.c

@@ -429,7 +429,7 @@ static void _starpu_worker_init(struct _starpu_worker *workerarg, struct _starpu
 	workerarg->run_by_starpu = 1;
 
 	workerarg->sched_ctx_list = NULL;
-	workerarg->tmp_sched_ctx_list = NULL;
+	workerarg->tmp_sched_ctx = -1;
 	workerarg->nsched_ctxs = 0;
 	_starpu_barrier_counter_init(&workerarg->tasks_barrier, 0);
 
@@ -1178,7 +1178,6 @@ static void _starpu_terminate_workers(struct _starpu_machine_config *pconfig)
 out:
 		STARPU_ASSERT(starpu_task_list_empty(&worker->local_tasks));
 		_starpu_sched_ctx_list_delete(&worker->sched_ctx_list);
-		_starpu_sched_ctx_list_delete(&worker->tmp_sched_ctx_list);
 		_starpu_job_list_delete(worker->terminated_jobs);
 	}
 }

+ 1 - 1
src/core/workers.h

@@ -84,7 +84,7 @@ LIST_TYPE(_starpu_worker,
 	unsigned run_by_starpu; /* Is this run by StarPU or directly by the application ? */
 
 	struct _starpu_sched_ctx_list *sched_ctx_list;
-	struct _starpu_sched_ctx_list *tmp_sched_ctx_list;
+	int tmp_sched_ctx;
 	unsigned nsched_ctxs; /* the no of contexts a worker belongs to*/
 	struct _starpu_barrier_counter tasks_barrier; /* wait for the tasks submitted */
 

+ 22 - 18
src/drivers/driver_common/driver_common.c

@@ -195,54 +195,58 @@ static void _starpu_exponential_backoff(struct _starpu_worker *args)
 /* Workers may block when there is no work to do at all. */
 struct starpu_task *_starpu_get_worker_task(struct _starpu_worker *args, int workerid, unsigned memnode)
 {
+	STARPU_PTHREAD_MUTEX_LOCK(&args->sched_mutex);
 	struct starpu_task *task;
 	unsigned needed = 1;
 	while(needed)
 	{
+		struct _starpu_sched_ctx *sched_ctx = NULL;
 		struct _starpu_sched_ctx_list *l = NULL;
 		for (l = args->sched_ctx_list; l; l = l->next)
 		{
-			struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(l->sched_ctx);
+			sched_ctx = _starpu_get_sched_ctx_struct(l->sched_ctx);
 			if(sched_ctx && sched_ctx->id > 0 && sched_ctx->id < STARPU_NMAX_SCHED_CTXS)
 			{
 				STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->parallel_sect_mutex[workerid]);
 				if(sched_ctx->parallel_sect[workerid])
 				{
+					/* don't let the worker sleep with the sched_mutex taken */
+					/* we need it until here bc of the list of ctxs of the workers
+					   that can change in another thread */
+					STARPU_PTHREAD_MUTEX_UNLOCK(&args->sched_mutex);
 					needed = 0;
 					_starpu_sched_ctx_signal_worker_blocked(sched_ctx->id, workerid);
 					STARPU_PTHREAD_COND_WAIT(&sched_ctx->parallel_sect_cond[workerid], &sched_ctx->parallel_sect_mutex[workerid]);
 					_starpu_sched_ctx_signal_worker_woke_up(sched_ctx->id, workerid);
 					sched_ctx->parallel_sect[workerid] = 0;
+					STARPU_PTHREAD_MUTEX_LOCK(&args->sched_mutex);
 				}
 				STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->parallel_sect_mutex[workerid]);
 			}
 			if(!needed)
 				break;
 		}
-
-		for (l = args->tmp_sched_ctx_list; l; l = l->next)
+		/* don't worry if the value is not correct (no lock) it will do it next time */
+		if(args->tmp_sched_ctx != -1)
 		{
-			struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(l->sched_ctx);
-			if(sched_ctx && sched_ctx->id > 0 && sched_ctx->id < STARPU_NMAX_SCHED_CTXS)
+			sched_ctx = _starpu_get_sched_ctx_struct(args->tmp_sched_ctx);
+			STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->parallel_sect_mutex[workerid]);
+			if(sched_ctx->parallel_sect[workerid])
 			{
-				STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->parallel_sect_mutex[workerid]);
-				if(sched_ctx->parallel_sect[workerid])
-				{
-					needed = 0;
-					_starpu_sched_ctx_signal_worker_blocked(sched_ctx->id, workerid);
-					STARPU_PTHREAD_COND_WAIT(&sched_ctx->parallel_sect_cond[workerid], &sched_ctx->parallel_sect_mutex[workerid]);
-					_starpu_sched_ctx_signal_worker_woke_up(sched_ctx->id, workerid);
-					sched_ctx->parallel_sect[workerid] = 0;
-				}
-				STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->parallel_sect_mutex[workerid]);
+//				needed = 0;
+				STARPU_PTHREAD_MUTEX_UNLOCK(&args->sched_mutex);
+				_starpu_sched_ctx_signal_worker_blocked(sched_ctx->id, workerid);
+				STARPU_PTHREAD_COND_WAIT(&sched_ctx->parallel_sect_cond[workerid], &sched_ctx->parallel_sect_mutex[workerid]);
+				_starpu_sched_ctx_signal_worker_woke_up(sched_ctx->id, workerid);
+				sched_ctx->parallel_sect[workerid] = 0;
+				STARPU_PTHREAD_MUTEX_LOCK(&args->sched_mutex);
 			}
-			if(!needed)
-				break;
+			STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->parallel_sect_mutex[workerid]);
 		}
 
 		needed = !needed;
 	}
-	STARPU_PTHREAD_MUTEX_LOCK(&args->sched_mutex);
+
 	task = _starpu_pop_task(args);
 
 	if (task == NULL)