Browse Source

format

xxy
xxy 9 months ago
parent
commit
2a42349890
1 changed files with 32 additions and 107 deletions
  1. +32
    -107
      db/db_impl.cc

+ 32
- 107
db/db_impl.cc View File

@ -18,12 +18,12 @@
#include <atomic> #include <atomic>
#include <cstdint> #include <cstdint>
#include <cstdio> #include <cstdio>
#include <filesystem>
#include <fstream> #include <fstream>
#include <thread>
#include <iostream> #include <iostream>
#include <set> #include <set>
#include <string> #include <string>
#include <thread>
#include <vector> #include <vector>
#include "leveldb/db.h" #include "leveldb/db.h"
@ -39,7 +39,6 @@
#include "util/coding.h" #include "util/coding.h"
#include "util/logging.h" #include "util/logging.h"
#include "util/mutexlock.h" #include "util/mutexlock.h"
#include <filesystem>
namespace fs = std::filesystem; namespace fs = std::filesystem;
namespace leveldb { namespace leveldb {
@ -667,12 +666,10 @@ Status DBImpl::TEST_CompactMemTable() {
void DBImpl::TEST_GarbageCollect() { void DBImpl::TEST_GarbageCollect() {
MaybeScheduleGarbageCollect(); MaybeScheduleGarbageCollect();
// Finish current background compaction in the case where
// `background_work_finished_signal_` was signalled due to an error.
// Finish current background gc in the case where
while (background_garbage_collect_scheduled_) { while (background_garbage_collect_scheduled_) {
background_gc_finished_signal_.Wait(); background_gc_finished_signal_.Wait();
} }
// std::cout<<"bg_signal"<<std::endl;
} }
void DBImpl::RecordBackgroundError(const Status& s) { void DBImpl::RecordBackgroundError(const Status& s) {
@ -685,7 +682,7 @@ void DBImpl::RecordBackgroundError(const Status& s) {
void DBImpl::MaybeScheduleCompaction() { void DBImpl::MaybeScheduleCompaction() {
mutex_.AssertHeld(); mutex_.AssertHeld();
if (background_compaction_scheduled_) { if (background_compaction_scheduled_) {
// Already scheduled // Already scheduled
} else if (shutting_down_.load(std::memory_order_acquire)) { } else if (shutting_down_.load(std::memory_order_acquire)) {
@ -759,14 +756,12 @@ void DBImpl::BackgroundGarbageCollect() {
// Perform garbage collection here // Perform garbage collection here
GarbageCollect(); GarbageCollect();
gc_mutex_.Unlock(); gc_mutex_.Unlock();
} }
background_garbage_collect_scheduled_ = false; background_garbage_collect_scheduled_ = false;
// Notify any waiting threads // Notify any waiting threads
background_gc_finished_signal_.SignalAll(); background_gc_finished_signal_.SignalAll();
} }
void DBImpl::BackgroundCompaction() { void DBImpl::BackgroundCompaction() {
@ -1226,8 +1221,8 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key,
mem->Unref(); mem->Unref();
if (imm != nullptr) imm->Unref(); if (imm != nullptr) imm->Unref();
current->Unref(); current->Unref();
if(!s.ok())return s;
if(options.find_value_log_for_gc){
if (!s.ok()) return s;
if (options.find_value_log_for_gc) {
return s; return s;
} }
@ -1244,8 +1239,8 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key,
if (!res) return Status::Corruption("can't decode file id"); if (!res) return Status::Corruption("can't decode file id");
res = GetVarint64(&value_log_slice, &valuelog_offset); res = GetVarint64(&value_log_slice, &valuelog_offset);
if (!res) return Status::Corruption("can't decode valuelog offset"); if (!res) return Status::Corruption("can't decode valuelog offset");
s=ReadValueLog(file_id, valuelog_offset, &new_key, &new_value);
if(!s.ok()){
s = ReadValueLog(file_id, valuelog_offset, &new_key, &new_value);
if (!s.ok()) {
return s; return s;
} }
*value = std::string(new_value.data(), new_value.size()); *value = std::string(new_value.data(), new_value.size());
@ -1572,26 +1567,6 @@ void DBImpl::GetApproximateSizes(const Range* range, int n, uint64_t* sizes) {
v->Unref(); v->Unref();
} }
// std::vector<std::pair<uint64_t,std::pair<uint64_t,uint64_t>>>
// DBImpl::WriteValueLog(std::vector<Slice> values){
// std::string file_name_=ValueLogFileName(dbname_,valuelogfile_number_);
// std::ofstream valueFile(file_name_, std::ios::app | std::ios::binary);
// if (!valueFile.is_open()) {
// assert(0);
// }
// uint64_t offset=valueFile.tellp();
// std::vector<std::pair<uint64_t,std::pair<uint64_t,uint64_t>>> res;
// for(int i=0;i<values.size();i++){
// int len=values[i].size();
// valueFile.write(values[i].data(),len);
// res.push_back({valuelogfile_number_,{offset,len}});
// offset+=len;
// }
// //unlock
// valueFile.close();
// return res;
// }
std::vector<std::pair<uint64_t, uint64_t>> DBImpl::WriteValueLog( std::vector<std::pair<uint64_t, uint64_t>> DBImpl::WriteValueLog(
std::vector<std::pair<Slice, Slice>> kv) { std::vector<std::pair<Slice, Slice>> kv) {
@ -1660,55 +1635,6 @@ void DBImpl::addNewValueLog() {
valuelogfile_number_ = versions_->NewFileNumber(); valuelogfile_number_ = versions_->NewFileNumber();
} }
// Status DBImpl::ReadValueLog(uint64_t file_id, uint64_t offset, Slice* value)
// {
// //lock_shared
// Status s = Status::OK();
// std::string file_name_ = ValueLogFileName(dbname_, file_id);
// // Open the file in binary mode for reading
// std::ifstream inFile(file_name_, std::ios::in | std::ios::binary);
// if (!inFile.is_open()) {
// std::cerr << "Failed to open file: " << file_name_ << " for reading!"
// << std::endl; return Status::Corruption("Failed to open file for
// reading!");
// }
// // Seek to the position of len
// inFile.seekg(offset);
// // Read the length of the value
// // uint64_t len;
// // inFile.read(reinterpret_cast<char*>(&len), sizeof(uint64_t));
// char *value_buf_len=new char[sizeof(uint64_t)];
// inFile.read(value_buf_len,sizeof(uint64_t));
// uint64_t len=0;
// std::memcpy(&len, value_buf_len, sizeof(uint64_t));
// if (!inFile.good()) {
// inFile.close();
// return Status::Corruption("Failed to read length from file!");
// }
// // Now seek to the actual data position and read the value
// inFile.seekg(offset + sizeof(uint64_t));
// char* value_buf = new char[len];
// inFile.read(value_buf, len);
// if (!inFile.good()) {
// delete[] value_buf;
// inFile.close();
// return Status::Corruption("Failed to read value from file!");
// }
// // Close the file after reading
// inFile.close();
// // Assign the read data to the Slice
// *value = Slice(value_buf, len);
// return s;
// }
Status DBImpl::ReadValueLog(uint64_t file_id, uint64_t offset, Slice* key, Status DBImpl::ReadValueLog(uint64_t file_id, uint64_t offset, Slice* key,
Slice* value) { Slice* value) {
Status s = Status::OK(); Status s = Status::OK();
@ -1796,14 +1722,13 @@ bool IsValueLogFile(const std::string& filename) {
filename.substr(filename.size() - suffix.size()) == suffix; filename.substr(filename.size() - suffix.size()) == suffix;
} }
// 示例:解析 sstable 中的元信息 // 示例:解析 sstable 中的元信息
void ParseStoredValue(const std::string& stored_value, uint64_t& valuelog_id, void ParseStoredValue(const std::string& stored_value, uint64_t& valuelog_id,
uint64_t& offset) { uint64_t& offset) {
// 假设 stored_value 格式为:valuelog_id|offset // 假设 stored_value 格式为:valuelog_id|offset
Slice tmp(stored_value.data(),stored_value.size());
GetVarint64(&tmp,&valuelog_id);
GetVarint64(&tmp,&offset);
Slice tmp(stored_value.data(), stored_value.size());
GetVarint64(&tmp, &valuelog_id);
GetVarint64(&tmp, &offset);
} }
// 示例:获取 ValueLog 文件 ID // 示例:获取 ValueLog 文件 ID
@ -1830,7 +1755,6 @@ uint64_t GetValueLogID(const std::string& valuelog_name) {
return std::stoull(id_str); // 转换为 uint64_t return std::stoull(id_str); // 转换为 uint64_t
} }
// 垃圾回收实现 // 垃圾回收实现
void DBImpl::GarbageCollect() { void DBImpl::GarbageCollect() {
gc_mutex_.AssertHeld(); gc_mutex_.AssertHeld();
@ -1838,45 +1762,47 @@ void DBImpl::GarbageCollect() {
Log(options_.info_log, "start gc "); Log(options_.info_log, "start gc ");
auto files_set = fs::directory_iterator(dbname_); auto files_set = fs::directory_iterator(dbname_);
std::set<std::string> valuelog_set; std::set<std::string> valuelog_set;
std::string cur_valuelog_name=ValueLogFileName(dbname_,valuelogfile_number_);
std::string cur_valuelog_name =
ValueLogFileName(dbname_, valuelogfile_number_);
for (const auto& cur_log_file : files_set) { for (const auto& cur_log_file : files_set) {
if (fs::exists(cur_log_file) && if (fs::exists(cur_log_file) &&
fs::is_regular_file(fs::status(cur_log_file)) &&
IsValueLogFile(cur_log_file.path().filename().string())) {
if(cur_valuelog_name==cur_log_file.path().filename().string())continue;
valuelog_set.emplace(cur_log_file.path().filename().string());
fs::is_regular_file(fs::status(cur_log_file)) &&
IsValueLogFile(cur_log_file.path().filename().string())) {
if (cur_valuelog_name == cur_log_file.path().filename().string())
continue;
valuelog_set.emplace(cur_log_file.path().filename().string());
} }
} }
for (std::string valuelog_name:valuelog_set) {
for (std::string valuelog_name : valuelog_set) {
// std::cout << valuelog_name << std::endl; // std::cout << valuelog_name << std::endl;
uint64_t cur_log_number = GetValueLogID(valuelog_name); uint64_t cur_log_number = GetValueLogID(valuelog_name);
valuelog_name=ValueLogFileName(dbname_,cur_log_number);
valuelog_name = ValueLogFileName(dbname_, cur_log_number);
uint64_t current_offset = 0; uint64_t current_offset = 0;
uint64_t tmp_offset = 0; uint64_t tmp_offset = 0;
int cnt=0;
int cnt = 0;
// Open the file in binary mode for reading
// Open the file in binary mode for reading
std::ifstream cur_valuelog(valuelog_name, std::ios::in | std::ios::binary); std::ifstream cur_valuelog(valuelog_name, std::ios::in | std::ios::binary);
if (!cur_valuelog.is_open()) { if (!cur_valuelog.is_open()) {
std::cerr << "Failed to open file: " << valuelog_name << " for reading cur_valuelog!"
<< std::endl;
std::cerr << "Failed to open file: " << valuelog_name
<< " for reading cur_valuelog!" << std::endl;
continue; continue;
} }
while (true) { while (true) {
tmp_offset=current_offset;
tmp_offset = current_offset;
++cnt; ++cnt;
// std::cout << cnt <<" "<<current_offset<< std::endl; // std::cout << cnt <<" "<<current_offset<< std::endl;
// 读取一个 kv 对 // 读取一个 kv 对
uint64_t key_len, value_len; uint64_t key_len, value_len;
Slice key, value; Slice key, value;
Status s = Status::OK(); Status s = Status::OK();
// Seek to the position of key length
// Seek to the position of key length
cur_valuelog.seekg(current_offset); cur_valuelog.seekg(current_offset);
// Read the length of the key // Read the length of the key
@ -1885,7 +1811,7 @@ void DBImpl::GarbageCollect() {
if (cur_valuelog.eof()) { if (cur_valuelog.eof()) {
delete[] key_buf_len; delete[] key_buf_len;
break; // 正常退出条件:到达文件末尾
break; // 正常退出条件:到达文件末尾
} }
std::memcpy(&key_len, key_buf_len, sizeof(uint64_t)); std::memcpy(&key_len, key_buf_len, sizeof(uint64_t));
@ -1911,7 +1837,6 @@ void DBImpl::GarbageCollect() {
break; break;
} }
current_offset += key_len; current_offset += key_len;
// Assign the read key data to the Slice // Assign the read key data to the Slice
key = Slice(key_buf, key_len); key = Slice(key_buf, key_len);
@ -1955,7 +1880,7 @@ void DBImpl::GarbageCollect() {
// 检查 key 是否在 sstable 中存在 // 检查 key 是否在 sstable 中存在
std::string stored_value; std::string stored_value;
auto option=leveldb::ReadOptions();
auto option = leveldb::ReadOptions();
option.find_value_log_for_gc = true; option.find_value_log_for_gc = true;
Status status = Get(option, key, &stored_value); Status status = Get(option, key, &stored_value);
@ -1974,13 +1899,13 @@ void DBImpl::GarbageCollect() {
// 检查 valuelog_id 和 offset 是否匹配 // 检查 valuelog_id 和 offset 是否匹配
uint64_t stored_valuelog_id, stored_offset; uint64_t stored_valuelog_id, stored_offset;
ParseStoredValue(stored_value.substr(1), stored_valuelog_id, ParseStoredValue(stored_value.substr(1), stored_valuelog_id,
stored_offset); // 假设解析函数
stored_offset); // 假设解析函数
if (stored_valuelog_id != GetValueLogID(valuelog_name) || if (stored_valuelog_id != GetValueLogID(valuelog_name) ||
stored_offset != tmp_offset) { stored_offset != tmp_offset) {
// 记录无效,跳过 // 记录无效,跳过
continue; continue;
} }
status = Put(leveldb::WriteOptions(), key, value); status = Put(leveldb::WriteOptions(), key, value);
if (!status.ok()) { if (!status.ok()) {
std::cerr << "Error accessing sstable: " << status.ToString() std::cerr << "Error accessing sstable: " << status.ToString()
@ -1992,7 +1917,7 @@ void DBImpl::GarbageCollect() {
// 清理旧文件(如果需要) // 清理旧文件(如果需要)
cur_valuelog.close(); cur_valuelog.close();
std::remove(valuelog_name.c_str()); // 删除旧的 ValueLog 文件
fs::remove(valuelog_name.c_str()); // 删除旧的 ValueLog 文件
Log(options_.info_log, "remove file during gc %s", valuelog_name.c_str()); Log(options_.info_log, "remove file during gc %s", valuelog_name.c_str());
} }
} }

Loading…
Cancel
Save