|
@@ -1704,13 +1704,28 @@ unsigned starpu_worker_is_blocked_in_parallel(int workerid)
|
|
int cur_workerid = starpu_worker_get_id();
|
|
int cur_workerid = starpu_worker_get_id();
|
|
if (workerid != cur_workerid)
|
|
if (workerid != cur_workerid)
|
|
{
|
|
{
|
|
|
|
+ /* in order to observe the 'blocked' state of a worker from
|
|
|
|
+ * another worker, we must avoid race conditions between
|
|
|
|
+ * 'blocked' state changes and state observations. This is the
|
|
|
|
+ * purpose of this 'if' block. */
|
|
struct _starpu_worker *cur_worker = cur_workerid<starpu_worker_get_count()?_starpu_get_worker_struct(cur_workerid):NULL;
|
|
struct _starpu_worker *cur_worker = cur_workerid<starpu_worker_get_count()?_starpu_get_worker_struct(cur_workerid):NULL;
|
|
|
|
+
|
|
int relax_own_observation_state = (cur_worker != NULL) && (cur_worker->state_safe_for_observation == 0);
|
|
int relax_own_observation_state = (cur_worker != NULL) && (cur_worker->state_safe_for_observation == 0);
|
|
if (relax_own_observation_state && !worker->state_safe_for_observation)
|
|
if (relax_own_observation_state && !worker->state_safe_for_observation)
|
|
{
|
|
{
|
|
|
|
+ /* moreover, when a worker (cur_worker != NULL)
|
|
|
|
+ * observes another worker, we need to take special
|
|
|
|
+ * care to avoid live locks, thus the observing worker
|
|
|
|
+ * must enter the relaxed state (if not relaxed
|
|
|
|
+ * already) before doing the observation in mutual
|
|
|
|
+ * exclusion */
|
|
cur_worker->state_safe_for_observation = 1;
|
|
cur_worker->state_safe_for_observation = 1;
|
|
STARPU_PTHREAD_COND_BROADCAST(&cur_worker->sched_cond);
|
|
STARPU_PTHREAD_COND_BROADCAST(&cur_worker->sched_cond);
|
|
}
|
|
}
|
|
|
|
+ /* the observer waits for a safe window to observe the state,
|
|
|
|
+ * and also waits for any pending blocking state change
|
|
|
|
+ * requests to be processed, in order to not obtain an
|
|
|
|
+ * ephemeral information */
|
|
while (!worker->state_safe_for_observation
|
|
while (!worker->state_safe_for_observation
|
|
|| worker->state_block_in_parallel_req
|
|
|| worker->state_block_in_parallel_req
|
|
|| worker->state_unblock_in_parallel_req
|
|
|| worker->state_unblock_in_parallel_req
|
|
@@ -1724,10 +1739,11 @@ unsigned starpu_worker_is_blocked_in_parallel(int workerid)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
unsigned ret = _starpu_config.workers[workerid].state_blocked_in_parallel;
|
|
unsigned ret = _starpu_config.workers[workerid].state_blocked_in_parallel;
|
|
|
|
+ /* once a worker state has been observed, the worker is 'tainted' for the next one full sched_op,
|
|
|
|
+ * to avoid changing the observed worker state - on which the observer
|
|
|
|
+ * made a scheduling decision - after the fact. */
|
|
worker->state_blocked_in_parallel_observed = 1;
|
|
worker->state_blocked_in_parallel_observed = 1;
|
|
- STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
|
|
|
|
- return ret;
|
|
|
|
-}
|
|
|
|
|
|
+ STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex); return ret; }
|
|
|
|
|
|
unsigned starpu_worker_is_slave_somewhere(int workerid)
|
|
unsigned starpu_worker_is_slave_somewhere(int workerid)
|
|
{
|
|
{
|