瀏覽代碼

simplify unpartitioning data, and make it actually wait for completion of all transfers

Samuel Thibault 10 年之前
父節點
當前提交
69e9aa32cb
共有 1 個文件被更改,包括 24 次插入12 次删除
  1. 24 12
      src/datawizard/filters.c

+ 24 - 12
src/datawizard/filters.c

@@ -299,8 +299,6 @@ void starpu_data_unpartition(starpu_data_handle_t root_handle, unsigned gatherin
 		if (child_handle->nchildren > 0)
 		if (child_handle->nchildren > 0)
 			starpu_data_unpartition(child_handle, gathering_node);
 			starpu_data_unpartition(child_handle, gathering_node);
 
 
-		sizes[child] = _starpu_data_get_size(child_handle);
-
 		/* If this is a multiformat handle, we must convert the data now */
 		/* If this is a multiformat handle, we must convert the data now */
 #ifdef STARPU_DEVEL
 #ifdef STARPU_DEVEL
 #warning TODO: _starpu_fetch_data_on_node should be doing it
 #warning TODO: _starpu_fetch_data_on_node should be doing it
@@ -330,21 +328,35 @@ void starpu_data_unpartition(starpu_data_handle_t root_handle, unsigned gatherin
 		 * data should be possible from the node that does the unpartionning ... we
 		 * data should be possible from the node that does the unpartionning ... we
 		 * don't want to have the programming deal with memory shortage at that time,
 		 * don't want to have the programming deal with memory shortage at that time,
 		 * really */
 		 * really */
-		if (child_handle->current_mode == STARPU_REDUX)
-		{
-			/* Acquire the child data on the gathering node. This will trigger collapsing the reduction */
-			ret = starpu_data_acquire_on_node(child_handle, gathering_node, STARPU_RW);
-			_starpu_unlock_post_sync_tasks(child_handle);
-		}
-		else
+		/* Acquire the child data on the gathering node. This will trigger collapsing any reduction */
+		ret = starpu_data_acquire_on_node(child_handle, gathering_node, STARPU_RW);
+		STARPU_ASSERT(ret == 0);
+		starpu_data_release_on_node(child_handle, gathering_node);
+
+		_starpu_spin_lock(&child_handle->header_lock);
+		child_handle->busy_waiting = 1;
+		_starpu_spin_unlock(&child_handle->header_lock);
+
+		/* Wait for all requests to finish (notably WT requests) */
+		STARPU_PTHREAD_MUTEX_LOCK(&child_handle->busy_mutex);
+		while (1)
 		{
 		{
-			/* Simply transfer any pending data */
-			ret = _starpu_fetch_data_on_node(child_handle, &child_handle->per_node[gathering_node], STARPU_R, 0, 0, 0, NULL, NULL);
+			/* Here helgrind would shout that this an unprotected access,
+			 * but this is actually fine: all threads who do busy_count--
+			 * are supposed to call _starpu_data_check_not_busy, which will
+			 * wake us up through the busy_mutex/busy_cond. */
+			if (!child_handle->busy_count)
+				break;
+			/* This is woken by _starpu_data_check_not_busy, always called
+			 * after decrementing busy_count */
+			STARPU_PTHREAD_COND_WAIT(&child_handle->busy_cond, &child_handle->busy_mutex);
 		}
 		}
-		STARPU_ASSERT(ret == 0);
+		STARPU_PTHREAD_MUTEX_UNLOCK(&child_handle->busy_mutex);
 
 
 		_starpu_spin_lock(&child_handle->header_lock);
 		_starpu_spin_lock(&child_handle->header_lock);
 
 
+		sizes[child] = _starpu_data_get_size(child_handle);
+
 		_starpu_data_unregister_ram_pointer(child_handle);
 		_starpu_data_unregister_ram_pointer(child_handle);
 
 
 		for (worker = 0; worker < nworkers; worker++)
 		for (worker = 0; worker < nworkers; worker++)