// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
|
// Use of this source code is governed by a BSD-style license that can be
|
|
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
|
//
|
|
// WriteBatch::rep_ :=
|
|
// sequence: fixed64
|
|
// count: fixed32
|
|
// data: record[count]
|
|
// record :=
|
|
// kTypeValue varstring varstring |
|
|
// kTypeDeletion varstring
|
|
// varstring :=
|
|
// len: varint32
|
|
// data: uint8[len]
|
|
|
|
#include "leveldb/write_batch.h"
|
|
|
|
#include "db/dbformat.h"
|
|
#include "db/memtable.h"
|
|
#include "db/write_batch_internal.h"
|
|
#include "leveldb/db.h"
|
|
#include "db/db_impl.h" //朴
|
|
#include "util/coding.h"
|
|
|
|
#include <sstream> // For std::ostringstream 心
|
|
#include <cstdint>
|
|
#include <string>
|
|
|
|
namespace leveldb {
|
|
|
|
// WriteBatch header has an 8-byte sequence number followed by a 4-byte count.
|
|
static const size_t kHeader = 12;
|
|
|
|
WriteBatch::WriteBatch() { Clear(); }
|
|
|
|
WriteBatch::~WriteBatch() = default;
|
|
|
|
WriteBatch::Handler::~Handler() = default;
|
|
|
|
void WriteBatch::Clear() {
|
|
rep_.clear();
|
|
rep_.resize(kHeader);
|
|
}
|
|
|
|
size_t WriteBatch::ApproximateSize() const { return rep_.size(); }
|
|
|
|
Status WriteBatch::Iterate(Handler* handler) const {
|
|
Slice input(rep_);
|
|
if (input.size() < kHeader) {
|
|
return Status::Corruption("malformed WriteBatch (too small)");
|
|
}
|
|
|
|
input.remove_prefix(kHeader);
|
|
Slice key, value;
|
|
int found = 0;
|
|
while (!input.empty()) {
|
|
found++;
|
|
char tag = input[0];
|
|
input.remove_prefix(1);
|
|
switch (tag) {
|
|
case kTypeValue:
|
|
if (GetLengthPrefixedSlice(&input, &key) &&
|
|
GetLengthPrefixedSlice(&input, &value)) {
|
|
handler->Put(key, value);
|
|
} else {
|
|
return Status::Corruption("bad WriteBatch Put");
|
|
}
|
|
break;
|
|
case kTypeDeletion:
|
|
if (GetLengthPrefixedSlice(&input, &key)) {
|
|
handler->Delete(key);
|
|
} else {
|
|
return Status::Corruption("bad WriteBatch Delete");
|
|
}
|
|
break;
|
|
default:
|
|
return Status::Corruption("unknown WriteBatch tag");
|
|
}
|
|
}
|
|
if (found != WriteBatchInternal::Count(this)) {
|
|
return Status::Corruption("WriteBatch has wrong count");
|
|
} else {
|
|
return Status::OK();
|
|
}
|
|
}
|
|
|
|
int WriteBatchInternal::Count(const WriteBatch* b) {
|
|
return DecodeFixed32(b->rep_.data() + 8);
|
|
}
|
|
|
|
void WriteBatchInternal::SetCount(WriteBatch* b, int n) {
|
|
EncodeFixed32(&b->rep_[8], n);
|
|
}
|
|
|
|
SequenceNumber WriteBatchInternal::Sequence(const WriteBatch* b) {
|
|
return SequenceNumber(DecodeFixed64(b->rep_.data()));
|
|
}
|
|
|
|
void WriteBatchInternal::SetSequence(WriteBatch* b, SequenceNumber seq) {
|
|
EncodeFixed64(&b->rep_[0], seq);
|
|
}
|
|
|
|
void WriteBatch::Put(const Slice& key, const Slice& value) {
|
|
WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
|
|
rep_.push_back(static_cast<char>(kTypeValue));
|
|
PutLengthPrefixedSlice(&rep_, key);
|
|
PutLengthPrefixedSlice(&rep_, value);
|
|
}
|
|
|
|
// void WriteBatch::Put(const Slice& key, const Slice& value) { // 朴,kv分离,12.07
|
|
// if (DBImpl::key_value_separated_) {
|
|
// // 分离key和value的逻辑
|
|
// // 例如,你可以将key和value分别存储在不同的容器中
|
|
// // 这里需要根据你的具体需求来实现
|
|
// //...
|
|
// if (value.size() > max_value_size_) {
|
|
// // 分离key和value的逻辑
|
|
// // 将value存进新的数据结构blobfile
|
|
// //...
|
|
// // 例如,你可以使用以下代码将value写入blobfile
|
|
// std::ofstream blobfile("blobfile.dat", std::ios::binary | std::ios::app);
|
|
// blobfile.write(value.data(), value.size());
|
|
// blobfile.close();
|
|
// }
|
|
// }
|
|
// else {
|
|
// // 不分离key和value的逻辑
|
|
// WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
|
|
// rep_.push_back(static_cast<char>(kTypeValue));
|
|
// PutLengthPrefixedSlice(&rep_, key);
|
|
// PutLengthPrefixedSlice(&rep_, value);
|
|
// }
|
|
// }
|
|
|
|
|
|
void WriteBatch::Put(const Slice& key, const Slice& value, uint64_t ttl) {
|
|
WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
|
|
rep_.push_back(static_cast<char>(kTypeValue));
|
|
PutLengthPrefixedSlice(&rep_, key);
|
|
|
|
// 获取当前时间
|
|
auto now = std::chrono::system_clock::now();
|
|
|
|
// 加上ttl
|
|
auto future_time = now + std::chrono::seconds(ttl);
|
|
|
|
// 转换为 time_t
|
|
std::time_t future_time_t = std::chrono::system_clock::to_time_t(future_time);
|
|
|
|
// 将 time_t 转换为 tm 结构
|
|
std::tm* local_tm = std::localtime(&future_time_t);
|
|
|
|
// 格式化为字符串
|
|
char buffer[20]; // 格式化字符串的缓冲区
|
|
std::strftime(buffer, sizeof(buffer), "%Y-%m-%d %H:%M:%S", local_tm);
|
|
std::string future_time_str(buffer);
|
|
|
|
// 拼接原本的值和时间字符串
|
|
std::string combined_str = value.ToString() + future_time_str;
|
|
PutLengthPrefixedSlice(&rep_, Slice(combined_str));
|
|
|
|
} // 心
|
|
|
|
void WriteBatch::Delete(const Slice& key) {
|
|
WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
|
|
rep_.push_back(static_cast<char>(kTypeDeletion));
|
|
PutLengthPrefixedSlice(&rep_, key);
|
|
}
|
|
|
|
void WriteBatch::Append(const WriteBatch& source) {
|
|
WriteBatchInternal::Append(this, &source);
|
|
}
|
|
|
|
namespace {
|
|
class MemTableInserter : public WriteBatch::Handler {
|
|
public:
|
|
SequenceNumber sequence_;
|
|
MemTable* mem_;
|
|
|
|
void Put(const Slice& key, const Slice& value) override {
|
|
mem_->Add(sequence_, kTypeValue, key, value);
|
|
sequence_++;
|
|
}
|
|
void Delete(const Slice& key) override {
|
|
mem_->Add(sequence_, kTypeDeletion, key, Slice());
|
|
sequence_++;
|
|
}
|
|
};
|
|
} // namespace
|
|
|
|
Status WriteBatchInternal::InsertInto(const WriteBatch* b, MemTable* memtable) {
|
|
MemTableInserter inserter;
|
|
inserter.sequence_ = WriteBatchInternal::Sequence(b);
|
|
inserter.mem_ = memtable;
|
|
return b->Iterate(&inserter);
|
|
}
|
|
|
|
void WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) {
|
|
assert(contents.size() >= kHeader);
|
|
b->rep_.assign(contents.data(), contents.size());
|
|
}
|
|
|
|
void WriteBatchInternal::Append(WriteBatch* dst, const WriteBatch* src) {
|
|
SetCount(dst, Count(dst) + Count(src));
|
|
assert(src->rep_.size() >= kHeader);
|
|
dst->rep_.append(src->rep_.data() + kHeader, src->rep_.size() - kHeader);
|
|
}
|
|
|
|
} // namespace leveldb
|