瀏覽代碼

Batchreq和Write接口的实现,但未完成测试

gyf
cyq 8 月之前
父節點
當前提交
0fed70d5c3
共有 4 個文件被更改,包括 93 次插入6 次删除
  1. +1
    -1
      CMakeLists.txt
  2. +7
    -1
      fielddb/field_db.cpp
  3. +69
    -4
      fielddb/request.cpp
  4. +16
    -0
      fielddb/request.h

+ 1
- 1
CMakeLists.txt 查看文件

@ -17,7 +17,7 @@ endif(NOT CMAKE_C_STANDARD)
# C++ standard can be overridden when this is used as a sub-project. # C++ standard can be overridden when this is used as a sub-project.
if(NOT CMAKE_CXX_STANDARD) if(NOT CMAKE_CXX_STANDARD)
# This project requires C++11. # This project requires C++11.
set(CMAKE_CXX_STANDARD 11)
set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON) set(CMAKE_CXX_STANDARD_REQUIRED ON)
set(CMAKE_CXX_EXTENSIONS OFF) set(CMAKE_CXX_EXTENSIONS OFF)
endif(NOT CMAKE_CXX_STANDARD) endif(NOT CMAKE_CXX_STANDARD)

+ 7
- 1
fielddb/field_db.cpp 查看文件

@ -97,6 +97,7 @@ Status FieldDB::Recover() {
Iter = metaDB_->NewIterator(ReadOptions()); Iter = metaDB_->NewIterator(ReadOptions());
Iter->SeekToFirst(); Iter->SeekToFirst();
std::cout << "Iter Valid : " << Iter->Valid() << std::endl; std::cout << "Iter Valid : " << Iter->Valid() << std::endl;
delete Iter;
//3. 等待所有请求完成 //3. 等待所有请求完成
return Status::OK(); return Status::OK();
} }
@ -216,6 +217,10 @@ Status FieldDB::Delete(const WriteOptions &options, const Slice &key) {
// TODO:根据updates里面的东西,要对是否需要更新index进行分别处理 // TODO:根据updates里面的东西,要对是否需要更新index进行分别处理
Status FieldDB::Write(const WriteOptions &options, WriteBatch *updates) { Status FieldDB::Write(const WriteOptions &options, WriteBatch *updates) {
//或许应该再做一个接口?或者基于现有的接口进行改造 //或许应该再做一个接口?或者基于现有的接口进行改造
BatchReq req(updates,&mutex_);
Status status = HandleRequest(req);
return status;
assert(0);
return Status::OK(); return Status::OK();
} }
//由于常规put将空串作为name,这里也需要适当修改 //由于常规put将空串作为name,这里也需要适当修改
@ -251,6 +256,7 @@ std::vector> FieldDB::FindKeysAndValByFieldN
result.push_back(std::make_pair(iter->key().ToString(), val)); result.push_back(std::make_pair(iter->key().ToString(), val));
} }
} }
delete iter;
return result; return result;
} }
@ -308,7 +314,7 @@ std::vector FieldDB::QueryByIndex(const Field &field, Status *s) {
} }
break; break;
} }
delete indexIterator;
*s = Status::OK(); *s = Status::OK();
return result; return result;
} }

+ 69
- 4
fielddb/request.cpp 查看文件

