Prechádzať zdrojové kódy

fix for the synchronisation between deleting ctx and starpu_shutdown

Andra Hugo 12 rokov pred
rodič
commit
8598fe2043

+ 34 - 17
examples/sched_ctx/sched_ctx.c

@@ -56,15 +56,18 @@ int main(int argc, char **argv)
 
 	pthread_mutex_init(&mut, NULL);
 
-    int cpus[9];
-    int ncpus = starpu_worker_get_ids_by_type(STARPU_CPU_WORKER, cpus, 9);
+	unsigned ncpus =  starpu_cpu_worker_get_count();
+	unsigned ncuda = starpu_cuda_worker_get_count();
 
-    int cudadevs[3];
-    int ncuda = starpu_worker_get_ids_by_type(STARPU_CUDA_WORKER, cudadevs, 3);
+    int cpus[ncpus];
+    starpu_worker_get_ids_by_type(STARPU_CPU_WORKER, cpus, ncpus);
 
+    int cudadevs[ncuda];
+    starpu_worker_get_ids_by_type(STARPU_CUDA_WORKER, cudadevs, ncuda);
 
-	int nprocs1 = 9;
-	int nprocs2 = 3;
+
+	int nprocs1 = ncpus;
+	int nprocs2 = ncuda;
 
 	int procs1[nprocs1];
 	int procs2[nprocs2];
@@ -74,35 +77,49 @@ int main(int argc, char **argv)
 	{
 		if(k < ncpus)
 			procs1[k] = cpus[k];
-		else
-			procs1[k] = cudadevs[k-ncpus];
 	}
 
-	int j = 0;
-	for(k = nprocs1; k < nprocs1+nprocs2; k++)
-		procs2[j++] = cudadevs[k-ncpus];
+	for(k = 0; k < nprocs2; k++)
+	{
+		procs2[k] = cudadevs[k];
+	}
 
 
 	unsigned sched_ctx1 = starpu_create_sched_ctx("heft", procs1, nprocs1, "ctx1");
 	unsigned sched_ctx2 = starpu_create_sched_ctx("heft", procs2, nprocs2, "ctx2");
 
-//	starpu_set_sched_ctx(NULL);
+	starpu_sched_ctx_set_inheritor(sched_ctx2, sched_ctx1);
+
 	unsigned i;
-	for (i = 0; i < ntasks; i++)
+	for (i = 0; i < ntasks/2; i++)
+	{
+		struct starpu_task *task = starpu_task_create();
+	
+		task->cl = &sched_ctx_codelet;
+		task->cl_arg = NULL;
+	
+		ret = starpu_task_submit_to_ctx(task,sched_ctx1);
+			
+		STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_submit");
+	}
+
+	starpu_sched_ctx_finished_submit(sched_ctx1);
+
+	for (i = 0; i < ntasks/2; i++)
 	{
 		struct starpu_task *task = starpu_task_create();
 	
 		task->cl = &sched_ctx_codelet;
 		task->cl_arg = NULL;
 	
-		if(i % 2 == 0)
-			ret = starpu_task_submit_to_ctx(task,sched_ctx1);
-		else
-			ret = starpu_task_submit_to_ctx(task,sched_ctx2);
+		ret = starpu_task_submit_to_ctx(task,sched_ctx2);
 
 		STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_submit");
 	}
 
+
+	starpu_sched_ctx_finished_submit(sched_ctx2);
+
 	starpu_task_wait_for_all();
 
 	printf("tasks executed %d out of %d\n", tasks_executed, ntasks);

+ 4 - 0
include/starpu_sched_ctx.h

@@ -121,4 +121,8 @@ double starpu_get_max_time_worker_on_ctx(void);
 
 void starpu_stop_task_submission(void);
 
+void starpu_sched_ctx_set_inheritor(unsigned sched_ctx, unsigned inheritor);
+
+void starpu_sched_ctx_finished_submit(unsigned sched_ctx_id);
+
 #endif /* __STARPU_SCHED_CTX_H__ */

+ 5 - 1
src/common/barrier_counter.c

