fielddb: run WriteBatch through BatchReq and implement FieldDB::Recover

Summary
  * Build with C++17 (request.cpp uses structured bindings).
  * ParseValue() fills a caller-owned FieldArray instead of returning a heap
    allocation.
  * MetaKV::TransPut/TransDelete write into owning std::string outputs.
  * FieldDB::Write wraps the WriteBatch into a BatchReq and runs it through
    HandleRequest().
  * FieldDB::Recover rebuilds index_ from indexDB_ and replays the requests
    that are still recorded in metaDB_.

Review fixes folded into this revision
  * Recover: advance the metaDB_ iterator (the loop never called Next()),
    decode the length-prefixed user key instead of replaying the raw
    remainder of the meta key, reject meta keys that are too short, read the
    index name from key(), and step over same-field entries that sort after
    the skip-seek target.
  * Write: drop the unreachable assert(0)/return after "return status".
  * BatchReq::ConstructBatch: merge Sub_batchKeySet into batchKeySet (the set
    was being inserted into itself).
  * BatchReq: initialise the Request base first; delete the copy operations
    because the object owns raw sub-request pointers.
  * FieldsReq/DeleteReq::ConstructBatch: free oldFields/Fields on the early
    returns after PendReq(); do not queue KVBatch.Delete before the conflict
    check (the existing Delete after the check is kept).
  * MetaKV::TransPut fills MetaValue again, so the caller keeps passing it.

Notes
  * Line breaks and indentation were reconstructed from a whitespace-collapsed
    copy. Context and removed lines are reproduced as found, including text
    the copy had already lost (e.g. the contents of angle brackets).
  * The three added standard headers in request.cpp are inferred from what
    the new code uses.
  * The "index" blob ids are those of the unamended patch.

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 9529fb0..fa7468f 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -17,7 +17,7 @@ endif(NOT CMAKE_C_STANDARD)
 # C++ standard can be overridden when this is used as a sub-project.
 if(NOT CMAKE_CXX_STANDARD)
-  # This project requires C++11.
-  set(CMAKE_CXX_STANDARD 11)
+  # This project requires C++17.
+  set(CMAKE_CXX_STANDARD 17)
   set(CMAKE_CXX_STANDARD_REQUIRED ON)
   set(CMAKE_CXX_EXTENSIONS OFF)
 endif(NOT CMAKE_CXX_STANDARD)
diff --git a/db/db_impl.cc b/db/db_impl.cc
index 49db131..6879b82 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -1169,7 +1169,7 @@ Status DBImpl::GetFields(const ReadOptions& options, const Slice& key,
                          FieldArray* fields) {
   std::string value;
   Status s = DBImpl::Get(options, key, &value);
-  *fields = *ParseValue(value);
+  ParseValue(value,fields);
   return s;
 }
 
diff --git a/fielddb/field_db.cpp b/fielddb/field_db.cpp
index 4c1cf81..c7e094e 100644
--- a/fielddb/field_db.cpp
+++ b/fielddb/field_db.cpp
@@ -6,10 +6,13 @@
 #include
 #include "leveldb/db.h"
 #include "leveldb/env.h"
+#include "leveldb/iterator.h"
 #include "leveldb/options.h"
+#include "leveldb/slice.h"
 #include "leveldb/status.h"
 #include "leveldb/write_batch.h"
 #include "db/write_batch_internal.h"
+#include "util/coding.h"
 #include "util/mutexlock.h"
 #include "util/serialize_value.h"
 #include "fielddb/encode_index.h"
