浏览代码

src/sched_policies: Fix locks outside of Worker Components by relocating them inside Worker Components, and relocate can_pull calls near pumps (i.e., in Flow-Control Components)

Marc Sergent 11 年之前
父节点
当前提交
64351f3462

+ 2 - 9
src/sched_policies/component_eager.c

@@ -36,7 +36,7 @@ static int eager_push_task(struct starpu_sched_component * component, struct sta
 				int i;
 				for (i = 0; i < component->nchildren; i++)
 				{
-					int idworker,ret;
+					int idworker;
 					for(idworker = starpu_bitmap_first(component->children[i]->workers);
 						idworker != -1;
 						idworker = starpu_bitmap_next(component->children[i]->workers, idworker))
@@ -49,14 +49,7 @@ static int eager_push_task(struct starpu_sched_component * component, struct sta
 								return 1;
 							}
 							else
-							{
-								ret = component->children[i]->push_task(component->children[i],task);
-								if(!ret)
-								{
-									component->children[i]->can_pull(component->children[i]);
-									return ret;
-								}
-							}
+								return component->children[i]->push_task(component->children[i],task);
 						}
 					}
 				}

+ 2 - 9
src/sched_policies/component_eager_calibration.c

@@ -48,7 +48,7 @@ static int eager_calibration_push_task(struct starpu_sched_component * component
 					int i;
 					for (i = 0; i < component->nchildren; i++)
 					{
-						int idworker,ret;
+						int idworker;
 						for(idworker = starpu_bitmap_first(component->children[i]->workers);
 							idworker != -1;
 							idworker = starpu_bitmap_next(component->children[i]->workers, idworker))
@@ -61,14 +61,7 @@ static int eager_calibration_push_task(struct starpu_sched_component * component
 									return 1;
 								}
 								else
-								{
-									ret = component->children[i]->push_task(component->children[i],task);
-									if(!ret)
-									{
-										component->children[i]->can_pull(component->children[i]);
-										return ret;
-									}
-								}
+									return component->children[i]->push_task(component->children[i],task);
 							}
 						}
 					}

+ 3 - 2
src/sched_policies/component_fifo.c

@@ -141,6 +141,9 @@ static int fifo_push_local_task(struct starpu_sched_component * component, struc
 		STARPU_ASSERT(!isnan(fifo->exp_end));
 		STARPU_ASSERT(!isnan(fifo->exp_len));
 		STARPU_ASSERT(!isnan(fifo->exp_start));
+		
+		if(!is_pushback)
+			component->can_pull(component);
 		STARPU_PTHREAD_MUTEX_UNLOCK(mutex);
 	}
 	
@@ -211,7 +214,6 @@ static int fifo_can_push(struct starpu_sched_component * component)
 	STARPU_ASSERT(component->nchildren == 1);
 	struct starpu_sched_component * child = component->children[0];
 
-	_starpu_sched_component_unlock_scheduling();
 	struct starpu_task * task = component->pop_task(component);
 	if(task)
 		ret = child->push_task(child,task);	
@@ -224,7 +226,6 @@ static int fifo_can_push(struct starpu_sched_component * component)
 		if(task)
 			ret = child->push_task(child,task);	
 	} 
-	_starpu_sched_component_lock_scheduling();
 	if(task && ret)
 		fifo_push_local_task(component,task,1); 
 

+ 0 - 5
src/sched_policies/component_heft.c

@@ -163,10 +163,7 @@ static int heft_progress_one(struct starpu_sched_component *component)
 			return 1;
 		}
 		else
-		{
-			best_component->can_pull(best_component);
 			return 0;
-		}
 	}
 }
 
@@ -196,9 +193,7 @@ static int heft_push_task(struct starpu_sched_component * component, struct star
 
 static int heft_can_push(struct starpu_sched_component *component)
 {
-	_starpu_sched_component_unlock_scheduling();
 	heft_progress(component);
-	_starpu_sched_component_lock_scheduling();
 	int ret, j;
 	for(j=0; j < component->nparents; j++)
 	{

+ 0 - 3
src/sched_policies/component_mct.c

@@ -100,9 +100,6 @@ static int mct_push_task(struct starpu_sched_component * component, struct starp
 	}
 
 	int ret = best_component->push_task(best_component, task);
-	if(!ret)
-		best_component->can_pull(best_component);
-
 	return ret;
 }
 

+ 3 - 2
src/sched_policies/component_prio.c

@@ -154,6 +154,9 @@ static int prio_push_local_task(struct starpu_sched_component * component, struc
 		STARPU_ASSERT(!isnan(prio->exp_end));
 		STARPU_ASSERT(!isnan(prio->exp_len));
 		STARPU_ASSERT(!isnan(prio->exp_start));
+		
+		if(!is_pushback)
+			component->can_pull(component);
 		STARPU_PTHREAD_MUTEX_UNLOCK(mutex);
 	}
 
@@ -227,7 +230,6 @@ static int prio_can_push(struct starpu_sched_component * component)
 	STARPU_ASSERT(component->nchildren == 1);
 	struct starpu_sched_component * child = component->children[0];
 
-	_starpu_sched_component_unlock_scheduling();
 	struct starpu_task * task = component->pop_task(component);
 	if(task)
 		ret = child->push_task(child,task);	
@@ -240,7 +242,6 @@ static int prio_can_push(struct starpu_sched_component * component)
 		if(task)
 			ret = child->push_task(child,task);	
 	} 
-	_starpu_sched_component_lock_scheduling();
 	if(task && ret)
 		prio_push_local_task(component,task,1); 
 

+ 0 - 3
src/sched_policies/component_random.c

@@ -91,9 +91,6 @@ static int random_push_task(struct starpu_sched_component * component, struct st
 	}
 
 	int ret_val = select->push_task(select,task);
-	if(!ret_val)
-		select->can_pull(select);
-
 	return ret_val;
 }
 

