|
@@ -51,7 +51,6 @@ static enum _starpu_mp_node_kind _starpu_sink_common_get_kind(void)
|
|
|
return STARPU_INVALID_KIND;
|
|
|
}
|
|
|
|
|
|
-
|
|
|
/* Send to host the number of cores of the sink device
|
|
|
*/
|
|
|
static void _starpu_sink_common_get_nb_cores (struct _starpu_mp_node *node)
|
|
@@ -61,7 +60,6 @@ static void _starpu_sink_common_get_nb_cores (struct _starpu_mp_node *node)
|
|
|
&node->nb_cores, sizeof (int));
|
|
|
}
|
|
|
|
|
|
-
|
|
|
/* Send to host the address of the function given in parameter
|
|
|
*/
|
|
|
static void _starpu_sink_common_lookup(const struct _starpu_mp_node *node,
|
|
@@ -69,7 +67,7 @@ static void _starpu_sink_common_lookup(const struct _starpu_mp_node *node,
|
|
|
{
|
|
|
void (*func)(void);
|
|
|
func = node->lookup(node,func_name);
|
|
|
-
|
|
|
+
|
|
|
//_STARPU_DEBUG("Looked up %s, got %p\n", func_name, func);
|
|
|
|
|
|
/* If we couldn't find the function, let's send an error to the host.
|
|
@@ -82,7 +80,6 @@ static void _starpu_sink_common_lookup(const struct _starpu_mp_node *node,
|
|
|
NULL, 0);
|
|
|
}
|
|
|
|
|
|
-
|
|
|
/* Allocate a memory space and send the address of this space to the host
|
|
|
*/
|
|
|
void _starpu_sink_common_allocate(const struct _starpu_mp_node *mp_node,
|
|
@@ -161,8 +158,8 @@ static void _starpu_sink_common_recv_workers(struct _starpu_mp_node * node, void
|
|
|
STARPU_ASSERT(arg_size == (sizeof(int)*5));
|
|
|
void * arg_ptr = arg;
|
|
|
int i;
|
|
|
-
|
|
|
- int nworkers = *(int *)arg_ptr;
|
|
|
+
|
|
|
+ int nworkers = *(int *)arg_ptr;
|
|
|
arg_ptr += sizeof(nworkers);
|
|
|
|
|
|
int worker_size = *(int *)arg_ptr;
|
|
@@ -170,7 +167,7 @@ static void _starpu_sink_common_recv_workers(struct _starpu_mp_node * node, void
|
|
|
|
|
|
int combined_worker_size = *(int *)arg_ptr;
|
|
|
arg_ptr += sizeof(combined_worker_size);
|
|
|
-
|
|
|
+
|
|
|
int baseworkerid = *(int *)arg_ptr;
|
|
|
arg_ptr += sizeof(baseworkerid);
|
|
|
|
|
@@ -181,7 +178,7 @@ static void _starpu_sink_common_recv_workers(struct _starpu_mp_node * node, void
|
|
|
/* Retrieve workers */
|
|
|
struct _starpu_worker * workers = &config->workers[baseworkerid];
|
|
|
node->dt_recv(node,workers,worker_size);
|
|
|
-
|
|
|
+
|
|
|
/* Update workers to have coherent field */
|
|
|
for(i=0; i<nworkers; i++)
|
|
|
{
|
|
@@ -204,24 +201,14 @@ static void _starpu_sink_common_recv_workers(struct _starpu_mp_node * node, void
|
|
|
workers[i].current_task = NULL;
|
|
|
workers[i].set = NULL;
|
|
|
workers[i].terminated_jobs = NULL;
|
|
|
-
|
|
|
- //_starpu_barrier_counter_init(&workers[i].tasks_barrier, 1);
|
|
|
- //_starpu_barrier_counter_destroy(&workers[i].tasks_barrier);
|
|
|
-
|
|
|
- starpu_pthread_mutex_init(&workers[i].parallel_sect_mutex,NULL);
|
|
|
- starpu_pthread_mutex_destroy(&workers[i].parallel_sect_mutex);
|
|
|
-
|
|
|
- starpu_pthread_cond_init(&workers[i].parallel_sect_cond,NULL);
|
|
|
- starpu_pthread_cond_destroy(&workers[i].parallel_sect_cond);
|
|
|
-
|
|
|
}
|
|
|
|
|
|
/* Retrieve combined workers */
|
|
|
- struct _starpu_combined_worker * combined_workers = config->combined_workers;
|
|
|
+ struct _starpu_combined_worker * combined_workers = config->combined_workers;
|
|
|
node->dt_recv(node, combined_workers, combined_worker_size);
|
|
|
|
|
|
node->baseworkerid = baseworkerid;
|
|
|
- STARPU_PTHREAD_BARRIER_WAIT(&node->init_completed_barrier);
|
|
|
+ STARPU_PTHREAD_BARRIER_WAIT(&node->init_completed_barrier);
|
|
|
}
|
|
|
|
|
|
|
|
@@ -311,7 +298,7 @@ void _starpu_sink_common_worker(void)
|
|
|
struct mp_message * message = mp_message_list_pop_back(node->message_queue);
|
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(&node->message_queue_mutex);
|
|
|
//_STARPU_DEBUG("telling host that we have finished the task %p sur %d.\n", task->kernel, task->coreid);
|
|
|
- _starpu_mp_common_send_command(node, message->type,
|
|
|
+ _starpu_mp_common_send_command(node, message->type,
|
|
|
&message->buffer, message->size);
|
|
|
mp_message_delete(message);
|
|
|
}
|
|
@@ -328,7 +315,7 @@ void _starpu_sink_common_worker(void)
|
|
|
}
|
|
|
|
|
|
|
|
|
-/* Search for the mp_barrier correspondind to the specified combined worker
|
|
|
+/* Search for the mp_barrier correspondind to the specified combined worker
|
|
|
* and create it if it doesn't exist
|
|
|
*/
|
|
|
static struct mp_barrier * _starpu_sink_common_get_barrier(struct _starpu_mp_node * node, int cb_workerid, int cb_workersize)
|
|
@@ -336,8 +323,8 @@ static struct mp_barrier * _starpu_sink_common_get_barrier(struct _starpu_mp_nod
|
|
|
struct mp_barrier * b = NULL;
|
|
|
STARPU_PTHREAD_MUTEX_LOCK(&node->barrier_mutex);
|
|
|
/* Search if the barrier already exist */
|
|
|
- for(b = mp_barrier_list_begin(node->barrier_list);
|
|
|
- b != mp_barrier_list_end(node->barrier_list) && b->id != cb_workerid;
|
|
|
+ for(b = mp_barrier_list_begin(node->barrier_list);
|
|
|
+ b != mp_barrier_list_end(node->barrier_list) && b->id != cb_workerid;
|
|
|
b = mp_barrier_list_next(b));
|
|
|
|
|
|
/* If we found the barrier */
|
|
@@ -390,7 +377,7 @@ static void _starpu_sink_common_pre_execution_message(struct _starpu_mp_node *no
|
|
|
message->size = sizeof(task->combined_workerid);
|
|
|
|
|
|
|
|
|
- /* Append the message to the queue */
|
|
|
+ /* Append the message to the queue */
|
|
|
_starpu_sink_common_append_message(node, message);
|
|
|
|
|
|
}
|
|
@@ -422,7 +409,7 @@ static void _starpu_sink_common_bind_to_combined_worker(struct _starpu_mp_node *
|
|
|
|
|
|
|
|
|
|
|
|
-/* Get the current rank of the worker in the combined worker
|
|
|
+/* Get the current rank of the worker in the combined worker
|
|
|
*/
|
|
|
static int _starpu_sink_common_get_current_rank(int workerid, struct _starpu_combined_worker * combined_worker)
|
|
|
{
|
|
@@ -434,7 +421,7 @@ static int _starpu_sink_common_get_current_rank(int workerid, struct _starpu_com
|
|
|
STARPU_ASSERT(0);
|
|
|
}
|
|
|
|
|
|
-/* Execute the task
|
|
|
+/* Execute the task
|
|
|
*/
|
|
|
static void _starpu_sink_common_execute_kernel(struct _starpu_mp_node *node, int coreid, struct _starpu_worker * worker)
|
|
|
{
|
|
@@ -446,11 +433,11 @@ static void _starpu_sink_common_execute_kernel(struct _starpu_mp_node *node, int
|
|
|
if(task->is_parallel_task)
|
|
|
{
|
|
|
combined_worker = _starpu_get_combined_worker_struct(task->combined_workerid);
|
|
|
-
|
|
|
+
|
|
|
worker->current_rank = _starpu_sink_common_get_current_rank(worker->workerid, combined_worker);
|
|
|
worker->combined_workerid = task->combined_workerid;
|
|
|
worker->worker_size = combined_worker->worker_size;
|
|
|
-
|
|
|
+
|
|
|
/* Synchronize with others threads of the combined worker*/
|
|
|
STARPU_PTHREAD_BARRIER_WAIT(&task->mp_barrier->before_work_barrier);
|
|
|
|
|
@@ -461,8 +448,8 @@ static void _starpu_sink_common_execute_kernel(struct _starpu_mp_node *node, int
|
|
|
/* tell the sink that the execution has begun */
|
|
|
_starpu_sink_common_pre_execution_message(node,task);
|
|
|
|
|
|
- /* If the mode is FORKJOIN,
|
|
|
- * the first thread binds himself
|
|
|
+ /* If the mode is FORKJOIN,
|
|
|
+ * the first thread binds himself
|
|
|
* on all core of the combined worker*/
|
|
|
if(task->type == STARPU_FORKJOIN)
|
|
|
{
|
|
@@ -497,7 +484,7 @@ static void _starpu_sink_common_execute_kernel(struct _starpu_mp_node *node, int
|
|
|
/* Erase the barrier from the list */
|
|
|
_starpu_sink_common_erase_barrier(node,task->mp_barrier);
|
|
|
|
|
|
- /* If the mode is FORKJOIN,
|
|
|
+ /* If the mode is FORKJOIN,
|
|
|
* the first thread rebinds himself on his own core */
|
|
|
if(task->type == STARPU_FORKJOIN)
|
|
|
node->bind_thread(node, coreid, &coreid, 1);
|
|
@@ -519,7 +506,7 @@ static void _starpu_sink_common_execute_kernel(struct _starpu_mp_node *node, int
|
|
|
}
|
|
|
|
|
|
|
|
|
-/* The main function executed by the thread
|
|
|
+/* The main function executed by the thread
|
|
|
* thread_arg is a structure containing the information needed by the thread
|
|
|
*/
|
|
|
void* _starpu_sink_thread(void * thread_arg)
|
|
@@ -530,7 +517,7 @@ void* _starpu_sink_thread(void * thread_arg)
|
|
|
/* free the structure */
|
|
|
free(thread_arg);
|
|
|
|
|
|
- STARPU_PTHREAD_BARRIER_WAIT(&node->init_completed_barrier);
|
|
|
+ STARPU_PTHREAD_BARRIER_WAIT(&node->init_completed_barrier);
|
|
|
|
|
|
struct _starpu_worker *worker = &_starpu_get_machine_config()->workers[node->baseworkerid + coreid];
|
|
|
|
|
@@ -603,8 +590,8 @@ void _starpu_sink_common_execute(struct _starpu_mp_node *node,
|
|
|
* interfaces, thus we expect the same size anyway */
|
|
|
for (i = 0; i < task->nb_interfaces; i++)
|
|
|
{
|
|
|
- union _starpu_interface * interface = malloc(sizeof(union _starpu_interface));
|
|
|
- memcpy(interface, arg_ptr,
|
|
|
+ union _starpu_interface * interface = malloc(sizeof(union _starpu_interface));
|
|
|
+ memcpy(interface, arg_ptr,
|
|
|
sizeof(union _starpu_interface));
|
|
|
task->interfaces[i] = interface;
|
|
|
arg_ptr += sizeof(union _starpu_interface);
|
|
@@ -622,6 +609,6 @@ void _starpu_sink_common_execute(struct _starpu_mp_node *node,
|
|
|
NULL, 0);
|
|
|
|
|
|
//_STARPU_DEBUG("executing the task %p\n", task->kernel);
|
|
|
- _starpu_sink_common_execute_thread(node, task);
|
|
|
+ _starpu_sink_common_execute_thread(node, task);
|
|
|
|
|
|
}
|