Selaa lähdekoodia

kv separation v1.0

main
VirgilZhu 8 kuukautta sitten
vanhempi
commit
43b44e3167
9 muutettua tiedostoa jossa 592 lisäystä ja 11 poistoa
  1. +7
    -1
      CMakeLists.txt
  2. +313
    -9
      db/db_impl.cc
  3. +30
    -1
      db/db_impl.h
  4. +3
    -0
      db/dbformat.h
  5. +107
    -0
      db/kv_separate_management.cc
  6. +68
    -0
      db/kv_separate_management.h
  7. +6
    -0
      include/leveldb/env.h
  8. +6
    -0
      include/leveldb/write_batch.h
  9. +52
    -0
      util/env_posix.cc

+ 7
- 1
CMakeLists.txt Näytä tiedosto

@ -121,7 +121,9 @@ add_library(leveldb
db/vlog_reader.h
db/vlog_reader.cc
db/vlog_writer.h
db/vlog_writer.cc)
db/vlog_writer.cc
db/kv_separate_management.cc
db/kv_separate_management.h)
target_sources(leveldb
PRIVATE
"${PROJECT_BINARY_DIR}/${LEVELDB_PORT_CONFIG_DIR}/port_config.h"
@ -548,11 +550,15 @@ target_link_libraries(value_field_test PRIVATE leveldb gtest)
add_executable(kv_test
"${PROJECT_SOURCE_DIR}/test/kv_test.cc"
test/kv_test.cc
db/kv_separate_management.cc
db/kv_separate_management.h
)
target_link_libraries(kv_test PRIVATE leveldb gtest)
add_executable(bench_test
"${PROJECT_SOURCE_DIR}/test/bench_test.cc"
test/bench_test.cc
db/kv_separate_management.cc
db/kv_separate_management.h
)
target_link_libraries(bench_test PRIVATE leveldb gtest)

+ 313
- 9
db/db_impl.cc Näytä tiedosto

@ -11,6 +11,7 @@
#include <set>
#include <string>
#include <vector>
#include <iostream>
#include "fields.h"
#include "db/builder.h"
@ -141,6 +142,9 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
db_lock_(nullptr),
shutting_down_(false),
background_work_finished_signal_(&mutex_),
garbage_collection_work_signal_(&mutex_),
mem_(nullptr),
imm_(nullptr),
has_imm_(false),
@ -148,12 +152,15 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
logfile_number_(0),
seed_(0),
tmp_batch_(new WriteBatch),
background_compaction_scheduled_(false),
background_GarbageCollection_scheduled_(false),
finish_back_garbage_collection_(false),
manual_compaction_(nullptr),
vlog_(nullptr),
vlog_kv_numbers_(0),
garbage_collection_management_(new SeparateManagement(raw_options.garbage_collection_threshold) ),
versions_(new VersionSet(dbname_, &options_, table_cache_,
&internal_comparator_)) {}
@ -164,6 +171,9 @@ DBImpl::~DBImpl() {
while (background_compaction_scheduled_) {
background_work_finished_signal_.Wait();
}
while(background_GarbageCollection_scheduled_){
garbage_collection_work_signal_.Wait();
}
mutex_.Unlock();
if (db_lock_ != nullptr) {
@ -745,6 +755,9 @@ void DBImpl::RecordBackgroundError(const Status& s) {
if (bg_error_.ok()) {
bg_error_ = s;
background_work_finished_signal_.SignalAll();
// TODO begin
// garbage_collection_work_signal_.SignalAll();
// TODO end
}
}
@ -765,6 +778,204 @@ void DBImpl::MaybeScheduleCompaction() {
}
}
// TODO begin 调度垃圾回收的相关的函数 不需要锁,仅仅是追加的方式。
// 获得db库中所有的log文件,将其放入到vector中
Status DBImpl::GetAllValueLog(std::string dir,std::vector<uint64_t>& logs){
logs.clear();
std::vector<std::string> filenames;
// 获取文件列表
Status s = env_->GetChildren(dir, &filenames);
if (!s.ok()) {
return s;
}
uint64_t number;
FileType type;
for (size_t i = 0; i < filenames.size(); i++) {
if (ParseFileName(filenames[i], &number, &type)) {
//存储当前已有的日志文件
if (type == kLogFile)
logs.push_back(number);
}
}
return s;
}
// 手动进行离线回收、
// 1. 如果管理类 separate_management中有的话,那么就按照这个类中的map的信息进行回收,主要用于删除快照后的一个回收
// 2. 对db目录下的文件所有的文件进行回收,主要针对于open的时候。主线程中使用。
// 返回的 status 如果是不ok的说明回收的时候出现一个log文件是有问题的。
Status DBImpl::OutLineGarbageCollection(){
MutexLock l(&mutex_);
Status s;
// map 中保存了文件的信息,那么就采用map来指导回收,否则对db下所有的log文件进行回收
if (!garbage_collection_management_->EmptyMap()) {
garbage_collection_management_->CollectionMap();
uint64_t last_sequence = versions_->LastSequence();
garbage_collection_management_->ConvertQueue(last_sequence);
versions_->SetLastSequence(last_sequence);
MaybeScheduleGarbageCollection();
return Status();
}
return s;
}
// 读取回收一个log文件,不加锁
// next_sequence : 只有在open的时候才会返回需要修改的值,在线gc是不需要的。
// next_sequence 指的是第一个没有用到的sequence
Status DBImpl::CollectionValueLog(uint64_t fid, uint64_t& next_sequence) {
struct LogReporter : public log::VlogReader::Reporter {
Status* status;
void Corruption(size_t bytes, const Status& s) override {
if (this->status->ok()) *this->status = s;
}
};
LogReporter report;
std::string logName = LogFileName(dbname_, fid);
SequentialFile* lfile;
Status status = env_->NewSequentialFile(logName, &lfile);
if (!status.ok()) {
Log(options_.info_log, "Garbage Collection Open file error: %s", status.ToString().c_str());
return status;
}
log::VlogReader reader(lfile, &report);
Slice record;
std::string scratch;
// record_offset 每条record 相对文本开头的偏移。
uint64_t record_offset = 0;
uint64_t size_offset = 0;
WriteOptions opt(options_.background_garbage_collection_separate_);
WriteBatch batch(opt.separate_threshold);
batch.setGarbageColletion(true);
WriteBatchInternal::SetSequence(&batch, next_sequence);
while( reader.ReadRecord(&record,&scratch) ){
const char* head_record_ptr = record.data();
record.remove_prefix(log::vHeaderSize + log::wHeaderSize);
while( record.size() > 0 ){
const char* head_kv_ptr = record.data();
// kv对在文本中的偏移
uint64_t kv_offset = record_offset + head_kv_ptr - head_record_ptr;
ValueType type = static_cast<ValueType>(record[0]);
record.remove_prefix(1);
Slice key;
Slice value;
std::string get_value;
GetLengthPrefixedSlice(&record,&key);
if( type != kTypeDeletion ){
GetLengthPrefixedSlice(&record,&value);
}
// 需要抛弃的值主要有以下三种情况:0,1,2
// 0. log 中不是 kv 分离的都抛弃
if(type != kTypeSeparation){
continue;
}
status = this->GetLsm(key,&get_value);
// 1. 从LSM tree 中找不到值,说明这个值被删除了,log中要丢弃
// 2. 找到了值,但是最新值不是kv分离的情况,所以也可以抛弃
if (status.IsNotFound() || !status.IsSeparated() ) {
continue;
}
// 读取错误,整个文件都不继续进行回收了
if( !status.ok() ){
std::cout<<"read the file error "<<std::endl;
return status;
}
// 判断是否要丢弃旧值
Slice get_slice(get_value);
uint64_t lsm_fid;
uint64_t lsm_offset;
GetVarint64(&get_slice,&lsm_fid);
GetVarint64(&get_slice,&lsm_offset);
if( fid == lsm_fid && lsm_offset == kv_offset ){
batch.Put(key, value);
++next_sequence;
if( kv_offset - size_offset > config::gcWriteBatchSize ){
Write(opt, &batch);
batch.Clear();
batch.setGarbageColletion(true);
WriteBatchInternal::SetSequence(&batch, next_sequence);
uint64_t kv_size;
GetVarint64(&get_slice,&kv_size);
size_offset = kv_offset + kv_size;
}
}
}
record_offset += record.data() - head_record_ptr;
}
Write(opt, &batch);
status = env_->RemoveFile(logName);
if( status.ok() ){
garbage_collection_management_->RemoveFileFromMap(fid);
}
return status;
}
// 回收任务
void DBImpl::BackGroundGarbageCollection(){
uint64_t fid;
uint64_t last_sequence;
while( true){
Log(options_.info_log, "garbage collection file number: %lu", fid);
if( !garbage_collection_management_->GetGarbageCollectionQueue(fid,last_sequence) ){
return;
}
// 在线的gc回收的sequence是要提前就分配好的。
CollectionValueLog(fid,last_sequence);
}
}
// 可能调度后台线程进行压缩
void DBImpl::MaybeScheduleGarbageCollection() {
mutex_.AssertHeld();
if (background_GarbageCollection_scheduled_) {
// Already scheduled
// 先检查线程是否已经被调度了,如果已经被调度了,就直接退出。
} else if (shutting_down_.load(std::memory_order_acquire)) {
// DB is being deleted; no more background compactions
// 如果DB已经被关闭,那么就不调度了。
} else if (!bg_error_.ok()) {
// Already got an error; no more changes
// 如果后台线程出错,也不调度。
} else {
//设置调度变量,通过detach线程调度;detach线程即使主线程退出,依然可以正常执行完成
background_GarbageCollection_scheduled_ = true;
env_->ScheduleForGarbageCollection(&DBImpl::GarbageCollectionBGWork, this);
}
}
// 后台gc线程中执行的任务
void DBImpl::GarbageCollectionBGWork(void* db) {
reinterpret_cast<DBImpl*>(db)->GarbageCollectionBackgroundCall();
}
void DBImpl::GarbageCollectionBackgroundCall() {
assert(background_GarbageCollection_scheduled_);
if (shutting_down_.load(std::memory_order_acquire)) {
// No more background work when shutting down.
// // 如果DB已经被关闭,那么就不调度了。
} else if (!bg_error_.ok()) {
// No more background work after a background error.
// 如果后台线程出错,也不调度。
} else {
// 开始后台GC回收线程
BackGroundGarbageCollection();
}
background_GarbageCollection_scheduled_ = false;
//再调用 MaybeScheduleGarbageCollection 检查是否需要再次调度
// MaybeScheduleGarbageCollection();
garbage_collection_work_signal_.SignalAll();
}
// TODO end
void DBImpl::BGWork(void* db) {
reinterpret_cast<DBImpl*>(db)->BackgroundCall();
}
@ -843,9 +1054,9 @@ void DBImpl::BackgroundCompaction() {
// TODO begin conmpact 后需要考虑是否将 value log 文件进行 gc回收,如果需要将其加入到回收任务队列中。
// 不进行后台的gc回收,那么也不更新待分配sequence的log了。
if(!finish_back_garbage_collection_){
garbage_colletion_management_->UpdateQueue(versions_->ImmLogFileNumber() );
garbage_collection_management_->UpdateQueue(versions_->ImmLogFileNumber() );
}
// TODO end
CleanupCompaction(compact);
c->ReleaseInputs();
RemoveObsoleteFiles();
@ -1103,7 +1314,22 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
break;
}
}
}
} else {// TODO begin
//fid ,key valuesize ,
Slice drop_value = input->value();
// 获得type类型
if( ikey.type == kTypeSeparation ){
uint64_t fid = 0;
uint64_t offset = 0;
uint64_t size = 0;
GetVarint64(&drop_value,&fid);
GetVarint64(&drop_value,&offset);
GetVarint64(&drop_value,&size);
mutex_.Lock();
garbage_collection_management_->UpdateMap(fid,size);
mutex_.Unlock();
}
}// TODO end
input->Next();
}
@ -1226,6 +1452,48 @@ bool DBImpl::ParseVlogValue(Slice key_value, Slice key,
}
}
Status DBImpl::GetLsm(const Slice& key, std::string* value) {
MutexLock l(&mutex_);
ReadOptions options;
MemTable* mem = mem_;
MemTable* imm = imm_;
Version* current = versions_->current();
if( !this->snapshots_.empty() ){
options.snapshot = this->snapshots_.oldest();
}
SequenceNumber snapshot;
if (options.snapshot != nullptr) {
snapshot = static_cast<const SnapshotImpl*>(options.snapshot)->sequence_number();
} else {
snapshot = versions_->LastSequence();
}
Status s;
mem->Ref();
// imm 不一定存在,但是 mem 是一定存在的。
if (imm != nullptr) imm->Ref();
current->Ref(); // Version 读引用计数增一
Version::GetStats stats;
// Unlock while reading from files and memtables
{
mutex_.Unlock();
// First look in the memtable, then in the immutable memtable (if any).
LookupKey lkey(key, snapshot);
if (mem->Get(lkey, value, &s )) {
// Done
} else if (imm != nullptr && imm->Get(lkey, value, &s)) {
// Done
} else {
//在Version中查找是否包含指定key值
s = current->Get(options, lkey, value, &stats);
}
mutex_.Lock();
}
mem->Unref();
if (imm != nullptr) imm->Unref();
current->Unref(); //Version 读引用计数减一
return s;
}
Status DBImpl::Get(const ReadOptions& options, const Slice& key,
std::string* value) {
Status s;
@ -1341,12 +1609,20 @@ void DBImpl::RecordReadSample(Slice key) {
const Snapshot* DBImpl::GetSnapshot() {
MutexLock l(&mutex_);
// TODO begin 建立快照 对快照之后的信息不进行回收了。
finish_back_garbage_collection_ = true;
// TODO end
return snapshots_.New(versions_->LastSequence());
}
void DBImpl::ReleaseSnapshot(const Snapshot* snapshot) {
MutexLock l(&mutex_);
snapshots_.Delete(static_cast<const SnapshotImpl*>(snapshot));
// TODO begin 没有快照了重新进行后台回收
if( snapshots_.empty() ){
finish_back_garbage_collection_ = false;
}
// TODO end
}
/*** DBImpl 类关于 Fields 类的 Put、Get 接口 ***/
@ -1396,6 +1672,30 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
if (status.ok() && updates != nullptr) { // nullptr batch is for compactions
WriteBatch* write_batch = BuildBatchGroup(&last_writer);
// TODO begin gc中的batch全部都是设置好的。此时是不需要设置的。
if( !write_batch->IsGarbageColletion() ){
// 判断是否需要进行垃圾回收,如需要,腾出一块sequence的区域,触发垃圾回收将在makeroomforwrite当中。
// 先进行判断是否要进行gc后台回收,如果建立了快照的话finish_back_garbage_collection_就是true,
// 此时不进行sequence分配了。
//
if( !finish_back_garbage_collection_
&& garbage_collection_management_->ConvertQueue(last_sequence) ){
// 尝试调度gc回收线程进行回收。
MaybeScheduleGarbageCollection();
}
//SetSequence在write_batch中写入本次的sequence
WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);
// Count返回write_batch中的key-value个数
last_sequence += WriteBatchInternal::Count(write_batch);
}
vlog_kv_numbers_ += WriteBatchInternal::Count(write_batch);
// TODO 这里设置last_sequence 是为了照顾离线回收的时候,在map存在的时候需要调用 ConvertQueue 给回收任务分配sequence。
// TODO 针对多线程调用put的时候,为了避免给gc回收的时候分配的sequence重叠。
versions_->SetLastSequence(last_sequence);
// TODO end
WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);
last_sequence += WriteBatchInternal::Count(write_batch);
@ -1479,11 +1779,15 @@ WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) {
++iter; // Advance past "first"
for (; iter != writers_.end(); ++iter) {
Writer* w = *iter;
if (w->sync && !first->sync) {
// TODO begin 写队列中如果碰到是gc的write_batch 停止合并。
if (w->sync && !first->sync
|| first->batch->IsGarbageColletion()
|| w->batch->IsGarbageColletion()) {
// 当前的Writer要求 Sync ,而第一个Writer不要求Sync,两个的磁盘写入策略不一致。不做合并操作
// Do not include a sync write into a batch handled by a non-sync write.
break;
}
// TODO end
if (w->batch != nullptr) {
size += WriteBatchInternal::ByteSize(w->batch);
if (size > max_size) {
@ -1520,7 +1824,7 @@ Status DBImpl::MakeRoomForWrite(bool force) {
if (!s.ok()) {
versions_->ReuseFileNumber(new_log_number);
}
// gc_management_->WriteFileMap(logfile_number_, vlog_kv_numbers_, logfile_->GetSize());
garbage_collection_management_->WriteFileMap(logfile_number_, vlog_kv_numbers_, logfile_->GetSize());
vlog_kv_numbers_ = 0;
delete vlog_;
delete logfile_;
@ -1684,7 +1988,7 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
// TODO begin
std::vector<uint64_t> logs;
s = impl->GetAllValueLog(dbname,logs);
s = impl->GetAllValueLog(dbname, logs);
sort(logs.begin(),logs.end());
// TODO end
@ -1728,7 +2032,7 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
uint64_t next_sequence = impl->versions_->LastSequence() + 1;
std::cout<<" collection file : "<<fid<<std::endl;
impl->mutex_.Unlock();
Status stmp = impl->CollectionValueLog( fid,next_sequence );
Status stmp = impl->CollectionValueLog(fid, next_sequence);
impl->mutex_.Lock();
if( !stmp.ok() ) s = stmp;
impl->versions_->SetLastSequence(next_sequence - 1);

+ 30
- 1
db/db_impl.h Näytä tiedosto

@ -13,6 +13,8 @@
#include "db/dbformat.h"
#include "db/log_writer.h"
#include "db/vlog_writer.h"
#include "db/kv_separate_management.h"
#include "db/snapshot.h"
#include "leveldb/db.h"
#include "leveldb/env.h"
@ -78,6 +80,11 @@ class DBImpl : public DB {
// bytes.
void RecordReadSample(Slice key);
// TODO begin
Status OutLineGarbageCollection();
Status GetAllValueLog(std::string dir, std::vector<uint64_t>& logs);
// TODO end
private:
friend class DB;
struct CompactionState;
@ -145,6 +152,15 @@ class DBImpl : public DB {
void RecordBackgroundError(const Status& s);
void MaybeScheduleCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_);
// TODO begin
static void GarbageCollectionBGWork(void* db);
void GarbageCollectionBackgroundCall();
void MaybeScheduleGarbageCollection() EXCLUSIVE_LOCKS_REQUIRED(mutex_);
void BackGroundGarbageCollection();
Status CollectionValueLog(uint64_t fid, uint64_t& last_sequence);
// TODO end
static void BGWork(void* db);
void BackgroundCall();
void BackgroundCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_);
@ -162,6 +178,10 @@ class DBImpl : public DB {
return internal_comparator_.user_comparator();
}
// TODO begin
Status GetLsm( const Slice& key,std::string* value);
// TODO end
// Constant after construction
Env* const env_;
const InternalKeyComparator internal_comparator_;
@ -215,7 +235,16 @@ class DBImpl : public DB {
int vlog_kv_numbers_;
// KVSepManagement* gc_management_;
// TODO begin gc回收的过程
port::CondVar garbage_collection_work_signal_ GUARDED_BY(mutex_);
// gc线程是否已经被调度或者在运行
bool background_GarbageCollection_scheduled_ GUARDED_BY(mutex_);
bool finish_back_garbage_collection_;
SeparateManagement* garbage_collection_management_;
// TODO end
};
// Sanitize db options. The caller should delete result.info_log if

+ 3
- 0
db/dbformat.h Näytä tiedosto

@ -44,6 +44,9 @@ static const int kMaxMemCompactLevel = 2;
// Approximate gap in bytes between samples of data read during iteration.
static const int kReadBytesPeriod = 1048576;
// gc后台回收的时候进行 batch合并之后再写入的大小
static const uint64_t gcWriteBatchSize = 4*1024*1024;
} // namespace config
class InternalKey;

+ 107
- 0
db/kv_separate_management.cc Näytä tiedosto

@ -0,0 +1,107 @@
#include "kv_separate_management.h"
#include <queue>
#include <vector>
#include <db/dbformat.h>
namespace leveldb {
// 改变 db 的 last_sequence,给每一个需要进行gc回收的value log 文件分配 新的sequence的序号,
// 以便对value log 中的有效key的重新put进新的value log 中。返回值决定是否进行gc
bool SeparateManagement::ConvertQueue(uint64_t& db_sequence) {
if (!need_updates_.empty()) {
db_sequence++;
} else {
return false;
}
while (!need_updates_.empty()) {
ValueLogInfo* info = need_updates_.front();
need_updates_.pop_front();
map_file_info_.erase(info->logfile_number_);
info->last_sequence_ = db_sequence;
db_sequence += info->left_kv_numbers_;
garbage_collection_.push_back(info);
}
assert(db_sequence <= kMaxSequenceNumber);
return true;
}
// 每一个vlog 罗盘的时候都会在map_file_info_ 中添加索引 ,这个在新建一个value log的时候会用到。
void SeparateManagement::WriteFileMap(uint64_t fid, int kv_numbers, size_t log_memory) {
assert(map_file_info_.find(fid) == map_file_info_.end());
ValueLogInfo* info = new ValueLogInfo();
info->logfile_number_ = fid;
info->left_kv_numbers_ = kv_numbers;
assert(kv_numbers <= kMaxSequenceNumber);
info->invalid_memory_ = 0;
info->last_sequence_ = -1;
info->file_size_ = log_memory;
map_file_info_.insert(std::make_pair(fid,info));
}
// map_file_info_ 存放了所有的value log 的信息,每次删除一个key的时候要对这个key对应的value log计算空间无效利用率
// 所以要统计有多少空间是无效的,以便后面进行触发gc的过程。
void SeparateManagement::UpdateMap(uint64_t fid, uint64_t abandon_memory) {
if (map_file_info_.find(fid) != map_file_info_.end()) {
ValueLogInfo* info = map_file_info_[fid];
info->left_kv_numbers_--;
info->invalid_memory_ += abandon_memory;
}
}
// 遍历 map_file_info_ 中所有的file 找到无效空间最大的log 进行gc回收 ,这个文件要不存在 delete_files_中
void SeparateManagement::UpdateQueue(uint64_t fid) {
std::priority_queue<ValueLogInfo*, std::vector<ValueLogInfo*>, MapCmp> sort_priority_;
for (auto iter = map_file_info_.begin(); iter != map_file_info_.end(); ++iter) {
if (delete_files_.find( iter->first) == delete_files_.end()) {
sort_priority_.push(iter->second);
}
}
int num = 1;
int threshold = garbage_collection_threshold_;
if (!sort_priority_.empty()
&& sort_priority_.top()->invalid_memory_ >= garbage_collection_threshold_ * 1.2) {
num = 3;
threshold = garbage_collection_threshold_ * 1.2;
}
while (!sort_priority_.empty() && num > 0) {
ValueLogInfo* info = sort_priority_.top();
sort_priority_.pop();
if (info->logfile_number_ > fid) {
continue;
}
num--;
if (info->invalid_memory_ >= threshold) {
need_updates_.push_back(info);
delete_files_.insert(info->logfile_number_);
}
}
}
// gc回收线程用来获得需要回收的文件
bool SeparateManagement::GetGarbageCollectionQueue(uint64_t& fid, uint64_t& last_sequence){
if (garbage_collection_.empty()) {
return false;
} else {
ValueLogInfo* info = garbage_collection_.front();
garbage_collection_.pop_front();
fid = info->logfile_number_;
last_sequence = info->last_sequence_;
return true;
}
}
void SeparateManagement::CollectionMap(){
if (map_file_info_.empty()) return;
for (auto iter : map_file_info_) {
uint64_t fid = iter.first;
ValueLogInfo* info = iter.second;
if (delete_files_.find(fid) == delete_files_.end()) {
need_updates_.push_back(info);
delete_files_.insert(info->logfile_number_);
}
}
}
}

+ 68
- 0
db/kv_separate_management.h Näytä tiedosto

@ -0,0 +1,68 @@
#ifndef LEVELDB_KV_SEPARATE_MANAGEMENT_H
#define LEVELDB_KV_SEPARATE_MANAGEMENT_H
#include <unordered_map>
#include <unordered_set>
#include <deque>
#include "leveldb/slice.h"
#include <iterator>
namespace leveldb {
typedef struct ValueLogInfo {
uint64_t last_sequence_;
size_t file_size_; //
uint64_t logfile_number_; //
int left_kv_numbers_; // kv
uint64_t invalid_memory_; // value log
}ValueLogInfo;
struct MapCmp{
bool operator ()(const ValueLogInfo* a, const ValueLogInfo* b)
{
return a->invalid_memory_ < b->invalid_memory_; // value
}
};
class SeparateManagement {
public:
SeparateManagement(uint64_t garbage_collection_threshold)
: garbage_collection_threshold_(garbage_collection_threshold) {}
~SeparateManagement() {}
bool ConvertQueue(uint64_t& db_sequence); // db last_sequence
void UpdateMap(uint64_t fid,uint64_t abandon_memory);
void UpdateQueue(uint64_t fid);
bool GetGarbageCollectionQueue(uint64_t& fid,uint64_t& last_sequence);
void WriteFileMap(uint64_t fid, int kv_numbers, size_t log_memory);
bool MayNeedGarbageCollection() { return !garbage_collection_.empty(); }
void RemoveFileFromMap(uint64_t fid) { map_file_info_.erase(fid); }
bool EmptyMap() { return map_file_info_.empty(); }
void CollectionMap();
private:
uint64_t garbage_collection_threshold_;
// vlog文件的索引
std::unordered_map<uint64_t, ValueLogInfo*> map_file_info_;
// info gc回收的
std::deque<ValueLogInfo*> garbage_collection_;
// gc回收的sequencen的info
std::deque<ValueLogInfo*> need_updates_;
std::unordered_set<uint64_t> delete_files_;
};
} // namespace leveldb
#endif //LEVELDB_KV_SEPARATE_MANAGEMENT_H

+ 6
- 0
include/leveldb/env.h Näytä tiedosto

@ -197,6 +197,7 @@ class LEVELDB_EXPORT Env {
// serialized.
virtual void Schedule(void (*function)(void* arg), void* arg) = 0;
virtual void ScheduleForGarbageCollection(void (*function)(void* arg), void* arg) = 0;
// Start a new thread, invoking "function(arg)" within the new thread.
// When "function(arg)" returns, the thread will be destroyed.
virtual void StartThread(void (*function)(void* arg), void* arg) = 0;
@ -385,6 +386,11 @@ class LEVELDB_EXPORT EnvWrapper : public Env {
void Schedule(void (*f)(void*), void* a) override {
return target_->Schedule(f, a);
}
void ScheduleForGarbageCollection(void (*f)(void*), void* a) override{
return target_->ScheduleForGarbageCollection(f, a);
}
void StartThread(void (*f)(void*), void* a) override {
return target_->StartThread(f, a);
}

+ 6
- 0
include/leveldb/write_batch.h Näytä tiedosto

@ -76,10 +76,16 @@ class LEVELDB_EXPORT WriteBatch {
Status Iterate(Handler* handler) const;
Status Iterate(Handler* handler, uint64_t fid, uint64_t offset) const;
bool IsGarbageColletion() { return belong_to_gc; }
void setGarbageColletion(bool is_gc) { belong_to_gc = is_gc; }
private:
friend class WriteBatchInternal;
size_t separate_threshold_;
std::string rep_; // See comment in write_batch.cc for the format of rep_
bool belong_to_gc;
};
} // namespace leveldb

+ 52
- 0
util/env_posix.cc Näytä tiedosto

@ -698,6 +698,9 @@ class PosixEnv : public Env {
void Schedule(void (*background_work_function)(void* background_work_arg),
void* background_work_arg) override;
void ScheduleForGarbageCollection(void (*background_work_function)(void* background_work_arg),
void* background_work_arg) override;
void StartThread(void (*thread_main)(void* thread_main_arg),
void* thread_main_arg) override {
std::thread new_thread(thread_main, thread_main_arg);
@ -753,11 +756,16 @@ class PosixEnv : public Env {
private:
void BackgroundThreadMain();
void BackgroundThreadMainGarbageCollection();
static void BackgroundThreadEntryPoint(PosixEnv* env) {
env->BackgroundThreadMain();
}
static void BackgroundThreadEntryPointforGlobalCollection(PosixEnv* env) {
env->BackgroundThreadMainGarbageCollection();
}
// Stores the work item data in a Schedule() call.
//
// Instances are constructed on the thread calling Schedule() and used on the
@ -779,6 +787,14 @@ class PosixEnv : public Env {
std::queue<BackgroundWorkItem> background_work_queue_
GUARDED_BY(background_work_mutex_);
// TODO begin gc 回收相关的变量
port::Mutex background_GlobalCollection_work_mutex_;
port::CondVar background_GlobalCollection_work_cv_ GUARDED_BY(background_GlobalCollection_work_mutex_);
std::queue<BackgroundWorkItem> background_GlobalCollection_work_queue_
GUARDED_BY(background_GlobalCollection_work_mutex_);
// TODO end
PosixLockTable locks_; // Thread-safe.
Limiter mmap_limiter_; // Thread-safe.
Limiter fd_limiter_; // Thread-safe.
@ -814,6 +830,7 @@ int MaxOpenFiles() {
PosixEnv::PosixEnv()
: background_work_cv_(&background_work_mutex_),
background_GlobalCollection_work_cv_(&background_GlobalCollection_work_mutex_),
started_background_thread_(false),
mmap_limiter_(MaxMmaps()),
fd_limiter_(MaxOpenFiles()) {}
@ -858,6 +875,41 @@ void PosixEnv::BackgroundThreadMain() {
}
}
void PosixEnv::ScheduleForGarbageCollection(
void (*background_work_function)(void* background_work_arg),
void* background_work_arg) {
background_GlobalCollection_work_mutex_.Lock();
// If the queue is empty, the background thread may be waiting for work.
if (background_GlobalCollection_work_queue_.empty()) {
background_GlobalCollection_work_cv_.Signal();
}
// 因为是锁住了 所以可以先 signal 再 emplace。
background_GlobalCollection_work_queue_.emplace(background_work_function, background_work_arg);
background_GlobalCollection_work_mutex_.Unlock();
}
// gc 的后台回收任务
void PosixEnv::BackgroundThreadMainGarbageCollection() {
while (true) {
background_GlobalCollection_work_mutex_.Lock();
// Wait until there is work to be done.
while (background_GlobalCollection_work_queue_.empty()) {
background_GlobalCollection_work_cv_.Wait();
}
assert(!background_GlobalCollection_work_queue_.empty());
auto background_work_function = background_GlobalCollection_work_queue_.front().function;
void* background_work_arg = background_GlobalCollection_work_queue_.front().arg;
background_GlobalCollection_work_queue_.pop();
background_GlobalCollection_work_mutex_.Unlock();
background_work_function(background_work_arg);
}
}
namespace {
// Wraps an Env instance whose destructor is never created.

Ladataan…
Peruuta
Tallenna