Parcourir la source

try to add asynchronous input/output

Corentin Salingue il y a 12 ans
Parent
commit
8899fca875

+ 7 - 4
include/starpu_disk.h

@@ -24,13 +24,16 @@ struct starpu_disk_ops {
 	 void    (*free)   (void *base, void *obj, size_t size);
 	 void *  (*open)   (void *base, void *pos, size_t size);     /* open an existing file */
 	 void    (*close)  (void *base, void *obj, size_t size);
-	ssize_t  (*read)   (void *base, void *obj, void *buf, off_t offset, size_t size, void * async); 
-	ssize_t  (*write)  (void *base, void *obj, const void *buf, off_t offset, size_t size, void * async); 
+	 int     (*read)   (void *base, void *obj, void *buf, off_t offset, size_t size, void * async); 
+	 int     (*write)  (void *base, void *obj, const void *buf, off_t offset, size_t size, void * async); 
+	 int     (*async_write)  (void *base, void *obj, void *buf, off_t offset, size_t size, void * async); 
+	 int     (*async_read)   (void *base, void *obj, void *buf, off_t offset, size_t size, void * async); 
 	/* readv, writev, read2d, write2d, etc. */
 	 void *  (*plug)   (void *parameter);
 	 void    (*unplug) (void *base);
-	  int    (*copy)   (void *base_src, void* obj_src, off_t offset_src,  void *base_dst, void* obj_dst, off_t offset_dst, size_t size);
-	  int    (*bandwidth) (unsigned node);
+	 int    (*copy)   (void *base_src, void* obj_src, off_t offset_src,  void *base_dst, void* obj_dst, off_t offset_dst, size_t size, void * async_channel);
+	 int    (*bandwidth)    (unsigned node);
+	 void   (*wait_request) (void * async_channel);
 };
 
 

+ 55 - 11
src/core/disk.c

@@ -121,30 +121,69 @@ _starpu_disk_free(unsigned node, void *obj, size_t size)
 	disk_register_list[pos]->functions->free(disk_register_list[pos]->base, obj, size);
 }
 
-ssize_t 
-_starpu_disk_read(unsigned node, void *obj, void *buf, off_t offset, size_t size, void * _starpu_aiocb_disk)
+/* src_node == disk node and dst_node == STARPU_MAIN_RAM */
+int 
+_starpu_disk_read(unsigned src_node, unsigned dst_node, void *obj, void *buf, off_t offset, size_t size, void * async_channel)
 {
-	int pos = get_location_with_node(node);
-	return disk_register_list[pos]->functions->read(disk_register_list[pos]->base, obj, buf, offset, size, _starpu_aiocb_disk);
-}
+	int pos = get_location_with_node(src_node);
+        int values = 0;
 
+        if (async_channel != NULL && disk_register_list[pos]->functions->async_read != NULL)
+	{
+		struct _starpu_async_channel * channel = (struct _starpu_async_channel *) async_channel; 
+		channel->event.disk_event.memory_node = src_node;
+		channel->type = STARPU_DISK_RAM;
+	
+		_STARPU_TRACE_START_DRIVER_COPY_ASYNC(src_node, dst_node);
+		values = disk_register_list[pos]->functions->async_read(disk_register_list[pos]->base, obj, buf, offset, size, async_channel);
+		_STARPU_TRACE_END_DRIVER_COPY_ASYNC(src_node, dst_node);
+	}
+	/* asynchronous request failed or synchronous request is asked */	
+	if (async_channel == NULL || values < 0)
+	{
+		
+		disk_register_list[pos]->functions->read(disk_register_list[pos]->base, obj, buf, offset, size, async_channel);
+		return 0;
+	}
+	return -EAGAIN;
+}
 
