Reviewed-on: 10225501448/leveldb_proj2#1main
@ -0,0 +1,72 @@ | |||
# 3DB | |||
分别为kvDB,indexDB,metaDB。其中,前两个和之前存储内容相同,metaDB用来维护与并发控制相关的内容 | |||
为了达成一致性,这里需要要有一个全局唯一的seq或者timestamp。这个信息编码在userkey中,比较器要进行对应修改,metaDB要按照先后顺序排列,另外两个不用考虑timestamp | |||
# 具体操作 | |||
这里将操作类型进行分类:普通put和get,索引put和get,创建(删除)索引。 | |||
接下来将根据这个分类进行阐述 | |||
## 普通put和get | |||
普通get直接调用kvDB的get | |||
普通put要判断当前是否进行索引put和创建(删除)索引操作。如果有,则需要进入taskqueue等待。 | |||
## 索引put | |||
如果当前字段中没有需要创建索引的,那么就是普通put | |||
如果当前没有正在创建(修改)的索引或者之前的对于同一个key的索引put,则首先向metaDB写入标记`(key,creating/deleting)`,表示事务的开始。然后构建请求,并向kvDB和indexDB分别写入。所有的请求的时间戳都和metaDB中标记的时间戳相同。全部完成之后,将之前在metaDB中的标记delete,表示事务的结束。 | |||
如果当前有正在创建(修改)的索引或者之前的对于同一个key的索引put,则判断本次put是否含有对应的索引,如果没有则按上面一段的操作进行。如果含有,则加入之前设计中的taskqueue,在索引创建完成后会进行处理。 | |||
我觉得:索引put涉及到了indexDB和kvDB两者之间的原子性 | |||
## 创建(删除)索引 | |||
在进行这个操作之前对metaDB写入一个标记`(field,creating/deleting)`,表示在field上创建(删除)索引操作的事务的开始。(注:这里的key是包含了时间戳或者seq的)。 | |||
之后扫描kvDB,构建相应请求,对indexDB进行写入,这里也是通过writebatch写入的。写入完成之后,将之前的标记清除,表示当前的事务结束了。之后对于taskqueue里面的请求进行处理,完成之后,唤醒taskqueue中的请求。 | |||
我觉得:创建(删除)索引这个操作实际上只对indexDB进行了写入请求,并不涉及indexDB和kvDB两者之间的一致性 | |||
## 索引get | |||
如果没有索引,则返回;如果索引正在创建,则存到taskqueue中,在索引创建完成之后进行处理(这里或许也可以直接返回);如果存在索引则将请求发往indexDB。 | |||
# 一致性,崩溃恢复(recovery) | |||
## 创建(删除)索引 | |||
如果meta中存在记录`(field,creating/deleting)`,则表示在创建(删除)索引的过程中的某个时间节点崩溃了。 | |||
如果在metaDB写入标记后崩溃,indexDB写入前崩溃。由于这个操作只涉及indexDB,且与当前创建(删除)索引相关的写入请求都被阻塞了,所以只要再扫描一遍全部的kvDB构造index写入请求就可以了。 | |||
如果是在indexDB写入完成之后崩溃,这实际上已经完成了创建(删除)索引操作,所以把metaDB中的标记清除即可。 | |||
这里最主要的问题在于如何判断崩溃时间点。对于写入标记前后通过metaDB中的记录判断。对于是否对indexDB完成写入通过能否在indexDB中找到对应的索引判断,因为索引是一个writebatch整体写入的,有原子性。 | |||
## 索引put | |||
索引put涉及到kvDB和indexDB写入的一致性,这一点通过时间戳机制来保证。 | |||
如果metaDB中有记录`(key,creating/deleting)`,那么表明索引put过程中的某个时间节点崩溃了。由于kvDB和indexDB的写入是并发进行的,所以可能会出现四种情况: | |||
1. kvDB和indexDB写入均未完成 | |||
2. kvDB写入完成而indexDB未完成。 | |||
3. kvDB写入未完成而indexDB写入完成 | |||
4. 两者写入都完成,但是没有清除记录 | |||
kvDB的写入情况的判断如下:通过metaDB中的记录的key,查询kvDB。如果是creating操作且记录不存在或者得到的时间戳不等于metaDB记录的时间戳,则表明写入未完成;如果是deleting,如果存在记录,则表明写入未完成。 | |||
indexDB的写入情况判断如下:扫描indexDB,如果是creating操作且记录不存在或者得到的时间戳不等于metaDB记录的时间戳,则表明写入未完成;如果是deleting操作,如果存在对应的二级索引,则表明写入未完成。如果在kvDB中能够得到相应的kv,可以通过kvDB中的kv查询二级索引。 | |||
分别讨论creating和deleting操作下的四种情况的崩溃恢复的过程: | |||
1. 如果是creating,则清除metaDB中的记录;如果是deleting,则继续delete | |||
2. 如果是creating,则根据kvDB的写入构造请求写入indexDB;如果是deleting,则遍历indexDB删除对应的索引 | |||
3. 如果是creating,且kvDB中有旧值,则将indexDB中所有相关的字段清除后根据旧值创建索引;如果是deleting,删除kvDB中的kv对 | |||
4. 直接清除记录 | |||
当然,这些处理的方式会比较的细,总的来讲,只要kvDB完成写入,那么indexDB就可以完成更新;如果写入未完成,那么indexDB就需要用某种方式回滚。 | |||
# 全写入方案 | |||
不用时间戳,全部写入metaDB作为log,然后再写入kvDB和indexDB | |||
# 整体架构 | |||
采用多线程架构 | |||
由于二级索引理论上是幂等的操作,所以或许不用taskqueue来阻塞创建之后的写入? | |||
如果这么看的话,其实创建(删除)索引的操作也不需要 | |||
@ -1,2 +1,4 @@ | |||
# 实验报告 | |||
仓库地址 https://gitea.shuishan.net.cn/10225501448/leveldb_proj2 | |||
新建文件时 cmakelist 120行下面记得加进去 |
@ -0,0 +1,37 @@ | |||
#ifndef ENCODE_INDEX_H | |||
#define ENCODE_INDEX_H | |||
#include "leveldb/slice.h" | |||
#include "util/coding.h" | |||
namespace fielddb{ | |||
using namespace leveldb; | |||
struct ParsedInternalIndexKey { //key : {name : val} | |||
Slice user_key_; | |||
Slice name_; | |||
Slice val_; | |||
ParsedInternalIndexKey() {} // Intentionally left uninitialized (for speed) | |||
ParsedInternalIndexKey(const Slice& user_key, const Slice& name, const Slice& val) | |||
: user_key_(user_key), name_(name), val_(val) {} | |||
}; | |||
bool ParseInternalIndexKey(Slice input, ParsedInternalIndexKey* result); | |||
void AppendIndexKey(std::string* result, const ParsedInternalIndexKey& key); | |||
inline bool ParseInternalIndexKey(Slice input, ParsedInternalIndexKey* result){ | |||
return GetLengthPrefixedSlice(&input, &result->name_) && | |||
GetLengthPrefixedSlice(&input, &result->val_) && | |||
GetLengthPrefixedSlice(&input, &result->user_key_); | |||
} | |||
inline void AppendIndexKey(std::string* result, const ParsedInternalIndexKey& key){ | |||
PutLengthPrefixedSlice(result, key.name_); | |||
PutLengthPrefixedSlice(result, key.val_); | |||
PutLengthPrefixedSlice(result, key.user_key_); | |||
} | |||
} | |||
#endif |
@ -0,0 +1,194 @@ | |||
#include "fielddb/field_db.h" | |||
#include <cstdint> | |||
#include <string> | |||
#include <vector> | |||
#include "leveldb/db.h" | |||
#include "leveldb/env.h" | |||
#include "leveldb/options.h" | |||
#include "leveldb/status.h" | |||
#include "db/write_batch_internal.h" | |||
#include "util/serialize_value.h" | |||
#include "fielddb/encode_index.h" | |||
namespace fielddb { | |||
using namespace leveldb; | |||
//TODO:打开fieldDB | |||
Status FieldDB::OpenFieldDB(const Options& options, | |||
const std::string& name, FieldDB** dbptr) { | |||
// options.env->CreateDir("./abc") | |||
if(*dbptr == nullptr){ | |||
return Status::NotSupported(name, "new a fieldDb first\n"); | |||
} | |||
// | |||
Status status; | |||
DB *indexdb, *kvdb, *metadb; | |||
status = Open(options, name+"_indexDB", &indexdb); | |||
if(!status.ok()) return status; | |||
status = Open(options, name+"_kvDB", &kvdb); | |||
if(!status.ok()) return status; | |||
status = Open(options, name+"_metaDB", &metadb); | |||
if(!status.ok()) return status; | |||
(*dbptr)->indexDB_ = indexdb; | |||
(*dbptr)->kvDB_ = kvdb; | |||
(*dbptr)->metaDB_ = metadb; | |||
(*dbptr)->dbname_ = name; | |||
status = (*dbptr)->Recover(); | |||
return status; | |||
} | |||
// todo | |||
Status FieldDB::Recover() { | |||
// | |||
return Status::OK(); | |||
} | |||
Status FieldDB::Put(const WriteOptions &options, const Slice &key, const Slice &value) { | |||
return kvDB_->Put(options, key, value); | |||
} | |||
// TODO:需要对是否进行index更新做处理 | |||
Status FieldDB::PutFields(const WriteOptions &Options, | |||
const Slice &key, const FieldArray &fields) { | |||
// | |||
return kvDB_->PutFields(Options, key, fields); | |||
} | |||
// todo: 删除有索引的key时indexdb也要同步 | |||
Status FieldDB::Delete(const WriteOptions &options, const Slice &key) { | |||
// | |||
return kvDB_->Delete(options, key); | |||
} | |||
// TODO:根据updates里面的东西,要对是否需要更新index进行分别处理 | |||
Status FieldDB::Write(const WriteOptions &options, WriteBatch *updates) { | |||
return Status::OK(); | |||
} | |||
Status FieldDB::Get(const ReadOptions &options, const Slice &key, std::string *value) { | |||
return kvDB_->Get(options, key, value); | |||
} | |||
Status FieldDB::GetFields(const ReadOptions &options, const Slice &key, FieldArray *fields) { | |||
return kvDB_->GetFields(options, key, fields); | |||
} | |||
std::vector<std::string> FieldDB::FindKeysByField(Field &field) { | |||
return kvDB_->FindKeysByField(field); | |||
} | |||
std::vector<std::pair<std::string, std::string>> FieldDB::FindKeysAndValByFieldName ( | |||
const std::string &fieldName){ | |||
std::vector<std::pair<std::string, std::string>> result; | |||
auto iter = kvDB_->NewIterator(ReadOptions()); | |||
std::string val; | |||
for(iter->SeekToFirst();iter->Valid();iter->Next()) { | |||
InternalFieldArray fields(iter->value()); | |||
val = fields.ValOfName(fieldName); | |||
if(!val.empty()) { | |||
result.push_back(std::make_pair(iter->key().ToString(), val)); | |||
} | |||
} | |||
return result; | |||
} | |||
Status FieldDB::CreateIndexOnField(const std::string& field_name) { | |||
//taskQueue相关 | |||
//写锁 是不是只需要给putfields设置一把锁就行 | |||
std::vector<std::pair<std::string, std::string>> keysAndVal = | |||
FindKeysAndValByFieldName(field_name); | |||
WriteBatch writeBatch; | |||
Slice value = Slice(); | |||
for (auto &kvPair : keysAndVal){ | |||
std::string indexKey; | |||
AppendIndexKey(&indexKey, | |||
ParsedInternalIndexKey(kvPair.first, field_name, kvPair.second)); | |||
writeBatch.Put(indexKey, value); | |||
} | |||
Status s = indexDB_->Write(WriteOptions(), &writeBatch); | |||
if (!s.ok()) return s; | |||
index_[field_name] = Exist; | |||
//唤醒taskqueue | |||
} | |||
Status FieldDB::DeleteIndex(const std::string &field_name) { | |||
//taskQueue相关 | |||
//写锁 | |||
std::vector<std::pair<std::string, std::string>> keysAndVal = | |||
FindKeysAndValByFieldName(field_name); | |||
WriteBatch writeBatch; | |||
for (auto &kvPair : keysAndVal){ | |||
std::string indexKey; | |||
AppendIndexKey(&indexKey, | |||
ParsedInternalIndexKey(kvPair.first, field_name, kvPair.second)); | |||
writeBatch.Delete(indexKey); | |||
} | |||
Status s = indexDB_->Write(WriteOptions(), &writeBatch); | |||
if (!s.ok()) return s; | |||
index_.erase(field_name); | |||
//唤醒taskqueue | |||
} | |||
std::vector<std::string> FieldDB::QueryByIndex(const Field &field, Status *s) { | |||
if (index_.count(field.first) == 0 || index_[field.first] != Exist){ | |||
*s = Status::NotFound(Slice()); | |||
return std::vector<std::string>(); | |||
} | |||
std::string indexKey; | |||
AppendIndexKey(&indexKey, | |||
ParsedInternalIndexKey(Slice(), field.first, field.second)); | |||
Iterator *indexIterator = indexDB_->NewIterator(ReadOptions()); | |||
indexIterator->Seek(indexKey); | |||
std::vector<std::string> result; | |||
for (; indexIterator->Valid(); indexIterator->Next()) { | |||
ParsedInternalIndexKey iterKey; | |||
if (ParseInternalIndexKey(indexIterator->key(), &iterKey)){ | |||
if (iterKey.name_ == field.first && iterKey.val_ == field.second){ | |||
result.push_back(iterKey.user_key_.ToString()); | |||
continue; //查到说明在范围里,否则break | |||
} | |||
} | |||
break; | |||
} | |||
*s = Status::OK(); | |||
return result; | |||
} | |||
Iterator * FieldDB::NewIterator(const ReadOptions &options) { | |||
return kvDB_->NewIterator(options); | |||
} | |||
// TODO:使用统一seq进行snapshot管理 | |||
const Snapshot * FieldDB::GetSnapshot() { | |||
return kvDB_->GetSnapshot(); | |||
} | |||
// TODO:同上 | |||
void FieldDB::ReleaseSnapshot(const Snapshot *snapshot) { | |||
kvDB_->ReleaseSnapshot(snapshot); | |||
} | |||
bool FieldDB::GetProperty(const Slice &property, std::string *value) { | |||
return kvDB_->GetProperty(property, value) | indexDB_->GetProperty(property, value); | |||
} | |||
void FieldDB::GetApproximateSizes(const Range *range, int n, uint64_t *sizes) { | |||
uint64_t temp = 0; | |||
kvDB_->GetApproximateSizes(range, n, sizes); | |||
indexDB_->GetApproximateSizes(range, n, &temp); | |||
*sizes += temp; | |||
} | |||
void FieldDB::CompactRange(const Slice *begin, const Slice *end) { | |||
kvDB_->CompactRange(begin, end); | |||
} | |||
} // end of namespace |
@ -0,0 +1,68 @@ | |||
# ifndef FIELD_DB_H | |||
# define FIELD_DB_H | |||
#include "port/port_stdcxx.h" | |||
#include "db/db_impl.h" | |||
#include <deque> | |||
#include <map> | |||
#include <set> | |||
#include <string> | |||
#include "leveldb/db.h" | |||
#include "leveldb/options.h" | |||
#include "leveldb/slice.h" | |||
#include "leveldb/status.h" | |||
#include "fielddb/request.h" | |||
namespace fielddb { | |||
using namespace leveldb; | |||
class FieldDB : DB { | |||
public: | |||
//用的时候必须FieldDB *db = new FieldDB()再open,不能像之前一样DB *db | |||
FieldDB() : indexDB_(nullptr), kvDB_(nullptr), metaDB_(nullptr) {}; | |||
/*lab1的要求,作为db派生类要实现的虚函数*/ | |||
Status Put(const WriteOptions &options, const Slice &key, const Slice &value) override; | |||
Status PutFields(const WriteOptions &, const Slice &key, const FieldArray &fields) override; | |||
Status Delete(const WriteOptions &options, const Slice &key) override; | |||
Status Write(const WriteOptions &options, WriteBatch *updates) override; | |||
Status Get(const ReadOptions &options, const Slice &key, std::string *value) override; | |||
Status GetFields(const ReadOptions &options, const Slice &key, FieldArray *fields) override; | |||
std::vector<std::string> FindKeysByField(Field &field) override; | |||
Iterator * NewIterator(const ReadOptions &options) override; | |||
const Snapshot * GetSnapshot() override; | |||
void ReleaseSnapshot(const Snapshot *snapshot) override; | |||
bool GetProperty(const Slice &property, std::string *value) override; | |||
void GetApproximateSizes(const Range *range, int n, uint64_t *sizes) override; | |||
void CompactRange(const Slice *begin, const Slice *end) override; | |||
/*与索引相关*/ | |||
Status CreateIndexOnField(const std::string& field_name); | |||
Status DeleteIndex(const std::string &field_name); | |||
std::vector<std::string> QueryByIndex(const Field &field, Status *s); | |||
static Status OpenFieldDB(const Options& options,const std::string& name,FieldDB** dbptr); | |||
private: | |||
//根据metaDB的内容进行恢复 | |||
Status Recover(); | |||
private: | |||
std::string dbname_; | |||
leveldb::DB *metaDB_; | |||
leveldb::DB *indexDB_; | |||
leveldb::DB *kvDB_; | |||
enum IndexStatus{ | |||
Creating, | |||
Deleting, | |||
Exist | |||
}; | |||
std::map<std::string, int> index_; | |||
leveldb::port::Mutex mutex_; // mutex for taskqueue | |||
std::deque<Request *> taskqueue_; | |||
std::vector<std::pair<std::string, std::string>> FindKeysAndValByFieldName ( | |||
const std::string &fieldName); | |||
}; | |||
} // end of namespace | |||
# endif |
@ -0,0 +1,20 @@ | |||
#include "fielddb/metakv.h" | |||
#include "util/coding.h" | |||
#include <string> | |||
namespace fielddb { | |||
using namespace leveldb; | |||
Slice MetaKV::metaKey() { | |||
std::string buf; | |||
PutLengthPrefixedSlice(&buf, Key); | |||
PutFixed64(&buf, meta_seq); | |||
PutFixed32(&buf, tag); | |||
return Slice(buf); | |||
} | |||
Slice MetaKV::metaValue() { | |||
return Slice(SerializeValue(Fields)); | |||
} | |||
} |
@ -0,0 +1,26 @@ | |||
#pragma once | |||
#include <cstdint> | |||
#include <cstdio> | |||
#include "leveldb/slice.h" | |||
#include "util/serialize_value.h" | |||
namespace fielddb { | |||
using namespace leveldb; | |||
/*根据写入的流程可以推断,需要存在metaDB中的数据其实都是带索引的数据,也就是FieldArray*/ | |||
class MetaKV { | |||
MetaKV(Slice &Key,FieldArray Fields): | |||
Key(Key),Fields(Fields),tag(0),meta_seq(0) { } | |||
inline int get_seq() { return meta_seq; } | |||
inline void set_seq(int meta_seq) { this->meta_seq = meta_seq; } | |||
inline void setPut() { tag = PUT; } | |||
inline void setDelete() { tag = DELETE; } | |||
Slice metaKey(); | |||
Slice metaValue(); | |||
private: | |||
enum {PUT = 0x0,DELETE = 0x1}; | |||
uint64_t meta_seq; | |||
uint8_t tag; | |||
Slice &Key; | |||
FieldArray Fields; | |||
}; | |||
} |
@ -0,0 +1,25 @@ | |||
#include <string> | |||
#include "port/port_stdcxx.h" | |||
#include "util/mutexlock.h" | |||
#include "util/serialize_value.h" | |||
namespace fielddb { | |||
using namespace leveldb; | |||
// 在taskqueue中的Request,由taskqueue最开始的线程处理一批Request | |||
// 这个思路与write写入的思路类似 | |||
class Request { | |||
public: | |||
Request(std::string *Key,std::string *Value,port::Mutex *mu): | |||
Key(Key),Value(Value),hasFields(false),_cond(mu) { } | |||
Request(std::string *Key,FieldArray *Fields,port::Mutex *mu): | |||
Key(Key),Fields(Fields),hasFields(false),_cond(mu) { } | |||
private: | |||
bool done; | |||
port::CondVar _cond; | |||
bool hasFields; | |||
std::string *Key; | |||
std::string *Value; | |||
FieldArray *Fields; | |||
}; | |||
} |
@ -0,0 +1,146 @@ | |||
#include "gtest/gtest.h" | |||
// #include "leveldb/env.h" | |||
// #include "leveldb/db.h" | |||
#include "fielddb/field_db.h" | |||
using namespace fielddb; | |||
constexpr int value_size = 2048; | |||
constexpr int data_size = 128 << 20; | |||
std::vector<std::string> cities = { | |||
"Beijing", "Shanghai", "Guangzhou", "Shenzhen", "Hangzhou", | |||
"Chengdu", "Chongqing", "Wuhan", "Suzhou", "Tianjin" | |||
}; | |||
std::vector<std::string> shanghaiKeys; | |||
Status OpenDB(std::string dbName, FieldDB **db) { | |||
Options options; | |||
options.create_if_missing = true; | |||
return FieldDB::OpenFieldDB(options, dbName, db); | |||
} | |||
void ClearDB(FieldDB *db){ | |||
//destroy和恢复没做前先用这个清理数据库,否则跑不同的数据多做几次测试会污染 | |||
WriteOptions writeOptions; | |||
int key_num = data_size / value_size; | |||
for (int i = 0; i < key_num; i++) { | |||
int key_ = i+1; | |||
std::string key = std::to_string(key_); | |||
Status s = db->Delete(WriteOptions(), key); | |||
ASSERT_TRUE(s.ok()); | |||
} | |||
} | |||
void InsertFieldData(FieldDB *db) { | |||
WriteOptions writeOptions; | |||
int key_num = data_size / value_size; | |||
srand(0); | |||
for (int i = 0; i < key_num; i++) { | |||
int randThisTime = rand(); //确保读写一个循环只rand一次,否则随机序列会不一致 | |||
int key_ = randThisTime % key_num+1; | |||
std::string key = std::to_string(key_); | |||
std::string name = "customer#" + std::to_string(key_); | |||
std::string address = cities[randThisTime % cities.size()]; | |||
FieldArray fields = { | |||
{"name", name}, | |||
{"address", address} | |||
}; | |||
if (address == "Shanghai") { | |||
shanghaiKeys.push_back(key); | |||
} | |||
Status s = db->PutFields(WriteOptions(), key, fields); | |||
ASSERT_TRUE(s.ok()); | |||
} | |||
} | |||
void GetFieldData(FieldDB *db) { | |||
ReadOptions readOptions; | |||
int key_num = data_size / value_size; | |||
// 点查 | |||
srand(0); | |||
for (int i = 0; i < 100; i++) { | |||
int randThisTime = rand(); | |||
int key_ = randThisTime % key_num+1; | |||
std::string key = std::to_string(key_); | |||
FieldArray fields_ret; | |||
Status s = db->GetFields(readOptions, key, &fields_ret); | |||
ASSERT_TRUE(s.ok()); | |||
for (const Field& pairs : fields_ret) { | |||
if (pairs.first == "name"){ | |||
} else if (pairs.first == "address"){ | |||
std::string city = pairs.second; | |||
ASSERT_NE(std::find(cities.begin(), cities.end(), city), cities.end()); | |||
} else assert(false); | |||
} | |||
} | |||
} | |||
void findKeysByCity(FieldDB *db) { | |||
Field field = {"address", "Shanghai"}; | |||
std::vector<std::string> resKeys = db->FindKeysByField(field); | |||
std::cout << shanghaiKeys.size() << " " << resKeys.size() << std::endl; | |||
for (const std::string &key : resKeys){ | |||
ASSERT_NE(std::find(shanghaiKeys.begin(), shanghaiKeys.end(), key), shanghaiKeys.end()); | |||
} | |||
} | |||
void findKeysByCityIndex(FieldDB *db, bool expect) { | |||
Field field = {"address", "Shanghai"}; | |||
Status s; | |||
std::vector<std::string> resKeys = db->QueryByIndex(field, &s); | |||
if (expect) ASSERT_TRUE(s.ok()); | |||
else { | |||
ASSERT_TRUE(s.IsNotFound()); | |||
return; | |||
} | |||
std::cout << shanghaiKeys.size() << " " << resKeys.size() << std::endl; | |||
for (const std::string &key : resKeys){ | |||
ASSERT_NE(std::find(shanghaiKeys.begin(), shanghaiKeys.end(), key), shanghaiKeys.end()); | |||
} | |||
} | |||
TEST(TestLab1, Basic) { | |||
// DestroyDB("testdb",Options()); | |||
FieldDB *db = new FieldDB(); | |||
if(OpenDB("testdb", &db).ok() == false) { | |||
std::cerr << "open db failed" << std::endl; | |||
abort(); | |||
} | |||
// ClearDB(db); | |||
InsertFieldData(db); | |||
GetFieldData(db); | |||
findKeysByCity(db); | |||
delete db; | |||
} | |||
TEST(TestLab2, Basic) { | |||
//destroy | |||
FieldDB *db = new FieldDB(); | |||
if(OpenDB("testdb2", &db).ok() == false) { | |||
std::cerr << "open db failed" << std::endl; | |||
abort(); | |||
} | |||
// ClearDB(db); | |||
shanghaiKeys.clear(); | |||
InsertFieldData(db); | |||
// GetFieldData(db); | |||
// findKeysByCity(db); | |||
db->CreateIndexOnField("address"); | |||
findKeysByCityIndex(db, true); | |||
db->DeleteIndex("address"); | |||
findKeysByCityIndex(db, false); | |||
delete db; | |||
} | |||
int main(int argc, char** argv) { | |||
// All tests currently run with the same read-only file limits. | |||
testing::InitGoogleTest(&argc, argv); | |||
return RUN_ALL_TESTS(); | |||
} |
@ -0,0 +1,94 @@ | |||
#include "util/serialize_value.h" | |||
#include <algorithm> | |||
#include <string> | |||
#include "util/coding.h" | |||
#include <iostream> | |||
namespace leveldb{ | |||
bool compareByFirst(const Field& a, const Field& b) { | |||
return a.first < b.first; // 按字段名升序排序 | |||
} | |||
std::string SerializeValue(const FieldArray& fields){ | |||
FieldArray sortFields = fields; | |||
std::sort(sortFields.begin(), sortFields.end(), compareByFirst); | |||
std::string result; | |||
for (const Field& pairs : sortFields) { | |||
PutLengthPrefixedSlice(&result, pairs.first); | |||
PutLengthPrefixedSlice(&result, pairs.second); | |||
} | |||
return result; | |||
} | |||
FieldArray *ParseValue(const std::string& value_str){ | |||
Slice valueSlice(value_str); | |||
FieldArray *res = new FieldArray; | |||
Slice nameSlice = Slice(); | |||
Slice valSlice = Slice(); | |||
std::string nameStr; | |||
std::string valStr; | |||
while(GetLengthPrefixedSlice(&valueSlice, &nameSlice)){ | |||
nameStr = nameSlice.ToString(); | |||
if(GetLengthPrefixedSlice(&valueSlice, &valSlice)){ | |||
valStr = valSlice.ToString(); | |||
res->emplace_back(nameStr, valStr); | |||
} else { | |||
std::cout << "name and val not match!" << std::endl; | |||
} | |||
nameSlice.clear(); | |||
valSlice.clear(); | |||
} | |||
return res; | |||
} | |||
void InternalFieldArray::Map() { | |||
if(isMapped) return; | |||
for(const Field& pair : fields) { | |||
map[pair.first] = pair.second; | |||
} | |||
isMapped = true; | |||
} | |||
std::string InternalFieldArray::Serialize() { | |||
std::string result; | |||
if(isMapped) { | |||
for(auto pair : map) { | |||
PutLengthPrefixedSlice(&result, pair.first); | |||
PutLengthPrefixedSlice(&result, pair.second); | |||
} | |||
} else { | |||
result = SerializeValue(fields); | |||
} | |||
return result; | |||
} | |||
bool InternalFieldArray::HasField(const Field& field) { | |||
if(isMapped) { | |||
if(map.count(field.first) && map[field.first] == field.second) { | |||
return true; | |||
} | |||
return false; | |||
} | |||
return std::find(fields.begin(),fields.end(),field) != fields.end(); | |||
} | |||
std::string InternalFieldArray::ValOfName(const std::string &name) { | |||
if(isMapped) { | |||
if(map.count(name)) { | |||
return map[name]; | |||
} | |||
return std::string(); | |||
} | |||
for (auto iter = fields.begin(); iter != fields.end(); iter++){ | |||
if (iter->first == name) { | |||
return iter->second; | |||
} else if (iter->first > name) { | |||
return std::string(); | |||
} | |||
} | |||
return std::string(); | |||
} | |||
} |
@ -0,0 +1,59 @@ | |||
#ifndef STORAGE_LEVELDB_UTIL_SERIALIZE_VALUE_H_ | |||
#define STORAGE_LEVELDB_UTIL_SERIALIZE_VALUE_H_ | |||
#include <iostream> | |||
#include <string> | |||
#include <vector> | |||
#include <map> | |||
#include "leveldb/slice.h" | |||
#include "util/coding.h" | |||
namespace leveldb{ | |||
using Field = std::pair<std::string, std::string>; // field_name:field_value | |||
using FieldArray = std::vector<std::pair<std::string, std::string>>; | |||
std::string SerializeValue(const FieldArray& fields); | |||
FieldArray *ParseValue(const std::string& value_str); | |||
class InternalFieldArray { | |||
public: | |||
using FieldMap = std::map<std::string,std::string>; | |||
InternalFieldArray(const FieldArray &fields, bool to_map = false): | |||
fields(fields),isMapped(false) { | |||
if(to_map) Map(); | |||
} | |||
InternalFieldArray(const Slice value_slice) { | |||
Slice valueSlice = value_slice; | |||
Slice nameSlice, valSlice; | |||
while(GetLengthPrefixedSlice(&valueSlice, &nameSlice)) { | |||
if(GetLengthPrefixedSlice(&valueSlice, &valSlice)) { | |||
map[nameSlice.ToString()] = valSlice.ToString(); | |||
} else { | |||
std::cout << "name and val not match!" << std::endl; | |||
} | |||
nameSlice.clear(); | |||
valSlice.clear(); | |||
} | |||
isMapped = true; | |||
} | |||
InternalFieldArray(const std::string& value_str) | |||
:leveldb::InternalFieldArray(Slice(value_str)) {} | |||
//将vector变为用map存 | |||
void Map(); | |||
std::string Serialize(); | |||
bool HasField(const Field& field); | |||
std::string ValOfName(const std::string& name); | |||
private: | |||
bool isMapped; | |||
const FieldArray fields; | |||
FieldMap map; | |||
}; | |||
} | |||
#endif |
@ -0,0 +1,104 @@ | |||
# 1. 项目概述 | |||
leveldb中的存储原本只支持简单的字节序列,在这个项目中我们对其功能进行拓展,使其可以包含多个字段,并通过这些字段实现类似数据库列查询的功能。但如果仅通过字段查找数据,需要对整个数据库的遍历,不够高效,因此还要新增二级索引,提高对特定字段的查询效率。 | |||
本文档涵盖的设计内容只是最初的设想,实现过程中大概率会进行调整甚至重构。各部分也将在项目落实的过程中进行补充完善。 | |||
# 2. 功能设计 | |||
## 2.1 字段设计 | |||
设计目标:对value存储读取时进行序列化编码,使其支持字段。 | |||
实现思路:设计之初有考虑增加一些元数据(例如过滤器、字段偏移支持二分)来加速查询。但考虑到在数据库中kv的数量是十分庞大的,新加数据结构会带来巨大的空间开销。因此我们决定在这里牺牲时间换取空间,而将时间的加速放在索引中。 | |||
在这一基础上,我们对序列化进行了简单的优化:将字段名排序后,一一调用leveldb中原本的编码方法`PutLengthPrefixedSlice`存入value。这样不会有额外的空间开销,而好处在于遍历一个value的字段时,如果得到的字段名比目标大,就可以提前结束遍历。 | |||
``` | |||
std::string SerializeValue(const FieldArray& fields){ | |||
FieldArray sortFields = fields; | |||
std::sort(sortFields.begin(), sortFields.end(), compareByFirst); | |||
std::string result; | |||
for (const Field& pairs : sortFields) { | |||
PutLengthPrefixedSlice(&result, pairs.first); | |||
PutLengthPrefixedSlice(&result, pairs.second); | |||
} | |||
return result; | |||
} | |||
``` | |||
最终db类提供了新接口`putFields`, `getFields`,分别对传入的字段序列化后调用原来的`put`, `get`接口。 | |||
`FindKeysByField`调用`NewIterator`遍历所有数据,field名和值符合则加入返回的key中。 | |||
## 2.2 二级索引 | |||
设计目标:对某个字段或属性建立索引,提高对该字段的查询效率。需考虑索引的创建、删除、维护。 | |||
实现思路: 这一部分的难点主要在于,索引数据与原数据的存储需要进行隔离,不同操作之间存在同步与异步问题,还需要考虑两种数据间的一致性。为了使设计简洁化,避免不同模块耦合带来潜在的问题,我们的设计如下: | |||
1. 总体上,我们对两种数据分别创建一个db类的对象`kvDb, indexDb`。对外的接口类`FieldDb`包含了这两个对象,提供原先的leveldb各种接口,以及新功能,并在这一层完成两个对象的管理。 | |||
2. `kvDb`与原先的存储一致,`indexDb`中key是原key与索引字段的联合编码,value是空(详见数据结构设计),并且lab1中新增的接口也不会被调用到。 | |||
3. 在open新的fieldDb时,会创建这两个对象并open他们。fieldDb会维护一个字符串数组`fieldWithIndex`,记录了当前哪些字段名拥有索引。一个`pair<indexStatus, fieldName>`的队列`taskQueue`, 维护了创建或者删除索引的任务请求。创建或删除索引时,把任务加入队尾并休眠。一个任务完成后唤醒队首继续下一个任务(类似write机制) | |||
``` | |||
class FieldDb { | |||
Db *kvDb; | |||
Db *indexDb; | |||
string[] fieldWithIndex; | |||
queue<pair<bool indexStatus, std::string fieldName>> taskQueue; | |||
// 0建 or 1删,或把pair抽象成一个类 | |||
//原db所有对外接口,下面没提就调用kvDb相应函数 | |||
bool CreateIndexOnField(const std::string& fieldName){、 | |||
//加入taskQueue并休眠/拿写锁 | |||
//kvdb->FindKvsByField得到数据信息 | |||
//编码 | |||
//kvdb->writeIndexLog写索引日志 | |||
//indexDb->put存储索引 | |||
//fieldWithIndex.insert(fieldName) | |||
//唤醒taskQueue队首/释放写锁 | |||
} | |||
bool DeleteIndex(std::string &fieldName); //同上 | |||
std::vector<std::string> QueryByIndex(Field &field){ | |||
//判断fieldWithIndex是否有 | |||
//indexDb->iterator得到编码后的数据信息 | |||
//解码返回 | |||
} | |||
} | |||
``` | |||
4. 一个创建/删除索引任务开始时,首先锁住写锁,再对`kvDb`调用`FindKvsByField`(类似lab1的`FindKeysByField`但也要返回field对应的值),对返回的数组中每个元素一一与字段名编码成新key,**合并在一个writebatch中写入日志**,再调用`indexDb`中的put或delete。当`indexdb`把所有操作执行完后,在`fieldWithIndex`中添加/删除这个`fieldName`,使得用户可以针对这个字段进行索引读,最后唤醒下一个任务,没有则释放写锁。 | |||
5. fieldDb也提供了`put`, `get`,`getFields`, `putFields`,`delete`接口。对前三者,简单调用`kvDb`中的对应接口(不涉及索引)。 | |||
对`putFields`,先判断是否有`fieldWithIndex`中有的字段,如果有,并对`kvDb`调用一个(多个)`put`,**但在写日志时一并加上索引日志写入**。 | |||
`delete`逻辑一致。 | |||
6. 针对索引的日志:为了保证两个数据库间的一致性,由`kvDb`的日志模块统一管理。这其中包含了两种chunk(kv写入和索引写入),在恢复时需要分别解析,决定往哪一个数据库中写入。索引写入的时机在4、5中的**加粗**部分,如何编码还有待设计。也就是说,`indexDb`本身的日志模块不再起到作用,项目后期可以修改关闭这一部分。 | |||
7. 对两个数据库的其他部分,理论上每个数据库内部的其他模块不会互相影响。 | |||
# 3. 数据结构设计 | |||
具体设计模块化,实现时再具体考虑。 | |||
`indexDb`的kv编码:**暂时考虑助教文档那种** | |||
区分日志中kv部分和index部分:思路是在writebatch中某个地方加个标识区分,每一类的编码与各自的key编码类似**细节有待完成** | |||
# 4. 接口/函数设计 | |||
`FieldDb`的对外接口函数之前已展示,这里补充一些子数据库需提供给`FieldDb`的抽象功能(暂时想到的): | |||
``` | |||
class Db{ | |||
//原有部分和lab1部分 | |||
//kvdb需要: | |||
pair<slice key, string fieldVal> FindKvsByField(string fieldName); //搜集索引需要的数据信息 | |||
status writeIndexLog(string fieldName, pair<slice key, string fieldVal>); //向indexDb put前先写日志 | |||
} | |||
``` | |||
类内部实现的功能函数,具体实现过程中再抽象。 | |||
# 5. 功能测试 | |||
1. 基本的每个接口函数调用。 | |||
2. 创建/删除索引时的并发读写、并发创/删。 | |||
3. 数据库的恢复检查。 | |||
4. 性能测试。 | |||
# 6. 可能的挑战与解决方案 | |||
已想到的部分在之前已阐述,其余待发现。 | |||
mark一些原代码(db中)的修改点:recover时日志解析修改,write时的日志写入可能要合并索引的写入。 | |||
# 7. 分工与进度安排 | |||
功能 | 完成日期 | 分工 | |||
:------|:---------|:------ | |||
value序列化|11.19 | 李度 | |||
fieldDb接口|11.25|陈胤遒 | |||
lab1整体+测试|11.30|高宇菲 | |||
fieldDb功能实现|12.10|李度 | |||
kvdb功能实现与原代码修改|12.10|陈胤遒 | |||
整体系统整合+测试|12.20|李度、陈胤遒、高宇菲 | |||
性能测试|12.30|高宇菲 |