Kaynağa Gözat

add wrapper macros to reduce code duplication

Olivier Aumage 8 yıl önce
ebeveyn
işleme
8f00d2501f

+ 14 - 0
include/starpu_sched_component.h

@@ -196,6 +196,20 @@ struct starpu_sched_component_specs
 struct starpu_sched_tree *starpu_sched_component_make_scheduler(unsigned sched_ctx_id, struct starpu_sched_component_specs s);
 #endif /* STARPU_HAVE_HWLOC */
 
+#define STARPU_COMPONENT_MUTEX_LOCK(m) \
+do \
+{ \
+	const int _relaxed_state = _starpu_worker_get_observation_safe_state(); \
+	if (!_relaxed_state) \
+		_starpu_worker_enter_section_safe_for_observation(); \
+	STARPU_PTHREAD_MUTEX_LOCK((m)); \
+	if (!_relaxed_state) \
+		_starpu_worker_leave_section_safe_for_observation(); \
+} \
+while(0)
+
+#define STARPU_COMPONENT_MUTEX_UNLOCK(m) STARPU_PTHREAD_MUTEX_UNLOCK((m))
+
 #ifdef __cplusplus
 }
 #endif

+ 9 - 25
src/sched_policies/component_fifo.c

@@ -61,18 +61,13 @@ static double fifo_estimated_load(struct starpu_sched_component * component)
 	starpu_pthread_mutex_t * mutex = &data->mutex;
 	double relative_speedup = 0.0;
 	double load = starpu_sched_component_estimated_load(component);
-	const int relaxed_state = _starpu_worker_get_observation_safe_state();
 	if(STARPU_SCHED_COMPONENT_IS_HOMOGENEOUS(component))
 	{
 		int first_worker = starpu_bitmap_first(component->workers_in_ctx);
 		relative_speedup = starpu_worker_get_relative_speedup(starpu_worker_get_perf_archtype(first_worker, component->tree->sched_ctx_id));
-		if (!relaxed_state)
-			_starpu_worker_enter_section_safe_for_observation();
-		STARPU_PTHREAD_MUTEX_LOCK(mutex);
-		if (!relaxed_state)
-			_starpu_worker_leave_section_safe_for_observation();
+		STARPU_COMPONENT_MUTEX_LOCK(mutex);
 		load += fifo->ntasks / relative_speedup;
-		STARPU_PTHREAD_MUTEX_UNLOCK(mutex);
+		STARPU_COMPONENT_MUTEX_UNLOCK(mutex);
 		return load;
 	}
 	else
@@ -84,13 +79,9 @@ static double fifo_estimated_load(struct starpu_sched_component * component)
 			relative_speedup += starpu_worker_get_relative_speedup(starpu_worker_get_perf_archtype(i, component->tree->sched_ctx_id));
 		relative_speedup /= starpu_bitmap_cardinal(component->workers_in_ctx);
 		STARPU_ASSERT(!_STARPU_IS_ZERO(relative_speedup));
-		if (!relaxed_state)
-			_starpu_worker_enter_section_safe_for_observation();
-		STARPU_PTHREAD_MUTEX_LOCK(mutex);
-		if (!relaxed_state)
-			_starpu_worker_leave_section_safe_for_observation();
+		STARPU_COMPONENT_MUTEX_LOCK(mutex);
 		load += fifo->ntasks / relative_speedup;
-		STARPU_PTHREAD_MUTEX_UNLOCK(mutex);
+		STARPU_COMPONENT_MUTEX_UNLOCK(mutex);
 	}
 	return load;
 }
