diff --git a/NEWS b/NEWS new file mode 100644 index 0000000..3fd9924 --- /dev/null +++ b/NEWS @@ -0,0 +1,17 @@ +Release 1.2 2011-05-16 +---------------------- + +Fixes for larger databases (tested up to one billion 100-byte entries, +i.e., ~100GB). + +(1) Place hard limit on number of level-0 files. This fixes errors +of the form "too many open files". + +(2) Fixed memtable management. Before the fix, a heavy write burst +could cause unbounded memory usage. + +A fix for a logging bug where the reader would incorrectly complain +about corruption. + +Allow public access to WriteBatch contents so that users can easily +wrap a DB. diff --git a/db/db_bench.cc b/db/db_bench.cc index d1cbdc0..b5fd679 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -24,9 +24,10 @@ // overwrite -- overwrite N values in random key order in async mode // fillsync -- write N/100 values in random key order in sync mode // fill100K -- write N/1000 100K values in random order in async mode -// readseq -- read N values sequentially -// readreverse -- read N values in reverse order -// readrandom -- read N values in random order +// readseq -- read N times sequentially +// readreverse -- read N times in reverse order +// readrandom -- read N times in random order +// readhot -- read N times in random order from 1% section of DB // crc32c -- repeated crc32c of 4K of data // Meta operations: // compact -- Compact the entire DB @@ -54,6 +55,9 @@ static const char* FLAGS_benchmarks = // Number of key/values to place in database static int FLAGS_num = 1000000; +// Number of read operations to do. If negative, do FLAGS_num reads. +static int FLAGS_reads = -1; + // Size of each value static int FLAGS_value_size = 100; @@ -72,6 +76,14 @@ static int FLAGS_write_buffer_size = 0; // Negative means use default settings. static int FLAGS_cache_size = -1; +// Maximum number of files to keep open at the same time (use default if == 0) +static int FLAGS_open_files = 0; + +// If true, do not destroy the existing database. If you set this +// flag and also specify a benchmark that wants a fresh database, that +// benchmark will fail. +static bool FLAGS_use_existing_db = false; + namespace leveldb { // Helper for quickly generating random data. @@ -126,6 +138,7 @@ class Benchmark { Cache* cache_; DB* db_; int num_; + int reads_; int heap_counter_; double start_; double last_op_finish_; @@ -298,6 +311,7 @@ class Benchmark { : cache_(FLAGS_cache_size >= 0 ? NewLRUCache(FLAGS_cache_size) : NULL), db_(NULL), num_(FLAGS_num), + reads_(FLAGS_reads < 0 ? FLAGS_num : FLAGS_reads), heap_counter_(0), bytes_(0), rand_(301) { @@ -308,7 +322,9 @@ class Benchmark { Env::Default()->DeleteFile("/tmp/dbbench/" + files[i]); } } - DestroyDB("/tmp/dbbench", Options()); + if (!FLAGS_use_existing_db) { + DestroyDB("/tmp/dbbench", Options()); + } } ~Benchmark() { @@ -355,11 +371,13 @@ class Benchmark { ReadReverse(); } else if (name == Slice("readrandom")) { ReadRandom(); + } else if (name == Slice("readhot")) { + ReadHot(); } else if (name == Slice("readrandomsmall")) { - int n = num_; - num_ /= 1000; + int n = reads_; + reads_ /= 1000; ReadRandom(); - num_ = n; + reads_ = n; } else if (name == Slice("compact")) { Compact(); } else if (name == Slice("crc32c")) { @@ -449,7 +467,7 @@ class Benchmark { void Open() { assert(db_ == NULL); Options options; - options.create_if_missing = true; + options.create_if_missing = !FLAGS_use_existing_db; options.block_cache = cache_; options.write_buffer_size = FLAGS_write_buffer_size; Status s = DB::Open(options, "/tmp/dbbench", &db_); @@ -462,6 +480,10 @@ class Benchmark { void Write(const WriteOptions& options, Order order, DBState state, int num_entries, int value_size, int entries_per_batch) { if (state == FRESH) { + if (FLAGS_use_existing_db) { + message_ = "skipping (--use_existing_db is true)"; + return; + } delete db_; db_ = NULL; DestroyDB("/tmp/dbbench", Options()); @@ -499,7 +521,7 @@ class Benchmark { void ReadSequential() { Iterator* iter = db_->NewIterator(ReadOptions()); int i = 0; - for (iter->SeekToFirst(); i < num_ && iter->Valid(); iter->Next()) { + for (iter->SeekToFirst(); i < reads_ && iter->Valid(); iter->Next()) { bytes_ += iter->key().size() + iter->value().size(); FinishedSingleOp(); ++i; @@ -510,7 +532,7 @@ class Benchmark { void ReadReverse() { Iterator* iter = db_->NewIterator(ReadOptions()); int i = 0; - for (iter->SeekToLast(); i < num_ && iter->Valid(); iter->Prev()) { + for (iter->SeekToLast(); i < reads_ && iter->Valid(); iter->Prev()) { bytes_ += iter->key().size() + iter->value().size(); FinishedSingleOp(); ++i; @@ -521,7 +543,7 @@ class Benchmark { void ReadRandom() { ReadOptions options; std::string value; - for (int i = 0; i < num_; i++) { + for (int i = 0; i < reads_; i++) { char key[100]; const int k = rand_.Next() % FLAGS_num; snprintf(key, sizeof(key), "%016d", k); @@ -530,6 +552,19 @@ class Benchmark { } } + void ReadHot() { + ReadOptions options; + std::string value; + const int range = (FLAGS_num + 99) / 100; + for (int i = 0; i < reads_; i++) { + char key[100]; + const int k = rand_.Next() % range; + snprintf(key, sizeof(key), "%016d", k); + db_->Get(options, key, &value); + FinishedSingleOp(); + } + } + void Compact() { DBImpl* dbi = reinterpret_cast(db_); dbi->TEST_CompactMemTable(); @@ -582,6 +617,8 @@ class Benchmark { int main(int argc, char** argv) { FLAGS_write_buffer_size = leveldb::Options().write_buffer_size; + FLAGS_open_files = leveldb::Options().max_open_files; + for (int i = 1; i < argc; i++) { double d; int n; @@ -593,14 +630,21 @@ int main(int argc, char** argv) { } else if (sscanf(argv[i], "--histogram=%d%c", &n, &junk) == 1 && (n == 0 || n == 1)) { FLAGS_histogram = n; + } else if (sscanf(argv[i], "--use_existing_db=%d%c", &n, &junk) == 1 && + (n == 0 || n == 1)) { + FLAGS_use_existing_db = n; } else if (sscanf(argv[i], "--num=%d%c", &n, &junk) == 1) { FLAGS_num = n; + } else if (sscanf(argv[i], "--reads=%d%c", &n, &junk) == 1) { + FLAGS_reads = n; } else if (sscanf(argv[i], "--value_size=%d%c", &n, &junk) == 1) { FLAGS_value_size = n; } else if (sscanf(argv[i], "--write_buffer_size=%d%c", &n, &junk) == 1) { FLAGS_write_buffer_size = n; } else if (sscanf(argv[i], "--cache_size=%d%c", &n, &junk) == 1) { FLAGS_cache_size = n; + } else if (sscanf(argv[i], "--open_files=%d%c", &n, &junk) == 1) { + FLAGS_open_files = n; } else { fprintf(stderr, "Invalid flag '%s'\n", argv[i]); exit(1); diff --git a/db/db_impl.cc b/db/db_impl.cc index 3b9e04e..baf9299 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -126,6 +126,7 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname) log_(NULL), bg_compaction_scheduled_(false), compacting_(false) { + mem_->Ref(); has_imm_.Release_Store(NULL); // Reserve ten files or so for other uses and give the rest to TableCache. @@ -152,8 +153,8 @@ DBImpl::~DBImpl() { } delete versions_; - delete mem_; - delete imm_; + if (mem_ != NULL) mem_->Unref(); + if (imm_ != NULL) imm_->Unref(); delete log_; delete logfile_; delete table_cache_; @@ -344,7 +345,8 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, // paranoid_checks==false so that corruptions cause entire commits // to be skipped instead of propagating bad information (like overly // large sequence numbers). - log::Reader reader(file, &reporter, true/*checksum*/); + log::Reader reader(file, &reporter, true/*checksum*/, + 0/*initial_offset*/); Log(env_, options_.info_log, "Recovering log #%llu", (unsigned long long) log_number); @@ -364,6 +366,7 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, if (mem == NULL) { mem = new MemTable(internal_comparator_); + mem->Ref(); } status = WriteBatchInternal::InsertInto(&batch, mem); MaybeIgnoreError(&status); @@ -384,7 +387,7 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, // file-systems cause the DB::Open() to fail. break; } - delete mem; + mem->Unref(); mem = NULL; } } @@ -395,7 +398,7 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, // file-systems cause the DB::Open() to fail. } - delete mem; + if (mem != NULL) mem->Unref(); delete file; return status; } @@ -443,11 +446,12 @@ Status DBImpl::CompactMemTable() { // Replace immutable memtable with the generated Table if (s.ok()) { edit.SetPrevLogNumber(0); - s = versions_->LogAndApply(&edit, imm_); + s = versions_->LogAndApply(&edit); } if (s.ok()) { // Commit to the new state + imm_->Unref(); imm_ = NULL; has_imm_.Release_Store(NULL); DeleteObsoleteFiles(); @@ -556,7 +560,7 @@ void DBImpl::BackgroundCompaction() { c->edit()->DeleteFile(c->level(), f->number); c->edit()->AddFile(c->level() + 1, f->number, f->file_size, f->smallest, f->largest); - status = versions_->LogAndApply(c->edit(), NULL); + status = versions_->LogAndApply(c->edit()); Log(env_, options_.info_log, "Moved #%lld to level-%d %lld bytes %s\n", static_cast(f->number), c->level() + 1, @@ -697,7 +701,7 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) { } compact->outputs.clear(); - Status s = versions_->LogAndApply(compact->compaction->edit(), NULL); + Status s = versions_->LogAndApply(compact->compaction->edit()); if (s.ok()) { compact->compaction->ReleaseInputs(); DeleteObsoleteFiles(); @@ -754,9 +758,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { } Slice key = input->key(); - InternalKey tmp_internal_key; - tmp_internal_key.DecodeFrom(key); - if (compact->compaction->ShouldStopBefore(tmp_internal_key) && + if (compact->compaction->ShouldStopBefore(key) && compact->builder != NULL) { status = FinishCompactionOutputFile(compact, input); if (!status.ok()) { @@ -867,6 +869,9 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { } compacting_ = false; compacting_cv_.SignalAll(); + VersionSet::LevelSummaryStorage tmp; + Log(env_, options_.info_log, + "compacted to: %s", versions_->LevelSummary(&tmp)); return status; } @@ -925,10 +930,11 @@ Status DBImpl::Get(const ReadOptions& options, Iterator* DBImpl::NewIterator(const ReadOptions& options) { SequenceNumber latest_snapshot; Iterator* internal_iter = NewInternalIterator(options, &latest_snapshot); - SequenceNumber sequence = - (options.snapshot ? options.snapshot->number_ : latest_snapshot); - return NewDBIterator(&dbname_, env_, - user_comparator(), internal_iter, sequence); + return NewDBIterator( + &dbname_, env_, user_comparator(), internal_iter, + (options.snapshot != NULL + ? reinterpret_cast(options.snapshot)->number_ + : latest_snapshot)); } void DBImpl::Unref(void* arg1, void* arg2) { @@ -945,7 +951,7 @@ const Snapshot* DBImpl::GetSnapshot() { void DBImpl::ReleaseSnapshot(const Snapshot* s) { MutexLock l(&mutex_); - snapshots_.Delete(s); + snapshots_.Delete(reinterpret_cast(s)); } // Convenience methods @@ -985,12 +991,26 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) { Status DBImpl::MakeRoomForWrite(bool force) { mutex_.AssertHeld(); + bool allow_delay = !force; Status s; while (true) { if (!bg_error_.ok()) { // Yield previous error s = bg_error_; break; + } else if ( + allow_delay && + versions_->NumLevelFiles(0) >= config::kL0_SlowdownWritesTrigger) { + // We are getting close to hitting a hard limit on the number of + // L0 files. Rather than delaying a single write by several + // seconds when we hit the hard limit, start delaying each + // individual write by 1ms to reduce latency variance. Also, + // this delay hands over some CPU to the compaction thread in + // case it is sharing the same core as the writer. + mutex_.Unlock(); + env_->SleepForMicroseconds(1000); + allow_delay = false; // Do not delay a single write more than once + mutex_.Lock(); } else if (!force && (mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) { // There is room in current memtable @@ -999,6 +1019,9 @@ Status DBImpl::MakeRoomForWrite(bool force) { // We have filled up the current memtable, but the previous // one is still being compacted, so we wait. compacting_cv_.Wait(); + } else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) { + // There are too many level-0 files. + compacting_cv_.Wait(); } else { // Attempt to switch to a new memtable and trigger compaction of old assert(versions_->PrevLogNumber() == 0); @@ -1011,7 +1034,7 @@ Status DBImpl::MakeRoomForWrite(bool force) { VersionEdit edit; edit.SetPrevLogNumber(versions_->LogNumber()); edit.SetLogNumber(new_log_number); - s = versions_->LogAndApply(&edit, NULL); + s = versions_->LogAndApply(&edit); if (!s.ok()) { delete lfile; env_->DeleteFile(LogFileName(dbname_, new_log_number)); @@ -1024,6 +1047,7 @@ Status DBImpl::MakeRoomForWrite(bool force) { imm_ = mem_; has_imm_.Release_Store(imm_); mem_ = new MemTable(internal_comparator_); + mem_->Ref(); force = false; // Do not force another compaction if have room MaybeScheduleCompaction(); } @@ -1141,10 +1165,11 @@ Status DB::Open(const Options& options, const std::string& dbname, edit.SetLogNumber(new_log_number); impl->logfile_ = lfile; impl->log_ = new log::Writer(lfile); - s = impl->versions_->LogAndApply(&edit, NULL); + s = impl->versions_->LogAndApply(&edit); } if (s.ok()) { impl->DeleteObsoleteFiles(); + impl->MaybeScheduleCompaction(); } } impl->mutex_.Unlock(); @@ -1156,6 +1181,9 @@ Status DB::Open(const Options& options, const std::string& dbname, return s; } +Snapshot::~Snapshot() { +} + Status DestroyDB(const std::string& dbname, const Options& options) { Env* env = options.env; std::vector filenames; diff --git a/db/db_test.cc b/db/db_test.cc index f828e3d..06565b2 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -3,7 +3,6 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. #include "leveldb/db.h" - #include "db/db_impl.h" #include "db/filename.h" #include "db/version_set.h" @@ -802,8 +801,17 @@ TEST(DBTest, DBOpen_Options) { db = NULL; } +namespace { +typedef std::map KVMap; +} + class ModelDB: public DB { public: + class ModelSnapshot : public Snapshot { + public: + KVMap map_; + }; + explicit ModelDB(const Options& options): options_(options) { } ~ModelDB() { } virtual Status Put(const WriteOptions& o, const Slice& k, const Slice& v) { @@ -824,35 +832,34 @@ class ModelDB: public DB { return new ModelIter(saved, true); } else { const KVMap* snapshot_state = - reinterpret_cast(options.snapshot->number_); + &(reinterpret_cast(options.snapshot)->map_); return new ModelIter(snapshot_state, false); } } virtual const Snapshot* GetSnapshot() { - KVMap* saved = new KVMap; - *saved = map_; - return snapshots_.New( - reinterpret_cast(saved)); + ModelSnapshot* snapshot = new ModelSnapshot; + snapshot->map_ = map_; + return snapshot; } virtual void ReleaseSnapshot(const Snapshot* snapshot) { - const KVMap* saved = reinterpret_cast(snapshot->number_); - delete saved; - snapshots_.Delete(snapshot); + delete reinterpret_cast(snapshot); } virtual Status Write(const WriteOptions& options, WriteBatch* batch) { assert(options.post_write_snapshot == NULL); // Not supported - for (WriteBatchInternal::Iterator it(*batch); !it.Done(); it.Next()) { - switch (it.op()) { - case kTypeValue: - map_[it.key().ToString()] = it.value().ToString(); - break; - case kTypeDeletion: - map_.erase(it.key().ToString()); - break; + class Handler : public WriteBatch::Handler { + public: + KVMap* map_; + virtual void Put(const Slice& key, const Slice& value) { + (*map_)[key.ToString()] = value.ToString(); } - } - return Status::OK(); + virtual void Delete(const Slice& key) { + map_->erase(key.ToString()); + } + }; + Handler handler; + handler.map_ = &map_; + return batch->Iterate(&handler); } virtual bool GetProperty(const Slice& property, std::string* value) { @@ -864,7 +871,6 @@ class ModelDB: public DB { } } private: - typedef std::map KVMap; class ModelIter: public Iterator { public: ModelIter(const KVMap* map, bool owned) @@ -897,7 +903,6 @@ class ModelDB: public DB { }; const Options options_; KVMap map_; - SnapshotList snapshots_; }; static std::string RandomKey(Random* rnd) { @@ -1023,8 +1028,70 @@ TEST(DBTest, Randomized) { if (db_snap != NULL) db_->ReleaseSnapshot(db_snap); } +std::string MakeKey(unsigned int num) { + char buf[30]; + snprintf(buf, sizeof(buf), "%016u", num); + return std::string(buf); +} + +void BM_LogAndApply(int iters, int num_base_files) { + std::string dbname = test::TmpDir() + "/leveldb_test_benchmark"; + DestroyDB(dbname, Options()); + + DB* db = NULL; + Options opts; + opts.create_if_missing = true; + Status s = DB::Open(opts, dbname, &db); + ASSERT_OK(s); + ASSERT_TRUE(db != NULL); + + delete db; + db = NULL; + + Env* env = Env::Default(); + + InternalKeyComparator cmp(BytewiseComparator()); + Options options; + VersionSet vset(dbname, &options, NULL, &cmp); + ASSERT_OK(vset.Recover()); + VersionEdit vbase; + uint64_t fnum = 1; + for (int i = 0; i < num_base_files; i++) { + InternalKey start(MakeKey(2*fnum), 1, kTypeValue); + InternalKey limit(MakeKey(2*fnum+1), 1, kTypeDeletion); + vbase.AddFile(2, fnum++, 1 /* file size */, start, limit); + } + ASSERT_OK(vset.LogAndApply(&vbase)); + + uint64_t start_micros = env->NowMicros(); + + for (int i = 0; i < iters; i++) { + VersionEdit vedit; + vedit.DeleteFile(2, fnum); + InternalKey start(MakeKey(2*fnum), 1, kTypeValue); + InternalKey limit(MakeKey(2*fnum+1), 1, kTypeDeletion); + vedit.AddFile(2, fnum++, 1 /* file size */, start, limit); + vset.LogAndApply(&vedit); + } + uint64_t stop_micros = env->NowMicros(); + unsigned int us = stop_micros - start_micros; + char buf[16]; + snprintf(buf, sizeof(buf), "%d", num_base_files); + fprintf(stderr, + "BM_LogAndApply/%-6s %8d iters : %9u us (%7.0f us / iter)\n", + buf, iters, us, ((float)us) / iters); +} + } int main(int argc, char** argv) { + if (argc > 1 && std::string(argv[1]) == "--benchmark") { + leveldb::BM_LogAndApply(1000, 1); + leveldb::BM_LogAndApply(1000, 100); + leveldb::BM_LogAndApply(1000, 10000); + leveldb::BM_LogAndApply(100, 100000); + return 0; + } + return leveldb::test::RunAllTests(); } diff --git a/db/dbformat.h b/db/dbformat.h index d583665..89c4afb 100644 --- a/db/dbformat.h +++ b/db/dbformat.h @@ -19,6 +19,16 @@ namespace leveldb { // parameters set via options. namespace config { static const int kNumLevels = 7; + +// Level-0 compaction is started when we hit this many files. +static const int kL0_CompactionTrigger = 4; + +// Soft limit on number of level-0 files. We slow down writes at this point. +static const int kL0_SlowdownWritesTrigger = 8; + +// Maximum number of level-0 files. We stop writes at this point. +static const int kL0_StopWritesTrigger = 12; + } class InternalKey; diff --git a/db/log_reader.cc b/db/log_reader.cc index 75e1d28..8721071 100644 --- a/db/log_reader.cc +++ b/db/log_reader.cc @@ -4,7 +4,6 @@ #include "db/log_reader.h" -#include #include "leveldb/env.h" #include "util/coding.h" #include "util/crc32c.h" @@ -15,46 +14,104 @@ namespace log { Reader::Reporter::~Reporter() { } -Reader::Reader(SequentialFile* file, Reporter* reporter, bool checksum) +Reader::Reader(SequentialFile* file, Reporter* reporter, bool checksum, + uint64_t initial_offset) : file_(file), reporter_(reporter), checksum_(checksum), backing_store_(new char[kBlockSize]), buffer_(), - eof_(false) { + eof_(false), + last_record_offset_(0), + end_of_buffer_offset_(0), + initial_offset_(initial_offset) { } Reader::~Reader() { delete[] backing_store_; } +bool Reader::SkipToInitialBlock() { + size_t offset_in_block = initial_offset_ % kBlockSize; + uint64_t block_start_location = initial_offset_ - offset_in_block; + + // Don't search a block if we'd be in the trailer + if (offset_in_block > kBlockSize - 6) { + offset_in_block = 0; + block_start_location += kBlockSize; + } + + end_of_buffer_offset_ = block_start_location; + + // Skip to start of first block that can contain the initial record + if (block_start_location > 0) { + Status skip_status = file_->Skip(block_start_location); + if (!skip_status.ok()) { + ReportDrop(block_start_location, skip_status); + return false; + } + } + + return true; +} + bool Reader::ReadRecord(Slice* record, std::string* scratch) { + if (last_record_offset_ < initial_offset_) { + if (!SkipToInitialBlock()) { + return false; + } + } + scratch->clear(); record->clear(); bool in_fragmented_record = false; + // Record offset of the logical record that we're reading + // 0 is a dummy value to make compilers happy + uint64_t prospective_record_offset = 0; Slice fragment; while (true) { + uint64_t physical_record_offset = end_of_buffer_offset_ - buffer_.size(); switch (ReadPhysicalRecord(&fragment)) { case kFullType: if (in_fragmented_record) { - ReportDrop(scratch->size(), "partial record without end"); + // Handle bug in earlier versions of log::Writer where + // it could emit an empty kFirstType record at the tail end + // of a block followed by a kFullType or kFirstType record + // at the beginning of the next block. + if (scratch->empty()) { + in_fragmented_record = false; + } else { + ReportCorruption(scratch->size(), "partial record without end(1)"); + } } + prospective_record_offset = physical_record_offset; scratch->clear(); *record = fragment; + last_record_offset_ = prospective_record_offset; return true; case kFirstType: if (in_fragmented_record) { - ReportDrop(scratch->size(), "partial record without end"); + // Handle bug in earlier versions of log::Writer where + // it could emit an empty kFirstType record at the tail end + // of a block followed by a kFullType or kFirstType record + // at the beginning of the next block. + if (scratch->empty()) { + in_fragmented_record = false; + } else { + ReportCorruption(scratch->size(), "partial record without end(2)"); + } } + prospective_record_offset = physical_record_offset; scratch->assign(fragment.data(), fragment.size()); in_fragmented_record = true; break; case kMiddleType: if (!in_fragmented_record) { - ReportDrop(fragment.size(), "missing start of fragmented record"); + ReportCorruption(fragment.size(), + "missing start of fragmented record(1)"); } else { scratch->append(fragment.data(), fragment.size()); } @@ -62,31 +119,33 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch) { case kLastType: if (!in_fragmented_record) { - ReportDrop(fragment.size(), "missing start of fragmented record"); + ReportCorruption(fragment.size(), + "missing start of fragmented record(2)"); } else { scratch->append(fragment.data(), fragment.size()); *record = Slice(*scratch); + last_record_offset_ = prospective_record_offset; return true; } break; case kEof: if (in_fragmented_record) { - ReportDrop(scratch->size(), "partial record without end"); + ReportCorruption(scratch->size(), "partial record without end(3)"); scratch->clear(); } return false; case kBadRecord: if (in_fragmented_record) { - ReportDrop(scratch->size(), "error in middle of record"); + ReportCorruption(scratch->size(), "error in middle of record"); in_fragmented_record = false; scratch->clear(); } break; default: - ReportDrop( + ReportCorruption( (fragment.size() + (in_fragmented_record ? scratch->size() : 0)), "unknown record type"); in_fragmented_record = false; @@ -97,9 +156,18 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch) { return false; } -void Reader::ReportDrop(size_t bytes, const char* reason) { - if (reporter_ != NULL) { - reporter_->Corruption(bytes, Status::Corruption(reason)); +uint64_t Reader::LastRecordOffset() { + return last_record_offset_; +} + +void Reader::ReportCorruption(size_t bytes, const char* reason) { + ReportDrop(bytes, Status::Corruption(reason)); +} + +void Reader::ReportDrop(size_t bytes, const Status& reason) { + if (reporter_ != NULL && + end_of_buffer_offset_ - buffer_.size() - bytes >= initial_offset_) { + reporter_->Corruption(bytes, reason); } } @@ -110,11 +178,10 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result) { // Last read was a full read, so this is a trailer to skip buffer_.clear(); Status status = file_->Read(kBlockSize, &buffer_, backing_store_); + end_of_buffer_offset_ += buffer_.size(); if (!status.ok()) { - if (reporter_ != NULL) { - reporter_->Corruption(kBlockSize, status); - } buffer_.clear(); + ReportDrop(kBlockSize, status); eof_ = true; return kEof; } else if (buffer_.size() < kBlockSize) { @@ -125,8 +192,9 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result) { // End of file return kEof; } else { - ReportDrop(buffer_.size(), "truncated record at end of file"); + size_t drop_size = buffer_.size(); buffer_.clear(); + ReportCorruption(drop_size, "truncated record at end of file"); return kEof; } } @@ -138,8 +206,9 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result) { const unsigned int type = header[6]; const uint32_t length = a | (b << 8); if (kHeaderSize + length > buffer_.size()) { - ReportDrop(buffer_.size(), "bad record length"); + size_t drop_size = buffer_.size(); buffer_.clear(); + ReportCorruption(drop_size, "bad record length"); return kBadRecord; } @@ -160,13 +229,22 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result) { // been corrupted and if we trust it, we could find some // fragment of a real log record that just happens to look // like a valid log record. - ReportDrop(buffer_.size(), "checksum mismatch"); + size_t drop_size = buffer_.size(); buffer_.clear(); + ReportCorruption(drop_size, "checksum mismatch"); return kBadRecord; } } buffer_.remove_prefix(kHeaderSize + length); + + // Skip physical record that started before initial_offset_ + if (end_of_buffer_offset_ - buffer_.size() - kHeaderSize - length < + initial_offset_) { + result->clear(); + return kBadRecord; + } + *result = Slice(header + kHeaderSize, length); return type; } diff --git a/db/log_reader.h b/db/log_reader.h index baf1475..61cc414 100644 --- a/db/log_reader.h +++ b/db/log_reader.h @@ -5,6 +5,8 @@ #ifndef STORAGE_LEVELDB_DB_LOG_READER_H_ #define STORAGE_LEVELDB_DB_LOG_READER_H_ +#include + #include "db/log_format.h" #include "leveldb/slice.h" #include "leveldb/status.h" @@ -35,7 +37,11 @@ class Reader { // live while this Reader is in use. // // If "checksum" is true, verify checksums if available. - Reader(SequentialFile* file, Reporter* reporter, bool checksum); + // + // The Reader will start reading at the first record located at physical + // position >= initial_offset within the file. + Reader(SequentialFile* file, Reporter* reporter, bool checksum, + uint64_t initial_offset); ~Reader(); @@ -46,6 +52,11 @@ class Reader { // reader or the next mutation to *scratch. bool ReadRecord(Slice* record, std::string* scratch); + // Returns the physical offset of the last record returned by ReadRecord. + // + // Undefined before the first call to ReadRecord. + uint64_t LastRecordOffset(); + private: SequentialFile* const file_; Reporter* const reporter_; @@ -54,15 +65,37 @@ class Reader { Slice buffer_; bool eof_; // Last Read() indicated EOF by returning < kBlockSize + // Offset of the last record returned by ReadRecord. + uint64_t last_record_offset_; + // Offset of the first location past the end of buffer_. + uint64_t end_of_buffer_offset_; + + // Offset at which to start looking for the first record to return + uint64_t const initial_offset_; + // Extend record types with the following special values enum { kEof = kMaxRecordType + 1, + // Returned whenever we find an invalid physical record. + // Currently there are three situations in which this happens: + // * The record has an invalid CRC (ReadPhysicalRecord reports a drop) + // * The record is a 0-length record (No drop is reported) + // * The record is below constructor's initial_offset (No drop is reported) kBadRecord = kMaxRecordType + 2 }; + // Skips all blocks that are completely before "initial_offset_". + // + // Returns true on success. Handles reporting. + bool SkipToInitialBlock(); + // Return type, or one of the preceding special values unsigned int ReadPhysicalRecord(Slice* result); - void ReportDrop(size_t bytes, const char* reason); + + // Reports dropped bytes to the reporter. + // buffer_ must be updated to remove the dropped bytes prior to invocation. + void ReportCorruption(size_t bytes, const char* reason); + void ReportDrop(size_t bytes, const Status& reason); // No copying allowed Reader(const Reader&); diff --git a/db/log_test.cc b/db/log_test.cc index 025a5ff..040bdff 100644 --- a/db/log_test.cc +++ b/db/log_test.cc @@ -60,7 +60,6 @@ class LogTest { virtual Status Read(size_t n, Slice* result, char* scratch) { ASSERT_TRUE(!returned_partial_) << "must not Read() after eof/error"; - ASSERT_EQ(kBlockSize, n); if (force_error_) { force_error_ = false; @@ -76,6 +75,17 @@ class LogTest { contents_.remove_prefix(n); return Status::OK(); } + + virtual Status Skip(size_t n) { + if (n > contents_.size()) { + contents_.clear(); + return Status::NotFound("in-memory file skipepd past end"); + } + + contents_.remove_prefix(n); + + return Status::OK(); + } }; class ReportCollector : public Reader::Reporter { @@ -97,10 +107,15 @@ class LogTest { Writer writer_; Reader reader_; + // Record metadata for testing initial offset functionality + static size_t initial_offset_record_sizes_[]; + static uint64_t initial_offset_last_record_offsets_[]; + public: LogTest() : reading_(false), writer_(&dest_), - reader_(&source_, &report_, true/*checksum*/) { + reader_(&source_, &report_, true/*checksum*/, + 0/*initial_offset*/) { } void Write(const std::string& msg) { @@ -153,6 +168,10 @@ class LogTest { return report_.dropped_bytes_; } + std::string ReportMessage() const { + return report_.message_; + } + // Returns OK iff recorded error message contains "msg" std::string MatchError(const std::string& msg) const { if (report_.message_.find(msg) == std::string::npos) { @@ -161,8 +180,61 @@ class LogTest { return "OK"; } } + + void WriteInitialOffsetLog() { + for (int i = 0; i < 4; i++) { + std::string record(initial_offset_record_sizes_[i], + static_cast('a' + i)); + Write(record); + } + } + + void CheckOffsetPastEndReturnsNoRecords(uint64_t offset_past_end) { + WriteInitialOffsetLog(); + reading_ = true; + source_.contents_ = Slice(dest_.contents_); + Reader* offset_reader = new Reader(&source_, &report_, true/*checksum*/, + WrittenBytes() + offset_past_end); + Slice record; + std::string scratch; + ASSERT_TRUE(!offset_reader->ReadRecord(&record, &scratch)); + delete offset_reader; + } + + void CheckInitialOffsetRecord(uint64_t initial_offset, + int expected_record_offset) { + WriteInitialOffsetLog(); + reading_ = true; + source_.contents_ = Slice(dest_.contents_); + Reader* offset_reader = new Reader(&source_, &report_, true/*checksum*/, + initial_offset); + Slice record; + std::string scratch; + ASSERT_TRUE(offset_reader->ReadRecord(&record, &scratch)); + ASSERT_EQ(initial_offset_record_sizes_[expected_record_offset], + record.size()); + ASSERT_EQ(initial_offset_last_record_offsets_[expected_record_offset], + offset_reader->LastRecordOffset()); + ASSERT_EQ((char)('a' + expected_record_offset), record.data()[0]); + delete offset_reader; + } + }; +size_t LogTest::initial_offset_record_sizes_[] = + {10000, // Two sizable records in first block + 10000, + 2 * log::kBlockSize - 1000, // Span three blocks + 1}; + +uint64_t LogTest::initial_offset_last_record_offsets_[] = + {0, + kHeaderSize + 10000, + 2 * (kHeaderSize + 10000), + 2 * (kHeaderSize + 10000) + + (2 * log::kBlockSize - 1000) + 3 * kHeaderSize}; + + TEST(LogTest, Empty) { ASSERT_EQ("EOF", Read()); } @@ -213,6 +285,19 @@ TEST(LogTest, MarginalTrailer) { ASSERT_EQ("EOF", Read()); } +TEST(LogTest, MarginalTrailer2) { + // Make a trailer that is exactly the same length as an empty record. + const int n = kBlockSize - 2*kHeaderSize; + Write(BigString("foo", n)); + ASSERT_EQ(kBlockSize - kHeaderSize, WrittenBytes()); + Write("bar"); + ASSERT_EQ(BigString("foo", n), Read()); + ASSERT_EQ("bar", Read()); + ASSERT_EQ("EOF", Read()); + ASSERT_EQ(0, DroppedBytes()); + ASSERT_EQ("", ReportMessage()); +} + TEST(LogTest, ShortTrailer) { const int n = kBlockSize - 2*kHeaderSize + 4; Write(BigString("foo", n)); @@ -353,6 +438,60 @@ TEST(LogTest, ErrorJoinsRecords) { ASSERT_GE(dropped, 2*kBlockSize); } +TEST(LogTest, ReadStart) { + CheckInitialOffsetRecord(0, 0); +} + +TEST(LogTest, ReadSecondOneOff) { + CheckInitialOffsetRecord(1, 1); +} + +TEST(LogTest, ReadSecondTenThousand) { + CheckInitialOffsetRecord(10000, 1); +} + +TEST(LogTest, ReadSecondStart) { + CheckInitialOffsetRecord(10007, 1); +} + +TEST(LogTest, ReadThirdOneOff) { + CheckInitialOffsetRecord(10008, 2); +} + +TEST(LogTest, ReadThirdStart) { + CheckInitialOffsetRecord(20014, 2); +} + +TEST(LogTest, ReadFourthOneOff) { + CheckInitialOffsetRecord(20015, 3); +} + +TEST(LogTest, ReadFourthFirstBlockTrailer) { + CheckInitialOffsetRecord(log::kBlockSize - 4, 3); +} + +TEST(LogTest, ReadFourthMiddleBlock) { + CheckInitialOffsetRecord(log::kBlockSize + 1, 3); +} + +TEST(LogTest, ReadFourthLastBlock) { + CheckInitialOffsetRecord(2 * log::kBlockSize + 1, 3); +} + +TEST(LogTest, ReadFourthStart) { + CheckInitialOffsetRecord( + 2 * (kHeaderSize + 1000) + (2 * log::kBlockSize - 1000) + 3 * kHeaderSize, + 3); +} + +TEST(LogTest, ReadEnd) { + CheckOffsetPastEndReturnsNoRecords(0); +} + +TEST(LogTest, ReadPastEnd) { + CheckOffsetPastEndReturnsNoRecords(5); +} + } } diff --git a/db/log_writer.cc b/db/log_writer.cc index 1696851..0887f6c 100644 --- a/db/log_writer.cc +++ b/db/log_writer.cc @@ -32,6 +32,7 @@ Status Writer::AddRecord(const Slice& slice) { // is empty, we still want to iterate once to emit a single // zero-length record Status s; + bool begin = true; do { const int leftover = kBlockSize - block_offset_; assert(leftover >= 0); @@ -52,7 +53,6 @@ Status Writer::AddRecord(const Slice& slice) { const size_t fragment_length = (left < avail) ? left : avail; RecordType type; - const bool begin = (ptr == slice.data()); const bool end = (left == fragment_length); if (begin && end) { type = kFullType; @@ -67,6 +67,7 @@ Status Writer::AddRecord(const Slice& slice) { s = EmitPhysicalRecord(type, ptr, fragment_length); ptr += fragment_length; left -= fragment_length; + begin = false; } while (s.ok() && left > 0); return s; } diff --git a/db/memtable.cc b/db/memtable.cc index a3b618a..9c25f6d 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -20,10 +20,12 @@ static Slice GetLengthPrefixedSlice(const char* data) { MemTable::MemTable(const InternalKeyComparator& cmp) : comparator_(cmp), + refs_(0), table_(comparator_, &arena_) { } MemTable::~MemTable() { + assert(refs_ == 0); } size_t MemTable::ApproximateMemoryUsage() { return arena_.MemoryUsage(); } @@ -48,10 +50,15 @@ static const char* EncodeKey(std::string* scratch, const Slice& target) { class MemTableIterator: public Iterator { public: - explicit MemTableIterator(MemTable::Table* table) { + explicit MemTableIterator(MemTable* mem, MemTable::Table* table) { + mem_ = mem; iter_ = new MemTable::Table::Iterator(table); + mem->Ref(); + } + virtual ~MemTableIterator() { + delete iter_; + mem_->Unref(); } - virtual ~MemTableIterator() { delete iter_; } virtual bool Valid() const { return iter_->Valid(); } virtual void Seek(const Slice& k) { iter_->Seek(EncodeKey(&tmp_, k)); } @@ -68,6 +75,7 @@ class MemTableIterator: public Iterator { virtual Status status() const { return Status::OK(); } private: + MemTable* mem_; MemTable::Table::Iterator* iter_; std::string tmp_; // For passing to EncodeKey @@ -77,7 +85,7 @@ class MemTableIterator: public Iterator { }; Iterator* MemTable::NewIterator() { - return new MemTableIterator(&table_); + return new MemTableIterator(this, &table_); } void MemTable::Add(SequenceNumber s, ValueType type, diff --git a/db/memtable.h b/db/memtable.h index 45b3342..2e9bd61 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -19,8 +19,21 @@ class MemTableIterator; class MemTable { public: + // MemTables are reference counted. The initial reference count + // is zero and the caller must call Ref() at least once. explicit MemTable(const InternalKeyComparator& comparator); - ~MemTable(); + + // Increase reference count. + void Ref() { ++refs_; } + + // Drop reference count. Delete if no more references exist. + void Unref() { + --refs_; + assert(refs_ >= 0); + if (refs_ <= 0) { + delete this; + } + } // Returns an estimate of the number of bytes of data in use by this // data structure. @@ -45,6 +58,8 @@ class MemTable { const Slice& value); private: + ~MemTable(); // Private since only Unref() should be used to delete it + struct KeyComparator { const InternalKeyComparator comparator; explicit KeyComparator(const InternalKeyComparator& c) : comparator(c) { } @@ -56,6 +71,7 @@ class MemTable { typedef SkipList Table; KeyComparator comparator_; + int refs_; Arena arena_; Table table_; diff --git a/db/repair.cc b/db/repair.cc index c8e7b9e..4b57169 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -183,13 +183,15 @@ class Repairer { // corruptions cause entire commits to be skipped instead of // propagating bad information (like overly large sequence // numbers). - log::Reader reader(lfile, &reporter, false/*do not checksum*/); + log::Reader reader(lfile, &reporter, false/*do not checksum*/, + 0/*initial_offset*/); // Read all the records and add to a memtable std::string scratch; Slice record; WriteBatch batch; - MemTable mem(icmp_); + MemTable* mem = new MemTable(icmp_); + mem->Ref(); int counter = 0; while (reader.ReadRecord(&record, &scratch)) { if (record.size() < 12) { @@ -198,7 +200,7 @@ class Repairer { continue; } WriteBatchInternal::SetContents(&batch, record); - status = WriteBatchInternal::InsertInto(&batch, &mem); + status = WriteBatchInternal::InsertInto(&batch, mem); if (status.ok()) { counter += WriteBatchInternal::Count(&batch); } else { @@ -215,10 +217,12 @@ class Repairer { VersionEdit skipped; FileMetaData meta; meta.number = next_file_number_++; - Iterator* iter = mem.NewIterator(); + Iterator* iter = mem->NewIterator(); status = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta, &skipped); delete iter; + mem->Unref(); + mem = NULL; if (status.ok()) { if (meta.file_size > 0) { table_numbers_.push_back(meta.number); diff --git a/db/snapshot.h b/db/snapshot.h index 9a90756..a08dbd3 100644 --- a/db/snapshot.h +++ b/db/snapshot.h @@ -12,17 +12,17 @@ namespace leveldb { class SnapshotList; // Snapshots are kept in a doubly-linked list in the DB. -// Each Snapshot corresponds to a particular sequence number. -class Snapshot { +// Each SnapshotImpl corresponds to a particular sequence number. +class SnapshotImpl : public Snapshot { public: SequenceNumber number_; // const after creation private: friend class SnapshotList; - // Snapshot is kept in a doubly-linked circular list - Snapshot* prev_; - Snapshot* next_; + // SnapshotImpl is kept in a doubly-linked circular list + SnapshotImpl* prev_; + SnapshotImpl* next_; SnapshotList* list_; // just for sanity checks }; @@ -35,11 +35,11 @@ class SnapshotList { } bool empty() const { return list_.next_ == &list_; } - Snapshot* oldest() const { assert(!empty()); return list_.next_; } - Snapshot* newest() const { assert(!empty()); return list_.prev_; } + SnapshotImpl* oldest() const { assert(!empty()); return list_.next_; } + SnapshotImpl* newest() const { assert(!empty()); return list_.prev_; } - const Snapshot* New(SequenceNumber seq) { - Snapshot* s = new Snapshot; + const SnapshotImpl* New(SequenceNumber seq) { + SnapshotImpl* s = new SnapshotImpl; s->number_ = seq; s->list_ = this; s->next_ = &list_; @@ -49,7 +49,7 @@ class SnapshotList { return s; } - void Delete(const Snapshot* s) { + void Delete(const SnapshotImpl* s) { assert(s->list_ == this); s->prev_->next_ = s->next_; s->next_->prev_ = s->prev_; @@ -58,7 +58,7 @@ class SnapshotList { private: // Dummy head of doubly-linked list of snapshots - Snapshot list_; + SnapshotImpl list_; }; } diff --git a/db/version_set.cc b/db/version_set.cc index c439f49..f64ac8d 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -57,17 +57,22 @@ std::string IntSetToString(const std::set& s) { Version::~Version() { assert(refs_ == 0); + + // Remove from linked list + prev_->next_ = next_; + next_->prev_ = prev_; + + // Drop references to files for (int level = 0; level < config::kNumLevels; level++) { for (size_t i = 0; i < files_[level].size(); i++) { FileMetaData* f = files_[level][i]; - assert(f->refs >= 0); + assert(f->refs > 0); f->refs--; if (f->refs <= 0) { delete f; } } } - delete cleanup_mem_; } // An internal iterator. For a given version/level pair, yields @@ -77,9 +82,9 @@ Version::~Version() { // encoded using EncodeFixed64. class Version::LevelFileNumIterator : public Iterator { public: - LevelFileNumIterator(const Version* version, + LevelFileNumIterator(const InternalKeyComparator& icmp, const std::vector* flist) - : icmp_(version->vset_->icmp_.user_comparator()), + : icmp_(icmp), flist_(flist), index_(flist->size()) { // Marks as invalid } @@ -157,7 +162,7 @@ static Iterator* GetFileIterator(void* arg, Iterator* Version::NewConcatenatingIterator(const ReadOptions& options, int level) const { return NewTwoLevelIterator( - new LevelFileNumIterator(this, &files_[level]), + new LevelFileNumIterator(vset_->icmp_, &files_[level]), &GetFileIterator, vset_->table_cache_, options); } @@ -185,11 +190,11 @@ void Version::Ref() { } void Version::Unref() { + assert(this != &vset_->dummy_versions_); assert(refs_ >= 1); --refs_; if (refs_ == 0) { - vset_->MaybeDeleteOldVersions(); - // TODO: try to delete obsolete files + delete this; } } @@ -222,37 +227,58 @@ std::string Version::DebugString() const { // Versions that contain full copies of the intermediate state. class VersionSet::Builder { private: - typedef std::map FileMap; + // Helper to sort by v->files_[file_number].smallest + struct BySmallestKey { + const InternalKeyComparator* internal_comparator; + + bool operator()(FileMetaData* f1, FileMetaData* f2) const { + int r = internal_comparator->Compare(f1->smallest, f2->smallest); + if (r != 0) { + return (r < 0); + } else { + // Break ties by file number + return (f1->number < f2->number); + } + } + }; + + typedef std::set FileSet; + struct LevelState { + std::set deleted_files; + FileSet* added_files; + }; + VersionSet* vset_; - FileMap files_[config::kNumLevels]; + Version* base_; + LevelState levels_[config::kNumLevels]; public: // Initialize a builder with the files from *base and other info from *vset Builder(VersionSet* vset, Version* base) - : vset_(vset) { + : vset_(vset), + base_(base) { + base_->Ref(); + BySmallestKey cmp; + cmp.internal_comparator = &vset_->icmp_; for (int level = 0; level < config::kNumLevels; level++) { - const std::vector& files = base->files_[level]; - for (size_t i = 0; i < files.size(); i++) { - FileMetaData* f = files[i]; - f->refs++; - files_[level].insert(std::make_pair(f->number, f)); - } + levels_[level].added_files = new FileSet(cmp); } } ~Builder() { for (int level = 0; level < config::kNumLevels; level++) { - const FileMap& fmap = files_[level]; - for (FileMap::const_iterator iter = fmap.begin(); - iter != fmap.end(); - ++iter) { - FileMetaData* f = iter->second; + std::vector to_unref(levels_[level].added_files->begin(), + levels_[level].added_files->end()); + delete levels_[level].added_files; + for (int i = 0; i < to_unref.size(); i++) { + FileMetaData* f = to_unref[i]; f->refs--; if (f->refs <= 0) { delete f; } } } + base_->Unref(); } // Apply all of the edits in *edit to the current state. @@ -271,16 +297,7 @@ class VersionSet::Builder { ++iter) { const int level = iter->first; const uint64_t number = iter->second; - FileMap::iterator fiter = files_[level].find(number); - assert(fiter != files_[level].end()); // Sanity check for debug mode - if (fiter != files_[level].end()) { - FileMetaData* f = fiter->second; - f->refs--; - if (f->refs <= 0) { - delete f; - } - files_[level].erase(fiter); - } + levels_[level].deleted_files.insert(number); } // Add new files @@ -288,22 +305,66 @@ class VersionSet::Builder { const int level = edit->new_files_[i].first; FileMetaData* f = new FileMetaData(edit->new_files_[i].second); f->refs = 1; - assert(files_[level].count(f->number) == 0); - files_[level].insert(std::make_pair(f->number, f)); + levels_[level].deleted_files.erase(f->number); + levels_[level].added_files->insert(f); } } // Save the current state in *v. void SaveTo(Version* v) { + BySmallestKey cmp; + cmp.internal_comparator = &vset_->icmp_; for (int level = 0; level < config::kNumLevels; level++) { - const FileMap& fmap = files_[level]; - for (FileMap::const_iterator iter = fmap.begin(); - iter != fmap.end(); - ++iter) { - FileMetaData* f = iter->second; - f->refs++; - v->files_[level].push_back(f); + // Merge the set of added files with the set of pre-existing files. + // Drop any deleted files. Store the result in *v. + const std::vector& base_files = base_->files_[level]; + std::vector::const_iterator base_iter = base_files.begin(); + std::vector::const_iterator base_end = base_files.end(); + const FileSet* added = levels_[level].added_files; + v->files_[level].reserve(base_files.size() + added->size()); + for (FileSet::const_iterator added_iter = added->begin(); + added_iter != added->end(); + ++added_iter) { + // Add all smaller files listed in base_ + for (std::vector::const_iterator bpos + = std::upper_bound(base_iter, base_end, *added_iter, cmp); + base_iter != bpos; + ++base_iter) { + MaybeAddFile(v, level, *base_iter); + } + + MaybeAddFile(v, level, *added_iter); + } + + // Add remaining base files + for (; base_iter != base_end; ++base_iter) { + MaybeAddFile(v, level, *base_iter); } + +#ifndef NDEBUG + // Make sure there is no overlap in levels > 0 + if (level > 0) { + for (int i = 1; i < v->files_[level].size(); i++) { + const InternalKey& prev_end = v->files_[level][i-1]->largest; + const InternalKey& this_begin = v->files_[level][i]->smallest; + if (vset_->icmp_.Compare(prev_end, this_begin) >= 0) { + fprintf(stderr, "overlapping ranges in same level %s vs. %s\n", + EscapeString(prev_end.Encode()).c_str(), + EscapeString(this_begin.Encode()).c_str()); + abort(); + } + } + } +#endif + } + } + + void MaybeAddFile(Version* v, int level, FileMetaData* f) { + if (levels_[level].deleted_files.count(f->number) > 0) { + // File is deleted: do nothing + } else { + f->refs++; + v->files_[level].push_back(f); } } }; @@ -324,22 +385,36 @@ VersionSet::VersionSet(const std::string& dbname, prev_log_number_(0), descriptor_file_(NULL), descriptor_log_(NULL), - current_(new Version(this)), - oldest_(current_) { + dummy_versions_(this), + current_(NULL) { + AppendVersion(new Version(this)); } VersionSet::~VersionSet() { - for (Version* v = oldest_; v != NULL; ) { - Version* next = v->next_; - assert(v->refs_ == 0); - delete v; - v = next; - } + current_->Unref(); + assert(dummy_versions_.next_ == &dummy_versions_); // List must be empty delete descriptor_log_; delete descriptor_file_; } -Status VersionSet::LogAndApply(VersionEdit* edit, MemTable* cleanup_mem) { +void VersionSet::AppendVersion(Version* v) { + // Make "v" current + assert(v->refs_ == 0); + assert(v != current_); + if (current_ != NULL) { + current_->Unref(); + } + current_ = v; + v->Ref(); + + // Append to linked list + v->prev_ = dummy_versions_.prev_; + v->next_ = &dummy_versions_; + v->prev_->next_ = v; + v->next_->prev_ = v; +} + +Status VersionSet::LogAndApply(VersionEdit* edit) { if (edit->has_log_number_) { assert(edit->log_number_ >= log_number_); assert(edit->log_number_ < next_file_number_); @@ -360,22 +435,20 @@ Status VersionSet::LogAndApply(VersionEdit* edit, MemTable* cleanup_mem) { builder.Apply(edit); builder.SaveTo(v); } - - std::string new_manifest_file; - Status s = Finalize(v); + Finalize(v); // Initialize new descriptor log file if necessary by creating // a temporary file that contains a snapshot of the current version. - if (s.ok()) { - if (descriptor_log_ == NULL) { - assert(descriptor_file_ == NULL); - new_manifest_file = DescriptorFileName(dbname_, manifest_file_number_); - edit->SetNextFile(next_file_number_); - s = env_->NewWritableFile(new_manifest_file, &descriptor_file_); - if (s.ok()) { - descriptor_log_ = new log::Writer(descriptor_file_); - s = WriteSnapshot(descriptor_log_); - } + std::string new_manifest_file; + Status s; + if (descriptor_log_ == NULL) { + assert(descriptor_file_ == NULL); + new_manifest_file = DescriptorFileName(dbname_, manifest_file_number_); + edit->SetNextFile(next_file_number_); + s = env_->NewWritableFile(new_manifest_file, &descriptor_file_); + if (s.ok()) { + descriptor_log_ = new log::Writer(descriptor_file_); + s = WriteSnapshot(descriptor_log_); } } @@ -397,12 +470,7 @@ Status VersionSet::LogAndApply(VersionEdit* edit, MemTable* cleanup_mem) { // Install the new version if (s.ok()) { - assert(current_->next_ == NULL); - assert(current_->cleanup_mem_ == NULL); - current_->cleanup_mem_ = cleanup_mem; - v->next_ = NULL; - current_->next_ = v; - current_ = v; + AppendVersion(v); log_number_ = edit->log_number_; prev_log_number_ = edit->prev_log_number_; } else { @@ -458,7 +526,7 @@ Status VersionSet::Recover() { { LogReporter reporter; reporter.status = &s; - log::Reader reader(file, &reporter, true/*checksum*/); + log::Reader reader(file, &reporter, true/*checksum*/, 0/*initial_offset*/); Slice record; std::string scratch; while (reader.ReadRecord(&record, &scratch) && s.ok()) { @@ -518,20 +586,14 @@ Status VersionSet::Recover() { if (s.ok()) { Version* v = new Version(this); builder.SaveTo(v); - s = Finalize(v); - if (!s.ok()) { - delete v; - } else { - // Install recovered version - v->next_ = NULL; - current_->next_ = v; - current_ = v; - manifest_file_number_ = next_file; - next_file_number_ = next_file + 1; - last_sequence_ = last_sequence; - log_number_ = log_number; - prev_log_number_ = prev_log_number; - } + // Install recovered version + Finalize(v); + AppendVersion(v); + manifest_file_number_ = next_file; + next_file_number_ = next_file + 1; + last_sequence_ = last_sequence; + log_number_ = log_number; + prev_log_number_ = prev_log_number; } return s; @@ -545,15 +607,12 @@ static int64_t TotalFileSize(const std::vector& files) { return sum; } -Status VersionSet::Finalize(Version* v) { +void VersionSet::Finalize(Version* v) { // Precomputed best level for next compaction int best_level = -1; double best_score = -1; - Status s; - for (int level = 0; s.ok() && level < config::kNumLevels-1; level++) { - s = SortLevel(v, level); - + for (int level = 0; level < config::kNumLevels-1; level++) { double score; if (level == 0) { // We treat level-0 specially by bounding the number of files @@ -567,7 +626,8 @@ Status VersionSet::Finalize(Version* v) { // file size is small (perhaps because of a small write-buffer // setting, or very high compression ratios, or lots of // overwrites/deletions). - score = v->files_[level].size() / 4.0; + score = v->files_[level].size() / + static_cast(config::kL0_CompactionTrigger); } else { // Compute the ratio of current size to size limit. const uint64_t level_bytes = TotalFileSize(v->files_[level]); @@ -582,7 +642,6 @@ Status VersionSet::Finalize(Version* v) { v->compaction_level_ = best_level; v->compaction_score_ = best_score; - return s; } Status VersionSet::WriteSnapshot(log::Writer* log) { @@ -615,44 +674,27 @@ Status VersionSet::WriteSnapshot(log::Writer* log) { return log->AddRecord(record); } -// Helper to sort by tables_[file_number].smallest -struct VersionSet::BySmallestKey { - const InternalKeyComparator* internal_comparator; - - bool operator()(FileMetaData* f1, FileMetaData* f2) const { - return internal_comparator->Compare(f1->smallest, f2->smallest) < 0; - } -}; - -Status VersionSet::SortLevel(Version* v, uint64_t level) { - Status result; - BySmallestKey cmp; - cmp.internal_comparator = &icmp_; - std::sort(v->files_[level].begin(), v->files_[level].end(), cmp); - - if (result.ok() && level > 0) { - // There should be no overlap - for (size_t i = 1; i < v->files_[level].size(); i++) { - const InternalKey& prev_end = v->files_[level][i-1]->largest; - const InternalKey& this_begin = v->files_[level][i]->smallest; - if (icmp_.Compare(prev_end, this_begin) >= 0) { - result = Status::Corruption( - "overlapping ranges in same level", - (EscapeString(prev_end.Encode()) + " vs. " + - EscapeString(this_begin.Encode()))); - break; - } - } - } - return result; -} - int VersionSet::NumLevelFiles(int level) const { assert(level >= 0); assert(level < config::kNumLevels); return current_->files_[level].size(); } +const char* VersionSet::LevelSummary(LevelSummaryStorage* scratch) const { + // Update code if kNumLevels changes + assert(config::kNumLevels == 7); + snprintf(scratch->buffer, sizeof(scratch->buffer), + "files[ %d %d %d %d %d %d %d ]", + int(current_->files_[0].size()), + int(current_->files_[1].size()), + int(current_->files_[2].size()), + int(current_->files_[3].size()), + int(current_->files_[4].size()), + int(current_->files_[5].size()), + int(current_->files_[6].size())); + return scratch->buffer; +} + uint64_t VersionSet::ApproximateOffsetOf(Version* v, const InternalKey& ikey) { uint64_t result = 0; for (int level = 0; level < config::kNumLevels; level++) { @@ -685,19 +727,10 @@ uint64_t VersionSet::ApproximateOffsetOf(Version* v, const InternalKey& ikey) { return result; } -void VersionSet::MaybeDeleteOldVersions() { - // Note: it is important to delete versions in order since a newer - // version with zero refs may be holding a pointer to a memtable - // that is used by somebody who has a ref on an older version. - while (oldest_ != current_ && oldest_->refs_ == 0) { - Version* next = oldest_->next_; - delete oldest_; - oldest_ = next; - } -} - void VersionSet::AddLiveFiles(std::set* live) { - for (Version* v = oldest_; v != NULL; v = v->next_) { + for (Version* v = dummy_versions_.next_; + v != &dummy_versions_; + v = v->next_) { for (int level = 0; level < config::kNumLevels; level++) { const std::vector& files = v->files_[level]; for (size_t i = 0; i < files.size(); i++) { @@ -809,8 +842,7 @@ Iterator* VersionSet::MakeInputIterator(Compaction* c) { } else { // Create concatenating iterator for the files from this level list[num++] = NewTwoLevelIterator( - new Version::LevelFileNumIterator( - c->input_version_, &c->inputs_[which]), + new Version::LevelFileNumIterator(icmp_, &c->inputs_[which]), &GetFileIterator, table_cache_, options); } } @@ -996,11 +1028,12 @@ bool Compaction::IsBaseLevelForKey(const Slice& user_key) { return true; } -bool Compaction::ShouldStopBefore(const InternalKey& key) { +bool Compaction::ShouldStopBefore(const Slice& internal_key) { // Scan to find earliest grandparent file that contains key. const InternalKeyComparator* icmp = &input_version_->vset_->icmp_; while (grandparent_index_ < grandparents_.size() && - icmp->Compare(key, grandparents_[grandparent_index_]->largest) > 0) { + icmp->Compare(internal_key, + grandparents_[grandparent_index_]->largest.Encode()) > 0) { if (seen_key_) { overlapped_bytes_ += grandparents_[grandparent_index_]->file_size; } diff --git a/db/version_set.h b/db/version_set.h index e377513..2bac5e2 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -59,8 +59,8 @@ class Version { VersionSet* vset_; // VersionSet to which this Version belongs Version* next_; // Next version in linked list + Version* prev_; // Previous version in linked list int refs_; // Number of live refs to this version - MemTable* cleanup_mem_; // NULL, or table to delete when version dropped // List of files per level std::vector files_[config::kNumLevels]; @@ -72,8 +72,7 @@ class Version { int compaction_level_; explicit Version(VersionSet* vset) - : vset_(vset), next_(NULL), refs_(0), - cleanup_mem_(NULL), + : vset_(vset), next_(this), prev_(this), refs_(0), compaction_score_(-1), compaction_level_(-1) { } @@ -95,10 +94,8 @@ class VersionSet { // Apply *edit to the current version to form a new descriptor that // is both saved to persistent state and installed as the new - // current version. Iff Apply() returns OK, arrange to delete - // cleanup_mem (if cleanup_mem != NULL) when it is no longer needed - // by older versions. - Status LogAndApply(VersionEdit* edit, MemTable* cleanup_mem); + // current version. + Status LogAndApply(VersionEdit* edit); // Recover the last saved descriptor from persistent storage. Status Recover(); @@ -171,19 +168,20 @@ class VersionSet { // "key" as of version "v". uint64_t ApproximateOffsetOf(Version* v, const InternalKey& key); + // Return a human-readable short (single-line) summary of the number + // of files per level. Uses *scratch as backing store. + struct LevelSummaryStorage { + char buffer[100]; + }; + const char* LevelSummary(LevelSummaryStorage* scratch) const; + private: class Builder; friend class Compaction; friend class Version; - Status Finalize(Version* v); - - // Delete any old versions that are no longer needed. - void MaybeDeleteOldVersions(); - - struct BySmallestKey; - Status SortLevel(Version* v, uint64_t level); + void Finalize(Version* v); void GetOverlappingInputs( int level, @@ -202,6 +200,8 @@ class VersionSet { void SetupOtherInputs(Compaction* c); + void AppendVersion(Version* v); + Env* const env_; const std::string dbname_; const Options* const options_; @@ -216,10 +216,8 @@ class VersionSet { // Opened lazily WritableFile* descriptor_file_; log::Writer* descriptor_log_; - - // Versions are kept in a singly linked list that is never empty - Version* current_; // Pointer to the last (newest) list entry - Version* oldest_; // Pointer to the first (oldest) list entry + Version dummy_versions_; // Head of circular doubly-linked list of versions. + Version* current_; // == dummy_versions_.prev_ // Per-level key at which the next compaction at that level should start. // Either an empty string, or a valid InternalKey. @@ -265,8 +263,8 @@ class Compaction { bool IsBaseLevelForKey(const Slice& user_key); // Returns true iff we should stop building the current output - // before processing "key". - bool ShouldStopBefore(const InternalKey& key); + // before processing "internal_key". + bool ShouldStopBefore(const Slice& internal_key); // Release the input version for the compaction, once the compaction // is successful. diff --git a/db/write_batch.cc b/db/write_batch.cc index d561528..4e1e899 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -29,11 +29,53 @@ WriteBatch::WriteBatch() { WriteBatch::~WriteBatch() { } +WriteBatch::Handler::~Handler() { } + void WriteBatch::Clear() { rep_.clear(); rep_.resize(12); } +Status WriteBatch::Iterate(Handler* handler) const { + Slice input(rep_); + if (input.size() < 12) { + return Status::Corruption("malformed WriteBatch (too small)"); + } + + input.remove_prefix(12); + Slice key, value; + int found = 0; + while (!input.empty()) { + found++; + char tag = input[0]; + input.remove_prefix(1); + switch (tag) { + case kTypeValue: + if (GetLengthPrefixedSlice(&input, &key) && + GetLengthPrefixedSlice(&input, &value)) { + handler->Put(key, value); + } else { + return Status::Corruption("bad WriteBatch Put"); + } + break; + case kTypeDeletion: + if (GetLengthPrefixedSlice(&input, &key)) { + handler->Delete(key); + } else { + return Status::Corruption("bad WriteBatch Delete"); + } + break; + default: + return Status::Corruption("unknown WriteBatch tag"); + } + } + if (found != WriteBatchInternal::Count(this)) { + return Status::Corruption("WriteBatch has wrong count"); + } else { + return Status::OK(); + } +} + int WriteBatchInternal::Count(const WriteBatch* b) { return DecodeFixed32(b->rep_.data() + 8); } @@ -63,28 +105,29 @@ void WriteBatch::Delete(const Slice& key) { PutLengthPrefixedSlice(&rep_, key); } -Status WriteBatchInternal::InsertInto(const WriteBatch* b, - MemTable* memtable) { - const int count = WriteBatchInternal::Count(b); - int found = 0; - Iterator it(*b); - for (; !it.Done(); it.Next()) { - switch (it.op()) { - case kTypeDeletion: - memtable->Add(it.sequence_number(), kTypeDeletion, it.key(), Slice()); - break; - case kTypeValue: - memtable->Add(it.sequence_number(), kTypeValue, it.key(), it.value()); - break; - } - found++; +namespace { +class MemTableInserter : public WriteBatch::Handler { + public: + SequenceNumber sequence_; + MemTable* mem_; + + virtual void Put(const Slice& key, const Slice& value) { + mem_->Add(sequence_, kTypeValue, key, value); + sequence_++; } - if (!it.status().ok()) { - return it.status(); - } else if (found != count) { - return Status::Corruption("wrong count in WriteBatch"); + virtual void Delete(const Slice& key) { + mem_->Add(sequence_, kTypeDeletion, key, Slice()); + sequence_++; } - return Status::OK(); +}; +} + +Status WriteBatchInternal::InsertInto(const WriteBatch* b, + MemTable* memtable) { + MemTableInserter inserter; + inserter.sequence_ = WriteBatchInternal::Sequence(b); + inserter.mem_ = memtable; + return b->Iterate(&inserter); } void WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) { @@ -92,57 +135,4 @@ void WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) { b->rep_.assign(contents.data(), contents.size()); } -WriteBatchInternal::Iterator::Iterator(const WriteBatch& batch) - : input_(WriteBatchInternal::Contents(&batch)), - done_(false) { - if (input_.size() < 12) { - done_ = true; - } else { - seq_ = WriteBatchInternal::Sequence(&batch), - input_.remove_prefix(12); - GetNextEntry(); - } -} - -void WriteBatchInternal::Iterator::Next() { - assert(!done_); - seq_++; - GetNextEntry(); -} - -void WriteBatchInternal::Iterator::GetNextEntry() { - if (input_.empty()) { - done_ = true; - return; - } - char tag = input_[0]; - input_.remove_prefix(1); - switch (tag) { - case kTypeValue: - if (GetLengthPrefixedSlice(&input_, &key_) && - GetLengthPrefixedSlice(&input_, &value_)) { - op_ = static_cast(tag); - } else { - status_ = Status::Corruption("bad WriteBatch Put"); - done_ = true; - input_.clear(); - } - break; - case kTypeDeletion: - if (GetLengthPrefixedSlice(&input_, &key_)) { - op_ = kTypeDeletion; - } else { - status_ = Status::Corruption("bad WriteBatch Delete"); - done_ = true; - input_.clear(); - } - break; - default: - status_ = Status::Corruption("unknown WriteBatch tag"); - done_ = true; - input_.clear(); - break; - } -} - } diff --git a/db/write_batch_internal.h b/db/write_batch_internal.h index ab0a823..d975444 100644 --- a/db/write_batch_internal.h +++ b/db/write_batch_internal.h @@ -37,30 +37,6 @@ class WriteBatchInternal { static void SetContents(WriteBatch* batch, const Slice& contents); static Status InsertInto(const WriteBatch* batch, MemTable* memtable); - - // Iterate over the contents of a write batch. - class Iterator { - public: - explicit Iterator(const WriteBatch& batch); - bool Done() const { return done_; } - void Next(); - ValueType op() const { return op_; } - const Slice& key() const { return key_; } - const Slice& value() const { return value_; } - SequenceNumber sequence_number() const { return seq_; } - Status status() const { return status_; } - - private: - void GetNextEntry(); - - Slice input_; - bool done_; - ValueType op_; - Slice key_; - Slice value_; - SequenceNumber seq_; - Status status_; - }; }; } diff --git a/db/write_batch_test.cc b/db/write_batch_test.cc index 2bf1134..73d68fd 100644 --- a/db/write_batch_test.cc +++ b/db/write_batch_test.cc @@ -14,10 +14,11 @@ namespace leveldb { static std::string PrintContents(WriteBatch* b) { InternalKeyComparator cmp(BytewiseComparator()); - MemTable mem(cmp); + MemTable* mem = new MemTable(cmp); + mem->Ref(); std::string state; - Status s = WriteBatchInternal::InsertInto(b, &mem); - Iterator* iter = mem.NewIterator(); + Status s = WriteBatchInternal::InsertInto(b, mem); + Iterator* iter = mem->NewIterator(); for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { ParsedInternalKey ikey; ASSERT_TRUE(ParseInternalKey(iter->key(), &ikey)); @@ -42,6 +43,7 @@ static std::string PrintContents(WriteBatch* b) { if (!s.ok()) { state.append("ParseError()"); } + mem->Unref(); return state; } diff --git a/doc/impl.html b/doc/impl.html index dd09fea..e870795 100644 --- a/doc/impl.html +++ b/doc/impl.html @@ -17,14 +17,14 @@ However the organization of the files that make up the representation is somewhat different and is explained below.

