Browse Source

merge out-of-core async support

Samuel Thibault 12 years ago
parent
commit
99eaef0508

+ 5 - 0
include/starpu_data_interfaces.h

@@ -400,6 +400,11 @@ size_t starpu_data_get_size(starpu_data_handle_t handle);
 
 starpu_data_handle_t starpu_data_lookup(const void *ptr);
 
+struct starpu_disk_interface
+{
+	uintptr_t dev_handle;
+};
+
 #ifdef __cplusplus
 }
 #endif

+ 19 - 12
include/starpu_disk.h

@@ -18,18 +18,25 @@
 #ifndef __STARPU_DISK_H__
 #define __STARPU_DISK_H__
 
-struct starpu_disk_ops
-{
-	void *  (*alloc)  (void *base, size_t size);
-	void    (*free)   (void *base, void *obj, size_t size);
-	void *  (*open)   (void *base, void *pos, size_t size);
-	void    (*close)  (void *base, void *obj, size_t size);
-	ssize_t  (*read)   (void *base, void *obj, void *buf, off_t offset, size_t size);
-	ssize_t  (*write)  (void *base, void *obj, const void *buf, off_t offset, size_t size);
-	void *  (*plug)   (void *parameter);
-	void    (*unplug) (void *base);
-	int    (*copy)   (void *base_src, void* obj_src, off_t offset_src,  void *base_dst, void* obj_dst, off_t offset_dst, size_t size);
-	int    (*bandwidth) (unsigned node);
+/* list of functions to use on disk */
+struct starpu_disk_ops {
+ 	 void *  (*alloc)  (void *base, size_t size);
+	 void    (*free)   (void *base, void *obj, size_t size);
+	 void *  (*open)   (void *base, void *pos, size_t size);     /* open an existing file */
+	 void    (*close)  (void *base, void *obj, size_t size);
+	 int     (*read)   (void *base, void *obj, void *buf, off_t offset, size_t size, void * async); 
+	 int     (*write)  (void *base, void *obj, const void *buf, off_t offset, size_t size, void * async); 
+	 int     (*async_write)  (void *base, void *obj, void *buf, off_t offset, size_t size, void * async); 
+	 int     (*async_read)   (void *base, void *obj, void *buf, off_t offset, size_t size, void * async); 
+	/* readv, writev, read2d, write2d, etc. */
+	 void *  (*plug)   (void *parameter);
+	 void    (*unplug) (void *base);
+	 int    (*copy)   (void *base_src, void* obj_src, off_t offset_src,  void *base_dst, void* obj_dst, off_t offset_dst, size_t size, void * async_channel);
+	 int    (*bandwidth)    (unsigned node);
+	 void   (*wait_request) (void * async_channel);
+	 int    (*test_request) (void * async_channel);
+	 int	(*full_read)    (unsigned node, void * base, void * obj, void ** ptr, size_t * size);
+	 int 	(*full_write)   (unsigned node, void * base, void * obj, void * ptr, size_t size);
 };
 
 /* Posix functions to use disk memory */

+ 82 - 10
src/core/disk.c

@@ -121,30 +121,86 @@ _starpu_disk_free(unsigned node, void *obj, size_t size)
 	disk_register_list[pos]->functions->free(disk_register_list[pos]->base, obj, size);
 }
 
-ssize_t 
-_starpu_disk_read(unsigned node, void *obj, void *buf, off_t offset, size_t size)
+/* src_node == disk node and dst_node == STARPU_MAIN_RAM */
+int 
+_starpu_disk_read(unsigned src_node, unsigned dst_node STARPU_ATTRIBUTE_UNUSED, void *obj, void *buf, off_t offset, size_t size, void * async_channel)
 {
-	int pos = get_location_with_node(node);
-	return disk_register_list[pos]->functions->read(disk_register_list[pos]->base, obj, buf, offset, size);
+	int pos = get_location_with_node(src_node);
+        int values = -1;
+
+        if (async_channel != NULL)
+	{
+		struct _starpu_async_channel * channel = (struct _starpu_async_channel *) async_channel; 
+		channel->type = STARPU_DISK_RAM;
+		channel->event.disk_event.memory_node = src_node;	
+	
+		if (disk_register_list[pos]->functions->async_read != NULL)
+		{
+			_STARPU_TRACE_START_DRIVER_COPY_ASYNC(src_node, dst_node);
+			values = disk_register_list[pos]->functions->async_read(disk_register_list[pos]->base, obj, buf, offset, size, async_channel);
+			_STARPU_TRACE_END_DRIVER_COPY_ASYNC(src_node, dst_node);
+		}
+	}
+	/* asynchronous request failed or synchronous request is asked */	
+	if (async_channel == NULL || values < 0)
+	{
+		disk_register_list[pos]->functions->read(disk_register_list[pos]->base, obj, buf, offset, size, async_channel);
+		return 0;
+	}
+	return -EAGAIN;
 }
 
+int _starpu_disk_full_read(unsigned src_node, unsigned dst_node STARPU_ATTRIBUTE_UNUSED, void * obj, void ** ptr, size_t * size)
+{
+	int pos = get_location_with_node(src_node);
+        return disk_register_list[pos]->functions->full_read(src_node, disk_register_list[pos]->base, obj, ptr, size);
+}
 
-ssize_t 
-_starpu_disk_write(unsigned node, void *obj, const void *buf, off_t offset, size_t size)
+int _starpu_disk_full_write(unsigned src_node STARPU_ATTRIBUTE_UNUSED, unsigned dst_node, void * obj, void * ptr, size_t size)
 {
-	int pos = get_location_with_node(node);
-	return disk_register_list[pos]->functions->write(disk_register_list[pos]->base, obj, buf, offset, size);
+	int pos = get_location_with_node(dst_node);
+        return disk_register_list[pos]->functions->full_write(dst_node, disk_register_list[pos]->base, obj, ptr, size);
+}
+
+/* src_node == STARPU_MAIN_RAM and dst_node == disk node */
+int 
+_starpu_disk_write(unsigned src_node STARPU_ATTRIBUTE_UNUSED, unsigned dst_node, void *obj, void *buf, off_t offset, size_t size, void * async_channel)
+{
+	int pos = get_location_with_node(dst_node);
+        int values = -1;
+
+        if (async_channel != NULL)
+        {
+        	struct _starpu_async_channel * channel = (struct _starpu_async_channel *) async_channel;
+		channel->type = STARPU_DISK_RAM;
+		channel->event.disk_event.memory_node = dst_node;
+
+		if (disk_register_list[pos]->functions->async_read != NULL)
+                {
+			 _STARPU_TRACE_START_DRIVER_COPY_ASYNC(src_node, dst_node);
+			values = disk_register_list[pos]->functions->async_write(disk_register_list[pos]->base, obj, buf, offset, size, async_channel);
+        		_STARPU_TRACE_END_DRIVER_COPY_ASYNC(src_node, dst_node);
+		}
+        }
+        /* asynchronous request failed or synchronous request is asked */
+        if (async_channel == NULL || values < 0)
+        {
+		disk_register_list[pos]->functions->write(disk_register_list[pos]->base, obj, buf, offset, size, async_channel);
+        	return 0;
+        }
+        return -EAGAIN;
+
 }
 
 int
