|
@@ -329,8 +329,10 @@ int ws_push_task(struct starpu_task *task)
|
|
|
{
|
|
|
unsigned sched_ctx_id = task->sched_ctx;
|
|
|
struct _starpu_work_stealing_data *ws = (struct _starpu_work_stealing_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
|
|
|
+ int workerid;
|
|
|
|
|
|
- int workerid = starpu_worker_get_id();
|
|
|
+pick_worker:
|
|
|
+ workerid = starpu_worker_get_id();
|
|
|
|
|
|
/* If the current thread is not a worker but
|
|
|
* the main thread (-1), we find the better one to
|
|
@@ -343,6 +345,10 @@ int ws_push_task(struct starpu_task *task)
|
|
|
starpu_worker_get_sched_condition(workerid, &sched_mutex, &sched_cond);
|
|
|
STARPU_PTHREAD_MUTEX_LOCK_SCHED(sched_mutex);
|
|
|
|
|
|
+ /* Maybe the worker we selected was removed before we picked the mutex */
|
|
|
+ if (ws->queue_array[workerid] == NULL)
|
|
|
+ goto pick_worker;
|
|
|
+
|
|
|
#ifdef HAVE_AYUDAME_H
|
|
|
struct _starpu_job *j = _starpu_get_job_associated_to_task(task);
|
|
|
if (AYU_event)
|
|
@@ -398,7 +404,12 @@ static void ws_remove_workers(unsigned sched_ctx_id, int *workerids, unsigned nw
|
|
|
|
|
|
for (i = 0; i < nworkers; i++)
|
|
|
{
|
|
|
+ starpu_pthread_mutex_t *sched_mutex;
|
|
|
+ starpu_pthread_cond_t *sched_cond;
|
|
|
+
|
|
|
workerid = workerids[i];
|
|
|
+ starpu_worker_get_sched_condition(workerid, &sched_mutex, &sched_cond);
|
|
|
+ STARPU_PTHREAD_MUTEX_LOCK_SCHED(sched_mutex);
|
|
|
if (ws->queue_array[workerid] != NULL)
|
|
|
{
|
|
|
_starpu_destroy_fifo(ws->queue_array[workerid]);
|
|
@@ -406,6 +417,7 @@ static void ws_remove_workers(unsigned sched_ctx_id, int *workerids, unsigned nw
|
|
|
}
|
|
|
if (ws->proxlist != NULL)
|
|
|
free(ws->proxlist[workerid]);
|
|
|
+ STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(sched_mutex);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -462,6 +474,12 @@ static unsigned lws_select_victim(unsigned sched_ctx_id, int workerid)
|
|
|
for (i = 0; i < nworkers; i++)
|
|
|
{
|
|
|
neighbor = ws->proxlist[workerid][i];
|
|
|
+ /* if a worker was removed, then nothing tells us that the proxlist is correct */
|
|
|
+ if (!starpu_sched_ctx_contains_worker(neighbor, sched_ctx_id))
|
|
|
+ {
|
|
|
+ i--;
|
|
|
+ continue;
|
|
|
+ }
|
|
|
int ntasks = ws->queue_array[neighbor]->ntasks;
|
|
|
if (ntasks)
|
|
|
return neighbor;
|