Sfoglia il codice sorgente

add support pack/unpack for out-of-core

Corentin Salingue 12 anni fa
parent
commit
e66c1994ff

+ 5 - 0
include/starpu_data_interfaces.h

@@ -400,6 +400,11 @@ size_t starpu_data_get_size(starpu_data_handle_t handle);
 
 starpu_data_handle_t starpu_data_lookup(const void *ptr);
 
+struct starpu_disk_interface
+{
+	uintptr_t dev_handle;
+};
+
 #ifdef __cplusplus
 }
 #endif

+ 2 - 2
include/starpu_disk.h

@@ -35,8 +35,8 @@ struct starpu_disk_ops {
 	 int    (*bandwidth)    (unsigned node);
 	 void   (*wait_request) (void * async_channel);
 	 int    (*test_request) (void * async_channel);
-	 int	(*full_read)    (void * base, void * obj, void ** ptr, size_t * size);
-	 int 	(*full_write)   (void * base, void * obj, void * ptr, size_t size);
+	 int	(*full_read)    (unsigned node, void * base, void * obj, void ** ptr, size_t * size);
+	 int 	(*full_write)   (unsigned node, void * base, void * obj, void * ptr, size_t size);
 };
 
 

+ 2 - 2
src/core/disk.c

@@ -153,13 +153,13 @@ _starpu_disk_read(unsigned src_node, unsigned dst_node, void *obj, void *buf, of
 int _starpu_disk_full_read(unsigned src_node, unsigned dst_node, void * obj, void ** ptr, size_t * size)
 {
 	int pos = get_location_with_node(src_node);
-        return disk_register_list[pos]->functions->full_read(disk_register_list[pos]->base, obj, ptr, size);
+        return disk_register_list[pos]->functions->full_read(src_node, disk_register_list[pos]->base, obj, ptr, size);
 }
 
 int _starpu_disk_full_write(unsigned src_node, unsigned dst_node, void * obj, void * ptr, size_t size)
 {
 	int pos = get_location_with_node(dst_node);
-        return disk_register_list[pos]->functions->full_write(disk_register_list[pos]->base, obj, ptr, size);
+        return disk_register_list[pos]->functions->full_write(dst_node, disk_register_list[pos]->base, obj, ptr, size);
 }
 
 /* src_node == STARPU_MAIN_RAM and dst_node == disk node */

+ 38 - 1
src/core/disk_ops/disk_stdio.c

@@ -27,6 +27,7 @@
 #include <core/disk.h>
 #include <core/perfmodel/perfmodel.h>
 #include <datawizard/copy_driver.h>
+#include <datawizard/memory_manager.h>
 
 #ifdef STARPU_HAVE_WINDOWS
         #include <io.h>
@@ -234,6 +235,16 @@ starpu_stdio_async_read (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void *bu
 	return aio_read(aiocb);
 }
 
+static int
+starpu_stdio_full_read(unsigned node, void *base, void * obj, void ** ptr, size_t * size)
+{
+	struct starpu_stdio_obj * tmp = (struct starpu_stdio_obj *) obj;
+
+	*size = tmp->size;
+	*ptr = malloc(*size);
+	return _starpu_disk_read(node, STARPU_MAIN_RAM, obj, *ptr, 0, *size, NULL);
+}
+
 /* write on the memory disk */
 static int 
 starpu_stdio_write (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, const void *buf, off_t offset, size_t size, void * async_channel)
@@ -271,6 +282,30 @@ starpu_stdio_async_write (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void *b
         return aio_write(aiocb);
 }
 
+static int
+starpu_stdio_full_write (unsigned node, void * base, void * obj, void * ptr, size_t size)
+{
+	struct starpu_stdio_obj * tmp = (struct starpu_stdio_obj *) obj;
+	
+	/* update file size to realise the next good full_read */
+	if(size != tmp->size)
+	{
+		_starpu_memory_manager_deallocate_size(tmp->size, node);
+		if (_starpu_memory_manager_can_allocate_size(size, node))
+		{
+#ifdef STARPU_HAVE_WINDOWS
+			int val = _chsize(tmp->descriptor, size);
+#else
+			int val = ftruncate(tmp->descriptor,size);
+#endif
+
+			STARPU_ASSERT_MSG(val < 0,"StarPU Error to truncate file in STDIO full_write function");
+		}
+	}	
+	return _starpu_disk_write(STARPU_MAIN_RAM, node, obj, ptr, 0, tmp->size, NULL);
+}
+
+
 /* create a new copy of parameter == base */
 static void * 
 starpu_stdio_plug (void *parameter)
@@ -421,5 +456,7 @@ struct starpu_disk_ops starpu_disk_stdio_ops = {
 	.copy = NULL,
 	.bandwidth = get_stdio_bandwidth_between_disk_and_main_ram,
 	.wait_request = starpu_stdio_wait_request,
-	.test_request = starpu_stdio_test_request
+	.test_request = starpu_stdio_test_request,
+	.full_read = starpu_stdio_full_read,
+	.full_write = starpu_stdio_full_write
 };

+ 3 - 1
src/core/disk_ops/disk_unistd.c

@@ -65,5 +65,7 @@ struct starpu_disk_ops starpu_disk_unistd_ops = {
 	.async_read = starpu_unistd_global_async_read,
 	.async_write = starpu_unistd_global_async_write,
 	.wait_request = starpu_unistd_global_wait_request,
-	.test_request = starpu_unistd_global_test_request
+	.test_request = starpu_unistd_global_test_request,
+        .full_read = starpu_unistd_global_full_read,
+        .full_write = starpu_unistd_global_full_write
 };

+ 3 - 1
src/core/disk_ops/disk_unistd_o_direct.c

@@ -99,5 +99,7 @@ struct starpu_disk_ops starpu_disk_unistd_o_direct_ops = {
         .async_read = starpu_unistd_global_async_read,
         .async_write = starpu_unistd_global_async_write,
         .wait_request = starpu_unistd_global_wait_request,
-        .test_request = starpu_unistd_global_test_request
+        .test_request = starpu_unistd_global_test_request,
+	.full_read = starpu_unistd_global_full_read,
+	.full_write = starpu_unistd_global_full_write
 };

+ 32 - 0
src/core/disk_ops/unistd/disk_unistd_global.c

@@ -28,6 +28,7 @@
 #include <core/perfmodel/perfmodel.h>
 #include <core/disk_ops/unistd/disk_unistd_global.h>
 #include <datawizard/copy_driver.h>
+#include <datawizard/memory_manager.h>
 
 #ifdef STARPU_HAVE_WINDOWS
         #include <io.h>
@@ -199,6 +200,15 @@ starpu_unistd_global_async_read (void *base STARPU_ATTRIBUTE_UNUSED, void *obj,
         return aio_read(aiocb);
 }
 
+int
+starpu_unistd_global_full_read(unsigned node, void *base, void * obj, void ** ptr, size_t * size)
+{
+        struct starpu_unistd_global_obj * tmp = (struct starpu_unistd_global_obj *) obj;
+
+        *size = tmp->size;
+        *ptr = malloc(*size);
+	return _starpu_disk_read(node, STARPU_MAIN_RAM, obj, *ptr, 0, *size, NULL);
+}
 
 
 /* write on the memory disk */
@@ -240,6 +250,28 @@ starpu_unistd_global_async_write (void *base STARPU_ATTRIBUTE_UNUSED, void *obj,
         return aio_write(aiocb);
 }
 
+int
+starpu_unistd_global_full_write (unsigned node, void * base, void * obj, void * ptr, size_t size)
+{
+        struct starpu_unistd_global_obj * tmp = (struct starpu_unistd_global_obj *) obj;
+
+        /* update file size to realise the next good full_read */
+        if(size != tmp->size)
+        {
+                _starpu_memory_manager_deallocate_size(tmp->size, node);
+                if (_starpu_memory_manager_can_allocate_size(size, node))
+                {
+#ifdef STARPU_HAVE_WINDOWS
+                        int val = _chsize(tmp->descriptor, size);
+#else
+                        int val = ftruncate(tmp->descriptor,size);
+#endif
+
+                        STARPU_ASSERT_MSG(val < 0,"StarPU Error to truncate file in STDIO full_write function");
+                }
+        }
+	return _starpu_disk_write(STARPU_MAIN_RAM, node, obj, ptr, 0, tmp->size, NULL);
+}
 
 
 /* create a new copy of parameter == base */

+ 11 - 10
src/core/disk_ops/unistd/disk_unistd_global.h

@@ -25,18 +25,19 @@ struct starpu_unistd_global_obj {
 	starpu_pthread_mutex_t mutex;
 };
 
- void * starpu_unistd_global_alloc (struct starpu_unistd_global_obj * obj, void *base, size_t size STARPU_ATTRIBUTE_UNUSED);
- void starpu_unistd_global_free (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, size_t size STARPU_ATTRIBUTE_UNUSED);
- void * starpu_unistd_global_open (struct starpu_unistd_global_obj * obj, void *base, void *pos, size_t size);
- void starpu_unistd_global_close (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, size_t size STARPU_ATTRIBUTE_UNUSED);
- int starpu_unistd_global_read (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void *buf, off_t offset, size_t size, void * async_channel);
- int starpu_unistd_global_write (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, const void *buf, off_t offset, size_t size, void * async_channel);
- void * starpu_unistd_global_plug (void *parameter);
- void starpu_unistd_global_unplug (void *base);
- int get_unistd_global_bandwidth_between_disk_and_main_ram(unsigned node);
+void * starpu_unistd_global_alloc (struct starpu_unistd_global_obj * obj, void *base, size_t size STARPU_ATTRIBUTE_UNUSED);
+void starpu_unistd_global_free (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, size_t size STARPU_ATTRIBUTE_UNUSED);
+void * starpu_unistd_global_open (struct starpu_unistd_global_obj * obj, void *base, void *pos, size_t size);
+void starpu_unistd_global_close (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, size_t size STARPU_ATTRIBUTE_UNUSED);
+int starpu_unistd_global_read (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void *buf, off_t offset, size_t size, void * async_channel);
+int starpu_unistd_global_write (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, const void *buf, off_t offset, size_t size, void * async_channel);
+void * starpu_unistd_global_plug (void *parameter);
+void starpu_unistd_global_unplug (void *base);
+int get_unistd_global_bandwidth_between_disk_and_main_ram(unsigned node);
 int starpu_unistd_global_async_read (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void *buf, off_t offset, size_t size, void * async_channel);
 int starpu_unistd_global_async_write (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void *buf, off_t offset, size_t size, void * async_channel);
 void starpu_unistd_global_wait_request(void * async_channel);
 int starpu_unistd_global_test_request(void * async_channel);
-
+int starpu_unistd_global_full_read(unsigned node, void *base, void * obj, void ** ptr, size_t * size);
+int starpu_unistd_global_full_write (unsigned node, void * base, void * obj, void * ptr, size_t size);
 #endif

+ 8 - 4
src/datawizard/copy_driver.c

@@ -403,12 +403,14 @@ static int copy_data_1_to_1_generic(starpu_data_handle_t handle,
 
 		else
 		{
-			//TODO OBJ
-			void * obj;
+			struct starpu_disk_interface * disk_interface = (struct starpu_disk_interface *) dst_interface; 
+			void * obj = (void *) disk_interface->dev_handle;
 			void * ptr = NULL;
 			starpu_ssize_t size = 0;
 			handle->ops->pack_data(handle, src_node, &ptr, &size);
 			ret = _starpu_disk_full_write(src_node, dst_node, obj, ptr, size);
+			/* ptr is allocated in pack_data */
+			free(ptr);
 		}
 		break;
 		
@@ -417,12 +419,14 @@ static int copy_data_1_to_1_generic(starpu_data_handle_t handle,
 			ret = copy_methods->any_to_any(src_interface, src_node, dst_interface, dst_node, req ? &req->async_channel : NULL);
 		else
 		{
-			//TODO OBJ
-			void * obj;
+			struct starpu_disk_interface * disk_interface = (struct starpu_disk_interface *) src_interface; 
+			void * obj = (void *) disk_interface->dev_handle;
 			void * ptr = NULL;
 			size_t size = 0;
 			ret = _starpu_disk_full_read(src_node, dst_node, obj, &ptr, &size);
 			handle->ops->unpack_data(handle, dst_node, ptr, size); 
+			/* ptr is allocated in full_read */
+			free(ptr);
 		}
 		break;
 

+ 1 - 0
src/datawizard/interfaces/data_interface.h

@@ -34,6 +34,7 @@ union _starpu_interface
 	struct starpu_bcsr_interface bcsr;
 	struct starpu_variable_interface variable;
 	struct starpu_multiformat_interface multiformat;
+	struct starpu_disk_interface disk;
 };
 
 /* Some data interfaces or filters use this interface internally */