@@ -53,7 +56,73 @@ Status FieldDB::OpenFieldDB(const Options& options,
 Status FieldDB::Recover() {
   //TODO:
   //1. 遍历所有Index类型的meta,重建内存中的index_状态表
+  // Rebuild the in-memory index_ table: register every distinct field name
+  // found in indexDB_ as an existing index.
+  Iterator *Iter = indexDB_->NewIterator(ReadOptions());
+  std::string IndexKey;
+  Iter->SeekToFirst();
+  while(Iter->Valid()) {
+    // NOTE(review): the name is parsed from key(), not value(), because the
+    // seek target built below lives in key space - confirm against the index
+    // key encoding.
+    IndexKey = Iter->key().ToString();
+    ParsedInternalIndexKey ParsedIndex;
+    ParseInternalIndexKey(Slice(IndexKey),&ParsedIndex);
+    index_[ParsedIndex.name_.ToString()] = {Exist,nullptr};
+    std::cout << "Existed Index : " << ParsedIndex.name_.ToString() << std::endl;
+
+    // Skip the remaining entries of this field: seek to the length-prefixed
+    // name followed by 0xff.
+    std::string Seek;
+    PutLengthPrefixedSlice(&Seek, ParsedIndex.name_);
+    const std::string NamePrefix = Seek;
+    Seek.push_back(0xff);
+    Iter->Seek(Slice(Seek));
+    // Keys of the same field whose next byte is 0xff sort at or after the
+    // seek target; step over them, otherwise the loop would land on the same
+    // field again and never end.
+    while(Iter->Valid() && Iter->key().starts_with(NamePrefix)) {
+      Iter->Next();
+    }
+  }
+  delete Iter;
   //2. 寻找所有KV类型的meta,再次提交一遍请求
+  // Replay every pending KV request that is still recorded in metaDB_.
+  Iter = metaDB_->NewIterator(ReadOptions());
+  Iter->SeekToFirst();
+  while (Iter->Valid()) {
+    // Meta keys are fixed32(MetaType) followed by the length-prefixed user
+    // key, as written by MetaKV::TransPut/TransDelete.
+    Slice MetaKey = Iter->key();
+    if(MetaKey.size() < 4) {
+      delete Iter;
+      return Status::Corruption("metaDB key is shorter than its MetaType header");
+    }
+    MetaType type = MetaType(DecodeFixed32(MetaKey.data()));
+    MetaKey.remove_prefix(4); // drop the MetaType header
+    Slice UserKey;
+    if(!GetLengthPrefixedSlice(&MetaKey, &UserKey)) {
+      delete Iter;
+      return Status::Corruption("metaDB key has a malformed user key");
+    }
+    if(type == KV_Creating) {
+      FieldArray fields;
+      ParseValue(Iter->value().ToString(), &fields);
+      PutFields(WriteOptions(), UserKey, fields);
+    } else if(type == KV_Deleting) {
+      Delete(WriteOptions(), UserKey);
+    } else {
+      assert(0 && "Invalid MetaType");
+    }
+    // The iterator only moves when told to; without this the loop never ends.
+    Iter->Next();
+  }
+  delete Iter;
+  // Once all replayed requests have completed, metaDB_ is expected to be empty.
+  Iter = metaDB_->NewIterator(ReadOptions());
+  Iter->SeekToFirst();
+  std::cout << "Iter Valid : " << Iter->Valid() << std::endl;
+  delete Iter;
   //3. 等待所有请求完成
   return Status::OK();
 }
@@ -173,6 +242,9 @@ Status FieldDB::Delete(const WriteOptions &options, const Slice &key) {
 // TODO:根据updates里面的东西,要对是否需要更新index进行分别处理
 Status FieldDB::Write(const WriteOptions &options, WriteBatch *updates) {
   //或许应该再做一个接口?或者基于现有的接口进行改造
-  return Status::OK();
+  // Execute the whole batch as a single BatchReq through HandleRequest().
+  BatchReq req(updates,&mutex_);
+  Status status = HandleRequest(req);
+  return status;
 }
 //由于常规put将空串作为name,这里也需要适当修改
@@ -208,6 +280,7 @@ std::vector> FieldDB::FindKeysAndValByFieldN
       result.push_back(std::make_pair(iter->key().ToString(), val));
     }
   }
+  delete iter;
   return result;
 }
 
@@ -265,7 +338,7 @@ std::vector FieldDB::QueryByIndex(const Field &field, Status *s) {
     }
     break;
   }
-
+  delete indexIterator;
   *s = Status::OK();
   return result;
 }
diff --git a/fielddb/meta.cpp b/fielddb/meta.cpp
index a02f585..13ee09d 100644
--- a/fielddb/meta.cpp
+++ b/fielddb/meta.cpp
@@ -22,22 +22,27 @@
 using namespace leveldb;
 
 //对于含有index field的put/delete的meta编码为 (KV|Key,Value)
