diff --git a/CMakeLists.txt b/CMakeLists.txt index 54b14a1..fd98cb6 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -521,6 +521,7 @@ endif(LEVELDB_INSTALL) add_executable(db_test2 "${PROJECT_SOURCE_DIR}/test/db_test2.cc" + util/ttl/tt_impl.cc ) target_link_libraries(db_test2 PRIVATE leveldb) diff --git a/db/db_impl.cc b/db/db_impl.cc index f96d245..8eb536c 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -48,6 +48,7 @@ struct DBImpl::Writer { WriteBatch* batch; bool sync; bool done; + port::CondVar cv; }; @@ -1197,6 +1198,10 @@ void DBImpl::ReleaseSnapshot(const Snapshot* snapshot) { Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) { return DB::Put(o, key, val); } +Status DBImpl::Put(const WriteOptions& options, const Slice& key, + const Slice& value, uint64_t ttl) { + return DB::Put(options, key, value,ttl); +} Status DBImpl::Delete(const WriteOptions& options, const Slice& key) { return DB::Delete(options, key); @@ -1483,6 +1488,52 @@ void DBImpl::GetApproximateSizes(const Range* range, int n, uint64_t* sizes) { v->Unref(); } + +/** + * + * @param val + * @param val_with_ts 在val后面连接上预计超时的timestamp + * @param ttl 存活时间 + */ +void DBImpl::AppendTS(const Slice& val, std::string* val_with_ts,uint64_t ttl) { + val_with_ts->reserve(kTSLength + val.size()); + char ts_string[kTSLength]; + uint64_t st = env_->GetCurrentTime() + ttl; + *val_with_ts = val.ToString(); // 原始 value + val_with_ts->append(reinterpret_cast(&st), sizeof(st)); +} + +/** + * + * @param options + * @param key + * @param value + * @param ttl + * @return + */ +Status DB::Put(const WriteOptions& options, const Slice& key, + const Slice& value, uint64_t ttl) { + + // 将 value 和 expiration_time 合并到一起,形成带 TTL 的 value + std::string val_with_ts; + + val_with_ts.reserve(value.size() + sizeof(uint64_t)); + + uint64_t expiration_time = std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()) + .count(); + ttl; + + // 追加原始 value 到 val_with_ts + val_with_ts.append(value.data(), value.size()); + + // 将 expiration_time 追加到 val_with_ts + val_with_ts.append(reinterpret_cast(&expiration_time), sizeof(expiration_time)); + + WriteBatch batch; + batch.Put(key, Slice(val_with_ts)); + return Write(options, &batch); +} + // Default implementations of convenience methods that subclasses of DB // can call if they wish Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) { diff --git a/db/db_impl.h b/db/db_impl.h index c7b0172..2517e7f 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -38,6 +38,9 @@ class DBImpl : public DB { // Implementations of the DB interface Status Put(const WriteOptions&, const Slice& key, const Slice& value) override; + + Status Put(const WriteOptions& options, const Slice& key, const Slice& value, + uint64_t ttl) override; Status Delete(const WriteOptions&, const Slice& key) override; Status Write(const WriteOptions& options, WriteBatch* updates) override; Status Get(const ReadOptions& options, const Slice& key, @@ -48,7 +51,7 @@ class DBImpl : public DB { bool GetProperty(const Slice& property, std::string* value) override; void GetApproximateSizes(const Range* range, int n, uint64_t* sizes) override; void CompactRange(const Slice* begin, const Slice* end) override; - + void AppendTS(const Slice& val, std::string* val_with_ts,uint64_t ttl); // Extra methods (for testing) that are not in the public DB interface // Compact any files in the named level that overlap [*begin,*end] @@ -70,6 +73,8 @@ class DBImpl : public DB { // Samples are taken approximately once every config::kReadBytesPeriod // bytes. void RecordReadSample(Slice key); +// Status Write(const WriteOptions& options, WriteBatch* updates, +// uint64_t ttl) override; private: friend class DB; diff --git a/db/dbformat.h b/db/dbformat.h index a1c30ed..6bf2f09 100644 --- a/db/dbformat.h +++ b/db/dbformat.h @@ -66,6 +66,9 @@ typedef uint64_t SequenceNumber; // can be packed together into 64-bits. static const SequenceNumber kMaxSequenceNumber = ((0x1ull << 56) - 1); +static const uint32_t kTSLength = sizeof(uint64_t ); // size of timestamp + + struct ParsedInternalKey { Slice user_key; SequenceNumber sequence; @@ -85,6 +88,8 @@ inline size_t InternalKeyEncodingLength(const ParsedInternalKey& key) { // Append the serialization of "key" to *result. void AppendInternalKey(std::string* result, const ParsedInternalKey& key); + + // Attempt to parse an internal key from "internal_key". On success, // stores the parsed data in "*result", and returns true. // @@ -219,6 +224,10 @@ inline LookupKey::~LookupKey() { if (start_ != space_) delete[] start_; } + + + + } // namespace leveldb #endif // STORAGE_LEVELDB_DB_DBFORMAT_H_ diff --git a/db/memtable.cc b/db/memtable.cc index 4f09340..6b875e0 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -81,6 +81,7 @@ void MemTable::Add(SequenceNumber s, ValueType type, const Slice& key, // tag : uint64((sequence << 8) | type) // value_size : varint32 of value.size() // value bytes : char[value.size()] + // timestamp : uint64 size_t key_size = key.size(); size_t val_size = value.size(); size_t internal_key_size = key_size + 8; @@ -94,7 +95,7 @@ void MemTable::Add(SequenceNumber s, ValueType type, const Slice& key, EncodeFixed64(p, (s << 8) | type); p += 8; p = EncodeVarint32(p, val_size); - std::memcpy(p, value.data(), val_size); + std::memcpy(p, value.data(), val_size);//value包含timestamp assert(p + val_size == buf + encoded_len); table_.Insert(buf); } diff --git a/include/leveldb/db.h b/include/leveldb/db.h index bf4eec5..481f43b 100644 --- a/include/leveldb/db.h +++ b/include/leveldb/db.h @@ -150,6 +150,8 @@ class LEVELDB_EXPORT DB { // 为当前key设置ttl,过期后自动失效 virtual Status Put(const WriteOptions& options, const Slice& key, const Slice& value, uint64_t ttl) = 0; +// virtual Status Write(const WriteOptions& options, WriteBatch* updates,uint64_t ttl) = 0; + }; // Destroy the contents of the specified database. diff --git a/include/leveldb/env.h b/include/leveldb/env.h index e00895a..9f72b54 100644 --- a/include/leveldb/env.h +++ b/include/leveldb/env.h @@ -13,6 +13,7 @@ #ifndef STORAGE_LEVELDB_INCLUDE_ENV_H_ #define STORAGE_LEVELDB_INCLUDE_ENV_H_ +#include #include #include #include @@ -64,6 +65,13 @@ class LEVELDB_EXPORT Env { // The result of Default() belongs to leveldb and must never be deleted. static Env* Default(); + + uint64_t GetCurrentTime() { + return std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()) + .count(); // 以毫秒为单位返回当前时间戳 + } + // Create an object that sequentially reads the file with the specified name. // On success, stores a pointer to the new file in *result and returns OK. // On failure stores nullptr in *result and returns non-OK. If the file does diff --git a/include/leveldb/slice.h b/include/leveldb/slice.h index e97223a..1c46be7 100644 --- a/include/leveldb/slice.h +++ b/include/leveldb/slice.h @@ -38,6 +38,7 @@ class LEVELDB_EXPORT Slice { // Create a slice that refers to s[0,strlen(s)-1] Slice(const char* s) : data_(s), size_(strlen(s)) {} + // Intentionally copyable. Slice(const Slice&) = default; Slice& operator=(const Slice&) = default; @@ -48,6 +49,7 @@ class LEVELDB_EXPORT Slice { // Return the length (in bytes) of the referenced data size_t size() const { return size_; } + // Return true iff the length of the referenced data is zero bool empty() const { return size_ == 0; } @@ -74,6 +76,7 @@ class LEVELDB_EXPORT Slice { size_ -= n; } + // Return a string that contains the copy of the referenced data. std::string ToString() const { return std::string(data_, size_); } diff --git a/test/db_test2.cc b/test/db_test2.cc index 49e26ab..76fd0ba 100644 --- a/test/db_test2.cc +++ b/test/db_test2.cc @@ -20,7 +20,20 @@ Status OpenDB(std::string dbName, DB **db) { // 1. 存储(数据结构与写入) // 4. 数据合并(Compaction) -void InsertData(DB *db) { +//void InsertData(DB *db) { +// WriteOptions writeOptions; +// int key_num = data_size / value_size; +// srand(static_cast(time(0))); +// +// for (int i = 0; i < key_num; i++) { +// int key_ = rand() % key_num+1; +// std::string key = std::to_string(key_); +// std::string value(value_size, 'a'); +// db->Put(writeOptions, key, value); +// } +//} + +void InsertData(DB *db, uint64_t ttl/* second */) { WriteOptions writeOptions; int key_num = data_size / value_size; srand(static_cast(time(0))); @@ -29,10 +42,9 @@ void InsertData(DB *db) { int key_ = rand() % key_num+1; std::string key = std::to_string(key_); std::string value(value_size, 'a'); - db->Put(writeOptions, key, value); + db->Put(writeOptions, key, value, ttl); } } - // 2. 数据访问(如何读数据) void GetData(DB *db, int size = (1 << 30)) { ReadOptions readOptions; @@ -60,7 +72,9 @@ int main() { DB *db; if(OpenDB("testdb", &db).ok()) { - InsertData(db); + uint64_t ttl = 20; + + InsertData(db, ttl); delete db; } diff --git a/test/ttl_test.cc b/test/ttl_test.cc index 06f4cda..ce43f04 100644 --- a/test/ttl_test.cc +++ b/test/ttl_test.cc @@ -14,6 +14,84 @@ constexpr int data_size = 128 << 20; Status OpenDB(std::string dbName, DB **db) { Options options; options.create_if_missing = true; +#include "gtest/gtest.h" + +#include "leveldb/env.h" +#include "leveldb/db.h" + + +using namespace leveldb; + +constexpr int value_size = 2048; +constexpr int data_size = 128 << 20; + +Status OpenDB(std::string dbName, DB **db) { + Options options; + options.create_if_missing = true; + return DB::Open(options, dbName, db); +} + +void InsertData(DB *db, uint64_t ttl/* second */) { + WriteOptions writeOptions; + int key_num = data_size / value_size; + srand(static_cast(time(0))); + + for (int i = 0; i < key_num; i++) { + int key_ = rand() % key_num+1; + std::string key = std::to_string(key_); + std::string value(value_size, 'a'); + db->Put(writeOptions, key, value, ttl); + } +} + +void GetData(DB *db, int size = (1 << 30)){ + ReadOptions readOptions; + int key_num = data_size / value_size; + + // 点查 + srand(static_cast(time(0))); + for (int i = 0; i < 100; i++) { + int key_ = rand() % key_num+1; + std::string key = std::to_string(key_); + std::string value; + db->Get(readOptions, key, &value); + } +} + +TEST(TestTTL, ReadTTL) { + DB *db; + if(OpenDB("testdb", &db).ok() == false) { + std::cerr << "open db failed" << std::endl; + abort(); + } + + uint64_t ttl = 20; + + InsertData(db, ttl); + + ReadOptions readOptions; + Status status; + int key_num = data_size / value_size; + srand(static_cast(time(0))); + for (int i = 0; i < 100; i++) { + int key_ = rand() % key_num+1; + std::string key = std::to_string(key_); + std::string value; + status = db->Get(readOptions, key, &value); + ASSERT_TRUE(status.ok()); + } + + Env::Default()->SleepForMicroseconds(ttl * 1000000); + + for (int i = 0; i < 100; i++) { + int key_ = rand() % key_num+1; + std::string key = std::to_string(key_); + std::string value; + status = db->Get(readOptions, key, &value); + ASSERT_FALSE(status.ok()); + } +} + return DB::Open(options, dbName, db); } @@ -109,6 +187,7 @@ TEST(TestTTL, CompactionTTL) { int main(int argc, char** argv) { // All tests currently run with the same read-only file limits. + testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); } diff --git a/util/ttl/tt_impl.cc b/util/ttl/tt_impl.cc new file mode 100644 index 0000000..e69de29