@@ -103,12 +94,7 @@ static int fifo_push_local_task(struct starpu_sched_component * component, struc
 	struct _starpu_fifo_taskq * fifo = data->fifo;
 	starpu_pthread_mutex_t * mutex = &data->mutex;
 	int ret = 0;
-	const int relaxed_state = _starpu_worker_get_observation_safe_state();
-	if (!relaxed_state)
-		_starpu_worker_enter_section_safe_for_observation();
-	STARPU_PTHREAD_MUTEX_LOCK(mutex);
-	if (!relaxed_state)
-		_starpu_worker_leave_section_safe_for_observation();
+	STARPU_COMPONENT_MUTEX_LOCK(mutex);
 	double exp_len;
 	if(!isnan(task->predicted))
 		exp_len = fifo->exp_len + task->predicted;
@@ -126,7 +112,7 @@ static int fifo_push_local_task(struct starpu_sched_component * component, struc
 		}
 		STARPU_ASSERT(!is_pushback);
 		ret = 1;
-		STARPU_PTHREAD_MUTEX_UNLOCK(mutex);
+		STARPU_COMPONENT_MUTEX_UNLOCK(mutex);
 	}
 	else
 	{
@@ -149,7 +135,7 @@ static int fifo_push_local_task(struct starpu_sched_component * component, struc
 
 		if(!is_pushback)
 			component->can_pull(component);
-		STARPU_PTHREAD_MUTEX_UNLOCK(mutex);
+		STARPU_COMPONENT_MUTEX_UNLOCK(mutex);
 	}
 
 	return ret;
@@ -166,9 +152,7 @@ static struct starpu_task * fifo_pull_task(struct starpu_sched_component * compo
 	struct _starpu_fifo_data * data = component->data;
 	struct _starpu_fifo_taskq * fifo = data->fifo;
 	starpu_pthread_mutex_t * mutex = &data->mutex;
-	_starpu_worker_enter_section_safe_for_observation();
-	STARPU_PTHREAD_MUTEX_LOCK(mutex);
-	_starpu_worker_leave_section_safe_for_observation();
+	STARPU_COMPONENT_MUTEX_LOCK(mutex);
 	struct starpu_task * task = _starpu_fifo_pop_task(fifo, starpu_worker_get_id_check());
 	if(task)
 	{
@@ -184,7 +168,7 @@ static struct starpu_task * fifo_pull_task(struct starpu_sched_component * compo
 	STARPU_ASSERT(!isnan(fifo->exp_end));
 	STARPU_ASSERT(!isnan(fifo->exp_len));
 	STARPU_ASSERT(!isnan(fifo->exp_start));
-	STARPU_PTHREAD_MUTEX_UNLOCK(mutex);
+	STARPU_COMPONENT_MUTEX_UNLOCK(mutex);
 
 	// When a pop is called, a can_push is called for pushing tasks onto
 	// the empty place of the queue left by the popped task.

+ 8 - 26
src/sched_policies/component_heft.c

@@ -46,12 +46,7 @@ static int heft_progress_one(struct starpu_sched_component *component)
 	struct starpu_task * (tasks[NTASKS]);
 	unsigned ntasks;
 
-	const int relaxed_state = _starpu_worker_get_observation_safe_state();
-	if (!relaxed_state)
-		_starpu_worker_enter_section_safe_for_observation();
-	STARPU_PTHREAD_MUTEX_LOCK(mutex);
-	if (!relaxed_state)
-		_starpu_worker_leave_section_safe_for_observation();
+	STARPU_COMPONENT_MUTEX_LOCK(mutex);
 	/* Try to look at NTASKS from the queue */
 	for (ntasks = 0; ntasks < NTASKS; ntasks++)
 	{
@@ -59,7 +54,7 @@ static int heft_progress_one(struct starpu_sched_component *component)
 		if (!tasks[ntasks])
 			break;
 	}
-	STARPU_PTHREAD_MUTEX_UNLOCK(mutex);
+	STARPU_COMPONENT_MUTEX_UNLOCK(mutex);
 
 	if (!ntasks)
 	{
@@ -121,15 +116,11 @@ static int heft_progress_one(struct starpu_sched_component *component)
 		int best_icomponent = -1;
 
 		/* Push back the other tasks */
-		if (!relaxed_state)
-			_starpu_worker_enter_section_safe_for_observation();
-		STARPU_PTHREAD_MUTEX_LOCK(mutex);
-		if (!relaxed_state)
-			_starpu_worker_leave_section_safe_for_observation();
+		STARPU_COMPONENT_MUTEX_LOCK(mutex);
 		for (n = ntasks - 1; n < ntasks; n--)
 			if ((int) n != best_task)
 				_starpu_prio_deque_push_back_task(prio, tasks[n]);
-		STARPU_PTHREAD_MUTEX_UNLOCK(mutex);
+		STARPU_COMPONENT_MUTEX_UNLOCK(mutex);
 
 		/* And now find out which worker suits best for this task,
 		 * including data transfer */
@@ -170,13 +161,9 @@ static int heft_progress_one(struct starpu_sched_component *component)
 		if (ret)
 		{
 			/* Could not push to child actually, push that one back too */
-			if (!relaxed_state)
-				_starpu_worker_enter_section_safe_for_observation();
-			STARPU_PTHREAD_MUTEX_LOCK(mutex);
-			if (!relaxed_state)
-				_starpu_worker_leave_section_safe_for_observation();
+			STARPU_COMPONENT_MUTEX_LOCK(mutex);
 			_starpu_prio_deque_push_back_task(prio, tasks[best_task]);
-			STARPU_PTHREAD_MUTEX_UNLOCK(mutex);
+			STARPU_COMPONENT_MUTEX_UNLOCK(mutex);
 			return 1;
 		}
 		else
@@ -199,14 +186,9 @@ static int heft_push_task(struct starpu_sched_component * component, struct star
 	struct _starpu_prio_deque * prio = &data->prio;
 	starpu_pthread_mutex_t * mutex = &data->mutex;
 
-	const int relaxed_state = _starpu_worker_get_observation_safe_state();
-	if (!relaxed_state)
-		_starpu_worker_enter_section_safe_for_observation();
-	STARPU_PTHREAD_MUTEX_LOCK(mutex);
-	if (!relaxed_state)
-		_starpu_worker_leave_section_safe_for_observation();
+	STARPU_COMPONENT_MUTEX_LOCK(mutex);
 	_starpu_prio_deque_push_task(prio,task);
-	STARPU_PTHREAD_MUTEX_UNLOCK(mutex);
+	STARPU_COMPONENT_MUTEX_UNLOCK(mutex);
 
 	heft_progress(component);
 

+ 9 - 25
src/sched_policies/component_prio.c

@@ -80,18 +80,13 @@ static double prio_estimated_load(struct starpu_sched_component * component)
 	starpu_pthread_mutex_t * mutex = &data->mutex;
 	double relative_speedup = 0.0;
 	double load = starpu_sched_component_estimated_load(component);
-	const int relaxed_state = _starpu_worker_get_observation_safe_state();
 	if(STARPU_SCHED_COMPONENT_IS_HOMOGENEOUS(component))
 	{
 		int first_worker = starpu_bitmap_first(component->workers_in_ctx);
 		relative_speedup = starpu_worker_get_relative_speedup(starpu_worker_get_perf_archtype(first_worker, component->tree->sched_ctx_id));
-		if (!relaxed_state)
-			_starpu_worker_enter_section_safe_for_observation();
-		STARPU_PTHREAD_MUTEX_LOCK(mutex);
-		if (!relaxed_state)
-			_starpu_worker_leave_section_safe_for_observation();
+		STARPU_COMPONENT_MUTEX_LOCK(mutex);
 		load += prio->ntasks / relative_speedup;
-		STARPU_PTHREAD_MUTEX_UNLOCK(mutex);
+		STARPU_COMPONENT_MUTEX_UNLOCK(mutex);
 		return load;
 	}
 	else
@@ -103,13 +98,9 @@ static double prio_estimated_load(struct starpu_sched_component * component)
 			relative_speedup += starpu_worker_get_relative_speedup(starpu_worker_get_perf_archtype(i, component->tree->sched_ctx_id));
 		relative_speedup /= starpu_bitmap_cardinal(component->workers_in_ctx);
 		STARPU_ASSERT(!_STARPU_IS_ZERO(relative_speedup));
-		if (!relaxed_state)
-			_starpu_worker_enter_section_safe_for_observation();
-		STARPU_PTHREAD_MUTEX_LOCK(mutex);
-		if (!relaxed_state)
-			_starpu_worker_leave_section_safe_for_observation();
+		STARPU_COMPONENT_MUTEX_LOCK(mutex);
 		load += prio->ntasks / relative_speedup;
-		STARPU_PTHREAD_MUTEX_UNLOCK(mutex);
+		STARPU_COMPONENT_MUTEX_UNLOCK(mutex);
 	}
 	return load;
 }
@@ -122,12 +113,7 @@ static int prio_push_local_task(struct starpu_sched_component * component, struc
 	struct _starpu_prio_deque * prio = &data->prio;
 	starpu_pthread_mutex_t * mutex = &data->mutex;
 	int ret;
-	const int relaxed_state = _starpu_worker_get_observation_safe_state();
-	if (!relaxed_state)
-		_starpu_worker_enter_section_safe_for_observation();
-	STARPU_PTHREAD_MUTEX_LOCK(mutex);
-	if (!relaxed_state)
-		_starpu_worker_leave_section_safe_for_observation();
+	STARPU_COMPONENT_MUTEX_LOCK(mutex);
 	double exp_len;
 	if(!isnan(task->predicted))
 		exp_len = prio->exp_len + task->predicted;
@@ -144,7 +130,7 @@ static int prio_push_local_task(struct starpu_sched_component * component, struc
 			warned = 1;
 		}
 		ret = 1;
-		STARPU_PTHREAD_MUTEX_UNLOCK(mutex);
+		STARPU_COMPONENT_MUTEX_UNLOCK(mutex);
 	}
 	else
 	{
@@ -168,7 +154,7 @@ static int prio_push_local_task(struct starpu_sched_component * component, struc
 		
 		if(!is_pushback)
 			component->can_pull(component);
-		STARPU_PTHREAD_MUTEX_UNLOCK(mutex);
+		STARPU_COMPONENT_MUTEX_UNLOCK(mutex);
 	}
 
 	return ret;
@@ -186,9 +172,7 @@ static struct starpu_task * prio_pull_task(struct starpu_sched_component * compo
 	struct _starpu_prio_data * data = component->data;
 	struct _starpu_prio_deque * prio = &data->prio;
 	starpu_pthread_mutex_t * mutex = &data->mutex;
-	_starpu_worker_enter_section_safe_for_observation();
-	STARPU_PTHREAD_MUTEX_LOCK(mutex);
-	_starpu_worker_leave_section_safe_for_observation();
+	STARPU_COMPONENT_MUTEX_LOCK(mutex);
 	struct starpu_task * task = _starpu_prio_deque_pop_task(prio);
 	if(task)
 	{
@@ -206,7 +190,7 @@ static struct starpu_task * prio_pull_task(struct starpu_sched_component * compo
 	STARPU_ASSERT(!isnan(prio->exp_end));
 	STARPU_ASSERT(!isnan(prio->exp_len));
 	STARPU_ASSERT(!isnan(prio->exp_start));
-	STARPU_PTHREAD_MUTEX_UNLOCK(mutex);
+	STARPU_COMPONENT_MUTEX_UNLOCK(mutex);
 
 	// When a pop is called, a can_push is called for pushing tasks onto
 	// the empty place of the queue left by the popped task.

+ 4 - 4
src/sched_policies/component_sched.c

@@ -375,7 +375,7 @@ void starpu_sched_tree_add_workers(unsigned sched_ctx_id, int *workerids, unsign
 	STARPU_ASSERT(workerids);
 	struct starpu_sched_tree * t = starpu_sched_ctx_get_policy_data(sched_ctx_id);
 
-	STARPU_PTHREAD_MUTEX_LOCK(&t->lock);
+	STARPU_COMPONENT_MUTEX_LOCK(&t->lock);
 	_starpu_sched_component_lock_all_workers();
 
 	unsigned i;
@@ -385,7 +385,7 @@ void starpu_sched_tree_add_workers(unsigned sched_ctx_id, int *workerids, unsign
 	starpu_sched_tree_update_workers_in_ctx(t);
 
 	_starpu_sched_component_unlock_all_workers();
-	STARPU_PTHREAD_MUTEX_UNLOCK(&t->lock);
+	STARPU_COMPONENT_MUTEX_UNLOCK(&t->lock);
 }
 
 void starpu_sched_tree_remove_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers)
@@ -394,7 +394,7 @@ void starpu_sched_tree_remove_workers(unsigned sched_ctx_id, int *workerids, uns
 	STARPU_ASSERT(workerids);
 	struct starpu_sched_tree * t = starpu_sched_ctx_get_policy_data(sched_ctx_id);
 
-	STARPU_PTHREAD_MUTEX_LOCK(&t->lock);
+	STARPU_COMPONENT_MUTEX_LOCK(&t->lock);
 	_starpu_sched_component_lock_all_workers();
 
 	unsigned i;
@@ -404,7 +404,7 @@ void starpu_sched_tree_remove_workers(unsigned sched_ctx_id, int *workerids, uns
 	starpu_sched_tree_update_workers_in_ctx(t);
 
 	_starpu_sched_component_unlock_all_workers();
-	STARPU_PTHREAD_MUTEX_UNLOCK(&t->lock);
+	STARPU_COMPONENT_MUTEX_UNLOCK(&t->lock);
 }
 
 static struct starpu_sched_tree *trees[STARPU_NMAX_SCHED_CTXS];

+ 14 - 44
src/sched_policies/component_work_stealing.c

@@ -52,23 +52,18 @@ static struct starpu_task *  steal_task_round_robin(struct starpu_sched_componen
 	/* If the worker's queue have no suitable tasks, let's try
 	 * the next ones */
 	struct starpu_task * task = NULL;
-	const int relaxed_state = _starpu_worker_get_observation_safe_state();
 	while (1)
 	{
 		struct _starpu_prio_deque * fifo = wsd->fifos[i];
 
-		if (!relaxed_state)
-			_starpu_worker_enter_section_safe_for_observation();
-		STARPU_PTHREAD_MUTEX_LOCK(wsd->mutexes[i]);
-		if (!relaxed_state)
-			_starpu_worker_leave_section_safe_for_observation();
+		STARPU_COMPONENT_MUTEX_LOCK(wsd->mutexes[i]);
 		task = _starpu_prio_deque_deque_task_for_worker(fifo, workerid);
 		if(task && !isnan(task->predicted))
 		{
 			fifo->exp_len -= task->predicted;
 			fifo->nprocessed--;
 		}
-		STARPU_PTHREAD_MUTEX_UNLOCK(wsd->mutexes[i]);
+		STARPU_COMPONENT_MUTEX_UNLOCK(wsd->mutexes[i]);
 		if(task)
 		{
 			_STARPU_TASK_BREAK_ON(task, sched);
@@ -139,9 +134,7 @@ static struct starpu_task * pull_task(struct starpu_sched_component * component)
 	}
 	STARPU_ASSERT(i < component->nchildren);
 	struct _starpu_work_stealing_data * wsd = component->data;
-	_starpu_worker_enter_section_safe_for_observation();
-	STARPU_PTHREAD_MUTEX_LOCK(wsd->mutexes[i]);
-	_starpu_worker_leave_section_safe_for_observation();
+	STARPU_COMPONENT_MUTEX_LOCK(wsd->mutexes[i]);
 	struct starpu_task * task = _starpu_prio_deque_pop_task(wsd->fifos[i]);
 	if(task)
 	{
@@ -154,7 +147,7 @@ static struct starpu_task * pull_task(struct starpu_sched_component * component)
 	else
 		wsd->fifos[i]->exp_len = 0.0;
 
-	STARPU_PTHREAD_MUTEX_UNLOCK(wsd->mutexes[i]);
+	STARPU_COMPONENT_MUTEX_UNLOCK(wsd->mutexes[i]);
 	if(task)
 	{
 		return task;
@@ -163,11 +156,9 @@ static struct starpu_task * pull_task(struct starpu_sched_component * component)
 	task  = steal_task(component, workerid);
 	if(task)
 	{
-		_starpu_worker_enter_section_safe_for_observation();
-		STARPU_PTHREAD_MUTEX_LOCK(wsd->mutexes[i]);
-		_starpu_worker_leave_section_safe_for_observation();
+		STARPU_COMPONENT_MUTEX_LOCK(wsd->mutexes[i]);
 		wsd->fifos[i]->nprocessed++;
-		STARPU_PTHREAD_MUTEX_UNLOCK(wsd->mutexes[i]);
+		STARPU_COMPONENT_MUTEX_UNLOCK(wsd->mutexes[i]);
 
 		return task;
 	}
@@ -195,18 +186,13 @@ double _ws_estimated_end(struct starpu_sched_component * component)
 	double sum_len = 0.0;
 	double sum_start = 0.0;
 	int i;
-	const int relaxed_state = _starpu_worker_get_observation_safe_state();
 	for(i = 0; i < component->nchildren; i++)
 	{
-		if (!relaxed_state)
-			_starpu_worker_enter_section_safe_for_observation();
-		STARPU_PTHREAD_MUTEX_LOCK(wsd->mutexes[i]);
-		if (!relaxed_state)
-			_starpu_worker_leave_section_safe_for_observation();
+		STARPU_COMPONENT_MUTEX_LOCK(wsd->mutexes[i]);
 		sum_len += wsd->fifos[i]->exp_len;
 		wsd->fifos[i]->exp_start = STARPU_MAX(starpu_timing_now(), wsd->fifos[i]->exp_start);
 		sum_start += wsd->fifos[i]->exp_start;
-		STARPU_PTHREAD_MUTEX_UNLOCK(wsd->mutexes[i]);
+		STARPU_COMPONENT_MUTEX_UNLOCK(wsd->mutexes[i]);
 
 	}
 	int nb_workers = starpu_bitmap_cardinal(component->workers_in_ctx);
@@ -220,16 +206,11 @@ double _ws_estimated_load(struct starpu_sched_component * component)
 	struct _starpu_work_stealing_data * wsd = component->data;
 	int ntasks = 0;
 	int i;
-	const int relaxed_state = _starpu_worker_get_observation_safe_state();
 	for(i = 0; i < component->nchildren; i++)
 	{
-		if (!relaxed_state)
-			_starpu_worker_enter_section_safe_for_observation();
-		STARPU_PTHREAD_MUTEX_LOCK(wsd->mutexes[i]);
-		if (!relaxed_state)
-			_starpu_worker_leave_section_safe_for_observation();
+		STARPU_COMPONENT_MUTEX_LOCK(wsd->mutexes[i]);
 		ntasks += wsd->fifos[i]->ntasks;
-		STARPU_PTHREAD_MUTEX_UNLOCK(wsd->mutexes[i]);
+		STARPU_COMPONENT_MUTEX_UNLOCK(wsd->mutexes[i]);
 	}
 	double speedup = 0.0;
 	int workerid;
@@ -249,16 +230,10 @@ static int push_task(struct starpu_sched_component * component, struct starpu_ta
 	int ret;
 	int i = wsd->last_push_child;
 	i = (i+1)%component->nchildren;
-	const int relaxed_state = _starpu_worker_get_observation_safe_state();
-
-	if (!relaxed_state)
-		_starpu_worker_enter_section_safe_for_observation();
-	STARPU_PTHREAD_MUTEX_LOCK(wsd->mutexes[i]);
-	if (!relaxed_state)
-		_starpu_worker_leave_section_safe_for_observation();
+	STARPU_COMPONENT_MUTEX_LOCK(wsd->mutexes[i]);
 	_STARPU_TASK_BREAK_ON(task, sched);
 	ret = _starpu_prio_deque_push_task(wsd->fifos[i], task);
-	STARPU_PTHREAD_MUTEX_UNLOCK(wsd->mutexes[i]);
+	STARPU_COMPONENT_MUTEX_UNLOCK(wsd->mutexes[i]);
 
 	wsd->last_push_child = i;
 	component->can_pull(component);
@@ -275,7 +250,6 @@ int starpu_sched_tree_work_stealing_push_task(struct starpu_task *task)
 
 	unsigned sched_ctx_id = task->sched_ctx;
 	struct starpu_sched_component * component =starpu_sched_component_worker_get(sched_ctx_id, workerid);
-	const int relaxed_state = _starpu_worker_get_observation_safe_state();
 	while(component->parents[sched_ctx_id] != NULL)
 	{
 		component = component->parents[sched_ctx_id];
@@ -291,15 +265,11 @@ int starpu_sched_tree_work_stealing_push_task(struct starpu_task *task)
 			STARPU_ASSERT(i < component->nchildren);
 
 			struct _starpu_work_stealing_data * wsd = component->data;
-			if (!relaxed_state)
-				_starpu_worker_enter_section_safe_for_observation();
-			STARPU_PTHREAD_MUTEX_LOCK(wsd->mutexes[i]);
-			if (!relaxed_state)
-				_starpu_worker_leave_section_safe_for_observation();
+			STARPU_COMPONENT_MUTEX_LOCK(wsd->mutexes[i]);
 			int ret = _starpu_prio_deque_push_task(wsd->fifos[i] , task);
 			if(ret == 0 && !isnan(task->predicted))
 				wsd->fifos[i]->exp_len += task->predicted;
-			STARPU_PTHREAD_MUTEX_UNLOCK(wsd->mutexes[i]);
+			STARPU_COMPONENT_MUTEX_UNLOCK(wsd->mutexes[i]);
 
 			//we need to wake all workers
 			component->can_pull(component);

+ 18 - 31
src/sched_policies/component_worker.c

@@ -471,9 +471,9 @@ static int simple_worker_push_task(struct starpu_sched_component * component, st
 	}
 #endif
 	struct _starpu_worker_task_list * list = data->list;
-	STARPU_PTHREAD_MUTEX_LOCK(&list->mutex);
+	STARPU_COMPONENT_MUTEX_LOCK(&list->mutex);
 	_starpu_worker_task_list_push(list, t);
-	STARPU_PTHREAD_MUTEX_UNLOCK(&list->mutex);
+	STARPU_COMPONENT_MUTEX_UNLOCK(&list->mutex);
 	simple_worker_can_pull(component);
 	return 0;
 }
@@ -483,9 +483,7 @@ static struct starpu_task * simple_worker_pull_task(struct starpu_sched_componen
 	unsigned workerid = starpu_worker_get_id_check();
 	struct _starpu_worker_component_data * data = component->data;
 	struct _starpu_worker_task_list * list = data->list;
-	_starpu_worker_enter_section_safe_for_observation();
-	STARPU_PTHREAD_MUTEX_LOCK(&list->mutex);
-	_starpu_worker_leave_section_safe_for_observation();
+	STARPU_COMPONENT_MUTEX_LOCK(&list->mutex);
 	/* Take the opportunity to update start time */
 	data->list->exp_start = STARPU_MAX(starpu_timing_now(), data->list->exp_start);
 	data->list->exp_end = data->list->exp_start + data->list->exp_len;
@@ -493,11 +491,11 @@ static struct starpu_task * simple_worker_pull_task(struct starpu_sched_componen
 	if(task)
 	{
 		_starpu_worker_task_list_transfer_started(list, task);
-		STARPU_PTHREAD_MUTEX_UNLOCK(&list->mutex);
+		STARPU_COMPONENT_MUTEX_UNLOCK(&list->mutex);
 		starpu_push_task_end(task);
 		goto ret;
 	}
-	STARPU_PTHREAD_MUTEX_UNLOCK(&list->mutex);
+	STARPU_COMPONENT_MUTEX_UNLOCK(&list->mutex);
 	int i;
 	do
 	{
@@ -522,12 +520,10 @@ static struct starpu_task * simple_worker_pull_task(struct starpu_sched_componen
 	{
 		if(!starpu_worker_is_combined_worker(workerid))
 		{
-			_starpu_worker_enter_section_safe_for_observation();
-			STARPU_PTHREAD_MUTEX_LOCK(&list->mutex);
-			_starpu_worker_leave_section_safe_for_observation();
+			STARPU_COMPONENT_MUTEX_LOCK(&list->mutex);
 			_starpu_worker_task_list_add(list, task);
 			_starpu_worker_task_list_transfer_started(list, task);
-			STARPU_PTHREAD_MUTEX_UNLOCK(&list->mutex);
+			STARPU_COMPONENT_MUTEX_UNLOCK(&list->mutex);
 			starpu_push_task_end(task);
 			goto ret;
 		}
@@ -540,12 +536,10 @@ static struct starpu_task * simple_worker_pull_task(struct starpu_sched_componen
 	}
 	if(task)
 	{
-		_starpu_worker_enter_section_safe_for_observation();
-		STARPU_PTHREAD_MUTEX_LOCK(&list->mutex);
-		_starpu_worker_leave_section_safe_for_observation();
+		STARPU_COMPONENT_MUTEX_LOCK(&list->mutex);
 		_starpu_worker_task_list_add(list, task);
 		_starpu_worker_task_list_transfer_started(list, task);
-		STARPU_PTHREAD_MUTEX_UNLOCK(&list->mutex);
+		STARPU_COMPONENT_MUTEX_UNLOCK(&list->mutex);
 		starpu_push_task_end(task);
 	}
 ret:
@@ -568,14 +562,14 @@ static double simple_worker_estimated_load(struct starpu_sched_component * compo
 {
 	struct _starpu_worker * worker = _starpu_sched_component_worker_get_worker(component);
 	int nb_task = 0;
-	STARPU_PTHREAD_MUTEX_LOCK(&worker->mutex);
+	STARPU_COMPONENT_MUTEX_LOCK(&worker->mutex);
 	struct starpu_task_list list = worker->local_tasks;
 	struct starpu_task * task;
 	for(task = starpu_task_list_front(&list);
 	    task != starpu_task_list_end(&list);
 	    task = starpu_task_list_next(task))
 		nb_task++;
-	STARPU_PTHREAD_MUTEX_UNLOCK(&worker->mutex);
+	STARPU_COMPONENT_MUTEX_UNLOCK(&worker->mutex);
 	struct _starpu_worker_component_data * d = component->data;
 	struct _starpu_worker_task_list * l = d->list;
 	int ntasks_in_fifo = l ? l->ntasks : 0;
@@ -709,9 +703,9 @@ static int combined_worker_push_task(struct starpu_sched_component * component,
 		struct starpu_sched_component * worker_component = starpu_sched_component_worker_get(component->tree->sched_ctx_id, combined_worker->combined_workerid[i]);
 		struct _starpu_worker_component_data * worker_data = worker_component->data;
 		struct _starpu_worker_task_list * list = worker_data->list;
-		STARPU_PTHREAD_MUTEX_LOCK(&list->mutex);
+		STARPU_COMPONENT_MUTEX_LOCK(&list->mutex);
 		if(mutex_to_unlock)
-			STARPU_PTHREAD_MUTEX_UNLOCK(mutex_to_unlock);
+			STARPU_COMPONENT_MUTEX_UNLOCK(mutex_to_unlock);
 		mutex_to_unlock = &list->mutex;
 
 		_starpu_worker_task_list_push(list, task_alias[i]);
@@ -719,7 +713,7 @@ static int combined_worker_push_task(struct starpu_sched_component * component,
 	}
 	while(i < combined_worker->worker_size);
 
-	STARPU_PTHREAD_MUTEX_UNLOCK(mutex_to_unlock);
+	STARPU_COMPONENT_MUTEX_UNLOCK(mutex_to_unlock);
 
 	int workerid = starpu_worker_get_id();
 	if(-1 == workerid)
@@ -728,11 +722,6 @@ static int combined_worker_push_task(struct starpu_sched_component * component,
 	}
 	else
 	{
-		starpu_pthread_mutex_t *worker_sched_mutex;
-		starpu_pthread_cond_t *worker_sched_cond;
-		starpu_worker_get_sched_condition(workerid, &worker_sched_mutex, &worker_sched_cond);
-		STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(worker_sched_mutex);
-
 		/* wake up all other workers of combined worker */
 		for(i = 0; i < combined_worker->worker_size; i++)
 		{
@@ -741,8 +730,6 @@ static int combined_worker_push_task(struct starpu_sched_component * component,
 		}
 
 		combined_worker_can_pull(component);
-
-		STARPU_PTHREAD_MUTEX_LOCK_SCHED(worker_sched_mutex);
 	}
 
 	return 0;
@@ -858,11 +845,11 @@ int starpu_sched_component_worker_get_workerid(struct starpu_sched_component * w
 void starpu_sched_component_worker_pre_exec_hook(struct starpu_task * task, unsigned sched_ctx_id STARPU_ATTRIBUTE_UNUSED)
 {
 	struct _starpu_worker_task_list * list = _worker_get_list(sched_ctx_id);
-	STARPU_PTHREAD_MUTEX_LOCK(&list->mutex);
+	STARPU_COMPONENT_MUTEX_LOCK(&list->mutex);
 	_starpu_worker_task_list_started(list, task);
 	/* Take the opportunity to update start time */
 	list->exp_start = STARPU_MAX(starpu_timing_now() + list->pipeline_len, list->exp_start);
-	STARPU_PTHREAD_MUTEX_UNLOCK(&list->mutex);
+	STARPU_COMPONENT_MUTEX_UNLOCK(&list->mutex);
 }
 
 void starpu_sched_component_worker_post_exec_hook(struct starpu_task * task, unsigned sched_ctx_id STARPU_ATTRIBUTE_UNUSED)
@@ -870,9 +857,9 @@ void starpu_sched_component_worker_post_exec_hook(struct starpu_task * task, uns
 	if(task->execute_on_a_specific_worker)
 		return;
 	struct _starpu_worker_task_list * list = _worker_get_list(sched_ctx_id);
-	STARPU_PTHREAD_MUTEX_LOCK(&list->mutex);
+	STARPU_COMPONENT_MUTEX_LOCK(&list->mutex);
 	_starpu_worker_task_list_finished(list, task);
-	STARPU_PTHREAD_MUTEX_UNLOCK(&list->mutex);
+	STARPU_COMPONENT_MUTEX_UNLOCK(&list->mutex);
 }
 
 int starpu_sched_component_is_simple_worker(struct starpu_sched_component * component)