Selaa lähdekoodia

Fix the synchronization problem in the other scheduling policies too

Andra Hugo 12 vuotta sitten
vanhempi
commit
2bf98abb22

+ 52 - 57
src/sched_policies/eager_central_priority_policy.c

@@ -45,6 +45,7 @@ struct _starpu_priority_taskq
 struct _starpu_eager_central_prio_data
 {
 	struct _starpu_priority_taskq *taskq;
+	_starpu_pthread_mutex_t policy_mutex;
 };
 
 /*
@@ -85,6 +86,7 @@ static void initialize_eager_center_priority_policy(unsigned sched_ctx_id)
 	/* only a single queue (even though there are several internaly) */
 	data->taskq = _starpu_create_priority_taskq();
 	starpu_sched_ctx_set_policy_data(sched_ctx_id, (void*)data);
+	_STARPU_PTHREAD_MUTEX_INIT(&data->policy_mutex, NULL);
 }
 
 static void deinitialize_eager_center_priority_policy(unsigned sched_ctx_id)
@@ -96,6 +98,7 @@ static void deinitialize_eager_center_priority_policy(unsigned sched_ctx_id)
 	_starpu_destroy_priority_taskq(data->taskq);
 
 	starpu_sched_ctx_delete_worker_collection(sched_ctx_id);
+	_STARPU_PTHREAD_MUTEX_DESTROY(&data->policy_mutex);
 	free(data);
 }
 
@@ -123,36 +126,36 @@ static int _starpu_priority_push_task(struct starpu_task *task)
 	/*if there are no tasks block */
 	/* wake people waiting for a task */
 	unsigned worker = 0;
-    struct starpu_sched_ctx_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
-
-    struct starpu_iterator it;
-    if(workers->init_iterator)
-	    workers->init_iterator(workers, &it);
-
-    while(workers->has_next(workers,&it))
-    {
-	    worker = workers->get_next(workers, &it);
-	    _starpu_pthread_mutex_t *sched_mutex;
-	    _starpu_pthread_cond_t *sched_cond;
-	    starpu_worker_get_sched_condition(worker, &sched_mutex, &sched_cond);
-	    _STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
-    }
-
+	struct starpu_sched_ctx_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
+	
+	struct starpu_iterator it;
+	if(workers->init_iterator)
+		workers->init_iterator(workers, &it);
+	
+	while(workers->has_next(workers,&it))
+	{
+		worker = workers->get_next(workers, &it);
+		_starpu_pthread_mutex_t *sched_mutex;
+		_starpu_pthread_cond_t *sched_cond;
+		starpu_worker_get_sched_condition(worker, &sched_mutex, &sched_cond);
+		_STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
+	}
+	
 	unsigned priolevel = task->priority - STARPU_MIN_PRIO;
-
+	
 	starpu_task_list_push_back(&taskq->taskq[priolevel], task);
 	taskq->ntasks[priolevel]++;
 	taskq->total_ntasks++;
 
 	while(workers->has_next(workers, &it))
-    {
-	    worker = workers->get_next(workers, &it);
-        _starpu_pthread_mutex_t *sched_mutex;
-        _starpu_pthread_cond_t *sched_cond;
+	{
+		worker = workers->get_next(workers, &it);
+		_starpu_pthread_mutex_t *sched_mutex;
+		_starpu_pthread_cond_t *sched_cond;
 		starpu_worker_get_sched_condition(worker, &sched_mutex, &sched_cond);
-        _STARPU_PTHREAD_COND_SIGNAL(sched_cond);
-        _STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
-    }
+		_STARPU_PTHREAD_COND_SIGNAL(sched_cond);
+		_STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
+	}
 
 	_STARPU_PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
 	return 0;
@@ -169,49 +172,39 @@ static struct starpu_task *_starpu_priority_pop_task(unsigned sched_ctx_id)
 	struct _starpu_priority_taskq *taskq = data->taskq;
 
 	/* block until some event happens */
-
-	if ((taskq->total_ntasks == 0) && _starpu_machine_is_running())
-	{
-#ifdef STARPU_NON_BLOCKING_DRIVERS
+	if (taskq->total_ntasks == 0)
 		return NULL;
-#else
-        _starpu_pthread_mutex_t *sched_mutex;
-        _starpu_pthread_cond_t *sched_cond;
-		starpu_worker_get_sched_condition(workerid, &sched_mutex, &sched_cond);
-
-		_STARPU_PTHREAD_COND_WAIT(sched_cond, sched_mutex);
-#endif
-	}
+	
+	_STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
 
