Nathalie Furmento %!s(int64=7) %!d(string=hai) anos
pai
achega
cc92dcccce

+ 3 - 4
mpi/examples/user_datatype/my_interface.c

@@ -44,15 +44,13 @@ void starpu_my_interface_compare_codelet_cpu(void *descr[], void *_args)
 	*compare = (d0 == d1 && c0 == c1);
 }
 
-static struct starpu_my_interface *myinterface = NULL;
-
 void _starpu_my_interface_datatype_allocate(MPI_Datatype *mpi_datatype)
 {
 	int ret;
-
 	int blocklengths[2] = {1, 1};
 	MPI_Aint displacements[2];
 	MPI_Datatype types[2] = {MPI_INT, MPI_CHAR};
+	struct starpu_my_interface *myinterface = NULL;
 	myinterface = malloc(sizeof(struct starpu_my_interface));
 
 	MPI_Address(myinterface, displacements);
@@ -65,6 +63,8 @@ void _starpu_my_interface_datatype_allocate(MPI_Datatype *mpi_datatype)
 
 	ret = MPI_Type_commit(mpi_datatype);
 	STARPU_ASSERT_MSG(ret == MPI_SUCCESS, "MPI_Type_commit failed");
+
+	free(myinterface);
 }
 
 void starpu_my_interface_datatype_allocate(starpu_data_handle_t handle, MPI_Datatype *mpi_datatype)
@@ -76,7 +76,6 @@ void starpu_my_interface_datatype_allocate(starpu_data_handle_t handle, MPI_Data
 void starpu_my_interface_datatype_free(MPI_Datatype *mpi_datatype)
 {
 	MPI_Type_free(mpi_datatype);
-	free(myinterface);
 }
 
 int starpu_my_interface_get_int(starpu_data_handle_t handle)

+ 5 - 0
mpi/examples/user_datatype/user_datatype.c

@@ -109,5 +109,10 @@ int main(int argc, char **argv)
 	starpu_mpi_shutdown();
 	starpu_shutdown();
 
+	if (rank == 0)
+	{
+		FPRINTF(stderr, "[node 0] %s\n", compare==1?"SUCCESS":"FAILURE");
+	}
+
 	return (rank == 0) ? !compare : 0;
 }

+ 77 - 101
src/core/disk.c

@@ -41,7 +41,6 @@
 
 struct disk_register
 {
-	unsigned node;
 	void *base;
 	struct starpu_disk_ops *functions;
 	/* disk condition (1 = all authorizations,  */
@@ -49,12 +48,9 @@ struct disk_register
 };
 
 static int add_disk_in_list(unsigned node, struct starpu_disk_ops *func, void *base);
-static inline unsigned get_location_with_node(unsigned node);
 
-static struct disk_register **disk_register_list = NULL;
-static unsigned memnode_to_disknode[STARPU_MAXNODES];
+static struct disk_register *disk_register_list[STARPU_MAXNODES];
 static int disk_number = 0;
-static int size_register_list = 2;
 
 int starpu_disk_swap_node = -1;
 
@@ -101,14 +97,14 @@ int starpu_disk_register(struct starpu_disk_ops *func, void *parameter, starpu_s
 	}
 
 	//Add bus for disk <-> disk copy
-	if (func->copy != NULL && disk_register_list != NULL)
+	if (func->copy != NULL)
 	{
 		int disk;
-		for (disk = 0; disk < size_register_list; disk++)
+		for (disk = 0; disk < STARPU_MAXNODES; disk++)
 			if (disk_register_list[disk] != NULL && disk_register_list[disk]->functions->copy != NULL && disk_register_list[disk]->functions->copy == func->copy)
 			{
-				_starpu_register_bus(disk_memnode, disk_register_list[disk]->node);
-				_starpu_register_bus(disk_register_list[disk]->node, disk_memnode);
+				_starpu_register_bus(disk_memnode, disk);
+				_starpu_register_bus(disk, disk_memnode);
 			}
 	}
 
