浏览代码

Add starpu_data_acquire_try and starpu_data_acquire_on_node_try.

Samuel Thibault 8 年之前
父节点
当前提交
beea50c30b

+ 1 - 0
ChangeLog

@@ -51,6 +51,7 @@ StarPU 1.2.2 (svn revision xxx)
 ==============================================
 ==============================================
 
 
 New features:
 New features:
+  * Add starpu_data_acquire_try and starpu_data_acquire_on_node_try.
 
 
 StarPU 1.2.1 (svn revision 20299)
 StarPU 1.2.1 (svn revision 20299)
 ==============================================
 ==============================================

+ 21 - 0
doc/doxygen/chapters/api/data_management.doxy

@@ -313,6 +313,19 @@ Similarly to starpu_data_acquire_cb(), this function is
 non-blocking and may be called from task callbacks. Upon successful
 non-blocking and may be called from task callbacks. Upon successful
 completion, this function returns 0.
 completion, this function returns 0.
 
 
+\fn int starpu_data_acquire_try(starpu_data_handle_t handle, enum starpu_data_access_mode mode)
+\ingroup API_Data_Management
+The application can call this function instead of starpu_data_acquire() so as to
+acquire the data like starpu_data_acquire(), but only if all
+previously-submitted tasks have completed, in why case starpu_data_acquire_try
+returns 0. StarPU will have ensured that the application will get an up-to-date
+copy of \p handle in main memory located where the data was originally
+registered. starpu_data_release() must be called once the application does not
+need to access the piece of data anymore.
+
+If not all previously-submitted tasks have completed, starpu_data_acquire_try
+returns -EAGAIN, and starpu_data_release() must not be called.
+
 \def STARPU_ACQUIRE_NO_NODE
 \def STARPU_ACQUIRE_NO_NODE
 \ingroup API_Data_Management
 \ingroup API_Data_Management
 This macro can be used to acquire data, but not require it to be available on a given node, only enforce R/W dependencies.
 This macro can be used to acquire data, but not require it to be available on a given node, only enforce R/W dependencies.
@@ -347,6 +360,14 @@ memory.
 ::STARPU_ACQUIRE_NO_NODE and ::STARPU_ACQUIRE_NO_NODE_LOCK_ALL can be used instead of an
 ::STARPU_ACQUIRE_NO_NODE and ::STARPU_ACQUIRE_NO_NODE_LOCK_ALL can be used instead of an
 explicit node number.
 explicit node number.
 
 
+\fn int starpu_data_acquire_on_node_try(starpu_data_handle_t handle, int node, enum starpu_data_access_mode mode)
+\ingroup API_Data_Management
+This is the same as starpu_data_acquire_try(), except that the
+data will be available on the given memory node instead of main
+memory.
+::STARPU_ACQUIRE_NO_NODE and ::STARPU_ACQUIRE_NO_NODE_LOCK_ALL can be used instead of an
+explicit node number.
+
 \def STARPU_DATA_ACQUIRE_CB(handle, mode, code)
 \def STARPU_DATA_ACQUIRE_CB(handle, mode, code)
 \ingroup API_Data_Management
 \ingroup API_Data_Management
 STARPU_DATA_ACQUIRE_CB() is the same as starpu_data_acquire_cb(),
 STARPU_DATA_ACQUIRE_CB() is the same as starpu_data_acquire_cb(),

+ 3 - 0
include/starpu_data.h

