From b7c1d557fa35e3795bfa219eabb1bbe94a54524d Mon Sep 17 00:00:00 2001 From: cyq <1056374449@qq.com> Date: Sun, 29 Dec 2024 01:00:01 +0800 Subject: [PATCH] =?UTF-8?q?=E8=A1=A5=E5=85=85=E9=83=A8=E5=88=86=E8=BD=ACsl?= =?UTF-8?q?ice?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- fielddb/field_db.cpp | 8 ++-- fielddb/field_db.h | 5 ++- fielddb/request.cpp | 105 +++++++++++++++++++++++++++++++-------------------- 3 files changed, 73 insertions(+), 45 deletions(-) diff --git a/fielddb/field_db.cpp b/fielddb/field_db.cpp index a9ad365..5685168 100644 --- a/fielddb/field_db.cpp +++ b/fielddb/field_db.cpp @@ -126,10 +126,15 @@ Status FieldDB::HandleRequest(Request &req, const WriteOptions &op) { MutexLock L(&mutex_); taskqueue_.push_back(&req); while(true){ + uint64_t start_waiting = env_->NowMicros(); while(!req.done && &req != taskqueue_.front()) { req.cond_.Wait(); } + waiting_elasped += env_->NowMicros() - start_waiting; if(req.done) { + elapsed += env_->NowMicros() - start_; + count ++; + dumpStatistics(); return req.s; //在返回时自动释放锁L } Request *tail = GetHandleInterval(); @@ -208,9 +213,6 @@ Status FieldDB::HandleRequest(Request &req, const WriteOptions &op) { if (ready == tail) break; } - elapsed += env_->NowMicros() - start_; - count ++; - //dumpStatistics(); if(!taskqueue_.empty()) { taskqueue_.front()->cond_.Signal(); diff --git a/fielddb/field_db.h b/fielddb/field_db.h index 5b7973d..b920293 100644 --- a/fielddb/field_db.h +++ b/fielddb/field_db.h @@ -115,6 +115,8 @@ private: uint64_t temp_elapsed = 0; + uint64_t waiting_elasped = 0; + inline void dumpStatistics() { if(count && count % 500000 == 0 || write_bytes && write_bytes > write_bytes_lim) { std::cout << "=====================================================\n"; @@ -134,7 +136,8 @@ private: std::cout << "\tWrite KV Time(ms) : " << write_kv_elapsed * 1.0 / count << std::endl; std::cout << "\tWrite Clean Time(ms) : " << write_clean_elapsed * 1.0 / count << std::endl; std::cout << "TaskQueue Size : " << taskqueue_.size() << std::endl; - std::cout << "temp_elased : " << temp_elapsed * 1.0 / count<< std::endl; + std::cout << "temp_elased : " << temp_elapsed * 1.0 / count << std::endl; + std::cout << "waiting elapsed : " << waiting_elasped * 1.0 / count << std::endl; // std::cout << MetaBatch.ApproximateSize() << " " << IndexBatch.ApproximateSize() << " " << KVBatch.ApproximateSize() << std::endl; std::cout << "=====================================================\n"; write_bytes_lim = write_bytes + 50 * 1024 * 1024; diff --git a/fielddb/request.cpp b/fielddb/request.cpp index 9c30f1d..ca83f70 100644 --- a/fielddb/request.cpp +++ b/fielddb/request.cpp @@ -58,14 +58,25 @@ void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, std::string val_str; Status s = Status::NotFound("test"); uint64_t start_ = DB->env_->NowMicros(); - s = DB->kvDB_->Get(ReadOptions(), Key.ToString(), &val_str); + s = DB->kvDB_->Get(ReadOptions(), Key, &val_str); DB->construct_FieldsReq_Read_elapsed += DB->env_->NowMicros() - start_; - FieldArray *oldFields; + // FieldArray *oldFields; + FieldSliceArray oldFields; if (s.IsNotFound()){ - oldFields = nullptr; + // oldFields = nullptr; } else if (s.ok()) { //得到数据库之前key的fields, 判断需不需要删除其中潜在的索引 - oldFields = new FieldArray; - oldFields = ParseValue(val_str,oldFields); + // oldFields = new FieldArray; + // oldFields = ParseValue(val_str,oldFields); + Slice nameSlice, valSlice; + Slice Value(val_str); + while(GetLengthPrefixedSlice(&Value, &nameSlice)) { + if(GetLengthPrefixedSlice(&Value, &valSlice)) { + oldFields.push_back({nameSlice,valSlice}); + } else { + std::cout << "name and val not match! From FieldsReq Init" << std::endl; + } + nameSlice.clear(), valSlice.clear(); + } } else { assert(0); } @@ -76,8 +87,8 @@ void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, // MutexLock L(&DB->index_mu); //互斥访问索引状态表 DB->index_mu.AssertHeld(); //1.将存在冲突的put pend到对应的请求 - for(auto [field_name,field_value] : SliceFields) { - if(field_name == EMPTY) break; + for(auto &[field_name,field_value] : SliceFields) { + if(field_name.data() == EMPTY) break; if(DB->index_.count(field_name.ToString())) { auto [index_status,parent_req] = DB->index_[field_name.ToString()]; if(index_status == IndexStatus::Creating || index_status == IndexStatus::Deleting) { @@ -90,11 +101,11 @@ void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, } } //冲突也可能存在于,需要删除旧数据的索引,但该索引正在创删中 - if (oldFields != nullptr){ - for(auto [field_name,field_value] : *oldFields) { - if(field_name == EMPTY) break; - if(DB->index_.count(field_name)) { - auto [index_status,parent_req] = DB->index_[field_name]; + if (!oldFields.empty()){ + for(auto &[field_name,field_value] : oldFields) { + if(field_name.data() == EMPTY) break; + if(DB->index_.count(field_name.ToString())) { + auto [index_status,parent_req] = DB->index_[field_name.ToString()]; if(index_status == IndexStatus::Creating || index_status == IndexStatus::Deleting) { parent_req->PendReq(this->parent); return; @@ -118,9 +129,9 @@ void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, //3.1对于含有索引的oldfield删除索引 if (HasOldIndex) { - for(auto [field_name,field_value] : *oldFields) { - if(field_name == EMPTY) continue; - if(DB->index_.count(field_name)) { + for(auto &[field_name,field_value] : oldFields) { + if(field_name.data() == EMPTY) continue; + if(DB->index_.count(field_name.ToString())) { std::string indexKey; AppendIndexKey(&indexKey, ParsedInternalIndexKey( Key,field_name,field_value)); @@ -131,8 +142,8 @@ void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, //3.2对于含有索引的field建立索引 if (HasIndex) { - for(auto [field_name,field_value] : SliceFields) { - if(field_name == EMPTY) continue; + for(auto &[field_name,field_value] : SliceFields) { + if(field_name.data() == EMPTY) continue; if(DB->index_.count(field_name.ToString())) { std::string indexKey; AppendIndexKey(&indexKey, ParsedInternalIndexKey( @@ -146,7 +157,7 @@ void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, //优化:对于3.1,3.2中都有的索引只写一次 } - if(oldFields) delete oldFields; + // if(oldFields) delete oldFields; } @@ -168,18 +179,29 @@ void DeleteReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, std::string val_str; Status s = DB->kvDB_->Get(ReadOptions(), Key, &val_str); if (s.IsNotFound()) return; - FieldArray *Fields = new FieldArray; - ParseValue(val_str,Fields); + // FieldArray *Fields = new FieldArray; + // ParseValue(val_str,Fields); + FieldSliceArray Fields; + Slice nameSlice, valSlice; + Slice Value(val_str); + while(GetLengthPrefixedSlice(&Value, &nameSlice)) { + if(GetLengthPrefixedSlice(&Value, &valSlice)) { + Fields.push_back({nameSlice,valSlice}); + } else { + std::cout << "name and val not match! From FieldsReq Init" << std::endl; + } + nameSlice.clear(), valSlice.clear(); + } KVBatch.Delete(Slice(Key)); bool HasIndex = false; { // MutexLock L(&DB->index_mu); //互斥访问索引状态表 DB->index_mu.AssertHeld(); //1.将存在冲突的delete pend到对应的请求 - for(auto [field_name,field_value] : *Fields) { - if(field_name == EMPTY) break; - if(DB->index_.count(field_name)) { - auto [index_status,parent_req] = DB->index_[field_name]; + for(auto &[field_name,field_value] : Fields) { + if(field_name.data() == EMPTY) break; + if(DB->index_.count(field_name.ToString())) { + auto [index_status,parent_req] = DB->index_[field_name.ToString()]; if(index_status == IndexStatus::Creating || index_status == IndexStatus::Deleting) { parent_req->PendReq(this->parent); return; @@ -197,9 +219,9 @@ void DeleteReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, MKV.TransDelete(MetaKey); //meta中写入一个delete不需要value MetaBatch.Put(MetaKey, Slice()); //3.对于含有索引的field删除索引 - for(auto [field_name,field_value] : *Fields) { - if(field_name == EMPTY) continue; - if(DB->index_.count(field_name)) { + for(auto &[field_name,field_value] : Fields) { + if(field_name.data() == EMPTY) continue; + if(DB->index_.count(field_name.ToString())) { std::string indexKey; AppendIndexKey(&indexKey, ParsedInternalIndexKey( Key,field_name,field_value)); @@ -208,7 +230,7 @@ void DeleteReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, } } } - delete Fields; + // delete Fields; } /*******iCreateReq*******/ @@ -384,13 +406,14 @@ BatchReq::~BatchReq() { void BatchReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set &batchKeySet) { - WriteBatch Sub_KVBatch,Sub_IndexBatch,Sub_MetaBatch; + // WriteBatch Sub_KVBatch,Sub_IndexBatch,Sub_MetaBatch; std::unordered_set Sub_batchKeySet; //由于batch是有顺序的,根据我们现在的一个key只处理最开始的算法,这里需要反向迭代 uint64_t start_ = DB->env_->NowMicros(); for(auto subreq = sub_requests.rbegin(); subreq != sub_requests.rend(); subreq++ ) { uint64_t start_sub = DB->env_->NowMicros(); - (*subreq)->ConstructBatch(Sub_KVBatch, Sub_IndexBatch, Sub_MetaBatch, DB, Sub_batchKeySet); + // (*subreq)->ConstructBatch(Sub_KVBatch, Sub_IndexBatch, Sub_MetaBatch, DB, Sub_batchKeySet); + (*subreq)->ConstructBatch(KVBatch, IndexBatch, MetaBatch, DB, batchKeySet); DB->construct_BatchReq_perSub_elapsed += DB->env_->NowMicros() - start_sub; DB->count_Batch_Sub ++; //所有的对于pendreq的调用传入的参数被改成了this->parent,因此,对于subrequests来说, @@ -399,17 +422,17 @@ void BatchReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, return; } } - DB->construct_BatchReq_Sub_elapsed += DB->env_->NowMicros() - start_; - if(Sub_KVBatch.ApproximateSize() > 12) { - KVBatch.Append(Sub_KVBatch); - } - if(Sub_IndexBatch.ApproximateSize() > 12) { - IndexBatch.Append(Sub_IndexBatch); - } - if(Sub_MetaBatch.ApproximateSize() > 12) { - MetaBatch.Append(Sub_MetaBatch); - } - batchKeySet.insert(Sub_batchKeySet.begin(),Sub_batchKeySet.end()); + // DB->construct_BatchReq_Sub_elapsed += DB->env_->NowMicros() - start_; + // if(Sub_KVBatch.ApproximateSize() > 12) { + // KVBatch.Append(Sub_KVBatch); + // } + // if(Sub_IndexBatch.ApproximateSize() > 12) { + // IndexBatch.Append(Sub_IndexBatch); + // } + // if(Sub_MetaBatch.ApproximateSize() > 12) { + // MetaBatch.Append(Sub_MetaBatch); + // } + // batchKeySet.insert(Sub_batchKeySet.begin(),Sub_batchKeySet.end()); DB->construct_BatchReq_elapsed += DB->env_->NowMicros() - start_; }