diff --git a/db/db_impl.cc b/db/db_impl.cc index 5814727..4d47e9e 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -14,15 +14,6 @@ #include "db/table_cache.h" #include "db/version_set.h" #include "db/write_batch_internal.h" -#include -#include -#include -#include -#include -#include -#include -#include -#include #include "leveldb/db.h" #include "leveldb/env.h" @@ -51,7 +42,6 @@ struct DBImpl::Writer { WriteBatch* batch; bool sync; bool done; - port::CondVar cv; }; @@ -983,6 +973,18 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { drop = true; } + Slice value = input->value(); + if (value.size() >= sizeof(uint64_t)) { + const char* ptr = value.data(); + std::string temp_str(value.data(), value.size()); + uint64_t expiration_time = DBImpl::GetTS(&temp_str); + uint64_t current_time = env_->GetCurrentTime(); + + if (current_time > expiration_time) { + drop = true; + }else {drop = false;} + } + last_sequence_for_key = ikey.sequence; } #if 0 @@ -1158,18 +1160,25 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key, } mutex_.Lock(); } - + if (s.ok()) { + // 直接在这里判断是否过期 + auto t1 = env_->GetCurrentTime(); + auto t2 = GetTS(value); + if(t1 >= t2){ + // 过期 + s = Status::Expire("Expire",Slice()); + } else { + // 没过期 + *value = value->substr(0, value->size() - sizeof(uint64_t)); + } + } if (have_stat_update && current->UpdateStats(stats)) { MaybeScheduleCompaction(); } mem->Unref(); if (imm != nullptr) imm->Unref(); current->Unref(); - if(!s.ok()){ - return s; - } - auto s2 = CheckIsExpire(value); - return s2; + return s; } Iterator* DBImpl::NewIterator(const ReadOptions& options) { @@ -1205,44 +1214,10 @@ void DBImpl::ReleaseSnapshot(const Snapshot* snapshot) { Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) { return DB::Put(o, key, val); } - - Status DBImpl::Put(const WriteOptions& options, const Slice& key, const Slice& value, uint64_t ttl) { - //rocksdb的实现 -// Status DBWithTTLImpl::AppendTS(const Slice& val, std::string* val_with_ts, -// SystemClock* clock) { -// val_with_ts->reserve(kTSLength + val.size()); -// char ts_string[kTSLength]; -// int64_t curtime; -// Status st = clock->GetCurrentTime(&curtime); -// if (!st.ok()) { -// return st; -// } -// EncodeFixed32(ts_string, (int32_t)curtime); -// val_with_ts->append(val.data(), val.size()); -// val_with_ts->append(ts_string, kTSLength); -// return st; -// } - - std::string val_with_ts; - val_with_ts.reserve(value.size() + kTSLength); - char ts_string[kTSLength]; - TIMESTAMP expiration_time = this->env_->GetCurrentTime() + ttl * 1000; - EncodeFixed64(ts_string,expiration_time); - //assert(sizeof(expiration_time) == sizeof(TIMESTAMP )); - // 追加原始 value 到 val_with_ts - val_with_ts.append(value.data(), value.size()); - - // 将 expiration_time 追加到 val_with_ts - val_with_ts.append(ts_string,kTSLength); -// std::cout << "val_with_ts in hex: "; -// for (unsigned char c : val_with_ts) { -// std::cout << std::hex << std::setw(2) << std::setfill('0') << (int)c << " "; -// } -// std::cout << std::endl; - - return DB::Put(options, key, Slice(val_with_ts)); + // 扔掉了很复杂的逻辑 + return DB::Put(options, key, value, ttl); } Status DBImpl::Delete(const WriteOptions& options, const Slice& key) { @@ -1527,14 +1502,6 @@ void DBImpl::GetApproximateSizes(const Range* range, int n, uint64_t* sizes) { } v->Unref(); } - - -/** - * - * @param val - * @param val_with_ts 在val后面连接上预计超时的timestamp - * @param ttl 存活时间 - */ void DBImpl::AppendTS(const Slice& val, std::string* val_with_ts,uint64_t ttl) { val_with_ts->reserve(kTSLength + val.size()); char ts_string[kTSLength]; @@ -1548,71 +1515,32 @@ void DBImpl::AppendTS(const Slice& val, std::string* val_with_ts,uint64_t ttl) { * @param val * @return timestamp in val,and remove timestamp from val */ -uint64_t DBImpl::GetTS(std::string* val) { - //uint64_t expiration_time; - // 输出 val 的十六进制表示 -// std::cout << "befor decode,val in hex: "; -// for (unsigned char c : *val) { -// std::cout << std::hex << std::setw(2) << std::setfill('0') << (int)c << " "; -// } -// std::cout << std::endl; - auto expiration_time = DecodeFixed64(val->data() + val->size() - kTSLength); - //memcpy(&expiration_time, val->data() + val->size() - sizeof(TIMESTAMP), sizeof(TIMESTAMP)); - val->resize(val->size() - kTSLength); -// std::cout << "after decode,val in hex: "; -// for (unsigned char c : *val) { -// std::cout << std::hex << std::setw(2) << std::setfill('0') << (int)c << " "; -// } -// std::cout << std::endl; +uint64_t DBImpl::GetTS(const std::string* val) { + // 不用auto再写一下 + uint64_t expiration_time; + memcpy(&expiration_time, val->data() + val->size() - sizeof(uint64_t), sizeof(uint64_t)); return expiration_time; - -// Status DBWithTTLImpl::StripTS(PinnableSlice* pinnable_val) { -// if (pinnable_val->size() < kTSLength) { -// return Status::Corruption("Bad timestamp in key-value"); -// } -// // Erasing characters which hold the TS -// pinnable_val->remove_suffix(kTSLength); -// return Status::OK(); -// } } -Status DBImpl::CheckIsExpire(std::string* value) { - //debug 用 - auto a = env_->GetCurrentTime(); - auto b = GetTS(value); -// std::cout<<"get current time"< b){ - return Status::Expire("Expire",Slice()); - } - return Status(); -// if(env_->GetCurrentTime() > GetTS(value)){ -// return Status::Expire("Expire",Slice()); -// } + +// Default implementations of convenience methods that subclasses of DB +// can call if they wish +Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) { + WriteBatch batch; + batch.Put(key, value); + return Write(opt, &batch); } -/** - * - * @param options - * @param key - * @param value - * @param ttl - * @return - */ -Status DB::Put(const WriteOptions& options, const Slice& key, - const Slice& value, uint64_t ttl) { +Status DB::Put(const WriteOptions& options, const Slice& key, + const Slice& value, uint64_t ttl) { // 将 value 和 expiration_time 合并到一起,形成带 TTL 的 value std::string val_with_ts; - val_with_ts.reserve(value.size() + sizeof(uint64_t)); - uint64_t expiration_time = std::chrono::duration_cast( std::chrono::system_clock::now().time_since_epoch()) .count() + ttl * 1000; - // 追加原始 value 到 val_with_ts val_with_ts.append(value.data(), value.size()); - // 将 expiration_time 追加到 val_with_ts val_with_ts.append(reinterpret_cast(&expiration_time), sizeof(expiration_time)); // std::cout<<"PUT"<SleepForMicroseconds( 1000); } TEST(TestTTL, CompactionTTL) { - DestroyDB("testdb", Options()); DB *db; + DestroyDB("testdb", Options()); if(OpenDB("testdb", &db).ok() == false) { std::cerr << "open db failed" << std::endl; abort(); @@ -96,11 +95,12 @@ TEST(TestTTL, CompactionTTL) { db->CompactRange(nullptr, nullptr); - leveldb::Range ranges2[1]; - ranges2[0] = leveldb::Range("-", "A"); + leveldb::Range ranges1[1]; + ranges[0] = leveldb::Range("-", "A"); uint64_t sizes2[1]; - db->GetApproximateSizes(ranges2, 1, sizes2); - ASSERT_EQ(sizes[0], 0); + db->GetApproximateSizes(ranges1, 1, sizes2); + ASSERT_EQ(sizes2[0], 0); + delete db; }