Browse Source

Huge update: bugs fixed, tests added, new functions (unordered iterator, etc.) added

pull/3/head
alexfisher 8 months ago
parent
commit
58222843ff
33 changed files with 703 additions and 742 deletions
  1. +1
    -1
      .gitmodules
  2. +5
    -2
      CMakeLists.txt
  3. +1
    -1
      YCSB-cpp
  4. +2
    -2
      benchmarks/db_bench.cc
  5. +2
    -1
      db/autocompact_test.cc
  6. +0
    -10
      db/builder.cc
  7. +1
    -0
      db/corruption_test.cc
  8. +192
    -167
      db/db_impl.cc
  9. +10
    -6
      db/db_impl.h
  10. +9
    -1
      db/db_test.cc
  11. +1
    -5
      db/dbformat.h
  12. +63
    -0
      db/fields.cc
  13. +4
    -1
      db/filename.cc
  14. +1
    -0
      db/filename.h
  15. +0
    -302
      db/prefetch_iter.cc
  16. +6
    -4
      db/recovery_test.cc
  17. +1
    -1
      db/repair.cc
  18. +104
    -0
      db/true_iter.cc
  19. +4
    -5
      db/true_iter.h
  20. +28
    -10
      db/unordered_iter.cc
  21. +1
    -1
      db/unordered_iter.h
  22. +0
    -4
      db/version_edit.cc
  23. +1
    -3
      db/version_edit.h
  24. +7
    -5
      db/version_set.cc
  25. +6
    -4
      db/write_batch.cc
  26. +1
    -1
      db/write_batch_internal.h
  27. +8
    -1
      include/leveldb/db.h
  28. +20
    -0
      include/leveldb/fields.h
  29. +16
    -0
      include/leveldb/options.h
  30. +185
    -49
      test/test.cpp
  31. +4
    -129
      util/coding.cc
  32. +2
    -9
      util/coding.h

+ 1
- 1
.gitmodules View File

@ -6,4 +6,4 @@
url = https://github.com/google/benchmark url = https://github.com/google/benchmark
[submodule "YCSB-cpp"] [submodule "YCSB-cpp"]
path = YCSB-cpp path = YCSB-cpp
url = https://github.com/ls4154/YCSB-cpp.git
url = https://github.com/zerowinter0/my_YCSB_benchmark.git

+ 5
- 2
CMakeLists.txt View File

@ -129,8 +129,9 @@ target_sources(leveldb
"db/db_impl.h" "db/db_impl.h"
"db/db_iter.cc" "db/db_iter.cc"
"db/db_iter.h" "db/db_iter.h"
"db/prefetch_iter.cc"
"db/prefetch_iter.h"
"db/true_iter.cc"
"db/true_iter.h"
"db/fields.cc"
"db/unordered_iter.cc" "db/unordered_iter.cc"
"db/unordered_iter.h" "db/unordered_iter.h"
"db/dbformat.cc" "db/dbformat.cc"
@ -213,6 +214,7 @@ target_sources(leveldb
"${LEVELDB_PUBLIC_INCLUDE_DIR}/table_builder.h" "${LEVELDB_PUBLIC_INCLUDE_DIR}/table_builder.h"
"${LEVELDB_PUBLIC_INCLUDE_DIR}/table.h" "${LEVELDB_PUBLIC_INCLUDE_DIR}/table.h"
"${LEVELDB_PUBLIC_INCLUDE_DIR}/write_batch.h" "${LEVELDB_PUBLIC_INCLUDE_DIR}/write_batch.h"
"${LEVELDB_PUBLIC_INCLUDE_DIR}/fields.h"
) )
if (WIN32) if (WIN32)
@ -497,6 +499,7 @@ if(LEVELDB_INSTALL)
"${LEVELDB_PUBLIC_INCLUDE_DIR}/table_builder.h" "${LEVELDB_PUBLIC_INCLUDE_DIR}/table_builder.h"
"${LEVELDB_PUBLIC_INCLUDE_DIR}/table.h" "${LEVELDB_PUBLIC_INCLUDE_DIR}/table.h"
"${LEVELDB_PUBLIC_INCLUDE_DIR}/write_batch.h" "${LEVELDB_PUBLIC_INCLUDE_DIR}/write_batch.h"
"${LEVELDB_PUBLIC_INCLUDE_DIR}/fields.h"
DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/leveldb" DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/leveldb"
) )

+ 1
- 1
YCSB-cpp

@ -1 +1 @@
Subproject commit b8e5cc1446a3df02f09d045b045434bdebf27405
Subproject commit 7df09a150d3ab16b303c25007eb0a27c8eed8049

+ 2
- 2
benchmarks/db_bench.cc View File

@ -883,10 +883,10 @@ class Benchmark {
} }
void ReadUnorderSequential(ThreadState* thread) { void ReadUnorderSequential(ThreadState* thread) {
Iterator* iter = db_->NewUnorderedIterator(ReadOptions());
Iterator* iter = db_->NewUnorderedIterator(ReadOptions(),Slice(),Slice());
int i = 0; int i = 0;
int64_t bytes = 0; int64_t bytes = 0;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
for (; iter->Valid(); iter->Next()) {
bytes += iter->key().size() + iter->value().size(); bytes += iter->key().size() + iter->value().size();
thread->stats.FinishedSingleOp(); thread->stats.FinishedSingleOp();
++i; ++i;

+ 2
- 1
db/autocompact_test.cc View File

@ -19,6 +19,7 @@ class AutoCompactTest : public testing::Test {
DestroyDB(dbname_, options_); DestroyDB(dbname_, options_);
options_.create_if_missing = true; options_.create_if_missing = true;
options_.compression = kNoCompression; options_.compression = kNoCompression;
options_.use_valuelog_length=-1;
EXPECT_LEVELDB_OK(DB::Open(options_, dbname_, &db_)); EXPECT_LEVELDB_OK(DB::Open(options_, dbname_, &db_));
} }
@ -50,7 +51,7 @@ class AutoCompactTest : public testing::Test {
DB* db_; DB* db_;
}; };
static const int kValueSize = 200 * 1024;
static const int kValueSize = 200*1024;
static const int kTotalSize = 100 * 1024 * 1024; static const int kTotalSize = 100 * 1024 * 1024;
static const int kCount = kTotalSize / kValueSize; static const int kCount = kTotalSize / kValueSize;

+ 0
- 10
db/builder.cc View File

@ -28,16 +28,6 @@ Status BuildTable(const std::string& dbname, Env* env, const Options& options,
return s; return s;
} }
// 如果第一个字节是 0x01,它会移除这个前缀,并尝试从剩下的数据中解析出 value
{
auto tmp_value=iter->value();
if(tmp_value.data()[0]==(char)(0x01)){
tmp_value.remove_prefix(1);
assert(GetVarint64(&tmp_value,&meta->valuelog_id));
}
else meta->valuelog_id=0;
}
TableBuilder* builder = new TableBuilder(options, file); TableBuilder* builder = new TableBuilder(options, file);
meta->smallest.DecodeFrom(iter->key()); meta->smallest.DecodeFrom(iter->key());
Slice key; Slice key;

+ 1
- 0
db/corruption_test.cc View File

