// Copyright (c) 2011 The LevelDB Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. // // WriteBatch::rep_ := // sequence: fixed64 // count: fixed32 // data: record[count] // record := // kTypeValue varstring varstring | // kTypeDeletion varstring // varstring := // len: varint32 // data: uint8[len] #include "leveldb/write_batch.h" #include "db/dbformat.h" #include "db/memtable.h" #include "db/write_batch_internal.h" #include "leveldb/db.h" #include "db/db_impl.h" //朴 #include "util/coding.h" #include <sstream> // For std::ostringstream 心 #include <cstdint> #include <string> namespace leveldb { // WriteBatch header has an 8-byte sequence number followed by a 4-byte count. static const size_t kHeader = 12; WriteBatch::WriteBatch() { Clear(); } WriteBatch::~WriteBatch() = default; WriteBatch::Handler::~Handler() = default; void WriteBatch::Clear() { rep_.clear(); rep_.resize(kHeader); } size_t WriteBatch::ApproximateSize() const { return rep_.size(); } Status WriteBatch::Iterate(Handler* handler) const { Slice input(rep_); if (input.size() < kHeader) { return Status::Corruption("malformed WriteBatch (too small)"); } input.remove_prefix(kHeader); Slice key, value; int found = 0; while (!input.empty()) { found++; char tag = input[0]; input.remove_prefix(1); switch (tag) { case kTypeValue: if (GetLengthPrefixedSlice(&input, &key) && GetLengthPrefixedSlice(&input, &value)) { handler->Put(key, value); } else { return Status::Corruption("bad WriteBatch Put"); } break; case kTypeDeletion: if (GetLengthPrefixedSlice(&input, &key)) { handler->Delete(key); } else { return Status::Corruption("bad WriteBatch Delete"); } break; default: return Status::Corruption("unknown WriteBatch tag"); } } if (found != WriteBatchInternal::Count(this)) { return Status::Corruption("WriteBatch has wrong count"); } else { return Status::OK(); } } int WriteBatchInternal::Count(const WriteBatch* b) { return DecodeFixed32(b->rep_.data() + 8); } void WriteBatchInternal::SetCount(WriteBatch* b, int n) { EncodeFixed32(&b->rep_[8], n); } SequenceNumber WriteBatchInternal::Sequence(const WriteBatch* b) { return SequenceNumber(DecodeFixed64(b->rep_.data())); } void WriteBatchInternal::SetSequence(WriteBatch* b, SequenceNumber seq) { EncodeFixed64(&b->rep_[0], seq); } void WriteBatch::Put(const Slice& key, const Slice& value) { WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1); rep_.push_back(static_cast<char>(kTypeValue)); PutLengthPrefixedSlice(&rep_, key); PutLengthPrefixedSlice(&rep_, value); } // void WriteBatch::Put(const Slice& key, const Slice& value) { // 朴,kv分离,12.07 // if (DBImpl::key_value_separated_) { // // 分离key和value的逻辑 // // 例如,你可以将key和value分别存储在不同的容器中 // // 这里需要根据你的具体需求来实现 // //... // if (value.size() > max_value_size_) { // // 分离key和value的逻辑 // // 将value存进新的数据结构blobfile // //... // // 例如,你可以使用以下代码将value写入blobfile // std::ofstream blobfile("blobfile.dat", std::ios::binary | std::ios::app); // blobfile.write(value.data(), value.size()); // blobfile.close(); // } // } // else { // // 不分离key和value的逻辑 // WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1); // rep_.push_back(static_cast<char>(kTypeValue)); // PutLengthPrefixedSlice(&rep_, key); // PutLengthPrefixedSlice(&rep_, value); // } // } void WriteBatch::Put(const Slice& key, const Slice& value, uint64_t ttl) { WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1); rep_.push_back(static_cast<char>(kTypeValue)); PutLengthPrefixedSlice(&rep_, key); // 获取当前时间 auto now = std::chrono::system_clock::now(); // 加上ttl auto future_time = now + std::chrono::seconds(ttl); // 转换为 time_t std::time_t future_time_t = std::chrono::system_clock::to_time_t(future_time); // 将 time_t 转换为 tm 结构 std::tm* local_tm = std::localtime(&future_time_t); // 格式化为字符串 char buffer[20]; // 格式化字符串的缓冲区 std::strftime(buffer, sizeof(buffer), "%Y-%m-%d %H:%M:%S", local_tm); std::string future_time_str(buffer); // 拼接原本的值和时间字符串 std::string combined_str = value.ToString() + future_time_str; PutLengthPrefixedSlice(&rep_, Slice(combined_str)); } // 心 void WriteBatch::Delete(const Slice& key) { WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1); rep_.push_back(static_cast<char>(kTypeDeletion)); PutLengthPrefixedSlice(&rep_, key); } void WriteBatch::Append(const WriteBatch& source) { WriteBatchInternal::Append(this, &source); } namespace { class MemTableInserter : public WriteBatch::Handler { public: SequenceNumber sequence_; MemTable* mem_; void Put(const Slice& key, const Slice& value) override { mem_->Add(sequence_, kTypeValue, key, value); sequence_++; } void Delete(const Slice& key) override { mem_->Add(sequence_, kTypeDeletion, key, Slice()); sequence_++; } }; } // namespace Status WriteBatchInternal::InsertInto(const WriteBatch* b, MemTable* memtable) { MemTableInserter inserter; inserter.sequence_ = WriteBatchInternal::Sequence(b); inserter.mem_ = memtable; return b->Iterate(&inserter); } void WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) { assert(contents.size() >= kHeader); b->rep_.assign(contents.data(), contents.size()); } void WriteBatchInternal::Append(WriteBatch* dst, const WriteBatch* src) { SetCount(dst, Count(dst) + Count(src)); assert(src->rep_.size() >= kHeader); dst->rep_.append(src->rep_.data() + kHeader, src->rep_.size() - kHeader); } } // namespace leveldb