16 Commits

Author SHA1 Message Date
  VirgilZhu 258a4f2dac migrate to shuishan 10 months ago
  VirgilZhu 5b8eaab8e9 finish lab 10 months ago
  VirgilZhu 098cde3769 finish testCompactionTTL 10 months ago
  GUJIEJASON 604470f2c4 add some comments 10 months ago
  GUJIEJASON 59a258c3fd fix some bugs about ts 10 months ago
  GUJIEJASON 04fccda06f merge 10 months ago
  GUJIEJASON 614cf96bc7 fix some bugs in ttl_test 10 months ago
  VirgilZhu 2530c897ec finish ReadTTL 10 months ago
  GUJIEJASON 1c1970d241 Merge branch 'virgil' into jie 10 months ago
  VirgilZhu bc872072d3 fix db_test1/2 bug 10 months ago
  GUJIEJASON 1bf2665d03 Merge branch 'virgil' into jie 10 months ago
  GUJIEJASON c97d7cee71 init 10 months ago
  VirgilZhu 24828dd069 add TTL version Get() and Put() 10 months ago
  GUJIEJASON facd2ab6ff init repo 11 months ago
  root 2d26e4ab6d init repo 11 months ago
  GUJIEJASON 681aed6df6 init 11 months ago
10 changed files with 242 additions and 28 deletions
Split View
  1. +18
    -1
      CMakeLists.txt
  2. +0
    -7
      README.md
  3. +116
    -2
      db/db_impl.cc
  4. +7
    -0
      db/db_impl.h
  5. +3
    -0
      include/leveldb/db.h
  6. +26
    -0
      test/db_test1.cc
  7. +4
    -6
      test/db_test2.cc
  8. +26
    -0
      test/test_a.cpp
  9. +13
    -0
      test/time.cpp
  10. +29
    -12
      test/ttl_test.cc

+ 18
- 1
CMakeLists.txt View File

@ -112,6 +112,7 @@ include_directories(
if(BUILD_SHARED_LIBS)
# Only export LEVELDB_EXPORT symbols from the shared library.
add_compile_options(-fvisibility=hidden)
endif(BUILD_SHARED_LIBS)
# Must be included before CMAKE_INSTALL_INCLUDEDIR is used.
@ -518,9 +519,15 @@ if(LEVELDB_INSTALL)
)
endif(LEVELDB_INSTALL)
add_executable(db_test1
"${PROJECT_SOURCE_DIR}/test/db_test1.cc"
)
target_link_libraries(db_test1 leveldb)
add_executable(db_test2
"${PROJECT_SOURCE_DIR}/test/db_test2.cc"
test/time.cpp
)
target_link_libraries(db_test2 PRIVATE leveldb)
@ -528,4 +535,14 @@ target_link_libraries(db_test2 PRIVATE leveldb)
add_executable(ttl_test
"${PROJECT_SOURCE_DIR}/test/ttl_test.cc"
)
target_link_libraries(ttl_test PRIVATE leveldb gtest)
target_link_libraries(ttl_test PRIVATE leveldb gtest)
add_executable(test_a
"${PROJECT_SOURCE_DIR}/test/test_a.cpp"
)
target_link_libraries(test_a leveldb)
add_executable(time
"${PROJECT_SOURCE_DIR}/test/time.cpp"
)
target_link_libraries(time leveldb)

+ 0
- 7
README.md View File

@ -1,9 +1,2 @@
**本仓库提供TTL基本的测试用例**
克隆代码:
```bash
git clone --recurse-submodules https://gitea.shuishan.net.cn/building_data_management_systems.Xuanzhou.2024Fall.DaSE/leveldb_base.git
```

+ 116
- 2
db/db_impl.cc View File

