From e39d6b86de8d6c1ddee8a2b7f3268c1e9df44915 Mon Sep 17 00:00:00 2001 From: augurier <14434658+augurier@user.noreply.gitee.com> Date: Wed, 1 Jan 2025 13:34:13 +0800 Subject: [PATCH] =?UTF-8?q?=E9=83=A8=E5=88=86=E5=AE=9E=E9=AA=8C=E6=8A=A5?= =?UTF-8?q?=E5=91=8A=EF=BC=8C=E6=95=B4=E7=90=86=E6=B3=A8=E9=87=8A=EF=BC=8C?= =?UTF-8?q?putfield=E5=B0=8F=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 335 ++++++++++++++++++++++++++++++++++++++++++++++++++- fielddb/field_db.cpp | 6 +- fielddb/request.cpp | 15 +-- fielddb/request.h | 2 - 4 files changed, 337 insertions(+), 21 deletions(-) diff --git a/README.md b/README.md index ea0e2ee..a2a0b7f 100644 --- a/README.md +++ b/README.md @@ -6,11 +6,12 @@ leveldb中的存储原本只支持简单的字节序列,在这个项目中我 # 2. 功能实现 ## 2.1 字段 -设计目标:对value存储读取时进行序列化编码,使其支持字段。 +设计目标:对value存储读取时进行序列化编码,使其支持字段。 +这一部分的具体代码在util/serialize_value.cc中 实现思路:设计之初有考虑增加一些元数据(例如过滤器、字段偏移支持二分)来加速查询。但考虑到在数据库中kv的数量是十分庞大的,新加数据结构会带来巨大的空间开销。因此我们决定在这里牺牲时间换取空间,而将时间的加速放在索引中。 在这一基础上,我们对序列化进行了简单的优化:将字段名排序后,一一调用leveldb中原本的编码方法`PutLengthPrefixedSlice`存入value。这样不会有额外的空间开销,而好处在于遍历一个value的字段时,如果得到的字段名比目标大,就可以提前结束遍历。 -``` +```c++ std::string SerializeValue(const FieldArray& fields){ std::sort(sortFields.begin(), sortFields.end(), compareByFirst); for (const Field& pairs : sortFields) { @@ -22,13 +23,237 @@ std::string SerializeValue(const FieldArray& fields){ ``` 最终db类提供了新接口`putFields`, `getFields`,分别对传入的字段序列化后调用原来的`put`, `get`接口。 `FindKeysByField`调用`NewIterator`遍历所有数据,field名和值符合则加入返回的key中。 -**这一部分的具体代码在util/serialize_value.cc中** + ## 2.2 二级索引 设计目标:对某个字段(属性)建立索引,提高对该字段的查询效率。 +这一部分的具体代码在field/下 ### 2.2.1 总体架构 -fielddb +1. **二级索引的设计** + +二级索引的难点主要包括以下几点:索引数据与kv数据的存储需要进行隔离,不同操作之间存在同步与异步问题,每一次的写入操作都需要额外考虑数据库原本的索引情况,任何操作还需要考虑两种数据间的一致性。为了使设计简洁化,避免不同模块耦合带来潜在的问题,我们的设计如下: + +总体上,我们对两种数据分别创建一个db类的对象kvDb, indexDb。对外的接口类FieldDb包含了这两个对象,提供原先的leveldb各种接口,以及新功能,并在这一层完成两个对象的管理。这两个子数据库共同协作,完成了二级索引的各核心操作。在此基础上,为了保证数据库崩溃时两个子数据库的一致性,我们设计了第三个子数据库metadb,它的作用类似于日志。 +```c++ +class FieldDB : DB { +public: + FieldDB() : indexDB_(nullptr), kvDB_(nullptr), metaDB_(nullptr) {}; + ~FieldDB(); +/*lab1的要求,以及作为db派生类要实现的虚函数*/ + Status Put(const WriteOptions &options, const Slice &key, const Slice &value) override; + Status PutFields(const WriteOptions &, const Slice &key, const FieldArray &fields) override; + Status Delete(const WriteOptions &options, const Slice &key) override; + Status Write(const WriteOptions &options, WriteBatch *updates) override; + Status Get(const ReadOptions &options, const Slice &key, std::string *value) override; + Status GetFields(const ReadOptions &options, const Slice &key, FieldArray *fields) override; + std::vector FindKeysByField(Field &field) override; + Iterator * NewIterator(const ReadOptions &options) override; + const Snapshot * GetSnapshot() override; + void ReleaseSnapshot(const Snapshot *snapshot) override; + bool GetProperty(const Slice &property, std::string *value) override; + void GetApproximateSizes(const Range *range, int n, uint64_t *sizes) override; + void CompactRange(const Slice *begin, const Slice *end) override; +/*与索引相关*/ + Status CreateIndexOnField(const std::string& field_name, const WriteOptions &op); + Status DeleteIndex(const std::string &field_name, const WriteOptions &op); + std::vector QueryByIndex(const Field &field, Status *s); + IndexStatus GetIndexStatus(const std::string &fieldName); + + static Status OpenFieldDB(Options& options,const std::string& name,FieldDB** dbptr); + +private: + //根据metaDB的内容进行恢复 + Status Recover(); + +private: + leveldb::DB *kvDB_; + leveldb::DB *metaDB_; + leveldb::DB *indexDB_; + + std::string dbname_; + const Options *options_; + Env *env_; +} +``` +这样的设计带来了如下的好处: +kv和index实现了完全的分离,并且由于各自都使用了leveldb构建的lsmtree,完全保证了内部实现的正确性。相应的,我们的工作基本只处在fielddb层,减少了模块的耦合,对于我们自己实现的正确性也有极大的提升。 + +所有leveldb原本的功能仍然能够支持,并且有些实现起来十分简单,比如: +```c++ +Iterator * FieldDB::NewIterator(const ReadOptions &options) { + return kvDB_->NewIterator(options); +} + +Status FieldDB::GetFields(const ReadOptions &options, const Slice &key, FieldArray *fields) { + return kvDB_->GetFields(options, key, fields); +} +``` + +此外,性能开销增加也只在fielddb层,使我们能够进行比较和优化。 + +2. **index的编码** +index编码仍然采用了leveldb提供`PutLengthPrefixedSlice`,保留信息的同时,提高空间利用率。 +对于一个`key : {name : val}`的字段,索引采用如下编码: +```c++ +inline void AppendIndexKey(std::string* result, const ParsedInternalIndexKey& key){ + PutLengthPrefixedSlice(result, key.name_); + PutLengthPrefixedSlice(result, key.val_); + PutLengthPrefixedSlice(result, key.user_key_); +``` +这一部分也被模块化的封装在field/encode_index.h中。 + +由此产生了索引读的方法:根据name和val构建一个新的iterator,迭代获取范围内的所有key: +```c++ +std::vector FieldDB::QueryByIndex(const Field &field, Status *s) { + if (index_.count(field.first) == 0 || index_[field.first].first != Exist){ + *s = Status::NotFound(Slice()); + return std::vector(); + } + std::string indexKey; + AppendIndexKey(&indexKey, + ParsedInternalIndexKey(Slice(), field.first, field.second)); + Iterator *indexIterator = indexDB_->NewIterator(ReadOptions()); + indexIterator->Seek(indexKey); + + std::vector result; + for (; indexIterator->Valid(); indexIterator->Next()) { + ParsedInternalIndexKey iterKey; + if (ParseInternalIndexKey(indexIterator->key(), &iterKey)){ + if (iterKey.name_ == field.first && iterKey.val_ == field.second){ + result.push_back(iterKey.user_key_.ToString()); + continue; //查到说明在范围里,否则break + } + } + break; + } + delete indexIterator; + *s = Status::OK(); + return result; +} +``` + +3. **新的写流程** +索引功能的出现,使得写的逻辑需要重新设计。因为每一次写,不仅需要关注本次写入的字段,是不是需要同时写入索引,还需要关注本次写入的key,是不是覆盖了数据库原本的key,导致需要修改原本key的索引情况。这也意味着,即使是put简单的kv(不带字段),实际上还是需要修改put逻辑。方便起见,我们为原本的put中的value加入一个""的字段名,也视为putfield(这只是为了使我们的数据库支持原本的所有功能,也并不是本项目的重点,完全可以索性删除put功能,让我们的数据库只支持字段value)。 + +下面是putfield的实现思路: +```c++ +void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, + WriteBatch &MetaBatch,fielddb::FieldDB *DB, + SliceHashSet &batchKeySet) +{ + if (batchKeySet.find(Key) != batchKeySet.end()){ + return;//并发的被合并的put/delete请求只处理一次 + } else { + batchKeySet.insert(Key); + } + std::string val_str; + s = DB->kvDB_->Get(ReadOptions(), Key, &val_str); + FieldSliceArray oldFields; + if (s.IsNotFound()){ + // oldFields = nullptr; + } else if (s.ok()) { //得到数据库之前key的fields, 判断需不需要删除其中潜在的索引 + Slice nameSlice, valSlice; + Slice Value(val_str); + while(GetLengthPrefixedSlice(&Value, &nameSlice)) { + if(GetLengthPrefixedSlice(&Value, &valSlice)) { + oldFields.push_back({nameSlice,valSlice}); + } else { + std::cout << "name and val not match! From FieldsReq Init" << std::endl; + assert(0); + } + nameSlice.clear(), valSlice.clear(); + } + } else { + assert(0); + } + + bool HasIndex = false; + bool HasOldIndex = false; + { + DB->index_mu.AssertHeld(); + //1.将存在冲突的put pend到对应的请求 + for(auto &[field_name,field_value] : SliceFields) { + if(field_name.data() == EMPTY) break; + if(DB->index_.count(field_name.ToString())) { + auto [index_status,parent_req] = DB->index_[field_name.ToString()]; + if(index_status == IndexStatus::Creating || index_status == IndexStatus::Deleting) { + parent_req->PendReq(this->parent); + return; + } else if(index_status == IndexStatus::Exist) { + HasIndex = true; + } + } + } + //冲突也可能存在于,需要删除旧数据的索引,但该索引正在创删中 + if (!oldFields.empty()){ + for(auto &[field_name,field_value] : oldFields) { + if(field_name.data() == EMPTY) break; + if(DB->index_.count(field_name.ToString())) { + auto [index_status,parent_req] = DB->index_[field_name.ToString()]; + if(index_status == IndexStatus::Creating || index_status == IndexStatus::Deleting) { + parent_req->PendReq(this->parent); + return; + } else if(index_status == IndexStatus::Exist) { + HasOldIndex = true; + } + } + } + } + std::string scrach = SerializeValue(SliceFields); + KVBatch.Put(Slice(Key), Slice(scrach)); + //2.对于没有冲突但含有索引操作的put,构建metaKV + if(HasIndex || HasOldIndex) { + std::string MetaKey,MetaValue; + std::string serialized = SerializeValue(SliceFields); + MetaKV MKV = MetaKV(Key,serialized); + MKV.TransPut(MetaKey, MetaValue); + MetaBatch.Put(MetaKey, serialized); + + + //3.1对于含有索引的oldfield删除索引 + if (HasOldIndex) { + for(auto &[field_name,field_value] : oldFields) { + if(field_name.data() == EMPTY) continue; + if(DB->index_.count(field_name.ToString()) && //旧数据有,新数据没有的字段,删索引 + std::find(SliceFields.begin(), SliceFields.end(), + std::make_pair(field_name, field_value)) == SliceFields.end()) { + std::string indexKey; + AppendIndexKey(&indexKey, ParsedInternalIndexKey( + Key,field_name,field_value)); + IndexBatch.Delete(indexKey); + } + } + } + + //3.2对于含有索引的field建立索引 + if (HasIndex) { + for(auto &[field_name,field_value] : SliceFields) { + if(field_name.data() == EMPTY) continue; + if(DB->index_.count(field_name.ToString())) { + std::string indexKey; + AppendIndexKey(&indexKey, ParsedInternalIndexKey( + Key,field_name,field_value)); + IndexBatch.Put(indexKey, Slice()); + } + } + } + + } + } +} +``` + +同理,delete也需要先读最新的数据,再进行相应的处理,这里简单贴上实现逻辑: +``` + //1. 读取当前的最新的键值对,判断是否存在含有键值对的field + //2.1 如果无,则正常构造delete + //2.2 如果是有的field的索引状态都是exist,则在meta中写KV_Deleting类型的记录 + //在kvDB和indexDB中写入对应的delete + //2.3 如果存在field的索引状态是Creating或者Deleting,那么在那个队列上面进行等待 +``` + +上面的代码也展现了并发与恢复的部分,接下来会一一阐述。 ### 2.2.2 如何并发创删索引与读写 1. **为什么要并发控制创删索引与读写** @@ -238,8 +463,106 @@ BatchReq::BatchReq(WriteBatch *Batch,port::Mutex *mu): } ``` -### 2.2.3 如何保证两个kv与index的一致性 -metadb +### 2.2.3 如何保证kv与index的一致性 +metadb为异常恢复服务,只涉及到putfield和delete部分。(这里最初的设计有些问题,当时认为异常恢复也需要考虑创删索引部分,但实际上创删索引的本质,是一次往indexdb的writebatch,只会有索引整体写入成功和不成功两种情况,并不会出现不一致问题。) + +因此metadb的编码,只要在原本kv编码的基础上,加一个标志位,标识本条是来自putfield还是delete。 +metadb提供的功能被封装在fielddb/meta.cc中,包括编码: +```c++ +void MetaKV::TransPut(std::string &MetaKey,std::string &MetaValue) { + MetaKey.clear(); + MetaValue.clear(); + std::string &buf = MetaKey; + PutFixed32(&buf, KV_Creating); + PutLengthPrefixedSlice(&buf, Slice(name)); +} + +void MetaKV::TransDelete(std::string &MetaKey) { + MetaKey.clear(); + std::string &buf = MetaKey; + PutFixed32(&buf, KV_Deleting); + PutLengthPrefixedSlice(&buf, Slice(name)); +} +``` + +以及kv和index写完后的清理(构建一个都是delete的writebatch,向metadb中写入): +```c++ +class CleanerHandler : public WriteBatch::Handler { +public: + WriteBatch *NeedClean; + void Put(const Slice& key, const Slice& value) override { + //将所有之前put的meta数据进行delete + NeedClean->Delete(key); + } + void Delete(const Slice& key) override { + //所有的传入的MetaBatch都是Put的 + assert(0); + } +}; + +void MetaCleaner::Collect(WriteBatch &MetaBatch) { + if(MetaBatch.ApproximateSize() <= 12) return; + CleanerHandler Handler; + Handler.NeedClean = &NeedClean; + MetaBatch.Iterate(&Handler); +} + +void MetaCleaner::CleanMetaBatch(DB *metaDB) { + if(NeedClean.ApproximateSize() <= 12) return; + metaDB->Write(WriteOptions(), &NeedClean); +} +``` + +相应的,我们数据库的恢复也是建立在三个数据库的协作之上: +在重新打开三个数据库,依靠各自的日志恢复各自的数据后,完成对索引相关内容的恢复: +```c++ +Status FieldDB::Recover() { + //1. 遍历所有Index类型的meta,重建内存中的index_状态表 + Iterator *Iter = indexDB_->NewIterator(ReadOptions()); + std::string IndexKey; + Iter->SeekToFirst(); + while(Iter->Valid()) { + IndexKey = Iter->key().ToString(); + ParsedInternalIndexKey ParsedIndex; + ParseInternalIndexKey(Slice(IndexKey),&ParsedIndex); + index_[ParsedIndex.name_.ToString()] = {Exist,nullptr}; + + //构建下一个搜索的对象,在原来的fieldname的基础上加一个最大的ascii字符(不可见字符) + std::string Seek; + PutLengthPrefixedSlice(&Seek, ParsedIndex.name_); + Seek.push_back(0xff); + Iter->Seek(Slice(Seek)); + } + delete Iter; + //2. 寻找所有KV类型的meta,再次提交一遍请求 + Iter = metaDB_->NewIterator(ReadOptions()); + Slice MetaValue; + Iter->SeekToFirst(); + while (Iter->Valid()) { + MetaValue = Iter->key(); + MetaType type = MetaType(DecodeFixed32(MetaValue.data())); + MetaValue.remove_prefix(4);//移除头上的metaType的部分 + Slice extractKey; + GetLengthPrefixedSlice(&MetaValue, &extractKey); + if(type == KV_Creating) { + FieldArray fields; + ParseValue(Iter->value().ToString(), &fields); + PutFields(WriteOptions(), extractKey, fields); + } else if(type == KV_Deleting) { + Delete(WriteOptions(), extractKey); + } else { + assert(0 && "Invalid MetaType"); + } + Iter->Next(); + } + delete Iter; + //在所有的请求完成后,会自动把metaDB的内容清空。 + Iter = metaDB_->NewIterator(ReadOptions()); + Iter->SeekToFirst(); + delete Iter; + return Status::OK(); +} +``` ## 3. 测试 ### 3.1 正确性测试 diff --git a/fielddb/field_db.cpp b/fielddb/field_db.cpp index 1e8cafe..f836556 100644 --- a/fielddb/field_db.cpp +++ b/fielddb/field_db.cpp @@ -23,10 +23,9 @@ namespace fielddb { using namespace leveldb; -//TODO:打开fieldDB +//打开fieldDB Status FieldDB::OpenFieldDB(Options& options, const std::string& name, FieldDB** dbptr) { - // options.env->CreateDir("./abc") if(*dbptr == nullptr){ return Status::NotSupported(name, "new a fieldDb first\n"); } @@ -375,11 +374,10 @@ Iterator * FieldDB::NewIterator(const ReadOptions &options) { return kvDB_->NewIterator(options); } -// TODO:使用统一seq进行snapshot管理 const Snapshot * FieldDB::GetSnapshot() { return kvDB_->GetSnapshot(); } -// TODO:同上 + void FieldDB::ReleaseSnapshot(const Snapshot *snapshot) { kvDB_->ReleaseSnapshot(snapshot); } diff --git a/fielddb/request.cpp b/fielddb/request.cpp index a614f7b..ea4987e 100644 --- a/fielddb/request.cpp +++ b/fielddb/request.cpp @@ -59,13 +59,10 @@ void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, //uint64_t start_ = DB->env_->NowMicros(); s = DB->kvDB_->Get(ReadOptions(), Key, &val_str); //DB->construct_FieldsReq_Read_elapsed += DB->env_->NowMicros() - start_; - // FieldArray *oldFields; FieldSliceArray oldFields; if (s.IsNotFound()){ // oldFields = nullptr; - } else if (s.ok()) { //得到数据库之前key的fields, 判断需不需要删除其中潜在的索引 - // oldFields = new FieldArray; - // oldFields = ParseValue(val_str,oldFields); + } else if (s.ok()) { //得到数据库之前key的fields, 判断需不需要删除其中潜在的索引 Slice nameSlice, valSlice; Slice Value(val_str); while(GetLengthPrefixedSlice(&Value, &nameSlice)) { @@ -73,6 +70,7 @@ void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, oldFields.push_back({nameSlice,valSlice}); } else { std::cout << "name and val not match! From FieldsReq Init" << std::endl; + assert(0); } nameSlice.clear(), valSlice.clear(); } @@ -130,7 +128,9 @@ void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, if (HasOldIndex) { for(auto &[field_name,field_value] : oldFields) { if(field_name.data() == EMPTY) continue; - if(DB->index_.count(field_name.ToString())) { + if(DB->index_.count(field_name.ToString()) && //旧数据有,新数据没有的字段,删索引 + std::find(SliceFields.begin(), SliceFields.end(), + std::make_pair(field_name, field_value)) == SliceFields.end()) { std::string indexKey; AppendIndexKey(&indexKey, ParsedInternalIndexKey( Key,field_name,field_value)); @@ -153,10 +153,7 @@ void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, } } - //优化:对于3.1,3.2中都有的索引只写一次 } - - // if(oldFields) delete oldFields; } @@ -173,7 +170,7 @@ void DeleteReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, //1. 读取当前的最新的键值对,判断是否存在含有键值对的field //2.1 如果无,则正常构造delete //2.2 如果是有的field的索引状态都是exist,则在meta中写KV_Deleting类型的记录 - //在kvDB和metaDB中写入对应的delete + //在kvDB和indexDB中写入对应的delete //2.3 如果存在field的索引状态是Creating或者Deleting,那么在那个队列上面进行等待 std::string val_str; Status s = DB->kvDB_->Get(ReadOptions(), Key, &val_str); diff --git a/fielddb/request.h b/fielddb/request.h index b4847fe..8bfc23a 100644 --- a/fielddb/request.h +++ b/fielddb/request.h @@ -104,8 +104,6 @@ public: // std::string *Value; // }; -//TODO:下面的Field什么的可能通过传引用的方式会更加好? - //创建索引的request class iCreateReq : public Request { public: