Bläddra i källkod

内存读写可以运行pass

lzj_version
林子骥 1 månad sedan
förälder
incheckning
2f9e49d9ed
8 ändrade filer med 201 tillägg och 24 borttagningar
  1. +7
    -1
      CMakeLists.txt
  2. +29
    -12
      db/db_impl.cc
  3. +1
    -0
      db/db_impl.h
  4. +7
    -0
      db/memtable.cc
  5. +2
    -0
      include/leveldb/db.h
  6. +5
    -1
      include/leveldb/status.h
  7. +57
    -10
      test/db_test2.cc
  8. +93
    -0
      test/simple_test.cc

+ 7
- 1
CMakeLists.txt Visa fil

@ -529,4 +529,10 @@ target_link_libraries(db_test2 PRIVATE leveldb)
add_executable(ttl_test
"${PROJECT_SOURCE_DIR}/test/ttl_test.cc"
)
target_link_libraries(ttl_test PRIVATE leveldb gtest)
target_link_libraries(ttl_test PRIVATE leveldb gtest)
add_executable(simple_test
"${PROJECT_SOURCE_DIR}/test/simple_test.cc"
)
target_link_libraries(simple_test PRIVATE leveldb gtest)

+ 29
- 12
db/db_impl.cc Visa fil

@ -4,14 +4,6 @@
#include "db/db_impl.h"
#include <algorithm>
#include <atomic>
#include <cstdint>
#include <cstdio>
#include <set>
#include <string>
#include <vector>
#include "db/builder.h"
#include "db/db_iter.h"
#include "db/dbformat.h"
@ -22,11 +14,21 @@
#include "db/table_cache.h"
#include "db/version_set.h"
#include "db/write_batch_internal.h"
#include <algorithm>
#include <atomic>
#include <cstdint>
#include <cstdio>
#include <iostream>
#include <set>
#include <string>
#include <vector>
#include "leveldb/db.h"
#include "leveldb/env.h"
#include "leveldb/status.h"
#include "leveldb/table.h"
#include "leveldb/table_builder.h"
#include "port/port.h"
#include "table/block.h"
#include "table/merger.h"
@ -1162,6 +1164,15 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key,
mem->Unref();
if (imm != nullptr) imm->Unref();
current->Unref();
if(s.ok()){
auto a = env_->GetCurrentTime();
auto b = GetTS(value);
std::cout<< "read when " << a<<std::endl;
std::cout<< "read timestamp "<<b << std::endl;
if(env_->GetCurrentTime() > GetTS(value)){
return Status::Expire("Expire",Slice());
}
}
return s;
}
@ -1475,7 +1486,6 @@ void DBImpl::GetApproximateSizes(const Range* range, int n, uint64_t* sizes) {
MutexLock l(&mutex_);
Version* v = versions_->current();
v->Ref();
for (int i = 0; i < n; i++) {
// Convert user_key into a corresponding internal key.
InternalKey k1(range[i].start, kMaxSequenceNumber, kValueTypeForSeek);
@ -1484,7 +1494,6 @@ void DBImpl::GetApproximateSizes(const Range* range, int n, uint64_t* sizes) {
uint64_t limit = versions_->ApproximateOffsetOf(v, k2);
sizes[i] = (limit >= start ? limit - start : 0);
}
v->Unref();
}
@ -1503,6 +1512,12 @@ void DBImpl::AppendTS(const Slice& val, std::string* val_with_ts,uint64_t ttl) {
val_with_ts->append(reinterpret_cast<const char*>(&st), sizeof(st));
}
uint64_t DBImpl::GetTS(const std::string* val) {
uint64_t expiration_time;
memcpy(&expiration_time, val->data() + val->size() - sizeof(uint64_t), sizeof(uint64_t));
return expiration_time;
}
/**
*
* @param options
@ -1521,14 +1536,16 @@ Status DB::Put(const WriteOptions& options, const Slice& key,
uint64_t expiration_time = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch())
.count(); + ttl;
.count() + ttl;
// 追加原始 value 到 val_with_ts
val_with_ts.append(value.data(), value.size());
// 将 expiration_time 追加到 val_with_ts
val_with_ts.append(reinterpret_cast<const char*>(&expiration_time), sizeof(expiration_time));
std::cout<<"PUT"<<std::endl;
std::cout << "timestamp: " << expiration_time << std::endl;
//"a\323='\277\222\001\000"
WriteBatch batch;
batch.Put(key, Slice(val_with_ts));
return Write(options, &batch);

+ 1
- 0
db/db_impl.h Visa fil

@ -52,6 +52,7 @@ class DBImpl : public DB {
void GetApproximateSizes(const Range* range, int n, uint64_t* sizes) override;
void CompactRange(const Slice* begin, const Slice* end) override;
void AppendTS(const Slice& val, std::string* val_with_ts,uint64_t ttl);
static uint64_t GetTS(const std::string* val);
// Extra methods (for testing) that are not in the public DB interface
// Compact any files in the named level that overlap [*begin,*end]

+ 7
- 0
db/memtable.cc Visa fil

@ -100,6 +100,13 @@ void MemTable::Add(SequenceNumber s, ValueType type, const Slice& key,
table_.Insert(buf);
}
/**
* @descrption: ttl问题场景keyttl的相对大小于版本号的相对大小不同
* @param key
* @param value
* @param s
* @return
*/
bool MemTable::Get(const LookupKey& key, std::string* value, Status* s) {
Slice memkey = key.memtable_key();
Table::Iterator iter(&table_);

+ 2
- 0
include/leveldb/db.h Visa fil

@ -152,6 +152,8 @@ class LEVELDB_EXPORT DB {
const Slice& value, uint64_t ttl) = 0;
// virtual Status Write(const WriteOptions& options, WriteBatch* updates,uint64_t ttl) = 0;
// virtual void AppendTS(const Slice& val, std::string* val_with_ts,uint64_t ttl) = 0;
// virtual void AppendTS(const Slice& val, std::string* val_with_ts,uint64_t ttl) = 0;
};
// Destroy the contents of the specified database.

+ 5
- 1
include/leveldb/status.h Visa fil

@ -52,6 +52,9 @@ class LEVELDB_EXPORT Status {
static Status IOError(const Slice& msg, const Slice& msg2 = Slice()) {
return Status(kIOError, msg, msg2);
}
static Status Expire(const Slice& msg, const Slice& msg2 = Slice()) {
return Status(kExpire, msg, msg2);
}
// Returns true iff the status indicates success.
bool ok() const { return (state_ == nullptr); }
@ -82,7 +85,8 @@ class LEVELDB_EXPORT Status {
kCorruption = 2,
kNotSupported = 3,
kInvalidArgument = 4,
kIOError = 5
kIOError = 5,
kExpire = 6
};
Code code() const {

+ 57
- 10
test/db_test2.cc Visa fil

@ -1,14 +1,21 @@
#include "leveldb/db.h"
#include "leveldb/filter_policy.h"
#include "leveldb/env.h"
#include "leveldb/db.h"
using namespace leveldb;
constexpr int value_size = 2048;
constexpr int data_size = 128 << 20;
#include <iostream>
#include <cstdlib>
#include <ctime>
using namespace leveldb;
constexpr int value_size = 2048;
constexpr int data_size = 256 << 20;
// 3. 数据管理(Manifest/创建/恢复数据库)
Status OpenDB(std::string dbName, DB **db) {
@ -40,9 +47,12 @@ void InsertData(DB *db, uint64_t ttl/* second */) {
for (int i = 0; i < key_num; i++) {
int key_ = rand() % key_num+1;
key_ = 1;
std::string key = std::to_string(key_);
std::string value(value_size, 'a');
db->Put(writeOptions, key, value, ttl);
std::cout << "time to alive" << ttl << std::endl;
break;
}
}
// 2. 数据访问(如何读数据)
@ -54,9 +64,11 @@ void GetData(DB *db, int size = (1 << 30)) {
srand(static_cast<unsigned int>(time(0)));
for (int i = 0; i < 100; i++) {
int key_ = rand() % key_num+1;
std::string key = std::to_string(key_);
std::string value;
db->Get(readOptions, key, &value);
//break;
}
// 范围查询
@ -70,19 +82,54 @@ void GetData(DB *db, int size = (1 << 30)) {
int main() {
// DB *db;
// if(OpenDB("testdb", &db).ok()) {
// uint64_t ttl = 20;
//
// InsertData(db, ttl);
// delete db;
// }
//
// if(OpenDB("testdb", &db).ok()) {
// GetData(db);
// delete db;
// }
//
DB *db;
if(OpenDB("testdb", &db).ok()) {
uint64_t ttl = 20;
if(OpenDB("testdb", &db).ok() == false) {
std::cerr << "open db failed" << std::endl;
abort();
}
InsertData(db, ttl);
delete db;
uint64_t ttl = 200;
InsertData(db, ttl);
ReadOptions readOptions;
Status status;
int key_num = data_size / value_size;
srand(static_cast<unsigned int>(time(0)));
for (int i = 0; i < 100; i++) {
int key_ = rand() % key_num+1;
key_ = 1;
std::string key = std::to_string(key_);
std::string value;
status = db->Get(readOptions, key, &value);
assert(status.ok());
break;
}
if(OpenDB("testdb", &db).ok()) {
GetData(db);
delete db;
Env::Default()->SleepForMicroseconds(ttl * 10000);
for (int i = 0; i < 100; i++) {
int key_ = rand() % key_num+1;
key_ = 1;
std::string key = std::to_string(key_);
std::string value;
status = db->Get(readOptions, key, &value);
assert(status.ok() != true);
break;
}
return 0;
}

+ 93
- 0
test/simple_test.cc Visa fil

@ -0,0 +1,93 @@
#include "leveldb/db.h"
#include "leveldb/filter_policy.h"
#include "leveldb/env.h"
#include "leveldb/db.h"
using namespace leveldb;
constexpr int value_size = 2048;
constexpr int data_size = 128 << 20;
#include <iostream>
#include <cstdlib>
#include <ctime>
using namespace leveldb;
// 3. 数据管理(Manifest/创建/恢复数据库)
Status OpenDB(std::string dbName, DB **db) {
Options options;
options.create_if_missing = true;
options.filter_policy = NewBloomFilterPolicy(10);
return DB::Open(options, dbName, db);
}
// 1. 存储(数据结构与写入)
// 4. 数据合并(Compaction)
//void InsertData(DB *db) {
// WriteOptions writeOptions;
// int key_num = data_size / value_size;
// srand(static_cast<unsigned int>(time(0)));
//
// for (int i = 0; i < key_num; i++) {
// int key_ = rand() % key_num+1;
// std::string key = std::to_string(key_);
// std::string value(value_size, 'a');
// db->Put(writeOptions, key, value);
// }
//}
void InsertData(DB *db, uint64_t ttl/* second */) {
WriteOptions writeOptions;
int key_num = data_size / value_size;
srand(static_cast<unsigned int>(time(0)));
for (int i = 0; i < key_num; i++) {
int key_ = rand() % key_num+1;
key_ = 1;
std::string key = std::to_string(key_);
std::string value(value_size, 'a');
db->Put(writeOptions, key, value, ttl);
std::cout << "time to alive" << ttl << std::endl;
break;
}
}
int main() {
DB *db;
if(OpenDB("testdb", &db).ok() == false) {
std::cerr << "open db failed" << std::endl;
abort();
}
uint64_t ttl = 200;
// InsertData(db, ttl);
WriteOptions writeOptions;
int key_ = 1;
std::string key = std::to_string(key_);
std::string value(1, 'a');
db->Put(writeOptions, key, value, ttl);
std::cout << "time to alive" << ttl << std::endl;
ReadOptions readOptions;
std::string value_read;
Status status = db->Get(readOptions, key, &value);
//std::cout<<"start sleep for " <<
Env::Default()->SleepForMicroseconds(ttl * 100000);
key_ = 1;
key = std::to_string(key_);
std::string value_read_second;
status = db->Get(readOptions, key, &value_read_second);
assert(status.ok() != true);
return 0;
}

Laddar…
Avbryt
Spara