From 84186e065237b0f38021182d08fe7ae0e00cbb4f Mon Sep 17 00:00:00 2001 From: kevinyao0901 Date: Wed, 30 Oct 2024 19:11:39 +0800 Subject: [PATCH] fix auto copaction problem , still need to deal compaction --- db/db_impl.cc | 71 +++++++++++++++++++++++++++++------- db/dbformat.h | 7 ++-- db/memtable.cc | 28 +++++++-------- db/version_set.h | 2 ++ test/ttl_test.cc | 107 ++++++++++++++++++++++++++++++++++++++++++++++++------- 5 files changed, 173 insertions(+), 42 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index ffb1889..ba61336 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -11,6 +11,7 @@ #include #include #include +#include #include "db/builder.h" #include "db/db_iter.h" @@ -763,7 +764,7 @@ void DBImpl::BackgroundCompaction() { Status status; if (c == nullptr) { // Nothing to do - } else if (!is_manual && c->IsTrivialMove()) { + } else if (!is_manual && c->IsTrivialMove() && 1==2) { // Move file to next level assert(c->num_input_files(0) == 1); FileMetaData* f = c->input(0, 0); @@ -924,9 +925,16 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) { } Status DBImpl::DoCompactionWork(CompactionState* compact) { + std::cout<< "start compact" << std::endl; const uint64_t start_micros = env_->NowMicros(); int64_t imm_micros = 0; // Micros spent doing imm_ compactions + //TTL ToDo + // 定义要检测的目标键 + Slice target_key = "10000"; + int dropped_keys_count = 0; // 初始化计数器 + int total_keys_count = 0; + Log(options_.info_log, "Compacting %d@%d + %d@%d files", compact->compaction->num_input_files(0), compact->compaction->level(), compact->compaction->num_input_files(1), @@ -935,6 +943,17 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { assert(versions_->NumLevelFiles(compact->compaction->level()) > 0); assert(compact->builder == nullptr); assert(compact->outfile == nullptr); + + //TTL ToDo + // { + // //MutexLock l(&mutex_); + // if (has_imm_.load(std::memory_order_relaxed)) { + // CompactMemTable(); + // background_work_finished_signal_.SignalAll(); + // } + // } + //finish modify + if (snapshots_.empty()) { compact->smallest_snapshot = versions_->LastSequence(); } else { @@ -947,6 +966,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { mutex_.Unlock(); input->SeekToFirst(); + std::cout << "Compation first key: " << input->key().ToString() << std::endl; Status status; ParsedInternalKey ikey; std::string current_user_key; @@ -1009,21 +1029,45 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { } // TTL ToDo: add expiration time check - if (!drop) { // 如果还未被标记为丢弃 - Slice value = input->value(); - if (value.size() >= sizeof(uint64_t)) { - const char* ptr = value.data(); - uint64_t expiration_time = DecodeFixed64(ptr); - uint64_t current_time = env_->NowMicros() / 1000000; - - if (current_time > expiration_time) { - drop = true; // 过期的键值对,标记为丢弃 - } + // 检查是否为目标键 + if (key == target_key) { + // 输出调试信息 + Log(options_.info_log, "Found target key during compaction: %s\n", key.ToString().c_str()); + } + + Slice value = input->value(); + if (value.size() >= sizeof(uint64_t)) { + const char* ptr = value.data(); + uint64_t expiration_time = DecodeFixed64(ptr); + uint64_t current_time = env_->NowMicros() / 1000000; + + if (current_time > expiration_time) { + drop = true; // 过期的键值对,标记为丢弃 + dropped_keys_count ++; // 初始化计数器 + }else{ + bool flag = current_time > expiration_time; } + }else{ + bool bs = value.size() >= sizeof(uint64_t); } + // if (!drop) { // 如果还未被标记为丢弃 + // Slice value = input->value(); + // if (value.size() >= sizeof(uint64_t)) { + // const char* ptr = value.data(); + // uint64_t expiration_time = DecodeFixed64(ptr); + // uint64_t current_time = env_->NowMicros() / 1000000; + + // if (current_time > expiration_time) { + // drop = true; // 过期的键值对,标记为丢弃 + // } + // } + // } last_sequence_for_key = ikey.sequence; } + Log(options_.info_log, "Total dropped keys in compaction: %d\n", dropped_keys_count); // 输出统计结果 + total_keys_count++; + #if 0 Log(options_.info_log, " Compact: %s, seq %d, type: %d %d, drop: %d, is_base: %d, " @@ -1061,6 +1105,9 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { input->Next(); } + std::cout << "Total dropped keys in compaction:" << dropped_keys_count + << ", Total: " << total_keys_count << "\n"; + if (status.ok() && shutting_down_.load(std::memory_order_acquire)) { status = Status::IOError("Deleting DB during compaction"); } @@ -1206,7 +1253,7 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key, uint64_t current_time = GetCurrentTime(); // 如果当前时间已经超过过期时间,则认为数据过期,返回 NotFound - if (current_time > expiration_time) { + if (current_time >= expiration_time) { s = Status::NotFound(Slice()); } else { // 数据未过期,解析出实际的值 diff --git a/db/dbformat.h b/db/dbformat.h index a1c30ed..263cfb3 100644 --- a/db/dbformat.h +++ b/db/dbformat.h @@ -24,14 +24,15 @@ namespace leveldb { namespace config { static const int kNumLevels = 7; +//TTL ToDo // Level-0 compaction is started when we hit this many files. -static const int kL0_CompactionTrigger = 4; +static const int kL0_CompactionTrigger = 4096; // Soft limit on number of level-0 files. We slow down writes at this point. -static const int kL0_SlowdownWritesTrigger = 8; +static const int kL0_SlowdownWritesTrigger = 8192; // Maximum number of level-0 files. We stop writes at this point. -static const int kL0_StopWritesTrigger = 12; +static const int kL0_StopWritesTrigger = 1024*1024; // Maximum level to which a new compacted memtable is pushed if it // does not create overlap. We try to push to level 2 to avoid the diff --git a/db/memtable.cc b/db/memtable.cc index d7c4078..b5772dc 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -127,20 +127,20 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s) { Slice v = GetLengthPrefixedSlice(key_ptr + key_length); value->assign(v.data(), v.size()); - // TTL ToDo :检查:从 value 中解析出过期时间戳并与当前时间比较 - assert(value->size() >= sizeof(uint64_t)); - uint64_t expiration_time; - memcpy(&expiration_time, value->data(), sizeof(uint64_t)); // 解析出过期时间戳 - uint64_t current_time = static_cast(time(nullptr)); // 获取当前时间 - - // 如果当前时间已超过过期时间,设置状态为 NotFound 表示数据过期 - if (current_time > expiration_time) { - *s = Status::NotFound(Slice()); - return true; - } else { - // 数据未过期,解析出实际的值部分,去掉过期时间戳 - *value = value->substr(sizeof(uint64_t)); - } + // // TTL ToDo :检查:从 value 中解析出过期时间戳并与当前时间比较 + // assert(value->size() >= sizeof(uint64_t)); + // uint64_t expiration_time; + // memcpy(&expiration_time, value->data(), sizeof(uint64_t)); // 解析出过期时间戳 + // uint64_t current_time = static_cast(time(nullptr)); // 获取当前时间 + + // // 如果当前时间已超过过期时间,设置状态为 NotFound 表示数据过期 + // if (current_time > expiration_time) { + // *s = Status::NotFound(Slice()); + // return true; + // } else { + // // 数据未过期,解析出实际的值部分,去掉过期时间戳 + // *value = value->substr(sizeof(uint64_t)); + // } //finish modify return true; diff --git a/db/version_set.h b/db/version_set.h index ea0c925..7d4e643 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -252,6 +252,8 @@ class VersionSet { bool NeedsCompaction() const { Version* v = current_; return (v->compaction_score_ >= 1) || (v->file_to_compact_ != nullptr); + //TTL ToDo + //return false; } // Add all files listed in any live version to *live. diff --git a/test/ttl_test.cc b/test/ttl_test.cc index 3712b25..f71a306 100644 --- a/test/ttl_test.cc +++ b/test/ttl_test.cc @@ -11,6 +11,33 @@ using namespace leveldb; constexpr int value_size = 2048; constexpr int data_size = 128 << 20; +//-------------------------------------------------------------- +void PrintAllKeys(DB *db) { + // 创建一个读选项对象 + ReadOptions readOptions; + + int LeftKeyCount = 0; + + // 创建迭代器 + std::unique_ptr it(db->NewIterator(readOptions)); + + // 遍历所有键 + for (it->SeekToFirst(); it->Valid(); it->Next()) { + // std::string key = it->key().ToString(); + // std::string value = it->value().ToString(); + // std::cout << "Key: " << key << std::endl; + LeftKeyCount++; + } + + // 检查迭代器的有效性 + if (!it->status().ok()) { + std::cerr << "Error iterating through keys: " << it->status().ToString() << std::endl; + } + std::cerr << "Key hasn't been deleted: " << LeftKeyCount << std::endl; +} + +//------------------------------------------------------------------ + Status OpenDB(std::string dbName, DB **db) { Options options; options.create_if_missing = true; @@ -26,19 +53,36 @@ void InsertData(DB *db, uint64_t ttl/* second */) { std::unordered_set unique_keys; for (int i = 0; i < key_num; i++) { - int key_ = rand() % key_num+1; - std::string key = std::to_string(key_); - std::string value(value_size, 'a'); - Status status = db->Put(writeOptions, key, value, ttl); - if (!status.ok()) { - // 输出失败的状态信息并退出循环 - std::cerr << "Failed to write key: " << key - << ", Status: " << status.ToString() << std::endl; - }else{ - std::cerr << "Success to write key: " << key << std::endl; - unique_keys.insert(key); // 插入集合中,如果已经存在则不会重复插入 + std::string key; + do { + int key_ = rand() % key_num + 1; + key = std::to_string(key_); + } while (unique_keys.find(key) != unique_keys.end()); // 检查是否已存在 + + std::string value(value_size, 'a'); + + // 判断 key 是否在范围内 + if (key >= "-" && key < "A") { + //std::cout << "Key: " << key << " is within the range (-, A)" << std::endl; + } else { + std::cout << "Key: " << key << " is outside the range (-, A)" << std::endl; + return; + } + + Status status = db->Put(writeOptions, key, value, ttl); + if (!status.ok()) { + // 输出失败的状态信息并退出循环 + std::cerr << "Failed to write key: " << key + << ", Status: " << status.ToString() << std::endl; + } else { + unique_keys.insert(key); // 插入集合中,如果已经存在则不会重复插入 + } } - } + + Iterator* iter = db->NewIterator(ReadOptions()); + iter->SeekToFirst(); + std::cout << "Data base First key: " << iter->key().ToString() << std::endl; + delete iter; // 打印成功写入的唯一键的数量 std::cout << "Total unique keys successfully written: " << unique_keys.size() << std::endl; @@ -56,8 +100,22 @@ void GetData(DB *db, int size = (1 << 30)) { std::string value; db->Get(readOptions, key, &value); } + + Iterator* iter = db->NewIterator(ReadOptions()); + iter->SeekToFirst(); + std::cout << "Data base First key: " << iter->key().ToString() << std::endl; + int cnt = 0; + while (iter->Valid()) + { + cnt++; + iter->Next(); + } + std::cout << "Total key cnt: " << cnt << "\n"; + delete iter; + } +#if 0 TEST(TestTTL, ReadTTL) { DB *db; if(OpenDB("testdb", &db).ok() == false) { @@ -89,6 +147,7 @@ TEST(TestTTL, ReadTTL) { Env::Default()->SleepForMicroseconds(ttl * 1000000); + srand(42); for (int i = 0; i < 100; i++) { int key_ = rand() % key_num+1; std::string key = std::to_string(key_); @@ -102,32 +161,54 @@ TEST(TestTTL, ReadTTL) { ASSERT_FALSE(status.ok()); } + + delete db; } +#endif + TEST(TestTTL, CompactionTTL) { DB *db; + leveldb::Options options; + // options.write_buffer_size = 1024*1024*1024; + // options.max_file_size = 1024*1024*1024; + leveldb::DestroyDB("testdb", options); + if(OpenDB("testdb", &db).ok() == false) { std::cerr << "open db failed" << std::endl; abort(); } uint64_t ttl = 20; - InsertData(db, ttl); leveldb::Range ranges[1]; ranges[0] = leveldb::Range("-", "A"); uint64_t sizes[1]; db->GetApproximateSizes(ranges, 1, sizes); + ASSERT_EQ(sizes[0], 0); + + InsertData(db, ttl); + + //leveldb::Range ranges[1]; + ranges[0] = leveldb::Range("-", "A"); + //uint64_t sizes[1]; + db->GetApproximateSizes(ranges, 1, sizes); ASSERT_GT(sizes[0], 0); + + ttl += 10; Env::Default()->SleepForMicroseconds(ttl * 1000000); + std::cout << "Start drop\n"; db->CompactRange(nullptr, nullptr); ranges[0] = leveldb::Range("-", "A"); db->GetApproximateSizes(ranges, 1, sizes); + PrintAllKeys(db); ASSERT_EQ(sizes[0], 0); + + delete db; }