浏览代码

refactoring

Andra Hugo 13 年之前
父节点
当前提交
38bcc8f71a

+ 4 - 4
include/starpu_scheduler.h

@@ -263,10 +263,10 @@ double starpu_task_bundle_expected_power(starpu_task_bundle_t bundle, enum starp
 }
 #endif
 
-/* Waits until all the tasks of a worker, already submitted, have been executed */
-int starpu_wait_for_all_tasks_of_worker(int workerid);
+/* /\* Waits until all the tasks of a worker, already submitted, have been executed *\/ */
+/* int starpu_wait_for_all_tasks_of_worker(int workerid); */
 
-/* Waits until all the tasks of a bunch of workers have been executed */
-int starpu_wait_for_all_tasks_of_workers(int *workerids_ctx, int nworkers_ctx);
+/* /\* Waits until all the tasks of a bunch of workers have been executed *\/ */
+/* int starpu_wait_for_all_tasks_of_workers(int *workerids_ctx, int nworkers_ctx); */
 
 #endif /* __STARPU_SCHEDULER_H__ */

+ 0 - 4
include/starpu_task.h

@@ -316,10 +316,6 @@ int starpu_task_wait_for_all(void);
 /* This function waits until there is no more ready task. */
 int starpu_task_wait_for_no_ready(void);
 
-/* This function waits until all the tasks that were already submitted to a specific
- * context have been executed. */
-int starpu_wait_for_all_tasks_of_sched_ctx(unsigned sched_ctx_id);
-
 void starpu_codelet_init(struct starpu_codelet *cl);
 
 void starpu_display_codelet_stats(struct starpu_codelet *cl);

+ 2 - 5
src/core/jobs.c

@@ -150,6 +150,7 @@ void _starpu_wait_job(struct _starpu_job *j)
 void _starpu_handle_job_termination(struct _starpu_job *j, int workerid)
 {
 	struct starpu_task *task = j->task;
+	unsigned sched_ctx = task->sched_ctx;
 	_STARPU_PTHREAD_MUTEX_LOCK(&j->sync_mutex);
 
 	task->status = STARPU_TASK_FINISHED;
@@ -212,7 +213,6 @@ void _starpu_handle_job_termination(struct _starpu_job *j, int workerid)
 #endif //STARPU_USE_SCHED_CTX_HYPERVISOR
 	}
 
