2 Commits

Author SHA1 Message Date
  马也驰 ca25cec40a Merge remote-tracking branch 'origin/master' 8 months ago
  马也驰 465719f1c4 vlog 9 months ago
13 changed files with 1505 additions and 42 deletions
Split View
  1. +21
    -3
      CMakeLists.txt
  2. +48
    -2
      db/db_impl.cc
  3. +6
    -0
      db/db_impl.h
  4. +52
    -0
      db/shared_lock.h
  5. +141
    -37
      db/slotpage.h
  6. +67
    -0
      db/threadpool.h
  7. +378
    -0
      db/vlog.h
  8. +68
    -0
      db/vlog_gc.cpp
  9. +46
    -0
      db/vlog_gc.h
  10. +336
    -0
      db/vlog_set.cpp
  11. +67
    -0
      db/vlog_set.h
  12. +127
    -0
      test/db_test4.cc
  13. +148
    -0
      test/db_test5.cc

+ 21
- 3
CMakeLists.txt View File

@ -118,7 +118,14 @@ endif(BUILD_SHARED_LIBS)
include(GNUInstallDirs)
add_library(leveldb ""
db/slotpage.h)
db/slotpage.h
db/vlog.h
db/threadpool.h
db/shared_lock.h
db/vlog_set.cpp
db/vlog_set.h
db/vlog_gc.cpp
db/vlog_gc.h)
target_sources(leveldb
PRIVATE
"${PROJECT_BINARY_DIR}/${LEVELDB_PORT_CONFIG_DIR}/port_config.h"
@ -521,10 +528,10 @@ endif(LEVELDB_INSTALL)
add_executable(db_test1
"${PROJECT_SOURCE_DIR}/test/db_test1.cc"
test/db_test1.cc
)
target_link_libraries(db_test1 leveldb)
add_executable(db_test2
"${PROJECT_SOURCE_DIR}/test/db_test2.cc"
)
@ -533,4 +540,15 @@ target_link_libraries(db_test2 PRIVATE leveldb)
add_executable(db_test3
"${PROJECT_SOURCE_DIR}/test/db_test3.cc"
)
target_link_libraries(db_test3 PRIVATE leveldb gtest)
target_link_libraries(db_test3 PRIVATE leveldb gtest)
add_executable(db_test4
"${PROJECT_SOURCE_DIR}/test/db_test4.cc"
)
target_link_libraries(db_test4 PRIVATE leveldb)
add_executable(db_test5
"${PROJECT_SOURCE_DIR}/test/db_test5.cc"
test/db_test5.cc
)
target_link_libraries(db_test5 PRIVATE leveldb)

+ 48
- 2
db/db_impl.cc View File

@ -147,7 +147,12 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
background_compaction_scheduled_(false),
manual_compaction_(nullptr),
versions_(new VersionSet(dbname_, &options_, table_cache_,
&internal_comparator_)) {}
&internal_comparator_)),
slot_page_(new SlotPage(dbname)) {
vlog_set_ = new VlogSet(dbname, nullptr);
vlog_set_->set_vlog_gc(new VlogGC(slot_page_, vlog_set_));
}
DBImpl::~DBImpl() {
// Wait for background work to finish.
@ -176,6 +181,9 @@ DBImpl::~DBImpl() {
if (owns_cache_) {
delete options_.block_cache;
}
delete slot_page_;
delete vlog_set_;
}
Status DBImpl::NewDB() {
@ -1165,6 +1173,15 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key,
mem->Unref();
if (imm != nullptr) imm->Unref();
current->Unref();
// TODO: search the slotpage and get value from vlog
size_t slot_num = *(size_t *)value->c_str();
struct slot_content sc;
std::string vlog_value;
slot_page_->get_slot(slot_num, &sc);
vlog_set_->get_value(sc.vlog_num, sc.value_offset, &vlog_value);
*value = vlog_value;
return s;
}
@ -1199,10 +1216,39 @@ void DBImpl::ReleaseSnapshot(const Snapshot* snapshot) {
// Convenience methods
Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) {
return DB::Put(o, key, val);
// TODO: allocate slot_num in slotpage and put value in vlog
size_t slot_num = slot_page_->alloc_slot();
struct slot_content sc;
vlog_set_->put_value(&sc.vlog_num, &sc.value_offset, val);
slot_page_->set_slot(slot_num, &sc);
char data[sizeof(size_t)];
memcpy(data, &slot_num, sizeof(size_t));
Slice slot_val(data, sizeof(data));
return DB::Put(o, key, slot_val);
}
Status DBImpl::Delete(const WriteOptions& options, const Slice& key) {
// size_t slot_num = *(size_t *)value->c_str();
// struct slot_content sc;
// std::string vlog_value;
// slot_page_->get_slot(slot_num, &sc);
// vlog_set_->get_value(sc.vlog_num, sc.value_offset, &vlog_value);
// *value = vlog_value;
ReadOptions ro;
ro.verify_checksums = true;
ro.fill_cache = false;
ro.snapshot = nullptr;
std::string value;
Get(ro, key, &value);
size_t slot_num = *(size_t *)value.c_str();
struct slot_content sc;
std::string vlog_value;
slot_page_->get_slot(slot_num, &sc);
vlog_set_->del_value(sc.vlog_num, sc.value_offset);
return DB::Delete(options, key);
}

+ 6
- 0
db/db_impl.h View File

@ -13,6 +13,9 @@
#include "db/dbformat.h"
#include "db/log_writer.h"
#include "db/snapshot.h"
#include "db/vlog_set.h"
#include "db/vlog_gc.h"
#include "db/slotpage.h"
#include "leveldb/db.h"
#include "leveldb/env.h"
#include "port/port.h"
@ -76,6 +79,9 @@ class DBImpl : public DB {
struct CompactionState;
struct Writer;
SlotPage *slot_page_;
VlogSet *vlog_set_;
// Information for a manual compaction
struct ManualCompaction {
int level;

+ 52
- 0
db/shared_lock.h View File

@ -0,0 +1,52 @@
//
// Created by on 2024/12/28.
//
#ifndef LEVELDB_SHARED_LOCK_H
#define LEVELDB_SHARED_LOCK_H
#include <mutex>
class SharedLock {
public:
SharedLock() : read_count_(0) {}
public:
void soft_lock() {
write_latch_.lock();
read_count_latch_.lock();
if (!read_count_) {
read_latch_.lock();
}
read_count_ ++;
read_count_latch_.unlock();
write_latch_.unlock();
}
void soft_unlock() {
read_count_latch_.lock();
read_count_ --;
if (!read_count_) {
read_latch_.unlock();
}
read_count_latch_.unlock();
}
void hard_lock() {
read_latch_.lock();
write_latch_.lock();
read_latch_.unlock();
}
void hard_unlock() {
write_latch_.unlock();
}
private:
std::mutex read_latch_; // indicate if any read is proceeding
std::mutex read_count_latch_;
volatile size_t read_count_;
std::mutex write_latch_;
// volatile size_t write_count_;
};
#endif // LEVELDB_SHARED_LOCK_H

+ 141
- 37
db/slotpage.h View File

@ -12,6 +12,8 @@
#include <iostream>
#include <fstream>
#include <mutex>
#include <iostream>
#include <unordered_map>
#define BITMAP_SIZE 8192
#define BITS_PER_BYTE 8
@ -24,12 +26,14 @@
struct slot_content {
uint32_t vlog_num;
uint32_t value_offset;
slot_content() {}
slot_content(uint32_t vn, uint32_t vo) {
vlog_num = vn;
value_offset = vo;
}
};
// test passed
class SlotCache {
// slot number -> slot content
#define BLOCK_NUM 16
@ -37,7 +41,21 @@ class SlotCache {
#define SLOT_PER_BLOCK (BLOCK_SIZE/sizeof(slot_content))
#define SLOT_OFFSET_IN_BLOCK(slot_num) ((slot_num)%SLOT_PER_BLOCK)
public:
SlotCache() {
SlotCache(const std::string &slotpage_fname) {
this->slotpage_fname = slotpage_fname;
// this->slotpage_handler = std::fstream(slotpage_fname, std::ios::in | std::ios::out);
this->slotpage_handler = std::fstream(slotpage_fname, std::ios::in | std::ios::out);
if (!this->slotpage_handler.is_open()) {
//
this->slotpage_handler = std::fstream(slotpage_fname, std::ios::out);
this->slotpage_handler.close();
//
this->slotpage_handler = std::fstream(slotpage_fname, std::ios::in | std::ios::out);
}
if (!this->slotpage_handler.is_open()) {
std::cerr << "Failed to open or create the file: " << slotpage_fname << std::endl;
std::exit(EXIT_FAILURE);
}
for (auto i = 0; i < BLOCK_NUM; i++) {
block_cache[i] = static_cast<struct slot_content*>(malloc(BLOCK_SIZE));
access_time[i] = 0;
@ -45,43 +63,78 @@ public:
}
}
void get_slot(std::string &slotpage_fname, size_t slot_num, struct slot_content *sc) {
~SlotCache() {
for (auto i = 0; i < BLOCK_NUM; i++) {
latches_[i].lock();
if (info[i].is_dirty) {
write_back_block(info[i].block_num);
}
latches_[i].unlock();
}
this->slotpage_handler.close();
}
/// methods only for test
void write_for_test(char *sc, size_t bytes) {
slotpage_handler.seekp(0);
slotpage_handler.write(reinterpret_cast<const char*>(sc), bytes);
slotpage_handler.flush();
}
void flush_all_blk() {
for (auto blkinfo : info) {
if (blkinfo.is_dirty) {
write_back_block(blkinfo.block_num);
}
}
this->slotpage_handler.flush();
}
void get_slot(size_t slot_num, struct slot_content *sc) {
auto block_num = slotnum_hash2_blocknum(slot_num);
auto blockcache_num = block_num % BLOCK_NUM;
mtx.lock();
if (info[blockcache_num].slotpage_fname != slotpage_fname) { // cache miss
write_back_block(blockcache_num);
read_in_block(blockcache_num, slotpage_fname, block_num);
latches_[blockcache_num].lock();
if (!info[blockcache_num].used || info[blockcache_num].block_num != block_num) { // cache miss
if (info[blockcache_num].is_dirty) {
write_back_block(blockcache_num);
}
read_in_block(blockcache_num, block_num);
access_time[blockcache_num] = 0;
info[blockcache_num] = block_info(block_num, false, true);
}
read_slot(sc, blockcache_num, SLOT_OFFSET_IN_BLOCK(slot_num));
access_time[blockcache_num]++;
mtx.unlock();
latches_[blockcache_num].unlock();
}
void set_slot(std::string &slotpage_fname, size_t slot_num, struct slot_content *sc) {
void set_slot(size_t slot_num, struct slot_content *sc) {
auto block_num = slotnum_hash2_blocknum(slot_num);
auto blockcache_num = block_num % BLOCK_NUM;
mtx.lock();
if (info[blockcache_num].slotpage_fname != slotpage_fname) {
write_back_block(blockcache_num);
read_in_block(blockcache_num, slotpage_fname, block_num);
latches_[blockcache_num].lock();
if (!info[blockcache_num].used || info[blockcache_num].block_num != block_num) {
if (info[blockcache_num].is_dirty) {
write_back_block(blockcache_num);
}
read_in_block(blockcache_num, block_num);
access_time[blockcache_num] = 0;
info[blockcache_num] = block_info(block_num, false, true);
}
set_slot(sc, blockcache_num, SLOT_OFFSET_IN_BLOCK(slot_num));
access_time[blockcache_num]++;
info[blockcache_num].is_dirty = true;
mtx.unlock();
latches_[blockcache_num].unlock();
}
private:
struct block_info {
std::string slotpage_fname;
bool used;
size_t block_num;
bool is_dirty;
block_info() {}
block_info(std::string &slotpage_fname, size_t block_num, bool is_dirty=false) {
this->slotpage_fname = slotpage_fname;
block_info() { used = false; is_dirty = false; }
block_info(size_t block_num, bool is_dirty=false, bool used = false) {
this->block_num = block_num;
this->is_dirty = is_dirty;
this->used = used;
}
};
@ -90,19 +143,15 @@ private:
return slot_num * sizeof(slot_content) / BLOCK_SIZE;
}
void write_back_block(size_t blockcache_num) {
auto &fname = info[blockcache_num].slotpage_fname;
auto write_pos = info[blockcache_num].block_num * BLOCK_SIZE;
auto slotpage_handler = std::fstream(fname, std::ios::in | std::ios::out);
slotpage_handler.seekp(write_pos);
slotpage_handler.write(
reinterpret_cast<const char*>(block_cache[blockcache_num]), BLOCK_SIZE);
slotpage_handler.flush();
}
void read_in_block(size_t blockcache_num, std::string &slotpage_fname, size_t block_num) {
auto slotpage_handler = std::fstream(slotpage_fname, std::ios::in | std::ios::out);
void read_in_block(size_t blockcache_num, size_t block_num) {
slotpage_handler.seekp(block_num*BLOCK_SIZE);
slotpage_handler.read(reinterpret_cast<char*>(block_cache[blockcache_num]), BLOCK_SIZE);
access_time[blockcache_num] = 0;
info[blockcache_num] = block_info(slotpage_fname, block_num);
}
inline void read_slot(struct slot_content *sc, size_t blockcache_num, size_t slot_num_offset) {
auto src = &block_cache[blockcache_num][slot_num_offset];
@ -114,23 +163,41 @@ private:
}
private:
std::mutex mtx;
std::string slotpage_fname;
std::fstream slotpage_handler;
std::mutex latches_[BLOCK_NUM];
struct slot_content *block_cache[BLOCK_NUM];
size_t access_time[BLOCK_NUM];
struct block_info info[BLOCK_NUM];
};
/// test passed
class BitMap {
// in memory bitmap
public:
BitMap() {
char *bitmap = static_cast<char*>(malloc(BITMAP_SIZE*sizeof(char)));
memset(bitmap, 0, BITMAP_SIZE * sizeof(char));
bitmaps_.push_back(bitmap);
size = BITMAP_SIZE;
first_empty_slot = 0;
}
/** methods for test **/
void show_allocated_slot() {
for (int i = 0; i < this->size; i++) {
auto byte = *get_bitmap_byte(i);
for (int b = 0; b < sizeof(byte) << 3; b++) {
if (byte & 0x80) {
std::cout << (i<<3)+b << ' ';
}
byte <<= 1;
}
// std::cout << std::endl;
}
// std::cout << std::endl;
}
void dealloc_slot(size_t slot_num) {
mtx.lock();
const size_t byte = slot2byte(slot_num);
@ -149,23 +216,29 @@ public:
char *start_byte = get_bitmap_byte(slot2byte(first_empty_slot));
const size_t off = slot2offset(first_empty_slot);
SETBIT(start_byte, off);
// find the next free slot
if (HASFREESLOT(*start_byte)) {
auto bit_off = find_first_free_slot_inbyte(*start_byte);
first_empty_slot += bit_off - off;
if (slot2byte(first_empty_slot) >= size) {
alloc_new_bitmap();
}
} else {
size_t i;
for (i = slot2byte(first_empty_slot)+1; i < size; i++) {
char *byte = get_bitmap_byte(i);
if (HASFREESLOT(*byte)) {
// FIXME: pack four bytes to do free slot finding
auto bit_off = find_first_free_slot_inbyte(*byte);
first_empty_slot = byte2slot(i) + bit_off;
break;
}
}
// scale the bitmap
if (i >= size) {
alloc_new_bitmap();
char *byte = get_bitmap_byte(i);
SETBIT(byte, 0);
// char *byte = get_bitmap_byte(i);
// SETBIT(byte, 0);
first_empty_slot = byte2slot(i) + 1;
}
}
@ -175,7 +248,7 @@ public:
private:
static inline size_t slot2byte(size_t slot_num) { return slot_num / BITS_PER_BYTE; }
static inline size_t byte2slot(uint8_t byte_num) { return byte_num * BITS_PER_BYTE; }
static inline size_t byte2slot(size_t byte_num) { return byte_num * BITS_PER_BYTE; }
static inline size_t slot2offset(size_t slot_num) { return slot_num % BITS_PER_BYTE; }
inline char *get_bitmap_byte(size_t byte_num) {
char *bitmap = bitmaps_.at(byte_num / BITMAP_SIZE);
@ -186,11 +259,26 @@ private:
// bitmap[byte_num % BITMAP_SIZE] = byte;
// }
static inline size_t find_first_free_slot_inbyte(uint8_t byte) {
uint32_t rbyte = __builtin_bswap32((uint32_t)byte) >> 24; // 使 32
return BITS_PER_BYTE - __builtin_ctz(~rbyte) - 1; // 使 GCC 1
uint32_t rbyte = reverse_bits(byte); // 使 32
return __builtin_ctz(~rbyte); // 使 GCC 1
}
static inline uint8_t reverse_bits(uint8_t byte) {
uint8_t reversed = 0;
reversed = (reversed << 1) | (byte & 1); byte >>= 1; // 1st bit
reversed = (reversed << 1) | (byte & 1); byte >>= 1; // 2nd bit
reversed = (reversed << 1) | (byte & 1); byte >>= 1; // 3rd bit
reversed = (reversed << 1) | (byte & 1); byte >>= 1; // 4th bit
reversed = (reversed << 1) | (byte & 1); byte >>= 1; // 5th bit
reversed = (reversed << 1) | (byte & 1); byte >>= 1; // 6th bit
reversed = (reversed << 1) | (byte & 1); byte >>= 1; // 7th bit
reversed = (reversed << 1) | (byte & 1); // 8th bit (final step)
return reversed;
}
void alloc_new_bitmap() {
char *bitmap = static_cast<char*>(malloc(BITMAP_SIZE*sizeof(char)));
memset(bitmap, 0, BITMAP_SIZE * sizeof(char));
bitmaps_.push_back(bitmap);
size += BITMAP_SIZE;
}
@ -203,28 +291,32 @@ private:
};
//SlotCache * get_slotcache(const std::string &dbname);
// read and write slot in disk
// first version with no cache
class SlotPage {
public:
SlotPage(std::string &dbname) {
SlotPage(const std::string &dbname) {
slotpage_fname = slotpage_handler_name(dbname);
bitmap = new BitMap();
slotcache = new SlotCache(slotpage_fname);
assert(slotcache);
}
public:
void get_slot(size_t slot_num, struct slot_content *sc) {
slotcache->get_slot(slotpage_fname, slot_num, sc);
slotcache->get_slot(slot_num, sc);
}
void set_slot(size_t slot_num, struct slot_content *sc) {
slotcache->set_slot(slotpage_fname, slot_num, sc);
slotcache->set_slot(slot_num, sc);
}
size_t alloc_slot() { return bitmap->alloc_slot(); }
void dealloc_slot(size_t slot_num) { bitmap->dealloc_slot(slot_num); }
private:
static std::string slotpage_handler_name(std::string &dbname) {
return "./" + dbname + "_slotpage";
static std::string slotpage_handler_name(const std::string &dbname) {
return dbname + "_slotpage";
}
inline static size_t slotpage_pos(size_t slot_num) {
return slot_num * sizeof(slot_content);
@ -233,9 +325,21 @@ private:
private:
std::string slotpage_fname;
BitMap *bitmap;
static SlotCache *slotcache;
// static SlotCache *slotcache;
SlotCache *slotcache;
};
SlotCache *SlotPage::slotcache = new SlotCache();
//#ifndef ENABLE_SLOT_CACHES
//#define ENABLE_SLOT_CACHES
//std::unordered_map<std::string, SlotCache *> slotcaches_;
//#endif
//
//SlotCache * get_slotcache(const std::string &dbname) {
// if (slotcaches_.find(dbname) == slotcaches_.end()) {
// slotcaches_[dbname] = new SlotCache(dbname);
// }
// return slotcaches_[dbname];
//}
#endif // LEVELDB_SLOTPAGE_H

+ 67
- 0
db/threadpool.h View File

@ -0,0 +1,67 @@
//
// Created by on 2024/12/28.
//
#ifndef LEVELDB_THREADPOOL_H
#define LEVELDB_THREADPOOL_H
#include <iostream>
#include <vector>
#include <thread>
#include <queue>
#include <functional>
#include <condition_variable>
#include <atomic>
class ThreadPool {
public:
explicit ThreadPool(size_t numThreads) : stop(false) {
for (size_t i = 0; i < numThreads; ++i) {
workers.emplace_back([this]() {
while (true) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(queueMutex);
condition.wait(lock, [this]() { return stop || !tasks.empty(); });
if (stop && tasks.empty()) return;
task = std::move(tasks.front());
tasks.pop();
}
task();
}
});
}
}
~ThreadPool() {
{
std::unique_lock<std::mutex> lock(queueMutex);
stop = true;
}
condition.notify_all();
for (std::thread &worker : workers) {
worker.join();
}
}
void enqueue(std::function<void()> task) {
{
std::unique_lock<std::mutex> lock(queueMutex);
tasks.push(std::move(task));
}
condition.notify_one();
}
private:
std::vector<std::thread> workers;
std::queue<std::function<void()>> tasks;
std::mutex queueMutex;
std::condition_variable condition;
std::atomic<bool> stop;
};
#endif // LEVELDB_THREADPOOL_H

+ 378
- 0
db/vlog.h View File

@ -0,0 +1,378 @@
//
// Created by on 2024/12/26.
//
#ifndef LEVELDB_VLOG_H
#define LEVELDB_VLOG_H
#include <cstdlib>
#include <fstream>
#include <iostream>
#include <vector>
#include <cstdio>
#include "../db/threadpool.h"
#include "../include/leveldb/slice.h"
#include "../db/slotpage.h"
#include "../db/shared_lock.h"
// config file: | vlog_nums_(size_t) | vlog_1_name | vlog_2_name | ... |
// vlog: | curr_size(size_t) | value_nums(size_t) | value1 | value2 | ... |
// value: value_len(uint16_t) | slot_num(size_t) | value |
#define vlog_num_size sizeof(size_t)
#define vlog_name_size 256
#define KiB 1024
#define MiB 1024*KiB
#define VLOG_SIZE 32*MiB
#define GC_THREDHOLD 0.5
#define VALUE_BUFF_SIZE 0x7fff // value size cannot exceed this number
#define VALUE_SIZE_MASK 0x7fff
#define VALUE_DELE_MASK 0x8000
//struct vlog_name {
// int name_size = 0;
// char name[vlog_name_size];
// vlog_name() {
// memset(name, '\n', sizeof(name));
// }
// vlog_name(std::string &_name) {
// memset(name, '\n', sizeof(name));
// memcpy(name, _name.c_str(), _name.size());
// name_size = _name.size();
// }
//};
struct vlog_info {
std::mutex vlog_info_latch_; // vlog_info本身的并发修改
size_t vlog_num;
size_t vlog_num_for_gc; // set when start gc
bool vlog_valid_;
size_t discard;
size_t value_nums;
size_t curr_size;
vlog_info(size_t vlog_num) : vlog_valid_(true), discard(0), value_nums(0),
vlog_num(vlog_num), curr_size(2*sizeof(size_t)) {}
vlog_info(size_t vlog_num, size_t value_nums, size_t curr_size) : vlog_valid_(true), discard(0), value_nums(value_nums),
vlog_num(vlog_num), curr_size(curr_size) {}
};
struct vlog_handler {
SharedLock vlog_latch_; // vlog上的并发情况soft_lockhard_lock
vlog_handler() {}
};
//class VlogSet;
//
//
//class VlogGC {
//#define THREADNUM 8
// public:
// VlogGC(SlotPage *s, VlogSet *vs) : thread_pool_(new ThreadPool(THREADNUM)),
// slot_page_(s), vlog_set(vs) {}
// public:
// void do_gc(size_t old_vlog_num, std::string old_vlog_name, std::string new_vlog_name,
// struct vlog_info *old_vlog_info, struct vlog_info *new_vlog_info){
// thread_pool_->enqueue([this, old_vlog_num, old_vlog_name, new_vlog_name, old_vlog_info, new_vlog_info]()
// { this->exec_gc(old_vlog_num, old_vlog_name, new_vlog_name, old_vlog_info, new_vlog_info); });
// }
//
// private:
// void exec_gc(size_t old_vlog_num, std::string old_vlog_name, std::string new_vlog_name,
// struct vlog_info *old_vlog_info, struct vlog_info *new_vlog_info) {
//
// auto old_vlog = std::fstream(old_vlog_name, std::ios::in | std::ios::out);
// auto new_vlog = std::fstream(new_vlog_name, std::ios::in | std::ios::out);
//
// size_t value_nums = old_vlog_info->value_nums;
// char old_vlog_buff[VLOG_SIZE];
// char new_vlog_buff[VLOG_SIZE];
// old_vlog.seekp(0);
// old_vlog.read(old_vlog_buff, VLOG_SIZE);
// memcpy(new_vlog_buff, &value_nums, sizeof(size_t));
//
// size_t ovb_off = sizeof(size_t);
// size_t nvb_off = sizeof(size_t);
// for (auto i = 0; i < value_nums; i++) {
// char *value = &old_vlog_buff[ovb_off];
// uint16_t value_len = get_value_len(value);
// size_t slot_num = get_value_slotnum(value);
// if (!value_deleted(value_len)) {
// memcpy(&new_vlog_buff[nvb_off], &old_vlog_buff[ovb_off], value_len);
// struct slot_content scn(new_vlog_info->vlog_num, nvb_off);
// slot_page_->set_slot(slot_num, &scn);
// nvb_off += value_len;
// }
// ovb_off += value_len;
// }
// new_vlog.write(new_vlog_buff, VLOG_SIZE);
// new_vlog.flush();
//
// old_vlog.close();
// new_vlog.close();
//
// this->vlog_set->remove_from_config_file(old_vlog_name);
// }
// private:
// inline bool value_deleted(uint16_t value_len) {
// return !(value_len >> 15);
// }
// inline uint16_t get_value_len(char *value) {
// uint16_t value_len;
// memcpy(&value_len, value, sizeof(uint16_t));
// return value_len;
// }
// inline size_t get_value_slotnum(char *value) {
// size_t slot_num;
// memcpy(&slot_num, &value[sizeof(uint16_t)], sizeof(size_t));
// return slot_num;
// }
// private:
// SlotPage *slot_page_;
// ThreadPool *thread_pool_;
// VlogSet *vlog_set; // inline void remove_vlog_from_maps(std::string &vlog_name)
//};
//class VlogSet {
//// value: value_len(uint16_t) | slot_num(size_t) | value
//// vlog: curr_size(size_t) | value_nums(size_t) | value1 | value2 | ... |
//#define VALUE_BUFF_SIZE 0x7fff
//#define VALUE_SIZE_MASK 0x7fff
//#define VALUE_DELE_MASK 0x8000
//
//friend class VlogGC;
//
//public:
// void get_value(uint32_t vlog_num, uint32_t value_offset, std::string *value) {
// mtx.lock();
// auto vinfo = get_vlog_info(vlog_num);
// auto vhandler = get_vlog_handler(vlog_num);
//
// vinfo->vlog_info_latch_.lock();
// if (!vinfo->vlog_valid_) {
// vhandler = get_vlog_handler(vinfo->vlog_num_for_gc);
// }
// vhandler->vlog_latch_.soft_lock();
// vinfo->vlog_info_latch_.unlock();
// read_vlog_value(vlog_num, value_offset, value);
// vhandler->vlog_latch_.soft_unlock();
// mtx.unlock();
// }
// void put_value(uint32_t vlog_num, uint32_t value_offset, leveldb::Slice &value) {
// mtx.lock();
// auto vinfo = get_vlog_info(vlog_num);
// auto vhandler = get_vlog_handler(vlog_num);
//
// vinfo->vlog_info_latch_.lock();
// if (!vinfo->vlog_valid_) {
// vhandler = get_vlog_handler(vinfo->vlog_num_for_gc);
// }
// vhandler->vlog_latch_.hard_lock();
// vinfo->vlog_info_latch_.unlock();
// write_vlog_value(vlog_num, value_offset, value);
// vhandler->vlog_latch_.hard_unlock();
// mtx.unlock();
// }
// void del_value(uint32_t vlog_num, uint32_t value_offset) {
// mtx.lock();
// auto vinfo = get_vlog_info(vlog_num);
// auto vhandler = get_vlog_handler(vlog_num);
//
// vinfo->vlog_info_latch_.lock();
// if (!vinfo->vlog_valid_) {
// vhandler = get_vlog_handler(vinfo->vlog_num_for_gc);
// }
// vhandler->vlog_latch_.hard_lock();
// mark_del_value(vlog_num, value_offset);
//
// vinfo->vlog_info_latch_.unlock();
// vhandler->vlog_latch_.hard_unlock();
// mtx.unlock();
// }
//
//private:
// size_t register_new_vlog() {
// size_t vn = vlog_nums_;
// std::string vlog_name = get_vlog_name(vn);
// register_inconfig_file(vlog_name);
// create_vlog(vlog_name);
// auto vlog_new = new std::fstream(vlog_name, std::ios::in | std::ios::out);
// if (!vlog_new->is_open()) {
// std::cerr << "Failed to open or create the vlog file: " << vlog_new << std::endl;
// std::exit(EXIT_FAILURE);
// }
// register_vlog_inmaps(vn, vlog_name, vlog_new);
// vlog_nums_ ++;
// vlog_count_ ++;
// return vn;
// }
// void remove_old_vlog(size_t old_vlog_num) {
// // after gc, new_vlog has been created
// // new vlog should have been created here !
// mtx.lock();
// auto vi_old = get_vlog_info(old_vlog_num);
// vi_old->vlog_info_latch_.lock();
// std::string old_vlog_name = get_vlog_name(old_vlog_num);
// remove_from_config_file(old_vlog_name);
// remove_vlog_from_maps(old_vlog_name);
// vi_old->vlog_valid_ = false;
// vi_old->vlog_info_latch_.unlock();
// vlog_count_ --;
// mtx.unlock();
// }
// bool vlog_need_gc(size_t vlog_num) {
// std::string vlog_name = get_vlog_name(vlog_num);
// auto vi = vlog_info_map_[vlog_name];
// bool retval = vi->vlog_valid_ && (vi->discard/vi->value_nums >= GC_THREDHOLD);
// return retval;
// }
//
//private:
// void register_inconfig_file(std::string &vlog_name) {
// struct vlog_name vn(vlog_name);
// // config file: | vlog_nums_(size_t) | vlog_count_(size_t) | vlog_1_name | ... |
// // first size_t in config file indicates current vlog_nums_(size_t)
// // second size_t in config file indicates current vlog_count_(size_t)
// config_file_->seekp(2*sizeof(size_t) + vlog_nums_* vlog_name_size);
// config_file_->write(vn.name, vn.name_size);
// config_file_->flush();
// }
// void remove_from_config_file(std::string &vlog_name) {
// char tmp[vlog_name_size];
// size_t index = 0;
// config_file_->seekp(2*sizeof(size_t));
// while (config_file_->read(tmp, vlog_name_size) || config_file_->gcount() > 0) {
// size_t length = 0;
// while (length < config_file_->gcount() && tmp[length] != '\n') {
// ++length;
// }
// std::string name(tmp, length);
// if (name == vlog_name) {
// memset(tmp, '\n', sizeof(tmp));
// config_file_->seekp(2*sizeof(size_t) + index * vlog_name_size);
// config_file_->write(tmp, vlog_name_size);
// config_file_->flush();
// break;
// }
// index ++;
// }
// }
// void create_vlog(std::string &vlog_name) {
// std::fstream *vlog_new = new std::fstream(vlog_name, std::ios::out);
// char tmp[2*sizeof(size_t)];
// memset(tmp, 0, sizeof(tmp));
// vlog_new->write(tmp, sizeof(tmp));
// vlog_new->flush();
// vlog_new->close();
// }
// inline void register_vlog_inmaps(size_t vlog_num, std::string &vlog_name, std::fstream *handler) {
// vlog_info_map_[vlog_name] = new vlog_info(vlog_num);
// vlog_handler_map_[vlog_name] = new vlog_handler();
// }
// inline void remove_vlog_from_maps(std::string &vlog_name) {
// auto vi = vlog_info_map_[vlog_name];
//// auto vh = vlog_handler_map_[vlog_name];
// vi->vlog_info_latch_.lock();
// vi->vlog_valid_ = false; // TODO: change the slotpage vlog_num when get/put thread read this flag
//// vh->handler_->close();
// assert(!std::remove(vlog_name.c_str()));
// vlog_handler_map_.erase(vlog_name);
// vi->vlog_info_latch_.unlock();
// }
// inline std::string get_vlog_name(size_t vlog_num) {
// return dbname + "_vlog_" + std::to_string(vlog_num);
// }
// inline struct vlog_info *get_vlog_info(size_t vlog_num) {
// return vlog_info_map_[get_vlog_name(vlog_num)];
// }
// inline struct vlog_handler *get_vlog_handler(size_t vlog_num) {
// return vlog_handler_map_[get_vlog_name(vlog_num)];
// }
// void read_vlog_value(uint32_t vlog_num, uint32_t value_offset, std::string *value) {
// auto vlog_name = get_vlog_name(vlog_num);
// auto handler = std::fstream(vlog_name, std::ios::in | std::ios::out);
// handler.seekp(value_offset);
// char value_buff[VALUE_BUFF_SIZE];
// handler.read(value_buff, VALUE_BUFF_SIZE);
//
// uint16_t value_size;
// memcpy(&value_size, value_buff, sizeof(uint16_t));
// value_size &= VALUE_SIZE_MASK;
// value->assign(&value_buff[sizeof(uint16_t)], value_size);
// handler.close();
// }
// void write_vlog_value(uint32_t vlog_num, uint32_t value_offset, leveldb::Slice &value) {
// auto vlog_name = get_vlog_name(vlog_num);
// auto handler = std::fstream(vlog_name, std::ios::in | std::ios::out);
// handler.seekp(value_offset);
// const char *value_buff = value.data();
// handler.write(value_buff, value.size());
//
// auto vinfo = get_vlog_info(vlog_num);
// vinfo->value_nums ++;
// vinfo->curr_size += value.size();
//
// handler.flush();
// handler.close();
// }
// void mark_del_value(uint32_t vlog_num, uint32_t value_offset) {
// auto vinfo = get_vlog_info(vlog_num);
// auto vlog_name = get_vlog_name(vlog_num);
// auto handler = std::fstream(vlog_name, std::ios::in | std::ios::out);
// handler.seekp(value_offset);
// char value_buff[VALUE_BUFF_SIZE];
// handler.read(value_buff, VALUE_BUFF_SIZE);
//
// uint16_t value_size;
// memcpy(&value_size, value_buff, sizeof(uint16_t));
// if (value_size & VALUE_DELE_MASK) {
// // case when value has been deleted
// handler.close();
// return ;
// }
// assert(!(value_size & VALUE_DELE_MASK));
// uint16_t masked_value_size = value_size & 0xffff;
// memcpy(value_buff, &masked_value_size, sizeof(uint16_t));
// handler.write(value_buff, value_size);
// handler.flush();
// handler.close();
//
// // handle gc, mtx is locked outside, vlog_info_latch and vlog hard lock is locked outside too
//// auto vinfo = get_vlog_info(vlog_num);
// vinfo->discard ++;
// vinfo->value_nums --;
// vinfo->curr_size -= value_size & VALUE_SIZE_MASK;
// if (vlog_need_gc(vlog_num)) {
// // create new vlog
// vinfo->vlog_num_for_gc = register_new_vlog();
//// vinfo->vlog_valid_ = false;
// auto old_vlog_name = get_vlog_name(vlog_num);
// auto new_vlog_name = get_vlog_name(vinfo->vlog_num_for_gc);
// auto old_vlog_info = vinfo;
// auto new_vlog_info = get_vlog_info(vinfo->vlog_num_for_gc);
// vlog_gc->do_gc(vlog_num, old_vlog_name, new_vlog_name, old_vlog_info, new_vlog_info);
// }
// }
//
//
//private:
// std::mutex mtx;
// std::string dbname;
// size_t vlog_nums_;
// size_t vlog_count_;
// std::unordered_map<std::string, struct vlog_info *> vlog_info_map_; // TODO: vlog已经失效时slotpage重新读取slotnum
// std::unordered_map<std::string, struct vlog_handler *> vlog_handler_map_;
// std::fstream *config_file_;
//
// VlogGC *vlog_gc; // init with slot_page
//// SlotPage *slot_page;
//};
#endif // LEVELDB_VLOG_H

+ 68
- 0
db/vlog_gc.cpp View File

@ -0,0 +1,68 @@
//
// Created by 马也驰 on 2024/12/29.
//
#include "vlog_gc.h"
#include "../db/vlog_set.h"
void VlogGC::do_gc(size_t old_vlog_num, size_t new_vlog_num) {
thread_pool_->enqueue([this, old_vlog_num, new_vlog_num]()
{ this->exec_gc(old_vlog_num, new_vlog_num); });
}
void VlogGC::exec_gc(size_t old_vlog_num, size_t new_vlog_num) {
auto old_vlog_name = vlog_set->get_vlog_name(old_vlog_num);
auto new_vlog_name = vlog_set->get_vlog_name(new_vlog_num);
auto old_vlog_info = vlog_set->get_vlog_info(old_vlog_num);
auto new_vlog_info = vlog_set->get_vlog_info(new_vlog_num);
auto old_vlog_handler = vlog_set->get_vlog_handler(old_vlog_num);
auto new_vlog_handler = vlog_set->get_vlog_handler(new_vlog_num);
old_vlog_handler->vlog_latch_.soft_lock();
new_vlog_info->vlog_info_latch_.lock();
new_vlog_handler->vlog_latch_.hard_lock();
auto old_vlog = std::fstream(old_vlog_name, std::ios::in | std::ios::out);
auto new_vlog = std::fstream(new_vlog_name, std::ios::in | std::ios::out);
char old_vlog_buff[VLOG_SIZE];
char new_vlog_buff[VLOG_SIZE];
old_vlog.seekp(0);
old_vlog.read(old_vlog_buff, VLOG_SIZE);
// memcpy(new_vlog_buff, &value_nums, sizeof(size_t));
size_t value_nums = old_vlog_info->value_nums;
size_t ovb_off = 2 * sizeof(size_t);
size_t nvb_off = 2 * sizeof(size_t);
size_t new_vlog_value_nums = 0;
for (auto i = 0; i < value_nums; i++) {
char *value = &old_vlog_buff[ovb_off];
uint16_t value_len = get_value_len(value);
size_t slot_num = get_value_slotnum(value);
if (!value_deleted(value_len)) {
memcpy(&new_vlog_buff[nvb_off], &old_vlog_buff[ovb_off], value_len);
struct slot_content scn(new_vlog_info->vlog_num, nvb_off);
slot_page_->set_slot(slot_num, &scn);
nvb_off += value_len;
new_vlog_value_nums ++;
}
ovb_off += value_len;
}
new_vlog_info->value_nums = value_nums;
new_vlog_info->curr_size = nvb_off;
memcpy(new_vlog_buff, &nvb_off, sizeof(size_t));
memcpy(&new_vlog_buff[sizeof(size_t)], &new_vlog_value_nums, sizeof(size_t));
new_vlog.write(new_vlog_buff, VLOG_SIZE);
new_vlog.flush();
old_vlog.close();
new_vlog.close();
old_vlog_handler->vlog_latch_.soft_unlock();
new_vlog_info->vlog_info_latch_.unlock();
new_vlog_handler->vlog_latch_.hard_unlock();
vlog_set->remove_old_vlog(old_vlog_num);
}

+ 46
- 0
db/vlog_gc.h View File

@ -0,0 +1,46 @@
//
// Created by on 2024/12/29.
//
#ifndef LEVELDB_VLOG_GC_H
#define LEVELDB_VLOG_GC_H
#include <string>
#include <mutex>
#include "../db/slotpage.h"
#include "../db/threadpool.h"
#include "../db/vlog.h"
// VlogSet
class VlogSet;
class VlogGC {
public:
VlogGC(SlotPage *s, VlogSet *vs) : slot_page_(s), vlog_set(vs) {}
void do_gc(size_t old_vlog_num, size_t new_vlog_num);
private:
void exec_gc(size_t old_vlog_num, size_t new_vlog_num);
inline bool value_deleted(uint16_t value_len) {
return !(value_len >> 15);
}
inline uint16_t get_value_len(char *value) {
uint16_t value_len;
memcpy(&value_len, value, sizeof(uint16_t));
return value_len;
}
inline size_t get_value_slotnum(char *value) {
size_t slot_num;
memcpy(&slot_num, &value[sizeof(uint16_t)], sizeof(size_t));
return slot_num;
}
SlotPage *slot_page_;
ThreadPool *thread_pool_;
VlogSet *vlog_set; // vlog_gc.cpp
};
#endif // LEVELDB_VLOG_GC_H

+ 336
- 0
db/vlog_set.cpp View File

@ -0,0 +1,336 @@
//
// Created by 马也驰 on 2024/12/29.
//
#include "vlog_set.h"
#include "../db/vlog_gc.h"
// config file: | vlog_nums_(size_t) | vlog_1_num(size_t) | vlog_2_num(size_t) | ... |
// vlog: | curr_size(size_t) | value_nums(size_t) | value1 | value2 | ... |
// value: { value_len(uint16_t) | slot_num(size_t) | value }
VlogSet::VlogSet(std::string dbname, VlogGC *vlog_gc) : dbname(dbname), vlog_gc(vlog_gc) {
auto cfname = get_config_file_name();
this->config_file_ = new std::fstream(cfname, std::ios::in | std::ios::out);
if (!this->config_file_->is_open()) {
// config 文件不存在,尝试创建
delete this->config_file_;
this->config_file_ = new std::fstream(cfname, std::ios::out);
this->config_file_->close();
delete this->config_file_;
// 重新以读写模式打开
this->config_file_ = new std::fstream(cfname, std::ios::in | std::ios::out);
this->vlog_nums_ = 0;
} else {
// config 文件存在
size_t _vlog_nums_;
config_file_->seekp(0);
config_file_->read(reinterpret_cast<char*>(&_vlog_nums_), sizeof(size_t));
this->vlog_nums_ = _vlog_nums_;
// 从 config file 中读取所有有效的vlog名字
config_file_->seekp(sizeof(size_t));
size_t tmp[_vlog_nums_];
config_file_->read(reinterpret_cast<char*>(tmp), sizeof(tmp));
for (auto i = 0; i < _vlog_nums_; i++) {
size_t curr_vlog_num = tmp[i];
if (!(curr_vlog_num & CONFIG_FILE_DELE_MASK)) {
auto curr_vlog = std::fstream(get_vlog_name(curr_vlog_num), std::ios::in | std::ios::out);
size_t curr_vlog_header[2];
curr_vlog.seekp(0);
curr_vlog.read(reinterpret_cast<char*>(curr_vlog_header), 2*sizeof(size_t));
curr_vlog.close();
restore_vlog_inmaps(new vlog_info(curr_vlog_num, tmp[1], tmp[0]));
}
}
}
if (!this->config_file_->is_open()) {
std::cerr << "Failed to open or create the db config file: " << cfname << std::endl;
std::exit(EXIT_FAILURE);
}
}
VlogSet::~VlogSet() {
config_file_->seekp(0);
config_file_->write(reinterpret_cast<const char*>(&vlog_nums_), sizeof(size_t));
config_file_->flush();
config_file_->close();
for (auto & it : vlog_info_map_) {
struct vlog_info* vinfo = it.second;
// 使用 vlog_info* 进行操作
size_t tmp[2] = {vinfo->curr_size, vinfo->value_nums};
auto vlog_file_handler = std::fstream(get_vlog_name(vinfo->vlog_num), std::ios::in | std::ios::out);
vlog_file_handler.seekp(0);
vlog_file_handler.write(reinterpret_cast<const char*>(tmp), sizeof(tmp));
vlog_file_handler.flush();
vlog_file_handler.close();
}
delete vlog_gc;
}
void VlogSet::get_value(uint32_t vlog_num, uint32_t value_offset, std::string *value) {
mtx.lock();
auto vinfo = get_vlog_info(vlog_num);
auto vhandler = get_vlog_handler(vlog_num);
vinfo->vlog_info_latch_.lock();
if (!vinfo->vlog_valid_) {
vhandler = get_vlog_handler(vinfo->vlog_num_for_gc);
}
vhandler->vlog_latch_.soft_lock();
vinfo->vlog_info_latch_.unlock();
read_vlog_value(vlog_num, value_offset, value);
vhandler->vlog_latch_.soft_unlock();
mtx.unlock();
}
struct vlog_info * VlogSet::get_writable_vlog_info(size_t value_size) {
for (auto it = vlog_info_map_.begin(); it != vlog_info_map_.end(); ++it) {
struct vlog_info* vinfo = it->second;
// 使用 vlog_info* 进行操作
if (vinfo->curr_size + value_size <= VLOG_SIZE) {
return vinfo;
}
}
// 所有vlog已满,创建新vlog
return nullptr;
}
void VlogSet::put_value(uint32_t *vlog_num, uint32_t *value_offset, const leveldb::Slice &value) {
mtx.lock();
auto vinfo = get_writable_vlog_info(value.size());
if (!vinfo) {
// vlog全部已满,创建新的vlog
auto _vlog_num_ = register_new_vlog();
vinfo = get_vlog_info(_vlog_num_);
}
vinfo->vlog_info_latch_.lock();
*vlog_num = vinfo->vlog_num;
*value_offset = vinfo->curr_size;
vinfo->curr_size += value.size();
vinfo->value_nums ++;
vinfo->vlog_info_latch_.unlock();
auto vhandler = get_vlog_handler(*vlog_num);
vinfo->vlog_info_latch_.lock();
if (!vinfo->vlog_valid_) {
vhandler = get_vlog_handler(vinfo->vlog_num_for_gc);
}
vhandler->vlog_latch_.hard_lock();
vinfo->vlog_info_latch_.unlock();
write_vlog_value(*vlog_num, *value_offset, value);
vhandler->vlog_latch_.hard_unlock();
mtx.unlock();
}
void VlogSet::del_value(uint32_t vlog_num, uint32_t value_offset) {
mtx.lock();
auto vinfo = get_vlog_info(vlog_num);
auto vhandler = get_vlog_handler(vlog_num);
vinfo->vlog_info_latch_.lock();
if (!vinfo->vlog_valid_) {
// auto vlog_num_gc = vinfo->vlog_num_for_gc;
vhandler = get_vlog_handler(vinfo->vlog_num_for_gc);
// vinfo->vlog_info_latch_.unlock();
// vinfo = get_vlog_info(vlog_num_gc);
// vinfo->vlog_info_latch_.lock();
}
vhandler->vlog_latch_.hard_lock();
mark_del_value(vlog_num, value_offset);
vinfo->vlog_info_latch_.unlock();
vhandler->vlog_latch_.hard_unlock();
mtx.unlock();
}
size_t VlogSet::register_new_vlog() {
size_t vn = vlog_nums_;
std::string vlog_name = get_vlog_name(vn);
register_inconfig_file(vn);
create_vlog(vlog_name);
// auto vlog_new = new std::fstream(vlog_name, std::ios::in | std::ios::out);
// if (!vlog_new->is_open()) {
// std::cerr << "Failed to open or create the vlog file: " << vlog_new << std::endl;
// std::exit(EXIT_FAILURE);
// }
register_vlog_inmaps(vn, vlog_name);
vlog_nums_ ++;
// vlog_count_ ++;
return vn;
}
void VlogSet::remove_old_vlog(size_t old_vlog_num) {
// after gc, new_vlog has been created
// new vlog should have been created here !
mtx.lock();
auto vi_old = get_vlog_info(old_vlog_num);
vi_old->vlog_info_latch_.lock();
std::string old_vlog_name = get_vlog_name(old_vlog_num);
remove_from_config_file(old_vlog_num);
remove_vlog_from_maps(old_vlog_name);
vi_old->vlog_valid_ = false;
vi_old->vlog_info_latch_.unlock();
// vlog_count_ --;
mtx.unlock();
}
bool VlogSet::vlog_need_gc(size_t vlog_num) {
std::string vlog_name = get_vlog_name(vlog_num);
auto vi = vlog_info_map_[vlog_name];
bool retval = vi->vlog_valid_ && (vi->discard/vi->value_nums >= GC_THREDHOLD);
return retval;
}
void VlogSet::register_inconfig_file(size_t vlog_num) {
// config file: | vlog_nums_(size_t) | vlog_count_(size_t) | vlog_1_name | ... |
// first size_t in config file indicates current vlog_nums_(size_t)
// second size_t in config file indicates current vlog_count_(size_t)
config_file_->seekp(sizeof(size_t) + vlog_nums_* sizeof(size_t));
config_file_->write(reinterpret_cast<const char*>(&vlog_num), sizeof(size_t));
config_file_->flush();
}
void VlogSet::remove_from_config_file(size_t vlog_num) {
char tmp[vlog_nums_*vlog_num_size];
size_t index = 0;
config_file_->seekp(sizeof(size_t));
config_file_->read(tmp, sizeof(tmp));
size_t *vlog_num_ptr = reinterpret_cast<size_t*>(tmp);
for (auto i = 0; i < vlog_nums_; i++) {
size_t curr_vlog_num = *vlog_num_ptr;
if (!(curr_vlog_num & CONFIG_FILE_DELE_MASK) && curr_vlog_num == vlog_num) {
curr_vlog_num &= CONFIG_FILE_DELE_MASK;
config_file_->seekp(sizeof(size_t) + i*sizeof(size_t));
config_file_->write(reinterpret_cast<const char*>(&curr_vlog_num), sizeof(size_t));
config_file_->flush();
break;
}
}
}
void VlogSet::create_vlog(std::string &vlog_name) {
std::fstream *vlog_new = new std::fstream(vlog_name, std::ios::out);
char tmp[2*sizeof(size_t)];
memset(tmp, 0, sizeof(tmp));
vlog_new->write(tmp, sizeof(tmp));
vlog_new->flush();
vlog_new->close();
}
inline void VlogSet::restore_vlog_inmaps(struct vlog_info *vi) {
auto vlog_name = get_vlog_name(vi->vlog_num);
vlog_info_map_[vlog_name] = vi;
vlog_handler_map_[vlog_name] = new vlog_handler();
}
inline void VlogSet::register_vlog_inmaps(size_t vlog_num, std::string &vlog_name) {
vlog_info_map_[vlog_name] = new vlog_info(vlog_num);
vlog_handler_map_[vlog_name] = new vlog_handler();
}
inline void VlogSet::remove_vlog_from_maps(std::string &vlog_name) {
auto vi = vlog_info_map_[vlog_name];
// auto vh = vlog_handler_map_[vlog_name];
vi->vlog_info_latch_.lock();
vi->vlog_valid_ = false; // TODO: change the slotpage vlog_num when get/put thread read this flag
// vh->handler_->close();
assert(!std::remove(vlog_name.c_str()));
vlog_handler_map_.erase(vlog_name);
vi->vlog_info_latch_.unlock();
}
inline std::string VlogSet::get_config_file_name() {
return dbname + "_config_file";
}
std::string VlogSet::get_vlog_name(size_t vlog_num) {
return dbname + "_vlog_" + std::to_string(vlog_num);
}
struct vlog_info * VlogSet::get_vlog_info(size_t vlog_num) {
return vlog_info_map_[get_vlog_name(vlog_num)];
}
struct vlog_handler * VlogSet::get_vlog_handler(size_t vlog_num) {
return vlog_handler_map_[get_vlog_name(vlog_num)];
}
void VlogSet::read_vlog_value(uint32_t vlog_num, uint32_t value_offset, std::string *value) {
auto vlog_name = get_vlog_name(vlog_num);
auto handler = std::fstream(vlog_name, std::ios::in | std::ios::out);
handler.seekp(value_offset);
char value_buff[VALUE_BUFF_SIZE];
handler.read(value_buff, VALUE_BUFF_SIZE);
uint16_t value_size;
memcpy(&value_size, value_buff, sizeof(uint16_t));
value_size &= VALUE_SIZE_MASK;
value->assign(&value_buff[sizeof(uint16_t)], value_size);
handler.close();
}
void VlogSet::write_vlog_value(uint32_t vlog_num, uint32_t value_offset, const leveldb::Slice &value) {
auto vlog_name = get_vlog_name(vlog_num);
auto handler = std::fstream(vlog_name, std::ios::in | std::ios::out);
handler.seekp(value_offset);
const char *value_buff = value.data();
handler.write(value_buff, value.size());
auto vinfo = get_vlog_info(vlog_num);
vinfo->value_nums ++;
vinfo->curr_size += value.size();
handler.flush();
handler.close();
}
void VlogSet::mark_del_value(uint32_t vlog_num, uint32_t value_offset) {
auto vinfo = get_vlog_info(vlog_num);
auto vlog_name = get_vlog_name(vlog_num);
auto handler = std::fstream(vlog_name, std::ios::in | std::ios::out);
handler.seekp(value_offset);
char value_buff[VALUE_BUFF_SIZE];
handler.read(value_buff, VALUE_BUFF_SIZE);
uint16_t value_size;
memcpy(&value_size, value_buff, sizeof(uint16_t));
if (value_size & VALUE_DELE_MASK) {
// case when value has been deleted
handler.close();
return ;
}
assert(!(value_size & VALUE_DELE_MASK));
uint16_t masked_value_size = value_size & 0xffff;
memcpy(value_buff, &masked_value_size, sizeof(uint16_t));
handler.write(value_buff, value_size);
handler.flush();
handler.close();
// handle gc, mtx is locked outside, vlog_info_latch and vlog hard lock is locked outside too
// auto vinfo = get_vlog_info(vlog_num);
vinfo->discard ++;
vinfo->value_nums --;
vinfo->curr_size -= value_size & VALUE_SIZE_MASK;
if (vlog_need_gc(vlog_num)) {
// create new vlog
vinfo->vlog_valid_ = false;
vinfo->vlog_num_for_gc = register_new_vlog();
// vinfo->vlog_valid_ = false;
vlog_gc->do_gc(vlog_num, vinfo->vlog_num_for_gc);
}
}

+ 67
- 0
db/vlog_set.h View File

@ -0,0 +1,67 @@
//
// Created by on 2024/12/29.
//
#ifndef LEVELDB_VLOG_SET_H
#define LEVELDB_VLOG_SET_H
#include <unordered_map>
#include <string>
#include <mutex>
#include <cassert>
#include "../include/leveldb/slice.h"
#include "../db/shared_lock.h"
#include "../db/vlog.h"
// VlogGC
class VlogGC;
class VlogSet {
friend class VlogGC;
#define CONFIG_FILE_DELE_MASK (0x1 << (sizeof(size_t)-1))
#define CONFIG_FILE_VLOG_NUM(v) ((v) & ~CONFIG_FILE_DELE_MASK)
public:
VlogSet(std::string dbname, VlogGC *vlog_gc);
~VlogSet();
void get_value(uint32_t vlog_num, uint32_t value_offset, std::string *value);
void put_value(uint32_t *vlog_num, uint32_t *value_offset, const leveldb::Slice &value);
void del_value(uint32_t vlog_num, uint32_t value_offset);
void set_vlog_gc(VlogGC *vg) { this->vlog_gc = vg; }
private:
size_t register_new_vlog();
void remove_old_vlog(size_t old_vlog_num);
bool vlog_need_gc(size_t vlog_num);
void register_inconfig_file(size_t vlog_num);
void remove_from_config_file(size_t vlog_num);
void create_vlog(std::string &vlog_name);
struct vlog_info *get_writable_vlog_info(size_t value_size);
inline void restore_vlog_inmaps(struct vlog_info *vi);
inline void register_vlog_inmaps(size_t vlog_num, std::string &vlog_name);
inline void remove_vlog_from_maps(std::string &vlog_name);
inline std::string get_config_file_name();
std::string get_vlog_name(size_t vlog_num);
struct vlog_info *get_vlog_info(size_t vlog_num);
struct vlog_handler *get_vlog_handler(size_t vlog_num);
void read_vlog_value(uint32_t vlog_num, uint32_t value_offset, std::string *value);
void write_vlog_value(uint32_t vlog_num, uint32_t value_offset, const leveldb::Slice &value);
void mark_del_value(uint32_t vlog_num, uint32_t value_offset);
private:
std::mutex mtx;
std::string dbname;
size_t vlog_nums_;
// size_t vlog_count_;
std::unordered_map<std::string, struct vlog_info *> vlog_info_map_;
std::unordered_map<std::string, struct vlog_handler *> vlog_handler_map_;
std::fstream *config_file_;
VlogGC *vlog_gc; // vlog_set.cpp
};
#endif // LEVELDB_VLOG_SET_H

+ 127
- 0
test/db_test4.cc View File

@ -0,0 +1,127 @@
//
// Created by 马也驰 on 2024/12/20.
//
#include "../db/slotpage.h"
#include <thread>
#define SLOTNUM 8192
void printVector(const std::vector<size_t>& vec) {
for (size_t i = 0; i < vec.size(); ++i) {
std::cout << vec[i] << " ";
}
std::cout << std::endl;
}
int test1() {
BitMap bitmap;
std::vector<size_t> slots;
slots.reserve(SLOTNUM<<2);
int i = 0;
for (i = 0; i < 2047; i++) {
slots.push_back(bitmap.alloc_slot());
}
slots.push_back(bitmap.alloc_slot());
for (; i < SLOTNUM<<2; i++) {
slots.push_back(bitmap.alloc_slot());
}
printVector(slots);
std::cout << "\n\n";
for (i = 0; i < 100; i++) {
bitmap.dealloc_slot(slots.at(i));
}
bitmap.dealloc_slot(10000);
bitmap.dealloc_slot(11000);
bitmap.dealloc_slot(12000);
bitmap.show_allocated_slot();
std::cout << "\n\n";
for (i = 0; i < 103; i++) {
bitmap.alloc_slot();
}
bitmap.show_allocated_slot();
return 1;
}
int test2() {
BitMap bitmap;
std::vector<size_t> slots;
slots.reserve(SLOTNUM);
for (auto i = 0; i < SLOTNUM; i++) {
slots.push_back(bitmap.alloc_slot());
}
printVector(slots);
std::cout << "\n\n";
bitmap.dealloc_slot(30);
bitmap.dealloc_slot(40);
bitmap.dealloc_slot(50);
bitmap.show_allocated_slot();
std::cout << "\n\n";
bitmap.alloc_slot();
bitmap.alloc_slot();
bitmap.show_allocated_slot();
return 1;
}
void allocate_slots(BitMap& bitmap, std::vector<size_t>& slots, int num_slots) {
for (int i = 0; i < num_slots; i++) {
slots.push_back(bitmap.alloc_slot());
}
}
int test3() {
BitMap bitmap;
std::vector<size_t> slots1;
slots1.reserve(1000);
std::vector<size_t> slots2;
slots2.reserve(1000);
// Create two threads to allocate slots concurrently.
std::thread thread1(allocate_slots, std::ref(bitmap), std::ref(slots1), 1000);
std::thread thread2(allocate_slots, std::ref(bitmap), std::ref(slots2), 1000);
// Wait for both threads to finish.
thread1.join();
thread2.join();
// Print the results.
std::cout << "Slots allocated by thread 1:\n";
printVector(slots1);
std::cout << "\n\nSlots allocated by thread 2:\n";
printVector(slots2);
std::cout << "\n\n";
std::vector<size_t> combined_slots = slots1;
combined_slots.insert(combined_slots.end(), slots2.begin(), slots2.end());
std::sort(combined_slots.begin(), combined_slots.end());
// Print the sorted combined slots.
std::cout << "Sorted combined slots:\n";
printVector(combined_slots);
std::cout << "\n\n";
return 1;
}
int main() {
test3(); // 有问题
}

+ 148
- 0
test/db_test5.cc View File

@ -0,0 +1,148 @@
//
// Created by 马也驰 on 2024/12/22.
//
#include <iostream>
#include <thread>
#include <vector>
#include "../db/slotpage.h"
void printVector(const std::vector<uint32_t>& vec) {
for (size_t i = 0; i < vec.size(); ++i) {
std::cout << vec[i] << " ";
}
std::cout << std::endl;
}
int test1() {
SlotCache slot_cache((std::string&)"test_slotpage");
const size_t blksize = 4096;
const size_t blknum = 32;
const size_t slot_content_num = blknum * blksize / sizeof(slot_content);
slot_content slot_contents[slot_content_num];
for (int i = 0; i < slot_content_num; i++) {
slot_contents[i] = {static_cast<uint32_t>(i), static_cast<uint32_t>(i)};
}
//
// slot_cache.write_for_test(reinterpret_cast<char*>(slot_contents), slot_content_num*sizeof(slot_content));
slot_content sc;
for (int i = 0; i < slot_content_num; i++) {
slot_cache.get_slot(i, &sc);
assert(sc.vlog_num == slot_contents[i].vlog_num);
assert(sc.value_offset == slot_contents[i].vlog_num);
std::cout << i << std::endl;
}
return 1;
}
int test2() {
SlotCache slot_cache((std::string&)"test_slotpage");
const size_t blksize = 4096;
const size_t blknum = 32;
const size_t slot_content_num = blknum * blksize / sizeof(slot_content);
slot_content slot_contents[slot_content_num];
for (int i = 0; i < slot_content_num; i++) {
slot_contents[i] = {static_cast<uint32_t>(i), static_cast<uint32_t>(i)};
}
//
// slot_cache.write_for_test(reinterpret_cast<char*>(slot_contents), slot_content_num*sizeof(slot_content));
slot_content sc1 = {999999999, 999999999};
// slot_cache.set_slot(100, &sc1);
// slot_cache.set_slot(1000, &sc1);
// slot_cache.set_slot(2000, &sc1);
slot_content sc2 = {0, 0};
slot_cache.get_slot(100, &sc2);
assert(sc2.vlog_num == sc1.vlog_num);
assert(sc2.value_offset == sc1.value_offset);
slot_cache.get_slot(1000, &sc2);
assert(sc2.vlog_num == sc1.vlog_num);
assert(sc2.value_offset == sc1.value_offset);
slot_cache.get_slot(2000, &sc2);
assert(sc2.vlog_num == sc1.vlog_num);
assert(sc2.value_offset == sc1.value_offset);
// for (int i = 0; i < slot_content_num; i++) {
// slot_cache.get_slot(i, &sc);
// assert(sc.vlog_num == slot_contents[i].vlog_num);
// assert(sc.value_offset == slot_contents[i].vlog_num);
// std::cout << i << std::endl;
// }
return 1;
}
void process_slots(SlotCache& slot_cache,
const slot_content* slot_contents,
size_t start, size_t end,
std::vector<uint32_t>& slots) {
slot_content sc;
for (size_t i = start; i < end; ++i) {
slot_cache.get_slot(i, &sc);
// assert(sc.vlog_num == slot_contents[i].vlog_num);
// assert(sc.value_offset == slot_contents[i].vlog_num);
slots.push_back(sc.vlog_num);
}
}
int test3() {
SlotCache slot_cache((std::string&)("test_slotpage"));
const size_t blksize = 4096;
const size_t blknum = 32;
const size_t slot_content_num = blknum * blksize / sizeof(slot_content);
slot_content slot_contents[slot_content_num];
for (size_t i = 0; i < slot_content_num; i++) {
slot_contents[i] = {static_cast<uint32_t>(i), static_cast<uint32_t>(i)};
}
std::vector<uint32_t> slots1;
std::vector<uint32_t> slots2;
// 创建两个线程分别处理 slots1 和 slots2
std::thread t1(process_slots, std::ref(slot_cache), slot_contents, 0, 100, std::ref(slots1));
std::thread t2(process_slots, std::ref(slot_cache), slot_contents, 100, 200, std::ref(slots2));
// 等待线程完成
t1.join();
t2.join();
// 打印两个 vector 的内容
std::cout << "Slots1:" << std::endl;
printVector(slots1);
std::cout << std::endl;
std::cout << "Slots2:" << std::endl;
printVector(slots2);
std::cout << std::endl;
// 合并并排序
std::vector<uint32_t> merged_slots = slots1;
merged_slots.insert(merged_slots.end(), slots2.begin(), slots2.end());
std::sort(merged_slots.begin(), merged_slots.end());
// 打印合并排序后的结果
std::cout << "Merged and Sorted Slots:" << std::endl;
printVector(merged_slots);
std::cout << std::endl;
return 1;
}
int main() {
test3();
}

Loading…
Cancel
Save