小组成员: 曹可心-10223903406 朴祉燕-10224602413
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

198 lines
5.8 KiB

4 weeks ago
4 weeks ago
4 weeks ago
4 weeks ago
  1. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style license that can be
  3. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  4. //
  5. // WriteBatch::rep_ :=
  6. // sequence: fixed64
  7. // count: fixed32
  8. // data: record[count]
  9. // record :=
  10. // kTypeValue varstring varstring |
  11. // kTypeDeletion varstring
  12. // varstring :=
  13. // len: varint32
  14. // data: uint8[len]
  #include "leveldb/write_batch.h"

  #include <chrono>
  #include <cstdint>  // std::uint64_t (added by Yan)
  #include <ctime>
  #include <limits>
  #include <sstream>  // std::ostringstream (added by Xin)
  #include <string>

  #include "db/dbformat.h"
  #include "db/memtable.h"
  #include "db/write_batch_internal.h"
  #include "leveldb/db.h"
  #include "leveldb/env.h"  // added by Yan
  #include "util/coding.h"
  24. namespace leveldb {
  // Size in bytes of the WriteBatch header: an 8-byte sequence number followed
  // by a 4-byte record count.
  static constexpr size_t kHeader = 8 /* sequence */ + 4 /* count */;
  27. WriteBatch::WriteBatch() { Clear(); }
  28. WriteBatch::~WriteBatch() = default;
  29. WriteBatch::Handler::~Handler() = default;
  30. void WriteBatch::Clear() {
  31. rep_.clear();
  32. rep_.resize(kHeader);
  33. }
  34. size_t WriteBatch::ApproximateSize() const { return rep_.size(); }
  35. Status WriteBatch::Iterate(Handler* handler) const {
  36. Slice input(rep_);
  37. if (input.size() < kHeader) {
  38. return Status::Corruption("malformed WriteBatch (too small)");
  39. }
  40. input.remove_prefix(kHeader);
  41. Slice key, value;
  42. int found = 0;
  43. while (!input.empty()) {
  44. found++;
  45. char tag = input[0];
  46. input.remove_prefix(1);
  47. switch (tag) {
  48. case kTypeValue:
  49. if (GetLengthPrefixedSlice(&input, &key) &&
  50. GetLengthPrefixedSlice(&input, &value)) {
  51. handler->Put(key, value);
  52. } else {
  53. return Status::Corruption("bad WriteBatch Put");
  54. }
  55. break;
  56. case kTypeDeletion:
  57. if (GetLengthPrefixedSlice(&input, &key)) {
  58. handler->Delete(key);
  59. } else {
  60. return Status::Corruption("bad WriteBatch Delete");
  61. }
  62. break;
  63. default:
  64. return Status::Corruption("unknown WriteBatch tag");
  65. }
  66. }
  67. if (found != WriteBatchInternal::Count(this)) {
  68. return Status::Corruption("WriteBatch has wrong count");
  69. } else {
  70. return Status::OK();
  71. }
  72. }
  73. int WriteBatchInternal::Count(const WriteBatch* b) {
  74. return DecodeFixed32(b->rep_.data() + 8);
  75. }
  76. void WriteBatchInternal::SetCount(WriteBatch* b, int n) {
  77. EncodeFixed32(&b->rep_[8], n);
  78. }
  79. SequenceNumber WriteBatchInternal::Sequence(const WriteBatch* b) {
  80. return SequenceNumber(DecodeFixed64(b->rep_.data()));
  81. }
  82. void WriteBatchInternal::SetSequence(WriteBatch* b, SequenceNumber seq) {
  83. EncodeFixed64(&b->rep_[0], seq);
  84. }
  85. void WriteBatch::Put(const Slice& key, const Slice& value) {
  86. WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
  87. rep_.push_back(static_cast<char>(kTypeValue));
  88. PutLengthPrefixedSlice(&rep_, key);
  89. PutLengthPrefixedSlice(&rep_, value);
  90. }
  // Disabled earlier draft of the TTL overload, kept for reference. It wrote the
  // expiration time as raw 64-bit bytes between the key and the value.
  // void WriteBatch::Put(const Slice& key, const Slice& value, std::uint64_t ttl) {
  //   WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
  //   rep_.push_back(static_cast<char>(kTypeValue));
  //   PutLengthPrefixedSlice(&rep_, key);
  //   // Get the current time and compute the expiration time.
  //   uint64_t current_time = Env::Default()->NowMicros() / 1000000;  // current time (seconds)
  //   uint64_t expire_time = current_time + ttl;  // expiration time in the future
  //   // Encode the expiration time as a 64-bit integer.
  //   rep_.append(reinterpret_cast<const char*>(&expire_time), sizeof(expire_time));  // append the expiration timestamp
  //   // Append the original value.
  //   PutLengthPrefixedSlice(&rep_, value);
  // }
  103. void WriteBatch::Put(const Slice& key, const Slice& value, std::uint64_t ttl) {
  104. WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
  105. rep_.push_back(static_cast<char>(kTypeValue));
  106. PutLengthPrefixedSlice(&rep_, key);
  107. // 获取当前时间
  108. auto now = std::chrono::system_clock::now();
  109. // 加上ttl
  110. auto future_time = now + std::chrono::seconds(ttl);
  111. // 转换为 time_t
  112. std::time_t future_time_t = std::chrono::system_clock::to_time_t(future_time);
  113. // 将 time_t 转换为 tm 结构
  114. std::tm* local_tm = std::localtime(&future_time_t);
  115. // 格式化为字符串
  116. char buffer[20]; // 格式化字符串的缓冲区
  117. std::strftime(buffer, sizeof(buffer), "%Y-%m-%d %H:%M:%S", local_tm);
  118. std::string future_time_str(buffer);
  119. // 拼接原本的值和时间字符串
  120. std::string combined_str = value.ToString() + future_time_str;
  121. PutLengthPrefixedSlice(&rep_, Slice(combined_str));
  122. } // 心
  123. void WriteBatch::Delete(const Slice& key) {
  124. WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
  125. rep_.push_back(static_cast<char>(kTypeDeletion));
  126. PutLengthPrefixedSlice(&rep_, key);
  127. }
  128. void WriteBatch::Append(const WriteBatch& source) {
  129. WriteBatchInternal::Append(this, &source);
  130. }
  131. namespace {
  132. class MemTableInserter : public WriteBatch::Handler {
  133. public:
  134. SequenceNumber sequence_;
  135. MemTable* mem_;
  136. void Put(const Slice& key, const Slice& value) override {
  137. mem_->Add(sequence_, kTypeValue, key, value);
  138. sequence_++;
  139. }
  140. void Delete(const Slice& key) override {
  141. mem_->Add(sequence_, kTypeDeletion, key, Slice());
  142. sequence_++;
  143. }
  144. };
  145. } // namespace
  146. Status WriteBatchInternal::InsertInto(const WriteBatch* b, MemTable* memtable) {
  147. MemTableInserter inserter;
  148. inserter.sequence_ = WriteBatchInternal::Sequence(b);
  149. inserter.mem_ = memtable;
  150. return b->Iterate(&inserter);
  151. }
  152. void WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) {
  153. assert(contents.size() >= kHeader);
  154. b->rep_.assign(contents.data(), contents.size());
  155. }
  156. void WriteBatchInternal::Append(WriteBatch* dst, const WriteBatch* src) {
  157. SetCount(dst, Count(dst) + Count(src));
  158. assert(src->rep_.size() >= kHeader);
  159. dst->rep_.append(src->rep_.data() + kHeader, src->rep_.size() - kHeader);
  160. }
  161. } // namespace leveldb