@ -2,12 +2,16 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include <chrono>
#include <iostream>
#include "db/db_impl.h"
#include <algorithm>
#include <atomic>
#include <cstdint>
#include <cstdio>
#include <cctype>
#include <set>
#include <string>
#include <vector>
@ -591,7 +595,8 @@ void DBImpl::CompactRange(const Slice* begin, const Slice* end) {
}
}
TEST_CompactMemTable(); // TODO(sanjay): Skip if memtable does not overlap
for (int level = 0; level < max_level_with_files; level++) {
for (int level = 0; level <= max_level_with_files; level++) {
// for (int level = 0; level < max_level_with_files; level++) {
TEST_CompactRange(level, begin, end);
}
}
@ -894,6 +899,17 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) {
return versions_->LogAndApply(compact->compaction->edit(), &mutex_);
}
/* TODO: Check if a string consists entirely of digits */
bool isAllDigits(const std::string& str) {
for (char c : str) {
if (!isdigit(c)) {
return false;
}
}
return true;
}
/* --------------------------- */
Status DBImpl::DoCompactionWork(CompactionState* compact) {
const uint64_t start_micros = env_->NowMicros();
int64_t imm_micros = 0; // Micros spent doing imm_ compactions
@ -978,7 +994,31 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
// Therefore this deletion marker is obsolete and can be dropped.
drop = true;
}
/* TODO: Add TTL Version Compaction Drop Condition */
else {
std::string user_value = input->value().ToString();
uint64_t now = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
// if (user_value.find("_ts_") == std::string::npos) {
// input->value() = user_value + "_ts_" + std::to_string(now + ttl);
// }
// else {
// uint64_t deadtime = std::stoi(user_value.substr(user_value.find("_ts_") + 4));
// if (now >= deadtime) {
// drop = true;
// }
// }
size_t pos = user_value.rfind("_ts_");
if (pos != std::string::npos){
std::string timestampStr = user_value.substr(pos + 4);
if (isAllDigits(timestampStr)) {
uint64_t deadtime = std::stoull(timestampStr);
if (now >= deadtime) {
drop = true;
}
}
}
}
/* ----------------------------------------------- */
last_sequence_for_key = ikey.sequence;
}
#if 0
@ -1117,6 +1157,44 @@ int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() {
return versions_->MaxNextLevelOverlappingBytes();
}
/* TODO: Add TTL Version isLive() */
// Status isLive(const Slice& key, std::string* value, Status& s, uint64_t ttl) {
// if (value->empty()) {
// s = Status::NotFound(key);
// return s;
// }
// uint64_t now = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
// if (value->find("_ts_") == std::string::npos) {
// *value = *value + "_ts_" + std::to_string(now + ttl);
// }
// else {
// uint64_t deadtime = std::stoi(value->substr(value->find("_ts_") + 4));
// if (now >= deadtime) {
// s = Status::NotFound(key);
// }
// }
// return s;
// }
/* --------------------------- */
Status isLive(const Slice& key, std::string* value, Status& s) {
if (value->empty()) {
s = Status::NotFound(key);
return s;
}
uint64_t now = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
size_t pos = value->rfind("_ts_");
if (pos != std::string::npos){
std::string timestampStr = value->substr(pos + 4);
if (isAllDigits(timestampStr)) {
uint64_t deadtime = std::stoull(timestampStr);
if (now >= deadtime) {
s = Status::NotFound(key);
}
}
}
return s;
}
Status DBImpl::Get(const ReadOptions& options, const Slice& key,
std::string* value) {
Status s;
@ -1144,14 +1222,24 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key,
mutex_.Unlock();
// First look in the memtable, then in the immutable memtable (if any).
LookupKey lkey(key, snapshot);
/* TODO: Add TTL Version Get() */
if (mem->Get(lkey, value, &s)) {
// isLive(key, value, s, ttl);
isLive(key, value, s);
// Done
} else if (imm != nullptr && imm->Get(lkey, value, &s)) {
// isLive(key, value, s, ttl);
isLive(key, value, s);
// Done
} else {
s = current->Get(options, lkey, value, &stats);
// if (s.ok()) isLive(key, value, s, ttl);
if (s.ok()) isLive(key, value, s);
have_stat_update = true;
}
/* --------------------------- */
mutex_.Lock();
}
@ -1491,6 +1579,17 @@ Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
return Write(opt, &batch);
}
/* TODO: Add TTL Version Put() */
Status DBImpl::Put(const WriteOptions& opt, const Slice& key, const Slice& value, uint64_t ttl){
WriteBatch batch;
auto now = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
auto end = now + ttl;
Slice value_timestamp = Slice(value.ToString() + "_ts_" + std::to_string(end));
batch.Put(key, value_timestamp);
return Write(opt, &batch);
}
/* --------------------------- */
Status DB::Delete(const WriteOptions& opt, const Slice& key) {
WriteBatch batch;
batch.Delete(key);
@ -1508,12 +1607,22 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
// Recover handles create_if_missing, error_if_exists
bool save_manifest = false;
Status s = impl->Recover(&edit, &save_manifest);
if (!s.ok()) {
std::cerr << "Recover failed: " << s.ToString() << std::endl;
}
if (s.ok() && impl->mem_ == nullptr) {
// Create new log and a corresponding memtable.
uint64_t new_log_number = impl->versions_->NewFileNumber();
WritableFile* lfile;
s = options.env->NewWritableFile(LogFileName(dbname, new_log_number),
&lfile);
if (!s.ok()) {
std::cerr << "NewWritableFile failed: " << s.ToString() << std::endl;
}
if (s.ok()) {
edit.SetLogNumber(new_log_number);
impl->logfile_ = lfile;
@ -1527,6 +1636,11 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
edit.SetPrevLogNumber(0); // No older logs needed after recovery.
edit.SetLogNumber(impl->logfile_number_);
s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
if (!s.ok()) {
std::cerr << "LogAndApply failed: " << s.ToString() << std::endl;
}
}
if (s.ok()) {
impl->RemoveObsoleteFiles();

+ 7
- 0
db/db_impl.h View File

@ -28,6 +28,7 @@ class VersionSet;
class DBImpl : public DB {
public:
uint64_t ttl;
DBImpl(const Options& options, const std::string& dbname);
DBImpl(const DBImpl&) = delete;
@ -42,6 +43,12 @@ class DBImpl : public DB {
Status Write(const WriteOptions& options, WriteBatch* updates) override;
Status Get(const ReadOptions& options, const Slice& key,
std::string* value) override;
/* TODO: Add TTL Version Put() */
Status Put(const WriteOptions&, const Slice& key,
const Slice& value, uint64_t ttl) override;
/* --------------------------- */
Iterator* NewIterator(const ReadOptions&) override;
const Snapshot* GetSnapshot() override;
void ReleaseSnapshot(const Snapshot* snapshot) override;

+ 3
- 0
include/leveldb/db.h View File

@ -54,6 +54,7 @@ class LEVELDB_EXPORT DB {
DB** dbptr);
DB() = default;
uint64_t ttl;
DB(const DB&) = delete;
DB& operator=(const DB&) = delete;
@ -148,8 +149,10 @@ class LEVELDB_EXPORT DB {
// ----------------------------For TTL-----------------------------
// key设置ttl
/* TODO: Add TTL Version Put() */
virtual Status Put(const WriteOptions& options, const Slice& key,
const Slice& value, uint64_t ttl) = 0;
/* --------------------------- */
};
// Destroy the contents of the specified database.

+ 26
- 0
test/db_test1.cc View File

@ -0,0 +1,26 @@
#include "leveldb/db.h"
#include <iostream>
using namespace std;
using namespace leveldb;
int main() {
DB* db = nullptr;
Options op;
op.create_if_missing = true;
Status status = DB::Open(op, "testdb", &db);
assert(status.ok());
db->Put(WriteOptions(), "001", "leveldb");
string s;
db->Get(ReadOptions(), "001", &s);
cout<<s<<endl;
db->Put(WriteOptions(), "002", "world");
string s1;
db->Delete(WriteOptions(), "002");
db->Get(ReadOptions(), "002", &s1);
cout<<s1<<endl;
delete db;
return 0;
}

+ 4
- 6
test/db_test2.cc View File

@ -1,10 +1,8 @@
#include "leveldb/db.h"
#include "leveldb/filter_policy.h"
#include <iostream>
#include <cstdlib>
#include <ctime>
using namespace leveldb;
constexpr int value_size = 2048;
@ -37,7 +35,7 @@ void InsertData(DB *db) {
void GetData(DB *db, int size = (1 << 30)) {
ReadOptions readOptions;
int key_num = data_size / value_size;
// 点查
srand(0);
for (int i = 0; i < 100; i++) {
@ -45,6 +43,7 @@ void GetData(DB *db, int size = (1 << 30)) {
std::string key = std::to_string(key_);
std::string value;
db->Get(readOptions, key, &value);
std::cout << value << std::endl;
}
// 范围查询
@ -68,7 +67,6 @@ int main() {
GetData(db);
delete db;
}
return 0;
}
return 0;
}

+ 26
- 0
test/test_a.cpp View File

@ -0,0 +1,26 @@
#include "leveldb/db.h"
#include <iostream>
using namespace std;
using namespace leveldb;
int main() {
DB* db = nullptr;
Options op;
op.create_if_missing = true;
Status status = DB::Open(op, "testdb", &db);
assert(status.ok());
db->Put(WriteOptions(), "001", "leveldb");
string s;
db->Get(ReadOptions(), "001", &s);
cout<<s<<endl;
db->Put(WriteOptions(), "002", "world");
string s1;
db->Delete(WriteOptions(), "002");
db->Get(ReadOptions(), "002", &s1);
cout<<s1<<endl;
delete db;
return 0;
}

+ 13
- 0
test/time.cpp View File

@ -0,0 +1,13 @@
#include <iostream>
#include <chrono>
//int main() {
// auto now = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
// auto end = now + 5;
//
// // 输出当前时间点和未来时间点
// std::cout << "Current time point: " << now << std::endl;
// std::cout << "Future time point (+5 seconds): " << end << std::endl;
//
// return 0;
//}

+ 29
- 12
test/ttl_test.cc View File

@ -1,17 +1,18 @@
#include <chrono>
#include "gtest/gtest.h"
#include "leveldb/env.h"
#include "leveldb/db.h"
using namespace leveldb;
constexpr int value_size = 2048;
constexpr int data_size = 128 << 20;
Status OpenDB(std::string dbName, DB **db) {
std::string rm_command = "rm -rf " + dbName;
system(rm_command.c_str());
Options options;
options.create_if_missing = true;
return DB::Open(options, dbName, db);
@ -20,14 +21,24 @@ Status OpenDB(std::string dbName, DB **db) {
void InsertData(DB *db, uint64_t ttl/* second */) {
WriteOptions writeOptions;
int key_num = data_size / value_size;
srand(0);
srand(42);
for (int i = 0; i < key_num; i++) {
int key_ = rand() % key_num+1;
std::string key = std::to_string(key_);
std::string value(value_size, 'a');
db->ttl = ttl;
db->Put(writeOptions, key, value, ttl);
}
// for (int i = 0; i < key_num; i++) {
// int key_ = rand() % key_num+1;
// std::string key = std::to_string(key_ );
// std::string value = "aaaaaa_ts_5678aaaaaa";
// db->Put(writeOptions, key, value);
// }
}
void GetData(DB *db, int size = (1 << 30)) {
@ -35,7 +46,7 @@ void GetData(DB *db, int size = (1 << 30)) {
int key_num = data_size / value_size;
// 点查
srand(0);
srand(42);
for (int i = 0; i < 100; i++) {
int key_ = rand() % key_num+1;
std::string key = std::to_string(key_);
@ -46,7 +57,7 @@ void GetData(DB *db, int size = (1 << 30)) {
TEST(TestTTL, ReadTTL) {
DB *db;
if(OpenDB("testdb", &db).ok() == false) {
if(OpenDB("testdb_ReadTTL", &db).ok() == false) {
std::cerr << "open db failed" << std::endl;
abort();
}
@ -58,12 +69,14 @@ TEST(TestTTL, ReadTTL) {
ReadOptions readOptions;
Status status;
int key_num = data_size / value_size;
srand(0);
srand(42);
for (int i = 0; i < 100; i++) {
int key_ = rand() % key_num+1;
std::string key = std::to_string(key_);
std::string value;
status = db->Get(readOptions, key, &value);
uint64_t now = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
std::cout << status.ToString() << " key: " << key << " value: ******" << value.substr(value.find("_ts_")) << " now: " << now << std::endl;
ASSERT_TRUE(status.ok());
}
@ -74,6 +87,8 @@ TEST(TestTTL, ReadTTL) {
std::string key = std::to_string(key_);
std::string value;
status = db->Get(readOptions, key, &value);
uint64_t now = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
std::cout << status.ToString() << " key: " << key << " value: ******" << value.substr(value.find("_ts_")) << " now: " << now << std::endl;
ASSERT_FALSE(status.ok());
}
}
@ -81,8 +96,8 @@ TEST(TestTTL, ReadTTL) {
TEST(TestTTL, CompactionTTL) {
DB *db;
if(OpenDB("testdb", &db).ok() == false) {
std::cerr << "open db failed" << std::endl;
if(OpenDB("testdb_CompactionTTL", &db).ok() == false) {
std::cerr << "open db failed" << OpenDB("testdb_CompactionTTL", &db).ToString() << std::endl;
abort();
}
@ -93,16 +108,18 @@ TEST(TestTTL, CompactionTTL) {
ranges[0] = leveldb::Range("-", "A");
uint64_t sizes[1];
db->GetApproximateSizes(ranges, 1, sizes);
std::cout << "ApproximateSizes before TTL: " << sizes[0] << std::endl;
ASSERT_GT(sizes[0], 0);
Env::Default()->SleepForMicroseconds(ttl * 1000000);
db->CompactRange(nullptr, nullptr);
leveldb::Range ranges[1];
// leveldb::Range ranges[1];
ranges[0] = leveldb::Range("-", "A");
uint64_t sizes[1];
// uint64_t sizes[1];
db->GetApproximateSizes(ranges, 1, sizes);
std::cout << "ApproximateSizes after TTL: " << sizes[0] << std::endl;
ASSERT_EQ(sizes[0], 0);
}
@ -111,4 +128,4 @@ int main(int argc, char** argv) {
// All tests currently run with the same read-only file limits.
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
}

Loading…
Cancel
Save