Преглед изворни кода

no need for the condition based scheme anymore since the multi-lock ordering issue is removed

Olivier Aumage пре 8 година
родитељ
комит
7fb90ffcba

+ 8 - 47
src/sched_policies/eager_central_policy.c

@@ -32,41 +32,9 @@ struct _starpu_eager_center_policy_data
 {
 {
 	struct _starpu_fifo_taskq *fifo;
 	struct _starpu_fifo_taskq *fifo;
 	starpu_pthread_mutex_t policy_mutex;
 	starpu_pthread_mutex_t policy_mutex;
-	starpu_pthread_cond_t policy_cond;
-	int state_busy:1;
 	struct starpu_bitmap *waiters;
 	struct starpu_bitmap *waiters;
-#warning debug code
-	/* debug only */
-	void *busy_thread;
-	int busy_workerid;
-	int busy_leave_signaled;
 };
 };
 
 
-static void enter_busy_state(struct _starpu_eager_center_policy_data *data)
-{
-	STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
-	while (data->state_busy)
-	{
-		STARPU_PTHREAD_COND_WAIT(&data->policy_cond, &data->policy_mutex);
-	}
-	data->state_busy = 1;
-	data->busy_thread = (void *)pthread_self();
-	data->busy_workerid = starpu_worker_get_id();
-	data->busy_leave_signaled = 0;
-	STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
-}
-
-static void leave_busy_state(struct _starpu_eager_center_policy_data *data)
-{
-	STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
-	data->state_busy = 0;
-	data->busy_thread = NULL;
-	data->busy_workerid = -1;
-	STARPU_PTHREAD_COND_SIGNAL(&data->policy_cond);
-	data->busy_leave_signaled = 1;
-	STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
-}
-
 static void initialize_eager_center_policy(unsigned sched_ctx_id)
 static void initialize_eager_center_policy(unsigned sched_ctx_id)
 {
 {
 	struct _starpu_eager_center_policy_data *data;
 	struct _starpu_eager_center_policy_data *data;
@@ -85,12 +53,6 @@ static void initialize_eager_center_policy(unsigned sched_ctx_id)
 
 
 	starpu_sched_ctx_set_policy_data(sched_ctx_id, (void*)data);
 	starpu_sched_ctx_set_policy_data(sched_ctx_id, (void*)data);
 	STARPU_PTHREAD_MUTEX_INIT(&data->policy_mutex, NULL);
 	STARPU_PTHREAD_MUTEX_INIT(&data->policy_mutex, NULL);
-	STARPU_PTHREAD_COND_INIT(&data->policy_cond, NULL);
-	data->state_busy = 0;
-
-	data->busy_thread = NULL;
-	data->busy_workerid = -1;
-	data->busy_leave_signaled = 0;
 }
 }
 
 
 static void deinitialize_eager_center_policy(unsigned sched_ctx_id)
 static void deinitialize_eager_center_policy(unsigned sched_ctx_id)
@@ -105,7 +67,6 @@ static void deinitialize_eager_center_policy(unsigned sched_ctx_id)
 	starpu_bitmap_destroy(data->waiters);
 	starpu_bitmap_destroy(data->waiters);
 
 
 	STARPU_PTHREAD_MUTEX_DESTROY(&data->policy_mutex);
 	STARPU_PTHREAD_MUTEX_DESTROY(&data->policy_mutex);
-	STARPU_PTHREAD_COND_DESTROY(&data->policy_cond);
 	free(data);
 	free(data);
 }
 }
 
 
