Browse Source

dead lock fixed : a worker of a combined worker first free his mutex before waking up his mates
initialization problem if starpu is not in the kernel's cache

Simon Archipoff 12 years ago
parent
commit
d8872d542c
1 changed files with 27 additions and 14 deletions
  1. 27 14
      src/sched_policies/node_worker.c

+ 27 - 14
src/sched_policies/node_worker.c

@@ -22,15 +22,15 @@ static struct _starpu_sched_node * _worker_nodes[STARPU_NMAXWORKERS];
  *
  *
  * its possible that a _starpu_task_grid wont have task
  * its possible that a _starpu_task_grid wont have task
  *
  *
- * N = no task 
+ * N = no task
  *
  *
  *   T  T  T
  *   T  T  T
  *   |  |  |
  *   |  |  |
  *   P--N--N
  *   P--N--N
  *   |  |  |
  *   |  |  |
  *   W  W  W
  *   W  W  W
- * 
- * 
+
+
  * this API is a little asymmetric : _starpu_task_grid are allocated by the caller and freed by the data structure
  * this API is a little asymmetric : _starpu_task_grid are allocated by the caller and freed by the data structure
  *
  *
  * exp_{start,end,len} are filled by the caller
  * exp_{start,end,len} are filled by the caller
@@ -43,7 +43,7 @@ struct _starpu_task_grid
 	 */
 	 */
 	struct starpu_task * task;
 	struct starpu_task * task;
 	struct _starpu_task_grid *up, *down, *left, *right;
 	struct _starpu_task_grid *up, *down, *left, *right;