-
 	_STARPU_TRACE_TASK_DONE(j);
 
 	/* NB: we do not save those values before the callback, in case the
@@ -253,10 +253,7 @@ void _starpu_handle_job_termination(struct _starpu_job *j, int workerid)
 	_starpu_decrement_nsubmitted_tasks();
 	_starpu_decrement_nready_tasks();
 
-	_starpu_decrement_nsubmitted_tasks_of_sched_ctx(task->sched_ctx);
-
-	if(workerid >= 0)
-		_starpu_decrement_nsubmitted_tasks_of_worker(workerid);
+	_starpu_decrement_nsubmitted_tasks_of_sched_ctx(sched_ctx);
 }
 
 /* This function is called when a new task is submitted to StarPU

+ 7 - 53
src/core/sched_ctx.c

@@ -289,7 +289,7 @@ void starpu_delete_sched_ctx(unsigned sched_ctx_id, unsigned inheritor_sched_ctx
 		starpu_add_workers_to_sched_ctx(sched_ctx->workers->workerids, sched_ctx->workers->nworkers, inheritor_sched_ctx_id);
 	}
 
-	if(!starpu_wait_for_all_tasks_of_sched_ctx(sched_ctx_id))
+	if(!_starpu_wait_for_all_tasks_of_sched_ctx(sched_ctx_id))
 		_starpu_delete_sched_ctx(sched_ctx);
 	return;	
 }
@@ -303,8 +303,11 @@ void _starpu_delete_all_sched_ctxs()
 		struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(i);
 		if(sched_ctx->id != STARPU_NMAX_SCHED_CTXS)
 		{
-			_starpu_barrier_counter_destroy(&sched_ctx->tasks_barrier);
-			_starpu_delete_sched_ctx(sched_ctx);
+			if(!_starpu_wait_for_all_tasks_of_sched_ctx(sched_ctx->id))
+			{
+				_starpu_barrier_counter_destroy(&sched_ctx->tasks_barrier);
+				_starpu_delete_sched_ctx(sched_ctx);
+			}
 		}
 	}
 	return;
@@ -446,56 +449,7 @@ static unsigned _starpu_worker_get_sched_ctx_id(struct _starpu_worker *worker, u
 	return STARPU_NMAX_SCHED_CTXS;
 }
 
-int starpu_wait_for_all_tasks_of_worker(int workerid)
-{
-	if (STARPU_UNLIKELY(!_starpu_worker_may_perform_blocking_calls()))
-		return -EDEADLK;
-
-	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
-	
-	_starpu_barrier_counter_wait_for_empty_counter(&worker->tasks_barrier);
-	
-	return 0;
-}
-
-int starpu_wait_for_all_tasks_of_workers(int *workerids, int nworkers_ctx){
-	int ret_val = 0;
-	
-	struct _starpu_machine_config *config = _starpu_get_machine_config();
-	int nworkers = nworkers_ctx == -1 ? (int)config->topology.nworkers : nworkers_ctx;
-	
-	int workerid = -1;
-	int i, n;
-	
-	for(i = 0; i < nworkers; i++)
-	  {
-		workerid = workerids == NULL ? i : workerids[i];
-		n = starpu_wait_for_all_tasks_of_worker(workerid);
-		ret_val = (ret_val && n);
-	  }
-	
-	return ret_val;
-}
-
-void _starpu_decrement_nsubmitted_tasks_of_worker(int workerid)
-{
-	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
-	
-	_starpu_barrier_counter_decrement_until_empty_counter(&worker->tasks_barrier);
-
-	return;
-}
-
-void _starpu_increment_nsubmitted_tasks_of_worker(int workerid)
-{
-	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
-
-	_starpu_barrier_counter_increment(&worker->tasks_barrier);
-
-	return;
-}
-
-int starpu_wait_for_all_tasks_of_sched_ctx(unsigned sched_ctx_id)
+int _starpu_wait_for_all_tasks_of_sched_ctx(unsigned sched_ctx_id)
 {
 	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
 	

+ 3 - 3
src/core/sched_ctx.h

@@ -89,9 +89,9 @@ struct _starpu_sched_ctx*  _starpu_create_sched_ctx(const char *policy_name, int
 /* delete all sched_ctx */
 void _starpu_delete_all_sched_ctxs();
 
-/* Keeps track of the number of tasks currently submitted to a worker */
-void _starpu_decrement_nsubmitted_tasks_of_worker(int workerid);
-void _starpu_increment_nsubmitted_tasks_of_worker(int workerid);
+/* This function waits until all the tasks that were already submitted to a specific
+ * context have been executed. */
+int _starpu_wait_for_all_tasks_of_sched_ctx(unsigned sched_ctx_id);
 
 /* In order to implement starpu_wait_for_all_tasks_of_ctx, we keep track of the number of 
  * task currently submitted to the context */

+ 0 - 1
src/core/sched_policy.c