@@ -114,7 +75,7 @@ static int push_task_eager_policy(struct starpu_task *task)
 	unsigned sched_ctx_id = task->sched_ctx;
 	unsigned sched_ctx_id = task->sched_ctx;
 	struct _starpu_eager_center_policy_data *data = (struct _starpu_eager_center_policy_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
 	struct _starpu_eager_center_policy_data *data = (struct _starpu_eager_center_policy_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
 
 
-	enter_busy_state(data);
+	STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
 	starpu_task_list_push_back(&data->fifo->taskq,task);
 	starpu_task_list_push_back(&data->fifo->taskq,task);
 	data->fifo->ntasks++;
 	data->fifo->ntasks++;
 	data->fifo->nprocessed++;
 	data->fifo->nprocessed++;
@@ -154,7 +115,7 @@ static int push_task_eager_policy(struct starpu_task *task)
 		}
 		}
 	}
 	}
 	/* Let the task free */
 	/* Let the task free */
-	leave_busy_state(data);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
 
 
 #if !defined(STARPU_NON_BLOCKING_DRIVERS) || defined(STARPU_SIMGRID)
 #if !defined(STARPU_NON_BLOCKING_DRIVERS) || defined(STARPU_SIMGRID)
 	/* Now that we have a list of potential workers, try to wake one */
 	/* Now that we have a list of potential workers, try to wake one */
