From 58222843ff217f8890943f292b9815655724fca2 Mon Sep 17 00:00:00 2001 From: alexfisher <1823748191@qq.com> Date: Fri, 3 Jan 2025 00:31:03 +0800 Subject: [PATCH] huge update:bugs fixed, tests added, new function(unorder iter .etc) added --- .gitmodules | 2 +- CMakeLists.txt | 7 +- YCSB-cpp | 2 +- benchmarks/db_bench.cc | 4 +- db/autocompact_test.cc | 3 +- db/builder.cc | 10 -- db/corruption_test.cc | 1 + db/db_impl.cc | 359 +++++++++++++++++++++++++--------------------- db/db_impl.h | 16 ++- db/db_test.cc | 10 +- db/dbformat.h | 6 +- db/fields.cc | 63 ++++++++ db/filename.cc | 5 +- db/filename.h | 1 + db/prefetch_iter.cc | 302 -------------------------------------- db/prefetch_iter.h | 22 --- db/recovery_test.cc | 10 +- db/repair.cc | 2 +- db/true_iter.cc | 104 ++++++++++++++ db/true_iter.h | 21 +++ db/unordered_iter.cc | 38 +++-- db/unordered_iter.h | 2 +- db/version_edit.cc | 4 - db/version_edit.h | 4 +- db/version_set.cc | 12 +- db/write_batch.cc | 10 +- db/write_batch_internal.h | 2 +- include/leveldb/db.h | 9 +- include/leveldb/fields.h | 20 +++ include/leveldb/options.h | 16 +++ test/test.cpp | 234 +++++++++++++++++++++++------- util/coding.cc | 133 +---------------- util/coding.h | 11 +- 33 files changed, 703 insertions(+), 742 deletions(-) create mode 100644 db/fields.cc delete mode 100644 db/prefetch_iter.cc delete mode 100644 db/prefetch_iter.h create mode 100644 db/true_iter.cc create mode 100644 db/true_iter.h create mode 100644 include/leveldb/fields.h diff --git a/.gitmodules b/.gitmodules index 21367a0..98cff6f 100644 --- a/.gitmodules +++ b/.gitmodules @@ -6,4 +6,4 @@ url = https://github.com/google/benchmark [submodule "YCSB-cpp"] path = YCSB-cpp - url = https://github.com/ls4154/YCSB-cpp.git + url = https://github.com/zerowinter0/my_YCSB_benchmark.git diff --git a/CMakeLists.txt b/CMakeLists.txt index d2898b1..568040d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -129,8 +129,9 @@ target_sources(leveldb "db/db_impl.h" "db/db_iter.cc" 
"db/db_iter.h" - "db/prefetch_iter.cc" - "db/prefetch_iter.h" + "db/true_iter.cc" + "db/true_iter.h" + "db/fields.cc" "db/unordered_iter.cc" "db/unordered_iter.h" "db/dbformat.cc" @@ -213,6 +214,7 @@ target_sources(leveldb "${LEVELDB_PUBLIC_INCLUDE_DIR}/table_builder.h" "${LEVELDB_PUBLIC_INCLUDE_DIR}/table.h" "${LEVELDB_PUBLIC_INCLUDE_DIR}/write_batch.h" + "${LEVELDB_PUBLIC_INCLUDE_DIR}/fields.h" ) if (WIN32) @@ -497,6 +499,7 @@ if(LEVELDB_INSTALL) "${LEVELDB_PUBLIC_INCLUDE_DIR}/table_builder.h" "${LEVELDB_PUBLIC_INCLUDE_DIR}/table.h" "${LEVELDB_PUBLIC_INCLUDE_DIR}/write_batch.h" + "${LEVELDB_PUBLIC_INCLUDE_DIR}/fields.h" DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/leveldb" ) diff --git a/YCSB-cpp b/YCSB-cpp index b8e5cc1..7df09a1 160000 --- a/YCSB-cpp +++ b/YCSB-cpp @@ -1 +1 @@ -Subproject commit b8e5cc1446a3df02f09d045b045434bdebf27405 +Subproject commit 7df09a150d3ab16b303c25007eb0a27c8eed8049 diff --git a/benchmarks/db_bench.cc b/benchmarks/db_bench.cc index 0a54d36..fb7eb66 100644 --- a/benchmarks/db_bench.cc +++ b/benchmarks/db_bench.cc @@ -883,10 +883,10 @@ class Benchmark { } void ReadUnorderSequential(ThreadState* thread) { - Iterator* iter = db_->NewUnorderedIterator(ReadOptions()); + Iterator* iter = db_->NewUnorderedIterator(ReadOptions(),Slice(),Slice()); int i = 0; int64_t bytes = 0; - for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + for (; iter->Valid(); iter->Next()) { bytes += iter->key().size() + iter->value().size(); thread->stats.FinishedSingleOp(); ++i; diff --git a/db/autocompact_test.cc b/db/autocompact_test.cc index 69341e3..0980104 100644 --- a/db/autocompact_test.cc +++ b/db/autocompact_test.cc @@ -19,6 +19,7 @@ class AutoCompactTest : public testing::Test { DestroyDB(dbname_, options_); options_.create_if_missing = true; options_.compression = kNoCompression; + options_.use_valuelog_length=-1; EXPECT_LEVELDB_OK(DB::Open(options_, dbname_, &db_)); } @@ -50,7 +51,7 @@ class AutoCompactTest : public testing::Test { DB* db_; }; -static 
const int kValueSize = 200 * 1024; +static const int kValueSize = 200*1024; static const int kTotalSize = 100 * 1024 * 1024; static const int kCount = kTotalSize / kValueSize; diff --git a/db/builder.cc b/db/builder.cc index 780e6f9..e6329e0 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -28,16 +28,6 @@ Status BuildTable(const std::string& dbname, Env* env, const Options& options, return s; } - // 如果第一个字节是 0x01,它会移除这个前缀,并尝试从剩下的数据中解析出 value - { - auto tmp_value=iter->value(); - if(tmp_value.data()[0]==(char)(0x01)){ - tmp_value.remove_prefix(1); - assert(GetVarint64(&tmp_value,&meta->valuelog_id)); - } - else meta->valuelog_id=0; - } - TableBuilder* builder = new TableBuilder(options, file); meta->smallest.DecodeFrom(iter->key()); Slice key; diff --git a/db/corruption_test.cc b/db/corruption_test.cc index dc7da76..903e7ed 100644 --- a/db/corruption_test.cc +++ b/db/corruption_test.cc @@ -28,6 +28,7 @@ class CorruptionTest : public testing::Test { tiny_cache_(NewLRUCache(100)) { options_.env = &env_; options_.block_cache = tiny_cache_; + options_.use_valuelog_length=-1; DestroyDB(dbname_, options_); options_.create_if_missing = true; diff --git a/db/db_impl.cc b/db/db_impl.cc index c26edbe..5624872 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -6,7 +6,7 @@ #include "db/builder.h" #include "db/db_iter.h" -#include "db/prefetch_iter.h" +#include "db/true_iter.h" #include "db/unordered_iter.h" #include "db/dbformat.h" #include "db/filename.h" @@ -155,9 +155,12 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname) background_compaction_scheduled_(false), background_garbage_collect_scheduled_(false), manual_compaction_(nullptr), - valuelog_cache(NewLRUCache(config::mem_value_log_number)), + mem_value_log_number_(raw_options.mem_value_log_number), + valuelog_cache(NewLRUCache(raw_options.mem_value_log_number)), versions_(new VersionSet(dbname_, &options_, table_cache_, - &internal_comparator_)) { + &internal_comparator_)), + 
use_valuelog_length(raw_options.use_valuelog_length), + value_log_size_(raw_options.value_log_size){ } @@ -283,6 +286,7 @@ void DBImpl::RemoveObsoleteFiles() { // be recorded in pending_outputs_, which is inserted into "live" keep = (live.find(number) != live.end()); break; + case kValueLogFile: case kCurrentFile: case kDBLockFile: case kInfoLogFile: @@ -1052,8 +1056,10 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { // Value is >= 100 bytes, read from external file uint64_t file_id, valuelog_offset; value.remove_prefix(1); - bool res = GetVarint64(&value, &file_id); - if (!res) return Status::Corruption("can't decode file id"); + + status=ParseFakeValueForValuelog(value,file_id,valuelog_offset); + if(!status.ok())break; + valuelog_usage[file_id]--; } } @@ -1197,7 +1203,7 @@ Iterator* DBImpl::NewInternalIterator(const ReadOptions& options, Iterator* DBImpl::TEST_NewInternalIterator() { SequenceNumber ignored; uint32_t ignored_seed; - return NewIterator(ReadOptions()); + return NewInternalIterator(ReadOptions(), &ignored, &ignored_seed); } int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() { @@ -1253,24 +1259,10 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key, if (options.find_value_log_for_gc) { return s; } - - if (value->c_str()[0] == 0x00) { - *value = value->substr(1); - return s; - } - Slice value_log_slice = Slice(value->c_str() + 1, value->length()); - uint64_t file_id, valuelog_offset; - bool res = GetVarint64(&value_log_slice, &file_id); - if (!res) return Status::Corruption("can't decode file id"); - res = GetVarint64(&value_log_slice, &valuelog_offset); - if (!res) return Status::Corruption("can't decode valuelog offset"); - - { - mutex_.Unlock(); - s = ReadValueLog(file_id, valuelog_offset, value); - mutex_.Lock(); - } - + Slice value_log_slice = Slice(value->c_str(), value->length()); + mutex_.Unlock(); + s=parseTrueValue(&value_log_slice,value); + mutex_.Lock(); return s; } @@ -1293,9 +1285,9 @@ Iterator 
*DBImpl::NewOriginalIterator(const ReadOptions& options) { return db_iter; } -Iterator* DBImpl::NewUnorderedIterator(const ReadOptions& options) { +Iterator* DBImpl::NewUnorderedIterator(const ReadOptions& options,const Slice &lower_key,const Slice &upper_key) { auto iter=NewOriginalIterator(options); - return NewUnorderedIter(this,iter,dbname_); + return NewUnorderedIter(this,iter,dbname_,options.max_unorder_iter_memory_usage,lower_key,upper_key,user_comparator()); } Iterator* DBImpl::NewIterator(const ReadOptions& options) { @@ -1305,27 +1297,16 @@ Iterator* DBImpl::NewIterator(const ReadOptions& options) { mutex_.Lock(); - Iterator* iter_prefetch = NewInternalIterator(options, &latest_snapshot, &seed); - auto db_iter_prefetch=NewDBIterator(this, user_comparator(), iter_prefetch, - (options.snapshot != nullptr - ? static_cast(options.snapshot) - ->sequence_number() - : latest_snapshot), - seed); - - SequenceNumber useless_snapshot; - - Iterator* iter = NewInternalIterator(options, &useless_snapshot, &seed); + Iterator* iter = NewInternalIterator(options, &latest_snapshot, &seed); auto db_iter=NewDBIterator(this, user_comparator(), iter, (options.snapshot != nullptr ? static_cast(options.snapshot) ->sequence_number() : latest_snapshot), seed); - mutex_.Unlock(); - return NewPreFetchIterator(this,db_iter,db_iter_prefetch,iter_num); + return NewTrueIterator(this,db_iter); } void DBImpl::RecordReadSample(Slice key) { @@ -1379,7 +1360,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) { Writer* last_writer = &w; if (status.ok() && updates != nullptr) { // nullptr batch is for compactions WriteBatch* write_batch = BuildBatchGroup(&last_writer); - WriteBatchInternal::ConverToValueLog(write_batch, this);//need lock! to protect valuelog_number + WriteBatchInternal::ConverToValueLog(write_batch, this,use_valuelog_length);//need lock! 
to protect valuelog_number WriteBatchInternal::SetSequence(write_batch, last_sequence + 1); last_sequence += WriteBatchInternal::Count(write_batch); @@ -1644,43 +1625,29 @@ void DBImpl::GetApproximateSizes(const Range* range, int n, uint64_t* sizes) { std::vector> DBImpl::WriteValueLog( std::vector> kv) { + + if(valuelogfile_number_==0){ + addNewValueLog(); + } + std::string file_name_ = ValueLogFileName(dbname_, valuelogfile_number_); std::fstream valueFile(file_name_, std::ios::in | std::ios::out | std::ios::binary); - if (!valueFile.is_open()) { - assert(0); - } + assert(valueFile.is_open()); valueFile.seekg(0, std::ios::end); // 移动到文件末尾 uint64_t init_offset = valueFile.tellg(); // 如果超出fixed_size - if(init_offset>=config::value_log_size){ - uint64_t file_data_size = 0; // 文件数据大小标志位 - valueFile.seekg(0, std::ios::beg); - valueFile.read(reinterpret_cast(&file_data_size), sizeof(uint64_t)); - valuelog_usage[valuelogfile_number_]=file_data_size; - valuelog_origin[valuelogfile_number_]=file_data_size; + if(init_offset>=value_log_size_){ addNewValueLog(); valueFile.close(); file_name_ = ValueLogFileName(dbname_, valuelogfile_number_); valueFile =std::fstream(file_name_, std::ios::in | std::ios::out | std::ios::binary); valueFile.seekg(0, std::ios::end); // 移动到文件末尾 - init_offset = valueFile.tellg(); + init_offset = 0; + valuelog_usage[valuelogfile_number_]=0; + valuelog_origin[valuelogfile_number_]=0; } - uint64_t file_data_size = 0; // 文件数据大小标志位 - valueFile.seekg(0, std::ios::beg); - valueFile.read(reinterpret_cast(&file_data_size), sizeof(uint64_t)); - valueFile.clear(); // 清除错误状态 - - //update length first - file_data_size+=kv.size(); - valueFile.seekp(0, std::ios::beg); // 移动到文件开头 - valueFile.write(reinterpret_cast(&file_data_size), sizeof(uint64_t)); - assert(valueFile.good()); - - valueFile.seekp(0, std::ios::end); // 返回文件末尾准备写入 - // std::cout<<"file_data_size: "<> res; int total_size=0; @@ -1689,7 +1656,7 @@ std::vector> DBImpl::WriteValueLog( 
total_size+=pr.first.size()+pr.second.size(); } - char buf[total_size]; + char* buf= new char[total_size];//write all data with one fstream.write using this buf uint64_t offset=0; for (const auto& pr:kv) { @@ -1726,58 +1693,65 @@ std::vector> DBImpl::WriteValueLog( // 解锁资源或进行其他清理操作 //valueFile.flush(); // 确保所有缓冲区的数据都被写入文件 valueFile.close(); + valuelog_usage[valuelogfile_number_]+=res.size(); + valuelog_origin[valuelogfile_number_]+=res.size(); + delete[] buf; return res; } void DBImpl::addNewValueLog() { + mutex_.AssertHeld(); valuelogfile_number_ = versions_->NewFileNumber(); std::string file_name_ = ValueLogFileName(dbname_, valuelogfile_number_); std::fstream valueFile(file_name_, std::ios::app | std::ios::binary); - if (!valueFile.is_open()) { - assert(0); - } - uint64_t file_data_size = 0; // 新增的文件数据大小标志位 - if (valueFile.tellp() != 0) { - assert(0); - } - else{ - valueFile.write(reinterpret_cast(&file_data_size), sizeof(uint64_t)); - if (!valueFile.good()) { - valueFile.close(); - assert(0); - } - else{ - // 正常关闭文件 - valueFile.flush(); // 确保所有缓冲区的数据都被写入文件 - valueFile.close(); - } - } + assert(valueFile.is_open()); + valueFile.close(); } static void valuelog_cache_deleter(const leveldb::Slice &key, void *value){ delete (RandomAccessFile*)value; } +Status DBImpl::parseTrueValue(Slice* value,std::string *true_value){ + if(value->empty()){ + *true_value=""; + } + else if(value->data()[0]==0x00){ + value->remove_prefix(1); + std::string new_str=std::string(value->data(),value->size()); + *true_value=std::move(new_str); + } + else{ + uint64_t value_id,value_offset; + value->remove_prefix(1); + Status s=ParseFakeValueForValuelog(*value,value_id,value_offset); + if(!s.ok())return s; + return ReadValueLog(value_id,value_offset,true_value); + } + return Status::OK(); +} + Status DBImpl::ReadValueLog(uint64_t file_id, uint64_t offset, std::string* value) { std::string file_name_ = ValueLogFileName(dbname_, file_id); mutex_.Lock(); - 
if(file_id==valuelogfile_number_||config::mem_value_log_number==0){ + if(file_id==valuelogfile_number_||mem_value_log_number_==0){ mutex_.Unlock(); std::ifstream inFile(file_name_, std::ios::in | std::ios::binary); - uint64_t value_len; + uint64_t value_len=0; inFile.seekg(offset); inFile.read((char*)(&value_len),sizeof(uint64_t)); - char buf[value_len]; + char* buf=new char[value_len]; inFile.read(buf,value_len); inFile.close(); *value=std::string(buf,value_len); + delete[] buf; return Status::OK(); } @@ -1819,54 +1793,38 @@ void DBImpl::GarbageCollect() { Status s = env_->GetChildren(dbname_, &filenames); Log(options_.info_log, "start gc "); assert(s.ok()); - std::set valuelog_set; - for (const auto& filename:filenames) { - if (IsValueLogFile(filename)){ - uint64_t cur_log_number = GetValueLogID(filename); - if (cur_log_number == valuelogfile_number_) { - continue; - } - auto tmp_name = ValueLogFileName(dbname_, cur_log_number); - if (!versions_->checkOldValueLog(tmp_name) && - valuelog_origin[cur_log_number]) { - //std::cout<<((float)valuelog_usage[cur_log_number]) /(float)valuelog_origin[cur_log_number]< gc_valuelog_id_vector; + + mutex_.Lock();// for visit valuelog_origin/usage + for(const auto&pr:valuelog_origin){ + if( + ((float)valuelog_usage[pr.first])/pr.secondRemoveObsoleteFiles(); impl->MaybeScheduleCompaction(); impl->InitializeExistingLogs(); - impl->addNewValueLog(); + //impl->addNewValueLog(); } impl->mutex_.Unlock(); if (s.ok()) { @@ -2071,12 +2044,6 @@ Status DestroyDB(const std::string& dbname, const Options& options) { result = del; } } - else if(IsValueLogFile(filenames[i])){ - Status del = env->RemoveFile(dbname + "/" + filenames[i]); - if (result.ok() && !del.ok()) { - result = del; - } - } } env->UnlockFile(lock); // Ignore error since state is already gone env->RemoveFile(lockname); @@ -2085,57 +2052,115 @@ return result; } -// 读取所有现有日志文件的 file_data_size +// recover for 
valuelog void DBImpl::InitializeExistingLogs() { std::vector filenames; Status s = env_->GetChildren(dbname_, &filenames); - Log(options_.info_log, "start set file map "); + Log(options_.info_log, "start recover for valuelog"); assert(s.ok()); - std::set valuelog_set; + std::set all_valuelog_ids; + std::set live_valuelog_ids; + uint64_t latest_valuelog_id=0; + uint64_t latest_valuelog_offset=0; for (const auto& filename : filenames) { - if (IsValueLogFile(filename)) { - uint64_t cur_log_number = GetValueLogID(filename); - uint64_t file_data_size = ReadFileSize(cur_log_number); - valuelog_origin[cur_log_number]=file_data_size; - valuelog_usage[cur_log_number]=0; - } + uint64_t valuelog_number; + FileType type; + ParseFileName(filename,&valuelog_number,&type); + if(type==FileType::kValueLogFile)all_valuelog_ids.emplace(valuelog_number); } + mutex_.Unlock(); auto db_iter=NewOriginalIterator(ReadOptions()); for(db_iter->SeekToFirst();db_iter->Valid();db_iter->Next()){ auto value=db_iter->value(); if(value.size()&&value[0]==0x01){ value.remove_prefix(1); - uint64_t valuelog_id; - auto res=GetVarint64(&value,&valuelog_id); - assert(res); - assert(valuelog_usage.count(valuelog_id)); + uint64_t valuelog_id,valuelog_offset; + + Status status=ParseFakeValueForValuelog(value,valuelog_id,valuelog_offset); + if(!status.ok()){//handle error:skip this value, correct? 
+ continue; + } + valuelog_usage[valuelog_id]++; + if(valuelog_id>=latest_valuelog_id){ + + if(valuelog_id==latest_valuelog_id){ + latest_valuelog_offset=std::max(latest_valuelog_offset,valuelog_offset); + } + else{ + latest_valuelog_id=valuelog_id; + latest_valuelog_offset=valuelog_offset; + } + } } } delete db_iter; mutex_.Lock(); -} + for(const auto& pr:valuelog_usage){ + live_valuelog_ids.emplace(pr.first); + } -// 读取单个文件的 file_data_size -uint64_t DBImpl::ReadFileSize(uint64_t log_number) { - auto file_name = ValueLogFileName(dbname_, log_number); - std::ifstream valueFile(file_name, std::ios::in | std::ios::binary); - if (!valueFile.is_open()) { - std::cerr << "Failed to open file: " << file_name << std::endl; - return 0; + for(const auto &id:all_valuelog_ids){ + if(!live_valuelog_ids.count(id)){ + //useless valuelog, delete directly + auto valuelog_name=ValueLogFileName(dbname_,id); + s=env_->RemoveFile(valuelog_name); + assert(s.ok()); + } } - uint64_t file_data_size = 0; - valueFile.read(reinterpret_cast(&file_data_size), sizeof(uint64_t)); - if (valueFile.fail() || valueFile.bad()) { - std::cerr << "Failed to read data size from file: " << file_name - << std::endl; - valueFile.close(); - return 0; + if(latest_valuelog_id>0){//delete data that was written in valuelog but not written in WAL + auto valuelog_name=ValueLogFileName(dbname_,latest_valuelog_id); + + std::ifstream inFile(valuelog_name, std::ios::in | std::ios::binary); + uint64_t value_len,key_len; + inFile.seekg(latest_valuelog_offset); + inFile.read((char*)(&value_len),sizeof(uint64_t)); + latest_valuelog_offset+=value_len+sizeof(uint64_t); + inFile.seekg(latest_valuelog_offset); + inFile.read((char*)(&key_len),sizeof(uint64_t)); + latest_valuelog_offset+=key_len+sizeof(uint64_t); + + + char* buf=new char[latest_valuelog_offset]; + inFile.seekg(0); + inFile.read(buf,latest_valuelog_offset); + inFile.close(); + + + std::ofstream trunc_file(valuelog_name, std::ios::out | std::ios::binary | 
std::ios::trunc); + trunc_file.write(buf,latest_valuelog_offset); + trunc_file.close(); + + delete[] buf; + } + + for(const auto&id:live_valuelog_ids){//update valuelog_origin + + auto valuelog_name=ValueLogFileName(dbname_,id); + std::ifstream inFile(valuelog_name, std::ios::in | std::ios::binary); + int data_cnt=0; + + + uint64_t value_len,key_len; + int cur_offset=0; + while(1){ + inFile.read((char*)(&value_len),sizeof(uint64_t)); + + if (inFile.eof()) { + break; // 正常退出条件:到达文件末尾 + } + + cur_offset+=value_len+sizeof(uint64_t); + inFile.seekg(cur_offset); + inFile.read((char*)(&key_len),sizeof(uint64_t)); + cur_offset+=key_len+sizeof(uint64_t); + data_cnt++; + } + + valuelog_origin[id]=data_cnt; } - valueFile.close(); - return file_data_size; } } // namespace leveldb diff --git a/db/db_impl.h b/db/db_impl.h index 898690b..2699359 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -62,7 +62,7 @@ class DBImpl : public DB { std::string* value) override; Iterator* NewIterator(const ReadOptions&) override; Iterator* NewOriginalIterator(const ReadOptions&); - Iterator* NewUnorderedIterator(const ReadOptions&) override; + Iterator* NewUnorderedIterator(const ReadOptions&,const Slice &lower_key,const Slice &upper_key) override;//upper key not included const Snapshot* GetSnapshot() override; void ReleaseSnapshot(const Snapshot* snapshot) override; bool GetProperty(const Slice& property, std::string* value) override; @@ -78,6 +78,8 @@ class DBImpl : public DB { Status ReadValueLog(uint64_t file_id, uint64_t offset, std::string* value) override; + Status parseTrueValue(Slice* value,std::string* true_value) override; + Status ReadValueLogRange(uint64_t file_id,std::vector offsets, std::string* value); @@ -108,8 +110,6 @@ class DBImpl : public DB { void InitializeExistingLogs(); - uint64_t ReadFileSize(uint64_t log_number); - private: friend class DB; @@ -185,7 +185,7 @@ class DBImpl : public DB { void BackgroundCall(); void BackgroundCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_); 
void BackgroundGarbageCollect() EXCLUSIVE_LOCKS_REQUIRED(mutex_); - void GarbageCollect() EXCLUSIVE_LOCKS_REQUIRED(mutex_); + void GarbageCollect(); void CleanupCompaction(CompactionState* compact) EXCLUSIVE_LOCKS_REQUIRED(mutex_); @@ -236,10 +236,10 @@ class DBImpl : public DB { WritableFile* valuelogfile_; int valuelogfile_offset = 0; uint64_t logfile_number_; - uint64_t valuelogfile_number_; + uint64_t valuelogfile_number_=0; log::Writer* log_; std::map oldvaluelog_ids; - + int mem_value_log_number_;//if =0, don't use cache Cache* valuelog_cache; std::map valuelog_usage; std::map valuelog_origin; @@ -269,6 +269,10 @@ class DBImpl : public DB { VersionSet* const versions_ GUARDED_BY(mutex_); + int use_valuelog_length=5000; + + int value_log_size_; + // Have we encountered a background error in paranoid mode? Status bg_error_ GUARDED_BY(mutex_); diff --git a/db/db_test.cc b/db/db_test.cc index a4a84cd..c6197f5 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -340,7 +340,9 @@ class DBTest : public testing::Test { opts = CurrentOptions(); opts.create_if_missing = true; } + opts.use_valuelog_length=-1; last_options_ = opts; + return DB::Open(opts, dbname_, &db_); } @@ -413,9 +415,13 @@ class DBTest : public testing::Test { result += ", "; } first = false; + std::string res; + Slice true_val; switch (ikey.type) { case kTypeValue: - result += iter->value().ToString(); + true_val=iter->value(); + dbfull()->parseTrueValue(&true_val,&res); + result += res; break; case kTypeDeletion: result += "DEL"; @@ -1107,6 +1113,7 @@ TEST_F(DBTest, MinorCompactionsHappen) { TEST_F(DBTest, RecoverWithLargeLog) { { Options options = CurrentOptions(); + options.use_valuelog_length=-1; Reopen(&options); ASSERT_LEVELDB_OK(Put("big1", std::string(200000, '1'))); ASSERT_LEVELDB_OK(Put("big2", std::string(200000, '2'))); @@ -1118,6 +1125,7 @@ TEST_F(DBTest, RecoverWithLargeLog) { // Make sure that if we re-open with a small write buffer size that // we flush table files in the middle of a 
large log file. Options options = CurrentOptions(); + options.use_valuelog_length=-1; options.write_buffer_size = 100000; Reopen(&options); ASSERT_EQ(NumTableFilesAtLevel(0), 3); diff --git a/db/dbformat.h b/db/dbformat.h index e0dce4a..4438faf 100644 --- a/db/dbformat.h +++ b/db/dbformat.h @@ -44,12 +44,8 @@ static const int kMaxMemCompactLevel = 2; // Approximate gap in bytes between samples of data read during iteration. static const int kReadBytesPeriod = 1048576; -// maximum size of value_log file -static const int value_log_size=1<<26; -//1<<33/1<<26=1<<7 -static const int mem_value_log_number=0;//8GB -static const int max_unorder_iter_memory_usage=64<<20; //32MB +static const float GC_THRESHOLD=0.6; } // namespace config diff --git a/db/fields.cc b/db/fields.cc new file mode 100644 index 0000000..858e417 --- /dev/null +++ b/db/fields.cc @@ -0,0 +1,63 @@ +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. 
+ +#ifndef STORAGE_LEVELDB_INCLUDE_FIELDS_H_ +#define STORAGE_LEVELDB_INCLUDE_FIELDS_H_ + +#include + +#include "db/dbformat.h" +#include "leveldb/fields.h" + +namespace leveldb { + std::string SerializeValue(const FieldArray& fields){ + std::string res_=""; + PutVarint64(&res_,(uint64_t)fields.size()); + for(auto pr:fields){ + PutLengthPrefixedSlice(&res_, pr.first); + PutLengthPrefixedSlice(&res_, pr.second); + } + return res_; + } + + void DeserializeValue(const std::string& value_str,FieldArray* res){ + Slice slice=Slice(value_str.data(),value_str.size()); + uint64_t siz; + bool tmpres=GetVarint64(&slice,&siz); + assert(tmpres); + res->clear(); + for(int i=0;iemplace_back(value_name,value); + } + } + + Status Get_keys_by_field(DB *db,const ReadOptions& options, const Field field,std::vector *keys){ + auto it=db->NewUnorderedIterator(options,Slice(),Slice()); + keys->clear(); + while(it->Valid()){ + auto val=it->value(); + FieldArray arr; + auto str_val=std::string(val.data(),val.size()); + DeserializeValue(str_val,&arr); + for(auto pr:arr){ + if(pr.first==field.first&&pr.second==field.second){ + Slice key=it->key(); + keys->push_back(std::string(key.data(),key.size())); + break; + } + } + it->Next(); + } + delete it; + return Status::OK(); + } +} + +#endif // STORAGE_LEVELDB_INCLUDE_FIELDS_H_ \ No newline at end of file diff --git a/db/filename.cc b/db/filename.cc index ceff186..f811fdf 100644 --- a/db/filename.cc +++ b/db/filename.cc @@ -117,7 +117,10 @@ bool ParseFileName(const std::string& filename, uint64_t* number, *type = kTableFile; } else if (suffix == Slice(".dbtmp")) { *type = kTempFile; - } else { + } else if (suffix == Slice(".valuelog")){ + *type = kValueLogFile; + } + else { return false; } *number = num; diff --git a/db/filename.h b/db/filename.h index 6c71754..f157136 100644 --- a/db/filename.h +++ b/db/filename.h @@ -25,6 +25,7 @@ enum FileType { kDescriptorFile, kCurrentFile, kTempFile, + kValueLogFile, kInfoLogFile // Either the current one, or an old one }; 
diff --git a/db/prefetch_iter.cc b/db/prefetch_iter.cc deleted file mode 100644 index 1e82743..0000000 --- a/db/prefetch_iter.cc +++ /dev/null @@ -1,302 +0,0 @@ -// Copyright (c) 2011 The LevelDB Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. See the AUTHORS file for names of contributors. -#include -#include -#include -#include -#include "db/prefetch_iter.h" - -#include "db/db_impl.h" -#include "db/dbformat.h" -#include "db/filename.h" -#include "leveldb/env.h" -#include "leveldb/iterator.h" -#include "port/port.h" -#include "util/logging.h" -#include "util/mutexlock.h" -#include "util/random.h" -#include "port/port.h" - -namespace leveldb { - -namespace { - - - -// Memtables and sstables that make the DB representation contain -// (userkey,seq,type) => uservalue entries. DBPreFetchIter -// combines multiple entries for the same userkey found in the DB -// representation into a single entry while accounting for sequence -// numbers, deletion markers, overwrites, etc. -class DBPreFetchIter : public Iterator { - public: - // Which direction is the iterator currently moving? - // (1) When moving forward, the internal iterator is positioned at - // the exact entry that yields this->key(), this->value() - // (2) When moving backwards, the internal iterator is positioned - // just before all entries whose user key == this->key(). 
- enum IterPos {Left,Mid,Right}; - - DBPreFetchIter(DBImpl* db, Iterator* iter, Iterator* prefetch_iter,int prefetch_num) - : - db_(db),iter_(iter),prefetch_iter_(prefetch_iter),prefetch_num_(prefetch_num) {} - - DBPreFetchIter(const DBPreFetchIter&) = delete; - DBPreFetchIter& operator=(const DBPreFetchIter&) = delete; - - ~DBPreFetchIter() override { - // if(prefetch_thread.joinable()){ - // stop_flag.store(true); - // prefetch_thread.join(); - // } - delete prefetch_iter_; - //std::cout<<"fetch:"<Valid(); } - Slice key() const override { - return iter_->key(); - } - Slice value() const override { - // if(cur_pos>=0&&cur_pos<=1000000&&prefetched_array[cur_pos].load()){ - // fetched_++; - // return prefetch_array[cur_pos]; - // } - // else{ - // unfetched_++; - buf_for_value=std::move(GetAndParseTrueValue(iter_->value())); - return Slice(buf_for_value.data(),buf_for_value.size()); - //} - } - Status status() const override { - return iter_->status(); - } - - void Next() override; - void Prev() override; - void Seek(const Slice& target) override; - void SeekToFirst() override; - void SeekToLast() override; - - private: - std::string GetAndParseTrueValue(Slice tmp_value)const{ - if(tmp_value.size()==0){ - return ""; - } - if(tmp_value.data()[0]==(char)(0x00)){ - tmp_value.remove_prefix(1); - return std::string(tmp_value.data(),tmp_value.size()); - } - tmp_value.remove_prefix(1); - uint64_t file_id,valuelog_offset; - bool res=GetVarint64(&tmp_value,&file_id); - if(!res)assert(0); - res=GetVarint64(&tmp_value,&valuelog_offset); - if(!res)assert(0); - std::string str; - Status s=db_->ReadValueLog(file_id,valuelog_offset, &str); - return std::move(str); - } - - - // void PreFetchThreadForward(){ - // std::thread prefetch_threads[prefetch_num_]; - // std::queue> q; - // port::Mutex* lock=new port::Mutex(); - // port::CondVar* cv=new port::CondVar(lock); - // bool local_stop_flag=false; - // int remaining_task_cnt=0; - // bool main_finish=false; - // for(int i=0;iLock(); 
- // while(q.empty()&&!local_stop_flag&&!(remaining_task_cnt==0&&main_finish)){ - // cv->Wait(); - // } - // if(local_stop_flag||(remaining_task_cnt==0&&main_finish)){ - // cv->SignalAll(); - // lock->Unlock(); - // break; - // } - // std::string s=q.front().first; - // pos=q.front().second; - // q.pop(); - // remaining_task_cnt--; - // lock->Unlock(); - // prefetch_array[pos]=std::move(GetAndParseTrueValue(s)); - // prefetched_array[pos].store(true); - // } - // } - // ); - // } - // Slice val; - // int pos=0; - // for(int i=0;i<100&&prefetch_iter_->Valid();i++){ - // prefetch_iter_->Next(); - // pos++; - // } - // for(;prefetch_iter_->Valid()&&!stop_flag.load()&&pos<1000000;prefetch_iter_->Next()){ - // val=prefetch_iter_->value(); - // lock->Lock(); - // q.push({std::string(val.data(),val.size()),pos}); - // cv->Signal(); - // remaining_task_cnt++; - // lock->Unlock(); - // pos++; - // } - - // lock->Lock(); - // main_finish=true; - // while(remaining_task_cnt){ - // cv->Wait(); - // } - // lock->Unlock(); - // cv->SignalAll(); - - // for (auto& thread : prefetch_threads) { - // if (thread.joinable()) { - // thread.join(); - // } - // } - // } - - // void PreFetchThreadBackward(){ - // std::thread prefetch_threads[prefetch_num_]; - // std::queue> q; - // port::Mutex* lock=new port::Mutex(); - // port::CondVar* cv=new port::CondVar(lock); - // bool local_stop_flag=false; - // int remaining_task_cnt=0; - // bool main_finish=false; - // for(int i=0;iLock(); - // while(q.empty()&&!local_stop_flag&&!(remaining_task_cnt==0&&main_finish)){ - // cv->Wait(); - // } - // if(local_stop_flag||(remaining_task_cnt==0&&main_finish)){ - // cv->SignalAll(); - // lock->Unlock(); - // break; - // } - // std::string s=q.front().first; - // pos=q.front().second; - // q.pop(); - // remaining_task_cnt--; - // lock->Unlock(); - // prefetch_array[pos]=std::move(GetAndParseTrueValue(s)); - // prefetched_array[pos].store(true); - // } - // } - // ); - // } - // Slice val; - // int 
pos=1000000; - // for(;prefetch_iter_->Valid()&&!stop_flag.load()&&pos>=0;prefetch_iter_->Prev()){ - // val=prefetch_iter_->value(); - // lock->Lock(); - // q.push({std::string(val.data(),val.size()),pos}); - // cv->Signal(); - // remaining_task_cnt++; - // lock->Unlock(); - // pos--; - // } - - // lock->Lock(); - // main_finish=true; - // while(remaining_task_cnt){ - // cv->Wait(); - // } - // lock->Unlock(); - // cv->SignalAll(); - - // for (auto& thread : prefetch_threads) { - // if (thread.joinable()) { - // thread.join(); - // } - // } - // } - - - DBImpl* db_; - Iterator* const iter_; - Iterator* const prefetch_iter_; - int prefetch_num_; - // std::atomic stop_flag; - // std::string prefetch_array[1000005]; - // std::atomic prefetched_array[1000005]; - std::thread prefetch_thread; - mutable std::string buf_for_value; - int cur_pos=0; - mutable int fetched_=0; - mutable int unfetched_=0; -}; -void DBPreFetchIter::Next() { - iter_->Next(); - //cur_pos++; -} -void DBPreFetchIter::Prev() { - iter_->Prev(); - //cur_pos--; -} - -void DBPreFetchIter::Seek(const Slice& target) { - iter_->Seek(target); - - // if(prefetch_thread.joinable()){ - // stop_flag.store(true); - // prefetch_thread.join(); - // stop_flag=false; - // } - // for(int i=0;i<=1000000;i++)prefetched_array[i]=false; - // cur_pos=0; - // prefetch_iter_->Seek(target); - // prefetch_thread=std::thread([this]() { - // PreFetchThreadForward(); - // }); -} - -void DBPreFetchIter::SeekToFirst() { - iter_->SeekToFirst(); - - // if(prefetch_thread.joinable()){ - // stop_flag.store(true); - // prefetch_thread.join(); - // stop_flag=false; - // } - // for(int i=0;i<=1000000;i++)prefetched_array[i]=false; - // cur_pos=0; - // prefetch_iter_->SeekToFirst(); - // prefetch_thread=std::thread([this]() { - // PreFetchThreadForward(); - // }); - } -void DBPreFetchIter::SeekToLast() { - iter_->SeekToLast(); - - // if(prefetch_thread.joinable()){ - // stop_flag.store(true); - // prefetch_thread.join(); - // 
stop_flag=false; - // } - // for(int i=0;i<=1000000;i++)prefetched_array[i]=false; - // cur_pos=1000000; - - // prefetch_thread=std::thread([this]() { - // prefetch_iter_->SeekToLast(); - // PreFetchThreadBackward(); - // }); -} - -} // anonymous namespace -Iterator* NewPreFetchIterator(DBImpl* db,Iterator* db_iter,Iterator* prefetch_iter,int prefetch_num) { - return new DBPreFetchIter(db,db_iter,prefetch_iter,prefetch_num); -} -} // namespace leveldb diff --git a/db/prefetch_iter.h b/db/prefetch_iter.h deleted file mode 100644 index 133a773..0000000 --- a/db/prefetch_iter.h +++ /dev/null @@ -1,22 +0,0 @@ -// Copyright (c) 2011 The LevelDB Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. See the AUTHORS file for names of contributors. - -#ifndef STORAGE_LEVELDB_DB_PREFETCH_ITER_H_ -#define STORAGE_LEVELDB_DB_PREFETCH_ITER_H_ - -#include - -#include "db/dbformat.h" -#include "leveldb/db.h" - -namespace leveldb { - -class DBImpl; - -// add a prefetch function for db_iter -Iterator* NewPreFetchIterator(DBImpl* db,Iterator* db_iter, Iterator* prefetch_iter,int prefetch_num); - -} // namespace leveldb - -#endif // STORAGE_LEVELDB_DB_PREFETCH_ITER_H_ diff --git a/db/recovery_test.cc b/db/recovery_test.cc index 8dc039a..a59e930 100644 --- a/db/recovery_test.cc +++ b/db/recovery_test.cc @@ -283,9 +283,11 @@ TEST_F(RecoveryTest, MultipleLogFiles) { // Make a bunch of uncompacted log files. uint64_t old_log = FirstLogFile(); - MakeLogFile(old_log + 1, 1000, "hello", "world"); - MakeLogFile(old_log + 2, 1001, "hi", "there"); - MakeLogFile(old_log + 3, 1002, "foo", "bar2"); + std::string prefix; + prefix+=(char)0x00; + MakeLogFile(old_log + 1, 1000, "hello", prefix+"world"); + MakeLogFile(old_log + 2, 1001, "hi", prefix+"there"); + MakeLogFile(old_log + 3, 1002, "foo", prefix+"bar2"); // Recover and check that all log files were processed. 
Open(); @@ -310,7 +312,7 @@ TEST_F(RecoveryTest, MultipleLogFiles) { // Check that introducing an older log file does not cause it to be re-read. Close(); - MakeLogFile(old_log + 1, 2000, "hello", "stale write"); + MakeLogFile(old_log + 1, 2000, "hello", prefix+"stale write"); Open(); ASSERT_LE(1, NumTables()); ASSERT_EQ(1, NumLogs()); diff --git a/db/repair.cc b/db/repair.cc index 513327a..97a27c6 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -369,7 +369,7 @@ class Repairer { // TODO(opt): separate out into multiple levels const TableInfo& t = tables_[i]; edit_.AddFile(0, t.meta.number, t.meta.file_size, t.meta.smallest, - t.meta.largest,t.meta.valuelog_id); + t.meta.largest); } // std::fprintf(stderr, diff --git a/db/true_iter.cc b/db/true_iter.cc new file mode 100644 index 0000000..d0c7f13 --- /dev/null +++ b/db/true_iter.cc @@ -0,0 +1,104 @@ +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +#include +#include +#include +#include +#include "db/true_iter.h" + +#include "db/db_impl.h" +#include "db/dbformat.h" +#include "db/filename.h" +#include "leveldb/env.h" +#include "leveldb/iterator.h" +#include "port/port.h" +#include "util/logging.h" +#include "util/mutexlock.h" +#include "util/random.h" +#include "port/port.h" + +namespace leveldb { + +namespace { + + + +// Memtables and sstables that make the DB representation contain +// (userkey,seq,type) => uservalue entries. DBTrueIter +// combines multiple entries for the same userkey found in the DB +// representation into a single entry while accounting for sequence +// numbers, deletion markers, overwrites, etc. +class DBTrueIter : public Iterator { + public: + // Which direction is the iterator currently moving? 
+ // (1) When moving forward, the internal iterator is positioned at + // the exact entry that yields this->key(), this->value() + // (2) When moving backwards, the internal iterator is positioned + // just before all entries whose user key == this->key(). + DBTrueIter(DBImpl* db, Iterator* iter) + : + db_(db),iter_(iter){} + + DBTrueIter(const DBTrueIter&) = delete; + DBTrueIter& operator=(const DBTrueIter&) = delete; + + ~DBTrueIter() override { + delete iter_; + } + bool Valid() const override { return iter_->Valid(); } + Slice key() const override { + return iter_->key(); + } + Slice value() const override { + buf_for_value=std::move(GetAndParseTrueValue(iter_->value())); + return Slice(buf_for_value.data(),buf_for_value.size()); + } + Status status() const override { + return iter_->status(); + } + + void Next() override; + void Prev() override; + void Seek(const Slice& target) override; + void SeekToFirst() override; + void SeekToLast() override; + + private: + std::string GetAndParseTrueValue(Slice tmp_value)const{ + if(tmp_value.size()==0){ + return ""; + } + std::string str; + Status s=db_->parseTrueValue(&tmp_value,&str); + return std::move(str); + } + + + DBImpl* db_; + Iterator* const iter_; + mutable std::string buf_for_value; +}; +void DBTrueIter::Next() { + iter_->Next(); +} +void DBTrueIter::Prev() { + iter_->Prev(); +} + +void DBTrueIter::Seek(const Slice& target) { + iter_->Seek(target); +} + +void DBTrueIter::SeekToFirst() { + iter_->SeekToFirst(); + } +void DBTrueIter::SeekToLast() { + iter_->SeekToLast(); +} + +} // anonymous namespace +Iterator* NewTrueIterator(DBImpl* db,Iterator* db_iter) { + return new DBTrueIter(db,db_iter); +} +} // namespace leveldb diff --git a/db/true_iter.h b/db/true_iter.h new file mode 100644 index 0000000..79aaeb5 --- /dev/null +++ b/db/true_iter.h @@ -0,0 +1,21 @@ +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. 
+// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#ifndef STORAGE_LEVELDB_DB_TRUE_ITER_H_ +#define STORAGE_LEVELDB_DB_TRUE_ITER_H_ + +#include + +#include "db/dbformat.h" +#include "leveldb/db.h" + +namespace leveldb { + +class DBImpl; + +Iterator* NewTrueIterator(DBImpl* db,Iterator* db_iter); + +} // namespace leveldb + +#endif // STORAGE_LEVELDB_DB_TRUE_ITER_H_ diff --git a/db/unordered_iter.cc b/db/unordered_iter.cc index 55ca072..2c29c63 100644 --- a/db/unordered_iter.cc +++ b/db/unordered_iter.cc @@ -40,9 +40,18 @@ class UnorderedIter : public Iterator { // just before all entries whose user key == this->key(). enum IterPos {Left,Mid,Right}; - UnorderedIter(DBImpl* db, Iterator* iter,std::string db_name) + UnorderedIter(DBImpl* db, Iterator* iter,std::string db_name,int max_unorder_iter_memory_usage,const Slice &lower_key,const Slice &upper_key,const Comparator* user_comparator) : - db_(db),iter_(iter),db_name_(db_name){} + db_(db),iter_(iter),db_name_(db_name),max_unorder_iter_memory_usage_(max_unorder_iter_memory_usage),lower_key_(lower_key),upper_key_(upper_key),comparator_(user_comparator){ + first_one=true; + if(lower_key_.empty())iter_->SeekToFirst(); + else iter_->Seek(lower_key); + if(!iter_->Valid()){ + mode=2; + return; + } + Next(); + } UnorderedIter(const UnorderedIter&) = delete; UnorderedIter& operator=(const UnorderedIter&) = delete; @@ -114,6 +123,12 @@ class UnorderedIter : public Iterator { now_key=Slice(buf_for_now_key,key_len); } + bool keyGreaterThanRequire(){ + if(!iter_->Valid())return true; + else if(upper_key_.empty())return false; + else return(comparator_->Compare(iter_->key(),upper_key_)>=0); + } + DBImpl* db_; Iterator* const iter_; @@ -126,7 +141,6 @@ class UnorderedIter : public Iterator { bool iter_valid=false; std::map> valuelog_map; int memory_usage=0; - uint64_t max_memory_usage=config::max_unorder_iter_memory_usage; 
std::string db_name_; std::ifstream* current_file=nullptr; @@ -136,20 +150,26 @@ class UnorderedIter : public Iterator { int mode=0;//0=iter, 1=valuelog, 2=invalid std::map>::iterator valuelog_map_iter; int vec_idx=-1; + int max_unorder_iter_memory_usage_; + + const Slice lower_key_; + const Slice upper_key_; + + const Comparator* comparator_; }; void UnorderedIter::Next() { if(mode==0){ - if(iter_->Valid()) + if(iter_->Valid()&&!keyGreaterThanRequire()) { if(first_one){ first_one=false; } else iter_->Next(); for(; - iter_->Valid()&&memory_usageValid()&&memory_usageNext()) { if(checkLongValue(iter_->value())){ @@ -227,16 +247,14 @@ void UnorderedIter::Seek(const Slice& target) { } void UnorderedIter::SeekToFirst() { - first_one=true; - iter_->SeekToFirst(); - Next(); + assert(0); } void UnorderedIter::SeekToLast() { assert(0); } } // anonymous namespace -Iterator* NewUnorderedIter(DBImpl* db,Iterator* db_iter,std::string db_name) { - return new UnorderedIter(db,db_iter,db_name); +Iterator* NewUnorderedIter(DBImpl* db,Iterator* db_iter,std::string db_name,int max_unorder_iter_memory_usage,const Slice &lower_key,const Slice &upper_key,const Comparator* user_comparator) { + return new UnorderedIter(db,db_iter,db_name,max_unorder_iter_memory_usage,lower_key,upper_key,user_comparator); } } // namespace leveldb diff --git a/db/unordered_iter.h b/db/unordered_iter.h index 685d0f7..76553cb 100644 --- a/db/unordered_iter.h +++ b/db/unordered_iter.h @@ -15,7 +15,7 @@ namespace leveldb { class DBImpl; // add a prefetch function for db_iter -Iterator* NewUnorderedIter(DBImpl* db,Iterator* db_iter,std::string db_name); +Iterator* NewUnorderedIter(DBImpl* db,Iterator* db_iter,std::string db_name,int max_unorder_iter_memory_usage,const Slice &lower_key,const Slice &upper_key,const Comparator* comparator); } // namespace leveldb diff --git a/db/version_edit.cc b/db/version_edit.cc index 2041a3e..356ce88 100644 --- a/db/version_edit.cc +++ b/db/version_edit.cc @@ -79,7 +79,6 @@ 
void VersionEdit::EncodeTo(std::string* dst) const { PutVarint32(dst, new_files_[i].first); // level PutVarint64(dst, f.number); PutVarint64(dst, f.file_size); - PutVarint64(dst, f.valuelog_id); PutLengthPrefixedSlice(dst, f.smallest.Encode()); PutLengthPrefixedSlice(dst, f.largest.Encode()); } @@ -179,7 +178,6 @@ Status VersionEdit::DecodeFrom(const Slice& src) { case kNewFile: if (GetLevel(&input, &level) && GetVarint64(&input, &f.number) && GetVarint64(&input, &f.file_size) && - GetVarint64(&input,&f.valuelog_id) && GetInternalKey(&input, &f.smallest) && GetInternalKey(&input, &f.largest)) { new_files_.push_back(std::make_pair(level, f)); @@ -249,8 +247,6 @@ std::string VersionEdit::DebugString() const { r.append(" "); AppendNumberTo(&r, f.file_size); r.append(" "); - AppendNumberTo(&r, f.valuelog_id); - r.append(" "); r.append(f.smallest.DebugString()); r.append(" .. "); r.append(f.largest.DebugString()); diff --git a/db/version_edit.h b/db/version_edit.h index 1ee850e..7304aef 100644 --- a/db/version_edit.h +++ b/db/version_edit.h @@ -22,7 +22,6 @@ struct FileMetaData { int allowed_seeks; // Seeks allowed until compaction uint64_t number; uint64_t file_size; // File size in bytes - uint64_t valuelog_id=0; InternalKey smallest; // Smallest internal key served by table InternalKey largest; // Largest internal key served by table @@ -63,13 +62,12 @@ class VersionEdit { // REQUIRES: This version has not been saved (see VersionSet::SaveTo) // REQUIRES: "smallest" and "largest" are smallest and largest keys in file void AddFile(int level, uint64_t file, uint64_t file_size, - const InternalKey& smallest, const InternalKey& largest,uint64_t valuelog_id=0) { + const InternalKey& smallest, const InternalKey& largest) { FileMetaData f; f.number = file; f.file_size = file_size; f.smallest = smallest; f.largest = largest; - f.valuelog_id=valuelog_id; new_files_.push_back(std::make_pair(level, f)); } diff --git a/db/version_set.cc b/db/version_set.cc index 2128186..57a3b56 
100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -978,13 +978,15 @@ Status VersionSet::Recover(bool* save_manifest) { MarkFileNumberUsed(log_number); } - assert(s.ok()); + //assert(s.ok()); std::vector filenames; env_->GetChildren(dbname_, &filenames); for (const auto& filename:filenames) { - if (IsValueLogFile(filename)){ - uint64_t valuelog_number = GetValueLogID(filename); - //std::cout<& files = current_->files_[level]; for (size_t i = 0; i < files.size(); i++) { const FileMetaData* f = files[i]; - edit.AddFile(level, f->number, f->file_size, f->smallest, f->largest,f->valuelog_id); + edit.AddFile(level, f->number, f->file_size, f->smallest, f->largest); } } diff --git a/db/write_batch.cc b/db/write_batch.cc index 0757c39..4ce6bc9 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -131,15 +131,17 @@ class ValueLogInserter : public WriteBatch::Handler { public: WriteBatch writeBatch_; DB* db_; + int use_valuelog_len_; std::vector> kvs; - ValueLogInserter(DB* db){ + ValueLogInserter(DB* db,int use_valuelog_len){ db_=db; + use_valuelog_len_=use_valuelog_len; } void Put(const Slice& key, const Slice& value) override { Slice new_value; std::string buf; - if(value.size()<100){ + if(value.size()Iterate(&inserter); inserter.batch_insert(); *b=inserter.writeBatch_; diff --git a/db/write_batch_internal.h b/db/write_batch_internal.h index 404e0a0..35ebede 100644 --- a/db/write_batch_internal.h +++ b/db/write_batch_internal.h @@ -39,7 +39,7 @@ class WriteBatchInternal { static Status checkValueLog(WriteBatch* batch,DB* db_,Slice* lock_key,port::CondVar* cond_var_); - static Status ConverToValueLog(WriteBatch* batch,DB* db_); + static Status ConverToValueLog(WriteBatch* batch,DB* db_,int use_valuelog_length); static void Append(WriteBatch* dst, const WriteBatch* src); }; diff --git a/include/leveldb/db.h b/include/leveldb/db.h index 95c9d95..6d48ab6 100644 --- a/include/leveldb/db.h +++ b/include/leveldb/db.h @@ -124,6 +124,11 @@ class LEVELDB_EXPORT DB { 
return Status::Corruption("not imp"); } + virtual Status parseTrueValue(Slice* value,std::string* true_value){ + assert(0); + return Status::Corruption("not imp"); + } + // Return a heap-allocated iterator over the contents of the database. // The result of NewIterator() is initially invalid (caller must @@ -133,7 +138,7 @@ class LEVELDB_EXPORT DB { // The returned iterator should be deleted before this db is deleted. virtual Iterator* NewIterator(const ReadOptions& options) = 0; - virtual Iterator* NewUnorderedIterator(const ReadOptions&){ + virtual Iterator* NewUnorderedIterator(const ReadOptions&,const Slice &lower_key,const Slice &upper_key){ assert(0); return nullptr; }; @@ -190,6 +195,8 @@ class LEVELDB_EXPORT DB { virtual void CompactRange(const Slice* begin, const Slice* end) = 0; virtual void TEST_GarbageCollect(){}; + + }; // Destroy the contents of the specified database. diff --git a/include/leveldb/fields.h b/include/leveldb/fields.h new file mode 100644 index 0000000..a9d2b2a --- /dev/null +++ b/include/leveldb/fields.h @@ -0,0 +1,20 @@ +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. 
+ +#ifndef STORAGE_LEVELDB_INCLUDE_FIELDS_H_ +#define STORAGE_LEVELDB_INCLUDE_FIELDS_H_ + +#include + +#include "leveldb/db.h" + +namespace leveldb { + std::string SerializeValue(const FieldArray& fields); + + void DeserializeValue(const std::string& value_str,FieldArray* res); + + Status Get_keys_by_field(DB *db,const ReadOptions& options, const Field field,std::vector *keys); +} + +#endif // STORAGE_LEVELDB_INCLUDE_FIELDS_H_ \ No newline at end of file diff --git a/include/leveldb/options.h b/include/leveldb/options.h index 7bd79db..537ee2f 100644 --- a/include/leveldb/options.h +++ b/include/leveldb/options.h @@ -145,6 +145,17 @@ struct LEVELDB_EXPORT Options { // Many applications will benefit from passing the result of // NewBloomFilterPolicy() here. const FilterPolicy* filter_policy = nullptr; + + + //when a value's length>=this value, leveldb will use valuelog to improve performance + int use_valuelog_length=1000;//if -1,then don't use valuelog. If env=memEnv, then it must be set to -1. + + // maximum size of value_log file + int value_log_size=1<<26; + //cache for valuelog(may use lot of memory) + int mem_value_log_number=0;//0=don't use valuelog cache + //memory usage limit for a single unordered iterator + float GC_THRESHOLD=0.6; }; // Options that control read operations @@ -157,6 +168,9 @@ struct LEVELDB_EXPORT ReadOptions { // Callers may wish to set this field to false for bulk scans. bool fill_cache = true; + //if true then return the origin value for data:use one byte to show whether the data belong to valuelog + //if first byte is 0x00, then the rest of data is true value + //else if first byte is 0x01, then the rest of data should use ParseFakeValueForValuelog() to parse its valuelog's id and its offset in valuelog bool find_value_log_for_gc = false; // If "snapshot" is non-null, read as of the supplied snapshot @@ -164,6 +178,8 @@ struct LEVELDB_EXPORT ReadOptions { // not have been released). 
If "snapshot" is null, use an implicit // snapshot of the state at the beginning of this read operation. const Snapshot* snapshot = nullptr; + + int max_unorder_iter_memory_usage=64<<20; //32MB }; // Options that control write operations diff --git a/test/test.cpp b/test/test.cpp index 383b3cd..d71c25f 100644 --- a/test/test.cpp +++ b/test/test.cpp @@ -1,77 +1,202 @@ #include "gtest/gtest.h" #include "leveldb/env.h" #include "leveldb/db.h" -#include "util/coding.h" +#include "leveldb/fields.h" #include using namespace leveldb; using Field=std::pair; using FieldArray=std::vector>; -Status OpenDB(std::string dbName, DB **db) { - Options options; - options.max_file_size=16*1024; - options.write_buffer_size=32*1024; +Status OpenDB(std::string dbName, DB **db,Options options=Options(),bool destroy_old_db=true) { + if(destroy_old_db){ + DestroyDB(dbName,options); + } options.create_if_missing = true; return DB::Open(options, dbName, db); } -TEST(Test, checkIterator) { +std::string GenKeyByNum(int num,int len){ + std::string key=std::to_string(num); + while(key.size() a, std::vector b) { + if (a.size() != b.size()){ + assert(0); + return false; + } + for (size_t i = 0; i < a.size(); ++i) { + if (a[i] != b[i]){ + assert(0); + return false; + } + } + return true; +} + +TEST(Test, valuelog_iterator_test) { DB *db; WriteOptions writeOptions; ReadOptions readOptions; - if(OpenDB("testdb_for_XOY_search", &db).ok() == false) { + Options dboptions; + dboptions.use_valuelog_length=100; + + int RANGE=5000; + + if(OpenDB("valuelog_iterator_test", &db).ok() == false) { std::cerr << "open db failed" << std::endl; abort(); } std::vector values; - for(int i=0;i<5000;i++){ - std::string key=std::to_string(i); - while(key.size()<4){ - key='0'+key; - } - std::string value; - for(int j=0;j<5000;j++){ - value+=std::to_string(i); - } + for(int i=0;iPut(writeOptions,key,value); assert(s.ok()); } auto iter=db->NewIterator(readOptions); iter->SeekToFirst(); - for(int i=0;i<5000;i++){ + for(int 
i=0;iValid()); auto value=iter->value(); assert(values[i]==value); iter->Next(); } - assert(!iter->Valid()); + ASSERT_FALSE(iter->Valid()); iter->SeekToLast(); - for(int i=4999;i>=0;i--){ + for(int i=RANGE-1;i>=0;i--){ assert(iter->Valid()); auto value=iter->value(); assert(values[i]==value); iter->Prev(); } - assert(!iter->Valid()); - iter->Seek("4990"); - for(int i=4990;i<5000;i++){ + ASSERT_FALSE(iter->Valid()); + iter->Seek(GenKeyByNum(RANGE/2,RANGE)); + for(int i=RANGE/2;iValid()); auto value=iter->value(); assert(values[i]==value); iter->Next(); } - assert(!iter->Valid()); + ASSERT_FALSE(iter->Valid()); delete iter; delete db; } -TEST(Test, CheckGetFields) { +TEST(Test, mix_valuelog_iterator_test) { DB *db; WriteOptions writeOptions; ReadOptions readOptions; - if(OpenDB("testdb_for_XOY_large", &db).ok() == false) { + Options dboptions; + dboptions.use_valuelog_length=4000; + + int RANGE=5000; + + if(OpenDB("mix_valuelog_iterator_test", &db).ok() == false) { + std::cerr << "open db failed" << std::endl; + abort(); + } + std::vector values; + for(int i=0;i1000 then in valuelog(length=4*1000) + values.push_back(value); + Status s=db->Put(writeOptions,key,value); + assert(s.ok()); + } + auto iter=db->NewIterator(readOptions); + iter->SeekToFirst(); + for(int i=0;iValid()); + auto value=iter->value(); + assert(values[i]==value); + iter->Next(); + } + ASSERT_FALSE(iter->Valid()); + delete iter; + delete db; +} + +TEST(Test, unorder_valuelog_iterator_test) { + DB *db; + WriteOptions writeOptions; + ReadOptions readOptions; + Options dboptions; + dboptions.use_valuelog_length=4000; + + int RANGE=5000; + + if(OpenDB("valuelog_iterator_test", &db).ok() == false) { + std::cerr << "open db failed" << std::endl; + abort(); + } + std::vector values; + std::vector> new_values; + for(int i=0;i1000 then in valuelog(length=4*1000) + values.push_back(value); + Status s=db->Put(writeOptions,key,value); + assert(s.ok()); + } + auto 
iter=db->NewUnorderedIterator(readOptions,Slice(),Slice()); + for(int i=0;iValid()); + new_values.push_back({std::string(iter->key().data(),iter->key().size()),std::string(iter->value().data(),iter->value().size())}); + iter->Next(); + } + std::sort(new_values.begin(),new_values.end()); + for(int i=0;iValid()); + delete iter; + + iter=db->NewUnorderedIterator(readOptions,GenKeyByNum(RANGE/4,RANGE),GenKeyByNum(RANGE/2,RANGE)); + new_values.clear(); + for(int i=RANGE/4;iValid()); + new_values.push_back({std::string(iter->key().data(),iter->key().size()),std::string(iter->value().data(),iter->value().size())}); + iter->Next(); + } + std::sort(new_values.begin(),new_values.end()); + for(int i=RANGE/4;iValid()); + + delete iter; + delete db; +} + + +TEST(Test, fields_simple_test) { + DB *db; + WriteOptions writeOptions; + ReadOptions readOptions; + Options dbOptions; + dbOptions.use_valuelog_length=-1; + if(OpenDB("fields_simple_test", &db).ok() == false) { std::cerr << "open db failed" << std::endl; abort(); } @@ -92,22 +217,17 @@ TEST(Test, CheckGetFields) { db->Get(ReadOptions(), key1, &value_ret); DeserializeValue(value_ret, &res1); - for(auto pr:res1){ - std::cout<Delete(WriteOptions(),key1); - - std::cout<<"get serialized value done"<Delete(WriteOptions(),s); - } delete db; } -TEST(Test, LARGE_DATA_COMPACT_TEST) { +TEST(Test, valuelog_common_test) { DB *db; WriteOptions writeOptions; ReadOptions readOptions; - if(OpenDB("testdb_for_XOY_large", &db).ok() == false) { + Options dbOptions; + dbOptions.use_valuelog_length=100; + if(OpenDB("valuelog_common_test", &db).ok() == false) { std::cerr << "open db failed" << std::endl; abort(); } + //test Put std::vector values; for(int i=0;i<50000;i++){ std::string key=std::to_string(i); @@ -158,17 +277,41 @@ TEST(Test, LARGE_DATA_COMPACT_TEST) { values.push_back(value); db->Put(writeOptions,key,value); } - for(int i=0;i<50000;i++){ + for(int i=0;i<50000;i++){ std::string key=std::to_string(i); std::string value; Status 
s=db->Get(readOptions,key,&value); assert(s.ok()); - if(values[i]!=value){ - std::cout<Put(writeOptions,key,value); + } + for(int i=0;i<50000;i++){ + std::string key=std::to_string(i); + std::string value; + Status s=db->Get(readOptions,key,&value); + assert(s.ok()); ASSERT_TRUE(values[i]==value); } + //test delete + for(int i=0;i<50000;i++){ + std::string key=std::to_string(i); + db->Delete(writeOptions,key); + } + for(int i=0;i<50000;i++){ + std::string key=std::to_string(i); + std::string value; + Status s=db->Get(readOptions,key,&value); + ASSERT_TRUE(s.IsNotFound()); + } delete db; } @@ -190,20 +333,13 @@ TEST(Test, Garbage_Collect_TEST) { values.push_back(value); db->Put(writeOptions,key,value); } - std::cout<<"start gc"<TEST_GarbageCollect(); - std::cout<<"finish gc"<Get(readOptions,key,&value); assert(s.ok()); - if(values[i]!=value){ - std::cout< suffix.size() && - filename.substr(filename.size() - suffix.size()) == suffix; -} - /** * @brief 解析存储值,提取 valuelog_id 和 offset 信息 * @@ -180,51 +165,13 @@ bool IsValueLogFile(const std::string& filename) { * @param valuelog_id 输出的 ValueLog 文件 ID * @param offset 输出的记录偏移量 */ -void ParseStoredValue(const std::string& stored_value, uint64_t& valuelog_id, +Status ParseFakeValueForValuelog(Slice stored_value, uint64_t& valuelog_id, uint64_t& offset) { // 假设 stored_value 格式为:valuelog_id|offset Slice tmp(stored_value.data(), stored_value.size()); - GetVarint64(&tmp, &valuelog_id); - GetVarint64(&tmp, &offset); -} - -/** - * @brief 根据文件名提取 ValueLog 文件的 ID - * - * @param valuelog_name 文件名(例如 "123.valuelog") - * @return uint64_t 提取的 ValueLog 文件 ID - */ -uint64_t GetValueLogID(const std::string& valuelog_name) { - - // 获取文件名部分(假设文件名格式为 "number.extension") - size_t pos = valuelog_name.find_last_of('/'); - std::string filename; - if (pos != std::string::npos) { - filename = valuelog_name.substr(pos + 1); - } else { - filename = valuelog_name; - } - - // 查找文件名中的 '.' 
位置,提取数字部分 - pos = filename.find('.'); - assert(pos != std::string::npos); - - // 提取数字部分 - std::string id_str = filename.substr(0, pos); - - // 检查文件扩展名是否为 .valuelog - if (filename.substr(pos + 1) != "valuelog") { - assert(0); - } - - // 转换为 uint64_t - uint64_t id; - std::istringstream iss(id_str); - if (!(iss >> id)) { - assert(0); - } - - return id; + if(!GetVarint64(&tmp, &valuelog_id))return Status::NotSupported("can't decode a valuelog value from its meta info"); + if(!GetVarint64(&tmp, &offset))return Status::NotSupported("can't decode a valuelog value from its meta info"); + return Status::OK(); } // Helper function to split the set of files into chunks @@ -239,76 +186,4 @@ void SplitIntoChunks(const std::set& files, int num_workers, } -bool CompareFieldArray(const FieldArray &a, const FieldArray &b) { - if (a.size() != b.size()) return false; - for (size_t i = 0; i < a.size(); ++i) { - if (a[i].first != b[i].first || a[i].second != b[i].second) return false; - } - return true; -} - -bool CompareKey(const std::vector a, std::vector b) { - if (a.size() != b.size()){ - assert(0); - return false; - } - for (size_t i = 0; i < a.size(); ++i) { - if (a[i] != b[i]){ - assert(0); - return false; - } - } - return true; -} - -std::string SerializeValue(const FieldArray& fields){ - std::string res_=""; - PutVarint64(&res_,(uint64_t)fields.size()); - for(auto pr:fields){ - PutLengthPrefixedSlice(&res_, pr.first); - PutLengthPrefixedSlice(&res_, pr.second); - } - return res_; -} - -void DeserializeValue(const std::string& value_str,FieldArray* res){ - Slice slice=Slice(value_str.c_str()); - uint64_t siz; - bool tmpres=GetVarint64(&slice,&siz); - assert(tmpres); - res->clear(); - for(int i=0;iemplace_back(value_name,value); - } -} - -Status Get_keys_by_field(DB *db,const ReadOptions& options, const Field field,std::vector *keys){ - auto it=db->NewUnorderedIterator(options); - it->SeekToFirst(); - keys->clear(); - while(it->Valid()){ - auto val=it->value(); - FieldArray arr; 
- auto str_val=std::string(val.data(),val.size()); - DeserializeValue(str_val,&arr); - for(auto pr:arr){ - if(pr.first==field.first&&pr.second==field.second){ - Slice key=it->key(); - keys->push_back(std::string(key.data(),key.size())); - break; - } - } - it->Next(); - } - delete it; - return Status::OK(); -} - - } // namespace leveldb diff --git a/util/coding.h b/util/coding.h index 4c7cfa2..6ce63c8 100644 --- a/util/coding.h +++ b/util/coding.h @@ -121,19 +121,12 @@ inline const char* GetVarint32Ptr(const char* p, const char* limit, return GetVarint32PtrFallback(p, limit, value); } -bool IsValueLogFile(const std::string& filename); -void ParseStoredValue(const std::string& stored_value, uint64_t& valuelog_id, +Status ParseFakeValueForValuelog(Slice stored_value, uint64_t& valuelog_id, uint64_t& offset); -uint64_t GetValueLogID(const std::string& valuelog_name); + void SplitIntoChunks(const std::set& files, int num_workers, std::vector>* chunks); -bool CompareFieldArray(const FieldArray &a, const FieldArray &b); -bool CompareKey(const std::vector a, std::vector b); -std::string SerializeValue(const FieldArray& fields); -void DeserializeValue(const std::string& value_str,FieldArray* res); -Status Get_keys_by_field(DB *db,const ReadOptions& options, const Field field,std::vector *keys); - } // namespace leveldb