diff --git a/3DB设计.md b/3DB设计.md index 3e155a0..b37079f 100644 --- a/3DB设计.md +++ b/3DB设计.md @@ -59,6 +59,13 @@ indexDB的写入情况判断如下:扫描indexDB,如果是creating操作且 当然,这些处理的方式会比较的细,总的来讲,只要kvDB完成写入,那么indexDB就可以完成更新;如果写入未完成,那么indexDB就需要用某种方式回滚。 +# 全写入方案 +不用时间戳,全部写入metaDB作为log,然后再写入kvDB和indexDB + +# 整体架构 +采用多线程架构 +由于二级索引理论上是幂等的操作,所以或许不用taskqueue来阻塞创建之后的写入? +如果这么看的话,其实创建(删除)索引的操作也不需要 diff --git a/fielddb/field_db.cpp b/fielddb/field_db.cpp index 1de6565..5d28813 100644 --- a/fielddb/field_db.cpp +++ b/fielddb/field_db.cpp @@ -1,5 +1,6 @@ #include "fielddb/field_db.h" #include +#include #include #include "leveldb/db.h" #include "leveldb/env.h" @@ -7,13 +8,31 @@ #include "leveldb/status.h" #include "util/serialize_value.h" -namespace leveldb { +namespace fielddb { +using namespace leveldb; //TODO:打开fieldDB -static Status OpenFieldDB(const Options& options,const std::string& name,DB** dbptr) { +Status FieldDB::OpenFieldDB(const Options& options,const std::string& name,DB** dbptr) { // options.env->CreateDir("./abc") + *dbptr = new FieldDB(options,name); return Status::OK(); } +Status FieldDB::Recover() { + +} + +FieldDB::FieldDB(const Options& options,const std::string& name) { + Status status; + status = Open(options, name+"_indexDB", &indexDB); + if(!status.ok()) return; + status = Open(options, name+"_kvDB", &kvDB); + if(!status.ok()) return; + status = Open(options, name+"_metaDB", &metaDB); + if(!status.ok()) return; + + Recover(); +} + Status FieldDB::Put(const WriteOptions &options, const Slice &key, const Slice &value) { return kvDB->Put(options, key, value); diff --git a/fielddb/field_db.h b/fielddb/field_db.h index 23534aa..411f564 100644 --- a/fielddb/field_db.h +++ b/fielddb/field_db.h @@ -1,10 +1,21 @@ #include "db/db_impl.h" +#include +#include +#include +#include #include "leveldb/db.h" #include "leveldb/options.h" +#include "leveldb/slice.h" #include "leveldb/status.h" -namespace leveldb{ -class FieldDB:leveldb::DB { +#include "port/port_stdcxx.h" +#include "fielddb/request.h" + +namespace fielddb { +using namespace leveldb; +class FieldDB : leveldb::DB { public: + FieldDB() = default; + FieldDB(const Options& options,const std::string& name); /*lab1的要求*/ Status Put(const WriteOptions &options, const Slice &key, const Slice &value) override; Status PutFields(const WriteOptions &, const Slice &key, const FieldArray &fields) override; @@ -24,10 +35,25 @@ public: bool DeleteIndex(std::string &field_name); std::vector QueryByIndex(Field &field); -private: static Status OpenFieldDB(const Options& options,const std::string& name,DB** dbptr); - leveldb::DBImpl *indexDB; - leveldb::DBImpl *kvDB; +private: + //根据metaDB的内容进行恢复 + Status Recover(); + +private: + leveldb::DB *metaDB; + leveldb::DB *indexDB; + leveldb::DB *kvDB; + + enum IndexStatus{ + Creating, + Deleting, + Exist + }; + std::map index; + port::Mutex _mutex; // mutex for taskqueue + std::deque taskqueue; + }; } // end of namespace \ No newline at end of file diff --git a/fielddb/metakv.cpp b/fielddb/metakv.cpp new file mode 100644 index 0000000..819030f --- /dev/null +++ b/fielddb/metakv.cpp @@ -0,0 +1,20 @@ +#include "fielddb/metakv.h" +#include "util/coding.h" +#include + +namespace fielddb { +using namespace leveldb; + +Slice MetaKV::metaKey() { + std::string buf; + PutLengthPrefixedSlice(&buf, Key); + PutFixed64(&buf, meta_seq); + PutFixed32(&buf, tag); + return Slice(buf); +} + +Slice MetaKV::metaValue() { + return Slice(SerializeValue(Fields)); +} + +} \ No newline at end of file diff --git a/fielddb/metakv.h b/fielddb/metakv.h new file mode 100644 index 0000000..f976830 --- /dev/null +++ b/fielddb/metakv.h @@ -0,0 +1,26 @@ +#pragma once + +#include +#include +#include "leveldb/slice.h" +#include "util/serialize_value.h" +namespace fielddb { +using namespace leveldb; +/*根据写入的流程可以推断,需要存在metaDB中的数据其实都是带索引的数据,也就是FieldArray*/ +class MetaKV { + MetaKV(Slice &Key,FieldArray Fields): + Key(Key),Fields(Fields),tag(0),meta_seq(0) { } + inline int get_seq() { return meta_seq; } + inline void set_seq(int meta_seq) { this->meta_seq = meta_seq; } + inline void setPut() { tag = PUT; } + inline void setDelete() { tag = DELETE; } + Slice metaKey(); + Slice metaValue(); +private: + enum {PUT = 0x0,DELETE = 0x1}; + uint64_t meta_seq; + uint8_t tag; + Slice &Key; + FieldArray Fields; +}; +} \ No newline at end of file diff --git a/fielddb/request.cpp b/fielddb/request.cpp new file mode 100644 index 0000000..e69de29 diff --git a/fielddb/request.h b/fielddb/request.h new file mode 100644 index 0000000..3831fed --- /dev/null +++ b/fielddb/request.h @@ -0,0 +1,25 @@ +#include +#include "port/port_stdcxx.h" +#include "util/mutexlock.h" +#include "util/serialize_value.h" +namespace fielddb { +using namespace leveldb; +// 在taskqueue中的Request,由taskqueue最开始的线程处理一批Request +// 这个思路与write写入的思路类似 +class Request { +public: + Request(std::string *Key,std::string *Value,port::Mutex *mu): + Key(Key),Value(Value),hasFields(false),_cond(mu) { } + Request(std::string *Key,FieldArray *Fields,port::Mutex *mu): + Key(Key),Fields(Fields),hasFields(false),_cond(mu) { } + +private: + bool done; + port::CondVar _cond; + + bool hasFields; + std::string *Key; + std::string *Value; + FieldArray *Fields; +}; +} \ No newline at end of file