From 3c8be108bfb5fbd7d51f824199627e757279f79e Mon Sep 17 00:00:00 2001 From: Sanjay Ghemawat Date: Wed, 25 Jan 2012 14:56:52 -0800 Subject: [PATCH] fixed issues 66 (leaking files on disk error) and 68 (no sync of CURRENT file) --- db/db_impl.cc | 20 ++++++------------ db/db_test.cc | 63 +++++++++++++++++++++++++++++++++++++++++++++++++------- db/filename.cc | 6 +++++- util/env.cc | 18 ++++++++++++++-- util/env_test.cc | 22 +++++++++++--------- 5 files changed, 95 insertions(+), 34 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index b4df80d..7b268ea 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -655,6 +655,8 @@ void DBImpl::BackgroundCompaction() { CompactionState* compact = new CompactionState(c); status = DoCompactionWork(compact); CleanupCompaction(compact); + c->ReleaseInputs(); + DeleteObsoleteFiles(); } delete c; @@ -672,6 +674,9 @@ void DBImpl::BackgroundCompaction() { if (is_manual) { ManualCompaction* m = manual_compaction_; + if (!status.ok()) { + m->done = true; + } if (!m->done) { // We only compacted part of the requested range. Update *m // to the range that is left to be compacted. @@ -793,21 +798,8 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) { compact->compaction->edit()->AddFile( level + 1, out.number, out.file_size, out.smallest, out.largest); - pending_outputs_.erase(out.number); } - compact->outputs.clear(); - - Status s = versions_->LogAndApply(compact->compaction->edit(), &mutex_); - if (s.ok()) { - compact->compaction->ReleaseInputs(); - DeleteObsoleteFiles(); - } else { - // Discard any files we may have created during this failed compaction - for (size_t i = 0; i < compact->outputs.size(); i++) { - env_->DeleteFile(TableFileName(dbname_, compact->outputs[i].number)); - } - } - return s; + return versions_->LogAndApply(compact->compaction->edit(), &mutex_); } Status DBImpl::DoCompactionWork(CompactionState* compact) { diff --git a/db/db_test.cc b/db/db_test.cc index 5dc3b02..8318885 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -28,8 +28,12 @@ class SpecialEnv : public EnvWrapper { // sstable Sync() calls are blocked while this pointer is non-NULL. port::AtomicPointer delay_sstable_sync_; + // Simulate no-space errors while this pointer is non-NULL. + port::AtomicPointer no_space_; + explicit SpecialEnv(Env* base) : EnvWrapper(base) { delay_sstable_sync_.Release_Store(NULL); + no_space_.Release_Store(NULL); } Status NewWritableFile(const std::string& f, WritableFile** r) { @@ -44,7 +48,14 @@ class SpecialEnv : public EnvWrapper { base_(base) { } ~SSTableFile() { delete base_; } - Status Append(const Slice& data) { return base_->Append(data); } + Status Append(const Slice& data) { + if (env_->no_space_.Acquire_Load() != NULL) { + // Drop writes on the floor + return Status::OK(); + } else { + return base_->Append(data); + } + } Status Close() { return base_->Close(); } Status Flush() { return base_->Flush(); } Status Sync() { @@ -239,6 +250,12 @@ class DBTest { return result; } + int CountFiles() { + std::vector files; + env_->GetChildren(dbname_, &files); + return static_cast(files.size()); + } + uint64_t Size(const Slice& start, const Slice& limit) { Range r(start, limit); uint64_t size; @@ -1266,6 +1283,37 @@ TEST(DBTest, DBOpen_Options) { db = NULL; } +// Check that number of files does not grow when we are out of space +TEST(DBTest, NoSpace) { + Options options; + options.env = env_; + Reopen(&options); + + ASSERT_OK(Put("foo", "v1")); + ASSERT_EQ("v1", Get("foo")); + Compact("a", "z"); + const int num_files = CountFiles(); + env_->no_space_.Release_Store(env_); // Force out-of-space errors + for (int i = 0; i < 10; i++) { + for (int level = 0; level < config::kNumLevels-1; level++) { + dbfull()->TEST_CompactRange(level, NULL, NULL); + } + } + env_->no_space_.Release_Store(NULL); + ASSERT_LT(CountFiles(), num_files + 5); +} + +TEST(DBTest, FilesDeletedAfterCompaction) { + ASSERT_OK(Put("foo", "v2")); + Compact("a", "z"); + const int num_files = CountFiles(); + for (int i = 0; i < 10; i++) { + ASSERT_OK(Put("foo", "v2")); + Compact("a", "z"); + } + ASSERT_EQ(CountFiles(), num_files); +} + // Multi-threaded test: namespace { @@ -1287,14 +1335,15 @@ struct MTThread { static void MTThreadBody(void* arg) { MTThread* t = reinterpret_cast(arg); + int id = t->id; DB* db = t->state->test->db_; uintptr_t counter = 0; - fprintf(stderr, "... starting thread %d\n", t->id); - Random rnd(1000 + t->id); + fprintf(stderr, "... starting thread %d\n", id); + Random rnd(1000 + id); std::string value; char valbuf[1500]; while (t->state->stop.Acquire_Load() == NULL) { - t->state->counter[t->id].Release_Store(reinterpret_cast(counter)); + t->state->counter[id].Release_Store(reinterpret_cast(counter)); int key = rnd.Uniform(kNumKeys); char keybuf[20]; @@ -1304,7 +1353,7 @@ static void MTThreadBody(void* arg) { // Write values of the form . // We add some padding for force compactions. snprintf(valbuf, sizeof(valbuf), "%d.%d.%-1000d", - key, t->id, static_cast(counter)); + key, id, static_cast(counter)); ASSERT_OK(db->Put(WriteOptions(), Slice(keybuf), Slice(valbuf))); } else { // Read a value and verify that it matches the pattern written above. @@ -1325,8 +1374,8 @@ static void MTThreadBody(void* arg) { } counter++; } - t->state->thread_done[t->id].Release_Store(t); - fprintf(stderr, "... stopping thread %d after %d ops\n", t->id, int(counter)); + t->state->thread_done[id].Release_Store(t); + fprintf(stderr, "... stopping thread %d after %d ops\n", id, int(counter)); } } // namespace diff --git a/db/filename.cc b/db/filename.cc index 24fd140..3c4d49f 100644 --- a/db/filename.cc +++ b/db/filename.cc @@ -11,6 +11,10 @@ namespace leveldb { +// A utility routine: write "data" to the named file and Sync() it. +extern Status WriteStringToFileSync(Env* env, const Slice& data, + const std::string& fname); + static std::string MakeFileName(const std::string& name, uint64_t number, const char* suffix) { char buf[100]; @@ -122,7 +126,7 @@ Status SetCurrentFile(Env* env, const std::string& dbname, assert(contents.starts_with(dbname + "/")); contents.remove_prefix(dbname.size() + 1); std::string tmp = TempFileName(dbname, descriptor_number); - Status s = WriteStringToFile(env, contents.ToString() + "\n", tmp); + Status s = WriteStringToFileSync(env, contents.ToString() + "\n", tmp); if (s.ok()) { s = env->RenameFile(tmp, CurrentFileName(dbname)); } diff --git a/util/env.cc b/util/env.cc index 594811b..c2600e9 100644 --- a/util/env.cc +++ b/util/env.cc @@ -33,14 +33,18 @@ void Log(Logger* info_log, const char* format, ...) { } } -Status WriteStringToFile(Env* env, const Slice& data, - const std::string& fname) { +static Status DoWriteStringToFile(Env* env, const Slice& data, + const std::string& fname, + bool should_sync) { WritableFile* file; Status s = env->NewWritableFile(fname, &file); if (!s.ok()) { return s; } s = file->Append(data); + if (s.ok() && should_sync) { + s = file->Sync(); + } if (s.ok()) { s = file->Close(); } @@ -51,6 +55,16 @@ Status WriteStringToFile(Env* env, const Slice& data, return s; } +Status WriteStringToFile(Env* env, const Slice& data, + const std::string& fname) { + return DoWriteStringToFile(env, data, fname, false); +} + +Status WriteStringToFileSync(Env* env, const Slice& data, + const std::string& fname) { + return DoWriteStringToFile(env, data, fname, true); +} + Status ReadFileToString(Env* env, const std::string& fname, std::string* data) { data->clear(); SequentialFile* file; diff --git a/util/env_test.cc b/util/env_test.cc index 3f8a8a2..b72cb44 100644 --- a/util/env_test.cc +++ b/util/env_test.cc @@ -22,29 +22,30 @@ class EnvPosixTest { }; static void SetBool(void* ptr) { - *(reinterpret_cast(ptr)) = true; + reinterpret_cast(ptr)->NoBarrier_Store(ptr); } TEST(EnvPosixTest, RunImmediately) { - bool called = false; + port::AtomicPointer called (NULL); env_->Schedule(&SetBool, &called); Env::Default()->SleepForMicroseconds(kDelayMicros); - ASSERT_TRUE(called); + ASSERT_TRUE(called.NoBarrier_Load() != NULL); } TEST(EnvPosixTest, RunMany) { - int last_id = 0; + port::AtomicPointer last_id (NULL); struct CB { - int* last_id_ptr; // Pointer to shared slot - int id; // Order# for the execution of this callback + port::AtomicPointer* last_id_ptr; // Pointer to shared slot + uintptr_t id; // Order# for the execution of this callback - CB(int* p, int i) : last_id_ptr(p), id(i) { } + CB(port::AtomicPointer* p, int i) : last_id_ptr(p), id(i) { } static void Run(void* v) { CB* cb = reinterpret_cast(v); - ASSERT_EQ(cb->id-1, *cb->last_id_ptr); - *cb->last_id_ptr = cb->id; + void* cur = cb->last_id_ptr->NoBarrier_Load(); + ASSERT_EQ(cb->id-1, reinterpret_cast(cur)); + cb->last_id_ptr->Release_Store(reinterpret_cast(cb->id)); } }; @@ -59,7 +60,8 @@ TEST(EnvPosixTest, RunMany) { env_->Schedule(&CB::Run, &cb4); Env::Default()->SleepForMicroseconds(kDelayMicros); - ASSERT_EQ(4, last_id); + void* cur = last_id.Acquire_Load(); + ASSERT_EQ(4, reinterpret_cast(cur)); } struct State {