Browse Source

merge trunk

Samuel Thibault 7 years ago
parent
commit
fd71b927ec

+ 1 - 0
configure.ac

@@ -853,6 +853,7 @@ AC_CHECK_HEADERS([aio.h])
 AC_CHECK_LIB([rt], [aio_read])
 #AC_CHECK_HEADERS([libaio.h])
 #AC_CHECK_LIB([aio], [io_setup])
+AC_CHECK_FUNCS([copy_file_range])
 
 AC_CHECK_FUNCS([mkostemp])
 AC_CHECK_FUNCS([mkdtemp])

+ 2 - 0
src/core/dependencies/data_concurrency.c

@@ -122,6 +122,8 @@ static unsigned _starpu_attempt_to_submit_data_request(unsigned request_from_cod
 	mode &= ~STARPU_COMMUTE;
 	mode &= ~STARPU_SSEND;
 	mode &= ~STARPU_LOCALITY;
+	if (mode & STARPU_R)
+		STARPU_ASSERT_MSG(handle->initialized, "handle %p is not initialized while trying to read it", handle);
 	if (mode == STARPU_RW)
 		mode = STARPU_W;
 

+ 1 - 1
src/core/disk_ops/disk_unistd.c

@@ -60,7 +60,7 @@ struct starpu_disk_ops starpu_disk_unistd_ops =
 	.write = starpu_unistd_global_write,
 	.plug = starpu_unistd_global_plug,
 	.unplug = starpu_unistd_global_unplug,
-	.copy = NULL,
+	.copy = starpu_unistd_global_copy,
 	.bandwidth = get_unistd_global_bandwidth_between_disk_and_main_ram,
 #ifdef HAVE_AIO_H
 	.async_read = starpu_unistd_global_async_read,

+ 360 - 113
src/core/disk_ops/unistd/disk_unistd_global.c

@@ -2,6 +2,7 @@
  *
  * Copyright (C) 2013 Corentin Salingue
  * Copyright (C) 2015, 2016, 2017 CNRS
+ * Copyright (C) 2017  Inria
  *
  * StarPU is free software; you can redistribute it and/or modify
  * it under the terms of the GNU Lesser General Public License as published by
@@ -18,6 +19,7 @@
 #include <fcntl.h>
 #include <stdlib.h>
 #include <sys/stat.h>
+#include <sys/syscall.h>
 #include <stdint.h>
 #include <errno.h>
 
@@ -55,12 +57,56 @@
 #define MAX_OPEN_FILES 64
 #define TEMP_HIERARCHY_DEPTH 2
 
+#ifndef HAVE_COPY_FILE_RANGE
+/* Fallback for C libraries that do not provide a copy_file_range() wrapper:
+ * issue the raw system call when the kernel headers define its number,
+ * otherwise fail.  Returns what syscall() returns, or -1 with errno set to
+ * ENOSYS when the system call number is unknown at build time. */
+static loff_t copy_file_range(int fd_in, loff_t *off_in, int fd_out,
+		loff_t *off_out, size_t len, unsigned int flags)
+{
+#ifdef __NR_copy_file_range
+	return syscall(__NR_copy_file_range, fd_in, off_in, fd_out,
+			off_out, len, flags);
+#else
+	errno = ENOSYS;
+	return -1;
+#endif
+}
+#endif
+
 static unsigned starpu_unistd_opened_files;
 
+/* One queued disk-to-disk copy request, handed to a copy thread */
+LIST_TYPE(starpu_unistd_work_copy,
+	/* descriptors and offsets passed to copy_file_range() */
+	int fd_src;
+	int fd_dst;
+	off_t off_src;
+	off_t off_dst;
+	/* objects the descriptors were taken from; the free_request path reads
+	 * their descriptor field to decide whether fd_src/fd_dst get reclosed */
+	struct starpu_unistd_global_obj * obj_src;
+	struct starpu_unistd_global_obj * obj_dst;
+	/* number of bytes to copy */
+	size_t len;
+	/* flags argument of copy_file_range(), set to 0 by the submitter */
+	unsigned flags;
+	/* posted by the copy thread once the request has been processed */
+	starpu_sem_t finished;	
+);
+
+/* State of one worker thread that executes queued copy requests */
+struct starpu_unistd_copy_thread
+{
+	/* non-zero while the thread must keep serving; cleared on shutdown */
+	int run;
+	starpu_pthread_t thread;
+	/* signalled when a request is queued or when run is cleared */
+	starpu_pthread_cond_t cond;
+	/* taken around pushes/pops on list and around the update of run */
+	starpu_pthread_mutex_t mutex;	
+	/* pending requests: pushed at the front, popped from the back */
+	struct starpu_unistd_work_copy_list *list;
+};
+
+/* copy_thread[s][d] serves copies from the disk whose disk_index is s to the
+ * disk whose disk_index is d */
+struct starpu_unistd_copy_thread copy_thread[STARPU_MAXNODES][STARPU_MAXNODES];
+/* number of currently plugged unistd disks; gives the next disk_index */
+static unsigned starpu_unistd_nb_disk_opened = 0;
+
 struct starpu_unistd_base
 {
 	char * path;
 	int created;
+	/* To know which thread handles the copy function */
+	unsigned disk_index;	
 #if defined(HAVE_LIBAIO_H)
 	io_context_t ctx;
         struct starpu_unistd_aiocb_link * hashtable;
@@ -91,6 +137,20 @@ struct starpu_unistd_aiocb
 };
 #endif
 
+/* Kind of operation an asynchronous request handle stands for */
+enum starpu_unistd_wait_type { STARPU_UNISTD_AIOCB, STARPU_UNISTD_COPY };
+
+/* Payload of a request handle, selected by the type field of
+ * struct starpu_unistd_wait.
+ * NOTE(review): struct starpu_unistd_aiocb looks like it is declared inside
+ * a preprocessor conditional that ends just above -- confirm this union
+ * still builds when that condition is false. */
+union starpu_unistd_wait_event
+{
+	/* STARPU_UNISTD_COPY: the queued copy request, allocated separately */
+	struct starpu_unistd_work_copy * event_copy;
+	/* STARPU_UNISTD_AIOCB: the AIO control block, stored in place */
+	struct starpu_unistd_aiocb event_aiocb;
+};
+
+/* Handle returned by the asynchronous read/write and copy operations and
+ * passed back to the wait/test/free request functions */
+struct starpu_unistd_wait
+{
+	enum starpu_unistd_wait_type type;
+	union starpu_unistd_wait_event event;
+};
+
 /* ------------------- use UNISTD to write on disk -------------------  */
 
 static void _starpu_unistd_init(struct starpu_unistd_global_obj *obj, int descriptor, char *path, size_t size)
@@ -257,8 +317,10 @@ void *starpu_unistd_global_async_read(void *base, void *obj, void *buf, off_t of
 {
         struct starpu_unistd_base * fileBase = (struct starpu_unistd_base *) base;
         struct starpu_unistd_global_obj *tmp = obj;
-        struct starpu_unistd_aiocb *starpu_aiocb;
-	_STARPU_CALLOC(starpu_aiocb, 1,sizeof(*starpu_aiocb));
+	struct starpu_unistd_wait * event;
+	_STARPU_CALLOC(event, 1,sizeof(*event));
+	event->type = STARPU_UNISTD_AIOCB;
+        struct starpu_unistd_aiocb *starpu_aiocb = &event->event.event_aiocb;
         struct iocb *iocb = &starpu_aiocb->iocb;
         starpu_aiocb->obj = obj;
         int fd = tmp->descriptor;
@@ -286,14 +348,16 @@ void *starpu_unistd_global_async_read(void *base, void *obj, void *buf, off_t of
         HASH_ADD_PTR(fileBase->hashtable, aiocb, l);
         STARPU_PTHREAD_MUTEX_UNLOCK(&fileBase->mutex);
 
-        return starpu_aiocb;
+        return event;
 }
 #elif defined(HAVE_AIO_H)
 void *starpu_unistd_global_async_read(void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void *buf, off_t offset, size_t size)
 {
         struct starpu_unistd_global_obj *tmp = obj;
-        struct starpu_unistd_aiocb *starpu_aiocb;
-	_STARPU_CALLOC(starpu_aiocb, 1,sizeof(*starpu_aiocb));
+	struct starpu_unistd_wait * event;
+	_STARPU_CALLOC(event, 1,sizeof(*event));
+	event->type = STARPU_UNISTD_AIOCB;
+        struct starpu_unistd_aiocb *starpu_aiocb = &event->event.event_aiocb;
         struct aiocb *aiocb = &starpu_aiocb->aiocb;
         starpu_aiocb->obj = obj;
         int fd = tmp->descriptor;
@@ -316,7 +380,7 @@ void *starpu_unistd_global_async_read(void *base STARPU_ATTRIBUTE_UNUSED, void *
                 aiocb = NULL;
         }
 
-        return aiocb;
+        return event;
 }
 #endif
 
@@ -382,8 +446,10 @@ void *starpu_unistd_global_async_write(void *base, void *obj, void *buf, off_t o
 {
         struct starpu_unistd_base * fileBase = (struct starpu_unistd_base *) base;
         struct starpu_unistd_global_obj *tmp = obj;
-        struct starpu_unistd_aiocb *starpu_aiocb;
-	_STARPU_CALLOC(starpu_aiocb, 1,sizeof(*starpu_aiocb));
+	struct starpu_unistd_wait * event;
+	_STARPU_CALLOC(event, 1,sizeof(*event));
+	event->type = STARPU_UNISTD_AIOCB;
+        struct starpu_unistd_aiocb *starpu_aiocb = &event->event.event_aiocb;
         struct iocb *iocb = &starpu_aiocb->iocb;
         starpu_aiocb->obj = obj;
         int fd = tmp->descriptor;
@@ -411,14 +477,16 @@ void *starpu_unistd_global_async_write(void *base, void *obj, void *buf, off_t o
         HASH_ADD_PTR(fileBase->hashtable, aiocb, l);
         STARPU_PTHREAD_MUTEX_UNLOCK(&fileBase->mutex);
 
-        return starpu_aiocb;
+        return event;
 }
 #elif defined(HAVE_AIO_H)
 void *starpu_unistd_global_async_write(void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void *buf, off_t offset, size_t size)
 {
         struct starpu_unistd_global_obj *tmp = obj;
-        struct starpu_unistd_aiocb *starpu_aiocb;
-	_STARPU_CALLOC(starpu_aiocb, 1,sizeof(*starpu_aiocb));
+	struct starpu_unistd_wait * event;
+	_STARPU_CALLOC(event, 1,sizeof(*event));
+	event->type = STARPU_UNISTD_AIOCB;
+        struct starpu_unistd_aiocb *starpu_aiocb = &event->event.event_aiocb;
         struct aiocb *aiocb = &starpu_aiocb->aiocb;
         starpu_aiocb->obj = obj;
         int fd = tmp->descriptor;
@@ -441,7 +509,7 @@ void *starpu_unistd_global_async_write(void *base STARPU_ATTRIBUTE_UNUSED, void
                 aiocb = NULL;
         }
 
-        return aiocb;
+        return event;
 }
 #endif
 
@@ -466,6 +534,44 @@ int starpu_unistd_global_full_write(void *base STARPU_ATTRIBUTE_UNUSED, void *ob
 	return starpu_unistd_global_write(base, obj, ptr, 0, size);
 }
 
+/* Worker loop of one copy thread (arg is its struct starpu_unistd_copy_thread).
+ * Pops queued copy requests, performs them with copy_file_range() and posts
+ * each request's semaphore.  Exits once run has been cleared and the queue
+ * is drained.  Always returns NULL. */
+static void * starpu_unistd_internal_thread(void * arg)
+{
+	struct starpu_unistd_copy_thread * copy_thread = (struct starpu_unistd_copy_thread *) arg;
+
+	/* run and list are shared with the submitting and stopping threads:
+	 * only look at them with the mutex held */
+	STARPU_PTHREAD_MUTEX_LOCK(&copy_thread->mutex);
+	while (copy_thread->run || !starpu_unistd_work_copy_list_empty(copy_thread->list))
+	{
+		if (starpu_unistd_work_copy_list_empty(copy_thread->list))
+		{
+			/* Nothing queued, so run is still set: sleep until a
+			 * request or the stop signal arrives, then re-check
+			 * (this also covers spurious wake-ups) */
+			STARPU_PTHREAD_COND_WAIT(&copy_thread->cond, &copy_thread->mutex);
+			continue;
+		}
+
+		struct starpu_unistd_work_copy * work = starpu_unistd_work_copy_list_pop_back(copy_thread->list);
+		STARPU_PTHREAD_MUTEX_UNLOCK(&copy_thread->mutex);
+
+		/* copy_file_range() may copy fewer bytes than requested; it
+		 * advances off_src/off_dst itself, so just ask again for the rest */
+		size_t remaining = work->len;
+		while (remaining > 0)
+		{
+			starpu_ssize_t copied = copy_file_range(work->fd_src, &work->off_src, work->fd_dst, &work->off_dst, remaining, work->flags);
+			if (copied < 0 && errno == EINTR)
+				continue;
+			/* a failure must not be reported as a completed copy;
+			 * 0 means the source ended early, do not spin on it */
+			STARPU_ASSERT_MSG(copied > 0, "copy_file_range returned %ld (errno %d)", (long) copied, errno);
+			remaining -= (size_t) copied;
+		}
+
+		starpu_sem_post(&work->finished);
+		/* Don't free work, it's done when tested/waited are completed */
+
+		STARPU_PTHREAD_MUTEX_LOCK(&copy_thread->mutex);
+	}
+	STARPU_PTHREAD_MUTEX_UNLOCK(&copy_thread->mutex);
+
+	return NULL;
+}
+
+/* Prepare one copy worker: empty request queue, run flag raised,
+ * synchronisation objects initialised, then the thread itself is started. */
+static void initialize_working_thread(struct starpu_unistd_copy_thread *copy_thread)
+{
+	/* everything the thread reads must be ready before it is created */
+	copy_thread->list = starpu_unistd_work_copy_list_new();
+	copy_thread->run = 1;
+	STARPU_PTHREAD_COND_INIT(&copy_thread->cond, NULL);
+	STARPU_PTHREAD_MUTEX_INIT(&copy_thread->mutex, NULL);
+
+	STARPU_PTHREAD_CREATE(&copy_thread->thread, NULL, starpu_unistd_internal_thread, copy_thread);
+}
+
 /* create a new copy of parameter == base */
 void *starpu_unistd_global_plug(void *parameter, starpu_ssize_t size STARPU_ATTRIBUTE_UNUSED)
 {
@@ -492,9 +598,35 @@ void *starpu_unistd_global_plug(void *parameter, starpu_ssize_t size STARPU_ATTR
 	STARPU_ASSERT(ret == 0);
 #endif
 
+	base->disk_index = starpu_unistd_nb_disk_opened;
+	starpu_unistd_nb_disk_opened++;
+
+	int i;
+	for (i = 0; i < starpu_unistd_nb_disk_opened; i++)
+	{
+		initialize_working_thread(&copy_thread[i][base->disk_index]);
+		/* do not initialize the [disk_index][disk_index] entry twice */
+		if (i != base->disk_index)
+			initialize_working_thread(&copy_thread[base->disk_index][i]);
+	}
+
 	return (void *) base;
 }
 
+/* Stop one copy worker and release what initialize_working_thread() set up. */
+static void ending_working_thread(struct starpu_unistd_copy_thread *copy_thread)
+{
+	/* clear run under the mutex and wake the thread so that it notices */
+	STARPU_PTHREAD_MUTEX_LOCK(&copy_thread->mutex);
+	copy_thread->run = 0;
+	STARPU_PTHREAD_COND_BROADCAST(&copy_thread->cond);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&copy_thread->mutex);
+
+	/* the thread must be gone before its mutex, condition and list are */
+	STARPU_PTHREAD_JOIN(copy_thread->thread, NULL);
+
+	STARPU_PTHREAD_MUTEX_DESTROY(&copy_thread->mutex);
+	STARPU_PTHREAD_COND_DESTROY(&copy_thread->cond);
+	starpu_unistd_work_copy_list_delete(copy_thread->list);
+}
+
 /* free memory allocated for the base */
 void starpu_unistd_global_unplug(void *base)
 {
@@ -505,6 +637,17 @@ void starpu_unistd_global_unplug(void *base)
 #endif
 	if (fileBase->created)
 		rmdir(fileBase->path);
+
+	int i;
+	for (i = 0; i < fileBase->disk_index+1; i++)
+	{
+		ending_working_thread(&copy_thread[i][fileBase->disk_index]);
+		/* do not tear down the [disk_index][disk_index] entry twice */
+		if (i != fileBase->disk_index)
+			ending_working_thread(&copy_thread[fileBase->disk_index][i]);
+	}
+	starpu_unistd_nb_disk_opened--;
+
 	free(fileBase->path);
 	free(fileBase);
 }
@@ -594,128 +737,232 @@ int get_unistd_global_bandwidth_between_disk_and_main_ram(unsigned node, void *b
 	return 1;
 }
 
-#if defined(HAVE_LIBAIO_H)
 void starpu_unistd_global_wait_request(void *async_channel)
 {
-	struct starpu_unistd_aiocb *starpu_aiocb = async_channel;
-	struct io_event event;
-
-        int values = -1;
-        int myerrno = EAGAIN;
-        while(!starpu_aiocb->finished || (values <= 0 && (myerrno == EAGAIN || myerrno == EINTR)))
-        {
-                /* Wait the answer of the request timeout IS NULL */
-		values = io_getevents(starpu_aiocb->base->ctx, 1, 1, &event, NULL);
-		if (values < 0)
-			myerrno = -values;
-                if (values > 0)
-                {
-                        //we may catch an other request...
-                        STARPU_PTHREAD_MUTEX_LOCK(&starpu_aiocb->base->mutex);
-
-                        struct starpu_unistd_aiocb_link *l = NULL;
-			HASH_FIND_PTR(starpu_aiocb->base->hashtable, &event.obj, l);
-			STARPU_ASSERT(l != NULL);
-
-                        HASH_DEL(starpu_aiocb->base->hashtable, l);
-                        STARPU_PTHREAD_MUTEX_UNLOCK(&starpu_aiocb->base->mutex);
-                        ((struct starpu_unistd_aiocb *) l->starpu_aiocb)->finished = 1;
-                        free(l);
-                }
-        }
+	struct starpu_unistd_wait * event = async_channel;
+	switch (event->type)
+	{
+		case STARPU_UNISTD_AIOCB :
+		{
+#if defined(HAVE_LIBAIO_H)
+			struct starpu_unistd_aiocb *starpu_aiocb = &event->event.event_aiocb;
+			struct io_event event;
+
+			int values = -1;
+			int myerrno = EAGAIN;
+			while(!starpu_aiocb->finished || (values <= 0 && (myerrno == EAGAIN || myerrno == EINTR)))
+			{
+				/* Wait the answer of the request timeout IS NULL */
+				values = io_getevents(starpu_aiocb->base->ctx, 1, 1, &event, NULL);
+				if (values < 0)
+					myerrno = -values;
+				if (values > 0)
+				{
+					//we may catch an other request...
+					STARPU_PTHREAD_MUTEX_LOCK(&starpu_aiocb->base->mutex);
+
+					struct starpu_unistd_aiocb_link *l = NULL;
+					HASH_FIND_PTR(starpu_aiocb->base->hashtable, &event.obj, l);
+					STARPU_ASSERT(l != NULL);
+
+					HASH_DEL(starpu_aiocb->base->hashtable, l);
+					STARPU_PTHREAD_MUTEX_UNLOCK(&starpu_aiocb->base->mutex);
+					((struct starpu_unistd_aiocb *) l->starpu_aiocb)->finished = 1;
+					free(l);
+				}
+			}
+#elif defined(HAVE_AIO_H)
+			struct starpu_unistd_aiocb *starpu_aiocb = &event->event.event_aiocb;
+			const struct aiocb *aiocb = &starpu_aiocb->aiocb;
+			int values = -1;
+			int ret, myerrno = EAGAIN;
+			while(values < 0 && (myerrno == EAGAIN || myerrno == EINTR))
+			{
+				/* Wait the answer of the request TIMESTAMP IS NULL */
+				values = aio_suspend(&aiocb, 1, NULL);
+				myerrno = errno;
+			}
+			ret = aio_error(aiocb);
+			STARPU_ASSERT_MSG(!ret, "aio_error returned %d", ret);
+#endif
+			break;
+		}
+		
+		case STARPU_UNISTD_COPY :
+		{
+			starpu_sem_wait(&event->event.event_copy->finished);	
+			break;
+		}
+
+		default :
+			STARPU_ABORT_MSG();
+			break;
+	}
 }
 
 int starpu_unistd_global_test_request(void *async_channel)
 {
-	struct starpu_unistd_aiocb *starpu_aiocb = async_channel;
-	struct io_event event;
-	struct timespec ts;
-        int ret;
+	struct starpu_unistd_wait * event = async_channel;
+	switch (event->type)
+	{
+		case STARPU_UNISTD_AIOCB :
+		{
+#if defined(HAVE_LIBAIO_H)
+			struct starpu_unistd_aiocb *starpu_aiocb = &event->event.event_aiocb;
+			struct io_event event;
+			struct timespec ts;
+			int ret;
 
-        if (starpu_aiocb->finished)
-                return 1;
+			if (starpu_aiocb->finished)
+				return 1;
 
-	memset(&ts, 0, sizeof(ts));
+			memset(&ts, 0, sizeof(ts));
 
-        /* Test the answer of the request */
-	ret = io_getevents(starpu_aiocb->base->ctx, 0, 1, &event, &ts);
+			/* Test the answer of the request */
+			ret = io_getevents(starpu_aiocb->base->ctx, 0, 1, &event, &ts);
 
-        if (ret == 1)
-	{
-                //we may catch an other request...
-                STARPU_PTHREAD_MUTEX_LOCK(&starpu_aiocb->base->mutex);
+			if (ret == 1)
+			{
+				//we may catch an other request...
+				STARPU_PTHREAD_MUTEX_LOCK(&starpu_aiocb->base->mutex);
 
-                struct starpu_unistd_aiocb_link *l = NULL;
-		HASH_FIND_PTR(starpu_aiocb->base->hashtable, &event.obj, l);
-		STARPU_ASSERT(l != NULL);
+				struct starpu_unistd_aiocb_link *l = NULL;
+				HASH_FIND_PTR(starpu_aiocb->base->hashtable, &event.obj, l);
+				STARPU_ASSERT(l != NULL);
 
-                HASH_DEL(starpu_aiocb->base->hashtable, l);
-                STARPU_PTHREAD_MUTEX_UNLOCK(&starpu_aiocb->base->mutex);
-                ((struct starpu_unistd_aiocb *) l->starpu_aiocb)->finished = 1;
-                free(l);
+				HASH_DEL(starpu_aiocb->base->hashtable, l);
+				STARPU_PTHREAD_MUTEX_UNLOCK(&starpu_aiocb->base->mutex);
+				((struct starpu_unistd_aiocb *) l->starpu_aiocb)->finished = 1;
+				free(l);
 
-                if (starpu_aiocb->finished)
-                        return 1;
-	}
+				if (starpu_aiocb->finished)
+					return 1;
+			}
 
-        return 0;
+			return 0;
+#elif defined(HAVE_AIO_H)
+			struct starpu_unistd_aiocb *starpu_aiocb = &event->event.event_aiocb;
+			struct aiocb *aiocb = &starpu_aiocb->aiocb;
+			int ret;
+
+#if defined(__GLIBC__) && (__GLIBC__ < 2 || (__GLIBC__ == 2 && __GLIBC_MINOR__ < 22))
+			/* glibc's aio_error was not threadsafe before glibc 2.22 */
+			struct timespec ts = { .tv_sec = 0, .tv_nsec = 0 };
+			ret = aio_suspend(&aiocb, 1, &ts);
+			if (ret < 0 && (errno == EAGAIN || errno == EINTR))
+				return 0;
+			STARPU_ASSERT_MSG(!ret, "aio_suspend returned %d %d\n", ret, errno);
+#endif
+			/* Test the answer of the request */
+			ret = aio_error(aiocb);
+			if (ret == 0)
+				/* request is finished */
+				return 1;
+			if (ret == EINTR || ret == EINPROGRESS || ret == EAGAIN)
+				return 0;
+			/* an error occurred */
+			STARPU_ABORT_MSG("aio_error returned %d", ret);
+#endif
+			break;
+		}
+		
+		case STARPU_UNISTD_COPY :
+		{
+			return starpu_sem_trywait(&event->event.event_copy->finished) == 0;	
+			break;
+		}
+
+		default :
+			STARPU_ABORT_MSG();
+			break;
+	}
+	
+	return 0;
 }
 
 void starpu_unistd_global_free_request(void *async_channel)
 {
-        struct starpu_unistd_aiocb *starpu_aiocb = async_channel;
-        struct iocb *iocb = &starpu_aiocb->iocb;
-        if (starpu_aiocb->obj->descriptor < 0)
-                _starpu_unistd_reclose(iocb->aio_fildes);
-        free(starpu_aiocb);
-}
+	struct starpu_unistd_wait * event = async_channel;
+	switch (event->type)
+	{
+		case STARPU_UNISTD_AIOCB :
+		{
+#if defined(HAVE_LIBAIO_H)
+			struct starpu_unistd_aiocb *starpu_aiocb = &event->event.event_aiocb;
+			struct iocb *iocb = &starpu_aiocb->iocb;
+			if (starpu_aiocb->obj->descriptor < 0)
+				_starpu_unistd_reclose(iocb->aio_fildes);
+			free(event);
 #elif defined(HAVE_AIO_H)
-void starpu_unistd_global_wait_request(void *async_channel)
-{
-        const struct aiocb *aiocb = async_channel;
-        int values = -1;
-        int ret, myerrno = EAGAIN;
-        while(values < 0 && (myerrno == EAGAIN || myerrno == EINTR))
-        {
-                /* Wait the answer of the request TIMESTAMP IS NULL */
-                values = aio_suspend(&aiocb, 1, NULL);
-                myerrno = errno;
-        }
-        ret = aio_error(aiocb);
-        STARPU_ASSERT_MSG(!ret, "aio_error returned %d", ret);
-}
-
-int starpu_unistd_global_test_request(void *async_channel)
-{
-        const struct aiocb *aiocb = async_channel;
-        int ret;
-
-#if defined(__GLIBC__) && (__GLIBC__ < 2 || (__GLIBC__ == 2 && __GLIBC_MINOR__ < 22))
-        /* glibc's aio_error was not threadsafe before glibc 2.22 */
-        struct timespec ts = { .tv_sec = 0, .tv_nsec = 0 };
-        ret = aio_suspend(&aiocb, 1, &ts);
-        if (ret < 0 && (errno == EAGAIN || errno == EINTR))
-                return 0;
-        STARPU_ASSERT_MSG(!ret, "aio_suspend returned %d %d\n", ret, errno);
+			struct starpu_unistd_aiocb *starpu_aiocb = &event->event.event_aiocb;
+			struct aiocb *aiocb = &starpu_aiocb->aiocb;
+			if (starpu_aiocb->obj->descriptor < 0)
+				_starpu_unistd_reclose(aiocb->aio_fildes);
+			aio_return(aiocb);
+			free(event);
 #endif
-        /* Test the answer of the request */
-        ret = aio_error(aiocb);
-        if (ret == 0)
-                /* request is finished */
-                return 1;
-        if (ret == EINTR || ret == EINPROGRESS || ret == EAGAIN)
-                return 0;
-        /* an error occured */
-        STARPU_ABORT_MSG("aio_error returned %d", ret);
+			break;
+		}
+		
+		case STARPU_UNISTD_COPY :
+		{
+			starpu_sem_destroy(&event->event.event_copy->finished);
+
+			int fd_src = event->event.event_copy->obj_src->descriptor;
+			if (fd_src < 0)
+				_starpu_unistd_reclose(event->event.event_copy->fd_src);
+			int fd_dst = event->event.event_copy->obj_dst->descriptor;
+			if (fd_dst < 0)
+				_starpu_unistd_reclose(event->event.event_copy->fd_dst);
+
+			starpu_unistd_work_copy_delete(event->event.event_copy);
+			free(event);
+			break;
+		}
+
+		default :
+			STARPU_ABORT_MSG();
+			break;
+	}
 }
 
-void starpu_unistd_global_free_request(void *async_channel)
+
+void *  starpu_unistd_global_copy(void *base_src, void* obj_src, off_t offset_src,  void *base_dst, void* obj_dst, off_t offset_dst, size_t size)
 {
-        struct starpu_unistd_aiocb *starpu_aiocb = async_channel;
-        struct aiocb *aiocb = &starpu_aiocb->aiocb;
-        if (starpu_aiocb->obj->descriptor < 0)
-                _starpu_unistd_reclose(aiocb->aio_fildes);
-        aio_return(aiocb);
-        free(starpu_aiocb);
+	struct starpu_unistd_global_obj * unistd_obj_src = obj_src;
+	struct starpu_unistd_global_obj * unistd_obj_dst = obj_dst;
+	struct starpu_unistd_base * unistd_base_src = base_src;
+	struct starpu_unistd_base * unistd_base_dst = base_dst;
+
+	struct starpu_unistd_wait * event;
+	_STARPU_CALLOC(event, 1,sizeof(*event));
+	event->type = STARPU_UNISTD_COPY;
+
+	int fd_src = unistd_obj_src->descriptor;
+	if (fd_src < 0)
+		fd_src = _starpu_unistd_reopen(obj_src);
+	int fd_dst = unistd_obj_dst->descriptor;
+	if (fd_dst < 0)
+		fd_dst = _starpu_unistd_reopen(obj_dst);
+
+	struct starpu_unistd_work_copy * work = starpu_unistd_work_copy_new();
+	work->fd_src = fd_src;
+	work->fd_dst = fd_dst;
+	work->obj_src = unistd_obj_src;
+	work->obj_dst = unistd_obj_dst;
+	work->off_src = offset_src;
+	work->off_dst = offset_dst;
+	work->len = size;
+	/* currently not used by copy_file_range */
+	work->flags = 0;
+	starpu_sem_init(&work->finished, 0, 0);
+
+	event->event.event_copy = work;
+
+	STARPU_PTHREAD_MUTEX_LOCK(&copy_thread[unistd_base_src->disk_index][unistd_base_dst->disk_index].mutex);
+	starpu_unistd_work_copy_list_push_front(copy_thread[unistd_base_src->disk_index][unistd_base_dst->disk_index].list, work);
+        STARPU_PTHREAD_COND_BROADCAST(&copy_thread[unistd_base_src->disk_index][unistd_base_dst->disk_index].cond);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&copy_thread[unistd_base_src->disk_index][unistd_base_dst->disk_index].mutex);
+
+	return event;
 }
-#endif

+ 1 - 0
src/core/disk_ops/unistd/disk_unistd_global.h

@@ -48,4 +48,5 @@ int starpu_unistd_global_test_request(void * async_channel);
 void starpu_unistd_global_free_request(void * async_channel);
 int starpu_unistd_global_full_read(void *base, void * obj, void ** ptr, size_t * size, unsigned dst_node);
 int starpu_unistd_global_full_write (void * base, void * obj, void * ptr, size_t size);
+void *  starpu_unistd_global_copy(void *base_src, void* obj_src, off_t offset_src,  void *base_dst, void* obj_dst, off_t offset_dst, size_t size);
 #endif

+ 26 - 1
src/datawizard/user_interactions.c

@@ -24,6 +24,18 @@
 #include <core/dependencies/data_concurrency.h>
 #include <core/sched_policy.h>
 
+/* Check that a handle about to be accessed in read mode holds a value.
+ * Does nothing unless mode contains STARPU_R.  If the handle is not
+ * initialized but has an init codelet (init_cl), a task running that codelet
+ * in STARPU_W mode is submitted first; the handle must then be flagged
+ * initialized, otherwise this aborts. */
+static void _starpu_data_check_initialized(starpu_data_handle_t handle, enum starpu_data_access_mode mode)
+{
+	/* '!' binds tighter than '&': the parentheses are needed so that the
+	 * STARPU_R bit of mode is what gets tested */
+	if (!(mode & STARPU_R))
+		return;
+
+	if (!handle->initialized && handle->init_cl) {
+		int ret = starpu_task_insert(handle->init_cl, STARPU_W, handle, 0);
+		STARPU_ASSERT(ret == 0);
+	}
+	STARPU_ASSERT_MSG(handle->initialized, "handle %p is not initialized while trying to read it\n", handle);
+}
+
 /* Explicitly ask StarPU to allocate room for a piece of data on the specified
  * memory node. */
 int starpu_data_request_allocation(starpu_data_handle_t handle, unsigned node)
@@ -182,6 +194,9 @@ int starpu_data_acquire_on_node_cb_sequential_consistency_sync_jobids(starpu_dat
 	STARPU_ASSERT_MSG(handle->nchildren == 0, "Acquiring a partitioned data (%p) is not possible", handle);
         _STARPU_LOG_IN();
 
+	/* Check that previous tasks have set a value if needed */
+	_starpu_data_check_initialized(handle, mode);
+
 	struct user_interaction_wrapper *wrapper;
 	_STARPU_MALLOC(wrapper, sizeof(struct user_interaction_wrapper));
 
@@ -274,10 +289,11 @@ int starpu_data_acquire_cb_sequential_consistency(starpu_data_handle_t handle,
 
 
 /*
- *	Blockin data request from application
+ *	Blocking data request from application
  */
 
 
+
 static inline void _starpu_data_acquire_continuation(void *arg)
 {
 	struct user_interaction_wrapper *wrapper = (struct user_interaction_wrapper *) arg;
@@ -300,6 +316,9 @@ int starpu_data_acquire_on_node(starpu_data_handle_t handle, int node, enum star
 	/* unless asynchronous, it is forbidden to call this function from a callback or a codelet */
 	STARPU_ASSERT_MSG(_starpu_worker_may_perform_blocking_calls(), "Acquiring a data synchronously is not possible from a codelet or from a task callback, use starpu_data_acquire_cb instead.");
 
+	/* Check that previous tasks have set a value if needed */
+	_starpu_data_check_initialized(handle, mode);
+
 	if (node >= 0 && _starpu_data_is_multiformat_handle(handle) &&
 	    _starpu_handle_needs_conversion_task(handle, node))
 	{
@@ -391,6 +410,9 @@ int starpu_data_acquire_on_node_try(starpu_data_handle_t handle, int node, enum
 	/* it is forbidden to call this function from a callback or a codelet */
 	STARPU_ASSERT_MSG(_starpu_worker_may_perform_blocking_calls(), "Acquiring a data synchronously is not possible from a codelet or from a task callback, use starpu_data_acquire_cb instead.");
 
+	/* Check that previous tasks have set a value if needed */
+	_starpu_data_check_initialized(handle, mode);
+
 	int ret;
 	STARPU_ASSERT_MSG(!_starpu_data_is_multiformat_handle(handle), "not supported yet");
 	STARPU_PTHREAD_MUTEX_LOCK(&handle->sequential_consistency_mutex);
@@ -485,6 +507,9 @@ int _starpu_prefetch_data_on_node_with_mode(starpu_data_handle_t handle, unsigne
 	/* it is forbidden to call this function from a callback or a codelet */
 	STARPU_ASSERT_MSG(async || _starpu_worker_may_perform_blocking_calls(), "Synchronous prefetch is not possible from a task or a callback");
 
+	/* Check that previous tasks have set a value if needed */
+	_starpu_data_check_initialized(handle, mode);
+
 	struct user_interaction_wrapper *wrapper;
 	_STARPU_MALLOC(wrapper, sizeof(*wrapper));
 

+ 1 - 0
tests/Makefile.am

@@ -151,6 +151,7 @@ myPROGRAMS +=					\
 	datawizard/interfaces/copy_interfaces	\
 	datawizard/locality			\
 	datawizard/variable_size		\
+	datawizard/redux_acquire		\
 	errorcheck/starpu_init_noworker		\
 	errorcheck/invalid_tasks		\
 	helper/cublas_init			\

+ 77 - 0
tests/datawizard/redux_acquire.c

@@ -0,0 +1,77 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2017  CNRS
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <starpu.h>
+#include <math.h>
+
+/* Initialisation codelet body: store 42 in the variable held by buffer 0. */
+void init_cpu_func(void *descr[], void *cl_arg)
+{
+	*(long int *)STARPU_VARIABLE_GET_PTR(descr[0]) = 42;
+}
+
+/* Reduction codelet body: add the variable of buffer 1 into that of buffer 0. */
+void redux_cpu_func(void *descr[], void *cl_arg)
+{
+	long int *acc = (long int *)STARPU_VARIABLE_GET_PTR(descr[0]);
+	const long int *contrib = (const long int *)STARPU_VARIABLE_GET_PTR(descr[1]);
+
+	*acc += *contrib;
+}
+
+/* Codelet passed as the initialisation method of the handle: a single
+ * buffer, accessed in write mode. */
+static struct starpu_codelet init_codelet =
+{
+	.name = "init_codelet",
+	.nbuffers = 1,
+	.modes = {STARPU_W},
+	.cpu_funcs = {init_cpu_func}
+};
+/* Codelet passed as the reduction method of the handle: buffer 0 is
+ * read-written, buffer 1 is only read. */
+static struct starpu_codelet redux_codelet =
+{
+	.name = "redux_codelet",
+	.nbuffers = 2,
+	.modes = {STARPU_RW, STARPU_R},
+	.cpu_funcs = {redux_cpu_func}
+};
+
+/* Callback given to starpu_data_acquire_cb(): checks that the local value
+ * of the handle passed as argument is 42, then releases the handle. */
+static void check_dot(void *dot_handle) {
+	long int *x = starpu_data_get_local_ptr(dot_handle);
+	STARPU_ASSERT_MSG(*x == 42, "Incorrect value %ld", *x);
+	starpu_data_release(dot_handle);
+}
+
+/* Acquire in read mode a variable that was registered without any value and
+ * never written, once synchronously and once through the callback variant,
+ * and check that the value read is 42, i.e. what init_codelet writes. */
+int main(int argc, char **argv)
+{
+	starpu_data_handle_t dot_handle;
+
+	int ret = starpu_init(NULL);
+	STARPU_CHECK_RETURN_VALUE(ret, "starpu_init");
+
+	/* synchronous acquire: registered with home node -1 and no buffer */
+	starpu_variable_data_register(&dot_handle, -1, (uintptr_t)NULL, sizeof(long int));
+	starpu_data_set_reduction_methods(dot_handle, &redux_codelet, &init_codelet);
+	starpu_data_acquire(dot_handle, STARPU_R);
+	long int *x = starpu_data_get_local_ptr(dot_handle);
+	STARPU_ASSERT_MSG(*x == 42, "Incorrect value %ld", *x);
+	starpu_data_release(dot_handle);
+	starpu_data_unregister(dot_handle);
+
+	/* same check with a fresh handle; check_dot verifies and releases */
+	starpu_variable_data_register(&dot_handle, -1, (uintptr_t)NULL, sizeof(long int));
+	starpu_data_set_reduction_methods(dot_handle, &redux_codelet, &init_codelet);
+	starpu_data_acquire_cb(dot_handle, STARPU_R, check_dot, dot_handle);
+	starpu_data_unregister(dot_handle);
+
+	starpu_shutdown();
+	return 0;
+}

+ 4 - 4
tests/disk/disk_copy_to_disk.c

@@ -117,10 +117,10 @@ int dotest(struct starpu_disk_ops *ops, char *base)
 	starpu_vector_data_register(&vector_handleA, disk_src, (uintptr_t) data, NX, sizeof(int));
 
 	/* Move and invalidate copy to an other disk */
-	starpu_data_acquire_on_node(vector_handleA, disk_dst, STARPU_W);
+	starpu_data_acquire_on_node(vector_handleA, disk_dst, STARPU_RW);
 	starpu_data_release_on_node(vector_handleA, disk_dst);
 
-	starpu_data_acquire_on_node(vector_handleA, disk_src, STARPU_W);
+	starpu_data_acquire_on_node(vector_handleA, disk_src, STARPU_RW);
 	starpu_data_release_on_node(vector_handleA, disk_src);
 
 	/* free them */
@@ -285,10 +285,10 @@ int dotest_hdf5(struct starpu_disk_ops *ops, char *base)
 	starpu_vector_data_register(&vector_handleA, disk_src, (uintptr_t) data, NX, sizeof(int));
 
 	/* Move and invalidate copy to an other disk */
-	starpu_data_acquire_on_node(vector_handleA, disk_dst, STARPU_W);
+	starpu_data_acquire_on_node(vector_handleA, disk_dst, STARPU_RW);
 	starpu_data_release_on_node(vector_handleA, disk_dst);
 
-	starpu_data_acquire_on_node(vector_handleA, disk_src, STARPU_W);
+	starpu_data_acquire_on_node(vector_handleA, disk_src, STARPU_RW);
 	starpu_data_release_on_node(vector_handleA, disk_src);
 
 	starpu_data_unregister(vector_handleA);