-_starpu_disk_copy(unsigned node_src, void* obj_src, off_t offset_src, unsigned node_dst, void* obj_dst, off_t offset_dst, size_t size)
+_starpu_disk_copy(unsigned node_src, void* obj_src, off_t offset_src, unsigned node_dst, void* obj_dst, off_t offset_dst, size_t size, void * async_channel)
 {
 	int pos_src = get_location_with_node(node_src);
 	int pos_dst = get_location_with_node(node_dst);
 	/* both nodes have same copy function */
 	return disk_register_list[pos_src]->functions->copy(disk_register_list[pos_src]->base, obj_src, offset_src, 
 						        disk_register_list[pos_dst]->base, obj_dst, offset_dst,
-							size);
+							size, async_channel);
 }
 
 void * 
@@ -161,6 +217,22 @@ starpu_disk_close(unsigned node, void *obj, size_t size)
 	disk_register_list[position]->functions->close(disk_register_list[position]->base, obj, size);	
 }
 
+void starpu_disk_wait_request(struct _starpu_async_channel *async_channel)
+{
+	int position = get_location_with_node(async_channel->event.disk_event.memory_node);
+	disk_register_list[position]->functions->wait_request((void *) async_channel);
+}
+
+int starpu_disk_test_request(struct _starpu_async_channel *async_channel)
+{
+	int position = get_location_with_node(async_channel->event.disk_event.memory_node);
+	if (disk_register_list[position]->functions->test_request != NULL && 
+	    disk_register_list[position]->functions->async_write != NULL &&
+	    disk_register_list[position]->functions->async_read != NULL)
+		return disk_register_list[position]->functions->test_request((void *) async_channel);
+	/* one of these functions is not defined => sync mode*/
+	return 1; 
+}	
 
 static void 
 add_disk_in_list(unsigned node,  struct starpu_disk_ops * func, void * base)

+ 11 - 4
src/core/disk.h

@@ -27,13 +27,20 @@
 void * _starpu_disk_alloc (unsigned node, size_t size);
 
 void _starpu_disk_free (unsigned node, void *obj, size_t size);
+/* src_node is a disk node, dst_node is for the moment the STARPU_MAIN_RAM */
+int _starpu_disk_read(unsigned src_node, unsigned dst_node, void *obj, void *buf, off_t offset, size_t size, void * async_channel);
+/* src_node is for the moment the STARU_MAIN_RAM, dst_node is a disk node */ 
+int _starpu_disk_write(unsigned src_node, unsigned dst_node, void *obj, void *buf, off_t offset, size_t size, void * async_channel);
 
-ssize_t _starpu_disk_read(unsigned node, void *obj, void *buf, off_t offset, size_t size);
+int _starpu_disk_full_read(unsigned src_node, unsigned dst_node, void * obj, void ** ptr, size_t * size);
+int _starpu_disk_full_write(unsigned src_node, unsigned dst_node, void * obj, void * ptr, size_t size);
 
-ssize_t _starpu_disk_write(unsigned node, void *obj, const void *buf, off_t offset, size_t size);
-
-int _starpu_disk_copy(unsigned node_src, void* obj_src, off_t offset_src, unsigned node_dst, void* obj_dst, off_t offset_dst, size_t size);
+int _starpu_disk_copy(unsigned node_src, void* obj_src, off_t offset_src, unsigned node_dst, void* obj_dst, off_t offset_dst, size_t size, void * async_channel);
 
+/* force the request to compute */
+void starpu_disk_wait_request(struct _starpu_async_channel *async_channel);
+/* return 1 if the request is finished, 0 if not finished */
+int starpu_disk_test_request(struct _starpu_async_channel *async_channel);
 /* interface to compare memory disk */
 
 int _starpu_is_same_kind_disk(unsigned node1, unsigned node2);

+ 155 - 8
src/core/disk_ops/disk_stdio.c

@@ -19,10 +19,15 @@
 #include <stdlib.h>
 #include <sys/stat.h>
 #include <sys/time.h>
+#include <aio.h>
+#include <errno.h>
+#include <time.h>
 
 #include <starpu.h>
 #include <core/disk.h>
 #include <core/perfmodel/perfmodel.h>
+#include <datawizard/copy_driver.h>
+#include <datawizard/memory_manager.h>
 
 #ifdef STARPU_HAVE_WINDOWS
         #include <io.h>
@@ -37,6 +42,7 @@ struct starpu_stdio_obj {
 	FILE * file;
 	char * path;
 	double size;
+	starpu_pthread_mutex_t mutex;
 };
 
 
@@ -99,6 +105,8 @@ starpu_stdio_alloc (void *base, size_t size)
 		return NULL;
 	}
 
+	STARPU_PTHREAD_MUTEX_INIT(&obj->mutex, NULL);
+
 	obj->descriptor = id;
 	obj->file = f;
 	obj->path = baseCpy;
