瀏覽代碼

- Add a non-blocking version of starpu_sync_data_with_mem called
starpu_sync_data_with_mem_non_blocking: once the data is available in main
memory, a (non-blocking) callback is executed.
- Also add a test to make sure this new function works properly.

Cédric Augonnet 15 年之前
父節點
當前提交
def1bf0edc

+ 2 - 0
include/starpu-data.h

@@ -49,6 +49,8 @@ void starpu_delete_data(starpu_data_handle state);
 void starpu_advise_if_data_is_important(starpu_data_handle state, unsigned is_important);
 
 int starpu_sync_data_with_mem(starpu_data_handle state, starpu_access_mode mode);
+/* Non-blocking variant of starpu_sync_data_with_mem: asks for the data
+ * designated by handle to be put in main memory with the given access mode.
+ * callback(arg) is invoked once the data has been fetched; callback may be
+ * NULL. The data must be released with starpu_release_data_from_mem later on.
+ * Returns 0. */
+int starpu_sync_data_with_mem_non_blocking(starpu_data_handle handle,
+			starpu_access_mode mode, void (*callback)(void *), void *arg);
 void starpu_release_data_from_mem(starpu_data_handle state);
 
 int starpu_malloc_pinned_if_possible(void **A, size_t dim);

+ 48 - 4
src/datawizard/user_interactions.c

@@ -42,6 +42,9 @@ struct state_and_node {
 	pthread_mutex_t lock;
 	unsigned finished;
 	unsigned async;
+	unsigned non_blocking;
+	void (*callback)(void *);
+	void *callback_arg;
 };
 
 /* put the current value of the data into RAM */
@@ -58,10 +61,20 @@ static inline void _starpu_sync_data_with_mem_continuation(void *arg)
 	ret = fetch_data_on_node(handle, 0, r, w, 0);
 	STARPU_ASSERT(!ret);
 	
-	pthread_mutex_lock(&statenode->lock);
-	statenode->finished = 1;
-	pthread_cond_signal(&statenode->cond);
-	pthread_mutex_unlock(&statenode->lock);
+	if (statenode->non_blocking)
+	{
+		/* continuation of starpu_sync_data_with_mem_non_blocking: we
+		 * execute the callback if any  */
+		if (statenode->callback)
+			statenode->callback(statenode->callback_arg);
+	}
+	else {
+		/* continuation of starpu_sync_data_with_mem */
+		pthread_mutex_lock(&statenode->lock);
+		statenode->finished = 1;
+		pthread_cond_signal(&statenode->cond);
+		pthread_mutex_unlock(&statenode->lock);
+	}
 }
 
 /* The data must be released by calling starpu_release_data_from_mem later on */
