diff --git a/README.md b/README.md index 3bc814d..16463ee 100644 --- a/README.md +++ b/README.md @@ -31,7 +31,212 @@ std::string SerializeValue(const FieldArray& fields){ fielddb ### 2.2.2 如何并发创删索引与读写 -request +1. **为什么要并发控制创删索引与读写** + +对于不涉及索引操作的读写操作,leveldb已经完成了对于读读、写写和读写的并发控制。其中,写写的并发控制通过writer队列完成;读读和读写的并发控制通过version和快照机制完成。 + +由于索引机制的加入,如果不进行并发控制的话,有可能会出现索引未创建完全、索引未删除完全等索引不一致的问题。由于创建索引的操作是一个先读后写的操作,有可能在迭代读取数据库的时候有一个并发的写操作且需要创建索引,由于leveldb的读是快照读,即使写入完成后,在当前快照下仍然无法读取到对应的数据,因此无法构建对应的索引。在这两个操作完成后,会出现kvDB中需要创建索引的数据未创建索引的问题。 + +2. **并发控制的设计目标** + +对于并发控制,最简单的方法就是给整个数据库添加一把全局锁,每一个涉及修改的操作的全过程都需要持有这把全局锁。但是显然这种方法对于并发性能并不友好。主要原因如下: + +首先,并发的写入操作有合并的可能。通过合并,可以将日志的小写入合并为一个大写入,减少日志写入的请求数量。其次,有些创删索引操作可以和写入操作并发执行。例如,对于Field`address`创建索引的时候,如果一个`(k,v)`的写入中,k的原始值和当前值都不含`address`Field,那么两者就可以并发的执行。由于创删索引的耗时通常比较长,对于这种相互独立的创删索引和写入的并发是非常有必要的。 + + +3. **并发控制的整体架构** + +FieldDB实现并发控制的思路很大程度上借鉴了Leveldb的做法。FieldDB首先将写入操作、删除操作、WriteBatch的写入、创建索引和删除索引抽象为五种不同的request,分别是FieldsReq,DeleteReq,WriteBatchReq,iCreateReq,iDeleteReq。类似于Leveldb利用`writer`和对应的队列将所有的写入操作合并,FieldDB采用了`taskqueue`将上述的五种结构进行排队,在此基础上进行并发控制。使用队列作为基础进行并发控制的还有一个主要原因在于这和Leveldb的底层实现是对齐的。Leveldb的写入本质上是一个单线程的写入,现在把多线程的同步操作提升到FieldDB层完成,对于下面kvDB、indexDB、metaDB的数据访问都是单线程的。此外,在FieldDB层就完成并发控制对于可移植性可能会有好处。只要底层的数据库的put、delete操作的语义相同,我们就可以方便的进行替换,不用考虑具体的底层数据库的并发控制模式。 + +当然,这套方案也会有一定的问题,我们将在后文进行讨论。 + +4. 
**实现细节** + +- **`taskqueue_`和`HandleRequest`的实现细节** + +两者的配合完成了类似于Leveldb的并发控制。代码基础结构如下所示: +```c++ +Status FieldDB::HandleRequest(Request &req, const WriteOptions &op) { + MutexLock L(&mutex_); + taskqueue_.push_back(&req); + while(true){ + while(req.isPending() || !req.done && &req != taskqueue_.front()) { + req.cond_.Wait(); + } + if(req.done) { + return req.s; //在返回时自动释放锁L + } + Request *tail = GetHandleInterval(); + /*==============================请求具体处理开始=================================*/ + WriteBatch KVBatch,IndexBatch,MetaBatch; + SliceHashSet batchKeySet; + Status status; + if(!tail->isiCreateReq() && !tail->isiDeleteReq()) { + //表明这一个区间并没有涉及index的创建删除 + { + //1. 构建各个Batch。构建的过程中要保证索引状态的一致性,需要上锁。 + MutexLock iL(&index_mu); + for(auto *req_ptr : taskqueue_) { + req_ptr->ConstructBatch(KVBatch, IndexBatch, MetaBatch, this, batchKeySet); + if(req_ptr == tail) break; + } + } + //2. 首先写入meta,再并发写入index和kv,完成之后清除meta数据 + //此处可以放锁是因为写入的有序性可以通过队列来保证 + mutex_.Unlock(); + if(MetaBatch.ApproximateSize() > 12) { + status = metaDB_->Write(op, &MetaBatch); + assert(status.ok()); + } + //TODO:index的写入需要在另外一个线程中同时完成 + if(IndexBatch.ApproximateSize() > 12) { + status = indexDB_->Write(op, &IndexBatch); + } + if(KVBatch.ApproximateSize() > 12) { + status = kvDB_->Write(op, &KVBatch); + } + //3. 
将meta数据清除 + if(MetaBatch.ApproximateSize() > 12) { + MetaCleaner cleaner; + cleaner.Collect(MetaBatch); + cleaner.CleanMetaBatch(metaDB_); + } + mutex_.Lock(); + } else { + //对于创建和删除索引的请求,通过prepare完成索引状态的更新 + MutexLock iL(&index_mu); + req.Prepare(this); + } + /*==============================请求具体处理结束=================================*/ + while(true) { + Request *ready = taskqueue_.front(); + taskqueue_.pop_front(); + //当前ready不是队首,不是和index的创建有关 + if(!ready->isPending() && !req.isiCreateReq() && !req.isiDeleteReq()) { + ready->s = status; + ready->done = true; + if (ready != &req) ready->cond_.Signal(); + } + if (ready == tail) break; + } + + + if(!taskqueue_.empty()) { + taskqueue_.front()->cond_.Signal(); + } + //如果done==true,那么就不会继续等待直接退出 + //如果处于某个请求的pending list里面,那么就会继续等待重新入队 + } +} +``` + +在请求具体处理部分之外就是并发控制的实现,可以看到,整体的实现基本上和Leveldb类似,最大的不同是整个逻辑的最外层是一个死循环,是为了实现请求的pending。这样做的原因是因为有些操作会由于和别的操作产生冲突,如果不等待别的操作完成就会导致索引不一致。比方说一个含有索引的put操作和一个正在进行的该索引的创建操作并发发生,那么前者就要进入pending状态直到索引创建操作完成,否则因为Leveldb快照读的问题不会读取到当前的写入,导致索引创建丢失。此外还有对于同一个索引的创建和删除的并发等情况。请求pending的实现和部分调用场景示例如下所示: +```c++ +//创建索引的request +class iCreateReq : public Request { +public: + iCreateReq(Slice Field,port::Mutex *mu): + Field(Field),Request(iCreateReq_t, mu),Existed(false) { }; + + void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, + WriteBatch &MetaBatch,fielddb::FieldDB *DB,SliceHashSet &batchKeySet) override; + void Prepare(FieldDB *DB) override; + void Finalize(FieldDB *DB) override; + void PendReq(Request *req) override; + + bool Existed; + Slice Field; + std::deque pending_list; +}; + +void iCreateReq::PendReq(Request *req) { + req->parent = this; + pending_list.push_back(req); +} + +/*部分写入请求的构建,实现含有索引的put和对于该索引创删操作的并发*/ +//1.将存在冲突的put pend到对应的请求 +for(auto &[field_name,field_value] : SliceFields) { + if(field_name.data() == EMPTY) break; + if(DB->index_.count(field_name.ToString())) { + auto [index_status,parent_req] = DB->index_[field_name.ToString()]; + if(index_status == 
IndexStatus::Creating || index_status == IndexStatus::Deleting) { + parent_req->PendReq(this->parent); + return; + } else if(index_status == IndexStatus::Exist) { + HasIndex = true; + } + } +} +``` + +另外一个主要的部分是`GetHandleInterval`函数。这个函数会在`taskqueue_`队列里面选择一段合适的请求进行处理。具体来说,就是所有创删索引的操作都需要单独进行,而且其他的操作都可以放在一起合并处理。创删索引需要单独处理是因为需要保证创删索引前的数据都要能够完成写入,这样在创删索引迭代读取数据库的时候就可以不遗漏每一个已有的写入。 + +对于请求具体处理部分。FieldDB将请求的执行步骤进行抽象化,由一个统一的`Request`基类进行驱动。对于创删索引操作,由于所需要的时间较长,所以分为上下两部进行。上半部在队列中完成`Prepare`操作,更新内存中的索引状态表。下半部退出`taskqueue_`,在请求的线程中依次完成访问全部的数据库并构建二级索引、二级索引写入indexDB、更新内存索引状态表并唤醒等待创删请求完成的写入。唤醒等待请求的操作本质上是清除请求的等待状态并且重新加入`taskqueue_`。创删索引执行流程下半部的主要代码如下(删除索引操作类似): +```c++ +Status FieldDB::CreateIndexOnField(const std::string& field_name, const WriteOptions &op) { + iCreateReq req(field_name,&mutex_); + HandleRequest(req, op); + //如果已经存在索引,那么直接返回 + if(req.Existed) { + return req.s; + } + WriteBatch KVBatch,IndexBatch,MetaBatch; + SliceHashSet useless; + req.ConstructBatch(KVBatch, IndexBatch, MetaBatch, this, useless); + indexDB_->Write(op, &IndexBatch); + req.Finalize(this); + return req.s; +} + +void iCreateReq::Finalize(FieldDB *DB) { + //1. 写入完成后,更新index状态表 + MutexLock iL(&DB->index_mu); + DB->index_[Field.ToString()] = {IndexStatus::Exist, nullptr}; + DB->index_mu.Unlock(); + + if (pending_list.empty()) return; + //2. 
将所有的pendinglist重新入队 + MutexLock L(&DB->mutex_); + for (auto req : pending_list){ + DB->taskqueue_.push_back(req); + req->parent = req; //解绑 + } + if (pending_list[0] == DB->taskqueue_.front()) { + pending_list[0]->cond_.Signal(); + } + this->s = Status::OK(); +} +``` + +对于WriteBatch的写入,我们采用了一个比较tricky的方法。在BatchReq中构建子FieldsReq和DeleteReq请求,最大程度的复用已有的代码逻辑。 +```c++ +BatchReq::BatchReq(WriteBatch *Batch,port::Mutex *mu): + Batch(Batch),Request(BatchReq_t, mu) { + + struct BatchHandler : WriteBatch::Handler { + void Put(const Slice &key, const Slice &value) override { + sub_requests->emplace_back(new FieldsReq(key,value,mu)); + sub_requests->back()->parent = req; + } + void Delete(const Slice &key) override { + sub_requests->emplace_back(new DeleteReq(key,mu)); + sub_requests->back()->parent = req; + } + + BatchReq *req; + port::Mutex *mu; + std::deque *sub_requests; + }; + + BatchHandler Handler; + Handler.req = this; + Handler.mu = mu; + Handler.sub_requests = &sub_requests; + + Batch->Iterate(&Handler); +} +``` ### 2.2.3 如何保证两个kv与index的一致性 metadb @@ -45,5 +250,6 @@ metadb ## 4. 问题与解决 ## 5. 潜在优化点 - +1. 使用一些高性能并发设计模式,如reactor来优化多线程写入时由于锁竞争导致的性能问题 +2. 采用一些高性能的库,如dpdk等 ## 6. 分工 diff --git a/benchmarks/db_bench_FieldDB.cc b/benchmarks/db_bench_FieldDB.cc index 65e980a..a061fcc 100644 --- a/benchmarks/db_bench_FieldDB.cc +++ b/benchmarks/db_bench_FieldDB.cc @@ -2,6 +2,7 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. 
+#include #include #include @@ -49,37 +50,42 @@ using namespace fielddb; // sstables -- Print sstable info // heapprofile -- Dump a heap profile (if supported by this port) static const char* FLAGS_benchmarks = + // "fillseq," + // "fillsync," + // "fillrandom," + // "overwrite," + // "readrandom," + // "readrandom," // Extra run to allow previous compactions to quiesce + // "readseq," + // "readreverse," + // "compact," + // "readrandom," + // "readseq," + // "readreverse," + // "fill100K," + // "crc32c," + // "CreateIndex," + // "FindKeysByField," + // "QueryByIndex," + // "DeleteIndex," + // "compact," + // "WriteSeqWhileCreating," + // "WriteSeqWhileDeleting," + // "compact," + // "WriteRandomWhileCreating," + // "WriteRandomWhileDeleting," + // "compact," + // "ReadSeqWhileCreating," + // "ReadSeqWhileDeleting," + // "ReadRandomWhileCreating," + // "ReadRandomWhileDeleting," + // "WriteRandomWithIndex," + // "WriteSeqWithIndex," "fillseq," - "fillsync," - "fillrandom," - "overwrite," - "readrandom," - "readrandom," // Extra run to allow previous compactions to quiesce - "readseq," - "readreverse," - "compact," - "readrandom," - "readseq," - "readreverse," - "fill100K," - "crc32c," - "CreateIndex," - "FindKeysByField," - "QueryByIndex," - "DeleteIndex," - "compact," - "WriteSeqWhileCreating," - "WriteSeqWhileDeleting," - "compact," - "WriteRandomWhileCreating," - "WriteRandomWhileDeleting," - "compact," - "ReadSeqWhileCreating," - "ReadSeqWhileDeleting," - "ReadRandomWhileCreating," - "ReadRandomWhileDeleting," - "WriteRandomWithIndex," - "WriteSeqWithIndex,"; + "WriteSeqWhileIndependentCCD," + "fillseq," + "WriteSeqWhileCCD," + ; // Number of key/values to place in database static int FLAGS_num = 1000000; @@ -146,6 +152,8 @@ static const char* FLAGS_db = nullptr; // ZSTD compression level to try out static int FLAGS_zstd_compression_level = 1; +static bool FLAGS_contain_field_age = true; + namespace leveldb { namespace { @@ -348,8 +356,8 @@ class Stats { } 
AppendWithSpace(&extra, message_); - std::fprintf(stdout, "%-12s : %11.3f micros/op;%s%s\n", - name.ToString().c_str(), seconds_ * 1e6 / done_, + std::fprintf(stdout, "%-12s : %11.3f micros/op(%10d);%s%s\n", + name.ToString().c_str(), seconds_ * 1e6 / done_,done_, (extra.empty() ? "" : " "), extra.c_str()); if (FLAGS_histogram) { std::fprintf(stdout, "Microseconds per op:\n%s\n", @@ -593,6 +601,9 @@ class Benchmark { bool fresh_db = false; int num_threads = FLAGS_threads; + FLAGS_key_prefix = 0; + FLAGS_contain_field_age = true; + if (name == Slice("open")) { method = &Benchmark::OpenBench; num_ /= 10000; @@ -686,6 +697,14 @@ class Benchmark { } else if (name == Slice("WriteSeqWithIndex")) { fresh_db = true; method = &Benchmark::WriteSeqWithIndex; + } else if (name == Slice("WriteSeqWhileIndependentCCD")) { + FLAGS_key_prefix = 1; + FLAGS_contain_field_age = false; + num_threads++; + method = &Benchmark::WriteSeqWhileIndependentCCD; + } else if (name == Slice("WriteSeqWhileCCD")) { + num_threads++; + method = &Benchmark::WriteSeqWhileCCD; } else if (name == Slice("snappycomp")) { method = &Benchmark::SnappyCompress; } else if (name == Slice("snappyuncomp")) { @@ -792,6 +811,10 @@ class Benchmark { } shared.mu.Unlock(); + for(int i = 0; i < n; i++) { + arg[i].thread->stats.Report(name); + } + for (int i = 1; i < n; i++) { arg[0].thread->stats.Merge(arg[i].thread->stats); } @@ -917,7 +940,7 @@ class Benchmark { std::string tag = gen.Generate(value_size_).ToString(); FieldArray fields = { {"name", name}, - {"age", age}, + {std::string("age") + (FLAGS_contain_field_age?"":"a"), age}, {"tag", tag} }; std::string value = SerializeValue(fields); @@ -1174,6 +1197,52 @@ class Benchmark { } } + void ContinueCreatingDeleting(ThreadState* thread) { + int flag = 1; + while(true) { + { + MutexLock L(&thread->shared->mu); + if(thread->shared->num_done == thread->shared->total - 1) { + return; + } + } + if(flag) { + db_->CreateIndexOnField("age", write_options_); + } else { + 
db_->DeleteIndex("age", write_options_); + } + thread->stats.FinishedSingleOp(); + flag ^= 1; + g_env->SleepForMicroseconds(100000); + } + } + + void WriteSeqWhileIndependentCCD(ThreadState* thread) { + if (thread->tid > 0) { + WriteSeq(thread); + } else { + // Special thread that keeps creating index until other threads are done. + if (db_->GetIndexStatus("age") != IndexStatus::NotExist) { + std::fprintf(stderr, "index status error in WriteWhileCreating\n"); + std::exit(1); + } + ContinueCreatingDeleting(thread); + } + } + + void WriteSeqWhileCCD(ThreadState* thread) { + if (thread->tid > 0) { + WriteSeq(thread); + } else { + // Special thread that keeps creating index until other threads are done. + if (db_->GetIndexStatus("age") != IndexStatus::NotExist) { + std::fprintf(stderr, "index status error in WriteWhileCreating\n"); + std::exit(1); + } + ContinueCreatingDeleting(thread); + } + } + void WriteSeqWhileDeleting(ThreadState* thread) { if (thread->tid > 0) { WriteSeq(thread); @@ -1221,7 +1290,7 @@ class Benchmark { while (true) { { MutexLock l(&thread->shared->mu); - if (thread->shared->num_done == 1) { + if (thread->shared->num_done >= 1) { // 创删索引完成 delete iter; thread->stats.AddBytes(bytes); @@ -1251,7 +1320,7 @@ class Benchmark { while (true) { { MutexLock l(&thread->shared->mu); - if (thread->shared->num_done == 1) { + if (thread->shared->num_done >= 1) { // 创删索引完成 delete iter; thread->stats.AddBytes(bytes); @@ -1281,7 +1350,7 @@ class Benchmark { while (true) { { MutexLock l(&thread->shared->mu); - if (thread->shared->num_done == 1) { + if (thread->shared->num_done >= 1) { // 创删索引完成 break; } @@ -1312,7 +1381,7 @@ class Benchmark { while (true) { { MutexLock l(&thread->shared->mu); - if (thread->shared->num_done == 1) { + if (thread->shared->num_done >= 1) { // 创删索引完成 break; } @@ -1398,6 +1467,8 @@ int main(int argc, char** argv) { } } + // FLAGS_num = (FLAGS_num + FLAGS_threads - 1) / FLAGS_threads; + leveldb::g_env = leveldb::Env::Default(); // Choose 
a location for the test database if none given with --db=