You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

358 lines
12 KiB

//
// Created by 马也驰 on 2024/12/29.
//
#include "vlog_set.h"
#include "../db/vlog_gc.h"
// config file: | vlog_nums_(size_t) | vlog_1_num(size_t) | vlog_2_num(size_t) | ... |
// vlog: | curr_size(size_t) | value_nums(size_t) | value1 | value2 | ... |
// value: { value_len(uint16_t) | slot_num(size_t) | value }
VlogSet::VlogSet(std::string dbname, VlogGC *vlog_gc) : dbname(dbname), vlog_gc(vlog_gc) {
auto cfname = get_config_file_name();
this->config_file_ = new std::fstream(cfname, std::ios::in | std::ios::out);
if (!this->config_file_->is_open()) {
// config 文件不存在,尝试创建
delete this->config_file_;
this->config_file_ = new std::fstream(cfname, std::ios::out);
this->config_file_->close();
delete this->config_file_;
// 重新以读写模式打开
this->config_file_ = new std::fstream(cfname, std::ios::in | std::ios::out);
this->config_file_->seekp(0);
size_t tmp = 0;
this->config_file_->write(reinterpret_cast<const char*>(&tmp), sizeof(size_t));
this->config_file_->flush();
this->vlog_nums_ = 0;
register_new_vlog();
} else {
// config 文件存在
size_t _vlog_nums_;
config_file_->seekp(0);
config_file_->read(reinterpret_cast<char*>(&_vlog_nums_), sizeof(size_t));
this->vlog_nums_ = _vlog_nums_;
// 从 config file 中读取所有有效的vlog名字
config_file_->seekp(sizeof(size_t));
size_t tmp[_vlog_nums_];
config_file_->read(reinterpret_cast<char*>(tmp), sizeof(tmp));
for (auto i = 0; i < _vlog_nums_; i++) {
size_t curr_vlog_num = tmp[i];
if (!(curr_vlog_num & CONFIG_FILE_DELE_MASK)) {
auto curr_vlog = std::fstream(get_vlog_name(curr_vlog_num), std::ios::in | std::ios::out);
size_t curr_vlog_header[2];
curr_vlog.seekp(0);
curr_vlog.read(reinterpret_cast<char*>(curr_vlog_header), 2*sizeof(size_t));
curr_vlog.close();
restore_vlog_inmaps(new vlog_info(curr_vlog_num, curr_vlog_header[1], curr_vlog_header[0]));
}
}
}
if (!this->config_file_->is_open()) {
std::cerr << "Failed to open or create the db config file: " << cfname << std::endl;
std::exit(EXIT_FAILURE);
}
}
VlogSet::~VlogSet() {
config_file_->seekp(0);
config_file_->write(reinterpret_cast<const char*>(&vlog_nums_), sizeof(size_t));
config_file_->flush();
config_file_->close();
for (auto & it : vlog_info_map_) {
struct vlog_info* vinfo = it.second;
// 使用 vlog_info* 进行操作
size_t tmp[2] = {vinfo->curr_size, vinfo->value_nums};
auto vlog_file_handler = std::fstream(get_vlog_name(vinfo->vlog_num), std::ios::in | std::ios::out);
vlog_file_handler.seekp(0);
vlog_file_handler.write(reinterpret_cast<const char*>(tmp), sizeof(tmp));
vlog_file_handler.flush();
vlog_file_handler.close();
}
delete vlog_gc;
}
void VlogSet::get_value(uint32_t vlog_num, uint32_t value_offset, std::string *value) {
mtx.lock();
auto vinfo = get_vlog_info(vlog_num);
auto vhandler = get_vlog_handler(vlog_num);
vinfo->vlog_info_latch_.lock();
if (!vinfo->vlog_valid_) {
vhandler = get_vlog_handler(vinfo->vlog_num_for_gc);
}
vhandler->vlog_latch_.soft_lock();
vinfo->vlog_info_latch_.unlock();
read_vlog_value(vlog_num, value_offset, value);
vhandler->vlog_latch_.soft_unlock();
mtx.unlock();
}
struct vlog_info * VlogSet::get_writable_vlog_info(size_t value_size) {
for (auto it = vlog_info_map_.begin(); it != vlog_info_map_.end(); ++it) {
struct vlog_info* vinfo = it->second;
// 使用 vlog_info* 进行操作
if (vinfo->curr_size + value_size <= VLOG_SIZE) {
return vinfo;
}
}
// 所有vlog已满,创建新vlog
return nullptr;
}
void VlogSet::put_value(uint32_t *vlog_num, uint32_t *value_offset, const leveldb::Slice &value) {
mtx.lock();
auto vinfo = get_writable_vlog_info(value.size());
if (!vinfo) {
// vlog全部已满,创建新的vlog
auto _vlog_num_ = register_new_vlog();
vinfo = get_vlog_info(_vlog_num_);
}
vinfo->vlog_info_latch_.lock();
*vlog_num = vinfo->vlog_num;
*value_offset = vinfo->curr_size;
vinfo->curr_size += value.size();
vinfo->value_nums ++;
vinfo->vlog_info_latch_.unlock();
auto vhandler = get_vlog_handler(*vlog_num);
vinfo->vlog_info_latch_.lock();
if (!vinfo->vlog_valid_) {
vhandler = get_vlog_handler(vinfo->vlog_num_for_gc);
}
vhandler->vlog_latch_.hard_lock();
vinfo->vlog_info_latch_.unlock();
write_vlog_value(*vlog_num, *value_offset, value);
vhandler->vlog_latch_.hard_unlock();
mtx.unlock();
}
void VlogSet::del_value(uint32_t vlog_num, uint32_t value_offset) {
mtx.lock();
auto vinfo = get_vlog_info(vlog_num);
auto vhandler = get_vlog_handler(vlog_num);
vinfo->vlog_info_latch_.lock();
if (!vinfo->vlog_valid_) {
// auto vlog_num_gc = vinfo->vlog_num_for_gc;
vhandler = get_vlog_handler(vinfo->vlog_num_for_gc);
// vinfo->vlog_info_latch_.unlock();
// vinfo = get_vlog_info(vlog_num_gc);
// vinfo->vlog_info_latch_.lock();
}
vhandler->vlog_latch_.hard_lock();
mark_del_value(vlog_num, value_offset);
vinfo->vlog_info_latch_.unlock();
vhandler->vlog_latch_.hard_unlock();
mtx.unlock();
}
size_t VlogSet::register_new_vlog() {
size_t vn = vlog_nums_;
std::string vlog_name = get_vlog_name(vn);
register_inconfig_file(vn);
create_vlog(vn);
// auto vlog_new = new std::fstream(vlog_name, std::ios::in | std::ios::out);
// if (!vlog_new->is_open()) {
// std::cerr << "Failed to open or create the vlog file: " << vlog_new << std::endl;
// std::exit(EXIT_FAILURE);
// }
register_vlog_inmaps(vn, vlog_name);
vlog_nums_ ++;
// vlog_count_ ++;
return vn;
}
void VlogSet::remove_old_vlog(size_t old_vlog_num) {
// after gc, new_vlog has been created
// new vlog should have been created here !
mtx.lock();
auto vi_old = get_vlog_info(old_vlog_num);
vi_old->vlog_info_latch_.lock();
std::string old_vlog_name = get_vlog_name(old_vlog_num);
remove_from_config_file(old_vlog_num);
remove_vlog_from_maps(old_vlog_name);
vi_old->vlog_valid_ = false;
vi_old->vlog_info_latch_.unlock();
// vlog_count_ --;
mtx.unlock();
}
bool VlogSet::vlog_need_gc(size_t vlog_num) {
// FIXME: vlog应该已经满了才行
std::string vlog_name = get_vlog_name(vlog_num);
auto vi = vlog_info_map_[vlog_name];
if ((double)vi->curr_size/VLOG_SIZE < VLOG_GC_THREHOLD) {
return false;
}
bool retval = vi->vlog_valid_ && (vi->discard/vi->value_nums >= GC_THREDHOLD);
return retval;
}
void VlogSet::register_inconfig_file(size_t vlog_num) {
// config file: | vlog_nums_(size_t) | vlog_count_(size_t) | vlog_1_name | ... |
// first size_t in config file indicates current vlog_nums_(size_t)
// second size_t in config file indicates current vlog_count_(size_t)
config_file_->seekp(sizeof(size_t) + vlog_nums_* sizeof(size_t));
config_file_->write(reinterpret_cast<const char*>(&vlog_num), sizeof(size_t));
config_file_->flush();
}
void VlogSet::remove_from_config_file(size_t vlog_num) {
char tmp[vlog_nums_*vlog_num_size];
config_file_->seekp(sizeof(size_t));
config_file_->read(tmp, sizeof(tmp));
size_t *vlog_num_ptr = reinterpret_cast<size_t*>(tmp);
for (auto i = 0; i < vlog_nums_; i++) {
size_t curr_vlog_num = *vlog_num_ptr;
if (!(curr_vlog_num & CONFIG_FILE_DELE_MASK) && curr_vlog_num == vlog_num) {
curr_vlog_num |= CONFIG_FILE_DELE_MASK;
config_file_->seekp(sizeof(size_t) + i*sizeof(size_t));
config_file_->write(reinterpret_cast<const char*>(&curr_vlog_num), sizeof(size_t));
config_file_->flush();
break;
}
}
}
void VlogSet::create_vlog(size_t vlog_num) {
auto vlog_name = get_vlog_name(vlog_num);
std::fstream *vlog_new = new std::fstream(vlog_name, std::ios::out);
vlog_new->close();
delete vlog_new;
// 重新以读写模式打开
vlog_new = new std::fstream(vlog_name, std::ios::in | std::ios::out);
if (!vlog_new->is_open()) {
std::cerr << "Failed to open or create the vlog file: " << vlog_name << std::endl;
std::exit(EXIT_FAILURE);
}
size_t tmp[2] = {2*sizeof(size_t), vlog_num};
vlog_new->seekp(0);
vlog_new->write(reinterpret_cast<const char*>(tmp), sizeof(tmp));
vlog_new->flush();
vlog_new->close();
delete vlog_new;
}
inline void VlogSet::restore_vlog_inmaps(struct vlog_info *vi) {
auto vlog_name = get_vlog_name(vi->vlog_num);
vlog_info_map_[vlog_name] = vi;
vlog_handler_map_[vlog_name] = new vlog_handler();
}
inline void VlogSet::register_vlog_inmaps(size_t vlog_num, std::string &vlog_name) {
auto vinfo = new vlog_info(vlog_num);
auto vhandler = new vlog_handler();
vlog_info_map_[vlog_name] = vinfo;
vlog_handler_map_[vlog_name] = vhandler;
}
inline void VlogSet::remove_vlog_from_maps(std::string &vlog_name) {
auto vi = vlog_info_map_[vlog_name];
// auto vh = vlog_handler_map_[vlog_name];
vi->vlog_info_latch_.lock();
vi->vlog_valid_ = false; // TODO: change the slotpage vlog_num when get/put thread read this flag
// vh->handler_->close();
assert(!std::remove(vlog_name.c_str()));
vlog_handler_map_.erase(vlog_name);
vi->vlog_info_latch_.unlock();
}
inline std::string VlogSet::get_config_file_name() {
return dbname + "_config_file";
}
std::string VlogSet::get_vlog_name(size_t vlog_num) {
return dbname + "_vlog_" + std::to_string(vlog_num);
}
struct vlog_info * VlogSet::get_vlog_info(size_t vlog_num) {
return vlog_info_map_[get_vlog_name(vlog_num)];
}
struct vlog_handler * VlogSet::get_vlog_handler(size_t vlog_num) {
return vlog_handler_map_[get_vlog_name(vlog_num)];
}
// value: | value_size(uint16_t) | slot_num(size_t) | field_nums(uint16_t) | value |
void VlogSet::read_vlog_value(uint32_t vlog_num, uint32_t value_offset, std::string *value) {
auto vlog_name = get_vlog_name(vlog_num);
auto handler = std::fstream(vlog_name, std::ios::in | std::ios::out);
handler.seekp(value_offset);
char value_buff[VALUE_BUFF_SIZE];
handler.read(value_buff, VALUE_BUFF_SIZE);
// FIXME: remove value size
uint16_t value_size;
memcpy(&value_size, value_buff, sizeof(uint16_t));
value_size &= VALUE_SIZE_MASK;
// *value = std::string(value_buff);
assert(value_size <= VALUE_BUFF_SIZE);
value->assign(&value_buff[sizeof(uint16_t)], value_size);
handler.close();
}
void VlogSet::write_vlog_value(uint32_t vlog_num, uint32_t value_offset, const leveldb::Slice &value) {
auto vlog_name = get_vlog_name(vlog_num);
auto handler = std::fstream(vlog_name, std::ios::in | std::ios::out);
handler.seekp(value_offset);
const char *value_buff = value.data();
handler.write(value_buff, value.size());
auto vinfo = get_vlog_info(vlog_num);
handler.flush();
handler.close();
}
void VlogSet::mark_del_value(uint32_t vlog_num, uint32_t value_offset) {
auto vinfo = get_vlog_info(vlog_num);
auto vlog_name = get_vlog_name(vlog_num);
auto handler = std::fstream(vlog_name, std::ios::in | std::ios::out);
handler.seekp(value_offset);
char value_buff[VALUE_BUFF_SIZE];
handler.read(value_buff, VALUE_BUFF_SIZE);
uint16_t value_size;
memcpy(&value_size, value_buff, sizeof(uint16_t));
if (value_size & VALUE_DELE_MASK) {
// case when value has been deleted
handler.close();
return ;
}
assert(!(value_size & VALUE_DELE_MASK));
uint16_t masked_value_size = value_size | VALUE_DELE_MASK;
memcpy(value_buff, &masked_value_size, sizeof(uint16_t));
handler.write(value_buff, value_size);
handler.flush();
handler.close();
// handle gc, mtx is locked outside, vlog_info_latch and vlog hard lock is locked outside too
// auto vinfo = get_vlog_info(vlog_num);
vinfo->discard ++;
vinfo->value_nums --;
vinfo->curr_size -= value_size & VALUE_SIZE_MASK;
if (vlog_need_gc(vlog_num)) {
// create new vlog
vinfo->vlog_valid_ = false;
vinfo->vlog_num_for_gc = register_new_vlog();
// vinfo->vlog_valid_ = false;
vlog_gc->do_gc(vlog_num, vinfo->vlog_num_for_gc);
}
}