@@ -214,7 +214,6 @@ void _starpu_deinit_sched_policy(struct _starpu_sched_ctx *sched_ctx)
  * each worker of the combination. */
 static int _starpu_push_task_on_specific_worker(struct starpu_task *task, int workerid)
 {
-	_starpu_increment_nsubmitted_tasks_of_worker(task->workerid);
 	int nbasic_workers = (int)starpu_worker_get_count();
 
 	/* Is this a basic worker or a combined worker ? */

+ 1 - 3
src/core/task.c

@@ -466,7 +466,6 @@ int _starpu_task_submit_nodeps(struct starpu_task *task)
 
 	struct _starpu_job *j = _starpu_get_job_associated_to_task(task);
 	_starpu_increment_nsubmitted_tasks();
-//	_starpu_increment_nsubmitted_tasks_of_sched_ctx(task->sched_ctx);
 	_STARPU_PTHREAD_MUTEX_LOCK(&j->sync_mutex);
 
 	j->submitted = 1;
@@ -508,7 +507,6 @@ int _starpu_task_submit_conversion_task(struct starpu_task *task,
 
 	struct _starpu_job *j = _starpu_get_job_associated_to_task(task);
 	_starpu_increment_nsubmitted_tasks();
-//	_starpu_increment_nsubmitted_tasks_of_sched_ctx(task->sched_ctx);
 	_STARPU_PTHREAD_MUTEX_LOCK(&j->sync_mutex);
 	j->submitted = 1;
 	_starpu_increment_nready_tasks();
@@ -579,7 +577,7 @@ int starpu_task_wait_for_all(void)
 {
 	unsigned nsched_ctxs = _starpu_get_nsched_ctxs();
 	unsigned sched_ctx = nsched_ctxs == 1 ? 0 : starpu_get_sched_ctx();
-	starpu_wait_for_all_tasks_of_sched_ctx(sched_ctx);
+	_starpu_wait_for_all_tasks_of_sched_ctx(sched_ctx);
 	return 0;
 }
 

+ 0 - 2
src/sched_policies/deque_modeling_policy_data_aware.c

@@ -409,8 +409,6 @@ static int _dm_push_task(struct starpu_task *task, unsigned prio, unsigned sched
 		best = ntasks_best;
 		model_best = 0.0;
 	}
-	
-	_starpu_increment_nsubmitted_tasks_of_worker(best);
 
 	//_STARPU_DEBUG("Scheduler dm: kernel (%u)\n", best_impl);
 

+ 0 - 2
src/sched_policies/eager_central_policy.c

@@ -109,7 +109,6 @@ static int push_task_eager_policy(struct starpu_task *task)
         while(workers->has_next(workers))
 	{
 		worker = workers->get_next(workers);
-		_starpu_increment_nsubmitted_tasks_of_worker(worker);
 	}
 
 	if(workers->init_cursor)
@@ -143,7 +142,6 @@ static struct starpu_task *pop_task_eager_policy(unsigned sched_ctx_id)
 		while(workers->has_next(workers))
 		{
 			worker = workers->get_next(workers);
-			_starpu_decrement_nsubmitted_tasks_of_worker(workerid);
 		}
 		
 		if(workers->init_cursor)

+ 0 - 1
src/sched_policies/heft.c

@@ -219,7 +219,6 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
  {
 	/* make sure someone coule execute that task ! */
 	STARPU_ASSERT(best_workerid != -1);
-	_starpu_increment_nsubmitted_tasks_of_worker(best_workerid);
 
 	pthread_mutex_t *sched_mutex;
 	pthread_cond_t *sched_cond;

+ 0 - 2
src/sched_policies/parallel_heft.c

@@ -87,8 +87,6 @@ static int push_task_on_best_worker(struct starpu_task *task, int best_workerid,
 	/* make sure someone coule execute that task ! */
 	STARPU_ASSERT(best_workerid != -1);
 
-	_starpu_increment_nsubmitted_tasks_of_worker(best_workerid);
-
 	/* Is this a basic worker or a combined worker ? */
 	int nbasic_workers = starpu_get_nworkers_of_sched_ctx(sched_ctx_id);
 	int is_basic_worker = (best_workerid < nbasic_workers);

+ 0 - 1
src/sched_policies/random_policy.c

@@ -69,7 +69,6 @@ static int _random_push_task(struct starpu_task *task, unsigned prio)
                 workers->deinit_cursor(workers);
 
 	/* we should now have the best worker in variable "selected" */
-	_starpu_increment_nsubmitted_tasks_of_worker(selected);
 	int n = starpu_push_local_task(selected, task, prio);
 	return n;
 }