Browse Source

v3 compaction unfinish

xxy
xxy 9 months ago
parent
commit
60c257b829
6 changed files with 168 additions and 65 deletions
  1. +1
    -0
      db/builder.cc
  2. +142
    -55
      db/db_impl.cc
  3. +5
    -2
      db/db_impl.h
  4. +4
    -3
      db/db_iter.cc
  5. +3
    -2
      db/write_batch.cc
  6. +13
    -3
      include/leveldb/db.h

+ 1
- 0
db/builder.cc View File

@ -28,6 +28,7 @@ Status BuildTable(const std::string& dbname, Env* env, const Options& options,
return s; return s;
} }
// 如果第一个字节是 0x01,它会移除这个前缀,并尝试从剩下的数据中解析出 value
{ {
auto tmp_value=iter->value(); auto tmp_value=iter->value();
if(tmp_value.data()[0]==(char)(0x01)){ if(tmp_value.data()[0]==(char)(0x01)){

+ 142
- 55
db/db_impl.cc View File

@ -839,8 +839,8 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) {
compact->builder = new TableBuilder(options_, compact->outfile); compact->builder = new TableBuilder(options_, compact->outfile);
} }
compact->valuelog_offset=0;
// compaction 后的新 valuelog
compact->valuelog_offset=0; // why
s = env_->NewWritableFile(ValueLogFileName(dbname_,compact->valuelog_file_id), &compact->valuelogfile); s = env_->NewWritableFile(ValueLogFileName(dbname_,compact->valuelog_file_id), &compact->valuelogfile);
return s; return s;
@ -878,6 +878,7 @@ Status DBImpl::FinishCompactionOutputFile(CompactionState* compact,
} }
delete compact->outfile; delete compact->outfile;
compact->outfile = nullptr; compact->outfile = nullptr;
// value log 落盘
if (s.ok()) { if (s.ok()) {
s = compact->valuelogfile->Flush(); s = compact->valuelogfile->Flush();
} }
@ -955,9 +956,13 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
std::string current_user_key; std::string current_user_key;
bool has_current_user_key = false; bool has_current_user_key = false;
SequenceNumber last_sequence_for_key = kMaxSequenceNumber; SequenceNumber last_sequence_for_key = kMaxSequenceNumber;
// 记录 当前和下一层 level 中被合并的 files
for (int which = 0; which < 2; which++) { for (int which = 0; which < 2; which++) {
for (int i = 0; i < compact->compaction->num_input_files(which); i++) { for (int i = 0; i < compact->compaction->num_input_files(which); i++) {
if(compact->compaction->input(which, i)->valuelog_id)oldvaluelog_ids[compact->compaction->input(which, i)->number]=compact->compaction->input(which, i)->valuelog_id;
auto tmp_file=compact->compaction->input(which, i);
if(tmp_file->valuelog_id){
oldvaluelog_ids[tmp_file->number]=tmp_file->valuelog_id;
}
} }
} }
while (input->Valid() && !shutting_down_.load(std::memory_order_acquire)) { while (input->Valid() && !shutting_down_.load(std::memory_order_acquire)) {
@ -1044,31 +1049,39 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
Slice old_value=input->value(); Slice old_value=input->value();
Slice new_value; Slice new_value;
std::string buf=""; std::string buf="";
if(old_value.size()==0||old_value.data()[0]==(char)(0x00)){//when it is a deletion, input->value() will be ""
if(old_value.size()==0||old_value.data()[0]==(char)(0x00)){
// when it is a deletion, input->value() will be ""
new_value=old_value; new_value=old_value;
} }
else{ else{
// not delete
old_value.remove_prefix(1); old_value.remove_prefix(1);
uint64_t file_id,valuelog_offset,valuelog_len;
// put value_len into value_log
int value_offset=sizeof(uint64_t)*2;// 16
// uint64_t file_id,valuelog_offset,valuelog_len;
uint64_t file_id,valuelog_offset;
bool res=GetVarint64(&old_value,&file_id); bool res=GetVarint64(&old_value,&file_id);
if(!res)assert(0); if(!res)assert(0);
res=GetVarint64(&old_value,&valuelog_offset); res=GetVarint64(&old_value,&valuelog_offset);
if(!res)assert(0); if(!res)assert(0);
res=GetVarint64(&old_value,&valuelog_len);
if(!res)assert(0);
Status s=ReadValueLog(file_id,valuelog_offset,valuelog_len,&new_value);
// res=GetVarint64(&old_value,&valuelog_len);
// if(!res)assert(0);
// Status s=ReadValueLog(file_id,valuelog_offset,valuelog_len,&new_value);
Status s=ReadValueLog(file_id,valuelog_offset,&new_value);
assert(s.ok()); assert(s.ok());
writeValueLogForCompaction(compact->valuelogfile,{new_value}); writeValueLogForCompaction(compact->valuelogfile,{new_value});
buf+=(char)(0x01); buf+=(char)(0x01);
PutVarint64(&buf,compact->valuelog_file_id); PutVarint64(&buf,compact->valuelog_file_id);
PutVarint64(&buf,compact->valuelog_offset); PutVarint64(&buf,compact->valuelog_offset);
PutVarint64(&buf,valuelog_len);
compact->valuelog_offset+=valuelog_len;
// PutVarint64(&buf,valuelog_len);
compact->valuelog_offset+=value_offset;
delete []new_value.data(); delete []new_value.data();
new_value=Slice(buf); new_value=Slice(buf);
} }
compact->builder->Add(key, new_value); compact->builder->Add(key, new_value);
// 更新计数器
// compact->builder->add_log_count();
// Close output file if it is big enough // Close output file if it is big enough
if (compact->builder->FileSize() >= if (compact->builder->FileSize() >=
@ -1241,14 +1254,18 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key,
} }
Slice value_log_slice=Slice(value->c_str()+1,value->length()); Slice value_log_slice=Slice(value->c_str()+1,value->length());
Slice new_value; Slice new_value;
uint64_t file_id,valuelog_offset,valuelog_len;
// put value_len into value_log
int value_offset=sizeof(uint64_t)*2;// 16
// uint64_t file_id,valuelog_offset,valuelog_len;
uint64_t file_id,valuelog_offset;
bool res=GetVarint64(&value_log_slice,&file_id); bool res=GetVarint64(&value_log_slice,&file_id);
if(!res)return Status::Corruption("can't decode file id"); if(!res)return Status::Corruption("can't decode file id");
res=GetVarint64(&value_log_slice,&valuelog_offset); res=GetVarint64(&value_log_slice,&valuelog_offset);
if(!res)return Status::Corruption("can't decode valuelog offset"); if(!res)return Status::Corruption("can't decode valuelog offset");
res=GetVarint64(&value_log_slice,&valuelog_len);
if(!res)return Status::Corruption("can't decode valuelog len");
ReadValueLog(file_id,valuelog_offset,valuelog_len,&new_value);
// res=GetVarint64(&value_log_slice,&valuelog_len);
// if(!res)return Status::Corruption("can't decode valuelog len");
// ReadValueLog(file_id,valuelog_offset,valuelog_len,&new_value);
ReadValueLog(file_id,valuelog_offset,&new_value);
*value=std::string(new_value.data(),new_value.size()); *value=std::string(new_value.data(),new_value.size());
delete []new_value.data(); delete []new_value.data();
return s; return s;
@ -1574,35 +1591,66 @@ void DBImpl::GetApproximateSizes(const Range* range, int n, uint64_t* sizes) {
v->Unref(); v->Unref();
} }
std::vector<std::pair<uint64_t,std::pair<uint64_t,uint64_t>>> DBImpl::WriteValueLog(std::vector<Slice> values){
//lock
// std::vector<std::pair<uint64_t,std::pair<uint64_t,uint64_t>>> res;
// for(int i=0;i<values.size();i++){
// int len=values[i].size();
// valuelogfile_->Append(values[i]);
// res.push_back({valuelogfile_number_,{valuelogfile_offset,len}});
// valuelogfile_offset+=len;
// }
// //unlock
// valuelogfile_->Flush();
// return res;
std::string file_name_=ValueLogFileName(dbname_,valuelogfile_number_);
// std::vector<std::pair<uint64_t,std::pair<uint64_t,uint64_t>>> DBImpl::WriteValueLog(std::vector<Slice> values){
// std::string file_name_=ValueLogFileName(dbname_,valuelogfile_number_);
// std::ofstream valueFile(file_name_, std::ios::app | std::ios::binary);
// if (!valueFile.is_open()) {
// assert(0);
// }
// uint64_t offset=valueFile.tellp();
// std::vector<std::pair<uint64_t,std::pair<uint64_t,uint64_t>>> res;
// for(int i=0;i<values.size();i++){
// int len=values[i].size();
// valueFile.write(values[i].data(),len);
// res.push_back({valuelogfile_number_,{offset,len}});
// offset+=len;
// }
// //unlock
// valueFile.close();
// return res;
// }
std::vector<std::pair<uint64_t, uint64_t>> DBImpl::WriteValueLog(std::vector<Slice> values) {
std::string file_name_ = ValueLogFileName(dbname_, valuelogfile_number_);
std::ofstream valueFile(file_name_, std::ios::app | std::ios::binary); std::ofstream valueFile(file_name_, std::ios::app | std::ios::binary);
if (!valueFile.is_open()) { if (!valueFile.is_open()) {
assert(0);
assert(0);
}
uint64_t offset = valueFile.tellp();
std::vector<std::pair<uint64_t, uint64_t>> res;
for (const auto& slice : values) {
uint64_t len = slice.size();
// 先写入长度
valueFile.write(reinterpret_cast<const char*>(&len), sizeof(uint64_t));
if (!valueFile.good()) {
valueFile.close();
return {}; // 写入长度失败,返回空结果
} }
uint64_t offset=valueFile.tellp();
std::vector<std::pair<uint64_t,std::pair<uint64_t,uint64_t>>> res;
for(int i=0;i<values.size();i++){
int len=values[i].size();
valueFile.write(values[i].data(),len);
res.push_back({valuelogfile_number_,{offset,len}});
offset+=len;
// 再写入实际数据
valueFile.write(slice.data(), len);
if (!valueFile.good()) {
valueFile.close();
return {}; // 写入数据失败,返回空结果
}
// 记录 file_id 和 offset
res.push_back({valuelogfile_number_, offset});
// 更新偏移量,包括长度信息
offset += sizeof(uint64_t) + len;
} }
//unlock
// 解锁资源或进行其他清理操作
valueFile.close(); valueFile.close();
return res; return res;
} }
void DBImpl::writeValueLogForCompaction(WritableFile* target_file,std::vector<Slice> values){ void DBImpl::writeValueLogForCompaction(WritableFile* target_file,std::vector<Slice> values){
for(int i=0;i<values.size();i++){ for(int i=0;i<values.size();i++){
target_file->Append(values[i]); target_file->Append(values[i]);
@ -1610,34 +1658,73 @@ void DBImpl::writeValueLogForCompaction(WritableFile* target_file,std::vector
} }
void DBImpl::addNewValueLog(){ void DBImpl::addNewValueLog(){
//lock
// if(valuelogfile_){
// valuelogfile_->Sync();
// valuelogfile_->Close();
// delete valuelogfile_;
// }
valuelogfile_number_=versions_->NewFileNumber(); valuelogfile_number_=versions_->NewFileNumber();
// valuelogfile_offset=0;
// std::string file_name_=ValueLogFileName(dbname_,valuelogfile_number_);
// env_->NewWritableFile(file_name_,&valuelogfile_);
//unlock
} }
Status DBImpl::ReadValueLog(uint64_t file_id, uint64_t offset,uint64_t len,Slice* value){
// Status DBImpl::ReadValueLog(uint64_t file_id, uint64_t offset,uint64_t len,Slice* value){
// //lock_shared
// Status s=Status::OK();
// std::string file_name_=ValueLogFileName(dbname_,file_id);
// //std::cout<<file_name_<<" "<<offset<<" "<<len<<std::endl;
// std::ifstream inFile(file_name_, std::ios::in | std::ios::binary);
// if (!inFile.is_open()) {
// std::cerr << "Failed to open file for writing!"<<file_id<<" "<<offset<<" "<<len<< std::endl;
// return Status::Corruption("Failed to open file for writing!");
// }
// inFile.seekg(offset);
// char *value_buf=new char[len];
// inFile.read(value_buf,len);
// // check available
// // s=check_value_available(value_buf);
// inFile.close();
// *value=Slice(value_buf,len);
// return s;
// }
Status DBImpl::ReadValueLog(uint64_t file_id, uint64_t offset, Slice* value) {
//lock_shared //lock_shared
std::string file_name_=ValueLogFileName(dbname_,file_id);
//std::cout<<file_name_<<" "<<offset<<" "<<len<<std::endl;
Status s = Status::OK();
std::string file_name_ = ValueLogFileName(dbname_, file_id);
// Open the file in binary mode for reading
std::ifstream inFile(file_name_, std::ios::in | std::ios::binary); std::ifstream inFile(file_name_, std::ios::in | std::ios::binary);
if (!inFile.is_open()) { if (!inFile.is_open()) {
std::cerr << "Failed to open file for writing!"<<file_id<<" "<<offset<<" "<<len<< std::endl;
return Status::Corruption("Failed to open file for writing!");
std::cerr << "Failed to open file: " << file_name_ << " for reading!" << std::endl;
return Status::Corruption("Failed to open file for reading!");
} }
// Seek to the position of len
inFile.seekg(offset); inFile.seekg(offset);
char *value_buf=new char[len];
inFile.read(value_buf,len);
// Read the length of the value
uint64_t len;
inFile.read(reinterpret_cast<char*>(&len), sizeof(uint64_t));
if (!inFile.good()) {
inFile.close();
return Status::Corruption("Failed to read length from file!");
}
// Now seek to the actual data position and read the value
inFile.seekg(offset + sizeof(uint64_t));
char* value_buf = new char[len];
inFile.read(value_buf, len);
if (!inFile.good()) {
delete[] value_buf;
inFile.close();
return Status::Corruption("Failed to read value from file!");
}
// Close the file after reading
inFile.close(); inFile.close();
*value=Slice(value_buf,len);
return Status::OK();
// Assign the read data to the Slice
*value = Slice(value_buf, len);
// Clean up allocated buffer
// should also do in v2
delete[] value_buf;
return s;
} }
// Default implementations of convenience methods that subclasses of DB // Default implementations of convenience methods that subclasses of DB

+ 5
- 2
db/db_impl.h View File

@ -63,11 +63,14 @@ class DBImpl : public DB {
bool GetProperty(const Slice& property, std::string* value) override; bool GetProperty(const Slice& property, std::string* value) override;
void GetApproximateSizes(const Range* range, int n, uint64_t* sizes) override; void GetApproximateSizes(const Range* range, int n, uint64_t* sizes) override;
void CompactRange(const Slice* begin, const Slice* end) override; void CompactRange(const Slice* begin, const Slice* end) override;
std::vector<std::pair<uint64_t,std::pair<uint64_t,uint64_t>>> WriteValueLog(std::vector<Slice> value)override;
// std::vector<std::pair<uint64_t,std::pair<uint64_t,uint64_t>>> WriteValueLog(std::vector<Slice> value)override;
std::vector<std::pair<uint64_t,uint64_t>> WriteValueLog(std::vector<Slice> value)override;
void writeValueLogForCompaction(WritableFile* target_file,std::vector<Slice> value); void writeValueLogForCompaction(WritableFile* target_file,std::vector<Slice> value);
void addNewValueLog()override EXCLUSIVE_LOCKS_REQUIRED(mutex_);; void addNewValueLog()override EXCLUSIVE_LOCKS_REQUIRED(mutex_);;
std::pair<WritableFile*,uint64_t> getNewValuelog();//use for compaction std::pair<WritableFile*,uint64_t> getNewValuelog();//use for compaction
Status ReadValueLog(uint64_t file_id, uint64_t offset,uint64_t len,Slice* value)override;
// Status ReadValueLog(uint64_t file_id, uint64_t offset,uint64_t len,Slice* value)override;
Status ReadValueLog(uint64_t file_id, uint64_t offset,Slice* value)override;
// Extra methods (for testing) that are not in the public DB interface // Extra methods (for testing) that are not in the public DB interface

+ 4
- 3
db/db_iter.cc View File

@ -79,9 +79,10 @@ class DBIter : public Iterator {
if(!res)assert(0); if(!res)assert(0);
res=GetVarint64(&tmp_value,&valuelog_offset); res=GetVarint64(&tmp_value,&valuelog_offset);
if(!res)assert(0); if(!res)assert(0);
res=GetVarint64(&tmp_value,&valuelog_len);
if(!res)assert(0);
db_->ReadValueLog(file_id,valuelog_offset,valuelog_len,&tmp_value);
// res=GetVarint64(&tmp_value,&valuelog_len);
// if(!res)assert(0);
// db_->ReadValueLog(file_id,valuelog_offset,valuelog_len,&tmp_value);
db_->ReadValueLog(file_id,valuelog_offset,&tmp_value);
return tmp_value; return tmp_value;
} }
Status status() const override { Status status() const override {

+ 3
- 2
db/write_batch.cc View File

@ -149,8 +149,9 @@ class ValueLogInserter : public WriteBatch::Handler {
v.push_back(value); v.push_back(value);
auto res=db_->WriteValueLog(v); auto res=db_->WriteValueLog(v);
PutVarint64(&buf,res[0].first); PutVarint64(&buf,res[0].first);
PutVarint64(&buf,res[0].second.first);
PutVarint64(&buf,res[0].second.second);
// PutVarint64(&buf,res[0].second.first);
// PutVarint64(&buf,res[0].second.second);
PutVarint64(&buf,res[0].second);
} }
new_value=Slice(buf); new_value=Slice(buf);
writeBatch_.Put(key,new_value); writeBatch_.Put(key,new_value);

+ 13
- 3
include/leveldb/db.h View File

@ -102,19 +102,29 @@ class LEVELDB_EXPORT DB {
// virtual Status Get_keys_by_field(const ReadOptions& options, const Field field,std::vector<std::string> *keys); // virtual Status Get_keys_by_field(const ReadOptions& options, const Field field,std::vector<std::string> *keys);
virtual std::vector<std::pair<uint64_t,std::pair<uint64_t,uint64_t>>> WriteValueLog(std::vector<Slice> value){
// virtual std::vector<std::pair<uint64_t,std::pair<uint64_t,uint64_t>>> WriteValueLog(std::vector<Slice> value){
// assert(0);
// std::vector<std::pair<uint64_t,std::pair<uint64_t,uint64_t>>> v;
// return v;
// }
virtual std::vector<std::pair<uint64_t,uint64_t>> WriteValueLog(std::vector<Slice> value){
assert(0); assert(0);
std::vector<std::pair<uint64_t,std::pair<uint64_t,uint64_t>>> v;
std::vector<std::pair<uint64_t,uint64_t>> v;
return v; return v;
} }
virtual void addNewValueLog(){assert(0);} virtual void addNewValueLog(){assert(0);}
virtual Status ReadValueLog(uint64_t file_id, uint64_t offset,uint64_t len,Slice* value){
// virtual Status ReadValueLog(uint64_t file_id, uint64_t offset,uint64_t len,Slice* value){
// assert(0); // Not implemented
// return Status::Corruption("not imp");
// }
virtual Status ReadValueLog(uint64_t file_id, uint64_t offset,Slice* value){
assert(0); // Not implemented assert(0); // Not implemented
return Status::Corruption("not imp"); return Status::Corruption("not imp");
} }
// Return a heap-allocated iterator over the contents of the database. // Return a heap-allocated iterator over the contents of the database.
// The result of NewIterator() is initially invalid (caller must // The result of NewIterator() is initially invalid (caller must
// call one of the Seek methods on the iterator before using it). // call one of the Seek methods on the iterator before using it).

Loading…
Cancel
Save