瀏覽代碼

并发控制的基本框架和大部分实现

gyf
cyq 8 月之前
父節點
當前提交
f464e09933
共有 9 個文件被更改,包括 572 次插入103 次删除
  1. +24
    -5
      3DB设计.md
  2. +165
    -37
      fielddb/field_db.cpp
  3. +22
    -6
      fielddb/field_db.h
  4. +58
    -0
      fielddb/meta.cpp
  5. +55
    -0
      fielddb/meta.h
  6. +0
    -20
      fielddb/metakv.cpp
  7. +0
    -26
      fielddb/metakv.h
  8. +134
    -0
      fielddb/request.cpp
  9. +114
    -9
      fielddb/request.h

+ 24
- 5
3DB设计.md 查看文件

@ -16,14 +16,14 @@
如果当前有正在创建(修改)的索引或者之前的对于同一个key的索引put,则判断本次put是否含有对应的索引,如果没有则按上面一段的操作进行。如果含有,则加入之前设计中的taskqueue,在索引创建完成后会进行处理。 如果当前有正在创建(修改)的索引或者之前的对于同一个key的索引put,则判断本次put是否含有对应的索引,如果没有则按上面一段的操作进行。如果含有,则加入之前设计中的taskqueue,在索引创建完成后会进行处理。
我觉得:索引put涉及到了indexDB和kvDB两者之间的原子性
想法:索引put涉及到了indexDB和kvDB两者之间的原子性
## 创建(删除)索引 ## 创建(删除)索引
在进行这个操作之前对metaDB写入一个标记`(field,creating/deleting)`,表示在field上创建(删除)索引操作的事务的开始。(注:这里的key是包含了时间戳或者seq的)。 在进行这个操作之前对metaDB写入一个标记`(field,creating/deleting)`,表示在field上创建(删除)索引操作的事务的开始。(注:这里的key是包含了时间戳或者seq的)。
之后扫描kvDB,构建相应请求,对indexDB进行写入,这里也是通过writebatch写入的。写入完成之后,将之前的标记清除,表示当前的事务结束了。之后对于taskqueue里面的请求进行处理,完成之后,唤醒taskqueue中的请求。 之后扫描kvDB,构建相应请求,对indexDB进行写入,这里也是通过writebatch写入的。写入完成之后,将之前的标记清除,表示当前的事务结束了。之后对于taskqueue里面的请求进行处理,完成之后,唤醒taskqueue中的请求。
我觉得:创建(删除)索引这个操作实际上只对indexDB进行了写入请求,并不涉及indexDB和kvDB两者之间的一致性
想法:创建(删除)索引这个操作实际上只对indexDB进行了写入请求,并不涉及indexDB和kvDB两者之间的一致性
## 索引get ## 索引get
如果没有索引,则返回;如果索引正在创建,则存到taskqueue中,在索引创建完成之后进行处理(这里或许也可以直接返回);如果存在索引则将请求发往indexDB。 如果没有索引,则返回;如果索引正在创建,则存到taskqueue中,在索引创建完成之后进行处理(这里或许也可以直接返回);如果存在索引则将请求发往indexDB。
@ -63,10 +63,29 @@ indexDB的写入情况判断如下:扫描indexDB,如果是creating操作且
不用时间戳,全部写入metaDB作为log,然后再写入kvDB和indexDB 不用时间戳,全部写入metaDB作为log,然后再写入kvDB和indexDB
# 整体架构 # 整体架构
采用多线程架构
由于二级索引理论上是幂等的操作,所以或许不用taskqueue来阻塞创建之后的写入?
如果这么看的话,其实创建(删除)索引的操作也不需要
基于request(类似于writer)来处理并发的请求。对于创建和删除索引操作,包含一个pending队列,来维护会受到影响的请求。
# 有关实现的部分
1. 对于metaDB中存入数据的编码部分放在了metakv文件中
## TODO List
1. index和kv的写入应该放在两个线程中同时写入,这里为了实现的方便,暂时先后完成
2. 原版的env中的Schedule是使用单例模式,也就是所有的数据库都只有一个线程,我们这里
需要所有的数据库都有属于自己的线程,且可能不止一个,因此需要实现类似于线程池的东西
## 一些想法
1. 根据对于某一个Field搜索的频率和耗时,自动的创建索引,且这个索引会在长时间不用后被清除
# 有关KV分离的想法
复用原有的log,将log信息加入到version信息中,需要更改version edit,version等内容
log带上编号,从小到大的进行垃圾回收
垃圾回收过程中形成的新的写入保留原有的seq放入L0
KV分离核心的困难之一在于垃圾回收的并发控制。我的核心想法是在回收log的时候,不进行合并操作,将
回收得到的东西直接保留seqno放进L0。由于L0本身就是无序的,如果在垃圾回收的过程中产生了并发写入,
新的写入也只会写入到L0,这样只要等待下一次的合并就行了。

