Browse Source

fix synchro issue in peager

Andra Hugo 12 years ago
parent
commit
87e8dfca95
2 changed files with 31 additions and 28 deletions
  1. 0 3
      src/sched_policies/eager_central_policy.c
  2. 31 25
      src/sched_policies/parallel_eager.c

+ 0 - 3
src/sched_policies/eager_central_policy.c

@@ -64,8 +64,6 @@ static int push_task_eager_policy(struct starpu_task *task)
 	unsigned sched_ctx_id = task->sched_ctx;
 	struct _starpu_eager_center_policy_data *data = (struct _starpu_eager_center_policy_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
 	int ret_val = -1;
-
-
 		
 	_STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
 	ret_val = _starpu_fifo_push_task(data->fifo, task);
@@ -73,7 +71,6 @@ static int push_task_eager_policy(struct starpu_task *task)
 	starpu_push_task_end(task);
 	_STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
 
-
 	/*if there are no tasks block */
 	/* wake people waiting for a task */
 	unsigned worker = 0;

+ 31 - 25
src/sched_policies/parallel_eager.c

@@ -28,12 +28,14 @@ struct _starpu_peager_data
 	struct _starpu_fifo_taskq *local_fifo[STARPU_NMAXWORKERS];
 
 	int master_id[STARPU_NMAXWORKERS];
+        starpu_pthread_mutex_t policy_mutex;
 };
 
+#define STARPU_NMAXCOMBINED_WORKERS 10
 /* XXX instead of 10, we should use some "MAX combination .."*/
 static int possible_combinations_cnt[STARPU_NMAXWORKERS];
-static int possible_combinations[STARPU_NMAXWORKERS][10];
-static int possible_combinations_size[STARPU_NMAXWORKERS][10];
+static int possible_combinations[STARPU_NMAXWORKERS][STARPU_NMAXCOMBINED_WORKERS];
+static int possible_combinations_size[STARPU_NMAXWORKERS][STARPU_NMAXCOMBINED_WORKERS];
 
 
 /*!!!!!!! It doesn't work with several contexts because the combined workers are constructed
@@ -135,6 +137,7 @@ static void initialize_peager_policy(unsigned sched_ctx_id)
 	data->fifo = _starpu_create_fifo();
 
 	starpu_sched_ctx_set_policy_data(sched_ctx_id, (void*)data);
+        _STARPU_PTHREAD_MUTEX_INIT(&data->policy_mutex, NULL);
 }
 
 static void deinitialize_peager_policy(unsigned sched_ctx_id)
@@ -146,6 +149,7 @@ static void deinitialize_peager_policy(unsigned sched_ctx_id)
 	_starpu_destroy_fifo(data->fifo);
 
 	starpu_sched_ctx_delete_worker_collection(sched_ctx_id);
+        _STARPU_PTHREAD_MUTEX_DESTROY(&data->policy_mutex);
 
 	free(data);
 }
@@ -156,30 +160,21 @@ static int push_task_peager_policy(struct starpu_task *task)
 	int ret_val = -1;
 	
 	struct _starpu_peager_data *data = (struct _starpu_peager_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
-	int worker = 0;
-	struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
-	
-	struct starpu_sched_ctx_iterator it;
-	if(workers->init_iterator)
-		workers->init_iterator(workers, &it);
-	
-	while(workers->has_next(workers, &it))
-	{
-		worker = workers->get_next(workers, &it);
-		int master = data->master_id[worker];
-		/* If this is not a CPU, then the worker simply grabs tasks from the fifo */
-		if (starpu_worker_get_type(worker) != STARPU_CPU_WORKER  || master == worker)
-		{
-			starpu_pthread_mutex_t *sched_mutex;
-			starpu_pthread_cond_t *sched_cond;
-			starpu_worker_get_sched_condition(worker, &sched_mutex, &sched_cond);
-			_STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
-		}
-	}
-	
 	
+	_STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
 	ret_val = _starpu_fifo_push_task(data->fifo, task);
 	starpu_push_task_end(task);
+	_STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
+
+        /*if there are no tasks block */
+        /* wake people waiting for a task */
+        unsigned worker = 0;
+        struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
+
+        struct starpu_sched_ctx_iterator it;
+        if(workers->init_iterator)
+                workers->init_iterator(workers, &it);
+
 
 	while(workers->has_next(workers, &it))
 	{
@@ -191,6 +186,7 @@ static int push_task_peager_policy(struct starpu_task *task)
 			starpu_pthread_mutex_t *sched_mutex;
 			starpu_pthread_cond_t *sched_cond;
 			starpu_worker_get_sched_condition(worker, &sched_mutex, &sched_cond);
+			_STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
 			_STARPU_PTHREAD_COND_SIGNAL(sched_cond);
 			_STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
 		}
@@ -207,14 +203,24 @@ static struct starpu_task *pop_task_peager_policy(unsigned sched_ctx_id)
 
 	/* If this is not a CPU, then the worker simply grabs tasks from the fifo */
 	if (starpu_worker_get_type(workerid) != STARPU_CPU_WORKER)
-		return _starpu_fifo_pop_task(data->fifo, workerid);
+	{
+		struct starpu_task *task = NULL;
+		_STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
+		task = _starpu_fifo_pop_task(data->fifo, workerid);
+		_STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
+
+		return task;
+	}
 
 	int master = data->master_id[workerid];
 
 	if (master == workerid)
 	{
 		/* The worker is a master */
-		struct starpu_task *task = _starpu_fifo_pop_task(data->fifo, workerid);
+		struct starpu_task *task = NULL;
+		_STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
+		task = _starpu_fifo_pop_task(data->fifo, workerid);
+		_STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
 
 		if (!task)
 			return NULL;