diff --git a/db/db_impl.cc b/db/db_impl.cc index 82d9ce7..d3b1471 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -57,7 +57,6 @@ struct DBImpl::CompactionState { struct Output { uint64_t number; uint64_t file_size; - uint64_t valuelog_id; InternalKey smallest, largest; }; @@ -84,10 +83,6 @@ struct DBImpl::CompactionState { WritableFile* outfile; TableBuilder* builder; - WritableFile* valuelogfile; - uint64_t valuelog_offset=0; - uint64_t valuelog_file_id=0; - uint64_t total_bytes; }; @@ -281,10 +276,6 @@ void DBImpl::RemoveObsoleteFiles() { } Log(options_.info_log, "Delete type=%d #%lld\n", static_cast(type), static_cast(number)); - if(oldvaluelog_ids.count(number)){ - std::string valuelog_filename=ValueLogFileName(dbname_,oldvaluelog_ids[number]); - env_->RemoveFile(valuelog_filename); - } } } } @@ -546,7 +537,7 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit, level = base->PickLevelForMemTableOutput(min_user_key, max_user_key); } edit->AddFile(level, meta.number, meta.file_size, meta.smallest, - meta.largest,meta.valuelog_id); + meta.largest); } CompactionStats stats; @@ -750,7 +741,7 @@ void DBImpl::BackgroundCompaction() { FileMetaData* f = c->input(0, 0); c->edit()->RemoveFile(c->level(), f->number); c->edit()->AddFile(c->level() + 1, f->number, f->file_size, f->smallest, - f->largest,f->valuelog_id); + f->largest); status = versions_->LogAndApply(c->edit(), &mutex_); if (!status.ok()) { RecordBackgroundError(status); @@ -824,11 +815,7 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) { out.number = file_number; out.smallest.Clear(); out.largest.Clear(); - compact->valuelog_file_id=versions_->NewFileNumber(); - out.valuelog_id=compact->valuelog_file_id; - compact->outputs.push_back(out); - mutex_.Unlock(); } @@ -838,11 +825,6 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) { if (s.ok()) { compact->builder = new TableBuilder(options_, compact->outfile); } - - // compaction 后的新 valuelog - compact->valuelog_offset=0; // why - s = env_->NewWritableFile(ValueLogFileName(dbname_,compact->valuelog_file_id), &compact->valuelogfile); - return s; } @@ -878,20 +860,6 @@ Status DBImpl::FinishCompactionOutputFile(CompactionState* compact, } delete compact->outfile; compact->outfile = nullptr; - // value log 落盘 - if (s.ok()) { - s = compact->valuelogfile->Flush(); - } - if (s.ok()) { - s = compact->valuelogfile->Sync(); - } - if (s.ok()) { - s = compact->valuelogfile->Close(); - } - delete compact->valuelogfile; - compact->valuelogfile=nullptr; - compact->valuelog_file_id=0; - compact->valuelog_offset=0; if (s.ok() && current_entries > 0) { // Verify that the table is usable @@ -922,7 +890,7 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) { for (size_t i = 0; i < compact->outputs.size(); i++) { const CompactionState::Output& out = compact->outputs[i]; compact->compaction->edit()->AddFile(level + 1, out.number, out.file_size, - out.smallest, out.largest,out.valuelog_id); + out.smallest, out.largest); } return versions_->LogAndApply(compact->compaction->edit(), &mutex_); } @@ -956,15 +924,6 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { std::string current_user_key; bool has_current_user_key = false; SequenceNumber last_sequence_for_key = kMaxSequenceNumber; - // 记录 当前和下一层 level 中被合并的 files - for (int which = 0; which < 2; which++) { - for (int i = 0; i < compact->compaction->num_input_files(which); i++) { - auto tmp_file=compact->compaction->input(which, i); - if(tmp_file->valuelog_id){ - oldvaluelog_ids[tmp_file->number]=tmp_file->valuelog_id; - } - } - } while (input->Valid() && !shutting_down_.load(std::memory_order_acquire)) { // Prioritize immutable compaction work if (has_imm_.load(std::memory_order_relaxed)) { @@ -1045,43 +1004,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { compact->current_output()->smallest.DecodeFrom(key); } compact->current_output()->largest.DecodeFrom(key); - - Slice old_value=input->value(); - Slice new_value; - std::string buf=""; - if(old_value.size()==0||old_value.data()[0]==(char)(0x00)){ - // when it is a deletion, input->value() will be "" - new_value=old_value; - } - else{ - // not delete - old_value.remove_prefix(1); - // put value_len into value_log - int value_offset=sizeof(uint64_t)*2;// 16 - // uint64_t file_id,valuelog_offset,valuelog_len; - uint64_t file_id,valuelog_offset; - bool res=GetVarint64(&old_value,&file_id); - if(!res)assert(0); - res=GetVarint64(&old_value,&valuelog_offset); - if(!res)assert(0); - // res=GetVarint64(&old_value,&valuelog_len); - // if(!res)assert(0); - // Status s=ReadValueLog(file_id,valuelog_offset,valuelog_len,&new_value); - Status s=ReadValueLog(file_id,valuelog_offset,&new_value); - assert(s.ok()); - writeValueLogForCompaction(compact->valuelogfile,{new_value}); - buf+=(char)(0x01); - PutVarint64(&buf,compact->valuelog_file_id); - PutVarint64(&buf,compact->valuelog_offset); - // PutVarint64(&buf,valuelog_len); - compact->valuelog_offset+=value_offset; - delete []new_value.data(); - new_value=Slice(buf); - } - - compact->builder->Add(key, new_value); - // 更新计数器 - // compact->builder->add_log_count(); + compact->builder->Add(key, input->value()); // Close output file if it is big enough if (compact->builder->FileSize() >= @@ -1105,14 +1028,6 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { if (status.ok()) { status = input->status(); } - //not completely correct, should be written in new function, related to removeabsol... - // if(status.ok()){ - // for(auto id:old_valuelog_ids){ - // auto valuelog_filename=ValueLogFileName(dbname_,id); - // Status s=env_->RemoveFile(valuelog_filename); - // assert(s.ok()); - // } - // } delete input; input = nullptr; @@ -1254,17 +1169,12 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key, } Slice value_log_slice=Slice(value->c_str()+1,value->length()); Slice new_value; - // put value_len into value_log int value_offset=sizeof(uint64_t)*2;// 16 - // uint64_t file_id,valuelog_offset,valuelog_len; uint64_t file_id,valuelog_offset; bool res=GetVarint64(&value_log_slice,&file_id); if(!res)return Status::Corruption("can't decode file id"); res=GetVarint64(&value_log_slice,&valuelog_offset); if(!res)return Status::Corruption("can't decode valuelog offset"); - // res=GetVarint64(&value_log_slice,&valuelog_len); - // if(!res)return Status::Corruption("can't decode valuelog len"); - // ReadValueLog(file_id,valuelog_offset,valuelog_len,&new_value); ReadValueLog(file_id,valuelog_offset,&new_value); *value=std::string(new_value.data(),new_value.size()); delete []new_value.data(); @@ -1628,20 +1538,19 @@ std::vector> DBImpl::WriteValueLog(std::vector(&len), sizeof(uint64_t)); if (!valueFile.good()) { valueFile.close(); - return {}; // 写入长度失败,返回空结果 + assert(0); } // 再写入实际数据 valueFile.write(slice.data(), len); if (!valueFile.good()) { valueFile.close(); - return {}; // 写入数据失败,返回空结果 + assert(0); } // 记录 file_id 和 offset res.push_back({valuelogfile_number_, offset}); - // 更新偏移量,包括长度信息 offset += sizeof(uint64_t) + len; } @@ -1697,8 +1606,13 @@ Status DBImpl::ReadValueLog(uint64_t file_id, uint64_t offset, Slice* value) { inFile.seekg(offset); // Read the length of the value - uint64_t len; - inFile.read(reinterpret_cast(&len), sizeof(uint64_t)); + // uint64_t len; + // inFile.read(reinterpret_cast(&len), sizeof(uint64_t)); + char *value_buf_len=new char[sizeof(uint64_t)]; + inFile.read(value_buf_len,sizeof(uint64_t)); + uint64_t len=0; + std::memcpy(&len, value_buf_len, sizeof(uint64_t)); + if (!inFile.good()) { inFile.close(); return Status::Corruption("Failed to read length from file!"); @@ -1720,10 +1634,6 @@ Status DBImpl::ReadValueLog(uint64_t file_id, uint64_t offset, Slice* value) { // Assign the read data to the Slice *value = Slice(value_buf, len); - // Clean up allocated buffer - // should also do in v2 - delete[] value_buf; - return s; }