浏览代码

from Lionel: MCT: place the computation of expected end times in a critical section, to avoid races between concurrent scheduling decisions

Computation of performance-model predictions can still run in parallel.
Samuel Thibault 8 年之前
父节点
当前提交
26a2ac8bef

+ 7 - 2
src/sched_policies/component_heft.c

@@ -90,12 +90,17 @@ static int heft_progress_one(struct starpu_sched_component *component)
 			min_exp_end_with_task[n] = DBL_MAX;
 			max_exp_end_with_task[n] = 0.0;
 
-			nsuitable_components[n] = starpu_mct_compute_expected_times(component, tasks[n],
+			nsuitable_components[n] = starpu_mct_compute_execution_times(component, tasks[n],
+					estimated_lengths + offset,
+					estimated_transfer_length + offset,
+					suitable_components + offset);
+
+			starpu_mct_compute_expected_times(component, tasks[n],
 					estimated_lengths + offset,
 					estimated_transfer_length + offset,
 					estimated_ends_with_task + offset,
 					&min_exp_end_with_task[n], &max_exp_end_with_task[n],
-					suitable_components + offset);
+							  suitable_components + offset, nsuitable_components[n]);
 		}
 
 		int best_task = 0;

+ 23 - 4
src/sched_policies/component_mct.c

