|
@@ -72,6 +72,7 @@ struct _starpu_work_stealing_data_per_worker
|
|
|
{
|
|
|
struct _starpu_fifo_taskq *queue_array;
|
|
|
int *proxlist;
|
|
|
+ pthread_mutex_t worker_mutex;
|
|
|
|
|
|
#ifdef USE_LOCALITY_TASKS
|
|
|
/* This records the same as queue_array, but hashed by data accessed with locality flag. */
|
|
@@ -527,31 +528,25 @@ static struct starpu_task *ws_pop_task(unsigned sched_ctx_id)
|
|
|
struct starpu_task *task;
|
|
|
int workerid = starpu_worker_get_id_check();
|
|
|
|
|
|
+ STARPU_PTHREAD_MUTEX_LOCK(&ws->per_worker[workerid].worker_mutex);
|
|
|
task = ws_pick_task(workerid, workerid, sched_ctx_id);
|
|
|
+ STARPU_PTHREAD_MUTEX_UNLOCK(&ws->per_worker[workerid].worker_mutex);
|
|
|
if (task)
|
|
|
{
|
|
|
locality_popped_task(task, workerid, sched_ctx_id);
|
|
|
/* there was a local task */
|
|
|
return task;
|
|
|
}
|
|
|
- starpu_pthread_mutex_t *worker_sched_mutex;
|
|
|
- starpu_pthread_cond_t *worker_sched_cond;
|
|
|
- starpu_worker_get_sched_condition(workerid, &worker_sched_mutex, &worker_sched_cond);
|
|
|
|
|
|
/* we need to steal someone's job */
|
|
|
unsigned victim = ws->select_victim(sched_ctx_id, workerid);
|
|
|
|
|
|
- /* Note: Releasing this mutex before taking the victim mutex, to avoid interlock*/
|
|
|
- STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(worker_sched_mutex);
|
|
|
-
|
|
|
- starpu_pthread_mutex_t *victim_sched_mutex;
|
|
|
- starpu_pthread_cond_t *victim_sched_cond;
|
|
|
-
|
|
|
- starpu_worker_get_sched_condition(victim, &victim_sched_mutex, &victim_sched_cond);
|
|
|
- STARPU_PTHREAD_MUTEX_LOCK_SCHED(victim_sched_mutex);
|
|
|
-
|
|
|
if (ws->per_worker[victim].queue_array != NULL && ws->per_worker[victim].queue_array->ntasks > 0)
|
|
|
+ {
|
|
|
+ STARPU_PTHREAD_MUTEX_LOCK(&ws->per_worker[victim].worker_mutex);
|
|
|
task = ws_pick_task(victim, workerid, sched_ctx_id);
|
|
|
+ STARPU_PTHREAD_MUTEX_UNLOCK(&ws->per_worker[victim].worker_mutex);
|
|
|
+ }
|
|
|
if (task)
|
|
|
{
|
|
|
_STARPU_TRACE_WORK_STEALING(workerid, victim);
|
|
@@ -560,13 +555,15 @@ static struct starpu_task *ws_pop_task(unsigned sched_ctx_id)
|
|
|
locality_popped_task(task, victim, sched_ctx_id);
|
|
|
}
|
|
|
|
|
|
- STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(victim_sched_mutex);
|
|
|
-
|
|
|
- STARPU_PTHREAD_MUTEX_LOCK_SCHED(worker_sched_mutex);
|
|
|
if(!task)
|
|
|
{
|
|
|
if (ws->per_worker[workerid].queue_array != NULL && ws->per_worker[workerid].queue_array->ntasks > 0)
|
|
|
+ {
|
|
|
+ STARPU_PTHREAD_MUTEX_LOCK(&ws->per_worker[workerid].worker_mutex);
|
|
|
task = ws_pick_task(workerid, workerid, sched_ctx_id);
|
|
|
+ STARPU_PTHREAD_MUTEX_UNLOCK(&ws->per_worker[workerid].worker_mutex);
|
|
|
+ }
|
|
|
+
|
|
|
if (task)
|
|
|
{
|
|
|
locality_popped_task(task, workerid, sched_ctx_id);
|
|
@@ -653,6 +650,7 @@ static void ws_add_workers(unsigned sched_ctx_id, int *workerids,unsigned nworke
|
|
|
/* Tell helgrind that we are fine with getting outdated values,
|
|
|
* this is just an estimation */
|
|
|
STARPU_HG_DISABLE_CHECKING(ws->per_worker[workerid].queue_array->ntasks);
|
|
|
+ pthread_mutex_init(&ws->per_worker[workerid].worker_mutex, NULL);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -665,12 +663,8 @@ static void ws_remove_workers(unsigned sched_ctx_id, int *workerids, unsigned nw
|
|
|
|
|
|
for (i = 0; i < nworkers; i++)
|
|
|
{
|
|
|
- starpu_pthread_mutex_t *sched_mutex;
|
|
|
- starpu_pthread_cond_t *sched_cond;
|
|
|
-
|
|
|
workerid = workerids[i];
|
|
|
- starpu_worker_get_sched_condition(workerid, &sched_mutex, &sched_cond);
|
|
|
- STARPU_PTHREAD_MUTEX_LOCK_SCHED(sched_mutex);
|
|
|
+
|
|
|
if (ws->per_worker[workerid].queue_array != NULL)
|
|
|
{
|
|
|
_starpu_destroy_fifo(ws->per_worker[workerid].queue_array);
|
|
@@ -678,7 +672,7 @@ static void ws_remove_workers(unsigned sched_ctx_id, int *workerids, unsigned nw
|
|
|
}
|
|
|
free(ws->per_worker[workerid].proxlist);
|
|
|
ws->per_worker[workerid].proxlist = NULL;
|
|
|
- STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(sched_mutex);
|
|
|
+ pthread_mutex_destroy(&ws->per_worker[workerid].worker_mutex);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -735,12 +729,6 @@ static unsigned lws_select_victim(unsigned sched_ctx_id, int workerid)
|
|
|
for (i = 0; i < nworkers; i++)
|
|
|
{
|
|
|
neighbor = ws->per_worker[workerid].proxlist[i];
|
|
|
- /* if a worker was removed, then nothing tells us that the proxlist is correct */
|
|
|
- if (!starpu_sched_ctx_contains_worker(neighbor, sched_ctx_id))
|
|
|
- {
|
|
|
- i--;
|
|
|
- continue;
|
|
|
- }
|
|
|
int ntasks = ws->per_worker[neighbor].queue_array->ntasks;
|
|
|
if (ntasks)
|
|
|
return neighbor;
|