Selaa lähdekoodia

replace tasks_increments

Andra Hugo 13 vuotta sitten
vanhempi
commit
19dba4c51f

+ 9 - 2
src/core/jobs.c

@@ -135,10 +135,11 @@ void _starpu_wait_job(starpu_job_t j)
         _STARPU_LOG_OUT();
 }
 
-void _starpu_handle_job_termination(starpu_job_t j, unsigned job_is_already_locked)
+void _starpu_handle_job_termination(starpu_job_t j, unsigned job_is_already_locked, int workerid)
 {
 	struct starpu_task *task = j->task;
-
+	unsigned sched_ctx = task->sched_ctx;
+	
 	if (!job_is_already_locked)
 		PTHREAD_MUTEX_LOCK(&j->sync_mutex);
 
@@ -234,6 +235,12 @@ void _starpu_handle_job_termination(starpu_job_t j, unsigned job_is_already_lock
 	else {
 		_starpu_decrement_nsubmitted_tasks();
 	}
+	if(workerid >= 0)
+	{
+		_starpu_decrement_nsubmitted_tasks_of_worker(workerid);
+		_starpu_decrement_nsubmitted_tasks_of_sched_ctx(sched_ctx);
+	}
+			
 }
 
 /* This function is called when a new task is submitted to StarPU 

+ 1 - 1
src/core/jobs.h

@@ -149,7 +149,7 @@ unsigned _starpu_enforce_deps_starting_from_task(starpu_job_t j, unsigned job_is
 
 /* This function must be called after the execution of a job, this triggers all
  * job's dependencies and perform the callback function if any. */
-void _starpu_handle_job_termination(starpu_job_t j, unsigned job_is_already_locked);
+void _starpu_handle_job_termination(starpu_job_t j, unsigned job_is_already_locked, int workerid);
 
 /* Get the sum of the size of the data accessed by the job. */
 size_t _starpu_job_get_data_size(starpu_job_t j);

+ 5 - 2
src/core/sched_ctx.c

@@ -535,13 +535,16 @@ int starpu_wait_for_all_tasks_of_sched_ctx(unsigned sched_ctx_id)
 	return _starpu_barrier_counter_wait_for_empty_counter(&sched_ctx->tasks_barrier);
 }
 
-void _starpu_decrement_nsubmitted_tasks_of_sched_ctx(struct starpu_sched_ctx *sched_ctx)
+void _starpu_decrement_nsubmitted_tasks_of_sched_ctx(unsigned sched_ctx_id)
 {
+	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx(sched_ctx_id);
 	_starpu_barrier_counter_decrement_until_empty_counter(&sched_ctx->tasks_barrier);
+//	printf("%d: %d\n", sched_ctx_id, sched_ctx->tasks_barrier.barrier.reached_start);
 }
 
-void _starpu_increment_nsubmitted_tasks_of_sched_ctx(struct starpu_sched_ctx *sched_ctx)
+void _starpu_increment_nsubmitted_tasks_of_sched_ctx(unsigned sched_ctx_id)
 {
+	struct starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx(sched_ctx_id);
 	_starpu_barrier_counter_increment(&sched_ctx->tasks_barrier);
 }
 

+ 2 - 2
src/core/sched_ctx.h

@@ -80,8 +80,8 @@ void _starpu_increment_nsubmitted_tasks_of_worker(int workerid);
 
 /* In order to implement starpu_wait_for_all_tasks_of_ctx, we keep track of the number of 
  * task currently submitted to the context */
-void _starpu_decrement_nsubmitted_tasks_of_sched_ctx(struct starpu_sched_ctx *sched_ctx);
-void _starpu_increment_nsubmitted_tasks_of_sched_ctx(struct starpu_sched_ctx *sched_ctx);
+void _starpu_decrement_nsubmitted_tasks_of_sched_ctx(unsigned sched_ctx_id);
+void _starpu_increment_nsubmitted_tasks_of_sched_ctx(unsigned sched_ctx_id);
 
 /* Return the corresponding index of the workerid in the ctx table */
 int _starpu_get_index_in_ctx_of_workerid(unsigned sched_ctx, unsigned workerid);

+ 2 - 2
src/core/sched_policy.c

