Sfoglia il codice sorgente

Merge branch 'cyq' into ld

pull/1/head
augurier 9 mesi fa
parent
commit
c6e83ce91e
7 ha cambiato i file con 195 aggiunte e 7 eliminazioni
  1. +72
    -0
      3DB设计.md
  2. +21
    -2
      fielddb/field_db.cpp
  3. +31
    -5
      fielddb/field_db.h
  4. +20
    -0
      fielddb/metakv.cpp
  5. +26
    -0
      fielddb/metakv.h
  6. +0
    -0
      fielddb/request.cpp
  7. +25
    -0
      fielddb/request.h

+ 72
- 0
3DB设计.md Vedi File

@ -0,0 +1,72 @@
# 3DB
分别为kvDB,indexDB,metaDB。其中,前两个和之前存储内容相同,metaDB用来维护与并发控制相关的内容
为了达成一致性,这里需要要有一个全局唯一的seq或者timestamp。这个信息编码在userkey中,比较器要进行对应修改,metaDB要按照先后顺序排列,另外两个不用考虑timestamp
# 具体操作
这里将操作类型进行分类:普通put和get,索引put和get,创建(删除)索引。
接下来将根据这个分类进行阐述
## 普通put和get
普通get直接调用kvDB的get
普通put要判断当前是否进行索引put和创建(删除)索引操作。如果有,则需要进入taskqueue等待。
## 索引put
如果当前字段中没有需要创建索引的,那么就是普通put
如果当前没有正在创建(修改)的索引或者之前的对于同一个key的索引put,则首先向metaDB写入标记`(key,creating/deleting)`,表示事务的开始。然后构建请求,并向kvDB和indexDB分别写入。所有的请求的时间戳都和metaDB中标记的时间戳相同。全部完成之后,将之前在metaDB中的标记delete,表示事务的结束。
如果当前有正在创建(修改)的索引或者之前的对于同一个key的索引put,则判断本次put是否含有对应的索引,如果没有则按上面一段的操作进行。如果含有,则加入之前设计中的taskqueue,在索引创建完成后会进行处理。
我觉得:索引put涉及到了indexDB和kvDB两者之间的原子性
## 创建(删除)索引
在进行这个操作之前对metaDB写入一个标记`(field,creating/deleting)`,表示在field上创建(删除)索引操作的事务的开始。(注:这里的key是包含了时间戳或者seq的)。
之后扫描kvDB,构建相应请求,对indexDB进行写入,这里也是通过writebatch写入的。写入完成之后,将之前的标记清除,表示当前的事务结束了。之后对于taskqueue里面的请求进行处理,完成之后,唤醒taskqueue中的请求。
我觉得:创建(删除)索引这个操作实际上只对indexDB进行了写入请求,并不涉及indexDB和kvDB两者之间的一致性
## 索引get
如果没有索引,则返回;如果索引正在创建,则存到taskqueue中,在索引创建完成之后进行处理(这里或许也可以直接返回);如果存在索引则将请求发往indexDB。
# 一致性,崩溃恢复(recovery)
## 创建(删除)索引
如果meta中存在记录`(field,creating/deleting)`,则表示在创建(删除)索引的过程中的某个时间节点崩溃了。
如果在metaDB写入标记后崩溃,indexDB写入前崩溃。由于这个操作只涉及indexDB,且与当前创建(删除)索引相关的写入请求都被阻塞了,所以只要再扫描一遍全部的kvDB构造index写入请求就可以了。
如果是在indexDB写入完成之后崩溃,这实际上已经完成了创建(删除)索引操作,所以把metaDB中的标记清除即可。
这里最主要的问题在于如何判断崩溃时间点。对于写入标记前后通过metaDB中的记录判断。对于是否对indexDB完成写入通过能否在indexDB中找到对应的索引判断,因为索引是一个writebatch整体写入的,有原子性。
## 索引put
索引put涉及到kvDB和indexDB写入的一致性,这一点通过时间戳机制来保证。
如果metaDB中有记录`(key,creating/deleting)`,那么表明索引put过程中的某个时间节点崩溃了。由于kvDB和indexDB的写入是并发进行的,所以可能会出现四种情况:
1. kvDB和indexDB写入均未完成
2. kvDB写入完成而indexDB未完成。
3. kvDB写入未完成而indexDB写入完成
4. 两者写入都完成,但是没有清除记录
kvDB的写入情况的判断如下:通过metaDB中的记录的key,查询kvDB。如果是creating操作且记录不存在或者得到的时间戳不等于metaDB记录的时间戳,则表明写入未完成;如果是deleting,如果存在记录,则表明写入未完成。
indexDB的写入情况判断如下:扫描indexDB,如果是creating操作且记录不存在或者得到的时间戳不等于metaDB记录的时间戳,则表明写入未完成;如果是deleting操作,如果存在对应的二级索引,则表明写入未完成。如果在kvDB中能够得到相应的kv,可以通过kvDB中的kv查询二级索引。
分别讨论creating和deleting操作下的四种情况的崩溃恢复的过程:
1. 如果是creating,则清除metaDB中的记录;如果是deleting,则继续delete
2. 如果是creating,则根据kvDB的写入构造请求写入indexDB;如果是deleting,则遍历indexDB删除对应的索引
3. 如果是creating,且kvDB中有旧值,则将indexDB中所有相关的字段清除后根据旧值创建索引;如果是deleting,删除kvDB中的kv对
4. 直接清除记录
当然,这些处理的方式会比较的细,总的来讲,只要kvDB完成写入,那么indexDB就可以完成更新;如果写入未完成,那么indexDB就需要用某种方式回滚。
# 全写入方案
不用时间戳,全部写入metaDB作为log,然后再写入kvDB和indexDB
# 整体架构
采用多线程架构
由于二级索引理论上是幂等的操作,所以或许不用taskqueue来阻塞创建之后的写入?
如果这么看的话,其实创建(删除)索引的操作也不需要

