
Merge pull request 'ld' (#2) from ld into main

Reviewed-on: 10225501448/leveldb_proj2#2
main
李度 8 months ago
parent
commit
09a7723f6e
28 changed files with 4779 additions and 266 deletions
  1. +3 -1 .gitignore
  2. +24 -5 3DB设计.md
  3. +21 -3 CMakeLists.txt
  4. +22 -4 benchmarks/db_bench.cc
  5. +1413 -0 benchmarks/db_bench_FieldDB.cc
  6. +1157 -0 benchmarks/db_bench_testDB.cc
  7. +15 -2 db/db_impl.cc
  8. +30 -0 db/db_impl.h
  9. +24 -0 fielddb/SliceHashSet.h
  10. +283 -58 fielddb/field_db.cpp
  11. +103 -18 fielddb/field_db.h
  12. +69 -0 fielddb/meta.cpp
  13. +56 -0 fielddb/meta.h
  14. +0 -20 fielddb/metakv.cpp
  15. +0 -26 fielddb/metakv.h
  16. +439 -0 fielddb/request.cpp
  17. +157 -11 fielddb/request.h
  18. +2 -0 include/leveldb/env.h
  19. +30 -106 test/basic_function_test.cc
  20. +284 -0 test/helper.cc
  21. +34 -0 test/helper.h
  22. +302 -0 test/parallel_test.cc
  23. +90 -0 test/recover_test.cc
  24. +112 -0 testdb/testdb.cc
  25. +72 -0 testdb/testdb.h
  26. +4 -0 util/env_posix.cc
  27. +25 -7 util/serialize_value.cc
  28. +8 -5 util/serialize_value.h

+3 -1 .gitignore

@ -9,4 +9,6 @@ out/
# clangd
.cache/
compile_commands.json
compile_commands.json
benchmark-result/

+24 -5 3DB设计.md

@ -16,14 +16,14 @@
If there is an index currently being created (modified), or an earlier indexed put for the same key, check whether this put touches the corresponding index. If not, proceed as in the paragraph above. If it does, add it to the taskqueue from the earlier design; it will be processed once the index has been created.
I think: an indexed put involves atomicity between indexDB and kvDB
Thought: an indexed put involves atomicity between indexDB and kvDB
## Creating (deleting) an index
Before performing this operation, write a marker `(field,creating/deleting)` to metaDB, marking the start of the transaction that creates (deletes) the index on that field. (Note: the key here includes a timestamp or seq.)
Then scan kvDB, build the corresponding requests and write them to indexDB, again via a writebatch. Once the write completes, clear the earlier marker to indicate that the transaction has ended. After that, process the requests in the taskqueue and, when finished, wake them up.
I think: creating (deleting) an index actually only issues writes to indexDB and does not involve consistency between indexDB and kvDB
Thought: creating (deleting) an index actually only issues writes to indexDB and does not involve consistency between indexDB and kvDB
## Index get
If there is no index, return; if the index is being created, store the request in the taskqueue and process it once creation completes (it might also be acceptable to simply return here); if the index exists, send the request to indexDB.
@ -63,10 +63,29 @@ How indexDB writes are judged: scan indexDB; if it is a creating operation and
Do not use timestamps; write everything into metaDB as a log first, and only then write to kvDB and indexDB
# Overall architecture
A multi-threaded architecture is used
Since secondary-index maintenance is in theory idempotent, perhaps the taskqueue is not needed to block writes issued after index creation?
Seen that way, the create (delete) index operation would not need it either
Concurrent requests are handled through request objects (similar to writer). Index create and delete operations carry a pending queue that tracks the requests they will affect.
# Implementation notes
1. The encoding of the data stored in metaDB is placed in the metakv files
## TODO List
1. The index and kv writes should be issued concurrently in two threads; for ease of implementation they are currently done one after the other
2. Schedule in the original env is a singleton, so all databases share a single background thread. Here every database needs its own thread, possibly more than one, which calls for something like a thread pool
## Some ideas
1. Automatically create an index based on how frequently and how slowly a Field is searched, and drop that index after it has gone unused for a long time
# Thoughts on KV separation
Reuse the existing log and add the log information to the version information; this requires changes to version edit, version, and related structures
Number the logs and garbage-collect them from smallest to largest
New writes produced during garbage collection keep their original seq and are placed into L0
One of the core difficulties of KV separation is concurrency control of garbage collection. My central idea is that when a log is reclaimed, no merge is performed: the reclaimed entries keep their seqno and go straight into L0. Since L0 is unordered anyway, any concurrent writes that happen during garbage collection also only go into L0, so it is enough to wait for the next compaction.
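To make the meta-first ordering described above concrete, here is a minimal C++ sketch (my illustration, not part of this commit): the three DB handles and batches are assumed to be prepared by the caller, and the final meta-clearing step corresponds to what MetaCleaner does in fielddb/meta.cpp.

// Sketch only: write the meta record first, then index and kv, then clear meta.
leveldb::Status WriteWithMetaFirst(leveldb::DB* metaDB, leveldb::DB* indexDB, leveldb::DB* kvDB,
                                   leveldb::WriteBatch& MetaBatch, leveldb::WriteBatch& IndexBatch,
                                   leveldb::WriteBatch& KVBatch, const leveldb::WriteOptions& op) {
  // 1. Persist the meta record; after a crash, Recover() replays it.
  leveldb::Status s = metaDB->Write(op, &MetaBatch);
  if (!s.ok()) return s;
  // 2. Write the index and kv batches (the TODO above would issue these from parallel threads).
  s = indexDB->Write(op, &IndexBatch);
  if (!s.ok()) return s;
  s = kvDB->Write(op, &KVBatch);
  if (!s.ok()) return s;
  // 3. Only after both writes succeed is the meta record deleted, ending the "transaction".
  leveldb::WriteBatch Clear;
  // ... put a Delete for every key that MetaBatch put (cf. MetaCleaner) ...
  return metaDB->Write(op, &Clear);
}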

+21 -3 CMakeLists.txt

@ -17,7 +17,7 @@ endif(NOT CMAKE_C_STANDARD)
# C++ standard can be overridden when this is used as a sub-project.
if(NOT CMAKE_CXX_STANDARD)
# This project requires C++11.
set(CMAKE_CXX_STANDARD 11)
set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
set(CMAKE_CXX_EXTENSIONS OFF)
endif(NOT CMAKE_CXX_STANDARD)
@ -194,6 +194,12 @@ target_sources(leveldb
"util/serialize_value.cc"
"fielddb/field_db.cpp"
"fielddb/field_db.h"
"fielddb/meta.cpp"
"fielddb/meta.h"
"fielddb/request.cpp"
"fielddb/request.h"
"testdb/testdb.cc"
"testdb/testdb.h"
# Only CMake 3.3+ supports PUBLIC sources in targets exported by "install".
$<$<VERSION_GREATER:CMAKE_VERSION,3.2>:PUBLIC>
@ -443,6 +449,8 @@ if(LEVELDB_BUILD_BENCHMARKS)
if(NOT BUILD_SHARED_LIBS)
leveldb_benchmark("benchmarks/db_bench.cc")
leveldb_benchmark("benchmarks/db_bench_FieldDB.cc")
leveldb_benchmark("benchmarks/db_bench_testDB.cc")
endif(NOT BUILD_SHARED_LIBS)
check_library_exists(sqlite3 sqlite3_open "" HAVE_SQLITE3)
@ -522,7 +530,17 @@ if(LEVELDB_INSTALL)
)
endif(LEVELDB_INSTALL)
add_executable(lab1_test
add_executable(basic_function_test
"${PROJECT_SOURCE_DIR}/test/basic_function_test.cc"
)
target_link_libraries(lab1_test PRIVATE leveldb gtest)
target_link_libraries(basic_function_test PRIVATE leveldb gtest)
add_executable(parallel_test
"${PROJECT_SOURCE_DIR}/test/parallel_test.cc"
)
target_link_libraries(parallel_test PRIVATE leveldb gtest)
add_executable(recover_test
"${PROJECT_SOURCE_DIR}/test/recover_test.cc"
)
target_link_libraries(recover_test PRIVATE leveldb gtest)

+22 -4 benchmarks/db_bench.cc

@ -325,8 +325,8 @@ class Stats {
// elapsed times.
double elapsed = (finish_ - start_) * 1e-6;
char rate[100];
std::snprintf(rate, sizeof(rate), "%6.1f MB/s",
(bytes_ / 1048576.0) / elapsed);
std::snprintf(rate, sizeof(rate), "%6.1f MB/s Bytes:%6.1f elapsed(s):%6.1f seconds:%6.1f ",
(bytes_ / 1048576.0) / elapsed,(bytes_ / 1048576.0),elapsed,seconds_);
extra = rate;
}
AppendWithSpace(&extra, message_);
@ -737,6 +737,10 @@ class Benchmark {
}
shared.mu.Unlock();
// for(int i = 0; i < n; i++) {
// arg[i].thread->stats.Report(name.ToString() + "thread:" + std::to_string(i));
// }
for (int i = 1; i < n; i++) {
arg[0].thread->stats.Merge(arg[i].thread->stats);
}
@ -852,8 +856,20 @@ class Benchmark {
for (int j = 0; j < entries_per_batch_; j++) {
const int k = seq ? i + j : thread->rand.Uniform(FLAGS_num);
key.Set(k);
batch.Put(key.slice(), gen.Generate(value_size_));
bytes += value_size_ + key.slice().size();
std::string name = "customer#" + std::to_string(k);
// this field is used for lookups
std::string age = std::to_string(thread->rand.Uniform(FLAGS_num) % 100);
// this field pads the value to the target length
std::string tag = gen.Generate(value_size_).ToString();
FieldArray fields = {
{"name", name},
{"age", age},
{"tag", tag}
};
std::string value = SerializeValue(fields);
batch.Put(key.slice(), value);
bytes += value.size() + key.slice().size();
thread->stats.FinishedSingleOp();
}
s = db_->Write(write_options_, &batch);
@ -1123,6 +1139,8 @@ int main(int argc, char** argv) {
}
}
FLAGS_num /= FLAGS_threads;
leveldb::g_env = leveldb::Env::Default();
// Choose a location for the test database if none given with --db=<path>
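As context for the fields written in this benchmark, the serialized value is a sequence of length-prefixed (name, value) pairs, which is how request.cpp later decodes it. A small sketch of encoding and decoding one record by hand (not part of the commit; EncodeFields is a hypothetical stand-in for SerializeValue):

#include <string>
#include <utility>
#include <vector>
#include "leveldb/slice.h"
#include "util/coding.h"

// Hypothetical stand-in for SerializeValue: length-prefixed name/value pairs.
std::string EncodeFields(const std::vector<std::pair<std::string, std::string>>& fields) {
  std::string out;
  for (const auto& [name, value] : fields) {
    leveldb::PutLengthPrefixedSlice(&out, name);
    leveldb::PutLengthPrefixedSlice(&out, value);
  }
  return out;
}

void DecodeFields(const std::string& in) {
  leveldb::Slice rest(in), name, value;
  // Mirrors the decoding loop used by FieldsReq in fielddb/request.cpp.
  while (leveldb::GetLengthPrefixedSlice(&rest, &name) &&
         leveldb::GetLengthPrefixedSlice(&rest, &value)) {
    // use (name, value) ...
  }
}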

+1413 -0 benchmarks/db_bench_FieldDB.cc (file diff suppressed because it is too large)


+1157 -0 benchmarks/db_bench_testDB.cc (file diff suppressed because it is too large)


+15 -2 db/db_impl.cc

@ -1169,7 +1169,7 @@ Status DBImpl::GetFields(const ReadOptions& options, const Slice& key,
FieldArray* fields) {
std::string value;
Status s = DBImpl::Get(options, key, &value);
*fields = *ParseValue(value);
ParseValue(value,fields);
return s;
}
@ -1183,6 +1183,7 @@ std::vector<std::string> DBImpl::FindKeysByField(Field &field){
result.push_back(iter->key().ToString());
}
}
delete iter;
return result;
}
@ -1234,12 +1235,18 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
w.sync = options.sync;
w.done = false;
uint64_t start_ = env_->NowMicros();
MutexLock l(&mutex_);
count ++;
writers_.push_back(&w);
while (!w.done && &w != writers_.front()) {
w.cv.Wait();
}
if (w.done) {
Waiting_elapsed += env_->NowMicros() - start_;
waited_count ++;
Total_elapsed += env_->NowMicros() - start_;
// dumpStatistics();
return w.status;
}
@ -1258,6 +1265,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
// into mem_.
{
mutex_.Unlock();
uint64_t start_write = env_->NowMicros();
status = log_->AddRecord(WriteBatchInternal::Contents(write_batch));
bool sync_error = false;
if (status.ok() && options.sync) {
@ -1269,6 +1277,8 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
if (status.ok()) {
status = WriteBatchInternal::InsertInto(write_batch, mem_);
}
BatchSize += write_batch->ApproximateSize();
write_elapsed += env_->NowMicros() - start_write;
mutex_.Lock();
if (sync_error) {
// The state of the log file is indeterminate: the log record we
@ -1297,7 +1307,10 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
if (!writers_.empty()) {
writers_.front()->cv.Signal();
}
Total_elapsed += env_->NowMicros() - start_;
NoWaiting_elapsed += env_->NowMicros() - start_;
Nowaited_count ++;
// dumpStatistics();
return status;
}

+30 -0 db/db_impl.h

@ -6,7 +6,10 @@
#define STORAGE_LEVELDB_DB_DB_IMPL_H_
#include <atomic>
#include <cstdint>
#include <cstdio>
#include <deque>
#include <ostream>
#include <set>
#include <string>
@ -210,6 +213,33 @@ class DBImpl : public DB {
Status bg_error_ GUARDED_BY(mutex_);
CompactionStats stats_[config::kNumLevels] GUARDED_BY(mutex_);
int count = 0;
int waited_count = 0;
int Nowaited_count = 0;
uint64_t Total_elapsed = 0;
uint64_t Waiting_elapsed = 0;
uint64_t NoWaiting_elapsed = 0;
uint64_t write_elapsed = 0;
uint64_t BatchSize = 0;
const double MB = 1024 * 1024;
const double KB = 1024;
inline void dumpStatistics() {
if(count && count % 500000 == 0) {
printf("==================================\n");
printf("Count: Total:%d Waited:%d Nowaited:%d\n",count,waited_count,Nowaited_count);
printf("%ld %ld %ld\n",Total_elapsed,Waiting_elapsed,NoWaiting_elapsed);
printf("Average Total elapsed: %lf ms\n",Total_elapsed * 1.0 / count);
printf("Average Waiting elapsed: %lf ms\n",Waiting_elapsed * 1.0 / count);
printf("For waiting request: %lf ms\n",Waiting_elapsed * 1.0 / waited_count);
printf("For Nowait request: %lf ms\n",NoWaiting_elapsed * 1.0 / Nowaited_count);
printf("Write elapsed: %lf ms\n",write_elapsed * 1.0 / Nowaited_count);
printf("Average BatchSize: %lfKB\n",BatchSize / KB / count);
printf("Average BatchSize per write:%lfKB\n",BatchSize / KB / Nowaited_count);
printf("==================================\n");
std::fflush(stdout);
}
}
};
// Sanitize db options. The caller should delete result.info_log if

+24 -0 fielddb/SliceHashSet.h

@ -0,0 +1,24 @@
# ifndef SLICE_HASH_SET_H
# define SLICE_HASH_SET_H
#include "leveldb/slice.h"
#include "util/hash.h"
#include <unordered_set>
using namespace leveldb;
class SliceHash {
public:
uint32_t operator()(const Slice &lhs) const {
return Hash(lhs.data(),lhs.size(),0x1234);
}
};
class SliceEq {
public:
bool operator()(const Slice &lhs, const Slice &rhs) const {
return lhs == rhs;
}
};
using SliceHashSet = std::unordered_set<Slice,SliceHash,SliceEq>;
#endif
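A note on usage (my illustration, not part of the commit): leveldb::Slice is a non-owning pointer/length view, so a SliceHashSet only holds references into memory owned elsewhere. The strings backing the inserted Slices must outlive the set; in HandleRequest this holds because the inserted keys point into Request objects (and their callers' data) that stay alive while the batch is built.

#include <string>
#include "fielddb/SliceHashSet.h"

void SliceSetSketch() {
  std::string k1 = "customer#1";   // backing storage must outlive the set
  std::string k2 = "customer#2";
  SliceHashSet seen;
  seen.insert(leveldb::Slice(k1));
  seen.insert(leveldb::Slice(k2));
  // Deduplication as used in ConstructBatch: only the first request on a key is processed.
  bool duplicate = seen.find(leveldb::Slice(k1)) != seen.end();   // true
  (void)duplicate;
}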

+283 -58 fielddb/field_db.cpp

@ -1,19 +1,30 @@
#include "fielddb/field_db.h"
#include <climits>
#include <cstdint>
#include <cstdio>
#include <string>
#include <sys/types.h>
#include <utility>
#include <vector>
#include "leveldb/db.h"
#include "leveldb/env.h"
#include "leveldb/iterator.h"
#include "leveldb/options.h"
#include "leveldb/slice.h"
#include "leveldb/status.h"
#include "db/write_batch_internal.h"
#include "leveldb/write_batch.h"
#include "util/coding.h"
#include "util/mutexlock.h"
#include "util/serialize_value.h"
#include "fielddb/encode_index.h"
#include "fielddb/meta.h"
#include "field_db.h"
#include "fielddb/SliceHashSet.h"
namespace fielddb {
using namespace leveldb;
//TODO: open the FieldDB
Status FieldDB::OpenFieldDB(const Options& options,
Status FieldDB::OpenFieldDB(Options& options,
const std::string& name, FieldDB** dbptr) {
// options.env->CreateDir("./abc")
if(*dbptr == nullptr){
@ -23,11 +34,20 @@ Status FieldDB::OpenFieldDB(const Options& options,
//
Status status;
DB *indexdb, *kvdb, *metadb;
// options.block_cache = NewLRUCache(ULONG_MAX);
// options.max_open_files = 1000;
// options.write_buffer_size = 512 * 1024 * 1024;
//give each of the three databases its own independent background thread
options.env = getPosixEnv();
status = Open(options, name+"_indexDB", &indexdb);
if(!status.ok()) return status;
options.env = getPosixEnv();
status = Open(options, name+"_kvDB", &kvdb);
if(!status.ok()) return status;
options.env = getPosixEnv();
status = Open(options, name+"_metaDB", &metadb);
if(!status.ok()) return status;
@ -37,38 +57,227 @@ Status FieldDB::OpenFieldDB(const Options& options,
(*dbptr)->dbname_ = name;
status = (*dbptr)->Recover();
(*dbptr)->options_ = &options;
(*dbptr)->env_ = options.env;
return status;
}
// todo
Status FieldDB::Recover() {
//
//1. Iterate over all Index-type meta entries and rebuild the in-memory index_ status table
Iterator *Iter = indexDB_->NewIterator(ReadOptions());
std::string IndexKey;
Iter->SeekToFirst();
while(Iter->Valid()) {
IndexKey = Iter->key().ToString();
ParsedInternalIndexKey ParsedIndex;
ParseInternalIndexKey(Slice(IndexKey),&ParsedIndex);
index_[ParsedIndex.name_.ToString()] = {Exist,nullptr};
//build the next seek target: the original field name with a maximal (non-printable) byte appended
std::string Seek;
PutLengthPrefixedSlice(&Seek, ParsedIndex.name_);
Seek.push_back(0xff);
Iter->Seek(Slice(Seek));
}
delete Iter;
//2. Find all KV-type meta entries and submit their requests again
Iter = metaDB_->NewIterator(ReadOptions());
Slice MetaValue;
Iter->SeekToFirst();
while (Iter->Valid()) {
MetaValue = Iter->key();
MetaType type = MetaType(DecodeFixed32(MetaValue.data()));
MetaValue.remove_prefix(4);// strip the leading MetaType tag
Slice extractKey;
GetLengthPrefixedSlice(&MetaValue, &extractKey);
if(type == KV_Creating) {
FieldArray fields;
ParseValue(Iter->value().ToString(), &fields);
PutFields(WriteOptions(), extractKey, fields);
} else if(type == KV_Deleting) {
Delete(WriteOptions(), extractKey);
} else {
assert(0 && "Invalid MetaType");
}
Iter->Next();
}
delete Iter;
//once all requests have completed, metaDB's contents are cleared automatically.
Iter = metaDB_->NewIterator(ReadOptions());
Iter->SeekToFirst();
//std::cout << "Iter Valid : " << Iter->Valid() << std::endl;
delete Iter;
//3. Wait for all requests to complete
return Status::OK();
}
Request *FieldDB::GetHandleInterval() {
mutex_.AssertHeld(); // the taskqueue must only be accessed under the mutex
Request *tail = taskqueue_.front();
for(auto *req_ptr : taskqueue_) {
if(req_ptr->isiDeleteReq() || req_ptr->isiCreateReq()) {
return tail;
}
tail = req_ptr;
}
return tail;
}
Status FieldDB::HandleRequest(Request &req, const WriteOptions &op) {
//uint64_t start_ = env_->NowMicros();
MutexLock L(&mutex_);
taskqueue_.push_back(&req);
while(true){
//uint64_t start_waiting = env_->NowMicros();
while(req.isPending() || !req.done && &req != taskqueue_.front()) {
req.cond_.Wait();
}
//waiting_elasped += env_->NowMicros() - start_waiting;
if(req.done) {
//elapsed += env_->NowMicros() - start_;
//count ++;
// dumpStatistics();
return req.s; // lock L is released automatically on return
}
Request *tail = GetHandleInterval();
WriteBatch KVBatch,IndexBatch,MetaBatch;
SliceHashSet batchKeySet;
Status status;
if(!tail->isiCreateReq() && !tail->isiDeleteReq()) {
//this interval involves no index creation or deletion
{
//1. Build the batches. Index status must stay consistent while building, so the lock is required.
MutexLock iL(&index_mu);
//uint64_t start_construct = env_->NowMicros();
for(auto *req_ptr : taskqueue_) {
req_ptr->ConstructBatch(KVBatch, IndexBatch, MetaBatch, this, batchKeySet);
if(req_ptr == tail) break;
}
//construct_elapsed += env_->NowMicros() - start_construct;
}
//2. Write meta first, then write index and kv concurrently, and clear the meta data afterwards
//the lock can be released here because write ordering is guaranteed by the queue
mutex_.Unlock();
//uint64_t start_write = env_->NowMicros();
if(MetaBatch.ApproximateSize() > 12) {
//uint64_t start_meta = env_->NowMicros();
status = metaDB_->Write(op, &MetaBatch);
//write_meta_elapsed += env_->NowMicros() - start_meta;
//write_bytes += MetaBatch.ApproximateSize();
assert(status.ok());
}
//TODO: the index write should be done concurrently in another thread
if(IndexBatch.ApproximateSize() > 12) {
//uint64_t start_index = env_->NowMicros();
status = indexDB_->Write(op, &IndexBatch);
//write_index_elapsed += env_->NowMicros() - start_index;
//write_bytes += IndexBatch.ApproximateSize();
assert(status.ok());
}
if(KVBatch.ApproximateSize() > 12) {
//uint64_t start_kv = env_->NowMicros();
status = kvDB_->Write(op, &KVBatch);
//write_kv_elapsed += env_->NowMicros() - start_kv;
//write_bytes += KVBatch.ApproximateSize();
assert(status.ok());
}
//3. Clear the meta data
if(MetaBatch.ApproximateSize() > 12) {
//uint64_t start_clean = env_->NowMicros();
MetaCleaner cleaner;
cleaner.Collect(MetaBatch);
cleaner.CleanMetaBatch(metaDB_);
//write_clean_elapsed += env_->NowMicros() - start_clean;
}
//write_elapsed += env_->NowMicros() - start_write;
mutex_.Lock();
} else {
//for index create/delete requests, Prepare updates the index status
MutexLock iL(&index_mu);
req.Prepare(this);
}
// {
// static int count = 0;
// if(count++ % 100000 == 0) {
// std::cout << "TaskQueue Size : " << taskqueue_.size() << std::endl;
// }
// }
while(true) {
Request *ready = taskqueue_.front();
// int debug = tail->type_;
taskqueue_.pop_front();
//the current ready request is not pending and is not tied to an index create/delete
if(!ready->isPending() && !req.isiCreateReq() && !req.isiDeleteReq()) {
ready->s = status;
ready->done = true;
if (ready != &req) ready->cond_.Signal();
}
if (ready == tail) break;
}
if(!taskqueue_.empty()) {
taskqueue_.front()->cond_.Signal();
}
//if done == true, the request exits immediately instead of waiting again
//if it sits in some request's pending list, it keeps waiting until it is re-enqueued
}
}
// an empty string is used as the field name for a plain put
Status FieldDB::Put(const WriteOptions &options, const Slice &key, const Slice &value) {
return kvDB_->Put(options, key, value);
FieldArray FA = {{EMPTY,value.ToString()}};
return PutFields(options, key, FA);
// return kvDB_->Put(options, key, value);
}
// TODO: need to decide whether an index update is required
// need to decide whether an index update is required
Status FieldDB::PutFields(const WriteOptions &Options,
const Slice &key, const FieldArray &fields) {
//
return kvDB_->PutFields(Options, key, fields);
// std::string key_ = key.ToString();
// FieldArray fields_ = fields;
FieldsReq req(key,fields,&mutex_);
Status status = HandleRequest(req, Options);
return status;
}
// todo: when deleting a key that has an index, indexdb must be updated as well
// when deleting a key that has an index, indexdb must be updated as well
Status FieldDB::Delete(const WriteOptions &options, const Slice &key) {
//
return kvDB_->Delete(options, key);
// std::string key_ = key.ToString();
DeleteReq req(key,&mutex_);
Status status = HandleRequest(req, options);
return status;
}
// TODO: based on the contents of updates, decide per entry whether the index needs updating
// based on the contents of updates, decide per entry whether the index needs updating
Status FieldDB::Write(const WriteOptions &options, WriteBatch *updates) {
return Status::OK();
// {
// uint64_t start_ = env_->NowMicros();
// Status status = kvDB_->Write(options, updates);
// temp_elapsed += env_->NowMicros() - start_;
// count ++;
// dumpStatistics();
// return status;
// }
//uint64_t start_ = env_->NowMicros();
BatchReq req(updates,&mutex_);
//construct_BatchReq_init_elapsed += env_->NowMicros() - start_;
Status status = HandleRequest(req, options);
return status;
}
//since a plain put uses the empty string as the field name, this needs a matching adjustment
Status FieldDB::Get(const ReadOptions &options, const Slice &key, std::string *value) {
return kvDB_->Get(options, key, value);
FieldArray fields;
Status s = GetFields(options, key, &fields);
if(!s.ok()) {
return s;
}
*value = fields[0].second;
return s;
}
Status FieldDB::GetFields(const ReadOptions &options, const Slice &key, FieldArray *fields) {
@ -81,63 +290,56 @@ std::vector<std::string> FieldDB::FindKeysByField(Field &field) {
}
std::vector<std::pair<std::string, std::string>> FieldDB::FindKeysAndValByFieldName (
const std::string &fieldName){
const Slice fieldName){
std::vector<std::pair<std::string, std::string>> result;
auto iter = kvDB_->NewIterator(ReadOptions());
std::string val;
Slice val;
for(iter->SeekToFirst();iter->Valid();iter->Next()) {
InternalFieldArray fields(iter->value());
val = fields.ValOfName(fieldName);
val = fields.ValOfName(fieldName.ToString());
if(!val.empty()) {
result.push_back(std::make_pair(iter->key().ToString(), val));
result.push_back(std::make_pair(iter->key().ToString(), val.ToString()));
}
}
delete iter;
return result;
}
Status FieldDB::CreateIndexOnField(const std::string& field_name) {
//taskQueue related
//write lock: maybe a single lock on putfields would be enough
std::vector<std::pair<std::string, std::string>> keysAndVal =
FindKeysAndValByFieldName(field_name);
WriteBatch writeBatch;
Slice value = Slice();
for (auto &kvPair : keysAndVal){
std::string indexKey;
AppendIndexKey(&indexKey,
ParsedInternalIndexKey(kvPair.first, field_name, kvPair.second));
writeBatch.Put(indexKey, value);
Status FieldDB::CreateIndexOnField(const std::string& field_name, const WriteOptions &op) {
// std::string Field = field_name;
// iCreateReq req(&Field,&mutex_);
iCreateReq req(field_name,&mutex_);
HandleRequest(req, op);
//if the index already exists, return directly
if(req.Existed) {
return req.s;
}
Status s = indexDB_->Write(WriteOptions(), &writeBatch);
if (!s.ok()) return s;
index_[field_name] = Exist;
//wake up the taskqueue
WriteBatch KVBatch,IndexBatch,MetaBatch;
SliceHashSet useless;
req.ConstructBatch(KVBatch, IndexBatch, MetaBatch, this, useless);
indexDB_->Write(op, &IndexBatch);
req.Finalize(this);
return req.s;
}
Status FieldDB::DeleteIndex(const std::string &field_name) {
//taskQueue related
//write lock
std::vector<std::pair<std::string, std::string>> keysAndVal =
FindKeysAndValByFieldName(field_name);
WriteBatch writeBatch;
for (auto &kvPair : keysAndVal){
std::string indexKey;
AppendIndexKey(&indexKey,
ParsedInternalIndexKey(kvPair.first, field_name, kvPair.second));
writeBatch.Delete(indexKey);
Status FieldDB::DeleteIndex(const std::string &field_name, const WriteOptions &op) {
// std::string Field = field_name;
iDeleteReq req(field_name,&mutex_);
HandleRequest(req, op);
//if it has already been deleted or never existed, return directly
if(req.Deleted) {
return req.s;
}
Status s = indexDB_->Write(WriteOptions(), &writeBatch);
if (!s.ok()) return s;
index_.erase(field_name);
//wake up the taskqueue
WriteBatch KVBatch,IndexBatch,MetaBatch;
SliceHashSet useless;
req.ConstructBatch(KVBatch, IndexBatch, MetaBatch, this, useless);
indexDB_->Write(op, &IndexBatch);
req.Finalize(this);
return req.s;
}
std::vector<std::string> FieldDB::QueryByIndex(const Field &field, Status *s) {
if (index_.count(field.first) == 0 || index_[field.first] != Exist){
if (index_.count(field.first) == 0 || index_[field.first].first != Exist){
*s = Status::NotFound(Slice());
return std::vector<std::string>();
}
@ -158,11 +360,17 @@ std::vector<std::string> FieldDB::QueryByIndex(const Field &field, Status *s) {
}
break;
}
delete indexIterator;
*s = Status::OK();
return result;
}
IndexStatus FieldDB::GetIndexStatus(const std::string &fieldName){
if (index_.count(fieldName) == 0) return IndexStatus::NotExist;
IndexStatus idxs = index_[fieldName].first;
return idxs;
}
Iterator * FieldDB::NewIterator(const ReadOptions &options) {
return kvDB_->NewIterator(options);
}
@ -191,4 +399,21 @@ void FieldDB::CompactRange(const Slice *begin, const Slice *end) {
kvDB_->CompactRange(begin, end);
}
} // end of namespace
Status DestroyDB(const std::string& name, const Options& options) {
Status s;
s = leveldb::DestroyDB(name+"_kvDB", options);
assert(s.ok());
s = leveldb::DestroyDB(name+"_indexDB", options);
assert(s.ok());
s = leveldb::DestroyDB(name+"_metaDB", options);
assert(s.ok());
return s;
}
FieldDB::~FieldDB() {
delete indexDB_;
delete kvDB_;
delete metaDB_;
}
} // namespace fielddb

+103 -18 fielddb/field_db.h

@ -1,25 +1,44 @@
# ifndef FIELD_DB_H
# define FIELD_DB_H
#include "port/port_stdcxx.h"
#include "db/db_impl.h"
#include <cstdint>
#include <cstdio>
#include <deque>
#include <map>
#include <set>
#include <string>
#include "leveldb/db.h"
#include "leveldb/env.h"
#include "leveldb/options.h"
#include "leveldb/slice.h"
#include "leveldb/status.h"
#include "fielddb/request.h"
#include <shared_mutex>
# ifndef FIELD_DB_H
# define FIELD_DB_H
namespace fielddb {
using namespace leveldb;
const char EMPTY[1] = {0};
enum IndexStatus{
Creating,
Deleting,
Exist,
NotExist
};
class FieldDB : DB {
public:
friend class Request;
friend class FieldsReq;
friend class iCreateReq;
friend class iDeleteReq;
friend class DeleteReq;
friend class BatchReq;
//FieldDB *db = new FieldDB()openDB *db
FieldDB() : indexDB_(nullptr), kvDB_(nullptr), metaDB_(nullptr) {};
~FieldDB();
/* required by lab1: the virtual functions a DB subclass must implement */
Status Put(const WriteOptions &options, const Slice &key, const Slice &value) override;
Status PutFields(const WriteOptions &, const Slice &key, const FieldArray &fields) override;
@ -35,34 +54,100 @@ public:
void GetApproximateSizes(const Range *range, int n, uint64_t *sizes) override;
void CompactRange(const Slice *begin, const Slice *end) override;
/* index-related */
Status CreateIndexOnField(const std::string& field_name);
Status DeleteIndex(const std::string &field_name);
Status CreateIndexOnField(const std::string& field_name, const WriteOptions &op);
Status DeleteIndex(const std::string &field_name, const WriteOptions &op);
std::vector<std::string> QueryByIndex(const Field &field, Status *s);
//
IndexStatus GetIndexStatus(const std::string &fieldName);
static Status OpenFieldDB(const Options& options,const std::string& name,FieldDB** dbptr);
static Status OpenFieldDB(Options& options,const std::string& name,FieldDB** dbptr);
private:
//recover from the contents of metaDB
Status Recover();
private:
std::string dbname_;
leveldb::DB *kvDB_;
leveldb::DB *metaDB_;
leveldb::DB *indexDB_;
leveldb::DB *kvDB_;
std::string dbname_;
const Options *options_;
Env *env_;
using FieldName = std::string;
// index status: creating/deleting
std::map<FieldName, std::pair<IndexStatus,Request*>> index_;
port::Mutex index_mu;
enum IndexStatus{
Creating,
Deleting,
Exist
};
std::map<std::string, int> index_;
leveldb::port::Mutex mutex_; // mutex for taskqueue
std::deque<Request *> taskqueue_;
std::vector<std::pair<std::string, std::string>> FindKeysAndValByFieldName (
const std::string &fieldName);
const Slice fieldName);
/*For request handling*/
Status HandleRequest(Request &req, const WriteOptions &op); //
Request *GetHandleInterval(); //
// private:
// int count = 0;
// int count_Batch = 0;
// int count_Batch_Sub = 0;
// uint64_t elapsed = 0;
// uint64_t construct_elapsed = 0;
// uint64_t construct_BatchReq_init_elapsed = 0;
// uint64_t construct_BatchReq_elapsed = 0;
// uint64_t construct_BatchReq_Sub_elapsed = 0;
// uint64_t construct_BatchReq_perSub_elapsed = 0;
// uint64_t construct_FieldsReq_Read_elapsed = 0;
// uint64_t write_elapsed = 0;
// uint64_t write_meta_elapsed = 0;
// uint64_t write_index_elapsed = 0;
// uint64_t write_kv_elapsed = 0;
// uint64_t write_clean_elapsed = 0;
// uint64_t write_bytes = 0;
// uint64_t write_step = 500 * 1024 * 1024;
// uint64_t write_bytes_lim = write_step;
// uint64_t temp_elapsed = 0;
// uint64_t waiting_elasped = 0;
// inline void dumpStatistics() {
// if(count && count % 500000 == 0 || write_bytes && write_bytes > write_bytes_lim) {
// std::cout << "=====================================================\n";
// std::cout << "Total Count : " << count;
// std::cout << "\tTotal Write Bytes(MB) : " << write_bytes / 1048576.0 << std::endl;
// std::cout << "Average Time(ms) : " << elapsed * 1.0 / count;
// std::cout << "\tAverage Write rates(MB/s) : " << write_bytes / 1048576.0 / elapsed * 1000000 << std::endl;
// std::cout << "Construct Time(ms) : " << construct_elapsed * 1.0 / count << std::endl;
// std::cout << "\tConstruct BatchReq Init Time(ms) : " << construct_BatchReq_init_elapsed * 1.0 / count << std::endl;
// std::cout << "\tConstruct BatchReq Time(ms) : " << construct_BatchReq_elapsed * 1.0 / count << std::endl;
// std::cout << "\tConstruct BatchReq Sub Time(ms) : " << construct_BatchReq_Sub_elapsed * 1.0 / count << std::endl;
// std::cout << "\tConstruct BatchReq perSub Time(ms) : " << construct_BatchReq_perSub_elapsed * 1.0 / count_Batch_Sub << std::endl;
// std::cout << "\tConstruct FieldsReq Read Time(ms) : " << construct_FieldsReq_Read_elapsed * 1.0 / count << std::endl;
// std::cout << "Write Time(ms) : " << write_elapsed * 1.0 / count << std::endl;
// std::cout << "\tWrite Meta Time(ms) : " << write_meta_elapsed * 1.0 / count << std::endl;
// std::cout << "\tWrite Index Time(ms) : " << write_index_elapsed * 1.0 / count << std::endl;
// std::cout << "\tWrite KV Time(ms) : " << write_kv_elapsed * 1.0 / count << std::endl;
// std::cout << "\tWrite Clean Time(ms) : " << write_clean_elapsed * 1.0 / count << std::endl;
// std::cout << "TaskQueue Size : " << taskqueue_.size() << std::endl;
// std::cout << "temp_elased : " << temp_elapsed * 1.0 / count << std::endl;
// std::cout << "waiting elapsed : " << waiting_elasped * 1.0 / count << std::endl;
// // std::cout << MetaBatch.ApproximateSize() << " " << IndexBatch.ApproximateSize() << " " << KVBatch.ApproximateSize() << std::endl;
// std::cout << "=====================================================\n";
// write_bytes_lim = write_bytes + write_step;
// std::fflush(stdout);
// }
// }
};
Status DestroyDB(const std::string& name,
const Options& options);
} // end of namespace
# endif

+69 -0 fielddb/meta.cpp

@ -0,0 +1,69 @@
#include "fielddb/meta.h"
#include "util/coding.h"
#include <string>
#include "leveldb/options.h"
#include "leveldb/slice.h"
#include "leveldb/write_batch.h"
namespace fielddb {
using namespace leveldb;
// Slice MetaKV::metaKey() {
// std::string buf;
// PutLengthPrefixedSlice(&buf, Key);
// PutFixed64(&buf, meta_seq);
// PutFixed32(&buf, tag);
// return Slice(buf);
// }
// Slice MetaKV::metaValue() {
// return Slice(SerializeValue(Fields));
// }
//meta for a put/delete that touches an indexed field is encoded as (KV|Key,Value)
void MetaKV::TransPut(std::string &MetaKey,std::string &MetaValue) {
MetaKey.clear();
MetaValue.clear();
//this change avoids a potential segfault: previously Slice(buf) referred to a local buf that was destroyed when the function returned,
//so the Slice pointed into memory of an already-destructed string
std::string &buf = MetaKey;
PutFixed32(&buf, KV_Creating);
PutLengthPrefixedSlice(&buf, Slice(name));
// MetaKey = Slice(buf);
// MetaValue = Slice(*value);
}
void MetaKV::TransDelete(std::string &MetaKey) {
MetaKey.clear();
std::string &buf = MetaKey;
PutFixed32(&buf, KV_Deleting);
PutLengthPrefixedSlice(&buf, Slice(name));
// MetaKey = Slice(buf);
}
class CleanerHandler : public WriteBatch::Handler {
public:
WriteBatch *NeedClean;
void Put(const Slice& key, const Slice& value) override {
//delete every meta entry that was previously put
NeedClean->Delete(key);
}
void Delete(const Slice& key) override {
//every entry in the incoming MetaBatch is a Put
assert(0);
}
};
void MetaCleaner::Collect(WriteBatch &MetaBatch) {
if(MetaBatch.ApproximateSize() <= 12) return;
CleanerHandler Handler;
Handler.NeedClean = &NeedClean;
MetaBatch.Iterate(&Handler);
}
void MetaCleaner::CleanMetaBatch(DB *metaDB) {
if(NeedClean.ApproximateSize() <= 12) return;
metaDB->Write(WriteOptions(), &NeedClean);
}
}

+56 -0 fielddb/meta.h

@ -0,0 +1,56 @@
#pragma once
#include <cstdint>
#include <cstdio>
#include "leveldb/slice.h"
#include "leveldb/write_batch.h"
#include "util/serialize_value.h"
#include "fielddb/field_db.h"
namespace fielddb {
using namespace leveldb;
/*from the write flow it follows that everything that needs to be stored in metaDB is indexed data, i.e. a FieldArray*/
// class MetaKV {
// MetaKV(Slice &Key,FieldArray Fields):
// Key(Key),Fields(Fields),tag(0),meta_seq(0) { }
// inline int get_seq() { return meta_seq; }
// inline void set_seq(int meta_seq) { this->meta_seq = meta_seq; }
// inline void setPut() { tag = PUT; }
// inline void setDelete() { tag = DELETE; }
// Slice metaKey();
// Slice metaValue();
// private:
// enum {PUT = 0x0,DELETE = 0x1};
// uint64_t meta_seq;
// uint8_t tag;
// Slice &Key;
// FieldArray Fields;
// };
enum MetaType {
//Index, // meta for index status
KV_Creating, // meta for the put of an indexed field
KV_Deleting,
};
//(field_name,field_value): the KV representation in metaDB
class MetaKV {
public:
MetaKV(Slice field_name,Slice field_value = Slice()):
name(field_name),value(field_value) { }
void TransPut(std::string &MetaKey,std::string &MetaValue);
void TransDelete(std::string &MetaKey);
private:
Slice name;
Slice value;
};
class MetaCleaner {
public:
MetaCleaner() = default;
void Collect(WriteBatch &MetaBatch);
void CleanMetaBatch(DB *metaDB);
private:
WriteBatch NeedClean;
};
}
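To illustrate the meta key layout produced by MetaKV::TransPut and consumed by FieldDB::Recover, here is a small round-trip sketch (my illustration, not part of the commit; it only uses the helpers from util/coding.h): the key is a fixed32 MetaType tag followed by the length-prefixed user key.

#include <cassert>
#include <string>
#include "fielddb/meta.h"
#include "leveldb/slice.h"
#include "util/coding.h"

void MetaKeyRoundTrip() {
  // Encode: 4-byte MetaType tag + length-prefixed user key (what TransPut builds).
  std::string meta_key;
  leveldb::PutFixed32(&meta_key, fielddb::KV_Creating);
  leveldb::PutLengthPrefixedSlice(&meta_key, leveldb::Slice("customer#1"));

  // Decode: the same steps Recover performs on every metaDB key.
  leveldb::Slice in(meta_key);
  fielddb::MetaType type = fielddb::MetaType(leveldb::DecodeFixed32(in.data()));
  in.remove_prefix(4);
  leveldb::Slice user_key;
  leveldb::GetLengthPrefixedSlice(&in, &user_key);
  assert(type == fielddb::KV_Creating && user_key == leveldb::Slice("customer#1"));
}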

+0 -20 fielddb/metakv.cpp

@ -1,20 +0,0 @@
#include "fielddb/metakv.h"
#include "util/coding.h"
#include <string>
namespace fielddb {
using namespace leveldb;
Slice MetaKV::metaKey() {
std::string buf;
PutLengthPrefixedSlice(&buf, Key);
PutFixed64(&buf, meta_seq);
PutFixed32(&buf, tag);
return Slice(buf);
}
Slice MetaKV::metaValue() {
return Slice(SerializeValue(Fields));
}
}

+0 -26 fielddb/metakv.h

@ -1,26 +0,0 @@
#pragma once
#include <cstdint>
#include <cstdio>
#include "leveldb/slice.h"
#include "util/serialize_value.h"
namespace fielddb {
using namespace leveldb;
/*from the write flow it follows that everything that needs to be stored in metaDB is indexed data, i.e. a FieldArray*/
class MetaKV {
MetaKV(Slice &Key,FieldArray Fields):
Key(Key),Fields(Fields),tag(0),meta_seq(0) { }
inline int get_seq() { return meta_seq; }
inline void set_seq(int meta_seq) { this->meta_seq = meta_seq; }
inline void setPut() { tag = PUT; }
inline void setDelete() { tag = DELETE; }
Slice metaKey();
Slice metaValue();
private:
enum {PUT = 0x0,DELETE = 0x1};
uint64_t meta_seq;
uint8_t tag;
Slice &Key;
FieldArray Fields;
};
}

+439 -0 fielddb/request.cpp

@ -0,0 +1,439 @@
#include "fielddb/request.h"
#include <cassert>
#include <cstdint>
#include <deque>
#include <string>
#include <unordered_set>
#include "leveldb/slice.h"
#include "leveldb/status.h"
#include "leveldb/write_batch.h"
#include "port/port_stdcxx.h"
#include "util/mutexlock.h"
#include "util/serialize_value.h"
#include "fielddb/encode_index.h"
#include "fielddb/field_db.h"
#include "fielddb/meta.h"
#include "request.h"
namespace fielddb {
using namespace leveldb;
//minimal default implementation for the virtual function
void Request::PendReq(Request *req) {
assert(0);
}
//minimal default implementation for the virtual function
void Request::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB,
SliceHashSet &batchKeySet)
{
assert(0);
}
void Request::Prepare(FieldDB *DB) {
assert(0);
}
void Request::Finalize(FieldDB *DB) {
assert(0);
}
//minimal default implementation for the virtual function
bool Request::isPending() {
//a pending request's parent points to the request it is waiting on (iCreate/iDelete)
return parent != this;
}
/*******FieldsReq*******/
void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB,
SliceHashSet &batchKeySet)
{
if (batchKeySet.find(Key) != batchKeySet.end()){
return;//concurrent put/delete requests merged on the same key are handled only once
} else {
batchKeySet.insert(Key);
}
std::string val_str;
//uint64_t start_ = DB->env_->NowMicros();
s = DB->kvDB_->Get(ReadOptions(), Key, &val_str);
//DB->construct_FieldsReq_Read_elapsed += DB->env_->NowMicros() - start_;
// FieldArray *oldFields;
FieldSliceArray oldFields;
if (s.IsNotFound()){
// oldFields = nullptr;
} else if (s.ok()) { //read the key's previous fields to decide whether their potential index entries must be deleted
// oldFields = new FieldArray;
// oldFields = ParseValue(val_str,oldFields);
Slice nameSlice, valSlice;
Slice Value(val_str);
while(GetLengthPrefixedSlice(&Value, &nameSlice)) {
if(GetLengthPrefixedSlice(&Value, &valSlice)) {
oldFields.push_back({nameSlice,valSlice});
} else {
std::cout << "name and val not match! From FieldsReq Init" << std::endl;
}
nameSlice.clear(), valSlice.clear();
}
} else {
assert(0);
}
bool HasIndex = false;
bool HasOldIndex = false;
{
// MutexLock L(&DB->index_mu); //exclusive access to the index status table
DB->index_mu.AssertHeld();
//1. pend conflicting puts onto the request they conflict with
for(auto &[field_name,field_value] : SliceFields) {
if(field_name.data() == EMPTY) break;
if(DB->index_.count(field_name.ToString())) {
auto [index_status,parent_req] = DB->index_[field_name.ToString()];
if(index_status == IndexStatus::Creating || index_status == IndexStatus::Deleting) {
parent_req->PendReq(this->parent);
return;
} else if(index_status == IndexStatus::Exist) {
HasIndex = true;
}
//assert(0);
}
}
//a conflict can also arise when the old value's index entries must be deleted but that index is being created/deleted
if (!oldFields.empty()){
for(auto &[field_name,field_value] : oldFields) {
if(field_name.data() == EMPTY) break;
if(DB->index_.count(field_name.ToString())) {
auto [index_status,parent_req] = DB->index_[field_name.ToString()];
if(index_status == IndexStatus::Creating || index_status == IndexStatus::Deleting) {
parent_req->PendReq(this->parent);
return;
} else if(index_status == IndexStatus::Exist) {
HasOldIndex = true;
}
//assert(0);
}
}
}
std::string scrach = SerializeValue(SliceFields);
KVBatch.Put(Slice(Key), Slice(scrach));
//2. for a conflict-free put that involves an index, build a MetaKV; the KV pair is simply encoded and written to metaDB
if(HasIndex || HasOldIndex) {
std::string MetaKey,MetaValue;
std::string serialized = SerializeValue(SliceFields);
MetaKV MKV = MetaKV(Key,serialized);
MKV.TransPut(MetaKey, MetaValue);
MetaBatch.Put(MetaKey, serialized);
//3.1 delete index entries for old fields that have an index
if (HasOldIndex) {
for(auto &[field_name,field_value] : oldFields) {
if(field_name.data() == EMPTY) continue;
if(DB->index_.count(field_name.ToString())) {
std::string indexKey;
AppendIndexKey(&indexKey, ParsedInternalIndexKey(
Key,field_name,field_value));
IndexBatch.Delete(indexKey);
}
}
}
//3.2 build index entries for new fields that have an index
if (HasIndex) {
for(auto &[field_name,field_value] : SliceFields) {
if(field_name.data() == EMPTY) continue;
if(DB->index_.count(field_name.ToString())) {
std::string indexKey;
AppendIndexKey(&indexKey, ParsedInternalIndexKey(
Key,field_name,field_value));
IndexBatch.Put(indexKey, Slice());
}
}
}
}
//optimization: an index entry appearing in both 3.1 and 3.2 should only be written once
}
// if(oldFields) delete oldFields;
}
/*******DeleteReq*******/
void DeleteReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB,
SliceHashSet &batchKeySet)
{
if (batchKeySet.find(Key) != batchKeySet.end()){
return;//concurrent put/delete requests merged on the same key are handled only once
} else {
batchKeySet.insert(Key);
}
//1. Read the current latest key-value pair and check whether any of its fields carry an index
//2.1 If not, construct a normal delete
//2.2 If every indexed field's status is Exist, write a KV_Deleting record to meta
//and write the corresponding deletes to kvDB and metaDB
//2.3 If some field's index status is Creating or Deleting, wait on that request's queue
std::string val_str;
Status s = DB->kvDB_->Get(ReadOptions(), Key, &val_str);
if (s.IsNotFound()) return;
// FieldArray *Fields = new FieldArray;
// ParseValue(val_str,Fields);
FieldSliceArray Fields;
Slice nameSlice, valSlice;
Slice Value(val_str);
while(GetLengthPrefixedSlice(&Value, &nameSlice)) {
if(GetLengthPrefixedSlice(&Value, &valSlice)) {
Fields.push_back({nameSlice,valSlice});
} else {
std::cout << "name and val not match! From FieldsReq Init" << std::endl;
}
nameSlice.clear(), valSlice.clear();
}
KVBatch.Delete(Slice(Key));
bool HasIndex = false;
{
// MutexLock L(&DB->index_mu); //exclusive access to the index status table
DB->index_mu.AssertHeld();
//1. pend conflicting deletes onto the request they conflict with
for(auto &[field_name,field_value] : Fields) {
if(field_name.data() == EMPTY) break;
if(DB->index_.count(field_name.ToString())) {
auto [index_status,parent_req] = DB->index_[field_name.ToString()];
if(index_status == IndexStatus::Creating || index_status == IndexStatus::Deleting) {
parent_req->PendReq(this->parent);
return;
} else if(index_status == IndexStatus::Exist) {
HasIndex = true;
}
//assert(0);
}
}
KVBatch.Delete(Slice(Key));
//2. for a conflict-free delete that involves an index, build a MetaKV; the KV pair is simply encoded and written to metaDB
if(HasIndex) {
std::string MetaKey;
MetaKV MKV = MetaKV(Key);
MKV.TransDelete(MetaKey); //a delete written to meta needs no value
MetaBatch.Put(MetaKey, Slice());
//3. delete index entries for fields that have an index
for(auto &[field_name,field_value] : Fields) {
if(field_name.data() == EMPTY) continue;
if(DB->index_.count(field_name.ToString())) {
std::string indexKey;
AppendIndexKey(&indexKey, ParsedInternalIndexKey(
Key,field_name,field_value));
IndexBatch.Delete(indexKey);
}
}
}
}
// delete Fields;
}
/*******iCreateReq*******/
void iCreateReq::Prepare(FieldDB *DB) {
//update the index status in index_; doing it here avoids duplicate creation
DB->index_mu.AssertHeld();
if(DB->index_.count(Field.ToString())) {
auto [istatus,parent] = DB->index_[Field.ToString()];
if(istatus == IndexStatus::Exist) {
//if the index has already been built, return success
done = true;
Existed = true;
s = Status::OK();
} else {
//if it is being created or deleted, wait
parent->PendReq(this->parent);
}
return;
}
//if the field is not in the index status table, the index has not been created yet; record the new status
//done is set to true here to indicate that the taskqueue part of the work is finished and no pend is needed
DB->index_[Field.ToString()] = {IndexStatus::Creating,this};
done = true;
}
void iCreateReq::PendReq(Request *req) {
req->parent = this;
pending_list.push_back(req);
}
void iCreateReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB,
SliceHashSet &batchKeySet)
{
//scan the database and build the secondary index into IndexBatch (and update the metaDB metadata to an Index-type (Field,Creating))
//it is written as a single index writebatch, so index creation/deletion should not need to interact with metaDB
std::vector<std::pair<std::string, std::string>> keysAndVal =
DB->FindKeysAndValByFieldName(Field.ToString());
Slice value = Slice();
for (auto &kvPair : keysAndVal){
std::string indexKey;
AppendIndexKey(&indexKey,
ParsedInternalIndexKey(kvPair.first, Field, kvPair.second));
IndexBatch.Put(indexKey, value);
}
}
void iCreateReq::Finalize(FieldDB *DB) {
//1. after the write completes, update the index status table (and change the metaDB value to an Index-type (Field,Existing))
MutexLock iL(&DB->index_mu);
DB->index_[Field.ToString()] = {IndexStatus::Exist, nullptr};
DB->index_mu.Unlock();
if (pending_list.empty()) return;
//2. re-enqueue everything on the pending list
MutexLock L(&DB->mutex_);
for (auto req : pending_list){
DB->taskqueue_.push_back(req);
req->parent = req; //unbind from the parent
}
if (pending_list[0] == DB->taskqueue_.front()) {
pending_list[0]->cond_.Signal();
}
this->s = Status::OK();
}
/*******iDeleteReq*******/
void iDeleteReq::Prepare(FieldDB *DB) {
DB->index_mu.AssertHeld();
if(DB->index_.count(Field.ToString()) == 0) {
done = true;
Deleted = true;
s = Status::OK();
return ;
}
auto [istatus,parent] = DB->index_[Field.ToString()];
if(istatus == IndexStatus::Exist) {
DB->index_[Field.ToString()] = {IndexStatus::Deleting,this};
done = true;
} else {
//if it is being created or deleted, pend onto that request
parent->PendReq(this->parent);
}
}
void iDeleteReq::PendReq(Request* req) {
req->parent = this;
pending_list.push_back(req);
}
void iDeleteReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB,SliceHashSet &batchKeySet)
{
std::vector<std::pair<std::string, std::string>> keysAndVal =
DB->FindKeysAndValByFieldName(Field);
Slice value = Slice();
for (auto &kvPair : keysAndVal){
std::string indexKey;
AppendIndexKey(&indexKey,
ParsedInternalIndexKey(kvPair.first, Field, kvPair.second));
IndexBatch.Delete(indexKey);
}
}
void iDeleteReq::Finalize(FieldDB *DB) {
MutexLock iL(&DB->index_mu);
DB->index_.erase(Field.ToString());
DB->index_mu.Unlock();
if (pending_list.empty()) return;
//2. re-enqueue everything on the pending list
MutexLock L(&DB->mutex_);
for (auto req : pending_list){
DB->taskqueue_.push_back(req);
req->parent = req; //unbind from the parent
}
if (pending_list[0] == DB->taskqueue_.front()) {
pending_list[0]->cond_.Signal();
}
this->s = Status::OK();
}
BatchReq::BatchReq(WriteBatch *Batch,port::Mutex *mu):
Batch(Batch),Request(BatchReq_t, mu) {
struct BatchHandler : WriteBatch::Handler {
void Put(const Slice &key, const Slice &value) override {
//provide storage for key and value to avoid memory errors caused by string destruction
// str_buf->push_back(key.ToString());
// FieldArray *field = new FieldArray;
// field = ParseValue(value.ToString(), field);
// if (field->empty()){ //the batch value carries no fields
// fa_buf->push_back({{EMPTY,value.ToString()}});
// } else {
// fa_buf->push_back(*field);
// }
//assume everything in the WriteBatch has Fields!!!!!
sub_requests->emplace_back(new FieldsReq(key,value,mu));
sub_requests->back()->parent = req;
// delete field;
}
void Delete(const Slice &key) override {
// str_buf->push_back(key.ToString());
sub_requests->emplace_back(new DeleteReq(key,mu));
sub_requests->back()->parent = req;
}
BatchReq *req;
port::Mutex *mu;
// std::deque<std::string> *str_buf;
// std::deque<FieldArray> *fa_buf;
std::deque<Request*> *sub_requests;
};
BatchHandler Handler;
Handler.req = this;
Handler.mu = mu;
// Handler.str_buf = &str_buf;
// Handler.fa_buf = &fa_buf;
Handler.sub_requests = &sub_requests;
Batch->Iterate(&Handler);
}
BatchReq::~BatchReq() {
while(!sub_requests.empty()) {
Request *req = sub_requests.front();
sub_requests.pop_front();
delete req;
}
}
void BatchReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB,SliceHashSet &batchKeySet)
{
WriteBatch Sub_KVBatch,Sub_IndexBatch,Sub_MetaBatch;
SliceHashSet Sub_batchKeySet;
//a batch is ordered, and our current algorithm only processes the first occurrence of each key, so iterate in reverse
//uint64_t start_ = DB->env_->NowMicros();
for(auto subreq = sub_requests.rbegin(); subreq != sub_requests.rend(); subreq++ ) {
//uint64_t start_sub = DB->env_->NowMicros();
(*subreq)->ConstructBatch(Sub_KVBatch, Sub_IndexBatch, Sub_MetaBatch, DB, Sub_batchKeySet);
// (*subreq)->ConstructBatch(KVBatch, IndexBatch, MetaBatch, DB, batchKeySet);
//DB->construct_BatchReq_perSub_elapsed += DB->env_->NowMicros() - start_sub;
//DB->count_Batch_Sub ++;
//every call to PendReq is passed this->parent, so for sub-requests
//PendReq receives the owning BatchReq; checking whether the BatchReq is pending therefore tells us whether a sub-request conflicted
if(isPending()) {
return;
}
}
//DB->construct_BatchReq_Sub_elapsed += DB->env_->NowMicros() - start_;
if(Sub_KVBatch.ApproximateSize() > 12) {
KVBatch.Append(Sub_KVBatch);
}
if(Sub_IndexBatch.ApproximateSize() > 12) {
IndexBatch.Append(Sub_IndexBatch);
}
if(Sub_MetaBatch.ApproximateSize() > 12) {
MetaBatch.Append(Sub_MetaBatch);
}
batchKeySet.insert(Sub_batchKeySet.begin(),Sub_batchKeySet.end());
//DB->construct_BatchReq_elapsed += DB->env_->NowMicros() - start_;
}
} // namespace fielddb

+157 -11 fielddb/request.h

@ -1,25 +1,171 @@
#include <deque>
#include <string>
#include "leveldb/slice.h"
#include "leveldb/status.h"
#include "leveldb/write_batch.h"
#include "port/port_stdcxx.h"
#include "util/coding.h"
#include "util/mutexlock.h"
#include "util/serialize_value.h"
#include <unordered_set>
// #include "fielddb/field_db.h"
#include "fielddb/SliceHashSet.h"
#ifndef REQUEST_H
#define REQUEST_H
namespace fielddb {
using namespace leveldb;
// Requests in the taskqueue; the thread at the front of the taskqueue handles a batch of Requests
// similar in spirit to how Write handles its writers
class FieldDB;
class Request {
public:
Request(std::string *Key,std::string *Value,port::Mutex *mu):
Key(Key),Value(Value),hasFields(false),_cond(mu) { }
Request(std::string *Key,FieldArray *Fields,port::Mutex *mu):
Key(Key),Fields(Fields),hasFields(false),_cond(mu) { }
friend class FieldDB;
enum RequestType {
FieldsReq_t,
//ValueReq_t,
iCreateReq_t,
iDeleteReq_t,
DeleteReq_t,
BatchReq_t,
};
public:
// Request(std::string *Key,std::string *Value,port::Mutex *mu):
// Key(Key),Value(Value),hasFields(false),cond_(mu) { }
// Request(std::string *Key,FieldArray *Fields,port::Mutex *mu):
// Key(Key),Fields(Fields),hasFields(true),cond_(mu) { }
Request(RequestType type,port::Mutex *mu):
type_(type),cond_(mu),done(false) { parent = this; };
//virtual ~Request();
inline bool isFieldsReq() { return type_ == FieldsReq_t; }
// inline bool isValueReq() { return type_ == ValueReq_t; }
inline bool isiCreateReq() { return type_ == iCreateReq_t; }
inline bool isiDeleteReq() { return type_ == iDeleteReq_t; }
inline bool isDeleteReq() { return type_ == DeleteReq_t; }
inline bool isBatchReq() { return type_ == BatchReq_t; }
private:
//Fields-related
virtual void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB,SliceHashSet &batchKeySet);
//iCreate and iDelete register their current status while in the queue
virtual void Prepare(FieldDB *DB);
virtual void Finalize(FieldDB *DB);
virtual void PendReq(Request *req);
bool isPending();
// protected:
bool done;
port::CondVar _cond;
Status s;
port::CondVar cond_;
RequestType type_;
Request *parent;
};
//put with fields
class FieldsReq : public Request {
public:
FieldsReq(Slice Key,const FieldArray &Fields,port::Mutex *mu):
Key(Key),Request(FieldsReq_t,mu) {
for(auto &[name,value] : Fields) {
SliceFields.push_back({name,value});
}
};
FieldsReq(Slice Key, Slice Value,port::Mutex *mu):
Key(Key),Request(FieldsReq_t,mu) {
Slice nameSlice, valSlice;
while(GetLengthPrefixedSlice(&Value, &nameSlice)) {
if(GetLengthPrefixedSlice(&Value, &valSlice)) {
SliceFields.push_back({nameSlice,valSlice});
} else {
std::cout << "name and val not match! From FieldsReq Init" << std::endl;
}
nameSlice.clear(), valSlice.clear();
}
}
void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB,SliceHashSet &batchKeySet) override;
Slice Key;
FieldSliceArray SliceFields;
};
//put with fields
// class ValueReq : public Request {
// public:
// ValueReq(std::string *Key,std::string *Value,port::Mutex *mu):
// Key(Key),Value(Value),Request(ValueReq_t,mu) { };
// std::string *Key;
// std::string *Value;
// };
//TODO: Field and the like might be better passed by reference
//request
class iCreateReq : public Request {
public:
iCreateReq(Slice Field,port::Mutex *mu):
Field(Field),Request(iCreateReq_t, mu),Existed(false) { };
bool hasFields;
std::string *Key;
std::string *Value;
FieldArray *Fields;
void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB,SliceHashSet &batchKeySet) override;
void Prepare(FieldDB *DB) override;
void Finalize(FieldDB *DB) override;
void PendReq(Request *req) override;
bool Existed;
Slice Field;
std::deque<Request *> pending_list;
};
}
//request
class iDeleteReq : public Request {
public:
iDeleteReq(Slice Field,port::Mutex *mu):
Field(Field),Request(iDeleteReq_t, mu),Deleted(false) { };
void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB,SliceHashSet &batchKeySet) override;
void Prepare(FieldDB *DB) override;
void Finalize(FieldDB *DB) override;
void PendReq(Request *req) override;
bool Deleted;
Slice Field;
std::deque<Request *> pending_list;
};
//delete request for a key
class DeleteReq : public Request {
public:
DeleteReq(Slice Key,port::Mutex *mu):
Key(Key),Request(DeleteReq_t,mu) { };
void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB,SliceHashSet &batchKeySet) override;
Slice Key;
};
class BatchReq : public Request {
public:
BatchReq(WriteBatch *Batch,port::Mutex *mu);
~BatchReq();
void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB,SliceHashSet &batchKeySet) override;
WriteBatch *Batch;
std::deque<Request *> sub_requests;
// std::deque<std::string> str_buf;
// std::deque<FieldArray> fa_buf;
};
}
#endif
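For orientation, here is a minimal sketch (mirroring what the tests do, with an arbitrary database name) of how these request types surface through FieldDB's public API: PutFields becomes a FieldsReq on the taskqueue, CreateIndexOnField/DeleteIndex become iCreateReq/iDeleteReq, and conflicting writes are pended until Finalize re-queues them.

#include <vector>
#include "fielddb/field_db.h"
using namespace fielddb;   // as in the tests; leveldb names are visible through fielddb's using-directive

void RequestFlowSketch() {
  Options options;
  options.create_if_missing = true;
  FieldDB* db = new FieldDB();
  FieldDB::OpenFieldDB(options, "sketch_db", &db);   // database name is arbitrary

  WriteOptions op;
  FieldArray fields = {{"name", "customer#1"}, {"address", "Shanghai"}};
  db->PutFields(op, "1", fields);            // handled as a FieldsReq on the taskqueue

  db->CreateIndexOnField("address", op);     // iCreateReq: Prepare -> ConstructBatch -> Finalize
  Status s;
  std::vector<std::string> keys = db->QueryByIndex({"address", "Shanghai"}, &s);

  db->DeleteIndex("address", op);            // iDeleteReq; conflicting writes are pended until Finalize
  delete db;
}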

+2 -0 include/leveldb/env.h

@ -218,6 +218,8 @@ class LEVELDB_EXPORT Env {
virtual void SleepForMicroseconds(int micros) = 0;
};
Env* getPosixEnv();
// A file abstraction for reading sequentially through a file
class LEVELDB_EXPORT SequentialFile {
public:

+30 -106 test/basic_function_test.cc

@ -2,139 +2,63 @@
// #include "leveldb/env.h"
// #include "leveldb/db.h"
#include "fielddb/field_db.h"
#include "test/helper.cc"
using namespace fielddb;
constexpr int value_size = 2048;
constexpr int data_size = 128 << 20;
std::vector<std::string> cities = {
"Beijing", "Shanghai", "Guangzhou", "Shenzhen", "Hangzhou",
"Chengdu", "Chongqing", "Wuhan", "Suzhou", "Tianjin"
};
std::vector<std::string> shanghaiKeys;
Status OpenDB(std::string dbName, FieldDB **db) {
Options options;
options.create_if_missing = true;
return FieldDB::OpenFieldDB(options, dbName, db);
}
void ClearDB(FieldDB *db){
//before Destroy and recovery are implemented, use this to clean the database; otherwise repeated runs with different data pollute each other
WriteOptions writeOptions;
int key_num = data_size / value_size;
for (int i = 0; i < key_num; i++) {
int key_ = i+1;
std::string key = std::to_string(key_);
Status s = db->Delete(WriteOptions(), key);
ASSERT_TRUE(s.ok());
}
}
void InsertFieldData(FieldDB *db) {
WriteOptions writeOptions;
int key_num = data_size / value_size;
srand(0);
for (int i = 0; i < key_num; i++) {
int randThisTime = rand(); //call rand() only once per loop iteration, otherwise the read and write random sequences diverge
int key_ = randThisTime % key_num+1;
std::string key = std::to_string(key_);
std::string name = "customer#" + std::to_string(key_);
std::string address = cities[randThisTime % cities.size()];
FieldArray fields = {
{"name", name},
{"address", address}
};
if (address == "Shanghai") {
shanghaiKeys.push_back(key);
}
Status s = db->PutFields(WriteOptions(), key, fields);
ASSERT_TRUE(s.ok());
}
}
void GetFieldData(FieldDB *db) {
ReadOptions readOptions;
int key_num = data_size / value_size;
// point lookups
srand(0);
for (int i = 0; i < 100; i++) {
int randThisTime = rand();
int key_ = randThisTime % key_num+1;
std::string key = std::to_string(key_);
FieldArray fields_ret;
Status s = db->GetFields(readOptions, key, &fields_ret);
ASSERT_TRUE(s.ok());
for (const Field& pairs : fields_ret) {
if (pairs.first == "name"){
} else if (pairs.first == "address"){
std::string city = pairs.second;
ASSERT_NE(std::find(cities.begin(), cities.end(), city), cities.end());
} else assert(false);
}
}
}
void findKeysByCity(FieldDB *db) {
Field field = {"address", "Shanghai"};
std::vector<std::string> resKeys = db->FindKeysByField(field);
std::cout << shanghaiKeys.size() << " " << resKeys.size() << std::endl;
for (const std::string &key : resKeys){
ASSERT_NE(std::find(shanghaiKeys.begin(), shanghaiKeys.end(), key), shanghaiKeys.end());
}
}
void findKeysByCityIndex(FieldDB *db, bool expect) {
Field field = {"address", "Shanghai"};
Status s;
std::vector<std::string> resKeys = db->QueryByIndex(field, &s);
if (expect) ASSERT_TRUE(s.ok());
else {
ASSERT_TRUE(s.IsNotFound());
return;
}
std::cout << shanghaiKeys.size() << " " << resKeys.size() << std::endl;
for (const std::string &key : resKeys){
ASSERT_NE(std::find(shanghaiKeys.begin(), shanghaiKeys.end(), key), shanghaiKeys.end());
}
}
TEST(TestLab1, Basic) {
// DestroyDB("testdb",Options());
fielddb::DestroyDB("testdb1.1",Options()); //每个测试前,先把对应名称的之前的数据库删了
FieldDB *db = new FieldDB();
if(OpenDB("testdb", &db).ok() == false) {
if(OpenDB("testdb1.1", &db).ok() == false) {
std::cerr << "open db failed" << std::endl;
abort();
}
// ClearDB(db);
InsertFieldData(db);
GetFieldData(db);
bool allowNotFound = false;
GetFieldData(db, allowNotFound);
findKeysByCity(db);
DeleteFieldData(db);
GetDeleteData(db);
delete db;
}
TEST(TestLab2, Basic) {
//destroy
fielddb::DestroyDB("testdb1.2",Options());
FieldDB *db = new FieldDB();
if(OpenDB("testdb2", &db).ok() == false) {
if(OpenDB("testdb1.2", &db).ok() == false) {
std::cerr << "open db failed" << std::endl;
abort();
}
// ClearDB(db);
shanghaiKeys.clear();
age20Keys.clear();
InsertFieldData(db);
// GetFieldData(db);
// findKeysByCity(db);
db->CreateIndexOnField("address");
db->CreateIndexOnField("address", op);
db->CreateIndexOnField("age", op);
findKeysByCityIndex(db, true);
db->DeleteIndex("address");
findKeysByCityIndex(db, false);
findKeysByAgeIndex(db, true);
db->DeleteIndex("address", op);
findKeysByCityIndex(db, false);
findKeysByAgeIndex(db, true);
DeleteFieldData(db);
// GetDeleteData(db);
//helper is long enough, so no extra wrapper here: the data has been deleted but the index remains, so QueryByIndex succeeds and returns 0 keys
Field field = {"age", "20"};
Status s;
std::vector<std::string> resKeys = db->QueryByIndex(field, &s);
ASSERT_EQ(resKeys.size(), 0);
WriteFieldData(db);
GetFieldData(db, false);
findKeysByAgeIndex(db, true);
delete db;
}

+284 -0 test/helper.cc

@ -0,0 +1,284 @@
#include "gtest/gtest.h"
// #include "leveldb/env.h"
// #include "leveldb/db.h"
#include "fielddb/field_db.h"
#include <random>
#include "helper.h"
using namespace fielddb;
constexpr int value_size = 2048;
constexpr int data_size = 128 << 20;
#define AGE_RANGE 100
std::vector<std::string> cities = {
"Beijing", "Shanghai", "Guangzhou", "Shenzhen", "Hangzhou",
"Chengdu", "Chongqing", "Wuhan", "Suzhou", "Tianjin"
};
//check that the inserted data matches what QueryByIndex returns
//a thread-safe global set wrapper
ThreadSafeSet shanghaiKeys;
ThreadSafeSet age20Keys;
//complex tests need to watch these two globals:
//currently only InsertFieldData, InsertOneField and WriteFieldData add to them,
//DeleteFieldData and InsertOneField remove from them,
//other tests should clear them manually in between
const WriteOptions op;
Status OpenDB(std::string dbName, FieldDB **db) {
Options options;
options.create_if_missing = true;
return FieldDB::OpenFieldDB(options, dbName, db);
}
// void ClearDB(FieldDB *db){
// //before Destroy and recovery are implemented, use this to clean the database; otherwise repeated runs with different data pollute each other
// WriteOptions writeOptions;
// int key_num = data_size / value_size;
// for (int i = 0; i < key_num; i++) {
// int key_ = i+1;
// std::string key = std::to_string(key_);
// Status s = db->Delete(WriteOptions(), key);
// ASSERT_TRUE(s.ok());
// }
// }
//helper that inserts a single specific record
void InsertOneField(FieldDB *db, std::string key = "0") {
WriteOptions writeOptions;
FieldArray fields = {
{"name", "special#" + key},
{"address", "Shanghai"},
{"age", "20"}
};
Status s = db->PutFields(WriteOptions(), key, fields);
ASSERT_TRUE(s.ok());
shanghaiKeys.insert(key);
age20Keys.insert(key);
}
//helper that deletes a single specific record
void DeleteOneField(FieldDB *db, std::string key = "0") {
WriteOptions writeOptions;
Status s = db->Delete(WriteOptions(), key);
ASSERT_TRUE(s.ok());
shanghaiKeys.erase(key);
age20Keys.erase(key);
}
//counterpart of the two helpers above
void GetOneField(FieldDB *db, std::string key = "0") {
ReadOptions readOptions;
FieldArray fields_ret;
Status s = db->GetFields(readOptions, key, &fields_ret);
ASSERT_TRUE(s.ok());
for (const Field& pairs : fields_ret) {
if (pairs.first == "name"){
ASSERT_EQ(pairs.second, "special#" + key);
} else if (pairs.first == "address"){
ASSERT_EQ(pairs.second, "Shanghai");
} else if (pairs.first == "age"){
ASSERT_EQ(pairs.second, "20");
} else assert(false);
}
}
void InsertFieldData(FieldDB *db, int seed = 0/*random seed*/) {
std::cout << "-------inserting-------" << std::endl;
WriteOptions writeOptions;
int key_num = data_size / value_size;
// srand is not thread-safe; this keeps the random sequence consistent even with multiple threads
std::mt19937 rng(seed);
for (int i = 0; i < key_num; i++) {
int randThisTime = rng(); //draw from rng only once per loop iteration, otherwise the read and write random sequences diverge
//keep batch-written keys > 0 and individually written keys <= 0, to make inspection easier
int key_ = std::abs(randThisTime) % key_num + 1;
std::string key = std::to_string(key_);
std::string name = "customer#" + std::to_string(key_);
std::string address = cities[randThisTime % cities.size()];
std::string age = std::to_string(std::abs(randThisTime) % AGE_RANGE);
FieldArray fields = {
{"name", name},
{"address", address},
{"age", age}
};
if (address == "Shanghai") {
shanghaiKeys.insert(key);
}
if (age == "20") {
age20Keys.insert(key);
}
Status s = db->PutFields(WriteOptions(), key, fields);
ASSERT_TRUE(s.ok());
}
}
void DeleteFieldData(FieldDB *db, int seed = 0/*random seed*/) {
std::cout << "-------deleting-------" << std::endl;
WriteOptions writeOptions;
int key_num = data_size / value_size;
// srand线程不安全,这种可以保证多线程时随机序列也一致
std::mt19937 rng(seed);
shanghaiKeys.clear();
age20Keys.clear();
for (int i = 0; i < key_num; i++) {
int randThisTime = rng(); //确保读写一个循环只rand一次,否则随机序列会不一致
int key_ = std::abs(randThisTime) % key_num + 1;
std::string key = std::to_string(key_);
Status s = db->Delete(WriteOptions(), key);
ASSERT_TRUE(s.ok());
}
}
void WriteFieldData(FieldDB *db, int seed = 0/*random seed*/) {
std::cout << "-------writing-------" << std::endl;
WriteOptions writeOptions;
int key_num = data_size / value_size;
// srand is not thread-safe; a local std::mt19937 also keeps the random sequence identical across threads
std::mt19937 rng(seed);
WriteBatch wb;
for (int i = 0; i < key_num; i++) {
int randThisTime = rng(); // call rng() only once per iteration so readers with the same seed see the same sequence
// keep batch-written keys > 0 and single-record keys <= 0 to make results easier to inspect
int key_ = std::abs(randThisTime) % key_num + 1;
std::string key = std::to_string(key_);
std::string name = "customer#" + std::to_string(key_);
std::string address = cities[randThisTime % cities.size()];
std::string age = std::to_string(std::abs(randThisTime) % AGE_RANGE);
FieldArray fields = {
{"name", name},
{"address", address},
{"age", age}
};
if (address == "Shanghai") {
shanghaiKeys.insert(key);
}
if (age == "20") {
age20Keys.insert(key);
}
wb.Put(key, SerializeValue(fields));
}
Status s = db->Write(writeOptions, &wb);
ASSERT_TRUE(s.ok());
}
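// Unlike InsertFieldData (one PutFields call per key), this path batches all updates into a
// single WriteBatch of pre-serialized field values and submits it through db->Write().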
// Under concurrency a key is not guaranteed to be readable yet, so a flag controls whether NotFound is tolerated.
void GetFieldData(FieldDB *db, bool allowNotFound, int seed = 0) {
std::cout << "-------getting-------" << std::endl;
ReadOptions readOptions;
int key_num = data_size / value_size;
// point lookups
std::mt19937 rng(seed);
for (int i = 0; i < 100; i++) {
int randThisTime = rng();
int key_ = std::abs(randThisTime) % key_num + 1;
std::string key = std::to_string(key_);
FieldArray fields_ret;
Status s = db->GetFields(readOptions, key, &fields_ret);
if (!allowNotFound){ // the key must be found
// if (!s.ok()){
// std::cout << key << std::endl;
// }
ASSERT_TRUE(s.ok());
} else { // the key need not be found, but if it is, the address must still be valid
if(s.IsNotFound()) continue;
}
for (const Field& pairs : fields_ret) {
if (pairs.first == "name"){
} else if (pairs.first == "address"){
std::string city = pairs.second;
ASSERT_NE(std::find(cities.begin(), cities.end(), city), cities.end());
} else if (pairs.first == "age"){
int age = std::stoi(pairs.second);
ASSERT_TRUE(age >= 0 && age < AGE_RANGE);
} else assert(false);
}
}
}
// Check that every key generated by the given seed has been deleted.
// No other seed may write concurrently during deletion: even with different seeds, the generated keys can collide.
void GetDeleteData(FieldDB *db, int seed = 0) {
std::cout << "-------getting-------" << std::endl;
ReadOptions readOptions;
int key_num = data_size / value_size;
std::mt19937 rng(seed);
for (int i = 0; i < 100; i++) {
int randThisTime = rng();
int key_ = std::abs(randThisTime) % key_num + 1;
std::string key = std::to_string(key_);
FieldArray fields_ret;
Status s = db->GetFields(readOptions, key, &fields_ret);
ASSERT_TRUE(s.IsNotFound());
}
}
void findKeysByCity(FieldDB *db) {
std::cout << "-------getting field address-------" << std::endl;
Field field = {"address", "Shanghai"};
std::vector<std::string> resKeys = db->FindKeysByField(field);
// Print for comparison: keys in shanghaiKeys may later be overwritten with a different address, so the second number printed should not exceed the first.
// With the same random seed, the two printed numbers should also be identical across runs.
std::cout << "address: " << shanghaiKeys.size() << " " << resKeys.size() << std::endl;
for (const std::string &key : resKeys){
ASSERT_TRUE(shanghaiKeys.haveKey(key));
}
}
// haveIndex indicates whether the database currently has an index on this field (address)
void findKeysByCityIndex(FieldDB *db, bool haveIndex) {
std::cout << "-------getting field address by index-------" << std::endl;
Field field = {"address", "Shanghai"};
Status s;
std::vector<std::string> resKeys = db->QueryByIndex(field, &s);
if (haveIndex) ASSERT_TRUE(s.ok());
else {
ASSERT_TRUE(s.IsNotFound());
return;
}
std::cout << "address: " << shanghaiKeys.size() << " " << resKeys.size() << std::endl;//打印比较
for (const std::string &key : resKeys){
ASSERT_TRUE(shanghaiKeys.haveKey(key));
}
}
void findKeysByAgeIndex(FieldDB *db, bool haveIndex) {
std::cout << "-------getting field age by index-------" << std::endl;
Field field = {"age", "20"};
Status s;
std::vector<std::string> resKeys = db->QueryByIndex(field, &s);
if (haveIndex) ASSERT_TRUE(s.ok());
else {
ASSERT_TRUE(s.IsNotFound());
return;
}
std::cout << "age: " << age20Keys.size() << " " << resKeys.size() << std::endl;
for (const std::string &key : resKeys){
ASSERT_TRUE(age20Keys.haveKey(key));
}
}
void checkDataInKVAndIndex(FieldDB *db, std::string fieldName = "address") {
Field field;
if (fieldName == "address") field = {"address", "Shanghai"};
else if (fieldName == "age") field = {"age", "20"};
else assert(0);// only these two fields are supported by this check
Status s;
std::vector<std::string> resKeys1 = db->QueryByIndex(field, &s); // keys found through the index in indexDB
std::vector<std::string> resKeys2 = db->FindKeysByField(field); // keys found by a brute-force scan of kvDB
std::sort(resKeys1.begin(), resKeys1.end());
std::sort(resKeys2.begin(), resKeys2.end());
std::cout << resKeys1.size() << " " << resKeys2.size() << std::endl;
ASSERT_EQ(resKeys1, resKeys2);
}
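// This is the final consistency check used by the parallel tests: whatever order the concurrent
// puts/deletes landed in, the keys reachable through the index must match a direct scan of kvDB.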

+ 34
- 0
test/helper.h View File

@ -0,0 +1,34 @@
#include "fielddb/field_db.h"
using namespace fielddb;
class ThreadSafeSet
{
private:
std::set<std::string> keys;
std::mutex setMutex;
public:
ThreadSafeSet(){}
void insert(std::string key){
std::lock_guard<std::mutex> lock(setMutex);
keys.insert(key);
}
void erase(std::string key){
std::lock_guard<std::mutex> lock(setMutex);
keys.erase(key);
}
void clear(){
std::lock_guard<std::mutex> lock(setMutex);
keys.clear();
}
size_t size(){
std::lock_guard<std::mutex> lock(setMutex);
return keys.size();
}
bool haveKey(std::string key){
std::lock_guard<std::mutex> lock(setMutex);
return keys.find(key) != keys.end();
}
};
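// Minimal usage sketch (illustration only):
//   ThreadSafeSet s;
//   s.insert("42");
//   assert(s.haveKey("42"));
// The tests share two such globals (shanghaiKeys, age20Keys) defined in helper.cc.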

+ 302
- 0
test/parallel_test.cc View File

@ -0,0 +1,302 @@
#include "gtest/gtest.h"
#include <thread>
// #include "leveldb/env.h"
// #include "leveldb/db.h"
#include "fielddb/field_db.h"
#include "test/helper.cc"
using namespace fielddb;
// In these tests, read/write means reading/writing data on indexed fields.
// Concurrent reads and writes of indexed data.
TEST(TestReadPut, Parallel) {
fielddb::DestroyDB("testdb2.1",Options());
FieldDB *db = new FieldDB();
if(OpenDB("testdb2.1", &db).ok() == false) {
std::cerr << "open db failed" << std::endl;
abort();
}
// ClearDB(db);
db->CreateIndexOnField("address", op);
db->CreateIndexOnField("age", op);
int thread_num_ = 5;
std::vector<std::thread> threads(thread_num_);
// two writers, three readers
for (size_t i = 0; i < thread_num_; i++)
{
if (i == 0) {// writer, random sequence 0
threads[i] = std::thread(InsertFieldData, db, 0);
} else if (i == 1)
{// writer, random sequence 1
threads[i] = std::thread(InsertFieldData, db, 1);
} else {// reader
bool allowNotFound = true;
threads[i] = std::thread(GetFieldData, db, allowNotFound, 0);
}
}
for (auto& t : threads) {
if (t.joinable()) {
t.join();
}
}
// Both writers have finished, so every key from both write sequences must now be readable.
bool allowNotFound = false;
GetFieldData(db, allowNotFound);
GetFieldData(db, allowNotFound, 1);
findKeysByCity(db);
checkDataInKVAndIndex(db);
delete db;
}
// Concurrency between creating an index and writing data that contains the indexed field.
TEST(TestPutCreatei, Parallel) {
fielddb::DestroyDB("testdb2.2",Options());
FieldDB *db = new FieldDB();
if(OpenDB("testdb2.2", &db).ok() == false) {
std::cerr << "open db failed" << std::endl;
abort();
}
// ClearDB(db);
shanghaiKeys.clear();
InsertFieldData(db);
int thread_num_ = 2;
std::vector<std::thread> threads(thread_num_);
for (size_t i = 0; i < thread_num_; i++)
{
if (i == 0) {// create the index
threads[i] = std::thread([db](){
db->CreateIndexOnField("address", op);
std::cout << "finish create index\n";
});
} else {// writer
threads[i] = std::thread([db](){
while (db->GetIndexStatus("address") == NotExist){
continue; // spin until index creation has started, then write concurrently
}
InsertOneField(db); // insert one record first
});
}
}
for (auto& t : threads) {
if (t.joinable()) {
t.join();
}
}
// check that the index was created successfully
bool haveIndex = true;
findKeysByCityIndex(db, haveIndex);
// check that the write succeeded
GetOneField(db);
checkDataInKVAndIndex(db);
delete db;
}
// Concurrency between creating and deleting different indexes.
TEST(TestCreateiCreatei, Parallel) {
fielddb::DestroyDB("testdb2.3",Options());
FieldDB *db = new FieldDB();
if(OpenDB("testdb2.3", &db).ok() == false) {
std::cerr << "open db failed" << std::endl;
abort();
}
// ClearDB(db);
shanghaiKeys.clear();
age20Keys.clear();
InsertFieldData(db);
int thread_num_ = 3;
std::vector<std::thread> threads(thread_num_);
for (size_t i = 0; i < thread_num_; i++)
{
// three threads concurrently create the index on address
threads[i] = std::thread([db](){
db->CreateIndexOnField("address", op);
std::cout << "finish create index address\n";
});
}
for (auto& t : threads) {
if (t.joinable()) {
t.join();
}
}
// check that the index was created successfully
bool haveIndex = true;
findKeysByCityIndex(db, haveIndex);
findKeysByAgeIndex(db, false);
checkDataInKVAndIndex(db);
for (size_t i = 0; i < thread_num_; i++)
{
if (i == 0 || i == 1) {// two threads delete the index on address
threads[i] = std::thread([db](){
db->DeleteIndex("address", op);
std::cout << "finish delete index address\n";
});
} else {// one thread creates the index on age
threads[i] = std::thread([db](){
db->CreateIndexOnField("age", op);
std::cout << "finish create index age\n";
});
}
}
for (auto& t : threads) {
if (t.joinable()) {
t.join();
}
}
// check
findKeysByCityIndex(db, false);
findKeysByAgeIndex(db, true);
delete db;
}
// With indexes present, heavy concurrent puts and deletes of the same keys; kvDB and indexDB must stay consistent.
TEST(TestPutDeleteOne, Parallel) {
fielddb::DestroyDB("testdb2.4",Options());
FieldDB *db = new FieldDB();
if(OpenDB("testdb2.4", &db).ok() == false) {
std::cerr << "open db failed" << std::endl;
abort();
}
// ClearDB(db);
shanghaiKeys.clear();
age20Keys.clear();
db->CreateIndexOnField("address", op);
db->CreateIndexOnField("age", op);
int thread_num_ = 20;
std::vector<std::thread> threads(thread_num_);
for (size_t i = 0; i < thread_num_; i++)
{
if (i % 2 == 0) {// even-numbered threads repeatedly insert keys 0..99
threads[i] = std::thread([db](){
for (size_t j = 0; j < 100; j++)
{
InsertOneField(db, std::to_string(j));
}
});
} else {// odd-numbered threads delete the same keys
threads[i] = std::thread([db](){
for (size_t j = 0; j < 100; j++)
{
DeleteOneField(db, std::to_string(j));
}
});
}
}
for (auto& t : threads) {
if (t.joinable()) {
t.join();
}
}
// check
checkDataInKVAndIndex(db);
delete db;
}
// With indexes present, concurrent puts and deletes; kvDB and indexDB must stay consistent.
TEST(TestPutDelete, Parallel) {
fielddb::DestroyDB("testdb2.5",Options());
FieldDB *db = new FieldDB();
if(OpenDB("testdb2.5", &db).ok() == false) {
std::cerr << "open db failed" << std::endl;
abort();
}
// ClearDB(db);
shanghaiKeys.clear();
age20Keys.clear();
db->CreateIndexOnField("address", op);
db->CreateIndexOnField("age", op);
int thread_num_ = 4;
std::vector<std::thread> threads(thread_num_);
threads[0] = std::thread([db](){InsertFieldData(db);});
threads[1] = std::thread([db](){InsertFieldData(db, 1);});
threads[2] = std::thread([db](){DeleteFieldData(db);});
threads[3] = std::thread([db](){DeleteFieldData(db, 1);});
for (auto& t : threads) {
if (t.joinable()) {
t.join();
}
}
// check
checkDataInKVAndIndex(db);
delete db;
}
// Batch Write running concurrently with everything else (a mixed stress test).
TEST(TestWrite, Parallel) {
fielddb::DestroyDB("testdb2.6",Options());
FieldDB *db = new FieldDB();
if(OpenDB("testdb2.6", &db).ok() == false) {
std::cerr << "open db failed" << std::endl;
abort();
}
// ClearDB(db);
shanghaiKeys.clear();
age20Keys.clear();
db->CreateIndexOnField("address", op);
InsertFieldData(db, 2); // pre-fill some data so that index creation takes longer
int thread_num_ = 5;
std::vector<std::thread> threads(thread_num_);
threads[0] = std::thread([db](){db->CreateIndexOnField("age", op);});
threads[1] = std::thread([db](){
while (db->GetIndexStatus("age") == NotExist){
continue; // spin until index creation has started, then write concurrently
}
InsertFieldData(db);});
threads[2] = std::thread([db](){
while (db->GetIndexStatus("age") == NotExist){
continue;
}
WriteFieldData(db, 1);});
threads[3] = std::thread([db](){
while (db->GetIndexStatus("age") == NotExist){
continue;
}
DeleteFieldData(db, 0);});
threads[4] = std::thread([db](){
while (db->GetIndexStatus("age") == NotExist){
continue;
}
db->DeleteIndex("age", op);});
for (auto& t : threads) {
if (t.joinable()) {
t.join();
}
}
// check
checkDataInKVAndIndex(db);
ASSERT_EQ(db->GetIndexStatus("age"), NotExist); //删除索引的请求应该被pend在创建之上
//删掉最后一个线程,可以测试创建age索引时并发的写入能不能保持age的一致性
//checkDataInKVAndIndex(db, "age");
delete db;
}
int main(int argc, char** argv) {
// All tests currently run with the same read-only file limits.
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

+ 90
- 0
test/recover_test.cc View File

@ -0,0 +1,90 @@
#include "gtest/gtest.h"
// #include "leveldb/env.h"
// #include "leveldb/db.h"
#include "fielddb/field_db.h"
#include "test/helper.cc"
#include <thread>
#include <csignal>
#include <exception>
using namespace fielddb;
TEST(TestNormalRecover, Recover) {
fielddb::DestroyDB("testdb3.1",Options());
FieldDB *db = new FieldDB();
if(OpenDB("testdb3.1", &db).ok() == false) {
std::cerr << "open db failed" << std::endl;
abort();
}
db->CreateIndexOnField("address", op);
db->CreateIndexOnField("age", op);
InsertFieldData(db);
bool allowNotFound = false;
GetFieldData(db, allowNotFound);
findKeysByCityIndex(db, true);
findKeysByAgeIndex(db, true);
delete db;
db = new FieldDB();
if(OpenDB("testdb3.1", &db).ok() == false) {
std::cerr << "open db failed" << std::endl;
abort();
}
// The data and indexes written before the restart must still be readable.
GetFieldData(db, allowNotFound);
findKeysByCityIndex(db, true);
findKeysByAgeIndex(db, true);
}
TEST(TestParalRecover, Recover) {
// First run:
// fielddb::DestroyDB("testdb3.2",Options());
// FieldDB *db = new FieldDB();
// if(OpenDB("testdb3.2", &db).ok() == false) {
// std::cerr << "open db failed" << std::endl;
// abort();
// }
// db->CreateIndexOnField("address", op);
// db->CreateIndexOnField("age", op);
// int thread_num_ = 4;
// std::vector<std::thread> threads(thread_num_);
// threads[0] = std::thread([db](){
// InsertFieldData(db);
// });
// threads[1] = std::thread([db](){
// WriteFieldData(db);
// });
// threads[2] = std::thread([db](){
// DeleteFieldData(db);
// });
// threads[3] = std::thread([db](){
// InsertOneField(db);
// delete db;
// });
// for (auto& t : threads) {
// if (t.joinable()) {
// t.join();
// }
// }
// Thread 3 deletes the db and makes the other threads fail, so the test aborts (simulating a database crash).
// As a result, the threads die at arbitrary points.
// For the second run, comment out the code above and run the code below to test recovery.
// Second run:
FieldDB *db = new FieldDB();
if(OpenDB("testdb3.2", &db).ok() == false) {
std::cerr << "open db failed" << std::endl;
abort();
}
GetOneField(db);
checkDataInKVAndIndex(db);
// Two numbers are printed here; if they are > 1, then besides the single record from thread 3, data written by the other threads before the crash was also recovered correctly.
}
int main(int argc, char** argv) {
// All tests currently run with the same read-only file limits.
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

+ 112
- 0
testdb/testdb.cc View File

@ -0,0 +1,112 @@
#include "testdb/testdb.h"
#include "db/db_impl.h"
#include <memory>
#include "leveldb/status.h"
#include "testdb.h"
using namespace testdb;
Status testDB::OpentestDB(Options& options,
const std::string& name, testDB** dbptr) {
// options.env->CreateDir("./abc")
if(*dbptr == nullptr){
return Status::NotSupported(name, "new a testDb first\n");
}
//
Status status;
DB *indexdb, *kvdb, *metadb;
// options.block_cache = NewLRUCache(ULONG_MAX);
// options.max_open_files = 1000;
// options.write_buffer_size = 512 * 1024 * 1024;
// options.env = getPosixEnv();
// status = Open(options, name+"_indexDB", &indexdb);
// if(!status.ok()) return status;
// (*dbptr)->indexDB_ = indexdb;
// options.env = getPosixEnv();
status = DB::Open(options, name+"_kvDB", &kvdb);
if(!status.ok()) return status;
(*dbptr)->kvDB_ = kvdb;
// options.env = getPosixEnv();
// status = Open(options, name+"_metaDB", &metadb);
// if(!status.ok()) return status;
// (*dbptr)->metaDB_ = metadb;
(*dbptr)->dbname_ = name;
// status = (*dbptr)->Recover();
(*dbptr)->options_ = &options;
(*dbptr)->env_ = options.env;
return status;
}
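// testDB forwards every operation to a single plain leveldb instance (kvDB_); the index/meta
// databases are stubbed out. It appears to serve as a baseline for comparing db_bench results
// against FieldDB.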
Status testDB::Put(const WriteOptions &options, const Slice &key, const Slice &value) {
return kvDB_->Put(options, key, value);
}
Status testDB::PutFields(const WriteOptions &, const Slice &key, const FieldArray &tests) {
return Status::OK();
}
Status testDB::Delete(const WriteOptions &options, const Slice &key) {
return kvDB_->Delete(options, key);
}
Status testDB::Write(const WriteOptions &options, WriteBatch *updates) {
return kvDB_->Write(options, updates);
}
Status testDB::Get(const ReadOptions &options, const Slice &key, std::string *value) {
return kvDB_->Get(options, key, value);
}
Status testDB::GetFields(const ReadOptions &options, const Slice &key, FieldArray *tests) {
return Status::OK();
}
std::vector<std::string> testDB::FindKeysByField(Field &test) {
return std::vector<std::string>();
}
Iterator * testDB::NewIterator(const ReadOptions &options) {
return kvDB_->NewIterator(options);
}
const Snapshot * testDB::GetSnapshot() {
return kvDB_->GetSnapshot();
}
void testDB::ReleaseSnapshot(const Snapshot *snapshot) {
kvDB_->ReleaseSnapshot(snapshot);
}
bool testDB::GetProperty(const Slice &property, std::string *value) {
return kvDB_->GetProperty(property, value);
}
void testDB::GetApproximateSizes(const Range *range, int n, uint64_t *sizes) {
kvDB_->GetApproximateSizes(range, n, sizes);
}
void testDB::CompactRange(const Slice *begin, const Slice *end) {
kvDB_->CompactRange(begin, end);
}
Status testdb::DestroyDB(const std::string& name, const Options& options) {
Status s;
s = leveldb::DestroyDB(name+"_kvDB", options);
assert(s.ok());
// s = leveldb::DestroyDB(name+"_indexDB", options);
// assert(s.ok());
// s = leveldb::DestroyDB(name+"_metaDB", options);
// assert(s.ok());
return s;
}
testDB::~testDB() {
delete kvDB_;
// delete indexDB_;
// delete metaDB_;
}

+ 72
- 0
testdb/testdb.h View File

@ -0,0 +1,72 @@
#include "port/port_stdcxx.h"
#include "db/db_impl.h"
#include <cstdint>
#include <cstdio>
#include <deque>
#include <map>
#include <set>
#include <string>
#include "leveldb/db.h"
#include "leveldb/env.h"
#include "leveldb/options.h"
#include "leveldb/slice.h"
#include "leveldb/status.h"
#include <shared_mutex>
# ifndef test_DB_H
# define test_DB_H
namespace testdb {
using namespace leveldb;
enum IndexStatus{
Creating,
Deleting,
Exist,
NotExist
};
class testDB {
private:
leveldb::DB *kvDB_;
// leveldb::DB *metaDB_;
// leveldb::DB *indexDB_;
std::string dbname_;
const Options *options_;
Env *env_;
public:
friend class Request;
friend class testsReq;
friend class iCreateReq;
friend class iDeleteReq;
friend class DeleteReq;
friend class BatchReq;
// Usage: allocate with `testDB *db = new testDB();` first, then pass &db to OpentestDB.
// testDB() : indexDB_(nullptr), kvDB_(nullptr), metaDB_(nullptr) {};
testDB() : kvDB_(nullptr) { }
~testDB();
/* Lab 1 requirement: the virtual functions to implement as a DB-derived class. */
Status Put(const WriteOptions &options, const Slice &key, const Slice &value) ;
Status PutFields(const WriteOptions &, const Slice &key, const FieldArray &tests) ;
Status Delete(const WriteOptions &options, const Slice &key) ;
Status Write(const WriteOptions &options, WriteBatch *updates) ;
Status Get(const ReadOptions &options, const Slice &key, std::string *value) ;
Status GetFields(const ReadOptions &options, const Slice &key, FieldArray *tests) ;
std::vector<std::string> FindKeysByField(Field &test) ;
Iterator * NewIterator(const ReadOptions &options) ;
const Snapshot * GetSnapshot() ;
void ReleaseSnapshot(const Snapshot *snapshot) ;
bool GetProperty(const Slice &property, std::string *value) ;
void GetApproximateSizes(const Range *range, int n, uint64_t *sizes) ;
void CompactRange(const Slice *begin, const Slice *end) ;
static Status OpentestDB(Options& options,const std::string& name,testDB** dbptr);
};
Status DestroyDB(const std::string& name,
const Options& options);
} // end of namespace
# endif

+ 4
- 0
util/env_posix.cc View File

@ -923,4 +923,8 @@ Env* Env::Default() {
return env_container.env();
}
Env* getPosixEnv() {
return new PosixEnv;
}
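// Unlike Env::Default(), which returns a process-wide singleton, getPosixEnv() hands out a fresh
// PosixEnv; the commented-out calls in testdb.cc suggest it is meant to let each sub-database own
// its own Env (and thus its own background thread).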
} // namespace leveldb

+ 25
- 7
util/serialize_value.cc View File

@ -3,6 +3,7 @@
#include <string>
#include "util/coding.h"
#include <iostream>
#include "leveldb/slice.h"
namespace leveldb{
bool compareByFirst(const Field& a, const Field& b) {
@ -20,9 +21,25 @@ std::string SerializeValue(const FieldArray& fields){
return result;
}
FieldArray *ParseValue(const std::string& value_str){
std::string SerializeValue(const FieldSliceArray& fields) {
using pss = std::pair<Slice,Slice>;
FieldSliceArray sortFields = fields;
std::sort(sortFields.begin(), sortFields.end(),
[](const pss &lhs, const pss &rhs) {
return lhs.first.compare(rhs.first) < 0;
});
std::string result;
for (const pss& pairs : sortFields) {
PutLengthPrefixedSlice(&result, pairs.first);
PutLengthPrefixedSlice(&result, pairs.second);
}
return result;
}
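// Encoding sketch: fields are sorted by name, then each (name, value) pair is appended as two
// length-prefixed slices (PutLengthPrefixedSlice from util/coding.h), presumably matching the
// FieldArray overload above so both produce identical bytes for the same logical fields.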
FieldArray *ParseValue(const std::string& value_str,FieldArray *fields){
Slice valueSlice(value_str);
FieldArray *res = new FieldArray;
// FieldArray *res = new FieldArray;
FieldArray *res = fields;
Slice nameSlice = Slice();
Slice valSlice = Slice();
std::string nameStr;
@ -34,7 +51,8 @@ FieldArray *ParseValue(const std::string& value_str){
valStr = valSlice.ToString();
res->emplace_back(nameStr, valStr);
} else {
std::cout << "name and val not match!" << std::endl;
std::cout << "name and val not match! From ParseValue" << std::endl;
assert(0);
}
nameSlice.clear();
valSlice.clear();
@ -73,22 +91,22 @@ bool InternalFieldArray::HasField(const Field& field) {
return std::find(fields.begin(),fields.end(),field) != fields.end();
}
std::string InternalFieldArray::ValOfName(const std::string &name) {
Slice InternalFieldArray::ValOfName(const std::string &name) {
if(isMapped) {
if(map.count(name)) {
return map[name];
}
return std::string();
return Slice();
}
for (auto iter = fields.begin(); iter != fields.end(); iter++){
if (iter->first == name) {
return iter->second;
} else if (iter->first > name) {
return std::string();
return Slice();
}
}
return std::string();
return Slice();
}
}

+ 8
- 5
util/serialize_value.h View File

@ -3,6 +3,7 @@
#include <iostream>
#include <string>
#include <utility>
#include <vector>
#include <map>
#include "leveldb/slice.h"
@ -10,13 +11,15 @@
namespace leveldb{
using Field = std::pair<std::string, std::string>; // field_name:field_value
using FieldArray = std::vector<std::pair<std::string, std::string>>;
using FieldSliceArray = std::vector<std::pair<Slice, Slice>>;
std::string SerializeValue(const FieldArray& fields);
FieldArray *ParseValue(const std::string& value_str);
std::string SerializeValue(const FieldSliceArray& fields);
FieldArray *ParseValue(const std::string& value_str, FieldArray *fields);
class InternalFieldArray {
public:
using FieldMap = std::map<std::string,std::string>;
using FieldMap = std::map<std::string,Slice>;
InternalFieldArray(const FieldArray &fields, bool to_map = false):
fields(fields),isMapped(false) {
@ -29,9 +32,9 @@ public:
Slice nameSlice, valSlice;
while(GetLengthPrefixedSlice(&valueSlice, &nameSlice)) {
if(GetLengthPrefixedSlice(&valueSlice, &valSlice)) {
map[nameSlice.ToString()] = valSlice.ToString();
map[nameSlice.ToString()] = valSlice;
} else {
std::cout << "name and val not match!" << std::endl;
std::cout << "name and val not match! From InternalFieldArray" << std::endl;
}
nameSlice.clear();
valSlice.clear();
@ -48,7 +51,7 @@ public:
std::string Serialize();
bool HasField(const Field& field);
std::string ValOfName(const std::string& name);
Slice ValOfName(const std::string& name);
private:
bool isMapped;
