Browse Source

Clean up the job queue structure by removing the fields that are meaningless
when more than one worker is associated with a queue. Instead, the "alpha"
field is now accessible with the _starpu_worker_get_relative_speedup function,
and the "arch" field is given by starpu_worker_get_perf_archtype.

Cédric Augonnet 14 years ago
parent
commit
247f68170a

+ 0 - 5
src/core/mechanisms/queues.h

@@ -41,11 +41,6 @@ struct starpu_jobq_s {
  	 * */
 	struct starpu_job_list_s *(*pop_every_task)(struct starpu_jobq_s *, uint32_t);
 
-	/* this is only relevant if there is a single worker per queue */
-	uint32_t memory_node;
-	enum starpu_perf_archtype arch;
-	float alpha;
-
 	/* for performance analysis purpose */
 	double total_computation_time;
 	double total_communication_time;

+ 33 - 19
src/core/perfmodel/perfmodel.c

@@ -42,6 +42,13 @@ unsigned _starpu_get_calibrate_flag(void)
 	return calibrate_flag;
 }
 
+enum starpu_perf_archtype starpu_worker_get_perf_archtype(int workerid)
+{
+	struct starpu_machine_config_s *config = _starpu_get_machine_config();
+
+	return config->workers[workerid].perf_arch;
+}
+
 /*
  * PER ARCH model
  */
