Browse Source

Merge pull request 'add more tests and update ycsb-benchmark and add crc' (#4) from version_3 into master

Reviewed-on: https://gitea.shuishan.net.cn/10225101483/XOY-leveldb-exp2/pulls/4
version_3
谢瑞阳 8 months ago
parent
commit
2dd5156072
15 changed files with 458 additions and 74 deletions
  1. +0
    -3
      .gitignore
  2. +1
    -1
      .gitmodules
  3. +1
    -1
      YCSB-cpp
  4. +98
    -12
      db/db_impl.cc
  5. +4
    -5
      db/db_impl.h
  6. +1
    -1
      db/db_test.cc
  7. +36
    -15
      db/true_iter.cc
  8. +1
    -1
      db/true_iter.h
  9. +28
    -9
      db/unordered_iter.cc
  10. +1
    -1
      db/unordered_iter.h
  11. +2
    -6
      include/leveldb/db.h
  12. +6
    -0
      include/leveldb/options.h
  13. +277
    -19
      test/test.cpp
  14. +1
    -0
      third_party/benchmark
  15. +1
    -0
      third_party/googletest

+ 0
- 3
.gitignore View File

@ -7,6 +7,3 @@
build/
out/
# 忽略 third_party 目录及其内容
third_party/
testdb_for_XOY/

+ 1
- 1
.gitmodules View File

@ -3,7 +3,7 @@
url = https://github.com/google/googletest.git
[submodule "third_party/benchmark"]
path = third_party/benchmark
url = https://github.com/google/benchmark
url = https://github.com/google/benchmark.git
[submodule "YCSB-cpp"]
path = YCSB-cpp
url = https://github.com/zerowinter0/my_YCSB_benchmark.git

+ 1
- 1
YCSB-cpp

@ -1 +1 @@
Subproject commit 7df09a150d3ab16b303c25007eb0a27c8eed8049
Subproject commit d78c62696691f5d745aa58ed8008e3f38632c2a4

+ 98
- 12
db/db_impl.cc View File

@ -16,6 +16,7 @@
#include "db/table_cache.h"
#include "db/version_set.h"
#include "db/write_batch_internal.h"
#include "util/crc32c.h"
#include <algorithm>
#include <atomic>
#include <cstdint>
@ -160,7 +161,8 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
versions_(new VersionSet(dbname_, &options_, table_cache_,
&internal_comparator_)),
use_valuelog_length(raw_options.use_valuelog_length),
value_log_size_(raw_options.value_log_size){
value_log_size_(raw_options.value_log_size),
valuelog_crc_(raw_options.valuelog_crc){
}
@ -838,7 +840,7 @@ void DBImpl::BackgroundCompaction() {
CleanupCompaction(compact);
c->ReleaseInputs();
RemoveObsoleteFiles();
MaybeScheduleGarbageCollect();
if(options_.valuelog_gc)MaybeScheduleGarbageCollect();
}
delete c;
@ -1261,7 +1263,7 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key,
}
Slice value_log_slice = Slice(value->c_str(), value->length());
mutex_.Unlock();
s=parseTrueValue(&value_log_slice,value);
s=parseTrueValue(&value_log_slice,value,options.verify_checksums_for_valuelog);
mutex_.Lock();
return s;
}
@ -1287,7 +1289,7 @@ Iterator *DBImpl::NewOriginalIterator(const ReadOptions& options) {
Iterator* DBImpl::NewUnorderedIterator(const ReadOptions& options,const Slice &lower_key,const Slice &upper_key) {
auto iter=NewOriginalIterator(options);
return NewUnorderedIter(this,iter,dbname_,options.max_unorder_iter_memory_usage,lower_key,upper_key,user_comparator());
return NewUnorderedIter(this,iter,dbname_,options,lower_key,upper_key,user_comparator());
}
Iterator* DBImpl::NewIterator(const ReadOptions& options) {
@ -1306,7 +1308,7 @@ Iterator* DBImpl::NewIterator(const ReadOptions& options) {
seed);
mutex_.Unlock();
return NewTrueIterator(this,db_iter);
return NewTrueIterator(this,db_iter,options.verify_checksums_for_valuelog);
}
void DBImpl::RecordReadSample(Slice key) {
@ -1655,6 +1657,9 @@ std::vector> DBImpl::WriteValueLog(
for(const auto &pr:kv){
total_size+=pr.first.size()+pr.second.size();
}
if(valuelog_crc_){
total_size+=sizeof(uint32_t)*kv.size();
}
char* buf= new char[total_size];//write all data with one fstream.write using this buf
@ -1666,6 +1671,8 @@ std::vector> DBImpl::WriteValueLog(
auto key=pr.first,value=pr.second;
int head_offset=offset;//use for crc
// 写入 value 的长度
uint64_t value_len = value.size();
memcpy(buf+offset,&value_len,sizeof(uint64_t));
@ -1684,7 +1691,12 @@ std::vector> DBImpl::WriteValueLog(
offset+=key_len;
// 更新偏移量
if(valuelog_crc_){
uint32_t crc = crc32c::Value(buf+head_offset+sizeof(uint64_t),value_len);
crc=crc32c::Extend(crc,buf+head_offset+value_len+2*sizeof(uint64_t),key_len);
memcpy(buf+offset,&crc,sizeof(uint32_t));
offset+=sizeof(uint32_t);
}
}
valueFile.write(buf,total_size);
@ -1714,7 +1726,7 @@ static void valuelog_cache_deleter(const leveldb::Slice &key, void *value){
delete (RandomAccessFile*)value;
}
Status DBImpl::parseTrueValue(Slice* value,std::string *true_value){
Status DBImpl::parseTrueValue(Slice* value,std::string *true_value,bool checkcrc){
if(value->empty()){
*true_value="";
}
@ -1728,13 +1740,13 @@ Status DBImpl::parseTrueValue(Slice* value,std::string *true_value){
value->remove_prefix(1);
Status s=ParseFakeValueForValuelog(*value,value_id,value_offset);
if(!s.ok())return s;
return ReadValueLog(value_id,value_offset,true_value);
return ReadValueLog(value_id,value_offset,true_value,checkcrc);
}
return Status::OK();
}
Status DBImpl::ReadValueLog(uint64_t file_id, uint64_t offset,
std::string* value) {
std::string* value,bool check_crc) {
std::string file_name_ = ValueLogFileName(dbname_, file_id);
@ -1743,14 +1755,49 @@ Status DBImpl::ReadValueLog(uint64_t file_id, uint64_t offset,
mutex_.Unlock();
std::ifstream inFile(file_name_, std::ios::in | std::ios::binary);
inFile.seekg(0, std::ios::end); // get total length
uint64_t totalSize = inFile.tellg();
if(totalSize<offset)return Status::Corruption("get value for valuelog fail:parse fail");
uint64_t value_len=0;
inFile.seekg(offset);
inFile.read((char*)(&value_len),sizeof(uint64_t));
if(totalSize<offset+value_len+sizeof(uint64_t))return Status::Corruption("get value for valuelog fail:parse fail");
char* buf=new char[value_len];
inFile.read(buf,value_len);
inFile.close();
if(check_crc){
uint64_t key_len;
inFile.read((char*)(&key_len),sizeof(uint64_t));
int total_len=value_len+key_len+2*sizeof(uint64_t);
uint64_t crc_offset=offset+total_len;
if(totalSize<crc_offset){
delete buf;
return Status::Corruption("get value for valuelog fail:parse fail");
}
char* key_buf=new char[key_len];
inFile.read(key_buf,key_len);
uint32_t crc_value;
inFile.read((char*)(&crc_value),sizeof(uint32_t));
uint32_t cal_crc_value=crc32c::Value(buf,value_len);
cal_crc_value=crc32c::Extend(cal_crc_value,key_buf,key_len);
if(cal_crc_value!=crc_value){
delete key_buf;
delete buf;
return Status::Corruption("get value for valuelog fail:crc check fail");
}
delete key_buf;
}
*value=std::string(buf,value_len);
delete buf;
return Status::OK();
}
@ -1857,6 +1904,13 @@ void DBImpl::GarbageCollect() {
assert(cur_valuelog.good());
current_offset += key_len;
uint32_t crc_value;
if(valuelog_crc_){
cur_valuelog.seekg(current_offset);
cur_valuelog.read((char*)(&crc_value),sizeof(uint32_t));
current_offset+=sizeof(uint32_t);
}
// Assign the read key data to the Slice
key = Slice(key_buf, key_len);
@ -1882,10 +1936,15 @@ void DBImpl::GarbageCollect() {
if (status.IsNotFound()) {
// Key 不存在,忽略此记录
delete key_buf;
mutex_.Lock();
valuelog_finding_key=Slice();
lock_valuelog_key_mutex_cond_.SignalAll();
mutex_.Unlock();
continue;
}
else if(!status.ok()){//handle error:skip this valuelog
delete key_buf;
mutex_.Lock();
valuelog_finding_key=Slice();
lock_valuelog_key_mutex_cond_.SignalAll();
@ -1896,6 +1955,11 @@ void DBImpl::GarbageCollect() {
else{
if(stored_value.data()[0]==(char)(0x00)){
//value is too small
delete key_buf;
mutex_.Lock();
valuelog_finding_key=Slice();
lock_valuelog_key_mutex_cond_.SignalAll();
mutex_.Unlock();
continue;
}
}
@ -1906,7 +1970,7 @@ void DBImpl::GarbageCollect() {
stored_offset);
if(!status.ok()){//handle error:skip this valuelog
delete key_buf;
mutex_.Lock();
valuelog_finding_key=Slice();
lock_valuelog_key_mutex_cond_.SignalAll();
@ -1918,6 +1982,11 @@ void DBImpl::GarbageCollect() {
if (stored_valuelog_id != cur_log_number ||
stored_offset != tmp_offset) {
// 记录无效,跳过
delete key_buf;
mutex_.Lock();
valuelog_finding_key=Slice();
lock_valuelog_key_mutex_cond_.SignalAll();
mutex_.Unlock();
continue;
}
@ -1929,6 +1998,20 @@ void DBImpl::GarbageCollect() {
// Assign the read value data to the Slice
value = Slice(value_buf, val_len);
if(valuelog_crc_){
uint32_t cal_crc_value=crc32c::Value(value_buf,val_len);
cal_crc_value=crc32c::Extend(cal_crc_value,key_buf,key_len);
if(cal_crc_value!=crc_value){//the rest of this valuelog can't be trust
delete value_buf;
delete key_buf;
mutex_.Lock();
valuelog_finding_key=Slice();
lock_valuelog_key_mutex_cond_.SignalAll();
mutex_.Unlock();
break;
}
}
auto write_op=leveldb::WriteOptions();
write_op.valuelog_write=true;
@ -2121,6 +2204,7 @@ void DBImpl::InitializeExistingLogs() {
inFile.seekg(latest_valuelog_offset);
inFile.read((char*)(&key_len),sizeof(uint64_t));
latest_valuelog_offset+=key_len+sizeof(uint64_t);
if(options_.valuelog_crc)latest_valuelog_offset+=sizeof(uint32_t);
char* buf=new char[latest_valuelog_offset];
@ -2146,6 +2230,7 @@ void DBImpl::InitializeExistingLogs() {
uint64_t value_len,key_len;
int cur_offset=0;
while(1){
inFile.seekg(cur_offset);
inFile.read((char*)(&value_len),sizeof(uint64_t));
if (inFile.eof()) {
@ -2156,6 +2241,7 @@ void DBImpl::InitializeExistingLogs() {
inFile.seekg(cur_offset);
inFile.read((char*)(&key_len),sizeof(uint64_t));
cur_offset+=key_len+sizeof(uint64_t);
if(options_.valuelog_crc)cur_offset+=sizeof(uint32_t);
data_cnt++;
}

+ 4
- 5
db/db_impl.h View File

@ -76,12 +76,9 @@ class DBImpl : public DB {
std::pair<WritableFile*, uint64_t> getNewValuelog(); // use for compaction
Status ReadValueLog(uint64_t file_id, uint64_t offset,
std::string* value) override;
std::string* value,bool check_crc) override;
Status parseTrueValue(Slice* value,std::string* true_value) override;
Status ReadValueLogRange(uint64_t file_id,std::vector<uint64_t> offsets,
std::string* value);
Status parseTrueValue(Slice* value,std::string* true_value,bool checkcrc) override;
// Extra methods (for testing) that are not in the public DB interface
@ -273,6 +270,8 @@ class DBImpl : public DB {
int value_log_size_;
bool valuelog_crc_;
// Have we encountered a background error in paranoid mode?
Status bg_error_ GUARDED_BY(mutex_);

+ 1
- 1
db/db_test.cc View File

@ -420,7 +420,7 @@ class DBTest : public testing::Test {
switch (ikey.type) {
case kTypeValue:
true_val=iter->value();
dbfull()->parseTrueValue(&true_val,&res);
dbfull()->parseTrueValue(&true_val,&res,false);
result += res;
break;
case kTypeDeletion:

+ 36
- 15
db/true_iter.cc View File

@ -36,9 +36,9 @@ class DBTrueIter : public Iterator {
// the exact entry that yields this->key(), this->value()
// (2) When moving backwards, the internal iterator is positioned
// just before all entries whose user key == this->key().
DBTrueIter(DBImpl* db, Iterator* iter)
DBTrueIter(DBImpl* db, Iterator* iter,bool check_crc)
:
db_(db),iter_(iter){}
db_(db),iter_(iter),check_crc_(check_crc){}
DBTrueIter(const DBTrueIter&) = delete;
DBTrueIter& operator=(const DBTrueIter&) = delete;
@ -51,11 +51,12 @@ class DBTrueIter : public Iterator {
return iter_->key();
}
Slice value() const override {
buf_for_value=std::move(GetAndParseTrueValue(iter_->value()));
return Slice(buf_for_value.data(),buf_for_value.size());
return Slice(buf_for_value.data(),buf_for_value.size());
}
Status status() const override {
return iter_->status();
if(status_.ok())
return iter_->status();
else return status_;
}
void Next() override;
@ -65,40 +66,60 @@ class DBTrueIter : public Iterator {
void SeekToLast() override;
private:
std::string GetAndParseTrueValue(Slice tmp_value)const{
if(tmp_value.size()==0){
return "";
}
std::string str;
Status s=db_->parseTrueValue(&tmp_value,&str);
return std::move(str);
Status GetAndParseTrueValue(Slice tmp_value){
Status status=db_->parseTrueValue(&tmp_value,&buf_for_value,check_crc_);
if(!status.ok())status_=status;
return status;
}
DBImpl* db_;
Iterator* const iter_;
mutable std::string buf_for_value;
std::string buf_for_value;
Status status_=Status::OK();
bool check_crc_;
};
void DBTrueIter::Next() {
iter_->Next();
if(iter_->Valid()){
Status res=GetAndParseTrueValue(iter_->value());
if(!res.ok())Next();
}
}
void DBTrueIter::Prev() {
iter_->Prev();
if(iter_->Valid()){
Status res=GetAndParseTrueValue(iter_->value());
if(!res.ok())Prev();
}
}
void DBTrueIter::Seek(const Slice& target) {
iter_->Seek(target);
if(iter_->Valid()){
Status res=GetAndParseTrueValue(iter_->value());
if(!res.ok())Next();//lowerbound
}
}
void DBTrueIter::SeekToFirst() {
iter_->SeekToFirst();
if(iter_->Valid()){
Status res=GetAndParseTrueValue(iter_->value());
if(!res.ok())Next();
}
}
void DBTrueIter::SeekToLast() {
iter_->SeekToLast();
if(iter_->Valid()){
Status res=GetAndParseTrueValue(iter_->value());
if(!res.ok())Prev();
}
}
} // anonymous namespace
Iterator* NewTrueIterator(DBImpl* db,Iterator* db_iter) {
return new DBTrueIter(db,db_iter);
Iterator* NewTrueIterator(DBImpl* db,Iterator* db_iter,bool check_crc) {
return new DBTrueIter(db,db_iter,check_crc);
}
} // namespace leveldb

+ 1
- 1
db/true_iter.h View File

@ -14,7 +14,7 @@ namespace leveldb {
class DBImpl;
Iterator* NewTrueIterator(DBImpl* db,Iterator* db_iter);
Iterator* NewTrueIterator(DBImpl* db,Iterator* db_iter,bool check_crc);
} // namespace leveldb

+ 28
- 9
db/unordered_iter.cc View File

@ -17,6 +17,7 @@
#include "util/mutexlock.h"
#include "util/random.h"
#include "port/port.h"
#include "util/crc32c.h"
#include <fstream>
#include <iostream>
@ -40,9 +41,10 @@ class UnorderedIter : public Iterator {
// just before all entries whose user key == this->key().
enum IterPos {Left,Mid,Right};
UnorderedIter(DBImpl* db, Iterator* iter,std::string db_name,int max_unorder_iter_memory_usage,const Slice &lower_key,const Slice &upper_key,const Comparator* user_comparator)
UnorderedIter(DBImpl* db, Iterator* iter,std::string db_name,ReadOptions readOptions,const Slice &lower_key,const Slice &upper_key,const Comparator* user_comparator)
:
db_(db),iter_(iter),db_name_(db_name),max_unorder_iter_memory_usage_(max_unorder_iter_memory_usage),lower_key_(lower_key),upper_key_(upper_key),comparator_(user_comparator){
db_(db),iter_(iter),db_name_(db_name),max_unorder_iter_memory_usage_(readOptions.max_unorder_iter_memory_usage),check_crc_(readOptions.verify_checksums_for_valuelog),
lower_key_(lower_key),upper_key_(upper_key),comparator_(user_comparator){
first_one=true;
if(lower_key_.empty())iter_->SeekToFirst();
else iter_->Seek(lower_key);
@ -73,7 +75,9 @@ class UnorderedIter : public Iterator {
return now_value;
}
Status status() const override {
return iter_->status();
if(status_.ok())
return iter_->status();
else return status_;
}
void Next() override;
@ -81,7 +85,6 @@ class UnorderedIter : public Iterator {
void Seek(const Slice& target) override;
void SeekToFirst() override;
void SeekToLast() override;
private:
std::pair<uint64_t,uint64_t> GetAndParseValue(Slice tmp_value)const{
tmp_value.remove_prefix(1);
@ -97,7 +100,7 @@ class UnorderedIter : public Iterator {
return value.size()&&value.data()[0]==(0x01);
}
void MyReadValuelog(const uint64_t& offset){
bool MyReadValuelog(const uint64_t& offset){
uint64_t value_len,key_len;
current_file->seekg(offset);
current_file->read((char*)(&value_len),sizeof(uint64_t));
@ -119,8 +122,20 @@ class UnorderedIter : public Iterator {
current_file->read(buf_for_now_key,key_len);
if(check_crc_){
uint32_t crc_value;
current_file->read((char*)(&crc_value),sizeof(uint32_t));
uint32_t cal_crc_value=crc32c::Value(buf_for_now_value,value_len);
cal_crc_value=crc32c::Extend(cal_crc_value,buf_for_now_key,key_len);
if(cal_crc_value!=crc_value){
status_=Status::Corruption("valuelog crc check fail");
return false;
}
}
now_value=Slice(buf_for_now_value,value_len);
now_key=Slice(buf_for_now_key,key_len);
return true;
}
bool keyGreaterThanRequire(){
@ -151,6 +166,8 @@ class UnorderedIter : public Iterator {
std::map<uint64_t,std::vector<uint64_t>>::iterator valuelog_map_iter;
int vec_idx=-1;
int max_unorder_iter_memory_usage_;
bool check_crc_;
Status status_=Status::OK();
const Slice lower_key_;
const Slice upper_key_;
@ -208,7 +225,7 @@ void UnorderedIter::Next() {
}
int offset=valuelog_map_iter->second[vec_idx++];
MyReadValuelog(offset);
bool res=MyReadValuelog(offset);
if(vec_idx>=valuelog_map_iter->second.size()){
valuelog_map_iter++;
@ -235,7 +252,7 @@ void UnorderedIter::Next() {
}
if(!res)Next();//ignore fault like other iter did
}
void UnorderedIter::Prev() {
@ -254,7 +271,9 @@ void UnorderedIter::SeekToLast() {
}
} // anonymous namespace
Iterator* NewUnorderedIter(DBImpl* db,Iterator* db_iter,std::string db_name,int max_unorder_iter_memory_usage,const Slice &lower_key,const Slice &upper_key,const Comparator* user_comparator) {
return new UnorderedIter(db,db_iter,db_name,max_unorder_iter_memory_usage,lower_key,upper_key,user_comparator);
Iterator* NewUnorderedIter(DBImpl* db,Iterator* db_iter,std::string db_name,ReadOptions readOptions,
const Slice &lower_key,const Slice &upper_key,const Comparator* user_comparator) {
return new UnorderedIter(db,db_iter,db_name,readOptions,
lower_key,upper_key,user_comparator);
}
} // namespace leveldb

+ 1
- 1
db/unordered_iter.h View File

@ -15,7 +15,7 @@ namespace leveldb {
class DBImpl;
// add a prefetch function for db_iter
Iterator* NewUnorderedIter(DBImpl* db,Iterator* db_iter,std::string db_name,int max_unorder_iter_memory_usage,const Slice &lower_key,const Slice &upper_key,const Comparator* comparator);
Iterator* NewUnorderedIter(DBImpl* db,Iterator* db_iter,std::string db_name,ReadOptions readOptions,const Slice &lower_key,const Slice &upper_key,const Comparator* comparator);
} // namespace leveldb

+ 2
- 6
include/leveldb/db.h View File

@ -115,16 +115,12 @@ class LEVELDB_EXPORT DB {
virtual void addNewValueLog(){assert(0);}
// virtual Status ReadValueLog(uint64_t file_id, uint64_t offset,uint64_t len,Slice* value){
// assert(0); // Not implemented
// return Status::Corruption("not imp");
// }
virtual Status ReadValueLog(uint64_t file_id, uint64_t offset, std::string* value){
virtual Status ReadValueLog(uint64_t file_id, uint64_t offset, std::string* value,bool check_crc){
assert(0); // Not implemented
return Status::Corruption("not imp");
}
virtual Status parseTrueValue(Slice* value,std::string* true_value){
virtual Status parseTrueValue(Slice* value,std::string* true_value,bool checkcrc){
assert(0);
return Status::Corruption("not imp");
}

+ 6
- 0
include/leveldb/options.h View File

@ -156,6 +156,10 @@ struct LEVELDB_EXPORT Options {
int mem_value_log_number=0;//0=don't use valuelog cache
//memory usage limit for a single unordered iterator
float GC_THRESHOLD=0.6;
//if valuelog_crc is on, every k-v pair using valuelog_crc will use crc in valuelog_crc
bool valuelog_crc=false;
//use GC for valuelog
bool valuelog_gc=true;
};
// Options that control read operations
@ -180,6 +184,8 @@ struct LEVELDB_EXPORT ReadOptions {
const Snapshot* snapshot = nullptr;
int max_unorder_iter_memory_usage=64<<20; //32MB
bool verify_checksums_for_valuelog=false;
};
// Options that control write operations

+ 277
- 19
test/test.cpp View File

@ -2,13 +2,13 @@
#include "leveldb/env.h"
#include "leveldb/db.h"
#include "leveldb/fields.h"
#include "db/filename.h"
#include <iostream>
using namespace leveldb;
std::string dbName="valuelog_test";
using Field=std::pair<Slice,Slice>;
using FieldArray=std::vector<std::pair<Slice, Slice>>;
Status OpenDB(DB **db,Options options=Options(),bool destroy_old_db=true) {
Status OpenDB(std::string dbName, DB **db,Options options=Options(),bool destroy_old_db=true) {
if(destroy_old_db){
DestroyDB(dbName,options);
}
@ -16,6 +16,53 @@ Status OpenDB(std::string dbName, DB **db,Options options=Options(),bool destroy
return DB::Open(options, dbName, db);
}
void Corrupt(FileType filetype, int offset, int bytes_to_corrupt,std::string dbname_) {
// Pick file to corrupt
std::vector<std::string> filenames;
auto env_=Env::Default();
assert(env_->GetChildren(dbname_, &filenames).ok());
uint64_t number;
FileType type;
std::string fname;
int picked_number = 10000000;
for (size_t i = 0; i < filenames.size(); i++) {
if (ParseFileName(filenames[i], &number, &type) && type == filetype &&
int(number) < picked_number) { // Pick oldest file
fname = dbname_ + "/" + filenames[i];
picked_number = number;
}
}
ASSERT_TRUE(!fname.empty()) << filetype;
uint64_t file_size;
ASSERT_TRUE(env_->GetFileSize(fname, &file_size).ok());
if (offset < 0) {
// Relative to end of file; make it absolute
if (-offset > file_size) {
offset = 0;
} else {
offset = file_size + offset;
}
}
if (offset > file_size) {
offset = file_size;
}
if (offset + bytes_to_corrupt > file_size) {
bytes_to_corrupt = file_size - offset;
}
// Do it
std::string contents;
Status s = ReadFileToString(env_, fname, &contents);
ASSERT_TRUE(s.ok()) << s.ToString();
for (int i = 0; i < bytes_to_corrupt; i++) {
contents[i + offset] ^= 0x80;
}
s = WriteStringToFile(env_, contents, fname);
ASSERT_TRUE(s.ok()) << s.ToString();
}
std::string GenKeyByNum(int num,int len){
std::string key=std::to_string(num);
while(key.size()<std::to_string(len).size()){
@ -62,7 +109,7 @@ TEST(Test, valuelog_iterator_test) {
int RANGE=5000;
if(OpenDB("valuelog_iterator_test", &db).ok() == false) {
if(OpenDB(&db,dboptions).ok() == false) {
std::cerr << "open db failed" << std::endl;
abort();
}
@ -112,7 +159,7 @@ TEST(Test, mix_valuelog_iterator_test) {
int RANGE=5000;
if(OpenDB("mix_valuelog_iterator_test", &db).ok() == false) {
if(OpenDB(&db,dboptions).ok() == false) {
std::cerr << "open db failed" << std::endl;
abort();
}
@ -142,11 +189,11 @@ TEST(Test, unorder_valuelog_iterator_test) {
WriteOptions writeOptions;
ReadOptions readOptions;
Options dboptions;
dboptions.use_valuelog_length=4000;
dboptions.use_valuelog_length=100;
int RANGE=5000;
if(OpenDB("valuelog_iterator_test", &db).ok() == false) {
if(OpenDB(&db,dboptions).ok() == false) {
std::cerr << "open db failed" << std::endl;
abort();
}
@ -154,7 +201,7 @@ TEST(Test, unorder_valuelog_iterator_test) {
std::vector<std::pair<std::string,std::string>> new_values;
for(int i=0;i<RANGE;i++){
std::string key=GenKeyByNum(i,RANGE);
std::string value=GenValueByNum(rand()%2000,1000);//if >1000 then in valuelog(length=4*1000)
std::string value=GenValueByNum(i,1000);
values.push_back(value);
Status s=db->Put(writeOptions,key,value);
assert(s.ok());
@ -196,7 +243,7 @@ TEST(Test, fields_simple_test) {
ReadOptions readOptions;
Options dbOptions;
dbOptions.use_valuelog_length=-1;
if(OpenDB("fields_simple_test", &db).ok() == false) {
if(OpenDB(&db,dbOptions).ok() == false) {
std::cerr << "open db failed" << std::endl;
abort();
}
@ -227,7 +274,7 @@ TEST(Test, fields_simple_test) {
TEST(Test, get_keys_by_field_test) {
DB *db;
ReadOptions readOptions;
if(OpenDB("get_keys_by_field_test", &db).ok() == false) {
if(OpenDB(&db).ok() == false) {
std::cerr << "open db failed" << std::endl;
abort();
}
@ -262,13 +309,13 @@ TEST(Test, valuelog_common_test) {
ReadOptions readOptions;
Options dbOptions;
dbOptions.use_valuelog_length=100;
if(OpenDB("valuelog_common_test", &db).ok() == false) {
if(OpenDB(&db,dbOptions).ok() == false) {
std::cerr << "open db failed" << std::endl;
abort();
}
//test Put
std::vector<std::string> values;
for(int i=0;i<50000;i++){
for(int i=0;i<5000;i++){
std::string key=std::to_string(i);
std::string value;
for(int j=0;j<5000;j++){
@ -277,7 +324,7 @@ TEST(Test, valuelog_common_test) {
values.push_back(value);
db->Put(writeOptions,key,value);
}
for(int i=0;i<50000;i++){
for(int i=0;i<5000;i++){
std::string key=std::to_string(i);
std::string value;
Status s=db->Get(readOptions,key,&value);
@ -285,7 +332,7 @@ TEST(Test, valuelog_common_test) {
ASSERT_TRUE(values[i]==value);
}
//test cover put
for(int i=0;i<50000;i++){
for(int i=0;i<5000;i++){
std::string key=std::to_string(i);
std::string value;
for(int j=0;j<3000;j++){
@ -294,7 +341,7 @@ TEST(Test, valuelog_common_test) {
values[i]=value;
db->Put(writeOptions,key,value);
}
for(int i=0;i<50000;i++){
for(int i=0;i<5000;i++){
std::string key=std::to_string(i);
std::string value;
Status s=db->Get(readOptions,key,&value);
@ -302,11 +349,11 @@ TEST(Test, valuelog_common_test) {
ASSERT_TRUE(values[i]==value);
}
//test delete
for(int i=0;i<50000;i++){
for(int i=0;i<5000;i++){
std::string key=std::to_string(i);
db->Delete(writeOptions,key);
}
for(int i=0;i<50000;i++){
for(int i=0;i<5000;i++){
std::string key=std::to_string(i);
std::string value;
Status s=db->Get(readOptions,key,&value);
@ -315,11 +362,105 @@ TEST(Test, valuelog_common_test) {
delete db;
}
TEST(Test, Garbage_Collect_TEST) {
TEST(Test, valuelog_corruption_test) {
DB *db;
WriteOptions writeOptions;
ReadOptions readOptions;
if(OpenDB("testdb_for_XOY_large", &db).ok() == false) {
readOptions.verify_checksums_for_valuelog=true;
Options dbOptions;
dbOptions.use_valuelog_length=100;
dbOptions.valuelog_gc=false;
dbOptions.value_log_size=1<<26;
dbOptions.valuelog_crc=true;
//a record size:8+4+8+4*5000+(4)=20024
//64*1024*1024/20024=3351.42
if(OpenDB(&db,dbOptions).ok() == false) {
std::cerr << "open db failed" << std::endl;
abort();
}
//test Put
std::vector<std::string> values;
for(int i=0;i<5000;i++){
std::string key=GenKeyByNum(i,5000);
std::string value;
for(int j=0;j<5000;j++){
value+=key;
}
values.push_back(value);
db->Put(writeOptions,key,value);
}
for(int i=0;i<5000;i++){
std::string key=GenKeyByNum(i,5000);
std::string value;
Status s=db->Get(readOptions,key,&value);
assert(s.ok());
ASSERT_TRUE(values[i]==value);
}
//test corrupt
Corrupt(FileType::kValueLogFile,20100,1,dbName);
//the second record is corrupt,
for(int i=0;i<5000;i++){
std::string key=GenKeyByNum(i,5000);
std::string value;
if(i!=1)ASSERT_TRUE(db->Get(readOptions,key,&value).ok());
else ASSERT_FALSE(db->Get(readOptions,key,&value).ok());
}
auto iter=db->NewIterator(readOptions);
iter->SeekToFirst();
ASSERT_TRUE(iter->status().ok()&&iter->Valid());
iter->Next();//skip 1,to 2
ASSERT_TRUE(!iter->status().ok()&&iter->Valid());
ASSERT_TRUE(iter->value()==values[2]);
iter->Seek(GenKeyByNum(1,5000));
ASSERT_TRUE(!iter->status().ok()&&iter->Valid());
ASSERT_TRUE(iter->value()==values[2]);
iter->Prev();//skip 1,to 0
ASSERT_TRUE(!iter->status().ok()&&iter->Valid());
ASSERT_TRUE(iter->value()==values[0]);
delete iter;
db->Put(writeOptions,GenKeyByNum(1,5000),values[1]);//1 is back to normal
//test corrupt on length
Corrupt(FileType::kValueLogFile,20024+20024+2,1,dbName);
//the third record is corrupt,
for(int i=0;i<5000;i++){
std::string key=GenKeyByNum(i,5000);
std::string value;
if(i!=2)ASSERT_TRUE(db->Get(readOptions,key,&value).ok());
else ASSERT_FALSE(db->Get(readOptions,key,&value).ok());
}
iter=db->NewIterator(readOptions);
iter->SeekToFirst();
ASSERT_TRUE(iter->status().ok()&&iter->Valid());
iter->Next();
ASSERT_TRUE(iter->status().ok()&&iter->Valid());
iter->Next();//skip 2,to 3
ASSERT_TRUE(!iter->status().ok()&&iter->Valid());
ASSERT_TRUE(iter->value()==values[3]);
iter->Seek(GenKeyByNum(2,5000));
ASSERT_TRUE(!iter->status().ok()&&iter->Valid());
ASSERT_TRUE(iter->value()==values[3]);
iter->Prev();//skip 2,to 1
ASSERT_TRUE(!iter->status().ok()&&iter->Valid());
ASSERT_TRUE(iter->value()==values[1]);
delete iter;
delete db;
}
TEST(Test, garbage_collect_test) {
DB *db;
WriteOptions writeOptions;
ReadOptions readOptions;
Options dbOptions;
dbOptions.write_buffer_size=1024;
dbOptions.max_file_size=8*1024;
dbOptions.valuelog_gc=false;
if(OpenDB(&db,dbOptions).ok() == false) {
std::cerr << "open db failed" << std::endl;
abort();
}
@ -330,10 +471,44 @@ TEST(Test, Garbage_Collect_TEST) {
for(int j=0;j<1000;j++){
value+=std::to_string(i);
}
db->Put(writeOptions,key,value);
}
for(int i=0;i<50000;i++){//make all remaining valuelog worthless, so they will be GC
std::string key=std::to_string(i);
std::string value;
for(int j=0;j<1001;j++){
value+=std::to_string(i);
}
values.push_back(value);
db->Put(writeOptions,key,value);
}
std::vector<std::string> origin_filenames;
auto env_=Env::Default();
ASSERT_TRUE(env_->GetChildren(dbName, &origin_filenames).ok());
int oldest_valuelog_id=1000;
for(auto file:origin_filenames){
uint64_t number;
FileType fileType;
ParseFileName(file,&number,&fileType);
if(fileType==FileType::kValueLogFile&&number<oldest_valuelog_id)oldest_valuelog_id=number;
}
ASSERT_TRUE(oldest_valuelog_id<1000);
db->CompactRange(nullptr,nullptr);//create garbage
db->TEST_GarbageCollect();
db->CompactRange(nullptr,nullptr);//update version
std::vector<std::string> new_filenames;
ASSERT_TRUE(env_->GetChildren(dbName, &new_filenames).ok());
int oldest_new_valuelog_id=1000;
for(auto file:new_filenames){
uint64_t number;
FileType fileType;
ParseFileName(file,&number,&fileType);
if(fileType==FileType::kValueLogFile&&number<oldest_new_valuelog_id)oldest_new_valuelog_id=number;
}
ASSERT_TRUE(oldest_new_valuelog_id<1000);
ASSERT_TRUE(oldest_new_valuelog_id>oldest_valuelog_id);//at least one valuelog file should be deleted
for(int i=0;i<50000;i++){
std::string key=std::to_string(i);
@ -345,6 +520,89 @@ TEST(Test, Garbage_Collect_TEST) {
delete db;
}
TEST(Test, recovery_test){
DB *db;
WriteOptions writeOptions;
ReadOptions readOptions;
Options dbOptions;
dbOptions.write_buffer_size=1024;
dbOptions.max_file_size=8*1024;
dbOptions.valuelog_gc=false;
dbOptions.valuelog_crc=true;
dbOptions.use_valuelog_length=100;
readOptions.verify_checksums_for_valuelog=true;
if(OpenDB(&db,dbOptions).ok() == false) {
std::cerr << "open db failed" << std::endl;
abort();
}
std::vector<std::string> values;
for(int i=0;i<5000;i++){
std::string key=GenKeyByNum(i,5000);
std::string value;
for(int j=0;j<5000;j++){
value+=key;
}
values.push_back(value);
db->Put(writeOptions,key,value);
}
delete db;
if(OpenDB(&db,dbOptions,false).ok() == false) {
std::cerr << "open db failed" << std::endl;
abort();
}
for(int i=0;i<5000;i++){
std::string key=GenKeyByNum(i,5000);
std::string value;
Status s=db->Get(readOptions,key,&value);
assert(s.ok());
ASSERT_TRUE(values[i]==value);
}
delete db;
if(OpenDB(&db,dbOptions,false).ok() == false) {
std::cerr << "open db failed" << std::endl;
abort();
}
for(int i=0;i<5000;i++){
std::string key=GenKeyByNum(i,5000);
std::string value;
for(int j=0;j<5000;j++){
value+=key;
}
db->Put(writeOptions,key,value);
}
//test the meta info for gc is still useable
std::vector<std::string> origin_filenames;
auto env_=Env::Default();
ASSERT_TRUE(env_->GetChildren(dbName, &origin_filenames).ok());
int oldest_valuelog_id=1000;
for(auto file:origin_filenames){
uint64_t number;
FileType fileType;
ParseFileName(file,&number,&fileType);
if(fileType==FileType::kValueLogFile&&number<oldest_valuelog_id)oldest_valuelog_id=number;
}
ASSERT_TRUE(oldest_valuelog_id<1000);
db->CompactRange(nullptr,nullptr);//create garbage
db->TEST_GarbageCollect();
db->CompactRange(nullptr,nullptr);//update version
std::vector<std::string> new_filenames;
ASSERT_TRUE(env_->GetChildren(dbName, &new_filenames).ok());
int oldest_new_valuelog_id=1000;
for(auto file:new_filenames){
uint64_t number;
FileType fileType;
ParseFileName(file,&number,&fileType);
if(fileType==FileType::kValueLogFile&&number<oldest_new_valuelog_id)oldest_new_valuelog_id=number;
}
ASSERT_TRUE(oldest_new_valuelog_id<1000);
ASSERT_TRUE(oldest_new_valuelog_id>oldest_valuelog_id);//at least one valuelog file should be deleted
delete db;
}
int main(int argc, char** argv) {
// All tests currently run with the same read-only file limits.

+ 1
- 0
third_party/benchmark

@ -0,0 +1 @@
Subproject commit f4f93b5553ced834b2120048f65690cddb4b7a2f

+ 1
- 0
third_party/googletest

@ -0,0 +1 @@
Subproject commit 7d76a231b0e29caf86e68d1df858308cd53b2a66

Loading…
Cancel
Save