|
@@ -939,6 +939,7 @@ static void _starpu_src_common_send_workers(struct _starpu_mp_node * node, int b
|
|
static void _starpu_src_common_worker_internal_work(struct _starpu_worker_set * worker_set, struct _starpu_mp_node * mp_node, struct starpu_task **tasks, unsigned memnode)
|
|
static void _starpu_src_common_worker_internal_work(struct _starpu_worker_set * worker_set, struct _starpu_mp_node * mp_node, struct starpu_task **tasks, unsigned memnode)
|
|
{
|
|
{
|
|
int res = 0;
|
|
int res = 0;
|
|
|
|
+ unsigned i;
|
|
|
|
|
|
_starpu_may_pause();
|
|
_starpu_may_pause();
|
|
|
|
|
|
@@ -947,38 +948,39 @@ static void _starpu_src_common_worker_internal_work(struct _starpu_worker_set *
|
|
#endif
|
|
#endif
|
|
|
|
|
|
|
|
|
|
- /* Test if async transfers are completed */
|
|
|
|
- for (unsigned i = 0; i < worker_set->nworkers; i++)
|
|
|
|
- {
|
|
|
|
- struct starpu_task *task = worker_set->workers[i].task_transferring;
|
|
|
|
- /* We send all buffers to execute the task */
|
|
|
|
- if (task != NULL && worker_set->workers[i].nb_buffers_transferred == worker_set->workers[i].nb_buffers_totransfer)
|
|
|
|
- {
|
|
|
|
- struct _starpu_job * j = _starpu_get_job_associated_to_task(task);
|
|
|
|
-
|
|
|
|
- _starpu_set_local_worker_key(&worker_set->workers[i]);
|
|
|
|
- _starpu_release_fetch_task_input_async(j, worker_set->workers[i].workerid, worker_set->workers[i].nb_buffers_totransfer);
|
|
|
|
-
|
|
|
|
- /* Execute the task */
|
|
|
|
- res = _starpu_src_common_execute(j, &worker_set->workers[i], mp_node);
|
|
|
|
- switch (res)
|
|
|
|
- {
|
|
|
|
- case 0:
|
|
|
|
- /* The task task has been launched with no error */
|
|
|
|
- break;
|
|
|
|
- case -EAGAIN:
|
|
|
|
- _STARPU_DISP("ouch, this MP worker could not actually run task %p, putting it back...\n", tasks[i]);
|
|
|
|
- _starpu_push_task_to_workers(worker_set->workers[i].task_transferring);
|
|
|
|
- STARPU_ABORT();
|
|
|
|
- continue;
|
|
|
|
- break;
|
|
|
|
- default:
|
|
|
|
- STARPU_ASSERT(0);
|
|
|
|
- }
|
|
|
|
|
|
+ /* Test if async transfers are completed */
|
|
|
|
+ for (i = 0; i < worker_set->nworkers; i++)
|
|
|
|
+ {
|
|
|
|
+ struct starpu_task *task = worker_set->workers[i].task_transferring;
|
|
|
|
+ /* We send all buffers to execute the task */
|
|
|
|
+ if (task != NULL && worker_set->workers[i].nb_buffers_transferred == worker_set->workers[i].nb_buffers_totransfer)
|
|
|
|
+ {
|
|
|
|
+ struct _starpu_job * j = _starpu_get_job_associated_to_task(task);
|
|
|
|
|
|
- /* Reset it */
|
|
|
|
- worker_set->workers[i].task_transferring = NULL;
|
|
|
|
- }
|
|
|
|
|
|
+ _starpu_set_local_worker_key(&worker_set->workers[i]);
|
|
|
|
+ _starpu_release_fetch_task_input_async(j, worker_set->workers[i].workerid, worker_set->workers[i].nb_buffers_totransfer);
|
|
|
|
+
|
|
|
|
+ /* Execute the task */
|
|
|
|
+ res = _starpu_src_common_execute(j, &worker_set->workers[i], mp_node);
|
|
|
|
+ switch (res)
|
|
|
|
+ {
|
|
|
|
+ case 0:
|
|
|
|
+ /* The task task has been launched with no error */
|
|
|
|
+ break;
|
|
|
|
+ case -EAGAIN:
|
|
|
|
+ _STARPU_DISP("ouch, this MP worker could not actually run task %p, putting it back...\n", tasks[i]);
|
|
|
|
+ _starpu_push_task_to_workers(worker_set->workers[i].task_transferring);
|
|
|
|
+ STARPU_ABORT();
|
|
|
|
+ continue;
|
|
|
|
+ break;
|
|
|
|
+ default:
|
|
|
|
+ STARPU_ASSERT(0);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /* Reset it */
|
|
|
|
+ worker_set->workers[i].task_transferring = NULL;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
|
|
_STARPU_TRACE_START_PROGRESS(memnode);
|
|
_STARPU_TRACE_START_PROGRESS(memnode);
|
|
res |= __starpu_datawizard_progress(1, 1);
|
|
res |= __starpu_datawizard_progress(1, 1);
|
|
@@ -1008,21 +1010,21 @@ static void _starpu_src_common_worker_internal_work(struct _starpu_worker_set *
|
|
#endif
|
|
#endif
|
|
|
|
|
|
/*if at least one worker have pop a task*/
|
|
/*if at least one worker have pop a task*/
|
|
- if(res != 0)
|
|
|
|
- {
|
|
|
|
- unsigned i;
|
|
|
|
- for(i=0; i<worker_set->nworkers; i++)
|
|
|
|
- {
|
|
|
|
- if(tasks[i] != NULL)
|
|
|
|
- {
|
|
|
|
- _starpu_set_local_worker_key(&worker_set->workers[i]);
|
|
|
|
- _starpu_fetch_task_input(tasks[i], _starpu_get_job_associated_to_task(tasks[i]), 1);
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- /* Handle message which have been store */
|
|
|
|
- _starpu_src_common_handle_stored_async(mp_node);
|
|
|
|
|
|
+ if(res != 0)
|
|
|
|
+ {
|
|
|
|
+ unsigned i;
|
|
|
|
+ for(i=0; i<worker_set->nworkers; i++)
|
|
|
|
+ {
|
|
|
|
+ if(tasks[i] != NULL)
|
|
|
|
+ {
|
|
|
|
+ _starpu_set_local_worker_key(&worker_set->workers[i]);
|
|
|
|
+ _starpu_fetch_task_input(tasks[i], _starpu_get_job_associated_to_task(tasks[i]), 1);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
|
|
|
|
+ /* Handle message which have been store */
|
|
|
|
+ _starpu_src_common_handle_stored_async(mp_node);
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|