diff --git a/.gitignore b/.gitignore index c4b2425..9e34c6c 100644 --- a/.gitignore +++ b/.gitignore @@ -6,3 +6,7 @@ # Build directory. build/ out/ + +# clangd +.cache/ +compile_commands.json \ No newline at end of file diff --git a/3DB设计.md b/3DB设计.md new file mode 100644 index 0000000..b37079f --- /dev/null +++ b/3DB设计.md @@ -0,0 +1,72 @@ +# 3DB +分别为kvDB,indexDB,metaDB。其中,前两个和之前存储内容相同,metaDB用来维护与并发控制相关的内容 +为了达成一致性,这里需要要有一个全局唯一的seq或者timestamp。这个信息编码在userkey中,比较器要进行对应修改,metaDB要按照先后顺序排列,另外两个不用考虑timestamp +# 具体操作 +这里将操作类型进行分类:普通put和get,索引put和get,创建(删除)索引。 +接下来将根据这个分类进行阐述 + +## 普通put和get +普通get直接调用kvDB的get +普通put要判断当前是否进行索引put和创建(删除)索引操作。如果有,则需要进入taskqueue等待。 + +## 索引put +如果当前字段中没有需要创建索引的,那么就是普通put + +如果当前没有正在创建(修改)的索引或者之前的对于同一个key的索引put,则首先向metaDB写入标记`(key,creating/deleting)`,表示事务的开始。然后构建请求,并向kvDB和indexDB分别写入。所有的请求的时间戳都和metaDB中标记的时间戳相同。全部完成之后,将之前在metaDB中的标记delete,表示事务的结束。 + +如果当前有正在创建(修改)的索引或者之前的对于同一个key的索引put,则判断本次put是否含有对应的索引,如果没有则按上面一段的操作进行。如果含有,则加入之前设计中的taskqueue,在索引创建完成后会进行处理。 + +我觉得:索引put涉及到了indexDB和kvDB两者之间的原子性 + +## 创建(删除)索引 +在进行这个操作之前对metaDB写入一个标记`(field,creating/deleting)`,表示在field上创建(删除)索引操作的事务的开始。(注:这里的key是包含了时间戳或者seq的)。 + +之后扫描kvDB,构建相应请求,对indexDB进行写入,这里也是通过writebatch写入的。写入完成之后,将之前的标记清除,表示当前的事务结束了。之后对于taskqueue里面的请求进行处理,完成之后,唤醒taskqueue中的请求。 + +我觉得:创建(删除)索引这个操作实际上只对indexDB进行了写入请求,并不涉及indexDB和kvDB两者之间的一致性 + +## 索引get +如果没有索引,则返回;如果索引正在创建,则存到taskqueue中,在索引创建完成之后进行处理(这里或许也可以直接返回);如果存在索引则将请求发往indexDB。 + +# 一致性,崩溃恢复(recovery) +## 创建(删除)索引 +如果meta中存在记录`(field,creating/deleting)`,则表示在创建(删除)索引的过程中的某个时间节点崩溃了。 + +如果在metaDB写入标记后崩溃,indexDB写入前崩溃。由于这个操作只涉及indexDB,且与当前创建(删除)索引相关的写入请求都被阻塞了,所以只要再扫描一遍全部的kvDB构造index写入请求就可以了。 + +如果是在indexDB写入完成之后崩溃,这实际上已经完成了创建(删除)索引操作,所以把metaDB中的标记清除即可。 + +这里最主要的问题在于如何判断崩溃时间点。对于写入标记前后通过metaDB中的记录判断。对于是否对indexDB完成写入通过能否在indexDB中找到对应的索引判断,因为索引是一个writebatch整体写入的,有原子性。 + +## 索引put +索引put涉及到kvDB和indexDB写入的一致性,这一点通过时间戳机制来保证。 + +如果metaDB中有记录`(key,creating/deleting)`,那么表明索引put过程中的某个时间节点崩溃了。由于kvDB和indexDB的写入是并发进行的,所以可能会出现四种情况: +1. kvDB和indexDB写入均未完成 +2. kvDB写入完成而indexDB未完成。 +3. kvDB写入未完成而indexDB写入完成 +4. 两者写入都完成,但是没有清除记录 + +kvDB的写入情况的判断如下:通过metaDB中的记录的key,查询kvDB。如果是creating操作且记录不存在或者得到的时间戳不等于metaDB记录的时间戳,则表明写入未完成;如果是deleting,如果存在记录,则表明写入未完成。 + +indexDB的写入情况判断如下:扫描indexDB,如果是creating操作且记录不存在或者得到的时间戳不等于metaDB记录的时间戳,则表明写入未完成;如果是deleting操作,如果存在对应的二级索引,则表明写入未完成。如果在kvDB中能够得到相应的kv,可以通过kvDB中的kv查询二级索引。 + +分别讨论creating和deleting操作下的四种情况的崩溃恢复的过程: +1. 如果是creating,则清除metaDB中的记录;如果是deleting,则继续delete +2. 如果是creating,则根据kvDB的写入构造请求写入indexDB;如果是deleting,则遍历indexDB删除对应的索引 +3. 如果是creating,且kvDB中有旧值,则将indexDB中所有相关的字段清除后根据旧值创建索引;如果是deleting,删除kvDB中的kv对 +4. 直接清除记录 + +当然,这些处理的方式会比较的细,总的来讲,只要kvDB完成写入,那么indexDB就可以完成更新;如果写入未完成,那么indexDB就需要用某种方式回滚。 + +# 全写入方案 +不用时间戳,全部写入metaDB作为log,然后再写入kvDB和indexDB + +# 整体架构 +采用多线程架构 +由于二级索引理论上是幂等的操作,所以或许不用taskqueue来阻塞创建之后的写入? +如果这么看的话,其实创建(删除)索引的操作也不需要 + + + + diff --git a/CMakeLists.txt b/CMakeLists.txt index fda9e01..b70e461 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -190,6 +190,10 @@ target_sources(leveldb "util/options.cc" "util/random.h" "util/status.cc" + "util/serialize_value.h" + "util/serialize_value.cc" + "fielddb/field_db.cpp" + "fielddb/field_db.h" # Only CMake 3.3+ supports PUBLIC sources in targets exported by "install". $<$:PUBLIC> @@ -517,3 +521,8 @@ if(LEVELDB_INSTALL) DESTINATION "${CMAKE_INSTALL_LIBDIR}/cmake/${PROJECT_NAME}" ) endif(LEVELDB_INSTALL) + +add_executable(lab1_test + "${PROJECT_SOURCE_DIR}/test/basic_function_test.cc" +) +target_link_libraries(lab1_test PRIVATE leveldb gtest) diff --git a/README.md b/README.md index ee9e43d..05ee763 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,4 @@ # 实验报告 仓库地址 https://gitea.shuishan.net.cn/10225501448/leveldb_proj2 + +新建文件时 cmakelist 120行下面记得加进去 \ No newline at end of file diff --git a/db/db_impl.cc b/db/db_impl.cc index f96d245..49db131 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -34,6 +34,7 @@ #include "util/coding.h" #include "util/logging.h" #include "util/mutexlock.h" +#include "util/serialize_value.h" namespace leveldb { @@ -1164,6 +1165,27 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key, return s; } +Status DBImpl::GetFields(const ReadOptions& options, const Slice& key, + FieldArray* fields) { + std::string value; + Status s = DBImpl::Get(options, key, &value); + *fields = *ParseValue(value); + return s; +} + +std::vector DBImpl::FindKeysByField(Field &field){ + std::vector result; + auto iter = NewIterator(ReadOptions()); + for(iter->SeekToFirst();iter->Valid();iter->Next()) { + // std::string k = iter->key().ToString(); + InternalFieldArray fields(iter->value()); + if(fields.HasField(field)) { + result.push_back(iter->key().ToString()); + } + } + return result; +} + Iterator* DBImpl::NewIterator(const ReadOptions& options) { SequenceNumber latest_snapshot; uint32_t seed; @@ -1198,6 +1220,10 @@ Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) { return DB::Put(o, key, val); } +Status DBImpl::PutFields(const WriteOptions& o, const Slice& key, const FieldArray& fields) { + return DB::PutFields(o, key, fields); +} + Status DBImpl::Delete(const WriteOptions& options, const Slice& key) { return DB::Delete(options, key); } @@ -1491,6 +1517,11 @@ Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) { return Write(opt, &batch); } +Status DB::PutFields(const WriteOptions& opt, const Slice& key, const FieldArray& fields) { + std::string value = SerializeValue(fields); + return DB::Put(opt, key, value); +} + Status DB::Delete(const WriteOptions& opt, const Slice& key) { WriteBatch batch; batch.Delete(key); diff --git a/db/db_impl.h b/db/db_impl.h index c7b0172..6848077 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -17,6 +17,7 @@ #include "leveldb/env.h" #include "port/port.h" #include "port/thread_annotations.h" +#include "util/serialize_value.h" namespace leveldb { @@ -38,10 +39,16 @@ class DBImpl : public DB { // Implementations of the DB interface Status Put(const WriteOptions&, const Slice& key, const Slice& value) override; + Status PutFields(const WriteOptions&, const Slice& key, + const FieldArray& fields) override; + Status Delete(const WriteOptions&, const Slice& key) override; Status Write(const WriteOptions& options, WriteBatch* updates) override; Status Get(const ReadOptions& options, const Slice& key, std::string* value) override; + Status GetFields(const ReadOptions& options, const Slice& key, + FieldArray* fields) override; + std::vector FindKeysByField(Field &field) override; Iterator* NewIterator(const ReadOptions&) override; const Snapshot* GetSnapshot() override; void ReleaseSnapshot(const Snapshot* snapshot) override; diff --git a/db/db_test.cc b/db/db_test.cc index a4a84cd..2a5668f 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -2117,6 +2117,9 @@ class ModelDB : public DB { Status Put(const WriteOptions& o, const Slice& k, const Slice& v) override { return DB::Put(o, k, v); } + Status PutFields(const WriteOptions& o, const Slice& k, const FieldArray& v) override { + return DB::PutFields(o, k, v); + } Status Delete(const WriteOptions& o, const Slice& key) override { return DB::Delete(o, key); } @@ -2125,6 +2128,15 @@ class ModelDB : public DB { assert(false); // Not implemented return Status::NotFound(key); } + Status GetFields(const ReadOptions& options, const Slice& key, + FieldArray* value) override { + assert(false); // Not implemented + return Status::NotFound(key); + } + std::vector FindKeysByField(Field &field) override { + assert(false); // Not implemented + return std::vector(); + } Iterator* NewIterator(const ReadOptions& options) override { if (options.snapshot == nullptr) { KVMap* saved = new KVMap; diff --git a/fielddb/encode_index.h b/fielddb/encode_index.h new file mode 100644 index 0000000..eceeab4 --- /dev/null +++ b/fielddb/encode_index.h @@ -0,0 +1,37 @@ +#ifndef ENCODE_INDEX_H +#define ENCODE_INDEX_H + +#include "leveldb/slice.h" +#include "util/coding.h" +namespace fielddb{ +using namespace leveldb; + + +struct ParsedInternalIndexKey { //key : {name : val} + Slice user_key_; + Slice name_; + Slice val_; + + ParsedInternalIndexKey() {} // Intentionally left uninitialized (for speed) + ParsedInternalIndexKey(const Slice& user_key, const Slice& name, const Slice& val) + : user_key_(user_key), name_(name), val_(val) {} +}; + +bool ParseInternalIndexKey(Slice input, ParsedInternalIndexKey* result); +void AppendIndexKey(std::string* result, const ParsedInternalIndexKey& key); + + +inline bool ParseInternalIndexKey(Slice input, ParsedInternalIndexKey* result){ + return GetLengthPrefixedSlice(&input, &result->name_) && + GetLengthPrefixedSlice(&input, &result->val_) && + GetLengthPrefixedSlice(&input, &result->user_key_); +} + +inline void AppendIndexKey(std::string* result, const ParsedInternalIndexKey& key){ + PutLengthPrefixedSlice(result, key.name_); + PutLengthPrefixedSlice(result, key.val_); + PutLengthPrefixedSlice(result, key.user_key_); +} + +} +#endif \ No newline at end of file diff --git a/fielddb/field_db.cpp b/fielddb/field_db.cpp new file mode 100644 index 0000000..637046e --- /dev/null +++ b/fielddb/field_db.cpp @@ -0,0 +1,194 @@ +#include "fielddb/field_db.h" +#include +#include +#include +#include "leveldb/db.h" +#include "leveldb/env.h" +#include "leveldb/options.h" +#include "leveldb/status.h" +#include "db/write_batch_internal.h" +#include "util/serialize_value.h" +#include "fielddb/encode_index.h" + +namespace fielddb { +using namespace leveldb; +//TODO:打开fieldDB +Status FieldDB::OpenFieldDB(const Options& options, + const std::string& name, FieldDB** dbptr) { + // options.env->CreateDir("./abc") + if(*dbptr == nullptr){ + return Status::NotSupported(name, "new a fieldDb first\n"); + } + + // + Status status; + DB *indexdb, *kvdb, *metadb; + status = Open(options, name+"_indexDB", &indexdb); + if(!status.ok()) return status; + + status = Open(options, name+"_kvDB", &kvdb); + if(!status.ok()) return status; + status = Open(options, name+"_metaDB", &metadb); + if(!status.ok()) return status; + + (*dbptr)->indexDB_ = indexdb; + (*dbptr)->kvDB_ = kvdb; + (*dbptr)->metaDB_ = metadb; + (*dbptr)->dbname_ = name; + + status = (*dbptr)->Recover(); + return status; +} + +// todo +Status FieldDB::Recover() { + // + return Status::OK(); +} + +Status FieldDB::Put(const WriteOptions &options, const Slice &key, const Slice &value) { + return kvDB_->Put(options, key, value); +} + +// TODO:需要对是否进行index更新做处理 +Status FieldDB::PutFields(const WriteOptions &Options, + const Slice &key, const FieldArray &fields) { + // + return kvDB_->PutFields(Options, key, fields); +} + +// todo: 删除有索引的key时indexdb也要同步 +Status FieldDB::Delete(const WriteOptions &options, const Slice &key) { + // + return kvDB_->Delete(options, key); +} +// TODO:根据updates里面的东西,要对是否需要更新index进行分别处理 +Status FieldDB::Write(const WriteOptions &options, WriteBatch *updates) { + return Status::OK(); +} + +Status FieldDB::Get(const ReadOptions &options, const Slice &key, std::string *value) { + return kvDB_->Get(options, key, value); +} + +Status FieldDB::GetFields(const ReadOptions &options, const Slice &key, FieldArray *fields) { + return kvDB_->GetFields(options, key, fields); + +} + +std::vector FieldDB::FindKeysByField(Field &field) { + return kvDB_->FindKeysByField(field); +} + +std::vector> FieldDB::FindKeysAndValByFieldName ( + const std::string &fieldName){ + std::vector> result; + auto iter = kvDB_->NewIterator(ReadOptions()); + std::string val; + for(iter->SeekToFirst();iter->Valid();iter->Next()) { + InternalFieldArray fields(iter->value()); + val = fields.ValOfName(fieldName); + if(!val.empty()) { + result.push_back(std::make_pair(iter->key().ToString(), val)); + } + } + return result; +} + +Status FieldDB::CreateIndexOnField(const std::string& field_name) { + //taskQueue相关 + //写锁 是不是只需要给putfields设置一把锁就行 + + std::vector> keysAndVal = + FindKeysAndValByFieldName(field_name); + WriteBatch writeBatch; + Slice value = Slice(); + for (auto &kvPair : keysAndVal){ + std::string indexKey; + AppendIndexKey(&indexKey, + ParsedInternalIndexKey(kvPair.first, field_name, kvPair.second)); + writeBatch.Put(indexKey, value); + } + Status s = indexDB_->Write(WriteOptions(), &writeBatch); + if (!s.ok()) return s; + + index_[field_name] = Exist; + //唤醒taskqueue + +} + +Status FieldDB::DeleteIndex(const std::string &field_name) { + //taskQueue相关 + //写锁 + std::vector> keysAndVal = + FindKeysAndValByFieldName(field_name); + WriteBatch writeBatch; + for (auto &kvPair : keysAndVal){ + std::string indexKey; + AppendIndexKey(&indexKey, + ParsedInternalIndexKey(kvPair.first, field_name, kvPair.second)); + writeBatch.Delete(indexKey); + } + Status s = indexDB_->Write(WriteOptions(), &writeBatch); + if (!s.ok()) return s; + + index_.erase(field_name); + //唤醒taskqueue +} + +std::vector FieldDB::QueryByIndex(const Field &field, Status *s) { + if (index_.count(field.first) == 0 || index_[field.first] != Exist){ + *s = Status::NotFound(Slice()); + return std::vector(); + } + std::string indexKey; + AppendIndexKey(&indexKey, + ParsedInternalIndexKey(Slice(), field.first, field.second)); + Iterator *indexIterator = indexDB_->NewIterator(ReadOptions()); + indexIterator->Seek(indexKey); + + std::vector result; + for (; indexIterator->Valid(); indexIterator->Next()) { + ParsedInternalIndexKey iterKey; + if (ParseInternalIndexKey(indexIterator->key(), &iterKey)){ + if (iterKey.name_ == field.first && iterKey.val_ == field.second){ + result.push_back(iterKey.user_key_.ToString()); + continue; //查到说明在范围里,否则break + } + } + break; + } + + *s = Status::OK(); + return result; +} + +Iterator * FieldDB::NewIterator(const ReadOptions &options) { + return kvDB_->NewIterator(options); +} + +// TODO:使用统一seq进行snapshot管理 +const Snapshot * FieldDB::GetSnapshot() { + return kvDB_->GetSnapshot(); +} +// TODO:同上 +void FieldDB::ReleaseSnapshot(const Snapshot *snapshot) { + kvDB_->ReleaseSnapshot(snapshot); +} + +bool FieldDB::GetProperty(const Slice &property, std::string *value) { + return kvDB_->GetProperty(property, value) | indexDB_->GetProperty(property, value); +} + +void FieldDB::GetApproximateSizes(const Range *range, int n, uint64_t *sizes) { + uint64_t temp = 0; + kvDB_->GetApproximateSizes(range, n, sizes); + indexDB_->GetApproximateSizes(range, n, &temp); + *sizes += temp; +} + +void FieldDB::CompactRange(const Slice *begin, const Slice *end) { + kvDB_->CompactRange(begin, end); +} + +} // end of namespace \ No newline at end of file diff --git a/fielddb/field_db.h b/fielddb/field_db.h new file mode 100644 index 0000000..5c7e8d5 --- /dev/null +++ b/fielddb/field_db.h @@ -0,0 +1,68 @@ +# ifndef FIELD_DB_H +# define FIELD_DB_H + +#include "port/port_stdcxx.h" +#include "db/db_impl.h" +#include +#include +#include +#include +#include "leveldb/db.h" +#include "leveldb/options.h" +#include "leveldb/slice.h" +#include "leveldb/status.h" + +#include "fielddb/request.h" + +namespace fielddb { +using namespace leveldb; +class FieldDB : DB { +public: + //用的时候必须FieldDB *db = new FieldDB()再open,不能像之前一样DB *db + FieldDB() : indexDB_(nullptr), kvDB_(nullptr), metaDB_(nullptr) {}; +/*lab1的要求,作为db派生类要实现的虚函数*/ + Status Put(const WriteOptions &options, const Slice &key, const Slice &value) override; + Status PutFields(const WriteOptions &, const Slice &key, const FieldArray &fields) override; + Status Delete(const WriteOptions &options, const Slice &key) override; + Status Write(const WriteOptions &options, WriteBatch *updates) override; + Status Get(const ReadOptions &options, const Slice &key, std::string *value) override; + Status GetFields(const ReadOptions &options, const Slice &key, FieldArray *fields) override; + std::vector FindKeysByField(Field &field) override; + Iterator * NewIterator(const ReadOptions &options) override; + const Snapshot * GetSnapshot() override; + void ReleaseSnapshot(const Snapshot *snapshot) override; + bool GetProperty(const Slice &property, std::string *value) override; + void GetApproximateSizes(const Range *range, int n, uint64_t *sizes) override; + void CompactRange(const Slice *begin, const Slice *end) override; +/*与索引相关*/ + Status CreateIndexOnField(const std::string& field_name); + Status DeleteIndex(const std::string &field_name); + std::vector QueryByIndex(const Field &field, Status *s); + + static Status OpenFieldDB(const Options& options,const std::string& name,FieldDB** dbptr); + +private: + //根据metaDB的内容进行恢复 + Status Recover(); + +private: + std::string dbname_; + + leveldb::DB *metaDB_; + leveldb::DB *indexDB_; + leveldb::DB *kvDB_; + + enum IndexStatus{ + Creating, + Deleting, + Exist + }; + std::map index_; + leveldb::port::Mutex mutex_; // mutex for taskqueue + std::deque taskqueue_; + + std::vector> FindKeysAndValByFieldName ( + const std::string &fieldName); +}; +} // end of namespace +# endif \ No newline at end of file diff --git a/fielddb/metakv.cpp b/fielddb/metakv.cpp new file mode 100644 index 0000000..819030f --- /dev/null +++ b/fielddb/metakv.cpp @@ -0,0 +1,20 @@ +#include "fielddb/metakv.h" +#include "util/coding.h" +#include + +namespace fielddb { +using namespace leveldb; + +Slice MetaKV::metaKey() { + std::string buf; + PutLengthPrefixedSlice(&buf, Key); + PutFixed64(&buf, meta_seq); + PutFixed32(&buf, tag); + return Slice(buf); +} + +Slice MetaKV::metaValue() { + return Slice(SerializeValue(Fields)); +} + +} \ No newline at end of file diff --git a/fielddb/metakv.h b/fielddb/metakv.h new file mode 100644 index 0000000..f976830 --- /dev/null +++ b/fielddb/metakv.h @@ -0,0 +1,26 @@ +#pragma once + +#include +#include +#include "leveldb/slice.h" +#include "util/serialize_value.h" +namespace fielddb { +using namespace leveldb; +/*根据写入的流程可以推断,需要存在metaDB中的数据其实都是带索引的数据,也就是FieldArray*/ +class MetaKV { + MetaKV(Slice &Key,FieldArray Fields): + Key(Key),Fields(Fields),tag(0),meta_seq(0) { } + inline int get_seq() { return meta_seq; } + inline void set_seq(int meta_seq) { this->meta_seq = meta_seq; } + inline void setPut() { tag = PUT; } + inline void setDelete() { tag = DELETE; } + Slice metaKey(); + Slice metaValue(); +private: + enum {PUT = 0x0,DELETE = 0x1}; + uint64_t meta_seq; + uint8_t tag; + Slice &Key; + FieldArray Fields; +}; +} \ No newline at end of file diff --git a/fielddb/request.cpp b/fielddb/request.cpp new file mode 100644 index 0000000..e69de29 diff --git a/fielddb/request.h b/fielddb/request.h new file mode 100644 index 0000000..3831fed --- /dev/null +++ b/fielddb/request.h @@ -0,0 +1,25 @@ +#include +#include "port/port_stdcxx.h" +#include "util/mutexlock.h" +#include "util/serialize_value.h" +namespace fielddb { +using namespace leveldb; +// 在taskqueue中的Request,由taskqueue最开始的线程处理一批Request +// 这个思路与write写入的思路类似 +class Request { +public: + Request(std::string *Key,std::string *Value,port::Mutex *mu): + Key(Key),Value(Value),hasFields(false),_cond(mu) { } + Request(std::string *Key,FieldArray *Fields,port::Mutex *mu): + Key(Key),Fields(Fields),hasFields(false),_cond(mu) { } + +private: + bool done; + port::CondVar _cond; + + bool hasFields; + std::string *Key; + std::string *Value; + FieldArray *Fields; +}; +} \ No newline at end of file diff --git a/include/leveldb/db.h b/include/leveldb/db.h index a13d147..cbe69a7 100644 --- a/include/leveldb/db.h +++ b/include/leveldb/db.h @@ -11,6 +11,7 @@ #include "leveldb/export.h" #include "leveldb/iterator.h" #include "leveldb/options.h" +#include "util/serialize_value.h" namespace leveldb { @@ -66,6 +67,9 @@ class LEVELDB_EXPORT DB { virtual Status Put(const WriteOptions& options, const Slice& key, const Slice& value) = 0; + virtual Status PutFields(const WriteOptions&, const Slice& key, + const FieldArray& fields) = 0; + // Remove the database entry (if any) for "key". Returns OK on // success, and a non-OK status on error. It is not an error if "key" // did not exist in the database. @@ -86,7 +90,11 @@ class LEVELDB_EXPORT DB { // May return some other Status on an error. virtual Status Get(const ReadOptions& options, const Slice& key, std::string* value) = 0; + + virtual Status GetFields(const ReadOptions& options, const Slice& key, + FieldArray* fields) = 0; + virtual std::vector FindKeysByField(Field &field) = 0; // Return a heap-allocated iterator over the contents of the database. // The result of NewIterator() is initially invalid (caller must // call one of the Seek methods on the iterator before using it). diff --git a/test/basic_function_test.cc b/test/basic_function_test.cc new file mode 100644 index 0000000..4915fe4 --- /dev/null +++ b/test/basic_function_test.cc @@ -0,0 +1,146 @@ +#include "gtest/gtest.h" +// #include "leveldb/env.h" +// #include "leveldb/db.h" +#include "fielddb/field_db.h" +using namespace fielddb; + +constexpr int value_size = 2048; +constexpr int data_size = 128 << 20; +std::vector cities = { + "Beijing", "Shanghai", "Guangzhou", "Shenzhen", "Hangzhou", + "Chengdu", "Chongqing", "Wuhan", "Suzhou", "Tianjin" + }; +std::vector shanghaiKeys; + +Status OpenDB(std::string dbName, FieldDB **db) { + Options options; + options.create_if_missing = true; + return FieldDB::OpenFieldDB(options, dbName, db); +} + +void ClearDB(FieldDB *db){ + //destroy和恢复没做前先用这个清理数据库,否则跑不同的数据多做几次测试会污染 + WriteOptions writeOptions; + int key_num = data_size / value_size; + for (int i = 0; i < key_num; i++) { + int key_ = i+1; + std::string key = std::to_string(key_); + Status s = db->Delete(WriteOptions(), key); + ASSERT_TRUE(s.ok()); + } +} + +void InsertFieldData(FieldDB *db) { + WriteOptions writeOptions; + int key_num = data_size / value_size; + srand(0); + + for (int i = 0; i < key_num; i++) { + int randThisTime = rand(); //确保读写一个循环只rand一次,否则随机序列会不一致 + int key_ = randThisTime % key_num+1; + std::string key = std::to_string(key_); + + std::string name = "customer#" + std::to_string(key_); + std::string address = cities[randThisTime % cities.size()]; + FieldArray fields = { + {"name", name}, + {"address", address} + }; + if (address == "Shanghai") { + shanghaiKeys.push_back(key); + } + Status s = db->PutFields(WriteOptions(), key, fields); + ASSERT_TRUE(s.ok()); + } +} + +void GetFieldData(FieldDB *db) { + ReadOptions readOptions; + int key_num = data_size / value_size; + + // 点查 + srand(0); + for (int i = 0; i < 100; i++) { + int randThisTime = rand(); + int key_ = randThisTime % key_num+1; + std::string key = std::to_string(key_); + FieldArray fields_ret; + Status s = db->GetFields(readOptions, key, &fields_ret); + ASSERT_TRUE(s.ok()); + for (const Field& pairs : fields_ret) { + if (pairs.first == "name"){ + + } else if (pairs.first == "address"){ + std::string city = pairs.second; + ASSERT_NE(std::find(cities.begin(), cities.end(), city), cities.end()); + } else assert(false); + } + } +} + +void findKeysByCity(FieldDB *db) { + Field field = {"address", "Shanghai"}; + std::vector resKeys = db->FindKeysByField(field); + std::cout << shanghaiKeys.size() << " " << resKeys.size() << std::endl; + for (const std::string &key : resKeys){ + ASSERT_NE(std::find(shanghaiKeys.begin(), shanghaiKeys.end(), key), shanghaiKeys.end()); + } +} + +void findKeysByCityIndex(FieldDB *db, bool expect) { + Field field = {"address", "Shanghai"}; + Status s; + std::vector resKeys = db->QueryByIndex(field, &s); + if (expect) ASSERT_TRUE(s.ok()); + else { + ASSERT_TRUE(s.IsNotFound()); + return; + } + std::cout << shanghaiKeys.size() << " " << resKeys.size() << std::endl; + for (const std::string &key : resKeys){ + ASSERT_NE(std::find(shanghaiKeys.begin(), shanghaiKeys.end(), key), shanghaiKeys.end()); + } +} + +TEST(TestLab1, Basic) { + // DestroyDB("testdb",Options()); + FieldDB *db = new FieldDB(); + + if(OpenDB("testdb", &db).ok() == false) { + std::cerr << "open db failed" << std::endl; + abort(); + } + // ClearDB(db); + InsertFieldData(db); + GetFieldData(db); + findKeysByCity(db); + delete db; +} + +TEST(TestLab2, Basic) { + //destroy + FieldDB *db = new FieldDB(); + + if(OpenDB("testdb2", &db).ok() == false) { + std::cerr << "open db failed" << std::endl; + abort(); + } + // ClearDB(db); + shanghaiKeys.clear(); + InsertFieldData(db); + // GetFieldData(db); + // findKeysByCity(db); + db->CreateIndexOnField("address"); + findKeysByCityIndex(db, true); + db->DeleteIndex("address"); + findKeysByCityIndex(db, false); + + delete db; +} + + +int main(int argc, char** argv) { + // All tests currently run with the same read-only file limits. + testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} \ No newline at end of file diff --git a/util/serialize_value.cc b/util/serialize_value.cc new file mode 100644 index 0000000..562360b --- /dev/null +++ b/util/serialize_value.cc @@ -0,0 +1,94 @@ +#include "util/serialize_value.h" +#include +#include +#include "util/coding.h" +#include + +namespace leveldb{ +bool compareByFirst(const Field& a, const Field& b) { + return a.first < b.first; // 按字段名升序排序 +} + +std::string SerializeValue(const FieldArray& fields){ + FieldArray sortFields = fields; + std::sort(sortFields.begin(), sortFields.end(), compareByFirst); + std::string result; + for (const Field& pairs : sortFields) { + PutLengthPrefixedSlice(&result, pairs.first); + PutLengthPrefixedSlice(&result, pairs.second); + } + return result; +} + +FieldArray *ParseValue(const std::string& value_str){ + Slice valueSlice(value_str); + FieldArray *res = new FieldArray; + Slice nameSlice = Slice(); + Slice valSlice = Slice(); + std::string nameStr; + std::string valStr; + while(GetLengthPrefixedSlice(&valueSlice, &nameSlice)){ + nameStr = nameSlice.ToString(); + + if(GetLengthPrefixedSlice(&valueSlice, &valSlice)){ + valStr = valSlice.ToString(); + res->emplace_back(nameStr, valStr); + } else { + std::cout << "name and val not match!" << std::endl; + } + nameSlice.clear(); + valSlice.clear(); + } + return res; +} + +void InternalFieldArray::Map() { + if(isMapped) return; + for(const Field& pair : fields) { + map[pair.first] = pair.second; + } + isMapped = true; +} + +std::string InternalFieldArray::Serialize() { + std::string result; + if(isMapped) { + for(auto pair : map) { + PutLengthPrefixedSlice(&result, pair.first); + PutLengthPrefixedSlice(&result, pair.second); + } + } else { + result = SerializeValue(fields); + } + return result; +} + +bool InternalFieldArray::HasField(const Field& field) { + if(isMapped) { + if(map.count(field.first) && map[field.first] == field.second) { + return true; + } + return false; + } + return std::find(fields.begin(),fields.end(),field) != fields.end(); +} + +std::string InternalFieldArray::ValOfName(const std::string &name) { + if(isMapped) { + if(map.count(name)) { + return map[name]; + } + return std::string(); + } + + for (auto iter = fields.begin(); iter != fields.end(); iter++){ + if (iter->first == name) { + return iter->second; + } else if (iter->first > name) { + return std::string(); + } + } + return std::string(); +} + +} \ No newline at end of file diff --git a/util/serialize_value.h b/util/serialize_value.h new file mode 100644 index 0000000..b769fb8 --- /dev/null +++ b/util/serialize_value.h @@ -0,0 +1,59 @@ +#ifndef STORAGE_LEVELDB_UTIL_SERIALIZE_VALUE_H_ +#define STORAGE_LEVELDB_UTIL_SERIALIZE_VALUE_H_ + +#include +#include +#include +#include +#include "leveldb/slice.h" +#include "util/coding.h" +namespace leveldb{ +using Field = std::pair; // field_name:field_value +using FieldArray = std::vector>; + +std::string SerializeValue(const FieldArray& fields); +FieldArray *ParseValue(const std::string& value_str); + +class InternalFieldArray { +public: + using FieldMap = std::map; + + InternalFieldArray(const FieldArray &fields, bool to_map = false): + fields(fields),isMapped(false) { + if(to_map) Map(); + } + + + InternalFieldArray(const Slice value_slice) { + Slice valueSlice = value_slice; + Slice nameSlice, valSlice; + while(GetLengthPrefixedSlice(&valueSlice, &nameSlice)) { + if(GetLengthPrefixedSlice(&valueSlice, &valSlice)) { + map[nameSlice.ToString()] = valSlice.ToString(); + } else { + std::cout << "name and val not match!" << std::endl; + } + nameSlice.clear(); + valSlice.clear(); + } + isMapped = true; + } + + InternalFieldArray(const std::string& value_str) + :leveldb::InternalFieldArray(Slice(value_str)) {} + + //将vector变为用map存 + void Map(); + + std::string Serialize(); + + bool HasField(const Field& field); + std::string ValOfName(const std::string& name); + +private: + bool isMapped; + const FieldArray fields; + FieldMap map; +}; +} +#endif \ No newline at end of file diff --git a/设计文档.md b/设计文档.md new file mode 100644 index 0000000..babc37e --- /dev/null +++ b/设计文档.md @@ -0,0 +1,104 @@ +# 1. 项目概述 +leveldb中的存储原本只支持简单的字节序列,在这个项目中我们对其功能进行拓展,使其可以包含多个字段,并通过这些字段实现类似数据库列查询的功能。但如果仅通过字段查找数据,需要对整个数据库的遍历,不够高效,因此还要新增二级索引,提高对特定字段的查询效率。 + +本文档涵盖的设计内容只是最初的设想,实现过程中大概率会进行调整甚至重构。各部分也将在项目落实的过程中进行补充完善。 + +# 2. 功能设计 +## 2.1 字段设计 +设计目标:对value存储读取时进行序列化编码,使其支持字段。 + +实现思路:设计之初有考虑增加一些元数据(例如过滤器、字段偏移支持二分)来加速查询。但考虑到在数据库中kv的数量是十分庞大的,新加数据结构会带来巨大的空间开销。因此我们决定在这里牺牲时间换取空间,而将时间的加速放在索引中。 +在这一基础上,我们对序列化进行了简单的优化:将字段名排序后,一一调用leveldb中原本的编码方法`PutLengthPrefixedSlice`存入value。这样不会有额外的空间开销,而好处在于遍历一个value的字段时,如果得到的字段名比目标大,就可以提前结束遍历。 +``` +std::string SerializeValue(const FieldArray& fields){ + FieldArray sortFields = fields; + std::sort(sortFields.begin(), sortFields.end(), compareByFirst); + std::string result; + for (const Field& pairs : sortFields) { + PutLengthPrefixedSlice(&result, pairs.first); + PutLengthPrefixedSlice(&result, pairs.second); + } + return result; +} +``` +最终db类提供了新接口`putFields`, `getFields`,分别对传入的字段序列化后调用原来的`put`, `get`接口。 +`FindKeysByField`调用`NewIterator`遍历所有数据,field名和值符合则加入返回的key中。 + +## 2.2 二级索引 +设计目标:对某个字段或属性建立索引,提高对该字段的查询效率。需考虑索引的创建、删除、维护。 + +实现思路: 这一部分的难点主要在于,索引数据与原数据的存储需要进行隔离,不同操作之间存在同步与异步问题,还需要考虑两种数据间的一致性。为了使设计简洁化,避免不同模块耦合带来潜在的问题,我们的设计如下: +1. 总体上,我们对两种数据分别创建一个db类的对象`kvDb, indexDb`。对外的接口类`FieldDb`包含了这两个对象,提供原先的leveldb各种接口,以及新功能,并在这一层完成两个对象的管理。 +2. `kvDb`与原先的存储一致,`indexDb`中key是原key与索引字段的联合编码,value是空(详见数据结构设计),并且lab1中新增的接口也不会被调用到。 +3. 在open新的fieldDb时,会创建这两个对象并open他们。fieldDb会维护一个字符串数组`fieldWithIndex`,记录了当前哪些字段名拥有索引。一个`pair`的队列`taskQueue`, 维护了创建或者删除索引的任务请求。创建或删除索引时,把任务加入队尾并休眠。一个任务完成后唤醒队首继续下一个任务(类似write机制) +``` +class FieldDb { + Db *kvDb; + Db *indexDb; + string[] fieldWithIndex; + queue> taskQueue; + // 0建 or 1删,或把pair抽象成一个类 + + //原db所有对外接口,下面没提就调用kvDb相应函数 + bool CreateIndexOnField(const std::string& fieldName){、 + //加入taskQueue并休眠/拿写锁 + //kvdb->FindKvsByField得到数据信息 + //编码 + //kvdb->writeIndexLog写索引日志 + //indexDb->put存储索引 + //fieldWithIndex.insert(fieldName) + //唤醒taskQueue队首/释放写锁 + } + bool DeleteIndex(std::string &fieldName); //同上 + std::vector QueryByIndex(Field &field){ + //判断fieldWithIndex是否有 + //indexDb->iterator得到编码后的数据信息 + //解码返回 + } +} +``` +4. 一个创建/删除索引任务开始时,首先锁住写锁,再对`kvDb`调用`FindKvsByField`(类似lab1的`FindKeysByField`但也要返回field对应的值),对返回的数组中每个元素一一与字段名编码成新key,**合并在一个writebatch中写入日志**,再调用`indexDb`中的put或delete。当`indexdb`把所有操作执行完后,在`fieldWithIndex`中添加/删除这个`fieldName`,使得用户可以针对这个字段进行索引读,最后唤醒下一个任务,没有则释放写锁。 +5. fieldDb也提供了`put`, `get`,`getFields`, `putFields`,`delete`接口。对前三者,简单调用`kvDb`中的对应接口(不涉及索引)。 +对`putFields`,先判断是否有`fieldWithIndex`中有的字段,如果有,并对`kvDb`调用一个(多个)`put`,**但在写日志时一并加上索引日志写入**。 +`delete`逻辑一致。 +6. 针对索引的日志:为了保证两个数据库间的一致性,由`kvDb`的日志模块统一管理。这其中包含了两种chunk(kv写入和索引写入),在恢复时需要分别解析,决定往哪一个数据库中写入。索引写入的时机在4、5中的**加粗**部分,如何编码还有待设计。也就是说,`indexDb`本身的日志模块不再起到作用,项目后期可以修改关闭这一部分。 +7. 对两个数据库的其他部分,理论上每个数据库内部的其他模块不会互相影响。 + + +# 3. 数据结构设计 +具体设计模块化,实现时再具体考虑。 +`indexDb`的kv编码:**暂时考虑助教文档那种** +区分日志中kv部分和index部分:思路是在writebatch中某个地方加个标识区分,每一类的编码与各自的key编码类似**细节有待完成** + +# 4. 接口/函数设计 +`FieldDb`的对外接口函数之前已展示,这里补充一些子数据库需提供给`FieldDb`的抽象功能(暂时想到的): +``` +class Db{ + //原有部分和lab1部分 + //kvdb需要: + pair FindKvsByField(string fieldName); //搜集索引需要的数据信息 + status writeIndexLog(string fieldName, pair); //向indexDb put前先写日志 +} +``` +类内部实现的功能函数,具体实现过程中再抽象。 + +# 5. 功能测试 +1. 基本的每个接口函数调用。 +2. 创建/删除索引时的并发读写、并发创/删。 +3. 数据库的恢复检查。 +4. 性能测试。 + +# 6. 可能的挑战与解决方案 +已想到的部分在之前已阐述,其余待发现。 +mark一些原代码(db中)的修改点:recover时日志解析修改,write时的日志写入可能要合并索引的写入。 + +# 7. 分工与进度安排 + 功能 | 完成日期 | 分工 +:------|:---------|:------ +value序列化|11.19 | 李度 +fieldDb接口|11.25|陈胤遒 +lab1整体+测试|11.30|高宇菲 +fieldDb功能实现|12.10|李度 +kvdb功能实现与原代码修改|12.10|陈胤遒 +整体系统整合+测试|12.20|李度、陈胤遒、高宇菲 +性能测试|12.30|高宇菲