Forráskód Böngészése

Store tasks instead of jobs in the per-worker lists.

Cédric Augonnet 14 éve
szülő
commit
babde97945

+ 13 - 15
src/core/jobs.c

@@ -339,32 +339,30 @@ unsigned _starpu_enforce_deps_starting_from_task(starpu_job_t j, unsigned job_is
 	return ret;
 }
 
-struct starpu_job_s *_starpu_pop_local_task(struct starpu_worker_s *worker)
+struct starpu_task *_starpu_pop_local_task(struct starpu_worker_s *worker)
 {
-	struct starpu_job_s *j = NULL;
+	struct starpu_task *task = NULL;
 
-	PTHREAD_MUTEX_LOCK(&worker->local_jobs_mutex);
+	PTHREAD_MUTEX_LOCK(&worker->local_tasks_mutex);
 
-	if (!starpu_job_list_empty(worker->local_jobs))
-		j = starpu_job_list_pop_back(worker->local_jobs);
+	if (!starpu_task_list_empty(&worker->local_tasks))
+		task = starpu_task_list_pop_back(&worker->local_tasks);
 
-	PTHREAD_MUTEX_UNLOCK(&worker->local_jobs_mutex);
+	PTHREAD_MUTEX_UNLOCK(&worker->local_tasks_mutex);
 
-	return j;
+	return task;
 }
 
-int _starpu_push_local_task(struct starpu_worker_s *worker, struct starpu_job_s *j)
+int _starpu_push_local_task(struct starpu_worker_s *worker, struct starpu_task *task)
 {
 	/* Check that the worker is able to execute the task ! */
-	STARPU_ASSERT(j->task && j->task->cl);
-	if (STARPU_UNLIKELY(!(worker->worker_mask & j->task->cl->where)))
+	STARPU_ASSERT(task && task->cl);
+	if (STARPU_UNLIKELY(!(worker->worker_mask & task->cl->where)))
 		return -ENODEV;
 
-	PTHREAD_MUTEX_LOCK(&worker->local_jobs_mutex);
-
-	starpu_job_list_push_front(worker->local_jobs, j);
-
-	PTHREAD_MUTEX_UNLOCK(&worker->local_jobs_mutex);
+	PTHREAD_MUTEX_LOCK(&worker->local_tasks_mutex);
+	starpu_task_list_push_front(&worker->local_tasks, task);
+	PTHREAD_MUTEX_UNLOCK(&worker->local_tasks_mutex);
 
 #ifndef STARPU_NON_BLOCKING_DRIVERS
 	/* XXX that's a bit excessive ... */

+ 2 - 2
src/core/jobs.h

@@ -134,11 +134,11 @@ size_t _starpu_job_get_data_size(starpu_job_t j);
 
 /* Get a task from the local pool of tasks that were explicitly attributed to
  * that worker. */
-starpu_job_t _starpu_pop_local_task(struct starpu_worker_s *worker);
+struct starpu_task *_starpu_pop_local_task(struct starpu_worker_s *worker);
 
 /* Put a task into the pool of tasks that are explicitly attributed to the
  * specified worker. */
-int _starpu_push_local_task(struct starpu_worker_s *worker, starpu_job_t j);
+int _starpu_push_local_task(struct starpu_worker_s *worker, struct starpu_task *task);
 
 /* Returns the symbol associated to that job if any. */
 const char *_starpu_get_model_name(starpu_job_t j);

+ 1 - 1
src/core/sched_policy.c

@@ -231,7 +231,7 @@ int _starpu_push_task(starpu_job_t j, unsigned job_is_already_locked)
 			_starpu_prefetch_task_input_on_node(task, memory_node);
 		}
 
