From b00736b43ffa4f3f09920cd1feee024ce9e0192c Mon Sep 17 00:00:00 2001 From: alexfisher <1823748191@qq.com> Date: Sat, 28 Dec 2024 14:58:46 +0800 Subject: [PATCH] add unorder iter --- CMakeLists.txt | 2 + benchmarks/db_bench.cc | 19 +++++- db/db_impl.cc | 10 +++- db/db_impl.h | 4 ++ db/dbformat.h | 2 + db/unordered_iter.cc | 157 +++++++++++++++++++++++++++++++++++++++++-------- db/unordered_iter.h | 8 +-- include/leveldb/db.h | 5 ++ test/test.cpp | 2 + util/coding.cc | 4 +- 10 files changed, 182 insertions(+), 31 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 8e776d2..d2898b1 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -131,6 +131,8 @@ target_sources(leveldb "db/db_iter.h" "db/prefetch_iter.cc" "db/prefetch_iter.h" + "db/unordered_iter.cc" + "db/unordered_iter.h" "db/dbformat.cc" "db/dbformat.h" "db/dumpfile.cc" diff --git a/benchmarks/db_bench.cc b/benchmarks/db_bench.cc index 12742fa..0a54d36 100644 --- a/benchmarks/db_bench.cc +++ b/benchmarks/db_bench.cc @@ -49,14 +49,14 @@ static const char* FLAGS_benchmarks = "fillsync," "fillrandom," "overwrite," - "overwrite," - "overwrite," "readrandom," "readrandom," // Extra run to allow previous compactions to quiesce + "readunorderseq," "readseq," "readreverse," "compact," "readrandom," + "readunorderseq," "readseq," "readreverse," "fill100K," @@ -607,6 +607,8 @@ class Benchmark { method = &Benchmark::WriteRandom; } else if (name == Slice("readseq")) { method = &Benchmark::ReadSequential; + } else if (name == Slice("readunorderseq")) { + method = &Benchmark::ReadUnorderSequential; } else if (name == Slice("readreverse")) { method = &Benchmark::ReadReverse; } else if (name == Slice("readrandom")) { @@ -880,6 +882,19 @@ class Benchmark { thread->stats.AddBytes(bytes); } + void ReadUnorderSequential(ThreadState* thread) { + Iterator* iter = db_->NewUnorderedIterator(ReadOptions()); + int i = 0; + int64_t bytes = 0; + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + bytes += iter->key().size() + iter->value().size(); + thread->stats.FinishedSingleOp(); + ++i; + } + delete iter; + thread->stats.AddBytes(bytes); + } + void ReadReverse(ThreadState* thread) { Iterator* iter = db_->NewIterator(ReadOptions()); int i = 0; diff --git a/db/db_impl.cc b/db/db_impl.cc index 2cd4377..1bda06b 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -7,6 +7,7 @@ #include "db/builder.h" #include "db/db_iter.h" #include "db/prefetch_iter.h" +#include "db/unordered_iter.h" #include "db/dbformat.h" #include "db/filename.h" #include "db/log_reader.h" @@ -1292,6 +1293,11 @@ Iterator *DBImpl::NewOriginalIterator(const ReadOptions& options) { return db_iter; } +Iterator* DBImpl::NewUnorderedIterator(const ReadOptions& options) { + auto iter=NewOriginalIterator(options); + return NewUnorderedIter(this,iter,dbname_); +} + Iterator* DBImpl::NewIterator(const ReadOptions& options) { SequenceNumber latest_snapshot; uint32_t seed; @@ -2099,7 +2105,9 @@ void DBImpl::InitializeExistingLogs() { auto value=db_iter->value(); if(value.size()&&value[0]==0x01){ value.remove_prefix(1); - uint64_t valuelog_id=*(uint64_t*)value.data(); + uint64_t valuelog_id; + auto res=GetVarint64(&value,&valuelog_id); + assert(res); assert(valuelog_usage.count(valuelog_id)); valuelog_usage[valuelog_id]++; } diff --git a/db/db_impl.h b/db/db_impl.h index 8b98055..898690b 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -62,6 +62,7 @@ class DBImpl : public DB { std::string* value) override; Iterator* NewIterator(const ReadOptions&) override; Iterator* NewOriginalIterator(const ReadOptions&); + Iterator* NewUnorderedIterator(const ReadOptions&) override; const Snapshot* GetSnapshot() override; void ReleaseSnapshot(const Snapshot* snapshot) override; bool GetProperty(const Slice& property, std::string* value) override; @@ -77,6 +78,9 @@ class DBImpl : public DB { Status ReadValueLog(uint64_t file_id, uint64_t offset, std::string* value) override; + Status ReadValueLogRange(uint64_t file_id,std::vector offsets, + std::string* value); + // Extra methods (for testing) that are not in the public DB interface // Compact any files in the named level that overlap [*begin,*end] diff --git a/db/dbformat.h b/db/dbformat.h index c6a093b..61ad581 100644 --- a/db/dbformat.h +++ b/db/dbformat.h @@ -49,6 +49,8 @@ static const int value_log_size=1<<26; //1<<33/1<<26=1<<7 static const int mem_value_log_number=1<<8;//8GB +static const int max_unorder_iter_memory_usage=32<<20; //32MB + } // namespace config class InternalKey; diff --git a/db/unordered_iter.cc b/db/unordered_iter.cc index b440147..247b6cd 100644 --- a/db/unordered_iter.cc +++ b/db/unordered_iter.cc @@ -17,6 +17,8 @@ #include "util/mutexlock.h" #include "util/random.h" #include "port/port.h" +#include +#include namespace leveldb { @@ -38,23 +40,26 @@ class UnorderedIter : public Iterator { // just before all entries whose user key == this->key(). enum IterPos {Left,Mid,Right}; - UnorderedIter(DBImpl* db, Iterator* iter, Iterator* prefetch_iter,int prefetch_num) + UnorderedIter(DBImpl* db, Iterator* iter,std::string db_name) : - db_(db),iter_(iter),prefetch_iter_(prefetch_iter),prefetch_num_(prefetch_num) {} + db_(db),iter_(iter),db_name_(db_name){} UnorderedIter(const UnorderedIter&) = delete; UnorderedIter& operator=(const UnorderedIter&) = delete; ~UnorderedIter() override { + if(current_file&¤t_file->is_open()){ + current_file->close(); + delete current_file; + } delete iter_; } - bool Valid() const override { return iter_->Valid(); } + bool Valid() const override { return mode!=2; } Slice key() const override { - return iter_->key(); + return now_key; } Slice value() const override { - buf_for_value=std::move(GetAndParseTrueValue(iter_->value())); - return Slice(buf_for_value.data(),buf_for_value.size()); + return now_value; } Status status() const override { return iter_->status(); @@ -67,49 +72,155 @@ class UnorderedIter : public Iterator { void SeekToLast() override; private: - std::string GetAndParseTrueValue(Slice tmp_value)const{ - if(tmp_value.size()==0){ - return ""; - } - if(tmp_value.data()[0]==(char)(0x00)){ - tmp_value.remove_prefix(1); - return std::string(tmp_value.data(),tmp_value.size()); - } + std::pair GetAndParseValue(Slice tmp_value)const{ tmp_value.remove_prefix(1); uint64_t file_id,valuelog_offset; bool res=GetVarint64(&tmp_value,&file_id); if(!res)assert(0); res=GetVarint64(&tmp_value,&valuelog_offset); if(!res)assert(0); - std::string str; - Status s=db_->ReadValueLog(file_id,valuelog_offset, &str); - return std::move(str); + return {file_id,valuelog_offset}; + } + + bool checkLongValue(Slice value){ + return value.size()&&value.data()[0]==(0x01); + } + + void MyReadValuelog(const uint64_t& offset){ + uint64_t value_len,key_len; + current_file->seekg(offset); + current_file->read((char*)(&value_len),sizeof(uint64_t)); + + char buf[value_len]; + current_file->read(buf,value_len); + buf_for_now_value=std::string(buf,value_len); + + current_file->read((char*)(&key_len),sizeof(uint64_t)); + + char key_buf[key_len]; + current_file->read(key_buf,key_len); + buf_for_now_key=std::string(key_buf,key_len); + + now_key=Slice(buf_for_now_key); + now_value=Slice(buf_for_now_value); } DBImpl* db_; Iterator* const iter_; + Slice now_value; + Slice now_key; + std::string buf_for_now_key; + std::string buf_for_now_value; + bool iter_valid=false; + std::map> valuelog_map; + int memory_usage=0; + uint64_t max_memory_usage=config::max_unorder_iter_memory_usage; + + std::string db_name_; + std::ifstream* current_file=nullptr; + + bool first_one=false; + + int mode=0;//0=iter, 1=valuelog, 2=invalid + std::map>::iterator valuelog_map_iter; + int vec_idx=-1; }; + + + void UnorderedIter::Next() { - iter_->Next(); + if(mode==0){ + if(iter_->Valid()) + { + if(first_one){ + first_one=false; + } + else iter_->Next(); + for(; + iter_->Valid()&&memory_usageNext()) + { + if(checkLongValue(iter_->value())){ + auto pr=GetAndParseValue(iter_->value()); + valuelog_map[pr.first].push_back(pr.second); + } + else{ + now_key=iter_->key(); + now_value=iter_->value(); + now_value.remove_prefix(1); + return; + } + } + } + + valuelog_map_iter=valuelog_map.begin(); + if(valuelog_map_iter!=valuelog_map.end()){ + std::string file_name_ = ValueLogFileName(db_name_, valuelog_map_iter->first); + assert(!current_file); + current_file=new std::ifstream(file_name_, std::ios::in | std::ios::binary); + vec_idx=0; + } + + } + + mode=1; + + if(valuelog_map_iter==valuelog_map.end()){ + mode=2; + return; + } + + int offset=valuelog_map_iter->second[vec_idx++]; + MyReadValuelog(offset); + + if(vec_idx>=valuelog_map_iter->second.size()){ + valuelog_map_iter++; + + if(valuelog_map_iter==valuelog_map.end()){ + valuelog_map.clear(); + memory_usage=0; + mode=0; + if(current_file){ + current_file->close(); + delete current_file; + current_file=nullptr; + } + } + else{ + std::string file_name_ = ValueLogFileName(db_name_, valuelog_map_iter->first); + if(current_file){ + current_file->close(); + delete current_file; + } + current_file=new std::ifstream(file_name_, std::ios::in | std::ios::binary); + vec_idx=0; + } + + } + + + } void UnorderedIter::Prev() { - iter_->Prev(); + assert(0); } void UnorderedIter::Seek(const Slice& target) { - iter_->Seek(target); + assert(0); } void UnorderedIter::SeekToFirst() { + first_one=true; iter_->SeekToFirst(); + Next(); } void UnorderedIter::SeekToLast() { - iter_->SeekToLast(); + assert(0); } } // anonymous namespace -Iterator* NewUnorderedIterator(DBImpl* db,Iterator* db_iter) { - return new UnorderedIter(db,db_iter); +Iterator* NewUnorderedIter(DBImpl* db,Iterator* db_iter,std::string db_name) { + return new UnorderedIter(db,db_iter,db_name); } } // namespace leveldb diff --git a/db/unordered_iter.h b/db/unordered_iter.h index 920c573..685d0f7 100644 --- a/db/unordered_iter.h +++ b/db/unordered_iter.h @@ -2,8 +2,8 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. -#ifndef STORAGE_LEVELDB_DB_PREFETCH_ITER_H_ -#define STORAGE_LEVELDB_DB_PREFETCH_ITER_H_ +#ifndef STORAGE_LEVELDB_DB_UNORDERED_ITER_H_ +#define STORAGE_LEVELDB_DB_UNORDERED_ITER_H_ #include @@ -15,8 +15,8 @@ namespace leveldb { class DBImpl; // add a prefetch function for db_iter -Iterator* NewUnorderedIterator(DBImpl* db,Iterator* db_iter); +Iterator* NewUnorderedIter(DBImpl* db,Iterator* db_iter,std::string db_name); } // namespace leveldb -#endif // STORAGE_LEVELDB_DB_PREFETCH_ITER_H_ +#endif // STORAGE_LEVELDB_DB_UNORDERED_ITER_H_ diff --git a/include/leveldb/db.h b/include/leveldb/db.h index 0081d22..95c9d95 100644 --- a/include/leveldb/db.h +++ b/include/leveldb/db.h @@ -133,6 +133,11 @@ class LEVELDB_EXPORT DB { // The returned iterator should be deleted before this db is deleted. virtual Iterator* NewIterator(const ReadOptions& options) = 0; + virtual Iterator* NewUnorderedIterator(const ReadOptions&){ + assert(0); + return nullptr; + }; + // Return a handle to the current DB state. Iterators created with // this handle will all observe a stable snapshot of the current DB // state. The caller must call ReleaseSnapshot(result) when the diff --git a/test/test.cpp b/test/test.cpp index 0b073ac..383b3cd 100644 --- a/test/test.cpp +++ b/test/test.cpp @@ -130,6 +130,8 @@ TEST(Test, CheckSearchKey) { std::sort(target_keys.begin(),target_keys.end()); std::vector key_res; Get_keys_by_field(db,ReadOptions(),{"name", "special_key"},&key_res); + std::sort(key_res.begin(),key_res.end()); + std::sort(target_keys.begin(),target_keys.end()); ASSERT_TRUE(CompareKey(key_res, target_keys)); std::cout<<"get key by field done"< a, std::vector b) { if (a.size() != b.size()){ + assert(0); return false; } for (size_t i = 0; i < a.size(); ++i) { if (a[i] != b[i]){ + assert(0); return false; } } @@ -287,7 +289,7 @@ void DeserializeValue(const std::string& value_str,FieldArray* res){ } Status Get_keys_by_field(DB *db,const ReadOptions& options, const Field field,std::vector *keys){ - auto it=db->NewIterator(options); + auto it=db->NewUnorderedIterator(options); it->SeekToFirst(); keys->clear(); while(it->Valid()){