瀏覽代碼

Make sure that starpu_data_sync_with_mem_non_blocking also enforces sequential
consistency. Add a test (with the worst name ever) to make sure it's working
properly.

Cédric Augonnet 15 年之前
父節點
當前提交
888effa7e3

+ 53 - 23
src/datawizard/user_interactions.c

@@ -48,6 +48,7 @@ struct state_and_node {
 	unsigned finished;
 	unsigned async;
 	void (*callback)(void *);
+	void (*callback_fetch_data)(void *); // called after fetch_data
 	void *callback_arg;
 	struct starpu_task *pre_sync_task;
 	struct starpu_task *post_sync_task;
@@ -57,7 +58,21 @@ struct state_and_node {
  *	Non Blocking data request from application
  */
 /* put the current value of the data into RAM */
-static inline void _starpu_sync_data_with_mem_continuation_non_blocking(void *arg)
+/*
+ * Callback passed to _starpu_fetch_data_on_node by the non-blocking sync
+ * path. It registers the "post" sync task on the handle, runs the optional
+ * user callback and then releases the request descriptor.
+ *
+ * arg: the struct state_and_node describing the request.
+ */
+static void _starpu_sync_data_with_mem_fetch_data_callback(void *arg)
+{
+	struct state_and_node *statenode = arg;
+	starpu_data_handle handle = statenode->state;
+
+	/* At this point, the caller holds a reference to the piece of data.
+	 * We enqueue the "post" sync task in the list associated to the handle
+	 * so that it is submitted by the starpu_data_release_from_mem
+	 * function.
+	 * NOTE(review): in the visible code post_sync_task is only assigned
+	 * when sequential consistency is enabled; confirm that
+	 * _starpu_add_post_sync_tasks copes with the other case. */
+	_starpu_add_post_sync_tasks(statenode->post_sync_task, handle);
+
+	/* The user callback is optional: only call it when one was given. */
+	if (statenode->callback)
+		statenode->callback(statenode->callback_arg);
+
+	/* The request descriptor is not read again after this point in the
+	 * non-blocking path, so release it here to avoid leaking one per
+	 * request. */
+	free(statenode);
+}
+
+static void _starpu_sync_data_with_mem_continuation_non_blocking(void *arg)
 {
 	int ret;
 	struct state_and_node *statenode = arg;
@@ -66,15 +81,25 @@ static inline void _starpu_sync_data_with_mem_continuation_non_blocking(void *ar
 
 	STARPU_ASSERT(handle);
 
-	ret = _starpu_fetch_data_on_node(handle, 0, statenode->mode, 0, NULL, NULL);
+	ret = _starpu_fetch_data_on_node(handle, 0, statenode->mode, 0,
+			_starpu_sync_data_with_mem_fetch_data_callback, statenode);
 	STARPU_ASSERT(!ret);
-	
-	/* continuation of starpu_data_sync_with_mem_non_blocking: we
-	 * execute the callback if any  */
-	if (statenode->callback)
-		statenode->callback(statenode->callback_arg);
 
-	free(statenode);
+}
+
+/*
+ * Callback of the "pre" sync task: starts the actual data request described
+ * by arg (a struct state_and_node).
+ */
+void starpu_data_sync_with_mem_non_blocking_pre_sync_callback(void *arg)
+{
+	struct state_and_node *request = arg;
+
+	/* Try to get the data. If that does not succeed immediately, the
+	 * continuation has been registered as a callback that is executed
+	 * automatically when the data is available again, so there is
+	 * nothing left to do here. */
+	if (_starpu_attempt_to_submit_data_request_from_apps(request->state, request->mode,
+			_starpu_sync_data_with_mem_continuation_non_blocking, request))
+		return;
+
+	/* No one has locked this data yet, so we fetch it directly. */
+	_starpu_sync_data_with_mem_continuation_non_blocking(request);
 }
 
 /* The data must be released by calling starpu_data_release_from_mem later on */
@@ -94,25 +119,30 @@ int starpu_data_sync_with_mem_non_blocking(starpu_data_handle handle,
 	PTHREAD_MUTEX_INIT(&statenode->lock, NULL);
 	statenode->finished = 0;
 
-	/* we try to get the data, if we do not succeed immediately, we set a
- 	* callback function that will be executed automatically when the data is
- 	* available again, otherwise we fetch the data directly */
-	if (!_starpu_attempt_to_submit_data_request_from_apps(handle, mode,
-			_starpu_sync_data_with_mem_continuation_non_blocking, statenode))
+	PTHREAD_MUTEX_LOCK(&handle->sequential_consistency_mutex);
+	int sequential_consistency = handle->sequential_consistency;
+	if (sequential_consistency)
 	{
-		/* no one has locked this data yet, so we proceed immediately */
-		_starpu_sync_data_with_mem_continuation_non_blocking(statenode);
+		statenode->pre_sync_task = starpu_task_create();
+		statenode->pre_sync_task->detach = 1;
+		statenode->pre_sync_task->callback_func = starpu_data_sync_with_mem_non_blocking_pre_sync_callback;
+		statenode->pre_sync_task->callback_arg = statenode;
+
+		statenode->post_sync_task = starpu_task_create();
+		statenode->post_sync_task->detach = 1;
+
+		_starpu_detect_implicit_data_deps_with_handle(statenode->pre_sync_task, statenode->post_sync_task, handle, mode);
+		PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);
+
+		/* TODO detect if this is superfluous */
+		int ret = starpu_task_submit(statenode->pre_sync_task);
+		STARPU_ASSERT(!ret);
 	}
+	else {
+		PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);
 
-#warning TODO fix sequential consistency !
-	/* XXX this is a temporary hack to have the starpu_sync_data_with_mem
-	 * function working properly. It should be fixed later on. */
-	PTHREAD_MUTEX_LOCK(&handle->sequential_consistency_mutex);
-	if (handle->sequential_consistency)
-	{
-		handle->post_sync_tasks_cnt++;
+		starpu_data_sync_with_mem_non_blocking_pre_sync_callback(statenode);
 	}
-	PTHREAD_MUTEX_UNLOCK(&handle->sequential_consistency_mutex);
 
 	return 0;
 }

+ 4 - 0
tests/Makefile.am

@@ -104,6 +104,7 @@ check_PROGRAMS += 				\
 	datawizard/unpartition			\
 	datawizard/sync_with_data_with_mem	\
 	datawizard/sync_with_data_with_mem_non_blocking\
+	datawizard/sync_with_data_with_mem_non_blocking_implicit\
 	errorcheck/starpu_init_noworker		\
 	errorcheck/invalid_blocking_calls	\
 	errorcheck/invalid_tasks		\
@@ -218,6 +219,9 @@ datawizard_sync_with_data_with_mem_SOURCES =	\
 datawizard_sync_with_data_with_mem_non_blocking_SOURCES = \
 	datawizard/sync_with_data_with_mem_non_blocking.c
 
+datawizard_sync_with_data_with_mem_non_blocking_implicit_SOURCES = \
+	datawizard/sync_with_data_with_mem_non_blocking_implicit.c
+
 errorcheck_starpu_init_noworker_SOURCES =	\
 	errorcheck/starpu_init_noworker.c
 

+ 1 - 0
tests/datawizard/sync_with_data_with_mem_non_blocking.c

@@ -92,6 +92,7 @@ int main(int argc, char **argv)
 		starpu_data_malloc_pinned_if_possible((void **)&buffer[b], VECTORSIZE);
 		starpu_vector_data_register(&v_handle[b], 0,
 				(uintptr_t)buffer[b], VECTORSIZE, sizeof(char));
+		starpu_data_set_sequential_consistency_flag(v_handle[b], 0);
 	}
 
 	unsigned iter;

+ 138 - 0
tests/datawizard/sync_with_data_with_mem_non_blocking_implicit.c

@@ -0,0 +1,138 @@
+/*
+ * StarPU
+ * Copyright (C) INRIA 2008-2009 (see AUTHORS file)
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <stdio.h>
+#include <unistd.h>
+#include <errno.h>
+#include <starpu.h>
+#include <stdlib.h>
+#include <pthread.h>
+
+#define NBUFFERS	64	/* number of buffers/handles registered in main() */
+#define NITER		128	/* number of use/sync/release rounds run by main() */
+#define VECTORSIZE	1024	/* size passed when allocating and registering each buffer */
+
+float *buffer[NBUFFERS];	/* backing storage, allocated in main() */
+
+starpu_data_handle v_handle[NBUFFERS];	/* v_handle[b] is registered on buffer[b] */
+
+/* Kernel with an empty body: it ignores both its buffers and its argument. */
+static void dummy_codelet(void *descr[] __attribute__ ((unused)),
+			  void *_args __attribute__ ((unused)))
+{
+}
+
+static starpu_codelet cl = {	/* codelet submitted by use_handle() */
+	.where = STARPU_CPU|STARPU_CUDA|STARPU_OPENCL,	/* presumably the kinds of workers allowed to run it - confirm */
+	.cpu_func = dummy_codelet,	/* every implementation is the same empty kernel */
+#ifdef STARPU_USE_CUDA
+	.cuda_func = dummy_codelet,
+#endif
+#ifdef STARPU_USE_OPENCL
+	.opencl_func = dummy_codelet,
+#endif
+	.nbuffers = 1	/* use_handle() fills in buffers[0] only */
+};
+
+/*
+ * Submit one task running the dummy codelet on handle in STARPU_RW mode.
+ *
+ * Exits with status 0 when no worker can execute the task (-ENODEV), and
+ * with a failure status when the submission fails for any other reason.
+ */
+void use_handle(starpu_data_handle handle)
+{
+	int ret;
+	struct starpu_task *task;
+
+	task = starpu_task_create();
+	task->cl = &cl;
+	task->buffers[0].handle = handle;
+	task->buffers[0].mode = STARPU_RW;
+	task->detach = 0;
+
+	ret = starpu_task_submit(task);
+	if (ret == -ENODEV)
+	{
+		/* No one can execute such a task, but that's not a failure
+		 * of the test either. */
+		exit(0);
+	}
+
+	if (ret)
+	{
+		/* Any other error means the task was not submitted: report it
+		 * instead of silently carrying on with the test. */
+		fprintf(stderr, "starpu_task_submit failed (%d)\n", ret);
+		exit(EXIT_FAILURE);
+	}
+}
+
+static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;	/* guards n_synced_buffers */
+static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;	/* signalled by callback_sync_data() once the count reaches NBUFFERS */
+static unsigned n_synced_buffers;	/* callbacks received in the current iteration; reset by main() */
+
+/*
+ * Callback given to starpu_data_sync_with_mem_non_blocking: counts one more
+ * synced buffer and signals cond when the count reaches NBUFFERS.
+ */
+void callback_sync_data(void *arg __attribute__ ((unused)))
+{
+	pthread_mutex_lock(&mutex);
+
+	n_synced_buffers += 1;
+
+	/* Only the callback that brings the count to NBUFFERS signals. */
+	const int all_synced = (n_synced_buffers == NBUFFERS);
+	if (all_synced)
+	{
+		pthread_cond_signal(&cond);
+	}
+
+	pthread_mutex_unlock(&mutex);
+}
+
+/*
+ * Test driver. Registers NBUFFERS vectors, then for NITER rounds: submits a
+ * task on every handle, requests every handle back with
+ * starpu_data_sync_with_mem_non_blocking, waits until all NBUFFERS callbacks
+ * have run, and releases the handles. Sequential consistency is left at its
+ * default here (the flag is never changed in this file).
+ *
+ * Returns 0 on success; exits with a failure status if a sync request is
+ * rejected.
+ */
+int main(int argc, char **argv)
+{
+	starpu_init(NULL);
+
+	/* Allocate all buffers and register them to StarPU */
+	unsigned b;
+	for (b = 0; b < NBUFFERS; b++)
+	{
+		starpu_data_malloc_pinned_if_possible((void **)&buffer[b], VECTORSIZE);
+		starpu_vector_data_register(&v_handle[b], 0,
+				(uintptr_t)buffer[b], VECTORSIZE, sizeof(char));
+	}
+
+	unsigned iter;
+	for (iter = 0; iter < NITER; iter++)
+	{
+		/* Use the buffers on the different workers so that it may not
+		 * be in main memory anymore */
+		for (b = 0; b < NBUFFERS; b++)
+			use_handle(v_handle[b]);
+
+		pthread_mutex_lock(&mutex);
+		n_synced_buffers = 0;
+		pthread_mutex_unlock(&mutex);
+
+		/* Grab the different pieces of data into main memory */
+		for (b = 0; b < NBUFFERS; b++)
+		{
+			int ret = starpu_data_sync_with_mem_non_blocking(v_handle[b], STARPU_RW,
+					callback_sync_data, NULL);
+			if (ret)
+			{
+				/* The wait below needs one callback per buffer;
+				 * presumably a rejected request never calls back,
+				 * so fail loudly rather than risk a hang. */
+				fprintf(stderr, "starpu_data_sync_with_mem_non_blocking failed (%d)\n", ret);
+				exit(EXIT_FAILURE);
+			}
+		}
+
+		/* Wait for all buffers to be available */
+		pthread_mutex_lock(&mutex);
+
+		while (n_synced_buffers != NBUFFERS)
+			pthread_cond_wait(&cond, &mutex);
+
+		pthread_mutex_unlock(&mutex);
+
+		/* Release them */
+		for (b = 0; b < NBUFFERS; b++)
+			starpu_data_release_from_mem(v_handle[b]);
+	}
+
+	starpu_task_wait_for_all();
+
+	/* do some cleanup */
+	for (b = 0; b < NBUFFERS; b++)
+		starpu_data_unregister(v_handle[b]);
+
+	starpu_shutdown();
+
+	return 0;
+}