Просмотр исходного кода

merge last bits of out-of-core branch, adding leveldb backend

Samuel Thibault лет назад: 12
Родитель
Сommit
7c0e344d58

+ 7 - 0
configure.ac

@@ -1578,6 +1578,13 @@ AC_MSG_RESULT($maximplementations)
 AC_DEFINE_UNQUOTED(STARPU_MAXIMPLEMENTATIONS, [$maximplementations],
 		[maximum number of implementations])
 
+AC_LANG_PUSH([C++])
+AC_CHECK_HEADERS([leveldb/db.h], [AC_DEFINE([STARPU_HAVE_LEVELDB], [1], [Define to 1 if you have the <leveldb/db.h> header file.])])
+STARPU_HAVE_LIBRARY(LEVELDB, [leveldb])
+AM_CONDITIONAL(STARPU_HAVE_LEVELDB, test "x$ac_cv_lib_leveldb_main" = "xyes")
+AC_LANG_POP([C++])
+
+
 ###############################################################################
 #                                                                             #
 #                                    MPI                                      #

+ 18 - 3
doc/doxygen/chapters/api/data_out_of_core.doxy

@@ -11,17 +11,25 @@
 \ingroup API_Out_Of_Core
 This is a set of functions to manipulate datas on disk.
 \var starpu_disk_ops::alloc
+Create a new location for datas
 \var starpu_disk_ops::free
+Free an allocated data
 \var starpu_disk_ops::open
-open an existing file
+Open an existing location of datas
 \var starpu_disk_ops::close
+Close without delete a location of datas
 \var starpu_disk_ops::read
-~= pread
+Read a data
 \var starpu_disk_ops::write
+Write a data
 \var starpu_disk_ops::plug
+Connect a disk memory
 \var starpu_disk_ops::unplug
+Disconnect a disk memory
 \var starpu_disk_ops::copy
+Copy disk to disk
 \var starpu_disk_ops::bandwidth
+Measue the bandwidth and the latency for the disk
 
 \fn int starpu_disk_register(struct starpu_disk_ops *func, void *parameter, size_t size)
 \ingroup API_Out_Of_Core
@@ -43,6 +51,7 @@ Close an existing file memory opened with starpu_disk_open.
 \ingroup API_Out_Of_Core
 This set uses the stdio library (fwrite, fread...) to read/write on disk. <br />
 <strong>Warning: It creates one file per allocation !</strong>  <br />
+It doesn't support asynchronous transfers.
 
 \var starpu_disk_unistd_ops
 \ingroup API_Out_Of_Core
@@ -53,6 +62,12 @@ This set uses the unistd library (write, read...) to read/write on disk. <br />
 \ingroup API_Out_Of_Core
 This set uses the unistd library (write, read...) to read/write on disk with the O_DIRECT flag. <br />
 <strong>Warning: It creates one file per allocation !</strong>  <br />
-Only available on Linux.
+Only available on Linux systems.
+
+\var starpu_disk_leveldb_ops
+\ingroup API_Out_Of_Core
+This set uses the leveldb created by Google <br />
+Show here: https://code.google.com/p/leveldb/ <br />
+It doesn't support asynchronous transfers.
 
 */

+ 1 - 0
include/starpu_disk.h

