Sfoglia il codice sorgente

Use priorities for the local tasks list

Samuel Thibault 4 anni fa
parent
commit
eb35e0f01c

+ 2 - 3
include/starpu_scheduler.h

@@ -294,9 +294,8 @@ int starpu_worker_can_execute_task_first_impl(unsigned workerid, struct starpu_t
 /**
    The scheduling policy may put tasks directly into a worker’s local
    queue so that it is not always necessary to create its own queue
-   when the local queue is sufficient. If \p back is not 0, \p task is
-   put at the back of the queue where the worker will pop tasks first.
-   Setting \p back to 0 therefore ensures a FIFO ordering.
+   when the local queue is sufficient. \p back is ignored: the task priority is
+   used to order tasks in this queue.
 */
 int starpu_push_local_task(int workerid, struct starpu_task *task, int back);
 

+ 0 - 1
mpi/src/mpi/starpu_mpi_mpi.c

@@ -41,7 +41,6 @@
 #include <core/simgrid.h>
 #include <core/task.h>
 #include <core/topology.h>
-#include <core/workers.h>
 
 #ifdef STARPU_USE_MPI_MPI
 

+ 0 - 1
mpi/src/starpu_mpi.c

@@ -33,7 +33,6 @@
 #include <core/simgrid.h>
 #include <core/task.h>
 #include <core/topology.h>
-#include <core/workers.h>
 
 static void _starpu_mpi_isend_irecv_common(struct _starpu_mpi_req *req, enum starpu_data_access_mode mode, int sequential_consistency)
 {

+ 1 - 1
socl/src/init.c

@@ -16,7 +16,7 @@
  */
 
 #include <stdlib.h>
-#include "../src/core/workers.h"
+#include "../src/common/utils.h"
 #include "socl.h"
 #include "gc.h"
 #include "mem_objects.h"

+ 2 - 0
src/common/utils.h

@@ -183,4 +183,6 @@ int _starpu_check_mutex_deadlock(starpu_pthread_mutex_t *mutex);
 
 void _starpu_util_init(void);
 
+enum initialization { UNINITIALIZED = 0, CHANGING, INITIALIZED };
+
 #endif // __COMMON_UTILS_H__

+ 4 - 10
src/core/jobs.c

@@ -765,14 +765,14 @@ struct starpu_task *_starpu_pop_local_task(struct _starpu_worker *worker)
 		}
 	}
 
-	if (!starpu_task_list_empty(&worker->local_tasks))
-		task = starpu_task_list_pop_front(&worker->local_tasks);
+	if (!starpu_task_prio_list_empty(&worker->local_tasks))
+		task = starpu_task_prio_list_pop_front_highest(&worker->local_tasks);
 
 	_starpu_pop_task_end(task);
 	return task;
 }
 
-int _starpu_push_local_task(struct _starpu_worker *worker, struct starpu_task *task, int prio)
+int _starpu_push_local_task(struct _starpu_worker *worker, struct starpu_task *task)
 {
 	/* Check that the worker is able to execute the task ! */
 	STARPU_ASSERT(task && task->cl);
@@ -815,13 +815,7 @@ int _starpu_push_local_task(struct _starpu_worker *worker, struct starpu_task *t
 	}
 	else
 	{
-#ifdef STARPU_DEVEL
-#warning FIXME use a prio_list
-#endif
-		if (prio)
-			starpu_task_list_push_front(&worker->local_tasks, task);
-		else
-			starpu_task_list_push_back(&worker->local_tasks, task);
+		starpu_task_prio_list_push_back(&worker->local_tasks, task);
 	}
 
 	starpu_wake_worker_locked(worker->workerid);

+ 2 - 4
src/core/jobs.h

@@ -269,10 +269,8 @@ size_t _starpu_job_get_data_size(struct starpu_perfmodel *model, struct starpu_p
 struct starpu_task *_starpu_pop_local_task(struct _starpu_worker *worker);
 
 /** Put a task into the pool of tasks that are explicitly attributed to the
- * specified worker. If "back" is set, the task is put at the back of the list.
- * Considering the tasks are popped from the back, this value should be 0 to
- * enforce a FIFO ordering. */
-int _starpu_push_local_task(struct _starpu_worker *worker, struct starpu_task *task, int prio);
+ * specified worker. */
+int _starpu_push_local_task(struct _starpu_worker *worker, struct starpu_task *task);
 
 #define _STARPU_JOB_GET_ORDERED_BUFFER_INDEX(job, i) ((job->dyn_ordered_buffers) ? job->dyn_ordered_buffers[i].index : job->ordered_buffers[i].index)
 #define _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(job, i) ((job->dyn_ordered_buffers) ? job->dyn_ordered_buffers[i].handle : job->ordered_buffers[i].handle)

+ 5 - 13
src/core/sched_policy.c

@@ -372,10 +372,7 @@ static int _starpu_push_task_on_specific_worker(struct starpu_task *task, int wo
 		}
 //		if(task->sched_ctx != _starpu_get_initial_sched_ctx()->id)
 
-		if(task->priority > 0)
-			return _starpu_push_local_task(worker, task, 1);
-		else
-			return _starpu_push_local_task(worker, task, 0);
+		return _starpu_push_local_task(worker, task);
 	}
 	else
 	{
@@ -406,7 +403,7 @@ static int _starpu_push_task_on_specific_worker(struct starpu_task *task, int wo
 
 			_STARPU_TRACE_JOB_PUSH(alias, alias->priority);
 			worker = _starpu_get_worker_struct(combined_workerid[j]);
-			ret |= _starpu_push_local_task(worker, alias, 0);
+			ret |= _starpu_push_local_task(worker, alias);
 		}
 
 		return ret;
@@ -1033,7 +1030,7 @@ pick:
 	}
 
 	task->mf_skip = 1;
-	starpu_task_list_push_back(&worker->local_tasks, task);
+	starpu_task_prio_list_push_back(&worker->local_tasks, task);
 	goto pick;
 
 profiling:
@@ -1175,16 +1172,11 @@ void _starpu_wait_on_sched_event(void)
 	STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
 }
 