@@ -53,9 +53,8 @@ static int mct_push_task(struct starpu_sched_component * component, struct starp
 	int suitable_components[component->nchildren];
 	int nsuitable_components = 0;
 
-	nsuitable_components = starpu_mct_compute_expected_times(component, task,
-			estimated_lengths, estimated_transfer_length, estimated_ends_with_task,
-			&min_exp_end_with_task, &max_exp_end_with_task, suitable_components);
+	nsuitable_components = starpu_mct_compute_execution_times(component, task,
+								  estimated_lengths, estimated_transfer_length, suitable_components);
 
 	/* If no suitable components were found, it means that the perfmodel of
 	 * the task had been purged since it has been pushed on the mct component.
@@ -64,6 +63,15 @@ static int mct_push_task(struct starpu_sched_component * component, struct starp
 	if(nsuitable_components == 0)
 		return 1;
 
+
+	/* Entering critical section to make sure no two workers
+	   make scheduling decisions at the same time */
+	STARPU_PTHREAD_MUTEX_LOCK(&d->scheduling_mutex); 
+
+
+	starpu_mct_compute_expected_times(component, task, estimated_lengths, estimated_transfer_length, 
+					  estimated_ends_with_task, &min_exp_end_with_task, &max_exp_end_with_task, suitable_components, nsuitable_components);
+
 	double best_fitness = DBL_MAX;
 	int best_icomponent = -1;
 	for(i = 0; i < nsuitable_components; i++)
@@ -90,8 +98,10 @@ static int mct_push_task(struct starpu_sched_component * component, struct starp
 	 * the task had been purged since it has been pushed on the mct component.
 	 * We should send a push_fail message to its parent so that it will
 	 * be able to reschedule the task properly. */
-	if(best_icomponent == -1)
+	if(best_icomponent == -1) {
+		STARPU_PTHREAD_MUTEX_UNLOCK(&d->scheduling_mutex); 
 		return 1;
+	}
 
 	best_component = component->children[best_icomponent];
 
@@ -101,11 +111,18 @@ static int mct_push_task(struct starpu_sched_component * component, struct starp
 	if(starpu_sched_component_is_worker(best_component))
 	{
 		best_component->can_pull(best_component);
+		STARPU_PTHREAD_MUTEX_UNLOCK(&d->scheduling_mutex); 
+		
 		return 1;
 	}
 
 	_STARPU_TASK_BREAK_ON(task, sched);
 	int ret = starpu_sched_component_push_task(component, best_component, task);
+	
+	/* The task has now been pushed to the selected child, so its execution time will be
+	   taken into account by subsequent scheduling decisions: the critical section can be left */
+	STARPU_PTHREAD_MUTEX_UNLOCK(&d->scheduling_mutex); 
+
 	return ret;
 }
 
@@ -113,6 +130,7 @@ static void mct_component_deinit_data(struct starpu_sched_component * component)
 {
 	STARPU_ASSERT(starpu_sched_component_is_mct(component));
 	struct _starpu_mct_data * d = component->data;
+	STARPU_PTHREAD_MUTEX_DESTROY(&d->scheduling_mutex); 
 	free(d);
 }
 
@@ -127,6 +145,7 @@ struct starpu_sched_component * starpu_sched_component_mct_create(struct starpu_
 	struct _starpu_mct_data *data = starpu_mct_init_parameters(params);
 
 	component->data = data;
+	STARPU_PTHREAD_MUTEX_INIT(&data->scheduling_mutex, NULL); 
 
 	component->push_task = mct_push_task;
 	component->deinit_data = mct_component_deinit_data;

+ 27 - 16
src/sched_policies/helper_mct.c

@@ -119,9 +119,8 @@ double starpu_mct_compute_fitness(struct _starpu_mct_data * d, double exp_end, d
 		+ d->_gamma * d->idle_power * (exp_end - max_exp_end);
 }
 
-int starpu_mct_compute_expected_times(struct starpu_sched_component *component, struct starpu_task *task,
-		double *estimated_lengths, double *estimated_transfer_length, double *estimated_ends_with_task,
-		double *min_exp_end_with_task, double *max_exp_end_with_task, int *suitable_components)
+int starpu_mct_compute_execution_times(struct starpu_sched_component *component, struct starpu_task *task,
+				       double *estimated_lengths, double *estimated_transfer_length, int *suitable_components) 
 {
 	int nsuitable_components = 0;
 	double now = starpu_timing_now();
@@ -136,22 +135,34 @@ int starpu_mct_compute_expected_times(struct starpu_sched_component *component,
 				/* The perfmodel had been purged since the task was pushed
 				 * onto the mct component. */
 				continue;
-
-			/* Estimated availability of worker */
-			double estimated_end = c->estimated_end(c);
-			if (estimated_end < now)
-				estimated_end = now;
 			estimated_transfer_length[i] = starpu_sched_component_transfer_length(c, task);
-			estimated_ends_with_task[i] = compute_expected_time(now,
-									    estimated_end,
-									    estimated_lengths[i],
-									    estimated_transfer_length[i]);
-			if(estimated_ends_with_task[i] < *min_exp_end_with_task)
-				*min_exp_end_with_task = estimated_ends_with_task[i];
-			if(estimated_ends_with_task[i] > *max_exp_end_with_task)
-				*max_exp_end_with_task = estimated_ends_with_task[i];
 			suitable_components[nsuitable_components++] = i;
 		}
 	}
 	return nsuitable_components;
 }
+
+void starpu_mct_compute_expected_times(struct starpu_sched_component *component, struct starpu_task *task,
+		double *estimated_lengths, double *estimated_transfer_length, double *estimated_ends_with_task,
+				       double *min_exp_end_with_task, double *max_exp_end_with_task, int *suitable_components, int nsuitable_components)
+{ /* For each child index listed in suitable_components[], fills estimated_ends_with_task[] and updates the min/max outputs; task is not read here. */
+	int i;
+	double now = starpu_timing_now(); /* sampled once and reused for every listed child */
+	for(i = 0; i < nsuitable_components; i++)
+	{
+		int icomponent = suitable_components[i]; /* entries are indices into component->children[] and into the per-child arrays */
+		struct starpu_sched_component * c = component->children[icomponent];
+		/* Estimated availability of worker, clamped below so that it is never earlier than now */
+		double estimated_end = c->estimated_end(c);
+		if (estimated_end < now)
+			estimated_end = now;
+		estimated_ends_with_task[icomponent] = compute_expected_time(now,
+								    estimated_end,
+								    estimated_lengths[icomponent],
+								    estimated_transfer_length[icomponent]);
+		/* min/max are only ever tightened here, never reset: the caller has to initialise them before the call */
+		if(estimated_ends_with_task[icomponent] < *min_exp_end_with_task)
+			*min_exp_end_with_task = estimated_ends_with_task[icomponent];
+		if(estimated_ends_with_task[icomponent] > *max_exp_end_with_task)
+			*max_exp_end_with_task = estimated_ends_with_task[icomponent];
+	}
+}

+ 7 - 2
src/sched_policies/helper_mct.h

@@ -20,12 +20,17 @@ struct _starpu_mct_data
 	double beta;
 	double _gamma;
 	double idle_power;
+	starpu_pthread_mutex_t scheduling_mutex; 
 };
 
 struct _starpu_mct_data *starpu_mct_init_parameters(struct starpu_sched_component_mct_data *params);
 
-int starpu_mct_compute_expected_times(struct starpu_sched_component *component, struct starpu_task *task,
+int starpu_mct_compute_execution_times(struct starpu_sched_component *component, struct starpu_task *task,
+				       double *estimated_lengths, double *estimated_transfer_length, int *suitable_components);
+
+
+void starpu_mct_compute_expected_times(struct starpu_sched_component *component, struct starpu_task *task,
 		double *estimated_lengths, double *estimated_transfer_length, double *estimated_ends_with_task,
-		double *min_exp_end_with_task, double *max_exp_end_with_task, int *suitable_components);
+				       double *min_exp_end_with_task, double *max_exp_end_with_task, int *suitable_components, int nsuitable_components);
 
 double starpu_mct_compute_fitness(struct _starpu_mct_data * d, double exp_end, double min_exp_end, double max_exp_end, double transfer_len, double local_energy);