浏览代码

较为完整的kv分离实现

kvsep_cyq
cyq 8 个月前
父节点
当前提交
c58cbc12aa
共有 8 个文件被更改,包括 272 次插入12 次删除
  1. +215
    -2
      db/db_impl.cc
  2. +11
    -1
      db/db_impl.h
  3. +14
    -2
      db/version_set.cc
  4. +3
    -0
      db/version_set.h
  5. +2
    -0
      include/leveldb/options.h
  6. +3
    -1
      kv_sep/kvlog.cc
  7. +6
    -1
      kv_sep/kvlog.h
  8. +18
    -5
      test/test.cc

+ 215
- 2
db/db_impl.cc 查看文件

@ -4,6 +4,7 @@
#include "db/db_impl.h"
#include "db/version_edit.h"
#include <algorithm>
#include <atomic>
#include <cassert>
@ -27,12 +28,16 @@
#include "leveldb/db.h"
#include "leveldb/env.h"
#include "leveldb/filter_policy.h"
#include "leveldb/iterator.h"
#include "leveldb/options.h"
#include "leveldb/slice.h"
#include "leveldb/status.h"
#include "leveldb/table.h"
#include "leveldb/table_builder.h"
#include "leveldb/write_batch.h"
#include "port/port.h"
#include "port/port_stdcxx.h"
#include "port/thread_annotations.h"
#include "table/block.h"
#include "table/merger.h"
#include "table/two_level_iterator.h"
@ -271,6 +276,9 @@ void DBImpl::RemoveObsoleteFiles() {
// be recorded in pending_outputs_, which is inserted into "live"
keep = (live.find(number) != live.end());
break;
case kKVLogFile:
keep = (live.find(number) != live.end());
break;
case kCurrentFile:
case kDBLockFile:
case kInfoLogFile:
@ -542,7 +550,7 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
if (s.ok() && meta.file_size > 0) {
const Slice min_user_key = meta.smallest.user_key();
const Slice max_user_key = meta.largest.user_key();
if (base != nullptr) {
if (base != nullptr && !only_Level0) {
level = base->PickLevelForMemTableOutput(min_user_key, max_user_key);
}
edit->AddFile(level, meta.number, meta.file_size, meta.smallest,
@ -550,6 +558,7 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
}
edit->AddKVLogs(imm_kvlogfile_number);
pending_outputs_.erase(imm_kvlogfile_number);
imm_kvlogfile_number = 0;
delete imm_kvlogfile_;
@ -718,6 +727,154 @@ void DBImpl::BackgroundCall() {
background_work_finished_signal_.SignalAll();
}
bool DBImpl::CollectKVLogs() {
mutex_.AssertHeld();
Version *current = versions_->current();
current->Ref();
int total_files = 0;
for(int i = 0; i < config::kNumLevels; i++) {
total_files += current->NumFiles(i);
}
//这个判断保证了所有被选择回收的kvlog都是不对应于mem、imm、L0文件的
if(current->kvlogs_.size() < total_files + 10) {
std::cout << "kvlogs not enough : " << current->kvlogs_.size() << " " << total_files << std::endl;
current->Unref();
return false;
}
Log(options_.info_log,"CollectKVLogs Start\n");
std::vector<FileMetaData*> &kvlogs = current->kvlogs_;
FileMetaData metaSST, metaKVLog;
metaSST.number = versions_->NewFileNumber();
metaKVLog.number = versions_->NewFileNumber();
pending_outputs_.insert(metaSST.number);
pending_outputs_.insert(metaKVLog.number);
// WritableFile *fileSST;
// env_->NewWritableFile(TableFileName(dbname_, metaSST.number),&fileSST);
// TableBuilder *builder = new TableBuilder(options_,fileSST);
WritableFile *fileKVLog;
env_->NewWritableFile(KVLogFileName(dbname_, metaKVLog.number), &fileKVLog);
KVLog kvlog_writer(fileKVLog,metaKVLog.number);
FilePointer fp;
uint32_t seed;
SequenceNumber latest_sequence;
ReadOptions ro;
ro.decode = false;
Iterator *deep_iter = NewInternalDeepIterator(ro,&latest_sequence,&seed);
// Iterator *iter = NewDBIterator(this, user_comparator(), deep_iter, latest_sequence, seed);
uint64_t budget = 10000;
SequenceNumber limit = latest_sequence + budget;
SequenceNumber now_seq = latest_sequence;
// std::cout << limit << " " << latest_sequence << std::endl;
versions_->SetLastSequence(limit);
only_Level0 = true; //保证后续的小合并的结果只会在level0
mutex_.Unlock();
std::string saved_key_;
// ParsedInternalKey key_in_kvlog;
ParsedInternalKey key_in_sst;
WriteBatch write_batch,kp_batch;
MemTable *tempMem = new MemTable(internal_comparator_);
tempMem->Ref();
std::set<uint64_t> kvlogs_to_remove;
bool has_collected = false;
for(int i = 0; i < kvlogs.size(); i++) {
FileMetaData *f = kvlogs[i];
SequentialFile *file;
env_->NewSequentialFile(KVLogFileName(dbname_, f->number), &file);
KVLogReader reader(file);
for(int cnt = 0; reader.Valid() && now_seq <= limit; reader.Next(), cnt ++) {
// std::cout << "find KV: " << reader.Key().ToString() << " " << reader.Value().ToString() << " " << reader.Seq() << "\n";
if(reader.Type() == kTypeDeletion) {
// std::cout << "is delete record\n";
continue;
}
saved_key_.clear();
AppendInternalKey(&saved_key_,
ParsedInternalKey(reader.Key(),latest_sequence,kValueTypeForSeek));
deep_iter->Seek(saved_key_);
// ParseInternalKey(reader.Key(), &key_in_kvlog);
ParseInternalKey(deep_iter->key(), &key_in_sst);
//当前的key已经由于合并没有了,kvlog中的记录失效
if(internal_comparator_.user_comparator()->Compare(reader.Key(), key_in_sst.user_key) != 0) {
// std::cout << "has been compacted\n";
continue;
}
//如果搜索到的key是delete,或者key的sequence比kvlog里面的大,那么kvlog中的就是失效状态
//这里不可能搜到比kvlog里面还要小的情况
if(key_in_sst.type == kTypeDeletion) {
// std::cout << "key delete \n";
continue;
}
if(key_in_sst.sequence > reader.Seq()) {
// printf("sst seq larger: sst:%ld kvlog:%ld\n",key_in_sst.sequence,reader.Seq());
continue;
}
has_collected = true;
// std::cout << "collected KV: " << reader.Key().ToString() << " " << reader.Value().ToString() << " " << reader.Seq() << "\n";
if((limit - now_seq) % 100 == 0) {
std::cout << "collected KV: " << reader.Value().ToString() << " " << reader.Seq() << " " << cnt << "\n";
std::cout << "rest budget : " << limit - now_seq << "\n";
}
write_batch.Clear();
kp_batch.Clear();
write_batch.Put(reader.Key(), reader.Value());
now_seq += 1;
WriteBatchInternal::SetSequence(&write_batch, now_seq);
kvlog_writer.AddRecord(WriteBatchInternal::Contents(&write_batch), fp);
WriteBatchInternal::ConstructKPBatch(&kp_batch, &write_batch, fp);
WriteBatchInternal::InsertInto(&kp_batch, tempMem);
}
if(reader.Valid()) break;
kvlogs_to_remove.insert(f->number);
delete file;
// break;//当前一次只回收一个kvlog
}
VersionEdit edit;
Iterator *miter = nullptr;
if(has_collected) {
miter = tempMem->NewIterator();
Status s = BuildTable(dbname_, env_, options_,
table_cache_, miter, &metaSST);
}
mutex_.Lock();
if(has_collected) {
edit.AddFile(0, metaSST.number, metaSST.file_size,
metaSST.smallest, metaSST.largest);
edit.AddKVLogs(metaKVLog.number);
}
for(auto num : kvlogs_to_remove) {
edit.RemoveKVLogs(num);
}
Log(options_.info_log,"Add SST at Level0 : %ld\n",metaSST.number);
for(auto num : kvlogs_to_remove) {
Log(options_.info_log,"Collect kvlog : %ld\n",num);
printf("Collect kvlog : %ld\n",num);
}
Log(options_.info_log,"Merge to kvlog : %ld\n",metaKVLog.number);
versions_->LogAndApply(&edit, &mutex_);
delete miter;
tempMem->Unref();
delete deep_iter;
delete fileKVLog;
pending_outputs_.erase(metaSST.number);
pending_outputs_.erase(metaKVLog.number);
only_Level0 = false;
current->Unref();
return true;
}
void DBImpl::BackgroundCompaction() {
mutex_.AssertHeld();
@ -726,6 +883,10 @@ void DBImpl::BackgroundCompaction() {
return;
}
if(CollectKVLogs()) {
return;
}
Compaction* c;
bool is_manual = (manual_compaction_ != nullptr);
InternalKey manual_end;
@ -1091,6 +1252,20 @@ static void CleanupIteratorState(void* arg1, void* arg2) {
delete state;
}
// State handed to CleanupDeepIteratorState when a deep internal iterator is
// destroyed: the mutex that guards the version, and the version itself.
struct DeepIterState {
  DeepIterState(port::Mutex* mutex_in, Version* version_in)
      : mu(mutex_in), version(version_in) {}

  port::Mutex* const mu;
  Version* const version GUARDED_BY(mu);
};
// Iterator cleanup callback: drops the version reference stored in the
// DeepIterState passed as arg1 and frees the state object. The second
// argument is unused. The state's mutex must already be held by the caller;
// this function only asserts it and does not lock.
static void CleanupDeepIteratorState(void* arg1, void* args) {
  auto* const iter_state = static_cast<DeepIterState*>(arg1);
  iter_state->mu->AssertHeld();
  iter_state->version->Unref();
  delete iter_state;
}
} // anonymous namespace
Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
@ -1120,6 +1295,33 @@ Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
return internal_iter;
}
// Returns a merging iterator over whatever child iterators
// Version::AddDeepIterators() yields for the current version. The memtables
// are not added here.
//
// Stores the current last sequence number in *latest_snapshot and a fresh
// sampling seed in *seed. The returned iterator holds a reference on the
// current version, released by CleanupDeepIteratorState on destruction.
//
// REQUIRES: mutex_ is held both when this function is called and when the
// returned iterator is destroyed.
Iterator* DBImpl::NewInternalDeepIterator(const ReadOptions& options,
                                          SequenceNumber* latest_snapshot,
                                          uint32_t* seed) {
  mutex_.AssertHeld();
  *latest_snapshot = versions_->LastSequence();

  // Fetch the current version once so the iterators, the Ref() and the
  // cleanup state all refer to the same object.
  Version* const current = versions_->current();

  // Collect together all needed child iterators.
  std::vector<Iterator*> list;
  current->AddDeepIterators(options, &list);
  // The list can be empty; data() is well-defined on an empty vector, whereas
  // &list[0] is not.
  // NOTE(review): relies on NewMergingIterator accepting n == 0 — confirm.
  Iterator* internal_iter = NewMergingIterator(
      &internal_comparator_, list.data(), static_cast<int>(list.size()));
  current->Ref();

  DeepIterState* cleanup = new DeepIterState(&mutex_, current);
  internal_iter->RegisterCleanup(CleanupDeepIteratorState, cleanup, nullptr);

  *seed = ++seed_;
  return internal_iter;
}
Iterator* DBImpl::TEST_NewInternalIterator() {
SequenceNumber ignored;
uint32_t ignored_seed;
@ -1292,6 +1494,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
WriteBatch* write_batch = BuildBatchGroup(&last_writer);
WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);
last_sequence += WriteBatchInternal::Count(write_batch);
versions_->SetLastSequence(last_sequence);
// Add to log and apply to memtable. We can release the lock
// during this phase since &w is currently responsible for logging
@ -1333,7 +1536,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
}
if (write_batch == tmp_batch_) tmp_batch_->Clear();
versions_->SetLastSequence(last_sequence);
// versions_->SetLastSequence(last_sequence);
}
while (true) {
@ -1355,6 +1558,15 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
// NoWaiting_elapsed += env_->NowMicros() - start_;
// Nowaited_count ++;
// dumpStatistics();
// SequentialFile *file;
// env_->NewSequentialFile(KVLogFileName(dbname_, kvlogfile_number_), &file);
// KVLogReader reader(file);
// std::cout << "==============tail check begin==============\n";
// for( ;reader.Valid(); reader.Next()) {
// printf("key:%s, value:%s, seq:%ld\n",reader.Key().ToString().c_str(),reader.Value().ToString().c_str(),reader.Seq());
// }
// std::cout << "==============tail check end==============\n";
// delete file;
return status;
}
@ -1675,6 +1887,7 @@ Status DestroyDB(const std::string& dbname, const Options& options) {
if (ParseFileName(filenames[i], &number, &type) &&
type != kDBLockFile) { // Lock file will be deleted at end
Status del = env->RemoveFile(dbname + "/" + filenames[i]);
// std::cout << "remove file : " << filenames[i] << std::endl;
if (result.ok() && !del.ok()) {
result = del;
}

+ 11
- 1
db/db_impl.h 查看文件

@ -18,6 +18,7 @@
#include "db/snapshot.h"
#include "leveldb/db.h"
#include "leveldb/env.h"
#include "leveldb/options.h"
#include "leveldb/slice.h"
#include "leveldb/write_batch.h"
#include "port/port.h"
@ -85,7 +86,7 @@ class DBImpl : public DB {
// bytes.
void RecordReadSample(Slice key);
Slice GetValueFromFP(const FilePointer &fp,std::string *value);
private:
friend class DB;
@ -121,6 +122,10 @@ class DBImpl : public DB {
SequenceNumber* latest_snapshot,
uint32_t* seed);
Iterator* NewInternalDeepIterator(const ReadOptions&,
SequenceNumber* latest_snapshot,
uint32_t* seed);
Status NewDB();
// Recover the descriptor from persistent storage. May do a significant
@ -166,6 +171,10 @@ class DBImpl : public DB {
Status FinishCompactionOutputFile(CompactionState* compact, Iterator* input);
Status InstallCompactionResults(CompactionState* compact)
EXCLUSIVE_LOCKS_REQUIRED(mutex_);
/*about kvlogs*/
Slice GetValueFromFP(const FilePointer &fp,std::string *value);
int triggers = 0;
bool CollectKVLogs();
const Comparator* user_comparator() const {
return internal_comparator_.user_comparator();
@ -202,6 +211,7 @@ class DBImpl : public DB {
KVLog *kvlog_;
WritableFile* imm_kvlogfile_;
uint64_t imm_kvlogfile_number;
bool only_Level0 GUARDED_BY(mutex_) = false;
/*kvlog end*/
uint32_t seed_ GUARDED_BY(mutex_); // For sampling.

+ 14
- 2
db/version_set.cc 查看文件

@ -246,6 +246,18 @@ void Version::AddIterators(const ReadOptions& options,
}
}
// Appends to *iters one concatenating iterator for every non-empty level in
// [1, config::kNumLevels). Level 0 is not visited. The concatenating
// iterator walks the non-overlapping files of a level sequentially and opens
// them lazily.
void Version::AddDeepIterators(const ReadOptions& options,
                               std::vector<Iterator*>* iters) {
  for (int lvl = 1; lvl < config::kNumLevels; ++lvl) {
    if (files_[lvl].empty()) {
      continue;
    }
    iters->push_back(NewConcatenatingIterator(options, lvl));
  }
}
// Callback from TableCache::Get()
namespace {
enum SaverState {
@ -737,12 +749,12 @@ class VersionSet::Builder {
//合并一个版本的kvlog以及当前edit的新增,但是不包含被删除的kvlog
for(const auto &kvlog : base_->kvlogs_) {
if(!deleted_kvlogs.count(kvlog->number)) continue;
if(deleted_kvlogs.count(kvlog->number)) continue;
v->kvlogs_.push_back(kvlog);
kvlog->refs++;
}
for(const auto &kvlog : added_kvlogs) {
if(!deleted_kvlogs.count(kvlog->number)) continue;
if(deleted_kvlogs.count(kvlog->number)) continue;
v->kvlogs_.push_back(kvlog);
kvlog->refs++;
}

+ 3
- 0
db/version_set.h 查看文件

@ -15,6 +15,7 @@
#ifndef STORAGE_LEVELDB_DB_VERSION_SET_H_
#define STORAGE_LEVELDB_DB_VERSION_SET_H_
#include "db/db_impl.h"
#include <cstdint>
#include <map>
#include <set>
@ -69,6 +70,7 @@ class Version {
// yield the contents of this Version when merged together.
// REQUIRES: This version has been saved (see VersionSet::SaveTo)
void AddIterators(const ReadOptions&, std::vector<Iterator*>* iters);
void AddDeepIterators(const ReadOptions&, std::vector<Iterator*>* iters);
// Lookup the value for key. If found, store it in *val and
// return OK. Else return a non-OK status. Fills *stats.
@ -118,6 +120,7 @@ class Version {
private:
friend class Compaction;
friend class VersionSet;
friend class DBImpl;
class LevelFileNumIterator;

+ 2
- 0
include/leveldb/options.h 查看文件

@ -162,6 +162,8 @@ struct LEVELDB_EXPORT ReadOptions {
// not have been released). If "snapshot" is null, use an implicit
// snapshot of the state at the beginning of this read operation.
const Snapshot* snapshot = nullptr;
bool decode = true; //FilePointer解析成为实际的value
};
// Options that control write operations

+ 3
- 1
kv_sep/kvlog.cc 查看文件

@ -49,6 +49,7 @@ Status KVLog::AddRecord(const Slice &slice, FilePointer &fp) {
// Advances the reader to the next key/value record. While the buffered input
// still has bytes, the next record is decoded from it directly. Otherwise
// NextWriteBatch() is called first, and a record is decoded only if the
// reader is still valid afterwards.
void KVLogReader::Next() {
  if (!input.empty()) {
    NextKV();
    return;
  }
  NextWriteBatch();
  if (Valid()) {
    NextKV();
  }
}
@ -72,11 +73,12 @@ void KVLogReader::NextWriteBatch() {
valid = false;
return;
}
seq = DecodeFixed64(input.data());
seq = DecodeFixed64(input.data()) - 1;//-1是为了和后面的nextkv的实现对齐
input.remove_prefix(12); //remove writebatch header
}
void KVLogReader::NextKV() {
seq++;
type = (ValueType)input[0];
input.remove_prefix(1);
switch (type) {

+ 6
- 1
kv_sep/kvlog.h 查看文件

@ -40,12 +40,17 @@ class KVLogReader {
public:
KVLogReader(SequentialFile *file):file(file),rep_(nullptr),rep_size(0),valid(true)
{ Next();};
~KVLogReader() {
delete rep_;
}
ValueType Type() {return type;}
Slice Key() {return key;}
Slice Value() {return value;}
SequenceNumber Seq() {return seq;}
bool Valid() {return valid;}
void Next();
void Valid();
private:
void NextWriteBatch();
void NextKV();

+ 18
- 5
test/test.cc 查看文件

@ -1,5 +1,6 @@
#include "leveldb/db.h"
#include "leveldb/options.h"
#include "util/random.h"
#include <bits/stdc++.h>
#include <string>
@ -10,8 +11,8 @@ int main() {
DB* db = nullptr;
Options op;
op.create_if_missing = true;
DestroyDB("testdb", Options());
Status status = DB::Open(op, "testdb", &db);
DestroyDB("testdb_kv", Options());
Status status = DB::Open(op, "testdb_kv", &db);
// assert(status.ok());
// string key = "leveldb",value = to_string(0);
@ -52,9 +53,21 @@ int main() {
// cout<<"without snapshot:"<<result<<endl;
// db->ReleaseSnapshot(snapshot);
string key = "abc",value = "leveldb";
for(int i = 0; i < 20; i++) {
db->Put(WriteOptions(),key + std::to_string(i),value + std::to_string(i));
string key,value = "leveldb";
key.resize(1000,'a');
// value.resize(10,'b');
value += "index";
Random ran(0);
for(int i = 0; i < 100000; i++) {
for(int j = 0; j < 20; j++) {
int rand = ran.Uniform(200 * 10);
string tk = key + std::to_string(rand);
string tv = value + std::to_string(rand) + "_" + std::to_string(i);
db->Put(WriteOptions(),tk,tv);
}
if(i && i%1000 ==0) {
std::cout << "iteration:" << i << std::endl;
}
}
std::string res;
for(int i = 0; i < 10; i++) {

正在加载...
取消
保存