Browse Source

gc dead

xxy
xxy 9 months ago
parent
commit
9da542dfe1
4 changed files with 172 additions and 27 deletions
  1. +128
    -23
      db/db_impl.cc
  2. +3
    -0
      db/db_impl.h
  3. +2
    -0
      include/leveldb/db.h
  4. +39
    -4
      test/test.cpp

+ 128
- 23
db/db_impl.cc View File

@ -663,6 +663,15 @@ Status DBImpl::TEST_CompactMemTable() {
return s; return s;
} }
void DBImpl::TEST_GarbageCollect() {
MaybeScheduleGarbageCollect();
// Finish current background compaction in the case where
// `background_work_finished_signal_` was signalled due to an error.
while (background_garbage_collect_scheduled_) {
background_work_finished_signal_.Wait();
}
}
void DBImpl::RecordBackgroundError(const Status& s) { void DBImpl::RecordBackgroundError(const Status& s) {
mutex_.AssertHeld(); mutex_.AssertHeld();
if (bg_error_.ok()) { if (bg_error_.ok()) {
@ -725,8 +734,8 @@ void DBImpl::BackgroundCall() {
background_compaction_scheduled_ = false; background_compaction_scheduled_ = false;
// Check if garbage collection needs to be scheduled after compaction
MaybeScheduleGarbageCollect();
// // Check if garbage collection needs to be scheduled after compaction
// MaybeScheduleGarbageCollect();
// Previous compaction may have produced too many files in a level, // Previous compaction may have produced too many files in a level,
// so reschedule another compaction if needed. // so reschedule another compaction if needed.
@ -735,7 +744,7 @@ void DBImpl::BackgroundCall() {
} }
void DBImpl::BackgroundGarbageCollect() { void DBImpl::BackgroundGarbageCollect() {
MutexLock l(&mutex_);
mutex_.AssertHeld();
assert(background_garbage_collect_scheduled_); assert(background_garbage_collect_scheduled_);
if (shutting_down_.load(std::memory_order_acquire)) { if (shutting_down_.load(std::memory_order_acquire)) {
@ -1767,10 +1776,13 @@ Status DBImpl::ReadValueLog(uint64_t file_id, uint64_t offset, Slice* key,
// 判断文件是否为 valuelog 文件 // 判断文件是否为 valuelog 文件
bool IsValueLogFile(const std::string& filename) { bool IsValueLogFile(const std::string& filename) {
return filename.find("valuelog_") ==
0; // 简单判断文件名是否匹配 valuelog 前缀
// 检查文件是否以 ".valuelog" 结尾
const std::string suffix = ".valuelog";
return filename.size() > suffix.size() &&
filename.substr(filename.size() - suffix.size()) == suffix;
} }
// 示例:解析 sstable 中的元信息 // 示例:解析 sstable 中的元信息
void ParseStoredValue(const std::string& stored_value, uint64_t& valuelog_id, void ParseStoredValue(const std::string& stored_value, uint64_t& valuelog_id,
uint64_t& offset) { uint64_t& offset) {
@ -1781,14 +1793,33 @@ void ParseStoredValue(const std::string& stored_value, uint64_t& valuelog_id,
// 示例:获取 ValueLog 文件 ID // 示例:获取 ValueLog 文件 ID
uint64_t GetValueLogID(const std::string& valuelog_name) { uint64_t GetValueLogID(const std::string& valuelog_name) {
// 假设文件名中包含唯一的 ID,例如 "valuelog_1"
auto pos = valuelog_name.find_last_of('_');
return std::stoull(valuelog_name.substr(pos + 1));
// 使用 std::filesystem::path 解析文件名
std::filesystem::path file_path(valuelog_name);
std::string filename = file_path.filename().string(); // 获取文件名部分
// 查找文件名中的 '.' 位置,提取数字部分
auto pos = filename.find('.');
if (pos == std::string::npos) {
assert(0);
}
// 提取数字部分
std::string id_str = filename.substr(0, pos);
// 检查提取的部分是否为有效数字
for (char c : id_str) {
if (!isdigit(c)) {
assert(0);
}
}
return std::stoull(id_str); // 转换为 uint64_t
} }
// 垃圾回收实现 // 垃圾回收实现
void DBImpl::GarbageCollect() { void DBImpl::GarbageCollect() {
// 遍历数据库目录,找到所有 valuelog 文件 // 遍历数据库目录,找到所有 valuelog 文件
Log(options_.info_log, "start gc ");
auto files_set = fs::directory_iterator(dbname_); auto files_set = fs::directory_iterator(dbname_);
for (const auto& cur_log_file : files_set) { for (const auto& cur_log_file : files_set) {
if (fs::exists(cur_log_file) && if (fs::exists(cur_log_file) &&
@ -1796,6 +1827,7 @@ void DBImpl::GarbageCollect() {
IsValueLogFile(cur_log_file.path().filename().string())) { IsValueLogFile(cur_log_file.path().filename().string())) {
std::string valuelog_name = cur_log_file.path().string(); std::string valuelog_name = cur_log_file.path().string();
uint64_t cur_log_number = GetValueLogID(valuelog_name); uint64_t cur_log_number = GetValueLogID(valuelog_name);
std::cout << "check point 1" << std::endl;
uint64_t new_log_number = versions_->NewFileNumber(); uint64_t new_log_number = versions_->NewFileNumber();
WritableFile* new_valuelog = nullptr; WritableFile* new_valuelog = nullptr;
std::string new_valuelog_name = LogFileName(dbname_, new_log_number); std::string new_valuelog_name = LogFileName(dbname_, new_log_number);
@ -1807,13 +1839,7 @@ void DBImpl::GarbageCollect() {
break; break;
} }
addNewValueLog(); addNewValueLog();
std::ifstream cur_valuelog(valuelog_name, std::ios::in | std::ios::binary);
if (!cur_valuelog.is_open()) {
std::cerr << "Failed to open ValueLog file: " << valuelog_name
<< std::endl;
continue;
}
std::cout << "check point 2" << std::endl;
// whether to reopen // whether to reopen
std::ifstream new_valuelog_file(new_valuelog_name, std::ifstream new_valuelog_file(new_valuelog_name,
@ -1825,16 +1851,96 @@ void DBImpl::GarbageCollect() {
} }
uint64_t current_offset = 0; uint64_t current_offset = 0;
uint64_t new_offset = 0; // 新的 ValueLog 偏移
int cnt=0;
std::cout << "check point 3" << std::endl;
// Open the file in binary mode for reading
std::ifstream cur_valuelog(valuelog_name, std::ios::in | std::ios::binary);
if (!cur_valuelog.is_open()) {
std::cerr << "Failed to open file: " << valuelog_name << " for reading!"
<< std::endl;
continue;
}
while (true) { while (true) {
++cnt;
std::cout << cnt << std::endl;
// 读取一个 kv 对 // 读取一个 kv 对
uint64_t key_len, value_len; uint64_t key_len, value_len;
Slice key, value; Slice key, value;
Slice new_value;
ReadValueLog(cur_log_number, current_offset, &key, &value);
value = std::string(new_value.data(), new_value.size());
Status s = Status::OK();
// Seek to the position of key length
cur_valuelog.seekg(current_offset);
// Read the length of the key
char* key_buf_len = new char[sizeof(uint64_t)];
cur_valuelog.read(key_buf_len, sizeof(uint64_t));
if (cur_valuelog.eof()) {
delete[] key_buf_len;
break; // 正常退出条件:到达文件末尾
}
std::memcpy(&key_len, key_buf_len, sizeof(uint64_t));
if (!cur_valuelog.good()) {
delete[] key_buf_len;
cur_valuelog.close();
std::cerr << "Failed to read file: " << valuelog_name << std::endl;
break;
}
// Now seek to the actual key position and read the key
cur_valuelog.seekg(current_offset + sizeof(uint64_t));
char* key_buf = new char[key_len];
cur_valuelog.read(key_buf, key_len);
if (!cur_valuelog.good()) {
delete[] key_buf;
delete[] key_buf_len;
cur_valuelog.close();
std::cerr << "Failed to read file: " << valuelog_name << std::endl;
break;
}
// Assign the read key data to the Slice
key = Slice(key_buf, key_len);
// Read the length of the value
cur_valuelog.seekg(current_offset + sizeof(uint64_t) + key_len);
char* value_buf_len = new char[sizeof(uint64_t)];
cur_valuelog.read(value_buf_len, sizeof(uint64_t));
uint64_t val_len = 0;
std::memcpy(&val_len, value_buf_len, sizeof(uint64_t));
if (!cur_valuelog.good()) {
delete[] key_buf;
delete[] key_buf_len;
delete[] value_buf_len;
cur_valuelog.close();
std::cerr << "Failed to read file: " << valuelog_name << std::endl;
break;
}
// Now seek to the actual data position and read the value
cur_valuelog.seekg(current_offset + sizeof(uint64_t) + key_len + sizeof(uint64_t));
char* value_buf = new char[val_len];
cur_valuelog.read(value_buf, val_len);
if (!cur_valuelog.good()) {
delete[] key_buf;
delete[] key_buf_len;
delete[] value_buf_len;
delete[] value_buf;
cur_valuelog.close();
std::cerr << "Failed to read file: " << valuelog_name << std::endl;
break;
}
// Assign the read value data to the Slice
value = Slice(value_buf, val_len);
// 检查 key 是否在 sstable 中存在 // 检查 key 是否在 sstable 中存在
std::string stored_value; std::string stored_value;
@ -1868,10 +1974,6 @@ void DBImpl::GarbageCollect() {
continue; continue;
} }
// 更新偏移
new_offset +=
sizeof(key_len) + key.size() + sizeof(value_len) + value.size();
// 更新当前偏移 // 更新当前偏移
current_offset += current_offset +=
sizeof(key_len) + key.size() + sizeof(value_len) + value.size(); sizeof(key_len) + key.size() + sizeof(value_len) + value.size();
@ -1882,6 +1984,9 @@ void DBImpl::GarbageCollect() {
new_valuelog_file.close(); new_valuelog_file.close();
std::remove(valuelog_name.c_str()); // 删除旧的 ValueLog 文件 std::remove(valuelog_name.c_str()); // 删除旧的 ValueLog 文件
Log(options_.info_log, "remove file during gc %s", valuelog_name.c_str());
Log(options_.info_log, "add file during gc %s", new_valuelog_name.c_str());
} }
} }
} }

+ 3
- 0
db/db_impl.h View File

@ -86,6 +86,9 @@ class DBImpl : public DB {
// Force current memtable contents to be compacted. // Force current memtable contents to be compacted.
Status TEST_CompactMemTable(); Status TEST_CompactMemTable();
void TEST_GarbageCollect() override;
// Return an internal iterator over the current state of the database. // Return an internal iterator over the current state of the database.
// The keys of this iterator are internal keys (see format.h). // The keys of this iterator are internal keys (see format.h).
// The returned iterator should be deleted when no longer needed. // The returned iterator should be deleted when no longer needed.

+ 2
- 0
include/leveldb/db.h View File

@ -183,6 +183,8 @@ class LEVELDB_EXPORT DB {
// Therefore the following call will compact the entire database: // Therefore the following call will compact the entire database:
// db->CompactRange(nullptr, nullptr); // db->CompactRange(nullptr, nullptr);
virtual void CompactRange(const Slice* begin, const Slice* end) = 0; virtual void CompactRange(const Slice* begin, const Slice* end) = 0;
virtual void TEST_GarbageCollect() = 0;
}; };
// Destroy the contents of the specified database. // Destroy the contents of the specified database.

+ 39
- 4
test/test.cpp View File

@ -158,7 +158,39 @@ Status Get_keys_by_field(DB *db,const ReadOptions& options, const Field field,st
// delete db; // delete db;
// } // }
TEST(Test, LARGE_DATA_COMPACT_TEST) {
// TEST(Test, LARGE_DATA_COMPACT_TEST) {
// DB *db;
// WriteOptions writeOptions;
// ReadOptions readOptions;
// if(OpenDB("testdb_for_XOY_large", &db).ok() == false) {
// std::cerr << "open db failed" << std::endl;
// abort();
// }
// std::vector<std::string> values;
// for(int i=0;i<500000;i++){
// std::string key=std::to_string(i);
// std::string value;
// for(int j=0;j<1000;j++){
// value+=std::to_string(i);
// }
// values.push_back(value);
// db->Put(writeOptions,key,value);
// }
// for(int i=0;i<500000;i++){
// std::string key=std::to_string(i);
// std::string value;
// Status s=db->Get(readOptions,key,&value);
// assert(s.ok());
// if(values[i]!=value){
// std::cout<<value.size()<<std::endl;
// assert(0);
// }
// ASSERT_TRUE(values[i]==value);
// }
// delete db;
// }
TEST(Test, Garbage_Collect_TEST) {
DB *db; DB *db;
WriteOptions writeOptions; WriteOptions writeOptions;
ReadOptions readOptions; ReadOptions readOptions;
@ -167,7 +199,7 @@ TEST(Test, LARGE_DATA_COMPACT_TEST) {
abort(); abort();
} }
std::vector<std::string> values; std::vector<std::string> values;
for(int i=0;i<5000;i++){
for(int i=0;i<500000;i++){
std::string key=std::to_string(i); std::string key=std::to_string(i);
std::string value; std::string value;
for(int j=0;j<1000;j++){ for(int j=0;j<1000;j++){
@ -176,7 +208,11 @@ TEST(Test, LARGE_DATA_COMPACT_TEST) {
values.push_back(value); values.push_back(value);
db->Put(writeOptions,key,value); db->Put(writeOptions,key,value);
} }
for(int i=0;i<5000;i++){
std::cout<<"start gc"<<std::endl;
db->TEST_GarbageCollect();
std::cout<<"finish gc"<<std::endl;
for(int i=0;i<500000;i++){
std::string key=std::to_string(i); std::string key=std::to_string(i);
std::string value; std::string value;
Status s=db->Get(readOptions,key,&value); Status s=db->Get(readOptions,key,&value);
@ -191,7 +227,6 @@ TEST(Test, LARGE_DATA_COMPACT_TEST) {
} }
int main(int argc, char** argv) { int main(int argc, char** argv) {
// All tests currently run with the same read-only file limits. // All tests currently run with the same read-only file limits.
testing::InitGoogleTest(&argc, argv); testing::InitGoogleTest(&argc, argv);

Loading…
Cancel
Save