@ -1,9 +1,12 @@
#include "fielddb/request.h" #include "fielddb/request.h"
#include <cassert> #include <cassert>
#include <deque>
#include <string> #include <string>
#include <unordered_set>
#include "leveldb/slice.h" #include "leveldb/slice.h"
#include "leveldb/status.h" #include "leveldb/status.h"
#include "leveldb/write_batch.h" #include "leveldb/write_batch.h"
#include "port/port_stdcxx.h"
#include "util/mutexlock.h" #include "util/mutexlock.h"
#include "util/serialize_value.h" #include "util/serialize_value.h"
#include "fielddb/encode_index.h" #include "fielddb/encode_index.h"
@ -74,7 +77,7 @@ void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
if(DB->index_.count(field_name)) { if(DB->index_.count(field_name)) {
auto [index_status,parent_req] = DB->index_[field_name]; auto [index_status,parent_req] = DB->index_[field_name];
if(index_status == IndexStatus::Creating || index_status == IndexStatus::Deleting) { if(index_status == IndexStatus::Creating || index_status == IndexStatus::Deleting) {
parent_req->PendReq(this);
parent_req->PendReq(this->parent);
return; return;
} else if(index_status == IndexStatus::Exist) { } else if(index_status == IndexStatus::Exist) {
HasIndex = true; HasIndex = true;
@ -89,7 +92,7 @@ void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
if(DB->index_.count(field_name)) { if(DB->index_.count(field_name)) {
auto [index_status,parent_req] = DB->index_[field_name]; auto [index_status,parent_req] = DB->index_[field_name];
if(index_status == IndexStatus::Creating || index_status == IndexStatus::Deleting) { if(index_status == IndexStatus::Creating || index_status == IndexStatus::Deleting) {
parent_req->PendReq(this);
parent_req->PendReq(this->parent);
return; return;
} else if(index_status == IndexStatus::Exist) { } else if(index_status == IndexStatus::Exist) {
HasOldIndex = true; HasOldIndex = true;
@ -172,7 +175,7 @@ void DeleteReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
if(DB->index_.count(field_name)) { if(DB->index_.count(field_name)) {
auto [index_status,parent_req] = DB->index_[field_name]; auto [index_status,parent_req] = DB->index_[field_name];
if(index_status == IndexStatus::Creating || index_status == IndexStatus::Deleting) { if(index_status == IndexStatus::Creating || index_status == IndexStatus::Deleting) {
parent_req->PendReq(this);
parent_req->PendReq(this->parent);
return; return;
} else if(index_status == IndexStatus::Exist) { } else if(index_status == IndexStatus::Exist) {
HasIndex = true; HasIndex = true;
@ -214,7 +217,7 @@ void iCreateReq::Prepare(FieldDB *DB) {
s = Status::OK(); s = Status::OK();
} else { } else {
//如果正在创建或删除,那么进行等待 //如果正在创建或删除,那么进行等待
parent->PendReq(this);
parent->PendReq(this->parent);
} }
return; return;
} }
@ -320,5 +323,67 @@ void iDeleteReq::Finalize(FieldDB *DB) {
this->s = Status::OK(); this->s = Status::OK();
} }
BatchReq::BatchReq(WriteBatch *Batch,port::Mutex *mu):
Batch(Batch),Request(BatchReq_t, mu) {
struct BatchHandler : WriteBatch::Handler {
void Put(const Slice &key, const Slice &value) override {
//为key和value构造存储的地方,防止由于string的析构造成可能得内存访问错误
str_buf->push_back(key.ToString());
fa_buf->push_back({{"",value.ToString()}});
sub_requests->emplace_back(new FieldsReq(&str_buf->back(),&fa_buf->back(),mu));
sub_requests->back()->parent = req;
}
void Delete(const Slice &key) override {
str_buf->push_back(key.ToString());
sub_requests->emplace_back(new DeleteReq(&str_buf->back(),mu));
sub_requests->back()->parent = req;
}
BatchReq *req;
port::Mutex *mu;
std::deque<std::string> *str_buf;
std::deque<FieldArray> *fa_buf;
std::deque<Request*> *sub_requests;
};
BatchHandler Handler;
Handler.req = this;
Handler.mu = mu;
Handler.str_buf = &str_buf;
Handler.fa_buf = &fa_buf;
Handler.sub_requests = &sub_requests;
Batch->Iterate(&Handler);
}
BatchReq::~BatchReq() {
while(!sub_requests.empty()) {
Request *req = sub_requests.front();
sub_requests.pop_front();
delete req;
}
}
void BatchReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set<std::string *> &batchKeySet)
{
WriteBatch Sub_KVBatch,Sub_IndexBatch,Sub_MetaBatch;
std::unordered_set<std::string *> Sub_batchKeySet;
//由于batch是有顺序的,根据我们现在的一个key只处理最开始的算法,这里需要反向迭代
for(auto subreq = sub_requests.rbegin(); subreq != sub_requests.rend(); subreq++ ) {
(*subreq)->ConstructBatch(Sub_KVBatch, Sub_IndexBatch, Sub_MetaBatch, DB, Sub_batchKeySet);
//所有的对于pendreq的调用传入的参数被改成了this->parent,因此,对于subrequests来说,
//pendreq的传参为对应的Batchreq,因此,此处判断batchreq是否pending可以得到subreq是否有冲突
if(isPending()) {
return;
}
}
KVBatch.Append(Sub_KVBatch);
IndexBatch.Append(Sub_IndexBatch);
MetaBatch.Append(Sub_MetaBatch);
batchKeySet.insert(batchKeySet.begin(),batchKeySet.end());
}
} // namespace fielddb } // namespace fielddb

+ 16
- 0
fielddb/request.h 查看文件

@ -24,6 +24,7 @@ public:
iCreateReq_t, iCreateReq_t,
iDeleteReq_t, iDeleteReq_t,
DeleteReq_t, DeleteReq_t,
BatchReq_t,
}; };
public: public:
@ -41,6 +42,7 @@ public:
inline bool isiCreateReq() { return type_ == iCreateReq_t; } inline bool isiCreateReq() { return type_ == iCreateReq_t; }
inline bool isiDeleteReq() { return type_ == iDeleteReq_t; } inline bool isiDeleteReq() { return type_ == iDeleteReq_t; }
inline bool isDeleteReq() { return type_ == DeleteReq_t; } inline bool isDeleteReq() { return type_ == DeleteReq_t; }
inline bool isBatchReq() { return type_ == BatchReq_t; }
//Fields的 //Fields的
virtual void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, virtual void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
@ -130,6 +132,20 @@ public:
std::string *Key; std::string *Key;
}; };
class BatchReq : public Request {
public:
BatchReq(WriteBatch *Batch,port::Mutex *mu);
~BatchReq();
void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set<std::string *> &batchKeySet) override;
WriteBatch *Batch;
std::deque<Request *> sub_requests;
std::deque<std::string> str_buf;
std::deque<FieldArray> fa_buf;
};
} }
#endif #endif

Loading…
取消
儲存