+ 4 - 34
src/sched_policies/component_sched.c

@@ -32,30 +32,6 @@
 
 
 
-/* Allows a worker to lock/unlock scheduling mutexes. Currently used in 
- * self-defined can_push calls to allow can_pull calls to take those mutexes while the 
- * current worker is pushing tasks on other workers (or itself). 
- */
-void _starpu_sched_component_lock_scheduling(void)
-{
-	int workerid = starpu_worker_get_id();
-	starpu_pthread_mutex_t *sched_mutex;
-	starpu_pthread_cond_t *sched_cond;
-	starpu_worker_get_sched_condition(workerid, &sched_mutex, &sched_cond);
-	_starpu_sched_component_lock_worker(workerid);	
-	STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
-}
-
-void _starpu_sched_component_unlock_scheduling(void)
-{
-	int workerid = starpu_worker_get_id();
-	starpu_pthread_mutex_t *sched_mutex;
-	starpu_pthread_cond_t *sched_cond;
-	starpu_worker_get_sched_condition(workerid, &sched_mutex, &sched_cond);
-	STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
-	_starpu_sched_component_unlock_worker(workerid);	
-}
-
 /* this function find the best implementation or an implementation that need to be calibrated for a worker available
  * and set prediction in *length. nan if a implementation need to be calibrated, 0.0 if no perf model are available
  * return false if no worker on the component can execute that task
@@ -352,20 +328,14 @@ int starpu_sched_tree_push_task(struct starpu_task * task)
 	STARPU_ASSERT(task);
 	unsigned sched_ctx_id = task->sched_ctx;
 	struct starpu_sched_tree *tree = starpu_sched_ctx_get_policy_data(sched_ctx_id);
-	int workerid = starpu_worker_get_id();
+
 	/* application should take tree->lock to prevent concurent acces from hypervisor
 	 * worker take they own mutexes
 	 */
-	if(-1 == workerid)
-		STARPU_PTHREAD_MUTEX_LOCK(&tree->lock);
-	else
-		_starpu_sched_component_lock_worker(workerid);
-		
+	STARPU_PTHREAD_MUTEX_LOCK(&tree->lock);
 	int ret_val = tree->root->push_task(tree->root,task);
-	if(-1 == workerid)
-		STARPU_PTHREAD_MUTEX_UNLOCK(&tree->lock);
-	else
-		_starpu_sched_component_unlock_worker(workerid);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&tree->lock);
+	
 	return ret_val;
 }
 

+ 63 - 36
src/sched_policies/component_worker.c

