Browse Source

improve some codes and init unordered_iter

pull/3/head
dgy 9 months ago
parent
commit
cbf87651cc
6 changed files with 172 additions and 20 deletions
  1. +2
    -0
      benchmarks/db_bench.cc
  2. +31
    -19
      db/db_impl.cc
  3. +1
    -0
      db/db_impl.h
  4. +1
    -1
      db/dbformat.h
  5. +115
    -0
      db/unordered_iter.cc
  6. +22
    -0
      db/unordered_iter.h

+ 2
- 0
benchmarks/db_bench.cc View File

@ -49,6 +49,8 @@ static const char* FLAGS_benchmarks =
"fillsync,"
"fillrandom,"
"overwrite,"
"overwrite,"
"overwrite,"
"readrandom,"
"readrandom," // Extra run to allow previous compactions to quiesce
"readseq,"

+ 31
- 19
db/db_impl.cc View File

@ -1053,9 +1053,6 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
value.remove_prefix(1);
bool res = GetVarint64(&value, &file_id);
if (!res) return Status::Corruption("can't decode file id");
if(valuelog_origin[file_id] == 0){//???
valuelog_origin[file_id] = valuelog_usage[file_id];
}
valuelog_usage[file_id]--;
}
}
@ -1276,6 +1273,25 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key,
return s;
}
Iterator *DBImpl::NewOriginalIterator(const ReadOptions& options) {
SequenceNumber latest_snapshot;
uint32_t seed;
int iter_num=24;
mutex_.Lock();
Iterator* iter = NewInternalIterator(options, &latest_snapshot, &seed);
auto db_iter=NewDBIterator(this, user_comparator(), iter,
(options.snapshot != nullptr
? static_cast<const SnapshotImpl*>(options.snapshot)
->sequence_number()
: latest_snapshot),
seed);
mutex_.Unlock();
return db_iter;
}
Iterator* DBImpl::NewIterator(const ReadOptions& options) {
SequenceNumber latest_snapshot;
uint32_t seed;
@ -1357,7 +1373,9 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
Writer* last_writer = &w;
if (status.ok() && updates != nullptr) { // nullptr batch is for compactions
WriteBatch* write_batch = BuildBatchGroup(&last_writer);
WriteBatchInternal::ConverToValueLog(write_batch, this);//need lock! to protect valuelog_number
WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);
last_sequence += WriteBatchInternal::Count(write_batch);
// Add to log and apply to memtable. We can release the lock
@ -1367,10 +1385,6 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
{
mutex_.Unlock();
WriteBatchInternal::ConverToValueLog(write_batch, this);
WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);
last_sequence += WriteBatchInternal::Count(write_batch);
status = log_->AddRecord(WriteBatchInternal::Contents(write_batch));
bool sync_error = false;
@ -1634,8 +1648,11 @@ std::vector> DBImpl::WriteValueLog(
// 如果超出fixed_size
if(init_offset>=config::value_log_size){
valuelog_usage[valuelogfile_number_]=init_offset;
valuelog_origin[valuelogfile_number_]=init_offset;
uint64_t file_data_size = 0; // 文件数据大小标志位
valueFile.seekg(0, std::ios::beg);
valueFile.read(reinterpret_cast<char*>(&file_data_size), sizeof(uint64_t));
valuelog_usage[valuelogfile_number_]=file_data_size;
valuelog_origin[valuelogfile_number_]=file_data_size;
addNewValueLog();
valueFile.close();
file_name_ = ValueLogFileName(dbname_, valuelogfile_number_);
@ -1805,8 +1822,9 @@ void DBImpl::GarbageCollect() {
auto tmp_name = ValueLogFileName(dbname_, cur_log_number);
if (!versions_->checkOldValueLog(tmp_name) &&
valuelog_origin[cur_log_number]) {
//std::cout<<((float)valuelog_usage[cur_log_number]) /(float)valuelog_origin[cur_log_number]<<std::endl;
if (((float)valuelog_usage[cur_log_number]) /
valuelog_origin[cur_log_number] <= 0.6) {
(float)valuelog_origin[cur_log_number] <= 0.6) {
valuelog_set.emplace(filename);
}
}
@ -2075,12 +2093,8 @@ void DBImpl::InitializeExistingLogs() {
valuelog_usage[cur_log_number]=0;
}
}
SequenceNumber latest_snapshot;
uint32_t seed;
int iter_num=24;
Iterator* iter= NewInternalIterator(ReadOptions(), &latest_snapshot, &seed);
auto db_iter=NewDBIterator(this, user_comparator(), iter,latest_snapshot,seed);
mutex_.Unlock();
auto db_iter=NewOriginalIterator(ReadOptions());
for(db_iter->SeekToFirst();db_iter->Valid();db_iter->Next()){
auto value=db_iter->value();
if(value.size()&&value[0]==0x01){
@ -2090,8 +2104,6 @@ void DBImpl::InitializeExistingLogs() {
valuelog_usage[valuelog_id]++;
}
}
mutex_.Unlock();
delete db_iter;
mutex_.Lock();
}

+ 1
- 0
db/db_impl.h View File

@ -61,6 +61,7 @@ class DBImpl : public DB {
Status Get(const ReadOptions& options, const Slice& key,
std::string* value) override;
Iterator* NewIterator(const ReadOptions&) override;
Iterator* NewOriginalIterator(const ReadOptions&);
const Snapshot* GetSnapshot() override;
void ReleaseSnapshot(const Snapshot* snapshot) override;
bool GetProperty(const Slice& property, std::string* value) override;

+ 1
- 1
db/dbformat.h View File

@ -47,7 +47,7 @@ static const int kReadBytesPeriod = 1048576;
// maximum size of value_log file
static const int value_log_size=1<<26;
//1<<33/1<<26=1<<7
static const int mem_value_log_number=1<<9;//8GB
static const int mem_value_log_number=1<<8;//8GB
} // namespace config

+ 115
- 0
db/unordered_iter.cc View File

@ -0,0 +1,115 @@
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include <iostream>
#include <fstream>
#include <thread>
#include <queue>
#include "db/unordered_iter.h"
#include "db/db_impl.h"
#include "db/dbformat.h"
#include "db/filename.h"
#include "leveldb/env.h"
#include "leveldb/iterator.h"
#include "port/port.h"
#include "util/logging.h"
#include "util/mutexlock.h"
#include "util/random.h"
#include "port/port.h"
namespace leveldb {
namespace {
// Memtables and sstables that make the DB representation contain
// (userkey,seq,type) => uservalue entries. UnorderedIter
// combines multiple entries for the same userkey found in the DB
// representation into a single entry while accounting for sequence
// numbers, deletion markers, overwrites, etc.
class UnorderedIter : public Iterator {
public:
// Which direction is the iterator currently moving?
// (1) When moving forward, the internal iterator is positioned at
// the exact entry that yields this->key(), this->value()
// (2) When moving backwards, the internal iterator is positioned
// just before all entries whose user key == this->key().
enum IterPos {Left,Mid,Right};
UnorderedIter(DBImpl* db, Iterator* iter, Iterator* prefetch_iter,int prefetch_num)
:
db_(db),iter_(iter),prefetch_iter_(prefetch_iter),prefetch_num_(prefetch_num) {}
UnorderedIter(const UnorderedIter&) = delete;
UnorderedIter& operator=(const UnorderedIter&) = delete;
~UnorderedIter() override {
delete iter_;
}
bool Valid() const override { return iter_->Valid(); }
Slice key() const override {
return iter_->key();
}
Slice value() const override {
buf_for_value=std::move(GetAndParseTrueValue(iter_->value()));
return Slice(buf_for_value.data(),buf_for_value.size());
}
Status status() const override {
return iter_->status();
}
void Next() override;
void Prev() override;
void Seek(const Slice& target) override;
void SeekToFirst() override;
void SeekToLast() override;
private:
std::string GetAndParseTrueValue(Slice tmp_value)const{
if(tmp_value.size()==0){
return "";
}
if(tmp_value.data()[0]==(char)(0x00)){
tmp_value.remove_prefix(1);
return std::string(tmp_value.data(),tmp_value.size());
}
tmp_value.remove_prefix(1);
uint64_t file_id,valuelog_offset;
bool res=GetVarint64(&tmp_value,&file_id);
if(!res)assert(0);
res=GetVarint64(&tmp_value,&valuelog_offset);
if(!res)assert(0);
std::string str;
Status s=db_->ReadValueLog(file_id,valuelog_offset, &str);
return std::move(str);
}
DBImpl* db_;
Iterator* const iter_;
};
void UnorderedIter::Next() {
iter_->Next();
}
void UnorderedIter::Prev() {
iter_->Prev();
}
void UnorderedIter::Seek(const Slice& target) {
iter_->Seek(target);
}
void UnorderedIter::SeekToFirst() {
iter_->SeekToFirst();
}
void UnorderedIter::SeekToLast() {
iter_->SeekToLast();
}
} // anonymous namespace
Iterator* NewUnorderedIterator(DBImpl* db,Iterator* db_iter) {
return new UnorderedIter(db,db_iter);
}
} // namespace leveldb

+ 22
- 0
db/unordered_iter.h View File

@ -0,0 +1,22 @@
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#ifndef STORAGE_LEVELDB_DB_PREFETCH_ITER_H_
#define STORAGE_LEVELDB_DB_PREFETCH_ITER_H_
#include <cstdint>
#include "db/dbformat.h"
#include "leveldb/db.h"
namespace leveldb {
class DBImpl;
// add a prefetch function for db_iter
Iterator* NewUnorderedIterator(DBImpl* db,Iterator* db_iter);
} // namespace leveldb
#endif // STORAGE_LEVELDB_DB_PREFETCH_ITER_H_

Loading…
Cancel
Save