Bläddra i källkod

refactorise push task to contextes without any worker

Andra Hugo 12 år sedan
förälder
incheckning
cef7c70f72
3 ändrade filer med 46 tillägg och 28 borttagningar
  1. 1 2
      src/core/sched_ctx.c
  2. 41 26
      src/core/sched_policy.c
  3. 4 0
      src/core/sched_policy.h

+ 1 - 2
src/core/sched_ctx.c

@@ -542,8 +542,7 @@ void _starpu_fetch_tasks_from_empty_ctx_list(struct _starpu_sched_ctx *sched_ctx
 		if(old_task == &stop_submission_task)
 			break;
 
-		struct _starpu_job *old_j = _starpu_get_job_associated_to_task(old_task);
-		int ret = _starpu_push_task(old_j);
+		int ret =  _starpu_push_task_to_workers(old_task);
 		/* if we should stop poping from empty ctx tasks */
 		if(ret == -1) break;
 	}

+ 41 - 26
src/core/sched_policy.c

@@ -318,6 +318,16 @@ int _starpu_push_task(struct _starpu_job *j)
 
 	_STARPU_LOG_IN();
 
+	_STARPU_TRACE_JOB_PUSH(task, task->priority > 0);
+	_starpu_increment_nready_tasks();
+	task->status = STARPU_TASK_READY;
+#ifdef HAVE_AYUDAME_H
+	if (AYU_event) {
+		int id = -1;
+		AYU_event(AYU_ADDTASKTOQUEUE, j->job_id, &id);
+	}
+#endif
+	/* if the context does not have any workers save the tasks in a temp list */
 	if(!sched_ctx->is_initial_sched)
 	{
 		/*if there are workers in the ctx that are not able to execute tasks
@@ -326,37 +336,42 @@ int _starpu_push_task(struct _starpu_job *j)
 
 		if(nworkers == 0)
 		{
-			if(task->already_pushed)
-			{
-				_STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->empty_ctx_mutex);
-				starpu_task_list_push_back(&sched_ctx->empty_ctx_tasks, task);
-				_STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->empty_ctx_mutex);
-				_STARPU_LOG_OUT();
-				return -1;
-			}
-			else
-			{
-				_STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->empty_ctx_mutex);
-				task->already_pushed = 1;
-				starpu_task_list_push_front(&sched_ctx->empty_ctx_tasks, task);
-				_STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->empty_ctx_mutex);
-				_STARPU_LOG_OUT();
-				return 0;
-			}
+			_STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->empty_ctx_mutex);
+			starpu_task_list_push_front(&sched_ctx->empty_ctx_tasks, task);
+			_STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->empty_ctx_mutex);
+			return -1;
 		}
 	}
 
-	_STARPU_TRACE_JOB_PUSH(task, task->priority > 0);
-	_starpu_increment_nready_tasks();
-	task->status = STARPU_TASK_READY;
-#ifdef HAVE_AYUDAME_H
-	if (AYU_event) {
-		int id = -1;
-		AYU_event(AYU_ADDTASKTOQUEUE, j->job_id, &id);
+	return _starpu_push_task_to_workers(task);
+}
+
+int _starpu_push_task_to_workers(struct starpu_task *task)
+{
+	struct _starpu_sched_ctx *sched_ctx = _starpu_get_sched_ctx_struct(task->sched_ctx);
+	unsigned nworkers = 0;
+
+	/* if the contexts still does not have workers put the task back to its place in
+	   the empty ctx list */
+	if(!sched_ctx->is_initial_sched)
+	{
+		/*if there are workers in the ctx that are not able to execute tasks
+		  we consider the ctx empty */
+		nworkers = _starpu_nworkers_able_to_execute_task(task, sched_ctx);
+
+		if (nworkers == 0)
+		{
+			_STARPU_PTHREAD_MUTEX_LOCK(&sched_ctx->empty_ctx_mutex);
+			starpu_task_list_push_back(&sched_ctx->empty_ctx_tasks, task);
+			_STARPU_PTHREAD_MUTEX_UNLOCK(&sched_ctx->empty_ctx_mutex);
+			return -1;
+		}
 	}
-#endif
+
 	_starpu_profiling_set_task_push_start_time(task);
 
+	struct _starpu_job *j = _starpu_get_job_associated_to_task(task);
+
 	/* in case there is no codelet associated to the task (that's a control
 	 * task), we directly execute its callback and enforce the
 	 * corresponding dependencies */
@@ -389,8 +404,8 @@ int _starpu_push_task(struct _starpu_job *j)
 
 	_STARPU_LOG_OUT();
 	return ret;
-}
 
+} 
 /*
  * Given a handle that needs to be converted in order to be used on the given
  * node, returns a task that takes care of the conversion.

+ 4 - 0
src/core/sched_policy.h

@@ -32,6 +32,10 @@ void _starpu_init_sched_policy(struct _starpu_machine_config *config,
 void _starpu_deinit_sched_policy(struct _starpu_sched_ctx *sched_ctx);
 
 int _starpu_push_task(struct _starpu_job *task);
+
+/* actually pushes the tasks to the specific worker or to the scheduler */
+int _starpu_push_task_to_workers(struct starpu_task *task);
+
 /* pop a task that can be executed on the worker */
 struct starpu_task *_starpu_pop_task(struct _starpu_worker *worker);
 /* pop every task that can be executed on the worker */