@@ -217,6 +217,7 @@ void _starpu_deinit_sched_policy(struct starpu_machine_config_s *config, struct
  * each worker of the combination. */
 static int _starpu_push_task_on_specific_worker(struct starpu_task *task, int workerid)
 {
+	_starpu_increment_nsubmitted_tasks_of_worker(task->workerid);
 	int nbasic_workers = (int)starpu_worker_get_count();
 
 	/* Is this a basic worker or a combined worker ? */
@@ -296,7 +297,7 @@ int _starpu_push_task(starpu_job_t j, unsigned job_is_already_locked)
 	 * corresponding dependencies */
 	if (task->cl == NULL)
 	{
-		_starpu_handle_job_termination(j, job_is_already_locked);
+		_starpu_handle_job_termination(j, job_is_already_locked, -1);
                 _STARPU_LOG_OUT_TAG("handle_job_termination");
 		return 0;
 	}
@@ -304,7 +305,6 @@ int _starpu_push_task(starpu_job_t j, unsigned job_is_already_locked)
         int ret;
 	if (STARPU_UNLIKELY(task->execute_on_a_specific_worker))
 	{
-		_starpu_increment_nsubmitted_tasks_of_worker(task->workerid);
 		ret = _starpu_push_task_on_specific_worker(task, task->workerid);
 	}
 	else {

+ 1 - 1
src/core/task.c

@@ -212,7 +212,7 @@ int _starpu_submit_job(starpu_job_t j, unsigned do_not_increment_nsubmitted)
 
 	if (!do_not_increment_nsubmitted){
 		_starpu_increment_nsubmitted_tasks();
-		_starpu_increment_nsubmitted_tasks_of_sched_ctx(_starpu_get_sched_ctx(j->task->sched_ctx));
+		_starpu_increment_nsubmitted_tasks_of_sched_ctx(j->task->sched_ctx);
 	}
 
 	PTHREAD_MUTEX_LOCK(&j->sync_mutex);

+ 1 - 6
src/drivers/cpu/driver_cpu.c

@@ -197,8 +197,6 @@ void *_starpu_cpu_worker(void *arg)
 
 		_starpu_set_current_task(j->task);
 
-		struct starpu_sched_ctx *local_sched_ctx = _starpu_get_sched_ctx(j->task->sched_ctx);
-
 		res = execute_job_on_cpu(j, cpu_arg, is_parallel_task, rank, perf_arch);
 
 		_starpu_set_current_task(NULL);
@@ -214,11 +212,8 @@ void *_starpu_cpu_worker(void *arg)
 		}
 
 		if (rank == 0){
-			_starpu_handle_job_termination(j, 0);
+			_starpu_handle_job_termination(j, 0, workerid);
 		}
-			_starpu_decrement_nsubmitted_tasks_of_worker(cpu_arg->workerid);
-			_starpu_decrement_nsubmitted_tasks_of_sched_ctx(local_sched_ctx);
-
         }
 
 	STARPU_TRACE_WORKER_DEINIT_START

+ 1 - 5
src/drivers/cuda/driver_cuda.c

@@ -343,11 +343,7 @@ void *_starpu_cuda_worker(void *arg)
 			}
 		}
 
-		struct starpu_sched_ctx *local_sched_ctx = _starpu_get_sched_ctx(j->task->sched_ctx);
-		_starpu_handle_job_termination(j, 0);
-		_starpu_decrement_nsubmitted_tasks_of_worker(args->workerid);
-		_starpu_decrement_nsubmitted_tasks_of_sched_ctx(local_sched_ctx);
-
+		_starpu_handle_job_termination(j, 0, workerid);
 	}
 
 	STARPU_TRACE_WORKER_DEINIT_START

+ 1 - 5
src/drivers/opencl/driver_opencl.c

@@ -494,11 +494,7 @@ void *_starpu_opencl_worker(void *arg)
 			}
 		}
 
-		struct starpu_sched_ctx *local_sched_ctx = _starpu_get_sched_ctx(j->task->sched_ctx);
-		_starpu_handle_job_termination(j, 0);
-		_starpu_decrement_nsubmitted_tasks_of_worker(args->workerid);
-                _starpu_decrement_nsubmitted_tasks_of_sched_ctx(local_sched_ctx);
-
+		_starpu_handle_job_termination(j, 0, workerid);
 	}
 
 	STARPU_TRACE_WORKER_DEINIT_START

+ 12 - 14
src/sched_policies/heft.c

@@ -151,20 +151,18 @@ static void heft_init(unsigned sched_ctx_id)
 static void heft_post_exec_hook(struct starpu_task *task, unsigned sched_ctx_id)
 {
 	int workerid = starpu_worker_get_id();
-	if(workerid >= 0)
-	  {
-		struct starpu_worker_s *worker = _starpu_get_worker_struct(workerid);
-		double model = task->predicted;
-		
-		/* Once we have executed the task, we can update the predicted amount
-		 * of work. */
-		PTHREAD_MUTEX_LOCK(worker->sched_mutex);
-		exp_len[workerid] -= model;
-		exp_start[workerid] = starpu_timing_now() + model;
-		exp_end[workerid] = exp_start[workerid] + exp_len[workerid];
-		ntasks[workerid]--;
-		PTHREAD_MUTEX_UNLOCK(worker->sched_mutex);
-	  }
+	STARPU_ASSERT(workerid >= 0);
+	struct starpu_worker_s *worker = _starpu_get_worker_struct(workerid);
+	double model = task->predicted;
+	
+	/* Once we have executed the task, we can update the predicted amount
+	 * of work. */
+	PTHREAD_MUTEX_LOCK(worker->sched_mutex);
+	exp_len[workerid] -= model;
+	exp_start[workerid] = starpu_timing_now() + model;
+	exp_end[workerid] = exp_start[workerid] + exp_len[workerid];
+	ntasks[workerid]--;
+	PTHREAD_MUTEX_UNLOCK(worker->sched_mutex);
 }
 
 static void heft_push_task_notify(struct starpu_task *task, int workerid, unsigned sched_ctx_id)