Browse Source

Asynchronous partitioning: Fix starpu_data_acquire cases

We were not actually enforcing data access for STARPU_W accesses
Samuel Thibault 3 years ago
parent
commit
d98ac7a4b3

+ 3 - 1
include/starpu_data.h

@@ -118,7 +118,9 @@ enum starpu_data_access_mode
 			            When inserting these tasks through the
 				    MPI layer however, the access mode needs
 				    to be ::STARPU_MPI_REDUX. */
-	STARPU_ACCESS_MODE_MAX=(1<<8) /**< The purpose of ::STARPU_ACCESS_MODE_MAX is to
+	STARPU_NOPLAN=(1<<8),	/**< Disable automatic submission of asynchronous
+				    partitioning/unpartitioning */
+	STARPU_ACCESS_MODE_MAX=(1<<9) /**< The purpose of ::STARPU_ACCESS_MODE_MAX is to
 					be the maximum of this enum. */
 };
 

+ 1 - 1
src/core/task.c

@@ -796,7 +796,7 @@ static int _starpu_task_submit_head(struct starpu_task *task)
 			if (!(task->cl->flags & STARPU_CODELET_NOPLANS) &&
 			    ((handle->nplans && !handle->nchildren) || handle->siblings)
 			    && handle->partition_automatic_disabled == 0
-			    )
+			    && !(mode & STARPU_NOPLAN))
 				/* This handle is involved with asynchronous
 				 * partitioning as a parent or a child, make
 				 * sure the right plan is active, submit

+ 3 - 3
src/datawizard/filters.c

@@ -691,7 +691,7 @@ void _starpu_data_partition_submit(starpu_data_handle_t initial_handle, unsigned
 					 0);
 	STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_insert");
 	if (!handles_sequential_consistency || handles_sequential_consistency[0])
-		starpu_data_invalidate_submit(initial_handle);
+		_starpu_data_invalidate_submit_noplan(initial_handle);
 }
 
 void starpu_data_partition_submit_sequential_consistency(starpu_data_handle_t initial_handle, unsigned nparts, starpu_data_handle_t *children, int sequential_consistency)
@@ -787,7 +787,7 @@ void starpu_data_partition_readwrite_upgrade_submit(starpu_data_handle_t initial
 	/* TODO: assert nparts too */
 	int ret = starpu_task_insert(initial_handle->switch_cl, STARPU_RW, initial_handle, STARPU_DATA_MODE_ARRAY, descr, nparts, 0);
 	STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_insert");
-	starpu_data_invalidate_submit(initial_handle);
+	_starpu_data_invalidate_submit_noplan(initial_handle);
 }
 
 void _starpu_data_unpartition_submit(starpu_data_handle_t initial_handle, unsigned nparts, starpu_data_handle_t *children, int gather_node, unsigned char *handles_sequential_consistency, void (*callback_func)(void *), void *callback_arg)
@@ -863,7 +863,7 @@ void _starpu_data_unpartition_submit(starpu_data_handle_t initial_handle, unsign
 	for (i = 0; i < nparts; i++)
 	{
 		if (!handles_sequential_consistency || handles_sequential_consistency[i+1])
-			starpu_data_invalidate_submit(children[i]);
+			_starpu_data_invalidate_submit_noplan(children[i]);
 	}
 }
 

+ 12 - 0
src/datawizard/interfaces/data_interface.c

@@ -886,6 +886,9 @@ static void _starpu_data_unregister(starpu_data_handle_t handle, unsigned cohere
 		STARPU_ASSERT_MSG(_starpu_worker_may_perform_blocking_calls(), "starpu_data_unregister must not be called from a task or callback, perhaps you can use starpu_data_unregister_submit instead");
 
 		/* If sequential consistency is enabled, wait until data is available */
+		if (((handle->nplans && !handle->nchildren) || handle->siblings)
+			&& handle->partition_automatic_disabled == 0)
+			_starpu_data_partition_access_submit(handle, !handle->readonly);
 		_starpu_data_wait_until_available(handle, handle->readonly?STARPU_R:STARPU_RW, "starpu_data_unregister");
 	}
 
@@ -1201,6 +1204,15 @@ void starpu_data_invalidate_submit(starpu_data_handle_t handle)
 	handle->initialized = 0;
 }
 
