|
@ -1249,6 +1249,7 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key, |
|
|
if (!res) return Status::Corruption("can't decode file id"); |
|
|
if (!res) return Status::Corruption("can't decode file id"); |
|
|
res = GetVarint64(&value_log_slice, &valuelog_offset); |
|
|
res = GetVarint64(&value_log_slice, &valuelog_offset); |
|
|
if (!res) return Status::Corruption("can't decode valuelog offset"); |
|
|
if (!res) return Status::Corruption("can't decode valuelog offset"); |
|
|
|
|
|
// std::cout<<"file_id: "<<file_id<<", valuelog_offset: "<<valuelog_offset<<std::endl;
|
|
|
s = ReadValueLog(file_id, valuelog_offset, &new_key, &new_value); |
|
|
s = ReadValueLog(file_id, valuelog_offset, &new_key, &new_value); |
|
|
if (!s.ok()) { |
|
|
if (!s.ok()) { |
|
|
return s; |
|
|
return s; |
|
@ -1588,24 +1589,33 @@ void DBImpl::GetApproximateSizes(const Range* range, int n, uint64_t* sizes) { |
|
|
std::vector<std::pair<uint64_t, uint64_t>> DBImpl::WriteValueLog( |
|
|
std::vector<std::pair<uint64_t, uint64_t>> DBImpl::WriteValueLog( |
|
|
std::vector<std::pair<Slice, Slice>> kv) { |
|
|
std::vector<std::pair<Slice, Slice>> kv) { |
|
|
std::string file_name_ = ValueLogFileName(dbname_, valuelogfile_number_); |
|
|
std::string file_name_ = ValueLogFileName(dbname_, valuelogfile_number_); |
|
|
std::ofstream valueFile(file_name_, std::ios::app | std::ios::binary); |
|
|
|
|
|
|
|
|
std::fstream valueFile(file_name_, std::ios::inan> pan class="o">|pan> std::ios::out | std::ios::binary); |
|
|
if (!valueFile.is_open()) { |
|
|
if (!valueFile.is_open()) { |
|
|
assert(0); |
|
|
assert(0); |
|
|
} |
|
|
} |
|
|
|
|
|
valueFile.seekg(0, std::ios::end); // 移动到文件末尾
|
|
|
|
|
|
uint64_t offset = valueFile.tellg(); |
|
|
|
|
|
|
|
|
uint64_t offset = valueFile.tellp(); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// 如果超出fixed_size
|
|
|
if(offset>=config::value_log_size){ |
|
|
if(offset>=config::value_log_size){ |
|
|
addNewValueLog(); |
|
|
addNewValueLog(); |
|
|
valueFile.close(); |
|
|
valueFile.close(); |
|
|
file_name_ = ValueLogFileName(dbname_, valuelogfile_number_); |
|
|
file_name_ = ValueLogFileName(dbname_, valuelogfile_number_); |
|
|
valueFile =std::ofstream(file_name_, std::ios::app | std::ios::binary); |
|
|
|
|
|
|
|
|
valueFile =std::fstream(file_name_, std::ios::inan> pan class="o">|pan> std::ios::out | std::ios::binary); |
|
|
if (!valueFile.is_open()) { |
|
|
if (!valueFile.is_open()) { |
|
|
assert(0); |
|
|
assert(0); |
|
|
} |
|
|
} |
|
|
offset = valueFile.tellp(); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
valueFile.seekg(0, std::ios::end); // 移动到文件末尾
|
|
|
|
|
|
offset = valueFile.tellg(); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
uint64_t file_data_size = 0; // 文件数据大小标志位
|
|
|
|
|
|
valueFile.seekg(0, std::ios::beg); |
|
|
|
|
|
valueFile.read(reinterpret_cast<char*>(&file_data_size), sizeof(uint64_t)); |
|
|
|
|
|
valueFile.clear(); // 清除错误状态
|
|
|
|
|
|
valueFile.seekp(0, std::ios::end); // 返回文件末尾准备写入
|
|
|
|
|
|
// std::cout<<"file_data_size: "<<file_data_size<<std::endl;
|
|
|
|
|
|
|
|
|
std::vector<std::pair<uint64_t, uint64_t>> res; |
|
|
std::vector<std::pair<uint64_t, uint64_t>> res; |
|
|
|
|
|
|
|
|
for (const auto& [key_slice, value_slice] : kv) { |
|
|
for (const auto& [key_slice, value_slice] : kv) { |
|
@ -1640,15 +1650,30 @@ std::vector> DBImpl::WriteValueLog( |
|
|
assert(0); |
|
|
assert(0); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// 更新文件数据大小
|
|
|
|
|
|
file_data_size += sizeof(uint64_t) + key_len + sizeof(uint64_t) + value_len; |
|
|
// 记录 file_id 和 offset
|
|
|
// 记录 file_id 和 offset
|
|
|
res.push_back({valuelogfile_number_, offset}); |
|
|
res.push_back({valuelogfile_number_, offset}); |
|
|
|
|
|
|
|
|
// 更新偏移量
|
|
|
// 更新偏移量
|
|
|
offset += sizeof(uint64_t) + key_len + sizeof(uint64_t) + value_len; |
|
|
offset += sizeof(uint64_t) + key_len + sizeof(uint64_t) + value_len; |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// 解锁资源或进行其他清理操作
|
|
|
|
|
|
|
|
|
// 在所有数据写入后,将更新的数据大小写回文件开头
|
|
|
|
|
|
if (!res.empty()) { |
|
|
|
|
|
valueFile.seekp(0, std::ios::beg); // 移动到文件开头
|
|
|
|
|
|
valueFile.write(reinterpret_cast<const char*>(&file_data_size), sizeof(uint64_t)); |
|
|
|
|
|
if (!valueFile.good()) { |
|
|
|
|
|
valueFile.close(); |
|
|
|
|
|
assert(0); |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
else{ |
|
|
|
|
|
valueFile.close(); |
|
|
|
|
|
assert(0); |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// 解锁资源或进行其他清理操作
|
|
|
|
|
|
valueFile.flush(); // 确保所有缓冲区的数据都被写入文件
|
|
|
valueFile.close(); |
|
|
valueFile.close(); |
|
|
return res; |
|
|
return res; |
|
|
} |
|
|
} |
|
@ -1656,6 +1681,28 @@ std::vector> DBImpl::WriteValueLog( |
|
|
|
|
|
|
|
|
void DBImpl::addNewValueLog() { |
|
|
void DBImpl::addNewValueLog() { |
|
|
valuelogfile_number_ = versions_->NewFileNumber(); |
|
|
valuelogfile_number_ = versions_->NewFileNumber(); |
|
|
|
|
|
|
|
|
|
|
|
std::string file_name_ = ValueLogFileName(dbname_, valuelogfile_number_); |
|
|
|
|
|
std::fstream valueFile(file_name_, std::ios::app | std::ios::binary); |
|
|
|
|
|
if (!valueFile.is_open()) { |
|
|
|
|
|
assert(0); |
|
|
|
|
|
} |
|
|
|
|
|
uint64_t file_data_size = 0; // 新增的文件数据大小标志位
|
|
|
|
|
|
if (valueFile.tellp() != 0) { |
|
|
|
|
|
assert(0); |
|
|
|
|
|
} |
|
|
|
|
|
else{ |
|
|
|
|
|
valueFile.write(reinterpret_cast<const char*>(&file_data_size), sizeof(uint64_t)); |
|
|
|
|
|
if (!valueFile.good()) { |
|
|
|
|
|
valueFile.close(); |
|
|
|
|
|
assert(0); |
|
|
|
|
|
} |
|
|
|
|
|
else{ |
|
|
|
|
|
// 正常关闭文件
|
|
|
|
|
|
valueFile.flush(); // 确保所有缓冲区的数据都被写入文件
|
|
|
|
|
|
valueFile.close(); |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
Status DBImpl::ReadValueLog(uint64_t file_id, uint64_t offset, Slice* key, |
|
|
Status DBImpl::ReadValueLog(uint64_t file_id, uint64_t offset, Slice* key, |
|
@ -1769,8 +1816,9 @@ void DBImpl::GarbageCollect() { |
|
|
continue; |
|
|
continue; |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
uint64_t current_offset = 0; |
|
|
|
|
|
uint64_t tmp_offset = 0; |
|
|
|
|
|
|
|
|
// 初始化offset为占用大小
|
|
|
|
|
|
uint64_t current_offset = sizeof(uint64_t); |
|
|
|
|
|
uint64_t tmp_offset = current_offset; |
|
|
|
|
|
|
|
|
int cnt = 0; |
|
|
int cnt = 0; |
|
|
|
|
|
|
|
@ -1810,11 +1858,12 @@ void DBImpl::GarbageCollect() { |
|
|
if (!cur_valuelog.good()) { |
|
|
if (!cur_valuelog.good()) { |
|
|
delete[] key_buf_len; |
|
|
delete[] key_buf_len; |
|
|
cur_valuelog.close(); |
|
|
cur_valuelog.close(); |
|
|
std::cerr << "Failed to read file: " << valuelog_name << std::endl; |
|
|
|
|
|
|
|
|
std::cerr << "1Failed to read file: " << valuelog_name << std::endl; |
|
|
break; |
|
|
break; |
|
|
} |
|
|
} |
|
|
// 更新当前偏移
|
|
|
// 更新当前偏移
|
|
|
current_offset += sizeof(uint64_t); |
|
|
current_offset += sizeof(uint64_t); |
|
|
|
|
|
// std::cout << cnt <<" "<<current_offset<< std::endl;
|
|
|
|
|
|
|
|
|
// Now seek to the actual key position and read the key
|
|
|
// Now seek to the actual key position and read the key
|
|
|
cur_valuelog.seekg(current_offset); |
|
|
cur_valuelog.seekg(current_offset); |
|
@ -1824,10 +1873,11 @@ void DBImpl::GarbageCollect() { |
|
|
delete[] key_buf; |
|
|
delete[] key_buf; |
|
|
delete[] key_buf_len; |
|
|
delete[] key_buf_len; |
|
|
cur_valuelog.close(); |
|
|
cur_valuelog.close(); |
|
|
std::cerr << "Failed to read file: " << valuelog_name << std::endl; |
|
|
|
|
|
|
|
|
std::cerr << "2Failed to read file: " << valuelog_name << std::endl; |
|
|
break; |
|
|
break; |
|
|
} |
|
|
} |
|
|
current_offset += key_len; |
|
|
current_offset += key_len; |
|
|
|
|
|
// std::cout << cnt <<" "<<current_offset<< std::endl;
|
|
|
|
|
|
|
|
|
// Assign the read key data to the Slice
|
|
|
// Assign the read key data to the Slice
|
|
|
key = Slice(key_buf, key_len); |
|
|
key = Slice(key_buf, key_len); |
|
@ -1844,26 +1894,15 @@ void DBImpl::GarbageCollect() { |
|
|
delete[] key_buf_len; |
|
|
delete[] key_buf_len; |
|
|
delete[] value_buf_len; |
|
|
delete[] value_buf_len; |
|
|
cur_valuelog.close(); |
|
|
cur_valuelog.close(); |
|
|
std::cerr << "Failed to read file: " << valuelog_name << std::endl; |
|
|
|
|
|
|
|
|
std::cerr << "3Failed to read file: " << valuelog_name << std::endl; |
|
|
break; |
|
|
break; |
|
|
} |
|
|
} |
|
|
// 更新当前偏移
|
|
|
// 更新当前偏移
|
|
|
current_offset += sizeof(uint64_t); |
|
|
current_offset += sizeof(uint64_t); |
|
|
|
|
|
// std::cout << cnt <<" "<<current_offset<< std::endl;
|
|
|
|
|
|
|
|
|
// // Now seek to the actual data position and read the value
|
|
|
|
|
|
// cur_valuelog.seekg(current_offset);
|
|
|
|
|
|
// char* value_buf = new char[val_len];
|
|
|
|
|
|
// cur_valuelog.read(value_buf, val_len);
|
|
|
|
|
|
// if (!cur_valuelog.good()) {
|
|
|
|
|
|
// delete[] key_buf;
|
|
|
|
|
|
// delete[] key_buf_len;
|
|
|
|
|
|
// delete[] value_buf_len;
|
|
|
|
|
|
// delete[] value_buf;
|
|
|
|
|
|
// cur_valuelog.close();
|
|
|
|
|
|
// std::cerr << "Failed to read file: " << valuelog_name << std::endl;
|
|
|
|
|
|
// break;
|
|
|
|
|
|
// }
|
|
|
|
|
|
current_offset += val_len; |
|
|
current_offset += val_len; |
|
|
|
|
|
// std::cout << cnt <<" "<<current_offset<< std::endl;
|
|
|
|
|
|
|
|
|
// // Assign the read value data to the Slice
|
|
|
// // Assign the read value data to the Slice
|
|
|
// value = Slice(value_buf, val_len);
|
|
|
// value = Slice(value_buf, val_len);
|
|
@ -1923,7 +1962,7 @@ void DBImpl::GarbageCollect() { |
|
|
delete[] value_buf_len; |
|
|
delete[] value_buf_len; |
|
|
delete[] value_buf; |
|
|
delete[] value_buf; |
|
|
cur_valuelog.close(); |
|
|
cur_valuelog.close(); |
|
|
std::cerr << "Failed to read file: " << valuelog_name << std::endl; |
|
|
|
|
|
|
|
|
std::cerr << "4Failed to read file: " << valuelog_name << std::endl; |
|
|
break; |
|
|
break; |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|