diff --git a/CMakeLists.txt b/CMakeLists.txt index 373e4f7..6f18549 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -120,12 +120,13 @@ include(GNUInstallDirs) add_library(leveldb "" db/slotpage.h db/vlog.h - db/threadpool.h db/shared_lock.h db/vlog_set.cpp db/vlog_set.h db/vlog_gc.cpp - db/vlog_gc.h) + db/vlog_gc.h + db/gc_executor.cpp + db/gc_executor.h) target_sources(leveldb PRIVATE "${PROJECT_BINARY_DIR}/${LEVELDB_PORT_CONFIG_DIR}/port_config.h" diff --git a/db/db_impl.cc b/db/db_impl.cc index 4c0f2e5..7db4001 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -10,6 +10,7 @@ #include #include #include +#include #include #include #include @@ -1188,7 +1189,7 @@ Status DBImpl::get_slot_num(const ReadOptions& options, const Slice& key, } Status DBImpl::Get_Fields(const ReadOptions& options, const Slice& key, - FieldArray* fields) { + FieldArray& fields) { // Todo(begin) size_t slot_num; // 从value中提取slot_num @@ -1200,10 +1201,10 @@ Status DBImpl::Get_Fields(const ReadOptions& options, const Slice& key, struct slot_content sc; std::string vlog_value; slot_page_->get_slot(slot_num, &sc); - vlog_set_->get_value(sc.vlog_num, sc.value_offset, &vlog_value); + vlog_set_->get_value(sc, &vlog_value); std::cout << "value from value_log: " << key.ToString() << vlog_value << std::endl; - *fields = DeserializeValue(vlog_value); + DeserializeValue(fields, vlog_value); return Status::OK(); // Todo(end) } @@ -1212,12 +1213,15 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key, std::string* value) { size_t slot_num; auto s = get_slot_num(options, key, &slot_num); + if (!s.ok()) { + return s; + } // TODO: search the slotpage and get value from vlog struct slot_content sc; std::string vlog_value; slot_page_->get_slot(slot_num, &sc); - vlog_set_->get_value(sc.vlog_num, sc.value_offset, &vlog_value); + vlog_set_->get_value(sc, &vlog_value); *value = vlog_value; return s; @@ -1255,13 +1259,13 @@ void DBImpl::ReleaseSnapshot(const Snapshot* snapshot) { // Convenience methods Status DBImpl::Put_Fields(const WriteOptions& opt, const Slice& key, const FieldArray& fields) { - // TODO(begin): allocate slot_num in slotpage and put value in vlog // 将字段数组序列化 - std::string serialized_value = SerializeValue(fields); + std::string serialized_value; std::cout << "Put_Fields: " << key.ToString() << " " << serialized_value << std::endl; size_t slot_num = slot_page_->alloc_slot(); + SerializeValue(fields, serialized_value, slot_num); struct slot_content sc; - vlog_set_->put_value(&sc.vlog_num, &sc.value_offset, serialized_value); + vlog_set_->put_value(sc, slot_num, serialized_value); slot_page_->set_slot(slot_num, &sc); char data[sizeof(size_t)]; @@ -1269,16 +1273,12 @@ Status DBImpl::Put_Fields(const WriteOptions& opt, const Slice& key, Slice slot_val(data, sizeof(data)); return DB::Put(opt, key, slot_val); - // TODO(end) } Status DBImpl::Put(const WriteOptions& opt, const Slice& key, const Slice& val) { - // TODO: allocate slot_num in slotpage and put value in vlog - size_t slot_num = slot_page_->alloc_slot(); -// std::string slot_num_str((char *)&slot_num, sizeof(size_t)); struct slot_content sc; - vlog_set_->put_value(&sc.vlog_num, &sc.value_offset, val); + vlog_set_->put_value(sc, slot_num, val); slot_page_->set_slot(slot_num, &sc); char data[sizeof(size_t)]; @@ -1291,10 +1291,14 @@ Status DBImpl::Put(const WriteOptions& opt, const Slice& key, const Slice& val) Status DBImpl::Delete(const WriteOptions& options, const Slice& key) { size_t slot_num; auto s = get_slot_num(ReadOptions(), key, &slot_num); + if (!s.ok()) { + return s; + } struct slot_content sc; slot_page_->get_slot(slot_num, &sc); - vlog_set_->del_value(sc.vlog_num, sc.value_offset); + vlog_set_->del_value(sc); + slot_page_->dealloc_slot(slot_num); return DB::Delete(options, key); } @@ -1579,88 +1583,72 @@ void DBImpl::GetApproximateSizes(const Range* range, int n, uint64_t* sizes) { v->Unref(); } -// Todo(begin) -// 反序列化函数,将字符串解码为字段数组 -FieldArray DBImpl::DeserializeValue(const std::string& value_str) { - // 存放解析后的字段数组 - FieldArray fields; - // 将输入字符串转换为输入流 iss, 方便读取 - std::istringstream iss(value_str); - std::string content; - // 临时存放读取的数据 - char buffer[100]; - // 读取长度(定长,16比特) - iss.read(buffer, 16); - buffer[16] = '\0'; - size_t total_length = std::stoi(buffer); - // std::cout << "读取到的总长度为: " << total_length << std::endl; - std::string value_content(value_str.begin() + 16, value_str.begin() + 16 + total_length); - // std::cout << value_content << std::endl; - std::istringstream iss_content(value_content); - iss_content.read(buffer, sizeof(size_t)); - buffer[sizeof(size_t)] = '\0'; - std::string slot_num = buffer; - // 读取属性个数 - iss_content.read(buffer, 16); - // 在第17个比特位处添加终结符,确保字符串以终结符结尾 - buffer[16] = '\0'; - // 将 buffer 中的内容转化为整数并赋值给 field_count - int field_count = std::stoi(buffer); - // std::cout << "读取到的字段个数为: " << field_count << std::endl; - - for (int i = 0; i < field_count; ++i) { - Field field; - // 读取属性名长度(定长,16比特) - iss_content.read(buffer, 16); - buffer[16] = '\0'; - int name_length = std::stoi(buffer); - // std::cout << "读取到的属性名长度为: " << name_length << std::endl; - // 读取属性名(变长) - field.name.resize(name_length); - iss_content.read(&field.name[0], name_length); - // std::cout << "读取到的属性名为: " << field.name << std::endl; - // 读取属性值长度(定长,16比特) - iss_content.read(buffer, 16); - buffer[16] = '\0'; - int value_length = std::stoi(buffer); - // std::cout << "读取到的属性值长度为: " << value_length << std::endl; - // 读取属性值(变长) - field.value.resize(value_length); - iss_content.read(&field.value[0], value_length); - // std::cout << "读取到的属性值为: " << field.value << std::endl; - fields.push_back(field); - } - return fields; + +//using FieldArray = std::vector; +//struct Field { std::string name; std::string value; }; +void DBImpl::DeserializeValue(FieldArray& fields, const std::string& value_str) { + // single value: || value_size(uint16_t) | slot_num(size_t) || {field_nums(uint16_t), attr1, attr2, ... } | + // single attr: | attr1_name_len(uint8_t) | attr1_name | attr1_len(uint16_t) | attr1 | + const char *value_data = value_str.c_str(); + const size_t value_len = value_str.size(); + size_t attr_off = sizeof(uint16_t); + + while (attr_off < value_len) { + uint8_t attr_name_len = *(uint8_t *)(value_data+attr_off); + attr_off += sizeof(uint8_t); + auto attr_name = std::string(value_data+attr_off, attr_name_len); + attr_off += attr_name_len; + + uint16_t attr_len = *(uint16_t *)(value_data+attr_off); + attr_off += sizeof(uint16_t); + auto attr_value = std::string(value_data+attr_off, attr_len); + attr_off += attr_len; + + fields.push_back({attr_name, attr_value}); + } + + assert(attr_off == value_len); } -// Todo(end) -// Todo(begin) -// 序列化函数,将字段数组序列化为字符串 -std::string DBImpl::SerializeValue(const FieldArray& fields) { - // 创建并初始化一个字符串流 oss,用于逐步构建最终的序列化字符串 - std::ostringstream oss_temp; - std::string slot_num = "slot_num"; - oss_temp << std::setw(sizeof(size_t)) << std::setfill('0') << slot_num; - // 写入属性个数(定长,16比特),使用std::setw(16)设置宽度,使用std::setfull(0)设置填充字符,将字段数组的大小写入oss中 - oss_temp << std::setw(16) << std::setfill('0') << fields.size(); + +// 序列化函数,将字段数组序列化为字符串+ +//using FieldArray = std::vector; +//struct Field { std::string name; std::string value; }; +void DBImpl::SerializeValue(const FieldArray& fields, std::string &value, size_t slot_num) { + // single value: || value_size(uint16_t) | slot_num(size_t) || {field_nums(uint16_t), attr1, attr2, ... } | + // single attr: | attr1_name_len(uint8_t) | attr1_name | attr1_len(uint16_t) | attr1 | + std::string tmp_value; + uint16_t value_size = sizeof(uint16_t); + uint16_t field_nums = 0; for (const auto& field : fields) { - // 写入属性名长度(定长,16比特) - oss_temp << std::setw(16) << std::setfill('0') << field.name.size(); - // 写入属性名(变长) - oss_temp << field.name; - // 写入属性值长度(定长,16比特) - oss_temp << std::setw(16) << std::setfill('0') << field.value.size(); - // 写入属性值(变长) - oss_temp << field.value; - } - std::string temp_str = oss_temp.str(); - size_t value_length = temp_str.size(); - - std::ostringstream oss; - oss << std::setw(16) << std::setfill('0') << value_length; - oss << temp_str; - return oss.str(); + const uint8_t attr_name_len = field.name.size(); + const uint16_t attr_value_len = field.value.size(); + const size_t attr_size = attr_name_len + attr_value_len + sizeof(uint8_t) + sizeof(uint16_t); + char attr_data[attr_size]; + + size_t off = 0; + memcpy(attr_data+off, &attr_name_len, sizeof(uint8_t)); + off += sizeof(uint8_t); + memcpy(attr_data+off, field.name.c_str(), attr_name_len); + off += attr_name_len; + memcpy(attr_data+off, &attr_value_len, sizeof(uint16_t)); + off += sizeof(uint16_t); + memcpy(attr_data+off, field.value.c_str(), attr_value_len); + off += attr_value_len; + + assert(off == attr_size); + tmp_value += std::string(attr_data, attr_size); + value_size += attr_size; + field_nums ++; + } + + char value_data[value_size]; + memcpy(value_data, &value_size, sizeof(uint16_t)); + memcpy(value_data+sizeof(uint16_t), tmp_value.c_str(), tmp_value.size()); + + assert(sizeof(uint16_t) + tmp_value.size() == value_size); + value = std::string(value_data, value_size); } -// Todo(end) + // Default implementations of convenience methods that subclasses of DB // can call if they wish Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) { diff --git a/db/db_impl.h b/db/db_impl.h index 1f77d07..68c8320 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -53,7 +53,7 @@ class DBImpl : public DB { std::string* value) override; // Todo(begin) Status Get_Fields(const ReadOptions& options, const Slice& key, - FieldArray* fields) override; + FieldArray& fields) override; // Todo(end) Iterator* NewIterator(const ReadOptions&) override; const Snapshot* GetSnapshot() override; @@ -91,8 +91,8 @@ class DBImpl : public DB { // TODO(begin) SlotPage *slot_page_; VlogSet *vlog_set_; - static std::string SerializeValue(const FieldArray& fields); - static FieldArray DeserializeValue(const std::string& value_str); + static void SerializeValue(const FieldArray& fields, std::string &value, size_t slot_num); + static void DeserializeValue(FieldArray& fields, const std::string& value_str); // TODO(end) // Information for a manual compaction struct ManualCompaction { diff --git a/db/gc_executor.cpp b/db/gc_executor.cpp new file mode 100644 index 0000000..2091e9b --- /dev/null +++ b/db/gc_executor.cpp @@ -0,0 +1,74 @@ +// +// Created by 马也驰 on 2025/1/3. +// + +#include "gc_executor.h" +#include "vlog_set.h" +#include "vlog_gc.h" + + + void gc_executor::exec_gc(VlogGC *vlog_gc_, size_t old_vlog_num, size_t new_vlog_num) { + auto vlog_set = vlog_gc_->vlog_set; + auto slot_page_ = vlog_gc_->slot_page_; + + vlog_set->mtx.lock(); + auto old_vlog_name = vlog_set->get_vlog_name(old_vlog_num); + auto new_vlog_name = vlog_set->get_vlog_name(new_vlog_num); + auto old_vlog_info = vlog_set->get_vlog_info(old_vlog_num); + auto new_vlog_info = vlog_set->get_vlog_info(new_vlog_num); + auto old_vlog_handler = vlog_set->get_vlog_handler(old_vlog_num); + auto new_vlog_handler = vlog_set->get_vlog_handler(new_vlog_num); + + old_vlog_info->vlog_info_latch_.lock(); + old_vlog_handler->vlog_latch_.soft_lock(); + new_vlog_info->vlog_info_latch_.lock(); + new_vlog_handler->vlog_latch_.hard_lock(); + vlog_set->mtx.unlock(); + + auto old_vlog = std::fstream(old_vlog_name, std::ios::in | std::ios::out); + auto new_vlog = std::fstream(new_vlog_name, std::ios::in | std::ios::out); + + char old_vlog_buff[VLOG_SIZE]; + char new_vlog_buff[VLOG_SIZE]; + old_vlog.seekp(0); + old_vlog.read(old_vlog_buff, VLOG_SIZE); + + size_t value_nums = old_vlog_info->value_nums; + size_t ovb_off = 2 * sizeof(size_t); + size_t nvb_off = 2 * sizeof(size_t); + size_t new_vlog_value_nums = 0; + for (auto i = 0; i < value_nums; i++) { + char *value = &old_vlog_buff[ovb_off]; + uint16_t value_len = get_value_len(value); + size_t slot_num = get_value_slotnum(value); + if (!value_deleted(value_len)) { + memcpy(&new_vlog_buff[nvb_off], &old_vlog_buff[ovb_off], value_len); + memcpy(&new_vlog_buff[nvb_off+sizeof(uint16_t)], &(new_vlog_info->vlog_num), sizeof(size_t)); + struct slot_content scn(new_vlog_info->vlog_num, nvb_off); + slot_page_->set_slot(slot_num, &scn); + nvb_off += value_len; + new_vlog_value_nums ++; + } + ovb_off += value_len; + } + new_vlog_info->value_nums = new_vlog_value_nums; + new_vlog_info->curr_size = nvb_off; + memcpy(new_vlog_buff, &nvb_off, sizeof(size_t)); + memcpy(&new_vlog_buff[sizeof(size_t)], &new_vlog_value_nums, sizeof(size_t)); + new_vlog.seekp(0); + new_vlog.write(new_vlog_buff, VLOG_SIZE); + new_vlog.flush(); + + old_vlog.close(); + new_vlog.close(); + + old_vlog_info->vlog_valid_ = false; + vlog_set->remove_old_vlog(old_vlog_num); + + old_vlog_info->vlog_info_latch_.unlock(); + old_vlog_handler->vlog_latch_.soft_unlock(); + new_vlog_info->vlog_info_latch_.unlock(); + new_vlog_handler->vlog_latch_.hard_unlock(); + +// vlog_gc_->gc_counter_decrement(); +} diff --git a/db/gc_executor.h b/db/gc_executor.h new file mode 100644 index 0000000..3c5af50 --- /dev/null +++ b/db/gc_executor.h @@ -0,0 +1,40 @@ +// +// Created by 马也驰 on 2025/1/3. +// + +#ifndef LEVELDB_GC_EXECUTOR_H +#define LEVELDB_GC_EXECUTOR_H + +#include +#include + + +class VlogGC; + +class gc_executor { + public: + explicit gc_executor() {} + + ~gc_executor() = default; + + public: + static void exec_gc(VlogGC *vlog_gc_, size_t old_vlog_num, size_t new_vlog_num); + + private: + static inline bool value_deleted(uint16_t value_len) { + return !(value_len >> 15); + } + static inline uint16_t get_value_len(char *value) { + uint16_t value_len; + memcpy(&value_len, value, sizeof(uint16_t)); + return value_len; + } + static inline size_t get_value_slotnum(char *value) { + size_t slot_num; + memcpy(&slot_num, &value[sizeof(uint16_t)], sizeof(size_t)); + return slot_num; + } + +}; + +#endif // LEVELDB_GC_EXECUTOR_H diff --git a/db/shared_lock.h b/db/shared_lock.h index 0d90490..083a1a7 100644 --- a/db/shared_lock.h +++ b/db/shared_lock.h @@ -45,8 +45,7 @@ class SharedLock { std::mutex read_latch_; // indicate if any read is proceeding std::mutex read_count_latch_; volatile size_t read_count_; - std::mutex write_latch_; - // volatile size_t write_count_; + std::mutex write_latch_; // indicate if any write is proceeding }; #endif // LEVELDB_SHARED_LOCK_H diff --git a/db/slotpage.h b/db/slotpage.h index 023ce60..b32fcd4 100644 --- a/db/slotpage.h +++ b/db/slotpage.h @@ -15,6 +15,8 @@ #include #include +// bitmap: || bitmap_size(size_t) | first_empty_slot(size_t) || bits... | + #define BITMAP_SIZE 8192 #define BITS_PER_BYTE 8 @@ -183,6 +185,69 @@ public: first_empty_slot = 0; } + BitMap(const std::string &dbname) { + name = dbname + "_bitmap"; + auto bitmap_handler = std::fstream(name, std::ios::in | std::ios::out); + if (!bitmap_handler.is_open()) { + // 文件不存在,尝试创建 + bitmap_handler = std::fstream(name, std::ios::out); + bitmap_handler.close(); + // 重新以读写模式打开 + bitmap_handler = std::fstream(name, std::ios::in | std::ios::out); + bitmap_handler.seekp(0); + size_t tmp[2] = {2*sizeof(size_t), 0}; + bitmap_handler.write(reinterpret_cast(tmp), sizeof(tmp)); + bitmap_handler.close(); + + // init bitmap + char *bitmap = static_cast(malloc(BITMAP_SIZE*sizeof(char))); + memset(bitmap, 0, BITMAP_SIZE * sizeof(char)); + bitmaps_.push_back(bitmap); + size = BITMAP_SIZE; + first_empty_slot = 0; + } else { + // read in bitmap content + size_t bitmap_header[2]; + bitmap_handler.seekp(0); + bitmap_handler.read(reinterpret_cast(bitmap_header), 2*sizeof(size_t)); + this->first_empty_slot = bitmap_header[1]; + this->size = bitmap_header[0]; + + const size_t page_num = (size - 2*sizeof(size_t)) / BITMAP_SIZE; + for (auto i = 0; i < page_num; i++) { + char *bitmap = static_cast(malloc(BITMAP_SIZE*sizeof(char))); + bitmap_handler.seekp(2*sizeof(size_t) + i*BITMAP_SIZE); + bitmap_handler.read(bitmap, BITMAP_SIZE); + bitmaps_.push_back(bitmap); + } + + // init bitmap + if (page_num == 0) { + char *bitmap = static_cast(malloc(BITMAP_SIZE*sizeof(char))); + memset(bitmap, 0, BITMAP_SIZE * sizeof(char)); + bitmaps_.push_back(bitmap); + size = BITMAP_SIZE; + first_empty_slot = 0; + } + } + } + + ~BitMap() { + auto bitmap_handler = std::fstream(name, std::ios::in | std::ios::out); + assert(bitmap_handler.is_open()); + size_t tmp[2] = {size, first_empty_slot}; + bitmap_handler.seekp(0); + bitmap_handler.write(reinterpret_cast(tmp), sizeof(tmp)); + size_t off = 2 * sizeof(size_t); + for (auto bitmap : bitmaps_) { + bitmap_handler.seekp(off); + bitmap_handler.write(bitmap, BITMAP_SIZE); + off += BITMAP_SIZE; + } + assert(off == size); + bitmap_handler.flush(); + } + /** methods for test **/ void show_allocated_slot() { for (int i = 0; i < this->size; i++) { @@ -284,6 +349,7 @@ private: } private: + std::string name; std::vector bitmaps_; size_t size; size_t first_empty_slot; @@ -299,7 +365,7 @@ class SlotPage { public: SlotPage(const std::string &dbname) { slotpage_fname = slotpage_handler_name(dbname); - bitmap = new BitMap(); + bitmap = new BitMap(dbname); slotcache = new SlotCache(slotpage_fname); assert(slotcache); } diff --git a/db/threadpool.h b/db/threadpool.h deleted file mode 100644 index 00d861d..0000000 --- a/db/threadpool.h +++ /dev/null @@ -1,67 +0,0 @@ -// -// Created by 马也驰 on 2024/12/28. -// - -#ifndef LEVELDB_THREADPOOL_H -#define LEVELDB_THREADPOOL_H - - -#include -#include -#include -#include -#include -#include -#include - -class ThreadPool { - public: - explicit ThreadPool(size_t numThreads) : stop(false) { - for (size_t i = 0; i < numThreads; ++i) { - workers.emplace_back([this]() { - while (true) { - std::function task; - { - std::unique_lock lock(queueMutex); - condition.wait(lock, [this]() { return stop || !tasks.empty(); }); - - if (stop && tasks.empty()) return; - - task = std::move(tasks.front()); - tasks.pop(); - } - task(); - } - }); - } - } - - ~ThreadPool() { - { - std::unique_lock lock(queueMutex); - stop = true; - } - condition.notify_all(); - for (std::thread &worker : workers) { - worker.join(); - } - } - - void enqueue(std::function task) { - { - std::unique_lock lock(queueMutex); - tasks.push(std::move(task)); - } - condition.notify_one(); - } - - private: - std::vector workers; - std::queue> tasks; - std::mutex queueMutex; - std::condition_variable condition; - std::atomic stop; -}; - - -#endif // LEVELDB_THREADPOOL_H diff --git a/db/vlog.h b/db/vlog.h index b48d72b..9c504df 100644 --- a/db/vlog.h +++ b/db/vlog.h @@ -12,20 +12,23 @@ #include -#include "../db/threadpool.h" #include "../include/leveldb/slice.h" #include "../db/slotpage.h" #include "../db/shared_lock.h" -// config file: | vlog_nums_(size_t) | vlog_1_name | vlog_2_name | ... | -// vlog: | curr_size(size_t) | value_nums(size_t) | value1 | value2 | ... | -// value: | value_size(uint16_t) | slot_num(size_t) | field_nums(uint16_t) | value | +// config file: || vlog_nums_(size_t) || vlog_1_num(size_t) | vlog_2_num(size_t) | ... | +// vlog: || curr_size(size_t) | value_nums(size_t) || value1 | value2 | ... | +// single value: || value_size(uint16_t) | slot_num(size_t) || {field_nums(uint16_t), attr1, attr2, ... } | +// single attr: | attr1_name_len(uint8_t) | attr1_name | attr1_len(uint16_t) | attr1 | + +// {field_nums(uint16_t), attr1, attr2, ... } is wrapped/unwrapped in dm_impl +// || value_size(uint16_t) | slot_num(size_t) || VALUE: {field_nums(uint16_t), attr1, attr2, ... } | is wrapped/unwrapped in vlog_set + #define vlog_num_size sizeof(size_t) -#define vlog_name_size 256 #define KiB 1024 #define MiB 1024*KiB -#define VLOG_SIZE 32*MiB +#define VLOG_SIZE 1*MiB // origin: 32*MiB #define GC_THREDHOLD 0.5 #define VALUE_BUFF_SIZE 0x7fff // value size cannot exceed this number @@ -46,18 +49,22 @@ // } //}; + struct vlog_info { std::mutex vlog_info_latch_; // 保护对vlog_info本身的并发修改 size_t vlog_num; size_t vlog_num_for_gc; // set when start gc - bool vlog_valid_; + bool processing_gc;; // init to be false, set as true when processing gc + bool vlog_valid_; // init to be true, set as false after deleted size_t discard; size_t value_nums; size_t curr_size; - vlog_info(size_t vlog_num) : vlog_valid_(true), discard(0), value_nums(0), - vlog_num(vlog_num), curr_size(2*sizeof(size_t)) {} - vlog_info(size_t vlog_num, size_t value_nums, size_t curr_size) : vlog_valid_(true), discard(0), value_nums(value_nums), - vlog_num(vlog_num), curr_size(curr_size) {} + vlog_info(size_t vlog_num) : processing_gc(false), discard(0), value_nums(0), + vlog_num(vlog_num), curr_size(2*sizeof(size_t)), vlog_valid_(true) {} + vlog_info(size_t vlog_num, size_t value_nums, size_t curr_size) : processing_gc(false), + discard(0), value_nums(value_nums), + vlog_num(vlog_num), curr_size(curr_size), + vlog_valid_(true) {} }; struct vlog_handler { diff --git a/db/vlog_gc.cpp b/db/vlog_gc.cpp index 8555b63..d7bdca0 100644 --- a/db/vlog_gc.cpp +++ b/db/vlog_gc.cpp @@ -4,65 +4,196 @@ #include "vlog_gc.h" #include "../db/vlog_set.h" +#include +#include + + +struct executor_param { + VlogGC *vg; + size_t old_vlog_num; + size_t new_vlog_num; +}; + +std::mutex map_latch_; +std::unordered_map executor_params_map_; +std::unordered_map vlog_gc_map_; + + +void test_func() { + int nums[4] = {1, 2, 3, 4}; + for (int i = 0; i < 4; i++) { + nums[i] ++; + } +} + +void add_executor_params(size_t gc_num, struct executor_param &ep) { + map_latch_.lock(); + executor_params_map_[gc_num] = ep; + map_latch_.unlock(); +} + +struct executor_param get_executor_params(size_t gc_num) { + map_latch_.lock(); + auto ep = executor_params_map_[gc_num]; + map_latch_.unlock(); + return ep; +} + +void del_executor_params(size_t gc_num) { + map_latch_.lock(); + executor_params_map_.erase(gc_num); + map_latch_.unlock(); +} + +void add_vlog_gc(size_t gc_num, VlogGC *vg) { + map_latch_.lock(); + vlog_gc_map_[gc_num] = vg; + map_latch_.unlock(); +} + +VlogGC * get_vlog_gc(size_t gc_num) { + map_latch_.lock(); + auto vg = vlog_gc_map_[gc_num]; + map_latch_.unlock(); + return vg; +} + +void del_vlog_gc(size_t gc_num) { + map_latch_.lock(); + vlog_gc_map_.erase(gc_num); + map_latch_.unlock(); +} + +// 函数:增加 counter +void VlogGC::gc_counter_increment() { + vlog_set->counter_latch_.lock(); + ++vlog_set->counter; // 增加 counter + std::cout << "Increment: Counter incremented to " << vlog_set->counter << "\n"; + vlog_set->counter_latch_.unlock(); +} + +// 函数:减少 counter +void VlogGC::gc_counter_decrement() { + vlog_set->counter_latch_.lock(); + --vlog_set->counter; // 减少 counter + std::cout << "Decrement: Counter decremented to " << vlog_set->counter << "\n"; + + if (vlog_set->counter == 0) { + // 如果 counter 为 0 + vlog_set->finished_latch_.lock(); + vlog_set->finished = true; // 设置完成标志 + vlog_set->finished_latch_.unlock(); + } + vlog_set->counter_latch_.unlock(); +} void VlogGC::do_gc(size_t old_vlog_num, size_t new_vlog_num) { - thread_pool_->enqueue([this, old_vlog_num, new_vlog_num]() - { this->exec_gc(old_vlog_num, new_vlog_num); }); + gc_counter_increment(); + size_t _gc_num_ = get_gc_num(); + + struct executor_param ep = {this, old_vlog_num, new_vlog_num}; + add_executor_params(_gc_num_, ep); + add_vlog_gc(_gc_num_, this); + + std::thread gc_thread([_gc_num_]() mutable { + auto _vlog_gc_ = get_vlog_gc(_gc_num_); + assert(_vlog_gc_ != nullptr); + _vlog_gc_->exec_gc(_gc_num_); + }); + } -void VlogGC::exec_gc(size_t old_vlog_num, size_t new_vlog_num) { - auto old_vlog_name = vlog_set->get_vlog_name(old_vlog_num); - auto new_vlog_name = vlog_set->get_vlog_name(new_vlog_num); - auto old_vlog_info = vlog_set->get_vlog_info(old_vlog_num); - auto new_vlog_info = vlog_set->get_vlog_info(new_vlog_num); - auto old_vlog_handler = vlog_set->get_vlog_handler(old_vlog_num); - auto new_vlog_handler = vlog_set->get_vlog_handler(new_vlog_num); - - old_vlog_handler->vlog_latch_.soft_lock(); - new_vlog_info->vlog_info_latch_.lock(); - new_vlog_handler->vlog_latch_.hard_lock(); - - auto old_vlog = std::fstream(old_vlog_name, std::ios::in | std::ios::out); - auto new_vlog = std::fstream(new_vlog_name, std::ios::in | std::ios::out); - - char old_vlog_buff[VLOG_SIZE]; - char new_vlog_buff[VLOG_SIZE]; - old_vlog.seekp(0); - old_vlog.read(old_vlog_buff, VLOG_SIZE); -// memcpy(new_vlog_buff, &value_nums, sizeof(size_t)); - - size_t value_nums = old_vlog_info->value_nums; - size_t ovb_off = 2 * sizeof(size_t); - size_t nvb_off = 2 * sizeof(size_t); - size_t new_vlog_value_nums = 0; - for (auto i = 0; i < value_nums; i++) { - char *value = &old_vlog_buff[ovb_off]; - uint16_t value_len = get_value_len(value); - size_t slot_num = get_value_slotnum(value); - if (!value_deleted(value_len)) { - memcpy(&new_vlog_buff[nvb_off], &old_vlog_buff[ovb_off], value_len); - struct slot_content scn(new_vlog_info->vlog_num, nvb_off); - slot_page_->set_slot(slot_num, &scn); - nvb_off += value_len; - new_vlog_value_nums ++; - } - ovb_off += value_len; + +void VlogGC::exec_gc(size_t gc_num_) { + // FIXME: might break, due to unknown concurrency problem + curr_thread_nums_latch_.lock(); + curr_thread_nums_ ++; + if (curr_thread_nums_ >= max_thread_nums_) { + full_latch_.lock(); } - new_vlog_info->value_nums = value_nums; - new_vlog_info->curr_size = nvb_off; - memcpy(new_vlog_buff, &nvb_off, sizeof(size_t)); - memcpy(&new_vlog_buff[sizeof(size_t)], &new_vlog_value_nums, sizeof(size_t)); - new_vlog.write(new_vlog_buff, VLOG_SIZE); - new_vlog.flush(); + curr_thread_nums_latch_.unlock(); + - old_vlog.close(); - new_vlog.close(); + // start gc process + auto ep = get_executor_params(gc_num_); + gc_executor::exec_gc(ep.vg, ep.old_vlog_num, ep.new_vlog_num); +// test_func(); - old_vlog_handler->vlog_latch_.soft_unlock(); - new_vlog_info->vlog_info_latch_.unlock(); - new_vlog_handler->vlog_latch_.hard_unlock(); + curr_thread_nums_latch_.lock(); + if (curr_thread_nums_ >= max_thread_nums_) { + full_latch_.unlock(); + } + curr_thread_nums_ --; + curr_thread_nums_latch_.unlock(); - vlog_set->remove_old_vlog(old_vlog_num); + gc_counter_decrement(); + del_executor_params(gc_num_); + del_vlog_gc(gc_num_); } +// vlog: || curr_size(size_t) | value_nums(size_t) || value1 | value2 | ... | +// single value: || value_size(uint16_t) | slot_num(size_t) || {field_nums(uint16_t), attr1, attr2, ... } | +// single attr: | attr1_name_len(uint8_t) | attr1_name | attr1_len(uint16_t) | attr1 | +//void VlogGC::exec_gc(size_t old_vlog_num, size_t new_vlog_num) { +// vlog_set->mtx.lock(); +// auto old_vlog_name = vlog_set->get_vlog_name(old_vlog_num); +// auto new_vlog_name = vlog_set->get_vlog_name(new_vlog_num); +// auto old_vlog_info = vlog_set->get_vlog_info(old_vlog_num); +// auto new_vlog_info = vlog_set->get_vlog_info(new_vlog_num); +// auto old_vlog_handler = vlog_set->get_vlog_handler(old_vlog_num); +// auto new_vlog_handler = vlog_set->get_vlog_handler(new_vlog_num); +// +// old_vlog_info->vlog_info_latch_.lock(); +// old_vlog_handler->vlog_latch_.soft_lock(); +// new_vlog_info->vlog_info_latch_.lock(); +// new_vlog_handler->vlog_latch_.hard_lock(); +// vlog_set->mtx.unlock(); +// +// auto old_vlog = std::fstream(old_vlog_name, std::ios::in | std::ios::out); +// auto new_vlog = std::fstream(new_vlog_name, std::ios::in | std::ios::out); +// +// char old_vlog_buff[VLOG_SIZE]; +// char new_vlog_buff[VLOG_SIZE]; +// old_vlog.seekp(0); +// old_vlog.read(old_vlog_buff, VLOG_SIZE); +// +// size_t value_nums = old_vlog_info->value_nums; +// size_t ovb_off = 2 * sizeof(size_t); +// size_t nvb_off = 2 * sizeof(size_t); +// size_t new_vlog_value_nums = 0; +// for (auto i = 0; i < value_nums; i++) { +// char *value = &old_vlog_buff[ovb_off]; +// uint16_t value_len = get_value_len(value); +// size_t slot_num = get_value_slotnum(value); +// if (!value_deleted(value_len)) { +// memcpy(&new_vlog_buff[nvb_off], &old_vlog_buff[ovb_off], value_len); +// memcpy(&new_vlog_buff[nvb_off+sizeof(uint16_t)], &(new_vlog_info->vlog_num), sizeof(size_t)); +// struct slot_content scn(new_vlog_info->vlog_num, nvb_off); +// slot_page_->set_slot(slot_num, &scn); +// nvb_off += value_len; +// new_vlog_value_nums ++; +// } +// ovb_off += value_len; +// } +// new_vlog_info->value_nums = new_vlog_value_nums; +// new_vlog_info->curr_size = nvb_off; +// memcpy(new_vlog_buff, &nvb_off, sizeof(size_t)); +// memcpy(&new_vlog_buff[sizeof(size_t)], &new_vlog_value_nums, sizeof(size_t)); +// new_vlog.seekp(0); +// new_vlog.write(new_vlog_buff, VLOG_SIZE); +// new_vlog.flush(); +// +// old_vlog.close(); +// new_vlog.close(); +// +// old_vlog_info->vlog_valid_ = false; +// vlog_set->remove_old_vlog(old_vlog_num); +// +// old_vlog_info->vlog_info_latch_.unlock(); +// old_vlog_handler->vlog_latch_.soft_unlock(); +// new_vlog_info->vlog_info_latch_.unlock(); +// new_vlog_handler->vlog_latch_.hard_unlock(); +// +//} +// diff --git a/db/vlog_gc.h b/db/vlog_gc.h index cbd1308..e49c197 100644 --- a/db/vlog_gc.h +++ b/db/vlog_gc.h @@ -9,19 +9,27 @@ #include #include #include "../db/slotpage.h" -#include "../db/threadpool.h" #include "../db/vlog.h" +#include "../db/gc_executor.h" // 前向声明 VlogSet class VlogSet; class VlogGC { +#define THREAD_NUM 8 +friend class gc_executor; + public: - VlogGC(SlotPage *s, VlogSet *vs) : slot_page_(s), vlog_set(vs) {} + VlogGC(SlotPage *s, VlogSet *vs) : slot_page_(s), vlog_set(vs),max_thread_nums_(THREAD_NUM), + curr_thread_nums_(0), gc_num(0) {} + ~VlogGC() {} + void do_gc(size_t old_vlog_num, size_t new_vlog_num); private: - void exec_gc(size_t old_vlog_num, size_t new_vlog_num); + void exec_gc(size_t gc_num_); + void gc_counter_increment(); + void gc_counter_decrement(); inline bool value_deleted(uint16_t value_len) { return !(value_len >> 15); @@ -36,10 +44,24 @@ class VlogGC { memcpy(&slot_num, &value[sizeof(uint16_t)], sizeof(size_t)); return slot_num; } + inline size_t get_gc_num() { + gc_num_latch_.lock(); + size_t _gc_num_ = gc_num ++; + gc_num_latch_.unlock(); + return _gc_num_; + } SlotPage *slot_page_; - ThreadPool *thread_pool_; VlogSet *vlog_set; // 仅声明为指针,具体定义放在 vlog_gc.cpp + + // NOTE: threadpool + std::mutex full_latch_; // indicate thread pool is full when set as locked + size_t max_thread_nums_; + std::mutex curr_thread_nums_latch_; + size_t curr_thread_nums_; + + std::mutex gc_num_latch_; + size_t gc_num; }; diff --git a/db/vlog_set.cpp b/db/vlog_set.cpp index 84793bf..0f4ed8f 100644 --- a/db/vlog_set.cpp +++ b/db/vlog_set.cpp @@ -4,6 +4,7 @@ #include "vlog_set.h" #include "../db/vlog_gc.h" +#include // config file: | vlog_nums_(size_t) | vlog_1_num(size_t) | vlog_2_num(size_t) | ... | @@ -53,7 +54,6 @@ VlogSet::VlogSet(std::string dbname, VlogGC *vlog_gc) : dbname(dbname), vlog_gc( } } - if (!this->config_file_->is_open()) { std::cerr << "Failed to open or create the db config file: " << cfname << std::endl; std::exit(EXIT_FAILURE); @@ -61,6 +61,18 @@ VlogSet::VlogSet(std::string dbname, VlogGC *vlog_gc) : dbname(dbname), vlog_gc( } VlogSet::~VlogSet() { + // wait for all gc threads to finish + while (true) { + finished_latch_.lock(); + if (!finished) { + finished_latch_.unlock(); + std::this_thread::sleep_for(std::chrono::milliseconds(2)); + } else { + finished_latch_.unlock(); + break; + } + } + config_file_->seekp(0); config_file_->write(reinterpret_cast(&vlog_nums_), sizeof(size_t)); config_file_->flush(); @@ -69,6 +81,7 @@ VlogSet::~VlogSet() { for (auto & it : vlog_info_map_) { struct vlog_info* vinfo = it.second; + // 所有操作都已经同步完成 // 使用 vlog_info* 进行操作 size_t tmp[2] = {vinfo->curr_size, vinfo->value_nums}; auto vlog_file_handler = std::fstream(get_vlog_name(vinfo->vlog_num), std::ios::in | std::ios::out); @@ -82,20 +95,22 @@ VlogSet::~VlogSet() { } -void VlogSet::get_value(uint32_t vlog_num, uint32_t value_offset, std::string *value) { +// single value: || value_size(uint16_t) | slot_num(size_t) || {field_nums(uint16_t), attr1, attr2, ... } | +// single attr: | attr1_name_len(uint8_t) | attr1_name | attr1_len(uint16_t) | attr1 | +void VlogSet::get_value(const struct slot_content &sc, std::string *value) { mtx.lock(); - auto vinfo = get_vlog_info(vlog_num); - auto vhandler = get_vlog_handler(vlog_num); + auto vinfo = get_vlog_info(sc.vlog_num); + auto vhandler = get_vlog_handler(sc.vlog_num); vinfo->vlog_info_latch_.lock(); if (!vinfo->vlog_valid_) { vhandler = get_vlog_handler(vinfo->vlog_num_for_gc); } vhandler->vlog_latch_.soft_lock(); + mtx.unlock(); // for better performance vinfo->vlog_info_latch_.unlock(); - read_vlog_value(vlog_num, value_offset, value); + read_vlog_value(sc, value); vhandler->vlog_latch_.soft_unlock(); - mtx.unlock(); } struct vlog_info * VlogSet::get_writable_vlog_info(size_t value_size) { @@ -103,16 +118,21 @@ struct vlog_info * VlogSet::get_writable_vlog_info(size_t value_size) { struct vlog_info* vinfo = it->second; // 使用 vlog_info* 进行操作 - if (vinfo->curr_size + value_size <= VLOG_SIZE) { + vinfo->vlog_info_latch_.lock(); + if (vinfo->vlog_valid_ && !vinfo->processing_gc && vinfo->curr_size+value_size <= VLOG_SIZE) { + vinfo->vlog_info_latch_.unlock(); return vinfo; } + vinfo->vlog_info_latch_.unlock(); } // 所有vlog已满,创建新vlog return nullptr; } -void VlogSet::put_value(uint32_t *vlog_num, uint32_t *value_offset, const leveldb::Slice &value) { +// single value: || value_size(uint16_t) | slot_num(size_t) || {field_nums(uint16_t), attr1, attr2, ... } | +// single attr: | attr1_name_len(uint8_t) | attr1_name | attr1_len(uint16_t) | attr1 | +void VlogSet::put_value(struct slot_content &sc, size_t slot_num, const leveldb::Slice &value) { mtx.lock(); auto vinfo = get_writable_vlog_info(value.size()); @@ -123,82 +143,69 @@ void VlogSet::put_value(uint32_t *vlog_num, uint32_t *value_offset, const leveld } vinfo->vlog_info_latch_.lock(); - *vlog_num = vinfo->vlog_num; - *value_offset = vinfo->curr_size; - vinfo->curr_size += value.size(); + sc.vlog_num = vinfo->vlog_num; + sc.value_offset = vinfo->curr_size; + vinfo->curr_size += value.size() + sizeof(uint16_t) + sizeof(size_t); vinfo->value_nums ++; - vinfo->vlog_info_latch_.unlock(); - - auto vhandler = get_vlog_handler(*vlog_num); - vinfo->vlog_info_latch_.lock(); - if (!vinfo->vlog_valid_) { + auto vhandler = get_vlog_handler(vinfo->vlog_num); + if (!vinfo->vlog_valid_ || vinfo->processing_gc) { vhandler = get_vlog_handler(vinfo->vlog_num_for_gc); } + vhandler->vlog_latch_.hard_lock(); + mtx.unlock(); // for better performance vinfo->vlog_info_latch_.unlock(); - write_vlog_value(*vlog_num, *value_offset, value); + write_vlog_value(sc, slot_num, value); vhandler->vlog_latch_.hard_unlock(); - mtx.unlock(); } -void VlogSet::del_value(uint32_t vlog_num, uint32_t value_offset) { +void VlogSet::del_value(const struct slot_content &sc) { mtx.lock(); - auto vinfo = get_vlog_info(vlog_num); - auto vhandler = get_vlog_handler(vlog_num); + auto vinfo = get_vlog_info(sc.vlog_num); + auto vhandler = get_vlog_handler(sc.vlog_num); vinfo->vlog_info_latch_.lock(); - if (!vinfo->vlog_valid_) { -// auto vlog_num_gc = vinfo->vlog_num_for_gc; + if (!vinfo->vlog_valid_ || vinfo->processing_gc) { vhandler = get_vlog_handler(vinfo->vlog_num_for_gc); -// vinfo->vlog_info_latch_.unlock(); -// vinfo = get_vlog_info(vlog_num_gc); -// vinfo->vlog_info_latch_.lock(); } - vhandler->vlog_latch_.hard_lock(); - mark_del_value(vlog_num, value_offset); + vhandler->vlog_latch_.hard_lock(); + mtx.unlock(); // for better performance vinfo->vlog_info_latch_.unlock(); + mark_del_value(sc); vhandler->vlog_latch_.hard_unlock(); - mtx.unlock(); } size_t VlogSet::register_new_vlog() { + // FIXME: concurrency on config file size_t vn = vlog_nums_; std::string vlog_name = get_vlog_name(vn); register_inconfig_file(vn); create_vlog(vn); -// auto vlog_new = new std::fstream(vlog_name, std::ios::in | std::ios::out); -// if (!vlog_new->is_open()) { -// std::cerr << "Failed to open or create the vlog file: " << vlog_new << std::endl; -// std::exit(EXIT_FAILURE); -// } register_vlog_inmaps(vn, vlog_name); vlog_nums_ ++; -// vlog_count_ ++; return vn; } +// only used in vlog gc void VlogSet::remove_old_vlog(size_t old_vlog_num) { // after gc, new_vlog has been created - // new vlog should have been created here ! + // new vlog should have been created here mtx.lock(); auto vi_old = get_vlog_info(old_vlog_num); vi_old->vlog_info_latch_.lock(); std::string old_vlog_name = get_vlog_name(old_vlog_num); remove_from_config_file(old_vlog_num); - remove_vlog_from_maps(old_vlog_name); +// remove_vlog_from_maps(old_vlog_name); // FIXME: this function should be called after all access on this vlog finished !!! vi_old->vlog_valid_ = false; vi_old->vlog_info_latch_.unlock(); -// vlog_count_ --; mtx.unlock(); } bool VlogSet::vlog_need_gc(size_t vlog_num) { - // FIXME: vlog应该已经满了才行 - std::string vlog_name = get_vlog_name(vlog_num); - auto vi = vlog_info_map_[vlog_name]; - if ((double)vi->curr_size/VLOG_SIZE < VLOG_GC_THREHOLD) { + auto vi = get_vlog_info(vlog_num); + if ((double)(vi->curr_size*1.0)/VLOG_SIZE < VLOG_GC_THREHOLD) { return false; } bool retval = vi->vlog_valid_ && (vi->discard/vi->value_nums >= GC_THREDHOLD); @@ -209,12 +216,16 @@ void VlogSet::register_inconfig_file(size_t vlog_num) { // config file: | vlog_nums_(size_t) | vlog_count_(size_t) | vlog_1_name | ... | // first size_t in config file indicates current vlog_nums_(size_t) // second size_t in config file indicates current vlog_count_(size_t) + config_file_latch_.lock(); config_file_->seekp(sizeof(size_t) + vlog_nums_* sizeof(size_t)); config_file_->write(reinterpret_cast(&vlog_num), sizeof(size_t)); config_file_->flush(); + config_file_latch_.unlock(); } void VlogSet::remove_from_config_file(size_t vlog_num) { + // FIXME: concurrency on config file + config_file_latch_.lock(); char tmp[vlog_nums_*vlog_num_size]; config_file_->seekp(sizeof(size_t)); config_file_->read(tmp, sizeof(tmp)); @@ -229,6 +240,7 @@ void VlogSet::remove_from_config_file(size_t vlog_num) { break; } } + config_file_latch_.unlock(); } void VlogSet::create_vlog(size_t vlog_num) { @@ -264,14 +276,8 @@ inline void VlogSet::register_vlog_inmaps(size_t vlog_num, std::string &vlog_nam } inline void VlogSet::remove_vlog_from_maps(std::string &vlog_name) { - auto vi = vlog_info_map_[vlog_name]; - // auto vh = vlog_handler_map_[vlog_name]; - vi->vlog_info_latch_.lock(); - vi->vlog_valid_ = false; // TODO: change the slotpage vlog_num when get/put thread read this flag - // vh->handler_->close(); assert(!std::remove(vlog_name.c_str())); vlog_handler_map_.erase(vlog_name); - vi->vlog_info_latch_.unlock(); } inline std::string VlogSet::get_config_file_name() { @@ -290,42 +296,57 @@ struct vlog_handler * VlogSet::get_vlog_handler(size_t vlog_num) { return vlog_handler_map_[get_vlog_name(vlog_num)]; } -// value: | value_size(uint16_t) | slot_num(size_t) | field_nums(uint16_t) | value | -void VlogSet::read_vlog_value(uint32_t vlog_num, uint32_t value_offset, std::string *value) { - auto vlog_name = get_vlog_name(vlog_num); +// single value: || value_size(uint16_t) | slot_num(size_t) || {field_nums(uint16_t), attr1, attr2, ... } | +// single attr: | attr1_name_len(uint8_t) | attr1_name | attr1_len(uint16_t) | attr1 | +void VlogSet::read_vlog_value(const struct slot_content &sc, std::string *value) { + auto vlog_name = get_vlog_name(sc.vlog_num); auto handler = std::fstream(vlog_name, std::ios::in | std::ios::out); - handler.seekp(value_offset); + handler.seekp(sc.value_offset); char value_buff[VALUE_BUFF_SIZE]; handler.read(value_buff, VALUE_BUFF_SIZE); - // FIXME: remove value size uint16_t value_size; memcpy(&value_size, value_buff, sizeof(uint16_t)); + if (value_size & VALUE_DELE_MASK) { + *value = ""; + return ; + } value_size &= VALUE_SIZE_MASK; -// *value = std::string(value_buff); assert(value_size <= VALUE_BUFF_SIZE); - value->assign(&value_buff[sizeof(uint16_t)], value_size); + + const size_t off = sizeof(uint16_t)+sizeof(size_t); + *value = std::string(&value_buff[off], value_size-off); handler.close(); } -void VlogSet::write_vlog_value(uint32_t vlog_num, uint32_t value_offset, const leveldb::Slice &value) { - auto vlog_name = get_vlog_name(vlog_num); +// single value: || value_size(uint16_t) | slot_num(size_t) || {field_nums(uint16_t), attr1, attr2, ... } | +// single attr: | attr1_name_len(uint8_t) | attr1_name | attr1_len(uint16_t) | attr1 | +void VlogSet::write_vlog_value(const struct slot_content &sc, size_t slot_num, const leveldb::Slice &value) { + auto vlog_name = get_vlog_name(sc.vlog_num); auto handler = std::fstream(vlog_name, std::ios::in | std::ios::out); - handler.seekp(value_offset); + handler.seekp(sc.value_offset); const char *value_buff = value.data(); - handler.write(value_buff, value.size()); - auto vinfo = get_vlog_info(vlog_num); + const size_t off = sizeof(uint16_t) + sizeof(size_t); + const size_t value_size = off + value.size(); + char data[value_size]; + memcpy(data, &value_size, sizeof(uint16_t)); + memcpy(data+sizeof(uint16_t), &slot_num, sizeof(size_t)); + memcpy(data+off, value_buff, value.size()); + + handler.write(data, value_size); handler.flush(); handler.close(); } -void VlogSet::mark_del_value(uint32_t vlog_num, uint32_t value_offset) { - auto vinfo = get_vlog_info(vlog_num); - auto vlog_name = get_vlog_name(vlog_num); +// single value: || value_size(uint16_t) | slot_num(size_t) || {field_nums(uint16_t), attr1, attr2, ... } | +// single attr: | attr1_name_len(uint8_t) | attr1_name | attr1_len(uint16_t) | attr1 | +void VlogSet::mark_del_value(const struct slot_content &sc) { + auto vinfo = get_vlog_info(sc.vlog_num); + auto vlog_name = get_vlog_name(sc.vlog_num); auto handler = std::fstream(vlog_name, std::ios::in | std::ios::out); - handler.seekp(value_offset); + handler.seekp(sc.value_offset); char value_buff[VALUE_BUFF_SIZE]; handler.read(value_buff, VALUE_BUFF_SIZE); @@ -337,22 +358,22 @@ void VlogSet::mark_del_value(uint32_t vlog_num, uint32_t value_offset) { return ; } assert(!(value_size & VALUE_DELE_MASK)); - uint16_t masked_value_size = value_size | VALUE_DELE_MASK; + uint16_t masked_value_size = value_size | (uint16_t)VALUE_DELE_MASK; memcpy(value_buff, &masked_value_size, sizeof(uint16_t)); - handler.write(value_buff, value_size); + handler.seekp(sc.value_offset); + handler.write(value_buff, sizeof(uint16_t)); handler.flush(); handler.close(); // handle gc, mtx is locked outside, vlog_info_latch and vlog hard lock is locked outside too - // auto vinfo = get_vlog_info(vlog_num); vinfo->discard ++; vinfo->value_nums --; - vinfo->curr_size -= value_size & VALUE_SIZE_MASK; - if (vlog_need_gc(vlog_num)) { + vinfo->curr_size -= value_size; + // FIXME: gc process + if (vlog_need_gc(sc.vlog_num) && !vinfo->processing_gc) { // create new vlog - vinfo->vlog_valid_ = false; + vinfo->processing_gc = true; vinfo->vlog_num_for_gc = register_new_vlog(); - // vinfo->vlog_valid_ = false; - vlog_gc->do_gc(vlog_num, vinfo->vlog_num_for_gc); + vlog_gc->do_gc(sc.vlog_num, vinfo->vlog_num_for_gc); } } diff --git a/db/vlog_set.h b/db/vlog_set.h index a4f031c..dd44725 100644 --- a/db/vlog_set.h +++ b/db/vlog_set.h @@ -9,6 +9,7 @@ #include #include #include +#include #include "../include/leveldb/slice.h" #include "../db/shared_lock.h" #include "../db/vlog.h" @@ -18,6 +19,7 @@ class VlogGC; class VlogSet { friend class VlogGC; +friend class gc_executor; #define CONFIG_FILE_DELE_MASK (0x1 << (sizeof(size_t)-1)) #define CONFIG_FILE_VLOG_NUM(v) ((v) & ~CONFIG_FILE_DELE_MASK) @@ -25,9 +27,9 @@ friend class VlogGC; public: VlogSet(std::string dbname, VlogGC *vlog_gc); ~VlogSet(); - void get_value(uint32_t vlog_num, uint32_t value_offset, std::string *value); - void put_value(uint32_t *vlog_num, uint32_t *value_offset, const leveldb::Slice &value); - void del_value(uint32_t vlog_num, uint32_t value_offset); + void get_value(const struct slot_content &sc, std::string *value); + void put_value(struct slot_content &sc, size_t slot_num, const leveldb::Slice &value); + void del_value(const struct slot_content &sc); void set_vlog_gc(VlogGC *vg) { this->vlog_gc = vg; } @@ -47,19 +49,24 @@ friend class VlogGC; std::string get_vlog_name(size_t vlog_num); struct vlog_info *get_vlog_info(size_t vlog_num); struct vlog_handler *get_vlog_handler(size_t vlog_num); - void read_vlog_value(uint32_t vlog_num, uint32_t value_offset, std::string *value); - void write_vlog_value(uint32_t vlog_num, uint32_t value_offset, const leveldb::Slice &value); - void mark_del_value(uint32_t vlog_num, uint32_t value_offset); + void read_vlog_value(const struct slot_content &sc, std::string *value); + void write_vlog_value(const struct slot_content &sc, size_t slot_num, const leveldb::Slice &value); + void mark_del_value(const struct slot_content &sc); private: std::mutex mtx; std::string dbname; size_t vlog_nums_; -// size_t vlog_count_; std::unordered_map vlog_info_map_; std::unordered_map vlog_handler_map_; + std::mutex config_file_latch_; std::fstream *config_file_; + int counter = 0; + std::mutex counter_latch_; + std::mutex finished_latch_; + bool finished = false; + VlogGC *vlog_gc; // 仅声明为指针,具体定义放在 vlog_set.cpp }; diff --git a/include/leveldb/db.h b/include/leveldb/db.h index 82493de..6d739b4 100644 --- a/include/leveldb/db.h +++ b/include/leveldb/db.h @@ -156,7 +156,7 @@ class LEVELDB_EXPORT DB { // 字段信息结构体 // Todo(begin) virtual Status Put_Fields(const leveldb::WriteOptions& opt, const leveldb::Slice& key, const FieldArray& fields) = 0; - virtual Status Get_Fields(const leveldb::ReadOptions& options, const leveldb::Slice& key, FieldArray* fields) = 0; + virtual Status Get_Fields(const leveldb::ReadOptions& options, const leveldb::Slice& key, FieldArray& fields) = 0; // // Todo(end) }; diff --git a/test/db_test1.cc b/test/db_test1.cc index ade9372..7ce6779 100644 --- a/test/db_test1.cc +++ b/test/db_test1.cc @@ -4,11 +4,11 @@ using namespace std; using namespace leveldb; -int main() { +int test1() { DB* db = nullptr; Options op; op.create_if_missing = true; - Status status = DB::Open(op, "testdb", &db); + Status status = DB::Open(op, "testdb1", &db); assert(status.ok()); db->Put(WriteOptions(), "001", "leveldb"); string s; @@ -18,11 +18,53 @@ int main() { db->Put(WriteOptions(), "002", "world"); string s1; - db->Delete(WriteOptions(), "002"); + db->Delete(WriteOptions(), "002"); // sc: {0, 33} db->Get(ReadOptions(), "002", &s1); cout << s1.size() << endl; cout<Put(WriteOptions(), key, value); + string s; + db->Get(ReadOptions(), key, &s); + cout << s << " | " << s.size() << endl; + + } + + cout << endl << "all put are done" << endl << endl; + + for (auto i = 0; i < loop_times; i++) { + auto key = std::to_string(i); + auto value = value_prefix + key; + +// db->Put(WriteOptions(), key, value+"_"); + string s1; + db->Delete(WriteOptions(), key); // sc: {0, 33} + db->Get(ReadOptions(), key, &s1); + cout << s1 << " | " << s1.size() << " | " << i << endl; + } + + delete db; + return 0; +} + + +int main() { + test2(); } \ No newline at end of file