-Each database is represented by a set of file stored in a directory. +Each database is represented by a set of files stored in a directory. There are several different types of files as documented below:

Log files

A log file (*.log) stores a sequence of recent updates. Each update is appended to the current log file. When the log file reaches a -pre-determined size (approximately 1MB by default), it is converted +pre-determined size (approximately 4MB by default), it is converted to a sorted table (see below) and a new log file is created for future updates.

@@ -83,19 +83,15 @@ Other files used for miscellaneous purposes may also be present

Level 0

When the log file grows above a certain size (1MB by default):
    -
  • Write the contents of the current memtable to an sstable -
  • Replace the current memtable by a brand new empty memtable -
  • Switch to a new log file +
  • Create a brand new memtable and log file and direct future updates here +
  • In the background: +
      +
    • Write the contents of the previous memtable to an sstable +
    • Discard the memtable
    • Delete the old log file and the old memtable +
    • Add the new sstable to the young (level-0) level. +
-Experimental measurements show that generating an sstable from a 1MB -log file takes ~12ms, which seems like an acceptable latency hiccup to -add infrequently to a log write. - -

-The new sstable is added to a special level-0 level. level-0 contains -a set of files (up to 4 by default). However unlike other levels, -these files do not cover disjoint ranges, but may overlap each other.

