Nie możesz wybrać więcej niż 25 tematów. Tematy muszą się zaczynać od litery lub cyfry, mogą zawierać myślniki ('-') i mogą mieć do 35 znaków.

318 wierszy
8.7 KiB

  1. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style license that can be
  3. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  4. #include "db/db_iter.h"
  5. #include "db/db_impl.h"
  6. #include "db/dbformat.h"
  7. #include "db/filename.h"
  8. #include "leveldb/env.h"
  9. #include "leveldb/iterator.h"
  10. #include "port/port.h"
  11. #include "util/logging.h"
  12. #include "util/mutexlock.h"
  13. #include "util/random.h"
  14. namespace leveldb {
  15. #if 0
  16. static void DumpInternalIter(Iterator* iter) {
  17. for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
  18. ParsedInternalKey k;
  19. if (!ParseInternalKey(iter->key(), &k)) {
  20. fprintf(stderr, "Corrupt '%s'\n", EscapeString(iter->key()).c_str());
  21. } else {
  22. fprintf(stderr, "@ '%s'\n", k.DebugString().c_str());
  23. }
  24. }
  25. }
  26. #endif
  27. namespace {
  28. // Memtables and sstables that make the DB representation contain
  29. // (userkey,seq,type) => uservalue entries. DBIter
  30. // combines multiple entries for the same userkey found in the DB
  31. // representation into a single entry while accounting for sequence
  32. // numbers, deletion markers, overwrites, etc.
  33. class DBIter : public Iterator {
  34. public:
  35. // Which direction is the iterator currently moving?
  36. // (1) When moving forward, the internal iterator is positioned at
  37. // the exact entry that yields this->key(), this->value()
  38. // (2) When moving backwards, the internal iterator is positioned
  39. // just before all entries whose user key == this->key().
  40. enum Direction { kForward, kReverse };
  41. DBIter(DBImpl* db, const Comparator* cmp, Iterator* iter, SequenceNumber s,
  42. uint32_t seed)
  43. : db_(db),
  44. user_comparator_(cmp),
  45. iter_(iter),
  46. sequence_(s),
  47. direction_(kForward),
  48. valid_(false),
  49. rnd_(seed),
  50. bytes_until_read_sampling_(RandomCompactionPeriod()) {}
  51. DBIter(const DBIter&) = delete;
  52. DBIter& operator=(const DBIter&) = delete;
  53. virtual ~DBIter() { delete iter_; }
  54. virtual bool Valid() const { return valid_; }
  55. virtual Slice key() const {
  56. assert(valid_);
  57. return (direction_ == kForward) ? ExtractUserKey(iter_->key()) : saved_key_;
  58. }
  59. virtual Slice value() const {
  60. assert(valid_);
  61. return (direction_ == kForward) ? iter_->value() : saved_value_;
  62. }
  63. virtual Status status() const {
  64. if (status_.ok()) {
  65. return iter_->status();
  66. } else {
  67. return status_;
  68. }
  69. }
  70. virtual void Next();
  71. virtual void Prev();
  72. virtual void Seek(const Slice& target);
  73. virtual void SeekToFirst();
  74. virtual void SeekToLast();
  75. private:
  76. void FindNextUserEntry(bool skipping, std::string* skip);
  77. void FindPrevUserEntry();
  78. bool ParseKey(ParsedInternalKey* key);
  79. inline void SaveKey(const Slice& k, std::string* dst) {
  80. dst->assign(k.data(), k.size());
  81. }
  82. inline void ClearSavedValue() {
  83. if (saved_value_.capacity() > 1048576) {
  84. std::string empty;
  85. swap(empty, saved_value_);
  86. } else {
  87. saved_value_.clear();
  88. }
  89. }
  90. // Picks the number of bytes that can be read until a compaction is scheduled.
  91. size_t RandomCompactionPeriod() {
  92. return rnd_.Uniform(2 * config::kReadBytesPeriod);
  93. }
  94. DBImpl* db_;
  95. const Comparator* const user_comparator_;
  96. Iterator* const iter_;
  97. SequenceNumber const sequence_;
  98. Status status_;
  99. std::string saved_key_; // == current key when direction_==kReverse
  100. std::string saved_value_; // == current raw value when direction_==kReverse
  101. Direction direction_;
  102. bool valid_;
  103. Random rnd_;
  104. size_t bytes_until_read_sampling_;
  105. };
  106. inline bool DBIter::ParseKey(ParsedInternalKey* ikey) {
  107. Slice k = iter_->key();
  108. size_t bytes_read = k.size() + iter_->value().size();
  109. while (bytes_until_read_sampling_ < bytes_read) {
  110. bytes_until_read_sampling_ += RandomCompactionPeriod();
  111. db_->RecordReadSample(k);
  112. }
  113. assert(bytes_until_read_sampling_ >= bytes_read);
  114. bytes_until_read_sampling_ -= bytes_read;
  115. if (!ParseInternalKey(k, ikey)) {
  116. status_ = Status::Corruption("corrupted internal key in DBIter");
  117. return false;
  118. } else {
  119. return true;
  120. }
  121. }
  122. void DBIter::Next() {
  123. assert(valid_);
  124. if (direction_ == kReverse) { // Switch directions?
  125. direction_ = kForward;
  126. // iter_ is pointing just before the entries for this->key(),
  127. // so advance into the range of entries for this->key() and then
  128. // use the normal skipping code below.
  129. if (!iter_->Valid()) {
  130. iter_->SeekToFirst();
  131. } else {
  132. iter_->Next();
  133. }
  134. if (!iter_->Valid()) {
  135. valid_ = false;
  136. saved_key_.clear();
  137. return;
  138. }
  139. // saved_key_ already contains the key to skip past.
  140. } else {
  141. // Store in saved_key_ the current key so we skip it below.
  142. SaveKey(ExtractUserKey(iter_->key()), &saved_key_);
  143. // iter_ is pointing to current key. We can now safely move to the next to
  144. // avoid checking current key.
  145. iter_->Next();
  146. if (!iter_->Valid()) {
  147. valid_ = false;
  148. saved_key_.clear();
  149. return;
  150. }
  151. }
  152. FindNextUserEntry(true, &saved_key_);
  153. }
  154. void DBIter::FindNextUserEntry(bool skipping, std::string* skip) {
  155. // Loop until we hit an acceptable entry to yield
  156. assert(iter_->Valid());
  157. assert(direction_ == kForward);
  158. do {
  159. ParsedInternalKey ikey;
  160. if (ParseKey(&ikey) && ikey.sequence <= sequence_) {
  161. switch (ikey.type) {
  162. case kTypeDeletion:
  163. // Arrange to skip all upcoming entries for this key since
  164. // they are hidden by this deletion.
  165. SaveKey(ikey.user_key, skip);
  166. skipping = true;
  167. break;
  168. case kTypeValue:
  169. if (skipping &&
  170. user_comparator_->Compare(ikey.user_key, *skip) <= 0) {
  171. // Entry hidden
  172. } else {
  173. valid_ = true;
  174. saved_key_.clear();
  175. return;
  176. }
  177. break;
  178. }
  179. }
  180. iter_->Next();
  181. } while (iter_->Valid());
  182. saved_key_.clear();
  183. valid_ = false;
  184. }
  185. void DBIter::Prev() {
  186. assert(valid_);
  187. if (direction_ == kForward) { // Switch directions?
  188. // iter_ is pointing at the current entry. Scan backwards until
  189. // the key changes so we can use the normal reverse scanning code.
  190. assert(iter_->Valid()); // Otherwise valid_ would have been false
  191. SaveKey(ExtractUserKey(iter_->key()), &saved_key_);
  192. while (true) {
  193. iter_->Prev();
  194. if (!iter_->Valid()) {
  195. valid_ = false;
  196. saved_key_.clear();
  197. ClearSavedValue();
  198. return;
  199. }
  200. if (user_comparator_->Compare(ExtractUserKey(iter_->key()), saved_key_) <
  201. 0) {
  202. break;
  203. }
  204. }
  205. direction_ = kReverse;
  206. }
  207. FindPrevUserEntry();
  208. }
  209. void DBIter::FindPrevUserEntry() {
  210. assert(direction_ == kReverse);
  211. ValueType value_type = kTypeDeletion;
  212. if (iter_->Valid()) {
  213. do {
  214. ParsedInternalKey ikey;
  215. if (ParseKey(&ikey) && ikey.sequence <= sequence_) {
  216. if ((value_type != kTypeDeletion) &&
  217. user_comparator_->Compare(ikey.user_key, saved_key_) < 0) {
  218. // We encountered a non-deleted value in entries for previous keys,
  219. break;
  220. }
  221. value_type = ikey.type;
  222. if (value_type == kTypeDeletion) {
  223. saved_key_.clear();
  224. ClearSavedValue();
  225. } else {
  226. Slice raw_value = iter_->value();
  227. if (saved_value_.capacity() > raw_value.size() + 1048576) {
  228. std::string empty;
  229. swap(empty, saved_value_);
  230. }
  231. SaveKey(ExtractUserKey(iter_->key()), &saved_key_);
  232. saved_value_.assign(raw_value.data(), raw_value.size());
  233. }
  234. }
  235. iter_->Prev();
  236. } while (iter_->Valid());
  237. }
  238. if (value_type == kTypeDeletion) {
  239. // End
  240. valid_ = false;
  241. saved_key_.clear();
  242. ClearSavedValue();
  243. direction_ = kForward;
  244. } else {
  245. valid_ = true;
  246. }
  247. }
  248. void DBIter::Seek(const Slice& target) {
  249. direction_ = kForward;
  250. ClearSavedValue();
  251. saved_key_.clear();
  252. AppendInternalKey(&saved_key_,
  253. ParsedInternalKey(target, sequence_, kValueTypeForSeek));
  254. iter_->Seek(saved_key_);
  255. if (iter_->Valid()) {
  256. FindNextUserEntry(false, &saved_key_ /* temporary storage */);
  257. } else {
  258. valid_ = false;
  259. }
  260. }
  261. void DBIter::SeekToFirst() {
  262. direction_ = kForward;
  263. ClearSavedValue();
  264. iter_->SeekToFirst();
  265. if (iter_->Valid()) {
  266. FindNextUserEntry(false, &saved_key_ /* temporary storage */);
  267. } else {
  268. valid_ = false;
  269. }
  270. }
  271. void DBIter::SeekToLast() {
  272. direction_ = kReverse;
  273. ClearSavedValue();
  274. iter_->SeekToLast();
  275. FindPrevUserEntry();
  276. }
  277. } // anonymous namespace
  278. Iterator* NewDBIterator(DBImpl* db, const Comparator* user_key_comparator,
  279. Iterator* internal_iter, SequenceNumber sequence,
  280. uint32_t seed) {
  281. return new DBIter(db, user_key_comparator, internal_iter, sequence, seed);
  282. }
  283. } // namespace leveldb