소스 검색

make transfers asynchronous for MPI and MIC workers

Corentin Salingue 8 년 전
부모
커밋
18812205bf
5개의 변경된 파일68개의 추가작업 그리고 17개의 파일을 삭제
  1. 2 0
      src/core/workers.h
  2. 1 1
      src/datawizard/coherency.c
  3. 1 0
      src/datawizard/coherency.h
  4. 5 0
      src/drivers/driver_common/driver_common.c
  5. 59 16
      src/drivers/mp_common/source_common.c

+ 2 - 0
src/core/workers.h

@@ -115,6 +115,8 @@ LIST_TYPE(_starpu_worker,
 
 	unsigned spinning_backoff ; /* number of cycles to pause when spinning  */
 
+    unsigned nb_buffers_sent; /* number of piece of data already send to remote side */
+    struct starpu_task *task_sending; /* The buffers of this task are being sent */
 
 	/* indicate whether the workers shares tasks lists with other workers*/
 	/* in this case when removing him from a context it disapears instantly */

+ 1 - 1
src/datawizard/coherency.c

@@ -913,7 +913,7 @@ int starpu_idle_prefetch_task_input_on_node(struct starpu_task *task, unsigned n
 	return starpu_idle_prefetch_task_input_on_node_prio(task, node, prio);
 }
 
-static struct _starpu_data_replicate *get_replicate(starpu_data_handle_t handle, enum starpu_data_access_mode mode, int workerid, unsigned node)
+struct _starpu_data_replicate *get_replicate(starpu_data_handle_t handle, enum starpu_data_access_mode mode, int workerid, unsigned node)
 {
 	if (mode & (STARPU_SCRATCH|STARPU_REDUX))
 	{

+ 1 - 0
src/datawizard/coherency.h

@@ -326,5 +326,6 @@ void _starpu_data_end_reduction_mode(starpu_data_handle_t handle);
 void _starpu_data_end_reduction_mode_terminate(starpu_data_handle_t handle);
 
 void _starpu_data_set_unregister_hook(starpu_data_handle_t handle, _starpu_data_handle_unregister_hook func);
+struct _starpu_data_replicate *get_replicate(starpu_data_handle_t handle, enum starpu_data_access_mode mode, int workerid, unsigned node);
 
 #endif // __COHERENCY__H__

+ 5 - 0
src/drivers/driver_common/driver_common.c

@@ -503,6 +503,11 @@ int _starpu_get_multi_worker_task(struct _starpu_worker *workers, struct starpu_
 		{
 			tasks[i] = NULL;
 		}
+        /* don't push a task if we are already pushing one */
+        else if (workers[i].task_sending != NULL)
+        {
+            tasks[i] = NULL;
+        }
 		/*else try to pop a task*/
 		else
 		{

+ 59 - 16
src/drivers/mp_common/source_common.c

@@ -936,6 +936,15 @@ static void _starpu_src_common_send_workers(struct _starpu_mp_node * node, int b
     STARPU_PTHREAD_MUTEX_UNLOCK(&node->connection_mutex);
 }
 
+/* Callback used when a buffer is send asynchronously to the sink */
+static void _starpu_src_common_send_data_callback(void *arg)
+{
+   struct _starpu_worker * worker = (struct _starpu_worker *) arg;
+
+   /* increase the number of buffer received */
+   worker->task_sending++;
+}
+
 
 static void _starpu_src_common_worker_internal_work(struct _starpu_worker_set * worker_set, struct _starpu_mp_node * mp_node, struct starpu_task **tasks, unsigned memnode)
 {
@@ -947,6 +956,44 @@ static void _starpu_src_common_worker_internal_work(struct _starpu_worker_set *
     starpu_pthread_wait_reset(&worker_set->workers[0].wait);
 #endif
 
+
+    /* Test if async transfers are completed */
+    for (unsigned i = 0; i < worker_set->nworkers; i++)
+        /* We send all buffers to execute the task */
+        if (worker_set->workers[i].task_sending != NULL && worker_set->workers[i].nb_buffers_sent == STARPU_TASK_GET_NBUFFERS(worker_set->workers[i].task_sending))
+        {
+            unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(worker_set->workers[i].task_sending);
+            unsigned buf;
+            for (buf = 0; buf < nbuffers; buf++)
+            {
+                starpu_data_handle_t handle = STARPU_TASK_GET_HANDLE(worker_set->workers[i].task_sending, buf);
+                _starpu_release_data_on_node(handle, 0, &handle->per_node[memnode]);
+            }
+
+            /* Execute the task */
+            struct _starpu_job * j = _starpu_get_job_associated_to_task(worker_set->workers[i].task_sending);
+            _starpu_set_local_worker_key(&worker_set->workers[i]);
+            res =  _starpu_src_common_execute(j, &worker_set->workers[i], mp_node);
+            switch (res)
+            {
+                case 0:
+                    /* The task task has been launched with no error */
+                    break;
+                case -EAGAIN:
+                    _STARPU_DISP("ouch, this MP worker could not actually run task %p, putting it back...\n", tasks[i]);
+                    _starpu_push_task_to_workers(worker_set->workers[i].task_sending);
+                    STARPU_ABORT();
+                    continue;
+                    break;
+                default:
+                    STARPU_ASSERT(0);
+            }
+
+            /* Reset it */
+            worker_set->workers[i].task_sending = NULL;
+            worker_set->workers[i].nb_buffers_sent = 0;
+        }
+
     _STARPU_TRACE_START_PROGRESS(memnode);
     res |= __starpu_datawizard_progress(1, 1);
     _STARPU_TRACE_END_PROGRESS(memnode);
@@ -977,27 +1024,23 @@ static void _starpu_src_common_worker_internal_work(struct _starpu_worker_set *
     /*if at least one worker have pop a task*/
     if(res != 0)
     {
-        unsigned i;
+        unsigned i, buf;
         for(i=0; i<worker_set->nworkers; i++)
         {
             if(tasks[i] != NULL)
             {
-                struct _starpu_job * j = _starpu_get_job_associated_to_task(tasks[i]);
-                _starpu_set_local_worker_key(&worker_set->workers[i]);
-                res =  _starpu_src_common_execute(j, &worker_set->workers[i], mp_node);
-                switch (res)
+                unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(tasks[i]);
+
+                for (buf = 0; buf < nbuffers; buf++)
                 {
-                    case 0:
-                        /* The task task has been launched with no error */
-                        break;
-                    case -EAGAIN:
-                        _STARPU_DISP("ouch, this MP worker could not actually run task %p, putting it back...\n", tasks[i]);
-                        _starpu_push_task_to_workers(tasks[i]);
-                        STARPU_ABORT();
-                        continue;
-                        break;
-                    default:
-                        STARPU_ASSERT(0);
+                    starpu_data_handle_t handle = STARPU_TASK_GET_HANDLE(tasks[i], buf);
+                    enum starpu_data_access_mode mode = STARPU_TASK_GET_MODE(tasks[i], buf);
+                    int workerid = starpu_worker_get_id_check();
+                    struct _starpu_data_replicate *local_replicate = get_replicate(handle, mode, workerid, memnode);
+                    
+                    int ret = _starpu_fetch_data_on_node(handle, memnode, local_replicate, mode, 0, 0, 1,
+                                     _starpu_src_common_send_data_callback, &worker_set->workers[i], 0, "_starpu_src_common_worker_internal_work");
+                    STARPU_ASSERT(!ret);
                 }
             }
         }