From 74c80876cd7c25907b07c73c72032e8939252b28 Mon Sep 17 00:00:00 2001 From: xhguo <10211900416@stu.ecnu.edu.cn> Date: Thu, 7 Nov 2024 06:48:38 +0800 Subject: [PATCH] merge lzj_version and little modification --- README.md | 6 +- db/builder.cc | 24 +++++-- db/db_impl.cc | 169 +++++++++++++++++++++++++++++++---------------- db/db_impl.h | 10 ++- db/dbformat.h | 9 +-- db/memtable.cc | 26 +++----- db/version_edit.cc | 9 +++ db/version_edit.h | 20 +++++- db/version_set.cc | 21 +++++- db/version_set.h | 6 +- include/leveldb/db.h | 7 +- include/leveldb/slice.h | 3 + include/leveldb/status.h | 1 + table/block.cc | 22 +----- test/simple_test.cc | 2 +- test/ttl_test.cc | 13 ++-- util/coding.cc | 7 ++ 17 files changed, 229 insertions(+), 126 deletions(-) diff --git a/README.md b/README.md index 2e2c298..099d1f5 100644 --- a/README.md +++ b/README.md @@ -2,4 +2,8 @@ LevelDB project 1 -**本仓库提供TTL基本的测试用例** \ No newline at end of file +10225501460 林子骥 + +10211900416 郭夏辉 + +如果出现open db failed,可能是运行地太慢了,请酌情调高ttl_test.cc中ttl的值。 \ No newline at end of file diff --git a/db/builder.cc b/db/builder.cc index 4c6d802..a7dc15a 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -14,6 +14,16 @@ namespace leveldb { +/** + * description:目的,将内存中的数据写入一个新的 SSTable 文件 + * @param dbname + * @param env + * @param options + * @param table_cache + * @param iter + * @param meta :存储新sstable的相关元数据 + * @return + */ Status BuildTable(const std::string& dbname, Env* env, const Options& options, TableCache* table_cache, Iterator* iter, FileMetaData* meta) { Status s; @@ -21,6 +31,7 @@ Status BuildTable(const std::string& dbname, Env* env, const Options& options, iter->SeekToFirst(); std::string fname = TableFileName(dbname, meta->number); + if (iter->Valid()) { WritableFile* file; s = env->NewWritableFile(fname, &file); @@ -30,16 +41,17 @@ Status BuildTable(const std::string& dbname, Env* env, const Options& options, TableBuilder* builder = new TableBuilder(options, file); meta->smallest.DecodeFrom(iter->key());//这里是internal_key -// auto tmp_ts = DecodeFixed64(iter->value().data() + iter->value().size() - kTSLength); -// meta->oldest_ts = tmp_ts; -// meta->newer_ts = tmp_ts; + auto tmp_ts = DecodeFixed64(iter->value().data() + iter->value().size() - kTSLength); + meta->oldest_ts = tmp_ts; + meta->newer_ts = tmp_ts; + Slice key; for (; iter->Valid(); iter->Next()) { key = iter->key(); builder->Add(key, iter->value()); -// tmp_ts = DecodeFixed64(iter->value().data() + iter->value().size() - kTSLength); -// meta->oldest_ts = meta->oldest_ts > tmp_ts ? tmp_ts : meta->oldest_ts; -// meta->newer_ts = meta->newer_ts > tmp_ts ? meta->newer_ts : tmp_ts; + tmp_ts = DecodeFixed64(iter->value().data() + iter->value().size() - kTSLength); + meta->oldest_ts = meta->oldest_ts > tmp_ts ? tmp_ts : meta->oldest_ts; + meta->newer_ts = meta->newer_ts > tmp_ts ? meta->newer_ts : tmp_ts; } if (!key.empty()) { meta->largest.DecodeFrom(key); diff --git a/db/db_impl.cc b/db/db_impl.cc index b117cb1..751f86f 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -14,6 +14,15 @@ #include "db/table_cache.h" #include "db/version_set.h" #include "db/write_batch_internal.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include #include "leveldb/db.h" #include "leveldb/env.h" @@ -42,6 +51,7 @@ struct DBImpl::Writer { WriteBatch* batch; bool sync; bool done; + port::CondVar cv; }; @@ -51,6 +61,7 @@ struct DBImpl::CompactionState { uint64_t number; uint64_t file_size; InternalKey smallest, largest; + TIMESTAMP old_ts,new_ts; }; Output* current_output() { return &outputs[outputs.size() - 1]; } @@ -526,11 +537,15 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit, if (s.ok() && meta.file_size > 0) { const Slice min_user_key = meta.smallest.user_key(); const Slice max_user_key = meta.largest.user_key(); + const TIMESTAMP new_ts = meta.newer_ts; + const TIMESTAMP old_ts = meta.oldest_ts; if (base != nullptr) { - level = base->PickLevelForMemTableOutput(min_user_key, max_user_key); + level = base->PickLevelForMemTableOutput(min_user_key, max_user_key);// TODO :基于timestamp和size和seek的新的选择规则 } +// edit->AddFile(level, meta.number, meta.file_size, meta.smallest, +// meta.largest); edit->AddFile(level, meta.number, meta.file_size, meta.smallest, - meta.largest); + meta.largest,old_ts,new_ts); } CompactionStats stats; @@ -619,7 +634,7 @@ void DBImpl::TEST_CompactRange(int level, const Slice* begin, bg_error_.ok()) { if (manual_compaction_ == nullptr) { // Idle manual_compaction_ = &manual; - MaybeScheduleCompaction();//有可能寻址过多,导致allow_seek为0,触发合并。 + MaybeScheduleCompaction(); } else { // Running either my compaction or another compaction. background_work_finished_signal_.Wait(); } @@ -734,8 +749,10 @@ void DBImpl::BackgroundCompaction() { assert(c->num_input_files(0) == 1); FileMetaData* f = c->input(0, 0); c->edit()->RemoveFile(c->level(), f->number); +// c->edit()->AddFile(c->level() + 1, f->number, f->file_size, f->smallest, +// f->largest); c->edit()->AddFile(c->level() + 1, f->number, f->file_size, f->smallest, - f->largest); + f->largest,f->oldest_ts,f->newer_ts); status = versions_->LogAndApply(c->edit(), &mutex_); if (!status.ok()) { RecordBackgroundError(status); @@ -747,7 +764,7 @@ void DBImpl::BackgroundCompaction() { status.ToString().c_str(), versions_->LevelSummary(&tmp)); } else { CompactionState* compact = new CompactionState(c); - status = DoCompactionWork(compact); + status = DoCompactionWork(compact);// if (!status.ok()) { RecordBackgroundError(status); } @@ -809,6 +826,8 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) { out.number = file_number; out.smallest.Clear(); out.largest.Clear(); + out.old_ts = UINT64_MAX; + out.new_ts = 0; compact->outputs.push_back(out); mutex_.Unlock(); } @@ -883,8 +902,10 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) { const int level = compact->compaction->level(); for (size_t i = 0; i < compact->outputs.size(); i++) { const CompactionState::Output& out = compact->outputs[i]; +// compact->compaction->edit()->AddFile(level + 1, out.number, out.file_size, +// out.smallest, out.largest); compact->compaction->edit()->AddFile(level + 1, out.number, out.file_size, - out.smallest, out.largest); + out.smallest, out.largest,out.old_ts,out.new_ts); } return versions_->LogAndApply(compact->compaction->edit(), &mutex_); } @@ -908,13 +929,13 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { } Iterator* input = versions_->MakeInputIterator(compact->compaction); - // Release mutex while we're actually doing the compaction work mutex_.Unlock(); input->SeekToFirst(); Status status; ParsedInternalKey ikey; + TIMESTAMP ts = 0; std::string current_user_key; bool has_current_user_key = false; SequenceNumber last_sequence_for_key = kMaxSequenceNumber; @@ -940,7 +961,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { break; } } - + //auto a = DecodeFixed64(input->value().data() + input->value().size() - kTSLength);//debug // Handle key/value, add to state, etc. bool drop = false; if (!ParseInternalKey(key, &ikey)) { @@ -972,19 +993,10 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { // few iterations of this loop (by rule (A) above). // Therefore this deletion marker is obsolete and can be dropped. drop = true; - } - - Slice value = input->value(); - if (value.size() >= sizeof(uint64_t)) { - const char* ptr = value.data(); - std::string temp_str(value.data(), value.size()); - uint64_t expiration_time = DBImpl::GetTS(&temp_str); - uint64_t current_time = env_->GetCurrentTime(); + }else if((ts = DecodeFixed64(input->value().data() + input->value().size() - kTSLength)) < env_->NowMicros()){ - if (current_time > expiration_time) { - drop = true; - }else {drop = false;} - } + drop = true; + } last_sequence_for_key = ikey.sequence; } @@ -1010,6 +1022,10 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { compact->current_output()->smallest.DecodeFrom(key); } compact->current_output()->largest.DecodeFrom(key); + assert(ts != 0); + //auto b = compact->current_output()->old_ts; + compact->current_output()->old_ts = std::min(compact->current_output()->old_ts,ts); + compact->current_output()->new_ts = std::max(compact->current_output()->new_ts,ts); compact->builder->Add(key, input->value()); // Close output file if it is big enough @@ -1156,29 +1172,29 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key, } else if (imm != nullptr && imm->Get(lkey, value, &s)) { // Done } else { + stats.now_ts = this->env_->NowMicros(); s = current->Get(options, lkey, value, &stats); have_stat_update = true; } mutex_.Lock(); } - if (s.ok()) { - // 直接在这里判断是否过期 - auto t1 = env_->GetCurrentTime(); - auto t2 = GetTS(value); - if(t1 >= t2){ - // 过期 - s = Status::NotFound("NotFound",Slice()); - } else { - // 没过期 - *value = value->substr(0, value->size() - sizeof(uint64_t)); - } - } - if (have_stat_update && current->UpdateStats(stats)) { - MaybeScheduleCompaction(); + if(s.ok()){ + s = CheckIsExpire(value); + } + if (have_stat_update && current->UpdateStats(stats,s.IsExpire())) { + MaybeScheduleCompaction();//有可能寻址过多,导致allow_seek为0,触发合并。 } mem->Unref(); if (imm != nullptr) imm->Unref(); + current->Unref(); +// if(!s.ok()){ +// return s; +// } +// auto s2 = CheckIsExpire(value); +// if(!s2.ok()){ +// current->UpdateStats(stats); +// } return s; } @@ -1215,10 +1231,23 @@ void DBImpl::ReleaseSnapshot(const Snapshot* snapshot) { Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) { return DB::Put(o, key, val); } + + Status DBImpl::Put(const WriteOptions& options, const Slice& key, const Slice& value, uint64_t ttl) { - // 扔掉了很复杂的逻辑 - return DB::Put(options, key, value, ttl); + + std::string val_with_ts; + val_with_ts.reserve(value.size() + kTSLength); + char ts_string[kTSLength]; + TIMESTAMP expiration_time = this->env_->NowMicros() + ttl * 1000000; + EncodeFixed64(ts_string,expiration_time); + //assert(sizeof(expiration_time) == sizeof(TIMESTAMP )); + // 追加原始 value 到 val_with_ts + val_with_ts.append(value.data(), value.size()); + + // 将 expiration_time 追加到 val_with_ts + val_with_ts.append(ts_string,kTSLength); + return DB::Put(options, key, Slice(val_with_ts)); } Status DBImpl::Delete(const WriteOptions& options, const Slice& key) { @@ -1503,6 +1532,14 @@ void DBImpl::GetApproximateSizes(const Range* range, int n, uint64_t* sizes) { } v->Unref(); } + + +/** + * + * @param val + * @param val_with_ts 在val后面连接上预计超时的timestamp + * @param ttl 存活时间 + */ void DBImpl::AppendTS(const Slice& val, std::string* val_with_ts,uint64_t ttl) { val_with_ts->reserve(kTSLength + val.size()); char ts_string[kTSLength]; @@ -1517,41 +1554,51 @@ void DBImpl::AppendTS(const Slice& val, std::string* val_with_ts,uint64_t ttl) { * @return timestamp in val,and remove timestamp from val */ uint64_t DBImpl::GetTS(std::string* val) { - // 不用auto再写一下 老逻辑: - // uint64_t expiration_time; - // memcpy(&expiration_time, val->data() + val->size() - sizeof(uint64_t), sizeof(uint64_t)); - // return expiration_time; - // 新逻辑: - auto expiration_time = DecodeFixed64(val->data() + val->size() - kTSLength); - val->resize(val->size() - kTSLength); - return expiration_time; -} -// Default implementations of convenience methods that subclasses of DB -// can call if they wish -Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) { - WriteBatch batch; - batch.Put(key, value); - return Write(opt, &batch); + auto expiration_time = DecodeFixed64(val->data() + val->size() - kTSLength); + + val->resize(val->size() - kTSLength); + + return expiration_time; + } +Status DBImpl::CheckIsExpire(std::string* value) { + //debug 用 + auto a = env_->NowMicros(); + auto b = GetTS(value); + if(a > b){ + return Status::Expire("Expire",Slice()); + } + return Status(); -Status DB::Put(const WriteOptions& options, const Slice& key, +} + +/** + * + * @param options + * @param key + * @param value + * @param ttl + * @return + */ +Status DB::Put(const WriteOptions& options, const Slice& key, const Slice& value, uint64_t ttl) { + // 将 value 和 expiration_time 合并到一起,形成带 TTL 的 value std::string val_with_ts; - val_with_ts.reserve(value.size() + kTSLength); + + val_with_ts.reserve(value.size() + sizeof(uint64_t)); + uint64_t expiration_time = std::chrono::duration_cast( std::chrono::system_clock::now().time_since_epoch()) .count() + ttl * 1000; - char ts_string[kTSLength]; - EncodeFixed64(ts_string, expiration_time); // 追加原始 value 到 val_with_ts val_with_ts.append(value.data(), value.size()); + // 将 expiration_time 追加到 val_with_ts - // val_with_ts.append(reinterpret_cast(&expiration_time), sizeof(expiration_time)); - val_with_ts.append(ts_string, kTSLength); + val_with_ts.append(reinterpret_cast(&expiration_time), sizeof(expiration_time)); // std::cout<<"PUT"<Put(writeOptions, key, value, ttl); - std::cout << "time to alive" << ttl << std::endl; + //std::cout << "time to alive" << ttl << std::endl; ReadOptions readOptions; std::string value_read; diff --git a/test/ttl_test.cc b/test/ttl_test.cc index 7205e12..521cae8 100644 --- a/test/ttl_test.cc +++ b/test/ttl_test.cc @@ -39,13 +39,14 @@ void GetData(DB *db, int size = (1 << 30)) { db->Get(readOptions, key, &value); } } - +// TEST(TestTTL, ReadTTL) { DB *db; if(OpenDB("testdb", &db).ok() == false) { std::cerr << "open db failed" << std::endl; abort(); } + // 如果出现open db fail,请酌情提高这里 uint64_t ttl = 20; InsertData(db, ttl); @@ -72,11 +73,12 @@ TEST(TestTTL, ReadTTL) { ASSERT_FALSE(status.ok()); } delete db; + Env::Default()->SleepForMicroseconds( 1000); } TEST(TestTTL, CompactionTTL) { - DB *db; DestroyDB("testdb", Options()); + DB *db; if(OpenDB("testdb", &db).ok() == false) { std::cerr << "open db failed" << std::endl; abort(); @@ -95,12 +97,11 @@ TEST(TestTTL, CompactionTTL) { db->CompactRange(nullptr, nullptr); - leveldb::Range ranges1[1]; - ranges[0] = leveldb::Range("-", "A"); + leveldb::Range ranges2[1]; + ranges2[0] = leveldb::Range("-", "A"); uint64_t sizes2[1]; - db->GetApproximateSizes(ranges1, 1, sizes2); + db->GetApproximateSizes(ranges2, 1, sizes2); ASSERT_EQ(sizes2[0], 0); - delete db; } diff --git a/util/coding.cc b/util/coding.cc index a8f8af8..cb0ffaf 100644 --- a/util/coding.cc +++ b/util/coding.cc @@ -69,6 +69,7 @@ void PutVarint64(std::string* dst, uint64_t v) { dst->append(buf, ptr - buf); } + void PutLengthPrefixedSlice(std::string* dst, const Slice& value) { PutVarint32(dst, value.size()); dst->append(value.data(), value.size()); @@ -142,6 +143,12 @@ bool GetVarint64(Slice* input, uint64_t* value) { } } + +//bool GetFixed64(const char* ptr,uint64_t* value){ +// *value = DecodeFixed64(ptr); +// return +//} + bool GetLengthPrefixedSlice(Slice* input, Slice* result) { uint32_t len; if (GetVarint32(input, &len) && input->size() >= len) {