瀏覽代碼

update sched_modular_eager policy with new synchro scheme

Olivier Aumage 8 年之前
父節點
當前提交
0b9ff3f17c

+ 1 - 1
src/core/sched_policy.c

@@ -53,7 +53,6 @@ int starpu_get_prefetch_flag(void)
 static struct starpu_sched_policy *predefined_policies[] =
 {
 #if 0
-	&_starpu_sched_modular_eager_policy,
 	&_starpu_sched_modular_eager_prefetching_policy,
 	&_starpu_sched_modular_prio_policy,
 	&_starpu_sched_modular_prio_prefetching_policy,
@@ -66,6 +65,7 @@ static struct starpu_sched_policy *predefined_policies[] =
 	&_starpu_sched_modular_heft_prio_policy,
 	&_starpu_sched_modular_heft2_policy,
 #else
+	&_starpu_sched_modular_eager_policy,
 	&_starpu_sched_eager_policy,
 	&_starpu_sched_prio_policy,
 	&_starpu_sched_random_policy,

+ 2 - 0
src/sched_policies/component_fifo.c

@@ -153,7 +153,9 @@ static struct starpu_task * fifo_pull_task(struct starpu_sched_component * compo
 	struct _starpu_fifo_data * data = component->data;
 	struct _starpu_fifo_taskq * fifo = data->fifo;
 	starpu_pthread_mutex_t * mutex = &data->mutex;
+	_starpu_worker_enter_section_safe_for_observation();
 	STARPU_PTHREAD_MUTEX_LOCK(mutex);
+	_starpu_worker_leave_section_safe_for_observation();
 	struct starpu_task * task = _starpu_fifo_pop_task(fifo, starpu_worker_get_id_check());
 	if(task)
 	{

+ 4 - 4
src/sched_policies/component_sched.c

@@ -376,7 +376,7 @@ void starpu_sched_tree_add_workers(unsigned sched_ctx_id, int *workerids, unsign
 	struct starpu_sched_tree * t = starpu_sched_ctx_get_policy_data(sched_ctx_id);
 
 	STARPU_PTHREAD_MUTEX_LOCK(&t->lock);
-	_starpu_sched_component_lock_all_workers(sched_ctx_id);
+	_starpu_sched_component_lock_all_workers();
 
 	unsigned i;
 	for(i = 0; i < nworkers; i++)
@@ -384,7 +384,7 @@ void starpu_sched_tree_add_workers(unsigned sched_ctx_id, int *workerids, unsign
 
 	starpu_sched_tree_update_workers_in_ctx(t);
 
-	_starpu_sched_component_unlock_all_workers(sched_ctx_id);
+	_starpu_sched_component_unlock_all_workers();
 	STARPU_PTHREAD_MUTEX_UNLOCK(&t->lock);
 }
 
@@ -395,7 +395,7 @@ void starpu_sched_tree_remove_workers(unsigned sched_ctx_id, int *workerids, uns
 	struct starpu_sched_tree * t = starpu_sched_ctx_get_policy_data(sched_ctx_id);
 
 	STARPU_PTHREAD_MUTEX_LOCK(&t->lock);
-	_starpu_sched_component_lock_all_workers(sched_ctx_id);
+	_starpu_sched_component_lock_all_workers();
 
 	unsigned i;
 	for(i = 0; i < nworkers; i++)
@@ -403,7 +403,7 @@ void starpu_sched_tree_remove_workers(unsigned sched_ctx_id, int *workerids, uns
 
 	starpu_sched_tree_update_workers_in_ctx(t);
 
-	_starpu_sched_component_unlock_all_workers(sched_ctx_id);
+	_starpu_sched_component_unlock_all_workers();
 	STARPU_PTHREAD_MUTEX_UNLOCK(&t->lock);
 }
 

+ 27 - 84
src/sched_policies/component_worker.c

@@ -157,11 +157,7 @@ struct _starpu_worker_component_data
 {
 	union
 	{
-		struct
-		{
-			struct _starpu_worker * worker;
-			starpu_pthread_mutex_t lock;
-		};
+		struct _starpu_worker * worker;
 		struct _starpu_combined_worker * combined_worker;
 	};
 	struct _starpu_worker_task_list * list;
@@ -372,55 +368,12 @@ struct _starpu_combined_worker * _starpu_sched_component_combined_worker_get_com
 	return data->combined_worker;
 }
 
-void _starpu_sched_component_lock_worker(unsigned sched_ctx_id, int workerid)
-{
-	STARPU_ASSERT(workerid >= 0 && workerid < (int) starpu_worker_get_count());
-	struct _starpu_worker_component_data * data = starpu_sched_component_worker_get(sched_ctx_id, workerid)->data;
-	STARPU_PTHREAD_MUTEX_LOCK(&data->lock);
-}
-void _starpu_sched_component_unlock_worker(unsigned sched_ctx_id, int workerid)
-{
-	STARPU_ASSERT(workerid >= 0  && workerid < (int)starpu_worker_get_count());
-	struct _starpu_worker_component_data * data = starpu_sched_component_worker_get(sched_ctx_id, workerid)->data;
-	STARPU_PTHREAD_MUTEX_UNLOCK(&data->lock);
-}
-
-
-
 /******************************************************************************
  *				Worker Components' Private Helper Functions			      	  *
  *****************************************************************************/
 
 
 
-/* Allows a worker to lock/unlock scheduling mutexes. Currently used in
- * self-defined can_push calls to allow can_pull calls to take those mutexes while the
- * current worker is pushing tasks on other workers (or itself).
- */
-static void _starpu_sched_component_worker_lock_scheduling(unsigned sched_ctx_id)
-{
-	unsigned workerid = starpu_worker_get_id_check();
-	starpu_pthread_mutex_t *sched_mutex;
-	starpu_pthread_cond_t *sched_cond;
-	starpu_worker_get_sched_condition(workerid, &sched_mutex, &sched_cond);
-	_starpu_sched_component_lock_worker(sched_ctx_id, workerid);
-#ifdef STARPU_DEVEL
-#warning Reverses locking order between worker lock and worker component lock!
-#warning See helgrind suppression file for the details
-#endif
-	STARPU_PTHREAD_MUTEX_LOCK_SCHED(sched_mutex);
-}
-
-static void _starpu_sched_component_worker_unlock_scheduling(unsigned sched_ctx_id)
-{
-	unsigned workerid = starpu_worker_get_id_check();
-	starpu_pthread_mutex_t *sched_mutex;
-	starpu_pthread_cond_t *sched_cond;
-	starpu_worker_get_sched_condition(workerid, &sched_mutex, &sched_cond);
-	STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(sched_mutex);
-	_starpu_sched_component_unlock_worker(sched_ctx_id, workerid);
-}
-
 static void _starpu_sched_component_worker_set_sleep_status(struct starpu_sched_component * worker_component)
 {
 	STARPU_ASSERT(starpu_sched_component_is_worker(worker_component));
@@ -490,24 +443,14 @@ static int _worker_consistant(struct starpu_sched_component * component)
 
 static void simple_worker_can_pull(struct starpu_sched_component * worker_component)
 {
-	(void) worker_component;
-	struct _starpu_worker * w = _starpu_sched_component_worker_get_worker(worker_component);
-	_starpu_sched_component_lock_worker(worker_component->tree->sched_ctx_id, w->workerid);
+	struct _starpu_worker * worker = _starpu_sched_component_worker_get_worker(worker_component);
+	int workerid = worker->workerid;
+	_starpu_worker_lock_for_observation_relax(workerid);
 	if(_starpu_sched_component_worker_is_reset_status(worker_component))
 		_starpu_sched_component_worker_set_changed_status(worker_component);
-
-	if(w->workerid == _starpu_worker_get_id())
-	{
-		_starpu_sched_component_unlock_worker(worker_component->tree->sched_ctx_id, w->workerid);
-		return;
-	}
-	if(_starpu_sched_component_worker_is_sleeping_status(worker_component))
-	{
-		_starpu_sched_component_unlock_worker(worker_component->tree->sched_ctx_id, w->workerid);
-		starpu_wake_worker(w->workerid);
-	}
-	else
-		_starpu_sched_component_unlock_worker(worker_component->tree->sched_ctx_id, w->workerid);
+	if(workerid != _starpu_worker_get_id() && _starpu_sched_component_worker_is_sleeping_status(worker_component))
+		starpu_wake_worker_locked(workerid);
+	_starpu_worker_unlock_for_observation(workerid);
 }
 
 static int simple_worker_push_task(struct starpu_sched_component * component, struct starpu_task *task)
@@ -540,7 +483,9 @@ static struct starpu_task * simple_worker_pull_task(struct starpu_sched_componen
 	unsigned workerid = starpu_worker_get_id_check();
 	struct _starpu_worker_component_data * data = component->data;
 	struct _starpu_worker_task_list * list = data->list;
+	_starpu_worker_enter_section_safe_for_observation();
 	STARPU_PTHREAD_MUTEX_LOCK(&list->mutex);
+	_starpu_worker_leave_section_safe_for_observation();
 	/* Take the opportunity to update start time */
 	data->list->exp_start = STARPU_MAX(starpu_timing_now(), data->list->exp_start);
 	data->list->exp_end = data->list->exp_start + data->list->exp_len;
@@ -550,10 +495,9 @@ static struct starpu_task * simple_worker_pull_task(struct starpu_sched_componen
 		_starpu_worker_task_list_transfer_started(list, task);
 		STARPU_PTHREAD_MUTEX_UNLOCK(&list->mutex);
 		starpu_push_task_end(task);
-		return task;
+		goto ret;
 	}
 	STARPU_PTHREAD_MUTEX_UNLOCK(&list->mutex);
-	_starpu_sched_component_lock_worker(component->tree->sched_ctx_id, workerid);
 	int i;
 	do
 	{
@@ -564,9 +508,7 @@ static struct starpu_task * simple_worker_pull_task(struct starpu_sched_componen
 				continue;
 			else
 			{
-				_starpu_sched_component_worker_unlock_scheduling(component->tree->sched_ctx_id);
 				task = starpu_sched_component_pull_task(component->parents[i],component);
-				_starpu_sched_component_worker_lock_scheduling(component->tree->sched_ctx_id);
 				if(task)
 					break;
 			}
@@ -574,34 +516,39 @@ static struct starpu_task * simple_worker_pull_task(struct starpu_sched_componen
 	}
 	while((!task) && _starpu_sched_component_worker_is_changed_status(component));
 	_starpu_sched_component_worker_set_sleep_status(component);
-	_starpu_sched_component_unlock_worker(component->tree->sched_ctx_id, workerid);
 	if(!task)
-		return NULL;
+		goto ret;
 	if(task->cl->type == STARPU_SPMD)
 	{
 		if(!starpu_worker_is_combined_worker(workerid))
 		{
+			_starpu_worker_enter_section_safe_for_observation();
 			STARPU_PTHREAD_MUTEX_LOCK(&list->mutex);
+			_starpu_worker_leave_section_safe_for_observation();
 			_starpu_worker_task_list_add(list, task);
 			_starpu_worker_task_list_transfer_started(list, task);
 			STARPU_PTHREAD_MUTEX_UNLOCK(&list->mutex);
 			starpu_push_task_end(task);
-			return task;
+			goto ret;
 		}
 		struct starpu_sched_component * combined_worker_component = starpu_sched_component_worker_get(component->tree->sched_ctx_id, workerid);
 		starpu_sched_component_push_task(component, combined_worker_component, task);
 		/* we have pushed a task in queue, so can make a recursive call */
-		return simple_worker_pull_task(component);
+		task = simple_worker_pull_task(component);
+		goto ret;
 
 	}
 	if(task)
 	{
+		_starpu_worker_enter_section_safe_for_observation();
 		STARPU_PTHREAD_MUTEX_LOCK(&list->mutex);
+		_starpu_worker_leave_section_safe_for_observation();
 		_starpu_worker_task_list_add(list, task);
 		_starpu_worker_task_list_transfer_started(list, task);
 		STARPU_PTHREAD_MUTEX_UNLOCK(&list->mutex);
 		starpu_push_task_end(task);
 	}
+ret:
 	return task;
 }
 
@@ -641,8 +588,6 @@ static void _worker_component_deinit_data(struct starpu_sched_component * compon
 {
 	struct _starpu_worker_component_data * d = component->data;
 	_starpu_worker_task_list_destroy(d->list);
-	if(starpu_sched_component_is_simple_worker(component))
-		STARPU_PTHREAD_MUTEX_DESTROY(&d->lock);
 	int i, j;
 	for(j = 0; j < STARPU_NMAX_SCHED_CTXS; j++)
 	for(i = 0; i < STARPU_NMAXWORKERS; i++)
@@ -672,7 +617,6 @@ static struct starpu_sched_component * starpu_sched_component_worker_create(stru
 	memset(data, 0, sizeof(*data));
 
 	data->worker = worker;
-	STARPU_PTHREAD_MUTEX_INIT(&data->lock,NULL);
 	data->status = COMPONENT_STATUS_SLEEPING;
 	data->list = _starpu_worker_task_list_create();
 	component->data = data;
@@ -719,16 +663,15 @@ static void combined_worker_can_pull(struct starpu_sched_component * component)
 	{
 		if((unsigned) i == workerid)
 			continue;
-		int worker = data->combined_worker->combined_workerid[i];
-		_starpu_sched_component_lock_worker(component->tree->sched_ctx_id, worker);
+		int workerid = data->combined_worker->combined_workerid[i];
+		_starpu_worker_lock_for_observation_relax(workerid);
 		if(_starpu_sched_component_worker_is_sleeping_status(component))
 		{
-			starpu_wake_worker(worker);
+			starpu_wake_worker_locked(workerid);
 		}
 		if(_starpu_sched_component_worker_is_reset_status(component))
 			_starpu_sched_component_worker_set_changed_status(component);
-
-		_starpu_sched_component_unlock_worker(component->tree->sched_ctx_id, worker);
+		_starpu_worker_unlock_for_observation(workerid);
 	}
 }
 
@@ -881,17 +824,17 @@ static struct starpu_sched_component  * starpu_sched_component_combined_worker_c
 
 
 
-void _starpu_sched_component_lock_all_workers(unsigned sched_ctx_id)
+void _starpu_sched_component_lock_all_workers(void)
 {
 	unsigned i;
 	for(i = 0; i < starpu_worker_get_count(); i++)
-		_starpu_sched_component_lock_worker(sched_ctx_id, i);
+		_starpu_worker_lock_for_observation_relax(i);
 }
-void _starpu_sched_component_unlock_all_workers(unsigned sched_ctx_id)
+void _starpu_sched_component_unlock_all_workers(void)
 {
 	unsigned i;
 	for(i = 0; i < starpu_worker_get_count(); i++)
-		_starpu_sched_component_unlock_worker(sched_ctx_id, i);
+		_starpu_worker_unlock_for_observation(i);
 }
 
 void _starpu_sched_component_workers_destroy(void)

+ 2 - 8
src/sched_policies/sched_component.h

@@ -21,10 +21,8 @@
 
 
 /* lock and unlock drivers for modifying schedulers */
-void _starpu_sched_component_lock_all_workers(unsigned sched_ctx_id);
-void _starpu_sched_component_unlock_all_workers(unsigned sched_ctx_id);
-void _starpu_sched_component_lock_worker(unsigned sched_ctx_id, int workerid);
-void _starpu_sched_component_unlock_worker(unsigned sched_ctx_id, int workerid);
+void _starpu_sched_component_lock_all_workers(void);
+void _starpu_sched_component_unlock_all_workers(void);
 
 void _starpu_sched_component_workers_destroy(void);
 
@@ -33,8 +31,4 @@ struct _starpu_combined_worker * _starpu_sched_component_combined_worker_get_com
 
 struct starpu_bitmap * _starpu_get_worker_mask(unsigned sched_ctx_id);
 
-
-void _starpu_sched_component_lock_scheduling(void);
-void _starpu_sched_component_unlock_scheduling(void);
-
 #endif