@ -0,0 +1,52 @@ | |||||
// | |||||
// Created by 马也驰 on 2024/12/28. | |||||
// | |||||
#ifndef LEVELDB_SHARED_LOCK_H | |||||
#define LEVELDB_SHARED_LOCK_H | |||||
#include <mutex> | |||||
class SharedLock { | |||||
public: | |||||
SharedLock() : read_count_(0) {} | |||||
public: | |||||
void soft_lock() { | |||||
write_latch_.lock(); | |||||
read_count_latch_.lock(); | |||||
if (!read_count_) { | |||||
read_latch_.lock(); | |||||
} | |||||
read_count_ ++; | |||||
read_count_latch_.unlock(); | |||||
write_latch_.unlock(); | |||||
} | |||||
void soft_unlock() { | |||||
read_count_latch_.lock(); | |||||
read_count_ --; | |||||
if (!read_count_) { | |||||
read_latch_.unlock(); | |||||
} | |||||
read_count_latch_.unlock(); | |||||
} | |||||
void hard_lock() { | |||||
read_latch_.lock(); | |||||
write_latch_.lock(); | |||||
read_latch_.unlock(); | |||||
} | |||||
void hard_unlock() { | |||||
write_latch_.unlock(); | |||||
} | |||||
private: | |||||
std::mutex read_latch_; // indicate if any read is proceeding | |||||
std::mutex read_count_latch_; | |||||
volatile size_t read_count_; | |||||
std::mutex write_latch_; | |||||
// volatile size_t write_count_; | |||||
}; | |||||
#endif // LEVELDB_SHARED_LOCK_H |
@ -0,0 +1,67 @@ | |||||
// | |||||
// Created by 马也驰 on 2024/12/28. | |||||
// | |||||
#ifndef LEVELDB_THREADPOOL_H | |||||
#define LEVELDB_THREADPOOL_H | |||||
#include <iostream> | |||||
#include <vector> | |||||
#include <thread> | |||||
#include <queue> | |||||
#include <functional> | |||||
#include <condition_variable> | |||||
#include <atomic> | |||||
class ThreadPool { | |||||
public: | |||||
explicit ThreadPool(size_t numThreads) : stop(false) { | |||||
for (size_t i = 0; i < numThreads; ++i) { | |||||
workers.emplace_back([this]() { | |||||
while (true) { | |||||
std::function<void()> task; | |||||
{ | |||||
std::unique_lock<std::mutex> lock(queueMutex); | |||||
condition.wait(lock, [this]() { return stop || !tasks.empty(); }); | |||||
if (stop && tasks.empty()) return; | |||||
task = std::move(tasks.front()); | |||||
tasks.pop(); | |||||
} | |||||
task(); | |||||
} | |||||
}); | |||||
} | |||||
} | |||||
~ThreadPool() { | |||||
{ | |||||
std::unique_lock<std::mutex> lock(queueMutex); | |||||
stop = true; | |||||
} | |||||
condition.notify_all(); | |||||
for (std::thread &worker : workers) { | |||||
worker.join(); | |||||
} | |||||
} | |||||
void enqueue(std::function<void()> task) { | |||||
{ | |||||
std::unique_lock<std::mutex> lock(queueMutex); | |||||
tasks.push(std::move(task)); | |||||
} | |||||
condition.notify_one(); | |||||
} | |||||
private: | |||||
std::vector<std::thread> workers; | |||||
std::queue<std::function<void()>> tasks; | |||||
std::mutex queueMutex; | |||||
std::condition_variable condition; | |||||
std::atomic<bool> stop; | |||||
}; | |||||
#endif // LEVELDB_THREADPOOL_H |
@ -0,0 +1,378 @@ | |||||
// | |||||
// Created by 马也驰 on 2024/12/26. | |||||
// | |||||
#ifndef LEVELDB_VLOG_H | |||||
#define LEVELDB_VLOG_H | |||||
#include <cstdlib> | |||||
#include <fstream> | |||||
#include <iostream> | |||||
#include <vector> | |||||
#include <cstdio> | |||||
#include "../db/threadpool.h" | |||||
#include "../include/leveldb/slice.h" | |||||
#include "../db/slotpage.h" | |||||
#include "../db/shared_lock.h" | |||||
// config file: | vlog_nums_(size_t) | vlog_1_name | vlog_2_name | ... | | |||||
// vlog: | curr_size(size_t) | value_nums(size_t) | value1 | value2 | ... | | |||||
// value: value_len(uint16_t) | slot_num(size_t) | value | | |||||
#define vlog_num_size sizeof(size_t) | |||||
#define vlog_name_size 256 | |||||
#define KiB 1024 | |||||
#define MiB 1024*KiB | |||||
#define VLOG_SIZE 32*MiB | |||||
#define GC_THREDHOLD 0.5 | |||||
#define VALUE_BUFF_SIZE 0x7fff // value size cannot exceed this number | |||||
#define VALUE_SIZE_MASK 0x7fff | |||||
#define VALUE_DELE_MASK 0x8000 | |||||
//struct vlog_name { | |||||
// int name_size = 0; | |||||
// char name[vlog_name_size]; | |||||
// vlog_name() { | |||||
// memset(name, '\n', sizeof(name)); | |||||
// } | |||||
// vlog_name(std::string &_name) { | |||||
// memset(name, '\n', sizeof(name)); | |||||
// memcpy(name, _name.c_str(), _name.size()); | |||||
// name_size = _name.size(); | |||||
// } | |||||
//}; | |||||
struct vlog_info { | |||||
std::mutex vlog_info_latch_; // 保护对vlog_info本身的并发修改 | |||||
size_t vlog_num; | |||||
size_t vlog_num_for_gc; // set when start gc | |||||
bool vlog_valid_; | |||||
size_t discard; | |||||
size_t value_nums; | |||||
size_t curr_size; | |||||
vlog_info(size_t vlog_num) : vlog_valid_(true), discard(0), value_nums(0), | |||||
vlog_num(vlog_num), curr_size(2*sizeof(size_t)) {} | |||||
vlog_info(size_t vlog_num, size_t value_nums, size_t curr_size) : vlog_valid_(true), discard(0), value_nums(value_nums), | |||||
vlog_num(vlog_num), curr_size(curr_size) {} | |||||
}; | |||||
struct vlog_handler { | |||||
SharedLock vlog_latch_; // 表明当前vlog上的并发情况,读上soft_lock,写上hard_lock | |||||
vlog_handler() {} | |||||
}; | |||||
//class VlogSet; | |||||
// | |||||
// | |||||
//class VlogGC { | |||||
//#define THREADNUM 8 | |||||
// public: | |||||
// VlogGC(SlotPage *s, VlogSet *vs) : thread_pool_(new ThreadPool(THREADNUM)), | |||||
// slot_page_(s), vlog_set(vs) {} | |||||
// public: | |||||
// void do_gc(size_t old_vlog_num, std::string old_vlog_name, std::string new_vlog_name, | |||||
// struct vlog_info *old_vlog_info, struct vlog_info *new_vlog_info){ | |||||
// thread_pool_->enqueue([this, old_vlog_num, old_vlog_name, new_vlog_name, old_vlog_info, new_vlog_info]() | |||||
// { this->exec_gc(old_vlog_num, old_vlog_name, new_vlog_name, old_vlog_info, new_vlog_info); }); | |||||
// } | |||||
// | |||||
// private: | |||||
// void exec_gc(size_t old_vlog_num, std::string old_vlog_name, std::string new_vlog_name, | |||||
// struct vlog_info *old_vlog_info, struct vlog_info *new_vlog_info) { | |||||
// | |||||
// auto old_vlog = std::fstream(old_vlog_name, std::ios::in | std::ios::out); | |||||
// auto new_vlog = std::fstream(new_vlog_name, std::ios::in | std::ios::out); | |||||
// | |||||
// size_t value_nums = old_vlog_info->value_nums; | |||||
// char old_vlog_buff[VLOG_SIZE]; | |||||
// char new_vlog_buff[VLOG_SIZE]; | |||||
// old_vlog.seekp(0); | |||||
// old_vlog.read(old_vlog_buff, VLOG_SIZE); | |||||
// memcpy(new_vlog_buff, &value_nums, sizeof(size_t)); | |||||
// | |||||
// size_t ovb_off = sizeof(size_t); | |||||
// size_t nvb_off = sizeof(size_t); | |||||
// for (auto i = 0; i < value_nums; i++) { | |||||
// char *value = &old_vlog_buff[ovb_off]; | |||||
// uint16_t value_len = get_value_len(value); | |||||
// size_t slot_num = get_value_slotnum(value); | |||||
// if (!value_deleted(value_len)) { | |||||
// memcpy(&new_vlog_buff[nvb_off], &old_vlog_buff[ovb_off], value_len); | |||||
// struct slot_content scn(new_vlog_info->vlog_num, nvb_off); | |||||
// slot_page_->set_slot(slot_num, &scn); | |||||
// nvb_off += value_len; | |||||
// } | |||||
// ovb_off += value_len; | |||||
// } | |||||
// new_vlog.write(new_vlog_buff, VLOG_SIZE); | |||||
// new_vlog.flush(); | |||||
// | |||||
// old_vlog.close(); | |||||
// new_vlog.close(); | |||||
// | |||||
// this->vlog_set->remove_from_config_file(old_vlog_name); | |||||
// } | |||||
// private: | |||||
// inline bool value_deleted(uint16_t value_len) { | |||||
// return !(value_len >> 15); | |||||
// } | |||||
// inline uint16_t get_value_len(char *value) { | |||||
// uint16_t value_len; | |||||
// memcpy(&value_len, value, sizeof(uint16_t)); | |||||
// return value_len; | |||||
// } | |||||
// inline size_t get_value_slotnum(char *value) { | |||||
// size_t slot_num; | |||||
// memcpy(&slot_num, &value[sizeof(uint16_t)], sizeof(size_t)); | |||||
// return slot_num; | |||||
// } | |||||
// private: | |||||
// SlotPage *slot_page_; | |||||
// ThreadPool *thread_pool_; | |||||
// VlogSet *vlog_set; // inline void remove_vlog_from_maps(std::string &vlog_name) | |||||
//}; | |||||
//class VlogSet { | |||||
//// value: value_len(uint16_t) | slot_num(size_t) | value | |||||
//// vlog: curr_size(size_t) | value_nums(size_t) | value1 | value2 | ... | | |||||
//#define VALUE_BUFF_SIZE 0x7fff | |||||
//#define VALUE_SIZE_MASK 0x7fff | |||||
//#define VALUE_DELE_MASK 0x8000 | |||||
// | |||||
//friend class VlogGC; | |||||
// | |||||
//public: | |||||
// void get_value(uint32_t vlog_num, uint32_t value_offset, std::string *value) { | |||||
// mtx.lock(); | |||||
// auto vinfo = get_vlog_info(vlog_num); | |||||
// auto vhandler = get_vlog_handler(vlog_num); | |||||
// | |||||
// vinfo->vlog_info_latch_.lock(); | |||||
// if (!vinfo->vlog_valid_) { | |||||
// vhandler = get_vlog_handler(vinfo->vlog_num_for_gc); | |||||
// } | |||||
// vhandler->vlog_latch_.soft_lock(); | |||||
// vinfo->vlog_info_latch_.unlock(); | |||||
// read_vlog_value(vlog_num, value_offset, value); | |||||
// vhandler->vlog_latch_.soft_unlock(); | |||||
// mtx.unlock(); | |||||
// } | |||||
// void put_value(uint32_t vlog_num, uint32_t value_offset, leveldb::Slice &value) { | |||||
// mtx.lock(); | |||||
// auto vinfo = get_vlog_info(vlog_num); | |||||
// auto vhandler = get_vlog_handler(vlog_num); | |||||
// | |||||
// vinfo->vlog_info_latch_.lock(); | |||||
// if (!vinfo->vlog_valid_) { | |||||
// vhandler = get_vlog_handler(vinfo->vlog_num_for_gc); | |||||
// } | |||||
// vhandler->vlog_latch_.hard_lock(); | |||||
// vinfo->vlog_info_latch_.unlock(); | |||||
// write_vlog_value(vlog_num, value_offset, value); | |||||
// vhandler->vlog_latch_.hard_unlock(); | |||||
// mtx.unlock(); | |||||
// } | |||||
// void del_value(uint32_t vlog_num, uint32_t value_offset) { | |||||
// mtx.lock(); | |||||
// auto vinfo = get_vlog_info(vlog_num); | |||||
// auto vhandler = get_vlog_handler(vlog_num); | |||||
// | |||||
// vinfo->vlog_info_latch_.lock(); | |||||
// if (!vinfo->vlog_valid_) { | |||||
// vhandler = get_vlog_handler(vinfo->vlog_num_for_gc); | |||||
// } | |||||
// vhandler->vlog_latch_.hard_lock(); | |||||
// mark_del_value(vlog_num, value_offset); | |||||
// | |||||
// vinfo->vlog_info_latch_.unlock(); | |||||
// vhandler->vlog_latch_.hard_unlock(); | |||||
// mtx.unlock(); | |||||
// } | |||||
// | |||||
//private: | |||||
// size_t register_new_vlog() { | |||||
// size_t vn = vlog_nums_; | |||||
// std::string vlog_name = get_vlog_name(vn); | |||||
// register_inconfig_file(vlog_name); | |||||
// create_vlog(vlog_name); | |||||
// auto vlog_new = new std::fstream(vlog_name, std::ios::in | std::ios::out); | |||||
// if (!vlog_new->is_open()) { | |||||
// std::cerr << "Failed to open or create the vlog file: " << vlog_new << std::endl; | |||||
// std::exit(EXIT_FAILURE); | |||||
// } | |||||
// register_vlog_inmaps(vn, vlog_name, vlog_new); | |||||
// vlog_nums_ ++; | |||||
// vlog_count_ ++; | |||||
// return vn; | |||||
// } | |||||
// void remove_old_vlog(size_t old_vlog_num) { | |||||
// // after gc, new_vlog has been created | |||||
// // new vlog should have been created here ! | |||||
// mtx.lock(); | |||||
// auto vi_old = get_vlog_info(old_vlog_num); | |||||
// vi_old->vlog_info_latch_.lock(); | |||||
// std::string old_vlog_name = get_vlog_name(old_vlog_num); | |||||
// remove_from_config_file(old_vlog_name); | |||||
// remove_vlog_from_maps(old_vlog_name); | |||||
// vi_old->vlog_valid_ = false; | |||||
// vi_old->vlog_info_latch_.unlock(); | |||||
// vlog_count_ --; | |||||
// mtx.unlock(); | |||||
// } | |||||
// bool vlog_need_gc(size_t vlog_num) { | |||||
// std::string vlog_name = get_vlog_name(vlog_num); | |||||
// auto vi = vlog_info_map_[vlog_name]; | |||||
// bool retval = vi->vlog_valid_ && (vi->discard/vi->value_nums >= GC_THREDHOLD); | |||||
// return retval; | |||||
// } | |||||
// | |||||
//private: | |||||
// void register_inconfig_file(std::string &vlog_name) { | |||||
// struct vlog_name vn(vlog_name); | |||||
// // config file: | vlog_nums_(size_t) | vlog_count_(size_t) | vlog_1_name | ... | | |||||
// // first size_t in config file indicates current vlog_nums_(size_t) | |||||
// // second size_t in config file indicates current vlog_count_(size_t) | |||||
// config_file_->seekp(2*sizeof(size_t) + vlog_nums_* vlog_name_size); | |||||
// config_file_->write(vn.name, vn.name_size); | |||||
// config_file_->flush(); | |||||
// } | |||||
// void remove_from_config_file(std::string &vlog_name) { | |||||
// char tmp[vlog_name_size]; | |||||
// size_t index = 0; | |||||
// config_file_->seekp(2*sizeof(size_t)); | |||||
// while (config_file_->read(tmp, vlog_name_size) || config_file_->gcount() > 0) { | |||||
// size_t length = 0; | |||||
// while (length < config_file_->gcount() && tmp[length] != '\n') { | |||||
// ++length; | |||||
// } | |||||
// std::string name(tmp, length); | |||||
// if (name == vlog_name) { | |||||
// memset(tmp, '\n', sizeof(tmp)); | |||||
// config_file_->seekp(2*sizeof(size_t) + index * vlog_name_size); | |||||
// config_file_->write(tmp, vlog_name_size); | |||||
// config_file_->flush(); | |||||
// break; | |||||
// } | |||||
// index ++; | |||||
// } | |||||
// } | |||||
// void create_vlog(std::string &vlog_name) { | |||||
// std::fstream *vlog_new = new std::fstream(vlog_name, std::ios::out); | |||||
// char tmp[2*sizeof(size_t)]; | |||||
// memset(tmp, 0, sizeof(tmp)); | |||||
// vlog_new->write(tmp, sizeof(tmp)); | |||||
// vlog_new->flush(); | |||||
// vlog_new->close(); | |||||
// } | |||||
// inline void register_vlog_inmaps(size_t vlog_num, std::string &vlog_name, std::fstream *handler) { | |||||
// vlog_info_map_[vlog_name] = new vlog_info(vlog_num); | |||||
// vlog_handler_map_[vlog_name] = new vlog_handler(); | |||||
// } | |||||
// inline void remove_vlog_from_maps(std::string &vlog_name) { | |||||
// auto vi = vlog_info_map_[vlog_name]; | |||||
//// auto vh = vlog_handler_map_[vlog_name]; | |||||
// vi->vlog_info_latch_.lock(); | |||||
// vi->vlog_valid_ = false; // TODO: change the slotpage vlog_num when get/put thread read this flag | |||||
//// vh->handler_->close(); | |||||
// assert(!std::remove(vlog_name.c_str())); | |||||
// vlog_handler_map_.erase(vlog_name); | |||||
// vi->vlog_info_latch_.unlock(); | |||||
// } | |||||
// inline std::string get_vlog_name(size_t vlog_num) { | |||||
// return dbname + "_vlog_" + std::to_string(vlog_num); | |||||
// } | |||||
// inline struct vlog_info *get_vlog_info(size_t vlog_num) { | |||||
// return vlog_info_map_[get_vlog_name(vlog_num)]; | |||||
// } | |||||
// inline struct vlog_handler *get_vlog_handler(size_t vlog_num) { | |||||
// return vlog_handler_map_[get_vlog_name(vlog_num)]; | |||||
// } | |||||
// void read_vlog_value(uint32_t vlog_num, uint32_t value_offset, std::string *value) { | |||||
// auto vlog_name = get_vlog_name(vlog_num); | |||||
// auto handler = std::fstream(vlog_name, std::ios::in | std::ios::out); | |||||
// handler.seekp(value_offset); | |||||
// char value_buff[VALUE_BUFF_SIZE]; | |||||
// handler.read(value_buff, VALUE_BUFF_SIZE); | |||||
// | |||||
// uint16_t value_size; | |||||
// memcpy(&value_size, value_buff, sizeof(uint16_t)); | |||||
// value_size &= VALUE_SIZE_MASK; | |||||
// value->assign(&value_buff[sizeof(uint16_t)], value_size); | |||||
// handler.close(); | |||||
// } | |||||
// void write_vlog_value(uint32_t vlog_num, uint32_t value_offset, leveldb::Slice &value) { | |||||
// auto vlog_name = get_vlog_name(vlog_num); | |||||
// auto handler = std::fstream(vlog_name, std::ios::in | std::ios::out); | |||||
// handler.seekp(value_offset); | |||||
// const char *value_buff = value.data(); | |||||
// handler.write(value_buff, value.size()); | |||||
// | |||||
// auto vinfo = get_vlog_info(vlog_num); | |||||
// vinfo->value_nums ++; | |||||
// vinfo->curr_size += value.size(); | |||||
// | |||||
// handler.flush(); | |||||
// handler.close(); | |||||
// } | |||||
// void mark_del_value(uint32_t vlog_num, uint32_t value_offset) { | |||||
// auto vinfo = get_vlog_info(vlog_num); | |||||
// auto vlog_name = get_vlog_name(vlog_num); | |||||
// auto handler = std::fstream(vlog_name, std::ios::in | std::ios::out); | |||||
// handler.seekp(value_offset); | |||||
// char value_buff[VALUE_BUFF_SIZE]; | |||||
// handler.read(value_buff, VALUE_BUFF_SIZE); | |||||
// | |||||
// uint16_t value_size; | |||||
// memcpy(&value_size, value_buff, sizeof(uint16_t)); | |||||
// if (value_size & VALUE_DELE_MASK) { | |||||
// // case when value has been deleted | |||||
// handler.close(); | |||||
// return ; | |||||
// } | |||||
// assert(!(value_size & VALUE_DELE_MASK)); | |||||
// uint16_t masked_value_size = value_size & 0xffff; | |||||
// memcpy(value_buff, &masked_value_size, sizeof(uint16_t)); | |||||
// handler.write(value_buff, value_size); | |||||
// handler.flush(); | |||||
// handler.close(); | |||||
// | |||||
// // handle gc, mtx is locked outside, vlog_info_latch and vlog hard lock is locked outside too | |||||
//// auto vinfo = get_vlog_info(vlog_num); | |||||
// vinfo->discard ++; | |||||
// vinfo->value_nums --; | |||||
// vinfo->curr_size -= value_size & VALUE_SIZE_MASK; | |||||
// if (vlog_need_gc(vlog_num)) { | |||||
// // create new vlog | |||||
// vinfo->vlog_num_for_gc = register_new_vlog(); | |||||
//// vinfo->vlog_valid_ = false; | |||||
// auto old_vlog_name = get_vlog_name(vlog_num); | |||||
// auto new_vlog_name = get_vlog_name(vinfo->vlog_num_for_gc); | |||||
// auto old_vlog_info = vinfo; | |||||
// auto new_vlog_info = get_vlog_info(vinfo->vlog_num_for_gc); | |||||
// vlog_gc->do_gc(vlog_num, old_vlog_name, new_vlog_name, old_vlog_info, new_vlog_info); | |||||
// } | |||||
// } | |||||
// | |||||
// | |||||
//private: | |||||
// std::mutex mtx; | |||||
// std::string dbname; | |||||
// size_t vlog_nums_; | |||||
// size_t vlog_count_; | |||||
// std::unordered_map<std::string, struct vlog_info *> vlog_info_map_; // TODO: 读到vlog已经失效时,要返回slotpage重新读取slotnum | |||||
// std::unordered_map<std::string, struct vlog_handler *> vlog_handler_map_; | |||||
// std::fstream *config_file_; | |||||
// | |||||
// VlogGC *vlog_gc; // init with slot_page | |||||
//// SlotPage *slot_page; | |||||
//}; | |||||
#endif // LEVELDB_VLOG_H |
@ -0,0 +1,68 @@ | |||||
// | |||||
// Created by 马也驰 on 2024/12/29. | |||||
// | |||||
#include "vlog_gc.h" | |||||
#include "../db/vlog_set.h" | |||||
void VlogGC::do_gc(size_t old_vlog_num, size_t new_vlog_num) { | |||||
thread_pool_->enqueue([this, old_vlog_num, new_vlog_num]() | |||||
{ this->exec_gc(old_vlog_num, new_vlog_num); }); | |||||
} | |||||
void VlogGC::exec_gc(size_t old_vlog_num, size_t new_vlog_num) { | |||||
auto old_vlog_name = vlog_set->get_vlog_name(old_vlog_num); | |||||
auto new_vlog_name = vlog_set->get_vlog_name(new_vlog_num); | |||||
auto old_vlog_info = vlog_set->get_vlog_info(old_vlog_num); | |||||
auto new_vlog_info = vlog_set->get_vlog_info(new_vlog_num); | |||||
auto old_vlog_handler = vlog_set->get_vlog_handler(old_vlog_num); | |||||
auto new_vlog_handler = vlog_set->get_vlog_handler(new_vlog_num); | |||||
old_vlog_handler->vlog_latch_.soft_lock(); | |||||
new_vlog_info->vlog_info_latch_.lock(); | |||||
new_vlog_handler->vlog_latch_.hard_lock(); | |||||
auto old_vlog = std::fstream(old_vlog_name, std::ios::in | std::ios::out); | |||||
auto new_vlog = std::fstream(new_vlog_name, std::ios::in | std::ios::out); | |||||
char old_vlog_buff[VLOG_SIZE]; | |||||
char new_vlog_buff[VLOG_SIZE]; | |||||
old_vlog.seekp(0); | |||||
old_vlog.read(old_vlog_buff, VLOG_SIZE); | |||||
// memcpy(new_vlog_buff, &value_nums, sizeof(size_t)); | |||||
size_t value_nums = old_vlog_info->value_nums; | |||||
size_t ovb_off = 2 * sizeof(size_t); | |||||
size_t nvb_off = 2 * sizeof(size_t); | |||||
size_t new_vlog_value_nums = 0; | |||||
for (auto i = 0; i < value_nums; i++) { | |||||
char *value = &old_vlog_buff[ovb_off]; | |||||
uint16_t value_len = get_value_len(value); | |||||
size_t slot_num = get_value_slotnum(value); | |||||
if (!value_deleted(value_len)) { | |||||
memcpy(&new_vlog_buff[nvb_off], &old_vlog_buff[ovb_off], value_len); | |||||
struct slot_content scn(new_vlog_info->vlog_num, nvb_off); | |||||
slot_page_->set_slot(slot_num, &scn); | |||||
nvb_off += value_len; | |||||
new_vlog_value_nums ++; | |||||
} | |||||
ovb_off += value_len; | |||||
} | |||||
new_vlog_info->value_nums = value_nums; | |||||
new_vlog_info->curr_size = nvb_off; | |||||
memcpy(new_vlog_buff, &nvb_off, sizeof(size_t)); | |||||
memcpy(&new_vlog_buff[sizeof(size_t)], &new_vlog_value_nums, sizeof(size_t)); | |||||
new_vlog.write(new_vlog_buff, VLOG_SIZE); | |||||
new_vlog.flush(); | |||||
old_vlog.close(); | |||||
new_vlog.close(); | |||||
old_vlog_handler->vlog_latch_.soft_unlock(); | |||||
new_vlog_info->vlog_info_latch_.unlock(); | |||||
new_vlog_handler->vlog_latch_.hard_unlock(); | |||||
vlog_set->remove_old_vlog(old_vlog_num); | |||||
} | |||||
@ -0,0 +1,46 @@ | |||||
// | |||||
// Created by 马也驰 on 2024/12/29. | |||||
// | |||||
#ifndef LEVELDB_VLOG_GC_H | |||||
#define LEVELDB_VLOG_GC_H | |||||
#include <string> | |||||
#include <mutex> | |||||
#include "../db/slotpage.h" | |||||
#include "../db/threadpool.h" | |||||
#include "../db/vlog.h" | |||||
// 前向声明 VlogSet | |||||
class VlogSet; | |||||
class VlogGC { | |||||
public: | |||||
VlogGC(SlotPage *s, VlogSet *vs) : slot_page_(s), vlog_set(vs) {} | |||||
void do_gc(size_t old_vlog_num, size_t new_vlog_num); | |||||
private: | |||||
void exec_gc(size_t old_vlog_num, size_t new_vlog_num); | |||||
inline bool value_deleted(uint16_t value_len) { | |||||
return !(value_len >> 15); | |||||
} | |||||
inline uint16_t get_value_len(char *value) { | |||||
uint16_t value_len; | |||||
memcpy(&value_len, value, sizeof(uint16_t)); | |||||
return value_len; | |||||
} | |||||
inline size_t get_value_slotnum(char *value) { | |||||
size_t slot_num; | |||||
memcpy(&slot_num, &value[sizeof(uint16_t)], sizeof(size_t)); | |||||
return slot_num; | |||||
} | |||||
SlotPage *slot_page_; | |||||
ThreadPool *thread_pool_; | |||||
VlogSet *vlog_set; // 仅声明为指针,具体定义放在 vlog_gc.cpp | |||||
}; | |||||
#endif // LEVELDB_VLOG_GC_H |
@ -0,0 +1,336 @@ | |||||
// | |||||
// Created by 马也驰 on 2024/12/29. | |||||
// | |||||
#include "vlog_set.h" | |||||
#include "../db/vlog_gc.h" | |||||
// config file: | vlog_nums_(size_t) | vlog_1_num(size_t) | vlog_2_num(size_t) | ... | | |||||
// vlog: | curr_size(size_t) | value_nums(size_t) | value1 | value2 | ... | | |||||
// value: { value_len(uint16_t) | slot_num(size_t) | value } | |||||
VlogSet::VlogSet(std::string dbname, VlogGC *vlog_gc) : dbname(dbname), vlog_gc(vlog_gc) { | |||||
auto cfname = get_config_file_name(); | |||||
this->config_file_ = new std::fstream(cfname, std::ios::in | std::ios::out); | |||||
if (!this->config_file_->is_open()) { | |||||
// config 文件不存在,尝试创建 | |||||
delete this->config_file_; | |||||
this->config_file_ = new std::fstream(cfname, std::ios::out); | |||||
this->config_file_->close(); | |||||
delete this->config_file_; | |||||
// 重新以读写模式打开 | |||||
this->config_file_ = new std::fstream(cfname, std::ios::in | std::ios::out); | |||||
this->vlog_nums_ = 0; | |||||
} else { | |||||
// config 文件存在 | |||||
size_t _vlog_nums_; | |||||
config_file_->seekp(0); | |||||
config_file_->read(reinterpret_cast<char*>(&_vlog_nums_), sizeof(size_t)); | |||||
this->vlog_nums_ = _vlog_nums_; | |||||
// 从 config file 中读取所有有效的vlog名字 | |||||
config_file_->seekp(sizeof(size_t)); | |||||
size_t tmp[_vlog_nums_]; | |||||
config_file_->read(reinterpret_cast<char*>(tmp), sizeof(tmp)); | |||||
for (auto i = 0; i < _vlog_nums_; i++) { | |||||
size_t curr_vlog_num = tmp[i]; | |||||
if (!(curr_vlog_num & CONFIG_FILE_DELE_MASK)) { | |||||
auto curr_vlog = std::fstream(get_vlog_name(curr_vlog_num), std::ios::in | std::ios::out); | |||||
size_t curr_vlog_header[2]; | |||||
curr_vlog.seekp(0); | |||||
curr_vlog.read(reinterpret_cast<char*>(curr_vlog_header), 2*sizeof(size_t)); | |||||
curr_vlog.close(); | |||||
restore_vlog_inmaps(new vlog_info(curr_vlog_num, tmp[1], tmp[0])); | |||||
} | |||||
} | |||||
} | |||||
if (!this->config_file_->is_open()) { | |||||
std::cerr << "Failed to open or create the db config file: " << cfname << std::endl; | |||||
std::exit(EXIT_FAILURE); | |||||
} | |||||
} | |||||
VlogSet::~VlogSet() { | |||||
config_file_->seekp(0); | |||||
config_file_->write(reinterpret_cast<const char*>(&vlog_nums_), sizeof(size_t)); | |||||
config_file_->flush(); | |||||
config_file_->close(); | |||||
for (auto & it : vlog_info_map_) { | |||||
struct vlog_info* vinfo = it.second; | |||||
// 使用 vlog_info* 进行操作 | |||||
size_t tmp[2] = {vinfo->curr_size, vinfo->value_nums}; | |||||
auto vlog_file_handler = std::fstream(get_vlog_name(vinfo->vlog_num), std::ios::in | std::ios::out); | |||||
vlog_file_handler.seekp(0); | |||||
vlog_file_handler.write(reinterpret_cast<const char*>(tmp), sizeof(tmp)); | |||||
vlog_file_handler.flush(); | |||||
vlog_file_handler.close(); | |||||
} | |||||
delete vlog_gc; | |||||
} | |||||
void VlogSet::get_value(uint32_t vlog_num, uint32_t value_offset, std::string *value) { | |||||
mtx.lock(); | |||||
auto vinfo = get_vlog_info(vlog_num); | |||||
auto vhandler = get_vlog_handler(vlog_num); | |||||
vinfo->vlog_info_latch_.lock(); | |||||
if (!vinfo->vlog_valid_) { | |||||
vhandler = get_vlog_handler(vinfo->vlog_num_for_gc); | |||||
} | |||||
vhandler->vlog_latch_.soft_lock(); | |||||
vinfo->vlog_info_latch_.unlock(); | |||||
read_vlog_value(vlog_num, value_offset, value); | |||||
vhandler->vlog_latch_.soft_unlock(); | |||||
mtx.unlock(); | |||||
} | |||||
struct vlog_info * VlogSet::get_writable_vlog_info(size_t value_size) { | |||||
for (auto it = vlog_info_map_.begin(); it != vlog_info_map_.end(); ++it) { | |||||
struct vlog_info* vinfo = it->second; | |||||
// 使用 vlog_info* 进行操作 | |||||
if (vinfo->curr_size + value_size <= VLOG_SIZE) { | |||||
return vinfo; | |||||
} | |||||
} | |||||
// 所有vlog已满,创建新vlog | |||||
return nullptr; | |||||
} | |||||
void VlogSet::put_value(uint32_t *vlog_num, uint32_t *value_offset, const leveldb::Slice &value) { | |||||
mtx.lock(); | |||||
auto vinfo = get_writable_vlog_info(value.size()); | |||||
if (!vinfo) { | |||||
// vlog全部已满,创建新的vlog | |||||
auto _vlog_num_ = register_new_vlog(); | |||||
vinfo = get_vlog_info(_vlog_num_); | |||||
} | |||||
vinfo->vlog_info_latch_.lock(); | |||||
*vlog_num = vinfo->vlog_num; | |||||
*value_offset = vinfo->curr_size; | |||||
vinfo->curr_size += value.size(); | |||||
vinfo->value_nums ++; | |||||
vinfo->vlog_info_latch_.unlock(); | |||||
auto vhandler = get_vlog_handler(*vlog_num); | |||||
vinfo->vlog_info_latch_.lock(); | |||||
if (!vinfo->vlog_valid_) { | |||||
vhandler = get_vlog_handler(vinfo->vlog_num_for_gc); | |||||
} | |||||
vhandler->vlog_latch_.hard_lock(); | |||||
vinfo->vlog_info_latch_.unlock(); | |||||
write_vlog_value(*vlog_num, *value_offset, value); | |||||
vhandler->vlog_latch_.hard_unlock(); | |||||
mtx.unlock(); | |||||
} | |||||
void VlogSet::del_value(uint32_t vlog_num, uint32_t value_offset) { | |||||
mtx.lock(); | |||||
auto vinfo = get_vlog_info(vlog_num); | |||||
auto vhandler = get_vlog_handler(vlog_num); | |||||
vinfo->vlog_info_latch_.lock(); | |||||
if (!vinfo->vlog_valid_) { | |||||
// auto vlog_num_gc = vinfo->vlog_num_for_gc; | |||||
vhandler = get_vlog_handler(vinfo->vlog_num_for_gc); | |||||
// vinfo->vlog_info_latch_.unlock(); | |||||
// vinfo = get_vlog_info(vlog_num_gc); | |||||
// vinfo->vlog_info_latch_.lock(); | |||||
} | |||||
vhandler->vlog_latch_.hard_lock(); | |||||
mark_del_value(vlog_num, value_offset); | |||||
vinfo->vlog_info_latch_.unlock(); | |||||
vhandler->vlog_latch_.hard_unlock(); | |||||
mtx.unlock(); | |||||
} | |||||
size_t VlogSet::register_new_vlog() { | |||||
size_t vn = vlog_nums_; | |||||
std::string vlog_name = get_vlog_name(vn); | |||||
register_inconfig_file(vn); | |||||
create_vlog(vlog_name); | |||||
// auto vlog_new = new std::fstream(vlog_name, std::ios::in | std::ios::out); | |||||
// if (!vlog_new->is_open()) { | |||||
// std::cerr << "Failed to open or create the vlog file: " << vlog_new << std::endl; | |||||
// std::exit(EXIT_FAILURE); | |||||
// } | |||||
register_vlog_inmaps(vn, vlog_name); | |||||
vlog_nums_ ++; | |||||
// vlog_count_ ++; | |||||
return vn; | |||||
} | |||||
void VlogSet::remove_old_vlog(size_t old_vlog_num) { | |||||
// after gc, new_vlog has been created | |||||
// new vlog should have been created here ! | |||||
mtx.lock(); | |||||
auto vi_old = get_vlog_info(old_vlog_num); | |||||
vi_old->vlog_info_latch_.lock(); | |||||
std::string old_vlog_name = get_vlog_name(old_vlog_num); | |||||
remove_from_config_file(old_vlog_num); | |||||
remove_vlog_from_maps(old_vlog_name); | |||||
vi_old->vlog_valid_ = false; | |||||
vi_old->vlog_info_latch_.unlock(); | |||||
// vlog_count_ --; | |||||
mtx.unlock(); | |||||
} | |||||
bool VlogSet::vlog_need_gc(size_t vlog_num) { | |||||
std::string vlog_name = get_vlog_name(vlog_num); | |||||
auto vi = vlog_info_map_[vlog_name]; | |||||
bool retval = vi->vlog_valid_ && (vi->discard/vi->value_nums >= GC_THREDHOLD); | |||||
return retval; | |||||
} | |||||
void VlogSet::register_inconfig_file(size_t vlog_num) { | |||||
// config file: | vlog_nums_(size_t) | vlog_count_(size_t) | vlog_1_name | ... | | |||||
// first size_t in config file indicates current vlog_nums_(size_t) | |||||
// second size_t in config file indicates current vlog_count_(size_t) | |||||
config_file_->seekp(sizeof(size_t) + vlog_nums_* sizeof(size_t)); | |||||
config_file_->write(reinterpret_cast<const char*>(&vlog_num), sizeof(size_t)); | |||||
config_file_->flush(); | |||||
} | |||||
void VlogSet::remove_from_config_file(size_t vlog_num) { | |||||
char tmp[vlog_nums_*vlog_num_size]; | |||||
size_t index = 0; | |||||
config_file_->seekp(sizeof(size_t)); | |||||
config_file_->read(tmp, sizeof(tmp)); | |||||
size_t *vlog_num_ptr = reinterpret_cast<size_t*>(tmp); | |||||
for (auto i = 0; i < vlog_nums_; i++) { | |||||
size_t curr_vlog_num = *vlog_num_ptr; | |||||
if (!(curr_vlog_num & CONFIG_FILE_DELE_MASK) && curr_vlog_num == vlog_num) { | |||||
curr_vlog_num &= CONFIG_FILE_DELE_MASK; | |||||
config_file_->seekp(sizeof(size_t) + i*sizeof(size_t)); | |||||
config_file_->write(reinterpret_cast<const char*>(&curr_vlog_num), sizeof(size_t)); | |||||
config_file_->flush(); | |||||
break; | |||||
} | |||||
} | |||||
} | |||||
void VlogSet::create_vlog(std::string &vlog_name) { | |||||
std::fstream *vlog_new = new std::fstream(vlog_name, std::ios::out); | |||||
char tmp[2*sizeof(size_t)]; | |||||
memset(tmp, 0, sizeof(tmp)); | |||||
vlog_new->write(tmp, sizeof(tmp)); | |||||
vlog_new->flush(); | |||||
vlog_new->close(); | |||||
} | |||||
inline void VlogSet::restore_vlog_inmaps(struct vlog_info *vi) { | |||||
auto vlog_name = get_vlog_name(vi->vlog_num); | |||||
vlog_info_map_[vlog_name] = vi; | |||||
vlog_handler_map_[vlog_name] = new vlog_handler(); | |||||
} | |||||
inline void VlogSet::register_vlog_inmaps(size_t vlog_num, std::string &vlog_name) { | |||||
vlog_info_map_[vlog_name] = new vlog_info(vlog_num); | |||||
vlog_handler_map_[vlog_name] = new vlog_handler(); | |||||
} | |||||
inline void VlogSet::remove_vlog_from_maps(std::string &vlog_name) { | |||||
auto vi = vlog_info_map_[vlog_name]; | |||||
// auto vh = vlog_handler_map_[vlog_name]; | |||||
vi->vlog_info_latch_.lock(); | |||||
vi->vlog_valid_ = false; // TODO: change the slotpage vlog_num when get/put thread read this flag | |||||
// vh->handler_->close(); | |||||
assert(!std::remove(vlog_name.c_str())); | |||||
vlog_handler_map_.erase(vlog_name); | |||||
vi->vlog_info_latch_.unlock(); | |||||
} | |||||
inline std::string VlogSet::get_config_file_name() { | |||||
return dbname + "_config_file"; | |||||
} | |||||
std::string VlogSet::get_vlog_name(size_t vlog_num) { | |||||
return dbname + "_vlog_" + std::to_string(vlog_num); | |||||
} | |||||
struct vlog_info * VlogSet::get_vlog_info(size_t vlog_num) { | |||||
return vlog_info_map_[get_vlog_name(vlog_num)]; | |||||
} | |||||
struct vlog_handler * VlogSet::get_vlog_handler(size_t vlog_num) { | |||||
return vlog_handler_map_[get_vlog_name(vlog_num)]; | |||||
} | |||||
void VlogSet::read_vlog_value(uint32_t vlog_num, uint32_t value_offset, std::string *value) { | |||||
auto vlog_name = get_vlog_name(vlog_num); | |||||
auto handler = std::fstream(vlog_name, std::ios::in | std::ios::out); | |||||
handler.seekp(value_offset); | |||||
char value_buff[VALUE_BUFF_SIZE]; | |||||
handler.read(value_buff, VALUE_BUFF_SIZE); | |||||
uint16_t value_size; | |||||
memcpy(&value_size, value_buff, sizeof(uint16_t)); | |||||
value_size &= VALUE_SIZE_MASK; | |||||
value->assign(&value_buff[sizeof(uint16_t)], value_size); | |||||
handler.close(); | |||||
} | |||||
void VlogSet::write_vlog_value(uint32_t vlog_num, uint32_t value_offset, const leveldb::Slice &value) { | |||||
auto vlog_name = get_vlog_name(vlog_num); | |||||
auto handler = std::fstream(vlog_name, std::ios::in | std::ios::out); | |||||
handler.seekp(value_offset); | |||||
const char *value_buff = value.data(); | |||||
handler.write(value_buff, value.size()); | |||||
auto vinfo = get_vlog_info(vlog_num); | |||||
vinfo->value_nums ++; | |||||
vinfo->curr_size += value.size(); | |||||
handler.flush(); | |||||
handler.close(); | |||||
} | |||||
void VlogSet::mark_del_value(uint32_t vlog_num, uint32_t value_offset) { | |||||
auto vinfo = get_vlog_info(vlog_num); | |||||
auto vlog_name = get_vlog_name(vlog_num); | |||||
auto handler = std::fstream(vlog_name, std::ios::in | std::ios::out); | |||||
handler.seekp(value_offset); | |||||
char value_buff[VALUE_BUFF_SIZE]; | |||||
handler.read(value_buff, VALUE_BUFF_SIZE); | |||||
uint16_t value_size; | |||||
memcpy(&value_size, value_buff, sizeof(uint16_t)); | |||||
if (value_size & VALUE_DELE_MASK) { | |||||
// case when value has been deleted | |||||
handler.close(); | |||||
return ; | |||||
} | |||||
assert(!(value_size & VALUE_DELE_MASK)); | |||||
uint16_t masked_value_size = value_size & 0xffff; | |||||
memcpy(value_buff, &masked_value_size, sizeof(uint16_t)); | |||||
handler.write(value_buff, value_size); | |||||
handler.flush(); | |||||
handler.close(); | |||||
// handle gc, mtx is locked outside, vlog_info_latch and vlog hard lock is locked outside too | |||||
// auto vinfo = get_vlog_info(vlog_num); | |||||
vinfo->discard ++; | |||||
vinfo->value_nums --; | |||||
vinfo->curr_size -= value_size & VALUE_SIZE_MASK; | |||||
if (vlog_need_gc(vlog_num)) { | |||||
// create new vlog | |||||
vinfo->vlog_valid_ = false; | |||||
vinfo->vlog_num_for_gc = register_new_vlog(); | |||||
// vinfo->vlog_valid_ = false; | |||||
vlog_gc->do_gc(vlog_num, vinfo->vlog_num_for_gc); | |||||
} | |||||
} |
@ -0,0 +1,67 @@ | |||||
// | |||||
// Created by 马也驰 on 2024/12/29. | |||||
// | |||||
#ifndef LEVELDB_VLOG_SET_H | |||||
#define LEVELDB_VLOG_SET_H | |||||
#include <unordered_map> | |||||
#include <string> | |||||
#include <mutex> | |||||
#include <cassert> | |||||
#include "../include/leveldb/slice.h" | |||||
#include "../db/shared_lock.h" | |||||
#include "../db/vlog.h" | |||||
// 前向声明 VlogGC | |||||
class VlogGC; | |||||
class VlogSet { | |||||
friend class VlogGC; | |||||
#define CONFIG_FILE_DELE_MASK (0x1 << (sizeof(size_t)-1)) | |||||
#define CONFIG_FILE_VLOG_NUM(v) ((v) & ~CONFIG_FILE_DELE_MASK) | |||||
public: | |||||
VlogSet(std::string dbname, VlogGC *vlog_gc); | |||||
~VlogSet(); | |||||
void get_value(uint32_t vlog_num, uint32_t value_offset, std::string *value); | |||||
void put_value(uint32_t *vlog_num, uint32_t *value_offset, const leveldb::Slice &value); | |||||
void del_value(uint32_t vlog_num, uint32_t value_offset); | |||||
void set_vlog_gc(VlogGC *vg) { this->vlog_gc = vg; } | |||||
private: | |||||
size_t register_new_vlog(); | |||||
void remove_old_vlog(size_t old_vlog_num); | |||||
bool vlog_need_gc(size_t vlog_num); | |||||
void register_inconfig_file(size_t vlog_num); | |||||
void remove_from_config_file(size_t vlog_num); | |||||
void create_vlog(std::string &vlog_name); | |||||
struct vlog_info *get_writable_vlog_info(size_t value_size); | |||||
inline void restore_vlog_inmaps(struct vlog_info *vi); | |||||
inline void register_vlog_inmaps(size_t vlog_num, std::string &vlog_name); | |||||
inline void remove_vlog_from_maps(std::string &vlog_name); | |||||
inline std::string get_config_file_name(); | |||||
std::string get_vlog_name(size_t vlog_num); | |||||
struct vlog_info *get_vlog_info(size_t vlog_num); | |||||
struct vlog_handler *get_vlog_handler(size_t vlog_num); | |||||
void read_vlog_value(uint32_t vlog_num, uint32_t value_offset, std::string *value); | |||||
void write_vlog_value(uint32_t vlog_num, uint32_t value_offset, const leveldb::Slice &value); | |||||
void mark_del_value(uint32_t vlog_num, uint32_t value_offset); | |||||
private: | |||||
std::mutex mtx; | |||||
std::string dbname; | |||||
size_t vlog_nums_; | |||||
// size_t vlog_count_; | |||||
std::unordered_map<std::string, struct vlog_info *> vlog_info_map_; | |||||
std::unordered_map<std::string, struct vlog_handler *> vlog_handler_map_; | |||||
std::fstream *config_file_; | |||||
VlogGC *vlog_gc; // 仅声明为指针,具体定义放在 vlog_set.cpp | |||||
}; | |||||
#endif // LEVELDB_VLOG_SET_H |
@ -0,0 +1,127 @@ | |||||
// | |||||
// Created by 马也驰 on 2024/12/20. | |||||
// | |||||
#include "../db/slotpage.h" | |||||
#include <thread> | |||||
#define SLOTNUM 8192 | |||||
void printVector(const std::vector<size_t>& vec) { | |||||
for (size_t i = 0; i < vec.size(); ++i) { | |||||
std::cout << vec[i] << " "; | |||||
} | |||||
std::cout << std::endl; | |||||
} | |||||
int test1() { | |||||
BitMap bitmap; | |||||
std::vector<size_t> slots; | |||||
slots.reserve(SLOTNUM<<2); | |||||
int i = 0; | |||||
for (i = 0; i < 2047; i++) { | |||||
slots.push_back(bitmap.alloc_slot()); | |||||
} | |||||
slots.push_back(bitmap.alloc_slot()); | |||||
for (; i < SLOTNUM<<2; i++) { | |||||
slots.push_back(bitmap.alloc_slot()); | |||||
} | |||||
printVector(slots); | |||||
std::cout << "\n\n"; | |||||
for (i = 0; i < 100; i++) { | |||||
bitmap.dealloc_slot(slots.at(i)); | |||||
} | |||||
bitmap.dealloc_slot(10000); | |||||
bitmap.dealloc_slot(11000); | |||||
bitmap.dealloc_slot(12000); | |||||
bitmap.show_allocated_slot(); | |||||
std::cout << "\n\n"; | |||||
for (i = 0; i < 103; i++) { | |||||
bitmap.alloc_slot(); | |||||
} | |||||
bitmap.show_allocated_slot(); | |||||
return 1; | |||||
} | |||||
int test2() { | |||||
BitMap bitmap; | |||||
std::vector<size_t> slots; | |||||
slots.reserve(SLOTNUM); | |||||
for (auto i = 0; i < SLOTNUM; i++) { | |||||
slots.push_back(bitmap.alloc_slot()); | |||||
} | |||||
printVector(slots); | |||||
std::cout << "\n\n"; | |||||
bitmap.dealloc_slot(30); | |||||
bitmap.dealloc_slot(40); | |||||
bitmap.dealloc_slot(50); | |||||
bitmap.show_allocated_slot(); | |||||
std::cout << "\n\n"; | |||||
bitmap.alloc_slot(); | |||||
bitmap.alloc_slot(); | |||||
bitmap.show_allocated_slot(); | |||||
return 1; | |||||
} | |||||
void allocate_slots(BitMap& bitmap, std::vector<size_t>& slots, int num_slots) { | |||||
for (int i = 0; i < num_slots; i++) { | |||||
slots.push_back(bitmap.alloc_slot()); | |||||
} | |||||
} | |||||
int test3() { | |||||
BitMap bitmap; | |||||
std::vector<size_t> slots1; | |||||
slots1.reserve(1000); | |||||
std::vector<size_t> slots2; | |||||
slots2.reserve(1000); | |||||
// Create two threads to allocate slots concurrently. | |||||
std::thread thread1(allocate_slots, std::ref(bitmap), std::ref(slots1), 1000); | |||||
std::thread thread2(allocate_slots, std::ref(bitmap), std::ref(slots2), 1000); | |||||
// Wait for both threads to finish. | |||||
thread1.join(); | |||||
thread2.join(); | |||||
// Print the results. | |||||
std::cout << "Slots allocated by thread 1:\n"; | |||||
printVector(slots1); | |||||
std::cout << "\n\nSlots allocated by thread 2:\n"; | |||||
printVector(slots2); | |||||
std::cout << "\n\n"; | |||||
std::vector<size_t> combined_slots = slots1; | |||||
combined_slots.insert(combined_slots.end(), slots2.begin(), slots2.end()); | |||||
std::sort(combined_slots.begin(), combined_slots.end()); | |||||
// Print the sorted combined slots. | |||||
std::cout << "Sorted combined slots:\n"; | |||||
printVector(combined_slots); | |||||
std::cout << "\n\n"; | |||||
return 1; | |||||
} | |||||
int main() { | |||||
test3(); // 有问题 | |||||
} |
@ -0,0 +1,148 @@ | |||||
// | |||||
// Created by 马也驰 on 2024/12/22. | |||||
// | |||||
#include <iostream> | |||||
#include <thread> | |||||
#include <vector> | |||||
#include "../db/slotpage.h" | |||||
void printVector(const std::vector<uint32_t>& vec) { | |||||
for (size_t i = 0; i < vec.size(); ++i) { | |||||
std::cout << vec[i] << " "; | |||||
} | |||||
std::cout << std::endl; | |||||
} | |||||
int test1() { | |||||
SlotCache slot_cache((std::string&)"test_slotpage"); | |||||
const size_t blksize = 4096; | |||||
const size_t blknum = 32; | |||||
const size_t slot_content_num = blknum * blksize / sizeof(slot_content); | |||||
slot_content slot_contents[slot_content_num]; | |||||
for (int i = 0; i < slot_content_num; i++) { | |||||
slot_contents[i] = {static_cast<uint32_t>(i), static_cast<uint32_t>(i)}; | |||||
} | |||||
// | |||||
// slot_cache.write_for_test(reinterpret_cast<char*>(slot_contents), slot_content_num*sizeof(slot_content)); | |||||
slot_content sc; | |||||
for (int i = 0; i < slot_content_num; i++) { | |||||
slot_cache.get_slot(i, &sc); | |||||
assert(sc.vlog_num == slot_contents[i].vlog_num); | |||||
assert(sc.value_offset == slot_contents[i].vlog_num); | |||||
std::cout << i << std::endl; | |||||
} | |||||
return 1; | |||||
} | |||||
int test2() { | |||||
SlotCache slot_cache((std::string&)"test_slotpage"); | |||||
const size_t blksize = 4096; | |||||
const size_t blknum = 32; | |||||
const size_t slot_content_num = blknum * blksize / sizeof(slot_content); | |||||
slot_content slot_contents[slot_content_num]; | |||||
for (int i = 0; i < slot_content_num; i++) { | |||||
slot_contents[i] = {static_cast<uint32_t>(i), static_cast<uint32_t>(i)}; | |||||
} | |||||
// | |||||
// slot_cache.write_for_test(reinterpret_cast<char*>(slot_contents), slot_content_num*sizeof(slot_content)); | |||||
slot_content sc1 = {999999999, 999999999}; | |||||
// slot_cache.set_slot(100, &sc1); | |||||
// slot_cache.set_slot(1000, &sc1); | |||||
// slot_cache.set_slot(2000, &sc1); | |||||
slot_content sc2 = {0, 0}; | |||||
slot_cache.get_slot(100, &sc2); | |||||
assert(sc2.vlog_num == sc1.vlog_num); | |||||
assert(sc2.value_offset == sc1.value_offset); | |||||
slot_cache.get_slot(1000, &sc2); | |||||
assert(sc2.vlog_num == sc1.vlog_num); | |||||
assert(sc2.value_offset == sc1.value_offset); | |||||
slot_cache.get_slot(2000, &sc2); | |||||
assert(sc2.vlog_num == sc1.vlog_num); | |||||
assert(sc2.value_offset == sc1.value_offset); | |||||
// for (int i = 0; i < slot_content_num; i++) { | |||||
// slot_cache.get_slot(i, &sc); | |||||
// assert(sc.vlog_num == slot_contents[i].vlog_num); | |||||
// assert(sc.value_offset == slot_contents[i].vlog_num); | |||||
// std::cout << i << std::endl; | |||||
// } | |||||
return 1; | |||||
} | |||||
void process_slots(SlotCache& slot_cache, | |||||
const slot_content* slot_contents, | |||||
size_t start, size_t end, | |||||
std::vector<uint32_t>& slots) { | |||||
slot_content sc; | |||||
for (size_t i = start; i < end; ++i) { | |||||
slot_cache.get_slot(i, &sc); | |||||
// assert(sc.vlog_num == slot_contents[i].vlog_num); | |||||
// assert(sc.value_offset == slot_contents[i].vlog_num); | |||||
slots.push_back(sc.vlog_num); | |||||
} | |||||
} | |||||
int test3() { | |||||
SlotCache slot_cache((std::string&)("test_slotpage")); | |||||
const size_t blksize = 4096; | |||||
const size_t blknum = 32; | |||||
const size_t slot_content_num = blknum * blksize / sizeof(slot_content); | |||||
slot_content slot_contents[slot_content_num]; | |||||
for (size_t i = 0; i < slot_content_num; i++) { | |||||
slot_contents[i] = {static_cast<uint32_t>(i), static_cast<uint32_t>(i)}; | |||||
} | |||||
std::vector<uint32_t> slots1; | |||||
std::vector<uint32_t> slots2; | |||||
// 创建两个线程分别处理 slots1 和 slots2 | |||||
std::thread t1(process_slots, std::ref(slot_cache), slot_contents, 0, 100, std::ref(slots1)); | |||||
std::thread t2(process_slots, std::ref(slot_cache), slot_contents, 100, 200, std::ref(slots2)); | |||||
// 等待线程完成 | |||||
t1.join(); | |||||
t2.join(); | |||||
// 打印两个 vector 的内容 | |||||
std::cout << "Slots1:" << std::endl; | |||||
printVector(slots1); | |||||
std::cout << std::endl; | |||||
std::cout << "Slots2:" << std::endl; | |||||
printVector(slots2); | |||||
std::cout << std::endl; | |||||
// 合并并排序 | |||||
std::vector<uint32_t> merged_slots = slots1; | |||||
merged_slots.insert(merged_slots.end(), slots2.begin(), slots2.end()); | |||||
std::sort(merged_slots.begin(), merged_slots.end()); | |||||
// 打印合并排序后的结果 | |||||
std::cout << "Merged and Sorted Slots:" << std::endl; | |||||
printVector(merged_slots); | |||||
std::cout << std::endl; | |||||
return 1; | |||||
} | |||||
int main() { | |||||
test3(); | |||||
} |