10225501448 李度 10225101546 陈胤遒 10215501422 高宇菲
1195 lines
36 KiB

// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/db_impl.h"

#include <algorithm>
#include <set>
#include <string>
#include <stdint.h>
#include <stdio.h>
#include <vector>
#include "db/builder.h"
#include "db/db_iter.h"
#include "db/dbformat.h"
#include "db/filename.h"
#include "db/log_reader.h"
#include "db/log_writer.h"
#include "db/memtable.h"
#include "db/table_cache.h"
#include "db/version_set.h"
#include "db/write_batch_internal.h"
#include "include/db.h"
#include "include/env.h"
#include "include/status.h"
#include "include/table.h"
#include "include/table_builder.h"
#include "port/port.h"
#include "table/block.h"
#include "table/merger.h"
#include "table/two_level_iterator.h"
#include "util/coding.h"
#include "util/logging.h"
#include "util/mutexlock.h"

namespace leveldb {

struct DBImpl::CompactionState {
  Compaction* const compaction;

  // Sequence numbers < smallest_snapshot are not significant since we
  // will never have to service a snapshot below smallest_snapshot.
  // Therefore if we have seen a sequence number S <= smallest_snapshot,
  // we can drop all entries for the same key with sequence numbers < S.
  SequenceNumber smallest_snapshot;

  // Files produced by compaction
  struct Output {
    uint64_t number;
    uint64_t file_size;
    InternalKey smallest, largest;
  };
  std::vector<Output> outputs;

  // State kept for output being generated
  WritableFile* outfile;
  TableBuilder* builder;

  uint64_t total_bytes;

  Output* current_output() { return &outputs[outputs.size()-1]; }

  explicit CompactionState(Compaction* c)
      : compaction(c),
        outfile(NULL),
        builder(NULL),
        total_bytes(0) {
  }
};

namespace {
class NullWritableFile : public WritableFile {
 public:
  virtual Status Append(const Slice& data) { return Status::OK(); }
  virtual Status Close() { return Status::OK(); }
  virtual Status Flush() { return Status::OK(); }
  virtual Status Sync() { return Status::OK(); }
};
}

// Fix user-supplied options to be reasonable
template <class T,class V>
static void ClipToRange(T* ptr, V minvalue, V maxvalue) {
  if (*ptr > maxvalue) *ptr = maxvalue;
  if (*ptr < minvalue) *ptr = minvalue;
}
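
// Illustration only (not part of db_impl.cc): a minimal sketch of how the
// clamping helper above behaves, assuming the limits that SanitizeOptions()
// passes for max_open_files (20 and 50000). ClipToRangeSketch and
// SanitizedMaxOpenFiles are hypothetical names introduced for this example.

```cpp
#include <cassert>

// Hypothetical stand-alone copy of the clamping helper, for illustration.
template <class T, class V>
static void ClipToRangeSketch(T* ptr, V minvalue, V maxvalue) {
  assert(minvalue <= maxvalue);  // the sketch assumes a well-formed range
  if (*ptr > maxvalue) *ptr = maxvalue;
  if (*ptr < minvalue) *ptr = minvalue;
}

// Returns the value a requested max_open_files would be clamped to,
// assuming the [20, 50000] range used by SanitizeOptions().
static int SanitizedMaxOpenFiles(int requested) {
  ClipToRangeSketch(&requested, 20, 50000);
  return requested;
}
```

// Under these assumptions, SanitizedMaxOpenFiles(5) yields 20,
// SanitizedMaxOpenFiles(100) stays 100, and SanitizedMaxOpenFiles(1000000)
// yields 50000.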

Options SanitizeOptions(const std::string& dbname,
                        const InternalKeyComparator* icmp,
                        const Options& src) {
  Options result = src;
  result.comparator = icmp;
  ClipToRange(&result.max_open_files,           20,     50000);
  ClipToRange(&result.write_buffer_size,        64<<10, 1<<30);
  ClipToRange(&result.large_value_threshold,    16<<10, 1<<30);
  ClipToRange(&result.block_size,               1<<10,  4<<20);
  if (result.info_log == NULL) {
    // Open a log file in the same directory as the db
    src.env->CreateDir(dbname);  // In case it does not exist
    src.env->RenameFile(InfoLogFileName(dbname), OldInfoLogFileName(dbname));
    Status s = src.env->NewWritableFile(InfoLogFileName(dbname),
                                        &result.info_log);
    if (!s.ok()) {
      // No place suitable for logging
      result.info_log = new NullWritableFile;
    }
  }
  return result;
}

DBImpl::DBImpl(const Options& options, const std::string& dbname)
    : env_(options.env),
      internal_comparator_(options.comparator),
      options_(SanitizeOptions(dbname, &internal_comparator_, options)),
      owns_info_log_(options_.info_log != options.info_log),
      dbname_(dbname),
      db_lock_(NULL),
      shutting_down_(NULL),
      bg_cv_(&mutex_),
      compacting_cv_(&mutex_),
      last_sequence_(0),
      mem_(new MemTable(internal_comparator_)),
      logfile_(NULL),
      log_(NULL),
      log_number_(0),
      bg_compaction_scheduled_(false),
      compacting_(false) {
  // Reserve ten files or so for other uses and give the rest to TableCache.
  // Use the sanitized options_ (not the raw "options" argument) so that the
  // clamped max_open_files value is the one that sizes the cache.
  const int table_cache_size = options_.max_open_files - 10;
  table_cache_ = new TableCache(dbname_, &options_, table_cache_size);

  versions_ = new VersionSet(dbname_, &options_, table_cache_,
                             &internal_comparator_);
}

DBImpl::~DBImpl() {
  // Wait for background work to finish
  mutex_.Lock();
  shutting_down_.Release_Store(this);  // Any non-NULL value is ok
  if (bg_compaction_scheduled_) {
    while (bg_compaction_scheduled_) {
      bg_cv_.Wait();
    }
  }
  mutex_.Unlock();

  if (db_lock_ != NULL) {
    env_->UnlockFile(db_lock_);
  }

  delete versions_;
  delete mem_;
  delete log_;
  delete logfile_;
  delete table_cache_;

  if (owns_info_log_) {
    delete options_.info_log;
  }
}

Status DBImpl::NewDB() {
  assert(log_number_ == 0);
  assert(last_sequence_ == 0);

  VersionEdit new_db;
  new_db.SetComparatorName(user_comparator()->Name());
  new_db.SetLogNumber(log_number_);
  new_db.SetNextFile(2);
  new_db.SetLastSequence(0);

  const std::string manifest = DescriptorFileName(dbname_, 1);
  WritableFile* file;
  Status s = env_->NewWritableFile(manifest, &file);
  if (!s.ok()) {
    return s;
  }
  {
    log::Writer log(file);
    std::string record;
    new_db.EncodeTo(&record);
    s = log.AddRecord(record);
    if (s.ok()) {
      s = file->Close();
    }
  }
  delete file;
  if (s.ok()) {
    // Make "CURRENT" file that points to the new manifest file.
    s = SetCurrentFile(env_, dbname_, 1);
  } else {
    env_->DeleteFile(manifest);
  }
  return s;
}

Status DBImpl::Install(VersionEdit* edit,
                       uint64_t new_log_number,
                       MemTable* cleanup_mem) {
  mutex_.AssertHeld();
  edit->SetLogNumber(new_log_number);
  edit->SetLastSequence(last_sequence_);
  return versions_->LogAndApply(edit, cleanup_mem);
}

void DBImpl::MaybeIgnoreError(Status* s) const {
  if (s->ok() || options_.paranoid_checks) {
    // No change needed
  } else {
    Log(env_, options_.info_log, "Ignoring error %s", s->ToString().c_str());
    *s = Status::OK();
  }
}

void DBImpl::DeleteObsoleteFiles() {
  // Make a set of all of the live files
  std::set<uint64_t> live = pending_outputs_;
  versions_->AddLiveFiles(&live);

  versions_->CleanupLargeValueRefs(live, log_number_);

  std::vector<std::string> filenames;
  env_->GetChildren(dbname_, &filenames);  // Ignoring errors on purpose
  uint64_t number;
  LargeValueRef large_ref;
  FileType type;
  for (size_t i = 0; i < filenames.size(); i++) {
    if (ParseFileName(filenames[i], &number, &large_ref, &type)) {
      bool keep = true;
      switch (type) {
        case kLogFile:
          keep = (number == log_number_);
          break;
        case kDescriptorFile:
          // Keep my manifest file, and any newer incarnations'
          // (in case there is a race that allows other incarnations)
          keep = (number >= versions_->ManifestFileNumber());
          break;
        case kTableFile:
          keep = (live.find(number) != live.end());
          break;
        case kTempFile:
          // Any temp files that are currently being written to must
          // be recorded in pending_outputs_, which is inserted into "live"
          keep = (live.find(number) != live.end());
          break;
        case kLargeValueFile:
          keep = versions_->LargeValueIsLive(large_ref);
          break;
        case kCurrentFile:
        case kDBLockFile:
        case kInfoLogFile:
          keep = true;
          break;
      }

      if (!keep) {
        if (type == kTableFile) {
          table_cache_->Evict(number);
        }
        Log(env_, options_.info_log, "Delete type=%d #%llu\n",
            int(type),
            static_cast<unsigned long long>(number));
        env_->DeleteFile(dbname_ + "/" + filenames[i]);
      }
    }
  }
}
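
// Illustration only (not part of db_impl.cc): a simplified sketch of the
// keep/delete decision made in DeleteObsoleteFiles() above. SketchFileType
// and ShouldKeep are hypothetical names; the real FileType enum has more
// cases (temp files, large value files), which this sketch folds into
// kSketchOther and always keeps.

```cpp
#include <cassert>
#include <cstdint>
#include <set>

// Hypothetical, reduced set of file categories for the sketch.
enum SketchFileType { kSketchLog, kSketchManifest, kSketchTable, kSketchOther };

// Returns true if a file of the given type and number should be kept.
// "live" plays the role of pending outputs plus files referenced by versions.
static bool ShouldKeep(SketchFileType type, uint64_t number,
                       uint64_t current_log, uint64_t current_manifest,
                       const std::set<uint64_t>& live) {
  switch (type) {
    case kSketchLog:
      return number == current_log;        // only the active log survives
    case kSketchManifest:
      return number >= current_manifest;   // current or newer manifests
    case kSketchTable:
      return live.count(number) > 0;       // only tables still referenced
    case kSketchOther:
      return true;                         // CURRENT, LOCK, info log, ...
  }
  return true;
}
```

// For example, with current_log = 3, current_manifest = 2 and live = {7},
// table 7 is kept, table 8 is deleted, and log 2 is deleted.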

Status DBImpl::Recover(VersionEdit* edit) {
  mutex_.AssertHeld();

  // Ignore error from CreateDir since the creation of the DB is
  // committed only when the descriptor is created, and this directory
  // may already exist from a previous failed creation attempt.
  env_->CreateDir(dbname_);
  assert(db_lock_ == NULL);
  Status s = env_->LockFile(LockFileName(dbname_), &db_lock_);
  if (!s.ok()) {
    return s;
  }

  if (!env_->FileExists(CurrentFileName(dbname_))) {
    if (options_.create_if_missing) {
      s = NewDB();
      if (!s.ok()) {
        return s;
      }
    } else {
      return Status::InvalidArgument(
          dbname_, "does not exist (create_if_missing is false)");
    }
  } else {
    if (options_.error_if_exists) {
      return Status::InvalidArgument(
          dbname_, "exists (error_if_exists is true)");
    }
  }

  s = versions_->Recover(&log_number_, &last_sequence_);
  if (s.ok()) {
    // Recover from the log file named in the descriptor
    SequenceNumber max_sequence(0);
    if (log_number_ != 0) {  // log_number_ == 0 indicates initial empty state
      s = RecoverLogFile(log_number_, edit, &max_sequence);
    }
    if (s.ok()) {
      last_sequence_ =
          last_sequence_ > max_sequence ? last_sequence_ : max_sequence;
    }
  }
  return s;
}

Status DBImpl::RecoverLogFile(uint64_t log_number,
                              VersionEdit* edit,
                              SequenceNumber* max_sequence) {
  struct LogReporter : public log::Reader::Reporter {
    Env* env;
    WritableFile* info_log;
    const char* fname;
    Status* status;  // NULL if options_.paranoid_checks==false
    virtual void Corruption(size_t bytes, const Status& s) {
      Log(env, info_log, "%s%s: dropping %d bytes; %s",
          (this->status == NULL ? "(ignoring error) " : ""),
          fname, static_cast<int>(bytes), s.ToString().c_str());
      if (this->status != NULL && this->status->ok()) *this->status = s;
    }
  };

  mutex_.AssertHeld();

  // Open the log file
  std::string fname = LogFileName(dbname_, log_number);
  SequentialFile* file;
  Status status = env_->NewSequentialFile(fname, &file);
  if (!status.ok()) {
    MaybeIgnoreError(&status);
    return status;
  }

  // Create the log reader.
  LogReporter reporter;
  reporter.env = env_;
  reporter.info_log = options_.info_log;
  reporter.fname = fname.c_str();
  reporter.status = (options_.paranoid_checks ? &status : NULL);
  // We intentionally make log::Reader do checksumming even if
  // paranoid_checks==false so that corruptions cause entire commits
  // to be skipped instead of propagating bad information (like overly
  // large sequence numbers).
  log::Reader reader(file, &reporter, true/*checksum*/);
  Log(env_, options_.info_log, "Recovering log #%llu",
      (unsigned long long) log_number);

  // Read all the records and add to a memtable
  std::string scratch;
  Slice record;
  WriteBatch batch;
  MemTable* mem = NULL;
  while (reader.ReadRecord(&record, &scratch) &&
         status.ok()) {
    if (record.size() < 12) {
      reporter.Corruption(
          record.size(), Status::Corruption("log record too small"));
      continue;
    }
    WriteBatchInternal::SetContents(&batch, record);

    if (mem == NULL) {
      mem = new MemTable(internal_comparator_);
    }
    status = WriteBatchInternal::InsertInto(&batch, mem);
    MaybeIgnoreError(&status);
    if (!status.ok()) {
      break;
    }
    const SequenceNumber last_seq =
        WriteBatchInternal::Sequence(&batch) +
        WriteBatchInternal::Count(&batch) - 1;
    if (last_seq > *max_sequence) {
      *max_sequence = last_seq;
    }

    if (mem->ApproximateMemoryUsage() > options_.write_buffer_size) {
      status = WriteLevel0Table(mem, edit);
      if (!status.ok()) {
        // Reflect errors immediately so that conditions like full
        // file-systems cause the DB::Open() to fail.
        break;
      }
      delete mem;
      mem = NULL;
    }
  }

  if (status.ok() && mem != NULL) {
    status = WriteLevel0Table(mem, edit);
    // Reflect errors immediately so that conditions like full
    // file-systems cause the DB::Open() to fail.
  }

  delete mem;
  delete file;
  return status;
}
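
// Illustration only (not part of db_impl.cc): a minimal sketch of how the
// recovery loop above derives the highest sequence number from replayed
// batches. Each batch occupies the sequence numbers
// [first_sequence, first_sequence + count - 1]. SketchBatch and
// MaxRecoveredSequence are hypothetical names, and the sketch assumes every
// batch holds at least one entry.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for the header fields of a WriteBatch.
struct SketchBatch {
  uint64_t first_sequence;  // sequence number of the batch's first entry
  uint32_t count;           // number of entries in the batch
};

// Returns the largest sequence number used by any batch, or 0 if none.
static uint64_t MaxRecoveredSequence(const std::vector<SketchBatch>& batches) {
  uint64_t max_sequence = 0;
  for (const SketchBatch& b : batches) {
    assert(b.count > 0);  // assumption of this sketch: no empty batches
    const uint64_t last_seq = b.first_sequence + b.count - 1;
    if (last_seq > max_sequence) {
      max_sequence = last_seq;
    }
  }
  return max_sequence;
}
```

// For example, a batch of 3 entries starting at sequence 1 followed by a
// batch of 2 entries starting at sequence 4 gives a maximum of 5.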

Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit) {
  mutex_.AssertHeld();
  FileMetaData meta;
  meta.number = versions_->NewFileNumber();
  pending_outputs_.insert(meta.number);
  Iterator* iter = mem->NewIterator();
  Log(env_, options_.info_log, "Level-0 table #%llu: started",
      (unsigned long long) meta.number);
  Status s = BuildTable(dbname_, env_, options_, table_cache_,
                        iter, &meta, edit);
  Log(env_, options_.info_log, "Level-0 table #%llu: %llu bytes %s",
      (unsigned long long) meta.number,
      (unsigned long long) meta.file_size,
      s.ToString().c_str());
  delete iter;
  pending_outputs_.erase(meta.number);
  return s;
}

Status DBImpl::CompactMemTable() {
  mutex_.AssertHeld();

  WritableFile* lfile = NULL;
  uint64_t new_log_number = versions_->NewFileNumber();

  VersionEdit edit;

  // Save the contents of the memtable as a new Table
  Status s = WriteLevel0Table(mem_, &edit);
  if (s.ok()) {
    s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);
  }

  // Save a new descriptor with the new table and log number.
  if (s.ok()) {
    s = Install(&edit, new_log_number, mem_);
  }

  if (s.ok()) {
    // Commit to the new state
    mem_ = new MemTable(internal_comparator_);
    delete log_;
    delete logfile_;
    logfile_ = lfile;
    log_ = new log::Writer(lfile);
    log_number_ = new_log_number;
    DeleteObsoleteFiles();
    MaybeScheduleCompaction();
  } else {
    delete lfile;
    env_->DeleteFile(LogFileName(dbname_, new_log_number));
  }
  return s;
}

void DBImpl::TEST_CompactRange(
    int level,
    const std::string& begin,
    const std::string& end) {
  MutexLock l(&mutex_);
  while (compacting_) {
    compacting_cv_.Wait();
  }
  Compaction* c = versions_->CompactRange(
      level,
      InternalKey(begin, kMaxSequenceNumber, kValueTypeForSeek),
      InternalKey(end, 0, static_cast<ValueType>(0)));

  if (c != NULL) {
    CompactionState* compact = new CompactionState(c);
    DoCompactionWork(compact);  // Ignore error in test compaction
    CleanupCompaction(compact);
  }

  // Start any background compaction that may have been delayed by this thread
  MaybeScheduleCompaction();
}

Status DBImpl::TEST_CompactMemTable() {
  MutexLock l(&mutex_);
  return CompactMemTable();
}

void DBImpl::MaybeScheduleCompaction() {
  mutex_.AssertHeld();
  if (bg_compaction_scheduled_) {
    // Already scheduled
  } else if (compacting_) {
    // Some other thread is running a compaction.  Do not conflict with it.
  } else if (shutting_down_.Acquire_Load()) {
    // DB is being deleted; no more background compactions
  } else if (!versions_->NeedsCompaction()) {
    // No work to be done
  } else {
    bg_compaction_scheduled_ = true;
    env_->Schedule(&DBImpl::BGWork, this);
  }
}

void DBImpl::BGWork(void* db) {
  reinterpret_cast<DBImpl*>(db)->BackgroundCall();
}

void DBImpl::BackgroundCall() {
  MutexLock l(&mutex_);
  assert(bg_compaction_scheduled_);
  if (!shutting_down_.Acquire_Load() &&
      !compacting_) {
    BackgroundCompaction();
  }
  bg_compaction_scheduled_ = false;
  bg_cv_.SignalAll();

  // Previous compaction may have produced too many files in a level,
  // so reschedule another compaction if needed.
  MaybeScheduleCompaction();
}

void DBImpl::BackgroundCompaction() {
  mutex_.AssertHeld();
  Compaction* c = versions_->PickCompaction();
  if (c == NULL) {
    // Nothing to do
    return;
  }

  Status status;
  if (c->num_input_files(0) == 1 && c->num_input_files(1) == 0) {
    // Move file to next level
    FileMetaData* f = c->input(0, 0);
    c->edit()->DeleteFile(c->level(), f->number);
    c->edit()->AddFile(c->level() + 1, f->number, f->file_size,
                       f->smallest, f->largest);
    status = Install(c->edit(), log_number_, NULL);
    Log(env_, options_.info_log, "Moved #%llu to level-%d %llu bytes %s\n",
        static_cast<unsigned long long>(f->number),
        c->level() + 1,
        static_cast<unsigned long long>(f->file_size),
        status.ToString().c_str());
  } else {
    CompactionState* compact = new CompactionState(c);
    status = DoCompactionWork(compact);
    CleanupCompaction(compact);
  }
  delete c;

  if (status.ok()) {
    // Done
  } else if (shutting_down_.Acquire_Load()) {
    // Ignore compaction errors found during shutting down
  } else {
    Log(env_, options_.info_log,
        "Compaction error: %s", status.ToString().c_str());
    if (options_.paranoid_checks && bg_error_.ok()) {
      bg_error_ = status;
    }
  }
}

void DBImpl::CleanupCompaction(CompactionState* compact) {
  mutex_.AssertHeld();
  if (compact->builder != NULL) {
    // May happen if we get a shutdown call in the middle of compaction
    compact->builder->Abandon();
    delete compact->builder;
  } else {
    assert(compact->outfile == NULL);
  }
  delete compact->outfile;
  for (size_t i = 0; i < compact->outputs.size(); i++) {
    const CompactionState::Output& out = compact->outputs[i];
    pending_outputs_.erase(out.number);
  }
  delete compact;
}

Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) {
  assert(compact != NULL);
  assert(compact->builder == NULL);
  uint64_t file_number;
  {
    mutex_.Lock();
    file_number = versions_->NewFileNumber();
    pending_outputs_.insert(file_number);
    CompactionState::Output out;
    out.number = file_number;
    out.smallest.Clear();
    out.largest.Clear();
    compact->outputs.push_back(out);
    mutex_.Unlock();
  }

  // Make the output file
  std::string fname = TableFileName(dbname_, file_number);
  Status s = env_->NewWritableFile(fname, &compact->outfile);
  if (s.ok()) {
    compact->builder = new TableBuilder(options_, compact->outfile);
  }
  return s;
}

Status DBImpl::FinishCompactionOutputFile(CompactionState* compact,
                                          Iterator* input) {
  assert(compact != NULL);
  assert(compact->outfile != NULL);
  assert(compact->builder != NULL);

  const uint64_t output_number = compact->current_output()->number;
  assert(output_number != 0);

  // Check for iterator errors
  Status s = input->status();
  const uint64_t current_entries = compact->builder->NumEntries();
  if (s.ok()) {
    s = compact->builder->Finish();
  } else {
    compact->builder->Abandon();
  }
  const uint64_t current_bytes = compact->builder->FileSize();
  compact->current_output()->file_size = current_bytes;
  compact->total_bytes += current_bytes;
  delete compact->builder;
  compact->builder = NULL;

  // Finish and check for file errors
  if (s.ok()) {
    s = compact->outfile->Sync();
  }
  if (s.ok()) {
    s = compact->outfile->Close();
  }
  delete compact->outfile;
  compact->outfile = NULL;

  if (s.ok() && current_entries > 0) {
    // Verify that the table is usable
    Iterator* iter = table_cache_->NewIterator(ReadOptions(), output_number);
    s = iter->status();
    delete iter;
    if (s.ok()) {
      Log(env_, options_.info_log,
          "Generated table #%llu: %llu keys, %llu bytes",
          (unsigned long long) output_number,
          (unsigned long long) current_entries,
          (unsigned long long) current_bytes);
    }
  }
  return s;
}

Status DBImpl::InstallCompactionResults(CompactionState* compact) {
  mutex_.AssertHeld();
  Log(env_, options_.info_log,  "Compacted %d@%d + %d@%d files => %lld bytes",
      compact->compaction->num_input_files(0),
      compact->compaction->level(),
      compact->compaction->num_input_files(1),
      compact->compaction->level() + 1,
      static_cast<long long>(compact->total_bytes));

  // Add compaction outputs
  compact->compaction->AddInputDeletions(compact->compaction->edit());
  const int level = compact->compaction->level();
  for (size_t i = 0; i < compact->outputs.size(); i++) {
    const CompactionState::Output& out = compact->outputs[i];
    compact->compaction->edit()->AddFile(
        level + 1,
        out.number, out.file_size, out.smallest, out.largest);
    pending_outputs_.erase(out.number);
  }

  Status s = Install(compact->compaction->edit(), log_number_, NULL);
  if (s.ok()) {
    compact->compaction->ReleaseInputs();
    DeleteObsoleteFiles();
  } else {
    // Discard any files we may have created during this failed compaction
    for (size_t i = 0; i < compact->outputs.size(); i++) {
      env_->DeleteFile(TableFileName(dbname_, compact->outputs[i].number));
    }
  }
  // Clear the output list only after the failure path has had a chance to
  // delete the files; clearing it before Install() would leave the loop
  // above with nothing to iterate over.
  compact->outputs.clear();
  return s;
}

Status DBImpl::DoCompactionWork(CompactionState* compact) {
  Log(env_, options_.info_log,  "Compacting %d@%d + %d@%d files",
      compact->compaction->num_input_files(0),
      compact->compaction->level(),
      compact->compaction->num_input_files(1),
      compact->compaction->level() + 1);

  assert(versions_->NumLevelFiles(compact->compaction->level()) > 0);
  assert(compact->builder == NULL);
  assert(compact->outfile == NULL);
  if (snapshots_.empty()) {
    compact->smallest_snapshot = last_sequence_;
  } else {
    compact->smallest_snapshot = snapshots_.oldest()->number_;
  }

  // Release mutex while we're actually doing the compaction work
  compacting_ = true;
  mutex_.Unlock();

  Iterator* input = versions_->MakeInputIterator(compact->compaction);
  input->SeekToFirst();
  Status status;
  ParsedInternalKey ikey;
  std::string current_user_key;
  bool has_current_user_key = false;
  SequenceNumber last_sequence_for_key = kMaxSequenceNumber;
  for (; input->Valid() && !shutting_down_.Acquire_Load(); ) {
    // Handle key/value, add to state, etc.
    Slice key = input->key();
    bool drop = false;
    if (!ParseInternalKey(key, &ikey)) {
      // Do not hide error keys
      current_user_key.clear();
      has_current_user_key = false;
      last_sequence_for_key = kMaxSequenceNumber;
    } else {
      if (!has_current_user_key ||
          user_comparator()->Compare(ikey.user_key,
                                     Slice(current_user_key)) != 0) {
        // First occurrence of this user key
        current_user_key.assign(ikey.user_key.data(), ikey.user_key.size());
        has_current_user_key = true;
        last_sequence_for_key = kMaxSequenceNumber;
      }

      if (last_sequence_for_key <= compact->smallest_snapshot) {
        // Hidden by a newer entry for same user key
        drop = true;    // (A)
      } else if (ikey.type == kTypeDeletion &&
                 ikey.sequence <= compact->smallest_snapshot &&
                 compact->compaction->IsBaseLevelForKey(ikey.user_key)) {
        // For this user key:
        // (1) there is no data in higher levels
        // (2) data in lower levels will have larger sequence numbers
        // (3) data in layers that are being compacted here and have
        //     smaller sequence numbers will be dropped in the next
        //     few iterations of this loop (by rule (A) above).
        // Therefore this deletion marker is obsolete and can be dropped.
        drop = true;
      }

      last_sequence_for_key = ikey.sequence;
    }
#if 0
    Log(env_, options_.info_log,
        "  Compact: %s, seq %d, type: %d %d, drop: %d, is_base: %d, "
        "%d smallest_snapshot: %d",
        ikey.user_key.ToString().c_str(),
        (int)ikey.sequence, ikey.type, kTypeLargeValueRef, drop,
        compact->compaction->IsBaseLevelForKey(ikey.user_key),
        (int)last_sequence_for_key, (int)compact->smallest_snapshot);
#endif

    if (!drop) {
      // Open output file if necessary
      if (compact->builder == NULL) {
        status = OpenCompactionOutputFile(compact);
        if (!status.ok()) {
          break;
        }
      }
      if (compact->builder->NumEntries() == 0) {
        compact->current_output()->smallest.DecodeFrom(key);
      }
      compact->current_output()->largest.DecodeFrom(key);

      if (ikey.type == kTypeLargeValueRef) {
        if (input->value().size() != LargeValueRef::ByteSize()) {
          if (options_.paranoid_checks) {
            status = Status::Corruption("invalid large value ref");
            break;
          } else {
            Log(env_, options_.info_log,
                "compaction found invalid large value ref");
          }
        } else {
          compact->compaction->edit()->AddLargeValueRef(
              LargeValueRef::FromRef(input->value()),
              compact->current_output()->number,
              input->key());
          compact->builder->Add(key, input->value());
        }
      } else {
        compact->builder->Add(key, input->value());
      }

      // Close output file if it is big enough
      if (compact->builder->FileSize() >=
          compact->compaction->MaxOutputFileSize()) {
        status = FinishCompactionOutputFile(compact, input);
        if (!status.ok()) {
          break;
        }
      }
    }

    input->Next();
  }

  if (status.ok() && shutting_down_.Acquire_Load()) {
    status = Status::IOError("Deleting DB during compaction");
  }
  if (status.ok() && compact->builder != NULL) {
    status = FinishCompactionOutputFile(compact, input);
  }
  if (status.ok()) {
    status = input->status();
  }
  delete input;
  input = NULL;

  mutex_.Lock();

  if (status.ok()) {
    status = InstallCompactionResults(compact);
  }
  compacting_ = false;
  compacting_cv_.SignalAll();
  return status;
}
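
// Illustration only (not part of db_impl.cc): a minimal sketch of the two
// drop rules applied in the compaction loop above, assuming the input is
// ordered by user key ascending and, within a key, by sequence number
// descending. SketchEntry, CompactSketch and SampleInput are hypothetical
// names. The is_base_level flag stands in for IsBaseLevelForKey(), and the
// sketch uses the largest uint64_t value where the real code uses
// kMaxSequenceNumber.

```cpp
#include <cassert>
#include <cstdint>
#include <limits>
#include <string>
#include <vector>

// Hypothetical, simplified view of a parsed internal key.
struct SketchEntry {
  std::string user_key;
  uint64_t sequence;
  bool is_deletion;
};

// Returns the entries that survive compaction under the two drop rules.
static std::vector<SketchEntry> CompactSketch(
    const std::vector<SketchEntry>& input,
    uint64_t smallest_snapshot,
    bool is_base_level) {
  const uint64_t kMaxSeq = std::numeric_limits<uint64_t>::max();
  std::vector<SketchEntry> kept;
  std::string current_user_key;
  bool has_current_user_key = false;
  uint64_t last_sequence_for_key = kMaxSeq;
  for (const SketchEntry& e : input) {
    if (!has_current_user_key || e.user_key != current_user_key) {
      // First occurrence of this user key
      current_user_key = e.user_key;
      has_current_user_key = true;
      last_sequence_for_key = kMaxSeq;
    }
    bool drop = false;
    if (last_sequence_for_key <= smallest_snapshot) {
      drop = true;  // rule (A): hidden by a newer entry visible to all snapshots
    } else if (e.is_deletion && e.sequence <= smallest_snapshot &&
               is_base_level) {
      drop = true;  // obsolete deletion marker: nothing older exists below
    }
    last_sequence_for_key = e.sequence;
    if (!drop) kept.push_back(e);
  }
  return kept;
}

// Sample input: three versions of "a", then a deletion and a value for "b".
static std::vector<SketchEntry> SampleInput() {
  return {{"a", 9, false}, {"a", 5, false}, {"a", 3, false},
          {"b", 4, true},  {"b", 2, false}};
}
```

// With smallest_snapshot = 6 and is_base_level = true, only a@9 and a@5
// survive: a@3 is hidden by a@5, the deletion b@4 is obsolete, and b@2 is
// hidden by b@4. With is_base_level = false the deletion marker b@4 is kept
// as well, so three entries survive.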

Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
                                      SequenceNumber* latest_snapshot) {
  mutex_.Lock();
  *latest_snapshot = last_sequence_;

  // Collect together all needed child iterators
  std::vector<Iterator*> list;
  list.push_back(mem_->NewIterator());
  versions_->current()->AddIterators(options, &list);
  Iterator* internal_iter =
      NewMergingIterator(&internal_comparator_, &list[0], list.size());
  versions_->current()->Ref();
  internal_iter->RegisterCleanup(&DBImpl::Unref, this, versions_->current());

  mutex_.Unlock();
  return internal_iter;
}

Iterator* DBImpl::TEST_NewInternalIterator() {
  SequenceNumber ignored;
  return NewInternalIterator(ReadOptions(), &ignored);
}

Status DBImpl::Get(const ReadOptions& options,
                   const Slice& key,
                   std::string* value) {
  // TODO(opt): faster implementation
  Iterator* iter = NewIterator(options);
  iter->Seek(key);
  bool found = false;
  if (iter->Valid() && user_comparator()->Compare(key, iter->key()) == 0) {
    Slice v = iter->value();
    value->assign(v.data(), v.size());
    found = true;
  }
  // Non-OK iterator status trumps everything else
  Status result = iter->status();
  if (result.ok() && !found) {
    result = Status::NotFound(Slice());  // Use an empty error message for speed
  }
  delete iter;
  return result;
}

Iterator* DBImpl::NewIterator(const ReadOptions& options) {
  SequenceNumber latest_snapshot;
  Iterator* internal_iter = NewInternalIterator(options, &latest_snapshot);
  SequenceNumber sequence =
      (options.snapshot ? options.snapshot->number_ : latest_snapshot);
  return NewDBIterator(&dbname_, env_,
                       user_comparator(), internal_iter, sequence);
}

void DBImpl::Unref(void* arg1, void* arg2) {
  DBImpl* impl = reinterpret_cast<DBImpl*>(arg1);
  Version* v = reinterpret_cast<Version*>(arg2);
  MutexLock l(&impl->mutex_);
  v->Unref();
}

const Snapshot* DBImpl::GetSnapshot() {
  MutexLock l(&mutex_);
  return snapshots_.New(last_sequence_);
}

void DBImpl::ReleaseSnapshot(const Snapshot* s) {
  MutexLock l(&mutex_);
  snapshots_.Delete(s);
}

// Convenience methods
Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) {
  return DB::Put(o, key, val);
}

Status DBImpl::Delete(const WriteOptions& options, const Slice& key) {
  return DB::Delete(options, key);
}

Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
  Status status;

  WriteBatch* final = NULL;
  {
    MutexLock l(&mutex_);
    if (!bg_error_.ok()) {
      status = bg_error_;
    } else if (mem_->ApproximateMemoryUsage() > options_.write_buffer_size) {
      status = CompactMemTable();
    }
    if (status.ok()) {
      status = HandleLargeValues(last_sequence_ + 1, updates, &final);
    }
    if (status.ok()) {
      WriteBatchInternal::SetSequence(final, last_sequence_ + 1);
      last_sequence_ += WriteBatchInternal::Count(final);

      // Add to log and apply to memtable
      status = log_->AddRecord(WriteBatchInternal::Contents(final));
      if (status.ok() && options.sync) {
        status = logfile_->Sync();
      }
      if (status.ok()) {
        status = WriteBatchInternal::InsertInto(final, mem_);
      }
    }

    if (options.post_write_snapshot != NULL) {
      *options.post_write_snapshot =
          status.ok() ? snapshots_.New(last_sequence_) : NULL;
    }
  }
  if (final != updates) {
    delete final;
  }

  return status;
}

  853. bool DBImpl::HasLargeValues(const WriteBatch& batch) const {
  854. if (WriteBatchInternal::ByteSize(&batch) >= options_.large_value_threshold) {
  855. for (WriteBatchInternal::Iterator it(batch); !it.Done(); it.Next()) {
  856. if (it.op() == kTypeValue &&
  857. it.value().size() >= options_.large_value_threshold) {
  858. return true;
  859. }
  860. }
  861. }
  862. return false;
  863. }

// Given "raw_value", determines the appropriate compression format to use
// and stores the data that should be written to the large value file in
// "*file_bytes", and sets "*ref" to the appropriate large value reference.
// May use "*scratch" as backing store for "*file_bytes".
void DBImpl::MaybeCompressLargeValue(
    const Slice& raw_value,
    Slice* file_bytes,
    std::string* scratch,
    LargeValueRef* ref) {
  switch (options_.compression) {
    case kLightweightCompression: {
      port::Lightweight_Compress(raw_value.data(), raw_value.size(), scratch);
      if (scratch->size() < (raw_value.size() / 8) * 7) {
        *file_bytes = *scratch;
        *ref = LargeValueRef::Make(raw_value, kLightweightCompression);
        return;
      }

      // Less than 12.5% compression: just leave as uncompressed data
      break;
    }
    case kNoCompression:
      // Use default code outside of switch
      break;
  }
  // Store as uncompressed data
  *file_bytes = raw_value;
  *ref = LargeValueRef::Make(raw_value, kNoCompression);
}
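
// Illustration only, not part of LevelDB: the size test used in
// MaybeCompressLargeValue() above can be read in isolation as the minimal
// sketch below. KeepCompressed and KeepCompressedExamples are hypothetical
// names introduced here; only the arithmetic is taken from the check above.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical helper: returns true when the compressed form is smaller
// than 7/8 of the raw size, using the same integer arithmetic as the
// check in MaybeCompressLargeValue().
bool KeepCompressed(std::size_t raw_size, std::size_t compressed_size) {
  return compressed_size < (raw_size / 8) * 7;
}

// Hypothetical usage: a few boundary cases for the threshold.
void KeepCompressedExamples() {
  assert(KeepCompressed(800, 699));   // 699 < 700: keep compressed form
  assert(!KeepCompressed(800, 700));  // exactly 7/8: store uncompressed
  assert(!KeepCompressed(7, 0));      // (7 / 8) * 7 == 0: never compressed
}
```

// Because raw_size / 8 is integer division, inputs shorter than 8 bytes
// yield a threshold of 0, so under this test they are always stored
// uncompressed.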

Status DBImpl::HandleLargeValues(SequenceNumber assigned_seq,
                                 WriteBatch* updates,
                                 WriteBatch** final) {
  if (!HasLargeValues(*updates)) {
    // Fast path: no large values found
    *final = updates;
  } else {
    // Copy *updates to a new WriteBatch, replacing each large value with
    // an indirect reference to a separate large value file.
    *final = new WriteBatch;
    SequenceNumber seq = assigned_seq;
    for (WriteBatchInternal::Iterator it(*updates); !it.Done(); it.Next()) {
      switch (it.op()) {
        case kTypeValue:
          if (it.value().size() < options_.large_value_threshold) {
            (*final)->Put(it.key(), it.value());
          } else {
            std::string scratch;
            Slice file_bytes;
            LargeValueRef large_ref;
            MaybeCompressLargeValue(
                it.value(), &file_bytes, &scratch, &large_ref);
            InternalKey ikey(it.key(), seq, kTypeLargeValueRef);
            if (versions_->RegisterLargeValueRef(
                    large_ref, log_number_, ikey)) {
              // TODO(opt): avoid holding the lock here (but be careful about
              // another thread doing a Write and changing log_number_ or
              // having us get a different "assigned_seq" value).

              uint64_t tmp_number = versions_->NewFileNumber();
              pending_outputs_.insert(tmp_number);
              std::string tmp = TempFileName(dbname_, tmp_number);
              WritableFile* file;
              Status s = env_->NewWritableFile(tmp, &file);
              if (!s.ok()) {
                // Do not leave a stale entry behind on the early return.
                pending_outputs_.erase(tmp_number);
                return s;     // Caller will delete *final
              }

              // Check the Append() status so a failed write is not
              // silently renamed into place as a large value file.
              s = file->Append(file_bytes);
              if (s.ok()) {
                s = file->Close();
              }
              delete file;

              if (s.ok()) {
                const std::string fname =
                    LargeValueFileName(dbname_, large_ref);
                s = env_->RenameFile(tmp, fname);
              } else {
                Log(env_, options_.info_log, "Write large value: %s",
                    s.ToString().c_str());
              }
              pending_outputs_.erase(tmp_number);

              if (!s.ok()) {
                env_->DeleteFile(tmp);  // Cleanup; intentionally ignoring error
                return s;     // Caller will delete *final
              }
            }

            // Put an indirect reference in the write batch in place
            // of large value
            WriteBatchInternal::PutLargeValueRef(*final, it.key(), large_ref);
          }
          break;
        case kTypeLargeValueRef:
          return Status::Corruption("Corrupted write batch");
        case kTypeDeletion:
          (*final)->Delete(it.key());
          break;
      }
      seq = seq + 1;
    }
  }
  return Status::OK();
}
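
// Illustration only, not part of LevelDB: HandleLargeValues() above writes
// each large value to a temporary file and renames it to its final name
// only after the write has succeeded. The minimal sketch below shows the
// same pattern with the C standard library instead of Env. WriteThenRename
// is a hypothetical name; whether std::rename replaces an existing target
// (or does so atomically) depends on the platform and is an assumption.

```cpp
#include <cassert>
#include <cstdio>
#include <string>

// Hypothetical helper: writes "data" to tmp_path, then renames the file to
// final_path. Returns true on success; on failure the temporary file is
// removed so that no partial file is left under either name.
bool WriteThenRename(const std::string& tmp_path,
                     const std::string& final_path,
                     const std::string& data) {
  assert(tmp_path != final_path);
  std::FILE* f = std::fopen(tmp_path.c_str(), "wb");
  if (f == nullptr) {
    return false;
  }
  bool ok = std::fwrite(data.data(), 1, data.size(), f) == data.size();
  // Always close the file, but report failure if either step failed.
  ok = (std::fclose(f) == 0) && ok;
  if (ok) {
    ok = (std::rename(tmp_path.c_str(), final_path.c_str()) == 0);
  }
  if (!ok) {
    std::remove(tmp_path.c_str());  // Cleanup; intentionally ignoring error
  }
  return ok;
}
```

// A reader that opens final_path therefore sees either no file or the
// complete contents, never a partially written one.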

bool DBImpl::GetProperty(const Slice& property, uint64_t* value) {
  MutexLock l(&mutex_);
  Slice in = property;
  Slice prefix("leveldb.");
  if (!in.starts_with(prefix)) return false;
  in.remove_prefix(prefix.size());

  if (in.starts_with("num-files-at-level")) {
    in.remove_prefix(strlen("num-files-at-level"));
    uint64_t level;
    bool ok = ConsumeDecimalNumber(&in, &level) && in.empty();
    // "level" is unsigned, so only the upper bound needs checking.
    if (!ok || level >= config::kNumLevels) {
      return false;
    } else {
      *value = versions_->NumLevelFiles(level);
      return true;
    }
  }
  return false;
}
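
// Illustration only, not part of LevelDB: the property name accepted by
// GetProperty() above has the form "leveldb.num-files-at-level<N>". The
// minimal sketch below parses that form with std::string instead of Slice.
// ParseNumFilesLevel and kAssumedNumLevels are hypothetical; the real bound
// is config::kNumLevels, whose value is not shown in this file.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Assumed stand-in for config::kNumLevels (illustrative value only).
const std::uint64_t kAssumedNumLevels = 7;

// Hypothetical helper: extracts <N> from "leveldb.num-files-at-level<N>".
// Returns false if the prefix is missing, if <N> is empty or contains a
// non-digit character, or if <N> is not below kAssumedNumLevels.
bool ParseNumFilesLevel(const std::string& property, std::uint64_t* level) {
  assert(level != nullptr);
  const std::string prefix = "leveldb.num-files-at-level";
  if (property.compare(0, prefix.size(), prefix) != 0) {
    return false;
  }
  const std::string digits = property.substr(prefix.size());
  if (digits.empty()) {
    return false;
  }
  std::uint64_t v = 0;
  for (char c : digits) {
    if (c < '0' || c > '9') {
      return false;
    }
    v = v * 10 + static_cast<std::uint64_t>(c - '0');
    // Rejecting early keeps v small, so the multiplication cannot overflow.
    if (v >= kAssumedNumLevels) {
      return false;
    }
  }
  *level = v;
  return true;
}
```

// With this sketch, "leveldb.num-files-at-level3" yields level 3, while a
// name with no number or with trailing characters is rejected.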

void DBImpl::GetApproximateSizes(
    const Range* range, int n,
    uint64_t* sizes) {
  // TODO(opt): better implementation
  Version* v;
  {
    MutexLock l(&mutex_);
    v = versions_->current();
    v->Ref();
  }

  for (int i = 0; i < n; i++) {
    // Convert user_key into a corresponding internal key.
    InternalKey k1(range[i].start, kMaxSequenceNumber, kValueTypeForSeek);
    InternalKey k2(range[i].limit, kMaxSequenceNumber, kValueTypeForSeek);
    uint64_t start = versions_->ApproximateOffsetOf(v, k1);
    uint64_t limit = versions_->ApproximateOffsetOf(v, k2);
    sizes[i] = (limit >= start ? limit - start : 0);
  }

  {
    MutexLock l(&mutex_);
    v->Unref();
  }
}

// Default implementations of convenience methods that subclasses of DB
// can call if they wish
Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
  WriteBatch batch;
  batch.Put(key, value);
  return Write(opt, &batch);
}

Status DB::Delete(const WriteOptions& opt, const Slice& key) {
  WriteBatch batch;
  batch.Delete(key);
  return Write(opt, &batch);
}

DB::~DB() { }

Status DB::Open(const Options& options, const std::string& dbname,
                DB** dbptr) {
  *dbptr = NULL;

  DBImpl* impl = new DBImpl(options, dbname);
  impl->mutex_.Lock();
  VersionEdit edit;
  Status s = impl->Recover(&edit);  // Handles create_if_missing, error_if_exists
  if (s.ok()) {
    impl->log_number_ = impl->versions_->NewFileNumber();
    WritableFile* lfile;
    s = options.env->NewWritableFile(LogFileName(dbname, impl->log_number_),
                                     &lfile);
    if (s.ok()) {
      impl->logfile_ = lfile;
      impl->log_ = new log::Writer(lfile);
      s = impl->Install(&edit, impl->log_number_, NULL);
    }
    if (s.ok()) {
      impl->DeleteObsoleteFiles();
    }
  }
  impl->mutex_.Unlock();
  if (s.ok()) {
    *dbptr = impl;
  } else {
    delete impl;
  }
  return s;
}

Status DestroyDB(const std::string& dbname, const Options& options) {
  Env* env = options.env;
  std::vector<std::string> filenames;
  // Ignore error in case directory does not exist
  env->GetChildren(dbname, &filenames);
  if (filenames.empty()) {
    return Status::OK();
  }

  FileLock* lock;
  Status result = env->LockFile(LockFileName(dbname), &lock);
  if (result.ok()) {
    uint64_t number;
    LargeValueRef large_ref;
    FileType type;
    // size_t index avoids a signed/unsigned comparison with size().
    for (size_t i = 0; i < filenames.size(); i++) {
      if (ParseFileName(filenames[i], &number, &large_ref, &type)) {
        // Keep deleting after a failure, but report the first error.
        Status del = env->DeleteFile(dbname + "/" + filenames[i]);
        if (result.ok() && !del.ok()) {
          result = del;
        }
      }
    }
    env->UnlockFile(lock);  // Ignore error since state is already gone
    env->DeleteFile(LockFileName(dbname));
    env->DeleteDir(dbname);  // Ignore error in case dir contains other files
  }
  return result;
}

}  // namespace leveldb