-	
+
 	/* this is used to count the number of task to be poped by a worker
 	/* this is used to count the number of task to be poped by a worker
 	 * the leftist _starpu_task_grid maintain the ntasks counter (ie .left == NULL),
 	 * the leftist _starpu_task_grid maintain the ntasks counter (ie .left == NULL),
 	 * all the others use the pntasks that point to it
 	 * all the others use the pntasks that point to it
@@ -100,7 +100,7 @@ static void _starpu_worker_task_list_destroy(struct _starpu_worker_task_list * l
 	free(l);
 	free(l);
 }
 }
 
 
-//the task, ntasks, pntasks, left and right field members are set by the caller
+/* the task, ntasks, pntasks, left and right field members are set by the caller */
 static inline void _starpu_worker_task_list_push(struct _starpu_worker_task_list * l, struct _starpu_task_grid * t)
 static inline void _starpu_worker_task_list_push(struct _starpu_worker_task_list * l, struct _starpu_task_grid * t)
 {
 {
 	if(l->first == NULL)
 	if(l->first == NULL)
@@ -156,7 +156,7 @@ static inline struct starpu_task * _starpu_worker_task_list_pop(struct _starpu_w
 		_starpu_task_grid_destroy(t);
 		_starpu_task_grid_destroy(t);
 		return _starpu_worker_task_list_pop(l);
 		return _starpu_worker_task_list_pop(l);
 	}
 	}
-	
+
 	while(t)
 	while(t)
 	{
 	{
 		if(t->task)
 		if(t->task)
@@ -226,7 +226,7 @@ struct starpu_task * _starpu_sched_node_worker_pop_task(struct _starpu_sched_nod
 		starpu_push_task_end(task);
 		starpu_push_task_end(task);
 		return task;
 		return task;
 	}
 	}
-	
+
 	struct _starpu_sched_node *father = node->fathers[sched_ctx_id];
 	struct _starpu_sched_node *father = node->fathers[sched_ctx_id];
 	if(father == NULL)
 	if(father == NULL)
 		return NULL;
 		return NULL;
@@ -245,7 +245,7 @@ struct starpu_task * _starpu_sched_node_worker_pop_task(struct _starpu_sched_nod
 		(void)combined_worker_node->push_task(combined_worker_node, task);
 		(void)combined_worker_node->push_task(combined_worker_node, task);
 		//we have pushed a task in queue, so can make a recursive call
 		//we have pushed a task in queue, so can make a recursive call
 		return _starpu_sched_node_worker_pop_task(node, sched_ctx_id);
 		return _starpu_sched_node_worker_pop_task(node, sched_ctx_id);
-		
+
 	}
 	}
 	if(task)
 	if(task)
 		starpu_push_task_end(task);
 		starpu_push_task_end(task);
@@ -267,14 +267,13 @@ void _starpu_sched_node_worker_destroy(struct _starpu_sched_node *node)
 static void available_worker(struct _starpu_sched_node * worker_node)
 static void available_worker(struct _starpu_sched_node * worker_node)
 {
 {
 	(void) worker_node;
 	(void) worker_node;
-	
+
 #ifndef STARPU_NON_BLOCKING_DRIVERS
 #ifndef STARPU_NON_BLOCKING_DRIVERS
 	struct _starpu_worker * w = _starpu_sched_node_worker_get_worker(worker_node);
 	struct _starpu_worker * w = _starpu_sched_node_worker_get_worker(worker_node);
 	if(w->workerid == starpu_worker_get_id())
 	if(w->workerid == starpu_worker_get_id())
 		return;
 		return;
 	starpu_pthread_mutex_t *sched_mutex = &w->sched_mutex;
 	starpu_pthread_mutex_t *sched_mutex = &w->sched_mutex;
 	starpu_pthread_cond_t *sched_cond = &w->sched_cond;
 	starpu_pthread_cond_t *sched_cond = &w->sched_cond;
-
 	STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
 	STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
 	STARPU_PTHREAD_COND_SIGNAL(sched_cond);
 	STARPU_PTHREAD_COND_SIGNAL(sched_cond);
 	STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
 	STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
@@ -370,7 +369,7 @@ static struct _starpu_task_execute_preds estimated_execute_preds(struct _starpu_
 	STARPU_ASSERT(_starpu_sched_node_is_worker(node));
 	STARPU_ASSERT(_starpu_sched_node_is_worker(node));
 	starpu_task_bundle_t bundle = task->bundle;
 	starpu_task_bundle_t bundle = task->bundle;
 	struct _starpu_worker * worker = _starpu_sched_node_worker_get_worker(node);
 	struct _starpu_worker * worker = _starpu_sched_node_worker_get_worker(node);
-			
+
 	struct _starpu_task_execute_preds preds =
 	struct _starpu_task_execute_preds preds =
 		{
 		{
 			.state = CANNOT_EXECUTE,
 			.state = CANNOT_EXECUTE,
@@ -482,7 +481,7 @@ static int _starpu_sched_node_combined_worker_push_task(struct _starpu_sched_nod
 	struct _starpu_task_grid * task_alias[combined_worker->worker_size];
 	struct _starpu_task_grid * task_alias[combined_worker->worker_size];
 	starpu_parallel_task_barrier_init(task, _starpu_bitmap_first(node->workers));
 	starpu_parallel_task_barrier_init(task, _starpu_bitmap_first(node->workers));
 	task_alias[0] = _starpu_task_grid_create();
 	task_alias[0] = _starpu_task_grid_create();
-	task_alias[0]->task = task;
+	task_alias[0]->task = starpu_task_dup(task);
 	task_alias[0]->left = NULL;
 	task_alias[0]->left = NULL;
 	task_alias[0]->ntasks = combined_worker->worker_size;
 	task_alias[0]->ntasks = combined_worker->worker_size;
 	int i;
 	int i;
@@ -495,7 +494,7 @@ static int _starpu_sched_node_combined_worker_push_task(struct _starpu_sched_nod
 		task_alias[i]->pntasks = &task_alias[0]->ntasks;
 		task_alias[i]->pntasks = &task_alias[0]->ntasks;
 	}
 	}
 
 
-	starpu_pthread_mutex_t * mutex_to_unlock = NULL; 
+	starpu_pthread_mutex_t * mutex_to_unlock = NULL;
 	i = 0;
 	i = 0;
 	do
 	do
 	{
 	{
@@ -508,11 +507,25 @@ static int _starpu_sched_node_combined_worker_push_task(struct _starpu_sched_nod
 		mutex_to_unlock = &list->mutex;
 		mutex_to_unlock = &list->mutex;
 		
 		
 		_starpu_worker_task_list_push(list, task_alias[i]);
 		_starpu_worker_task_list_push(list, task_alias[i]);
-		worker_node->available(worker_node);
 		i++;
 		i++;
 	}
 	}
 	while(i < combined_worker->worker_size);
 	while(i < combined_worker->worker_size);
 	STARPU_PTHREAD_MUTEX_UNLOCK(mutex_to_unlock);
 	STARPU_PTHREAD_MUTEX_UNLOCK(mutex_to_unlock);
+
+	int workerid = starpu_worker_get_id();
+	starpu_pthread_mutex_t *worker_sched_mutex;
+	starpu_pthread_cond_t *worker_sched_cond;
+	starpu_worker_get_sched_condition(workerid, &worker_sched_mutex, &worker_sched_cond);
+	STARPU_PTHREAD_MUTEX_UNLOCK(worker_sched_mutex);
+
+	/* wake up all other workers of combined worker */
+	for(i = 0; i < combined_worker->worker_size; i++)
+	{
+		struct _starpu_sched_node * worker_node = _starpu_sched_node_worker_get(combined_worker->combined_workerid[i]);
+		worker_node->available(worker_node);
+	}
+
+	STARPU_PTHREAD_MUTEX_LOCK(worker_sched_mutex);
 	return 0;
 	return 0;
 }
 }