Sfoglia il codice sorgente

Use a list to store multiple calls to asynchronous requests for disk functions

Corentin Salingue 7 anni fa
parent
commit
3cbbcdc1fb
4 file modificati con 128 aggiunte e 26 eliminazioni
  1. 107 17
      src/core/disk.c
  2. 15 0
      src/datawizard/copy_driver.c
  3. 6 1
      src/datawizard/copy_driver.h
  4. 0 8
      src/datawizard/data_request.c

+ 107 - 17
src/core/disk.c

@@ -56,6 +56,23 @@ static int size_register_list = 2;
 
 int starpu_disk_swap_node = -1;
 
+static void add_async_event(struct _starpu_async_channel * channel, void * event)
+{ /* Record one backend event in the list of disk requests attached to channel. */
+        if (!event)
+                return; /* a NULL event is not recorded */
+        /* The list is only created when the first event is recorded */
+        if (channel->event.disk_event.requests == NULL)
+        {
+                channel->event.disk_event.requests = _starpu_disk_backend_event_list_new();
+        }
+
+        struct _starpu_disk_backend_event * disk_event = _starpu_disk_backend_event_new();
+        disk_event->backend_event = event;
+
+        /* Append the new element at the end of the channel's list */
+        _starpu_disk_backend_event_list_push_back(channel->event.disk_event.requests, disk_event);
+}
+
 int starpu_disk_register(struct starpu_disk_ops *func, void *parameter, starpu_ssize_t size)
 {
 	STARPU_ASSERT_MSG(size < 0 || size >= STARPU_DISK_SIZE_MIN,"Minimum disk size is %u Bytes ! (Here %u) \n", (int) STARPU_DISK_SIZE_MIN, (int) size);
@@ -130,6 +147,7 @@ void _starpu_disk_free(unsigned node, void *obj, size_t size)
 /* src_node == disk node and dst_node == STARPU_MAIN_RAM */
 int _starpu_disk_read(unsigned src_node, unsigned dst_node STARPU_ATTRIBUTE_UNUSED, void *obj, void *buf, off_t offset, size_t size, struct _starpu_async_channel *channel)
 {
+        void * event;
 	int pos = get_location_with_node(src_node);
 
         if (channel != NULL)
@@ -138,16 +156,17 @@ int _starpu_disk_read(unsigned src_node, unsigned dst_node STARPU_ATTRIBUTE_UNUS
 			channel = NULL;
 		else
 		{
-			channel->type = STARPU_DISK_RAM;
 			channel->event.disk_event.memory_node = src_node;
 
 			_STARPU_TRACE_START_DRIVER_COPY_ASYNC(src_node, dst_node);
-			channel->event.disk_event.backend_event = disk_register_list[pos]->functions->async_read(disk_register_list[pos]->base, obj, buf, offset, size);
+			event = disk_register_list[pos]->functions->async_read(disk_register_list[pos]->base, obj, buf, offset, size);
 			_STARPU_TRACE_END_DRIVER_COPY_ASYNC(src_node, dst_node);
+
+                        add_async_event(channel, event);
 		}
 	}
 	/* asynchronous request failed or synchronous request is asked */
-	if (channel == NULL || !channel->event.disk_event.backend_event)
+	if (channel == NULL || !event)
 	{
 		disk_register_list[pos]->functions->read(disk_register_list[pos]->base, obj, buf, offset, size);
 		return 0;
@@ -158,6 +177,7 @@ int _starpu_disk_read(unsigned src_node, unsigned dst_node STARPU_ATTRIBUTE_UNUS
 /* src_node == STARPU_MAIN_RAM and dst_node == disk node */
 int _starpu_disk_write(unsigned src_node STARPU_ATTRIBUTE_UNUSED, unsigned dst_node, void *obj, void *buf, off_t offset, size_t size, struct _starpu_async_channel *channel)
 {
+        void * event;
 	int pos = get_location_with_node(dst_node);
 
         if (channel != NULL)
@@ -166,16 +186,17 @@ int _starpu_disk_write(unsigned src_node STARPU_ATTRIBUTE_UNUSED, unsigned dst_n
 			channel = NULL;
 		else
                 {
-			channel->type = STARPU_DISK_RAM;
 			channel->event.disk_event.memory_node = dst_node;
 
 			_STARPU_TRACE_START_DRIVER_COPY_ASYNC(src_node, dst_node);
-			channel->event.disk_event.backend_event = disk_register_list[pos]->functions->async_write(disk_register_list[pos]->base, obj, buf, offset, size);
+			event = disk_register_list[pos]->functions->async_write(disk_register_list[pos]->base, obj, buf, offset, size);
         		_STARPU_TRACE_END_DRIVER_COPY_ASYNC(src_node, dst_node);
+
+                        add_async_event(channel, event);
 		}
         }
         /* asynchronous request failed or synchronous request is asked */
-	if (channel == NULL || !channel->event.disk_event.backend_event)
+	if (channel == NULL || !event)
         {
 		disk_register_list[pos]->functions->write(disk_register_list[pos]->base, obj, buf, offset, size);
         	return 0;
@@ -188,16 +209,20 @@ int _starpu_disk_copy(unsigned node_src, void *obj_src, off_t offset_src, unsign
 	int pos_src = get_location_with_node(node_src);
 	int pos_dst = get_location_with_node(node_dst);
 	/* both nodes have same copy function */
+        void * event;
 	channel->event.disk_event.memory_node = node_src;
-	channel->event.disk_event.backend_event = disk_register_list[pos_src]->functions->copy(disk_register_list[pos_src]->base, obj_src, offset_src,
+	event = disk_register_list[pos_src]->functions->copy(disk_register_list[pos_src]->base, obj_src, offset_src,
 											       disk_register_list[pos_dst]->base, obj_dst, offset_dst,
 											       size);
-	STARPU_ASSERT(channel->event.disk_event.backend_event);
+        add_async_event(channel, event);
+
+	STARPU_ASSERT(event);
 	return -EAGAIN;
 }
 
 int _starpu_disk_full_read(unsigned src_node, unsigned dst_node STARPU_ATTRIBUTE_UNUSED, void *obj, void **ptr, size_t *size, struct _starpu_async_channel *channel)
 {
+        void * event;
 	int pos = get_location_with_node(src_node);
 
 	if (channel != NULL)
@@ -206,16 +231,17 @@ int _starpu_disk_full_read(unsigned src_node, unsigned dst_node STARPU_ATTRIBUTE
 			channel = NULL;
 		else
 		{
-			channel->type = STARPU_DISK_RAM;
 			channel->event.disk_event.memory_node = src_node;
 
 			_STARPU_TRACE_START_DRIVER_COPY_ASYNC(src_node, dst_node);
-			channel->event.disk_event.backend_event = disk_register_list[pos]->functions->async_full_read(disk_register_list[pos]->base, obj, ptr, size);
+			event = disk_register_list[pos]->functions->async_full_read(disk_register_list[pos]->base, obj, ptr, size);
 			_STARPU_TRACE_END_DRIVER_COPY_ASYNC(src_node, dst_node);
+
+                        add_async_event(channel, event);
 		}
 	}
 	/* asynchronous request failed or synchronous request is asked */
-	if (channel == NULL || !channel->event.disk_event.backend_event)
+	if (channel == NULL || !event)
 	{
 		disk_register_list[pos]->functions->full_read(disk_register_list[pos]->base, obj, ptr, size);
 		return 0;
@@ -225,6 +251,7 @@ int _starpu_disk_full_read(unsigned src_node, unsigned dst_node STARPU_ATTRIBUTE
 
 int _starpu_disk_full_write(unsigned src_node STARPU_ATTRIBUTE_UNUSED, unsigned dst_node, void *obj, void *ptr, size_t size, struct _starpu_async_channel *channel)
 {
+        void * event;
 	int pos = get_location_with_node(dst_node);
 
 	if (channel != NULL)
@@ -233,16 +260,17 @@ int _starpu_disk_full_write(unsigned src_node STARPU_ATTRIBUTE_UNUSED, unsigned
 			channel = NULL;
 		else
 		{
-			channel->type = STARPU_DISK_RAM;
 			channel->event.disk_event.memory_node = dst_node;
 
 			_STARPU_TRACE_START_DRIVER_COPY_ASYNC(src_node, dst_node);
-			channel->event.disk_event.backend_event = disk_register_list[pos]->functions->async_full_write(disk_register_list[pos]->base, obj, ptr, size);
+			event = disk_register_list[pos]->functions->async_full_write(disk_register_list[pos]->base, obj, ptr, size);
 			_STARPU_TRACE_END_DRIVER_COPY_ASYNC(src_node, dst_node);
+
+                        add_async_event(channel, event);
 		}
 	}
 	/* asynchronous request failed or synchronous request is asked */
-	if (channel == NULL || !channel->event.disk_event.backend_event)
+	if (channel == NULL || !event)
 	{
 		disk_register_list[pos]->functions->full_write(disk_register_list[pos]->base, obj, ptr, size);
 		return 0;
@@ -265,20 +293,82 @@ void starpu_disk_close(unsigned node, void *obj, size_t size)
 void starpu_disk_wait_request(struct _starpu_async_channel *async_channel)
 {
 	int position = get_location_with_node(async_channel->event.disk_event.memory_node);
-	disk_register_list[position]->functions->wait_request(async_channel->event.disk_event.backend_event);
+        /* Call the backend's wait_request and free_request on every request recorded on the channel; nothing is done when there is no list or the list is empty. */
+        if (async_channel->event.disk_event.requests != NULL && !_starpu_disk_backend_event_list_empty(async_channel->event.disk_event.requests))
+        {
+                struct _starpu_disk_backend_event * event = _starpu_disk_backend_event_list_begin(async_channel->event.disk_event.requests);
+                struct _starpu_disk_backend_event * next;
+
+                /* Wait for each event in the list, then free it and remove it from the list */
+                while (event != _starpu_disk_backend_event_list_end(async_channel->event.disk_event.requests))
+                {
+                        next = _starpu_disk_backend_event_list_next(event); /* fetched first: the current element is erased and deleted below */
+
+                        disk_register_list[position]->functions->wait_request(event->backend_event);
+
+                        disk_register_list[position]->functions->free_request(event->backend_event);
+
+                        _starpu_disk_backend_event_list_erase(async_channel->event.disk_event.requests, event);
+
+                        _starpu_disk_backend_event_delete(event);
+
+                        event = next;
+                }
+
+                /* Delete the list itself: every event has been removed from it */
+                _starpu_disk_backend_event_list_delete(async_channel->event.disk_event.requests);
+                async_channel->event.disk_event.requests = NULL;
+        }
 }
 
 int starpu_disk_test_request(struct _starpu_async_channel *async_channel)
 {
 	int position = get_location_with_node(async_channel->event.disk_event.memory_node);
+        /* Run the backend's test_request on every request recorded on the channel; requests for which it returns non-zero are freed and removed from the list. */
+        if (async_channel->event.disk_event.requests != NULL && !_starpu_disk_backend_event_list_empty(async_channel->event.disk_event.requests))
+        {
+                struct _starpu_disk_backend_event * event = _starpu_disk_backend_event_list_begin(async_channel->event.disk_event.requests);
+                struct _starpu_disk_backend_event * next;
+
+                /* Test each event in the list; free and remove those for which test_request returned non-zero */
+                while (event != _starpu_disk_backend_event_list_end(async_channel->event.disk_event.requests))
+                {
+                        next = _starpu_disk_backend_event_list_next(event); /* fetched first: the current element may be erased and deleted below */
+
+                        int res = disk_register_list[position]->functions->test_request(event->backend_event);
+
+                                if (res)
+                                {
+                                        disk_register_list[position]->functions->free_request(event->backend_event);
+
+                                        _starpu_disk_backend_event_list_erase(async_channel->event.disk_event.requests, event);
+
+                                        _starpu_disk_backend_event_delete(event);
+                                }
+
+                        event = next;
+                }
+
+                /* Delete the list once it no longer contains any event */
+                if (_starpu_disk_backend_event_list_empty(async_channel->event.disk_event.requests))
+                {
+                        _starpu_disk_backend_event_list_delete(async_channel->event.disk_event.requests);
+                        async_channel->event.disk_event.requests = NULL;
+                }
+        }
+
+	return async_channel->event.disk_event.requests == NULL; /* 1 when no request is left on the channel, 0 otherwise */
 }
 
 void starpu_disk_free_request(struct _starpu_async_channel *async_channel)
 {
-	int position = get_location_with_node(async_channel->event.disk_event.memory_node);
+/* Calling this function currently makes no sense: requests are already freed by the test and wait functions, so abort if it is ever reached */
+        STARPU_ABORT();
+        /* Previous body, now commented out: */
+/*	int position = get_location_with_node(async_channel->event.disk_event.memory_node);
 	if (async_channel->event.disk_event.backend_event)
 		disk_register_list[position]->functions->free_request(async_channel->event.disk_event.backend_event);
+*/
 }
 
 static int add_disk_in_list(unsigned node,  struct starpu_disk_ops *func, void *base)

+ 15 - 0
src/datawizard/copy_driver.c

@@ -521,6 +521,11 @@ static int copy_data_1_to_1_generic(starpu_data_handle_t handle,
 #endif
 
 	case _STARPU_MEMORY_NODE_TUPLE(STARPU_CPU_RAM,STARPU_DISK_RAM):
+                if (req && !starpu_asynchronous_copy_disabled())
+                {
+                        req->async_channel.type = STARPU_DISK_RAM;
+                        req->async_channel.event.disk_event.requests = NULL;
+                }
 		if(copy_methods->any_to_any)
 			ret = copy_methods->any_to_any(src_interface, src_node, dst_interface, dst_node, req && !starpu_asynchronous_copy_disabled() ? &req->async_channel : NULL);
 
@@ -541,6 +546,11 @@ static int copy_data_1_to_1_generic(starpu_data_handle_t handle,
 		break;
 
 	case _STARPU_MEMORY_NODE_TUPLE(STARPU_DISK_RAM,STARPU_CPU_RAM):
+                if (req && !starpu_asynchronous_copy_disabled())
+                {
+                        req->async_channel.type = STARPU_DISK_RAM;
+                        req->async_channel.event.disk_event.requests = NULL;
+                }
 		if(copy_methods->any_to_any)
 			ret = copy_methods->any_to_any(src_interface, src_node, dst_interface, dst_node, req && !starpu_asynchronous_copy_disabled()  ? &req->async_channel : NULL);
 		else
@@ -563,6 +573,11 @@ static int copy_data_1_to_1_generic(starpu_data_handle_t handle,
 		break;
 
 	case _STARPU_MEMORY_NODE_TUPLE(STARPU_DISK_RAM,STARPU_DISK_RAM):
+                if (req && !starpu_asynchronous_copy_disabled())
+                {
+                        req->async_channel.type = STARPU_DISK_RAM;
+                        req->async_channel.event.disk_event.requests = NULL;
+                }
 		ret = copy_methods->any_to_any(src_interface, src_node, dst_interface, dst_node, req ? &req->async_channel : NULL);
 		break;
 

+ 6 - 1
src/datawizard/copy_driver.h

@@ -71,10 +71,15 @@ struct _starpu_mpi_ms_async_event
 };
 #endif
 
+LIST_TYPE(_starpu_disk_backend_event, /* element type of the list held in struct _starpu_disk_async_event */
+	void *backend_event; /* opaque event pointer handed to the disk backend's wait_request / test_request / free_request */
+);
+        
+
 struct _starpu_disk_async_event
 {
 	unsigned memory_node;
-	void *backend_event;
+        struct _starpu_disk_backend_event_list * requests; /* list of backend events recorded for this channel; NULL when no list is allocated */
 };
 
 /* this is a structure that can be queried to see whether an asynchronous

+ 0 - 8
src/datawizard/data_request.c

@@ -118,14 +118,6 @@ static void _starpu_data_request_unlink(struct _starpu_data_request *r)
 
 static void _starpu_data_request_destroy(struct _starpu_data_request *r)
 {
-	switch (r->async_channel.type)
-	{
-		case STARPU_DISK_RAM:
-			starpu_disk_free_request(&r->async_channel);
-			break;
-		default:
-			break;
-	}
 	//fprintf(stderr, "DESTROY REQ %p (%d) refcnt %d\n", r, node, r->refcnt);
 	_starpu_data_request_delete(r);
 }