diff --git a/benchmarks/db_bench.cc b/benchmarks/db_bench.cc index 4ff7f05..554a6c5 100644 --- a/benchmarks/db_bench.cc +++ b/benchmarks/db_bench.cc @@ -856,8 +856,20 @@ class Benchmark { for (int j = 0; j < entries_per_batch_; j++) { const int k = seq ? i + j : thread->rand.Uniform(FLAGS_num); key.Set(k); - batch.Put(key.slice(), gen.Generate(value_size_)); - bytes += value_size_ + key.slice().size(); + + std::string name = "customer#" + std::to_string(k); + //这个字段用来查找 + std::string age = std::to_string(thread->rand.Uniform(FLAGS_num) % 100); + //这个字段填充长度 + std::string tag = gen.Generate(value_size_).ToString(); + FieldArray fields = { + {"name", name}, + {"age", age}, + {"tag", tag} + }; + std::string value = SerializeValue(fields); + batch.Put(key.slice(), value); + bytes += value.size() + key.slice().size(); thread->stats.FinishedSingleOp(); } s = db_->Write(write_options_, &batch); diff --git a/benchmarks/db_bench_FieldDB.cc b/benchmarks/db_bench_FieldDB.cc index c7029aa..65e980a 100644 --- a/benchmarks/db_bench_FieldDB.cc +++ b/benchmarks/db_bench_FieldDB.cc @@ -77,7 +77,9 @@ static const char* FLAGS_benchmarks = "ReadSeqWhileCreating," "ReadSeqWhileDeleting," "ReadRandomWhileCreating," - "ReadRandomWhileDeleting,"; + "ReadRandomWhileDeleting," + "WriteRandomWithIndex," + "WriteSeqWithIndex,"; // Number of key/values to place in database static int FLAGS_num = 1000000; @@ -340,8 +342,8 @@ class Stats { // elapsed times. double elapsed = (finish_ - start_) * 1e-6; char rate[100]; - std::snprintf(rate, sizeof(rate), "%6.1f MB/s", - (bytes_ / 1048576.0) / elapsed); + std::snprintf(rate, sizeof(rate), "%6.1f MB/s Bytes:%6.1f elapsed(s):%6.1f seconds:%6.1f ", + (bytes_ / 1048576.0) / elapsed,(bytes_ / 1048576.0),elapsed,seconds_); extra = rate; } AppendWithSpace(&extra, message_); @@ -678,6 +680,12 @@ class Benchmark { } else if (name == Slice("ReadRandomWhileDeleting")) { num_threads++; method = &Benchmark::ReadRandomWhileDeleting; + } else if (name == Slice("WriteRandomWithIndex")) { + fresh_db = true; + method = &Benchmark::WriteRandomWithIndex; + } else if (name == Slice("WriteSeqWithIndex")) { + fresh_db = true; + method = &Benchmark::WriteSeqWithIndex; } else if (name == Slice("snappycomp")) { method = &Benchmark::SnappyCompress; } else if (name == Slice("snappyuncomp")) { @@ -1141,6 +1149,18 @@ class Benchmark { db_->DeleteIndex("age", write_options_); } + void WriteSeqWithIndex(ThreadState* thread) { + CreateIndex(thread); + thread->stats.Start(); + WriteSeq(thread); + } + + void WriteRandomWithIndex(ThreadState* thread) { + CreateIndex(thread); + thread->stats.Start(); + WriteRandom(thread); + } + void WriteSeqWhileCreating(ThreadState* thread) { if (thread->tid > 0) { WriteSeq(thread); diff --git a/benchmarks/db_bench_testDB.cc b/benchmarks/db_bench_testDB.cc index 5a4acb9..05d0030 100644 --- a/benchmarks/db_bench_testDB.cc +++ b/benchmarks/db_bench_testDB.cc @@ -859,8 +859,20 @@ class Benchmark { for (int j = 0; j < entries_per_batch_; j++) { const int k = seq ? i + j : thread->rand.Uniform(FLAGS_num); key.Set(k); - batch.Put(key.slice(), gen.Generate(value_size_)); - bytes += value_size_ + key.slice().size(); + + std::string name = "customer#" + std::to_string(k); + //这个字段用来查找 + std::string age = std::to_string(thread->rand.Uniform(FLAGS_num) % 100); + //这个字段填充长度 + std::string tag = gen.Generate(value_size_).ToString(); + FieldArray fields = { + {"name", name}, + {"age", age}, + {"tag", tag} + }; + std::string value = SerializeValue(fields); + batch.Put(key.slice(), value); + bytes += value.size() + key.slice().size(); thread->stats.FinishedSingleOp(); } s = db_->Write(write_options_, &batch); diff --git a/db/db_impl.cc b/db/db_impl.cc index 122760c..36afa1c 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1235,12 +1235,18 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) { w.sync = options.sync; w.done = false; + uint64_t start_ = env_->NowMicros(); MutexLock l(&mutex_); + count ++; writers_.push_back(&w); while (!w.done && &w != writers_.front()) { w.cv.Wait(); } if (w.done) { + Waiting_elapsed += env_->NowMicros() - start_; + waited_count ++; + Total_elapsed += env_->NowMicros() - start_; + // dumpStatistics(); return w.status; } @@ -1259,6 +1265,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) { // into mem_. { mutex_.Unlock(); + uint64_t start_write = env_->NowMicros(); status = log_->AddRecord(WriteBatchInternal::Contents(write_batch)); bool sync_error = false; if (status.ok() && options.sync) { @@ -1270,6 +1277,8 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) { if (status.ok()) { status = WriteBatchInternal::InsertInto(write_batch, mem_); } + BatchSize += write_batch->ApproximateSize(); + write_elapsed += env_->NowMicros() - start_write; mutex_.Lock(); if (sync_error) { // The state of the log file is indeterminate: the log record we @@ -1298,7 +1307,10 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) { if (!writers_.empty()) { writers_.front()->cv.Signal(); } - + Total_elapsed += env_->NowMicros() - start_; + NoWaiting_elapsed += env_->NowMicros() - start_; + Nowaited_count ++; + // dumpStatistics(); return status; } diff --git a/db/db_impl.h b/db/db_impl.h index 6848077..0d77e55 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -6,7 +6,10 @@ #define STORAGE_LEVELDB_DB_DB_IMPL_H_ #include +#include +#include #include +#include #include #include @@ -210,6 +213,33 @@ class DBImpl : public DB { Status bg_error_ GUARDED_BY(mutex_); CompactionStats stats_[config::kNumLevels] GUARDED_BY(mutex_); + + int count = 0; + int waited_count = 0; + int Nowaited_count = 0; + uint64_t Total_elapsed = 0; + uint64_t Waiting_elapsed = 0; + uint64_t NoWaiting_elapsed = 0; + uint64_t write_elapsed = 0; + uint64_t BatchSize = 0; + const double MB = 1024 * 1024; + const double KB = 1024; + inline void dumpStatistics() { + if(count && count % 500000 == 0) { + printf("==================================\n"); + printf("Count: Total:%d Waited:%d Nowaited:%d\n",count,waited_count,Nowaited_count); + printf("%ld %ld %ld\n",Total_elapsed,Waiting_elapsed,NoWaiting_elapsed); + printf("Average Total elapsed: %lf ms\n",Total_elapsed * 1.0 / count); + printf("Average Waiting elapsed: %lf ms\n",Waiting_elapsed * 1.0 / count); + printf("For waiting request: %lf ms\n",Waiting_elapsed * 1.0 / waited_count); + printf("For Nowait request: %lf ms\n",NoWaiting_elapsed * 1.0 / Nowaited_count); + printf("Write elapsed: %lf ms\n",write_elapsed * 1.0 / Nowaited_count); + printf("Average BatchSize: %lfKB\n",BatchSize / KB / count); + printf("Average BatchSize per write:%lfKB\n",BatchSize / KB / Nowaited_count); + printf("==================================\n"); + std::fflush(stdout); + } + } }; // Sanitize db options. The caller should delete result.info_log if diff --git a/fielddb/SliceHashSet.h b/fielddb/SliceHashSet.h new file mode 100644 index 0000000..1b728e2 --- /dev/null +++ b/fielddb/SliceHashSet.h @@ -0,0 +1,24 @@ +# ifndef SLICE_HASH_SET_H +# define SLICE_HASH_SET_H + +#include "leveldb/slice.h" +#include "util/hash.h" +#include +using namespace leveldb; +class SliceHash { +public: + uint32_t operator()(const Slice &lhs) const { + return Hash(lhs.data(),lhs.size(),0x1234); + } +}; + +class SliceEq { +public: + bool operator()(const Slice &lhs, const Slice &rhs) const { + return lhs == rhs; + } +}; + +using SliceHashSet = std::unordered_set; + +#endif \ No newline at end of file diff --git a/fielddb/field_db.cpp b/fielddb/field_db.cpp index a9ad365..346c8c9 100644 --- a/fielddb/field_db.cpp +++ b/fielddb/field_db.cpp @@ -19,6 +19,7 @@ #include "fielddb/encode_index.h" #include "fielddb/meta.h" #include "field_db.h" +#include "fielddb/SliceHashSet.h" namespace fielddb { using namespace leveldb; @@ -36,15 +37,17 @@ Status FieldDB::OpenFieldDB(Options& options, // options.block_cache = NewLRUCache(ULONG_MAX); // options.max_open_files = 1000; // options.write_buffer_size = 512 * 1024 * 1024; - // options.env = getPosixEnv(); + + //这里是为了让3个数据库有独立的的Background thread + options.env = getPosixEnv(); status = Open(options, name+"_indexDB", &indexdb); if(!status.ok()) return status; - // options.env = getPosixEnv(); + options.env = getPosixEnv(); status = Open(options, name+"_kvDB", &kvdb); if(!status.ok()) return status; - // options.env = getPosixEnv(); + options.env = getPosixEnv(); status = Open(options, name+"_metaDB", &metadb); if(!status.ok()) return status; @@ -126,15 +129,20 @@ Status FieldDB::HandleRequest(Request &req, const WriteOptions &op) { MutexLock L(&mutex_); taskqueue_.push_back(&req); while(true){ - while(!req.done && &req != taskqueue_.front()) { + uint64_t start_waiting = env_->NowMicros(); + while(req.isPending() || !req.done && &req != taskqueue_.front()) { req.cond_.Wait(); } + waiting_elasped += env_->NowMicros() - start_waiting; if(req.done) { + elapsed += env_->NowMicros() - start_; + count ++; + // dumpStatistics(); return req.s; //在返回时自动释放锁L } Request *tail = GetHandleInterval(); WriteBatch KVBatch,IndexBatch,MetaBatch; - std::unordered_set batchKeySet; + SliceHashSet batchKeySet; Status status; if(!tail->isiCreateReq() && !tail->isiDeleteReq()) { //表明这一个区间并没有涉及index的创建删除 @@ -208,9 +216,6 @@ Status FieldDB::HandleRequest(Request &req, const WriteOptions &op) { if (ready == tail) break; } - elapsed += env_->NowMicros() - start_; - count ++; - //dumpStatistics(); if(!taskqueue_.empty()) { taskqueue_.front()->cond_.Signal(); @@ -310,7 +315,7 @@ Status FieldDB::CreateIndexOnField(const std::string& field_name, const WriteOpt return req.s; } WriteBatch KVBatch,IndexBatch,MetaBatch; - std::unordered_set useless; + SliceHashSet useless; req.ConstructBatch(KVBatch, IndexBatch, MetaBatch, this, useless); indexDB_->Write(op, &IndexBatch); req.Finalize(this); @@ -326,7 +331,7 @@ Status FieldDB::DeleteIndex(const std::string &field_name, const WriteOptions &o return req.s; } WriteBatch KVBatch,IndexBatch,MetaBatch; - std::unordered_set useless; + SliceHashSet useless; req.ConstructBatch(KVBatch, IndexBatch, MetaBatch, this, useless); indexDB_->Write(op, &IndexBatch); req.Finalize(this); diff --git a/fielddb/field_db.h b/fielddb/field_db.h index 5b7973d..2cd458f 100644 --- a/fielddb/field_db.h +++ b/fielddb/field_db.h @@ -111,10 +111,13 @@ private: uint64_t write_clean_elapsed = 0; uint64_t write_bytes = 0; - uint64_t write_bytes_lim = 50 * 1024 * 1024; + uint64_t write_step = 500 * 1024 * 1024; + uint64_t write_bytes_lim = write_step; uint64_t temp_elapsed = 0; + uint64_t waiting_elasped = 0; + inline void dumpStatistics() { if(count && count % 500000 == 0 || write_bytes && write_bytes > write_bytes_lim) { std::cout << "=====================================================\n"; @@ -134,10 +137,11 @@ private: std::cout << "\tWrite KV Time(ms) : " << write_kv_elapsed * 1.0 / count << std::endl; std::cout << "\tWrite Clean Time(ms) : " << write_clean_elapsed * 1.0 / count << std::endl; std::cout << "TaskQueue Size : " << taskqueue_.size() << std::endl; - std::cout << "temp_elased : " << temp_elapsed * 1.0 / count<< std::endl; + std::cout << "temp_elased : " << temp_elapsed * 1.0 / count << std::endl; + std::cout << "waiting elapsed : " << waiting_elasped * 1.0 / count << std::endl; // std::cout << MetaBatch.ApproximateSize() << " " << IndexBatch.ApproximateSize() << " " << KVBatch.ApproximateSize() << std::endl; std::cout << "=====================================================\n"; - write_bytes_lim = write_bytes + 50 * 1024 * 1024; + write_bytes_lim = write_bytes + write_step; std::fflush(stdout); } } diff --git a/fielddb/request.cpp b/fielddb/request.cpp index 9c30f1d..9a53a6a 100644 --- a/fielddb/request.cpp +++ b/fielddb/request.cpp @@ -25,7 +25,7 @@ void Request::PendReq(Request *req) { //为虚函数提供最基本的实现 void Request::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, WriteBatch &MetaBatch,fielddb::FieldDB *DB, - std::unordered_set &batchKeySet) + SliceHashSet &batchKeySet) { assert(0); } @@ -48,24 +48,35 @@ bool Request::isPending() { /*******FieldsReq*******/ void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, WriteBatch &MetaBatch,fielddb::FieldDB *DB, - std::unordered_set &batchKeySet) + SliceHashSet &batchKeySet) { - if (batchKeySet.find(Key.ToString()) != batchKeySet.end()){ + if (batchKeySet.find(Key) != batchKeySet.end()){ return;//并发的被合并的put/delete请求只处理一次 } else { - batchKeySet.insert(Key.ToString()); + batchKeySet.insert(Key); } std::string val_str; Status s = Status::NotFound("test"); uint64_t start_ = DB->env_->NowMicros(); - s = DB->kvDB_->Get(ReadOptions(), Key.ToString(), &val_str); + s = DB->kvDB_->Get(ReadOptions(), Key, &val_str); DB->construct_FieldsReq_Read_elapsed += DB->env_->NowMicros() - start_; - FieldArray *oldFields; + // FieldArray *oldFields; + FieldSliceArray oldFields; if (s.IsNotFound()){ - oldFields = nullptr; + // oldFields = nullptr; } else if (s.ok()) { //得到数据库之前key的fields, 判断需不需要删除其中潜在的索引 - oldFields = new FieldArray; - oldFields = ParseValue(val_str,oldFields); + // oldFields = new FieldArray; + // oldFields = ParseValue(val_str,oldFields); + Slice nameSlice, valSlice; + Slice Value(val_str); + while(GetLengthPrefixedSlice(&Value, &nameSlice)) { + if(GetLengthPrefixedSlice(&Value, &valSlice)) { + oldFields.push_back({nameSlice,valSlice}); + } else { + std::cout << "name and val not match! From FieldsReq Init" << std::endl; + } + nameSlice.clear(), valSlice.clear(); + } } else { assert(0); } @@ -76,8 +87,8 @@ void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, // MutexLock L(&DB->index_mu); //互斥访问索引状态表 DB->index_mu.AssertHeld(); //1.将存在冲突的put pend到对应的请求 - for(auto [field_name,field_value] : SliceFields) { - if(field_name == EMPTY) break; + for(auto &[field_name,field_value] : SliceFields) { + if(field_name.data() == EMPTY) break; if(DB->index_.count(field_name.ToString())) { auto [index_status,parent_req] = DB->index_[field_name.ToString()]; if(index_status == IndexStatus::Creating || index_status == IndexStatus::Deleting) { @@ -90,11 +101,11 @@ void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, } } //冲突也可能存在于,需要删除旧数据的索引,但该索引正在创删中 - if (oldFields != nullptr){ - for(auto [field_name,field_value] : *oldFields) { - if(field_name == EMPTY) break; - if(DB->index_.count(field_name)) { - auto [index_status,parent_req] = DB->index_[field_name]; + if (!oldFields.empty()){ + for(auto &[field_name,field_value] : oldFields) { + if(field_name.data() == EMPTY) break; + if(DB->index_.count(field_name.ToString())) { + auto [index_status,parent_req] = DB->index_[field_name.ToString()]; if(index_status == IndexStatus::Creating || index_status == IndexStatus::Deleting) { parent_req->PendReq(this->parent); return; @@ -118,9 +129,9 @@ void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, //3.1对于含有索引的oldfield删除索引 if (HasOldIndex) { - for(auto [field_name,field_value] : *oldFields) { - if(field_name == EMPTY) continue; - if(DB->index_.count(field_name)) { + for(auto &[field_name,field_value] : oldFields) { + if(field_name.data() == EMPTY) continue; + if(DB->index_.count(field_name.ToString())) { std::string indexKey; AppendIndexKey(&indexKey, ParsedInternalIndexKey( Key,field_name,field_value)); @@ -131,8 +142,8 @@ void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, //3.2对于含有索引的field建立索引 if (HasIndex) { - for(auto [field_name,field_value] : SliceFields) { - if(field_name == EMPTY) continue; + for(auto &[field_name,field_value] : SliceFields) { + if(field_name.data() == EMPTY) continue; if(DB->index_.count(field_name.ToString())) { std::string indexKey; AppendIndexKey(&indexKey, ParsedInternalIndexKey( @@ -146,19 +157,19 @@ void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, //优化:对于3.1,3.2中都有的索引只写一次 } - if(oldFields) delete oldFields; + // if(oldFields) delete oldFields; } /*******DeleteReq*******/ void DeleteReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, WriteBatch &MetaBatch,fielddb::FieldDB *DB, - std::unordered_set &batchKeySet) + SliceHashSet &batchKeySet) { - if (batchKeySet.find(Key.ToString()) != batchKeySet.end()){ + if (batchKeySet.find(Key) != batchKeySet.end()){ return;//并发的被合并的put/delete请求只处理一次 } else { - batchKeySet.insert(Key.ToString()); + batchKeySet.insert(Key); } //1. 读取当前的最新的键值对,判断是否存在含有键值对的field //2.1 如果无,则正常构造delete @@ -168,18 +179,29 @@ void DeleteReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, std::string val_str; Status s = DB->kvDB_->Get(ReadOptions(), Key, &val_str); if (s.IsNotFound()) return; - FieldArray *Fields = new FieldArray; - ParseValue(val_str,Fields); + // FieldArray *Fields = new FieldArray; + // ParseValue(val_str,Fields); + FieldSliceArray Fields; + Slice nameSlice, valSlice; + Slice Value(val_str); + while(GetLengthPrefixedSlice(&Value, &nameSlice)) { + if(GetLengthPrefixedSlice(&Value, &valSlice)) { + Fields.push_back({nameSlice,valSlice}); + } else { + std::cout << "name and val not match! From FieldsReq Init" << std::endl; + } + nameSlice.clear(), valSlice.clear(); + } KVBatch.Delete(Slice(Key)); bool HasIndex = false; { // MutexLock L(&DB->index_mu); //互斥访问索引状态表 DB->index_mu.AssertHeld(); //1.将存在冲突的delete pend到对应的请求 - for(auto [field_name,field_value] : *Fields) { - if(field_name == EMPTY) break; - if(DB->index_.count(field_name)) { - auto [index_status,parent_req] = DB->index_[field_name]; + for(auto &[field_name,field_value] : Fields) { + if(field_name.data() == EMPTY) break; + if(DB->index_.count(field_name.ToString())) { + auto [index_status,parent_req] = DB->index_[field_name.ToString()]; if(index_status == IndexStatus::Creating || index_status == IndexStatus::Deleting) { parent_req->PendReq(this->parent); return; @@ -197,9 +219,9 @@ void DeleteReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, MKV.TransDelete(MetaKey); //meta中写入一个delete不需要value MetaBatch.Put(MetaKey, Slice()); //3.对于含有索引的field删除索引 - for(auto [field_name,field_value] : *Fields) { - if(field_name == EMPTY) continue; - if(DB->index_.count(field_name)) { + for(auto &[field_name,field_value] : Fields) { + if(field_name.data() == EMPTY) continue; + if(DB->index_.count(field_name.ToString())) { std::string indexKey; AppendIndexKey(&indexKey, ParsedInternalIndexKey( Key,field_name,field_value)); @@ -208,7 +230,7 @@ void DeleteReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, } } } - delete Fields; + // delete Fields; } /*******iCreateReq*******/ @@ -241,7 +263,7 @@ void iCreateReq::PendReq(Request *req) { void iCreateReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, WriteBatch &MetaBatch,fielddb::FieldDB *DB, - std::unordered_set &batchKeySet) + SliceHashSet &batchKeySet) { //遍历数据库,构建二级索引到indexbatch,(更新metaDB中的元数据为Index类型的(Field,Creating)) //一个indexwritebatch写入,那么索引创建删除应该和metadb没有交互 @@ -300,7 +322,7 @@ void iDeleteReq::PendReq(Request* req) { } void iDeleteReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, - WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set &batchKeySet) + WriteBatch &MetaBatch,fielddb::FieldDB *DB,SliceHashSet &batchKeySet) { std::vector> keysAndVal = DB->FindKeysAndValByFieldName(Field); @@ -382,15 +404,16 @@ BatchReq::~BatchReq() { } void BatchReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, - WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set &batchKeySet) + WriteBatch &MetaBatch,fielddb::FieldDB *DB,SliceHashSet &batchKeySet) { WriteBatch Sub_KVBatch,Sub_IndexBatch,Sub_MetaBatch; - std::unordered_set Sub_batchKeySet; + SliceHashSet Sub_batchKeySet; //由于batch是有顺序的,根据我们现在的一个key只处理最开始的算法,这里需要反向迭代 uint64_t start_ = DB->env_->NowMicros(); for(auto subreq = sub_requests.rbegin(); subreq != sub_requests.rend(); subreq++ ) { uint64_t start_sub = DB->env_->NowMicros(); (*subreq)->ConstructBatch(Sub_KVBatch, Sub_IndexBatch, Sub_MetaBatch, DB, Sub_batchKeySet); + // (*subreq)->ConstructBatch(KVBatch, IndexBatch, MetaBatch, DB, batchKeySet); DB->construct_BatchReq_perSub_elapsed += DB->env_->NowMicros() - start_sub; DB->count_Batch_Sub ++; //所有的对于pendreq的调用传入的参数被改成了this->parent,因此,对于subrequests来说, diff --git a/fielddb/request.h b/fielddb/request.h index 4e68596..b4847fe 100644 --- a/fielddb/request.h +++ b/fielddb/request.h @@ -9,6 +9,7 @@ #include "util/serialize_value.h" #include // #include "fielddb/field_db.h" +#include "fielddb/SliceHashSet.h" #ifndef REQUEST_H #define REQUEST_H @@ -48,7 +49,7 @@ public: //用于含有Fields的 virtual void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, - WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set &batchKeySet); + WriteBatch &MetaBatch,fielddb::FieldDB *DB,SliceHashSet &batchKeySet); //主要用于icreate和idelete在队列中的注册当前状态 virtual void Prepare(FieldDB *DB); virtual void Finalize(FieldDB *DB); @@ -87,7 +88,7 @@ public: } void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, - WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set &batchKeySet) override; + WriteBatch &MetaBatch,fielddb::FieldDB *DB,SliceHashSet &batchKeySet) override; Slice Key; FieldSliceArray SliceFields; @@ -112,7 +113,7 @@ public: Field(Field),Request(iCreateReq_t, mu),Existed(false) { }; void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, - WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set &batchKeySet) override; + WriteBatch &MetaBatch,fielddb::FieldDB *DB,SliceHashSet &batchKeySet) override; void Prepare(FieldDB *DB) override; void Finalize(FieldDB *DB) override; void PendReq(Request *req) override; @@ -129,7 +130,7 @@ public: Field(Field),Request(iDeleteReq_t, mu),Deleted(false) { }; void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, - WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set &batchKeySet) override; + WriteBatch &MetaBatch,fielddb::FieldDB *DB,SliceHashSet &batchKeySet) override; void Prepare(FieldDB *DB) override; void Finalize(FieldDB *DB) override; void PendReq(Request *req) override; @@ -146,7 +147,7 @@ public: Key(Key),Request(DeleteReq_t,mu) { }; void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, - WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set &batchKeySet) override; + WriteBatch &MetaBatch,fielddb::FieldDB *DB,SliceHashSet &batchKeySet) override; Slice Key; }; @@ -157,7 +158,7 @@ public: ~BatchReq(); void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, - WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set &batchKeySet) override; + WriteBatch &MetaBatch,fielddb::FieldDB *DB,SliceHashSet &batchKeySet) override; WriteBatch *Batch; std::deque sub_requests;