@@ -43,6 +43,7 @@ struct starpu_disk_ops {
 extern struct starpu_disk_ops starpu_disk_stdio_ops;
 extern struct starpu_disk_ops starpu_disk_unistd_ops;
 extern struct starpu_disk_ops starpu_disk_unistd_o_direct_ops;
+extern struct starpu_disk_ops starpu_disk_leveldb_ops;
 
 void starpu_disk_close(unsigned node, void *obj, size_t size);
 

+ 6 - 2
src/Makefile.am

@@ -52,7 +52,7 @@ lib_LTLIBRARIES = libstarpu-@STARPU_EFFECTIVE_VERSION@.la
 libstarpu_@STARPU_EFFECTIVE_VERSION@_la_CPPFLAGS = -I$(top_srcdir)/include/ $(STARPU_RCCE_CPPFLAGS) -DBUILDING_STARPU
 
 libstarpu_@STARPU_EFFECTIVE_VERSION@_la_CFLAGS = $(GLOBAL_AM_CFLAGS) $(HWLOC_CFLAGS) $(STARPU_CUDA_CPPFLAGS) $(STARPU_OPENCL_CPPFLAGS) $(STARPU_COI_CPPFLAGS) $(STARPU_SCIF_CPPFLAGS) $(STARPU_RCCE_CFLAGS) $(FXT_CFLAGS)
-libstarpu_@STARPU_EFFECTIVE_VERSION@_la_LIBADD = -lm $(HWLOC_LIBS) $(STARPU_OPENCL_LDFLAGS) $(STARPU_CUDA_LDFLAGS) $(STARPU_COI_LDFLAGS) $(STARPU_SCIF_LDFLAGS) $(STARPU_RCCE_LDFLAGS) $(FXT_LIBS) $(STARPU_GLPK_LDFLAGS)
+libstarpu_@STARPU_EFFECTIVE_VERSION@_la_LIBADD = -lm $(HWLOC_LIBS) $(STARPU_OPENCL_LDFLAGS) $(STARPU_CUDA_LDFLAGS) $(STARPU_COI_LDFLAGS) $(STARPU_SCIF_LDFLAGS) $(STARPU_RCCE_LDFLAGS) $(FXT_LIBS) $(STARPU_GLPK_LDFLAGS) $(STARPU_LEVELDB_LDFLAGS)
 libstarpu_@STARPU_EFFECTIVE_VERSION@_la_LDFLAGS = $(ldflags) $(FXT_LDFLAGS) -no-undefined									\
   -version-info $(libstarpu_so_version)
 
@@ -240,6 +240,11 @@ libstarpu_@STARPU_EFFECTIVE_VERSION@_la_SOURCES = 						\
 	top/starpu_top_connection.c                          	\
 	worker_collection/worker_list.c
 
+
+if STARPU_HAVE_LEVELDB
+libstarpu_@STARPU_EFFECTIVE_VERSION@_la_SOURCES += core/disk_ops/disk_leveldb.cpp
+endif
+
 if STARPU_USE_CPU
 libstarpu_@STARPU_EFFECTIVE_VERSION@_la_SOURCES += drivers/cpu/driver_cpu.c
 endif
@@ -275,7 +280,6 @@ libstarpu_@STARPU_EFFECTIVE_VERSION@_la_SOURCES += core/disk_ops/disk_unistd_o_d
 endif
 
 
-
 #########################################
 #										#
 #        Generic MP compilation			#

+ 9 - 0
src/core/disk.h

@@ -23,6 +23,11 @@
 #define STARPU_DISK_ALL 1
 #define STARPU_DISK_NO_RECLAIM 2
 
+#ifdef __cplusplus
+extern "C"
+{
+#endif
+
 #include <datawizard/copy_driver.h>
 
 /* interface to manipulate memory disk */
@@ -56,4 +61,8 @@ int _starpu_get_disk_flag(unsigned node);
 
 void _starpu_disk_unregister(void);
 
+#ifdef __cplusplus
+}
+#endif
+
 #endif /* __DISK_H__ */

+ 354 - 0
src/core/disk_ops/disk_leveldb.cpp

@@ -0,0 +1,354 @@
+/* StarPU --- Runtime system for heterogeneous multicore architectures.
+ *
+ * Copyright (C) 2013 Corentin Salingue
+ *
+ * StarPU is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ *
+ * StarPU is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU Lesser General Public License in COPYING.LGPL for more details.
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <errno.h>
+#include <leveldb/db.h>
+#include <leveldb/options.h>
+
+#include <starpu.h>
+#include <core/disk.h>
+#include <core/perfmodel/perfmodel.h>
+#include <datawizard/copy_driver.h>
+#include <datawizard/memory_manager.h>
+
+#define NITER	64
+
+/* ------------------- use leveldb to write on disk -------------------  */
+
+struct starpu_leveldb_obj {
+	char * key;
+	double size;
+	starpu_pthread_mutex_t mutex;
+};
+
+struct starpu_leveldb_base {
+	leveldb::DB* db;
+	/* if StarPU creates the leveldb */
+	bool created;
+};
+
+
+/* allocation memory on disk */
+static void * 
+starpu_leveldb_alloc (void *base, size_t size)
+{
+	struct starpu_leveldb_base * base_tmp = (struct starpu_leveldb_base *) base;
+	struct starpu_leveldb_obj * obj = (struct starpu_leveldb_obj *) malloc(sizeof(struct starpu_leveldb_obj));
+	STARPU_ASSERT(obj != NULL);
+
+        STARPU_PTHREAD_MUTEX_INIT(&obj->mutex, NULL);
+
+	char * key = (char *) malloc(256*sizeof(char));
+	strcpy(key, "STARPU");
+	strcat(key,(char *) obj);
+
+	/* create and add a key with a small memory */
+	leveldb::Status s = base_tmp->db->Put(leveldb::WriteOptions(), key, "a");
+	STARPU_ASSERT(s.ok());
+
+	/* obj->size is the real size in the disk */
+	obj->key = key;
+	obj->size = sizeof(char);
+
+	return (void *) obj;
+}
+
+
+/* free memory on disk */
+static void
+starpu_leveldb_free (void *base , void *obj, size_t size STARPU_ATTRIBUTE_UNUSED)
+{
+	struct starpu_leveldb_obj * tmp = (struct starpu_leveldb_obj *) obj;
+	struct starpu_leveldb_base * base_tmp = (struct starpu_leveldb_base *) base;
+
+	base_tmp->db->Delete(leveldb::WriteOptions(), tmp->key);
+
+	STARPU_PTHREAD_MUTEX_DESTROY(&tmp->mutex);
+
+	free(tmp->key);
+	free(tmp);
+}
+
+
+/* open an existing memory on disk */
+static void * 
+starpu_leveldb_open (void *base, void *pos, size_t size)
+{
+	struct starpu_leveldb_obj * obj = (struct starpu_leveldb_obj *) malloc(sizeof(struct starpu_leveldb_obj));
+	STARPU_ASSERT(obj != NULL);
+
+        STARPU_PTHREAD_MUTEX_INIT(&obj->mutex, NULL);
+
+	char * key = (char *) malloc((strlen((char *) pos)+1)*sizeof(char));
+	strcpy(key, (char *) pos);
+
+	obj->key = key;	
+	obj->size = size;
+
+	return (void *) obj;
+	
+}
+
+
+/* free memory without delete it */
+static void 
+starpu_leveldb_close (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, size_t size STARPU_ATTRIBUTE_UNUSED)
+{
+	struct starpu_leveldb_obj * tmp = (struct starpu_leveldb_obj *) obj;
+
+	STARPU_PTHREAD_MUTEX_DESTROY(&tmp->mutex);
+
+	free(tmp->key);
+	free(tmp);	
+}
+
+
+/* in the leveldb, we are obliged to read and to write the entire data 
+ * so, we have to use buffers to have offset and size options */
+static int 
+starpu_leveldb_read (void *base, void *obj, void *buf, off_t offset, size_t size, void * async_channel STARPU_ATTRIBUTE_UNUSED)
+{
+	struct starpu_leveldb_obj * tmp = (struct starpu_leveldb_obj *) obj;
+	struct starpu_leveldb_base * base_tmp = (struct starpu_leveldb_base *) base;	
+	
+	STARPU_PTHREAD_MUTEX_LOCK(&tmp->mutex);
+
+	/* leveldb need a string to store datas */
+	std::string value;
+	leveldb::Status s = base_tmp->db->Get(leveldb::ReadOptions(), tmp->key, &value);
+	uintptr_t value_read = (uintptr_t)(value.c_str());
+
+	/* use buffer */
+	if(s.ok())
+		memcpy(buf, (void *) (value_read+offset), size);
+	else
+		STARPU_ASSERT(s.ok());
+
+	STARPU_PTHREAD_MUTEX_UNLOCK(&tmp->mutex);
+
+	return 0;
+}
+
+static int
+starpu_leveldb_full_read(unsigned node, void *base, void * obj, void ** ptr, size_t * size)
+{
+        struct starpu_leveldb_obj * tmp = (struct starpu_leveldb_obj *) obj;
+        struct starpu_leveldb_base * base_tmp = (struct starpu_leveldb_base *) base;
+
+	*size = tmp->size;
+	*ptr = (size_t *)malloc(*size);
+	return _starpu_disk_read(node, STARPU_MAIN_RAM, obj, *ptr, 0, *size, NULL);
+}
+
+/* write on the memory disk */
+static int 
+starpu_leveldb_write (void *base, void *obj, const void *buf, off_t offset, size_t size, void * async_channel)
+{
+        struct starpu_leveldb_obj * tmp = (struct starpu_leveldb_obj *) obj;
+        struct starpu_leveldb_base * base_tmp = (struct starpu_leveldb_base *) base;
+
+	STARPU_PTHREAD_MUTEX_LOCK(&tmp->mutex);
+
+	uintptr_t buf_tmp = (uintptr_t) buf;
+	void * buffer = (void *) malloc((tmp->size > size) ? tmp->size : size);
+
+	/* we read the data */
+        std::string value;
+
+        leveldb::Status s = base_tmp->db->Get(leveldb::ReadOptions(), tmp->key, &value);
+        uintptr_t value_read = (uintptr_t)(value.c_str());
+
+        if(s.ok())
+                memcpy(buffer, (void *) value_read, tmp->size);
+        else
+                STARPU_ASSERT(s.ok());
+
+	/* put the new data on their new place */
+	memcpy(buffer, (void *) (buf_tmp+offset), size); 
+
+	/* and write them */
+	s = base_tmp->db->Put(leveldb::WriteOptions(), tmp->key, (char *)buffer);
+	STARPU_ASSERT(s.ok());	
+
+	/* if the new size is higher than the old, we update it - first write after the alloc */
+	tmp->size = (tmp->size > size) ? tmp->size : size;
+	free(buffer);
+	STARPU_PTHREAD_MUTEX_UNLOCK(&tmp->mutex);
+
+	return 0;
+}
+
+static int
+starpu_leveldb_full_write (unsigned node, void * base, void * obj, void * ptr, size_t size)
+{
+	struct starpu_leveldb_obj * tmp = (struct starpu_leveldb_obj *) obj;
+	struct starpu_leveldb_base * base_tmp = (struct starpu_leveldb_base *) base;
+	
+	/* update file size to realise the next good full_read */
+	if(size != tmp->size)
+	{
+		_starpu_memory_manager_deallocate_size(tmp->size, node);
+		if (_starpu_memory_manager_can_allocate_size(size, node))
+			tmp->size = size;
+		else
+			STARPU_ASSERT_MSG(0, "Can't allocate size %u on the disk !", (int) size); 
+	}	
+	leveldb::WriteOptions write_options;
+	write_options.sync = true;
+
+        leveldb::Status s = base_tmp->db->Put(write_options, tmp->key, (char *)ptr);
+	STARPU_ASSERT(s.ok());
+	return 0;
+}
+
+
+/* create a new copy of parameter == base */
+static void * 
+starpu_leveldb_plug (void *parameter)
+{
+	struct starpu_leveldb_base * tmp = (struct starpu_leveldb_base *) malloc(sizeof(struct starpu_leveldb_base));
+	STARPU_ASSERT(tmp != NULL);
+
+	leveldb::Status status;
+	leveldb::DB* db;
+	leveldb::Options options;
+	options.create_if_missing = true;
+	
+	/* try to create the database */
+	options.error_if_exists = true;
+	status = leveldb::DB::Open(options, (char *) parameter, &db);
+	tmp->created = true;
+	
+	/* if it has already been created  before */
+	if (!status.ok())
+	{
+		options.error_if_exists = false;
+		status = leveldb::DB::Open(options, (char *) parameter, &db);
+                STARPU_ASSERT_MSG(status.ok(), "StarPU leveldb plug failed !");
+		tmp->created = false;
+	}
+
+	tmp->db = db;
+	STARPU_ASSERT(status.ok());
+	return (void *) tmp;	
+}
+
+
+/* free memory allocated for the base */
+static void
+starpu_leveldb_unplug (void *base)
+{
+	struct starpu_leveldb_base * base_tmp = (struct starpu_leveldb_base *) base;
+	if(base_tmp->created)
+		delete base_tmp->db;
+	free(base);
+}
+
+
+static int
+get_leveldb_bandwidth_between_disk_and_main_ram(unsigned node)
+{
+
+	unsigned iter;
+	double timing_slowness, timing_latency;
+	struct timeval start;
+	struct timeval end;
+	
+	srand (time (NULL)); 
+	char * buf = (char *) malloc(SIZE_DISK_MIN*sizeof(char));
+	STARPU_ASSERT(buf != NULL);
+	
+	/* allocate memory */
+	void * mem = _starpu_disk_alloc(node, SIZE_DISK_MIN);
+	/* fail to alloc */
+	if (mem == NULL)
+		return 0;
+	struct starpu_leveldb_obj * tmp = (struct starpu_leveldb_obj *) mem;
+
+	/* Measure upload slowness */
+	gettimeofday(&start, NULL);
+	for (iter = 0; iter < NITER; ++iter)
+	{
+		_starpu_disk_write(STARPU_MAIN_RAM, node, mem, buf, 0, SIZE_DISK_MIN, NULL);
+	}
+	gettimeofday(&end, NULL);
+	timing_slowness = (double)((end.tv_sec - start.tv_sec)*1000000 + (end.tv_usec - start.tv_usec));
+
+
+	/* free memory */
+	free(buf);
+
+	buf = (char *) malloc(sizeof(char));
+	STARPU_ASSERT(buf != NULL);
+
+	/* Measure latency */
+	gettimeofday(&start, NULL);
+	for (iter = 0; iter < NITER; ++iter)
+	{
+		_starpu_disk_write(STARPU_MAIN_RAM, node, mem, buf, rand() % (SIZE_DISK_MIN -1) , 1, NULL);
+	}
+	gettimeofday(&end, NULL);
+	timing_latency = (double)((end.tv_sec - start.tv_sec)*1000000 + (end.tv_usec - start.tv_usec));
+
+	_starpu_disk_free(node, mem, SIZE_DISK_MIN);
+	free(buf);
+
+	_starpu_save_bandwidth_and_latency_disk((NITER/timing_slowness)*1000000, (NITER/timing_slowness)*1000000,
+					       timing_latency/NITER, timing_latency/NITER, node);
+	return 1;
+}
+
+#if __cplusplus >= 201103L
+struct starpu_disk_ops starpu_disk_leveldb_ops = {
+	.alloc = starpu_leveldb_alloc,
+	.free = starpu_leveldb_free,
+	.open = starpu_leveldb_open,
+	.close = starpu_leveldb_close,
+	.read = starpu_leveldb_read,
+	.write = starpu_leveldb_write,
+	.async_write = NULL,
+	.async_read = NULL,
+	.plug = starpu_leveldb_plug,
+	.unplug = starpu_leveldb_unplug,
+	.copy = NULL,
+	.bandwidth = get_leveldb_bandwidth_between_disk_and_main_ram,
+	.wait_request = NULL,
+	.test_request = NULL,
+	.full_read = starpu_leveldb_full_read,
+	.full_write = starpu_leveldb_full_write
+};
+#else
+struct starpu_disk_ops starpu_disk_leveldb_ops = {
+	starpu_leveldb_alloc,
+	starpu_leveldb_free,
+	starpu_leveldb_open,
+	starpu_leveldb_close,
+	starpu_leveldb_read,
+	starpu_leveldb_write,
+	NULL,
+	NULL,
+	starpu_leveldb_plug,
+	starpu_leveldb_unplug,
+	NULL,
+	get_leveldb_bandwidth_between_disk_and_main_ram,
+	NULL,
+	NULL,
+	starpu_leveldb_full_read,
+	starpu_leveldb_full_write
+};
+#endif

+ 10 - 0
src/core/perfmodel/perfmodel.h

@@ -24,6 +24,11 @@
 #include <core/task_bundle.h>
 #include <stdio.h>
 
+#ifdef __cplusplus
+extern "C"
+{
+#endif
+
 struct _starpu_perfmodel_list
 {
 	struct _starpu_perfmodel_list *next;
@@ -72,4 +77,9 @@ int *_starpu_get_opencl_affinity_vector(unsigned gpuid);
 
 void _starpu_save_bandwidth_and_latency_disk(double bandwidth_write, double bandwidth_read, 
 					    double latency_write, double latency_read, unsigned node);
+
+#ifdef __cplusplus
+}
+#endif
+
 #endif // __PERFMODEL_H__

+ 10 - 0
src/datawizard/copy_driver.h

@@ -37,6 +37,11 @@
 #include <starpu_opencl.h>
 #endif
 
+#ifdef __cplusplus
+extern "C"
+{
+#endif
+
 struct _starpu_data_request;
 struct _starpu_data_replicate;
 
@@ -101,4 +106,9 @@ int _starpu_driver_copy_data_1_to_1(starpu_data_handle_t handle,
 
 unsigned _starpu_driver_test_request_completion(struct _starpu_async_channel *async_channel);
 void _starpu_driver_wait_request_completion(struct _starpu_async_channel *async_channel);
+
+#ifdef __cplusplus
+}
+#endif
+
 #endif // __COPY_DRIVER_H__

+ 9 - 0
src/datawizard/memory_manager.h

@@ -19,6 +19,11 @@
 
 #include <starpu.h>
 
+#ifdef __cplusplus
+extern "C"
+{
+#endif
+
 /**
  * Initialises the memory manager
  */
@@ -56,4 +61,8 @@ void _starpu_memory_manager_deallocate_size(size_t size, unsigned node);
 
 int _starpu_memory_manager_test_allocate_size_(size_t size, unsigned node);
 
+#ifdef __cplusplus
+}
+#endif
+
 #endif /* __MEMORY_MANAGER_H__ */