-void MetaKV::TransPut(Slice &MetaKey,Slice &MetaValue) {
+// Encodes a put: MetaKey = fixed32(KV_Creating) | length-prefixed *name,
+// MetaValue = *value. Both outputs are owning strings, so nothing handed back
+// refers to storage local to this function.
+void MetaKV::TransPut(std::string &MetaKey,std::string &MetaValue) {
     MetaKey.clear();
     MetaValue.clear();
-    std::string buf;
+    // Encode straight into the caller's string; a Slice over a local buffer
+    // would dangle as soon as this function returned.
+    std::string &buf = MetaKey;
     PutFixed32(&buf, KV_Creating);
     PutLengthPrefixedSlice(&buf, Slice(*name));
-    MetaKey = Slice(buf);
-    MetaValue = Slice(*value);
+    // value defaults to nullptr in the constructor; keep MetaValue empty then.
+    if(value != nullptr) MetaValue = *value;
 }
 
-void MetaKV::TransDelete(Slice &MetaKey) {
+// Encodes a delete: MetaKey = fixed32(KV_Deleting) | length-prefixed *name.
+void MetaKV::TransDelete(std::string &MetaKey) {
     MetaKey.clear();
-    std::string buf;
+    std::string &buf = MetaKey;
     PutFixed32(&buf, KV_Deleting);
     PutLengthPrefixedSlice(&buf, Slice(*name));
-    MetaKey = Slice(buf);
 }
 
 class CleanerHandler : public WriteBatch::Handler {
diff --git a/fielddb/meta.h b/fielddb/meta.h
index 3dba911..2766e88 100644
--- a/fielddb/meta.h
+++ b/fielddb/meta.h
@@ -37,8 +37,8 @@ class MetaKV {
 public:
     MetaKV(std::string *field_name,std::string *field_value = nullptr):
         name(field_name),value(field_value) { }
-    void TransPut(Slice &MetaKey,Slice &MetaValue);
-    void TransDelete(Slice &MetaKey);
+    void TransPut(std::string &MetaKey,std::string &MetaValue);
+    void TransDelete(std::string &MetaKey);
 private:
     std::string *name;
     std::string *value;
diff --git a/fielddb/request.cpp b/fielddb/request.cpp
index d29d668..f08937b 100644
--- a/fielddb/request.cpp
+++ b/fielddb/request.cpp
@@ -1,8 +1,12 @@
 #include "fielddb/request.h"
 #include
+#include <deque>
+#include <string>
+#include <unordered_set>
 #include "leveldb/slice.h"
 #include "leveldb/status.h"
 #include "leveldb/write_batch.h"
+#include "port/port_stdcxx.h"
 #include "util/mutexlock.h"
 #include "util/serialize_value.h"
 #include "fielddb/encode_index.h"
@@ -56,7 +60,8 @@ void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
     if (s.IsNotFound()){
         oldFields = nullptr;
     } else if (s.ok()) { //得到数据库之前key的fields, 判断需不需要删除其中潜在的索引
-        oldFields = ParseValue(val_str);
+        oldFields = new FieldArray;
+        oldFields = ParseValue(val_str,oldFields);
     } else {
         assert(0);
     }
@@ -72,7 +77,8 @@ void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
             if(DB->index_.count(field_name)) {
                 auto [index_status,parent_req] = DB->index_[field_name];
                 if(index_status == IndexStatus::Creating || index_status == IndexStatus::Deleting) {
-                    parent_req->PendReq(this);
+                    parent_req->PendReq(this->parent);
+                    delete oldFields; // allocated by this call; do not leak it on the early return
                     return;
                 } else if(index_status == IndexStatus::Exist) {
                     HasIndex = true;
@@ -87,7 +93,8 @@ void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
             if(DB->index_.count(field_name)) {
                 auto [index_status,parent_req] = DB->index_[field_name];
                 if(index_status == IndexStatus::Creating || index_status == IndexStatus::Deleting) {
-                    parent_req->PendReq(this);
+                    parent_req->PendReq(this->parent);
+                    delete oldFields; // allocated by this call; do not leak it on the early return
                     return;
                 } else if(index_status == IndexStatus::Exist) {
                     HasOldIndex = true;
@@ -100,11 +107,11 @@ void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
     KVBatch.Put(Slice(*Key), Slice(SerializeValue(*Fields)));
     //2.对于没有冲突但含有索引操作的put,构建metaKV,这里直接将KV对简单编码后写入metaDB
     if(HasIndex || HasOldIndex) {
-        Slice MetaKey,MetaValue;
+        std::string MetaKey,MetaValue;
         std::string serialized = SerializeValue(*Fields);
         MetaKV MKV = MetaKV(Key,&serialized);
         MKV.TransPut(MetaKey, MetaValue);
         MetaBatch.Put(MetaKey, MetaValue);
 
 
         //3.1对于含有索引的oldfield删除索引
@@ -136,6 +143,8 @@ void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
         }
         //优化:对于3.1,3.2中都有的索引只写一次
     }
+
+    delete oldFields; // deleting a null pointer is a no-op
 }
 
 
@@ -157,7 +166,8 @@ void DeleteReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
     std::string val_str;
     Status s = DB->kvDB_->Get(ReadOptions(), *Key, &val_str);
     if (s.IsNotFound()) return;
-    FieldArray *Fields = ParseValue(val_str);
+    FieldArray *Fields = new FieldArray;
+    ParseValue(val_str,Fields);
     bool HasIndex = false;
     {
         // MutexLock L(&DB->index_mu); //互斥访问索引状态表
@@ -168,7 +178,8 @@ void DeleteReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
             if(DB->index_.count(field_name)) {
                 auto [index_status,parent_req] = DB->index_[field_name];
                 if(index_status == IndexStatus::Creating || index_status == IndexStatus::Deleting) {
-                    parent_req->PendReq(this);
+                    parent_req->PendReq(this->parent);
+                    delete Fields; // allocated by this call; do not leak it on the early return
                     return;
                 } else if(index_status == IndexStatus::Exist) {
                     HasIndex = true;
@@ -179,7 +190,7 @@ void DeleteReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
     KVBatch.Delete(Slice(*Key));
     //2.对于没有冲突但含有索引操作的delete,构建metaKV,这里直接将KV对简单编码后写入metaDB
     if(HasIndex) {
-        Slice MetaKey;
+        std::string MetaKey;
         MetaKV MKV = MetaKV(Key);
         MKV.TransDelete(MetaKey); //meta中写入一个delete不需要value
         MetaBatch.Put(MetaKey, Slice());
@@ -195,6 +206,7 @@ void DeleteReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
             }
         }
     }
+    delete Fields;
 }
 
 /*******iCreateReq*******/
@@ -210,7 +222,7 @@ void iCreateReq::Prepare(FieldDB *DB) {
             s = Status::OK();
         } else {
             //如果正在创建或删除,那么进行等待
-            parent->PendReq(this);
+            parent->PendReq(this->parent);
         }
         return;
     }
@@ -317,5 +329,76 @@ void iDeleteReq::Finalize(FieldDB *DB) {
     this->s = Status::OK();
 }
 
+// Splits a WriteBatch into one sub-request per operation (FieldsReq for Put,
+// DeleteReq for Delete). The sub-requests point into str_buf/fa_buf, which
+// hold the copied keys and values for as long as the BatchReq exists.
+BatchReq::BatchReq(WriteBatch *Batch,port::Mutex *mu):
+    Request(BatchReq_t, mu),Batch(Batch) {
+
+    struct BatchHandler : WriteBatch::Handler {
+        void Put(const Slice &key, const Slice &value) override {
+            // Copy key and value into owned storage so the sub-request never
+            // refers to a string that has already been destroyed.
+            str_buf->push_back(key.ToString());
+            fa_buf->push_back({{"",value.ToString()}});
+            sub_requests->emplace_back(new FieldsReq(&str_buf->back(),&fa_buf->back(),mu));
+            sub_requests->back()->parent = req;
+        }
+        void Delete(const Slice &key) override {
+            str_buf->push_back(key.ToString());
+            sub_requests->emplace_back(new DeleteReq(&str_buf->back(),mu));
+            sub_requests->back()->parent = req;
+        }
+
+        BatchReq *req;
+        port::Mutex *mu;
+        std::deque<std::string> *str_buf;
+        std::deque<FieldArray> *fa_buf;
+        std::deque<Request*> *sub_requests;
+    };
+
+    BatchHandler Handler;
+    Handler.req = this;
+    Handler.mu = mu;
+    Handler.str_buf = &str_buf;
+    Handler.fa_buf = &fa_buf;
+    Handler.sub_requests = &sub_requests;
+
+    Batch->Iterate(&Handler);
+}
+
+// Releases the sub-requests created by the constructor.
+BatchReq::~BatchReq() {
+    while(!sub_requests.empty()) {
+        Request *req = sub_requests.front();
+        sub_requests.pop_front();
+        delete req;
+    }
+}
+
+// Builds the batches of all sub-requests and appends them to the caller's
+// batches only if none of the sub-requests left this BatchReq pending.
+void BatchReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
+    WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set<std::string> &batchKeySet)
+{
+    WriteBatch Sub_KVBatch,Sub_IndexBatch,Sub_MetaBatch;
+    std::unordered_set<std::string> Sub_batchKeySet;
+    // A batch is ordered, and for each key only the first request seen is
+    // processed, so walk the sub-requests from last to first.
+    for(auto subreq = sub_requests.rbegin(); subreq != sub_requests.rend(); ++subreq ) {
+        (*subreq)->ConstructBatch(Sub_KVBatch, Sub_IndexBatch, Sub_MetaBatch, DB, Sub_batchKeySet);
+        // Sub-requests pass this->parent (this BatchReq) to PendReq(), so the
+        // batch being pending means one of its sub-requests hit a conflict.
+        if(isPending()) {
+            return;
+        }
+    }
+    KVBatch.Append(Sub_KVBatch);
+    IndexBatch.Append(Sub_IndexBatch);
+    MetaBatch.Append(Sub_MetaBatch);
+    // Publish the keys touched by the sub-requests to the caller's key set.
+    batchKeySet.insert(Sub_batchKeySet.begin(),Sub_batchKeySet.end());
+}
+
 
 } // namespace fielddb
\ No newline at end of file
diff --git a/fielddb/request.h b/fielddb/request.h
index e1ab1e8..de391e0 100644
--- a/fielddb/request.h
+++ b/fielddb/request.h
@@ -24,6 +24,7 @@ public:
         iCreateReq_t,
         iDeleteReq_t,
         DeleteReq_t,
+        BatchReq_t,
     };
 
 public:
@@ -41,6 +42,7 @@ public:
     inline bool isiCreateReq() { return type_ == iCreateReq_t; }
     inline bool isiDeleteReq() { return type_ == iDeleteReq_t; }
     inline bool isDeleteReq() { return type_ == DeleteReq_t; }
+    inline bool isBatchReq() { return type_ == BatchReq_t; }
 
     //用于含有Fields的
     virtual void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
@@ -130,6 +132,24 @@ public:
     std::string *Key;
 };
 
+// A WriteBatch executed as one request made of per-operation sub-requests.
+class BatchReq : public Request {
+public:
+    BatchReq(WriteBatch *Batch,port::Mutex *mu);
+    ~BatchReq();
+    // Owns the raw sub_requests pointers, so copying would double-delete them.
+    BatchReq(const BatchReq &) = delete;
+    BatchReq &operator=(const BatchReq &) = delete;
+
+    void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
+        WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set<std::string> &batchKeySet) override;
+
+    WriteBatch *Batch;
+    std::deque<Request*> sub_requests; // owned; deleted in the destructor
+    std::deque<std::string> str_buf;   // keys referenced by the sub-requests
+    std::deque<FieldArray> fa_buf;     // field arrays referenced by the sub-requests
+};
+
 }
 
 #endif
