// Copyright (c) 2011 The LevelDB Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. // // WriteBatch::rep_ := // sequence: fixed64 // count: fixed32 // data: record[count] // record := // kTypeValue varstring varstring | // kTypeDeletion varstring // varstring := // len: varint32 // data: uint8[len] #include "leveldb/write_batch.h" #include "db/dbformat.h" #include "db/memtable.h" #include "db/write_batch_internal.h" #include "leveldb/db.h" #include "util/coding.h" #include // For std::ostringstream 心 #include // 引入uint64_t类型,燕 #include "leveldb/env.h" //燕 namespace leveldb { // WriteBatch header has an 8-byte sequence number followed by a 4-byte count. static const size_t kHeader = 12; WriteBatch::WriteBatch() { Clear(); } WriteBatch::~WriteBatch() = default; WriteBatch::Handler::~Handler() = default; void WriteBatch::Clear() { rep_.clear(); rep_.resize(kHeader); } size_t WriteBatch::ApproximateSize() const { return rep_.size(); } Status WriteBatch::Iterate(Handler* handler) const { Slice input(rep_); if (input.size() < kHeader) { return Status::Corruption("malformed WriteBatch (too small)"); } input.remove_prefix(kHeader); Slice key, value; int found = 0; while (!input.empty()) { found++; char tag = input[0]; input.remove_prefix(1); switch (tag) { case kTypeValue: if (GetLengthPrefixedSlice(&input, &key) && GetLengthPrefixedSlice(&input, &value)) { handler->Put(key, value); } else { return Status::Corruption("bad WriteBatch Put"); } break; case kTypeDeletion: if (GetLengthPrefixedSlice(&input, &key)) { handler->Delete(key); } else { return Status::Corruption("bad WriteBatch Delete"); } break; default: return Status::Corruption("unknown WriteBatch tag"); } } if (found != WriteBatchInternal::Count(this)) { return Status::Corruption("WriteBatch has wrong count"); } else { return Status::OK(); } } int WriteBatchInternal::Count(const WriteBatch* b) { return DecodeFixed32(b->rep_.data() + 8); } void WriteBatchInternal::SetCount(WriteBatch* b, int n) { EncodeFixed32(&b->rep_[8], n); } SequenceNumber WriteBatchInternal::Sequence(const WriteBatch* b) { return SequenceNumber(DecodeFixed64(b->rep_.data())); } void WriteBatchInternal::SetSequence(WriteBatch* b, SequenceNumber seq) { EncodeFixed64(&b->rep_[0], seq); } void WriteBatch::Put(const Slice& key, const Slice& value) { WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1); rep_.push_back(static_cast(kTypeValue)); PutLengthPrefixedSlice(&rep_, key); PutLengthPrefixedSlice(&rep_, value); } // void WriteBatch::Put(const Slice& key, const Slice& value, std::uint64_t ttl) { // WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1); // rep_.push_back(static_cast(kTypeValue)); // PutLengthPrefixedSlice(&rep_, key); // // 获取当前时间并计算过期时间 // uint64_t current_time = Env::Default()->NowMicros() / 1000000; // 当前时间(秒) // uint64_t expire_time = current_time + ttl; // 计算未来的过期时间 // // 将过期时间编码为64位整数 // rep_.append(reinterpret_cast(&expire_time), sizeof(expire_time)); // 添加过期时间戳 // // 添加原始值 // PutLengthPrefixedSlice(&rep_, value); // } void WriteBatch::Put(const Slice& key, const Slice& value, std::uint64_t ttl) { WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1); rep_.push_back(static_cast(kTypeValue)); PutLengthPrefixedSlice(&rep_, key); // 获取当前时间 auto now = std::chrono::system_clock::now(); // 加上ttl auto future_time = now + std::chrono::seconds(ttl); // 转换为 time_t std::time_t future_time_t = std::chrono::system_clock::to_time_t(future_time); // 将 time_t 转换为 tm 结构 std::tm* local_tm = std::localtime(&future_time_t); // 格式化为字符串 char buffer[20]; // 格式化字符串的缓冲区 std::strftime(buffer, sizeof(buffer), "%Y-%m-%d %H:%M:%S", local_tm); std::string future_time_str(buffer); // 拼接原本的值和时间字符串 std::string combined_str = value.ToString() + future_time_str; PutLengthPrefixedSlice(&rep_, Slice(combined_str)); } // 心 void WriteBatch::Delete(const Slice& key) { WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1); rep_.push_back(static_cast(kTypeDeletion)); PutLengthPrefixedSlice(&rep_, key); } void WriteBatch::Append(const WriteBatch& source) { WriteBatchInternal::Append(this, &source); } namespace { class MemTableInserter : public WriteBatch::Handler { public: SequenceNumber sequence_; MemTable* mem_; void Put(const Slice& key, const Slice& value) override { mem_->Add(sequence_, kTypeValue, key, value); sequence_++; } void Delete(const Slice& key) override { mem_->Add(sequence_, kTypeDeletion, key, Slice()); sequence_++; } }; } // namespace Status WriteBatchInternal::InsertInto(const WriteBatch* b, MemTable* memtable) { MemTableInserter inserter; inserter.sequence_ = WriteBatchInternal::Sequence(b); inserter.mem_ = memtable; return b->Iterate(&inserter); } void WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) { assert(contents.size() >= kHeader); b->rep_.assign(contents.data(), contents.size()); } void WriteBatchInternal::Append(WriteBatch* dst, const WriteBatch* src) { SetCount(dst, Count(dst) + Count(src)); assert(src->rep_.size() >= kHeader); dst->rep_.append(src->rep_.data() + kHeader, src->rep_.size() - kHeader); } } // namespace leveldb