Browse Source

gc pass make

xxy
xxy 9 months ago
parent
commit
f7fa26c9df
8 changed files with 436 additions and 169 deletions
  1. +4
    -3
      CMakeLists.txt
  2. +323
    -74
      db/db_impl.cc
  3. +35
    -19
      db/db_impl.h
  4. +2
    -1
      db/db_iter.cc
  5. +3
    -3
      db/write_batch.cc
  6. +2
    -2
      include/leveldb/db.h
  7. +1
    -1
      include/leveldb/slice.h
  8. +66
    -66
      test/test.cpp

+ 4
- 3
CMakeLists.txt View File

@ -2,13 +2,14 @@
# Use of this source code is governed by a BSD-style license that can be # Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file. See the AUTHORS file for names of contributors. # found in the LICENSE file. See the AUTHORS file for names of contributors.
cmake_minimum_required(VERSION 3.9)
cmake_minimum_required(VERSION 3.10)
# Keep the version below in sync with the one in db.h # Keep the version below in sync with the one in db.h
project(leveldb VERSION 1.23.0 LANGUAGES C CXX) project(leveldb VERSION 1.23.0 LANGUAGES C CXX)
# C standard can be overridden when this is used as a sub-project. # C standard can be overridden when this is used as a sub-project.
if(NOT CMAKE_C_STANDARD) if(NOT CMAKE_C_STANDARD)
# This project can use C11, but will gracefully decay down to C89. # This project can use C11, but will gracefully decay down to C89.
# 17
set(CMAKE_C_STANDARD 11) set(CMAKE_C_STANDARD 11)
set(CMAKE_C_STANDARD_REQUIRED OFF) set(CMAKE_C_STANDARD_REQUIRED OFF)
set(CMAKE_C_EXTENSIONS OFF) set(CMAKE_C_EXTENSIONS OFF)
@ -16,8 +17,8 @@ endif(NOT CMAKE_C_STANDARD)
# C++ standard can be overridden when this is used as a sub-project. # C++ standard can be overridden when this is used as a sub-project.
if(NOT CMAKE_CXX_STANDARD) if(NOT CMAKE_CXX_STANDARD)
# This project requires C++11.
set(CMAKE_CXX_STANDARD 11)
# This project requires C++17.
set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON) set(CMAKE_CXX_STANDARD_REQUIRED ON)
set(CMAKE_CXX_EXTENSIONS OFF) set(CMAKE_CXX_EXTENSIONS OFF)
endif(NOT CMAKE_CXX_STANDARD) endif(NOT CMAKE_CXX_STANDARD)

+ 323
- 74
db/db_impl.cc View File

