Browse Source

continuing to add leveldb backend functions

Corentin Salingue 12 years ago
parent
commit
9916558626
1 changed files with 35 additions and 44 deletions
  1. 35 44
      src/core/disk_ops/disk_leveldb.cpp

+ 35 - 44
src/core/disk_ops/disk_leveldb.cpp

@@ -37,6 +37,8 @@ struct starpu_leveldb_obj {
 
 struct starpu_leveldb_base {
 	leveldb::DB* db;
+	/* if StarPU creates the leveldb */
+	bool created;
 };
 
 
@@ -48,13 +50,15 @@ starpu_leveldb_alloc (void *base, size_t size)
 	struct starpu_leveldb_obj * obj = (struct starpu_leveldb_obj *) malloc(sizeof(struct starpu_leveldb_obj));
 	STARPU_ASSERT(obj != NULL);
 
+        STARPU_PTHREAD_MUTEX_INIT(&obj->mutex, NULL);
+
 	char * key = (char *) malloc(256*sizeof(char));
 	strcpy(key, "STARPU");
 	strcat(key,(char *) obj);
 
 	/* create and add a key with a small memory */
 	leveldb::Status s = base_tmp->db->Put(leveldb::WriteOptions(), key, "a");
-	
+
 	obj->key = key;
 	obj->size = size;
 
@@ -71,6 +75,8 @@ starpu_leveldb_free (void *base , void *obj, size_t size STARPU_ATTRIBUTE_UNUSED
 
 	base_tmp->db->Delete(leveldb::WriteOptions(), tmp->key);
 
+	STARPU_PTHREAD_MUTEX_DESTROY(&tmp->mutex);
+
 	free(tmp->key);
 	free(tmp);
 }
@@ -80,40 +86,15 @@ starpu_leveldb_free (void *base , void *obj, size_t size STARPU_ATTRIBUTE_UNUSED
 static void * 
 starpu_leveldb_open (void *base, void *pos, size_t size)
 {
-	struct starpu_leveldb_obj * obj = malloc(sizeof(struct starpu_leveldb_obj));
+	struct starpu_leveldb_obj * obj = (struct starpu_leveldb_obj *) malloc(sizeof(struct starpu_leveldb_obj));
 	STARPU_ASSERT(obj != NULL);
 
-	/* create template */
-	unsigned int sizeBase = 16;
-	while(sizeBase < (strlen(base)+strlen(pos)+1))
-		sizeBase *= 2;
-	
-	char * baseCpy = malloc(sizeBase*sizeof(char));
-	STARPU_ASSERT(baseCpy != NULL);
-	strcpy(baseCpy,(char *) base);
-	strcat(baseCpy,(char *) pos);
+        STARPU_PTHREAD_MUTEX_INIT(&obj->mutex, NULL);
 
-	int id = open(baseCpy, O_RDWR);
-	if (id < 0)
-	{
-		free(obj);
-		free(baseCpy);
-		return NULL;
-	}
+	char * key = (char *) malloc((strlen((char *) pos)+1)*sizeof(char));
+	strcpy(key, (char *) pos);
 
-	FILE * f = fdopen(id,"rb+");
-	if (f == NULL)
-	{
-		free(obj);
-		free(baseCpy);
-		return NULL;
-	}
-
-	STARPU_PTHREAD_MUTEX_INIT(&obj->mutex, NULL);
-
-	obj->descriptor = id;
-	obj->file = f;
-	obj->path = baseCpy;
+	obj->key = key;	
 	obj->size = size;
 
 	return (void *) obj;
@@ -129,9 +110,7 @@ starpu_leveldb_close (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, size_t size
 
 	STARPU_PTHREAD_MUTEX_DESTROY(&tmp->mutex);
 
-	fclose(tmp->file);
-	close(tmp->descriptor);
-	free(tmp->path);
+	free(tmp->key);
 	free(tmp);	
 }
 
@@ -141,14 +120,15 @@ static int
 starpu_leveldb_read (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, void *buf, off_t offset, size_t size, void * async_channel STARPU_ATTRIBUTE_UNUSED)
 {
 	struct starpu_leveldb_obj * tmp = (struct starpu_leveldb_obj *) obj;
-		
+	struct starpu_leveldb_base * base_tmp = (struct starpu_leveldb_base *) base;	
+	
 	STARPU_PTHREAD_MUTEX_LOCK(&tmp->mutex);
+	std::string value;
+	leveldb::Status s = base_tmp->db->Get(leveldb::ReadOptions(), tmp->key, &value);
 
-	int res = fseek(tmp->file, offset, SEEK_SET); 
-	STARPU_ASSERT_MSG(res == 0, "Stdio read failed");
-
-	ssize_t nb = fread (buf, 1, size, tmp->file);
-	STARPU_ASSERT_MSG(nb >= 0, "Stdio read failed");
+	uintptr_t value_read = (uintptr_t)(value.c_str());
+	if(s.ok())
+		memcpy(buf, (void *) (value_read+offset), size); 
 
 	STARPU_PTHREAD_MUTEX_UNLOCK(&tmp->mutex);
 
@@ -261,9 +241,19 @@ starpu_leveldb_plug (void *parameter)
 	leveldb::DB* db;
 	leveldb::Options options;
 	options.create_if_missing = true;
-	leveldb::Status status = leveldb::DB::Open(options, (char *) parameter, &db);
-	STARPU_ASSERT_MSG(status.ok(), "StarPU leveldb plug failed !");
-
+	try {
+		options.error_if_exists = true;
+		leveldb::Status status = leveldb::DB::Open(options, (char *) parameter, &db);
+		STARPU_ASSERT_MSG(status.ok(), "StarPU leveldb plug failed !");
+		tmp->created = true;
+	}
+	catch(...)
+	{
+		options.error_if_exists = false;
+		leveldb::Status status = leveldb::DB::Open(options, (char *) parameter, &db);
+                STARPU_ASSERT_MSG(status.ok(), "StarPU leveldb plug failed !");
+		tmp->created = false;
+	}
 	tmp->db = db;
 
 	return (void *) tmp;	
@@ -275,7 +265,8 @@ static void
 starpu_leveldb_unplug (void *base)
 {
 	struct starpu_leveldb_base * base_tmp = (struct starpu_leveldb_base *) base;
-	delete base_tmp->db;
+	if(tmp->created)
+		delete base_tmp->db;
 	free(base);
 }