+ 165
- 37
fielddb/field_db.cpp 查看文件

@ -1,14 +1,19 @@
#include "fielddb/field_db.h" #include "fielddb/field_db.h"
#include <cstdint> #include <cstdint>
#include <cstdio>
#include <string> #include <string>
#include <utility>
#include <vector> #include <vector>
#include "leveldb/db.h" #include "leveldb/db.h"
#include "leveldb/env.h" #include "leveldb/env.h"
#include "leveldb/options.h" #include "leveldb/options.h"
#include "leveldb/status.h" #include "leveldb/status.h"
#include "leveldb/write_batch.h"
#include "db/write_batch_internal.h" #include "db/write_batch_internal.h"
#include "util/mutexlock.h"
#include "util/serialize_value.h" #include "util/serialize_value.h"
#include "fielddb/encode_index.h" #include "fielddb/encode_index.h"
#include "fielddb/meta.h"
namespace fielddb { namespace fielddb {
using namespace leveldb; using namespace leveldb;
@ -37,38 +42,144 @@ Status FieldDB::OpenFieldDB(const Options& options,
(*dbptr)->dbname_ = name; (*dbptr)->dbname_ = name;
status = (*dbptr)->Recover(); status = (*dbptr)->Recover();
(*dbptr)->options_ = &options;
(*dbptr)->env_ = options.env;
return status; return status;
} }
// todo
// TODO:Recover
Status FieldDB::Recover() { Status FieldDB::Recover() {
//
//TODO:
//1. 遍历所有Index类型的meta,重建内存中的index_状态表
//2. 寻找所有KV类型的meta,再次提交一遍请求
//3. 等待所有请求完成
return Status::OK(); return Status::OK();
} }
Request *FieldDB::GetHandleInterval() {
mutex_.AssertHeld(); //保证队列是互斥访问的
Request *tail = taskqueue_.front();
for(auto *req_ptr : taskqueue_) {
if(req_ptr->isDeleteReq() || req_ptr->isiCreateReq()) {
return tail;
}
tail = req_ptr;
}
return tail;
}
Status FieldDB::HandleRequest(Request &req) {
MutexLock L(&mutex_);
taskqueue_.push_back(&req);
Again:
while(!req.done && &req != taskqueue_.front()) {
req.cond_.Wait();
}
if(req.done) {
return req.s; //在返回时自动释放锁L
}
Request *tail = GetHandleInterval();
WriteBatch KVBatch,IndexBatch,MetaBatch;
Status status;
if(!tail->isiCreateReq() && !tail->isiDeleteReq()) {
//表明这一个区间并没有涉及index的创建删除
{
//1. 构建各个Batch。构建的过程中要保证索引状态的一致性,需要上锁。
MutexLock iL(&index_mu);
for(auto *req_ptr : taskqueue_) {
req_ptr->ConstructBatch(KVBatch, IndexBatch, MetaBatch, this);
if(req_ptr == tail) break;
}
}
//2. 首先写入meta,再并发写入index和kv,完成之后清除meta数据
//此处可以放锁是因为写入的有序性可以通过队列来保证
mutex_.Unlock();
WriteOptions op;
status = metaDB_->Write(op, &MetaBatch);
//TODO:index的写入需要在另外一个线程中同时完成
status = indexDB_->Write(op, &IndexBatch);
status = kvDB_->Write(op, &KVBatch);
//3. 将meta数据清除
MetaCleaner cleaner;
cleaner.Collect(MetaBatch);
cleaner.CleanMetaBatch(metaDB_);
mutex_.Lock();
} else {
//对于创建和删除索引的请求,通过prepare完成索引状态的更新
MutexLock iL(&index_mu);
req.Prepare(this);
}
while(true) {
Request *ready = taskqueue_.front();
taskqueue_.pop_front();
//当前ready不是队首,不是和index的创建有关
if(ready != &req && !ready->isPending() &&
!req.isiCreateReq() && !req.isiDeleteReq()) {
ready->s = status;
ready->done = true;
ready->cond_.Signal();
}
if (ready == tail) break;
}
if(!taskqueue_.empty()) {
taskqueue_.front()->cond_.Signal();
}
//如果done==true,那么就不会继续等待直接退出
//如果处于某个请求的pending list里面,那么就会继续等待重新入队
//这里用了万恶的goto,蛤蛤
goto Again;
// return status;
}
//这里把一个空串作为常规put的name
Status FieldDB::Put(const WriteOptions &options, const Slice &key, const Slice &value) { Status FieldDB::Put(const WriteOptions &options, const Slice &key, const Slice &value) {
return kvDB_->Put(options, key, value);
FieldArray FA = {{"",value.ToString()}};
return PutFields(options, key, FA);
// return kvDB_->Put(options, key, value);
} }
// TODO:需要对是否进行index更新做处理 // TODO:需要对是否进行index更新做处理
Status FieldDB::PutFields(const WriteOptions &Options, Status FieldDB::PutFields(const WriteOptions &Options,
const Slice &key, const FieldArray &fields) { const Slice &key, const FieldArray &fields) {
//
return kvDB_->PutFields(Options, key, fields);
//这里是为了const和slice-string的转换被迫搞得
std::string key_ = key.ToString();
FieldArray fields_ = fields;
FieldsReq req(&key_,&fields_,&mutex_);
Status status = HandleRequest(req);
return status;
// return kvDB_->PutFields(Options, key, fields);
} }
// todo: 删除有索引的key时indexdb也要同步 // todo: 删除有索引的key时indexdb也要同步
Status FieldDB::Delete(const WriteOptions &options, const Slice &key) { Status FieldDB::Delete(const WriteOptions &options, const Slice &key) {
// //
return kvDB_->Delete(options, key);
std::string key_ = key.ToString();
DeleteReq req(&key_,&mutex_);
Status status = HandleRequest(req);
return status;
// return kvDB_->Delete(options, key);
} }
// TODO:根据updates里面的东西,要对是否需要更新index进行分别处理 // TODO:根据updates里面的东西,要对是否需要更新index进行分别处理
Status FieldDB::Write(const WriteOptions &options, WriteBatch *updates) { Status FieldDB::Write(const WriteOptions &options, WriteBatch *updates) {
//或许应该再做一个接口?或者基于现有的接口进行改造
return Status::OK(); return Status::OK();
} }
//由于常规put将空串作为name,这里也需要适当修改
Status FieldDB::Get(const ReadOptions &options, const Slice &key, std::string *value) { Status FieldDB::Get(const ReadOptions &options, const Slice &key, std::string *value) {
return kvDB_->Get(options, key, value);
// return kvDB_->Get(options, key, value);
FieldArray fields;
Status s = GetFields(options, key, &fields);
if(!s.ok()) {
return s;
}
*value = fields[0].second;
return s;
} }
Status FieldDB::GetFields(const ReadOptions &options, const Slice &key, FieldArray *fields) { Status FieldDB::GetFields(const ReadOptions &options, const Slice &key, FieldArray *fields) {
@ -99,45 +210,62 @@ Status FieldDB::CreateIndexOnField(const std::string& field_name) {
//taskQueue相关 //taskQueue相关
//写锁 是不是只需要给putfields设置一把锁就行 //写锁 是不是只需要给putfields设置一把锁就行
std::vector<std::pair<std::string, std::string>> keysAndVal =
FindKeysAndValByFieldName(field_name);
WriteBatch writeBatch;
Slice value = Slice();
for (auto &kvPair : keysAndVal){
std::string indexKey;
AppendIndexKey(&indexKey,
ParsedInternalIndexKey(kvPair.first, field_name, kvPair.second));
writeBatch.Put(indexKey, value);
}
Status s = indexDB_->Write(WriteOptions(), &writeBatch);
if (!s.ok()) return s;
index_[field_name] = Exist;
//唤醒taskqueue
// std::vector<std::pair<std::string, std::string>> keysAndVal =
// FindKeysAndValByFieldName(field_name);
// WriteBatch writeBatch;
// Slice value = Slice();
// for (auto &kvPair : keysAndVal){
// std::string indexKey;
// AppendIndexKey(&indexKey,
// ParsedInternalIndexKey(kvPair.first, field_name, kvPair.second));
// writeBatch.Put(indexKey, value);
// }
// Status s = indexDB_->Write(WriteOptions(), &writeBatch);
// if (!s.ok()) return s;
// index_[field_name].first = Exist;
// //唤醒taskqueue
// return s;
std::string Field = field_name;
iCreateReq req(&Field,&mutex_);
HandleRequest(req);
WriteBatch KVBatch,IndexBatch,MetaBatch;
req.ConstructBatch(KVBatch, IndexBatch, MetaBatch, this);
indexDB_->Write(WriteOptions(), &IndexBatch);
req.Finalize(this);
return req.s;
} }
Status FieldDB::DeleteIndex(const std::string &field_name) { Status FieldDB::DeleteIndex(const std::string &field_name) {
//taskQueue相关 //taskQueue相关
//写锁 //写锁
std::vector<std::pair<std::string, std::string>> keysAndVal =
FindKeysAndValByFieldName(field_name);
WriteBatch writeBatch;
for (auto &kvPair : keysAndVal){
std::string indexKey;
AppendIndexKey(&indexKey,
ParsedInternalIndexKey(kvPair.first, field_name, kvPair.second));
writeBatch.Delete(indexKey);
}
Status s = indexDB_->Write(WriteOptions(), &writeBatch);
if (!s.ok()) return s;
// std::vector<std::pair<std::string, std::string>> keysAndVal =
// FindKeysAndValByFieldName(field_name);
// WriteBatch writeBatch;
// for (auto &kvPair : keysAndVal){
// std::string indexKey;
// AppendIndexKey(&indexKey,
// ParsedInternalIndexKey(kvPair.first, field_name, kvPair.second));
// writeBatch.Delete(indexKey);
// }
// Status s = indexDB_->Write(WriteOptions(), &writeBatch);
// if (!s.ok()) return s;
index_.erase(field_name);
//唤醒taskqueue
// index_.erase(field_name);
// //唤醒taskqueue
// return s;
std::string Field = field_name;
iDeleteReq req(&Field,&mutex_);
HandleRequest(req);
WriteBatch KVBatch,IndexBatch,MetaBatch;
req.ConstructBatch(KVBatch, IndexBatch, MetaBatch, this);
indexDB_->Write(WriteOptions(), &IndexBatch);
req.Finalize(this);
return req.s;
} }
std::vector<std::string> FieldDB::QueryByIndex(const Field &field, Status *s) { std::vector<std::string> FieldDB::QueryByIndex(const Field &field, Status *s) {
if (index_.count(field.first) == 0 || index_[field.first] != Exist){
if (index_.count(field.first) == 0 || index_[field.first].first != Exist){
*s = Status::NotFound(Slice()); *s = Status::NotFound(Slice());
return std::vector<std::string>(); return std::vector<std::string>();
} }

+ 22
- 6
fielddb/field_db.h 查看文件

@ -1,6 +1,3 @@
# ifndef FIELD_DB_H
# define FIELD_DB_H
#include "port/port_stdcxx.h" #include "port/port_stdcxx.h"
#include "db/db_impl.h" #include "db/db_impl.h"
#include <deque> #include <deque>
@ -8,16 +5,24 @@
#include <set> #include <set>
#include <string> #include <string>
#include "leveldb/db.h" #include "leveldb/db.h"
#include "leveldb/env.h"
#include "leveldb/options.h" #include "leveldb/options.h"
#include "leveldb/slice.h" #include "leveldb/slice.h"
#include "leveldb/status.h" #include "leveldb/status.h"
#include "fielddb/request.h" #include "fielddb/request.h"
#include <shared_mutex>
# ifndef FIELD_DB_H
# define FIELD_DB_H
namespace fielddb { namespace fielddb {
using namespace leveldb; using namespace leveldb;
class FieldDB : DB { class FieldDB : DB {
public: public:
friend class Request;
friend class FieldsReq;
friend class iCreateReq;
friend class iDeleteReq;
friend class DeleteReq;
//FieldDB *db = new FieldDB()openDB *db //FieldDB *db = new FieldDB()openDB *db
FieldDB() : indexDB_(nullptr), kvDB_(nullptr), metaDB_(nullptr) {}; FieldDB() : indexDB_(nullptr), kvDB_(nullptr), metaDB_(nullptr) {};
/*lab1的要求,作为db派生类要实现的虚函数*/ /*lab1的要求,作为db派生类要实现的虚函数*/
@ -47,6 +52,8 @@ private:
private: private:
std::string dbname_; std::string dbname_;
const Options *options_;
Env *env_;
leveldb::DB *metaDB_; leveldb::DB *metaDB_;
leveldb::DB *indexDB_; leveldb::DB *indexDB_;
@ -57,12 +64,21 @@ private:
Deleting, Deleting,
Exist Exist
}; };
std::map<std::string, int> index_;
using FieldName = std::string;
// index的状态,creating/deleting
std::map<FieldName, std::pair<IndexStatus,Request*>> index_;
port::Mutex index_mu;
leveldb::port::Mutex mutex_; // mutex for taskqueue leveldb::port::Mutex mutex_; // mutex for taskqueue
std::deque<Request *> taskqueue_; std::deque<Request *> taskqueue_;
std::vector<std::pair<std::string, std::string>> FindKeysAndValByFieldName ( std::vector<std::pair<std::string, std::string>> FindKeysAndValByFieldName (
const std::string &fieldName); const std::string &fieldName);
/*For request handling*/
Status HandleRequest(Request &req); //
Request *GetHandleInterval(); //
}; };
} // end of namespace } // end of namespace
# endif # endif

+ 58
- 0
fielddb/meta.cpp 查看文件

@ -0,0 +1,58 @@
#include "fielddb/meta.h"
#include "util/coding.h"
#include <string>
#include "leveldb/options.h"
#include "leveldb/slice.h"
#include "leveldb/write_batch.h"
namespace fielddb {
using namespace leveldb;
// Slice MetaKV::metaKey() {
// std::string buf;
// PutLengthPrefixedSlice(&buf, Key);
// PutFixed64(&buf, meta_seq);
// PutFixed32(&buf, tag);
// return Slice(buf);
// }
// Slice MetaKV::metaValue() {
// return Slice(SerializeValue(Fields));
// }
//对于含有index field的put的meta编码为 (KV|Key,Value)
void MetaKV::Trans(Slice &MetaKey,Slice &MetaValue) {
MetaKey.clear();
MetaValue.clear();
std::string buf;
PutFixed32(&buf, KV_Creating);
PutLengthPrefixedSlice(&buf, Slice(*name));
MetaKey = Slice(buf);
MetaValue = Slice(*value);
}
class CleanerHandler : public WriteBatch::Handler {
public:
WriteBatch *NeedClean;
void Put(const Slice& key, const Slice& value) override {
//将所有之前put的meta数据进行delete
NeedClean->Delete(key);
}
void Delete(const Slice& key) override {
//所有的传入的MetaBatch都是Put的
assert(0);
}
};
void MetaCleaner::Collect(WriteBatch &MetaBatch) {
CleanerHandler Handler;
Handler.NeedClean = &NeedClean;
MetaBatch.Iterate(&Handler);
}
void MetaCleaner::CleanMetaBatch(DB *metaDB) {
if(NeedClean.ApproximateSize() == 0) return;
metaDB->Write(WriteOptions(), &NeedClean);
}
}

+ 55
- 0
fielddb/meta.h 查看文件

@ -0,0 +1,55 @@
#pragma once
#include <cstdint>
#include <cstdio>
#include "leveldb/slice.h"
#include "leveldb/write_batch.h"
#include "util/serialize_value.h"
#include "fielddb/field_db.h"
namespace fielddb {
using namespace leveldb;
/*根据写入的流程可以推断,需要存在metaDB中的数据其实都是带索引的数据,也就是FieldArray*/
// class MetaKV {
// MetaKV(Slice &Key,FieldArray Fields):
// Key(Key),Fields(Fields),tag(0),meta_seq(0) { }
// inline int get_seq() { return meta_seq; }
// inline void set_seq(int meta_seq) { this->meta_seq = meta_seq; }
// inline void setPut() { tag = PUT; }
// inline void setDelete() { tag = DELETE; }
// Slice metaKey();
// Slice metaValue();
// private:
// enum {PUT = 0x0,DELETE = 0x1};
// uint64_t meta_seq;
// uint8_t tag;
// Slice &Key;
// FieldArray Fields;
// };
enum MetaType {
Index, //index状态的meta
KV_Creating, //index field的put的meta
KV_Deleting,
};
//(field_name,field_value)metaDB中的KV表示
class MetaKV {
public:
MetaKV(std::string *field_name,std::string *field_value):
name(field_name),value(field_value) { }
void Trans(Slice &MetaKey,Slice &MetaValue);
private:
std::string *name;
std::string *value;
};
class MetaCleaner {
public:
MetaCleaner() = default;
void Collect(WriteBatch &MetaBatch);
void CleanMetaBatch(DB *metaDB);
private:
WriteBatch NeedClean;
};
}

+ 0
- 20
fielddb/metakv.cpp 查看文件

@ -1,20 +0,0 @@
#include "fielddb/metakv.h"
#include "util/coding.h"
#include <string>
namespace fielddb {
using namespace leveldb;
Slice MetaKV::metaKey() {
std::string buf;
PutLengthPrefixedSlice(&buf, Key);
PutFixed64(&buf, meta_seq);
PutFixed32(&buf, tag);
return Slice(buf);
}
Slice MetaKV::metaValue() {
return Slice(SerializeValue(Fields));
}
}

+ 0
- 26
fielddb/metakv.h 查看文件

@ -1,26 +0,0 @@
#pragma once
#include <cstdint>
#include <cstdio>
#include "leveldb/slice.h"
#include "util/serialize_value.h"
namespace fielddb {
using namespace leveldb;
/*根据写入的流程可以推断,需要存在metaDB中的数据其实都是带索引的数据,也就是FieldArray*/
class MetaKV {
MetaKV(Slice &Key,FieldArray Fields):
Key(Key),Fields(Fields),tag(0),meta_seq(0) { }
inline int get_seq() { return meta_seq; }
inline void set_seq(int meta_seq) { this->meta_seq = meta_seq; }
inline void setPut() { tag = PUT; }
inline void setDelete() { tag = DELETE; }
Slice metaKey();
Slice metaValue();
private:
enum {PUT = 0x0,DELETE = 0x1};
uint64_t meta_seq;
uint8_t tag;
Slice &Key;
FieldArray Fields;
};
}

+ 134
- 0
fielddb/request.cpp 查看文件

@ -0,0 +1,134 @@
#include "fielddb/request.h"
#include <cassert>
#include "leveldb/slice.h"
#include "leveldb/status.h"
#include "leveldb/write_batch.h"
#include "util/mutexlock.h"
#include "util/serialize_value.h"
#include "fielddb/encode_index.h"
#include "fielddb/field_db.h"
#include "fielddb/meta.h"
namespace fielddb {
using namespace leveldb;
//为虚函数提供最基本的实现
void Request::PendReq(Request *req) {
assert(0);
}
//为虚函数提供最基本的实现
void Request::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB)
{
assert(0);
}
void Request::Prepare(FieldDB *DB) {
assert(0);
}
void Request::Finalize(FieldDB *DB) {
assert(0);
}
//为虚函数提供最基本的实现
bool Request::isPending() {
//pending中的请求的parent会指向所等待的请求(iCreate/iDelete)
return parent != this;
}
/*******FieldsReq*******/
void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB)
{
KVBatch.Put(Slice(*Key), Slice(SerializeValue(*Fields)));
bool HasIndex = false;
{
// MutexLock L(&DB->index_mu); //互斥访问索引状态表
DB->index_mu.AssertHeld();
//1.将存在冲突的put pend到对应的请求
for(auto [field_name,field_value] : *Fields) {
if(field_name == "") break;
if(DB->index_.count(field_name)) {
auto [index_status,parent_req] = DB->index_[field_name];
if(index_status == FieldDB::Creating || index_status == FieldDB::Deleting) {
parent_req->PendReq(this);
return;
} else if(index_status == FieldDB::Exist) {
HasIndex = true;
}
assert(0);
}
}
//2.对于没有冲突但含有索引操作的put,构建metaKV,这里直接将KV对简单编码后写入metaDB
if(HasIndex) {
Slice MetaKey,MetaValue;
std::string serialized = SerializeValue(*Fields);
MetaKV MKV = MetaKV(Key,&serialized);
MKV.Trans(MetaKey, MetaValue);
MetaBatch.Put(MetaKey, MetaValue);
}
//3.对于含有索引的field建立索引
for(auto [field_name,field_value] : *Fields) {
if(field_name == "") continue;
if(DB->index_.count(field_name)) {
std::string indexKey;
AppendIndexKey(&indexKey, ParsedInternalIndexKey(
*Key,field_name,field_value));
IndexBatch.Put(indexKey, Slice());
}
}
}
}
/*******DeleteReq*******/
void DeleteReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB)
{
//TODO:
//1. 读取当前的最新的键值对,判断是否存在含有键值对的field
//2.1 如果无,则正常构造delete
//2.2 如果是有的field的索引状态都是exist,则在meta中写KV_Deleting类型的记录
//在kvDB和metaDB中写入对应的delete
//2.3 如果存在field的索引状态是Creating或者Deleting,那么在那个队列上面进行等待
}
/*******iCreateReq*******/
void iCreateReq::Prepare(FieldDB *DB) {
//在index_中完成索引状态更新,在这里可以避免重复创建
DB->index_mu.AssertHeld();
if(DB->index_.count(*Field)) {
auto [istatus,parent] = DB->index_[*Field];
if(istatus == FieldDB::Exist) {
//如果已经完成建立索引,则返回成功
done = true;
s = Status::OK();
} else {
//如果正在创建或删除,那么进行等待
parent->PendReq(this);
}
return;
}
//如果索引状态表中没有,则表示尚未创建,更新相应的状态
//这里将done设置为true表示在taskqueue中需要完成的部分已经完成,不需要pend
DB->index_[*Field] = {FieldDB::Creating,this};
done = true;
}
void iCreateReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB)
{
//TODO:遍历数据库,构建二级索引到indexbatch,并且更新metaDB中的元数据为Index类型的(Field,Creating)
//这里或许不需要在metaDB中先写一遍?
}
void iCreateReq::Finalize(FieldDB *DB) {
//TODO:
//1. 写入完成后,更新index状态表,并将metaDB的值改为Index类型的(Field,Existing)
//2. 将所有的pendinglist重新入队
}
}

