Browse Source

Disable prefetching and use a buffer pool instead; sequential read speed reaches up to 2000 MB/s

pull/3/head
dgy 9 months ago
parent
commit
ad1c12a33c
5 changed files with 270 additions and 274 deletions
  1. +1
    -1
      CMakeLists.txt
  2. +77
    -80
      db/db_impl.cc
  3. +11
    -5
      db/db_impl.h
  4. +180
    -187
      db/prefetch_iter.cc
  5. +1
    -1
      include/leveldb/db.h

+ 1
- 1
CMakeLists.txt View File

@ -18,7 +18,7 @@ endif(NOT CMAKE_C_STANDARD)
# C++ standard can be overridden when this is used as a sub-project.
if(NOT CMAKE_CXX_STANDARD)
# This project requires C++17.
set(CMAKE_CXX_STANDARD 11)
set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
set(CMAKE_CXX_EXTENSIONS OFF)
endif(NOT CMAKE_CXX_STANDARD)

+ 77
- 80
db/db_impl.cc View File

@ -1243,19 +1243,19 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key,
}
Slice value_log_slice = Slice(value->c_str() + 1, value->length());
Slice new_key;
Slice new_value;
int value_offset = sizeof(uint64_t) * 2; // 16
uint64_t file_id, valuelog_offset;
bool res = GetVarint64(&value_log_slice, &file_id);
if (!res) return Status::Corruption("can't decode file id");
res = GetVarint64(&value_log_slice, &valuelog_offset);
if (!res) return Status::Corruption("can't decode valuelog offset");
s = ReadValueLog(file_id, valuelog_offset, &new_key, &new_value);
if (!s.ok()) {
return s;
{
mutex_.Unlock();
s = ReadValueLog(file_id, valuelog_offset, &new_key, value);
mutex_.Lock();
}
*value = std::string(new_value.data(), new_value.size());
delete[] new_value.data();
return s;
}
@ -1674,82 +1674,79 @@ void DBImpl::addNewValueLog() {
}
// Reads one key/value record from the value log file `file_id`, starting at
// byte `offset`. The record layout read here is
//   [uint64_t key_len][key bytes][uint64_t value_len][value bytes]
// with both lengths copied as raw bytes into a uint64_t.
// On return *key refers to a buffer allocated with new[] inside this function
// (it is not freed here, so the caller has to release it) and *value receives
// the value bytes.
// NOTE(review): this listing appears to show the Slice*-based and the
// std::string*-based variants of the function interleaved; the notes below
// are attached to the statements they refer to.
Status DBImpl::ReadValueLog(uint64_t file_id, uint64_t offset, Slice* key,
Slice* value) {
std::string* value) {
// mutex_ is held only while file_id is compared with valuelogfile_number_.
// Records in that file are read through a freshly opened std::ifstream.
mutex_.Lock();
if(file_id==valuelogfile_number_){
mutex_.Unlock();
std::string file_name_ = ValueLogFileName(dbname_, file_id);
// NOTE(review): nothing on this path checks that the stream opened or that
// the reads succeeded; after a failed read key_len / value_len are left
// uninitialized and are then used as allocation sizes.
std::ifstream inFile(file_name_, std::ios::in | std::ios::binary);
uint64_t key_len,value_len;
inFile.seekg(offset);
inFile.read((char*)(&key_len),sizeof(uint64_t));
// Ownership of key_buf leaves this function through *key.
char* key_buf=new char[key_len];
inFile.read(key_buf,key_len);
*key=Slice(key_buf,key_len);
inFile.read((char*)(&value_len),sizeof(uint64_t));
// NOTE(review): `char buf[value_len]` is a variable-length array. That is a
// compiler extension rather than standard C++, and it lives on the stack, so
// a large value_len can overflow the stack.
char buf[value_len];
inFile.read(buf,value_len);
*value=std::string(buf,value_len);
return Status::OK();
}
mutex_.Unlock();
Status s = Status::OK();
std::string file_name_ = ValueLogFileName(dbname_, file_id);
// Open the file in binary mode for reading
std::ifstream inFile(file_name_, std::ios::in | std::ios::binary);
if (!inFile.is_open()) {
std::cerr << "Failed to open file: " << file_name_ << " for read valuelog!"
<< std::endl;
return Status::Corruption("Failed to open file for reading!");
}
// Seek to the position of key length
inFile.seekg(offset);
// Read the length of the key
// NOTE(review): key_buf_len (and value_buf_len below) are heap-allocated
// 8-byte scratch buffers that are deleted only in the error branches; the
// success path of this ifstream-based code never frees them.
char* key_buf_len = new char[sizeof(uint64_t)];
inFile.read(key_buf_len, sizeof(uint64_t));
uint64_t key_len = 0;
std::memcpy(&key_len, key_buf_len, sizeof(uint64_t));
if (!inFile.good()) {
delete[] key_buf_len;
inFile.close();
return Status::Corruption("Failed to read key length from file!");
}
// Now seek to the actual key position and read the key
inFile.seekg(offset + sizeof(uint64_t));
// Sanity limit on the key length; assert() is compiled out when NDEBUG is
// defined, so release builds do not enforce it.
if(key_len>10000)assert(0);
char* key_buf = new char[key_len];
inFile.read(key_buf, key_len);
if (!inFile.good()) {
delete[] key_buf;
delete[] key_buf_len;
inFile.close();
return Status::Corruption("Failed to read key from file!");
}
// Assign the read key data to the Slice
*key = Slice(key_buf, key_len);
// Read the length of the value
inFile.seekg(offset + sizeof(uint64_t) + key_len);
char* value_buf_len = new char[sizeof(uint64_t)];
inFile.read(value_buf_len, sizeof(uint64_t));
uint64_t val_len = 0;
std::memcpy(&val_len, value_buf_len, sizeof(uint64_t));
if (!inFile.good()) {
delete[] key_buf;
delete[] key_buf_len;
delete[] value_buf_len;
inFile.close();
return Status::Corruption("Failed to read value length from file!");
}
// Now seek to the actual data position and read the value
inFile.seekg(offset + sizeof(uint64_t) + key_len + sizeof(uint64_t));
char* value_buf = new char[val_len];
inFile.read(value_buf, val_len);
if (!inFile.good()) {
delete[] key_buf;
delete[] key_buf_len;
delete[] value_buf_len;
delete[] value_buf;
inFile.close();
return Status::Corruption("Failed to read value from file!");
}
// Close the file after reading
inFile.close();
// Assign the read value data to the Slice
*value = Slice(value_buf, val_len);
// Look up a cached RandomAccessFile for this file id under the shared lock;
// on a miss, open the file and store the handle in mem_valuelogs.
leveldb::RandomAccessFile* valuelog_file;
mem_valuelog_mutex.lock_shared();
if(mem_valuelogs.count(file_id)){
valuelog_file=mem_valuelogs[file_id].file;
//mem_valuelogs[file_id].ref++;
mem_valuelog_mutex.unlock_shared();
}
else{
// NOTE(review): the shared lock is released before the exclusive lock is
// taken and the map is not re-checked, so two threads can both miss, both
// open the file, and the later assignment overwrites the earlier handle
// without deleting it.
// NOTE(review): the Status returned by NewRandomAccessFile is discarded and
// valuelog_file has no initializer, so a failed open is not detected here.
mem_valuelog_mutex.unlock_shared();
std::string file_name_ = ValueLogFileName(dbname_, file_id);
env_->NewRandomAccessFile(file_name_,&valuelog_file);
mem_valuelog tmp;
tmp.file=valuelog_file;
tmp.ref=1;
mem_valuelog_mutex.lock();
mem_valuelogs[file_id]=tmp;
mem_valuelog_mutex.unlock();
}
// Read the 8-byte key length.
// NOTE(review): every Read() below is checked only with assert(s.ok()), which
// is compiled out under NDEBUG; a failed read would then go on to use `res`.
char buf[sizeof(uint64_t)];
Slice res;
s=valuelog_file->Read(offset,sizeof(uint64_t),&res,buf);
assert(s.ok());
// NOTE(review): casting res.data() to uint64_t* assumes the bytes are
// suitably aligned; the ifstream-based code above uses std::memcpy for the
// same decode — confirm or switch to memcpy.
uint64_t key_len=*(uint64_t*)(res.data());
// Ownership of key_buf leaves this function through *key.
char*key_buf=new char[key_len];
s=valuelog_file->Read(offset+sizeof(uint64_t),key_len,&res,key_buf);
assert(s.ok());
*key=Slice(key_buf,key_len);
s=valuelog_file->Read(offset+sizeof(uint64_t)+key_len,sizeof(uint64_t),&res,buf);
assert(s.ok());
uint64_t value_len=*(uint64_t*)(res.data());
// NOTE(review): variable-length array on the stack, sized by a length read
// from the file; a large value_len can overflow the stack.
char value_buf[value_len];
s=valuelog_file->Read(offset+sizeof(uint64_t)+key_len+sizeof(uint64_t),value_len,&res,value_buf);
assert(s.ok());
// Copy the bytes referenced by res into the caller's string.
*value=std::string(res.data(),res.size());
// Disabled reference-counted cleanup of cached handles. With it commented
// out, this function never removes an entry from mem_valuelogs.
// mem_valuelog_mutex.Lock();
// mem_valuelogs[file_id].ref--;
// if(mem_valuelogs.size()>100&&mem_valuelogs[file_id].ref==0){
// delete mem_valuelogs[file_id].file;
// mem_valuelogs.erase(file_id);
// }
// mem_valuelog_mutex.Unlock();
return s;
}

