Browse Source

finish ReadTTL

main
VirgilZhu 1 month ago
parent
commit
2530c897ec
5 changed files with 44 additions and 23 deletions
  1. +22
    -6
      db/db_impl.cc
  2. +1
    -0
      db/db_impl.h
  3. +1
    -0
      include/leveldb/db.h
  4. +6
    -10
      test/db_test2.cc
  5. +14
    -7
      test/ttl_test.cc

+ 22
- 6
db/db_impl.cc View File

@ -3,6 +3,7 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include <chrono>
#include <iostream>
#include "db/db_impl.h"
@ -985,7 +986,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
std::string user_value = input->value().ToString();
uint64_t now = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
if (user_value.find("_ts_") == std::string::npos) {
input->value() = user_value + "_ts_" + std::to_string(now + 20);
input->value() = user_value + "_ts_" + std::to_string(now + ttl);
}
else {
uint64_t deadtime = std::stoi(user_value.substr(user_value.find("_ts_") + 4));
@ -1134,14 +1135,14 @@ int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() {
}
/* TODO: Add TTL Version isLive() */
Status isLive(const Slice& key, std::string* value, Status& s) {
Status isLive(const Slice& key, std::string* value, Status& s, uint64_t ttl) {
if (value->empty()) {
s = Status::NotFound(key);
return s;
}
uint64_t now = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
if (value->find("_ts_") == std::string::npos) {
*value = *value + "_ts_" + std::to_string(now + 20);
*value = *value + "_ts_" + std::to_string(now + ttl);
}
else {
uint64_t deadtime = std::stoi(value->substr(value->find("_ts_") + 4));
@ -1183,14 +1184,14 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key,
/* TODO: Add TTL Version Get() */
if (mem->Get(lkey, value, &s)) {
isLive(key, value, s);
isLive(key, value, s, ttl);
// Done
} else if (imm != nullptr && imm->Get(lkey, value, &s)) {
isLive(key, value, s);
isLive(key, value, s, ttl);
// Done
} else {
s = current->Get(options, lkey, value, &stats);
if (s.ok()) isLive(key, value, s);
if (s.ok()) isLive(key, value, s, ttl);
have_stat_update = true;
}
/* --------------------------- */
@ -1562,12 +1563,22 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
// Recover handles create_if_missing, error_if_exists
bool save_manifest = false;
Status s = impl->Recover(&edit, &save_manifest);
if (!s.ok()) {
std::cerr << "Recover failed: " << s.ToString() << std::endl;
}
if (s.ok() && impl->mem_ == nullptr) {
// Create new log and a corresponding memtable.
uint64_t new_log_number = impl->versions_->NewFileNumber();
WritableFile* lfile;
s = options.env->NewWritableFile(LogFileName(dbname, new_log_number),
&lfile);
if (!s.ok()) {
std::cerr << "NewWritableFile failed: " << s.ToString() << std::endl;
}
if (s.ok()) {
edit.SetLogNumber(new_log_number);
impl->logfile_ = lfile;
@ -1581,6 +1592,11 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
edit.SetPrevLogNumber(0); // No older logs needed after recovery.
edit.SetLogNumber(impl->logfile_number_);
s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
if (!s.ok()) {
std::cerr << "LogAndApply failed: " << s.ToString() << std::endl;
}
}
if (s.ok()) {
impl->RemoveObsoleteFiles();

+ 1
- 0
db/db_impl.h View File

@ -28,6 +28,7 @@ class VersionSet;
class DBImpl : public DB {
public:
uint64_t ttl;
DBImpl(const Options& options, const std::string& dbname);
DBImpl(const DBImpl&) = delete;

+ 1
- 0
include/leveldb/db.h View File

@ -54,6 +54,7 @@ class LEVELDB_EXPORT DB {
DB** dbptr);
DB() = default;
uint64_t ttl;
DB(const DB&) = delete;
DB& operator=(const DB&) = delete;

+ 6
- 10
test/db_test2.cc View File

@ -1,14 +1,10 @@
#include "leveldb/db.h"
#include "leveldb/filter_policy.h"
#include <iostream>
#include <cstdlib>
#include <ctime>
using namespace leveldb;
using namespace std;
constexpr int value_size = 2048;
constexpr int data_size = 256 << 20;
@ -25,7 +21,7 @@ Status OpenDB(std::string dbName, DB **db) {
void InsertData(DB *db) {
WriteOptions writeOptions;
int key_num = data_size / value_size;
srand(static_cast<unsigned int>(time(0)));
srand(0);
for (int i = 0; i < key_num; i++) {
int key_ = rand() % key_num+1;
@ -39,14 +35,15 @@ void InsertData(DB *db) {
void GetData(DB *db, int size = (1 << 30)) {
ReadOptions readOptions;
int key_num = data_size / value_size;
// 点查
srand(static_cast<unsigned int>(time(0)));
srand(0);
for (int i = 0; i < 100; i++) {
int key_ = rand() % key_num+1;
std::string key = std::to_string(key_);
std::string value;
db->Get(readOptions, key, &value);
std::cout << value << std::endl;
}
// 范围查询
@ -70,7 +67,6 @@ int main() {
GetData(db);
delete db;
}
return 0;
}
return 0;
}

+ 14
- 7
test/ttl_test.cc View File

@ -1,4 +1,4 @@
#include <chrono>
#include "gtest/gtest.h"
@ -20,12 +20,13 @@ Status OpenDB(std::string dbName, DB **db) {
void InsertData(DB *db, uint64_t ttl/* second */) {
WriteOptions writeOptions;
int key_num = data_size / value_size;
srand(static_cast<unsigned int>(time(0)));
srand(42);
for (int i = 0; i < key_num; i++) {
int key_ = rand() % key_num+1;
std::string key = std::to_string(key_);
std::string value(value_size, 'a');
db->ttl = ttl;
db->Put(writeOptions, key, value, ttl);
}
}
@ -35,7 +36,7 @@ void GetData(DB *db, int size = (1 << 30)) {
int key_num = data_size / value_size;
// 点查
srand(static_cast<unsigned int>(time(0)));
srand(42);
for (int i = 0; i < 100; i++) {
int key_ = rand() % key_num+1;
std::string key = std::to_string(key_);
@ -46,7 +47,7 @@ void GetData(DB *db, int size = (1 << 30)) {
TEST(TestTTL, ReadTTL) {
DB *db;
if(OpenDB("testdb", &db).ok() == false) {
if(OpenDB("testdb_ReadTTL", &db).ok() == false) {
std::cerr << "open db failed" << std::endl;
abort();
}
@ -58,12 +59,14 @@ TEST(TestTTL, ReadTTL) {
ReadOptions readOptions;
Status status;
int key_num = data_size / value_size;
srand(static_cast<unsigned int>(time(0)));
srand(42);
for (int i = 0; i < 100; i++) {
int key_ = rand() % key_num+1;
std::string key = std::to_string(key_);
std::string value;
status = db->Get(readOptions, key, &value);
uint64_t now = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
std::cout << status.ToString() << " key: " << key << " value: ******" << value.substr(value.find("_ts_")) << " now: " << now << std::endl;
ASSERT_TRUE(status.ok());
}
@ -74,6 +77,8 @@ TEST(TestTTL, ReadTTL) {
std::string key = std::to_string(key_);
std::string value;
status = db->Get(readOptions, key, &value);
uint64_t now = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
std::cout << status.ToString() << " key: " << key << " value: ******" << value.substr(value.find("_ts_")) << " now: " << now << std::endl;
ASSERT_FALSE(status.ok());
}
}
@ -81,8 +86,8 @@ TEST(TestTTL, ReadTTL) {
TEST(TestTTL, CompactionTTL) {
DB *db;
if(OpenDB("testdb", &db).ok() == false) {
std::cerr << "open db failed" << std::endl;
if(OpenDB("testdb_CompactionTTL", &db).ok() == false) {
std::cerr << "open db failed" << OpenDB("testdb_CompactionTTL", &db).ToString() << std::endl;
abort();
}
@ -93,6 +98,7 @@ TEST(TestTTL, CompactionTTL) {
ranges[0] = leveldb::Range("-", "A");
uint64_t sizes[1];
db->GetApproximateSizes(ranges, 1, sizes);
std::cout << "ApproximateSizes before TTL: " << sizes[0] << std::endl;
ASSERT_GT(sizes[0], 0);
Env::Default()->SleepForMicroseconds(ttl * 1000000);
@ -103,6 +109,7 @@ TEST(TestTTL, CompactionTTL) {
ranges[0] = leveldb::Range("-", "A");
// uint64_t sizes[1];
db->GetApproximateSizes(ranges, 1, sizes);
std::cout << "ApproximateSizes after TTL: " << sizes[0] << std::endl;
ASSERT_EQ(sizes[0], 0);
}

Loading…
Cancel
Save