Prechádzať zdrojové kódy

fix bug when sending the task

Corentin Salingue pred 8 rokmi
rodič
commit
5146c1c84e
1 zmenený súbor: 13 pridaní a 3 odobrania
  1. 13 3
      src/drivers/mp_common/source_common.c

+ 13 - 3
src/drivers/mp_common/source_common.c

@@ -942,7 +942,8 @@ static void _starpu_src_common_send_data_callback(void *arg)
    struct _starpu_worker * worker = (struct _starpu_worker *) arg;
 
    /* increase the number of buffers sent */
-   worker->nb_buffers_sent++;
+   STARPU_WMB();
+   (void)STARPU_ATOMIC_ADD(&worker->nb_buffers_sent, 1);
 }
 
 
@@ -963,12 +964,21 @@ static void _starpu_src_common_worker_internal_work(struct _starpu_worker_set *
         /* We send all buffers to execute the task */
         if (worker_set->workers[i].task_sending != NULL && worker_set->workers[i].nb_buffers_sent == STARPU_TASK_GET_NBUFFERS(worker_set->workers[i].task_sending))
         {
+            STARPU_RMB();
             unsigned nbuffers = STARPU_TASK_GET_NBUFFERS(worker_set->workers[i].task_sending);
             unsigned buf;
             for (buf = 0; buf < nbuffers; buf++)
             {
                 starpu_data_handle_t handle = STARPU_TASK_GET_HANDLE(worker_set->workers[i].task_sending, buf);
-                _starpu_release_data_on_node(handle, 0, &handle->per_node[memnode]);
+                struct _starpu_data_replicate *replicate = &handle->per_node[memnode];
+                /* Release our refcnt */
+                _starpu_spin_lock(&handle->header_lock);
+                replicate->refcnt--;
+                STARPU_ASSERT(replicate->refcnt >= 0);
+                STARPU_ASSERT(handle->busy_count > 0);
+                handle->busy_count--;
+                if (!_starpu_data_check_not_busy(handle))
+                    _starpu_spin_unlock(&handle->header_lock);
             }
 
             /* Execute the task */
@@ -1039,7 +1049,7 @@ static void _starpu_src_common_worker_internal_work(struct _starpu_worker_set *
                     enum starpu_data_access_mode mode = STARPU_TASK_GET_MODE(tasks[i], buf);
                     int workerid = starpu_worker_get_id_check();
                     struct _starpu_data_replicate *local_replicate = get_replicate(handle, mode, workerid, memnode);
-                    
+
                     int ret = _starpu_fetch_data_on_node(handle, memnode, local_replicate, mode, 0, 0, 1,
                                      _starpu_src_common_send_data_callback, &worker_set->workers[i], 0, "_starpu_src_common_worker_internal_work");
                     STARPU_ASSERT(!ret);