|
@@ -59,9 +59,11 @@ starpu_leveldb_alloc (void *base, size_t size)
|
|
|
|
|
|
/* create and add a key with a small memory */
|
|
|
leveldb::Status s = base_tmp->db->Put(leveldb::WriteOptions(), key, "a");
|
|
|
+ STARPU_ASSERT(s.ok());
|
|
|
|
|
|
+ /* obj->size is the real size in the disk */
|
|
|
obj->key = key;
|
|
|
- obj->size = size;
|
|
|
+ obj->size = sizeof(char);
|
|
|
|
|
|
return (void *) obj;
|
|
|
}
|
|
@@ -116,7 +118,8 @@ starpu_leveldb_close (void *base STARPU_ATTRIBUTE_UNUSED, void *obj, size_t size
|
|
|
}
|
|
|
|
|
|
|
|
|
-/* read the memory disk */
|
|
|
+/* in the leveldb, we are obliged to read and to write the entire data
|
|
|
+ * so, we have to use buffers to have offset and size options */
|
|
|
static int
|
|
|
starpu_leveldb_read (void *base, void *obj, void *buf, off_t offset, size_t size, void * async_channel STARPU_ATTRIBUTE_UNUSED)
|
|
|
{
|
|
@@ -124,12 +127,17 @@ starpu_leveldb_read (void *base, void *obj, void *buf, off_t offset, size_t size
|
|
|
struct starpu_leveldb_base * base_tmp = (struct starpu_leveldb_base *) base;
|
|
|
|
|
|
STARPU_PTHREAD_MUTEX_LOCK(&tmp->mutex);
|
|
|
+
|
|
|
+ /* leveldb need a string to store datas */
|
|
|
std::string value;
|
|
|
leveldb::Status s = base_tmp->db->Get(leveldb::ReadOptions(), tmp->key, &value);
|
|
|
-
|
|
|
uintptr_t value_read = (uintptr_t)(value.c_str());
|
|
|
+
|
|
|
+ /* use buffer */
|
|
|
if(s.ok())
|
|
|
- memcpy(buf, (void *) (value_read+offset), size);
|
|
|
+ memcpy(buf, (void *) (value_read+offset), size);
|
|
|
+ else
|
|
|
+ STARPU_ASSERT(s.ok());
|
|
|
|
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(&tmp->mutex);
|
|
|
|
|
@@ -157,15 +165,28 @@ starpu_leveldb_write (void *base, void *obj, const void *buf, off_t offset, size
|
|
|
STARPU_PTHREAD_MUTEX_LOCK(&tmp->mutex);
|
|
|
|
|
|
uintptr_t buf_tmp = (uintptr_t) buf;
|
|
|
- void * buffer = (void *) malloc(tmp->size);
|
|
|
- starpu_leveldb_read (base, obj, buffer, 0, tmp->size, async_channel);
|
|
|
- memcpy(buffer, (void *) (buf_tmp+offset), size);
|
|
|
+ void * buffer = (void *) malloc((tmp->size > size) ? tmp->size : size);
|
|
|
|
|
|
- leveldb::WriteOptions write_options;
|
|
|
- write_options.sync = true;
|
|
|
+ /* we read the data */
|
|
|
+ std::string value;
|
|
|
+
|
|
|
+ leveldb::Status s = base_tmp->db->Get(leveldb::ReadOptions(), tmp->key, &value);
|
|
|
+ uintptr_t value_read = (uintptr_t)(value.c_str());
|
|
|
+
|
|
|
+ if(s.ok())
|
|
|
+ memcpy(buffer, (void *) value_read, tmp->size);
|
|
|
+ else
|
|
|
+ STARPU_ASSERT(s.ok());
|
|
|
|
|
|
- base_tmp->db->Put(write_options, tmp->key, (char *)buffer);
|
|
|
+ /* put the new data on their new place */
|
|
|
+ memcpy(buffer, (void *) (buf_tmp+offset), size);
|
|
|
+
|
|
|
+ /* and write them */
|
|
|
+ s = base_tmp->db->Put(leveldb::WriteOptions(), tmp->key, (char *)buffer);
|
|
|
+ STARPU_ASSERT(s.ok());
|
|
|
|
|
|
+ /* if the new size is higher than the old, we update it - first write after the alloc */
|
|
|
+ tmp->size = (tmp->size > size) ? tmp->size : size;
|
|
|
free(buffer);
|
|
|
STARPU_PTHREAD_MUTEX_UNLOCK(&tmp->mutex);
|
|
|
|
|
@@ -190,7 +211,9 @@ starpu_leveldb_full_write (unsigned node, void * base, void * obj, void * ptr, s
|
|
|
leveldb::WriteOptions write_options;
|
|
|
write_options.sync = true;
|
|
|
|
|
|
- base_tmp->db->Put(write_options, tmp->key, (char *)ptr);
|
|
|
+ leveldb::Status s = base_tmp->db->Put(write_options, tmp->key, (char *)ptr);
|
|
|
+ STARPU_ASSERT(s.ok());
|
|
|
+ return 0;
|
|
|
}
|
|
|
|
|
|
|
|
@@ -201,22 +224,27 @@ starpu_leveldb_plug (void *parameter)
|
|
|
struct starpu_leveldb_base * tmp = (struct starpu_leveldb_base *) malloc(sizeof(struct starpu_leveldb_base));
|
|
|
STARPU_ASSERT(tmp != NULL);
|
|
|
|
|
|
+ leveldb::Status status;
|
|
|
leveldb::DB* db;
|
|
|
leveldb::Options options;
|
|
|
- try {
|
|
|
- options.error_if_exists = true;
|
|
|
- leveldb::Status status = leveldb::DB::Open(options, (char *) parameter, &db);
|
|
|
- tmp->created = true;
|
|
|
- }
|
|
|
- catch(...)
|
|
|
+ options.create_if_missing = true;
|
|
|
+
|
|
|
+ /* try to create the database */
|
|
|
+ options.error_if_exists = true;
|
|
|
+ status = leveldb::DB::Open(options, (char *) parameter, &db);
|
|
|
+ tmp->created = true;
|
|
|
+
|
|
|
+ /* if it has already been created before */
|
|
|
+ if (!status.ok())
|
|
|
{
|
|
|
options.error_if_exists = false;
|
|
|
- leveldb::Status status = leveldb::DB::Open(options, (char *) parameter, &db);
|
|
|
+ status = leveldb::DB::Open(options, (char *) parameter, &db);
|
|
|
STARPU_ASSERT_MSG(status.ok(), "StarPU leveldb plug failed !");
|
|
|
tmp->created = false;
|
|
|
}
|
|
|
+
|
|
|
tmp->db = db;
|
|
|
-
|
|
|
+ STARPU_ASSERT(status.ok());
|
|
|
return (void *) tmp;
|
|
|
}
|
|
|
|