浏览代码

Improve the disk documentation, and clean the interface a bit

Samuel Thibault 10 年之前
父节点
当前提交
ae1892fa7f

+ 15 - 1
doc/doxygen/chapters/15out_of_core.doxy

@@ -12,6 +12,20 @@ When using StarPU, one may need to store more data than what the main memory
 (RAM) can store. This part describes the method to add a new memory node on a
 disk and to use it.
 
+The principle is that one first registers a disk location, seen by StarPU as
+a void*, which can be for instance a Unix path for the stdio or unistd case,
+or a database file path for a leveldb case, etc. The disk backend opens this
+place with the plug method.
+
+If the disk backend provides an alloc method, StarPU can then start using it
+to allocate room and store data there with the write method, without user
+intervention.
+
+The user can also use starpu_disk_open to explicitly open an object within the
+disk, e.g. a file name in the stdio or unistd cases, or a database key in the
+leveldb case, and then use starpu_*_register functions to turn it into a StarPU
+data handle. StarPU will then automatically read and write data as appropriate.
+
 \section UseANewDiskMemory Use a new disk memory
 
 To use a disk memory node, you have to register it with this function:
@@ -32,7 +46,7 @@ take some time.
 <strong>Warning: the size thus has to be at least 1 MB!</strong> 
 
 StarPU will automatically try to evict unused data to this new disk. One can
-also use the standard StarPU node API, see the \ref API_Standard_Memory_Library
+also use the standard StarPU memory node API, see the \ref API_Standard_Memory_Library
 and the \ref API_Data_Interfaces .
 
 The disk is unregistered during the starpu_shutdown().

+ 71 - 24
doc/doxygen/chapters/api/data_out_of_core.doxy

@@ -10,42 +10,89 @@
 \struct starpu_disk_ops
 \ingroup API_Out_Of_Core
 This is a set of functions to manipulate datas on disk.
-\var starpu_disk_ops::alloc
-Create a new location for datas
-\var starpu_disk_ops::free
-Free an allocated data
-\var starpu_disk_ops::open
-Open an existing location of datas
-\var starpu_disk_ops::close
-Close without delete a location of datas
-\var starpu_disk_ops::read
-Read a data
-\var starpu_disk_ops::write
-Write a data
-\var starpu_disk_ops::plug
-Connect a disk memory
-\var starpu_disk_ops::unplug
-Disconnect a disk memory
-\var starpu_disk_ops::copy
-Copy disk to disk
-\var starpu_disk_ops::bandwidth
-Measue the bandwidth and the latency for the disk
+
+\var void* (*starpu_disk_ops::alloc)(void *base, size_t size)
+Create a new location for datas of size \p size. This returns an opaque object pointer.
+
+\var void (*starpu_disk_ops::free)(void *base, void *obj, size_t size)
+Free a data \p obj previously allocated with \c alloc.
+
+\var void* (*starpu_disk_ops::open)(void *base, void *pos, size_t size)
+Open an existing location of datas, at a specific position \p pos dependent on the backend.
+
+\var void (*starpu_disk_ops::close)(void *base, void *obj, size_t size)
+Close, without deleting it, a location of datas \p obj.
+
+\var int (*starpu_disk_ops::read)(void *base, void *obj, void *buf, off_t offset, size_t size)
+Read \p size bytes of data from \p obj in \p base, at offset \p offset, and put
+into \p buf. Returns the actual number of read bytes.
+
+\var int (*starpu_disk_ops::write)(void *base, void *obj, const void *buf, off_t offset, size_t size)
+Write \p size bytes of data to \p obj in \p base, at offset \p offset, from \p buf. Returns 0 on success.
+
+\var void* (*starpu_disk_ops::plug) (void *parameters)
+Connect a disk memory at location \p parameter, and return a base as void*,
+which will be passed by StarPU to all other methods.
+
+\var void (*starpu_disk_ops::unplug) (void* base)
+Disconnect a disk memory \p base.
+
+\var void* (*starpu_disk_ops::async_read)(void *base, void *obj, void *buf, off_t offset, size_t size)
+Asynchronously read \p size bytes of data from \p obj in \p base, at offset \p
+offset, and put into \p buf. Returns a void* pointer that StarPU will pass to \c
+*_request methods for testing for the completion.
+
+\var void* (*starpu_disk_ops::async_write)(void *base, void *obj, const void *buf, off_t offset, size_t size)
+Asynchronously write \p size bytes of data to \p obj in \p base, at offset \p
+offset, from \p buf. Returns a void* pointer that StarPU will pass to \c
+*_request methods for testing for the completion.
+
+\var void* (*starpu_disk_ops::copy)(void *base_src, void* obj_src, off_t offset_src,  void *base_dst, void* obj_dst, off_t offset_dst, size_t size)
+Copy from offset \p offset_src of disk object \p obj_src in \p base_src to
+offset \p offset_dst of disk object \p obj_dst in \p base_dst. Returns a void*
+pointer that StarPU will pass to \c *_request methods for testing for the
+completion.
+
+\var int (*starpu_disk_ops::bandwidth) (unsigned node)
+Measure the bandwidth and the latency for the disk \p node and save it. Returns
+1 if it could measure it.
+
+\var void (*starpu_disk_ops::wait_request)(void *async_channel)
+Wait for completion of request \p async_channel returned by a previous
+asynchronous read, write or copy.
+
+\var void (*starpu_disk_ops::test_request)(void *async_channel)
+Test for completion of request \p async_channel returned by a previous
+asynchronous read, write or copy. Returns 1 on completion, 0 otherwise.
+
+\var void (*starpu_disk_ops::free_request)(void *async_channel)
+Free the request allocated by a previous asynchronous read, write or copy.
+
+\var int (*starpu_disk_ops::full_read)(void * base, void * obj, void ** ptr, size_t * size)
+Read all data from \p obj of \p base, from offset 0. Returns it in an allocated buffer \p ptr, of size \p size
+
+\var int (*starpu_disk_ops::full_write)(void * base, void * obj, void * ptr, size_t size)
+Write data in \p ptr to \p obj of \p base, from offset 0, and truncate \p obj to
+\p size, so that a \c full_read will get it.
 
 \fn int starpu_disk_register(struct starpu_disk_ops *func, void *parameter, size_t size)
 \ingroup API_Out_Of_Core
