
v3 except gc

xxy, 9 months ago
commit 05fd39cd6b
1 changed file with 13 additions and 103 deletions
db/db_impl.cc  +13  -103

@@ -57,7 +57,6 @@ struct DBImpl::CompactionState {
struct Output {
uint64_t number;
uint64_t file_size;
uint64_t valuelog_id;
InternalKey smallest, largest;
};
@@ -84,10 +83,6 @@ struct DBImpl::CompactionState {
WritableFile* outfile;
TableBuilder* builder;
WritableFile* valuelogfile;
uint64_t valuelog_offset=0;
uint64_t valuelog_file_id=0;
uint64_t total_bytes;
};
@@ -281,10 +276,6 @@ void DBImpl::RemoveObsoleteFiles() {
}
Log(options_.info_log, "Delete type=%d #%lld\n", static_cast<int>(type),
static_cast<unsigned long long>(number));
if(oldvaluelog_ids.count(number)){
std::string valuelog_filename=ValueLogFileName(dbname_,oldvaluelog_ids[number]);
env_->RemoveFile(valuelog_filename);
}
}
}
}
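
For context, the deletion logic here keys the value-log cleanup off a table-file-number to value-log-id map that DoCompactionWork fills in (see the hunk further down). A minimal sketch of that bookkeeping, assuming the map is a plain std::map<uint64_t, uint64_t>; RememberOldValueLog is an illustrative name, not a function in this tree.

// Sketch only: the bookkeeping implied by oldvaluelog_ids in this hunk and in
// DoCompactionWork below. Maps an obsolete table file number to the value-log
// id it referenced, so RemoveObsoleteFiles can delete that log alongside it.
#include <cstdint>
#include <map>

std::map<uint64_t, uint64_t> oldvaluelog_ids;

// Called while collecting compaction inputs: remember each table's value log.
void RememberOldValueLog(uint64_t table_number, uint64_t valuelog_id) {
  if (valuelog_id != 0) oldvaluelog_ids[table_number] = valuelog_id;
}
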
@@ -546,7 +537,7 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
level = base->PickLevelForMemTableOutput(min_user_key, max_user_key);
}
edit->AddFile(level, meta.number, meta.file_size, meta.smallest,
meta.largest,meta.valuelog_id);
meta.largest);
}
CompactionStats stats;
@@ -750,7 +741,7 @@ void DBImpl::BackgroundCompaction() {
FileMetaData* f = c->input(0, 0);
c->edit()->RemoveFile(c->level(), f->number);
c->edit()->AddFile(c->level() + 1, f->number, f->file_size, f->smallest,
f->largest,f->valuelog_id);
f->largest);
status = versions_->LogAndApply(c->edit(), &mutex_);
if (!status.ok()) {
RecordBackgroundError(status);
@@ -824,11 +815,7 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) {
out.number = file_number;
out.smallest.Clear();
out.largest.Clear();
compact->valuelog_file_id=versions_->NewFileNumber();
out.valuelog_id=compact->valuelog_file_id;
compact->outputs.push_back(out);
mutex_.Unlock();
}
@@ -838,11 +825,6 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) {
if (s.ok()) {
compact->builder = new TableBuilder(options_, compact->outfile);
}
// new valuelog created for this compaction
compact->valuelog_offset=0; // why
s = env_->NewWritableFile(ValueLogFileName(dbname_,compact->valuelog_file_id), &compact->valuelogfile);
return s;
}
@@ -878,20 +860,6 @@ Status DBImpl::FinishCompactionOutputFile(CompactionState* compact,
}
delete compact->outfile;
compact->outfile = nullptr;
// flush the value log to disk
if (s.ok()) {
s = compact->valuelogfile->Flush();
}
if (s.ok()) {
s = compact->valuelogfile->Sync();
}
if (s.ok()) {
s = compact->valuelogfile->Close();
}
delete compact->valuelogfile;
compact->valuelogfile=nullptr;
compact->valuelog_file_id=0;
compact->valuelog_offset=0;
if (s.ok() && current_entries > 0) {
// Verify that the table is usable
@@ -922,7 +890,7 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) {
for (size_t i = 0; i < compact->outputs.size(); i++) {
const CompactionState::Output& out = compact->outputs[i];
compact->compaction->edit()->AddFile(level + 1, out.number, out.file_size,
out.smallest, out.largest,out.valuelog_id);
out.smallest, out.largest);
}
return versions_->LogAndApply(compact->compaction->edit(), &mutex_);
}
@@ -956,15 +924,6 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
std::string current_user_key;
bool has_current_user_key = false;
SequenceNumber last_sequence_for_key = kMaxSequenceNumber;
// record the files in the current and next level that are being merged
for (int which = 0; which < 2; which++) {
for (int i = 0; i < compact->compaction->num_input_files(which); i++) {
auto tmp_file=compact->compaction->input(which, i);
if(tmp_file->valuelog_id){
oldvaluelog_ids[tmp_file->number]=tmp_file->valuelog_id;
}
}
}
while (input->Valid() && !shutting_down_.load(std::memory_order_acquire)) {
// Prioritize immutable compaction work
if (has_imm_.load(std::memory_order_relaxed)) {
@@ -1045,43 +1004,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
compact->current_output()->smallest.DecodeFrom(key);
}
compact->current_output()->largest.DecodeFrom(key);
Slice old_value=input->value();
Slice new_value;
std::string buf="";
if(old_value.size()==0||old_value.data()[0]==(char)(0x00)){
// when it is a deletion, input->value() will be ""
new_value=old_value;
}
else{
// not delete
old_value.remove_prefix(1);
// put value_len into value_log
int value_offset=sizeof(uint64_t)*2;// 16
// uint64_t file_id,valuelog_offset,valuelog_len;
uint64_t file_id,valuelog_offset;
bool res=GetVarint64(&old_value,&file_id);
if(!res)assert(0);
res=GetVarint64(&old_value,&valuelog_offset);
if(!res)assert(0);
// res=GetVarint64(&old_value,&valuelog_len);
// if(!res)assert(0);
// Status s=ReadValueLog(file_id,valuelog_offset,valuelog_len,&new_value);
Status s=ReadValueLog(file_id,valuelog_offset,&new_value);
assert(s.ok());
writeValueLogForCompaction(compact->valuelogfile,{new_value});
buf+=(char)(0x01);
PutVarint64(&buf,compact->valuelog_file_id);
PutVarint64(&buf,compact->valuelog_offset);
// PutVarint64(&buf,valuelog_len);
compact->valuelog_offset+=value_offset;
delete []new_value.data();
new_value=Slice(buf);
}
compact->builder->Add(key, new_value);
// update the counter
// compact->builder->add_log_count();
compact->builder->Add(key, input->value());
// Close output file if it is big enough
if (compact->builder->FileSize() >=
@@ -1105,14 +1028,6 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
if (status.ok()) {
status = input->status();
}
// not completely correct, should be written in a new function, related to RemoveObsoleteFiles
// if(status.ok()){
// for(auto id:old_valuelog_ids){
// auto valuelog_filename=ValueLogFileName(dbname_,id);
// Status s=env_->RemoveFile(valuelog_filename);
// assert(s.ok());
// }
// }
delete input;
input = nullptr;
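
For reference, the value handling removed from DoCompactionWork above decoded each stored pointer and re-wrote the live value into the compaction's own value log. A minimal sketch of the tagged pointer format implied by that code, assuming leveldb's PutVarint64/GetVarint64 from util/coding.h; EncodeValuePointer and DecodeValuePointer are illustrative names, not functions in this tree.

// Sketch only: the tagged pointer format implied by the removed branch.
// Layout of an SSTable value that points into a value log:
//   [ 1-byte tag: 0x00 = plain/deletion, 0x01 = pointer ]
//   [ varint64 value-log file id ][ varint64 offset in that file ]
#include <cstdint>
#include <string>
#include "leveldb/slice.h"
#include "util/coding.h"  // PutVarint64 / GetVarint64

static std::string EncodeValuePointer(uint64_t file_id, uint64_t offset) {
  std::string buf;
  buf.push_back(static_cast<char>(0x01));  // tag: value lives in a value log
  leveldb::PutVarint64(&buf, file_id);
  leveldb::PutVarint64(&buf, offset);
  return buf;
}

static bool DecodeValuePointer(leveldb::Slice v, uint64_t* file_id, uint64_t* offset) {
  if (v.empty() || v[0] != static_cast<char>(0x01)) return false;  // not a pointer
  v.remove_prefix(1);
  return leveldb::GetVarint64(&v, file_id) && leveldb::GetVarint64(&v, offset);
}
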
@@ -1254,17 +1169,12 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key,
}
Slice value_log_slice=Slice(value->c_str()+1,value->length());
Slice new_value;
// put value_len into value_log
int value_offset=sizeof(uint64_t)*2;// 16
// uint64_t file_id,valuelog_offset,valuelog_len;
uint64_t file_id,valuelog_offset;
bool res=GetVarint64(&value_log_slice,&file_id);
if(!res)return Status::Corruption("can't decode file id");
res=GetVarint64(&value_log_slice,&valuelog_offset);
if(!res)return Status::Corruption("can't decode valuelog offset");
// res=GetVarint64(&value_log_slice,&valuelog_len);
// if(!res)return Status::Corruption("can't decode valuelog len");
// ReadValueLog(file_id,valuelog_offset,valuelog_len,&new_value);
ReadValueLog(file_id,valuelog_offset,&new_value);
*value=std::string(new_value.data(),new_value.size());
delete []new_value.data();
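
A usage note on the buffer ownership visible in this Get() hunk: ReadValueLog hands back a Slice over a buffer it allocated with new[], so the caller copies the bytes out and then frees the buffer itself. A minimal sketch of that contract; ConsumeResolvedValue is an illustrative helper, not part of this tree.

// Sketch of the caller-side contract in Get(): copy the resolved value out,
// then free the new[]-allocated buffer that ReadValueLog returned.
#include <string>
#include "leveldb/slice.h"

static void ConsumeResolvedValue(const leveldb::Slice& resolved, std::string* value) {
  value->assign(resolved.data(), resolved.size());  // copy into the user's string
  delete[] resolved.data();                         // buffer ownership stays with the caller
}
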
@@ -1628,20 +1538,19 @@ std::vector<std::pair<uint64_t, uint64_t>> DBImpl::WriteValueLog(std::vector<Slice>
valueFile.write(reinterpret_cast<const char*>(&len), sizeof(uint64_t));
if (!valueFile.good()) {
valueFile.close();
return {}; // failed to write the length, return an empty result
assert(0);
}
// then write the actual data
valueFile.write(slice.data(), len);
if (!valueFile.good()) {
valueFile.close();
return {}; // failed to write the data, return an empty result
assert(0);
}
// record the file_id and offset
res.push_back({valuelogfile_number_, offset});
// advance the offset, including the length header
offset += sizeof(uint64_t) + len;
}
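
The record format written by this loop is a raw 8-byte length followed by the payload, and each returned (file id, offset) pair points at the start of the length header. A minimal standalone sketch of one append, assuming a std::ofstream opened in binary append mode; AppendValueLogRecord is a hypothetical helper.

// Sketch of one value-log record as written above:
//   [ uint64_t length, host byte order ][ `length` bytes of payload ]
// `offset` is where this record starts (what gets stored next to the file id);
// the function returns the offset of the next record, or 0 on failure.
#include <cstdint>
#include <fstream>
#include <string>

static uint64_t AppendValueLogRecord(std::ofstream& out, uint64_t offset,
                                     const std::string& payload) {
  uint64_t len = payload.size();
  out.write(reinterpret_cast<const char*>(&len), sizeof(uint64_t));  // length header
  out.write(payload.data(), static_cast<std::streamsize>(len));      // raw value bytes
  if (!out.good()) return 0;                                         // caller treats 0 as error
  return offset + sizeof(uint64_t) + len;  // where the next record will begin
}
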
@@ -1697,8 +1606,13 @@ Status DBImpl::ReadValueLog(uint64_t file_id, uint64_t offset, Slice* value) {
inFile.seekg(offset);
// Read the length of the value
uint64_t len;
inFile.read(reinterpret_cast<char*>(&len), sizeof(uint64_t));
// uint64_t len;
// inFile.read(reinterpret_cast<char*>(&len), sizeof(uint64_t));
char *value_buf_len=new char[sizeof(uint64_t)];
inFile.read(value_buf_len,sizeof(uint64_t));
uint64_t len=0;
std::memcpy(&len, value_buf_len, sizeof(uint64_t));
if (!inFile.good()) {
inFile.close();
return Status::Corruption("Failed to read length from file!");
@@ -1720,10 +1634,6 @@ Status DBImpl::ReadValueLog(uint64_t file_id, uint64_t offset, Slice* value) {
// Assign the read data to the Slice
*value = Slice(value_buf, len);
// Clean up allocated buffer
// should also do in v2
delete[] value_buf;
return s;
}
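
A matching read-back sketch under the same assumptions: seek to the recorded offset, copy the 8-byte length out with std::memcpy as the new code above does, then read the payload into a caller-owned std::string instead of a raw new[] buffer; ReadValueLogRecord is a hypothetical helper.

// Sketch of reading one record back, mirroring ReadValueLog above.
#include <cstdint>
#include <cstring>
#include <fstream>
#include <string>

static bool ReadValueLogRecord(std::ifstream& in, uint64_t offset, std::string* payload) {
  in.seekg(static_cast<std::streamoff>(offset));
  char len_buf[sizeof(uint64_t)];
  in.read(len_buf, sizeof(uint64_t));             // 8-byte length header
  if (!in.good()) return false;
  uint64_t len = 0;
  std::memcpy(&len, len_buf, sizeof(uint64_t));   // copy out to avoid unaligned access
  payload->resize(len);
  in.read(&(*payload)[0], static_cast<std::streamsize>(len));  // payload bytes
  return in.good();
}
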