\ No newline at end of file
diff --git a/util/serialize_value.cc b/util/serialize_value.cc
index 562360b..88aa844 100644
--- a/util/serialize_value.cc
+++ b/util/serialize_value.cc
@@ -20,9 +20,10 @@ std::string SerializeValue(const FieldArray& fields){
     return result;
 }
 
-FieldArray *ParseValue(const std::string& value_str){
+FieldArray *ParseValue(const std::string& value_str,FieldArray *fields){
     Slice valueSlice(value_str);
-    FieldArray *res = new FieldArray;
+    // Parse into the caller-owned array instead of allocating a new one.
+    FieldArray *res = fields;
     Slice nameSlice = Slice();
     Slice valSlice = Slice();
     std::string nameStr;
diff --git a/util/serialize_value.h b/util/serialize_value.h
index b769fb8..a337bc6 100644
--- a/util/serialize_value.h
+++ b/util/serialize_value.h
@@ -12,7 +12,7 @@ using Field = std::pair; // field_name:field_value
 using FieldArray = std::vector>;
 
 std::string SerializeValue(const FieldArray& fields);
-FieldArray *ParseValue(const std::string& value_str);
+FieldArray *ParseValue(const std::string& value_str, FieldArray *fields);
 
 class InternalFieldArray {
 public: