|
|
@ -6,11 +6,12 @@ leveldb中的存储原本只支持简单的字节序列,在这个项目中我 |
|
|
|
|
|
|
|
# 2. 功能实现 |
|
|
|
## 2.1 字段 |
|
|
|
设计目标:对value存储读取时进行序列化编码,使其支持字段。 |
|
|
|
设计目标:对value存储读取时进行序列化编码,使其支持字段。 |
|
|
|
这一部分的具体代码在util/serialize_value.cc中 |
|
|
|
|
|
|
|
实现思路:设计之初有考虑增加一些元数据(例如过滤器、字段偏移支持二分)来加速查询。但考虑到在数据库中kv的数量是十分庞大的,新加数据结构会带来巨大的空间开销。因此我们决定在这里牺牲时间换取空间,而将时间的加速放在索引中。 |
|
|
|
在这一基础上,我们对序列化进行了简单的优化:将字段名排序后,一一调用leveldb中原本的编码方法`PutLengthPrefixedSlice`存入value。这样不会有额外的空间开销,而好处在于遍历一个value的字段时,如果得到的字段名比目标大,就可以提前结束遍历。 |
|
|
|
``` |
|
|
|
```c++ |
|
|
|
std::string SerializeValue(const FieldArray& fields){ |
|
|
|
std::sort(sortFields.begin(), sortFields.end(), compareByFirst); |
|
|
|
for (const Field& pairs : sortFields) { |
|
|
@ -22,13 +23,237 @@ std::string SerializeValue(const FieldArray& fields){ |
|
|
|
``` |
|
|
|
最终db类提供了新接口`putFields`, `getFields`,分别对传入的字段序列化后调用原来的`put`, `get`接口。 |
|
|
|
`FindKeysByField`调用`NewIterator`遍历所有数据,field名和值符合则加入返回的key中。 |
|
|
|
**这一部分的具体代码在util/serialize_value.cc中** |
|
|
|
|
|
|
|
|
|
|
|
## 2.2 二级索引 |
|
|
|
设计目标:对某个字段(属性)建立索引,提高对该字段的查询效率。 |
|
|
|
这一部分的具体代码在field/下 |
|
|
|
|
|
|
|
### 2.2.1 总体架构 |
|
|
|
fielddb |
|
|
|
1. **二级索引的设计** |
|
|
|
|
|
|
|
二级索引的难点主要包括以下几点:索引数据与kv数据的存储需要进行隔离,不同操作之间存在同步与异步问题,每一次的写入操作都需要额外考虑数据库原本的索引情况,任何操作还需要考虑两种数据间的一致性。为了使设计简洁化,避免不同模块耦合带来潜在的问题,我们的设计如下: |
|
|
|
|
|
|
|
总体上,我们对两种数据分别创建一个db类的对象kvDb, indexDb。对外的接口类FieldDb包含了这两个对象,提供原先的leveldb各种接口,以及新功能,并在这一层完成两个对象的管理。这两个子数据库共同协作,完成了二级索引的各核心操作。在此基础上,为了保证数据库崩溃时两个子数据库的一致性,我们设计了第三个子数据库metadb,它的作用类似于日志。 |
|
|
|
```c++ |
|
|
|
class FieldDB : DB { |
|
|
|
public: |
|
|
|
FieldDB() : indexDB_(nullptr), kvDB_(nullptr), metaDB_(nullptr) {}; |
|
|
|
~FieldDB(); |
|
|
|
/*lab1的要求,以及作为db派生类要实现的虚函数*/ |
|
|
|
Status Put(const WriteOptions &options, const Slice &key, const Slice &value) override; |
|
|
|
Status PutFields(const WriteOptions &, const Slice &key, const FieldArray &fields) override; |
|
|
|
Status Delete(const WriteOptions &options, const Slice &key) override; |
|
|
|
Status Write(const WriteOptions &options, WriteBatch *updates) override; |
|
|
|
Status Get(const ReadOptions &options, const Slice &key, std::string *value) override; |
|
|
|
Status GetFields(const ReadOptions &options, const Slice &key, FieldArray *fields) override; |
|
|
|
std::vector<std::string> FindKeysByField(Field &field) override; |
|
|
|
Iterator * NewIterator(const ReadOptions &options) override; |
|
|
|
const Snapshot * GetSnapshot() override; |
|
|
|
void ReleaseSnapshot(const Snapshot *snapshot) override; |
|
|
|
bool GetProperty(const Slice &property, std::string *value) override; |
|
|
|
void GetApproximateSizes(const Range *range, int n, uint64_t *sizes) override; |
|
|
|
void CompactRange(const Slice *begin, const Slice *end) override; |
|
|
|
/*与索引相关*/ |
|
|
|
Status CreateIndexOnField(const std::string& field_name, const WriteOptions &op); |
|
|
|
Status DeleteIndex(const std::string &field_name, const WriteOptions &op); |
|
|
|
std::vector<std::string> QueryByIndex(const Field &field, Status *s); |
|
|
|
IndexStatus GetIndexStatus(const std::string &fieldName); |
|
|
|
|
|
|
|
static Status OpenFieldDB(Options& options,const std::string& name,FieldDB** dbptr); |
|
|
|
|
|
|
|
private: |
|
|
|
//根据metaDB的内容进行恢复 |
|
|
|
Status Recover(); |
|
|
|
|
|
|
|
private: |
|
|
|
leveldb::DB *kvDB_; |
|
|
|
leveldb::DB *metaDB_; |
|
|
|
leveldb::DB *indexDB_; |
|
|
|
|
|
|
|
std::string dbname_; |
|
|
|
const Options *options_; |
|
|
|
Env *env_; |
|
|
|
} |
|
|
|
``` |
|
|
|
这样的设计带来了如下的好处: |
|
|
|
kv和index实现了完全的分离,并且由于各自都使用了leveldb构建的lsmtree,完全保证了内部实现的正确性。相应的,我们的工作基本只处在fielddb层,减少了模块的耦合,对于我们自己实现的正确性也有极大的提升。 |
|
|
|
|
|
|
|
所有leveldb原本的功能仍然能够支持,并且有些实现起来十分简单,比如: |
|
|
|
```c++ |
|
|
|
Iterator * FieldDB::NewIterator(const ReadOptions &options) { |
|
|
|
return kvDB_->NewIterator(options); |
|
|
|
} |
|
|
|
|
|
|
|
Status FieldDB::GetFields(const ReadOptions &options, const Slice &key, FieldArray *fields) { |
|
|
|
return kvDB_->GetFields(options, key, fields); |
|
|
|
} |
|
|
|
``` |
|
|
|
|
|
|
|
此外,性能开销增加也只在fielddb层,使我们能够进行比较和优化。 |
|
|
|
|
|
|
|
2. **index的编码** |
|
|
|
index编码仍然采用了leveldb提供`PutLengthPrefixedSlice`,保留信息的同时,提高空间利用率。 |
|
|
|
对于一个`key : {name : val}`的字段,索引采用如下编码: |
|
|
|
```c++ |
|
|
|
inline void AppendIndexKey(std::string* result, const ParsedInternalIndexKey& key){ |
|
|
|
PutLengthPrefixedSlice(result, key.name_); |
|
|
|
PutLengthPrefixedSlice(result, key.val_); |
|
|
|
PutLengthPrefixedSlice(result, key.user_key_); |
|
|
|
``` |
|
|
|
这一部分也被模块化的封装在field/encode_index.h中。 |
|
|
|
|
|
|
|
由此产生了索引读的方法:根据name和val构建一个新的iterator,迭代获取范围内的所有key: |
|
|
|
```c++ |
|
|
|
std::vector<std::string> FieldDB::QueryByIndex(const Field &field, Status *s) { |
|
|
|
if (index_.count(field.first) == 0 || index_[field.first].first != Exist){ |
|
|
|
*s = Status::NotFound(Slice()); |
|
|
|
return std::vector<std::string>(); |
|
|
|
} |
|
|
|
std::string indexKey; |
|
|
|
AppendIndexKey(&indexKey, |
|
|
|
ParsedInternalIndexKey(Slice(), field.first, field.second)); |
|
|
|
Iterator *indexIterator = indexDB_->NewIterator(ReadOptions()); |
|
|
|
indexIterator->Seek(indexKey); |
|
|
|
|
|
|
|
std::vector<std::string> result; |
|
|
|
for (; indexIterator->Valid(); indexIterator->Next()) { |
|
|
|
ParsedInternalIndexKey iterKey; |
|
|
|
if (ParseInternalIndexKey(indexIterator->key(), &iterKey)){ |
|
|
|
if (iterKey.name_ == field.first && iterKey.val_ == field.second){ |
|
|
|
result.push_back(iterKey.user_key_.ToString()); |
|
|
|
continue; //查到说明在范围里,否则break |
|
|
|
} |
|
|
|
} |
|
|
|
break; |
|
|
|
} |
|
|
|
delete indexIterator; |
|
|
|
*s = Status::OK(); |
|
|
|
return result; |
|
|
|
} |
|
|
|
``` |
|
|
|
|
|
|
|
3. **新的写流程** |
|
|
|
索引功能的出现,使得写的逻辑需要重新设计。因为每一次写,不仅需要关注本次写入的字段,是不是需要同时写入索引,还需要关注本次写入的key,是不是覆盖了数据库原本的key,导致需要修改原本key的索引情况。这也意味着,即使是put简单的kv(不带字段),实际上还是需要修改put逻辑。方便起见,我们为原本的put中的value加入一个""的字段名,也视为putfield(这只是为了使我们的数据库支持原本的所有功能,也并不是本项目的重点,完全可以索性删除put功能,让我们的数据库只支持字段value)。 |
|
|
|
|
|
|
|
下面是putfield的实现思路: |
|
|
|
```c++ |
|
|
|
void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, |
|
|
|
WriteBatch &MetaBatch,fielddb::FieldDB *DB, |
|
|
|
SliceHashSet &batchKeySet) |
|
|
|
{ |
|
|
|
if (batchKeySet.find(Key) != batchKeySet.end()){ |
|
|
|
return;//并发的被合并的put/delete请求只处理一次 |
|
|
|
} else { |
|
|
|
batchKeySet.insert(Key); |
|
|
|
} |
|
|
|
std::string val_str; |
|
|
|
s = DB->kvDB_->Get(ReadOptions(), Key, &val_str); |
|
|
|
FieldSliceArray oldFields; |
|
|
|
if (s.IsNotFound()){ |
|
|
|
// oldFields = nullptr; |
|
|
|
} else if (s.ok()) { //得到数据库之前key的fields, 判断需不需要删除其中潜在的索引 |
|
|
|
Slice nameSlice, valSlice; |
|
|
|
Slice Value(val_str); |
|
|
|
while(GetLengthPrefixedSlice(&Value, &nameSlice)) { |
|
|
|
if(GetLengthPrefixedSlice(&Value, &valSlice)) { |
|
|
|
oldFields.push_back({nameSlice,valSlice}); |
|
|
|
} else { |
|
|
|
std::cout << "name and val not match! From FieldsReq Init" << std::endl; |
|
|
|
assert(0); |
|
|
|
} |
|
|
|
nameSlice.clear(), valSlice.clear(); |
|
|
|
} |
|
|
|
} else { |
|
|
|
assert(0); |
|
|
|
} |
|
|
|
|
|
|
|
bool HasIndex = false; |
|
|
|
bool HasOldIndex = false; |
|
|
|
{ |
|
|
|
DB->index_mu.AssertHeld(); |
|
|
|
//1.将存在冲突的put pend到对应的请求 |
|
|
|
for(auto &[field_name,field_value] : SliceFields) { |
|
|
|
if(field_name.data() == EMPTY) break; |
|
|
|
if(DB->index_.count(field_name.ToString())) { |
|
|
|
auto [index_status,parent_req] = DB->index_[field_name.ToString()]; |
|
|
|
if(index_status == IndexStatus::Creating || index_status == IndexStatus::Deleting) { |
|
|
|
parent_req->PendReq(this->parent); |
|
|
|
return; |
|
|
|
} else if(index_status == IndexStatus::Exist) { |
|
|
|
HasIndex = true; |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
//冲突也可能存在于,需要删除旧数据的索引,但该索引正在创删中 |
|
|
|
if (!oldFields.empty()){ |
|
|
|
for(auto &[field_name,field_value] : oldFields) { |
|
|
|
if(field_name.data() == EMPTY) break; |
|
|
|
if(DB->index_.count(field_name.ToString())) { |
|
|
|
auto [index_status,parent_req] = DB->index_[field_name.ToString()]; |
|
|
|
if(index_status == IndexStatus::Creating || index_status == IndexStatus::Deleting) { |
|
|
|
parent_req->PendReq(this->parent); |
|
|
|
return; |
|
|
|
} else if(index_status == IndexStatus::Exist) { |
|
|
|
HasOldIndex = true; |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
std::string scrach = SerializeValue(SliceFields); |
|
|
|
KVBatch.Put(Slice(Key), Slice(scrach)); |
|
|
|
//2.对于没有冲突但含有索引操作的put,构建metaKV |
|
|
|
if(HasIndex || HasOldIndex) { |
|
|
|
std::string MetaKey,MetaValue; |
|
|
|
std::string serialized = SerializeValue(SliceFields); |
|
|
|
MetaKV MKV = MetaKV(Key,serialized); |
|
|
|
MKV.TransPut(MetaKey, MetaValue); |
|
|
|
MetaBatch.Put(MetaKey, serialized); |
|
|
|
|
|
|
|
|
|
|
|
//3.1对于含有索引的oldfield删除索引 |
|
|
|
if (HasOldIndex) { |
|
|
|
for(auto &[field_name,field_value] : oldFields) { |
|
|
|
if(field_name.data() == EMPTY) continue; |
|
|
|
if(DB->index_.count(field_name.ToString()) && //旧数据有,新数据没有的字段,删索引 |
|
|
|
std::find(SliceFields.begin(), SliceFields.end(), |
|
|
|
std::make_pair(field_name, field_value)) == SliceFields.end()) { |
|
|
|
std::string indexKey; |
|
|
|
AppendIndexKey(&indexKey, ParsedInternalIndexKey( |
|
|
|
Key,field_name,field_value)); |
|
|
|
IndexBatch.Delete(indexKey); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
//3.2对于含有索引的field建立索引 |
|
|
|
if (HasIndex) { |
|
|
|
for(auto &[field_name,field_value] : SliceFields) { |
|
|
|
if(field_name.data() == EMPTY) continue; |
|
|
|
if(DB->index_.count(field_name.ToString())) { |
|
|
|
std::string indexKey; |
|
|
|
AppendIndexKey(&indexKey, ParsedInternalIndexKey( |
|
|
|
Key,field_name,field_value)); |
|
|
|
IndexBatch.Put(indexKey, Slice()); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
``` |
|
|
|
|
|
|
|
同理,delete也需要先读最新的数据,再进行相应的处理,这里简单贴上实现逻辑: |
|
|
|
``` |
|
|
|
//1. 读取当前的最新的键值对,判断是否存在含有键值对的field |
|
|
|
//2.1 如果无,则正常构造delete |
|
|
|
//2.2 如果是有的field的索引状态都是exist,则在meta中写KV_Deleting类型的记录 |
|
|
|
//在kvDB和indexDB中写入对应的delete |
|
|
|
//2.3 如果存在field的索引状态是Creating或者Deleting,那么在那个队列上面进行等待 |
|
|
|
``` |
|
|
|
|
|
|
|
上面的代码也展现了并发与恢复的部分,接下来会一一阐述。 |
|
|
|
|
|
|
|
### 2.2.2 如何并发创删索引与读写 |
|
|
|
1. **为什么要并发控制创删索引与读写** |
|
|
@ -238,8 +463,106 @@ BatchReq::BatchReq(WriteBatch *Batch,port::Mutex *mu): |
|
|
|
} |
|
|
|
``` |
|
|
|
|
|
|
|
### 2.2.3 如何保证两个kv与index的一致性 |
|
|
|
metadb |
|
|
|
### 2.2.3 如何保证kv与index的一致性 |
|
|
|
metadb为异常恢复服务,只涉及到putfield和delete部分。(这里最初的设计有些问题,当时认为异常恢复也需要考虑创删索引部分,但实际上创删索引的本质,是一次往indexdb的writebatch,只会有索引整体写入成功和不成功两种情况,并不会出现不一致问题。) |
|
|
|
|
|
|
|
因此metadb的编码,只要在原本kv编码的基础上,加一个标志位,标识本条是来自putfield还是delete。 |
|
|
|
metadb提供的功能被封装在fielddb/meta.cc中,包括编码: |
|
|
|
```c++ |
|
|
|
void MetaKV::TransPut(std::string &MetaKey,std::string &MetaValue) { |
|
|
|
MetaKey.clear(); |
|
|
|
MetaValue.clear(); |
|
|
|
std::string &buf = MetaKey; |
|
|
|
PutFixed32(&buf, KV_Creating); |
|
|
|
PutLengthPrefixedSlice(&buf, Slice(name)); |
|
|
|
} |
|
|
|
|
|
|
|
void MetaKV::TransDelete(std::string &MetaKey) { |
|
|
|
MetaKey.clear(); |
|
|
|
std::string &buf = MetaKey; |
|
|
|
PutFixed32(&buf, KV_Deleting); |
|
|
|
PutLengthPrefixedSlice(&buf, Slice(name)); |
|
|
|
} |
|
|
|
``` |
|
|
|
|
|
|
|
以及kv和index写完后的清理(构建一个都是delete的writebatch,向metadb中写入): |
|
|
|
```c++ |
|
|
|
class CleanerHandler : public WriteBatch::Handler { |
|
|
|
public: |
|
|
|
WriteBatch *NeedClean; |
|
|
|
void Put(const Slice& key, const Slice& value) override { |
|
|
|
//将所有之前put的meta数据进行delete |
|
|
|
NeedClean->Delete(key); |
|
|
|
} |
|
|
|
void Delete(const Slice& key) override { |
|
|
|
//所有的传入的MetaBatch都是Put的 |
|
|
|
assert(0); |
|
|
|
} |
|
|
|
}; |
|
|
|
|
|
|
|
void MetaCleaner::Collect(WriteBatch &MetaBatch) { |
|
|
|
if(MetaBatch.ApproximateSize() <= 12) return; |
|
|
|
CleanerHandler Handler; |
|
|
|
Handler.NeedClean = &NeedClean; |
|
|
|
MetaBatch.Iterate(&Handler); |
|
|
|
} |
|
|
|
|
|
|
|
void MetaCleaner::CleanMetaBatch(DB *metaDB) { |
|
|
|
if(NeedClean.ApproximateSize() <= 12) return; |
|
|
|
metaDB->Write(WriteOptions(), &NeedClean); |
|
|
|
} |
|
|
|
``` |
|
|
|
|
|
|
|
相应的,我们数据库的恢复也是建立在三个数据库的协作之上: |
|
|
|
在重新打开三个数据库,依靠各自的日志恢复各自的数据后,完成对索引相关内容的恢复: |
|
|
|
```c++ |
|
|
|
Status FieldDB::Recover() { |
|
|
|
//1. 遍历所有Index类型的meta,重建内存中的index_状态表 |
|
|
|
Iterator *Iter = indexDB_->NewIterator(ReadOptions()); |
|
|
|
std::string IndexKey; |
|
|
|
Iter->SeekToFirst(); |
|
|
|
while(Iter->Valid()) { |
|
|
|
IndexKey = Iter->key().ToString(); |
|
|
|
ParsedInternalIndexKey ParsedIndex; |
|
|
|
ParseInternalIndexKey(Slice(IndexKey),&ParsedIndex); |
|
|
|
index_[ParsedIndex.name_.ToString()] = {Exist,nullptr}; |
|
|
|
|
|
|
|
//构建下一个搜索的对象,在原来的fieldname的基础上加一个最大的ascii字符(不可见字符) |
|
|
|
std::string Seek; |
|
|
|
PutLengthPrefixedSlice(&Seek, ParsedIndex.name_); |
|
|
|
Seek.push_back(0xff); |
|
|
|
Iter->Seek(Slice(Seek)); |
|
|
|
} |
|
|
|
delete Iter; |
|
|
|
//2. 寻找所有KV类型的meta,再次提交一遍请求 |
|
|
|
Iter = metaDB_->NewIterator(ReadOptions()); |
|
|
|
Slice MetaValue; |
|
|
|
Iter->SeekToFirst(); |
|
|
|
while (Iter->Valid()) { |
|
|
|
MetaValue = Iter->key(); |
|
|
|
MetaType type = MetaType(DecodeFixed32(MetaValue.data())); |
|
|
|
MetaValue.remove_prefix(4);//移除头上的metaType的部分 |
|
|
|
Slice extractKey; |
|
|
|
GetLengthPrefixedSlice(&MetaValue, &extractKey); |
|
|
|
if(type == KV_Creating) { |
|
|
|
FieldArray fields; |
|
|
|
ParseValue(Iter->value().ToString(), &fields); |
|
|
|
PutFields(WriteOptions(), extractKey, fields); |
|
|
|
} else if(type == KV_Deleting) { |
|
|
|
Delete(WriteOptions(), extractKey); |
|
|
|
} else { |
|
|
|
assert(0 && "Invalid MetaType"); |
|
|
|
} |
|
|
|
Iter->Next(); |
|
|
|
} |
|
|
|
delete Iter; |
|
|
|
//在所有的请求完成后,会自动把metaDB的内容清空。 |
|
|
|
Iter = metaDB_->NewIterator(ReadOptions()); |
|
|
|
Iter->SeekToFirst(); |
|
|
|
delete Iter; |
|
|
|
return Status::OK(); |
|
|
|
} |
|
|
|
``` |
|
|
|
|
|
|
|
## 3. 测试 |
|
|
|
### 3.1 正确性测试 |
|
|
|