Sfoglia il codice sorgente

after merge fixes: missing lines: in heft && worker.c

Andra Hugo 12 anni fa
parent
commit
5f259a5439

+ 1 - 1
examples/cholesky/cholesky_implicit.c

@@ -353,6 +353,6 @@ int main(int argc, char **argv)
 
 	starpu_helper_cublas_shutdown();
 	starpu_shutdown();
-
+	printf("af shutdown\n");
 	return ret;
 }

+ 39 - 14
src/core/sched_ctx.c

@@ -19,7 +19,7 @@
 #include <common/utils.h>
 
 extern struct worker_collection worker_list;
-static pthread_mutex_t sched_ctx_manag = PTHREAD_MUTEX_INITIALIZER;
+static _starpu_pthread_mutex_t sched_ctx_manag = PTHREAD_MUTEX_INITIALIZER;
 struct starpu_task stop_submission_task = STARPU_TASK_INITIALIZER;
 pthread_key_t sched_ctx_key;
 unsigned with_hypervisor = 0;
@@ -161,6 +161,30 @@ static void _starpu_remove_workers_from_sched_ctx(struct _starpu_sched_ctx *sche
 
 	int i = 0;
 
+
+	if(nworkers == -1)
+	{
+		int nrem_workers = 0;
+		int rem_workers[STARPU_NMAXWORKERS];
+
+		if(workers->init_cursor)
+			workers->init_cursor(workers);
+
+		int worker = -1;
+		while(workers->has_next(workers))
+		{
+			worker = workers->get_next(workers);
+			rem_workers[nrem_workers++] = worker;
+		}
+
+		if(workers->init_cursor)
+			workers->deinit_cursor(workers);
+
+		if(nrem_workers > 0)
+			sched_ctx->sched_policy->remove_workers(sched_ctx->id, rem_workers, nrem_workers);
+		return;
+	}
+
 	for(i = 0; i < nworkers; i++)
 	{
 		if(workers->nworkers > 0)
@@ -169,10 +193,10 @@ static void _starpu_remove_workers_from_sched_ctx(struct _starpu_sched_ctx *sche
 			if(worker >= 0)
 				removed_workers[(*n_removed_workers)++] = worker;
 		}
+		if(*n_removed_workers)
+			sched_ctx->sched_policy->remove_workers(sched_ctx->id, removed_workers, *n_removed_workers);
 	}
 
-	if(*n_removed_workers)
-		sched_ctx->sched_policy->remove_workers(sched_ctx->id, removed_workers, *n_removed_workers);
 	return;
 }
 
@@ -210,8 +234,8 @@ struct _starpu_sched_ctx*  _starpu_create_sched_ctx(const char *policy_name, int
 	_starpu_barrier_counter_init(&sched_ctx->tasks_barrier, 0);
 
 	/* initialise all sync structures bc the number of workers can modify */
-	sched_ctx->sched_mutex = (pthread_mutex_t**)malloc(STARPU_NMAXWORKERS * sizeof(pthread_mutex_t*));
-	sched_ctx->sched_cond = (pthread_cond_t**)malloc(STARPU_NMAXWORKERS * sizeof(pthread_cond_t*));
+	sched_ctx->sched_mutex = (_starpu_pthread_mutex_t**)malloc(STARPU_NMAXWORKERS * sizeof(_starpu_pthread_mutex_t*));
+	sched_ctx->sched_cond = (_starpu_pthread_cond_t**)malloc(STARPU_NMAXWORKERS * sizeof(_starpu_pthread_cond_t*));
 
 	
 	/*init the strategy structs and the worker_collection of the ressources of the context */
@@ -469,6 +493,7 @@ void _starpu_delete_all_sched_ctxs()
 		struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(i);
 		if(sched_ctx->id != STARPU_NMAX_SCHED_CTXS)
 		{
+			_starpu_remove_workers_from_sched_ctx(sched_ctx, NULL, -1, NULL, NULL);
 			_starpu_barrier_counter_destroy(&sched_ctx->tasks_barrier);
 			_starpu_delete_sched_ctx(sched_ctx);
 		}
@@ -690,15 +715,15 @@ void* starpu_get_sched_ctx_policy_data(unsigned sched_ctx_id)
 	return sched_ctx->policy_data;
 }
 
