
fix multi-thread race between writes and GC on the same key

alexfisher committed 9 months ago · parent commit a6501bc750
10 changed files with 728 additions and 252 deletions
1. CMakeLists.txt (+5 -4)
2. db/builder.cc (+1 -0)
3. db/db_impl.cc (+542 -153)
4. db/db_impl.h (+47 -16)
5. db/db_iter.cc (+5 -3)
6. db/write_batch.cc (+6 -5)
7. include/leveldb/db.h (+15 -3)
8. include/leveldb/options.h (+3 -0)
9. include/leveldb/slice.h (+1 -1)
10. test/test.cpp (+103 -67)
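
The race being fixed: GarbageCollect() probes a key with Get() to decide whether a valuelog record is still live, then rewrites it with Put(); a concurrent write to the same key landing between probe and rewrite would be clobbered by the stale rewrite. The fix gates Put() on the key GC is currently probing (spj_mutex_ / valuelog_finding_key in the diff below) and has GC drain the pending writer queue after publishing that key. A minimal sketch of the gate, using std:: primitives rather than the diff's port::Mutex/port::CondVar:

#include <condition_variable>
#include <mutex>
#include <string>

std::mutex spj_mutex;                  // guards valuelog_finding_key
std::condition_variable spj_cv;
std::string valuelog_finding_key;      // key GC is currently relocating

// Writer side: block while GC is probing/relocating the same user key.
void GatedPut(const std::string& key) {
  {
    std::unique_lock<std::mutex> l(spj_mutex);
    spj_cv.wait(l, [&] { return key != valuelog_finding_key; });
  }
  // ... proceed with the normal write path ...
}

// GC side: publish the key, re-check liveness and rewrite, then release.
void GcRelocate(const std::string& key) {
  {
    std::lock_guard<std::mutex> l(spj_mutex);
    valuelog_finding_key = key;
  }
  // ... Get() with find_value_log_for_gc, then Put() with valuelog_write ...
  {
    std::lock_guard<std::mutex> l(spj_mutex);
    valuelog_finding_key.clear();
  }
  spj_cv.notify_all();                 // release writers waiting on this key
}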

CMakeLists.txt (+5 -4)

@@ -2,22 +2,23 @@
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file. See the AUTHORS file for names of contributors.
cmake_minimum_required(VERSION 3.9)
cmake_minimum_required(VERSION 3.10)
# Keep the version below in sync with the one in db.h
project(leveldb VERSION 1.23.0 LANGUAGES C CXX)
# C standard can be overridden when this is used as a sub-project.
if(NOT CMAKE_C_STANDARD)
# This project can use C11, but will gracefully decay down to C89.
set(CMAKE_C_STANDARD 11)
# 17
set(CMAKE_C_STANDARD 17)
set(CMAKE_C_STANDARD_REQUIRED OFF)
set(CMAKE_C_EXTENSIONS OFF)
endif(NOT CMAKE_C_STANDARD)
# C++ standard can be overridden when this is used as a sub-project.
if(NOT CMAKE_CXX_STANDARD)
# This project requires C++11.
set(CMAKE_CXX_STANDARD 11)
# This project requires C++17.
set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
set(CMAKE_CXX_EXTENSIONS OFF)
endif(NOT CMAKE_CXX_STANDARD)

db/builder.cc (+1 -0)

@@ -28,6 +28,7 @@ Status BuildTable(const std::string& dbname, Env* env, const Options& options,
return s;
}
// If the first byte is 0x01, strip that prefix and parse the value location from the remaining bytes
{
auto tmp_value=iter->value();
if(tmp_value.data()[0]==(char)(0x01)){
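
For context, the tagged value format this commit uses throughout (a sketch inferred from builder.cc, db_impl.cc, and write_batch.cc in this diff): a leading 0x00 byte means the value is stored inline; a leading 0x01 byte means the rest is varint(file_id) then varint(offset), pointing into a valuelog file. DecodeTaggedValue is a hypothetical helper; GetVarint64 is leveldb's util/coding.h routine:

#include <cstdint>
#include "leveldb/slice.h"
#include "util/coding.h"   // GetVarint64 (leveldb internal header)

// Fills either inline_value (tag 0x00) or file_id/offset (tag 0x01);
// returns false on a malformed value.
bool DecodeTaggedValue(leveldb::Slice v, bool* is_pointer, uint64_t* file_id,
                       uint64_t* offset, leveldb::Slice* inline_value) {
  if (v.empty()) {                // deletions carry an empty value
    *is_pointer = false;
    *inline_value = v;
    return true;
  }
  const char tag = v[0];
  v.remove_prefix(1);
  if (tag == 0x00) {              // inline value follows the tag
    *is_pointer = false;
    *inline_value = v;
    return true;
  }
  *is_pointer = true;             // tag 0x01: pointer into a valuelog
  return leveldb::GetVarint64(&v, file_id) &&
         leveldb::GetVarint64(&v, offset);
}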

db/db_impl.cc (+542 -153)

@@ -4,15 +4,6 @@
#include "db/db_impl.h"
#include <algorithm>
#include <atomic>
#include <cstdint>
#include <cstdio>
#include <set>
#include <string>
#include <vector>
#include <iostream>
#include <fstream>
#include "db/builder.h"
#include "db/db_iter.h"
#include "db/dbformat.h"
@@ -23,11 +14,24 @@
#include "db/table_cache.h"
#include "db/version_set.h"
#include "db/write_batch_internal.h"
#include <algorithm>
#include <atomic>
#include <cstdint>
#include <cstdio>
#include <fstream>
#include <thread>
#include <iostream>
#include <set>
#include <string>
#include <vector>
#include "leveldb/db.h"
#include "leveldb/env.h"
#include "leveldb/status.h"
#include "leveldb/table.h"
#include "leveldb/table_builder.h"
#include "port/port.h"
#include "table/block.h"
#include "table/merger.h"
@@ -35,6 +39,8 @@
#include "util/coding.h"
#include "util/logging.h"
#include "util/mutexlock.h"
#include <filesystem>
namespace fs = std::filesystem;
namespace leveldb {
@@ -57,7 +63,6 @@ struct DBImpl::CompactionState {
struct Output {
uint64_t number;
uint64_t file_size;
uint64_t valuelog_id;
InternalKey smallest, largest;
};
@@ -84,10 +89,6 @@ struct DBImpl::CompactionState {
WritableFile* outfile;
TableBuilder* builder;
WritableFile* valuelogfile;
uint64_t valuelog_offset=0;
uint64_t valuelog_file_id=0;
uint64_t total_bytes;
};
@@ -142,6 +143,8 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
db_lock_(nullptr),
shutting_down_(false),
background_work_finished_signal_(&mutex_),
background_gc_finished_signal_(&gc_mutex_),
spj_mutex_cond_(&spj_mutex_),
mem_(nullptr),
imm_(nullptr),
has_imm_(false),
@@ -151,6 +154,7 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
seed_(0),
tmp_batch_(new WriteBatch),
background_compaction_scheduled_(false),
background_garbage_collect_scheduled_(false),
manual_compaction_(nullptr),
versions_(new VersionSet(dbname_, &options_, table_cache_,
&internal_comparator_)) {}
@@ -281,10 +285,6 @@ void DBImpl::RemoveObsoleteFiles() {
}
Log(options_.info_log, "Delete type=%d #%lld\n", static_cast<int>(type),
static_cast<unsigned long long>(number));
if(oldvaluelog_ids.count(number)){
std::string valuelog_filename=ValueLogFileName(dbname_,oldvaluelog_ids[number]);
env_->RemoveFile(valuelog_filename);
}
}
}
}
@@ -546,7 +546,7 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
level = base->PickLevelForMemTableOutput(min_user_key, max_user_key);
}
edit->AddFile(level, meta.number, meta.file_size, meta.smallest,
meta.largest,meta.valuelog_id);
meta.largest);
}
CompactionStats stats;
@@ -666,6 +666,16 @@ Status DBImpl::TEST_CompactMemTable() {
return s;
}
void DBImpl::TEST_GarbageCollect() {
MaybeScheduleGarbageCollect();
// Wait until the scheduled GC pass finishes; the signal may also
// fire when a background error aborts the work.
while (background_garbage_collect_scheduled_) {
background_gc_finished_signal_.Wait();
}
// std::cout<<"bg_signal"<<std::endl;
}
void DBImpl::RecordBackgroundError(const Status& s) {
mutex_.AssertHeld();
if (bg_error_.ok()) {
@@ -676,6 +686,7 @@ void DBImpl::RecordBackgroundError(const Status& s) {
void DBImpl::MaybeScheduleCompaction() {
mutex_.AssertHeld();
if (background_compaction_scheduled_) {
// Already scheduled
} else if (shutting_down_.load(std::memory_order_acquire)) {
@@ -691,6 +702,26 @@
}
}
void DBImpl::MaybeScheduleGarbageCollect() {
mutex_.AssertHeld();
if (background_garbage_collect_scheduled_) {
// Garbage collection already scheduled
} else if (shutting_down_.load(std::memory_order_acquire)) {
// DB is being deleted; no more background work
} else if (!bg_error_.ok()) {
// Already got an error; no more changes
} else {
gc_mutex_.Lock();
background_garbage_collect_scheduled_ = true;
auto bg_thread_ = std::thread(&DBImpl::BGWorkGC, this);
bg_thread_.detach();
}
}
void DBImpl::BGWorkGC(void* db) {
reinterpret_cast<DBImpl*>(db)->BackgroundGarbageCollect();
}
void DBImpl::BGWork(void* db) {
reinterpret_cast<DBImpl*>(db)->BackgroundCall();
}
@@ -708,12 +739,37 @@ void DBImpl::BackgroundCall() {
background_compaction_scheduled_ = false;
// // Check if garbage collection needs to be scheduled after compaction
// MaybeScheduleGarbageCollect();
// Previous compaction may have produced too many files in a level,
// so reschedule another compaction if needed.
MaybeScheduleCompaction();
background_work_finished_signal_.SignalAll();
}
void DBImpl::BackgroundGarbageCollect() {
MutexLock l(&gc_mutex_);
assert(background_garbage_collect_scheduled_);
if (shutting_down_.load(std::memory_order_acquire)) {
// No more background work when shutting down.
} else if (!bg_error_.ok()) {
// No more background work after a background error.
} else {
// Perform garbage collection here
GarbageCollect();
gc_mutex_.Unlock();
}
background_garbage_collect_scheduled_ = false;
// Notify any waiting threads
background_gc_finished_signal_.SignalAll();
}
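
The schedule/wait handshake above, distilled into std:: primitives (a sketch of the protocol, not the diff's exact code): a flag marks a GC pass in flight, a detached thread clears it when done, and a condition variable wakes whoever is blocked in TEST_GarbageCollect():

#include <condition_variable>
#include <mutex>
#include <thread>

std::mutex gc_mutex;
std::condition_variable gc_done;
bool gc_scheduled = false;

void RunGC() { /* the actual collection work, elided */ }

// Schedule at most one background GC pass.
void ScheduleGC() {
  std::lock_guard<std::mutex> l(gc_mutex);
  if (gc_scheduled) return;        // already scheduled or running
  gc_scheduled = true;
  std::thread([] {
    RunGC();
    {
      std::lock_guard<std::mutex> l(gc_mutex);
      gc_scheduled = false;
    }
    gc_done.notify_all();          // wake anyone waiting for completion
  }).detach();
}

// Block until the in-flight pass (if any) has finished.
void WaitForGC() {
  std::unique_lock<std::mutex> l(gc_mutex);
  gc_done.wait(l, [] { return !gc_scheduled; });
}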
void DBImpl::BackgroundCompaction() {
mutex_.AssertHeld();
@@ -750,7 +806,7 @@ void DBImpl::BackgroundCompaction() {
FileMetaData* f = c->input(0, 0);
c->edit()->RemoveFile(c->level(), f->number);
c->edit()->AddFile(c->level() + 1, f->number, f->file_size, f->smallest,
f->largest,f->valuelog_id);
f->largest);
status = versions_->LogAndApply(c->edit(), &mutex_);
if (!status.ok()) {
RecordBackgroundError(status);
@@ -824,11 +880,7 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) {
out.number = file_number;
out.smallest.Clear();
out.largest.Clear();
compact->valuelog_file_id=versions_->NewFileNumber();
out.valuelog_id=compact->valuelog_file_id;
compact->outputs.push_back(out);
mutex_.Unlock();
}
@@ -838,11 +890,6 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) {
if (s.ok()) {
compact->builder = new TableBuilder(options_, compact->outfile);
}
compact->valuelog_offset=0;
s = env_->NewWritableFile(ValueLogFileName(dbname_,compact->valuelog_file_id), &compact->valuelogfile);
return s;
}
@@ -878,19 +925,6 @@ Status DBImpl::FinishCompactionOutputFile(CompactionState* compact,
}
delete compact->outfile;
compact->outfile = nullptr;
if (s.ok()) {
s = compact->valuelogfile->Flush();
}
if (s.ok()) {
s = compact->valuelogfile->Sync();
}
if (s.ok()) {
s = compact->valuelogfile->Close();
}
delete compact->valuelogfile;
compact->valuelogfile=nullptr;
compact->valuelog_file_id=0;
compact->valuelog_offset=0;
if (s.ok() && current_entries > 0) {
// Verify that the table is usable
@@ -921,7 +955,7 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) {
for (size_t i = 0; i < compact->outputs.size(); i++) {
const CompactionState::Output& out = compact->outputs[i];
compact->compaction->edit()->AddFile(level + 1, out.number, out.file_size,
out.smallest, out.largest,out.valuelog_id);
out.smallest, out.largest);
}
return versions_->LogAndApply(compact->compaction->edit(), &mutex_);
}
@@ -955,11 +989,6 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
std::string current_user_key;
bool has_current_user_key = false;
SequenceNumber last_sequence_for_key = kMaxSequenceNumber;
for (int which = 0; which < 2; which++) {
for (int i = 0; i < compact->compaction->num_input_files(which); i++) {
if(compact->compaction->input(which, i)->valuelog_id)oldvaluelog_ids[compact->compaction->input(which, i)->number]=compact->compaction->input(which, i)->valuelog_id;
}
}
while (input->Valid() && !shutting_down_.load(std::memory_order_acquire)) {
// Prioritize immutable compaction work
if (has_imm_.load(std::memory_order_relaxed)) {
@@ -1040,35 +1069,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
compact->current_output()->smallest.DecodeFrom(key);
}
compact->current_output()->largest.DecodeFrom(key);
Slice old_value=input->value();
Slice new_value;
std::string buf="";
if(old_value.size()==0||old_value.data()[0]==(char)(0x00)){//when it is a deletion, input->value() will be ""
new_value=old_value;
}
else{
old_value.remove_prefix(1);
uint64_t file_id,valuelog_offset,valuelog_len;
bool res=GetVarint64(&old_value,&file_id);
if(!res)assert(0);
res=GetVarint64(&old_value,&valuelog_offset);
if(!res)assert(0);
res=GetVarint64(&old_value,&valuelog_len);
if(!res)assert(0);
Status s=ReadValueLog(file_id,valuelog_offset,valuelog_len,&new_value);
assert(s.ok());
writeValueLogForCompaction(compact->valuelogfile,{new_value});
buf+=(char)(0x01);
PutVarint64(&buf,compact->valuelog_file_id);
PutVarint64(&buf,compact->valuelog_offset);
PutVarint64(&buf,valuelog_len);
compact->valuelog_offset+=valuelog_len;
delete []new_value.data();
new_value=Slice(buf);
}
compact->builder->Add(key, new_value);
compact->builder->Add(key, input->value());
// Close output file if it is big enough
if (compact->builder->FileSize() >=
@@ -1092,14 +1093,6 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
if (status.ok()) {
status = input->status();
}
// Not completely correct; this belongs in its own function, tied into RemoveObsoleteFiles.
// if(status.ok()){
// for(auto id:old_valuelog_ids){
// auto valuelog_filename=ValueLogFileName(dbname_,id);
// Status s=env_->RemoveFile(valuelog_filename);
// assert(s.ok());
// }
// }
delete input;
input = nullptr;
@@ -1235,22 +1228,29 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key,
if (imm != nullptr) imm->Unref();
current->Unref();
if(!s.ok())return s;
if(value->c_str()[0]==0x00){
*value=value->substr(1);
if(options.find_value_log_for_gc){
return s;
}
Slice value_log_slice=Slice(value->c_str()+1,value->length());
if (value->c_str()[0] == 0x00) {
*value = value->substr(1);
return s;
}
Slice value_log_slice = Slice(value->c_str() + 1, value->length());
Slice new_key;
Slice new_value;
uint64_t file_id,valuelog_offset,valuelog_len;
bool res=GetVarint64(&value_log_slice,&file_id);
if(!res)return Status::Corruption("can't decode file id");
res=GetVarint64(&value_log_slice,&valuelog_offset);
if(!res)return Status::Corruption("can't decode valuelog offset");
res=GetVarint64(&value_log_slice,&valuelog_len);
if(!res)return Status::Corruption("can't decode valuelog len");
ReadValueLog(file_id,valuelog_offset,valuelog_len,&new_value);
*value=std::string(new_value.data(),new_value.size());
delete []new_value.data();
int value_offset = sizeof(uint64_t) * 2; // 16
uint64_t file_id, valuelog_offset;
bool res = GetVarint64(&value_log_slice, &file_id);
if (!res) return Status::Corruption("can't decode file id");
res = GetVarint64(&value_log_slice, &valuelog_offset);
if (!res) return Status::Corruption("can't decode valuelog offset");
s=ReadValueLog(file_id, valuelog_offset, &new_key, &new_value);
if(!s.ok()){
return s;
}
*value = std::string(new_value.data(), new_value.size());
delete[] new_value.data();
return s;
}
@@ -1285,10 +1285,17 @@ void DBImpl::ReleaseSnapshot(const Snapshot* snapshot) {
// Convenience methods
Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) {
if(!o.valuelog_write){
spj_mutex_.Lock();
while(key==valuelog_finding_key){
spj_mutex_cond_.Wait();
}
spj_mutex_.Unlock();
}
return DB::Put(o, key, val);
}
Status DBImpl::Delete(const WriteOptions& options, const Slice& key) {
return DB::Delete(options, key);
}
@@ -1314,7 +1321,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
Writer* last_writer = &w;
if (status.ok() && updates != nullptr) { // nullptr batch is for compactions
WriteBatch* write_batch = BuildBatchGroup(&last_writer);
WriteBatchInternal::ConverToValueLog(write_batch,this);
WriteBatchInternal::ConverToValueLog(write_batch, this);
WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);
last_sequence += WriteBatchInternal::Count(write_batch);
@@ -1351,17 +1358,17 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
while (true) {
Writer* ready = writers_.front();
writers_.pop_front();
if (ready != &w) {
ready->status = status;
ready->done = true;
ready->cv.Signal();
}
//if (ready != &w) {
ready->status = status;
ready->done = true;
ready->cv.SignalAll();
//}
if (ready == last_writer) break;
}
// Notify new head of write queue
if (!writers_.empty()) {
writers_.front()->cv.Signal();
writers_.front()->cv.SignalAll();
}
return status;
@@ -1574,70 +1581,452 @@ void DBImpl::GetApproximateSizes(const Range* range, int n, uint64_t* sizes) {
v->Unref();
}
std::vector<std::pair<uint64_t,std::pair<uint64_t,uint64_t>>> DBImpl::WriteValueLog(std::vector<Slice> values){
//lock
// std::vector<std::pair<uint64_t,std::pair<uint64_t,uint64_t>>> res;
// for(int i=0;i<values.size();i++){
// int len=values[i].size();
// valuelogfile_->Append(values[i]);
// res.push_back({valuelogfile_number_,{valuelogfile_offset,len}});
// valuelogfile_offset+=len;
// }
// //unlock
// valuelogfile_->Flush();
// return res;
std::string file_name_=ValueLogFileName(dbname_,valuelogfile_number_);
// std::vector<std::pair<uint64_t,std::pair<uint64_t,uint64_t>>>
// DBImpl::WriteValueLog(std::vector<Slice> values){
// std::string file_name_=ValueLogFileName(dbname_,valuelogfile_number_);
// std::ofstream valueFile(file_name_, std::ios::app | std::ios::binary);
// if (!valueFile.is_open()) {
// assert(0);
// }
// uint64_t offset=valueFile.tellp();
// std::vector<std::pair<uint64_t,std::pair<uint64_t,uint64_t>>> res;
// for(int i=0;i<values.size();i++){
// int len=values[i].size();
// valueFile.write(values[i].data(),len);
// res.push_back({valuelogfile_number_,{offset,len}});
// offset+=len;
// }
// //unlock
// valueFile.close();
// return res;
// }
std::vector<std::pair<uint64_t, uint64_t>> DBImpl::WriteValueLog(
std::vector<std::pair<Slice, Slice>> kv) {
std::string file_name_ = ValueLogFileName(dbname_, valuelogfile_number_);
std::ofstream valueFile(file_name_, std::ios::app | std::ios::binary);
if (!valueFile.is_open()) {
assert(0);
assert(0);
}
uint64_t offset = valueFile.tellp();
std::vector<std::pair<uint64_t, uint64_t>> res;
for (const auto& [key_slice, value_slice] : kv) {
// Write the key length
uint64_t key_len = key_slice.size();
valueFile.write(reinterpret_cast<const char*>(&key_len), sizeof(uint64_t));
if (!valueFile.good()) {
valueFile.close();
assert(0);
}
// Write the key bytes
valueFile.write(key_slice.data(), key_len);
if (!valueFile.good()) {
valueFile.close();
assert(0);
}
// Write the value length
uint64_t value_len = value_slice.size();
valueFile.write(reinterpret_cast<const char*>(&value_len),
sizeof(uint64_t));
if (!valueFile.good()) {
valueFile.close();
assert(0);
}
uint64_t offset=valueFile.tellp();
std::vector<std::pair<uint64_t,std::pair<uint64_t,uint64_t>>> res;
for(int i=0;i<values.size();i++){
int len=values[i].size();
valueFile.write(values[i].data(),len);
res.push_back({valuelogfile_number_,{offset,len}});
offset+=len;
}
//unlock
// Write the value bytes
valueFile.write(value_slice.data(), value_len);
if (!valueFile.good()) {
valueFile.close();
assert(0);
}
// Record file_id and offset
res.push_back({valuelogfile_number_, offset});
// Advance the offset past this record
offset += sizeof(uint64_t) + key_len + sizeof(uint64_t) + value_len;
}
// Release the file and clean up
valueFile.close();
return res;
}
void DBImpl::writeValueLogForCompaction(WritableFile* target_file,std::vector<Slice> values){
for(int i=0;i<values.size();i++){
void DBImpl::writeValueLogForCompaction(WritableFile* target_file,
std::vector<Slice> values) {
for (int i = 0; i < values.size(); i++) {
target_file->Append(values[i]);
}
}
void DBImpl::addNewValueLog(){
//lock
// if(valuelogfile_){
// valuelogfile_->Sync();
// valuelogfile_->Close();
// delete valuelogfile_;
// }
valuelogfile_number_=versions_->NewFileNumber();
// valuelogfile_offset=0;
// std::string file_name_=ValueLogFileName(dbname_,valuelogfile_number_);
// env_->NewWritableFile(file_name_,&valuelogfile_);
//unlock
void DBImpl::addNewValueLog() {
valuelogfile_number_ = versions_->NewFileNumber();
}
Status DBImpl::ReadValueLog(uint64_t file_id, uint64_t offset,uint64_t len,Slice* value){
//lock_shared
std::string file_name_=ValueLogFileName(dbname_,file_id);
//std::cout<<file_name_<<" "<<offset<<" "<<len<<std::endl;
// Status DBImpl::ReadValueLog(uint64_t file_id, uint64_t offset, Slice* value)
// {
// //lock_shared
// Status s = Status::OK();
// std::string file_name_ = ValueLogFileName(dbname_, file_id);
// // Open the file in binary mode for reading
// std::ifstream inFile(file_name_, std::ios::in | std::ios::binary);
// if (!inFile.is_open()) {
// std::cerr << "Failed to open file: " << file_name_ << " for reading!"
// << std::endl; return Status::Corruption("Failed to open file for
// reading!");
// }
// // Seek to the position of len
// inFile.seekg(offset);
// // Read the length of the value
// // uint64_t len;
// // inFile.read(reinterpret_cast<char*>(&len), sizeof(uint64_t));
// char *value_buf_len=new char[sizeof(uint64_t)];
// inFile.read(value_buf_len,sizeof(uint64_t));
// uint64_t len=0;
// std::memcpy(&len, value_buf_len, sizeof(uint64_t));
// if (!inFile.good()) {
// inFile.close();
// return Status::Corruption("Failed to read length from file!");
// }
// // Now seek to the actual data position and read the value
// inFile.seekg(offset + sizeof(uint64_t));
// char* value_buf = new char[len];
// inFile.read(value_buf, len);
// if (!inFile.good()) {
// delete[] value_buf;
// inFile.close();
// return Status::Corruption("Failed to read value from file!");
// }
// // Close the file after reading
// inFile.close();
// // Assign the read data to the Slice
// *value = Slice(value_buf, len);
// return s;
// }
Status DBImpl::ReadValueLog(uint64_t file_id, uint64_t offset, Slice* key,
Slice* value) {
Status s = Status::OK();
std::string file_name_ = ValueLogFileName(dbname_, file_id);
// Open the file in binary mode for reading
std::ifstream inFile(file_name_, std::ios::in | std::ios::binary);
if (!inFile.is_open()) {
std::cerr << "Failed to open file for writing!"<<file_id<<" "<<offset<<" "<<len<< std::endl;
return Status::Corruption("Failed to open file for writing!");
std::cerr << "Failed to open file: " << file_name_ << " for read valuelog!"
<< std::endl;
return Status::Corruption("Failed to open file for reading!");
}
// Seek to the position of key length
inFile.seekg(offset);
char *value_buf=new char[len];
inFile.read(value_buf,len);
// Read the length of the key
char* key_buf_len = new char[sizeof(uint64_t)];
inFile.read(key_buf_len, sizeof(uint64_t));
uint64_t key_len = 0;
std::memcpy(&key_len, key_buf_len, sizeof(uint64_t));
if (!inFile.good()) {
delete[] key_buf_len;
inFile.close();
return Status::Corruption("Failed to read key length from file!");
}
// Now seek to the actual key position and read the key
inFile.seekg(offset + sizeof(uint64_t));
char* key_buf = new char[key_len];
inFile.read(key_buf, key_len);
if (!inFile.good()) {
delete[] key_buf;
delete[] key_buf_len;
inFile.close();
return Status::Corruption("Failed to read key from file!");
}
// Assign the read key data to the Slice
*key = Slice(key_buf, key_len);
// Read the length of the value
inFile.seekg(offset + sizeof(uint64_t) + key_len);
char* value_buf_len = new char[sizeof(uint64_t)];
inFile.read(value_buf_len, sizeof(uint64_t));
uint64_t val_len = 0;
std::memcpy(&val_len, value_buf_len, sizeof(uint64_t));
if (!inFile.good()) {
delete[] key_buf;
delete[] key_buf_len;
delete[] value_buf_len;
inFile.close();
return Status::Corruption("Failed to read value length from file!");
}
// Now seek to the actual data position and read the value
inFile.seekg(offset + sizeof(uint64_t) + key_len + sizeof(uint64_t));
char* value_buf = new char[val_len];
inFile.read(value_buf, val_len);
if (!inFile.good()) {
delete[] key_buf;
delete[] key_buf_len;
delete[] value_buf_len;
delete[] value_buf;
inFile.close();
return Status::Corruption("Failed to read value from file!");
}
// Close the file after reading
inFile.close();
*value=Slice(value_buf,len);
return Status::OK();
// Assign the read value data to the Slice
*value = Slice(value_buf, val_len);
return s;
}
// Check whether a file is a valuelog file
bool IsValueLogFile(const std::string& filename) {
// i.e. whether the filename ends with ".valuelog"
const std::string suffix = ".valuelog";
return filename.size() > suffix.size() &&
filename.substr(filename.size() - suffix.size()) == suffix;
}
// Parse the value-pointer metadata stored in the sstable
void ParseStoredValue(const std::string& stored_value, uint64_t& valuelog_id,
uint64_t& offset) {
// stored_value is assumed to be varint(valuelog_id) followed by varint(offset)
Slice tmp(stored_value.data(),stored_value.size());
GetVarint64(&tmp,&valuelog_id);
GetVarint64(&tmp,&offset);
}
// Extract the ValueLog file ID from a filename
uint64_t GetValueLogID(const std::string& valuelog_name) {
// Parse the filename with std::filesystem::path
std::filesystem::path file_path(valuelog_name);
std::string filename = file_path.filename().string();  // filename component only
// Find the '.' in the filename; the digits before it are the ID
auto pos = filename.find('.');
if (pos == std::string::npos) {
assert(0);
}
// Extract the numeric part
std::string id_str = filename.substr(0, pos);
// Verify that the extracted part is all digits
for (char c : id_str) {
if (!isdigit(c)) {
assert(0);
}
}
return std::stoull(id_str);  // convert to uint64_t
}
// Garbage collection implementation
void DBImpl::GarbageCollect() {
gc_mutex_.AssertHeld();
// Walk the database directory and collect all valuelog files
Log(options_.info_log, "start gc ");
auto files_set = fs::directory_iterator(dbname_);
std::set<std::string> valuelog_set;
std::string cur_valuelog_name=ValueLogFileName(dbname_,valuelogfile_number_);
for (const auto& cur_log_file : files_set) {
if (fs::exists(cur_log_file) &&
fs::is_regular_file(fs::status(cur_log_file)) &&
IsValueLogFile(cur_log_file.path().filename().string())) {
if(cur_valuelog_name==cur_log_file.path().filename().string())continue;
valuelog_set.emplace(cur_log_file.path().filename().string());
}
}
for (std::string valuelog_name:valuelog_set) {
// std::cout << valuelog_name << std::endl;
uint64_t cur_log_number = GetValueLogID(valuelog_name);
valuelog_name=ValueLogFileName(dbname_,cur_log_number);
uint64_t current_offset = 0;
uint64_t tmp_offset = 0;
int cnt=0;
// Open the file in binary mode for reading
std::ifstream cur_valuelog(valuelog_name, std::ios::in | std::ios::binary);
if (!cur_valuelog.is_open()) {
std::cerr << "Failed to open file: " << valuelog_name << " for reading cur_valuelog!"
<< std::endl;
continue;
}
while (true) {
tmp_offset=current_offset;
++cnt;
// std::cout << cnt <<" "<<current_offset<< std::endl;
// Read one key/value record
uint64_t key_len, value_len;
Slice key, value;
Status s = Status::OK();
// Seek to the position of key length
cur_valuelog.seekg(current_offset);
// Read the length of the key
char* key_buf_len = new char[sizeof(uint64_t)];
cur_valuelog.read(key_buf_len, sizeof(uint64_t));
if (cur_valuelog.eof()) {
delete[] key_buf_len;
break;  // normal exit: reached end of file
}
std::memcpy(&key_len, key_buf_len, sizeof(uint64_t));
if (!cur_valuelog.good()) {
delete[] key_buf_len;
cur_valuelog.close();
std::cerr << "Failed to read file: " << valuelog_name << std::endl;
break;
}
// Advance the current offset
current_offset += sizeof(uint64_t);
// Now seek to the actual key position and read the key
cur_valuelog.seekg(current_offset);
char* key_buf = new char[key_len];
cur_valuelog.read(key_buf, key_len);
if (!cur_valuelog.good()) {
delete[] key_buf;
delete[] key_buf_len;
cur_valuelog.close();
std::cerr << "Failed to read file: " << valuelog_name << std::endl;
break;
}
current_offset += key_len;
// Assign the read key data to the Slice
key = Slice(key_buf, key_len);
// Read the length of the value
cur_valuelog.seekg(current_offset);
char* value_buf_len = new char[sizeof(uint64_t)];
cur_valuelog.read(value_buf_len, sizeof(uint64_t));
uint64_t val_len = 0;
std::memcpy(&val_len, value_buf_len, sizeof(uint64_t));
if (!cur_valuelog.good()) {
delete[] key_buf;
delete[] key_buf_len;
delete[] value_buf_len;
cur_valuelog.close();
std::cerr << "Failed to read file: " << valuelog_name << std::endl;
break;
}
// Advance the current offset
current_offset += sizeof(uint64_t);
// Now seek to the actual data position and read the value
cur_valuelog.seekg(current_offset);
char* value_buf = new char[val_len];
cur_valuelog.read(value_buf, val_len);
if (!cur_valuelog.good()) {
delete[] key_buf;
delete[] key_buf_len;
delete[] value_buf_len;
delete[] value_buf;
cur_valuelog.close();
std::cerr << "Failed to read file: " << valuelog_name << std::endl;
break;
}
current_offset += val_len;
// Assign the read value data to the Slice
value = Slice(value_buf, val_len);
// std::cout<<val_len<<std::endl;
// Check whether the key still exists in the LSM
std::string stored_value;
// block any thread attempting to write "key"
spj_mutex_.Lock();
valuelog_finding_key=key;
spj_mutex_.Unlock();
// wait for the current writer queue to drain
mutex_.Lock();
if(writers_.size()>0){
auto last_writer=writers_.back();
while(!last_writer->done){
last_writer->cv.Wait();
}
}
mutex_.Unlock();
auto option=leveldb::ReadOptions();
option.find_value_log_for_gc = true;
Status status = Get(option, key, &stored_value);
if (status.IsNotFound()) {
// Key no longer exists; skip this record
continue;
}
if (!status.ok()) {
std::cerr << "Error accessing sstable: " << status.ToString()
<< std::endl;
continue;
}
// Check that the stored valuelog_id and offset still point at this record
uint64_t stored_valuelog_id, stored_offset;
ParseStoredValue(stored_value.substr(1), stored_valuelog_id,
stored_offset);  // see ParseStoredValue above
if (stored_valuelog_id != GetValueLogID(valuelog_name) ||
stored_offset != tmp_offset) {
// Record is stale; skip it
continue;
}
auto write_op=leveldb::WriteOptions();
write_op.valuelog_write=true;
status = Put(write_op, key, value);
spj_mutex_.Lock();
valuelog_finding_key="";
spj_mutex_.Unlock();
spj_mutex_cond_.SignalAll();
if (!status.ok()) {
std::cerr << "Error accessing sstable: " << status.ToString()
<< std::endl;
continue;
}
}
// Clean up the old file
cur_valuelog.close();
std::remove(valuelog_name.c_str());  // delete the old ValueLog file
Log(options_.info_log, "remove file during gc %s", valuelog_name.c_str());
}
}
// Default implementations of convenience methods that subclasses of DB
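
To summarize the valuelog record layout that WriteValueLog() writes and GarbageCollect() re-reads above: records sit back to back as [uint64 key_len][key][uint64 value_len][value], with the lengths stored as raw native-endian uint64_t, and a record's identity is the offset where it starts. A self-contained sketch of the scan loop (ScanValueLog is a hypothetical helper, not part of the commit):

#include <cstdint>
#include <fstream>
#include <string>
#include <vector>

struct ValueLogRecord {
  uint64_t offset;   // where the record starts (what the LSM pointer stores)
  std::string key;
  std::string value;
};

// Scan a valuelog file record by record; returns false on a short read.
bool ScanValueLog(const std::string& path, std::vector<ValueLogRecord>* out) {
  std::ifstream in(path, std::ios::in | std::ios::binary);
  if (!in.is_open()) return false;
  uint64_t offset = 0;
  while (true) {
    ValueLogRecord rec;
    rec.offset = offset;
    uint64_t klen = 0, vlen = 0;
    in.read(reinterpret_cast<char*>(&klen), sizeof(klen));
    if (in.eof()) break;                       // clean end of file
    if (!in.good()) return false;
    rec.key.resize(klen);
    in.read(&rec.key[0], klen);
    if (!in.good()) return false;
    in.read(reinterpret_cast<char*>(&vlen), sizeof(vlen));
    if (!in.good()) return false;
    rec.value.resize(vlen);
    in.read(&rec.value[0], vlen);
    if (!in.good()) return false;
    offset += sizeof(klen) + klen + sizeof(vlen) + vlen;
    out->push_back(std::move(rec));
  }
  return true;
}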

db/db_impl.h (+47 -16)

@@ -5,19 +5,20 @@
#ifndef STORAGE_LEVELDB_DB_DB_IMPL_H_
#define STORAGE_LEVELDB_DB_DB_IMPL_H_
#include "db/dbformat.h"
#include "db/log_writer.h"
#include "db/snapshot.h"
#include <atomic>
#include <deque>
#include <set>
#include <string>
#include <mutex>
#include <map>
#include <mutex>
#include <set>
#include <shared_mutex>
#include <string>
#include "db/dbformat.h"
#include "db/log_writer.h"
#include "db/snapshot.h"
#include "leveldb/db.h"
#include "leveldb/env.h"
#include "port/port.h"
#include "port/thread_annotations.h"
@@ -44,12 +45,12 @@ class DBImpl : public DB {
// //
// FieldArray ParseValue(const std::string& value_str)override;
// Status Put_with_fields(const WriteOptions& options, const Slice& key,const FieldArray& fields)override;
// Status Put_with_fields(const WriteOptions& options, const Slice& key,const
// FieldArray& fields)override;
// Status Get_with_fields(const ReadOptions& options, const Slice& key,
// FieldArray* fields)override;
// Implementations of the DB interface
Status Put(const WriteOptions&, const Slice& key,
const Slice& value) override;
@@ -63,11 +64,19 @@ class DBImpl : public DB {
bool GetProperty(const Slice& property, std::string* value) override;
void GetApproximateSizes(const Range* range, int n, uint64_t* sizes) override;
void CompactRange(const Slice* begin, const Slice* end) override;
std::vector<std::pair<uint64_t,std::pair<uint64_t,uint64_t>>> WriteValueLog(std::vector<Slice> value)override;
void writeValueLogForCompaction(WritableFile* target_file,std::vector<Slice> value);
void addNewValueLog()override EXCLUSIVE_LOCKS_REQUIRED(mutex_);;
std::pair<WritableFile*,uint64_t> getNewValuelog();//use for compaction
Status ReadValueLog(uint64_t file_id, uint64_t offset,uint64_t len,Slice* value)override;
// std::vector<std::pair<uint64_t,std::pair<uint64_t,uint64_t>>>
// WriteValueLog(std::vector<Slice> value)override;
std::vector<std::pair<uint64_t, uint64_t>> WriteValueLog(
std::vector<std::pair<Slice, Slice>> value) override;
void writeValueLogForCompaction(WritableFile* target_file,
std::vector<Slice> value);
void addNewValueLog() override EXCLUSIVE_LOCKS_REQUIRED(mutex_);
;
std::pair<WritableFile*, uint64_t> getNewValuelog(); // use for compaction
// Status ReadValueLog(uint64_t file_id, uint64_t offset,Slice*
// value)override;
Status ReadValueLog(uint64_t file_id, uint64_t offset, Slice* key,
Slice* value) override;
// Extra methods (for testing) that are not in the public DB interface
@@ -77,6 +86,9 @@ class DBImpl : public DB {
// Force current memtable contents to be compacted.
Status TEST_CompactMemTable();
void TEST_GarbageCollect() override;
// Return an internal iterator over the current state of the database.
// The keys of this iterator are internal keys (see format.h).
// The returned iterator should be deleted when no longer needed.
@@ -158,9 +170,15 @@
void RecordBackgroundError(const Status& s);
void MaybeScheduleCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_);
void MaybeScheduleGarbageCollect() EXCLUSIVE_LOCKS_REQUIRED(mutex_);
static void BGWork(void* db);
static void BGWorkGC(void* db);
void BackgroundCall();
void BackgroundCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_);
void BackgroundGarbageCollect() EXCLUSIVE_LOCKS_REQUIRED(mutex_);
void GarbageCollect() EXCLUSIVE_LOCKS_REQUIRED(mutex_);
void CleanupCompaction(CompactionState* compact)
EXCLUSIVE_LOCKS_REQUIRED(mutex_);
Status DoCompactionWork(CompactionState* compact)
@@ -192,19 +210,28 @@
// State below is protected by mutex_
port::Mutex mutex_;
//std::shared_mutex value_log_mutex;
port::Mutex gc_mutex_;
port::Mutex spj_mutex_;
port::CondVar spj_mutex_cond_ GUARDED_BY(spj_mutex_);
// std::shared_mutex value_log_mutex;
std::atomic<bool> shutting_down_;
port::CondVar background_work_finished_signal_ GUARDED_BY(mutex_);
port::CondVar background_gc_finished_signal_ GUARDED_BY(gc_mutex_);
Slice valuelog_finding_key="" GUARDED_BY(spj_mutex_ );
MemTable* mem_;
MemTable* imm_ GUARDED_BY(mutex_); // Memtable being compacted
std::atomic<bool> has_imm_; // So bg thread can detect non-null imm_
WritableFile* logfile_;
WritableFile* valuelogfile_;
int valuelogfile_offset=0;
int valuelogfile_offset = 0;
uint64_t logfile_number_;
uint64_t valuelogfile_number_;
log::Writer* log_;
std::map<uint64_t,uint64_t> oldvaluelog_ids;
std::map<uint64_t, uint64_t> oldvaluelog_ids;
uint32_t seed_ GUARDED_BY(mutex_); // For sampling.
// Queue of writers.
@@ -220,6 +247,10 @@
// Has a background compaction been scheduled or is running?
bool background_compaction_scheduled_ GUARDED_BY(mutex_);
// Has a background gc been scheduled or is running?
bool background_garbage_collect_scheduled_ GUARDED_BY(mutex_);
ManualCompaction* manual_compaction_ GUARDED_BY(mutex_);
VersionSet* const versions_ GUARDED_BY(mutex_);

db/db_iter.cc (+5 -3)

@@ -69,6 +69,7 @@ class DBIter : public Iterator {
Slice value() const override {
assert(valid_);
auto tmp_value= (direction_ == kForward) ? iter_->value() : saved_value_;
Slice key;
if(tmp_value.data()[0]==0x00){
tmp_value.remove_prefix(1);
return tmp_value;
@@ -79,9 +80,10 @@
if(!res)assert(0);
res=GetVarint64(&tmp_value,&valuelog_offset);
if(!res)assert(0);
res=GetVarint64(&tmp_value,&valuelog_len);
if(!res)assert(0);
db_->ReadValueLog(file_id,valuelog_offset,valuelog_len,&tmp_value);
// res=GetVarint64(&tmp_value,&valuelog_len);
// if(!res)assert(0);
// db_->ReadValueLog(file_id,valuelog_offset,valuelog_len,&tmp_value);
db_->ReadValueLog(file_id,valuelog_offset, &key, &tmp_value);
return tmp_value;
}
Status status() const override {
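
DBIter::value() now detects the 0x01 tag and dereferences the valuelog pointer internally, so iteration keeps the stock leveldb API. A usage sketch:

#include <cassert>
#include "leveldb/db.h"

// Values come back already materialized, whether stored inline or in a
// valuelog file.
void DumpAll(leveldb::DB* db) {
  leveldb::Iterator* it = db->NewIterator(leveldb::ReadOptions());
  for (it->SeekToFirst(); it->Valid(); it->Next()) {
    leveldb::Slice k = it->key();
    leveldb::Slice v = it->value();   // pointer dereferenced transparently
    (void)k;
    (void)v;
  }
  assert(it->status().ok());
  delete it;
}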

db/write_batch.cc (+6 -5)

@@ -145,12 +145,13 @@ class ValueLogInserter : public WriteBatch::Handler {
}
else{
buf+=(char)(0x01);
std::vector<Slice> v;
v.push_back(value);
auto res=db_->WriteValueLog(v);
std::vector<std::pair<Slice,Slice>> kv;
kv.push_back({key,value});
auto res=db_->WriteValueLog(kv);
PutVarint64(&buf,res[0].first);
PutVarint64(&buf,res[0].second.first);
PutVarint64(&buf,res[0].second.second);
// PutVarint64(&buf,res[0].second.first);
// PutVarint64(&buf,res[0].second.second);
PutVarint64(&buf,res[0].second);
}
new_value=Slice(buf);
writeBatch_.Put(key,new_value);
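
The encode side of the same format, distilled from ValueLogInserter above: the real value goes to the current valuelog via WriteValueLog(), and the batch stores a tag byte plus two varints in its place (EncodeValuePointer is a hypothetical helper; PutVarint64 is leveldb's util/coding.h routine):

#include <cstdint>
#include <string>
#include "util/coding.h"   // PutVarint64 (leveldb internal header)

// Build the pointer that replaces the real value in the batch.
std::string EncodeValuePointer(uint64_t file_id, uint64_t offset) {
  std::string buf;
  buf.push_back(static_cast<char>(0x01));   // tag: value lives in a valuelog
  leveldb::PutVarint64(&buf, file_id);
  leveldb::PutVarint64(&buf, offset);
  return buf;   // stored in the memtable/sstables instead of the value
}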

include/leveldb/db.h (+15 -3)

@@ -102,19 +102,29 @@ class LEVELDB_EXPORT DB {
// virtual Status Get_keys_by_field(const ReadOptions& options, const Field field,std::vector<std::string> *keys);
virtual std::vector<std::pair<uint64_t,std::pair<uint64_t,uint64_t>>> WriteValueLog(std::vector<Slice> value){
// virtual std::vector<std::pair<uint64_t,std::pair<uint64_t,uint64_t>>> WriteValueLog(std::vector<Slice> value){
// assert(0);
// std::vector<std::pair<uint64_t,std::pair<uint64_t,uint64_t>>> v;
// return v;
// }
virtual std::vector<std::pair<uint64_t,uint64_t>> WriteValueLog(std::vector<std::pair<Slice,Slice>> value){
assert(0);
std::vector<std::pair<uint64_t,std::pair<uint64_t,uint64_t>>> v;
std::vector<std::pair<uint64_t,uint64_t>> v;
return v;
}
virtual void addNewValueLog(){assert(0);}
virtual Status ReadValueLog(uint64_t file_id, uint64_t offset,uint64_t len,Slice* value){
// virtual Status ReadValueLog(uint64_t file_id, uint64_t offset,uint64_t len,Slice* value){
// assert(0); // Not implemented
// return Status::Corruption("not imp");
// }
virtual Status ReadValueLog(uint64_t file_id, uint64_t offset, Slice* key, Slice* value){
assert(0); // Not implemented
return Status::Corruption("not imp");
}
// Return a heap-allocated iterator over the contents of the database.
// The result of NewIterator() is initially invalid (caller must
// call one of the Seek methods on the iterator before using it).
@@ -173,6 +183,8 @@ class LEVELDB_EXPORT DB {
// Therefore the following call will compact the entire database:
// db->CompactRange(nullptr, nullptr);
virtual void CompactRange(const Slice* begin, const Slice* end) = 0;
virtual void TEST_GarbageCollect(){};
};
// Destroy the contents of the specified database.

include/leveldb/options.h (+3 -0)

@@ -157,6 +157,8 @@ struct LEVELDB_EXPORT ReadOptions {
// Callers may wish to set this field to false for bulk scans.
bool fill_cache = true;
bool find_value_log_for_gc = false;
// If "snapshot" is non-null, read as of the supplied snapshot
// (which must belong to the DB that is being read and which must
// not have been released). If "snapshot" is null, use an implicit
@@ -183,6 +185,7 @@ struct LEVELDB_EXPORT WriteOptions {
// with sync==true has similar crash semantics to a "write()"
// system call followed by "fsync()".
bool sync = false;
bool valuelog_write=false;
};
} // namespace leveldb
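
How the GC path combines the two new flags (a sketch following the logic in db_impl.cc above; GcProbeAndRewrite is a hypothetical helper): find_value_log_for_gc makes Get() return the raw tagged value without dereferencing it, and valuelog_write lets GC's rewrite bypass the same-key gate in Put():

#include <string>
#include "leveldb/db.h"
#include "leveldb/options.h"

leveldb::Status GcProbeAndRewrite(leveldb::DB* db, const leveldb::Slice& key,
                                  const leveldb::Slice& live_value) {
  leveldb::ReadOptions ropts;
  ropts.find_value_log_for_gc = true;   // return the raw tagged value;
                                        // do not dereference the pointer
  std::string stored;
  leveldb::Status s = db->Get(ropts, key, &stored);
  if (!s.ok()) return s;                // NotFound => record is dead, skip it
  leveldb::WriteOptions wopts;
  wopts.valuelog_write = true;          // bypass the same-key gate in Put()
  return db->Put(wopts, key, live_value);
}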

include/leveldb/slice.h (+1 -1)

@@ -69,7 +69,7 @@ class LEVELDB_EXPORT Slice {
// Drop the first "n" bytes from this slice.
void remove_prefix(size_t n) {
if(n>size()){
if (n > size()) {
assert(0);
}
assert(n <= size());

test/test.cpp (+103 -67)

@@ -86,79 +86,111 @@ Status Get_keys_by_field(DB *db,const ReadOptions& options, const Field field,st
return Status::OK();
}
TEST(Test, CheckGetFields) {
DB *db;
WriteOptions writeOptions;
ReadOptions readOptions;
if(OpenDB("testdb_for_XOY", &db).ok() == false) {
std::cerr << "open db failed" << std::endl;
abort();
}
std::string key1 = "k_1";
// TEST(Test, CheckGetFields) {
// DB *db;
// WriteOptions writeOptions;
// ReadOptions readOptions;
// if(OpenDB("testdb_for_XOY", &db).ok() == false) {
// std::cerr << "open db failed" << std::endl;
// abort();
// }
// std::string key1 = "k_1";
FieldArray fields1 = {
{"name", "Customer#000000001"},
{"address", "IVhzIApeRb"},
{"phone", "25-989-741-2988"}
};
// FieldArray fields1 = {
// {"name", "Customer#000000001"},
// {"address", "IVhzIApeRb"},
// {"phone", "25-989-741-2988"}
// };
auto value1=SerializeValue(fields1);
// auto value1=SerializeValue(fields1);
db->Put(WriteOptions(), key1, value1);
// db->Put(WriteOptions(), key1, value1);
// Read back and deserialize
std::string value_ret;
FieldArray res1;
// // Read back and deserialize
// std::string value_ret;
// FieldArray res1;
db->Get(ReadOptions(), key1, &value_ret);
DeserializeValue(value_ret, &res1);
for(auto pr:res1){
std::cout<<std::string(pr.first.data(),pr.first.size())<<" "<<std::string(pr.second.data(),pr.second.size())<<"\n";
}
ASSERT_TRUE(CompareFieldArray(fields1, res1));
// db->Get(ReadOptions(), key1, &value_ret);
// DeserializeValue(value_ret, &res1);
// for(auto pr:res1){
// std::cout<<std::string(pr.first.data(),pr.first.size())<<" "<<std::string(pr.second.data(),pr.second.size())<<"\n";
// }
// ASSERT_TRUE(CompareFieldArray(fields1, res1));
db->Delete(WriteOptions(),key1);
// db->Delete(WriteOptions(),key1);
std::cout<<"get serialized value done"<<std::endl;
delete db;
}
TEST(Test, CheckSearchKey) {
DB *db;
ReadOptions readOptions;
if(OpenDB("testdb_for_XOY", &db).ok() == false) {
std::cerr << "open db failed" << std::endl;
abort();
}
std::vector<std::string> keys;
std::vector<std::string> target_keys;
for(int i=0;i<10000;i++){
std::string key=std::to_string(rand()%10000)+"_"+std::to_string(i);//random for generate nonincreasing keys
FieldArray fields={
{"name", key},
{"address", std::to_string(rand()%7)},
{"phone", std::to_string(rand()%114514)}
};
if(rand()%5==0){
fields[0].second="special_key";
target_keys.push_back(key);
}
keys.push_back(key);
db->Put(WriteOptions(),key,SerializeValue(fields));
}
std::sort(target_keys.begin(),target_keys.end());
std::vector<std::string> key_res;
Get_keys_by_field(db,ReadOptions(),{"name", "special_key"},&key_res);
ASSERT_TRUE(CompareKey(key_res, target_keys));
std::cout<<"get key by field done"<<std::endl;
for(auto s:keys){
db->Delete(WriteOptions(),s);
}
delete db;
}
TEST(Test, LARGE_DATA_COMPACT_TEST) {
// std::cout<<"get serialized value done"<<std::endl;
// delete db;
// }
// TEST(Test, CheckSearchKey) {
// DB *db;
// ReadOptions readOptions;
// if(OpenDB("testdb_for_XOY", &db).ok() == false) {
// std::cerr << "open db failed" << std::endl;
// abort();
// }
// std::vector<std::string> keys;
// std::vector<std::string> target_keys;
// for(int i=0;i<10000;i++){
// std::string key=std::to_string(rand()%10000)+"_"+std::to_string(i);//random for generate nonincreasing keys
// FieldArray fields={
// {"name", key},
// {"address", std::to_string(rand()%7)},
// {"phone", std::to_string(rand()%114514)}
// };
// if(rand()%5==0){
// fields[0].second="special_key";
// target_keys.push_back(key);
// }
// keys.push_back(key);
// db->Put(WriteOptions(),key,SerializeValue(fields));
// }
// std::sort(target_keys.begin(),target_keys.end());
// std::vector<std::string> key_res;
// Get_keys_by_field(db,ReadOptions(),{"name", "special_key"},&key_res);
// ASSERT_TRUE(CompareKey(key_res, target_keys));
// std::cout<<"get key by field done"<<std::endl;
// for(auto s:keys){
// db->Delete(WriteOptions(),s);
// }
// delete db;
// }
// TEST(Test, LARGE_DATA_COMPACT_TEST) {
// DB *db;
// WriteOptions writeOptions;
// ReadOptions readOptions;
// if(OpenDB("testdb_for_XOY_large", &db).ok() == false) {
// std::cerr << "open db failed" << std::endl;
// abort();
// }
// std::vector<std::string> values;
// for(int i=0;i<500000;i++){
// std::string key=std::to_string(i);
// std::string value;
// for(int j=0;j<1000;j++){
// value+=std::to_string(i);
// }
// values.push_back(value);
// db->Put(writeOptions,key,value);
// }
// for(int i=0;i<500000;i++){
// std::string key=std::to_string(i);
// std::string value;
// Status s=db->Get(readOptions,key,&value);
// assert(s.ok());
// if(values[i]!=value){
// std::cout<<value.size()<<std::endl;
// assert(0);
// }
// ASSERT_TRUE(values[i]==value);
// }
// delete db;
// }
TEST(Test, Garbage_Collect_TEST) {
DB *db;
WriteOptions writeOptions;
ReadOptions readOptions;
@@ -176,7 +208,12 @@ TEST(Test, LARGE_DATA_COMPACT_TEST) {
values.push_back(value);
db->Put(writeOptions,key,value);
}
std::cout<<"start gc"<<std::endl;
db->TEST_GarbageCollect();
std::cout<<"finish gc"<<std::endl;
for(int i=0;i<500000;i++){
// std::cout<<i<<std::endl;
std::string key=std::to_string(i);
std::string value;
Status s=db->Get(readOptions,key,&value);
@@ -191,7 +228,6 @@ TEST(Test, LARGE_DATA_COMPACT_TEST) {
}
int main(int argc, char** argv) {
// All tests currently run with the same read-only file limits.
testing::InitGoogleTest(&argc, argv);
