Browse Source

if HDF5_THREAD_MODE is not activated, serialize all calls to HDF5 on one main thread

Corentin Salingue 8 years ago
parent
commit
2cdc9e3952
1 changed files with 117 additions and 64 deletions
  1. 117 64
      src/core/disk_ops/disk_hdf5.c

+ 117 - 64
src/core/disk_ops/disk_hdf5.c

@@ -33,14 +33,39 @@
 
 #define NITER	_starpu_calibration_minimum
 
+/* TODO: support disk-to-disk copy with H5Ocopy */
+
+/* ------------------- use HDF5 to write on disk -------------------  */
+
 #ifndef H5_HAVE_THREADSAFE
-/* if thread safe mode not enabled, we can't open more than one file */
 static int nb_disk_open = 0;
+static volatile int init_finished = 0;
+static starpu_pthread_t global_thread;        /* This thread will perform each write/read because we don't have asynchronous functions */
+static volatile int global_run;                        /* Tells the thread whether it should keep running */
+static starpu_pthread_mutex_t global_mutex;   /* Protects work_list and serializes HDF5 calls when the library is not thread-safe */
+static starpu_pthread_cond_t global_cond;
+static struct _starpu_hdf5_work_list * global_work_list;        /* This list contains the work for the hdf5 thread */
 #endif
 
-/* TODO: support disk-to-disk copy with HD5Ocopy */
+#ifdef H5_HAVE_THREADSAFE						
+
+#define HDF5_VAR_THREAD fileBase->thread
+#define HDF5_VAR_RUN fileBase->run
+#define HDF5_VAR_MUTEX fileBase->mutex
+#define HDF5_VAR_COND fileBase->cond
+#define HDF5_VAR_WORK_LIST fileBase->work_list
+
+#else									
+
+#define HDF5_VAR_THREAD global_thread
+#define HDF5_VAR_RUN global_run
+#define HDF5_VAR_MUTEX global_mutex
+#define HDF5_VAR_COND global_cond
+#define HDF5_VAR_WORK_LIST global_work_list
+
+#endif									
+
 
-/* ------------------- use HDF5 to write on disk -------------------  */
 
 enum hdf5_work_type { READ, WRITE, FULL_READ, FULL_WRITE };
 
@@ -76,16 +101,14 @@ struct starpu_hdf5_obj
 static inline void _starpu_hdf5_protect_start(void * base STARPU_ATTRIBUTE_UNUSED)
 {
 #ifndef H5_HAVE_THREADSAFE
-        struct starpu_hdf5_base * fileBase = (struct starpu_hdf5_base *) base;
-        STARPU_PTHREAD_MUTEX_LOCK(&fileBase->mutex);
+        STARPU_PTHREAD_MUTEX_LOCK(&HDF5_VAR_MUTEX);
 #endif
 }
 
 static inline void _starpu_hdf5_protect_stop(void * base STARPU_ATTRIBUTE_UNUSED)
 {
 #ifndef H5_HAVE_THREADSAFE
-        struct starpu_hdf5_base * fileBase = (struct starpu_hdf5_base *) base;
-        STARPU_PTHREAD_MUTEX_UNLOCK(&fileBase->mutex);
+        STARPU_PTHREAD_MUTEX_UNLOCK(&HDF5_VAR_MUTEX);
 #endif
 }
 
