Browse Source

一些初步的想法和实现,可能需要大改

gyf
cyq 9 months ago
parent
commit
26bdb79590
7 changed files with 130 additions and 7 deletions
  1. +7
    -0
      3DB设计.md
  2. +21
    -2
      fielddb/field_db.cpp
  3. +31
    -5
      fielddb/field_db.h
  4. +20
    -0
      fielddb/metakv.cpp
  5. +26
    -0
      fielddb/metakv.h
  6. +0
    -0
      fielddb/request.cpp
  7. +25
    -0
      fielddb/request.h

+ 7
- 0
3DB设计.md View File

@ -59,6 +59,13 @@ indexDB的写入情况判断如下:扫描indexDB,如果是creating操作且
当然,这些处理的方式会比较的细,总的来讲,只要kvDB完成写入,那么indexDB就可以完成更新;如果写入未完成,那么indexDB就需要用某种方式回滚。 当然,这些处理的方式会比较的细,总的来讲,只要kvDB完成写入,那么indexDB就可以完成更新;如果写入未完成,那么indexDB就需要用某种方式回滚。
# 全写入方案
不用时间戳,全部写入metaDB作为log,然后再写入kvDB和indexDB
# 整体架构
采用多线程架构
由于二级索引理论上是幂等的操作,所以或许不用taskqueue来阻塞创建之后的写入?
如果这么看的话,其实创建(删除)索引的操作也不需要

+ 21
- 2
fielddb/field_db.cpp View File

@ -1,5 +1,6 @@
#include "fielddb/field_db.h" #include "fielddb/field_db.h"
#include <cstdint> #include <cstdint>
#include <string>
#include <vector> #include <vector>
#include "leveldb/db.h" #include "leveldb/db.h"
#include "leveldb/env.h" #include "leveldb/env.h"
@ -7,13 +8,31 @@
#include "leveldb/status.h" #include "leveldb/status.h"
#include "util/serialize_value.h" #include "util/serialize_value.h"
namespace leveldb {
namespace fielddb {
using namespace leveldb;
//TODO:打开fieldDB //TODO:打开fieldDB
static Status OpenFieldDB(const Options& options,const std::string& name,DB** dbptr) {
Status FieldDB::OpenFieldDB(const Options& options,const std::string& name,DB** dbptr) {
// options.env->CreateDir("./abc") // options.env->CreateDir("./abc")
*dbptr = new FieldDB(options,name);
return Status::OK(); return Status::OK();
} }
Status FieldDB::Recover() {
}
FieldDB::FieldDB(const Options& options,const std::string& name) {
Status status;
status = Open(options, name+"_indexDB", &indexDB);
if(!status.ok()) return;
status = Open(options, name+"_kvDB", &kvDB);
if(!status.ok()) return;
status = Open(options, name+"_metaDB", &metaDB);
if(!status.ok()) return;
Recover();
}
Status FieldDB::Put(const WriteOptions &options, const Slice &key, const Slice &value) { Status FieldDB::Put(const WriteOptions &options, const Slice &key, const Slice &value) {
return kvDB->Put(options, key, value); return kvDB->Put(options, key, value);

+ 31
- 5
fielddb/field_db.h View File

@ -1,10 +1,21 @@
#include "db/db_impl.h" #include "db/db_impl.h"
#include <deque>
#include <map>
#include <set>
#include <string>
#include "leveldb/db.h" #include "leveldb/db.h"
#include "leveldb/options.h" #include "leveldb/options.h"
#include "leveldb/slice.h"
#include "leveldb/status.h" #include "leveldb/status.h"
namespace leveldb{
class FieldDB:leveldb::DB {
#include "port/port_stdcxx.h"
#include "fielddb/request.h"
namespace fielddb {
using namespace leveldb;
class FieldDB : leveldb::DB {
public: public:
FieldDB() = default;
FieldDB(const Options& options,const std::string& name);
/*lab1的要求*/ /*lab1的要求*/
Status Put(const WriteOptions &options, const Slice &key, const Slice &value) override; Status Put(const WriteOptions &options, const Slice &key, const Slice &value) override;
Status PutFields(const WriteOptions &, const Slice &key, const FieldArray &fields) override; Status PutFields(const WriteOptions &, const Slice &key, const FieldArray &fields) override;
@ -24,10 +35,25 @@ public:
bool DeleteIndex(std::string &field_name); bool DeleteIndex(std::string &field_name);
std::vector<std::string> QueryByIndex(Field &field); std::vector<std::string> QueryByIndex(Field &field);
private:
static Status OpenFieldDB(const Options& options,const std::string& name,DB** dbptr); static Status OpenFieldDB(const Options& options,const std::string& name,DB** dbptr);
leveldb::DBImpl *indexDB;
leveldb::DBImpl *kvDB;
private:
//metaDB的内容进行恢复
Status Recover();
private:
leveldb::DB *metaDB;
leveldb::DB *indexDB;
leveldb::DB *kvDB;
enum IndexStatus{
Creating,
Deleting,
Exist
};
std::map<std::string,int> index;
port::Mutex _mutex; // mutex for taskqueue
std::deque<Request *> taskqueue;
}; };
} // end of namespace } // end of namespace

+ 20
- 0
fielddb/metakv.cpp View File

@ -0,0 +1,20 @@
#include "fielddb/metakv.h"
#include "util/coding.h"
#include <string>
namespace fielddb {
using namespace leveldb;
Slice MetaKV::metaKey() {
std::string buf;
PutLengthPrefixedSlice(&buf, Key);
PutFixed64(&buf, meta_seq);
PutFixed32(&buf, tag);
return Slice(buf);
}
Slice MetaKV::metaValue() {
return Slice(SerializeValue(Fields));
}
}

+ 26
- 0
fielddb/metakv.h View File

@ -0,0 +1,26 @@
#pragma once
#include <cstdint>
#include <cstdio>
#include "leveldb/slice.h"
#include "util/serialize_value.h"
namespace fielddb {
using namespace leveldb;
/*根据写入的流程可以推断,需要存在metaDB中的数据其实都是带索引的数据,也就是FieldArray*/
class MetaKV {
MetaKV(Slice &Key,FieldArray Fields):
Key(Key),Fields(Fields),tag(0),meta_seq(0) { }
inline int get_seq() { return meta_seq; }
inline void set_seq(int meta_seq) { this->meta_seq = meta_seq; }
inline void setPut() { tag = PUT; }
inline void setDelete() { tag = DELETE; }
Slice metaKey();
Slice metaValue();
private:
enum {PUT = 0x0,DELETE = 0x1};
uint64_t meta_seq;
uint8_t tag;
Slice &Key;
FieldArray Fields;
};
}

+ 0
- 0
fielddb/request.cpp View File


+ 25
- 0
fielddb/request.h View File

@ -0,0 +1,25 @@
#include <string>
#include "port/port_stdcxx.h"
#include "util/mutexlock.h"
#include "util/serialize_value.h"
namespace fielddb {
using namespace leveldb;
// taskqueue中的Requesttaskqueue最开始的线程处理一批Request
// write写入的思路类似
class Request {
public:
Request(std::string *Key,std::string *Value,port::Mutex *mu):
Key(Key),Value(Value),hasFields(false),_cond(mu) { }
Request(std::string *Key,FieldArray *Fields,port::Mutex *mu):
Key(Key),Fields(Fields),hasFields(false),_cond(mu) { }
private:
bool done;
port::CondVar _cond;
bool hasFields;
std::string *Key;
std::string *Value;
FieldArray *Fields;
};
}

Loading…
Cancel
Save