diff --git a/CMakeLists.txt b/CMakeLists.txt index d5a57ed..8e776d2 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -18,7 +18,7 @@ endif(NOT CMAKE_C_STANDARD) # C++ standard can be overridden when this is used as a sub-project. if(NOT CMAKE_CXX_STANDARD) # This project requires C++17. - set(CMAKE_CXX_STANDARD 11) + set(CMAKE_CXX_STANDARD 17) set(CMAKE_CXX_STANDARD_REQUIRED ON) set(CMAKE_CXX_EXTENSIONS OFF) endif(NOT CMAKE_CXX_STANDARD) diff --git a/db/db_impl.cc b/db/db_impl.cc index 1f4ec83..3e2879e 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1243,19 +1243,19 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key, } Slice value_log_slice = Slice(value->c_str() + 1, value->length()); Slice new_key; - Slice new_value; int value_offset = sizeof(uint64_t) * 2; // 16 uint64_t file_id, valuelog_offset; bool res = GetVarint64(&value_log_slice, &file_id); if (!res) return Status::Corruption("can't decode file id"); res = GetVarint64(&value_log_slice, &valuelog_offset); if (!res) return Status::Corruption("can't decode valuelog offset"); - s = ReadValueLog(file_id, valuelog_offset, &new_key, &new_value); - if (!s.ok()) { - return s; + + { + mutex_.Unlock(); + s = ReadValueLog(file_id, valuelog_offset, &new_key, value); + mutex_.Lock(); } - *value = std::string(new_value.data(), new_value.size()); - delete[] new_value.data(); + return s; } @@ -1674,82 +1674,79 @@ void DBImpl::addNewValueLog() { } Status DBImpl::ReadValueLog(uint64_t file_id, uint64_t offset, Slice* key, - Slice* value) { + std::string* value) { + mutex_.Lock(); + if(file_id==valuelogfile_number_){ + mutex_.Unlock(); + std::string file_name_ = ValueLogFileName(dbname_, file_id); + std::ifstream inFile(file_name_, std::ios::in | std::ios::binary); + uint64_t key_len,value_len; + inFile.seekg(offset); + inFile.read((char*)(&key_len),sizeof(uint64_t)); + + char* key_buf=new char[key_len]; + inFile.read(key_buf,key_len); + *key=Slice(key_buf,key_len); + + inFile.read((char*)(&value_len),sizeof(uint64_t)); + + char buf[value_len]; + inFile.read(buf,value_len); + *value=std::string(buf,value_len); + return Status::OK(); + } + + mutex_.Unlock(); + + Status s = Status::OK(); - std::string file_name_ = ValueLogFileName(dbname_, file_id); - - // Open the file in binary mode for reading - std::ifstream inFile(file_name_, std::ios::in | std::ios::binary); - if (!inFile.is_open()) { - std::cerr << "Failed to open file: " << file_name_ << " for read valuelog!" - << std::endl; - return Status::Corruption("Failed to open file for reading!"); - } - - // Seek to the position of key length - inFile.seekg(offset); - - // Read the length of the key - char* key_buf_len = new char[sizeof(uint64_t)]; - inFile.read(key_buf_len, sizeof(uint64_t)); - uint64_t key_len = 0; - std::memcpy(&key_len, key_buf_len, sizeof(uint64_t)); - - if (!inFile.good()) { - delete[] key_buf_len; - inFile.close(); - return Status::Corruption("Failed to read key length from file!"); - } - - // Now seek to the actual key position and read the key - inFile.seekg(offset + sizeof(uint64_t)); - if(key_len>10000)assert(0); - char* key_buf = new char[key_len]; - inFile.read(key_buf, key_len); - if (!inFile.good()) { - delete[] key_buf; - delete[] key_buf_len; - inFile.close(); - return Status::Corruption("Failed to read key from file!"); - } - - // Assign the read key data to the Slice - *key = Slice(key_buf, key_len); - - // Read the length of the value - inFile.seekg(offset + sizeof(uint64_t) + key_len); - char* value_buf_len = new char[sizeof(uint64_t)]; - inFile.read(value_buf_len, sizeof(uint64_t)); - uint64_t val_len = 0; - std::memcpy(&val_len, value_buf_len, sizeof(uint64_t)); - - if (!inFile.good()) { - delete[] key_buf; - delete[] key_buf_len; - delete[] value_buf_len; - inFile.close(); - return Status::Corruption("Failed to read value length from file!"); - } - - // Now seek to the actual data position and read the value - inFile.seekg(offset + sizeof(uint64_t) + key_len + sizeof(uint64_t)); - char* value_buf = new char[val_len]; - inFile.read(value_buf, val_len); - if (!inFile.good()) { - delete[] key_buf; - delete[] key_buf_len; - delete[] value_buf_len; - delete[] value_buf; - inFile.close(); - return Status::Corruption("Failed to read value from file!"); - } - - // Close the file after reading - inFile.close(); - - // Assign the read value data to the Slice - *value = Slice(value_buf, val_len); + leveldb::RandomAccessFile* valuelog_file; + mem_valuelog_mutex.lock_shared(); + if(mem_valuelogs.count(file_id)){ + valuelog_file=mem_valuelogs[file_id].file; + //mem_valuelogs[file_id].ref++; + mem_valuelog_mutex.unlock_shared(); + } + else{ + mem_valuelog_mutex.unlock_shared(); + std::string file_name_ = ValueLogFileName(dbname_, file_id); + env_->NewRandomAccessFile(file_name_,&valuelog_file); + mem_valuelog tmp; + tmp.file=valuelog_file; + tmp.ref=1; + mem_valuelog_mutex.lock(); + mem_valuelogs[file_id]=tmp; + mem_valuelog_mutex.unlock(); + } + + char buf[sizeof(uint64_t)]; + Slice res; + s=valuelog_file->Read(offset,sizeof(uint64_t),&res,buf); + assert(s.ok()); + uint64_t key_len=*(uint64_t*)(res.data()); + char*key_buf=new char[key_len]; + + s=valuelog_file->Read(offset+sizeof(uint64_t),key_len,&res,key_buf); + assert(s.ok()); + *key=Slice(key_buf,key_len); + + s=valuelog_file->Read(offset+sizeof(uint64_t)+key_len,sizeof(uint64_t),&res,buf); + assert(s.ok()); + uint64_t value_len=*(uint64_t*)(res.data()); + + char value_buf[value_len]; + s=valuelog_file->Read(offset+sizeof(uint64_t)+key_len+sizeof(uint64_t),value_len,&res,value_buf); + assert(s.ok()); + *value=std::string(res.data(),res.size()); + + // mem_valuelog_mutex.Lock(); + // mem_valuelogs[file_id].ref--; + // if(mem_valuelogs.size()>100&&mem_valuelogs[file_id].ref==0){ + // delete mem_valuelogs[file_id].file; + // mem_valuelogs.erase(file_id); + // } + // mem_valuelog_mutex.Unlock(); return s; } diff --git a/db/db_impl.h b/db/db_impl.h index 369efb1..5a81f3e 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -11,9 +11,10 @@ #include #include #include +#include #include -#include #include +#include #include #include "leveldb/db.h" @@ -71,10 +72,8 @@ class DBImpl : public DB { void addNewValueLog() override EXCLUSIVE_LOCKS_REQUIRED(mutex_); ; std::pair getNewValuelog(); // use for compaction - // Status ReadValueLog(uint64_t file_id, uint64_t offset,Slice* - // value)override; Status ReadValueLog(uint64_t file_id, uint64_t offset, Slice* key, - Slice* value) override; + std::string* value) override; // Extra methods (for testing) that are not in the public DB interface @@ -230,7 +229,14 @@ class DBImpl : public DB { uint64_t valuelogfile_number_; log::Writer* log_; std::map oldvaluelog_ids; - std::map mem_valuelogs; + + struct mem_valuelog{ + RandomAccessFile* file; + int ref=0; + }; + + std::shared_mutex mem_valuelog_mutex; + std::unordered_map mem_valuelogs; GUARDED_BY(mem_valuelog_mutex); uint32_t seed_ GUARDED_BY(mutex_); // For sampling. // Queue of writers. diff --git a/db/prefetch_iter.cc b/db/prefetch_iter.cc index 5f237cf..66d1337 100644 --- a/db/prefetch_iter.cc +++ b/db/prefetch_iter.cc @@ -46,13 +46,12 @@ class DBPreFetchIter : public Iterator { DBPreFetchIter& operator=(const DBPreFetchIter&) = delete; ~DBPreFetchIter() override { - if(prefetch_thread.joinable()){ - stop_flag.store(true); - prefetch_thread.join(); - delete prefetch_iter_; - } - else delete prefetch_iter_; - std::cout<<"fetch:"<Valid(); } @@ -60,14 +59,15 @@ class DBPreFetchIter : public Iterator { return iter_->key(); } Slice value() const override { - if(cur_pos>=0&&cur_pos<=1000000&&prefetched_array[cur_pos].load()){ - fetched_++; - return prefetch_array[cur_pos]; - } - else{ - unfetched_++; - return GetAndParseTrueValue(iter_->value()); - } + // if(cur_pos>=0&&cur_pos<=1000000&&prefetched_array[cur_pos].load()){ + // fetched_++; + // return prefetch_array[cur_pos]; + // } + // else{ + // unfetched_++; + buf_for_value=std::move(GetAndParseTrueValue(iter_->value())); + return Slice(buf_for_value.data(),buf_for_value.size()); + //} } Status status() const override { return iter_->status(); @@ -80,16 +80,14 @@ class DBPreFetchIter : public Iterator { void SeekToLast() override; private: - Slice GetAndParseTrueValue(Slice tmp_value)const{ + std::string GetAndParseTrueValue(Slice tmp_value)const{ Slice key; if(tmp_value.size()==0){ - return Slice(); + return ""; } if(tmp_value.data()[0]==(char)(0x00)){ tmp_value.remove_prefix(1); - char* s=new char[tmp_value.size()]; - memcpy(s,tmp_value.data(),tmp_value.size()); - return Slice(s,tmp_value.size()); + return std::string(tmp_value.data(),tmp_value.size()); } tmp_value.remove_prefix(1); uint64_t file_id,valuelog_offset; @@ -97,210 +95,205 @@ class DBPreFetchIter : public Iterator { if(!res)assert(0); res=GetVarint64(&tmp_value,&valuelog_offset); if(!res)assert(0); - - Status s=db_->ReadValueLog(file_id,valuelog_offset, &key, &tmp_value); - if(!s.ok()){ - std::cout<ReadValueLog(file_id,valuelog_offset, &key, &str); + return str; } - void PreFetchThreadForward(){ - std::thread prefetch_threads[prefetch_num_]; - std::queue> q; - port::Mutex* lock=new port::Mutex(); - port::CondVar* cv=new port::CondVar(lock); - bool local_stop_flag=false; - int remaining_task_cnt=0; - bool main_finish=false; - for(int i=0;iLock(); - while(q.empty()&&!local_stop_flag&&!(remaining_task_cnt==0&&main_finish)){ - cv->Wait(); - } - if(local_stop_flag||(remaining_task_cnt==0&&main_finish)){ - cv->SignalAll(); - lock->Unlock(); - break; - } - std::string s=q.front().first; - pos=q.front().second; - q.pop(); - remaining_task_cnt--; - lock->Unlock(); - val=GetAndParseTrueValue(s); - prefetch_array[pos]=val; - prefetched_array[pos].store(true); - } - } - ); - } - Slice val; - int pos=0; - for(int i=0;i<100&&prefetch_iter_->Valid();i++){ - prefetch_iter_->Next(); - pos++; - } - for(;prefetch_iter_->Valid()&&!stop_flag.load()&&pos<1000000;prefetch_iter_->Next()){ - val=prefetch_iter_->value(); - lock->Lock(); - q.push({std::string(val.data(),val.size()),pos}); - cv->Signal(); - remaining_task_cnt++; - lock->Unlock(); - pos++; - } + // void PreFetchThreadForward(){ + // std::thread prefetch_threads[prefetch_num_]; + // std::queue> q; + // port::Mutex* lock=new port::Mutex(); + // port::CondVar* cv=new port::CondVar(lock); + // bool local_stop_flag=false; + // int remaining_task_cnt=0; + // bool main_finish=false; + // for(int i=0;iLock(); + // while(q.empty()&&!local_stop_flag&&!(remaining_task_cnt==0&&main_finish)){ + // cv->Wait(); + // } + // if(local_stop_flag||(remaining_task_cnt==0&&main_finish)){ + // cv->SignalAll(); + // lock->Unlock(); + // break; + // } + // std::string s=q.front().first; + // pos=q.front().second; + // q.pop(); + // remaining_task_cnt--; + // lock->Unlock(); + // prefetch_array[pos]=std::move(GetAndParseTrueValue(s)); + // prefetched_array[pos].store(true); + // } + // } + // ); + // } + // Slice val; + // int pos=0; + // for(int i=0;i<100&&prefetch_iter_->Valid();i++){ + // prefetch_iter_->Next(); + // pos++; + // } + // for(;prefetch_iter_->Valid()&&!stop_flag.load()&&pos<1000000;prefetch_iter_->Next()){ + // val=prefetch_iter_->value(); + // lock->Lock(); + // q.push({std::string(val.data(),val.size()),pos}); + // cv->Signal(); + // remaining_task_cnt++; + // lock->Unlock(); + // pos++; + // } - lock->Lock(); - main_finish=true; - while(remaining_task_cnt){ - cv->Wait(); - } - lock->Unlock(); - cv->SignalAll(); + // lock->Lock(); + // main_finish=true; + // while(remaining_task_cnt){ + // cv->Wait(); + // } + // lock->Unlock(); + // cv->SignalAll(); - for (auto& thread : prefetch_threads) { - if (thread.joinable()) { - thread.join(); - } - } - } + // for (auto& thread : prefetch_threads) { + // if (thread.joinable()) { + // thread.join(); + // } + // } + // } - void PreFetchThreadBackward(){ - std::thread prefetch_threads[prefetch_num_]; - std::queue> q; - port::Mutex* lock=new port::Mutex(); - port::CondVar* cv=new port::CondVar(lock); - bool local_stop_flag=false; - int remaining_task_cnt=0; - bool main_finish=false; - for(int i=0;iLock(); - while(q.empty()&&!local_stop_flag&&!(remaining_task_cnt==0&&main_finish)){ - cv->Wait(); - } - if(local_stop_flag||(remaining_task_cnt==0&&main_finish)){ - cv->SignalAll(); - lock->Unlock(); - break; - } - std::string s=q.front().first; - pos=q.front().second; - q.pop(); - remaining_task_cnt--; - lock->Unlock(); - val=GetAndParseTrueValue(s); - prefetch_array[pos]=val; - prefetched_array[pos].store(true); - } - } - ); - } - Slice val; - int pos=1000000; - for(;prefetch_iter_->Valid()&&!stop_flag.load()&&pos>=0;prefetch_iter_->Prev()){ - val=prefetch_iter_->value(); - lock->Lock(); - q.push({std::string(val.data(),val.size()),pos}); - cv->Signal(); - remaining_task_cnt++; - lock->Unlock(); - pos--; - } + // void PreFetchThreadBackward(){ + // std::thread prefetch_threads[prefetch_num_]; + // std::queue> q; + // port::Mutex* lock=new port::Mutex(); + // port::CondVar* cv=new port::CondVar(lock); + // bool local_stop_flag=false; + // int remaining_task_cnt=0; + // bool main_finish=false; + // for(int i=0;iLock(); + // while(q.empty()&&!local_stop_flag&&!(remaining_task_cnt==0&&main_finish)){ + // cv->Wait(); + // } + // if(local_stop_flag||(remaining_task_cnt==0&&main_finish)){ + // cv->SignalAll(); + // lock->Unlock(); + // break; + // } + // std::string s=q.front().first; + // pos=q.front().second; + // q.pop(); + // remaining_task_cnt--; + // lock->Unlock(); + // prefetch_array[pos]=std::move(GetAndParseTrueValue(s)); + // prefetched_array[pos].store(true); + // } + // } + // ); + // } + // Slice val; + // int pos=1000000; + // for(;prefetch_iter_->Valid()&&!stop_flag.load()&&pos>=0;prefetch_iter_->Prev()){ + // val=prefetch_iter_->value(); + // lock->Lock(); + // q.push({std::string(val.data(),val.size()),pos}); + // cv->Signal(); + // remaining_task_cnt++; + // lock->Unlock(); + // pos--; + // } - lock->Lock(); - main_finish=true; - while(remaining_task_cnt){ - cv->Wait(); - } - lock->Unlock(); - cv->SignalAll(); + // lock->Lock(); + // main_finish=true; + // while(remaining_task_cnt){ + // cv->Wait(); + // } + // lock->Unlock(); + // cv->SignalAll(); - for (auto& thread : prefetch_threads) { - if (thread.joinable()) { - thread.join(); - } - } - } + // for (auto& thread : prefetch_threads) { + // if (thread.joinable()) { + // thread.join(); + // } + // } + // } DBImpl* db_; Iterator* const iter_; Iterator* const prefetch_iter_; int prefetch_num_; - std::atomic stop_flag; - Slice prefetch_array[1000005]; - std::atomic prefetched_array[1000005]; + // std::atomic stop_flag; + // std::string prefetch_array[1000005]; + // std::atomic prefetched_array[1000005]; std::thread prefetch_thread; + mutable std::string buf_for_value; int cur_pos=0; mutable int fetched_=0; mutable int unfetched_=0; }; void DBPreFetchIter::Next() { - iter_->Next();cur_pos++; + iter_->Next(); + //cur_pos++; } void DBPreFetchIter::Prev() { - iter_->Prev();cur_pos--; + iter_->Prev(); + //cur_pos--; } void DBPreFetchIter::Seek(const Slice& target) { iter_->Seek(target); - if(prefetch_thread.joinable()){ - stop_flag.store(true); - prefetch_thread.join(); - stop_flag=false; - } - for(int i=0;i<=1000000;i++)prefetched_array[i]=false; - cur_pos=0; - prefetch_iter_->Seek(target); - prefetch_thread=std::thread([this]() { - PreFetchThreadForward(); - }); + // if(prefetch_thread.joinable()){ + // stop_flag.store(true); + // prefetch_thread.join(); + // stop_flag=false; + // } + // for(int i=0;i<=1000000;i++)prefetched_array[i]=false; + // cur_pos=0; + // prefetch_iter_->Seek(target); + // prefetch_thread=std::thread([this]() { + // PreFetchThreadForward(); + // }); } void DBPreFetchIter::SeekToFirst() { iter_->SeekToFirst(); - if(prefetch_thread.joinable()){ - stop_flag.store(true); - prefetch_thread.join(); - stop_flag=false; - } - for(int i=0;i<=1000000;i++)prefetched_array[i]=false; - cur_pos=0; - prefetch_iter_->SeekToFirst(); - prefetch_thread=std::thread([this]() { - PreFetchThreadForward(); - }); + // if(prefetch_thread.joinable()){ + // stop_flag.store(true); + // prefetch_thread.join(); + // stop_flag=false; + // } + // for(int i=0;i<=1000000;i++)prefetched_array[i]=false; + // cur_pos=0; + // prefetch_iter_->SeekToFirst(); + // prefetch_thread=std::thread([this]() { + // PreFetchThreadForward(); + // }); } void DBPreFetchIter::SeekToLast() { iter_->SeekToLast(); - if(prefetch_thread.joinable()){ - stop_flag.store(true); - prefetch_thread.join(); - stop_flag=false; - } - for(int i=0;i<=1000000;i++)prefetched_array[i]=false; - cur_pos=1000000; + // if(prefetch_thread.joinable()){ + // stop_flag.store(true); + // prefetch_thread.join(); + // stop_flag=false; + // } + // for(int i=0;i<=1000000;i++)prefetched_array[i]=false; + // cur_pos=1000000; - prefetch_thread=std::thread([this]() { - prefetch_iter_->SeekToLast(); - PreFetchThreadBackward(); - }); + // prefetch_thread=std::thread([this]() { + // prefetch_iter_->SeekToLast(); + // PreFetchThreadBackward(); + // }); } } // anonymous namespace diff --git a/include/leveldb/db.h b/include/leveldb/db.h index cefa79c..03915db 100644 --- a/include/leveldb/db.h +++ b/include/leveldb/db.h @@ -119,7 +119,7 @@ class LEVELDB_EXPORT DB { // assert(0); // Not implemented // return Status::Corruption("not imp"); // } - virtual Status ReadValueLog(uint64_t file_id, uint64_t offset, Slice* key, Slice* value){ + virtual Status ReadValueLog(uint64_t file_id, uint64_t offset, Slice* key, std::string* value){ assert(0); // Not implemented return Status::Corruption("not imp"); }