瀏覽代碼

Fix parallel_code example, make sure it works correctly and trim it a little.

Terry Cojean 8 年之前
父節點
當前提交
dfc8872c1d
共有 3 個文件被更改,包括 66 次插入和 177 次删除
  1. 5 5
      examples/Makefile.am
  2. 23 151
      examples/sched_ctx/parallel_code.c
  3. 38 21
      src/core/sched_ctx.c

+ 5 - 5
examples/Makefile.am

@@ -336,11 +336,11 @@ STARPU_EXAMPLES +=				\
 	sched_ctx/nested_sched_ctxs		\
 	sched_ctx/sched_ctx_without_sched_policy_awake\
 	sched_ctx/parallel_tasks_reuse_handle
-# FIXME: does not compile any more due to missing starpu_sched_ctx_book/unbook_workers_for_task
-#if STARPU_LONG_CHECK
-#STARPU_EXAMPLES +=				\
-#	sched_ctx/parallel_code
-#endif
+
+if STARPU_LONG_CHECK
+STARPU_EXAMPLES +=				\
+	sched_ctx/parallel_code
+endif
 
 if STARPU_HAVE_HWLOC
 if STARPU_HWLOC_HAVE_TOPOLOGY_DUP

+ 23 - 151
examples/sched_ctx/parallel_code.c

@@ -19,13 +19,11 @@
 #include <omp.h>
 
 #ifdef STARPU_QUICK_CHECK
-#define NTASKS 64
+#define NTASKS 4
 #else
 #define NTASKS 10
 #endif
 
-int tasks_executed[2];
-
 int parallel_code(unsigned *sched_ctx)
 {
 	int i;
@@ -34,53 +32,36 @@ int parallel_code(unsigned *sched_ctx)
 	int ncpuids = 0;
 	starpu_sched_ctx_get_available_cpuids(*sched_ctx, &cpuids, &ncpuids);
 
-//	printf("execute task of %d threads \n", ncpuids);
-#pragma omp parallel num_threads(ncpuids)
+	/* printf("execute task of %d threads \n", ncpuids); */
+	omp_set_num_threads(ncpuids);
+#pragma omp parallel
 	{
 		starpu_sched_ctx_bind_current_thread_to_cpuid(cpuids[omp_get_thread_num()]);
-// 			printf("cpu = %d ctx%d nth = %d\n", sched_getcpu(), sched_ctx, omp_get_num_threads());
+			/* printf("cpu = %d ctx%d nth = %d\n", sched_getcpu(), *sched_ctx, omp_get_num_threads()); */
 #pragma omp for
 		for(i = 0; i < NTASKS; i++)
-			t++;
+		{
+#pragma omp atomic
+				t++;
+		}
 	}
 
 	free(cpuids);
-	tasks_executed[*sched_ctx-1] = t;
 	return t;
 }
 
