Compare commits

...

2 Commits

Author    SHA1        Message                           Date
小人鱼    a6cbcb44cd  data push valuelog until sstable  9 months ago
小人鱼    6bf30eea97  fix bug and update benchmark      9 months ago
8 changed files with 67 additions and 122 deletions
  1. benchmarks/db_bench.cc      (+1, -1)
  2. db/builder.cc               (+23, -8)
  3. db/db_impl.cc               (+7, -56)
  4. db/db_impl.h                (+0, -5)
  5. db/write_batch.cc           (+32, -39)
  6. db/write_batch_internal.h   (+0, -2)
  7. include/leveldb/db.h        (+0, -8)
  8. test/test.cpp               (+4, -3)

benchmarks/db_bench.cc  (+1, -1)

@@ -74,7 +74,7 @@ static int FLAGS_reads = -1;
 static int FLAGS_threads = 1;
 // Size of each value
-static int FLAGS_value_size = 100;
+static int FLAGS_value_size = 5000;
 // Arrange to generate values that shrink to this fraction of
 // their original size after compression

db/builder.cc  (+23, -8)

@@ -21,6 +21,7 @@ Status BuildTable(const std::string& dbname, Env* env, const Options& options,
   iter->SeekToFirst();
   std::string fname = TableFileName(dbname, meta->number);
+  std::string value_fname = ValueLogFileName(dbname, meta->valuelog_id);
   if (iter->Valid()) {
     WritableFile* file;
     s = env->NewWritableFile(fname, &file);
@@ -28,22 +29,36 @@ Status BuildTable(const std::string& dbname, Env* env, const Options& options,
       return s;
     }
+    {
+      auto tmp_value=iter->value();
+      if(tmp_value.data()[0]==(char)(0x01)){
+        tmp_value.remove_prefix(1);
+        assert(GetVarint64(&tmp_value,&meta->valuelog_id));
+      }
+      else meta->valuelog_id=0;
+      WritableFile* value_file;
+      s = env->NewWritableFile(value_fname, &value_file);
+      if (!s.ok()) {
+        return s;
+      }
+    }
     TableBuilder* builder = new TableBuilder(options, file);
     meta->smallest.DecodeFrom(iter->key());
     Slice key;
+    uint64_t valuelog_offset=0;
     for (; iter->Valid(); iter->Next()) {
       key = iter->key();
-      builder->Add(key, iter->value());
+      Slice value=iter->value();
+      Slice new_value=value;
+      if(value.size()>100){
+        value.remove_prefix(1);
+        std::string buf;
+        value_file->Append(value);
+        buf+=(char)(0x01);
+        PutVarint64(&buf,meta->valuelog_id);
+        PutVarint64(&buf,valuelog_offset);
+        PutVarint64(&buf,value.size());
+        valuelog_offset+=value.size();
+        new_value=Slice(buf);
+      }
+      builder->Add(key, new_value);
     }
+    value_file->Flush();
+    value_file->Sync();
+    value_file->Close();
     if (!key.empty()) {
       meta->largest.DecodeFrom(key);
     }
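
Taken together, the db/builder.cc hunks move value separation out of the write path and into the memtable flush: while BuildTable walks the memtable iterator, any value longer than 100 bytes is appended to a per-table value-log file and the SSTable entry stores a tagged pointer (0x01 followed by varint-encoded valuelog_id, offset and size) in its place, while short values keep the inline 0x00 tag added by DB::Put. A minimal sketch of that pointer encoding is shown below, written against leveldb's existing coding helpers (PutVarint64/GetVarint64 from util/coding.h); the EncodeValuePointer/DecodeValuePointer names are illustrative and not part of this patch.

// Sketch only: tag conventions inferred from the hunk above.
//   0x00 + raw bytes                                        -> value stored inline
//   0x01 + varint64 valuelog_id + varint64 offset + varint64 size
//                                                           -> value lives in a value-log file
#include <cstdint>
#include <string>
#include "leveldb/slice.h"
#include "util/coding.h"  // PutVarint64 / GetVarint64

namespace leveldb {

inline void EncodeValuePointer(std::string* dst, uint64_t valuelog_id,
                               uint64_t offset, uint64_t size) {
  dst->push_back(static_cast<char>(0x01));
  PutVarint64(dst, valuelog_id);
  PutVarint64(dst, offset);
  PutVarint64(dst, size);
}

// Returns false if `v` is not a 0x01-tagged pointer or is truncated.
inline bool DecodeValuePointer(Slice v, uint64_t* valuelog_id,
                               uint64_t* offset, uint64_t* size) {
  if (v.size() < 1 || v[0] != static_cast<char>(0x01)) return false;
  v.remove_prefix(1);
  return GetVarint64(&v, valuelog_id) && GetVarint64(&v, offset) &&
         GetVarint64(&v, size);
}

}  // namespace leveldb

With FLAGS_value_size raised to 5000 in db_bench above, every benchmarked value crosses the 100-byte threshold and takes the pointer path.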

db/db_impl.cc  (+7, -56)

@@ -518,6 +518,7 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
   const uint64_t start_micros = env_->NowMicros();
   FileMetaData meta;
   meta.number = versions_->NewFileNumber();
+  meta.valuelog_id = versions_->NewFileNumber();
   pending_outputs_.insert(meta.number);
   Iterator* iter = mem->NewIterator();
   Log(options_.info_log, "Level-0 table #%llu: started",
@@ -1092,14 +1093,6 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
   if (status.ok()) {
     status = input->status();
   }
-  //not completely correct, should be written in new function, related to removeabsol...
-  // if(status.ok()){
-  // for(auto id:old_valuelog_ids){
-  // auto valuelog_filename=ValueLogFileName(dbname_,id);
-  // Status s=env_->RemoveFile(valuelog_filename);
-  // assert(s.ok());
-  // }
-  // }
   delete input;
   input = nullptr;
@@ -1234,7 +1227,7 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key,
   mem->Unref();
   if (imm != nullptr) imm->Unref();
   current->Unref();
+  if(!s.ok())return s;
   if(value->c_str()[0]==0x00){
     *value=value->substr(1);
     return s;
@@ -1314,7 +1307,6 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
   Writer* last_writer = &w;
   if (status.ok() && updates != nullptr) {  // nullptr batch is for compactions
     WriteBatch* write_batch = BuildBatchGroup(&last_writer);
-    WriteBatchInternal::ConverToValueLog(write_batch,this);
     WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);
     last_sequence += WriteBatchInternal::Count(write_batch);
@@ -1480,7 +1472,6 @@ Status DBImpl::MakeRoomForWrite(bool force) {
         RecordBackgroundError(s);
       }
       delete logfile_;
-      addNewValueLog();
       logfile_ = lfile;
       logfile_number_ = new_log_number;
       log_ = new log::Writer(lfile);
@@ -1574,55 +1565,12 @@ void DBImpl::GetApproximateSizes(const Range* range, int n, uint64_t* sizes) {
   v->Unref();
 }
-std::vector<std::pair<uint64_t,std::pair<uint64_t,uint64_t>>> DBImpl::WriteValueLog(std::vector<Slice> values){
-  //lock
-  // std::vector<std::pair<uint64_t,std::pair<uint64_t,uint64_t>>> res;
-  // for(int i=0;i<values.size();i++){
-  // int len=values[i].size();
-  // valuelogfile_->Append(values[i]);
-  // res.push_back({valuelogfile_number_,{valuelogfile_offset,len}});
-  // valuelogfile_offset+=len;
-  // }
-  // //unlock
-  // valuelogfile_->Flush();
-  // return res;
-  std::string file_name_=ValueLogFileName(dbname_,valuelogfile_number_);
-  std::ofstream valueFile(file_name_, std::ios::app | std::ios::binary);
-  if (!valueFile.is_open()) {
-    assert(0);
-  }
-  uint64_t offset=valueFile.tellp();
-  std::vector<std::pair<uint64_t,std::pair<uint64_t,uint64_t>>> res;
-  for(int i=0;i<values.size();i++){
-    int len=values[i].size();
-    valueFile.write(values[i].data(),len);
-    res.push_back({valuelogfile_number_,{offset,len}});
-    offset+=len;
-  }
-  //unlock
-  valueFile.close();
-  return res;
-}
 void DBImpl::writeValueLogForCompaction(WritableFile* target_file,std::vector<Slice> values){
   for(int i=0;i<values.size();i++){
     target_file->Append(values[i]);
   }
 }
-void DBImpl::addNewValueLog(){
-  //lock
-  // if(valuelogfile_){
-  // valuelogfile_->Sync();
-  // valuelogfile_->Close();
-  // delete valuelogfile_;
-  // }
-  valuelogfile_number_=versions_->NewFileNumber();
-  // valuelogfile_offset=0;
-  // std::string file_name_=ValueLogFileName(dbname_,valuelogfile_number_);
-  // env_->NewWritableFile(file_name_,&valuelogfile_);
-  //unlock
-}
 Status DBImpl::ReadValueLog(uint64_t file_id, uint64_t offset,uint64_t len,Slice* value){
   //lock_shared
   std::string file_name_=ValueLogFileName(dbname_,file_id);
@@ -1644,7 +1592,11 @@ Status DBImpl::ReadValueLog(uint64_t file_id, uint64_t offset,uint64_t len,Slice
 // can call if they wish
 Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
   WriteBatch batch;
-  batch.Put(key, value);
+  std::string buf;
+  buf+=(char)(0x00);
+  buf+=std::string(value.data(),value.size());
+  Slice new_value=Slice(buf);
+  batch.Put(key, new_value);
   return Write(opt, &batch);
 }
@@ -1678,7 +1630,6 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
       impl->log_ = new log::Writer(lfile);
       impl->mem_ = new MemTable(impl->internal_comparator_);
       impl->mem_->Ref();
-      impl->addNewValueLog();
     }
   }
   if (s.ok() && save_manifest) {
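
On the read side, the DBImpl::Get hunk only shows the inline branch: when the stored slot starts with 0x00 the tag is stripped and the remainder is returned, and the new if(!s.ok())return s; guard keeps a failed lookup from being parsed as a tagged slot at all. The 0x01 branch is not visible in this compare, but given the ReadValueLog(file_id, offset, len, ...) hook that remains in the DB interface, resolution presumably looks roughly like the sketch below; ResolveValue is an illustrative name, not code from the patch, and it assumes the DecodeValuePointer helper sketched after the db/builder.cc diff.

// Sketch only (assumes leveldb/db.h, leveldb/slice.h, leveldb/status.h and the
// DecodeValuePointer helper above are in scope, inside namespace leveldb).
Status ResolveValue(DB* db, const std::string& raw, std::string* value) {
  if (raw.empty()) return Status::Corruption("empty value slot");
  if (raw[0] == static_cast<char>(0x00)) {  // inline value: strip the tag byte
    value->assign(raw.data() + 1, raw.size() - 1);
    return Status::OK();
  }
  uint64_t file_id, offset, size;
  if (!DecodeValuePointer(Slice(raw), &file_id, &offset, &size)) {
    return Status::Corruption("bad value pointer");
  }
  Slice result;
  Status s = db->ReadValueLog(file_id, offset, size, &result);
  // Ownership of the buffer behind `result` is up to the ReadValueLog
  // implementation, so copy the bytes out before returning.
  if (s.ok()) value->assign(result.data(), result.size());
  return s;
}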

db/db_impl.h  (+0, -5)

@@ -63,9 +63,7 @@ class DBImpl : public DB {
   bool GetProperty(const Slice& property, std::string* value) override;
   void GetApproximateSizes(const Range* range, int n, uint64_t* sizes) override;
   void CompactRange(const Slice* begin, const Slice* end) override;
-  std::vector<std::pair<uint64_t,std::pair<uint64_t,uint64_t>>> WriteValueLog(std::vector<Slice> value)override;
   void writeValueLogForCompaction(WritableFile* target_file,std::vector<Slice> value);
-  void addNewValueLog()override EXCLUSIVE_LOCKS_REQUIRED(mutex_);;
   std::pair<WritableFile*,uint64_t> getNewValuelog();//use for compaction
   Status ReadValueLog(uint64_t file_id, uint64_t offset,uint64_t len,Slice* value)override;
@@ -199,10 +197,7 @@ class DBImpl : public DB {
   MemTable* imm_ GUARDED_BY(mutex_);  // Memtable being compacted
   std::atomic<bool> has_imm_;         // So bg thread can detect non-null imm_
   WritableFile* logfile_;
-  WritableFile* valuelogfile_;
-  int valuelogfile_offset=0;
   uint64_t logfile_number_;
-  uint64_t valuelogfile_number_;
   log::Writer* log_;
   std::map<uint64_t,uint64_t> oldvaluelog_ids;
   uint32_t seed_ GUARDED_BY(mutex_);  // For sampling.

db/write_batch.cc  (+32, -39)

@@ -127,38 +127,38 @@ class MemTableInserter : public WriteBatch::Handler {
     sequence_++;
   }
 };
-class ValueLogInserter : public WriteBatch::Handler {
- public:
-  WriteBatch writeBatch_;
-  DB* db_;
-  ValueLogInserter(WriteBatch writeBatch,DB* db){
-    writeBatch_=writeBatch;
-    db_=db;
-  }
-  void Put(const Slice& key, const Slice& value) override {
-    Slice new_value;
-    std::string buf;
-    if(value.size()<100){
-      buf+=(char)(0x00);
-      buf.append(value.data(),value.size());
-    }
-    else{
-      buf+=(char)(0x01);
-      std::vector<Slice> v;
-      v.push_back(value);
-      auto res=db_->WriteValueLog(v);
-      PutVarint64(&buf,res[0].first);
-      PutVarint64(&buf,res[0].second.first);
-      PutVarint64(&buf,res[0].second.second);
-    }
-    new_value=Slice(buf);
-    writeBatch_.Put(key,new_value);
-  }
-  void Delete(const Slice& key) override {
-    writeBatch_.Delete(key);
-  }
-};
+// class ValueLogInserter : public WriteBatch::Handler {
+// public:
+// WriteBatch writeBatch_;
+// DB* db_;
+// ValueLogInserter(WriteBatch writeBatch,DB* db){
+// writeBatch_=writeBatch;
+// db_=db;
+// }
+// void Put(const Slice& key, const Slice& value) override {
+// Slice new_value;
+// std::string buf;
+// if(value.size()<100){
+// buf+=(char)(0x00);
+// buf.append(value.data(),value.size());
+// }
+// else{
+// buf+=(char)(0x01);
+// std::vector<Slice> v;
+// v.push_back(value);
+// auto res=db_->WriteValueLog(v);
+// PutVarint64(&buf,res[0].first);
+// PutVarint64(&buf,res[0].second.first);
+// PutVarint64(&buf,res[0].second.second);
+// }
+// new_value=Slice(buf);
+// writeBatch_.Put(key,new_value);
+// }
+// void Delete(const Slice& key) override {
+// writeBatch_.Delete(key);
+// }
+// };
 }  // namespace
 Status WriteBatchInternal::InsertInto(const WriteBatch* b, MemTable* memtable) {
@@ -168,13 +168,6 @@ Status WriteBatchInternal::InsertInto(const WriteBatch* b, MemTable* memtable) {
   return b->Iterate(&inserter);
 }
-Status WriteBatchInternal::ConverToValueLog(WriteBatch* b,DB* db_){
-  ValueLogInserter inserter(WriteBatch(),db_);
-  auto res=b->Iterate(&inserter);
-  *b=inserter.writeBatch_;
-  return res;
-}
 void WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) {
   assert(contents.size() >= kHeader);
   b->rep_.assign(contents.data(), contents.size());

db/write_batch_internal.h  (+0, -2)

@@ -37,8 +37,6 @@ class WriteBatchInternal {
   static Status InsertInto(const WriteBatch* batch, MemTable* memtable);
-  static Status ConverToValueLog(WriteBatch* batch,DB* db_);
   static void Append(WriteBatch* dst, const WriteBatch* src);
 };

include/leveldb/db.h  (+0, -8)

@@ -102,14 +102,6 @@ class LEVELDB_EXPORT DB {
   // virtual Status Get_keys_by_field(const ReadOptions& options, const Field field,std::vector<std::string> *keys);
-  virtual std::vector<std::pair<uint64_t,std::pair<uint64_t,uint64_t>>> WriteValueLog(std::vector<Slice> value){
-    assert(0);
-    std::vector<std::pair<uint64_t,std::pair<uint64_t,uint64_t>>> v;
-    return v;
-  }
-  virtual void addNewValueLog(){assert(0);}
   virtual Status ReadValueLog(uint64_t file_id, uint64_t offset,uint64_t len,Slice* value){
     assert(0); // Not implemented
     return Status::Corruption("not imp");

test/test.cpp  (+4, -3)

@@ -10,8 +10,8 @@ using FieldArray=std::vector>;
 Status OpenDB(std::string dbName, DB **db) {
   Options options;
-  options.max_file_size=16*1024;
-  options.write_buffer_size=32*1024;
+  // options.max_file_size=16*1024;
+  // options.write_buffer_size=32*1024;
   options.create_if_missing = true;
   return DB::Open(options, dbName, db);
 }
@@ -90,12 +90,13 @@ TEST(Test, CheckGetFields) {
   DB *db;
   WriteOptions writeOptions;
   ReadOptions readOptions;
+  std::cout<<"!!!"<<std::endl;
   if(OpenDB("testdb_for_XOY", &db).ok() == false) {
     std::cerr << "open db failed" << std::endl;
     abort();
   }
   std::string key1 = "k_1";
+  std::cout<<"!!!"<<std::endl;
   FieldArray fields1 = {
     {"name", "Customer#000000001"},
     {"address", "IVhzIApeRb"},
