Ver código fonte

fix bug for asynchronous input/output, it runs

Corentin Salingue 12 anos atrás
pai
commit
0d30c96822

+ 1 - 0
include/starpu_disk.h

@@ -34,6 +34,7 @@ struct starpu_disk_ops {
 	 int    (*copy)   (void *base_src, void* obj_src, off_t offset_src,  void *base_dst, void* obj_dst, off_t offset_dst, size_t size, void * async_channel);
 	 int    (*bandwidth)    (unsigned node);
 	 void   (*wait_request) (void * async_channel);
+	 int    (*test_request) (void * async_channel);
 };
 
 

+ 7 - 1
src/core/disk.c

@@ -162,7 +162,7 @@ _starpu_disk_write(unsigned src_node, unsigned dst_node, void *obj, void *buf, o
 		channel->type = STARPU_DISK_RAM;
 
                 _STARPU_TRACE_START_DRIVER_COPY_ASYNC(src_node, dst_node);
-		return disk_register_list[pos]->functions->async_write(disk_register_list[pos]->base, obj, buf, offset, size, async_channel);
+		values = disk_register_list[pos]->functions->async_write(disk_register_list[pos]->base, obj, buf, offset, size, async_channel);
         	_STARPU_TRACE_END_DRIVER_COPY_ASYNC(src_node, dst_node);
         }
         /* asynchronous request failed or synchronous request is asked */
@@ -206,6 +206,12 @@ void starpu_disk_wait_request(struct _starpu_async_channel *async_channel)
 	disk_register_list[position]->functions->wait_request((void *) async_channel);
 }
 
