diff --git a/benchmarks/db_bench_FieldDB.cc b/benchmarks/db_bench_FieldDB.cc index 3abf7c5..0471ee5 100644 --- a/benchmarks/db_bench_FieldDB.cc +++ b/benchmarks/db_bench_FieldDB.cc @@ -870,7 +870,7 @@ class Benchmark { {"age", age}, {"tag", tag} }; - Slice value = SerializeValue(fields); + std::string value = SerializeValue(fields); batch.Put(key.slice(), value); bytes += value.size() + key.slice().size(); thread->stats.FinishedSingleOp(); diff --git a/fielddb/field_db.cpp b/fielddb/field_db.cpp index 8a5abbd..a9ad365 100644 --- a/fielddb/field_db.cpp +++ b/fielddb/field_db.cpp @@ -2,13 +2,10 @@ #include #include #include -#include #include #include #include #include -#include "leveldb/c.h" -#include "leveldb/cache.h" #include "leveldb/db.h" #include "leveldb/env.h" #include "leveldb/iterator.h" @@ -16,7 +13,6 @@ #include "leveldb/slice.h" #include "leveldb/status.h" #include "leveldb/write_batch.h" -#include "db/write_batch_internal.h" #include "util/coding.h" #include "util/mutexlock.h" #include "util/serialize_value.h" @@ -234,10 +230,10 @@ Status FieldDB::Put(const WriteOptions &options, const Slice &key, const Slice & // 需要对是否进行index更新做处理 Status FieldDB::PutFields(const WriteOptions &Options, const Slice &key, const FieldArray &fields) { - std::string key_ = key.ToString(); - FieldArray fields_ = fields; + // std::string key_ = key.ToString(); + // FieldArray fields_ = fields; - FieldsReq req(&key_,&fields_,&mutex_); + FieldsReq req(key,fields,&mutex_); Status status = HandleRequest(req, Options); return status; @@ -246,8 +242,8 @@ Status FieldDB::PutFields(const WriteOptions &Options, // 删除有索引的key时indexdb也要同步 Status FieldDB::Delete(const WriteOptions &options, const Slice &key) { - std::string key_ = key.ToString(); - DeleteReq req(&key_,&mutex_); + // std::string key_ = key.ToString(); + DeleteReq req(key,&mutex_); Status status = HandleRequest(req, options); return status; } @@ -289,15 +285,15 @@ std::vector FieldDB::FindKeysByField(Field &field) { } std::vector> FieldDB::FindKeysAndValByFieldName ( - const std::string &fieldName){ + const Slice fieldName){ std::vector> result; auto iter = kvDB_->NewIterator(ReadOptions()); - std::string val; + Slice val; for(iter->SeekToFirst();iter->Valid();iter->Next()) { InternalFieldArray fields(iter->value()); - val = fields.ValOfName(fieldName); + val = fields.ValOfName(fieldName.ToString()); if(!val.empty()) { - result.push_back(std::make_pair(iter->key().ToString(), val)); + result.push_back(std::make_pair(iter->key().ToString(), val.ToString())); } } delete iter; @@ -305,8 +301,9 @@ std::vector> FieldDB::FindKeysAndValByFieldN } Status FieldDB::CreateIndexOnField(const std::string& field_name, const WriteOptions &op) { - std::string Field = field_name; - iCreateReq req(&Field,&mutex_); + // std::string Field = field_name; + // iCreateReq req(&Field,&mutex_); + iCreateReq req(field_name,&mutex_); HandleRequest(req, op); //如果已经存在索引,那么直接返回 if(req.Existed) { @@ -321,8 +318,8 @@ Status FieldDB::CreateIndexOnField(const std::string& field_name, const WriteOpt } Status FieldDB::DeleteIndex(const std::string &field_name, const WriteOptions &op) { - std::string Field = field_name; - iDeleteReq req(&Field,&mutex_); + // std::string Field = field_name; + iDeleteReq req(field_name,&mutex_); HandleRequest(req, op); //如果已经被删除或者不存在,那么可以直接返回 if(req.Deleted) { diff --git a/fielddb/field_db.h b/fielddb/field_db.h index 1135635..5b7973d 100644 --- a/fielddb/field_db.h +++ b/fielddb/field_db.h @@ -85,7 +85,7 @@ private: std::deque taskqueue_; std::vector> FindKeysAndValByFieldName ( - const std::string &fieldName); + const Slice fieldName); /*For request handling*/ Status HandleRequest(Request &req, const WriteOptions &op); //每个请求自行构造请求后交由这个函数处理 diff --git a/fielddb/meta.cpp b/fielddb/meta.cpp index 11e1241..08e9ebb 100644 --- a/fielddb/meta.cpp +++ b/fielddb/meta.cpp @@ -29,7 +29,7 @@ void MetaKV::TransPut(std::string &MetaKey,std::string &MetaValue) { //但是slice中的指针指向的是析构的string对象的部分内存 std::string &buf = MetaKey; PutFixed32(&buf, KV_Creating); - PutLengthPrefixedSlice(&buf, Slice(*name)); + PutLengthPrefixedSlice(&buf, Slice(name)); // MetaKey = Slice(buf); // MetaValue = Slice(*value); } @@ -38,7 +38,7 @@ void MetaKV::TransDelete(std::string &MetaKey) { MetaKey.clear(); std::string &buf = MetaKey; PutFixed32(&buf, KV_Deleting); - PutLengthPrefixedSlice(&buf, Slice(*name)); + PutLengthPrefixedSlice(&buf, Slice(name)); // MetaKey = Slice(buf); } diff --git a/fielddb/meta.h b/fielddb/meta.h index 2766e88..c2f04cc 100644 --- a/fielddb/meta.h +++ b/fielddb/meta.h @@ -35,13 +35,13 @@ enum MetaType { //将一对(field_name,field_value)转换到metaDB中的KV表示 class MetaKV { public: - MetaKV(std::string *field_name,std::string *field_value = nullptr): + MetaKV(Slice field_name,Slice field_value = Slice()): name(field_name),value(field_value) { } void TransPut(std::string &MetaKey,std::string &MetaValue); void TransDelete(std::string &MetaKey); private: - std::string *name; - std::string *value; + Slice name; + Slice value; }; class MetaCleaner { diff --git a/fielddb/request.cpp b/fielddb/request.cpp index c08c58a..9c30f1d 100644 --- a/fielddb/request.cpp +++ b/fielddb/request.cpp @@ -50,15 +50,15 @@ void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, WriteBatch &MetaBatch,fielddb::FieldDB *DB, std::unordered_set &batchKeySet) { - if (batchKeySet.find(*Key) != batchKeySet.end()){ + if (batchKeySet.find(Key.ToString()) != batchKeySet.end()){ return;//并发的被合并的put/delete请求只处理一次 } else { - batchKeySet.insert(*Key); + batchKeySet.insert(Key.ToString()); } std::string val_str; Status s = Status::NotFound("test"); uint64_t start_ = DB->env_->NowMicros(); - s = DB->kvDB_->Get(ReadOptions(), *Key, &val_str); + s = DB->kvDB_->Get(ReadOptions(), Key.ToString(), &val_str); DB->construct_FieldsReq_Read_elapsed += DB->env_->NowMicros() - start_; FieldArray *oldFields; if (s.IsNotFound()){ @@ -76,10 +76,10 @@ void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, // MutexLock L(&DB->index_mu); //互斥访问索引状态表 DB->index_mu.AssertHeld(); //1.将存在冲突的put pend到对应的请求 - for(auto [field_name,field_value] : *Fields) { + for(auto [field_name,field_value] : SliceFields) { if(field_name == EMPTY) break; - if(DB->index_.count(field_name)) { - auto [index_status,parent_req] = DB->index_[field_name]; + if(DB->index_.count(field_name.ToString())) { + auto [index_status,parent_req] = DB->index_[field_name.ToString()]; if(index_status == IndexStatus::Creating || index_status == IndexStatus::Deleting) { parent_req->PendReq(this->parent); return; @@ -105,13 +105,13 @@ void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, } } } - - KVBatch.Put(Slice(*Key), Slice(SerializeValue(*Fields))); + std::string scrach = SerializeValue(SliceFields); + KVBatch.Put(Slice(Key), Slice(scrach)); //2.对于没有冲突但含有索引操作的put,构建metaKV,这里直接将KV对简单编码后写入metaDB if(HasIndex || HasOldIndex) { std::string MetaKey,MetaValue; - std::string serialized = SerializeValue(*Fields); - MetaKV MKV = MetaKV(Key,&serialized); + std::string serialized = SerializeValue(SliceFields); + MetaKV MKV = MetaKV(Key,serialized); MKV.TransPut(MetaKey, MetaValue); MetaBatch.Put(MetaKey, serialized); @@ -123,7 +123,7 @@ void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, if(DB->index_.count(field_name)) { std::string indexKey; AppendIndexKey(&indexKey, ParsedInternalIndexKey( - *Key,field_name,field_value)); + Key,field_name,field_value)); IndexBatch.Delete(indexKey); } } @@ -131,12 +131,12 @@ void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, //3.2对于含有索引的field建立索引 if (HasIndex) { - for(auto [field_name,field_value] : *Fields) { + for(auto [field_name,field_value] : SliceFields) { if(field_name == EMPTY) continue; - if(DB->index_.count(field_name)) { + if(DB->index_.count(field_name.ToString())) { std::string indexKey; AppendIndexKey(&indexKey, ParsedInternalIndexKey( - *Key,field_name,field_value)); + Key,field_name,field_value)); IndexBatch.Put(indexKey, Slice()); } } @@ -155,10 +155,10 @@ void DeleteReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, WriteBatch &MetaBatch,fielddb::FieldDB *DB, std::unordered_set &batchKeySet) { - if (batchKeySet.find(*Key) != batchKeySet.end()){ + if (batchKeySet.find(Key.ToString()) != batchKeySet.end()){ return;//并发的被合并的put/delete请求只处理一次 } else { - batchKeySet.insert(*Key); + batchKeySet.insert(Key.ToString()); } //1. 读取当前的最新的键值对,判断是否存在含有键值对的field //2.1 如果无,则正常构造delete @@ -166,11 +166,11 @@ void DeleteReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, //在kvDB和metaDB中写入对应的delete //2.3 如果存在field的索引状态是Creating或者Deleting,那么在那个队列上面进行等待 std::string val_str; - Status s = DB->kvDB_->Get(ReadOptions(), *Key, &val_str); + Status s = DB->kvDB_->Get(ReadOptions(), Key, &val_str); if (s.IsNotFound()) return; FieldArray *Fields = new FieldArray; ParseValue(val_str,Fields); - KVBatch.Delete(Slice(*Key)); + KVBatch.Delete(Slice(Key)); bool HasIndex = false; { // MutexLock L(&DB->index_mu); //互斥访问索引状态表 @@ -189,7 +189,7 @@ void DeleteReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, //assert(0); } } - KVBatch.Delete(Slice(*Key)); + KVBatch.Delete(Slice(Key)); //2.对于没有冲突但含有索引操作的delete,构建metaKV,这里直接将KV对简单编码后写入metaDB if(HasIndex) { std::string MetaKey; @@ -202,7 +202,7 @@ void DeleteReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, if(DB->index_.count(field_name)) { std::string indexKey; AppendIndexKey(&indexKey, ParsedInternalIndexKey( - *Key,field_name,field_value)); + Key,field_name,field_value)); IndexBatch.Delete(indexKey); } } @@ -215,8 +215,8 @@ void DeleteReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, void iCreateReq::Prepare(FieldDB *DB) { //在index_中完成索引状态更新,在这里可以避免重复创建 DB->index_mu.AssertHeld(); - if(DB->index_.count(*Field)) { - auto [istatus,parent] = DB->index_[*Field]; + if(DB->index_.count(Field.ToString())) { + auto [istatus,parent] = DB->index_[Field.ToString()]; if(istatus == IndexStatus::Exist) { //如果已经完成建立索引,则返回成功 done = true; @@ -230,7 +230,7 @@ void iCreateReq::Prepare(FieldDB *DB) { } //如果索引状态表中没有,则表示尚未创建,更新相应的状态 //这里将done设置为true表示在taskqueue中需要完成的部分已经完成,不需要pend - DB->index_[*Field] = {IndexStatus::Creating,this}; + DB->index_[Field.ToString()] = {IndexStatus::Creating,this}; done = true; } @@ -246,12 +246,12 @@ void iCreateReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, //遍历数据库,构建二级索引到indexbatch,(更新metaDB中的元数据为Index类型的(Field,Creating)) //一个indexwritebatch写入,那么索引创建删除应该和metadb没有交互 std::vector> keysAndVal = - DB->FindKeysAndValByFieldName(*Field); + DB->FindKeysAndValByFieldName(Field.ToString()); Slice value = Slice(); for (auto &kvPair : keysAndVal){ std::string indexKey; AppendIndexKey(&indexKey, - ParsedInternalIndexKey(kvPair.first, *Field, kvPair.second)); + ParsedInternalIndexKey(kvPair.first, Field, kvPair.second)); IndexBatch.Put(indexKey, value); } } @@ -259,7 +259,7 @@ void iCreateReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, void iCreateReq::Finalize(FieldDB *DB) { //1. 写入完成后,更新index状态表,(并将metaDB的值改为Index类型的(Field,Existing)) MutexLock iL(&DB->index_mu); - DB->index_[*Field] = {IndexStatus::Exist, nullptr}; + DB->index_[Field.ToString()] = {IndexStatus::Exist, nullptr}; DB->index_mu.Unlock(); if (pending_list.empty()) return; @@ -278,15 +278,15 @@ void iCreateReq::Finalize(FieldDB *DB) { /*******iDeleteReq*******/ void iDeleteReq::Prepare(FieldDB *DB) { DB->index_mu.AssertHeld(); - if(DB->index_.count(*Field) == 0) { + if(DB->index_.count(Field.ToString()) == 0) { done = true; Deleted = true; s = Status::OK(); return ; } - auto [istatus,parent] = DB->index_[*Field]; + auto [istatus,parent] = DB->index_[Field.ToString()]; if(istatus == IndexStatus::Exist) { - DB->index_[*Field] = {IndexStatus::Deleting,this}; + DB->index_[Field.ToString()] = {IndexStatus::Deleting,this}; done = true; } else { //如果正在创建或者删除,那么pend到对应的请求上 @@ -303,19 +303,19 @@ void iDeleteReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set &batchKeySet) { std::vector> keysAndVal = - DB->FindKeysAndValByFieldName(*Field); + DB->FindKeysAndValByFieldName(Field); Slice value = Slice(); for (auto &kvPair : keysAndVal){ std::string indexKey; AppendIndexKey(&indexKey, - ParsedInternalIndexKey(kvPair.first, *Field, kvPair.second)); + ParsedInternalIndexKey(kvPair.first, Field, kvPair.second)); IndexBatch.Delete(indexKey); } } void iDeleteReq::Finalize(FieldDB *DB) { MutexLock iL(&DB->index_mu); - DB->index_.erase(*Field); + DB->index_.erase(Field.ToString()); DB->index_mu.Unlock(); if (pending_list.empty()) return; @@ -337,37 +337,37 @@ BatchReq::BatchReq(WriteBatch *Batch,port::Mutex *mu): struct BatchHandler : WriteBatch::Handler { void Put(const Slice &key, const Slice &value) override { //为key和value构造存储的地方,防止由于string的析构造成可能得内存访问错误 - str_buf->push_back(key.ToString()); - FieldArray *field = new FieldArray; - field = ParseValue(value.ToString(), field); - if (field->empty()){ //batch中的value没有field - fa_buf->push_back({{EMPTY,value.ToString()}}); - } else { - fa_buf->push_back(*field); - } - - sub_requests->emplace_back(new FieldsReq(&str_buf->back(),&fa_buf->back(),mu)); + // str_buf->push_back(key.ToString()); + // FieldArray *field = new FieldArray; + // field = ParseValue(value.ToString(), field); + // if (field->empty()){ //batch中的value没有field + // fa_buf->push_back({{EMPTY,value.ToString()}}); + // } else { + // fa_buf->push_back(*field); + // } + //默认所有WriteBatch中的东西都是有Field的!!!!! + sub_requests->emplace_back(new FieldsReq(key,value,mu)); sub_requests->back()->parent = req; // delete field; } void Delete(const Slice &key) override { - str_buf->push_back(key.ToString()); - sub_requests->emplace_back(new DeleteReq(&str_buf->back(),mu)); + // str_buf->push_back(key.ToString()); + sub_requests->emplace_back(new DeleteReq(key,mu)); sub_requests->back()->parent = req; } BatchReq *req; port::Mutex *mu; - std::deque *str_buf; - std::deque *fa_buf; + // std::deque *str_buf; + // std::deque *fa_buf; std::deque *sub_requests; }; BatchHandler Handler; Handler.req = this; Handler.mu = mu; - Handler.str_buf = &str_buf; - Handler.fa_buf = &fa_buf; + // Handler.str_buf = &str_buf; + // Handler.fa_buf = &fa_buf; Handler.sub_requests = &sub_requests; Batch->Iterate(&Handler); diff --git a/fielddb/request.h b/fielddb/request.h index 19faf49..4e68596 100644 --- a/fielddb/request.h +++ b/fielddb/request.h @@ -1,8 +1,10 @@ #include #include +#include "leveldb/slice.h" #include "leveldb/status.h" #include "leveldb/write_batch.h" #include "port/port_stdcxx.h" +#include "util/coding.h" #include "util/mutexlock.h" #include "util/serialize_value.h" #include @@ -64,14 +66,31 @@ public: //含有field的put class FieldsReq : public Request { public: - FieldsReq(std::string *Key,FieldArray *Fields,port::Mutex *mu): - Key(Key),Fields(Fields),Request(FieldsReq_t,mu) { }; + FieldsReq(Slice Key,const FieldArray &Fields,port::Mutex *mu): + Key(Key),Request(FieldsReq_t,mu) { + for(auto &[name,value] : Fields) { + SliceFields.push_back({name,value}); + } + }; + + FieldsReq(Slice Key, Slice Value,port::Mutex *mu): + Key(Key),Request(FieldsReq_t,mu) { + Slice nameSlice, valSlice; + while(GetLengthPrefixedSlice(&Value, &nameSlice)) { + if(GetLengthPrefixedSlice(&Value, &valSlice)) { + SliceFields.push_back({nameSlice,valSlice}); + } else { + std::cout << "name and val not match! From FieldsReq Init" << std::endl; + } + nameSlice.clear(), valSlice.clear(); + } + } void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set &batchKeySet) override; - std::string *Key; - FieldArray *Fields; + Slice Key; + FieldSliceArray SliceFields; }; //不含有field的put,但是计划被弃用了 @@ -89,7 +108,7 @@ public: //创建索引的request class iCreateReq : public Request { public: - iCreateReq(std::string *Field,port::Mutex *mu): + iCreateReq(Slice Field,port::Mutex *mu): Field(Field),Request(iCreateReq_t, mu),Existed(false) { }; void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, @@ -99,14 +118,14 @@ public: void PendReq(Request *req) override; bool Existed; - std::string *Field; + Slice Field; std::deque pending_list; }; //删除索引的request class iDeleteReq : public Request { public: - iDeleteReq(std::string *Field,port::Mutex *mu): + iDeleteReq(Slice Field,port::Mutex *mu): Field(Field),Request(iDeleteReq_t, mu),Deleted(false) { }; void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, @@ -116,20 +135,20 @@ public: void PendReq(Request *req) override; bool Deleted; - std::string *Field; + Slice Field; std::deque pending_list; }; //删除key的request class DeleteReq : public Request { public: - DeleteReq(std::string *Key,port::Mutex *mu): + DeleteReq(Slice Key,port::Mutex *mu): Key(Key),Request(DeleteReq_t,mu) { }; void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set &batchKeySet) override; - std::string *Key; + Slice Key; }; class BatchReq : public Request { @@ -142,8 +161,8 @@ public: WriteBatch *Batch; std::deque sub_requests; - std::deque str_buf; - std::deque fa_buf; + // std::deque str_buf; + // std::deque fa_buf; }; } diff --git a/util/serialize_value.cc b/util/serialize_value.cc index 73fb092..f60ba30 100644 --- a/util/serialize_value.cc +++ b/util/serialize_value.cc @@ -3,6 +3,7 @@ #include #include "util/coding.h" #include +#include "leveldb/slice.h" namespace leveldb{ bool compareByFirst(const Field& a, const Field& b) { @@ -20,6 +21,21 @@ std::string SerializeValue(const FieldArray& fields){ return result; } +std::string SerializeValue(const FieldSliceArray& fields) { + using pss = std::pair; + FieldSliceArray sortFields = fields; + std::sort(sortFields.begin(), sortFields.end(), + [&](const pss &lhs, const pss &rhs) { + return lhs.first.compare(rhs.first); + }); + std::string result; + for (const pss& pairs : sortFields) { + PutLengthPrefixedSlice(&result, pairs.first); + PutLengthPrefixedSlice(&result, pairs.second); + } + return result; +} + FieldArray *ParseValue(const std::string& value_str,FieldArray *fields){ Slice valueSlice(value_str); // FieldArray *res = new FieldArray; @@ -74,22 +90,22 @@ bool InternalFieldArray::HasField(const Field& field) { return std::find(fields.begin(),fields.end(),field) != fields.end(); } -std::string InternalFieldArray::ValOfName(const std::string &name) { +Slice InternalFieldArray::ValOfName(const std::string &name) { if(isMapped) { if(map.count(name)) { return map[name]; } - return std::string(); + return Slice(); } for (auto iter = fields.begin(); iter != fields.end(); iter++){ if (iter->first == name) { return iter->second; } else if (iter->first > name) { - return std::string(); + return Slice(); } } - return std::string(); + return Slice(); } } \ No newline at end of file diff --git a/util/serialize_value.h b/util/serialize_value.h index 2405773..ba677c1 100644 --- a/util/serialize_value.h +++ b/util/serialize_value.h @@ -3,6 +3,7 @@ #include #include +#include #include #include #include "leveldb/slice.h" @@ -10,13 +11,15 @@ namespace leveldb{ using Field = std::pair; // field_name:field_value using FieldArray = std::vector>; +using FieldSliceArray = std::vector>; std::string SerializeValue(const FieldArray& fields); +std::string SerializeValue(const FieldSliceArray& fields); FieldArray *ParseValue(const std::string& value_str, FieldArray *fields); class InternalFieldArray { public: - using FieldMap = std::map; + using FieldMap = std::map; InternalFieldArray(const FieldArray &fields, bool to_map = false): fields(fields),isMapped(false) { @@ -29,7 +32,7 @@ public: Slice nameSlice, valSlice; while(GetLengthPrefixedSlice(&valueSlice, &nameSlice)) { if(GetLengthPrefixedSlice(&valueSlice, &valSlice)) { - map[nameSlice.ToString()] = valSlice.ToString(); + map[nameSlice.ToString()] = valSlice; } else { std::cout << "name and val not match! From InternalFieldArray" << std::endl; } @@ -48,7 +51,7 @@ public: std::string Serialize(); bool HasField(const Field& field); - std::string ValOfName(const std::string& name); + Slice ValOfName(const std::string& name); private: bool isMapped;