小组成员:陈予曈,朱陈媛
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

324 lines
10 KiB

  1. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style license that can be
  3. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  4. //
  5. // Decodes the blocks generated by block_builder.cc.
  6. #include "table/block.h"
  7. #include <algorithm>
  8. #include <cstdint>
  9. #include <vector>
  10. #include "leveldb/comparator.h"
  11. #include "table/format.h"
  12. #include "util/coding.h"
  13. #include "util/logging.h"
  14. // 添加所需头文件-陈予曈
  15. #include <iostream>
  16. #include <sstream>
  17. #include <iomanip>
  18. #include <ctime>
  19. #include <chrono>
  20. namespace leveldb {
  21. inline uint32_t Block::NumRestarts() const {
  22. assert(size_ >= sizeof(uint32_t));
  23. return DecodeFixed32(data_ + size_ - sizeof(uint32_t));
  24. }
  25. Block::Block(const BlockContents& contents)
  26. : data_(contents.data.data()),
  27. size_(contents.data.size()),
  28. owned_(contents.heap_allocated) {
  29. if (size_ < sizeof(uint32_t)) {
  30. size_ = 0; // Error marker
  31. } else {
  32. size_t max_restarts_allowed = (size_ - sizeof(uint32_t)) / sizeof(uint32_t);
  33. if (NumRestarts() > max_restarts_allowed) {
  34. // The size is too small for NumRestarts()
  35. size_ = 0;
  36. } else {
  37. restart_offset_ = size_ - (1 + NumRestarts()) * sizeof(uint32_t);
  38. }
  39. }
  40. }
  41. Block::~Block() {
  42. if (owned_) {
  43. delete[] data_;
  44. }
  45. }
  46. // Helper routine: decode the next block entry starting at "p",
  47. // storing the number of shared key bytes, non_shared key bytes,
  48. // and the length of the value in "*shared", "*non_shared", and
  49. // "*value_length", respectively. Will not dereference past "limit".
  50. //
  51. // If any errors are detected, returns nullptr. Otherwise, returns a
  52. // pointer to the key delta (just past the three decoded values).
  53. static inline const char* DecodeEntry(const char* p, const char* limit,
  54. uint32_t* shared, uint32_t* non_shared,
  55. uint32_t* value_length) {
  56. if (limit - p < 3) return nullptr;
  57. *shared = reinterpret_cast<const uint8_t*>(p)[0];
  58. *non_shared = reinterpret_cast<const uint8_t*>(p)[1];
  59. *value_length = reinterpret_cast<const uint8_t*>(p)[2];
  60. if ((*shared | *non_shared | *value_length) < 128) {
  61. // Fast path: all three values are encoded in one byte each
  62. p += 3;
  63. } else {
  64. if ((p = GetVarint32Ptr(p, limit, shared)) == nullptr) return nullptr;
  65. if ((p = GetVarint32Ptr(p, limit, non_shared)) == nullptr) return nullptr;
  66. if ((p = GetVarint32Ptr(p, limit, value_length)) == nullptr) return nullptr;
  67. }
  68. if (static_cast<uint32_t>(limit - p) < (*non_shared + *value_length)) {
  69. return nullptr;
  70. }
  71. return p;
  72. }
  73. class Block::Iter : public Iterator {
  74. private:
  75. const Comparator* const comparator_;
  76. const char* const data_; // underlying block contents
  77. uint32_t const restarts_; // Offset of restart array (list of fixed32)
  78. uint32_t const num_restarts_; // Number of uint32_t entries in restart array
  79. // current_ is offset in data_ of current entry. >= restarts_ if !Valid
  80. uint32_t current_;
  81. uint32_t restart_index_; // Index of restart block in which current_ falls
  82. std::string key_;
  83. Slice value_;
  84. Status status_;
  85. inline int Compare(const Slice& a, const Slice& b) const {
  86. return comparator_->Compare(a, b);
  87. }
  88. // Return the offset in data_ just past the end of the current entry.
  89. inline uint32_t NextEntryOffset() const {
  90. return (value_.data() + value_.size()) - data_;
  91. }
  92. uint32_t GetRestartPoint(uint32_t index) {
  93. assert(index < num_restarts_);
  94. return DecodeFixed32(data_ + restarts_ + index * sizeof(uint32_t));
  95. }
  96. void SeekToRestartPoint(uint32_t index) {
  97. key_.clear();
  98. restart_index_ = index;
  99. // current_ will be fixed by ParseNextKey();
  100. // ParseNextKey() starts at the end of value_, so set value_ accordingly
  101. uint32_t offset = GetRestartPoint(index);
  102. value_ = Slice(data_ + offset, 0);
  103. }
  104. public:
  105. Iter(const Comparator* comparator, const char* data, uint32_t restarts,
  106. uint32_t num_restarts)
  107. : comparator_(comparator),
  108. data_(data),
  109. restarts_(restarts),
  110. num_restarts_(num_restarts),
  111. current_(restarts_),
  112. restart_index_(num_restarts_) {
  113. assert(num_restarts_ > 0);
  114. }
  115. bool Valid() const override { return current_ < restarts_; }
  116. Status status() const override { return status_; }
  117. Slice key() const override {
  118. assert(Valid());
  119. return key_;
  120. }
  121. Slice value() const override {
  122. assert(Valid());
  123. return value_;
  124. }
  125. void Next() override {
  126. assert(Valid());
  127. ParseNextKey();
  128. }
  129. void Prev() override {
  130. assert(Valid());
  131. // Scan backwards to a restart point before current_
  132. const uint32_t original = current_;
  133. while (GetRestartPoint(restart_index_) >= original) {
  134. if (restart_index_ == 0) {
  135. // No more entries
  136. current_ = restarts_;
  137. restart_index_ = num_restarts_;
  138. return;
  139. }
  140. restart_index_--;
  141. }
  142. SeekToRestartPoint(restart_index_);
  143. do {
  144. // Loop until end of current entry hits the start of original entry
  145. } while (ParseNextKey() && NextEntryOffset() < original);
  146. }
  147. // 修改sstable读取逻辑-陈予曈
  148. void Seek(const Slice& target) override {
  149. // Binary search in restart array to find the last restart point
  150. // with a key < target
  151. uint32_t left = 0;
  152. uint32_t right = num_restarts_ - 1;
  153. int current_key_compare = 0;
  154. if (Valid()) {
  155. // If we're already scanning, use the current position as a starting
  156. // point. This is beneficial if the key we're seeking to is ahead of the
  157. // current position.
  158. current_key_compare = Compare(key_, target);
  159. if (current_key_compare < 0) {
  160. // key_ is smaller than target
  161. left = restart_index_;
  162. } else if (current_key_compare > 0) {
  163. right = restart_index_;
  164. } else {
  165. // We're seeking to the key we're already at.
  166. return;
  167. }
  168. }
  169. while (left < right) {
  170. uint32_t mid = (left + right + 1) / 2;
  171. uint32_t region_offset = GetRestartPoint(mid);
  172. uint32_t shared, non_shared, value_length;
  173. const char* key_ptr =
  174. DecodeEntry(data_ + region_offset, data_ + restarts_, &shared,
  175. &non_shared, &value_length);
  176. if (key_ptr == nullptr || (shared != 0)) {
  177. CorruptionError();
  178. return;
  179. }
  180. Slice mid_key(key_ptr, non_shared);
  181. if (Compare(mid_key, target) < 0) {
  182. // Key at "mid" is smaller than "target". Therefore all
  183. // blocks before "mid" are uninteresting.
  184. left = mid;
  185. } else {
  186. // Key at "mid" is >= "target". Therefore all blocks at or
  187. // after "mid" are uninteresting.
  188. right = mid - 1;
  189. }
  190. }
  191. // We might be able to use our current position within the restart block.
  192. // This is true if we determined the key we desire is in the current block
  193. // and is after than the current key.
  194. assert(current_key_compare == 0 || Valid());
  195. bool skip_seek = left == restart_index_ && current_key_compare < 0;
  196. if (!skip_seek) {
  197. SeekToRestartPoint(left);
  198. }
  199. // Linear search (within restart block) for first key >= target
  200. while (true) {
  201. if (!ParseNextKey()) {
  202. return;
  203. }
  204. if (Compare(key_, target) >= 0) {
  205. // 重新解析record-陈予曈
  206. std::string value_with_ttl(value_.data(), value_.size());
  207. if (value_with_ttl.size() >= 19) {
  208. std::string expiration_time_str = value_with_ttl.substr(value_with_ttl.size() - 19); // 提取过期时间戳
  209. std::tm tm = {};
  210. char* res = strptime(expiration_time_str.c_str(), "%Y-%m-%d %H:%M:%S", &tm);
  211. if (res == nullptr) { // 解析时间戳失败
  212. value_ = Slice(value_.data(), value_.size()-19);
  213. } else {
  214. std::time_t expiration_time = std::mktime(&tm);
  215. std::time_t current_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
  216. if (expiration_time <= current_time) { // 数据过期
  217. //std::cerr << "notfound_sst" << std::endl;
  218. status_ = Status::NotFound(Slice());
  219. value_ = Slice(value_.data(), 0);
  220. }
  221. else //数据未过期
  222. {
  223. value_ = Slice(value_.data(), value_.size()-19);
  224. }
  225. }
  226. } else { // 时间戳信息不存在
  227. value_ = Slice(value_.data(), value_.size());
  228. }
  229. return;
  230. }
  231. }
  232. }
  233. void SeekToFirst() override {
  234. SeekToRestartPoint(0);
  235. ParseNextKey();
  236. }
  237. void SeekToLast() override {
  238. SeekToRestartPoint(num_restarts_ - 1);
  239. while (ParseNextKey() && NextEntryOffset() < restarts_) {
  240. // Keep skipping
  241. }
  242. }
  243. private:
  244. void CorruptionError() {
  245. current_ = restarts_;
  246. restart_index_ = num_restarts_;
  247. status_ = Status::Corruption("bad entry in block");
  248. key_.clear();
  249. value_.clear();
  250. }
  251. bool ParseNextKey() {
  252. current_ = NextEntryOffset();
  253. const char* p = data_ + current_;
  254. const char* limit = data_ + restarts_; // Restarts come right after data
  255. if (p >= limit) {
  256. // No more entries to return. Mark as invalid.
  257. current_ = restarts_;
  258. restart_index_ = num_restarts_;
  259. return false;
  260. }
  261. // Decode next entry
  262. uint32_t shared, non_shared, value_length;
  263. p = DecodeEntry(p, limit, &shared, &non_shared, &value_length);
  264. if (p == nullptr || key_.size() < shared) {
  265. CorruptionError();
  266. return false;
  267. } else {
  268. key_.resize(shared);
  269. key_.append(p, non_shared);
  270. value_ = Slice(p + non_shared, value_length);
  271. while (restart_index_ + 1 < num_restarts_ &&
  272. GetRestartPoint(restart_index_ + 1) < current_) {
  273. ++restart_index_;
  274. }
  275. return true;
  276. }
  277. }
  278. };
  279. Iterator* Block::NewIterator(const Comparator* comparator) {
  280. if (size_ < sizeof(uint32_t)) {
  281. return NewErrorIterator(Status::Corruption("bad block contents"));
  282. }
  283. const uint32_t num_restarts = NumRestarts();
  284. if (num_restarts == 0) {
  285. return NewEmptyIterator();
  286. } else {
  287. return new Iter(comparator, data_, restart_offset_, num_restarts);
  288. }
  289. }
  290. } // namespace leveldb