浏览代码

take NUMA to account when reading data from disk

Corentin Salingue 7 年之前
父节点
当前提交
40b8600006

+ 2 - 2
include/starpu_disk.h

@@ -38,13 +38,13 @@ struct starpu_disk_ops
 	 int     (*read)   (void *base, void *obj, void *buf, off_t offset, size_t size);
 	 int     (*write)  (void *base, void *obj, const void *buf, off_t offset, size_t size);
 
-	 int	(*full_read)    (void * base, void * obj, void ** ptr, size_t * size);
+	 int	(*full_read)    (void * base, void * obj, void ** ptr, size_t * size, unsigned dst_node);
 	 int 	(*full_write)   (void * base, void * obj, void * ptr, size_t size);
 
 	 void *  (*async_write)  (void *base, void *obj, void *buf, off_t offset, size_t size);
 	 void *  (*async_read)   (void *base, void *obj, void *buf, off_t offset, size_t size);
 
-	 void *	(*async_full_read)    (void * base, void * obj, void ** ptr, size_t * size);
+	 void *	(*async_full_read)    (void * base, void * obj, void ** ptr, size_t * size, unsigned dst_node);
 	 void *	(*async_full_write)   (void * base, void * obj, void * ptr, size_t size);
 
 	 void *  (*copy)   (void *base_src, void* obj_src, off_t offset_src,  void *base_dst, void* obj_dst, off_t offset_dst, size_t size);

+ 3 - 3
src/core/disk.c

@@ -249,7 +249,7 @@ int _starpu_disk_copy(unsigned node_src, void *obj_src, off_t offset_src, unsign
 	return -EAGAIN;
 }
 
