You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

309 lines
8.5 KiB

  1. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style license that can be
  3. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  4. #include "db/db_iter.h"
  5. #include "db/db_impl.h"
  6. #include "db/dbformat.h"
  7. #include "db/filename.h"
  8. #include "leveldb/env.h"
  9. #include "leveldb/iterator.h"
  10. #include "port/port.h"
  11. #include "util/logging.h"
  12. #include "util/mutexlock.h"
  13. #include "util/random.h"
  14. namespace leveldb {
  15. #if 0
  16. static void DumpInternalIter(Iterator* iter) {
  17. for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
  18. ParsedInternalKey k;
  19. if (!ParseInternalKey(iter->key(), &k)) {
  20. fprintf(stderr, "Corrupt '%s'\n", EscapeString(iter->key()).c_str());
  21. } else {
  22. fprintf(stderr, "@ '%s'\n", k.DebugString().c_str());
  23. }
  24. }
  25. }
  26. #endif
  27. namespace {
  28. // Memtables and sstables that make the DB representation contain
  29. // (userkey,seq,type) => uservalue entries. DBIter
  30. // combines multiple entries for the same userkey found in the DB
  31. // representation into a single entry while accounting for sequence
  32. // numbers, deletion markers, overwrites, etc.
  33. class DBIter : public Iterator {
  34. public:
  35. // Which direction is the iterator currently moving?
  36. // (1) When moving forward, the internal iterator is positioned at
  37. // the exact entry that yields this->key(), this->value()
  38. // (2) When moving backwards, the internal iterator is positioned
  39. // just before all entries whose user key == this->key().
  40. enum Direction { kForward, kReverse };
  41. DBIter(DBImpl* db, const Comparator* cmp, Iterator* iter, SequenceNumber s,
  42. uint32_t seed)
  43. : db_(db),
  44. user_comparator_(cmp),
  45. iter_(iter),
  46. sequence_(s),
  47. direction_(kForward),
  48. valid_(false),
  49. rnd_(seed),
  50. bytes_until_read_sampling_(RandomCompactionPeriod()) {}
  51. DBIter(const DBIter&) = delete;
  52. DBIter& operator=(const DBIter&) = delete;
  53. virtual ~DBIter() { delete iter_; }
  54. virtual bool Valid() const { return valid_; }
  55. virtual Slice key() const {
  56. assert(valid_);
  57. return (direction_ == kForward) ? ExtractUserKey(iter_->key()) : saved_key_;
  58. }
  59. virtual Slice value() const {
  60. assert(valid_);
  61. return (direction_ == kForward) ? iter_->value() : saved_value_;
  62. }
  63. virtual Status status() const {
  64. if (status_.ok()) {
  65. return iter_->status();
  66. } else {
  67. return status_;
  68. }
  69. }
  70. virtual void Next();
  71. virtual void Prev();
  72. virtual void Seek(const Slice& target);
  73. virtual void SeekToFirst();
  74. virtual void SeekToLast();
  75. private:
  76. void FindNextUserEntry(bool skipping, std::string* skip);
  77. void FindPrevUserEntry();
  78. bool ParseKey(ParsedInternalKey* key);
  79. inline void SaveKey(const Slice& k, std::string* dst) {
  80. dst->assign(k.data(), k.size());
  81. }
  82. inline void ClearSavedValue() {
  83. if (saved_value_.capacity() > 1048576) {
  84. std::string empty;
  85. swap(empty, saved_value_);
  86. } else {
  87. saved_value_.clear();
  88. }
  89. }
  90. // Picks the number of bytes that can be read until a compaction is scheduled.
  91. size_t RandomCompactionPeriod() {
  92. return rnd_.Uniform(2 * config::kReadBytesPeriod);
  93. }
  94. DBImpl* db_;
  95. const Comparator* const user_comparator_;
  96. Iterator* const iter_;
  97. SequenceNumber const sequence_;
  98. Status status_;
  99. std::string saved_key_; // == current key when direction_==kReverse
  100. std::string saved_value_; // == current raw value when direction_==kReverse
  101. Direction direction_;
  102. bool valid_;
  103. Random rnd_;
  104. size_t bytes_until_read_sampling_;
  105. };
  106. inline bool DBIter::ParseKey(ParsedInternalKey* ikey) {
  107. Slice k = iter_->key();
  108. size_t bytes_read = k.size() + iter_->value().size();
  109. while (bytes_until_read_sampling_ < bytes_read) {
  110. bytes_until_read_sampling_ += RandomCompactionPeriod();
  111. db_->RecordReadSample(k);
  112. }
  113. assert(bytes_until_read_sampling_ >= bytes_read);
  114. bytes_until_read_sampling_ -= bytes_read;
  115. if (!ParseInternalKey(k, ikey)) {
  116. status_ = Status::Corruption("corrupted internal key in DBIter");
  117. return false;
  118. } else {
  119. return true;
  120. }
  121. }
  122. void DBIter::Next() {
  123. assert(valid_);
  124. if (direction_ == kReverse) { // Switch directions?
  125. direction_ = kForward;
  126. // iter_ is pointing just before the entries for this->key(),
  127. // so advance into the range of entries for this->key() and then
  128. // use the normal skipping code below.
  129. if (!iter_->Valid()) {
  130. iter_->SeekToFirst();
  131. } else {
  132. iter_->Next();
  133. }
  134. if (!iter_->Valid()) {
  135. valid_ = false;
  136. saved_key_.clear();
  137. return;
  138. }
  139. // saved_key_ already contains the key to skip past.
  140. } else {
  141. // Store in saved_key_ the current key so we skip it below.
  142. SaveKey(ExtractUserKey(iter_->key()), &saved_key_);
  143. }
  144. FindNextUserEntry(true, &saved_key_);
  145. }
  146. void DBIter::FindNextUserEntry(bool skipping, std::string* skip) {
  147. // Loop until we hit an acceptable entry to yield
  148. assert(iter_->Valid());
  149. assert(direction_ == kForward);
  150. do {
  151. ParsedInternalKey ikey;
  152. if (ParseKey(&ikey) && ikey.sequence <= sequence_) {
  153. switch (ikey.type) {
  154. case kTypeDeletion:
  155. // Arrange to skip all upcoming entries for this key since
  156. // they are hidden by this deletion.
  157. SaveKey(ikey.user_key, skip);
  158. skipping = true;
  159. break;
  160. case kTypeValue:
  161. if (skipping &&
  162. user_comparator_->Compare(ikey.user_key, *skip) <= 0) {
  163. // Entry hidden
  164. } else {
  165. valid_ = true;
  166. saved_key_.clear();
  167. return;
  168. }
  169. break;
  170. }
  171. }
  172. iter_->Next();
  173. } while (iter_->Valid());
  174. saved_key_.clear();
  175. valid_ = false;
  176. }
  177. void DBIter::Prev() {
  178. assert(valid_);
  179. if (direction_ == kForward) { // Switch directions?
  180. // iter_ is pointing at the current entry. Scan backwards until
  181. // the key changes so we can use the normal reverse scanning code.
  182. assert(iter_->Valid()); // Otherwise valid_ would have been false
  183. SaveKey(ExtractUserKey(iter_->key()), &saved_key_);
  184. while (true) {
  185. iter_->Prev();
  186. if (!iter_->Valid()) {
  187. valid_ = false;
  188. saved_key_.clear();
  189. ClearSavedValue();
  190. return;
  191. }
  192. if (user_comparator_->Compare(ExtractUserKey(iter_->key()), saved_key_) <
  193. 0) {
  194. break;
  195. }
  196. }
  197. direction_ = kReverse;
  198. }
  199. FindPrevUserEntry();
  200. }
  201. void DBIter::FindPrevUserEntry() {
  202. assert(direction_ == kReverse);
  203. ValueType value_type = kTypeDeletion;
  204. if (iter_->Valid()) {
  205. do {
  206. ParsedInternalKey ikey;
  207. if (ParseKey(&ikey) && ikey.sequence <= sequence_) {
  208. if ((value_type != kTypeDeletion) &&
  209. user_comparator_->Compare(ikey.user_key, saved_key_) < 0) {
  210. // We encountered a non-deleted value in entries for previous keys,
  211. break;
  212. }
  213. value_type = ikey.type;
  214. if (value_type == kTypeDeletion) {
  215. saved_key_.clear();
  216. ClearSavedValue();
  217. } else {
  218. Slice raw_value = iter_->value();
  219. if (saved_value_.capacity() > raw_value.size() + 1048576) {
  220. std::string empty;
  221. swap(empty, saved_value_);
  222. }
  223. SaveKey(ExtractUserKey(iter_->key()), &saved_key_);
  224. saved_value_.assign(raw_value.data(), raw_value.size());
  225. }
  226. }
  227. iter_->Prev();
  228. } while (iter_->Valid());
  229. }
  230. if (value_type == kTypeDeletion) {
  231. // End
  232. valid_ = false;
  233. saved_key_.clear();
  234. ClearSavedValue();
  235. direction_ = kForward;
  236. } else {
  237. valid_ = true;
  238. }
  239. }
  240. void DBIter::Seek(const Slice& target) {
  241. direction_ = kForward;
  242. ClearSavedValue();
  243. saved_key_.clear();
  244. AppendInternalKey(&saved_key_,
  245. ParsedInternalKey(target, sequence_, kValueTypeForSeek));
  246. iter_->Seek(saved_key_);
  247. if (iter_->Valid()) {
  248. FindNextUserEntry(false, &saved_key_ /* temporary storage */);
  249. } else {
  250. valid_ = false;
  251. }
  252. }
  253. void DBIter::SeekToFirst() {
  254. direction_ = kForward;
  255. ClearSavedValue();
  256. iter_->SeekToFirst();
  257. if (iter_->Valid()) {
  258. FindNextUserEntry(false, &saved_key_ /* temporary storage */);
  259. } else {
  260. valid_ = false;
  261. }
  262. }
  263. void DBIter::SeekToLast() {
  264. direction_ = kReverse;
  265. ClearSavedValue();
  266. iter_->SeekToLast();
  267. FindPrevUserEntry();
  268. }
  269. } // anonymous namespace
  270. Iterator* NewDBIterator(DBImpl* db, const Comparator* user_key_comparator,
  271. Iterator* internal_iter, SequenceNumber sequence,
  272. uint32_t seed) {
  273. return new DBIter(db, user_key_comparator, internal_iter, sequence, seed);
  274. }
  275. } // namespace leveldb