作者: 韩晨旭 10225101440 李畅 10225102463
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

397 line
11 KiB

  1. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style license that can be
  3. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  4. #include "db/db_iter.h"
  5. #include "db/filename.h"
  6. #include "db/dbformat.h"
  7. #include "include/env.h"
  8. #include "include/iterator.h"
  9. #include "port/port.h"
  10. #include "util/logging.h"
  11. #include "util/mutexlock.h"
  12. namespace leveldb {
  13. #if 0
  14. static void DumpInternalIter(Iterator* iter) {
  15. for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
  16. ParsedInternalKey k;
  17. if (!ParseInternalKey(iter->key(), &k)) {
  18. fprintf(stderr, "Corrupt '%s'\n", EscapeString(iter->key()).c_str());
  19. } else {
  20. fprintf(stderr, "@ '%s'\n", k.DebugString().c_str());
  21. }
  22. }
  23. }
  24. #endif
  25. namespace {
  26. // Memtables and sstables that make the DB representation contain
  27. // (userkey,seq,type) => uservalue entries. DBIter
  28. // combines multiple entries for the same userkey found in the DB
  29. // representation into a single entry while accounting for sequence
  30. // numbers, deletion markers, overwrites, etc.
  31. class DBIter: public Iterator {
  32. public:
  33. // Which direction is the iterator currently moving?
  34. // (1) When moving forward, the internal iterator is positioned at
  35. // the exact entry that yields this->key(), this->value()
  36. // (2) When moving backwards, the internal iterator is positioned
  37. // just before all entries whose user key == this->key().
  38. enum Direction {
  39. kForward,
  40. kReverse
  41. };
  42. DBIter(const std::string* dbname, Env* env,
  43. const Comparator* cmp, Iterator* iter, SequenceNumber s)
  44. : dbname_(dbname),
  45. env_(env),
  46. user_comparator_(cmp),
  47. iter_(iter),
  48. sequence_(s),
  49. large_(NULL),
  50. direction_(kForward),
  51. valid_(false) {
  52. }
  53. virtual ~DBIter() {
  54. delete iter_;
  55. delete large_;
  56. }
  57. virtual bool Valid() const { return valid_; }
  58. virtual Slice key() const {
  59. assert(valid_);
  60. return (direction_ == kForward) ? ExtractUserKey(iter_->key()) : saved_key_;
  61. }
  62. virtual Slice value() const {
  63. assert(valid_);
  64. Slice raw_value = (direction_ == kForward) ? iter_->value() : saved_value_;
  65. if (large_ == NULL) {
  66. return raw_value;
  67. } else {
  68. MutexLock l(&large_->mutex);
  69. if (!large_->produced) {
  70. ReadIndirectValue(raw_value);
  71. }
  72. return large_->value;
  73. }
  74. }
  75. virtual Status status() const {
  76. if (status_.ok()) {
  77. if (large_ != NULL && !large_->status.ok()) return large_->status;
  78. return iter_->status();
  79. } else {
  80. return status_;
  81. }
  82. }
  83. virtual void Next();
  84. virtual void Prev();
  85. virtual void Seek(const Slice& target);
  86. virtual void SeekToFirst();
  87. virtual void SeekToLast();
  88. private:
  89. struct Large {
  90. port::Mutex mutex;
  91. std::string value;
  92. bool produced;
  93. Status status;
  94. };
  95. void FindNextUserEntry(bool skipping, std::string* skip);
  96. void FindPrevUserEntry();
  97. bool ParseKey(ParsedInternalKey* key);
  98. void ReadIndirectValue(Slice ref) const;
  99. inline void SaveKey(const Slice& k, std::string* dst) {
  100. dst->assign(k.data(), k.size());
  101. }
  102. inline void ForgetLargeValue() {
  103. if (large_ != NULL) {
  104. delete large_;
  105. large_ = NULL;
  106. }
  107. }
  108. inline void ClearSavedValue() {
  109. if (saved_value_.capacity() > 1048576) {
  110. std::string empty;
  111. swap(empty, saved_value_);
  112. } else {
  113. saved_value_.clear();
  114. }
  115. }
  116. const std::string* const dbname_;
  117. Env* const env_;
  118. const Comparator* const user_comparator_;
  119. Iterator* const iter_;
  120. SequenceNumber const sequence_;
  121. Status status_;
  122. std::string saved_key_; // == current key when direction_==kReverse
  123. std::string saved_value_; // == current raw value when direction_==kReverse
  124. Large* large_; // Non-NULL if value is an indirect reference
  125. Direction direction_;
  126. bool valid_;
  127. // No copying allowed
  128. DBIter(const DBIter&);
  129. void operator=(const DBIter&);
  130. };
  131. inline bool DBIter::ParseKey(ParsedInternalKey* ikey) {
  132. if (!ParseInternalKey(iter_->key(), ikey)) {
  133. status_ = Status::Corruption("corrupted internal key in DBIter");
  134. return false;
  135. } else {
  136. return true;
  137. }
  138. }
  139. void DBIter::Next() {
  140. assert(valid_);
  141. ForgetLargeValue();
  142. if (direction_ == kReverse) { // Switch directions?
  143. direction_ = kForward;
  144. // iter_ is pointing just before the entries for this->key(),
  145. // so advance into the range of entries for this->key() and then
  146. // use the normal skipping code below.
  147. if (!iter_->Valid()) {
  148. iter_->SeekToFirst();
  149. } else {
  150. iter_->Next();
  151. }
  152. if (!iter_->Valid()) {
  153. valid_ = false;
  154. saved_key_.clear();
  155. return;
  156. }
  157. }
  158. // Temporarily use saved_key_ as storage for key to skip.
  159. std::string* skip = &saved_key_;
  160. SaveKey(ExtractUserKey(iter_->key()), skip);
  161. FindNextUserEntry(true, skip);
  162. }
  163. void DBIter::FindNextUserEntry(bool skipping, std::string* skip) {
  164. // Loop until we hit an acceptable entry to yield
  165. assert(iter_->Valid());
  166. assert(direction_ == kForward);
  167. assert(large_ == NULL);
  168. do {
  169. ParsedInternalKey ikey;
  170. if (ParseKey(&ikey) && ikey.sequence <= sequence_) {
  171. switch (ikey.type) {
  172. case kTypeDeletion:
  173. // Arrange to skip all upcoming entries for this key since
  174. // they are hidden by this deletion.
  175. SaveKey(ikey.user_key, skip);
  176. skipping = true;
  177. break;
  178. case kTypeValue:
  179. case kTypeLargeValueRef:
  180. if (skipping &&
  181. user_comparator_->Compare(ikey.user_key, *skip) <= 0) {
  182. // Entry hidden
  183. } else {
  184. valid_ = true;
  185. saved_key_.clear();
  186. if (ikey.type == kTypeLargeValueRef) {
  187. large_ = new Large;
  188. large_->produced = false;
  189. }
  190. return;
  191. }
  192. break;
  193. }
  194. }
  195. iter_->Next();
  196. } while (iter_->Valid());
  197. saved_key_.clear();
  198. valid_ = false;
  199. }
  200. void DBIter::Prev() {
  201. assert(valid_);
  202. ForgetLargeValue();
  203. if (direction_ == kForward) { // Switch directions?
  204. // iter_ is pointing at the current entry. Scan backwards until
  205. // the key changes so we can use the normal reverse scanning code.
  206. assert(iter_->Valid()); // Otherwise valid_ would have been false
  207. SaveKey(ExtractUserKey(iter_->key()), &saved_key_);
  208. while (true) {
  209. iter_->Prev();
  210. if (!iter_->Valid()) {
  211. valid_ = false;
  212. saved_key_.clear();
  213. ClearSavedValue();
  214. return;
  215. }
  216. if (user_comparator_->Compare(ExtractUserKey(iter_->key()),
  217. saved_key_) < 0) {
  218. break;
  219. }
  220. }
  221. direction_ = kReverse;
  222. }
  223. FindPrevUserEntry();
  224. }
  225. void DBIter::FindPrevUserEntry() {
  226. assert(direction_ == kReverse);
  227. assert(large_ == NULL);
  228. ValueType value_type = kTypeDeletion;
  229. if (iter_->Valid()) {
  230. SaveKey(ExtractUserKey(iter_->key()), &saved_key_);
  231. do {
  232. ParsedInternalKey ikey;
  233. if (ParseKey(&ikey) && ikey.sequence <= sequence_) {
  234. if ((value_type != kTypeDeletion) &&
  235. user_comparator_->Compare(ikey.user_key, saved_key_) < 0) {
  236. // We encountered a non-deleted value in entries for previous keys,
  237. break;
  238. }
  239. value_type = ikey.type;
  240. if (value_type == kTypeDeletion) {
  241. ClearSavedValue();
  242. } else {
  243. Slice raw_value = iter_->value();
  244. if (saved_value_.capacity() > raw_value.size() + 1048576) {
  245. std::string empty;
  246. swap(empty, saved_value_);
  247. }
  248. saved_value_.assign(raw_value.data(), raw_value.size());
  249. }
  250. }
  251. iter_->Prev();
  252. } while (iter_->Valid());
  253. }
  254. if (value_type == kTypeDeletion) {
  255. // End
  256. valid_ = false;
  257. saved_key_.clear();
  258. ClearSavedValue();
  259. direction_ = kForward;
  260. } else {
  261. valid_ = true;
  262. if (value_type == kTypeLargeValueRef) {
  263. large_ = new Large;
  264. large_->produced = false;
  265. }
  266. }
  267. }
  268. void DBIter::Seek(const Slice& target) {
  269. direction_ = kForward;
  270. ForgetLargeValue();
  271. ClearSavedValue();
  272. saved_key_.clear();
  273. AppendInternalKey(
  274. &saved_key_, ParsedInternalKey(target, sequence_, kValueTypeForSeek));
  275. iter_->Seek(saved_key_);
  276. if (iter_->Valid()) {
  277. FindNextUserEntry(false, &saved_key_ /* temporary storage */);
  278. } else {
  279. valid_ = false;
  280. }
  281. }
  282. void DBIter::SeekToFirst() {
  283. direction_ = kForward;
  284. ForgetLargeValue();
  285. ClearSavedValue();
  286. iter_->SeekToFirst();
  287. if (iter_->Valid()) {
  288. FindNextUserEntry(false, &saved_key_ /* temporary storage */);
  289. } else {
  290. valid_ = false;
  291. }
  292. }
  293. void DBIter::SeekToLast() {
  294. direction_ = kReverse;
  295. ForgetLargeValue();
  296. ClearSavedValue();
  297. iter_->SeekToLast();
  298. FindPrevUserEntry();
  299. }
  300. void DBIter::ReadIndirectValue(Slice ref) const {
  301. assert(!large_->produced);
  302. large_->produced = true;
  303. LargeValueRef large_ref;
  304. if (ref.size() != LargeValueRef::ByteSize()) {
  305. large_->status = Status::Corruption("malformed large value reference");
  306. return;
  307. }
  308. memcpy(large_ref.data, ref.data(), LargeValueRef::ByteSize());
  309. std::string fname = LargeValueFileName(*dbname_, large_ref);
  310. RandomAccessFile* file;
  311. Status s = env_->NewRandomAccessFile(fname, &file);
  312. uint64_t file_size = 0;
  313. if (s.ok()) {
  314. s = env_->GetFileSize(fname, &file_size);
  315. }
  316. if (s.ok()) {
  317. uint64_t value_size = large_ref.ValueSize();
  318. large_->value.resize(value_size);
  319. Slice result;
  320. s = file->Read(0, file_size, &result,
  321. const_cast<char*>(large_->value.data()));
  322. if (s.ok()) {
  323. if (result.size() == file_size) {
  324. switch (large_ref.compression_type()) {
  325. case kNoCompression: {
  326. if (result.data() != large_->value.data()) {
  327. large_->value.assign(result.data(), result.size());
  328. }
  329. break;
  330. }
  331. case kSnappyCompression: {
  332. std::string uncompressed;
  333. if (port::Snappy_Uncompress(result.data(), result.size(),
  334. &uncompressed) &&
  335. uncompressed.size() == large_ref.ValueSize()) {
  336. swap(uncompressed, large_->value);
  337. } else {
  338. s = Status::Corruption(
  339. "Unable to read entire compressed large value file");
  340. }
  341. }
  342. }
  343. } else {
  344. s = Status::Corruption("Unable to read entire large value file");
  345. }
  346. }
  347. delete file; // Ignore errors on closing
  348. }
  349. if (!s.ok()) {
  350. large_->value.clear();
  351. large_->status = s;
  352. }
  353. }
  354. } // anonymous namespace
  355. Iterator* NewDBIterator(
  356. const std::string* dbname,
  357. Env* env,
  358. const Comparator* user_key_comparator,
  359. Iterator* internal_iter,
  360. const SequenceNumber& sequence) {
  361. return new DBIter(dbname, env, user_key_comparator, internal_iter, sequence);
  362. }
  363. }