Browse Source

k/v separation version 2 roughly finished

pull/1/head
alexfisher 10 months ago
parent
commit
50b731220b
6 changed files with 132 additions and 34 deletions
  1. +59
    -26
      db/db_impl.cc
  2. +8
    -1
      db/db_impl.h
  3. +14
    -7
      db/db_iter.cc
  4. +36
    -0
      db/write_batch.cc
  5. +2
    -0
      db/write_batch_internal.h
  6. +13
    -0
      include/leveldb/db.h

+ 59
- 26
db/db_impl.cc View File

@ -1162,18 +1162,21 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key,
mem->Unref(); mem->Unref();
if (imm != nullptr) imm->Unref(); if (imm != nullptr) imm->Unref();
current->Unref(); current->Unref();
std::ifstream inFile("tmp.txt", std::ios::in | std::ios::binary);
if (!inFile.is_open()) {
std::cerr << "Failed to open file for writing!" << std::endl;
return Status::Corruption("Failed to open file for writing!");
}
uint64_t value_offset=*(uint64_t*)(value->c_str());
size_t value_size=*(size_t*)(value->c_str()+sizeof(uint64_t));
inFile.seekg(value_offset);
char value_buf[value_size];
inFile.read(value_buf,value_size);
inFile.close();
*value=std::string(value_buf,value_size);
if(value->c_str()[0]==0x00){
*value=value->substr(1);
return s;
}
Slice value_log_slice=Slice(value->c_str()+1,value->length());
uint64_t file_id,valuelog_offset,valuelog_len;
bool res=GetVarint64(&value_log_slice,&file_id);
if(!res)return Status::Corruption("can't decode file id");
res=GetVarint64(&value_log_slice,&valuelog_offset);
if(!res)return Status::Corruption("can't decode valuelog offset");
res=GetVarint64(&value_log_slice,&valuelog_len);
if(!res)return Status::Corruption("can't decode valuelog len");
ReadValueLog(file_id,valuelog_offset,valuelog_len,&value_log_slice);
*value=std::string(value_log_slice.data(),value_log_slice.size());
return s; return s;
} }
@ -1297,6 +1300,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
Writer* last_writer = &w; Writer* last_writer = &w;
if (status.ok() && updates != nullptr) { // nullptr batch is for compactions if (status.ok() && updates != nullptr) { // nullptr batch is for compactions
WriteBatch* write_batch = BuildBatchGroup(&last_writer); WriteBatch* write_batch = BuildBatchGroup(&last_writer);
WriteBatchInternal::ConverToValueLog(write_batch,this);
WriteBatchInternal::SetSequence(write_batch, last_sequence + 1); WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);
last_sequence += WriteBatchInternal::Count(write_batch); last_sequence += WriteBatchInternal::Count(write_batch);
@ -1556,25 +1560,54 @@ void DBImpl::GetApproximateSizes(const Range* range, int n, uint64_t* sizes) {
v->Unref(); v->Unref();
} }
std::vector<std::pair<uint64_t,std::pair<uint64_t,uint64_t>>> DBImpl::WriteValueLog(std::vector<Slice> values){
//lock
std::string file_name_=std::to_string(valuelogfile_number_)+".VALUELOG";
std::ofstream valueFile(file_name_, std::ios::app | std::ios::binary);
if (!valueFile.is_open()) {
assert(0);
}
uint64_t offset=valueFile.tellp();
std::vector<std::pair<uint64_t,std::pair<uint64_t,uint64_t>>> res;
for(int i=0;i<values.size();i++){
int len=values[i].size();
valueFile.write(values[i].data(),len);
res.push_back({valuelogfile_number_,{offset,len}});
offset+=len;
}
//unlock
valueFile.close();
return res;
}
// Points subsequent value-log writes at a new file by replacing
// valuelogfile_number_ with a fresh number obtained from versions_.
// NOTE(review): the lock/unlock markers are comments only - no lock is taken.
void DBImpl::addNewValueLog(){
  //lock
  const uint64_t fresh_file_number=versions_->NewFileNumber();
  valuelogfile_number_=fresh_file_number;
  //unlock
}
// Reads `len` bytes starting at byte `offset` of value-log file
// "<file_id>.VALUELOG" and stores them in `*value`.
//
// Returns Status::OK() on success. Returns Status::Corruption when the file
// cannot be opened or fewer than `len` bytes could be read; `*value` is left
// untouched in that case.
//
// Ownership: on success `*value` points at a buffer allocated with new[].
// Slice does not own its data, so the caller is responsible for releasing it
// with delete[] once the bytes have been copied out.
Status DBImpl::ReadValueLog(uint64_t file_id, uint64_t offset,uint64_t len,Slice* value){
  //lock_shared
  // Use the file the locator refers to, not the file currently being written.
  const std::string file_name=std::to_string(file_id)+".VALUELOG";
  std::ifstream inFile(file_name, std::ios::in | std::ios::binary);
  if (!inFile.is_open()) {
    std::cerr << "Failed to open file for reading!" << std::endl;
    return Status::Corruption("Failed to open file for reading!");
  }
  inFile.seekg(static_cast<std::streamoff>(offset));
  char *value_buf=new char[len];
  inFile.read(value_buf,static_cast<std::streamsize>(len));
  const bool complete=static_cast<uint64_t>(inFile.gcount())==len;
  inFile.close();
  if (!complete) {
    // A short read would hand uninitialized bytes to the caller.
    delete[] value_buf;
    return Status::Corruption("short read from value log");
  }
  // Write through the out-parameter; assigning to the pointer itself would
  // never reach the caller.
  *value=Slice(value_buf,len);
  //unlock_shared
  return Status::OK();
}
// Default implementations of convenience methods that subclasses of DB // Default implementations of convenience methods that subclasses of DB
// can call if they wish // can call if they wish
Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) { Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
WriteBatch batch; WriteBatch batch;
std::ofstream valueFile("tmp.txt", std::ios::app | std::ios::binary);
if (!valueFile.is_open()) {
std::cerr << "Failed to open file for writing!" << std::endl;
return Status::Corruption("Failed to open file for writing!");
}
uint64_t offset=valueFile.tellp();
valueFile.write(value.data(),value.size());
valueFile.close();
auto value_len=value.size();
char *new_value_buf=new char[sizeof(uint64_t)+sizeof(size_t)];
memcpy(new_value_buf,(void*)(&offset),sizeof(uint64_t));
memcpy(new_value_buf+sizeof(uint64_t),(void*)(&value_len),sizeof(size_t));
Slice new_value=Slice(new_value_buf,sizeof(uint64_t)+sizeof(size_t));
batch.Put(key, new_value);
batch.Put(key, value);
return Write(opt, &batch); return Write(opt, &batch);
} }

