10225501448 李度 10225101546 陈胤遒 10215501422 高宇菲

// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
//
// WriteBatch::rep_ :=
//    sequence: fixed64
//    count: fixed32
//    data: record[count]
// Original record :=
//    kTypeValue varstring varstring |
//    kTypeDeletion varstring
// varstring :=
//    len: varint32
//    data: uint8[len]
// Record after adding TTL :=
//    kTypeValue havettl [deadtime] varstring varstring |
//    kTypeDeletion varstring
// havettl :=
//    uint8 (1 if deadtime follows, 0 otherwise)
// deadtime :=
//    fixed64 (absolute expiry time; present only when havettl != 0)
// varstring :=
//    len: varint32
//    data: uint8[len]
#include "leveldb/write_batch.h"

#include <cassert>
#include <cstdint>
#include <ctime>

#include "db/dbformat.h"
#include "db/memtable.h"
#include "db/write_batch_internal.h"
#include "leveldb/db.h"
#include "util/coding.h"

namespace leveldb {

// WriteBatch header has an 8-byte sequence number followed by a 4-byte count.
static const size_t kHeader = 12;

WriteBatch::WriteBatch() { Clear(); }

WriteBatch::~WriteBatch() = default;

WriteBatch::Handler::~Handler() = default;

void WriteBatch::Clear() {
  rep_.clear();
  rep_.resize(kHeader);
}

size_t WriteBatch::ApproximateSize() const { return rep_.size(); }

Status WriteBatch::Iterate(Handler* handler) const {
  Slice input(rep_);
  if (input.size() < kHeader) {
    return Status::Corruption("malformed WriteBatch (too small)");
  }

  input.remove_prefix(kHeader);
  Slice key, value;
  int found = 0;
  while (!input.empty()) {
    found++;
    char tag = input[0];
    input.remove_prefix(1);
    switch (tag) {
      case kTypeValue: {
        // The havettl flag byte must be present before it is read.
        if (input.empty()) {
          return Status::Corruption("bad WriteBatch Put");
        }
        char havettl = input[0];
        input.remove_prefix(1);
        uint64_t deadTime = 0;
        if (havettl) {
          // A set flag must be followed by an 8-byte fixed64 deadline.
          if (input.size() < 8) {
            return Status::Corruption("bad WriteBatch Put");
          }
          deadTime = DecodeFixed64(input.data());
          input.remove_prefix(8);
        }
        if (GetLengthPrefixedSlice(&input, &key) &&
            GetLengthPrefixedSlice(&input, &value)) {
          handler->Put(key, value, deadTime);
        } else {
          return Status::Corruption("bad WriteBatch Put");
        }
        break;
      }
      case kTypeDeletion:
        if (GetLengthPrefixedSlice(&input, &key)) {
          handler->Delete(key);
        } else {
          return Status::Corruption("bad WriteBatch Delete");
        }
        break;
      default:
        return Status::Corruption("unknown WriteBatch tag");
    }
  }
  if (found != WriteBatchInternal::Count(this)) {
    return Status::Corruption("WriteBatch has wrong count");
  } else {
    return Status::OK();
  }
}

int WriteBatchInternal::Count(const WriteBatch* b) {
  return DecodeFixed32(b->rep_.data() + 8);
}

void WriteBatchInternal::SetCount(WriteBatch* b, int n) {
  EncodeFixed32(&b->rep_[8], n);
}

SequenceNumber WriteBatchInternal::Sequence(const WriteBatch* b) {
  return SequenceNumber(DecodeFixed64(b->rep_.data()));
}

void WriteBatchInternal::SetSequence(WriteBatch* b, SequenceNumber seq) {
  EncodeFixed64(&b->rep_[0], seq);
}

// A ttl of 0 means the entry never expires; otherwise the record stores the
// absolute deadline (current time plus ttl).
void WriteBatch::Put(const Slice& key, const Slice& value, uint64_t ttl) {
  WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
  rep_.push_back(static_cast<char>(kTypeValue));
  rep_.push_back(static_cast<char>(ttl != 0));  // havettl: 1 if a deadline follows
  if (ttl != 0) {
    time_t nowTime;
    time(&nowTime);
    assert(nowTime > 0);
    assert(ttl > 0);
    uint64_t deadTime = static_cast<uint64_t>(nowTime) + ttl;
    // std::cout<<"now and dead time : "<<nowTime<<" "<<deadTime<<std::endl;
    PutFixed64(&rep_, deadTime);
  }
  PutLengthPrefixedSlice(&rep_, key);
  PutLengthPrefixedSlice(&rep_, value);
}

void WriteBatch::Delete(const Slice& key) {
  WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
  rep_.push_back(static_cast<char>(kTypeDeletion));
  PutLengthPrefixedSlice(&rep_, key);
}

void WriteBatch::Append(const WriteBatch& source) {
  WriteBatchInternal::Append(this, &source);
}

namespace {
class MemTableInserter : public WriteBatch::Handler {
 public:
  SequenceNumber sequence_;
  MemTable* mem_;

  void Put(const Slice& key, const Slice& value, uint64_t deadTime) override {
    mem_->Add(sequence_, kTypeValue, key, value, deadTime);
    sequence_++;
  }
  void Delete(const Slice& key) override {
    mem_->Add(sequence_, kTypeDeletion, key, Slice(), 0);
    sequence_++;
  }
};
}  // namespace

Status WriteBatchInternal::InsertInto(const WriteBatch* b, MemTable* memtable) {
  MemTableInserter inserter;
  inserter.sequence_ = WriteBatchInternal::Sequence(b);
  inserter.mem_ = memtable;
  return b->Iterate(&inserter);
}

void WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) {
  assert(contents.size() >= kHeader);
  b->rep_.assign(contents.data(), contents.size());
}

void WriteBatchInternal::Append(WriteBatch* dst, const WriteBatch* src) {
  SetCount(dst, Count(dst) + Count(src));
  assert(src->rep_.size() >= kHeader);
  dst->rep_.append(src->rep_.data() + kHeader, src->rep_.size() - kHeader);
}

}  // namespace leveldb
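
The TTL record layout used by WriteBatch::Put and WriteBatch::Iterate above (tag byte, havettl byte, optional fixed64 deadline, then length-prefixed key and value) can be tried out in isolation. The following is a minimal standalone sketch, not part of the LevelDB source: the namespace ttl_sketch, the struct PutRecord, and the functions EncodePut and DecodePut are hypothetical names, and the coding helpers are simplified local reimplementations over std::string that are assumed to match LevelDB's little-endian fixed64 and varint32 encodings.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>

namespace ttl_sketch {  // hypothetical namespace, for illustration only

constexpr char kTypeValue = 0x1;  // assumed to match LevelDB's ValueType tag

// Appends a 64-bit value in little-endian byte order.
inline void PutFixed64(std::string* dst, uint64_t v) {
  for (int i = 0; i < 8; ++i) {
    dst->push_back(static_cast<char>((v >> (8 * i)) & 0xff));
  }
}

// Appends a varint32 length followed by the bytes of s.
inline void PutLengthPrefixed(std::string* dst, const std::string& s) {
  uint32_t v = static_cast<uint32_t>(s.size());
  while (v >= 0x80) {
    dst->push_back(static_cast<char>((v & 0x7f) | 0x80));
    v >>= 7;
  }
  dst->push_back(static_cast<char>(v));
  dst->append(s);
}

// Encodes one Put record; deadline == 0 means "no TTL".
inline std::string EncodePut(const std::string& key, const std::string& value,
                             uint64_t deadline) {
  std::string rec;
  rec.push_back(kTypeValue);
  rec.push_back(static_cast<char>(deadline != 0));  // havettl flag
  if (deadline != 0) PutFixed64(&rec, deadline);
  PutLengthPrefixed(&rec, key);
  PutLengthPrefixed(&rec, value);
  return rec;
}

struct PutRecord {
  std::string key;
  std::string value;
  uint64_t deadline = 0;
};

// Reads a varint32-prefixed string at *pos; returns false on truncation.
inline bool GetLengthPrefixed(const std::string& in, size_t* pos,
                              std::string* out) {
  uint32_t len = 0;
  bool done = false;
  for (int shift = 0; shift <= 28 && *pos < in.size(); shift += 7) {
    uint32_t byte = static_cast<unsigned char>(in[(*pos)++]);
    len |= (byte & 0x7f) << shift;
    if ((byte & 0x80) == 0) {
      done = true;
      break;
    }
  }
  if (!done || in.size() - *pos < len) return false;
  out->assign(in, *pos, len);
  *pos += len;
  return true;
}

// Decodes one Put record, checking every length before reading.
inline bool DecodePut(const std::string& in, PutRecord* rec) {
  if (in.size() < 2 || in[0] != kTypeValue) return false;
  size_t pos = 1;
  const bool has_ttl = in[pos++] != 0;
  rec->deadline = 0;
  if (has_ttl) {
    if (in.size() - pos < 8) return false;  // truncated deadline
    for (int i = 0; i < 8; ++i) {
      rec->deadline |=
          static_cast<uint64_t>(static_cast<unsigned char>(in[pos + i]))
          << (8 * i);
    }
    pos += 8;
  }
  return GetLengthPrefixed(in, &pos, &rec->key) &&
         GetLengthPrefixed(in, &pos, &rec->value);
}

}  // namespace ttl_sketch
```

In this sketch a record without a TTL costs one extra byte compared with the original format, and a record with a TTL costs nine. DecodePut rejects truncated input instead of reading past the end of the buffer, which is the same property the bounds checks in a decoder such as Iterate are meant to provide.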