diff --git a/CMakeLists.txt b/CMakeLists.txt index cf5eec7..373e4f7 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -118,7 +118,14 @@ endif(BUILD_SHARED_LIBS) include(GNUInstallDirs) add_library(leveldb "" - db/slotpage.h) + db/slotpage.h + db/vlog.h + db/threadpool.h + db/shared_lock.h + db/vlog_set.cpp + db/vlog_set.h + db/vlog_gc.cpp + db/vlog_gc.h) target_sources(leveldb PRIVATE "${PROJECT_BINARY_DIR}/${LEVELDB_PORT_CONFIG_DIR}/port_config.h" @@ -521,10 +528,10 @@ endif(LEVELDB_INSTALL) add_executable(db_test1 "${PROJECT_SOURCE_DIR}/test/db_test1.cc" + test/db_test1.cc ) target_link_libraries(db_test1 leveldb) - add_executable(db_test2 "${PROJECT_SOURCE_DIR}/test/db_test2.cc" ) @@ -533,4 +540,15 @@ target_link_libraries(db_test2 PRIVATE leveldb) add_executable(db_test3 "${PROJECT_SOURCE_DIR}/test/db_test3.cc" ) -target_link_libraries(db_test3 PRIVATE leveldb gtest) \ No newline at end of file +target_link_libraries(db_test3 PRIVATE leveldb gtest) + +add_executable(db_test4 + "${PROJECT_SOURCE_DIR}/test/db_test4.cc" +) +target_link_libraries(db_test4 PRIVATE leveldb) + +add_executable(db_test5 + "${PROJECT_SOURCE_DIR}/test/db_test5.cc" + test/db_test5.cc +) +target_link_libraries(db_test5 PRIVATE leveldb) \ No newline at end of file diff --git a/db/db_impl.cc b/db/db_impl.cc index b7927bd..2353227 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -147,7 +147,12 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname) background_compaction_scheduled_(false), manual_compaction_(nullptr), versions_(new VersionSet(dbname_, &options_, table_cache_, - &internal_comparator_)) {} + &internal_comparator_)), + slot_page_(new SlotPage(dbname)) { + + vlog_set_ = new VlogSet(dbname, nullptr); + vlog_set_->set_vlog_gc(new VlogGC(slot_page_, vlog_set_)); +} DBImpl::~DBImpl() { // Wait for background work to finish. 
@@ -176,6 +181,9 @@ DBImpl::~DBImpl() { if (owns_cache_) { delete options_.block_cache; } + + delete slot_page_; + delete vlog_set_; } Status DBImpl::NewDB() { @@ -1165,6 +1173,15 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key, mem->Unref(); if (imm != nullptr) imm->Unref(); current->Unref(); + + // TODO: search the slotpage and get value from vlog + size_t slot_num = *(size_t *)value->c_str(); + struct slot_content sc; + std::string vlog_value; + slot_page_->get_slot(slot_num, &sc); + vlog_set_->get_value(sc.vlog_num, sc.value_offset, &vlog_value); + *value = vlog_value; + return s; } @@ -1199,10 +1216,39 @@ void DBImpl::ReleaseSnapshot(const Snapshot* snapshot) { // Convenience methods Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) { - return DB::Put(o, key, val); + // TODO: allocate slot_num in slotpage and put value in vlog + + size_t slot_num = slot_page_->alloc_slot(); + struct slot_content sc; + vlog_set_->put_value(&sc.vlog_num, &sc.value_offset, val); + slot_page_->set_slot(slot_num, &sc); + + char data[sizeof(size_t)]; + memcpy(data, &slot_num, sizeof(size_t)); + Slice slot_val(data, sizeof(data)); + + return DB::Put(o, key, slot_val); } Status DBImpl::Delete(const WriteOptions& options, const Slice& key) { +// size_t slot_num = *(size_t *)value->c_str(); +// struct slot_content sc; +// std::string vlog_value; +// slot_page_->get_slot(slot_num, &sc); +// vlog_set_->get_value(sc.vlog_num, sc.value_offset, &vlog_value); +// *value = vlog_value; + ReadOptions ro; + ro.verify_checksums = true; + ro.fill_cache = false; + ro.snapshot = nullptr; + std::string value; + Get(ro, key, &value); + size_t slot_num = *(size_t *)value.c_str(); + struct slot_content sc; + std::string vlog_value; + slot_page_->get_slot(slot_num, &sc); + vlog_set_->del_value(sc.vlog_num, sc.value_offset); + return DB::Delete(options, key); } diff --git a/db/db_impl.h b/db/db_impl.h index c7b0172..9b74668 100644 --- a/db/db_impl.h +++ 
b/db/db_impl.h @@ -13,6 +13,9 @@ #include "db/dbformat.h" #include "db/log_writer.h" #include "db/snapshot.h" +#include "db/vlog_set.h" +#include "db/vlog_gc.h" +#include "db/slotpage.h" #include "leveldb/db.h" #include "leveldb/env.h" #include "port/port.h" @@ -76,6 +79,9 @@ class DBImpl : public DB { struct CompactionState; struct Writer; + SlotPage *slot_page_; + VlogSet *vlog_set_; + // Information for a manual compaction struct ManualCompaction { int level; diff --git a/db/shared_lock.h b/db/shared_lock.h new file mode 100644 index 0000000..0d90490 --- /dev/null +++ b/db/shared_lock.h @@ -0,0 +1,52 @@ +// +// Created by 马也驰 on 2024/12/28. +// + +#ifndef LEVELDB_SHARED_LOCK_H +#define LEVELDB_SHARED_LOCK_H + +#include + +class SharedLock { + public: + SharedLock() : read_count_(0) {} + + public: + void soft_lock() { + write_latch_.lock(); + + read_count_latch_.lock(); + if (!read_count_) { + read_latch_.lock(); + } + read_count_ ++; + read_count_latch_.unlock(); + + write_latch_.unlock(); + } + void soft_unlock() { + read_count_latch_.lock(); + read_count_ --; + if (!read_count_) { + read_latch_.unlock(); + } + read_count_latch_.unlock(); + } + void hard_lock() { + read_latch_.lock(); + write_latch_.lock(); + read_latch_.unlock(); + } + void hard_unlock() { + write_latch_.unlock(); + } + + private: + std::mutex read_latch_; // indicate if any read is proceeding + std::mutex read_count_latch_; + volatile size_t read_count_; + std::mutex write_latch_; + // volatile size_t write_count_; +}; + +#endif // LEVELDB_SHARED_LOCK_H diff --git a/db/slotpage.h b/db/slotpage.h index be08919..023ce60 100644 --- a/db/slotpage.h +++ b/db/slotpage.h @@ -12,6 +12,8 @@ #include #include #include +#include +#include #define BITMAP_SIZE 8192 #define BITS_PER_BYTE 8 @@ -24,12 +26,14 @@ struct slot_content { uint32_t vlog_num; uint32_t value_offset; + slot_content() {} slot_content(uint32_t vn, uint32_t vo) { vlog_num = vn; value_offset = vo; } }; +// test passed class SlotCache { // 
slot number -> slot content #define BLOCK_NUM 16 @@ -37,7 +41,21 @@ class SlotCache { #define SLOT_PER_BLOCK (BLOCK_SIZE/sizeof(slot_content)) #define SLOT_OFFSET_IN_BLOCK(slot_num) ((slot_num)%SLOT_PER_BLOCK) public: - SlotCache() { + SlotCache(const std::string &slotpage_fname) { + this->slotpage_fname = slotpage_fname; +// this->slotpage_handler = std::fstream(slotpage_fname, std::ios::in | std::ios::out); + this->slotpage_handler = std::fstream(slotpage_fname, std::ios::in | std::ios::out); + if (!this->slotpage_handler.is_open()) { + // 文件不存在,尝试创建 + this->slotpage_handler = std::fstream(slotpage_fname, std::ios::out); + this->slotpage_handler.close(); + // 重新以读写模式打开 + this->slotpage_handler = std::fstream(slotpage_fname, std::ios::in | std::ios::out); + } + if (!this->slotpage_handler.is_open()) { + std::cerr << "Failed to open or create the file: " << slotpage_fname << std::endl; + std::exit(EXIT_FAILURE); + } for (auto i = 0; i < BLOCK_NUM; i++) { block_cache[i] = static_cast(malloc(BLOCK_SIZE)); access_time[i] = 0; @@ -45,43 +63,78 @@ public: } } - void get_slot(std::string &slotpage_fname, size_t slot_num, struct slot_content *sc) { + ~SlotCache() { + for (auto i = 0; i < BLOCK_NUM; i++) { + latches_[i].lock(); + if (info[i].is_dirty) { + write_back_block(info[i].block_num); + } + latches_[i].unlock(); + } + this->slotpage_handler.close(); + } + + /// methods only for test + void write_for_test(char *sc, size_t bytes) { + slotpage_handler.seekp(0); + slotpage_handler.write(reinterpret_cast(sc), bytes); + slotpage_handler.flush(); + } + + void flush_all_blk() { + for (auto blkinfo : info) { + if (blkinfo.is_dirty) { + write_back_block(blkinfo.block_num); + } + } + this->slotpage_handler.flush(); + } + + void get_slot(size_t slot_num, struct slot_content *sc) { auto block_num = slotnum_hash2_blocknum(slot_num); auto blockcache_num = block_num % BLOCK_NUM; - mtx.lock(); - if (info[blockcache_num].slotpage_fname != slotpage_fname) { // cache miss - 
write_back_block(blockcache_num); - read_in_block(blockcache_num, slotpage_fname, block_num); + latches_[blockcache_num].lock(); + if (!info[blockcache_num].used || info[blockcache_num].block_num != block_num) { // cache miss + if (info[blockcache_num].is_dirty) { + write_back_block(blockcache_num); + } + read_in_block(blockcache_num, block_num); + access_time[blockcache_num] = 0; + info[blockcache_num] = block_info(block_num, false, true); } read_slot(sc, blockcache_num, SLOT_OFFSET_IN_BLOCK(slot_num)); access_time[blockcache_num]++; - mtx.unlock(); + latches_[blockcache_num].unlock(); } - void set_slot(std::string &slotpage_fname, size_t slot_num, struct slot_content *sc) { + void set_slot(size_t slot_num, struct slot_content *sc) { auto block_num = slotnum_hash2_blocknum(slot_num); auto blockcache_num = block_num % BLOCK_NUM; - mtx.lock(); - if (info[blockcache_num].slotpage_fname != slotpage_fname) { - write_back_block(blockcache_num); - read_in_block(blockcache_num, slotpage_fname, block_num); + latches_[blockcache_num].lock(); + if (!info[blockcache_num].used || info[blockcache_num].block_num != block_num) { + if (info[blockcache_num].is_dirty) { + write_back_block(blockcache_num); + } + read_in_block(blockcache_num, block_num); + access_time[blockcache_num] = 0; + info[blockcache_num] = block_info(block_num, false, true); } set_slot(sc, blockcache_num, SLOT_OFFSET_IN_BLOCK(slot_num)); access_time[blockcache_num]++; info[blockcache_num].is_dirty = true; - mtx.unlock(); + latches_[blockcache_num].unlock(); } private: struct block_info { - std::string slotpage_fname; + bool used; size_t block_num; bool is_dirty; - block_info() {} - block_info(std::string &slotpage_fname, size_t block_num, bool is_dirty=false) { - this->slotpage_fname = slotpage_fname; + block_info() { used = false; is_dirty = false; } + block_info(size_t block_num, bool is_dirty=false, bool used = false) { this->block_num = block_num; this->is_dirty = is_dirty; + this->used = used; } }; @@ 
-90,19 +143,15 @@ private: return slot_num * sizeof(slot_content) / BLOCK_SIZE; } void write_back_block(size_t blockcache_num) { - auto &fname = info[blockcache_num].slotpage_fname; auto write_pos = info[blockcache_num].block_num * BLOCK_SIZE; - auto slotpage_handler = std::fstream(fname, std::ios::in | std::ios::out); slotpage_handler.seekp(write_pos); slotpage_handler.write( reinterpret_cast(block_cache[blockcache_num]), BLOCK_SIZE); + slotpage_handler.flush(); } - void read_in_block(size_t blockcache_num, std::string &slotpage_fname, size_t block_num) { - auto slotpage_handler = std::fstream(slotpage_fname, std::ios::in | std::ios::out); + void read_in_block(size_t blockcache_num, size_t block_num) { slotpage_handler.seekp(block_num*BLOCK_SIZE); slotpage_handler.read(reinterpret_cast(block_cache[blockcache_num]), BLOCK_SIZE); - access_time[blockcache_num] = 0; - info[blockcache_num] = block_info(slotpage_fname, block_num); } inline void read_slot(struct slot_content *sc, size_t blockcache_num, size_t slot_num_offset) { auto src = &block_cache[blockcache_num][slot_num_offset]; @@ -114,23 +163,41 @@ private: } private: - std::mutex mtx; + std::string slotpage_fname; + std::fstream slotpage_handler; + std::mutex latches_[BLOCK_NUM]; struct slot_content *block_cache[BLOCK_NUM]; size_t access_time[BLOCK_NUM]; struct block_info info[BLOCK_NUM]; }; - +/// test passed class BitMap { // in memory bitmap public: BitMap() { char *bitmap = static_cast(malloc(BITMAP_SIZE*sizeof(char))); + memset(bitmap, 0, BITMAP_SIZE * sizeof(char)); bitmaps_.push_back(bitmap); size = BITMAP_SIZE; first_empty_slot = 0; } + /** methods for test **/ + void show_allocated_slot() { + for (int i = 0; i < this->size; i++) { + auto byte = *get_bitmap_byte(i); + for (int b = 0; b < sizeof(byte) << 3; b++) { + if (byte & 0x80) { + std::cout << (i<<3)+b << ' '; + } + byte <<= 1; + } +// std::cout << std::endl; + } +// std::cout << std::endl; + } + void dealloc_slot(size_t slot_num) { mtx.lock(); 
const size_t byte = slot2byte(slot_num); @@ -149,23 +216,29 @@ public: char *start_byte = get_bitmap_byte(slot2byte(first_empty_slot)); const size_t off = slot2offset(first_empty_slot); SETBIT(start_byte, off); + // find the next free slot if (HASFREESLOT(*start_byte)) { auto bit_off = find_first_free_slot_inbyte(*start_byte); first_empty_slot += bit_off - off; + if (slot2byte(first_empty_slot) >= size) { + alloc_new_bitmap(); + } } else { size_t i; for (i = slot2byte(first_empty_slot)+1; i < size; i++) { char *byte = get_bitmap_byte(i); if (HASFREESLOT(*byte)) { + // FIXME: pack four bytes to do free slot finding auto bit_off = find_first_free_slot_inbyte(*byte); first_empty_slot = byte2slot(i) + bit_off; + break; } } // scale the bitmap if (i >= size) { alloc_new_bitmap(); - char *byte = get_bitmap_byte(i); - SETBIT(byte, 0); +// char *byte = get_bitmap_byte(i); +// SETBIT(byte, 0); first_empty_slot = byte2slot(i) + 1; } } @@ -175,7 +248,7 @@ public: private: static inline size_t slot2byte(size_t slot_num) { return slot_num / BITS_PER_BYTE; } - static inline size_t byte2slot(uint8_t byte_num) { return byte_num * BITS_PER_BYTE; } + static inline size_t byte2slot(size_t byte_num) { return byte_num * BITS_PER_BYTE; } static inline size_t slot2offset(size_t slot_num) { return slot_num % BITS_PER_BYTE; } inline char *get_bitmap_byte(size_t byte_num) { char *bitmap = bitmaps_.at(byte_num / BITMAP_SIZE); @@ -186,11 +259,26 @@ private: // bitmap[byte_num % BITMAP_SIZE] = byte; // } static inline size_t find_first_free_slot_inbyte(uint8_t byte) { - uint32_t rbyte = __builtin_bswap32((uint32_t)byte) >> 24; // 使用 32 位变量 - return BITS_PER_BYTE - __builtin_ctz(~rbyte) - 1; // 使用 GCC 内建函数找到最低位 1 的位置 + uint32_t rbyte = reverse_bits(byte); // 使用 32 位变量 + return __builtin_ctz(~rbyte); // 使用 GCC 内建函数找到最低位 1 的位置 + } + static inline uint8_t reverse_bits(uint8_t byte) { + uint8_t reversed = 0; + + reversed = (reversed << 1) | (byte & 1); byte >>= 1; // 1st bit + reversed = (reversed 
<< 1) | (byte & 1); byte >>= 1; // 2nd bit + reversed = (reversed << 1) | (byte & 1); byte >>= 1; // 3rd bit + reversed = (reversed << 1) | (byte & 1); byte >>= 1; // 4th bit + reversed = (reversed << 1) | (byte & 1); byte >>= 1; // 5th bit + reversed = (reversed << 1) | (byte & 1); byte >>= 1; // 6th bit + reversed = (reversed << 1) | (byte & 1); byte >>= 1; // 7th bit + reversed = (reversed << 1) | (byte & 1); // 8th bit (final step) + + return reversed; } void alloc_new_bitmap() { char *bitmap = static_cast(malloc(BITMAP_SIZE*sizeof(char))); + memset(bitmap, 0, BITMAP_SIZE * sizeof(char)); bitmaps_.push_back(bitmap); size += BITMAP_SIZE; } @@ -203,28 +291,32 @@ private: }; +//SlotCache * get_slotcache(const std::string &dbname); + // read and write slot in disk // first version with no cache class SlotPage { public: - SlotPage(std::string &dbname) { + SlotPage(const std::string &dbname) { slotpage_fname = slotpage_handler_name(dbname); bitmap = new BitMap(); + slotcache = new SlotCache(slotpage_fname); + assert(slotcache); } public: void get_slot(size_t slot_num, struct slot_content *sc) { - slotcache->get_slot(slotpage_fname, slot_num, sc); + slotcache->get_slot(slot_num, sc); } void set_slot(size_t slot_num, struct slot_content *sc) { - slotcache->set_slot(slotpage_fname, slot_num, sc); + slotcache->set_slot(slot_num, sc); } size_t alloc_slot() { return bitmap->alloc_slot(); } void dealloc_slot(size_t slot_num) { bitmap->dealloc_slot(slot_num); } private: - static std::string slotpage_handler_name(std::string &dbname) { - return "./" + dbname + "_slotpage"; + static std::string slotpage_handler_name(const std::string &dbname) { + return dbname + "_slotpage"; } inline static size_t slotpage_pos(size_t slot_num) { return slot_num * sizeof(slot_content); @@ -233,9 +325,21 @@ private: private: std::string slotpage_fname; BitMap *bitmap; - static SlotCache *slotcache; +// static SlotCache *slotcache; + SlotCache *slotcache; }; -SlotCache *SlotPage::slotcache = new 
SlotCache(); + +//#ifndef ENABLE_SLOT_CACHES +//#define ENABLE_SLOT_CACHES +//std::unordered_map slotcaches_; +//#endif +// +//SlotCache * get_slotcache(const std::string &dbname) { +// if (slotcaches_.find(dbname) == slotcaches_.end()) { +// slotcaches_[dbname] = new SlotCache(dbname); +// } +// return slotcaches_[dbname]; +//} #endif // LEVELDB_SLOTPAGE_H diff --git a/db/threadpool.h b/db/threadpool.h new file mode 100644 index 0000000..00d861d --- /dev/null +++ b/db/threadpool.h @@ -0,0 +1,67 @@ +// +// Created by 马也驰 on 2024/12/28. +// + +#ifndef LEVELDB_THREADPOOL_H +#define LEVELDB_THREADPOOL_H + + +#include +#include +#include +#include +#include +#include +#include + +class ThreadPool { + public: + explicit ThreadPool(size_t numThreads) : stop(false) { + for (size_t i = 0; i < numThreads; ++i) { + workers.emplace_back([this]() { + while (true) { + std::function task; + { + std::unique_lock lock(queueMutex); + condition.wait(lock, [this]() { return stop || !tasks.empty(); }); + + if (stop && tasks.empty()) return; + + task = std::move(tasks.front()); + tasks.pop(); + } + task(); + } + }); + } + } + + ~ThreadPool() { + { + std::unique_lock lock(queueMutex); + stop = true; + } + condition.notify_all(); + for (std::thread &worker : workers) { + worker.join(); + } + } + + void enqueue(std::function task) { + { + std::unique_lock lock(queueMutex); + tasks.push(std::move(task)); + } + condition.notify_one(); + } + + private: + std::vector workers; + std::queue> tasks; + std::mutex queueMutex; + std::condition_variable condition; + std::atomic stop; +}; + + +#endif // LEVELDB_THREADPOOL_H diff --git a/db/vlog.h b/db/vlog.h new file mode 100644 index 0000000..54ac5a0 --- /dev/null +++ b/db/vlog.h @@ -0,0 +1,378 @@ +// +// Created by 马也驰 on 2024/12/26. 
+// + +#ifndef LEVELDB_VLOG_H +#define LEVELDB_VLOG_H + +#include +#include +#include +#include +#include + + +#include "../db/threadpool.h" +#include "../include/leveldb/slice.h" +#include "../db/slotpage.h" +#include "../db/shared_lock.h" + +// config file: | vlog_nums_(size_t) | vlog_1_name | vlog_2_name | ... | +// vlog: | curr_size(size_t) | value_nums(size_t) | value1 | value2 | ... | +// value: value_len(uint16_t) | slot_num(size_t) | value | + +#define vlog_num_size sizeof(size_t) +#define vlog_name_size 256 +#define KiB 1024 +#define MiB 1024*KiB +#define VLOG_SIZE 32*MiB +#define GC_THREDHOLD 0.5 + +#define VALUE_BUFF_SIZE 0x7fff // value size cannot exceed this number +#define VALUE_SIZE_MASK 0x7fff +#define VALUE_DELE_MASK 0x8000 + + +//struct vlog_name { +// int name_size = 0; +// char name[vlog_name_size]; +// vlog_name() { +// memset(name, '\n', sizeof(name)); +// } +// vlog_name(std::string &_name) { +// memset(name, '\n', sizeof(name)); +// memcpy(name, _name.c_str(), _name.size()); +// name_size = _name.size(); +// } +//}; + +struct vlog_info { + std::mutex vlog_info_latch_; // 保护对vlog_info本身的并发修改 + size_t vlog_num; + size_t vlog_num_for_gc; // set when start gc + bool vlog_valid_; + size_t discard; + size_t value_nums; + size_t curr_size; + vlog_info(size_t vlog_num) : vlog_valid_(true), discard(0), value_nums(0), + vlog_num(vlog_num), curr_size(2*sizeof(size_t)) {} + vlog_info(size_t vlog_num, size_t value_nums, size_t curr_size) : vlog_valid_(true), discard(0), value_nums(value_nums), + vlog_num(vlog_num), curr_size(curr_size) {} +}; + +struct vlog_handler { + SharedLock vlog_latch_; // 表明当前vlog上的并发情况,读上soft_lock,写上hard_lock + vlog_handler() {} +}; + + +//class VlogSet; +// +// +//class VlogGC { +//#define THREADNUM 8 +// public: +// VlogGC(SlotPage *s, VlogSet *vs) : thread_pool_(new ThreadPool(THREADNUM)), +// slot_page_(s), vlog_set(vs) {} +// public: +// void do_gc(size_t old_vlog_num, std::string old_vlog_name, std::string new_vlog_name, 
+// struct vlog_info *old_vlog_info, struct vlog_info *new_vlog_info){ +// thread_pool_->enqueue([this, old_vlog_num, old_vlog_name, new_vlog_name, old_vlog_info, new_vlog_info]() +// { this->exec_gc(old_vlog_num, old_vlog_name, new_vlog_name, old_vlog_info, new_vlog_info); }); +// } +// +// private: +// void exec_gc(size_t old_vlog_num, std::string old_vlog_name, std::string new_vlog_name, +// struct vlog_info *old_vlog_info, struct vlog_info *new_vlog_info) { +// +// auto old_vlog = std::fstream(old_vlog_name, std::ios::in | std::ios::out); +// auto new_vlog = std::fstream(new_vlog_name, std::ios::in | std::ios::out); +// +// size_t value_nums = old_vlog_info->value_nums; +// char old_vlog_buff[VLOG_SIZE]; +// char new_vlog_buff[VLOG_SIZE]; +// old_vlog.seekp(0); +// old_vlog.read(old_vlog_buff, VLOG_SIZE); +// memcpy(new_vlog_buff, &value_nums, sizeof(size_t)); +// +// size_t ovb_off = sizeof(size_t); +// size_t nvb_off = sizeof(size_t); +// for (auto i = 0; i < value_nums; i++) { +// char *value = &old_vlog_buff[ovb_off]; +// uint16_t value_len = get_value_len(value); +// size_t slot_num = get_value_slotnum(value); +// if (!value_deleted(value_len)) { +// memcpy(&new_vlog_buff[nvb_off], &old_vlog_buff[ovb_off], value_len); +// struct slot_content scn(new_vlog_info->vlog_num, nvb_off); +// slot_page_->set_slot(slot_num, &scn); +// nvb_off += value_len; +// } +// ovb_off += value_len; +// } +// new_vlog.write(new_vlog_buff, VLOG_SIZE); +// new_vlog.flush(); +// +// old_vlog.close(); +// new_vlog.close(); +// +// this->vlog_set->remove_from_config_file(old_vlog_name); +// } +// private: +// inline bool value_deleted(uint16_t value_len) { +// return !(value_len >> 15); +// } +// inline uint16_t get_value_len(char *value) { +// uint16_t value_len; +// memcpy(&value_len, value, sizeof(uint16_t)); +// return value_len; +// } +// inline size_t get_value_slotnum(char *value) { +// size_t slot_num; +// memcpy(&slot_num, &value[sizeof(uint16_t)], sizeof(size_t)); +// 
return slot_num; +// } +// private: +// SlotPage *slot_page_; +// ThreadPool *thread_pool_; +// VlogSet *vlog_set; // inline void remove_vlog_from_maps(std::string &vlog_name) +//}; + + +//class VlogSet { +//// value: value_len(uint16_t) | slot_num(size_t) | value +//// vlog: curr_size(size_t) | value_nums(size_t) | value1 | value2 | ... | +//#define VALUE_BUFF_SIZE 0x7fff +//#define VALUE_SIZE_MASK 0x7fff +//#define VALUE_DELE_MASK 0x8000 +// +//friend class VlogGC; +// +//public: +// void get_value(uint32_t vlog_num, uint32_t value_offset, std::string *value) { +// mtx.lock(); +// auto vinfo = get_vlog_info(vlog_num); +// auto vhandler = get_vlog_handler(vlog_num); +// +// vinfo->vlog_info_latch_.lock(); +// if (!vinfo->vlog_valid_) { +// vhandler = get_vlog_handler(vinfo->vlog_num_for_gc); +// } +// vhandler->vlog_latch_.soft_lock(); +// vinfo->vlog_info_latch_.unlock(); +// read_vlog_value(vlog_num, value_offset, value); +// vhandler->vlog_latch_.soft_unlock(); +// mtx.unlock(); +// } +// void put_value(uint32_t vlog_num, uint32_t value_offset, leveldb::Slice &value) { +// mtx.lock(); +// auto vinfo = get_vlog_info(vlog_num); +// auto vhandler = get_vlog_handler(vlog_num); +// +// vinfo->vlog_info_latch_.lock(); +// if (!vinfo->vlog_valid_) { +// vhandler = get_vlog_handler(vinfo->vlog_num_for_gc); +// } +// vhandler->vlog_latch_.hard_lock(); +// vinfo->vlog_info_latch_.unlock(); +// write_vlog_value(vlog_num, value_offset, value); +// vhandler->vlog_latch_.hard_unlock(); +// mtx.unlock(); +// } +// void del_value(uint32_t vlog_num, uint32_t value_offset) { +// mtx.lock(); +// auto vinfo = get_vlog_info(vlog_num); +// auto vhandler = get_vlog_handler(vlog_num); +// +// vinfo->vlog_info_latch_.lock(); +// if (!vinfo->vlog_valid_) { +// vhandler = get_vlog_handler(vinfo->vlog_num_for_gc); +// } +// vhandler->vlog_latch_.hard_lock(); +// mark_del_value(vlog_num, value_offset); +// +// vinfo->vlog_info_latch_.unlock(); +// vhandler->vlog_latch_.hard_unlock(); +// 
mtx.unlock(); +// } +// +//private: +// size_t register_new_vlog() { +// size_t vn = vlog_nums_; +// std::string vlog_name = get_vlog_name(vn); +// register_inconfig_file(vlog_name); +// create_vlog(vlog_name); +// auto vlog_new = new std::fstream(vlog_name, std::ios::in | std::ios::out); +// if (!vlog_new->is_open()) { +// std::cerr << "Failed to open or create the vlog file: " << vlog_new << std::endl; +// std::exit(EXIT_FAILURE); +// } +// register_vlog_inmaps(vn, vlog_name, vlog_new); +// vlog_nums_ ++; +// vlog_count_ ++; +// return vn; +// } +// void remove_old_vlog(size_t old_vlog_num) { +// // after gc, new_vlog has been created +// // new vlog should have been created here ! +// mtx.lock(); +// auto vi_old = get_vlog_info(old_vlog_num); +// vi_old->vlog_info_latch_.lock(); +// std::string old_vlog_name = get_vlog_name(old_vlog_num); +// remove_from_config_file(old_vlog_name); +// remove_vlog_from_maps(old_vlog_name); +// vi_old->vlog_valid_ = false; +// vi_old->vlog_info_latch_.unlock(); +// vlog_count_ --; +// mtx.unlock(); +// } +// bool vlog_need_gc(size_t vlog_num) { +// std::string vlog_name = get_vlog_name(vlog_num); +// auto vi = vlog_info_map_[vlog_name]; +// bool retval = vi->vlog_valid_ && (vi->discard/vi->value_nums >= GC_THREDHOLD); +// return retval; +// } +// +//private: +// void register_inconfig_file(std::string &vlog_name) { +// struct vlog_name vn(vlog_name); +// // config file: | vlog_nums_(size_t) | vlog_count_(size_t) | vlog_1_name | ... 
| +// // first size_t in config file indicates current vlog_nums_(size_t) +// // second size_t in config file indicates current vlog_count_(size_t) +// config_file_->seekp(2*sizeof(size_t) + vlog_nums_* vlog_name_size); +// config_file_->write(vn.name, vn.name_size); +// config_file_->flush(); +// } +// void remove_from_config_file(std::string &vlog_name) { +// char tmp[vlog_name_size]; +// size_t index = 0; +// config_file_->seekp(2*sizeof(size_t)); +// while (config_file_->read(tmp, vlog_name_size) || config_file_->gcount() > 0) { +// size_t length = 0; +// while (length < config_file_->gcount() && tmp[length] != '\n') { +// ++length; +// } +// std::string name(tmp, length); +// if (name == vlog_name) { +// memset(tmp, '\n', sizeof(tmp)); +// config_file_->seekp(2*sizeof(size_t) + index * vlog_name_size); +// config_file_->write(tmp, vlog_name_size); +// config_file_->flush(); +// break; +// } +// index ++; +// } +// } +// void create_vlog(std::string &vlog_name) { +// std::fstream *vlog_new = new std::fstream(vlog_name, std::ios::out); +// char tmp[2*sizeof(size_t)]; +// memset(tmp, 0, sizeof(tmp)); +// vlog_new->write(tmp, sizeof(tmp)); +// vlog_new->flush(); +// vlog_new->close(); +// } +// inline void register_vlog_inmaps(size_t vlog_num, std::string &vlog_name, std::fstream *handler) { +// vlog_info_map_[vlog_name] = new vlog_info(vlog_num); +// vlog_handler_map_[vlog_name] = new vlog_handler(); +// } +// inline void remove_vlog_from_maps(std::string &vlog_name) { +// auto vi = vlog_info_map_[vlog_name]; +//// auto vh = vlog_handler_map_[vlog_name]; +// vi->vlog_info_latch_.lock(); +// vi->vlog_valid_ = false; // TODO: change the slotpage vlog_num when get/put thread read this flag +//// vh->handler_->close(); +// assert(!std::remove(vlog_name.c_str())); +// vlog_handler_map_.erase(vlog_name); +// vi->vlog_info_latch_.unlock(); +// } +// inline std::string get_vlog_name(size_t vlog_num) { +// return dbname + "_vlog_" + std::to_string(vlog_num); +// } +// 
inline struct vlog_info *get_vlog_info(size_t vlog_num) { +// return vlog_info_map_[get_vlog_name(vlog_num)]; +// } +// inline struct vlog_handler *get_vlog_handler(size_t vlog_num) { +// return vlog_handler_map_[get_vlog_name(vlog_num)]; +// } +// void read_vlog_value(uint32_t vlog_num, uint32_t value_offset, std::string *value) { +// auto vlog_name = get_vlog_name(vlog_num); +// auto handler = std::fstream(vlog_name, std::ios::in | std::ios::out); +// handler.seekp(value_offset); +// char value_buff[VALUE_BUFF_SIZE]; +// handler.read(value_buff, VALUE_BUFF_SIZE); +// +// uint16_t value_size; +// memcpy(&value_size, value_buff, sizeof(uint16_t)); +// value_size &= VALUE_SIZE_MASK; +// value->assign(&value_buff[sizeof(uint16_t)], value_size); +// handler.close(); +// } +// void write_vlog_value(uint32_t vlog_num, uint32_t value_offset, leveldb::Slice &value) { +// auto vlog_name = get_vlog_name(vlog_num); +// auto handler = std::fstream(vlog_name, std::ios::in | std::ios::out); +// handler.seekp(value_offset); +// const char *value_buff = value.data(); +// handler.write(value_buff, value.size()); +// +// auto vinfo = get_vlog_info(vlog_num); +// vinfo->value_nums ++; +// vinfo->curr_size += value.size(); +// +// handler.flush(); +// handler.close(); +// } +// void mark_del_value(uint32_t vlog_num, uint32_t value_offset) { +// auto vinfo = get_vlog_info(vlog_num); +// auto vlog_name = get_vlog_name(vlog_num); +// auto handler = std::fstream(vlog_name, std::ios::in | std::ios::out); +// handler.seekp(value_offset); +// char value_buff[VALUE_BUFF_SIZE]; +// handler.read(value_buff, VALUE_BUFF_SIZE); +// +// uint16_t value_size; +// memcpy(&value_size, value_buff, sizeof(uint16_t)); +// if (value_size & VALUE_DELE_MASK) { +// // case when value has been deleted +// handler.close(); +// return ; +// } +// assert(!(value_size & VALUE_DELE_MASK)); +// uint16_t masked_value_size = value_size & 0xffff; +// memcpy(value_buff, &masked_value_size, sizeof(uint16_t)); +// 
handler.write(value_buff, value_size); +// handler.flush(); +// handler.close(); +// +// // handle gc, mtx is locked outside, vlog_info_latch and vlog hard lock is locked outside too +//// auto vinfo = get_vlog_info(vlog_num); +// vinfo->discard ++; +// vinfo->value_nums --; +// vinfo->curr_size -= value_size & VALUE_SIZE_MASK; +// if (vlog_need_gc(vlog_num)) { +// // create new vlog +// vinfo->vlog_num_for_gc = register_new_vlog(); +//// vinfo->vlog_valid_ = false; +// auto old_vlog_name = get_vlog_name(vlog_num); +// auto new_vlog_name = get_vlog_name(vinfo->vlog_num_for_gc); +// auto old_vlog_info = vinfo; +// auto new_vlog_info = get_vlog_info(vinfo->vlog_num_for_gc); +// vlog_gc->do_gc(vlog_num, old_vlog_name, new_vlog_name, old_vlog_info, new_vlog_info); +// } +// } +// +// +//private: +// std::mutex mtx; +// std::string dbname; +// size_t vlog_nums_; +// size_t vlog_count_; +// std::unordered_map vlog_info_map_; // TODO: 读到vlog已经失效时,要返回slotpage重新读取slotnum +// std::unordered_map vlog_handler_map_; +// std::fstream *config_file_; +// +// VlogGC *vlog_gc; // init with slot_page +//// SlotPage *slot_page; +//}; + + + +#endif // LEVELDB_VLOG_H diff --git a/db/vlog_gc.cpp b/db/vlog_gc.cpp new file mode 100644 index 0000000..8555b63 --- /dev/null +++ b/db/vlog_gc.cpp @@ -0,0 +1,68 @@ +// +// Created by 马也驰 on 2024/12/29. 
+//
+
+#include "vlog_gc.h"
+#include "../db/vlog_set.h"
+
+#include <cstring>
+#include <fstream>
+#include <mutex>
+#include <vector>
+
+
+// Hands the compaction of vlog `old_vlog_num` into the freshly registered
+// vlog `new_vlog_num` to the thread pool. The work itself is done by
+// exec_gc().
+void VlogGC::do_gc(size_t old_vlog_num, size_t new_vlog_num) {
+  thread_pool_->enqueue([this, old_vlog_num, new_vlog_num]() {
+    this->exec_gc(old_vlog_num, new_vlog_num);
+  });
+}
+
+// Copies every record of the old vlog that is not flagged as deleted into the
+// new vlog, repoints the slot of each copied record at its new location,
+// writes the new vlog's { curr_size, value_nums } header and finally asks the
+// VlogSet to drop the old vlog.
+//
+// NOTE(review): the low bits (VALUE_SIZE_MASK) of a record's leading uint16_t
+// are treated as the length of the whole record, as the original offset
+// arithmetic did; the write path in VlogSet does not emit such a header yet.
+// NOTE(review): a put_value() that lands in the new vlog between its
+// registration and this pass is overwritten here; confirm the intended
+// hand-over protocol.
+void VlogGC::exec_gc(size_t old_vlog_num, size_t new_vlog_num) {
+  const auto old_vlog_name = vlog_set->get_vlog_name(old_vlog_num);
+  const auto new_vlog_name = vlog_set->get_vlog_name(new_vlog_num);
+
+  struct vlog_info *old_vlog_info = nullptr;
+  struct vlog_info *new_vlog_info = nullptr;
+  struct vlog_handler *old_vlog_handler = nullptr;
+  struct vlog_handler *new_vlog_handler = nullptr;
+  {
+    // The maps are mutated by put/del under VlogSet::mtx, so the lookups have
+    // to be done under the same mutex.
+    std::lock_guard<std::mutex> guard(vlog_set->mtx);
+    old_vlog_info = vlog_set->get_vlog_info(old_vlog_num);
+    new_vlog_info = vlog_set->get_vlog_info(new_vlog_num);
+    old_vlog_handler = vlog_set->get_vlog_handler(old_vlog_num);
+    new_vlog_handler = vlog_set->get_vlog_handler(new_vlog_num);
+  }
+  if (old_vlog_info == nullptr || new_vlog_info == nullptr ||
+      old_vlog_handler == nullptr || new_vlog_handler == nullptr) {
+    return;
+  }
+
+  old_vlog_handler->vlog_latch_.soft_lock();
+  new_vlog_info->vlog_info_latch_.lock();
+  new_vlog_handler->vlog_latch_.hard_lock();
+
+  bool compacted = false;
+  {
+    const auto mode = std::ios::in | std::ios::out | std::ios::binary;
+    std::fstream old_vlog(old_vlog_name, mode);
+    std::fstream new_vlog(new_vlog_name, mode);
+
+    if (old_vlog.is_open() && new_vlog.is_open()) {
+      // Heap buffers: two VLOG_SIZE arrays do not belong on a worker stack.
+      // Value-initialised, so the padding written to the new file is zero.
+      std::vector<char> old_vlog_buff(VLOG_SIZE);
+      std::vector<char> new_vlog_buff(VLOG_SIZE);
+      old_vlog.seekg(0);
+      old_vlog.read(old_vlog_buff.data(), VLOG_SIZE);
+      const size_t bytes_read = static_cast<size_t>(old_vlog.gcount());
+
+      const size_t vlog_header = 2 * sizeof(size_t);
+      const size_t record_header = sizeof(uint16_t) + sizeof(size_t);
+      // value_nums is decremented and discard incremented on every delete,
+      // so their sum is the number of records physically in the file.
+      const size_t total_records =
+          static_cast<size_t>(old_vlog_info->value_nums) +
+          static_cast<size_t>(old_vlog_info->discard);
+
+      size_t ovb_off = vlog_header;
+      size_t nvb_off = vlog_header;
+      size_t new_vlog_value_nums = 0;
+      for (size_t i = 0; i < total_records; i++) {
+        if (ovb_off + record_header > bytes_read) break;
+        char *value = &old_vlog_buff[ovb_off];
+        const uint16_t raw_len = get_value_len(value);
+        // Strip the flag bit before using the word as a length.
+        const size_t record_len = raw_len & VALUE_SIZE_MASK;
+        if (record_len == 0 || ovb_off + record_len > bytes_read) break;
+        if (!value_deleted(raw_len)) {
+          // nvb_off never exceeds ovb_off, so this stays inside the buffer.
+          memcpy(&new_vlog_buff[nvb_off], value, record_len);
+          struct slot_content scn(new_vlog_info->vlog_num, nvb_off);
+          slot_page_->set_slot(get_value_slotnum(value), &scn);
+          nvb_off += record_len;
+          new_vlog_value_nums++;
+        }
+        ovb_off += record_len;
+      }
+
+      // Only the records that were actually copied count as live.
+      new_vlog_info->value_nums = new_vlog_value_nums;
+      new_vlog_info->curr_size = nvb_off;
+      memcpy(new_vlog_buff.data(), &nvb_off, sizeof(size_t));
+      memcpy(&new_vlog_buff[sizeof(size_t)], &new_vlog_value_nums, sizeof(size_t));
+      new_vlog.seekp(0);
+      new_vlog.write(new_vlog_buff.data(), VLOG_SIZE);
+      new_vlog.flush();
+      compacted = new_vlog.good();
+    }
+  }  // both streams are closed here
+
+  old_vlog_handler->vlog_latch_.soft_unlock();
+  new_vlog_info->vlog_info_latch_.unlock();
+  new_vlog_handler->vlog_latch_.hard_unlock();
+
+  // Never drop the old vlog unless its live records were written out.
+  if (compacted) {
+    vlog_set->remove_old_vlog(old_vlog_num);
+  }
+}
+
diff --git a/db/vlog_gc.h b/db/vlog_gc.h
new file mode 100644
index 0000000..cbd1308
--- /dev/null
+++ b/db/vlog_gc.h
@@ -0,0 +1,46 @@
+//
+// Created by 马也驰 on 2024/12/29.
+//
+
+#ifndef LEVELDB_VLOG_GC_H
+#define LEVELDB_VLOG_GC_H
+
+
+#include <cstdint>
+#include <cstring>
+#include <memory>
+#include "../db/slotpage.h"
+#include "../db/threadpool.h"
+#include "../db/vlog.h"
+
+// Forward declaration of VlogSet
+class VlogSet;
+
+// Background garbage collector for value logs: copies the live records of one
+// vlog into another and repoints their slots in the SlotPage.
+class VlogGC {
+ public:
+  // `s` and `vs` are borrowed, not owned.
+  // NOTE(review): thread_pool_ was never initialised although do_gc()
+  // dereferences it. This assumes ThreadPool has a ThreadPool(size_t threads)
+  // constructor - confirm against db/threadpool.h. One worker keeps GC passes
+  // from running concurrently with each other.
+  VlogGC(SlotPage *s, VlogSet *vs)
+      : slot_page_(s), thread_pool_(new ThreadPool(1)), vlog_set(vs) {}
+  void do_gc(size_t old_vlog_num, size_t new_vlog_num);
+
+ private:
+  void exec_gc(size_t old_vlog_num, size_t new_vlog_num);
+
+  // A record is deleted when its delete flag is set; this is the same test
+  // VlogSet::mark_del_value() uses before flagging a record.
+  inline bool value_deleted(uint16_t value_len) {
+    return (value_len & VALUE_DELE_MASK) != 0;
+  }
+  // Returns the raw leading uint16_t of a record (flag bit included).
+  inline uint16_t get_value_len(char *value) {
+    uint16_t value_len;
+    memcpy(&value_len, value, sizeof(uint16_t));
+    return value_len;
+  }
+  // Returns the slot number stored right after the length word.
+  inline size_t get_value_slotnum(char *value) {
+    size_t slot_num;
+    memcpy(&slot_num, &value[sizeof(uint16_t)], sizeof(size_t));
+    return slot_num;
+  }
+
+  SlotPage *slot_page_;
+  std::unique_ptr<ThreadPool> thread_pool_;
+  VlogSet *vlog_set;  // pointer only; the full definition is pulled in by vlog_gc.cpp
+};
+
+
+#endif  // LEVELDB_VLOG_GC_H
diff --git a/db/vlog_set.cpp b/db/vlog_set.cpp
new file mode 100644
index 0000000..90ba2a9
--- /dev/null
+++ b/db/vlog_set.cpp
@@ -0,0 +1,336 @@
+//
+// Created by 马也驰 on 2024/12/29.
+//
+
+#include "vlog_set.h"
+#include "../db/vlog_gc.h"
+
+
+// config file: | vlog_nums_(size_t) | vlog_1_num(size_t) | vlog_2_num(size_t) | ... |
+// vlog: | curr_size(size_t) | value_nums(size_t) | value1 | value2 | ...
|
+// value: { value_len(uint16_t) | slot_num(size_t) | value }
+//
+// NOTE(review): put_value()/write_vlog_value() store the caller's bytes
+// verbatim, read_vlog_value() expects a uint16_t length in front of the
+// payload, and VlogGC additionally expects the slot number after it. The
+// three paths do not agree on the record layout yet; settling it requires
+// passing the slot number into put_value(), i.e. an interface change.
+
+namespace {
+// Size of the { curr_size, value_nums } header at the start of every vlog.
+const size_t kVlogHeaderSize = 2 * sizeof(size_t);
+const std::ios::openmode kReadWriteBinary =
+    std::ios::in | std::ios::out | std::ios::binary;
+}  // namespace
+
+
+// Opens (or creates) the config file of `dbname` and rebuilds the in-memory
+// maps for every vlog that is listed there and not marked as removed.
+// Terminates the process if the config file can neither be opened nor created.
+VlogSet::VlogSet(std::string dbname, VlogGC *vlog_gc)
+    : dbname(dbname), vlog_nums_(0), config_file_(nullptr), vlog_gc(vlog_gc) {
+  const auto cfname = get_config_file_name();
+  config_file_ = new std::fstream(cfname, kReadWriteBinary);
+  if (!config_file_->is_open()) {
+    // The config file does not exist yet: create it empty, then reopen it
+    // for reading and writing.
+    delete config_file_;
+    { std::ofstream create(cfname, std::ios::out | std::ios::binary); }
+    config_file_ = new std::fstream(cfname, kReadWriteBinary);
+  }
+  if (!config_file_->is_open()) {
+    std::cerr << "Failed to open or create the db config file: " << cfname << std::endl;
+    std::exit(EXIT_FAILURE);
+  }
+
+  size_t stored_count = 0;
+  config_file_->seekg(0);
+  if (!config_file_->read(reinterpret_cast<char *>(&stored_count), sizeof(size_t))) {
+    // Fresh (empty) config file: nothing to restore.
+    config_file_->clear();
+    return;
+  }
+  vlog_nums_ = stored_count;
+
+  // Read every registered vlog number from the config file.
+  std::vector<size_t> entries(stored_count);
+  config_file_->read(reinterpret_cast<char *>(entries.data()),
+                     static_cast<std::streamsize>(stored_count * sizeof(size_t)));
+  const size_t loaded = static_cast<size_t>(config_file_->gcount()) / sizeof(size_t);
+  config_file_->clear();
+
+  for (size_t i = 0; i < loaded; i++) {
+    const size_t curr_vlog_num = entries[i];
+    if (curr_vlog_num & CONFIG_FILE_DELE_MASK) {
+      continue;  // removed by an earlier GC pass
+    }
+    std::ifstream curr_vlog(get_vlog_name(curr_vlog_num), std::ios::in | std::ios::binary);
+    size_t curr_vlog_header[2] = {0, 0};  // { curr_size, value_nums }
+    if (!curr_vlog.read(reinterpret_cast<char *>(curr_vlog_header), sizeof(curr_vlog_header))) {
+      continue;  // missing or truncated vlog file
+    }
+    // The counters come from the vlog's own header, not from the config
+    // entries; argument order kept as in the original call.
+    restore_vlog_inmaps(new vlog_info(curr_vlog_num, curr_vlog_header[1], curr_vlog_header[0]));
+  }
+}
+
+// Persists the vlog count and every vlog header, then releases everything the
+// set owns (GC object, bookkeeping objects, config stream).
+VlogSet::~VlogSet() {
+  // Destroy the collector first so that no GC task can touch the maps while
+  // they are being torn down.
+  delete vlog_gc;
+  vlog_gc = nullptr;
+
+  config_file_->clear();
+  config_file_->seekp(0);
+  config_file_->write(reinterpret_cast<const char *>(&vlog_nums_), sizeof(size_t));
+  config_file_->flush();
+  config_file_->close();
+  delete config_file_;
+  config_file_ = nullptr;
+
+  for (auto &entry : vlog_info_map_) {
+    struct vlog_info *vinfo = entry.second;
+    if (vinfo == nullptr) {
+      continue;
+    }
+    size_t header[2] = {vinfo->curr_size, vinfo->value_nums};
+    std::fstream vlog_file(get_vlog_name(vinfo->vlog_num), kReadWriteBinary);
+    if (vlog_file.is_open()) {  // vlogs removed by GC no longer exist
+      vlog_file.seekp(0);
+      vlog_file.write(reinterpret_cast<const char *>(header), sizeof(header));
+      vlog_file.flush();
+    }
+    delete vinfo;
+  }
+  vlog_info_map_.clear();
+
+  for (auto &entry : vlog_handler_map_) {
+    delete entry.second;
+  }
+  vlog_handler_map_.clear();
+}
+
+// Reads the value stored at `value_offset` of vlog `vlog_num` into `*value`.
+// `*value` is left empty when the vlog is unknown.
+// NOTE(review): when the vlog has been invalidated by GC the data is still
+// read from the old file at the old offset; the caller should re-read the
+// slot page instead (see the TODO in remove_vlog_from_maps).
+void VlogSet::get_value(uint32_t vlog_num, uint32_t value_offset, std::string *value) {
+  std::lock_guard<std::mutex> guard(mtx);
+  auto vinfo = get_vlog_info(vlog_num);
+  auto vhandler = get_vlog_handler(vlog_num);
+  if (vinfo == nullptr) {
+    value->clear();
+    return;
+  }
+
+  vinfo->vlog_info_latch_.lock();
+  if (!vinfo->vlog_valid_) {
+    vhandler = get_vlog_handler(vinfo->vlog_num_for_gc);
+  }
+  if (vhandler == nullptr) {
+    vinfo->vlog_info_latch_.unlock();
+    value->clear();
+    return;
+  }
+  vhandler->vlog_latch_.soft_lock();
+  vinfo->vlog_info_latch_.unlock();
+  read_vlog_value(vlog_num, value_offset, value);
+  vhandler->vlog_latch_.soft_unlock();
+}
+
+// Returns a valid vlog that still has room for `value_size` more bytes, or
+// nullptr when every vlog is full (the caller then registers a new one).
+struct vlog_info * VlogSet::get_writable_vlog_info(size_t value_size) {
+  for (const auto &entry : vlog_info_map_) {
+    struct vlog_info *vinfo = entry.second;
+    // A vlog that is being collected or was removed must not take writes.
+    if (vinfo == nullptr || !vinfo->vlog_valid_) {
+      continue;
+    }
+    if (vinfo->curr_size + value_size <= VLOG_SIZE) {
+      return vinfo;
+    }
+  }
+  return nullptr;
+}
+
+// Appends `value` to a vlog with enough free space and reports where it was
+// stored through `*vlog_num` / `*value_offset`.
+void VlogSet::put_value(uint32_t *vlog_num, uint32_t *value_offset, const leveldb::Slice &value) {
+  std::lock_guard<std::mutex> guard(mtx);
+
+  auto vinfo = get_writable_vlog_info(value.size());
+  if (!vinfo) {
+    // Every vlog is full: create a new one.
+    vinfo = get_vlog_info(register_new_vlog());
+  }
+
+  vinfo->vlog_info_latch_.lock();
+  // Never hand out an offset inside the { curr_size, value_nums } header.
+  if (vinfo->curr_size < kVlogHeaderSize) {
+    vinfo->curr_size = kVlogHeaderSize;
+  }
+  *vlog_num = vinfo->vlog_num;
+  *value_offset = vinfo->curr_size;
+  // The space is accounted for exactly once, here, while the latch is held;
+  // write_vlog_value() no longer repeats it.
+  vinfo->curr_size += value.size();
+  vinfo->value_nums++;
+
+  auto vhandler = get_vlog_handler(*vlog_num);
+  vhandler->vlog_latch_.hard_lock();
+  vinfo->vlog_info_latch_.unlock();
+  write_vlog_value(*vlog_num, *value_offset, value);
+  vhandler->vlog_latch_.hard_unlock();
+}
+
+// Flags the value at `value_offset` of vlog `vlog_num` as deleted and starts
+// a GC pass when the vlog has accumulated enough garbage. Unknown vlogs are
+// ignored.
+void VlogSet::del_value(uint32_t vlog_num, uint32_t value_offset) {
+  std::lock_guard<std::mutex> guard(mtx);
+  auto vinfo = get_vlog_info(vlog_num);
+  auto vhandler = get_vlog_handler(vlog_num);
+  if (vinfo == nullptr) {
+    return;
+  }
+
+  vinfo->vlog_info_latch_.lock();
+  if (!vinfo->vlog_valid_) {
+    vhandler = get_vlog_handler(vinfo->vlog_num_for_gc);
+  }
+  if (vhandler == nullptr) {
+    vinfo->vlog_info_latch_.unlock();
+    return;
+  }
+  vhandler->vlog_latch_.hard_lock();
+  mark_del_value(vlog_num, value_offset);
+
+  vinfo->vlog_info_latch_.unlock();
+  vhandler->vlog_latch_.hard_unlock();
+}
+
+// Creates the next vlog (file, config entry, map entries) and returns its
+// number. The caller must hold mtx.
+size_t VlogSet::register_new_vlog() {
+  const size_t vn = vlog_nums_;
+  std::string vlog_name = get_vlog_name(vn);
+  register_inconfig_file(vn);
+  create_vlog(vlog_name);
+  register_vlog_inmaps(vn, vlog_name);
+  vlog_nums_++;
+  return vn;
+}
+
+// Called by the GC once the live records of `old_vlog_num` have been moved:
+// tombstones the config entry, deletes the file and invalidates the vlog.
+void VlogSet::remove_old_vlog(size_t old_vlog_num) {
+  std::lock_guard<std::mutex> guard(mtx);
+  auto vi_old = get_vlog_info(old_vlog_num);
+  if (vi_old == nullptr) {
+    return;
+  }
+  vi_old->vlog_info_latch_.lock();
+  std::string old_vlog_name = get_vlog_name(old_vlog_num);
+  remove_from_config_file(old_vlog_num);
+  remove_vlog_from_maps(old_vlog_name);  // expects the latch to be held
+  vi_old->vlog_valid_ = false;
+  vi_old->vlog_info_latch_.unlock();
+}
+
+// True when the vlog is still valid and its discarded/live ratio has reached
+// GC_THREDHOLD. The ratio is computed in floating point, and a vlog that
+// holds only discarded records always qualifies.
+bool VlogSet::vlog_need_gc(size_t vlog_num) {
+  const auto vi = get_vlog_info(vlog_num);
+  if (vi == nullptr || !vi->vlog_valid_) {
+    return false;
+  }
+  if (vi->value_nums == 0) {
+    return vi->discard > 0;
+  }
+  return static_cast<double>(vi->discard) / static_cast<double>(vi->value_nums) >= GC_THREDHOLD;
+}
+
+// Appends `vlog_num` to the config file and persists the new vlog count.
+// config file: | vlog_nums_(size_t) | vlog_1_num(size_t) | vlog_2_num(size_t) | ... |
+void VlogSet::register_inconfig_file(size_t vlog_num) {
+  config_file_->clear();
+  config_file_->seekp(sizeof(size_t) + vlog_nums_ * sizeof(size_t));
+  config_file_->write(reinterpret_cast<const char *>(&vlog_num), sizeof(size_t));
+  // Keep the count on disk in step with the entries so that a run which never
+  // reaches the destructor can still be restored.
+  const size_t new_count = vlog_nums_ + 1;
+  config_file_->seekp(0);
+  config_file_->write(reinterpret_cast<const char *>(&new_count), sizeof(size_t));
+  config_file_->flush();
+}
+
+// Marks the config entry of `vlog_num` as removed by setting
+// CONFIG_FILE_DELE_MASK on it.
+void VlogSet::remove_from_config_file(size_t vlog_num) {
+  std::vector<size_t> entries(vlog_nums_);
+  config_file_->clear();
+  config_file_->seekg(sizeof(size_t));
+  config_file_->read(reinterpret_cast<char *>(entries.data()),
+                     static_cast<std::streamsize>(entries.size() * sizeof(size_t)));
+  const size_t loaded = static_cast<size_t>(config_file_->gcount()) / sizeof(size_t);
+  config_file_->clear();
+
+  for (size_t i = 0; i < loaded; i++) {
+    if ((entries[i] & CONFIG_FILE_DELE_MASK) || entries[i] != vlog_num) {
+      continue;
+    }
+    const size_t tombstone = entries[i] | CONFIG_FILE_DELE_MASK;
+    config_file_->seekp(sizeof(size_t) + i * sizeof(size_t));
+    config_file_->write(reinterpret_cast<const char *>(&tombstone), sizeof(size_t));
+    config_file_->flush();
+    break;
+  }
+}
+
+// Creates the vlog file with a zeroed { curr_size, value_nums } header.
+void VlogSet::create_vlog(std::string &vlog_name) {
+  std::ofstream vlog_new(vlog_name, std::ios::out | std::ios::binary);
+  char header[2 * sizeof(size_t)];
+  memset(header, 0, sizeof(header));
+  vlog_new.write(header, sizeof(header));
+  vlog_new.flush();
+}
+
+// Registers a vlog restored from disk; takes ownership of `vi`.
+inline void VlogSet::restore_vlog_inmaps(struct vlog_info *vi) {
+  auto vlog_name = get_vlog_name(vi->vlog_num);
+  vlog_info_map_[vlog_name] = vi;
+  vlog_handler_map_[vlog_name] = new vlog_handler();
+}
+
+// Registers a newly created vlog.
+inline void VlogSet::register_vlog_inmaps(size_t vlog_num, std::string &vlog_name) {
+  vlog_info_map_[vlog_name] = new vlog_info(vlog_num);
+  vlog_handler_map_[vlog_name] = new vlog_handler();
+}
+
+// Deletes the vlog file and its handler and invalidates its info entry.
+// Precondition: the caller holds mtx and the vlog's vlog_info_latch_
+// (remove_old_vlog() does); taking the latch again here would self-deadlock.
+inline void VlogSet::remove_vlog_from_maps(std::string &vlog_name) {
+  auto info_it = vlog_info_map_.find(vlog_name);
+  if (info_it != vlog_info_map_.end() && info_it->second != nullptr) {
+    info_it->second->vlog_valid_ = false;  // TODO: change the slotpage vlog_num when get/put thread read this flag
+  }
+  // The removal must not live inside assert(): it would vanish under NDEBUG.
+  const int rc = std::remove(vlog_name.c_str());
+  assert(rc == 0);
+  (void)rc;
+  auto handler_it = vlog_handler_map_.find(vlog_name);
+  if (handler_it != vlog_handler_map_.end()) {
+    delete handler_it->second;
+    vlog_handler_map_.erase(handler_it);
+  }
+}
+
+inline std::string VlogSet::get_config_file_name() {
+  return dbname + "_config_file";
+}
+
+std::string VlogSet::get_vlog_name(size_t vlog_num) {
+  return dbname + "_vlog_" + std::to_string(vlog_num);
+}
+
+// Returns the bookkeeping entry of `vlog_num`, or nullptr if it is unknown.
+// Uses find() so that a miss does not insert a null entry into the map.
+struct vlog_info * VlogSet::get_vlog_info(size_t vlog_num) {
+  const auto it = vlog_info_map_.find(get_vlog_name(vlog_num));
+  return it == vlog_info_map_.end() ? nullptr : it->second;
+}
+
+// Returns the lock holder of `vlog_num`, or nullptr if it is unknown.
+struct vlog_handler * VlogSet::get_vlog_handler(size_t vlog_num) {
+  const auto it = vlog_handler_map_.find(get_vlog_name(vlog_num));
+  return it == vlog_handler_map_.end() ? nullptr : it->second;
+}
+
+// Reads one value starting at `value_offset`: a uint16_t size (masked with
+// VALUE_SIZE_MASK) followed by the bytes handed back in `*value`. The copy is
+// clamped to the bytes that were actually read from the file.
+void VlogSet::read_vlog_value(uint32_t vlog_num, uint32_t value_offset, std::string *value) {
+  value->clear();
+  std::ifstream handler(get_vlog_name(vlog_num), std::ios::in | std::ios::binary);
+  if (!handler.is_open()) {
+    return;
+  }
+  handler.seekg(value_offset);
+  char value_buff[VALUE_BUFF_SIZE];
+  handler.read(value_buff, VALUE_BUFF_SIZE);
+  const size_t got = static_cast<size_t>(handler.gcount());
+  if (got < sizeof(uint16_t)) {
+    return;
+  }
+
+  uint16_t value_size;
+  memcpy(&value_size, value_buff, sizeof(uint16_t));
+  value_size &= VALUE_SIZE_MASK;
+  const size_t available = got - sizeof(uint16_t);
+  value->assign(&value_buff[sizeof(uint16_t)], std::min<size_t>(value_size, available));
+}
+
+// Writes the bytes of `value` at `value_offset`. The counters of the vlog are
+// maintained by put_value(), which reserved the space.
+void VlogSet::write_vlog_value(uint32_t vlog_num, uint32_t value_offset, const leveldb::Slice &value) {
+  std::fstream handler(get_vlog_name(vlog_num), kReadWriteBinary);
+  handler.seekp(value_offset);
+  handler.write(value.data(), static_cast<std::streamsize>(value.size()));
+  handler.flush();
+}
+
+// Sets the delete flag in the size word of the value at `value_offset`,
+// updates the garbage counters and schedules a GC pass when needed.
+// The caller holds mtx, the vlog's info latch and its hard lock.
+void VlogSet::mark_del_value(uint32_t vlog_num, uint32_t value_offset) {
+  auto vinfo = get_vlog_info(vlog_num);
+  if (vinfo == nullptr) {
+    return;
+  }
+  std::fstream handler(get_vlog_name(vlog_num), kReadWriteBinary);
+  handler.seekg(value_offset);
+  uint16_t value_size = 0;
+  if (!handler.read(reinterpret_cast<char *>(&value_size), sizeof(uint16_t))) {
+    return;
+  }
+  if (value_size & VALUE_DELE_MASK) {
+    // The value has already been deleted.
+    return;
+  }
+
+  const uint16_t flagged_size = static_cast<uint16_t>(value_size | VALUE_DELE_MASK);
+  // The read moved the file position; go back before overwriting the word.
+  handler.seekp(value_offset);
+  handler.write(reinterpret_cast<const char *>(&flagged_size), sizeof(uint16_t));
+  handler.flush();
+  handler.close();
+
+  vinfo->discard++;
+  if (vinfo->value_nums > 0) {
+    vinfo->value_nums--;
+  }
+  // curr_size is the append offset used by put_value(); it must not shrink
+  // here or the next put would overwrite live data. GC reclaims the space.
+  if (vlog_gc != nullptr && vlog_need_gc(vlog_num)) {
+    // Hand the vlog over to the collector together with a new, empty vlog.
+    vinfo->vlog_valid_ = false;
+    vinfo->vlog_num_for_gc = register_new_vlog();
+    vlog_gc->do_gc(vlog_num, vinfo->vlog_num_for_gc);
+  }
+}
diff --git a/db/vlog_set.h b/db/vlog_set.h
new file mode 100644
index 0000000..918c37c
--- /dev/null
+++ b/db/vlog_set.h
@@ -0,0 +1,67 @@
+//
+// Created by 马也驰 on 2024/12/29.
+//
+
+#ifndef LEVELDB_VLOG_SET_H
+#define LEVELDB_VLOG_SET_H
+
+#include <algorithm>
+#include <cassert>
+#include <cstdio>
+#include <cstdlib>
+#include <cstring>
+#include <fstream>
+#include <iostream>
+#include <mutex>
+#include <string>
+#include <unordered_map>
+#include <vector>
+#include "../include/leveldb/slice.h"
+#include "../db/shared_lock.h"
+#include "../db/vlog.h"
+
+// Forward declaration of VlogGC
+class VlogGC;
+
+// Owns the set of value-log files of one database: allocates space for new
+// values, reads and deletes values, and hands vlogs with too much garbage to
+// the VlogGC.
+class VlogSet {
+friend class VlogGC;
+
+// Top bit of a config-file entry marks the vlog as removed.
+#define CONFIG_FILE_DELE_MASK (static_cast<size_t>(1) << (sizeof(size_t) * 8 - 1))
+#define CONFIG_FILE_VLOG_NUM(v) ((v) & ~CONFIG_FILE_DELE_MASK)
+ public:
+  // Takes ownership of `vlog_gc` (may be nullptr until set_vlog_gc()).
+  VlogSet(std::string dbname, VlogGC *vlog_gc);
+  ~VlogSet();
+  void get_value(uint32_t vlog_num, uint32_t value_offset, std::string *value);
+  void put_value(uint32_t *vlog_num, uint32_t *value_offset, const leveldb::Slice &value);
+  void del_value(uint32_t vlog_num, uint32_t value_offset);
+
+  // The set deletes `vg` in its destructor.
+  void set_vlog_gc(VlogGC *vg) { this->vlog_gc = vg; }
+
+ private:
+  size_t register_new_vlog();
+  void remove_old_vlog(size_t old_vlog_num);
+  bool vlog_need_gc(size_t vlog_num);
+
+  void register_inconfig_file(size_t vlog_num);
+  void remove_from_config_file(size_t vlog_num);
+  void create_vlog(std::string &vlog_name);
+  struct vlog_info *get_writable_vlog_info(size_t value_size);
+  inline void restore_vlog_inmaps(struct vlog_info *vi);
+  inline void register_vlog_inmaps(size_t vlog_num, std::string &vlog_name);
+  inline void remove_vlog_from_maps(std::string &vlog_name);
+  inline std::string get_config_file_name();
+  std::string get_vlog_name(size_t vlog_num);
+  struct vlog_info *get_vlog_info(size_t vlog_num);
+  struct vlog_handler *get_vlog_handler(size_t vlog_num);
+  void read_vlog_value(uint32_t vlog_num, uint32_t value_offset, std::string *value);
+  void write_vlog_value(uint32_t vlog_num, uint32_t value_offset, const leveldb::Slice &value);
+  void mark_del_value(uint32_t vlog_num, uint32_t value_offset);
+
+ private:
+  std::mutex mtx;  // guards both maps and the config file
+  std::string dbname;
+  size_t vlog_nums_;
+  // Both maps are keyed by vlog file name and own their values.
+  std::unordered_map<std::string, struct vlog_info *> vlog_info_map_;
+  std::unordered_map<std::string, struct vlog_handler *> vlog_handler_map_;
+  std::fstream *config_file_;
+
+  VlogGC *vlog_gc;  // pointer only; the full definition is pulled in by vlog_set.cpp
+};
+
+
+
+#endif  // LEVELDB_VLOG_SET_H
diff --git a/report.md b/report.md
index c81b0dc..351a904 100644
--- a/report.md
+++ b/report.md
@@ -57,22 +57,22 @@
 1. value的分离式存储
    我们使用若干个vlog文件,为每一个vlog文件设置容量上限(比如16MiB),并在内存中为每一个vlog维护一个discard计数器,表示这个vlog中当前有多少value已经在lsm tree中被标记为删除。
 2. 存储value所在vlog和偏移量的元数据
-   我们在key和vlog中添加一个vlog_page的中间层,这一层存储每一个key对应的value所在的vlog文件和文件内偏移,而lsm tree中的key包含的实际上是这个中间层的slot下标,而每一个slot中存储的是key所对应的vlog文件号以及value在vlog中的偏移。这样,我们就可以在不修改lsm tree的基础上,完成对vlog的compaction,并将vlog的gc结果只反映在这个中间层vlog_page中。这个vlog_page实际上也是一个线性增长的log文件,作用类似于os中的页表,负责维护lsm tree中存储的slot下标到vlog和vlog内偏移量的一个映射。这样,通过vlog_page我们就可以找到具体的vlog文件和其文件内偏移量。对于vlog的GC过程,我们不需要修改lsm tree中的内容,我们只需要修改vlog_page中的映射即可。
-3.
vlog_page文件和vlog文件的GC - 对于vlog文件,我们在内存中维护一个bitmap,用来表示每一个slot的使用情况,并在插入和GC删除kv时进行动态的分配和释放。对于vlog文件的GC,我们用一个后台线程来扫描所有vlog的discard计数器。当某些vlog的discard计数器超过某个阈值(比如1024),我们就对这些vlog文件进行GC过程,当GC完成之后将vlog_page中的slot元数据进行更新,再将原来的vlog文件进行删除,GC过程就完成了。 + 我们在key和vlog中添加一个slot_page的中间层,这一层存储每一个key对应的value所在的vlog文件和文件内偏移,而lsm tree中的key包含的实际上是这个中间层的slot下标,而每一个slot中存储的是key所对应的vlog文件号以及value在vlog中的偏移。这样,我们就可以在不修改lsm tree的基础上,完成对vlog的compaction,并将vlog的gc结果只反映在这个中间层slot_page中。这个slot_page实际上也是一个线性增长的log文件,作用类似于os中的页表,负责维护lsm tree中存储的slot下标到vlog和vlog内偏移量的一个映射。这样,通过slot_page我们就可以找到具体的vlog文件和其文件内偏移量。对于vlog的GC过程,我们不需要修改lsm tree中的内容,我们只需要修改slot_page中的映射即可。 +3. slot_page文件和vlog文件的GC + 对于vlog文件,我们在内存中维护一个bitmap,用来表示每一个slot的使用情况,并在插入和GC删除kv时进行动态的分配和释放。对于vlog文件的GC,我们用一个后台线程来扫描所有vlog的discard计数器。当某些vlog的discard计数器超过某个阈值(比如1024),我们就对这些vlog文件进行GC过程,当GC完成之后将slot_page中的slot元数据进行更新,再将原来的vlog文件进行删除,GC过程就完成了。 ### 3. 数据结构设计 -`key的格式:| key | vlog_page_slot | ` +`key的格式:| key | slot_num | ` -`vlog_page: | slot0:{vlog_no(定长), offset(定长)}, slot1:{vlog_no, offset}, ... | ` +`slot_page: | slot0:{vlog_no(定长), offset(定长)}, slot1:{vlog_no, offset}, ... | ` `value 的格式:| attr个数(定长) | attr1_name的长度(定长) | attr1_name(变长) | attr1_value的长度(定长) | attr1_value(变长) | ... 
|` -对于每一次读取,用户线程先读取lsm tree中key的vlog_page_slot下标,然后到vlog_page中读取对应的slot内容(**每一个slot都是定长的**),之后再在这个slot中读取value所在的vlog文件号和偏移量offset,之后到对应的vlog文件中读取value。 +对于每一次读取,用户线程先读取lsm tree中key的slot_num下标,然后到slot_page中读取对应的slot内容(**每一个slot都是定长的**),之后再在这个slot中读取value所在的vlog文件号和偏移量offset,之后到对应的vlog文件中读取value。 -但是这又带来了一个问题,我们该如何管理vlog_page这个文件?当插入新的kv时,我们需要在这个vlog_page中分配新的slot,在GC删除某个kv时,我们需要将对应的slot进行释放。这里我们选择在内存中维护一个可线性扩展的bitmap。这个bitmap中每一个bit标识了当前vlog_page文件中对应slot是否被使用,是为1,不是为0。这样一来,在插入新kv时,我们可以用bitmap来分配一个新的slot(将bitmap中第一个为0的bit设置为1),将内容进行写入;在GC删除某个kv时,我们将这个slot对应的bitmap中的bit重置为0即可。 +但是这又带来了一个问题,我们该如何管理slot_page这个文件?当插入新的kv时,我们需要在这个slot_page中分配新的slot,在GC删除某个kv时,我们需要将对应的slot进行释放。这里我们选择在内存中维护一个可线性扩展的bitmap。这个bitmap中每一个bit标识了当前slot_page文件中对应slot是否被使用,是为1,不是为0。这样一来,在插入新kv时,我们可以用bitmap来分配一个新的slot(将bitmap中第一个为0的bit设置为1),将内容进行写入;在GC删除某个kv时,我们将这个slot对应的bitmap中的bit重置为0即可。 ### 4. 接口设计 @@ -112,7 +112,7 @@ 5. Get_Fields (待实现) #### 4.2 实现KV分离 这里只展示和vlog以及GC无关的接口,vlog的创建,管理以及后台线程的GC设计到vlog等新数据结构的实现,较为复杂和庞大,这里不做展示。我们只列出与kv的插入有关的新接口: -1. 搜索vlog_page文件: Status find_slot(const Slice& key, Slot *slot); +1. 搜索slot_page文件: Status find_slot(const Slice& key, Slot *slot); 2. 搜索vlog文件: Status find_value(Slot *slot); 3. 分配新的slot: Status allocate_slot(Bitmap *map, uint64_t *s); 4. 释放slot: void deallocate_slot(Bitmap *map, uint64_t *s); @@ -201,7 +201,7 @@ int main(int argc, char **argv) { ![图片](./pic/test_field_2.png) ### 5.2 单元测试: -1. 测试插入超过初始vlog_page等slot数量之后,是否还能正常插入,检查vlog_page文件等线性可扩展性 +1. 测试插入超过初始slot_page等slot数量之后,是否还能正常插入,检查slot_page文件等线性可扩展性 2. 测试插入后,进行删除,等待GC完成后再读取value和vlog的大小,看看GC过程是否正常进行。 性能测试: @@ -211,7 +211,7 @@ int main(int argc, char **argv) { #### 6. 可能遇到的挑战与解决方案 列出实现过程中可能遇到的技术难题及其解决思路,如如何处理GC开销、数据同步、索引原子更新等问题。 -各种参数的设置,比如vlog的容量上限,以及vlog_page的bitmap管理方式是否足够高效?以及在GC过程中如果对被GC中的vlog进行写入该让用户线程和后台线程以什么样的方式进行同步?vlog_page的读写放大也是一个重要的问题。 +各种参数的设置,比如vlog的容量上限,以及slot_page的bitmap管理方式是否足够高效?以及在GC过程中如果对被GC中的vlog进行写入该让用户线程和后台线程以什么样的方式进行同步?slot_page的读写放大也是一个重要的问题。 #### 7. 
分工和进度安排
@@ -219,8 +219,8 @@ int main(int argc, char **argv) {
 |----------------------|-------|----------|
 | Field相关接口实现 | 12.8 | 王雪飞 |
 | value_log中value的存储格式 | 12.8 | 王雪飞 |
-| vlog_page 相关接口 | 12.8 | 马也驰 |
-| vlog_page实现 | 12.8 | 马也驰 |
+| slot_page 相关接口 | 12.8 | 马也驰 |
+| slot_page实现 | 12.8 | 马也驰 |
 | 修改leveldb的接口实现字段功能 | 12.17 | 王雪飞 |
 | vlog的GC实现 | 12.29 | 马也驰 |
 | 性能测试 | 1.5 | 王雪飞, 马也驰 |
diff --git a/test/db_test4.cc b/test/db_test4.cc
new file mode 100644
index 0000000..fca9c46
--- /dev/null
+++ b/test/db_test4.cc
@@ -0,0 +1,127 @@
+//
+// Created by 马也驰 on 2024/12/20.
+//
+
+#include "../db/slotpage.h"
+#include <algorithm>
+#include <cassert>
+#include <cstddef>
+#include <functional>
+#include <iostream>
+#include <thread>
+#include <vector>
+
+#define SLOTNUM 8192
+
+// Prints the slot numbers on one line, separated by spaces.
+// NOTE(review): size_t matches how DBImpl stores the result of alloc_slot();
+// confirm against BitMap::alloc_slot() in db/slotpage.h.
+void printVector(const std::vector<size_t>& vec) {
+  for (const size_t slot : vec) {
+    std::cout << slot << " ";
+  }
+  std::cout << std::endl;
+}
+
+// Allocates well past the initial bitmap capacity, frees a few slots and
+// allocates again; the allocation state is printed after every phase.
+int test1() {
+  BitMap bitmap;
+  std::vector<size_t> slots;
+  slots.reserve(SLOTNUM<<2);
+  int i = 0;
+  for (i = 0; i < 2047; i++) {
+    slots.push_back(bitmap.alloc_slot());
+  }
+  // One extra allocation on its own (a convenient breakpoint position).
+  slots.push_back(bitmap.alloc_slot());
+  for (; i < SLOTNUM<<2; i++) {
+    slots.push_back(bitmap.alloc_slot());
+  }
+  printVector(slots);
+
+  std::cout << "\n\n";
+
+  for (i = 0; i < 100; i++) {
+    bitmap.dealloc_slot(slots.at(i));
+  }
+  bitmap.dealloc_slot(10000);
+  bitmap.dealloc_slot(11000);
+  bitmap.dealloc_slot(12000);
+
+  bitmap.show_allocated_slot();
+
+  std::cout << "\n\n";
+
+  for (i = 0; i < 103; i++) {
+    bitmap.alloc_slot();
+  }
+
+  bitmap.show_allocated_slot();
+  return 1;
+}
+
+
+// Fills SLOTNUM slots, frees three of them and allocates two again.
+int test2() {
+  BitMap bitmap;
+  std::vector<size_t> slots;
+  slots.reserve(SLOTNUM);
+  for (int i = 0; i < SLOTNUM; i++) {
+    slots.push_back(bitmap.alloc_slot());
+  }
+  printVector(slots);
+
+  std::cout << "\n\n";
+
+  bitmap.dealloc_slot(30);
+  bitmap.dealloc_slot(40);
+  bitmap.dealloc_slot(50);
+
+  bitmap.show_allocated_slot();
+
+  std::cout << "\n\n";
+
+  bitmap.alloc_slot();
+  bitmap.alloc_slot();
+
+  bitmap.show_allocated_slot();
+  return 1;
+}
+
+// Thread body: allocates `num_slots` slots and records them in `slots`.
+void allocate_slots(BitMap& bitmap, std::vector<size_t>& slots, int num_slots) {
+  for (int i = 0; i < num_slots; i++) {
+    slots.push_back(bitmap.alloc_slot());
+  }
+}
+
+// Two threads allocate from the same bitmap; no slot may be handed out twice.
+int test3() {
+  BitMap bitmap;
+
+  std::vector<size_t> slots1;
+  slots1.reserve(1000);
+
+  std::vector<size_t> slots2;
+  slots2.reserve(1000);
+
+  // Create two threads to allocate slots concurrently.
+  std::thread thread1(allocate_slots, std::ref(bitmap), std::ref(slots1), 1000);
+  std::thread thread2(allocate_slots, std::ref(bitmap), std::ref(slots2), 1000);
+
+  // Wait for both threads to finish.
+  thread1.join();
+  thread2.join();
+
+  // Print the results.
+  std::cout << "Slots allocated by thread 1:\n";
+  printVector(slots1);
+
+  std::cout << "\n\nSlots allocated by thread 2:\n";
+  printVector(slots2);
+
+  std::cout << "\n\n";
+
+  std::vector<size_t> combined_slots = slots1;
+  combined_slots.insert(combined_slots.end(), slots2.begin(), slots2.end());
+  std::sort(combined_slots.begin(), combined_slots.end());
+
+  // Print the sorted combined slots.
+  std::cout << "Sorted combined slots:\n";
+  printVector(combined_slots);
+
+  std::cout << "\n\n";
+
+  // The point of the test: after sorting, equal neighbours would mean the
+  // same slot was given to both threads.
+  assert(std::adjacent_find(combined_slots.begin(), combined_slots.end()) ==
+         combined_slots.end());
+
+  return 1;
+}
+
+int main() {
+  test3();  // flagged by the original author as problematic
+}
\ No newline at end of file
diff --git a/test/db_test5.cc b/test/db_test5.cc
new file mode 100644
index 0000000..9d7154f
--- /dev/null
+++ b/test/db_test5.cc
@@ -0,0 +1,148 @@
+//
+// Created by 马也驰 on 2024/12/22.
+//
+
+#include <algorithm>
+#include <cassert>
+#include <cstddef>
+#include <cstdint>
+#include <functional>
+#include <iostream>
+#include <string>
+#include <thread>
+#include <vector>
+
+#include "../db/slotpage.h"
+
+// Prints the values on one line, separated by spaces.
+void printVector(const std::vector<uint32_t>& vec) {
+  for (const uint32_t v : vec) {
+    std::cout << v << " ";
+  }
+  std::cout << std::endl;
+}
+
+// Reads every slot back and compares it with the expected {i, i} content.
+// NOTE(review): the call that writes the expected content is commented out,
+// so this only passes against a slot page file prepared beforehand.
+int test1() {
+  // A real std::string object: casting a string literal to std::string& is
+  // undefined behaviour.
+  std::string slotpage_name = "test_slotpage";
+  SlotCache slot_cache(slotpage_name);
+
+  const size_t blksize = 4096;
+  const size_t blknum = 32;
+  const size_t slot_content_num = blknum * blksize / sizeof(slot_content);
+  slot_content slot_contents[slot_content_num];
+
+  for (size_t i = 0; i < slot_content_num; i++) {
+    slot_contents[i] = {static_cast<uint32_t>(i), static_cast<uint32_t>(i)};
+  }
+//
+//  slot_cache.write_for_test(reinterpret_cast<char *>(slot_contents), slot_content_num*sizeof(slot_content));
+
+  slot_content sc;
+  for (size_t i = 0; i < slot_content_num; i++) {
+    slot_cache.get_slot(i, &sc);
+    assert(sc.vlog_num == slot_contents[i].vlog_num);
+    assert(sc.value_offset == slot_contents[i].value_offset);
+    std::cout << i << std::endl;
+  }
+
+  return 1;
+}
+
+// Reads three individual slots and expects them to hold `sc1`.
+// NOTE(review): the set_slot() calls that would store `sc1` are commented
+// out, so the assertions below cannot hold on a fresh slot page.
+int test2() {
+  std::string slotpage_name = "test_slotpage";
+  SlotCache slot_cache(slotpage_name);
+
+  const size_t blksize = 4096;
+  const size_t blknum = 32;
+  const size_t slot_content_num = blknum * blksize / sizeof(slot_content);
+  slot_content slot_contents[slot_content_num];
+
+  for (size_t i = 0; i < slot_content_num; i++) {
+    slot_contents[i] = {static_cast<uint32_t>(i), static_cast<uint32_t>(i)};
+  }
+  //
+  //  slot_cache.write_for_test(reinterpret_cast<char *>(slot_contents), slot_content_num*sizeof(slot_content));
+
+  slot_content sc1 = {999999999, 999999999};
+//  slot_cache.set_slot(100, &sc1);
+//  slot_cache.set_slot(1000, &sc1);
+//  slot_cache.set_slot(2000, &sc1);
+
+  slot_content sc2 = {0, 0};
+  slot_cache.get_slot(100, &sc2);
+  assert(sc2.vlog_num == sc1.vlog_num);
+  assert(sc2.value_offset == sc1.value_offset);
+  slot_cache.get_slot(1000, &sc2);
+  assert(sc2.vlog_num == sc1.vlog_num);
+  assert(sc2.value_offset == sc1.value_offset);
+  slot_cache.get_slot(2000, &sc2);
+  assert(sc2.vlog_num == sc1.vlog_num);
+  assert(sc2.value_offset == sc1.value_offset);
+
+  return 1;
+}
+
+
+// Thread body: reads the slots [start, end) and records each vlog_num.
+// `slot_contents` is only needed by the comparison that is commented out.
+void process_slots(SlotCache& slot_cache,
+                   const slot_content* slot_contents,
+                   size_t start, size_t end,
+                   std::vector<uint32_t>& slots) {
+  (void)slot_contents;
+  slot_content sc;
+  for (size_t i = start; i < end; ++i) {
+    slot_cache.get_slot(i, &sc);
+//    assert(sc.vlog_num == slot_contents[i].vlog_num);
+//    assert(sc.value_offset == slot_contents[i].value_offset);
+
+    slots.push_back(sc.vlog_num);
+  }
+}
+
+// Two threads read disjoint slot ranges through the same cache.
+int test3() {
+  std::string slotpage_name = "test_slotpage";
+  SlotCache slot_cache(slotpage_name);
+
+  const size_t blksize = 4096;
+  const size_t blknum = 32;
+  const size_t slot_content_num = blknum * blksize / sizeof(slot_content);
+  slot_content slot_contents[slot_content_num];
+
+  for (size_t i = 0; i < slot_content_num; i++) {
+    slot_contents[i] = {static_cast<uint32_t>(i), static_cast<uint32_t>(i)};
+  }
+
+  std::vector<uint32_t> slots1;
+  std::vector<uint32_t> slots2;
+
+  // One thread per slot range.
+  std::thread t1(process_slots, std::ref(slot_cache), slot_contents, 0, 100, std::ref(slots1));
+  std::thread t2(process_slots, std::ref(slot_cache), slot_contents, 100, 200, std::ref(slots2));
+
+  // Wait for both threads to finish.
+  t1.join();
+  t2.join();
+
+  // Print what each thread read.
+  std::cout << "Slots1:" << std::endl;
+  printVector(slots1);
+  std::cout << std::endl;
+
+  std::cout << "Slots2:" << std::endl;
+  printVector(slots2);
+  std::cout << std::endl;
+
+  // Merge and sort.
+  std::vector<uint32_t> merged_slots = slots1;
+  merged_slots.insert(merged_slots.end(), slots2.begin(), slots2.end());
+  std::sort(merged_slots.begin(), merged_slots.end());
+
+  // Print the merged, sorted result.
+  std::cout << "Merged and Sorted Slots:" << std::endl;
+  printVector(merged_slots);
+  std::cout << std::endl;
+
+  return 1;
+}
+
+
+
+int main() {
+  test3();
+}
\ No newline at end of file