Sfoglia il codice sorgente

Automatically initialize handles on data acquisition when reduction methods are provided

Samuel Thibault 7 anni fa
parent
commit
fa02a5e3e7
2 ha cambiato i file con 39 aggiunte e 3 eliminazioni
  1. 26 1
      src/datawizard/user_interactions.c
  2. 13 2
      tests/datawizard/redux_acquire.c

+ 26 - 1
src/datawizard/user_interactions.c

@@ -24,6 +24,18 @@
 #include <core/dependencies/data_concurrency.h>
 #include <core/sched_policy.h>
 
+static void _starpu_data_check_initialized(starpu_data_handle_t handle, enum starpu_data_access_mode mode)
+{
+	if (!mode & STARPU_R)
+		return;
+
+	if (!handle->initialized && handle->init_cl) {
+		int ret = starpu_task_insert(handle->init_cl, STARPU_W, handle, 0);
+		STARPU_ASSERT(ret == 0);
+	}
+	STARPU_ASSERT_MSG(handle->initialized, "handle %p is not initialized while trying to read it\n", handle);
+}
+
 /* Explicitly ask StarPU to allocate room for a piece of data on the specified
  * memory node. */
 int starpu_data_request_allocation(starpu_data_handle_t handle, unsigned node)
@@ -182,6 +194,9 @@ int starpu_data_acquire_on_node_cb_sequential_consistency_sync_jobids(starpu_dat
 	STARPU_ASSERT_MSG(handle->nchildren == 0, "Acquiring a partitioned data (%p) is not possible", handle);
         _STARPU_LOG_IN();
 
+	/* Check that previous tasks have set a value if needed */
+	_starpu_data_check_initialized(handle, mode);
+
 	struct user_interaction_wrapper *wrapper;
 	_STARPU_MALLOC(wrapper, sizeof(struct user_interaction_wrapper));
 
@@ -274,10 +289,11 @@ int starpu_data_acquire_cb_sequential_consistency(starpu_data_handle_t handle,
 
 
 /*
- *	Blockin data request from application
+ *	Blocking data request from application
  */
 
 
+
 static inline void _starpu_data_acquire_continuation(void *arg)
 {
 	struct user_interaction_wrapper *wrapper = (struct user_interaction_wrapper *) arg;
@@ -300,6 +316,9 @@ int starpu_data_acquire_on_node(starpu_data_handle_t handle, int node, enum star
 	/* unless asynchronous, it is forbidden to call this function from a callback or a codelet */
 	STARPU_ASSERT_MSG(_starpu_worker_may_perform_blocking_calls(), "Acquiring a data synchronously is not possible from a codelet or from a task callback, use starpu_data_acquire_cb instead.");
 
+	/* Check that previous tasks have set a value if needed */
+	_starpu_data_check_initialized(handle, mode);
+
 	if (node >= 0 && _starpu_data_is_multiformat_handle(handle) &&
 	    _starpu_handle_needs_conversion_task(handle, node))
 	{
@@ -391,6 +410,9 @@ int starpu_data_acquire_on_node_try(starpu_data_handle_t handle, int node, enum
 	/* it is forbidden to call this function from a callback or a codelet */
 	STARPU_ASSERT_MSG(_starpu_worker_may_perform_blocking_calls(), "Acquiring a data synchronously is not possible from a codelet or from a task callback, use starpu_data_acquire_cb instead.");
 
+	/* Check that previous tasks have set a value if needed */
+	_starpu_data_check_initialized(handle, mode);
+
 	int ret;
 	STARPU_ASSERT_MSG(!_starpu_data_is_multiformat_handle(handle), "not supported yet");
 	STARPU_PTHREAD_MUTEX_LOCK(&handle->sequential_consistency_mutex);
@@ -485,6 +507,9 @@ int _starpu_prefetch_data_on_node_with_mode(starpu_data_handle_t handle, unsigne
 	/* it is forbidden to call this function from a callback or a codelet */
 	STARPU_ASSERT_MSG(async || _starpu_worker_may_perform_blocking_calls(), "Synchronous prefetch is not possible from a task or a callback");
 
+	/* Check that previous tasks have set a value if needed */
+	_starpu_data_check_initialized(handle, mode);
+
 	struct user_interaction_wrapper *wrapper;
 	_STARPU_MALLOC(wrapper, sizeof(*wrapper));
 

+ 13 - 2
tests/datawizard/redux_acquire.c

@@ -46,21 +46,32 @@ static struct starpu_codelet redux_codelet =
 	.name = "redux_codelet"
 };
 
+static void check_dot(void *dot_handle) {
+	long int *x = starpu_data_get_local_ptr(dot_handle);
+	STARPU_ASSERT_MSG(*x == 42, "Incorrect value %ld", *x);
+	starpu_data_release(dot_handle);
+}
+
 int main(int argc, char **argv)
 {
-	long int dot;
 	starpu_data_handle_t dot_handle;
 
 	int ret = starpu_init(NULL);
 	STARPU_CHECK_RETURN_VALUE(ret, "starpu_init");
 
-	starpu_variable_data_register(&dot_handle, -1, (uintptr_t)NULL, sizeof(dot));
+	starpu_variable_data_register(&dot_handle, -1, (uintptr_t)NULL, sizeof(long int));
 	starpu_data_set_reduction_methods(dot_handle, &redux_codelet, &init_codelet);
 	starpu_data_acquire(dot_handle, STARPU_R);
 	long int *x = starpu_data_get_local_ptr(dot_handle);
 	STARPU_ASSERT_MSG(*x == 42, "Incorrect value %ld", *x);
 	starpu_data_release(dot_handle);
 	starpu_data_unregister(dot_handle);
+
+	starpu_variable_data_register(&dot_handle, -1, (uintptr_t)NULL, sizeof(long int));
+	starpu_data_set_reduction_methods(dot_handle, &redux_codelet, &init_codelet);
+	starpu_data_acquire_cb(dot_handle, STARPU_R, check_dot, dot_handle);
+	starpu_data_unregister(dot_handle);
+
 	starpu_shutdown();
 	return 0;
 }