浏览代码

merge trunk@9601:9617

Nathalie Furmento 12 年之前
父节点
当前提交
05b438c4e1

+ 4 - 4
src/core/jobs.c

@@ -430,12 +430,12 @@ struct starpu_task *_starpu_pop_local_task(struct _starpu_worker *worker)
 	struct starpu_task *task = NULL;
 
 	if (!starpu_task_list_empty(&worker->local_tasks))
-		task = starpu_task_list_pop_back(&worker->local_tasks);
+		task = starpu_task_list_pop_front(&worker->local_tasks);
 
 	return task;
 }
 
-int _starpu_push_local_task(struct _starpu_worker *worker, struct starpu_task *task, int back)
+int _starpu_push_local_task(struct _starpu_worker *worker, struct starpu_task *task, int prio)
 {
 	/* Check that the worker is able to execute the task ! */
 	STARPU_ASSERT(task && task->cl);
@@ -445,9 +445,9 @@ int _starpu_push_local_task(struct _starpu_worker *worker, struct starpu_task *t
 	STARPU_PTHREAD_MUTEX_LOCK(&worker->sched_mutex);
 
 	if (back)
-		starpu_task_list_push_back(&worker->local_tasks, task);
-	else
 		starpu_task_list_push_front(&worker->local_tasks, task);
+	else
+		starpu_task_list_push_back(&worker->local_tasks, task);
 
 	STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
 	starpu_push_task_end(task);

+ 1 - 1
src/core/jobs.h

@@ -171,7 +171,7 @@ struct starpu_task *_starpu_pop_local_task(struct _starpu_worker *worker);
  * specified worker. If "back" is set, the task is put at the back of the list.
  * Considering the tasks are popped from the back, this value should be 0 to
  * enforce a FIFO ordering. */
-int _starpu_push_local_task(struct _starpu_worker *worker, struct starpu_task *task, int back);
+int _starpu_push_local_task(struct _starpu_worker *worker, struct starpu_task *task, int prio);
 
 #define _STARPU_JOB_GET_ORDERED_BUFFER_HANDLE(job, i) ((job->dyn_ordered_buffers) ? job->dyn_ordered_buffers[i].handle : job->ordered_buffers[i].handle)
 #define _STARPU_JOB_GET_ORDERED_BUFFER_MODE(job, i) ((job->dyn_ordered_buffers) ? job->dyn_ordered_buffers[i].mode : job->ordered_buffers[i].mode)

+ 3 - 3
src/core/sched_policy.c

@@ -680,7 +680,7 @@ pick:
 	}
 
 	task->mf_skip = 1;
-	starpu_task_list_push_front(&worker->local_tasks, task);
+	starpu_task_list_push_back(&worker->local_tasks, task);
 	goto pick;
 
 profiling:
@@ -758,9 +758,9 @@ void _starpu_wait_on_sched_event(void)
  * is sufficient. If "back" not null, the task is put at the back of the queue
  * where the worker will pop tasks first. Setting "back" to 0 therefore ensures
  * a FIFO ordering. */
-int starpu_push_local_task(int workerid, struct starpu_task *task, int back)
+int starpu_push_local_task(int workerid, struct starpu_task *task, int prio)
 {
 	struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
 
-	return  _starpu_push_local_task(worker, task, back);
+	return  _starpu_push_local_task(worker, task, prio);
 }

+ 1 - 1
src/core/task.c

@@ -613,7 +613,7 @@ int _starpu_task_submit_conversion_task(struct starpu_task *task,
 
 	struct _starpu_worker *worker;
 	worker = _starpu_get_worker_struct(workerid);
-	starpu_task_list_push_front(&worker->local_tasks, task);
+	starpu_task_list_push_back(&worker->local_tasks, task);
 
 	_starpu_profiling_set_task_push_end_time(task);
 

+ 10 - 6
src/sched_policies/deque_modeling_policy_data_aware.c

@@ -28,6 +28,10 @@
 #include <sched_policies/fifo_queues.h>
 #include <limits.h>
 
+#ifdef HAVE_AYUDAME_H
+#include <Ayudame.h>
+#endif
+
 #ifndef DBL_MIN
 #define DBL_MIN __DBL_MIN__
 #endif
@@ -123,7 +127,7 @@ static struct starpu_task *_starpu_fifo_pop_first_ready_task(struct _starpu_fifo
 	{
 		fifo_queue->ntasks--;
 
-		task = starpu_task_list_back(&fifo_queue->taskq);
+		task = starpu_task_list_front(&fifo_queue->taskq);
 		if (STARPU_UNLIKELY(!task))
 			return NULL;
 
@@ -137,7 +141,7 @@ static struct starpu_task *_starpu_fifo_pop_first_ready_task(struct _starpu_fifo
 		{
 			int priority = current->priority;
 
-			if (priority <= first_task_priority)
+			if (priority >= first_task_priority)
 			{
 				int non_ready = count_non_ready_buffers(current, node);
 				if (non_ready < non_ready_best)
@@ -150,7 +154,7 @@ static struct starpu_task *_starpu_fifo_pop_first_ready_task(struct _starpu_fifo
 				}
 			}
 
-			current = current->prev;
+			current = current->next;
 		}
 
 		starpu_task_list_erase(&fifo_queue->taskq, task);
@@ -978,17 +982,17 @@ static void dmda_post_exec_hook(struct starpu_task * task)
 {
 
 	struct _starpu_dmda_data *dt = (struct _starpu_dmda_data*)starpu_sched_ctx_get_policy_data(task->sched_ctx);
-	unsigned workerid = task->workerid;
+	int workerid = starpu_worker_get_id();
 	struct _starpu_fifo_taskq *fifo = dt->queue_array[workerid];
 	starpu_pthread_mutex_t *sched_mutex;
 	starpu_pthread_cond_t *sched_cond;
 	starpu_worker_get_sched_condition(workerid, &sched_mutex, &sched_cond);
-	_STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
+	STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
 	if(task->execute_on_a_specific_worker)
 		fifo->ntasks--;
 	fifo->exp_start = starpu_timing_now();
 	fifo->exp_end = fifo->exp_start + fifo->exp_len;
-	_STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
+	STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
 }
 
 struct starpu_sched_policy _starpu_sched_dm_policy =

+ 3 - 4
src/sched_policies/fifo_queues.c

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2010-2012  Université de Bordeaux 1
+ * Copyright (C) 2010-2013  Université de Bordeaux 1
  * Copyright (C) 2010, 2011, 2013  Centre National de la Recherche Scientifique
  * Copyright (C) 2011  Télécom-SudParis
  *
@@ -69,7 +69,7 @@ _starpu_fifo_push_sorted_task(struct _starpu_fifo_taskq *fifo_queue, struct star
 
 		while (current)
 		{
-			if (current->priority >= task->priority)
+			if (current->priority < task->priority)
 				break;
 
 			prev = current;
@@ -111,7 +111,6 @@ _starpu_fifo_push_sorted_task(struct _starpu_fifo_taskq *fifo_queue, struct star
 	return 0;
 }
 
-/* TODO: revert front/back? */
 int _starpu_fifo_push_task(struct _starpu_fifo_taskq *fifo_queue, struct starpu_task *task)
 {
 
@@ -121,7 +120,7 @@ int _starpu_fifo_push_task(struct _starpu_fifo_taskq *fifo_queue, struct starpu_
 	}
 	else
 	{
-		starpu_task_list_push_front(&fifo_queue->taskq, task);
+		starpu_task_list_push_back(&fifo_queue->taskq, task);
 
 		fifo_queue->ntasks++;
 		fifo_queue->nprocessed++;

+ 29 - 13
src/sched_policies/random_policy.c

@@ -29,13 +29,15 @@ static int _random_push_task(struct starpu_task *task, unsigned prio)
 {
 	/* find the queue */
 
-	unsigned selected = 0;
 
 	double alpha_sum = 0.0;
 
 	unsigned sched_ctx_id = task->sched_ctx;
 	struct starpu_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
         int worker;
+	int worker_arr[STARPU_NMAXWORKERS];
+	double speedup_arr[STARPU_NMAXWORKERS];
+	int size = 0;
 	struct starpu_sched_ctx_iterator it;
         if(workers->init_iterator)
                 workers->init_iterator(workers, &it);
@@ -43,32 +45,47 @@ static int _random_push_task(struct starpu_task *task, unsigned prio)
         while(workers->has_next(workers, &it))
 	{
                 worker = workers->get_next(workers, &it);
-
-		enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(worker);
-		alpha_sum += starpu_worker_get_relative_speedup(perf_arch);
+		int impl = 0;
+		for(impl = 0; impl < STARPU_MAXIMPLEMENTATIONS; impl++)
+		{
+			if(starpu_worker_can_execute_task(worker, task, impl))
+			{
+				enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(worker);
+				double speedup = starpu_worker_get_relative_speedup(perf_arch);
+				alpha_sum += speedup;
+				speedup_arr[size] = speedup;
+				worker_arr[size++] = worker;
+				break;
+			}
+		}
 	}
 
 	double random = starpu_drand48()*alpha_sum;
 //	_STARPU_DEBUG("my rand is %e\n", random);
 
-	double alpha = 0.0;
-	while(workers->has_next(workers, &it))
-        {
-                worker = workers->get_next(workers, &it);
+	if(size == 0)
+		return -ENODEV;
 
-		enum starpu_perf_archtype perf_arch = starpu_worker_get_perf_archtype(worker);
-		double worker_alpha = starpu_worker_get_relative_speedup(perf_arch);
+	unsigned selected = worker_arr[size - 1];
 
-		if (alpha + worker_alpha > random && starpu_worker_can_execute_task(worker, task, 0))
+	double alpha = 0.0;
+	int i;
+	for(i = 0; i < size; i++)
+	{
+                worker = worker_arr[i];
+		double worker_alpha = speedup_arr[i];
+		
+		if (alpha + worker_alpha >= random)
 		{
 			/* we found the worker */
 			selected = worker;
 			break;
 		}
-
+		
 		alpha += worker_alpha;
 	}
 
+
 #ifdef HAVE_AYUDAME_H
 	if (AYU_event)
 	{
@@ -77,7 +94,6 @@ static int _random_push_task(struct starpu_task *task, unsigned prio)
 	}
 #endif
 
-	/* we should now have the best worker in variable "selected" */
 	return starpu_push_local_task(selected, task, prio);
 }
 

+ 4 - 0
src/sched_policies/work_stealing_policy.c

@@ -24,6 +24,10 @@
 #include <sched_policies/deque_queues.h>
 #include <core/debug.h>
 
+#ifdef HAVE_AYUDAME_H
+#include <Ayudame.h>
+#endif
+
 struct _starpu_work_stealing_data
 {
 	struct _starpu_deque_jobq **queue_array;

+ 1 - 0
tests/Makefile.am

@@ -215,6 +215,7 @@ noinst_PROGRAMS =				\
 	perfmodels/valid_model			\
 	sched_policies/data_locality            \
 	sched_policies/execute_all_tasks        \
+	sched_policies/prio        		\
 	sched_policies/simple_deps              \
 	sched_policies/simple_cpu_gpu_sched
 

+ 124 - 0
tests/sched_policies/prio.c

@@ -0,0 +1,124 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2013 Université Bordeaux 1
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <unistd.h>
+#include <starpu.h>
+#include <starpu_scheduler.h>
+#include "../helper.h"
+
+#ifdef STARPU_QUICK_CHECK
+#define NTASKS 10
+#else
+#define NTASKS 1000
+#endif
+
+/*
+ * Task1 must be executed before task0, even if task0 is submitted first.
+ * Applies to : all schedulers.
+ */
+
+static void
+A(void *buffers[], void *args)
+{
+	(void) buffers;
+	(void) args;
+	FPRINTF(stdout,"A");
+	usleep(1000);
+}
+
+static void
+B(void *buffers[], void *args)
+{
+	(void) buffers;
+	(void) args;
+	FPRINTF(stdout,"B");
+	usleep(1000);
+}
+
+static int
+run(struct starpu_sched_policy *policy)
+{
+	int ret;
+	struct starpu_conf conf;
+	int i;
+
+	starpu_conf_init(&conf);
+	conf.sched_policy = policy;
+	ret = starpu_init(&conf);
+	if (ret != 0)
+		exit(STARPU_TEST_SKIPPED);
+	starpu_profiling_status_set(1);
+
+	struct starpu_codelet clA =
+	{
+		.cpu_funcs = {A, NULL},
+		.nbuffers = 0
+	};
+
+	struct starpu_codelet clB =
+	{
+		.cpu_funcs = {B, NULL},
+		.nbuffers = 0
+	};
+
+	for (i = 0; i < NTASKS; i++) {
+		struct starpu_task *task = starpu_task_create();
+
+		if (i%2) {
+			task->cl = &clA;
+			task->priority=STARPU_MIN_PRIO;
+		} else {
+			task->cl = &clB;
+			task->priority=STARPU_MAX_PRIO;
+		}
+		task->detach=1;
+		ret = starpu_task_submit(task);
+		if (ret == -ENODEV) goto enodev;
+		STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_submit");
+	}
+
+	starpu_task_wait_for_all();
+	FPRINTF(stdout,"\n");
+
+	starpu_shutdown();
+	return 0;
+
+enodev:
+	starpu_shutdown();
+	return -ENODEV;
+}
+
+int
+main(void)
+{
+	struct starpu_sched_policy **policies;
+	struct starpu_sched_policy **policy;
+
+	policies = starpu_sched_get_predefined_policies();
+	for(policy=policies ; *policy!=NULL ; policy++)
+	{
+		int ret;
+
+		FPRINTF(stderr, "Running with policy %s.\n", (*policy)->policy_name);
+		ret = run(*policy);
+		if (ret == -ENODEV)
+			return STARPU_TEST_SKIPPED;
+		if (ret == 1)
+			return EXIT_FAILURE;
+	}
+
+	return EXIT_SUCCESS;
+}