Browse Source

add new parameter to prepare asynchronous read/write

Corentin Salingue 12 years ago
parent
commit
d5928956db

+ 2 - 2
include/starpu_disk.h

@@ -24,8 +24,8 @@ struct starpu_disk_ops {
 	 void    (*free)   (void *base, void *obj, size_t size);
 	 void *  (*open)   (void *base, void *pos, size_t size);     /* open an existing file */
 	 void    (*close)  (void *base, void *obj, size_t size);
-	ssize_t  (*read)   (void *base, void *obj, void *buf, off_t offset, size_t size);        /* ~= pread */
-	ssize_t  (*write)  (void *base, void *obj, const void *buf, off_t offset, size_t size); 
+	ssize_t  (*read)   (void *base, void *obj, void *buf, off_t offset, size_t size, void * _starpu_aiocb_disk); 
+	ssize_t  (*write)  (void *base, void *obj, const void *buf, off_t offset, size_t size, void * _starpu_aiocb_disk); 
 	/* readv, writev, read2d, write2d, etc. */
 	 void *  (*plug)   (void *parameter);
 	 void    (*unplug) (void *base);

+ 5 - 5
src/core/disk.c

@@ -122,22 +122,22 @@ _starpu_disk_free(unsigned node, void *obj, size_t size)
 }
 
 ssize_t 
-_starpu_disk_read(unsigned node, void *obj, void *buf, off_t offset, size_t size)
+_starpu_disk_read(unsigned node, void *obj, void *buf, off_t offset, size_t size, void * _starpu_aiocb_disk)
 {
 	int pos = get_location_with_node(node);
-	return disk_register_list[pos]->functions->read(disk_register_list[pos]->base, obj, buf, offset, size);
+	return disk_register_list[pos]->functions->read(disk_register_list[pos]->base, obj, buf, offset, size, _starpu_aiocb_disk);
 }
 
 
 ssize_t 
-_starpu_disk_write(unsigned node, void *obj, const void *buf, off_t offset, size_t size)
+_starpu_disk_write(unsigned node, void *obj, const void *buf, off_t offset, size_t size, void * _starpu_aiocb_disk)
 {
 	int pos = get_location_with_node(node);
-	return disk_register_list[pos]->functions->write(disk_register_list[pos]->base, obj, buf, offset, size);
+	return disk_register_list[pos]->functions->write(disk_register_list[pos]->base, obj, buf, offset, size, _starpu_aiocb_disk);
 }
 
 int