+ 114
- 9
fielddb/request.h 查看文件

@ -1,25 +1,130 @@
#include <deque>
#include <string> #include <string>
#include "leveldb/status.h"
#include "leveldb/write_batch.h"
#include "port/port_stdcxx.h" #include "port/port_stdcxx.h"
#include "util/mutexlock.h" #include "util/mutexlock.h"
#include "util/serialize_value.h" #include "util/serialize_value.h"
// #include "fielddb/field_db.h"
#ifndef REQUEST_H
#define REQUEST_H
namespace fielddb { namespace fielddb {
using namespace leveldb; using namespace leveldb;
// taskqueue中的Requesttaskqueue最开始的线程处理一批Request // taskqueue中的Requesttaskqueue最开始的线程处理一批Request
// write写入的思路类似 // write写入的思路类似
class FieldDB;
class Request { class Request {
public: public:
Request(std::string *Key,std::string *Value,port::Mutex *mu):
Key(Key),Value(Value),hasFields(false),_cond(mu) { }
Request(std::string *Key,FieldArray *Fields,port::Mutex *mu):
Key(Key),Fields(Fields),hasFields(false),_cond(mu) { }
friend class FieldDB;
enum RequestType {
FieldsReq_t,
ValueReq_t,
iCreateReq_t,
iDeleteReq_t,
DeleteReq_t,
};
public:
// Request(std::string *Key,std::string *Value,port::Mutex *mu):
// Key(Key),Value(Value),hasFields(false),cond_(mu) { }
// Request(std::string *Key,FieldArray *Fields,port::Mutex *mu):
// Key(Key),Fields(Fields),hasFields(true),cond_(mu) { }
Request(RequestType type,port::Mutex *mu):
type_(type),cond_(mu),done(false) { parent = this; };
virtual ~Request();
private:
inline bool isFieldsReq() { return type_ == FieldsReq_t; }
// inline bool isValueReq() { return type_ == ValueReq_t; }
inline bool isiCreateReq() { return type_ == iCreateReq_t; }
inline bool isiDeleteReq() { return type_ == iDeleteReq_t; }
inline bool isDeleteReq() { return type_ == DeleteReq_t; }
//Fields的
virtual void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB);
//icreate和idelete在队列中的注册当前状态
virtual void Prepare(FieldDB *DB);
virtual void Finalize(FieldDB *DB);
virtual void PendReq(Request *req);
bool isPending();
// protected:
bool done; bool done;
port::CondVar _cond;
Status s;
port::CondVar cond_;
RequestType type_;
Request *parent;
};
//field的put
class FieldsReq : public Request {
public:
FieldsReq(std::string *Key,FieldArray *Fields,port::Mutex *mu):
Key(Key),Fields(Fields),Request(FieldsReq_t,mu) { };
void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB) override;
bool hasFields;
std::string *Key; std::string *Key;
std::string *Value;
FieldArray *Fields; FieldArray *Fields;
}; };
}
//field的put
// class ValueReq : public Request {
// public:
// ValueReq(std::string *Key,std::string *Value,port::Mutex *mu):
// Key(Key),Value(Value),Request(ValueReq_t,mu) { };
// std::string *Key;
// std::string *Value;
// };
//TODO:Field什么的可能通过传引用的方式会更加好
//request
class iCreateReq : public Request {
public:
iCreateReq(std::string *Field,port::Mutex *mu):
Field(Field),Request(iCreateReq_t, mu) { };
void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB) override;
void Prepare(FieldDB *DB) override;
void Finalize(FieldDB *DB) override;
std::string *Field;
std::deque<Request *> pending_list;
};
//request
class iDeleteReq : public Request {
public:
iDeleteReq(std::string *Field,port::Mutex *mu):
Field(Field),Request(iDeleteReq_t, mu) { };
void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB) override;
void Prepare(FieldDB *DB) override;
void Finalize(FieldDB *DB) override;
std::string *Field;
std::deque<Request *> pending_list;
};
//key的request
class DeleteReq : public Request {
public:
DeleteReq(std::string *Key,port::Mutex *mu):
Key(Key),Request(DeleteReq_t,mu) { };
void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB) override;
std::string *Key;
};
}
#endif

Loading…
取消
儲存