@@ -299,11 +299,69 @@ static inline struct starpu_task * _starpu_worker_task_list_pop(struct _starpu_w
 
 
 /******************************************************************************
+ *			Worker Components' Public Helper Functions (Part 1)		     	  *
+ *****************************************************************************/
+
+
+
+struct _starpu_worker * _starpu_sched_component_worker_get_worker(struct starpu_sched_component * worker_component)
+{
+	STARPU_ASSERT(starpu_sched_component_is_simple_worker(worker_component));
+	struct _starpu_worker_component_data * data = worker_component->data;
+	return data->worker;
+}
+struct _starpu_combined_worker * _starpu_sched_component_combined_worker_get_combined_worker(struct starpu_sched_component * worker_component)
+{
+	STARPU_ASSERT(starpu_sched_component_is_combined_worker(worker_component));
+	struct _starpu_worker_component_data * data = worker_component->data;
+	return data->combined_worker;
+}
+
+void _starpu_sched_component_lock_worker(int workerid)
+{
+	STARPU_ASSERT(0 <= workerid && workerid < (int) starpu_worker_get_count());
+	struct _starpu_worker_component_data * data = starpu_sched_component_worker_get(workerid)->data;
+	STARPU_PTHREAD_MUTEX_LOCK(&data->lock);
+}
+void _starpu_sched_component_unlock_worker(int workerid)
+{
+	STARPU_ASSERT(0 <= workerid && workerid < (int)starpu_worker_get_count());
+	struct _starpu_worker_component_data * data = starpu_sched_component_worker_get(workerid)->data;
+	STARPU_PTHREAD_MUTEX_UNLOCK(&data->lock);
+}
+
+
+
+/******************************************************************************
  *				Worker Components' Private Helper Functions			      	  *
  *****************************************************************************/
 
 
 
+/* Allows a worker to lock/unlock scheduling mutexes. Currently used in 
+ * self-defined can_push calls to allow can_pull calls to take those mutexes while the 
+ * current worker is pushing tasks on other workers (or itself). 
+ */
+static void _starpu_sched_component_worker_lock_scheduling(void)
+{
+	int workerid = starpu_worker_get_id();
+	starpu_pthread_mutex_t *sched_mutex;
+	starpu_pthread_cond_t *sched_cond;
+	starpu_worker_get_sched_condition(workerid, &sched_mutex, &sched_cond);
+	_starpu_sched_component_lock_worker(workerid);	
+	STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
+}
+
+static void _starpu_sched_component_worker_unlock_scheduling(void)
+{
+	int workerid = starpu_worker_get_id();
+	starpu_pthread_mutex_t *sched_mutex;
+	starpu_pthread_cond_t *sched_cond;
+	starpu_worker_get_sched_condition(workerid, &sched_mutex, &sched_cond);
+	STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
+	_starpu_sched_component_unlock_worker(workerid);	
+}
+
 static void _starpu_sched_component_worker_set_sleep_status(struct starpu_sched_component * worker_component)
 {
 	STARPU_ASSERT(starpu_sched_component_is_worker(worker_component));
@@ -366,40 +424,6 @@ static int _worker_consistant(struct starpu_sched_component * component)
 
 
 /******************************************************************************
- *			Worker Components' Public Helper Functions (Part 1)		     	  *
- *****************************************************************************/
-
-
-
-struct _starpu_worker * _starpu_sched_component_worker_get_worker(struct starpu_sched_component * worker_component)
-{
-	STARPU_ASSERT(starpu_sched_component_is_simple_worker(worker_component));
-	struct _starpu_worker_component_data * data = worker_component->data;
-	return data->worker;
-}
-struct _starpu_combined_worker * _starpu_sched_component_combined_worker_get_combined_worker(struct starpu_sched_component * worker_component)
-{
-	STARPU_ASSERT(starpu_sched_component_is_combined_worker(worker_component));
-	struct _starpu_worker_component_data * data = worker_component->data;
-	return data->combined_worker;
-}
-
-void _starpu_sched_component_lock_worker(int workerid)
-{
-	STARPU_ASSERT(0 <= workerid && workerid < (int) starpu_worker_get_count());
-	struct _starpu_worker_component_data * data = starpu_sched_component_worker_get(workerid)->data;
-	STARPU_PTHREAD_MUTEX_LOCK(&data->lock);
-}
-void _starpu_sched_component_unlock_worker(int workerid)
-{
-	STARPU_ASSERT(0 <= workerid && workerid < (int)starpu_worker_get_count());
-	struct _starpu_worker_component_data * data = starpu_sched_component_worker_get(workerid)->data;
-	STARPU_PTHREAD_MUTEX_UNLOCK(&data->lock);
-}
-
-
-
-/******************************************************************************
  *				Simple Worker Components' Interface Functions			      *
  *****************************************************************************/
 
@@ -456,6 +480,7 @@ static int simple_worker_push_task(struct starpu_sched_component * component, st
 
 static struct starpu_task * simple_worker_pop_task(struct starpu_sched_component *component)
 {
+	int workerid = starpu_worker_get_id();
 	struct _starpu_worker_component_data * data = component->data;
 	struct _starpu_worker_task_list * list = data->list;
 	STARPU_PTHREAD_MUTEX_LOCK(&list->mutex);
@@ -466,7 +491,7 @@ static struct starpu_task * simple_worker_pop_task(struct starpu_sched_component
 		starpu_push_task_end(task);
 		return task;
 	}
-	STARPU_PTHREAD_MUTEX_LOCK(&data->lock);
+	_starpu_sched_component_lock_worker(workerid);	
 	int i;
 	do {
 		_starpu_sched_component_worker_reset_status(component);
@@ -476,14 +501,16 @@ static struct starpu_task * simple_worker_pop_task(struct starpu_sched_component
 				continue;
 			else
 			{
+				_starpu_sched_component_worker_unlock_scheduling();
 				task = component->parents[i]->pop_task(component->parents[i]);
+				_starpu_sched_component_worker_lock_scheduling();
 				if(task)
 					break;
 			}
 		}
 	} while((!task) && _starpu_sched_component_worker_is_changed_status(component));
 	_starpu_sched_component_worker_set_sleep_status(component);
-	STARPU_PTHREAD_MUTEX_UNLOCK(&data->lock);
+	_starpu_sched_component_unlock_worker(workerid);	
 	if(!task)
 		return NULL;
 	if(task->cl->type == STARPU_SPMD)