diff --git a/db/db_bench.cc b/db/db_bench.cc index 920f119..701b128 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -282,8 +282,8 @@ class Stats { // State shared by all concurrent executions of the same benchmark. struct SharedState { port::Mutex mu; - port::CondVar cv; - int total; + port::CondVar cv GUARDED_BY(mu); + int total GUARDED_BY(mu); // Each thread goes through the following states: // (1) initializing @@ -291,11 +291,12 @@ struct SharedState { // (3) running // (4) done - int num_initialized; - int num_done; - bool start; + int num_initialized GUARDED_BY(mu); + int num_done GUARDED_BY(mu); + bool start GUARDED_BY(mu); - SharedState() : cv(&mu) { } + SharedState(int total) + : cv(&mu), total(total), num_initialized(0), num_done(0), start(false) { } }; // Per-thread state for concurrent executions of the same benchmark. @@ -584,11 +585,7 @@ class Benchmark { void RunBenchmark(int n, Slice name, void (Benchmark::*method)(ThreadState*)) { - SharedState shared; - shared.total = n; - shared.num_initialized = 0; - shared.num_done = 0; - shared.start = false; + SharedState shared(n); ThreadArg* arg = new ThreadArg[n]; for (int i = 0; i < n; i++) { diff --git a/db/db_impl.cc b/db/db_impl.cc index 8484e46..cd8792e 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1053,11 +1053,15 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { } namespace { + struct IterState { - port::Mutex* mu; - Version* version; - MemTable* mem; - MemTable* imm; + port::Mutex* const mu; + Version* const version GUARDED_BY(mu); + MemTable* const mem GUARDED_BY(mu); + MemTable* const imm GUARDED_BY(mu); + + IterState(port::Mutex* mutex, MemTable* mem, MemTable* imm, Version* version) + : mu(mutex), version(version), mem(mem), imm(imm) { } }; static void CleanupIteratorState(void* arg1, void* arg2) { @@ -1069,12 +1073,12 @@ static void CleanupIteratorState(void* arg1, void* arg2) { state->mu->Unlock(); delete state; } -} // namespace + +} // anonymous namespace Iterator* DBImpl::NewInternalIterator(const ReadOptions& options, SequenceNumber* latest_snapshot, uint32_t* seed) { - IterState* cleanup = new IterState; mutex_.Lock(); *latest_snapshot = versions_->LastSequence(); @@ -1091,10 +1095,7 @@ Iterator* DBImpl::NewInternalIterator(const ReadOptions& options, NewMergingIterator(&internal_comparator_, &list[0], list.size()); versions_->current()->Ref(); - cleanup->mu = &mutex_; - cleanup->mem = mem_; - cleanup->imm = imm_; - cleanup->version = versions_->current(); + IterState* cleanup = new IterState(&mutex_, mem_, imm_, versions_->current()); internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, NULL); *seed = ++seed_; diff --git a/db/db_test.cc b/db/db_test.cc index c818113..b1d2cd8 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -11,6 +11,8 @@ #include "leveldb/cache.h" #include "leveldb/env.h" #include "leveldb/table.h" +#include "port/port.h" +#include "port/thread_annotations.h" #include "util/hash.h" #include "util/logging.h" #include "util/mutexlock.h" @@ -36,21 +38,21 @@ namespace { class AtomicCounter { private: port::Mutex mu_; - int count_; + int count_ GUARDED_BY(mu_); public: AtomicCounter() : count_(0) { } void Increment() { IncrementBy(1); } - void IncrementBy(int count) { + void IncrementBy(int count) LOCKS_EXCLUDED(mu_) { MutexLock l(&mu_); count_ += count; } - int Read() { + int Read() LOCKS_EXCLUDED(mu_) { MutexLock l(&mu_); return count_; } - void Reset() { + void Reset() LOCKS_EXCLUDED(mu_) { MutexLock l(&mu_); count_ = 0; } diff --git a/db/fault_injection_test.cc b/db/fault_injection_test.cc index f8b2440..caead37 100644 --- a/db/fault_injection_test.cc +++ b/db/fault_injection_test.cc @@ -6,10 +6,10 @@ // the last "sync". It then checks for data loss errors by purposely dropping // file data (or entire files) not protected by a "sync". -#include "leveldb/db.h" - #include #include + +#include "leveldb/db.h" #include "db/db_impl.h" #include "db/filename.h" #include "db/log_format.h" @@ -18,6 +18,8 @@ #include "leveldb/env.h" #include "leveldb/table.h" #include "leveldb/write_batch.h" +#include "port/port.h" +#include "port/thread_annotations.h" #include "util/logging.h" #include "util/mutexlock.h" #include "util/testharness.h" @@ -126,7 +128,8 @@ class TestWritableFile : public WritableFile { class FaultInjectionTestEnv : public EnvWrapper { public: - FaultInjectionTestEnv() : EnvWrapper(Env::Default()), filesystem_active_(true) {} + FaultInjectionTestEnv() + : EnvWrapper(Env::Default()), filesystem_active_(true) {} virtual ~FaultInjectionTestEnv() { } virtual Status NewWritableFile(const std::string& fname, WritableFile** result); @@ -146,14 +149,20 @@ class FaultInjectionTestEnv : public EnvWrapper { // system reset. Setting to inactive will freeze our saved filesystem state so // that it will stop being recorded. It can then be reset back to the state at // the time of the reset. - bool IsFilesystemActive() const { return filesystem_active_; } - void SetFilesystemActive(bool active) { filesystem_active_ = active; } + bool IsFilesystemActive() LOCKS_EXCLUDED(mutex_) { + MutexLock l(&mutex_); + return filesystem_active_; + } + void SetFilesystemActive(bool active) LOCKS_EXCLUDED(mutex_) { + MutexLock l(&mutex_); + filesystem_active_ = active; + } private: port::Mutex mutex_; - std::map db_file_state_; - std::set new_files_since_last_dir_sync_; - bool filesystem_active_; // Record flushes, syncs, writes + std::map db_file_state_ GUARDED_BY(mutex_); + std::set new_files_since_last_dir_sync_ GUARDED_BY(mutex_); + bool filesystem_active_ GUARDED_BY(mutex_); // Record flushes, syncs, writes }; TestWritableFile::TestWritableFile(const FileState& state, @@ -328,7 +337,6 @@ void FaultInjectionTestEnv::ResetState() { // Since we are not destroying the database, the existing files // should keep their recorded synced/flushed state. Therefore // we do not reset db_file_state_ and new_files_since_last_dir_sync_. - MutexLock l(&mutex_); SetFilesystemActive(true); } diff --git a/db/skiplist_test.cc b/db/skiplist_test.cc index aee1461..90f9d0e 100644 --- a/db/skiplist_test.cc +++ b/db/skiplist_test.cc @@ -5,6 +5,8 @@ #include "db/skiplist.h" #include #include "leveldb/env.h" +#include "port/port.h" +#include "port/thread_annotations.h" #include "util/arena.h" #include "util/hash.h" #include "util/random.h" @@ -312,7 +314,7 @@ class TestState { state_(STARTING), state_cv_(&mu_) {} - void Wait(ReaderState s) { + void Wait(ReaderState s) LOCKS_EXCLUDED(mu_) { mu_.Lock(); while (state_ != s) { state_cv_.Wait(); @@ -320,7 +322,7 @@ class TestState { mu_.Unlock(); } - void Change(ReaderState s) { + void Change(ReaderState s) LOCKS_EXCLUDED(mu_) { mu_.Lock(); state_ = s; state_cv_.Signal(); @@ -329,8 +331,8 @@ class TestState { private: port::Mutex mu_; - ReaderState state_; - port::CondVar state_cv_; + ReaderState state_ GUARDED_BY(mu_); + port::CondVar state_cv_ GUARDED_BY(mu_); }; static void ConcurrentReader(void* arg) { diff --git a/util/env_posix.cc b/util/env_posix.cc index 8327978..4bfaf6c 100644 --- a/util/env_posix.cc +++ b/util/env_posix.cc @@ -22,6 +22,7 @@ #include "leveldb/env.h" #include "leveldb/slice.h" #include "port/port.h" +#include "port/thread_annotations.h" #include "util/logging.h" #include "util/mutexlock.h" #include "util/posix_logger.h" @@ -57,7 +58,7 @@ class Limiter { // If another resource is available, acquire it and return true. // Else return false. - bool Acquire() { + bool Acquire() LOCKS_EXCLUDED(mu_) { if (GetAllowed() <= 0) { return false; } @@ -73,7 +74,7 @@ class Limiter { // Release a resource acquired by a previous call to Acquire() that returned // true. - void Release() { + void Release() LOCKS_EXCLUDED(mu_) { MutexLock l(&mu_); SetAllowed(GetAllowed() + 1); } @@ -86,8 +87,7 @@ class Limiter { return reinterpret_cast(allowed_.Acquire_Load()); } - // REQUIRES: mu_ must be held - void SetAllowed(intptr_t v) { + void SetAllowed(intptr_t v) EXCLUSIVE_LOCKS_REQUIRED(mu_) { allowed_.Release_Store(reinterpret_cast(v)); } @@ -365,13 +365,13 @@ class PosixFileLock : public FileLock { class PosixLockTable { private: port::Mutex mu_; - std::set locked_files_; + std::set locked_files_ GUARDED_BY(mu_); public: - bool Insert(const std::string& fname) { + bool Insert(const std::string& fname) LOCKS_EXCLUDED(mu_) { MutexLock l(&mu_); return locked_files_.insert(fname).second; } - void Remove(const std::string& fname) { + void Remove(const std::string& fname) LOCKS_EXCLUDED(mu_) { MutexLock l(&mu_); locked_files_.erase(fname); } diff --git a/util/env_test.cc b/util/env_test.cc index 0bf7121..fd89b4c 100644 --- a/util/env_test.cc +++ b/util/env_test.cc @@ -7,6 +7,8 @@ #include #include "port/port.h" +#include "port/thread_annotations.h" +#include "util/mutexlock.h" #include "util/testharness.h" #include "util/testutil.h" @@ -17,10 +19,6 @@ static const int kReadOnlyFileLimit = 4; static const int kMMapLimit = 4; class EnvTest { - private: - port::Mutex mu_; - std::string events_; - public: Env* env_; EnvTest() : env_(Env::Default()) { } @@ -119,8 +117,10 @@ TEST(EnvTest, RunMany) { struct State { port::Mutex mu; - int val; - int num_running; + int val GUARDED_BY(mu); + int num_running GUARDED_BY(mu); + + State(int val, int num_running) : val(val), num_running(num_running) { } }; static void ThreadBody(void* arg) { @@ -132,9 +132,7 @@ static void ThreadBody(void* arg) { } TEST(EnvTest, StartThread) { - State state; - state.val = 0; - state.num_running = 3; + State state(0, 3); for (int i = 0; i < 3; i++) { env_->StartThread(&ThreadBody, &state); } @@ -147,6 +145,8 @@ TEST(EnvTest, StartThread) { } env_->SleepForMicroseconds(kDelayMicros); } + + MutexLock l(&state.mu); ASSERT_EQ(state.val, 3); }