Corentin Salingue 12 anos atrás
pai
commit
9671e62244

+ 2 - 2
include/starpu_disk.h

@@ -24,8 +24,8 @@ struct starpu_disk_ops {
 	 void    (*free)   (void *base, void *obj, size_t size);
 	 void *  (*open)   (void *base, void *pos, size_t size);     /* open an existing file */
 	 void    (*close)  (void *base, void *obj, size_t size);
-	ssize_t  (*read)   (void *base, void *obj, void *buf, off_t offset, size_t size, void * _starpu_aiocb_disk); 
-	ssize_t  (*write)  (void *base, void *obj, const void *buf, off_t offset, size_t size, void * _starpu_aiocb_disk); 
+	ssize_t  (*read)   (void *base, void *obj, void *buf, off_t offset, size_t size, void * async); 
+	ssize_t  (*write)  (void *base, void *obj, const void *buf, off_t offset, size_t size, void * async); 
 	/* readv, writev, read2d, write2d, etc. */
 	 void *  (*plug)   (void *parameter);
 	 void    (*unplug) (void *base);

+ 19 - 1
src/core/disk_ops/disk_stdio.c

@@ -37,6 +37,7 @@ struct starpu_stdio_obj {
 	FILE * file;
 	char * path;
 	double size;
+	starpu_pthread_mutex_t mutex;
 };
 
 
@@ -102,6 +103,8 @@ starpu_stdio_alloc (void *base, size_t size)
 		return NULL;
 	}
 
+	STARPU_PTHREAD_MUTEX_INIT(&obj->mutex, NULL);
+
 	obj->descriptor = id;
 	obj->file = f;
 	obj->path = baseCpy;
