Browse Source

refatctoring to use j->task_size = nworkers of ctx

Andra Hugo 10 years ago
parent
commit
46367259cb

+ 4 - 23
src/core/jobs.c

@@ -159,40 +159,22 @@ void _starpu_wait_job(struct _starpu_job *j)
 void _starpu_handle_job_termination(struct _starpu_job *j)
 {
 	struct starpu_task *task = j->task;
-	/* if sched_ctx without policy and awake workers, task may be destroyed in handle_job_termination by the master
-	   so pointless to continue */
-	if(!j->task) return;
-
 	unsigned sched_ctx = task->sched_ctx;
 	int workerid = starpu_worker_get_id();
-	/* if parallel task (managed by a context) only the master should execute this function */
-	struct _starpu_sched_ctx *sched_ctx_str = _starpu_get_sched_ctx_struct(sched_ctx);
-	if(!sched_ctx_str->sched_policy && sched_ctx_str->awake_workers)
-	{
-		if(sched_ctx_str->main_master != workerid)
-		{
-			return;
-		}
-		else
-		{
-			STARPU_PTHREAD_BARRIER_DESTROY(&j->before_work_barrier);
-			STARPU_PTHREAD_BARRIER_DESTROY(&j->after_work_barrier);
-		}
-	}
-
 	double flops = task->flops;
 	STARPU_PTHREAD_MUTEX_LOCK(&j->sync_mutex);
-
+	
 	task->status = STARPU_TASK_FINISHED;
-
+	
 	/* We must have set the j->terminated flag early, so that it is
 	 * possible to express task dependencies within the callback
 	 * function. A value of 1 means that the codelet was executed but that
 	 * the callback is not done yet. */
 	j->terminated = 1;
-
+		
 	STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
 
+
 #ifdef STARPU_USE_SC_HYPERVISOR
 	size_t data_size = 0;
 #endif //STARPU_USE_SC_HYPERVISOR
@@ -220,7 +202,6 @@ void _starpu_handle_job_termination(struct _starpu_job *j)
 				_starpu_spin_unlock(&handle->header_lock);
 		}
 	}
-
 	/* Tell other tasks that we don't exist any more, thus no need for
 	 * implicit dependencies any more.  */
 	_starpu_release_task_enforce_sequential_consistency(j);

+ 19 - 14
src/core/sched_ctx.c

@@ -262,16 +262,16 @@ static void _starpu_add_workers_to_sched_ctx(struct _starpu_sched_ctx *sched_ctx
 		if(!sched_ctx->awake_workers)
 		{
 			if(sched_ctx->main_master == -1)
-				sched_ctx->main_master = starpu_sched_ctx_book_workers_for_task(sched_ctx->id, workerids, nworkers);
+				sched_ctx->main_master = starpu_sched_ctx_book_workers_for_task(sched_ctx->id, wa, na);
 			else
 			{
-				_starpu_sched_ctx_add_workers_to_master(sched_ctx->id, workerids, nworkers, sched_ctx->main_master);
+				_starpu_sched_ctx_add_workers_to_master(sched_ctx->id, wa, na, sched_ctx->main_master);
 			}
 		}
 		else
 		{
-			sched_ctx->main_master = _starpu_sched_ctx_find_master(sched_ctx->id, workerids, nworkers);
-			_starpu_sched_ctx_set_master(sched_ctx, workerids, nworkers, sched_ctx->main_master);
+			sched_ctx->main_master = _starpu_sched_ctx_find_master(sched_ctx->id, wa, na);
+			_starpu_sched_ctx_set_master(sched_ctx, wa, na, sched_ctx->main_master);
 		}
 	}
 	else if(sched_ctx->sched_policy->add_workers)
