// // Created by 马也驰 on 2024/12/26. // #ifndef LEVELDB_VLOG_H #define LEVELDB_VLOG_H #include #include #include #include #include #include "../include/leveldb/slice.h" #include "../db/slotpage.h" #include "../db/shared_lock.h" // config file: || vlog_nums_(size_t) || vlog_1_num(size_t) | vlog_2_num(size_t) | ... | // vlog: || curr_size(size_t) | value_nums(size_t) || value1 | value2 | ... | // single value: || value_size(uint16_t) | slot_num(size_t) || {field_nums(uint16_t), attr1, attr2, ... } | // single attr: | attr1_name_len(uint8_t) | attr1_name | attr1_len(uint16_t) | attr1 | // {field_nums(uint16_t), attr1, attr2, ... } is wrapped/unwrapped in dm_impl // || value_size(uint16_t) | slot_num(size_t) || VALUE: {field_nums(uint16_t), attr1, attr2, ... } | is wrapped/unwrapped in vlog_set #define vlog_num_size sizeof(size_t) #define KiB 1024 #define MiB 1024*KiB #define VLOG_SIZE 1*MiB // origin: 32*MiB #define GC_THREDHOLD 0.5 #define VALUE_BUFF_SIZE 0x7fff // value size cannot exceed this number #define VALUE_SIZE_MASK 0x7fff #define VALUE_DELE_MASK 0x8000 //struct vlog_name { // int name_size = 0; // char name[vlog_name_size]; // vlog_name() { // memset(name, '\n', sizeof(name)); // } // vlog_name(std::string &_name) { // memset(name, '\n', sizeof(name)); // memcpy(name, _name.c_str(), _name.size()); // name_size = _name.size(); // } //}; struct vlog_info { std::mutex vlog_info_latch_; // 保护对vlog_info本身的并发修改 size_t vlog_num; size_t vlog_num_for_gc; // set when start gc bool processing_gc;; // init to be false, set as true when processing gc bool vlog_valid_; // init to be true, set as false after deleted size_t discard; size_t value_nums; size_t curr_size; vlog_info(size_t vlog_num) : processing_gc(false), discard(0), value_nums(0), vlog_num(vlog_num), curr_size(2*sizeof(size_t)), vlog_valid_(true) {} vlog_info(size_t vlog_num, size_t value_nums, size_t curr_size) : processing_gc(false), discard(0), value_nums(value_nums), vlog_num(vlog_num), curr_size(curr_size), vlog_valid_(true) {} }; struct vlog_handler { SharedLock vlog_latch_; // 表明当前vlog上的并发情况,读上soft_lock,写上hard_lock vlog_handler() {} }; //class VlogSet; // // //class VlogGC { //#define THREADNUM 8 // public: // VlogGC(SlotPage *s, VlogSet *vs) : thread_pool_(new ThreadPool(THREADNUM)), // slot_page_(s), vlog_set(vs) {} // public: // void do_gc(size_t old_vlog_num, std::string old_vlog_name, std::string new_vlog_name, // struct vlog_info *old_vlog_info, struct vlog_info *new_vlog_info){ // thread_pool_->enqueue([this, old_vlog_num, old_vlog_name, new_vlog_name, old_vlog_info, new_vlog_info]() // { this->exec_gc(old_vlog_num, old_vlog_name, new_vlog_name, old_vlog_info, new_vlog_info); }); // } // // private: // void exec_gc(size_t old_vlog_num, std::string old_vlog_name, std::string new_vlog_name, // struct vlog_info *old_vlog_info, struct vlog_info *new_vlog_info) { // // auto old_vlog = std::fstream(old_vlog_name, std::ios::in | std::ios::out); // auto new_vlog = std::fstream(new_vlog_name, std::ios::in | std::ios::out); // // size_t value_nums = old_vlog_info->value_nums; // char old_vlog_buff[VLOG_SIZE]; // char new_vlog_buff[VLOG_SIZE]; // old_vlog.seekp(0); // old_vlog.read(old_vlog_buff, VLOG_SIZE); // memcpy(new_vlog_buff, &value_nums, sizeof(size_t)); // // size_t ovb_off = sizeof(size_t); // size_t nvb_off = sizeof(size_t); // for (auto i = 0; i < value_nums; i++) { // char *value = &old_vlog_buff[ovb_off]; // uint16_t value_len = get_value_len(value); // size_t slot_num = get_value_slotnum(value); // if (!value_deleted(value_len)) { // memcpy(&new_vlog_buff[nvb_off], &old_vlog_buff[ovb_off], value_len); // struct slot_content scn(new_vlog_info->vlog_num, nvb_off); // slot_page_->set_slot(slot_num, &scn); // nvb_off += value_len; // } // ovb_off += value_len; // } // new_vlog.write(new_vlog_buff, VLOG_SIZE); // new_vlog.flush(); // // old_vlog.close(); // new_vlog.close(); // // this->vlog_set->remove_from_config_file(old_vlog_name); // } // private: // inline bool value_deleted(uint16_t value_len) { // return !(value_len >> 15); // } // inline uint16_t get_value_len(char *value) { // uint16_t value_len; // memcpy(&value_len, value, sizeof(uint16_t)); // return value_len; // } // inline size_t get_value_slotnum(char *value) { // size_t slot_num; // memcpy(&slot_num, &value[sizeof(uint16_t)], sizeof(size_t)); // return slot_num; // } // private: // SlotPage *slot_page_; // ThreadPool *thread_pool_; // VlogSet *vlog_set; // inline void remove_vlog_from_maps(std::string &vlog_name) //}; //class VlogSet { //// value: value_len(uint16_t) | slot_num(size_t) | value //// vlog: curr_size(size_t) | value_nums(size_t) | value1 | value2 | ... | //#define VALUE_BUFF_SIZE 0x7fff //#define VALUE_SIZE_MASK 0x7fff //#define VALUE_DELE_MASK 0x8000 // //friend class VlogGC; // //public: // void get_value(uint32_t vlog_num, uint32_t value_offset, std::string *value) { // mtx.lock(); // auto vinfo = get_vlog_info(vlog_num); // auto vhandler = get_vlog_handler(vlog_num); // // vinfo->vlog_info_latch_.lock(); // if (!vinfo->vlog_valid_) { // vhandler = get_vlog_handler(vinfo->vlog_num_for_gc); // } // vhandler->vlog_latch_.soft_lock(); // vinfo->vlog_info_latch_.unlock(); // read_vlog_value(vlog_num, value_offset, value); // vhandler->vlog_latch_.soft_unlock(); // mtx.unlock(); // } // void put_value(uint32_t vlog_num, uint32_t value_offset, leveldb::Slice &value) { // mtx.lock(); // auto vinfo = get_vlog_info(vlog_num); // auto vhandler = get_vlog_handler(vlog_num); // // vinfo->vlog_info_latch_.lock(); // if (!vinfo->vlog_valid_) { // vhandler = get_vlog_handler(vinfo->vlog_num_for_gc); // } // vhandler->vlog_latch_.hard_lock(); // vinfo->vlog_info_latch_.unlock(); // write_vlog_value(vlog_num, value_offset, value); // vhandler->vlog_latch_.hard_unlock(); // mtx.unlock(); // } // void del_value(uint32_t vlog_num, uint32_t value_offset) { // mtx.lock(); // auto vinfo = get_vlog_info(vlog_num); // auto vhandler = get_vlog_handler(vlog_num); // // vinfo->vlog_info_latch_.lock(); // if (!vinfo->vlog_valid_) { // vhandler = get_vlog_handler(vinfo->vlog_num_for_gc); // } // vhandler->vlog_latch_.hard_lock(); // mark_del_value(vlog_num, value_offset); // // vinfo->vlog_info_latch_.unlock(); // vhandler->vlog_latch_.hard_unlock(); // mtx.unlock(); // } // //private: // size_t register_new_vlog() { // size_t vn = vlog_nums_; // std::string vlog_name = get_vlog_name(vn); // register_inconfig_file(vlog_name); // create_vlog(vlog_name); // auto vlog_new = new std::fstream(vlog_name, std::ios::in | std::ios::out); // if (!vlog_new->is_open()) { // std::cerr << "Failed to open or create the vlog file: " << vlog_new << std::endl; // std::exit(EXIT_FAILURE); // } // register_vlog_inmaps(vn, vlog_name, vlog_new); // vlog_nums_ ++; // vlog_count_ ++; // return vn; // } // void remove_old_vlog(size_t old_vlog_num) { // // after gc, new_vlog has been created // // new vlog should have been created here ! // mtx.lock(); // auto vi_old = get_vlog_info(old_vlog_num); // vi_old->vlog_info_latch_.lock(); // std::string old_vlog_name = get_vlog_name(old_vlog_num); // remove_from_config_file(old_vlog_name); // remove_vlog_from_maps(old_vlog_name); // vi_old->vlog_valid_ = false; // vi_old->vlog_info_latch_.unlock(); // vlog_count_ --; // mtx.unlock(); // } // bool vlog_need_gc(size_t vlog_num) { // std::string vlog_name = get_vlog_name(vlog_num); // auto vi = vlog_info_map_[vlog_name]; // bool retval = vi->vlog_valid_ && (vi->discard/vi->value_nums >= GC_THREDHOLD); // return retval; // } // //private: // void register_inconfig_file(std::string &vlog_name) { // struct vlog_name vn(vlog_name); // // config file: | vlog_nums_(size_t) | vlog_count_(size_t) | vlog_1_name | ... | // // first size_t in config file indicates current vlog_nums_(size_t) // // second size_t in config file indicates current vlog_count_(size_t) // config_file_->seekp(2*sizeof(size_t) + vlog_nums_* vlog_name_size); // config_file_->write(vn.name, vn.name_size); // config_file_->flush(); // } // void remove_from_config_file(std::string &vlog_name) { // char tmp[vlog_name_size]; // size_t index = 0; // config_file_->seekp(2*sizeof(size_t)); // while (config_file_->read(tmp, vlog_name_size) || config_file_->gcount() > 0) { // size_t length = 0; // while (length < config_file_->gcount() && tmp[length] != '\n') { // ++length; // } // std::string name(tmp, length); // if (name == vlog_name) { // memset(tmp, '\n', sizeof(tmp)); // config_file_->seekp(2*sizeof(size_t) + index * vlog_name_size); // config_file_->write(tmp, vlog_name_size); // config_file_->flush(); // break; // } // index ++; // } // } // void create_vlog(std::string &vlog_name) { // std::fstream *vlog_new = new std::fstream(vlog_name, std::ios::out); // char tmp[2*sizeof(size_t)]; // memset(tmp, 0, sizeof(tmp)); // vlog_new->write(tmp, sizeof(tmp)); // vlog_new->flush(); // vlog_new->close(); // } // inline void register_vlog_inmaps(size_t vlog_num, std::string &vlog_name, std::fstream *handler) { // vlog_info_map_[vlog_name] = new vlog_info(vlog_num); // vlog_handler_map_[vlog_name] = new vlog_handler(); // } // inline void remove_vlog_from_maps(std::string &vlog_name) { // auto vi = vlog_info_map_[vlog_name]; //// auto vh = vlog_handler_map_[vlog_name]; // vi->vlog_info_latch_.lock(); // vi->vlog_valid_ = false; // TODO: change the slotpage vlog_num when get/put thread read this flag //// vh->handler_->close(); // assert(!std::remove(vlog_name.c_str())); // vlog_handler_map_.erase(vlog_name); // vi->vlog_info_latch_.unlock(); // } // inline std::string get_vlog_name(size_t vlog_num) { // return dbname + "_vlog_" + std::to_string(vlog_num); // } // inline struct vlog_info *get_vlog_info(size_t vlog_num) { // return vlog_info_map_[get_vlog_name(vlog_num)]; // } // inline struct vlog_handler *get_vlog_handler(size_t vlog_num) { // return vlog_handler_map_[get_vlog_name(vlog_num)]; // } // void read_vlog_value(uint32_t vlog_num, uint32_t value_offset, std::string *value) { // auto vlog_name = get_vlog_name(vlog_num); // auto handler = std::fstream(vlog_name, std::ios::in | std::ios::out); // handler.seekp(value_offset); // char value_buff[VALUE_BUFF_SIZE]; // handler.read(value_buff, VALUE_BUFF_SIZE); // // uint16_t value_size; // memcpy(&value_size, value_buff, sizeof(uint16_t)); // value_size &= VALUE_SIZE_MASK; // value->assign(&value_buff[sizeof(uint16_t)], value_size); // handler.close(); // } // void write_vlog_value(uint32_t vlog_num, uint32_t value_offset, leveldb::Slice &value) { // auto vlog_name = get_vlog_name(vlog_num); // auto handler = std::fstream(vlog_name, std::ios::in | std::ios::out); // handler.seekp(value_offset); // const char *value_buff = value.data(); // handler.write(value_buff, value.size()); // // auto vinfo = get_vlog_info(vlog_num); // vinfo->value_nums ++; // vinfo->curr_size += value.size(); // // handler.flush(); // handler.close(); // } // void mark_del_value(uint32_t vlog_num, uint32_t value_offset) { // auto vinfo = get_vlog_info(vlog_num); // auto vlog_name = get_vlog_name(vlog_num); // auto handler = std::fstream(vlog_name, std::ios::in | std::ios::out); // handler.seekp(value_offset); // char value_buff[VALUE_BUFF_SIZE]; // handler.read(value_buff, VALUE_BUFF_SIZE); // // uint16_t value_size; // memcpy(&value_size, value_buff, sizeof(uint16_t)); // if (value_size & VALUE_DELE_MASK) { // // case when value has been deleted // handler.close(); // return ; // } // assert(!(value_size & VALUE_DELE_MASK)); // uint16_t masked_value_size = value_size & 0xffff; // memcpy(value_buff, &masked_value_size, sizeof(uint16_t)); // handler.write(value_buff, value_size); // handler.flush(); // handler.close(); // // // handle gc, mtx is locked outside, vlog_info_latch and vlog hard lock is locked outside too //// auto vinfo = get_vlog_info(vlog_num); // vinfo->discard ++; // vinfo->value_nums --; // vinfo->curr_size -= value_size & VALUE_SIZE_MASK; // if (vlog_need_gc(vlog_num)) { // // create new vlog // vinfo->vlog_num_for_gc = register_new_vlog(); //// vinfo->vlog_valid_ = false; // auto old_vlog_name = get_vlog_name(vlog_num); // auto new_vlog_name = get_vlog_name(vinfo->vlog_num_for_gc); // auto old_vlog_info = vinfo; // auto new_vlog_info = get_vlog_info(vinfo->vlog_num_for_gc); // vlog_gc->do_gc(vlog_num, old_vlog_name, new_vlog_name, old_vlog_info, new_vlog_info); // } // } // // //private: // std::mutex mtx; // std::string dbname; // size_t vlog_nums_; // size_t vlog_count_; // std::unordered_map vlog_info_map_; // TODO: 读到vlog已经失效时,要返回slotpage重新读取slotnum // std::unordered_map vlog_handler_map_; // std::fstream *config_file_; // // VlogGC *vlog_gc; // init with slot_page //// SlotPage *slot_page; //}; #endif // LEVELDB_VLOG_H