-pthread_mutex_t *_starpu_get_sched_mutex(struct _starpu_sched_ctx *sched_ctx, int workerid)
+_starpu_pthread_mutex_t *_starpu_get_sched_mutex(struct _starpu_sched_ctx *sched_ctx, int workerid)
 {
-        if(sched_ctx->sched_mutex)
-                return sched_ctx->sched_mutex[workerid];
+	if(sched_ctx->sched_mutex)
+		return sched_ctx->sched_mutex[workerid];
 	else
-                return NULL;
+		return NULL;
 }
 
-void starpu_worker_set_sched_condition(unsigned sched_ctx_id, int workerid, pthread_mutex_t *sched_mutex, pthread_cond_t *sched_cond)
+void starpu_worker_set_sched_condition(unsigned sched_ctx_id, int workerid, _starpu_pthread_mutex_t *sched_mutex, _starpu_pthread_cond_t *sched_cond)
 {
 	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
 	if(sched_ctx->sched_mutex && sched_ctx->sched_cond)
@@ -708,7 +733,7 @@ void starpu_worker_set_sched_condition(unsigned sched_ctx_id, int workerid, pthr
 	}
 }
 
-void starpu_worker_get_sched_condition(unsigned sched_ctx_id, int workerid, pthread_mutex_t **sched_mutex, pthread_cond_t **sched_cond)
+void starpu_worker_get_sched_condition(unsigned sched_ctx_id, int workerid, _starpu_pthread_mutex_t **sched_mutex, _starpu_pthread_cond_t **sched_cond)
 {
 	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
 	*sched_mutex = sched_ctx->sched_mutex[workerid];
@@ -728,8 +753,8 @@ void starpu_worker_get_sched_condition(unsigned sched_ctx_id, int workerid, pthr
 void starpu_worker_init_sched_condition(unsigned sched_ctx_id, int workerid)
 {
 	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
-	sched_ctx->sched_mutex[workerid] = (pthread_mutex_t*)malloc(sizeof(pthread_mutex_t));
-	sched_ctx->sched_cond[workerid] = (pthread_cond_t*)malloc(sizeof(pthread_cond_t));
+	sched_ctx->sched_mutex[workerid] = (_starpu_pthread_mutex_t*)malloc(sizeof(_starpu_pthread_mutex_t));
+	sched_ctx->sched_cond[workerid] = (_starpu_pthread_cond_t*)malloc(sizeof(_starpu_pthread_cond_t));
 	_STARPU_PTHREAD_MUTEX_INIT(sched_ctx->sched_mutex[workerid], NULL);
 	_STARPU_PTHREAD_COND_INIT(sched_ctx->sched_cond[workerid], NULL);
 }
@@ -805,7 +830,7 @@ int starpu_get_workers_of_sched_ctx(unsigned sched_ctx_id, int *pus, enum starpu
 	return npus;
 }
 
-pthread_mutex_t* starpu_get_changing_ctx_mutex(unsigned sched_ctx_id)
+_starpu_pthread_mutex_t* starpu_get_changing_ctx_mutex(unsigned sched_ctx_id)
 {
 	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
 	return &sched_ctx->changing_ctx_mutex;

+ 7 - 7
src/core/sched_ctx.h

@@ -44,7 +44,7 @@ struct _starpu_sched_ctx {
 	struct worker_collection *workers;
 	
 	/* mutex for temp_nworkers_in_ctx*/
-	pthread_mutex_t changing_ctx_mutex;
+	_starpu_pthread_mutex_t changing_ctx_mutex;
 
 	/* we keep an initial sched which we never delete */
 	unsigned is_initial_sched; 
@@ -53,22 +53,22 @@ struct _starpu_sched_ctx {
 	struct _starpu_barrier_counter tasks_barrier;
 
 	/* table of sched cond corresponding to each worker in this ctx */
-	pthread_cond_t **sched_cond;
+	_starpu_pthread_cond_t **sched_cond;
 
 	/* table of sched mutex corresponding to each worker in this ctx */
-	pthread_mutex_t **sched_mutex;
+	_starpu_pthread_mutex_t **sched_mutex;
 
 	/* cond to block push when there are no workers in the ctx */
-	pthread_cond_t no_workers_cond;
+	_starpu_pthread_cond_t no_workers_cond;
 
 	/* mutex to block push when there are no workers in the ctx */
-	pthread_mutex_t no_workers_mutex;
+	_starpu_pthread_mutex_t no_workers_mutex;
 
 	/*ready tasks that couldn't be pushed because the ctx has no workers*/
 	struct starpu_task_list empty_ctx_tasks;
 
 	/* mutext protecting empty_ctx_tasks list */
-	pthread_mutex_t empty_ctx_mutex; 
+	_starpu_pthread_mutex_t empty_ctx_mutex; 
 
 	/* min CPUs to execute*/
 	int min_ncpus;
@@ -122,7 +122,7 @@ int _starpu_get_index_in_ctx_of_workerid(unsigned sched_ctx, unsigned workerid);
 unsigned _starpu_get_nsched_ctxs();
 
 /* Get the mutex corresponding to the global workerid */
-pthread_mutex_t *_starpu_get_sched_mutex(struct _starpu_sched_ctx *sched_ctx, int worker);
+_starpu_pthread_mutex_t *_starpu_get_sched_mutex(struct _starpu_sched_ctx *sched_ctx, int worker);
 
 /* Get workers belonging to a certain context, it returns the number of workers 
  take care: no mutex taken, the list of workers might not be updated */

+ 2 - 1
src/core/sched_policy.c

@@ -515,7 +515,7 @@ pick:
 	if(!task)
 	{
 		struct _starpu_sched_ctx *sched_ctx;
-		pthread_mutex_t *sched_ctx_mutex;
+		_starpu_pthread_mutex_t *sched_ctx_mutex;
 		
 		int been_here[STARPU_NMAX_SCHED_CTXS];
 		int i;
@@ -545,6 +545,7 @@ pick:
 
 			if((!task && sched_ctx->pop_counter[worker->workerid] == 0 && been_here[sched_ctx->id]) || worker->nsched_ctxs == 1)
 				break;
+
 			
 			been_here[sched_ctx->id] = 1;
 			

+ 4 - 0
src/core/workers.c

@@ -766,6 +766,10 @@ int starpu_init(struct starpu_conf *user_conf)
 
 	_starpu_init_tags();
 
+#ifdef STARPU_USE_FXT
+	_starpu_start_fxt_profiling();
+#endif
+
 	ret = _starpu_build_topology(&config);
 	if (ret)
 	{

+ 2 - 3
src/sched_policies/heft.c

@@ -286,7 +286,7 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 		}
 	}
 
-	#ifdef HAVE_AYUDAME_H
+#ifdef HAVE_AYUDAME_H
 	if (AYU_event) {
 		int id = best_workerid;
 		AYU_event(AYU_ADDTASKTOQUEUE, _starpu_get_job_associated_to_task(task)->job_id, &id);
@@ -614,7 +614,6 @@ static struct starpu_task *heft_pop_task(unsigned sched_ctx_id)
 		fifo->exp_start = starpu_timing_now() + model;
 		fifo->exp_end = fifo->exp_start + fifo->exp_len;
 	}
-
 	return task;
 }
 
@@ -631,7 +630,7 @@ struct starpu_sched_policy _starpu_sched_heft_policy =
 	.deinit_sched = heft_deinit,
 	.push_task = heft_push_task,
 	.push_task_notify = heft_push_task_notify,
-	.pop_task = NULL,
+	.pop_task = heft_pop_task,
 	.pop_every_task = NULL,
 	.pre_exec_hook = heft_pre_exec_hook,
 	.post_exec_hook = NULL,