Browse Source

fixed issues 66 (leaking files on disk error) and 68 (no sync of CURRENT file)

naive_version
Sanjay Ghemawat 13 years ago
parent
commit
3c8be108bf
5 changed files with 95 additions and 34 deletions
  1. +6
    -14
      db/db_impl.cc
  2. +56
    -7
      db/db_test.cc
  3. +5
    -1
      db/filename.cc
  4. +16
    -2
      util/env.cc
  5. +12
    -10
      util/env_test.cc

+ 6
- 14
db/db_impl.cc View File

@ -655,6 +655,8 @@ void DBImpl::BackgroundCompaction() {
CompactionState* compact = new CompactionState(c); CompactionState* compact = new CompactionState(c);
status = DoCompactionWork(compact); status = DoCompactionWork(compact);
CleanupCompaction(compact); CleanupCompaction(compact);
c->ReleaseInputs();
DeleteObsoleteFiles();
} }
delete c; delete c;
@ -672,6 +674,9 @@ void DBImpl::BackgroundCompaction() {
if (is_manual) { if (is_manual) {
ManualCompaction* m = manual_compaction_; ManualCompaction* m = manual_compaction_;
if (!status.ok()) {
m->done = true;
}
if (!m->done) { if (!m->done) {
// We only compacted part of the requested range. Update *m // We only compacted part of the requested range. Update *m
// to the range that is left to be compacted. // to the range that is left to be compacted.
@ -793,21 +798,8 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) {
compact->compaction->edit()->AddFile( compact->compaction->edit()->AddFile(
level + 1, level + 1,
out.number, out.file_size, out.smallest, out.largest); out.number, out.file_size, out.smallest, out.largest);
pending_outputs_.erase(out.number);
} }
compact->outputs.clear();
Status s = versions_->LogAndApply(compact->compaction->edit(), &mutex_);
if (s.ok()) {
compact->compaction->ReleaseInputs();
DeleteObsoleteFiles();
} else {
// Discard any files we may have created during this failed compaction
for (size_t i = 0; i < compact->outputs.size(); i++) {
env_->DeleteFile(TableFileName(dbname_, compact->outputs[i].number));
}
}
return s;
return versions_->LogAndApply(compact->compaction->edit(), &mutex_);
} }
Status DBImpl::DoCompactionWork(CompactionState* compact) { Status DBImpl::DoCompactionWork(CompactionState* compact) {

+ 56
- 7
db/db_test.cc View File

@ -28,8 +28,12 @@ class SpecialEnv : public EnvWrapper {
// sstable Sync() calls are blocked while this pointer is non-NULL. // sstable Sync() calls are blocked while this pointer is non-NULL.
port::AtomicPointer delay_sstable_sync_; port::AtomicPointer delay_sstable_sync_;
// Simulate no-space errors while this pointer is non-NULL.
port::AtomicPointer no_space_;
explicit SpecialEnv(Env* base) : EnvWrapper(base) { explicit SpecialEnv(Env* base) : EnvWrapper(base) {
delay_sstable_sync_.Release_Store(NULL); delay_sstable_sync_.Release_Store(NULL);
no_space_.Release_Store(NULL);
} }
Status NewWritableFile(const std::string& f, WritableFile** r) { Status NewWritableFile(const std::string& f, WritableFile** r) {
@ -44,7 +48,14 @@ class SpecialEnv : public EnvWrapper {
base_(base) { base_(base) {
} }
~SSTableFile() { delete base_; } ~SSTableFile() { delete base_; }
Status Append(const Slice& data) { return base_->Append(data); }
Status Append(const Slice& data) {
if (env_->no_space_.Acquire_Load() != NULL) {
// Drop writes on the floor
return Status::OK();
} else {
return base_->Append(data);
}
}
Status Close() { return base_->Close(); } Status Close() { return base_->Close(); }
Status Flush() { return base_->Flush(); } Status Flush() { return base_->Flush(); }
Status Sync() { Status Sync() {
@ -239,6 +250,12 @@ class DBTest {
return result; return result;
} }
int CountFiles() {
std::vector<std::string> files;
env_->GetChildren(dbname_, &files);
return static_cast<int>(files.size());
}
uint64_t Size(const Slice& start, const Slice& limit) { uint64_t Size(const Slice& start, const Slice& limit) {
Range r(start, limit); Range r(start, limit);
uint64_t size; uint64_t size;
@ -1266,6 +1283,37 @@ TEST(DBTest, DBOpen_Options) {
db = NULL; db = NULL;
} }
// Check that number of files does not grow when we are out of space
TEST(DBTest, NoSpace) {
Options options;
options.env = env_;
Reopen(&options);
ASSERT_OK(Put("foo", "v1"));
ASSERT_EQ("v1", Get("foo"));
Compact("a", "z");
const int num_files = CountFiles();
env_->no_space_.Release_Store(env_); // Force out-of-space errors
for (int i = 0; i < 10; i++) {
for (int level = 0; level < config::kNumLevels-1; level++) {
dbfull()->TEST_CompactRange(level, NULL, NULL);
}
}
env_->no_space_.Release_Store(NULL);
ASSERT_LT(CountFiles(), num_files + 5);
}
TEST(DBTest, FilesDeletedAfterCompaction) {
ASSERT_OK(Put("foo", "v2"));
Compact("a", "z");
const int num_files = CountFiles();
for (int i = 0; i < 10; i++) {
ASSERT_OK(Put("foo", "v2"));
Compact("a", "z");
}
ASSERT_EQ(CountFiles(), num_files);
}
// Multi-threaded test: // Multi-threaded test:
namespace { namespace {
@ -1287,14 +1335,15 @@ struct MTThread {
static void MTThreadBody(void* arg) { static void MTThreadBody(void* arg) {
MTThread* t = reinterpret_cast<MTThread*>(arg); MTThread* t = reinterpret_cast<MTThread*>(arg);
int id = t->id;
DB* db = t->state->test->db_; DB* db = t->state->test->db_;
uintptr_t counter = 0; uintptr_t counter = 0;
fprintf(stderr, "... starting thread %d\n", t->id);
Random rnd(1000 + t->id);
fprintf(stderr, "... starting thread %d\n", id);
Random rnd(1000 + id);
std::string value; std::string value;
char valbuf[1500]; char valbuf[1500];
while (t->state->stop.Acquire_Load() == NULL) { while (t->state->stop.Acquire_Load() == NULL) {
t->state->counter[t->id].Release_Store(reinterpret_cast<void*>(counter));
t->state->counter[id].Release_Store(reinterpret_cast<void*>(counter));
int key = rnd.Uniform(kNumKeys); int key = rnd.Uniform(kNumKeys);
char keybuf[20]; char keybuf[20];
@ -1304,7 +1353,7 @@ static void MTThreadBody(void* arg) {
// Write values of the form <key, my id, counter>. // Write values of the form <key, my id, counter>.
// We add some padding for force compactions. // We add some padding for force compactions.
snprintf(valbuf, sizeof(valbuf), "%d.%d.%-1000d", snprintf(valbuf, sizeof(valbuf), "%d.%d.%-1000d",
key, t->id, static_cast<int>(counter));
key, id, static_cast<int>(counter));
ASSERT_OK(db->Put(WriteOptions(), Slice(keybuf), Slice(valbuf))); ASSERT_OK(db->Put(WriteOptions(), Slice(keybuf), Slice(valbuf)));
} else { } else {
// Read a value and verify that it matches the pattern written above. // Read a value and verify that it matches the pattern written above.
@ -1325,8 +1374,8 @@ static void MTThreadBody(void* arg) {
} }
counter++; counter++;
} }
t->state->thread_done[t->id].Release_Store(t);
fprintf(stderr, "... stopping thread %d after %d ops\n", t->id, int(counter));
t->state->thread_done[id].Release_Store(t);
fprintf(stderr, "... stopping thread %d after %d ops\n", id, int(counter));
} }
} // namespace } // namespace

+ 5
- 1
db/filename.cc View File

@ -11,6 +11,10 @@
namespace leveldb { namespace leveldb {
// A utility routine: write "data" to the named file and Sync() it.
extern Status WriteStringToFileSync(Env* env, const Slice& data,
const std::string& fname);
static std::string MakeFileName(const std::string& name, uint64_t number, static std::string MakeFileName(const std::string& name, uint64_t number,
const char* suffix) { const char* suffix) {
char buf[100]; char buf[100];
@ -122,7 +126,7 @@ Status SetCurrentFile(Env* env, const std::string& dbname,
assert(contents.starts_with(dbname + "/")); assert(contents.starts_with(dbname + "/"));
contents.remove_prefix(dbname.size() + 1); contents.remove_prefix(dbname.size() + 1);
std::string tmp = TempFileName(dbname, descriptor_number); std::string tmp = TempFileName(dbname, descriptor_number);
Status s = WriteStringToFile(env, contents.ToString() + "\n", tmp);
Status s = WriteStringToFileSync(env, contents.ToString() + "\n", tmp);
if (s.ok()) { if (s.ok()) {
s = env->RenameFile(tmp, CurrentFileName(dbname)); s = env->RenameFile(tmp, CurrentFileName(dbname));
} }

+ 16
- 2
util/env.cc View File

@ -33,14 +33,18 @@ void Log(Logger* info_log, const char* format, ...) {
} }
} }
Status WriteStringToFile(Env* env, const Slice& data,
const std::string& fname) {
static Status DoWriteStringToFile(Env* env, const Slice& data,
const std::string& fname,
bool should_sync) {
WritableFile* file; WritableFile* file;
Status s = env->NewWritableFile(fname, &file); Status s = env->NewWritableFile(fname, &file);
if (!s.ok()) { if (!s.ok()) {
return s; return s;
} }
s = file->Append(data); s = file->Append(data);
if (s.ok() && should_sync) {
s = file->Sync();
}
if (s.ok()) { if (s.ok()) {
s = file->Close(); s = file->Close();
} }
@ -51,6 +55,16 @@ Status WriteStringToFile(Env* env, const Slice& data,
return s; return s;
} }
Status WriteStringToFile(Env* env, const Slice& data,
const std::string& fname) {
return DoWriteStringToFile(env, data, fname, false);
}
Status WriteStringToFileSync(Env* env, const Slice& data,
const std::string& fname) {
return DoWriteStringToFile(env, data, fname, true);
}
Status ReadFileToString(Env* env, const std::string& fname, std::string* data) { Status ReadFileToString(Env* env, const std::string& fname, std::string* data) {
data->clear(); data->clear();
SequentialFile* file; SequentialFile* file;

+ 12
- 10
util/env_test.cc View File

@ -22,29 +22,30 @@ class EnvPosixTest {
}; };
static void SetBool(void* ptr) { static void SetBool(void* ptr) {
*(reinterpret_cast<bool*>(ptr)) = true;
reinterpret_cast<port::AtomicPointer*>(ptr)->NoBarrier_Store(ptr);
} }
TEST(EnvPosixTest, RunImmediately) { TEST(EnvPosixTest, RunImmediately) {
bool called = false;
port::AtomicPointer called (NULL);
env_->Schedule(&SetBool, &called); env_->Schedule(&SetBool, &called);
Env::Default()->SleepForMicroseconds(kDelayMicros); Env::Default()->SleepForMicroseconds(kDelayMicros);
ASSERT_TRUE(called);
ASSERT_TRUE(called.NoBarrier_Load() != NULL);
} }
TEST(EnvPosixTest, RunMany) { TEST(EnvPosixTest, RunMany) {
int last_id = 0;
port::AtomicPointer last_id (NULL);
struct CB { struct CB {
int* last_id_ptr; // Pointer to shared slot
int id; // Order# for the execution of this callback
port::AtomicPointer* last_id_ptr; // Pointer to shared slot
uintptr_t id; // Order# for the execution of this callback
CB(int* p, int i) : last_id_ptr(p), id(i) { }
CB(port::AtomicPointer* p, int i) : last_id_ptr(p), id(i) { }
static void Run(void* v) { static void Run(void* v) {
CB* cb = reinterpret_cast<CB*>(v); CB* cb = reinterpret_cast<CB*>(v);
ASSERT_EQ(cb->id-1, *cb->last_id_ptr);
*cb->last_id_ptr = cb->id;
void* cur = cb->last_id_ptr->NoBarrier_Load();
ASSERT_EQ(cb->id-1, reinterpret_cast<uintptr_t>(cur));
cb->last_id_ptr->Release_Store(reinterpret_cast<void*>(cb->id));
} }
}; };
@ -59,7 +60,8 @@ TEST(EnvPosixTest, RunMany) {
env_->Schedule(&CB::Run, &cb4); env_->Schedule(&CB::Run, &cb4);
Env::Default()->SleepForMicroseconds(kDelayMicros); Env::Default()->SleepForMicroseconds(kDelayMicros);
ASSERT_EQ(4, last_id);
void* cur = last_id.Acquire_Load();
ASSERT_EQ(4, reinterpret_cast<uintptr_t>(cur));
} }
struct State { struct State {

Loading…
Cancel
Save