Browse Source

Merge remote-tracking branch 'origin/master'

master
马也驰 8 months ago
parent
commit
e6584e7210
15 changed files with 647 additions and 316 deletions
  1. +3
    -2
      CMakeLists.txt
  2. +80
    -92
      db/db_impl.cc
  3. +3
    -3
      db/db_impl.h
  4. +74
    -0
      db/gc_executor.cpp
  5. +40
    -0
      db/gc_executor.h
  6. +1
    -2
      db/shared_lock.h
  7. +67
    -1
      db/slotpage.h
  8. +0
    -67
      db/threadpool.h
  9. +18
    -11
      db/vlog.h
  10. +182
    -51
      db/vlog_gc.cpp
  11. +26
    -4
      db/vlog_gc.h
  12. +93
    -72
      db/vlog_set.cpp
  13. +14
    -7
      db/vlog_set.h
  14. +1
    -1
      include/leveldb/db.h
  15. +45
    -3
      test/db_test1.cc

+ 3
- 2
CMakeLists.txt View File

@ -120,12 +120,13 @@ include(GNUInstallDirs)
add_library(leveldb ""
db/slotpage.h
db/vlog.h
db/threadpool.h
db/shared_lock.h
db/vlog_set.cpp
db/vlog_set.h
db/vlog_gc.cpp
db/vlog_gc.h)
db/vlog_gc.h
db/gc_executor.cpp
db/gc_executor.h)
target_sources(leveldb
PRIVATE
"${PROJECT_BINARY_DIR}/${LEVELDB_PORT_CONFIG_DIR}/port_config.h"

+ 80
- 92
db/db_impl.cc View File

@ -10,6 +10,7 @@
#include <cstdio>
#include <set>
#include <string>
#include <cstring>
#include <vector>
#include <iomanip>
#include <iostream>
@ -1188,7 +1189,7 @@ Status DBImpl::get_slot_num(const ReadOptions& options, const Slice& key,
}
Status DBImpl::Get_Fields(const ReadOptions& options, const Slice& key,
FieldArray* fields) {
FieldArray& fields) {
// Todo(begin)
size_t slot_num;
// 从value中提取slot_num
@ -1200,10 +1201,10 @@ Status DBImpl::Get_Fields(const ReadOptions& options, const Slice& key,
struct slot_content sc;
std::string vlog_value;
slot_page_->get_slot(slot_num, &sc);
vlog_set_->get_value(sc.vlog_num, sc.value_offset, &vlog_value);
vlog_set_->get_value(sc, &vlog_value);
std::cout << "value from value_log: " << key.ToString() << vlog_value << std::endl;
*fields = DeserializeValue(vlog_value);
DeserializeValue(fields, vlog_value);
return Status::OK();
// Todo(end)
}
@ -1212,12 +1213,15 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key,
std::string* value) {
size_t slot_num;
auto s = get_slot_num(options, key, &slot_num);
if (!s.ok()) {
return s;
}
// TODO: search the slotpage and get value from vlog
struct slot_content sc;
std::string vlog_value;
slot_page_->get_slot(slot_num, &sc);
vlog_set_->get_value(sc.vlog_num, sc.value_offset, &vlog_value);
vlog_set_->get_value(sc, &vlog_value);
*value = vlog_value;
return s;
@ -1255,13 +1259,13 @@ void DBImpl::ReleaseSnapshot(const Snapshot* snapshot) {
// Convenience methods
Status DBImpl::Put_Fields(const WriteOptions& opt, const Slice& key,
const FieldArray& fields) {
// TODO(begin): allocate slot_num in slotpage and put value in vlog
// 将字段数组序列化
std::string serialized_value = SerializeValue(fields);
std::string serialized_value;
std::cout << "Put_Fields: " << key.ToString() << " " << serialized_value << std::endl;
size_t slot_num = slot_page_->alloc_slot();
SerializeValue(fields, serialized_value, slot_num);
struct slot_content sc;
vlog_set_->put_value(&sc.vlog_num, &sc.value_offset, serialized_value);
vlog_set_->put_value(sc, slot_num, serialized_value);
slot_page_->set_slot(slot_num, &sc);
char data[sizeof(size_t)];
@ -1269,16 +1273,12 @@ Status DBImpl::Put_Fields(const WriteOptions& opt, const Slice& key,
Slice slot_val(data, sizeof(data));
return DB::Put(opt, key, slot_val);
// TODO(end)
}
Status DBImpl::Put(const WriteOptions& opt, const Slice& key, const Slice& val) {
// TODO: allocate slot_num in slotpage and put value in vlog
size_t slot_num = slot_page_->alloc_slot();
// std::string slot_num_str((char *)&slot_num, sizeof(size_t));
struct slot_content sc;
vlog_set_->put_value(&sc.vlog_num, &sc.value_offset, val);
vlog_set_->put_value(sc, slot_num, val);
slot_page_->set_slot(slot_num, &sc);
char data[sizeof(size_t)];
@ -1291,10 +1291,14 @@ Status DBImpl::Put(const WriteOptions& opt, const Slice& key, const Slice& val)
Status DBImpl::Delete(const WriteOptions& options, const Slice& key) {
size_t slot_num;
auto s = get_slot_num(ReadOptions(), key, &slot_num);
if (!s.ok()) {
return s;
}
struct slot_content sc;
slot_page_->get_slot(slot_num, &sc);
vlog_set_->del_value(sc.vlog_num, sc.value_offset);
vlog_set_->del_value(sc);
slot_page_->dealloc_slot(slot_num);
return DB::Delete(options, key);
}
@ -1579,88 +1583,72 @@ void DBImpl::GetApproximateSizes(const Range* range, int n, uint64_t* sizes) {
v->Unref();
}
// Todo(begin)
// 反序列化函数,将字符串解码为字段数组
FieldArray DBImpl::DeserializeValue(const std::string& value_str) {
// 存放解析后的字段数组
FieldArray fields;
// 将输入字符串转换为输入流 iss, 方便读取
std::istringstream iss(value_str);
std::string content;
// 临时存放读取的数据
char buffer[100];
// 读取长度(定长,16比特)
iss.read(buffer, 16);
buffer[16] = '\0';
size_t total_length = std::stoi(buffer);
// std::cout << "读取到的总长度为: " << total_length << std::endl;
std::string value_content(value_str.begin() + 16, value_str.begin() + 16 + total_length);
// std::cout << value_content << std::endl;
std::istringstream iss_content(value_content);
iss_content.read(buffer, sizeof(size_t));
buffer[sizeof(size_t)] = '\0';
std::string slot_num = buffer;
// 读取属性个数
iss_content.read(buffer, 16);
// 在第17个比特位处添加终结符,确保字符串以终结符结尾
buffer[16] = '\0';
// 将 buffer 中的内容转化为整数并赋值给 field_count
int field_count = std::stoi(buffer);
// std::cout << "读取到的字段个数为: " << field_count << std::endl;
for (int i = 0; i < field_count; ++i) {
Field field;
// 读取属性名长度(定长,16比特)
iss_content.read(buffer, 16);
buffer[16] = '\0';
int name_length = std::stoi(buffer);
// std::cout << "读取到的属性名长度为: " << name_length << std::endl;
// 读取属性名(变长)
field.name.resize(name_length);
iss_content.read(&field.name[0], name_length);
// std::cout << "读取到的属性名为: " << field.name << std::endl;
// 读取属性值长度(定长,16比特)
iss_content.read(buffer, 16);
buffer[16] = '\0';
int value_length = std::stoi(buffer);
// std::cout << "读取到的属性值长度为: " << value_length << std::endl;
// 读取属性值(变长)
field.value.resize(value_length);
iss_content.read(&field.value[0], value_length);
// std::cout << "读取到的属性值为: " << field.value << std::endl;
fields.push_back(field);
}
return fields;
//using FieldArray = std::vector<Field>;
//struct Field { std::string name; std::string value; };
void DBImpl::DeserializeValue(FieldArray& fields, const std::string& value_str) {
// single value: || value_size(uint16_t) | slot_num(size_t) || {field_nums(uint16_t), attr1, attr2, ... } |
// single attr: | attr1_name_len(uint8_t) | attr1_name | attr1_len(uint16_t) | attr1 |
const char *value_data = value_str.c_str();
const size_t value_len = value_str.size();
size_t attr_off = sizeof(uint16_t);
while (attr_off < value_len) {
uint8_t attr_name_len = *(uint8_t *)(value_data+attr_off);
attr_off += sizeof(uint8_t);
auto attr_name = std::string(value_data+attr_off, attr_name_len);
attr_off += attr_name_len;
uint16_t attr_len = *(uint16_t *)(value_data+attr_off);
attr_off += sizeof(uint16_t);
auto attr_value = std::string(value_data+attr_off, attr_len);
attr_off += attr_len;
fields.push_back({attr_name, attr_value});
}
assert(attr_off == value_len);
}
// Todo(end)
// Todo(begin)
// 序列化函数,将字段数组序列化为字符串
std::string DBImpl::SerializeValue(const FieldArray& fields) {
// 创建并初始化一个字符串流 oss,用于逐步构建最终的序列化字符串
std::ostringstream oss_temp;
std::string slot_num = "slot_num";
oss_temp << std::setw(sizeof(size_t)) << std::setfill('0') << slot_num;
// 写入属性个数(定长,16比特),使用std::setw(16)设置宽度,使用std::setfull(0)设置填充字符,将字段数组的大小写入oss中
oss_temp << std::setw(16) << std::setfill('0') << fields.size();
// 序列化函数,将字段数组序列化为字符串+
//using FieldArray = std::vector<Field>;
//struct Field { std::string name; std::string value; };
void DBImpl::SerializeValue(const FieldArray& fields, std::string &value, size_t slot_num) {
// single value: || value_size(uint16_t) | slot_num(size_t) || {field_nums(uint16_t), attr1, attr2, ... } |
// single attr: | attr1_name_len(uint8_t) | attr1_name | attr1_len(uint16_t) | attr1 |
std::string tmp_value;
uint16_t value_size = sizeof(uint16_t);
uint16_t field_nums = 0;
for (const auto& field : fields) {
// 写入属性名长度(定长,16比特)
oss_temp << std::setw(16) << std::setfill('0') << field.name.size();
// 写入属性名(变长)
oss_temp << field.name;
// 写入属性值长度(定长,16比特)
oss_temp << std::setw(16) << std::setfill('0') << field.value.size();
// 写入属性值(变长)
oss_temp << field.value;
}
std::string temp_str = oss_temp.str();
size_t value_length = temp_str.size();
std::ostringstream oss;
oss << std::setw(16) << std::setfill('0') << value_length;
oss << temp_str;
return oss.str();
const uint8_t attr_name_len = field.name.size();
const uint16_t attr_value_len = field.value.size();
const size_t attr_size = attr_name_len + attr_value_len + sizeof(uint8_t) + sizeof(uint16_t);
char attr_data[attr_size];
size_t off = 0;
memcpy(attr_data+off, &attr_name_len, sizeof(uint8_t));
off += sizeof(uint8_t);
memcpy(attr_data+off, field.name.c_str(), attr_name_len);
off += attr_name_len;
memcpy(attr_data+off, &attr_value_len, sizeof(uint16_t));
off += sizeof(uint16_t);
memcpy(attr_data+off, field.value.c_str(), attr_value_len);
off += attr_value_len;
assert(off == attr_size);
tmp_value += std::string(attr_data, attr_size);
value_size += attr_size;
field_nums ++;
}
char value_data[value_size];
memcpy(value_data, &value_size, sizeof(uint16_t));
memcpy(value_data+sizeof(uint16_t), tmp_value.c_str(), tmp_value.size());
assert(sizeof(uint16_t) + tmp_value.size() == value_size);
value = std::string(value_data, value_size);
}
// Todo(end)
// Default implementations of convenience methods that subclasses of DB
// can call if they wish
Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {

+ 3
- 3
db/db_impl.h View File

@ -53,7 +53,7 @@ class DBImpl : public DB {
std::string* value) override;
// Todo(begin)
Status Get_Fields(const ReadOptions& options, const Slice& key,
FieldArray* fields) override;
FieldArray& fields) override;
// Todo(end)
Iterator* NewIterator(const ReadOptions&) override;
const Snapshot* GetSnapshot() override;
@ -91,8 +91,8 @@ class DBImpl : public DB {
// TODO(begin)
SlotPage *slot_page_;
VlogSet *vlog_set_;
static std::string SerializeValue(const FieldArray& fields);
static FieldArray DeserializeValue(const std::string& value_str);
static void SerializeValue(const FieldArray& fields, std::string &value, size_t slot_num);
static void DeserializeValue(FieldArray& fields, const std::string& value_str);
// TODO(end)
// Information for a manual compaction
struct ManualCompaction {

+ 74
- 0
db/gc_executor.cpp View File

@ -0,0 +1,74 @@
//
// Created by 马也驰 on 2025/1/3.
//
#include "gc_executor.h"
#include "vlog_set.h"
#include "vlog_gc.h"
void gc_executor::exec_gc(VlogGC *vlog_gc_, size_t old_vlog_num, size_t new_vlog_num) {
auto vlog_set = vlog_gc_->vlog_set;
auto slot_page_ = vlog_gc_->slot_page_;
vlog_set->mtx.lock();
auto old_vlog_name = vlog_set->get_vlog_name(old_vlog_num);
auto new_vlog_name = vlog_set->get_vlog_name(new_vlog_num);
auto old_vlog_info = vlog_set->get_vlog_info(old_vlog_num);
auto new_vlog_info = vlog_set->get_vlog_info(new_vlog_num);
auto old_vlog_handler = vlog_set->get_vlog_handler(old_vlog_num);
auto new_vlog_handler = vlog_set->get_vlog_handler(new_vlog_num);
old_vlog_info->vlog_info_latch_.lock();
old_vlog_handler->vlog_latch_.soft_lock();
new_vlog_info->vlog_info_latch_.lock();
new_vlog_handler->vlog_latch_.hard_lock();
vlog_set->mtx.unlock();
auto old_vlog = std::fstream(old_vlog_name, std::ios::in | std::ios::out);
auto new_vlog = std::fstream(new_vlog_name, std::ios::in | std::ios::out);
char old_vlog_buff[VLOG_SIZE];
char new_vlog_buff[VLOG_SIZE];
old_vlog.seekp(0);
old_vlog.read(old_vlog_buff, VLOG_SIZE);
size_t value_nums = old_vlog_info->value_nums;
size_t ovb_off = 2 * sizeof(size_t);
size_t nvb_off = 2 * sizeof(size_t);
size_t new_vlog_value_nums = 0;
for (auto i = 0; i < value_nums; i++) {
char *value = &old_vlog_buff[ovb_off];
uint16_t value_len = get_value_len(value);
size_t slot_num = get_value_slotnum(value);
if (!value_deleted(value_len)) {
memcpy(&new_vlog_buff[nvb_off], &old_vlog_buff[ovb_off], value_len);
memcpy(&new_vlog_buff[nvb_off+sizeof(uint16_t)], &(new_vlog_info->vlog_num), sizeof(size_t));
struct slot_content scn(new_vlog_info->vlog_num, nvb_off);
slot_page_->set_slot(slot_num, &scn);
nvb_off += value_len;
new_vlog_value_nums ++;
}
ovb_off += value_len;
}
new_vlog_info->value_nums = new_vlog_value_nums;
new_vlog_info->curr_size = nvb_off;
memcpy(new_vlog_buff, &nvb_off, sizeof(size_t));
memcpy(&new_vlog_buff[sizeof(size_t)], &new_vlog_value_nums, sizeof(size_t));
new_vlog.seekp(0);
new_vlog.write(new_vlog_buff, VLOG_SIZE);
new_vlog.flush();
old_vlog.close();
new_vlog.close();
old_vlog_info->vlog_valid_ = false;
vlog_set->remove_old_vlog(old_vlog_num);
old_vlog_info->vlog_info_latch_.unlock();
old_vlog_handler->vlog_latch_.soft_unlock();
new_vlog_info->vlog_info_latch_.unlock();
new_vlog_handler->vlog_latch_.hard_unlock();
// vlog_gc_->gc_counter_decrement();
}

+ 40
- 0
db/gc_executor.h View File

@ -0,0 +1,40 @@
//
// Created by on 2025/1/3.
//
#ifndef LEVELDB_GC_EXECUTOR_H
#define LEVELDB_GC_EXECUTOR_H
#include <cstdlib>
#include <string>
class VlogGC;
class gc_executor {
public:
explicit gc_executor() {}
~gc_executor() = default;
public:
static void exec_gc(VlogGC *vlog_gc_, size_t old_vlog_num, size_t new_vlog_num);
private:
static inline bool value_deleted(uint16_t value_len) {
return !(value_len >> 15);
}
static inline uint16_t get_value_len(char *value) {
uint16_t value_len;
memcpy(&value_len, value, sizeof(uint16_t));
return value_len;
}
static inline size_t get_value_slotnum(char *value) {
size_t slot_num;
memcpy(&slot_num, &value[sizeof(uint16_t)], sizeof(size_t));
return slot_num;
}
};
#endif // LEVELDB_GC_EXECUTOR_H

+ 1
- 2
db/shared_lock.h View File

@ -45,8 +45,7 @@ class SharedLock {
std::mutex read_latch_; // indicate if any read is proceeding
std::mutex read_count_latch_;
volatile size_t read_count_;
std::mutex write_latch_;
// volatile size_t write_count_;
std::mutex write_latch_; // indicate if any write is proceeding
};
#endif // LEVELDB_SHARED_LOCK_H

+ 67
- 1
db/slotpage.h View File

@ -15,6 +15,8 @@
#include <iostream>
#include <unordered_map>
// bitmap: || bitmap_size(size_t) | first_empty_slot(size_t) || bits... |
#define BITMAP_SIZE 8192
#define BITS_PER_BYTE 8
@ -183,6 +185,69 @@ public:
first_empty_slot = 0;
}
BitMap(const std::string &dbname) {
name = dbname + "_bitmap";
auto bitmap_handler = std::fstream(name, std::ios::in | std::ios::out);
if (!bitmap_handler.is_open()) {
//
bitmap_handler = std::fstream(name, std::ios::out);
bitmap_handler.close();
//
bitmap_handler = std::fstream(name, std::ios::in | std::ios::out);
bitmap_handler.seekp(0);
size_t tmp[2] = {2*sizeof(size_t), 0};
bitmap_handler.write(reinterpret_cast<const char*>(tmp), sizeof(tmp));
bitmap_handler.close();
// init bitmap
char *bitmap = static_cast<char*>(malloc(BITMAP_SIZE*sizeof(char)));
memset(bitmap, 0, BITMAP_SIZE * sizeof(char));
bitmaps_.push_back(bitmap);
size = BITMAP_SIZE;
first_empty_slot = 0;
} else {
// read in bitmap content
size_t bitmap_header[2];
bitmap_handler.seekp(0);
bitmap_handler.read(reinterpret_cast<char*>(bitmap_header), 2*sizeof(size_t));
this->first_empty_slot = bitmap_header[1];
this->size = bitmap_header[0];
const size_t page_num = (size - 2*sizeof(size_t)) / BITMAP_SIZE;
for (auto i = 0; i < page_num; i++) {
char *bitmap = static_cast<char*>(malloc(BITMAP_SIZE*sizeof(char)));
bitmap_handler.seekp(2*sizeof(size_t) + i*BITMAP_SIZE);
bitmap_handler.read(bitmap, BITMAP_SIZE);
bitmaps_.push_back(bitmap);
}
// init bitmap
if (page_num == 0) {
char *bitmap = static_cast<char*>(malloc(BITMAP_SIZE*sizeof(char)));
memset(bitmap, 0, BITMAP_SIZE * sizeof(char));
bitmaps_.push_back(bitmap);
size = BITMAP_SIZE;
first_empty_slot = 0;
}
}
}
~BitMap() {
auto bitmap_handler = std::fstream(name, std::ios::in | std::ios::out);
assert(bitmap_handler.is_open());
size_t tmp[2] = {size, first_empty_slot};
bitmap_handler.seekp(0);
bitmap_handler.write(reinterpret_cast<const char*>(tmp), sizeof(tmp));
size_t off = 2 * sizeof(size_t);
for (auto bitmap : bitmaps_) {
bitmap_handler.seekp(off);
bitmap_handler.write(bitmap, BITMAP_SIZE);
off += BITMAP_SIZE;
}
assert(off == size);
bitmap_handler.flush();
}
/** methods for test **/
void show_allocated_slot() {
for (int i = 0; i < this->size; i++) {
@ -284,6 +349,7 @@ private:
}
private:
std::string name;
std::vector<char *> bitmaps_;
size_t size;
size_t first_empty_slot;
@ -299,7 +365,7 @@ class SlotPage {
public:
SlotPage(const std::string &dbname) {
slotpage_fname = slotpage_handler_name(dbname);
bitmap = new BitMap();
bitmap = new BitMap(dbname);
slotcache = new SlotCache(slotpage_fname);
assert(slotcache);
}

+ 0
- 67
db/threadpool.h View File

@ -1,67 +0,0 @@
//
// Created by on 2024/12/28.
//
#ifndef LEVELDB_THREADPOOL_H
#define LEVELDB_THREADPOOL_H
#include <iostream>
#include <vector>
#include <thread>
#include <queue>
#include <functional>
#include <condition_variable>
#include <atomic>
class ThreadPool {
public:
explicit ThreadPool(size_t numThreads) : stop(false) {
for (size_t i = 0; i < numThreads; ++i) {
workers.emplace_back([this]() {
while (true) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(queueMutex);
condition.wait(lock, [this]() { return stop || !tasks.empty(); });
if (stop && tasks.empty()) return;
task = std::move(tasks.front());
tasks.pop();
}
task();
}
});
}
}
~ThreadPool() {
{
std::unique_lock<std::mutex> lock(queueMutex);
stop = true;
}
condition.notify_all();
for (std::thread &worker : workers) {
worker.join();
}
}
void enqueue(std::function<void()> task) {
{
std::unique_lock<std::mutex> lock(queueMutex);
tasks.push(std::move(task));
}
condition.notify_one();
}
private:
std::vector<std::thread> workers;
std::queue<std::function<void()>> tasks;
std::mutex queueMutex;
std::condition_variable condition;
std::atomic<bool> stop;
};
#endif // LEVELDB_THREADPOOL_H

+ 18
- 11
db/vlog.h View File

@ -12,20 +12,23 @@
#include <cstdio>
#include "../db/threadpool.h"
#include "../include/leveldb/slice.h"
#include "../db/slotpage.h"
#include "../db/shared_lock.h"
// config file: | vlog_nums_(size_t) | vlog_1_name | vlog_2_name | ... |
// vlog: | curr_size(size_t) | value_nums(size_t) | value1 | value2 | ... |
// value: | value_size(uint16_t) | slot_num(size_t) | field_nums(uint16_t) value |
// config file: || vlog_nums_(size_t) || vlog_1_num(size_t) | vlog_2_num(size_t) | ... |
// vlog: || curr_size(size_t) | value_nums(size_t) || value1 | value2 | ... |
// single value: || value_size(uint16_t) | slot_num(size_t) || {field_nums(uint16_t), attr1, attr2, ... } |
// single attr: | attr1_name_len(uint8_t) | attr1_name | attr1_len(uint16_t) | attr1 |
// {field_nums(uint16_t), attr1, attr2, ... } is wrapped/unwrapped in dm_impl
// || value_size(uint16_t) | slot_num(size_t) || VALUE: {field_nums(uint16_t), attr1, attr2, ... } | is wrapped/unwrapped in vlog_set
#define vlog_num_size sizeof(size_t)
#define vlog_name_size 256
#define KiB 1024
#define MiB 1024*KiB
#define VLOG_SIZE 32*MiB
#define VLOG_SIZE 1*MiB // origin: 32*MiB
#define GC_THREDHOLD 0.5
#define VALUE_BUFF_SIZE 0x7fff // value size cannot exceed this number
@ -46,18 +49,22 @@
// }
//};
struct vlog_info {
std::mutex vlog_info_latch_; // vlog_info本身的并发修改
size_t vlog_num;
size_t vlog_num_for_gc; // set when start gc
bool vlog_valid_;
bool processing_gc;; // init to be false, set as true when processing gc
bool vlog_valid_; // init to be true, set as false after deleted
size_t discard;
size_t value_nums;
size_t curr_size;
vlog_info(size_t vlog_num) : vlog_valid_(true), discard(0), value_nums(0),
vlog_num(vlog_num), curr_size(2*sizeof(size_t)) {}
vlog_info(size_t vlog_num, size_t value_nums, size_t curr_size) : vlog_valid_(true), discard(0), value_nums(value_nums),
vlog_num(vlog_num), curr_size(curr_size) {}
vlog_info(size_t vlog_num) : processing_gc(false), discard(0), value_nums(0),
vlog_num(vlog_num), curr_size(2*sizeof(size_t)), vlog_valid_(true) {}
vlog_info(size_t vlog_num, size_t value_nums, size_t curr_size) : processing_gc(false),
discard(0), value_nums(value_nums),
vlog_num(vlog_num), curr_size(curr_size),
vlog_valid_(true) {}
};
struct vlog_handler {

+ 182
- 51
db/vlog_gc.cpp View File

@ -4,65 +4,196 @@
#include "vlog_gc.h"
#include "../db/vlog_set.h"
#include <thread>
#include <utility>
struct executor_param {
VlogGC *vg;
size_t old_vlog_num;
size_t new_vlog_num;
};
std::mutex map_latch_;
std::unordered_map<size_t, struct executor_param> executor_params_map_;
std::unordered_map<size_t, VlogGC *> vlog_gc_map_;
void test_func() {
int nums[4] = {1, 2, 3, 4};
for (int i = 0; i < 4; i++) {
nums[i] ++;
}
}
void add_executor_params(size_t gc_num, struct executor_param &ep) {
map_latch_.lock();
executor_params_map_[gc_num] = ep;
map_latch_.unlock();
}
struct executor_param get_executor_params(size_t gc_num) {
map_latch_.lock();
auto ep = executor_params_map_[gc_num];
map_latch_.unlock();
return ep;
}
void del_executor_params(size_t gc_num) {
map_latch_.lock();
executor_params_map_.erase(gc_num);
map_latch_.unlock();
}
void add_vlog_gc(size_t gc_num, VlogGC *vg) {
map_latch_.lock();
vlog_gc_map_[gc_num] = vg;
map_latch_.unlock();
}
VlogGC * get_vlog_gc(size_t gc_num) {
map_latch_.lock();
auto vg = vlog_gc_map_[gc_num];
map_latch_.unlock();
return vg;
}
void del_vlog_gc(size_t gc_num) {
map_latch_.lock();
vlog_gc_map_.erase(gc_num);
map_latch_.unlock();
}
// 函数:增加 counter
void VlogGC::gc_counter_increment() {
vlog_set->counter_latch_.lock();
++vlog_set->counter; // 增加 counter
std::cout << "Increment: Counter incremented to " << vlog_set->counter << "\n";
vlog_set->counter_latch_.unlock();
}
// 函数:减少 counter
void VlogGC::gc_counter_decrement() {
vlog_set->counter_latch_.lock();
--vlog_set->counter; // 减少 counter
std::cout << "Decrement: Counter decremented to " << vlog_set->counter << "\n";
if (vlog_set->counter == 0) {
// 如果 counter 为 0
vlog_set->finished_latch_.lock();
vlog_set->finished = true; // 设置完成标志
vlog_set->finished_latch_.unlock();
}
vlog_set->counter_latch_.unlock();
}
void VlogGC::do_gc(size_t old_vlog_num, size_t new_vlog_num) {
thread_pool_->enqueue([this, old_vlog_num, new_vlog_num]()
{ this->exec_gc(old_vlog_num, new_vlog_num); });
gc_counter_increment();
size_t _gc_num_ = get_gc_num();
struct executor_param ep = {this, old_vlog_num, new_vlog_num};
add_executor_params(_gc_num_, ep);
add_vlog_gc(_gc_num_, this);
std::thread gc_thread([_gc_num_]() mutable {
auto _vlog_gc_ = get_vlog_gc(_gc_num_);
assert(_vlog_gc_ != nullptr);
_vlog_gc_->exec_gc(_gc_num_);
});
}
void VlogGC::exec_gc(size_t old_vlog_num, size_t new_vlog_num) {
auto old_vlog_name = vlog_set->get_vlog_name(old_vlog_num);
auto new_vlog_name = vlog_set->get_vlog_name(new_vlog_num);
auto old_vlog_info = vlog_set->get_vlog_info(old_vlog_num);
auto new_vlog_info = vlog_set->get_vlog_info(new_vlog_num);
auto old_vlog_handler = vlog_set->get_vlog_handler(old_vlog_num);
auto new_vlog_handler = vlog_set->get_vlog_handler(new_vlog_num);
old_vlog_handler->vlog_latch_.soft_lock();
new_vlog_info->vlog_info_latch_.lock();
new_vlog_handler->vlog_latch_.hard_lock();
auto old_vlog = std::fstream(old_vlog_name, std::ios::in | std::ios::out);
auto new_vlog = std::fstream(new_vlog_name, std::ios::in | std::ios::out);
char old_vlog_buff[VLOG_SIZE];
char new_vlog_buff[VLOG_SIZE];
old_vlog.seekp(0);
old_vlog.read(old_vlog_buff, VLOG_SIZE);
// memcpy(new_vlog_buff, &value_nums, sizeof(size_t));
size_t value_nums = old_vlog_info->value_nums;
size_t ovb_off = 2 * sizeof(size_t);
size_t nvb_off = 2 * sizeof(size_t);
size_t new_vlog_value_nums = 0;
for (auto i = 0; i < value_nums; i++) {
char *value = &old_vlog_buff[ovb_off];
uint16_t value_len = get_value_len(value);
size_t slot_num = get_value_slotnum(value);
if (!value_deleted(value_len)) {
memcpy(&new_vlog_buff[nvb_off], &old_vlog_buff[ovb_off], value_len);
struct slot_content scn(new_vlog_info->vlog_num, nvb_off);
slot_page_->set_slot(slot_num, &scn);
nvb_off += value_len;
new_vlog_value_nums ++;
}
ovb_off += value_len;
void VlogGC::exec_gc(size_t gc_num_) {
// FIXME: might break, due to unknown concurrency problem
curr_thread_nums_latch_.lock();
curr_thread_nums_ ++;
if (curr_thread_nums_ >= max_thread_nums_) {
full_latch_.lock();
}
new_vlog_info->value_nums = value_nums;
new_vlog_info->curr_size = nvb_off;
memcpy(new_vlog_buff, &nvb_off, sizeof(size_t));
memcpy(&new_vlog_buff[sizeof(size_t)], &new_vlog_value_nums, sizeof(size_t));
new_vlog.write(new_vlog_buff, VLOG_SIZE);
new_vlog.flush();
curr_thread_nums_latch_.unlock();
old_vlog.close();
new_vlog.close();
// start gc process
auto ep = get_executor_params(gc_num_);
gc_executor::exec_gc(ep.vg, ep.old_vlog_num, ep.new_vlog_num);
// test_func();
old_vlog_handler->vlog_latch_.soft_unlock();
new_vlog_info->vlog_info_latch_.unlock();
new_vlog_handler->vlog_latch_.hard_unlock();
curr_thread_nums_latch_.lock();
if (curr_thread_nums_ >= max_thread_nums_) {
full_latch_.unlock();
}
curr_thread_nums_ --;
curr_thread_nums_latch_.unlock();
vlog_set->remove_old_vlog(old_vlog_num);
gc_counter_decrement();
del_executor_params(gc_num_);
del_vlog_gc(gc_num_);
}
// vlog: || curr_size(size_t) | value_nums(size_t) || value1 | value2 | ... |
// single value: || value_size(uint16_t) | slot_num(size_t) || {field_nums(uint16_t), attr1, attr2, ... } |
// single attr: | attr1_name_len(uint8_t) | attr1_name | attr1_len(uint16_t) | attr1 |
//void VlogGC::exec_gc(size_t old_vlog_num, size_t new_vlog_num) {
// vlog_set->mtx.lock();
// auto old_vlog_name = vlog_set->get_vlog_name(old_vlog_num);
// auto new_vlog_name = vlog_set->get_vlog_name(new_vlog_num);
// auto old_vlog_info = vlog_set->get_vlog_info(old_vlog_num);
// auto new_vlog_info = vlog_set->get_vlog_info(new_vlog_num);
// auto old_vlog_handler = vlog_set->get_vlog_handler(old_vlog_num);
// auto new_vlog_handler = vlog_set->get_vlog_handler(new_vlog_num);
//
// old_vlog_info->vlog_info_latch_.lock();
// old_vlog_handler->vlog_latch_.soft_lock();
// new_vlog_info->vlog_info_latch_.lock();
// new_vlog_handler->vlog_latch_.hard_lock();
// vlog_set->mtx.unlock();
//
// auto old_vlog = std::fstream(old_vlog_name, std::ios::in | std::ios::out);
// auto new_vlog = std::fstream(new_vlog_name, std::ios::in | std::ios::out);
//
// char old_vlog_buff[VLOG_SIZE];
// char new_vlog_buff[VLOG_SIZE];
// old_vlog.seekp(0);
// old_vlog.read(old_vlog_buff, VLOG_SIZE);
//
// size_t value_nums = old_vlog_info->value_nums;
// size_t ovb_off = 2 * sizeof(size_t);
// size_t nvb_off = 2 * sizeof(size_t);
// size_t new_vlog_value_nums = 0;
// for (auto i = 0; i < value_nums; i++) {
// char *value = &old_vlog_buff[ovb_off];
// uint16_t value_len = get_value_len(value);
// size_t slot_num = get_value_slotnum(value);
// if (!value_deleted(value_len)) {
// memcpy(&new_vlog_buff[nvb_off], &old_vlog_buff[ovb_off], value_len);
// memcpy(&new_vlog_buff[nvb_off+sizeof(uint16_t)], &(new_vlog_info->vlog_num), sizeof(size_t));
// struct slot_content scn(new_vlog_info->vlog_num, nvb_off);
// slot_page_->set_slot(slot_num, &scn);
// nvb_off += value_len;
// new_vlog_value_nums ++;
// }
// ovb_off += value_len;
// }
// new_vlog_info->value_nums = new_vlog_value_nums;
// new_vlog_info->curr_size = nvb_off;
// memcpy(new_vlog_buff, &nvb_off, sizeof(size_t));
// memcpy(&new_vlog_buff[sizeof(size_t)], &new_vlog_value_nums, sizeof(size_t));
// new_vlog.seekp(0);
// new_vlog.write(new_vlog_buff, VLOG_SIZE);
// new_vlog.flush();
//
// old_vlog.close();
// new_vlog.close();
//
// old_vlog_info->vlog_valid_ = false;
// vlog_set->remove_old_vlog(old_vlog_num);
//
// old_vlog_info->vlog_info_latch_.unlock();
// old_vlog_handler->vlog_latch_.soft_unlock();
// new_vlog_info->vlog_info_latch_.unlock();
// new_vlog_handler->vlog_latch_.hard_unlock();
//
//}
//

+ 26
- 4
db/vlog_gc.h View File

@ -9,19 +9,27 @@
#include <string>
#include <mutex>
#include "../db/slotpage.h"
#include "../db/threadpool.h"
#include "../db/vlog.h"
#include "../db/gc_executor.h"
// VlogSet
class VlogSet;
class VlogGC {
#define THREAD_NUM 8
friend class gc_executor;
public:
VlogGC(SlotPage *s, VlogSet *vs) : slot_page_(s), vlog_set(vs) {}
VlogGC(SlotPage *s, VlogSet *vs) : slot_page_(s), vlog_set(vs),max_thread_nums_(THREAD_NUM),
curr_thread_nums_(0), gc_num(0) {}
~VlogGC() {}
void do_gc(size_t old_vlog_num, size_t new_vlog_num);
private:
void exec_gc(size_t old_vlog_num, size_t new_vlog_num);
void exec_gc(size_t gc_num_);
void gc_counter_increment();
void gc_counter_decrement();
inline bool value_deleted(uint16_t value_len) {
return !(value_len >> 15);
@ -36,10 +44,24 @@ class VlogGC {
memcpy(&slot_num, &value[sizeof(uint16_t)], sizeof(size_t));
return slot_num;
}
inline size_t get_gc_num() {
gc_num_latch_.lock();
size_t _gc_num_ = gc_num ++;
gc_num_latch_.unlock();
return _gc_num_;
}
SlotPage *slot_page_;
ThreadPool *thread_pool_;
VlogSet *vlog_set; // vlog_gc.cpp
// NOTE: threadpool
std::mutex full_latch_; // indicate thread pool is full when set as locked
size_t max_thread_nums_;
std::mutex curr_thread_nums_latch_;
size_t curr_thread_nums_;
std::mutex gc_num_latch_;
size_t gc_num;
};

+ 93
- 72
db/vlog_set.cpp View File

@ -4,6 +4,7 @@
#include "vlog_set.h"
#include "../db/vlog_gc.h"
#include <thread>
// config file: | vlog_nums_(size_t) | vlog_1_num(size_t) | vlog_2_num(size_t) | ... |
@ -53,7 +54,6 @@ VlogSet::VlogSet(std::string dbname, VlogGC *vlog_gc) : dbname(dbname), vlog_gc(
}
}
if (!this->config_file_->is_open()) {
std::cerr << "Failed to open or create the db config file: " << cfname << std::endl;
std::exit(EXIT_FAILURE);
@ -61,6 +61,18 @@ VlogSet::VlogSet(std::string dbname, VlogGC *vlog_gc) : dbname(dbname), vlog_gc(
}
VlogSet::~VlogSet() {
// wait for all gc threads to finish
while (true) {
finished_latch_.lock();
if (!finished) {
finished_latch_.unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(2));
} else {
finished_latch_.unlock();
break;
}
}
config_file_->seekp(0);
config_file_->write(reinterpret_cast<const char*>(&vlog_nums_), sizeof(size_t));
config_file_->flush();
@ -69,6 +81,7 @@ VlogSet::~VlogSet() {
for (auto & it : vlog_info_map_) {
struct vlog_info* vinfo = it.second;
// 所有操作都已经同步完成
// 使用 vlog_info* 进行操作
size_t tmp[2] = {vinfo->curr_size, vinfo->value_nums};
auto vlog_file_handler = std::fstream(get_vlog_name(vinfo->vlog_num), std::ios::in | std::ios::out);
@ -82,20 +95,22 @@ VlogSet::~VlogSet() {
}
void VlogSet::get_value(uint32_t vlog_num, uint32_t value_offset, std::string *value) {
// single value: || value_size(uint16_t) | slot_num(size_t) || {field_nums(uint16_t), attr1, attr2, ... } |
// single attr: | attr1_name_len(uint8_t) | attr1_name | attr1_len(uint16_t) | attr1 |
void VlogSet::get_value(const struct slot_content &sc, std::string *value) {
mtx.lock();
auto vinfo = get_vlog_info(vlog_num);
auto vhandler = get_vlog_handler(vlog_num);
auto vinfo = get_vlog_info(sc.vlog_num);
auto vhandler = get_vlog_handler(sc.vlog_num);
vinfo->vlog_info_latch_.lock();
if (!vinfo->vlog_valid_) {
vhandler = get_vlog_handler(vinfo->vlog_num_for_gc);
}
vhandler->vlog_latch_.soft_lock();
mtx.unlock(); // for better performance
vinfo->vlog_info_latch_.unlock();
read_vlog_value(vlog_numspan>class="p">, value_offset, value);
read_vlog_value(sc, value);
vhandler->vlog_latch_.soft_unlock();
mtx.unlock();
}
struct vlog_info * VlogSet::get_writable_vlog_info(size_t value_size) {
@ -103,16 +118,21 @@ struct vlog_info * VlogSet::get_writable_vlog_info(size_t value_size) {
struct vlog_info* vinfo = it->second;
// 使用 vlog_info* 进行操作
if (vinfo->curr_size + value_size <= VLOG_SIZE) {
vinfo->vlog_info_latch_.lock();
if (vinfo->vlog_valid_ && !vinfo->processing_gc && vinfo->curr_size+value_size <= VLOG_SIZE) {
vinfo->vlog_info_latch_.unlock();
return vinfo;
}
vinfo->vlog_info_latch_.unlock();
}
// 所有vlog已满,创建新vlog
return nullptr;
}
void VlogSet::put_value(uint32_t *vlog_num, uint32_t *value_offset, const leveldb::Slice &value) {
// single value: || value_size(uint16_t) | slot_num(size_t) || {field_nums(uint16_t), attr1, attr2, ... } |
// single attr: | attr1_name_len(uint8_t) | attr1_name | attr1_len(uint16_t) | attr1 |
void VlogSet::put_value(struct slot_content &sc, size_t slot_num, const leveldb::Slice &value) {
mtx.lock();
auto vinfo = get_writable_vlog_info(value.size());
@ -123,82 +143,69 @@ void VlogSet::put_value(uint32_t *vlog_num, uint32_t *value_offset, const leveld
}
vinfo->vlog_info_latch_.lock();
*vlog_num = vinfo->vlog_num;
*value_offset = vinfo->curr_size;
vinfo->curr_size += value.size();
sc.vlog_num = vinfo->vlog_num;
sc.value_offset = vinfo->curr_size;
vinfo->curr_size += value.size() + sizeof(uint16_t) + sizeof(size_t);
vinfo->value_nums ++;
vinfo->vlog_info_latch_.unlock();
auto vhandler = get_vlog_handler(*vlog_num);
vinfo->vlog_info_latch_.lock();
if (!vinfo->vlog_valid_) {
auto vhandler = get_vlog_handler(vinfo->vlog_num);
if (!vinfo->vlog_valid_ || vinfo->processing_gc) {
vhandler = get_vlog_handler(vinfo->vlog_num_for_gc);
}
vhandler->vlog_latch_.hard_lock();
mtx.unlock(); // for better performance
vinfo->vlog_info_latch_.unlock();
write_vlog_value(*vlog_num, *value_offset, value);
write_vlog_value(sc, slot_num, value);
vhandler->vlog_latch_.hard_unlock();
mtx.unlock();
}
void VlogSet::del_value(uint32_t vlog_num, uint32_t value_offset) {
void VlogSet::del_value(const struct slot_content &sc) {
mtx.lock();
auto vinfo = get_vlog_info(vlog_num);
auto vhandler = get_vlog_handler(vlog_num);
auto vinfo = get_vlog_info(sc.vlog_num);
auto vhandler = get_vlog_handler(sc.vlog_num);
vinfo->vlog_info_latch_.lock();
if (!vinfo->vlog_valid_) {
// auto vlog_num_gc = vinfo->vlog_num_for_gc;
if (!vinfo->vlog_valid_ || vinfo->processing_gc) {
vhandler = get_vlog_handler(vinfo->vlog_num_for_gc);
// vinfo->vlog_info_latch_.unlock();
// vinfo = get_vlog_info(vlog_num_gc);
// vinfo->vlog_info_latch_.lock();
}
vhandler->vlog_latch_.hard_lock();
mark_del_value(vlog_num, value_offset);
vhandler->vlog_latch_.hard_lock();
mtx.unlock(); // for better performance
vinfo->vlog_info_latch_.unlock();
mark_del_value(sc);
vhandler->vlog_latch_.hard_unlock();
mtx.unlock();
}
size_t VlogSet::register_new_vlog() {
// FIXME: concurrency on config file
size_t vn = vlog_nums_;
std::string vlog_name = get_vlog_name(vn);
register_inconfig_file(vn);
create_vlog(vn);
// auto vlog_new = new std::fstream(vlog_name, std::ios::in | std::ios::out);
// if (!vlog_new->is_open()) {
// std::cerr << "Failed to open or create the vlog file: " << vlog_new << std::endl;
// std::exit(EXIT_FAILURE);
// }
register_vlog_inmaps(vn, vlog_name);
vlog_nums_ ++;
// vlog_count_ ++;
return vn;
}
// only used in vlog gc
void VlogSet::remove_old_vlog(size_t old_vlog_num) {
// after gc, new_vlog has been created
// new vlog should have been created here !
// new vlog should have been created here
mtx.lock();
auto vi_old = get_vlog_info(old_vlog_num);
vi_old->vlog_info_latch_.lock();
std::string old_vlog_name = get_vlog_name(old_vlog_num);
remove_from_config_file(old_vlog_num);
remove_vlog_from_maps(old_vlog_name);
// remove_vlog_from_maps(old_vlog_name); // FIXME: this function should be called after all access on this vlog finished !!!
vi_old->vlog_valid_ = false;
vi_old->vlog_info_latch_.unlock();
// vlog_count_ --;
mtx.unlock();
}
bool VlogSet::vlog_need_gc(size_t vlog_num) {
// FIXME: vlog应该已经满了才行
std::string vlog_name = get_vlog_name(vlog_num);
auto vi = vlog_info_map_[vlog_name];
if ((double)vi->curr_size/VLOG_SIZE < VLOG_GC_THREHOLD) {
auto vi = get_vlog_info(vlog_num);
if ((double)(vi->curr_size*1.0)/VLOG_SIZE < VLOG_GC_THREHOLD) {
return false;
}
bool retval = vi->vlog_valid_ && (vi->discard/vi->value_nums >= GC_THREDHOLD);
@ -209,12 +216,16 @@ void VlogSet::register_inconfig_file(size_t vlog_num) {
// config file: | vlog_nums_(size_t) | vlog_count_(size_t) | vlog_1_name | ... |
// first size_t in config file indicates current vlog_nums_(size_t)
// second size_t in config file indicates current vlog_count_(size_t)
config_file_latch_.lock();
config_file_->seekp(sizeof(size_t) + vlog_nums_* sizeof(size_t));
config_file_->write(reinterpret_cast<const char*>(&vlog_num), sizeof(size_t));
config_file_->flush();
config_file_latch_.unlock();
}
void VlogSet::remove_from_config_file(size_t vlog_num) {
// FIXME: concurrency on config file
config_file_latch_.lock();
char tmp[vlog_nums_*vlog_num_size];
config_file_->seekp(sizeof(size_t));
config_file_->read(tmp, sizeof(tmp));
@ -229,6 +240,7 @@ void VlogSet::remove_from_config_file(size_t vlog_num) {
break;
}
}
config_file_latch_.unlock();
}
void VlogSet::create_vlog(size_t vlog_num) {
@ -264,14 +276,8 @@ inline void VlogSet::register_vlog_inmaps(size_t vlog_num, std::string &vlog_nam
}
inline void VlogSet::remove_vlog_from_maps(std::string &vlog_name) {
auto vi = vlog_info_map_[vlog_name];
// auto vh = vlog_handler_map_[vlog_name];
vi->vlog_info_latch_.lock();
vi->vlog_valid_ = false; // TODO: change the slotpage vlog_num when get/put thread read this flag
// vh->handler_->close();
assert(!std::remove(vlog_name.c_str()));
vlog_handler_map_.erase(vlog_name);
vi->vlog_info_latch_.unlock();
}
inline std::string VlogSet::get_config_file_name() {
@ -290,42 +296,57 @@ struct vlog_handler * VlogSet::get_vlog_handler(size_t vlog_num) {
return vlog_handler_map_[get_vlog_name(vlog_num)];
}
// value: | value_size(uint16_t) | slot_num(size_t) | field_nums(uint16_t) | value |
void VlogSet::read_vlog_value(uint32_t vlog_num, uint32_t value_offset, std::string *value) {
auto vlog_name = get_vlog_name(vlog_num);
// single value: || value_size(uint16_t) | slot_num(size_t) || {field_nums(uint16_t), attr1, attr2, ... } |
// single attr: | attr1_name_len(uint8_t) | attr1_name | attr1_len(uint16_t) | attr1 |
void VlogSet::read_vlog_value(const struct slot_content &sc, std::string *value) {
auto vlog_name = get_vlog_name(sc.vlog_num);
auto handler = std::fstream(vlog_name, std::ios::in | std::ios::out);
handler.seekp(value_offset);
handler.seekp(sc.value_offset);
char value_buff[VALUE_BUFF_SIZE];
handler.read(value_buff, VALUE_BUFF_SIZE);
// FIXME: remove value size
uint16_t value_size;
memcpy(&value_size, value_buff, sizeof(uint16_t));
if (value_size & VALUE_DELE_MASK) {
*value = "";
return ;
}
value_size &= VALUE_SIZE_MASK;
// *value = std::string(value_buff);
assert(value_size <= VALUE_BUFF_SIZE);
value->assign(&value_buff[sizeof(uint16_t)], value_size);
const size_t off = sizeof(uint16_t)+sizeof(size_t);
*value = std::string(&value_buff[off], value_size-off);
handler.close();
}
void VlogSet::write_vlog_value(uint32_t vlog_num, uint32_t value_offset, const leveldb::Slice &value) {
auto vlog_name = get_vlog_name(vlog_num);
// single value: || value_size(uint16_t) | slot_num(size_t) || {field_nums(uint16_t), attr1, attr2, ... } |
// single attr: | attr1_name_len(uint8_t) | attr1_name | attr1_len(uint16_t) | attr1 |
void VlogSet::write_vlog_value(const struct slot_content &sc, size_t slot_num, const leveldb::Slice &value) {
auto vlog_name = get_vlog_name(sc.vlog_num);
auto handler = std::fstream(vlog_name, std::ios::in | std::ios::out);
handler.seekp(value_offset);
handler.seekp(sc.value_offset);
const char *value_buff = value.data();
handler.write(value_buff, value.size());
auto vinfo = get_vlog_info(vlog_num);
const size_t off = sizeof(uint16_t) + sizeof(size_t);
const size_t value_size = off + value.size();
char data[value_size];
memcpy(data, &value_size, sizeof(uint16_t));
memcpy(data+sizeof(uint16_t), &slot_num, sizeof(size_t));
memcpy(data+off, value_buff, value.size());
handler.write(data, value_size);
handler.flush();
handler.close();
}
void VlogSet::mark_del_value(uint32_t vlog_num, uint32_t value_offset) {
auto vinfo = get_vlog_info(vlog_num);
auto vlog_name = get_vlog_name(vlog_num);
// single value: || value_size(uint16_t) | slot_num(size_t) || {field_nums(uint16_t), attr1, attr2, ... } |
// single attr: | attr1_name_len(uint8_t) | attr1_name | attr1_len(uint16_t) | attr1 |
void VlogSet::mark_del_value(const struct slot_content &sc) {
auto vinfo = get_vlog_info(sc.vlog_num);
auto vlog_name = get_vlog_name(sc.vlog_num);
auto handler = std::fstream(vlog_name, std::ios::in | std::ios::out);
handler.seekp(value_offset);
handler.seekp(sc.value_offset);
char value_buff[VALUE_BUFF_SIZE];
handler.read(value_buff, VALUE_BUFF_SIZE);
@ -337,22 +358,22 @@ void VlogSet::mark_del_value(uint32_t vlog_num, uint32_t value_offset) {
return ;
}
assert(!(value_size & VALUE_DELE_MASK));
uint16_t masked_value_size = value_size | VALUE_DELE_MASK;
uint16_t masked_value_size = value_size | (uint16_t)VALUE_DELE_MASK;
memcpy(value_buff, &masked_value_size, sizeof(uint16_t));
handler.write(value_buff, value_size);
handler.seekp(sc.value_offset);
handler.write(value_buff, sizeof(uint16_t));
handler.flush();
handler.close();
// handle gc, mtx is locked outside, vlog_info_latch and vlog hard lock is locked outside too
// auto vinfo = get_vlog_info(vlog_num);
vinfo->discard ++;
vinfo->value_nums --;
vinfo->curr_size -= value_size & VALUE_SIZE_MASK;
if (vlog_need_gc(vlog_num)) {
vinfo->curr_size -= value_size;
// FIXME: gc process
if (vlog_need_gc(sc.vlog_num) && !vinfo->processing_gc) {
// create new vlog
vinfo->vlog_valid_ = false;
vinfo->processing_gc = true;
vinfo->vlog_num_for_gc = register_new_vlog();
// vinfo->vlog_valid_ = false;
vlog_gc->do_gc(vlog_num, vinfo->vlog_num_for_gc);
vlog_gc->do_gc(sc.vlog_num, vinfo->vlog_num_for_gc);
}
}

+ 14
- 7
db/vlog_set.h View File

@ -9,6 +9,7 @@
#include <string>
#include <mutex>
#include <cassert>
#include <condition_variable>
#include "../include/leveldb/slice.h"
#include "../db/shared_lock.h"
#include "../db/vlog.h"
@ -18,6 +19,7 @@ class VlogGC;
class VlogSet {
friend class VlogGC;
friend class gc_executor;
#define CONFIG_FILE_DELE_MASK (0x1 << (sizeof(size_t)-1))
#define CONFIG_FILE_VLOG_NUM(v) ((v) & ~CONFIG_FILE_DELE_MASK)
@ -25,9 +27,9 @@ friend class VlogGC;
public:
VlogSet(std::string dbname, VlogGC *vlog_gc);
~VlogSet();
void get_value(uint32_t vlog_num, uint32_t value_offset, std::string *value);
void put_value(uint32_t *vlog_num, uint32_t *value_offset, const leveldb::Slice &value);
void del_value(uint32_t vlog_num, uint32_t value_offset);
void get_value(const struct slot_content &sc, std::string *value);
void put_value(struct slot_content &sc, size_t slot_num, const leveldb::Slice &value);
void del_value(const struct slot_content &sc);
void set_vlog_gc(VlogGC *vg) { this->vlog_gc = vg; }
@ -47,19 +49,24 @@ friend class VlogGC;
std::string get_vlog_name(size_t vlog_num);
struct vlog_info *get_vlog_info(size_t vlog_num);
struct vlog_handler *get_vlog_handler(size_t vlog_num);
void read_vlog_value(uint32_t vlog_num, uint32_t value_offset, std::string *value);
void write_vlog_value(uint32_t vlog_num, uint32_t value_offset, const leveldb::Slice &value);
void mark_del_value(uint32_t vlog_num, uint32_t value_offset);
void read_vlog_value(const struct slot_content &sc, std::string *value);
void write_vlog_value(const struct slot_content &sc, size_t slot_num, const leveldb::Slice &value);
void mark_del_value(const struct slot_content &sc);
private:
std::mutex mtx;
std::string dbname;
size_t vlog_nums_;
// size_t vlog_count_;
std::unordered_map<std::string, struct vlog_info *> vlog_info_map_;
std::unordered_map<std::string, struct vlog_handler *> vlog_handler_map_;
std::mutex config_file_latch_;
std::fstream *config_file_;
int counter = 0;
std::mutex counter_latch_;
std::mutex finished_latch_;
bool finished = false;
VlogGC *vlog_gc; // vlog_set.cpp
};

+ 1
- 1
include/leveldb/db.h View File

@ -156,7 +156,7 @@ class LEVELDB_EXPORT DB {
//
// Todo(begin)
virtual Status Put_Fields(const leveldb::WriteOptions& opt, const leveldb::Slice& key, const FieldArray& fields) = 0;
virtual Status Get_Fields(const leveldb::ReadOptions& options, const leveldb::Slice& key, FieldArray* fields) = 0;
virtual Status Get_Fields(const leveldb::ReadOptions& options, const leveldb::Slice& key, FieldArray& fields) = 0;
// // Todo(end)
};

+ 45
- 3
test/db_test1.cc View File

@ -4,11 +4,11 @@
using namespace std;
using namespace leveldb;
int main() {
int test1() {
DB* db = nullptr;
Options op;
op.create_if_missing = true;
Status status = DB::Open(op, "testdb", &db);
Status status = DB::Open(op, "testdb1", &db);
assert(status.ok());
db->Put(WriteOptions(), "001", "leveldb");
string s;
@ -18,11 +18,53 @@ int main() {
db->Put(WriteOptions(), "002", "world");
string s1;
db->Delete(WriteOptions(), "002");
db->Delete(WriteOptions(), "002"); // sc: {0, 33}
db->Get(ReadOptions(), "002", &s1);
cout << s1.size() << endl;
cout<<s1<<endl;
delete db;
return 0;
}
int test2() {
DB* db = nullptr;
Options op;
op.create_if_missing = true;
Status status = DB::Open(op, "testdb2", &db);
assert(status.ok());
const std::string value_prefix = "value_";
const size_t loop_times = 1000;
for (auto i = 0; i < loop_times; i++) {
auto key = std::to_string(i);
auto value = value_prefix + key;
db->Put(WriteOptions(), key, value);
string s;
db->Get(ReadOptions(), key, &s);
cout << s << " | " << s.size() << endl;
}
cout << endl << "all put are done" << endl << endl;
for (auto i = 0; i < loop_times; i++) {
auto key = std::to_string(i);
auto value = value_prefix + key;
// db->Put(WriteOptions(), key, value+"_");
string s1;
db->Delete(WriteOptions(), key); // sc: {0, 33}
db->Get(ReadOptions(), key, &s1);
cout << s1 << " | " << s1.size() << " | " << i << endl;
}
delete db;
return 0;
}
int main() {
test2();
}

Loading…
Cancel
Save