Explorar o código

port r11359 from 1.1: Make eager release pressure on the mutexes by using a bitmap for signaling which workers are waiting for a task.

Samuel Thibault %!s(int64=11) %!d(string=hai) anos
pai
achega
8b8f4bffe4

+ 29 - 2
src/sched_policies/eager_central_policy.c

@@ -24,11 +24,13 @@
 #include <starpu_scheduler.h>
 #include <sched_policies/fifo_queues.h>
 #include <common/thread.h>
+#include <starpu_bitmap.h>
 
 struct _starpu_eager_center_policy_data
 {
 	struct _starpu_fifo_taskq *fifo;
 	starpu_pthread_mutex_t policy_mutex;
+	struct starpu_bitmap *waiters;
 };
 
 static void initialize_eager_center_policy(unsigned sched_ctx_id)
@@ -41,6 +43,7 @@ static void initialize_eager_center_policy(unsigned sched_ctx_id)
 
 	/* there is only a single queue in that trivial design */
 	data->fifo =  _starpu_create_fifo();
+	data->waiters = starpu_bitmap_create();
 
 	 /* Tell helgrind that it's fine to check for empty fifo in
 	  * pop_task_eager_policy without actual mutex (it's just an integer)
@@ -59,6 +62,7 @@ static void deinitialize_eager_center_policy(unsigned sched_ctx_id)
 
 	/* deallocate the job queue */
 	_starpu_destroy_fifo(data->fifo);
+	starpu_bitmap_destroy(data->waiters);
 
 	starpu_sched_ctx_delete_worker_collection(sched_ctx_id);
 	STARPU_PTHREAD_MUTEX_DESTROY(&data->policy_mutex);
@@ -79,7 +83,6 @@ static int push_task_eager_policy(struct starpu_task *task)
 	starpu_push_task_end(task);
 	STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
 
-#ifndef STARPU_NON_BLOCKING_DRIVERS
 
 	/*if there are no tasks block */
 	/* wake people waiting for a task */
@@ -93,14 +96,29 @@ static int push_task_eager_policy(struct starpu_task *task)
 	while(workers->has_next(workers, &it))
 	{
 		worker = workers->get_next(workers, &it);
+
+#ifdef STARPU_NON_BLOCKING_DRIVERS
+		if (starpu_bitmap_get(data->waiters, worker))
+		{
+			/* This worker is waiting for a task */
+			unsigned nimpl;
+			for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
+				if (starpu_worker_can_execute_task(worker, task, nimpl))
+				{
+					/* It can execute this one, tell him! */
+					starpu_bitmap_unset(data->waiters, worker);
+					break;
+				}
+		}
+#else
 		starpu_pthread_mutex_t *sched_mutex;
 		starpu_pthread_cond_t *sched_cond;
 		starpu_worker_get_sched_condition(worker, &sched_mutex, &sched_cond);
 
 		if (starpu_wakeup_worker(worker, sched_cond, sched_mutex))
 		    break; // wake up a single worker
-	}
 #endif
+	}
 	return ret_val;
 }
 
@@ -129,8 +147,17 @@ static struct starpu_task *pop_task_eager_policy(unsigned sched_ctx_id)
 	if (_starpu_fifo_empty(data->fifo))
 		return NULL;
 
+	if (starpu_bitmap_get(data->waiters, workerid))
+		/* Nobody woke us, avoid bothering the mutex */
+		return NULL;
+
 	STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
+
 	task = _starpu_fifo_pop_task(data->fifo, workerid);
+	if (!task)
+		/* Tell pushers that we are waiting for tasks for us */
+		starpu_bitmap_set(data->waiters, workerid);
+
 	STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
 
 	return task;

+ 32 - 4
src/sched_policies/eager_central_priority_policy.c

@@ -23,6 +23,7 @@
 
 #include <starpu.h>
 #include <starpu_scheduler.h>
+#include <starpu_bitmap.h>
 
 #include <common/fxt.h>
 
