Browse Source

range prefetch(length=1) maybe finish

pull/3/head
小人鱼 9 months ago
parent
commit
8dafb3af8f
7 changed files with 519 additions and 119 deletions
  1. +2
    -0
      CMakeLists.txt
  2. +3
    -8
      db/db_impl.cc
  3. +2
    -17
      db/db_iter.cc
  4. +337
    -0
      db/prefetch_iter.cc
  5. +22
    -0
      db/prefetch_iter.h
  6. +1
    -1
      port/port.h
  7. +152
    -93
      test/test.cpp

+ 2
- 0
CMakeLists.txt View File

@ -129,6 +129,8 @@ target_sources(leveldb
"db/db_impl.h"
"db/db_iter.cc"
"db/db_iter.h"
"db/prefetch_iter.cc"
"db/prefetch_iter.h"
"db/dbformat.cc"
"db/dbformat.h"
"db/dumpfile.cc"

+ 3
- 8
db/db_impl.cc View File

@ -6,6 +6,7 @@
#include "db/builder.h"
#include "db/db_iter.h"
#include "db/prefetch_iter.h"
#include "db/dbformat.h"
#include "db/filename.h"
#include "db/log_reader.h"
@ -1262,12 +1263,13 @@ Iterator* DBImpl::NewIterator(const ReadOptions& options) {
SequenceNumber latest_snapshot;
uint32_t seed;
Iterator* iter = NewInternalIterator(options, &latest_snapshot, &seed);
return NewDBIterator(this, user_comparator(), iter,
auto db_iter=NewDBIterator(this, user_comparator(), iter,
(options.snapshot != nullptr
? static_cast<const SnapshotImpl*>(options.snapshot)
->sequence_number()
: latest_snapshot),
seed);
return NewPreFetchIterator(this,db_iter);
}
void DBImpl::RecordReadSample(Slice key) {
@ -1754,15 +1756,8 @@ void DBImpl::GarbageCollect() {
if(!versions_->checkOldValueLog(tmp_name))valuelog_set.emplace(filename);
}
}
//bool tmp_judge=false;//only clean one file
for (std::string valuelog_name : valuelog_set) {
Log(options_.info_log, ("gc processing: "+valuelog_name).data());
// if(tmp_judge){
// break;
// }
// else{
// tmp_judge=true;
// }
uint64_t cur_log_number = GetValueLogID(valuelog_name);
valuelog_name = ValueLogFileName(dbname_, cur_log_number);
if (cur_log_number == valuelogfile_number_) {

+ 2
- 17
db/db_iter.cc View File

@ -68,23 +68,8 @@ class DBIter : public Iterator {
}
Slice value() const override {
assert(valid_);
auto tmp_value= (direction_ == kForward) ? iter_->value() : saved_value_;
Slice key;
if(tmp_value.data()[0]==0x00){
tmp_value.remove_prefix(1);
return tmp_value;
}
tmp_value.remove_prefix(1);
uint64_t file_id,valuelog_offset,valuelog_len;
bool res=GetVarint64(&tmp_value,&file_id);
if(!res)assert(0);
res=GetVarint64(&tmp_value,&valuelog_offset);
if(!res)assert(0);
// res=GetVarint64(&tmp_value,&valuelog_len);
// if(!res)assert(0);
// db_->ReadValueLog(file_id,valuelog_offset,valuelog_len,&tmp_value);
db_->ReadValueLog(file_id,valuelog_offset, &key, &tmp_value);
return tmp_value;
Slice val = (direction_ == kForward) ? iter_->value() : saved_value_;
return val;
}
Status status() const override {
if (status_.ok()) {

+ 337
- 0
db/prefetch_iter.cc View File

@ -0,0 +1,337 @@
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include <iostream>
#include <fstream>
#include <thread>
#include "db/prefetch_iter.h"
#include "db/db_impl.h"
#include "db/dbformat.h"
#include "db/filename.h"
#include "leveldb/env.h"
#include "leveldb/iterator.h"
#include "port/port.h"
#include "util/logging.h"
#include "util/mutexlock.h"
#include "util/random.h"
#include "port/port.h"
namespace leveldb {
namespace {
// Memtables and sstables that make the DB representation contain
// (userkey,seq,type) => uservalue entries. DBPreFetchIter
// combines multiple entries for the same userkey found in the DB
// representation into a single entry while accounting for sequence
// numbers, deletion markers, overwrites, etc.
class DBPreFetchIter : public Iterator {
public:
// Which direction is the iterator currently moving?
// (1) When moving forward, the internal iterator is positioned at
// the exact entry that yields this->key(), this->value()
// (2) When moving backwards, the internal iterator is positioned
// just before all entries whose user key == this->key().
enum IterPos {Left,Mid,Right};
DBPreFetchIter(DBImpl* db, Iterator* iter)
:
db_(db),iter_(iter) {}
DBPreFetchIter(const DBPreFetchIter&) = delete;
DBPreFetchIter& operator=(const DBPreFetchIter&) = delete;
~DBPreFetchIter() override { delete iter_; }
bool Valid() const override { return iter_->Valid(); }
Slice key() const override {
return iter_->key();
}
Slice value() const override {
if(current_valid_)return current_value_;
else assert(0);
}
Status status() const override {
return iter_->status();
}
void Next() override;
void Prev() override;
void Seek(const Slice& target) override;
void SeekToFirst() override;
void SeekToLast() override;
private:
Slice GetAndParseTrueValue(){
Slice tmp_value=iter_->value();
Slice key;
if(tmp_value.data()[0]==0x00){
tmp_value.remove_prefix(1);
return tmp_value;
}
tmp_value.remove_prefix(1);
uint64_t file_id,valuelog_offset,valuelog_len;
bool res=GetVarint64(&tmp_value,&file_id);
if(!res)assert(0);
res=GetVarint64(&tmp_value,&valuelog_offset);
if(!res)assert(0);
db_->ReadValueLog(file_id,valuelog_offset, &key, &tmp_value);
return tmp_value;
}
void prefetch_left(){
prefetch_mutex.AssertHeld();
left_value_=GetAndParseTrueValue();
left_valid_=true;
prefetch_mutex.Unlock();
}
void prefetch_right(){
prefetch_mutex.AssertHeld();
right_value_=GetAndParseTrueValue();
right_valid_=true;
prefetch_mutex.Unlock();
}
port::Mutex prefetch_mutex;
DBImpl* db_;
Iterator* const iter_;
Slice left_value_;
Slice current_value_;
Slice right_value_;
bool left_valid_=false;
bool current_valid_=false;
bool right_valid_=false;
IterPos iter_pos_;
};
void DBPreFetchIter::Next() {
prefetch_mutex.Lock();
prefetch_mutex.Unlock();
assert(iter_->Valid());
if(iter_pos_==IterPos::Left){
iter_->Next();
assert(current_valid_);
left_value_=current_value_;
left_valid_=true;
iter_->Next();
if(!iter_->Valid()){
iter_pos_=IterPos::Mid;
current_valid_=false;
return;
}
assert(right_valid_);
current_value_=right_value_;
current_valid_=true;
iter_->Next();
if(!iter_->Valid()){
//back to last
iter_->SeekToLast();
assert(iter_->Valid());
iter_pos_=IterPos::Mid;
right_valid_=false;
return;
}
prefetch_mutex.Lock();
std::thread([this]() {
prefetch_right();
}).detach();
iter_pos_=IterPos::Right;
}
else if(iter_pos_==IterPos::Mid){
assert(current_valid_);
left_value_=current_value_;
left_valid_=true;
iter_->Next();
if(!iter_->Valid()){
iter_pos_=IterPos::Mid;
current_valid_=false;
return;
}
if(right_valid_)current_value_=right_value_;
else current_value_=GetAndParseTrueValue();
current_valid_=true;
iter_->Next();
if(!iter_->Valid()){
//back to last
iter_->SeekToLast();
assert(iter_->Valid());
iter_pos_=IterPos::Mid;
right_valid_=false;
return;
}
prefetch_mutex.Lock();
std::thread([this]() {
prefetch_right();
}).detach();
iter_pos_=IterPos::Right;
}
else if(iter_pos_==IterPos::Right){
assert(current_valid_);
left_value_=current_value_;
left_valid_=true;
assert(right_valid_);
current_value_=right_value_;
current_valid_=true;
iter_->Next();
if(!iter_->Valid()){
//back to last
iter_->SeekToLast();
assert(iter_->Valid());
iter_pos_=IterPos::Mid;
right_valid_=false;
return;
}
prefetch_mutex.Lock();
std::thread([this]() {
prefetch_right();
}).detach();
iter_pos_=IterPos::Right;
}
}
void DBPreFetchIter::Prev() {
prefetch_mutex.Lock();
prefetch_mutex.Unlock();
assert(iter_->Valid());
if(iter_pos_==IterPos::Left){
assert(current_valid_);
right_value_=current_value_;
right_valid_=true;
assert(left_valid_);
current_value_=left_value_;
current_valid_=true;
iter_->Prev();
if(!iter_->Valid()){
//back to first
iter_->SeekToFirst();
assert(iter_->Valid());
iter_pos_=IterPos::Mid;
left_valid_=false;
return;
}
prefetch_mutex.Lock();
std::thread([this]() {
prefetch_left();
}).detach();
iter_pos_=IterPos::Left;
}
else if(iter_pos_==IterPos::Mid){
assert(current_valid_);
right_value_=current_value_;
right_valid_=true;
iter_->Prev();
if(!iter_->Valid()){
iter_pos_=IterPos::Mid;
current_valid_=false;
return;
}
if(left_valid_)current_value_=left_value_;
else current_value_=GetAndParseTrueValue();
current_valid_=true;
iter_->Prev();
if(!iter_->Valid()){
//back to first
iter_->SeekToFirst();
assert(iter_->Valid());
iter_pos_=IterPos::Mid;
left_valid_=false;
return;
}
prefetch_mutex.Lock();
std::thread([this]() {
prefetch_left();
}).detach();
iter_pos_=IterPos::Left;
}
else if(iter_pos_==IterPos::Right){
iter_->Prev();
assert(current_valid_);
right_value_=current_value_;
iter_->Prev();
if(!iter_->Valid()){
iter_pos_=IterPos::Mid;
current_valid_=false;
return;
}
current_valid_=true;
assert(left_valid_);
current_value_=left_value_;
iter_->Prev();
if(!iter_->Valid()){
//back to first
iter_->SeekToFirst();
assert(iter_->Valid());
iter_pos_=IterPos::Mid;
left_valid_=false;
return;
}
prefetch_mutex.Lock();
std::thread([this]() {
prefetch_left();
}).detach();
iter_pos_=IterPos::Left;
}
}
void DBPreFetchIter::Seek(const Slice& target) {
iter_->Seek(target);
left_valid_=false;
right_valid_=false;
current_valid_=false;
iter_pos_=IterPos::Mid;
if(iter_->Valid()){
current_value_=GetAndParseTrueValue();
current_valid_=true;
}
}
void DBPreFetchIter::SeekToFirst() {
iter_->SeekToFirst();
left_valid_=false;
right_valid_=false;
iter_pos_=IterPos::Mid;
if(iter_->Valid()){
current_valid_=true;
current_value_=GetAndParseTrueValue();
}
else current_valid_=false;
}
void DBPreFetchIter::SeekToLast() {
iter_->SeekToLast();
left_valid_=false;
right_valid_=false;
iter_pos_=IterPos::Mid;
if(iter_->Valid()){
current_valid_=true;
current_value_=GetAndParseTrueValue();
}
else current_valid_=false;
}
} // anonymous namespace
Iterator* NewPreFetchIterator(DBImpl* db,Iterator* db_iter) {
return new DBPreFetchIter(db,db_iter);
}
} // namespace leveldb

+ 22
- 0
db/prefetch_iter.h View File

@ -0,0 +1,22 @@
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#ifndef STORAGE_LEVELDB_DB_PREFETCH_ITER_H_
#define STORAGE_LEVELDB_DB_PREFETCH_ITER_H_
#include <cstdint>
#include "db/dbformat.h"
#include "leveldb/db.h"
namespace leveldb {
class DBImpl;
// add a prefetch function for db_iter
Iterator* NewPreFetchIterator(DBImpl* db,Iterator* db_iter);
} // namespace leveldb
#endif // STORAGE_LEVELDB_DB_PREFETCH_ITER_H_

+ 1
- 1
port/port.h View File

@ -4,7 +4,7 @@
#ifndef STORAGE_LEVELDB_PORT_PORT_H_
#define STORAGE_LEVELDB_PORT_PORT_H_
//#define LEVELDB_PLATFORM_POSIX
#include <string.h>
// Include the appropriate platform specific file below. If you are

+ 152
- 93
test/test.cpp View File

@ -2,7 +2,7 @@
#include "leveldb/env.h"
#include "leveldb/db.h"
#include "util/coding.h"
#include <iostream>
using namespace leveldb;
using Field=std::pair<Slice,Slice>;
@ -16,111 +16,138 @@ Status OpenDB(std::string dbName, DB **db) {
return DB::Open(options, dbName, db);
}
TEST(Test, CheckGetFields) {
TEST(Test, checkIterator) {
DB *db;
WriteOptions writeOptions;
ReadOptions readOptions;
if(OpenDB("testdb_for_XOY", &db).ok() == false) {
std::cerr << "open db failed" << std::endl;
abort();
}
std::string key1 = "k_1";
FieldArray fields1 = {
{"name", "Customer#000000001"},
{"address", "IVhzIApeRb"},
{"phone", "25-989-741-2988"}
};
auto value1=SerializeValue(fields1);
db->Put(WriteOptions(), key1, value1);
std::string value_ret;
FieldArray res1;
db->Get(ReadOptions(), key1, &value_ret);
DeserializeValue(value_ret, &res1);
for(auto pr:res1){
std::cout<<std::string(pr.first.data(),pr.first.size())<<" "<<std::string(pr.second.data(),pr.second.size())<<"\n";
}
ASSERT_TRUE(CompareFieldArray(fields1, res1));
db->Delete(WriteOptions(),key1);
std::cout<<"get serialized value done"<<std::endl;
delete db;
}
TEST(Test, CheckSearchKey) {
DB *db;
ReadOptions readOptions;
if(OpenDB("testdb_for_XOY", &db).ok() == false) {
std::cerr << "open db failed" << std::endl;
abort();
}
std::vector<std::string> keys;
std::vector<std::string> target_keys;
for(int i=0;i<10000;i++){
std::string key=std::to_string(rand()%10000)+"_"+std::to_string(i);//random for generate nonincreasing keys
FieldArray fields={
{"name", key},
{"address", std::to_string(rand()%7)},
{"phone", std::to_string(rand()%114514)}
};
if(rand()%5==0){
fields[0].second="special_key";
target_keys.push_back(key);
}
keys.push_back(key);
db->Put(WriteOptions(),key,SerializeValue(fields));
}
std::sort(target_keys.begin(),target_keys.end());
std::vector<std::string> key_res;
Get_keys_by_field(db,ReadOptions(),{"name", "special_key"},&key_res);
ASSERT_TRUE(CompareKey(key_res, target_keys));
std::cout<<"get key by field done"<<std::endl;
for(auto s:keys){
db->Delete(WriteOptions(),s);
}
delete db;
}
TEST(Test, LARGE_DATA_COMPACT_TEST) {
DB *db;
WriteOptions writeOptions;
ReadOptions readOptions;
if(OpenDB("testdb_for_XOY_large", &db).ok() == false) {
if(OpenDB("testdb_for_XOY_search", &db).ok() == false) {
std::cerr << "open db failed" << std::endl;
abort();
}
std::vector<std::string> values;
for(int i=0;i<500000;i++){
for(int i=0;i<5000;i++){
std::string key=std::to_string(i);
while(key.size()<4){
key='0'+key;
}
std::string value;
for(int j=0;j<5000;j++){
value+=std::to_string(i);
}
values.push_back(value);
db->Put(writeOptions,key,value);
}
for(int i=0;i<500000;i++){
std::string key=std::to_string(i);
std::string value;
Status s=db->Get(readOptions,key,&value);
Status s=db->Put(writeOptions,key,value);
assert(s.ok());
if(values[i]!=value){
std::cout<<value.size()<<std::endl;
assert(0);
}
auto iter=db->NewIterator(readOptions);
iter->SeekToFirst();
for(int i=0;i<5000;i++){
assert(iter->Valid());
auto value=iter->value();
if(value!=values[i]){
std::cout<<std::string(value.data(),value.size())<<std::endl;
}
ASSERT_TRUE(values[i]==value);
assert(values[i]==value);
iter->Next();
}
assert(!iter->Valid());
iter->SeekToLast();
for(int i=4999;i>=0;i--){
assert(iter->Valid());
auto value=iter->value();
if(value!=values[i]){
std::cout<<std::string(value.data(),value.size())<<std::endl;
}
assert(values[i]==value);
iter->Prev();
}
assert(!iter->Valid());
iter->Seek("4990");
for(int i=4990;i<5000;i++){
assert(iter->Valid());
auto value=iter->value();
if(value!=values[i]){
std::cout<<std::string(value.data(),value.size())<<std::endl;
}
assert(values[i]==value);
iter->Next();
}
assert(!iter->Valid());
delete iter;
delete db;
}
TEST(Test, Garbage_Collect_TEST) {
// TEST(Test, CheckGetFields) {
// DB *db;
// WriteOptions writeOptions;
// ReadOptions readOptions;
// if(OpenDB("testdb_for_XOY", &db).ok() == false) {
// std::cerr << "open db failed" << std::endl;
// abort();
// }
// std::string key1 = "k_1";
// FieldArray fields1 = {
// {"name", "Customer#000000001"},
// {"address", "IVhzIApeRb"},
// {"phone", "25-989-741-2988"}
// };
// auto value1=SerializeValue(fields1);
// db->Put(WriteOptions(), key1, value1);
// std::string value_ret;
// FieldArray res1;
// db->Get(ReadOptions(), key1, &value_ret);
// DeserializeValue(value_ret, &res1);
// for(auto pr:res1){
// std::cout<<std::string(pr.first.data(),pr.first.size())<<" "<<std::string(pr.second.data(),pr.second.size())<<"\n";
// }
// ASSERT_TRUE(CompareFieldArray(fields1, res1));
// db->Delete(WriteOptions(),key1);
// std::cout<<"get serialized value done"<<std::endl;
// delete db;
// }
// TEST(Test, CheckSearchKey) {
// DB *db;
// ReadOptions readOptions;
// if(OpenDB("testdb_for_XOY", &db).ok() == false) {
// std::cerr << "open db failed" << std::endl;
// abort();
// }
// std::vector<std::string> keys;
// std::vector<std::string> target_keys;
// for(int i=0;i<10000;i++){
// std::string key=std::to_string(rand()%10000)+"_"+std::to_string(i);//random for generate nonincreasing keys
// FieldArray fields={
// {"name", key},
// {"address", std::to_string(rand()%7)},
// {"phone", std::to_string(rand()%114514)}
// };
// if(rand()%5==0){
// fields[0].second="special_key";
// target_keys.push_back(key);
// }
// keys.push_back(key);
// db->Put(WriteOptions(),key,SerializeValue(fields));
// }
// std::sort(target_keys.begin(),target_keys.end());
// std::vector<std::string> key_res;
// Get_keys_by_field(db,ReadOptions(),{"name", "special_key"},&key_res);
// ASSERT_TRUE(CompareKey(key_res, target_keys));
// std::cout<<"get key by field done"<<std::endl;
// for(auto s:keys){
// db->Delete(WriteOptions(),s);
// }
// delete db;
// }
TEST(Test, LARGE_DATA_COMPACT_TEST) {
DB *db;
WriteOptions writeOptions;
ReadOptions readOptions;
@ -132,18 +159,13 @@ TEST(Test, Garbage_Collect_TEST) {
for(int i=0;i<5000;i++){
std::string key=std::to_string(i);
std::string value;
for(int j=0;j<1000;j++){
for(int j=0;j<5000;j++){
value+=std::to_string(i);
}
values.push_back(value);
db->Put(writeOptions,key,value);
}
std::cout<<"start gc"<<std::endl;
db->TEST_GarbageCollect();
std::cout<<"finish gc"<<std::endl;
for(int i=0;i<5000;i++){
// std::cout<<i<<std::endl;
std::string key=std::to_string(i);
std::string value;
Status s=db->Get(readOptions,key,&value);
@ -157,6 +179,43 @@ TEST(Test, Garbage_Collect_TEST) {
delete db;
}
// TEST(Test, Garbage_Collect_TEST) {
// DB *db;
// WriteOptions writeOptions;
// ReadOptions readOptions;
// if(OpenDB("testdb_for_XOY_large", &db).ok() == false) {
// std::cerr << "open db failed" << std::endl;
// abort();
// }
// std::vector<std::string> values;
// for(int i=0;i<5000;i++){
// std::string key=std::to_string(i);
// std::string value;
// for(int j=0;j<1000;j++){
// value+=std::to_string(i);
// }
// values.push_back(value);
// db->Put(writeOptions,key,value);
// }
// std::cout<<"start gc"<<std::endl;
// db->TEST_GarbageCollect();
// std::cout<<"finish gc"<<std::endl;
// for(int i=0;i<5000;i++){
// // std::cout<<i<<std::endl;
// std::string key=std::to_string(i);
// std::string value;
// Status s=db->Get(readOptions,key,&value);
// assert(s.ok());
// if(values[i]!=value){
// std::cout<<value.size()<<std::endl;
// assert(0);
// }
// ASSERT_TRUE(values[i]==value);
// }
// delete db;
// }
int main(int argc, char** argv) {
// All tests currently run with the same read-only file limits.

Loading…
Cancel
Save