浏览代码

add async_full_read + async_full_write + matching test + update backend unistd

Corentin Salingue 7 年之前
父节点
当前提交
7df4eaa13b

+ 2 - 0
src/core/disk_ops/disk_unistd.c

@@ -69,6 +69,8 @@ struct starpu_disk_ops starpu_disk_unistd_ops =
 #ifdef HAVE_AIO_H
 	.async_read = starpu_unistd_global_async_read,
 	.async_write = starpu_unistd_global_async_write,
+	.async_full_read = starpu_unistd_global_async_full_read,
+	.async_full_write = starpu_unistd_global_async_full_write,
 	.wait_request = starpu_unistd_global_wait_request,
 	.test_request = starpu_unistd_global_test_request,
 	.free_request = starpu_unistd_global_free_request,

+ 47 - 0
src/core/disk_ops/unistd/disk_unistd_global.c

@@ -536,6 +536,53 @@ int starpu_unistd_global_full_write(void *base STARPU_ATTRIBUTE_UNUSED, void *ob
 	return starpu_unistd_global_write(base, obj, ptr, 0, size);
 }
 
+#if HAVE_AIO_H
+void * starpu_unistd_global_async_full_read (void * base, void * obj, void ** ptr, size_t * size, unsigned dst_node)
+{
+        struct starpu_unistd_global_obj *tmp = (struct starpu_unistd_global_obj *) obj;
+	int fd = tmp->descriptor;
+
+	if (fd < 0)
+		fd = _starpu_unistd_reopen(obj);
+#ifdef STARPU_HAVE_WINDOWS
+	*size = _filelength(fd);
+#else
+	struct stat st;
+	int ret = fstat(fd, &st);
+	STARPU_ASSERT(ret==0);
+
+	*size = st.st_size;
+#endif
+	if (tmp->descriptor < 0)
+		_starpu_unistd_reclose(fd);
+
+	/* Allocated aligned buffer */
+	_starpu_malloc_flags_on_node(dst_node, ptr, *size, 0);
+	return starpu_unistd_global_async_read(base, obj, *ptr, 0, *size);
+}
+
+void * starpu_unistd_global_async_full_write (void * base, void * obj, void * ptr, size_t size)
+{
+        struct starpu_unistd_global_obj *tmp = (struct starpu_unistd_global_obj *) obj;
+
+        /* update file size to realise the next good full_read */
+        if(size != tmp->size)
+        {
+		int fd = tmp->descriptor;
+
+		if (fd < 0)
+			fd = _starpu_unistd_reopen(obj);
+		int val = _starpu_ftruncate(fd,size);
+		if (tmp->descriptor < 0)
+			_starpu_unistd_reclose(fd);
+		STARPU_ASSERT(val == 0);
+		tmp->size = size;
+        }
+
+	return starpu_unistd_global_async_write(base, obj, ptr, 0, size);
+}
+#endif
+
 #ifdef STARPU_UNISTD_USE_COPY
 static void * starpu_unistd_internal_thread(void * arg)
 {

+ 2 - 0
src/core/disk_ops/unistd/disk_unistd_global.h

@@ -51,6 +51,8 @@ void starpu_unistd_global_unplug (void *base);
 int get_unistd_global_bandwidth_between_disk_and_main_ram(unsigned node, void *base);
 void* starpu_unistd_global_async_read (void *base, void *obj, void *buf, off_t offset, size_t size);
 void* starpu_unistd_global_async_write (void *base, void *obj, void *buf, off_t offset, size_t size);
+void * starpu_unistd_global_async_full_write (void * base, void * obj, void * ptr, size_t size);
+void * starpu_unistd_global_async_full_read (void * base, void * obj, void ** ptr, size_t * size, unsigned dst_node);
 void starpu_unistd_global_wait_request(void * async_channel);
 int starpu_unistd_global_test_request(void * async_channel);
 void starpu_unistd_global_free_request(void * async_channel);

+ 54 - 3
src/datawizard/copy_driver.c

@@ -539,6 +539,8 @@ static int copy_data_1_to_1_generic(starpu_data_handle_t handle,
                 {
                         req->async_channel.type = STARPU_DISK_RAM;
                         req->async_channel.event.disk_event.requests = NULL;
+                        req->async_channel.event.disk_event.ptr = NULL;
+                        req->async_channel.event.disk_event.handle = NULL;
                 }
 		if(copy_methods->any_to_any)
 			ret = copy_methods->any_to_any(src_interface, src_node, dst_interface, dst_node, req && !starpu_asynchronous_copy_disabled() ? &req->async_channel : NULL);
@@ -551,14 +553,22 @@ static int copy_data_1_to_1_generic(starpu_data_handle_t handle,
 			handle->ops->pack_data(handle, src_node, &ptr, &size);
 			ret = _starpu_disk_full_write(src_node, dst_node, obj, ptr, size, req && !starpu_asynchronous_copy_disabled() ? &req->async_channel : NULL);
 			if (ret == 0)
+			{
 				/* write is already finished, ptr was allocated in pack_data */
 				_starpu_free_flags_on_node(src_node, ptr, size, 0);
+			}
+			else if (ret == -EAGAIN)
+			{
+				req->async_channel.event.disk_event.ptr = ptr;
+				req->async_channel.event.disk_event.node = dst_node;
+				req->async_channel.event.disk_event.size = size;
+			}
 
 #ifdef STARPU_DEVEL
 #warning TODO: support asynchronous disk requests for packed data
 #endif
 			/* For now, asynchronous is not supported */
-			STARPU_ASSERT(ret == 0);
+			STARPU_ASSERT(ret == 0 || ret == -EAGAIN);
 		}
 		break;
 
@@ -567,6 +577,8 @@ static int copy_data_1_to_1_generic(starpu_data_handle_t handle,
                 {
                         req->async_channel.type = STARPU_DISK_RAM;
                         req->async_channel.event.disk_event.requests = NULL;
+                        req->async_channel.event.disk_event.ptr = NULL;
+                        req->async_channel.event.disk_event.handle = NULL;
                 }
 		if(copy_methods->any_to_any)
 			ret = copy_methods->any_to_any(src_interface, src_node, dst_interface, dst_node, req && !starpu_asynchronous_copy_disabled()  ? &req->async_channel : NULL);
@@ -583,12 +595,19 @@ static int copy_data_1_to_1_generic(starpu_data_handle_t handle,
 				/* ptr is allocated in full_read */
 				_starpu_free_flags_on_node(dst_node, ptr, size, 0);
 			}
-
+			else if (ret == -EAGAIN)
+			{
+				req->async_channel.event.disk_event.ptr = ptr;
+				req->async_channel.event.disk_event.node = dst_node;
+				req->async_channel.event.disk_event.size = size;
+				req->async_channel.event.disk_event.handle = handle;
+			}
+			
 #ifdef STARPU_DEVEL
 #warning TODO: support asynchronous disk requests for packed data
 #endif
 			/* For now, asynchronous is not supported */
-			STARPU_ASSERT(ret == 0);
+			STARPU_ASSERT(ret == 0 || ret == -EAGAIN);
 		}
 		break;
 
@@ -597,6 +616,8 @@ static int copy_data_1_to_1_generic(starpu_data_handle_t handle,
                 {
                         req->async_channel.type = STARPU_DISK_RAM;
                         req->async_channel.event.disk_event.requests = NULL;
+                        req->async_channel.event.disk_event.ptr = NULL;
+                        req->async_channel.event.disk_event.handle = NULL;
                 }
 		ret = copy_methods->any_to_any(src_interface, src_node, dst_interface, dst_node, req && !starpu_asynchronous_copy_disabled() ? &req->async_channel : NULL);
 		break;
@@ -886,6 +907,21 @@ void _starpu_driver_wait_request_completion(struct _starpu_async_channel *async_
 #endif
 	case STARPU_DISK_RAM:
 		starpu_disk_wait_request(async_channel);
+		if (async_channel->event.disk_event.ptr != NULL)
+		{
+			if (async_channel->event.disk_event.handle != NULL)
+			{
+				/* read is finished, we can already unpack */
+				async_channel->event.disk_event.handle->ops->unpack_data(async_channel->event.disk_event.handle, async_channel->event.disk_event.node, async_channel->event.disk_event.ptr, async_channel->event.disk_event.size);
+				/* ptr is allocated in full_read */
+				_starpu_free_flags_on_node(async_channel->event.disk_event.node, async_channel->event.disk_event.ptr, async_channel->event.disk_event.size, 0);
+			}
+			else
+			{
+				/* write is finished, ptr was allocated in pack_data */
+				_starpu_free_flags_on_node(async_channel->event.disk_event.node, async_channel->event.disk_event.ptr, async_channel->event.disk_event.size, 0);
+			}
+		}
 		break;
 	case STARPU_CPU_RAM:
 	default:
@@ -951,6 +987,21 @@ unsigned _starpu_driver_test_request_completion(struct _starpu_async_channel *as
 #endif
 	case STARPU_DISK_RAM:
 		success = starpu_disk_test_request(async_channel);
+		if (async_channel->event.disk_event.ptr != NULL && success)
+		{
+			if (async_channel->event.disk_event.handle != NULL)
+			{
+				/* read is finished, we can already unpack */
+				async_channel->event.disk_event.handle->ops->unpack_data(async_channel->event.disk_event.handle, async_channel->event.disk_event.node, async_channel->event.disk_event.ptr, async_channel->event.disk_event.size);
+				/* ptr is allocated in full_read */
+				_starpu_free_flags_on_node(async_channel->event.disk_event.node, async_channel->event.disk_event.ptr, async_channel->event.disk_event.size, 0);
+			}
+			else
+			{
+				/* write is finished, ptr was allocated in pack_data */
+				_starpu_free_flags_on_node(async_channel->event.disk_event.node, async_channel->event.disk_event.ptr, async_channel->event.disk_event.size, 0);
+			}
+		}
 		break;
 	case STARPU_CPU_RAM:
 	default:

+ 5 - 1
src/datawizard/copy_driver.h

@@ -75,11 +75,15 @@ LIST_TYPE(_starpu_disk_backend_event,
 	void *backend_event;
 );
         
-
 struct _starpu_disk_async_event
 {
 	unsigned memory_node;
         struct _starpu_disk_backend_event_list * requests;
+
+	void * ptr;
+	unsigned node;
+	size_t size;
+	starpu_data_handle_t handle;
 };
 
 /* this is a structure that can be queried to see whether an asynchronous

+ 1 - 0
tests/Makefile.am

@@ -279,6 +279,7 @@ myPROGRAMS +=				\
 	datawizard/temporary_partition		\
 	datawizard/redux_acquire		\
 	disk/disk_copy				\
+	disk/disk_copy_unpack			\
 	disk/disk_copy_to_disk			\
 	disk/disk_compute			\
 	disk/disk_pack				\

+ 168 - 0
tests/disk/disk_copy_unpack.c

@@ -0,0 +1,168 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2017  Inria
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <starpu.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <math.h>
+#include <common/config.h>
+#include "../helper.h"
+
+/*
+ * Test pack / unpack methods before pushing data on disk with async read/write.
+ */
+
+/* size of one vector */
+#ifdef STARPU_QUICK_CHECK
+#  define	DISK	64
+#  define	NX	(256*1024/sizeof(double))
+#else
+#  define	NX	(32*1048576/sizeof(double))
+#  define	DISK	200
+#endif
+
+#if !defined(STARPU_HAVE_SETENV)
+#warning setenv is not defined. Skipping test
+int main(int argc, char **argv)
+{
+	return STARPU_TEST_SKIPPED;
+}
+#else
+
+int dotest(struct starpu_disk_ops *ops, void *param)
+{
+	unsigned *A;
+	int ret;
+
+	/* Initialize StarPU without GPU devices to make sure the memory of the GPU devices will not be used */
+	struct starpu_conf conf;
+	ret = starpu_conf_init(&conf);
+	if (ret == -EINVAL)
+		return EXIT_FAILURE;
+	conf.ncuda = 0;
+	conf.nopencl = 0;
+	conf.nmic = 0;
+	conf.nscc = 0;
+	ret = starpu_init(&conf);
+	if (ret == -ENODEV) goto enodev;
+
+	/* register a disk */
+	int new_dd = starpu_disk_register(ops, param, 1024*1024*DISK);
+	/* can't write on /tmp/ */
+	if (new_dd == -ENOENT) goto enoent;
+
+	/* allocate two memory spaces */
+	starpu_malloc_flags((void **)&A, NX*sizeof(unsigned), STARPU_MALLOC_COUNT);
+
+	FPRINTF(stderr, "TEST DISK MEMORY \n");
+
+	unsigned int j;
+	/* initialization with bad values */
+	for(j = 0; j < NX; ++j)
+	{
+		A[j] = j;
+	}
+
+	starpu_data_handle_t vector_handleA;
+
+	static const struct starpu_data_copy_methods my_vector_copy_data_methods_s =
+	{
+		.any_to_any = NULL,
+	};
+	
+	starpu_interface_vector_ops.copy_methods = &my_vector_copy_data_methods_s;
+
+	/* register vector in starpu */
+	starpu_vector_data_register(&vector_handleA, STARPU_MAIN_RAM, (uintptr_t)A, NX, sizeof(unsigned));
+
+	/* Move and invalidate copy to an other disk */
+	starpu_data_acquire_on_node(vector_handleA, new_dd, STARPU_RW);
+	starpu_data_release_on_node(vector_handleA, new_dd);
+
+	starpu_data_acquire_on_node(vector_handleA, new_dd, STARPU_RW);
+	starpu_data_release_on_node(vector_handleA, new_dd);
+
+	/* free them */
+	starpu_data_unregister(vector_handleA);
+
+	/* check if computation is correct */
+	int try = 1;
+	for (j = 0; j < NX; ++j)
+		if (A[j] != j)
+		{
+			FPRINTF(stderr, "Fail A %u != %u \n", A[j], j);
+			try = 0;
+		}
+
+	starpu_free_flags(A, NX*sizeof(double), STARPU_MALLOC_COUNT);
+
+	/* terminate StarPU, no task can be submitted after */
+	starpu_shutdown();
+
+	if(try)
+		FPRINTF(stderr, "TEST SUCCESS\n");
+	else
+		FPRINTF(stderr, "TEST FAIL\n");
+	return try ? EXIT_SUCCESS : EXIT_FAILURE;
+
+enodev:
+	return STARPU_TEST_SKIPPED;
+enoent:
+	FPRINTF(stderr, "Couldn't write data: ENOENT\n");
+	starpu_shutdown();
+	return STARPU_TEST_SKIPPED;
+}
+
+static int merge_result(int old, int new)
+{
+	if (new == EXIT_FAILURE)
+		return EXIT_FAILURE;
+	if (old == 0)
+		return 0;
+	return new;
+}
+
+int main(void)
+{
+	int ret = 0;
+	int ret2;
+	char s[128];
+	char *ptr;
+
+	snprintf(s, sizeof(s), "/tmp/%s-disk-XXXXXX", getenv("USER"));
+	ptr = _starpu_mkdtemp(s);
+	if (!ptr)
+	{
+		FPRINTF(stderr, "Cannot make directory <%s>\n", s);
+		return STARPU_TEST_SKIPPED;
+	}
+
+	ret = merge_result(ret, dotest(&starpu_disk_stdio_ops, s));
+	ret = merge_result(ret, dotest(&starpu_disk_unistd_ops, s));
+#ifdef STARPU_LINUX_SYS
+	ret = merge_result(ret, dotest(&starpu_disk_unistd_o_direct_ops, s));
+#endif
+#ifdef STARPU_HAVE_HDF5
+	ret = merge_result(ret, dotest(&starpu_disk_hdf5_ops, s));
+#endif
+
+	ret2 = rmdir(s);
+	if (ret2 < 0)
+		STARPU_CHECK_RETURN_VALUE(-errno, "rmdir '%s'\n", s);
+	return ret;
+}
+#endif