Browse Source

merge bufferpool & meta info for valuelog

pull/3/head
dgy 9 months ago
parent
commit
bccbab115a
5 changed files with 203 additions and 126 deletions
  1. +184
    -114
      db/db_impl.cc
  2. +10
    -2
      db/db_impl.h
  3. +2
    -3
      db/prefetch_iter.cc
  4. +1
    -1
      include/leveldb/db.h
  5. +6
    -6
      test/test.cpp

+ 184
- 114
db/db_impl.cc View File

@ -155,7 +155,10 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
background_garbage_collect_scheduled_(false), background_garbage_collect_scheduled_(false),
manual_compaction_(nullptr), manual_compaction_(nullptr),
versions_(new VersionSet(dbname_, &options_, table_cache_, versions_(new VersionSet(dbname_, &options_, table_cache_,
&internal_comparator_)) {}
&internal_comparator_)) {
InitializeExistingLogs();
// std::cout<<"init map"<<std::endl;
}
DBImpl::~DBImpl() { DBImpl::~DBImpl() {
// Wait for background work to finish. // Wait for background work to finish.
@ -1037,6 +1040,26 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
if (last_sequence_for_key <= compact->smallest_snapshot) { if (last_sequence_for_key <= compact->smallest_snapshot) {
// Hidden by an newer entry for same user key // Hidden by an newer entry for same user key
drop = true; // (A) drop = true; // (A)
// Parse the value based on its first character
if(ikey.type != kTypeDeletion){
Slice value = input->value();
char type = value[0];
if (type == 0x00) {
// Value is less than 100 bytes, use it directly
} else {
// Value is >= 100 bytes, read from external file
uint64_t file_id, valuelog_offset;
std::string file_id_str = value.ToString().substr(1, 8);
Slice file_id_slice(file_id_str);
bool res = GetVarint64(&file_id_slice, &file_id);
if (!res) return Status::Corruption("can't decode file id");
if(valuelog_origin[file_id] == 0){
valuelog_origin[file_id] = valuelog_usage[file_id];
}
valuelog_usage[file_id]--;
// std::cout << "file_id: " << file_id << " usage: " << valuelog_usage[file_id] << std::endl;
}
}
} else if (ikey.type == kTypeDeletion && } else if (ikey.type == kTypeDeletion &&
ikey.sequence <= compact->smallest_snapshot && ikey.sequence <= compact->smallest_snapshot &&
compact->compaction->IsBaseLevelForKey(ikey.user_key)) { compact->compaction->IsBaseLevelForKey(ikey.user_key)) {
@ -1073,7 +1096,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
if (compact->builder->NumEntries() == 0) { if (compact->builder->NumEntries() == 0) {
compact->current_output()->smallest.DecodeFrom(key); compact->current_output()->smallest.DecodeFrom(key);
} }
compact->current_output()->largest.DecodeFrom(key);
compact->current_output()->largest.DecodeFrom(key);
compact->builder->Add(key, input->value()); compact->builder->Add(key, input->value());
// Close output file if it is big enough // Close output file if it is big enough
@ -1085,7 +1108,6 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
} }
} }
} }
input->Next(); input->Next();
} }
@ -1123,6 +1145,9 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
} }
VersionSet::LevelSummaryStorage tmp; VersionSet::LevelSummaryStorage tmp;
Log(options_.info_log, "compacted to: %s", versions_->LevelSummary(&tmp)); Log(options_.info_log, "compacted to: %s", versions_->LevelSummary(&tmp));
// for(int i=0;i<logfile_number_;i++){
// std::cout << "log file: " << i << " usage: " << valuelog_usage[i] << std::endl;
// }
return status; return status;
} }
@ -1242,8 +1267,6 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key,
return s; return s;
} }
Slice value_log_slice = Slice(value->c_str() + 1, value->length()); Slice value_log_slice = Slice(value->c_str() + 1, value->length());
Slice new_key;
int value_offset = sizeof(uint64_t) * 2; // 16
uint64_t file_id, valuelog_offset; uint64_t file_id, valuelog_offset;
bool res = GetVarint64(&value_log_slice, &file_id); bool res = GetVarint64(&value_log_slice, &file_id);
if (!res) return Status::Corruption("can't decode file id"); if (!res) return Status::Corruption("can't decode file id");
@ -1252,7 +1275,7 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key,
{ {
mutex_.Unlock(); mutex_.Unlock();
s = ReadValueLog(file_id, valuelog_offset, &new_key, value);
s = ReadValueLog(file_id, valuelog_offset, value);
mutex_.Lock(); mutex_.Lock();
} }
@ -1603,67 +1626,95 @@ void DBImpl::GetApproximateSizes(const Range* range, int n, uint64_t* sizes) {
std::vector<std::pair<uint64_t, uint64_t>> DBImpl::WriteValueLog( std::vector<std::pair<uint64_t, uint64_t>> DBImpl::WriteValueLog(
std::vector<std::pair<Slice, Slice>> kv) { std::vector<std::pair<Slice, Slice>> kv) {
std::string file_name_ = ValueLogFileName(dbname_, valuelogfile_number_); std::string file_name_ = ValueLogFileName(dbname_, valuelogfile_number_);
std::ofstream valueFile(file_name_, std::ios::app | std::ios::binary);
std::fstream valueFile(file_name_, std::ios::inan> pan class="o">|pan> std::ios::out | std::ios::binary);
if (!valueFile.is_open()) { if (!valueFile.is_open()) {
assert(0); assert(0);
} }
valueFile.seekg(0, std::ios::end); // 移动到文件末尾
uint64_t offset = valueFile.tellg();
uint64_t offset = valueFile.tellp();
// 如果超出fixed_size
if(offset>=config::value_log_size){ if(offset>=config::value_log_size){
int file_capacity=ReadFileSize(valuelogfile_number_);
// std::cout<<"file_capacity: "<<file_capacity<<std::endl;
valuelog_usage[valuelogfile_number_]=file_capacity;
valuelog_origin[valuelogfile_number_]=file_capacity;
addNewValueLog(); addNewValueLog();
valueFile.close(); valueFile.close();
file_name_ = ValueLogFileName(dbname_, valuelogfile_number_); file_name_ = ValueLogFileName(dbname_, valuelogfile_number_);
valueFile =std::ofstream(file_name_, std::ios::app | std::ios::binary);
valueFile =std::fstream(file_name_, std::ios::inan> pan class="o">|pan> std::ios::out | std::ios::binary);
if (!valueFile.is_open()) { if (!valueFile.is_open()) {
assert(0); assert(0);
} }
offset = valueFile.tellp();
valueFile.seekg(0, std::ios::end); // 移动到文件末尾
offset = valueFile.tellg();
} }
uint64_t file_data_size = 0; // 文件数据大小标志位
valueFile.seekg(0, std::ios::beg);
valueFile.read(reinterpret_cast<char*>(&file_data_size), sizeof(uint64_t));
valueFile.clear(); // 清除错误状态
valueFile.seekp(0, std::ios::end); // 返回文件末尾准备写入
// std::cout<<"file_data_size: "<<file_data_size<<std::endl;
std::vector<std::pair<uint64_t, uint64_t>> res; std::vector<std::pair<uint64_t, uint64_t>> res;
for (const auto& [key_slice, value_slice] : kv) { for (const auto& [key_slice, value_slice] : kv) {
// 写入 key 的长度
uint64_t key_len = key_slice.size();
valueFile.write(reinterpret_cast<const char*>(&key_len), sizeof(uint64_t));
// 写入 value 的长度
uint64_t value_len = value_slice.size();
valueFile.write(reinterpret_cast<const char*>(&value_len),
sizeof(uint64_t));
if (!valueFile.good()) { if (!valueFile.good()) {
valueFile.close(); valueFile.close();
assert(0); assert(0);
} }
// 写入 key 本身
valueFile.write(key_slice.data(), key_len);
// 写入 value 本身
valueFile.write(value_slice.data(), value_len);
if (!valueFile.good()) { if (!valueFile.good()) {
valueFile.close(); valueFile.close();
assert(0); assert(0);
} }
// 写入 value 的长度
uint64_t value_len = value_slice.size();
valueFile.write(reinterpret_cast<const char*>(&value_len),
sizeof(uint64_t));
// 写入 key 的长度
uint64_t key_len = key_slice.size();
valueFile.write(reinterpret_cast<const char*>(&key_len), sizeof(uint64_t));
if (!valueFile.good()) { if (!valueFile.good()) {
valueFile.close(); valueFile.close();
assert(0); assert(0);
} }
// 写入 value 本身
valueFile.write(value_slice.data(), value_len);
// 写入 key 本身
valueFile.write(key_slice.data(), key_len);
if (!valueFile.good()) { if (!valueFile.good()) {
valueFile.close(); valueFile.close();
assert(0); assert(0);
} }
// 更新文件数据大小
file_data_size ++;
// 记录 file_id 和 offset // 记录 file_id 和 offset
res.push_back({valuelogfile_number_, offset}); res.push_back({valuelogfile_number_, offset});
// 更新偏移量 // 更新偏移量
offset += sizeof(uint64_t) + key_len + sizeof(uint64_t) + value_len; offset += sizeof(uint64_t) + key_len + sizeof(uint64_t) + value_len;
} }
// 解锁资源或进行其他清理操作
// 在所有数据写入后,将更新的数据大小写回文件开头
if (!res.empty()) {
valueFile.seekp(0, std::ios::beg); // 移动到文件开头
valueFile.write(reinterpret_cast<const char*>(&file_data_size), sizeof(uint64_t));
if (!valueFile.good()) {
valueFile.close();
assert(0);
}
}
else{
valueFile.close();
assert(0);
}
// 解锁资源或进行其他清理操作
valueFile.flush(); // 确保所有缓冲区的数据都被写入文件
valueFile.close(); valueFile.close();
return res; return res;
} }
@ -1671,23 +1722,39 @@ std::vector> DBImpl::WriteValueLog(
void DBImpl::addNewValueLog() { void DBImpl::addNewValueLog() {
valuelogfile_number_ = versions_->NewFileNumber(); valuelogfile_number_ = versions_->NewFileNumber();
std::string file_name_ = ValueLogFileName(dbname_, valuelogfile_number_);
std::fstream valueFile(file_name_, std::ios::app | std::ios::binary);
if (!valueFile.is_open()) {
assert(0);
}
uint64_t file_data_size = 0; // 新增的文件数据大小标志位
if (valueFile.tellp() != 0) {
assert(0);
}
else{
valueFile.write(reinterpret_cast<const char*>(&file_data_size), sizeof(uint64_t));
if (!valueFile.good()) {
valueFile.close();
assert(0);
}
else{
// 正常关闭文件
valueFile.flush(); // 确保所有缓冲区的数据都被写入文件
valueFile.close();
}
}
} }
Status DBImpl::ReadValueLog(uint64_t file_id, uint64_t offset, Slice* key,
Status DBImpl::ReadValueLog(uint64_t file_id, uint64_t offset,
std::string* value) { std::string* value) {
mutex_.Lock(); mutex_.Lock();
if(file_id==valuelogfile_number_){ if(file_id==valuelogfile_number_){
mutex_.Unlock(); mutex_.Unlock();
std::string file_name_ = ValueLogFileName(dbname_, file_id); std::string file_name_ = ValueLogFileName(dbname_, file_id);
std::ifstream inFile(file_name_, std::ios::in | std::ios::binary); std::ifstream inFile(file_name_, std::ios::in | std::ios::binary);
uint64_t key_len,value_len;
uint64_t value_len;
inFile.seekg(offset); inFile.seekg(offset);
inFile.read((char*)(&key_len),sizeof(uint64_t));
char* key_buf=new char[key_len];
inFile.read(key_buf,key_len);
*key=Slice(key_buf,key_len);
inFile.read((char*)(&value_len),sizeof(uint64_t)); inFile.read((char*)(&value_len),sizeof(uint64_t));
char buf[value_len]; char buf[value_len];
@ -1723,20 +1790,10 @@ Status DBImpl::ReadValueLog(uint64_t file_id, uint64_t offset, Slice* key,
Slice res; Slice res;
s=valuelog_file->Read(offset,sizeof(uint64_t),&res,buf); s=valuelog_file->Read(offset,sizeof(uint64_t),&res,buf);
assert(s.ok()); assert(s.ok());
uint64_t key_len=*(uint64_t*)(res.data());
char*key_buf=new char[key_len];
s=valuelog_file->Read(offset+sizeof(uint64_t),key_len,&res,key_buf);
assert(s.ok());
*key=Slice(key_buf,key_len);
s=valuelog_file->Read(offset+sizeof(uint64_t)+key_len,sizeof(uint64_t),&res,buf);
assert(s.ok());
uint64_t value_len=*(uint64_t*)(res.data()); uint64_t value_len=*(uint64_t*)(res.data());
char value_buf[value_len]; char value_buf[value_len];
s=valuelog_file->Read(offset+sizeof(uint64_t)+key_len+sizeof(uint64_t),value_len,&res,value_buf);
s=valuelog_file->Read(offset+sizeof(uint64_t),value_len,&res,value_buf);
assert(s.ok()); assert(s.ok());
*value=std::string(res.data(),res.size()); *value=std::string(res.data(),res.size());
@ -1753,7 +1810,6 @@ Status DBImpl::ReadValueLog(uint64_t file_id, uint64_t offset, Slice* key,
// 垃圾回收实现 // 垃圾回收实现
void DBImpl::GarbageCollect() { void DBImpl::GarbageCollect() {
// 遍历数据库目录,找到所有 valuelog 文件 // 遍历数据库目录,找到所有 valuelog 文件
std::vector<std::string> filenames; std::vector<std::string> filenames;
Status s = env_->GetChildren(dbname_, &filenames); Status s = env_->GetChildren(dbname_, &filenames);
Log(options_.info_log, "start gc "); Log(options_.info_log, "start gc ");
@ -1762,11 +1818,25 @@ void DBImpl::GarbageCollect() {
for (const auto& filename:filenames) { for (const auto& filename:filenames) {
if (IsValueLogFile(filename)){ if (IsValueLogFile(filename)){
uint64_t cur_log_number = GetValueLogID(filename); uint64_t cur_log_number = GetValueLogID(filename);
if (cur_log_number == valuelogfile_number_) {
continue;
}
auto tmp_name = ValueLogFileName(dbname_, cur_log_number); auto tmp_name = ValueLogFileName(dbname_, cur_log_number);
if(!versions_->checkOldValueLog(tmp_name))valuelog_set.emplace(filename);
// std::cout <<cur_log_number<<" "<<valuelog_usage[cur_log_number] << " " << valuelog_origin[cur_log_number] << std::endl;
if (!versions_->checkOldValueLog(tmp_name) &&
valuelog_origin[cur_log_number]) {
if ((float)(valuelog_usage[cur_log_number] /
valuelog_origin[cur_log_number]) <= 0.6) {
valuelog_set.emplace(filename);
}
}
} }
} }
// std::cout << "valuelog_set size: " << valuelog_set.size() << std::endl;
Log(options_.info_log, "valuelog_set size: %d", valuelog_set.size());
//bool tmp_judge=false;//only clean one file
for (std::string valuelog_name : valuelog_set) { for (std::string valuelog_name : valuelog_set) {
Log(options_.info_log, ("gc processing: "+valuelog_name).data()); Log(options_.info_log, ("gc processing: "+valuelog_name).data());
uint64_t cur_log_number = GetValueLogID(valuelog_name); uint64_t cur_log_number = GetValueLogID(valuelog_name);
@ -1775,8 +1845,9 @@ void DBImpl::GarbageCollect() {
continue; continue;
} }
uint64_t current_offset = 0;
uint64_t tmp_offset = 0;
// 初始化offset为占用大小
uint64_t current_offset = sizeof(uint64_t);
uint64_t tmp_offset = current_offset;
int cnt = 0; int cnt = 0;
@ -1794,86 +1865,48 @@ void DBImpl::GarbageCollect() {
// std::cout << cnt <<" "<<current_offset<< std::endl; // std::cout << cnt <<" "<<current_offset<< std::endl;
// 读取一个 kv 对 // 读取一个 kv 对
uint64_t key_len, value_len;
uint64_t key_len, val_len;
Slice key, value; Slice key, value;
Status s = Status::OK(); Status s = Status::OK();
// Seek to the position of key length
// Read the length of the value
cur_valuelog.seekg(current_offset); cur_valuelog.seekg(current_offset);
// Read the length of the key
char* key_buf_len = new char[sizeof(uint64_t)];
cur_valuelog.read(key_buf_len, sizeof(uint64_t));
cur_valuelog.read((char*)(&val_len), sizeof(uint64_t));
if (cur_valuelog.eof()) { if (cur_valuelog.eof()) {
delete[] key_buf_len;
break; // 正常退出条件:到达文件末尾 break; // 正常退出条件:到达文件末尾
} }
std::memcpy(&key_len, key_buf_len, sizeof(uint64_t));
if (!cur_valuelog.good()) { if (!cur_valuelog.good()) {
delete[] key_buf_len;
cur_valuelog.close();
std::cerr << "Failed to read file: " << valuelog_name << std::endl;
break;
assert(0);
} }
// 更新当前偏移
current_offset += sizeof(uint64_t);
current_offset += val_len;
cur_valuelog.seekg(current_offset);
cur_valuelog.read((char*)(&key_len), sizeof(uint64_t));
current_offset += sizeof(uint64_t); current_offset += sizeof(uint64_t);
// Now seek to the actual key position and read the key
cur_valuelog.seekg(current_offset); cur_valuelog.seekg(current_offset);
char* key_buf = new char[key_len]; char* key_buf = new char[key_len];
cur_valuelog.read(key_buf, key_len); cur_valuelog.read(key_buf, key_len);
if (!cur_valuelog.good()) { if (!cur_valuelog.good()) {
delete[] key_buf; delete[] key_buf;
delete[] key_buf_len;
cur_valuelog.close(); cur_valuelog.close();
std::cerr << "Failed to read file: " << valuelog_name << std::endl;
std::cerr << "2Failed to read file: " << valuelog_name << std::endl;
break; break;
} }
current_offset += key_len; current_offset += key_len;
// std::cout << cnt <<" "<<current_offset<< std::endl;
// Assign the read key data to the Slice // Assign the read key data to the Slice
key = Slice(key_buf, key_len); key = Slice(key_buf, key_len);
// Read the length of the value
cur_valuelog.seekg(current_offset);
char* value_buf_len = new char[sizeof(uint64_t)];
cur_valuelog.read(value_buf_len, sizeof(uint64_t));
uint64_t val_len = 0;
std::memcpy(&val_len, value_buf_len, sizeof(uint64_t));
if (!cur_valuelog.good()) {
delete[] key_buf;
delete[] key_buf_len;
delete[] value_buf_len;
cur_valuelog.close();
std::cerr << "Failed to read file: " << valuelog_name << std::endl;
break;
}
// 更新当前偏移
current_offset += sizeof(uint64_t);
// // Now seek to the actual data position and read the value
// cur_valuelog.seekg(current_offset);
// char* value_buf = new char[val_len];
// cur_valuelog.read(value_buf, val_len);
// if (!cur_valuelog.good()) {
// delete[] key_buf;
// delete[] key_buf_len;
// delete[] value_buf_len;
// delete[] value_buf;
// cur_valuelog.close();
// std::cerr << "Failed to read file: " << valuelog_name << std::endl;
// break;
// }
current_offset += val_len;
// // Assign the read value data to the Slice
// value = Slice(value_buf, val_len);
// 检查 key 是否在 sstable 中存在 // 检查 key 是否在 sstable 中存在
std::string stored_value; std::string stored_value;
@ -1898,21 +1931,18 @@ void DBImpl::GarbageCollect() {
// Key 不存在,忽略此记录 // Key 不存在,忽略此记录
continue; continue;
} }
else if (!status.ok()) {
assert(0);
}
else if(stored_value.data()[0]==(char)(0x00)){ else if(stored_value.data()[0]==(char)(0x00)){
//value is too small //value is too small
continue; continue;
} }
if (!status.ok()) {
std::cerr << "Error accessing sstable: " << status.ToString()
<< std::endl;
continue;
}
// 检查 valuelog_id 和 offset 是否匹配 // 检查 valuelog_id 和 offset 是否匹配
uint64_t stored_valuelog_id, stored_offset; uint64_t stored_valuelog_id, stored_offset;
ParseStoredValue(stored_value.substr(1), stored_valuelog_id, ParseStoredValue(stored_value.substr(1), stored_valuelog_id,
stored_offset); // 假设解析函数
stored_offset);
if (stored_valuelog_id != GetValueLogID(valuelog_name) || if (stored_valuelog_id != GetValueLogID(valuelog_name) ||
stored_offset != tmp_offset) { stored_offset != tmp_offset) {
// 记录无效,跳过 // 记录无效,跳过
@ -1920,16 +1950,14 @@ void DBImpl::GarbageCollect() {
} }
// Now seek to the actual data position and read the value // Now seek to the actual data position and read the value
cur_valuelog.seekg(current_offset-val_len);
cur_valuelog.seekg(tmp_offset+sizeof(uint64_t));
char* value_buf = new char[val_len]; char* value_buf = new char[val_len];
cur_valuelog.read(value_buf, val_len); cur_valuelog.read(value_buf, val_len);
if (!cur_valuelog.good()) { if (!cur_valuelog.good()) {
delete[] key_buf; delete[] key_buf;
delete[] key_buf_len;
delete[] value_buf_len;
delete[] value_buf; delete[] value_buf;
cur_valuelog.close(); cur_valuelog.close();
std::cerr << "Failed to read file: " << valuelog_name << std::endl;
std::cerr << "4Failed to read file: " << valuelog_name << std::endl;
break; break;
} }
@ -1950,6 +1978,9 @@ void DBImpl::GarbageCollect() {
<< std::endl; << std::endl;
continue; continue;
} }
delete value_buf;
delete key_buf;
} }
// 清理旧文件(如果需要) // 清理旧文件(如果需要)
@ -2063,4 +2094,43 @@ Status DestroyDB(const std::string& dbname, const Options& options) {
return result; return result;
} }
// 读取所有现有日志文件的 file_data_size
void DBImpl::InitializeExistingLogs() {
std::vector<std::string> filenames;
Status s = env_->GetChildren(dbname_, &filenames);
Log(options_.info_log, "start set file map ");
assert(s.ok());
std::set<std::string> valuelog_set;
for (const auto& filename : filenames) {
if (IsValueLogFile(filename)) {
uint64_t cur_log_number = GetValueLogID(filename);
uint64_t file_data_size = ReadFileSize(cur_log_number);
valuelog_usage.emplace(cur_log_number,file_data_size);
// std::cout << "cur_log_number: " << cur_log_number << " file_data_size: " << file_data_size << std::endl;
}
}
}
// 读取单个文件的 file_data_size
uint64_t DBImpl::ReadFileSize(uint64_t log_number) {
auto file_name = ValueLogFileName(dbname_, log_number);
std::ifstream valueFile(file_name, std::ios::in | std::ios::binary);
if (!valueFile.is_open()) {
std::cerr << "Failed to open file: " << file_name << std::endl;
return 0;
}
uint64_t file_data_size = 0;
valueFile.read(reinterpret_cast<char*>(&file_data_size), sizeof(uint64_t));
if (valueFile.fail() || valueFile.bad()) {
std::cerr << "Failed to read data size from file: " << file_name
<< std::endl;
valueFile.close();
return 0;
}
valueFile.close();
return file_data_size;
}
} // namespace leveldb } // namespace leveldb

+ 10
- 2
db/db_impl.h View File

@ -70,9 +70,9 @@ class DBImpl : public DB {
std::vector<std::pair<uint64_t, uint64_t>> WriteValueLog( std::vector<std::pair<uint64_t, uint64_t>> WriteValueLog(
std::vector<std::pair<Slice, Slice>> value) override; std::vector<std::pair<Slice, Slice>> value) override;
void addNewValueLog() override EXCLUSIVE_LOCKS_REQUIRED(mutex_); void addNewValueLog() override EXCLUSIVE_LOCKS_REQUIRED(mutex_);
;
std::pair<WritableFile*, uint64_t> getNewValuelog(); // use for compaction std::pair<WritableFile*, uint64_t> getNewValuelog(); // use for compaction
Status ReadValueLog(uint64_t file_id, uint64_t offset, Slice* key,
Status ReadValueLog(uint64_t file_id, uint64_t offset,
std::string* value) override; std::string* value) override;
// Extra methods (for testing) that are not in the public DB interface // Extra methods (for testing) that are not in the public DB interface
@ -100,6 +100,11 @@ class DBImpl : public DB {
// bytes. // bytes.
void RecordReadSample(Slice key); void RecordReadSample(Slice key);
void InitializeExistingLogs();
uint64_t ReadFileSize(uint64_t log_number);
private: private:
friend class DB; friend class DB;
struct CompactionState; struct CompactionState;
@ -237,6 +242,9 @@ class DBImpl : public DB {
std::shared_mutex mem_valuelog_mutex; std::shared_mutex mem_valuelog_mutex;
std::unordered_map<uint64_t,mem_valuelog> mem_valuelogs; GUARDED_BY(mem_valuelog_mutex); std::unordered_map<uint64_t,mem_valuelog> mem_valuelogs; GUARDED_BY(mem_valuelog_mutex);
std::map<uint64_t, uint64_t> valuelog_usage;
std::map<uint64_t, uint64_t> valuelog_origin;
uint32_t seed_ GUARDED_BY(mutex_); // For sampling. uint32_t seed_ GUARDED_BY(mutex_); // For sampling.
// Queue of writers. // Queue of writers.

+ 2
- 3
db/prefetch_iter.cc View File

@ -81,7 +81,6 @@ class DBPreFetchIter : public Iterator {
private: private:
std::string GetAndParseTrueValue(Slice tmp_value)const{ std::string GetAndParseTrueValue(Slice tmp_value)const{
Slice key;
if(tmp_value.size()==0){ if(tmp_value.size()==0){
return ""; return "";
} }
@ -96,8 +95,8 @@ class DBPreFetchIter : public Iterator {
res=GetVarint64(&tmp_value,&valuelog_offset); res=GetVarint64(&tmp_value,&valuelog_offset);
if(!res)assert(0); if(!res)assert(0);
std::string str; std::string str;
Status s=db_->ReadValueLog(file_id,valuelog_offset, &key, &str);
return str;
Status s=db_->ReadValueLog(file_id,valuelog_offset, &str);
return std::move(str);
} }

+ 1
- 1
include/leveldb/db.h View File

@ -119,7 +119,7 @@ class LEVELDB_EXPORT DB {
// assert(0); // Not implemented // assert(0); // Not implemented
// return Status::Corruption("not imp"); // return Status::Corruption("not imp");
// } // }
virtual Status ReadValueLog(uint64_t file_id, uint64_t offset, Slice* key, std::string* value){
virtual Status ReadValueLog(uint64_t file_id, uint64_t offset, std::string* value){
assert(0); // Not implemented assert(0); // Not implemented
return Status::Corruption("not imp"); return Status::Corruption("not imp");
} }

+ 6
- 6
test/test.cpp View File

@ -71,7 +71,7 @@ TEST(Test, CheckGetFields) {
DB *db; DB *db;
WriteOptions writeOptions; WriteOptions writeOptions;
ReadOptions readOptions; ReadOptions readOptions;
if(OpenDB("testdb_for_XOY", &db).ok() == false) {
if(OpenDB("testdb_for_XOY_large", &db).ok() == false) {
std::cerr << "open db failed" << std::endl; std::cerr << "open db failed" << std::endl;
abort(); abort();
} }
@ -107,7 +107,7 @@ TEST(Test, CheckGetFields) {
TEST(Test, CheckSearchKey) { TEST(Test, CheckSearchKey) {
DB *db; DB *db;
ReadOptions readOptions; ReadOptions readOptions;
if(OpenDB("testdb_for_XOY", &db).ok() == false) {
if(OpenDB("testdb_for_XOY_large", &db).ok() == false) {
std::cerr << "open db failed" << std::endl; std::cerr << "open db failed" << std::endl;
abort(); abort();
} }
@ -147,7 +147,7 @@ TEST(Test, LARGE_DATA_COMPACT_TEST) {
abort(); abort();
} }
std::vector<std::string> values; std::vector<std::string> values;
for(int i=0;i<5000;i++){
for(int i=0;i<50000;i++){
std::string key=std::to_string(i); std::string key=std::to_string(i);
std::string value; std::string value;
for(int j=0;j<5000;j++){ for(int j=0;j<5000;j++){
@ -156,7 +156,7 @@ TEST(Test, LARGE_DATA_COMPACT_TEST) {
values.push_back(value); values.push_back(value);
db->Put(writeOptions,key,value); db->Put(writeOptions,key,value);
} }
for(int i=0;i<5000;i++){
for(int i=0;i<50000;i++){
std::string key=std::to_string(i); std::string key=std::to_string(i);
std::string value; std::string value;
Status s=db->Get(readOptions,key,&value); Status s=db->Get(readOptions,key,&value);
@ -179,7 +179,7 @@ TEST(Test, Garbage_Collect_TEST) {
abort(); abort();
} }
std::vector<std::string> values; std::vector<std::string> values;
for(int i=0;i<5000;i++){
for(int i=0;i<50000;i++){
std::string key=std::to_string(i); std::string key=std::to_string(i);
std::string value; std::string value;
for(int j=0;j<1000;j++){ for(int j=0;j<1000;j++){
@ -192,7 +192,7 @@ TEST(Test, Garbage_Collect_TEST) {
db->TEST_GarbageCollect(); db->TEST_GarbageCollect();
std::cout<<"finish gc"<<std::endl; std::cout<<"finish gc"<<std::endl;
for(int i=0;i<5000;i++){
for(int i=0;i<50000;i++){
// std::cout<<i<<std::endl; // std::cout<<i<<std::endl;
std::string key=std::to_string(i); std::string key=std::to_string(i);
std::string value; std::string value;

Loading…
Cancel
Save