From 8dafb3af8f133f5e206b941161a7d090c505161f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B0=8F=E4=BA=BA=E9=B1=BC?= <1823748191@qq.com> Date: Thu, 12 Dec 2024 19:13:49 +0800 Subject: [PATCH] range prefetch(length=1) maybe finish --- CMakeLists.txt | 2 + db/db_impl.cc | 11 +- db/db_iter.cc | 19 +-- db/prefetch_iter.cc | 337 ++++++++++++++++++++++++++++++++++++++++++++++++++++ db/prefetch_iter.h | 22 ++++ port/port.h | 2 +- test/test.cpp | 245 +++++++++++++++++++++++--------------- 7 files changed, 519 insertions(+), 119 deletions(-) create mode 100644 db/prefetch_iter.cc create mode 100644 db/prefetch_iter.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 8fd8ce7..d5a57ed 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -129,6 +129,8 @@ target_sources(leveldb "db/db_impl.h" "db/db_iter.cc" "db/db_iter.h" + "db/prefetch_iter.cc" + "db/prefetch_iter.h" "db/dbformat.cc" "db/dbformat.h" "db/dumpfile.cc" diff --git a/db/db_impl.cc b/db/db_impl.cc index 45c7a4b..0d3dcd6 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -6,6 +6,7 @@ #include "db/builder.h" #include "db/db_iter.h" +#include "db/prefetch_iter.h" #include "db/dbformat.h" #include "db/filename.h" #include "db/log_reader.h" @@ -1262,12 +1263,13 @@ Iterator* DBImpl::NewIterator(const ReadOptions& options) { SequenceNumber latest_snapshot; uint32_t seed; Iterator* iter = NewInternalIterator(options, &latest_snapshot, &seed); - return NewDBIterator(this, user_comparator(), iter, + auto db_iter=NewDBIterator(this, user_comparator(), iter, (options.snapshot != nullptr ? static_cast(options.snapshot) ->sequence_number() : latest_snapshot), seed); + return NewPreFetchIterator(this,db_iter); } void DBImpl::RecordReadSample(Slice key) { @@ -1754,15 +1756,8 @@ void DBImpl::GarbageCollect() { if(!versions_->checkOldValueLog(tmp_name))valuelog_set.emplace(filename); } } - //bool tmp_judge=false;//only clean one file for (std::string valuelog_name : valuelog_set) { Log(options_.info_log, ("gc processing: "+valuelog_name).data()); - // if(tmp_judge){ - // break; - // } - // else{ - // tmp_judge=true; - // } uint64_t cur_log_number = GetValueLogID(valuelog_name); valuelog_name = ValueLogFileName(dbname_, cur_log_number); if (cur_log_number == valuelogfile_number_) { diff --git a/db/db_iter.cc b/db/db_iter.cc index e27f1a4..bc68aaa 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -68,23 +68,8 @@ class DBIter : public Iterator { } Slice value() const override { assert(valid_); - auto tmp_value= (direction_ == kForward) ? iter_->value() : saved_value_; - Slice key; - if(tmp_value.data()[0]==0x00){ - tmp_value.remove_prefix(1); - return tmp_value; - } - tmp_value.remove_prefix(1); - uint64_t file_id,valuelog_offset,valuelog_len; - bool res=GetVarint64(&tmp_value,&file_id); - if(!res)assert(0); - res=GetVarint64(&tmp_value,&valuelog_offset); - if(!res)assert(0); - // res=GetVarint64(&tmp_value,&valuelog_len); - // if(!res)assert(0); - // db_->ReadValueLog(file_id,valuelog_offset,valuelog_len,&tmp_value); - db_->ReadValueLog(file_id,valuelog_offset, &key, &tmp_value); - return tmp_value; + Slice val = (direction_ == kForward) ? iter_->value() : saved_value_; + return val; } Status status() const override { if (status_.ok()) { diff --git a/db/prefetch_iter.cc b/db/prefetch_iter.cc new file mode 100644 index 0000000..ac811fc --- /dev/null +++ b/db/prefetch_iter.cc @@ -0,0 +1,337 @@ +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +#include +#include +#include +#include "db/prefetch_iter.h" + +#include "db/db_impl.h" +#include "db/dbformat.h" +#include "db/filename.h" +#include "leveldb/env.h" +#include "leveldb/iterator.h" +#include "port/port.h" +#include "util/logging.h" +#include "util/mutexlock.h" +#include "util/random.h" +#include "port/port.h" + +namespace leveldb { + +namespace { + + + +// Memtables and sstables that make the DB representation contain +// (userkey,seq,type) => uservalue entries. DBPreFetchIter +// combines multiple entries for the same userkey found in the DB +// representation into a single entry while accounting for sequence +// numbers, deletion markers, overwrites, etc. +class DBPreFetchIter : public Iterator { + public: + // Which direction is the iterator currently moving? + // (1) When moving forward, the internal iterator is positioned at + // the exact entry that yields this->key(), this->value() + // (2) When moving backwards, the internal iterator is positioned + // just before all entries whose user key == this->key(). + enum IterPos {Left,Mid,Right}; + + DBPreFetchIter(DBImpl* db, Iterator* iter) + : + db_(db),iter_(iter) {} + + DBPreFetchIter(const DBPreFetchIter&) = delete; + DBPreFetchIter& operator=(const DBPreFetchIter&) = delete; + + ~DBPreFetchIter() override { delete iter_; } + bool Valid() const override { return iter_->Valid(); } + Slice key() const override { + return iter_->key(); + } + Slice value() const override { + if(current_valid_)return current_value_; + else assert(0); + } + Status status() const override { + return iter_->status(); + } + + void Next() override; + void Prev() override; + void Seek(const Slice& target) override; + void SeekToFirst() override; + void SeekToLast() override; + + private: + Slice GetAndParseTrueValue(){ + Slice tmp_value=iter_->value(); + Slice key; + if(tmp_value.data()[0]==0x00){ + tmp_value.remove_prefix(1); + return tmp_value; + } + tmp_value.remove_prefix(1); + uint64_t file_id,valuelog_offset,valuelog_len; + bool res=GetVarint64(&tmp_value,&file_id); + if(!res)assert(0); + res=GetVarint64(&tmp_value,&valuelog_offset); + if(!res)assert(0); + db_->ReadValueLog(file_id,valuelog_offset, &key, &tmp_value); + return tmp_value; + } + + void prefetch_left(){ + prefetch_mutex.AssertHeld(); + left_value_=GetAndParseTrueValue(); + left_valid_=true; + prefetch_mutex.Unlock(); + } + + void prefetch_right(){ + prefetch_mutex.AssertHeld(); + right_value_=GetAndParseTrueValue(); + right_valid_=true; + prefetch_mutex.Unlock(); + } + + port::Mutex prefetch_mutex; + DBImpl* db_; + Iterator* const iter_; + Slice left_value_; + Slice current_value_; + Slice right_value_; + bool left_valid_=false; + bool current_valid_=false; + bool right_valid_=false; + IterPos iter_pos_; +}; +void DBPreFetchIter::Next() { +prefetch_mutex.Lock(); +prefetch_mutex.Unlock(); +assert(iter_->Valid()); + if(iter_pos_==IterPos::Left){ + + iter_->Next(); + assert(current_valid_); + left_value_=current_value_; + left_valid_=true; + + iter_->Next(); + if(!iter_->Valid()){ + iter_pos_=IterPos::Mid; + current_valid_=false; + return; + } + assert(right_valid_); + current_value_=right_value_; + current_valid_=true; + + iter_->Next(); + if(!iter_->Valid()){ + //back to last + iter_->SeekToLast(); + assert(iter_->Valid()); + iter_pos_=IterPos::Mid; + right_valid_=false; + return; + } + + prefetch_mutex.Lock(); + std::thread([this]() { + prefetch_right(); + }).detach(); + iter_pos_=IterPos::Right; + } + + else if(iter_pos_==IterPos::Mid){ + assert(current_valid_); + left_value_=current_value_; + left_valid_=true; + + iter_->Next(); + if(!iter_->Valid()){ + iter_pos_=IterPos::Mid; + current_valid_=false; + return; + } + if(right_valid_)current_value_=right_value_; + else current_value_=GetAndParseTrueValue(); + current_valid_=true; + + iter_->Next(); + if(!iter_->Valid()){ + //back to last + iter_->SeekToLast(); + assert(iter_->Valid()); + iter_pos_=IterPos::Mid; + right_valid_=false; + return; + } + prefetch_mutex.Lock(); + std::thread([this]() { + prefetch_right(); + }).detach(); + iter_pos_=IterPos::Right; + } + + else if(iter_pos_==IterPos::Right){ + assert(current_valid_); + left_value_=current_value_; + left_valid_=true; + + assert(right_valid_); + current_value_=right_value_; + current_valid_=true; + + iter_->Next(); + if(!iter_->Valid()){ + //back to last + iter_->SeekToLast(); + assert(iter_->Valid()); + iter_pos_=IterPos::Mid; + right_valid_=false; + return; + } + prefetch_mutex.Lock(); + std::thread([this]() { + prefetch_right(); + }).detach(); + iter_pos_=IterPos::Right; + } + +} +void DBPreFetchIter::Prev() { +prefetch_mutex.Lock(); +prefetch_mutex.Unlock(); +assert(iter_->Valid()); + if(iter_pos_==IterPos::Left){ + assert(current_valid_); + right_value_=current_value_; + right_valid_=true; + + assert(left_valid_); + current_value_=left_value_; + current_valid_=true; + + iter_->Prev(); + if(!iter_->Valid()){ + //back to first + iter_->SeekToFirst(); + assert(iter_->Valid()); + iter_pos_=IterPos::Mid; + left_valid_=false; + return; + } + + prefetch_mutex.Lock(); + std::thread([this]() { + prefetch_left(); + }).detach(); + iter_pos_=IterPos::Left; + } + + else if(iter_pos_==IterPos::Mid){ + assert(current_valid_); + right_value_=current_value_; + right_valid_=true; + + iter_->Prev(); + if(!iter_->Valid()){ + iter_pos_=IterPos::Mid; + current_valid_=false; + return; + } + if(left_valid_)current_value_=left_value_; + else current_value_=GetAndParseTrueValue(); + current_valid_=true; + + iter_->Prev(); + if(!iter_->Valid()){ + //back to first + iter_->SeekToFirst(); + assert(iter_->Valid()); + iter_pos_=IterPos::Mid; + left_valid_=false; + return; + } + prefetch_mutex.Lock(); + std::thread([this]() { + prefetch_left(); + }).detach(); + iter_pos_=IterPos::Left; + } + + else if(iter_pos_==IterPos::Right){ + iter_->Prev(); + assert(current_valid_); + right_value_=current_value_; + + iter_->Prev(); + if(!iter_->Valid()){ + iter_pos_=IterPos::Mid; + current_valid_=false; + return; + } + current_valid_=true; + assert(left_valid_); + current_value_=left_value_; + + iter_->Prev(); + if(!iter_->Valid()){ + //back to first + iter_->SeekToFirst(); + assert(iter_->Valid()); + iter_pos_=IterPos::Mid; + left_valid_=false; + return; + } + prefetch_mutex.Lock(); + std::thread([this]() { + prefetch_left(); + }).detach(); + iter_pos_=IterPos::Left; + } +} + +void DBPreFetchIter::Seek(const Slice& target) { + iter_->Seek(target); + left_valid_=false; + right_valid_=false; + current_valid_=false; + iter_pos_=IterPos::Mid; + if(iter_->Valid()){ + current_value_=GetAndParseTrueValue(); + current_valid_=true; + } +} + +void DBPreFetchIter::SeekToFirst() { + iter_->SeekToFirst(); + left_valid_=false; + right_valid_=false; + iter_pos_=IterPos::Mid; + if(iter_->Valid()){ + current_valid_=true; + current_value_=GetAndParseTrueValue(); + } + else current_valid_=false; + +} +void DBPreFetchIter::SeekToLast() { + iter_->SeekToLast(); + left_valid_=false; + right_valid_=false; + iter_pos_=IterPos::Mid; + if(iter_->Valid()){ + current_valid_=true; + current_value_=GetAndParseTrueValue(); + } + else current_valid_=false; +} + +} // anonymous namespace +Iterator* NewPreFetchIterator(DBImpl* db,Iterator* db_iter) { + return new DBPreFetchIter(db,db_iter); +} +} // namespace leveldb diff --git a/db/prefetch_iter.h b/db/prefetch_iter.h new file mode 100644 index 0000000..74ca618 --- /dev/null +++ b/db/prefetch_iter.h @@ -0,0 +1,22 @@ +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#ifndef STORAGE_LEVELDB_DB_PREFETCH_ITER_H_ +#define STORAGE_LEVELDB_DB_PREFETCH_ITER_H_ + +#include + +#include "db/dbformat.h" +#include "leveldb/db.h" + +namespace leveldb { + +class DBImpl; + +// add a prefetch function for db_iter +Iterator* NewPreFetchIterator(DBImpl* db,Iterator* db_iter); + +} // namespace leveldb + +#endif // STORAGE_LEVELDB_DB_PREFETCH_ITER_H_ diff --git a/port/port.h b/port/port.h index 4b247f7..5174fd4 100644 --- a/port/port.h +++ b/port/port.h @@ -4,7 +4,7 @@ #ifndef STORAGE_LEVELDB_PORT_PORT_H_ #define STORAGE_LEVELDB_PORT_PORT_H_ - +//#define LEVELDB_PLATFORM_POSIX #include // Include the appropriate platform specific file below. If you are diff --git a/test/test.cpp b/test/test.cpp index b0d17c3..08ec745 100644 --- a/test/test.cpp +++ b/test/test.cpp @@ -2,7 +2,7 @@ #include "leveldb/env.h" #include "leveldb/db.h" #include "util/coding.h" - +#include using namespace leveldb; using Field=std::pair; @@ -16,111 +16,138 @@ Status OpenDB(std::string dbName, DB **db) { return DB::Open(options, dbName, db); } - -TEST(Test, CheckGetFields) { +TEST(Test, checkIterator) { DB *db; WriteOptions writeOptions; ReadOptions readOptions; - if(OpenDB("testdb_for_XOY", &db).ok() == false) { - std::cerr << "open db failed" << std::endl; - abort(); - } - std::string key1 = "k_1"; - - FieldArray fields1 = { - {"name", "Customer#000000001"}, - {"address", "IVhzIApeRb"}, - {"phone", "25-989-741-2988"} - }; - - auto value1=SerializeValue(fields1); - - db->Put(WriteOptions(), key1, value1); - - std::string value_ret; - FieldArray res1; - - db->Get(ReadOptions(), key1, &value_ret); - DeserializeValue(value_ret, &res1); - for(auto pr:res1){ - std::cout<Delete(WriteOptions(),key1); - - std::cout<<"get serialized value done"< keys; - std::vector target_keys; - for(int i=0;i<10000;i++){ - std::string key=std::to_string(rand()%10000)+"_"+std::to_string(i);//random for generate nonincreasing keys - FieldArray fields={ - {"name", key}, - {"address", std::to_string(rand()%7)}, - {"phone", std::to_string(rand()%114514)} - }; - if(rand()%5==0){ - fields[0].second="special_key"; - target_keys.push_back(key); - } - keys.push_back(key); - db->Put(WriteOptions(),key,SerializeValue(fields)); - } - std::sort(target_keys.begin(),target_keys.end()); - std::vector key_res; - Get_keys_by_field(db,ReadOptions(),{"name", "special_key"},&key_res); - ASSERT_TRUE(CompareKey(key_res, target_keys)); - std::cout<<"get key by field done"<Delete(WriteOptions(),s); - } - delete db; -} - -TEST(Test, LARGE_DATA_COMPACT_TEST) { - DB *db; - WriteOptions writeOptions; - ReadOptions readOptions; - if(OpenDB("testdb_for_XOY_large", &db).ok() == false) { + if(OpenDB("testdb_for_XOY_search", &db).ok() == false) { std::cerr << "open db failed" << std::endl; abort(); } std::vector values; - for(int i=0;i<500000;i++){ + for(int i=0;i<5000;i++){ std::string key=std::to_string(i); + while(key.size()<4){ + key='0'+key; + } std::string value; for(int j=0;j<5000;j++){ value+=std::to_string(i); } values.push_back(value); - db->Put(writeOptions,key,value); - } - for(int i=0;i<500000;i++){ - std::string key=std::to_string(i); - std::string value; - Status s=db->Get(readOptions,key,&value); + Status s=db->Put(writeOptions,key,value); assert(s.ok()); - if(values[i]!=value){ - std::cout<NewIterator(readOptions); + iter->SeekToFirst(); + for(int i=0;i<5000;i++){ + assert(iter->Valid()); + auto value=iter->value(); + if(value!=values[i]){ + std::cout<Next(); + } + assert(!iter->Valid()); + iter->SeekToLast(); + for(int i=4999;i>=0;i--){ + assert(iter->Valid()); + auto value=iter->value(); + if(value!=values[i]){ + std::cout<Prev(); + } + assert(!iter->Valid()); + iter->Seek("4990"); + for(int i=4990;i<5000;i++){ + assert(iter->Valid()); + auto value=iter->value(); + if(value!=values[i]){ + std::cout<Next(); } + assert(!iter->Valid()); + delete iter; delete db; } -TEST(Test, Garbage_Collect_TEST) { +// TEST(Test, CheckGetFields) { +// DB *db; +// WriteOptions writeOptions; +// ReadOptions readOptions; +// if(OpenDB("testdb_for_XOY", &db).ok() == false) { +// std::cerr << "open db failed" << std::endl; +// abort(); +// } +// std::string key1 = "k_1"; + +// FieldArray fields1 = { +// {"name", "Customer#000000001"}, +// {"address", "IVhzIApeRb"}, +// {"phone", "25-989-741-2988"} +// }; + +// auto value1=SerializeValue(fields1); + +// db->Put(WriteOptions(), key1, value1); + +// std::string value_ret; +// FieldArray res1; + +// db->Get(ReadOptions(), key1, &value_ret); +// DeserializeValue(value_ret, &res1); +// for(auto pr:res1){ +// std::cout<Delete(WriteOptions(),key1); + +// std::cout<<"get serialized value done"< keys; +// std::vector target_keys; +// for(int i=0;i<10000;i++){ +// std::string key=std::to_string(rand()%10000)+"_"+std::to_string(i);//random for generate nonincreasing keys +// FieldArray fields={ +// {"name", key}, +// {"address", std::to_string(rand()%7)}, +// {"phone", std::to_string(rand()%114514)} +// }; +// if(rand()%5==0){ +// fields[0].second="special_key"; +// target_keys.push_back(key); +// } +// keys.push_back(key); +// db->Put(WriteOptions(),key,SerializeValue(fields)); +// } +// std::sort(target_keys.begin(),target_keys.end()); +// std::vector key_res; +// Get_keys_by_field(db,ReadOptions(),{"name", "special_key"},&key_res); +// ASSERT_TRUE(CompareKey(key_res, target_keys)); +// std::cout<<"get key by field done"<Delete(WriteOptions(),s); +// } +// delete db; +// } + +TEST(Test, LARGE_DATA_COMPACT_TEST) { DB *db; WriteOptions writeOptions; ReadOptions readOptions; @@ -132,18 +159,13 @@ TEST(Test, Garbage_Collect_TEST) { for(int i=0;i<5000;i++){ std::string key=std::to_string(i); std::string value; - for(int j=0;j<1000;j++){ + for(int j=0;j<5000;j++){ value+=std::to_string(i); } values.push_back(value); db->Put(writeOptions,key,value); } - std::cout<<"start gc"<TEST_GarbageCollect(); - std::cout<<"finish gc"<Get(readOptions,key,&value); @@ -157,6 +179,43 @@ TEST(Test, Garbage_Collect_TEST) { delete db; } +// TEST(Test, Garbage_Collect_TEST) { +// DB *db; +// WriteOptions writeOptions; +// ReadOptions readOptions; +// if(OpenDB("testdb_for_XOY_large", &db).ok() == false) { +// std::cerr << "open db failed" << std::endl; +// abort(); +// } +// std::vector values; +// for(int i=0;i<5000;i++){ +// std::string key=std::to_string(i); +// std::string value; +// for(int j=0;j<1000;j++){ +// value+=std::to_string(i); +// } +// values.push_back(value); +// db->Put(writeOptions,key,value); +// } +// std::cout<<"start gc"<TEST_GarbageCollect(); +// std::cout<<"finish gc"<Get(readOptions,key,&value); +// assert(s.ok()); +// if(values[i]!=value){ +// std::cout<