|
@@ -582,7 +582,6 @@ int ws_push_task(struct starpu_task *task)
|
|
struct _starpu_work_stealing_data *ws = (struct _starpu_work_stealing_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
|
|
struct _starpu_work_stealing_data *ws = (struct _starpu_work_stealing_data*)starpu_sched_ctx_get_policy_data(sched_ctx_id);
|
|
int workerid = -1;
|
|
int workerid = -1;
|
|
|
|
|
|
-pick_worker:
|
|
|
|
#ifdef USE_LOCALITY
|
|
#ifdef USE_LOCALITY
|
|
workerid = select_worker_locality(task, sched_ctx_id);
|
|
workerid = select_worker_locality(task, sched_ctx_id);
|
|
#endif
|
|
#endif
|
|
@@ -590,20 +589,15 @@ pick_worker:
|
|
workerid = starpu_worker_get_id();
|
|
workerid = starpu_worker_get_id();
|
|
|
|
|
|
/* If the current thread is not a worker but
|
|
/* If the current thread is not a worker but
|
|
- * the main thread (-1), we find the better one to
|
|
|
|
- * put task on its queue */
|
|
|
|
- if (workerid == -1)
|
|
|
|
|
|
+ * the main thread (-1) or the current worker is not in the target
|
|
|
|
+ * context, we find the better one to put task on its queue */
|
|
|
|
+ if (workerid == -1 || !starpu_sched_ctx_contains_worker(workerid, sched_ctx_id))
|
|
workerid = select_worker(sched_ctx_id);
|
|
workerid = select_worker(sched_ctx_id);
|
|
|
|
|
|
starpu_pthread_mutex_t *sched_mutex;
|
|
starpu_pthread_mutex_t *sched_mutex;
|
|
starpu_pthread_cond_t *sched_cond;
|
|
starpu_pthread_cond_t *sched_cond;
|
|
starpu_worker_get_sched_condition(workerid, &sched_mutex, &sched_cond);
|
|
starpu_worker_get_sched_condition(workerid, &sched_mutex, &sched_cond);
|
|
STARPU_PTHREAD_MUTEX_LOCK_SCHED(sched_mutex);
|
|
STARPU_PTHREAD_MUTEX_LOCK_SCHED(sched_mutex);
|
|
-
|
|
|
|
- /* Maybe the worker we selected was removed before we picked the mutex */
|
|
|
|
- if (ws->per_worker[workerid].queue_array == NULL)
|
|
|
|
- goto pick_worker;
|
|
|
|
-
|
|
|
|
record_data_locality(task, workerid);
|
|
record_data_locality(task, workerid);
|
|
|
|
|
|
#ifdef HAVE_AYUDAME_H
|
|
#ifdef HAVE_AYUDAME_H
|
|
@@ -615,11 +609,12 @@ pick_worker:
|
|
}
|
|
}
|
|
#endif
|
|
#endif
|
|
|
|
|
|
|
|
+ STARPU_PTHREAD_MUTEX_LOCK(&ws->per_worker[workerid].worker_mutex);
|
|
_starpu_fifo_push_task(ws->per_worker[workerid].queue_array, task);
|
|
_starpu_fifo_push_task(ws->per_worker[workerid].queue_array, task);
|
|
|
|
+ STARPU_PTHREAD_MUTEX_UNLOCK(&ws->per_worker[workerid].worker_mutex);
|
|
locality_pushed_task(task, workerid, sched_ctx_id);
|
|
locality_pushed_task(task, workerid, sched_ctx_id);
|
|
|
|
|
|
starpu_push_task_end(task);
|
|
starpu_push_task_end(task);
|
|
-
|
|
|
|
STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(sched_mutex);
|
|
STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(sched_mutex);
|
|
|
|
|
|
#if !defined(STARPU_NON_BLOCKING_DRIVERS) || defined(STARPU_SIMGRID)
|
|
#if !defined(STARPU_NON_BLOCKING_DRIVERS) || defined(STARPU_SIMGRID)
|