Author | SHA1 | Message | Date |
---|---|---|---|
|
ca25cec40a | Merge remote-tracking branch 'origin/master' | 8 months ago |
|
465719f1c4 | vlog | 9 months ago |
@ -0,0 +1,52 @@ | |||
// | |||
// Created by 马也驰 on 2024/12/28. | |||
// | |||
#ifndef LEVELDB_SHARED_LOCK_H | |||
#define LEVELDB_SHARED_LOCK_H | |||
#include <mutex> | |||
class SharedLock { | |||
public: | |||
SharedLock() : read_count_(0) {} | |||
public: | |||
void soft_lock() { | |||
write_latch_.lock(); | |||
read_count_latch_.lock(); | |||
if (!read_count_) { | |||
read_latch_.lock(); | |||
} | |||
read_count_ ++; | |||
read_count_latch_.unlock(); | |||
write_latch_.unlock(); | |||
} | |||
void soft_unlock() { | |||
read_count_latch_.lock(); | |||
read_count_ --; | |||
if (!read_count_) { | |||
read_latch_.unlock(); | |||
} | |||
read_count_latch_.unlock(); | |||
} | |||
void hard_lock() { | |||
read_latch_.lock(); | |||
write_latch_.lock(); | |||
read_latch_.unlock(); | |||
} | |||
void hard_unlock() { | |||
write_latch_.unlock(); | |||
} | |||
private: | |||
std::mutex read_latch_; // indicate if any read is proceeding | |||
std::mutex read_count_latch_; | |||
volatile size_t read_count_; | |||
std::mutex write_latch_; | |||
// volatile size_t write_count_; | |||
}; | |||
#endif // LEVELDB_SHARED_LOCK_H |
@ -0,0 +1,67 @@ | |||
// | |||
// Created by 马也驰 on 2024/12/28. | |||
// | |||
#ifndef LEVELDB_THREADPOOL_H | |||
#define LEVELDB_THREADPOOL_H | |||
#include <iostream> | |||
#include <vector> | |||
#include <thread> | |||
#include <queue> | |||
#include <functional> | |||
#include <condition_variable> | |||
#include <atomic> | |||
class ThreadPool { | |||
public: | |||
explicit ThreadPool(size_t numThreads) : stop(false) { | |||
for (size_t i = 0; i < numThreads; ++i) { | |||
workers.emplace_back([this]() { | |||
while (true) { | |||
std::function<void()> task; | |||
{ | |||
std::unique_lock<std::mutex> lock(queueMutex); | |||
condition.wait(lock, [this]() { return stop || !tasks.empty(); }); | |||
if (stop && tasks.empty()) return; | |||
task = std::move(tasks.front()); | |||
tasks.pop(); | |||
} | |||
task(); | |||
} | |||
}); | |||
} | |||
} | |||
~ThreadPool() { | |||
{ | |||
std::unique_lock<std::mutex> lock(queueMutex); | |||
stop = true; | |||
} | |||
condition.notify_all(); | |||
for (std::thread &worker : workers) { | |||
worker.join(); | |||
} | |||
} | |||
void enqueue(std::function<void()> task) { | |||
{ | |||
std::unique_lock<std::mutex> lock(queueMutex); | |||
tasks.push(std::move(task)); | |||
} | |||
condition.notify_one(); | |||
} | |||
private: | |||
std::vector<std::thread> workers; | |||
std::queue<std::function<void()>> tasks; | |||
std::mutex queueMutex; | |||
std::condition_variable condition; | |||
std::atomic<bool> stop; | |||
}; | |||
#endif // LEVELDB_THREADPOOL_H |
@ -0,0 +1,378 @@ | |||
// | |||
// Created by 马也驰 on 2024/12/26. | |||
// | |||
#ifndef LEVELDB_VLOG_H | |||
#define LEVELDB_VLOG_H | |||
#include <cstdlib> | |||
#include <fstream> | |||
#include <iostream> | |||
#include <vector> | |||
#include <cstdio> | |||
#include "../db/threadpool.h" | |||
#include "../include/leveldb/slice.h" | |||
#include "../db/slotpage.h" | |||
#include "../db/shared_lock.h" | |||
// config file: | vlog_nums_(size_t) | vlog_1_name | vlog_2_name | ... | | |||
// vlog: | curr_size(size_t) | value_nums(size_t) | value1 | value2 | ... | | |||
// value: value_len(uint16_t) | slot_num(size_t) | value | | |||
#define vlog_num_size sizeof(size_t) | |||
#define vlog_name_size 256 | |||
#define KiB 1024 | |||
#define MiB 1024*KiB | |||
#define VLOG_SIZE 32*MiB | |||
#define GC_THREDHOLD 0.5 | |||
#define VALUE_BUFF_SIZE 0x7fff // value size cannot exceed this number | |||
#define VALUE_SIZE_MASK 0x7fff | |||
#define VALUE_DELE_MASK 0x8000 | |||
//struct vlog_name { | |||
// int name_size = 0; | |||
// char name[vlog_name_size]; | |||
// vlog_name() { | |||
// memset(name, '\n', sizeof(name)); | |||
// } | |||
// vlog_name(std::string &_name) { | |||
// memset(name, '\n', sizeof(name)); | |||
// memcpy(name, _name.c_str(), _name.size()); | |||
// name_size = _name.size(); | |||
// } | |||
//}; | |||
struct vlog_info { | |||
std::mutex vlog_info_latch_; // 保护对vlog_info本身的并发修改 | |||
size_t vlog_num; | |||
size_t vlog_num_for_gc; // set when start gc | |||
bool vlog_valid_; | |||
size_t discard; | |||
size_t value_nums; | |||
size_t curr_size; | |||
vlog_info(size_t vlog_num) : vlog_valid_(true), discard(0), value_nums(0), | |||
vlog_num(vlog_num), curr_size(2*sizeof(size_t)) {} | |||
vlog_info(size_t vlog_num, size_t value_nums, size_t curr_size) : vlog_valid_(true), discard(0), value_nums(value_nums), | |||
vlog_num(vlog_num), curr_size(curr_size) {} | |||
}; | |||
struct vlog_handler { | |||
SharedLock vlog_latch_; // 表明当前vlog上的并发情况,读上soft_lock,写上hard_lock | |||
vlog_handler() {} | |||
}; | |||
//class VlogSet; | |||
// | |||
// | |||
//class VlogGC { | |||
//#define THREADNUM 8 | |||
// public: | |||
// VlogGC(SlotPage *s, VlogSet *vs) : thread_pool_(new ThreadPool(THREADNUM)), | |||
// slot_page_(s), vlog_set(vs) {} | |||
// public: | |||
// void do_gc(size_t old_vlog_num, std::string old_vlog_name, std::string new_vlog_name, | |||
// struct vlog_info *old_vlog_info, struct vlog_info *new_vlog_info){ | |||
// thread_pool_->enqueue([this, old_vlog_num, old_vlog_name, new_vlog_name, old_vlog_info, new_vlog_info]() | |||
// { this->exec_gc(old_vlog_num, old_vlog_name, new_vlog_name, old_vlog_info, new_vlog_info); }); | |||
// } | |||
// | |||
// private: | |||
// void exec_gc(size_t old_vlog_num, std::string old_vlog_name, std::string new_vlog_name, | |||
// struct vlog_info *old_vlog_info, struct vlog_info *new_vlog_info) { | |||
// | |||
// auto old_vlog = std::fstream(old_vlog_name, std::ios::in | std::ios::out); | |||
// auto new_vlog = std::fstream(new_vlog_name, std::ios::in | std::ios::out); | |||
// | |||
// size_t value_nums = old_vlog_info->value_nums; | |||
// char old_vlog_buff[VLOG_SIZE]; | |||
// char new_vlog_buff[VLOG_SIZE]; | |||
// old_vlog.seekp(0); | |||
// old_vlog.read(old_vlog_buff, VLOG_SIZE); | |||
// memcpy(new_vlog_buff, &value_nums, sizeof(size_t)); | |||
// | |||
// size_t ovb_off = sizeof(size_t); | |||
// size_t nvb_off = sizeof(size_t); | |||
// for (auto i = 0; i < value_nums; i++) { | |||
// char *value = &old_vlog_buff[ovb_off]; | |||
// uint16_t value_len = get_value_len(value); | |||
// size_t slot_num = get_value_slotnum(value); | |||
// if (!value_deleted(value_len)) { | |||
// memcpy(&new_vlog_buff[nvb_off], &old_vlog_buff[ovb_off], value_len); | |||
// struct slot_content scn(new_vlog_info->vlog_num, nvb_off); | |||
// slot_page_->set_slot(slot_num, &scn); | |||
// nvb_off += value_len; | |||
// } | |||
// ovb_off += value_len; | |||
// } | |||
// new_vlog.write(new_vlog_buff, VLOG_SIZE); | |||
// new_vlog.flush(); | |||
// | |||
// old_vlog.close(); | |||
// new_vlog.close(); | |||
// | |||
// this->vlog_set->remove_from_config_file(old_vlog_name); | |||
// } | |||
// private: | |||
// inline bool value_deleted(uint16_t value_len) { | |||
// return !(value_len >> 15); | |||
// } | |||
// inline uint16_t get_value_len(char *value) { | |||
// uint16_t value_len; | |||
// memcpy(&value_len, value, sizeof(uint16_t)); | |||
// return value_len; | |||
// } | |||
// inline size_t get_value_slotnum(char *value) { | |||
// size_t slot_num; | |||
// memcpy(&slot_num, &value[sizeof(uint16_t)], sizeof(size_t)); | |||
// return slot_num; | |||
// } | |||
// private: | |||
// SlotPage *slot_page_; | |||
// ThreadPool *thread_pool_; | |||
// VlogSet *vlog_set; // inline void remove_vlog_from_maps(std::string &vlog_name) | |||
//}; | |||
//class VlogSet { | |||
//// value: value_len(uint16_t) | slot_num(size_t) | value | |||
//// vlog: curr_size(size_t) | value_nums(size_t) | value1 | value2 | ... | | |||
//#define VALUE_BUFF_SIZE 0x7fff | |||
//#define VALUE_SIZE_MASK 0x7fff | |||
//#define VALUE_DELE_MASK 0x8000 | |||
// | |||
//friend class VlogGC; | |||
// | |||
//public: | |||
// void get_value(uint32_t vlog_num, uint32_t value_offset, std::string *value) { | |||
// mtx.lock(); | |||
// auto vinfo = get_vlog_info(vlog_num); | |||
// auto vhandler = get_vlog_handler(vlog_num); | |||
// | |||
// vinfo->vlog_info_latch_.lock(); | |||
// if (!vinfo->vlog_valid_) { | |||
// vhandler = get_vlog_handler(vinfo->vlog_num_for_gc); | |||
// } | |||
// vhandler->vlog_latch_.soft_lock(); | |||
// vinfo->vlog_info_latch_.unlock(); | |||
// read_vlog_value(vlog_num, value_offset, value); | |||
// vhandler->vlog_latch_.soft_unlock(); | |||
// mtx.unlock(); | |||
// } | |||
// void put_value(uint32_t vlog_num, uint32_t value_offset, leveldb::Slice &value) { | |||
// mtx.lock(); | |||
// auto vinfo = get_vlog_info(vlog_num); | |||
// auto vhandler = get_vlog_handler(vlog_num); | |||
// | |||
// vinfo->vlog_info_latch_.lock(); | |||
// if (!vinfo->vlog_valid_) { | |||
// vhandler = get_vlog_handler(vinfo->vlog_num_for_gc); | |||
// } | |||
// vhandler->vlog_latch_.hard_lock(); | |||
// vinfo->vlog_info_latch_.unlock(); | |||
// write_vlog_value(vlog_num, value_offset, value); | |||
// vhandler->vlog_latch_.hard_unlock(); | |||
// mtx.unlock(); | |||
// } | |||
// void del_value(uint32_t vlog_num, uint32_t value_offset) { | |||
// mtx.lock(); | |||
// auto vinfo = get_vlog_info(vlog_num); | |||
// auto vhandler = get_vlog_handler(vlog_num); | |||
// | |||
// vinfo->vlog_info_latch_.lock(); | |||
// if (!vinfo->vlog_valid_) { | |||
// vhandler = get_vlog_handler(vinfo->vlog_num_for_gc); | |||
// } | |||
// vhandler->vlog_latch_.hard_lock(); | |||
// mark_del_value(vlog_num, value_offset); | |||
// | |||
// vinfo->vlog_info_latch_.unlock(); | |||
// vhandler->vlog_latch_.hard_unlock(); | |||
// mtx.unlock(); | |||
// } | |||
// | |||
//private: | |||
// size_t register_new_vlog() { | |||
// size_t vn = vlog_nums_; | |||
// std::string vlog_name = get_vlog_name(vn); | |||
// register_inconfig_file(vlog_name); | |||
// create_vlog(vlog_name); | |||
// auto vlog_new = new std::fstream(vlog_name, std::ios::in | std::ios::out); | |||
// if (!vlog_new->is_open()) { | |||
// std::cerr << "Failed to open or create the vlog file: " << vlog_new << std::endl; | |||
// std::exit(EXIT_FAILURE); | |||
// } | |||
// register_vlog_inmaps(vn, vlog_name, vlog_new); | |||
// vlog_nums_ ++; | |||
// vlog_count_ ++; | |||
// return vn; | |||
// } | |||
// void remove_old_vlog(size_t old_vlog_num) { | |||
// // after gc, new_vlog has been created | |||
// // new vlog should have been created here ! | |||
// mtx.lock(); | |||
// auto vi_old = get_vlog_info(old_vlog_num); | |||
// vi_old->vlog_info_latch_.lock(); | |||
// std::string old_vlog_name = get_vlog_name(old_vlog_num); | |||
// remove_from_config_file(old_vlog_name); | |||
// remove_vlog_from_maps(old_vlog_name); | |||
// vi_old->vlog_valid_ = false; | |||
// vi_old->vlog_info_latch_.unlock(); | |||
// vlog_count_ --; | |||
// mtx.unlock(); | |||
// } | |||
// bool vlog_need_gc(size_t vlog_num) { | |||
// std::string vlog_name = get_vlog_name(vlog_num); | |||
// auto vi = vlog_info_map_[vlog_name]; | |||
// bool retval = vi->vlog_valid_ && (vi->discard/vi->value_nums >= GC_THREDHOLD); | |||
// return retval; | |||
// } | |||
// | |||
//private: | |||
// void register_inconfig_file(std::string &vlog_name) { | |||
// struct vlog_name vn(vlog_name); | |||
// // config file: | vlog_nums_(size_t) | vlog_count_(size_t) | vlog_1_name | ... | | |||
// // first size_t in config file indicates current vlog_nums_(size_t) | |||
// // second size_t in config file indicates current vlog_count_(size_t) | |||
// config_file_->seekp(2*sizeof(size_t) + vlog_nums_* vlog_name_size); | |||
// config_file_->write(vn.name, vn.name_size); | |||
// config_file_->flush(); | |||
// } | |||
// void remove_from_config_file(std::string &vlog_name) { | |||
// char tmp[vlog_name_size]; | |||
// size_t index = 0; | |||
// config_file_->seekp(2*sizeof(size_t)); | |||
// while (config_file_->read(tmp, vlog_name_size) || config_file_->gcount() > 0) { | |||
// size_t length = 0; | |||
// while (length < config_file_->gcount() && tmp[length] != '\n') { | |||
// ++length; | |||
// } | |||
// std::string name(tmp, length); | |||
// if (name == vlog_name) { | |||
// memset(tmp, '\n', sizeof(tmp)); | |||
// config_file_->seekp(2*sizeof(size_t) + index * vlog_name_size); | |||
// config_file_->write(tmp, vlog_name_size); | |||
// config_file_->flush(); | |||
// break; | |||
// } | |||
// index ++; | |||
// } | |||
// } | |||
// void create_vlog(std::string &vlog_name) { | |||
// std::fstream *vlog_new = new std::fstream(vlog_name, std::ios::out); | |||
// char tmp[2*sizeof(size_t)]; | |||
// memset(tmp, 0, sizeof(tmp)); | |||
// vlog_new->write(tmp, sizeof(tmp)); | |||
// vlog_new->flush(); | |||
// vlog_new->close(); | |||
// } | |||
// inline void register_vlog_inmaps(size_t vlog_num, std::string &vlog_name, std::fstream *handler) { | |||
// vlog_info_map_[vlog_name] = new vlog_info(vlog_num); | |||
// vlog_handler_map_[vlog_name] = new vlog_handler(); | |||
// } | |||
// inline void remove_vlog_from_maps(std::string &vlog_name) { | |||
// auto vi = vlog_info_map_[vlog_name]; | |||
//// auto vh = vlog_handler_map_[vlog_name]; | |||
// vi->vlog_info_latch_.lock(); | |||
// vi->vlog_valid_ = false; // TODO: change the slotpage vlog_num when get/put thread read this flag | |||
//// vh->handler_->close(); | |||
// assert(!std::remove(vlog_name.c_str())); | |||
// vlog_handler_map_.erase(vlog_name); | |||
// vi->vlog_info_latch_.unlock(); | |||
// } | |||
// inline std::string get_vlog_name(size_t vlog_num) { | |||
// return dbname + "_vlog_" + std::to_string(vlog_num); | |||
// } | |||
// inline struct vlog_info *get_vlog_info(size_t vlog_num) { | |||
// return vlog_info_map_[get_vlog_name(vlog_num)]; | |||
// } | |||
// inline struct vlog_handler *get_vlog_handler(size_t vlog_num) { | |||
// return vlog_handler_map_[get_vlog_name(vlog_num)]; | |||
// } | |||
// void read_vlog_value(uint32_t vlog_num, uint32_t value_offset, std::string *value) { | |||
// auto vlog_name = get_vlog_name(vlog_num); | |||
// auto handler = std::fstream(vlog_name, std::ios::in | std::ios::out); | |||
// handler.seekp(value_offset); | |||
// char value_buff[VALUE_BUFF_SIZE]; | |||
// handler.read(value_buff, VALUE_BUFF_SIZE); | |||
// | |||
// uint16_t value_size; | |||
// memcpy(&value_size, value_buff, sizeof(uint16_t)); | |||
// value_size &= VALUE_SIZE_MASK; | |||
// value->assign(&value_buff[sizeof(uint16_t)], value_size); | |||
// handler.close(); | |||
// } | |||
// void write_vlog_value(uint32_t vlog_num, uint32_t value_offset, leveldb::Slice &value) { | |||
// auto vlog_name = get_vlog_name(vlog_num); | |||
// auto handler = std::fstream(vlog_name, std::ios::in | std::ios::out); | |||
// handler.seekp(value_offset); | |||
// const char *value_buff = value.data(); | |||
// handler.write(value_buff, value.size()); | |||
// | |||
// auto vinfo = get_vlog_info(vlog_num); | |||
// vinfo->value_nums ++; | |||
// vinfo->curr_size += value.size(); | |||
// | |||
// handler.flush(); | |||
// handler.close(); | |||
// } | |||
// void mark_del_value(uint32_t vlog_num, uint32_t value_offset) { | |||
// auto vinfo = get_vlog_info(vlog_num); | |||
// auto vlog_name = get_vlog_name(vlog_num); | |||
// auto handler = std::fstream(vlog_name, std::ios::in | std::ios::out); | |||
// handler.seekp(value_offset); | |||
// char value_buff[VALUE_BUFF_SIZE]; | |||
// handler.read(value_buff, VALUE_BUFF_SIZE); | |||
// | |||
// uint16_t value_size; | |||
// memcpy(&value_size, value_buff, sizeof(uint16_t)); | |||
// if (value_size & VALUE_DELE_MASK) { | |||
// // case when value has been deleted | |||
// handler.close(); | |||
// return ; | |||
// } | |||
// assert(!(value_size & VALUE_DELE_MASK)); | |||
// uint16_t masked_value_size = value_size & 0xffff; | |||
// memcpy(value_buff, &masked_value_size, sizeof(uint16_t)); | |||
// handler.write(value_buff, value_size); | |||
// handler.flush(); | |||
// handler.close(); | |||
// | |||
// // handle gc, mtx is locked outside, vlog_info_latch and vlog hard lock is locked outside too | |||
//// auto vinfo = get_vlog_info(vlog_num); | |||
// vinfo->discard ++; | |||
// vinfo->value_nums --; | |||
// vinfo->curr_size -= value_size & VALUE_SIZE_MASK; | |||
// if (vlog_need_gc(vlog_num)) { | |||
// // create new vlog | |||
// vinfo->vlog_num_for_gc = register_new_vlog(); | |||
//// vinfo->vlog_valid_ = false; | |||
// auto old_vlog_name = get_vlog_name(vlog_num); | |||
// auto new_vlog_name = get_vlog_name(vinfo->vlog_num_for_gc); | |||
// auto old_vlog_info = vinfo; | |||
// auto new_vlog_info = get_vlog_info(vinfo->vlog_num_for_gc); | |||
// vlog_gc->do_gc(vlog_num, old_vlog_name, new_vlog_name, old_vlog_info, new_vlog_info); | |||
// } | |||
// } | |||
// | |||
// | |||
//private: | |||
// std::mutex mtx; | |||
// std::string dbname; | |||
// size_t vlog_nums_; | |||
// size_t vlog_count_; | |||
// std::unordered_map<std::string, struct vlog_info *> vlog_info_map_; // TODO: 读到vlog已经失效时,要返回slotpage重新读取slotnum | |||
// std::unordered_map<std::string, struct vlog_handler *> vlog_handler_map_; | |||
// std::fstream *config_file_; | |||
// | |||
// VlogGC *vlog_gc; // init with slot_page | |||
//// SlotPage *slot_page; | |||
//}; | |||
#endif // LEVELDB_VLOG_H |
@ -0,0 +1,68 @@ | |||
// | |||
// Created by 马也驰 on 2024/12/29. | |||
// | |||
#include "vlog_gc.h" | |||
#include "../db/vlog_set.h" | |||
void VlogGC::do_gc(size_t old_vlog_num, size_t new_vlog_num) { | |||
thread_pool_->enqueue([this, old_vlog_num, new_vlog_num]() | |||
{ this->exec_gc(old_vlog_num, new_vlog_num); }); | |||
} | |||
void VlogGC::exec_gc(size_t old_vlog_num, size_t new_vlog_num) { | |||
auto old_vlog_name = vlog_set->get_vlog_name(old_vlog_num); | |||
auto new_vlog_name = vlog_set->get_vlog_name(new_vlog_num); | |||
auto old_vlog_info = vlog_set->get_vlog_info(old_vlog_num); | |||
auto new_vlog_info = vlog_set->get_vlog_info(new_vlog_num); | |||
auto old_vlog_handler = vlog_set->get_vlog_handler(old_vlog_num); | |||
auto new_vlog_handler = vlog_set->get_vlog_handler(new_vlog_num); | |||
old_vlog_handler->vlog_latch_.soft_lock(); | |||
new_vlog_info->vlog_info_latch_.lock(); | |||
new_vlog_handler->vlog_latch_.hard_lock(); | |||
auto old_vlog = std::fstream(old_vlog_name, std::ios::in | std::ios::out); | |||
auto new_vlog = std::fstream(new_vlog_name, std::ios::in | std::ios::out); | |||
char old_vlog_buff[VLOG_SIZE]; | |||
char new_vlog_buff[VLOG_SIZE]; | |||
old_vlog.seekp(0); | |||
old_vlog.read(old_vlog_buff, VLOG_SIZE); | |||
// memcpy(new_vlog_buff, &value_nums, sizeof(size_t)); | |||
size_t value_nums = old_vlog_info->value_nums; | |||
size_t ovb_off = 2 * sizeof(size_t); | |||
size_t nvb_off = 2 * sizeof(size_t); | |||
size_t new_vlog_value_nums = 0; | |||
for (auto i = 0; i < value_nums; i++) { | |||
char *value = &old_vlog_buff[ovb_off]; | |||
uint16_t value_len = get_value_len(value); | |||
size_t slot_num = get_value_slotnum(value); | |||
if (!value_deleted(value_len)) { | |||
memcpy(&new_vlog_buff[nvb_off], &old_vlog_buff[ovb_off], value_len); | |||
struct slot_content scn(new_vlog_info->vlog_num, nvb_off); | |||
slot_page_->set_slot(slot_num, &scn); | |||
nvb_off += value_len; | |||
new_vlog_value_nums ++; | |||
} | |||
ovb_off += value_len; | |||
} | |||
new_vlog_info->value_nums = value_nums; | |||
new_vlog_info->curr_size = nvb_off; | |||
memcpy(new_vlog_buff, &nvb_off, sizeof(size_t)); | |||
memcpy(&new_vlog_buff[sizeof(size_t)], &new_vlog_value_nums, sizeof(size_t)); | |||
new_vlog.write(new_vlog_buff, VLOG_SIZE); | |||
new_vlog.flush(); | |||
old_vlog.close(); | |||
new_vlog.close(); | |||
old_vlog_handler->vlog_latch_.soft_unlock(); | |||
new_vlog_info->vlog_info_latch_.unlock(); | |||
new_vlog_handler->vlog_latch_.hard_unlock(); | |||
vlog_set->remove_old_vlog(old_vlog_num); | |||
} | |||
@ -0,0 +1,46 @@ | |||
// | |||
// Created by 马也驰 on 2024/12/29. | |||
// | |||
#ifndef LEVELDB_VLOG_GC_H | |||
#define LEVELDB_VLOG_GC_H | |||
#include <string> | |||
#include <mutex> | |||
#include "../db/slotpage.h" | |||
#include "../db/threadpool.h" | |||
#include "../db/vlog.h" | |||
// 前向声明 VlogSet | |||
class VlogSet; | |||
class VlogGC { | |||
public: | |||
VlogGC(SlotPage *s, VlogSet *vs) : slot_page_(s), vlog_set(vs) {} | |||
void do_gc(size_t old_vlog_num, size_t new_vlog_num); | |||
private: | |||
void exec_gc(size_t old_vlog_num, size_t new_vlog_num); | |||
inline bool value_deleted(uint16_t value_len) { | |||
return !(value_len >> 15); | |||
} | |||
inline uint16_t get_value_len(char *value) { | |||
uint16_t value_len; | |||
memcpy(&value_len, value, sizeof(uint16_t)); | |||
return value_len; | |||
} | |||
inline size_t get_value_slotnum(char *value) { | |||
size_t slot_num; | |||
memcpy(&slot_num, &value[sizeof(uint16_t)], sizeof(size_t)); | |||
return slot_num; | |||
} | |||
SlotPage *slot_page_; | |||
ThreadPool *thread_pool_; | |||
VlogSet *vlog_set; // 仅声明为指针,具体定义放在 vlog_gc.cpp | |||
}; | |||
#endif // LEVELDB_VLOG_GC_H |
@ -0,0 +1,336 @@ | |||
// | |||
// Created by 马也驰 on 2024/12/29. | |||
// | |||
#include "vlog_set.h" | |||
#include "../db/vlog_gc.h" | |||
// config file: | vlog_nums_(size_t) | vlog_1_num(size_t) | vlog_2_num(size_t) | ... | | |||
// vlog: | curr_size(size_t) | value_nums(size_t) | value1 | value2 | ... | | |||
// value: { value_len(uint16_t) | slot_num(size_t) | value } | |||
VlogSet::VlogSet(std::string dbname, VlogGC *vlog_gc) : dbname(dbname), vlog_gc(vlog_gc) { | |||
auto cfname = get_config_file_name(); | |||
this->config_file_ = new std::fstream(cfname, std::ios::in | std::ios::out); | |||
if (!this->config_file_->is_open()) { | |||
// config 文件不存在,尝试创建 | |||
delete this->config_file_; | |||
this->config_file_ = new std::fstream(cfname, std::ios::out); | |||
this->config_file_->close(); | |||
delete this->config_file_; | |||
// 重新以读写模式打开 | |||
this->config_file_ = new std::fstream(cfname, std::ios::in | std::ios::out); | |||
this->vlog_nums_ = 0; | |||
} else { | |||
// config 文件存在 | |||
size_t _vlog_nums_; | |||
config_file_->seekp(0); | |||
config_file_->read(reinterpret_cast<char*>(&_vlog_nums_), sizeof(size_t)); | |||
this->vlog_nums_ = _vlog_nums_; | |||
// 从 config file 中读取所有有效的vlog名字 | |||
config_file_->seekp(sizeof(size_t)); | |||
size_t tmp[_vlog_nums_]; | |||
config_file_->read(reinterpret_cast<char*>(tmp), sizeof(tmp)); | |||
for (auto i = 0; i < _vlog_nums_; i++) { | |||
size_t curr_vlog_num = tmp[i]; | |||
if (!(curr_vlog_num & CONFIG_FILE_DELE_MASK)) { | |||
auto curr_vlog = std::fstream(get_vlog_name(curr_vlog_num), std::ios::in | std::ios::out); | |||
size_t curr_vlog_header[2]; | |||
curr_vlog.seekp(0); | |||
curr_vlog.read(reinterpret_cast<char*>(curr_vlog_header), 2*sizeof(size_t)); | |||
curr_vlog.close(); | |||
restore_vlog_inmaps(new vlog_info(curr_vlog_num, tmp[1], tmp[0])); | |||
} | |||
} | |||
} | |||
if (!this->config_file_->is_open()) { | |||
std::cerr << "Failed to open or create the db config file: " << cfname << std::endl; | |||
std::exit(EXIT_FAILURE); | |||
} | |||
} | |||
VlogSet::~VlogSet() { | |||
config_file_->seekp(0); | |||
config_file_->write(reinterpret_cast<const char*>(&vlog_nums_), sizeof(size_t)); | |||
config_file_->flush(); | |||
config_file_->close(); | |||
for (auto & it : vlog_info_map_) { | |||
struct vlog_info* vinfo = it.second; | |||
// 使用 vlog_info* 进行操作 | |||
size_t tmp[2] = {vinfo->curr_size, vinfo->value_nums}; | |||
auto vlog_file_handler = std::fstream(get_vlog_name(vinfo->vlog_num), std::ios::in | std::ios::out); | |||
vlog_file_handler.seekp(0); | |||
vlog_file_handler.write(reinterpret_cast<const char*>(tmp), sizeof(tmp)); | |||
vlog_file_handler.flush(); | |||
vlog_file_handler.close(); | |||
} | |||
delete vlog_gc; | |||
} | |||
void VlogSet::get_value(uint32_t vlog_num, uint32_t value_offset, std::string *value) { | |||
mtx.lock(); | |||
auto vinfo = get_vlog_info(vlog_num); | |||
auto vhandler = get_vlog_handler(vlog_num); | |||
vinfo->vlog_info_latch_.lock(); | |||
if (!vinfo->vlog_valid_) { | |||
vhandler = get_vlog_handler(vinfo->vlog_num_for_gc); | |||
} | |||
vhandler->vlog_latch_.soft_lock(); | |||
vinfo->vlog_info_latch_.unlock(); | |||
read_vlog_value(vlog_num, value_offset, value); | |||
vhandler->vlog_latch_.soft_unlock(); | |||
mtx.unlock(); | |||
} | |||
struct vlog_info * VlogSet::get_writable_vlog_info(size_t value_size) { | |||
for (auto it = vlog_info_map_.begin(); it != vlog_info_map_.end(); ++it) { | |||
struct vlog_info* vinfo = it->second; | |||
// 使用 vlog_info* 进行操作 | |||
if (vinfo->curr_size + value_size <= VLOG_SIZE) { | |||
return vinfo; | |||
} | |||
} | |||
// 所有vlog已满,创建新vlog | |||
return nullptr; | |||
} | |||
void VlogSet::put_value(uint32_t *vlog_num, uint32_t *value_offset, const leveldb::Slice &value) { | |||
mtx.lock(); | |||
auto vinfo = get_writable_vlog_info(value.size()); | |||
if (!vinfo) { | |||
// vlog全部已满,创建新的vlog | |||
auto _vlog_num_ = register_new_vlog(); | |||
vinfo = get_vlog_info(_vlog_num_); | |||
} | |||
vinfo->vlog_info_latch_.lock(); | |||
*vlog_num = vinfo->vlog_num; | |||
*value_offset = vinfo->curr_size; | |||
vinfo->curr_size += value.size(); | |||
vinfo->value_nums ++; | |||
vinfo->vlog_info_latch_.unlock(); | |||
auto vhandler = get_vlog_handler(*vlog_num); | |||
vinfo->vlog_info_latch_.lock(); | |||
if (!vinfo->vlog_valid_) { | |||
vhandler = get_vlog_handler(vinfo->vlog_num_for_gc); | |||
} | |||
vhandler->vlog_latch_.hard_lock(); | |||
vinfo->vlog_info_latch_.unlock(); | |||
write_vlog_value(*vlog_num, *value_offset, value); | |||
vhandler->vlog_latch_.hard_unlock(); | |||
mtx.unlock(); | |||
} | |||
void VlogSet::del_value(uint32_t vlog_num, uint32_t value_offset) { | |||
mtx.lock(); | |||
auto vinfo = get_vlog_info(vlog_num); | |||
auto vhandler = get_vlog_handler(vlog_num); | |||
vinfo->vlog_info_latch_.lock(); | |||
if (!vinfo->vlog_valid_) { | |||
// auto vlog_num_gc = vinfo->vlog_num_for_gc; | |||
vhandler = get_vlog_handler(vinfo->vlog_num_for_gc); | |||
// vinfo->vlog_info_latch_.unlock(); | |||
// vinfo = get_vlog_info(vlog_num_gc); | |||
// vinfo->vlog_info_latch_.lock(); | |||
} | |||
vhandler->vlog_latch_.hard_lock(); | |||
mark_del_value(vlog_num, value_offset); | |||
vinfo->vlog_info_latch_.unlock(); | |||
vhandler->vlog_latch_.hard_unlock(); | |||
mtx.unlock(); | |||
} | |||
size_t VlogSet::register_new_vlog() { | |||
size_t vn = vlog_nums_; | |||
std::string vlog_name = get_vlog_name(vn); | |||
register_inconfig_file(vn); | |||
create_vlog(vlog_name); | |||
// auto vlog_new = new std::fstream(vlog_name, std::ios::in | std::ios::out); | |||
// if (!vlog_new->is_open()) { | |||
// std::cerr << "Failed to open or create the vlog file: " << vlog_new << std::endl; | |||
// std::exit(EXIT_FAILURE); | |||
// } | |||
register_vlog_inmaps(vn, vlog_name); | |||
vlog_nums_ ++; | |||
// vlog_count_ ++; | |||
return vn; | |||
} | |||
void VlogSet::remove_old_vlog(size_t old_vlog_num) { | |||
// after gc, new_vlog has been created | |||
// new vlog should have been created here ! | |||
mtx.lock(); | |||
auto vi_old = get_vlog_info(old_vlog_num); | |||
vi_old->vlog_info_latch_.lock(); | |||
std::string old_vlog_name = get_vlog_name(old_vlog_num); | |||
remove_from_config_file(old_vlog_num); | |||
remove_vlog_from_maps(old_vlog_name); | |||
vi_old->vlog_valid_ = false; | |||
vi_old->vlog_info_latch_.unlock(); | |||
// vlog_count_ --; | |||
mtx.unlock(); | |||
} | |||
bool VlogSet::vlog_need_gc(size_t vlog_num) { | |||
std::string vlog_name = get_vlog_name(vlog_num); | |||
auto vi = vlog_info_map_[vlog_name]; | |||
bool retval = vi->vlog_valid_ && (vi->discard/vi->value_nums >= GC_THREDHOLD); | |||
return retval; | |||
} | |||
void VlogSet::register_inconfig_file(size_t vlog_num) { | |||
// config file: | vlog_nums_(size_t) | vlog_count_(size_t) | vlog_1_name | ... | | |||
// first size_t in config file indicates current vlog_nums_(size_t) | |||
// second size_t in config file indicates current vlog_count_(size_t) | |||
config_file_->seekp(sizeof(size_t) + vlog_nums_* sizeof(size_t)); | |||
config_file_->write(reinterpret_cast<const char*>(&vlog_num), sizeof(size_t)); | |||
config_file_->flush(); | |||
} | |||
void VlogSet::remove_from_config_file(size_t vlog_num) { | |||
char tmp[vlog_nums_*vlog_num_size]; | |||
size_t index = 0; | |||
config_file_->seekp(sizeof(size_t)); | |||
config_file_->read(tmp, sizeof(tmp)); | |||
size_t *vlog_num_ptr = reinterpret_cast<size_t*>(tmp); | |||
for (auto i = 0; i < vlog_nums_; i++) { | |||
size_t curr_vlog_num = *vlog_num_ptr; | |||
if (!(curr_vlog_num & CONFIG_FILE_DELE_MASK) && curr_vlog_num == vlog_num) { | |||
curr_vlog_num &= CONFIG_FILE_DELE_MASK; | |||
config_file_->seekp(sizeof(size_t) + i*sizeof(size_t)); | |||
config_file_->write(reinterpret_cast<const char*>(&curr_vlog_num), sizeof(size_t)); | |||
config_file_->flush(); | |||
break; | |||
} | |||
} | |||
} | |||
void VlogSet::create_vlog(std::string &vlog_name) { | |||
std::fstream *vlog_new = new std::fstream(vlog_name, std::ios::out); | |||
char tmp[2*sizeof(size_t)]; | |||
memset(tmp, 0, sizeof(tmp)); | |||
vlog_new->write(tmp, sizeof(tmp)); | |||
vlog_new->flush(); | |||
vlog_new->close(); | |||
} | |||
inline void VlogSet::restore_vlog_inmaps(struct vlog_info *vi) { | |||
auto vlog_name = get_vlog_name(vi->vlog_num); | |||
vlog_info_map_[vlog_name] = vi; | |||
vlog_handler_map_[vlog_name] = new vlog_handler(); | |||
} | |||
inline void VlogSet::register_vlog_inmaps(size_t vlog_num, std::string &vlog_name) { | |||
vlog_info_map_[vlog_name] = new vlog_info(vlog_num); | |||
vlog_handler_map_[vlog_name] = new vlog_handler(); | |||
} | |||
inline void VlogSet::remove_vlog_from_maps(std::string &vlog_name) { | |||
auto vi = vlog_info_map_[vlog_name]; | |||
// auto vh = vlog_handler_map_[vlog_name]; | |||
vi->vlog_info_latch_.lock(); | |||
vi->vlog_valid_ = false; // TODO: change the slotpage vlog_num when get/put thread read this flag | |||
// vh->handler_->close(); | |||
assert(!std::remove(vlog_name.c_str())); | |||
vlog_handler_map_.erase(vlog_name); | |||
vi->vlog_info_latch_.unlock(); | |||
} | |||
inline std::string VlogSet::get_config_file_name() { | |||
return dbname + "_config_file"; | |||
} | |||
std::string VlogSet::get_vlog_name(size_t vlog_num) { | |||
return dbname + "_vlog_" + std::to_string(vlog_num); | |||
} | |||
struct vlog_info * VlogSet::get_vlog_info(size_t vlog_num) { | |||
return vlog_info_map_[get_vlog_name(vlog_num)]; | |||
} | |||
struct vlog_handler * VlogSet::get_vlog_handler(size_t vlog_num) { | |||
return vlog_handler_map_[get_vlog_name(vlog_num)]; | |||
} | |||
void VlogSet::read_vlog_value(uint32_t vlog_num, uint32_t value_offset, std::string *value) { | |||
auto vlog_name = get_vlog_name(vlog_num); | |||
auto handler = std::fstream(vlog_name, std::ios::in | std::ios::out); | |||
handler.seekp(value_offset); | |||
char value_buff[VALUE_BUFF_SIZE]; | |||
handler.read(value_buff, VALUE_BUFF_SIZE); | |||
uint16_t value_size; | |||
memcpy(&value_size, value_buff, sizeof(uint16_t)); | |||
value_size &= VALUE_SIZE_MASK; | |||
value->assign(&value_buff[sizeof(uint16_t)], value_size); | |||
handler.close(); | |||
} | |||
void VlogSet::write_vlog_value(uint32_t vlog_num, uint32_t value_offset, const leveldb::Slice &value) { | |||
auto vlog_name = get_vlog_name(vlog_num); | |||
auto handler = std::fstream(vlog_name, std::ios::in | std::ios::out); | |||
handler.seekp(value_offset); | |||
const char *value_buff = value.data(); | |||
handler.write(value_buff, value.size()); | |||
auto vinfo = get_vlog_info(vlog_num); | |||
vinfo->value_nums ++; | |||
vinfo->curr_size += value.size(); | |||
handler.flush(); | |||
handler.close(); | |||
} | |||
void VlogSet::mark_del_value(uint32_t vlog_num, uint32_t value_offset) { | |||
auto vinfo = get_vlog_info(vlog_num); | |||
auto vlog_name = get_vlog_name(vlog_num); | |||
auto handler = std::fstream(vlog_name, std::ios::in | std::ios::out); | |||
handler.seekp(value_offset); | |||
char value_buff[VALUE_BUFF_SIZE]; | |||
handler.read(value_buff, VALUE_BUFF_SIZE); | |||
uint16_t value_size; | |||
memcpy(&value_size, value_buff, sizeof(uint16_t)); | |||
if (value_size & VALUE_DELE_MASK) { | |||
// case when value has been deleted | |||
handler.close(); | |||
return ; | |||
} | |||
assert(!(value_size & VALUE_DELE_MASK)); | |||
uint16_t masked_value_size = value_size & 0xffff; | |||
memcpy(value_buff, &masked_value_size, sizeof(uint16_t)); | |||
handler.write(value_buff, value_size); | |||
handler.flush(); | |||
handler.close(); | |||
// handle gc, mtx is locked outside, vlog_info_latch and vlog hard lock is locked outside too | |||
// auto vinfo = get_vlog_info(vlog_num); | |||
vinfo->discard ++; | |||
vinfo->value_nums --; | |||
vinfo->curr_size -= value_size & VALUE_SIZE_MASK; | |||
if (vlog_need_gc(vlog_num)) { | |||
// create new vlog | |||
vinfo->vlog_valid_ = false; | |||
vinfo->vlog_num_for_gc = register_new_vlog(); | |||
// vinfo->vlog_valid_ = false; | |||
vlog_gc->do_gc(vlog_num, vinfo->vlog_num_for_gc); | |||
} | |||
} |
@ -0,0 +1,67 @@ | |||
// | |||
// Created by 马也驰 on 2024/12/29. | |||
// | |||
#ifndef LEVELDB_VLOG_SET_H | |||
#define LEVELDB_VLOG_SET_H | |||
#include <unordered_map> | |||
#include <string> | |||
#include <mutex> | |||
#include <cassert> | |||
#include "../include/leveldb/slice.h" | |||
#include "../db/shared_lock.h" | |||
#include "../db/vlog.h" | |||
// 前向声明 VlogGC | |||
class VlogGC; | |||
class VlogSet { | |||
friend class VlogGC; | |||
#define CONFIG_FILE_DELE_MASK (0x1 << (sizeof(size_t)-1)) | |||
#define CONFIG_FILE_VLOG_NUM(v) ((v) & ~CONFIG_FILE_DELE_MASK) | |||
public: | |||
VlogSet(std::string dbname, VlogGC *vlog_gc); | |||
~VlogSet(); | |||
void get_value(uint32_t vlog_num, uint32_t value_offset, std::string *value); | |||
void put_value(uint32_t *vlog_num, uint32_t *value_offset, const leveldb::Slice &value); | |||
void del_value(uint32_t vlog_num, uint32_t value_offset); | |||
void set_vlog_gc(VlogGC *vg) { this->vlog_gc = vg; } | |||
private: | |||
size_t register_new_vlog(); | |||
void remove_old_vlog(size_t old_vlog_num); | |||
bool vlog_need_gc(size_t vlog_num); | |||
void register_inconfig_file(size_t vlog_num); | |||
void remove_from_config_file(size_t vlog_num); | |||
void create_vlog(std::string &vlog_name); | |||
struct vlog_info *get_writable_vlog_info(size_t value_size); | |||
inline void restore_vlog_inmaps(struct vlog_info *vi); | |||
inline void register_vlog_inmaps(size_t vlog_num, std::string &vlog_name); | |||
inline void remove_vlog_from_maps(std::string &vlog_name); | |||
inline std::string get_config_file_name(); | |||
std::string get_vlog_name(size_t vlog_num); | |||
struct vlog_info *get_vlog_info(size_t vlog_num); | |||
struct vlog_handler *get_vlog_handler(size_t vlog_num); | |||
void read_vlog_value(uint32_t vlog_num, uint32_t value_offset, std::string *value); | |||
void write_vlog_value(uint32_t vlog_num, uint32_t value_offset, const leveldb::Slice &value); | |||
void mark_del_value(uint32_t vlog_num, uint32_t value_offset); | |||
private: | |||
std::mutex mtx; | |||
std::string dbname; | |||
size_t vlog_nums_; | |||
// size_t vlog_count_; | |||
std::unordered_map<std::string, struct vlog_info *> vlog_info_map_; | |||
std::unordered_map<std::string, struct vlog_handler *> vlog_handler_map_; | |||
std::fstream *config_file_; | |||
VlogGC *vlog_gc; // 仅声明为指针,具体定义放在 vlog_set.cpp | |||
}; | |||
#endif // LEVELDB_VLOG_SET_H |
@ -0,0 +1,127 @@ | |||
// | |||
// Created by 马也驰 on 2024/12/20. | |||
// | |||
#include "../db/slotpage.h" | |||
#include <thread> | |||
#define SLOTNUM 8192 | |||
void printVector(const std::vector<size_t>& vec) { | |||
for (size_t i = 0; i < vec.size(); ++i) { | |||
std::cout << vec[i] << " "; | |||
} | |||
std::cout << std::endl; | |||
} | |||
int test1() { | |||
BitMap bitmap; | |||
std::vector<size_t> slots; | |||
slots.reserve(SLOTNUM<<2); | |||
int i = 0; | |||
for (i = 0; i < 2047; i++) { | |||
slots.push_back(bitmap.alloc_slot()); | |||
} | |||
slots.push_back(bitmap.alloc_slot()); | |||
for (; i < SLOTNUM<<2; i++) { | |||
slots.push_back(bitmap.alloc_slot()); | |||
} | |||
printVector(slots); | |||
std::cout << "\n\n"; | |||
for (i = 0; i < 100; i++) { | |||
bitmap.dealloc_slot(slots.at(i)); | |||
} | |||
bitmap.dealloc_slot(10000); | |||
bitmap.dealloc_slot(11000); | |||
bitmap.dealloc_slot(12000); | |||
bitmap.show_allocated_slot(); | |||
std::cout << "\n\n"; | |||
for (i = 0; i < 103; i++) { | |||
bitmap.alloc_slot(); | |||
} | |||
bitmap.show_allocated_slot(); | |||
return 1; | |||
} | |||
int test2() { | |||
BitMap bitmap; | |||
std::vector<size_t> slots; | |||
slots.reserve(SLOTNUM); | |||
for (auto i = 0; i < SLOTNUM; i++) { | |||
slots.push_back(bitmap.alloc_slot()); | |||
} | |||
printVector(slots); | |||
std::cout << "\n\n"; | |||
bitmap.dealloc_slot(30); | |||
bitmap.dealloc_slot(40); | |||
bitmap.dealloc_slot(50); | |||
bitmap.show_allocated_slot(); | |||
std::cout << "\n\n"; | |||
bitmap.alloc_slot(); | |||
bitmap.alloc_slot(); | |||
bitmap.show_allocated_slot(); | |||
return 1; | |||
} | |||
void allocate_slots(BitMap& bitmap, std::vector<size_t>& slots, int num_slots) { | |||
for (int i = 0; i < num_slots; i++) { | |||
slots.push_back(bitmap.alloc_slot()); | |||
} | |||
} | |||
int test3() { | |||
BitMap bitmap; | |||
std::vector<size_t> slots1; | |||
slots1.reserve(1000); | |||
std::vector<size_t> slots2; | |||
slots2.reserve(1000); | |||
// Create two threads to allocate slots concurrently. | |||
std::thread thread1(allocate_slots, std::ref(bitmap), std::ref(slots1), 1000); | |||
std::thread thread2(allocate_slots, std::ref(bitmap), std::ref(slots2), 1000); | |||
// Wait for both threads to finish. | |||
thread1.join(); | |||
thread2.join(); | |||
// Print the results. | |||
std::cout << "Slots allocated by thread 1:\n"; | |||
printVector(slots1); | |||
std::cout << "\n\nSlots allocated by thread 2:\n"; | |||
printVector(slots2); | |||
std::cout << "\n\n"; | |||
std::vector<size_t> combined_slots = slots1; | |||
combined_slots.insert(combined_slots.end(), slots2.begin(), slots2.end()); | |||
std::sort(combined_slots.begin(), combined_slots.end()); | |||
// Print the sorted combined slots. | |||
std::cout << "Sorted combined slots:\n"; | |||
printVector(combined_slots); | |||
std::cout << "\n\n"; | |||
return 1; | |||
} | |||
int main() { | |||
test3(); // 有问题 | |||
} |
@ -0,0 +1,148 @@ | |||
// | |||
// Created by 马也驰 on 2024/12/22. | |||
// | |||
#include <iostream> | |||
#include <thread> | |||
#include <vector> | |||
#include "../db/slotpage.h" | |||
void printVector(const std::vector<uint32_t>& vec) { | |||
for (size_t i = 0; i < vec.size(); ++i) { | |||
std::cout << vec[i] << " "; | |||
} | |||
std::cout << std::endl; | |||
} | |||
int test1() { | |||
SlotCache slot_cache((std::string&)"test_slotpage"); | |||
const size_t blksize = 4096; | |||
const size_t blknum = 32; | |||
const size_t slot_content_num = blknum * blksize / sizeof(slot_content); | |||
slot_content slot_contents[slot_content_num]; | |||
for (int i = 0; i < slot_content_num; i++) { | |||
slot_contents[i] = {static_cast<uint32_t>(i), static_cast<uint32_t>(i)}; | |||
} | |||
// | |||
// slot_cache.write_for_test(reinterpret_cast<char*>(slot_contents), slot_content_num*sizeof(slot_content)); | |||
slot_content sc; | |||
for (int i = 0; i < slot_content_num; i++) { | |||
slot_cache.get_slot(i, &sc); | |||
assert(sc.vlog_num == slot_contents[i].vlog_num); | |||
assert(sc.value_offset == slot_contents[i].vlog_num); | |||
std::cout << i << std::endl; | |||
} | |||
return 1; | |||
} | |||
int test2() { | |||
SlotCache slot_cache((std::string&)"test_slotpage"); | |||
const size_t blksize = 4096; | |||
const size_t blknum = 32; | |||
const size_t slot_content_num = blknum * blksize / sizeof(slot_content); | |||
slot_content slot_contents[slot_content_num]; | |||
for (int i = 0; i < slot_content_num; i++) { | |||
slot_contents[i] = {static_cast<uint32_t>(i), static_cast<uint32_t>(i)}; | |||
} | |||
// | |||
// slot_cache.write_for_test(reinterpret_cast<char*>(slot_contents), slot_content_num*sizeof(slot_content)); | |||
slot_content sc1 = {999999999, 999999999}; | |||
// slot_cache.set_slot(100, &sc1); | |||
// slot_cache.set_slot(1000, &sc1); | |||
// slot_cache.set_slot(2000, &sc1); | |||
slot_content sc2 = {0, 0}; | |||
slot_cache.get_slot(100, &sc2); | |||
assert(sc2.vlog_num == sc1.vlog_num); | |||
assert(sc2.value_offset == sc1.value_offset); | |||
slot_cache.get_slot(1000, &sc2); | |||
assert(sc2.vlog_num == sc1.vlog_num); | |||
assert(sc2.value_offset == sc1.value_offset); | |||
slot_cache.get_slot(2000, &sc2); | |||
assert(sc2.vlog_num == sc1.vlog_num); | |||
assert(sc2.value_offset == sc1.value_offset); | |||
// for (int i = 0; i < slot_content_num; i++) { | |||
// slot_cache.get_slot(i, &sc); | |||
// assert(sc.vlog_num == slot_contents[i].vlog_num); | |||
// assert(sc.value_offset == slot_contents[i].vlog_num); | |||
// std::cout << i << std::endl; | |||
// } | |||
return 1; | |||
} | |||
void process_slots(SlotCache& slot_cache, | |||
const slot_content* slot_contents, | |||
size_t start, size_t end, | |||
std::vector<uint32_t>& slots) { | |||
slot_content sc; | |||
for (size_t i = start; i < end; ++i) { | |||
slot_cache.get_slot(i, &sc); | |||
// assert(sc.vlog_num == slot_contents[i].vlog_num); | |||
// assert(sc.value_offset == slot_contents[i].vlog_num); | |||
slots.push_back(sc.vlog_num); | |||
} | |||
} | |||
int test3() { | |||
SlotCache slot_cache((std::string&)("test_slotpage")); | |||
const size_t blksize = 4096; | |||
const size_t blknum = 32; | |||
const size_t slot_content_num = blknum * blksize / sizeof(slot_content); | |||
slot_content slot_contents[slot_content_num]; | |||
for (size_t i = 0; i < slot_content_num; i++) { | |||
slot_contents[i] = {static_cast<uint32_t>(i), static_cast<uint32_t>(i)}; | |||
} | |||
std::vector<uint32_t> slots1; | |||
std::vector<uint32_t> slots2; | |||
// 创建两个线程分别处理 slots1 和 slots2 | |||
std::thread t1(process_slots, std::ref(slot_cache), slot_contents, 0, 100, std::ref(slots1)); | |||
std::thread t2(process_slots, std::ref(slot_cache), slot_contents, 100, 200, std::ref(slots2)); | |||
// 等待线程完成 | |||
t1.join(); | |||
t2.join(); | |||
// 打印两个 vector 的内容 | |||
std::cout << "Slots1:" << std::endl; | |||
printVector(slots1); | |||
std::cout << std::endl; | |||
std::cout << "Slots2:" << std::endl; | |||
printVector(slots2); | |||
std::cout << std::endl; | |||
// 合并并排序 | |||
std::vector<uint32_t> merged_slots = slots1; | |||
merged_slots.insert(merged_slots.end(), slots2.begin(), slots2.end()); | |||
std::sort(merged_slots.begin(), merged_slots.end()); | |||
// 打印合并排序后的结果 | |||
std::cout << "Merged and Sorted Slots:" << std::endl; | |||
printVector(merged_slots); | |||
std::cout << std::endl; | |||
return 1; | |||
} | |||
int main() { | |||
test3(); | |||
} |