@@ -114,6 +122,8 @@ starpu_stdio_free (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, size_t size ST
 {
 	struct starpu_stdio_obj * tmp = (struct starpu_stdio_obj *) obj;
 
+	STARPU_PTHREAD_MUTEX_DESTROY(&tmp->mutex);
+
 	unlink(tmp->path);
 	fclose(tmp->file);
 	close(tmp->descriptor);
@@ -153,6 +163,8 @@ starpu_stdio_open (void *base, void *pos, size_t size)
 		return NULL;
 	}
 
+	STARPU_PTHREAD_MUTEX_INIT(&obj->mutex, NULL);
+
 	obj->descriptor = id;
 	obj->file = f;
 	obj->path = baseCpy;
@@ -169,6 +181,8 @@ starpu_stdio_close (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, size_t size S
 {
 	struct starpu_stdio_obj * tmp = (struct starpu_stdio_obj *) obj;
 
+	STARPU_PTHREAD_MUTEX_DESTROY(&tmp->mutex);
+
 	fclose(tmp->file);
 	close(tmp->descriptor);
 	free(tmp->path);
@@ -177,34 +191,119 @@ starpu_stdio_close (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, size_t size S
 
 
 /* read the memory disk */
-static ssize_t 
-starpu_stdio_read (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void *buf, off_t offset, size_t size)
+static int 
+starpu_stdio_read (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void *buf, off_t offset, size_t size, void * async_channel STARPU_ATTRIBUTE_UNUSED)
 {
 	struct starpu_stdio_obj * tmp = (struct starpu_stdio_obj *) obj;
+		
+	STARPU_PTHREAD_MUTEX_LOCK(&tmp->mutex);
 
 	int res = fseek(tmp->file, offset, SEEK_SET); 
 	STARPU_ASSERT_MSG(res == 0, "Stdio read failed");
 
 	ssize_t nb = fread (buf, 1, size, tmp->file);
+	STARPU_ASSERT_MSG(nb >= 0, "Stdio read failed");
 
-	return nb;
+	STARPU_PTHREAD_MUTEX_UNLOCK(&tmp->mutex);
+
+	return 0;
 }
 
+static int
+starpu_stdio_async_read (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void *buf, off_t offset, size_t size, void * async_channel)
+{
+	struct starpu_stdio_obj * tmp = (struct starpu_stdio_obj *) obj;
+      
+	struct _starpu_async_channel * channel = (struct _starpu_async_channel *) async_channel;
+        struct aiocb *aiocb = &channel->event.disk_event._starpu_aiocb_disk;
+        
+	memset(aiocb, 0, sizeof(struct aiocb));
+        
+	aiocb->aio_fildes = tmp->descriptor;
+        aiocb->aio_offset = offset;
+	aiocb->aio_nbytes = size;
+        aiocb->aio_buf = buf;
+        aiocb->aio_reqprio = 0;
+        aiocb->aio_lio_opcode = LIO_NOP; 
+
+	return aio_read(aiocb);
+}
+
+static int
+starpu_stdio_full_read(unsigned node, void *base STARPU_ATTRIBUTE_UNUSED, void * obj, void ** ptr, size_t * size)
+{
+	struct starpu_stdio_obj * tmp = (struct starpu_stdio_obj *) obj;
+
+	*size = tmp->size;
+	*ptr = malloc(*size);
+	return _starpu_disk_read(node, STARPU_MAIN_RAM, obj, *ptr, 0, *size, NULL);
+}
 
 /* write on the memory disk */
-static ssize_t 
-starpu_stdio_write (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, const void *buf, off_t offset, size_t size)
+static int 
+starpu_stdio_write (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, const void *buf, off_t offset, size_t size, void * async_channel STARPU_ATTRIBUTE_UNUSED)
 {
 	struct starpu_stdio_obj * tmp = (struct starpu_stdio_obj *) obj;
 
+	STARPU_PTHREAD_MUTEX_LOCK(&tmp->mutex);
+
 	int res = fseek(tmp->file, offset, SEEK_SET); 
 	STARPU_ASSERT_MSG(res == 0, "Stdio write failed");
 
 	ssize_t nb = fwrite (buf, 1, size, tmp->file);
 
+	STARPU_PTHREAD_MUTEX_UNLOCK(&tmp->mutex);
+
 	return nb;
 }
 
+static int
+starpu_stdio_async_write (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void *buf, off_t offset, size_t size, void * async_channel)
+{
+        struct starpu_stdio_obj * tmp = (struct starpu_stdio_obj *) obj;
+
+        struct _starpu_async_channel * channel = (struct _starpu_async_channel *) async_channel;
+        struct aiocb *aiocb = &channel->event.disk_event._starpu_aiocb_disk ;
+        memset(aiocb, 0, sizeof(struct aiocb));
+
+        aiocb->aio_fildes = tmp->descriptor;
+        aiocb->aio_offset = offset;
+        aiocb->aio_nbytes = size;
+        aiocb->aio_buf = buf;
+        aiocb->aio_reqprio = 0;
+        aiocb->aio_lio_opcode = LIO_NOP; 
+
+        return aio_write(aiocb);
+}
+
+static int
+starpu_stdio_full_write (unsigned node, void * base STARPU_ATTRIBUTE_UNUSED, void * obj, void * ptr, size_t size)
+{
+	struct starpu_stdio_obj * tmp = (struct starpu_stdio_obj *) obj;
+	
+	/* update file size to realise the next good full_read */
+	if(size != tmp->size)
+	{
+		_starpu_memory_manager_deallocate_size(tmp->size, node);
+		if (_starpu_memory_manager_can_allocate_size(size, node))
+		{
+#ifdef STARPU_HAVE_WINDOWS
+			int val = _chsize(tmp->descriptor, size);
+#else
+			int val = ftruncate(tmp->descriptor,size);
+#endif
+
+			STARPU_ASSERT_MSG(val >= 0,"StarPU Error to truncate file in STDIO full_write function");
+			tmp->size = size;
+		}
+		else
+		{
+			STARPU_ASSERT_MSG(0, "Can't allocate size %u on the disk !", (int) size); 
+		}
+	}	
+	return _starpu_disk_write(STARPU_MAIN_RAM, node, obj, ptr, 0, tmp->size, NULL);
+}
+
 
 /* create a new copy of parameter == base */
 static void * 
@@ -249,7 +348,7 @@ get_stdio_bandwidth_between_disk_and_main_ram(unsigned node)
 	gettimeofday(&start, NULL);
 	for (iter = 0; iter < NITER; ++iter)
 	{
-		_starpu_disk_write(node, mem, buf, 0, SIZE_DISK_MIN);
+		_starpu_disk_write(STARPU_MAIN_RAM, node, mem, buf, 0, SIZE_DISK_MIN, NULL);
 		/* clean cache memory */
 		int res = fflush (tmp->file);
 		STARPU_ASSERT_MSG(res == 0, "Slowness computation failed \n");
@@ -275,7 +374,7 @@ get_stdio_bandwidth_between_disk_and_main_ram(unsigned node)
 	gettimeofday(&start, NULL);
 	for (iter = 0; iter < NITER; ++iter)
 	{
-		_starpu_disk_write(node, mem, buf, rand() % (SIZE_DISK_MIN -1) , 1);
+		_starpu_disk_write(STARPU_MAIN_RAM, node, mem, buf, rand() % (SIZE_DISK_MIN -1) , 1, NULL);
 
 		int res = fflush (tmp->file);
 		STARPU_ASSERT_MSG(res == 0, "Latency computation failed");
@@ -298,7 +397,49 @@ get_stdio_bandwidth_between_disk_and_main_ram(unsigned node)
 	return 1;
 }
 
+static void 
+starpu_stdio_wait_request(void * async_channel)
+{
+	struct _starpu_async_channel * channel = (struct _starpu_async_channel *) async_channel;
+	const struct aiocb * aiocb = &channel->event.disk_event._starpu_aiocb_disk;
+	const struct aiocb * list[1];
+	list[0] = aiocb;
+	int values = -1;
+	int error_disk = EAGAIN;
+	while(values < 0 || error_disk == EAGAIN)
+	{
+		/* Wait the answer of the request TIMESTAMP IS NULL */
+		values = aio_suspend(list, 1, NULL);
+		error_disk = errno;
+	}
+}
 
+static int
+starpu_stdio_test_request(void * async_channel)
+{
+	struct timespec time_wait_request;
+	time_wait_request.tv_sec = 0;
+	time_wait_request.tv_nsec = 0;
+
+        struct _starpu_async_channel * channel = (struct _starpu_async_channel *) async_channel;
+        const struct aiocb * aiocb = &channel->event.disk_event._starpu_aiocb_disk;
+        const struct aiocb * list[1];
+        list[0] = aiocb;
+        int values = -1;
+        int error_disk = EAGAIN;
+        
+	/* Wait the answer of the request */
+        values = aio_suspend(list, 1, &time_wait_request);
+        error_disk = errno;
+	/* request is finished */
+	if (values == 0)
+		return 1;
+	/* values == -1 */
+	if (error_disk == EAGAIN)
+		return 0;
+	/* an error occured */
+	STARPU_ABORT();	
+}
 
 struct starpu_disk_ops starpu_disk_stdio_ops = {
 	.alloc = starpu_stdio_alloc,
@@ -306,9 +447,15 @@ struct starpu_disk_ops starpu_disk_stdio_ops = {
 	.open = starpu_stdio_open,
 	.close = starpu_stdio_close,
 	.read = starpu_stdio_read,
+	.async_read = starpu_stdio_async_read,
 	.write = starpu_stdio_write,
+	.async_write = starpu_stdio_async_write,
 	.plug = starpu_stdio_plug,
 	.unplug = starpu_stdio_unplug,
 	.copy = NULL,
-	.bandwidth = get_stdio_bandwidth_between_disk_and_main_ram
+	.bandwidth = get_stdio_bandwidth_between_disk_and_main_ram,
+	.wait_request = starpu_stdio_wait_request,
+	.test_request = starpu_stdio_test_request,
+	.full_read = starpu_stdio_full_read,
+	.full_write = starpu_stdio_full_write
 };

+ 7 - 1
src/core/disk_ops/disk_unistd.c

@@ -61,5 +61,11 @@ struct starpu_disk_ops starpu_disk_unistd_ops = {
 	.plug = starpu_unistd_global_plug,
 	.unplug = starpu_unistd_global_unplug,
 	.copy = NULL,
-	.bandwidth = get_unistd_global_bandwidth_between_disk_and_main_ram
+	.bandwidth = get_unistd_global_bandwidth_between_disk_and_main_ram,
+	.async_read = starpu_unistd_global_async_read,
+	.async_write = starpu_unistd_global_async_write,
+	.wait_request = starpu_unistd_global_wait_request,
+	.test_request = starpu_unistd_global_test_request,
+        .full_read = starpu_unistd_global_full_read,
+        .full_write = starpu_unistd_global_full_write
 };

+ 13 - 7
src/core/disk_ops/disk_unistd_o_direct.c

@@ -53,26 +53,26 @@ starpu_unistd_o_direct_open (void *base, void *pos, size_t size)
 
 
 /* read the memory disk */
-static ssize_t 
-starpu_unistd_o_direct_read (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void *buf, off_t offset, size_t size)
+static int 
+starpu_unistd_o_direct_read (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void *buf, off_t offset, size_t size, void * async_channel)
 {
 	STARPU_ASSERT_MSG((size % getpagesize()) == 0, "You can only read a multiple of page size %u Bytes (Here %u)", getpagesize(), (int) size);
 
 	STARPU_ASSERT_MSG((((uintptr_t) buf) % getpagesize()) == 0, "You have to use starpu_malloc function");
 
-	return starpu_unistd_global_read (base, obj, buf, offset, size);
+	return starpu_unistd_global_read (base, obj, buf, offset, size, async_channel);
 }
 
 
 /* write on the memory disk */
-static ssize_t 
-starpu_unistd_o_direct_write (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, const void *buf, off_t offset, size_t size)
+static int 
+starpu_unistd_o_direct_write (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, const void *buf, off_t offset, size_t size, void * async_channel)
 {
 	STARPU_ASSERT_MSG((size % getpagesize()) == 0, "You can only write a multiple of page size %u Bytes (Here %u)", getpagesize(), (int) size);
 
 	STARPU_ASSERT_MSG((((uintptr_t)buf) % getpagesize()) == 0, "You have to use starpu_malloc function");
 
-	return starpu_unistd_global_write (base, obj, buf, offset, size);
+	return starpu_unistd_global_write (base, obj, buf, offset, size, async_channel);
 }
 
 
@@ -95,5 +95,11 @@ struct starpu_disk_ops starpu_disk_unistd_o_direct_ops = {
 	.plug = starpu_unistd_o_direct_plug,
 	.unplug = starpu_unistd_global_unplug,
 	.copy = NULL,
-	.bandwidth = get_unistd_global_bandwidth_between_disk_and_main_ram
+	.bandwidth = get_unistd_global_bandwidth_between_disk_and_main_ram,
+        .async_read = starpu_unistd_global_async_read,
+        .async_write = starpu_unistd_global_async_write,
+        .wait_request = starpu_unistd_global_wait_request,
+        .test_request = starpu_unistd_global_test_request,
+	.full_read = starpu_unistd_global_full_read,
+	.full_write = starpu_unistd_global_full_write
 };

+ 150 - 6
src/core/disk_ops/unistd/disk_unistd_global.c

@@ -20,11 +20,15 @@
 #include <sys/stat.h>
 #include <sys/time.h>
 #include <stdint.h>
+#include <aio.h>
+#include <errno.h>
 
 #include <starpu.h>
 #include <core/disk.h>
 #include <core/perfmodel/perfmodel.h>
 #include <core/disk_ops/unistd/disk_unistd_global.h>
+#include <datawizard/copy_driver.h>
+#include <datawizard/memory_manager.h>
 
 #ifdef STARPU_HAVE_WINDOWS
         #include <io.h>
@@ -83,6 +87,8 @@ starpu_unistd_global_alloc (struct starpu_unistd_global_obj * obj, void *base, s
 		return NULL;
 	}
 
+	STARPU_PTHREAD_MUTEX_INIT(&obj->mutex, NULL);
+
 	obj->descriptor = id;
 	obj->path = baseCpy;
 	obj->size = size;
@@ -97,6 +103,8 @@ starpu_unistd_global_free (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, size_t
 {
 	struct starpu_unistd_global_obj * tmp = (struct starpu_unistd_global_obj *) obj;
 
+	STARPU_PTHREAD_MUTEX_DESTROY(&tmp->mutex);
+
 	unlink(tmp->path);
 	close(tmp->descriptor);
 
@@ -128,6 +136,8 @@ starpu_unistd_global_open (struct starpu_unistd_global_obj * obj, void *base, vo
 		return NULL;
 	}
 
+	STARPU_PTHREAD_MUTEX_INIT(&obj->mutex, NULL);
+
 	obj->descriptor = id;
 	obj->path = baseCpy;
 	obj->size = size;
@@ -143,6 +153,8 @@ starpu_unistd_global_close (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, size_
 {
 	struct starpu_unistd_global_obj * tmp = (struct starpu_unistd_global_obj *) obj;
 
+	STARPU_PTHREAD_MUTEX_DESTROY(&tmp->mutex);
+
 	close(tmp->descriptor);
 	free(tmp->path);
 	free(tmp);	
@@ -150,37 +162,125 @@ starpu_unistd_global_close (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, size_
 
 
 /* read the memory disk */
- ssize_t 
-starpu_unistd_global_read (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void *buf, off_t offset, size_t size)
+ int 
+starpu_unistd_global_read (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void *buf, off_t offset, size_t size, void * async_channel STARPU_ATTRIBUTE_UNUSED)
 {
 	struct starpu_unistd_global_obj * tmp = (struct starpu_unistd_global_obj *) obj;
 
+	STARPU_PTHREAD_MUTEX_LOCK(&tmp->mutex);
+
 	int res = lseek(tmp->descriptor, offset, SEEK_SET); 
 	STARPU_ASSERT_MSG(res >= 0, "Starpu Disk unistd lseek for read failed: offset %lu got errno %d", (unsigned long) offset, errno);
 
 	ssize_t nb = read(tmp->descriptor, buf, size);
 	STARPU_ASSERT_MSG(res >= 0, "Starpu Disk unistd read failed: size %lu got errno %d", (unsigned long) size, errno);
 	
+	STARPU_PTHREAD_MUTEX_UNLOCK(&tmp->mutex);
+
 	return nb;
 }
 
 
+int
+starpu_unistd_global_async_read (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void *buf, off_t offset, size_t size, void * async_channel)
+{
+        struct starpu_unistd_global_obj * tmp = (struct starpu_unistd_global_obj *) obj;
+
+        struct _starpu_async_channel * channel = (struct _starpu_async_channel *) async_channel;
+        struct aiocb *aiocb = &channel->event.disk_event._starpu_aiocb_disk;
+
+        memset(aiocb, 0, sizeof(struct aiocb));
+
+        aiocb->aio_fildes = tmp->descriptor;
+        aiocb->aio_offset = offset;
+        aiocb->aio_nbytes = size;
+        aiocb->aio_buf = buf;
+        aiocb->aio_reqprio = 0;
+        aiocb->aio_lio_opcode = LIO_NOP;
+
+        return aio_read(aiocb);
+}
+
+int
+starpu_unistd_global_full_read(unsigned node, void *base STARPU_ATTRIBUTE_UNUSED, void * obj, void ** ptr, size_t * size)
+{
+        struct starpu_unistd_global_obj * tmp = (struct starpu_unistd_global_obj *) obj;
+
+        *size = tmp->size;
+        *ptr = malloc(*size);
+	return _starpu_disk_read(node, STARPU_MAIN_RAM, obj, *ptr, 0, *size, NULL);
+}
+
+
 /* write on the memory disk */
- ssize_t 
-starpu_unistd_global_write (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, const void *buf, off_t offset, size_t size)
+ int 
+starpu_unistd_global_write (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, const void *buf, off_t offset, size_t size, void * async_channel STARPU_ATTRIBUTE_UNUSED)
 {
 	struct starpu_unistd_global_obj * tmp = (struct starpu_unistd_global_obj *) obj;
 
+	STARPU_PTHREAD_MUTEX_LOCK(&tmp->mutex);
+	
 	int res = lseek(tmp->descriptor, offset, SEEK_SET); 
 	STARPU_ASSERT_MSG(res >= 0, "Starpu Disk unistd lseek for write failed: offset %lu got errno %d", (unsigned long) offset, errno);
 
 	ssize_t nb = write (tmp->descriptor, buf, size);
 	STARPU_ASSERT_MSG(res >= 0, "Starpu Disk unistd write failed: size %lu got errno %d", (unsigned long) size, errno);
 
+	STARPU_PTHREAD_MUTEX_UNLOCK(&tmp->mutex);
+
 	return nb;
 }
 
 
+int
+starpu_unistd_global_async_write (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void *buf, off_t offset, size_t size, void * async_channel)
+{
+        struct starpu_unistd_global_obj * tmp = (struct starpu_unistd_global_obj *) obj;
+
+        struct _starpu_async_channel * channel = (struct _starpu_async_channel *) async_channel;
+        struct aiocb *aiocb = &channel->event.disk_event._starpu_aiocb_disk ;
+        memset(aiocb, 0, sizeof(struct aiocb));
+
+        aiocb->aio_fildes = tmp->descriptor;
+        aiocb->aio_offset = offset;
+        aiocb->aio_nbytes = size;
+        aiocb->aio_buf = buf;
+        aiocb->aio_reqprio = 0;
+        aiocb->aio_lio_opcode = LIO_NOP;
+
+        return aio_write(aiocb);
+}
+
+int
+starpu_unistd_global_full_write (unsigned node, void * base STARPU_ATTRIBUTE_UNUSED, void * obj, void * ptr, size_t size)
+{
+        struct starpu_unistd_global_obj * tmp = (struct starpu_unistd_global_obj *) obj;
+
+        /* update file size to realise the next good full_read */
+        if(size != tmp->size)
+        {
+                _starpu_memory_manager_deallocate_size(tmp->size, node);
+                if (_starpu_memory_manager_can_allocate_size(size, node))
+                {
+#ifdef STARPU_HAVE_WINDOWS
+                        int val = _chsize(tmp->descriptor, size);
+#else
+                        int val = ftruncate(tmp->descriptor,size);
+#endif
+
+                        STARPU_ASSERT_MSG(val >= 0,"StarPU Error to truncate file in UNISTD full_write function");
+			tmp->size = size;
+                }
+                else
+                {
+                        STARPU_ASSERT_MSG(0, "Can't allocate size %u on the disk !", (int) size);
+                }
+
+        }
+	return _starpu_disk_write(STARPU_MAIN_RAM, node, obj, ptr, 0, tmp->size, NULL);
+}
+
+
 /* create a new copy of parameter == base */
  void * 
 starpu_unistd_global_plug (void *parameter)
@@ -227,7 +327,7 @@ get_unistd_global_bandwidth_between_disk_and_main_ram(unsigned node)
 	gettimeofday(&start, NULL);
 	for (iter = 0; iter < NITER; ++iter)
 	{
-		_starpu_disk_write(node, mem, buf, 0, SIZE_BENCH);
+		_starpu_disk_write(STARPU_MAIN_RAM, node, mem, buf, 0, SIZE_BENCH, NULL);
 
 #ifdef STARPU_HAVE_WINDOWS
 		res = _commit(tmp->descriptor);
@@ -252,7 +352,7 @@ get_unistd_global_bandwidth_between_disk_and_main_ram(unsigned node)
 	gettimeofday(&start, NULL);
 	for (iter = 0; iter < NITER; ++iter)
 	{
-		_starpu_disk_write(node, mem, buf, rand() % (SIZE_BENCH -1) , getpagesize());
+		_starpu_disk_write(STARPU_MAIN_RAM, node, mem, buf, rand() % (SIZE_BENCH -1) , getpagesize(), NULL);
 
 #ifdef STARPU_HAVE_WINDOWS
 		res = _commit(tmp->descriptor);
@@ -272,3 +372,47 @@ get_unistd_global_bandwidth_between_disk_and_main_ram(unsigned node)
 					       timing_latency/NITER, timing_latency/NITER, node);
 	return 1;
 }
+
+void
+starpu_unistd_global_wait_request(void * async_channel)
+{
+        struct _starpu_async_channel * channel = (struct _starpu_async_channel *) async_channel;
+        const struct aiocb * aiocb = &channel->event.disk_event._starpu_aiocb_disk;
+        const struct aiocb * list[1];
+        list[0] = aiocb;
+        int values = -1;
+        int error_disk = EAGAIN;
+        while(values < 0 || error_disk == EAGAIN)
+        {
+                /* Wait the answer of the request TIMESTAMP IS NULL */
+                values = aio_suspend(list, 1, NULL);
+                error_disk = errno;
+        }
+}
+
+int
+starpu_unistd_global_test_request(void * async_channel)
+{
+        struct timespec time_wait_request;
+        time_wait_request.tv_sec = 0;
+        time_wait_request.tv_nsec = 0;
+
+        struct _starpu_async_channel * channel = (struct _starpu_async_channel *) async_channel;
+        const struct aiocb * aiocb = &channel->event.disk_event._starpu_aiocb_disk;
+        const struct aiocb * list[1];
+        list[0] = aiocb;
+        int values = -1;
+        int error_disk = EAGAIN;
+
+        /* Wait the answer of the request */
+        values = aio_suspend(list, 1, &time_wait_request);
+        error_disk = errno;
+        /* request is finished */
+        if (values == 0)
+                return 1;
+        /* values == -1 */
+        if (error_disk == EAGAIN)
+                return 0;
+        /* an error occured */
+        STARPU_ABORT();
+}

+ 16 - 10
src/core/disk_ops/unistd/disk_unistd_global.h

@@ -22,16 +22,22 @@ struct starpu_unistd_global_obj {
         char * path;
         double size;
 	int flags;
+	starpu_pthread_mutex_t mutex;
 };
 
- void * starpu_unistd_global_alloc (struct starpu_unistd_global_obj * obj, void *base, size_t size STARPU_ATTRIBUTE_UNUSED);
- void starpu_unistd_global_free (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, size_t size STARPU_ATTRIBUTE_UNUSED);
- void * starpu_unistd_global_open (struct starpu_unistd_global_obj * obj, void *base, void *pos, size_t size);
- void starpu_unistd_global_close (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, size_t size STARPU_ATTRIBUTE_UNUSED);
- ssize_t starpu_unistd_global_read (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void *buf, off_t offset, size_t size);
- ssize_t starpu_unistd_global_write (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, const void *buf, off_t offset, size_t size);
- void * starpu_unistd_global_plug (void *parameter);
- void starpu_unistd_global_unplug (void *base);
- int get_unistd_global_bandwidth_between_disk_and_main_ram(unsigned node);
-
+void * starpu_unistd_global_alloc (struct starpu_unistd_global_obj * obj, void *base, size_t size STARPU_ATTRIBUTE_UNUSED);
+void starpu_unistd_global_free (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, size_t size STARPU_ATTRIBUTE_UNUSED);
+void * starpu_unistd_global_open (struct starpu_unistd_global_obj * obj, void *base, void *pos, size_t size);
+void starpu_unistd_global_close (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, size_t size STARPU_ATTRIBUTE_UNUSED);
+int starpu_unistd_global_read (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void *buf, off_t offset, size_t size, void * async_channel);
+int starpu_unistd_global_write (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, const void *buf, off_t offset, size_t size, void * async_channel);
+void * starpu_unistd_global_plug (void *parameter);
+void starpu_unistd_global_unplug (void *base);
+int get_unistd_global_bandwidth_between_disk_and_main_ram(unsigned node);
+int starpu_unistd_global_async_read (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void *buf, off_t offset, size_t size, void * async_channel);
+int starpu_unistd_global_async_write (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void *buf, off_t offset, size_t size, void * async_channel);
+void starpu_unistd_global_wait_request(void * async_channel);
+int starpu_unistd_global_test_request(void * async_channel);
+int starpu_unistd_global_full_read(unsigned node, void *base, void * obj, void ** ptr, size_t * size);
+int starpu_unistd_global_full_write (unsigned node, void * base, void * obj, void * ptr, size_t size);
 #endif

+ 1 - 1
src/datawizard/coherency.c

@@ -36,9 +36,9 @@ unsigned _starpu_select_src_node(starpu_data_handle_t handle, unsigned destinati
 	/* first find a valid copy, either a STARPU_OWNER or a STARPU_SHARED */
 	unsigned node;
 
-	unsigned src_node_mask = 0;
 	size_t size = _starpu_data_get_size(handle);
 	double cost = INFINITY;
+	unsigned src_node_mask = 0;
 
 	for (node = 0; node < nnodes; node++)
 	{

+ 0 - 1
src/datawizard/coherency.h

@@ -27,7 +27,6 @@
 #include <common/fxt.h>
 #include <common/list.h>
 
-#include <datawizard/data_request.h>
 #include <datawizard/interfaces/data_interface.h>
 #include <datawizard/datastats.h>
 #include <datawizard/memstats.h>

+ 39 - 9
src/datawizard/copy_driver.c

@@ -27,6 +27,7 @@
 #include <starpu_opencl.h>
 #include <starpu_cuda.h>
 #include <profiling/profiling.h>
+#include <core/disk.h>
 
 #ifdef STARPU_SIMGRID
 #include <core/simgrid.h>
@@ -91,7 +92,7 @@ static unsigned communication_cnt = 0;
 static int copy_data_1_to_1_generic(starpu_data_handle_t handle,
 				    struct _starpu_data_replicate *src_replicate,
 				    struct _starpu_data_replicate *dst_replicate,
-				    struct _starpu_data_request *req STARPU_ATTRIBUTE_UNUSED)
+				    struct _starpu_data_request *req)
 {
 	unsigned src_node = src_replicate->memory_node;
 	unsigned dst_node = dst_replicate->memory_node;
@@ -400,15 +401,40 @@ static int copy_data_1_to_1_generic(starpu_data_handle_t handle,
 #endif
 
 	case _STARPU_MEMORY_NODE_TUPLE(STARPU_CPU_RAM,STARPU_DISK_RAM):
-		copy_methods->any_to_any(src_interface, src_node, dst_interface, dst_node, NULL);
+		if(copy_methods->any_to_any)
+			ret = copy_methods->any_to_any(src_interface, src_node, dst_interface, dst_node, req ? &req->async_channel : NULL);
+
+		else
+		{
+			struct starpu_disk_interface * disk_interface = (struct starpu_disk_interface *) dst_interface; 
+			void * obj = (void *) disk_interface->dev_handle;
+			void * ptr = NULL;
+			starpu_ssize_t size = 0;
+			handle->ops->pack_data(handle, src_node, &ptr, &size);
+			ret = _starpu_disk_full_write(src_node, dst_node, obj, ptr, size);
+			/* ptr is allocated in pack_data */
+			free(ptr);
+		}
 		break;
 		
 	case _STARPU_MEMORY_NODE_TUPLE(STARPU_DISK_RAM,STARPU_CPU_RAM):
-		copy_methods->any_to_any(src_interface, src_node, dst_interface, dst_node, NULL);
+		if(copy_methods->any_to_any) 
+			ret = copy_methods->any_to_any(src_interface, src_node, dst_interface, dst_node, req ? &req->async_channel : NULL);
+		else
+		{
+			struct starpu_disk_interface * disk_interface = (struct starpu_disk_interface *) src_interface; 
+			void * obj = (void *) disk_interface->dev_handle;
+			void * ptr = NULL;
+			size_t size = 0;
+			ret = _starpu_disk_full_read(src_node, dst_node, obj, &ptr, &size);
+			handle->ops->unpack_data(handle, dst_node, ptr, size); 
+			/* ptr is allocated in full_read */
+			free(ptr);
+		}
 		break;
 
 	case _STARPU_MEMORY_NODE_TUPLE(STARPU_DISK_RAM,STARPU_DISK_RAM):	
-		copy_methods->any_to_any(src_interface, src_node, dst_interface, dst_node, NULL);
+		ret = copy_methods->any_to_any(src_interface, src_node, dst_interface, dst_node, req ? &req->async_channel : NULL);
 		break;
 		
 	default:
@@ -485,9 +511,8 @@ int STARPU_ATTRIBUTE_WARN_UNUSED_RESULT _starpu_driver_copy_data_1_to_1(starpu_d
  */
 int starpu_interface_copy(uintptr_t src, size_t src_offset, unsigned src_node, uintptr_t dst, size_t dst_offset, unsigned dst_node, size_t size, void *async_data)
 {
-#if defined(STARPU_USE_CUDA) || defined(STARPU_USE_OPENCL)
 	struct _starpu_async_channel *async_channel = async_data;
-#endif
+
 	enum starpu_node_kind src_kind = starpu_node_get_kind(src_node);
 	enum starpu_node_kind dst_kind = starpu_node_get_kind(dst_node);
 
@@ -579,19 +604,19 @@ int starpu_interface_copy(uintptr_t src, size_t src_offset, unsigned src_node, u
 		return _starpu_disk_copy_src_to_disk(
 			(void*) src + src_offset, src_node,
 			(void*) dst, dst_offset, dst_node,
-			size);
+			size, async_channel);
 	}
 	case _STARPU_MEMORY_NODE_TUPLE(STARPU_DISK_RAM, STARPU_CPU_RAM):
 		return _starpu_disk_copy_disk_to_src(
 			(void*) src, src_offset, src_node,
 			(void*) dst + dst_offset, dst_node,
-			size);
+			size, async_channel);
 
 	case _STARPU_MEMORY_NODE_TUPLE(STARPU_DISK_RAM, STARPU_DISK_RAM):
 		return _starpu_disk_copy_disk_to_disk(
 			(void*) src, src_offset, src_node,
 			(void*) dst, dst_offset, dst_node,
-			size);
+			size, async_channel);
 
 	default:
 		STARPU_ABORT();
@@ -650,6 +675,8 @@ void _starpu_driver_wait_request_completion(struct _starpu_async_channel *async_
 		_starpu_mic_wait_request_completion(&(async_channel->event.mic_event));
 		break;
 #endif
+	case STARPU_MAIN_RAM:
+		starpu_disk_wait_request(async_channel);
 	case STARPU_CPU_RAM:
 	default:
 		STARPU_ABORT();
@@ -706,6 +733,9 @@ unsigned _starpu_driver_test_request_completion(struct _starpu_async_channel *as
 		success = _starpu_mic_request_is_complete(&(async_channel->event.mic_event));
 		break;
 #endif
+	case STARPU_DISK_RAM:
+		success = starpu_disk_test_request(async_channel);
+		break;
 	case STARPU_CPU_RAM:
 	default:
 		STARPU_ABORT();

+ 9 - 0
src/datawizard/copy_driver.h

@@ -18,6 +18,8 @@
 #ifndef __COPY_DRIVER_H__
 #define __COPY_DRIVER_H__
 
+#include <aio.h>
+
 #include <common/config.h>
 #include <datawizard/memory_nodes.h>
 #include "coherency.h"
@@ -48,6 +50,12 @@ struct _starpu_mic_async_event
 };
 #endif
 
+struct _starpu_disk_async_event
+{
+        struct aiocb _starpu_aiocb_disk;
+	unsigned memory_node;
+};
+
 /* this is a structure that can be queried to see whether an asynchronous
  * transfer has terminated or not */
 union _starpu_async_channel_event
@@ -69,6 +77,7 @@ union _starpu_async_channel_event
 #ifdef STARPU_USE_MIC
 	struct _starpu_mic_async_event mic_event;
 #endif
+	struct _starpu_disk_async_event disk_event;
 };
 
 struct _starpu_async_channel

+ 1 - 0
src/datawizard/interfaces/data_interface.h

@@ -34,6 +34,7 @@ union _starpu_interface
 	struct starpu_bcsr_interface bcsr;
 	struct starpu_variable_interface variable;
 	struct starpu_multiformat_interface multiformat;
+	struct starpu_disk_interface disk;
 };
 
 /* Some data interfaces or filters use this interface internally */

+ 7 - 8
src/drivers/disk/driver_disk.c

@@ -18,30 +18,29 @@
 #include <core/disk.h>
 #include <starpu_profiling.h>
 
-int _starpu_disk_copy_src_to_disk(void * src, unsigned src_node, void * dst, size_t dst_offset, unsigned dst_node, size_t size)
+int _starpu_disk_copy_src_to_disk(void * src, unsigned src_node, void * dst, size_t dst_offset, unsigned dst_node, size_t size, void * async_channel)
 {
 	STARPU_ASSERT(starpu_node_get_kind(src_node) == STARPU_CPU_RAM);
 	
-	_starpu_disk_write(dst_node, dst, src, dst_offset, size);
-	return 0;
+	return _starpu_disk_write(src_node, dst_node, dst, src, dst_offset, size, async_channel);
+	
 }
 
 
-int _starpu_disk_copy_disk_to_src(void * src, size_t src_offset, unsigned src_node, void * dst, unsigned dst_node, size_t size)
+int _starpu_disk_copy_disk_to_src(void * src, size_t src_offset, unsigned src_node, void * dst, unsigned dst_node, size_t size, void * async_channel)
 {
 	STARPU_ASSERT(starpu_node_get_kind(dst_node) == STARPU_CPU_RAM);
 
-	_starpu_disk_read(src_node, src, dst, src_offset, size);
-	return 0;
+	return _starpu_disk_read(src_node, dst_node, src, dst, src_offset, size, async_channel);
 }
 
 
-int _starpu_disk_copy_disk_to_disk(void * src, size_t src_offset, unsigned src_node, void * dst, size_t dst_offset, unsigned dst_node, size_t size)
+int _starpu_disk_copy_disk_to_disk(void * src, size_t src_offset, unsigned src_node, void * dst, size_t dst_offset, unsigned dst_node, size_t size, void * async_channel)
 {
 	STARPU_ASSERT(starpu_node_get_kind(src_node) == STARPU_DISK_RAM && starpu_node_get_kind(dst_node) == STARPU_DISK_RAM);
 
        return _starpu_disk_copy(src_node, src, src_offset, 
 			       dst_node, dst, dst_offset,
-			       size); 
+			       size, async_channel); 
 
 }

+ 3 - 3
src/drivers/disk/driver_disk.h

@@ -17,10 +17,10 @@
 #ifndef __DRIVER_DISK_H__
 #define __DRIVER_DISK_H__
 
-int _starpu_disk_copy_src_to_disk(void * src, unsigned src_node, void * dst, size_t dst_offset, unsigned dst_node, size_t size);
+int _starpu_disk_copy_src_to_disk(void * src, unsigned src_node, void * dst, size_t dst_offset, unsigned dst_node, size_t size, void * async_channel);
 
-int _starpu_disk_copy_disk_to_src(void * src, size_t src_offset, unsigned src_node, void * dst, unsigned dst_node, size_t size);
+int _starpu_disk_copy_disk_to_src(void * src, size_t src_offset, unsigned src_node, void * dst, unsigned dst_node, size_t size, void * async_channel);
 
-int _starpu_disk_copy_disk_to_disk(void * src, size_t src_offset, unsigned src_node, void * dst, size_t dst_offset, unsigned dst_node, size_t size);
+int _starpu_disk_copy_disk_to_disk(void * src, size_t src_offset, unsigned src_node, void * dst, size_t dst_offset, unsigned dst_node, size_t size, void * async_channel);
 
 #endif

+ 1 - 1
tests/disk/disk_compute.c

@@ -65,7 +65,7 @@ int main(int argc, char **argv)
 
 
 	/* register a disk */
-	int new_dd = starpu_disk_register(&starpu_disk_unistd_ops, (void *) base, 1024*1024*1);
+	int new_dd = starpu_disk_register(&starpu_disk_stdio_ops, (void *) base, 1024*1024*1);
 	/* can't write on /tmp/ */
 	if (new_dd == -ENOENT) goto enoent;