|
@@ -51,8 +51,10 @@ static enum _starpu_mp_node_kind _starpu_sink_common_get_kind(void)
|
|
|
return STARPU_INVALID_KIND;
|
|
|
}
|
|
|
|
|
|
-void
|
|
|
-_starpu_sink_common_get_nb_cores (struct _starpu_mp_node *node)
|
|
|
+
|
|
|
+/* Send to host the number of cores of the sink device
|
|
|
+ */
|
|
|
+static void _starpu_sink_common_get_nb_cores (struct _starpu_mp_node *node)
|
|
|
{
|
|
|
// Process packet received from `_starpu_src_common_sink_cores'.
|
|
|
_starpu_mp_common_send_command (node, STARPU_ANSWER_SINK_NBCORES,
|
|
@@ -60,7 +62,8 @@ _starpu_sink_common_get_nb_cores (struct _starpu_mp_node *node)
|
|
|
}
|
|
|
|
|
|
|
|
|
-
|
|
|
+/* Send to host the address of the function given in parameter
|
|
|
+ */
|
|
|
static void _starpu_sink_common_lookup(const struct _starpu_mp_node *node,
|
|
|
char *func_name)
|
|
|
{
|
|
@@ -80,6 +83,9 @@ static void _starpu_sink_common_lookup(const struct _starpu_mp_node *node,
|
|
|
NULL, 0);
|
|
|
}
|
|
|
|
|
|
+
|
|
|
+/* Allocate a memory space and send the address of this space to the host
|
|
|
+ */
|
|
|
void _starpu_sink_common_allocate(const struct _starpu_mp_node *mp_node,
|
|
|
void *arg, int arg_size)
|
|
|
{
|
|
@@ -150,7 +156,6 @@ static void _starpu_sink_common_copy_to_sink(const struct _starpu_mp_node *mp_no
|
|
|
/* Function looping on the sink, waiting for tasks to execute.
|
|
|
* If the caller is the host, don't do anything.
|
|
|
*/
|
|
|
-
|
|
|
void _starpu_sink_common_worker(void)
|
|
|
{
|
|
|
struct _starpu_mp_node *node = NULL;
|
|
@@ -168,10 +173,9 @@ void _starpu_sink_common_worker(void)
|
|
|
/* Create and initialize the node */
|
|
|
node = _starpu_mp_common_node_create(node_kind, -1);
|
|
|
|
|
|
- sleep(1);
|
|
|
-
|
|
|
while (!exit_starpu)
|
|
|
{
|
|
|
+ /* If we have received a message */
|
|
|
if(node->mp_recv_is_ready(node))
|
|
|
{
|
|
|
|
|
@@ -219,9 +223,12 @@ void _starpu_sink_common_worker(void)
|
|
|
printf("Oops, command %x unrecognized\n", command);
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
pthread_mutex_lock(&node->message_queue_mutex);
|
|
|
+ /* If the list is not empty */
|
|
|
if(!mp_message_list_empty(node->message_queue))
|
|
|
{
|
|
|
+ /* We pop a message and send it to the host */
|
|
|
struct mp_message * message = mp_message_list_pop_back(node->message_queue);
|
|
|
pthread_mutex_unlock(&node->message_queue_mutex);
|
|
|
//_STARPU_DEBUG("telling host that we have finished the task %p sur %d.\n", task->kernel, task->coreid);
|
|
@@ -241,60 +248,83 @@ void _starpu_sink_common_worker(void)
|
|
|
exit(0);
|
|
|
}
|
|
|
|
|
|
-
|
|
|
+/* The main function executed by the thread
|
|
|
+ * thread_arg is a structure containing the information needed by the thread
|
|
|
+ */
|
|
|
void* _starpu_sink_thread(void * thread_arg)
|
|
|
{
|
|
|
+ /* Retrieve the information from the structure */
|
|
|
struct mp_task **task = ((struct arg_sink_thread *)thread_arg)->task;
|
|
|
struct _starpu_mp_node *node = ((struct arg_sink_thread *)thread_arg)->node;
|
|
|
pthread_mutex_t * mutex = ((struct arg_sink_thread *)thread_arg)->mutex;
|
|
|
int coreid =((struct arg_sink_thread *)thread_arg)->coreid;
|
|
|
+ /* free the structure */
|
|
|
free(thread_arg);
|
|
|
+
|
|
|
+
|
|
|
while(1)
|
|
|
{
|
|
|
+ /*Wait there is a task available */
|
|
|
pthread_mutex_lock(mutex);
|
|
|
+
|
|
|
+ /* If it's a parallel task */
|
|
|
if((*task)->is_parallel_task)
|
|
|
{
|
|
|
+ /* Synchronize with others threads of the combined worker*/
|
|
|
STARPU_PTHREAD_BARRIER_WAIT(&(*task)->mp_barrier->before_work_barrier);
|
|
|
+
|
|
|
+ /* The first thread of the combined worker
|
|
|
+ * tell the sink that the execution has begun
|
|
|
+ */
|
|
|
if((*task)->coreid == (*task)->combined_worker[0])
|
|
|
{
|
|
|
- //init message to tell the sink that the execution has begun
|
|
|
+ /* Init message to tell the sink that the execution has begun */
|
|
|
struct mp_message * message = mp_message_new();
|
|
|
message->type = STARPU_PRE_EXECUTION;
|
|
|
*(int *) message->buffer = (*task)->combined_workerid;
|
|
|
message->size = sizeof((*task)->combined_workerid);
|
|
|
|
|
|
- //append the message to the queue
|
|
|
+ /* Append the message to the queue */
|
|
|
pthread_mutex_lock(&node->message_queue_mutex);
|
|
|
mp_message_list_push_front(node->message_queue,message);
|
|
|
pthread_mutex_unlock(&node->message_queue_mutex);
|
|
|
|
|
|
+ /* If the mode is FORKJOIN,
|
|
|
+ * the first thread binds himself on all core of the combined worker
|
|
|
+ */
|
|
|
if((*task)->type == STARPU_FORKJOIN)
|
|
|
node->bind_thread(node, coreid, (*task)->combined_worker, (*task)->combined_worker_size);
|
|
|
}
|
|
|
}
|
|
|
if((*task)->type != STARPU_FORKJOIN || (*task)->coreid == (*task)->combined_worker[0])
|
|
|
{
|
|
|
- //execute the task
|
|
|
+ /* execute the task */
|
|
|
(*task)->kernel((*task)->interfaces,(*task)->cl_arg);
|
|
|
}
|
|
|
|
|
|
-
|
|
|
+ /* If it's a parallel task */
|
|
|
if((*task)->is_parallel_task)
|
|
|
{
|
|
|
+ /* Synchronize with others threads of the combined worker*/
|
|
|
STARPU_PTHREAD_BARRIER_WAIT(&(*task)->mp_barrier->after_work_barrier);
|
|
|
+
|
|
|
+ /* The fisrt thread of the combined */
|
|
|
if((*task)->coreid == (*task)->combined_worker[0])
|
|
|
{
|
|
|
- //erase the barrier from the list
|
|
|
+ /* Erase the barrier from the list */
|
|
|
pthread_mutex_lock(&node->barrier_mutex);
|
|
|
mp_barrier_list_erase(node->barrier_list,(*task)->mp_barrier);
|
|
|
pthread_mutex_unlock(&node->barrier_mutex);
|
|
|
-
|
|
|
+
|
|
|
+ /* If the mode is FORKJOIN,
|
|
|
+ * the first thread rebinds himself on his own core
|
|
|
+ */
|
|
|
if((*task)->type == STARPU_FORKJOIN)
|
|
|
node->bind_thread(node, coreid, &coreid, 1);
|
|
|
|
|
|
}
|
|
|
}
|
|
|
- //init message to tell the sink that the execution is completed
|
|
|
+ /* Init message to tell the sink that the execution is completed */
|
|
|
struct mp_message * message = mp_message_new();
|
|
|
message->type = STARPU_EXECUTION_COMPLETED;
|
|
|
message->size = sizeof((*task)->coreid);
|
|
@@ -303,7 +333,7 @@ void* _starpu_sink_thread(void * thread_arg)
|
|
|
free(*task);
|
|
|
(*task) = NULL;
|
|
|
|
|
|
- //append the message to the queue
|
|
|
+ /* Append the message to the queue */
|
|
|
pthread_mutex_lock(&node->message_queue_mutex);
|
|
|
mp_message_list_push_front(node->message_queue,message);
|
|
|
pthread_mutex_unlock(&node->message_queue_mutex);
|
|
@@ -312,38 +342,48 @@ void* _starpu_sink_thread(void * thread_arg)
|
|
|
pthread_exit(NULL);
|
|
|
}
|
|
|
|
|
|
+
|
|
|
+/* Add the task to the specific thread and wake him up
|
|
|
+ */
|
|
|
static void _starpu_sink_common_execute_thread(struct _starpu_mp_node *node, struct mp_task *task)
|
|
|
{
|
|
|
- //add the task to the spesific thread
|
|
|
+ /* Add the task to the specific thread */
|
|
|
node->run_table[task->coreid] = task;
|
|
|
- //unlock the mutex
|
|
|
+ /* Unlock the mutex to wake up the thread which will execute the task */
|
|
|
pthread_mutex_unlock(&node->mutex_run_table[task->coreid]);
|
|
|
}
|
|
|
|
|
|
-/**/
|
|
|
+/* Search for the mp_barrier correspondind to the specified combined worker
|
|
|
+ * and create it if it doesn't exist
|
|
|
+ */
|
|
|
struct mp_barrier * _starpu_sink_common_get_barrier(struct _starpu_mp_node * node, int cb_workerid, int cb_workersize)
|
|
|
{
|
|
|
struct mp_barrier * b = NULL;
|
|
|
pthread_mutex_lock(&node->barrier_mutex);
|
|
|
- if(!mp_barrier_list_empty(node->barrier_list))
|
|
|
+
|
|
|
+ /* Search if the barrier already exist */
|
|
|
+ for(b = mp_barrier_list_begin(node->barrier_list);
|
|
|
+ b != mp_barrier_list_end(node->barrier_list) && b->id != cb_workerid;
|
|
|
+ b = mp_barrier_list_next(b));
|
|
|
+
|
|
|
+ /* If we found the barrier */
|
|
|
+ if(b != NULL && b->id == cb_workerid)
|
|
|
+ {
|
|
|
+ pthread_mutex_unlock(&node->barrier_mutex);
|
|
|
+ return b;
|
|
|
+ }
|
|
|
+ else
|
|
|
{
|
|
|
- for(b = mp_barrier_list_begin(node->barrier_list);
|
|
|
- b != mp_barrier_list_end(node->barrier_list) && b->id != cb_workerid;
|
|
|
- b = mp_barrier_list_next(b));
|
|
|
|
|
|
- if(b != NULL && b->id == cb_workerid)
|
|
|
- {
|
|
|
- pthread_mutex_unlock(&node->barrier_mutex);
|
|
|
- return b;
|
|
|
- }
|
|
|
+ /* Else we create, initialize and add it to the list*/
|
|
|
+ b = mp_barrier_new();
|
|
|
+ b->id = cb_workerid;
|
|
|
+ STARPU_PTHREAD_BARRIER_INIT(&b->before_work_barrier,NULL,cb_workersize);
|
|
|
+ STARPU_PTHREAD_BARRIER_INIT(&b->after_work_barrier,NULL,cb_workersize);
|
|
|
+ mp_barrier_list_push_back(node->barrier_list,b);
|
|
|
+ pthread_mutex_unlock(&node->barrier_mutex);
|
|
|
+ return b;
|
|
|
}
|
|
|
- b = mp_barrier_new();
|
|
|
- b->id = cb_workerid;
|
|
|
- STARPU_PTHREAD_BARRIER_INIT(&b->before_work_barrier,NULL,cb_workersize);
|
|
|
- STARPU_PTHREAD_BARRIER_INIT(&b->after_work_barrier,NULL,cb_workersize);
|
|
|
- mp_barrier_list_push_back(node->barrier_list,b);
|
|
|
- pthread_mutex_unlock(&node->barrier_mutex);
|
|
|
- return b;
|
|
|
}
|
|
|
|
|
|
|