// Copyright (c) 2011 The LevelDB Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. // // WriteBatch::rep_ := // sequence: fixed64 // count: fixed32 // data: record[count] // record := // kTypeValue varstring varstring | // kTypeDeletion varstring // varstring := // len: varint32 // data: uint8[len] #include "leveldb/write_batch.h" #include "db/dbformat.h" #include "db/memtable.h" #include "db/write_batch_internal.h" #include "leveldb/db.h" #include "util/coding.h" // 添加所需头文件-橙 #include #include #include #include namespace leveldb { // WriteBatch header has an 8-byte sequence number followed by a 4-byte count. static const size_t kHeader = 12; WriteBatch::WriteBatch() { Clear(); } WriteBatch::~WriteBatch() = default; WriteBatch::Handler::~Handler() = default; void WriteBatch::Clear() { rep_.clear(); rep_.resize(kHeader); } size_t WriteBatch::ApproximateSize() const { return rep_.size(); } Status WriteBatch::Iterate(Handler* handler) const { Slice input(rep_); if (input.size() < kHeader) { return Status::Corruption("malformed WriteBatch (too small)"); } input.remove_prefix(kHeader); Slice key, value; int found = 0; while (!input.empty()) { found++; char tag = input[0]; input.remove_prefix(1); switch (tag) { case kTypeValue: if (GetLengthPrefixedSlice(&input, &key) && GetLengthPrefixedSlice(&input, &value)) { handler->Put(key, value); } else { return Status::Corruption("bad WriteBatch Put"); } break; case kTypeDeletion: if (GetLengthPrefixedSlice(&input, &key)) { handler->Delete(key); } else { return Status::Corruption("bad WriteBatch Delete"); } break; default: return Status::Corruption("unknown WriteBatch tag"); } } if (found != WriteBatchInternal::Count(this)) { return Status::Corruption("WriteBatch has wrong count"); } else { return Status::OK(); } } int WriteBatchInternal::Count(const WriteBatch* b) { return DecodeFixed32(b->rep_.data() + 8); } void WriteBatchInternal::SetCount(WriteBatch* b, int n) { EncodeFixed32(&b->rep_[8], n); } SequenceNumber WriteBatchInternal::Sequence(const WriteBatch* b) { return SequenceNumber(DecodeFixed64(b->rep_.data())); } void WriteBatchInternal::SetSequence(WriteBatch* b, SequenceNumber seq) { EncodeFixed64(&b->rep_[0], seq); } void WriteBatch::Put(const Slice& key, const Slice& value) { WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1); rep_.push_back(static_cast(kTypeValue)); PutLengthPrefixedSlice(&rep_, key); PutLengthPrefixedSlice(&rep_, value); } // 添加ttl,新的put方法-橙 void WriteBatch::Put(const Slice& key, const Slice& value, uint64_t ttl) { WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1); rep_.push_back(static_cast(kTypeValue)); PutLengthPrefixedSlice(&rep_, key); // PutLengthPrefixedSlice(&rep_, value); // 获取当前时间,加上 TTL 得到过期时间戳 auto expiration_time = std::chrono::system_clock::now()+ std::chrono::seconds(ttl); // 将过期时间戳转换为字符串 std::time_t expiration_time_t = std::chrono::system_clock::to_time_t(expiration_time); std::stringstream ss; ss << std::put_time(std::localtime(&expiration_time_t), "%Y-%m-%d %H:%M:%S"); std::string expiration_time_str = ss.str(); // 将过期时间戳添加到值中 std::string value_with_ttl(value.data(), value.size()); value_with_ttl.append(expiration_time_str); // 拼接 PutLengthPrefixedSlice(&rep_, Slice(value_with_ttl.data(), value_with_ttl.size())); } void WriteBatch::Delete(const Slice& key) { WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1); rep_.push_back(static_cast(kTypeDeletion)); PutLengthPrefixedSlice(&rep_, key); } void WriteBatch::Append(const WriteBatch& source) { WriteBatchInternal::Append(this, &source); } namespace { class MemTableInserter : public WriteBatch::Handler { public: SequenceNumber sequence_; MemTable* mem_; void Put(const Slice& key, const Slice& value) override { mem_->Add(sequence_, kTypeValue, key, value); sequence_++; } void Delete(const Slice& key) override { mem_->Add(sequence_, kTypeDeletion, key, Slice()); sequence_++; } }; } // namespace Status WriteBatchInternal::InsertInto(const WriteBatch* b, MemTable* memtable) { MemTableInserter inserter; inserter.sequence_ = WriteBatchInternal::Sequence(b); inserter.mem_ = memtable; return b->Iterate(&inserter); } void WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) { assert(contents.size() >= kHeader); b->rep_.assign(contents.data(), contents.size()); } void WriteBatchInternal::Append(WriteBatch* dst, const WriteBatch* src) { SetCount(dst, Count(dst) + Count(src)); assert(src->rep_.size() >= kHeader); dst->rep_.append(src->rep_.data() + kHeader, src->rep_.size() - kHeader); } } // namespace leveldb