From f464e0993328508c217ba20e3cb1db7d8c72c2c1 Mon Sep 17 00:00:00 2001 From: cyq <1056374449@qq.com> Date: Sun, 15 Dec 2024 15:43:13 +0800 Subject: [PATCH] =?UTF-8?q?=E5=B9=B6=E5=8F=91=E6=8E=A7=E5=88=B6=E7=9A=84?= =?UTF-8?q?=E5=9F=BA=E6=9C=AC=E6=A1=86=E6=9E=B6=E5=92=8C=E5=A4=A7=E9=83=A8?= =?UTF-8?q?=E5=88=86=E5=AE=9E=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- 3DB设计.md | 29 ++++++-- fielddb/field_db.cpp | 202 +++++++++++++++++++++++++++++++++++++++++---------- fielddb/field_db.h | 28 +++++-- fielddb/meta.cpp | 58 +++++++++++++++ fielddb/meta.h | 55 ++++++++++++++ fielddb/metakv.cpp | 20 ----- fielddb/metakv.h | 26 ------- fielddb/request.cpp | 134 ++++++++++++++++++++++++++++++++++ fielddb/request.h | 123 ++++++++++++++++++++++++++++--- 9 files changed, 572 insertions(+), 103 deletions(-) create mode 100644 fielddb/meta.cpp create mode 100644 fielddb/meta.h delete mode 100644 fielddb/metakv.cpp delete mode 100644 fielddb/metakv.h diff --git a/3DB设计.md b/3DB设计.md index b37079f..29b8710 100644 --- a/3DB设计.md +++ b/3DB设计.md @@ -16,14 +16,14 @@ 如果当前有正在创建(修改)的索引或者之前的对于同一个key的索引put,则判断本次put是否含有对应的索引,如果没有则按上面一段的操作进行。如果含有,则加入之前设计中的taskqueue,在索引创建完成后会进行处理。 -我觉得:索引put涉及到了indexDB和kvDB两者之间的原子性 +想法:索引put涉及到了indexDB和kvDB两者之间的原子性 ## 创建(删除)索引 在进行这个操作之前对metaDB写入一个标记`(field,creating/deleting)`,表示在field上创建(删除)索引操作的事务的开始。(注:这里的key是包含了时间戳或者seq的)。 之后扫描kvDB,构建相应请求,对indexDB进行写入,这里也是通过writebatch写入的。写入完成之后,将之前的标记清除,表示当前的事务结束了。之后对于taskqueue里面的请求进行处理,完成之后,唤醒taskqueue中的请求。 -我觉得:创建(删除)索引这个操作实际上只对indexDB进行了写入请求,并不涉及indexDB和kvDB两者之间的一致性 +想法:创建(删除)索引这个操作实际上只对indexDB进行了写入请求,并不涉及indexDB和kvDB两者之间的一致性 ## 索引get 如果没有索引,则返回;如果索引正在创建,则存到taskqueue中,在索引创建完成之后进行处理(这里或许也可以直接返回);如果存在索引则将请求发往indexDB。 @@ -63,10 +63,29 @@ indexDB的写入情况判断如下:扫描indexDB,如果是creating操作且 不用时间戳,全部写入metaDB作为log,然后再写入kvDB和indexDB # 整体架构 -采用多线程架构 -由于二级索引理论上是幂等的操作,所以或许不用taskqueue来阻塞创建之后的写入? -如果这么看的话,其实创建(删除)索引的操作也不需要 +基于request(类似于writer)来处理并发的请求。对于创建和删除索引操作,包含一个pending队列,来维护会受到影响的请求。 +# 有关实现的部分 + +1. 对于metaDB中存入数据的编码部分放在了metakv文件中 + +## TODO List +1. index和kv的写入应该放在两个线程中同时写入,这里为了实现的方便,暂时先后完成 + +2. 原版的env中的Schedule是使用单例模式,也就是所有的数据库都只有一个线程,我们这里 +需要所有的数据库都有属于自己的线程,且可能不止一个,因此需要实现类似于线程池的东西 + +## 一些想法 +1. 根据对于某一个Field搜索的频率和耗时,自动的创建索引,且这个索引会在长时间不用后被清除 + +# 有关KV分离的想法 +复用原有的log,将log信息加入到version信息中,需要更改version edit,version等内容 +log带上编号,从小到大的进行垃圾回收 +垃圾回收过程中形成的新的写入保留原有的seq放入L0 + +KV分离核心的困难之一在于垃圾回收的并发控制。我的核心想法是在回收log的时候,不进行合并操作,将 +回收得到的东西直接保留seqno放进L0。由于L0本身就是无序的,如果在垃圾回收的过程中产生了并发写入, +新的写入也只会写入到L0,这样只要等待下一次的合并就行了。 diff --git a/fielddb/field_db.cpp b/fielddb/field_db.cpp index 637046e..93c433c 100644 --- a/fielddb/field_db.cpp +++ b/fielddb/field_db.cpp @@ -1,14 +1,19 @@ #include "fielddb/field_db.h" #include +#include #include +#include #include #include "leveldb/db.h" #include "leveldb/env.h" #include "leveldb/options.h" #include "leveldb/status.h" +#include "leveldb/write_batch.h" #include "db/write_batch_internal.h" +#include "util/mutexlock.h" #include "util/serialize_value.h" #include "fielddb/encode_index.h" +#include "fielddb/meta.h" namespace fielddb { using namespace leveldb; @@ -37,38 +42,144 @@ Status FieldDB::OpenFieldDB(const Options& options, (*dbptr)->dbname_ = name; status = (*dbptr)->Recover(); + + (*dbptr)->options_ = &options; + (*dbptr)->env_ = options.env; return status; } -// todo +// TODO:Recover Status FieldDB::Recover() { - // + //TODO: + //1. 遍历所有Index类型的meta,重建内存中的index_状态表 + //2. 寻找所有KV类型的meta,再次提交一遍请求 + //3. 等待所有请求完成 return Status::OK(); } +Request *FieldDB::GetHandleInterval() { + mutex_.AssertHeld(); //保证队列是互斥访问的 + Request *tail = taskqueue_.front(); + for(auto *req_ptr : taskqueue_) { + if(req_ptr->isDeleteReq() || req_ptr->isiCreateReq()) { + return tail; + } + tail = req_ptr; + } + return tail; +} + +Status FieldDB::HandleRequest(Request &req) { + MutexLock L(&mutex_); + taskqueue_.push_back(&req); +Again: + while(!req.done && &req != taskqueue_.front()) { + req.cond_.Wait(); + } + if(req.done) { + return req.s; //在返回时自动释放锁L + } + Request *tail = GetHandleInterval(); + WriteBatch KVBatch,IndexBatch,MetaBatch; + Status status; + if(!tail->isiCreateReq() && !tail->isiDeleteReq()) { + //表明这一个区间并没有涉及index的创建删除 + { + //1. 构建各个Batch。构建的过程中要保证索引状态的一致性,需要上锁。 + MutexLock iL(&index_mu); + for(auto *req_ptr : taskqueue_) { + req_ptr->ConstructBatch(KVBatch, IndexBatch, MetaBatch, this); + if(req_ptr == tail) break; + } + } + //2. 首先写入meta,再并发写入index和kv,完成之后清除meta数据 + //此处可以放锁是因为写入的有序性可以通过队列来保证 + mutex_.Unlock(); + WriteOptions op; + status = metaDB_->Write(op, &MetaBatch); + //TODO:index的写入需要在另外一个线程中同时完成 + status = indexDB_->Write(op, &IndexBatch); + status = kvDB_->Write(op, &KVBatch); + //3. 将meta数据清除 + MetaCleaner cleaner; + cleaner.Collect(MetaBatch); + cleaner.CleanMetaBatch(metaDB_); + mutex_.Lock(); + } else { + //对于创建和删除索引的请求,通过prepare完成索引状态的更新 + MutexLock iL(&index_mu); + req.Prepare(this); + } + + while(true) { + Request *ready = taskqueue_.front(); + taskqueue_.pop_front(); + //当前ready不是队首,不是和index的创建有关 + if(ready != &req && !ready->isPending() && + !req.isiCreateReq() && !req.isiDeleteReq()) { + ready->s = status; + ready->done = true; + ready->cond_.Signal(); + } + if (ready == tail) break; + } + if(!taskqueue_.empty()) { + taskqueue_.front()->cond_.Signal(); + } + //如果done==true,那么就不会继续等待直接退出 + //如果处于某个请求的pending list里面,那么就会继续等待重新入队 + //这里用了万恶的goto,蛤蛤 + goto Again; + + // return status; +} + + +//这里把一个空串作为常规put的name Status FieldDB::Put(const WriteOptions &options, const Slice &key, const Slice &value) { - return kvDB_->Put(options, key, value); + FieldArray FA = {{"",value.ToString()}}; + return PutFields(options, key, FA); + // return kvDB_->Put(options, key, value); } // TODO:需要对是否进行index更新做处理 Status FieldDB::PutFields(const WriteOptions &Options, const Slice &key, const FieldArray &fields) { - // - return kvDB_->PutFields(Options, key, fields); + //这里是为了const和slice-string的转换被迫搞得 + std::string key_ = key.ToString(); + FieldArray fields_ = fields; + + FieldsReq req(&key_,&fields_,&mutex_); + + Status status = HandleRequest(req); + return status; + // return kvDB_->PutFields(Options, key, fields); } // todo: 删除有索引的key时indexdb也要同步 Status FieldDB::Delete(const WriteOptions &options, const Slice &key) { // - return kvDB_->Delete(options, key); + std::string key_ = key.ToString(); + DeleteReq req(&key_,&mutex_); + Status status = HandleRequest(req); + return status; + // return kvDB_->Delete(options, key); } // TODO:根据updates里面的东西,要对是否需要更新index进行分别处理 Status FieldDB::Write(const WriteOptions &options, WriteBatch *updates) { + //或许应该再做一个接口?或者基于现有的接口进行改造 return Status::OK(); } - +//由于常规put将空串作为name,这里也需要适当修改 Status FieldDB::Get(const ReadOptions &options, const Slice &key, std::string *value) { - return kvDB_->Get(options, key, value); + // return kvDB_->Get(options, key, value); + FieldArray fields; + Status s = GetFields(options, key, &fields); + if(!s.ok()) { + return s; + } + *value = fields[0].second; + return s; } Status FieldDB::GetFields(const ReadOptions &options, const Slice &key, FieldArray *fields) { @@ -99,45 +210,62 @@ Status FieldDB::CreateIndexOnField(const std::string& field_name) { //taskQueue相关 //写锁 是不是只需要给putfields设置一把锁就行 - std::vector> keysAndVal = - FindKeysAndValByFieldName(field_name); - WriteBatch writeBatch; - Slice value = Slice(); - for (auto &kvPair : keysAndVal){ - std::string indexKey; - AppendIndexKey(&indexKey, - ParsedInternalIndexKey(kvPair.first, field_name, kvPair.second)); - writeBatch.Put(indexKey, value); - } - Status s = indexDB_->Write(WriteOptions(), &writeBatch); - if (!s.ok()) return s; - - index_[field_name] = Exist; - //唤醒taskqueue + // std::vector> keysAndVal = + // FindKeysAndValByFieldName(field_name); + // WriteBatch writeBatch; + // Slice value = Slice(); + // for (auto &kvPair : keysAndVal){ + // std::string indexKey; + // AppendIndexKey(&indexKey, + // ParsedInternalIndexKey(kvPair.first, field_name, kvPair.second)); + // writeBatch.Put(indexKey, value); + // } + // Status s = indexDB_->Write(WriteOptions(), &writeBatch); + // if (!s.ok()) return s; + // index_[field_name].first = Exist; + // //唤醒taskqueue + // return s; + std::string Field = field_name; + iCreateReq req(&Field,&mutex_); + HandleRequest(req); + WriteBatch KVBatch,IndexBatch,MetaBatch; + req.ConstructBatch(KVBatch, IndexBatch, MetaBatch, this); + indexDB_->Write(WriteOptions(), &IndexBatch); + req.Finalize(this); + return req.s; } Status FieldDB::DeleteIndex(const std::string &field_name) { //taskQueue相关 //写锁 - std::vector> keysAndVal = - FindKeysAndValByFieldName(field_name); - WriteBatch writeBatch; - for (auto &kvPair : keysAndVal){ - std::string indexKey; - AppendIndexKey(&indexKey, - ParsedInternalIndexKey(kvPair.first, field_name, kvPair.second)); - writeBatch.Delete(indexKey); - } - Status s = indexDB_->Write(WriteOptions(), &writeBatch); - if (!s.ok()) return s; + // std::vector> keysAndVal = + // FindKeysAndValByFieldName(field_name); + // WriteBatch writeBatch; + // for (auto &kvPair : keysAndVal){ + // std::string indexKey; + // AppendIndexKey(&indexKey, + // ParsedInternalIndexKey(kvPair.first, field_name, kvPair.second)); + // writeBatch.Delete(indexKey); + // } + // Status s = indexDB_->Write(WriteOptions(), &writeBatch); + // if (!s.ok()) return s; - index_.erase(field_name); - //唤醒taskqueue + // index_.erase(field_name); + // //唤醒taskqueue + // return s; + std::string Field = field_name; + iDeleteReq req(&Field,&mutex_); + HandleRequest(req); + WriteBatch KVBatch,IndexBatch,MetaBatch; + req.ConstructBatch(KVBatch, IndexBatch, MetaBatch, this); + indexDB_->Write(WriteOptions(), &IndexBatch); + req.Finalize(this); + return req.s; } std::vector FieldDB::QueryByIndex(const Field &field, Status *s) { - if (index_.count(field.first) == 0 || index_[field.first] != Exist){ + if (index_.count(field.first) == 0 || index_[field.first].first != Exist){ *s = Status::NotFound(Slice()); return std::vector(); } diff --git a/fielddb/field_db.h b/fielddb/field_db.h index 5c7e8d5..c54b525 100644 --- a/fielddb/field_db.h +++ b/fielddb/field_db.h @@ -1,6 +1,3 @@ -# ifndef FIELD_DB_H -# define FIELD_DB_H - #include "port/port_stdcxx.h" #include "db/db_impl.h" #include @@ -8,16 +5,24 @@ #include #include #include "leveldb/db.h" +#include "leveldb/env.h" #include "leveldb/options.h" #include "leveldb/slice.h" #include "leveldb/status.h" - #include "fielddb/request.h" - +#include +# ifndef FIELD_DB_H +# define FIELD_DB_H namespace fielddb { using namespace leveldb; class FieldDB : DB { public: + friend class Request; + friend class FieldsReq; + friend class iCreateReq; + friend class iDeleteReq; + friend class DeleteReq; + //用的时候必须FieldDB *db = new FieldDB()再open,不能像之前一样DB *db FieldDB() : indexDB_(nullptr), kvDB_(nullptr), metaDB_(nullptr) {}; /*lab1的要求,作为db派生类要实现的虚函数*/ @@ -47,6 +52,8 @@ private: private: std::string dbname_; + const Options *options_; + Env *env_; leveldb::DB *metaDB_; leveldb::DB *indexDB_; @@ -57,12 +64,21 @@ private: Deleting, Exist }; - std::map index_; + using FieldName = std::string; + // 标记index的状态,如果是creating/deleting,则会附带相应的请求 + std::map> index_; + port::Mutex index_mu; + leveldb::port::Mutex mutex_; // mutex for taskqueue std::deque taskqueue_; std::vector> FindKeysAndValByFieldName ( const std::string &fieldName); + + /*For request handling*/ + Status HandleRequest(Request &req); //每个请求自行构造请求后交由这个函数处理 + Request *GetHandleInterval(); //获得任务队列中的待处理区间,区间划分规则和原因见文档 + }; } // end of namespace # endif \ No newline at end of file diff --git a/fielddb/meta.cpp b/fielddb/meta.cpp new file mode 100644 index 0000000..970d1fb --- /dev/null +++ b/fielddb/meta.cpp @@ -0,0 +1,58 @@ +#include "fielddb/meta.h" +#include "util/coding.h" +#include +#include "leveldb/options.h" +#include "leveldb/slice.h" +#include "leveldb/write_batch.h" + +namespace fielddb { +using namespace leveldb; + +// Slice MetaKV::metaKey() { +// std::string buf; +// PutLengthPrefixedSlice(&buf, Key); +// PutFixed64(&buf, meta_seq); +// PutFixed32(&buf, tag); +// return Slice(buf); +// } + +// Slice MetaKV::metaValue() { +// return Slice(SerializeValue(Fields)); +// } + + +//对于含有index field的put的meta编码为 (KV|Key,Value) +void MetaKV::Trans(Slice &MetaKey,Slice &MetaValue) { + MetaKey.clear(); + MetaValue.clear(); + std::string buf; + PutFixed32(&buf, KV_Creating); + PutLengthPrefixedSlice(&buf, Slice(*name)); + MetaKey = Slice(buf); + MetaValue = Slice(*value); +} + +class CleanerHandler : public WriteBatch::Handler { +public: + WriteBatch *NeedClean; + void Put(const Slice& key, const Slice& value) override { + //将所有之前put的meta数据进行delete + NeedClean->Delete(key); + } + void Delete(const Slice& key) override { + //所有的传入的MetaBatch都是Put的 + assert(0); + } +}; + +void MetaCleaner::Collect(WriteBatch &MetaBatch) { + CleanerHandler Handler; + Handler.NeedClean = &NeedClean; + MetaBatch.Iterate(&Handler); +} + +void MetaCleaner::CleanMetaBatch(DB *metaDB) { + if(NeedClean.ApproximateSize() == 0) return; + metaDB->Write(WriteOptions(), &NeedClean); +} +} \ No newline at end of file diff --git a/fielddb/meta.h b/fielddb/meta.h new file mode 100644 index 0000000..eb3a927 --- /dev/null +++ b/fielddb/meta.h @@ -0,0 +1,55 @@ +#pragma once + +#include +#include +#include "leveldb/slice.h" +#include "leveldb/write_batch.h" +#include "util/serialize_value.h" +#include "fielddb/field_db.h" +namespace fielddb { +using namespace leveldb; +/*根据写入的流程可以推断,需要存在metaDB中的数据其实都是带索引的数据,也就是FieldArray*/ +// class MetaKV { +// MetaKV(Slice &Key,FieldArray Fields): +// Key(Key),Fields(Fields),tag(0),meta_seq(0) { } +// inline int get_seq() { return meta_seq; } +// inline void set_seq(int meta_seq) { this->meta_seq = meta_seq; } +// inline void setPut() { tag = PUT; } +// inline void setDelete() { tag = DELETE; } +// Slice metaKey(); +// Slice metaValue(); +// private: +// enum {PUT = 0x0,DELETE = 0x1}; +// uint64_t meta_seq; +// uint8_t tag; +// Slice &Key; +// FieldArray Fields; +// }; + +enum MetaType { + Index, //记录index状态的meta + KV_Creating, //记录含有index field的put的meta + KV_Deleting, +}; + +//将一对(field_name,field_value)转换到metaDB中的KV表示 +class MetaKV { +public: + MetaKV(std::string *field_name,std::string *field_value): + name(field_name),value(field_value) { } + void Trans(Slice &MetaKey,Slice &MetaValue); +private: + std::string *name; + std::string *value; +}; + +class MetaCleaner { +public: + MetaCleaner() = default; + void Collect(WriteBatch &MetaBatch); + void CleanMetaBatch(DB *metaDB); +private: + WriteBatch NeedClean; +}; + +} \ No newline at end of file diff --git a/fielddb/metakv.cpp b/fielddb/metakv.cpp deleted file mode 100644 index 819030f..0000000 --- a/fielddb/metakv.cpp +++ /dev/null @@ -1,20 +0,0 @@ -#include "fielddb/metakv.h" -#include "util/coding.h" -#include - -namespace fielddb { -using namespace leveldb; - -Slice MetaKV::metaKey() { - std::string buf; - PutLengthPrefixedSlice(&buf, Key); - PutFixed64(&buf, meta_seq); - PutFixed32(&buf, tag); - return Slice(buf); -} - -Slice MetaKV::metaValue() { - return Slice(SerializeValue(Fields)); -} - -} \ No newline at end of file diff --git a/fielddb/metakv.h b/fielddb/metakv.h deleted file mode 100644 index f976830..0000000 --- a/fielddb/metakv.h +++ /dev/null @@ -1,26 +0,0 @@ -#pragma once - -#include -#include -#include "leveldb/slice.h" -#include "util/serialize_value.h" -namespace fielddb { -using namespace leveldb; -/*根据写入的流程可以推断,需要存在metaDB中的数据其实都是带索引的数据,也就是FieldArray*/ -class MetaKV { - MetaKV(Slice &Key,FieldArray Fields): - Key(Key),Fields(Fields),tag(0),meta_seq(0) { } - inline int get_seq() { return meta_seq; } - inline void set_seq(int meta_seq) { this->meta_seq = meta_seq; } - inline void setPut() { tag = PUT; } - inline void setDelete() { tag = DELETE; } - Slice metaKey(); - Slice metaValue(); -private: - enum {PUT = 0x0,DELETE = 0x1}; - uint64_t meta_seq; - uint8_t tag; - Slice &Key; - FieldArray Fields; -}; -} \ No newline at end of file diff --git a/fielddb/request.cpp b/fielddb/request.cpp index e69de29..36d82cf 100644 --- a/fielddb/request.cpp +++ b/fielddb/request.cpp @@ -0,0 +1,134 @@ +#include "fielddb/request.h" +#include +#include "leveldb/slice.h" +#include "leveldb/status.h" +#include "leveldb/write_batch.h" +#include "util/mutexlock.h" +#include "util/serialize_value.h" +#include "fielddb/encode_index.h" +#include "fielddb/field_db.h" +#include "fielddb/meta.h" +namespace fielddb { +using namespace leveldb; + +//为虚函数提供最基本的实现 +void Request::PendReq(Request *req) { + assert(0); +} + +//为虚函数提供最基本的实现 +void Request::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, + WriteBatch &MetaBatch,fielddb::FieldDB *DB) +{ + assert(0); +} + +void Request::Prepare(FieldDB *DB) { + assert(0); +} + +void Request::Finalize(FieldDB *DB) { + assert(0); +} + +//为虚函数提供最基本的实现 +bool Request::isPending() { + //pending中的请求的parent会指向所等待的请求(iCreate/iDelete) + return parent != this; +} + + +/*******FieldsReq*******/ +void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, + WriteBatch &MetaBatch,fielddb::FieldDB *DB) +{ + KVBatch.Put(Slice(*Key), Slice(SerializeValue(*Fields))); + bool HasIndex = false; + { + // MutexLock L(&DB->index_mu); //互斥访问索引状态表 + DB->index_mu.AssertHeld(); + //1.将存在冲突的put pend到对应的请求 + for(auto [field_name,field_value] : *Fields) { + if(field_name == "") break; + if(DB->index_.count(field_name)) { + auto [index_status,parent_req] = DB->index_[field_name]; + if(index_status == FieldDB::Creating || index_status == FieldDB::Deleting) { + parent_req->PendReq(this); + return; + } else if(index_status == FieldDB::Exist) { + HasIndex = true; + } + assert(0); + } + } + //2.对于没有冲突但含有索引操作的put,构建metaKV,这里直接将KV对简单编码后写入metaDB + if(HasIndex) { + Slice MetaKey,MetaValue; + std::string serialized = SerializeValue(*Fields); + MetaKV MKV = MetaKV(Key,&serialized); + MKV.Trans(MetaKey, MetaValue); + MetaBatch.Put(MetaKey, MetaValue); + } + //3.对于含有索引的field建立索引 + for(auto [field_name,field_value] : *Fields) { + if(field_name == "") continue; + if(DB->index_.count(field_name)) { + std::string indexKey; + AppendIndexKey(&indexKey, ParsedInternalIndexKey( + *Key,field_name,field_value)); + IndexBatch.Put(indexKey, Slice()); + } + } + } +} + + +/*******DeleteReq*******/ +void DeleteReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, + WriteBatch &MetaBatch,fielddb::FieldDB *DB) +{ + //TODO: + //1. 读取当前的最新的键值对,判断是否存在含有键值对的field + //2.1 如果无,则正常构造delete + //2.2 如果是有的field的索引状态都是exist,则在meta中写KV_Deleting类型的记录 + //在kvDB和metaDB中写入对应的delete + //2.3 如果存在field的索引状态是Creating或者Deleting,那么在那个队列上面进行等待 +} + +/*******iCreateReq*******/ +void iCreateReq::Prepare(FieldDB *DB) { + //在index_中完成索引状态更新,在这里可以避免重复创建 + DB->index_mu.AssertHeld(); + if(DB->index_.count(*Field)) { + auto [istatus,parent] = DB->index_[*Field]; + if(istatus == FieldDB::Exist) { + //如果已经完成建立索引,则返回成功 + done = true; + s = Status::OK(); + } else { + //如果正在创建或删除,那么进行等待 + parent->PendReq(this); + } + return; + } + //如果索引状态表中没有,则表示尚未创建,更新相应的状态 + //这里将done设置为true表示在taskqueue中需要完成的部分已经完成,不需要pend + DB->index_[*Field] = {FieldDB::Creating,this}; + done = true; +} + +void iCreateReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, + WriteBatch &MetaBatch,fielddb::FieldDB *DB) +{ + //TODO:遍历数据库,构建二级索引到indexbatch,并且更新metaDB中的元数据为Index类型的(Field,Creating) + //这里或许不需要在metaDB中先写一遍? +} + +void iCreateReq::Finalize(FieldDB *DB) { + //TODO: + //1. 写入完成后,更新index状态表,并将metaDB的值改为Index类型的(Field,Existing) + //2. 将所有的pendinglist重新入队 + +} + +} \ No newline at end of file diff --git a/fielddb/request.h b/fielddb/request.h index 3831fed..731158d 100644 --- a/fielddb/request.h +++ b/fielddb/request.h @@ -1,25 +1,130 @@ +#include #include +#include "leveldb/status.h" +#include "leveldb/write_batch.h" #include "port/port_stdcxx.h" #include "util/mutexlock.h" #include "util/serialize_value.h" +// #include "fielddb/field_db.h" + +#ifndef REQUEST_H +#define REQUEST_H namespace fielddb { using namespace leveldb; // 在taskqueue中的Request,由taskqueue最开始的线程处理一批Request // 这个思路与write写入的思路类似 +class FieldDB; class Request { public: - Request(std::string *Key,std::string *Value,port::Mutex *mu): - Key(Key),Value(Value),hasFields(false),_cond(mu) { } - Request(std::string *Key,FieldArray *Fields,port::Mutex *mu): - Key(Key),Fields(Fields),hasFields(false),_cond(mu) { } + friend class FieldDB; + enum RequestType { + FieldsReq_t, + ValueReq_t, + iCreateReq_t, + iDeleteReq_t, + DeleteReq_t, + }; + +public: + // Request(std::string *Key,std::string *Value,port::Mutex *mu): + // Key(Key),Value(Value),hasFields(false),cond_(mu) { } + // Request(std::string *Key,FieldArray *Fields,port::Mutex *mu): + // Key(Key),Fields(Fields),hasFields(true),cond_(mu) { } + Request(RequestType type,port::Mutex *mu): + type_(type),cond_(mu),done(false) { parent = this; }; + + virtual ~Request(); -private: + inline bool isFieldsReq() { return type_ == FieldsReq_t; } + // inline bool isValueReq() { return type_ == ValueReq_t; } + inline bool isiCreateReq() { return type_ == iCreateReq_t; } + inline bool isiDeleteReq() { return type_ == iDeleteReq_t; } + inline bool isDeleteReq() { return type_ == DeleteReq_t; } + + //用于含有Fields的 + virtual void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, + WriteBatch &MetaBatch,fielddb::FieldDB *DB); + //主要用于icreate和idelete在队列中的注册当前状态 + virtual void Prepare(FieldDB *DB); + virtual void Finalize(FieldDB *DB); + + virtual void PendReq(Request *req); + bool isPending(); +// protected: bool done; - port::CondVar _cond; + Status s; + port::CondVar cond_; + RequestType type_; + Request *parent; +}; + +//含有field的put +class FieldsReq : public Request { +public: + FieldsReq(std::string *Key,FieldArray *Fields,port::Mutex *mu): + Key(Key),Fields(Fields),Request(FieldsReq_t,mu) { }; + + void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, + WriteBatch &MetaBatch,fielddb::FieldDB *DB) override; - bool hasFields; std::string *Key; - std::string *Value; FieldArray *Fields; }; -} \ No newline at end of file + +//不含有field的put,但是计划被弃用了 +// class ValueReq : public Request { +// public: +// ValueReq(std::string *Key,std::string *Value,port::Mutex *mu): +// Key(Key),Value(Value),Request(ValueReq_t,mu) { }; + +// std::string *Key; +// std::string *Value; +// }; + +//TODO:下面的Field什么的可能通过传引用的方式会更加好? + +//创建索引的request +class iCreateReq : public Request { +public: + iCreateReq(std::string *Field,port::Mutex *mu): + Field(Field),Request(iCreateReq_t, mu) { }; + + void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, + WriteBatch &MetaBatch,fielddb::FieldDB *DB) override; + void Prepare(FieldDB *DB) override; + void Finalize(FieldDB *DB) override; + + std::string *Field; + std::deque pending_list; +}; + +//删除索引的request +class iDeleteReq : public Request { +public: + iDeleteReq(std::string *Field,port::Mutex *mu): + Field(Field),Request(iDeleteReq_t, mu) { }; + + void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, + WriteBatch &MetaBatch,fielddb::FieldDB *DB) override; + void Prepare(FieldDB *DB) override; + void Finalize(FieldDB *DB) override; + + std::string *Field; + std::deque pending_list; +}; + +//删除key的request +class DeleteReq : public Request { +public: + DeleteReq(std::string *Key,port::Mutex *mu): + Key(Key),Request(DeleteReq_t,mu) { }; + + void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, + WriteBatch &MetaBatch,fielddb::FieldDB *DB) override; + + std::string *Key; +}; + +} + +#endif \ No newline at end of file