-ssize_t 
-_starpu_disk_write(unsigned node, void *obj, const void *buf, off_t offset, size_t size, void * _starpu_aiocb_disk)
+/* src_node == STARPU_MAIN_RAM and dst_node == disk node */
+int 
+_starpu_disk_write(unsigned src_node, unsigned dst_node, void *obj, void *buf, off_t offset, size_t size, void * async_channel)
 {
-	int pos = get_location_with_node(node);
-	return disk_register_list[pos]->functions->write(disk_register_list[pos]->base, obj, buf, offset, size, _starpu_aiocb_disk);
+	int pos = get_location_with_node(dst_node);
+        int values = 0;
+
+        if (async_channel != NULL && disk_register_list[pos]->functions->async_write != NULL)
+        {
+        	struct _starpu_async_channel * channel = (struct _starpu_async_channel *) async_channel;
+        	channel->event.disk_event.memory_node = dst_node;
+		channel->type = STARPU_DISK_RAM;
+
+                _STARPU_TRACE_START_DRIVER_COPY_ASYNC(src_node, dst_node);
+		return disk_register_list[pos]->functions->async_write(disk_register_list[pos]->base, obj, buf, offset, size, async_channel);
+        	_STARPU_TRACE_END_DRIVER_COPY_ASYNC(src_node, dst_node);
+        }
+        /* asynchronous request failed or synchronous request is asked */
+        if (async_channel == NULL || values < 0)
+        {
+		return disk_register_list[pos]->functions->write(disk_register_list[pos]->base, obj, buf, offset, size, async_channel);
+        	return 0;
+        }
+        return -EAGAIN;
+
 }
 
 int
-_starpu_disk_copy(unsigned node_src, void* obj_src, off_t offset_src, unsigned node_dst, void* obj_dst, off_t offset_dst, size_t size, void * _starpu_aiocb_disk)
+_starpu_disk_copy(unsigned node_src, void* obj_src, off_t offset_src, unsigned node_dst, void* obj_dst, off_t offset_dst, size_t size, void * async_channel)
 {
 	int pos_src = get_location_with_node(node_src);
 	int pos_dst = get_location_with_node(node_dst);
 	/* both nodes have same copy function */
 	return disk_register_list[pos_src]->functions->copy(disk_register_list[pos_src]->base, obj_src, offset_src, 
 						        disk_register_list[pos_dst]->base, obj_dst, offset_dst,
-							size);
+							size, async_channel);
 }
 
 void * 
@@ -161,6 +200,11 @@ starpu_disk_close(unsigned node, void *obj, size_t size)
 	disk_register_list[position]->functions->close(disk_register_list[position]->base, obj, size);	
 }
 
+void starpu_disk_wait_request(struct _starpu_async_channel *async_channel)
+{
+	int position = get_location_with_node(async_channel->event.disk_event.memory_node);
+	disk_register_list[position]->functions->wait_request((void *) async_channel);
+}
 
 static void 
 add_disk_in_list(unsigned node,  struct starpu_disk_ops * func, void * base)

+ 6 - 4
src/core/disk.h

@@ -27,13 +27,15 @@
 void * _starpu_disk_alloc (unsigned node, size_t size);
 
 void _starpu_disk_free (unsigned node, void *obj, size_t size);
+/* src_node is a disk node, dst_node is for the moment the STARPU_MAIN_RAM */
+int _starpu_disk_read(unsigned src_node, unsigned dst_node, void *obj, void *buf, off_t offset, size_t size, void * async_channel);
+/* src_node is for the moment the STARU_MAIN_RAM, dst_node is a disk node */ 
+int _starpu_disk_write(unsigned src_node, unsigned dst_node, void *obj, void *buf, off_t offset, size_t size, void * async_channel);
 
-ssize_t _starpu_disk_read(unsigned node, void *obj, void *buf, off_t offset, size_t size, void * _starpu_aiocb_disk);
+int _starpu_disk_copy(unsigned node_src, void* obj_src, off_t offset_src, unsigned node_dst, void* obj_dst, off_t offset_dst, size_t size, void * async_channel);
 
-ssize_t _starpu_disk_write(unsigned node, void *obj, const void *buf, off_t offset, size_t size, void * _starpu_aiocb_disk);
-
-int _starpu_disk_copy(unsigned node_src, void* obj_src, off_t offset_src, unsigned node_dst, void* obj_dst, off_t offset_dst, size_t size, void * _starpu_aiocb_disk);
 
+void starpu_disk_wait_request(struct _starpu_async_channel *async_channel);
 /* interface to compare memory disk */
 
 int _starpu_is_same_kind_disk(unsigned node1, unsigned node2);

+ 66 - 12
src/core/disk_ops/disk_stdio.c