@@ -117,6 +120,8 @@ starpu_stdio_free (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, size_t size ST
 {
 	struct starpu_stdio_obj * tmp = (struct starpu_stdio_obj *) obj;
 
+	STARPU_PTHREAD_MUTEX_DESTROY(&tmp->mutex);
+
 	unlink(tmp->path);
 	fclose(tmp->file);
 	close(tmp->descriptor);
@@ -159,6 +164,8 @@ starpu_stdio_open (void *base, void *pos, size_t size)
 		return NULL;
 	}
 
+	STARPU_PTHREAD_MUTEX_INIT(&obj->mutex, NULL);
+
 	obj->descriptor = id;
 	obj->file = f;
 	obj->path = baseCpy;
@@ -175,6 +182,8 @@ starpu_stdio_close (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, size_t size S
 {
 	struct starpu_stdio_obj * tmp = (struct starpu_stdio_obj *) obj;
 
+	STARPU_PTHREAD_MUTEX_DESTROY(&tmp->mutex);
+
 	fclose(tmp->file);
 	close(tmp->descriptor);
 	free(tmp->path);
@@ -187,11 +196,16 @@ static ssize_t
 starpu_stdio_read (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void *buf, off_t offset, size_t size, void * _starpu_aiocb_disk)
 {
 	struct starpu_stdio_obj * tmp = (struct starpu_stdio_obj *) obj;
-		int res = fseek(tmp->file, offset, SEEK_SET); 
+	
+	STARPU_PTHREAD_MUTEX_LOCK(&tmp->mutex);
+
+	int res = fseek(tmp->file, offset, SEEK_SET); 
 		STARPU_ASSERT_MSG(res == 0, "Stdio read failed");
 
 		ssize_t nb = fread (buf, 1, size, tmp->file);
 
+	STARPU_PTHREAD_MUTEX_UNLOCK(&tmp->mutex);
+
 	return nb;
 }
 
@@ -202,11 +216,15 @@ starpu_stdio_write (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, const void *b
 {
 	struct starpu_stdio_obj * tmp = (struct starpu_stdio_obj *) obj;
 
+	STARPU_PTHREAD_MUTEX_LOCK(&tmp->mutex);
+
 	int res = fseek(tmp->file, offset, SEEK_SET); 
 	STARPU_ASSERT_MSG(res == 0, "Stdio write failed");
 
 	ssize_t nb = fwrite (buf, 1, size, tmp->file);
 
+	STARPU_PTHREAD_MUTEX_UNLOCK(&tmp->mutex);
+
 	return nb;
 }
 

+ 16 - 0
src/core/disk_ops/unistd/disk_unistd_global.c

@@ -83,6 +83,8 @@ starpu_unistd_global_alloc (struct starpu_unistd_global_obj * obj, void *base, s
 		return NULL;
 	}
 
+	STARPU_PTHREAD_MUTEX_INIT(&obj->mutex, NULL);
+
 	obj->descriptor = id;
 	obj->path = baseCpy;
 	obj->size = size;
@@ -97,6 +99,8 @@ starpu_unistd_global_free (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, size_t
 {
 	struct starpu_unistd_global_obj * tmp = (struct starpu_unistd_global_obj *) obj;
 
+	STARPU_PTHREAD_MUTEX_DESTROY(&tmp->mutex);
+
 	unlink(tmp->path);
 	close(tmp->descriptor);
 
@@ -127,6 +131,8 @@ starpu_unistd_global_open (struct starpu_unistd_global_obj * obj, void *base, vo
 		return NULL;
 	}
 
+	STARPU_PTHREAD_MUTEX_INIT(&obj->mutex, NULL);
+
 	obj->descriptor = id;
 	obj->path = baseCpy;
 	obj->size = size;
@@ -142,6 +148,8 @@ starpu_unistd_global_close (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, size_
 {
 	struct starpu_unistd_global_obj * tmp = (struct starpu_unistd_global_obj *) obj;
 
+	STARPU_PTHREAD_MUTEX_DESTROY(&tmp->mutex);
+
 	close(tmp->descriptor);
 	free(tmp->path);
 	free(tmp);	
@@ -154,12 +162,16 @@ starpu_unistd_global_read (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void *
 {
 	struct starpu_unistd_global_obj * tmp = (struct starpu_unistd_global_obj *) obj;
 
+	STARPU_PTHREAD_MUTEX_LOCK(&tmp->mutex);
+
 	int res = lseek(tmp->descriptor, offset, SEEK_SET); 
 	STARPU_ASSERT_MSG(res >= 0, "Starpu Disk unistd read failed");
 
 	ssize_t nb = read(tmp->descriptor, buf, size);
 	STARPU_ASSERT_MSG(res >= 0, "Starpu Disk unistd read failed");
 	
+	STARPU_PTHREAD_MUTEX_UNLOCK(&tmp->mutex);
+
 	return nb;
 }
 
@@ -170,12 +182,16 @@ starpu_unistd_global_write (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, const
 {
 	struct starpu_unistd_global_obj * tmp = (struct starpu_unistd_global_obj *) obj;
 
+	STARPU_PTHREAD_MUTEX_LOCK(&tmp->mutex);
+
 	int res = lseek(tmp->descriptor, offset, SEEK_SET); 
 	STARPU_ASSERT_MSG(res >= 0, "Starpu Disk unistd write failed");
 
 	ssize_t nb = write (tmp->descriptor, buf, size);
 	STARPU_ASSERT_MSG(res >= 0, "Starpu Disk unistd write failed");
 
+	STARPU_PTHREAD_MUTEX_UNLOCK(&tmp->mutex);
+
 	return nb;
 }
 

+ 1 - 0
src/core/disk_ops/unistd/disk_unistd_global.h

@@ -22,6 +22,7 @@ struct starpu_unistd_global_obj {
         char * path;
         double size;
 	int flags;
+	starpu_pthread_mutex_t mutex;
 };
 
  void * starpu_unistd_global_alloc (struct starpu_unistd_global_obj * obj, void *base, size_t size STARPU_ATTRIBUTE_UNUSED);