-Register a disk memory node with a set of functions to manipulate datas. <br />
+Register a disk memory node with a set of functions to manipulate datas. The \c
+plug member of \p func will be passed \p parameter, and return a \c base which will be passed to all \p func methods. <br />
 SUCCESS: return the disk node. <br />
 FAIL: return an error code. <br />
 The \p size must be at least 1 MB !
 
 \fn void *starpu_disk_open(unsigned node, void *pos, size_t size)
 \ingroup API_Out_Of_Core
-Add an existing file memory in a disk node. The \p pos is defined in the starpu_disk_ops. \p size: this is a size of your file.
-\p pos is the name of the file.
+Open an existing file memory in a disk node. \p size: this is a size of your
+file. \p pos is specific position dependent on the backend, given to the \c open
+method of the disk operations. This returns an opaque object pointer.
 
 \fn void starpu_disk_close(unsigned node, void *obj, size_t size)
 \ingroup API_Out_Of_Core
-Close an existing file memory opened with starpu_disk_open.
+Close an existing data opened with starpu_disk_open.
+
 
 \var starpu_disk_stdio_ops
 \ingroup API_Out_Of_Core

+ 4 - 3
include/starpu_disk.h

@@ -28,15 +28,16 @@ struct starpu_disk_ops {
 	 void    (*close)  (void *base, void *obj, size_t size);
 	 int     (*read)   (void *base, void *obj, void *buf, off_t offset, size_t size);
 	 int     (*write)  (void *base, void *obj, const void *buf, off_t offset, size_t size);
-	 int     (*async_write)  (void *base, void *obj, void *buf, off_t offset, size_t size, void * async); 
-	 int     (*async_read)   (void *base, void *obj, void *buf, off_t offset, size_t size, void * async); 
+	 void *  (*async_write)  (void *base, void *obj, void *buf, off_t offset, size_t size); 
+	 void *  (*async_read)   (void *base, void *obj, void *buf, off_t offset, size_t size); 
 	/* readv, writev, read2d, write2d, etc. */
 	 void *  (*plug)   (void *parameter);
 	 void    (*unplug) (void *base);
-	 int    (*copy)   (void *base_src, void* obj_src, off_t offset_src,  void *base_dst, void* obj_dst, off_t offset_dst, size_t size, void * async_channel);
+	 void *  (*copy)   (void *base_src, void* obj_src, off_t offset_src,  void *base_dst, void* obj_dst, off_t offset_dst, size_t size);
 	 int    (*bandwidth)    (unsigned node);
 	 void   (*wait_request) (void * async_channel);
 	 int    (*test_request) (void * async_channel);
+	 void   (*free_request)(void * async_channel);
 	 int	(*full_read)    (unsigned node, void * base, void * obj, void ** ptr, size_t * size);
 	 int 	(*full_write)   (unsigned node, void * base, void * obj, void * ptr, size_t size);
 };

+ 52 - 45
src/core/disk.c

@@ -123,26 +123,26 @@ _starpu_disk_free(unsigned node, void *obj, size_t size)
 
 /* src_node == disk node and dst_node == STARPU_MAIN_RAM */
 int 