Compactions

@@ -162,8 +158,8 @@ read.

Solution 1: To reduce this problem, we might want to increase the log switching threshold when the number of level-0 files is large. Though -the downside is that the larger this threshold, the larger the delay -that we will add to write latency when a write triggers a log switch. +the downside is that the larger this threshold, the more memory we will +need to hold the corresponding memtable.

Solution 2: We might want to decrease write rate artificially when the diff --git a/doc/index.html b/doc/index.html index c2312b7..58442e8 100644 --- a/doc/index.html +++ b/doc/index.html @@ -141,10 +141,18 @@ the batch.

Concurrency

-A database may only be opened by one process at a time. The leveldb -implementation acquires a lock from the operating system to prevent -misuse. Within a single process, the same leveldb::DB object may -be safely used by multiple concurrent threads. +A database may only be opened by one process at a time. +The leveldb implementation acquires a lock from the +operating system to prevent misuse. Within a single process, the +same leveldb::DB object may be safely shared by multiple +concurrent threads. I.e., different threads may write into or fetch +iterators or call Get on the same database without any +external synchronization (the leveldb implementation will +automatically do the required synchronization). However other objects +(like Iterator and WriteBatch) may require external synchronization. +If two threads share such an object, they must protect access to it +using their own locking protocol. More details are available in +the public header files.

