diff --git a/db/db_impl.cc b/db/db_impl.cc index 78e2382..36afa1c 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1310,7 +1310,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) { Total_elapsed += env_->NowMicros() - start_; NoWaiting_elapsed += env_->NowMicros() - start_; Nowaited_count ++; - dumpStatistics(); + // dumpStatistics(); return status; } diff --git a/fielddb/SliceHashSet.h b/fielddb/SliceHashSet.h new file mode 100644 index 0000000..1b728e2 --- /dev/null +++ b/fielddb/SliceHashSet.h @@ -0,0 +1,24 @@ +# ifndef SLICE_HASH_SET_H +# define SLICE_HASH_SET_H + +#include "leveldb/slice.h" +#include "util/hash.h" +#include +using namespace leveldb; +class SliceHash { +public: + uint32_t operator()(const Slice &lhs) const { + return Hash(lhs.data(),lhs.size(),0x1234); + } +}; + +class SliceEq { +public: + bool operator()(const Slice &lhs, const Slice &rhs) const { + return lhs == rhs; + } +}; + +using SliceHashSet = std::unordered_set; + +#endif \ No newline at end of file diff --git a/fielddb/field_db.cpp b/fielddb/field_db.cpp index 5685168..e757b66 100644 --- a/fielddb/field_db.cpp +++ b/fielddb/field_db.cpp @@ -19,6 +19,7 @@ #include "fielddb/encode_index.h" #include "fielddb/meta.h" #include "field_db.h" +#include "fielddb/SliceHashSet.h" namespace fielddb { using namespace leveldb; @@ -36,15 +37,17 @@ Status FieldDB::OpenFieldDB(Options& options, // options.block_cache = NewLRUCache(ULONG_MAX); // options.max_open_files = 1000; // options.write_buffer_size = 512 * 1024 * 1024; - // options.env = getPosixEnv(); + + //这里是为了让3个数据库有独立的的Background thread + options.env = getPosixEnv(); status = Open(options, name+"_indexDB", &indexdb); if(!status.ok()) return status; - // options.env = getPosixEnv(); + options.env = getPosixEnv(); status = Open(options, name+"_kvDB", &kvdb); if(!status.ok()) return status; - // options.env = getPosixEnv(); + options.env = getPosixEnv(); status = Open(options, name+"_metaDB", &metadb); if(!status.ok()) return status; @@ -127,7 +130,7 @@ Status FieldDB::HandleRequest(Request &req, const WriteOptions &op) { taskqueue_.push_back(&req); while(true){ uint64_t start_waiting = env_->NowMicros(); - while(!req.done && &req != taskqueue_.front()) { + while(req.isPending() || !req.done && &req != taskqueue_.front()) { req.cond_.Wait(); } waiting_elasped += env_->NowMicros() - start_waiting; @@ -139,7 +142,7 @@ Status FieldDB::HandleRequest(Request &req, const WriteOptions &op) { } Request *tail = GetHandleInterval(); WriteBatch KVBatch,IndexBatch,MetaBatch; - std::unordered_set batchKeySet; + SliceHashSet batchKeySet; Status status; if(!tail->isiCreateReq() && !tail->isiDeleteReq()) { //表明这一个区间并没有涉及index的创建删除 @@ -312,7 +315,7 @@ Status FieldDB::CreateIndexOnField(const std::string& field_name, const WriteOpt return req.s; } WriteBatch KVBatch,IndexBatch,MetaBatch; - std::unordered_set useless; + SliceHashSet useless; req.ConstructBatch(KVBatch, IndexBatch, MetaBatch, this, useless); indexDB_->Write(op, &IndexBatch); req.Finalize(this); @@ -328,7 +331,7 @@ Status FieldDB::DeleteIndex(const std::string &field_name, const WriteOptions &o return req.s; } WriteBatch KVBatch,IndexBatch,MetaBatch; - std::unordered_set useless; + SliceHashSet useless; req.ConstructBatch(KVBatch, IndexBatch, MetaBatch, this, useless); indexDB_->Write(op, &IndexBatch); req.Finalize(this); diff --git a/fielddb/field_db.h b/fielddb/field_db.h index b920293..2cd458f 100644 --- a/fielddb/field_db.h +++ b/fielddb/field_db.h @@ -111,7 +111,8 @@ private: uint64_t write_clean_elapsed = 0; uint64_t write_bytes = 0; - uint64_t write_bytes_lim = 50 * 1024 * 1024; + uint64_t write_step = 500 * 1024 * 1024; + uint64_t write_bytes_lim = write_step; uint64_t temp_elapsed = 0; @@ -140,7 +141,7 @@ private: std::cout << "waiting elapsed : " << waiting_elasped * 1.0 / count << std::endl; // std::cout << MetaBatch.ApproximateSize() << " " << IndexBatch.ApproximateSize() << " " << KVBatch.ApproximateSize() << std::endl; std::cout << "=====================================================\n"; - write_bytes_lim = write_bytes + 50 * 1024 * 1024; + write_bytes_lim = write_bytes + write_step; std::fflush(stdout); } } diff --git a/fielddb/request.cpp b/fielddb/request.cpp index b90470a..9a53a6a 100644 --- a/fielddb/request.cpp +++ b/fielddb/request.cpp @@ -25,7 +25,7 @@ void Request::PendReq(Request *req) { //为虚函数提供最基本的实现 void Request::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, WriteBatch &MetaBatch,fielddb::FieldDB *DB, - std::unordered_set &batchKeySet) + SliceHashSet &batchKeySet) { assert(0); } @@ -48,12 +48,12 @@ bool Request::isPending() { /*******FieldsReq*******/ void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, WriteBatch &MetaBatch,fielddb::FieldDB *DB, - std::unordered_set &batchKeySet) + SliceHashSet &batchKeySet) { - if (batchKeySet.find(Key.ToString()) != batchKeySet.end()){ + if (batchKeySet.find(Key) != batchKeySet.end()){ return;//并发的被合并的put/delete请求只处理一次 } else { - batchKeySet.insert(Key.ToString()); + batchKeySet.insert(Key); } std::string val_str; Status s = Status::NotFound("test"); @@ -164,12 +164,12 @@ void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, /*******DeleteReq*******/ void DeleteReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, WriteBatch &MetaBatch,fielddb::FieldDB *DB, - std::unordered_set &batchKeySet) + SliceHashSet &batchKeySet) { - if (batchKeySet.find(Key.ToString()) != batchKeySet.end()){ + if (batchKeySet.find(Key) != batchKeySet.end()){ return;//并发的被合并的put/delete请求只处理一次 } else { - batchKeySet.insert(Key.ToString()); + batchKeySet.insert(Key); } //1. 读取当前的最新的键值对,判断是否存在含有键值对的field //2.1 如果无,则正常构造delete @@ -263,7 +263,7 @@ void iCreateReq::PendReq(Request *req) { void iCreateReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, WriteBatch &MetaBatch,fielddb::FieldDB *DB, - std::unordered_set &batchKeySet) + SliceHashSet &batchKeySet) { //遍历数据库,构建二级索引到indexbatch,(更新metaDB中的元数据为Index类型的(Field,Creating)) //一个indexwritebatch写入,那么索引创建删除应该和metadb没有交互 @@ -322,7 +322,7 @@ void iDeleteReq::PendReq(Request* req) { } void iDeleteReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, - WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set &batchKeySet) + WriteBatch &MetaBatch,fielddb::FieldDB *DB,SliceHashSet &batchKeySet) { std::vector> keysAndVal = DB->FindKeysAndValByFieldName(Field); @@ -404,10 +404,10 @@ BatchReq::~BatchReq() { } void BatchReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, - WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set &batchKeySet) + WriteBatch &MetaBatch,fielddb::FieldDB *DB,SliceHashSet &batchKeySet) { WriteBatch Sub_KVBatch,Sub_IndexBatch,Sub_MetaBatch; - std::unordered_set Sub_batchKeySet; + SliceHashSet Sub_batchKeySet; //由于batch是有顺序的,根据我们现在的一个key只处理最开始的算法,这里需要反向迭代 uint64_t start_ = DB->env_->NowMicros(); for(auto subreq = sub_requests.rbegin(); subreq != sub_requests.rend(); subreq++ ) { diff --git a/fielddb/request.h b/fielddb/request.h index 4e68596..b4847fe 100644 --- a/fielddb/request.h +++ b/fielddb/request.h @@ -9,6 +9,7 @@ #include "util/serialize_value.h" #include // #include "fielddb/field_db.h" +#include "fielddb/SliceHashSet.h" #ifndef REQUEST_H #define REQUEST_H @@ -48,7 +49,7 @@ public: //用于含有Fields的 virtual void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, - WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set &batchKeySet); + WriteBatch &MetaBatch,fielddb::FieldDB *DB,SliceHashSet &batchKeySet); //主要用于icreate和idelete在队列中的注册当前状态 virtual void Prepare(FieldDB *DB); virtual void Finalize(FieldDB *DB); @@ -87,7 +88,7 @@ public: } void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, - WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set &batchKeySet) override; + WriteBatch &MetaBatch,fielddb::FieldDB *DB,SliceHashSet &batchKeySet) override; Slice Key; FieldSliceArray SliceFields; @@ -112,7 +113,7 @@ public: Field(Field),Request(iCreateReq_t, mu),Existed(false) { }; void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, - WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set &batchKeySet) override; + WriteBatch &MetaBatch,fielddb::FieldDB *DB,SliceHashSet &batchKeySet) override; void Prepare(FieldDB *DB) override; void Finalize(FieldDB *DB) override; void PendReq(Request *req) override; @@ -129,7 +130,7 @@ public: Field(Field),Request(iDeleteReq_t, mu),Deleted(false) { }; void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, - WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set &batchKeySet) override; + WriteBatch &MetaBatch,fielddb::FieldDB *DB,SliceHashSet &batchKeySet) override; void Prepare(FieldDB *DB) override; void Finalize(FieldDB *DB) override; void PendReq(Request *req) override; @@ -146,7 +147,7 @@ public: Key(Key),Request(DeleteReq_t,mu) { }; void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, - WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set &batchKeySet) override; + WriteBatch &MetaBatch,fielddb::FieldDB *DB,SliceHashSet &batchKeySet) override; Slice Key; }; @@ -157,7 +158,7 @@ public: ~BatchReq(); void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, - WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set &batchKeySet) override; + WriteBatch &MetaBatch,fielddb::FieldDB *DB,SliceHashSet &batchKeySet) override; WriteBatch *Batch; std::deque sub_requests;