Browse Source

use pthread_key for the sched_ctxs

Andra Hugo 13 years ago
parent
commit
52bc102de1

+ 3 - 10
examples/cholesky_2ctxs/cholesky/cholesky_implicit.c

@@ -90,7 +90,6 @@ static double _cholesky(starpu_data_handle dataA, unsigned nblocks, unsigned sch
 				   STARPU_PRIORITY, prio_level,
 				   STARPU_RW, sdatakk,
 				   STARPU_CALLBACK, (k == 3*nblocks/4)?callback_turn_spmd_on:NULL,
-				   STARPU_CTX, sched_ctx,
 				   0);
 
 		for (j = k+1; j<nblocks; j++)
@@ -100,7 +99,6 @@ static double _cholesky(starpu_data_handle dataA, unsigned nblocks, unsigned sch
 					   STARPU_PRIORITY, (j == k+1)?prio_level:STARPU_DEFAULT_PRIO,
 					   STARPU_R, sdatakk,
 					   STARPU_RW, sdatakj,
-					   STARPU_CTX, sched_ctx,
 					   0);
 
 			for (i = k+1; i<nblocks; i++)
@@ -115,18 +113,14 @@ static double _cholesky(starpu_data_handle dataA, unsigned nblocks, unsigned sch
 							   STARPU_R, sdataki,
 							   STARPU_R, sdatakj,
 							   STARPU_RW, sdataij,
-							   STARPU_CTX, sched_ctx,
 							   0);
                                 }
 			}
 		}
 	}
 
-	if(sched_ctx != 0)
-		starpu_wait_for_all_tasks_of_sched_ctx(sched_ctx);
-	else
-		starpu_task_wait_for_all();
-
+	starpu_task_wait_for_all();
+		
 	starpu_data_unpartition(dataA, 0);
 
 	gettimeofday(&end, NULL);
@@ -214,8 +208,7 @@ double run_cholesky_implicit(unsigned sched_ctx, int start, int argc, char **arg
 		printf("\n");
 	}
 #endif
-	//	if(barrier != NULL)
-	//	  pthread_barrier_wait(barrier);
+//	starpu_set_sched_ctx(&sched_ctx);
 	double gflops = cholesky(mat, size, size, nblocks, sched_ctx, timing);
 
 #ifdef PRINT_OUTPUT

+ 4 - 2
examples/cholesky_2ctxs/cholesky_2ctxs.c

