diff --git a/db/db_impl.cc b/db/db_impl.cc index a9044c2..8484e46 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -4,12 +4,14 @@ #include "db/db_impl.h" +#include +#include + #include #include #include -#include -#include #include + #include "db/builder.h" #include "db/db_iter.h" #include "db/dbformat.h" @@ -82,7 +84,7 @@ struct DBImpl::CompactionState { }; // Fix user-supplied options to be reasonable -template +template static void ClipToRange(T* ptr, V minvalue, V maxvalue) { if (static_cast(*ptr) > maxvalue) *ptr = maxvalue; if (static_cast(*ptr) < minvalue) *ptr = minvalue; @@ -114,6 +116,11 @@ Options SanitizeOptions(const std::string& dbname, return result; } +static int TableCacheSize(const Options& sanitized_options) { + // Reserve ten files or so for other uses and give the rest to TableCache. + return sanitized_options.max_open_files - kNumNonTableCacheFiles; +} + DBImpl::DBImpl(const Options& raw_options, const std::string& dbname) : env_(raw_options.env), internal_comparator_(raw_options.comparator), @@ -123,9 +130,10 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname) owns_info_log_(options_.info_log != raw_options.info_log), owns_cache_(options_.block_cache != raw_options.block_cache), dbname_(dbname), + table_cache_(new TableCache(dbname_, options_, TableCacheSize(options_))), db_lock_(NULL), shutting_down_(NULL), - bg_cv_(&mutex_), + background_work_finished_signal_(&mutex_), mem_(NULL), imm_(NULL), logfile_(NULL), @@ -133,24 +141,19 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname) log_(NULL), seed_(0), tmp_batch_(new WriteBatch), - bg_compaction_scheduled_(false), - manual_compaction_(NULL) { + background_compaction_scheduled_(false), + manual_compaction_(NULL), + versions_(new VersionSet(dbname_, &options_, table_cache_, + &internal_comparator_)) { has_imm_.Release_Store(NULL); - - // Reserve ten files or so for other uses and give the rest to TableCache. - const int table_cache_size = options_.max_open_files - kNumNonTableCacheFiles; - table_cache_ = new TableCache(dbname_, &options_, table_cache_size); - - versions_ = new VersionSet(dbname_, &options_, table_cache_, - &internal_comparator_); } DBImpl::~DBImpl() { // Wait for background work to finish mutex_.Lock(); shutting_down_.Release_Store(this); // Any non-NULL value is ok - while (bg_compaction_scheduled_) { - bg_cv_.Wait(); + while (background_compaction_scheduled_) { + background_work_finished_signal_.Wait(); } mutex_.Unlock(); @@ -216,6 +219,8 @@ void DBImpl::MaybeIgnoreError(Status* s) const { } void DBImpl::DeleteObsoleteFiles() { + mutex_.AssertHeld(); + if (!bg_error_.ok()) { // After a background error, we don't know whether a new version may // or may not have been committed, so we cannot safely garbage collect. @@ -227,7 +232,7 @@ void DBImpl::DeleteObsoleteFiles() { versions_->AddLiveFiles(&live); std::vector filenames; - env_->GetChildren(dbname_, &filenames); // Ignoring errors on purpose + env_->GetChildren(dbname_, &filenames); // Ignoring errors on purpose uint64_t number; FileType type; for (size_t i = 0; i < filenames.size(); i++) { @@ -263,7 +268,7 @@ void DBImpl::DeleteObsoleteFiles() { table_cache_->Evict(number); } Log(options_.info_log, "Delete type=%d #%lld\n", - int(type), + static_cast(type), static_cast(number)); env_->DeleteFile(dbname_ + "/" + filenames[i]); } @@ -575,13 +580,14 @@ void DBImpl::CompactRange(const Slice* begin, const Slice* end) { } } } - TEST_CompactMemTable(); // TODO(sanjay): Skip if memtable does not overlap + TEST_CompactMemTable(); // TODO(sanjay): Skip if memtable does not overlap for (int level = 0; level < max_level_with_files; level++) { TEST_CompactRange(level, begin, end); } } -void DBImpl::TEST_CompactRange(int level, const Slice* begin,const Slice* end) { +void DBImpl::TEST_CompactRange(int level, const Slice* begin, + const Slice* end) { assert(level >= 0); assert(level + 1 < config::kNumLevels); @@ -609,7 +615,7 @@ void DBImpl::TEST_CompactRange(int level, const Slice* begin,const Slice* end) { manual_compaction_ = &manual; MaybeScheduleCompaction(); } else { // Running either my compaction or another compaction. - bg_cv_.Wait(); + background_work_finished_signal_.Wait(); } } if (manual_compaction_ == &manual) { @@ -625,7 +631,7 @@ Status DBImpl::TEST_CompactMemTable() { // Wait until the compaction completes MutexLock l(&mutex_); while (imm_ != NULL && bg_error_.ok()) { - bg_cv_.Wait(); + background_work_finished_signal_.Wait(); } if (imm_ != NULL) { s = bg_error_; @@ -638,13 +644,13 @@ void DBImpl::RecordBackgroundError(const Status& s) { mutex_.AssertHeld(); if (bg_error_.ok()) { bg_error_ = s; - bg_cv_.SignalAll(); + background_work_finished_signal_.SignalAll(); } } void DBImpl::MaybeScheduleCompaction() { mutex_.AssertHeld(); - if (bg_compaction_scheduled_) { + if (background_compaction_scheduled_) { // Already scheduled } else if (shutting_down_.Acquire_Load()) { // DB is being deleted; no more background compactions @@ -655,7 +661,7 @@ void DBImpl::MaybeScheduleCompaction() { !versions_->NeedsCompaction()) { // No work to be done } else { - bg_compaction_scheduled_ = true; + background_compaction_scheduled_ = true; env_->Schedule(&DBImpl::BGWork, this); } } @@ -666,7 +672,7 @@ void DBImpl::BGWork(void* db) { void DBImpl::BackgroundCall() { MutexLock l(&mutex_); - assert(bg_compaction_scheduled_); + assert(background_compaction_scheduled_); if (shutting_down_.Acquire_Load()) { // No more background work when shutting down. } else if (!bg_error_.ok()) { @@ -675,12 +681,12 @@ void DBImpl::BackgroundCall() { BackgroundCompaction(); } - bg_compaction_scheduled_ = false; + background_compaction_scheduled_ = false; // Previous compaction may have produced too many files in a level, // so reschedule another compaction if needed. MaybeScheduleCompaction(); - bg_cv_.SignalAll(); + background_work_finished_signal_.SignalAll(); } void DBImpl::BackgroundCompaction() { @@ -920,7 +926,8 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { mutex_.Lock(); if (imm_ != NULL) { CompactMemTable(); - bg_cv_.SignalAll(); // Wakeup MakeRoomForWrite() if necessary + // Wake up MakeRoomForWrite() if necessary. + background_work_finished_signal_.SignalAll(); } mutex_.Unlock(); imm_micros += (env_->NowMicros() - imm_start); @@ -1267,6 +1274,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { // REQUIRES: Writer list must be non-empty // REQUIRES: First writer must have a non-NULL batch WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) { + mutex_.AssertHeld(); assert(!writers_.empty()); Writer* first = writers_.front(); WriteBatch* result = first->batch; @@ -1346,11 +1354,11 @@ Status DBImpl::MakeRoomForWrite(bool force) { // We have filled up the current memtable, but the previous // one is still being compacted, so we wait. Log(options_.info_log, "Current memtable full; waiting...\n"); - bg_cv_.Wait(); + background_work_finished_signal_.Wait(); } else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) { // There are too many level-0 files. Log(options_.info_log, "Too many L0 files; waiting...\n"); - bg_cv_.Wait(); + background_work_finished_signal_.Wait(); } else { // Attempt to switch to a new memtable and trigger compaction of old assert(versions_->PrevLogNumber() == 0); diff --git a/db/db_impl.h b/db/db_impl.h index 3861b86..6344112 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -84,7 +84,7 @@ class DBImpl : public DB { void MaybeIgnoreError(Status* s) const; // Delete any unneeded files and stale in-memory entries. - void DeleteObsoleteFiles(); + void DeleteObsoleteFiles() EXCLUSIVE_LOCKS_REQUIRED(mutex_); // Compact the in-memory write buffer to disk. Switches to a new // log-file/memtable and writes a new descriptor iff successful. @@ -100,14 +100,15 @@ class DBImpl : public DB { Status MakeRoomForWrite(bool force /* compact even if there is room? */) EXCLUSIVE_LOCKS_REQUIRED(mutex_); - WriteBatch* BuildBatchGroup(Writer** last_writer); + WriteBatch* BuildBatchGroup(Writer** last_writer) + EXCLUSIVE_LOCKS_REQUIRED(mutex_); void RecordBackgroundError(const Status& s); void MaybeScheduleCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_); static void BGWork(void* db); void BackgroundCall(); - void BackgroundCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_); + void BackgroundCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_); void CleanupCompaction(CompactionState* compact) EXCLUSIVE_LOCKS_REQUIRED(mutex_); Status DoCompactionWork(CompactionState* compact) @@ -123,12 +124,12 @@ class DBImpl : public DB { const InternalKeyComparator internal_comparator_; const InternalFilterPolicy internal_filter_policy_; const Options options_; // options_.comparator == &internal_comparator_ - bool owns_info_log_; - bool owns_cache_; + const bool owns_info_log_; + const bool owns_cache_; const std::string dbname_; // table_cache_ provides its own synchronization - TableCache* table_cache_; + TableCache* const table_cache_; // Lock over the persistent DB state. Non-NULL iff successfully acquired. FileLock* db_lock_; @@ -136,27 +137,27 @@ class DBImpl : public DB { // State below is protected by mutex_ port::Mutex mutex_; port::AtomicPointer shutting_down_; - port::CondVar bg_cv_; // Signalled when background work finishes + port::CondVar background_work_finished_signal_ GUARDED_BY(mutex_); MemTable* mem_; - MemTable* imm_; // Memtable being compacted - port::AtomicPointer has_imm_; // So bg thread can detect non-NULL imm_ + MemTable* imm_ GUARDED_BY(mutex_); // Memtable being compacted + port::AtomicPointer has_imm_; // So bg thread can detect non-NULL imm_ WritableFile* logfile_; - uint64_t logfile_number_; + uint64_t logfile_number_ GUARDED_BY(mutex_); log::Writer* log_; - uint32_t seed_; // For sampling. + uint32_t seed_ GUARDED_BY(mutex_); // For sampling. // Queue of writers. - std::deque writers_; - WriteBatch* tmp_batch_; + std::deque writers_ GUARDED_BY(mutex_); + WriteBatch* tmp_batch_ GUARDED_BY(mutex_); - SnapshotList snapshots_; + SnapshotList snapshots_ GUARDED_BY(mutex_); // Set of table files to protect from deletion because they are // part of ongoing compactions. - std::set pending_outputs_; + std::set pending_outputs_ GUARDED_BY(mutex_); // Has a background compaction been scheduled or is running? - bool bg_compaction_scheduled_; + bool background_compaction_scheduled_ GUARDED_BY(mutex_); // Information for a manual compaction struct ManualCompaction { @@ -166,12 +167,12 @@ class DBImpl : public DB { const InternalKey* end; // NULL means end of key range InternalKey tmp_storage; // Used to keep track of compaction progress }; - ManualCompaction* manual_compaction_; + ManualCompaction* manual_compaction_ GUARDED_BY(mutex_); - VersionSet* versions_; + VersionSet* const versions_; // Have we encountered a background error in paranoid mode? - Status bg_error_; + Status bg_error_ GUARDED_BY(mutex_); // Per level compaction stats. stats_[level] stores the stats for // compactions that produced data for the specified "level". @@ -188,7 +189,7 @@ class DBImpl : public DB { this->bytes_written += c.bytes_written; } }; - CompactionStats stats_[config::kNumLevels]; + CompactionStats stats_[config::kNumLevels] GUARDED_BY(mutex_); // No copying allowed DBImpl(const DBImpl&); diff --git a/db/memtable.h b/db/memtable.h index 9f41567..f2a6736 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -14,7 +14,6 @@ namespace leveldb { class InternalKeyComparator; -class Mutex; class MemTableIterator; class MemTable { diff --git a/db/repair.cc b/db/repair.cc index 4cd4bb0..c10da82 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -54,7 +54,7 @@ class Repairer { owns_cache_(options_.block_cache != options.block_cache), next_file_number_(1) { // TableCache can be small since we expect each table to be opened once. - table_cache_ = new TableCache(dbname_, &options_, 10); + table_cache_ = new TableCache(dbname_, options_, 10); } ~Repairer() { diff --git a/db/table_cache.cc b/db/table_cache.cc index e3d82cd..6cf005b 100644 --- a/db/table_cache.cc +++ b/db/table_cache.cc @@ -30,9 +30,9 @@ static void UnrefEntry(void* arg1, void* arg2) { } TableCache::TableCache(const std::string& dbname, - const Options* options, + const Options& options, int entries) - : env_(options->env), + : env_(options.env), dbname_(dbname), options_(options), cache_(NewLRUCache(entries)) { @@ -61,7 +61,7 @@ Status TableCache::FindTable(uint64_t file_number, uint64_t file_size, } } if (s.ok()) { - s = Table::Open(*options_, file, file_size, &table); + s = Table::Open(options_, file, file_size, &table); } if (!s.ok()) { diff --git a/db/table_cache.h b/db/table_cache.h index 8cf4aaf..e9191dc 100644 --- a/db/table_cache.h +++ b/db/table_cache.h @@ -20,7 +20,7 @@ class Env; class TableCache { public: - TableCache(const std::string& dbname, const Options* options, int entries); + TableCache(const std::string& dbname, const Options& options, int entries); ~TableCache(); // Return an iterator for the specified file number (the corresponding @@ -50,7 +50,7 @@ class TableCache { private: Env* const env_; const std::string dbname_; - const Options* options_; + const Options& options_; Cache* cache_; Status FindTable(uint64_t file_number, uint64_t file_size, Cache::Handle**); diff --git a/helpers/memenv/memenv.cc b/helpers/memenv/memenv.cc index 9a98884..ee7abd4 100644 --- a/helpers/memenv/memenv.cc +++ b/helpers/memenv/memenv.cc @@ -4,14 +4,17 @@ #include "helpers/memenv/memenv.h" +#include + +#include +#include +#include + #include "leveldb/env.h" #include "leveldb/status.h" #include "port/port.h" +#include "port/thread_annotations.h" #include "util/mutexlock.h" -#include -#include -#include -#include namespace leveldb { @@ -135,7 +138,7 @@ class FileState { void operator=(const FileState&); port::Mutex refs_mutex_; - int refs_; // Protected by refs_mutex_; + int refs_ GUARDED_BY(refs_mutex_); // The following fields are not protected by any mutex. They are only mutable // while the file is being written, and concurrent access is not allowed @@ -312,7 +315,8 @@ class InMemoryEnv : public EnvWrapper { return Status::OK(); } - void DeleteFileInternal(const std::string& fname) { + void DeleteFileInternal(const std::string& fname) + EXCLUSIVE_LOCKS_REQUIRED(mutex_) { if (file_map_.find(fname) == file_map_.end()) { return; } @@ -386,7 +390,7 @@ class InMemoryEnv : public EnvWrapper { // Map from filenames to FileState objects, representing a simple file system. typedef std::map FileSystem; port::Mutex mutex_; - FileSystem file_map_; // Protected by mutex_. + FileSystem file_map_ GUARDED_BY(mutex_); }; } // namespace diff --git a/util/cache.cc b/util/cache.cc index bd914ae..10b7103 100644 --- a/util/cache.cc +++ b/util/cache.cc @@ -8,6 +8,7 @@ #include "leveldb/cache.h" #include "port/port.h" +#include "port/thread_annotations.h" #include "util/hash.h" #include "util/mutexlock.h" @@ -174,25 +175,25 @@ class LRUCache { void LRU_Append(LRUHandle*list, LRUHandle* e); void Ref(LRUHandle* e); void Unref(LRUHandle* e); - bool FinishErase(LRUHandle* e); + bool FinishErase(LRUHandle* e) EXCLUSIVE_LOCKS_REQUIRED(mutex_); // Initialized before use. size_t capacity_; // mutex_ protects the following state. mutable port::Mutex mutex_; - size_t usage_; + size_t usage_ GUARDED_BY(mutex_); // Dummy head of LRU list. // lru.prev is newest entry, lru.next is oldest entry. // Entries have refs==1 and in_cache==true. - LRUHandle lru_; + LRUHandle lru_ GUARDED_BY(mutex_); // Dummy head of in-use list. // Entries are in use by clients, and have refs >= 2 and in_cache==true. - LRUHandle in_use_; + LRUHandle in_use_ GUARDED_BY(mutex_); - HandleTable table_; + HandleTable table_ GUARDED_BY(mutex_); }; LRUCache::LRUCache() @@ -227,11 +228,12 @@ void LRUCache::Ref(LRUHandle* e) { void LRUCache::Unref(LRUHandle* e) { assert(e->refs > 0); e->refs--; - if (e->refs == 0) { // Deallocate. + if (e->refs == 0) { // Deallocate. assert(!e->in_cache); (*e->deleter)(e->key(), e->value); free(e); - } else if (e->in_cache && e->refs == 1) { // No longer in use; move to lru_ list. + } else if (e->in_cache && e->refs == 1) { + // No longer in use; move to lru_ list. LRU_Remove(e); LRU_Append(&lru_, e); }