|
@@ -29,24 +29,24 @@
|
|
|
|
|
|
|
|
|
/* Finalize the execution of a task by a worker*/
|
|
|
-static int _starpu_src_common_finalize_job (struct _starpu_job *j, struct _starpu_worker *worker)
|
|
|
+static int _starpu_src_common_finalize_job (struct _starpu_job *j, struct _starpu_worker *worker)
|
|
|
{
|
|
|
int profiling = starpu_profiling_status_get();
|
|
|
struct timespec codelet_end;
|
|
|
_starpu_driver_end_job(worker, j, &worker->perf_arch, &codelet_end, 0,
|
|
|
profiling);
|
|
|
-
|
|
|
+
|
|
|
int count = worker->current_rank;
|
|
|
|
|
|
/* If it's a combined worker, we check whether it is the last one of its combined worker */
|
|
|
if(j->task_size > 1)
|
|
|
{
|
|
|
- struct _starpu_combined_worker * cb_worker = _starpu_get_combined_worker_struct(worker->combined_workerid);
|
|
|
+ struct _starpu_combined_worker * cb_worker = _starpu_get_combined_worker_struct(worker->combined_workerid);
|
|
|
|
|
|
STARPU_PTHREAD_MUTEX_LOCK(&cb_worker->count_mutex);
|
|
|
count = cb_worker->count--;
|
|
|
if(count == 0)
|
|
|
- cb_worker->count = cb_worker->worker_size - 1;
|
|
|
+ cb_worker->count = cb_worker->worker_size - 1;
|
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(&cb_worker->count_mutex);
|
|
|
}
|
|
|
|
|
@@ -71,8 +71,8 @@ static int _starpu_src_common_process_completed_job(struct _starpu_worker_set *w
|
|
|
{
|
|
|
int coreid;
|
|
|
|
|
|
- STARPU_ASSERT(sizeof(coreid) == arg_size);
|
|
|
-
|
|
|
+ STARPU_ASSERT(sizeof(coreid) == arg_size);
|
|
|
+
|
|
|
coreid = *(int *) arg;
|
|
|
|
|
|
struct _starpu_worker *worker = &workerset->workers[coreid];
|
|
@@ -100,19 +100,19 @@ static void _starpu_src_common_pre_exec(void * arg, int arg_size)
|
|
|
struct _starpu_worker * worker = _starpu_get_worker_struct(combined_worker->combined_workerid[i]);
|
|
|
_starpu_set_local_worker_key(worker);
|
|
|
_starpu_sched_pre_exec_hook(worker->current_task);
|
|
|
- }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/* Receive a message and handle it if it is an asynchronous message
|
|
|
* return 0 if the message has not been handled (which most likely means that it is a synchronous message)
|
|
|
* return 1 if the message has been handled
|
|
|
*/
|
|
|
-static int _starpu_src_common_handle_async(const struct _starpu_mp_node *node STARPU_ATTRIBUTE_UNUSED,
|
|
|
- void * arg, int arg_size,
|
|
|
+static int _starpu_src_common_handle_async(const struct _starpu_mp_node *node STARPU_ATTRIBUTE_UNUSED,
|
|
|
+ void * arg, int arg_size,
|
|
|
enum _starpu_mp_command answer)
|
|
|
{
|
|
|
- struct _starpu_worker_set * worker_set=NULL;
|
|
|
- switch(answer)
|
|
|
+ struct _starpu_worker_set * worker_set=NULL;
|
|
|
+ switch(answer)
|
|
|
{
|
|
|
case STARPU_EXECUTION_COMPLETED:
|
|
|
worker_set = _starpu_get_worker_struct(starpu_worker_get_id())->set;
|
|
@@ -137,17 +137,17 @@ static void _starpu_src_common_handle_stored_async(struct _starpu_mp_node *node)
|
|
|
{
|
|
|
/* We pop a message and handle it */
|
|
|
struct mp_message * message = mp_message_list_pop_back(node->message_queue);
|
|
|
- _starpu_src_common_handle_async(node, message->buffer,
|
|
|
+ _starpu_src_common_handle_async(node, message->buffer,
|
|
|
message->size, message->type);
|
|
|
mp_message_delete(message);
|
|
|
}
|
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(&node->message_queue_mutex);
|
|
|
}
|
|
|
|
|
|
-/* Store a message if is asynchronous
|
|
|
+/* Store a message if is asynchronous
|
|
|
* return 1 if the message has been stored
|
|
|
* return 0 if the message is unknown or synchronous */
|
|
|
-int _starpu_src_common_store_message(struct _starpu_mp_node *node,
|
|
|
+int _starpu_src_common_store_message(struct _starpu_mp_node *node,
|
|
|
void * arg, int arg_size, enum _starpu_mp_command answer)
|
|
|
{
|
|
|
struct mp_message * message = NULL;
|
|
@@ -157,8 +157,8 @@ int _starpu_src_common_store_message(struct _starpu_mp_node *node,
|
|
|
case STARPU_PRE_EXECUTION:
|
|
|
message = mp_message_new();
|
|
|
message->type = answer;
|
|
|
- memcpy(message->buffer, arg, arg_size);
|
|
|
- message->size = arg_size;
|
|
|
+ memcpy(message->buffer, arg, arg_size);
|
|
|
+ message->size = arg_size;
|
|
|
|
|
|
STARPU_PTHREAD_MUTEX_LOCK(&node->message_queue_mutex);
|
|
|
mp_message_list_push_front(node->message_queue,message);
|
|
@@ -172,7 +172,7 @@ int _starpu_src_common_store_message(struct _starpu_mp_node *node,
|
|
|
}
|
|
|
|
|
|
/* Store all asynchronous messages and return when a synchronous message is received */
|
|
|
-static enum _starpu_mp_command _starpu_src_common_wait_command_sync(struct _starpu_mp_node *node,
|
|
|
+static enum _starpu_mp_command _starpu_src_common_wait_command_sync(struct _starpu_mp_node *node,
|
|
|
void ** arg, int* arg_size)
|
|
|
{
|
|
|
enum _starpu_mp_command answer;
|
|
@@ -197,7 +197,7 @@ static void _starpu_src_common_recv_async(struct _starpu_mp_node * node)
|
|
|
{
|
|
|
printf("incorrect commande: unknown command or sync command");
|
|
|
STARPU_ASSERT(0);
|
|
|
- }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/* Handle all asynchronous messages until a completed execution message from a specific worker has been received */
|
|
@@ -205,7 +205,7 @@ static void _starpu_src_common_recv_async(struct _starpu_mp_node * node)
|
|
|
{
|
|
|
enum _starpu_mp_command answer;
|
|
|
|
|
|
- int completed = 0;
|
|
|
+ int completed = 0;
|
|
|
/* While the awaited completed execution message has not been received */
|
|
|
while(!completed)
|
|
|
{
|
|
@@ -214,7 +214,7 @@ static void _starpu_src_common_recv_async(struct _starpu_mp_node * node)
|
|
|
if(answer == STARPU_EXECUTION_COMPLETED)
|
|
|
{
|
|
|
int coreid;
|
|
|
- STARPU_ASSERT(sizeof(coreid) == *arg_size);
|
|
|
+ STARPU_ASSERT(sizeof(coreid) == *arg_size);
|
|
|
coreid = *(int *) *arg;
|
|
|
if(devid == coreid)
|
|
|
completed = 1;
|
|
@@ -275,7 +275,7 @@ int _starpu_src_common_lookup(struct _starpu_mp_node *node,
|
|
|
answer = _starpu_src_common_wait_command_sync(node, (void **) &arg,
|
|
|
&arg_size);
|
|
|
|
|
|
- if (answer == STARPU_ERROR_LOOKUP)
|
|
|
+ if (answer == STARPU_ERROR_LOOKUP)
|
|
|
{
|
|
|
_STARPU_DISP("Error looking up symbol %s\n", func_name);
|
|
|
return -ESPIPE;
|
|
@@ -319,11 +319,11 @@ int _starpu_src_common_execute_kernel(struct _starpu_mp_node *node,
|
|
|
|
|
|
buffer_size = sizeof(kernel) + sizeof(coreid) + sizeof(type)
|
|
|
+ sizeof(nb_interfaces) + nb_interfaces * sizeof(union _starpu_interface) + sizeof(is_parallel_task);
|
|
|
-
|
|
|
+
|
|
|
/*if the task is parallel*/
|
|
|
if(is_parallel_task)
|
|
|
{
|
|
|
- buffer_size += sizeof(cb_workerid);
|
|
|
+ buffer_size += sizeof(cb_workerid);
|
|
|
}
|
|
|
|
|
|
/* If the user didn't give any cl_arg, there is no need to send it */
|
|
@@ -332,7 +332,7 @@ int _starpu_src_common_execute_kernel(struct _starpu_mp_node *node,
|
|
|
STARPU_ASSERT(cl_arg_size);
|
|
|
buffer_size += cl_arg_size;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/* We give to send_command a buffer we just allocated, which contains
|
|
|
* a pointer to the function (sink-side), core on which execute this
|
|
|
* function (sink-side), number of interfaces we send,
|
|
@@ -395,8 +395,8 @@ int _starpu_src_common_execute_kernel(struct _starpu_mp_node *node,
|
|
|
|
|
|
|
|
|
/* Get the information and call the function to send to the sink a message to execute the task*/
|
|
|
-static int _starpu_src_common_execute(struct _starpu_job *j,
|
|
|
- struct _starpu_worker *worker,
|
|
|
+static int _starpu_src_common_execute(struct _starpu_job *j,
|
|
|
+ struct _starpu_worker *worker,
|
|
|
struct _starpu_mp_node * node)
|
|
|
{
|
|
|
int ret;
|
|
@@ -407,7 +407,7 @@ static int _starpu_src_common_execute(struct _starpu_job *j,
|
|
|
int profiling = starpu_profiling_status_get();
|
|
|
|
|
|
STARPU_ASSERT(task);
|
|
|
- if (worker->current_rank == 0)
|
|
|
+ if (worker->current_rank == 0)
|
|
|
{
|
|
|
ret = _starpu_fetch_task_input(j);
|
|
|
if (ret != 0)
|
|
@@ -534,7 +534,7 @@ int _starpu_src_common_copy_sink_to_sink(const struct _starpu_mp_node *src_node,
|
|
|
/* 5 functions to determine the executable to run on the device (MIC, SCC,
|
|
|
* MPI).
|
|
|
*/
|
|
|
-static void _starpu_src_common_cat_3(char *final, const char *first,
|
|
|
+static void _starpu_src_common_cat_3(char *final, const char *first,
|
|
|
const char *second, const char *third)
|
|
|
{
|
|
|
strcpy(final, first);
|
|
@@ -642,12 +642,12 @@ int _starpu_src_common_locate_file(char *located_file_name,
|
|
|
return 1;
|
|
|
}
|
|
|
|
|
|
-/* Send workers to the sink node
|
|
|
+/* Send workers to the sink node
|
|
|
*/
|
|
|
static void _starpu_src_common_send_workers(struct _starpu_mp_node * node, int baseworkerid, int nworkers)
|
|
|
-{
|
|
|
+{
|
|
|
struct _starpu_machine_config *config = _starpu_get_machine_config();
|
|
|
- int worker_size = sizeof(struct _starpu_worker)*nworkers;
|
|
|
+ int worker_size = sizeof(struct _starpu_worker)*nworkers;
|
|
|
int combined_worker_size = STARPU_NMAX_COMBINEDWORKERS*sizeof(struct _starpu_combined_worker);
|
|
|
int msg[5];
|
|
|
msg[0] = nworkers;
|
|
@@ -657,7 +657,7 @@ static void _starpu_src_common_send_workers(struct _starpu_mp_node * node, int b
|
|
|
msg[4] = starpu_worker_get_count();
|
|
|
|
|
|
/* tell the sink node that we will send it all workers */
|
|
|
- _starpu_mp_common_send_command(node, STARPU_SYNC_WORKERS,
|
|
|
+ _starpu_mp_common_send_command(node, STARPU_SYNC_WORKERS,
|
|
|
&msg, sizeof(msg));
|
|
|
|
|
|
/* Send all workers to the sink node */
|
|
@@ -665,13 +665,13 @@ static void _starpu_src_common_send_workers(struct _starpu_mp_node * node, int b
|
|
|
|
|
|
/* Send all combined workers to the sink node */
|
|
|
node->dt_send(node, &config->combined_workers,combined_worker_size);
|
|
|
-}
|
|
|
+}
|
|
|
|
|
|
/* Function looping on the source node */
|
|
|
-void _starpu_src_common_worker(struct _starpu_worker_set * worker_set,
|
|
|
- unsigned baseworkerid,
|
|
|
+void _starpu_src_common_worker(struct _starpu_worker_set * worker_set,
|
|
|
+ unsigned baseworkerid,
|
|
|
struct _starpu_mp_node * mp_node)
|
|
|
-{
|
|
|
+{
|
|
|
unsigned memnode = worker_set->workers[0].memory_node;
|
|
|
struct starpu_task **tasks = malloc(sizeof(struct starpu_task *)*worker_set->nworkers);
|
|
|
|