Browse Source

update imp 2

pull/2/head
alexfisher 9 months ago
parent
commit
6b1da0d8b4
6 changed files with 201 additions and 78 deletions
  1. +84
    -8
      db/db_impl.cc
  2. +6
    -2
      db/db_impl.h
  3. +5
    -0
      db/filename.cc
  4. +2
    -0
      db/filename.h
  5. +5
    -2
      db/write_batch.cc
  6. +99
    -66
      test/test.cpp

+ 84
- 8
db/db_impl.cc View File

@ -83,6 +83,10 @@ struct DBImpl::CompactionState {
WritableFile* outfile; WritableFile* outfile;
TableBuilder* builder; TableBuilder* builder;
WritableFile* valuelogfile;
uint64_t valuelog_offset=0;
uint64_t valuelog_file_id=0;
uint64_t total_bytes; uint64_t total_bytes;
}; };
@ -816,6 +820,7 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) {
out.smallest.Clear(); out.smallest.Clear();
out.largest.Clear(); out.largest.Clear();
compact->outputs.push_back(out); compact->outputs.push_back(out);
compact->valuelog_file_id=versions_->NewFileNumber();
mutex_.Unlock(); mutex_.Unlock();
} }
@ -825,6 +830,11 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) {
if (s.ok()) { if (s.ok()) {
compact->builder = new TableBuilder(options_, compact->outfile); compact->builder = new TableBuilder(options_, compact->outfile);
} }
compact->valuelog_offset=0;
s = env_->NewWritableFile(ValueLogFileName(dbname_,compact->valuelog_file_id), &compact->valuelogfile);
return s; return s;
} }
@ -860,6 +870,19 @@ Status DBImpl::FinishCompactionOutputFile(CompactionState* compact,
} }
delete compact->outfile; delete compact->outfile;
compact->outfile = nullptr; compact->outfile = nullptr;
if (s.ok()) {
s = compact->valuelogfile->Flush();
}
if (s.ok()) {
s = compact->valuelogfile->Sync();
}
if (s.ok()) {
s = compact->valuelogfile->Close();
}
delete compact->valuelogfile;
compact->valuelogfile=nullptr;
compact->valuelog_file_id=0;
compact->valuelog_offset=0;
if (s.ok() && current_entries > 0) { if (s.ok() && current_entries > 0) {
// Verify that the table is usable // Verify that the table is usable
@ -1004,7 +1027,34 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
compact->current_output()->smallest.DecodeFrom(key); compact->current_output()->smallest.DecodeFrom(key);
} }
compact->current_output()->largest.DecodeFrom(key); compact->current_output()->largest.DecodeFrom(key);
compact->builder->Add(key, input->value());
Slice old_value=input->value();
Slice new_value;
std::string buf="";
if(old_value.data()[0]==(char)(0x00)){
new_value=old_value;
}
else{
uint64_t file_id,valuelog_offset,valuelog_len;
bool res=GetVarint64(&old_value,&file_id);
if(!res)assert(0);
res=GetVarint64(&old_value,&valuelog_offset);
if(!res)assert(0);
res=GetVarint64(&old_value,&valuelog_len);
if(!res)assert(0);
Status s=ReadValueLog(file_id,valuelog_offset,valuelog_len,&new_value);
assert(s.ok());
writeValueLogForCompaction(compact->valuelogfile,{new_value});
buf+=(char)(0x01);
PutVarint64(&buf,compact->valuelog_file_id);
PutVarint64(&buf,compact->valuelog_offset);
PutVarint64(&buf,valuelog_len);
compact->valuelog_offset+=valuelog_len;
delete []new_value.data();
new_value=Slice(buf);
}
compact->builder->Add(key, new_value);
// Close output file if it is big enough // Close output file if it is big enough
if (compact->builder->FileSize() >= if (compact->builder->FileSize() >=
@ -1168,6 +1218,7 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key,
return s; return s;
} }
Slice value_log_slice=Slice(value->c_str()+1,value->length()); Slice value_log_slice=Slice(value->c_str()+1,value->length());
Slice new_value;
uint64_t file_id,valuelog_offset,valuelog_len; uint64_t file_id,valuelog_offset,valuelog_len;
bool res=GetVarint64(&value_log_slice,&file_id); bool res=GetVarint64(&value_log_slice,&file_id);
if(!res)return Status::Corruption("can't decode file id"); if(!res)return Status::Corruption("can't decode file id");
@ -1175,8 +1226,9 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key,
if(!res)return Status::Corruption("can't decode valuelog offset"); if(!res)return Status::Corruption("can't decode valuelog offset");
res=GetVarint64(&value_log_slice,&valuelog_len); res=GetVarint64(&value_log_slice,&valuelog_len);
if(!res)return Status::Corruption("can't decode valuelog len"); if(!res)return Status::Corruption("can't decode valuelog len");
ReadValueLog(file_id,valuelog_offset,valuelog_len,&value_log_slice);
*value=std::string(value_log_slice.data(),value_log_slice.size());
ReadValueLog(file_id,valuelog_offset,valuelog_len,&new_value);
*value=std::string(new_value.data(),new_value.size());
delete []new_value.data();
return s; return s;
} }
@ -1406,7 +1458,7 @@ Status DBImpl::MakeRoomForWrite(bool force) {
RecordBackgroundError(s); RecordBackgroundError(s);
} }
delete logfile_; delete logfile_;
addNewValueLog();
logfile_ = lfile; logfile_ = lfile;
logfile_number_ = new_log_number; logfile_number_ = new_log_number;
log_ = new log::Writer(lfile); log_ = new log::Writer(lfile);
@ -1502,7 +1554,17 @@ void DBImpl::GetApproximateSizes(const Range* range, int n, uint64_t* sizes) {
} }
std::vector<std::pair<uint64_t,std::pair<uint64_t,uint64_t>>> DBImpl::WriteValueLog(std::vector<Slice> values){ std::vector<std::pair<uint64_t,std::pair<uint64_t,uint64_t>>> DBImpl::WriteValueLog(std::vector<Slice> values){
//lock //lock
std::string file_name_=std::to_string(valuelogfile_number_)+".VALUELOG";
// std::vector<std::pair<uint64_t,std::pair<uint64_t,uint64_t>>> res;
// for(int i=0;i<values.size();i++){
// int len=values[i].size();
// valuelogfile_->Append(values[i]);
// res.push_back({valuelogfile_number_,{valuelogfile_offset,len}});
// valuelogfile_offset+=len;
// }
// //unlock
// valuelogfile_->Flush();
// return res;
std::string file_name_=ValueLogFileName(dbname_,valuelogfile_number_);
std::ofstream valueFile(file_name_, std::ios::app | std::ios::binary); std::ofstream valueFile(file_name_, std::ios::app | std::ios::binary);
if (!valueFile.is_open()) { if (!valueFile.is_open()) {
assert(0); assert(0);
@ -1519,16 +1581,30 @@ std::vector>> DBImpl::WriteValue
valueFile.close(); valueFile.close();
return res; return res;
} }
// Append each value blob, in order, to the compaction's dedicated
// value-log file. Offsets into this file are tracked by the caller
// (CompactionState::valuelog_offset).
// NOTE(review): Append()'s Status is discarded because this method
// returns void; write failures are presumably caught later by the
// Flush/Sync/Close sequence in FinishCompactionOutputFile — confirm.
void DBImpl::writeValueLogForCompaction(WritableFile* target_file,std::vector<Slice> values){
    for (const Slice& blob : values) {
        target_file->Append(blob);
    }
}
void DBImpl::addNewValueLog(){ void DBImpl::addNewValueLog(){
//lock //lock
// if(valuelogfile_){
// valuelogfile_->Sync();
// valuelogfile_->Close();
// delete valuelogfile_;
// }
valuelogfile_number_=versions_->NewFileNumber(); valuelogfile_number_=versions_->NewFileNumber();
// valuelogfile_offset=0;
// std::string file_name_=ValueLogFileName(dbname_,valuelogfile_number_);
// env_->NewWritableFile(file_name_,&valuelogfile_);
//unlock //unlock
} }
Status DBImpl::ReadValueLog(uint64_t file_id, uint64_t offset,uint64_t len,Slice* value){ Status DBImpl::ReadValueLog(uint64_t file_id, uint64_t offset,uint64_t len,Slice* value){
//lock_shared //lock_shared
std::string file_name_=std::to_string(valuelogfile_number_)+".VALUELOG";
std::string file_name_=ValueLogFileName(dbname_,file_id);
//std::cout<<file_name_<<" "<<offset<<" "<<len<<std::endl;
std::ifstream inFile(file_name_, std::ios::in | std::ios::binary); std::ifstream inFile(file_name_, std::ios::in | std::ios::binary);
if (!inFile.is_open()) { if (!inFile.is_open()) {
std::cerr << "Failed to open file for writing!" << std::endl; std::cerr << "Failed to open file for writing!" << std::endl;
@ -1538,9 +1614,8 @@ Status DBImpl::ReadValueLog(uint64_t file_id, uint64_t offset,uint64_t len,Slice
char *value_buf=new char[len]; char *value_buf=new char[len];
inFile.read(value_buf,len); inFile.read(value_buf,len);
inFile.close(); inFile.close();
value=new Slice(value_buf,len);
*value=Slice(value_buf,len);
return Status::OK(); return Status::OK();
//unlock_shared
} }
// Default implementations of convenience methods that subclasses of DB // Default implementations of convenience methods that subclasses of DB
@ -1581,6 +1656,7 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
impl->log_ = new log::Writer(lfile); impl->log_ = new log::Writer(lfile);
impl->mem_ = new MemTable(impl->internal_comparator_); impl->mem_ = new MemTable(impl->internal_comparator_);
impl->mem_->Ref(); impl->mem_->Ref();
impl->addNewValueLog();
} }
} }
if (s.ok() && save_manifest) { if (s.ok() && save_manifest) {

+ 6
- 2
db/db_impl.h View File

@ -63,7 +63,9 @@ class DBImpl : public DB {
void GetApproximateSizes(const Range* range, int n, uint64_t* sizes) override; void GetApproximateSizes(const Range* range, int n, uint64_t* sizes) override;
void CompactRange(const Slice* begin, const Slice* end) override; void CompactRange(const Slice* begin, const Slice* end) override;
std::vector<std::pair<uint64_t,std::pair<uint64_t,uint64_t>>> WriteValueLog(std::vector<Slice> value)override; std::vector<std::pair<uint64_t,std::pair<uint64_t,uint64_t>>> WriteValueLog(std::vector<Slice> value)override;
void addNewValueLog()override;
void writeValueLogForCompaction(WritableFile* target_file,std::vector<Slice> value);
void addNewValueLog()override EXCLUSIVE_LOCKS_REQUIRED(mutex_);;
std::pair<WritableFile*,uint64_t> getNewValuelog();//use for compaction
Status ReadValueLog(uint64_t file_id, uint64_t offset,uint64_t len,Slice* value)override; Status ReadValueLog(uint64_t file_id, uint64_t offset,uint64_t len,Slice* value)override;
// Extra methods (for testing) that are not in the public DB interface // Extra methods (for testing) that are not in the public DB interface
@ -196,8 +198,10 @@ class DBImpl : public DB {
MemTable* imm_ GUARDED_BY(mutex_); // Memtable being compacted MemTable* imm_ GUARDED_BY(mutex_); // Memtable being compacted
std::atomic<bool> has_imm_; // So bg thread can detect non-null imm_ std::atomic<bool> has_imm_; // So bg thread can detect non-null imm_
WritableFile* logfile_; WritableFile* logfile_;
WritableFile* valuelogfile_;
int valuelogfile_offset=0;
uint64_t logfile_number_; uint64_t logfile_number_;
uint64_t valuelogfile_number_ GUARDED_BY(mutex_);
uint64_t valuelogfile_number_;
log::Writer* log_; log::Writer* log_;
uint32_t seed_ GUARDED_BY(mutex_); // For sampling. uint32_t seed_ GUARDED_BY(mutex_); // For sampling.

+ 5
- 0
db/filename.cc View File

@ -30,6 +30,11 @@ std::string LogFileName(const std::string& dbname, uint64_t number) {
return MakeFileName(dbname, number, "log"); return MakeFileName(dbname, number, "log");
} }
// Return the name of the value-log file with the specified number
// in the db named by "dbname". The result will be prefixed with
// "dbname". Mirrors LogFileName/TableFileName above.
std::string ValueLogFileName(const std::string& dbname, uint64_t number){
    assert(number > 0);  // file number 0 is reserved / never valid
    const char* const kValueLogSuffix = "valuelog";
    return MakeFileName(dbname, number, kValueLogSuffix);
}
std::string TableFileName(const std::string& dbname, uint64_t number) { std::string TableFileName(const std::string& dbname, uint64_t number) {
assert(number > 0); assert(number > 0);
return MakeFileName(dbname, number, "ldb"); return MakeFileName(dbname, number, "ldb");

+ 2
- 0
db/filename.h View File

@ -33,6 +33,8 @@ enum FileType {
// "dbname". // "dbname".
std::string LogFileName(const std::string& dbname, uint64_t number); std::string LogFileName(const std::string& dbname, uint64_t number);
std::string ValueLogFileName(const std::string& dbname, uint64_t number);
// Return the name of the sstable with the specified number // Return the name of the sstable with the specified number
// in the db named by "dbname". The result will be prefixed with // in the db named by "dbname". The result will be prefixed with
// "dbname". // "dbname".

+ 5
- 2
db/write_batch.cc View File

@ -131,6 +131,10 @@ class ValueLogInserter : public WriteBatch::Handler {
public: public:
WriteBatch writeBatch_; WriteBatch writeBatch_;
DB* db_; DB* db_;
ValueLogInserter(WriteBatch writeBatch,DB* db){
writeBatch_=writeBatch;
db_=db;
}
void Put(const Slice& key, const Slice& value) override { void Put(const Slice& key, const Slice& value) override {
Slice new_value; Slice new_value;
@ -165,8 +169,7 @@ Status WriteBatchInternal::InsertInto(const WriteBatch* b, MemTable* memtable) {
} }
Status WriteBatchInternal::ConverToValueLog(WriteBatch* b,DB* db_){ Status WriteBatchInternal::ConverToValueLog(WriteBatch* b,DB* db_){
ValueLogInserter inserter;
inserter.writeBatch_=WriteBatch();
ValueLogInserter inserter(WriteBatch(),db_);
auto res=b->Iterate(&inserter); auto res=b->Iterate(&inserter);
*b=inserter.writeBatch_; *b=inserter.writeBatch_;
return res; return res;

+ 99
- 66
test/test.cpp View File

@ -8,9 +8,6 @@ using namespace leveldb;
using Field=std::pair<Slice,Slice>; using Field=std::pair<Slice,Slice>;
using FieldArray=std::vector<std::pair<Slice, Slice>>; using FieldArray=std::vector<std::pair<Slice, Slice>>;
constexpr int value_size = 2048;
constexpr int data_size = 128 << 20;
Status OpenDB(std::string dbName, DB **db) { Status OpenDB(std::string dbName, DB **db) {
Options options; Options options;
options.create_if_missing = true; options.create_if_missing = true;
@ -26,9 +23,13 @@ bool CompareFieldArray(const FieldArray &a, const FieldArray &b) {
} }
bool CompareKey(const std::vector<std::string> a, std::vector<std::string> b) { bool CompareKey(const std::vector<std::string> a, std::vector<std::string> b) {
if (a.size() != b.size()) return false;
if (a.size() != b.size()){
return false;
}
for (size_t i = 0; i < a.size(); ++i) { for (size_t i = 0; i < a.size(); ++i) {
if (a[i] != b[i]) return false;
if (a[i] != b[i]){
return false;
}
} }
return true; return true;
} }
@ -43,7 +44,7 @@ std::string SerializeValue(const FieldArray& fields){
return res_; return res_;
} }
// 反序列化为字段数组
// 反序列化为字段数组
void DeserializeValue(const std::string& value_str,FieldArray* res){ void DeserializeValue(const std::string& value_str,FieldArray* res){
Slice slice=Slice(value_str.c_str()); Slice slice=Slice(value_str.c_str());
uint64_t siz; uint64_t siz;
@ -83,75 +84,107 @@ Status Get_keys_by_field(DB *db,const ReadOptions& options, const Field field,st
return Status::OK(); return Status::OK();
} }
TEST(Test, CheckGetFields) {
DB *db;
WriteOptions writeOptions;
ReadOptions readOptions;
if(OpenDB("testdb_for_XOY", &db).ok() == false) {
std::cerr << "open db failed" << std::endl;
abort();
}
std::string key1 = "k_1";
std::string key2 = "k_2";
// TEST(Test, CheckGetFields) {
// DB *db;
// WriteOptions writeOptions;
// ReadOptions readOptions;
// if(OpenDB("testdb_for_XOY", &db).ok() == false) {
// std::cerr << "open db failed" << std::endl;
// abort();
// }
// std::string key1 = "k_1";
// std::string key2 = "k_2";
FieldArray fields1 = {
{"name", "Customer#000000001"},
{"address", "IVhzIApeRb"},
{"phone", "25-989-741-2988"}
};
FieldArray fields2 = {
{"name", "Customer#000000001"},
{"address", "abc"},
{"phone", "def"}
};
auto value1=SerializeValue(fields1);
auto value2=SerializeValue(fields2);
db->Put(WriteOptions(), key1, value1);
db->Put(WriteOptions(), key2, value2);
// 读取并反序列化
std::string value_ret;
FieldArray res1;
db->Get(ReadOptions(), key1, &value_ret);
DeserializeValue(value_ret, &res1);
for(auto pr:res1){
std::cout<<std::string(pr.first.data(),pr.first.size())<<" "<<std::string(pr.second.data(),pr.second.size())<<"\n";
}
ASSERT_TRUE(CompareFieldArray(fields1, res1));
// FieldArray fields1 = {
// {"name", "Customer#000000001"},
// {"address", "IVhzIApeRb"},
// {"phone", "25-989-741-2988"}
// };
// FieldArray fields2 = {
// {"name", "Customer#000000001"},
// {"address", "abc"},
// {"phone", "def"}
// };
// auto value1=SerializeValue(fields1);
// auto value2=SerializeValue(fields2);
// db->Put(WriteOptions(), key1, value1);
// db->Put(WriteOptions(), key2, value2);
// // 读取并反序列化
// std::string value_ret;
// FieldArray res1;
// db->Get(ReadOptions(), key1, &value_ret);
// DeserializeValue(value_ret, &res1);
// for(auto pr:res1){
// std::cout<<std::string(pr.first.data(),pr.first.size())<<" "<<std::string(pr.second.data(),pr.second.size())<<"\n";
// }
// ASSERT_TRUE(CompareFieldArray(fields1, res1));
std::cout<<"get serialized value done"<<std::endl;
delete db;
// std::cout<<"get serialized value done"<<std::endl;
// delete db;
// }
// TEST(Test, CheckSearchKey) {
// DB *db;
// ReadOptions readOptions;
// if(OpenDB("testdb_for_XOY", &db).ok() == false) {
// std::cerr << "open db failed" << std::endl;
// abort();
// }
// std::string key1 = "k_1";
// std::string key2 = "k_2";
// FieldArray fields1 = {
// {"name", "Customer#000000001"},
// {"address", "IVhzIApeRb"},
// {"phone", "25-989-741-2988"}
// };
// std::vector<std::string> keys = {key1, key2};
// std::vector<std::string> key_res;
// Get_keys_by_field(db,ReadOptions(),fields1[0],&key_res);
// for(auto s:key_res)std::cout<<s<<"\n";
// ASSERT_TRUE(CompareKey(key_res, keys));
}
// std::cout<<"get key by field done"<<std::endl;
// delete db;
// }
TEST(Test, CheckSearchKey) {
TEST(Test, LARGE_DATA_COMPACT_TEST) {
DB *db; DB *db;
WriteOptions writeOptions;
ReadOptions readOptions; ReadOptions readOptions;
if(OpenDB("testdb_for_XOY", &db).ok() == false) {
if(OpenDB("testdb_for_XOY_large", &db).ok() == false) {
std::cerr << "open db failed" << std::endl; std::cerr << "open db failed" << std::endl;
abort(); abort();
} }
std::string key1 = "k_1";
std::string key2 = "k_2";
FieldArray fields1 = {
{"name", "Customer#000000001"},
{"address", "IVhzIApeRb"},
{"phone", "25-989-741-2988"}
};
std::vector<std::string> keys = {key1, key2};
std::vector<std::string> key_res;
Get_keys_by_field(db,ReadOptions(),fields1[0],&key_res);
for(auto s:key_res)std::cout<<s<<"\n";
ASSERT_TRUE(CompareKey(key_res, keys));
std::cout<<"get key by field done"<<std::endl;
std::vector<std::string> values;
for(int i=0;i<1000;i++){
std::string key=std::to_string(i);
std::string value;
for(int j=0;j<1000;j++){
value+=std::to_string(i);
}
values.push_back(value);
db->Put(writeOptions,key,value);
}
for(int i=0;i<1000;i++){
std::string key=std::to_string(i);
std::string value;
Status s=db->Get(readOptions,key,&value);
assert(s.ok());
if(values[i]!=value){
std::cout<<value.size()<<std::endl;
assert(0);
}
ASSERT_TRUE(values[i]==value);
}
delete db; delete db;
} }

Loading…
Cancel
Save