-static void sched_ctx_func(void *descr[] STARPU_ATTRIBUTE_UNUSED, void *arg)
-{
-	int w = starpu_worker_get_id();
-	unsigned *sched_ctx = (unsigned*)arg;
-	int n = parallel_code(sched_ctx);
-	printf("w %d executed %d it \n", w, n);
-}
-
-
-static struct starpu_codelet sched_ctx_codelet =
-{
-	.cpu_funcs = {sched_ctx_func},
-	.model = NULL,
-	.nbuffers = 0,
-	.name = "sched_ctx"
-};
-
 void *th(void* p)
 {
 	unsigned* sched_ctx = (unsigned*)p;
-	tasks_executed[*sched_ctx-1] = 0;
-	//here the return of parallel code could be used (as a void*)
-	starpu_sched_ctx_exec_parallel_code((void*)parallel_code, p, *sched_ctx);
-	return &tasks_executed[*sched_ctx-1];
+	void* ret;
+	ret = starpu_sched_ctx_exec_parallel_code((void*)parallel_code, p, *sched_ctx);
+	pthread_exit(ret);
 }
 
 int main(int argc, char **argv)
 {
-	tasks_executed[0] = 0;
-	tasks_executed[1] = 0;
-	int ntasks = NTASKS;
-	int ret, j, k;
+	int ret;
+	void* tasks_executed;
 
 	ret = starpu_init(NULL);
 	if (ret == -ENODEV)
@@ -88,142 +69,33 @@ int main(int argc, char **argv)
 	STARPU_CHECK_RETURN_VALUE(ret, "starpu_init");
 
 	int nprocs1;
-	int nprocs2;
-	int *procs1, *procs2;
+	int *procs1;
 
 #ifdef STARPU_USE_CPU
 	unsigned ncpus =  starpu_cpu_worker_get_count();
 	procs1 = (int*)malloc(ncpus*sizeof(int));
-	procs2 = (int*)malloc(ncpus*sizeof(int));
 	starpu_worker_get_ids_by_type(STARPU_CPU_WORKER, procs1, ncpus);
-
-	nprocs1 = ncpus/2;
-	nprocs2 =  nprocs1;
-	k = 0;
-	for(j = nprocs1; j < nprocs1+nprocs2; j++)
-		procs2[k++] = j;
+	nprocs1 = ncpus;
 #else
 	nprocs1 = 1;
-	nprocs2 = 1;
 	procs1 = (int*)malloc(nprocs1*sizeof(int));
-	procs2 = (int*)malloc(nprocs2*sizeof(int));
-	procs1[0] = 0;
-	procs2[0] = 0;
 #endif
 
-	if (nprocs1 < 4)
-	{
-		/* Not enough procs */
-		free(procs1);
-		free(procs2);
-		starpu_shutdown();
-		return 77;
-	}
-
-	/*create contexts however you want*/
 	unsigned sched_ctx1 = starpu_sched_ctx_create(procs1, nprocs1, "ctx1", STARPU_SCHED_CTX_POLICY_NAME, "dmda", 0);
-	unsigned sched_ctx2 = starpu_sched_ctx_create(procs2, nprocs2, "ctx2", STARPU_SCHED_CTX_POLICY_NAME, "dmda", 0);
-
-	/*indicate what to do with the resources when context 2 finishes (it depends on your application)*/
-//	starpu_sched_ctx_set_inheritor(sched_ctx2, sched_ctx1);
-
-	int nprocs3 = nprocs1/2;
-	int nprocs4 = nprocs1/2;
-	int nprocs5 = nprocs2/2;
-	int nprocs6 = nprocs2/2;
-	int procs3[nprocs3];
-	int procs4[nprocs4];
-	int procs5[nprocs5];
-	int procs6[nprocs6];
-
-	k = 0;
-	for(j = 0; j < nprocs3; j++)
-		procs3[k++] = procs1[j];
-	k = 0;
-	for(j = nprocs3; j < nprocs3+nprocs4; j++)
-		procs4[k++] = procs1[j];
-
-	k = 0;
-	for(j = 0; j < nprocs5; j++)
-		procs5[k++] = procs2[j];
-	k = 0;
-	for(j = nprocs5; j < nprocs5+nprocs6; j++)
-		procs6[k++] = procs2[j];
-
-	int master3 = starpu_sched_ctx_book_workers_for_task(sched_ctx1, procs3, nprocs3);
-	int master4 = starpu_sched_ctx_book_workers_for_task(sched_ctx1, procs4, nprocs4);
-
-	int master5 = starpu_sched_ctx_book_workers_for_task(sched_ctx2, procs5, nprocs5);
-	int master6 = starpu_sched_ctx_book_workers_for_task(sched_ctx2, procs6, nprocs6);
-
-/* 	int master1 = starpu_sched_ctx_book_workers_for_task(procs1, nprocs1); */
-/* 	int master2 = starpu_sched_ctx_book_workers_for_task(procs2, nprocs2); */
-
-
-	int i;
-	for (i = 0; i < ntasks; i++)
-	{
-		struct starpu_task *task = starpu_task_create();
-
-		task->cl = &sched_ctx_codelet;
-		task->cl_arg = &sched_ctx1;
-
-		/*submit tasks to context*/
-		ret = starpu_task_submit_to_ctx(task,sched_ctx1);
-		if (ret == -ENODEV) goto enodev;
-
-		STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_submit");
-	}
-
-	for (i = 0; i < ntasks; i++)
-	{
-		struct starpu_task *task = starpu_task_create();
-
-		task->cl = &sched_ctx_codelet;
-		task->cl_arg = &sched_ctx2;
-
-		/*submit tasks to context*/
-		ret = starpu_task_submit_to_ctx(task,sched_ctx2);
-		if (ret == -ENODEV) goto enodev;
-
-		STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_submit");
-	}
-
-
-	/* tell starpu when you finished submitting tasks to this context
-	   in order to allow moving resources from this context to the inheritor one
-	   when its corresponding tasks finished executing */
-
-
-enodev:
-
-	/* wait for all tasks at the end*/
-	starpu_task_wait_for_all();
-
-/* 	starpu_sched_ctx_unbook_workers_for_task(sched_ctx1, master1); */
-/* 	starpu_sched_ctx_unbook_workers_for_task(sched_ctx2, master2); */
-
-	starpu_sched_ctx_unbook_workers_for_task(sched_ctx1, master3);
-	starpu_sched_ctx_unbook_workers_for_task(sched_ctx1, master4);
-
-	starpu_sched_ctx_unbook_workers_for_task(sched_ctx2, master5);
-	starpu_sched_ctx_unbook_workers_for_task(sched_ctx2, master6);
 
-	pthread_t mp[2];
-	pthread_create(&mp[0], NULL, th, &sched_ctx1);
-	pthread_create(&mp[1], NULL, th, &sched_ctx2);
+	/* This is the interesting part, we can launch a code to hijack the context and
+		 use its cores to do something else entirely thanks to this */
+	pthread_t mp;
+	pthread_create(&mp, NULL, th, &sched_ctx1);
 
-	pthread_join(mp[0], NULL);
-	pthread_join(mp[1], NULL);
+	pthread_join(mp, &tasks_executed);
 
+	/* Finished, delete the context and print the amount of executed tasks */
 	starpu_sched_ctx_delete(sched_ctx1);
-	starpu_sched_ctx_delete(sched_ctx2);
-	printf("ctx%u: tasks starpu executed %d out of %d\n", sched_ctx1, tasks_executed[0], NTASKS);
-	printf("ctx%u: tasks starpu executed %d out of %d\n", sched_ctx2, tasks_executed[1], NTASKS);
+	printf("ctx%u: tasks starpu executed %ld out of %d\n", sched_ctx1, (intptr_t)tasks_executed, NTASKS);
 	starpu_shutdown();
 
 	free(procs1);
-	free(procs2);
 
 	return (ret == -ENODEV ? 77 : 0);
 }