@@ -58,13 +58,17 @@ int _starpu_barrier_counter_wait_for_full_counter(struct _starpu_barrier_counter
 int _starpu_barrier_counter_decrement_until_empty_counter(struct _starpu_barrier_counter *barrier_c)
 {
 	struct _starpu_barrier *barrier = &barrier_c->barrier;
+	int ret = 0;
 	_STARPU_PTHREAD_MUTEX_LOCK(&barrier->mutex);
 
 	if (--barrier->reached_start == 0)
+	{
+		ret = 1;
 		_STARPU_PTHREAD_COND_BROADCAST(&barrier->cond);
+	}
 
 	_STARPU_PTHREAD_MUTEX_UNLOCK(&barrier->mutex);
-	return 0;
+	return ret;
 }
 
 int _starpu_barrier_counter_increment_until_full_counter(struct _starpu_barrier_counter *barrier_c)

+ 34 - 1
src/core/sched_ctx.c

@@ -20,6 +20,7 @@
 
 extern struct worker_collection worker_list;
 static _starpu_pthread_mutex_t sched_ctx_manag = PTHREAD_MUTEX_INITIALIZER;
+static _starpu_pthread_mutex_t finished_submit_mutex = PTHREAD_MUTEX_INITIALIZER;
 struct starpu_task stop_submission_task = STARPU_TASK_INITIALIZER;
 pthread_key_t sched_ctx_key;
 unsigned with_hypervisor = 0;
@@ -231,6 +232,8 @@ struct _starpu_sched_ctx*  _starpu_create_sched_ctx(const char *policy_name, int
 	sched_ctx->sched_policy = (struct starpu_sched_policy*)malloc(sizeof(struct starpu_sched_policy));
 	sched_ctx->is_initial_sched = is_initial_sched;
 	sched_ctx->name = sched_name;
+	sched_ctx->inheritor = STARPU_NMAX_SCHED_CTXS;
+	sched_ctx->finished_submit = 0;
 
 	_starpu_barrier_counter_init(&sched_ctx->tasks_barrier, 0);
 
@@ -481,7 +484,9 @@ void starpu_delete_sched_ctx(unsigned sched_ctx_id, unsigned inheritor_sched_ctx
 	}
 
 	if(!_starpu_wait_for_all_tasks_of_sched_ctx(sched_ctx_id) && !_starpu_wait_for_all_tasks_of_sched_ctx(0))
+	{
 		_starpu_delete_sched_ctx(sched_ctx);
+	}
 	return;	
 }
 
@@ -665,7 +670,19 @@ int _starpu_wait_for_all_tasks_of_sched_ctx(unsigned sched_ctx_id)
 void _starpu_decrement_nsubmitted_tasks_of_sched_ctx(unsigned sched_ctx_id)
 {
 	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
-	_starpu_barrier_counter_decrement_until_empty_counter(&sched_ctx->tasks_barrier);
+	int finished = _starpu_barrier_counter_decrement_until_empty_counter(&sched_ctx->tasks_barrier);
+	if(finished && sched_ctx->inheritor != STARPU_NMAX_SCHED_CTXS)
+	{
+		_STARPU_PTHREAD_MUTEX_LOCK(&finished_submit_mutex);
+		if(sched_ctx->finished_submit)
+		{
+			_STARPU_PTHREAD_MUTEX_UNLOCK(&finished_submit_mutex);
+			starpu_delete_sched_ctx(sched_ctx_id, sched_ctx->inheritor);
+			return;
+		}
+		_STARPU_PTHREAD_MUTEX_UNLOCK(&finished_submit_mutex);
+	}
+	return;
 }
 
 void _starpu_increment_nsubmitted_tasks_of_sched_ctx(unsigned sched_ctx_id)
@@ -958,4 +975,20 @@ void starpu_call_pushed_task_cb(int workerid, unsigned sched_ctx_id)
 		sched_ctx->perf_counters->notify_pushed_task(sched_ctx_id, workerid);
 }
 
+void starpu_sched_ctx_set_inheritor(unsigned sched_ctx_id, unsigned inheritor)
+{
+	STARPU_ASSERT(inheritor < STARPU_NMAX_SCHED_CTXS);
+	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
+	sched_ctx->inheritor = inheritor;
+	return;
+}
+
+void starpu_sched_ctx_finished_submit(unsigned sched_ctx_id)
+{
+	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
+	_STARPU_PTHREAD_MUTEX_LOCK(&finished_submit_mutex);
+	sched_ctx->finished_submit = 1;
+	_STARPU_PTHREAD_MUTEX_UNLOCK(&finished_submit_mutex);
+	return;
+}
 #endif //STARPU_USE_SCHED_CTX_HYPERVISOR

+ 7 - 0
src/core/sched_ctx.h

@@ -86,6 +86,13 @@ struct _starpu_sched_ctx {
 	   determine which is the next context to pop tasks from */
 	unsigned pop_counter[STARPU_NMAXWORKERS];
 
+	/* in case we delete the context leave resources to the inheritor*/
+	unsigned inheritor;
+
+	/* indicates whether the application finished submitting tasks
+	   to this context*/
+	unsigned finished_submit;
+
 #ifdef STARPU_USE_SCHED_CTX_HYPERVISOR
 	/* a structure containing a series of performance counters determining the resize procedure */
 	struct starpu_performance_counters *perf_counters;

+ 2 - 2
src/core/task.c

@@ -374,7 +374,7 @@ int starpu_task_submit(struct starpu_task *task)
 	int ret;
 	unsigned is_sync = task->synchronous;
 	starpu_task_bundle_t bundle = task->bundle;
-        _STARPU_LOG_IN();
+	_STARPU_LOG_IN();
 
 	if (is_sync)
 	{
@@ -662,7 +662,7 @@ int starpu_task_wait_for_all(void)
 {
 	unsigned nsched_ctxs = _starpu_get_nsched_ctxs();
 	unsigned sched_ctx = nsched_ctxs == 1 ? 0 : starpu_get_sched_ctx();
-	
+
 	/* if there is no indication about which context to wait,
 	   we wait for all tasks submitted to starpu */
 	if(sched_ctx == STARPU_NMAX_SCHED_CTXS)