-		ret = _starpu_push_local_task(worker, j);
+		ret = _starpu_push_local_task(worker, task);
 	}
 	else {
 		STARPU_ASSERT(policy.push_task);

+ 4 - 3
src/core/workers.c

@@ -22,6 +22,7 @@
 #include <core/debug.h>
 #include <core/task.h>
 #include <profiling/profiling.h>
+#include <starpu_task_list.h>
 
 #ifdef __MINGW32__
 #include <windows.h>
@@ -120,8 +121,8 @@ static void _starpu_init_workers(struct starpu_machine_config_s *config)
 		 * directly */
 		workerarg->terminated_jobs = starpu_job_list_new();
 
-		workerarg->local_jobs = starpu_job_list_new();
-		PTHREAD_MUTEX_INIT(&workerarg->local_jobs_mutex, NULL);
+		starpu_task_list_init(&workerarg->local_tasks);
+		PTHREAD_MUTEX_INIT(&workerarg->local_tasks_mutex, NULL);
 	
 		workerarg->status = STATUS_INITIALIZING;
 
@@ -345,7 +346,7 @@ static void _starpu_terminate_workers(struct starpu_machine_config_s *config)
 			}
 		}
 
-		starpu_job_list_delete(worker->local_jobs);
+		STARPU_ASSERT(starpu_task_list_empty(&worker->local_tasks));
 		starpu_job_list_delete(worker->terminated_jobs);
 	}
 }

+ 2 - 2
src/core/workers.h

