浏览代码

fix policies having central lists:
* for push & pop: if data_policy mutex is taken no point in taking worker's personal mutex

Andra Hugo 12 年之前
父节点
当前提交
32708a04e4
共有 2 个文件被更改,包括 17 次插入44 次删除
  1. 2 23
      src/sched_policies/eager_central_policy.c
  2. 15 21
      src/sched_policies/eager_central_priority_policy.c

+ 2 - 23
src/sched_policies/eager_central_policy.c

@@ -75,34 +75,13 @@ static int push_task_eager_policy(struct starpu_task *task)
 		return ret_val;
 	}
 
-	unsigned worker = 0;
-	struct starpu_sched_ctx_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
-	struct starpu_iterator it;
-	if(workers->init_iterator)
-		workers->init_iterator(workers, &it);
-
-	while(workers->has_next(workers, &it))
-	{
-		worker = workers->get_next(workers, &it);
-		_starpu_pthread_mutex_t *sched_mutex;
-		_starpu_pthread_cond_t *sched_cond;
-		starpu_worker_get_sched_condition(worker, &sched_mutex, &sched_cond);
-		_STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
-	}
 		
+	_STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
 	ret_val = _starpu_fifo_push_task(data->fifo, task);
 
 	_starpu_push_task_end(task);
+	_STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
 
-	while(workers->has_next(workers, &it))
-	{
-		worker = workers->get_next(workers, &it);
-		_starpu_pthread_mutex_t *sched_mutex;
-		_starpu_pthread_cond_t *sched_cond;
-		starpu_worker_get_sched_condition(worker, &sched_mutex, &sched_cond);
-		_STARPU_PTHREAD_COND_SIGNAL(sched_cond);
-		_STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
-	}
 		
 	_STARPU_PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
 	return ret_val;

+ 15 - 21
src/sched_policies/eager_central_priority_policy.c

@@ -123,6 +123,15 @@ static int _starpu_priority_push_task(struct starpu_task *task)
 	}
 
 
+	_STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
+	unsigned priolevel = task->priority - STARPU_MIN_PRIO;
+	
+	starpu_task_list_push_back(&taskq->taskq[priolevel], task);
+	taskq->ntasks[priolevel]++;
+	taskq->total_ntasks++;
+	_starpu_push_task_end(task);
+	_STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
+
 	/*if there are no tasks block */
 	/* wake people waiting for a task */
 	unsigned worker = 0;
@@ -132,28 +141,13 @@ static int _starpu_priority_push_task(struct starpu_task *task)
 	if(workers->init_iterator)
 		workers->init_iterator(workers, &it);
 	
-	while(workers->has_next(workers,&it))
-	{
-		worker = workers->get_next(workers, &it);
-		_starpu_pthread_mutex_t *sched_mutex;
-		_starpu_pthread_cond_t *sched_cond;
-		starpu_worker_get_sched_condition(worker, &sched_mutex, &sched_cond);
-		_STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
-	}
-	
-	unsigned priolevel = task->priority - STARPU_MIN_PRIO;
-	
-	starpu_task_list_push_back(&taskq->taskq[priolevel], task);
-	taskq->ntasks[priolevel]++;
-	taskq->total_ntasks++;
-	_starpu_push_task_end(task);
-
 	while(workers->has_next(workers, &it))
 	{
 		worker = workers->get_next(workers, &it);
 		_starpu_pthread_mutex_t *sched_mutex;
 		_starpu_pthread_cond_t *sched_cond;
 		starpu_worker_get_sched_condition(worker, &sched_mutex, &sched_cond);
+		_STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
 		_STARPU_PTHREAD_COND_SIGNAL(sched_cond);
 		_STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
 	}
@@ -175,13 +169,13 @@ static struct starpu_task *_starpu_priority_pop_task(unsigned sched_ctx_id)
 	/* block until some event happens */
 	if (taskq->total_ntasks == 0)
 		return NULL;
-	
+
 	/* release this mutex before trying to wake up other workers */
 	_starpu_pthread_mutex_t *curr_sched_mutex;
 	_starpu_pthread_cond_t *curr_sched_cond;
 	starpu_worker_get_sched_condition(workerid, &curr_sched_mutex, &curr_sched_cond);
 	_STARPU_PTHREAD_MUTEX_UNLOCK(curr_sched_mutex);
-
+	
 	/* all workers will block on this mutex anyway so 
 	   there's no need for their own mutex to be locked */
 	_STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
@@ -214,9 +208,9 @@ static struct starpu_task *_starpu_priority_pop_task(unsigned sched_ctx_id)
 	}
 	while (!chosen_task && priolevel-- > 0);
 
+
 	if (!chosen_task && skipped)
 	{
-
 		/* Notify another worker to do that task */
 		unsigned worker = 0;
 		struct starpu_sched_ctx_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
@@ -238,12 +232,12 @@ static struct starpu_task *_starpu_priority_pop_task(unsigned sched_ctx_id)
 				_STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
 			}
 		}
-
+	
 	}
 
 	_STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
 
-	/* leave the mutex how it was found before this */
+	/* leave the mutex how it was found before this */	
 	_STARPU_PTHREAD_MUTEX_LOCK(curr_sched_mutex);
 
 	return chosen_task;