Browse Source

factorise changing_ctx_mutex (get it out of the policies)

Andra Hugo 12 years ago
parent
commit
3de4a1581a

+ 1 - 1
configure.ac

@@ -267,7 +267,7 @@ AC_MSG_RESULT($max_sched_ctxs)
 AC_DEFINE_UNQUOTED(STARPU_NMAX_SCHED_CTXS, [$max_sched_ctxs], [Maximum number of sched_ctxs supported])
 
 AC_ARG_ENABLE([sc_hypervisor],
-  [AS_HELP_STRING([--enable-sct-hypervisor],
+  [AS_HELP_STRING([--enable-sc-hypervisor],
     [enable resizing contexts (experimental)])],
   [enable_sc_hypervisor="yes"],
   [enable_sc_hypervisor="no"])

+ 7 - 1
src/core/sched_policy.c

@@ -396,7 +396,13 @@ int _starpu_push_task_to_workers(struct starpu_task *task)
 	else
 	{
 		STARPU_ASSERT(sched_ctx->sched_policy->push_task);
-		ret = sched_ctx->sched_policy->push_task(task);
+		/* check out if there are any workers in the context */
+		starpu_pthread_mutex_t *changing_ctx_mutex = starpu_sched_ctx_get_changing_ctx_mutex(sched_ctx->id);
+		_STARPU_PTHREAD_MUTEX_LOCK(changing_ctx_mutex);
+		nworkers = starpu_sched_ctx_get_nworkers(sched_ctx->id);
+		ret = nworkers == 0 ? -1 : sched_ctx->sched_policy->push_task(task);
+		_STARPU_PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
+
 		if(ret == -1)
 		{
 			fprintf(stderr, "repush task \n");

+ 3 - 49
src/sched_policies/deque_modeling_policy_data_aware.c

@@ -772,64 +772,18 @@ static int dmda_push_sorted_task(struct starpu_task *task)
 #ifdef STARPU_DEVEL
 #warning TODO: after defining a scheduling window, use that instead of empty_ctx_tasks
 #endif
-	unsigned sched_ctx_id = task->sched_ctx;
-	starpu_pthread_mutex_t *changing_ctx_mutex = starpu_sched_ctx_get_changing_ctx_mutex(sched_ctx_id);
-	unsigned nworkers;
-	int ret_val = -1;
-
-	_STARPU_PTHREAD_MUTEX_LOCK(changing_ctx_mutex);
-	nworkers = starpu_sched_ctx_get_nworkers(sched_ctx_id);
-	if(nworkers == 0)
-	{
-		_STARPU_PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
-		return ret_val;
-	}
-
-	ret_val = _dmda_push_task(task, 1, sched_ctx_id);
-	_STARPU_PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
-	return ret_val;
-
+	return _dmda_push_task(task, 1, task->sched_ctx);
 }
 
 static int dm_push_task(struct starpu_task *task)
 {
-	unsigned sched_ctx_id = task->sched_ctx;
-	starpu_pthread_mutex_t *changing_ctx_mutex = starpu_sched_ctx_get_changing_ctx_mutex(sched_ctx_id);
-	unsigned nworkers;
-	int ret_val = -1;
-
-	_STARPU_PTHREAD_MUTEX_LOCK(changing_ctx_mutex);
-	nworkers = starpu_sched_ctx_get_nworkers(sched_ctx_id);
-	if(nworkers == 0)
-	{
-		_STARPU_PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
-		return ret_val;
-	}
-
-	ret_val = _dm_push_task(task, 0, sched_ctx_id);
-	_STARPU_PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
-	return ret_val;
+	return _dm_push_task(task, 0, task->sched_ctx);
 }
 
 static int dmda_push_task(struct starpu_task *task)
 {
-	unsigned sched_ctx_id = task->sched_ctx;
-	starpu_pthread_mutex_t *changing_ctx_mutex = starpu_sched_ctx_get_changing_ctx_mutex(sched_ctx_id);
-	unsigned nworkers;
-	int ret_val = -1;
-
-	_STARPU_PTHREAD_MUTEX_LOCK(changing_ctx_mutex);
-	nworkers = starpu_sched_ctx_get_nworkers(sched_ctx_id);
-	if(nworkers == 0)
-	{
-		_STARPU_PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
-		return ret_val;
-	}
-
 	STARPU_ASSERT(task);
-	ret_val = _dmda_push_task(task, 0, sched_ctx_id);
-	_STARPU_PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
-	return ret_val;
+	return _dmda_push_task(task, 0, task->sched_ctx);
 }
 
 static void dmda_add_workers(unsigned sched_ctx_id, int *workerids, unsigned nworkers)

+ 0 - 11
src/sched_policies/eager_central_policy.c

@@ -63,17 +63,8 @@ static int push_task_eager_policy(struct starpu_task *task)
  {
 	unsigned sched_ctx_id = task->sched_ctx;
 	struct _starpu_eager_center_policy_data *data = (struct _starpu_eager_center_policy_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
-	starpu_pthread_mutex_t *changing_ctx_mutex = starpu_sched_ctx_get_changing_ctx_mutex(sched_ctx_id);
-	unsigned nworkers;
 	int ret_val = -1;
 
-	_STARPU_PTHREAD_MUTEX_LOCK(changing_ctx_mutex);
-	nworkers = starpu_sched_ctx_get_nworkers(sched_ctx_id);
-	if(nworkers == 0)
-	{
-		_STARPU_PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
-		return ret_val;
-	}
 
 		
 	_STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
@@ -103,8 +94,6 @@ static int push_task_eager_policy(struct starpu_task *task)
 		_STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
 	}
 
-		
-	_STARPU_PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
 	return ret_val;
 }
 

+ 0 - 13
src/sched_policies/eager_central_priority_policy.c

@@ -109,20 +109,8 @@ static int _starpu_priority_push_task(struct starpu_task *task)
 	struct _starpu_eager_central_prio_data *data = (struct _starpu_eager_central_prio_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
 
 	struct _starpu_priority_taskq *taskq = data->taskq;
-
-	/* if the context has no workers return */
-	starpu_pthread_mutex_t *changing_ctx_mutex = starpu_sched_ctx_get_changing_ctx_mutex(sched_ctx_id);
-	unsigned nworkers;
 	int ret_val = -1;
 	
-	_STARPU_PTHREAD_MUTEX_LOCK(changing_ctx_mutex);
-	nworkers = starpu_sched_ctx_get_nworkers(sched_ctx_id);
-	if(nworkers == 0)
-	{
-		_STARPU_PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
-		return ret_val;
-	}
-
 
 	_STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
 	unsigned priolevel = task->priority - STARPU_MIN_PRIO;
@@ -153,7 +141,6 @@ static int _starpu_priority_push_task(struct starpu_task *task)
 		_STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
 	}
 
-	_STARPU_PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
 	return 0;
 }
 

+ 0 - 13
src/sched_policies/parallel_eager.c

@@ -153,19 +153,8 @@ static void deinitialize_peager_policy(unsigned sched_ctx_id)
 static int push_task_peager_policy(struct starpu_task *task)
 {
 	unsigned sched_ctx_id = task->sched_ctx;
-	starpu_pthread_mutex_t *changing_ctx_mutex = starpu_sched_ctx_get_changing_ctx_mutex(sched_ctx_id);
-	unsigned nworkers;
 	int ret_val = -1;
 	
-	/* if the context has no workers return */
-	_STARPU_PTHREAD_MUTEX_LOCK(changing_ctx_mutex);
-	nworkers = starpu_sched_ctx_get_nworkers(sched_ctx_id);
-	
-   	if(nworkers == 0)
-	{
-   		_STARPU_PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
-		return ret_val;
-	}
 	struct _starpu_peager_data *data = (struct _starpu_peager_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
 	int worker = 0;
 	struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
@@ -206,8 +195,6 @@ static int push_task_peager_policy(struct starpu_task *task)
 			_STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
 		}
 	}
-	
-	_STARPU_PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
 
 	return ret_val;
 }

+ 0 - 21
src/sched_policies/parallel_heft.c

@@ -489,36 +489,15 @@ static int _parallel_heft_push_task(struct starpu_task *task, unsigned prio, uns
 static int parallel_heft_push_task(struct starpu_task *task)
 {
 	unsigned sched_ctx_id = task->sched_ctx;
-	starpu_pthread_mutex_t *changing_ctx_mutex = starpu_sched_ctx_get_changing_ctx_mutex(sched_ctx_id);
-	unsigned nworkers;
 	int ret_val = -1;
 
 	if (task->priority == STARPU_MAX_PRIO)
 	{
-		_STARPU_PTHREAD_MUTEX_LOCK(changing_ctx_mutex);
-                nworkers = starpu_sched_ctx_get_nworkers(sched_ctx_id);
-                if(nworkers == 0)
-                {
-                        _STARPU_PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
-                        return ret_val;
-                }
-
 		ret_val = _parallel_heft_push_task(task, 1, sched_ctx_id);
-		_STARPU_PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
-                return ret_val;
-        }
-
-
-	_STARPU_PTHREAD_MUTEX_LOCK(changing_ctx_mutex);
-	nworkers = starpu_sched_ctx_get_nworkers(sched_ctx_id);
-        if(nworkers == 0)
-	{
-		_STARPU_PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
                 return ret_val;
         }
 
         ret_val = _parallel_heft_push_task(task, 0, sched_ctx_id);
-	_STARPU_PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
 	return ret_val;
 }
 

+ 1 - 16
src/sched_policies/random_policy.c

@@ -83,22 +83,7 @@ static int _random_push_task(struct starpu_task *task, unsigned prio)
 
 static int random_push_task(struct starpu_task *task)
 {
-	unsigned sched_ctx_id = task->sched_ctx;
-	starpu_pthread_mutex_t *changing_ctx_mutex = starpu_sched_ctx_get_changing_ctx_mutex(sched_ctx_id);
-	unsigned nworkers;
-        int ret_val = -1;
-
-        _STARPU_PTHREAD_MUTEX_LOCK(changing_ctx_mutex);
-	nworkers = starpu_sched_ctx_get_nworkers(sched_ctx_id);
-        if(nworkers == 0)
-        {
-		_STARPU_PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
-                return ret_val;
-        }
-
-        ret_val = _random_push_task(task, !!task->priority);
-        _STARPU_PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
-        return ret_val;
+        return _random_push_task(task, !!task->priority);
 }
 
 static void initialize_random_policy(unsigned sched_ctx_id)

+ 0 - 13
src/sched_policies/work_stealing_policy.c

@@ -336,19 +336,8 @@ int ws_push_task(struct starpu_task *task)
 	struct _starpu_job *j = _starpu_get_job_associated_to_task(task);
 	int workerid = starpu_worker_get_id();
 
-	starpu_pthread_mutex_t *changing_ctx_mutex = starpu_sched_ctx_get_changing_ctx_mutex(sched_ctx_id);
-        unsigned nworkers;
         int ret_val = -1;
 
-	/* if the context has no workers return */
-        _STARPU_PTHREAD_MUTEX_LOCK(changing_ctx_mutex);
-        nworkers = starpu_sched_ctx_get_nworkers(sched_ctx_id);
-        if(nworkers == 0)
-        {
-                _STARPU_PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
-                return ret_val;
-        }
-
 	unsigned worker = 0;
 	struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
 	struct starpu_sched_ctx_iterator it;
@@ -394,8 +383,6 @@ int ws_push_task(struct starpu_task *task)
 		_STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
 	}
 		
-        _STARPU_PTHREAD_MUTEX_UNLOCK(changing_ctx_mutex);
-
 	return 0;
 }