Iteration

diff --git a/include/leveldb/comparator.h b/include/leveldb/comparator.h index 4e00e4d..c215fac 100644 --- a/include/leveldb/comparator.h +++ b/include/leveldb/comparator.h @@ -12,7 +12,9 @@ namespace leveldb { class Slice; // A Comparator object provides a total order across slices that are -// used as keys in an sstable or a database. +// used as keys in an sstable or a database. A Comparator implementation +// must be thread-safe since leveldb may invoke its methods concurrently +// from multiple threads. class Comparator { public: virtual ~Comparator(); diff --git a/include/leveldb/db.h b/include/leveldb/db.h index f18ded3..79bd283 100644 --- a/include/leveldb/db.h +++ b/include/leveldb/db.h @@ -13,26 +13,32 @@ namespace leveldb { static const int kMajorVersion = 1; -static const int kMinorVersion = 1; +static const int kMinorVersion = 2; struct Options; struct ReadOptions; struct WriteOptions; - -class Snapshot; class WriteBatch; -// Some internal types. Clients should ignore. -class WriteBatchInternal; +// Abstract handle to particular state of a DB. +// A Snapshot is an immutable object and can therefore be safely +// accessed from multiple threads without any external synchronization. +class Snapshot { + protected: + virtual ~Snapshot(); +}; +// A range of keys struct Range { - Slice start; - Slice limit; + Slice start; // Included in the range + Slice limit; // Not included in the range Range(const Slice& s, const Slice& l) : start(s), limit(l) { } }; // A DB is a persistent ordered map from keys to values. +// A DB is safe for concurrent access from multiple threads without +// any external synchronization. class DB { public: // Open the database with the specified "name". diff --git a/include/leveldb/env.h b/include/leveldb/env.h index 4b6e712..39f6a1a 100644 --- a/include/leveldb/env.h +++ b/include/leveldb/env.h @@ -6,6 +6,9 @@ // operating system functionality like the filesystem etc. Callers // may wish to provide a custom Env object when opening a database to // get fine gain control; e.g., to rate limit file system operations. +// +// All Env implementations are safe for concurrent access from +// multiple threads without any external synchronization. #ifndef STORAGE_LEVELDB_INCLUDE_ENV_H_ #define STORAGE_LEVELDB_INCLUDE_ENV_H_ @@ -160,6 +163,15 @@ class SequentialFile { // // REQUIRES: External synchronization virtual Status Read(size_t n, Slice* result, char* scratch) = 0; + + // Skip "n" bytes from the file. This is guaranteed to be no + // slower that reading the same data, but may be faster. + // + // If end of file is reached, skipping will stop at the end of the + // file, and Skip will return OK. + // + // REQUIRES: External synchronization + virtual Status Skip(uint64_t n) = 0; }; // A file abstraction for randomly reading the contents of a file. diff --git a/include/leveldb/iterator.h b/include/leveldb/iterator.h index 1866fb5..6821d85 100644 --- a/include/leveldb/iterator.h +++ b/include/leveldb/iterator.h @@ -6,6 +6,11 @@ // The following class defines the interface. Multiple implementations // are provided by this library. In particular, iterators are provided // to access the contents of a Table or a DB. +// +// Multiple threads can invoke const methods on an Iterator without +// external synchronization, but if any of the threads may call a +// non-const method, all threads accessing the same Iterator must use +// external synchronization. #ifndef STORAGE_LEVELDB_INCLUDE_ITERATOR_H_ #define STORAGE_LEVELDB_INCLUDE_ITERATOR_H_ diff --git a/include/leveldb/slice.h b/include/leveldb/slice.h index 62cb894..3c000b8 100644 --- a/include/leveldb/slice.h +++ b/include/leveldb/slice.h @@ -6,6 +6,11 @@ // storage and a size. The user of a Slice must ensure that the slice // is not used after the corresponding external storage has been // deallocated. +// +// Multiple threads can invoke const methods on a Slice without +// external synchronization, but if any of the threads may call a +// non-const method, all threads accessing the same Slice must use +// external synchronization. #ifndef STORAGE_LEVELDB_INCLUDE_SLICE_H_ #define STORAGE_LEVELDB_INCLUDE_SLICE_H_ diff --git a/include/leveldb/status.h b/include/leveldb/status.h index 47e3edf..6796fdd 100644 --- a/include/leveldb/status.h +++ b/include/leveldb/status.h @@ -4,12 +4,16 @@ // // A Status encapsulates the result of an operation. It may indicate success, // or it may indicate an error with an associated error message. +// +// Multiple threads can invoke const methods on a Status without +// external synchronization, but if any of the threads may call a +// non-const method, all threads accessing the same Status must use +// external synchronization. #ifndef STORAGE_LEVELDB_INCLUDE_STATUS_H_ #define STORAGE_LEVELDB_INCLUDE_STATUS_H_ #include -#include #include "leveldb/slice.h" namespace leveldb { @@ -18,7 +22,7 @@ class Status { public: // Create a success status. Status() : state_(NULL) { } - ~Status() { delete state_; } + ~Status() { delete[] state_; } // Copy the specified status. Status(const Status& s); @@ -29,7 +33,7 @@ class Status { // Return error status of an appropriate type. static Status NotFound(const Slice& msg, const Slice& msg2 = Slice()) { - return Status(kNotFound, msg, Slice()); + return Status(kNotFound, msg, msg2); } static Status Corruption(const Slice& msg, const Slice& msg2 = Slice()) { return Status(kCorruption, msg, msg2); @@ -55,6 +59,13 @@ class Status { std::string ToString() const; private: + // OK status has a NULL state_. Otherwise, state_ is a new[] array + // of the following form: + // state_[0..3] == length of message + // state_[4] == code + // state_[5..] == message + const char* state_; + enum Code { kOk = 0, kNotFound = 1, @@ -63,21 +74,24 @@ class Status { kInvalidArgument = 4, kIOError = 5, }; - Code code() const { return (state_ == NULL) ? kOk : state_->first; } - Status(Code code, const Slice& msg, const Slice& msg2); + Code code() const { + return (state_ == NULL) ? kOk : static_cast(state_[4]); + } - typedef std::pair State; - State* state_; + Status(Code code, const Slice& msg, const Slice& msg2); + static const char* CopyState(const char* s); }; inline Status::Status(const Status& s) { - state_ = (s.state_ == NULL) ? NULL : new State(*s.state_); + state_ = (s.state_ == NULL) ? NULL : CopyState(s.state_); } inline void Status::operator=(const Status& s) { - if (this != &s) { - delete state_; - state_ = (s.state_ == NULL) ? NULL : new State(*s.state_); + // The following condition catches both aliasing (when this == &s), + // and the common case where both s and *this are ok. + if (state_ != s.state_) { + delete[] state_; + state_ = (s.state_ == NULL) ? NULL : CopyState(s.state_); } } diff --git a/include/leveldb/table.h b/include/leveldb/table.h index bd99176..35e5d22 100644 --- a/include/leveldb/table.h +++ b/include/leveldb/table.h @@ -17,7 +17,8 @@ class RandomAccessFile; struct ReadOptions; // A Table is a sorted map from strings to strings. Tables are -// immutable and persistent. +// immutable and persistent. A Table may be safely accessed from +// multiple threads without external synchronization. class Table { public: // Attempt to open the table that is stored in bytes [0..file_size) diff --git a/include/leveldb/table_builder.h b/include/leveldb/table_builder.h index 49d2d51..23851de 100644 --- a/include/leveldb/table_builder.h +++ b/include/leveldb/table_builder.h @@ -4,6 +4,11 @@ // // TableBuilder provides the interface used to build a Table // (an immutable and sorted map from keys to values). +// +// Multiple threads can invoke const methods on a TableBuilder without +// external synchronization, but if any of the threads may call a +// non-const method, all threads accessing the same TableBuilder must use +// external synchronization. #ifndef STORAGE_LEVELDB_INCLUDE_TABLE_BUILDER_H_ #define STORAGE_LEVELDB_INCLUDE_TABLE_BUILDER_H_ diff --git a/include/leveldb/write_batch.h b/include/leveldb/write_batch.h index 3411952..b4446c2 100644 --- a/include/leveldb/write_batch.h +++ b/include/leveldb/write_batch.h @@ -12,11 +12,17 @@ // batch.Delete("key"); // batch.Put("key", "v2"); // batch.Put("key", "v3"); +// +// Multiple threads can invoke const methods on a WriteBatch without +// external synchronization, but if any of the threads may call a +// non-const method, all threads accessing the same WriteBatch must use +// external synchronization. #ifndef STORAGE_LEVELDB_INCLUDE_WRITE_BATCH_H_ #define STORAGE_LEVELDB_INCLUDE_WRITE_BATCH_H_ #include +#include "leveldb/status.h" namespace leveldb { @@ -36,6 +42,15 @@ class WriteBatch { // Clear all updates buffered in this batch. void Clear(); + // Support for iterating over the contents of a batch. + class Handler { + public: + virtual ~Handler(); + virtual void Put(const Slice& key, const Slice& value) = 0; + virtual void Delete(const Slice& key) = 0; + }; + Status Iterate(Handler* handler) const; + private: friend class WriteBatchInternal; diff --git a/table/block_builder.cc b/table/block_builder.cc index dc958c8..d2ffa21 100644 --- a/table/block_builder.cc +++ b/table/block_builder.cc @@ -80,7 +80,7 @@ void BlockBuilder::Add(const Slice& key, const Slice& value) { if (counter_ < options_->block_restart_interval) { // See how much sharing to do with previous string const size_t min_length = std::min(last_key_piece.size(), key.size()); - while ((shared < min_length) && (last_key_[shared] == key[shared])) { + while ((shared < min_length) && (last_key_piece[shared] == key[shared])) { shared++; } } else { diff --git a/table/table_test.cc b/table/table_test.cc index 4b3e85e..cf2bae0 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -319,13 +319,15 @@ class MemTableConstructor: public Constructor { : Constructor(cmp), internal_comparator_(cmp) { memtable_ = new MemTable(internal_comparator_); + memtable_->Ref(); } ~MemTableConstructor() { - delete memtable_; + memtable_->Unref(); } virtual Status FinishImpl(const Options& options, const KVMap& data) { - delete memtable_; + memtable_->Unref(); memtable_ = new MemTable(internal_comparator_); + memtable_->Ref(); int seq = 1; for (KVMap::const_iterator it = data.begin(); it != data.end(); @@ -736,16 +738,17 @@ class MemTableTest { }; TEST(MemTableTest, Simple) { InternalKeyComparator cmp(BytewiseComparator()); - MemTable memtable(cmp); + MemTable* memtable = new MemTable(cmp); + memtable->Ref(); WriteBatch batch; WriteBatchInternal::SetSequence(&batch, 100); batch.Put(std::string("k1"), std::string("v1")); batch.Put(std::string("k2"), std::string("v2")); batch.Put(std::string("k3"), std::string("v3")); batch.Put(std::string("largekey"), std::string("vlarge")); - ASSERT_TRUE(WriteBatchInternal::InsertInto(&batch, &memtable).ok()); + ASSERT_TRUE(WriteBatchInternal::InsertInto(&batch, memtable).ok()); - Iterator* iter = memtable.NewIterator(); + Iterator* iter = memtable->NewIterator(); iter->SeekToFirst(); while (iter->Valid()) { fprintf(stderr, "key: '%s' -> '%s'\n", @@ -755,6 +758,7 @@ TEST(MemTableTest, Simple) { } delete iter; + memtable->Unref(); } static bool Between(uint64_t val, uint64_t low, uint64_t high) { diff --git a/util/env_chromium.cc b/util/env_chromium.cc index fd3a4c7..1af525a 100644 --- a/util/env_chromium.cc +++ b/util/env_chromium.cc @@ -141,6 +141,13 @@ class ChromiumSequentialFile: public SequentialFile { } return s; } + + virtual Status Skip(uint64_t n) { + if (fseek(file_, n, SEEK_CUR)) { + return Status::IOError(filename_, strerror(errno)); + } + return Status::OK(); + } }; class ChromiumRandomAccessFile: public RandomAccessFile { diff --git a/util/env_posix.cc b/util/env_posix.cc index 5cddb0c..fec1599 100644 --- a/util/env_posix.cc +++ b/util/env_posix.cc @@ -52,6 +52,13 @@ class PosixSequentialFile: public SequentialFile { } return s; } + + virtual Status Skip(uint64_t n) { + if (fseek(file_, n, SEEK_CUR)) { + return Status::IOError(filename_, strerror(errno)); + } + return Status::OK(); + } }; class PosixRandomAccessFile: public RandomAccessFile { diff --git a/util/status.cc b/util/status.cc index d9b7195..02051a9 100644 --- a/util/status.cc +++ b/util/status.cc @@ -8,13 +8,29 @@ namespace leveldb { +const char* Status::CopyState(const char* state) { + uint32_t size; + memcpy(&size, state, sizeof(size)); + char* result = new char[size + 5]; + memcpy(result, state, size + 5); + return result; +} + Status::Status(Code code, const Slice& msg, const Slice& msg2) { assert(code != kOk); - state_ = new State(make_pair(code, std::string(msg.data(), msg.size()))); - if (!msg2.empty()) { - state_->second.append(": "); - state_->second.append(msg2.data(), msg2.size()); + const uint32_t len1 = msg.size(); + const uint32_t len2 = msg2.size(); + const uint32_t size = len1 + (len2 ? (2 + len2) : 0); + char* result = new char[size + 5]; + memcpy(result, &size, sizeof(size)); + result[4] = static_cast(code); + memcpy(result + 5, msg.data(), len1); + if (len2) { + result[5 + len1] = ':'; + result[6 + len1] = ' '; + memcpy(result + 7 + len1, msg2.data(), len2); } + state_ = result; } std::string Status::ToString() const { @@ -23,12 +39,12 @@ std::string Status::ToString() const { } else { char tmp[30]; const char* type; - switch (state_->first) { + switch (code()) { case kOk: type = "OK"; break; case kNotFound: - type = "NotFound"; + type = "NotFound: "; break; case kCorruption: type = "Corruption: "; @@ -44,14 +60,14 @@ std::string Status::ToString() const { break; default: snprintf(tmp, sizeof(tmp), "Unknown code(%d): ", - static_cast(state_->first)); + static_cast(code())); type = tmp; break; } std::string result(type); - if (!state_->second.empty()) { - result.append(state_->second); - } + uint32_t length; + memcpy(&length, state_, sizeof(length)); + result.append(state_ + 5, length); return result; } }