Przeglądaj źródła

Merge branch 'cyq' into ld

pull/2/head
augurier 8 miesięcy temu
rodzic
commit
c7bfb1f725
9 zmienionych plików z 164 dodań i 24 usunięć
  1. +1
    -1
      CMakeLists.txt
  2. +1
    -1
      db/db_impl.cc
  3. +50
    -1
      fielddb/field_db.cpp
  4. +9
    -7
      fielddb/meta.cpp
  5. +2
    -2
      fielddb/meta.h
  6. +81
    -9
      fielddb/request.cpp
  7. +16
    -0
      fielddb/request.h
  8. +3
    -2
      util/serialize_value.cc
  9. +1
    -1
      util/serialize_value.h

+ 1
- 1
CMakeLists.txt Wyświetl plik

@ -17,7 +17,7 @@ endif(NOT CMAKE_C_STANDARD)
# C++ standard can be overridden when this is used as a sub-project.
if(NOT CMAKE_CXX_STANDARD)
# This project requires C++11.
set(CMAKE_CXX_STANDARD 11)
set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
set(CMAKE_CXX_EXTENSIONS OFF)
endif(NOT CMAKE_CXX_STANDARD)

+ 1
- 1
db/db_impl.cc Wyświetl plik

@ -1169,7 +1169,7 @@ Status DBImpl::GetFields(const ReadOptions& options, const Slice& key,
FieldArray* fields) {
std::string value;
Status s = DBImpl::Get(options, key, &value);
*fields = *ParseValue(value);
ParseValue(value,fields);
return s;
}

+ 50
- 1
fielddb/field_db.cpp Wyświetl plik

@ -6,10 +6,13 @@
#include <vector>
#include "leveldb/db.h"
#include "leveldb/env.h"
#include "leveldb/iterator.h"
#include "leveldb/options.h"
#include "leveldb/slice.h"
#include "leveldb/status.h"
#include "leveldb/write_batch.h"
#include "db/write_batch_internal.h"
#include "util/coding.h"
#include "util/mutexlock.h"
#include "util/serialize_value.h"
#include "fielddb/encode_index.h"
@ -53,7 +56,48 @@ Status FieldDB::OpenFieldDB(const Options& options,
Status FieldDB::Recover() {
//TODO:
//1. 遍历所有Index类型的meta,重建内存中的index_状态表
Iterator *Iter = indexDB_->NewIterator(ReadOptions());
std::string IndexKey;
Iter->SeekToFirst();
while(Iter->Valid()) {
IndexKey = Iter->value().ToString();
ParsedInternalIndexKey ParsedIndex;
ParseInternalIndexKey(Slice(IndexKey),&ParsedIndex);
index_[ParsedIndex.name_.ToString()] = {Exist,nullptr};
std::cout << "Existed Index : " << ParsedIndex.name_.ToString() << std::endl;
//构建下一个搜索的对象,在原来的fieldname的基础上加一个最大的ascii字符(不可见字符)
//TODO:不知道这个做法有没有道理
std::string Seek;
PutLengthPrefixedSlice(&Seek, ParsedIndex.name_);
Seek.push_back(0xff);
Iter->Seek(Slice(Seek));
}
delete Iter;
//2. 寻找所有KV类型的meta,再次提交一遍请求
Iter = metaDB_->NewIterator(ReadOptions());
Slice MetaValue;
Iter->SeekToFirst();
while (Iter->Valid()) {
MetaValue = Iter->key();
MetaType type = MetaType(DecodeFixed32(MetaValue.data()));
MetaValue.remove_prefix(4);//移除头上的metaType的部分
if(type == KV_Creating) {
FieldArray fields;
ParseValue(Iter->value().ToString(), &fields);
PutFields(WriteOptions(), MetaValue, fields);
} else if(type == KV_Deleting) {
Delete(WriteOptions(), MetaValue);
} else {
assert(0 && "Invalid MetaType");
}
}
delete Iter;
//在所有的请求完成后,会自动把metaDB的内容清空。
Iter = metaDB_->NewIterator(ReadOptions());
Iter->SeekToFirst();
std::cout << "Iter Valid : " << Iter->Valid() << std::endl;
delete Iter;
//3. 等待所有请求完成
return Status::OK();
}
@ -173,6 +217,10 @@ Status FieldDB::Delete(const WriteOptions &options, const Slice &key) {
// TODO:根据updates里面的东西,要对是否需要更新index进行分别处理
Status FieldDB::Write(const WriteOptions &options, WriteBatch *updates) {
//或许应该再做一个接口?或者基于现有的接口进行改造
BatchReq req(updates,&mutex_);
Status status = HandleRequest(req);
return status;
assert(0);
return Status::OK();
}
//由于常规put将空串作为name,这里也需要适当修改
@ -208,6 +256,7 @@ std::vector> FieldDB::FindKeysAndValByFieldN
result.push_back(std::make_pair(iter->key().ToString(), val));
}
}
delete iter;
return result;
}
@ -265,7 +314,7 @@ std::vector FieldDB::QueryByIndex(const Field &field, Status *s) {
}
break;
}
delete indexIterator;
*s = Status::OK();
return result;
}

