作者: 谢瑞阳 10225101483 徐翔宇 10225101535
Non puoi selezionare più di 25 argomenti Gli argomenti devono iniziare con una lettera o un numero, possono includere trattini ('-') e possono essere lunghi fino a 35 caratteri.

389 righe
12 KiB

  1. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style license that can be
  3. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  4. //
  5. // We recover the contents of the descriptor from the other files we find.
  6. // (1) Any log files are first converted to tables
  7. // (2) We scan every table to compute
  8. // (a) smallest/largest for the table
  9. // (b) largest sequence number in the table
  10. // (3) We generate descriptor contents:
  11. // - log number is set to zero
  12. // - next-file-number is set to 1 + largest file number we found
  13. // - last-sequence-number is set to largest sequence# found across
  14. // all tables (see 2b)
  15. // - compaction pointers are cleared
  16. // - every table file is added at level 0
  17. //
  18. // Possible optimization 1:
  19. // (a) Compute total size and use to pick appropriate max-level M
  20. // (b) Sort tables by largest sequence# in the table
  21. // (c) For each table: if it overlaps earlier table, place in level-0,
  22. // else place in level-M.
  23. // Possible optimization 2:
  24. // Store per-table metadata (smallest, largest, largest-seq#, ...)
  25. // in the table's meta section to speed up ScanTable.
  26. #include "db/builder.h"
  27. #include "db/db_impl.h"
  28. #include "db/dbformat.h"
  29. #include "db/filename.h"
  30. #include "db/log_reader.h"
  31. #include "db/log_writer.h"
  32. #include "db/memtable.h"
  33. #include "db/table_cache.h"
  34. #include "db/version_edit.h"
  35. #include "db/write_batch_internal.h"
  36. #include "leveldb/comparator.h"
  37. #include "leveldb/db.h"
  38. #include "leveldb/env.h"
  39. namespace leveldb {
  40. namespace {
  41. class Repairer {
  42. public:
  43. Repairer(const std::string& dbname, const Options& options)
  44. : dbname_(dbname),
  45. env_(options.env),
  46. icmp_(options.comparator),
  47. ipolicy_(options.filter_policy),
  48. options_(SanitizeOptions(dbname, &icmp_, &ipolicy_, options)),
  49. owns_info_log_(options_.info_log != options.info_log),
  50. owns_cache_(options_.block_cache != options.block_cache),
  51. next_file_number_(1) {
  52. // TableCache can be small since we expect each table to be opened once.
  53. table_cache_ = new TableCache(dbname_, &options_, 10);
  54. }
  55. ~Repairer() {
  56. delete table_cache_;
  57. if (owns_info_log_) {
  58. delete options_.info_log;
  59. }
  60. if (owns_cache_) {
  61. delete options_.block_cache;
  62. }
  63. }
  64. Status Run() {
  65. Status status = FindFiles();
  66. if (status.ok()) {
  67. ConvertLogFilesToTables();
  68. ExtractMetaData();
  69. status = WriteDescriptor();
  70. }
  71. if (status.ok()) {
  72. unsigned long long bytes = 0;
  73. for (size_t i = 0; i < tables_.size(); i++) {
  74. bytes += tables_[i].meta.file_size;
  75. }
  76. Log(options_.info_log,
  77. "**** Repaired leveldb %s; "
  78. "recovered %d files; %llu bytes. "
  79. "Some data may have been lost. "
  80. "****",
  81. dbname_.c_str(),
  82. static_cast<int>(tables_.size()),
  83. bytes);
  84. }
  85. return status;
  86. }
  87. private:
  88. struct TableInfo {
  89. FileMetaData meta;
  90. SequenceNumber max_sequence;
  91. };
  92. std::string const dbname_;
  93. Env* const env_;
  94. InternalKeyComparator const icmp_;
  95. InternalFilterPolicy const ipolicy_;
  96. Options const options_;
  97. bool owns_info_log_;
  98. bool owns_cache_;
  99. TableCache* table_cache_;
  100. VersionEdit edit_;
  101. std::vector<std::string> manifests_;
  102. std::vector<uint64_t> table_numbers_;
  103. std::vector<uint64_t> logs_;
  104. std::vector<TableInfo> tables_;
  105. uint64_t next_file_number_;
  106. Status FindFiles() {
  107. std::vector<std::string> filenames;
  108. Status status = env_->GetChildren(dbname_, &filenames);
  109. if (!status.ok()) {
  110. return status;
  111. }
  112. if (filenames.empty()) {
  113. return Status::IOError(dbname_, "repair found no files");
  114. }
  115. uint64_t number;
  116. FileType type;
  117. for (size_t i = 0; i < filenames.size(); i++) {
  118. if (ParseFileName(filenames[i], &number, &type)) {
  119. if (type == kDescriptorFile) {
  120. manifests_.push_back(filenames[i]);
  121. } else {
  122. if (number + 1 > next_file_number_) {
  123. next_file_number_ = number + 1;
  124. }
  125. if (type == kLogFile) {
  126. logs_.push_back(number);
  127. } else if (type == kTableFile) {
  128. table_numbers_.push_back(number);
  129. } else {
  130. // Ignore other files
  131. }
  132. }
  133. }
  134. }
  135. return status;
  136. }
  137. void ConvertLogFilesToTables() {
  138. for (size_t i = 0; i < logs_.size(); i++) {
  139. std::string logname = LogFileName(dbname_, logs_[i]);
  140. Status status = ConvertLogToTable(logs_[i]);
  141. if (!status.ok()) {
  142. Log(options_.info_log, "Log #%llu: ignoring conversion error: %s",
  143. (unsigned long long) logs_[i],
  144. status.ToString().c_str());
  145. }
  146. ArchiveFile(logname);
  147. }
  148. }
  149. Status ConvertLogToTable(uint64_t log) {
  150. struct LogReporter : public log::Reader::Reporter {
  151. Env* env;
  152. Logger* info_log;
  153. uint64_t lognum;
  154. virtual void Corruption(size_t bytes, const Status& s) {
  155. // We print error messages for corruption, but continue repairing.
  156. Log(info_log, "Log #%llu: dropping %d bytes; %s",
  157. (unsigned long long) lognum,
  158. static_cast<int>(bytes),
  159. s.ToString().c_str());
  160. }
  161. };
  162. // Open the log file
  163. std::string logname = LogFileName(dbname_, log);
  164. SequentialFile* lfile;
  165. Status status = env_->NewSequentialFile(logname, &lfile);
  166. if (!status.ok()) {
  167. return status;
  168. }
  169. // Create the log reader.
  170. LogReporter reporter;
  171. reporter.env = env_;
  172. reporter.info_log = options_.info_log;
  173. reporter.lognum = log;
  174. // We intentially make log::Reader do checksumming so that
  175. // corruptions cause entire commits to be skipped instead of
  176. // propagating bad information (like overly large sequence
  177. // numbers).
  178. log::Reader reader(lfile, &reporter, false/*do not checksum*/,
  179. 0/*initial_offset*/);
  180. // Read all the records and add to a memtable
  181. std::string scratch;
  182. Slice record;
  183. WriteBatch batch;
  184. MemTable* mem = new MemTable(icmp_);
  185. mem->Ref();
  186. int counter = 0;
  187. while (reader.ReadRecord(&record, &scratch)) {
  188. if (record.size() < 12) {
  189. reporter.Corruption(
  190. record.size(), Status::Corruption("log record too small"));
  191. continue;
  192. }
  193. WriteBatchInternal::SetContents(&batch, record);
  194. status = WriteBatchInternal::InsertInto(&batch, mem);
  195. if (status.ok()) {
  196. counter += WriteBatchInternal::Count(&batch);
  197. } else {
  198. Log(options_.info_log, "Log #%llu: ignoring %s",
  199. (unsigned long long) log,
  200. status.ToString().c_str());
  201. status = Status::OK(); // Keep going with rest of file
  202. }
  203. }
  204. delete lfile;
  205. // Do not record a version edit for this conversion to a Table
  206. // since ExtractMetaData() will also generate edits.
  207. FileMetaData meta;
  208. meta.number = next_file_number_++;
  209. Iterator* iter = mem->NewIterator();
  210. status = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta);
  211. delete iter;
  212. mem->Unref();
  213. mem = NULL;
  214. if (status.ok()) {
  215. if (meta.file_size > 0) {
  216. table_numbers_.push_back(meta.number);
  217. }
  218. }
  219. Log(options_.info_log, "Log #%llu: %d ops saved to Table #%llu %s",
  220. (unsigned long long) log,
  221. counter,
  222. (unsigned long long) meta.number,
  223. status.ToString().c_str());
  224. return status;
  225. }
  226. void ExtractMetaData() {
  227. std::vector<TableInfo> kept;
  228. for (size_t i = 0; i < table_numbers_.size(); i++) {
  229. TableInfo t;
  230. t.meta.number = table_numbers_[i];
  231. Status status = ScanTable(&t);
  232. if (!status.ok()) {
  233. std::string fname = TableFileName(dbname_, table_numbers_[i]);
  234. Log(options_.info_log, "Table #%llu: ignoring %s",
  235. (unsigned long long) table_numbers_[i],
  236. status.ToString().c_str());
  237. ArchiveFile(fname);
  238. } else {
  239. tables_.push_back(t);
  240. }
  241. }
  242. }
  243. Status ScanTable(TableInfo* t) {
  244. std::string fname = TableFileName(dbname_, t->meta.number);
  245. int counter = 0;
  246. Status status = env_->GetFileSize(fname, &t->meta.file_size);
  247. if (status.ok()) {
  248. Iterator* iter = table_cache_->NewIterator(
  249. ReadOptions(), t->meta.number, t->meta.file_size);
  250. bool empty = true;
  251. ParsedInternalKey parsed;
  252. t->max_sequence = 0;
  253. for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
  254. Slice key = iter->key();
  255. if (!ParseInternalKey(key, &parsed)) {
  256. Log(options_.info_log, "Table #%llu: unparsable key %s",
  257. (unsigned long long) t->meta.number,
  258. EscapeString(key).c_str());
  259. continue;
  260. }
  261. counter++;
  262. if (empty) {
  263. empty = false;
  264. t->meta.smallest.DecodeFrom(key);
  265. }
  266. t->meta.largest.DecodeFrom(key);
  267. if (parsed.sequence > t->max_sequence) {
  268. t->max_sequence = parsed.sequence;
  269. }
  270. }
  271. if (!iter->status().ok()) {
  272. status = iter->status();
  273. }
  274. delete iter;
  275. }
  276. Log(options_.info_log, "Table #%llu: %d entries %s",
  277. (unsigned long long) t->meta.number,
  278. counter,
  279. status.ToString().c_str());
  280. return status;
  281. }
  282. Status WriteDescriptor() {
  283. std::string tmp = TempFileName(dbname_, 1);
  284. WritableFile* file;
  285. Status status = env_->NewWritableFile(tmp, &file);
  286. if (!status.ok()) {
  287. return status;
  288. }
  289. SequenceNumber max_sequence = 0;
  290. for (size_t i = 0; i < tables_.size(); i++) {
  291. if (max_sequence < tables_[i].max_sequence) {
  292. max_sequence = tables_[i].max_sequence;
  293. }
  294. }
  295. edit_.SetComparatorName(icmp_.user_comparator()->Name());
  296. edit_.SetLogNumber(0);
  297. edit_.SetNextFile(next_file_number_);
  298. edit_.SetLastSequence(max_sequence);
  299. for (size_t i = 0; i < tables_.size(); i++) {
  300. // TODO(opt): separate out into multiple levels
  301. const TableInfo& t = tables_[i];
  302. edit_.AddFile(0, t.meta.number, t.meta.file_size,
  303. t.meta.smallest, t.meta.largest);
  304. }
  305. //fprintf(stderr, "NewDescriptor:\n%s\n", edit_.DebugString().c_str());
  306. {
  307. log::Writer log(file);
  308. std::string record;
  309. edit_.EncodeTo(&record);
  310. status = log.AddRecord(record);
  311. }
  312. if (status.ok()) {
  313. status = file->Close();
  314. }
  315. delete file;
  316. file = NULL;
  317. if (!status.ok()) {
  318. env_->DeleteFile(tmp);
  319. } else {
  320. // Discard older manifests
  321. for (size_t i = 0; i < manifests_.size(); i++) {
  322. ArchiveFile(dbname_ + "/" + manifests_[i]);
  323. }
  324. // Install new manifest
  325. status = env_->RenameFile(tmp, DescriptorFileName(dbname_, 1));
  326. if (status.ok()) {
  327. status = SetCurrentFile(env_, dbname_, 1);
  328. } else {
  329. env_->DeleteFile(tmp);
  330. }
  331. }
  332. return status;
  333. }
  334. void ArchiveFile(const std::string& fname) {
  335. // Move into another directory. E.g., for
  336. // dir/foo
  337. // rename to
  338. // dir/lost/foo
  339. const char* slash = strrchr(fname.c_str(), '/');
  340. std::string new_dir;
  341. if (slash != NULL) {
  342. new_dir.assign(fname.data(), slash - fname.data());
  343. }
  344. new_dir.append("/lost");
  345. env_->CreateDir(new_dir); // Ignore error
  346. std::string new_file = new_dir;
  347. new_file.append("/");
  348. new_file.append((slash == NULL) ? fname.c_str() : slash + 1);
  349. Status s = env_->RenameFile(fname, new_file);
  350. Log(options_.info_log, "Archiving %s: %s\n",
  351. fname.c_str(), s.ToString().c_str());
  352. }
  353. };
  354. } // namespace
  355. Status RepairDB(const std::string& dbname, const Options& options) {
  356. Repairer repairer(dbname, options);
  357. return repairer.Run();
  358. }
  359. } // namespace leveldb