@@ -74,6 +74,9 @@ int starpu_data_acquire_on_node_cb(starpu_data_handle_t handle, int node, enum s
 int starpu_data_acquire_cb_sequential_consistency(starpu_data_handle_t handle, enum starpu_data_access_mode mode, void (*callback)(void *), void *arg, int sequential_consistency);
 int starpu_data_acquire_cb_sequential_consistency(starpu_data_handle_t handle, enum starpu_data_access_mode mode, void (*callback)(void *), void *arg, int sequential_consistency);
 int starpu_data_acquire_on_node_cb_sequential_consistency(starpu_data_handle_t handle, int node, enum starpu_data_access_mode mode, void (*callback)(void *), void *arg, int sequential_consistency);
 int starpu_data_acquire_on_node_cb_sequential_consistency(starpu_data_handle_t handle, int node, enum starpu_data_access_mode mode, void (*callback)(void *), void *arg, int sequential_consistency);
 
 
+int starpu_data_acquire_try(starpu_data_handle_t handle, enum starpu_data_access_mode mode);
+int starpu_data_acquire_on_node_try(starpu_data_handle_t handle, int node, enum starpu_data_access_mode mode);
+
 #ifdef __GCC__
 #ifdef __GCC__
 #  define STARPU_DATA_ACQUIRE_CB(handle, mode, code) do \
 #  define STARPU_DATA_ACQUIRE_CB(handle, mode, code) do \
 	{ \						\
 	{ \						\

+ 23 - 1
src/core/dependencies/implicit_data_deps.c

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  *
- * Copyright (C) 2010-2016  Université de Bordeaux
+ * Copyright (C) 2010-2017  Université de Bordeaux
  * Copyright (C) 2010, 2011, 2013, 2015, 2016  CNRS
  * Copyright (C) 2010, 2011, 2013, 2015, 2016  CNRS
  * Copyright (C) 2016  Inria
  * Copyright (C) 2016  Inria
  *
  *
@@ -333,6 +333,28 @@ struct starpu_task *_starpu_detect_implicit_data_deps_with_handle(struct starpu_
 	return task;
 	return task;
 }
 }
 
 
+int _starpu_test_implicit_data_deps_with_handle(starpu_data_handle_t handle, enum starpu_data_access_mode mode)
+{
+	/* Do not care about some flags */
+	mode &= ~ STARPU_SSEND;
+	mode &= ~ STARPU_LOCALITY;
+
+	STARPU_ASSERT(!(mode & STARPU_SCRATCH));
+
+	if (handle->sequential_consistency)
+	{
+		if (handle->last_sync_task)
+			return -EAGAIN;
+		if (handle->last_submitted_accessors.next != &handle->last_submitted_accessors)
+			return -EAGAIN;
+
+		if (mode & STARPU_W || mode == STARPU_REDUX)
+			handle->initialized = 1;
+		handle->last_submitted_mode = mode;
+	}
+	return 0;
+}
+
 /* Create the implicit dependencies for a newly submitted task */
 /* Create the implicit dependencies for a newly submitted task */
 void _starpu_detect_implicit_data_deps(struct starpu_task *task)
 void _starpu_detect_implicit_data_deps(struct starpu_task *task)
 {
 {

+ 2 - 1
src/core/dependencies/implicit_data_deps.h

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
  *
- * Copyright (C) 2010, 2012, 2014-2015  Université de Bordeaux
+ * Copyright (C) 2010, 2012, 2014-2015, 2017  Université de Bordeaux
  * Copyright (C) 2010, 2011  CNRS
  * Copyright (C) 2010, 2011  CNRS
  *
  *
  * StarPU is free software; you can redistribute it and/or modify
  * StarPU is free software; you can redistribute it and/or modify
@@ -23,6 +23,7 @@
 
 
 struct starpu_task *_starpu_detect_implicit_data_deps_with_handle(struct starpu_task *pre_sync_task, struct starpu_task *post_sync_task, struct _starpu_task_wrapper_dlist *post_sync_task_dependency_slot,
 struct starpu_task *_starpu_detect_implicit_data_deps_with_handle(struct starpu_task *pre_sync_task, struct starpu_task *post_sync_task, struct _starpu_task_wrapper_dlist *post_sync_task_dependency_slot,
 						   starpu_data_handle_t handle, enum starpu_data_access_mode mode);
 						   starpu_data_handle_t handle, enum starpu_data_access_mode mode);
+int _starpu_test_implicit_data_deps_with_handle(starpu_data_handle_t handle, enum starpu_data_access_mode mode);
 void _starpu_detect_implicit_data_deps(struct starpu_task *task);
 void _starpu_detect_implicit_data_deps(struct starpu_task *task);
 void _starpu_release_data_enforce_sequential_consistency(struct starpu_task *task, struct _starpu_task_wrapper_dlist *task_dependency_slot, starpu_data_handle_t handle);
 void _starpu_release_data_enforce_sequential_consistency(struct starpu_task *task, struct _starpu_task_wrapper_dlist *task_dependency_slot, starpu_data_handle_t handle);
 void _starpu_release_task_enforce_sequential_consistency(struct _starpu_job *j);
 void _starpu_release_task_enforce_sequential_consistency(struct _starpu_job *j);

+ 57 - 0
src/datawizard/user_interactions.c

@@ -338,6 +338,63 @@ int starpu_data_acquire(starpu_data_handle_t handle, enum starpu_data_access_mod
 	return starpu_data_acquire_on_node(handle, STARPU_MAIN_RAM, mode);
 	return starpu_data_acquire_on_node(handle, STARPU_MAIN_RAM, mode);
 }
 }
 
 
+int starpu_data_acquire_on_node_try(starpu_data_handle_t handle, int node, enum starpu_data_access_mode mode)
+{
+	STARPU_ASSERT(handle);
+	STARPU_ASSERT_MSG(handle->nchildren == 0, "Acquiring a partitioned data is not possible");
+	/* it is forbidden to call this function from a callback or a codelet */
+	STARPU_ASSERT_MSG(_starpu_worker_may_perform_blocking_calls(), "Acquiring a data synchronously is not possible from a codelet or from a task callback, use starpu_data_acquire_cb instead.");
+
+	int ret;
+	STARPU_ASSERT_MSG(!_starpu_data_is_multiformat_handle(handle), "not supported yet");
+	STARPU_PTHREAD_MUTEX_LOCK(&handle->sequential_consistency_mutex);
+	ret = _starpu_test_implicit_data_deps_with_handle(handle, mode);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);
+	if (ret)
+		return ret;
+
+	struct user_interaction_wrapper wrapper =
+	{
+		.handle = handle,
+		.mode = mode,
+		.node = node,
+		.finished = 0
+	};
+
+	STARPU_PTHREAD_COND_INIT(&wrapper.cond, NULL);
+	STARPU_PTHREAD_MUTEX_INIT(&wrapper.lock, NULL);
+
+	/* we try to get the data, if we do not succeed immediately, we set a
+ 	* callback function that will be executed automatically when the data is
+ 	* available again, otherwise we fetch the data directly */
+	if (!_starpu_attempt_to_submit_data_request_from_apps(handle, mode, _starpu_data_acquire_continuation, &wrapper))
+	{
+		struct _starpu_data_replicate *replicate =
+			node >= 0 ? &handle->per_node[node] : NULL;
+		/* no one has locked this data yet, so we proceed immediately */
+		ret = _starpu_fetch_data_on_node(handle, node, replicate, mode, 0, 0, 0, NULL, NULL, 0, "starpu_data_acquire_on_node");
+		STARPU_ASSERT(!ret);
+		if (replicate && replicate->mc)
+			replicate->mc->diduse = 1;
+	}
+	else
+	{
+		STARPU_PTHREAD_MUTEX_LOCK(&wrapper.lock);
+		while (!wrapper.finished)
+			STARPU_PTHREAD_COND_WAIT(&wrapper.cond, &wrapper.lock);
+		STARPU_PTHREAD_MUTEX_UNLOCK(&wrapper.lock);
+	}
+	STARPU_PTHREAD_COND_DESTROY(&wrapper.cond);
+	STARPU_PTHREAD_MUTEX_DESTROY(&wrapper.lock);
+
+	return 0;
+}
+
+int starpu_data_acquire_try(starpu_data_handle_t handle, enum starpu_data_access_mode mode)
+{
+	return starpu_data_acquire_on_node_try(handle, STARPU_MAIN_RAM, mode);
+}
+
 /* This function must be called after starpu_data_acquire so that the
 /* This function must be called after starpu_data_acquire so that the
  * application release the data */
  * application release the data */
 void starpu_data_release_on_node(starpu_data_handle_t handle, int node)
 void starpu_data_release_on_node(starpu_data_handle_t handle, int node)

+ 1 - 0
tests/Makefile.am

@@ -216,6 +216,7 @@ myPROGRAMS +=				\
 	datawizard/acquire_cb_insert		\
 	datawizard/acquire_cb_insert		\
 	datawizard/acquire_release		\
 	datawizard/acquire_release		\
 	datawizard/acquire_release2		\
 	datawizard/acquire_release2		\
+	datawizard/acquire_try			\
 	datawizard/cache			\
 	datawizard/cache			\
 	datawizard/commute			\
 	datawizard/commute			\
 	datawizard/commute2			\
 	datawizard/commute2			\

+ 98 - 0
tests/datawizard/acquire_try.c

@@ -0,0 +1,98 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2010, 2014, 2016-2017  Université de Bordeaux
+ * Copyright (C) 2010, 2011, 2012, 2013  CNRS
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <config.h>
+#include <starpu.h>
+#include "../helper.h"
+
+/*
+ * Try to use data_acquire_try in parallel with tasks
+ */
+
+void func(STARPU_ATTRIBUTE_UNUSED void *descr[], STARPU_ATTRIBUTE_UNUSED void *_args)
+{
+	starpu_sleep(0.01);
+}
+
+static struct starpu_codelet cl =
+{
+	.modes = { STARPU_RW },
+	.cpu_funcs = {func},
+	.cuda_funcs = {func},
+	.opencl_funcs = {func},
+	.cpu_funcs_name = {"func"},
+	.nbuffers = 1
+};
+
+unsigned token = 0;
+starpu_data_handle_t token_handle;
+
+static
+void callback(void *arg STARPU_ATTRIBUTE_UNUSED)
+{
+        starpu_data_release(token_handle);
+}
+
+int main(int argc, char **argv)
+{
+	unsigned i;
+	int ret;
+	int nacquired;
+
+        ret = starpu_initialize(NULL, &argc, &argv);
+	if (ret == -ENODEV) return STARPU_TEST_SKIPPED;
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_init");
+
+	starpu_variable_data_register(&token_handle, STARPU_MAIN_RAM, (uintptr_t)&token, sizeof(unsigned));
+
+	ret = starpu_task_insert(&cl, STARPU_RW, token_handle, 0);
+	if (ret == -ENODEV)
+		goto enodev;
+	ret = starpu_data_acquire_try(token_handle, STARPU_R);
+	STARPU_ASSERT(ret != 0);
+
+	while ((ret = starpu_data_acquire_try(token_handle, STARPU_R)) != 0)
+	{
+		starpu_sleep(0.001);
+	}
+
+	ret = starpu_task_insert(&cl, STARPU_RW, token_handle, 0);
+	if (ret == -ENODEV)
+		goto enodev;
+
+	starpu_data_release(token_handle);
+
+	starpu_task_wait_for_all();
+
+	ret = starpu_data_acquire_try(token_handle, STARPU_R);
+	STARPU_ASSERT(ret == 0);
+	starpu_data_release(token_handle);
+
+	starpu_data_unregister(token_handle);
+
+	starpu_shutdown();
+
+	return 0;
+
+enodev:
+	starpu_data_unregister(token_handle);
+	fprintf(stderr, "WARNING: No one can execute this task\n");
+	/* yes, we do not perform the computation but we did detect that no one
+ 	 * could perform the kernel, so this is not an error from StarPU */
+	starpu_shutdown();
+	return STARPU_TEST_SKIPPED;
+}