diff --git a/db/builder.cc b/db/builder.cc index e6329e0..a8b9780 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -11,6 +11,7 @@ #include "leveldb/db.h" #include "leveldb/env.h" #include "leveldb/iterator.h" +#include "table/vtable_builder.h" namespace leveldb { @@ -21,6 +22,7 @@ Status BuildTable(const std::string& dbname, Env* env, const Options& options, iter->SeekToFirst(); std::string fname = TableFileName(dbname, meta->number); + std::string vtb_name = VTableFileName(dbname, meta->number); if (iter->Valid()) { WritableFile* file; s = env->NewWritableFile(fname, &file); @@ -28,12 +30,44 @@ Status BuildTable(const std::string& dbname, Env* env, const Options& options, return s; } + WritableFile* vtb_file; + s = env->NewWritableFile(vtb_name, &vtb_file); + if (!s.ok()) { + return s; + } + TableBuilder* builder = new TableBuilder(options, file); + VTableBuilder* vtb_builder = new VTableBuilder(options, vtb_file); meta->smallest.DecodeFrom(iter->key()); Slice key; for (; iter->Valid(); iter->Next()) { key = iter->key(); - builder->Add(key, iter->value()); + Slice value = iter->value(); + if (value.size() < options.kv_sep_size) { + // No need to separate key and value + builder->Add(key, value); + } + else { + // Separate key value + ParsedInternalKey parsed; + if (!ParseInternalKey(key, &parsed)) { + s = Status::Corruption("Fatal. Memtable Key Error"); + builder->Abandon(); + vtb_builder->Abandon(); + return s; + } + value.remove_prefix(1); + VTableRecord record {parsed.user_key, value}; + VTableHandle handle; + VTableIndex index; + std::string value_index; + vtb_builder->Add(record, &handle); + + index.file_number = meta->number; + index.vtable_handle = handle; + index.Encode(&value_index); + builder->Add(key, Slice(value_index)); + } } if (!key.empty()) { meta->largest.DecodeFrom(key); @@ -58,6 +92,20 @@ Status BuildTable(const std::string& dbname, Env* env, const Options& options, file = nullptr; if (s.ok()) { + s = vtb_builder->Finish(); + } + delete vtb_builder; + + if (s.ok()) { + s = vtb_file->Sync(); + } + if (s.ok()) { + s = vtb_file->Close(); + } + delete vtb_file; + vtb_file = nullptr; + + if (s.ok()) { // Verify that the table is usable Iterator* it = table_cache->NewIterator(ReadOptions(), meta->number, meta->file_size); diff --git a/db/db_impl.cc b/db/db_impl.cc index 6db14f3..59205bb 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -4,14 +4,6 @@ #include "db/db_impl.h" -#include -#include -#include -#include -#include -#include -#include - #include "db/builder.h" #include "db/db_iter.h" #include "db/dbformat.h" @@ -23,15 +15,26 @@ #include "db/table_cache.h" #include "db/version_set.h" #include "db/write_batch_internal.h" +#include +#include +#include +#include +#include +#include +#include + #include "leveldb/db.h" #include "leveldb/env.h" #include "leveldb/status.h" #include "leveldb/table.h" #include "leveldb/table_builder.h" + #include "port/port.h" #include "table/block.h" #include "table/merger.h" #include "table/two_level_iterator.h" +#include "table/vtable_format.h" +#include "table/vtable_reader.h" #include "util/coding.h" #include "util/logging.h" #include "util/mutexlock.h" @@ -1118,6 +1121,62 @@ int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() { return versions_->MaxNextLevelOverlappingBytes(); } +namespace { + +bool GetChar(Slice* input, unsigned char* value) { + if (input->empty()) { + return false; + } + *value = *input->data(); + return true; +} + +} // namespace + +Status DBImpl::DecodeValue(std::string* value) const { + enum Type : unsigned char { + kVTableIndex = 1, + kNonIndexValue = 2, + }; + std::string tmp = *value; + auto input = new Slice(tmp); + unsigned char type; + if (!GetChar(input, &type)) { + return Status::Corruption("Fatal Value Error"); + } + if (type == kNonIndexValue) { + input->remove_prefix(1); + *value = input->ToString(); + return Status::OK(); + } + if (type == kVTableIndex) { + VTableIndex index; + VTableReader reader; + VTableRecord record; + + Status s = index.Decode(input); + if (!s.ok()) { + return s; + } + + std::string vtb_name = VTableFileName(this->dbname_, index.file_number); + s = reader.Open(this->options_, vtb_name); + if (!s.ok()) { + return s; + } + + s = reader.Get(index.vtable_handle, &record); + if (!s.ok()) { + return s; + } + *value = record.value.ToString(); + return s; + } + return Status::Corruption("Unsupported value type"); + +} + + Status DBImpl::Get(const ReadOptions& options, const Slice& key, std::string* value) { Status s; @@ -1146,12 +1205,19 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key, // First look in the memtable, then in the immutable memtable (if any). LookupKey lkey(key, snapshot); if (mem->Get(lkey, value, &s)) { - // Done + if (s.ok()) { + s = DecodeValue(value); + } } else if (imm != nullptr && imm->Get(lkey, value, &s)) { - // Done + if (s.ok()) { + s = DecodeValue(value); + } } else { s = current->Get(options, lkey, value, &stats); have_stat_update = true; + if (s.ok()) { + s = DecodeValue(value); + } } auto fields = Fields(Slice(*value)); *value = fields["1"]; @@ -1196,12 +1262,19 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key, // First look in the memtable, then in the immutable memtable (if any). LookupKey lkey(key, snapshot); if (mem->Get(lkey, value, &s)) { - // Done + if (s.ok()) { + s = DecodeValue(value); + } } else if (imm != nullptr && imm->Get(lkey, value, &s)) { - // Done + if (s.ok()) { + s = DecodeValue(value); + } } else { s = current->Get(options, lkey, value, &stats); have_stat_update = true; + if (s.ok()) { + s = DecodeValue(value); + } } *fields = Fields(Slice(*value)); mutex_.Lock(); @@ -1561,11 +1634,25 @@ void DBImpl::GetApproximateSizes(const Range* range, int n, uint64_t* sizes) { v->Unref(); } +namespace { + +void EncodeNonIndexValue(const Slice& value, std::string* res) { + enum Type : unsigned char { + kNonIndexValue = 2, + }; + res->push_back(kNonIndexValue); + res->append(value.ToString()); +} + +} // namespace + // Default implementations of convenience methods that subclasses of DB // can call if they wish Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) { WriteBatch batch; - batch.Put(key, value); + std::string encoded_value; + EncodeNonIndexValue(value, &encoded_value); + batch.Put(key, Slice(encoded_value)); return Write(opt, &batch); } diff --git a/db/db_impl.h b/db/db_impl.h index 696d0b6..4d6fc29 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -43,6 +43,7 @@ class DBImpl : public DB { const Fields& fields) override; Status Delete(const WriteOptions&, const Slice& key) override; Status Write(const WriteOptions& options, WriteBatch* updates) override; + Status DecodeValue(std::string* value) const; Status Get(const ReadOptions& options, const Slice& key, Fields* fields) override; Status Get(const ReadOptions& options, const Slice& key, diff --git a/db/db_iter.cc b/db/db_iter.cc index 464ac0c..5ea9cca 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -71,7 +71,15 @@ class DBIter : public Iterator { } Fields fields() const override { assert(valid_); - return (direction_ == kForward) ? Fields(iter_->value()) : Fields(saved_value_); + if (direction_ == kForward) { + std::string field_str = iter_->value().ToString(); + db_->DecodeValue(&field_str); + return Fields(Slice(field_str)); + } else { + std::string field_str = saved_value_; + db_->DecodeValue(&field_str); + return Fields(Slice(field_str)); + } } Status status() const override { if (status_.ok()) { diff --git a/db/filename.cc b/db/filename.cc index e526249..21c306a 100644 --- a/db/filename.cc +++ b/db/filename.cc @@ -35,6 +35,11 @@ std::string TableFileName(const std::string& dbname, uint64_t number) { return MakeFileName(dbname, number, "ldb"); } +std::string VTableFileName(const std::string& dbname, uint64_t number) { + assert(number > 0); + return MakeFileName(dbname, number, "vtb"); +} + std::string SSTTableFileName(const std::string& dbname, uint64_t number) { assert(number > 0); return MakeFileName(dbname, number, "sst"); diff --git a/db/filename.h b/db/filename.h index 563c6d8..06cf019 100644 --- a/db/filename.h +++ b/db/filename.h @@ -38,6 +38,11 @@ std::string LogFileName(const std::string& dbname, uint64_t number); // "dbname". std::string TableFileName(const std::string& dbname, uint64_t number); +// Return the name of the vTable with the specified number +// in the db named by "dbname". The result will be prefixed with +// "dbname". +std::string VTableFileName(const std::string& dbname, uint64_t number); + // Return the legacy file name for an sstable with the specified number // in the db named by "dbname". The result will be prefixed with // "dbname". diff --git a/include/leveldb/options.h b/include/leveldb/options.h index d755f46..067947d 100644 --- a/include/leveldb/options.h +++ b/include/leveldb/options.h @@ -100,6 +100,9 @@ struct LEVELDB_EXPORT Options { // compression is enabled. This parameter can be changed dynamically. size_t block_size = 4 * 1024; + // Threshold of value size that decide whether to separate the key and value + size_t kv_sep_size = 1; + // Number of keys between restart points for delta encoding of keys. // This parameter can be changed dynamically. Most clients should // leave this parameter alone. diff --git a/test/test_fields.cc b/test/test_fields.cc index 8581ba4..39bb10d 100644 --- a/test/test_fields.cc +++ b/test/test_fields.cc @@ -117,6 +117,8 @@ TEST(TestFields, SearchKey) { const std::vector key_ret = db->FindKeysByField(field_test); ASSERT_EQ(CompareVector(key_ret, keys_have_field), true); + + delete db; } int main(int argc, char **argv) {