小组成员: 曹可心-10223903406 朴祉燕-10224602413
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

198 lines
5.8 KiB

// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
//
// WriteBatch::rep_ :=
// sequence: fixed64
// count: fixed32
// data: record[count]
// record :=
// kTypeValue varstring varstring |
// kTypeDeletion varstring
// varstring :=
// len: varint32
// data: uint8[len]
#include "leveldb/write_batch.h"
#include "db/dbformat.h"
#include "db/memtable.h"
#include "db/write_batch_internal.h"
#include "leveldb/db.h"
#include "util/coding.h"
#include <sstream> // For std::ostringstream 心
#include <cstdint> // 引入uint64_t类型,燕
#include "leveldb/env.h" //燕
namespace leveldb {
// WriteBatch header has an 8-byte sequence number followed by a 4-byte count.
static const size_t kHeader = 12;
WriteBatch::WriteBatch() { Clear(); }
WriteBatch::~WriteBatch() = default;
WriteBatch::Handler::~Handler() = default;
void WriteBatch::Clear() {
rep_.clear();
rep_.resize(kHeader);
}
size_t WriteBatch::ApproximateSize() const { return rep_.size(); }
Status WriteBatch::Iterate(Handler* handler) const {
Slice input(rep_);
if (input.size() < kHeader) {
return Status::Corruption("malformed WriteBatch (too small)");
}
input.remove_prefix(kHeader);
Slice key, value;
int found = 0;
while (!input.empty()) {
found++;
char tag = input[0];
input.remove_prefix(1);
switch (tag) {
case kTypeValue:
if (GetLengthPrefixedSlice(&input, &key) &&
GetLengthPrefixedSlice(&input, &value)) {
handler->Put(key, value);
} else {
return Status::Corruption("bad WriteBatch Put");
}
break;
case kTypeDeletion:
if (GetLengthPrefixedSlice(&input, &key)) {
handler->Delete(key);
} else {
return Status::Corruption("bad WriteBatch Delete");
}
break;
default:
return Status::Corruption("unknown WriteBatch tag");
}
}
if (found != WriteBatchInternal::Count(this)) {
return Status::Corruption("WriteBatch has wrong count");
} else {
return Status::OK();
}
}
int WriteBatchInternal::Count(const WriteBatch* b) {
return DecodeFixed32(b->rep_.data() + 8);
}
void WriteBatchInternal::SetCount(WriteBatch* b, int n) {
EncodeFixed32(&b->rep_[8], n);
}
SequenceNumber WriteBatchInternal::Sequence(const WriteBatch* b) {
return SequenceNumber(DecodeFixed64(b->rep_.data()));
}
void WriteBatchInternal::SetSequence(WriteBatch* b, SequenceNumber seq) {
EncodeFixed64(&b->rep_[0], seq);
}
void WriteBatch::Put(const Slice& key, const Slice& value) {
WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
rep_.push_back(static_cast<char>(kTypeValue));
PutLengthPrefixedSlice(&rep_, key);
PutLengthPrefixedSlice(&rep_, value);
}
// void WriteBatch::Put(const Slice& key, const Slice& value, std::uint64_t ttl) {
// WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
// rep_.push_back(static_cast<char>(kTypeValue));
// PutLengthPrefixedSlice(&rep_, key);
// // 获取当前时间并计算过期时间
// uint64_t current_time = Env::Default()->NowMicros() / 1000000; // 当前时间(秒)
// uint64_t expire_time = current_time + ttl; // 计算未来的过期时间
// // 将过期时间编码为64位整数
// rep_.append(reinterpret_cast<const char*>(&expire_time), sizeof(expire_time)); // 添加过期时间戳
// // 添加原始值
// PutLengthPrefixedSlice(&rep_, value);
// }
void WriteBatch::Put(const Slice& key, const Slice& value, std::uint64_t ttl) {
WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
rep_.push_back(static_cast<char>(kTypeValue));
PutLengthPrefixedSlice(&rep_, key);
// 获取当前时间
auto now = std::chrono::system_clock::now();
// 加上ttl
auto future_time = now + std::chrono::seconds(ttl);
// 转换为 time_t
std::time_t future_time_t = std::chrono::system_clock::to_time_t(future_time);
// 将 time_t 转换为 tm 结构
std::tm* local_tm = std::localtime(&future_time_t);
// 格式化为字符串
char buffer[20]; // 格式化字符串的缓冲区
std::strftime(buffer, sizeof(buffer), "%Y-%m-%d %H:%M:%S", local_tm);
std::string future_time_str(buffer);
// 拼接原本的值和时间字符串
std::string combined_str = value.ToString() + future_time_str;
PutLengthPrefixedSlice(&rep_, Slice(combined_str));
} // 心
void WriteBatch::Delete(const Slice& key) {
WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
rep_.push_back(static_cast<char>(kTypeDeletion));
PutLengthPrefixedSlice(&rep_, key);
}
void WriteBatch::Append(const WriteBatch& source) {
WriteBatchInternal::Append(this, &source);
}
namespace {
class MemTableInserter : public WriteBatch::Handler {
public:
SequenceNumber sequence_;
MemTable* mem_;
void Put(const Slice& key, const Slice& value) override {
mem_->Add(sequence_, kTypeValue, key, value);
sequence_++;
}
void Delete(const Slice& key) override {
mem_->Add(sequence_, kTypeDeletion, key, Slice());
sequence_++;
}
};
} // namespace
Status WriteBatchInternal::InsertInto(const WriteBatch* b, MemTable* memtable) {
MemTableInserter inserter;
inserter.sequence_ = WriteBatchInternal::Sequence(b);
inserter.mem_ = memtable;
return b->Iterate(&inserter);
}
void WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) {
assert(contents.size() >= kHeader);
b->rep_.assign(contents.data(), contents.size());
}
void WriteBatchInternal::Append(WriteBatch* dst, const WriteBatch* src) {
SetCount(dst, Count(dst) + Count(src));
assert(src->rep_.size() >= kHeader);
dst->rep_.append(src->rep_.data() + kHeader, src->rep_.size() - kHeader);
}
} // namespace leveldb