Explorar o código

port r11361 from 1.1: Fix using fields of the just-pushed task without keeping the mutex, otherwise it might get executed and freed in between. Also make the blocking mode wake only workers which can execute the task

Samuel Thibault %!s(int64=12) %!d(string=hai) anos
pai
achega
c3d255416f

+ 25 - 21
src/sched_policies/eager_central_policy.c

@@ -70,10 +70,9 @@ static void deinitialize_eager_center_policy(unsigned sched_ctx_id)
 }
 
 static int push_task_eager_policy(struct starpu_task *task)
- {
+{
 	unsigned sched_ctx_id = task->sched_ctx;
 	struct _starpu_eager_center_policy_data *data = (struct _starpu_eager_center_policy_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
-	int ret_val = 0;
 		
 	STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
 	starpu_task_list_push_back(&data->fifo->taskq,task);
@@ -81,7 +80,6 @@ static int push_task_eager_policy(struct starpu_task *task)
 	data->fifo->nprocessed++;
 
 	starpu_push_task_end(task);
-	STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
 
 
 	/*if there are no tasks block */
@@ -98,28 +96,34 @@ static int push_task_eager_policy(struct starpu_task *task)
 		worker = workers->get_next(workers, &it);
 
 #ifdef STARPU_NON_BLOCKING_DRIVERS
-		if (starpu_bitmap_get(data->waiters, worker))
-		{
-			/* This worker is waiting for a task */
-			unsigned nimpl;
-			for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
-				if (starpu_worker_can_execute_task(worker, task, nimpl))
-				{
-					/* It can execute this one, tell him! */
-					starpu_bitmap_unset(data->waiters, worker);
-					break;
-				}
-		}
+		if (!starpu_bitmap_get(data->waiters, worker))
+			/* This worker is not waiting for a task */
+			continue;
+#endif
+
+		unsigned nimpl;
+		for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
+			if (starpu_worker_can_execute_task(worker, task, nimpl))
+			{
+				/* It can execute this one, tell him! */
+#ifdef STARPU_NON_BLOCKING_DRIVERS
+				starpu_bitmap_unset(data->waiters, worker);
+				/* We really woke at least somebody, no need to wake somebody else */
+				goto out;
 #else
-		starpu_pthread_mutex_t *sched_mutex;
-		starpu_pthread_cond_t *sched_cond;
-		starpu_worker_get_sched_condition(worker, &sched_mutex, &sched_cond);
+				starpu_pthread_mutex_t *sched_mutex;
+				starpu_pthread_cond_t *sched_cond;
+				starpu_worker_get_sched_condition(worker, &sched_mutex, &sched_cond);
 
-		if (starpu_wakeup_worker(worker, sched_cond, sched_mutex))
-		    break; // wake up a single worker
+				if (starpu_wakeup_worker(worker, sched_cond, sched_mutex))
+				    goto out; // wake up a single worker
 #endif
+			}
 	}
-	return ret_val;
+out:
+	STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
+
+	return 0;
 }
 
 static struct starpu_task *pop_every_task_eager_policy(unsigned sched_ctx_id)

+ 22 - 24
src/sched_policies/eager_central_priority_policy.c

@@ -123,9 +123,7 @@ static int _starpu_priority_push_task(struct starpu_task *task)
 {
 	unsigned sched_ctx_id = task->sched_ctx;
 	struct _starpu_eager_central_prio_data *data = (struct _starpu_eager_central_prio_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
-
 	struct _starpu_priority_taskq *taskq = data->taskq;
-	
 
 	STARPU_PTHREAD_MUTEX_LOCK(&data->policy_mutex);
 	unsigned priolevel = task->priority - STARPU_MIN_PRIO;
@@ -134,9 +132,7 @@ static int _starpu_priority_push_task(struct starpu_task *task)
 	taskq->ntasks[priolevel]++;
 	taskq->total_ntasks++;
 	starpu_push_task_end(task);
-	STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
 
-#ifndef STARPU_NON_BLOCKING_DRIVERS
 	/*if there are no tasks block */
 	/* wake people waiting for a task */
 	unsigned worker = 0;
@@ -151,29 +147,33 @@ static int _starpu_priority_push_task(struct starpu_task *task)
 		worker = workers->get_next(workers, &it);
 
 #ifdef STARPU_NON_BLOCKING_DRIVERS
-		if (starpu_bitmap_get(data->waiters, worker))
-		{
-			/* This worker is waiting for a task */
-			unsigned nimpl;
-			for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
-				if (starpu_worker_can_execute_task(worker, task, nimpl))
-				{
-					/* It can execute this one, tell him! */
-					starpu_bitmap_unset(data->waiters, worker);
-					break;
-				}
-		}
+		if (!starpu_bitmap_get(data->waiters, worker))
+			/* This worker is not waiting for a task */
+			continue;
+#endif
+
+		unsigned nimpl;
+		for (nimpl = 0; nimpl < STARPU_MAXIMPLEMENTATIONS; nimpl++)
+			if (starpu_worker_can_execute_task(worker, task, nimpl))
+			{
+				/* It can execute this one, tell him! */
+#ifdef STARPU_NON_BLOCKING_DRIVERS
+				starpu_bitmap_unset(data->waiters, worker);
+				/* We really woke at least somebody, no need to wake somebody else */
+				goto out;
 #else
-		starpu_pthread_mutex_t *sched_mutex;
-		starpu_pthread_cond_t *sched_cond;
-		starpu_worker_get_sched_condition(worker, &sched_mutex, &sched_cond);
+				starpu_pthread_mutex_t *sched_mutex;
+				starpu_pthread_cond_t *sched_cond;
+				starpu_worker_get_sched_condition(worker, &sched_mutex, &sched_cond);
 
-		if (starpu_wakeup_worker(worker, sched_cond, sched_mutex))
-		    break; // wake up a single worker
+				if (starpu_wakeup_worker(worker, sched_cond, sched_mutex))
+				    goto out; // wake up a single worker
 #endif
+			}
 	}
+out:
+	STARPU_PTHREAD_MUTEX_UNLOCK(&data->policy_mutex);
 
-#endif
 	return 0;
 }
 
@@ -259,9 +259,7 @@ static struct starpu_task *_starpu_priority_pop_task(unsigned sched_ctx_id)
 				starpu_pthread_mutex_t *sched_mutex;
 				starpu_pthread_cond_t *sched_cond;
 				starpu_worker_get_sched_condition(worker, &sched_mutex, &sched_cond);
-				STARPU_PTHREAD_MUTEX_LOCK(sched_mutex);
 				STARPU_PTHREAD_COND_SIGNAL(sched_cond);
-				STARPU_PTHREAD_MUTEX_UNLOCK(sched_mutex);
 #endif
 			}
 		}