// // Created by 马也驰 on 2024/12/29. // #include "vlog_set.h" #include "../db/vlog_gc.h" // config file: | vlog_nums_(size_t) | vlog_1_num(size_t) | vlog_2_num(size_t) | ... | // vlog: | curr_size(size_t) | value_nums(size_t) | value1 | value2 | ... | // value: { value_len(uint16_t) | slot_num(size_t) | value } VlogSet::VlogSet(std::string dbname, VlogGC *vlog_gc) : dbname(dbname), vlog_gc(vlog_gc) { auto cfname = get_config_file_name(); this->config_file_ = new std::fstream(cfname, std::ios::in | std::ios::out); if (!this->config_file_->is_open()) { // config 文件不存在,尝试创建 delete this->config_file_; this->config_file_ = new std::fstream(cfname, std::ios::out); this->config_file_->close(); delete this->config_file_; // 重新以读写模式打开 this->config_file_ = new std::fstream(cfname, std::ios::in | std::ios::out); this->config_file_->seekp(0); size_t tmp = 0; this->config_file_->write(reinterpret_cast(&tmp), sizeof(size_t)); this->config_file_->flush(); this->vlog_nums_ = 0; register_new_vlog(); } else { // config 文件存在 size_t _vlog_nums_; config_file_->seekp(0); config_file_->read(reinterpret_cast(&_vlog_nums_), sizeof(size_t)); this->vlog_nums_ = _vlog_nums_; // 从 config file 中读取所有有效的vlog名字 config_file_->seekp(sizeof(size_t)); size_t tmp[_vlog_nums_]; config_file_->read(reinterpret_cast(tmp), sizeof(tmp)); for (auto i = 0; i < _vlog_nums_; i++) { size_t curr_vlog_num = tmp[i]; if (!(curr_vlog_num & CONFIG_FILE_DELE_MASK)) { auto curr_vlog = std::fstream(get_vlog_name(curr_vlog_num), std::ios::in | std::ios::out); size_t curr_vlog_header[2]; curr_vlog.seekp(0); curr_vlog.read(reinterpret_cast(curr_vlog_header), 2*sizeof(size_t)); curr_vlog.close(); restore_vlog_inmaps(new vlog_info(curr_vlog_num, curr_vlog_header[1], curr_vlog_header[0])); } } } if (!this->config_file_->is_open()) { std::cerr << "Failed to open or create the db config file: " << cfname << std::endl; std::exit(EXIT_FAILURE); } } VlogSet::~VlogSet() { config_file_->seekp(0); config_file_->write(reinterpret_cast(&vlog_nums_), sizeof(size_t)); config_file_->flush(); config_file_->close(); for (auto & it : vlog_info_map_) { struct vlog_info* vinfo = it.second; // 使用 vlog_info* 进行操作 size_t tmp[2] = {vinfo->curr_size, vinfo->value_nums}; auto vlog_file_handler = std::fstream(get_vlog_name(vinfo->vlog_num), std::ios::in | std::ios::out); vlog_file_handler.seekp(0); vlog_file_handler.write(reinterpret_cast(tmp), sizeof(tmp)); vlog_file_handler.flush(); vlog_file_handler.close(); } delete vlog_gc; } void VlogSet::get_value(uint32_t vlog_num, uint32_t value_offset, std::string *value) { mtx.lock(); auto vinfo = get_vlog_info(vlog_num); auto vhandler = get_vlog_handler(vlog_num); vinfo->vlog_info_latch_.lock(); if (!vinfo->vlog_valid_) { vhandler = get_vlog_handler(vinfo->vlog_num_for_gc); } vhandler->vlog_latch_.soft_lock(); vinfo->vlog_info_latch_.unlock(); read_vlog_value(vlog_num, value_offset, value); vhandler->vlog_latch_.soft_unlock(); mtx.unlock(); } struct vlog_info * VlogSet::get_writable_vlog_info(size_t value_size) { for (auto it = vlog_info_map_.begin(); it != vlog_info_map_.end(); ++it) { struct vlog_info* vinfo = it->second; // 使用 vlog_info* 进行操作 if (vinfo->curr_size + value_size <= VLOG_SIZE) { return vinfo; } } // 所有vlog已满,创建新vlog return nullptr; } void VlogSet::put_value(uint32_t *vlog_num, uint32_t *value_offset, const leveldb::Slice &value) { mtx.lock(); auto vinfo = get_writable_vlog_info(value.size()); if (!vinfo) { // vlog全部已满,创建新的vlog auto _vlog_num_ = register_new_vlog(); vinfo = get_vlog_info(_vlog_num_); } vinfo->vlog_info_latch_.lock(); *vlog_num = vinfo->vlog_num; *value_offset = vinfo->curr_size; vinfo->curr_size += value.size(); vinfo->value_nums ++; vinfo->vlog_info_latch_.unlock(); auto vhandler = get_vlog_handler(*vlog_num); vinfo->vlog_info_latch_.lock(); if (!vinfo->vlog_valid_) { vhandler = get_vlog_handler(vinfo->vlog_num_for_gc); } vhandler->vlog_latch_.hard_lock(); vinfo->vlog_info_latch_.unlock(); write_vlog_value(*vlog_num, *value_offset, value); vhandler->vlog_latch_.hard_unlock(); mtx.unlock(); } void VlogSet::del_value(uint32_t vlog_num, uint32_t value_offset) { mtx.lock(); auto vinfo = get_vlog_info(vlog_num); auto vhandler = get_vlog_handler(vlog_num); vinfo->vlog_info_latch_.lock(); if (!vinfo->vlog_valid_) { // auto vlog_num_gc = vinfo->vlog_num_for_gc; vhandler = get_vlog_handler(vinfo->vlog_num_for_gc); // vinfo->vlog_info_latch_.unlock(); // vinfo = get_vlog_info(vlog_num_gc); // vinfo->vlog_info_latch_.lock(); } vhandler->vlog_latch_.hard_lock(); mark_del_value(vlog_num, value_offset); vinfo->vlog_info_latch_.unlock(); vhandler->vlog_latch_.hard_unlock(); mtx.unlock(); } size_t VlogSet::register_new_vlog() { size_t vn = vlog_nums_; std::string vlog_name = get_vlog_name(vn); register_inconfig_file(vn); create_vlog(vn); // auto vlog_new = new std::fstream(vlog_name, std::ios::in | std::ios::out); // if (!vlog_new->is_open()) { // std::cerr << "Failed to open or create the vlog file: " << vlog_new << std::endl; // std::exit(EXIT_FAILURE); // } register_vlog_inmaps(vn, vlog_name); vlog_nums_ ++; // vlog_count_ ++; return vn; } void VlogSet::remove_old_vlog(size_t old_vlog_num) { // after gc, new_vlog has been created // new vlog should have been created here ! mtx.lock(); auto vi_old = get_vlog_info(old_vlog_num); vi_old->vlog_info_latch_.lock(); std::string old_vlog_name = get_vlog_name(old_vlog_num); remove_from_config_file(old_vlog_num); remove_vlog_from_maps(old_vlog_name); vi_old->vlog_valid_ = false; vi_old->vlog_info_latch_.unlock(); // vlog_count_ --; mtx.unlock(); } bool VlogSet::vlog_need_gc(size_t vlog_num) { // FIXME: vlog应该已经满了才行 std::string vlog_name = get_vlog_name(vlog_num); auto vi = vlog_info_map_[vlog_name]; if ((double)vi->curr_size/VLOG_SIZE < VLOG_GC_THREHOLD) { return false; } bool retval = vi->vlog_valid_ && (vi->discard/vi->value_nums >= GC_THREDHOLD); return retval; } void VlogSet::register_inconfig_file(size_t vlog_num) { // config file: | vlog_nums_(size_t) | vlog_count_(size_t) | vlog_1_name | ... | // first size_t in config file indicates current vlog_nums_(size_t) // second size_t in config file indicates current vlog_count_(size_t) config_file_->seekp(sizeof(size_t) + vlog_nums_* sizeof(size_t)); config_file_->write(reinterpret_cast(&vlog_num), sizeof(size_t)); config_file_->flush(); } void VlogSet::remove_from_config_file(size_t vlog_num) { char tmp[vlog_nums_*vlog_num_size]; config_file_->seekp(sizeof(size_t)); config_file_->read(tmp, sizeof(tmp)); size_t *vlog_num_ptr = reinterpret_cast(tmp); for (auto i = 0; i < vlog_nums_; i++) { size_t curr_vlog_num = *vlog_num_ptr; if (!(curr_vlog_num & CONFIG_FILE_DELE_MASK) && curr_vlog_num == vlog_num) { curr_vlog_num |= CONFIG_FILE_DELE_MASK; config_file_->seekp(sizeof(size_t) + i*sizeof(size_t)); config_file_->write(reinterpret_cast(&curr_vlog_num), sizeof(size_t)); config_file_->flush(); break; } } } void VlogSet::create_vlog(size_t vlog_num) { auto vlog_name = get_vlog_name(vlog_num); std::fstream *vlog_new = new std::fstream(vlog_name, std::ios::out); vlog_new->close(); delete vlog_new; // 重新以读写模式打开 vlog_new = new std::fstream(vlog_name, std::ios::in | std::ios::out); if (!vlog_new->is_open()) { std::cerr << "Failed to open or create the vlog file: " << vlog_name << std::endl; std::exit(EXIT_FAILURE); } size_t tmp[2] = {2*sizeof(size_t), vlog_num}; vlog_new->seekp(0); vlog_new->write(reinterpret_cast(tmp), sizeof(tmp)); vlog_new->flush(); vlog_new->close(); delete vlog_new; } inline void VlogSet::restore_vlog_inmaps(struct vlog_info *vi) { auto vlog_name = get_vlog_name(vi->vlog_num); vlog_info_map_[vlog_name] = vi; vlog_handler_map_[vlog_name] = new vlog_handler(); } inline void VlogSet::register_vlog_inmaps(size_t vlog_num, std::string &vlog_name) { auto vinfo = new vlog_info(vlog_num); auto vhandler = new vlog_handler(); vlog_info_map_[vlog_name] = vinfo; vlog_handler_map_[vlog_name] = vhandler; } inline void VlogSet::remove_vlog_from_maps(std::string &vlog_name) { auto vi = vlog_info_map_[vlog_name]; // auto vh = vlog_handler_map_[vlog_name]; vi->vlog_info_latch_.lock(); vi->vlog_valid_ = false; // TODO: change the slotpage vlog_num when get/put thread read this flag // vh->handler_->close(); assert(!std::remove(vlog_name.c_str())); vlog_handler_map_.erase(vlog_name); vi->vlog_info_latch_.unlock(); } inline std::string VlogSet::get_config_file_name() { return dbname + "_config_file"; } std::string VlogSet::get_vlog_name(size_t vlog_num) { return dbname + "_vlog_" + std::to_string(vlog_num); } struct vlog_info * VlogSet::get_vlog_info(size_t vlog_num) { return vlog_info_map_[get_vlog_name(vlog_num)]; } struct vlog_handler * VlogSet::get_vlog_handler(size_t vlog_num) { return vlog_handler_map_[get_vlog_name(vlog_num)]; } // value: | field_nums(uint16_t) | slot_num(size_t) | value | void VlogSet::read_vlog_value(uint32_t vlog_num, uint32_t value_offset, std::string *value) { auto vlog_name = get_vlog_name(vlog_num); auto handler = std::fstream(vlog_name, std::ios::in | std::ios::out); handler.seekp(value_offset); char value_buff[VALUE_BUFF_SIZE]; handler.read(value_buff, VALUE_BUFF_SIZE); // FIXME: remove value size uint16_t value_size; memcpy(&value_size, value_buff, sizeof(uint16_t)); value_size &= VALUE_SIZE_MASK; *value = std::string(value_buff); // value->assign(&value_buff[sizeof(uint16_t)], value_size); handler.close(); } void VlogSet::write_vlog_value(uint32_t vlog_num, uint32_t value_offset, const leveldb::Slice &value) { auto vlog_name = get_vlog_name(vlog_num); auto handler = std::fstream(vlog_name, std::ios::in | std::ios::out); handler.seekp(value_offset); const char *value_buff = value.data(); handler.write(value_buff, value.size()); auto vinfo = get_vlog_info(vlog_num); handler.flush(); handler.close(); } void VlogSet::mark_del_value(uint32_t vlog_num, uint32_t value_offset) { auto vinfo = get_vlog_info(vlog_num); auto vlog_name = get_vlog_name(vlog_num); auto handler = std::fstream(vlog_name, std::ios::in | std::ios::out); handler.seekp(value_offset); char value_buff[VALUE_BUFF_SIZE]; handler.read(value_buff, VALUE_BUFF_SIZE); uint16_t value_size; memcpy(&value_size, value_buff, sizeof(uint16_t)); if (value_size & VALUE_DELE_MASK) { // case when value has been deleted handler.close(); return ; } assert(!(value_size & VALUE_DELE_MASK)); uint16_t masked_value_size = value_size | VALUE_DELE_MASK; memcpy(value_buff, &masked_value_size, sizeof(uint16_t)); handler.write(value_buff, value_size); handler.flush(); handler.close(); // handle gc, mtx is locked outside, vlog_info_latch and vlog hard lock is locked outside too // auto vinfo = get_vlog_info(vlog_num); vinfo->discard ++; vinfo->value_nums --; vinfo->curr_size -= value_size & VALUE_SIZE_MASK; if (vlog_need_gc(vlog_num)) { // create new vlog vinfo->vlog_valid_ = false; vinfo->vlog_num_for_gc = register_new_vlog(); // vinfo->vlog_valid_ = false; vlog_gc->do_gc(vlog_num, vinfo->vlog_num_for_gc); } }