+ 38 - 21
src/core/sched_ctx.c

@@ -39,7 +39,8 @@ static int occupied_sms = 0;
 static unsigned _starpu_get_first_free_sched_ctx(struct _starpu_machine_config *config);
 
 static void _starpu_sched_ctx_put_new_master(unsigned sched_ctx_id);
-static void _starpu_sched_ctx_wake_up_workers(unsigned sched_ctx_id);
+static void _starpu_sched_ctx_put_workers_to_sleep(unsigned sched_ctx_id, unsigned all);
+static void _starpu_sched_ctx_wake_up_workers(unsigned sched_ctx_id, unsigned all);
 static void _starpu_sched_ctx_update_parallel_workers_with(unsigned sched_ctx_id);
 static void _starpu_sched_ctx_update_parallel_workers_without(unsigned sched_ctx_id);
 
@@ -1123,7 +1124,7 @@ void starpu_sched_ctx_delete(unsigned sched_ctx_id)
 	if(!_starpu_wait_for_all_tasks_of_sched_ctx(sched_ctx_id))
 	{
 		if(!sched_ctx->sched_policy)
-			_starpu_sched_ctx_wake_up_workers(sched_ctx_id);
+			_starpu_sched_ctx_wake_up_workers(sched_ctx_id, 0);
 		/*if btw the mutex release & the mutex lock the context has changed take care to free all
 		  scheduling data before deleting the context */
 		_starpu_update_workers_without_ctx(workerids, nworkers_ctx, sched_ctx_id, 1);
@@ -2359,18 +2360,23 @@ void _starpu_sched_ctx_signal_worker_woke_up(unsigned sched_ctx_id, int workerid
 	return;
 }
 
-static void _starpu_sched_ctx_put_workers_to_sleep(unsigned sched_ctx_id)
+static void _starpu_sched_ctx_put_workers_to_sleep(unsigned sched_ctx_id, unsigned all)
 {
 	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
 	int current_worker_id = starpu_worker_get_id();
-	int master = sched_ctx->main_master;
+	int master, temp_master = 0;
 	struct starpu_worker_collection *workers = sched_ctx->workers;
 	struct starpu_sched_ctx_iterator it;
 	unsigned sleeping[workers->nworkers];
 	int workers_count = 0;
 
-	if (master == -1)
-		return;
+	/* temporarily put a master if needed */
+	if (sched_ctx->main_master == -1)
+	{
+		_starpu_sched_ctx_put_new_master(sched_ctx_id);
+		temp_master = 1;
+	}
+	master = sched_ctx->main_master;
 
     workers->init_iterator(workers, &it);
     while(workers->has_next(workers, &it))
@@ -2379,7 +2385,7 @@ static void _starpu_sched_ctx_put_workers_to_sleep(unsigned sched_ctx_id)
 			sleeping[workers_count] = _worker_sleeping_in_other_ctx(sched_ctx_id, workerid);
 
 			if(starpu_worker_get_type(workerid) == STARPU_CPU_WORKER
-				 && !sched_ctx->parallel_sect[workerid] && workerid != master)
+				 && !sched_ctx->parallel_sect[workerid] && (workerid != master || all))
        {
             if (current_worker_id == -1 || workerid != current_worker_id)
             {
@@ -2397,7 +2403,7 @@ static void _starpu_sched_ctx_put_workers_to_sleep(unsigned sched_ctx_id)
     {
             int workerid = workers->get_next(workers, &it);
             if(starpu_worker_get_type(workerid) == STARPU_CPU_WORKER
-							 && workerid != master
+							 && (workerid != master || all)
                && (current_worker_id == -1 || workerid != current_worker_id)
                && !sleeping[workers_count])
             {
@@ -2406,26 +2412,34 @@ static void _starpu_sched_ctx_put_workers_to_sleep(unsigned sched_ctx_id)
 						workers_count++;
     }
 
+		if (temp_master)
+			sched_ctx->main_master = -1;
+
     return;
 }
 
-static void _starpu_sched_ctx_wake_up_workers(unsigned sched_ctx_id)
+static void _starpu_sched_ctx_wake_up_workers(unsigned sched_ctx_id, unsigned all)
 {
 	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
 	int current_worker_id = starpu_worker_get_id();
-	int master = sched_ctx->main_master;
+	int master, temp_master = 0;
 	struct starpu_worker_collection *workers = sched_ctx->workers;
 	struct starpu_sched_ctx_iterator it;
 
-	if (master == -1)
-		return;
+	/* temporarily put a master if needed */
+	if (sched_ctx->main_master == -1)
+	{
+		_starpu_sched_ctx_put_new_master(sched_ctx_id);
+		temp_master = 1;
+	}
+	master = sched_ctx->main_master;
 
 	workers->init_iterator(workers, &it);
 	while(workers->has_next(workers, &it))
 	{
 		int workerid = workers->get_next(workers, &it);
 		if(starpu_worker_get_type(workerid) == STARPU_CPU_WORKER
-			 && sched_ctx->parallel_sect[workerid] && workerid != master)
+			 && sched_ctx->parallel_sect[workerid] && (workerid != master || all))
 		{
 			if((current_worker_id == -1 || workerid != current_worker_id) && sched_ctx->sleeping[workerid])
 			{
@@ -2439,19 +2453,22 @@ static void _starpu_sched_ctx_wake_up_workers(unsigned sched_ctx_id)
 		}
 	}
 
+	if (temp_master)
+		sched_ctx->main_master = -1;
+
 	return;
 }
 
 void* starpu_sched_ctx_exec_parallel_code(void* (*func)(void*), void* param, unsigned sched_ctx_id)
 {
-    _starpu_sched_ctx_put_workers_to_sleep(sched_ctx_id);
+	_starpu_sched_ctx_put_workers_to_sleep(sched_ctx_id, 1);
 
-    /* execute parallel code */
-    void* ret = func(param);
+	/* execute parallel code */
+	void* ret = func(param);
 
-    /* wake up starpu workers */
-    _starpu_sched_ctx_wake_up_workers(sched_ctx_id);
-    return ret;
+	/* wake up starpu workers */
+	_starpu_sched_ctx_wake_up_workers(sched_ctx_id, 1);
+	return ret;
 }
 
 static void _starpu_sched_ctx_update_parallel_workers_with(unsigned sched_ctx_id)
@@ -2466,7 +2483,7 @@ static void _starpu_sched_ctx_update_parallel_workers_with(unsigned sched_ctx_id
 
 	if(!sched_ctx->awake_workers)
 	{
-		_starpu_sched_ctx_put_workers_to_sleep(sched_ctx_id);
+		_starpu_sched_ctx_put_workers_to_sleep(sched_ctx_id, 0);
 	}
 }
 
@@ -2482,7 +2499,7 @@ static void _starpu_sched_ctx_update_parallel_workers_without(unsigned sched_ctx
 
 	if(!sched_ctx->awake_workers)
 	{
-		_starpu_sched_ctx_wake_up_workers(sched_ctx_id);
+		_starpu_sched_ctx_wake_up_workers(sched_ctx_id, 0);
 	}
 }