diff --git a/db/db_impl.cc b/db/db_impl.cc
index 6402fc0..3663fbe 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -157,8 +157,6 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
       valuelog_cache(NewLRUCache(config::mem_value_log_number)),
       versions_(new VersionSet(dbname_, &options_, table_cache_,
                                &internal_comparator_)) {
-  InitializeExistingLogs();
-  // std::cout<<"init map"<<std::endl;
@@ ... @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
         // value >= 100 bytes, read from external file
         uint64_t file_id, valuelog_offset;
-        std::string file_id_str = value.ToString().substr(1, 8);
-        Slice file_id_slice(file_id_str);
-        bool res = GetVarint64(&file_id_slice, &file_id);
+        value.remove_prefix(1);
+        bool res = GetVarint64(&value, &file_id);
         if (!res) return Status::Corruption("can't decode file id");
-        if(valuelog_origin[file_id] == 0){
+        if(valuelog_origin[file_id] == 0){//???
          valuelog_origin[file_id] = valuelog_usage[file_id];
         }
         valuelog_usage[file_id]--;
-        // std::cout << "file_id: " << file_id << " usage: " << valuelog_usage[file_id] << std::endl;
       }
      }
    } else if (ikey.type == kTypeDeletion &&
@@ -1148,9 +1144,6 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
   }
   VersionSet::LevelSummaryStorage tmp;
   Log(options_.info_log, "compacted to: %s", versions_->LevelSummary(&tmp));
-  // for(int i=0;i<...
@@ ... @@ Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
-  mutex_.Lock();
   *latest_snapshot = versions_->LastSequence();
 
   // Collect together all needed child iterators
@@ -1201,14 +1193,13 @@ Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
   internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, nullptr);
   *seed = ++seed_;
-  mutex_.Unlock();
   return internal_iter;
 }
 
 Iterator* DBImpl::TEST_NewInternalIterator() {
   SequenceNumber ignored;
   uint32_t ignored_seed;
-  return NewInternalIterator(ReadOptions(), &ignored, &ignored_seed);
+  return NewIterator(ReadOptions());
 }
 
 int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() {
@@ -1289,6 +1280,9 @@ Iterator* DBImpl::NewIterator(const ReadOptions& options) {
   SequenceNumber latest_snapshot;
   uint32_t seed;
   int iter_num=24;
+
+  mutex_.Lock();
+
   Iterator* iter_prefetch = NewInternalIterator(options, &latest_snapshot, &seed);
   auto db_iter_prefetch=NewDBIterator(this, user_comparator(), iter_prefetch,
                              (options.snapshot != nullptr
@@ -1307,7 +1301,8 @@ Iterator* DBImpl::NewIterator(const ReadOptions& options) {
                            : latest_snapshot),
                        seed);
-
+  mutex_.Unlock();
+
   return NewPreFetchIterator(this,db_iter,db_iter_prefetch,iter_num);
 }
 
@@ -1631,87 +1626,75 @@ std::vector<std::pair<uint64_t, uint64_t>> DBImpl::WriteValueLog(
     assert(0);
   }
   valueFile.seekg(0, std::ios::end);  // move to the end of the file
-  uint64_t offset = valueFile.tellg();
+  uint64_t init_offset = valueFile.tellg();
   // if it exceeds fixed_size
-  if(offset>=config::value_log_size){
-    int file_capacity=ReadFileSize(valuelogfile_number_);
-    // std::cout<<"file_capacity: "<<file_capacity<<std::endl;
+  if(init_offset>=config::value_log_size){
+    valuelog_usage[valuelogfile_number_]=init_offset;
+    valuelog_origin[valuelogfile_number_]=init_offset;
     addNewValueLog();
     valueFile.close();
     file_name_ = ValueLogFileName(dbname_, valuelogfile_number_);
     valueFile =std::fstream(file_name_, std::ios::in | std::ios::out | std::ios::binary);
-    if (!valueFile.is_open()) {
-      assert(0);
-    }
     valueFile.seekg(0, std::ios::end);  // move to the end of the file
-    offset = valueFile.tellg();
+    init_offset = valueFile.tellg();
   }
   uint64_t file_data_size = 0;  // file data size field
   valueFile.seekg(0, std::ios::beg);
   valueFile.read(reinterpret_cast<char*>(&file_data_size), sizeof(uint64_t));
   valueFile.clear();  // clear the error state
+
+  // update length first
+  file_data_size+=kv.size();
+  valueFile.seekp(0, std::ios::beg);  // move to the beginning of the file
+  valueFile.write(reinterpret_cast<const char*>(&file_data_size), sizeof(uint64_t));
+  assert(valueFile.good());
+
+  valueFile.seekp(0, std::ios::end);  // return to the end of the file, ready to write
   // std::cout<<"file_data_size: "<<file_data_size<<std::endl;
 
   std::vector<std::pair<uint64_t, uint64_t>> res;
-  for (const auto& [key_slice, value_slice] : kv) {
-    // write the length of the value
-    uint64_t value_len = value_slice.size();
-    valueFile.write(reinterpret_cast<const char*>(&value_len),
-                    sizeof(uint64_t));
-    if (!valueFile.good()) {
-      valueFile.close();
-      assert(0);
-    }
+  int total_size=0;
+  total_size+=sizeof(uint64_t)*2*kv.size();
+  for(const auto &pr:kv){
+    total_size+=pr.first.size()+pr.second.size();
+  }
+  char buf[total_size];
+
+  uint64_t offset=0;
+  for (const auto& pr:kv) {
+
+    // record the file_id and offset
+    res.push_back({valuelogfile_number_, init_offset+offset});
+
+    auto key=pr.first,value=pr.second;
+
+    // write the length of the value
+    uint64_t value_len = value.size();
+    memcpy(buf+offset,&value_len,sizeof(uint64_t));
+    offset+=sizeof(uint64_t);
     // write the value itself
-    valueFile.write(value_slice.data(), value_len);
-    if (!valueFile.good()) {
-      valueFile.close();
-      assert(0);
-    }
+    memcpy(buf+offset,value.data(),value_len);
+    offset+=value_len;
     // write the length of the key
-    uint64_t key_len = key_slice.size();
-    valueFile.write(reinterpret_cast<const char*>(&key_len), sizeof(uint64_t));
-    if (!valueFile.good()) {
-      valueFile.close();
-      assert(0);
-    }
+    uint64_t key_len = key.size();
+    memcpy(buf+offset,&key_len,sizeof(uint64_t));
+    offset+=sizeof(uint64_t);
     // write the key itself
-    valueFile.write(key_slice.data(), key_len);
-    if (!valueFile.good()) {
-      valueFile.close();
-      assert(0);
-    }
+    memcpy(buf+offset,key.data(),key_len);
+    offset+=key_len;
 
-    // update the file data size
-    file_data_size ++;
-    // record the file_id and offset
-    res.push_back({valuelogfile_number_, offset});
+
     // update the offset
-    offset += sizeof(uint64_t) + key_len + sizeof(uint64_t) + value_len;
   }
 
-  // after all data is written, write the updated data size back to the beginning of the file
-  if (!res.empty()) {
-    valueFile.seekp(0, std::ios::beg);  // move to the beginning of the file
-    valueFile.write(reinterpret_cast<const char*>(&file_data_size), sizeof(uint64_t));
-    if (!valueFile.good()) {
-      valueFile.close();
-      assert(0);
-    }
-  }
-  else{
-    valueFile.close();
-    assert(0);
-  }
+  valueFile.write(buf,total_size);
+  assert(valueFile.good());
 
   // unlock resources or perform other cleanup
   valueFile.flush();  // make sure all buffered data is written to the file
@@ -1816,12 +1799,10 @@ void DBImpl::GarbageCollect() {
       continue;
     }
     auto tmp_name = ValueLogFileName(dbname_, cur_log_number);
-    // std::cout << ...
    if (versions_->checkOldValueLog(tmp_name) && valuelog_origin[cur_log_number]) {
-      if ((float)(valuelog_usage[cur_log_number] /
-                  valuelog_origin[cur_log_number]) <= 0.6) {
+      if (((float)valuelog_usage[cur_log_number]) /
+              valuelog_origin[cur_log_number] <= 0.6) {
        valuelog_set.emplace(filename);
      }
    }
@@ -1889,12 +1870,7 @@ void DBImpl::GarbageCollect() {
       cur_valuelog.seekg(current_offset);
       char* key_buf = new char[key_len];
       cur_valuelog.read(key_buf, key_len);
-      if (!cur_valuelog.good()) {
-        delete[] key_buf;
-        cur_valuelog.close();
-        std::cerr << "2Failed to read file: " << valuelog_name << std::endl;
-        break;
-      }
+      assert(cur_valuelog.good());
       current_offset += key_len;
       // std::cout << cnt <<" "<< ...
@@ ... @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
       impl->log_ = new log::Writer(lfile);
       impl->mem_ = new MemTable(impl->internal_comparator_);
       impl->mem_->Ref();
-      impl->addNewValueLog();
     }
   }
   if (s.ok() && save_manifest) {
@@ -2039,6 +2004,8 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
   if (s.ok()) {
     impl->RemoveObsoleteFiles();
     impl->MaybeScheduleCompaction();
+    impl->InitializeExistingLogs();
+    impl->addNewValueLog();
   }
   impl->mutex_.Unlock();
   if (s.ok()) {
@@ -2100,11 +2067,29 @@ void DBImpl::InitializeExistingLogs() {
    if (IsValueLogFile(filename)) {
      uint64_t cur_log_number = GetValueLogID(filename);
      uint64_t file_data_size = ReadFileSize(cur_log_number);
-      valuelog_usage.emplace(cur_log_number,file_data_size);
-      // std::cout << "cur_log_number: " << cur_log_number << " file_data_size: " << file_data_size << std::endl;
+      valuelog_origin[cur_log_number]=file_data_size;
+      valuelog_usage[cur_log_number]=0;
+    }
+  }
+  SequenceNumber latest_snapshot;
+  uint32_t seed;
+  int iter_num=24;
+  Iterator* iter= NewInternalIterator(ReadOptions(), &latest_snapshot, &seed);
+  auto db_iter=NewDBIterator(this, user_comparator(), iter,latest_snapshot,seed);
+  for(db_iter->SeekToFirst();db_iter->Valid();db_iter->Next()){
+    auto value=db_iter->value();
+    if(value.size()&&value[0]==0x01){
+      value.remove_prefix(1);
+      uint64_t valuelog_id=*(uint64_t*)value.data();
+      assert(valuelog_usage.count(valuelog_id));
+      valuelog_usage[valuelog_id]++;
    }
  }
+
+  mutex_.Unlock();
+  delete db_iter;
+  mutex_.Lock();
 }
 
 // Read the file_data_size of a single file
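
Note (illustrative sketch, not part of the patch): the new WriteValueLog body above lays each record out as [value_len : uint64_t][value bytes][key_len : uint64_t][key bytes] and returns (file_id, offset) pairs whose offsets appear to be absolute file positions (init_offset is taken with tellg() after seeking to the end, so the 8-byte record count at the start of the file is already included). A minimal standalone reader for one such record could look like the following; ReadValueLogRecord and the explicit path parameter are hypothetical names, and raw native-endian uint64_t length fields are assumed, matching the memcpy layout in the diff.

#include <cstdint>
#include <fstream>
#include <string>

// Reads the record that starts at `offset` in the value log file at
// `valuelog_path` into *key / *value.  Returns false on any I/O error
// instead of asserting, unlike the patched WriteValueLog path.
// NOTE: illustrative only; the record layout and absolute offsets are
// assumptions taken from the WriteValueLog hunk above.
bool ReadValueLogRecord(const std::string& valuelog_path, uint64_t offset,
                        std::string* key, std::string* value) {
  std::ifstream in(valuelog_path, std::ios::in | std::ios::binary);
  if (!in.is_open()) return false;
  in.seekg(static_cast<std::streamoff>(offset), std::ios::beg);

  // [value_len][value]
  uint64_t value_len = 0;
  if (!in.read(reinterpret_cast<char*>(&value_len), sizeof(uint64_t))) return false;
  value->resize(value_len);
  if (value_len > 0 &&
      !in.read(&(*value)[0], static_cast<std::streamsize>(value_len))) return false;

  // [key_len][key]
  uint64_t key_len = 0;
  if (!in.read(reinterpret_cast<char*>(&key_len), sizeof(uint64_t))) return false;
  key->resize(key_len);
  if (key_len > 0 &&
      !in.read(&(*key)[0], static_cast<std::streamsize>(key_len))) return false;
  return true;
}

A scanner that walks a whole value log (as GarbageCollect does) would additionally skip the leading 8-byte record count that the patch now bumps before the bulk write.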