|
@ -154,7 +154,10 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname) |
|
|
background_garbage_collect_scheduled_(false), |
|
|
background_garbage_collect_scheduled_(false), |
|
|
manual_compaction_(nullptr), |
|
|
manual_compaction_(nullptr), |
|
|
versions_(new VersionSet(dbname_, &options_, table_cache_, |
|
|
versions_(new VersionSet(dbname_, &options_, table_cache_, |
|
|
&internal_comparator_)) {} |
|
|
|
|
|
|
|
|
&internal_comparator_)) { |
|
|
|
|
|
InitializeExistingLogs(); |
|
|
|
|
|
// std::cout<<"init map"<<std::endl;
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
DBImpl::~DBImpl() { |
|
|
DBImpl::~DBImpl() { |
|
|
// Wait for background work to finish.
|
|
|
// Wait for background work to finish.
|
|
@ -1036,6 +1039,26 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { |
|
|
if (last_sequence_for_key <= compact->smallest_snapshot) { |
|
|
if (last_sequence_for_key <= compact->smallest_snapshot) { |
|
|
// Hidden by an newer entry for same user key
|
|
|
// Hidden by an newer entry for same user key
|
|
|
drop = true; // (A)
|
|
|
drop = true; // (A)
|
|
|
|
|
|
// Parse the value based on its first character
|
|
|
|
|
|
if(ikey.type != kTypeDeletion){ |
|
|
|
|
|
Slice value = input->value(); |
|
|
|
|
|
char type = value[0]; |
|
|
|
|
|
if (type == 0x00) { |
|
|
|
|
|
// Value is less than 100 bytes, use it directly
|
|
|
|
|
|
} else { |
|
|
|
|
|
// Value is >= 100 bytes, read from external file
|
|
|
|
|
|
uint64_t file_id, valuelog_offset; |
|
|
|
|
|
std::string file_id_str = value.ToString().substr(1, 8); |
|
|
|
|
|
Slice file_id_slice(file_id_str); |
|
|
|
|
|
bool res = GetVarint64(&file_id_slice, &file_id); |
|
|
|
|
|
if (!res) return Status::Corruption("can't decode file id"); |
|
|
|
|
|
if(valuelog_origin[file_id] == 0){ |
|
|
|
|
|
valuelog_origin[file_id] = valuelog_usage[file_id]; |
|
|
|
|
|
} |
|
|
|
|
|
valuelog_usage[file_id]--; |
|
|
|
|
|
// std::cout << "file_id: " << file_id << " usage: " << valuelog_usage[file_id] << std::endl;
|
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
} else if (ikey.type == kTypeDeletion && |
|
|
} else if (ikey.type == kTypeDeletion && |
|
|
ikey.sequence <= compact->smallest_snapshot && |
|
|
ikey.sequence <= compact->smallest_snapshot && |
|
|
compact->compaction->IsBaseLevelForKey(ikey.user_key)) { |
|
|
compact->compaction->IsBaseLevelForKey(ikey.user_key)) { |
|
@ -1072,7 +1095,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { |
|
|
if (compact->builder->NumEntries() == 0) { |
|
|
if (compact->builder->NumEntries() == 0) { |
|
|
compact->current_output()->smallest.DecodeFrom(key); |
|
|
compact->current_output()->smallest.DecodeFrom(key); |
|
|
} |
|
|
} |
|
|
compact->current_output()->largest.DecodeFrom(key); |
|
|
|
|
|
|
|
|
compact->current_output()->largest.DecodeFrom(key); |
|
|
compact->builder->Add(key, input->value()); |
|
|
compact->builder->Add(key, input->value()); |
|
|
|
|
|
|
|
|
// Close output file if it is big enough
|
|
|
// Close output file if it is big enough
|
|
@ -1084,7 +1107,6 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
input->Next(); |
|
|
input->Next(); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
@ -1122,6 +1144,9 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { |
|
|
} |
|
|
} |
|
|
VersionSet::LevelSummaryStorage tmp; |
|
|
VersionSet::LevelSummaryStorage tmp; |
|
|
Log(options_.info_log, "compacted to: %s", versions_->LevelSummary(&tmp)); |
|
|
Log(options_.info_log, "compacted to: %s", versions_->LevelSummary(&tmp)); |
|
|
|
|
|
// for(int i=0;i<logfile_number_;i++){
|
|
|
|
|
|
// std::cout << "log file: " << i << " usage: " << valuelog_usage[i] << std::endl;
|
|
|
|
|
|
// }
|
|
|
return status; |
|
|
return status; |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
@ -1242,13 +1267,11 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key, |
|
|
} |
|
|
} |
|
|
Slice value_log_slice = Slice(value->c_str() + 1, value->length()); |
|
|
Slice value_log_slice = Slice(value->c_str() + 1, value->length()); |
|
|
Slice new_value; |
|
|
Slice new_value; |
|
|
int value_offset = sizeof(uint64_t) * 2; // 16
|
|
|
|
|
|
uint64_t file_id, valuelog_offset; |
|
|
uint64_t file_id, valuelog_offset; |
|
|
bool res = GetVarint64(&value_log_slice, &file_id); |
|
|
bool res = GetVarint64(&value_log_slice, &file_id); |
|
|
if (!res) return Status::Corruption("can't decode file id"); |
|
|
if (!res) return Status::Corruption("can't decode file id"); |
|
|
res = GetVarint64(&value_log_slice, &valuelog_offset); |
|
|
res = GetVarint64(&value_log_slice, &valuelog_offset); |
|
|
if (!res) return Status::Corruption("can't decode valuelog offset"); |
|
|
if (!res) return Status::Corruption("can't decode valuelog offset"); |
|
|
// std::cout<<"file_id: "<<file_id<<", valuelog_offset: "<<valuelog_offset<<std::endl;
|
|
|
|
|
|
s = ReadValueLog(file_id, valuelog_offset, &new_value); |
|
|
s = ReadValueLog(file_id, valuelog_offset, &new_value); |
|
|
if (!s.ok()) { |
|
|
if (!s.ok()) { |
|
|
return s; |
|
|
return s; |
|
@ -1597,6 +1620,10 @@ std::vector> DBImpl::WriteValueLog( |
|
|
|
|
|
|
|
|
// 如果超出fixed_size
|
|
|
// 如果超出fixed_size
|
|
|
if(offset>=config::value_log_size){ |
|
|
if(offset>=config::value_log_size){ |
|
|
|
|
|
int file_capacity=ReadFileSize(valuelogfile_number_); |
|
|
|
|
|
// std::cout<<"file_capacity: "<<file_capacity<<std::endl;
|
|
|
|
|
|
valuelog_usage[valuelogfile_number_]=file_capacity; |
|
|
|
|
|
valuelog_origin[valuelogfile_number_]=file_capacity; |
|
|
addNewValueLog(); |
|
|
addNewValueLog(); |
|
|
valueFile.close(); |
|
|
valueFile.close(); |
|
|
file_name_ = ValueLogFileName(dbname_, valuelogfile_number_); |
|
|
file_name_ = ValueLogFileName(dbname_, valuelogfile_number_); |
|
@ -1618,8 +1645,6 @@ std::vector> DBImpl::WriteValueLog( |
|
|
std::vector<std::pair<uint64_t, uint64_t>> res; |
|
|
std::vector<std::pair<uint64_t, uint64_t>> res; |
|
|
|
|
|
|
|
|
for (const auto& [key_slice, value_slice] : kv) { |
|
|
for (const auto& [key_slice, value_slice] : kv) { |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// 写入 value 的长度
|
|
|
// 写入 value 的长度
|
|
|
uint64_t value_len = value_slice.size(); |
|
|
uint64_t value_len = value_slice.size(); |
|
|
valueFile.write(reinterpret_cast<const char*>(&value_len), |
|
|
valueFile.write(reinterpret_cast<const char*>(&value_len), |
|
@ -1652,7 +1677,7 @@ std::vector> DBImpl::WriteValueLog( |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// 更新文件数据大小
|
|
|
// 更新文件数据大小
|
|
|
file_data_size += sizeof(uint64_t) + key_len + sizeof(uint64_t) + value_len; |
|
|
|
|
|
|
|
|
file_data_size ++; |
|
|
// 记录 file_id 和 offset
|
|
|
// 记录 file_id 和 offset
|
|
|
res.push_back({valuelogfile_number_, offset}); |
|
|
res.push_back({valuelogfile_number_, offset}); |
|
|
// 更新偏移量
|
|
|
// 更新偏移量
|
|
@ -1758,7 +1783,6 @@ Status DBImpl::ReadValueLog(uint64_t file_id, uint64_t offset, |
|
|
// 垃圾回收实现
|
|
|
// 垃圾回收实现
|
|
|
void DBImpl::GarbageCollect() { |
|
|
void DBImpl::GarbageCollect() { |
|
|
// 遍历数据库目录,找到所有 valuelog 文件
|
|
|
// 遍历数据库目录,找到所有 valuelog 文件
|
|
|
|
|
|
|
|
|
std::vector<std::string> filenames; |
|
|
std::vector<std::string> filenames; |
|
|
Status s = env_->GetChildren(dbname_, &filenames); |
|
|
Status s = env_->GetChildren(dbname_, &filenames); |
|
|
Log(options_.info_log, "start gc "); |
|
|
Log(options_.info_log, "start gc "); |
|
@ -1767,11 +1791,24 @@ void DBImpl::GarbageCollect() { |
|
|
for (const auto& filename:filenames) { |
|
|
for (const auto& filename:filenames) { |
|
|
if (IsValueLogFile(filename)){ |
|
|
if (IsValueLogFile(filename)){ |
|
|
uint64_t cur_log_number = GetValueLogID(filename); |
|
|
uint64_t cur_log_number = GetValueLogID(filename); |
|
|
|
|
|
if (cur_log_number == valuelogfile_number_) { |
|
|
|
|
|
continue; |
|
|
|
|
|
} |
|
|
auto tmp_name = ValueLogFileName(dbname_, cur_log_number); |
|
|
auto tmp_name = ValueLogFileName(dbname_, cur_log_number); |
|
|
|
|
|
|
|
|
if(!versions_->checkOldValueLog(tmp_name))valuelog_set.emplace(filename); |
|
|
|
|
|
|
|
|
// std::cout <<cur_log_number<<" "<<valuelog_usage[cur_log_number] << " " << valuelog_origin[cur_log_number] << std::endl;
|
|
|
|
|
|
|
|
|
|
|
|
if (!versions_->checkOldValueLog(tmp_name) && |
|
|
|
|
|
valuelog_origin[cur_log_number]) { |
|
|
|
|
|
if ((float)(valuelog_usage[cur_log_number] / |
|
|
|
|
|
valuelog_origin[cur_log_number]) <= 0.6) { |
|
|
|
|
|
valuelog_set.emplace(filename); |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
// std::cout << "valuelog_set size: " << valuelog_set.size() << std::endl;
|
|
|
|
|
|
Log(options_.info_log, "valuelog_set size: %d", valuelog_set.size()); |
|
|
|
|
|
|
|
|
//bool tmp_judge=false;//only clean one file
|
|
|
//bool tmp_judge=false;//only clean one file
|
|
|
for (std::string valuelog_name : valuelog_set) { |
|
|
for (std::string valuelog_name : valuelog_set) { |
|
|
Log(options_.info_log, ("gc processing: "+valuelog_name).data()); |
|
|
Log(options_.info_log, ("gc processing: "+valuelog_name).data()); |
|
@ -2064,4 +2101,43 @@ Status DestroyDB(const std::string& dbname, const Options& options) { |
|
|
return result; |
|
|
return result; |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// 读取所有现有日志文件的 file_data_size
|
|
|
|
|
|
void DBImpl::InitializeExistingLogs() { |
|
|
|
|
|
std::vector<std::string> filenames; |
|
|
|
|
|
Status s = env_->GetChildren(dbname_, &filenames); |
|
|
|
|
|
Log(options_.info_log, "start set file map "); |
|
|
|
|
|
assert(s.ok()); |
|
|
|
|
|
std::set<std::string> valuelog_set; |
|
|
|
|
|
for (const auto& filename : filenames) { |
|
|
|
|
|
if (IsValueLogFile(filename)) { |
|
|
|
|
|
uint64_t cur_log_number = GetValueLogID(filename); |
|
|
|
|
|
uint64_t file_data_size = ReadFileSize(cur_log_number); |
|
|
|
|
|
valuelog_usage.emplace(cur_log_number,file_data_size); |
|
|
|
|
|
// std::cout << "cur_log_number: " << cur_log_number << " file_data_size: " << file_data_size << std::endl;
|
|
|
|
|
|
|
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// 读取单个文件的 file_data_size
|
|
|
|
|
|
uint64_t DBImpl::ReadFileSize(uint64_t log_number) { |
|
|
|
|
|
auto file_name = ValueLogFileName(dbname_, log_number); |
|
|
|
|
|
std::ifstream valueFile(file_name, std::ios::in | std::ios::binary); |
|
|
|
|
|
if (!valueFile.is_open()) { |
|
|
|
|
|
std::cerr << "Failed to open file: " << file_name << std::endl; |
|
|
|
|
|
return 0; |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
uint64_t file_data_size = 0; |
|
|
|
|
|
valueFile.read(reinterpret_cast<char*>(&file_data_size), sizeof(uint64_t)); |
|
|
|
|
|
if (valueFile.fail() || valueFile.bad()) { |
|
|
|
|
|
std::cerr << "Failed to read data size from file: " << file_name |
|
|
|
|
|
<< std::endl; |
|
|
|
|
|
valueFile.close(); |
|
|
|
|
|
return 0; |
|
|
|
|
|
} |
|
|
|
|
|
valueFile.close(); |
|
|
|
|
|
return file_data_size; |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
} // namespace leveldb
|
|
|
} // namespace leveldb
|