|
|
@@ -43,6 +43,11 @@ struct starpu_save_thread_env
|
|
|
struct starpu_save_thread_env save_thread_env[STARPU_MAXMPIDEVS];
|
|
|
#endif
|
|
|
|
|
|
+static unsigned mp_node_memory_node(struct _starpu_mp_node *node)
|
|
|
+{
|
|
|
+ return starpu_worker_get_memory_node(node->baseworkerid);
|
|
|
+}
|
|
|
+
|
|
|
/* Finalize the execution of a task by a worker*/
|
|
|
static int _starpu_src_common_finalize_job (struct _starpu_job *j, struct _starpu_worker *worker)
|
|
|
{
|
|
|
@@ -182,7 +187,7 @@ static void _starpu_src_common_handle_stored_async(struct _starpu_mp_node *node)
|
|
|
struct mp_message * message = mp_message_list_pop_back(&node->message_queue);
|
|
|
/* Release mutex during handle */
|
|
|
stopped_progress = 1;
|
|
|
- _STARPU_TRACE_END_PROGRESS(memnode);
|
|
|
+ _STARPU_TRACE_END_PROGRESS(mp_node_memory_node(node));
|
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(&node->message_queue_mutex);
|
|
|
_starpu_src_common_handle_async(node, message->buffer,
|
|
|
message->size, message->type, 1);
|
|
|
@@ -193,7 +198,7 @@ static void _starpu_src_common_handle_stored_async(struct _starpu_mp_node *node)
|
|
|
}
|
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(&node->message_queue_mutex);
|
|
|
if (stopped_progress)
|
|
|
- _STARPU_TRACE_START_PROGRESS(memnode);
|
|
|
+ _STARPU_TRACE_START_PROGRESS(mp_node_memory_node(node));
|
|
|
}
|
|
|
|
|
|
/* Store a message if is asynchronous
|
|
|
@@ -933,6 +938,7 @@ static void _starpu_src_common_send_workers(struct _starpu_mp_node * node, int b
|
|
|
node->dt_send(node, &config->combined_workers,combined_worker_size, NULL);
|
|
|
|
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(&node->connection_mutex);
|
|
|
+
|
|
|
}
|
|
|
|
|
|
static void _starpu_src_common_worker_internal_work(struct _starpu_worker_set * worker_set, struct _starpu_mp_node * mp_node, struct starpu_task **tasks, unsigned memnode)
|
|
|
@@ -995,13 +1001,13 @@ static void _starpu_src_common_worker_internal_work(struct _starpu_worker_set *
|
|
|
while(mp_node->mp_recv_is_ready(mp_node))
|
|
|
{
|
|
|
stopped_progress = 1;
|
|
|
- _STARPU_TRACE_END_PROGRESS(memnode);
|
|
|
+ _STARPU_TRACE_END_PROGRESS(mp_node_memory_node(mp_node));
|
|
|
_starpu_src_common_recv_async(mp_node);
|
|
|
/* Mutex is unlock in _starpu_src_common_recv_async */
|
|
|
STARPU_PTHREAD_MUTEX_LOCK(&mp_node->connection_mutex);
|
|
|
}
|
|
|
if (stopped_progress)
|
|
|
- _STARPU_TRACE_START_PROGRESS(memnode);
|
|
|
+ _STARPU_TRACE_START_PROGRESS(mp_node_memory_node(mp_node));
|
|
|
|
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(&mp_node->connection_mutex);
|
|
|
|
|
|
@@ -1016,16 +1022,16 @@ static void _starpu_src_common_worker_internal_work(struct _starpu_worker_set *
|
|
|
/*if at least one worker have pop a task*/
|
|
|
if(res != 0)
|
|
|
{
|
|
|
- unsigned i;
|
|
|
for(i=0; i<worker_set->nworkers; i++)
|
|
|
{
|
|
|
if(tasks[i] != NULL)
|
|
|
{
|
|
|
- _STARPU_TRACE_END_PROGRESS(memnode);
|
|
|
- _starpu_set_local_worker_key(&worker_set->workers[i]);
|
|
|
+ struct _starpu_worker *worker = &worker_set->workers[i];
|
|
|
+ _STARPU_TRACE_END_PROGRESS(worker->memory_node);
|
|
|
+ _starpu_set_local_worker_key(worker);
|
|
|
int ret = _starpu_fetch_task_input(tasks[i], _starpu_get_job_associated_to_task(tasks[i]), 1);
|
|
|
STARPU_ASSERT(!ret);
|
|
|
- _STARPU_TRACE_START_PROGRESS(memnode);
|
|
|
+ _STARPU_TRACE_START_PROGRESS(worker->memory_node);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -1065,9 +1071,9 @@ void _starpu_src_common_workers_set(struct _starpu_worker_set * worker_set,
|
|
|
struct _starpu_machine_config *config = baseworker->config;
|
|
|
unsigned baseworkerid = baseworker - config->workers;
|
|
|
_starpu_src_common_send_workers(mp_node[device], baseworkerid, worker_set[device].nworkers);
|
|
|
+ _STARPU_TRACE_START_PROGRESS(memnode[device]);
|
|
|
}
|
|
|
|
|
|
- _STARPU_TRACE_START_PROGRESS(memnode);
|
|
|
/*main loop*/
|
|
|
while (_starpu_machine_is_running())
|
|
|
{
|
|
|
@@ -1078,10 +1084,12 @@ void _starpu_src_common_workers_set(struct _starpu_worker_set * worker_set,
|
|
|
}
|
|
|
}
|
|
|
free(tasks);
|
|
|
- _STARPU_TRACE_END_PROGRESS(memnode);
|
|
|
|
|
|
for (device = 0; device < ndevices; device++)
|
|
|
+ {
|
|
|
+ _STARPU_TRACE_END_PROGRESS(memnode[device]);
|
|
|
_starpu_handle_all_pending_node_data_requests(memnode[device]);
|
|
|
+ }
|
|
|
|
|
|
/* In case there remains some memory that was automatically
|
|
|
* allocated by StarPU, we release it now. Note that data
|