diff --git a/benchmarks/db_bench.cc b/benchmarks/db_bench.cc index a2e9d1d..12742fa 100644 --- a/benchmarks/db_bench.cc +++ b/benchmarks/db_bench.cc @@ -49,6 +49,8 @@ static const char* FLAGS_benchmarks = "fillsync," "fillrandom," "overwrite," + "overwrite," + "overwrite," "readrandom," "readrandom," // Extra run to allow previous compactions to quiesce "readseq," diff --git a/db/db_impl.cc b/db/db_impl.cc index de7b8cb..2cd4377 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1053,9 +1053,6 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { value.remove_prefix(1); bool res = GetVarint64(&value, &file_id); if (!res) return Status::Corruption("can't decode file id"); - if(valuelog_origin[file_id] == 0){//??? - valuelog_origin[file_id] = valuelog_usage[file_id]; - } valuelog_usage[file_id]--; } } @@ -1276,6 +1273,25 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key, return s; } +Iterator *DBImpl::NewOriginalIterator(const ReadOptions& options) { + SequenceNumber latest_snapshot; + uint32_t seed; + int iter_num=24; + + mutex_.Lock(); + + Iterator* iter = NewInternalIterator(options, &latest_snapshot, &seed); + auto db_iter=NewDBIterator(this, user_comparator(), iter, + (options.snapshot != nullptr + ? static_cast(options.snapshot) + ->sequence_number() + : latest_snapshot), + seed); + + mutex_.Unlock(); + return db_iter; +} + Iterator* DBImpl::NewIterator(const ReadOptions& options) { SequenceNumber latest_snapshot; uint32_t seed; @@ -1357,7 +1373,9 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) { Writer* last_writer = &w; if (status.ok() && updates != nullptr) { // nullptr batch is for compactions WriteBatch* write_batch = BuildBatchGroup(&last_writer); - + WriteBatchInternal::ConverToValueLog(write_batch, this);//need lock! to protect valuelog_number + WriteBatchInternal::SetSequence(write_batch, last_sequence + 1); + last_sequence += WriteBatchInternal::Count(write_batch); // Add to log and apply to memtable. We can release the lock @@ -1367,10 +1385,6 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) { { mutex_.Unlock(); - WriteBatchInternal::ConverToValueLog(write_batch, this); - WriteBatchInternal::SetSequence(write_batch, last_sequence + 1); - last_sequence += WriteBatchInternal::Count(write_batch); - status = log_->AddRecord(WriteBatchInternal::Contents(write_batch)); bool sync_error = false; @@ -1634,8 +1648,11 @@ std::vector> DBImpl::WriteValueLog( // 如果超出fixed_size if(init_offset>=config::value_log_size){ - valuelog_usage[valuelogfile_number_]=init_offset; - valuelog_origin[valuelogfile_number_]=init_offset; + uint64_t file_data_size = 0; // 文件数据大小标志位 + valueFile.seekg(0, std::ios::beg); + valueFile.read(reinterpret_cast(&file_data_size), sizeof(uint64_t)); + valuelog_usage[valuelogfile_number_]=file_data_size; + valuelog_origin[valuelogfile_number_]=file_data_size; addNewValueLog(); valueFile.close(); file_name_ = ValueLogFileName(dbname_, valuelogfile_number_); @@ -1805,8 +1822,9 @@ void DBImpl::GarbageCollect() { auto tmp_name = ValueLogFileName(dbname_, cur_log_number); if (!versions_->checkOldValueLog(tmp_name) && valuelog_origin[cur_log_number]) { + //std::cout<<((float)valuelog_usage[cur_log_number]) /(float)valuelog_origin[cur_log_number]<SeekToFirst();db_iter->Valid();db_iter->Next()){ auto value=db_iter->value(); if(value.size()&&value[0]==0x01){ @@ -2090,8 +2104,6 @@ void DBImpl::InitializeExistingLogs() { valuelog_usage[valuelog_id]++; } } - - mutex_.Unlock(); delete db_iter; mutex_.Lock(); } diff --git a/db/db_impl.h b/db/db_impl.h index 658ca0b..8b98055 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -61,6 +61,7 @@ class DBImpl : public DB { Status Get(const ReadOptions& options, const Slice& key, std::string* value) override; Iterator* NewIterator(const ReadOptions&) override; + Iterator* NewOriginalIterator(const ReadOptions&); const Snapshot* GetSnapshot() override; void ReleaseSnapshot(const Snapshot* snapshot) override; bool GetProperty(const Slice& property, std::string* value) override; diff --git a/db/dbformat.h b/db/dbformat.h index c6ab98f..c6a093b 100644 --- a/db/dbformat.h +++ b/db/dbformat.h @@ -47,7 +47,7 @@ static const int kReadBytesPeriod = 1048576; // maximum size of value_log file static const int value_log_size=1<<26; //1<<33/1<<26=1<<7 -static const int mem_value_log_number=1<<9;//8GB +static const int mem_value_log_number=1<<8;//8GB } // namespace config diff --git a/db/unordered_iter.cc b/db/unordered_iter.cc new file mode 100644 index 0000000..b440147 --- /dev/null +++ b/db/unordered_iter.cc @@ -0,0 +1,115 @@ +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +#include +#include +#include +#include +#include "db/unordered_iter.h" + +#include "db/db_impl.h" +#include "db/dbformat.h" +#include "db/filename.h" +#include "leveldb/env.h" +#include "leveldb/iterator.h" +#include "port/port.h" +#include "util/logging.h" +#include "util/mutexlock.h" +#include "util/random.h" +#include "port/port.h" + +namespace leveldb { + +namespace { + + + +// Memtables and sstables that make the DB representation contain +// (userkey,seq,type) => uservalue entries. UnorderedIter +// combines multiple entries for the same userkey found in the DB +// representation into a single entry while accounting for sequence +// numbers, deletion markers, overwrites, etc. +class UnorderedIter : public Iterator { + public: + // Which direction is the iterator currently moving? + // (1) When moving forward, the internal iterator is positioned at + // the exact entry that yields this->key(), this->value() + // (2) When moving backwards, the internal iterator is positioned + // just before all entries whose user key == this->key(). + enum IterPos {Left,Mid,Right}; + + UnorderedIter(DBImpl* db, Iterator* iter, Iterator* prefetch_iter,int prefetch_num) + : + db_(db),iter_(iter),prefetch_iter_(prefetch_iter),prefetch_num_(prefetch_num) {} + + UnorderedIter(const UnorderedIter&) = delete; + UnorderedIter& operator=(const UnorderedIter&) = delete; + + ~UnorderedIter() override { + delete iter_; + } + bool Valid() const override { return iter_->Valid(); } + Slice key() const override { + return iter_->key(); + } + Slice value() const override { + buf_for_value=std::move(GetAndParseTrueValue(iter_->value())); + return Slice(buf_for_value.data(),buf_for_value.size()); + } + Status status() const override { + return iter_->status(); + } + + void Next() override; + void Prev() override; + void Seek(const Slice& target) override; + void SeekToFirst() override; + void SeekToLast() override; + + private: + std::string GetAndParseTrueValue(Slice tmp_value)const{ + if(tmp_value.size()==0){ + return ""; + } + if(tmp_value.data()[0]==(char)(0x00)){ + tmp_value.remove_prefix(1); + return std::string(tmp_value.data(),tmp_value.size()); + } + tmp_value.remove_prefix(1); + uint64_t file_id,valuelog_offset; + bool res=GetVarint64(&tmp_value,&file_id); + if(!res)assert(0); + res=GetVarint64(&tmp_value,&valuelog_offset); + if(!res)assert(0); + std::string str; + Status s=db_->ReadValueLog(file_id,valuelog_offset, &str); + return std::move(str); + } + + + DBImpl* db_; + Iterator* const iter_; +}; +void UnorderedIter::Next() { + iter_->Next(); +} +void UnorderedIter::Prev() { + iter_->Prev(); +} + +void UnorderedIter::Seek(const Slice& target) { + iter_->Seek(target); +} + +void UnorderedIter::SeekToFirst() { + iter_->SeekToFirst(); + } +void UnorderedIter::SeekToLast() { + iter_->SeekToLast(); +} + +} // anonymous namespace +Iterator* NewUnorderedIterator(DBImpl* db,Iterator* db_iter) { + return new UnorderedIter(db,db_iter); +} +} // namespace leveldb diff --git a/db/unordered_iter.h b/db/unordered_iter.h new file mode 100644 index 0000000..920c573 --- /dev/null +++ b/db/unordered_iter.h @@ -0,0 +1,22 @@ +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#ifndef STORAGE_LEVELDB_DB_PREFETCH_ITER_H_ +#define STORAGE_LEVELDB_DB_PREFETCH_ITER_H_ + +#include + +#include "db/dbformat.h" +#include "leveldb/db.h" + +namespace leveldb { + +class DBImpl; + +// add a prefetch function for db_iter +Iterator* NewUnorderedIterator(DBImpl* db,Iterator* db_iter); + +} // namespace leveldb + +#endif // STORAGE_LEVELDB_DB_PREFETCH_ITER_H_