Przeglądaj źródła

sync with upstream @ 21409451

Check the NEWS file for details of what changed.

git-svn-id: https://leveldb.googlecode.com/svn/trunk@28 62dab493-f737-651d-591e-8d6aee1b9529
master
dgrogan@chromium.org 13 lat temu
rodzic
commit
da79909507
34 zmienionych plików z 953 dodań i 406 usunięć
  1. +17
    -0
      NEWS
  2. +55
    -11
      db/db_bench.cc
  3. +46
    -18
      db/db_impl.cc
  4. +88
    -21
      db/db_test.cc
  5. +10
    -0
      db/dbformat.h
  6. +97
    -19
      db/log_reader.cc
  7. +35
    -2
      db/log_reader.h
  8. +141
    -2
      db/log_test.cc
  9. +2
    -1
      db/log_writer.cc
  10. +11
    -3
      db/memtable.cc
  11. +17
    -1
      db/memtable.h
  12. +8
    -4
      db/repair.cc
  13. +11
    -11
      db/snapshot.h
  14. +171
    -138
      db/version_set.cc
  15. +18
    -20
      db/version_set.h
  16. +63
    -73
      db/write_batch.cc
  17. +0
    -24
      db/write_batch_internal.h
  18. +5
    -3
      db/write_batch_test.cc
  19. +11
    -15
      doc/impl.html
  20. +12
    -4
      doc/index.html
  21. +3
    -1
      include/leveldb/comparator.h
  22. +13
    -7
      include/leveldb/db.h
  23. +12
    -0
      include/leveldb/env.h
  24. +5
    -0
      include/leveldb/iterator.h
  25. +5
    -0
      include/leveldb/slice.h
  26. +25
    -11
      include/leveldb/status.h
  27. +2
    -1
      include/leveldb/table.h
  28. +5
    -0
      include/leveldb/table_builder.h
  29. +15
    -0
      include/leveldb/write_batch.h
  30. +1
    -1
      table/block_builder.cc
  31. +9
    -5
      table/table_test.cc
  32. +7
    -0
      util/env_chromium.cc
  33. +7
    -0
      util/env_posix.cc
  34. +26
    -10
      util/status.cc

+ 17
- 0
NEWS Wyświetl plik

@ -0,0 +1,17 @@
Release 1.2 2011-05-16
----------------------
Fixes for larger databases (tested up to one billion 100-byte entries,
i.e., ~100GB).
(1) Place hard limit on number of level-0 files. This fixes errors
of the form "too many open files".
(2) Fixed memtable management. Before the fix, a heavy write burst
could cause unbounded memory usage.
A fix for a logging bug where the reader would incorrectly complain
about corruption.
Allow public access to WriteBatch contents so that users can easily
wrap a DB.

+ 55
- 11
db/db_bench.cc Wyświetl plik

