Browse Source

上传文件至 'db'

main
曹可心 10 months ago
parent
commit
40251a25f6
1 changed files with 189 additions and 3 deletions
  1. +189
    -3
      db/db_impl.cc

+ 189
- 3
db/db_impl.cc View File

@ -591,7 +591,7 @@ void DBImpl::CompactRange(const Slice* begin, const Slice* end) {
}
}
TEST_CompactMemTable(); // TODO(sanjay): Skip if memtable does not overlap
for (int level = 0; level < max_level_with_files; level++) {
for (int level = 0; level <= max_level_with_files; level++) {
TEST_CompactRange(level, begin, end);
}
}
@ -894,10 +894,170 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) {
return versions_->LogAndApply(compact->compaction->edit(), &mutex_);
}
// Status DBImpl::DoCompactionWork(CompactionState* compact) {
// const uint64_t start_micros = env_->NowMicros();
// int64_t imm_micros = 0; // Micros spent doing imm_ compactions
// Log(options_.info_log, "Compacting %d@%d + %d@%d files",
// compact->compaction->num_input_files(0), compact->compaction->level(),
// compact->compaction->num_input_files(1),
// compact->compaction->level() + 1);
// assert(versions_->NumLevelFiles(compact->compaction->level()) > 0);
// assert(compact->builder == nullptr);
// assert(compact->outfile == nullptr);
// if (snapshots_.empty()) {
// compact->smallest_snapshot = versions_->LastSequence();
// } else {
// compact->smallest_snapshot = snapshots_.oldest()->sequence_number();
// }
// Iterator* input = versions_->MakeInputIterator(compact->compaction);
// // Release mutex while we're actually doing the compaction work
// mutex_.Unlock();
// input->SeekToFirst();
// Status status;
// ParsedInternalKey ikey;
// std::string current_user_key;
// bool has_current_user_key = false;
// SequenceNumber last_sequence_for_key = kMaxSequenceNumber;
// while (input->Valid() && !shutting_down_.load(std::memory_order_acquire)) {
// // Prioritize immutable compaction work
// if (has_imm_.load(std::memory_order_relaxed)) {
// const uint64_t imm_start = env_->NowMicros();
// mutex_.Lock();
// if (imm_ != nullptr) {
// CompactMemTable();
// // Wake up MakeRoomForWrite() if necessary.
// background_work_finished_signal_.SignalAll();
// }
// mutex_.Unlock();
// imm_micros += (env_->NowMicros() - imm_start);
// }
// Slice key = input->key();
// if (compact->compaction->ShouldStopBefore(key) &&
// compact->builder != nullptr) {
// status = FinishCompactionOutputFile(compact, input);
// if (!status.ok()) {
// break;
// }
// }
// // Handle key/value, add to state, etc.
// bool drop = false;
// if (!ParseInternalKey(key, &ikey)) {
// // Do not hide error keys
// current_user_key.clear();
// has_current_user_key = false;
// last_sequence_for_key = kMaxSequenceNumber;
// } else {
// if (!has_current_user_key ||
// user_comparator()->Compare(ikey.user_key, Slice(current_user_key)) !=
// 0) {
// // First occurrence of this user key
// current_user_key.assign(ikey.user_key.data(), ikey.user_key.size());
// has_current_user_key = true;
// last_sequence_for_key = kMaxSequenceNumber;
// }
// if (last_sequence_for_key <= compact->smallest_snapshot) {
// // Hidden by an newer entry for same user key
// drop = true; // (A)
// } else if (ikey.type == kTypeDeletion &&
// ikey.sequence <= compact->smallest_snapshot &&
// compact->compaction->IsBaseLevelForKey(ikey.user_key)) {
// // For this user key:
// // (1) there is no data in higher levels
// // (2) data in lower levels will have larger sequence numbers
// // (3) data in layers that are being compacted here and have
// // smaller sequence numbers will be dropped in the next
// // few iterations of this loop (by rule (A) above).
// // Therefore this deletion marker is obsolete and can be dropped.
// drop = true;
// }
// last_sequence_for_key = ikey.sequence;
// }
// #if 0
// Log(options_.info_log,
// " Compact: %s, seq %d, type: %d %d, drop: %d, is_base: %d, "
// "%d smallest_snapshot: %d",
// ikey.user_key.ToString().c_str(),
// (int)ikey.sequence, ikey.type, kTypeValue, drop,
// compact->compaction->IsBaseLevelForKey(ikey.user_key),
// (int)last_sequence_for_key, (int)compact->smallest_snapshot);
// #endif
// if (!drop) {
// // Open output file if necessary
// if (compact->builder == nullptr) {
// status = OpenCompactionOutputFile(compact);
// if (!status.ok()) {
// break;
// }
// }
// if (compact->builder->NumEntries() == 0) {
// compact->current_output()->smallest.DecodeFrom(key);
// }
// compact->current_output()->largest.DecodeFrom(key);
// compact->builder->Add(key, input->value());
// // Close output file if it is big enough
// if (compact->builder->FileSize() >=
// compact->compaction->MaxOutputFileSize()) {
// status = FinishCompactionOutputFile(compact, input);
// if (!status.ok()) {
// break;
// }
// }
// }
// input->Next();
// }
// if (status.ok() && shutting_down_.load(std::memory_order_acquire)) {
// status = Status::IOError("Deleting DB during compaction");
// }
// if (status.ok() && compact->builder != nullptr) {
// status = FinishCompactionOutputFile(compact, input);
// }
// if (status.ok()) {
// status = input->status();
// }
// delete input;
// input = nullptr;
// CompactionStats stats;
// stats.micros = env_->NowMicros() - start_micros - imm_micros;
// for (int which = 0; which < 2; which++) {
// for (int i = 0; i < compact->compaction->num_input_files(which); i++) {
// stats.bytes_read += compact->compaction->input(which, i)->file_size;
// }
// }
// for (size_t i = 0; i < compact->outputs.size(); i++) {
// stats.bytes_written += compact->outputs[i].file_size;
// }
// mutex_.Lock();
// stats_[compact->compaction->level() + 1].Add(stats);
// if (status.ok()) {
// status = InstallCompactionResults(compact);
// }
// if (!status.ok()) {
// RecordBackgroundError(status);
// }
// VersionSet::LevelSummaryStorage tmp;
// Log(options_.info_log, "compacted to: %s", versions_->LevelSummary(&tmp));
// return status;
// }
Status DBImpl::DoCompactionWork(CompactionState* compact) {
const uint64_t start_micros = env_->NowMicros();
int64_t imm_micros = 0; // Micros spent doing imm_ compactions
Log(options_.info_log, "Compacting %d@%d + %d@%d files",
compact->compaction->num_input_files(0), compact->compaction->level(),
compact->compaction->num_input_files(1),
@ -978,6 +1138,31 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
// Therefore this deletion marker is obsolete and can be dropped.
drop = true;
}
else{
Slice value = input->value();
std::string combined_str = value.ToString();
// 从右往左提取固定长度的字符串
std::string time_part = combined_str.substr(combined_str.length() - 19, 19);
// 解析时间字符串
std::tm parsed_tm = {};
const char* result = strptime(time_part.c_str(), "%Y-%m-%d %H:%M:%S", &parsed_tm);
// 将解析出的时间转为 time_t
std::time_t parsed_time_t = std::mktime(&parsed_tm);
// 获取当前时间
auto now = std::chrono::system_clock::now();
// 转换为 time_t
std::time_t now_time_t = std::chrono::system_clock::to_time_t(now);
if (parsed_time_t <= now_time_t)
{
drop = true;
std::string s = key.ToString();
}
// 心
}
last_sequence_for_key = ikey.sequence;
}
@ -990,7 +1175,6 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
compact->compaction->IsBaseLevelForKey(ikey.user_key),
(int)last_sequence_for_key, (int)compact->smallest_snapshot);
#endif
if (!drop) {
// Open output file if necessary
if (compact->builder == nullptr) {
@ -1079,6 +1263,8 @@ static void CleanupIteratorState(void* arg1, void* arg2) {
} // anonymous namespace
Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
SequenceNumber* latest_snapshot,
uint32_t* seed) {

Loading…
Cancel
Save