From 0fa5a4f7b1ad9dc16b705bcad1f3ca913f187325 Mon Sep 17 00:00:00 2001 From: costan Date: Fri, 16 Mar 2018 10:06:35 -0700 Subject: [PATCH] Extend thread safety annotations. This CL makes it easier to reason about thread safety by: 1) Adding Clang thread safety annotations according to comments. 2) Expanding a couple of variable names, without adding extra lines of code. 3) Adding const in a couple of places. 4) Replacing an always-non-null const pointer with a reference. 5) Fixing style warnings in the modified files. This CL does not annotate the DBImpl members that claim to be protected by the instance mutex, but are accessed without the mutex being held. Those members (and their unprotected accesses) will be addressed in future CLs. ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=189354657 --- db/db_impl.cc | 68 +++++++++++++++++++++++++++--------------------- db/db_impl.h | 41 +++++++++++++++-------------- db/memtable.h | 1 - db/repair.cc | 2 +- db/table_cache.cc | 6 ++--- db/table_cache.h | 4 +-- helpers/memenv/memenv.cc | 18 ++++++++----- util/cache.cc | 16 +++++++----- 8 files changed, 85 insertions(+), 71 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index a9044c2..8484e46 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -4,12 +4,14 @@ #include "db/db_impl.h" +#include +#include + #include #include #include -#include -#include #include + #include "db/builder.h" #include "db/db_iter.h" #include "db/dbformat.h" @@ -82,7 +84,7 @@ struct DBImpl::CompactionState { }; // Fix user-supplied options to be reasonable -template +template static void ClipToRange(T* ptr, V minvalue, V maxvalue) { if (static_cast(*ptr) > maxvalue) *ptr = maxvalue; if (static_cast(*ptr) < minvalue) *ptr = minvalue; @@ -114,6 +116,11 @@ Options SanitizeOptions(const std::string& dbname, return result; } +static int TableCacheSize(const Options& sanitized_options) { + // Reserve ten files or so for other uses and give the rest to TableCache. + return sanitized_options.max_open_files - kNumNonTableCacheFiles; +} + DBImpl::DBImpl(const Options& raw_options, const std::string& dbname) : env_(raw_options.env), internal_comparator_(raw_options.comparator), @@ -123,9 +130,10 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname) owns_info_log_(options_.info_log != raw_options.info_log), owns_cache_(options_.block_cache != raw_options.block_cache), dbname_(dbname), + table_cache_(new TableCache(dbname_, options_, TableCacheSize(options_))), db_lock_(NULL), shutting_down_(NULL), - bg_cv_(&mutex_), + background_work_finished_signal_(&mutex_), mem_(NULL), imm_(NULL), logfile_(NULL), @@ -133,24 +141,19 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname) log_(NULL), seed_(0), tmp_batch_(new WriteBatch), - bg_compaction_scheduled_(false), - manual_compaction_(NULL) { + background_compaction_scheduled_(false), + manual_compaction_(NULL), + versions_(new VersionSet(dbname_, &options_, table_cache_, + &internal_comparator_)) { has_imm_.Release_Store(NULL); - - // Reserve ten files or so for other uses and give the rest to TableCache. - const int table_cache_size = options_.max_open_files - kNumNonTableCacheFiles; - table_cache_ = new TableCache(dbname_, &options_, table_cache_size); - - versions_ = new VersionSet(dbname_, &options_, table_cache_, - &internal_comparator_); } DBImpl::~DBImpl() { // Wait for background work to finish mutex_.Lock(); shutting_down_.Release_Store(this); // Any non-NULL value is ok - while (bg_compaction_scheduled_) { - bg_cv_.Wait(); + while (background_compaction_scheduled_) { + background_work_finished_signal_.Wait(); } mutex_.Unlock(); @@ -216,6 +219,8 @@ void DBImpl::MaybeIgnoreError(Status* s) const { } void DBImpl::DeleteObsoleteFiles() { + mutex_.AssertHeld(); + if (!bg_error_.ok()) { // After a background error, we don't know whether a new version may // or may not have been committed, so we cannot safely garbage collect. @@ -227,7 +232,7 @@ void DBImpl::DeleteObsoleteFiles() { versions_->AddLiveFiles(&live); std::vector filenames; - env_->GetChildren(dbname_, &filenames); // Ignoring errors on purpose + env_->GetChildren(dbname_, &filenames); // Ignoring errors on purpose uint64_t number; FileType type; for (size_t i = 0; i < filenames.size(); i++) { @@ -263,7 +268,7 @@ void DBImpl::DeleteObsoleteFiles() { table_cache_->Evict(number); } Log(options_.info_log, "Delete type=%d #%lld\n", - int(type), + static_cast(type), static_cast(number)); env_->DeleteFile(dbname_ + "/" + filenames[i]); } @@ -575,13 +580,14 @@ void DBImpl::CompactRange(const Slice* begin, const Slice* end) { } } } - TEST_CompactMemTable(); // TODO(sanjay): Skip if memtable does not overlap + TEST_CompactMemTable(); // TODO(sanjay): Skip if memtable does not overlap for (int level = 0; level < max_level_with_files; level++) { TEST_CompactRange(level, begin, end); } } -void DBImpl::TEST_CompactRange(int level, const Slice* begin,const Slice* end) { +void DBImpl::TEST_CompactRange(int level, const Slice* begin, + const Slice* end) { assert(level >= 0); assert(level + 1 < config::kNumLevels); @@ -609,7 +615,7 @@ void DBImpl::TEST_CompactRange(int level, const Slice* begin,const Slice* end) { manual_compaction_ = &manual; MaybeScheduleCompaction(); } else { // Running either my compaction or another compaction. - bg_cv_.Wait(); + background_work_finished_signal_.Wait(); } } if (manual_compaction_ == &manual) { @@ -625,7 +631,7 @@ Status DBImpl::TEST_CompactMemTable() { // Wait until the compaction completes MutexLock l(&mutex_); while (imm_ != NULL && bg_error_.ok()) { - bg_cv_.Wait(); + background_work_finished_signal_.Wait(); } if (imm_ != NULL) { s = bg_error_; @@ -638,13 +644,13 @@ void DBImpl::RecordBackgroundError(const Status& s) { mutex_.AssertHeld(); if (bg_error_.ok()) { bg_error_ = s; - bg_cv_.SignalAll(); + background_work_finished_signal_.SignalAll(); } } void DBImpl::MaybeScheduleCompaction() { mutex_.AssertHeld(); - if (bg_compaction_scheduled_) { + if (background_compaction_scheduled_) { // Already scheduled } else if (shutting_down_.Acquire_Load()) { // DB is being deleted; no more background compactions @@ -655,7 +661,7 @@ void DBImpl::MaybeScheduleCompaction() { !versions_->NeedsCompaction()) { // No work to be done } else { - bg_compaction_scheduled_ = true; + background_compaction_scheduled_ = true; env_->Schedule(&DBImpl::BGWork, this); } } @@ -666,7 +672,7 @@ void DBImpl::BGWork(void* db) { void DBImpl::BackgroundCall() { MutexLock l(&mutex_); - assert(bg_compaction_scheduled_); + assert(background_compaction_scheduled_); if (shutting_down_.Acquire_Load()) { // No more background work when shutting down. } else if (!bg_error_.ok()) { @@ -675,12 +681,12 @@ void DBImpl::BackgroundCall() { BackgroundCompaction(); } - bg_compaction_scheduled_ = false; + background_compaction_scheduled_ = false; // Previous compaction may have produced too many files in a level, // so reschedule another compaction if needed. MaybeScheduleCompaction(); - bg_cv_.SignalAll(); + background_work_finished_signal_.SignalAll(); } void DBImpl::BackgroundCompaction() { @@ -920,7 +926,8 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { mutex_.Lock(); if (imm_ != NULL) { CompactMemTable(); - bg_cv_.SignalAll(); // Wakeup MakeRoomForWrite() if necessary + // Wake up MakeRoomForWrite() if necessary. + background_work_finished_signal_.SignalAll(); } mutex_.Unlock(); imm_micros += (env_->NowMicros() - imm_start); @@ -1267,6 +1274,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { // REQUIRES: Writer list must be non-empty // REQUIRES: First writer must have a non-NULL batch WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) { + mutex_.AssertHeld(); assert(!writers_.empty()); Writer* first = writers_.front(); WriteBatch* result = first->batch; @@ -1346,11 +1354,11 @@ Status DBImpl::MakeRoomForWrite(bool force) { // We have filled up the current memtable, but the previous // one is still being compacted, so we wait. Log(options_.info_log, "Current memtable full; waiting...\n"); - bg_cv_.Wait(); + background_work_finished_signal_.Wait(); } else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) { // There are too many level-0 files. Log(options_.info_log, "Too many L0 files; waiting...\n"); - bg_cv_.Wait(); + background_work_finished_signal_.Wait(); } else { // Attempt to switch to a new memtable and trigger compaction of old assert(versions_->PrevLogNumber() == 0); diff --git a/db/db_impl.h b/db/db_impl.h index 3861b86..6344112 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -84,7 +84,7 @@ class DBImpl : public DB { void MaybeIgnoreError(Status* s) const; // Delete any unneeded files and stale in-memory entries. - void DeleteObsoleteFiles(); + void DeleteObsoleteFiles() EXCLUSIVE_LOCKS_REQUIRED(mutex_); // Compact the in-memory write buffer to disk. Switches to a new // log-file/memtable and writes a new descriptor iff successful. @@ -100,14 +100,15 @@ class DBImpl : public DB { Status MakeRoomForWrite(bool force /* compact even if there is room? */) EXCLUSIVE_LOCKS_REQUIRED(mutex_); - WriteBatch* BuildBatchGroup(Writer** last_writer); + WriteBatch* BuildBatchGroup(Writer** last_writer) + EXCLUSIVE_LOCKS_REQUIRED(mutex_); void RecordBackgroundError(const Status& s); void MaybeScheduleCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_); static void BGWork(void* db); void BackgroundCall(); - void BackgroundCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_); + void BackgroundCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_); void CleanupCompaction(CompactionState* compact) EXCLUSIVE_LOCKS_REQUIRED(mutex_); Status DoCompactionWork(CompactionState* compact) @@ -123,12 +124,12 @@ class DBImpl : public DB { const InternalKeyComparator internal_comparator_; const InternalFilterPolicy internal_filter_policy_; const Options options_; // options_.comparator == &internal_comparator_ - bool owns_info_log_; - bool owns_cache_; + const bool owns_info_log_; + const bool owns_cache_; const std::string dbname_; // table_cache_ provides its own synchronization - TableCache* table_cache_; + TableCache* const table_cache_; // Lock over the persistent DB state. Non-NULL iff successfully acquired. FileLock* db_lock_; @@ -136,27 +137,27 @@ class DBImpl : public DB { // State below is protected by mutex_ port::Mutex mutex_; port::AtomicPointer shutting_down_; - port::CondVar bg_cv_; // Signalled when background work finishes + port::CondVar background_work_finished_signal_ GUARDED_BY(mutex_); MemTable* mem_; - MemTable* imm_; // Memtable being compacted - port::AtomicPointer has_imm_; // So bg thread can detect non-NULL imm_ + MemTable* imm_ GUARDED_BY(mutex_); // Memtable being compacted + port::AtomicPointer has_imm_; // So bg thread can detect non-NULL imm_ WritableFile* logfile_; - uint64_t logfile_number_; + uint64_t logfile_number_ GUARDED_BY(mutex_); log::Writer* log_; - uint32_t seed_; // For sampling. + uint32_t seed_ GUARDED_BY(mutex_); // For sampling. // Queue of writers. - std::deque writers_; - WriteBatch* tmp_batch_; + std::deque writers_ GUARDED_BY(mutex_); + WriteBatch* tmp_batch_ GUARDED_BY(mutex_); - SnapshotList snapshots_; + SnapshotList snapshots_ GUARDED_BY(mutex_); // Set of table files to protect from deletion because they are // part of ongoing compactions. - std::set pending_outputs_; + std::set pending_outputs_ GUARDED_BY(mutex_); // Has a background compaction been scheduled or is running? - bool bg_compaction_scheduled_; + bool background_compaction_scheduled_ GUARDED_BY(mutex_); // Information for a manual compaction struct ManualCompaction { @@ -166,12 +167,12 @@ class DBImpl : public DB { const InternalKey* end; // NULL means end of key range InternalKey tmp_storage; // Used to keep track of compaction progress }; - ManualCompaction* manual_compaction_; + ManualCompaction* manual_compaction_ GUARDED_BY(mutex_); - VersionSet* versions_; + VersionSet* const versions_; // Have we encountered a background error in paranoid mode? - Status bg_error_; + Status bg_error_ GUARDED_BY(mutex_); // Per level compaction stats. stats_[level] stores the stats for // compactions that produced data for the specified "level". @@ -188,7 +189,7 @@ class DBImpl : public DB { this->bytes_written += c.bytes_written; } }; - CompactionStats stats_[config::kNumLevels]; + CompactionStats stats_[config::kNumLevels] GUARDED_BY(mutex_); // No copying allowed DBImpl(const DBImpl&); diff --git a/db/memtable.h b/db/memtable.h index 9f41567..f2a6736 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -14,7 +14,6 @@ namespace leveldb { class InternalKeyComparator; -class Mutex; class MemTableIterator; class MemTable { diff --git a/db/repair.cc b/db/repair.cc index 4cd4bb0..c10da82 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -54,7 +54,7 @@ class Repairer { owns_cache_(options_.block_cache != options.block_cache), next_file_number_(1) { // TableCache can be small since we expect each table to be opened once. - table_cache_ = new TableCache(dbname_, &options_, 10); + table_cache_ = new TableCache(dbname_, options_, 10); } ~Repairer() { diff --git a/db/table_cache.cc b/db/table_cache.cc index e3d82cd..6cf005b 100644 --- a/db/table_cache.cc +++ b/db/table_cache.cc @@ -30,9 +30,9 @@ static void UnrefEntry(void* arg1, void* arg2) { } TableCache::TableCache(const std::string& dbname, - const Options* options, + const Options& options, int entries) - : env_(options->env), + : env_(options.env), dbname_(dbname), options_(options), cache_(NewLRUCache(entries)) { @@ -61,7 +61,7 @@ Status TableCache::FindTable(uint64_t file_number, uint64_t file_size, } } if (s.ok()) { - s = Table::Open(*options_, file, file_size, &table); + s = Table::Open(options_, file, file_size, &table); } if (!s.ok()) { diff --git a/db/table_cache.h b/db/table_cache.h index 8cf4aaf..e9191dc 100644 --- a/db/table_cache.h +++ b/db/table_cache.h @@ -20,7 +20,7 @@ class Env; class TableCache { public: - TableCache(const std::string& dbname, const Options* options, int entries); + TableCache(const std::string& dbname, const Options& options, int entries); ~TableCache(); // Return an iterator for the specified file number (the corresponding @@ -50,7 +50,7 @@ class TableCache { private: Env* const env_; const std::string dbname_; - const Options* options_; + const Options& options_; Cache* cache_; Status FindTable(uint64_t file_number, uint64_t file_size, Cache::Handle**); diff --git a/helpers/memenv/memenv.cc b/helpers/memenv/memenv.cc index 9a98884..ee7abd4 100644 --- a/helpers/memenv/memenv.cc +++ b/helpers/memenv/memenv.cc @@ -4,14 +4,17 @@ #include "helpers/memenv/memenv.h" +#include + +#include +#include +#include + #include "leveldb/env.h" #include "leveldb/status.h" #include "port/port.h" +#include "port/thread_annotations.h" #include "util/mutexlock.h" -#include -#include -#include -#include namespace leveldb { @@ -135,7 +138,7 @@ class FileState { void operator=(const FileState&); port::Mutex refs_mutex_; - int refs_; // Protected by refs_mutex_; + int refs_ GUARDED_BY(refs_mutex_); // The following fields are not protected by any mutex. They are only mutable // while the file is being written, and concurrent access is not allowed @@ -312,7 +315,8 @@ class InMemoryEnv : public EnvWrapper { return Status::OK(); } - void DeleteFileInternal(const std::string& fname) { + void DeleteFileInternal(const std::string& fname) + EXCLUSIVE_LOCKS_REQUIRED(mutex_) { if (file_map_.find(fname) == file_map_.end()) { return; } @@ -386,7 +390,7 @@ class InMemoryEnv : public EnvWrapper { // Map from filenames to FileState objects, representing a simple file system. typedef std::map FileSystem; port::Mutex mutex_; - FileSystem file_map_; // Protected by mutex_. + FileSystem file_map_ GUARDED_BY(mutex_); }; } // namespace diff --git a/util/cache.cc b/util/cache.cc index bd914ae..10b7103 100644 --- a/util/cache.cc +++ b/util/cache.cc @@ -8,6 +8,7 @@ #include "leveldb/cache.h" #include "port/port.h" +#include "port/thread_annotations.h" #include "util/hash.h" #include "util/mutexlock.h" @@ -174,25 +175,25 @@ class LRUCache { void LRU_Append(LRUHandle*list, LRUHandle* e); void Ref(LRUHandle* e); void Unref(LRUHandle* e); - bool FinishErase(LRUHandle* e); + bool FinishErase(LRUHandle* e) EXCLUSIVE_LOCKS_REQUIRED(mutex_); // Initialized before use. size_t capacity_; // mutex_ protects the following state. mutable port::Mutex mutex_; - size_t usage_; + size_t usage_ GUARDED_BY(mutex_); // Dummy head of LRU list. // lru.prev is newest entry, lru.next is oldest entry. // Entries have refs==1 and in_cache==true. - LRUHandle lru_; + LRUHandle lru_ GUARDED_BY(mutex_); // Dummy head of in-use list. // Entries are in use by clients, and have refs >= 2 and in_cache==true. - LRUHandle in_use_; + LRUHandle in_use_ GUARDED_BY(mutex_); - HandleTable table_; + HandleTable table_ GUARDED_BY(mutex_); }; LRUCache::LRUCache() @@ -227,11 +228,12 @@ void LRUCache::Ref(LRUHandle* e) { void LRUCache::Unref(LRUHandle* e) { assert(e->refs > 0); e->refs--; - if (e->refs == 0) { // Deallocate. + if (e->refs == 0) { // Deallocate. assert(!e->in_cache); (*e->deleter)(e->key(), e->value); free(e); - } else if (e->in_cache && e->refs == 1) { // No longer in use; move to lru_ list. + } else if (e->in_cache && e->refs == 1) { + // No longer in use; move to lru_ list. LRU_Remove(e); LRU_Append(&lru_, e); }