Explorar el Código

leveldb: Add more thread safety annotations.

After this CL, all classes with Mutex members should be covered by annotations. Exceptions are atomic members, which shouldn't need locking, and DBImpl members that cause errors when annotated, which will be tackled separately.

-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=190260865
master
costan hace 6 años
cometido por Victor Costan
padre
commit
0db30413a4
Se han modificado 7 ficheros con 64 adiciones y 54 borrados
  1. +8
    -11
      db/db_bench.cc
  2. +11
    -10
      db/db_impl.cc
  3. +6
    -4
      db/db_test.cc
  4. +17
    -9
      db/fault_injection_test.cc
  5. +6
    -4
      db/skiplist_test.cc
  6. +7
    -7
      util/env_posix.cc
  7. +9
    -9
      util/env_test.cc

+ 8
- 11
db/db_bench.cc Ver fichero

@ -282,8 +282,8 @@ class Stats {
// State shared by all concurrent executions of the same benchmark. // State shared by all concurrent executions of the same benchmark.
struct SharedState { struct SharedState {
port::Mutex mu; port::Mutex mu;
port::CondVar cv;
int total;
port::CondVar cv GUARDED_BY(mu);
int total GUARDED_BY(mu);
// Each thread goes through the following states: // Each thread goes through the following states:
// (1) initializing // (1) initializing
@ -291,11 +291,12 @@ struct SharedState {
// (3) running // (3) running
// (4) done // (4) done
int num_initialized;
int num_done;
bool start;
int num_initialized GUARDED_BY(mu);
int num_done GUARDED_BY(mu);
bool start GUARDED_BY(mu);
SharedState() : cv(&mu) { }
SharedState(int total)
: cv(&mu), total(total), num_initialized(0), num_done(0), start(false) { }
}; };
// Per-thread state for concurrent executions of the same benchmark. // Per-thread state for concurrent executions of the same benchmark.
@ -584,11 +585,7 @@ class Benchmark {
void RunBenchmark(int n, Slice name, void RunBenchmark(int n, Slice name,
void (Benchmark::*method)(ThreadState*)) { void (Benchmark::*method)(ThreadState*)) {
SharedState shared;
shared.total = n;
shared.num_initialized = 0;
shared.num_done = 0;
shared.start = false;
SharedState shared(n);
ThreadArg* arg = new ThreadArg[n]; ThreadArg* arg = new ThreadArg[n];
for (int i = 0; i < n; i++) { for (int i = 0; i < n; i++) {

+ 11
- 10
db/db_impl.cc Ver fichero

@ -1053,11 +1053,15 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
} }
namespace { namespace {
struct IterState { struct IterState {
port::Mutex* mu;
Version* version;
MemTable* mem;
MemTable* imm;
port::Mutex* const mu;
Version* const version GUARDED_BY(mu);
MemTable* const mem GUARDED_BY(mu);
MemTable* const imm GUARDED_BY(mu);
IterState(port::Mutex* mutex, MemTable* mem, MemTable* imm, Version* version)
: mu(mutex), version(version), mem(mem), imm(imm) { }
}; };
static void CleanupIteratorState(void* arg1, void* arg2) { static void CleanupIteratorState(void* arg1, void* arg2) {
@ -1069,12 +1073,12 @@ static void CleanupIteratorState(void* arg1, void* arg2) {
state->mu->Unlock(); state->mu->Unlock();
delete state; delete state;
} }
} // namespace
} // anonymous namespace
Iterator* DBImpl::NewInternalIterator(const ReadOptions& options, Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
SequenceNumber* latest_snapshot, SequenceNumber* latest_snapshot,
uint32_t* seed) { uint32_t* seed) {
IterState* cleanup = new IterState;
mutex_.Lock(); mutex_.Lock();
*latest_snapshot = versions_->LastSequence(); *latest_snapshot = versions_->LastSequence();
@ -1091,10 +1095,7 @@ Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
NewMergingIterator(&internal_comparator_, &list[0], list.size()); NewMergingIterator(&internal_comparator_, &list[0], list.size());
versions_->current()->Ref(); versions_->current()->Ref();
cleanup->mu = &mutex_;
cleanup->mem = mem_;
cleanup->imm = imm_;
cleanup->version = versions_->current();
IterState* cleanup = new IterState(&mutex_, mem_, imm_, versions_->current());
internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, NULL); internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, NULL);
*seed = ++seed_; *seed = ++seed_;

+ 6
- 4
db/db_test.cc Ver fichero

@ -11,6 +11,8 @@
#include "leveldb/cache.h" #include "leveldb/cache.h"
#include "leveldb/env.h" #include "leveldb/env.h"
#include "leveldb/table.h" #include "leveldb/table.h"
#include "port/port.h"
#include "port/thread_annotations.h"
#include "util/hash.h" #include "util/hash.h"
#include "util/logging.h" #include "util/logging.h"
#include "util/mutexlock.h" #include "util/mutexlock.h"
@ -36,21 +38,21 @@ namespace {
class AtomicCounter { class AtomicCounter {
private: private:
port::Mutex mu_; port::Mutex mu_;
int count_;
int count_ GUARDED_BY(mu_);
public: public:
AtomicCounter() : count_(0) { } AtomicCounter() : count_(0) { }
void Increment() { void Increment() {
IncrementBy(1); IncrementBy(1);
} }
void IncrementBy(int count) {
void IncrementBy(int count) LOCKS_EXCLUDED(mu_) {
MutexLock l(&mu_); MutexLock l(&mu_);
count_ += count; count_ += count;
} }
int Read() {
int Read() LOCKS_EXCLUDED(mu_) {
MutexLock l(&mu_); MutexLock l(&mu_);
return count_; return count_;
} }
void Reset() {
void Reset() LOCKS_EXCLUDED(mu_) {
MutexLock l(&mu_); MutexLock l(&mu_);
count_ = 0; count_ = 0;
} }

+ 17
- 9
db/fault_injection_test.cc Ver fichero

@ -6,10 +6,10 @@
// the last "sync". It then checks for data loss errors by purposely dropping // the last "sync". It then checks for data loss errors by purposely dropping
// file data (or entire files) not protected by a "sync". // file data (or entire files) not protected by a "sync".
#include "leveldb/db.h"
#include <map> #include <map>
#include <set> #include <set>
#include "leveldb/db.h"
#include "db/db_impl.h" #include "db/db_impl.h"
#include "db/filename.h" #include "db/filename.h"
#include "db/log_format.h" #include "db/log_format.h"
@ -18,6 +18,8 @@
#include "leveldb/env.h" #include "leveldb/env.h"
#include "leveldb/table.h" #include "leveldb/table.h"
#include "leveldb/write_batch.h" #include "leveldb/write_batch.h"
#include "port/port.h"
#include "port/thread_annotations.h"
#include "util/logging.h" #include "util/logging.h"
#include "util/mutexlock.h" #include "util/mutexlock.h"
#include "util/testharness.h" #include "util/testharness.h"
@ -126,7 +128,8 @@ class TestWritableFile : public WritableFile {
class FaultInjectionTestEnv : public EnvWrapper { class FaultInjectionTestEnv : public EnvWrapper {
public: public:
FaultInjectionTestEnv() : EnvWrapper(Env::Default()), filesystem_active_(true) {}
FaultInjectionTestEnv()
: EnvWrapper(Env::Default()), filesystem_active_(true) {}
virtual ~FaultInjectionTestEnv() { } virtual ~FaultInjectionTestEnv() { }
virtual Status NewWritableFile(const std::string& fname, virtual Status NewWritableFile(const std::string& fname,
WritableFile** result); WritableFile** result);
@ -146,14 +149,20 @@ class FaultInjectionTestEnv : public EnvWrapper {
// system reset. Setting to inactive will freeze our saved filesystem state so // system reset. Setting to inactive will freeze our saved filesystem state so
// that it will stop being recorded. It can then be reset back to the state at // that it will stop being recorded. It can then be reset back to the state at
// the time of the reset. // the time of the reset.
bool IsFilesystemActive() const { return filesystem_active_; }
void SetFilesystemActive(bool active) { filesystem_active_ = active; }
bool IsFilesystemActive() LOCKS_EXCLUDED(mutex_) {
MutexLock l(&mutex_);
return filesystem_active_;
}
void SetFilesystemActive(bool active) LOCKS_EXCLUDED(mutex_) {
MutexLock l(&mutex_);
filesystem_active_ = active;
}
private: private:
port::Mutex mutex_; port::Mutex mutex_;
std::map<std::string, FileState> db_file_state_;
std::set<std::string> new_files_since_last_dir_sync_;
bool filesystem_active_; // Record flushes, syncs, writes
std::map<std::string, FileState> db_file_state_ GUARDED_BY(mutex_);
std::set<std::string> new_files_since_last_dir_sync_ GUARDED_BY(mutex_);
bool filesystem_active_ GUARDED_BY(mutex_); // Record flushes, syncs, writes
}; };
TestWritableFile::TestWritableFile(const FileState& state, TestWritableFile::TestWritableFile(const FileState& state,
@ -328,7 +337,6 @@ void FaultInjectionTestEnv::ResetState() {
// Since we are not destroying the database, the existing files // Since we are not destroying the database, the existing files
// should keep their recorded synced/flushed state. Therefore // should keep their recorded synced/flushed state. Therefore
// we do not reset db_file_state_ and new_files_since_last_dir_sync_. // we do not reset db_file_state_ and new_files_since_last_dir_sync_.
MutexLock l(&mutex_);
SetFilesystemActive(true); SetFilesystemActive(true);
} }

+ 6
- 4
db/skiplist_test.cc Ver fichero

@ -5,6 +5,8 @@
#include "db/skiplist.h" #include "db/skiplist.h"
#include <set> #include <set>
#include "leveldb/env.h" #include "leveldb/env.h"
#include "port/port.h"
#include "port/thread_annotations.h"
#include "util/arena.h" #include "util/arena.h"
#include "util/hash.h" #include "util/hash.h"
#include "util/random.h" #include "util/random.h"
@ -312,7 +314,7 @@ class TestState {
state_(STARTING), state_(STARTING),
state_cv_(&mu_) {} state_cv_(&mu_) {}
void Wait(ReaderState s) {
void Wait(ReaderState s) LOCKS_EXCLUDED(mu_) {
mu_.Lock(); mu_.Lock();
while (state_ != s) { while (state_ != s) {
state_cv_.Wait(); state_cv_.Wait();
@ -320,7 +322,7 @@ class TestState {
mu_.Unlock(); mu_.Unlock();
} }
void Change(ReaderState s) {
void Change(ReaderState s) LOCKS_EXCLUDED(mu_) {
mu_.Lock(); mu_.Lock();
state_ = s; state_ = s;
state_cv_.Signal(); state_cv_.Signal();
@ -329,8 +331,8 @@ class TestState {
private: private:
port::Mutex mu_; port::Mutex mu_;
ReaderState state_;
port::CondVar state_cv_;
ReaderState state_ GUARDED_BY(mu_);
port::CondVar state_cv_ GUARDED_BY(mu_);
}; };
static void ConcurrentReader(void* arg) { static void ConcurrentReader(void* arg) {

+ 7
- 7
util/env_posix.cc Ver fichero

@ -22,6 +22,7 @@
#include "leveldb/env.h" #include "leveldb/env.h"
#include "leveldb/slice.h" #include "leveldb/slice.h"
#include "port/port.h" #include "port/port.h"
#include "port/thread_annotations.h"
#include "util/logging.h" #include "util/logging.h"
#include "util/mutexlock.h" #include "util/mutexlock.h"
#include "util/posix_logger.h" #include "util/posix_logger.h"
@ -57,7 +58,7 @@ class Limiter {
// If another resource is available, acquire it and return true. // If another resource is available, acquire it and return true.
// Else return false. // Else return false.
bool Acquire() {
bool Acquire() LOCKS_EXCLUDED(mu_) {
if (GetAllowed() <= 0) { if (GetAllowed() <= 0) {
return false; return false;
} }
@ -73,7 +74,7 @@ class Limiter {
// Release a resource acquired by a previous call to Acquire() that returned // Release a resource acquired by a previous call to Acquire() that returned
// true. // true.
void Release() {
void Release() LOCKS_EXCLUDED(mu_) {
MutexLock l(&mu_); MutexLock l(&mu_);
SetAllowed(GetAllowed() + 1); SetAllowed(GetAllowed() + 1);
} }
@ -86,8 +87,7 @@ class Limiter {
return reinterpret_cast<intptr_t>(allowed_.Acquire_Load()); return reinterpret_cast<intptr_t>(allowed_.Acquire_Load());
} }
// REQUIRES: mu_ must be held
void SetAllowed(intptr_t v) {
void SetAllowed(intptr_t v) EXCLUSIVE_LOCKS_REQUIRED(mu_) {
allowed_.Release_Store(reinterpret_cast<void*>(v)); allowed_.Release_Store(reinterpret_cast<void*>(v));
} }
@ -365,13 +365,13 @@ class PosixFileLock : public FileLock {
class PosixLockTable { class PosixLockTable {
private: private:
port::Mutex mu_; port::Mutex mu_;
std::set<std::string> locked_files_;
std::set<std::string> locked_files_ GUARDED_BY(mu_);
public: public:
bool Insert(const std::string& fname) {
bool Insert(const std::string& fname) LOCKS_EXCLUDED(mu_) {
MutexLock l(&mu_); MutexLock l(&mu_);
return locked_files_.insert(fname).second; return locked_files_.insert(fname).second;
} }
void Remove(const std::string& fname) {
void Remove(const std::string& fname) LOCKS_EXCLUDED(mu_) {
MutexLock l(&mu_); MutexLock l(&mu_);
locked_files_.erase(fname); locked_files_.erase(fname);
} }

+ 9
- 9
util/env_test.cc Ver fichero

@ -7,6 +7,8 @@
#include <algorithm> #include <algorithm>
#include "port/port.h" #include "port/port.h"
#include "port/thread_annotations.h"
#include "util/mutexlock.h"
#include "util/testharness.h" #include "util/testharness.h"
#include "util/testutil.h" #include "util/testutil.h"
@ -17,10 +19,6 @@ static const int kReadOnlyFileLimit = 4;
static const int kMMapLimit = 4; static const int kMMapLimit = 4;
class EnvTest { class EnvTest {
private:
port::Mutex mu_;
std::string events_;
public: public:
Env* env_; Env* env_;
EnvTest() : env_(Env::Default()) { } EnvTest() : env_(Env::Default()) { }
@ -119,8 +117,10 @@ TEST(EnvTest, RunMany) {
struct State { struct State {
port::Mutex mu; port::Mutex mu;
int val;
int num_running;
int val GUARDED_BY(mu);
int num_running GUARDED_BY(mu);
State(int val, int num_running) : val(val), num_running(num_running) { }
}; };
static void ThreadBody(void* arg) { static void ThreadBody(void* arg) {
@ -132,9 +132,7 @@ static void ThreadBody(void* arg) {
} }
TEST(EnvTest, StartThread) { TEST(EnvTest, StartThread) {
State state;
state.val = 0;
state.num_running = 3;
State state(0, 3);
for (int i = 0; i < 3; i++) { for (int i = 0; i < 3; i++) {
env_->StartThread(&ThreadBody, &state); env_->StartThread(&ThreadBody, &state);
} }
@ -147,6 +145,8 @@ TEST(EnvTest, StartThread) {
} }
env_->SleepForMicroseconds(kDelayMicros); env_->SleepForMicroseconds(kDelayMicros);
} }
MutexLock l(&state.mu);
ASSERT_EQ(state.val, 3); ASSERT_EQ(state.val, 3);
} }

Cargando…
Cancelar
Guardar