@@ -198,20 +221,22 @@ static void starpu_hdf5_write_internal(struct _starpu_hdf5_work * work)
 
 static void * _starpu_hdf5_internal_thread(void * arg)
 {
-        struct starpu_hdf5_base * base = (struct starpu_hdf5_base *) arg;
-        while (base->run || !_starpu_hdf5_work_list_empty(base->work_list))
+#ifdef H5_HAVE_THREADSAFE
+        struct starpu_hdf5_base * fileBase = (struct starpu_hdf5_base *) arg;
+#endif
+        while (HDF5_VAR_RUN || !_starpu_hdf5_work_list_empty(HDF5_VAR_WORK_LIST))
         {
-                STARPU_PTHREAD_MUTEX_LOCK(&base->mutex);
-                if (_starpu_hdf5_work_list_empty(base->work_list) && base->run)
-                        STARPU_PTHREAD_COND_WAIT(&base->cond, &base->mutex);
-                STARPU_PTHREAD_MUTEX_UNLOCK(&base->mutex);
+                STARPU_PTHREAD_MUTEX_LOCK(&HDF5_VAR_MUTEX);
+                if (_starpu_hdf5_work_list_empty(HDF5_VAR_WORK_LIST) && HDF5_VAR_RUN)
+                        STARPU_PTHREAD_COND_WAIT(&HDF5_VAR_COND, &HDF5_VAR_MUTEX);
+                STARPU_PTHREAD_MUTEX_UNLOCK(&HDF5_VAR_MUTEX);
 
                 /* We are the only consumer here, don't need to protect here */
-                if (!_starpu_hdf5_work_list_empty(base->work_list))
+                if (!_starpu_hdf5_work_list_empty(HDF5_VAR_WORK_LIST))
                 {
-                        STARPU_PTHREAD_MUTEX_LOCK(&base->mutex);
-                        struct _starpu_hdf5_work * work = _starpu_hdf5_work_list_pop_back(base->work_list);
-                        STARPU_PTHREAD_MUTEX_UNLOCK(&base->mutex);
+                        STARPU_PTHREAD_MUTEX_LOCK(&HDF5_VAR_MUTEX);
+                        struct _starpu_hdf5_work * work = _starpu_hdf5_work_list_pop_back(HDF5_VAR_WORK_LIST);
+                        STARPU_PTHREAD_MUTEX_UNLOCK(&HDF5_VAR_MUTEX);
 
                         _starpu_hdf5_protect_start(work->base);
                         switch(work->type)
@@ -244,20 +269,20 @@ static void * _starpu_hdf5_internal_thread(void * arg)
                 }
         }
 
-        STARPU_PTHREAD_MUTEX_LOCK(&base->mutex);
-        STARPU_PTHREAD_COND_BROADCAST(&base->cond);
-        STARPU_PTHREAD_MUTEX_UNLOCK(&base->mutex);
+        STARPU_PTHREAD_MUTEX_LOCK(&HDF5_VAR_MUTEX);
+        STARPU_PTHREAD_COND_BROADCAST(&HDF5_VAR_COND);
+        STARPU_PTHREAD_MUTEX_UNLOCK(&HDF5_VAR_MUTEX);
 
         return NULL;
 }
 
-static void _starpu_hdf5_create_thread(struct starpu_hdf5_base * base)
+static void _starpu_hdf5_create_thread(struct starpu_hdf5_base * fileBase)
 {
-        base->work_list = _starpu_hdf5_work_list_new();
-        base->run = 1;
+        HDF5_VAR_WORK_LIST = _starpu_hdf5_work_list_new();
+        HDF5_VAR_RUN = 1;
 
-        STARPU_PTHREAD_COND_INIT(&base->cond, NULL);
-        STARPU_PTHREAD_CREATE(&base->thread, NULL, _starpu_hdf5_internal_thread, (void *) base); 
+        STARPU_PTHREAD_COND_INIT(&HDF5_VAR_COND, NULL);
+        STARPU_PTHREAD_CREATE(&HDF5_VAR_THREAD, NULL, _starpu_hdf5_internal_thread, (void *) fileBase); 
 }
 
 /* returns the size in BYTES */
@@ -300,11 +325,11 @@ static void starpu_hdf5_send_work(void *base, void *obj, void *buf, off_t offset
         work->size = size;
         work->event = event;
 
-        STARPU_PTHREAD_MUTEX_LOCK(&fileBase->mutex);
-        _starpu_hdf5_work_list_push_front(fileBase->work_list, work);
+        STARPU_PTHREAD_MUTEX_LOCK(&HDF5_VAR_MUTEX);
+        _starpu_hdf5_work_list_push_front(HDF5_VAR_WORK_LIST, work);
         /* Wake up internal thread */
-        STARPU_PTHREAD_COND_BROADCAST(&fileBase->cond);
-        STARPU_PTHREAD_MUTEX_UNLOCK(&fileBase->mutex);
+        STARPU_PTHREAD_COND_BROADCAST(&HDF5_VAR_COND);
+        STARPU_PTHREAD_MUTEX_UNLOCK(&HDF5_VAR_MUTEX);
 }
 
 static struct starpu_hdf5_obj * _starpu_hdf5_data_alloc(struct starpu_hdf5_base * fileBase,  char * name, size_t size)
