LevelDB Secondary Index Implementation: 姚凯文 (kevinyao0901), 姜嘉祺

// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include <cstdio>
#include <string>

#include "gtest/gtest.h"
#include "db/log_reader.h"
#include "db/log_writer.h"
#include "leveldb/env.h"
#include "util/coding.h"
#include "util/crc32c.h"
#include "util/random.h"

namespace leveldb {
namespace log {

// Construct a string of the specified length made out of the supplied
// partial string.
static std::string BigString(const std::string& partial_string, size_t n) {
  std::string result;
  while (result.size() < n) {
    result.append(partial_string);
  }
  result.resize(n);
  return result;
}

// Construct a string from a number, followed by a '.' (e.g. 7 -> "7.").
static std::string NumberString(int n) {
  char buf[50];
  std::snprintf(buf, sizeof(buf), "%d.", n);
  return std::string(buf);
}

// Return a string built from NumberString(i) whose length is drawn from
// rnd->Skewed(17): usually short, occasionally long.
static std::string RandomSkewedString(int i, Random* rnd) {
  return BigString(NumberString(i), rnd->Skewed(17));
}

class LogTest : public testing::Test {
 public:
  LogTest()
      : reading_(false),
        writer_(new Writer(&dest_)),
        reader_(new Reader(&source_, &report_, true /*checksum*/,
                           0 /*initial_offset*/)) {}

  ~LogTest() {
    delete writer_;
    delete reader_;
  }

  void ReopenForAppend() {
    delete writer_;
    writer_ = new Writer(&dest_, dest_.contents_.size());
  }

  void Write(const std::string& msg) {
    ASSERT_TRUE(!reading_) << "Write() after starting to read";
    writer_->AddRecord(Slice(msg));
  }

  size_t WrittenBytes() const { return dest_.contents_.size(); }

  std::string Read() {
    if (!reading_) {
      reading_ = true;
      source_.contents_ = Slice(dest_.contents_);
    }
    std::string scratch;
    Slice record;
    if (reader_->ReadRecord(&record, &scratch)) {
      return record.ToString();
    } else {
      return "EOF";
    }
  }

  void IncrementByte(int offset, int delta) {
    dest_.contents_[offset] += delta;
  }

  void SetByte(int offset, char new_byte) {
    dest_.contents_[offset] = new_byte;
  }

  void ShrinkSize(int bytes) {
    dest_.contents_.resize(dest_.contents_.size() - bytes);
  }

  void FixChecksum(int header_offset, int len) {
    // Recompute the crc over the type byte (header[6]) and the len payload
    // bytes that follow it; the length field is not covered by the crc.
    uint32_t crc = crc32c::Value(&dest_.contents_[header_offset + 6], 1 + len);
    crc = crc32c::Mask(crc);
    EncodeFixed32(&dest_.contents_[header_offset], crc);
  }

  void ForceError() { source_.force_error_ = true; }

  size_t DroppedBytes() const { return report_.dropped_bytes_; }

  std::string ReportMessage() const { return report_.message_; }

  // Returns "OK" iff the recorded error message contains msg; otherwise
  // returns the recorded message so that a failing assertion shows it.
  std::string MatchError(const std::string& msg) const {
    if (report_.message_.find(msg) == std::string::npos) {
      return report_.message_;
    } else {
      return "OK";
    }
  }

  void WriteInitialOffsetLog() {
    for (int i = 0; i < num_initial_offset_records_; i++) {
      std::string record(initial_offset_record_sizes_[i],
                         static_cast<char>('a' + i));
      Write(record);
    }
  }

  void StartReadingAt(uint64_t initial_offset) {
    delete reader_;
    reader_ = new Reader(&source_, &report_, true /*checksum*/, initial_offset);
  }

  void CheckOffsetPastEndReturnsNoRecords(uint64_t offset_past_end) {
    WriteInitialOffsetLog();
    reading_ = true;
    source_.contents_ = Slice(dest_.contents_);
    Reader* offset_reader = new Reader(&source_, &report_, true /*checksum*/,
                                       WrittenBytes() + offset_past_end);
    Slice record;
    std::string scratch;
    ASSERT_TRUE(!offset_reader->ReadRecord(&record, &scratch));
    delete offset_reader;
  }

  void CheckInitialOffsetRecord(uint64_t initial_offset,
                                int expected_record_offset) {
    WriteInitialOffsetLog();
    reading_ = true;
    source_.contents_ = Slice(dest_.contents_);
    Reader* offset_reader =
        new Reader(&source_, &report_, true /*checksum*/, initial_offset);

    // Read all records from expected_record_offset through the last one.
    ASSERT_LT(expected_record_offset, num_initial_offset_records_);
    for (; expected_record_offset < num_initial_offset_records_;
         ++expected_record_offset) {
      Slice record;
      std::string scratch;
      ASSERT_TRUE(offset_reader->ReadRecord(&record, &scratch));
      ASSERT_EQ(initial_offset_record_sizes_[expected_record_offset],
                record.size());
      ASSERT_EQ(initial_offset_last_record_offsets_[expected_record_offset],
                offset_reader->LastRecordOffset());
      ASSERT_EQ((char)('a' + expected_record_offset), record.data()[0]);
    }
    delete offset_reader;
  }

 private:
  // In-memory WritableFile that collects everything the Writer appends.
  class StringDest : public WritableFile {
   public:
    Status Close() override { return Status::OK(); }
    Status Flush() override { return Status::OK(); }
    Status Sync() override { return Status::OK(); }
    Status Append(const Slice& slice) override {
      contents_.append(slice.data(), slice.size());
      return Status::OK();
    }

    std::string contents_;
  };

  // In-memory SequentialFile over the written bytes; ForceError() makes the
  // next Read() return a corruption error.
  class StringSource : public SequentialFile {
   public:
    StringSource() : force_error_(false), returned_partial_(false) {}

    Status Read(size_t n, Slice* result, char* scratch) override {
      EXPECT_TRUE(!returned_partial_) << "must not Read() after eof/error";

      if (force_error_) {
        force_error_ = false;
        returned_partial_ = true;
        return Status::Corruption("read error");
      }

      if (contents_.size() < n) {
        n = contents_.size();
        returned_partial_ = true;
      }
      *result = Slice(contents_.data(), n);
      contents_.remove_prefix(n);
      return Status::OK();
    }

    Status Skip(uint64_t n) override {
      if (n > contents_.size()) {
        contents_.clear();
        return Status::NotFound("in-memory file skipped past end");
      }

      contents_.remove_prefix(n);

      return Status::OK();
    }

    Slice contents_;
    bool force_error_;
    bool returned_partial_;
  };

  // Accumulates the dropped byte count and the messages reported by the Reader.
  class ReportCollector : public Reader::Reporter {
   public:
    ReportCollector() : dropped_bytes_(0) {}
    void Corruption(size_t bytes, const Status& status) override {
      dropped_bytes_ += bytes;
      message_.append(status.ToString());
    }

    size_t dropped_bytes_;
    std::string message_;
  };

  // Record metadata for testing initial offset functionality
  static size_t initial_offset_record_sizes_[];
  static uint64_t initial_offset_last_record_offsets_[];
  static int num_initial_offset_records_;

  StringDest dest_;
  StringSource source_;
  ReportCollector report_;
  bool reading_;
  Writer* writer_;
  Reader* reader_;
};

size_t LogTest::initial_offset_record_sizes_[] = {
    10000,  // Two sizable records in first block
    10000,
    2 * log::kBlockSize - 1000,  // Span three blocks
    1,
    13716,                          // Consume all but two bytes of block 3.
    log::kBlockSize - kHeaderSize,  // Consume the entirety of block 4.
};

// File offset of the first header of each record above, i.e. the value
// LastRecordOffset() reports for it. The third record is split into three
// fragments, so three headers precede the fourth record; the sixth record
// starts at the block boundary after the 2-byte trailer of block 3 is
// zero-filled.
uint64_t LogTest::initial_offset_last_record_offsets_[] = {
    0,
    kHeaderSize + 10000,
    2 * (kHeaderSize + 10000),
    2 * (kHeaderSize + 10000) + (2 * log::kBlockSize - 1000) + 3 * kHeaderSize,
    2 * (kHeaderSize + 10000) + (2 * log::kBlockSize - 1000) + 3 * kHeaderSize +
        kHeaderSize + 1,
    3 * log::kBlockSize,
};

// LogTest::initial_offset_last_record_offsets_ must be defined before this.
int LogTest::num_initial_offset_records_ =
    sizeof(LogTest::initial_offset_last_record_offsets_) / sizeof(uint64_t);

TEST_F(LogTest, Empty) { ASSERT_EQ("EOF", Read()); }

TEST_F(LogTest, ReadWrite) {
  Write("foo");
  Write("bar");
  Write("");
  Write("xxxx");
  ASSERT_EQ("foo", Read());
  ASSERT_EQ("bar", Read());
  ASSERT_EQ("", Read());
  ASSERT_EQ("xxxx", Read());
  ASSERT_EQ("EOF", Read());
  ASSERT_EQ("EOF", Read());  // Make sure reads at eof work
}

TEST_F(LogTest, ManyBlocks) {
  for (int i = 0; i < 100000; i++) {
    Write(NumberString(i));
  }
  for (int i = 0; i < 100000; i++) {
    ASSERT_EQ(NumberString(i), Read());
  }
  ASSERT_EQ("EOF", Read());
}

TEST_F(LogTest, Fragmentation) {
  Write("small");
  Write(BigString("medium", 50000));
  Write(BigString("large", 100000));
  ASSERT_EQ("small", Read());
  ASSERT_EQ(BigString("medium", 50000), Read());
  ASSERT_EQ(BigString("large", 100000), Read());
  ASSERT_EQ("EOF", Read());
}

TEST_F(LogTest, MarginalTrailer) {
  // Make a trailer that is exactly the same length as an empty record.
  const int n = kBlockSize - 2 * kHeaderSize;
  Write(BigString("foo", n));
  ASSERT_EQ(kBlockSize - kHeaderSize, WrittenBytes());
  Write("");
  Write("bar");
  ASSERT_EQ(BigString("foo", n), Read());
  ASSERT_EQ("", Read());
  ASSERT_EQ("bar", Read());
  ASSERT_EQ("EOF", Read());
}

TEST_F(LogTest, MarginalTrailer2) {
  // Make a trailer that is exactly the same length as an empty record.
  const int n = kBlockSize - 2 * kHeaderSize;
  Write(BigString("foo", n));
  ASSERT_EQ(kBlockSize - kHeaderSize, WrittenBytes());
  Write("bar");
  ASSERT_EQ(BigString("foo", n), Read());
  ASSERT_EQ("bar", Read());
  ASSERT_EQ("EOF", Read());
  ASSERT_EQ(0, DroppedBytes());
  ASSERT_EQ("", ReportMessage());
}

TEST_F(LogTest, ShortTrailer) {
  // Leave a 3-byte trailer, too short for a header; the writer zero-fills it
  // before emitting the next record.
  const int n = kBlockSize - 2 * kHeaderSize + 4;
  Write(BigString("foo", n));
  ASSERT_EQ(kBlockSize - kHeaderSize + 4, WrittenBytes());
  Write("");
  Write("bar");
  ASSERT_EQ(BigString("foo", n), Read());
  ASSERT_EQ("", Read());
  ASSERT_EQ("bar", Read());
  ASSERT_EQ("EOF", Read());
}

TEST_F(LogTest, AlignedEof) {
  // Same 3-byte gap as ShortTrailer, but the log ends right there.
  const int n = kBlockSize - 2 * kHeaderSize + 4;
  Write(BigString("foo", n));
  ASSERT_EQ(kBlockSize - kHeaderSize + 4, WrittenBytes());
  ASSERT_EQ(BigString("foo", n), Read());
  ASSERT_EQ("EOF", Read());
}

TEST_F(LogTest, OpenForAppend) {
  Write("hello");
  ReopenForAppend();
  Write("world");
  ASSERT_EQ("hello", Read());
  ASSERT_EQ("world", Read());
  ASSERT_EQ("EOF", Read());
}

TEST_F(LogTest, RandomRead) {
  const int N = 500;
  Random write_rnd(301);
  for (int i = 0; i < N; i++) {
    Write(RandomSkewedString(i, &write_rnd));
  }
  Random read_rnd(301);
  for (int i = 0; i < N; i++) {
    ASSERT_EQ(RandomSkewedString(i, &read_rnd), Read());
  }
  ASSERT_EQ("EOF", Read());
}

// Tests of all the error paths in log_reader.cc follow:

TEST_F(LogTest, ReadError) {
  Write("foo");
  ForceError();
  ASSERT_EQ("EOF", Read());
  ASSERT_EQ(kBlockSize, DroppedBytes());
  ASSERT_EQ("OK", MatchError("read error"));
}

TEST_F(LogTest, BadRecordType) {
  Write("foo");
  // Type is stored in header[6]
  IncrementByte(6, 100);
  FixChecksum(0, 3);
  ASSERT_EQ("EOF", Read());
  ASSERT_EQ(3, DroppedBytes());
  ASSERT_EQ("OK", MatchError("unknown record type"));
}

TEST_F(LogTest, TruncatedTrailingRecordIsIgnored) {
  Write("foo");
  ShrinkSize(4);  // Drop all payload as well as a header byte
  ASSERT_EQ("EOF", Read());
  // Truncated last record is ignored, not treated as an error.
  ASSERT_EQ(0, DroppedBytes());
  ASSERT_EQ("", ReportMessage());
}

TEST_F(LogTest, BadLength) {
  const int kPayloadSize = kBlockSize - kHeaderSize;
  Write(BigString("bar", kPayloadSize));
  Write("foo");
  // Least significant size byte is stored in header[4].
  IncrementByte(4, 1);
  ASSERT_EQ("foo", Read());
  ASSERT_EQ(kBlockSize, DroppedBytes());
  ASSERT_EQ("OK", MatchError("bad record length"));
}

TEST_F(LogTest, BadLengthAtEndIsIgnored) {
  Write("foo");
  ShrinkSize(1);
  ASSERT_EQ("EOF", Read());
  ASSERT_EQ(0, DroppedBytes());
  ASSERT_EQ("", ReportMessage());
}

TEST_F(LogTest, ChecksumMismatch) {
  Write("foo");
  IncrementByte(0, 10);
  ASSERT_EQ("EOF", Read());
  ASSERT_EQ(10, DroppedBytes());
  ASSERT_EQ("OK", MatchError("checksum mismatch"));
}

TEST_F(LogTest, UnexpectedMiddleType) {
  Write("foo");
  SetByte(6, kMiddleType);
  FixChecksum(0, 3);
  ASSERT_EQ("EOF", Read());
  ASSERT_EQ(3, DroppedBytes());
  ASSERT_EQ("OK", MatchError("missing start"));
}

TEST_F(LogTest, UnexpectedLastType) {
  Write("foo");
  SetByte(6, kLastType);
  FixChecksum(0, 3);
  ASSERT_EQ("EOF", Read());
  ASSERT_EQ(3, DroppedBytes());
  ASSERT_EQ("OK", MatchError("missing start"));
}

TEST_F(LogTest, UnexpectedFullType) {
  Write("foo");
  Write("bar");
  SetByte(6, kFirstType);
  FixChecksum(0, 3);
  ASSERT_EQ("bar", Read());
  ASSERT_EQ("EOF", Read());
  ASSERT_EQ(3, DroppedBytes());
  ASSERT_EQ("OK", MatchError("partial record without end"));
}

TEST_F(LogTest, UnexpectedFirstType) {
  Write("foo");
  Write(BigString("bar", 100000));
  SetByte(6, kFirstType);
  FixChecksum(0, 3);
  ASSERT_EQ(BigString("bar", 100000), Read());
  ASSERT_EQ("EOF", Read());
  ASSERT_EQ(3, DroppedBytes());
  ASSERT_EQ("OK", MatchError("partial record without end"));
}

TEST_F(LogTest, MissingLastIsIgnored) {
  Write(BigString("bar", kBlockSize));
  // Remove the LAST block, including header.
  ShrinkSize(14);
  ASSERT_EQ("EOF", Read());
  ASSERT_EQ("", ReportMessage());
  ASSERT_EQ(0, DroppedBytes());
}

TEST_F(LogTest, PartialLastIsIgnored) {
  Write(BigString("bar", kBlockSize));
  // Cause a bad record length in the LAST block.
  ShrinkSize(1);
  ASSERT_EQ("EOF", Read());
  ASSERT_EQ("", ReportMessage());
  ASSERT_EQ(0, DroppedBytes());
}

TEST_F(LogTest, SkipIntoMultiRecord) {
  // Consider a fragmented record:
  //    first(R1), middle(R1), last(R1), first(R2)
  // If initial_offset points to a record after first(R1) but before first(R2),
  // incomplete fragment errors are not actual errors, and must be suppressed
  // until a new first or full record is encountered.
  Write(BigString("foo", 3 * kBlockSize));
  Write("correct");
  StartReadingAt(kBlockSize);

  ASSERT_EQ("correct", Read());
  ASSERT_EQ("", ReportMessage());
  ASSERT_EQ(0, DroppedBytes());
  ASSERT_EQ("EOF", Read());
}

TEST_F(LogTest, ErrorJoinsRecords) {
  // Consider two fragmented records:
  //    first(R1) last(R1) first(R2) last(R2)
  // where the middle two fragments disappear. We do not want
  // first(R1),last(R2) to get joined and returned as a valid record.

  // Write records that span two blocks
  Write(BigString("foo", kBlockSize));
  Write(BigString("bar", kBlockSize));
  Write("correct");

  // Wipe the middle block
  for (int offset = kBlockSize; offset < 2 * kBlockSize; offset++) {
    SetByte(offset, 'x');
  }

  ASSERT_EQ("correct", Read());
  ASSERT_EQ("EOF", Read());
  const size_t dropped = DroppedBytes();
  ASSERT_LE(dropped, 2 * kBlockSize + 100);
  ASSERT_GE(dropped, 2 * kBlockSize);
}

TEST_F(LogTest, ReadStart) { CheckInitialOffsetRecord(0, 0); }

TEST_F(LogTest, ReadSecondOneOff) { CheckInitialOffsetRecord(1, 1); }

TEST_F(LogTest, ReadSecondTenThousand) { CheckInitialOffsetRecord(10000, 1); }

TEST_F(LogTest, ReadSecondStart) { CheckInitialOffsetRecord(10007, 1); }

TEST_F(LogTest, ReadThirdOneOff) { CheckInitialOffsetRecord(10008, 2); }

TEST_F(LogTest, ReadThirdStart) { CheckInitialOffsetRecord(20014, 2); }

TEST_F(LogTest, ReadFourthOneOff) { CheckInitialOffsetRecord(20015, 3); }

TEST_F(LogTest, ReadFourthFirstBlockTrailer) {
  CheckInitialOffsetRecord(log::kBlockSize - 4, 3);
}

TEST_F(LogTest, ReadFourthMiddleBlock) {
  CheckInitialOffsetRecord(log::kBlockSize + 1, 3);
}

TEST_F(LogTest, ReadFourthLastBlock) {
  CheckInitialOffsetRecord(2 * log::kBlockSize + 1, 3);
}

TEST_F(LogTest, ReadFourthStart) {
  // Exact start of the fourth record, initial_offset_last_record_offsets_[3].
  CheckInitialOffsetRecord(
      2 * (kHeaderSize + 10000) + (2 * log::kBlockSize - 1000) + 3 * kHeaderSize,
      3);
}

TEST_F(LogTest, ReadInitialOffsetIntoBlockPadding) {
  CheckInitialOffsetRecord(3 * log::kBlockSize - 3, 5);
}

TEST_F(LogTest, ReadEnd) { CheckOffsetPastEndReturnsNoRecords(0); }

TEST_F(LogTest, ReadPastEnd) { CheckOffsetPastEndReturnsNoRecords(5); }

}  // namespace log
}  // namespace leveldb
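
The magic numbers in `initial_offset_last_record_offsets_` and the byte counts asserted in `MarginalTrailer` and `ShortTrailer` follow from how records are packed into 32 KiB blocks. Below is a minimal sketch that replays that packing without writing any bytes. It assumes `kBlockSize` is 32768 and `kHeaderSize` is 7 (4-byte checksum, 2-byte length, 1-byte type), and it models the packing rule as we understand leveldb's `Writer::AddRecord`: a record is split into FULL/FIRST/MIDDLE/LAST fragments, and a block tail too short for a header is zero-filled. The namespace `log_layout_sketch` and the `LayoutSimulator`/`Fragment` types are hypothetical illustrations, not leveldb APIs.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

namespace log_layout_sketch {

// Assumed to match leveldb's log::kBlockSize and log::kHeaderSize.
constexpr std::uint64_t kBlockSize = 32768;
constexpr std::uint64_t kHeaderSize = 7;  // checksum (4) + length (2) + type (1)

enum class FragmentType { kFull, kFirst, kMiddle, kLast };

struct Fragment {
  std::uint64_t header_offset;  // file offset of this fragment's header
  std::uint64_t length;         // payload bytes carried by this fragment
  FragmentType type;
};

// Hypothetical helper: replays the block packing without producing bytes.
class LayoutSimulator {
 public:
  // Appends a logical record of `size` payload bytes and returns the file
  // offset of its first header (the value LastRecordOffset() should report).
  std::uint64_t Add(std::uint64_t size) {
    std::uint64_t left = size;
    std::uint64_t record_start = 0;
    bool begin = true;
    do {  // do/while so that an empty record still gets one header
      const std::uint64_t leftover = kBlockSize - block_offset_;
      if (leftover < kHeaderSize) {
        // No room for a header: zero-fill the trailer and switch blocks.
        file_size_ += leftover;
        block_offset_ = 0;
      }
      const std::uint64_t avail = kBlockSize - block_offset_ - kHeaderSize;
      const std::uint64_t length = left < avail ? left : avail;
      const bool end = (left == length);
      const FragmentType type = (begin && end) ? FragmentType::kFull
                                : begin        ? FragmentType::kFirst
                                : end          ? FragmentType::kLast
                                               : FragmentType::kMiddle;
      if (begin) record_start = file_size_;
      fragments_.push_back({file_size_, length, type});
      file_size_ += kHeaderSize + length;
      block_offset_ += kHeaderSize + length;
      assert(block_offset_ <= kBlockSize);  // fragments never cross blocks
      left -= length;
      begin = false;
    } while (left > 0);
    return record_start;
  }

  std::uint64_t file_size() const { return file_size_; }
  const std::vector<Fragment>& fragments() const { return fragments_; }

 private:
  std::uint64_t block_offset_ = 0;  // bytes already used in the current block
  std::uint64_t file_size_ = 0;     // total bytes the writer would have emitted
  std::vector<Fragment> fragments_;
};

}  // namespace log_layout_sketch
```

Adding the six sizes from `initial_offset_record_sizes_` in order should return 0, 10007, 20014, 84571, 84579 and 98304, the same values as `initial_offset_last_record_offsets_`; the third record produces three fragments (FIRST, MIDDLE, LAST). The same model reproduces the `WrittenBytes()` checks: 32761 bytes in `MarginalTrailer`, leaving exactly one header's worth of space, and 32765 bytes in `ShortTrailer`, leaving a 3-byte gap that is padded before the next record.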
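
`FixChecksum`, `BadRecordType` and `BadLength` poke at specific header bytes. A minimal sketch of the physical record encoding those tests assume: bytes 0-3 hold the masked CRC-32C (little-endian), bytes 4-5 the payload length (little-endian, so header[4] is the low byte), and byte 6 the record type. The CRC covers the type byte plus the payload, which is why `FixChecksum` hashes `1 + len` bytes starting at `header_offset + 6`. The bitwise CRC-32C, the mask constant and the record type values are written from our understanding of leveldb's `util/crc32c` and `db/log_format.h`, not copied from them; `log_header_sketch`, `EncodePhysicalRecord` and `ChecksumMatches` are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>

namespace log_header_sketch {

constexpr std::size_t kHeaderSize = 7;

// Record type values (assumed to match db/log_format.h).
enum RecordType : std::uint8_t {
  kZeroType = 0,
  kFullType = 1,
  kFirstType = 2,
  kMiddleType = 3,
  kLastType = 4,
};

// Bitwise CRC-32C (Castagnoli, reflected polynomial 0x82F63B78). Passing a
// previous result as `crc` extends that checksum over more data.
inline std::uint32_t Crc32c(const char* data, std::size_t n,
                            std::uint32_t crc = 0) {
  crc = ~crc;
  for (std::size_t i = 0; i < n; ++i) {
    crc ^= static_cast<std::uint8_t>(data[i]);
    for (int bit = 0; bit < 8; ++bit) {
      crc = (crc >> 1) ^ (0x82F63B78u & (0u - (crc & 1u)));
    }
  }
  return ~crc;
}

// Stored CRCs are masked (rotate right 15 bits, add a constant) because
// computing a CRC over data that embeds CRCs is problematic.
constexpr std::uint32_t kMaskDelta = 0xa282ead8u;
inline std::uint32_t Mask(std::uint32_t crc) {
  return ((crc >> 15) | (crc << 17)) + kMaskDelta;
}
inline std::uint32_t Unmask(std::uint32_t masked) {
  const std::uint32_t rot = masked - kMaskDelta;
  return (rot >> 17) | (rot << 15);
}

// Header: [0..3] masked crc (little-endian), [4..5] payload length
// (little-endian), [6] record type; the payload follows.
inline std::string EncodePhysicalRecord(RecordType type,
                                        const std::string& payload) {
  assert(payload.size() <= 0xffff);  // the length field has only two bytes
  const char type_byte = static_cast<char>(type);
  // The crc covers the type byte and the payload, not the length field.
  const std::uint32_t crc =
      Crc32c(payload.data(), payload.size(), Crc32c(&type_byte, 1));
  const std::uint32_t masked = Mask(crc);
  std::string out(kHeaderSize, '\0');
  for (int i = 0; i < 4; ++i) {
    out[i] = static_cast<char>((masked >> (8 * i)) & 0xff);
  }
  out[4] = static_cast<char>(payload.size() & 0xff);
  out[5] = static_cast<char>((payload.size() >> 8) & 0xff);
  out[6] = type_byte;
  return out + payload;
}

// Recomputes the crc over bytes [6, 7 + length) and compares it with the
// unmasked stored value; returns false if the length runs past the buffer.
inline bool ChecksumMatches(const std::string& record) {
  if (record.size() < kHeaderSize) return false;
  std::uint32_t stored = 0;
  for (int i = 0; i < 4; ++i) {
    stored |= static_cast<std::uint32_t>(static_cast<std::uint8_t>(record[i]))
              << (8 * i);
  }
  const std::size_t length =
      static_cast<std::uint8_t>(record[4]) |
      (static_cast<std::size_t>(static_cast<std::uint8_t>(record[5])) << 8);
  if (kHeaderSize + length > record.size()) return false;
  return Unmask(stored) == Crc32c(record.data() + 6, 1 + length);
}

}  // namespace log_header_sketch
```

For example, encoding `"foo"` as a full record yields 10 bytes with `rec[4] == 3` and `rec[6] == 1`. Bumping byte 6 by 100, as `BadRecordType` does, breaks the checksum in this model, which is why that test calls `FixChecksum` before reading so that the reader reaches the "unknown record type" path instead of reporting a checksum mismatch.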
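
The `Read*` initial-offset tests depend on two things: the block where the reader begins scanning, and which record it returns first. The sketch below models both under stated assumptions. `ScanStartForInitialOffset` follows our reading of leveldb's `Reader::SkipToInitialBlock`, which rounds the offset down to its block but moves to the next block when the offset lands in the last five bytes of a block (`offset_in_block > kBlockSize - 6`), treating them as trailer. `FirstRecordAtOrAfter` is a simplified model that assumes an intact log: the first record returned is the first one whose header starts at or after `initial_offset`. Both function names and the `log_offset_sketch` namespace are hypothetical.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

namespace log_offset_sketch {

constexpr std::uint64_t kBlockSize = 32768;  // assumed log::kBlockSize

// Block at which scanning starts for a given initial offset. Offsets in the
// last five bytes of a block are treated as trailer, so scanning moves on to
// the next block.
inline std::uint64_t ScanStartForInitialOffset(std::uint64_t initial_offset) {
  const std::uint64_t offset_in_block = initial_offset % kBlockSize;
  std::uint64_t block_start = initial_offset - offset_in_block;
  if (offset_in_block > kBlockSize - 6) {
    block_start += kBlockSize;
  }
  return block_start;
}

// Simplified model for an intact log: index of the first record whose header
// starts at or after initial_offset. `starts` must be sorted ascending;
// returns starts.size() when no record qualifies.
inline std::size_t FirstRecordAtOrAfter(const std::vector<std::uint64_t>& starts,
                                        std::uint64_t initial_offset) {
  assert(std::is_sorted(starts.begin(), starts.end()));
  return static_cast<std::size_t>(
      std::lower_bound(starts.begin(), starts.end(), initial_offset) -
      starts.begin());
}

}  // namespace log_offset_sketch
```

With the start offsets 0, 10007, 20014, 84571, 84579 and 98304 from the table, an offset of `kBlockSize - 4` (ReadFourthFirstBlockTrailer) maps to record 3, `3 * kBlockSize - 3` (ReadInitialOffsetIntoBlockPadding) maps to record 5 after scanning skips to the block at 98304, and an offset at or past the end of the log maps to no record, which is what `ReadEnd` and `ReadPastEnd` expect.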