+ 9
- 7
fielddb/meta.cpp Wyświetl plik

@ -22,22 +22,24 @@ using namespace leveldb;
//对于含有index field的put/delete的meta编码为 (KV|Key,Value)
void MetaKV::TransPut(Slice &MetaKey,Slice &MetaValue) {
void MetaKV::TransPut(std::string &MetaKey,std::string &MetaValue) {
MetaKey.clear();
MetaValue.clear();
std::string buf;
//这里的改动是为了防止潜在的段错误。原来的写法中,slice(buf)对应的buf是局部的,在函数返回后,buf被销毁
//但是slice中的指针指向的是析构的string对象的部分内存
std::string &buf = MetaKey;
PutFixed32(&buf, KV_Creating);
PutLengthPrefixedSlice(&buf, Slice(*name));
MetaKey = Slice(buf);
MetaValue = Slice(*value);
// MetaKey = Slice(buf);
// MetaValue = Slice(*value);
}
void MetaKV::TransDelete(Slice &MetaKey) {
void MetaKV::TransDelete(std::string &MetaKey) {
MetaKey.clear();
std::string buf;
std::string &buf = MetaKey;
PutFixed32(&buf, KV_Deleting);
PutLengthPrefixedSlice(&buf, Slice(*name));
MetaKey = Slice(buf);
// MetaKey = Slice(buf);
}
class CleanerHandler : public WriteBatch::Handler {

+ 2
- 2
fielddb/meta.h Wyświetl plik

@ -37,8 +37,8 @@ class MetaKV {
public:
MetaKV(std::string *field_name,std::string *field_value = nullptr):
name(field_name),value(field_value) { }
void TransPut(Slice &MetaKey,Slice &MetaValue);
void TransDelete(Slice &MetaKey);
void TransPut(std::string &MetaKey,std::string &MetaValue);
void TransDelete(std::string &MetaKey);
private:
std::string *name;
std::string *value;

+ 81
- 9
fielddb/request.cpp Wyświetl plik

@ -1,8 +1,12 @@
#include "fielddb/request.h"
#include <cassert>
#include <deque>
#include <string>
#include <unordered_set>
#include "leveldb/slice.h"
#include "leveldb/status.h"
#include "leveldb/write_batch.h"
#include "port/port_stdcxx.h"
#include "util/mutexlock.h"
#include "util/serialize_value.h"
#include "fielddb/encode_index.h"
@ -56,7 +60,8 @@ void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
if (s.IsNotFound()){
oldFields = nullptr;
} else if (s.ok()) { //得到数据库之前key的fields, 判断需不需要删除其中潜在的索引
oldFields = ParseValue(val_str);
oldFields = new FieldArray;
oldFields = ParseValue(val_str,oldFields);
} else {
assert(0);
}
@ -72,7 +77,7 @@ void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
if(DB->index_.count(field_name)) {
auto [index_status,parent_req] = DB->index_[field_name];
if(index_status == IndexStatus::Creating || index_status == IndexStatus::Deleting) {
parent_req->PendReq(this);
parent_req->PendReq(this->parent);
return;
} else if(index_status == IndexStatus::Exist) {
HasIndex = true;
@ -87,7 +92,7 @@ void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
if(DB->index_.count(field_name)) {
auto [index_status,parent_req] = DB->index_[field_name];
if(index_status == IndexStatus::Creating || index_status == IndexStatus::Deleting) {
parent_req->PendReq(this);
parent_req->PendReq(this->parent);
return;
} else if(index_status == IndexStatus::Exist) {
HasOldIndex = true;
@ -100,11 +105,11 @@ void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
KVBatch.Put(Slice(*Key), Slice(SerializeValue(*Fields)));
//2.对于没有冲突但含有索引操作的put,构建metaKV,这里直接将KV对简单编码后写入metaDB
if(HasIndex || HasOldIndex) {
Slice MetaKey,MetaValue;
std::string MetaKey,MetaValue;
std::string serialized = SerializeValue(*Fields);
MetaKV MKV = MetaKV(Key,&serialized);
MKV.TransPut(MetaKey, MetaValue);
MetaBatch.Put(MetaKey, MetaValue);
MetaBatch.Put(MetaKey, serialized);
//3.1对于含有索引的oldfield删除索引
@ -136,6 +141,8 @@ void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
}
//优化:对于3.1,3.2中都有的索引只写一次
}
if(oldFields) delete oldFields;
}
@ -157,7 +164,9 @@ void DeleteReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
std::string val_str;
Status s = DB->kvDB_->Get(ReadOptions(), *Key, &val_str);
if (s.IsNotFound()) return;
FieldArray *Fields = ParseValue(val_str);
FieldArray *Fields = new FieldArray;
ParseValue(val_str,Fields);
KVBatch.Delete(Slice(*Key));
bool HasIndex = false;
{
// MutexLock L(&DB->index_mu); //互斥访问索引状态表
@ -168,7 +177,7 @@ void DeleteReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
if(DB->index_.count(field_name)) {
auto [index_status,parent_req] = DB->index_[field_name];
if(index_status == IndexStatus::Creating || index_status == IndexStatus::Deleting) {
parent_req->PendReq(this);
parent_req->PendReq(this->parent);
return;
} else if(index_status == IndexStatus::Exist) {
HasIndex = true;
@ -179,7 +188,7 @@ void DeleteReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
KVBatch.Delete(Slice(*Key));
//2.对于没有冲突但含有索引操作的delete,构建metaKV,这里直接将KV对简单编码后写入metaDB
if(HasIndex) {
Slice MetaKey;
std::string MetaKey;
MetaKV MKV = MetaKV(Key);
MKV.TransDelete(MetaKey); //meta中写入一个delete不需要value
MetaBatch.Put(MetaKey, Slice());
@ -195,6 +204,7 @@ void DeleteReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
}
}
}
delete Fields;
}
/*******iCreateReq*******/
@ -210,7 +220,7 @@ void iCreateReq::Prepare(FieldDB *DB) {
s = Status::OK();
} else {
//如果正在创建或删除,那么进行等待
parent->PendReq(this);
parent->PendReq(this->parent);
}
return;
}
@ -317,5 +327,67 @@ void iDeleteReq::Finalize(FieldDB *DB) {
this->s = Status::OK();
}
BatchReq::BatchReq(WriteBatch *Batch,port::Mutex *mu):
Batch(Batch),Request(BatchReq_t, mu) {
struct BatchHandler : WriteBatch::Handler {
void Put(const Slice &key, const Slice &value) override {
//为key和value构造存储的地方,防止由于string的析构造成可能得内存访问错误
str_buf->push_back(key.ToString());
fa_buf->push_back({{"",value.ToString()}});
sub_requests->emplace_back(new FieldsReq(&str_buf->back(),&fa_buf->back(),mu));
sub_requests->back()->parent = req;
}
void Delete(const Slice &key) override {
str_buf->push_back(key.ToString());
sub_requests->emplace_back(new DeleteReq(&str_buf->back(),mu));
sub_requests->back()->parent = req;
}
BatchReq *req;
port::Mutex *mu;
std::deque<std::string> *str_buf;
std::deque<FieldArray> *fa_buf;
std::deque<Request*> *sub_requests;
};
BatchHandler Handler;
Handler.req = this;
Handler.mu = mu;
Handler.str_buf = &str_buf;
Handler.fa_buf = &fa_buf;
Handler.sub_requests = &sub_requests;
Batch->Iterate(&Handler);
}
BatchReq::~BatchReq() {
while(!sub_requests.empty()) {
Request *req = sub_requests.front();
sub_requests.pop_front();
delete req;
}
}
void BatchReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set<std::string *> &batchKeySet)
{
WriteBatch Sub_KVBatch,Sub_IndexBatch,Sub_MetaBatch;
std::unordered_set<std::string *> Sub_batchKeySet;
//由于batch是有顺序的,根据我们现在的一个key只处理最开始的算法,这里需要反向迭代
for(auto subreq = sub_requests.rbegin(); subreq != sub_requests.rend(); subreq++ ) {
(*subreq)->ConstructBatch(Sub_KVBatch, Sub_IndexBatch, Sub_MetaBatch, DB, Sub_batchKeySet);
//所有的对于pendreq的调用传入的参数被改成了this->parent,因此,对于subrequests来说,
//pendreq的传参为对应的Batchreq,因此,此处判断batchreq是否pending可以得到subreq是否有冲突
if(isPending()) {
return;
}
}
KVBatch.Append(Sub_KVBatch);
IndexBatch.Append(Sub_IndexBatch);
MetaBatch.Append(Sub_MetaBatch);
batchKeySet.insert(batchKeySet.begin(),batchKeySet.end());
}
} // namespace fielddb

+ 16
- 0
fielddb/request.h Wyświetl plik

@ -24,6 +24,7 @@ public:
iCreateReq_t,
iDeleteReq_t,
DeleteReq_t,
BatchReq_t,
};
public:
@ -41,6 +42,7 @@ public:
inline bool isiCreateReq() { return type_ == iCreateReq_t; }
inline bool isiDeleteReq() { return type_ == iDeleteReq_t; }
inline bool isDeleteReq() { return type_ == DeleteReq_t; }
inline bool isBatchReq() { return type_ == BatchReq_t; }
//Fields的
virtual void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
@ -130,6 +132,20 @@ public:
std::string *Key;
};
class BatchReq : public Request {
public:
BatchReq(WriteBatch *Batch,port::Mutex *mu);
~BatchReq();
void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set<std::string *> &batchKeySet) override;
WriteBatch *Batch;
std::deque<Request *> sub_requests;
std::deque<std::string> str_buf;
std::deque<FieldArray> fa_buf;
};
}
#endif

+ 3
- 2
util/serialize_value.cc Wyświetl plik

@ -20,9 +20,10 @@ std::string SerializeValue(const FieldArray& fields){
return result;
}
FieldArray *ParseValue(const std::string& value_str){
FieldArray *ParseValue(const std::string& value_str,FieldArray *fields){
Slice valueSlice(value_str);
FieldArray *res = new FieldArray;
// FieldArray *res = new FieldArray;
FieldArray *res = fields;
Slice nameSlice = Slice();
Slice valSlice = Slice();
std::string nameStr;

+ 1
- 1
util/serialize_value.h Wyświetl plik

@ -12,7 +12,7 @@ using Field = std::pair; // field_name:field_value
using FieldArray = std::vector<std::pair<std::string, std::string>>;
std::string SerializeValue(const FieldArray& fields);
FieldArray *ParseValue(const std::string& value_str);
FieldArray *ParseValue(const std::string& value_str, FieldArray *fields);
class InternalFieldArray {
public:

Ładowanie…
Anuluj
Zapisz