@@ -137,32 +133,27 @@ int starpu_disk_register(struct starpu_disk_ops *func, void *parameter, starpu_s
 
 void _starpu_disk_unregister(void)
 {
-	if (disk_register_list)
-	{
-		int i;
+	int i;
 
-		/* search disk and delete it */
-		for (i = 0; i < size_register_list; ++i)
-		{
-			if (disk_register_list[i] == NULL)
-				continue;
+	/* search disk and delete it */
+	for (i = 0; i < STARPU_MAXNODES; ++i)
+	{
+		if (disk_register_list[i] == NULL)
+			continue;
 
-			_starpu_set_disk_flag(disk_register_list[i]->node, STARPU_DISK_NO_RECLAIM);
-			_starpu_free_all_automatically_allocated_buffers(disk_register_list[i]->node);
+		_starpu_set_disk_flag(i, STARPU_DISK_NO_RECLAIM);
+		_starpu_free_all_automatically_allocated_buffers(i);
 
-			/* don't forget to unplug */
-			disk_register_list[i]->functions->unplug(disk_register_list[i]->base);
-			free(disk_register_list[i]);
-			disk_register_list[i] = NULL;
+		/* don't forget to unplug */
+		disk_register_list[i]->functions->unplug(disk_register_list[i]->base);
+		free(disk_register_list[i]);
+		disk_register_list[i] = NULL;
 
-			disk_number--;
-		}
-
-		/* no disk in the list -> delete the list */
-		free(disk_register_list);
-		disk_register_list = NULL;
+		disk_number--;
 	}
 
+	/* no disk in the list -> delete the list */
+
 	STARPU_ASSERT_MSG(disk_number == 0, "Some disks are not unregistered !");
 }
 
@@ -170,32 +161,29 @@ void _starpu_disk_unregister(void)
 
 void *_starpu_disk_alloc(unsigned node, size_t size)
 {
-	unsigned pos = get_location_with_node(node);
-	return disk_register_list[pos]->functions->alloc(disk_register_list[pos]->base, size);
+	return disk_register_list[node]->functions->alloc(disk_register_list[node]->base, size);
 }
 
 void _starpu_disk_free(unsigned node, void *obj, size_t size)
 {
-	unsigned pos = get_location_with_node(node);
-	disk_register_list[pos]->functions->free(disk_register_list[pos]->base, obj, size);
+	disk_register_list[node]->functions->free(disk_register_list[node]->base, obj, size);
 }
 
 /* src_node == disk node and dst_node == STARPU_MAIN_RAM */
 int _starpu_disk_read(unsigned src_node, unsigned dst_node STARPU_ATTRIBUTE_UNUSED, void *obj, void *buf, off_t offset, size_t size, struct _starpu_async_channel *channel)
 {
         void *event = NULL;
-	unsigned pos = get_location_with_node(src_node);
 
         if (channel != NULL)
 	{
-		if (disk_register_list[pos]->functions->async_read == NULL)
+		if (disk_register_list[src_node]->functions->async_read == NULL)
 			channel = NULL;
 		else
 		{
 			channel->event.disk_event.memory_node = src_node;
 
 			_STARPU_TRACE_START_DRIVER_COPY_ASYNC(src_node, dst_node);
-			event = disk_register_list[pos]->functions->async_read(disk_register_list[pos]->base, obj, buf, offset, size);
+			event = disk_register_list[src_node]->functions->async_read(disk_register_list[src_node]->base, obj, buf, offset, size);
 			_STARPU_TRACE_END_DRIVER_COPY_ASYNC(src_node, dst_node);
 
                         add_async_event(channel, event);
@@ -204,7 +192,7 @@ int _starpu_disk_read(unsigned src_node, unsigned dst_node STARPU_ATTRIBUTE_UNUS
 	/* asynchronous request failed or synchronous request is asked */
 	if (channel == NULL || !event)
 	{
-		disk_register_list[pos]->functions->read(disk_register_list[pos]->base, obj, buf, offset, size);
+		disk_register_list[src_node]->functions->read(disk_register_list[src_node]->base, obj, buf, offset, size);
 		return 0;
 	}
 	return -EAGAIN;
@@ -214,18 +202,17 @@ int _starpu_disk_read(unsigned src_node, unsigned dst_node STARPU_ATTRIBUTE_UNUS
 int _starpu_disk_write(unsigned src_node STARPU_ATTRIBUTE_UNUSED, unsigned dst_node, void *obj, void *buf, off_t offset, size_t size, struct _starpu_async_channel *channel)
 {
         void *event = NULL;
-	unsigned pos = get_location_with_node(dst_node);
 
         if (channel != NULL)
         {
-		if (disk_register_list[pos]->functions->async_write == NULL)
+		if (disk_register_list[dst_node]->functions->async_write == NULL)
 			channel = NULL;
 		else
                 {
 			channel->event.disk_event.memory_node = dst_node;
 
 			_STARPU_TRACE_START_DRIVER_COPY_ASYNC(src_node, dst_node);
-			event = disk_register_list[pos]->functions->async_write(disk_register_list[pos]->base, obj, buf, offset, size);
+			event = disk_register_list[dst_node]->functions->async_write(disk_register_list[dst_node]->base, obj, buf, offset, size);
         		_STARPU_TRACE_END_DRIVER_COPY_ASYNC(src_node, dst_node);
 
                         add_async_event(channel, event);
@@ -234,7 +221,7 @@ int _starpu_disk_write(unsigned src_node STARPU_ATTRIBUTE_UNUSED, unsigned dst_n
         /* asynchronous request failed or synchronous request is asked */
 	if (channel == NULL || !event)
         {
-		disk_register_list[pos]->functions->write(disk_register_list[pos]->base, obj, buf, offset, size);
+		disk_register_list[dst_node]->functions->write(disk_register_list[dst_node]->base, obj, buf, offset, size);
         	return 0;
         }
         return -EAGAIN;
@@ -242,15 +229,37 @@ int _starpu_disk_write(unsigned src_node STARPU_ATTRIBUTE_UNUSED, unsigned dst_n
 
 int _starpu_disk_copy(unsigned node_src, void *obj_src, off_t offset_src, unsigned node_dst, void *obj_dst, off_t offset_dst, size_t size, struct _starpu_async_channel *channel)
 {
-	unsigned pos_src = get_location_with_node(node_src);
-	unsigned pos_dst = get_location_with_node(node_dst);
 	/* both nodes have same copy function */
-        void * event;
-	channel->event.disk_event.memory_node = node_src;
-	event = disk_register_list[pos_src]->functions->copy(disk_register_list[pos_src]->base, obj_src, offset_src,
-											       disk_register_list[pos_dst]->base, obj_dst, offset_dst,
-											       size);
-        add_async_event(channel, event);
+        void * event = NULL;
+
+	if (channel)
+	{
+		channel->event.disk_event.memory_node = node_src;
+		event = disk_register_list[node_src]->functions->copy(disk_register_list[node_src]->base, obj_src, offset_src,
+								disk_register_list[node_dst]->base, obj_dst, offset_dst, size);
+		add_async_event(channel, event);
+	}
+
+	/* Something goes wrong with copy disk to disk... */
+	if (!event)
+	{
+		if (channel || (!channel && starpu_asynchronous_copy_disabled()))
+			disk_register_list[node_src]->functions->copy = NULL;
+
+		/* perform a read, and after a write... */
+		void * ptr;
+		int ret = _starpu_malloc_flags_on_node(STARPU_MAIN_RAM, &ptr, size, 0);
+		STARPU_ASSERT_MSG(ret == 0, "Cannot allocate %zu bytes to perform disk to disk operation", size);
+
+		ret = _starpu_disk_read(node_src, STARPU_MAIN_RAM, obj_src, ptr, offset_src, size, NULL);
+		STARPU_ASSERT_MSG(ret == 0, "Cannot read %zu bytes to perform disk to disk copy", size);
+		ret = _starpu_disk_write(STARPU_MAIN_RAM, node_dst, obj_dst, ptr, offset_dst, size, NULL);
+		STARPU_ASSERT_MSG(ret == 0, "Cannot write %zu bytes to perform disk to disk copy", size);
+
+		_starpu_free_flags_on_node(STARPU_MAIN_RAM, ptr, size, 0);
+
+		return 0;
+	}
 
 	STARPU_ASSERT(event);
 	return -EAGAIN;
@@ -259,18 +268,17 @@ int _starpu_disk_copy(unsigned node_src, void *obj_src, off_t offset_src, unsign
 int _starpu_disk_full_read(unsigned src_node, unsigned dst_node, void *obj, void **ptr, size_t *size, struct _starpu_async_channel *channel)
 {
         void *event = NULL;
-	unsigned pos = get_location_with_node(src_node);
 
 	if (channel != NULL)
 	{
-		if (disk_register_list[pos]->functions->async_full_read == NULL)
+		if (disk_register_list[src_node]->functions->async_full_read == NULL)
 			channel = NULL;
 		else
 		{
 			channel->event.disk_event.memory_node = src_node;
 
 			_STARPU_TRACE_START_DRIVER_COPY_ASYNC(src_node, dst_node);
-			event = disk_register_list[pos]->functions->async_full_read(disk_register_list[pos]->base, obj, ptr, size, dst_node);
+			event = disk_register_list[src_node]->functions->async_full_read(disk_register_list[src_node]->base, obj, ptr, size, dst_node);
 			_STARPU_TRACE_END_DRIVER_COPY_ASYNC(src_node, dst_node);
 
                         add_async_event(channel, event);
@@ -279,7 +287,7 @@ int _starpu_disk_full_read(unsigned src_node, unsigned dst_node, void *obj, void
 	/* asynchronous request failed or synchronous request is asked */
 	if (channel == NULL || !event)
 	{
-		disk_register_list[pos]->functions->full_read(disk_register_list[pos]->base, obj, ptr, size, dst_node);
+		disk_register_list[src_node]->functions->full_read(disk_register_list[src_node]->base, obj, ptr, size, dst_node);
 		return 0;
 	}
 	return -EAGAIN;
@@ -288,18 +296,17 @@ int _starpu_disk_full_read(unsigned src_node, unsigned dst_node, void *obj, void
 int _starpu_disk_full_write(unsigned src_node STARPU_ATTRIBUTE_UNUSED, unsigned dst_node, void *obj, void *ptr, size_t size, struct _starpu_async_channel *channel)
 {
         void *event = NULL;
-	unsigned pos = get_location_with_node(dst_node);
 
 	if (channel != NULL)
 	{
-		if (disk_register_list[pos]->functions->async_full_write == NULL)
+		if (disk_register_list[dst_node]->functions->async_full_write == NULL)
 			channel = NULL;
 		else
 		{
 			channel->event.disk_event.memory_node = dst_node;
 
 			_STARPU_TRACE_START_DRIVER_COPY_ASYNC(src_node, dst_node);
-			event = disk_register_list[pos]->functions->async_full_write(disk_register_list[pos]->base, obj, ptr, size);
+			event = disk_register_list[dst_node]->functions->async_full_write(disk_register_list[dst_node]->base, obj, ptr, size);
 			_STARPU_TRACE_END_DRIVER_COPY_ASYNC(src_node, dst_node);
 
                         add_async_event(channel, event);
@@ -308,7 +315,7 @@ int _starpu_disk_full_write(unsigned src_node STARPU_ATTRIBUTE_UNUSED, unsigned
 	/* asynchronous request failed or synchronous request is asked */
 	if (channel == NULL || !event)
 	{
-		disk_register_list[pos]->functions->full_write(disk_register_list[pos]->base, obj, ptr, size);
+		disk_register_list[dst_node]->functions->full_write(disk_register_list[dst_node]->base, obj, ptr, size);
 		return 0;
 	}
 	return -EAGAIN;
@@ -316,19 +323,17 @@ int _starpu_disk_full_write(unsigned src_node STARPU_ATTRIBUTE_UNUSED, unsigned
 
 void *starpu_disk_open(unsigned node, void *pos, size_t size)
 {
-	unsigned position = get_location_with_node(node);
-	return disk_register_list[position]->functions->open(disk_register_list[position]->base, pos, size);
+	return disk_register_list[node]->functions->open(disk_register_list[node]->base, pos, size);
 }
 
 void starpu_disk_close(unsigned node, void *obj, size_t size)
 {
-	unsigned position = get_location_with_node(node);
-	disk_register_list[position]->functions->close(disk_register_list[position]->base, obj, size);
+	disk_register_list[node]->functions->close(disk_register_list[node]->base, obj, size);
 }
 
 void starpu_disk_wait_request(struct _starpu_async_channel *async_channel)
 {
-	unsigned position = get_location_with_node(async_channel->event.disk_event.memory_node);
+	unsigned node = async_channel->event.disk_event.memory_node;
 
         if (async_channel->event.disk_event.requests != NULL && !_starpu_disk_backend_event_list_empty(async_channel->event.disk_event.requests))
         {
@@ -340,9 +345,9 @@ void starpu_disk_wait_request(struct _starpu_async_channel *async_channel)
                 {
                         next = _starpu_disk_backend_event_list_next(event);
 
-                        disk_register_list[position]->functions->wait_request(event->backend_event);
+                        disk_register_list[node]->functions->wait_request(event->backend_event);
 
-                        disk_register_list[position]->functions->free_request(event->backend_event);
+                        disk_register_list[node]->functions->free_request(event->backend_event);
 
                         _starpu_disk_backend_event_list_erase(async_channel->event.disk_event.requests, event);
 
@@ -359,7 +364,7 @@ void starpu_disk_wait_request(struct _starpu_async_channel *async_channel)
 
 int starpu_disk_test_request(struct _starpu_async_channel *async_channel)
 {
-	unsigned position = get_location_with_node(async_channel->event.disk_event.memory_node);
+	unsigned node = async_channel->event.disk_event.memory_node;
 
         if (async_channel->event.disk_event.requests != NULL && !_starpu_disk_backend_event_list_empty(async_channel->event.disk_event.requests))
         {
@@ -371,11 +376,11 @@ int starpu_disk_test_request(struct _starpu_async_channel *async_channel)
                 {
                         next = _starpu_disk_backend_event_list_next(event);
 
-                        int res = disk_register_list[position]->functions->test_request(event->backend_event);
+                        int res = disk_register_list[node]->functions->test_request(event->backend_event);
 
                                 if (res)
                                 {
-                                        disk_register_list[position]->functions->free_request(event->backend_event);
+                                        disk_register_list[node]->functions->free_request(event->backend_event);
 
                                         _starpu_disk_backend_event_list_erase(async_channel->event.disk_event.requests, event);
 
@@ -410,51 +415,24 @@ void starpu_disk_free_request(struct _starpu_async_channel *async_channe STARPU_
 static int add_disk_in_list(unsigned node,  struct starpu_disk_ops *func, void *base)
 {
 	int n;
-
-	/* initialization */
-	if (disk_register_list == NULL)
-	{
-		_STARPU_CALLOC(disk_register_list, size_register_list, sizeof(struct disk_register *));
-	}
-	/* small size -> new size  */
-	if (disk_number >= size_register_list)
-	{
-		int old_size = size_register_list;
-		size_register_list *= 2;
-		_STARPU_REALLOC(disk_register_list, size_register_list*sizeof(struct disk_register *));
-
-		/* Initialize the new part */
-		int i;
-		for (i = old_size; i < size_register_list; i++)
-			disk_register_list[i] = NULL;
-	}
-
 	struct disk_register *dr;
 	_STARPU_MALLOC(dr, sizeof(struct disk_register));
-	dr->node = node;
 	dr->base = base;
 	dr->flag = STARPU_DISK_ALL;
 	dr->functions = func;
 	n = disk_number++;
-	disk_register_list[n] = dr;
-	memnode_to_disknode[node] = n;
+	disk_register_list[node] = dr;
 	return n;
 }
 
-static inline unsigned get_location_with_node(unsigned node)
-{
-	return memnode_to_disknode[node];
-}
 
 int _starpu_disk_can_copy(unsigned node1, unsigned node2)
 {
 	if (starpu_node_get_kind(node1) == STARPU_DISK_RAM && starpu_node_get_kind(node2) == STARPU_DISK_RAM)
 	{
-		unsigned pos1 = get_location_with_node(node1);
-		unsigned pos2 = get_location_with_node(node2);
-		if (disk_register_list[pos1]->functions == disk_register_list[pos2]->functions)
+		if (disk_register_list[node1]->functions == disk_register_list[node2]->functions)
 			/* they must have a copy function */
-			if (disk_register_list[pos1]->functions->copy != NULL)
+			if (disk_register_list[node1]->functions->copy != NULL)
 				return 1;
 	}
 	return 0;
@@ -462,14 +440,12 @@ int _starpu_disk_can_copy(unsigned node1, unsigned node2)
 
 void _starpu_set_disk_flag(unsigned node, int flag)
 {
-	unsigned pos = get_location_with_node(node);
-	disk_register_list[pos]->flag = flag;
+	disk_register_list[node]->flag = flag;
 }
 
 int _starpu_get_disk_flag(unsigned node)
 {
-	unsigned pos = get_location_with_node(node);
-	return disk_register_list[pos]->flag;
+	return disk_register_list[node]->flag;
 }
 
 void _starpu_swap_init(void)

+ 28 - 8
src/core/disk_ops/disk_hdf5.c

@@ -259,11 +259,14 @@ static void starpu_hdf5_copy_internal(struct _starpu_hdf5_work * work)
 	/* HDF5 H50copy supports only same size in both areas and copies the entire object */
 	if (work->offset_src == 0 && work->offset_dst == 0 && work->size == work->obj_src->size && work->size == work->obj_dst->size)
 	{
+		H5Dclose(work->obj_dst->dataset);
 		/* Dirty : Delete dataspace because H5Ocopy only works if destination does not exist */
 		H5Ldelete(work->base_dst->fileID, work->obj_dst->path, H5P_DEFAULT);
 
 		status = H5Ocopy(work->base_src->fileID, work->obj_src->path, work->base_dst->fileID, work->obj_dst->path, H5P_DEFAULT, H5P_DEFAULT); 
 		STARPU_ASSERT_MSG(status >= 0, "Can not copy data (%s) associed to this disk (%s) to the data (%s) on this disk (%s)\n", work->obj_src->path, work->base_src->path, work->obj_dst->path, work->base_dst->path);
+
+		work->obj_dst->dataset = H5Dopen2(work->base_dst->fileID, work->obj_dst->path, H5P_DEFAULT);				
 	}
 	else
 	{
@@ -309,14 +312,17 @@ static void * _starpu_hdf5_internal_thread(void * arg)
 			if (work->base_src < work->base_dst)
 			{
 				_starpu_hdf5_protect_start(work->base_src);
-				if (work->base_src != work->base_dst)
-					_starpu_hdf5_protect_start(work->base_dst);
+#ifdef H5_HAVE_THREADSAFE
+				_starpu_hdf5_protect_start(work->base_dst);
+#endif
 			}
 			else
 			{
 				_starpu_hdf5_protect_start(work->base_dst);
+#ifdef H5_HAVE_THREADSAFE
 				if (work->base_src != work->base_dst)
 					_starpu_hdf5_protect_start(work->base_src);
+#endif
 			}
 
                         switch(work->type)
@@ -344,8 +350,22 @@ static void * _starpu_hdf5_internal_thread(void * arg)
                                 default:
                                         STARPU_ABORT();
                         }
-                        _starpu_hdf5_protect_stop(work->base_src);
-                        _starpu_hdf5_protect_stop(work->base_dst);
+
+			if (work->base_src < work->base_dst)
+			{
+				_starpu_hdf5_protect_stop(work->base_src);
+#ifdef H5_HAVE_THREADSAFE
+				_starpu_hdf5_protect_stop(work->base_dst);
+#endif
+			}
+			else
+			{
+				_starpu_hdf5_protect_stop(work->base_dst);
+#ifdef H5_HAVE_THREADSAFE
+				if (work->base_src != work->base_dst)
+					_starpu_hdf5_protect_stop(work->base_src);
+#endif
+			}
 
                         /* Update event to tell it's finished */
                         starpu_sem_post((starpu_sem_t *) work->event);
@@ -354,10 +374,6 @@ static void * _starpu_hdf5_internal_thread(void * arg)
                 }
         }
 
-        STARPU_PTHREAD_MUTEX_LOCK(&HDF5_VAR_MUTEX);
-        STARPU_PTHREAD_COND_BROADCAST(&HDF5_VAR_COND);
-        STARPU_PTHREAD_MUTEX_UNLOCK(&HDF5_VAR_MUTEX);
-
         return NULL;
 }
 
@@ -418,11 +434,13 @@ static void starpu_hdf5_send_work(void *base_src, void *obj_src, off_t offset_sr
         work->size = size;
         work->event = event;
 
+#ifdef H5_HAVE_THREADSAFE
         struct starpu_hdf5_base * fileBase;
 	if (fileBase_src != NULL)
 		fileBase = fileBase_src;
 	else	
 		fileBase = fileBase_dst;
+#endif
 
         STARPU_PTHREAD_MUTEX_LOCK(&HDF5_VAR_MUTEX);
         _starpu_hdf5_work_list_push_front(&HDF5_VAR_WORK_LIST, work);
@@ -611,6 +629,7 @@ static void starpu_hdf5_unplug(void *base)
 		STARPU_PTHREAD_MUTEX_UNLOCK(&HDF5_VAR_MUTEX);
 		STARPU_PTHREAD_JOIN(HDF5_VAR_THREAD, NULL);
 		STARPU_PTHREAD_MUTEX_LOCK(&HDF5_VAR_MUTEX);
+		STARPU_PTHREAD_COND_DESTROY(&HDF5_VAR_COND);
 		STARPU_ASSERT(_starpu_hdf5_work_list_empty(&HDF5_VAR_WORK_LIST));
 		/* the internal thread is deleted */
 #ifndef H5_HAVE_THREADSAFE
@@ -620,6 +639,7 @@ static void starpu_hdf5_unplug(void *base)
         status = H5Fclose(fileBase->fileID);
 
         STARPU_PTHREAD_MUTEX_UNLOCK(&HDF5_VAR_MUTEX);
+
 #ifndef H5_HAVE_THREADSAFE
         if (actual_nb_disk == 0)
 	{

+ 4 - 0
src/core/disk_ops/disk_unistd.c

@@ -60,7 +60,11 @@ struct starpu_disk_ops starpu_disk_unistd_ops =
 	.write = starpu_unistd_global_write,
 	.plug = starpu_unistd_global_plug,
 	.unplug = starpu_unistd_global_unplug,
+#ifdef STARPU_UNISTD_USE_COPY
 	.copy = starpu_unistd_global_copy,
+#else
+	.copy = NULL,
+#endif
 	.bandwidth = get_unistd_global_bandwidth_between_disk_and_main_ram,
 #ifdef HAVE_AIO_H
 	.async_read = starpu_unistd_global_async_read,

+ 16 - 0
src/core/disk_ops/disk_unistd_o_direct.c

@@ -100,6 +100,18 @@ void *starpu_unistd_o_direct_global_async_write(void *base, void *obj, void *buf
 }
 #endif
 
+#ifdef STARPU_UNISTD_USE_COPY
+void *  starpu_unistd_o_direct_global_copy(void *base_src, void* obj_src, off_t offset_src,  void *base_dst, void* obj_dst, off_t offset_dst, size_t size)
+{
+
+	STARPU_ASSERT_MSG((size % getpagesize()) == 0, "The unistd_o_direct variant can only write a multiple of page size %lu Bytes (Here %lu). Use the non-o_direct unistd variant if your data is not a multiple of %lu",
+			(unsigned long) getpagesize(), (unsigned long) size, (unsigned long) getpagesize());
+
+	return starpu_unistd_global_copy(base_src, obj_src, offset_src, base_dst, obj_dst, offset_dst, size);
+}
+
+#endif
+
 struct starpu_disk_ops starpu_disk_unistd_o_direct_ops =
 {
 	.alloc = starpu_unistd_o_direct_alloc,
@@ -110,7 +122,11 @@ struct starpu_disk_ops starpu_disk_unistd_o_direct_ops =
 	.write = starpu_unistd_o_direct_write,
 	.plug = starpu_unistd_o_direct_plug,
 	.unplug = starpu_unistd_global_unplug,
+#ifdef STARPU_UNISTD_USE_COPY
+	.copy = starpu_unistd_o_direct_global_copy,
+#else
 	.copy = NULL,
+#endif
 	.bandwidth = get_unistd_global_bandwidth_between_disk_and_main_ram,
 #if defined(HAVE_AIO_H) || defined(HAVE_LIBAIO_H)
         .async_read = starpu_unistd_o_direct_global_async_read,

+ 114 - 46
src/core/disk_ops/unistd/disk_unistd_global.c

@@ -19,7 +19,6 @@
 #include <fcntl.h>
 #include <stdlib.h>
 #include <sys/stat.h>
-#include <sys/syscall.h>
 #include <stdint.h>
 #include <errno.h>
 
@@ -57,31 +56,23 @@
 #define MAX_OPEN_FILES 64
 #define TEMP_HIERARCHY_DEPTH 2
 
-#ifndef HAVE_COPY_FILE_RANGE
-#ifdef __NR_copy_file_range
+#if !defined(HAVE_COPY_FILE_RANGE) && defined( __NR_copy_file_range)
 static starpu_ssize_t copy_file_range(int fd_in, loff_t *off_in, int fd_out,
 				      loff_t *off_out, size_t len, unsigned int flags)
 {
 	return syscall(__NR_copy_file_range, fd_in, off_in, fd_out,
 			off_out, len, flags);
 }
-#else
-static starpu_ssize_t copy_file_range(int fd_in, off_t *off_in, int fd_out,
-				      off_t *off_out, size_t len, unsigned int flags)
-{
-	errno = ENOSYS;
-	return -1;
-}
-#endif
 #endif
 
 static unsigned starpu_unistd_opened_files;
 
+#ifdef STARPU_UNISTD_USE_COPY
 LIST_TYPE(starpu_unistd_work_copy,
 	int fd_src;
 	int fd_dst;
-	off_t off_src;
-	off_t off_dst;
+	loff_t off_src;
+	loff_t off_dst;
 	struct starpu_unistd_global_obj * obj_src;
 	struct starpu_unistd_global_obj * obj_dst;
 	size_t len;
@@ -100,13 +91,22 @@ struct starpu_unistd_copy_thread
 
 struct starpu_unistd_copy_thread copy_thread[STARPU_MAXNODES][STARPU_MAXNODES];
 static unsigned starpu_unistd_nb_disk_opened = 0;
+#if !defined(HAVE_COPY_FILE_RANGE) && defined( __NR_copy_file_range)
+/* copy_file_range syscall can return ENOSYS. Use global var to catch
+ * and prevent StarPU using direct disk to disk copy */
+enum starpu_unistd_failed_copy { INIT, CHECKED, FAILED };
+static enum starpu_unistd_failed_copy starpu_unistd_copy_failed = INIT;
+#endif
+#endif
 
 struct starpu_unistd_base
 {
 	char * path;
 	int created;
 	/* To know which thread handles the copy function */
+#ifdef STARPU_UNISTD_USE_COPY
 	unsigned disk_index;
+#endif
 #if defined(HAVE_LIBAIO_H)
 	io_context_t ctx;
         struct starpu_unistd_aiocb_link * hashtable;
@@ -142,7 +142,9 @@ enum starpu_unistd_wait_type { STARPU_UNISTD_AIOCB, STARPU_UNISTD_COPY };
 union starpu_unistd_wait_event
 {
 	struct starpu_unistd_work_copy * event_copy;
+#if defined(HAVE_LIBAIO_H) || defined(HAVE_AIO_H)
 	struct starpu_unistd_aiocb event_aiocb;
+#endif
 };
 
 struct starpu_unistd_wait
@@ -534,24 +536,42 @@ int starpu_unistd_global_full_write(void *base STARPU_ATTRIBUTE_UNUSED, void *ob
 	return starpu_unistd_global_write(base, obj, ptr, 0, size);
 }
 
+#ifdef STARPU_UNISTD_USE_COPY
 static void * starpu_unistd_internal_thread(void * arg)
 {
-	struct starpu_unistd_copy_thread * copy_thread = (struct starpu_unistd_copy_thread *) arg;
+	struct starpu_unistd_copy_thread * internal_copy_thread = (struct starpu_unistd_copy_thread *) arg;
 
-	while (copy_thread->run || !starpu_unistd_work_copy_list_empty(copy_thread->list))
+	while (internal_copy_thread->run || !starpu_unistd_work_copy_list_empty(internal_copy_thread->list))
 	{
-		STARPU_PTHREAD_MUTEX_LOCK(&copy_thread->mutex);
-		if (copy_thread->run && starpu_unistd_work_copy_list_empty(copy_thread->list))
-                        STARPU_PTHREAD_COND_WAIT(&copy_thread->cond, &copy_thread->mutex);
-		STARPU_PTHREAD_MUTEX_UNLOCK(&copy_thread->mutex);
+		STARPU_PTHREAD_MUTEX_LOCK(&internal_copy_thread->mutex);
+		if (internal_copy_thread->run && starpu_unistd_work_copy_list_empty(internal_copy_thread->list))
+                        STARPU_PTHREAD_COND_WAIT(&internal_copy_thread->cond, &internal_copy_thread->mutex);
+		STARPU_PTHREAD_MUTEX_UNLOCK(&internal_copy_thread->mutex);
 
-		if (!starpu_unistd_work_copy_list_empty(copy_thread->list))
+		if (!starpu_unistd_work_copy_list_empty(internal_copy_thread->list))
 		{
-			STARPU_PTHREAD_MUTEX_LOCK(&copy_thread->mutex);
-			struct starpu_unistd_work_copy * work = starpu_unistd_work_copy_list_pop_back(copy_thread->list);
-			STARPU_PTHREAD_MUTEX_UNLOCK(&copy_thread->mutex);
+			STARPU_PTHREAD_MUTEX_LOCK(&internal_copy_thread->mutex);
+			struct starpu_unistd_work_copy * work = starpu_unistd_work_copy_list_pop_back(internal_copy_thread->list);
+			STARPU_PTHREAD_MUTEX_UNLOCK(&internal_copy_thread->mutex);
+
+			starpu_ssize_t ret = copy_file_range(work->fd_src, &work->off_src, work->fd_dst, &work->off_dst, work->len, work->flags);
 
-			copy_file_range(work->fd_src, &work->off_src, work->fd_dst, &work->off_dst, work->len, work->flags);
+#if !defined(HAVE_COPY_FILE_RANGE) && defined( __NR_copy_file_range)
+			STARPU_PTHREAD_MUTEX_LOCK(&internal_copy_thread->mutex);
+			if (starpu_unistd_copy_failed == INIT && ret == -1 && errno == ENOSYS)
+			{
+				starpu_unistd_copy_failed = FAILED;
+			} 
+			else
+			{
+#endif
+				STARPU_ASSERT_MSG(ret >= 0, "Copy_file_range failed (errno %d)", errno);
+				STARPU_ASSERT_MSG((size_t) ret == work->len, "Copy_file_range failed (value %zd instead of %zd)", ret, work->len);
+#if !defined(HAVE_COPY_FILE_RANGE) && defined( __NR_copy_file_range)
+				starpu_unistd_copy_failed = CHECKED;
+			}
+			STARPU_PTHREAD_MUTEX_UNLOCK(&internal_copy_thread->mutex);
+#endif
 
 			starpu_sem_post(&work->finished);
 
@@ -563,14 +583,15 @@ static void * starpu_unistd_internal_thread(void * arg)
 	return NULL;
 }
 
-static void initialize_working_thread(struct starpu_unistd_copy_thread *copy_thread)
+static void initialize_working_thread(struct starpu_unistd_copy_thread *internal_copy_thread)
 {
-	STARPU_PTHREAD_MUTEX_INIT(&copy_thread->mutex, NULL);
-	STARPU_PTHREAD_COND_INIT(&copy_thread->cond, NULL);
-	copy_thread->run = 1;
-	copy_thread->list = starpu_unistd_work_copy_list_new();
-	STARPU_PTHREAD_CREATE(&copy_thread->thread, NULL, starpu_unistd_internal_thread, copy_thread);
+	STARPU_PTHREAD_MUTEX_INIT(&internal_copy_thread->mutex, NULL);
+	STARPU_PTHREAD_COND_INIT(&internal_copy_thread->cond, NULL);
+	internal_copy_thread->run = 1;
+	internal_copy_thread->list = starpu_unistd_work_copy_list_new();
+	STARPU_PTHREAD_CREATE(&internal_copy_thread->thread, NULL, starpu_unistd_internal_thread, internal_copy_thread);
 }
+#endif
 
 /* create a new copy of parameter == base */
 void *starpu_unistd_global_plug(void *parameter, starpu_ssize_t size STARPU_ATTRIBUTE_UNUSED)
@@ -598,10 +619,11 @@ void *starpu_unistd_global_plug(void *parameter, starpu_ssize_t size STARPU_ATTR
 	STARPU_ASSERT(ret == 0);
 #endif
 
+#ifdef STARPU_UNISTD_USE_COPY
 	base->disk_index = starpu_unistd_nb_disk_opened;
 	starpu_unistd_nb_disk_opened++;
 
-	int i;
+	unsigned i;
 	for (i = 0; i < starpu_unistd_nb_disk_opened; i++)
 	{
 		initialize_working_thread(&copy_thread[i][base->disk_index]);
@@ -609,23 +631,26 @@ void *starpu_unistd_global_plug(void *parameter, starpu_ssize_t size STARPU_ATTR
 		if (i != base->disk_index)
 			initialize_working_thread(&copy_thread[base->disk_index][i]);
 	}
+#endif
 
 	return (void *) base;
 }
 
-static void ending_working_thread(struct starpu_unistd_copy_thread *copy_thread)
+#ifdef STARPU_UNISTD_USE_COPY
+static void ending_working_thread(struct starpu_unistd_copy_thread *internal_copy_thread)
 {
-	STARPU_PTHREAD_MUTEX_LOCK(&copy_thread->mutex);
-	copy_thread->run = 0;
-	STARPU_PTHREAD_COND_BROADCAST(&copy_thread->cond);
-	STARPU_PTHREAD_MUTEX_UNLOCK(&copy_thread->mutex);
+	STARPU_PTHREAD_MUTEX_LOCK(&internal_copy_thread->mutex);
+	internal_copy_thread->run = 0;
+	STARPU_PTHREAD_COND_BROADCAST(&internal_copy_thread->cond);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&internal_copy_thread->mutex);
 
-	STARPU_PTHREAD_JOIN(copy_thread->thread, NULL);
+	STARPU_PTHREAD_JOIN(internal_copy_thread->thread, NULL);
 
-	STARPU_PTHREAD_MUTEX_DESTROY(&copy_thread->mutex);
-	STARPU_PTHREAD_COND_DESTROY(&copy_thread->cond);
-	starpu_unistd_work_copy_list_delete(copy_thread->list);
+	STARPU_PTHREAD_MUTEX_DESTROY(&internal_copy_thread->mutex);
+	STARPU_PTHREAD_COND_DESTROY(&internal_copy_thread->cond);
+	starpu_unistd_work_copy_list_delete(internal_copy_thread->list);
 }
+#endif
 
 /* free memory allocated for the base */
 void starpu_unistd_global_unplug(void *base)
@@ -638,7 +663,8 @@ void starpu_unistd_global_unplug(void *base)
 	if (fileBase->created)
 		rmdir(fileBase->path);
 
-	int i;
+#ifdef STARPU_UNISTD_USE_COPY
+	unsigned i;
 	for (i = 0; i < fileBase->disk_index+1; i++)
 	{
 		ending_working_thread(&copy_thread[i][fileBase->disk_index]);
@@ -648,6 +674,10 @@ void starpu_unistd_global_unplug(void *base)
 	}
 	starpu_unistd_nb_disk_opened--;
 
+	if (starpu_unistd_nb_disk_opened == 0)
+		starpu_unistd_copy_failed = INIT;
+#endif
+
 	free(fileBase->path);
 	free(fileBase);
 }
@@ -788,11 +818,13 @@ void starpu_unistd_global_wait_request(void *async_channel)
 			break;
 		}
 
+#ifdef STARPU_UNISTD_USE_COPY
 		case STARPU_UNISTD_COPY :
 		{
 			starpu_sem_wait(&event->event.event_copy->finished);
 			break;
 		}
+#endif
 
 		default :
 			STARPU_ABORT_MSG();
@@ -842,7 +874,7 @@ int starpu_unistd_global_test_request(void *async_channel)
 			return 0;
 #elif defined(HAVE_AIO_H)
 			struct starpu_unistd_aiocb *starpu_aiocb = &event->event.event_aiocb;
-			struct aiocb *aiocb = &starpu_aiocb->aiocb;
+			const struct aiocb *aiocb = &starpu_aiocb->aiocb;
 			int ret;
 
 #if defined(__GLIBC__) && (__GLIBC__ < 2 || (__GLIBC__ == 2 && __GLIBC_MINOR__ < 22))
@@ -866,11 +898,13 @@ int starpu_unistd_global_test_request(void *async_channel)
 			break;
 		}
 
+#ifdef STARPU_UNISTD_USE_COPY
 		case STARPU_UNISTD_COPY :
 		{
 			return starpu_sem_trywait(&event->event.event_copy->finished) == 0;
 			break;
 		}
+#endif
 
 		default :
 			STARPU_ABORT_MSG();
@@ -904,6 +938,7 @@ void starpu_unistd_global_free_request(void *async_channel)
 			break;
 		}
 
+#ifdef STARPU_UNISTD_USE_COPY
 		case STARPU_UNISTD_COPY :
 		{
 			starpu_sem_destroy(&event->event.event_copy->finished);
@@ -919,6 +954,7 @@ void starpu_unistd_global_free_request(void *async_channel)
 			free(event);
 			break;
 		}
+#endif
 
 		default :
 			STARPU_ABORT_MSG();
@@ -927,6 +963,7 @@ void starpu_unistd_global_free_request(void *async_channel)
 }
 
 
+#ifdef STARPU_UNISTD_USE_COPY
 void *  starpu_unistd_global_copy(void *base_src, void* obj_src, off_t offset_src,  void *base_dst, void* obj_dst, off_t offset_dst, size_t size)
 {
 	struct starpu_unistd_global_obj * unistd_obj_src = obj_src;
@@ -959,10 +996,41 @@ void *  starpu_unistd_global_copy(void *base_src, void* obj_src, off_t offset_sr
 
 	event->event.event_copy = work;
 
-	STARPU_PTHREAD_MUTEX_LOCK(&copy_thread[unistd_base_src->disk_index][unistd_base_dst->disk_index].mutex);
-	starpu_unistd_work_copy_list_push_front(copy_thread[unistd_base_src->disk_index][unistd_base_dst->disk_index].list, work);
-        STARPU_PTHREAD_COND_BROADCAST(&copy_thread[unistd_base_src->disk_index][unistd_base_dst->disk_index].cond);
-	STARPU_PTHREAD_MUTEX_UNLOCK(&copy_thread[unistd_base_src->disk_index][unistd_base_dst->disk_index].mutex);
+	struct starpu_unistd_copy_thread * thread = &copy_thread[unistd_base_src->disk_index][unistd_base_dst->disk_index];
+
+#if !defined(HAVE_COPY_FILE_RANGE) && defined( __NR_copy_file_range)
+	unsigned check = 0;
+	STARPU_PTHREAD_MUTEX_LOCK(&thread->mutex);
+	if (starpu_unistd_copy_failed == INIT)
+		check = 1;
+	STARPU_PTHREAD_MUTEX_UNLOCK(&thread->mutex);
+#endif
+
+
+	STARPU_PTHREAD_MUTEX_LOCK(&thread->mutex);
+	starpu_unistd_work_copy_list_push_front(thread->list, work);
+        STARPU_PTHREAD_COND_BROADCAST(&thread->cond);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&thread->mutex);
+
+#if !defined(HAVE_COPY_FILE_RANGE) && defined( __NR_copy_file_range)
+	if (check)
+	{
+		starpu_unistd_global_wait_request((void *) event);
+		/* add token when StarPU will test/wait the request */
+		starpu_sem_post(&work->finished);
+	
+		STARPU_PTHREAD_MUTEX_LOCK(&thread->mutex);
+		/* here copy_file_range does not work */
+		if (starpu_unistd_copy_failed == FAILED)
+		{
+			STARPU_PTHREAD_MUTEX_UNLOCK(&thread->mutex);
+			starpu_unistd_global_free_request((void *) event);
+			return NULL;
+		}
+		STARPU_PTHREAD_MUTEX_UNLOCK(&thread->mutex);
+	}
+#endif
 
 	return event;
 }
+#endif

+ 10 - 0
src/core/disk_ops/unistd/disk_unistd_global.h

@@ -18,11 +18,19 @@
 #define __DISK_UNISTD_GLOBAL_H__
 
 #include <fcntl.h>
+#ifdef __linux__
+#include <sys/syscall.h>
+#endif
 
 #ifndef O_BINARY
 #define O_BINARY 0
 #endif
 
+#define STARPU_UNISTD_USE_COPY 1
+#if !defined(HAVE_COPY_FILE_RANGE) && !defined(__NR_copy_file_range)
+#undef STARPU_UNISTD_USE_COPY
+#endif
+
 struct starpu_unistd_global_obj
 {
         int descriptor;
@@ -48,5 +56,7 @@ int starpu_unistd_global_test_request(void * async_channel);
 void starpu_unistd_global_free_request(void * async_channel);
 int starpu_unistd_global_full_read(void *base, void * obj, void ** ptr, size_t * size, unsigned dst_node);
 int starpu_unistd_global_full_write (void * base, void * obj, void * ptr, size_t size);
+#ifdef STARPU_UNISTD_USE_COPY
 void *  starpu_unistd_global_copy(void *base_src, void* obj_src, off_t offset_src,  void *base_dst, void* obj_dst, off_t offset_dst, size_t size);
 #endif
+#endif

+ 2 - 2
src/core/perfmodel/perfmodel_history.c

@@ -1650,6 +1650,8 @@ void _starpu_update_perfmodel_history(struct _starpu_job *j, struct starpu_perfm
 		unsigned found = 0;
 		int comb = _starpu_perfmodel_create_comb_if_needed(arch);
 
+		STARPU_PTHREAD_RWLOCK_WRLOCK(&model->state->model_rwlock);
+
 		for(c = 0; c < model->state->ncombs; c++)
 		{
 			if(model->state->combs[c] == comb)
@@ -1659,8 +1661,6 @@ void _starpu_update_perfmodel_history(struct _starpu_job *j, struct starpu_perfm
 			}
 		}
 
-		STARPU_PTHREAD_RWLOCK_WRLOCK(&model->state->model_rwlock);
-
 		if(!found)
 		{
 			if (model->state->ncombs + 1 >= model->state->ncombs_set)

+ 1 - 1
src/core/topology.c

@@ -2182,7 +2182,7 @@ static void _starpu_init_numa_node(struct _starpu_machine_config *config)
 			numa_memory_nodes_to_physicalid[memnode] = STARPU_NUMA_MAIN_RAM;
 			nb_numa_nodes++;
 #ifdef STARPU_SIMGRID
-			msg_host_t host = _starpu_simgrid_get_host_by_name("RAM");
+			host = _starpu_simgrid_get_host_by_name("RAM");
 			STARPU_ASSERT(host);
 			_starpu_simgrid_memory_node_set_host(STARPU_MAIN_RAM, host);
 #endif

+ 44 - 23
src/core/workers.c

@@ -1096,7 +1096,8 @@ struct starpu_tree* starpu_workers_get_tree(void)
 #ifdef STARPU_HAVE_HWLOC
 static void _fill_tree(struct starpu_tree *tree, hwloc_obj_t curr_obj, unsigned depth, hwloc_topology_t topology, struct starpu_tree *father)
 {
-	unsigned i;
+	unsigned i, j;
+	unsigned arity;
 	if (curr_obj->arity == 1)
 	{
 		/* Nothing interestin here, skip level */
@@ -1104,13 +1105,30 @@ static void _fill_tree(struct starpu_tree *tree, hwloc_obj_t curr_obj, unsigned
 		return;
 	}
 	starpu_tree_insert(tree, curr_obj->logical_index, depth, curr_obj->type == HWLOC_OBJ_PU, curr_obj->arity, father);
-	starpu_tree_prepare_children(curr_obj->arity, tree);
+	arity = 0;
 	for(i = 0; i < curr_obj->arity; i++)
 	{
-/* 		char string[128]; */
-/* 		hwloc_obj_snprintf(string, sizeof(string), topology, curr_obj->children[i], "#", 0); */
-/* 		printf("%*s%s %d is_pu %d \n", 0, "", string, curr_obj->children[i]->logical_index, curr_obj->children[i]->type == HWLOC_OBJ_PU); */
-		_fill_tree(&tree->nodes[i], curr_obj->children[i], depth+1, topology, tree);
+		hwloc_obj_t child = curr_obj->children[i];
+		if (child->type == HWLOC_OBJ_BRIDGE && (!child->cpuset || hwloc_bitmap_iszero(child->cpuset)))
+			/* I/O stuff, stop caring */
+			continue;
+		arity++;
+	}
+	starpu_tree_prepare_children(arity, tree);
+	j = 0;
+	for(i = 0; i < arity; i++)
+	{
+		hwloc_obj_t child = curr_obj->children[i];
+		if (child->type == HWLOC_OBJ_BRIDGE && (!child->cpuset || hwloc_bitmap_iszero(child->cpuset)))
+			/* I/O stuff, stop caring */
+			continue;
+#if 0
+		char string[128];
+		hwloc_obj_snprintf(string, sizeof(string), topology, child, "#", 0);
+		printf("%*s%s %d is_pu %d \n", 0, "", string, child->logical_index, child->type == HWLOC_OBJ_PU);
+#endif
+		_fill_tree(&tree->nodes[j], child, depth+1, topology, tree);
+		j++;
 	}
 }
 #endif
@@ -1118,27 +1136,20 @@ static void _fill_tree(struct starpu_tree *tree, hwloc_obj_t curr_obj, unsigned
 static void _starpu_build_tree(void)
 {
 #ifdef STARPU_HAVE_HWLOC
-	hwloc_topology_t cpu_topo;
 	struct starpu_tree *tree;
 	_STARPU_MALLOC(tree, sizeof(struct starpu_tree));
 	_starpu_config.topology.tree = tree;
 
-	hwloc_topology_init(&cpu_topo);
-#if HWLOC_API_VERSION >= 0x20000
-	hwloc_topology_set_all_types_filter(cpu_topo, HWLOC_TYPE_FILTER_KEEP_STRUCTURE);
-#else
-	hwloc_topology_ignore_all_keep_structure(cpu_topo);
-#endif
-	hwloc_topology_load(cpu_topo);
-	hwloc_obj_t root = hwloc_get_root_obj(cpu_topo);
+	hwloc_obj_t root = hwloc_get_root_obj(_starpu_config.topology.hwtopology);
 
-/* 	char string[128]; */
-/* 	hwloc_obj_snprintf(string, sizeof(string), topology, root, "#", 0); */
-/* 	printf("%*s%s %d is_pu = %d \n", 0, "", string, root->logical_index, root->type == HWLOC_OBJ_PU); */
+#if 0
+	char string[128];
+	hwloc_obj_snprintf(string, sizeof(string), topology, root, "#", 0);
+	printf("%*s%s %d is_pu = %d \n", 0, "", string, root->logical_index, root->type == HWLOC_OBJ_PU);
+#endif
 
 	/* level, is_pu, is in the tree (it will be true only after add) */
-	_fill_tree(tree, root, 0, cpu_topo, NULL);
-	hwloc_topology_destroy(cpu_topo);
+	_fill_tree(tree, root, 0, _starpu_config.topology.hwtopology, NULL);
 #endif
 }
 
@@ -2148,6 +2159,11 @@ void starpu_worker_get_sched_condition(int workerid, starpu_pthread_mutex_t **sc
 	*sched_mutex = &_starpu_config.workers[workerid].sched_mutex;
 }
 
+/* returns 1 if the call results in initiating a transition of worker WORKERID
+ * from sleeping state to awake
+ * returns 0 if worker WORKERID is not sleeping or the wake-up transition
+ * already has been initiated
+ */
 static int starpu_wakeup_worker_locked(int workerid, starpu_pthread_cond_t *sched_cond, starpu_pthread_mutex_t *mutex STARPU_ATTRIBUTE_UNUSED)
 {
 #ifdef STARPU_SIMGRID
@@ -2156,15 +2172,20 @@ static int starpu_wakeup_worker_locked(int workerid, starpu_pthread_cond_t *sche
 	if (_starpu_config.workers[workerid].status == STATUS_SCHEDULING)
 	{
 		_starpu_config.workers[workerid].state_keep_awake = 1;
-		return 1;
+		return 0;
 	}
 	else if (_starpu_config.workers[workerid].status == STATUS_SLEEPING)
 	{
-		_starpu_config.workers[workerid].state_keep_awake = 1;
+		int ret = 0;
+		if (_starpu_config.workers[workerid].state_keep_awake != 1)
+		{
+			_starpu_config.workers[workerid].state_keep_awake = 1;
+			ret = 1;
+		}
 		/* cond_broadcast is required over cond_signal since
 		 * the condition is share for multiple purpose */
 		STARPU_PTHREAD_COND_BROADCAST(sched_cond);
-		return 1;
+		return ret;
 	}
 	return 0;
 }

+ 2 - 2
src/datawizard/copy_driver.c

@@ -598,7 +598,7 @@ static int copy_data_1_to_1_generic(starpu_data_handle_t handle,
                         req->async_channel.type = STARPU_DISK_RAM;
                         req->async_channel.event.disk_event.requests = NULL;
                 }
-		ret = copy_methods->any_to_any(src_interface, src_node, dst_interface, dst_node, req ? &req->async_channel : NULL);
+		ret = copy_methods->any_to_any(src_interface, src_node, dst_interface, dst_node, req && !starpu_asynchronous_copy_disabled() ? &req->async_channel : NULL);
 		break;
 
 	default:
@@ -954,7 +954,7 @@ unsigned _starpu_driver_test_request_completion(struct _starpu_async_channel *as
 		break;
 	case STARPU_CPU_RAM:
 	default:
-		STARPU_ABORT();
+		STARPU_ABORT_MSG("Memory is not recognized (kind %u) \n", kind);
 	}
 
 	return success;

+ 57 - 16
src/drivers/driver_common/driver_common.c

@@ -390,26 +390,48 @@ struct starpu_task *_starpu_get_worker_task(struct _starpu_worker *worker, int w
 		_starpu_worker_leave_sched_op(worker);
 		STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
 
-		if (_starpu_worker_can_block(memnode, worker)
-			&& !_starpu_sched_ctx_last_worker_awake(worker)
-			&& !worker->state_block_in_parallel_req
-			&& !worker->state_unblock_in_parallel_req)
+		int cond_no_keep_awake = !worker->state_keep_awake;
+		int cond_can_block = _starpu_worker_can_block(memnode, worker);
+		int cond_no_last_awake = !_starpu_sched_ctx_last_worker_awake(worker);
+		int cond_no_block_in_parallel_rq = !worker->state_block_in_parallel_req;
+		int cond_no_unblock_in_parallel_rq = !worker->state_unblock_in_parallel_req;
+
+		if (cond_can_block
+			&& cond_no_last_awake
+			&& cond_no_block_in_parallel_rq
+			&& cond_no_unblock_in_parallel_rq)
 		{
 			do
 			{
+				//_STARPU_DEBUG("worker %u going to sleep: %d|%d|%d|%d|%d\n", worker->workerid, cond_no_keep_awake, cond_can_block, cond_no_last_awake, cond_no_block_in_parallel_rq, cond_no_unblock_in_parallel_rq);
 				STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
+
+				cond_no_keep_awake = !worker->state_keep_awake;
+				cond_can_block = _starpu_worker_can_block(memnode, worker);
+				cond_no_last_awake = !_starpu_sched_ctx_last_worker_awake(worker);
+				cond_no_block_in_parallel_rq = !worker->state_block_in_parallel_req;
+				cond_no_unblock_in_parallel_rq = !worker->state_unblock_in_parallel_req;
 			}
 			/* do not check status != SLEEPING here since status is
 			 * not changed by other threads/workers */
-			while (!worker->state_keep_awake
-					&& !worker->state_block_in_parallel_req
-					&& !worker->state_unblock_in_parallel_req);
+			while (cond_no_keep_awake
+					&& cond_can_block
+					&& cond_no_last_awake
+					&& cond_no_block_in_parallel_rq
+					&& cond_no_unblock_in_parallel_rq);
+			//_STARPU_DEBUG("worker %u waking up: %d|%d|%d|%d|%d\n", worker->workerid, cond_no_keep_awake, cond_can_block, cond_no_last_awake, cond_no_block_in_parallel_rq, cond_no_unblock_in_parallel_rq);
 			worker->state_keep_awake = 0;
+			_starpu_worker_set_status_scheduling_done(workerid);
 			STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
-		} else {
+		}
+		else
+		{
+			//_STARPU_DEBUG("worker %u wont sleep: %d|%d|%d|%d|%d\n", worker->workerid, cond_no_keep_awake, cond_can_block, cond_no_last_awake, cond_no_block_in_parallel_rq, cond_no_unblock_in_parallel_rq);
+			_starpu_worker_set_status_scheduling_done(workerid);
 			STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
 			if (_starpu_machine_is_running())
-				_starpu_exponential_backoff(worker); }
+				_starpu_exponential_backoff(worker);
+		}
 
 		return NULL;
 	}
@@ -568,23 +590,42 @@ int _starpu_get_multi_worker_task(struct _starpu_worker *workers, struct starpu_
 		_starpu_worker_set_status_sleeping(workerid);
 		_starpu_worker_leave_sched_op(worker);
 
-		if (_starpu_worker_can_block(memnode, worker)
-				&& !_starpu_sched_ctx_last_worker_awake(worker)
-				&& !worker->state_block_in_parallel_req
-				&& !worker->state_unblock_in_parallel_req)
+		int cond_no_keep_awake = 1;
+		int cond_can_block = _starpu_worker_can_block(memnode, worker);
+		int cond_no_last_awake = !_starpu_sched_ctx_last_worker_awake(worker);
+		int cond_no_block_in_parallel_rq = !worker->state_block_in_parallel_req;
+		int cond_no_unblock_in_parallel_rq = !worker->state_unblock_in_parallel_req;
+
+		if (cond_can_block
+			&& cond_no_last_awake
+			&& cond_no_block_in_parallel_rq
+			&& cond_no_unblock_in_parallel_rq)
 		{
 			do
 			{
+				//_STARPU_DEBUG("worker %u going to sleep: %d|%d|%d|%d|%d\n", worker->workerid, cond_no_keep_awake, cond_can_block, cond_no_last_awake, cond_no_block_in_parallel_rq, cond_no_unblock_in_parallel_rq);
 				STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
+
+				cond_no_keep_awake = !worker->state_keep_awake;
+				cond_can_block = _starpu_worker_can_block(memnode, worker);
+				cond_no_last_awake = !_starpu_sched_ctx_last_worker_awake(worker);
+				cond_no_block_in_parallel_rq = !worker->state_block_in_parallel_req;
+				cond_no_unblock_in_parallel_rq = !worker->state_unblock_in_parallel_req;
 			}
-			while (!worker->state_keep_awake
-					&& !worker->state_block_in_parallel_req
-					&& !worker->state_unblock_in_parallel_req);
+			while (cond_no_keep_awake
+					&& cond_can_block
+					&& cond_no_last_awake
+					&& cond_no_block_in_parallel_rq
+					&& cond_no_unblock_in_parallel_rq);
+			//_STARPU_DEBUG("worker %u waking up: %d|%d|%d|%d|%d\n", worker->workerid, cond_no_keep_awake, cond_can_block, cond_no_last_awake, cond_no_block_in_parallel_rq, cond_no_unblock_in_parallel_rq);
 			worker->state_keep_awake = 0;
+			_starpu_worker_set_status_scheduling_done(workerid);
 			STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
 		}
 		else
 		{
+			//_STARPU_DEBUG("worker %u wont sleep: %d|%d|%d|%d|%d\n", worker->workerid, cond_no_keep_awake, cond_can_block, cond_no_last_awake, cond_no_block_in_parallel_rq, cond_no_unblock_in_parallel_rq);
+			_starpu_worker_set_status_scheduling_done(workerid);
 			STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
 			if (_starpu_machine_is_running())
 				_starpu_exponential_backoff(worker);

+ 1 - 1
tests/Makefile.am

@@ -151,7 +151,6 @@ myPROGRAMS +=					\
 	datawizard/interfaces/copy_interfaces	\
 	datawizard/locality			\
 	datawizard/variable_size		\
-	datawizard/redux_acquire		\
 	errorcheck/starpu_init_noworker		\
 	errorcheck/invalid_tasks		\
 	helper/cublas_init			\
@@ -278,6 +277,7 @@ myPROGRAMS +=				\
 	datawizard/test_arbiter			\
 	datawizard/invalidate_pending_requests	\
 	datawizard/temporary_partition		\
+	datawizard/redux_acquire		\
 	disk/disk_copy				\
 	disk/disk_copy_to_disk			\
 	disk/disk_compute			\

+ 11 - 0
tests/datawizard/redux_acquire.c

@@ -16,6 +16,7 @@
 
 #include <starpu.h>
 #include <math.h>
+#include "helper.h"
 
 void init_cpu_func(void *descr[], void *cl_arg)
 {
@@ -59,8 +60,13 @@ int main(int argc, char **argv)
 	starpu_data_handle_t dot_handle;
 
 	int ret = starpu_init(NULL);
+	if (ret == -ENODEV)
+		goto skip;
 	STARPU_CHECK_RETURN_VALUE(ret, "starpu_init");
 
+	if (starpu_cpu_worker_get_count() == 0)
+		goto enodev;
+
 	starpu_variable_data_register(&dot_handle, -1, (uintptr_t)NULL, sizeof(long int));
 	starpu_data_set_reduction_methods(dot_handle, &redux_codelet, &init_codelet);
 	starpu_data_acquire(dot_handle, STARPU_R);
@@ -76,4 +82,9 @@ int main(int argc, char **argv)
 
 	starpu_shutdown();
 	return 0;
+
+enodev:
+	starpu_shutdown();
+skip:
+	return STARPU_TEST_SKIPPED;
 }