+ 11
- 5
db/db_impl.h View File

@ -11,9 +11,10 @@
#include <atomic>
#include <deque>
#include <map>
#include <unordered_map>
#include <mutex>
#include <set>
#include <shared_mutex>
#include <set>
#include <string>
#include "leveldb/db.h"
@ -71,10 +72,8 @@ class DBImpl : public DB {
void addNewValueLog() override EXCLUSIVE_LOCKS_REQUIRED(mutex_);
;
std::pair<WritableFile*, uint64_t> getNewValuelog(); // use for compaction
// Status ReadValueLog(uint64_t file_id, uint64_t offset,Slice*
// value)override;
Status ReadValueLog(uint64_t file_id, uint64_t offset, Slice* key,
Slice* value) override;
std::string* value) override;
// Extra methods (for testing) that are not in the public DB interface
@ -230,7 +229,14 @@ class DBImpl : public DB {
uint64_t valuelogfile_number_;
log::Writer* log_;
std::map<uint64_t, uint64_t> oldvaluelog_ids;
std::map<uint64_t,RandomAccessFile*> mem_valuelogs;
// Cached handle for one value log file; instances are stored in
// mem_valuelogs keyed by the file id.
struct mem_valuelog{
// Handle used for reads. Initialized to null so that a default-constructed
// entry (for example one created by mem_valuelogs[id]) never carries an
// indeterminate pointer.
RandomAccessFile* file=nullptr;
// Reference count for the handle; starts at 0.
int ref=0;
};
std::shared_mutex mem_valuelog_mutex;
std::unordered_map<uint64_t,mem_valuelog> mem_valuelogs; GUARDED_BY(mem_valuelog_mutex);
uint32_t seed_ GUARDED_BY(mutex_); // For sampling.
// Queue of writers.

+ 180
- 187
db/prefetch_iter.cc View File

@ -46,13 +46,12 @@ class DBPreFetchIter : public Iterator {
DBPreFetchIter& operator=(const DBPreFetchIter&) = delete;
~DBPreFetchIter() override {
if(prefetch_thread.joinable()){
stop_flag.store(true);
prefetch_thread.join();
delete prefetch_iter_;
}
else delete prefetch_iter_;
std::cout<<"fetch:"<<fetched_<<" unfetch:"<<unfetched_<<"\n";
// if(prefetch_thread.joinable()){
// stop_flag.store(true);
// prefetch_thread.join();
// }
delete prefetch_iter_;
//std::cout<<"fetch:"<<fetched_<<" unfetch:"<<unfetched_<<"\n";
delete iter_;
}
// Forwards the validity query to the wrapped iterator iter_.
bool Valid() const override {
const bool wrapped_is_valid = iter_->Valid();
return wrapped_is_valid;
}
@ -60,14 +59,15 @@ class DBPreFetchIter : public Iterator {
return iter_->key();
}
Slice value() const override {
if(cur_pos>=0&&cur_pos<=1000000&&prefetched_array[cur_pos].load()){
fetched_++;
return prefetch_array[cur_pos];
}
else{
unfetched_++;
return GetAndParseTrueValue(iter_->value());
}
// if(cur_pos>=0&&cur_pos<=1000000&&prefetched_array[cur_pos].load()){
// fetched_++;
// return prefetch_array[cur_pos];
// }
// else{
// unfetched_++;
buf_for_value=std::move(GetAndParseTrueValue(iter_->value()));
return Slice(buf_for_value.data(),buf_for_value.size());
//}
}
Status status() const override {
return iter_->status();
@ -80,16 +80,14 @@ class DBPreFetchIter : public Iterator {
void SeekToLast() override;
private:
Slice GetAndParseTrueValue(Slice tmp_value)const{
std::string GetAndParseTrueValue(Slice tmp_value)const{
Slice key;
if(tmp_value.size()==0){
return Slice();
return "";
}
if(tmp_value.data()[0]==(char)(0x00)){
tmp_value.remove_prefix(1);
char* s=new char[tmp_value.size()];
memcpy(s,tmp_value.data(),tmp_value.size());
return Slice(s,tmp_value.size());
return std::string(tmp_value.data(),tmp_value.size());
}
tmp_value.remove_prefix(1);
uint64_t file_id,valuelog_offset;
@ -97,210 +95,205 @@ class DBPreFetchIter : public Iterator {
if(!res)assert(0);
res=GetVarint64(&tmp_value,&valuelog_offset);
if(!res)assert(0);
Status s=db_->ReadValueLog(file_id,valuelog_offset, &key, &tmp_value);
if(!s.ok()){
std::cout<<std::string(tmp_value.data(),tmp_value.size())<<std::endl;
assert(0);
}
return tmp_value;
std::string str;
Status s=db_->ReadValueLog(file_id,valuelog_offset, &key, &str);
return str;
}
void PreFetchThreadForward(){
std::thread prefetch_threads[prefetch_num_];
std::queue<std::pair<std::string,int>> q;
port::Mutex* lock=new port::Mutex();
port::CondVar* cv=new port::CondVar(lock);
bool local_stop_flag=false;
int remaining_task_cnt=0;
bool main_finish=false;
for(int i=0;i<prefetch_num_;i++){
prefetch_threads[i]=std::thread([this,&q,&lock,&cv,&local_stop_flag,&remaining_task_cnt,&main_finish]()
{
Slice val;
int pos;
while(1){
lock->Lock();
while(q.empty()&&!local_stop_flag&&!(remaining_task_cnt==0&&main_finish)){
cv->Wait();
}
if(local_stop_flag||(remaining_task_cnt==0&&main_finish)){
cv->SignalAll();
lock->Unlock();
break;
}
std::string s=q.front().first;
pos=q.front().second;
q.pop();
remaining_task_cnt--;
lock->Unlock();
val=GetAndParseTrueValue(s);
prefetch_array[pos]=val;
prefetched_array[pos].store(true);
}
}
);
}
Slice val;
int pos=0;
for(int i=0;i<100&&prefetch_iter_->Valid();i++){
prefetch_iter_->Next();
pos++;
}
for(;prefetch_iter_->Valid()&&!stop_flag.load()&&pos<1000000;prefetch_iter_->Next()){
val=prefetch_iter_->value();
lock->Lock();
q.push({std::string(val.data(),val.size()),pos});
cv->Signal();
remaining_task_cnt++;
lock->Unlock();
pos++;
}
// void PreFetchThreadForward(){
// std::thread prefetch_threads[prefetch_num_];
// std::queue<std::pair<std::string,int>> q;
// port::Mutex* lock=new port::Mutex();
// port::CondVar* cv=new port::CondVar(lock);
// bool local_stop_flag=false;
// int remaining_task_cnt=0;
// bool main_finish=false;
// for(int i=0;i<prefetch_num_;i++){
// prefetch_threads[i]=std::thread([this,&q,&lock,&cv,&local_stop_flag,&remaining_task_cnt,&main_finish]()
// {
// int pos;
// while(1){
// lock->Lock();
// while(q.empty()&&!local_stop_flag&&!(remaining_task_cnt==0&&main_finish)){
// cv->Wait();
// }
// if(local_stop_flag||(remaining_task_cnt==0&&main_finish)){
// cv->SignalAll();
// lock->Unlock();
// break;
// }
// std::string s=q.front().first;
// pos=q.front().second;
// q.pop();
// remaining_task_cnt--;
// lock->Unlock();
// prefetch_array[pos]=std::move(GetAndParseTrueValue(s));
// prefetched_array[pos].store(true);
// }
// }
// );
// }
// Slice val;
// int pos=0;
// for(int i=0;i<100&&prefetch_iter_->Valid();i++){
// prefetch_iter_->Next();
// pos++;
// }
// for(;prefetch_iter_->Valid()&&!stop_flag.load()&&pos<1000000;prefetch_iter_->Next()){
// val=prefetch_iter_->value();
// lock->Lock();
// q.push({std::string(val.data(),val.size()),pos});
// cv->Signal();
// remaining_task_cnt++;
// lock->Unlock();
// pos++;
// }
lock->Lock();
main_finish=true;
while(remaining_task_cnt){
cv->Wait();
}
lock->Unlock();
cv->SignalAll();
// lock->Lock();
// main_finish=true;
// while(remaining_task_cnt){
// cv->Wait();
// }
// lock->Unlock();
// cv->SignalAll();
for (auto& thread : prefetch_threads) {
if (thread.joinable()) {
thread.join();
}
}
}
// for (auto& thread : prefetch_threads) {
// if (thread.joinable()) {
// thread.join();
// }
// }
// }
void PreFetchThreadBackward(){
std::thread prefetch_threads[prefetch_num_];
std::queue<std::pair<std::string,int>> q;
port::Mutex* lock=new port::Mutex();
port::CondVar* cv=new port::CondVar(lock);
bool local_stop_flag=false;
int remaining_task_cnt=0;
bool main_finish=false;
for(int i=0;i<prefetch_num_;i++){
prefetch_threads[i]=std::thread([this,&q,&lock,&cv,&local_stop_flag,&remaining_task_cnt,&main_finish]()
{
Slice val;
int pos;
while(1){
lock->Lock();
while(q.empty()&&!local_stop_flag&&!(remaining_task_cnt==0&&main_finish)){
cv->Wait();
}
if(local_stop_flag||(remaining_task_cnt==0&&main_finish)){
cv->SignalAll();
lock->Unlock();
break;
}
std::string s=q.front().first;
pos=q.front().second;
q.pop();
remaining_task_cnt--;
lock->Unlock();
val=GetAndParseTrueValue(s);
prefetch_array[pos]=val;
prefetched_array[pos].store(true);
}
}
);
}
Slice val;
int pos=1000000;
for(;prefetch_iter_->Valid()&&!stop_flag.load()&&pos>=0;prefetch_iter_->Prev()){
val=prefetch_iter_->value();
lock->Lock();
q.push({std::string(val.data(),val.size()),pos});
cv->Signal();
remaining_task_cnt++;
lock->Unlock();
pos--;
}
// void PreFetchThreadBackward(){
// std::thread prefetch_threads[prefetch_num_];
// std::queue<std::pair<std::string,int>> q;
// port::Mutex* lock=new port::Mutex();
// port::CondVar* cv=new port::CondVar(lock);
// bool local_stop_flag=false;
// int remaining_task_cnt=0;
// bool main_finish=false;
// for(int i=0;i<prefetch_num_;i++){
// prefetch_threads[i]=std::thread([this,&q,&lock,&cv,&local_stop_flag,&remaining_task_cnt,&main_finish]()
// {
// int pos;
// while(1){
// lock->Lock();
// while(q.empty()&&!local_stop_flag&&!(remaining_task_cnt==0&&main_finish)){
// cv->Wait();
// }
// if(local_stop_flag||(remaining_task_cnt==0&&main_finish)){
// cv->SignalAll();
// lock->Unlock();
// break;
// }
// std::string s=q.front().first;
// pos=q.front().second;
// q.pop();
// remaining_task_cnt--;
// lock->Unlock();
// prefetch_array[pos]=std::move(GetAndParseTrueValue(s));
// prefetched_array[pos].store(true);
// }
// }
// );
// }
// Slice val;
// int pos=1000000;
// for(;prefetch_iter_->Valid()&&!stop_flag.load()&&pos>=0;prefetch_iter_->Prev()){
// val=prefetch_iter_->value();
// lock->Lock();
// q.push({std::string(val.data(),val.size()),pos});
// cv->Signal();
// remaining_task_cnt++;
// lock->Unlock();
// pos--;
// }
lock->Lock();
main_finish=true;
while(remaining_task_cnt){
cv->Wait();
}
lock->Unlock();
cv->SignalAll();
// lock->Lock();
// main_finish=true;
// while(remaining_task_cnt){
// cv->Wait();
// }
// lock->Unlock();
// cv->SignalAll();
for (auto& thread : prefetch_threads) {
if (thread.joinable()) {
thread.join();
}
}
}
// for (auto& thread : prefetch_threads) {
// if (thread.joinable()) {
// thread.join();
// }
// }
// }
DBImpl* db_;
Iterator* const iter_;
Iterator* const prefetch_iter_;
int prefetch_num_;
std::atomic<bool> stop_flag;
Slice prefetch_array[1000005];
std::atomic<bool> prefetched_array[1000005];
// std::atomic<bool> stop_flag;
// std::string prefetch_array[1000005];
// std::atomic<bool> prefetched_array[1000005];
std::thread prefetch_thread;
mutable std::string buf_for_value;
int cur_pos=0;
mutable int fetched_=0;
mutable int unfetched_=0;
};
void DBPreFetchIter::Next() {
iter_->Next();cur_pos++;
iter_->Next();
//cur_pos++;
}
void DBPreFetchIter::Prev() {
iter_->Prev();cur_pos--;
iter_->Prev();
//cur_pos--;
}
void DBPreFetchIter::Seek(const Slice& target) {
iter_->Seek(target);
if(prefetch_thread.joinable()){
stop_flag.store(true);
prefetch_thread.join();
stop_flag=false;
}
for(int i=0;i<=1000000;i++)prefetched_array[i]=false;
cur_pos=0;
prefetch_iter_->Seek(target);
prefetch_thread=std::thread([this]() {
PreFetchThreadForward();
});
// if(prefetch_thread.joinable()){
// stop_flag.store(true);
// prefetch_thread.join();
// stop_flag=false;
// }
// for(int i=0;i<=1000000;i++)prefetched_array[i]=false;
// cur_pos=0;
// prefetch_iter_->Seek(target);
// prefetch_thread=std::thread([this]() {
// PreFetchThreadForward();
// });
}
void DBPreFetchIter::SeekToFirst() {
iter_->SeekToFirst();
if(prefetch_thread.joinable()){
stop_flag.store(true);
prefetch_thread.join();
stop_flag=false;
}
for(int i=0;i<=1000000;i++)prefetched_array[i]=false;
cur_pos=0;
prefetch_iter_->SeekToFirst();
prefetch_thread=std::thread([this]() {
PreFetchThreadForward();
});
// if(prefetch_thread.joinable()){
// stop_flag.store(true);
// prefetch_thread.join();
// stop_flag=false;
// }
// for(int i=0;i<=1000000;i++)prefetched_array[i]=false;
// cur_pos=0;
// prefetch_iter_->SeekToFirst();
// prefetch_thread=std::thread([this]() {
// PreFetchThreadForward();
// });
}
void DBPreFetchIter::SeekToLast() {
iter_->SeekToLast();
if(prefetch_thread.joinable()){
stop_flag.store(true);
prefetch_thread.join();
stop_flag=false;
}
for(int i=0;i<=1000000;i++)prefetched_array[i]=false;
cur_pos=1000000;
// if(prefetch_thread.joinable()){
// stop_flag.store(true);
// prefetch_thread.join();
// stop_flag=false;
// }
// for(int i=0;i<=1000000;i++)prefetched_array[i]=false;
// cur_pos=1000000;
prefetch_thread=std::thread([this]() {
prefetch_iter_->SeekToLast();
PreFetchThreadBackward();
});
// prefetch_thread=std::thread([this]() {
// prefetch_iter_->SeekToLast();
// PreFetchThreadBackward();
// });
}
} // anonymous namespace

+ 1
- 1
include/leveldb/db.h View File

@ -119,7 +119,7 @@ class LEVELDB_EXPORT DB {
// assert(0); // Not implemented
// return Status::Corruption("not imp");
// }
virtual Status ReadValueLog(uint64_t file_id, uint64_t offset, Slice* key, Slice* value){
virtual Status ReadValueLog(uint64_t file_id, uint64_t offset, Slice* key, std::string* value){
assert(0); // Not implemented
return Status::Corruption("not imp");
}

Loading…
Cancel
Save