2 Commits

7 changed files with 72 additions and 98 deletions
Split View
  1. +8
    -1
      db/db_impl.cc
  2. +8
    -3
      db/gc_executor.cpp
  3. +13
    -70
      db/vlog_gc.cpp
  4. +23
    -15
      db/vlog_gc.h
  5. +13
    -5
      db/vlog_set.cpp
  6. +6
    -4
      db/vlog_set.h
  7. +1
    -0
      test/db_test1.cc

+ 8
- 1
db/db_impl.cc View File

@ -1202,6 +1202,9 @@ Status DBImpl::Get_Fields(const ReadOptions& options, const Slice& key,
std::string vlog_value;
slot_page_->get_slot(slot_num, &sc);
vlog_set_->get_value(sc, &vlog_value);
if (vlog_value.empty()) {
return Status::NotFound("value has been deleted");
}
std::cout << "value from value_log: " << key.ToString() << vlog_value << std::endl;
DeserializeValue(fields, vlog_value);
@ -1224,7 +1227,11 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key,
vlog_set_->get_value(sc, &vlog_value);
*value = vlog_value;
return s;
if (vlog_value.empty()) {
return Status::NotFound("value has been deleted");
}
return Status::OK();
}
Iterator* DBImpl::NewIterator(const ReadOptions& options) {

+ 8
- 3
db/gc_executor.cpp View File

@ -2,6 +2,7 @@
// Created by 马也驰 on 2025/1/3.
//
#include "gc_executor.h"
#include "vlog_set.h"
#include "vlog_gc.h"
@ -28,8 +29,10 @@
auto old_vlog = std::fstream(old_vlog_name, std::ios::in | std::ios::out);
auto new_vlog = std::fstream(new_vlog_name, std::ios::in | std::ios::out);
char old_vlog_buff[VLOG_SIZE];
char new_vlog_buff[VLOG_SIZE];
// char old_vlog_buff[VLOG_SIZE];
// char new_vlog_buff[VLOG_SIZE];
char *old_vlog_buff = static_cast<char*>(malloc(VLOG_SIZE));
char *new_vlog_buff = static_cast<char*>(malloc(VLOG_SIZE));
old_vlog.seekp(0);
old_vlog.read(old_vlog_buff, VLOG_SIZE);
@ -59,11 +62,13 @@
new_vlog.write(new_vlog_buff, VLOG_SIZE);
new_vlog.flush();
free(old_vlog_buff);
free(new_vlog_buff);
old_vlog.close();
new_vlog.close();
old_vlog_info->vlog_valid_ = false;
vlog_set->remove_old_vlog(old_vlog_num);
old_vlog_info->vlog_info_latch_.unlock();
old_vlog_handler->vlog_latch_.soft_unlock();

+ 13
- 70
db/vlog_gc.cpp View File

@ -19,7 +19,10 @@ std::unordered_map executor_params_map_;
std::unordered_map<size_t, VlogGC *> vlog_gc_map_;
void test_func() {
void test_func(VlogGC *vlog_gc_, size_t old_vlog_num, size_t new_vlog_num) {
assert(vlog_gc_);
auto vs = vlog_gc_->get_vlog_set();
auto sp = vlog_gc_->get_slot_page();
int nums[4] = {1, 2, 3, 4};
for (int i = 0; i < 4; i++) {
nums[i] ++;
@ -64,6 +67,7 @@ void del_vlog_gc(size_t gc_num) {
map_latch_.unlock();
}
// Function: increment the counter
void VlogGC::gc_counter_increment() {
vlog_set->counter_latch_.lock();
@ -104,10 +108,11 @@ void VlogGC::do_gc(size_t old_vlog_num, size_t new_vlog_num) {
add_vlog_gc(_gc_num_, this);
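// FIXME: the thread's data must be stored outside this function's stack frame; otherwise, once the frame is destroyed, the thread fails with EXC_BAD_ACCESS. A gc_handler thread that keeps running and services each GC request is needed here.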
std::thread gc_thread([_gc_num_]() mutable {
std::thread gc_thread([_gc_num_, this]() mutable {
auto _vlog_gc_ = get_vlog_gc(_gc_num_);
assert(_vlog_gc_ != nullptr);
_vlog_gc_->exec_gc(_gc_num_);
gc_counter_decrement();
});
gc_thread.detach();
@ -127,8 +132,8 @@ void VlogGC::exec_gc(size_t gc_num_) {
// start gc process
auto ep = get_executor_params(gc_num_);
// gc_executor::exec_gc(ep.vg, ep.old_vlog_num, ep.new_vlog_num);
test_func();
gc_executor::exec_gc(ep.vg, ep.old_vlog_num, ep.new_vlog_num);
// test_func(ep.vg, ep.old_vlog_num, ep.new_vlog_num);
curr_thread_nums_latch_.lock();
if (curr_thread_nums_ >= max_thread_nums_) {
@ -137,7 +142,6 @@ void VlogGC::exec_gc(size_t gc_num_) {
curr_thread_nums_ --;
curr_thread_nums_latch_.unlock();
gc_counter_decrement();
std::cout << "vlog_gc.cpp line 138" << std::endl;
del_executor_params(gc_num_);
std::cout << "vlog_gc.cpp line 140" << std::endl;
@ -145,70 +149,9 @@ void VlogGC::exec_gc(size_t gc_num_) {
std::cout << "vlog_gc.cpp line 142" << std::endl;
del_vlog_in_gc(ep.old_vlog_num);
// FIXME: dead lock here (fixed, i think)
// FIXME: remove vlog physically
vlog_set->remove_old_vlog(ep.old_vlog_num);
}
// vlog: || curr_size(size_t) | value_nums(size_t) || value1 | value2 | ... |
// single value: || value_size(uint16_t) | slot_num(size_t) || {field_nums(uint16_t), attr1, attr2, ... } |
// single attr: | attr1_name_len(uint8_t) | attr1_name | attr1_len(uint16_t) | attr1 |
//void VlogGC::exec_gc(size_t old_vlog_num, size_t new_vlog_num) {
// vlog_set->mtx.lock();
// auto old_vlog_name = vlog_set->get_vlog_name(old_vlog_num);
// auto new_vlog_name = vlog_set->get_vlog_name(new_vlog_num);
// auto old_vlog_info = vlog_set->get_vlog_info(old_vlog_num);
// auto new_vlog_info = vlog_set->get_vlog_info(new_vlog_num);
// auto old_vlog_handler = vlog_set->get_vlog_handler(old_vlog_num);
// auto new_vlog_handler = vlog_set->get_vlog_handler(new_vlog_num);
//
// old_vlog_info->vlog_info_latch_.lock();
// old_vlog_handler->vlog_latch_.soft_lock();
// new_vlog_info->vlog_info_latch_.lock();
// new_vlog_handler->vlog_latch_.hard_lock();
// vlog_set->mtx.unlock();
//
// auto old_vlog = std::fstream(old_vlog_name, std::ios::in | std::ios::out);
// auto new_vlog = std::fstream(new_vlog_name, std::ios::in | std::ios::out);
//
// char old_vlog_buff[VLOG_SIZE];
// char new_vlog_buff[VLOG_SIZE];
// old_vlog.seekp(0);
// old_vlog.read(old_vlog_buff, VLOG_SIZE);
//
// size_t value_nums = old_vlog_info->value_nums;
// size_t ovb_off = 2 * sizeof(size_t);
// size_t nvb_off = 2 * sizeof(size_t);
// size_t new_vlog_value_nums = 0;
// for (auto i = 0; i < value_nums; i++) {
// char *value = &old_vlog_buff[ovb_off];
// uint16_t value_len = get_value_len(value);
// size_t slot_num = get_value_slotnum(value);
// if (!value_deleted(value_len)) {
// memcpy(&new_vlog_buff[nvb_off], &old_vlog_buff[ovb_off], value_len);
// memcpy(&new_vlog_buff[nvb_off+sizeof(uint16_t)], &(new_vlog_info->vlog_num), sizeof(size_t));
// struct slot_content scn(new_vlog_info->vlog_num, nvb_off);
// slot_page_->set_slot(slot_num, &scn);
// nvb_off += value_len;
// new_vlog_value_nums ++;
// }
// ovb_off += value_len;
// }
// new_vlog_info->value_nums = new_vlog_value_nums;
// new_vlog_info->curr_size = nvb_off;
// memcpy(new_vlog_buff, &nvb_off, sizeof(size_t));
// memcpy(&new_vlog_buff[sizeof(size_t)], &new_vlog_value_nums, sizeof(size_t));
// new_vlog.seekp(0);
// new_vlog.write(new_vlog_buff, VLOG_SIZE);
// new_vlog.flush();
//
// old_vlog.close();
// new_vlog.close();
//
// old_vlog_info->vlog_valid_ = false;
// vlog_set->remove_old_vlog(old_vlog_num);
//
// old_vlog_info->vlog_info_latch_.unlock();
// old_vlog_handler->vlog_latch_.soft_unlock();
// new_vlog_info->vlog_info_latch_.unlock();
// new_vlog_handler->vlog_latch_.hard_unlock();
//
//}
//

+ 23
- 15
db/vlog_gc.h View File

@ -35,28 +35,34 @@ friend class gc_executor;
break;
}
}
std::cout << "vlog_gc has been deleted!" << std::endl;
}
void do_gc(size_t old_vlog_num, size_t new_vlog_num);
// Accessor: returns the slot_page_ member pointer unchanged.
SlotPage *get_slot_page() {
    return this->slot_page_;
}
// Accessor: returns the vlog_set member pointer unchanged.
VlogSet *get_vlog_set() {
    return this->vlog_set;
}
private:
void exec_gc(size_t gc_num_);
// static void gc_process(VlogGC *vlog_gc_, size_t old_vlog_num, size_t new_vlog_num);
void gc_counter_increment();
void gc_counter_decrement();
// Returns true when bit 15 (the most significant bit) of the 16-bit length
// field is clear, false when it is set.
// NOTE(review): presumably a set top bit marks a live value and a cleared
// one marks a deleted value -- confirm against the code that writes the field.
inline bool value_deleted(uint16_t value_len) {
    constexpr uint16_t kTopBitMask = 0x8000;
    const bool top_bit_set = (value_len & kTopBitMask) != 0;
    return !top_bit_set;
}
// Reads the uint16_t stored in the first sizeof(uint16_t) bytes at `value`.
// The bytes are copied with memcpy, so `value` needs no particular alignment.
// The result is returned exactly as stored (no masking of any flag bits).
inline uint16_t get_value_len(char *value) {
    uint16_t len_field = 0;
    memcpy(&len_field, value, sizeof(len_field));
    return len_field;
}
// Reads the size_t stored immediately after the leading uint16_t at `value`,
// i.e. at byte offset sizeof(uint16_t). Copied with memcpy because that
// offset is not size_t-aligned in general.
inline size_t get_value_slotnum(char *value) {
    const char *slot_field = value + sizeof(uint16_t);
    size_t slot = 0;
    memcpy(&slot, slot_field, sizeof(slot));
    return slot;
}
// static inline bool value_deleted(uint16_t value_len) {
// return !(value_len >> 15);
// }
// static inline uint16_t get_value_len(char *value) {
// uint16_t value_len;
// memcpy(&value_len, value, sizeof(uint16_t));
// return value_len;
// }
// static inline size_t get_value_slotnum(char *value) {
// size_t slot_num;
// memcpy(&slot_num, &value[sizeof(uint16_t)], sizeof(size_t));
// return slot_num;
// }
inline size_t get_gc_num() {
gc_num_latch_.lock();
@ -85,6 +91,7 @@ friend class gc_executor;
ovn_map_latch_.unlock();
}
private:
SlotPage *slot_page_;
VlogSet *vlog_set; // vlog_gc.cpp
@ -94,11 +101,12 @@ friend class gc_executor;
std::mutex curr_thread_nums_latch_;
size_t curr_thread_nums_;
// gc
std::mutex gc_num_latch_;
size_t gc_num;
size_t gc_num; // gc线程id便map中获取gc参数等信息
std::mutex ovn_map_latch_;
std::unordered_map<size_t, bool> ovn_map_;
std::unordered_map<size_t, bool> ovn_map_; // vlog是否正在进行gc
};

+ 13
- 5
db/vlog_set.cpp View File

@ -116,8 +116,8 @@ void VlogSet::get_value(const struct slot_content &sc, std::string *value) {
}
struct vlog_info * VlogSet::get_writable_vlog_info(size_t value_size) {
for (auto it = vlog_info_map_.begin(); it != vlog_info_map_.end(); ++it) {
struct vlog_info* vinfo = it->second;
for (auto & it : vlog_info_map_) {
struct vlog_info* vinfo = it.second;
// 使用 vlog_info* 进行操作
vinfo->vlog_info_latch_.lock();
@ -175,7 +175,7 @@ void VlogSet::del_value(const struct slot_content &sc) {
}
vhandler->vlog_latch_.hard_lock();
vhandler->decre_access_thread_nums(); // FIXME: increase thread nums
vhandler->incre_access_thread_nums(); // FIXME: increase thread nums
mtx.unlock(); // for better performance
vinfo->vlog_info_latch_.unlock();
mark_del_value(sc);
@ -200,10 +200,18 @@ void VlogSet::remove_old_vlog(size_t old_vlog_num) {
// new vlog should have been created here
mtx.lock();
auto vi_old = get_vlog_info(old_vlog_num);
auto vh_old = get_vlog_handler(old_vlog_num);
vi_old->vlog_info_latch_.lock();
// FIXME: dead lock
while (!vh_old->non_access_thread()) {
std::this_thread::sleep_for(std::chrono::milliseconds(2));
std::cout << "waiting in remove_old_vlog" << std::endl;
}
std::string old_vlog_name = get_vlog_name(old_vlog_num);
remove_from_config_file(old_vlog_num);
// remove_vlog_from_maps(old_vlog_name); // FIXME: this function should be called after all access on this vlog finished !!!
remove_vlog_from_maps(old_vlog_name); // FIXME: this function should be called after all access on this vlog finished !!!
vi_old->vlog_valid_ = false;
vi_old->vlog_info_latch_.unlock();
mtx.unlock();
@ -281,7 +289,7 @@ inline void VlogSet::register_vlog_inmaps(size_t vlog_num, std::string &vlog_nam
vlog_handler_map_[vlog_name] = vhandler;
}
inline void VlogSet::remove_vlog_from_maps(std::string &vlog_name) {
// Deletes the file named `vlog_name` from disk and erases its entry from
// vlog_handler_map_.
//
// @param vlog_name  path of the vlog file; also the key used in
//                   vlog_handler_map_.
//
// The std::remove call is deliberately made outside assert(): an expression
// inside assert() is not evaluated when NDEBUG is defined, so the file would
// never be deleted in release builds.
void VlogSet::remove_vlog_from_maps(std::string &vlog_name) {
    const int remove_rc = std::remove(vlog_name.c_str());
    // Debug builds still abort when the file could not be removed.
    assert(remove_rc == 0);
    (void)remove_rc;  // silence the unused-variable warning under NDEBUG
    vlog_handler_map_.erase(vlog_name);
}

+ 6
- 4
db/vlog_set.h View File

@ -23,7 +23,8 @@ friend class gc_executor;
#define CONFIG_FILE_DELE_MASK (0x1 << (sizeof(size_t)-1))
#define CONFIG_FILE_VLOG_NUM(v) ((v) & ~CONFIG_FILE_DELE_MASK)
#define VLOG_GC_THREHOLD 0.8
#define VLOG_GC_THREHOLD 0.5
public:
VlogSet(std::string dbname, VlogGC *vlog_gc);
~VlogSet();
@ -33,6 +34,7 @@ friend class gc_executor;
// Stores `vg` in the vlog_gc member, replacing whatever pointer was held before.
void set_vlog_gc(VlogGC *vg) {
    vlog_gc = vg;
}
private:
size_t register_new_vlog();
void remove_old_vlog(size_t old_vlog_num);
@ -44,7 +46,7 @@ friend class gc_executor;
struct vlog_info *get_writable_vlog_info(size_t value_size);
inline void restore_vlog_inmaps(struct vlog_info *vi);
inline void register_vlog_inmaps(size_t vlog_num, std::string &vlog_name);
inline void remove_vlog_from_maps(std::string &vlog_name);
void remove_vlog_from_maps(std::string &vlog_name);
inline std::string get_config_file_name();
std::string get_vlog_name(size_t vlog_num);
struct vlog_info *get_vlog_info(size_t vlog_num);
@ -62,10 +64,10 @@ friend class gc_executor;
std::mutex config_file_latch_;
std::fstream *config_file_;
int counter = 0;
int counter = 0; // gc线程正在进行
std::mutex counter_latch_;
std::mutex finished_latch_;
bool finished = true;
bool finished = true; // gc线程都已结束
VlogGC *vlog_gc; // vlog_set.cpp
};

+ 1
- 0
test/db_test1.cc View File

@ -62,6 +62,7 @@ int test2() {
}
delete db;
std::cout << "db has been deleted!" << std::endl;
return 0;
}

Loading…
Cancel
Save