-_starpu_disk_copy(unsigned node_src, void* obj_src, off_t offset_src, unsigned node_dst, void* obj_dst, off_t offset_dst, size_t size)
+_starpu_disk_copy(unsigned node_src, void* obj_src, off_t offset_src, unsigned node_dst, void* obj_dst, off_t offset_dst, size_t size, void * _starpu_aiocb_disk)
 {
 	int pos_src = get_location_with_node(node_src);
 	int pos_dst = get_location_with_node(node_dst);

+ 3 - 3
src/core/disk.h

@@ -28,11 +28,11 @@ void * _starpu_disk_alloc (unsigned node, size_t size);
 
 void _starpu_disk_free (unsigned node, void *obj, size_t size);
 
-ssize_t _starpu_disk_read(unsigned node, void *obj, void *buf, off_t offset, size_t size);
+ssize_t _starpu_disk_read(unsigned node, void *obj, void *buf, off_t offset, size_t size, void * _starpu_aiocb_disk);
 
-ssize_t _starpu_disk_write(unsigned node, void *obj, const void *buf, off_t offset, size_t size);
+ssize_t _starpu_disk_write(unsigned node, void *obj, const void *buf, off_t offset, size_t size, void * _starpu_aiocb_disk);
 
-int _starpu_disk_copy(unsigned node_src, void* obj_src, off_t offset_src, unsigned node_dst, void* obj_dst, off_t offset_dst, size_t size);
+int _starpu_disk_copy(unsigned node_src, void* obj_src, off_t offset_src, unsigned node_dst, void* obj_dst, off_t offset_dst, size_t size, void * _starpu_aiocb_disk);
 
 /* interface to compare memory disk */
 

+ 7 - 8
src/core/disk_ops/disk_stdio.c

@@ -170,14 +170,13 @@ starpu_stdio_close (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, size_t size S
 
 /* read the memory disk */
 static ssize_t 
-starpu_stdio_read (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void *buf, off_t offset, size_t size)
+starpu_stdio_read (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void *buf, off_t offset, size_t size, void * _starpu_aiocb_disk)
 {
 	struct starpu_stdio_obj * tmp = (struct starpu_stdio_obj *) obj;
+		int res = fseek(tmp->file, offset, SEEK_SET); 
+		STARPU_ASSERT_MSG(res == 0, "Stdio read failed");
 
-	int res = fseek(tmp->file, offset, SEEK_SET); 
-	STARPU_ASSERT_MSG(res == 0, "Stdio read failed");
-
-	ssize_t nb = fread (buf, 1, size, tmp->file);
+		ssize_t nb = fread (buf, 1, size, tmp->file);
 
 	return nb;
 }
@@ -185,7 +184,7 @@ starpu_stdio_read (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void *buf, off
 
 /* write on the memory disk */
 static ssize_t 
-starpu_stdio_write (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, const void *buf, off_t offset, size_t size)
+starpu_stdio_write (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, const void *buf, off_t offset, size_t size, void * _starpu_aiocb_disk)
 {
 	struct starpu_stdio_obj * tmp = (struct starpu_stdio_obj *) obj;
 
@@ -241,7 +240,7 @@ get_stdio_bandwidth_between_disk_and_main_ram(unsigned node)
 	gettimeofday(&start, NULL);
 	for (iter = 0; iter < NITER; ++iter)
 	{
-		_starpu_disk_write(node, mem, buf, 0, SIZE_DISK_MIN);
+		_starpu_disk_write(node, mem, buf, 0, SIZE_DISK_MIN, NULL);
 		/* clean cache memory */
 		int res = fflush (tmp->file);
 		STARPU_ASSERT_MSG(res == 0, "Slowness computation failed \n");
@@ -263,7 +262,7 @@ get_stdio_bandwidth_between_disk_and_main_ram(unsigned node)
 	gettimeofday(&start, NULL);
 	for (iter = 0; iter < NITER; ++iter)
 	{
-		_starpu_disk_write(node, mem, buf, rand() % (SIZE_DISK_MIN -1) , 1);
+		_starpu_disk_write(node, mem, buf, rand() % (SIZE_DISK_MIN -1) , 1, NULL);
 
 		int res = fflush (tmp->file);
 		STARPU_ASSERT_MSG(res == 0, "Latency computation failed");

+ 4 - 4
src/core/disk_ops/disk_unistd_o_direct.c

@@ -54,25 +54,25 @@ starpu_unistd_o_direct_open (void *base, void *pos, size_t size)
 
 /* read the memory disk */
 static ssize_t 
-starpu_unistd_o_direct_read (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void *buf, off_t offset, size_t size)
+starpu_unistd_o_direct_read (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void *buf, off_t offset, size_t size, void * _starpu_aiocb_disk)
 {
 	STARPU_ASSERT_MSG((size % getpagesize()) == 0, "You can only read a multiple of page size %u Bytes (Here %u)", getpagesize(), (int) size);
 
 	STARPU_ASSERT_MSG((((uintptr_t) buf) % getpagesize()) == 0, "You have to use starpu_malloc function");
 
-	return starpu_unistd_global_read (base, obj, buf, offset, size);
+	return starpu_unistd_global_read (base, obj, buf, offset, size, _starpu_aiocb_disk);
 }
 
 
 /* write on the memory disk */
 static ssize_t 
-starpu_unistd_o_direct_write (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, const void *buf, off_t offset, size_t size)
+starpu_unistd_o_direct_write (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, const void *buf, off_t offset, size_t size, void * _starpu_aiocb_disk)
 {
 	STARPU_ASSERT_MSG((size % getpagesize()) == 0, "You can only write a multiple of page size %u Bytes (Here %u)", getpagesize(), (int) size);
 
 	STARPU_ASSERT_MSG((((uintptr_t)buf) % getpagesize()) == 0, "You have to use starpu_malloc function");
 
-	return starpu_unistd_global_write (base, obj, buf, offset, size);
+	return starpu_unistd_global_write (base, obj, buf, offset, size, _starpu_aiocb_disk);
 }
 
 

+ 4 - 4
src/core/disk_ops/unistd/disk_unistd_global.c

@@ -136,7 +136,7 @@ starpu_unistd_global_close (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, size_
 
 /* read the memory disk */
  ssize_t 
-starpu_unistd_global_read (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void *buf, off_t offset, size_t size)
+starpu_unistd_global_read (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void *buf, off_t offset, size_t size, void * _starpu_aiocb_disk)
 {
 	struct starpu_unistd_global_obj * tmp = (struct starpu_unistd_global_obj *) obj;
 
@@ -152,7 +152,7 @@ starpu_unistd_global_read (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void *
 
 /* write on the memory disk */
  ssize_t 
-starpu_unistd_global_write (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, const void *buf, off_t offset, size_t size)
+starpu_unistd_global_write (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, const void *buf, off_t offset, size_t size, void * _starpu_aiocb_disk)
 {
 	struct starpu_unistd_global_obj * tmp = (struct starpu_unistd_global_obj *) obj;
 
@@ -210,7 +210,7 @@ get_unistd_global_bandwidth_between_disk_and_main_ram(unsigned node)
 	gettimeofday(&start, NULL);
 	for (iter = 0; iter < NITER; ++iter)
 	{
-		_starpu_disk_write(node, mem, buf, 0, SIZE_BENCH);
+		_starpu_disk_write(node, mem, buf, 0, SIZE_BENCH, NULL);
 	}
 	gettimeofday(&end, NULL);
 	timing_slowness = (double)((end.tv_sec - start.tv_sec)*1000000 + (end.tv_usec - start.tv_usec));
@@ -227,7 +227,7 @@ get_unistd_global_bandwidth_between_disk_and_main_ram(unsigned node)
 	gettimeofday(&start, NULL);
 	for (iter = 0; iter < NITER; ++iter)
 	{
-		_starpu_disk_write(node, mem, buf, rand() % (SIZE_BENCH -1) , getpagesize());
+		_starpu_disk_write(node, mem, buf, rand() % (SIZE_BENCH -1) , getpagesize(), NULL);
 	}
 	gettimeofday(&end, NULL);
 	timing_latency = (double)((end.tv_sec - start.tv_sec)*1000000 + (end.tv_usec - start.tv_usec));

+ 2 - 2
src/core/disk_ops/unistd/disk_unistd_global.h

@@ -28,8 +28,8 @@ struct starpu_unistd_global_obj {
  void starpu_unistd_global_free (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, size_t size STARPU_ATTRIBUTE_UNUSED);
  void * starpu_unistd_global_open (struct starpu_unistd_global_obj * obj, void *base, void *pos, size_t size);
  void starpu_unistd_global_close (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, size_t size STARPU_ATTRIBUTE_UNUSED);
- ssize_t starpu_unistd_global_read (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void *buf, off_t offset, size_t size);
- ssize_t starpu_unistd_global_write (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, const void *buf, off_t offset, size_t size);
+ ssize_t starpu_unistd_global_read (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void *buf, off_t offset, size_t size, void * _starpu_aiocb_disk);
+ ssize_t starpu_unistd_global_write (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, const void *buf, off_t offset, size_t size, void * _starpu_aiocb_disk);
  void * starpu_unistd_global_plug (void *parameter);
  void starpu_unistd_global_unplug (void *base);
  int get_unistd_global_bandwidth_between_disk_and_main_ram(unsigned node);

+ 8 - 12
src/datawizard/copy_driver.c

@@ -1,7 +1,4 @@
-/* StarPU --- Runtime system for heterogeneous multicore architectures.
- *
- * Copyright (C) 2010-2013  Université de Bordeaux 1
- * Copyright (C) 2010, 2011, 2013  Centre National de la Recherche Scientifique
+/* StarPU --- Runtime system for heterogeneous multicore architectures.  * * Copyright (C) 2010-2013  Université de Bordeaux 1 * Copyright (C) 2010, 2011, 2013  Centre National de la Recherche Scientifique
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -400,15 +397,15 @@ static int copy_data_1_to_1_generic(starpu_data_handle_t handle,
 #endif
 
 	case _STARPU_MEMORY_NODE_TUPLE(STARPU_CPU_RAM,STARPU_DISK_RAM):
-		copy_methods->any_to_any(src_interface, src_node, dst_interface, dst_node, NULL);
+		copy_methods->any_to_any(src_interface, src_node, dst_interface, dst_node, req ? &req->async_channel : NULL);
 		break;
 		
 	case _STARPU_MEMORY_NODE_TUPLE(STARPU_DISK_RAM,STARPU_CPU_RAM):
-		copy_methods->any_to_any(src_interface, src_node, dst_interface, dst_node, NULL);
+		copy_methods->any_to_any(src_interface, src_node, dst_interface, dst_node, req ? &req->async_channel : NULL);
 		break;
 
 	case _STARPU_MEMORY_NODE_TUPLE(STARPU_DISK_RAM,STARPU_DISK_RAM):	
-		copy_methods->any_to_any(src_interface, src_node, dst_interface, dst_node, NULL);
+		copy_methods->any_to_any(src_interface, src_node, dst_interface, dst_node, req ? &req->async_channel : NULL);
 		break;
 		
 	default:
@@ -485,9 +482,8 @@ int STARPU_ATTRIBUTE_WARN_UNUSED_RESULT _starpu_driver_copy_data_1_to_1(starpu_d
  */
 int starpu_interface_copy(uintptr_t src, size_t src_offset, unsigned src_node, uintptr_t dst, size_t dst_offset, unsigned dst_node, size_t size, void *async_data)
 {
-#if defined(STARPU_USE_CUDA) || defined(STARPU_USE_OPENCL)
 	struct _starpu_async_channel *async_channel = async_data;
-#endif
+
 	enum starpu_node_kind src_kind = starpu_node_get_kind(src_node);
 	enum starpu_node_kind dst_kind = starpu_node_get_kind(dst_node);
 
@@ -579,19 +575,19 @@ int starpu_interface_copy(uintptr_t src, size_t src_offset, unsigned src_node, u
 		return _starpu_disk_copy_src_to_disk(
 			(void*) src + src_offset, src_node,
 			(void*) dst, dst_offset, dst_node,
-			size);
+			size, &async_channel->event._starpu_aiocb_disk);
 	}
 	case _STARPU_MEMORY_NODE_TUPLE(STARPU_DISK_RAM, STARPU_CPU_RAM):
 		return _starpu_disk_copy_disk_to_src(
 			(void*) src, src_offset, src_node,
 			(void*) dst + dst_offset, dst_node,
-			size);
+			size, &async_channel->event._starpu_aiocb_disk);
 
 	case _STARPU_MEMORY_NODE_TUPLE(STARPU_DISK_RAM, STARPU_DISK_RAM):
 		return _starpu_disk_copy_disk_to_disk(
 			(void*) src, src_offset, src_node,
 			(void*) dst, dst_offset, dst_node,
-			size);
+			size, &async_channel->event._starpu_aiocb_disk);
 
 	default:
 		STARPU_ABORT();

+ 3 - 0
src/datawizard/copy_driver.h

@@ -18,6 +18,8 @@
 #ifndef __COPY_DRIVER_H__
 #define __COPY_DRIVER_H__
 
+#include <aio.h>
+
 #include <common/config.h>
 #include <datawizard/memory_nodes.h>
 #include "coherency.h"
@@ -69,6 +71,7 @@ union _starpu_async_channel_event
 #ifdef STARPU_USE_MIC
 	struct _starpu_mic_async_event mic_event;
 #endif
+	struct aiocb _starpu_aiocb_disk;
 };
 
 struct _starpu_async_channel

+ 6 - 6
src/drivers/disk/driver_disk.c

@@ -18,30 +18,30 @@
 #include <core/disk.h>
 #include <starpu_profiling.h>
 
-int _starpu_disk_copy_src_to_disk(void * src, unsigned src_node, void * dst, size_t dst_offset, unsigned dst_node, size_t size)
+int _starpu_disk_copy_src_to_disk(void * src, unsigned src_node, void * dst, size_t dst_offset, unsigned dst_node, size_t size, void * _starpu_aiocb_disk)
 {
 	STARPU_ASSERT(starpu_node_get_kind(src_node) == STARPU_CPU_RAM);
 	
-	_starpu_disk_write(dst_node, dst, src, dst_offset, size);
+	_starpu_disk_write(dst_node, dst, src, dst_offset, size, _starpu_aiocb_disk);
 	return 0;
 }
 
 
-int _starpu_disk_copy_disk_to_src(void * src, size_t src_offset, unsigned src_node, void * dst, unsigned dst_node, size_t size)
+int _starpu_disk_copy_disk_to_src(void * src, size_t src_offset, unsigned src_node, void * dst, unsigned dst_node, size_t size, void * _starpu_aiocb_disk)
 {
 	STARPU_ASSERT(starpu_node_get_kind(dst_node) == STARPU_CPU_RAM);
 
-	_starpu_disk_read(src_node, src, dst, src_offset, size);
+	_starpu_disk_read(src_node, src, dst, src_offset, size, _starpu_aiocb_disk);
 	return 0;
 }
 
 
-int _starpu_disk_copy_disk_to_disk(void * src, size_t src_offset, unsigned src_node, void * dst, size_t dst_offset, unsigned dst_node, size_t size)
+int _starpu_disk_copy_disk_to_disk(void * src, size_t src_offset, unsigned src_node, void * dst, size_t dst_offset, unsigned dst_node, size_t size, void * _starpu_aiocb_disk)
 {
 	STARPU_ASSERT(starpu_node_get_kind(src_node) == STARPU_DISK_RAM && starpu_node_get_kind(dst_node) == STARPU_DISK_RAM);
 
        return _starpu_disk_copy(src_node, src, src_offset, 
 			       dst_node, dst, dst_offset,
-			       size); 
+			       size, _starpu_aiocb_disk); 
 
 }

+ 3 - 3
src/drivers/disk/driver_disk.h

@@ -17,10 +17,10 @@
 #ifndef __DRIVER_DISK_H__
 #define __DRIVER_DISK_H__
 
-int _starpu_disk_copy_src_to_disk(void * src, unsigned src_node, void * dst, size_t dst_offset, unsigned dst_node, size_t size);
+int _starpu_disk_copy_src_to_disk(void * src, unsigned src_node, void * dst, size_t dst_offset, unsigned dst_node, size_t size, void * _starpu_aiocb_disk);
 
-int _starpu_disk_copy_disk_to_src(void * src, size_t src_offset, unsigned src_node, void * dst, unsigned dst_node, size_t size);
+int _starpu_disk_copy_disk_to_src(void * src, size_t src_offset, unsigned src_node, void * dst, unsigned dst_node, size_t size, void * _starpu_aiocb_disk);
 
-int _starpu_disk_copy_disk_to_disk(void * src, size_t src_offset, unsigned src_node, void * dst, size_t dst_offset, unsigned dst_node, size_t size);
+int _starpu_disk_copy_disk_to_disk(void * src, size_t src_offset, unsigned src_node, void * dst, size_t dst_offset, unsigned dst_node, size_t size, void * _starpu_aiocb_disk);
 
 #endif