@@ -40,7 +40,7 @@ pthread_barrier_t barrier;
 
 void* func_cholesky(void *val){
   params *p = (params*)val;
-  unsigned sched_ctx = p->ctx;
+  unsigned *sched_ctx = &p->ctx;
   int the_other_ctx = p->the_other_ctx;
 
   int i;
@@ -48,9 +48,11 @@ void* func_cholesky(void *val){
   rv->flops = 0;
   rv->avg_timing = 0;
   double timing = 0;
+
+  starpu_set_sched_ctx(sched_ctx);
   for(i = 0; i < NSAMPLES; i++)
     {
-      rv->flops += run_cholesky_implicit(sched_ctx, p->start, p->argc, p->argv, &timing, &barrier);
+      rv->flops += run_cholesky_implicit(*sched_ctx, p->start, p->argc, p->argv, &timing, &barrier);
       rv->avg_timing += timing;
 
     }

+ 11 - 2
include/starpu_task.h

@@ -178,6 +178,10 @@ struct starpu_task {
 
 	/* Scheduling context */
 	unsigned sched_ctx;
+
+	/* flag to differentiate tasks needed by starpu management purposes 
+	 from the ones provided by the appl*/
+	unsigned specific_starpu;
 };
 
 /* It is possible to initialize statically allocated tasks with this value.
@@ -202,7 +206,8 @@ struct starpu_task {
 	.profiling_info = NULL,				\
 	.predicted = -1.0,				\
 	.starpu_private = NULL,				\
-	.sched_ctx = 0					\
+	.sched_ctx = 0,					\
+	.specific_starpu = 1				\
 };
 
 /*
@@ -264,8 +269,12 @@ struct starpu_task *starpu_task_create(void);
  * structure (default behaviour). Calling this function on a statically
  * allocated task results in an undefined behaviour. */
 void starpu_task_destroy(struct starpu_task *task);
+
 int starpu_task_submit(struct starpu_task *task);
-int starpu_task_submit_to_ctx(struct starpu_task *task, unsigned sched_ctx);
+	
+void starpu_set_sched_ctx(unsigned *sched_ctx);
+
+unsigned starpu_get_sched_ctx();
 
 /* This function blocks until the task was executed. It is not possible to
  * synchronize with a task more than once. It is not possible to wait

+ 24 - 4
src/core/sched_ctx.c

@@ -16,6 +16,9 @@
 
 #include <core/sched_policy.h>
 #include <core/sched_ctx.h>
+#include <common/utils.h>
+
+pthread_key_t sched_ctx_key;
 
 static unsigned _starpu_get_first_available_sched_ctx_id(struct starpu_machine_config_s *config);
 static unsigned _starpu_get_first_free_sched_ctx_in_worker_list(struct starpu_worker_s *worker);
@@ -183,7 +186,6 @@ struct starpu_sched_ctx*  _starpu_create_sched_ctx(const char *policy_name, int
 		struct starpu_worker_s *worker = _starpu_get_worker_struct(sched_ctx->workerid[i]);
 		worker->sched_ctx[_starpu_get_first_free_sched_ctx_in_worker_list(worker)] = sched_ctx;
 	      }
-	    //	    printf("ctx %d created\n", sched_ctx->sched_ctx_id);
 	  }
 
 	return sched_ctx;
@@ -195,7 +197,6 @@ unsigned starpu_create_sched_ctx(const char *policy_name, int *workerids_in_ctx,
 {
 	struct starpu_sched_ctx *sched_ctx = _starpu_create_sched_ctx(policy_name, workerids_in_ctx, nworkerids_in_ctx, 0, sched_name);
 	_starpu_update_workers(sched_ctx->workerid, sched_ctx->nworkers_in_ctx, -1, sched_ctx);
-	//	printf("ctx %d created\n", sched_ctx->sched_ctx_id);
 	return sched_ctx->sched_ctx_id;
 }
 
@@ -276,7 +277,6 @@ static void _starpu_add_workers_to_sched_ctx(int *new_workers, int nnew_workers,
 
 	_starpu_update_workers(added_workers, n_added_workers, -1, sched_ctx);
 
-	//	printf("ctx %d added workers\n", sched_ctx->sched_ctx_id);
         return;
 }
 
@@ -310,6 +310,7 @@ void _starpu_delete_all_sched_ctxs()
 	for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
 	  {
 		struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx(i);
+		_starpu_barrier_counter_destroy(&sched_ctx->tasks_barrier);
 		if(sched_ctx->sched_ctx_id != STARPU_NMAX_SCHED_CTXS)
 			free_sched_ctx_mem(sched_ctx);
 	  }
@@ -398,6 +399,8 @@ void starpu_remove_workers_from_sched_ctx(int *workerids_in_ctx, int nworkerids_
 /* unused sched_ctx have the id STARPU_NMAX_SCHED_CTXS */
 void _starpu_init_all_sched_ctx(struct starpu_machine_config_s *config)
 {
+	pthread_key_create(&sched_ctx_key, NULL);
+
 	unsigned i;
 	for(i = 0; i < STARPU_NMAX_SCHED_CTXS; i++)
 		config->sched_ctxs[i].sched_ctx_id = STARPU_NMAX_SCHED_CTXS;
@@ -542,7 +545,6 @@ void _starpu_increment_nsubmitted_tasks_of_sched_ctx(struct starpu_sched_ctx *sc
 	_starpu_barrier_counter_increment(&sched_ctx->tasks_barrier);
 }
 
-
 int _starpu_get_index_in_ctx_of_workerid(unsigned sched_ctx_id, unsigned workerid)
 {
 	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx(sched_ctx_id);
@@ -568,3 +570,21 @@ pthread_cond_t *_starpu_get_sched_cond(struct starpu_sched_ctx *sched_ctx, int w
 	int workerid_ctx = _starpu_get_index_in_ctx_of_workerid(sched_ctx->sched_ctx_id, worker);
 	return (workerid_ctx == -1 ? NULL : sched_ctx->sched_cond[workerid_ctx]);
 }
+
+void starpu_set_sched_ctx(unsigned *sched_ctx)
+{
+	pthread_setspecific(sched_ctx_key, (void*)sched_ctx);
+}
+
+unsigned starpu_get_sched_ctx()
+{
+	unsigned sched_ctx = *(unsigned*)pthread_getspecific(sched_ctx_key);
+	STARPU_ASSERT(sched_ctx >= 0 && sched_ctx < STARPU_NMAX_SCHED_CTXS);
+	return sched_ctx;
+}
+
+unsigned _starpu_get_nsched_ctxs()
+{
+	struct starpu_machine_config_s *config = (struct starpu_machine_config_s *)_starpu_get_machine_config();
+	return config->topology.nsched_ctxs;
+}

+ 3 - 0
src/core/sched_ctx.h

@@ -91,4 +91,7 @@ pthread_mutex_t *_starpu_get_sched_mutex(struct starpu_sched_ctx *sched_ctx, int
 
 /* Get the cond corresponding to the global workerid */
 pthread_cond_t *_starpu_get_sched_cond(struct starpu_sched_ctx *sched_ctx, int worker);
+
+/* Get the total number of sched_ctxs created till now */
+unsigned _starpu_get_nsched_ctxs();
 #endif // __SCHED_CONTEXT_H__

+ 19 - 16
src/core/task.c

@@ -81,6 +81,8 @@ void starpu_task_init(struct starpu_task *task)
 	task->starpu_private = NULL;
 
 	task->sched_ctx = _starpu_get_initial_sched_ctx()->sched_ctx_id;
+	
+	task->specific_starpu = 1;
 }
 
 /* Free all the ressources allocated for a task, without deallocating the task
@@ -226,10 +228,13 @@ int _starpu_submit_job(starpu_job_t j, unsigned do_not_increment_nsubmitted)
 }
 
 /* application should submit new tasks to StarPU through this function */
-int starpu_task_submit_to_ctx(struct starpu_task *task, unsigned sched_ctx)
+int starpu_task_submit(struct starpu_task *task)
 {
-	task->sched_ctx = sched_ctx;
-	  
+	unsigned nsched_ctxs = _starpu_get_nsched_ctxs();
+
+	task->sched_ctx = nsched_ctxs == 1 || task->specific_starpu ? 
+		0 : starpu_get_sched_ctx();
+
 	int ret;
 	unsigned is_sync = task->synchronous;
         _STARPU_LOG_IN();
@@ -308,11 +313,6 @@ int starpu_task_submit_to_ctx(struct starpu_task *task, unsigned sched_ctx)
 	return ret;
 }
 
-int starpu_task_submit(struct starpu_task *task){
-	struct starpu_sched_ctx *sched_ctx = _starpu_get_initial_sched_ctx();
-	return  starpu_task_submit_to_ctx(task, sched_ctx->sched_ctx_id);
-}
-
 void starpu_display_codelet_stats(struct starpu_codelet_t *cl)
 {
 	unsigned worker;
@@ -342,18 +342,21 @@ void starpu_display_codelet_stats(struct starpu_codelet_t *cl)
  */
 int starpu_task_wait_for_all(void)
 {
-	if (STARPU_UNLIKELY(!_starpu_worker_may_perform_blocking_calls()))
-		return -EDEADLK;
+	unsigned nsched_ctxs = _starpu_get_nsched_ctxs();
+	unsigned sched_ctx = nsched_ctxs == 1 ? 0 : starpu_get_sched_ctx();
+	starpu_wait_for_all_tasks_of_sched_ctx(sched_ctx);
 
-	PTHREAD_MUTEX_LOCK(&submitted_mutex);
+	/* if (STARPU_UNLIKELY(!_starpu_worker_may_perform_blocking_calls())) */
+	/* 	return -EDEADLK; */
 
-	STARPU_TRACE_TASK_WAIT_FOR_ALL;
+	/* PTHREAD_MUTEX_LOCK(&submitted_mutex); */
 
-	while (nsubmitted > 0)
-		PTHREAD_COND_WAIT(&submitted_cond, &submitted_mutex);
-	
-	PTHREAD_MUTEX_UNLOCK(&submitted_mutex);
+	/* STARPU_TRACE_TASK_WAIT_FOR_ALL; */
 
+	/* while (nsubmitted > 0) */
+	/* 	PTHREAD_COND_WAIT(&submitted_cond, &submitted_mutex); */
+	
+	/* PTHREAD_MUTEX_UNLOCK(&submitted_mutex); */
 	return 0;
 }
 

+ 4 - 2
src/core/workers.c

@@ -389,11 +389,13 @@ int starpu_init(struct starpu_conf *user_conf)
 
 	/* initialize the scheduling policy */
 
+	struct starpu_sched_ctx *sched_ctx;
 	if(user_conf == NULL)
-	  _starpu_create_sched_ctx(NULL, NULL, -1, 1, "init");
+		sched_ctx = _starpu_create_sched_ctx(NULL, NULL, -1, 1, "init");
 	else
-	  _starpu_create_sched_ctx(user_conf->sched_policy_name, NULL, -1, 1, "init");
+		sched_ctx = _starpu_create_sched_ctx(user_conf->sched_policy_name, NULL, -1, 1, "init");
 
+	starpu_set_sched_ctx(&sched_ctx->sched_ctx_id);
 	_starpu_initialize_registered_performance_models();
 
 	/* Launch "basic" workers (ie. non-combined workers) */

+ 3 - 2
src/drivers/cpu/driver_cpu.c

@@ -199,7 +199,7 @@ void *_starpu_cpu_worker(void *arg)
 
 		struct starpu_sched_ctx *local_sched_ctx = _starpu_get_sched_ctx(j->task->sched_ctx);
 
-        res = execute_job_on_cpu(j, cpu_arg, is_parallel_task, rank, perf_arch);
+		res = execute_job_on_cpu(j, cpu_arg, is_parallel_task, rank, perf_arch);
 
 		_starpu_set_current_task(NULL);
 
@@ -215,9 +215,10 @@ void *_starpu_cpu_worker(void *arg)
 
 		if (rank == 0){
 			_starpu_handle_job_termination(j, 0);
+		}
 			_starpu_decrement_nsubmitted_tasks_of_worker(cpu_arg->workerid);
 			_starpu_decrement_nsubmitted_tasks_of_sched_ctx(local_sched_ctx);
-		}
+
         }
 
 	STARPU_TRACE_WORKER_DEINIT_START

+ 1 - 0
src/util/starpu_insert_task.c

@@ -77,5 +77,6 @@ int starpu_insert_task(starpu_codelet *cl, ...)
 
 	va_start(varg_list, cl);
         struct starpu_task *task = starpu_task_create();
+	task->specific_starpu = 0;
         return _starpu_insert_task_create_and_submit(arg_buffer, cl, &task, varg_list);
 }

+ 1 - 7
src/util/starpu_insert_task_utils.c

@@ -145,7 +145,6 @@ int _starpu_pack_cl_args(size_t arg_buffer_size, char **arg_buffer, va_list varg
 int _starpu_insert_task_create_and_submit(char *arg_buffer, starpu_codelet *cl, struct starpu_task **task, va_list varg_list) {
         int arg_type;
 	unsigned current_buffer = 0;
-	unsigned ctx = 0;
 
 	struct insert_task_cb_wrapper *cl_arg_wrapper = (struct insert_task_cb_wrapper *) malloc(sizeof(struct insert_task_cb_wrapper));
 	STARPU_ASSERT(cl_arg_wrapper);
@@ -194,11 +193,6 @@ int _starpu_insert_task_create_and_submit(char *arg_buffer, starpu_codelet *cl,
 		else if (arg_type==STARPU_EXECUTE_ON_DATA) {
 			va_arg(varg_list, starpu_data_handle);
 		}
-
-		else if (arg_type==STARPU_CTX) {
-			ctx = va_arg(varg_list, unsigned);
-		}
-
 	}
 
 	va_end(varg_list);
@@ -213,7 +207,7 @@ int _starpu_insert_task_create_and_submit(char *arg_buffer, starpu_codelet *cl,
 	(*task)->callback_func = starpu_task_insert_callback_wrapper;
 	(*task)->callback_arg = cl_arg_wrapper;
 
-	 int ret = starpu_task_submit_to_ctx(*task, ctx);
+	 int ret = starpu_task_submit(*task);
 
 	if (STARPU_UNLIKELY(ret == -ENODEV))
           fprintf(stderr, "submission of task %p wih codelet %p failed (symbol `%s')\n", *task, (*task)->cl, ((*task)->cl->model && (*task)->cl->model->symbol)?(*task)->cl->model->symbol:"none");