From 0fed70d5c30a0de7177ba695b4c78db5cee4fa72 Mon Sep 17 00:00:00 2001 From: cyq <1056374449@qq.com> Date: Sat, 21 Dec 2024 21:32:18 +0800 Subject: [PATCH] =?UTF-8?q?Batchreq=E5=92=8CWrite=E6=8E=A5=E5=8F=A3?= =?UTF-8?q?=E7=9A=84=E5=AE=9E=E7=8E=B0=EF=BC=8C=E4=BD=86=E6=9C=AA=E5=AE=8C?= =?UTF-8?q?=E6=88=90=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- CMakeLists.txt | 2 +- fielddb/field_db.cpp | 8 +++++- fielddb/request.cpp | 73 +++++++++++++++++++++++++++++++++++++++++++++++++--- fielddb/request.h | 16 ++++++++++++ 4 files changed, 93 insertions(+), 6 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 9529fb0..fa7468f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -17,7 +17,7 @@ endif(NOT CMAKE_C_STANDARD) # C++ standard can be overridden when this is used as a sub-project. if(NOT CMAKE_CXX_STANDARD) # This project requires C++11. - set(CMAKE_CXX_STANDARD 11) + set(CMAKE_CXX_STANDARD 17) set(CMAKE_CXX_STANDARD_REQUIRED ON) set(CMAKE_CXX_EXTENSIONS OFF) endif(NOT CMAKE_CXX_STANDARD) diff --git a/fielddb/field_db.cpp b/fielddb/field_db.cpp index 0eea72e..8a710de 100644 --- a/fielddb/field_db.cpp +++ b/fielddb/field_db.cpp @@ -97,6 +97,7 @@ Status FieldDB::Recover() { Iter = metaDB_->NewIterator(ReadOptions()); Iter->SeekToFirst(); std::cout << "Iter Valid : " << Iter->Valid() << std::endl; + delete Iter; //3. 等待所有请求完成 return Status::OK(); } @@ -216,6 +217,10 @@ Status FieldDB::Delete(const WriteOptions &options, const Slice &key) { // TODO:根据updates里面的东西,要对是否需要更新index进行分别处理 Status FieldDB::Write(const WriteOptions &options, WriteBatch *updates) { //或许应该再做一个接口?或者基于现有的接口进行改造 + BatchReq req(updates,&mutex_); + Status status = HandleRequest(req); + return status; + assert(0); return Status::OK(); } //由于常规put将空串作为name,这里也需要适当修改 @@ -251,6 +256,7 @@ std::vector> FieldDB::FindKeysAndValByFieldN result.push_back(std::make_pair(iter->key().ToString(), val)); } } + delete iter; return result; } @@ -308,7 +314,7 @@ std::vector FieldDB::QueryByIndex(const Field &field, Status *s) { } break; } - + delete indexIterator; *s = Status::OK(); return result; } diff --git a/fielddb/request.cpp b/fielddb/request.cpp index d2c2d8b..5df5310 100644 --- a/fielddb/request.cpp +++ b/fielddb/request.cpp @@ -1,9 +1,12 @@ #include "fielddb/request.h" #include +#include #include +#include #include "leveldb/slice.h" #include "leveldb/status.h" #include "leveldb/write_batch.h" +#include "port/port_stdcxx.h" #include "util/mutexlock.h" #include "util/serialize_value.h" #include "fielddb/encode_index.h" @@ -74,7 +77,7 @@ void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, if(DB->index_.count(field_name)) { auto [index_status,parent_req] = DB->index_[field_name]; if(index_status == IndexStatus::Creating || index_status == IndexStatus::Deleting) { - parent_req->PendReq(this); + parent_req->PendReq(this->parent); return; } else if(index_status == IndexStatus::Exist) { HasIndex = true; @@ -89,7 +92,7 @@ void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, if(DB->index_.count(field_name)) { auto [index_status,parent_req] = DB->index_[field_name]; if(index_status == IndexStatus::Creating || index_status == IndexStatus::Deleting) { - parent_req->PendReq(this); + parent_req->PendReq(this->parent); return; } else if(index_status == IndexStatus::Exist) { HasOldIndex = true; @@ -172,7 +175,7 @@ void DeleteReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, if(DB->index_.count(field_name)) { auto [index_status,parent_req] = DB->index_[field_name]; if(index_status == IndexStatus::Creating || index_status == IndexStatus::Deleting) { - parent_req->PendReq(this); + parent_req->PendReq(this->parent); return; } else if(index_status == IndexStatus::Exist) { HasIndex = true; @@ -214,7 +217,7 @@ void iCreateReq::Prepare(FieldDB *DB) { s = Status::OK(); } else { //如果正在创建或删除,那么进行等待 - parent->PendReq(this); + parent->PendReq(this->parent); } return; } @@ -320,5 +323,67 @@ void iDeleteReq::Finalize(FieldDB *DB) { this->s = Status::OK(); } +BatchReq::BatchReq(WriteBatch *Batch,port::Mutex *mu): + Batch(Batch),Request(BatchReq_t, mu) { + + struct BatchHandler : WriteBatch::Handler { + void Put(const Slice &key, const Slice &value) override { + //为key和value构造存储的地方,防止由于string的析构造成可能得内存访问错误 + str_buf->push_back(key.ToString()); + fa_buf->push_back({{"",value.ToString()}}); + sub_requests->emplace_back(new FieldsReq(&str_buf->back(),&fa_buf->back(),mu)); + sub_requests->back()->parent = req; + } + void Delete(const Slice &key) override { + str_buf->push_back(key.ToString()); + sub_requests->emplace_back(new DeleteReq(&str_buf->back(),mu)); + sub_requests->back()->parent = req; + } + + BatchReq *req; + port::Mutex *mu; + std::deque *str_buf; + std::deque *fa_buf; + std::deque *sub_requests; + }; + + BatchHandler Handler; + Handler.req = this; + Handler.mu = mu; + Handler.str_buf = &str_buf; + Handler.fa_buf = &fa_buf; + Handler.sub_requests = &sub_requests; + + Batch->Iterate(&Handler); +} + +BatchReq::~BatchReq() { + while(!sub_requests.empty()) { + Request *req = sub_requests.front(); + sub_requests.pop_front(); + delete req; + } +} + +void BatchReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, + WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set &batchKeySet) +{ + WriteBatch Sub_KVBatch,Sub_IndexBatch,Sub_MetaBatch; + std::unordered_set Sub_batchKeySet; + //由于batch是有顺序的,根据我们现在的一个key只处理最开始的算法,这里需要反向迭代 + for(auto subreq = sub_requests.rbegin(); subreq != sub_requests.rend(); subreq++ ) { + (*subreq)->ConstructBatch(Sub_KVBatch, Sub_IndexBatch, Sub_MetaBatch, DB, Sub_batchKeySet); + //所有的对于pendreq的调用传入的参数被改成了this->parent,因此,对于subrequests来说, + //pendreq的传参为对应的Batchreq,因此,此处判断batchreq是否pending可以得到subreq是否有冲突 + if(isPending()) { + return; + } + } + KVBatch.Append(Sub_KVBatch); + IndexBatch.Append(Sub_IndexBatch); + MetaBatch.Append(Sub_MetaBatch); + batchKeySet.insert(batchKeySet.begin(),batchKeySet.end()); +} + } // namespace fielddb \ No newline at end of file diff --git a/fielddb/request.h b/fielddb/request.h index 1ebebfd..4813888 100644 --- a/fielddb/request.h +++ b/fielddb/request.h @@ -24,6 +24,7 @@ public: iCreateReq_t, iDeleteReq_t, DeleteReq_t, + BatchReq_t, }; public: @@ -41,6 +42,7 @@ public: inline bool isiCreateReq() { return type_ == iCreateReq_t; } inline bool isiDeleteReq() { return type_ == iDeleteReq_t; } inline bool isDeleteReq() { return type_ == DeleteReq_t; } + inline bool isBatchReq() { return type_ == BatchReq_t; } //用于含有Fields的 virtual void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, @@ -130,6 +132,20 @@ public: std::string *Key; }; +class BatchReq : public Request { +public: + BatchReq(WriteBatch *Batch,port::Mutex *mu); + ~BatchReq(); + + void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, + WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set &batchKeySet) override; + + WriteBatch *Batch; + std::deque sub_requests; + std::deque str_buf; + std::deque fa_buf; +}; + } #endif \ No newline at end of file