+ 21
- 2
fielddb/field_db.cpp Vedi File

@ -1,5 +1,6 @@
#include "fielddb/field_db.h"
#include <cstdint>
#include <string>
#include <vector>
#include "leveldb/db.h"
#include "leveldb/env.h"
@ -7,13 +8,31 @@
#include "leveldb/status.h"
#include "util/serialize_value.h"
namespace leveldb {
namespace fielddb {
using namespace leveldb;
//TODO:打开fieldDB
static Status OpenFieldDB(const Options& options,const std::string& name,DB** dbptr) {
Status FieldDB::OpenFieldDB(const Options& options,const std::string& name,DB** dbptr) {
// options.env->CreateDir("./abc")
*dbptr = new FieldDB(options,name);
return Status::OK();
}
Status FieldDB::Recover() {
}
FieldDB::FieldDB(const Options& options,const std::string& name) {
Status status;
status = Open(options, name+"_indexDB", &indexDB);
if(!status.ok()) return;
status = Open(options, name+"_kvDB", &kvDB);
if(!status.ok()) return;
status = Open(options, name+"_metaDB", &metaDB);
if(!status.ok()) return;
Recover();
}
Status FieldDB::Put(const WriteOptions &options, const Slice &key, const Slice &value) {
return kvDB->Put(options, key, value);

+ 31
- 5
fielddb/field_db.h Vedi File

@ -1,10 +1,21 @@
#include "db/db_impl.h"
#include <deque>
#include <map>
#include <set>
#include <string>
#include "leveldb/db.h"
#include "leveldb/options.h"
#include "leveldb/slice.h"
#include "leveldb/status.h"
namespace leveldb{
class FieldDB:leveldb::DB {
#include "port/port_stdcxx.h"
#include "fielddb/request.h"
namespace fielddb {
using namespace leveldb;
class FieldDB : leveldb::DB {
public:
FieldDB() = default;
FieldDB(const Options& options,const std::string& name);
/*lab1的要求*/
Status Put(const WriteOptions &options, const Slice &key, const Slice &value) override;
Status PutFields(const WriteOptions &, const Slice &key, const FieldArray &fields) override;
@ -24,10 +35,25 @@ public:
bool DeleteIndex(std::string &field_name);
std::vector<std::string> QueryByIndex(Field &field);
private:
static Status OpenFieldDB(const Options& options,const std::string& name,DB** dbptr);
leveldb::DBImpl *indexDB;
leveldb::DBImpl *kvDB;
private:
//metaDB的内容进行恢复
Status Recover();
private:
leveldb::DB *metaDB;
leveldb::DB *indexDB;
leveldb::DB *kvDB;
enum IndexStatus{
Creating,
Deleting,
Exist
};
std::map<std::string,int> index;
port::Mutex _mutex; // mutex for taskqueue
std::deque<Request *> taskqueue;
};
} // end of namespace

+ 20
- 0
fielddb/metakv.cpp Vedi File

@ -0,0 +1,20 @@
#include "fielddb/metakv.h"
#include "util/coding.h"
#include <string>
namespace fielddb {
using namespace leveldb;
Slice MetaKV::metaKey() {
std::string buf;
PutLengthPrefixedSlice(&buf, Key);
PutFixed64(&buf, meta_seq);
PutFixed32(&buf, tag);
return Slice(buf);
}
Slice MetaKV::metaValue() {
return Slice(SerializeValue(Fields));
}
}

+ 26
- 0
fielddb/metakv.h Vedi File

@ -0,0 +1,26 @@
#pragma once
#include <cstdint>
#include <cstdio>
#include "leveldb/slice.h"
#include "util/serialize_value.h"
namespace fielddb {
using namespace leveldb;
/*根据写入的流程可以推断,需要存在metaDB中的数据其实都是带索引的数据,也就是FieldArray*/
class MetaKV {
MetaKV(Slice &Key,FieldArray Fields):
Key(Key),Fields(Fields),tag(0),meta_seq(0) { }
inline int get_seq() { return meta_seq; }
inline void set_seq(int meta_seq) { this->meta_seq = meta_seq; }
inline void setPut() { tag = PUT; }
inline void setDelete() { tag = DELETE; }
Slice metaKey();
Slice metaValue();
private:
enum {PUT = 0x0,DELETE = 0x1};
uint64_t meta_seq;
uint8_t tag;
Slice &Key;
FieldArray Fields;
};
}

+ 0
- 0
fielddb/request.cpp Vedi File


+ 25
- 0
fielddb/request.h Vedi File

@ -0,0 +1,25 @@
#include <string>
#include "port/port_stdcxx.h"
#include "util/mutexlock.h"
#include "util/serialize_value.h"
namespace fielddb {
using namespace leveldb;
// taskqueue中的Requesttaskqueue最开始的线程处理一批Request
// write写入的思路类似
class Request {
public:
Request(std::string *Key,std::string *Value,port::Mutex *mu):
Key(Key),Value(Value),hasFields(false),_cond(mu) { }
Request(std::string *Key,FieldArray *Fields,port::Mutex *mu):
Key(Key),Fields(Fields),hasFields(false),_cond(mu) { }
private:
bool done;
port::CondVar _cond;
bool hasFields;
std::string *Key;
std::string *Value;
FieldArray *Fields;
};
}

Caricamento…
Annulla
Salva