# 实验报告 仓库地址 https://gitea.shuishan.net.cn/10225501448/leveldb_proj2 # 1. 项目概述 leveldb中的存储原本只支持简单的字节序列,在这个项目中我们对其功能进行拓展,使其可以包含多个字段,并通过这些字段实现类似数据库列查询的功能。但如果仅通过字段查找数据,需要对整个数据库的遍历,不够高效,因此还要新增二级索引,提高对特定字段的查询效率。 # 2. 功能实现 ## 2.1 字段 设计目标:对value存储读取时进行序列化编码,使其支持字段。 实现思路:设计之初有考虑增加一些元数据(例如过滤器、字段偏移支持二分)来加速查询。但考虑到在数据库中kv的数量是十分庞大的,新加数据结构会带来巨大的空间开销。因此我们决定在这里牺牲时间换取空间,而将时间的加速放在索引中。 在这一基础上,我们对序列化进行了简单的优化:将字段名排序后,一一调用leveldb中原本的编码方法`PutLengthPrefixedSlice`存入value。这样不会有额外的空间开销,而好处在于遍历一个value的字段时,如果得到的字段名比目标大,就可以提前结束遍历。 ``` std::string SerializeValue(const FieldArray& fields){ std::sort(sortFields.begin(), sortFields.end(), compareByFirst); for (const Field& pairs : sortFields) { PutLengthPrefixedSlice(&result, pairs.first); PutLengthPrefixedSlice(&result, pairs.second); } return result; } ``` 最终db类提供了新接口`putFields`, `getFields`,分别对传入的字段序列化后调用原来的`put`, `get`接口。 `FindKeysByField`调用`NewIterator`遍历所有数据,field名和值符合则加入返回的key中。 **这一部分的具体代码在util/serialize_value.cc中** ## 2.2 二级索引 设计目标:对某个字段(属性)建立索引,提高对该字段的查询效率。 ### 2.2.1 总体架构 fielddb ### 2.2.2 如何并发创删索引与读写 1. **为什么要并发控制创删索引与读写** 对于不涉及索引操作的读写操作,leveldb已经完成了对于读读、写写和读写的并发控制。其中,读读和读写并发使用快照机制完成;读读的并发控制通过version和快照机制完成。 由于索引机制的加入,如果不进行并发控制的话,有可能会出现索引未创建完全、索引未删除完全等索引不一致的问题。由于创建索引的操作是一个先读后写的操作,有可能在迭代读取数据库的时候有一个并发的写操作且需要创建索引,由于leveldb的读是快照读,即使写入完成后,在当前快照下仍然无法读取到对应的数据,因此无法构建对应的索引。在这两个操作完成后,会出现kvDB中需要创建索引的数据未创建索引的问题。 2. **并发控制的设计目标** 对于并发控制,最简单的方法就是给整个数据库添加一把全局锁,每一个涉及修改的操作的全过程都需要持有这把全局锁。但是显然这种方法对于并发性能并不友好。主要原因如下: 首先,并发的写入操作有合并的可能。通过合并,可以将日志的小写入合并为一个大写入,减少日志写入的请求数量。其次,有些创删索引操作可以和写入操作并发执行。例如,对于Field`address`创建索引的时候,如果一个`(k,v)`的写入中,k的原始和当前值都不含`address`Field,那么两者就可以并发的执行。由于创删索引的耗时通常比较长,对于这种相互独立的创删索引和写入的并发是非常有必要的。 3. **并发控制的整体架构** FieldDB实现并发控制的思路很大程度上的借鉴了Leveldb的做法。FieldDB首先将写入操作、删除操作、WriteBatch的写入、创建索引和删除索引抽象为五种不同的request,分别是FieldsReq,DeleteReq,WriteBatchReq,iCreateReq,iDeleteReq。类似于Leveldb利用`writer`和对应的队列将所有的写入操作合并,FieldDB采用了`taskqueue`将上述的四种结构进行排队,在此基础上进行并发控制。使用队列作为基础进行并发控制的还有一个主要原因在于这和Leveldb的底层实现是对齐的。Leveldb的写入本质上是一个单线程的写入,现在把多线程的同步操作提升到FieldDB层完成,对于下面kvDB、indexDB、metaDB的数据访问都是单线程的。此外,在FieldDB层就完成并发控制对于可移植性可能会有好处。只要底层的数据库的put、delete操作的语义相同,我们就可以方便的进行替换,不用考虑具体的底层数据库的并发控制模式。 当然,这套方案的也会有一定的问题,我们将在后文进行讨论。 4. **实现细节** - **`taskqueue_`和`HandleRequest`的实现细节** 两者的配合完成了类似于Leveldb的并发控制。代码基础结构如下所示: ```c++ Status FieldDB::HandleRequest(Request &req, const WriteOptions &op) { MutexLock L(&mutex_); taskqueue_.push_back(&req); while(true){ while(req.isPending() || !req.done && &req != taskqueue_.front()) { req.cond_.Wait(); } if(req.done) { return req.s; //在返回时自动释放锁L } Request *tail = GetHandleInterval(); /*==============================请求具体处理开始=================================*/ WriteBatch KVBatch,IndexBatch,MetaBatch; SliceHashSet batchKeySet; Status status; if(!tail->isiCreateReq() && !tail->isiDeleteReq()) { //表明这一个区间并没有涉及index的创建删除 { //1. 构建各个Batch。构建的过程中要保证索引状态的一致性,需要上锁。 MutexLock iL(&index_mu); for(auto *req_ptr : taskqueue_) { req_ptr->ConstructBatch(KVBatch, IndexBatch, MetaBatch, this, batchKeySet); if(req_ptr == tail) break; } } //2. 首先写入meta,再并发写入index和kv,完成之后清除meta数据 //此处可以放锁是因为写入的有序性可以通过队列来保证 mutex_.Unlock(); if(MetaBatch.ApproximateSize() > 12) { status = metaDB_->Write(op, &MetaBatch); assert(status.ok()); } //TODO:index的写入需要在另外一个线程中同时完成 if(IndexBatch.ApproximateSize() > 12) { status = indexDB_->Write(op, &IndexBatch); } if(KVBatch.ApproximateSize() > 12) { status = kvDB_->Write(op, &KVBatch); } //3. 将meta数据清除 if(MetaBatch.ApproximateSize() > 12) { MetaCleaner cleaner; cleaner.Collect(MetaBatch); cleaner.CleanMetaBatch(metaDB_); } mutex_.Lock(); } else { //对于创建和删除索引的请求,通过prepare完成索引状态的更新 MutexLock iL(&index_mu); req.Prepare(this); } /*==============================请求具体处理结束=================================*/ while(true) { Request *ready = taskqueue_.front(); taskqueue_.pop_front(); //当前ready不是队首,不是和index的创建有关 if(!ready->isPending() && !req.isiCreateReq() && !req.isiDeleteReq()) { ready->s = status; ready->done = true; if (ready != &req) ready->cond_.Signal(); } if (ready == tail) break; } if(!taskqueue_.empty()) { taskqueue_.front()->cond_.Signal(); } //如果done==true,那么就不会继续等待直接退出 //如果处于某个请求的pending list里面,那么就会继续等待重新入队 } } ``` 在请求具体处理部分之外就是并发控制的实现,可以看到,整体的实现基本上和Leveldb类似,最大的不同是整个逻辑的最外层是一个死循环,是为了实现请求的pending。这样做的原因是因为有些操作会由于和别的操作产生冲突,如果不等待别的操作完成就会导致索引不一致。比方说一个含有索引的put操作和一个正在进行的该索引的创建操作并发发生,那么前者就要进入pending状态直到索引创建操作完成,否则因为Leveldb快照读的问题不会读取到当前的写入,导致索引创建丢失。此外还有对于同一个索引的创建和删除的并发等情况。请求pending的实现和部分调用场景示例如下所示: ```c++ //创建索引的request class iCreateReq : public Request { public: iCreateReq(Slice Field,port::Mutex *mu): Field(Field),Request(iCreateReq_t, mu),Existed(false) { }; void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, WriteBatch &MetaBatch,fielddb::FieldDB *DB,SliceHashSet &batchKeySet) override; void Prepare(FieldDB *DB) override; void Finalize(FieldDB *DB) override; void PendReq(Request *req) override; bool Existed; Slice Field; std::deque pending_list; }; void iCreateReq::PendReq(Request *req) { req->parent = this; pending_list.push_back(req); } /*部分写入请求的构建,实现含有索引的put和对于该索引创删操作的并发*/ //1.将存在冲突的put pend到对应的请求 for(auto &[field_name,field_value] : SliceFields) { if(field_name.data() == EMPTY) break; if(DB->index_.count(field_name.ToString())) { auto [index_status,parent_req] = DB->index_[field_name.ToString()]; if(index_status == IndexStatus::Creating || index_status == IndexStatus::Deleting) { parent_req->PendReq(this->parent); return; } else if(index_status == IndexStatus::Exist) { HasIndex = true; } } } ``` 另外一个主要的部分是`GetHandleInterval`函数。这个函数会在`taskqueue_`队列里面选择一段合适的请求进行处理。具体来说,就是所有创删索引的操作都需要单独进行,而且其他的操作都可以放在一起合并处理。创删索引需要单独处理是因为需要保证创删索引前的数据都要能够完成写入,这样在创删索引迭代读取数据库的时候就可以不漏每一个已有的写入。 对于请求具体处理部分。FieldDB将请求的执行步骤进行抽象化,由一个统一的`Request`基类进行驱动。对于创删索引操作,由于所需要的时间较长,所以分为上下两部进行。上半部在队列中完成`Prepare`操作,更新内存中的索引状态表。下半部退出`taskqueue_`,在请求的线程中依次完成访问全部的数据库并构建二级索引、二级索引写入indexDB、更新内存索引状态表并唤醒等待创删请求完成的写入。唤醒等待请求的操作本质上是清楚请求的等待状态并且重新加入`taskqueue_`。创删索引执行流程的下半部代码的主要代码如下(删除索引操作类似): ```c++ Status FieldDB::CreateIndexOnField(const std::string& field_name, const WriteOptions &op) { iCreateReq req(field_name,&mutex_); HandleRequest(req, op); //如果已经存在索引,那么直接返回 if(req.Existed) { return req.s; } WriteBatch KVBatch,IndexBatch,MetaBatch; SliceHashSet useless; req.ConstructBatch(KVBatch, IndexBatch, MetaBatch, this, useless); indexDB_->Write(op, &IndexBatch); req.Finalize(this); return req.s; } void iCreateReq::Finalize(FieldDB *DB) { //1. 写入完成后,更新index状态表 MutexLock iL(&DB->index_mu); DB->index_[Field.ToString()] = {IndexStatus::Exist, nullptr}; DB->index_mu.Unlock(); if (pending_list.empty()) return; //2. 将所有的pendinglist重新入队 MutexLock L(&DB->mutex_); for (auto req : pending_list){ DB->taskqueue_.push_back(req); req->parent = req; //解绑 } if (pending_list[0] == DB->taskqueue_.front()) { pending_list[0]->cond_.Signal(); } this->s = Status::OK(); } ``` 对于WriteBatch的写入,我们采用了一个比较tricky的方法。在BatchReq中构建子FieldsReq和DeleteReq请求,最大程度的复用已有的代码逻辑。 ```c++ BatchReq::BatchReq(WriteBatch *Batch,port::Mutex *mu): Batch(Batch),Request(BatchReq_t, mu) { struct BatchHandler : WriteBatch::Handler { void Put(const Slice &key, const Slice &value) override { sub_requests->emplace_back(new FieldsReq(key,value,mu)); sub_requests->back()->parent = req; } void Delete(const Slice &key) override { sub_requests->emplace_back(new DeleteReq(key,mu)); sub_requests->back()->parent = req; } BatchReq *req; port::Mutex *mu; std::deque *sub_requests; }; BatchHandler Handler; Handler.req = this; Handler.mu = mu; Handler.sub_requests = &sub_requests; Batch->Iterate(&Handler); } ``` ### 2.2.3 如何保证两个kv与index的一致性 metadb ## 3. 测试 ### 3.1 正确性测试 相关代码在`test/`下 #### 3.1.1 封装函数功能 相关代码在`helper.cc`中 在所有测试中,绝大部分key的值数字>0,同时也支持单独写入key<=0,方便测试观察。 value一共使用到了三个字段:name,address,age。name是`"customer#+"key`,address每次在给定的城市数组中抽取一个,age限定在0~100, 因此后两者会有大量重复,测试中的索引都是建立在这两个字段上。 由于测试中需要检查写入时与查询时,拥有相应的字段的kv是否对应,因此需要维护一个`set`。又因为测试涉及到并发,因此封装了一个线程安全的`ThreadSafeSet`类,对于set的操作有锁保护。具体测试中使用两个该类统计了两类key:address为shanghai的key和age为20的key。 ``` ThreadSafeSet shanghaiKeys; ThreadSafeSet age20Keys; ``` 下面大致介绍一下封装函数的功能: InsertOneField:只插一条特定数据的测试(让key<=0, 和批量写入区别开)。使用它是为了保证有些地方的写入必须正确,并通过追踪对应的key发现系统中潜在的问题。 DeleteOneField:只删一条特定数据的测试,功能与单条插入相似,主要用来测试delete。 GetOneField:只读一条特定数据的测试,与上面两条对应。 对于所有涉及随机性的函数,传参一个随机种子。只要随机种子一致,随机生成的内容就一致,相应的读需要和相应的写、删保持一致。此外测试的数据量也是可以修改的。 InsertFieldData:批量写入,按照上面提到的测试规则生成kv,并调用putfields。检查返回状态是否ok。同时对于生成的字段,如果是address为shanghai或age为20,统计入相应的set。 DeleteFieldData:批量删除,按照规则(>0)生成key,并调用delete。检查返回状态是否ok。 WriteFieldData:与InsertFieldData相似,但生成kv后不直接调用putfields,而是写入writebatch,全部生成完在调用write把batch写入数据库。用来测试write功能,检查返回状态是否ok。 GetFieldData:测试读,按照规则(>0)生成key,并调用getfield。由于并发时不一定能读到,提供一个allownotfound参数。如果true,返回状态可以是notfound。如果false,就只能是ok。对于读到的field,检测是否被正确解析,以及每个字段是否满足规则。 GetDeleteData:检查对应种子的所有生成数据有没有删除干净(getfield的所有返回状态都是notfound)。 findKeysByCity:调用`FindKeysByField({"address", "Shanghai"})`,检查返回的所有key是否都在shanghaiKeys中(insert时统计的)。 findKeysByCityIndex:提供一个参数haveIndex,表明数据库有没有该索引(address)。调用`querybyindex({"address", "Shanghai"})`,如果haveindex检测返回状态ok,并且所有的key都要在shanghaiKeys中。 findKeysByAgeIndex:与上条相同,检测age字段。 checkDataInKVAndIndex: 在并发写删与恢复测试中,不能保证每个种子序列的所有key都在数据库中。但需要保证的是,kv和index中的数据是一致的。这里先后调用QueryByIndex和FindKeysByField,比较得到的key,确保两者一致。 #### 3.1.2 基础测试 相关代码在`basic_function_test.cc`中 这一部分主要测试每个功能在正常使用中是否正确,按照逻辑简单调用封装的功能函数。 TestLab1流程: 批量写 -> 必须读到 -> findkeysbycity -> 批量删 -> 必须全读不到。 TestLab2流程:批量写 -> 创索引address,age -> 索引查询address,age -> 删索引address -> 索引查询address(haveindex=false) -> 索引查询age -> 批量删 -> 索引查询age(索引还在能查,但返回的key数量为0) -> write -> 必须读到 -> 索引查询age。 至此,上面的流程基本覆盖了我们数据库的每个基础功能。 #### 3.1.2 并发测试 相关代码在`parallel_test.cc`中 这一部分主要测试读、写、创删索引之间的并发。每个测试中并发线程的数量也是可以修改的。 TestReadPut:创索引 -> 并发:两线程写(不同随机种子)三线程读(不保证能读到)-> 读两次写相应的随机种子(必须读到)-> 索引查(返回的key在两次写入中) -> 检查两数据库一致性(checkDataInKVAndIndex)。 TestPutCreatei:先批量写入一次数据 -> 并发:一线程创索引,一线程忙等,至数据库开始创建索引后单条写入 -> 检测索引创建是否成功(创建索引前批量写的数据,能通过索引查得到)-> 读单条写入 -> 检测数据库一致性。 TestCreateiCreatei:先批量写入一次数据 -> 并发:三线程创索引address -> 索引查 -> 一致性 -> 并发:两线程删索引address,一线程创索引age -> 索引查(address无,age有)-> 一致性。 TestPutDeleteOne(有索引时,大量并发put与delete相同key,确保kvdb和indexdb的一致性): 创索引 -> 并发:对于100条数据,10线程插入,10线程删除 -> 一致性。 TestPutDelete:创索引 -> 并发:两线程写0、1种子数据,两线程删0、1种子数据 -> 一致性。 TestWrite(在之前基础上并发所有,并加入write):创索引address、批量写种子2 -> 并发:线程0创索引age,其他线程忙等至开始创建,线程1批量写种子0,线程2write种子1, 线程3删索引age -> 检测:种子012所有数据都应被读到,一致性, age索引被删除。 这里的测试也可以加入delete,或不删索引age检测age的一致性,具体见注释。 至此,上面的流程基本覆盖了我们数据库的每个基础功能之间的并发。 #### 3.1.2 恢复测试 相关代码在`recover_test.cc`中 这一部分主要测试正常与异常的恢复。 TestNormalRecover:创索引、批量写、此时之前测试都检测过能被读到 -> delete db -> 重新open -> 读数据、索引查(之前写入的数据仍能被读到)。 TestParalRecover**该测试比较特别,需要运行两次**:创索引 -> 并发:线程0批量写,线程1write,线程2delete,线程3 在单条插入后,deletedb。线程3导致了其他线程错误,测试会终止(模拟数据库崩溃),这会导致各线程在各种奇怪的时间点崩溃。此时注释掉上半部分代码,运行下半部分:单条写入能被读到,并检测一致性。 这里我们运行了几十次,前半部分的崩溃报错有多种,但后半部分的运行都是成功的。同时也追踪了恢复的运行过程,确实有数据从metadb中被正确解析。 ### 3.2 性能测试 测试、分析、优化 ## 4. 问题与解决 ## 5. 潜在优化点 1. 使用一些高性能并发设计模式,如reactor来优化多线程写入时由于锁竞争导致的性能问题 2. 采用一些高性能的库,如dpdk等 ## 6. 分工