@@ -178,9 +139,9 @@ static struct starpu_task *pop_every_task_eager_policy(unsigned sched_ctx_id)
 {
 {
 	struct _starpu_eager_center_policy_data *data = (struct _starpu_eager_center_policy_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
 	struct _starpu_eager_center_policy_data *data = (struct _starpu_eager_center_policy_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
 	unsigned workerid = starpu_worker_get_id_check();
 	unsigned workerid = starpu_worker_get_id_check();
-	enter_busy_state(data);
+	STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
 	struct starpu_task* task = _starpu_fifo_pop_every_task(data->fifo, workerid);
 	struct starpu_task* task = _starpu_fifo_pop_every_task(data->fifo, workerid);
-	leave_busy_state(data);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
 
 
 	starpu_sched_ctx_list_task_counters_reset_all(task, sched_ctx_id);
 	starpu_sched_ctx_list_task_counters_reset_all(task, sched_ctx_id);
 
 
@@ -195,7 +156,7 @@ static struct starpu_task *pop_task_eager_policy(unsigned sched_ctx_id)
 	struct _starpu_eager_center_policy_data *data = (struct _starpu_eager_center_policy_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
 	struct _starpu_eager_center_policy_data *data = (struct _starpu_eager_center_policy_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
 
 
 	/* block until some event happens */
 	/* block until some event happens */
-	enter_busy_state(data);
+	STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
 	_starpu_worker_leave_section_safe_for_observation();
 	_starpu_worker_leave_section_safe_for_observation();
 
 
 	/* Here helgrind would shout that this is unprotected, this is just an
 	/* Here helgrind would shout that this is unprotected, this is just an
@@ -203,7 +164,7 @@ static struct starpu_task *pop_task_eager_policy(unsigned sched_ctx_id)
 	 * wake up. */
 	 * wake up. */
 	if (!STARPU_RUNNING_ON_VALGRIND && _starpu_fifo_empty(data->fifo))
 	if (!STARPU_RUNNING_ON_VALGRIND && _starpu_fifo_empty(data->fifo))
 	{
 	{
-		leave_busy_state(data);
+		STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
 		return NULL;
 		return NULL;
 	}
 	}
 
 
@@ -211,7 +172,7 @@ static struct starpu_task *pop_task_eager_policy(unsigned sched_ctx_id)
 	if (!STARPU_RUNNING_ON_VALGRIND && starpu_bitmap_get(data->waiters, workerid))
 	if (!STARPU_RUNNING_ON_VALGRIND && starpu_bitmap_get(data->waiters, workerid))
 		/* Nobody woke us, avoid bothering the mutex */
 		/* Nobody woke us, avoid bothering the mutex */
 	{
 	{
-		leave_busy_state(data);
+		STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
 		return NULL;
 		return NULL;
 	}
 	}
 #endif
 #endif
@@ -222,7 +183,7 @@ static struct starpu_task *pop_task_eager_policy(unsigned sched_ctx_id)
 		starpu_bitmap_set(data->waiters, workerid);
 		starpu_bitmap_set(data->waiters, workerid);
 
 
 	_starpu_worker_enter_section_safe_for_observation();
 	_starpu_worker_enter_section_safe_for_observation();
-	leave_busy_state(data);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
 	if(chosen_task)
 	if(chosen_task)
 	{
 	{
 		_starpu_sched_ctx_lock_write(sched_ctx_id);
 		_starpu_sched_ctx_lock_write(sched_ctx_id);

+ 6 - 30
src/sched_policies/eager_central_priority_policy.c

@@ -48,30 +48,9 @@ struct _starpu_eager_central_prio_data
 {
 {
 	struct _starpu_priority_taskq *taskq;
 	struct _starpu_priority_taskq *taskq;
 	starpu_pthread_mutex_t policy_mutex;
 	starpu_pthread_mutex_t policy_mutex;
-	starpu_pthread_cond_t policy_cond;
-	int state_busy:1;
 	struct starpu_bitmap *waiters;
 	struct starpu_bitmap *waiters;
 };
 };
 
 
-static void enter_busy_state(struct _starpu_eager_central_prio_data *data)
-{
-	STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
-	while (data->state_busy)
-	{
-		STARPU_PTHREAD_COND_WAIT(&data->policy_cond, &data->policy_mutex);
-	}
-	data->state_busy = 1;
-	STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
-}
-
-static void leave_busy_state(struct _starpu_eager_central_prio_data *data)
-{
-	STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
-	data->state_busy = 0;
-	STARPU_PTHREAD_COND_SIGNAL(&data->policy_cond);
-	STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
-}
-
 /*
 /*
  * Centralized queue with priorities
  * Centralized queue with priorities
  */
  */
@@ -126,8 +105,6 @@ static void initialize_eager_center_priority_policy(unsigned sched_ctx_id)
 	STARPU_HG_DISABLE_CHECKING(data->taskq->total_ntasks);
 	STARPU_HG_DISABLE_CHECKING(data->taskq->total_ntasks);
 	starpu_sched_ctx_set_policy_data(sched_ctx_id, (void*)data);
 	starpu_sched_ctx_set_policy_data(sched_ctx_id, (void*)data);
 	STARPU_PTHREAD_MUTEX_INIT(&data->policy_mutex, NULL);
 	STARPU_PTHREAD_MUTEX_INIT(&data->policy_mutex, NULL);
-	STARPU_PTHREAD_COND_INIT(&data->policy_cond, NULL);
-	data->state_busy = 0;
 }
 }
 
 
 static void deinitialize_eager_center_priority_policy(unsigned sched_ctx_id)
 static void deinitialize_eager_center_priority_policy(unsigned sched_ctx_id)
@@ -140,7 +117,6 @@ static void deinitialize_eager_center_priority_policy(unsigned sched_ctx_id)
 	starpu_bitmap_destroy(data->waiters);
 	starpu_bitmap_destroy(data->waiters);
 
 
 	STARPU_PTHREAD_MUTEX_DESTROY(&data->policy_mutex);
 	STARPU_PTHREAD_MUTEX_DESTROY(&data->policy_mutex);
-	STARPU_PTHREAD_COND_DESTROY(&data->policy_cond);
 	free(data);
 	free(data);
 }
 }
 
 
@@ -150,7 +126,7 @@ static int _starpu_priority_push_task(struct starpu_task *task)
 	struct _starpu_eager_central_prio_data *data = (struct _starpu_eager_central_prio_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
 	struct _starpu_eager_central_prio_data *data = (struct _starpu_eager_central_prio_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
 	struct _starpu_priority_taskq *taskq = data->taskq;
 	struct _starpu_priority_taskq *taskq = data->taskq;
 
 
-	enter_busy_state(data);
+	STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
 	unsigned priolevel = task->priority - starpu_sched_ctx_get_min_priority(sched_ctx_id);
 	unsigned priolevel = task->priority - starpu_sched_ctx_get_min_priority(sched_ctx_id);
 	STARPU_ASSERT_MSG(task->priority >= starpu_sched_ctx_get_min_priority(sched_ctx_id) &&
 	STARPU_ASSERT_MSG(task->priority >= starpu_sched_ctx_get_min_priority(sched_ctx_id) &&
 			  task->priority <= starpu_sched_ctx_get_max_priority(sched_ctx_id), "task priority %d is not between minimum %d and maximum %d\n", task->priority, starpu_sched_ctx_get_min_priority(sched_ctx_id), starpu_sched_ctx_get_max_priority(sched_ctx_id));
 			  task->priority <= starpu_sched_ctx_get_max_priority(sched_ctx_id), "task priority %d is not between minimum %d and maximum %d\n", task->priority, starpu_sched_ctx_get_min_priority(sched_ctx_id), starpu_sched_ctx_get_max_priority(sched_ctx_id));
@@ -193,7 +169,7 @@ static int _starpu_priority_push_task(struct starpu_task *task)
 		}
 		}
 	}
 	}
 	/* Let the task free */
 	/* Let the task free */
-	leave_busy_state(data);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
 
 
 #if !defined(STARPU_NON_BLOCKING_DRIVERS) || defined(STARPU_SIMGRID)
 #if !defined(STARPU_NON_BLOCKING_DRIVERS) || defined(STARPU_SIMGRID)
 	/* Now that we have a list of potential workers, try to wake one */
 	/* Now that we have a list of potential workers, try to wake one */
@@ -225,7 +201,7 @@ static struct starpu_task *_starpu_priority_pop_task(unsigned sched_ctx_id)
 	struct _starpu_priority_taskq *taskq = data->taskq;
 	struct _starpu_priority_taskq *taskq = data->taskq;
 
 
 	/* block until some event happens */
 	/* block until some event happens */
-	enter_busy_state(data);
+	STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
 	_starpu_worker_leave_section_safe_for_observation();
 	_starpu_worker_leave_section_safe_for_observation();
 
 
 	/* Here helgrind would shout that this is unprotected, this is just an
 	/* Here helgrind would shout that this is unprotected, this is just an
@@ -233,7 +209,7 @@ static struct starpu_task *_starpu_priority_pop_task(unsigned sched_ctx_id)
 	 * wake up. */
 	 * wake up. */
 	if (!STARPU_RUNNING_ON_VALGRIND && taskq->total_ntasks == 0)
 	if (!STARPU_RUNNING_ON_VALGRIND && taskq->total_ntasks == 0)
 	{
 	{
-		leave_busy_state(data);
+		STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
 		return NULL;
 		return NULL;
 	}
 	}
 
 
@@ -241,7 +217,7 @@ static struct starpu_task *_starpu_priority_pop_task(unsigned sched_ctx_id)
 	if (!STARPU_RUNNING_ON_VALGRIND && starpu_bitmap_get(data->waiters, workerid))
 	if (!STARPU_RUNNING_ON_VALGRIND && starpu_bitmap_get(data->waiters, workerid))
 		/* Nobody woke us, avoid bothering the mutex */
 		/* Nobody woke us, avoid bothering the mutex */
 	{
 	{
-		leave_busy_state(data);
+		STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
 		return NULL;
 		return NULL;
 	}
 	}
 #endif
 #endif
@@ -303,7 +279,7 @@ static struct starpu_task *_starpu_priority_pop_task(unsigned sched_ctx_id)
 		starpu_bitmap_set(data->waiters, workerid);
 		starpu_bitmap_set(data->waiters, workerid);
 
 
 	_starpu_worker_enter_section_safe_for_observation();
 	_starpu_worker_enter_section_safe_for_observation();
-	leave_busy_state(data);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
 
 
 	if(chosen_task)
 	if(chosen_task)
 	{
 	{