@@ -397,18 +397,21 @@ static void _starpu_remove_workers_from_sched_ctx(struct _starpu_sched_ctx *sche
 
 static void _starpu_sched_ctx_free_scheduling_data(struct _starpu_sched_ctx *sched_ctx)
 {
-	int *workerids = NULL;
-
-	unsigned nworkers_ctx = starpu_sched_ctx_get_workers_list(sched_ctx->id, &workerids);
-
-	if(nworkers_ctx > 0 && sched_ctx->sched_policy->remove_workers)
+	if(sched_ctx->sched_policy && sched_ctx->sched_policy->remove_workers)
 	{
-		_STARPU_TRACE_WORKER_SCHEDULING_PUSH;
-		sched_ctx->sched_policy->remove_workers(sched_ctx->id, workerids, nworkers_ctx);
-		_STARPU_TRACE_WORKER_SCHEDULING_POP;
+		int *workerids = NULL;
+		
+		unsigned nworkers_ctx = starpu_sched_ctx_get_workers_list(sched_ctx->id, &workerids);
+		
+		if(nworkers_ctx > 0)
+		{
+			_STARPU_TRACE_WORKER_SCHEDULING_PUSH;
+			sched_ctx->sched_policy->remove_workers(sched_ctx->id, workerids, nworkers_ctx);
+			_STARPU_TRACE_WORKER_SCHEDULING_POP;
+		}
+		
+		free(workerids);
 	}
-
-	free(workerids);
 	return;
 
 }
@@ -2167,6 +2170,8 @@ int starpu_sched_ctx_get_worker_rank(unsigned sched_ctx_id)
 	int curr_workerid = starpu_worker_get_id();
 	int worker;
 	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(sched_ctx_id);
+	if(sched_ctx->sched_policy || !sched_ctx->awake_workers)
+		return -1;
 	struct starpu_worker_collection *workers = sched_ctx->workers;
 
 	struct starpu_sched_ctx_iterator it;

+ 6 - 8
src/core/sched_policy.c

@@ -465,7 +465,10 @@ int _starpu_push_task_to_workers(struct starpu_task *task)
 				struct starpu_worker_collection *workers = sched_ctx->workers;
 				
 				struct _starpu_job *job = _starpu_get_job_associated_to_task(task);
-				
+				job->task_size = workers->nworkers;
+				job->combined_workerid = -1; // workerid; its a ctx not combined worker
+				job->active_task_alias_count = 0;
+
 				STARPU_PTHREAD_BARRIER_INIT(&job->before_work_barrier, NULL, workers->nworkers);
 				STARPU_PTHREAD_BARRIER_INIT(&job->after_work_barrier, NULL, workers->nworkers);
 				
@@ -481,13 +484,8 @@ int _starpu_push_task_to_workers(struct starpu_task *task)
 				while(workers->has_next(workers, &it))
 				{
 					workerid = workers->get_next(workers, &it);
-					if(workerid != sched_ctx->main_master)
-					{
-						struct starpu_task *alias = starpu_task_dup(task);
-						ret |= _starpu_push_task_on_specific_worker(alias, workerid);
-					}
-					else
-						ret |= _starpu_push_task_on_specific_worker(task, workerid);
+					struct starpu_task *alias = starpu_task_dup(task);
+					ret |= _starpu_push_task_on_specific_worker(alias, workerid);
 				}
 			}
 		}

+ 0 - 13
src/datawizard/coherency.c