+int starpu_disk_test_request(struct _starpu_async_channel *async_channel)
+{
+	int position = get_location_with_node(async_channel->event.disk_event.memory_node);
+	return disk_register_list[position]->functions->test_request((void *) async_channel);
+}
+
 static void 
 add_disk_in_list(unsigned node,  struct starpu_disk_ops * func, void * base)
 {

+ 3 - 1
src/core/disk.h

@@ -34,8 +34,10 @@ int _starpu_disk_write(unsigned src_node, unsigned dst_node, void *obj, void *bu
 
 int _starpu_disk_copy(unsigned node_src, void* obj_src, off_t offset_src, unsigned node_dst, void* obj_dst, off_t offset_dst, size_t size, void * async_channel);
 
-
+/* force the request to compute */
 void starpu_disk_wait_request(struct _starpu_async_channel *async_channel);
+/* return 1 if the request is finished, 0 if not finished */
+int starpu_disk_test_request(struct _starpu_async_channel *async_channel);
 /* interface to compare memory disk */
 
 int _starpu_is_same_kind_disk(unsigned node1, unsigned node2);

+ 36 - 2
src/core/disk_ops/disk_stdio.c

@@ -20,6 +20,8 @@
 #include <sys/stat.h>
 #include <sys/time.h>
 #include <aio.h>
+#include <errno.h>
+#include <time.h>
 
 #include <starpu.h>
 #include <core/disk.h>
@@ -369,9 +371,40 @@ starpu_stdio_wait_request(void * async_channel)
 	const struct aiocb * list[1];
 	list[0] = aiocb;
 	int values = -1;
-	while(values < 0)
+	int error_disk = EAGAIN;
+	while(values < 0 || error_disk == EAGAIN)
+	{
 		/* Wait the answer of the request TIMESTAMP IS NULL */
 		values = aio_suspend(list, 1, NULL);
+		error_disk = errno;
+	}
+}
+
+static int
+starpu_stdio_test_request(void * async_channel)
+{
+	struct timespec time_wait_request;
+	time_wait_request.tv_sec = 0;
+	time_wait_request.tv_nsec = 0;
+
+        struct _starpu_async_channel * channel = (struct _starpu_async_channel *) async_channel;
+        const struct aiocb * aiocb = &channel->event.disk_event._starpu_aiocb_disk;
+        const struct aiocb * list[1];
+        list[0] = aiocb;
+        int values = -1;
+        int error_disk = EAGAIN;
+        
+	/* Wait the answer of the request */
+        values = aio_suspend(list, 1, &time_wait_request);
+        error_disk = errno;
+	/* request is finished */
+	if (values == 0)
+		return 1;
+	/* values == -1 */
+	if (error_disk == EAGAIN)
+		return 0;
+	/* an error occured */
+	STARPU_ABORT();	
 }
 
 struct starpu_disk_ops starpu_disk_stdio_ops = {
@@ -387,5 +420,6 @@ struct starpu_disk_ops starpu_disk_stdio_ops = {
 	.unplug = starpu_stdio_unplug,
 	.copy = NULL,
 	.bandwidth = get_stdio_bandwidth_between_disk_and_main_ram,
-	.wait_request = starpu_stdio_wait_request
+	.wait_request = starpu_stdio_wait_request,
+	.test_request = starpu_stdio_test_request
 };

+ 8 - 4
src/datawizard/copy_driver.c

@@ -399,20 +399,21 @@ static int copy_data_1_to_1_generic(starpu_data_handle_t handle,
 
 	case _STARPU_MEMORY_NODE_TUPLE(STARPU_CPU_RAM,STARPU_DISK_RAM):
 		if(copy_methods->any_to_any)
-			copy_methods->any_to_any(src_interface, src_node, dst_interface, dst_node, req ? &req->async_channel : NULL);
+			ret = copy_methods->any_to_any(src_interface, src_node, dst_interface, dst_node, req ? &req->async_channel : NULL);
+
 		else
 			STARPU_ABORT();
 		break;
 		
 	case _STARPU_MEMORY_NODE_TUPLE(STARPU_DISK_RAM,STARPU_CPU_RAM):
-		if(copy_methods->any_to_any)
-			copy_methods->any_to_any(src_interface, src_node, dst_interface, dst_node, req ? &req->async_channel : NULL);
+		if(copy_methods->any_to_any) 
+			ret = copy_methods->any_to_any(src_interface, src_node, dst_interface, dst_node, req ? &req->async_channel : NULL);
 		else
 			STARPU_ABORT();
 		break;
 
 	case _STARPU_MEMORY_NODE_TUPLE(STARPU_DISK_RAM,STARPU_DISK_RAM):	
-		copy_methods->any_to_any(src_interface, src_node, dst_interface, dst_node, req ? &req->async_channel : NULL);
+		ret = copy_methods->any_to_any(src_interface, src_node, dst_interface, dst_node, req ? &req->async_channel : NULL);
 		break;
 		
 	default:
@@ -710,6 +711,9 @@ unsigned _starpu_driver_test_request_completion(struct _starpu_async_channel *as
 		success = _starpu_mic_request_is_complete(&(async_channel->event.mic_event));
 		break;
 #endif
+	case STARPU_DISK_RAM:
+		success = starpu_disk_test_request(async_channel);
+		break;
 	case STARPU_CPU_RAM:
 	default:
 		STARPU_ABORT();

+ 3 - 4
src/drivers/disk/driver_disk.c

@@ -22,8 +22,8 @@ int _starpu_disk_copy_src_to_disk(void * src, unsigned src_node, void * dst, siz
 {
 	STARPU_ASSERT(starpu_node_get_kind(src_node) == STARPU_CPU_RAM);
 	
-	_starpu_disk_write(src_node, dst_node, dst, src, dst_offset, size, async_channel);
-	return 0;
+	return _starpu_disk_write(src_node, dst_node, dst, src, dst_offset, size, async_channel);
+	
 }
 
 
@@ -31,8 +31,7 @@ int _starpu_disk_copy_disk_to_src(void * src, size_t src_offset, unsigned src_no
 {
 	STARPU_ASSERT(starpu_node_get_kind(dst_node) == STARPU_CPU_RAM);
 
-	_starpu_disk_read(src_node, dst_node, src, dst, src_offset, size, async_channel);
-	return 0;
+	return _starpu_disk_read(src_node, dst_node, src, dst, src_offset, size, async_channel);
 }
 
 

+ 1 - 1
tests/disk/disk_compute.c

@@ -166,7 +166,7 @@ int main(int argc, char **argv)
 	for (j = 0; j < NX; ++j)
 		if (A[j] != C[j])
 		{
-//			printf("Fail A %d != C %d \n", A[j], C[j]);
+			printf("Fail A %d != C %d \n", A[j], C[j]);
 			try = 0;
 		}