Bladeren bron

暂时完成内存部分的write内容

lzj_version
林子骥 1 maand geleden
bovenliggende
commit
fe61ee09a9
11 gewijzigde bestanden met toevoegingen van 179 en 6 verwijderingen
  1. +1
    -0
      CMakeLists.txt
  2. +51
    -0
      db/db_impl.cc
  3. +6
    -1
      db/db_impl.h
  4. +9
    -0
      db/dbformat.h
  5. +2
    -1
      db/memtable.cc
  6. +2
    -0
      include/leveldb/db.h
  7. +8
    -0
      include/leveldb/env.h
  8. +3
    -0
      include/leveldb/slice.h
  9. +18
    -4
      test/db_test2.cc
  10. +79
    -0
      test/ttl_test.cc
  11. +0
    -0
      util/ttl/tt_impl.cc

+ 1
- 0
CMakeLists.txt Bestand weergeven

@ -521,6 +521,7 @@ endif(LEVELDB_INSTALL)
add_executable(db_test2
"${PROJECT_SOURCE_DIR}/test/db_test2.cc"
util/ttl/tt_impl.cc
)
target_link_libraries(db_test2 PRIVATE leveldb)

+ 51
- 0
db/db_impl.cc Bestand weergeven

@ -48,6 +48,7 @@ struct DBImpl::Writer {
WriteBatch* batch;
bool sync;
bool done;
port::CondVar cv;
};
@ -1197,6 +1198,10 @@ void DBImpl::ReleaseSnapshot(const Snapshot* snapshot) {
Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) {
return DB::Put(o, key, val);
}
Status DBImpl::Put(const WriteOptions& options, const Slice& key,
const Slice& value, uint64_t ttl) {
return DB::Put(options, key, value,ttl);
}
Status DBImpl::Delete(const WriteOptions& options, const Slice& key) {
return DB::Delete(options, key);
@ -1483,6 +1488,52 @@ void DBImpl::GetApproximateSizes(const Range* range, int n, uint64_t* sizes) {
v->Unref();
}
/**
*
* @param val
* @param val_with_ts val后面连接上预计超时的timestamp
* @param ttl
*/
void DBImpl::AppendTS(const Slice& val, std::string* val_with_ts,uint64_t ttl) {
val_with_ts->reserve(kTSLength + val.size());
char ts_string[kTSLength];
uint64_t st = env_->GetCurrentTime() + ttl;
*val_with_ts = val.ToString(); // 原始 value
val_with_ts->append(reinterpret_cast<const char*>(&st), sizeof(st));
}
/**
*
* @param options
* @param key
* @param value
* @param ttl
* @return
*/
Status DB::Put(const WriteOptions& options, const Slice& key,
const Slice& value, uint64_t ttl) {
// 将 value 和 expiration_time 合并到一起,形成带 TTL 的 value
std::string val_with_ts;
val_with_ts.reserve(value.size() + sizeof(uint64_t));
uint64_t expiration_time = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch())
.count(); + ttl;
// 追加原始 value 到 val_with_ts
val_with_ts.append(value.data(), value.size());
// 将 expiration_time 追加到 val_with_ts
val_with_ts.append(reinterpret_cast<const char*>(&expiration_time), sizeof(expiration_time));
WriteBatch batch;
batch.Put(key, Slice(val_with_ts));
return Write(options, &batch);
}
// Default implementations of convenience methods that subclasses of DB
// can call if they wish
Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {

+ 6
- 1
db/db_impl.h Bestand weergeven

@ -38,6 +38,9 @@ class DBImpl : public DB {
// Implementations of the DB interface
Status Put(const WriteOptions&, const Slice& key,
const Slice& value) override;
Status Put(const WriteOptions& options, const Slice& key, const Slice& value,
uint64_t ttl) override;
Status Delete(const WriteOptions&, const Slice& key) override;
Status Write(const WriteOptions& options, WriteBatch* updates) override;
Status Get(const ReadOptions& options, const Slice& key,
@ -48,7 +51,7 @@ class DBImpl : public DB {
bool GetProperty(const Slice& property, std::string* value) override;
void GetApproximateSizes(const Range* range, int n, uint64_t* sizes) override;
void CompactRange(const Slice* begin, const Slice* end) override;
void AppendTS(const Slice& val, std::string* val_with_ts,uint64_t ttl);
// Extra methods (for testing) that are not in the public DB interface
// Compact any files in the named level that overlap [*begin,*end]
@ -70,6 +73,8 @@ class DBImpl : public DB {
// Samples are taken approximately once every config::kReadBytesPeriod
// bytes.
void RecordReadSample(Slice key);
// Status Write(const WriteOptions& options, WriteBatch* updates,
// uint64_t ttl) override;
private:
friend class DB;

+ 9
- 0
db/dbformat.h Bestand weergeven

@ -66,6 +66,9 @@ typedef uint64_t SequenceNumber;
// can be packed together into 64-bits.
static const SequenceNumber kMaxSequenceNumber = ((0x1ull << 56) - 1);
static const uint32_t kTSLength = sizeof(uint64_t ); // size of timestamp
struct ParsedInternalKey {
Slice user_key;
SequenceNumber sequence;
@ -85,6 +88,8 @@ inline size_t InternalKeyEncodingLength(const ParsedInternalKey& key) {
// Append the serialization of "key" to *result.
void AppendInternalKey(std::string* result, const ParsedInternalKey& key);
// Attempt to parse an internal key from "internal_key". On success,
// stores the parsed data in "*result", and returns true.
//
@ -219,6 +224,10 @@ inline LookupKey::~LookupKey() {
if (start_ != space_) delete[] start_;
}
} // namespace leveldb
#endif // STORAGE_LEVELDB_DB_DBFORMAT_H_

+ 2
- 1
db/memtable.cc Bestand weergeven

@ -81,6 +81,7 @@ void MemTable::Add(SequenceNumber s, ValueType type, const Slice& key,
// tag : uint64((sequence << 8) | type)
// value_size : varint32 of value.size()
// value bytes : char[value.size()]
// timestamp : uint64
size_t key_size = key.size();
size_t val_size = value.size();
size_t internal_key_size = key_size + 8;
@ -94,7 +95,7 @@ void MemTable::Add(SequenceNumber s, ValueType type, const Slice& key,
EncodeFixed64(p, (s << 8) | type);
p += 8;
p = EncodeVarint32(p, val_size);
std::memcpy(p, value.data(), val_size);
std::memcpy(p, value.data(), val_size);//value包含timestamp
assert(p + val_size == buf + encoded_len);
table_.Insert(buf);
}

+ 2
- 0
include/leveldb/db.h Bestand weergeven

@ -150,6 +150,8 @@ class LEVELDB_EXPORT DB {
// key设置ttl
virtual Status Put(const WriteOptions& options, const Slice& key,
const Slice& value, uint64_t ttl) = 0;
// virtual Status Write(const WriteOptions& options, WriteBatch* updates,uint64_t ttl) = 0;
};
// Destroy the contents of the specified database.

+ 8
- 0
include/leveldb/env.h Bestand weergeven

@ -13,6 +13,7 @@
#ifndef STORAGE_LEVELDB_INCLUDE_ENV_H_
#define STORAGE_LEVELDB_INCLUDE_ENV_H_
#include <chrono>
#include <cstdarg>
#include <cstdint>
#include <string>
@ -64,6 +65,13 @@ class LEVELDB_EXPORT Env {
// The result of Default() belongs to leveldb and must never be deleted.
static Env* Default();
uint64_t GetCurrentTime() {
return std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch())
.count(); //
}
// Create an object that sequentially reads the file with the specified name.
// On success, stores a pointer to the new file in *result and returns OK.
// On failure stores nullptr in *result and returns non-OK. If the file does

+ 3
- 0
include/leveldb/slice.h Bestand weergeven

@ -38,6 +38,7 @@ class LEVELDB_EXPORT Slice {
// Create a slice that refers to s[0,strlen(s)-1]
Slice(const char* s) : data_(s), size_(strlen(s)) {}
// Intentionally copyable.
Slice(const Slice&) = default;
Slice& operator=(const Slice&) = default;
@ -48,6 +49,7 @@ class LEVELDB_EXPORT Slice {
// Return the length (in bytes) of the referenced data
size_t size() const { return size_; }
// Return true iff the length of the referenced data is zero
bool empty() const { return size_ == 0; }
@ -74,6 +76,7 @@ class LEVELDB_EXPORT Slice {
size_ -= n;
}
// Return a string that contains the copy of the referenced data.
std::string ToString() const { return std::string(data_, size_); }

+ 18
- 4
test/db_test2.cc Bestand weergeven

@ -20,7 +20,20 @@ Status OpenDB(std::string dbName, DB **db) {
// 1. 存储(数据结构与写入)
// 4. 数据合并(Compaction)
void InsertData(DB *db) {
//void InsertData(DB *db) {
// WriteOptions writeOptions;
// int key_num = data_size / value_size;
// srand(static_cast<unsigned int>(time(0)));
//
// for (int i = 0; i < key_num; i++) {
// int key_ = rand() % key_num+1;
// std::string key = std::to_string(key_);
// std::string value(value_size, 'a');
// db->Put(writeOptions, key, value);
// }
//}
void InsertData(DB *db, uint64_t ttl/* second */) {
WriteOptions writeOptions;
int key_num = data_size / value_size;
srand(static_cast<unsigned int>(time(0)));
@ -29,10 +42,9 @@ void InsertData(DB *db) {
int key_ = rand() % key_num+1;
std::string key = std::to_string(key_);
std::string value(value_size, 'a');
db->Put(writeOptions, key, value);
db->Put(writeOptions, key, value, ttl);
}
}
// 2. 数据访问(如何读数据)
void GetData(DB *db, int size = (1 << 30)) {
ReadOptions readOptions;
@ -60,7 +72,9 @@ int main() {
DB *db;
if(OpenDB("testdb", &db).ok()) {
InsertData(db);
uint64_t ttl = 20;
InsertData(db, ttl);
delete db;
}

+ 79
- 0
test/ttl_test.cc Bestand weergeven

@ -14,6 +14,84 @@ constexpr int data_size = 128 << 20;
Status OpenDB(std::string dbName, DB **db) {
Options options;
options.create_if_missing = true;
#include "gtest/gtest.h"
#include "leveldb/env.h"
#include "leveldb/db.h"
using namespace leveldb;
constexpr int value_size = 2048;
constexpr int data_size = 128 << 20;
Status OpenDB(std::string dbName, DB **db) {
Options options;
options.create_if_missing = true;
return DB::Open(options, dbName, db);
}
void InsertData(DB *db, uint64_t ttl/* second */) {
WriteOptions writeOptions;
int key_num = data_size / value_size;
srand(static_cast<unsigned int>(time(0)));
for (int i = 0; i < key_num; i++) {
int key_ = rand() % key_num+1;
std::string key = std::to_string(key_);
std::string value(value_size, 'a');
db->Put(writeOptions, key, value, ttl);
}
}
void GetData(DB *db, int size = (1 << 30)){
ReadOptions readOptions;
int key_num = data_size / value_size;
// 点查
srand(static_cast<unsigned int>(time(0)));
for (int i = 0; i < 100; i++) {
int key_ = rand() % key_num+1;
std::string key = std::to_string(key_);
std::string value;
db->Get(readOptions, key, &value);
}
}
TEST(TestTTL, ReadTTL) {
DB *db;
if(OpenDB("testdb", &db).ok() == false) {
std::cerr << "open db failed" << std::endl;
abort();
}
uint64_t ttl = 20;
InsertData(db, ttl);
ReadOptions readOptions;
Status status;
int key_num = data_size / value_size;
srand(static_cast<unsigned int>(time(0)));
for (int i = 0; i < 100; i++) {
int key_ = rand() % key_num+1;
std::string key = std::to_string(key_);
std::string value;
status = db->Get(readOptions, key, &value);
ASSERT_TRUE(status.ok());
}
Env::Default()->SleepForMicroseconds(ttl * 1000000);
for (int i = 0; i < 100; i++) {
int key_ = rand() % key_num+1;
std::string key = std::to_string(key_);
std::string value;
status = db->Get(readOptions, key, &value);
ASSERT_FALSE(status.ok());
}
}
return DB::Open(options, dbName, db);
}
@ -109,6 +187,7 @@ TEST(TestTTL, CompactionTTL) {
int main(int argc, char** argv) {
// All tests currently run with the same read-only file limits.
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

+ 0
- 0
util/ttl/tt_impl.cc Bestand weergeven


Laden…
Annuleren
Opslaan