|
|
@ -157,8 +157,6 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname) |
|
|
|
valuelog_cache(NewLRUCache(config::mem_value_log_number)), |
|
|
|
versions_(new VersionSet(dbname_, &options_, table_cache_, |
|
|
|
&internal_comparator_)) { |
|
|
|
InitializeExistingLogs(); |
|
|
|
// std::cout<<"init map"<<std::endl;
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
@ -1052,15 +1050,13 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { |
|
|
|
} else { |
|
|
|
// Value is >= 100 bytes, read from external file
|
|
|
|
uint64_t file_id, valuelog_offset; |
|
|
|
std::string file_id_str = value.ToString().substr(1, 8); |
|
|
|
Slice file_id_slice(file_id_str); |
|
|
|
bool res = GetVarint64(&file_id_slice, &file_id); |
|
|
|
value.remove_prefix(1); |
|
|
|
bool res = GetVarint64(&value, &file_id); |
|
|
|
if (!res) return Status::Corruption("can't decode file id"); |
|
|
|
if(valuelog_origin[file_id] == 0){ |
|
|
|
if(valuelog_origin[file_id] == 0){//???
|
|
|
|
valuelog_origin[file_id] = valuelog_usage[file_id]; |
|
|
|
} |
|
|
|
valuelog_usage[file_id]--; |
|
|
|
// std::cout << "file_id: " << file_id << " usage: " << valuelog_usage[file_id] << std::endl;
|
|
|
|
} |
|
|
|
} |
|
|
|
} else if (ikey.type == kTypeDeletion && |
|
|
@ -1148,9 +1144,6 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { |
|
|
|
} |
|
|
|
VersionSet::LevelSummaryStorage tmp; |
|
|
|
Log(options_.info_log, "compacted to: %s", versions_->LevelSummary(&tmp)); |
|
|
|
// for(int i=0;i<logfile_number_;i++){
|
|
|
|
// std::cout << "log file: " << i << " usage: " << valuelog_usage[i] << std::endl;
|
|
|
|
// }
|
|
|
|
return status; |
|
|
|
} |
|
|
|
|
|
|
@ -1181,7 +1174,6 @@ static void CleanupIteratorState(void* arg1, void* arg2) { |
|
|
|
Iterator* DBImpl::NewInternalIterator(const ReadOptions& options, |
|
|
|
SequenceNumber* latest_snapshot, |
|
|
|
uint32_t* seed) { |
|
|
|
mutex_.Lock(); |
|
|
|
*latest_snapshot = versions_->LastSequence(); |
|
|
|
|
|
|
|
// Collect together all needed child iterators
|
|
|
@ -1201,14 +1193,13 @@ Iterator* DBImpl::NewInternalIterator(const ReadOptions& options, |
|
|
|
internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, nullptr); |
|
|
|
|
|
|
|
*seed = ++seed_; |
|
|
|
mutex_.Unlock(); |
|
|
|
return internal_iter; |
|
|
|
} |
|
|
|
|
|
|
|
Iterator* DBImpl::TEST_NewInternalIterator() { |
|
|
|
SequenceNumber ignored; |
|
|
|
uint32_t ignored_seed; |
|
|
|
return NewInternalIterator(ReadOptions(), &ignored, &ignored_seed); |
|
|
|
return NewIterator(ReadOptions()); |
|
|
|
} |
|
|
|
|
|
|
|
int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() { |
|
|
@ -1289,6 +1280,9 @@ Iterator* DBImpl::NewIterator(const ReadOptions& options) { |
|
|
|
SequenceNumber latest_snapshot; |
|
|
|
uint32_t seed; |
|
|
|
int iter_num=24; |
|
|
|
|
|
|
|
mutex_.Lock(); |
|
|
|
|
|
|
|
Iterator* iter_prefetch = NewInternalIterator(options, &latest_snapshot, &seed); |
|
|
|
auto db_iter_prefetch=NewDBIterator(this, user_comparator(), iter_prefetch, |
|
|
|
(options.snapshot != nullptr |
|
|
@ -1307,7 +1301,8 @@ Iterator* DBImpl::NewIterator(const ReadOptions& options) { |
|
|
|
: latest_snapshot), |
|
|
|
seed); |
|
|
|
|
|
|
|
|
|
|
|
mutex_.Unlock(); |
|
|
|
|
|
|
|
return NewPreFetchIterator(this,db_iter,db_iter_prefetch,iter_num); |
|
|
|
} |
|
|
|
|
|
|
@ -1631,87 +1626,75 @@ std::vector> DBImpl::WriteValueLog( |
|
|
|
assert(0); |
|
|
|
} |
|
|
|
valueFile.seekg(0, std::ios::end); // 移动到文件末尾
|
|
|
|
uint64_t offset = valueFile.tellg(); |
|
|
|
uint64_t init_offset = valueFile.tellg(); |
|
|
|
|
|
|
|
// 如果超出fixed_size
|
|
|
|
if(offset>=config::value_log_size){ |
|
|
|
int file_capacity=ReadFileSize(valuelogfile_number_); |
|
|
|
// std::cout<<"file_capacity: "<<file_capacity<<std::endl;
|
|
|
|
valuelog_usage[valuelogfile_number_]=file_capacity; |
|
|
|
valuelog_origin[valuelogfile_number_]=file_capacity; |
|
|
|
if(init_offset>=config::value_log_size){ |
|
|
|
valuelog_usage[valuelogfile_number_]=init_offset; |
|
|
|
valuelog_origin[valuelogfile_number_]=init_offset; |
|
|
|
addNewValueLog(); |
|
|
|
valueFile.close(); |
|
|
|
file_name_ = ValueLogFileName(dbname_, valuelogfile_number_); |
|
|
|
valueFile =std::fstream(file_name_, std::ios::in | std::ios::out | std::ios::binary); |
|
|
|
if (!valueFile.is_open()) { |
|
|
|
assert(0); |
|
|
|
} |
|
|
|
valueFile.seekg(0, std::ios::end); // 移动到文件末尾
|
|
|
|
offset = valueFile.tellg(); |
|
|
|
init_offset = valueFile.tellg(); |
|
|
|
} |
|
|
|
|
|
|
|
uint64_t file_data_size = 0; // 文件数据大小标志位
|
|
|
|
valueFile.seekg(0, std::ios::beg); |
|
|
|
valueFile.read(reinterpret_cast<char*>(&file_data_size), sizeof(uint64_t)); |
|
|
|
valueFile.clear(); // 清除错误状态
|
|
|
|
|
|
|
|
//update length first
|
|
|
|
file_data_size+=kv.size(); |
|
|
|
valueFile.seekp(0, std::ios::beg); // 移动到文件开头
|
|
|
|
valueFile.write(reinterpret_cast<const char*>(&file_data_size), sizeof(uint64_t)); |
|
|
|
assert(valueFile.good()); |
|
|
|
|
|
|
|
valueFile.seekp(0, std::ios::end); // 返回文件末尾准备写入
|
|
|
|
// std::cout<<"file_data_size: "<<file_data_size<<std::endl;
|
|
|
|
|
|
|
|
std::vector<std::pair<uint64_t, uint64_t>> res; |
|
|
|
|
|
|
|
for (const auto& [key_slice, value_slice] : kv) { |
|
|
|
// 写入 value 的长度
|
|
|
|
uint64_t value_len = value_slice.size(); |
|
|
|
valueFile.write(reinterpret_cast<const char*>(&value_len), |
|
|
|
sizeof(uint64_t)); |
|
|
|
if (!valueFile.good()) { |
|
|
|
valueFile.close(); |
|
|
|
assert(0); |
|
|
|
} |
|
|
|
int total_size=0; |
|
|
|
total_size+=sizeof(uint64_t)*2*kv.size(); |
|
|
|
for(const auto &pr:kv){ |
|
|
|
total_size+=pr.first.size()+pr.second.size(); |
|
|
|
} |
|
|
|
|
|
|
|
char buf[total_size]; |
|
|
|
|
|
|
|
uint64_t offset=0; |
|
|
|
for (const auto& pr:kv) { |
|
|
|
|
|
|
|
// 记录 file_id 和 offset
|
|
|
|
res.push_back({valuelogfile_number_, init_offset+offset}); |
|
|
|
|
|
|
|
auto key=pr.first,value=pr.second; |
|
|
|
|
|
|
|
// 写入 value 的长度
|
|
|
|
uint64_t value_len = value.size(); |
|
|
|
memcpy(buf+offset,&value_len,sizeof(uint64_t)); |
|
|
|
offset+=sizeof(uint64_t); |
|
|
|
// 写入 value 本身
|
|
|
|
valueFile.write(value_slice.data(), value_len); |
|
|
|
if (!valueFile.good()) { |
|
|
|
valueFile.close(); |
|
|
|
assert(0); |
|
|
|
} |
|
|
|
memcpy(buf+offset,value.data(),value_len); |
|
|
|
offset+=value_len; |
|
|
|
|
|
|
|
// 写入 key 的长度
|
|
|
|
uint64_t key_len = key_slice.size(); |
|
|
|
valueFile.write(reinterpret_cast<const char*>(&key_len), sizeof(uint64_t)); |
|
|
|
if (!valueFile.good()) { |
|
|
|
valueFile.close(); |
|
|
|
assert(0); |
|
|
|
} |
|
|
|
uint64_t key_len = key.size(); |
|
|
|
memcpy(buf+offset,&key_len,sizeof(uint64_t)); |
|
|
|
offset+=sizeof(uint64_t); |
|
|
|
|
|
|
|
// 写入 key 本身
|
|
|
|
valueFile.write(key_slice.data(), key_len); |
|
|
|
if (!valueFile.good()) { |
|
|
|
valueFile.close(); |
|
|
|
assert(0); |
|
|
|
} |
|
|
|
memcpy(buf+offset,key.data(),key_len); |
|
|
|
offset+=key_len; |
|
|
|
|
|
|
|
// 更新文件数据大小
|
|
|
|
file_data_size ++; |
|
|
|
// 记录 file_id 和 offset
|
|
|
|
res.push_back({valuelogfile_number_, offset}); |
|
|
|
|
|
|
|
// 更新偏移量
|
|
|
|
offset += sizeof(uint64_t) + key_len + sizeof(uint64_t) + value_len; |
|
|
|
} |
|
|
|
|
|
|
|
// 在所有数据写入后,将更新的数据大小写回文件开头
|
|
|
|
if (!res.empty()) { |
|
|
|
valueFile.seekp(0, std::ios::beg); // 移动到文件开头
|
|
|
|
valueFile.write(reinterpret_cast<const char*>(&file_data_size), sizeof(uint64_t)); |
|
|
|
if (!valueFile.good()) { |
|
|
|
valueFile.close(); |
|
|
|
assert(0); |
|
|
|
} |
|
|
|
} |
|
|
|
else{ |
|
|
|
valueFile.close(); |
|
|
|
assert(0); |
|
|
|
} |
|
|
|
valueFile.write(buf,total_size); |
|
|
|
assert(valueFile.good()); |
|
|
|
|
|
|
|
// 解锁资源或进行其他清理操作
|
|
|
|
valueFile.flush(); // 确保所有缓冲区的数据都被写入文件
|
|
|
@ -1816,12 +1799,10 @@ void DBImpl::GarbageCollect() { |
|
|
|
continue; |
|
|
|
} |
|
|
|
auto tmp_name = ValueLogFileName(dbname_, cur_log_number); |
|
|
|
// std::cout <<cur_log_number<<" "<<valuelog_usage[cur_log_number] << " " << valuelog_origin[cur_log_number] << std::endl;
|
|
|
|
|
|
|
|
if (!versions_->checkOldValueLog(tmp_name) && |
|
|
|
valuelog_origin[cur_log_number]) { |
|
|
|
if ((float)(valuelog_usage[cur_log_number] / |
|
|
|
valuelog_origin[cur_log_number]) <= 0.6) { |
|
|
|
if (((float)valuelog_usage[cur_log_number]) / |
|
|
|
valuelog_origin[cur_log_number] <= 0.6) { |
|
|
|
valuelog_set.emplace(filename); |
|
|
|
} |
|
|
|
} |
|
|
@ -1889,12 +1870,7 @@ void DBImpl::GarbageCollect() { |
|
|
|
cur_valuelog.seekg(current_offset); |
|
|
|
char* key_buf = new char[key_len]; |
|
|
|
cur_valuelog.read(key_buf, key_len); |
|
|
|
if (!cur_valuelog.good()) { |
|
|
|
delete[] key_buf; |
|
|
|
cur_valuelog.close(); |
|
|
|
std::cerr << "2Failed to read file: " << valuelog_name << std::endl; |
|
|
|
break; |
|
|
|
} |
|
|
|
assert(cur_valuelog.good()); |
|
|
|
current_offset += key_len; |
|
|
|
// std::cout << cnt <<" "<<current_offset<< std::endl;
|
|
|
|
|
|
|
@ -1925,12 +1901,12 @@ void DBImpl::GarbageCollect() { |
|
|
|
// Key 不存在,忽略此记录
|
|
|
|
continue; |
|
|
|
} |
|
|
|
else if (!status.ok()) { |
|
|
|
assert(0); |
|
|
|
} |
|
|
|
else if(stored_value.data()[0]==(char)(0x00)){ |
|
|
|
//value is too small
|
|
|
|
continue; |
|
|
|
else{ |
|
|
|
assert(status.ok()); |
|
|
|
if(stored_value.data()[0]==(char)(0x00)){ |
|
|
|
//value is too small
|
|
|
|
continue; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// 检查 valuelog_id 和 offset 是否匹配
|
|
|
@ -1947,13 +1923,7 @@ void DBImpl::GarbageCollect() { |
|
|
|
cur_valuelog.seekg(tmp_offset+sizeof(uint64_t)); |
|
|
|
char* value_buf = new char[val_len]; |
|
|
|
cur_valuelog.read(value_buf, val_len); |
|
|
|
if (!cur_valuelog.good()) { |
|
|
|
delete[] key_buf; |
|
|
|
delete[] value_buf; |
|
|
|
cur_valuelog.close(); |
|
|
|
std::cerr << "4Failed to read file: " << valuelog_name << std::endl; |
|
|
|
break; |
|
|
|
} |
|
|
|
assert(cur_valuelog.good()); |
|
|
|
|
|
|
|
// Assign the read value data to the Slice
|
|
|
|
value = Slice(value_buf, val_len); |
|
|
@ -1968,11 +1938,7 @@ void DBImpl::GarbageCollect() { |
|
|
|
mutex_.Unlock(); |
|
|
|
|
|
|
|
|
|
|
|
if (!status.ok()) { |
|
|
|
std::cerr << "Error accessing sstable: " << status.ToString() |
|
|
|
<< std::endl; |
|
|
|
continue; |
|
|
|
} |
|
|
|
assert(status.ok()); |
|
|
|
|
|
|
|
delete value_buf; |
|
|
|
delete key_buf; |
|
|
@ -2028,7 +1994,6 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) { |
|
|
|
impl->log_ = new log::Writer(lfile); |
|
|
|
impl->mem_ = new MemTable(impl->internal_comparator_); |
|
|
|
impl->mem_->Ref(); |
|
|
|
impl->addNewValueLog(); |
|
|
|
} |
|
|
|
} |
|
|
|
if (s.ok() && save_manifest) { |
|
|
@ -2039,6 +2004,8 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) { |
|
|
|
if (s.ok()) { |
|
|
|
impl->RemoveObsoleteFiles(); |
|
|
|
impl->MaybeScheduleCompaction(); |
|
|
|
impl->InitializeExistingLogs(); |
|
|
|
impl->addNewValueLog(); |
|
|
|
} |
|
|
|
impl->mutex_.Unlock(); |
|
|
|
if (s.ok()) { |
|
|
@ -2100,11 +2067,29 @@ void DBImpl::InitializeExistingLogs() { |
|
|
|
if (IsValueLogFile(filename)) { |
|
|
|
uint64_t cur_log_number = GetValueLogID(filename); |
|
|
|
uint64_t file_data_size = ReadFileSize(cur_log_number); |
|
|
|
valuelog_usage.emplace(cur_log_number,file_data_size); |
|
|
|
// std::cout << "cur_log_number: " << cur_log_number << " file_data_size: " << file_data_size << std::endl;
|
|
|
|
valuelog_origin[cur_log_number]=file_data_size; |
|
|
|
valuelog_usage[cur_log_number]=0; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
SequenceNumber latest_snapshot; |
|
|
|
uint32_t seed; |
|
|
|
int iter_num=24; |
|
|
|
Iterator* iter= NewInternalIterator(ReadOptions(), &latest_snapshot, &seed); |
|
|
|
auto db_iter=NewDBIterator(this, user_comparator(), iter,latest_snapshot,seed); |
|
|
|
for(db_iter->SeekToFirst();db_iter->Valid();db_iter->Next()){ |
|
|
|
auto value=db_iter->value(); |
|
|
|
if(value.size()&&value[0]==0x01){ |
|
|
|
value.remove_prefix(1); |
|
|
|
uint64_t valuelog_id=*(uint64_t*)value.data(); |
|
|
|
assert(valuelog_usage.count(valuelog_id)); |
|
|
|
valuelog_usage[valuelog_id]++; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
mutex_.Unlock(); |
|
|
|
delete db_iter; |
|
|
|
mutex_.Lock(); |
|
|
|
} |
|
|
|
|
|
|
|
// 读取单个文件的 file_data_size
|
|
|
|