+ 8
- 1
db/db_impl.h View File

@ -9,6 +9,8 @@
#include <deque> #include <deque>
#include <set> #include <set>
#include <string> #include <string>
#include <mutex>
#include <shared_mutex>
#include "db/dbformat.h" #include "db/dbformat.h"
#include "db/log_writer.h" #include "db/log_writer.h"
@ -60,6 +62,9 @@ class DBImpl : public DB {
bool GetProperty(const Slice& property, std::string* value) override; bool GetProperty(const Slice& property, std::string* value) override;
void GetApproximateSizes(const Range* range, int n, uint64_t* sizes) override; void GetApproximateSizes(const Range* range, int n, uint64_t* sizes) override;
void CompactRange(const Slice* begin, const Slice* end) override; void CompactRange(const Slice* begin, const Slice* end) override;
std::vector<std::pair<uint64_t,std::pair<uint64_t,uint64_t>>> WriteValueLog(std::vector<Slice> value)override;
void addNewValueLog()override;
Status ReadValueLog(uint64_t file_id, uint64_t offset,uint64_t len,Slice* value)override;
// Extra methods (for testing) that are not in the public DB interface // Extra methods (for testing) that are not in the public DB interface
@ -184,13 +189,15 @@ class DBImpl : public DB {
// State below is protected by mutex_ // State below is protected by mutex_
port::Mutex mutex_; port::Mutex mutex_;
//std::shared_mutex value_log_mutex;
std::atomic<bool> shutting_down_; std::atomic<bool> shutting_down_;
port::CondVar background_work_finished_signal_ GUARDED_BY(mutex_); port::CondVar background_work_finished_signal_ GUARDED_BY(mutex_);
MemTable* mem_; MemTable* mem_;
MemTable* imm_ GUARDED_BY(mutex_); // Memtable being compacted MemTable* imm_ GUARDED_BY(mutex_); // Memtable being compacted
std::atomic<bool> has_imm_; // So bg thread can detect non-null imm_ std::atomic<bool> has_imm_; // So bg thread can detect non-null imm_
WritableFile* logfile_; WritableFile* logfile_;
uint64_t logfile_number_ GUARDED_BY(mutex_);
uint64_t logfile_number_;
uint64_t valuelogfile_number_ GUARDED_BY(mutex_);
log::Writer* log_; log::Writer* log_;
uint32_t seed_ GUARDED_BY(mutex_); // For sampling. uint32_t seed_ GUARDED_BY(mutex_); // For sampling.

+ 14
- 7
db/db_iter.cc View File

@ -69,13 +69,20 @@ class DBIter : public Iterator {
Slice value() const override { Slice value() const override {
assert(valid_); assert(valid_);
auto tmp_value= (direction_ == kForward) ? iter_->value() : saved_value_; auto tmp_value= (direction_ == kForward) ? iter_->value() : saved_value_;
std::ifstream inFile("tmp.txt", std::ios::in | std::ios::binary);
uint64_t value_offset=*(uint64_t*)(tmp_value.data());
size_t value_size=*(size_t*)(tmp_value.data()+sizeof(uint64_t));
inFile.seekg(value_offset);
char *value_buf=new char[value_size];
inFile.read(value_buf,value_size);
return Slice(value_buf,value_size);
if(tmp_value.data()[0]==0x00){
tmp_value.remove_prefix(1);
return tmp_value;
}
tmp_value.remove_prefix(1);
uint64_t file_id,valuelog_offset,valuelog_len;
bool res=GetVarint64(&tmp_value,&file_id);
if(!res)assert(0);
res=GetVarint64(&tmp_value,&valuelog_offset);
if(!res)assert(0);
res=GetVarint64(&tmp_value,&valuelog_len);
if(!res)assert(0);
db_->ReadValueLog(file_id,valuelog_offset,valuelog_len,&tmp_value);
return tmp_value;
} }
Status status() const override { Status status() const override {
if (status_.ok()) { if (status_.ok()) {

+ 36
- 0
db/write_batch.cc View File

@ -127,6 +127,34 @@ class MemTableInserter : public WriteBatch::Handler {
sequence_++; sequence_++;
} }
}; };
class ValueLogInserter : public WriteBatch::Handler {
public:
WriteBatch writeBatch_;
DB* db_;
void Put(const Slice& key, const Slice& value) override {
Slice new_value;
std::string buf;
if(value.size()<100){
buf+=(char)(0x00);
buf.append(value.data(),value.size());
}
else{
buf+=(char)(0x01);
std::vector<Slice> v;
v.push_back(value);
auto res=db_->WriteValueLog(v);
PutVarint64(&buf,res[0].first);
PutVarint64(&buf,res[0].second.first);
PutVarint64(&buf,res[0].second.second);
}
new_value=Slice(buf);
writeBatch_.Put(key,new_value);
}
void Delete(const Slice& key) override {
writeBatch_.Delete(key);
}
};
} // namespace } // namespace
Status WriteBatchInternal::InsertInto(const WriteBatch* b, MemTable* memtable) { Status WriteBatchInternal::InsertInto(const WriteBatch* b, MemTable* memtable) {
@ -136,6 +164,14 @@ Status WriteBatchInternal::InsertInto(const WriteBatch* b, MemTable* memtable) {
return b->Iterate(&inserter); return b->Iterate(&inserter);
} }
// Rewrites batch `b` in place so that every value is stored in the key/value
// separation encoding produced by ValueLogInserter; large values are written
// to the value log of `db_`. Returns the status of iterating over `b`.
// As in the original, `*b` is replaced with the re-encoded batch even when
// iteration reports an error.
Status WriteBatchInternal::ConverToValueLog(WriteBatch* b,DB* db_){
  ValueLogInserter inserter;
  inserter.writeBatch_=WriteBatch();
  // The inserter needs the database to append large values to the value log;
  // leaving this member unset makes Put() dereference an invalid pointer.
  inserter.db_=db_;
  auto res=b->Iterate(&inserter);
  *b=inserter.writeBatch_;
  return res;
}
void WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) { void WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) {
assert(contents.size() >= kHeader); assert(contents.size() >= kHeader);
b->rep_.assign(contents.data(), contents.size()); b->rep_.assign(contents.data(), contents.size());

+ 2
- 0
db/write_batch_internal.h View File

@ -37,6 +37,8 @@ class WriteBatchInternal {
static Status InsertInto(const WriteBatch* batch, MemTable* memtable); static Status InsertInto(const WriteBatch* batch, MemTable* memtable);
static Status ConverToValueLog(WriteBatch* batch,DB* db_);
static void Append(WriteBatch* dst, const WriteBatch* src); static void Append(WriteBatch* dst, const WriteBatch* src);
}; };

+ 13
- 0
include/leveldb/db.h View File

@ -102,6 +102,19 @@ class LEVELDB_EXPORT DB {
virtual Status Get_keys_by_field(const ReadOptions& options, const Field field,std::vector<std::string> *keys); virtual Status Get_keys_by_field(const ReadOptions& options, const Field field,std::vector<std::string> *keys);
// Value-log write hook. Each element of the result is a locator of the form
// {file id, {offset, length}}. This base-class version is a placeholder: it
// asserts, and when asserts are compiled out it returns an empty vector.
virtual std::vector<std::pair<uint64_t,std::pair<uint64_t,uint64_t>>> WriteValueLog(std::vector<Slice> value){
  assert(0);  // Not implemented
  return {};
}
// Value-log rotation hook. The base-class version is a placeholder that only
// asserts.
virtual void addNewValueLog(){
  assert(0);  // Not implemented
}
// Value-log read hook taking a locator (file id, offset, length) and an output
// slice. The base-class version is a placeholder: it asserts, and when asserts
// are compiled out it reports Status::Corruption("not imp").
virtual Status ReadValueLog(uint64_t file_id, uint64_t offset,uint64_t len,Slice* value){
  assert(0);  // Not implemented
  Status not_implemented=Status::Corruption("not imp");
  return not_implemented;
}
// Return a heap-allocated iterator over the contents of the database. // Return a heap-allocated iterator over the contents of the database.
// The result of NewIterator() is initially invalid (caller must // The result of NewIterator() is initially invalid (caller must
// call one of the Seek methods on the iterator before using it). // call one of the Seek methods on the iterator before using it).

Loading…
Cancel
Save