@@ -817,19 +817,6 @@ enomem:
 
 void _starpu_push_task_output(struct _starpu_job *j)
 {
-	/* if sched_ctx without policy and awake workers, task may be destroyed in handle_job_termination by the master
-	   so pointless to continue */
-	if(!j->task) return;
-	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(j->task->sched_ctx);
-	STARPU_ASSERT_MSG(sched_ctx != NULL, "there should be a the ctx of this job \n");
-
-	if (!sched_ctx->sched_policy)
-	{
-		int workerid = starpu_worker_get_id();
-       		if(sched_ctx->main_master != workerid)
-			return;
-	}
-
 	_STARPU_TRACE_START_PUSH_OUTPUT(NULL);
 
 	int profiling = starpu_profiling_status_get();

+ 10 - 7
src/drivers/cpu/driver_cpu.c

@@ -228,13 +228,16 @@ int _starpu_cpu_driver_run_once(struct _starpu_worker *cpu_worker)
 		rank = j->active_task_alias_count++;
 		STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
 
-		struct _starpu_combined_worker *combined_worker;
-		combined_worker = _starpu_get_combined_worker_struct(j->combined_workerid);
-
-		cpu_worker->combined_workerid = j->combined_workerid;
-		cpu_worker->worker_size = combined_worker->worker_size;
-		cpu_worker->current_rank = rank;
-		perf_arch = &combined_worker->perf_arch;
+		if(j->combined_workerid != -1)
+		{
+			struct _starpu_combined_worker *combined_worker;
+			combined_worker = _starpu_get_combined_worker_struct(j->combined_workerid);
+			
+			cpu_worker->combined_workerid = j->combined_workerid;
+			cpu_worker->worker_size = combined_worker->worker_size;
+			cpu_worker->current_rank = rank;
+			perf_arch = &combined_worker->perf_arch;
+		}
 	}
 	else
 	{

+ 20 - 27
src/drivers/driver_common/driver_common.c

@@ -79,27 +79,20 @@ void _starpu_driver_start_job(struct _starpu_worker *worker, struct _starpu_job
 	STARPU_ASSERT_MSG(sched_ctx != NULL, "there should be a worker %d in the ctx of this job \n", worker->workerid);
 	if(!sched_ctx->sched_policy)
 	{
-		if(sched_ctx->awake_workers)
+		if(!sched_ctx->awake_workers && sched_ctx->main_master == worker->workerid)
 		{
-			STARPU_PTHREAD_BARRIER_WAIT(&j->before_work_barrier);
-		}
-		else
-		{
-			if(sched_ctx->main_master == worker->workerid)
-			{
-				struct starpu_worker_collection *workers = sched_ctx->workers;
-				struct starpu_sched_ctx_iterator it;
+			struct starpu_worker_collection *workers = sched_ctx->workers;
+			struct starpu_sched_ctx_iterator it;
 
-				if (workers->init_iterator)
-					workers->init_iterator(workers, &it);
-				while (workers->has_next(workers, &it))
+			if (workers->init_iterator)
+				workers->init_iterator(workers, &it);
+			while (workers->has_next(workers, &it))
+			{
+				int _workerid = workers->get_next(workers, &it);
+				if (_workerid != workerid)
 				{
-					int _workerid = workers->get_next(workers, &it);
-					if (_workerid != workerid)
-					{
-						struct _starpu_worker *_worker = _starpu_get_worker_struct(_workerid);
-						_starpu_driver_start_job(_worker, j, &_worker->perf_arch, codelet_start, rank, profiling);
-					}
+					struct _starpu_worker *worker = _starpu_get_worker_struct(_workerid);
+					_starpu_driver_start_job(worker, j, &worker->perf_arch, codelet_start, rank, profiling);
 				}
 			}
 		}
@@ -124,9 +117,6 @@ void _starpu_driver_end_job(struct _starpu_worker *worker, struct _starpu_job *j
 	struct _starpu_sched_ctx *sched_ctx = _starpu_sched_ctx_get_sched_ctx_for_worker_and_job(worker, j);
 	STARPU_ASSERT_MSG(sched_ctx != NULL, "there should be a worker %d in the ctx of this job \n", worker->workerid);
 
-	if(!sched_ctx->sched_policy && sched_ctx->awake_workers)
-		STARPU_PTHREAD_BARRIER_WAIT(&j->after_work_barrier);
-
 	if (!sched_ctx->sched_policy)
 	{
 		if(sched_ctx->main_master == worker->workerid)
@@ -176,9 +166,6 @@ void _starpu_driver_update_job_feedback(struct _starpu_job *j, struct _starpu_wo
 					struct starpu_perfmodel_arch* perf_arch,
 					struct timespec *codelet_start, struct timespec *codelet_end, int profiling)
 {
-	/* if sched_ctx without policy and awake workers, task may be destroyed in handle_job_termination by the master
-	   so pointless to continue */
-	if(!j->task) return;
 	struct _starpu_sched_ctx *sched_ctx = _starpu_sched_ctx_get_sched_ctx_for_worker_and_job(worker, j);
 	STARPU_ASSERT_MSG(sched_ctx != NULL, "there should be a worker %d in the ctx of this job \n", worker->workerid);
 
@@ -310,6 +297,9 @@ struct starpu_task *_starpu_get_worker_task(struct _starpu_worker *worker, int w
 			if(sched_ctx && sched_ctx->id > 0 && sched_ctx->id < STARPU_NMAX_SCHED_CTXS)
 			{
 				STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->parallel_sect_mutex[workerid]);
+				if(!sched_ctx->sched_policy && sched_ctx->awake_workers) 
+					worker->slave = sched_ctx->main_master != workerid;
+
 				if(sched_ctx->parallel_sect[workerid])
 				{
 					/* don't let the worker sleep with the sched_mutex taken */
@@ -445,9 +435,12 @@ int _starpu_get_multi_worker_task(struct _starpu_worker *workers, struct starpu_
 					workers[i].current_rank = j->active_task_alias_count++;
 					STARPU_PTHREAD_MUTEX_UNLOCK(&j->sync_mutex);
 
-					combined_worker = _starpu_get_combined_worker_struct(j->combined_workerid);
-					workers[i].combined_workerid = j->combined_workerid;
-					workers[i].worker_size = combined_worker->worker_size;
+					if(j->combined_workerid != -1)
+					{
+						combined_worker = _starpu_get_combined_worker_struct(j->combined_workerid);
+						workers[i].combined_workerid = j->combined_workerid;
+						workers[i].worker_size = combined_worker->worker_size;
+					}
 				}
 				else
 				{