diff --git a/CMakeLists.txt b/CMakeLists.txt index fd98cb6..3b03133 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -529,4 +529,10 @@ target_link_libraries(db_test2 PRIVATE leveldb) add_executable(ttl_test "${PROJECT_SOURCE_DIR}/test/ttl_test.cc" ) -target_link_libraries(ttl_test PRIVATE leveldb gtest) \ No newline at end of file +target_link_libraries(ttl_test PRIVATE leveldb gtest) + + +add_executable(simple_test + "${PROJECT_SOURCE_DIR}/test/simple_test.cc" +) +target_link_libraries(simple_test PRIVATE leveldb gtest) \ No newline at end of file diff --git a/db/db_impl.cc b/db/db_impl.cc index 8eb536c..0d5e730 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -4,14 +4,6 @@ #include "db/db_impl.h" -#include -#include -#include -#include -#include -#include -#include - #include "db/builder.h" #include "db/db_iter.h" #include "db/dbformat.h" @@ -22,11 +14,21 @@ #include "db/table_cache.h" #include "db/version_set.h" #include "db/write_batch_internal.h" +#include +#include +#include +#include +#include +#include +#include +#include + #include "leveldb/db.h" #include "leveldb/env.h" #include "leveldb/status.h" #include "leveldb/table.h" #include "leveldb/table_builder.h" + #include "port/port.h" #include "table/block.h" #include "table/merger.h" @@ -1162,6 +1164,15 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key, mem->Unref(); if (imm != nullptr) imm->Unref(); current->Unref(); + if(s.ok()){ + auto a = env_->GetCurrentTime(); + auto b = GetTS(value); + std::cout<< "read when " << a<GetCurrentTime() > GetTS(value)){ + return Status::Expire("Expire",Slice()); + } + } return s; } @@ -1475,7 +1486,6 @@ void DBImpl::GetApproximateSizes(const Range* range, int n, uint64_t* sizes) { MutexLock l(&mutex_); Version* v = versions_->current(); v->Ref(); - for (int i = 0; i < n; i++) { // Convert user_key into a corresponding internal key. InternalKey k1(range[i].start, kMaxSequenceNumber, kValueTypeForSeek); @@ -1484,7 +1494,6 @@ void DBImpl::GetApproximateSizes(const Range* range, int n, uint64_t* sizes) { uint64_t limit = versions_->ApproximateOffsetOf(v, k2); sizes[i] = (limit >= start ? limit - start : 0); } - v->Unref(); } @@ -1503,6 +1512,12 @@ void DBImpl::AppendTS(const Slice& val, std::string* val_with_ts,uint64_t ttl) { val_with_ts->append(reinterpret_cast(&st), sizeof(st)); } +uint64_t DBImpl::GetTS(const std::string* val) { + uint64_t expiration_time; + memcpy(&expiration_time, val->data() + val->size() - sizeof(uint64_t), sizeof(uint64_t)); + return expiration_time; +} + /** * * @param options @@ -1521,14 +1536,16 @@ Status DB::Put(const WriteOptions& options, const Slice& key, uint64_t expiration_time = std::chrono::duration_cast( std::chrono::system_clock::now().time_since_epoch()) - .count(); + ttl; + .count() + ttl; // 追加原始 value 到 val_with_ts val_with_ts.append(value.data(), value.size()); // 将 expiration_time 追加到 val_with_ts val_with_ts.append(reinterpret_cast(&expiration_time), sizeof(expiration_time)); - + std::cout<<"PUT"<Put(writeOptions, key, value, ttl); + std::cout << "time to alive" << ttl << std::endl; + break; } } // 2. 数据访问(如何读数据) @@ -54,9 +64,11 @@ void GetData(DB *db, int size = (1 << 30)) { srand(static_cast(time(0))); for (int i = 0; i < 100; i++) { int key_ = rand() % key_num+1; + std::string key = std::to_string(key_); std::string value; db->Get(readOptions, key, &value); + //break; } // 范围查询 @@ -70,19 +82,54 @@ void GetData(DB *db, int size = (1 << 30)) { int main() { +// DB *db; +// if(OpenDB("testdb", &db).ok()) { +// uint64_t ttl = 20; +// +// InsertData(db, ttl); +// delete db; +// } +// +// if(OpenDB("testdb", &db).ok()) { +// GetData(db); +// delete db; +// } +// DB *db; - if(OpenDB("testdb", &db).ok()) { - uint64_t ttl = 20; + if(OpenDB("testdb", &db).ok() == false) { + std::cerr << "open db failed" << std::endl; + abort(); + } - InsertData(db, ttl); - delete db; + uint64_t ttl = 200; + + InsertData(db, ttl); + + ReadOptions readOptions; + Status status; + int key_num = data_size / value_size; + srand(static_cast(time(0))); + for (int i = 0; i < 100; i++) { + int key_ = rand() % key_num+1; + key_ = 1; + std::string key = std::to_string(key_); + std::string value; + status = db->Get(readOptions, key, &value); + assert(status.ok()); + break; } - if(OpenDB("testdb", &db).ok()) { - GetData(db); - delete db; + Env::Default()->SleepForMicroseconds(ttl * 10000); + + for (int i = 0; i < 100; i++) { + int key_ = rand() % key_num+1; + key_ = 1; + std::string key = std::to_string(key_); + std::string value; + status = db->Get(readOptions, key, &value); + assert(status.ok() != true); + break; } - return 0; } diff --git a/test/simple_test.cc b/test/simple_test.cc new file mode 100644 index 0000000..a4e5f3c --- /dev/null +++ b/test/simple_test.cc @@ -0,0 +1,93 @@ +#include "leveldb/db.h" +#include "leveldb/filter_policy.h" + +#include "leveldb/env.h" +#include "leveldb/db.h" + + +using namespace leveldb; + +constexpr int value_size = 2048; +constexpr int data_size = 128 << 20; + +#include +#include +#include + +using namespace leveldb; + + +// 3. 数据管理(Manifest/创建/恢复数据库) +Status OpenDB(std::string dbName, DB **db) { + Options options; + options.create_if_missing = true; + options.filter_policy = NewBloomFilterPolicy(10); + return DB::Open(options, dbName, db); +} + +// 1. 存储(数据结构与写入) +// 4. 数据合并(Compaction) +//void InsertData(DB *db) { +// WriteOptions writeOptions; +// int key_num = data_size / value_size; +// srand(static_cast(time(0))); +// +// for (int i = 0; i < key_num; i++) { +// int key_ = rand() % key_num+1; +// std::string key = std::to_string(key_); +// std::string value(value_size, 'a'); +// db->Put(writeOptions, key, value); +// } +//} + +void InsertData(DB *db, uint64_t ttl/* second */) { + WriteOptions writeOptions; + int key_num = data_size / value_size; + srand(static_cast(time(0))); + for (int i = 0; i < key_num; i++) { + int key_ = rand() % key_num+1; + key_ = 1; + std::string key = std::to_string(key_); + std::string value(value_size, 'a'); + db->Put(writeOptions, key, value, ttl); + std::cout << "time to alive" << ttl << std::endl; + break; + } +} + +int main() { + + DB *db; + if(OpenDB("testdb", &db).ok() == false) { + std::cerr << "open db failed" << std::endl; + abort(); + } + + uint64_t ttl = 200; + +// InsertData(db, ttl); + WriteOptions writeOptions; + int key_ = 1; + std::string key = std::to_string(key_); + std::string value(1, 'a'); + db->Put(writeOptions, key, value, ttl); + std::cout << "time to alive" << ttl << std::endl; + + ReadOptions readOptions; + std::string value_read; + Status status = db->Get(readOptions, key, &value); + + + //std::cout<<"start sleep for " << + Env::Default()->SleepForMicroseconds(ttl * 100000); + + + key_ = 1; + key = std::to_string(key_); + std::string value_read_second; + status = db->Get(readOptions, key, &value_read_second); + assert(status.ok() != true); + + return 0; +} +