Browse Source

v3 roughly complete, but Put still lacks mutex protection

xxy
xxy 9 months ago
parent
commit
8d45bf108b
5 changed files with 174 additions and 166 deletions
  1. +1
    -1
      CMakeLists.txt
  2. +160
    -162
      db/db_impl.cc
  3. +7
    -0
      db/db_impl.h
  4. +3
    -0
      include/leveldb/options.h
  5. +3
    -3
      test/test.cpp

+ 1
- 1
CMakeLists.txt View File

@ -10,7 +10,7 @@ project(leveldb VERSION 1.23.0 LANGUAGES C CXX)
if(NOT CMAKE_C_STANDARD)
# This project can use C11, but will gracefully decay down to C89.
# 17
set(CMAKE_C_STANDARD 11)
set(CMAKE_C_STANDARD 17)
set(CMAKE_C_STANDARD_REQUIRED OFF)
set(CMAKE_C_EXTENSIONS OFF)
endif(NOT CMAKE_C_STANDARD)

+ 160
- 162
db/db_impl.cc View File

@ -20,6 +20,7 @@
#include <cstdio>
#include <fstream>
#include <thread>
#include <iostream>
#include <set>
#include <string>
@ -142,6 +143,7 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
db_lock_(nullptr),
shutting_down_(false),
background_work_finished_signal_(&mutex_),
background_gc_finished_signal_(&gc_mutex_),
mem_(nullptr),
imm_(nullptr),
has_imm_(false),
@ -668,8 +670,9 @@ void DBImpl::TEST_GarbageCollect() {
// Finish current background compaction in the case where
// `background_work_finished_signal_` was signalled due to an error.
while (background_garbage_collect_scheduled_) {
background_work_finished_signal_.Wait();
background_gc_finished_signal_.Wait();
}
// std::cout<<"bg_signal"<<std::endl;
}
void DBImpl::RecordBackgroundError(const Status& s) {
@ -707,10 +710,11 @@ void DBImpl::MaybeScheduleGarbageCollect() {
} else if (!bg_error_.ok()) {
// Already got an error; no more changes
} else {
gc_mutex_.Lock();
background_garbage_collect_scheduled_ = true;
env_->Schedule(&DBImpl::BGWorkGC, this);
auto bg_thread_ = std::thread(&DBImpl::BGWorkGC, this);
bg_thread_.detach();
}
}
void DBImpl::BGWorkGC(void* db) {
@ -744,7 +748,7 @@ void DBImpl::BackgroundCall() {
}
void DBImpl::BackgroundGarbageCollect() {
mutex_.AssertHeld();
MutexLock l(&gc_mutex_);
assert(background_garbage_collect_scheduled_);
if (shutting_down_.load(std::memory_order_acquire)) {
@ -754,12 +758,15 @@ void DBImpl::BackgroundGarbageCollect() {
} else {
// Perform garbage collection here
GarbageCollect();
gc_mutex_.Unlock();
}
background_garbage_collect_scheduled_ = false;
// Notify any waiting threads
background_work_finished_signal_.SignalAll();
background_gc_finished_signal_.SignalAll();
}
void DBImpl::BackgroundCompaction() {
@ -1219,6 +1226,10 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key,
mem->Unref();
if (imm != nullptr) imm->Unref();
current->Unref();
if(!s.ok())return s;
if(options.find_value_log_for_gc){
return s;
}
if (value->c_str()[0] == 0x00) {
*value = value->substr(1);
@ -1233,7 +1244,10 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key,
if (!res) return Status::Corruption("can't decode file id");
res = GetVarint64(&value_log_slice, &valuelog_offset);
if (!res) return Status::Corruption("can't decode valuelog offset");
ReadValueLog(file_id, valuelog_offset, &new_key, &new_value);
s=ReadValueLog(file_id, valuelog_offset, &new_key, &new_value);
if(!s.ok()){
return s;
}
*value = std::string(new_value.data(), new_value.size());
delete[] new_value.data();
return s;
@ -1787,8 +1801,9 @@ bool IsValueLogFile(const std::string& filename) {
// Parses stored_value and writes the decoded results into valuelog_id and
// offset (both are output parameters passed by reference).
// NOTE(review): this span comes from a rendered diff, so two parsers appear
// back to back: the istringstream extraction and the GetVarint64 calls. As
// written, the GetVarint64 calls run second and may overwrite what the stream
// extraction produced. Presumably the stream lines are the removed version
// and the varint lines the added one -- confirm against the actual file.
// NOTE(review): the results of the GetVarint64 calls are not checked here, so
// a malformed stored_value is not reported to the caller.
void ParseStoredValue(const std::string& stored_value, uint64_t& valuelog_id,
uint64_t& offset) {
// Assumes stored_value has the format: valuelog_id|offset
std::istringstream iss(stored_value);
iss >> valuelog_id >> offset;
Slice tmp(stored_value.data(),stored_value.size());
GetVarint64(&tmp,&valuelog_id);
GetVarint64(&tmp,&offset);
}
// 示例:获取 ValueLog 文件 ID
@ -1818,184 +1833,167 @@ uint64_t GetValueLogID(const std::string& valuelog_name) {
// 垃圾回收实现
void DBImpl::GarbageCollect() {
gc_mutex_.AssertHeld();
// 遍历数据库目录,找到所有 valuelog 文件
Log(options_.info_log, "start gc ");
auto files_set = fs::directory_iterator(dbname_);
std::set<std::string> valuelog_set;
std::string cur_valuelog_name=ValueLogFileName(dbname_,valuelogfile_number_);
for (const auto& cur_log_file : files_set) {
if (fs::exists(cur_log_file) &&
fs::is_regular_file(fs::status(cur_log_file)) &&
IsValueLogFile(cur_log_file.path().filename().string())) {
std::string valuelog_name = cur_log_file.path().string();
std::cout << valuelog_name << std::endl;
uint64_t cur_log_number = GetValueLogID(valuelog_name);
std::cout << "check point 1" << std::endl;
uint64_t new_log_number = versions_->NewFileNumber();
WritableFile* new_valuelog = nullptr;
std::string new_valuelog_name = LogFileName(dbname_, new_log_number);
Status s = env_->NewWritableFile(LogFileName(dbname_, new_log_number),
&new_valuelog);
if (!s.ok()) {
// Avoid chewing through file number space in a tight loop.
versions_->ReuseFileNumber(new_log_number);
break;
}
addNewValueLog();
std::cout << "check point 2" << std::endl;
// whether to reopen
std::ifstream new_valuelog_file(new_valuelog_name,
std::ios::in | std::ios::binary);
if (!new_valuelog_file.is_open()) {
std::cerr << "Failed to create new ValueLog file: " << new_valuelog_name
<< std::endl;
continue;
}
uint64_t current_offset = 0;
uint64_t tmp_offset = 0;
fs::is_regular_file(fs::status(cur_log_file)) &&
IsValueLogFile(cur_log_file.path().filename().string())) {
if(cur_valuelog_name==cur_log_file.path().filename().string())continue;
valuelog_set.emplace(cur_log_file.path().filename().string());
}
}
for (std::string valuelog_name:valuelog_set) {
// std::cout << valuelog_name << std::endl;
uint64_t cur_log_number = GetValueLogID(valuelog_name);
valuelog_name=ValueLogFileName(dbname_,cur_log_number);
int cnt=0;
uint64_t current_offset = 0;
uint64_t tmp_offset = 0;
std::cout << "check point 3" << std::endl;
int cnt=0;
// Open the file in binary mode for reading
std::ifstream cur_valuelog(valuelog_name, std::ios::in | std::ios::binary);
if (!cur_valuelog.is_open()) {
std::cerr << "Failed to open file: " << valuelog_name << " for reading cur_valuelog!"
<< std::endl;
continue;
}
while (true) {
tmp_offset=current_offset;
++cnt;
std::cout << cnt <<" "<<current_offset<< std::endl;
// 读取一个 kv 对
uint64_t key_len, value_len;
Slice key, value;
// Open the file in binary mode for reading
std::ifstream cur_valuelog(valuelog_name, std::ios::in | std::ios::binary);
if (!cur_valuelog.is_open()) {
std::cerr << "Failed to open file: " << valuelog_name << " for reading cur_valuelog!"
<< std::endl;
continue;
}
Status s = Status::OK();
while (true) {
tmp_offset=current_offset;
++cnt;
// std::cout << cnt <<" "<<current_offset<< std::endl;
// 读取一个 kv 对
uint64_t key_len, value_len;
Slice key, value;
// Seek to the position of key length
cur_valuelog.seekg(current_offset);
Status s = Status::OK();
// Read the length of the key
char* key_buf_len = new char[sizeof(uint64_t)];
cur_valuelog.read(key_buf_len, sizeof(uint64_t));
// Seek to the position of key length
cur_valuelog.seekg(current_offset);
if (cur_valuelog.eof()) {
delete[] key_buf_len;
break; // 正常退出条件:到达文件末尾
}
// Read the length of the key
char* key_buf_len = new char[sizeof(uint64_t)];
cur_valuelog.read(key_buf_len, sizeof(uint64_t));
std::memcpy(&key_len, key_buf_len, sizeof(uint64_t));
if (cur_valuelog.eof()) {
delete[] key_buf_len;
break; // 正常退出条件:到达文件末尾
}
if (!cur_valuelog.good()) {
delete[] key_buf_len;
cur_valuelog.close();
std::cerr << "Failed to read file: " << valuelog_name << std::endl;
break;
}
// 更新当前偏移
current_offset += sizeof(uint64_t);
// Now seek to the actual key position and read the key
cur_valuelog.seekg(current_offset);
char* key_buf = new char[key_len];
cur_valuelog.read(key_buf, key_len);
if (!cur_valuelog.good()) {
delete[] key_buf;
delete[] key_buf_len;
cur_valuelog.close();
std::cerr << "Failed to read file: " << valuelog_name << std::endl;
break;
}
current_offset += key_len;
// Assign the read key data to the Slice
key = Slice(key_buf, key_len);
// Read the length of the value
cur_valuelog.seekg(current_offset);
char* value_buf_len = new char[sizeof(uint64_t)];
cur_valuelog.read(value_buf_len, sizeof(uint64_t));
uint64_t val_len = 0;
std::memcpy(&val_len, value_buf_len, sizeof(uint64_t));
if (!cur_valuelog.good()) {
delete[] key_buf;
delete[] key_buf_len;
delete[] value_buf_len;
cur_valuelog.close();
std::cerr << "Failed to read file: " << valuelog_name << std::endl;
break;
}
// 更新当前偏移
current_offset += sizeof(uint64_t);
// Now seek to the actual data position and read the value
cur_valuelog.seekg(current_offset);
char* value_buf = new char[val_len];
cur_valuelog.read(value_buf, val_len);
if (!cur_valuelog.good()) {
delete[] key_buf;
delete[] key_buf_len;
delete[] value_buf_len;
delete[] value_buf;
cur_valuelog.close();
std::cerr << "Failed to read file: " << valuelog_name << std::endl;
break;
}
current_offset += val_len;
std::memcpy(&key_len, key_buf_len, sizeof(uint64_t));
// Assign the read value data to the Slice
value = Slice(value_buf, val_len);
if (!cur_valuelog.good()) {
delete[] key_buf_len;
cur_valuelog.close();
std::cerr << "Failed to read file: " << valuelog_name << std::endl;
break;
}
// 更新当前偏移
current_offset += sizeof(uint64_t);
// Now seek to the actual key position and read the key
cur_valuelog.seekg(current_offset);
char* key_buf = new char[key_len];
cur_valuelog.read(key_buf, key_len);
if (!cur_valuelog.good()) {
delete[] key_buf;
delete[] key_buf_len;
cur_valuelog.close();
std::cerr << "Failed to read file: " << valuelog_name << std::endl;
break;
}
current_offset += key_len;
// Assign the read key data to the Slice
key = Slice(key_buf, key_len);
// Read the length of the value
cur_valuelog.seekg(current_offset);
char* value_buf_len = new char[sizeof(uint64_t)];
cur_valuelog.read(value_buf_len, sizeof(uint64_t));
uint64_t val_len = 0;
std::memcpy(&val_len, value_buf_len, sizeof(uint64_t));
if (!cur_valuelog.good()) {
delete[] key_buf;
delete[] key_buf_len;
delete[] value_buf_len;
cur_valuelog.close();
std::cerr << "Failed to read file: " << valuelog_name << std::endl;
break;
}
// 更新当前偏移
current_offset += sizeof(uint64_t);
// Now seek to the actual data position and read the value
cur_valuelog.seekg(current_offset);
char* value_buf = new char[val_len];
cur_valuelog.read(value_buf, val_len);
if (!cur_valuelog.good()) {
delete[] key_buf;
delete[] key_buf_len;
delete[] value_buf_len;
delete[] value_buf;
cur_valuelog.close();
std::cerr << "Failed to read file: " << valuelog_name << std::endl;
break;
}
current_offset += val_len;
// 检查 key 是否在 sstable 中存在
std::string stored_value;
Status status = Get(leveldb::ReadOptions(), key, &stored_value);
// Assign the read value data to the Slice
value = Slice(value_buf, val_len);
// std::cout<<val_len<<std::endl;
if (status.IsNotFound()) {
// Key 不存在,忽略此记录
continue;
}
// 检查 key 是否在 sstable 中存在
std::string stored_value;
auto option=leveldb::ReadOptions();
option.find_value_log_for_gc = true;
if (!status.ok()) {
std::cerr << "Error accessing sstable: " << status.ToString()
<< std::endl;
continue;
}
Status status = Get(option, key, &stored_value);
// 检查 valuelog_id 和 offset 是否匹配
uint64_t stored_valuelog_id, stored_offset;
ParseStoredValue(stored_value, stored_valuelog_id,
stored_offset); // 假设解析函数
if (stored_valuelog_id != GetValueLogID(valuelog_name) ||
stored_offset != tmp_offset) {
// 记录无效,跳过
continue;
}
if (status.IsNotFound()) {
// Key 不存在,忽略此记录
continue;
}
status = Put(leveldb::WriteOptions(), key, value);
if (!status.ok()) {
std::cerr << "Error accessing sstable: " << status.ToString()
<< std::endl;
continue;
}
if (!status.ok()) {
std::cerr << "Error accessing sstable: " << status.ToString()
<< std::endl;
continue;
}
// 清理旧文件(如果需要)
cur_valuelog.close();
new_valuelog_file.close();
// 检查 valuelog_id 和 offset 是否匹配
uint64_t stored_valuelog_id, stored_offset;
ParseStoredValue(stored_value.substr(1), stored_valuelog_id,
stored_offset); // 假设解析函数
if (stored_valuelog_id != GetValueLogID(valuelog_name) ||
stored_offset != tmp_offset) {
// 记录无效,跳过
continue;
}
status = Put(leveldb::WriteOptions(), key, value);
if (!status.ok()) {
std::cerr << "Error accessing sstable: " << status.ToString()
<< std::endl;
continue;
}
}
std::remove(valuelog_name.c_str()); // 删除旧的 ValueLog 文件
Log(options_.info_log, "remove file during gc %s", valuelog_name.c_str());
Log(options_.info_log, "add file during gc %s", new_valuelog_name.c_str());
// 清理旧文件(如果需要)
cur_valuelog.close();
}
std::remove(valuelog_name.c_str()); // 删除旧的 ValueLog 文件
Log(options_.info_log, "remove file during gc %s", valuelog_name.c_str());
}
}

+ 7
- 0
db/db_impl.h View File

@ -210,9 +210,16 @@ class DBImpl : public DB {
// State below is protected by mutex_
port::Mutex mutex_;
port::Mutex gc_mutex_;
// port::Mutex spj_mutex_;
// std::shared_mutex value_log_mutex;
std::atomic<bool> shutting_down_;
port::CondVar background_work_finished_signal_ GUARDED_BY(mutex_);
port::CondVar background_gc_finished_signal_ GUARDED_BY(gc_mutex_);
// Slice valuelog_finding_key GUARDED_BY(mutex_ );
MemTable* mem_;
MemTable* imm_ GUARDED_BY(mutex_); // Memtable being compacted
std::atomic<bool> has_imm_; // So bg thread can detect non-null imm_

+ 3
- 0
include/leveldb/options.h View File

@ -157,6 +157,8 @@ struct LEVELDB_EXPORT ReadOptions {
// Callers may wish to set this field to false for bulk scans.
bool fill_cache = true;
bool find_value_log_for_gc = false;
// If "snapshot" is non-null, read as of the supplied snapshot
// (which must belong to the DB that is being read and which must
// not have been released). If "snapshot" is null, use an implicit
@ -183,6 +185,7 @@ struct LEVELDB_EXPORT WriteOptions {
// with sync==true has similar crash semantics to a "write()"
// system call followed by "fsync()".
bool sync = false;
// bool valuelog_write=false;
};
} // namespace leveldb

+ 3
- 3
test/test.cpp View File

@ -199,7 +199,7 @@ TEST(Test, Garbage_Collect_TEST) {
abort();
}
std::vector<std::string> values;
for(int i=0;i<5000;i++){
for(int i=0;i<500000;i++){
std::string key=std::to_string(i);
std::string value;
for(int j=0;j<1000;j++){
@ -212,8 +212,8 @@ TEST(Test, Garbage_Collect_TEST) {
db->TEST_GarbageCollect();
std::cout<<"finish gc"<<std::endl;
for(int i=0;i<5000;i++){
std::cout<<i<<std::endl;
for(int i=0;i<500000;i++){
// std::cout<<i<<std::endl;
std::string key=std::to_string(i);
std::string value;
Status s=db->Get(readOptions,key,&value);

Loading…
Cancel
Save