@ -4,15 +4,6 @@
#include "db/db_impl.h" #include "db/db_impl.h"
#include <algorithm>
#include <atomic>
#include <cstdint>
#include <cstdio>
#include <set>
#include <string>
#include <vector>
#include <iostream>
#include <fstream>
#include "db/builder.h" #include "db/builder.h"
#include "db/db_iter.h" #include "db/db_iter.h"
#include "db/dbformat.h" #include "db/dbformat.h"
@ -23,11 +14,23 @@
#include "db/table_cache.h" #include "db/table_cache.h"
#include "db/version_set.h" #include "db/version_set.h"
#include "db/write_batch_internal.h" #include "db/write_batch_internal.h"
#include <algorithm>
#include <atomic>
#include <cstdint>
#include <cstdio>
#include <fstream>
#include <iostream>
#include <set>
#include <string>
#include <vector>
#include "leveldb/db.h" #include "leveldb/db.h"
#include "leveldb/env.h" #include "leveldb/env.h"
#include "leveldb/status.h" #include "leveldb/status.h"
#include "leveldb/table.h" #include "leveldb/table.h"
#include "leveldb/table_builder.h" #include "leveldb/table_builder.h"
#include "port/port.h" #include "port/port.h"
#include "table/block.h" #include "table/block.h"
#include "table/merger.h" #include "table/merger.h"
@ -35,6 +38,8 @@
#include "util/coding.h" #include "util/coding.h"
#include "util/logging.h" #include "util/logging.h"
#include "util/mutexlock.h" #include "util/mutexlock.h"
#include <filesystem>
namespace fs = std::filesystem;
namespace leveldb { namespace leveldb {
@ -146,6 +151,7 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
seed_(0), seed_(0),
tmp_batch_(new WriteBatch), tmp_batch_(new WriteBatch),
background_compaction_scheduled_(false), background_compaction_scheduled_(false),
background_garbage_collect_scheduled_(false),
manual_compaction_(nullptr), manual_compaction_(nullptr),
versions_(new VersionSet(dbname_, &options_, table_cache_, versions_(new VersionSet(dbname_, &options_, table_cache_,
&internal_comparator_)) {} &internal_comparator_)) {}
@ -667,6 +673,7 @@ void DBImpl::RecordBackgroundError(const Status& s) {
void DBImpl::MaybeScheduleCompaction() { void DBImpl::MaybeScheduleCompaction() {
mutex_.AssertHeld(); mutex_.AssertHeld();
if (background_compaction_scheduled_) { if (background_compaction_scheduled_) {
// Already scheduled // Already scheduled
} else if (shutting_down_.load(std::memory_order_acquire)) { } else if (shutting_down_.load(std::memory_order_acquire)) {
@ -682,6 +689,25 @@ void DBImpl::MaybeScheduleCompaction() {
} }
} }
void DBImpl::MaybeScheduleGarbageCollect() {
mutex_.AssertHeld();
if (background_garbage_collect_scheduled_) {
// Garbage collection already scheduled
} else if (shutting_down_.load(std::memory_order_acquire)) {
// DB is being deleted; no more background work
} else if (!bg_error_.ok()) {
// Already got an error; no more changes
} else {
background_garbage_collect_scheduled_ = true;
env_->Schedule(&DBImpl::BGWorkGC, this);
}
}
void DBImpl::BGWorkGC(void* db) {
reinterpret_cast<DBImpl*>(db)->BackgroundGarbageCollect();
}
void DBImpl::BGWork(void* db) { void DBImpl::BGWork(void* db) {
reinterpret_cast<DBImpl*>(db)->BackgroundCall(); reinterpret_cast<DBImpl*>(db)->BackgroundCall();
} }
@ -699,12 +725,34 @@ void DBImpl::BackgroundCall() {
background_compaction_scheduled_ = false; background_compaction_scheduled_ = false;
// Check if garbage collection needs to be scheduled after compaction
MaybeScheduleGarbageCollect();
// Previous compaction may have produced too many files in a level, // Previous compaction may have produced too many files in a level,
// so reschedule another compaction if needed. // so reschedule another compaction if needed.
MaybeScheduleCompaction(); MaybeScheduleCompaction();
background_work_finished_signal_.SignalAll(); background_work_finished_signal_.SignalAll();
} }
void DBImpl::BackgroundGarbageCollect() {
MutexLock l(&mutex_);
assert(background_garbage_collect_scheduled_);
if (shutting_down_.load(std::memory_order_acquire)) {
// No more background work when shutting down.
} else if (!bg_error_.ok()) {
// No more background work after a background error.
} else {
// Perform garbage collection here
GarbageCollect();
}
background_garbage_collect_scheduled_ = false;
// Notify any waiting threads
background_work_finished_signal_.SignalAll();
}
void DBImpl::BackgroundCompaction() { void DBImpl::BackgroundCompaction() {
mutex_.AssertHeld(); mutex_.AssertHeld();
@ -1163,21 +1211,22 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key,
if (imm != nullptr) imm->Unref(); if (imm != nullptr) imm->Unref();
current->Unref(); current->Unref();
if(value->c_str()[0]==0x00){
*value=value->substr(1);
if (value->c_str()[0] == 0x00) {
*value = value->substr(1);
return s; return s;
} }
Slice value_log_slice=Slice(value->c_str()+1,value->length());
Slice value_log_slice = Slice(value->c_str() + 1, value->length());
Slice new_key;
Slice new_value; Slice new_value;
int value_offset=sizeof(uint64_t)*2;// 16
uint64_t file_id,valuelog_offset;
bool res=GetVarint64(&value_log_slice,&file_id);
if(!res)return Status::Corruption("can't decode file id");
res=GetVarint64(&value_log_slice,&valuelog_offset);
if(!res)return Status::Corruption("can't decode valuelog offset");
ReadValueLog(file_id,valuelog_offset,&new_value);
*value=std::string(new_value.data(),new_value.size());
delete []new_value.data();
int value_offset = sizeof(uint64_t) * 2; // 16
uint64_t file_id, valuelog_offset;
bool res = GetVarint64(&value_log_slice, &file_id);
if (!res) return Status::Corruption("can't decode file id");
res = GetVarint64(&value_log_slice, &valuelog_offset);
if (!res) return Status::Corruption("can't decode valuelog offset");
ReadValueLog(file_id, valuelog_offset, &new_key, &new_value);
*value = std::string(new_value.data(), new_value.size());
delete[] new_value.data();
return s; return s;
} }
@ -1215,7 +1264,6 @@ Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) {
return DB::Put(o, key, val); return DB::Put(o, key, val);
} }
Status DBImpl::Delete(const WriteOptions& options, const Slice& key) { Status DBImpl::Delete(const WriteOptions& options, const Slice& key) {
return DB::Delete(options, key); return DB::Delete(options, key);
} }
@ -1241,7 +1289,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
Writer* last_writer = &w; Writer* last_writer = &w;
if (status.ok() && updates != nullptr) { // nullptr batch is for compactions if (status.ok() && updates != nullptr) { // nullptr batch is for compactions
WriteBatch* write_batch = BuildBatchGroup(&last_writer); WriteBatch* write_batch = BuildBatchGroup(&last_writer);
WriteBatchInternal::ConverToValueLog(write_batch,this);
WriteBatchInternal::ConverToValueLog(write_batch, this);
WriteBatchInternal::SetSequence(write_batch, last_sequence + 1); WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);
last_sequence += WriteBatchInternal::Count(write_batch); last_sequence += WriteBatchInternal::Count(write_batch);
@ -1501,7 +1549,8 @@ void DBImpl::GetApproximateSizes(const Range* range, int n, uint64_t* sizes) {
v->Unref(); v->Unref();
} }
// std::vector<std::pair<uint64_t,std::pair<uint64_t,uint64_t>>> DBImpl::WriteValueLog(std::vector<Slice> values){
// std::vector<std::pair<uint64_t,std::pair<uint64_t,uint64_t>>>
// DBImpl::WriteValueLog(std::vector<Slice> values){
// std::string file_name_=ValueLogFileName(dbname_,valuelogfile_number_); // std::string file_name_=ValueLogFileName(dbname_,valuelogfile_number_);
// std::ofstream valueFile(file_name_, std::ios::app | std::ios::binary); // std::ofstream valueFile(file_name_, std::ios::app | std::ios::binary);
@ -1521,37 +1570,54 @@ void DBImpl::GetApproximateSizes(const Range* range, int n, uint64_t* sizes) {
// return res; // return res;
// } // }
std::vector<std::pair<uint64_t, uint64_t>> DBImpl::WriteValueLog(std::vector<Slice> values) {
std::vector<std::pair<uint64_t, uint64_t>> DBImpl::WriteValueLog(
std::vector<std::pair<Slice, Slice>> kv) {
std::string file_name_ = ValueLogFileName(dbname_, valuelogfile_number_); std::string file_name_ = ValueLogFileName(dbname_, valuelogfile_number_);
std::ofstream valueFile(file_name_, std::ios::app | std::ios::binary); std::ofstream valueFile(file_name_, std::ios::app | std::ios::binary);
if (!valueFile.is_open()) { if (!valueFile.is_open()) {
assert(0);
assert(0);
} }
uint64_t offset = valueFile.tellp(); uint64_t offset = valueFile.tellp();
std::vector<std::pair<uint64_t, uint64_t>> res; std::vector<std::pair<uint64_t, uint64_t>> res;
for (const auto& slice : values) {
uint64_t len = slice.size();
for (const auto& [key_slice, value_slice] : kv) {
// 写入 key 的长度
uint64_t key_len = key_slice.size();
valueFile.write(reinterpret_cast<const char*>(&key_len), sizeof(uint64_t));
if (!valueFile.good()) {
valueFile.close();
assert(0);
}
// 先写入长度
valueFile.write(reinterpret_cast<const char*>(&len), sizeof(uint64_t));
// 写入 key 本身
valueFile.write(key_slice.data(), key_len);
if (!valueFile.good()) { if (!valueFile.good()) {
valueFile.close();
assert(0);
valueFile.close();
assert(0);
} }
// 再写入实际数据
valueFile.write(slice.data(), len);
// 写入 value 的长度
uint64_t value_len = value_slice.size();
valueFile.write(reinterpret_cast<const char*>(&value_len),
sizeof(uint64_t));
if (!valueFile.good()) { if (!valueFile.good()) {
valueFile.close();
assert(0);
valueFile.close();
assert(0);
}
// 写入 value 本身
valueFile.write(value_slice.data(), value_len);
if (!valueFile.good()) {
valueFile.close();
assert(0);
} }
// 记录 file_id 和 offset // 记录 file_id 和 offset
res.push_back({valuelogfile_number_, offset}); res.push_back({valuelogfile_number_, offset});
offset += sizeof(uint64_t) + len;
// 更新偏移量
offset += sizeof(uint64_t) + key_len + sizeof(uint64_t) + value_len;
} }
// 解锁资源或进行其他清理操作 // 解锁资源或进行其他清理操作
@ -1560,83 +1626,266 @@ std::vector> DBImpl::WriteValueLog(std::vector
return res; return res;
} }
void DBImpl::writeValueLogForCompaction(WritableFile* target_file,std::vector<Slice> values){
for(int i=0;i<values.size();i++){
void DBImpl::writeValueLogForCompaction(WritableFile* target_file,
std::vector<Slice> values) {
for (int i = 0; i < values.size(); i++) {
target_file->Append(values[i]); target_file->Append(values[i]);
} }
} }
void DBImpl::addNewValueLog(){
valuelogfile_number_=versions_->NewFileNumber();
void DBImpl::addNewValueLog() {
valuelogfile_number_ = versions_->NewFileNumber();
} }
// Status DBImpl::ReadValueLog(uint64_t file_id, uint64_t offset,uint64_t len,Slice* value){
// Status DBImpl::ReadValueLog(uint64_t file_id, uint64_t offset, Slice* value)
// {
// //lock_shared // //lock_shared
// Status s=Status::OK();
// std::string file_name_=ValueLogFileName(dbname_,file_id);
// //std::cout<<file_name_<<" "<<offset<<" "<<len<<std::endl;
// Status s = Status::OK();
// std::string file_name_ = ValueLogFileName(dbname_, file_id);
// // Open the file in binary mode for reading
// std::ifstream inFile(file_name_, std::ios::in | std::ios::binary); // std::ifstream inFile(file_name_, std::ios::in | std::ios::binary);
// if (!inFile.is_open()) { // if (!inFile.is_open()) {
// std::cerr << "Failed to open file for writing!"<<file_id<<" "<<offset<<" "<<len<< std::endl;
// return Status::Corruption("Failed to open file for writing!");
// std::cerr << "Failed to open file: " << file_name_ << " for reading!"
// << std::endl; return Status::Corruption("Failed to open file for
// reading!");
// } // }
// // Seek to the position of len
// inFile.seekg(offset); // inFile.seekg(offset);
// char *value_buf=new char[len];
// inFile.read(value_buf,len);
// // check available
// // s=check_value_available(value_buf);
// // Read the length of the value
// // uint64_t len;
// // inFile.read(reinterpret_cast<char*>(&len), sizeof(uint64_t));
// char *value_buf_len=new char[sizeof(uint64_t)];
// inFile.read(value_buf_len,sizeof(uint64_t));
// uint64_t len=0;
// std::memcpy(&len, value_buf_len, sizeof(uint64_t));
// if (!inFile.good()) {
// inFile.close();
// return Status::Corruption("Failed to read length from file!");
// }
// // Now seek to the actual data position and read the value
// inFile.seekg(offset + sizeof(uint64_t));
// char* value_buf = new char[len];
// inFile.read(value_buf, len);
// if (!inFile.good()) {
// delete[] value_buf;
// inFile.close();
// return Status::Corruption("Failed to read value from file!");
// }
// // Close the file after reading
// inFile.close(); // inFile.close();
// *value=Slice(value_buf,len);
// // Assign the read data to the Slice
// *value = Slice(value_buf, len);
// return s; // return s;
// } // }
Status DBImpl::ReadValueLog(uint64_t file_id, uint64_t offset, Slice* value) {
//lock_shared
Status DBImpl::ReadValueLog(uint64_t file_id, uint64_t offset, Slice* key,
Slice* value) {
Status s = Status::OK(); Status s = Status::OK();
std::string file_name_ = ValueLogFileName(dbname_, file_id); std::string file_name_ = ValueLogFileName(dbname_, file_id);
// Open the file in binary mode for reading // Open the file in binary mode for reading
std::ifstream inFile(file_name_, std::ios::in | std::ios::binary); std::ifstream inFile(file_name_, std::ios::in | std::ios::binary);
if (!inFile.is_open()) { if (!inFile.is_open()) {
std::cerr << "Failed to open file: " << file_name_ << " for reading!" << std::endl;
return Status::Corruption("Failed to open file for reading!");
std::cerr << "Failed to open file: " << file_name_ << " for reading!"
<< std::endl;
return Status::Corruption("Failed to open file for reading!");
} }
// Seek to the position of len
// Seek to the position of key length
inFile.seekg(offset); inFile.seekg(offset);
// Read the length of the key
char* key_buf_len = new char[sizeof(uint64_t)];
inFile.read(key_buf_len, sizeof(uint64_t));
uint64_t key_len = 0;
std::memcpy(&key_len, key_buf_len, sizeof(uint64_t));
if (!inFile.good()) {
delete[] key_buf_len;
inFile.close();
return Status::Corruption("Failed to read key length from file!");
}
// Now seek to the actual key position and read the key
inFile.seekg(offset + sizeof(uint64_t));
char* key_buf = new char[key_len];
inFile.read(key_buf, key_len);
if (!inFile.good()) {
delete[] key_buf;
delete[] key_buf_len;
inFile.close();
return Status::Corruption("Failed to read key from file!");
}
// Assign the read key data to the Slice
*key = Slice(key_buf, key_len);
// Read the length of the value // Read the length of the value
// uint64_t len;
// inFile.read(reinterpret_cast<char*>(&len), sizeof(uint64_t));
char *value_buf_len=new char[sizeof(uint64_t)];
inFile.read(value_buf_len,sizeof(uint64_t));
uint64_t len=0;
std::memcpy(&len, value_buf_len, sizeof(uint64_t));
inFile.seekg(offset + sizeof(uint64_t) + key_len);
char* value_buf_len = new char[sizeof(uint64_t)];
inFile.read(value_buf_len, sizeof(uint64_t));
uint64_t val_len = 0;
std::memcpy(&val_len, value_buf_len, sizeof(uint64_t));
if (!inFile.good()) { if (!inFile.good()) {
inFile.close();
return Status::Corruption("Failed to read length from file!");
delete[] key_buf;
delete[] key_buf_len;
delete[] value_buf_len;
inFile.close();
return Status::Corruption("Failed to read value length from file!");
} }
// Now seek to the actual data position and read the value // Now seek to the actual data position and read the value
inFile.seekg(offset + sizeof(uint64_t));
char* value_buf = new char[len];
inFile.read(value_buf, len);
inFile.seekg(offset + sizeof(uint64_t) + key_len + sizeof(uint64_t));
char* value_buf = new char[val_len];
inFile.read(value_buf, val_len);
if (!inFile.good()) { if (!inFile.good()) {
delete[] value_buf;
inFile.close();
return Status::Corruption("Failed to read value from file!");
delete[] key_buf;
delete[] key_buf_len;
delete[] value_buf_len;
delete[] value_buf;
inFile.close();
return Status::Corruption("Failed to read value from file!");
} }
// Close the file after reading // Close the file after reading
inFile.close(); inFile.close();
// Assign the read data to the Slice
*value = Slice(value_buf, len);
// Assign the read value data to the Slice
*value = Slice(value_buf, val_len);
return s; return s;
} }
// 判断文件是否为 valuelog 文件
bool IsValueLogFile(const std::string& filename) {
return filename.find("valuelog_") ==
0; // 简单判断文件名是否匹配 valuelog 前缀
}
// 示例:解析 sstable 中的元信息
void ParseStoredValue(const std::string& stored_value, uint64_t& valuelog_id,
uint64_t& offset) {
// 假设 stored_value 格式为:valuelog_id|offset
std::istringstream iss(stored_value);
iss >> valuelog_id >> offset;
}
// 示例:获取 ValueLog 文件 ID
uint64_t GetValueLogID(const std::string& valuelog_name) {
// 假设文件名中包含唯一的 ID,例如 "valuelog_1"
auto pos = valuelog_name.find_last_of('_');
return std::stoull(valuelog_name.substr(pos + 1));
}
// 垃圾回收实现
void DBImpl::GarbageCollect() {
// 遍历数据库目录,找到所有 valuelog 文件
auto files_set = fs::directory_iterator(dbname_);
for (const auto& cur_log_file : files_set) {
if (fs::exists(cur_log_file) &&
fs::is_regular_file(fs::status(cur_log_file)) &&
IsValueLogFile(cur_log_file.path().filename().string())) {
std::string valuelog_name = cur_log_file.path().string();
uint64_t cur_log_number = GetValueLogID(valuelog_name);
uint64_t new_log_number = versions_->NewFileNumber();
WritableFile* new_valuelog = nullptr;
std::string new_valuelog_name = LogFileName(dbname_, new_log_number);
Status s = env_->NewWritableFile(LogFileName(dbname_, new_log_number),
&new_valuelog);
if (!s.ok()) {
// Avoid chewing through file number space in a tight loop.
versions_->ReuseFileNumber(new_log_number);
break;
}
addNewValueLog();
std::ifstream cur_valuelog(valuelog_name, std::ios::in | std::ios::binary);
if (!cur_valuelog.is_open()) {
std::cerr << "Failed to open ValueLog file: " << valuelog_name
<< std::endl;
continue;
}
// whether to reopen
std::ifstream new_valuelog_file(new_valuelog_name,
std::ios::in | std::ios::binary);
if (!new_valuelog_file.is_open()) {
std::cerr << "Failed to create new ValueLog file: " << new_valuelog_name
<< std::endl;
continue;
}
uint64_t current_offset = 0;
uint64_t new_offset = 0; // 新的 ValueLog 偏移
while (true) {
// 读取一个 kv 对
uint64_t key_len, value_len;
Slice key, value;
Slice new_value;
ReadValueLog(cur_log_number, current_offset, &key, &value);
value = std::string(new_value.data(), new_value.size());
// 检查 key 是否在 sstable 中存在
std::string stored_value;
Status status = Get(leveldb::ReadOptions(), key, &stored_value);
if (status.IsNotFound()) {
// Key 不存在,忽略此记录
continue;
}
if (!status.ok()) {
std::cerr << "Error accessing sstable: " << status.ToString()
<< std::endl;
continue;
}
// 检查 valuelog_id 和 offset 是否匹配
uint64_t stored_valuelog_id, stored_offset;
ParseStoredValue(stored_value, stored_valuelog_id,
stored_offset); // 假设解析函数
if (stored_valuelog_id != GetValueLogID(valuelog_name) ||
stored_offset != current_offset) {
// 记录无效,跳过
continue;
}
status = Put(leveldb::WriteOptions(), key, value);
if (!status.ok()) {
std::cerr << "Error accessing sstable: " << status.ToString()
<< std::endl;
continue;
}
// 更新偏移
new_offset +=
sizeof(key_len) + key.size() + sizeof(value_len) + value.size();
// 更新当前偏移
current_offset +=
sizeof(key_len) + key.size() + sizeof(value_len) + value.size();
}
// 清理旧文件(如果需要)
cur_valuelog.close();
new_valuelog_file.close();
std::remove(valuelog_name.c_str()); // 删除旧的 ValueLog 文件
}
}
}
// Default implementations of convenience methods that subclasses of DB // Default implementations of convenience methods that subclasses of DB
// can call if they wish // can call if they wish
Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) { Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {

+ 35
- 19
db/db_impl.h View File

@ -5,19 +5,20 @@
#ifndef STORAGE_LEVELDB_DB_DB_IMPL_H_ #ifndef STORAGE_LEVELDB_DB_DB_IMPL_H_
#define STORAGE_LEVELDB_DB_DB_IMPL_H_ #define STORAGE_LEVELDB_DB_DB_IMPL_H_
#include "db/dbformat.h"
#include "db/log_writer.h"
#include "db/snapshot.h"
#include <atomic> #include <atomic>
#include <deque> #include <deque>
#include <set>
#include <string>
#include <mutex>
#include <map> #include <map>
#include <mutex>
#include <set>
#include <shared_mutex> #include <shared_mutex>
#include <string>
#include "db/dbformat.h"
#include "db/log_writer.h"
#include "db/snapshot.h"
#include "leveldb/db.h" #include "leveldb/db.h"
#include "leveldb/env.h" #include "leveldb/env.h"
#include "port/port.h" #include "port/port.h"
#include "port/thread_annotations.h" #include "port/thread_annotations.h"
@ -44,12 +45,12 @@ class DBImpl : public DB {
// // // //
// FieldArray ParseValue(const std::string& value_str)override; // FieldArray ParseValue(const std::string& value_str)override;
// Status Put_with_fields(const WriteOptions& options, const Slice& key,const FieldArray& fields)override;
// Status Put_with_fields(const WriteOptions& options, const Slice& key,const
// FieldArray& fields)override;
// Status Get_with_fields(const ReadOptions& options, const Slice& key, // Status Get_with_fields(const ReadOptions& options, const Slice& key,
// FieldArray* fields)override; // FieldArray* fields)override;
// Implementations of the DB interface // Implementations of the DB interface
Status Put(const WriteOptions&, const Slice& key, Status Put(const WriteOptions&, const Slice& key,
const Slice& value) override; const Slice& value) override;
@ -63,14 +64,19 @@ class DBImpl : public DB {
bool GetProperty(const Slice& property, std::string* value) override; bool GetProperty(const Slice& property, std::string* value) override;
void GetApproximateSizes(const Range* range, int n, uint64_t* sizes) override; void GetApproximateSizes(const Range* range, int n, uint64_t* sizes) override;
void CompactRange(const Slice* begin, const Slice* end) override; void CompactRange(const Slice* begin, const Slice* end) override;
// std::vector<std::pair<uint64_t,std::pair<uint64_t,uint64_t>>> WriteValueLog(std::vector<Slice> value)override;
std::vector<std::pair<uint64_t,uint64_t>> WriteValueLog(std::vector<Slice> value)override;
void writeValueLogForCompaction(WritableFile* target_file,std::vector<Slice> value);
void addNewValueLog()override EXCLUSIVE_LOCKS_REQUIRED(mutex_);;
std::pair<WritableFile*,uint64_t> getNewValuelog();//use for compaction
// Status ReadValueLog(uint64_t file_id, uint64_t offset,uint64_t len,Slice* value)override;
Status ReadValueLog(uint64_t file_id, uint64_t offset,Slice* value)override;
// std::vector<std::pair<uint64_t,std::pair<uint64_t,uint64_t>>>
// WriteValueLog(std::vector<Slice> value)override;
std::vector<std::pair<uint64_t, uint64_t>> WriteValueLog(
std::vector<std::pair<Slice, Slice>> value) override;
void writeValueLogForCompaction(WritableFile* target_file,
std::vector<Slice> value);
void addNewValueLog() override EXCLUSIVE_LOCKS_REQUIRED(mutex_);
;
std::pair<WritableFile*, uint64_t> getNewValuelog(); // use for compaction
// Status ReadValueLog(uint64_t file_id, uint64_t offset,Slice*
// value)override;
Status ReadValueLog(uint64_t file_id, uint64_t offset, Slice* key,
Slice* value) override;
// Extra methods (for testing) that are not in the public DB interface // Extra methods (for testing) that are not in the public DB interface
@ -161,9 +167,15 @@ class DBImpl : public DB {
void RecordBackgroundError(const Status& s); void RecordBackgroundError(const Status& s);
void MaybeScheduleCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_); void MaybeScheduleCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_);
void MaybeScheduleGarbageCollect() EXCLUSIVE_LOCKS_REQUIRED(mutex_);
static void BGWork(void* db); static void BGWork(void* db);
static void BGWorkGC(void* db);
void BackgroundCall(); void BackgroundCall();
void BackgroundCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_); void BackgroundCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_);
void BackgroundGarbageCollect() EXCLUSIVE_LOCKS_REQUIRED(mutex_);
void GarbageCollect() EXCLUSIVE_LOCKS_REQUIRED(mutex_);
void CleanupCompaction(CompactionState* compact) void CleanupCompaction(CompactionState* compact)
EXCLUSIVE_LOCKS_REQUIRED(mutex_); EXCLUSIVE_LOCKS_REQUIRED(mutex_);
Status DoCompactionWork(CompactionState* compact) Status DoCompactionWork(CompactionState* compact)
@ -195,7 +207,7 @@ class DBImpl : public DB {
// State below is protected by mutex_ // State below is protected by mutex_
port::Mutex mutex_; port::Mutex mutex_;
//std::shared_mutex value_log_mutex;
// std::shared_mutex value_log_mutex;
std::atomic<bool> shutting_down_; std::atomic<bool> shutting_down_;
port::CondVar background_work_finished_signal_ GUARDED_BY(mutex_); port::CondVar background_work_finished_signal_ GUARDED_BY(mutex_);
MemTable* mem_; MemTable* mem_;
@ -203,11 +215,11 @@ class DBImpl : public DB {
std::atomic<bool> has_imm_; // So bg thread can detect non-null imm_ std::atomic<bool> has_imm_; // So bg thread can detect non-null imm_
WritableFile* logfile_; WritableFile* logfile_;
WritableFile* valuelogfile_; WritableFile* valuelogfile_;
int valuelogfile_offset=0;
int valuelogfile_offset = 0;
uint64_t logfile_number_; uint64_t logfile_number_;
uint64_t valuelogfile_number_; uint64_t valuelogfile_number_;
log::Writer* log_; log::Writer* log_;
std::map<uint64_t,uint64_t> oldvaluelog_ids;
std::map<uint64_t, uint64_t> oldvaluelog_ids;
uint32_t seed_ GUARDED_BY(mutex_); // For sampling. uint32_t seed_ GUARDED_BY(mutex_); // For sampling.
// Queue of writers. // Queue of writers.
@ -223,6 +235,10 @@ class DBImpl : public DB {
// Has a background compaction been scheduled or is running? // Has a background compaction been scheduled or is running?
bool background_compaction_scheduled_ GUARDED_BY(mutex_); bool background_compaction_scheduled_ GUARDED_BY(mutex_);
// Has a background gc been scheduled or is running?
bool background_garbage_collect_scheduled_ GUARDED_BY(mutex_);
ManualCompaction* manual_compaction_ GUARDED_BY(mutex_); ManualCompaction* manual_compaction_ GUARDED_BY(mutex_);
VersionSet* const versions_ GUARDED_BY(mutex_); VersionSet* const versions_ GUARDED_BY(mutex_);

+ 2
- 1
db/db_iter.cc View File

@ -69,6 +69,7 @@ class DBIter : public Iterator {
Slice value() const override { Slice value() const override {
assert(valid_); assert(valid_);
auto tmp_value= (direction_ == kForward) ? iter_->value() : saved_value_; auto tmp_value= (direction_ == kForward) ? iter_->value() : saved_value_;
Slice key;
if(tmp_value.data()[0]==0x00){ if(tmp_value.data()[0]==0x00){
tmp_value.remove_prefix(1); tmp_value.remove_prefix(1);
return tmp_value; return tmp_value;
@ -82,7 +83,7 @@ class DBIter : public Iterator {
// res=GetVarint64(&tmp_value,&valuelog_len); // res=GetVarint64(&tmp_value,&valuelog_len);
// if(!res)assert(0); // if(!res)assert(0);
// db_->ReadValueLog(file_id,valuelog_offset,valuelog_len,&tmp_value); // db_->ReadValueLog(file_id,valuelog_offset,valuelog_len,&tmp_value);
db_->ReadValueLog(file_id,valuelog_offset,&tmp_value);
db_->ReadValueLog(file_id,valuelog_offset, &key, &tmp_value);
return tmp_value; return tmp_value;
} }
Status status() const override { Status status() const override {

+ 3
- 3
db/write_batch.cc View File

@ -145,9 +145,9 @@ class ValueLogInserter : public WriteBatch::Handler {
} }
else{ else{
buf+=(char)(0x01); buf+=(char)(0x01);
std::vector<Slice> v;
v.push_back(value);
auto res=db_->WriteValueLog(v);
std::vector<std::pair<Slice,Slice>> kv;
kv.push_back({key,value});
auto res=db_->WriteValueLog(kv);
PutVarint64(&buf,res[0].first); PutVarint64(&buf,res[0].first);
// PutVarint64(&buf,res[0].second.first); // PutVarint64(&buf,res[0].second.first);
// PutVarint64(&buf,res[0].second.second); // PutVarint64(&buf,res[0].second.second);

+ 2
- 2
include/leveldb/db.h View File

@ -107,7 +107,7 @@ class LEVELDB_EXPORT DB {
// std::vector<std::pair<uint64_t,std::pair<uint64_t,uint64_t>>> v; // std::vector<std::pair<uint64_t,std::pair<uint64_t,uint64_t>>> v;
// return v; // return v;
// } // }
virtual std::vector<std::pair<uint64_t,uint64_t>> WriteValueLog(std::vector<Slice> value){
virtual std::vector<std::pair<uint64_t,uint64_t>> WriteValueLog(std::vector<std::pair<Slice,Slice>> value){
assert(0); assert(0);
std::vector<std::pair<uint64_t,uint64_t>> v; std::vector<std::pair<uint64_t,uint64_t>> v;
return v; return v;
@ -119,7 +119,7 @@ class LEVELDB_EXPORT DB {
// assert(0); // Not implemented // assert(0); // Not implemented
// return Status::Corruption("not imp"); // return Status::Corruption("not imp");
// } // }
virtual Status ReadValueLog(uint64_t file_id, uint64_t offset,Slice* value){
virtual Status ReadValueLog(uint64_t file_id, uint64_t offset, Slice* key, Slice* value){
assert(0); // Not implemented assert(0); // Not implemented
return Status::Corruption("not imp"); return Status::Corruption("not imp");
} }

+ 1
- 1
include/leveldb/slice.h View File

@ -69,7 +69,7 @@ class LEVELDB_EXPORT Slice {
// Drop the first "n" bytes from this slice. // Drop the first "n" bytes from this slice.
void remove_prefix(size_t n) { void remove_prefix(size_t n) {
if(n>size()){
if (n > size()) {
assert(0); assert(0);
} }
assert(n <= size()); assert(n <= size());

+ 66
- 66
test/test.cpp View File

@ -86,77 +86,77 @@ Status Get_keys_by_field(DB *db,const ReadOptions& options, const Field field,st
return Status::OK(); return Status::OK();
} }
TEST(Test, CheckGetFields) {
DB *db;
WriteOptions writeOptions;
ReadOptions readOptions;
if(OpenDB("testdb_for_XOY", &db).ok() == false) {
std::cerr << "open db failed" << std::endl;
abort();
}
std::string key1 = "k_1";
// TEST(Test, CheckGetFields) {
// DB *db;
// WriteOptions writeOptions;
// ReadOptions readOptions;
// if(OpenDB("testdb_for_XOY", &db).ok() == false) {
// std::cerr << "open db failed" << std::endl;
// abort();
// }
// std::string key1 = "k_1";
FieldArray fields1 = {
{"name", "Customer#000000001"},
{"address", "IVhzIApeRb"},
{"phone", "25-989-741-2988"}
};
// FieldArray fields1 = {
// {"name", "Customer#000000001"},
// {"address", "IVhzIApeRb"},
// {"phone", "25-989-741-2988"}
// };
auto value1=SerializeValue(fields1);
// auto value1=SerializeValue(fields1);
db->Put(WriteOptions(), key1, value1);
// db->Put(WriteOptions(), key1, value1);
// 璇诲彇骞跺弽搴忓垪鍖?
std::string value_ret;
FieldArray res1;
// // 璇诲彇骞跺弽搴忓垪鍖?
// std::string value_ret;
// FieldArray res1;
db->Get(ReadOptions(), key1, &value_ret);
DeserializeValue(value_ret, &res1);
for(auto pr:res1){
std::cout<<std::string(pr.first.data(),pr.first.size())<<" "<<std::string(pr.second.data(),pr.second.size())<<"\n";
}
ASSERT_TRUE(CompareFieldArray(fields1, res1));
// db->Get(ReadOptions(), key1, &value_ret);
// DeserializeValue(value_ret, &res1);
// for(auto pr:res1){
// std::cout<<std::string(pr.first.data(),pr.first.size())<<" "<<std::string(pr.second.data(),pr.second.size())<<"\n";
// }
// ASSERT_TRUE(CompareFieldArray(fields1, res1));
db->Delete(WriteOptions(),key1);
// db->Delete(WriteOptions(),key1);
std::cout<<"get serialized value done"<<std::endl;
delete db;
}
TEST(Test, CheckSearchKey) {
DB *db;
ReadOptions readOptions;
if(OpenDB("testdb_for_XOY", &db).ok() == false) {
std::cerr << "open db failed" << std::endl;
abort();
}
std::vector<std::string> keys;
std::vector<std::string> target_keys;
for(int i=0;i<10000;i++){
std::string key=std::to_string(rand()%10000)+"_"+std::to_string(i);//random for generate nonincreasing keys
FieldArray fields={
{"name", key},
{"address", std::to_string(rand()%7)},
{"phone", std::to_string(rand()%114514)}
};
if(rand()%5==0){
fields[0].second="special_key";
target_keys.push_back(key);
}
keys.push_back(key);
db->Put(WriteOptions(),key,SerializeValue(fields));
}
std::sort(target_keys.begin(),target_keys.end());
std::vector<std::string> key_res;
Get_keys_by_field(db,ReadOptions(),{"name", "special_key"},&key_res);
ASSERT_TRUE(CompareKey(key_res, target_keys));
std::cout<<"get key by field done"<<std::endl;
for(auto s:keys){
db->Delete(WriteOptions(),s);
}
delete db;
}
// std::cout<<"get serialized value done"<<std::endl;
// delete db;
// }
// TEST(Test, CheckSearchKey) {
// DB *db;
// ReadOptions readOptions;
// if(OpenDB("testdb_for_XOY", &db).ok() == false) {
// std::cerr << "open db failed" << std::endl;
// abort();
// }
// std::vector<std::string> keys;
// std::vector<std::string> target_keys;
// for(int i=0;i<10000;i++){
// std::string key=std::to_string(rand()%10000)+"_"+std::to_string(i);//random for generate nonincreasing keys
// FieldArray fields={
// {"name", key},
// {"address", std::to_string(rand()%7)},
// {"phone", std::to_string(rand()%114514)}
// };
// if(rand()%5==0){
// fields[0].second="special_key";
// target_keys.push_back(key);
// }
// keys.push_back(key);
// db->Put(WriteOptions(),key,SerializeValue(fields));
// }
// std::sort(target_keys.begin(),target_keys.end());
// std::vector<std::string> key_res;
// Get_keys_by_field(db,ReadOptions(),{"name", "special_key"},&key_res);
// ASSERT_TRUE(CompareKey(key_res, target_keys));
// std::cout<<"get key by field done"<<std::endl;
// for(auto s:keys){
// db->Delete(WriteOptions(),s);
// }
// delete db;
// }
TEST(Test, LARGE_DATA_COMPACT_TEST) { TEST(Test, LARGE_DATA_COMPACT_TEST) {
DB *db; DB *db;
@ -167,7 +167,7 @@ TEST(Test, LARGE_DATA_COMPACT_TEST) {
abort(); abort();
} }
std::vector<std::string> values; std::vector<std::string> values;
for(int i=0;i<500000;i++){
for(int i=0;i<5000;i++){
std::string key=std::to_string(i); std::string key=std::to_string(i);
std::string value; std::string value;
for(int j=0;j<1000;j++){ for(int j=0;j<1000;j++){
@ -176,7 +176,7 @@ TEST(Test, LARGE_DATA_COMPACT_TEST) {
values.push_back(value); values.push_back(value);
db->Put(writeOptions,key,value); db->Put(writeOptions,key,value);
} }
for(int i=0;i<500000;i++){
for(int i=0;i<5000;i++){
std::string key=std::to_string(i); std::string key=std::to_string(i);
std::string value; std::string value;
Status s=db->Get(readOptions,key,&value); Status s=db->Get(readOptions,key,&value);

Loading…
Cancel
Save