+/* Counterpart of starpu_data_invalidate_submit() that the partition /
+ * unpartition submission code calls instead of the public function.
+ * The handle is acquired with STARPU_W | STARPU_NOPLAN; STARPU_NOPLAN is
+ * documented as disabling automatic submission of asynchronous
+ * partitioning/unpartitioning. _starpu_data_invalidate is passed as the
+ * acquisition callback, with the handle itself as its argument. */
+void _starpu_data_invalidate_submit_noplan(starpu_data_handle_t handle)
+{
+	STARPU_ASSERT(handle);
+
+	starpu_data_acquire_on_node_cb(handle, STARPU_ACQUIRE_NO_NODE_LOCK_ALL, STARPU_W | STARPU_NOPLAN, _starpu_data_invalidate, handle);
+
+	/* Clear the handle's initialized flag */
+	handle->initialized = 0;
+}
+
 enum starpu_data_interface_id starpu_data_get_interface_id(starpu_data_handle_t handle)
 {
 	return handle->ops->interfaceid;

+ 2 - 0
src/datawizard/interfaces/data_interface.h

@@ -82,6 +82,8 @@ extern void _starpu_data_unregister_ram_pointer(starpu_data_handle_t handle, uns
 
 #define _starpu_data_is_multiformat_handle(handle) handle->ops->is_multiformat
 
+void _starpu_data_invalidate_submit_noplan(starpu_data_handle_t handle);
+
 #pragma GCC visibility pop
 
 #endif // __DATA_INTERFACE_H__

+ 5 - 4
src/datawizard/user_interactions.c

@@ -27,15 +27,16 @@
 
 static void _starpu_data_check_initialized(starpu_data_handle_t handle, enum starpu_data_access_mode mode)
 {
-	if (!(mode & STARPU_R))
-		return;
-
 	if (((handle->nplans && !handle->nchildren) || handle->siblings)
-		&& handle->partition_automatic_disabled == 0)
+		&& handle->partition_automatic_disabled == 0
+		&& !(mode & STARPU_NOPLAN))
 	{
 		_starpu_data_partition_access_submit(handle, (mode & STARPU_W) != 0);
 	}
 
+	if (!(mode & STARPU_R))
+		return;
+
 	if (!handle->initialized && handle->init_cl)
 	{
 		int ret = starpu_task_insert(handle->init_cl, STARPU_W, handle, 0);

+ 1 - 0
tests/Makefile.am

@@ -344,6 +344,7 @@ myPROGRAMS +=				\
 	datawizard/invalidate_pending_requests	\
 	datawizard/temporary_partition		\
 	datawizard/partitioned_initialization	\
+	datawizard/partitioned_acquire		\
 	datawizard/temporary_partition_implicit	\
 	datawizard/redux_acquire		\
 	disk/disk_copy				\

+ 106 - 0
tests/datawizard/partitioned_acquire.c

@@ -0,0 +1,106 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2010-2021  Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <starpu.h>
+#include "../helper.h"
+
+#define SIZE (1<<20)
+#define NPARTS 16
+
+/*
+ * Test starpu_data_acquire() on the pieces of an asynchronous partitioning
+ * plan, including STARPU_W accesses, then read the parent data back.
+ */
+
+/* Kernel used by both codelets of this test: it deliberately does nothing,
+ * the casts only silence unused-parameter warnings. */
+static void codelet(void *descr[], void *_args)
+{
+	(void)descr;
+	(void)_args;
+}
+
+/* CPU-only codelet taking a single buffer in STARPU_W mode.
+ * NOTE(review): main() in this test never references clw -- confirm whether
+ * a write task was meant to be submitted or whether this can be dropped. */
+static struct starpu_codelet clw =
+{
+	.where = STARPU_CPU,
+	.cpu_funcs = {codelet},
+	.nbuffers = 1,
+	.modes = {STARPU_W}
+};
+
+/* CPU-only codelet taking a single buffer in STARPU_R mode; main() submits
+ * it on the parent handle once the pieces have been released. */
+static struct starpu_codelet clr =
+{
+	.where = STARPU_CPU,
+	.cpu_funcs = {codelet},
+	.nbuffers = 1,
+	.modes = {STARPU_R}
+};
+
+/*
+ * Plan a partition of a vector into NPARTS pieces, acquire every piece in
+ * STARPU_W mode and release them, then read the parent handle back, first
+ * through starpu_data_acquire() and then through a task.
+ *
+ * Returns 0 on success, or STARPU_TEST_SKIPPED when starpu_init() or the
+ * submission of the read task reports -ENODEV.
+ */
+int main(void)
+{
+	int ret;
+	starpu_data_handle_t handle, handles[NPARTS];
+	int i;
+	char d[SIZE];
+
+	ret = starpu_init(NULL);
+	if (ret == -ENODEV) return STARPU_TEST_SKIPPED;
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_init");
+
+	starpu_vector_data_register(&handle, STARPU_MAIN_RAM, (uintptr_t) &d, SIZE, sizeof(char));
+	/* d is left uninitialized on the host side */
+	starpu_data_invalidate(handle);
+
+	/* Fork */
+	struct starpu_data_filter f =
+	{
+		.filter_func = starpu_vector_filter_block,
+		.nchildren = NPARTS
+	};
+	starpu_data_partition_plan(handle, &f, handles);
+
+	/* Acquire in parallel */
+	for (i = 0; i < NPARTS; i++)
+	{
+		starpu_data_acquire(handles[i], STARPU_W);
+	}
+
+	/* Release in parallel */
+	for (i = 0; i < NPARTS; i++)
+	{
+		starpu_data_release(handles[i]);
+	}
+
+	/* Acquire the parent itself after its pieces were written */
+	starpu_data_acquire(handle, STARPU_R);
+	starpu_data_release(handle);
+
+	/* Read result */
+	ret = starpu_task_insert(&clr, STARPU_R, handle, 0);
+	/* The submission result used to be ignored, which left the enodev
+	 * path below unreachable */
+	if (ret == -ENODEV) goto enodev;
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_insert");
+
+	/* Clean */
+	starpu_data_partition_clean(handle, NPARTS, handles);
+
+	starpu_data_unregister(handle);
+
+	starpu_shutdown();
+
+	return 0;
+
+enodev:
+	starpu_data_partition_clean(handle, NPARTS, handles);
+	starpu_data_unregister(handle);
+	starpu_shutdown();
+	/* yes, we do not perform the computation but we did detect that no one
+	 * could perform the kernel, so this is not an error from StarPU */
+	fprintf(stderr, "WARNING: No one can execute this task\n");
+	return STARPU_TEST_SKIPPED;
+}