From 0db30413a4cfa8c980e675ba5cb96717d688af92 Mon Sep 17 00:00:00 2001 From: costan Date: Fri, 23 Mar 2018 12:50:14 -0700 Subject: [PATCH] leveldb: Add more thread safety annotations. After this CL, all classes with Mutex members should be covered by annotations. Exceptions are atomic members, which shouldn't need locking, and DBImpl members that cause errors when annotated, which will be tackled separately. ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=190260865 --- db/db_bench.cc | 19 ++++++++----------- db/db_impl.cc | 21 +++++++++++---------- db/db_test.cc | 10 ++++++---- db/fault_injection_test.cc | 26 +++++++++++++++++--------- db/skiplist_test.cc | 10 ++++++---- util/env_posix.cc | 14 +++++++------- util/env_test.cc | 18 +++++++++--------- 7 files changed, 64 insertions(+), 54 deletions(-) diff --git a/db/db_bench.cc b/db/db_bench.cc index 920f119..701b128 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -282,8 +282,8 @@ class Stats { // State shared by all concurrent executions of the same benchmark. struct SharedState { port::Mutex mu; - port::CondVar cv; - int total; + port::CondVar cv GUARDED_BY(mu); + int total GUARDED_BY(mu); // Each thread goes through the following states: // (1) initializing @@ -291,11 +291,12 @@ struct SharedState { // (3) running // (4) done - int num_initialized; - int num_done; - bool start; + int num_initialized GUARDED_BY(mu); + int num_done GUARDED_BY(mu); + bool start GUARDED_BY(mu); - SharedState() : cv(&mu) { } + SharedState(int total) + : cv(&mu), total(total), num_initialized(0), num_done(0), start(false) { } }; // Per-thread state for concurrent executions of the same benchmark. @@ -584,11 +585,7 @@ class Benchmark { void RunBenchmark(int n, Slice name, void (Benchmark::*method)(ThreadState*)) { - SharedState shared; - shared.total = n; - shared.num_initialized = 0; - shared.num_done = 0; - shared.start = false; + SharedState shared(n); ThreadArg* arg = new ThreadArg[n]; for (int i = 0; i < n; i++) { diff --git a/db/db_impl.cc b/db/db_impl.cc index 8484e46..cd8792e 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1053,11 +1053,15 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { } namespace { + struct IterState { - port::Mutex* mu; - Version* version; - MemTable* mem; - MemTable* imm; + port::Mutex* const mu; + Version* const version GUARDED_BY(mu); + MemTable* const mem GUARDED_BY(mu); + MemTable* const imm GUARDED_BY(mu); + + IterState(port::Mutex* mutex, MemTable* mem, MemTable* imm, Version* version) + : mu(mutex), version(version), mem(mem), imm(imm) { } }; static void CleanupIteratorState(void* arg1, void* arg2) { @@ -1069,12 +1073,12 @@ static void CleanupIteratorState(void* arg1, void* arg2) { state->mu->Unlock(); delete state; } -} // namespace + +} // anonymous namespace Iterator* DBImpl::NewInternalIterator(const ReadOptions& options, SequenceNumber* latest_snapshot, uint32_t* seed) { - IterState* cleanup = new IterState; mutex_.Lock(); *latest_snapshot = versions_->LastSequence(); @@ -1091,10 +1095,7 @@ Iterator* DBImpl::NewInternalIterator(const ReadOptions& options, NewMergingIterator(&internal_comparator_, &list[0], list.size()); versions_->current()->Ref(); - cleanup->mu = &mutex_; - cleanup->mem = mem_; - cleanup->imm = imm_; - cleanup->version = versions_->current(); + IterState* cleanup = new IterState(&mutex_, mem_, imm_, versions_->current()); internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, NULL); *seed = ++seed_; diff --git a/db/db_test.cc b/db/db_test.cc index c818113..b1d2cd8 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -11,6 +11,8 @@ #include "leveldb/cache.h" #include "leveldb/env.h" #include "leveldb/table.h" +#include "port/port.h" +#include "port/thread_annotations.h" #include "util/hash.h" #include "util/logging.h" #include "util/mutexlock.h" @@ -36,21 +38,21 @@ namespace { class AtomicCounter { private: port::Mutex mu_; - int count_; + int count_ GUARDED_BY(mu_); public: AtomicCounter() : count_(0) { } void Increment() { IncrementBy(1); } - void IncrementBy(int count) { + void IncrementBy(int count) LOCKS_EXCLUDED(mu_) { MutexLock l(&mu_); count_ += count; } - int Read() { + int Read() LOCKS_EXCLUDED(mu_) { MutexLock l(&mu_); return count_; } - void Reset() { + void Reset() LOCKS_EXCLUDED(mu_) { MutexLock l(&mu_); count_ = 0; } diff --git a/db/fault_injection_test.cc b/db/fault_injection_test.cc index f8b2440..caead37 100644 --- a/db/fault_injection_test.cc +++ b/db/fault_injection_test.cc @@ -6,10 +6,10 @@ // the last "sync". It then checks for data loss errors by purposely dropping // file data (or entire files) not protected by a "sync". -#include "leveldb/db.h" - #include #include + +#include "leveldb/db.h" #include "db/db_impl.h" #include "db/filename.h" #include "db/log_format.h" @@ -18,6 +18,8 @@ #include "leveldb/env.h" #include "leveldb/table.h" #include "leveldb/write_batch.h" +#include "port/port.h" +#include "port/thread_annotations.h" #include "util/logging.h" #include "util/mutexlock.h" #include "util/testharness.h" @@ -126,7 +128,8 @@ class TestWritableFile : public WritableFile { class FaultInjectionTestEnv : public EnvWrapper { public: - FaultInjectionTestEnv() : EnvWrapper(Env::Default()), filesystem_active_(true) {} + FaultInjectionTestEnv() + : EnvWrapper(Env::Default()), filesystem_active_(true) {} virtual ~FaultInjectionTestEnv() { } virtual Status NewWritableFile(const std::string& fname, WritableFile** result); @@ -146,14 +149,20 @@ class FaultInjectionTestEnv : public EnvWrapper { // system reset. Setting to inactive will freeze our saved filesystem state so // that it will stop being recorded. It can then be reset back to the state at // the time of the reset. - bool IsFilesystemActive() const { return filesystem_active_; } - void SetFilesystemActive(bool active) { filesystem_active_ = active; } + bool IsFilesystemActive() LOCKS_EXCLUDED(mutex_) { + MutexLock l(&mutex_); + return filesystem_active_; + } + void SetFilesystemActive(bool active) LOCKS_EXCLUDED(mutex_) { + MutexLock l(&mutex_); + filesystem_active_ = active; + } private: port::Mutex mutex_; - std::map db_file_state_; - std::set new_files_since_last_dir_sync_; - bool filesystem_active_; // Record flushes, syncs, writes + std::map db_file_state_ GUARDED_BY(mutex_); + std::set new_files_since_last_dir_sync_ GUARDED_BY(mutex_); + bool filesystem_active_ GUARDED_BY(mutex_); // Record flushes, syncs, writes }; TestWritableFile::TestWritableFile(const FileState& state, @@ -328,7 +337,6 @@ void FaultInjectionTestEnv::ResetState() { // Since we are not destroying the database, the existing files // should keep their recorded synced/flushed state. Therefore // we do not reset db_file_state_ and new_files_since_last_dir_sync_. - MutexLock l(&mutex_); SetFilesystemActive(true); } diff --git a/db/skiplist_test.cc b/db/skiplist_test.cc index aee1461..90f9d0e 100644 --- a/db/skiplist_test.cc +++ b/db/skiplist_test.cc @@ -5,6 +5,8 @@ #include "db/skiplist.h" #include #include "leveldb/env.h" +#include "port/port.h" +#include "port/thread_annotations.h" #include "util/arena.h" #include "util/hash.h" #include "util/random.h" @@ -312,7 +314,7 @@ class TestState { state_(STARTING), state_cv_(&mu_) {} - void Wait(ReaderState s) { + void Wait(ReaderState s) LOCKS_EXCLUDED(mu_) { mu_.Lock(); while (state_ != s) { state_cv_.Wait(); @@ -320,7 +322,7 @@ class TestState { mu_.Unlock(); } - void Change(ReaderState s) { + void Change(ReaderState s) LOCKS_EXCLUDED(mu_) { mu_.Lock(); state_ = s; state_cv_.Signal(); @@ -329,8 +331,8 @@ class TestState { private: port::Mutex mu_; - ReaderState state_; - port::CondVar state_cv_; + ReaderState state_ GUARDED_BY(mu_); + port::CondVar state_cv_ GUARDED_BY(mu_); }; static void ConcurrentReader(void* arg) { diff --git a/util/env_posix.cc b/util/env_posix.cc index 8327978..4bfaf6c 100644 --- a/util/env_posix.cc +++ b/util/env_posix.cc @@ -22,6 +22,7 @@ #include "leveldb/env.h" #include "leveldb/slice.h" #include "port/port.h" +#include "port/thread_annotations.h" #include "util/logging.h" #include "util/mutexlock.h" #include "util/posix_logger.h" @@ -57,7 +58,7 @@ class Limiter { // If another resource is available, acquire it and return true. // Else return false. - bool Acquire() { + bool Acquire() LOCKS_EXCLUDED(mu_) { if (GetAllowed() <= 0) { return false; } @@ -73,7 +74,7 @@ class Limiter { // Release a resource acquired by a previous call to Acquire() that returned // true. - void Release() { + void Release() LOCKS_EXCLUDED(mu_) { MutexLock l(&mu_); SetAllowed(GetAllowed() + 1); } @@ -86,8 +87,7 @@ class Limiter { return reinterpret_cast(allowed_.Acquire_Load()); } - // REQUIRES: mu_ must be held - void SetAllowed(intptr_t v) { + void SetAllowed(intptr_t v) EXCLUSIVE_LOCKS_REQUIRED(mu_) { allowed_.Release_Store(reinterpret_cast(v)); } @@ -365,13 +365,13 @@ class PosixFileLock : public FileLock { class PosixLockTable { private: port::Mutex mu_; - std::set locked_files_; + std::set locked_files_ GUARDED_BY(mu_); public: - bool Insert(const std::string& fname) { + bool Insert(const std::string& fname) LOCKS_EXCLUDED(mu_) { MutexLock l(&mu_); return locked_files_.insert(fname).second; } - void Remove(const std::string& fname) { + void Remove(const std::string& fname) LOCKS_EXCLUDED(mu_) { MutexLock l(&mu_); locked_files_.erase(fname); } diff --git a/util/env_test.cc b/util/env_test.cc index 0bf7121..fd89b4c 100644 --- a/util/env_test.cc +++ b/util/env_test.cc @@ -7,6 +7,8 @@ #include #include "port/port.h" +#include "port/thread_annotations.h" +#include "util/mutexlock.h" #include "util/testharness.h" #include "util/testutil.h" @@ -17,10 +19,6 @@ static const int kReadOnlyFileLimit = 4; static const int kMMapLimit = 4; class EnvTest { - private: - port::Mutex mu_; - std::string events_; - public: Env* env_; EnvTest() : env_(Env::Default()) { } @@ -119,8 +117,10 @@ TEST(EnvTest, RunMany) { struct State { port::Mutex mu; - int val; - int num_running; + int val GUARDED_BY(mu); + int num_running GUARDED_BY(mu); + + State(int val, int num_running) : val(val), num_running(num_running) { } }; static void ThreadBody(void* arg) { @@ -132,9 +132,7 @@ static void ThreadBody(void* arg) { } TEST(EnvTest, StartThread) { - State state; - state.val = 0; - state.num_running = 3; + State state(0, 3); for (int i = 0; i < 3; i++) { env_->StartThread(&ThreadBody, &state); } @@ -147,6 +145,8 @@ TEST(EnvTest, StartThread) { } env_->SleepForMicroseconds(kDelayMicros); } + + MutexLock l(&state.mu); ASSERT_EQ(state.val, 3); }