
sync with upstream @21706995

Fixed race condition reported by Dave Smith (dizzyd@dizzyd.com)
on the leveldb mailing list.  We were not signalling
waiters after a trivial move from level-0.  The result was
that in some cases (hard to reproduce), a write would get
stuck forever waiting for the number of level-0 files to drop
below its hard limit.
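
As an illustration of the failure mode (a minimal sketch using std::mutex and std::condition_variable with an invented WriterState struct, not LevelDB's port::Mutex/port::CondVar or the real DBImpl members): the writer blocks until the level-0 file count drops below the trigger, so any completion path that changes that count without notifying the condition variable leaves the wait stuck even though its predicate has become true.

#include <condition_variable>
#include <mutex>

// Hypothetical stand-in for state guarded by DBImpl::mutex_.
struct WriterState {
  std::mutex mu;
  std::condition_variable bg_cv;  // plays the role of bg_cv_
  int level0_files = 0;           // what versions_->NumLevelFiles(0) reports
  int stop_writes_trigger = 12;   // stands in for config::kL0_StopWritesTrigger
};

// Loosely mirrors the stall branch of MakeRoomForWrite(): block until
// background work has reduced the number of level-0 files.  If that work
// (e.g. a trivial move out of level-0) updates level0_files but never
// notifies bg_cv, this wait sleeps forever.
void WaitForRoom(WriterState& s) {
  std::unique_lock<std::mutex> lock(s.mu);
  s.bg_cv.wait(lock, [&] { return s.level0_files < s.stop_writes_trigger; });
}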

The new code is simpler: there is just one condition variable
instead of two, and the condition variable is signalled after
every piece of background work finishes.  Also, all compaction
work (including for manual compactions) is done in the
background thread, and therefore we can remove the
"compacting_" variable.



git-svn-id: https://leveldb.googlecode.com/svn/trunk@31 62dab493-f737-651d-591e-8d6aee1b9529
naive_version
hans@chromium.org, 13 years ago
commit 80e5b0d944
2 changed files with 53 additions and 50 deletions
  1. db/db_impl.cc  +45 -46
  2. db/db_impl.h   +8  -4

db/db_impl.cc  +45 -46

@@ -119,13 +119,12 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
       db_lock_(NULL),
       shutting_down_(NULL),
       bg_cv_(&mutex_),
-      compacting_cv_(&mutex_),
       mem_(new MemTable(internal_comparator_)),
       imm_(NULL),
       logfile_(NULL),
       log_(NULL),
       bg_compaction_scheduled_(false),
-      compacting_(false) {
+      manual_compaction_(NULL) {
   mem_->Ref();
   has_imm_.Release_Store(NULL);

@@ -141,10 +140,8 @@ DBImpl::~DBImpl() {
   // Wait for background work to finish
   mutex_.Lock();
   shutting_down_.Release_Store(this);  // Any non-NULL value is ok
-  if (bg_compaction_scheduled_) {
-    while (bg_compaction_scheduled_) {
-      bg_cv_.Wait();
-    }
+  while (bg_compaction_scheduled_) {
+    bg_cv_.Wait();
   }
   mutex_.Unlock();

@@ -437,7 +434,6 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit) {
 Status DBImpl::CompactMemTable() {
   mutex_.AssertHeld();
   assert(imm_ != NULL);
-  assert(compacting_);

   // Save the contents of the memtable as a new Table
   VersionEdit edit;

@@ -457,7 +453,6 @@ Status DBImpl::CompactMemTable() {
     DeleteObsoleteFiles();
   }
-  compacting_cv_.SignalAll();  // Wake up waiter even if there was an error
   return s;
 }

@@ -466,22 +461,18 @@ void DBImpl::TEST_CompactRange(
     const std::string& begin,
     const std::string& end) {
   MutexLock l(&mutex_);
-  while (compacting_) {
-    compacting_cv_.Wait();
+  while (manual_compaction_ != NULL) {
+    bg_cv_.Wait();
   }
-  Compaction* c = versions_->CompactRange(
-      level,
-      InternalKey(begin, kMaxSequenceNumber, kValueTypeForSeek),
-      InternalKey(end, 0, static_cast<ValueType>(0)));
-  if (c != NULL) {
-    CompactionState* compact = new CompactionState(c);
-    DoCompactionWork(compact);  // Ignore error in test compaction
-    CleanupCompaction(compact);
-  }
-  // Start any background compaction that may have been delayed by this thread
+  ManualCompaction manual;
+  manual.level = level;
+  manual.begin = begin;
+  manual.end = end;
+  manual_compaction_ = &manual;
   MaybeScheduleCompaction();
+  while (manual_compaction_ == &manual) {
+    bg_cv_.Wait();
+  }
 }

@@ -490,7 +481,7 @@ Status DBImpl::TEST_CompactMemTable() {
   if (s.ok()) {
     // Wait until the compaction completes
     while (imm_ != NULL && bg_error_.ok()) {
-      compacting_cv_.Wait();
+      bg_cv_.Wait();
     }
     if (imm_ != NULL) {
       s = bg_error_;

@@ -503,11 +494,11 @@ void DBImpl::MaybeScheduleCompaction() {
   mutex_.AssertHeld();
   if (bg_compaction_scheduled_) {
     // Already scheduled
-  } else if (compacting_) {
-    // Some other thread is running a compaction. Do not conflict with it.
   } else if (shutting_down_.Acquire_Load()) {
     // DB is being deleted; no more background compactions
-  } else if (imm_ == NULL && !versions_->NeedsCompaction()) {
+  } else if (imm_ == NULL &&
+             manual_compaction_ == NULL &&
+             !versions_->NeedsCompaction()) {
     // No work to be done
   } else {
     bg_compaction_scheduled_ = true;

@@ -522,38 +513,41 @@ void DBImpl::BGWork(void* db) {
 void DBImpl::BackgroundCall() {
   MutexLock l(&mutex_);
   assert(bg_compaction_scheduled_);
-  if (!shutting_down_.Acquire_Load() &&
-      !compacting_) {
+  if (!shutting_down_.Acquire_Load()) {
     BackgroundCompaction();
   }
   bg_compaction_scheduled_ = false;
-  bg_cv_.SignalAll();

   // Previous compaction may have produced too many files in a level,
   // so reschedule another compaction if needed.
   MaybeScheduleCompaction();
+  bg_cv_.SignalAll();
 }

 void DBImpl::BackgroundCompaction() {
   mutex_.AssertHeld();
-  assert(!compacting_);

   if (imm_ != NULL) {
-    compacting_ = true;
     CompactMemTable();
-    compacting_ = false;
-    compacting_cv_.SignalAll();
     return;
   }

-  Compaction* c = versions_->PickCompaction();
-  if (c == NULL) {
-    // Nothing to do
-    return;
+  Compaction* c;
+  bool is_manual = (manual_compaction_ != NULL);
+  if (is_manual) {
+    const ManualCompaction* m = manual_compaction_;
+    c = versions_->CompactRange(
+        m->level,
+        InternalKey(m->begin, kMaxSequenceNumber, kValueTypeForSeek),
+        InternalKey(m->end, 0, static_cast<ValueType>(0)));
+  } else {
+    c = versions_->PickCompaction();
   }

   Status status;
-  if (c->IsTrivialMove()) {
+  if (c == NULL) {
+    // Nothing to do
+  } else if (!is_manual && c->IsTrivialMove()) {
     // Move file to next level
     assert(c->num_input_files(0) == 1);
     FileMetaData* f = c->input(0, 0);

@@ -561,11 +555,13 @@ void DBImpl::BackgroundCompaction() {
     c->edit()->AddFile(c->level() + 1, f->number, f->file_size,
                        f->smallest, f->largest);
     status = versions_->LogAndApply(c->edit());
-    Log(env_, options_.info_log, "Moved #%lld to level-%d %lld bytes %s\n",
+    VersionSet::LevelSummaryStorage tmp;
+    Log(env_, options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n",
         static_cast<unsigned long long>(f->number),
         c->level() + 1,
         static_cast<unsigned long long>(f->file_size),
-        status.ToString().c_str());
+        status.ToString().c_str(),
+        versions_->LevelSummary(&tmp));
   } else {
     CompactionState* compact = new CompactionState(c);
     status = DoCompactionWork(compact);

@@ -584,6 +580,11 @@ void DBImpl::BackgroundCompaction() {
       bg_error_ = status;
     }
   }
+
+  if (is_manual) {
+    // Mark it as done
+    manual_compaction_ = NULL;
+  }
 }

 void DBImpl::CleanupCompaction(CompactionState* compact) {

@@ -734,7 +735,6 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
   }

   // Release mutex while we're actually doing the compaction work
-  compacting_ = true;
   mutex_.Unlock();

   Iterator* input = versions_->MakeInputIterator(compact->compaction);

@@ -751,7 +751,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
       mutex_.Lock();
       if (imm_ != NULL) {
         CompactMemTable();
-        compacting_cv_.SignalAll();  // Wakeup MakeRoomForWrite() if necessary
+        bg_cv_.SignalAll();  // Wakeup MakeRoomForWrite() if necessary
       }
       mutex_.Unlock();
       imm_micros += (env_->NowMicros() - imm_start);

@@ -867,8 +867,6 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
   if (status.ok()) {
     status = InstallCompactionResults(compact);
   }
-  compacting_ = false;
-  compacting_cv_.SignalAll();
   VersionSet::LevelSummaryStorage tmp;
   Log(env_, options_.info_log,
       "compacted to: %s", versions_->LevelSummary(&tmp));

@@ -1038,10 +1036,11 @@ Status DBImpl::MakeRoomForWrite(bool force) {
     } else if (imm_ != NULL) {
       // We have filled up the current memtable, but the previous
       // one is still being compacted, so we wait.
-      compacting_cv_.Wait();
+      bg_cv_.Wait();
     } else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) {
       // There are too many level-0 files.
-      compacting_cv_.Wait();
+      Log(env_, options_.info_log, "waiting...\n");
+      bg_cv_.Wait();
     } else {
       // Attempt to switch to a new memtable and trigger compaction of old
       assert(versions_->PrevLogNumber() == 0);

db/db_impl.h  +8 -4

@@ -119,8 +119,7 @@ class DBImpl : public DB {
   // State below is protected by mutex_
   port::Mutex mutex_;
   port::AtomicPointer shutting_down_;
-  port::CondVar bg_cv_;          // Signalled when !bg_compaction_scheduled_
-  port::CondVar compacting_cv_;  // Signalled when !compacting_
+  port::CondVar bg_cv_;          // Signalled when background work finishes
   MemTable* mem_;
   MemTable* imm_;                // Memtable being compacted
   port::AtomicPointer has_imm_;  // So bg thread can detect non-NULL imm_

@@ -135,8 +134,13 @@ class DBImpl : public DB {
   // Has a background compaction been scheduled or is running?
   bool bg_compaction_scheduled_;

-  // Is there a compaction running?
-  bool compacting_;
+  // Information for a manual compaction
+  struct ManualCompaction {
+    int level;
+    std::string begin;
+    std::string end;
+  };
+  ManualCompaction* manual_compaction_;

   VersionSet* versions_;
