diff --git a/db/db_impl.cc b/db/db_impl.cc index 54e3038..751f86f 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1178,18 +1178,24 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key, } mutex_.Lock(); } - - if (have_stat_update && current->UpdateStats(stats)) { + if(s.ok()){ + s = CheckIsExpire(value); + } + if (have_stat_update && current->UpdateStats(stats,s.IsExpire())) { MaybeScheduleCompaction();//有可能寻址过多,导致allow_seek为0,触发合并。 } mem->Unref(); if (imm != nullptr) imm->Unref(); + current->Unref(); - if(!s.ok()){ - return s; - } - auto s2 = CheckIsExpire(value); - return s2; +// if(!s.ok()){ +// return s; +// } +// auto s2 = CheckIsExpire(value); +// if(!s2.ok()){ +// current->UpdateStats(stats); +// } + return s; } Iterator* DBImpl::NewIterator(const ReadOptions& options) { @@ -1229,21 +1235,6 @@ Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) { Status DBImpl::Put(const WriteOptions& options, const Slice& key, const Slice& value, uint64_t ttl) { - //rocksdb的实现 -// Status DBWithTTLImpl::AppendTS(const Slice& val, std::string* val_with_ts, -// SystemClock* clock) { -// val_with_ts->reserve(kTSLength + val.size()); -// char ts_string[kTSLength]; -// int64_t curtime; -// Status st = clock->GetCurrentTime(&curtime); -// if (!st.ok()) { -// return st; -// } -// EncodeFixed32(ts_string, (int32_t)curtime); -// val_with_ts->append(val.data(), val.size()); -// val_with_ts->append(ts_string, kTSLength); -// return st; -// } std::string val_with_ts; val_with_ts.reserve(value.size() + kTSLength); @@ -1256,12 +1247,6 @@ Status DBImpl::Put(const WriteOptions& options, const Slice& key, // 将 expiration_time 追加到 val_with_ts val_with_ts.append(ts_string,kTSLength); -// std::cout << "val_with_ts in hex: "; -// for (unsigned char c : val_with_ts) { -// std::cout << std::hex << std::setw(2) << std::setfill('0') << (int)c << " "; -// } -// std::cout << std::endl; - return DB::Put(options, key, Slice(val_with_ts)); } @@ -1569,45 +1554,24 @@ void DBImpl::AppendTS(const Slice& val, std::string* val_with_ts,uint64_t ttl) { * @return timestamp in val,and remove timestamp from val */ uint64_t DBImpl::GetTS(std::string* val) { - //uint64_t expiration_time; - // 输出 val 的十六进制表示 -// std::cout << "befor decode,val in hex: "; -// for (unsigned char c : *val) { -// std::cout << std::hex << std::setw(2) << std::setfill('0') << (int)c << " "; -// } -// std::cout << std::endl; + auto expiration_time = DecodeFixed64(val->data() + val->size() - kTSLength); - //memcpy(&expiration_time, val->data() + val->size() - sizeof(TIMESTAMP), sizeof(TIMESTAMP)); + val->resize(val->size() - kTSLength); -// std::cout << "after decode,val in hex: "; -// for (unsigned char c : *val) { -// std::cout << std::hex << std::setw(2) << std::setfill('0') << (int)c << " "; -// } -// std::cout << std::endl; + return expiration_time; -// Status DBWithTTLImpl::StripTS(PinnableSlice* pinnable_val) { -// if (pinnable_val->size() < kTSLength) { -// return Status::Corruption("Bad timestamp in key-value"); -// } -// // Erasing characters which hold the TS -// pinnable_val->remove_suffix(kTSLength); -// return Status::OK(); -// } } Status DBImpl::CheckIsExpire(std::string* value) { //debug 用 auto a = env_->NowMicros(); auto b = GetTS(value); -// std::cout<<"get current time"< b){ return Status::Expire("Expire",Slice()); } return Status(); -// if(env_->GetCurrentTime() > GetTS(value)){ -// return Status::Expire("Expire",Slice()); -// } + } /** diff --git a/db/version_set.cc b/db/version_set.cc index 2484743..b81e29b 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -413,6 +413,20 @@ bool Version::UpdateStats(const GetStats& stats) { return false; } +bool Version::UpdateStats(const GetStats& stats,bool is_expire) { + FileMetaData* f = stats.seek_file; + if (f != nullptr) { + f->allowed_seeks--; + if(is_expire)f->allowed_seeks--; + if (f->allowed_seeks <= 0 && file_to_compact_ == nullptr) { + file_to_compact_ = f; + file_to_compact_level_ = stats.seek_file_level; + return true; + } + } + return false; +} + bool Version::RecordReadSample(Slice internal_key) { ParsedInternalKey ikey; if (!ParseInternalKey(internal_key, &ikey)) { @@ -821,7 +835,7 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) { // Write new record to MANIFEST log if (s.ok()) { std::string record; - edit->EncodeTo(&record);// TODO:修改 + edit->EncodeTo(&record); s = descriptor_log_->AddRecord(record); if (s.ok()) { s = descriptor_file_->Sync(); diff --git a/db/version_set.h b/db/version_set.h index 7b96986..cfdf7c4 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -80,8 +80,8 @@ class Version { // compaction may need to be triggered, false otherwise. // REQUIRES: lock is held bool UpdateStats(const GetStats& stats); - - // Record a sample of bytes read at the specified internal key. + bool UpdateStats(const GetStats& stats,bool is_expire); + // Record a sample of bytes read at the specified internal key. // Samples are taken approximately once every config::kReadBytesPeriod // bytes. Returns true if a new compaction may need to be triggered. // REQUIRES: lock is held @@ -313,7 +313,7 @@ class VersionSet { // Per-level key at which the next compaction at that level should start. // Either an empty string, or a valid InternalKey. - std::string compact_pointer_[config::kNumLevels]; + std::string compact_pointer_[config::kNumLevels];//会在完成一次压缩操作后进行更新,以确保下一次压缩不会总是从相同的地方 }; // A Compaction encapsulates information about a compaction. diff --git a/include/leveldb/status.h b/include/leveldb/status.h index e4943be..9c63d01 100644 --- a/include/leveldb/status.h +++ b/include/leveldb/status.h @@ -74,6 +74,7 @@ class LEVELDB_EXPORT Status { // Returns true iff the status indicates an InvalidArgument. bool IsInvalidArgument() const { return code() == kInvalidArgument; } + bool IsExpire()const{return code() == kExpire;} // Return a string representation of this status suitable for printing. // Returns the string "OK" for success. std::string ToString() const; diff --git a/test/ttl_test.cc b/test/ttl_test.cc index 5d57904..3686336 100644 --- a/test/ttl_test.cc +++ b/test/ttl_test.cc @@ -75,33 +75,33 @@ TEST(TestTTL, ReadTTL) { Env::Default()->SleepForMicroseconds( 1000); } -//TEST(TestTTL, CompactionTTL) { -// DestroyDB("testdb", Options()); -// DB *db; -// if(OpenDB("testdb", &db).ok() == false) { -// std::cerr << "open db failed" << std::endl; -// abort(); -// } -// -// uint64_t ttl = 20; -// InsertData(db, ttl); -// -// leveldb::Range ranges[1]; -// ranges[0] = leveldb::Range("-", "A"); -// uint64_t sizes[1]; -// db->GetApproximateSizes(ranges, 1, sizes); -// ASSERT_GT(sizes[0], 0); -// -// Env::Default()->SleepForMicroseconds(ttl * 1000000); -// -// db->CompactRange(nullptr, nullptr); -// -// leveldb::Range ranges2[1]; -// ranges2[0] = leveldb::Range("-", "A"); -// uint64_t sizes2[1]; -// db->GetApproximateSizes(ranges2, 1, sizes2); -// ASSERT_EQ(sizes2[0], 0); -//} +TEST(TestTTL, CompactionTTL) { + DestroyDB("testdb", Options()); + DB *db; + if(OpenDB("testdb", &db).ok() == false) { + std::cerr << "open db failed" << std::endl; + abort(); + } + + uint64_t ttl = 20; + InsertData(db, ttl); + + leveldb::Range ranges[1]; + ranges[0] = leveldb::Range("-", "A"); + uint64_t sizes[1]; + db->GetApproximateSizes(ranges, 1, sizes); + ASSERT_GT(sizes[0], 0); + + Env::Default()->SleepForMicroseconds(ttl * 1000000); + + db->CompactRange(nullptr, nullptr); + + leveldb::Range ranges2[1]; + ranges2[0] = leveldb::Range("-", "A"); + uint64_t sizes2[1]; + db->GetApproximateSizes(ranges2, 1, sizes2); + ASSERT_EQ(sizes2[0], 0); +} int main(int argc, char** argv) {