-_starpu_disk_read(unsigned src_node, unsigned dst_node STARPU_ATTRIBUTE_UNUSED, void *obj, void *buf, off_t offset, size_t size, void * async_channel)
+_starpu_disk_read(unsigned src_node, unsigned dst_node STARPU_ATTRIBUTE_UNUSED, void *obj, void *buf, off_t offset, size_t size, struct _starpu_async_channel * channel)
 {
 	int pos = get_location_with_node(src_node);
-        int values = -1;
 
-        if (async_channel != NULL)
+        if (channel != NULL)
 	{
-		struct _starpu_async_channel * channel = (struct _starpu_async_channel *) async_channel; 
-		channel->type = STARPU_DISK_RAM;
-		channel->event.disk_event.memory_node = src_node;	
-	
-		if (disk_register_list[pos]->functions->async_read != NULL)
+		if (disk_register_list[pos]->functions->async_read == NULL)
+			channel = NULL;
+		else
 		{
+			channel->type = STARPU_DISK_RAM;
+			channel->event.memory_node = src_node;	
+
 			_STARPU_TRACE_START_DRIVER_COPY_ASYNC(src_node, dst_node);
-			values = disk_register_list[pos]->functions->async_read(disk_register_list[pos]->base, obj, buf, offset, size, async_channel);
+			channel->event.disk_event = disk_register_list[pos]->functions->async_read(disk_register_list[pos]->base, obj, buf, offset, size);
 			_STARPU_TRACE_END_DRIVER_COPY_ASYNC(src_node, dst_node);
 		}
 	}
 	/* asynchronous request failed or synchronous request is asked */	
-	if (async_channel == NULL || values < 0)
+	if (channel == NULL || !channel->event.disk_event)
 	{
 		disk_register_list[pos]->functions->read(disk_register_list[pos]->base, obj, buf, offset, size);
 		return 0;
@@ -150,40 +150,28 @@ _starpu_disk_read(unsigned src_node, unsigned dst_node STARPU_ATTRIBUTE_UNUSED,
 	return -EAGAIN;
 }
 
-int _starpu_disk_full_read(unsigned src_node, unsigned dst_node STARPU_ATTRIBUTE_UNUSED, void * obj, void ** ptr, size_t * size)
-{
-	int pos = get_location_with_node(src_node);
-        return disk_register_list[pos]->functions->full_read(src_node, disk_register_list[pos]->base, obj, ptr, size);
-}
-
-int _starpu_disk_full_write(unsigned src_node STARPU_ATTRIBUTE_UNUSED, unsigned dst_node, void * obj, void * ptr, size_t size)
-{
-	int pos = get_location_with_node(dst_node);
-        return disk_register_list[pos]->functions->full_write(dst_node, disk_register_list[pos]->base, obj, ptr, size);
-}
-
 /* src_node == STARPU_MAIN_RAM and dst_node == disk node */
 int 
-_starpu_disk_write(unsigned src_node STARPU_ATTRIBUTE_UNUSED, unsigned dst_node, void *obj, void *buf, off_t offset, size_t size, void * async_channel)
+_starpu_disk_write(unsigned src_node STARPU_ATTRIBUTE_UNUSED, unsigned dst_node, void *obj, void *buf, off_t offset, size_t size, struct _starpu_async_channel * channel)
 {
 	int pos = get_location_with_node(dst_node);
-        int values = -1;
 
-        if (async_channel != NULL)
+        if (channel != NULL)
         {
-        	struct _starpu_async_channel * channel = (struct _starpu_async_channel *) async_channel;
-		channel->type = STARPU_DISK_RAM;
-		channel->event.disk_event.memory_node = dst_node;
-
-		if (disk_register_list[pos]->functions->async_read != NULL)
+		if (disk_register_list[pos]->functions->async_write == NULL)
+			channel = NULL;
+		else
                 {
-			 _STARPU_TRACE_START_DRIVER_COPY_ASYNC(src_node, dst_node);
-			values = disk_register_list[pos]->functions->async_write(disk_register_list[pos]->base, obj, buf, offset, size, async_channel);
+			channel->type = STARPU_DISK_RAM;
+			channel->event.memory_node = dst_node;
+
+			_STARPU_TRACE_START_DRIVER_COPY_ASYNC(src_node, dst_node);
+			channel->event.disk_event = disk_register_list[pos]->functions->async_write(disk_register_list[pos]->base, obj, buf, offset, size);
         		_STARPU_TRACE_END_DRIVER_COPY_ASYNC(src_node, dst_node);
 		}
         }
         /* asynchronous request failed or synchronous request is asked */
-        if (async_channel == NULL || values < 0)
+	if (channel == NULL || !channel->event.disk_event)
         {
 		disk_register_list[pos]->functions->write(disk_register_list[pos]->base, obj, buf, offset, size);
         	return 0;
@@ -193,14 +181,28 @@ _starpu_disk_write(unsigned src_node STARPU_ATTRIBUTE_UNUSED, unsigned dst_node,
 }
 
 int
-_starpu_disk_copy(unsigned node_src, void* obj_src, off_t offset_src, unsigned node_dst, void* obj_dst, off_t offset_dst, size_t size, void * async_channel)
+_starpu_disk_copy(unsigned node_src, void* obj_src, off_t offset_src, unsigned node_dst, void* obj_dst, off_t offset_dst, size_t size, struct _starpu_async_channel* channel)
 {
 	int pos_src = get_location_with_node(node_src);
 	int pos_dst = get_location_with_node(node_dst);
 	/* both nodes have same copy function */
-	return disk_register_list[pos_src]->functions->copy(disk_register_list[pos_src]->base, obj_src, offset_src, 
+	channel->event.disk_event = disk_register_list[pos_src]->functions->copy(disk_register_list[pos_src]->base, obj_src, offset_src, 
 						        disk_register_list[pos_dst]->base, obj_dst, offset_dst,
-							size, async_channel);
+							size);
+	STARPU_ASSERT(channel->event.disk_event);
+	return -EAGAIN;
+}
+
+int _starpu_disk_full_read(unsigned src_node, unsigned dst_node STARPU_ATTRIBUTE_UNUSED, void * obj, void ** ptr, size_t * size)
+{
+	int pos = get_location_with_node(src_node);
+        return disk_register_list[pos]->functions->full_read(src_node, disk_register_list[pos]->base, obj, ptr, size);
+}
+
+int _starpu_disk_full_write(unsigned src_node STARPU_ATTRIBUTE_UNUSED, unsigned dst_node, void * obj, void * ptr, size_t size)
+{
+	int pos = get_location_with_node(dst_node);
+        return disk_register_list[pos]->functions->full_write(dst_node, disk_register_list[pos]->base, obj, ptr, size);
 }
 
 void * 
@@ -219,21 +221,23 @@ starpu_disk_close(unsigned node, void *obj, size_t size)
 
 void starpu_disk_wait_request(struct _starpu_async_channel *async_channel)
 {
-	int position = get_location_with_node(async_channel->event.disk_event.memory_node);
-	disk_register_list[position]->functions->wait_request((void *) async_channel);
+	int position = get_location_with_node(async_channel->event.memory_node);
+	disk_register_list[position]->functions->wait_request(async_channel->event.disk_event);
 }
 
 int starpu_disk_test_request(struct _starpu_async_channel *async_channel)
 {
-	int position = get_location_with_node(async_channel->event.disk_event.memory_node);
-	if (disk_register_list[position]->functions->test_request != NULL && 
-	    disk_register_list[position]->functions->async_write != NULL &&
-	    disk_register_list[position]->functions->async_read != NULL)
-		return disk_register_list[position]->functions->test_request((void *) async_channel);
-	/* one of these functions is not defined => sync mode*/
-	return 1; 
+	int position = get_location_with_node(async_channel->event.memory_node);
+	return disk_register_list[position]->functions->test_request(async_channel->event.disk_event);
 }	
 
+void starpu_disk_free_request(struct _starpu_async_channel *async_channel)
+{
+	int position = get_location_with_node(async_channel->event.memory_node);
+	if (async_channel->event.disk_event)
+		disk_register_list[position]->functions->free_request(async_channel->event.disk_event);
+}
+
 static void 
 add_disk_in_list(unsigned node,  struct starpu_disk_ops * func, void * base)
 {
@@ -271,6 +275,9 @@ add_disk_in_list(unsigned node,  struct starpu_disk_ops * func, void * base)
 static int
 get_location_with_node(unsigned node)
 {
+#ifdef STARPU_DEVEL
+#warning optimize with a MAXNODE array
+#endif
 	int i;
 	for(i = 0; i <= disk_number; ++i)
 		if (disk_register_list[i]->node == node)

+ 5 - 4
src/core/disk.h

@@ -35,21 +35,22 @@ void * _starpu_disk_alloc (unsigned node, size_t size);
 
 void _starpu_disk_free (unsigned node, void *obj, size_t size);
 /* src_node is a disk node, dst_node is for the moment the STARPU_MAIN_RAM */
-int _starpu_disk_read(unsigned src_node, unsigned dst_node, void *obj, void *buf, off_t offset, size_t size, void * async_channel);
+int _starpu_disk_read(unsigned src_node, unsigned dst_node, void *obj, void *buf, off_t offset, size_t size, struct _starpu_async_channel * async_channel);
 /* src_node is for the moment the STARU_MAIN_RAM, dst_node is a disk node */ 
-int _starpu_disk_write(unsigned src_node, unsigned dst_node, void *obj, void *buf, off_t offset, size_t size, void * async_channel);
+int _starpu_disk_write(unsigned src_node, unsigned dst_node, void *obj, void *buf, off_t offset, size_t size, struct _starpu_async_channel * async_channel);
 
 int _starpu_disk_full_read(unsigned src_node, unsigned dst_node, void * obj, void ** ptr, size_t * size);
 int _starpu_disk_full_write(unsigned src_node, unsigned dst_node, void * obj, void * ptr, size_t size);
 
-int _starpu_disk_copy(unsigned node_src, void* obj_src, off_t offset_src, unsigned node_dst, void* obj_dst, off_t offset_dst, size_t size, void * async_channel);
+int _starpu_disk_copy(unsigned node_src, void* obj_src, off_t offset_src, unsigned node_dst, void* obj_dst, off_t offset_dst, size_t size, struct _starpu_async_channel * async_channel);
 
 /* force the request to compute */
 void starpu_disk_wait_request(struct _starpu_async_channel *async_channel);
 /* return 1 if the request is finished, 0 if not finished */
 int starpu_disk_test_request(struct _starpu_async_channel *async_channel);
-/* interface to compare memory disk */
+void starpu_disk_free_request(struct _starpu_async_channel *async_channel);
 
+/* interface to compare memory disk */
 int _starpu_is_same_kind_disk(unsigned node1, unsigned node2);
 
 /* change disk flag */

+ 2 - 0
src/core/disk_ops/disk_leveldb.cpp

@@ -330,6 +330,7 @@ struct starpu_disk_ops starpu_disk_leveldb_ops = {
 	.bandwidth = get_leveldb_bandwidth_between_disk_and_main_ram,
 	.wait_request = NULL,
 	.test_request = NULL,
+	.free_request = NULL,
 	.full_read = starpu_leveldb_full_read,
 	.full_write = starpu_leveldb_full_write
 };
@@ -349,6 +350,7 @@ struct starpu_disk_ops starpu_disk_leveldb_ops = {
 	get_leveldb_bandwidth_between_disk_and_main_ram,
 	NULL,
 	NULL,
+	NULL,
 	starpu_leveldb_full_read,
 	starpu_leveldb_full_write
 };

+ 1 - 0
src/core/disk_ops/disk_unistd.c

@@ -69,6 +69,7 @@ struct starpu_disk_ops starpu_disk_unistd_ops = {
 	.async_write = starpu_unistd_global_async_write,
 	.wait_request = starpu_unistd_global_wait_request,
 	.test_request = starpu_unistd_global_test_request,
+	.free_request = starpu_unistd_global_free_request,
 #endif
         .full_read = starpu_unistd_global_full_read,
         .full_write = starpu_unistd_global_full_write

+ 1 - 0
src/core/disk_ops/disk_unistd_o_direct.c

@@ -103,6 +103,7 @@ struct starpu_disk_ops starpu_disk_unistd_o_direct_ops = {
         .async_write = starpu_unistd_global_async_write,
         .wait_request = starpu_unistd_global_wait_request,
         .test_request = starpu_unistd_global_test_request,
+	.free_request = starpu_unistd_global_free_request,
 #endif
 	.full_read = starpu_unistd_global_full_read,
 	.full_write = starpu_unistd_global_full_write

+ 32 - 26
src/core/disk_ops/unistd/disk_unistd_global.c

@@ -188,15 +188,12 @@ starpu_unistd_global_read (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void *
 
 
 #ifdef HAVE_AIO_H
-int
-starpu_unistd_global_async_read (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void *buf, off_t offset, size_t size, void * async_channel)
+void *
+starpu_unistd_global_async_read (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void *buf, off_t offset, size_t size)
 {
-        struct starpu_unistd_global_obj * tmp = (struct starpu_unistd_global_obj *) obj;
+        struct starpu_unistd_global_obj * tmp = obj;
 
-        struct _starpu_async_channel * channel = (struct _starpu_async_channel *) async_channel;
-        struct aiocb *aiocb = &channel->event.disk_event._starpu_aiocb_disk;
-
-        memset(aiocb, 0, sizeof(struct aiocb));
+        struct aiocb *aiocb = calloc(1,sizeof(*aiocb));
 
         aiocb->aio_fildes = tmp->descriptor;
         aiocb->aio_offset = offset;
@@ -205,7 +202,13 @@ starpu_unistd_global_async_read (void *base STARPU_ATTRIBUTE_UNUSED, void *obj,
         aiocb->aio_reqprio = 0;
         aiocb->aio_lio_opcode = LIO_NOP;
 
-        return aio_read(aiocb);
+        if (aio_read(aiocb) < 0)
+        {
+                free(aiocb);
+                aiocb = NULL;
+        }
+
+        return aiocb;
 }
 #endif
 
@@ -241,14 +244,11 @@ starpu_unistd_global_write (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, const
 
 
 #ifdef HAVE_AIO_H
-int
-starpu_unistd_global_async_write (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void *buf, off_t offset, size_t size, void * async_channel)
+void *
+starpu_unistd_global_async_write (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void *buf, off_t offset, size_t size)
 {
-        struct starpu_unistd_global_obj * tmp = (struct starpu_unistd_global_obj *) obj;
-
-        struct _starpu_async_channel * channel = (struct _starpu_async_channel *) async_channel;
-        struct aiocb *aiocb = &channel->event.disk_event._starpu_aiocb_disk ;
-        memset(aiocb, 0, sizeof(struct aiocb));
+        struct starpu_unistd_global_obj * tmp = obj;
+        struct aiocb *aiocb = calloc(1,sizeof(*aiocb));
 
         aiocb->aio_fildes = tmp->descriptor;
         aiocb->aio_offset = offset;
@@ -257,7 +257,13 @@ starpu_unistd_global_async_write (void *base STARPU_ATTRIBUTE_UNUSED, void *obj,
         aiocb->aio_reqprio = 0;
         aiocb->aio_lio_opcode = LIO_NOP;
 
-        return aio_write(aiocb);
+        if (aio_write(aiocb) < 0)
+        {
+                free(aiocb);
+                aiocb = NULL;
+        }
+
+        return aiocb;
 }
 #endif
 
@@ -386,16 +392,13 @@ get_unistd_global_bandwidth_between_disk_and_main_ram(unsigned node)
 void
 starpu_unistd_global_wait_request(void * async_channel)
 {
-        struct _starpu_async_channel * channel = (struct _starpu_async_channel *) async_channel;
-        const struct aiocb * aiocb = &channel->event.disk_event._starpu_aiocb_disk;
-        const struct aiocb * list[1];
-        list[0] = aiocb;
+        const struct aiocb * aiocb = async_channel;
         int values = -1;
         int error_disk = EAGAIN;
         while(values < 0 || error_disk == EAGAIN)
         {
                 /* Wait the answer of the request TIMESTAMP IS NULL */
-                values = aio_suspend(list, 1, NULL);
+                values = aio_suspend(&aiocb, 1, NULL);
                 error_disk = errno;
         }
 }
@@ -407,15 +410,12 @@ starpu_unistd_global_test_request(void * async_channel)
         time_wait_request.tv_sec = 0;
         time_wait_request.tv_nsec = 0;
 
-        struct _starpu_async_channel * channel = (struct _starpu_async_channel *) async_channel;
-        const struct aiocb * aiocb = &channel->event.disk_event._starpu_aiocb_disk;
-        const struct aiocb * list[1];
-        list[0] = aiocb;
+        const struct aiocb * aiocb = async_channel;
         int values = -1;
         int error_disk = EAGAIN;
 
         /* Wait the answer of the request */
-        values = aio_suspend(list, 1, &time_wait_request);
+        values = aio_suspend(&aiocb, 1, &time_wait_request);
         error_disk = errno;
         /* request is finished */
         if (values == 0)
@@ -426,4 +426,10 @@ starpu_unistd_global_test_request(void * async_channel)
         /* an error occured */
         STARPU_ABORT();
 }
+
+void
+starpu_unistd_global_free_request(void *async_channel)
+{
+        free(async_channel);
+}
 #endif

+ 3 - 2
src/core/disk_ops/unistd/disk_unistd_global.h

@@ -40,10 +40,11 @@ int starpu_unistd_global_write (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, c
 void * starpu_unistd_global_plug (void *parameter);
 void starpu_unistd_global_unplug (void *base);
 int get_unistd_global_bandwidth_between_disk_and_main_ram(unsigned node);
-int starpu_unistd_global_async_read (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void *buf, off_t offset, size_t size, void * async_channel);
-int starpu_unistd_global_async_write (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void *buf, off_t offset, size_t size, void * async_channel);
+void* starpu_unistd_global_async_read (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void *buf, off_t offset, size_t size);
+void* starpu_unistd_global_async_write (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void *buf, off_t offset, size_t size);
 void starpu_unistd_global_wait_request(void * async_channel);
 int starpu_unistd_global_test_request(void * async_channel);
+void starpu_unistd_global_free_request(void * async_channel);
 int starpu_unistd_global_full_read(unsigned node, void *base, void * obj, void ** ptr, size_t * size);
 int starpu_unistd_global_full_write (unsigned node, void * base, void * obj, void * ptr, size_t size);
 #endif

+ 4 - 4
src/datawizard/copy_driver.c

@@ -356,7 +356,7 @@ static int copy_data_1_to_1_generic(starpu_data_handle_t handle,
 				STARPU_ASSERT(copy_methods->any_to_any);
 				ret = copy_methods->any_to_any(src_interface, src_node, dst_interface, dst_node, &req->async_channel);
 			}
-			_starpu_mic_init_event(&(req->async_channel.event.mic_event), dst_node);
+			_starpu_mic_init_event(&(req->async_channel.event), dst_node);
 		}
 		break;
 	case _STARPU_MEMORY_NODE_TUPLE(STARPU_MIC_RAM,STARPU_CPU_RAM):
@@ -381,7 +381,7 @@ static int copy_data_1_to_1_generic(starpu_data_handle_t handle,
 				STARPU_ASSERT(copy_methods->any_to_any);
 				ret = copy_methods->any_to_any(src_interface, src_node, dst_interface, dst_node, &req->async_channel);
 			}
-			_starpu_mic_init_event(&(req->async_channel.event.mic_event), src_node);
+			_starpu_mic_init_event(&(req->async_channel.event), src_node);
 		}
 		break;
 #endif
@@ -688,7 +688,7 @@ void _starpu_driver_wait_request_completion(struct _starpu_async_channel *async_
 #endif
 #ifdef STARPU_USE_MIC
 	case STARPU_MIC_RAM:
-		_starpu_mic_wait_request_completion(&(async_channel->event.mic_event));
+		_starpu_mic_wait_request_completion(&(async_channel->event));
 		break;
 #endif
 	case STARPU_MAIN_RAM:
@@ -746,7 +746,7 @@ unsigned _starpu_driver_test_request_completion(struct _starpu_async_channel *as
 #endif
 #ifdef STARPU_USE_MIC
 	case STARPU_MIC_RAM:
-		success = _starpu_mic_request_is_complete(&(async_channel->event.mic_event));
+		success = _starpu_mic_request_is_complete(&(async_channel->event));
 		break;
 #endif
 	case STARPU_DISK_RAM:

+ 5 - 12
src/datawizard/copy_driver.h

@@ -1,6 +1,6 @@
 /* StarPU --- Runtime system for heterogeneous multicore architectures.
  *
- * Copyright (C) 2010, 2012-2013  Université de Bordeaux
+ * Copyright (C) 2010, 2012-2014  Université de Bordeaux
  * Copyright (C) 2010, 2011, 2013  Centre National de la Recherche Scientifique
  *
  * StarPU is free software; you can redistribute it and/or modify
@@ -51,25 +51,18 @@ struct _starpu_data_replicate;
  * signal is used to test asynchronous request. */
 struct _starpu_mic_async_event
 {
-	unsigned memory_node;
 	int mark;
 	uint64_t *signal;
 };
 #endif
 
-struct _starpu_disk_async_event
-{
-#ifdef HAVE_AIO_H
-        struct aiocb _starpu_aiocb_disk;
-#endif
-	unsigned memory_node;
-};
-
 /* this is a structure that can be queried to see whether an asynchronous
  * transfer has terminated or not */
 union _starpu_async_channel_event
 {
-	int dummy;
+	/* Only used for disks and mic atm */
+	unsigned memory_node;
+
 #ifdef STARPU_SIMGRID
 	struct {
 		unsigned finished;
@@ -86,7 +79,7 @@ union _starpu_async_channel_event
 #ifdef STARPU_USE_MIC
 	struct _starpu_mic_async_event mic_event;
 #endif
-	struct _starpu_disk_async_event disk_event;
+	void *disk_event;
 };
 
 struct _starpu_async_channel

+ 9 - 0
src/datawizard/data_request.c

@@ -19,6 +19,7 @@
 #include <common/config.h>
 #include <common/utils.h>
 #include <datawizard/datawizard.h>
+#include <core/disk.h>
 
 /* TODO: This should be tuned according to driver capabilities
  * Data interfaces should also have to declare how many asynchronous requests
@@ -93,6 +94,14 @@ static void starpu_data_request_destroy(struct _starpu_data_request *r)
 
 	STARPU_ASSERT(r->dst_replicate->request[node] == r);
 	r->dst_replicate->request[node] = NULL;
+
+	switch (r->async_channel.type) {
+		case STARPU_DISK_RAM:
+			starpu_disk_free_request(&r->async_channel);
+			break;
+		default:
+			break;
+	}
 	//fprintf(stderr, "DESTROY REQ %p (%d) refcnt %d\n", r, node, r->refcnt);
 	_starpu_data_request_delete(r);
 }

+ 18 - 18
src/drivers/mic/driver_mic_source.c

@@ -437,7 +437,7 @@ int _starpu_mic_copy_mic_to_ram_async(void *src, unsigned src_node, void *dst, u
 }
 
 /* Initialize a _starpu_mic_async_event. */
-int _starpu_mic_init_event(struct _starpu_mic_async_event *event, unsigned memory_node)
+int _starpu_mic_init_event(struct _starpu_async_channel_event *event, unsigned memory_node)
 {
 	const struct _starpu_mp_node *mp_node = _starpu_mic_src_get_mp_node_from_memory_node(memory_node);
 	scif_epd_t epd = mp_node->host_sink_dt_connection.mic_endpoint;
@@ -445,60 +445,60 @@ int _starpu_mic_init_event(struct _starpu_mic_async_event *event, unsigned memor
 	event->memory_node = memory_node;
 
 	/* Address of allocation must be multiple of the page size. */
-	if (posix_memalign((void **)&(event->signal), 0x1000, sizeof(*(event->signal))) != 0)
+	if (posix_memalign((void **)&(event->mic_event.signal), 0x1000, sizeof(*(event->mic_event.signal))) != 0)
 		return -ENOMEM;
-	*(event->signal) = 0;
+	*(event->mic_event.signal) = 0;
 
 	/* The size pass to scif_register is 0x1000 because it should be a multiple of the page size. */
-	if (scif_register(epd, event->signal, 0x1000, (off_t)(event->signal), SCIF_PROT_WRITE, SCIF_MAP_FIXED) < 0)
+	if (scif_register(epd, event->mic_event.signal, 0x1000, (off_t)(event->mic_event.signal), SCIF_PROT_WRITE, SCIF_MAP_FIXED) < 0)
 		STARPU_MIC_SRC_REPORT_SCIF_ERROR(errno);
 
 	/* Mark for a futur wait. */
-	if (scif_fence_mark(epd, SCIF_FENCE_INIT_SELF, &(event->mark)) < 0)
+	if (scif_fence_mark(epd, SCIF_FENCE_INIT_SELF, &(event->mic_event.mark)) < 0)
 		STARPU_MIC_SRC_REPORT_SCIF_ERROR(errno);
 
-	/* Tell to scif to write STARPU_MIC_REQUEST_COMPLETE in event->signal when the transfer is complete.
+	/* Tell to scif to write STARPU_MIC_REQUEST_COMPLETE in event->mic_event.signal when the transfer is complete.
 	 * We use this for test the end of a transfer. */
-	if (scif_fence_signal(epd, (off_t)event->signal, STARPU_MIC_REQUEST_COMPLETE, 0, 0, SCIF_FENCE_INIT_SELF | SCIF_SIGNAL_LOCAL) < 0)
+	if (scif_fence_signal(epd, (off_t)event->mic_event.signal, STARPU_MIC_REQUEST_COMPLETE, 0, 0, SCIF_FENCE_INIT_SELF | SCIF_SIGNAL_LOCAL) < 0)
 		STARPU_MIC_SRC_REPORT_SCIF_ERROR(errno);
 
 	return 0;
 }
 
 /* Wait the end of the asynchronous request */
-void _starpu_mic_wait_request_completion(struct _starpu_mic_async_event *event)
+void _starpu_mic_wait_request_completion(struct _starpu_async_channel_event *event)
 {
-	if (event->signal != NULL)
+	if (event->mic_event.signal != NULL)
 	{
 		const struct _starpu_mp_node *mp_node = _starpu_mic_src_get_mp_node_from_memory_node(event->memory_node);
 		scif_epd_t epd = mp_node->host_sink_dt_connection.mic_endpoint;
 
-		if (scif_fence_wait(epd, event->mark) < 0)
+		if (scif_fence_wait(epd, event->mic_event.mark) < 0)
 			STARPU_MIC_SRC_REPORT_SCIF_ERROR(errno);
 
-		if (scif_unregister(epd, (off_t)(event->signal), 0x1000) < 0)
+		if (scif_unregister(epd, (off_t)(event->mic_event.signal), 0x1000) < 0)
 			STARPU_MIC_SRC_REPORT_SCIF_ERROR(errno);
 
-		free(event->signal);
-		event->signal = NULL;
+		free(event->mic_event.signal);
+		event->mic_event.signal = NULL;
 	}
 }
 
 /* Test if a asynchronous request is end.
  * Return 1 if is end, 0 else. */
-int _starpu_mic_request_is_complete(struct _starpu_mic_async_event *event)
+int _starpu_mic_request_is_complete(struct _starpu_async_channel_event *event)
 {
-	if (event->signal != NULL && *(event->signal) != STARPU_MIC_REQUEST_COMPLETE)
+	if (event->mic_event.signal != NULL && *(event->mic_event.signal) != STARPU_MIC_REQUEST_COMPLETE)
 		return 0;
 
 	const struct _starpu_mp_node *mp_node = _starpu_mic_src_get_mp_node_from_memory_node(event->memory_node);
 	scif_epd_t epd = mp_node->host_sink_dt_connection.mic_endpoint;
 
-	if (scif_unregister(epd, (off_t)(event->signal), 0x1000) < 0)
+	if (scif_unregister(epd, (off_t)(event->mic_event.signal), 0x1000) < 0)
 		STARPU_MIC_SRC_REPORT_SCIF_ERROR(errno);
 
-	free(event->signal);
-	event->signal = NULL;
+	free(event->mic_event.signal);
+	event->mic_event.signal = NULL;
 	return 1;
 }
 

+ 3 - 3
src/drivers/mic/driver_mic_source.h

@@ -70,9 +70,9 @@ int _starpu_mic_copy_mic_to_ram(void *src, unsigned src_node, void *dst, unsigne
 int _starpu_mic_copy_ram_to_mic_async(void *src, unsigned src_node STARPU_ATTRIBUTE_UNUSED, void *dst, unsigned dst_node, size_t size);
 int _starpu_mic_copy_mic_to_ram_async(void *src, unsigned src_node, void *dst, unsigned dst_node STARPU_ATTRIBUTE_UNUSED, size_t size);
 
-int _starpu_mic_init_event(struct _starpu_mic_async_event *event, unsigned memory_node);
-void _starpu_mic_wait_request_completion(struct _starpu_mic_async_event *event);
-int _starpu_mic_request_is_complete(struct _starpu_mic_async_event *event);
+int _starpu_mic_init_event(struct _starpu_async_channel_event *event, unsigned memory_node);
+void _starpu_mic_wait_request_completion(struct _starpu_async_channel_event *event);
+int _starpu_mic_request_is_complete(struct _starpu_async_channel_event *event);
 
 void *_starpu_mic_src_worker(void *arg);