@ -24,9 +24,10 @@
// overwrite -- overwrite N values in random key order in async mode
// fillsync -- write N/100 values in random key order in sync mode
// fill100K -- write N/1000 100K values in random order in async mode
// readseq -- read N values sequentially
// readreverse -- read N values in reverse order
// readrandom -- read N values in random order
// readseq -- read N times sequentially
// readreverse -- read N times in reverse order
// readrandom -- read N times in random order
// readhot -- read N times in random order from 1% section of DB
// crc32c -- repeated crc32c of 4K of data
// Meta operations:
// compact -- Compact the entire DB
@ -54,6 +55,9 @@ static const char* FLAGS_benchmarks =
// Number of key/values to place in database
static int FLAGS_num = 1000000;
// Number of read operations to do. If negative, do FLAGS_num reads.
static int FLAGS_reads = -1;
// Size of each value
static int FLAGS_value_size = 100;
@ -72,6 +76,14 @@ static int FLAGS_write_buffer_size = 0;
// Negative means use default settings.
static int FLAGS_cache_size = -1;
// Maximum number of files to keep open at the same time (use default if == 0)
static int FLAGS_open_files = 0;
// If true, do not destroy the existing database. If you set this
// flag and also specify a benchmark that wants a fresh database, that
// benchmark will fail.
static bool FLAGS_use_existing_db = false;
namespace leveldb {
// Helper for quickly generating random data.
@ -126,6 +138,7 @@ class Benchmark {
Cache* cache_;
DB* db_;
int num_;
int reads_;
int heap_counter_;
double start_;
double last_op_finish_;
@ -298,6 +311,7 @@ class Benchmark {
: cache_(FLAGS_cache_size >= 0 ? NewLRUCache(FLAGS_cache_size) : NULL),
db_(NULL),
num_(FLAGS_num),
reads_(FLAGS_reads < 0 ? FLAGS_num : FLAGS_reads),
heap_counter_(0),
bytes_(0),
rand_(301) {
@ -308,7 +322,9 @@ class Benchmark {
Env::Default()->DeleteFile("/tmp/dbbench/" + files[i]);
}
}
DestroyDB("/tmp/dbbench", Options());
if (!FLAGS_use_existing_db) {
DestroyDB("/tmp/dbbench", Options());
}
}
~Benchmark() {
@ -355,11 +371,13 @@ class Benchmark {
ReadReverse();
} else if (name == Slice("readrandom")) {
ReadRandom();
} else if (name == Slice("readhot")) {
ReadHot();
} else if (name == Slice("readrandomsmall")) {
int n = num_;
num_ /= 1000;
int n = reads_;
reads_ /= 1000;
ReadRandom();
num_ = n;
reads_ = n;
} else if (name == Slice("compact")) {
Compact();
} else if (name == Slice("crc32c")) {
@ -449,7 +467,7 @@ class Benchmark {
void Open() {
assert(db_ == NULL);
Options options;
options.create_if_missing = true;
options.create_if_missing = !FLAGS_use_existing_db;
options.block_cache = cache_;
options.write_buffer_size = FLAGS_write_buffer_size;
Status s = DB::Open(options, "/tmp/dbbench", &db_);
@ -462,6 +480,10 @@ class Benchmark {
void Write(const WriteOptions& options, Order order, DBState state,
int num_entries, int value_size, int entries_per_batch) {
if (state == FRESH) {
if (FLAGS_use_existing_db) {
message_ = "skipping (--use_existing_db is true)";
return;
}
delete db_;
db_ = NULL;
DestroyDB("/tmp/dbbench", Options());
@ -499,7 +521,7 @@ class Benchmark {
void ReadSequential() {
Iterator* iter = db_->NewIterator(ReadOptions());
int i = 0;
for (iter->SeekToFirst(); i < num_ && iter->Valid(); iter->Next()) {
for (iter->SeekToFirst(); i < reads_ && iter->Valid(); iter->Next()) {
bytes_ += iter->key().size() + iter->value().size();
FinishedSingleOp();
++i;
@ -510,7 +532,7 @@ class Benchmark {
void ReadReverse() {
Iterator* iter = db_->NewIterator(ReadOptions());
int i = 0;
for (iter->SeekToLast(); i < num_ && iter->Valid(); iter->Prev()) {
for (iter->SeekToLast(); i < reads_ && iter->Valid(); iter->Prev()) {
bytes_ += iter->key().size() + iter->value().size();
FinishedSingleOp();
++i;
@ -521,7 +543,7 @@ class Benchmark {
void ReadRandom() {
ReadOptions options;
std::string value;
for (int i = 0; i < num_; i++) {
for (int i = 0; i < reads_; i++) {
char key[100];
const int k = rand_.Next() % FLAGS_num;
snprintf(key, sizeof(key), "%016d", k);
@ -530,6 +552,19 @@ class Benchmark {
}
}
void ReadHot() {
ReadOptions options;
std::string value;
const int range = (FLAGS_num + 99) / 100;
for (int i = 0; i < reads_; i++) {
char key[100];
const int k = rand_.Next() % range;
snprintf(key, sizeof(key), "%016d", k);
db_->Get(options, key, &value);
FinishedSingleOp();
}
}
void Compact() {
DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);
dbi->TEST_CompactMemTable();
@ -582,6 +617,8 @@ class Benchmark {
int main(int argc, char** argv) {
FLAGS_write_buffer_size = leveldb::Options().write_buffer_size;
FLAGS_open_files = leveldb::Options().max_open_files;
for (int i = 1; i < argc; i++) {
double d;
int n;
@ -593,14 +630,21 @@ int main(int argc, char** argv) {
} else if (sscanf(argv[i], "--histogram=%d%c", &n, &junk) == 1 &&
(n == 0 || n == 1)) {
FLAGS_histogram = n;
} else if (sscanf(argv[i], "--use_existing_db=%d%c", &n, &junk) == 1 &&
(n == 0 || n == 1)) {
FLAGS_use_existing_db = n;
} else if (sscanf(argv[i], "--num=%d%c", &n, &junk) == 1) {
FLAGS_num = n;
} else if (sscanf(argv[i], "--reads=%d%c", &n, &junk) == 1) {
FLAGS_reads = n;
} else if (sscanf(argv[i], "--value_size=%d%c", &n, &junk) == 1) {
FLAGS_value_size = n;
} else if (sscanf(argv[i], "--write_buffer_size=%d%c", &n, &junk) == 1) {
FLAGS_write_buffer_size = n;
} else if (sscanf(argv[i], "--cache_size=%d%c", &n, &junk) == 1) {
FLAGS_cache_size = n;
} else if (sscanf(argv[i], "--open_files=%d%c", &n, &junk) == 1) {
FLAGS_open_files = n;
} else {
fprintf(stderr, "Invalid flag '%s'\n", argv[i]);
exit(1);

+ 46
- 18
db/db_impl.cc Wyświetl plik

@ -126,6 +126,7 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
log_(NULL),
bg_compaction_scheduled_(false),
compacting_(false) {
mem_->Ref();
has_imm_.Release_Store(NULL);
// Reserve ten files or so for other uses and give the rest to TableCache.
@ -152,8 +153,8 @@ DBImpl::~DBImpl() {
}
delete versions_;
delete mem_;
delete imm_;
if (mem_ != NULL) mem_->Unref();
if (imm_ != NULL) imm_->Unref();
delete log_;
delete logfile_;
delete table_cache_;
@ -344,7 +345,8 @@ Status DBImpl::RecoverLogFile(uint64_t log_number,
// paranoid_checks==false so that corruptions cause entire commits
// to be skipped instead of propagating bad information (like overly
// large sequence numbers).
log::Reader reader(file, &reporter, true/*checksum*/);
log::Reader reader(file, &reporter, true/*checksum*/,
0/*initial_offset*/);
Log(env_, options_.info_log, "Recovering log #%llu",
(unsigned long long) log_number);
@ -364,6 +366,7 @@ Status DBImpl::RecoverLogFile(uint64_t log_number,
if (mem == NULL) {
mem = new MemTable(internal_comparator_);
mem->Ref();
}
status = WriteBatchInternal::InsertInto(&batch, mem);
MaybeIgnoreError(&status);
@ -384,7 +387,7 @@ Status DBImpl::RecoverLogFile(uint64_t log_number,
// file-systems cause the DB::Open() to fail.
break;
}
delete mem;
mem->Unref();
mem = NULL;
}
}
@ -395,7 +398,7 @@ Status DBImpl::RecoverLogFile(uint64_t log_number,
// file-systems cause the DB::Open() to fail.
}
delete mem;
if (mem != NULL) mem->Unref();
delete file;
return status;
}
@ -443,11 +446,12 @@ Status DBImpl::CompactMemTable() {
// Replace immutable memtable with the generated Table
if (s.ok()) {
edit.SetPrevLogNumber(0);
s = versions_->LogAndApply(&edit, imm_);
s = versions_->LogAndApply(&edit);
}
if (s.ok()) {
// Commit to the new state
imm_->Unref();
imm_ = NULL;
has_imm_.Release_Store(NULL);
DeleteObsoleteFiles();
@ -556,7 +560,7 @@ void DBImpl::BackgroundCompaction() {
c->edit()->DeleteFile(c->level(), f->number);
c->edit()->AddFile(c->level() + 1, f->number, f->file_size,
f->smallest, f->largest);
status = versions_->LogAndApply(c->edit(), NULL);
status = versions_->LogAndApply(c->edit());
Log(env_, options_.info_log, "Moved #%lld to level-%d %lld bytes %s\n",
static_cast<unsigned long long>(f->number),
c->level() + 1,
@ -697,7 +701,7 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) {
}
compact->outputs.clear();
Status s = versions_->LogAndApply(compact->compaction->edit(), NULL);
Status s = versions_->LogAndApply(compact->compaction->edit());
if (s.ok()) {
compact->compaction->ReleaseInputs();
DeleteObsoleteFiles();
@ -754,9 +758,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
}
Slice key = input->key();
InternalKey tmp_internal_key;
tmp_internal_key.DecodeFrom(key);
if (compact->compaction->ShouldStopBefore(tmp_internal_key) &&
if (compact->compaction->ShouldStopBefore(key) &&
compact->builder != NULL) {
status = FinishCompactionOutputFile(compact, input);
if (!status.ok()) {
@ -867,6 +869,9 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
}
compacting_ = false;
compacting_cv_.SignalAll();
VersionSet::LevelSummaryStorage tmp;
Log(env_, options_.info_log,
"compacted to: %s", versions_->LevelSummary(&tmp));
return status;
}
@ -925,10 +930,11 @@ Status DBImpl::Get(const ReadOptions& options,
Iterator* DBImpl::NewIterator(const ReadOptions& options) {
SequenceNumber latest_snapshot;
Iterator* internal_iter = NewInternalIterator(options, &latest_snapshot);
SequenceNumber sequence =
(options.snapshot ? options.snapshot->number_ : latest_snapshot);
return NewDBIterator(&dbname_, env_,
user_comparator(), internal_iter, sequence);
return NewDBIterator(
&dbname_, env_, user_comparator(), internal_iter,
(options.snapshot != NULL
? reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_
: latest_snapshot));
}
void DBImpl::Unref(void* arg1, void* arg2) {
@ -945,7 +951,7 @@ const Snapshot* DBImpl::GetSnapshot() {
void DBImpl::ReleaseSnapshot(const Snapshot* s) {
MutexLock l(&mutex_);
snapshots_.Delete(s);
snapshots_.Delete(reinterpret_cast<const SnapshotImpl*>(s));
}
// Convenience methods
@ -985,12 +991,26 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
Status DBImpl::MakeRoomForWrite(bool force) {
mutex_.AssertHeld();
bool allow_delay = !force;
Status s;
while (true) {
if (!bg_error_.ok()) {
// Yield previous error
s = bg_error_;
break;
} else if (
allow_delay &&
versions_->NumLevelFiles(0) >= config::kL0_SlowdownWritesTrigger) {
// We are getting close to hitting a hard limit on the number of
// L0 files. Rather than delaying a single write by several
// seconds when we hit the hard limit, start delaying each
// individual write by 1ms to reduce latency variance. Also,
// this delay hands over some CPU to the compaction thread in
// case it is sharing the same core as the writer.
mutex_.Unlock();
env_->SleepForMicroseconds(1000);
allow_delay = false; // Do not delay a single write more than once
mutex_.Lock();
} else if (!force &&
(mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) {
// There is room in current memtable
@ -999,6 +1019,9 @@ Status DBImpl::MakeRoomForWrite(bool force) {
// We have filled up the current memtable, but the previous
// one is still being compacted, so we wait.
compacting_cv_.Wait();
} else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) {
// There are too many level-0 files.
compacting_cv_.Wait();
} else {
// Attempt to switch to a new memtable and trigger compaction of old
assert(versions_->PrevLogNumber() == 0);
@ -1011,7 +1034,7 @@ Status DBImpl::MakeRoomForWrite(bool force) {
VersionEdit edit;
edit.SetPrevLogNumber(versions_->LogNumber());
edit.SetLogNumber(new_log_number);
s = versions_->LogAndApply(&edit, NULL);
s = versions_->LogAndApply(&edit);
if (!s.ok()) {
delete lfile;
env_->DeleteFile(LogFileName(dbname_, new_log_number));
@ -1024,6 +1047,7 @@ Status DBImpl::MakeRoomForWrite(bool force) {
imm_ = mem_;
has_imm_.Release_Store(imm_);
mem_ = new MemTable(internal_comparator_);
mem_->Ref();
force = false; // Do not force another compaction if have room
MaybeScheduleCompaction();
}
@ -1141,10 +1165,11 @@ Status DB::Open(const Options& options, const std::string& dbname,
edit.SetLogNumber(new_log_number);
impl->logfile_ = lfile;
impl->log_ = new log::Writer(lfile);
s = impl->versions_->LogAndApply(&edit, NULL);
s = impl->versions_->LogAndApply(&edit);
}
if (s.ok()) {
impl->DeleteObsoleteFiles();
impl->MaybeScheduleCompaction();
}
}
impl->mutex_.Unlock();
@ -1156,6 +1181,9 @@ Status DB::Open(const Options& options, const std::string& dbname,
return s;
}
Snapshot::~Snapshot() {
}
Status DestroyDB(const std::string& dbname, const Options& options) {
Env* env = options.env;
std::vector<std::string> filenames;

+ 88
- 21
db/db_test.cc Wyświetl plik

@ -3,7 +3,6 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "leveldb/db.h"
#include "db/db_impl.h"
#include "db/filename.h"
#include "db/version_set.h"
@ -802,8 +801,17 @@ TEST(DBTest, DBOpen_Options) {
db = NULL;
}
namespace {
typedef std::map<std::string, std::string> KVMap;
}
class ModelDB: public DB {
public:
class ModelSnapshot : public Snapshot {
public:
KVMap map_;
};
explicit ModelDB(const Options& options): options_(options) { }
~ModelDB() { }
virtual Status Put(const WriteOptions& o, const Slice& k, const Slice& v) {
@ -824,35 +832,34 @@ class ModelDB: public DB {
return new ModelIter(saved, true);
} else {
const KVMap* snapshot_state =
reinterpret_cast<const KVMap*>(options.snapshot->number_);
&(reinterpret_cast<const ModelSnapshot*>(options.snapshot)->map_);
return new ModelIter(snapshot_state, false);
}
}
virtual const Snapshot* GetSnapshot() {
KVMap* saved = new KVMap;
*saved = map_;
return snapshots_.New(
reinterpret_cast<SequenceNumber>(saved));
ModelSnapshot* snapshot = new ModelSnapshot;
snapshot->map_ = map_;
return snapshot;
}
virtual void ReleaseSnapshot(const Snapshot* snapshot) {
const KVMap* saved = reinterpret_cast<const KVMap*>(snapshot->number_);
delete saved;
snapshots_.Delete(snapshot);
delete reinterpret_cast<const ModelSnapshot*>(snapshot);
}
virtual Status Write(const WriteOptions& options, WriteBatch* batch) {
assert(options.post_write_snapshot == NULL); // Not supported
for (WriteBatchInternal::Iterator it(*batch); !it.Done(); it.Next()) {
switch (it.op()) {
case kTypeValue:
map_[it.key().ToString()] = it.value().ToString();
break;
case kTypeDeletion:
map_.erase(it.key().ToString());
break;
class Handler : public WriteBatch::Handler {
public:
KVMap* map_;
virtual void Put(const Slice& key, const Slice& value) {
(*map_)[key.ToString()] = value.ToString();
}
}
return Status::OK();
virtual void Delete(const Slice& key) {
map_->erase(key.ToString());
}
};
Handler handler;
handler.map_ = &map_;
return batch->Iterate(&handler);
}
virtual bool GetProperty(const Slice& property, std::string* value) {
@ -864,7 +871,6 @@ class ModelDB: public DB {
}
}
private:
typedef std::map<std::string, std::string> KVMap;
class ModelIter: public Iterator {
public:
ModelIter(const KVMap* map, bool owned)
@ -897,7 +903,6 @@ class ModelDB: public DB {
};
const Options options_;
KVMap map_;
SnapshotList snapshots_;
};
static std::string RandomKey(Random* rnd) {
@ -1023,8 +1028,70 @@ TEST(DBTest, Randomized) {
if (db_snap != NULL) db_->ReleaseSnapshot(db_snap);
}
std::string MakeKey(unsigned int num) {
char buf[30];
snprintf(buf, sizeof(buf), "%016u", num);
return std::string(buf);
}
void BM_LogAndApply(int iters, int num_base_files) {
std::string dbname = test::TmpDir() + "/leveldb_test_benchmark";
DestroyDB(dbname, Options());
DB* db = NULL;
Options opts;
opts.create_if_missing = true;
Status s = DB::Open(opts, dbname, &db);
ASSERT_OK(s);
ASSERT_TRUE(db != NULL);
delete db;
db = NULL;
Env* env = Env::Default();
InternalKeyComparator cmp(BytewiseComparator());
Options options;
VersionSet vset(dbname, &options, NULL, &cmp);
ASSERT_OK(vset.Recover());
VersionEdit vbase;
uint64_t fnum = 1;
for (int i = 0; i < num_base_files; i++) {
InternalKey start(MakeKey(2*fnum), 1, kTypeValue);
InternalKey limit(MakeKey(2*fnum+1), 1, kTypeDeletion);
vbase.AddFile(2, fnum++, 1 /* file size */, start, limit);
}
ASSERT_OK(vset.LogAndApply(&vbase));
uint64_t start_micros = env->NowMicros();
for (int i = 0; i < iters; i++) {
VersionEdit vedit;
vedit.DeleteFile(2, fnum);
InternalKey start(MakeKey(2*fnum), 1, kTypeValue);
InternalKey limit(MakeKey(2*fnum+1), 1, kTypeDeletion);
vedit.AddFile(2, fnum++, 1 /* file size */, start, limit);
vset.LogAndApply(&vedit);
}
uint64_t stop_micros = env->NowMicros();
unsigned int us = stop_micros - start_micros;
char buf[16];
snprintf(buf, sizeof(buf), "%d", num_base_files);
fprintf(stderr,
"BM_LogAndApply/%-6s %8d iters : %9u us (%7.0f us / iter)\n",
buf, iters, us, ((float)us) / iters);
}
}
int main(int argc, char** argv) {
if (argc > 1 && std::string(argv[1]) == "--benchmark") {
leveldb::BM_LogAndApply(1000, 1);
leveldb::BM_LogAndApply(1000, 100);
leveldb::BM_LogAndApply(1000, 10000);
leveldb::BM_LogAndApply(100, 100000);
return 0;
}
return leveldb::test::RunAllTests();
}

+ 10
- 0
db/dbformat.h Wyświetl plik

@ -19,6 +19,16 @@ namespace leveldb {
// parameters set via options.
namespace config {
static const int kNumLevels = 7;
// Level-0 compaction is started when we hit this many files.
static const int kL0_CompactionTrigger = 4;
// Soft limit on number of level-0 files. We slow down writes at this point.
static const int kL0_SlowdownWritesTrigger = 8;
// Maximum number of level-0 files. We stop writes at this point.
static const int kL0_StopWritesTrigger = 12;
}
class InternalKey;

+ 97
- 19
db/log_reader.cc Wyświetl plik

@ -4,7 +4,6 @@
#include "db/log_reader.h"
#include <stdint.h>
#include "leveldb/env.h"
#include "util/coding.h"
#include "util/crc32c.h"
@ -15,46 +14,104 @@ namespace log {
Reader::Reporter::~Reporter() {
}
Reader::Reader(SequentialFile* file, Reporter* reporter, bool checksum)
Reader::Reader(SequentialFile* file, Reporter* reporter, bool checksum,
uint64_t initial_offset)
: file_(file),
reporter_(reporter),
checksum_(checksum),
backing_store_(new char[kBlockSize]),
buffer_(),
eof_(false) {
eof_(false),
last_record_offset_(0),
end_of_buffer_offset_(0),
initial_offset_(initial_offset) {
}
Reader::~Reader() {
delete[] backing_store_;
}
bool Reader::SkipToInitialBlock() {
size_t offset_in_block = initial_offset_ % kBlockSize;
uint64_t block_start_location = initial_offset_ - offset_in_block;
// Don't search a block if we'd be in the trailer
if (offset_in_block > kBlockSize - 6) {
offset_in_block = 0;
block_start_location += kBlockSize;
}
end_of_buffer_offset_ = block_start_location;
// Skip to start of first block that can contain the initial record
if (block_start_location > 0) {
Status skip_status = file_->Skip(block_start_location);
if (!skip_status.ok()) {
ReportDrop(block_start_location, skip_status);
return false;
}
}
return true;
}
bool Reader::ReadRecord(Slice* record, std::string* scratch) {
if (last_record_offset_ < initial_offset_) {
if (!SkipToInitialBlock()) {
return false;
}
}
scratch->clear();
record->clear();
bool in_fragmented_record = false;
// Record offset of the logical record that we're reading
// 0 is a dummy value to make compilers happy
uint64_t prospective_record_offset = 0;
Slice fragment;
while (true) {
uint64_t physical_record_offset = end_of_buffer_offset_ - buffer_.size();
switch (ReadPhysicalRecord(&fragment)) {
case kFullType:
if (in_fragmented_record) {
ReportDrop(scratch->size(), "partial record without end");
// Handle bug in earlier versions of log::Writer where
// it could emit an empty kFirstType record at the tail end
// of a block followed by a kFullType or kFirstType record
// at the beginning of the next block.
if (scratch->empty()) {
in_fragmented_record = false;
} else {
ReportCorruption(scratch->size(), "partial record without end(1)");
}
}
prospective_record_offset = physical_record_offset;
scratch->clear();
*record = fragment;
last_record_offset_ = prospective_record_offset;
return true;
case kFirstType:
if (in_fragmented_record) {
ReportDrop(scratch->size(), "partial record without end");
// Handle bug in earlier versions of log::Writer where
// it could emit an empty kFirstType record at the tail end
// of a block followed by a kFullType or kFirstType record
// at the beginning of the next block.
if (scratch->empty()) {
in_fragmented_record = false;
} else {
ReportCorruption(scratch->size(), "partial record without end(2)");
}
}
prospective_record_offset = physical_record_offset;
scratch->assign(fragment.data(), fragment.size());
in_fragmented_record = true;
break;
case kMiddleType:
if (!in_fragmented_record) {
ReportDrop(fragment.size(), "missing start of fragmented record");
ReportCorruption(fragment.size(),
"missing start of fragmented record(1)");
} else {
scratch->append(fragment.data(), fragment.size());
}
@ -62,31 +119,33 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch) {
case kLastType:
if (!in_fragmented_record) {
ReportDrop(fragment.size(), "missing start of fragmented record");
ReportCorruption(fragment.size(),
"missing start of fragmented record(2)");
} else {
scratch->append(fragment.data(), fragment.size());
*record = Slice(*scratch);
last_record_offset_ = prospective_record_offset;
return true;
}
break;
case kEof:
if (in_fragmented_record) {
ReportDrop(scratch->size(), "partial record without end");
ReportCorruption(scratch->size(), "partial record without end(3)");
scratch->clear();
}
return false;
case kBadRecord:
if (in_fragmented_record) {
ReportDrop(scratch->size(), "error in middle of record");
ReportCorruption(scratch->size(), "error in middle of record");
in_fragmented_record = false;
scratch->clear();
}
break;
default:
ReportDrop(
ReportCorruption(
(fragment.size() + (in_fragmented_record ? scratch->size() : 0)),
"unknown record type");
in_fragmented_record = false;
@ -97,9 +156,18 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch) {
return false;
}
void Reader::ReportDrop(size_t bytes, const char* reason) {
if (reporter_ != NULL) {
reporter_->Corruption(bytes, Status::Corruption(reason));
uint64_t Reader::LastRecordOffset() {
return last_record_offset_;
}
void Reader::ReportCorruption(size_t bytes, const char* reason) {
ReportDrop(bytes, Status::Corruption(reason));
}
void Reader::ReportDrop(size_t bytes, const Status& reason) {
if (reporter_ != NULL &&
end_of_buffer_offset_ - buffer_.size() - bytes >= initial_offset_) {
reporter_->Corruption(bytes, reason);
}
}
@ -110,11 +178,10 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result) {
// Last read was a full read, so this is a trailer to skip
buffer_.clear();
Status status = file_->Read(kBlockSize, &buffer_, backing_store_);
end_of_buffer_offset_ += buffer_.size();
if (!status.ok()) {
if (reporter_ != NULL) {
reporter_->Corruption(kBlockSize, status);
}
buffer_.clear();
ReportDrop(kBlockSize, status);
eof_ = true;
return kEof;
} else if (buffer_.size() < kBlockSize) {
@ -125,8 +192,9 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result) {
// End of file
return kEof;
} else {
ReportDrop(buffer_.size(), "truncated record at end of file");
size_t drop_size = buffer_.size();
buffer_.clear();
ReportCorruption(drop_size, "truncated record at end of file");
return kEof;
}
}
@ -138,8 +206,9 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result) {
const unsigned int type = header[6];
const uint32_t length = a | (b << 8);
if (kHeaderSize + length > buffer_.size()) {
ReportDrop(buffer_.size(), "bad record length");
size_t drop_size = buffer_.size();
buffer_.clear();
ReportCorruption(drop_size, "bad record length");
return kBadRecord;
}
@ -160,13 +229,22 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result) {
// been corrupted and if we trust it, we could find some
// fragment of a real log record that just happens to look
// like a valid log record.
ReportDrop(buffer_.size(), "checksum mismatch");
size_t drop_size = buffer_.size();
buffer_.clear();
ReportCorruption(drop_size, "checksum mismatch");
return kBadRecord;
}
}
buffer_.remove_prefix(kHeaderSize + length);
// Skip physical record that started before initial_offset_
if (end_of_buffer_offset_ - buffer_.size() - kHeaderSize - length <
initial_offset_) {
result->clear();
return kBadRecord;
}
*result = Slice(header + kHeaderSize, length);
return type;
}

+ 35
- 2
db/log_reader.h Wyświetl plik

@ -5,6 +5,8 @@
#ifndef STORAGE_LEVELDB_DB_LOG_READER_H_
#define STORAGE_LEVELDB_DB_LOG_READER_H_
#include <stdint.h>
#include "db/log_format.h"
#include "leveldb/slice.h"
#include "leveldb/status.h"
@ -35,7 +37,11 @@ class Reader {
// live while this Reader is in use.
//
// If "checksum" is true, verify checksums if available.
Reader(SequentialFile* file, Reporter* reporter, bool checksum);
//
// The Reader will start reading at the first record located at physical
// position >= initial_offset within the file.
Reader(SequentialFile* file, Reporter* reporter, bool checksum,
uint64_t initial_offset);
~Reader();
@ -46,6 +52,11 @@ class Reader {
// reader or the next mutation to *scratch.
bool ReadRecord(Slice* record, std::string* scratch);
// Returns the physical offset of the last record returned by ReadRecord.
//
// Undefined before the first call to ReadRecord.
uint64_t LastRecordOffset();
private:
SequentialFile* const file_;
Reporter* const reporter_;
@ -54,15 +65,37 @@ class Reader {
Slice buffer_;
bool eof_; // Last Read() indicated EOF by returning < kBlockSize
// Offset of the last record returned by ReadRecord.
uint64_t last_record_offset_;
// Offset of the first location past the end of buffer_.
uint64_t end_of_buffer_offset_;
// Offset at which to start looking for the first record to return
uint64_t const initial_offset_;
// Extend record types with the following special values
enum {
kEof = kMaxRecordType + 1,
// Returned whenever we find an invalid physical record.
// Currently there are three situations in which this happens:
// * The record has an invalid CRC (ReadPhysicalRecord reports a drop)
// * The record is a 0-length record (No drop is reported)
// * The record is below constructor's initial_offset (No drop is reported)
kBadRecord = kMaxRecordType + 2
};
// Skips all blocks that are completely before "initial_offset_".
//
// Returns true on success. Handles reporting.
bool SkipToInitialBlock();
// Return type, or one of the preceding special values
unsigned int ReadPhysicalRecord(Slice* result);
void ReportDrop(size_t bytes, const char* reason);
// Reports dropped bytes to the reporter.
// buffer_ must be updated to remove the dropped bytes prior to invocation.
void ReportCorruption(size_t bytes, const char* reason);
void ReportDrop(size_t bytes, const Status& reason);
// No copying allowed
Reader(const Reader&);

+ 141
- 2
db/log_test.cc Wyświetl plik

@ -60,7 +60,6 @@ class LogTest {
virtual Status Read(size_t n, Slice* result, char* scratch) {
ASSERT_TRUE(!returned_partial_) << "must not Read() after eof/error";
ASSERT_EQ(kBlockSize, n);
if (force_error_) {
force_error_ = false;
@ -76,6 +75,17 @@ class LogTest {
contents_.remove_prefix(n);
return Status::OK();
}
virtual Status Skip(size_t n) {
if (n > contents_.size()) {
contents_.clear();
return Status::NotFound("in-memory file skipepd past end");
}
contents_.remove_prefix(n);
return Status::OK();
}
};
class ReportCollector : public Reader::Reporter {
@ -97,10 +107,15 @@ class LogTest {
Writer writer_;
Reader reader_;
// Record metadata for testing initial offset functionality
static size_t initial_offset_record_sizes_[];
static uint64_t initial_offset_last_record_offsets_[];
public:
LogTest() : reading_(false),
writer_(&dest_),
reader_(&source_, &report_, true/*checksum*/) {
reader_(&source_, &report_, true/*checksum*/,
0/*initial_offset*/) {
}
void Write(const std::string& msg) {
@ -153,6 +168,10 @@ class LogTest {
return report_.dropped_bytes_;
}
std::string ReportMessage() const {
return report_.message_;
}
// Returns OK iff recorded error message contains "msg"
std::string MatchError(const std::string& msg) const {
if (report_.message_.find(msg) == std::string::npos) {
@ -161,8 +180,61 @@ class LogTest {
return "OK";
}
}
void WriteInitialOffsetLog() {
for (int i = 0; i < 4; i++) {
std::string record(initial_offset_record_sizes_[i],
static_cast<char>('a' + i));
Write(record);
}
}
void CheckOffsetPastEndReturnsNoRecords(uint64_t offset_past_end) {
WriteInitialOffsetLog();
reading_ = true;
source_.contents_ = Slice(dest_.contents_);
Reader* offset_reader = new Reader(&source_, &report_, true/*checksum*/,
WrittenBytes() + offset_past_end);
Slice record;
std::string scratch;
ASSERT_TRUE(!offset_reader->ReadRecord(&record, &scratch));
delete offset_reader;
}
void CheckInitialOffsetRecord(uint64_t initial_offset,
int expected_record_offset) {
WriteInitialOffsetLog();
reading_ = true;
source_.contents_ = Slice(dest_.contents_);
Reader* offset_reader = new Reader(&source_, &report_, true/*checksum*/,
initial_offset);
Slice record;
std::string scratch;
ASSERT_TRUE(offset_reader->ReadRecord(&record, &scratch));
ASSERT_EQ(initial_offset_record_sizes_[expected_record_offset],
record.size());
ASSERT_EQ(initial_offset_last_record_offsets_[expected_record_offset],
offset_reader->LastRecordOffset());
ASSERT_EQ((char)('a' + expected_record_offset), record.data()[0]);
delete offset_reader;
}
};
size_t LogTest::initial_offset_record_sizes_[] =
{10000, // Two sizable records in first block
10000,
2 * log::kBlockSize - 1000, // Span three blocks
1};
uint64_t LogTest::initial_offset_last_record_offsets_[] =
{0,
kHeaderSize + 10000,
2 * (kHeaderSize + 10000),
2 * (kHeaderSize + 10000) +
(2 * log::kBlockSize - 1000) + 3 * kHeaderSize};
TEST(LogTest, Empty) {
ASSERT_EQ("EOF", Read());
}
@ -213,6 +285,19 @@ TEST(LogTest, MarginalTrailer) {
ASSERT_EQ("EOF", Read());
}
TEST(LogTest, MarginalTrailer2) {
// Make a trailer that is exactly the same length as an empty record.
const int n = kBlockSize - 2*kHeaderSize;
Write(BigString("foo", n));
ASSERT_EQ(kBlockSize - kHeaderSize, WrittenBytes());
Write("bar");
ASSERT_EQ(BigString("foo", n), Read());
ASSERT_EQ("bar", Read());
ASSERT_EQ("EOF", Read());
ASSERT_EQ(0, DroppedBytes());
ASSERT_EQ("", ReportMessage());
}
TEST(LogTest, ShortTrailer) {
const int n = kBlockSize - 2*kHeaderSize + 4;
Write(BigString("foo", n));
@ -353,6 +438,60 @@ TEST(LogTest, ErrorJoinsRecords) {
ASSERT_GE(dropped, 2*kBlockSize);
}
TEST(LogTest, ReadStart) {
CheckInitialOffsetRecord(0, 0);
}
TEST(LogTest, ReadSecondOneOff) {
CheckInitialOffsetRecord(1, 1);
}
TEST(LogTest, ReadSecondTenThousand) {
CheckInitialOffsetRecord(10000, 1);
}
TEST(LogTest, ReadSecondStart) {
CheckInitialOffsetRecord(10007, 1);
}
TEST(LogTest, ReadThirdOneOff) {
CheckInitialOffsetRecord(10008, 2);
}
TEST(LogTest, ReadThirdStart) {
CheckInitialOffsetRecord(20014, 2);
}
TEST(LogTest, ReadFourthOneOff) {
CheckInitialOffsetRecord(20015, 3);
}
TEST(LogTest, ReadFourthFirstBlockTrailer) {
CheckInitialOffsetRecord(log::kBlockSize - 4, 3);
}
TEST(LogTest, ReadFourthMiddleBlock) {
CheckInitialOffsetRecord(log::kBlockSize + 1, 3);
}
TEST(LogTest, ReadFourthLastBlock) {
CheckInitialOffsetRecord(2 * log::kBlockSize + 1, 3);
}
TEST(LogTest, ReadFourthStart) {
CheckInitialOffsetRecord(
2 * (kHeaderSize + 1000) + (2 * log::kBlockSize - 1000) + 3 * kHeaderSize,
3);
}
TEST(LogTest, ReadEnd) {
CheckOffsetPastEndReturnsNoRecords(0);
}
TEST(LogTest, ReadPastEnd) {
CheckOffsetPastEndReturnsNoRecords(5);
}
}
}

+ 2
- 1
db/log_writer.cc Wyświetl plik

@ -32,6 +32,7 @@ Status Writer::AddRecord(const Slice& slice) {
// is empty, we still want to iterate once to emit a single
// zero-length record
Status s;
bool begin = true;
do {
const int leftover = kBlockSize - block_offset_;
assert(leftover >= 0);
@ -52,7 +53,6 @@ Status Writer::AddRecord(const Slice& slice) {
const size_t fragment_length = (left < avail) ? left : avail;
RecordType type;
const bool begin = (ptr == slice.data());
const bool end = (left == fragment_length);
if (begin && end) {
type = kFullType;
@ -67,6 +67,7 @@ Status Writer::AddRecord(const Slice& slice) {
s = EmitPhysicalRecord(type, ptr, fragment_length);
ptr += fragment_length;
left -= fragment_length;
begin = false;
} while (s.ok() && left > 0);
return s;
}

+ 11
- 3
db/memtable.cc Wyświetl plik

@ -20,10 +20,12 @@ static Slice GetLengthPrefixedSlice(const char* data) {
MemTable::MemTable(const InternalKeyComparator& cmp)
: comparator_(cmp),
refs_(0),
table_(comparator_, &arena_) {
}
MemTable::~MemTable() {
assert(refs_ == 0);
}
size_t MemTable::ApproximateMemoryUsage() { return arena_.MemoryUsage(); }
@ -48,10 +50,15 @@ static const char* EncodeKey(std::string* scratch, const Slice& target) {
class MemTableIterator: public Iterator {
public:
explicit MemTableIterator(MemTable::Table* table) {
explicit MemTableIterator(MemTable* mem, MemTable::Table* table) {
mem_ = mem;
iter_ = new MemTable::Table::Iterator(table);
mem->Ref();
}
virtual ~MemTableIterator() {
delete iter_;
mem_->Unref();
}
virtual ~MemTableIterator() { delete iter_; }
virtual bool Valid() const { return iter_->Valid(); }
virtual void Seek(const Slice& k) { iter_->Seek(EncodeKey(&tmp_, k)); }
@ -68,6 +75,7 @@ class MemTableIterator: public Iterator {
virtual Status status() const { return Status::OK(); }
private:
MemTable* mem_;
MemTable::Table::Iterator* iter_;
std::string tmp_; // For passing to EncodeKey
@ -77,7 +85,7 @@ class MemTableIterator: public Iterator {
};
Iterator* MemTable::NewIterator() {
return new MemTableIterator(&table_);
return new MemTableIterator(this, &table_);
}
void MemTable::Add(SequenceNumber s, ValueType type,

+ 17
- 1
db/memtable.h Wyświetl plik

@ -19,8 +19,21 @@ class MemTableIterator;
class MemTable {
public:
// MemTables are reference counted. The initial reference count
// is zero and the caller must call Ref() at least once.
explicit MemTable(const InternalKeyComparator& comparator);
~MemTable();
// Increase reference count.
void Ref() { ++refs_; }
// Drop reference count. Delete if no more references exist.
void Unref() {
--refs_;
assert(refs_ >= 0);
if (refs_ <= 0) {
delete this;
}
}
// Returns an estimate of the number of bytes of data in use by this
// data structure.
@ -45,6 +58,8 @@ class MemTable {
const Slice& value);
private:
~MemTable(); // Private since only Unref() should be used to delete it
struct KeyComparator {
const InternalKeyComparator comparator;
explicit KeyComparator(const InternalKeyComparator& c) : comparator(c) { }
@ -56,6 +71,7 @@ class MemTable {
typedef SkipList<const char*, KeyComparator> Table;
KeyComparator comparator_;
int refs_;
Arena arena_;
Table table_;

+ 8
- 4
db/repair.cc Wyświetl plik

@ -183,13 +183,15 @@ class Repairer {
// corruptions cause entire commits to be skipped instead of
// propagating bad information (like overly large sequence
// numbers).
log::Reader reader(lfile, &reporter, false/*do not checksum*/);
log::Reader reader(lfile, &reporter, false/*do not checksum*/,
0/*initial_offset*/);
// Read all the records and add to a memtable
std::string scratch;
Slice record;
WriteBatch batch;
MemTable mem(icmp_);
MemTable* mem = new MemTable(icmp_);
mem->Ref();
int counter = 0;
while (reader.ReadRecord(&record, &scratch)) {
if (record.size() < 12) {
@ -198,7 +200,7 @@ class Repairer {
continue;
}
WriteBatchInternal::SetContents(&batch, record);
status = WriteBatchInternal::InsertInto(&batch, &mem);
status = WriteBatchInternal::InsertInto(&batch, mem);
if (status.ok()) {
counter += WriteBatchInternal::Count(&batch);
} else {
@ -215,10 +217,12 @@ class Repairer {
VersionEdit skipped;
FileMetaData meta;
meta.number = next_file_number_++;
Iterator* iter = mem.NewIterator();
Iterator* iter = mem->NewIterator();
status = BuildTable(dbname_, env_, options_, table_cache_, iter,
&meta, &skipped);
delete iter;
mem->Unref();
mem = NULL;
if (status.ok()) {
if (meta.file_size > 0) {
table_numbers_.push_back(meta.number);

+ 11
- 11
db/snapshot.h Wyświetl plik

@ -12,17 +12,17 @@ namespace leveldb {
class SnapshotList;
// Snapshots are kept in a doubly-linked list in the DB.
// Each Snapshot corresponds to a particular sequence number.
class Snapshot {
// Each SnapshotImpl corresponds to a particular sequence number.
class SnapshotImpl : public Snapshot {
public:
SequenceNumber number_; // const after creation
private:
friend class SnapshotList;
// Snapshot is kept in a doubly-linked circular list
Snapshot* prev_;
Snapshot* next_;
// SnapshotImpl is kept in a doubly-linked circular list
SnapshotImpl* prev_;
SnapshotImpl* next_;
SnapshotList* list_; // just for sanity checks
};
@ -35,11 +35,11 @@ class SnapshotList {
}
bool empty() const { return list_.next_ == &list_; }
Snapshot* oldest() const { assert(!empty()); return list_.next_; }
Snapshot* newest() const { assert(!empty()); return list_.prev_; }
SnapshotImpl* oldest() const { assert(!empty()); return list_.next_; }
SnapshotImpl* newest() const { assert(!empty()); return list_.prev_; }
const Snapshot* New(SequenceNumber seq) {
Snapshot* s = new Snapshot;
const SnapshotImpl* New(SequenceNumber seq) {
SnapshotImpl* s = new SnapshotImpl;
s->number_ = seq;
s->list_ = this;
s->next_ = &list_;
@ -49,7 +49,7 @@ class SnapshotList {
return s;
}
void Delete(const Snapshot* s) {
void Delete(const SnapshotImpl* s) {
assert(s->list_ == this);
s->prev_->next_ = s->next_;
s->next_->prev_ = s->prev_;
@ -58,7 +58,7 @@ class SnapshotList {
private:
// Dummy head of doubly-linked list of snapshots
Snapshot list_;
SnapshotImpl list_;
};
}

+ 171
- 138
db/version_set.cc Wyświetl plik

@ -57,17 +57,22 @@ std::string IntSetToString(const std::set& s) {
Version::~Version() {
assert(refs_ == 0);
// Remove from linked list
prev_->next_ = next_;
next_->prev_ = prev_;
// Drop references to files
for (int level = 0; level < config::kNumLevels; level++) {
for (size_t i = 0; i < files_[level].size(); i++) {
FileMetaData* f = files_[level][i];
assert(f->refs >= 0);
assert(f->refs > 0);
f->refs--;
if (f->refs <= 0) {
delete f;
}
}
}
delete cleanup_mem_;
}
// An internal iterator. For a given version/level pair, yields
@ -77,9 +82,9 @@ Version::~Version() {
// encoded using EncodeFixed64.
class Version::LevelFileNumIterator : public Iterator {
public:
LevelFileNumIterator(const Version* version,
LevelFileNumIterator(const InternalKeyComparator& icmp,
const std::vector<FileMetaData*>* flist)
: icmp_(version->vset_->icmp_.user_comparator()),
: icmp_(icmp),
flist_(flist),
index_(flist->size()) { // Marks as invalid
}
@ -157,7 +162,7 @@ static Iterator* GetFileIterator(void* arg,
Iterator* Version::NewConcatenatingIterator(const ReadOptions& options,
int level) const {
return NewTwoLevelIterator(
new LevelFileNumIterator(this, &files_[level]),
new LevelFileNumIterator(vset_->icmp_, &files_[level]),
&GetFileIterator, vset_->table_cache_, options);
}
@ -185,11 +190,11 @@ void Version::Ref() {
}
void Version::Unref() {
assert(this != &vset_->dummy_versions_);
assert(refs_ >= 1);
--refs_;
if (refs_ == 0) {
vset_->MaybeDeleteOldVersions();
// TODO: try to delete obsolete files
delete this;
}
}
@ -222,37 +227,58 @@ std::string Version::DebugString() const {
// Versions that contain full copies of the intermediate state.
class VersionSet::Builder {
private:
typedef std::map<uint64_t, FileMetaData*> FileMap;
// Helper to sort by v->files_[file_number].smallest
struct BySmallestKey {
const InternalKeyComparator* internal_comparator;
bool operator()(FileMetaData* f1, FileMetaData* f2) const {
int r = internal_comparator->Compare(f1->smallest, f2->smallest);
if (r != 0) {
return (r < 0);
} else {
// Break ties by file number
return (f1->number < f2->number);
}
}
};
typedef std::set<FileMetaData*, BySmallestKey> FileSet;
struct LevelState {
std::set<uint64_t> deleted_files;
FileSet* added_files;
};
VersionSet* vset_;
FileMap files_[config::kNumLevels];
Version* base_;
LevelState levels_[config::kNumLevels];
public:
// Initialize a builder with the files from *base and other info from *vset
Builder(VersionSet* vset, Version* base)
: vset_(vset) {
: vset_(vset),
base_(base) {
base_->Ref();
BySmallestKey cmp;
cmp.internal_comparator = &vset_->icmp_;
for (int level = 0; level < config::kNumLevels; level++) {
const std::vector<FileMetaData*>& files = base->files_[level];
for (size_t i = 0; i < files.size(); i++) {
FileMetaData* f = files[i];
f->refs++;
files_[level].insert(std::make_pair(f->number, f));
}
levels_[level].added_files = new FileSet(cmp);
}
}
~Builder() {
for (int level = 0; level < config::kNumLevels; level++) {
const FileMap& fmap = files_[level];
for (FileMap::const_iterator iter = fmap.begin();
iter != fmap.end();
++iter) {
FileMetaData* f = iter->second;
std::vector<FileMetaData*> to_unref(levels_[level].added_files->begin(),
levels_[level].added_files->end());
delete levels_[level].added_files;
for (int i = 0; i < to_unref.size(); i++) {
FileMetaData* f = to_unref[i];
f->refs--;
if (f->refs <= 0) {
delete f;
}
}
}
base_->Unref();
}
// Apply all of the edits in *edit to the current state.
@ -271,16 +297,7 @@ class VersionSet::Builder {
++iter) {
const int level = iter->first;
const uint64_t number = iter->second;
FileMap::iterator fiter = files_[level].find(number);
assert(fiter != files_[level].end()); // Sanity check for debug mode
if (fiter != files_[level].end()) {
FileMetaData* f = fiter->second;
f->refs--;
if (f->refs <= 0) {
delete f;
}
files_[level].erase(fiter);
}
levels_[level].deleted_files.insert(number);
}
// Add new files
@ -288,22 +305,66 @@ class VersionSet::Builder {
const int level = edit->new_files_[i].first;
FileMetaData* f = new FileMetaData(edit->new_files_[i].second);
f->refs = 1;
assert(files_[level].count(f->number) == 0);
files_[level].insert(std::make_pair(f->number, f));
levels_[level].deleted_files.erase(f->number);
levels_[level].added_files->insert(f);
}
}
// Save the current state in *v.
void SaveTo(Version* v) {
BySmallestKey cmp;
cmp.internal_comparator = &vset_->icmp_;
for (int level = 0; level < config::kNumLevels; level++) {
const FileMap& fmap = files_[level];
for (FileMap::const_iterator iter = fmap.begin();
iter != fmap.end();
++iter) {
FileMetaData* f = iter->second;
f->refs++;
v->files_[level].push_back(f);
// Merge the set of added files with the set of pre-existing files.
// Drop any deleted files. Store the result in *v.
const std::vector<FileMetaData*>& base_files = base_->files_[level];
std::vector<FileMetaData*>::const_iterator base_iter = base_files.begin();
std::vector<FileMetaData*>::const_iterator base_end = base_files.end();
const FileSet* added = levels_[level].added_files;
v->files_[level].reserve(base_files.size() + added->size());
for (FileSet::const_iterator added_iter = added->begin();
added_iter != added->end();
++added_iter) {
// Add all smaller files listed in base_
for (std::vector<FileMetaData*>::const_iterator bpos
= std::upper_bound(base_iter, base_end, *added_iter, cmp);
base_iter != bpos;
++base_iter) {
MaybeAddFile(v, level, *base_iter);
}
MaybeAddFile(v, level, *added_iter);
}
// Add remaining base files
for (; base_iter != base_end; ++base_iter) {
MaybeAddFile(v, level, *base_iter);
}
#ifndef NDEBUG
// Make sure there is no overlap in levels > 0
if (level > 0) {
for (int i = 1; i < v->files_[level].size(); i++) {
const InternalKey& prev_end = v->files_[level][i-1]->largest;
const InternalKey& this_begin = v->files_[level][i]->smallest;
if (vset_->icmp_.Compare(prev_end, this_begin) >= 0) {
fprintf(stderr, "overlapping ranges in same level %s vs. %s\n",
EscapeString(prev_end.Encode()).c_str(),
EscapeString(this_begin.Encode()).c_str());
abort();
}
}
}
#endif
}
}
void MaybeAddFile(Version* v, int level, FileMetaData* f) {
if (levels_[level].deleted_files.count(f->number) > 0) {
// File is deleted: do nothing
} else {
f->refs++;
v->files_[level].push_back(f);
}
}
};
@ -324,22 +385,36 @@ VersionSet::VersionSet(const std::string& dbname,
prev_log_number_(0),
descriptor_file_(NULL),
descriptor_log_(NULL),
current_(new Version(this)),
oldest_(current_) {
dummy_versions_(this),
current_(NULL) {
AppendVersion(new Version(this));
}
VersionSet::~VersionSet() {
for (Version* v = oldest_; v != NULL; ) {
Version* next = v->next_;
assert(v->refs_ == 0);
delete v;
v = next;
}
current_->Unref();
assert(dummy_versions_.next_ == &dummy_versions_); // List must be empty
delete descriptor_log_;
delete descriptor_file_;
}
Status VersionSet::LogAndApply(VersionEdit* edit, MemTable* cleanup_mem) {
void VersionSet::AppendVersion(Version* v) {
// Make "v" current
assert(v->refs_ == 0);
assert(v != current_);
if (current_ != NULL) {
current_->Unref();
}
current_ = v;
v->Ref();
// Append to linked list
v->prev_ = dummy_versions_.prev_;
v->next_ = &dummy_versions_;
v->prev_->next_ = v;
v->next_->prev_ = v;
}
Status VersionSet::LogAndApply(VersionEdit* edit) {
if (edit->has_log_number_) {
assert(edit->log_number_ >= log_number_);
assert(edit->log_number_ < next_file_number_);
@ -360,22 +435,20 @@ Status VersionSet::LogAndApply(VersionEdit* edit, MemTable* cleanup_mem) {
builder.Apply(edit);
builder.SaveTo(v);
}
std::string new_manifest_file;
Status s = Finalize(v);
Finalize(v);
// Initialize new descriptor log file if necessary by creating
// a temporary file that contains a snapshot of the current version.
if (s.ok()) {
if (descriptor_log_ == NULL) {
assert(descriptor_file_ == NULL);
new_manifest_file = DescriptorFileName(dbname_, manifest_file_number_);
edit->SetNextFile(next_file_number_);
s = env_->NewWritableFile(new_manifest_file, &descriptor_file_);
if (s.ok()) {
descriptor_log_ = new log::Writer(descriptor_file_);
s = WriteSnapshot(descriptor_log_);
}
std::string new_manifest_file;
Status s;
if (descriptor_log_ == NULL) {
assert(descriptor_file_ == NULL);
new_manifest_file = DescriptorFileName(dbname_, manifest_file_number_);
edit->SetNextFile(next_file_number_);
s = env_->NewWritableFile(new_manifest_file, &descriptor_file_);
if (s.ok()) {
descriptor_log_ = new log::Writer(descriptor_file_);
s = WriteSnapshot(descriptor_log_);
}
}
@ -397,12 +470,7 @@ Status VersionSet::LogAndApply(VersionEdit* edit, MemTable* cleanup_mem) {
// Install the new version
if (s.ok()) {
assert(current_->next_ == NULL);
assert(current_->cleanup_mem_ == NULL);
current_->cleanup_mem_ = cleanup_mem;
v->next_ = NULL;
current_->next_ = v;
current_ = v;
AppendVersion(v);
log_number_ = edit->log_number_;
prev_log_number_ = edit->prev_log_number_;
} else {
@ -458,7 +526,7 @@ Status VersionSet::Recover() {
{
LogReporter reporter;
reporter.status = &s;
log::Reader reader(file, &reporter, true/*checksum*/);
log::Reader reader(file, &reporter, true/*checksum*/, 0/*initial_offset*/);
Slice record;
std::string scratch;
while (reader.ReadRecord(&record, &scratch) && s.ok()) {
@ -518,20 +586,14 @@ Status VersionSet::Recover() {
if (s.ok()) {
Version* v = new Version(this);
builder.SaveTo(v);
s = Finalize(v);
if (!s.ok()) {
delete v;
} else {
// Install recovered version
v->next_ = NULL;
current_->next_ = v;
current_ = v;
manifest_file_number_ = next_file;
next_file_number_ = next_file + 1;
last_sequence_ = last_sequence;
log_number_ = log_number;
prev_log_number_ = prev_log_number;
}
// Install recovered version
Finalize(v);
AppendVersion(v);
manifest_file_number_ = next_file;
next_file_number_ = next_file + 1;
last_sequence_ = last_sequence;
log_number_ = log_number;
prev_log_number_ = prev_log_number;
}
return s;
@ -545,15 +607,12 @@ static int64_t TotalFileSize(const std::vector& files) {
return sum;
}
Status VersionSet::Finalize(Version* v) {
void VersionSet::Finalize(Version* v) {
// Precomputed best level for next compaction
int best_level = -1;
double best_score = -1;
Status s;
for (int level = 0; s.ok() && level < config::kNumLevels-1; level++) {
s = SortLevel(v, level);
for (int level = 0; level < config::kNumLevels-1; level++) {
double score;
if (level == 0) {
// We treat level-0 specially by bounding the number of files
@ -567,7 +626,8 @@ Status VersionSet::Finalize(Version* v) {
// file size is small (perhaps because of a small write-buffer
// setting, or very high compression ratios, or lots of
// overwrites/deletions).
score = v->files_[level].size() / 4.0;
score = v->files_[level].size() /
static_cast<double>(config::kL0_CompactionTrigger);
} else {
// Compute the ratio of current size to size limit.
const uint64_t level_bytes = TotalFileSize(v->files_[level]);
@ -582,7 +642,6 @@ Status VersionSet::Finalize(Version* v) {
v->compaction_level_ = best_level;
v->compaction_score_ = best_score;
return s;
}
Status VersionSet::WriteSnapshot(log::Writer* log) {
@ -615,44 +674,27 @@ Status VersionSet::WriteSnapshot(log::Writer* log) {
return log->AddRecord(record);
}
// Helper to sort by tables_[file_number].smallest
struct VersionSet::BySmallestKey {
const InternalKeyComparator* internal_comparator;
bool operator()(FileMetaData* f1, FileMetaData* f2) const {
return internal_comparator->Compare(f1->smallest, f2->smallest) < 0;
}
};
Status VersionSet::SortLevel(Version* v, uint64_t level) {
Status result;
BySmallestKey cmp;
cmp.internal_comparator = &icmp_;
std::sort(v->files_[level].begin(), v->files_[level].end(), cmp);
if (result.ok() && level > 0) {
// There should be no overlap
for (size_t i = 1; i < v->files_[level].size(); i++) {
const InternalKey& prev_end = v->files_[level][i-1]->largest;
const InternalKey& this_begin = v->files_[level][i]->smallest;
if (icmp_.Compare(prev_end, this_begin) >= 0) {
result = Status::Corruption(
"overlapping ranges in same level",
(EscapeString(prev_end.Encode()) + " vs. " +
EscapeString(this_begin.Encode())));
break;
}
}
}
return result;
}
int VersionSet::NumLevelFiles(int level) const {
assert(level >= 0);
assert(level < config::kNumLevels);
return current_->files_[level].size();
}
const char* VersionSet::LevelSummary(LevelSummaryStorage* scratch) const {
// Update code if kNumLevels changes
assert(config::kNumLevels == 7);
snprintf(scratch->buffer, sizeof(scratch->buffer),
"files[ %d %d %d %d %d %d %d ]",
int(current_->files_[0].size()),
int(current_->files_[1].size()),
int(current_->files_[2].size()),
int(current_->files_[3].size()),
int(current_->files_[4].size()),
int(current_->files_[5].size()),
int(current_->files_[6].size()));
return scratch->buffer;
}
uint64_t VersionSet::ApproximateOffsetOf(Version* v, const InternalKey& ikey) {
uint64_t result = 0;
for (int level = 0; level < config::kNumLevels; level++) {
@ -685,19 +727,10 @@ uint64_t VersionSet::ApproximateOffsetOf(Version* v, const InternalKey& ikey) {
return result;
}
void VersionSet::MaybeDeleteOldVersions() {
// Note: it is important to delete versions in order since a newer
// version with zero refs may be holding a pointer to a memtable
// that is used by somebody who has a ref on an older version.
while (oldest_ != current_ && oldest_->refs_ == 0) {
Version* next = oldest_->next_;
delete oldest_;
oldest_ = next;
}
}
void VersionSet::AddLiveFiles(std::set<uint64_t>* live) {
for (Version* v = oldest_; v != NULL; v = v->next_) {
for (Version* v = dummy_versions_.next_;
v != &dummy_versions_;
v = v->next_) {
for (int level = 0; level < config::kNumLevels; level++) {
const std::vector<FileMetaData*>& files = v->files_[level];
for (size_t i = 0; i < files.size(); i++) {
@ -809,8 +842,7 @@ Iterator* VersionSet::MakeInputIterator(Compaction* c) {
} else {
// Create concatenating iterator for the files from this level
list[num++] = NewTwoLevelIterator(
new Version::LevelFileNumIterator(
c->input_version_, &c->inputs_[which]),
new Version::LevelFileNumIterator(icmp_, &c->inputs_[which]),
&GetFileIterator, table_cache_, options);
}
}
@ -996,11 +1028,12 @@ bool Compaction::IsBaseLevelForKey(const Slice& user_key) {
return true;
}
bool Compaction::ShouldStopBefore(const InternalKey& key) {
bool Compaction::ShouldStopBefore(const Slice& internal_key) {
// Scan to find earliest grandparent file that contains key.
const InternalKeyComparator* icmp = &input_version_->vset_->icmp_;
while (grandparent_index_ < grandparents_.size() &&
icmp->Compare(key, grandparents_[grandparent_index_]->largest) > 0) {
icmp->Compare(internal_key,
grandparents_[grandparent_index_]->largest.Encode()) > 0) {
if (seen_key_) {
overlapped_bytes_ += grandparents_[grandparent_index_]->file_size;
}

+ 18
- 20
db/version_set.h Wyświetl plik

@ -59,8 +59,8 @@ class Version {
VersionSet* vset_; // VersionSet to which this Version belongs
Version* next_; // Next version in linked list
Version* prev_; // Previous version in linked list
int refs_; // Number of live refs to this version
MemTable* cleanup_mem_; // NULL, or table to delete when version dropped
// List of files per level
std::vector<FileMetaData*> files_[config::kNumLevels];
@ -72,8 +72,7 @@ class Version {
int compaction_level_;
explicit Version(VersionSet* vset)
: vset_(vset), next_(NULL), refs_(0),
cleanup_mem_(NULL),
: vset_(vset), next_(this), prev_(this), refs_(0),
compaction_score_(-1),
compaction_level_(-1) {
}
@ -95,10 +94,8 @@ class VersionSet {
// Apply *edit to the current version to form a new descriptor that
// is both saved to persistent state and installed as the new
// current version. Iff Apply() returns OK, arrange to delete
// cleanup_mem (if cleanup_mem != NULL) when it is no longer needed
// by older versions.
Status LogAndApply(VersionEdit* edit, MemTable* cleanup_mem);
// current version.
Status LogAndApply(VersionEdit* edit);
// Recover the last saved descriptor from persistent storage.
Status Recover();
@ -171,19 +168,20 @@ class VersionSet {
// "key" as of version "v".
uint64_t ApproximateOffsetOf(Version* v, const InternalKey& key);
// Return a human-readable short (single-line) summary of the number
// of files per level. Uses *scratch as backing store.
struct LevelSummaryStorage {
char buffer[100];
};
const char* LevelSummary(LevelSummaryStorage* scratch) const;
private:
class Builder;
friend class Compaction;
friend class Version;
Status Finalize(Version* v);
// Delete any old versions that are no longer needed.
void MaybeDeleteOldVersions();
struct BySmallestKey;
Status SortLevel(Version* v, uint64_t level);
void Finalize(Version* v);
void GetOverlappingInputs(
int level,
@ -202,6 +200,8 @@ class VersionSet {
void SetupOtherInputs(Compaction* c);
void AppendVersion(Version* v);
Env* const env_;
const std::string dbname_;
const Options* const options_;
@ -216,10 +216,8 @@ class VersionSet {
// Opened lazily
WritableFile* descriptor_file_;
log::Writer* descriptor_log_;
// Versions are kept in a singly linked list that is never empty
Version* current_; // Pointer to the last (newest) list entry
Version* oldest_; // Pointer to the first (oldest) list entry
Version dummy_versions_; // Head of circular doubly-linked list of versions.
Version* current_; // == dummy_versions_.prev_
// Per-level key at which the next compaction at that level should start.
// Either an empty string, or a valid InternalKey.
@ -265,8 +263,8 @@ class Compaction {
bool IsBaseLevelForKey(const Slice& user_key);
// Returns true iff we should stop building the current output
// before processing "key".
bool ShouldStopBefore(const InternalKey& key);
// before processing "internal_key".
bool ShouldStopBefore(const Slice& internal_key);
// Release the input version for the compaction, once the compaction
// is successful.

+ 63
- 73
db/write_batch.cc Wyświetl plik

@ -29,11 +29,53 @@ WriteBatch::WriteBatch() {
WriteBatch::~WriteBatch() { }
WriteBatch::Handler::~Handler() { }
void WriteBatch::Clear() {
rep_.clear();
rep_.resize(12);
}
Status WriteBatch::Iterate(Handler* handler) const {
Slice input(rep_);
if (input.size() < 12) {
return Status::Corruption("malformed WriteBatch (too small)");
}
input.remove_prefix(12);
Slice key, value;
int found = 0;
while (!input.empty()) {
found++;
char tag = input[0];
input.remove_prefix(1);
switch (tag) {
case kTypeValue:
if (GetLengthPrefixedSlice(&input, &key) &&
GetLengthPrefixedSlice(&input, &value)) {
handler->Put(key, value);
} else {
return Status::Corruption("bad WriteBatch Put");
}
break;
case kTypeDeletion:
if (GetLengthPrefixedSlice(&input, &key)) {
handler->Delete(key);
} else {
return Status::Corruption("bad WriteBatch Delete");
}
break;
default:
return Status::Corruption("unknown WriteBatch tag");
}
}
if (found != WriteBatchInternal::Count(this)) {
return Status::Corruption("WriteBatch has wrong count");
} else {
return Status::OK();
}
}
int WriteBatchInternal::Count(const WriteBatch* b) {
return DecodeFixed32(b->rep_.data() + 8);
}
@ -63,28 +105,29 @@ void WriteBatch::Delete(const Slice& key) {
PutLengthPrefixedSlice(&rep_, key);
}
Status WriteBatchInternal::InsertInto(const WriteBatch* b,
MemTable* memtable) {
const int count = WriteBatchInternal::Count(b);
int found = 0;
Iterator it(*b);
for (; !it.Done(); it.Next()) {
switch (it.op()) {
case kTypeDeletion:
memtable->Add(it.sequence_number(), kTypeDeletion, it.key(), Slice());
break;
case kTypeValue:
memtable->Add(it.sequence_number(), kTypeValue, it.key(), it.value());
break;
}
found++;
namespace {
class MemTableInserter : public WriteBatch::Handler {
public:
SequenceNumber sequence_;
MemTable* mem_;
virtual void Put(const Slice& key, const Slice& value) {
mem_->Add(sequence_, kTypeValue, key, value);
sequence_++;
}
if (!it.status().ok()) {
return it.status();
} else if (found != count) {
return Status::Corruption("wrong count in WriteBatch");
virtual void Delete(const Slice& key) {
mem_->Add(sequence_, kTypeDeletion, key, Slice());
sequence_++;
}
return Status::OK();
};
}
Status WriteBatchInternal::InsertInto(const WriteBatch* b,
MemTable* memtable) {
MemTableInserter inserter;
inserter.sequence_ = WriteBatchInternal::Sequence(b);
inserter.mem_ = memtable;
return b->Iterate(&inserter);
}
void WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) {
@ -92,57 +135,4 @@ void WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) {
b->rep_.assign(contents.data(), contents.size());
}
WriteBatchInternal::Iterator::Iterator(const WriteBatch& batch)
: input_(WriteBatchInternal::Contents(&batch)),
done_(false) {
if (input_.size() < 12) {
done_ = true;
} else {
seq_ = WriteBatchInternal::Sequence(&batch),
input_.remove_prefix(12);
GetNextEntry();
}
}
void WriteBatchInternal::Iterator::Next() {
assert(!done_);
seq_++;
GetNextEntry();
}
void WriteBatchInternal::Iterator::GetNextEntry() {
if (input_.empty()) {
done_ = true;
return;
}
char tag = input_[0];
input_.remove_prefix(1);
switch (tag) {
case kTypeValue:
if (GetLengthPrefixedSlice(&input_, &key_) &&
GetLengthPrefixedSlice(&input_, &value_)) {
op_ = static_cast<ValueType>(tag);
} else {
status_ = Status::Corruption("bad WriteBatch Put");
done_ = true;
input_.clear();
}
break;
case kTypeDeletion:
if (GetLengthPrefixedSlice(&input_, &key_)) {
op_ = kTypeDeletion;
} else {
status_ = Status::Corruption("bad WriteBatch Delete");
done_ = true;
input_.clear();
}
break;
default:
status_ = Status::Corruption("unknown WriteBatch tag");
done_ = true;
input_.clear();
break;
}
}
}

+ 0
- 24
db/write_batch_internal.h Wyświetl plik

@ -37,30 +37,6 @@ class WriteBatchInternal {
static void SetContents(WriteBatch* batch, const Slice& contents);
static Status InsertInto(const WriteBatch* batch, MemTable* memtable);
// Iterate over the contents of a write batch.
class Iterator {
public:
explicit Iterator(const WriteBatch& batch);
bool Done() const { return done_; }
void Next();
ValueType op() const { return op_; }
const Slice& key() const { return key_; }
const Slice& value() const { return value_; }
SequenceNumber sequence_number() const { return seq_; }
Status status() const { return status_; }
private:
void GetNextEntry();
Slice input_;
bool done_;
ValueType op_;
Slice key_;
Slice value_;
SequenceNumber seq_;
Status status_;
};
};
}

+ 5
- 3
db/write_batch_test.cc Wyświetl plik

@ -14,10 +14,11 @@ namespace leveldb {
static std::string PrintContents(WriteBatch* b) {
InternalKeyComparator cmp(BytewiseComparator());
MemTable mem(cmp);
MemTable* mem = new MemTable(cmp);
mem->Ref();
std::string state;
Status s = WriteBatchInternal::InsertInto(b, &mem);
Iterator* iter = mem.NewIterator();
Status s = WriteBatchInternal::InsertInto(b, mem);
Iterator* iter = mem->NewIterator();
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
ParsedInternalKey ikey;
ASSERT_TRUE(ParseInternalKey(iter->key(), &ikey));
@ -42,6 +43,7 @@ static std::string PrintContents(WriteBatch* b) {
if (!s.ok()) {
state.append("ParseError()");
}
mem->Unref();
return state;
}

+ 11
- 15
doc/impl.html Wyświetl plik

@ -17,14 +17,14 @@ However the organization of the files that make up the representation
is somewhat different and is explained below.
<p>
Each database is represented by a set of file stored in a directory.
Each database is represented by a set of files stored in a directory.
There are several different types of files as documented below:
<p>
<h2>Log files</h2>
<p>
A log file (*.log) stores a sequence of recent updates. Each update
is appended to the current log file. When the log file reaches a
pre-determined size (approximately 1MB by default), it is converted
pre-determined size (approximately 4MB by default), it is converted
to a sorted table (see below) and a new log file is created for future
updates.
<p>
@ -83,19 +83,15 @@ Other files used for miscellaneous purposes may also be present
<h1>Level 0</h1>
When the log file grows above a certain size (1MB by default):
<ul>
<li>Write the contents of the current memtable to an sstable
<li>Replace the current memtable by a brand new empty memtable
<li>Switch to a new log file
<li>Create a brand new memtable and log file and direct future updates here
<li>In the background:
<ul>
<li>Write the contents of the previous memtable to an sstable
<li>Discard the memtable
<li>Delete the old log file and the old memtable
<li>Add the new sstable to the young (level-0) level.
</ul>
</ul>
Experimental measurements show that generating an sstable from a 1MB
log file takes ~12ms, which seems like an acceptable latency hiccup to
add infrequently to a log write.
<p>
The new sstable is added to a special level-0 level. level-0 contains
a set of files (up to 4 by default). However unlike other levels,
these files do not cover disjoint ranges, but may overlap each other.
<h1>Compactions</h1>
@ -162,8 +158,8 @@ read.
<p>
Solution 1: To reduce this problem, we might want to increase the log
switching threshold when the number of level-0 files is large. Though
the downside is that the larger this threshold, the larger the delay
that we will add to write latency when a write triggers a log switch.
the downside is that the larger this threshold, the more memory we will
need to hold the corresponding memtable.
<p>
Solution 2: We might want to decrease write rate artificially when the

+ 12
- 4
doc/index.html Wyświetl plik

@ -141,10 +141,18 @@ the batch.
<p>
<h1>Concurrency</h1>
<p>
A database may only be opened by one process at a time. The <code>leveldb</code>
implementation acquires a lock from the operating system to prevent
misuse. Within a single process, the same <code>leveldb::DB</code> object may
be safely used by multiple concurrent threads.
A database may only be opened by one process at a time.
The <code>leveldb</code> implementation acquires a lock from the
operating system to prevent misuse. Within a single process, the
same <code>leveldb::DB</code> object may be safely shared by multiple
concurrent threads. I.e., different threads may write into or fetch
iterators or call <code>Get</code> on the same database without any
external synchronization (the leveldb implementation will
automatically do the required synchronization). However other objects
(like Iterator and WriteBatch) may require external synchronization.
If two threads share such an object, they must protect access to it
using their own locking protocol. More details are available in
the public header files.
<p>
<h1>Iteration</h1>
<p>

+ 3
- 1
include/leveldb/comparator.h Wyświetl plik

@ -12,7 +12,9 @@ namespace leveldb {
class Slice;
// A Comparator object provides a total order across slices that are
// used as keys in an sstable or a database.
// used as keys in an sstable or a database. A Comparator implementation
// must be thread-safe since leveldb may invoke its methods concurrently
// from multiple threads.
class Comparator {
public:
virtual ~Comparator();

+ 13
- 7
include/leveldb/db.h Wyświetl plik

@ -13,26 +13,32 @@
namespace leveldb {
static const int kMajorVersion = 1;
static const int kMinorVersion = 1;
static const int kMinorVersion = 2;
struct Options;
struct ReadOptions;
struct WriteOptions;
class Snapshot;
class WriteBatch;
// Some internal types. Clients should ignore.
class WriteBatchInternal;
// Abstract handle to particular state of a DB.
// A Snapshot is an immutable object and can therefore be safely
// accessed from multiple threads without any external synchronization.
class Snapshot {
protected:
virtual ~Snapshot();
};
// A range of keys
struct Range {
Slice start;
Slice limit;
Slice start; // Included in the range
Slice limit; // Not included in the range
Range(const Slice& s, const Slice& l) : start(s), limit(l) { }
};
// A DB is a persistent ordered map from keys to values.
// A DB is safe for concurrent access from multiple threads without
// any external synchronization.
class DB {
public:
// Open the database with the specified "name".

+ 12
- 0
include/leveldb/env.h Wyświetl plik

@ -6,6 +6,9 @@
// operating system functionality like the filesystem etc. Callers
// may wish to provide a custom Env object when opening a database to
// get fine gain control; e.g., to rate limit file system operations.
//
// All Env implementations are safe for concurrent access from
// multiple threads without any external synchronization.
#ifndef STORAGE_LEVELDB_INCLUDE_ENV_H_
#define STORAGE_LEVELDB_INCLUDE_ENV_H_
@ -160,6 +163,15 @@ class SequentialFile {
//
// REQUIRES: External synchronization
virtual Status Read(size_t n, Slice* result, char* scratch) = 0;
// Skip "n" bytes from the file. This is guaranteed to be no
// slower that reading the same data, but may be faster.
//
// If end of file is reached, skipping will stop at the end of the
// file, and Skip will return OK.
//
// REQUIRES: External synchronization
virtual Status Skip(uint64_t n) = 0;
};
// A file abstraction for randomly reading the contents of a file.

+ 5
- 0
include/leveldb/iterator.h Wyświetl plik

@ -6,6 +6,11 @@
// The following class defines the interface. Multiple implementations
// are provided by this library. In particular, iterators are provided
// to access the contents of a Table or a DB.
//
// Multiple threads can invoke const methods on an Iterator without
// external synchronization, but if any of the threads may call a
// non-const method, all threads accessing the same Iterator must use
// external synchronization.
#ifndef STORAGE_LEVELDB_INCLUDE_ITERATOR_H_
#define STORAGE_LEVELDB_INCLUDE_ITERATOR_H_

+ 5
- 0
include/leveldb/slice.h Wyświetl plik

@ -6,6 +6,11 @@
// storage and a size. The user of a Slice must ensure that the slice
// is not used after the corresponding external storage has been
// deallocated.
//
// Multiple threads can invoke const methods on a Slice without
// external synchronization, but if any of the threads may call a
// non-const method, all threads accessing the same Slice must use
// external synchronization.
#ifndef STORAGE_LEVELDB_INCLUDE_SLICE_H_
#define STORAGE_LEVELDB_INCLUDE_SLICE_H_

+ 25
- 11
include/leveldb/status.h Wyświetl plik

@ -4,12 +4,16 @@
//
// A Status encapsulates the result of an operation. It may indicate success,
// or it may indicate an error with an associated error message.
//
// Multiple threads can invoke const methods on a Status without
// external synchronization, but if any of the threads may call a
// non-const method, all threads accessing the same Status must use
// external synchronization.
#ifndef STORAGE_LEVELDB_INCLUDE_STATUS_H_
#define STORAGE_LEVELDB_INCLUDE_STATUS_H_
#include <string>
#include <utility>
#include "leveldb/slice.h"
namespace leveldb {
@ -18,7 +22,7 @@ class Status {
public:
// Create a success status.
Status() : state_(NULL) { }
~Status() { delete state_; }
~Status() { delete[] state_; }
// Copy the specified status.
Status(const Status& s);
@ -29,7 +33,7 @@ class Status {
// Return error status of an appropriate type.
static Status NotFound(const Slice& msg, const Slice& msg2 = Slice()) {
return Status(kNotFound, msg, Slice());
return Status(kNotFound, msg, msg2);
}
static Status Corruption(const Slice& msg, const Slice& msg2 = Slice()) {
return Status(kCorruption, msg, msg2);
@ -55,6 +59,13 @@ class Status {
std::string ToString() const;
private:
// OK status has a NULL state_. Otherwise, state_ is a new[] array
// of the following form:
// state_[0..3] == length of message
// state_[4] == code
// state_[5..] == message
const char* state_;
enum Code {
kOk = 0,
kNotFound = 1,
@ -63,21 +74,24 @@ class Status {
kInvalidArgument = 4,
kIOError = 5,
};
Code code() const { return (state_ == NULL) ? kOk : state_->first; }
Status(Code code, const Slice& msg, const Slice& msg2);
Code code() const {
return (state_ == NULL) ? kOk : static_cast<Code>(state_[4]);
}
typedef std::pair<Code, std::string> State;
State* state_;
Status(Code code, const Slice& msg, const Slice& msg2);
static const char* CopyState(const char* s);
};
inline Status::Status(const Status& s) {
state_ = (s.state_ == NULL) ? NULL : new State(*s.state_);
state_ = (s.state_ == NULL) ? NULL : CopyState(s.state_);
}
inline void Status::operator=(const Status& s) {
if (this != &s) {
delete state_;
state_ = (s.state_ == NULL) ? NULL : new State(*s.state_);
// The following condition catches both aliasing (when this == &s),
// and the common case where both s and *this are ok.
if (state_ != s.state_) {
delete[] state_;
state_ = (s.state_ == NULL) ? NULL : CopyState(s.state_);
}
}

+ 2
- 1
include/leveldb/table.h Wyświetl plik

@ -17,7 +17,8 @@ class RandomAccessFile;
struct ReadOptions;
// A Table is a sorted map from strings to strings. Tables are
// immutable and persistent.
// immutable and persistent. A Table may be safely accessed from
// multiple threads without external synchronization.
class Table {
public:
// Attempt to open the table that is stored in bytes [0..file_size)

+ 5
- 0
include/leveldb/table_builder.h Wyświetl plik

@ -4,6 +4,11 @@
//
// TableBuilder provides the interface used to build a Table
// (an immutable and sorted map from keys to values).
//
// Multiple threads can invoke const methods on a TableBuilder without
// external synchronization, but if any of the threads may call a
// non-const method, all threads accessing the same TableBuilder must use
// external synchronization.
#ifndef STORAGE_LEVELDB_INCLUDE_TABLE_BUILDER_H_
#define STORAGE_LEVELDB_INCLUDE_TABLE_BUILDER_H_

+ 15
- 0
include/leveldb/write_batch.h Wyświetl plik

@ -12,11 +12,17 @@
// batch.Delete("key");
// batch.Put("key", "v2");
// batch.Put("key", "v3");
//
// Multiple threads can invoke const methods on a WriteBatch without
// external synchronization, but if any of the threads may call a
// non-const method, all threads accessing the same WriteBatch must use
// external synchronization.
#ifndef STORAGE_LEVELDB_INCLUDE_WRITE_BATCH_H_
#define STORAGE_LEVELDB_INCLUDE_WRITE_BATCH_H_
#include <string>
#include "leveldb/status.h"
namespace leveldb {
@ -36,6 +42,15 @@ class WriteBatch {
// Clear all updates buffered in this batch.
void Clear();
// Support for iterating over the contents of a batch.
class Handler {
public:
virtual ~Handler();
virtual void Put(const Slice& key, const Slice& value) = 0;
virtual void Delete(const Slice& key) = 0;
};
Status Iterate(Handler* handler) const;
private:
friend class WriteBatchInternal;

+ 1
- 1
table/block_builder.cc Wyświetl plik

@ -80,7 +80,7 @@ void BlockBuilder::Add(const Slice& key, const Slice& value) {
if (counter_ < options_->block_restart_interval) {
// See how much sharing to do with previous string
const size_t min_length = std::min(last_key_piece.size(), key.size());
while ((shared < min_length) && (last_key_[shared] == key[shared])) {
while ((shared < min_length) && (last_key_piece[shared] == key[shared])) {
shared++;
}
} else {

+ 9
- 5
table/table_test.cc Wyświetl plik

@ -319,13 +319,15 @@ class MemTableConstructor: public Constructor {
: Constructor(cmp),
internal_comparator_(cmp) {
memtable_ = new MemTable(internal_comparator_);
memtable_->Ref();
}
~MemTableConstructor() {
delete memtable_;
memtable_->Unref();
}
virtual Status FinishImpl(const Options& options, const KVMap& data) {
delete memtable_;
memtable_->Unref();
memtable_ = new MemTable(internal_comparator_);
memtable_->Ref();
int seq = 1;
for (KVMap::const_iterator it = data.begin();
it != data.end();
@ -736,16 +738,17 @@ class MemTableTest { };
TEST(MemTableTest, Simple) {
InternalKeyComparator cmp(BytewiseComparator());
MemTable memtable(cmp);
MemTable* memtable = new MemTable(cmp);
memtable->Ref();
WriteBatch batch;
WriteBatchInternal::SetSequence(&batch, 100);
batch.Put(std::string("k1"), std::string("v1"));
batch.Put(std::string("k2"), std::string("v2"));
batch.Put(std::string("k3"), std::string("v3"));
batch.Put(std::string("largekey"), std::string("vlarge"));
ASSERT_TRUE(WriteBatchInternal::InsertInto(&batch, &memtable).ok());
ASSERT_TRUE(WriteBatchInternal::InsertInto(&batch, memtable).ok());
Iterator* iter = memtable.NewIterator();
Iterator* iter = memtable->NewIterator();
iter->SeekToFirst();
while (iter->Valid()) {
fprintf(stderr, "key: '%s' -> '%s'\n",
@ -755,6 +758,7 @@ TEST(MemTableTest, Simple) {
}
delete iter;
memtable->Unref();
}
static bool Between(uint64_t val, uint64_t low, uint64_t high) {

+ 7
- 0
util/env_chromium.cc Wyświetl plik

@ -141,6 +141,13 @@ class ChromiumSequentialFile: public SequentialFile {
}
return s;
}
virtual Status Skip(uint64_t n) {
if (fseek(file_, n, SEEK_CUR)) {
return Status::IOError(filename_, strerror(errno));
}
return Status::OK();
}
};
class ChromiumRandomAccessFile: public RandomAccessFile {

+ 7
- 0
util/env_posix.cc Wyświetl plik

@ -52,6 +52,13 @@ class PosixSequentialFile: public SequentialFile {
}
return s;
}
virtual Status Skip(uint64_t n) {
if (fseek(file_, n, SEEK_CUR)) {
return Status::IOError(filename_, strerror(errno));
}
return Status::OK();
}
};
class PosixRandomAccessFile: public RandomAccessFile {

+ 26
- 10
util/status.cc Wyświetl plik

@ -8,13 +8,29 @@
namespace leveldb {
const char* Status::CopyState(const char* state) {
uint32_t size;
memcpy(&size, state, sizeof(size));
char* result = new char[size + 5];
memcpy(result, state, size + 5);
return result;
}
Status::Status(Code code, const Slice& msg, const Slice& msg2) {
assert(code != kOk);
state_ = new State(make_pair(code, std::string(msg.data(), msg.size())));
if (!msg2.empty()) {
state_->second.append(": ");
state_->second.append(msg2.data(), msg2.size());
const uint32_t len1 = msg.size();
const uint32_t len2 = msg2.size();
const uint32_t size = len1 + (len2 ? (2 + len2) : 0);
char* result = new char[size + 5];
memcpy(result, &size, sizeof(size));
result[4] = static_cast<char>(code);
memcpy(result + 5, msg.data(), len1);
if (len2) {
result[5 + len1] = ':';
result[6 + len1] = ' ';
memcpy(result + 7 + len1, msg2.data(), len2);
}
state_ = result;
}
std::string Status::ToString() const {
@ -23,12 +39,12 @@ std::string Status::ToString() const {
} else {
char tmp[30];
const char* type;
switch (state_->first) {
switch (code()) {
case kOk:
type = "OK";
break;
case kNotFound:
type = "NotFound";
type = "NotFound: ";
break;
case kCorruption:
type = "Corruption: ";
@ -44,14 +60,14 @@ std::string Status::ToString() const {
break;
default:
snprintf(tmp, sizeof(tmp), "Unknown code(%d): ",
static_cast<int>(state_->first));
static_cast<int>(code()));
type = tmp;
break;
}
std::string result(type);
if (!state_->second.empty()) {
result.append(state_->second);
}
uint32_t length;
memcpy(&length, state_, sizeof(length));
result.append(state_ + 5, length);
return result;
}
}

Ładowanie…
Anuluj
Zapisz