仓库地址 https://gitea.shuishan.net.cn/10225501448/leveldb_proj2
leveldb中的存储原本只支持简单的字节序列,在这个项目中我们对其功能进行拓展,使其可以包含多个字段,并通过这些字段实现类似数据库列查询的功能。但如果仅通过字段查找数据,需要对整个数据库的遍历,不够高效,因此还要新增二级索引,提高对特定字段的查询效率。
设计目标:对value存储读取时进行序列化编码,使其支持字段。
实现思路:设计之初有考虑增加一些元数据(例如过滤器、字段偏移支持二分)来加速查询。但考虑到在数据库中kv的数量是十分庞大的,新加数据结构会带来巨大的空间开销。因此我们决定在这里牺牲时间换取空间,而将时间的加速放在索引中。
在这一基础上,我们对序列化进行了简单的优化:将字段名排序后,一一调用leveldb中原本的编码方法PutLengthPrefixedSlice
存入value。这样不会有额外的空间开销,而好处在于遍历一个value的字段时,如果得到的字段名比目标大,就可以提前结束遍历。
std::string SerializeValue(const FieldArray& fields){
std::sort(sortFields.begin(), sortFields.end(), compareByFirst);
for (const Field& pairs : sortFields) {
PutLengthPrefixedSlice(&result, pairs.first);
PutLengthPrefixedSlice(&result, pairs.second);
}
return result;
}
最终db类提供了新接口putFields
, getFields
,分别对传入的字段序列化后调用原来的put
, get
接口。
FindKeysByField
调用NewIterator
遍历所有数据,field名和值符合则加入返回的key中。
这一部分的具体代码在util/serialize_value.cc中
设计目标:对某个字段(属性)建立索引,提高对该字段的查询效率。
fielddb
对于不涉及索引操作的读写操作,leveldb已经完成了对于读读、写写和读写的并发控制。其中,读读和读写并发使用快照机制完成;读读的并发控制通过version和快照机制完成。
由于索引机制的加入,如果不进行并发控制的话,有可能会出现索引未创建完全、索引未删除完全等索引不一致的问题。由于创建索引的操作是一个先读后写的操作,有可能在迭代读取数据库的时候有一个并发的写操作且需要创建索引,由于leveldb的读是快照读,即使写入完成后,在当前快照下仍然无法读取到对应的数据,因此无法构建对应的索引。在这两个操作完成后,会出现kvDB中需要创建索引的数据未创建索引的问题。
对于并发控制,最简单的方法就是给整个数据库添加一把全局锁,每一个涉及修改的操作的全过程都需要持有这把全局锁。但是显然这种方法对于并发性能并不友好。主要原因如下:
首先,并发的写入操作有合并的可能。通过合并,可以将日志的小写入合并为一个大写入,减少日志写入的请求数量。其次,有些创删索引操作可以和写入操作并发执行。例如,对于Fieldaddress
创建索引的时候,如果一个(k,v)
的写入中,k的原始和当前值都不含address
Field,那么两者就可以并发的执行。由于创删索引的耗时通常比较长,对于这种相互独立的创删索引和写入的并发是非常有必要的。
FieldDB实现并发控制的思路很大程度上的借鉴了Leveldb的做法。FieldDB首先将写入操作、删除操作、WriteBatch的写入、创建索引和删除索引抽象为五种不同的request,分别是FieldsReq,DeleteReq,WriteBatchReq,iCreateReq,iDeleteReq。类似于Leveldb利用writer
和对应的队列将所有的写入操作合并,FieldDB采用了taskqueue
将上述的四种结构进行排队,在此基础上进行并发控制。使用队列作为基础进行并发控制的还有一个主要原因在于这和Leveldb的底层实现是对齐的。Leveldb的写入本质上是一个单线程的写入,现在把多线程的同步操作提升到FieldDB层完成,对于下面kvDB、indexDB、metaDB的数据访问都是单线程的。此外,在FieldDB层就完成并发控制对于可移植性可能会有好处。只要底层的数据库的put、delete操作的语义相同,我们就可以方便的进行替换,不用考虑具体的底层数据库的并发控制模式。
当然,这套方案的也会有一定的问题,我们将在后文进行讨论。
taskqueue_
和HandleRequest
的实现细节两者的配合完成了类似于Leveldb的并发控制。代码基础结构如下所示:
Status FieldDB::HandleRequest(Request &req, const WriteOptions &op) {
MutexLock L(&mutex_);
taskqueue_.push_back(&req);
while(true){
while(req.isPending() || !req.done && &req != taskqueue_.front()) {
req.cond_.Wait();
}
if(req.done) {
return req.s; //在返回时自动释放锁L
}
Request *tail = GetHandleInterval();
/*==============================请求具体处理开始=================================*/
WriteBatch KVBatch,IndexBatch,MetaBatch;
SliceHashSet batchKeySet;
Status status;
if(!tail->isiCreateReq() && !tail->isiDeleteReq()) {
//表明这一个区间并没有涉及index的创建删除
{
//1. 构建各个Batch。构建的过程中要保证索引状态的一致性,需要上锁。
MutexLock iL(&index_mu);
for(auto *req_ptr : taskqueue_) {
req_ptr->ConstructBatch(KVBatch, IndexBatch, MetaBatch, this, batchKeySet);
if(req_ptr == tail) break;
}
}
//2. 首先写入meta,再并发写入index和kv,完成之后清除meta数据
//此处可以放锁是因为写入的有序性可以通过队列来保证
mutex_.Unlock();
if(MetaBatch.ApproximateSize() > 12) {
status = metaDB_->Write(op, &MetaBatch);
assert(status.ok());
}
//TODO:index的写入需要在另外一个线程中同时完成
if(IndexBatch.ApproximateSize() > 12) {
status = indexDB_->Write(op, &IndexBatch);
}
if(KVBatch.ApproximateSize() > 12) {
status = kvDB_->Write(op, &KVBatch);
}
//3. 将meta数据清除
if(MetaBatch.ApproximateSize() > 12) {
MetaCleaner cleaner;
cleaner.Collect(MetaBatch);
cleaner.CleanMetaBatch(metaDB_);
}
mutex_.Lock();
} else {
//对于创建和删除索引的请求,通过prepare完成索引状态的更新
MutexLock iL(&index_mu);
req.Prepare(this);
}
/*==============================请求具体处理结束=================================*/
while(true) {
Request *ready = taskqueue_.front();
taskqueue_.pop_front();
//当前ready不是队首,不是和index的创建有关
if(!ready->isPending() && !req.isiCreateReq() && !req.isiDeleteReq()) {
ready->s = status;
ready->done = true;
if (ready != &req) ready->cond_.Signal();
}
if (ready == tail) break;
}
if(!taskqueue_.empty()) {
taskqueue_.front()->cond_.Signal();
}
//如果done==true,那么就不会继续等待直接退出
//如果处于某个请求的pending list里面,那么就会继续等待重新入队
}
}
在请求具体处理部分之外就是并发控制的实现,可以看到,整体的实现基本上和Leveldb类似,最大的不同是整个逻辑的最外层是一个死循环,是为了实现请求的pending。这样做的原因是因为有些操作会由于和别的操作产生冲突,如果不等待别的操作完成就会导致索引不一致。比方说一个含有索引的put操作和一个正在进行的该索引的创建操作并发发生,那么前者就要进入pending状态直到索引创建操作完成,否则因为Leveldb快照读的问题不会读取到当前的写入,导致索引创建丢失。此外还有对于同一个索引的创建和删除的并发等情况。请求pending的实现和部分调用场景示例如下所示:
//创建索引的request
class iCreateReq : public Request {
public:
iCreateReq(Slice Field,port::Mutex *mu):
Field(Field),Request(iCreateReq_t, mu),Existed(false) { };
void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB,SliceHashSet &batchKeySet) override;
void Prepare(FieldDB *DB) override;
void Finalize(FieldDB *DB) override;
void PendReq(Request *req) override;
bool Existed;
Slice Field;
std::deque<Request *> pending_list;
};
void iCreateReq::PendReq(Request *req) {
req->parent = this;
pending_list.push_back(req);
}
/*部分写入请求的构建,实现含有索引的put和对于该索引创删操作的并发*/
//1.将存在冲突的put pend到对应的请求
for(auto &[field_name,field_value] : SliceFields) {
if(field_name.data() == EMPTY) break;
if(DB->index_.count(field_name.ToString())) {
auto [index_status,parent_req] = DB->index_[field_name.ToString()];
if(index_status == IndexStatus::Creating || index_status == IndexStatus::Deleting) {
parent_req->PendReq(this->parent);
return;
} else if(index_status == IndexStatus::Exist) {
HasIndex = true;
}
}
}
另外一个主要的部分是GetHandleInterval
函数。这个函数会在taskqueue_
队列里面选择一段合适的请求进行处理。具体来说,就是所有创删索引的操作都需要单独进行,而且其他的操作都可以放在一起合并处理。创删索引需要单独处理是因为需要保证创删索引前的数据都要能够完成写入,这样在创删索引迭代读取数据库的时候就可以不漏每一个已有的写入。
对于请求具体处理部分。FieldDB将请求的执行步骤进行抽象化,由一个统一的Request
基类进行驱动。对于创删索引操作,由于所需要的时间较长,所以分为上下两部进行。上半部在队列中完成Prepare
操作,更新内存中的索引状态表。下半部退出taskqueue_
,在请求的线程中依次完成访问全部的数据库并构建二级索引、二级索引写入indexDB、更新内存索引状态表并唤醒等待创删请求完成的写入。唤醒等待请求的操作本质上是清楚请求的等待状态并且重新加入taskqueue_
。创删索引执行流程的下半部代码的主要代码如下(删除索引操作类似):
Status FieldDB::CreateIndexOnField(const std::string& field_name, const WriteOptions &op) {
iCreateReq req(field_name,&mutex_);
HandleRequest(req, op);
//如果已经存在索引,那么直接返回
if(req.Existed) {
return req.s;
}
WriteBatch KVBatch,IndexBatch,MetaBatch;
SliceHashSet useless;
req.ConstructBatch(KVBatch, IndexBatch, MetaBatch, this, useless);
indexDB_->Write(op, &IndexBatch);
req.Finalize(this);
return req.s;
}
void iCreateReq::Finalize(FieldDB *DB) {
//1. 写入完成后,更新index状态表
MutexLock iL(&DB->index_mu);
DB->index_[Field.ToString()] = {IndexStatus::Exist, nullptr};
DB->index_mu.Unlock();
if (pending_list.empty()) return;
//2. 将所有的pendinglist重新入队
MutexLock L(&DB->mutex_);
for (auto req : pending_list){
DB->taskqueue_.push_back(req);
req->parent = req; //解绑
}
if (pending_list[0] == DB->taskqueue_.front()) {
pending_list[0]->cond_.Signal();
}
this->s = Status::OK();
}
对于WriteBatch的写入,我们采用了一个比较tricky的方法。在BatchReq中构建子FieldsReq和DeleteReq请求,最大程度的复用已有的代码逻辑。
BatchReq::BatchReq(WriteBatch *Batch,port::Mutex *mu):
Batch(Batch),Request(BatchReq_t, mu) {
struct BatchHandler : WriteBatch::Handler {
void Put(const Slice &key, const Slice &value) override {
sub_requests->emplace_back(new FieldsReq(key,value,mu));
sub_requests->back()->parent = req;
}
void Delete(const Slice &key) override {
sub_requests->emplace_back(new DeleteReq(key,mu));
sub_requests->back()->parent = req;
}
BatchReq *req;
port::Mutex *mu;
std::deque<Request*> *sub_requests;
};
BatchHandler Handler;
Handler.req = this;
Handler.mu = mu;
Handler.sub_requests = &sub_requests;
Batch->Iterate(&Handler);
}
metadb
测试、分析、优化