-	if (taskq->total_ntasks > 0)
+	unsigned priolevel = NPRIO_LEVELS - 1;
+	do
 	{
-		unsigned priolevel = NPRIO_LEVELS - 1;
-		do
+		if (taskq->ntasks[priolevel] > 0)
 		{
-			if (taskq->ntasks[priolevel] > 0)
-			{
-				for (task  = starpu_task_list_begin(&taskq->taskq[priolevel]);
-				     task != starpu_task_list_end(&taskq->taskq[priolevel]);
-				     task  = starpu_task_list_next(task)) {
-					unsigned nimpl;
-					for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
+			for (task  = starpu_task_list_begin(&taskq->taskq[priolevel]);
+			     task != starpu_task_list_end(&taskq->taskq[priolevel]);
+			     task  = starpu_task_list_next(task)) {
+				unsigned nimpl;
+				for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
+				{
+					if (starpu_worker_can_execute_task(workerid, task, nimpl))
 					{
-						if (starpu_worker_can_execute_task(workerid, task, nimpl))
-						{
-							/* there is some task that we can grab */
-							_starpu_get_job_associated_to_task(task)->nimpl = nimpl;
-							starpu_task_list_erase(&taskq->taskq[priolevel], task);
-							chosen_task = task;
-							taskq->ntasks[priolevel]--;
-							taskq->total_ntasks--;
-							_STARPU_TRACE_JOB_POP(task, 0);
-						} else skipped = 1;
+						/* there is some task that we can grab */
+						_starpu_get_job_associated_to_task(task)->nimpl = nimpl;
+						starpu_task_list_erase(&taskq->taskq[priolevel], task);
+						chosen_task = task;
+						taskq->ntasks[priolevel]--;
+						taskq->total_ntasks--;
+						_STARPU_TRACE_JOB_POP(task, 0);
+					} else skipped = 1;
 					}
-				}
 			}
 		}
-		while (!chosen_task && priolevel-- > 0);
 	}
+	while (!chosen_task && priolevel-- > 0);
+
+	_STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
 
 	if (!chosen_task && skipped)
 	{
@@ -231,7 +224,9 @@ static struct starpu_task *_starpu_priority_pop_task(unsigned sched_ctx_id)
 				_starpu_pthread_mutex_t *sched_mutex;
 				_starpu_pthread_cond_t *sched_cond;
 				starpu_worker_get_sched_condition(worker, &sched_mutex, &sched_cond);
+				_STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
 				_STARPU_PTHREAD_COND_SIGNAL(sched_cond);
+				_STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
 			}
 		}
 

+ 23 - 27
src/sched_policies/work_stealing_policy.c

@@ -265,28 +265,20 @@ static struct starpu_task *ws_pop_task(unsigned sched_ctx_id)
 		q->njobs--;
 		return task;
 	}
-
-	int worker = 0;
-	struct starpu_sched_ctx_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
-	struct starpu_iterator it;
-	if(workers->init_iterator)
-		workers->init_iterator(workers, &it);
-	
-	while(workers->has_next(workers, &it))
-	{
-		worker = workers->get_next(workers, &it);
-		if(worker != workerid)
-		{
-			_starpu_pthread_mutex_t *sched_mutex;
-			_starpu_pthread_cond_t *sched_cond;
-			starpu_worker_get_sched_condition(worker, &sched_mutex, &sched_cond);
-			_STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
-		}
-	}
-
+	_starpu_pthread_mutex_t *worker_sched_mutex;
+	_starpu_pthread_cond_t *worker_sched_cond;
+	starpu_worker_get_sched_condition(workerid, &worker_sched_mutex, &worker_sched_cond);
+	_STARPU_PTHREAD_MUTEX_UNLOCK(worker_sched_mutex);
+       
 
 	/* we need to steal someone's job */
 	unsigned victim = select_victim(sched_ctx_id);
+
+	_starpu_pthread_mutex_t *victim_sched_mutex;
+	_starpu_pthread_cond_t *victim_sched_cond;
+
+	starpu_worker_get_sched_condition(victim, &victim_sched_mutex, &victim_sched_cond);
+	_STARPU_PTHREAD_MUTEX_LOCK(victim_sched_mutex);
 	struct _starpu_deque_jobq *victimq = ws->queue_array[victim];
 
 	task = _starpu_deque_pop_task(victimq, workerid);
@@ -301,17 +293,21 @@ static struct starpu_task *ws_pop_task(unsigned sched_ctx_id)
 		victimq->njobs--;
 	}
 
-	while(workers->has_next(workers, &it))
+	_STARPU_PTHREAD_MUTEX_UNLOCK(victim_sched_mutex);
+
+	_STARPU_PTHREAD_MUTEX_LOCK(worker_sched_mutex);
+	if(!task)
 	{
-		worker = workers->get_next(workers, &it);
-		if(worker != workerid)
+		task = _starpu_deque_pop_task(q, workerid);
+		if (task)
 		{
-			_starpu_pthread_mutex_t *sched_mutex;
-			_starpu_pthread_cond_t *sched_cond;
-			starpu_worker_get_sched_condition(worker, &sched_mutex, &sched_cond);
-			_STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
+			/* there was a local task */
+			ws->performed_total++;
+			q->nprocessed++;
+			q->njobs--;
+			return task;
 		}
-	}		
+	}
 
 	return task;
 }