-int _starpu_disk_full_read(unsigned src_node, unsigned dst_node STARPU_ATTRIBUTE_UNUSED, void *obj, void **ptr, size_t *size, struct _starpu_async_channel *channel)
+int _starpu_disk_full_read(unsigned src_node, unsigned dst_node, void *obj, void **ptr, size_t *size, struct _starpu_async_channel *channel)
 {
         void *event = NULL;
 	int pos = get_location_with_node(src_node);
@@ -263,7 +263,7 @@ int _starpu_disk_full_read(unsigned src_node, unsigned dst_node STARPU_ATTRIBUTE
 			channel->event.disk_event.memory_node = src_node;
 
 			_STARPU_TRACE_START_DRIVER_COPY_ASYNC(src_node, dst_node);
-			event = disk_register_list[pos]->functions->async_full_read(disk_register_list[pos]->base, obj, ptr, size);
+			event = disk_register_list[pos]->functions->async_full_read(disk_register_list[pos]->base, obj, ptr, size, dst_node);
 			_STARPU_TRACE_END_DRIVER_COPY_ASYNC(src_node, dst_node);
 
                         add_async_event(channel, event);
@@ -272,7 +272,7 @@ int _starpu_disk_full_read(unsigned src_node, unsigned dst_node STARPU_ATTRIBUTE
 	/* asynchronous request failed or synchronous request is asked */
 	if (channel == NULL || !event)
 	{
-		disk_register_list[pos]->functions->full_read(disk_register_list[pos]->base, obj, ptr, size);
+		disk_register_list[pos]->functions->full_read(disk_register_list[pos]->base, obj, ptr, size, dst_node);
 		return 0;
 	}
 	return -EAGAIN;

+ 1 - 0
src/core/disk.h

@@ -27,6 +27,7 @@ extern "C"
 #endif
 
 #include <datawizard/copy_driver.h>
+#include <datawizard/malloc.h>
 
 /* interface to manipulate memory disk */
 void * _starpu_disk_alloc (unsigned node, size_t size) STARPU_ATTRIBUTE_MALLOC;

+ 2 - 2
src/core/disk_ops/disk_hdf5.c

@@ -672,7 +672,7 @@ static int starpu_hdf5_test(void * event)
         return starpu_sem_trywait(finished) == 0;
 }
 
-static int starpu_hdf5_full_read(void *base, void *obj, void **ptr, size_t *size)
+static int starpu_hdf5_full_read(void *base, void *obj, void **ptr, size_t *size, unsigned dst_node)
 {
         struct starpu_hdf5_obj * dataObj = (struct starpu_hdf5_obj *) obj;
 
@@ -683,7 +683,7 @@ static int starpu_hdf5_full_read(void *base, void *obj, void **ptr, size_t *size
         *size = _starpu_get_size_obj(dataObj);
         _starpu_hdf5_protect_stop(base);
 
-        starpu_malloc_flags(ptr, *size, 0); 
+        _starpu_malloc_flags_on_node(dst_node, ptr, *size, 0); 
 
         starpu_hdf5_send_work(base, obj, *ptr, 0, *size, (void*) &finished, FULL_READ);
         

+ 2 - 2
src/core/disk_ops/disk_leveldb.cpp

@@ -136,7 +136,7 @@ static int starpu_leveldb_read(void *base, void *obj, void *buf, off_t offset, s
 	return 0;
 }
 
-static int starpu_leveldb_full_read(void *base, void *obj, void **ptr, size_t *size)
+static int starpu_leveldb_full_read(void *base, void *obj, void **ptr, size_t *size, unsigned dst_node)
 {
         struct starpu_leveldb_obj *tmp = (struct starpu_leveldb_obj *) obj;
         struct starpu_leveldb_base *base_tmp = (struct starpu_leveldb_base *) base;
@@ -150,7 +150,7 @@ static int starpu_leveldb_full_read(void *base, void *obj, void **ptr, size_t *s
 	STARPU_ASSERT(s.ok());
 
 	*size = value.length();
-	*ptr = malloc(*size);
+	_starpu_malloc_flags_on_node(dst_node, ptr, *size, 0);
 	STARPU_ASSERT(*ptr);
 
 	/* use buffer */

+ 2 - 2
src/core/disk_ops/disk_stdio.c

@@ -233,7 +233,7 @@ static int starpu_stdio_read(void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void
 	return 0;
 }
 
-static int starpu_stdio_full_read(void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void **ptr, size_t *size)
+static int starpu_stdio_full_read(void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void **ptr, size_t *size, unsigned dst_node)
 {
 	struct starpu_stdio_obj *tmp = (struct starpu_stdio_obj *) obj;
 	FILE *f = tmp->file;
@@ -253,7 +253,7 @@ static int starpu_stdio_full_read(void *base STARPU_ATTRIBUTE_UNUSED, void *obj,
 	if (tmp->file)
 		STARPU_PTHREAD_MUTEX_UNLOCK(&tmp->mutex);
 	/* Alloc aligned buffer */
-	starpu_malloc_flags(ptr, *size, 0);
+	_starpu_malloc_flags_on_node(dst_node, ptr, *size, 0);
 	if (tmp->file)
 		STARPU_PTHREAD_MUTEX_LOCK(&tmp->mutex);
 

+ 2 - 2
src/core/disk_ops/unistd/disk_unistd_global.c

@@ -320,7 +320,7 @@ void *starpu_unistd_global_async_read(void *base STARPU_ATTRIBUTE_UNUSED, void *
 }
 #endif
 
-int starpu_unistd_global_full_read(void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void **ptr, size_t *size)
+int starpu_unistd_global_full_read(void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void **ptr, size_t *size, unsigned dst_node)
 {
         struct starpu_unistd_global_obj *tmp = (struct starpu_unistd_global_obj *) obj;
 	int fd = tmp->descriptor;
@@ -340,7 +340,7 @@ int starpu_unistd_global_full_read(void *base STARPU_ATTRIBUTE_UNUSED, void *obj
 		_starpu_unistd_reclose(fd);
 
 	/* Allocated aligned buffer */
-	starpu_malloc_flags(ptr, *size, 0);
+	_starpu_malloc_flags_on_node(dst_node, ptr, *size, 0);
 	return starpu_unistd_global_read(base, obj, *ptr, 0, *size);
 }
 

+ 1 - 1
src/core/disk_ops/unistd/disk_unistd_global.h

@@ -46,6 +46,6 @@ void* starpu_unistd_global_async_write (void *base, void *obj, void *buf, off_t
 void starpu_unistd_global_wait_request(void * async_channel);
 int starpu_unistd_global_test_request(void * async_channel);
 void starpu_unistd_global_free_request(void * async_channel);
-int starpu_unistd_global_full_read(void *base, void * obj, void ** ptr, size_t * size);
+int starpu_unistd_global_full_read(void *base, void * obj, void ** ptr, size_t * size, unsigned dst_node);
 int starpu_unistd_global_full_write (void * base, void * obj, void * ptr, size_t size);
 #endif