diff --git a/db/db_impl.cc b/db/db_impl.cc index ab747ba..1f4ec83 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1263,19 +1263,17 @@ Iterator* DBImpl::NewIterator(const ReadOptions& options) { SequenceNumber latest_snapshot; uint32_t seed; int iter_num=24; - std::vector iters(iter_num, nullptr); - for(int i=0;i(options.snapshot) - ->sequence_number() - : latest_snapshot), - seed); - iters[i]=db_iter; - } - - Iterator* iter = NewInternalIterator(options, &latest_snapshot, &seed); + Iterator* iter_prefetch = NewInternalIterator(options, &latest_snapshot, &seed); + auto db_iter_prefetch=NewDBIterator(this, user_comparator(), iter_prefetch, + (options.snapshot != nullptr + ? static_cast(options.snapshot) + ->sequence_number() + : latest_snapshot), + seed); + + SequenceNumber useless_snapshot; + + Iterator* iter = NewInternalIterator(options, &useless_snapshot, &seed); auto db_iter=NewDBIterator(this, user_comparator(), iter, (options.snapshot != nullptr ? static_cast(options.snapshot) @@ -1284,7 +1282,7 @@ Iterator* DBImpl::NewIterator(const ReadOptions& options) { seed); - return NewPreFetchIterator(this,db_iter,iters,iter_num); + return NewPreFetchIterator(this,db_iter,db_iter_prefetch,iter_num); } void DBImpl::RecordReadSample(Slice key) { @@ -1705,6 +1703,7 @@ Status DBImpl::ReadValueLog(uint64_t file_id, uint64_t offset, Slice* key, // Now seek to the actual key position and read the key inFile.seekg(offset + sizeof(uint64_t)); + if(key_len>10000)assert(0); char* key_buf = new char[key_len]; inFile.read(key_buf, key_len); if (!inFile.good()) { diff --git a/db/db_impl.h b/db/db_impl.h index a997387..369efb1 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -230,6 +230,7 @@ class DBImpl : public DB { uint64_t valuelogfile_number_; log::Writer* log_; std::map oldvaluelog_ids; + std::map mem_valuelogs; uint32_t seed_ GUARDED_BY(mutex_); // For sampling. // Queue of writers. diff --git a/db/dbformat.h b/db/dbformat.h index 97cd1bb..2466307 100644 --- a/db/dbformat.h +++ b/db/dbformat.h @@ -45,7 +45,9 @@ static const int kMaxMemCompactLevel = 2; static const int kReadBytesPeriod = 1048576; // maximum size of value_log file -static const int value_log_size=4<<24; +static const int value_log_size=64<<20; + +static const int mem_value_log_number=(2<<30)/(value_log_size); } // namespace config diff --git a/db/prefetch_iter.cc b/db/prefetch_iter.cc index 6c868ab..5f237cf 100644 --- a/db/prefetch_iter.cc +++ b/db/prefetch_iter.cc @@ -4,6 +4,7 @@ #include #include #include +#include #include "db/prefetch_iter.h" #include "db/db_impl.h" @@ -37,9 +38,9 @@ class DBPreFetchIter : public Iterator { // just before all entries whose user key == this->key(). enum IterPos {Left,Mid,Right}; - DBPreFetchIter(DBImpl* db, Iterator* iter, std::vector prefetch_iters,int prefetch_num) + DBPreFetchIter(DBImpl* db, Iterator* iter, Iterator* prefetch_iter,int prefetch_num) : - db_(db),iter_(iter),prefetch_iters_(prefetch_iters),prefetch_num_(prefetch_num) {} + db_(db),iter_(iter),prefetch_iter_(prefetch_iter),prefetch_num_(prefetch_num) {} DBPreFetchIter(const DBPreFetchIter&) = delete; DBPreFetchIter& operator=(const DBPreFetchIter&) = delete; @@ -48,10 +49,9 @@ class DBPreFetchIter : public Iterator { if(prefetch_thread.joinable()){ stop_flag.store(true); prefetch_thread.join(); - for(auto iter:prefetch_iters_){ - delete iter; - } + delete prefetch_iter_; } + else delete prefetch_iter_; std::cout<<"fetch:"<key(); } Slice value() const override { - if(cur_pos>0&&cur_pos<1000000&&prefetched_array[cur_pos].load()){ + if(cur_pos>=0&&cur_pos<=1000000&&prefetched_array[cur_pos].load()){ fetched_++; return prefetch_array[cur_pos]; } else{ unfetched_++; - return GetAndParseTrueValue(iter_); + return GetAndParseTrueValue(iter_->value()); } } Status status() const override { @@ -80,86 +80,153 @@ class DBPreFetchIter : public Iterator { void SeekToLast() override; private: - Slice GetAndParseTrueValue(Iterator* iter)const{ - Slice tmp_value=iter->value(); + Slice GetAndParseTrueValue(Slice tmp_value)const{ Slice key; - if(tmp_value.size()==0)return tmp_value; - if(tmp_value.data()[0]==0x00){ + if(tmp_value.size()==0){ + return Slice(); + } + if(tmp_value.data()[0]==(char)(0x00)){ tmp_value.remove_prefix(1); - return tmp_value; + char* s=new char[tmp_value.size()]; + memcpy(s,tmp_value.data(),tmp_value.size()); + return Slice(s,tmp_value.size()); } tmp_value.remove_prefix(1); - uint64_t file_id,valuelog_offset,valuelog_len; + uint64_t file_id,valuelog_offset; bool res=GetVarint64(&tmp_value,&file_id); if(!res)assert(0); res=GetVarint64(&tmp_value,&valuelog_offset); if(!res)assert(0); - db_->ReadValueLog(file_id,valuelog_offset, &key, &tmp_value); + + Status s=db_->ReadValueLog(file_id,valuelog_offset, &key, &tmp_value); + if(!s.ok()){ + std::cout<> q; + port::Mutex* lock=new port::Mutex(); + port::CondVar* cv=new port::CondVar(lock); + bool local_stop_flag=false; + int remaining_task_cnt=0; + bool main_finish=false; for(int i=0;iValid())prefetch_iter_->Next(); - } - while(prefetch_iter_->Valid()){ - Slice key_=prefetch_iter_->key(); - std::string str_key=std::string(key_.data(),key_.size()); - Slice val=GetAndParseTrueValue(prefetch_iter_); - prefetch_array[pos]=val; - prefetched_array[pos].store(true); - if(stop_flag.load()||pos>1000000){ - break; - } - for(int j=0;jValid())prefetch_iter_->Next(); + prefetch_threads[i]=std::thread([this,&q,&lock,&cv,&local_stop_flag,&remaining_task_cnt,&main_finish]() + { + Slice val; + int pos; + while(1){ + lock->Lock(); + while(q.empty()&&!local_stop_flag&&!(remaining_task_cnt==0&&main_finish)){ + cv->Wait(); + } + if(local_stop_flag||(remaining_task_cnt==0&&main_finish)){ + cv->SignalAll(); + lock->Unlock(); + break; + } + std::string s=q.front().first; + pos=q.front().second; + q.pop(); + remaining_task_cnt--; + lock->Unlock(); + val=GetAndParseTrueValue(s); + prefetch_array[pos]=val; + prefetched_array[pos].store(true); } - pos+=prefetch_num_+1; } - }); + ); + } + Slice val; + int pos=0; + for(int i=0;i<100&&prefetch_iter_->Valid();i++){ + prefetch_iter_->Next(); + pos++; } + for(;prefetch_iter_->Valid()&&!stop_flag.load()&&pos<1000000;prefetch_iter_->Next()){ + val=prefetch_iter_->value(); + lock->Lock(); + q.push({std::string(val.data(),val.size()),pos}); + cv->Signal(); + remaining_task_cnt++; + lock->Unlock(); + pos++; + } + + lock->Lock(); + main_finish=true; + while(remaining_task_cnt){ + cv->Wait(); + } + lock->Unlock(); + cv->SignalAll(); for (auto& thread : prefetch_threads) { if (thread.joinable()) { thread.join(); } } - - } void PreFetchThreadBackward(){ std::thread prefetch_threads[prefetch_num_]; + std::queue> q; + port::Mutex* lock=new port::Mutex(); + port::CondVar* cv=new port::CondVar(lock); + bool local_stop_flag=false; + int remaining_task_cnt=0; + bool main_finish=false; for(int i=0;iValid())prefetch_iter_->Prev(); - } - while(prefetch_iter_->Valid()){ - Slice key_=prefetch_iter_->key(); - std::string str_key=std::string(key_.data(),key_.size()); - Slice val=GetAndParseTrueValue(prefetch_iter_); - if(stop_flag.load()||pos<0){ - break; - } - prefetch_array[pos]=val; - prefetched_array[pos].store(true); - for(int j=0;jValid())prefetch_iter_->Prev(); + prefetch_threads[i]=std::thread([this,&q,&lock,&cv,&local_stop_flag,&remaining_task_cnt,&main_finish]() + { + Slice val; + int pos; + while(1){ + lock->Lock(); + while(q.empty()&&!local_stop_flag&&!(remaining_task_cnt==0&&main_finish)){ + cv->Wait(); + } + if(local_stop_flag||(remaining_task_cnt==0&&main_finish)){ + cv->SignalAll(); + lock->Unlock(); + break; + } + std::string s=q.front().first; + pos=q.front().second; + q.pop(); + remaining_task_cnt--; + lock->Unlock(); + val=GetAndParseTrueValue(s); + prefetch_array[pos]=val; + prefetched_array[pos].store(true); } - pos-=prefetch_num_+1; } - }); + ); } + Slice val; + int pos=1000000; + for(;prefetch_iter_->Valid()&&!stop_flag.load()&&pos>=0;prefetch_iter_->Prev()){ + val=prefetch_iter_->value(); + lock->Lock(); + q.push({std::string(val.data(),val.size()),pos}); + cv->Signal(); + remaining_task_cnt++; + lock->Unlock(); + pos--; + } + + lock->Lock(); + main_finish=true; + while(remaining_task_cnt){ + cv->Wait(); + } + lock->Unlock(); + cv->SignalAll(); for (auto& thread : prefetch_threads) { if (thread.joinable()) { @@ -171,9 +238,8 @@ class DBPreFetchIter : public Iterator { DBImpl* db_; Iterator* const iter_; - std::vector const prefetch_iters_; + Iterator* const prefetch_iter_; int prefetch_num_; - IterPos iter_pos_; std::atomic stop_flag; Slice prefetch_array[1000005]; std::atomic prefetched_array[1000005]; @@ -191,15 +257,15 @@ void DBPreFetchIter::Prev() { void DBPreFetchIter::Seek(const Slice& target) { iter_->Seek(target); - cur_pos=0; - for(auto prefetch_iter_:prefetch_iters_) - prefetch_iter_->Seek(target); + if(prefetch_thread.joinable()){ stop_flag.store(true); prefetch_thread.join(); - for(int i=0;i<1000000;i++)prefetched_array[i]=false; stop_flag=false; } + for(int i=0;i<=1000000;i++)prefetched_array[i]=false; + cur_pos=0; + prefetch_iter_->Seek(target); prefetch_thread=std::thread([this]() { PreFetchThreadForward(); }); @@ -207,37 +273,38 @@ void DBPreFetchIter::Seek(const Slice& target) { void DBPreFetchIter::SeekToFirst() { iter_->SeekToFirst(); - cur_pos=0; - for(auto prefetch_iter_:prefetch_iters_) - prefetch_iter_->SeekToFirst(); + if(prefetch_thread.joinable()){ stop_flag.store(true); prefetch_thread.join(); - for(int i=0;i<1000000;i++)prefetched_array[i]=false; stop_flag=false; } + for(int i=0;i<=1000000;i++)prefetched_array[i]=false; + cur_pos=0; + prefetch_iter_->SeekToFirst(); prefetch_thread=std::thread([this]() { PreFetchThreadForward(); }); } void DBPreFetchIter::SeekToLast() { iter_->SeekToLast(); - cur_pos=1000000; - for(auto prefetch_iter_:prefetch_iters_) - prefetch_iter_->SeekToLast(); + if(prefetch_thread.joinable()){ stop_flag.store(true); prefetch_thread.join(); - for(int i=0;i<1000000;i++)prefetched_array[i]=false; stop_flag=false; } + for(int i=0;i<=1000000;i++)prefetched_array[i]=false; + cur_pos=1000000; + prefetch_thread=std::thread([this]() { + prefetch_iter_->SeekToLast(); PreFetchThreadBackward(); }); } } // anonymous namespace -Iterator* NewPreFetchIterator(DBImpl* db,Iterator* db_iter,std::vector prefetch_iters,int prefetch_num) { - return new DBPreFetchIter(db,db_iter,prefetch_iters,prefetch_num); +Iterator* NewPreFetchIterator(DBImpl* db,Iterator* db_iter,Iterator* prefetch_iter,int prefetch_num) { + return new DBPreFetchIter(db,db_iter,prefetch_iter,prefetch_num); } } // namespace leveldb diff --git a/db/prefetch_iter.h b/db/prefetch_iter.h index 74e8aec..133a773 100644 --- a/db/prefetch_iter.h +++ b/db/prefetch_iter.h @@ -15,7 +15,7 @@ namespace leveldb { class DBImpl; // add a prefetch function for db_iter -Iterator* NewPreFetchIterator(DBImpl* db,Iterator* db_iter,std::vector prefetch_iter,int prefetch_num); +Iterator* NewPreFetchIterator(DBImpl* db,Iterator* db_iter, Iterator* prefetch_iter,int prefetch_num); } // namespace leveldb diff --git a/test/test.cpp b/test/test.cpp index 08ec745..675fd46 100644 --- a/test/test.cpp +++ b/test/test.cpp @@ -43,9 +43,6 @@ TEST(Test, checkIterator) { for(int i=0;i<5000;i++){ assert(iter->Valid()); auto value=iter->value(); - if(value!=values[i]){ - std::cout<Next(); } @@ -54,9 +51,6 @@ TEST(Test, checkIterator) { for(int i=4999;i>=0;i--){ assert(iter->Valid()); auto value=iter->value(); - if(value!=values[i]){ - std::cout<Prev(); } @@ -65,9 +59,6 @@ TEST(Test, checkIterator) { for(int i=4990;i<5000;i++){ assert(iter->Valid()); auto value=iter->value(); - if(value!=values[i]){ - std::cout<Next(); } @@ -76,76 +67,76 @@ TEST(Test, checkIterator) { delete db; } -// TEST(Test, CheckGetFields) { -// DB *db; -// WriteOptions writeOptions; -// ReadOptions readOptions; -// if(OpenDB("testdb_for_XOY", &db).ok() == false) { -// std::cerr << "open db failed" << std::endl; -// abort(); -// } -// std::string key1 = "k_1"; +TEST(Test, CheckGetFields) { + DB *db; + WriteOptions writeOptions; + ReadOptions readOptions; + if(OpenDB("testdb_for_XOY", &db).ok() == false) { + std::cerr << "open db failed" << std::endl; + abort(); + } + std::string key1 = "k_1"; -// FieldArray fields1 = { -// {"name", "Customer#000000001"}, -// {"address", "IVhzIApeRb"}, -// {"phone", "25-989-741-2988"} -// }; + FieldArray fields1 = { + {"name", "Customer#000000001"}, + {"address", "IVhzIApeRb"}, + {"phone", "25-989-741-2988"} + }; -// auto value1=SerializeValue(fields1); + auto value1=SerializeValue(fields1); -// db->Put(WriteOptions(), key1, value1); + db->Put(WriteOptions(), key1, value1); -// std::string value_ret; -// FieldArray res1; + std::string value_ret; + FieldArray res1; -// db->Get(ReadOptions(), key1, &value_ret); -// DeserializeValue(value_ret, &res1); -// for(auto pr:res1){ -// std::cout<Get(ReadOptions(), key1, &value_ret); + DeserializeValue(value_ret, &res1); + for(auto pr:res1){ + std::cout<Delete(WriteOptions(),key1); + db->Delete(WriteOptions(),key1); -// std::cout<<"get serialized value done"< keys; -// std::vector target_keys; -// for(int i=0;i<10000;i++){ -// std::string key=std::to_string(rand()%10000)+"_"+std::to_string(i);//random for generate nonincreasing keys -// FieldArray fields={ -// {"name", key}, -// {"address", std::to_string(rand()%7)}, -// {"phone", std::to_string(rand()%114514)} -// }; -// if(rand()%5==0){ -// fields[0].second="special_key"; -// target_keys.push_back(key); -// } -// keys.push_back(key); -// db->Put(WriteOptions(),key,SerializeValue(fields)); -// } -// std::sort(target_keys.begin(),target_keys.end()); -// std::vector key_res; -// Get_keys_by_field(db,ReadOptions(),{"name", "special_key"},&key_res); -// ASSERT_TRUE(CompareKey(key_res, target_keys)); -// std::cout<<"get key by field done"<Delete(WriteOptions(),s); -// } -// delete db; -// } + std::cout<<"get serialized value done"< keys; + std::vector target_keys; + for(int i=0;i<10000;i++){ + std::string key=std::to_string(rand()%10000)+"_"+std::to_string(i);//random for generate nonincreasing keys + FieldArray fields={ + {"name", key}, + {"address", std::to_string(rand()%7)}, + {"phone", std::to_string(rand()%114514)} + }; + if(rand()%5==0){ + fields[0].second="special_key"; + target_keys.push_back(key); + } + keys.push_back(key); + db->Put(WriteOptions(),key,SerializeValue(fields)); + } + std::sort(target_keys.begin(),target_keys.end()); + std::vector key_res; + Get_keys_by_field(db,ReadOptions(),{"name", "special_key"},&key_res); + ASSERT_TRUE(CompareKey(key_res, target_keys)); + std::cout<<"get key by field done"<Delete(WriteOptions(),s); + } + delete db; +} TEST(Test, LARGE_DATA_COMPACT_TEST) { DB *db; @@ -179,42 +170,42 @@ TEST(Test, LARGE_DATA_COMPACT_TEST) { delete db; } -// TEST(Test, Garbage_Collect_TEST) { -// DB *db; -// WriteOptions writeOptions; -// ReadOptions readOptions; -// if(OpenDB("testdb_for_XOY_large", &db).ok() == false) { -// std::cerr << "open db failed" << std::endl; -// abort(); -// } -// std::vector values; -// for(int i=0;i<5000;i++){ -// std::string key=std::to_string(i); -// std::string value; -// for(int j=0;j<1000;j++){ -// value+=std::to_string(i); -// } -// values.push_back(value); -// db->Put(writeOptions,key,value); -// } -// std::cout<<"start gc"<TEST_GarbageCollect(); -// std::cout<<"finish gc"<Get(readOptions,key,&value); -// assert(s.ok()); -// if(values[i]!=value){ -// std::cout< values; + for(int i=0;i<5000;i++){ + std::string key=std::to_string(i); + std::string value; + for(int j=0;j<1000;j++){ + value+=std::to_string(i); + } + values.push_back(value); + db->Put(writeOptions,key,value); + } + std::cout<<"start gc"<TEST_GarbageCollect(); + std::cout<<"finish gc"<Get(readOptions,key,&value); + assert(s.ok()); + if(values[i]!=value){ + std::cout<