瀏覽代碼

update heteroprio with new synchro scheme

Olivier Aumage 8 年之前
父節點
當前提交
428ed04158
共有 2 個文件被更改,包括 12 次插入17 次删除
  1. 1 1
      src/core/sched_policy.c
  2. 11 16
      src/sched_policies/heteroprio.c

+ 1 - 1
src/core/sched_policy.c

@@ -71,7 +71,6 @@ static struct starpu_sched_policy *predefined_policies[] =
 	&_starpu_sched_dmda_sorted_policy,
 	&_starpu_sched_dmda_sorted_decision_policy,
 	&_starpu_sched_parallel_heft_policy,
-	&_starpu_sched_heteroprio_policy,
 #else
 	&_starpu_sched_eager_policy,
 	&_starpu_sched_prio_policy,
@@ -79,6 +78,7 @@ static struct starpu_sched_policy *predefined_policies[] =
 	&_starpu_sched_peager_policy,
 	&_starpu_sched_ws_policy,
 	&_starpu_sched_lws_policy,
+	&_starpu_sched_heteroprio_policy,
 	&_starpu_sched_graph_test_policy,
 #warning TODO: update sched policies with new synchro scheme
 #endif

+ 11 - 16
src/sched_policies/heteroprio.c

@@ -467,14 +467,9 @@ static struct starpu_task *pop_task_heteroprio_policy(unsigned sched_ctx_id)
 		return NULL;
 	}
 #endif
-	starpu_pthread_mutex_t *worker_sched_mutex;
-	starpu_pthread_cond_t *worker_sched_cond;
-	starpu_worker_get_sched_condition(workerid, &worker_sched_mutex, &worker_sched_cond);
-
-	/* Note: Releasing this mutex before taking the victim mutex, to avoid interlock*/
-	STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(worker_sched_mutex);
-
+	_starpu_worker_enter_section_safe_for_observation();
 	STARPU_PTHREAD_MUTEX_LOCK(&hp->policy_mutex);
+	_starpu_worker_leave_section_safe_for_observation();
 
 	/* keep track of the new added task to perfom real prefetch on node */
 	unsigned nb_added_tasks = 0;
@@ -587,12 +582,8 @@ static struct starpu_task *pop_task_heteroprio_policy(unsigned sched_ctx_id)
 				if(hp->workers_heteroprio[victim].arch_index == worker->arch_index
 				   && hp->workers_heteroprio[victim].tasks_queue->ntasks)
 				{
-					starpu_pthread_mutex_t *victim_sched_mutex;
-					starpu_pthread_cond_t *victim_sched_cond;
-					starpu_worker_get_sched_condition(victim, &victim_sched_mutex, &victim_sched_cond);
-
 					/* ensure the worker is not currently prefetching its data */
-					STARPU_PTHREAD_MUTEX_LOCK_SCHED(victim_sched_mutex);
+					_starpu_worker_lock_for_observation_relax(victim);
 
 					if(hp->workers_heteroprio[victim].arch_index == worker->arch_index
 					   && hp->workers_heteroprio[victim].tasks_queue->ntasks)
@@ -602,10 +593,10 @@ static struct starpu_task *pop_task_heteroprio_policy(unsigned sched_ctx_id)
 						/* we steal a task update global counter */
 						hp->nb_prefetched_tasks_per_arch_index[hp->workers_heteroprio[victim].arch_index] -= 1;
 
-						STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(victim_sched_mutex);
+						_starpu_worker_unlock_for_observation(victim);
 						goto done;
 					}
-					STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(victim_sched_mutex);
+					_starpu_worker_unlock_for_observation(victim);
 				}
 			}
 		}
@@ -619,16 +610,20 @@ done:		;
 	}
 	STARPU_PTHREAD_MUTEX_UNLOCK(&hp->policy_mutex);
 
-	STARPU_PTHREAD_MUTEX_LOCK_SCHED(worker_sched_mutex);
 	if(task)
 	{
+		_starpu_worker_enter_section_safe_for_observation();
+		_starpu_sched_ctx_lock_write(sched_ctx_id);
 		unsigned child_sched_ctx = starpu_sched_ctx_worker_is_master_for_child_ctx(workerid, sched_ctx_id);
 		if(child_sched_ctx != STARPU_NMAX_SCHED_CTXS)
 		{
 			starpu_sched_ctx_move_task_to_ctx(task, child_sched_ctx, 1, 1);
 			starpu_sched_ctx_revert_task_counters(sched_ctx_id, task->flops);
-			return NULL;
+			task = NULL;
 		}
+		_starpu_sched_ctx_unlock_write(sched_ctx_id);
+		_starpu_worker_leave_section_safe_for_observation();
+		return task;
 	}
 
 	/* if we have task (task) me way have some in the queue (worker->tasks_queue_size) that was freshly addeed (nb_added_tasks) */