// // Created by 马也驰 on 2024/12/29. // #include "vlog_set.h" #include "../db/vlog_gc.h" #include <thread> // config file: | vlog_nums_(size_t) | vlog_1_num(size_t) | vlog_2_num(size_t) | ... | // vlog: | curr_size(size_t) | value_nums(size_t) | value1 | value2 | ... | // value: { value_len(uint16_t) | slot_num(size_t) | value } // no bugs VlogSet::VlogSet(std::string dbname, VlogGC *vlog_gc) : dbname(dbname), vlog_gc(vlog_gc) { config_file_name = get_config_file_name(); auto config_file_ = std::fstream(config_file_name, std::ios::in | std::ios::out); if (!config_file_.is_open()) { // config 文件不存在,尝试创建 config_file_ = std::fstream(config_file_name, std::ios::out); config_file_.close(); // 重新以读写模式打开 config_file_ = std::fstream(config_file_name, std::ios::in | std::ios::out); config_file_.seekp(0); size_t tmp = 0; config_file_.write(reinterpret_cast<const char*>(&tmp), sizeof(size_t)); config_file_.flush(); this->vlog_nums_ = 0; register_new_vlog(); } else { // config 文件存在 size_t _vlog_nums_; config_file_.seekp(0); config_file_.read(reinterpret_cast<char*>(&_vlog_nums_), sizeof(size_t)); this->vlog_nums_ = _vlog_nums_; // 从 config file 中读取所有有效的vlog名字 config_file_.seekp(sizeof(size_t)); size_t tmp[_vlog_nums_]; config_file_.read(reinterpret_cast<char*>(tmp), sizeof(tmp)); for (auto i = 0; i < _vlog_nums_; i++) { size_t curr_vlog_num = tmp[i]; if (!(curr_vlog_num & CONFIG_FILE_DELE_MASK)) { auto curr_vlog = std::fstream(get_vlog_name(curr_vlog_num), std::ios::in | std::ios::out); size_t curr_vlog_header[2]; curr_vlog.seekp(0); curr_vlog.read(reinterpret_cast<char*>(curr_vlog_header), 2*sizeof(size_t)); restore_vlog_inmaps(new vlog_info(curr_vlog_num, curr_vlog_header[1], curr_vlog_header[0])); } } } if (!config_file_.is_open()) { std::cerr << "Failed to open or create the db config file: " << config_file_name << std::endl; std::exit(EXIT_FAILURE); } config_file_.close(); if (USING_VLOG_CACHE) { vlog_cache = new VlogCache(); } } VlogSet::~VlogSet() { // wait for all gc threads to finish while (true) { finished_latch_.lock(); if (!finished) { finished_latch_.unlock(); std::this_thread::sleep_for(std::chrono::milliseconds(2)); } else { finished_latch_.unlock(); break; } } auto config_file_ = std::fstream(config_file_name, std::ios::in | std::ios::out); config_file_.seekp(0); config_file_.write(reinterpret_cast<const char*>(&vlog_nums_), sizeof(size_t)); config_file_.flush(); config_file_.close(); mtx.lock(); for (auto & it : vlog_info_map_) { struct vlog_info* vinfo = it.second; auto vlog_handler = get_vlog_handler(vinfo->vlog_num); vinfo->vlog_info_latch_.lock(); // FIXME: SIGSEGV vlog_handler->vlog_latch_.hard_lock(); // 所有操作都已经同步完成 // 使用 vlog_info* 进行操作 size_t tmp[2] = {vinfo->curr_size, vinfo->value_nums}; vinfo->vlog_info_latch_.unlock(); auto vlog_file_handler = std::fstream(get_vlog_name(vinfo->vlog_num), std::ios::in | std::ios::out); vlog_file_handler.seekp(0); vlog_file_handler.write(reinterpret_cast<const char*>(tmp), sizeof(tmp)); vlog_file_handler.flush(); vlog_file_handler.close(); std::cout << "vlog_set.cpp line 102" << std::endl; vlog_handler->vlog_latch_.hard_unlock(); } mtx.unlock(); std::cout << "vlog_set.cpp line 106" << std::endl; delete vlog_gc; if (USING_VLOG_CACHE) { delete vlog_cache; } } // single value: || value_size(uint16_t) | slot_num(size_t) || {field_nums(uint16_t), attr1, attr2, ... } | // single attr: | attr1_name_len(uint8_t) | attr1_name | attr1_len(uint16_t) | attr1 | void VlogSet::get_value(const struct slot_content &sc, std::string *value) { mtx.lock(); auto vinfo = get_vlog_info(sc.vlog_num); auto vhandler = get_vlog_handler(sc.vlog_num); vinfo->vlog_info_latch_.lock(); if (!vinfo->vlog_valid_) { vhandler = get_vlog_handler(vinfo->vlog_num_for_gc); } vhandler->vlog_latch_.soft_lock(); vhandler->incre_access_thread_nums(); // FIXME: increase thread nums mtx.unlock(); // for better performance vinfo->vlog_info_latch_.unlock(); read_vlog_value(sc, value); vhandler->decre_access_thread_nums(); // FIXME: decrease thread nums vhandler->vlog_latch_.soft_unlock(); } struct vlog_info * VlogSet::get_writable_vlog_info(size_t value_size) { for (auto & it : vlog_info_map_) { struct vlog_info* vinfo = it.second; // 使用 vlog_info* 进行操作 vinfo->vlog_info_latch_.lock(); if (vinfo->vlog_valid_ && !vinfo->processing_gc && vinfo->curr_size+value_size <= VLOG_SIZE) { vinfo->vlog_info_latch_.unlock(); return vinfo; } vinfo->vlog_info_latch_.unlock(); } // 所有vlog已满,创建新vlog return nullptr; } // single value: || value_size(uint16_t) | slot_num(size_t) || {field_nums(uint16_t), attr1, attr2, ... } | // single attr: | attr1_name_len(uint8_t) | attr1_name | attr1_len(uint16_t) | attr1 | void VlogSet::put_value(struct slot_content &sc, size_t slot_num, const leveldb::Slice &value) { mtx.lock(); auto vinfo = get_writable_vlog_info(value.size()); if (!vinfo) { // vlog全部已满,创建新的vlog auto _vlog_num_ = register_new_vlog(); vinfo = get_vlog_info(_vlog_num_); } vinfo->vlog_info_latch_.lock(); sc.vlog_num = vinfo->vlog_num; sc.value_offset = vinfo->curr_size; vinfo->curr_size += value.size() + sizeof(uint16_t) + sizeof(size_t); vinfo->value_nums ++; auto vhandler = get_vlog_handler(vinfo->vlog_num); if (!vinfo->vlog_valid_ || vinfo->processing_gc) { vhandler = get_vlog_handler(vinfo->vlog_num_for_gc); } vhandler->vlog_latch_.hard_lock(); vhandler->incre_access_thread_nums(); // FIXME: increase thread nums mtx.unlock(); // for better performance vinfo->vlog_info_latch_.unlock(); write_vlog_value(sc, slot_num, value); vhandler->decre_access_thread_nums(); // FIXME: decrease thread nums vhandler->vlog_latch_.hard_unlock(); } void VlogSet::del_value(const struct slot_content &sc) { mtx.lock(); auto vinfo = get_vlog_info(sc.vlog_num); auto vhandler = get_vlog_handler(sc.vlog_num); vinfo->vlog_info_latch_.lock(); if (!vinfo->vlog_valid_ || vinfo->processing_gc) { vhandler = get_vlog_handler(vinfo->vlog_num_for_gc); } vhandler->vlog_latch_.hard_lock(); vhandler->incre_access_thread_nums(); // FIXME: increase thread nums mtx.unlock(); // for better performance vinfo->vlog_info_latch_.unlock(); mark_del_value(sc); vhandler->decre_access_thread_nums(); // FIXME: decrease thread nums vhandler->vlog_latch_.hard_unlock(); } size_t VlogSet::register_new_vlog() { // FIXME: concurrency on config file size_t vn = vlog_nums_; std::string vlog_name = get_vlog_name(vn); register_inconfig_file(vn); create_vlog(vn); register_vlog_inmaps(vn, vlog_name); vlog_nums_ ++; return vn; } // only used in vlog gc void VlogSet::remove_old_vlog(size_t old_vlog_num) { // after gc, new_vlog has been created // new vlog should have been created here mtx.lock(); auto vi_old = get_vlog_info(old_vlog_num); auto vh_old = get_vlog_handler(old_vlog_num); vi_old->vlog_info_latch_.lock(); // FIXME: dead lock while (!vh_old->non_access_thread()) { std::this_thread::sleep_for(std::chrono::milliseconds(2)); std::cout << "waiting in remove_old_vlog" << std::endl; } std::string old_vlog_name = get_vlog_name(old_vlog_num); remove_from_config_file(old_vlog_num); remove_vlog_file(old_vlog_name); // FIXME: this function should be called after all access on this vlog finished !!! vi_old->vlog_valid_ = false; vi_old->vlog_info_latch_.unlock(); vlog_nums_ --; mtx.unlock(); } bool VlogSet::vlog_need_gc(size_t vlog_num) { // TODO: need to judge whether vlog is full auto vi = get_vlog_info(vlog_num); if ((double)(vi->curr_size*1.0)/VLOG_SIZE >= VLOG_GC_VOLUM_THRESHOLD && (double)(vi->curr_size*1.0)/VLOG_SIZE < VLOG_GC_THREHOLD) { return false; } bool retval = vi->vlog_valid_ && (vi->discard/vi->value_nums >= GC_THREDHOLD); return retval; } void VlogSet::register_inconfig_file(size_t vlog_num) { // config file: | vlog_nums_(size_t) | vlog_count_(size_t) | vlog_1_name | ... | // first size_t in config file indicates current vlog_nums_(size_t) // second size_t in config file indicates current vlog_count_(size_t) config_file_latch_.lock(); auto config_file_ = std::fstream(config_file_name, std::ios::in | std::ios::out); config_file_.seekp(sizeof(size_t) + vlog_nums_* sizeof(size_t)); config_file_.write(reinterpret_cast<const char*>(&vlog_num), sizeof(size_t)); config_file_.flush(); config_file_.close(); config_file_latch_.unlock(); } void VlogSet::remove_from_config_file(size_t vlog_num) { // FIXME: concurrency on config file config_file_latch_.lock(); size_t tmp[vlog_nums_]; auto config_file_ = std::fstream(config_file_name, std::ios::in | std::ios::out); config_file_.seekp(sizeof(size_t)); config_file_.read(reinterpret_cast<char*>(tmp), sizeof(tmp)); for (auto i = 0; i < vlog_nums_; i++) { size_t curr_vlog_num = tmp[i]; if (!(curr_vlog_num & CONFIG_FILE_DELE_MASK) && curr_vlog_num == vlog_num) { curr_vlog_num |= CONFIG_FILE_DELE_MASK; config_file_.seekp(sizeof(size_t) + i*sizeof(size_t)); config_file_.write(reinterpret_cast<const char*>(&curr_vlog_num), sizeof(size_t)); config_file_.flush(); break; } } config_file_.close(); config_file_latch_.unlock(); } void VlogSet::create_vlog(size_t vlog_num) { auto vlog_name = get_vlog_name(vlog_num); auto vlog_new = std::fstream(vlog_name, std::ios::out); vlog_new.close(); // 重新以读写模式打开 vlog_new = std::fstream(vlog_name, std::ios::in | std::ios::out); if (!vlog_new.is_open()) { std::cerr << "Failed to open or create the vlog file: " << vlog_name << std::endl; std::exit(EXIT_FAILURE); } size_t tmp[2] = {2*sizeof(size_t), 0}; vlog_new.seekp(0); vlog_new.write(reinterpret_cast<const char*>(tmp), sizeof(tmp)); vlog_new.flush(); vlog_new.close(); } inline void VlogSet::restore_vlog_inmaps(struct vlog_info *vi) { auto vlog_name = get_vlog_name(vi->vlog_num); vlog_info_map_[vlog_name] = vi; vlog_handler_map_[vlog_name] = new vlog_handler(); } inline void VlogSet::register_vlog_inmaps(size_t vlog_num, std::string &vlog_name) { auto vinfo = new vlog_info(vlog_num); auto vhandler = new vlog_handler(); vlog_info_map_[vlog_name] = vinfo; vlog_handler_map_[vlog_name] = vhandler; } void VlogSet::remove_vlog_file(std::string &vlog_name) { assert(!std::remove(vlog_name.c_str())); // vlog_handler_map_.erase(vlog_name); } inline std::string VlogSet::get_config_file_name() { return dbname + "_config_file"; } std::string VlogSet::get_vlog_name(size_t vlog_num) { return dbname + "_vlog_" + std::to_string(vlog_num); } struct vlog_info * VlogSet::get_vlog_info(size_t vlog_num) { return vlog_info_map_[get_vlog_name(vlog_num)]; } struct vlog_handler * VlogSet::get_vlog_handler(size_t vlog_num) { return vlog_handler_map_[get_vlog_name(vlog_num)]; } size_t VlogSet::serialize_data(const leveldb::Slice &buff, size_t slot_num, std::string &value) { const char *value_buff = buff.data(); const size_t off = sizeof(uint16_t) + sizeof(size_t); const size_t value_size = off + buff.size(); char data[value_size]; memcpy(data, &value_size, sizeof(uint16_t)); memcpy(data+sizeof(uint16_t), &slot_num, sizeof(size_t)); memcpy(data+off, value_buff, buff.size()); value = std::string(data, value_size); return value_size; } void VlogSet::deserialize_data(char *buff, std::string &value) { uint16_t value_size; memcpy(&value_size, buff, sizeof(uint16_t)); if (value_size & VALUE_DELE_MASK) { value = ""; return ; } value_size &= VALUE_SIZE_MASK; assert(value_size <= VALUE_BUFF_SIZE); const size_t off = sizeof(uint16_t)+sizeof(size_t); value = std::string(&buff[off], value_size-off); } void VlogSet::read_vlog_value(const struct slot_content &sc, std::string *value) { if (USING_VLOG_CACHE) { read_vlog_value_cache(sc, value); } else { read_vlog_value_direct(sc, value); } } void VlogSet::write_vlog_value(const struct slot_content &sc, size_t slot_num, const leveldb::Slice &value) { if (USING_VLOG_CACHE) { write_vlog_value_cache(sc, slot_num, value); } else { write_vlog_value_direct(sc, slot_num, value); } } uint16_t VlogSet::delete_vlog_value(const struct slot_content &sc) { uint16_t value_size; if (USING_VLOG_CACHE) { value_size = delete_vlog_value_cache(sc); } else { value_size = delete_vlog_value_direct(sc); } return value_size; } // single value: || value_size(uint16_t) | slot_num(size_t) || {field_nums(uint16_t), attr1, attr2, ... } | // single attr: | attr1_name_len(uint8_t) | attr1_name | attr1_len(uint16_t) | attr1 | void VlogSet::read_vlog_value_direct(const struct slot_content &sc, std::string *value) { auto vlog_name = get_vlog_name(sc.vlog_num); auto handler = std::fstream(vlog_name, std::ios::in | std::ios::out); handler.seekp(sc.value_offset); char value_buff[VALUE_BUFF_SIZE]; handler.read(value_buff, VALUE_BUFF_SIZE); handler.close(); std::string vlog_value; deserialize_data(value_buff, vlog_value); *value = vlog_value; // uint16_t value_size; // memcpy(&value_size, value_buff, sizeof(uint16_t)); // if (value_size & VALUE_DELE_MASK) { // *value = ""; // return ; // } // value_size &= VALUE_SIZE_MASK; // assert(value_size <= VALUE_BUFF_SIZE); // // const size_t off = sizeof(uint16_t)+sizeof(size_t); // *value = std::string(&value_buff[off], value_size-off); } // single value: || value_size(uint16_t) | slot_num(size_t) || {field_nums(uint16_t), attr1, attr2, ... } | // single attr: | attr1_name_len(uint8_t) | attr1_name | attr1_len(uint16_t) | attr1 | void VlogSet::write_vlog_value_direct(const struct slot_content &sc, size_t slot_num, const leveldb::Slice &value) { auto vlog_name = get_vlog_name(sc.vlog_num); auto handler = std::fstream(vlog_name, std::ios::in | std::ios::out); handler.seekp(sc.value_offset); std::string vlog_value; auto value_size = serialize_data(value, slot_num, vlog_value); const char *data = vlog_value.data(); // const char *value_buff = value.data(); // // const size_t off = sizeof(uint16_t) + sizeof(size_t); // const size_t value_size = off + value.size(); // char data[value_size]; // memcpy(data, &value_size, sizeof(uint16_t)); // memcpy(data+sizeof(uint16_t), &slot_num, sizeof(size_t)); // memcpy(data+off, value_buff, value.size()); handler.write(data, value_size); handler.flush(); handler.close(); } uint16_t VlogSet::delete_vlog_value_direct(const struct slot_content &sc) { auto vinfo = get_vlog_info(sc.vlog_num); auto vlog_name = get_vlog_name(sc.vlog_num); auto handler = std::fstream(vlog_name, std::ios::in | std::ios::out); handler.seekp(sc.value_offset); char value_buff[VALUE_BUFF_SIZE]; handler.read(value_buff, VALUE_BUFF_SIZE); uint16_t value_size; memcpy(&value_size, value_buff, sizeof(uint16_t)); if (value_size & VALUE_DELE_MASK) { // case when value has been deleted handler.close(); return value_size & VALUE_SIZE_MASK; } assert(!(value_size & VALUE_DELE_MASK)); uint16_t masked_value_size = value_size | (uint16_t)VALUE_DELE_MASK; memcpy(value_buff, &masked_value_size, sizeof(uint16_t)); handler.seekp(sc.value_offset); handler.write(value_buff, sizeof(uint16_t)); handler.flush(); handler.close(); return value_size; } void VlogSet::read_vlog_value_cache(const struct slot_content &sc, std::string *value) { auto vlog_name = get_vlog_name(sc.vlog_num); std::string tmp_value; vlog_cache->read_data(vlog_name, tmp_value, sc.value_offset, sizeof(uint16_t)); uint16_t value_len; memcpy(&value_len, tmp_value.data(), sizeof(uint16_t)); if (value_len & VALUE_DELE_MASK) { *value = ""; return ; } assert(!(value_len & VALUE_DELE_MASK)); std::string value_str; vlog_cache->read_data(vlog_name, value_str, sc.value_offset, value_len); std::string vlog_value; char *value_buff = const_cast<char*>(value_str.data()); deserialize_data(value_buff, vlog_value); *value = vlog_value; } void VlogSet::write_vlog_value_cache(const struct slot_content &sc, size_t slot_num, const leveldb::Slice &value) { auto vlog_name = get_vlog_name(sc.vlog_num); auto value_str = value.ToString(); std::string vlog_value; auto value_size = serialize_data(value.data(), slot_num, vlog_value); vlog_cache->write_data(vlog_name, vlog_value, sc.value_offset, value_size); } uint16_t VlogSet::delete_vlog_value_cache(const struct slot_content &sc) { auto vlog_name = get_vlog_name(sc.vlog_num); return vlog_cache->delete_data(vlog_name, sc.value_offset); } // single value: || value_size(uint16_t) | slot_num(size_t) || {field_nums(uint16_t), attr1, attr2, ... } | // single attr: | attr1_name_len(uint8_t) | attr1_name | attr1_len(uint16_t) | attr1 | void VlogSet::mark_del_value(const struct slot_content &sc) { auto vinfo = get_vlog_info(sc.vlog_num); auto value_size = delete_vlog_value(sc); // auto vlog_name = get_vlog_name(sc.vlog_num); // auto handler = std::fstream(vlog_name, std::ios::in | std::ios::out); // handler.seekp(sc.value_offset); // char value_buff[VALUE_BUFF_SIZE]; // handler.read(value_buff, VALUE_BUFF_SIZE); // // uint16_t value_size; // memcpy(&value_size, value_buff, sizeof(uint16_t)); // if (value_size & VALUE_DELE_MASK) { // // case when value has been deleted // handler.close(); // return ; // } // assert(!(value_size & VALUE_DELE_MASK)); // uint16_t masked_value_size = value_size | (uint16_t)VALUE_DELE_MASK; // memcpy(value_buff, &masked_value_size, sizeof(uint16_t)); // handler.seekp(sc.value_offset); // handler.write(value_buff, sizeof(uint16_t)); // handler.flush(); // handler.close(); // handle gc, mtx is locked outside, vlog_info_latch and vlog hard lock is locked outside too vinfo->discard ++; vinfo->value_nums --; vinfo->curr_size -= value_size; // FIXME: gc process, avoid repeated gc if (vlog_need_gc(sc.vlog_num) && !vinfo->processing_gc) { // create new vlog vinfo->processing_gc = true; vinfo->vlog_num_for_gc = register_new_vlog(); vlog_gc->do_gc(sc.vlog_num, vinfo->vlog_num_for_gc); } }