@@ -371,30 +396,35 @@ static struct starpu_hdf5_obj * _starpu_hdf5_data_open(struct starpu_hdf5_base *
 
 static void *starpu_hdf5_plug(void *parameter, starpu_ssize_t size STARPU_ATTRIBUTE_UNUSED)
 {
+        struct starpu_hdf5_base * fileBase;
+        _STARPU_MALLOC(fileBase, sizeof(struct starpu_hdf5_base));
+
 #ifndef H5_HAVE_THREADSAFE
-        int nb_disk = STARPU_ATOMIC_ADD(&nb_disk_open, 1);
-        if (nb_disk != 1)
+	int actual_nb_disk = STARPU_ATOMIC_ADD(&nb_disk_open, 1);
+	if (actual_nb_disk == 1)
+	{	
+#endif
+		STARPU_PTHREAD_MUTEX_INIT(&HDF5_VAR_MUTEX, NULL);
+#ifndef H5_HAVE_THREADSAFE
+	}
+	else
 	{
-                _STARPU_ERROR("HDF5 library is not compiled with --enable-threadsafe. You can't open more than one HDF5 file at the same time !\n");
-		return NULL;
+		while (!init_finished)
+			;
 	}
 #endif
 
-        struct starpu_hdf5_base * base;
-        _STARPU_MALLOC(base, sizeof(struct starpu_hdf5_base));
-
-	STARPU_PTHREAD_MUTEX_INIT(&base->mutex, NULL);
-        _starpu_hdf5_protect_start(base);
+        _starpu_hdf5_protect_start(fileBase);
 
         struct stat buf;
         if (stat(parameter, &buf) != 0 || !S_ISREG(buf.st_mode))
         {
                 /* The file doesn't exist or the directory exists => create the datafile */
                 int id;
-                base->path = _starpu_mktemp_many(parameter, 0, O_RDWR | O_BINARY, &id);
-                if (!base->path)
+                fileBase->path = _starpu_mktemp_many(parameter, 0, O_RDWR | O_BINARY, &id);
+                if (!fileBase->path)
                 {
-                        free(base);
+                        free(fileBase);
                         _STARPU_ERROR("Can not create the HDF5 file (%s)", (char *) parameter);
 			return NULL;
                 }
@@ -403,14 +433,14 @@ static void *starpu_hdf5_plug(void *parameter, starpu_ssize_t size STARPU_ATTRIB
                 close(id);
 
                 /* Truncate it */
-                base->fileID = H5Fcreate((char *)base->path, H5F_ACC_TRUNC, H5P_DEFAULT, H5P_DEFAULT);
-                if (base->fileID < 0) 
+                fileBase->fileID = H5Fcreate((char *)fileBase->path, H5F_ACC_TRUNC, H5P_DEFAULT, H5P_DEFAULT);
+                if (fileBase->fileID < 0) 
                 {
-                        free(base); 
+                        free(fileBase); 
                         _STARPU_ERROR("Can not create the HDF5 file (%s)", (char *) parameter);
 			return NULL;
                 }
-                base->created = 1;
+                fileBase->created = 1;
         } 
         else
         {
@@ -419,48 +449,71 @@ static void *starpu_hdf5_plug(void *parameter, starpu_ssize_t size STARPU_ATTRIB
                 _STARPU_MALLOC(path, strlen((char *) parameter)+1);
                 strcpy(path, (char *) parameter);
 
-                base->fileID = H5Fopen((char *)parameter, H5F_ACC_RDWR, H5P_DEFAULT);
-                if (base->fileID < 0) 
+                fileBase->fileID = H5Fopen((char *)parameter, H5F_ACC_RDWR, H5P_DEFAULT);
+                if (fileBase->fileID < 0) 
                 {
-                        free(base);
+                        free(fileBase);
 			free(path);
                         _STARPU_ERROR("Can not open the HDF5 file (%s)", (char *) parameter);
 			return NULL;
                 }
-                base->created = 0;
-                base->path = path;
+                fileBase->created = 0;
+                fileBase->path = path;
         }
 
-        _starpu_hdf5_create_thread(base);
+#ifndef H5_HAVE_THREADSAFE
+	if (actual_nb_disk == 1)
+	{
+#endif
+		_starpu_hdf5_create_thread(fileBase);
+#ifndef H5_HAVE_THREADSAFE
+		init_finished = 1;
+	}
+#endif
 
-        _starpu_hdf5_protect_stop(base);
+        _starpu_hdf5_protect_stop(fileBase);
 
-        base->next_dataset_id = 0;
+        fileBase->next_dataset_id = 0;
 
-	return (void *) base;
+	return (void *) fileBase;
 }
 
 /* free memory allocated for the base */
 static void starpu_hdf5_unplug(void *base)
 {
 #ifndef H5_HAVE_THREADSAFE
-        STARPU_ATOMIC_ADD(&nb_disk_open, -1);
+        int actual_nb_disk = STARPU_ATOMIC_ADD(&nb_disk_open, -1);
 #endif
 
         struct starpu_hdf5_base * fileBase = (struct starpu_hdf5_base *) base;
         herr_t status;
 
-        STARPU_PTHREAD_MUTEX_LOCK(&fileBase->mutex);
+        STARPU_PTHREAD_MUTEX_LOCK(&HDF5_VAR_MUTEX);
 
-        fileBase->run = 0;
-        STARPU_PTHREAD_COND_BROADCAST(&fileBase->cond);
-        STARPU_PTHREAD_COND_WAIT(&fileBase->cond, &fileBase->mutex);
-        /* the internal thread is deleted */
+#ifndef H5_HAVE_THREADSAFE
+        if (actual_nb_disk == 0)
+	{
+#endif
+		HDF5_VAR_RUN = 0;
+		STARPU_PTHREAD_COND_BROADCAST(&HDF5_VAR_COND);
+		STARPU_PTHREAD_COND_WAIT(&HDF5_VAR_COND, &HDF5_VAR_MUTEX);
+		/* the internal thread is deleted */
+#ifndef H5_HAVE_THREADSAFE
+	}
+#endif
 
         status = H5Fclose(fileBase->fileID);
 
-        STARPU_PTHREAD_MUTEX_UNLOCK(&fileBase->mutex);
-	STARPU_PTHREAD_MUTEX_DESTROY(&fileBase->mutex);
+        STARPU_PTHREAD_MUTEX_UNLOCK(&HDF5_VAR_MUTEX);
+#ifndef H5_HAVE_THREADSAFE
+        if (actual_nb_disk == 0)
+	{
+#endif
+		STARPU_PTHREAD_MUTEX_DESTROY(&HDF5_VAR_MUTEX);
+#ifndef H5_HAVE_THREADSAFE
+		init_finished = 0;
+	}
+#endif
 
         STARPU_ASSERT_MSG(status >= 0, "Can not unplug this HDF5 disk (%s)\n", fileBase->path);
         if (fileBase->created)
@@ -485,10 +538,10 @@ static void *starpu_hdf5_alloc(void *base, size_t size)
         char name_id[16];
 
         /* Save the name of the dataset */
-        STARPU_PTHREAD_MUTEX_LOCK(&fileBase->mutex);
+        STARPU_PTHREAD_MUTEX_LOCK(&HDF5_VAR_MUTEX);
         snprintf(name_id, sizeof(name_id), "%u", fileBase->next_dataset_id);
         fileBase->next_dataset_id++;
-        STARPU_PTHREAD_MUTEX_UNLOCK(&fileBase->mutex);
+        STARPU_PTHREAD_MUTEX_UNLOCK(&HDF5_VAR_MUTEX);
 
         /* name in HDF5 is like a path */
         _STARPU_MALLOC(name, 1+strlen(prefix)+strlen(name_id)+1);