@@ -69,8 +69,8 @@ struct starpu_worker_s {
 	unsigned memory_node; /* which memory node is associated that worker to ? */
 	pthread_cond_t *sched_cond; /* condition variable used when the worker waits for tasks. */
 	pthread_mutex_t *sched_mutex; /* mutex protecting sched_cond */
-	struct starpu_job_list_s *local_jobs; /* this queue contains tasks that have been explicitely submitted to that queue */
-	pthread_mutex_t local_jobs_mutex; /* protect the local_jobs list */
+	struct starpu_task_list local_tasks; /* this queue contains tasks that have been explicitely submitted to that queue */
+	pthread_mutex_t local_tasks_mutex; /* protect the local_tasks list */
 	struct starpu_worker_set_s *set; /* in case this worker belongs to a set */
 	struct starpu_job_list_s *terminated_jobs; /* list of pending jobs which were executed */
 	unsigned worker_is_running;

+ 13 - 12
src/drivers/cpu/driver_cpu.c

@@ -114,6 +114,8 @@ void *_starpu_cpu_worker(void *arg)
 	PTHREAD_MUTEX_UNLOCK(&cpu_arg->mutex);
 
         starpu_job_t j;
+	struct starpu_task *task;
+
 	int res;
 
 	while (_starpu_machine_is_running())
@@ -127,17 +129,13 @@ void *_starpu_cpu_worker(void *arg)
 		PTHREAD_MUTEX_LOCK(cpu_arg->sched_mutex);
 
 		/* perhaps there is some local task to be executed first */
-		j = _starpu_pop_local_task(cpu_arg);
+		task = _starpu_pop_local_task(cpu_arg);
 
 		/* otherwise ask a task to the scheduler */
-		if (!j)
-		{
-			struct starpu_task *task = _starpu_pop_task();
-			if (task)
-				j = _starpu_get_job_associated_to_task(task);
-		}
-		
-                if (j == NULL) 
+		if (!task)
+			task = _starpu_pop_task();
+	
+                if (!task) 
 		{
 			if (_starpu_worker_can_block(memnode))
 				_starpu_block_worker(workerid, cpu_arg->sched_cond, cpu_arg->sched_mutex);
@@ -146,9 +144,12 @@ void *_starpu_cpu_worker(void *arg)
 
 			continue;
 		};
+
+		PTHREAD_MUTEX_UNLOCK(cpu_arg->sched_mutex);	
+
+		STARPU_ASSERT(task);
+		j = _starpu_get_job_associated_to_task(task);
 	
-		PTHREAD_MUTEX_UNLOCK(cpu_arg->sched_mutex);
-		
 		/* can a cpu perform that task ? */
 		if (!STARPU_CPU_MAY_PERFORM(j)) 
 		{
@@ -157,7 +158,7 @@ void *_starpu_cpu_worker(void *arg)
 			continue;
 		}
 
-		_starpu_set_current_task(j->task);
+		_starpu_set_current_task(task);
 
                 res = execute_job_on_cpu(j, cpu_arg);
 

+ 9 - 9
src/drivers/cuda/driver_cuda.c

@@ -251,6 +251,7 @@ void *_starpu_cuda_worker(void *arg)
 	PTHREAD_MUTEX_UNLOCK(&args->mutex);
 
 	struct starpu_job_s * j;
+	struct starpu_task *task;
 	int res;
 
 	while (_starpu_machine_is_running())
@@ -264,17 +265,13 @@ void *_starpu_cuda_worker(void *arg)
 		PTHREAD_MUTEX_LOCK(args->sched_mutex);
 
 		/* perhaps there is some local task to be executed first */
-		j = _starpu_pop_local_task(args);
+		task = _starpu_pop_local_task(args);
 
 		/* otherwise ask a task to the scheduler */
-		if (!j)
-		{
-			struct starpu_task *task = _starpu_pop_task();
-			if (task)
-				j = _starpu_get_job_associated_to_task(task);
-		}
+		if (!task)
+			task = _starpu_pop_task();
 	
-                if (j == NULL) 
+                if (task == NULL) 
 		{
 			if (_starpu_worker_can_block(memnode))
 				_starpu_block_worker(workerid, args->sched_cond, args->sched_mutex);
@@ -286,6 +283,9 @@ void *_starpu_cuda_worker(void *arg)
 
 		PTHREAD_MUTEX_UNLOCK(args->sched_mutex);
 
+		STARPU_ASSERT(task);
+		j = _starpu_get_job_associated_to_task(task);
+
 		/* can CUDA do that task ? */
 		if (!STARPU_CUDA_MAY_PERFORM(j))
 		{
@@ -294,7 +294,7 @@ void *_starpu_cuda_worker(void *arg)
 			continue;
 		}
 
-		_starpu_set_current_task(j->task);
+		_starpu_set_current_task(task);
 
 		res = execute_job_on_cuda(j, args);
 

+ 9 - 9
src/drivers/opencl/driver_opencl.c

@@ -373,6 +373,7 @@ void *_starpu_opencl_worker(void *arg)
 	PTHREAD_MUTEX_UNLOCK(&args->mutex);
 
 	struct starpu_job_s * j;
+	struct starpu_task *task;
 	int res;
 
 	while (_starpu_machine_is_running())
@@ -386,17 +387,13 @@ void *_starpu_opencl_worker(void *arg)
 		PTHREAD_MUTEX_LOCK(args->sched_mutex);
 
 		/* perhaps there is some local task to be executed first */
-		j = _starpu_pop_local_task(args);
+		task = _starpu_pop_local_task(args);
 
 		/* otherwise ask a task to the scheduler */
-		if (!j)
-		{
-			struct starpu_task *task = _starpu_pop_task();
-			if (task)
-				j = _starpu_get_job_associated_to_task(task);
-		}
+		if (!task)
+			task = _starpu_pop_task();
 		
-                if (j == NULL) 
+                if (task == NULL) 
 		{
 			if (_starpu_worker_can_block(memnode))
 				_starpu_block_worker(workerid, args->sched_cond, args->sched_mutex);
@@ -407,7 +404,10 @@ void *_starpu_opencl_worker(void *arg)
 		};
 
 		PTHREAD_MUTEX_UNLOCK(args->sched_mutex);
-	       
+
+		STARPU_ASSERT(task);
+		j = _starpu_get_job_associated_to_task(task);
+
 		/* can OpenCL do that task ? */
 		if (!STARPU_OPENCL_MAY_PERFORM(j))
 		{