@@ -76,6 +89,7 @@ int starpu_sync_data_with_mem(starpu_data_handle handle, starpu_access_mode mode
 		.state = handle,
 		.mode = mode,
 		.node = 0, // unused
+		.non_blocking = 0,
 		.cond = PTHREAD_COND_INITIALIZER,
 		.lock = PTHREAD_MUTEX_INITIALIZER,
 		.finished = 0
@@ -100,6 +114,36 @@ int starpu_sync_data_with_mem(starpu_data_handle handle, starpu_access_mode mode
 	return 0;
 }
 
+#include <stdlib.h> /* malloc, free */
+
+/* Continuation used by starpu_sync_data_with_mem_non_blocking: runs the common
+ * continuation (fetch the data, then call the user callback if any) and then
+ * destroys and frees the heap-allocated request descriptor passed as arg. */
+static void _starpu_sync_data_with_mem_non_blocking_continuation(void *arg)
+{
+	struct state_and_node *statenode = arg;
+
+	_starpu_sync_data_with_mem_continuation(statenode);
+
+	/* Nothing refers to the descriptor once the continuation is over. */
+	pthread_cond_destroy(&statenode->cond);
+	pthread_mutex_destroy(&statenode->lock);
+	free(statenode);
+}
+
+/* Non-blocking counterpart of starpu_sync_data_with_mem.
+ *
+ * handle:   data to make available in main memory
+ * mode:     access mode requested by the application
+ * callback: called with arg once the data has been fetched; may be NULL
+ * arg:      opaque pointer handed to callback
+ *
+ * Always returns 0. The callback runs either from within this call (when the
+ * data could be acquired immediately) or later, when the pending request is
+ * served. The data must be released by calling starpu_release_data_from_mem
+ * later on. */
+int starpu_sync_data_with_mem_non_blocking(starpu_data_handle handle,
+		starpu_access_mode mode, void (*callback)(void *), void *arg)
+{
+	/* The continuation may run after this function has returned, so the
+	 * request descriptor must not live on this stack frame: it is
+	 * allocated on the heap and released by the continuation wrapper. */
+	struct state_and_node *statenode = malloc(sizeof(*statenode));
+	STARPU_ASSERT(statenode);
+
+	statenode->state = handle;
+	statenode->mode = mode;
+	statenode->node = 0; /* unused */
+	statenode->async = 0;
+	statenode->non_blocking = 1;
+	statenode->callback = callback;
+	statenode->callback_arg = arg;
+	statenode->finished = 0;
+	pthread_cond_init(&statenode->cond, NULL);
+	pthread_mutex_init(&statenode->lock, NULL);
+
+	/* We try to get the data. If we do not succeed immediately, the
+	 * continuation is registered and will be executed automatically when
+	 * the data is available again; otherwise we fetch the data directly. */
+	if (!attempt_to_submit_data_request_from_apps(handle, mode,
+			_starpu_sync_data_with_mem_non_blocking_continuation, statenode))
+	{
+		/* no one has locked this data yet, so we proceed immediately */
+		_starpu_sync_data_with_mem_non_blocking_continuation(statenode);
+	}
+
+	return 0;
+}
+
 /* This function must be called after starpu_sync_data_with_mem (or its
  * non-blocking variant) so that the application releases the data */
 void starpu_release_data_from_mem(starpu_data_handle handle)

+ 4 - 0
tests/Makefile.am

@@ -85,6 +85,7 @@ check_PROGRAMS += 				\
 	datawizard/readers_and_writers		\
 	datawizard/unpartition			\
 	datawizard/sync_with_data_with_mem	\
+	datawizard/sync_with_data_with_mem_non_blocking\
 	errorcheck/starpu_init_noworker		\
 	errorcheck/invalid_blocking_calls	\
 	helper/cublas_init			\
@@ -145,6 +146,9 @@ datawizard_unpartition_SOURCES =		\
 datawizard_sync_with_data_with_mem_SOURCES =	\
 	datawizard/sync_with_data_with_mem.c
 
+datawizard_sync_with_data_with_mem_non_blocking_SOURCES = \
+	datawizard/sync_with_data_with_mem_non_blocking.c
+
 errorcheck_starpu_init_noworker_SOURCES =	\
 	errorcheck/starpu_init_noworker.c
 

+ 1 - 1
tests/datawizard/sync_with_data_with_mem.c

@@ -21,7 +21,7 @@
 #include <stdlib.h>
 
 #define NBUFFERS	64
-#define NITER		1000
+#define NITER		128
 #define VECTORSIZE	1024
 
 float *buffer[NBUFFERS];

+ 134 - 0
tests/datawizard/sync_with_data_with_mem_non_blocking.c

@@ -0,0 +1,134 @@
+/*
+ * StarPU
+ * Copyright (C) INRIA 2008-2009 (see AUTHORS file)
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <stdio.h>
+#include <unistd.h>
+#include <errno.h>
+#include <starpu.h>
+#include <stdlib.h>
+
+/* Number of vectors allocated and registered by the test */
+#define NBUFFERS	64
+/* Number of "use on the workers, then sync back" iterations */
+#define NITER		128
+/* Size given to the allocator and element count given at registration */
+#define VECTORSIZE	1024
+
+/* Backing storage of each vector.
+ * NOTE(review): declared as float * but main allocates VECTORSIZE bytes and
+ * registers the vector with sizeof(char) elements -- confirm the intended
+ * element type. */
+float *buffer[NBUFFERS];
+
+/* Data handle registered for each entry of buffer */
+starpu_data_handle v_handle[NBUFFERS];
+
+/* Kernel that does nothing: neither the buffer descriptors nor the arguments
+ * are read. It is the implementation referenced by the cl codelet. */
+static void dummy_codelet(void *descr[], __attribute__ ((unused)) void *_args)
+{
+}
+
+/* Codelet built on the empty kernel: it declares one data buffer and uses
+ * dummy_codelet as its core implementation and, when USE_CUDA is defined, as
+ * its CUDA implementation too.
+ * NOTE(review): .where presumably lists the worker types allowed to run the
+ * codelet -- confirm against the StarPU headers. */
+static starpu_codelet cl = {
+	.where = CORE|CUDA,
+	.core_func = dummy_codelet,
+#ifdef USE_CUDA
+	.cuda_func = dummy_codelet,
+#endif
+	.nbuffers = 1
+};
+
+/* Create and submit one task running the cl codelet on handle, accessed in
+ * STARPU_RW mode, with the task's detach field set to 0. If submission
+ * reports -ENODEV the whole test exits with status 0. */
+void use_handle(starpu_data_handle handle)
+{
+	struct starpu_task *job = starpu_task_create();
+
+	job->cl = &cl;
+	job->detach = 0;
+	job->buffers[0].handle = handle;
+	job->buffers[0].mode = STARPU_RW;
+
+	/* -ENODEV: no one can execute such a task, but that is not a
+	 * failure of the test either. */
+	if (starpu_submit_task(job) == -ENODEV)
+		exit(0);
+}
+
+/* mutex protects n_synced_buffers; cond is signalled by callback_sync_data
+ * when n_synced_buffers reaches NBUFFERS, which is the condition main waits
+ * for before releasing the data. */
+static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
+static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
+/* Number of sync callbacks that have run since main last reset the counter */
+static unsigned n_synced_buffers;
+
+/* Callback given to starpu_sync_data_with_mem_non_blocking: under mutex,
+ * count one more synced buffer and signal cond once all NBUFFERS buffers
+ * have been counted. The argument is ignored. */
+void callback_sync_data(void *arg __attribute__ ((unused)))
+{
+	pthread_mutex_lock(&mutex);
+
+	if (++n_synced_buffers == NBUFFERS)
+	{
+		/* last buffer of this round: wake up the waiting thread */
+		pthread_cond_signal(&cond);
+	}
+
+	pthread_mutex_unlock(&mutex);
+}
+
+/* Test driver: for NITER rounds, touch every vector with a task, then request
+ * all of them back in main memory through the non-blocking call, wait until
+ * every callback has fired, and release the data. Returns 0. */
+int main(int argc, char **argv)
+{
+	unsigned idx;
+	unsigned round;
+
+	starpu_init(NULL);
+
+	/* Allocate each buffer and register it to StarPU as a vector of
+	 * VECTORSIZE elements of sizeof(char) bytes */
+	for (idx = 0; idx < NBUFFERS; idx++)
+	{
+		starpu_malloc_pinned_if_possible((void **)&buffer[idx], VECTORSIZE);
+		starpu_register_vector_data(&v_handle[idx], 0,
+				(uintptr_t)buffer[idx], VECTORSIZE, sizeof(char));
+	}
+
+	for (round = 0; round < NITER; round++)
+	{
+		/* Have the workers access every buffer so that the data may
+		 * not be in main memory anymore */
+		for (idx = 0; idx < NBUFFERS; idx++)
+		{
+			use_handle(v_handle[idx]);
+		}
+
+		starpu_wait_all_tasks();
+
+		/* Restart the count of completed sync callbacks */
+		pthread_mutex_lock(&mutex);
+		n_synced_buffers = 0;
+		pthread_mutex_unlock(&mutex);
+
+		/* Request every piece of data back in main memory; each
+		 * completion is reported through callback_sync_data */
+		for (idx = 0; idx < NBUFFERS; idx++)
+		{
+			starpu_sync_data_with_mem_non_blocking(v_handle[idx], STARPU_RW,
+					callback_sync_data, NULL);
+		}
+
+		/* Block until the callback has been called for all buffers */
+		pthread_mutex_lock(&mutex);
+		while (n_synced_buffers != NBUFFERS)
+		{
+			pthread_cond_wait(&cond, &mutex);
+		}
+		pthread_mutex_unlock(&mutex);
+
+		/* Give the data back to StarPU */
+		for (idx = 0; idx < NBUFFERS; idx++)
+		{
+			starpu_release_data_from_mem(v_handle[idx]);
+		}
+	}
+
+	/* Cleanup: delete every registered handle, then shut StarPU down */
+	for (idx = 0; idx < NBUFFERS; idx++)
+	{
+		starpu_delete_data(v_handle[idx]);
+	}
+
+	starpu_shutdown();
+
+	return 0;
+}