Browse Source

fixed policies and completly remove the fields: mutex & cond of sched_ctx

Andra Hugo 12 years ago
parent
commit
dbc4b33af8

+ 2 - 26
examples/scheduler/dummy_sched.c

@@ -27,30 +27,6 @@ typedef struct dummy_sched_data {
 	pthread_cond_t sched_cond;
 } dummy_sched_data;
 
-static void dummy_sched_add_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
-{
-	struct dummy_sched_data *data = (struct dummy_sched_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
-	
-	unsigned i;
-	int workerid;
-	for(i = 0; i < nworkers; i++)
-	{
-		workerid = workerids[i];
-		starpu_sched_ctx_set_worker_mutex_and_cond(sched_ctx_id, workerid, &data->sched_mutex,  &data->sched_cond);
-	}
-}
-
-static void dummy_sched_remove_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
-{
-	unsigned i;
-	int workerid;
-	for(i = 0; i < nworkers; i++)
-	{
-		workerid = workerids[i];
-		starpu_sched_ctx_set_worker_mutex_and_cond(sched_ctx_id, workerid, NULL,  NULL);
-	}
-}
-
 static void init_dummy_sched(unsigned sched_ctx_id)
 {
 	starpu_sched_ctx_create_worker_collection(sched_ctx_id, WORKER_LIST);
@@ -116,8 +92,8 @@ static struct starpu_task *pop_task_dummy(unsigned sched_ctx_id)
 static struct starpu_sched_policy dummy_sched_policy =
 {
 	.init_sched = init_dummy_sched,
-	.add_workers = dummy_sched_add_workers,
-	.remove_workers = dummy_sched_remove_workers,
+	.add_workers = NULL,
+	.remove_workers = NULL,
 	.deinit_sched = deinit_dummy_sched,
 	.push_task = push_task_dummy,
 	.pop_task = pop_task_dummy,

+ 0 - 21
include/starpu_sched_ctx.h

@@ -95,27 +95,6 @@ void starpu_sched_ctx_set_policy_data(unsigned sched_ctx_id, void *policy_data);
 
 void* starpu_sched_ctx_get_policy_data(unsigned sched_ctx_id);
 
-/* When there is no available task for a worker, StarPU blocks this worker on a
-condition variable. This function specifies which condition variable (and the
-associated mutex) should be used to block (and to wake up) a worker. Note that
-multiple workers may use the same condition variable. For instance, in the case
-of a scheduling strategy with a single task queue, the same condition variable
-would be used to block and wake up all workers.  The initialization method of a
-scheduling strategy (init_sched) must call this function once per worker. */
-#if !defined(_MSC_VER) && !defined(STARPU_SIMGRID)
-#ifdef STARPU_DEVEL
-#warning do we really need both starpu_sched_ctx_set_worker_mutex_and_cond and starpu_sched_ctx_init_worker_mutex_and_cond functions
-#endif
-
-void starpu_sched_ctx_set_worker_mutex_and_cond(unsigned sched_ctx_id, int workerid, pthread_mutex_t *sched_mutex, pthread_cond_t *sched_cond);
-
-void starpu_sched_ctx_get_worker_mutex_and_cond(unsigned sched_ctx_id, int workerid, pthread_mutex_t **sched_mutex, pthread_cond_t **sched_cond);
-#endif
-
-void starpu_sched_ctx_init_worker_mutex_and_cond(unsigned sched_ctx_id, int workerid);
-
-void starpu_sched_ctx_deinit_worker_mutex_and_cond(unsigned sched_ctx_id, int workerid);
-
 struct starpu_sched_ctx_worker_collection* starpu_sched_ctx_create_worker_collection(unsigned sched_ctx_id, int type);
 
 void starpu_sched_ctx_delete_worker_collection(unsigned sched_ctx_id);

+ 0 - 63
src/core/sched_ctx.c

@@ -229,11 +229,6 @@ struct _starpu_sched_ctx*  _starpu_create_sched_ctx(const char *policy_name, int
 	sched_ctx->finished_submit = 0;
 
 	_starpu_barrier_counter_init(&sched_ctx->tasks_barrier, 0);
-
-	/* initialise all sync structures bc the number of workers can modify */
-	sched_ctx->sched_mutex = (_starpu_pthread_mutex_t**)malloc(STARPU_NMAXWORKERS * sizeof(_starpu_pthread_mutex_t*));
-	sched_ctx->sched_cond = (_starpu_pthread_cond_t**)malloc(STARPU_NMAXWORKERS * sizeof(_starpu_pthread_cond_t*));
-
 	
 	/*init the strategy structs and the worker_collection of the ressources of the context */
 	_starpu_init_sched_policy(config, sched_ctx, policy_name);
@@ -444,12 +439,7 @@ static void _starpu_delete_sched_ctx(struct _starpu_sched_ctx *sched_ctx)
 	STARPU_ASSERT(sched_ctx->id != STARPU_NMAX_SCHED_CTXS);
 	_starpu_deinit_sched_policy(sched_ctx);		
 	free(sched_ctx->sched_policy);
-	free(sched_ctx->sched_mutex);
-	free(sched_ctx->sched_cond);
-
 	sched_ctx->sched_policy = NULL;
-	sched_ctx->sched_mutex = NULL;
-	sched_ctx->sched_cond = NULL;
 
 	_STARPU_PTHREAD_MUTEX_DESTROY(&sched_ctx->empty_ctx_mutex);
 	sched_ctx->id = STARPU_NMAX_SCHED_CTXS;
@@ -771,59 +761,6 @@ void* starpu_sched_ctx_get_policy_data(unsigned sched_ctx_id)
 	return sched_ctx->policy_data;
 }
 
-_starpu_pthread_mutex_t *_starpu_get_sched_mutex(struct _starpu_sched_ctx *sched_ctx, int workerid)
-{
-	if(sched_ctx->sched_mutex)
-		return sched_ctx->sched_mutex[workerid];
-	else
-		return NULL;
-}
-
-void starpu_sched_ctx_set_worker_mutex_and_cond(unsigned sched_ctx_id, int workerid, _starpu_pthread_mutex_t *sched_mutex, _starpu_pthread_cond_t *sched_cond)
-{
-	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
-	if(sched_ctx->sched_mutex && sched_ctx->sched_cond)
-	{
-		sched_ctx->sched_mutex[workerid] = sched_mutex;
-		sched_ctx->sched_cond[workerid] = sched_cond;
-	}
-}
-
-void starpu_sched_ctx_get_worker_mutex_and_cond(unsigned sched_ctx_id, int workerid, _starpu_pthread_mutex_t **sched_mutex, _starpu_pthread_cond_t **sched_cond)
-{
-	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
-	*sched_mutex = sched_ctx->sched_mutex[workerid];
-	*sched_cond = sched_ctx->sched_cond[workerid];
-
-	/* the tasks concerning changings of the the ctxs were not executed in order */
-	if(!*sched_mutex)
-	{
-		struct _starpu_worker *workerarg = _starpu_get_worker_struct(workerid);
-		*sched_mutex = &workerarg->sched_mutex;
-		*sched_cond = &workerarg->sched_cond;
-		starpu_sched_ctx_set_worker_mutex_and_cond(sched_ctx_id, workerid, *sched_mutex, *sched_cond);
-	}
-
-}
-
-void starpu_sched_ctx_init_worker_mutex_and_cond(unsigned sched_ctx_id, int workerid)
-{
-	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
-	sched_ctx->sched_mutex[workerid] = (_starpu_pthread_mutex_t*)malloc(sizeof(_starpu_pthread_mutex_t));
-	sched_ctx->sched_cond[workerid] = (_starpu_pthread_cond_t*)malloc(sizeof(_starpu_pthread_cond_t));
-	_STARPU_PTHREAD_MUTEX_INIT(sched_ctx->sched_mutex[workerid], NULL);
-	_STARPU_PTHREAD_COND_INIT(sched_ctx->sched_cond[workerid], NULL);
-}
-
-void starpu_sched_ctx_deinit_worker_mutex_and_cond(unsigned sched_ctx_id, int workerid)
-{
-	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
-	_STARPU_PTHREAD_MUTEX_DESTROY(sched_ctx->sched_mutex[workerid]);
-	_STARPU_PTHREAD_COND_DESTROY(sched_ctx->sched_cond[workerid]);
-	free(sched_ctx->sched_mutex[workerid]);
-	free(sched_ctx->sched_cond[workerid]);
-}
-
 struct starpu_sched_ctx_worker_collection* starpu_sched_ctx_create_worker_collection(unsigned sched_ctx_id, int worker_collection_type)
 {
 	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);

+ 0 - 6
src/core/sched_ctx.h

@@ -54,12 +54,6 @@ struct _starpu_sched_ctx
 	/* wait for the tasks submitted to the context to be executed */
 	struct _starpu_barrier_counter tasks_barrier;
 
-	/* table of sched cond corresponding to each worker in this ctx */
-	_starpu_pthread_cond_t **sched_cond;
-
-	/* table of sched mutex corresponding to each worker in this ctx */
-	_starpu_pthread_mutex_t **sched_mutex;
-
 	/* cond to block push when there are no workers in the ctx */
 	_starpu_pthread_cond_t no_workers_cond;
 

+ 0 - 6
src/core/workers.h

@@ -246,12 +246,6 @@ void _starpu_worker_set_status(int workerid, enum _starpu_worker_status status);
 /* TODO move */
 unsigned _starpu_execute_registered_progression_hooks(void);
 
-#if defined(_MSC_VER) || defined(STARPU_SIMGRID)
-void starpu_sched_ctx_set_worker_mutex_and_cond(unsigned sched_ctx_id, int workerid, _starpu_pthread_mutex_t *sched_mutex, _starpu_pthread_cond_t *sched_cond);
-
-void starpu_sched_ctx_get_worker_mutex_and_cond(unsigned sched_ctx_id, int workerid, _starpu_pthread_mutex_t **sched_mutex, _starpu_pthread_cond_t **sched_cond);
-#endif
-
 /* We keep an initial sched ctx which might be used in case no other ctx is available */
 struct _starpu_sched_ctx* _starpu_get_initial_sched_ctx(void);
 

+ 5 - 18
src/sched_policies/parallel_heft.c

@@ -74,8 +74,7 @@ static void parallel_heft_pre_exec_hook(struct starpu_task *task)
 
 	_starpu_pthread_mutex_t *sched_mutex;
 	_starpu_pthread_cond_t *sched_cond;
-	starpu_sched_ctx_get_worker_mutex_and_cond(sched_ctx_id, workerid, &sched_mutex, &sched_cond);
-
+	starpu_worker_get_sched_condition(workerid, &sched_mutex, &sched_cond);
 	/* Once we have executed the task, we can update the predicted amount
 	 * of work. */
 	_STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
@@ -109,7 +108,7 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 		task->predicted_transfer = 0;
 		_starpu_pthread_mutex_t *sched_mutex;
 		_starpu_pthread_cond_t *sched_cond;
-		starpu_sched_ctx_get_worker_mutex_and_cond(sched_ctx_id, best_workerid, &sched_mutex, &sched_cond);
+		starpu_worker_get_sched_condition(best_workerid, &sched_mutex, &sched_cond);
 
 		_STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
 		worker_exp_len[best_workerid] += task->predicted;
@@ -163,7 +162,7 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 			alias->predicted_transfer = 0;
 			_starpu_pthread_mutex_t *sched_mutex;
 			_starpu_pthread_cond_t *sched_cond;
-			starpu_sched_ctx_get_worker_mutex_and_cond(sched_ctx_id, local_worker, &sched_mutex, &sched_cond);
+			starpu_worker_get_sched_condition(local_worker, &sched_mutex, &sched_cond);
 			_STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
 			worker_exp_len[local_worker] += alias->predicted;
 			worker_exp_end[local_worker] = exp_end_predicted;
@@ -286,7 +285,7 @@ static int _parallel_heft_push_task(struct starpu_task *task, unsigned prio, uns
 		{
 			_starpu_pthread_mutex_t *sched_mutex;
 			_starpu_pthread_cond_t *sched_cond;
-			starpu_sched_ctx_get_worker_mutex_and_cond(sched_ctx_id, worker, &sched_mutex, &sched_cond);
+			starpu_worker_get_sched_condition(worker, &sched_mutex, &sched_cond);
 			/* Sometimes workers didn't take the tasks as early as we expected */
 			_STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
 			worker_exp_start[worker] = STARPU_MAX(worker_exp_start[worker], starpu_timing_now());
@@ -502,8 +501,6 @@ static void parallel_heft_add_workers(unsigned sched_ctx_id, int *workerids, uns
 			ntasks[workerid] = 0;
 			workerarg->has_prev_init = 1;
 		}
-
-		starpu_sched_ctx_init_worker_mutex_and_cond(sched_ctx_id, workerid);
 	}
 	_starpu_sched_find_worker_combinations(workerids, nworkers);
 
@@ -536,16 +533,6 @@ static void parallel_heft_add_workers(unsigned sched_ctx_id, int *workerids, uns
 
 }
 
-static void parallel_heft_remove_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
-{
-	unsigned i;
-	int worker;
-	for(i = 0; i < nworkers; i++)
-	{
-		worker = workerids[i];
-		starpu_sched_ctx_deinit_worker_mutex_and_cond(sched_ctx_id, worker);
-	}
-}
 static void initialize_parallel_heft_policy(unsigned sched_ctx_id)
 {
 	starpu_sched_ctx_create_worker_collection(sched_ctx_id, WORKER_LIST);
@@ -591,7 +578,7 @@ struct starpu_sched_policy _starpu_sched_parallel_heft_policy =
 	.init_sched = initialize_parallel_heft_policy,
 	.deinit_sched = parallel_heft_deinit,
 	.add_workers = parallel_heft_add_workers,
-	.remove_workers = parallel_heft_remove_workers,
+	.remove_workers = NULL,
 	.push_task = parallel_heft_push_task,
 	.pop_task = NULL,
 	.pre_exec_hook = parallel_heft_pre_exec_hook,

+ 2 - 26
src/sched_policies/random_policy.c

@@ -101,30 +101,6 @@ static int random_push_task(struct starpu_task *task)
         return ret_val;
 }
 
-static void random_add_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
-{
-	unsigned i;
-	int workerid;
-	for (i = 0; i < nworkers; i++)
-	{
-		workerid = workerids[i];
-		struct _starpu_worker *workerarg = _starpu_get_worker_struct(workerid);
-		starpu_sched_ctx_set_worker_mutex_and_cond(sched_ctx_id, workerid, &workerarg->sched_mutex, &workerarg->sched_cond);
-	}
-}
-
-static void random_remove_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
-{
-	unsigned i;
-	int workerid;
-	for (i = 0; i < nworkers; i++)
-	{
-		workerid = workerids[i];
-		starpu_sched_ctx_set_worker_mutex_and_cond(sched_ctx_id, workerid, NULL, NULL);
-	}
-
-}
-
 static void initialize_random_policy(unsigned sched_ctx_id)
 {
 	starpu_sched_ctx_create_worker_collection(sched_ctx_id, WORKER_LIST);
@@ -139,8 +115,8 @@ static void deinitialize_random_policy(unsigned sched_ctx_id)
 struct starpu_sched_policy _starpu_sched_random_policy =
 {
 	.init_sched = initialize_random_policy,
-	.add_workers = random_add_workers,
-	.remove_workers = random_remove_workers,
+	.add_workers = NULL,
+	.remove_workers = NULL,
 	.deinit_sched = deinitialize_random_policy,
 	.push_task = random_push_task,
 	.pop_task = NULL,