瀏覽代碼

fix dummy too

Andra Hugo 12 年之前
父節點
當前提交
29536adfdd
共有 3 個文件被更改,包括 40 次插入16 次删除
  1. 37 13
      examples/scheduler/dummy_sched.c
  2. 1 1
      src/core/sched_ctx.c
  3. 2 2
      src/sched_policies/eager_central_policy.c

+ 37 - 13
examples/scheduler/dummy_sched.c

@@ -23,8 +23,7 @@
 
 typedef struct dummy_sched_data {
 	struct starpu_task_list sched_list;
-	pthread_mutex_t sched_mutex;
-	pthread_cond_t sched_cond;
+	pthread_mutex_t policy_mutex;
 } dummy_sched_data;
 
 static void init_dummy_sched(unsigned sched_ctx_id)
@@ -37,11 +36,9 @@ static void init_dummy_sched(unsigned sched_ctx_id)
 	/* Create a linked-list of tasks and a condition variable to protect it */
 	starpu_task_list_init(&data->sched_list);
 
-	pthread_mutex_init(&data->sched_mutex, NULL);
-	pthread_cond_init(&data->sched_cond, NULL);
-
 	starpu_sched_ctx_set_policy_data(sched_ctx_id, (void*)data);		
 
+	pthread_mutex_init(&data->policy_mutex, NULL);
 	FPRINTF(stderr, "Initialising Dummy scheduler\n");
 }
 
@@ -51,11 +48,10 @@ static void deinit_dummy_sched(unsigned sched_ctx_id)
 
 	STARPU_ASSERT(starpu_task_list_empty(&data->sched_list));
 
-	pthread_cond_destroy(&data->sched_cond);
-	pthread_mutex_destroy(&data->sched_mutex);
-
 	starpu_sched_ctx_delete_worker_collection(sched_ctx_id);
 
+	pthread_mutex_destroy(&data->policy_mutex);
+
 	free(data);
 	
 	FPRINTF(stderr, "Destroying Dummy scheduler\n");
@@ -66,13 +62,38 @@ static int push_task_dummy(struct starpu_task *task)
 	unsigned sched_ctx_id = task->sched_ctx;
 	struct dummy_sched_data *data = (struct dummy_sched_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
 
-	pthread_mutex_lock(&data->sched_mutex);
+	/* NB: In this simplistic strategy, we assume that the context in which
+	   we push task has at least one worker*/
 
-	starpu_task_list_push_front(&data->sched_list, task);
 
-	pthread_cond_signal(&data->sched_cond);
+	/* lock all workers when pushing tasks on a list where all
+	   of them would pop for tasks */
+	unsigned worker = 0;
+	struct starpu_sched_ctx_worker_collection *workers = starpu_sched_ctx_get_worker_collection(sched_ctx_id);
+	struct starpu_iterator it;
+	if(workers->init_iterator)
+		workers->init_iterator(workers, &it);
 
-	pthread_mutex_unlock(&data->sched_mutex);
+	while(workers->has_next(workers, &it))
+	{
+		worker = workers->get_next(workers, &it);
+		pthread_mutex_t *sched_mutex;
+		pthread_cond_t *sched_cond;
+		starpu_worker_get_sched_condition(worker, &sched_mutex, &sched_cond);
+		pthread_mutex_lock(sched_mutex);
+	}
+
+	starpu_task_list_push_front(&data->sched_list, task);
+
+	while(workers->has_next(workers, &it))
+	{
+		worker = workers->get_next(workers, &it);
+		pthread_mutex_t *sched_mutex;
+		pthread_cond_t *sched_cond;
+		starpu_worker_get_sched_condition(worker, &sched_mutex, &sched_cond);
+		pthread_cond_signal(sched_cond);
+		pthread_mutex_unlock(sched_mutex);
+	}
 
 	return 0;
 }
@@ -86,7 +107,10 @@ static struct starpu_task *pop_task_dummy(unsigned sched_ctx_id)
 	 * the calling worker. So we just take the head of the list and give it
 	 * to the worker. */
 	struct dummy_sched_data *data = (struct dummy_sched_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
-	return starpu_task_list_pop_back(&data->sched_list);
+	pthread_mutex_lock(&data->policy_mutex);
+	struct starpu_task *task = starpu_task_list_pop_back(&data->sched_list);
+	pthread_mutex_unlock(&data->policy_mutex);
+	return task;
 }
 
 static struct starpu_sched_policy dummy_sched_policy =

+ 1 - 1
src/core/sched_ctx.c

@@ -504,7 +504,7 @@ void _starpu_delete_all_sched_ctxs()
 		_STARPU_PTHREAD_MUTEX_LOCK(&changing_ctx_mutex[i]);
 		if(sched_ctx->id != STARPU_NMAX_SCHED_CTXS)
 		{
-			if(_starpu_wait_for_all_tasks_of_sched_ctx(sched_ctx->id) && _starpu_wait_for_all_tasks_of_sched_ctx(0))
+			if(!_starpu_wait_for_all_tasks_of_sched_ctx(sched_ctx->id) && !_starpu_wait_for_all_tasks_of_sched_ctx(0))
 			{
 				_starpu_sched_ctx_free_scheduling_data(sched_ctx);
 				_starpu_barrier_counter_destroy(&sched_ctx->tasks_barrier);

+ 2 - 2
src/sched_policies/eager_central_policy.c

@@ -21,7 +21,7 @@
  *	JOB QUEUE.
  */
 
-#include <core/workers.h>
+//#include <core/workers.h>
 #include <sched_policies/fifo_queues.h>
 
 struct _starpu_eager_center_policy_data
@@ -60,7 +60,7 @@ static void deinitialize_eager_center_policy(unsigned sched_ctx_id)
 }
 
 static int push_task_eager_policy(struct starpu_task *task)
-{
+ {
 	unsigned sched_ctx_id = task->sched_ctx;
 	struct _starpu_eager_center_policy_data *data = (struct _starpu_eager_center_policy_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
 	_starpu_pthread_mutex_t *changing_ctx_mutex = starpu_get_changing_ctx_mutex(sched_ctx_id);