@@ -45,6 +46,7 @@ struct _starpu_eager_central_prio_data
 {
 	struct _starpu_priority_taskq *taskq;
 	starpu_pthread_mutex_t policy_mutex;
+	struct starpu_bitmap *waiters;
 };
 
 /*
@@ -93,6 +95,7 @@ static void initialize_eager_center_priority_policy(unsigned sched_ctx_id)
 
 	/* only a single queue (even though there are several internaly) */
 	data->taskq = _starpu_create_priority_taskq(starpu_sched_ctx_get_min_priority(sched_ctx_id), starpu_sched_ctx_get_max_priority(sched_ctx_id));
+	data->waiters = starpu_bitmap_create();
 
 	/* Tell helgrind that it's fine to check for empty fifo in
 	 * _starpu_priority_pop_task without actual mutex (it's just an
@@ -109,6 +112,7 @@ static void deinitialize_eager_center_priority_policy(unsigned sched_ctx_id)
 
 	/* deallocate the task queue */
 	_starpu_destroy_priority_taskq(data->taskq);
+	starpu_bitmap_destroy(data->waiters);
 
 	starpu_sched_ctx_delete_worker_collection(sched_ctx_id);
 	STARPU_PTHREAD_MUTEX_DESTROY(&data->policy_mutex);
@@ -142,18 +146,32 @@ static int _starpu_priority_push_task(struct starpu_task *task)
 	if(workers->init_iterator)
 		workers->init_iterator(workers, &it);
 	
-#ifndef STARPU_NON_BLOCKING_DRIVERS
 	while(workers->has_next(workers, &it))
 	{
 		worker = workers->get_next(workers, &it);
+
+#ifdef STARPU_NON_BLOCKING_DRIVERS
+		if (starpu_bitmap_get(data->waiters, worker))
+		{
+			/* This worker is waiting for a task */
+			unsigned nimpl;
+			for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
+				if (starpu_worker_can_execute_task(worker, task, nimpl))
+				{
+					/* It can execute this one, tell him! */
+					starpu_bitmap_unset(data->waiters, worker);
+					break;
+				}
+		}
+#else
 		starpu_pthread_mutex_t *sched_mutex;
 		starpu_pthread_cond_t *sched_cond;
 		starpu_worker_get_sched_condition(worker, &sched_mutex, &sched_cond);
 
 		if (starpu_wakeup_worker(worker, sched_cond, sched_mutex))
 		    break; // wake up a single worker
-	}
 #endif
+	}
 
 #endif
 	return 0;
@@ -176,6 +194,10 @@ static struct starpu_task *_starpu_priority_pop_task(unsigned sched_ctx_id)
 	if (taskq->total_ntasks == 0)
 		return NULL;
 
+	if (starpu_bitmap_get(data->waiters, workerid))
+		/* Nobody woke us, avoid bothering the mutex */
+		return NULL;
+
 	/* release this mutex before trying to wake up other workers */
 	starpu_pthread_mutex_t *curr_sched_mutex;
 	starpu_pthread_cond_t *curr_sched_cond;
@@ -226,24 +248,30 @@ static struct starpu_task *_starpu_priority_pop_task(unsigned sched_ctx_id)
 		if(workers->init_iterator)
 			workers->init_iterator(workers, &it);
 		
-#ifndef STARPU_NON_BLOCKING_DRIVERS
 		while(workers->has_next(workers, &it))
 		{
 			worker = workers->get_next(workers, &it);
 			if(worker != workerid)
 			{
+#ifdef STARPU_NON_BLOCKING_DRIVERS
+				starpu_bitmap_unset(data->waiters, worker);
+#else
 				starpu_pthread_mutex_t *sched_mutex;
 				starpu_pthread_cond_t *sched_cond;
 				starpu_worker_get_sched_condition(worker, &sched_mutex, &sched_cond);
 				STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
 				STARPU_PTHREAD_COND_SIGNAL(sched_cond);
 				STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
+#endif
 			}
 		}
-#endif
 	
 	}
 
+	if (!chosen_task)
+		/* Tell pushers that we are waiting for tasks for us */
+		starpu_bitmap_set(data->waiters, workerid);
+
 	STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
 
 	/* leave the mutex how it was found before this */