@@ -71,31 +78,38 @@ static double per_arch_task_expected_length(struct starpu_perfmodel_t *model, en
  * Common model
  */
 
+double _starpu_worker_get_relative_speedup(int workerid)
+{
+	double alpha;
+	enum starpu_archtype arch = starpu_worker_get_type(workerid);
+
+	switch (arch) {
+		case STARPU_CPU_WORKER:
+			alpha = STARPU_CPU_ALPHA;
+			break;
+		case STARPU_CUDA_WORKER:
+			alpha = STARPU_CUDA_ALPHA;
+			break;
+	        case STARPU_OPENCL_WORKER:
+                        alpha = STARPU_OPENCL_ALPHA;
+                        break;
+		default:
+			/* perhaps there are various worker types on that queue */
+			alpha = 1.0; // this value is not significant ...
+			break;
+	}
+
+	return alpha;
+}
+
 static double common_task_expected_length(struct starpu_perfmodel_t *model, int workerid, struct starpu_task *task)
 {
 	double exp;
+	double alpha;
 
 	if (model->cost_model) {
-		float alpha;
 		exp = model->cost_model(task->buffers);
-
-		enum starpu_archtype arch = starpu_worker_get_type(workerid);
-
-		switch (arch) {
-			case STARPU_CPU_WORKER:
-				alpha = STARPU_CPU_ALPHA;
-				break;
-			case STARPU_CUDA_WORKER:
-				alpha = STARPU_CUDA_ALPHA;
-				break;
-  		        case STARPU_OPENCL_WORKER:
-	                        alpha = STARPU_OPENCL_ALPHA;
-                                break;
-			default:
-				/* perhaps there are various worker types on that queue */
-				alpha = 1.0; // this value is not significant ...
-				break;
-		}
+		alpha = _starpu_worker_get_relative_speedup(workerid);
 
 		STARPU_ASSERT(alpha != 0.0f);
 

+ 4 - 0
src/core/perfmodel/perfmodel.h

@@ -107,6 +107,10 @@ double _starpu_predict_transfer_time(unsigned src_node, unsigned dst_node, size_
 void _starpu_set_calibrate_flag(unsigned val);
 unsigned _starpu_get_calibrate_flag(void);
 
+double _starpu_worker_get_relative_speedup(int workerid);
+
+enum starpu_perf_archtype starpu_worker_get_perf_archtype(int workerid);
+
 #if defined(STARPU_USE_CUDA)
 int *_starpu_get_cuda_affinity_vector(unsigned gpuid);
 #endif

+ 5 - 2
src/core/policies/deque_modeling_policy.c

@@ -90,7 +90,8 @@ static int _dm_push_task(struct starpu_jobq_s *q __attribute__ ((unused)), starp
 			continue;
 		}
 
-		double local_length = _starpu_job_expected_length(worker, j, queue_array[worker]->arch);
+		enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(worker);
+		double local_length = _starpu_job_expected_length(worker, j, perf_arch);
 
 		if (local_length == -1.0) 
 		{
@@ -128,8 +129,10 @@ static int _dm_push_task(struct starpu_jobq_s *q __attribute__ ((unused)), starp
 
 	j->predicted = model_best;
 
+	unsigned memory_node = starpu_worker_get_memory_node(best);
+
 	if (_starpu_get_prefetch_flag())
-		_starpu_prefetch_task_input_on_node(task, queue_array[best]->memory_node);
+		_starpu_prefetch_task_input_on_node(task, memory_node);
 
 	if (prio) {
 		return _starpu_fifo_push_prio_task(queue_array[best], j);

+ 5 - 4
src/core/policies/deque_modeling_policy_data_aware.c

@@ -89,10 +89,11 @@ static int _dmda_push_task(struct starpu_jobq_s *q __attribute__ ((unused)) , st
 			continue;
 		}
 
-		local_task_length[worker] = _starpu_job_expected_length(worker,	j, queue_array[worker]->arch);
+		enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(worker);
+		local_task_length[worker] = _starpu_job_expected_length(worker,	j, perf_arch);
 
-		//local_data_penalty[worker] = 0;
-		local_data_penalty[worker] = _starpu_data_expected_penalty(queue_array[worker]->memory_node, task);
+		unsigned memory_node = starpu_worker_get_memory_node(worker);
+		local_data_penalty[worker] = _starpu_data_expected_penalty(memory_node, task);
 
 		if (local_task_length[worker] == -1.0)
 		{
@@ -163,7 +164,7 @@ static int _dmda_push_task(struct starpu_jobq_s *q __attribute__ ((unused)) , st
 	j->predicted = model_best;
 	j->penality = penality_best;
 
-	uint32_t memory_node = queue_array[best]->memory_node;
+	unsigned memory_node = starpu_worker_get_memory_node(best);
 
 	update_data_requests(memory_node, task);
 	

+ 5 - 3
src/core/policies/random_policy.c

@@ -40,7 +40,7 @@ static int _random_push_task(struct starpu_jobq_s *q __attribute__ ((unused)), s
 
 	for (worker = 0; worker < nworkers; worker++)
 	{
-		alpha_sum += queue_array[worker]->alpha;
+		alpha_sum += _starpu_worker_get_relative_speedup(worker);
 	}
 
 	double random = starpu_drand48()*alpha_sum;
@@ -49,13 +49,15 @@ static int _random_push_task(struct starpu_jobq_s *q __attribute__ ((unused)), s
 	double alpha = 0.0;
 	for (worker = 0; worker < nworkers; worker++)
 	{
-		if (alpha + queue_array[worker]->alpha > random) {
+		double worker_alpha = _starpu_worker_get_relative_speedup(worker);
+
+		if (alpha + worker_alpha > random) {
 			/* we found the worker */
 			selected = worker;
 			break;
 		}
 
-		alpha += queue_array[worker]->alpha;
+		alpha += worker_alpha;
 	}
 
 	/* we should now have the best worker in variable "best" */

+ 0 - 25
src/core/workers.c

@@ -85,31 +85,6 @@ static void _starpu_init_worker_queue(struct starpu_worker_s *workerarg)
 
 	PTHREAD_MUTEX_LOCK(&jobq->activity_mutex);
 
-	/* warning : in case there are multiple workers on the same
-	  queue, we overwrite this value so that it is meaningless */
-	jobq->arch = workerarg->perf_arch;
-		
-	switch (workerarg->arch) {
-		case STARPU_CPU_WORKER:
-			jobq->alpha = STARPU_CPU_ALPHA;
-			break;
-		case STARPU_CUDA_WORKER:
-			jobq->alpha = STARPU_CUDA_ALPHA;
-			break;
-		case STARPU_OPENCL_WORKER:
-			jobq->alpha = STARPU_OPENCL_ALPHA;
-			break;
-		case STARPU_GORDON_WORKER:
-			jobq->alpha = STARPU_GORDON_ALPHA;
-			break;
-		default:
-			STARPU_ABORT();
-	}
-
-	/* This is only useful (and meaningful) is there is a single memory
-	 * node "related" to that queue */
-	jobq->memory_node = workerarg->memory_node;
-
 	jobq->total_computation_time = 0.0;
 	jobq->total_communication_time = 0.0;
 	jobq->total_computation_time_error = 0.0;