Browse Source

update peager with new synchro scheme

Olivier Aumage 8 years ago
parent
commit
fa947f5c97

+ 0 - 2
examples/mandelbrot/mandelbrot.c

@@ -499,8 +499,6 @@ int main(int argc, char **argv)
 	if (use_spmd_p)
 	{
 		conf.sched_policy_name = "peager";
-#warning "peager needs update with new synchro scheme"
-		return 77;
 	}
 
 	ret = starpu_init(&conf);

+ 1 - 1
src/core/sched_policy.c

@@ -73,13 +73,13 @@ static struct starpu_sched_policy *predefined_policies[] =
 	&_starpu_sched_dmda_sorted_policy,
 	&_starpu_sched_dmda_sorted_decision_policy,
 	&_starpu_sched_parallel_heft_policy,
-	&_starpu_sched_peager_policy,
 	&_starpu_sched_heteroprio_policy,
 	&_starpu_sched_graph_test_policy,
 #else
 	&_starpu_sched_eager_policy,
 	&_starpu_sched_prio_policy,
 	&_starpu_sched_random_policy,
+	&_starpu_sched_peager_policy,
 #warning TODO: update sched policies with new synchro scheme
 #endif
 	NULL

+ 15 - 6
src/sched_policies/parallel_eager.c

@@ -179,6 +179,7 @@ static int push_task_peager_policy(struct starpu_task *task)
 
 static struct starpu_task *pop_task_peager_policy(unsigned sched_ctx_id)
 {
+	_starpu_worker_enter_section_safe_for_observation();
 	struct _starpu_peager_data *data = (struct _starpu_peager_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
 
 	int workerid = starpu_worker_get_id_check();
@@ -188,6 +189,7 @@ static struct starpu_task *pop_task_peager_policy(unsigned sched_ctx_id)
 	{
 		struct starpu_task *task = NULL;
 		STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
+		_starpu_worker_leave_section_safe_for_observation();
 		task = _starpu_fifo_pop_task(data->fifo, workerid);
 		STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
 
@@ -199,16 +201,17 @@ static struct starpu_task *pop_task_peager_policy(unsigned sched_ctx_id)
 	//_STARPU_DEBUG("workerid:%d, master:%d\n",workerid,master);
 
 
+	struct starpu_task *task = NULL;
 	if (master == workerid)
 	{
 		/* The worker is a master */
-		struct starpu_task *task = NULL;
 		STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
+		_starpu_worker_leave_section_safe_for_observation();
 		task = _starpu_fifo_pop_task(data->fifo, workerid);
 		STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
 
 		if (!task)
-			return NULL;
+			goto ret;
 
 		/* Find the largest compatible worker combination */
 		int best_size = -1;
@@ -231,7 +234,7 @@ static struct starpu_task *pop_task_peager_policy(unsigned sched_ctx_id)
 		 * worker take it anyway, so that it can discard it afterward.
 		 * */
 		if (best_workerid == -1)
-			return task;
+			goto ret;
 
 		/* Is this a basic worker or a combined worker ? */
 		int nbasic_workers = (int)starpu_worker_get_count();
@@ -240,7 +243,7 @@ static struct starpu_task *pop_task_peager_policy(unsigned sched_ctx_id)
 		{
 
 			/* The master is alone */
-			return task;
+			goto ret;
 		}
 		else
 		{
@@ -274,14 +277,20 @@ static struct starpu_task *pop_task_peager_policy(unsigned sched_ctx_id)
 			/* The master also manipulated an alias */
 			struct starpu_task *master_alias = starpu_task_dup(task);
 			master_alias->destroy = 1;
-			return master_alias;
+			task = master_alias;
+			goto ret;
 		}
 	}
 	else
 	{
 		/* The worker is a slave */
-		return _starpu_fifo_pop_task(data->local_fifo[workerid], workerid);
+		STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
+		_starpu_worker_leave_section_safe_for_observation();
+		task = _starpu_fifo_pop_task(data->local_fifo[workerid], workerid);
+		STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
 	}
+ret:
+	return task;
 }
 
 struct starpu_sched_policy _starpu_sched_peager_policy =

+ 0 - 2
tests/parallel_tasks/spmd_peager.c

@@ -72,8 +72,6 @@ int main(int argc, char **argv)
         struct starpu_conf conf;
 	starpu_conf_init(&conf);
         conf.sched_policy_name = "peager";
-#warning "peager needs update with new synchro scheme"
-	return 77;
 
 	ret = starpu_init(&conf);
 	if (ret == -ENODEV) return STARPU_TEST_SKIPPED;