瀏覽代碼

add TTL version Get() and Put()

main
VirgilZhu 3 週之前
父節點
當前提交
24828dd069
共有 7 個文件被更改,包括 107 次插入4 次删除
  1. +12
    -1
      CMakeLists.txt
  2. +46
    -1
      db/db_impl.cc
  3. +6
    -0
      db/db_impl.h
  4. +2
    -0
      include/leveldb/db.h
  5. +26
    -0
      test/test_a.cpp
  6. +13
    -0
      test/time.cpp
  7. +2
    -2
      test/ttl_test.cc

+ 12
- 1
CMakeLists.txt 查看文件

@ -521,6 +521,7 @@ endif(LEVELDB_INSTALL)
add_executable(db_test2
"${PROJECT_SOURCE_DIR}/test/db_test2.cc"
test/time.cpp
)
target_link_libraries(db_test2 PRIVATE leveldb)
@ -528,4 +529,14 @@ target_link_libraries(db_test2 PRIVATE leveldb)
add_executable(ttl_test
"${PROJECT_SOURCE_DIR}/test/ttl_test.cc"
)
target_link_libraries(ttl_test PRIVATE leveldb gtest)
target_link_libraries(ttl_test PRIVATE leveldb gtest)
add_executable(test_a
"${PROJECT_SOURCE_DIR}/test/test_a.cpp"
)
target_link_libraries(test_a leveldb)
add_executable(time
"${PROJECT_SOURCE_DIR}/test/time.cpp"
)
target_link_libraries(time leveldb)

+ 46
- 1
db/db_impl.cc 查看文件

@ -2,6 +2,8 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include <chrono>
#include "db/db_impl.h"
#include <algorithm>
@ -978,7 +980,21 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
// Therefore this deletion marker is obsolete and can be dropped.
drop = true;
}
/* TODO: Add TTL Version Compaction Drop Condition */
else {
std::string user_value = input->value().ToString();
uint64_t now = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
if (user_value.find("_ts_") == std::string::npos) {
input->value() = user_value + "_ts_" + std::to_string(now + 20);
}
else {
uint64_t deadtime = std::stoi(user_value.substr(user_value.find("_ts_") + 4));
if (now >= deadtime) {
drop = true;
}
}
}
/* ----------------------------------------------- */
last_sequence_for_key = ikey.sequence;
}
#if 0
@ -1117,6 +1133,17 @@ int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() {
return versions_->MaxNextLevelOverlappingBytes();
}
/* TODO: Add TTL Version isLive() */
Status isLive(const Slice& key, std::string* value, Status& s) {
uint64_t now = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
uint64_t deadtime = std::stoi(value->substr(value->find("_ts_") + 4));
if (now >= deadtime) {
s = Status::NotFound(key);
}
return s;
}
/* --------------------------- */
Status DBImpl::Get(const ReadOptions& options, const Slice& key,
std::string* value) {
Status s;
@ -1144,14 +1171,21 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key,
mutex_.Unlock();
// First look in the memtable, then in the immutable memtable (if any).
LookupKey lkey(key, snapshot);
/* TODO: Add TTL Version Get() */
if (mem->Get(lkey, value, &s)) {
isLive(key, value, s);
// Done
} else if (imm != nullptr && imm->Get(lkey, value, &s)) {
isLive(key, value, s);
// Done
} else {
s = current->Get(options, lkey, value, &stats);
if (s.ok()) isLive(key, value, s);
have_stat_update = true;
}
/* --------------------------- */
mutex_.Lock();
}
@ -1491,6 +1525,17 @@ Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
return Write(opt, &batch);
}
/* TODO: Add TTL Version Put() */
Status DBImpl::Put(const WriteOptions& opt, const Slice& key, const Slice& value, uint64_t ttl){
WriteBatch batch;
auto now = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
auto end = now + ttl;
Slice value_timestamp = Slice(value.ToString() + "_ts_" + std::to_string(end));
batch.Put(key, value_timestamp);
return Write(opt, &batch);
}
/* --------------------------- */
Status DB::Delete(const WriteOptions& opt, const Slice& key) {
WriteBatch batch;
batch.Delete(key);

+ 6
- 0
db/db_impl.h 查看文件

@ -42,6 +42,12 @@ class DBImpl : public DB {
Status Write(const WriteOptions& options, WriteBatch* updates) override;
Status Get(const ReadOptions& options, const Slice& key,
std::string* value) override;
/* TODO: Add TTL Version Put() */
Status Put(const WriteOptions&, const Slice& key,
const Slice& value, uint64_t ttl) override;
/* --------------------------- */
Iterator* NewIterator(const ReadOptions&) override;
const Snapshot* GetSnapshot() override;
void ReleaseSnapshot(const Snapshot* snapshot) override;

+ 2
- 0
include/leveldb/db.h 查看文件

@ -148,8 +148,10 @@ class LEVELDB_EXPORT DB {
// ----------------------------For TTL-----------------------------
// key设置ttl
/* TODO: Add TTL Version Put() */
virtual Status Put(const WriteOptions& options, const Slice& key,
const Slice& value, uint64_t ttl) = 0;
/* --------------------------- */
};
// Destroy the contents of the specified database.

+ 26
- 0
test/test_a.cpp 查看文件

@ -0,0 +1,26 @@
#include "leveldb/db.h"
#include <iostream>
using namespace std;
using namespace leveldb;
int main() {
DB* db = nullptr;
Options op;
op.create_if_missing = true;
Status status = DB::Open(op, "testdb", &db);
assert(status.ok());
db->Put(WriteOptions(), "001", "leveldb");
string s;
db->Get(ReadOptions(), "001", &s);
cout<<s<<endl;
db->Put(WriteOptions(), "002", "world");
string s1;
db->Delete(WriteOptions(), "002");
db->Get(ReadOptions(), "002", &s1);
cout<<s1<<endl;
delete db;
return 0;
}

+ 13
- 0
test/time.cpp 查看文件

@ -0,0 +1,13 @@
#include <iostream>
#include <chrono>
int main() {
auto now = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
auto end = now + 5;
// 输出当前时间点和未来时间点
std::cout << "Current time point: " << now << std::endl;
std::cout << "Future time point (+5 seconds): " << end << std::endl;
return 0;
}

+ 2
- 2
test/ttl_test.cc 查看文件

@ -99,9 +99,9 @@ TEST(TestTTL, CompactionTTL) {
db->CompactRange(nullptr, nullptr);
leveldb::Range ranges[1];
// leveldb::Range ranges[1];
ranges[0] = leveldb::Range("-", "A");
uint64_t sizes[1];
// uint64_t sizes[1];
db->GetApproximateSizes(ranges, 1, sizes);
ASSERT_EQ(sizes[0], 0);
}

Loading…
取消
儲存