-/* The scheduling policy may put tasks directly into a worker's local queue so
- * that it is not always necessary to create its own queue when the local queue
- * is sufficient. If "back" not null, the task is put at the back of the queue
- * where the worker will pop tasks first. Setting "back" to 0 therefore ensures
- * a FIFO ordering. */
-int starpu_push_local_task(int workerid, struct starpu_task *task, int prio)
+int starpu_push_local_task(int workerid, struct starpu_task *task, int back STARPU_ATTRIBUTE_UNUSED)
 {
 	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
 
-	return  _starpu_push_local_task(worker, task, prio);
+	return  _starpu_push_local_task(worker, task);
 }
 
 void _starpu_print_idle_time()

+ 1 - 1
src/core/task.c

@@ -1055,7 +1055,7 @@ int _starpu_task_submit_conversion_task(struct starpu_task *task,
 
 	struct _starpu_worker *worker;
 	worker = _starpu_get_worker_struct(workerid);
-	starpu_task_list_push_back(&worker->local_tasks, task);
+	starpu_task_prio_list_push_back(&worker->local_tasks, task);
 	starpu_wake_worker_locked(worker->workerid);
 
 	_starpu_profiling_set_task_push_end_time(task);

+ 2 - 2
src/core/workers.c

@@ -681,7 +681,7 @@ void _starpu_worker_init(struct _starpu_worker *workerarg, struct _starpu_machin
 	/* memory_node initialized by topology.c */
 	STARPU_PTHREAD_COND_INIT(&workerarg->sched_cond, NULL);
 	STARPU_PTHREAD_MUTEX_INIT(&workerarg->sched_mutex, NULL);
-	starpu_task_list_init(&workerarg->local_tasks);
+	starpu_task_prio_list_init(&workerarg->local_tasks);
 	_starpu_ctx_change_list_init(&workerarg->ctx_change_list);
 	workerarg->local_ordered_tasks = NULL;
 	workerarg->local_ordered_tasks_size = 0;
@@ -1785,7 +1785,7 @@ static void _starpu_terminate_workers(struct _starpu_machine_config *pconfig)
 		}
 
 out:
-		STARPU_ASSERT(starpu_task_list_empty(&worker->local_tasks));
+		STARPU_ASSERT(starpu_task_prio_list_empty(&worker->local_tasks));
 		for (n = 0; n < worker->local_ordered_tasks_size; n++)
 			STARPU_ASSERT(worker->local_ordered_tasks[n] == NULL);
 		_starpu_sched_ctx_list_delete(&worker->sched_ctx_list);

+ 1 - 3
src/core/workers.h

@@ -61,8 +61,6 @@
 
 #define STARPU_MAX_PIPELINE 4
 
-enum initialization { UNINITIALIZED = 0, CHANGING, INITIALIZED };
-
 struct _starpu_ctx_change_list;
 
 /** This is initialized by _starpu_worker_init() */
@@ -125,7 +123,7 @@ LIST_TYPE(_starpu_worker,
 	     * subsequent processing once worker completes the ongoing scheduling
 	     * operation */
 	struct _starpu_ctx_change_list ctx_change_list;
-	struct starpu_task_list local_tasks; /**< this queue contains tasks that have been explicitely submitted to that queue */
+	struct starpu_task_prio_list local_tasks; /**< this queue contains tasks that have been explicitely submitted to that queue */
 	struct starpu_task **local_ordered_tasks; /**< this queue contains tasks that have been explicitely submitted to that queue with an explicit order */
 	unsigned local_ordered_tasks_size; /**< this records the size of local_ordered_tasks */
 	unsigned current_ordered_task; /**< this records the index (within local_ordered_tasks) of the next ordered task to be executed */

+ 4 - 4
src/sched_policies/component_worker.c

@@ -510,11 +510,11 @@ static double simple_worker_estimated_load(struct starpu_sched_component * compo
 	struct _starpu_worker * worker = _starpu_sched_component_worker_get_worker(component);
 	int nb_task = 0;
 	STARPU_COMPONENT_MUTEX_LOCK(&worker->mutex);
-	struct starpu_task_list list = worker->local_tasks;
+	struct starpu_task_prio_list *list = &worker->local_tasks;
 	struct starpu_task * task;
-	for(task = starpu_task_list_front(&list);
-	    task != starpu_task_list_end(&list);
-	    task = starpu_task_list_next(task))
+	for(task = starpu_task_prio_list_begin(list);
+	    task != starpu_task_prio_list_end(list);
+	    task = starpu_task_prio_list_next(list, task))
 		nb_task++;
 	STARPU_COMPONENT_MUTEX_UNLOCK(&worker->mutex);
 	struct _starpu_worker_component_data * d = component->data;

+ 1 - 0
tests/main/starpu_worker_exists.c

@@ -14,6 +14,7 @@
  * See the GNU Lesser General Public License in COPYING.LGPL for more details.
  */
 
+#define BUILDING_STARPU
 #include <starpu.h>
 #include "core/workers.h"
 #include "../helper.h"