Kaynağa Gözat

Extend thread safety annotations.

This CL makes it easier to reason about thread safety by:

1) Adding Clang thread safety annotations according to comments.
2) Expanding a couple of variable names, without adding extra lines of code.
3) Adding const in a couple of places.
4) Replacing an always-non-null const pointer with a reference.
5) Fixing style warnings in the modified files.

This CL does not annotate the DBImpl members that claim to be protected
by the instance mutex, but are accessed without the mutex being held.
Those members (and their unprotected accesses) will be addressed in
future CLs.

-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=189354657
main
costan 6 yıl önce
işlemeyi yapan: Victor Costan
ebeveyn
işleme
0fa5a4f7b1
8 değiştirilmiş dosya ile 85 ekleme ve 71 silme
  1. +38
    -30
      db/db_impl.cc
  2. +21
    -20
      db/db_impl.h
  3. +0
    -1
      db/memtable.h
  4. +1
    -1
      db/repair.cc
  5. +3
    -3
      db/table_cache.cc
  6. +2
    -2
      db/table_cache.h
  7. +11
    -7
      helpers/memenv/memenv.cc
  8. +9
    -7
      util/cache.cc

+ 38
- 30
db/db_impl.cc Dosyayı Görüntüle

@ -4,12 +4,14 @@
#include "db/db_impl.h"
#include <stdint.h>
#include <stdio.h>
#include <algorithm>
#include <set>
#include <string>
#include <stdint.h>
#include <stdio.h>
#include <vector>
#include "db/builder.h"
#include "db/db_iter.h"
#include "db/dbformat.h"
@ -82,7 +84,7 @@ struct DBImpl::CompactionState {
};
// Fix user-supplied options to be reasonable
template <class T,class V>
template <class T, class V>
static void ClipToRange(T* ptr, V minvalue, V maxvalue) {
if (static_cast<V>(*ptr) > maxvalue) *ptr = maxvalue;
if (static_cast<V>(*ptr) < minvalue) *ptr = minvalue;
@ -114,6 +116,11 @@ Options SanitizeOptions(const std::string& dbname,
return result;
}
static int TableCacheSize(const Options& sanitized_options) {
// Reserve ten files or so for other uses and give the rest to TableCache.
return sanitized_options.max_open_files - kNumNonTableCacheFiles;
}
DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
: env_(raw_options.env),
internal_comparator_(raw_options.comparator),
@ -123,9 +130,10 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
owns_info_log_(options_.info_log != raw_options.info_log),
owns_cache_(options_.block_cache != raw_options.block_cache),
dbname_(dbname),
table_cache_(new TableCache(dbname_, options_, TableCacheSize(options_))),
db_lock_(NULL),
shutting_down_(NULL),
bg_cv_(&mutex_),
background_work_finished_signal_(&mutex_),
mem_(NULL),
imm_(NULL),
logfile_(NULL),
@ -133,24 +141,19 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
log_(NULL),
seed_(0),
tmp_batch_(new WriteBatch),
bg_compaction_scheduled_(false),
manual_compaction_(NULL) {
background_compaction_scheduled_(false),
manual_compaction_(NULL),
versions_(new VersionSet(dbname_, &options_, table_cache_,
&internal_comparator_)) {
has_imm_.Release_Store(NULL);
// Reserve ten files or so for other uses and give the rest to TableCache.
const int table_cache_size = options_.max_open_files - kNumNonTableCacheFiles;
table_cache_ = new TableCache(dbname_, &options_, table_cache_size);
versions_ = new VersionSet(dbname_, &options_, table_cache_,
&internal_comparator_);
}
DBImpl::~DBImpl() {
// Wait for background work to finish
mutex_.Lock();
shutting_down_.Release_Store(this); // Any non-NULL value is ok
while (bg_compaction_scheduled_) {
bg_cv_.Wait();
while (background_compaction_scheduled_) {
background_work_finished_signal_.Wait();
}
mutex_.Unlock();
@ -216,6 +219,8 @@ void DBImpl::MaybeIgnoreError(Status* s) const {
}
void DBImpl::DeleteObsoleteFiles() {
mutex_.AssertHeld();
if (!bg_error_.ok()) {
// After a background error, we don't know whether a new version may
// or may not have been committed, so we cannot safely garbage collect.
@ -227,7 +232,7 @@ void DBImpl::DeleteObsoleteFiles() {
versions_->AddLiveFiles(&live);
std::vector<std::string> filenames;
env_->GetChildren(dbname_, &filenames); // Ignoring errors on purpose
env_->GetChildren(dbname_, &filenames); // Ignoring errors on purpose
uint64_t number;
FileType type;
for (size_t i = 0; i < filenames.size(); i++) {
@ -263,7 +268,7 @@ void DBImpl::DeleteObsoleteFiles() {
table_cache_->Evict(number);
}
Log(options_.info_log, "Delete type=%d #%lld\n",
int(type),
static_cast<int>(type),
static_cast<unsigned long long>(number));
env_->DeleteFile(dbname_ + "/" + filenames[i]);
}
@ -575,13 +580,14 @@ void DBImpl::CompactRange(const Slice* begin, const Slice* end) {
}
}
}
TEST_CompactMemTable(); // TODO(sanjay): Skip if memtable does not overlap
TEST_CompactMemTable(); // TODO(sanjay): Skip if memtable does not overlap
for (int level = 0; level < max_level_with_files; level++) {
TEST_CompactRange(level, begin, end);
}
}
void DBImpl::TEST_CompactRange(int level, const Slice* begin,const Slice* end) {
void DBImpl::TEST_CompactRange(int level, const Slice* begin,
const Slice* end) {
assert(level >= 0);
assert(level + 1 < config::kNumLevels);
@ -609,7 +615,7 @@ void DBImpl::TEST_CompactRange(int level, const Slice* begin,const Slice* end) {
manual_compaction_ = &manual;
MaybeScheduleCompaction();
} else { // Running either my compaction or another compaction.
bg_cv_.Wait();
background_work_finished_signal_.Wait();
}
}
if (manual_compaction_ == &manual) {
@ -625,7 +631,7 @@ Status DBImpl::TEST_CompactMemTable() {
// Wait until the compaction completes
MutexLock l(&mutex_);
while (imm_ != NULL && bg_error_.ok()) {
bg_cv_.Wait();
background_work_finished_signal_.Wait();
}
if (imm_ != NULL) {
s = bg_error_;
@ -638,13 +644,13 @@ void DBImpl::RecordBackgroundError(const Status& s) {
mutex_.AssertHeld();
if (bg_error_.ok()) {
bg_error_ = s;
bg_cv_.SignalAll();
background_work_finished_signal_.SignalAll();
}
}
void DBImpl::MaybeScheduleCompaction() {
mutex_.AssertHeld();
if (bg_compaction_scheduled_) {
if (background_compaction_scheduled_) {
// Already scheduled
} else if (shutting_down_.Acquire_Load()) {
// DB is being deleted; no more background compactions
@ -655,7 +661,7 @@ void DBImpl::MaybeScheduleCompaction() {
!versions_->NeedsCompaction()) {
// No work to be done
} else {
bg_compaction_scheduled_ = true;
background_compaction_scheduled_ = true;
env_->Schedule(&DBImpl::BGWork, this);
}
}
@ -666,7 +672,7 @@ void DBImpl::BGWork(void* db) {
void DBImpl::BackgroundCall() {
MutexLock l(&mutex_);
assert(bg_compaction_scheduled_);
assert(background_compaction_scheduled_);
if (shutting_down_.Acquire_Load()) {
// No more background work when shutting down.
} else if (!bg_error_.ok()) {
@ -675,12 +681,12 @@ void DBImpl::BackgroundCall() {
BackgroundCompaction();
}
bg_compaction_scheduled_ = false;
background_compaction_scheduled_ = false;
// Previous compaction may have produced too many files in a level,
// so reschedule another compaction if needed.
MaybeScheduleCompaction();
bg_cv_.SignalAll();
background_work_finished_signal_.SignalAll();
}
void DBImpl::BackgroundCompaction() {
@ -920,7 +926,8 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
mutex_.Lock();
if (imm_ != NULL) {
CompactMemTable();
bg_cv_.SignalAll(); // Wakeup MakeRoomForWrite() if necessary
// Wake up MakeRoomForWrite() if necessary.
background_work_finished_signal_.SignalAll();
}
mutex_.Unlock();
imm_micros += (env_->NowMicros() - imm_start);
@ -1267,6 +1274,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
// REQUIRES: Writer list must be non-empty
// REQUIRES: First writer must have a non-NULL batch
WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) {
mutex_.AssertHeld();
assert(!writers_.empty());
Writer* first = writers_.front();
WriteBatch* result = first->batch;
@ -1346,11 +1354,11 @@ Status DBImpl::MakeRoomForWrite(bool force) {
// We have filled up the current memtable, but the previous
// one is still being compacted, so we wait.
Log(options_.info_log, "Current memtable full; waiting...\n");
bg_cv_.Wait();
background_work_finished_signal_.Wait();
} else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) {
// There are too many level-0 files.
Log(options_.info_log, "Too many L0 files; waiting...\n");
bg_cv_.Wait();
background_work_finished_signal_.Wait();
} else {
// Attempt to switch to a new memtable and trigger compaction of old
assert(versions_->PrevLogNumber() == 0);

+ 21
- 20
db/db_impl.h Dosyayı Görüntüle

@ -84,7 +84,7 @@ class DBImpl : public DB {
void MaybeIgnoreError(Status* s) const;
// Delete any unneeded files and stale in-memory entries.
void DeleteObsoleteFiles();
void DeleteObsoleteFiles() EXCLUSIVE_LOCKS_REQUIRED(mutex_);
// Compact the in-memory write buffer to disk. Switches to a new
// log-file/memtable and writes a new descriptor iff successful.
@ -100,14 +100,15 @@ class DBImpl : public DB {
Status MakeRoomForWrite(bool force /* compact even if there is room? */)
EXCLUSIVE_LOCKS_REQUIRED(mutex_);
WriteBatch* BuildBatchGroup(Writer** last_writer);
WriteBatch* BuildBatchGroup(Writer** last_writer)
EXCLUSIVE_LOCKS_REQUIRED(mutex_);
void RecordBackgroundError(const Status& s);
void MaybeScheduleCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_);
static void BGWork(void* db);
void BackgroundCall();
void BackgroundCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_);
void BackgroundCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_);
void CleanupCompaction(CompactionState* compact)
EXCLUSIVE_LOCKS_REQUIRED(mutex_);
Status DoCompactionWork(CompactionState* compact)
@ -123,12 +124,12 @@ class DBImpl : public DB {
const InternalKeyComparator internal_comparator_;
const InternalFilterPolicy internal_filter_policy_;
const Options options_; // options_.comparator == &internal_comparator_
bool owns_info_log_;
bool owns_cache_;
const bool owns_info_log_;
const bool owns_cache_;
const std::string dbname_;
// table_cache_ provides its own synchronization
TableCache* table_cache_;
TableCache* const table_cache_;
// Lock over the persistent DB state. Non-NULL iff successfully acquired.
FileLock* db_lock_;
@ -136,27 +137,27 @@ class DBImpl : public DB {
// State below is protected by mutex_
port::Mutex mutex_;
port::AtomicPointer shutting_down_;
port::CondVar bg_cv_; // Signalled when background work finishes
port::CondVar background_work_finished_signal_ GUARDED_BY(mutex_);
MemTable* mem_;
MemTable* imm_; // Memtable being compacted
port::AtomicPointer has_imm_; // So bg thread can detect non-NULL imm_
MemTable* imm_ GUARDED_BY(mutex_); // Memtable being compacted
port::AtomicPointer has_imm_; // So bg thread can detect non-NULL imm_
WritableFile* logfile_;
uint64_t logfile_number_;
uint64_t logfile_number_ GUARDED_BY(mutex_);
log::Writer* log_;
uint32_t seed_; // For sampling.
uint32_t seed_ GUARDED_BY(mutex_); // For sampling.
// Queue of writers.
std::deque<Writer*> writers_;
WriteBatch* tmp_batch_;
std::deque<Writer*> writers_ GUARDED_BY(mutex_);
WriteBatch* tmp_batch_ GUARDED_BY(mutex_);
SnapshotList snapshots_;
SnapshotList snapshots_ GUARDED_BY(mutex_);
// Set of table files to protect from deletion because they are
// part of ongoing compactions.
std::set<uint64_t> pending_outputs_;
std::set<uint64_t> pending_outputs_ GUARDED_BY(mutex_);
// Has a background compaction been scheduled or is running?
bool bg_compaction_scheduled_;
bool background_compaction_scheduled_ GUARDED_BY(mutex_);
// Information for a manual compaction
struct ManualCompaction {
@ -166,12 +167,12 @@ class DBImpl : public DB {
const InternalKey* end; // NULL means end of key range
InternalKey tmp_storage; // Used to keep track of compaction progress
};
ManualCompaction* manual_compaction_;
ManualCompaction* manual_compaction_ GUARDED_BY(mutex_);
VersionSet* versions_;
VersionSet* const versions_;
// Have we encountered a background error in paranoid mode?
Status bg_error_;
Status bg_error_ GUARDED_BY(mutex_);
// Per level compaction stats. stats_[level] stores the stats for
// compactions that produced data for the specified "level".
@ -188,7 +189,7 @@ class DBImpl : public DB {
this->bytes_written += c.bytes_written;
}
};
CompactionStats stats_[config::kNumLevels];
CompactionStats stats_[config::kNumLevels] GUARDED_BY(mutex_);
// No copying allowed
DBImpl(const DBImpl&);

+ 0
- 1
db/memtable.h Dosyayı Görüntüle

@ -14,7 +14,6 @@
namespace leveldb {
class InternalKeyComparator;
class Mutex;
class MemTableIterator;
class MemTable {

+ 1
- 1
db/repair.cc Dosyayı Görüntüle

@ -54,7 +54,7 @@ class Repairer {
owns_cache_(options_.block_cache != options.block_cache),
next_file_number_(1) {
// TableCache can be small since we expect each table to be opened once.
table_cache_ = new TableCache(dbname_, &options_, 10);
table_cache_ = new TableCache(dbname_, options_, 10);
}
~Repairer() {

+ 3
- 3
db/table_cache.cc Dosyayı Görüntüle

@ -30,9 +30,9 @@ static void UnrefEntry(void* arg1, void* arg2) {
}
TableCache::TableCache(const std::string& dbname,
const Options* options,
const Options& options,
int entries)
: env_(options->env),
: env_(options.env),
dbname_(dbname),
options_(options),
cache_(NewLRUCache(entries)) {
@ -61,7 +61,7 @@ Status TableCache::FindTable(uint64_t file_number, uint64_t file_size,
}
}
if (s.ok()) {
s = Table::Open(*options_, file, file_size, &table);
s = Table::Open(options_, file, file_size, &table);
}
if (!s.ok()) {

+ 2
- 2
db/table_cache.h Dosyayı Görüntüle

@ -20,7 +20,7 @@ class Env;
class TableCache {
public:
TableCache(const std::string& dbname, const Options* options, int entries);
TableCache(const std::string& dbname, const Options& options, int entries);
~TableCache();
// Return an iterator for the specified file number (the corresponding
@ -50,7 +50,7 @@ class TableCache {
private:
Env* const env_;
const std::string dbname_;
const Options* options_;
const Options& options_;
Cache* cache_;
Status FindTable(uint64_t file_number, uint64_t file_size, Cache::Handle**);

+ 11
- 7
helpers/memenv/memenv.cc Dosyayı Görüntüle

@ -4,14 +4,17 @@
#include "helpers/memenv/memenv.h"
#include <string.h>
#include <map>
#include <string>
#include <vector>
#include "leveldb/env.h"
#include "leveldb/status.h"
#include "port/port.h"
#include "port/thread_annotations.h"
#include "util/mutexlock.h"
#include <map>
#include <string.h>
#include <string>
#include <vector>
namespace leveldb {
@ -135,7 +138,7 @@ class FileState {
void operator=(const FileState&);
port::Mutex refs_mutex_;
int refs_; // Protected by refs_mutex_;
int refs_ GUARDED_BY(refs_mutex_);
// The following fields are not protected by any mutex. They are only mutable
// while the file is being written, and concurrent access is not allowed
@ -312,7 +315,8 @@ class InMemoryEnv : public EnvWrapper {
return Status::OK();
}
void DeleteFileInternal(const std::string& fname) {
void DeleteFileInternal(const std::string& fname)
EXCLUSIVE_LOCKS_REQUIRED(mutex_) {
if (file_map_.find(fname) == file_map_.end()) {
return;
}
@ -386,7 +390,7 @@ class InMemoryEnv : public EnvWrapper {
// Map from filenames to FileState objects, representing a simple file system.
typedef std::map<std::string, FileState*> FileSystem;
port::Mutex mutex_;
FileSystem file_map_; // Protected by mutex_.
FileSystem file_map_ GUARDED_BY(mutex_);
};
} // namespace

+ 9
- 7
util/cache.cc Dosyayı Görüntüle

@ -8,6 +8,7 @@
#include "leveldb/cache.h"
#include "port/port.h"
#include "port/thread_annotations.h"
#include "util/hash.h"
#include "util/mutexlock.h"
@ -174,25 +175,25 @@ class LRUCache {
void LRU_Append(LRUHandle*list, LRUHandle* e);
void Ref(LRUHandle* e);
void Unref(LRUHandle* e);
bool FinishErase(LRUHandle* e);
bool FinishErase(LRUHandle* e) EXCLUSIVE_LOCKS_REQUIRED(mutex_);
// Initialized before use.
size_t capacity_;
// mutex_ protects the following state.
mutable port::Mutex mutex_;
size_t usage_;
size_t usage_ GUARDED_BY(mutex_);
// Dummy head of LRU list.
// lru.prev is newest entry, lru.next is oldest entry.
// Entries have refs==1 and in_cache==true.
LRUHandle lru_;
LRUHandle lru_ GUARDED_BY(mutex_);
// Dummy head of in-use list.
// Entries are in use by clients, and have refs >= 2 and in_cache==true.
LRUHandle in_use_;
LRUHandle in_use_ GUARDED_BY(mutex_);
HandleTable table_;
HandleTable table_ GUARDED_BY(mutex_);
};
LRUCache::LRUCache()
@ -227,11 +228,12 @@ void LRUCache::Ref(LRUHandle* e) {
void LRUCache::Unref(LRUHandle* e) {
assert(e->refs > 0);
e->refs--;
if (e->refs == 0) { // Deallocate.
if (e->refs == 0) { // Deallocate.
assert(!e->in_cache);
(*e->deleter)(e->key(), e->value);
free(e);
} else if (e->in_cache && e->refs == 1) { // No longer in use; move to lru_ list.
} else if (e->in_cache && e->refs == 1) {
// No longer in use; move to lru_ list.
LRU_Remove(e);
LRU_Append(&lru_, e);
}

Yükleniyor…
İptal
Kaydet