@ -28,6 +28,7 @@ class CorruptionTest : public testing::Test {
tiny_cache_(NewLRUCache(100)) { tiny_cache_(NewLRUCache(100)) {
options_.env = &env_; options_.env = &env_;
options_.block_cache = tiny_cache_; options_.block_cache = tiny_cache_;
options_.use_valuelog_length=-1;
DestroyDB(dbname_, options_); DestroyDB(dbname_, options_);
options_.create_if_missing = true; options_.create_if_missing = true;

+ 192
- 167
db/db_impl.cc View File

@ -6,7 +6,7 @@
#include "db/builder.h" #include "db/builder.h"
#include "db/db_iter.h" #include "db/db_iter.h"
#include "db/prefetch_iter.h"
#include "db/true_iter.h"
#include "db/unordered_iter.h" #include "db/unordered_iter.h"
#include "db/dbformat.h" #include "db/dbformat.h"
#include "db/filename.h" #include "db/filename.h"
@ -155,9 +155,12 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
background_compaction_scheduled_(false), background_compaction_scheduled_(false),
background_garbage_collect_scheduled_(false), background_garbage_collect_scheduled_(false),
manual_compaction_(nullptr), manual_compaction_(nullptr),
valuelog_cache(NewLRUCache(config::mem_value_log_number)),
mem_value_log_number_(raw_options.mem_value_log_number),
valuelog_cache(NewLRUCache(raw_options.mem_value_log_number)),
versions_(new VersionSet(dbname_, &options_, table_cache_, versions_(new VersionSet(dbname_, &options_, table_cache_,
&internal_comparator_)) {
&internal_comparator_)),
use_valuelog_length(raw_options.use_valuelog_length),
value_log_size_(raw_options.value_log_size){
} }
@ -283,6 +286,7 @@ void DBImpl::RemoveObsoleteFiles() {
// be recorded in pending_outputs_, which is inserted into "live" // be recorded in pending_outputs_, which is inserted into "live"
keep = (live.find(number) != live.end()); keep = (live.find(number) != live.end());
break; break;
case kValueLogFile:
case kCurrentFile: case kCurrentFile:
case kDBLockFile: case kDBLockFile:
case kInfoLogFile: case kInfoLogFile:
@ -1052,8 +1056,10 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
// Value is >= 100 bytes, read from external file // Value is >= 100 bytes, read from external file
uint64_t file_id, valuelog_offset; uint64_t file_id, valuelog_offset;
value.remove_prefix(1); value.remove_prefix(1);
bool res = GetVarint64(&value, &file_id);
if (!res) return Status::Corruption("can't decode file id");
status=ParseFakeValueForValuelog(value,file_id,valuelog_offset);
if(!status.ok())break;
valuelog_usage[file_id]--; valuelog_usage[file_id]--;
} }
} }
@ -1197,7 +1203,7 @@ Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
Iterator* DBImpl::TEST_NewInternalIterator() { Iterator* DBImpl::TEST_NewInternalIterator() {
SequenceNumber ignored; SequenceNumber ignored;
uint32_t ignored_seed; uint32_t ignored_seed;
return NewIterator(ReadOptions());
return NewInternalIterator(ReadOptions(), &ignored, &ignored_seed);
} }
int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() { int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() {
@ -1253,24 +1259,10 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key,
if (options.find_value_log_for_gc) { if (options.find_value_log_for_gc) {
return s; return s;
} }
if (value->c_str()[0] == 0x00) {
*value = value->substr(1);
return s;
}
Slice value_log_slice = Slice(value->c_str() + 1, value->length());
uint64_t file_id, valuelog_offset;
bool res = GetVarint64(&value_log_slice, &file_id);
if (!res) return Status::Corruption("can't decode file id");
res = GetVarint64(&value_log_slice, &valuelog_offset);
if (!res) return Status::Corruption("can't decode valuelog offset");
{
mutex_.Unlock();
s = ReadValueLog(file_id, valuelog_offset, value);
mutex_.Lock();
}
Slice value_log_slice = Slice(value->c_str(), value->length());
mutex_.Unlock();
s=parseTrueValue(&value_log_slice,value);
mutex_.Lock();
return s; return s;
} }
@ -1293,9 +1285,9 @@ Iterator *DBImpl::NewOriginalIterator(const ReadOptions& options) {
return db_iter; return db_iter;
} }
Iterator* DBImpl::NewUnorderedIterator(const ReadOptions& options) {
Iterator* DBImpl::NewUnorderedIterator(const ReadOptions& options,const Slice &lower_key,const Slice &upper_key) {
auto iter=NewOriginalIterator(options); auto iter=NewOriginalIterator(options);
return NewUnorderedIter(this,iter,dbname_);
return NewUnorderedIter(this,iter,dbname_,options.max_unorder_iter_memory_usage,lower_key,upper_key,user_comparator());
} }
Iterator* DBImpl::NewIterator(const ReadOptions& options) { Iterator* DBImpl::NewIterator(const ReadOptions& options) {
@ -1305,27 +1297,16 @@ Iterator* DBImpl::NewIterator(const ReadOptions& options) {
mutex_.Lock(); mutex_.Lock();
Iterator* iter_prefetch = NewInternalIterator(options, &latest_snapshot, &seed);
auto db_iter_prefetch=NewDBIterator(this, user_comparator(), iter_prefetch,
(options.snapshot != nullptr
? static_cast<const SnapshotImpl*>(options.snapshot)
->sequence_number()
: latest_snapshot),
seed);
SequenceNumber useless_snapshot;
Iterator* iter = NewInternalIterator(options, &useless_snapshot, &seed);
Iterator* iter = NewInternalIterator(options, &latest_snapshot, &seed);
auto db_iter=NewDBIterator(this, user_comparator(), iter, auto db_iter=NewDBIterator(this, user_comparator(), iter,
(options.snapshot != nullptr (options.snapshot != nullptr
? static_cast<const SnapshotImpl*>(options.snapshot) ? static_cast<const SnapshotImpl*>(options.snapshot)
->sequence_number() ->sequence_number()
: latest_snapshot), : latest_snapshot),
seed); seed);
mutex_.Unlock(); mutex_.Unlock();
return NewPreFetchIterator(this,db_iter,db_iter_prefetch,iter_num);
return NewTrueIterator(this,db_iter);
} }
void DBImpl::RecordReadSample(Slice key) { void DBImpl::RecordReadSample(Slice key) {
@ -1379,7 +1360,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
Writer* last_writer = &w; Writer* last_writer = &w;
if (status.ok() && updates != nullptr) { // nullptr batch is for compactions if (status.ok() && updates != nullptr) { // nullptr batch is for compactions
WriteBatch* write_batch = BuildBatchGroup(&last_writer); WriteBatch* write_batch = BuildBatchGroup(&last_writer);
WriteBatchInternal::ConverToValueLog(write_batch, this);//need lock! to protect valuelog_number
WriteBatchInternal::ConverToValueLog(write_batch, this,use_valuelog_length);//need lock! to protect valuelog_number
WriteBatchInternal::SetSequence(write_batch, last_sequence + 1); WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);
last_sequence += WriteBatchInternal::Count(write_batch); last_sequence += WriteBatchInternal::Count(write_batch);
@ -1644,43 +1625,29 @@ void DBImpl::GetApproximateSizes(const Range* range, int n, uint64_t* sizes) {
std::vector<std::pair<uint64_t, uint64_t>> DBImpl::WriteValueLog( std::vector<std::pair<uint64_t, uint64_t>> DBImpl::WriteValueLog(
std::vector<std::pair<Slice, Slice>> kv) { std::vector<std::pair<Slice, Slice>> kv) {
if(valuelogfile_number_==0){
addNewValueLog();
}
std::string file_name_ = ValueLogFileName(dbname_, valuelogfile_number_); std::string file_name_ = ValueLogFileName(dbname_, valuelogfile_number_);
std::fstream valueFile(file_name_, std::ios::in | std::ios::out | std::ios::binary); std::fstream valueFile(file_name_, std::ios::in | std::ios::out | std::ios::binary);
if (!valueFile.is_open()) {
assert(0);
}
assert(valueFile.is_open());
valueFile.seekg(0, std::ios::end); // 移动到文件末尾 valueFile.seekg(0, std::ios::end); // 移动到文件末尾
uint64_t init_offset = valueFile.tellg(); uint64_t init_offset = valueFile.tellg();
// 如果超出fixed_size // 如果超出fixed_size
if(init_offset>=config::value_log_size){
uint64_t file_data_size = 0; // 文件数据大小标志位
valueFile.seekg(0, std::ios::beg);
valueFile.read(reinterpret_cast<char*>(&file_data_size), sizeof(uint64_t));
valuelog_usage[valuelogfile_number_]=file_data_size;
valuelog_origin[valuelogfile_number_]=file_data_size;
if(init_offset>=value_log_size_){
addNewValueLog(); addNewValueLog();
valueFile.close(); valueFile.close();
file_name_ = ValueLogFileName(dbname_, valuelogfile_number_); file_name_ = ValueLogFileName(dbname_, valuelogfile_number_);
valueFile =std::fstream(file_name_, std::ios::in | std::ios::out | std::ios::binary); valueFile =std::fstream(file_name_, std::ios::in | std::ios::out | std::ios::binary);
valueFile.seekg(0, std::ios::end); // 移动到文件末尾 valueFile.seekg(0, std::ios::end); // 移动到文件末尾
init_offset = valueFile.tellg();
init_offset = 0;
valuelog_usage[valuelogfile_number_]=0;
valuelog_origin[valuelogfile_number_]=0;
} }
uint64_t file_data_size = 0; // 文件数据大小标志位
valueFile.seekg(0, std::ios::beg);
valueFile.read(reinterpret_cast<char*>(&file_data_size), sizeof(uint64_t));
valueFile.clear(); // 清除错误状态
//update length first
file_data_size+=kv.size();
valueFile.seekp(0, std::ios::beg); // 移动到文件开头
valueFile.write(reinterpret_cast<const char*>(&file_data_size), sizeof(uint64_t));
assert(valueFile.good());
valueFile.seekp(0, std::ios::end); // 返回文件末尾准备写入
// std::cout<<"file_data_size: "<<file_data_size<<std::endl;
std::vector<std::pair<uint64_t, uint64_t>> res; std::vector<std::pair<uint64_t, uint64_t>> res;
int total_size=0; int total_size=0;
@ -1689,7 +1656,7 @@ std::vector> DBImpl::WriteValueLog(
total_size+=pr.first.size()+pr.second.size(); total_size+=pr.first.size()+pr.second.size();
} }
char buf[total_size];
char* buf= new char[total_size];//write all data with one fstream.write using this buf
uint64_t offset=0; uint64_t offset=0;
for (const auto& pr:kv) { for (const auto& pr:kv) {
@ -1726,58 +1693,65 @@ std::vector> DBImpl::WriteValueLog(
// 解锁资源或进行其他清理操作 // 解锁资源或进行其他清理操作
//valueFile.flush(); // 确保所有缓冲区的数据都被写入文件 //valueFile.flush(); // 确保所有缓冲区的数据都被写入文件
valueFile.close(); valueFile.close();
valuelog_usage[valuelogfile_number_]+=res.size();
valuelog_origin[valuelogfile_number_]+=res.size();
delete buf;
return res; return res;
} }
void DBImpl::addNewValueLog() { void DBImpl::addNewValueLog() {
mutex_.AssertHeld();
valuelogfile_number_ = versions_->NewFileNumber(); valuelogfile_number_ = versions_->NewFileNumber();
std::string file_name_ = ValueLogFileName(dbname_, valuelogfile_number_); std::string file_name_ = ValueLogFileName(dbname_, valuelogfile_number_);
std::fstream valueFile(file_name_, std::ios::app | std::ios::binary); std::fstream valueFile(file_name_, std::ios::app | std::ios::binary);
if (!valueFile.is_open()) {
assert(0);
}
uint64_t file_data_size = 0; // 新增的文件数据大小标志位
if (valueFile.tellp() != 0) {
assert(0);
}
else{
valueFile.write(reinterpret_cast<const char*>(&file_data_size), sizeof(uint64_t));
if (!valueFile.good()) {
valueFile.close();
assert(0);
}
else{
// 正常关闭文件
valueFile.flush(); // 确保所有缓冲区的数据都被写入文件
valueFile.close();
}
}
assert(valueFile.is_open());
valueFile.close();
} }
static void valuelog_cache_deleter(const leveldb::Slice &key, void *value){ static void valuelog_cache_deleter(const leveldb::Slice &key, void *value){
delete (RandomAccessFile*)value; delete (RandomAccessFile*)value;
} }
Status DBImpl::parseTrueValue(Slice* value,std::string *true_value){
if(value->empty()){
*true_value="";
}
else if(value->data()[0]==0x00){
value->remove_prefix(1);
std::string new_str=std::string(value->data(),value->size());
*true_value=std::move(new_str);
}
else{
uint64_t value_id,value_offset;
value->remove_prefix(1);
Status s=ParseFakeValueForValuelog(*value,value_id,value_offset);
if(!s.ok())return s;
return ReadValueLog(value_id,value_offset,true_value);
}
return Status::OK();
}
Status DBImpl::ReadValueLog(uint64_t file_id, uint64_t offset, Status DBImpl::ReadValueLog(uint64_t file_id, uint64_t offset,
std::string* value) { std::string* value) {
std::string file_name_ = ValueLogFileName(dbname_, file_id); std::string file_name_ = ValueLogFileName(dbname_, file_id);
mutex_.Lock(); mutex_.Lock();
if(file_id==valuelogfile_number_||config::mem_value_log_number==0){
if(file_id==valuelogfile_number_||mem_value_log_number_==0){
mutex_.Unlock(); mutex_.Unlock();
std::ifstream inFile(file_name_, std::ios::in | std::ios::binary); std::ifstream inFile(file_name_, std::ios::in | std::ios::binary);
uint64_t value_len;
uint64_t value_len=0;
inFile.seekg(offset); inFile.seekg(offset);
inFile.read((char*)(&value_len),sizeof(uint64_t)); inFile.read((char*)(&value_len),sizeof(uint64_t));
char buf[value_len];
char* buf=new char[value_len];
inFile.read(buf,value_len); inFile.read(buf,value_len);
inFile.close(); inFile.close();
*value=std::string(buf,value_len); *value=std::string(buf,value_len);
delete buf;
return Status::OK(); return Status::OK();
} }
@ -1819,54 +1793,38 @@ void DBImpl::GarbageCollect() {
Status s = env_->GetChildren(dbname_, &filenames); Status s = env_->GetChildren(dbname_, &filenames);
Log(options_.info_log, "start gc "); Log(options_.info_log, "start gc ");
assert(s.ok()); assert(s.ok());
std::set<std::string> valuelog_set;
for (const auto& filename:filenames) {
if (IsValueLogFile(filename)){
uint64_t cur_log_number = GetValueLogID(filename);
if (cur_log_number == valuelogfile_number_) {
continue;
}
auto tmp_name = ValueLogFileName(dbname_, cur_log_number);
if (!versions_->checkOldValueLog(tmp_name) &&
valuelog_origin[cur_log_number]) {
//std::cout<<((float)valuelog_usage[cur_log_number]) /(float)valuelog_origin[cur_log_number]<<std::endl;
if (((float)valuelog_usage[cur_log_number]) /
(float)valuelog_origin[cur_log_number] <= 0.6) {
valuelog_set.emplace(filename);
}
}
std::vector<uint64_t> gc_valuelog_id_vector;
mutex_.Lock();// for visit valuelog_origin/usage
for(const auto&pr:valuelog_origin){
if(
((float)valuelog_usage[pr.first])/pr.second<config::GC_THRESHOLD
){
gc_valuelog_id_vector.push_back(pr.first);
} }
} }
// std::cout << "valuelog_set size: " << valuelog_set.size() << std::endl;
Log(options_.info_log, "valuelog_set size: %d", valuelog_set.size());
//bool tmp_judge=false;//only clean one file
for (std::string valuelog_name : valuelog_set) {
Log(options_.info_log, ("gc processing: "+valuelog_name).data());
uint64_t cur_log_number = GetValueLogID(valuelog_name);
valuelog_name = ValueLogFileName(dbname_, cur_log_number);
for(const auto&id:gc_valuelog_id_vector){
valuelog_origin.erase(id);
valuelog_usage.erase(id);
}
mutex_.Unlock();
for (uint64_t cur_log_number : gc_valuelog_id_vector) {
std::string valuelog_name = ValueLogFileName(dbname_, cur_log_number);
Log(options_.info_log, "gc processing: %s",valuelog_name.c_str());
if (cur_log_number == valuelogfile_number_) { if (cur_log_number == valuelogfile_number_) {
continue; continue;
} }
// 初始化offset为占用大小
uint64_t current_offset = sizeof(uint64_t);
uint64_t current_offset = 0;
uint64_t tmp_offset = current_offset; uint64_t tmp_offset = current_offset;
int cnt = 0;
// Open the file in binary mode for reading
std::ifstream cur_valuelog(valuelog_name, std::ios::in | std::ios::binary); std::ifstream cur_valuelog(valuelog_name, std::ios::in | std::ios::binary);
if (!cur_valuelog.is_open()) {
std::cerr << "Failed to open file: " << valuelog_name
<< " for reading cur_valuelog!" << std::endl;
continue;
}
assert(cur_valuelog.is_open());
while (true) { while (true) {
tmp_offset = current_offset; tmp_offset = current_offset;
++cnt;
// std::cout << cnt <<" "<<current_offset<< std::endl;
// 读取一个 kv 对 // 读取一个 kv 对
uint64_t key_len, val_len; uint64_t key_len, val_len;
@ -1881,10 +1839,7 @@ void DBImpl::GarbageCollect() {
if (cur_valuelog.eof()) { if (cur_valuelog.eof()) {
break; // 正常退出条件:到达文件末尾 break; // 正常退出条件:到达文件末尾
} }
if (!cur_valuelog.good()) {
assert(0);
}
assert(cur_valuelog.good());
current_offset += sizeof(uint64_t); current_offset += sizeof(uint64_t);
@ -1901,7 +1856,6 @@ void DBImpl::GarbageCollect() {
cur_valuelog.read(key_buf, key_len); cur_valuelog.read(key_buf, key_len);
assert(cur_valuelog.good()); assert(cur_valuelog.good());
current_offset += key_len; current_offset += key_len;
// std::cout << cnt <<" "<<current_offset<< std::endl;
// Assign the read key data to the Slice // Assign the read key data to the Slice
key = Slice(key_buf, key_len); key = Slice(key_buf, key_len);
@ -1930,8 +1884,16 @@ void DBImpl::GarbageCollect() {
// Key 不存在,忽略此记录 // Key 不存在,忽略此记录
continue; continue;
} }
else if(!status.ok()){//handle error:skip this valuelog
mutex_.Lock();
valuelog_finding_key=Slice();
lock_valuelog_key_mutex_cond_.SignalAll();
mutex_.Unlock();
break;
}
else{ else{
assert(status.ok());
if(stored_value.data()[0]==(char)(0x00)){ if(stored_value.data()[0]==(char)(0x00)){
//value is too small //value is too small
continue; continue;
@ -1940,9 +1902,20 @@ void DBImpl::GarbageCollect() {
// 检查 valuelog_id 和 offset 是否匹配 // 检查 valuelog_id 和 offset 是否匹配
uint64_t stored_valuelog_id, stored_offset; uint64_t stored_valuelog_id, stored_offset;
ParseStoredValue(stored_value.substr(1), stored_valuelog_id,
status=ParseFakeValueForValuelog(stored_value.substr(1), stored_valuelog_id,
stored_offset); stored_offset);
if (stored_valuelog_id != GetValueLogID(valuelog_name) ||
if(!status.ok()){//handle error:skip this valuelog
mutex_.Lock();
valuelog_finding_key=Slice();
lock_valuelog_key_mutex_cond_.SignalAll();
mutex_.Unlock();
break;
}
if (stored_valuelog_id != cur_log_number ||
stored_offset != tmp_offset) { stored_offset != tmp_offset) {
// 记录无效,跳过 // 记录无效,跳过
continue; continue;
@ -2034,7 +2007,7 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
impl->RemoveObsoleteFiles(); impl->RemoveObsoleteFiles();
impl->MaybeScheduleCompaction(); impl->MaybeScheduleCompaction();
impl->InitializeExistingLogs(); impl->InitializeExistingLogs();
impl->addNewValueLog();
//impl->addNewValueLog();
} }
impl->mutex_.Unlock(); impl->mutex_.Unlock();
if (s.ok()) { if (s.ok()) {
@ -2071,12 +2044,6 @@ Status DestroyDB(const std::string& dbname, const Options& options) {
result = del; result = del;
} }
} }
else if(IsValueLogFile(filenames[i])){
Status del = env->RemoveFile(dbname + "/" + filenames[i]);
if (result.ok() && !del.ok()) {
result = del;
}
}
} }
env->UnlockFile(lock); // Ignore error since state is already gone env->UnlockFile(lock); // Ignore error since state is already gone
env->RemoveFile(lockname); env->RemoveFile(lockname);
@ -2085,57 +2052,115 @@ Status DestroyDB(const std::string& dbname, const Options& options) {
return result; return result;
} }
// 读取所有现有日志文件的 file_data_size
// recover for valuelog
void DBImpl::InitializeExistingLogs() { void DBImpl::InitializeExistingLogs() {
std::vector<std::string> filenames; std::vector<std::string> filenames;
Status s = env_->GetChildren(dbname_, &filenames); Status s = env_->GetChildren(dbname_, &filenames);
Log(options_.info_log, "start set file map ");
Log(options_.info_log, "start recover for valuelog");
assert(s.ok()); assert(s.ok());
std::set<std::string> valuelog_set;
std::set<uint64_t> all_valuelog_ids;
std::set<uint64_t> live_valuelog_ids;
uint64_t latest_valuelog_id=0;
uint64_t latest_valuelog_offset=0;
for (const auto& filename : filenames) { for (const auto& filename : filenames) {
if (IsValueLogFile(filename)) {
uint64_t cur_log_number = GetValueLogID(filename);
uint64_t file_data_size = ReadFileSize(cur_log_number);
valuelog_origin[cur_log_number]=file_data_size;
valuelog_usage[cur_log_number]=0;
}
uint64_t valuelog_number;
FileType type;
ParseFileName(filename,&valuelog_number,&type);
if(type==FileType::kValueLogFile)all_valuelog_ids.emplace(valuelog_number);
} }
mutex_.Unlock(); mutex_.Unlock();
auto db_iter=NewOriginalIterator(ReadOptions()); auto db_iter=NewOriginalIterator(ReadOptions());
for(db_iter->SeekToFirst();db_iter->Valid();db_iter->Next()){ for(db_iter->SeekToFirst();db_iter->Valid();db_iter->Next()){
auto value=db_iter->value(); auto value=db_iter->value();
if(value.size()&&value[0]==0x01){ if(value.size()&&value[0]==0x01){
value.remove_prefix(1); value.remove_prefix(1);
uint64_t valuelog_id;
auto res=GetVarint64(&value,&valuelog_id);
assert(res);
assert(valuelog_usage.count(valuelog_id));
uint64_t valuelog_id,valuelog_offset;
Status status=ParseFakeValueForValuelog(value,valuelog_id,valuelog_offset);
if(!status.ok()){//handle error:skip this value, correct?
continue;
}
valuelog_usage[valuelog_id]++; valuelog_usage[valuelog_id]++;
if(valuelog_id>=latest_valuelog_id){
if(valuelog_id==latest_valuelog_id){
latest_valuelog_offset=std::max(latest_valuelog_offset,valuelog_offset);
}
else{
latest_valuelog_id=valuelog_id;
latest_valuelog_offset=valuelog_offset;
}
}
} }
} }
delete db_iter; delete db_iter;
mutex_.Lock(); mutex_.Lock();
}
for(const auto& pr:valuelog_usage){
live_valuelog_ids.emplace(pr.first);
}
// 读取单个文件的 file_data_size
uint64_t DBImpl::ReadFileSize(uint64_t log_number) {
auto file_name = ValueLogFileName(dbname_, log_number);
std::ifstream valueFile(file_name, std::ios::in | std::ios::binary);
if (!valueFile.is_open()) {
std::cerr << "Failed to open file: " << file_name << std::endl;
return 0;
for(const auto &id:all_valuelog_ids){
if(!live_valuelog_ids.count(id)){
//useless valuelog, delete directly
auto valuelog_name=ValueLogFileName(dbname_,id);
s=env_->RemoveFile(valuelog_name);
assert(s.ok());
}
} }
uint64_t file_data_size = 0;
valueFile.read(reinterpret_cast<char*>(&file_data_size), sizeof(uint64_t));
if (valueFile.fail() || valueFile.bad()) {
std::cerr << "Failed to read data size from file: " << file_name
<< std::endl;
valueFile.close();
return 0;
if(latest_valuelog_id>0){//delete data that was written in valuelog but not written in WAL
auto valuelog_name=ValueLogFileName(dbname_,latest_valuelog_id);
std::ifstream inFile(valuelog_name, std::ios::in | std::ios::binary);
uint64_t value_len,key_len;
inFile.seekg(latest_valuelog_offset);
inFile.read((char*)(&value_len),sizeof(uint64_t));
latest_valuelog_offset+=value_len+sizeof(uint64_t);
inFile.seekg(latest_valuelog_offset);
inFile.read((char*)(&key_len),sizeof(uint64_t));
latest_valuelog_offset+=key_len+sizeof(uint64_t);
char* buf=new char[latest_valuelog_offset];
inFile.seekg(0);
inFile.read(buf,latest_valuelog_offset);
inFile.close();
std::ofstream trunc_file(valuelog_name, std::ios::out | std::ios::binary | std::ios::trunc);
trunc_file.write(buf,latest_valuelog_offset);
trunc_file.close();
delete buf;
}
for(const auto&id:live_valuelog_ids){//update valuelog_origin
auto valuelog_name=ValueLogFileName(dbname_,id);
std::ifstream inFile(valuelog_name, std::ios::in | std::ios::binary);
int data_cnt=0;
uint64_t value_len,key_len;
int cur_offset=0;
while(1){
inFile.read((char*)(&value_len),sizeof(uint64_t));
if (inFile.eof()) {
break; // 正常退出条件:到达文件末尾
}
cur_offset+=value_len+sizeof(uint64_t);
inFile.seekg(cur_offset);
inFile.read((char*)(&key_len),sizeof(uint64_t));
cur_offset+=key_len+sizeof(uint64_t);
data_cnt++;
}
valuelog_origin[id]=data_cnt;
} }
valueFile.close();
return file_data_size;
} }
} // namespace leveldb } // namespace leveldb

+ 10
- 6
db/db_impl.h View File

@ -62,7 +62,7 @@ class DBImpl : public DB {
std::string* value) override; std::string* value) override;
Iterator* NewIterator(const ReadOptions&) override; Iterator* NewIterator(const ReadOptions&) override;
Iterator* NewOriginalIterator(const ReadOptions&); Iterator* NewOriginalIterator(const ReadOptions&);
Iterator* NewUnorderedIterator(const ReadOptions&) override;
Iterator* NewUnorderedIterator(const ReadOptions&,const Slice &lower_key,const Slice &upper_key) override;//upper key not included
const Snapshot* GetSnapshot() override; const Snapshot* GetSnapshot() override;
void ReleaseSnapshot(const Snapshot* snapshot) override; void ReleaseSnapshot(const Snapshot* snapshot) override;
bool GetProperty(const Slice& property, std::string* value) override; bool GetProperty(const Slice& property, std::string* value) override;
@ -78,6 +78,8 @@ class DBImpl : public DB {
Status ReadValueLog(uint64_t file_id, uint64_t offset, Status ReadValueLog(uint64_t file_id, uint64_t offset,
std::string* value) override; std::string* value) override;
Status parseTrueValue(Slice* value,std::string* true_value) override;
Status ReadValueLogRange(uint64_t file_id,std::vector<uint64_t> offsets, Status ReadValueLogRange(uint64_t file_id,std::vector<uint64_t> offsets,
std::string* value); std::string* value);
@ -108,8 +110,6 @@ class DBImpl : public DB {
void InitializeExistingLogs(); void InitializeExistingLogs();
uint64_t ReadFileSize(uint64_t log_number);
private: private:
friend class DB; friend class DB;
@ -185,7 +185,7 @@ class DBImpl : public DB {
void BackgroundCall(); void BackgroundCall();
void BackgroundCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_); void BackgroundCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_);
void BackgroundGarbageCollect() EXCLUSIVE_LOCKS_REQUIRED(mutex_); void BackgroundGarbageCollect() EXCLUSIVE_LOCKS_REQUIRED(mutex_);
void GarbageCollect() EXCLUSIVE_LOCKS_REQUIRED(mutex_);
void GarbageCollect();
void CleanupCompaction(CompactionState* compact) void CleanupCompaction(CompactionState* compact)
EXCLUSIVE_LOCKS_REQUIRED(mutex_); EXCLUSIVE_LOCKS_REQUIRED(mutex_);
@ -236,10 +236,10 @@ class DBImpl : public DB {
WritableFile* valuelogfile_; WritableFile* valuelogfile_;
int valuelogfile_offset = 0; int valuelogfile_offset = 0;
uint64_t logfile_number_; uint64_t logfile_number_;
uint64_t valuelogfile_number_;
uint64_t valuelogfile_number_=0;
log::Writer* log_; log::Writer* log_;
std::map<uint64_t, uint64_t> oldvaluelog_ids; std::map<uint64_t, uint64_t> oldvaluelog_ids;
int mem_value_log_number_;//if =0, don't use cache
Cache* valuelog_cache; Cache* valuelog_cache;
std::map<uint64_t, uint64_t> valuelog_usage; std::map<uint64_t, uint64_t> valuelog_usage;
std::map<uint64_t, uint64_t> valuelog_origin; std::map<uint64_t, uint64_t> valuelog_origin;
@ -269,6 +269,10 @@ class DBImpl : public DB {
VersionSet* const versions_ GUARDED_BY(mutex_); VersionSet* const versions_ GUARDED_BY(mutex_);
int use_valuelog_length=5000;
int value_log_size_;
// Have we encountered a background error in paranoid mode? // Have we encountered a background error in paranoid mode?
Status bg_error_ GUARDED_BY(mutex_); Status bg_error_ GUARDED_BY(mutex_);

+ 9
- 1
db/db_test.cc View File

@ -340,7 +340,9 @@ class DBTest : public testing::Test {
opts = CurrentOptions(); opts = CurrentOptions();
opts.create_if_missing = true; opts.create_if_missing = true;
} }
opts.use_valuelog_length=-1;
last_options_ = opts; last_options_ = opts;
return DB::Open(opts, dbname_, &db_); return DB::Open(opts, dbname_, &db_);
} }
@ -413,9 +415,13 @@ class DBTest : public testing::Test {
result += ", "; result += ", ";
} }
first = false; first = false;
std::string res;
Slice true_val;
switch (ikey.type) { switch (ikey.type) {
case kTypeValue: case kTypeValue:
result += iter->value().ToString();
true_val=iter->value();
dbfull()->parseTrueValue(&true_val,&res);
result += res;
break; break;
case kTypeDeletion: case kTypeDeletion:
result += "DEL"; result += "DEL";
@ -1107,6 +1113,7 @@ TEST_F(DBTest, MinorCompactionsHappen) {
TEST_F(DBTest, RecoverWithLargeLog) { TEST_F(DBTest, RecoverWithLargeLog) {
{ {
Options options = CurrentOptions(); Options options = CurrentOptions();
options.use_valuelog_length=-1;
Reopen(&options); Reopen(&options);
ASSERT_LEVELDB_OK(Put("big1", std::string(200000, '1'))); ASSERT_LEVELDB_OK(Put("big1", std::string(200000, '1')));
ASSERT_LEVELDB_OK(Put("big2", std::string(200000, '2'))); ASSERT_LEVELDB_OK(Put("big2", std::string(200000, '2')));
@ -1118,6 +1125,7 @@ TEST_F(DBTest, RecoverWithLargeLog) {
// Make sure that if we re-open with a small write buffer size that // Make sure that if we re-open with a small write buffer size that
// we flush table files in the middle of a large log file. // we flush table files in the middle of a large log file.
Options options = CurrentOptions(); Options options = CurrentOptions();
options.use_valuelog_length=-1;
options.write_buffer_size = 100000; options.write_buffer_size = 100000;
Reopen(&options); Reopen(&options);
ASSERT_EQ(NumTableFilesAtLevel(0), 3); ASSERT_EQ(NumTableFilesAtLevel(0), 3);

+ 1
- 5
db/dbformat.h View File

@ -44,12 +44,8 @@ static const int kMaxMemCompactLevel = 2;
// Approximate gap in bytes between samples of data read during iteration. // Approximate gap in bytes between samples of data read during iteration.
static const int kReadBytesPeriod = 1048576; static const int kReadBytesPeriod = 1048576;
// maximum size of value_log file
static const int value_log_size=1<<26;
//1<<33/1<<26=1<<7
static const int mem_value_log_number=0;//8GB
static const int max_unorder_iter_memory_usage=64<<20; //32MB
static const float GC_THRESHOLD=0.6;
} // namespace config } // namespace config

+ 63
- 0
db/fields.cc View File

@ -0,0 +1,63 @@
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#ifndef STORAGE_LEVELDB_INCLUDE_FIELDS_H_
#define STORAGE_LEVELDB_INCLUDE_FIELDS_H_
#include <string>
#include "db/dbformat.h"
#include "leveldb/fields.h"
namespace leveldb {
// Encode a FieldArray as: varint64 field-count, followed by one
// length-prefixed (name, value) slice pair per field.
std::string SerializeValue(const FieldArray& fields){
  std::string encoded;
  PutVarint64(&encoded, static_cast<uint64_t>(fields.size()));
  for (const auto& field : fields) {
    PutLengthPrefixedSlice(&encoded, field.first);
    PutLengthPrefixedSlice(&encoded, field.second);
  }
  return encoded;
}
void DeserializeValue(const std::string& value_str,FieldArray* res){
Slice slice=Slice(value_str.c_str());
uint64_t siz;
bool tmpres=GetVarint64(&slice,&siz);
assert(tmpres);
res->clear();
for(int i=0;i<siz;i++){
Slice value_name;
Slice value;
tmpres=GetLengthPrefixedSlice(&slice,&value_name);
assert(tmpres);
tmpres=GetLengthPrefixedSlice(&slice,&value);
assert(tmpres);
res->emplace_back(value_name,value);
}
}
// Scan the whole database with an unordered iterator and collect into *keys
// every key whose value (a serialized FieldArray) contains an exact
// (name, value) match for `field`.
// Returns the iterator's final status instead of unconditionally OK, so
// scan errors (e.g. corruption) are no longer silently swallowed.
Status Get_keys_by_field(DB *db,const ReadOptions& options, const Field field,std::vector<std::string> *keys){
  auto it = db->NewUnorderedIterator(options, Slice(), Slice());
  keys->clear();
  while (it->Valid()) {
    Slice val = it->value();
    FieldArray arr;
    std::string str_val(val.data(), val.size());
    DeserializeValue(str_val, &arr);
    for (const auto& pr : arr) {  // const& avoids copying each pair
      if (pr.first == field.first && pr.second == field.second) {
        Slice key = it->key();
        keys->emplace_back(key.data(), key.size());
        break;
      }
    }
    it->Next();
  }
  Status s = it->status();  // capture before destroying the iterator
  delete it;
  return s;
}
}
#endif // STORAGE_LEVELDB_INCLUDE_FIELDS_H_

+ 4
- 1
db/filename.cc View File

@ -117,7 +117,10 @@ bool ParseFileName(const std::string& filename, uint64_t* number,
*type = kTableFile; *type = kTableFile;
} else if (suffix == Slice(".dbtmp")) { } else if (suffix == Slice(".dbtmp")) {
*type = kTempFile; *type = kTempFile;
} else {
} else if (suffix == Slice(".valuelog")){
*type = kValueLogFile;
}
else {
return false; return false;
} }
*number = num; *number = num;

+ 1
- 0
db/filename.h View File

@ -25,6 +25,7 @@ enum FileType {
kDescriptorFile, kDescriptorFile,
kCurrentFile, kCurrentFile,
kTempFile, kTempFile,
kValueLogFile,
kInfoLogFile // Either the current one, or an old one kInfoLogFile // Either the current one, or an old one
}; };

+ 0
- 302
db/prefetch_iter.cc View File

@ -1,302 +0,0 @@
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include <iostream>
#include <fstream>
#include <thread>
#include <queue>
#include "db/prefetch_iter.h"
#include "db/db_impl.h"
#include "db/dbformat.h"
#include "db/filename.h"
#include "leveldb/env.h"
#include "leveldb/iterator.h"
#include "port/port.h"
#include "util/logging.h"
#include "util/mutexlock.h"
#include "util/random.h"
#include "port/port.h"
namespace leveldb {
namespace {
// Memtables and sstables that make the DB representation contain
// (userkey,seq,type) => uservalue entries. DBPreFetchIter
// combines multiple entries for the same userkey found in the DB
// representation into a single entry while accounting for sequence
// numbers, deletion markers, overwrites, etc.
class DBPreFetchIter : public Iterator {
public:
// Which direction is the iterator currently moving?
// (1) When moving forward, the internal iterator is positioned at
// the exact entry that yields this->key(), this->value()
// (2) When moving backwards, the internal iterator is positioned
// just before all entries whose user key == this->key().
enum IterPos {Left,Mid,Right};
DBPreFetchIter(DBImpl* db, Iterator* iter, Iterator* prefetch_iter,int prefetch_num)
:
db_(db),iter_(iter),prefetch_iter_(prefetch_iter),prefetch_num_(prefetch_num) {}
DBPreFetchIter(const DBPreFetchIter&) = delete;
DBPreFetchIter& operator=(const DBPreFetchIter&) = delete;
~DBPreFetchIter() override {
// if(prefetch_thread.joinable()){
// stop_flag.store(true);
// prefetch_thread.join();
// }
delete prefetch_iter_;
//std::cout<<"fetch:"<<fetched_<<" unfetch:"<<unfetched_<<"\n";
delete iter_;
}
bool Valid() const override { return iter_->Valid(); }
Slice key() const override {
return iter_->key();
}
Slice value() const override {
// if(cur_pos>=0&&cur_pos<=1000000&&prefetched_array[cur_pos].load()){
// fetched_++;
// return prefetch_array[cur_pos];
// }
// else{
// unfetched_++;
buf_for_value=std::move(GetAndParseTrueValue(iter_->value()));
return Slice(buf_for_value.data(),buf_for_value.size());
//}
}
Status status() const override {
return iter_->status();
}
void Next() override;
void Prev() override;
void Seek(const Slice& target) override;
void SeekToFirst() override;
void SeekToLast() override;
private:
std::string GetAndParseTrueValue(Slice tmp_value)const{
if(tmp_value.size()==0){
return "";
}
if(tmp_value.data()[0]==(char)(0x00)){
tmp_value.remove_prefix(1);
return std::string(tmp_value.data(),tmp_value.size());
}
tmp_value.remove_prefix(1);
uint64_t file_id,valuelog_offset;
bool res=GetVarint64(&tmp_value,&file_id);
if(!res)assert(0);
res=GetVarint64(&tmp_value,&valuelog_offset);
if(!res)assert(0);
std::string str;
Status s=db_->ReadValueLog(file_id,valuelog_offset, &str);
return std::move(str);
}
// void PreFetchThreadForward(){
// std::thread prefetch_threads[prefetch_num_];
// std::queue<std::pair<std::string,int>> q;
// port::Mutex* lock=new port::Mutex();
// port::CondVar* cv=new port::CondVar(lock);
// bool local_stop_flag=false;
// int remaining_task_cnt=0;
// bool main_finish=false;
// for(int i=0;i<prefetch_num_;i++){
// prefetch_threads[i]=std::thread([this,&q,&lock,&cv,&local_stop_flag,&remaining_task_cnt,&main_finish]()
// {
// int pos;
// while(1){
// lock->Lock();
// while(q.empty()&&!local_stop_flag&&!(remaining_task_cnt==0&&main_finish)){
// cv->Wait();
// }
// if(local_stop_flag||(remaining_task_cnt==0&&main_finish)){
// cv->SignalAll();
// lock->Unlock();
// break;
// }
// std::string s=q.front().first;
// pos=q.front().second;
// q.pop();
// remaining_task_cnt--;
// lock->Unlock();
// prefetch_array[pos]=std::move(GetAndParseTrueValue(s));
// prefetched_array[pos].store(true);
// }
// }
// );
// }
// Slice val;
// int pos=0;
// for(int i=0;i<100&&prefetch_iter_->Valid();i++){
// prefetch_iter_->Next();
// pos++;
// }
// for(;prefetch_iter_->Valid()&&!stop_flag.load()&&pos<1000000;prefetch_iter_->Next()){
// val=prefetch_iter_->value();
// lock->Lock();
// q.push({std::string(val.data(),val.size()),pos});
// cv->Signal();
// remaining_task_cnt++;
// lock->Unlock();
// pos++;
// }
// lock->Lock();
// main_finish=true;
// while(remaining_task_cnt){
// cv->Wait();
// }
// lock->Unlock();
// cv->SignalAll();
// for (auto& thread : prefetch_threads) {
// if (thread.joinable()) {
// thread.join();
// }
// }
// }
// void PreFetchThreadBackward(){
// std::thread prefetch_threads[prefetch_num_];
// std::queue<std::pair<std::string,int>> q;
// port::Mutex* lock=new port::Mutex();
// port::CondVar* cv=new port::CondVar(lock);
// bool local_stop_flag=false;
// int remaining_task_cnt=0;
// bool main_finish=false;
// for(int i=0;i<prefetch_num_;i++){
// prefetch_threads[i]=std::thread([this,&q,&lock,&cv,&local_stop_flag,&remaining_task_cnt,&main_finish]()
// {
// int pos;
// while(1){
// lock->Lock();
// while(q.empty()&&!local_stop_flag&&!(remaining_task_cnt==0&&main_finish)){
// cv->Wait();
// }
// if(local_stop_flag||(remaining_task_cnt==0&&main_finish)){
// cv->SignalAll();
// lock->Unlock();
// break;
// }
// std::string s=q.front().first;
// pos=q.front().second;
// q.pop();
// remaining_task_cnt--;
// lock->Unlock();
// prefetch_array[pos]=std::move(GetAndParseTrueValue(s));
// prefetched_array[pos].store(true);
// }
// }
// );
// }
// Slice val;
// int pos=1000000;
// for(;prefetch_iter_->Valid()&&!stop_flag.load()&&pos>=0;prefetch_iter_->Prev()){
// val=prefetch_iter_->value();
// lock->Lock();
// q.push({std::string(val.data(),val.size()),pos});
// cv->Signal();
// remaining_task_cnt++;
// lock->Unlock();
// pos--;
// }
// lock->Lock();
// main_finish=true;
// while(remaining_task_cnt){
// cv->Wait();
// }
// lock->Unlock();
// cv->SignalAll();
// for (auto& thread : prefetch_threads) {
// if (thread.joinable()) {
// thread.join();
// }
// }
// }
DBImpl* db_;
Iterator* const iter_;
Iterator* const prefetch_iter_;
int prefetch_num_;
// std::atomic<bool> stop_flag;
// std::string prefetch_array[1000005];
// std::atomic<bool> prefetched_array[1000005];
std::thread prefetch_thread;
mutable std::string buf_for_value;
int cur_pos=0;
mutable int fetched_=0;
mutable int unfetched_=0;
};
void DBPreFetchIter::Next() {
iter_->Next();
//cur_pos++;
}
void DBPreFetchIter::Prev() {
iter_->Prev();
//cur_pos--;
}
void DBPreFetchIter::Seek(const Slice& target) {
iter_->Seek(target);
// if(prefetch_thread.joinable()){
// stop_flag.store(true);
// prefetch_thread.join();
// stop_flag=false;
// }
// for(int i=0;i<=1000000;i++)prefetched_array[i]=false;
// cur_pos=0;
// prefetch_iter_->Seek(target);
// prefetch_thread=std::thread([this]() {
// PreFetchThreadForward();
// });
}
void DBPreFetchIter::SeekToFirst() {
iter_->SeekToFirst();
// if(prefetch_thread.joinable()){
// stop_flag.store(true);
// prefetch_thread.join();
// stop_flag=false;
// }
// for(int i=0;i<=1000000;i++)prefetched_array[i]=false;
// cur_pos=0;
// prefetch_iter_->SeekToFirst();
// prefetch_thread=std::thread([this]() {
// PreFetchThreadForward();
// });
}
void DBPreFetchIter::SeekToLast() {
iter_->SeekToLast();
// if(prefetch_thread.joinable()){
// stop_flag.store(true);
// prefetch_thread.join();
// stop_flag=false;
// }
// for(int i=0;i<=1000000;i++)prefetched_array[i]=false;
// cur_pos=1000000;
// prefetch_thread=std::thread([this]() {
// prefetch_iter_->SeekToLast();
// PreFetchThreadBackward();
// });
}
} // anonymous namespace
Iterator* NewPreFetchIterator(DBImpl* db,Iterator* db_iter,Iterator* prefetch_iter,int prefetch_num) {
return new DBPreFetchIter(db,db_iter,prefetch_iter,prefetch_num);
}
} // namespace leveldb

+ 6
- 4
db/recovery_test.cc View File

@ -283,9 +283,11 @@ TEST_F(RecoveryTest, MultipleLogFiles) {
// Make a bunch of uncompacted log files. // Make a bunch of uncompacted log files.
uint64_t old_log = FirstLogFile(); uint64_t old_log = FirstLogFile();
MakeLogFile(old_log + 1, 1000, "hello", "world");
MakeLogFile(old_log + 2, 1001, "hi", "there");
MakeLogFile(old_log + 3, 1002, "foo", "bar2");
std::string prefix;
prefix+=(char)0x00;
MakeLogFile(old_log + 1, 1000, "hello", prefix+"world");
MakeLogFile(old_log + 2, 1001, "hi", prefix+"there");
MakeLogFile(old_log + 3, 1002, "foo", prefix+"bar2");
// Recover and check that all log files were processed. // Recover and check that all log files were processed.
Open(); Open();
@ -310,7 +312,7 @@ TEST_F(RecoveryTest, MultipleLogFiles) {
// Check that introducing an older log file does not cause it to be re-read. // Check that introducing an older log file does not cause it to be re-read.
Close(); Close();
MakeLogFile(old_log + 1, 2000, "hello", "stale write");
MakeLogFile(old_log + 1, 2000, "hello", prefix+"stale write");
Open(); Open();
ASSERT_LE(1, NumTables()); ASSERT_LE(1, NumTables());
ASSERT_EQ(1, NumLogs()); ASSERT_EQ(1, NumLogs());

+ 1
- 1
db/repair.cc View File

@ -369,7 +369,7 @@ class Repairer {
// TODO(opt): separate out into multiple levels // TODO(opt): separate out into multiple levels
const TableInfo& t = tables_[i]; const TableInfo& t = tables_[i];
edit_.AddFile(0, t.meta.number, t.meta.file_size, t.meta.smallest, edit_.AddFile(0, t.meta.number, t.meta.file_size, t.meta.smallest,
t.meta.largest,t.meta.valuelog_id);
t.meta.largest);
} }
// std::fprintf(stderr, // std::fprintf(stderr,

+ 104
- 0
db/true_iter.cc View File

@ -0,0 +1,104 @@
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include <iostream>
#include <fstream>
#include <thread>
#include <queue>
#include "db/true_iter.h"
#include "db/db_impl.h"
#include "db/dbformat.h"
#include "db/filename.h"
#include "leveldb/env.h"
#include "leveldb/iterator.h"
#include "port/port.h"
#include "util/logging.h"
#include "util/mutexlock.h"
#include "util/random.h"
#include "port/port.h"
namespace leveldb {
namespace {
// Memtables and sstables that make the DB representation contain
// (userkey,seq,type) => uservalue entries. DBTrueIter
// combines multiple entries for the same userkey found in the DB
// representation into a single entry while accounting for sequence
// numbers, deletion markers, overwrites, etc.
class DBTrueIter : public Iterator {
public:
// Which direction is the iterator currently moving?
// (1) When moving forward, the internal iterator is positioned at
// the exact entry that yields this->key(), this->value()
// (2) When moving backwards, the internal iterator is positioned
// just before all entries whose user key == this->key().
DBTrueIter(DBImpl* db, Iterator* iter)
:
db_(db),iter_(iter){}
DBTrueIter(const DBTrueIter&) = delete;
DBTrueIter& operator=(const DBTrueIter&) = delete;
~DBTrueIter() override {
delete iter_;
}
bool Valid() const override { return iter_->Valid(); }
Slice key() const override {
return iter_->key();
}
Slice value() const override {
buf_for_value=std::move(GetAndParseTrueValue(iter_->value()));
return Slice(buf_for_value.data(),buf_for_value.size());
}
Status status() const override {
return iter_->status();
}
void Next() override;
void Prev() override;
void Seek(const Slice& target) override;
void SeekToFirst() override;
void SeekToLast() override;
private:
std::string GetAndParseTrueValue(Slice tmp_value)const{
if(tmp_value.size()==0){
return "";
}
std::string str;
Status s=db_->parseTrueValue(&tmp_value,&str);
return std::move(str);
}
DBImpl* db_;
Iterator* const iter_;
mutable std::string buf_for_value;
};
// Positioning operations simply forward to the wrapped iterator; value
// resolution happens lazily in value().
void DBTrueIter::Next() {
iter_->Next();
}
void DBTrueIter::Prev() {
iter_->Prev();
}
void DBTrueIter::Seek(const Slice& target) {
iter_->Seek(target);
}
void DBTrueIter::SeekToFirst() {
iter_->SeekToFirst();
}
void DBTrueIter::SeekToLast() {
iter_->SeekToLast();
}
} // anonymous namespace
// Factory: wrap db_iter so callers always observe fully resolved ("true")
// values. The returned iterator takes ownership of db_iter.
Iterator* NewTrueIterator(DBImpl* db,Iterator* db_iter) {
return new DBTrueIter(db,db_iter);
}
} // namespace leveldb

db/prefetch_iter.h → db/true_iter.h View File

@ -2,8 +2,8 @@
// Use of this source code is governed by a BSD-style license that can be // Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors. // found in the LICENSE file. See the AUTHORS file for names of contributors.
#ifndef STORAGE_LEVELDB_DB_PREFETCH_ITER_H_
#define STORAGE_LEVELDB_DB_PREFETCH_ITER_H_
#ifndef STORAGE_LEVELDB_DB_TRUE_ITER_H_
#define STORAGE_LEVELDB_DB_TRUE_ITER_H_
#include <cstdint> #include <cstdint>
@ -14,9 +14,8 @@ namespace leveldb {
class DBImpl; class DBImpl;
// add a prefetch function for db_iter
Iterator* NewPreFetchIterator(DBImpl* db,Iterator* db_iter, Iterator* prefetch_iter,int prefetch_num);
Iterator* NewTrueIterator(DBImpl* db,Iterator* db_iter);
} // namespace leveldb } // namespace leveldb
#endif // STORAGE_LEVELDB_DB_PREFETCH_ITER_H_
#endif // STORAGE_LEVELDB_DB_TRUE_ITER_H_

+ 28
- 10
db/unordered_iter.cc View File

@ -40,9 +40,18 @@ class UnorderedIter : public Iterator {
// just before all entries whose user key == this->key(). // just before all entries whose user key == this->key().
enum IterPos {Left,Mid,Right}; enum IterPos {Left,Mid,Right};
UnorderedIter(DBImpl* db, Iterator* iter,std::string db_name)
UnorderedIter(DBImpl* db, Iterator* iter,std::string db_name,int max_unorder_iter_memory_usage,const Slice &lower_key,const Slice &upper_key,const Comparator* user_comparator)
: :
db_(db),iter_(iter),db_name_(db_name){}
db_(db),iter_(iter),db_name_(db_name),max_unorder_iter_memory_usage_(max_unorder_iter_memory_usage),lower_key_(lower_key),upper_key_(upper_key),comparator_(user_comparator){
first_one=true;
if(lower_key_.empty())iter_->SeekToFirst();
else iter_->Seek(lower_key);
if(!iter_->Valid()){
mode=2;
return;
}
Next();
}
UnorderedIter(const UnorderedIter&) = delete; UnorderedIter(const UnorderedIter&) = delete;
UnorderedIter& operator=(const UnorderedIter&) = delete; UnorderedIter& operator=(const UnorderedIter&) = delete;
@ -114,6 +123,12 @@ class UnorderedIter : public Iterator {
now_key=Slice(buf_for_now_key,key_len); now_key=Slice(buf_for_now_key,key_len);
} }
// True when iteration has run past the requested range: either the wrapped
// iterator is exhausted, or (when an upper bound is set) the current key
// compares >= upper_key_. An empty upper_key_ means "no upper bound".
bool keyGreaterThanRequire(){
  if (!iter_->Valid()) return true;
  if (upper_key_.empty()) return false;
  return comparator_->Compare(iter_->key(), upper_key_) >= 0;
}
DBImpl* db_; DBImpl* db_;
Iterator* const iter_; Iterator* const iter_;
@ -126,7 +141,6 @@ class UnorderedIter : public Iterator {
bool iter_valid=false; bool iter_valid=false;
std::map<uint64_t,std::vector<uint64_t>> valuelog_map; std::map<uint64_t,std::vector<uint64_t>> valuelog_map;
int memory_usage=0; int memory_usage=0;
uint64_t max_memory_usage=config::max_unorder_iter_memory_usage;
std::string db_name_; std::string db_name_;
std::ifstream* current_file=nullptr; std::ifstream* current_file=nullptr;
@ -136,20 +150,26 @@ class UnorderedIter : public Iterator {
int mode=0;//0=iter, 1=valuelog, 2=invalid int mode=0;//0=iter, 1=valuelog, 2=invalid
std::map<uint64_t,std::vector<uint64_t>>::iterator valuelog_map_iter; std::map<uint64_t,std::vector<uint64_t>>::iterator valuelog_map_iter;
int vec_idx=-1; int vec_idx=-1;
int max_unorder_iter_memory_usage_;
const Slice lower_key_;
const Slice upper_key_;
const Comparator* comparator_;
}; };
void UnorderedIter::Next() { void UnorderedIter::Next() {
if(mode==0){ if(mode==0){
if(iter_->Valid())
if(iter_->Valid()&&!keyGreaterThanRequire())
{ {
if(first_one){ if(first_one){
first_one=false; first_one=false;
} }
else iter_->Next(); else iter_->Next();
for(; for(;
iter_->Valid()&&memory_usage<max_memory_usage;
iter_->Valid()&&memory_usage<max_unorder_iter_memory_usage_&&!keyGreaterThanRequire();
memory_usage+=2*sizeof(uint64_t),iter_->Next()) memory_usage+=2*sizeof(uint64_t),iter_->Next())
{ {
if(checkLongValue(iter_->value())){ if(checkLongValue(iter_->value())){
@ -227,16 +247,14 @@ void UnorderedIter::Seek(const Slice& target) {
} }
void UnorderedIter::SeekToFirst() { void UnorderedIter::SeekToFirst() {
first_one=true;
iter_->SeekToFirst();
Next();
assert(0);
} }
void UnorderedIter::SeekToLast() { void UnorderedIter::SeekToLast() {
assert(0); assert(0);
} }
} // anonymous namespace } // anonymous namespace
Iterator* NewUnorderedIter(DBImpl* db,Iterator* db_iter,std::string db_name) {
return new UnorderedIter(db,db_iter,db_name);
Iterator* NewUnorderedIter(DBImpl* db,Iterator* db_iter,std::string db_name,int max_unorder_iter_memory_usage,const Slice &lower_key,const Slice &upper_key,const Comparator* user_comparator) {
return new UnorderedIter(db,db_iter,db_name,max_unorder_iter_memory_usage,lower_key,upper_key,user_comparator);
} }
} // namespace leveldb } // namespace leveldb

+ 1
- 1
db/unordered_iter.h View File

@ -15,7 +15,7 @@ namespace leveldb {
class DBImpl; class DBImpl;
// add a prefetch function for db_iter // add a prefetch function for db_iter
Iterator* NewUnorderedIter(DBImpl* db,Iterator* db_iter,std::string db_name);
Iterator* NewUnorderedIter(DBImpl* db,Iterator* db_iter,std::string db_name,int max_unorder_iter_memory_usage,const Slice &lower_key,const Slice &upper_key,const Comparator* comparator);
} // namespace leveldb } // namespace leveldb

+ 0
- 4
db/version_edit.cc View File

@ -79,7 +79,6 @@ void VersionEdit::EncodeTo(std::string* dst) const {
PutVarint32(dst, new_files_[i].first); // level PutVarint32(dst, new_files_[i].first); // level
PutVarint64(dst, f.number); PutVarint64(dst, f.number);
PutVarint64(dst, f.file_size); PutVarint64(dst, f.file_size);
PutVarint64(dst, f.valuelog_id);
PutLengthPrefixedSlice(dst, f.smallest.Encode()); PutLengthPrefixedSlice(dst, f.smallest.Encode());
PutLengthPrefixedSlice(dst, f.largest.Encode()); PutLengthPrefixedSlice(dst, f.largest.Encode());
} }
@ -179,7 +178,6 @@ Status VersionEdit::DecodeFrom(const Slice& src) {
case kNewFile: case kNewFile:
if (GetLevel(&input, &level) && GetVarint64(&input, &f.number) && if (GetLevel(&input, &level) && GetVarint64(&input, &f.number) &&
GetVarint64(&input, &f.file_size) && GetVarint64(&input, &f.file_size) &&
GetVarint64(&input,&f.valuelog_id) &&
GetInternalKey(&input, &f.smallest) && GetInternalKey(&input, &f.smallest) &&
GetInternalKey(&input, &f.largest)) { GetInternalKey(&input, &f.largest)) {
new_files_.push_back(std::make_pair(level, f)); new_files_.push_back(std::make_pair(level, f));
@ -249,8 +247,6 @@ std::string VersionEdit::DebugString() const {
r.append(" "); r.append(" ");
AppendNumberTo(&r, f.file_size); AppendNumberTo(&r, f.file_size);
r.append(" "); r.append(" ");
AppendNumberTo(&r, f.valuelog_id);
r.append(" ");
r.append(f.smallest.DebugString()); r.append(f.smallest.DebugString());
r.append(" .. "); r.append(" .. ");
r.append(f.largest.DebugString()); r.append(f.largest.DebugString());

+ 1
- 3
db/version_edit.h View File

@ -22,7 +22,6 @@ struct FileMetaData {
int allowed_seeks; // Seeks allowed until compaction int allowed_seeks; // Seeks allowed until compaction
uint64_t number; uint64_t number;
uint64_t file_size; // File size in bytes uint64_t file_size; // File size in bytes
uint64_t valuelog_id=0;
InternalKey smallest; // Smallest internal key served by table InternalKey smallest; // Smallest internal key served by table
InternalKey largest; // Largest internal key served by table InternalKey largest; // Largest internal key served by table
@ -63,13 +62,12 @@ class VersionEdit {
// REQUIRES: This version has not been saved (see VersionSet::SaveTo) // REQUIRES: This version has not been saved (see VersionSet::SaveTo)
// REQUIRES: "smallest" and "largest" are smallest and largest keys in file // REQUIRES: "smallest" and "largest" are smallest and largest keys in file
void AddFile(int level, uint64_t file, uint64_t file_size, void AddFile(int level, uint64_t file, uint64_t file_size,
const InternalKey& smallest, const InternalKey& largest,uint64_t valuelog_id=0) {
const InternalKey& smallest, const InternalKey& largest) {
FileMetaData f; FileMetaData f;
f.number = file; f.number = file;
f.file_size = file_size; f.file_size = file_size;
f.smallest = smallest; f.smallest = smallest;
f.largest = largest; f.largest = largest;
f.valuelog_id=valuelog_id;
new_files_.push_back(std::make_pair(level, f)); new_files_.push_back(std::make_pair(level, f));
} }

+ 7
- 5
db/version_set.cc View File

@ -978,13 +978,15 @@ Status VersionSet::Recover(bool* save_manifest) {
MarkFileNumberUsed(log_number); MarkFileNumberUsed(log_number);
} }
assert(s.ok());
//assert(s.ok());
std::vector<std::string> filenames; std::vector<std::string> filenames;
env_->GetChildren(dbname_, &filenames); env_->GetChildren(dbname_, &filenames);
for (const auto& filename:filenames) { for (const auto& filename:filenames) {
if (IsValueLogFile(filename)){
uint64_t valuelog_number = GetValueLogID(filename);
//std::cout<<valuelog_number<<std::endl;
uint64_t valuelog_number;
FileType type;
ParseFileName(filename,&valuelog_number,&type);
if (type==FileType::kValueLogFile){
MarkFileNumberUsed(valuelog_number); MarkFileNumberUsed(valuelog_number);
} }
} }
@ -1113,7 +1115,7 @@ Status VersionSet::WriteSnapshot(log::Writer* log) {
const std::vector<FileMetaData*>& files = current_->files_[level]; const std::vector<FileMetaData*>& files = current_->files_[level];
for (size_t i = 0; i < files.size(); i++) { for (size_t i = 0; i < files.size(); i++) {
const FileMetaData* f = files[i]; const FileMetaData* f = files[i];
edit.AddFile(level, f->number, f->file_size, f->smallest, f->largest,f->valuelog_id);
edit.AddFile(level, f->number, f->file_size, f->smallest, f->largest);
} }
} }

+ 6
- 4
db/write_batch.cc View File

@ -131,15 +131,17 @@ class ValueLogInserter : public WriteBatch::Handler {
public: public:
WriteBatch writeBatch_; WriteBatch writeBatch_;
DB* db_; DB* db_;
int use_valuelog_len_;
std::vector<std::pair<Slice,Slice>> kvs; std::vector<std::pair<Slice,Slice>> kvs;
ValueLogInserter(DB* db){
ValueLogInserter(DB* db,int use_valuelog_len){
db_=db; db_=db;
use_valuelog_len_=use_valuelog_len;
} }
void Put(const Slice& key, const Slice& value) override { void Put(const Slice& key, const Slice& value) override {
Slice new_value; Slice new_value;
std::string buf; std::string buf;
if(value.size()<100){
if(value.size()<use_valuelog_len_||use_valuelog_len_==-1){
buf+=(char)(0x00);// should set in key buf+=(char)(0x00);// should set in key
buf.append(value.data(),value.size()); buf.append(value.data(),value.size());
writeBatch_.Put(key,Slice(buf)); writeBatch_.Put(key,Slice(buf));
@ -229,8 +231,8 @@ Status WriteBatchInternal::checkValueLog(WriteBatch* b,DB* db_,Slice* lock_key,p
return Status::OK(); return Status::OK();
} }
Status WriteBatchInternal::ConverToValueLog(WriteBatch* b,DB* db_){
ValueLogInserter inserter(db_);
Status WriteBatchInternal::ConverToValueLog(WriteBatch* b,DB* db_,int use_valuelog_length){
ValueLogInserter inserter(db_,use_valuelog_length);
auto res=b->Iterate(&inserter); auto res=b->Iterate(&inserter);
inserter.batch_insert(); inserter.batch_insert();
*b=inserter.writeBatch_; *b=inserter.writeBatch_;

+ 1
- 1
db/write_batch_internal.h View File

@ -39,7 +39,7 @@ class WriteBatchInternal {
static Status checkValueLog(WriteBatch* batch,DB* db_,Slice* lock_key,port::CondVar* cond_var_); static Status checkValueLog(WriteBatch* batch,DB* db_,Slice* lock_key,port::CondVar* cond_var_);
static Status ConverToValueLog(WriteBatch* batch,DB* db_);
static Status ConverToValueLog(WriteBatch* batch,DB* db_,int use_valuelog_length);
static void Append(WriteBatch* dst, const WriteBatch* src); static void Append(WriteBatch* dst, const WriteBatch* src);
}; };

+ 8
- 1
include/leveldb/db.h View File

@ -124,6 +124,11 @@ class LEVELDB_EXPORT DB {
return Status::Corruption("not imp"); return Status::Corruption("not imp");
} }
virtual Status parseTrueValue(Slice* value,std::string* true_value){
assert(0);
return Status::Corruption("not imp");
}
// Return a heap-allocated iterator over the contents of the database. // Return a heap-allocated iterator over the contents of the database.
// The result of NewIterator() is initially invalid (caller must // The result of NewIterator() is initially invalid (caller must
@ -133,7 +138,7 @@ class LEVELDB_EXPORT DB {
// The returned iterator should be deleted before this db is deleted. // The returned iterator should be deleted before this db is deleted.
virtual Iterator* NewIterator(const ReadOptions& options) = 0; virtual Iterator* NewIterator(const ReadOptions& options) = 0;
virtual Iterator* NewUnorderedIterator(const ReadOptions&){
virtual Iterator* NewUnorderedIterator(const ReadOptions&,const Slice &lower_key,const Slice &upper_key){
assert(0); assert(0);
return nullptr; return nullptr;
}; };
@ -190,6 +195,8 @@ class LEVELDB_EXPORT DB {
virtual void CompactRange(const Slice* begin, const Slice* end) = 0; virtual void CompactRange(const Slice* begin, const Slice* end) = 0;
virtual void TEST_GarbageCollect(){}; virtual void TEST_GarbageCollect(){};
}; };
// Destroy the contents of the specified database. // Destroy the contents of the specified database.

+ 20
- 0
include/leveldb/fields.h View File

@ -0,0 +1,20 @@
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#ifndef STORAGE_LEVELDB_INCLUDE_FIELDS_H_
#define STORAGE_LEVELDB_INCLUDE_FIELDS_H_
#include <string>
#include "leveldb/db.h"
namespace leveldb {
std::string SerializeValue(const FieldArray& fields);
void DeserializeValue(const std::string& value_str,FieldArray* res);
Status Get_keys_by_field(DB *db,const ReadOptions& options, const Field field,std::vector<std::string> *keys);
}
#endif // STORAGE_LEVELDB_INCLUDE_FIELDS_H_

+ 16
- 0
include/leveldb/options.h View File

@ -145,6 +145,17 @@ struct LEVELDB_EXPORT Options {
// Many applications will benefit from passing the result of // Many applications will benefit from passing the result of
// NewBloomFilterPolicy() here. // NewBloomFilterPolicy() here.
const FilterPolicy* filter_policy = nullptr; const FilterPolicy* filter_policy = nullptr;
//when a value's length>=this value, leveldb will use valuelog to improve performance
int use_valuelog_length=1000;//if -1,then don't use valuelog. If env=memEnv, then it must be set to -1.
// maximum size of value_log file
int value_log_size=1<<26;
//cache for valuelog(may use lot of memory)
int mem_value_log_number=0;//0=don't use valuelog cache
//memory usage limit for a single unordered iterator
float GC_THRESHOLD=0.6;
}; };
// Options that control read operations // Options that control read operations
@ -157,6 +168,9 @@ struct LEVELDB_EXPORT ReadOptions {
// Callers may wish to set this field to false for bulk scans. // Callers may wish to set this field to false for bulk scans.
bool fill_cache = true; bool fill_cache = true;
//if true then return the origin value for data:use one byte to show whether the data belong to valuelog
//if first byte is 0x00, then the rest of data is true value
//else if first byte is 0x01, then the rest of data should use ParseFakeValueForValuelog() to parse its valuelog's id and its offset in valuelog
bool find_value_log_for_gc = false; bool find_value_log_for_gc = false;
// If "snapshot" is non-null, read as of the supplied snapshot // If "snapshot" is non-null, read as of the supplied snapshot
@ -164,6 +178,8 @@ struct LEVELDB_EXPORT ReadOptions {
// not have been released). If "snapshot" is null, use an implicit // not have been released). If "snapshot" is null, use an implicit
// snapshot of the state at the beginning of this read operation. // snapshot of the state at the beginning of this read operation.
const Snapshot* snapshot = nullptr; const Snapshot* snapshot = nullptr;
int max_unorder_iter_memory_usage=64<<20; //64MB
}; };
// Options that control write operations // Options that control write operations

+ 185
- 49
test/test.cpp View File

@ -1,77 +1,202 @@
#include "gtest/gtest.h" #include "gtest/gtest.h"
#include "leveldb/env.h" #include "leveldb/env.h"
#include "leveldb/db.h" #include "leveldb/db.h"
#include "util/coding.h"
#include "leveldb/fields.h"
#include <iostream> #include <iostream>
using namespace leveldb; using namespace leveldb;
using Field=std::pair<Slice,Slice>; using Field=std::pair<Slice,Slice>;
using FieldArray=std::vector<std::pair<Slice, Slice>>; using FieldArray=std::vector<std::pair<Slice, Slice>>;
// Open (and by default recreate) a test database.
//
// @param dbName          directory name of the database
// @param db              output: opened DB handle on success
// @param options         DB options; callers pass tuned valuelog settings here
// @param destroy_old_db  when true, wipe any existing DB first so each test
//                        starts from a clean state
// @return the Status of DB::Open
Status OpenDB(std::string dbName, DB **db, Options options = Options(),
              bool destroy_old_db = true) {
  if (destroy_old_db) {
    DestroyDB(dbName, options);
  }
  options.create_if_missing = true;
  return DB::Open(options, dbName, db);
}
TEST(Test, checkIterator) {
// Build a zero-padded decimal key for `num`, padded to the digit width of
// `len` so that lexicographic key order matches numeric order.
std::string GenKeyByNum(int num, int len) {
  const std::string width_ref = std::to_string(len);
  std::string key = std::to_string(num);
  if (key.size() < width_ref.size()) {
    key.insert(key.begin(), width_ref.size() - key.size(), '0');
  }
  return key;
}
// Build a deterministic value for `num` by repeating its decimal form `len`
// times, so the value length is digits(num) * len. The tests rely on this:
// e.g. GenValueByNum(n, 1000) with n >= 1000 yields 4000 chars, crossing a
// use_valuelog_length of 4000.
//
// Bug fix: the original loop appended std::to_string(i) and ignored `num`
// entirely, so every call with the same `len` produced the identical string
// and the per-key value assertions in the tests could not detect mixups.
std::string GenValueByNum(int num, int len) {
  std::string value;
  const std::string digits = std::to_string(num);
  value.reserve(digits.size() * static_cast<size_t>(len > 0 ? len : 0));
  for (int i = 0; i < len; i++) {
    value += digits;
  }
  return value;
}
bool CompareFieldArray(const FieldArray &a, const FieldArray &b) {
if (a.size() != b.size()) return false;
for (size_t i = 0; i < a.size(); ++i) {
if (a[i].first != b[i].first || a[i].second != b[i].second) return false;
}
return true;
}
// Element-wise equality of two key lists.
//
// Fixes: both vectors are now taken by const reference (the originals were
// copied on every call), and mismatches are reported through the return
// value instead of assert(0) — aborting inside a comparison helper made the
// bool return useless and hid the failure from the caller's ASSERT_TRUE.
bool CompareKey(const std::vector<std::string> &a,
                const std::vector<std::string> &b) {
  if (a.size() != b.size()) {
    return false;
  }
  for (size_t i = 0; i < a.size(); ++i) {
    if (a[i] != b[i]) {
      return false;
    }
  }
  return true;
}
// End-to-end iterator test with every value stored through the valuelog
// path: use_valuelog_length=100 is far below the generated value sizes.
TEST(Test, valuelog_iterator_test) {
  DB *db;
  WriteOptions writeOptions;
  ReadOptions readOptions;
  Options dboptions;
  dboptions.use_valuelog_length = 100;
  int RANGE = 5000;
  // Bug fix: dboptions was constructed but never passed to OpenDB, so the
  // test silently ran with the default valuelog threshold.
  if (OpenDB("valuelog_iterator_test", &db, dboptions).ok() == false) {
    std::cerr << "open db failed" << std::endl;
    abort();
  }
  std::vector<std::string> values;
  for (int i = 0; i < RANGE; i++) {
    std::string key = GenKeyByNum(i, RANGE);
    std::string value = GenValueByNum(i, RANGE);
    values.push_back(value);
    Status s = db->Put(writeOptions, key, value);
    assert(s.ok());
  }
  // Forward scan: values must come back in key order.
  auto iter = db->NewIterator(readOptions);
  iter->SeekToFirst();
  for (int i = 0; i < RANGE; i++) {
    assert(iter->Valid());
    auto value = iter->value();
    assert(values[i] == value);
    iter->Next();
  }
  ASSERT_FALSE(iter->Valid());
  // Backward scan.
  iter->SeekToLast();
  for (int i = RANGE - 1; i >= 0; i--) {
    assert(iter->Valid());
    auto value = iter->value();
    assert(values[i] == value);
    iter->Prev();
  }
  ASSERT_FALSE(iter->Valid());
  // Seek into the middle, then scan forward to the end.
  iter->Seek(GenKeyByNum(RANGE / 2, RANGE));
  for (int i = RANGE / 2; i < RANGE; i++) {
    assert(iter->Valid());
    auto value = iter->value();
    assert(values[i] == value);
    iter->Next();
  }
  ASSERT_FALSE(iter->Valid());
  delete iter;
  delete db;
}
// Iterator test with a mix of inline and valuelog values: with
// use_valuelog_length=4000, GenValueByNum(num,1000) lands in the valuelog
// only when num has 4 digits (num >= 1000 -> length 4000).
TEST(Test, mix_valuelog_iterator_test) {
  DB *db;
  WriteOptions writeOptions;
  ReadOptions readOptions;
  Options dboptions;
  dboptions.use_valuelog_length = 4000;
  int RANGE = 5000;
  // Bug fix: dboptions was constructed but never passed to OpenDB, so the
  // intended inline/valuelog mix was never exercised.
  if (OpenDB("mix_valuelog_iterator_test", &db, dboptions).ok() == false) {
    std::cerr << "open db failed" << std::endl;
    abort();
  }
  std::vector<std::string> values;
  for (int i = 0; i < RANGE; i++) {
    std::string key = GenKeyByNum(i, RANGE);
    // num >= 1000 gives a 4*1000-char value -> stored in the valuelog.
    std::string value = GenValueByNum(rand() % 2000, 1000);
    values.push_back(value);
    Status s = db->Put(writeOptions, key, value);
    assert(s.ok());
  }
  auto iter = db->NewIterator(readOptions);
  iter->SeekToFirst();
  for (int i = 0; i < RANGE; i++) {
    assert(iter->Valid());
    auto value = iter->value();
    assert(values[i] == value);
    iter->Next();
  }
  ASSERT_FALSE(iter->Valid());
  delete iter;
  delete db;
}
// Unordered-iterator test: the iterator may yield keys in any order, so
// results are collected, sorted, and compared against the expected values.
TEST(Test, unorder_valuelog_iterator_test) {
  DB *db;
  WriteOptions writeOptions;
  ReadOptions readOptions;
  Options dboptions;
  dboptions.use_valuelog_length = 4000;
  int RANGE = 5000;
  // Bug fixes: dboptions was never passed to OpenDB, and the DB path was
  // copy-pasted from valuelog_iterator_test; use this test's own directory.
  if (OpenDB("unorder_valuelog_iterator_test", &db, dboptions).ok() == false) {
    std::cerr << "open db failed" << std::endl;
    abort();
  }
  std::vector<std::string> values;
  std::vector<std::pair<std::string, std::string>> new_values;
  for (int i = 0; i < RANGE; i++) {
    std::string key = GenKeyByNum(i, RANGE);
    // num >= 1000 gives a 4*1000-char value -> stored in the valuelog.
    std::string value = GenValueByNum(rand() % 2000, 1000);
    values.push_back(value);
    Status s = db->Put(writeOptions, key, value);
    assert(s.ok());
  }
  // Full-range unordered scan: collect, sort by key, compare.
  auto iter = db->NewUnorderedIterator(readOptions, Slice(), Slice());
  for (int i = 0; i < RANGE; i++) {
    assert(iter->Valid());
    new_values.push_back({std::string(iter->key().data(), iter->key().size()),
                          std::string(iter->value().data(), iter->value().size())});
    iter->Next();
  }
  std::sort(new_values.begin(), new_values.end());
  for (int i = 0; i < RANGE; i++) {
    ASSERT_TRUE(values[i] == new_values[i].second);
  }
  ASSERT_FALSE(iter->Valid());
  delete iter;
  // Bounded unordered scan over [RANGE/4, RANGE/2).
  iter = db->NewUnorderedIterator(readOptions, GenKeyByNum(RANGE / 4, RANGE),
                                  GenKeyByNum(RANGE / 2, RANGE));
  new_values.clear();
  for (int i = RANGE / 4; i < RANGE / 2; i++) {
    assert(iter->Valid());
    new_values.push_back({std::string(iter->key().data(), iter->key().size()),
                          std::string(iter->value().data(), iter->value().size())});
    iter->Next();
  }
  std::sort(new_values.begin(), new_values.end());
  for (int i = RANGE / 4; i < RANGE / 2; i++) {
    ASSERT_TRUE(values[i] == new_values[i - RANGE / 4].second);
  }
  ASSERT_FALSE(iter->Valid());
  delete iter;
  delete db;
}
TEST(Test, fields_simple_test) {
DB *db;
WriteOptions writeOptions;
ReadOptions readOptions;
Options dbOptions;
dbOptions.use_valuelog_length=-1;
if(OpenDB("fields_simple_test", &db).ok() == false) {
std::cerr << "open db failed" << std::endl; std::cerr << "open db failed" << std::endl;
abort(); abort();
} }
@ -92,22 +217,17 @@ TEST(Test, CheckGetFields) {
db->Get(ReadOptions(), key1, &value_ret); db->Get(ReadOptions(), key1, &value_ret);
DeserializeValue(value_ret, &res1); DeserializeValue(value_ret, &res1);
for(auto pr:res1){
std::cout<<std::string(pr.first.data(),pr.first.size())<<" "<<std::string(pr.second.data(),pr.second.size())<<"\n";
}
ASSERT_TRUE(CompareFieldArray(fields1, res1)); ASSERT_TRUE(CompareFieldArray(fields1, res1));
db->Delete(WriteOptions(),key1); db->Delete(WriteOptions(),key1);
std::cout<<"get serialized value done"<<std::endl;
delete db; delete db;
} }
TEST(Test, CheckSearchKey) {
TEST(Test, get_keys_by_field_test) {
DB *db; DB *db;
ReadOptions readOptions; ReadOptions readOptions;
if(OpenDB("testdb_for_XOY_large", &db).ok() == false) {
if(OpenDB("get_keys_by_field_test", &db).ok() == false) {
std::cerr << "open db failed" << std::endl; std::cerr << "open db failed" << std::endl;
abort(); abort();
} }
@ -133,21 +253,20 @@ TEST(Test, CheckSearchKey) {
std::sort(key_res.begin(),key_res.end()); std::sort(key_res.begin(),key_res.end());
std::sort(target_keys.begin(),target_keys.end()); std::sort(target_keys.begin(),target_keys.end());
ASSERT_TRUE(CompareKey(key_res, target_keys)); ASSERT_TRUE(CompareKey(key_res, target_keys));
std::cout<<"get key by field done"<<std::endl;
for(auto s:keys){
db->Delete(WriteOptions(),s);
}
delete db; delete db;
} }
TEST(Test, LARGE_DATA_COMPACT_TEST) {
TEST(Test, valuelog_common_test) {
DB *db; DB *db;
WriteOptions writeOptions; WriteOptions writeOptions;
ReadOptions readOptions; ReadOptions readOptions;
if(OpenDB("testdb_for_XOY_large", &db).ok() == false) {
Options dbOptions;
dbOptions.use_valuelog_length=100;
if(OpenDB("valuelog_common_test", &db).ok() == false) {
std::cerr << "open db failed" << std::endl; std::cerr << "open db failed" << std::endl;
abort(); abort();
} }
//test Put
std::vector<std::string> values; std::vector<std::string> values;
for(int i=0;i<50000;i++){ for(int i=0;i<50000;i++){
std::string key=std::to_string(i); std::string key=std::to_string(i);
@ -158,17 +277,41 @@ TEST(Test, LARGE_DATA_COMPACT_TEST) {
values.push_back(value); values.push_back(value);
db->Put(writeOptions,key,value); db->Put(writeOptions,key,value);
} }
for(int i=0;i<50000;i++){
for(int i=0;i<50000;i++){
std::string key=std::to_string(i); std::string key=std::to_string(i);
std::string value; std::string value;
Status s=db->Get(readOptions,key,&value); Status s=db->Get(readOptions,key,&value);
assert(s.ok()); assert(s.ok());
if(values[i]!=value){
std::cout<<value.size()<<std::endl;
assert(0);
ASSERT_TRUE(values[i]==value);
}
//test cover put
for(int i=0;i<50000;i++){
std::string key=std::to_string(i);
std::string value;
for(int j=0;j<3000;j++){
value+=std::to_string(i);
} }
values[i]=value;
db->Put(writeOptions,key,value);
}
for(int i=0;i<50000;i++){
std::string key=std::to_string(i);
std::string value;
Status s=db->Get(readOptions,key,&value);
assert(s.ok());
ASSERT_TRUE(values[i]==value); ASSERT_TRUE(values[i]==value);
} }
//test delete
for(int i=0;i<50000;i++){
std::string key=std::to_string(i);
db->Delete(writeOptions,key);
}
for(int i=0;i<50000;i++){
std::string key=std::to_string(i);
std::string value;
Status s=db->Get(readOptions,key,&value);
ASSERT_TRUE(s.IsNotFound());
}
delete db; delete db;
} }
@ -190,20 +333,13 @@ TEST(Test, Garbage_Collect_TEST) {
values.push_back(value); values.push_back(value);
db->Put(writeOptions,key,value); db->Put(writeOptions,key,value);
} }
std::cout<<"start gc"<<std::endl;
db->TEST_GarbageCollect(); db->TEST_GarbageCollect();
std::cout<<"finish gc"<<std::endl;
for(int i=0;i<50000;i++){ for(int i=0;i<50000;i++){
// std::cout<<i<<std::endl;
std::string key=std::to_string(i); std::string key=std::to_string(i);
std::string value; std::string value;
Status s=db->Get(readOptions,key,&value); Status s=db->Get(readOptions,key,&value);
assert(s.ok()); assert(s.ok());
if(values[i]!=value){
std::cout<<value.size()<<std::endl;
assert(0);
}
ASSERT_TRUE(values[i]==value); ASSERT_TRUE(values[i]==value);
} }
delete db; delete db;

+ 4
- 129
util/coding.cc View File

@ -158,21 +158,6 @@ bool GetLengthPrefixedSlice(Slice* input, Slice* result) {
return false; return false;
} }
} }
/**
* @brief valuelog
*
* @param filename
* @return true valuelog
* @return false valuelog
*/
bool IsValueLogFile(const std::string& filename) {
// 检查文件是否以 ".valuelog" 结尾
const std::string suffix = ".valuelog";
return filename.size() > suffix.size() &&
filename.substr(filename.size() - suffix.size()) == suffix;
}
/** /**
* @brief valuelog_id offset * @brief valuelog_id offset
* *
@ -180,51 +165,13 @@ bool IsValueLogFile(const std::string& filename) {
* @param valuelog_id ValueLog ID * @param valuelog_id ValueLog ID
* @param offset * @param offset
*/ */
void ParseStoredValue(const std::string& stored_value, uint64_t& valuelog_id,
Status ParseFakeValueForValuelog(Slice stored_value, uint64_t& valuelog_id,
uint64_t& offset) { uint64_t& offset) {
// 假设 stored_value 格式为:valuelog_id|offset // 假设 stored_value 格式为:valuelog_id|offset
Slice tmp(stored_value.data(), stored_value.size()); Slice tmp(stored_value.data(), stored_value.size());
GetVarint64(&tmp, &valuelog_id);
GetVarint64(&tmp, &offset);
}
/**
* @brief ValueLog ID
*
* @param valuelog_name "123.valuelog"
* @return uint64_t ValueLog ID
*/
uint64_t GetValueLogID(const std::string& valuelog_name) {
// 获取文件名部分(假设文件名格式为 "number.extension")
size_t pos = valuelog_name.find_last_of('/');
std::string filename;
if (pos != std::string::npos) {
filename = valuelog_name.substr(pos + 1);
} else {
filename = valuelog_name;
}
// 查找文件名中的 '.' 位置,提取数字部分
pos = filename.find('.');
assert(pos != std::string::npos);
// 提取数字部分
std::string id_str = filename.substr(0, pos);
// 检查文件扩展名是否为 .valuelog
if (filename.substr(pos + 1) != "valuelog") {
assert(0);
}
// 转换为 uint64_t
uint64_t id;
std::istringstream iss(id_str);
if (!(iss >> id)) {
assert(0);
}
return id;
if(!GetVarint64(&tmp, &valuelog_id))return Status::NotSupported("can't decode a valuelog value from its meta info");
if(!GetVarint64(&tmp, &offset))return Status::NotSupported("can't decode a valuelog value from its meta info");
return Status::OK();
} }
// Helper function to split the set of files into chunks // Helper function to split the set of files into chunks
@ -239,76 +186,4 @@ void SplitIntoChunks(const std::set& files, int num_workers,
} }
bool CompareFieldArray(const FieldArray &a, const FieldArray &b) {
if (a.size() != b.size()) return false;
for (size_t i = 0; i < a.size(); ++i) {
if (a[i].first != b[i].first || a[i].second != b[i].second) return false;
}
return true;
}
bool CompareKey(const std::vector<std::string> a, std::vector<std::string> b) {
if (a.size() != b.size()){
assert(0);
return false;
}
for (size_t i = 0; i < a.size(); ++i) {
if (a[i] != b[i]){
assert(0);
return false;
}
}
return true;
}
std::string SerializeValue(const FieldArray& fields){
std::string res_="";
PutVarint64(&res_,(uint64_t)fields.size());
for(auto pr:fields){
PutLengthPrefixedSlice(&res_, pr.first);
PutLengthPrefixedSlice(&res_, pr.second);
}
return res_;
}
void DeserializeValue(const std::string& value_str,FieldArray* res){
Slice slice=Slice(value_str.c_str());
uint64_t siz;
bool tmpres=GetVarint64(&slice,&siz);
assert(tmpres);
res->clear();
for(int i=0;i<siz;i++){
Slice value_name;
Slice value;
tmpres=GetLengthPrefixedSlice(&slice,&value_name);
assert(tmpres);
tmpres=GetLengthPrefixedSlice(&slice,&value);
assert(tmpres);
res->emplace_back(value_name,value);
}
}
Status Get_keys_by_field(DB *db,const ReadOptions& options, const Field field,std::vector<std::string> *keys){
auto it=db->NewUnorderedIterator(options);
it->SeekToFirst();
keys->clear();
while(it->Valid()){
auto val=it->value();
FieldArray arr;
auto str_val=std::string(val.data(),val.size());
DeserializeValue(str_val,&arr);
for(auto pr:arr){
if(pr.first==field.first&&pr.second==field.second){
Slice key=it->key();
keys->push_back(std::string(key.data(),key.size()));
break;
}
}
it->Next();
}
delete it;
return Status::OK();
}
} // namespace leveldb } // namespace leveldb

+ 2
- 9
util/coding.h View File

@ -121,19 +121,12 @@ inline const char* GetVarint32Ptr(const char* p, const char* limit,
return GetVarint32PtrFallback(p, limit, value); return GetVarint32PtrFallback(p, limit, value);
} }
bool IsValueLogFile(const std::string& filename);
void ParseStoredValue(const std::string& stored_value, uint64_t& valuelog_id,
Status ParseFakeValueForValuelog(Slice stored_value, uint64_t& valuelog_id,
uint64_t& offset); uint64_t& offset);
uint64_t GetValueLogID(const std::string& valuelog_name);
void SplitIntoChunks(const std::set<std::string>& files, int num_workers, void SplitIntoChunks(const std::set<std::string>& files, int num_workers,
std::vector<std::vector<std::string>>* chunks); std::vector<std::vector<std::string>>* chunks);
bool CompareFieldArray(const FieldArray &a, const FieldArray &b);
bool CompareKey(const std::vector<std::string> a, std::vector<std::string> b);
std::string SerializeValue(const FieldArray& fields);
void DeserializeValue(const std::string& value_str,FieldArray* res);
Status Get_keys_by_field(DB *db,const ReadOptions& options, const Field field,std::vector<std::string> *keys);
} // namespace leveldb } // namespace leveldb

Loading…
Cancel
Save