LevelDB project 1 10225501460 林子骥 10211900416 郭夏辉
Вы не можете выбрать более 25 тем Темы должны начинаться с буквы или цифры, могут содержать дефисы(-) и должны содержать не более 35 символов.

451 строка
13 KiB

1 месяц назад
  1. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style license that can be
  3. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  4. //
  5. // We recover the contents of the descriptor from the other files we find.
  6. // (1) Any log files are first converted to tables
  7. // (2) We scan every table to compute
  8. // (a) smallest/largest for the table
  9. // (b) largest sequence number in the table
  10. // (3) We generate descriptor contents:
  11. // - log number is set to zero
  12. // - next-file-number is set to 1 + largest file number we found
  13. // - last-sequence-number is set to largest sequence# found across
  14. // all tables (see 2c)
  15. // - compaction pointers are cleared
  16. // - every table file is added at level 0
  17. //
  18. // Possible optimization 1:
  19. // (a) Compute total size and use to pick appropriate max-level M
  20. // (b) Sort tables by largest sequence# in the table
  21. // (c) For each table: if it overlaps earlier table, place in level-0,
  22. // else place in level-M.
  23. // Possible optimization 2:
  24. // Store per-table metadata (smallest, largest, largest-seq#, ...)
  25. // in the table's meta section to speed up ScanTable.
  26. #include "db/builder.h"
  27. #include "db/db_impl.h"
  28. #include "db/dbformat.h"
  29. #include "db/filename.h"
  30. #include "db/log_reader.h"
  31. #include "db/log_writer.h"
  32. #include "db/memtable.h"
  33. #include "db/table_cache.h"
  34. #include "db/version_edit.h"
  35. #include "db/write_batch_internal.h"
  36. #include "leveldb/comparator.h"
  37. #include "leveldb/db.h"
  38. #include "leveldb/env.h"
  39. namespace leveldb {
  40. namespace {
  41. class Repairer {
  42. public:
  43. Repairer(const std::string& dbname, const Options& options)
  44. : dbname_(dbname),
  45. env_(options.env),
  46. icmp_(options.comparator),
  47. ipolicy_(options.filter_policy),
  48. options_(SanitizeOptions(dbname, &icmp_, &ipolicy_, options)),
  49. owns_info_log_(options_.info_log != options.info_log),
  50. owns_cache_(options_.block_cache != options.block_cache),
  51. next_file_number_(1) {
  52. // TableCache can be small since we expect each table to be opened once.
  53. table_cache_ = new TableCache(dbname_, options_, 10);
  54. }
  55. ~Repairer() {
  56. delete table_cache_;
  57. if (owns_info_log_) {
  58. delete options_.info_log;
  59. }
  60. if (owns_cache_) {
  61. delete options_.block_cache;
  62. }
  63. }
  64. Status Run() {
  65. Status status = FindFiles();
  66. if (status.ok()) {
  67. ConvertLogFilesToTables();
  68. ExtractMetaData();
  69. status = WriteDescriptor();
  70. }
  71. if (status.ok()) {
  72. unsigned long long bytes = 0;
  73. for (size_t i = 0; i < tables_.size(); i++) {
  74. bytes += tables_[i].meta.file_size;
  75. }
  76. Log(options_.info_log,
  77. "**** Repaired leveldb %s; "
  78. "recovered %d files; %llu bytes. "
  79. "Some data may have been lost. "
  80. "****",
  81. dbname_.c_str(), static_cast<int>(tables_.size()), bytes);
  82. }
  83. return status;
  84. }
  85. private:
  86. struct TableInfo {
  87. FileMetaData meta;
  88. SequenceNumber max_sequence;
  89. };
  90. Status FindFiles() {
  91. std::vector<std::string> filenames;
  92. Status status = env_->GetChildren(dbname_, &filenames);
  93. if (!status.ok()) {
  94. return status;
  95. }
  96. if (filenames.empty()) {
  97. return Status::IOError(dbname_, "repair found no files");
  98. }
  99. uint64_t number;
  100. FileType type;
  101. for (size_t i = 0; i < filenames.size(); i++) {
  102. if (ParseFileName(filenames[i], &number, &type)) {
  103. if (type == kDescriptorFile) {
  104. manifests_.push_back(filenames[i]);
  105. } else {
  106. if (number + 1 > next_file_number_) {
  107. next_file_number_ = number + 1;
  108. }
  109. if (type == kLogFile) {
  110. logs_.push_back(number);
  111. } else if (type == kTableFile) {
  112. table_numbers_.push_back(number);
  113. } else {
  114. // Ignore other files
  115. }
  116. }
  117. }
  118. }
  119. return status;
  120. }
  121. void ConvertLogFilesToTables() {
  122. for (size_t i = 0; i < logs_.size(); i++) {
  123. std::string logname = LogFileName(dbname_, logs_[i]);
  124. Status status = ConvertLogToTable(logs_[i]);
  125. if (!status.ok()) {
  126. Log(options_.info_log, "Log #%llu: ignoring conversion error: %s",
  127. (unsigned long long)logs_[i], status.ToString().c_str());
  128. }
  129. ArchiveFile(logname);
  130. }
  131. }
  132. Status ConvertLogToTable(uint64_t log) {
  133. struct LogReporter : public log::Reader::Reporter {
  134. Env* env;
  135. Logger* info_log;
  136. uint64_t lognum;
  137. void Corruption(size_t bytes, const Status& s) override {
  138. // We print error messages for corruption, but continue repairing.
  139. Log(info_log, "Log #%llu: dropping %d bytes; %s",
  140. (unsigned long long)lognum, static_cast<int>(bytes),
  141. s.ToString().c_str());
  142. }
  143. };
  144. // Open the log file
  145. std::string logname = LogFileName(dbname_, log);
  146. SequentialFile* lfile;
  147. Status status = env_->NewSequentialFile(logname, &lfile);
  148. if (!status.ok()) {
  149. return status;
  150. }
  151. // Create the log reader.
  152. LogReporter reporter;
  153. reporter.env = env_;
  154. reporter.info_log = options_.info_log;
  155. reporter.lognum = log;
  156. // We intentionally make log::Reader do checksumming so that
  157. // corruptions cause entire commits to be skipped instead of
  158. // propagating bad information (like overly large sequence
  159. // numbers).
  160. log::Reader reader(lfile, &reporter, false /*do not checksum*/,
  161. 0 /*initial_offset*/);
  162. // Read all the records and add to a memtable
  163. std::string scratch;
  164. Slice record;
  165. WriteBatch batch;
  166. MemTable* mem = new MemTable(icmp_);
  167. mem->Ref();
  168. int counter = 0;
  169. while (reader.ReadRecord(&record, &scratch)) {
  170. if (record.size() < 12) {
  171. reporter.Corruption(record.size(),
  172. Status::Corruption("log record too small"));
  173. continue;
  174. }
  175. WriteBatchInternal::SetContents(&batch, record);
  176. status = WriteBatchInternal::InsertInto(&batch, mem);
  177. if (status.ok()) {
  178. counter += WriteBatchInternal::Count(&batch);
  179. } else {
  180. Log(options_.info_log, "Log #%llu: ignoring %s",
  181. (unsigned long long)log, status.ToString().c_str());
  182. status = Status::OK(); // Keep going with rest of file
  183. }
  184. }
  185. delete lfile;
  186. // Do not record a version edit for this conversion to a Table
  187. // since ExtractMetaData() will also generate edits.
  188. FileMetaData meta;
  189. meta.number = next_file_number_++;
  190. Iterator* iter = mem->NewIterator();
  191. status = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta);
  192. delete iter;
  193. mem->Unref();
  194. mem = nullptr;
  195. if (status.ok()) {
  196. if (meta.file_size > 0) {
  197. table_numbers_.push_back(meta.number);
  198. }
  199. }
  200. Log(options_.info_log, "Log #%llu: %d ops saved to Table #%llu %s",
  201. (unsigned long long)log, counter, (unsigned long long)meta.number,
  202. status.ToString().c_str());
  203. return status;
  204. }
  205. void ExtractMetaData() {
  206. for (size_t i = 0; i < table_numbers_.size(); i++) {
  207. ScanTable(table_numbers_[i]);
  208. }
  209. }
  210. Iterator* NewTableIterator(const FileMetaData& meta) {
  211. // Same as compaction iterators: if paranoid_checks are on, turn
  212. // on checksum verification.
  213. ReadOptions r;
  214. r.verify_checksums = options_.paranoid_checks;
  215. return table_cache_->NewIterator(r, meta.number, meta.file_size);
  216. }
  217. void ScanTable(uint64_t number) {
  218. TableInfo t;
  219. t.meta.number = number;
  220. std::string fname = TableFileName(dbname_, number);
  221. Status status = env_->GetFileSize(fname, &t.meta.file_size);
  222. if (!status.ok()) {
  223. // Try alternate file name.
  224. fname = SSTTableFileName(dbname_, number);
  225. Status s2 = env_->GetFileSize(fname, &t.meta.file_size);
  226. if (s2.ok()) {
  227. status = Status::OK();
  228. }
  229. }
  230. if (!status.ok()) {
  231. ArchiveFile(TableFileName(dbname_, number));
  232. ArchiveFile(SSTTableFileName(dbname_, number));
  233. Log(options_.info_log, "Table #%llu: dropped: %s",
  234. (unsigned long long)t.meta.number, status.ToString().c_str());
  235. return;
  236. }
  237. // Extract metadata by scanning through table.
  238. int counter = 0;
  239. Iterator* iter = NewTableIterator(t.meta);
  240. bool empty = true;
  241. ParsedInternalKey parsed;
  242. t.max_sequence = 0;
  243. for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
  244. Slice key = iter->key();
  245. if (!ParseInternalKey(key, &parsed)) {
  246. Log(options_.info_log, "Table #%llu: unparsable key %s",
  247. (unsigned long long)t.meta.number, EscapeString(key).c_str());
  248. continue;
  249. }
  250. counter++;
  251. if (empty) {
  252. empty = false;
  253. t.meta.smallest.DecodeFrom(key);
  254. }
  255. t.meta.largest.DecodeFrom(key);
  256. if (parsed.sequence > t.max_sequence) {
  257. t.max_sequence = parsed.sequence;
  258. }
  259. }
  260. if (!iter->status().ok()) {
  261. status = iter->status();
  262. }
  263. delete iter;
  264. Log(options_.info_log, "Table #%llu: %d entries %s",
  265. (unsigned long long)t.meta.number, counter, status.ToString().c_str());
  266. if (status.ok()) {
  267. tables_.push_back(t);
  268. } else {
  269. RepairTable(fname, t); // RepairTable archives input file.
  270. }
  271. }
  272. void RepairTable(const std::string& src, TableInfo t) {
  273. // We will copy src contents to a new table and then rename the
  274. // new table over the source.
  275. // Create builder.
  276. std::string copy = TableFileName(dbname_, next_file_number_++);
  277. WritableFile* file;
  278. Status s = env_->NewWritableFile(copy, &file);
  279. if (!s.ok()) {
  280. return;
  281. }
  282. TableBuilder* builder = new TableBuilder(options_, file);
  283. // Copy data.
  284. Iterator* iter = NewTableIterator(t.meta);
  285. int counter = 0;
  286. for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
  287. builder->Add(iter->key(), iter->value());
  288. counter++;
  289. }
  290. delete iter;
  291. ArchiveFile(src);
  292. if (counter == 0) {
  293. builder->Abandon(); // Nothing to save
  294. } else {
  295. s = builder->Finish();
  296. if (s.ok()) {
  297. t.meta.file_size = builder->FileSize();
  298. }
  299. }
  300. delete builder;
  301. builder = nullptr;
  302. if (s.ok()) {
  303. s = file->Close();
  304. }
  305. delete file;
  306. file = nullptr;
  307. if (counter > 0 && s.ok()) {
  308. std::string orig = TableFileName(dbname_, t.meta.number);
  309. s = env_->RenameFile(copy, orig);
  310. if (s.ok()) {
  311. Log(options_.info_log, "Table #%llu: %d entries repaired",
  312. (unsigned long long)t.meta.number, counter);
  313. tables_.push_back(t);
  314. }
  315. }
  316. if (!s.ok()) {
  317. env_->RemoveFile(copy);
  318. }
  319. }
  320. Status WriteDescriptor() {
  321. std::string tmp = TempFileName(dbname_, 1);
  322. WritableFile* file;
  323. Status status = env_->NewWritableFile(tmp, &file);
  324. if (!status.ok()) {
  325. return status;
  326. }
  327. SequenceNumber max_sequence = 0;
  328. for (size_t i = 0; i < tables_.size(); i++) {
  329. if (max_sequence < tables_[i].max_sequence) {
  330. max_sequence = tables_[i].max_sequence;
  331. }
  332. }
  333. edit_.SetComparatorName(icmp_.user_comparator()->Name());
  334. edit_.SetLogNumber(0);
  335. edit_.SetNextFile(next_file_number_);
  336. edit_.SetLastSequence(max_sequence);
  337. for (size_t i = 0; i < tables_.size(); i++) {
  338. // TODO(opt): separate out into multiple levels
  339. const TableInfo& t = tables_[i];
  340. edit_.AddFile(0, t.meta.number, t.meta.file_size, t.meta.smallest,
  341. t.meta.largest);
  342. }
  343. // std::fprintf(stderr,
  344. // "NewDescriptor:\n%s\n", edit_.DebugString().c_str());
  345. {
  346. log::Writer log(file);
  347. std::string record;
  348. edit_.EncodeTo(&record);
  349. status = log.AddRecord(record);
  350. }
  351. if (status.ok()) {
  352. status = file->Close();
  353. }
  354. delete file;
  355. file = nullptr;
  356. if (!status.ok()) {
  357. env_->RemoveFile(tmp);
  358. } else {
  359. // Discard older manifests
  360. for (size_t i = 0; i < manifests_.size(); i++) {
  361. ArchiveFile(dbname_ + "/" + manifests_[i]);
  362. }
  363. // Install new manifest
  364. status = env_->RenameFile(tmp, DescriptorFileName(dbname_, 1));
  365. if (status.ok()) {
  366. status = SetCurrentFile(env_, dbname_, 1);
  367. } else {
  368. env_->RemoveFile(tmp);
  369. }
  370. }
  371. return status;
  372. }
  373. void ArchiveFile(const std::string& fname) {
  374. // Move into another directory. E.g., for
  375. // dir/foo
  376. // rename to
  377. // dir/lost/foo
  378. const char* slash = strrchr(fname.c_str(), '/');
  379. std::string new_dir;
  380. if (slash != nullptr) {
  381. new_dir.assign(fname.data(), slash - fname.data());
  382. }
  383. new_dir.append("/lost");
  384. env_->CreateDir(new_dir); // Ignore error
  385. std::string new_file = new_dir;
  386. new_file.append("/");
  387. new_file.append((slash == nullptr) ? fname.c_str() : slash + 1);
  388. Status s = env_->RenameFile(fname, new_file);
  389. Log(options_.info_log, "Archiving %s: %s\n", fname.c_str(),
  390. s.ToString().c_str());
  391. }
  392. const std::string dbname_;
  393. Env* const env_;
  394. InternalKeyComparator const icmp_;
  395. InternalFilterPolicy const ipolicy_;
  396. const Options options_;
  397. bool owns_info_log_;
  398. bool owns_cache_;
  399. TableCache* table_cache_;
  400. VersionEdit edit_;
  401. std::vector<std::string> manifests_;
  402. std::vector<uint64_t> table_numbers_;
  403. std::vector<uint64_t> logs_;
  404. std::vector<TableInfo> tables_;
  405. uint64_t next_file_number_;
  406. };
  407. } // namespace
  408. Status RepairDB(const std::string& dbname, const Options& options) {
  409. Repairer repairer(dbname, options);
  410. return repairer.Run();
  411. }
  412. } // namespace leveldb