From f9577338e5f2d181300e3753ba15d218e8da7fce Mon Sep 17 00:00:00 2001 From: dgy Date: Fri, 13 Dec 2024 07:48:41 +0000 Subject: [PATCH] prefetch update, range search speed up to 600MB/S --- db/db_impl.cc | 21 ++- db/prefetch_iter.cc | 368 +++++++++++++++++++--------------------------------- db/prefetch_iter.h | 2 +- 3 files changed, 156 insertions(+), 235 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index 0d3dcd6..ab747ba 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1262,14 +1262,29 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key, Iterator* DBImpl::NewIterator(const ReadOptions& options) { SequenceNumber latest_snapshot; uint32_t seed; + int iter_num=24; + std::vector iters(iter_num, nullptr); + for(int i=0;i(options.snapshot) + ->sequence_number() + : latest_snapshot), + seed); + iters[i]=db_iter; + } + Iterator* iter = NewInternalIterator(options, &latest_snapshot, &seed); auto db_iter=NewDBIterator(this, user_comparator(), iter, - (options.snapshot != nullptr + (options.snapshot != nullptr ? static_cast(options.snapshot) ->sequence_number() : latest_snapshot), - seed); - return NewPreFetchIterator(this,db_iter); + seed); + + + return NewPreFetchIterator(this,db_iter,iters,iter_num); } void DBImpl::RecordReadSample(Slice key) { diff --git a/db/prefetch_iter.cc b/db/prefetch_iter.cc index ac811fc..6c868ab 100644 --- a/db/prefetch_iter.cc +++ b/db/prefetch_iter.cc @@ -37,21 +37,37 @@ class DBPreFetchIter : public Iterator { // just before all entries whose user key == this->key(). enum IterPos {Left,Mid,Right}; - DBPreFetchIter(DBImpl* db, Iterator* iter) + DBPreFetchIter(DBImpl* db, Iterator* iter, std::vector prefetch_iters,int prefetch_num) : - db_(db),iter_(iter) {} + db_(db),iter_(iter),prefetch_iters_(prefetch_iters),prefetch_num_(prefetch_num) {} DBPreFetchIter(const DBPreFetchIter&) = delete; DBPreFetchIter& operator=(const DBPreFetchIter&) = delete; - ~DBPreFetchIter() override { delete iter_; } + ~DBPreFetchIter() override { + if(prefetch_thread.joinable()){ + stop_flag.store(true); + prefetch_thread.join(); + for(auto iter:prefetch_iters_){ + delete iter; + } + } + std::cout<<"fetch:"<Valid(); } Slice key() const override { return iter_->key(); } Slice value() const override { - if(current_valid_)return current_value_; - else assert(0); + if(cur_pos>0&&cur_pos<1000000&&prefetched_array[cur_pos].load()){ + fetched_++; + return prefetch_array[cur_pos]; + } + else{ + unfetched_++; + return GetAndParseTrueValue(iter_); + } } Status status() const override { return iter_->status(); @@ -64,9 +80,10 @@ class DBPreFetchIter : public Iterator { void SeekToLast() override; private: - Slice GetAndParseTrueValue(){ - Slice tmp_value=iter_->value(); + Slice GetAndParseTrueValue(Iterator* iter)const{ + Slice tmp_value=iter->value(); Slice key; + if(tmp_value.size()==0)return tmp_value; if(tmp_value.data()[0]==0x00){ tmp_value.remove_prefix(1); return tmp_value; @@ -81,257 +98,146 @@ class DBPreFetchIter : public Iterator { return tmp_value; } - void prefetch_left(){ - prefetch_mutex.AssertHeld(); - left_value_=GetAndParseTrueValue(); - left_valid_=true; - prefetch_mutex.Unlock(); - } - - void prefetch_right(){ - prefetch_mutex.AssertHeld(); - right_value_=GetAndParseTrueValue(); - right_valid_=true; - prefetch_mutex.Unlock(); - } - - port::Mutex prefetch_mutex; - DBImpl* db_; - Iterator* const iter_; - Slice left_value_; - Slice current_value_; - Slice right_value_; - bool left_valid_=false; - bool current_valid_=false; - bool right_valid_=false; - IterPos iter_pos_; -}; -void DBPreFetchIter::Next() { -prefetch_mutex.Lock(); -prefetch_mutex.Unlock(); -assert(iter_->Valid()); - if(iter_pos_==IterPos::Left){ - - iter_->Next(); - assert(current_valid_); - left_value_=current_value_; - left_valid_=true; - - iter_->Next(); - if(!iter_->Valid()){ - iter_pos_=IterPos::Mid; - current_valid_=false; - return; - } - assert(right_valid_); - current_value_=right_value_; - current_valid_=true; - - iter_->Next(); - if(!iter_->Valid()){ - //back to last - iter_->SeekToLast(); - assert(iter_->Valid()); - iter_pos_=IterPos::Mid; - right_valid_=false; - return; + void PreFetchThreadForward(){ + std::thread prefetch_threads[prefetch_num_]; + for(int i=0;iValid())prefetch_iter_->Next(); + } + while(prefetch_iter_->Valid()){ + Slice key_=prefetch_iter_->key(); + std::string str_key=std::string(key_.data(),key_.size()); + Slice val=GetAndParseTrueValue(prefetch_iter_); + prefetch_array[pos]=val; + prefetched_array[pos].store(true); + if(stop_flag.load()||pos>1000000){ + break; + } + for(int j=0;jValid())prefetch_iter_->Next(); + } + pos+=prefetch_num_+1; + } + }); } - prefetch_mutex.Lock(); - std::thread([this]() { - prefetch_right(); - }).detach(); - iter_pos_=IterPos::Right; - } - - else if(iter_pos_==IterPos::Mid){ - assert(current_valid_); - left_value_=current_value_; - left_valid_=true; - - iter_->Next(); - if(!iter_->Valid()){ - iter_pos_=IterPos::Mid; - current_valid_=false; - return; + for (auto& thread : prefetch_threads) { + if (thread.joinable()) { + thread.join(); + } } - if(right_valid_)current_value_=right_value_; - else current_value_=GetAndParseTrueValue(); - current_valid_=true; + - iter_->Next(); - if(!iter_->Valid()){ - //back to last - iter_->SeekToLast(); - assert(iter_->Valid()); - iter_pos_=IterPos::Mid; - right_valid_=false; - return; - } - prefetch_mutex.Lock(); - std::thread([this]() { - prefetch_right(); - }).detach(); - iter_pos_=IterPos::Right; } - else if(iter_pos_==IterPos::Right){ - assert(current_valid_); - left_value_=current_value_; - left_valid_=true; - - assert(right_valid_); - current_value_=right_value_; - current_valid_=true; + void PreFetchThreadBackward(){ + std::thread prefetch_threads[prefetch_num_]; + for(int i=0;iValid())prefetch_iter_->Prev(); + } + while(prefetch_iter_->Valid()){ + Slice key_=prefetch_iter_->key(); + std::string str_key=std::string(key_.data(),key_.size()); + Slice val=GetAndParseTrueValue(prefetch_iter_); + if(stop_flag.load()||pos<0){ + break; + } + prefetch_array[pos]=val; + prefetched_array[pos].store(true); + for(int j=0;jValid())prefetch_iter_->Prev(); + } + pos-=prefetch_num_+1; + } + }); + } - iter_->Next(); - if(!iter_->Valid()){ - //back to last - iter_->SeekToLast(); - assert(iter_->Valid()); - iter_pos_=IterPos::Mid; - right_valid_=false; - return; + for (auto& thread : prefetch_threads) { + if (thread.joinable()) { + thread.join(); + } } - prefetch_mutex.Lock(); - std::thread([this]() { - prefetch_right(); - }).detach(); - iter_pos_=IterPos::Right; } + + DBImpl* db_; + Iterator* const iter_; + std::vector const prefetch_iters_; + int prefetch_num_; + IterPos iter_pos_; + std::atomic stop_flag; + Slice prefetch_array[1000005]; + std::atomic prefetched_array[1000005]; + std::thread prefetch_thread; + int cur_pos=0; + mutable int fetched_=0; + mutable int unfetched_=0; +}; +void DBPreFetchIter::Next() { + iter_->Next();cur_pos++; } void DBPreFetchIter::Prev() { -prefetch_mutex.Lock(); -prefetch_mutex.Unlock(); -assert(iter_->Valid()); - if(iter_pos_==IterPos::Left){ - assert(current_valid_); - right_value_=current_value_; - right_valid_=true; - - assert(left_valid_); - current_value_=left_value_; - current_valid_=true; - - iter_->Prev(); - if(!iter_->Valid()){ - //back to first - iter_->SeekToFirst(); - assert(iter_->Valid()); - iter_pos_=IterPos::Mid; - left_valid_=false; - return; - } - - prefetch_mutex.Lock(); - std::thread([this]() { - prefetch_left(); - }).detach(); - iter_pos_=IterPos::Left; - } - - else if(iter_pos_==IterPos::Mid){ - assert(current_valid_); - right_value_=current_value_; - right_valid_=true; - - iter_->Prev(); - if(!iter_->Valid()){ - iter_pos_=IterPos::Mid; - current_valid_=false; - return; - } - if(left_valid_)current_value_=left_value_; - else current_value_=GetAndParseTrueValue(); - current_valid_=true; - - iter_->Prev(); - if(!iter_->Valid()){ - //back to first - iter_->SeekToFirst(); - assert(iter_->Valid()); - iter_pos_=IterPos::Mid; - left_valid_=false; - return; - } - prefetch_mutex.Lock(); - std::thread([this]() { - prefetch_left(); - }).detach(); - iter_pos_=IterPos::Left; - } - - else if(iter_pos_==IterPos::Right){ - iter_->Prev(); - assert(current_valid_); - right_value_=current_value_; - - iter_->Prev(); - if(!iter_->Valid()){ - iter_pos_=IterPos::Mid; - current_valid_=false; - return; - } - current_valid_=true; - assert(left_valid_); - current_value_=left_value_; - - iter_->Prev(); - if(!iter_->Valid()){ - //back to first - iter_->SeekToFirst(); - assert(iter_->Valid()); - iter_pos_=IterPos::Mid; - left_valid_=false; - return; - } - prefetch_mutex.Lock(); - std::thread([this]() { - prefetch_left(); - }).detach(); - iter_pos_=IterPos::Left; - } + iter_->Prev();cur_pos--; } void DBPreFetchIter::Seek(const Slice& target) { iter_->Seek(target); - left_valid_=false; - right_valid_=false; - current_valid_=false; - iter_pos_=IterPos::Mid; - if(iter_->Valid()){ - current_value_=GetAndParseTrueValue(); - current_valid_=true; + cur_pos=0; + for(auto prefetch_iter_:prefetch_iters_) + prefetch_iter_->Seek(target); + if(prefetch_thread.joinable()){ + stop_flag.store(true); + prefetch_thread.join(); + for(int i=0;i<1000000;i++)prefetched_array[i]=false; + stop_flag=false; } + prefetch_thread=std::thread([this]() { + PreFetchThreadForward(); + }); } void DBPreFetchIter::SeekToFirst() { iter_->SeekToFirst(); - left_valid_=false; - right_valid_=false; - iter_pos_=IterPos::Mid; - if(iter_->Valid()){ - current_valid_=true; - current_value_=GetAndParseTrueValue(); + cur_pos=0; + for(auto prefetch_iter_:prefetch_iters_) + prefetch_iter_->SeekToFirst(); + if(prefetch_thread.joinable()){ + stop_flag.store(true); + prefetch_thread.join(); + for(int i=0;i<1000000;i++)prefetched_array[i]=false; + stop_flag=false; + } + prefetch_thread=std::thread([this]() { + PreFetchThreadForward(); + }); } - else current_valid_=false; - -} void DBPreFetchIter::SeekToLast() { iter_->SeekToLast(); - left_valid_=false; - right_valid_=false; - iter_pos_=IterPos::Mid; - if(iter_->Valid()){ - current_valid_=true; - current_value_=GetAndParseTrueValue(); + cur_pos=1000000; + for(auto prefetch_iter_:prefetch_iters_) + prefetch_iter_->SeekToLast(); + if(prefetch_thread.joinable()){ + stop_flag.store(true); + prefetch_thread.join(); + for(int i=0;i<1000000;i++)prefetched_array[i]=false; + stop_flag=false; } - else current_valid_=false; + prefetch_thread=std::thread([this]() { + PreFetchThreadBackward(); + }); } } // anonymous namespace -Iterator* NewPreFetchIterator(DBImpl* db,Iterator* db_iter) { - return new DBPreFetchIter(db,db_iter); +Iterator* NewPreFetchIterator(DBImpl* db,Iterator* db_iter,std::vector prefetch_iters,int prefetch_num) { + return new DBPreFetchIter(db,db_iter,prefetch_iters,prefetch_num); } } // namespace leveldb diff --git a/db/prefetch_iter.h b/db/prefetch_iter.h index 74ca618..74e8aec 100644 --- a/db/prefetch_iter.h +++ b/db/prefetch_iter.h @@ -15,7 +15,7 @@ namespace leveldb { class DBImpl; // add a prefetch function for db_iter -Iterator* NewPreFetchIterator(DBImpl* db,Iterator* db_iter); +Iterator* NewPreFetchIterator(DBImpl* db,Iterator* db_iter,std::vector prefetch_iter,int prefetch_num); } // namespace leveldb