@@ -19,10 +19,12 @@
 #include <stdlib.h>
 #include <sys/stat.h>
 #include <sys/time.h>
+#include <aio.h>
 
 #include <starpu.h>
 #include <core/disk.h>
 #include <core/perfmodel/perfmodel.h>
+#include <datawizard/copy_driver.h>
 
 #ifdef STARPU_HAVE_WINDOWS
         #include <io.h>
@@ -192,27 +194,47 @@ starpu_stdio_close (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, size_t size S
 
 
 /* read the memory disk */
-static ssize_t 
-starpu_stdio_read (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void *buf, off_t offset, size_t size, void * _starpu_aiocb_disk)
+static int 
+starpu_stdio_read (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void *buf, off_t offset, size_t size, void * async_channel)
 {
 	struct starpu_stdio_obj * tmp = (struct starpu_stdio_obj *) obj;
-	
+		
 	STARPU_PTHREAD_MUTEX_LOCK(&tmp->mutex);
 
 	int res = fseek(tmp->file, offset, SEEK_SET); 
-		STARPU_ASSERT_MSG(res == 0, "Stdio read failed");
+	STARPU_ASSERT_MSG(res == 0, "Stdio read failed");
 
-		ssize_t nb = fread (buf, 1, size, tmp->file);
+	ssize_t nb = fread (buf, 1, size, tmp->file);
+	STARPU_ASSERT_MSG(nb >= 0, "Stdio read failed");
 
 	STARPU_PTHREAD_MUTEX_UNLOCK(&tmp->mutex);
 
-	return nb;
+	return 0;
 }
 
+static int
+starpu_stdio_async_read (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void *buf, off_t offset, size_t size, void * async_channel)
+{
+	struct starpu_stdio_obj * tmp = (struct starpu_stdio_obj *) obj;
+      
+	struct _starpu_async_channel * channel = (struct _starpu_async_channel *) async_channel;
+        struct aiocb *aiocb = &channel->event.disk_event._starpu_aiocb_disk;
+        
+	memset(aiocb, 0, sizeof(struct aiocb));
+        
+	aiocb->aio_fildes = tmp->descriptor;
+        aiocb->aio_offset = offset;
+	aiocb->aio_nbytes = size;
+        aiocb->aio_buf = buf;
+        aiocb->aio_reqprio = 0;
+        aiocb->aio_lio_opcode = LIO_NOP; 
+
+	return aio_read(aiocb);
+}
 
 /* write on the memory disk */
-static ssize_t 
-starpu_stdio_write (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, const void *buf, off_t offset, size_t size, void * _starpu_aiocb_disk)
+static int 
+starpu_stdio_write (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, const void *buf, off_t offset, size_t size, void * async_channel)
 {
 	struct starpu_stdio_obj * tmp = (struct starpu_stdio_obj *) obj;
 
@@ -228,6 +250,24 @@ starpu_stdio_write (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, const void *b
 	return nb;
 }
 
+static int
+starpu_stdio_async_write (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void *buf, off_t offset, size_t size, void * async_channel)
+{
+        struct starpu_stdio_obj * tmp = (struct starpu_stdio_obj *) obj;
+
+        struct _starpu_async_channel * channel = (struct _starpu_async_channel *) async_channel;
+        struct aiocb *aiocb = &channel->event.disk_event._starpu_aiocb_disk ;
+        memset(aiocb, 0, sizeof(struct aiocb));
+
+        aiocb->aio_fildes = tmp->descriptor;
+        aiocb->aio_offset = offset;
+        aiocb->aio_nbytes = size;
+        aiocb->aio_buf = buf;
+        aiocb->aio_reqprio = 0;
+        aiocb->aio_lio_opcode = LIO_NOP; 
+
+        return aio_write(aiocb);
+}
 
 /* create a new copy of parameter == base */
 static void * 
@@ -272,7 +312,7 @@ get_stdio_bandwidth_between_disk_and_main_ram(unsigned node)
 	gettimeofday(&start, NULL);
 	for (iter = 0; iter < NITER; ++iter)
 	{
-		_starpu_disk_write(node, mem, buf, 0, SIZE_DISK_MIN, NULL);
+		_starpu_disk_write(STARPU_MAIN_RAM, node, mem, buf, 0, SIZE_DISK_MIN, NULL);
 		/* clean cache memory */
 		int res = fflush (tmp->file);
 		STARPU_ASSERT_MSG(res == 0, "Slowness computation failed \n");
@@ -298,7 +338,7 @@ get_stdio_bandwidth_between_disk_and_main_ram(unsigned node)
 	gettimeofday(&start, NULL);
 	for (iter = 0; iter < NITER; ++iter)
 	{
-		_starpu_disk_write(node, mem, buf, rand() % (SIZE_DISK_MIN -1) , 1, NULL);
+		_starpu_disk_write(STARPU_MAIN_RAM, node, mem, buf, rand() % (SIZE_DISK_MIN -1) , 1, NULL);
 
 		int res = fflush (tmp->file);
 		STARPU_ASSERT_MSG(res == 0, "Latency computation failed");
@@ -321,7 +361,18 @@ get_stdio_bandwidth_between_disk_and_main_ram(unsigned node)
 	return 1;
 }
 
-
+static void 
+starpu_stdio_wait_request(void * async_channel)
+{
+	struct _starpu_async_channel * channel = (struct _starpu_async_channel *) async_channel;
+	const struct aiocb * aiocb = &channel->event.disk_event._starpu_aiocb_disk;
+	const struct aiocb * list[1];
+	list[0] = aiocb;
+	int values = -1;
+	while(values < 0)
+		/* Wait the answer of the request TIMESTAMP IS NULL */
+		values = aio_suspend(list, 1, NULL);
+}
 
 struct starpu_disk_ops starpu_disk_stdio_ops = {
 	.alloc = starpu_stdio_alloc,
@@ -329,9 +380,12 @@ struct starpu_disk_ops starpu_disk_stdio_ops = {
 	.open = starpu_stdio_open,
 	.close = starpu_stdio_close,
 	.read = starpu_stdio_read,
+	.async_read = starpu_stdio_async_read,
 	.write = starpu_stdio_write,
+	.async_write = starpu_stdio_async_write,
 	.plug = starpu_stdio_plug,
 	.unplug = starpu_stdio_unplug,
 	.copy = NULL,
-	.bandwidth = get_stdio_bandwidth_between_disk_and_main_ram
+	.bandwidth = get_stdio_bandwidth_between_disk_and_main_ram,
+	.wait_request = starpu_stdio_wait_request
 };

+ 7 - 6
src/core/disk_ops/disk_unistd_o_direct.c

@@ -53,26 +53,26 @@ starpu_unistd_o_direct_open (void *base, void *pos, size_t size)
 
 
 /* read the memory disk */
-static ssize_t 
-starpu_unistd_o_direct_read (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void *buf, off_t offset, size_t size, void * _starpu_aiocb_disk)
+static int 
+starpu_unistd_o_direct_read (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void *buf, off_t offset, size_t size, void * async_channel)
 {
 	STARPU_ASSERT_MSG((size % getpagesize()) == 0, "You can only read a multiple of page size %u Bytes (Here %u)", getpagesize(), (int) size);
 
 	STARPU_ASSERT_MSG((((uintptr_t) buf) % getpagesize()) == 0, "You have to use starpu_malloc function");
 
-	return starpu_unistd_global_read (base, obj, buf, offset, size, _starpu_aiocb_disk);
+	return starpu_unistd_global_read (base, obj, buf, offset, size, async_channel);
 }
 
 
 /* write on the memory disk */
-static ssize_t 
-starpu_unistd_o_direct_write (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, const void *buf, off_t offset, size_t size, void * _starpu_aiocb_disk)
+static int 
+starpu_unistd_o_direct_write (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, const void *buf, off_t offset, size_t size, void * async_channel)
 {
 	STARPU_ASSERT_MSG((size % getpagesize()) == 0, "You can only write a multiple of page size %u Bytes (Here %u)", getpagesize(), (int) size);
 
 	STARPU_ASSERT_MSG((((uintptr_t)buf) % getpagesize()) == 0, "You have to use starpu_malloc function");
 
-	return starpu_unistd_global_write (base, obj, buf, offset, size, _starpu_aiocb_disk);
+	return starpu_unistd_global_write (base, obj, buf, offset, size, async_channel);
 }
 
 
@@ -92,6 +92,7 @@ struct starpu_disk_ops starpu_disk_unistd_o_direct_ops = {
 	.close = starpu_unistd_global_close,
 	.read = starpu_unistd_o_direct_read,
 	.write = starpu_unistd_o_direct_write,
+	.async_write = NULL,
 	.plug = starpu_unistd_o_direct_plug,
 	.unplug = starpu_unistd_global_unplug,
 	.copy = NULL,

+ 7 - 7
src/core/disk_ops/unistd/disk_unistd_global.c

@@ -157,8 +157,8 @@ starpu_unistd_global_close (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, size_
 
 
 /* read the memory disk */
- ssize_t 
-starpu_unistd_global_read (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void *buf, off_t offset, size_t size, void * _starpu_aiocb_disk)
+ int 
+starpu_unistd_global_read (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void *buf, off_t offset, size_t size, void * async_channel)
 {
 	struct starpu_unistd_global_obj * tmp = (struct starpu_unistd_global_obj *) obj;
 
@@ -177,13 +177,13 @@ starpu_unistd_global_read (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void *
 
 
 /* write on the memory disk */
- ssize_t 
-starpu_unistd_global_write (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, const void *buf, off_t offset, size_t size, void * _starpu_aiocb_disk)
+ int 
+starpu_unistd_global_write (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, const void *buf, off_t offset, size_t size, void * async_channel)
 {
 	struct starpu_unistd_global_obj * tmp = (struct starpu_unistd_global_obj *) obj;
 
 	STARPU_PTHREAD_MUTEX_LOCK(&tmp->mutex);
-
+	
 	int res = lseek(tmp->descriptor, offset, SEEK_SET); 
 	STARPU_ASSERT_MSG(res >= 0, "Starpu Disk unistd write failed");
 
@@ -242,7 +242,7 @@ get_unistd_global_bandwidth_between_disk_and_main_ram(unsigned node)
 	gettimeofday(&start, NULL);
 	for (iter = 0; iter < NITER; ++iter)
 	{
-		_starpu_disk_write(node, mem, buf, 0, SIZE_BENCH, NULL);
+		_starpu_disk_write(STARPU_MAIN_RAM, node, mem, buf, 0, SIZE_BENCH, NULL);
 
 #ifdef STARPU_HAVE_WINDOWS
 		res = _commit(tmp->descriptor);
@@ -267,7 +267,7 @@ get_unistd_global_bandwidth_between_disk_and_main_ram(unsigned node)
 	gettimeofday(&start, NULL);
 	for (iter = 0; iter < NITER; ++iter)
 	{
-		_starpu_disk_write(node, mem, buf, rand() % (SIZE_BENCH -1) , getpagesize(), NULL);
+		_starpu_disk_write(STARPU_MAIN_RAM, node, mem, buf, rand() % (SIZE_BENCH -1) , getpagesize(), NULL);
 
 #ifdef STARPU_HAVE_WINDOWS
 		res = _commit(tmp->descriptor);

+ 2 - 2
src/core/disk_ops/unistd/disk_unistd_global.h

@@ -29,8 +29,8 @@ struct starpu_unistd_global_obj {
  void starpu_unistd_global_free (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, size_t size STARPU_ATTRIBUTE_UNUSED);
  void * starpu_unistd_global_open (struct starpu_unistd_global_obj * obj, void *base, void *pos, size_t size);
  void starpu_unistd_global_close (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, size_t size STARPU_ATTRIBUTE_UNUSED);
- ssize_t starpu_unistd_global_read (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void *buf, off_t offset, size_t size, void * _starpu_aiocb_disk);
- ssize_t starpu_unistd_global_write (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, const void *buf, off_t offset, size_t size, void * _starpu_aiocb_disk);
+ int starpu_unistd_global_read (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void *buf, off_t offset, size_t size, void * async_channel);
+ int starpu_unistd_global_write (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, const void *buf, off_t offset, size_t size, void * async_channel);
  void * starpu_unistd_global_plug (void *parameter);
  void starpu_unistd_global_unplug (void *base);
  int get_unistd_global_bandwidth_between_disk_and_main_ram(unsigned node);

+ 0 - 1
src/datawizard/coherency.h

@@ -27,7 +27,6 @@
 #include <common/fxt.h>
 #include <common/list.h>
 
-#include <datawizard/data_request.h>
 #include <datawizard/interfaces/data_interface.h>
 #include <datawizard/datastats.h>
 #include <datawizard/memstats.h>

+ 7 - 5
src/datawizard/copy_driver.c

@@ -24,6 +24,7 @@
 #include <starpu_opencl.h>
 #include <starpu_cuda.h>
 #include <profiling/profiling.h>
+#include <core/disk.h>
 
 #ifdef STARPU_SIMGRID
 #include <core/simgrid.h>
@@ -88,7 +89,7 @@ static unsigned communication_cnt = 0;
 static int copy_data_1_to_1_generic(starpu_data_handle_t handle,
 				    struct _starpu_data_replicate *src_replicate,
 				    struct _starpu_data_replicate *dst_replicate,
-				    struct _starpu_data_request *req STARPU_ATTRIBUTE_UNUSED)
+				    struct _starpu_data_request *req)
 {
 	unsigned src_node = src_replicate->memory_node;
 	unsigned dst_node = dst_replicate->memory_node;
@@ -575,19 +576,19 @@ int starpu_interface_copy(uintptr_t src, size_t src_offset, unsigned src_node, u
 		return _starpu_disk_copy_src_to_disk(
 			(void*) src + src_offset, src_node,
 			(void*) dst, dst_offset, dst_node,
-			size, &async_channel->event._starpu_aiocb_disk);
+			size, async_channel);
 	}
 	case _STARPU_MEMORY_NODE_TUPLE(STARPU_DISK_RAM, STARPU_CPU_RAM):
 		return _starpu_disk_copy_disk_to_src(
 			(void*) src, src_offset, src_node,
 			(void*) dst + dst_offset, dst_node,
-			size, &async_channel->event._starpu_aiocb_disk);
+			size, async_channel);
 
 	case _STARPU_MEMORY_NODE_TUPLE(STARPU_DISK_RAM, STARPU_DISK_RAM):
 		return _starpu_disk_copy_disk_to_disk(
 			(void*) src, src_offset, src_node,
 			(void*) dst, dst_offset, dst_node,
-			size, &async_channel->event._starpu_aiocb_disk);
+			size, async_channel);
 
 	default:
 		STARPU_ABORT();
@@ -646,6 +647,8 @@ void _starpu_driver_wait_request_completion(struct _starpu_async_channel *async_
 		_starpu_mic_wait_request_completion(&(async_channel->event.mic_event));
 		break;
 #endif
+	case STARPU_MAIN_RAM:
+		starpu_disk_wait_request(async_channel);
 	case STARPU_CPU_RAM:
 	default:
 		STARPU_ABORT();
@@ -667,7 +670,6 @@ unsigned _starpu_driver_test_request_completion(struct _starpu_async_channel *as
 #ifdef STARPU_USE_CUDA
 	cudaEvent_t event;
 #endif
-
 	switch (kind)
 	{
 #ifdef STARPU_USE_CUDA

+ 7 - 1
src/datawizard/copy_driver.h

@@ -50,6 +50,12 @@ struct _starpu_mic_async_event
 };
 #endif
 
+struct _starpu_disk_async_event
+{
+        struct aiocb _starpu_aiocb_disk;
+	unsigned memory_node;
+};
+
 /* this is a structure that can be queried to see whether an asynchronous
  * transfer has terminated or not */
 union _starpu_async_channel_event
@@ -71,7 +77,7 @@ union _starpu_async_channel_event
 #ifdef STARPU_USE_MIC
 	struct _starpu_mic_async_event mic_event;
 #endif
-	struct aiocb _starpu_aiocb_disk;
+	struct _starpu_disk_async_event disk_event;
 };
 
 struct _starpu_async_channel

+ 6 - 6
src/drivers/disk/driver_disk.c

@@ -18,30 +18,30 @@
 #include <core/disk.h>
 #include <starpu_profiling.h>
 
-int _starpu_disk_copy_src_to_disk(void * src, unsigned src_node, void * dst, size_t dst_offset, unsigned dst_node, size_t size, void * _starpu_aiocb_disk)
+int _starpu_disk_copy_src_to_disk(void * src, unsigned src_node, void * dst, size_t dst_offset, unsigned dst_node, size_t size, void * async_channel)
 {
 	STARPU_ASSERT(starpu_node_get_kind(src_node) == STARPU_CPU_RAM);
 	
-	_starpu_disk_write(dst_node, dst, src, dst_offset, size, _starpu_aiocb_disk);
+	_starpu_disk_write(src_node, dst_node, dst, src, dst_offset, size, async_channel);
 	return 0;
 }
 
 
-int _starpu_disk_copy_disk_to_src(void * src, size_t src_offset, unsigned src_node, void * dst, unsigned dst_node, size_t size, void * _starpu_aiocb_disk)
+int _starpu_disk_copy_disk_to_src(void * src, size_t src_offset, unsigned src_node, void * dst, unsigned dst_node, size_t size, void * async_channel)
 {
 	STARPU_ASSERT(starpu_node_get_kind(dst_node) == STARPU_CPU_RAM);
 
-	_starpu_disk_read(src_node, src, dst, src_offset, size, _starpu_aiocb_disk);
+	_starpu_disk_read(src_node, dst_node, src, dst, src_offset, size, async_channel);
 	return 0;
 }
 
 
-int _starpu_disk_copy_disk_to_disk(void * src, size_t src_offset, unsigned src_node, void * dst, size_t dst_offset, unsigned dst_node, size_t size, void * _starpu_aiocb_disk)
+int _starpu_disk_copy_disk_to_disk(void * src, size_t src_offset, unsigned src_node, void * dst, size_t dst_offset, unsigned dst_node, size_t size, void * async_channel)
 {
 	STARPU_ASSERT(starpu_node_get_kind(src_node) == STARPU_DISK_RAM && starpu_node_get_kind(dst_node) == STARPU_DISK_RAM);
 
        return _starpu_disk_copy(src_node, src, src_offset, 
 			       dst_node, dst, dst_offset,
-			       size, _starpu_aiocb_disk); 
+			       size, async_channel); 
 
 }

+ 3 - 3
src/drivers/disk/driver_disk.h

@@ -17,10 +17,10 @@
 #ifndef __DRIVER_DISK_H__
 #define __DRIVER_DISK_H__
 
-int _starpu_disk_copy_src_to_disk(void * src, unsigned src_node, void * dst, size_t dst_offset, unsigned dst_node, size_t size, void * _starpu_aiocb_disk);
+int _starpu_disk_copy_src_to_disk(void * src, unsigned src_node, void * dst, size_t dst_offset, unsigned dst_node, size_t size, void * async_channel);
 
-int _starpu_disk_copy_disk_to_src(void * src, size_t src_offset, unsigned src_node, void * dst, unsigned dst_node, size_t size, void * _starpu_aiocb_disk);
+int _starpu_disk_copy_disk_to_src(void * src, size_t src_offset, unsigned src_node, void * dst, unsigned dst_node, size_t size, void * async_channel);
 
-int _starpu_disk_copy_disk_to_disk(void * src, size_t src_offset, unsigned src_node, void * dst, size_t dst_offset, unsigned dst_node, size_t size, void * _starpu_aiocb_disk);
+int _starpu_disk_copy_disk_to_disk(void * src, size_t src_offset, unsigned src_node, void * dst, size_t dst_offset, unsigned dst_node, size_t size, void * async_channel);
 
 #endif

+ 2 - 2
tests/disk/disk_compute.c

@@ -65,7 +65,7 @@ int main(int argc, char **argv)
 
 
 	/* register a disk */
-	int new_dd = starpu_disk_register(&starpu_disk_unistd_ops, (void *) base, 1024*1024*1);
+	int new_dd = starpu_disk_register(&starpu_disk_stdio_ops, (void *) base, 1024*1024*1);
 	/* can't write on /tmp/ */
 	if (new_dd == -ENOENT) goto enoent;
 	
@@ -166,7 +166,7 @@ int main(int argc, char **argv)
 	for (j = 0; j < NX; ++j)
 		if (A[j] != C[j])
 		{
-			printf("Fail A %d != C %d \n", A[j], C[j]);
+//			printf("Fail A %d != C %d \n", A[j], C[j]);
 			try = 0;
 		}