- // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
- // Use of this source code is governed by a BSD-style license that can be
- // found in the LICENSE file. See the AUTHORS file for names of contributors.
- //
- // WriteBatch::rep_ :=
- // sequence: fixed64
- // count: fixed32
- // data: record[count]
- // record :=
- // kTypeValue varstring varstring |
- // kTypeDeletion varstring
- // varstring :=
- // len: varint32
- // data: uint8[len]
-
- #include "leveldb/write_batch.h"
-
- #include "db/dbformat.h"
- #include "db/memtable.h"
- #include "db/write_batch_internal.h"
- #include "leveldb/db.h"
- #include "util/coding.h"
-
- #include <sstream> // For std::ostringstream 心
- #include <cstdint> // 引入uint64_t类型,燕
- #include "leveldb/env.h" //燕
- namespace leveldb {
-
- // WriteBatch header has an 8-byte sequence number followed by a 4-byte count.
- static const size_t kHeader = 12;
-
- WriteBatch::WriteBatch() { Clear(); }
-
- WriteBatch::~WriteBatch() = default;
-
- WriteBatch::Handler::~Handler() = default;
-
- void WriteBatch::Clear() {
- rep_.clear();
- rep_.resize(kHeader);
- }
-
- size_t WriteBatch::ApproximateSize() const { return rep_.size(); }
-
- Status WriteBatch::Iterate(Handler* handler) const {
- Slice input(rep_);
- if (input.size() < kHeader) {
- return Status::Corruption("malformed WriteBatch (too small)");
- }
-
- input.remove_prefix(kHeader);
- Slice key, value;
- int found = 0;
- while (!input.empty()) {
- found++;
- char tag = input[0];
- input.remove_prefix(1);
- switch (tag) {
- case kTypeValue:
- if (GetLengthPrefixedSlice(&input, &key) &&
- GetLengthPrefixedSlice(&input, &value)) {
- handler->Put(key, value);
- } else {
- return Status::Corruption("bad WriteBatch Put");
- }
- break;
- case kTypeDeletion:
- if (GetLengthPrefixedSlice(&input, &key)) {
- handler->Delete(key);
- } else {
- return Status::Corruption("bad WriteBatch Delete");
- }
- break;
- default:
- return Status::Corruption("unknown WriteBatch tag");
- }
- }
- if (found != WriteBatchInternal::Count(this)) {
- return Status::Corruption("WriteBatch has wrong count");
- } else {
- return Status::OK();
- }
- }
-
- int WriteBatchInternal::Count(const WriteBatch* b) {
- return DecodeFixed32(b->rep_.data() + 8);
- }
-
- void WriteBatchInternal::SetCount(WriteBatch* b, int n) {
- EncodeFixed32(&b->rep_[8], n);
- }
-
- SequenceNumber WriteBatchInternal::Sequence(const WriteBatch* b) {
- return SequenceNumber(DecodeFixed64(b->rep_.data()));
- }
-
- void WriteBatchInternal::SetSequence(WriteBatch* b, SequenceNumber seq) {
- EncodeFixed64(&b->rep_[0], seq);
- }
-
- void WriteBatch::Put(const Slice& key, const Slice& value) {
- WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
- rep_.push_back(static_cast<char>(kTypeValue));
- PutLengthPrefixedSlice(&rep_, key);
- PutLengthPrefixedSlice(&rep_, value);
- }
-
- // void WriteBatch::Put(const Slice& key, const Slice& value, std::uint64_t ttl) {
- // WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
- // rep_.push_back(static_cast<char>(kTypeValue));
- // PutLengthPrefixedSlice(&rep_, key);
-
- // // 获取当前时间并计算过期时间
- // uint64_t current_time = Env::Default()->NowMicros() / 1000000; // 当前时间(秒)
- // uint64_t expire_time = current_time + ttl; // 计算未来的过期时间
-
- // // 将过期时间编码为64位整数
- // rep_.append(reinterpret_cast<const char*>(&expire_time), sizeof(expire_time)); // 添加过期时间戳
-
- // // 添加原始值
- // PutLengthPrefixedSlice(&rep_, value);
- // }
-
- void WriteBatch::Put(const Slice& key, const Slice& value, std::uint64_t ttl) {
- WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
- rep_.push_back(static_cast<char>(kTypeValue));
- PutLengthPrefixedSlice(&rep_, key);
-
- // 获取当前时间
-
- auto now = std::chrono::system_clock::now();
-
- // 加上ttl
- auto future_time = now + std::chrono::seconds(ttl);
-
- // 转换为 time_t
- std::time_t future_time_t = std::chrono::system_clock::to_time_t(future_time);
-
- // 将 time_t 转换为 tm 结构
- std::tm* local_tm = std::localtime(&future_time_t);
-
- // 格式化为字符串
- char buffer[20]; // 格式化字符串的缓冲区
- std::strftime(buffer, sizeof(buffer), "%Y-%m-%d %H:%M:%S", local_tm);
- std::string future_time_str(buffer);
-
- // 拼接原本的值和时间字符串
- std::string combined_str = value.ToString() + future_time_str;
- PutLengthPrefixedSlice(&rep_, Slice(combined_str));
-
- } // 心
-
- void WriteBatch::Delete(const Slice& key) {
- WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
- rep_.push_back(static_cast<char>(kTypeDeletion));
- PutLengthPrefixedSlice(&rep_, key);
- }
-
- void WriteBatch::Append(const WriteBatch& source) {
- WriteBatchInternal::Append(this, &source);
- }
-
- namespace {
- class MemTableInserter : public WriteBatch::Handler {
- public:
- SequenceNumber sequence_;
- MemTable* mem_;
-
- void Put(const Slice& key, const Slice& value) override {
- mem_->Add(sequence_, kTypeValue, key, value);
- sequence_++;
- }
- void Delete(const Slice& key) override {
- mem_->Add(sequence_, kTypeDeletion, key, Slice());
- sequence_++;
- }
- };
- } // namespace
-
- Status WriteBatchInternal::InsertInto(const WriteBatch* b, MemTable* memtable) {
- MemTableInserter inserter;
- inserter.sequence_ = WriteBatchInternal::Sequence(b);
- inserter.mem_ = memtable;
- return b->Iterate(&inserter);
- }
-
- void WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) {
- assert(contents.size() >= kHeader);
- b->rep_.assign(contents.data(), contents.size());
- }
-
- void WriteBatchInternal::Append(WriteBatch* dst, const WriteBatch* src) {
- SetCount(dst, Count(dst) + Count(src));
- assert(src->rep_.size() >= kHeader);
- dst->rep_.append(src->rep_.data() + kHeader, src->rep_.size() - kHeader);
- }
-
- } // namespace leveldb
|