Сравнить коммиты

...

3 коммитов

Автор SHA1 Сообщение Дата
  cyq 3b6087ad23 fix bug 8 месяцев назад
  cyq c58cbc12aa 较为完整的kv分离实现 8 месяцев назад
  cyq ab67267dfd 除了kvlog回收外的所有 8 месяцев назад
24 измененных файлов: 808 добавлений и 22 удалений
  1. +7
    -0
      CMakeLists.txt
  2. +286
    -4
      db/db_impl.cc
  3. +26
    -0
      db/db_impl.h
  4. +3
    -0
      db/db_iter.cc
  5. +16
    -3
      db/dbformat.cc
  6. +10
    -7
      db/dbformat.h
  7. +9
    -0
      db/filename.cc
  8. +3
    -0
      db/filename.h
  9. +6
    -2
      db/log_writer.cc
  10. +1
    -0
      db/log_writer.h
  11. +13
    -4
      db/memtable.cc
  12. +30
    -1
      db/version_edit.cc
  13. +16
    -0
      db/version_edit.h
  14. +55
    -0
      db/version_set.cc
  15. +7
    -0
      db/version_set.h
  16. +44
    -0
      db/write_batch.cc
  17. +6
    -0
      db/write_batch_internal.h
  18. +1
    -1
      fielddb/field_db.h
  19. +2
    -0
      include/leveldb/options.h
  20. +109
    -0
      kv_sep/kvlog.cc
  21. +72
    -0
      kv_sep/kvlog.h
  22. +0
    -0
      kv_sep/kvlog_cache.cc
  23. +0
    -0
      kv_sep/kvlog_cache.h
  24. +86
    -0
      test/test.cc

+ 7
- 0
CMakeLists.txt Просмотреть файл

@ -200,6 +200,8 @@ target_sources(leveldb
"fielddb/request.h"
"testdb/testdb.cc"
"testdb/testdb.h"
"kv_sep/kvlog.cc"
"kv_sep/kvlog.h"
# Only CMake 3.3+ supports PUBLIC sources in targets exported by "install".
$<$<VERSION_GREATER:CMAKE_VERSION,3.2>:PUBLIC>
@ -544,3 +546,8 @@ add_executable(recover_test
"${PROJECT_SOURCE_DIR}/test/recover_test.cc"
)
target_link_libraries(recover_test PRIVATE leveldb gtest)
add_executable(test1
"${PROJECT_SOURCE_DIR}/test/test.cc"
)
target_link_libraries(test1 PRIVATE leveldb gtest)

+ 286
- 4
db/db_impl.cc Просмотреть файл

@ -4,10 +4,13 @@
#include "db/db_impl.h"
#include "db/version_edit.h"
#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <set>
#include <string>
#include <vector>
@ -24,10 +27,17 @@
#include "db/write_batch_internal.h"
#include "leveldb/db.h"
#include "leveldb/env.h"
#include "leveldb/filter_policy.h"
#include "leveldb/iterator.h"
#include "leveldb/options.h"
#include "leveldb/slice.h"
#include "leveldb/status.h"
#include "leveldb/table.h"
#include "leveldb/table_builder.h"
#include "leveldb/write_batch.h"
#include "port/port.h"
#include "port/port_stdcxx.h"
#include "port/thread_annotations.h"
#include "table/block.h"
#include "table/merger.h"
#include "table/two_level_iterator.h"
@ -35,6 +45,7 @@
#include "util/logging.h"
#include "util/mutexlock.h"
#include "util/serialize_value.h"
#include "kv_sep/kvlog.h"
namespace leveldb {
@ -145,6 +156,7 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
log_(nullptr),
seed_(0),
tmp_batch_(new WriteBatch),
tmp_kp_batch_(new WriteBatch),
background_compaction_scheduled_(false),
manual_compaction_(nullptr),
versions_(new VersionSet(dbname_, &options_, table_cache_,
@ -235,6 +247,8 @@ void DBImpl::RemoveObsoleteFiles() {
// Make a set of all of the live files
std::set<uint64_t> live = pending_outputs_;
versions_->AddLiveFiles(&live);
//将所有的live的kvlog加入集合,防止被删除
versions_->AddLiveKVLogs(&live);
std::vector<std::string> filenames;
env_->GetChildren(dbname_, &filenames); // Ignoring errors on purpose
@ -262,6 +276,9 @@ void DBImpl::RemoveObsoleteFiles() {
// be recorded in pending_outputs_, which is inserted into "live"
keep = (live.find(number) != live.end());
break;
case kKVLogFile:
keep = (live.find(number) != live.end());
break;
case kCurrentFile:
case kDBLockFile:
case kInfoLogFile:
@ -533,13 +550,18 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
if (s.ok() && meta.file_size > 0) {
const Slice min_user_key = meta.smallest.user_key();
const Slice max_user_key = meta.largest.user_key();
if (base != nullptr) {
if (base != nullptr && !only_Level0) {
level = base->PickLevelForMemTableOutput(min_user_key, max_user_key);
}
edit->AddFile(level, meta.number, meta.file_size, meta.smallest,
meta.largest);
}
edit->AddKVLogs(imm_kvlogfile_number);
pending_outputs_.erase(imm_kvlogfile_number);
imm_kvlogfile_number = 0;
delete imm_kvlogfile_;
CompactionStats stats;
stats.micros = env_->NowMicros() - start_micros;
stats.bytes_written = meta.file_size;
@ -705,6 +727,155 @@ void DBImpl::BackgroundCall() {
background_work_finished_signal_.SignalAll();
}
bool DBImpl::CollectKVLogs() {
mutex_.AssertHeld();
Version *current = versions_->current();
current->Ref();
int total_files = 0;
for(int i = 0; i < config::kNumLevels; i++) {
total_files += current->NumFiles(i);
}
//这个判断保证了所有被选择回收的kvlog都是不对应于mem、imm、L0文件的
if(current->kvlogs_.size() < total_files * 20) {
// std::cout << "kvlogs not enough : " << current->kvlogs_.size() << " " << total_files << std::endl;
current->Unref();
return false;
}
Log(options_.info_log,"CollectKVLogs Start\n");
std::vector<FileMetaData*> &kvlogs = current->kvlogs_;
FileMetaData metaSST, metaKVLog;
metaSST.number = versions_->NewFileNumber();
metaKVLog.number = versions_->NewFileNumber();
pending_outputs_.insert(metaSST.number);
pending_outputs_.insert(metaKVLog.number);
// WritableFile *fileSST;
// env_->NewWritableFile(TableFileName(dbname_, metaSST.number),&fileSST);
// TableBuilder *builder = new TableBuilder(options_,fileSST);
WritableFile *fileKVLog;
env_->NewWritableFile(KVLogFileName(dbname_, metaKVLog.number), &fileKVLog);
KVLog kvlog_writer(fileKVLog,metaKVLog.number);
FilePointer fp;
uint32_t seed;
SequenceNumber latest_sequence;
ReadOptions ro;
ro.decode = false;
Iterator *deep_iter = NewInternalDeepIterator(ro,&latest_sequence,&seed);
// Iterator *iter = NewDBIterator(this, user_comparator(), deep_iter, latest_sequence, seed);
uint64_t budget = 100;
only_Level0 = true; //保证后续的小合并的结果只会在level0
mutex_.Unlock();
std::string saved_key_;
// ParsedInternalKey key_in_kvlog;
ParsedInternalKey key_in_sst;
WriteBatch write_batch,kp_batch;
MemTable *tempMem = new MemTable(internal_comparator_);
tempMem->Ref();
std::set<uint64_t> kvlogs_to_remove;
bool has_collected = false;
for(int i = 0; i < std::min(kvlogs.size(),20ul) && budget > 0; i++) {
FileMetaData *f = kvlogs[i];
SequentialFile *file;
env_->NewSequentialFile(KVLogFileName(dbname_, f->number), &file);
KVLogReader reader(file);
for(int cnt = 0; reader.Valid(); reader.Next(), cnt ++) {
// std::cout << "find KV: " << reader.Key().ToString() << " " << reader.Value().ToString() << " " << reader.Seq() << "\n";
if(reader.Type() == kTypeDeletion) {
// std::cout << "is delete record\n";
continue;
}
saved_key_.clear();
AppendInternalKey(&saved_key_,
ParsedInternalKey(reader.Key(),latest_sequence,kValueTypeForSeek,metaKVLog.number));
deep_iter->Seek(saved_key_);
// ParseInternalKey(reader.Key(), &key_in_kvlog);
ParseInternalKey(deep_iter->key(), &key_in_sst);
//当前的key已经由于合并没有了,kvlog中的记录失效
if(internal_comparator_.user_comparator()->Compare(reader.Key(), key_in_sst.user_key) != 0) {
// std::cout << "has been compacted\n";
continue;
}
//如果搜索到的key是delete,或者key的sequence比kvlog里面的大,那么kvlog中的就是失效状态
//这里不可能搜到比kvlog里面还要小的情况
if(key_in_sst.type == kTypeDeletion) {
// std::cout << "key delete \n";
continue;
}
if(key_in_sst.sequence > reader.Seq()) {
// printf("sst seq larger: sst:%ld kvlog:%ld\n",key_in_sst.sequence,reader.Seq());
continue;
}
if(i > 0) {
budget = 0;
break;
}
has_collected = true;
// std::cout << "collected KV: " << reader.Key().ToString() << " " << reader.Value().ToString() << " " << reader.Seq() << "\n";
// if(budget % 100 == 0) {
// std::cout << "collected KV: " << reader.Value().ToString() << " " << reader.Seq() << " " << cnt << "\n";
// std::cout << "rest budget : " << budget << "\n";
// }
budget--;
write_batch.Clear();
kp_batch.Clear();
write_batch.Put(reader.Key(), reader.Value());
WriteBatchInternal::SetSequence(&write_batch, reader.Seq());
kvlog_writer.AddRecord(WriteBatchInternal::Contents(&write_batch), fp);
WriteBatchInternal::ConstructKPBatch(&kp_batch, &write_batch, fp);
WriteBatchInternal::InsertInto(&kp_batch, tempMem);
}
delete file;
if(reader.Valid()) break;
kvlogs_to_remove.insert(f->number);
// break;//当前一次只回收一个kvlog
}
VersionEdit edit;
Iterator *miter = nullptr;
if(has_collected) {
miter = tempMem->NewIterator();
Status s = BuildTable(dbname_, env_, options_,
table_cache_, miter, &metaSST);
}
mutex_.Lock();
if(has_collected) {
edit.AddFile(0, metaSST.number, metaSST.file_size,
metaSST.smallest, metaSST.largest);
edit.AddKVLogs(metaKVLog.number);
}
for(auto num : kvlogs_to_remove) {
edit.RemoveKVLogs(num);
}
Log(options_.info_log,"Add SST at Level0 : %ld\n",metaSST.number);
for(auto num : kvlogs_to_remove) {
Log(options_.info_log,"Collect kvlog : %ld\n",num);
printf("Collect kvlog : %ld\n",num);
}
Log(options_.info_log,"Merge to kvlog : %ld\n",metaKVLog.number);
versions_->LogAndApply(&edit, &mutex_);
delete miter;
tempMem->Unref();
delete deep_iter;
delete fileKVLog;
pending_outputs_.erase(metaSST.number);
pending_outputs_.erase(metaKVLog.number);
only_Level0 = false;
current->Unref();
return true;
}
void DBImpl::BackgroundCompaction() {
mutex_.AssertHeld();
@ -713,6 +884,10 @@ void DBImpl::BackgroundCompaction() {
return;
}
if(CollectKVLogs()) {
return;
}
Compaction* c;
bool is_manual = (manual_compaction_ != nullptr);
InternalKey manual_end;
@ -1078,6 +1253,20 @@ static void CleanupIteratorState(void* arg1, void* arg2) {
delete state;
}
struct DeepIterState {
port::Mutex* const mu;
Version* const version GUARDED_BY(mu);
DeepIterState(port::Mutex* mutex, Version* version)
: mu(mutex),version(version) {}
};
static void CleanupDeepIteratorState(void* arg1, void* args) {
DeepIterState* state = reinterpret_cast<DeepIterState*>(arg1);
state->mu->AssertHeld();
state->version->Unref();
delete state;
}
} // anonymous namespace
Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
@ -1107,6 +1296,33 @@ Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
return internal_iter;
}
//在初始化和析构的时候都要保证mutex_.AssertHeld()
Iterator* DBImpl::NewInternalDeepIterator(const ReadOptions& options,
SequenceNumber* latest_snapshot,
uint32_t* seed) {
mutex_.AssertHeld();
// mutex_.Lock();
*latest_snapshot = versions_->LastSequence();
// Collect together all needed child iterators
std::vector<Iterator*> list;
versions_->current()->AddDeepIterators(options, &list);
Iterator* internal_iter =
NewMergingIterator(&internal_comparator_, &list[0], list.size());
versions_->current()->Ref();
// IterState* cleanup = new IterState(&mutex_, mem_, imm_, versions_->current());
// internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, nullptr);
DeepIterState *cleanup = new DeepIterState(&mutex_,versions_->current());
internal_iter->RegisterCleanup(CleanupDeepIteratorState, cleanup, nullptr);
*seed = ++seed_;
// mutex_.Unlock();
return internal_iter;
}
Iterator* DBImpl::TEST_NewInternalIterator() {
SequenceNumber ignored;
uint32_t ignored_seed;
@ -1118,6 +1334,18 @@ int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() {
return versions_->MaxNextLevelOverlappingBytes();
}
Slice DBImpl::GetValueFromFP(const FilePointer &fp,std::string *value) {
RandomAccessFile *file;
Status s = env_->NewRandomAccessFile(KVLogFileName(dbname_, fp.FileNumber), &file);
Slice slice;
char *buf = new char[fp.Size];
s = file->Read(fp.FileOffset, fp.Size, &slice, buf);
*value = slice.ToString();
delete[] buf;
delete file;
return slice;
}
Status DBImpl::Get(const ReadOptions& options, const Slice& key,
std::string* value) {
Status s;
@ -1159,6 +1387,15 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key,
if (have_stat_update && current->UpdateStats(stats)) {
MaybeScheduleCompaction();
}
if(!s.ok()) {
// printf( "not found : %s\n",key.ToString().c_str());
} else {
// printf("found key:%s value:%s\n",key.ToString().c_str(),value->c_str());
FilePointer fp;
DecodeFp(fp, value->data());
GetValueFromFP(fp, value);
}
mem->Unref();
if (imm != nullptr) imm->Unref();
current->Unref();
@ -1252,12 +1489,13 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
// May temporarily unlock and wait.
Status status = MakeRoomForWrite(updates == nullptr);
uint64_t last_sequence = versions_->LastSequence();
uint64_t last_sequence = versions_->LastSequence(), temp_seq = last_sequence;
Writer* last_writer = &w;
if (status.ok() && updates != nullptr) { // nullptr batch is for compactions
WriteBatch* write_batch = BuildBatchGroup(&last_writer);
WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);
last_sequence += WriteBatchInternal::Count(write_batch);
// versions_->SetLastSequence(last_sequence);
// Add to log and apply to memtable. We can release the lock
// during this phase since &w is currently responsible for logging
@ -1266,7 +1504,14 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
{
mutex_.Unlock();
// uint64_t start_write = env_->NowMicros();
status = log_->AddRecord(WriteBatchInternal::Contents(write_batch));
FilePointer fp;
//1. 将WriteBatch写入到kvlog中
status = kvlog_->AddRecord(WriteBatchInternal::Contents(write_batch), fp);
//2. 将writebatch的filepointer写入到log中
// status = log_->AddRecord(WriteBatchInternal::Contents(write_batch));
char rep[8 * 3];
EncodeFP(fp, rep);
status = log_->AddRecord(Slice(rep, 3 * 8));
bool sync_error = false;
if (status.ok() && options.sync) {
status = logfile_->Sync();
@ -1274,8 +1519,11 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
sync_error = true;
}
}
//3. 根据write_batch里面的内容,构建kp_batch
WriteBatchInternal::ConstructKPBatch(tmp_kp_batch_, write_batch, fp);
WriteBatchInternal::SetSequence(tmp_kp_batch_,temp_seq + 1);
if (status.ok()) {
status = WriteBatchInternal::InsertInto(write_batch, mem_);
status = WriteBatchInternal::InsertInto(tmp_kp_batch_, mem_);
}
// BatchSize += write_batch->ApproximateSize();
// write_elapsed += env_->NowMicros() - start_write;
@ -1311,6 +1559,15 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
// NoWaiting_elapsed += env_->NowMicros() - start_;
// Nowaited_count ++;
// dumpStatistics();
// SequentialFile *file;
// env_->NewSequentialFile(KVLogFileName(dbname_, kvlogfile_number_), &file);
// KVLogReader reader(file);
// std::cout << "==============tail check begin==============\n";
// for( ;reader.Valid(); reader.Next()) {
// printf("key:%s, value:%s, seq:%ld\n",reader.Key().ToString().c_str(),reader.Value().ToString().c_str(),reader.Seq());
// }
// std::cout << "==============tail check end==============\n";
// delete file;
return status;
}
@ -1412,6 +1669,22 @@ Status DBImpl::MakeRoomForWrite(bool force) {
versions_->ReuseFileNumber(new_log_number);
break;
}
/*更换新的kvlog*/
uint64_t new_kvlog_number = versions_->NewFileNumber();
WritableFile* kvlogfile = nullptr;
s = env_->NewWritableFile(KVLogFileName(dbname_,new_kvlog_number),&kvlogfile);
if(!s.ok()) {
versions_->ReuseFileNumber(new_kvlog_number);
break;
}
pending_outputs_.insert(new_kvlog_number);
kvlogfile_->Close();
imm_kvlogfile_ = kvlogfile_;
kvlogfile_ = kvlogfile;
imm_kvlogfile_number = kvlogfile_number_;
kvlogfile_number_ = new_kvlog_number;
delete kvlog_;
kvlog_ = new KVLog(kvlogfile,new_kvlog_number);
delete log_;
@ -1566,6 +1839,14 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
impl->mem_ = new MemTable(impl->internal_comparator_);
impl->mem_->Ref();
}
uint64_t new_kvlog_number = impl->versions_->NewFileNumber();
WritableFile* kvlogfile;
s = options.env->NewWritableFile(KVLogFileName(dbname, new_kvlog_number), &kvlogfile);
impl->pending_outputs_.insert(new_kvlog_number);
impl->kvlogfile_number_ = new_kvlog_number;
impl->kvlogfile_ = kvlogfile;
impl->kvlog_ = new KVLog(kvlogfile,new_kvlog_number);
}
if (s.ok() && save_manifest) {
edit.SetPrevLogNumber(0); // No older logs needed after recovery.
@ -1607,6 +1888,7 @@ Status DestroyDB(const std::string& dbname, const Options& options) {
if (ParseFileName(filenames[i], &number, &type) &&
type != kDBLockFile) { // Lock file will be deleted at end
Status del = env->RemoveFile(dbname + "/" + filenames[i]);
// std::cout << "remove file : " << filenames[i] << std::endl;
if (result.ok() && !del.ok()) {
result = del;
}

+ 26
- 0
db/db_impl.h Просмотреть файл

@ -18,10 +18,15 @@
#include "db/snapshot.h"
#include "leveldb/db.h"
#include "leveldb/env.h"
#include "leveldb/options.h"
#include "leveldb/slice.h"
#include "leveldb/write_batch.h"
#include "port/port.h"
#include "port/thread_annotations.h"
#include "util/serialize_value.h"
#include "kv_sep/kvlog.h"
namespace leveldb {
class MemTable;
@ -81,6 +86,8 @@ class DBImpl : public DB {
// bytes.
void RecordReadSample(Slice key);
private:
friend class DB;
struct CompactionState;
@ -115,6 +122,10 @@ class DBImpl : public DB {
SequenceNumber* latest_snapshot,
uint32_t* seed);
Iterator* NewInternalDeepIterator(const ReadOptions&,
SequenceNumber* latest_snapshot,
uint32_t* seed);
Status NewDB();
// Recover the descriptor from persistent storage. May do a significant
@ -160,6 +171,12 @@ class DBImpl : public DB {
Status FinishCompactionOutputFile(CompactionState* compact, Iterator* input);
Status InstallCompactionResults(CompactionState* compact)
EXCLUSIVE_LOCKS_REQUIRED(mutex_);
/*about kvlogs*/
public:
Slice GetValueFromFP(const FilePointer &fp,std::string *value);
private:
int triggers = 0;
bool CollectKVLogs();
const Comparator* user_comparator() const {
return internal_comparator_.user_comparator();
@ -190,11 +207,20 @@ class DBImpl : public DB {
WritableFile* logfile_;
uint64_t logfile_number_ GUARDED_BY(mutex_);
log::Writer* log_;
/*kvlog begin*/
WritableFile* kvlogfile_;
uint64_t kvlogfile_number_ GUARDED_BY(mutex_);
KVLog *kvlog_;
WritableFile* imm_kvlogfile_;
uint64_t imm_kvlogfile_number;
bool only_Level0 GUARDED_BY(mutex_) = false;
/*kvlog end*/
uint32_t seed_ GUARDED_BY(mutex_); // For sampling.
// Queue of writers.
std::deque<Writer*> writers_ GUARDED_BY(mutex_);
WriteBatch* tmp_batch_ GUARDED_BY(mutex_);
WriteBatch* tmp_kp_batch_ GUARDED_BY(mutex_);//memtable和SSTable的key,pointer
SnapshotList snapshots_ GUARDED_BY(mutex_);

+ 3
- 0
db/db_iter.cc Просмотреть файл

@ -13,6 +13,7 @@
#include "util/logging.h"
#include "util/mutexlock.h"
#include "util/random.h"
#include "kv_sep/kvlog.h"
namespace leveldb {
@ -113,6 +114,8 @@ class DBIter : public Iterator {
Status status_;
std::string saved_key_; // == current key when direction_==kReverse
std::string saved_value_; // == current raw value when direction_==kReverse
std::string buf;
FilePointer fp;
Direction direction_;
bool valid_;
Random rnd_;

+ 16
- 3
db/dbformat.cc Просмотреть файл

@ -4,6 +4,7 @@
#include "db/dbformat.h"
#include <cstdint>
#include <cstdio>
#include <sstream>
@ -20,13 +21,14 @@ static uint64_t PackSequenceAndType(uint64_t seq, ValueType t) {
void AppendInternalKey(std::string* result, const ParsedInternalKey& key) {
result->append(key.user_key.data(), key.user_key.size());
PutFixed64(result, key.FileNumber);
PutFixed64(result, PackSequenceAndType(key.sequence, key.type));
}
std::string ParsedInternalKey::DebugString() const {
std::ostringstream ss;
ss << '\'' << EscapeString(user_key.ToString()) << "' @ " << sequence << " : "
<< static_cast<int>(type);
<< static_cast<int>(type) << " in file:" << FileNumber;
return ss.str();
}
@ -59,6 +61,15 @@ int InternalKeyComparator::Compare(const Slice& akey, const Slice& bkey) const {
r = +1;
}
}
if (r == 0) {
const uint64_t anum = DecodeFixed64(akey.data() + akey.size() - 8 - 8);
const uint64_t bnum = DecodeFixed64(bkey.data() + bkey.size() - 8 - 8);
if (anum > bnum) {
r = -1;
} else if (anum < bnum) {
r = +1;
}
}
return r;
}
@ -116,7 +127,7 @@ bool InternalFilterPolicy::KeyMayMatch(const Slice& key, const Slice& f) const {
LookupKey::LookupKey(const Slice& user_key, SequenceNumber s) {
size_t usize = user_key.size();
size_t needed = usize + 13; // A conservative estimate
size_t needed = usize + 13 + 8; // A conservative estimate
char* dst;
if (needed <= sizeof(space_)) {
dst = space_;
@ -124,10 +135,12 @@ LookupKey::LookupKey(const Slice& user_key, SequenceNumber s) {
dst = new char[needed];
}
start_ = dst;
dst = EncodeVarint32(dst, usize + 8);
dst = EncodeVarint32(dst, usize + 8 + 8);
kstart_ = dst;
std::memcpy(dst, user_key.data(), usize);
dst += usize;
EncodeFixed64(dst, UINT64_MAX);
dst += 8;
EncodeFixed64(dst, PackSequenceAndType(s, kValueTypeForSeek));
dst += 8;
end_ = dst;

+ 10
- 7
db/dbformat.h Просмотреть файл

@ -69,17 +69,18 @@ static const SequenceNumber kMaxSequenceNumber = ((0x1ull << 56) - 1);
struct ParsedInternalKey {
Slice user_key;
SequenceNumber sequence;
uint64_t FileNumber;
ValueType type;
ParsedInternalKey() {} // Intentionally left uninitialized (for speed)
ParsedInternalKey(const Slice& u, const SequenceNumber& seq, ValueType t)
: user_key(u), sequence(seq), type(t) {}
ParsedInternalKey(const Slice& u, const SequenceNumber& seq, ValueType t, uint64_t file = UINT64_MAX)
: user_key(u), sequence(seq), type(t), FileNumber(file) {}
std::string DebugString() const;
};
// Return the length of the encoding of "key".
inline size_t InternalKeyEncodingLength(const ParsedInternalKey& key) {
return key.user_key.size() + 8;
return key.user_key.size() + 8 + 8;
}
// Append the serialization of "key" to *result.
@ -94,7 +95,7 @@ bool ParseInternalKey(const Slice& internal_key, ParsedInternalKey* result);
// Returns the user key portion of an internal key.
inline Slice ExtractUserKey(const Slice& internal_key) {
assert(internal_key.size() >= 8);
return Slice(internal_key.data(), internal_key.size() - 8);
return Slice(internal_key.data(), internal_key.size() - 8 - 8);
}
// A comparator for internal keys that uses a specified comparator for
@ -176,7 +177,8 @@ inline bool ParseInternalKey(const Slice& internal_key,
uint8_t c = num & 0xff;
result->sequence = num >> 8;
result->type = static_cast<ValueType>(c);
result->user_key = Slice(internal_key.data(), n - 8);
result->user_key = Slice(internal_key.data(), n - 8 - 8);
result->FileNumber = DecodeFixed64(internal_key.data() + n - 8 - 8);
return (c <= static_cast<uint8_t>(kTypeValue));
}
@ -199,12 +201,13 @@ class LookupKey {
Slice internal_key() const { return Slice(kstart_, end_ - kstart_); }
// Return the user key
Slice user_key() const { return Slice(kstart_, end_ - kstart_ - 8); }
Slice user_key() const { return Slice(kstart_, end_ - kstart_ - 8 - 8); }
private:
// We construct a char array of the form:
// klength varint32 <-- start_
// userkey char[klength] <-- kstart_
// userkey char[klength - 8 - 8] <-- kstart_
// FileNumber uint64
// tag uint64
// <-- end_
// The array is a suitable MemTable key.

+ 9
- 0
db/filename.cc Просмотреть файл

@ -5,10 +5,12 @@
#include "db/filename.h"
#include <cassert>
#include <cstdint>
#include <cstdio>
#include "db/dbformat.h"
#include "leveldb/env.h"
#include "leveldb/slice.h"
#include "util/logging.h"
namespace leveldb {
@ -30,6 +32,11 @@ std::string LogFileName(const std::string& dbname, uint64_t number) {
return MakeFileName(dbname, number, "log");
}
std::string KVLogFileName(const std::string& dbname, uint64_t number) {
assert(number > 0);
return MakeFileName(dbname, number, "kvlog");
}
std::string TableFileName(const std::string& dbname, uint64_t number) {
assert(number > 0);
return MakeFileName(dbname, number, "ldb");
@ -112,6 +119,8 @@ bool ParseFileName(const std::string& filename, uint64_t* number,
*type = kTableFile;
} else if (suffix == Slice(".dbtmp")) {
*type = kTempFile;
} else if (suffix == Slice(".kvlog")) {
*type = kKVLogFile;
} else {
return false;
}

+ 3
- 0
db/filename.h Просмотреть файл

@ -25,6 +25,7 @@ enum FileType {
kDescriptorFile,
kCurrentFile,
kTempFile,
kKVLogFile,
kInfoLogFile // Either the current one, or an old one
};
@ -33,6 +34,8 @@ enum FileType {
// "dbname".
std::string LogFileName(const std::string& dbname, uint64_t number);
std::string KVLogFileName(const std::string& dbname, uint64_t number);
// Return the name of the sstable with the specified number
// in the db named by "dbname". The result will be prefixed with
// "dbname".

+ 6
- 2
db/log_writer.cc Просмотреть файл

@ -4,6 +4,7 @@
#include "db/log_writer.h"
#include "db/log_format.h"
#include <cstdint>
#include "leveldb/env.h"
@ -20,12 +21,12 @@ static void InitTypeCrc(uint32_t* type_crc) {
}
}
Writer::Writer(WritableFile* dest) : dest_(dest), block_offset_(0) {
Writer::Writer(WritableFile* dest) : dest_(dest), block_offset_(0), pos_(0) {
InitTypeCrc(type_crc_);
}
Writer::Writer(WritableFile* dest, uint64_t dest_length)
: dest_(dest), block_offset_(dest_length % kBlockSize) {
: dest_(dest), block_offset_(dest_length % kBlockSize), pos_(dest_length) {
InitTypeCrc(type_crc_);
}
@ -49,6 +50,7 @@ Status Writer::AddRecord(const Slice& slice) {
// Fill the trailer (literal below relies on kHeaderSize being 7)
static_assert(kHeaderSize == 7, "");
dest_->Append(Slice("\x00\x00\x00\x00\x00\x00", leftover));
pos_ += leftover;
}
block_offset_ = 0;
}
@ -97,8 +99,10 @@ Status Writer::EmitPhysicalRecord(RecordType t, const char* ptr,
// Write the header and the payload
Status s = dest_->Append(Slice(buf, kHeaderSize));
pos_ += kHeaderSize;
if (s.ok()) {
s = dest_->Append(Slice(ptr, length));
pos_ += length;
if (s.ok()) {
s = dest_->Flush();
}

+ 1
- 0
db/log_writer.h Просмотреть файл

@ -41,6 +41,7 @@ class Writer {
WritableFile* dest_;
int block_offset_; // Current offset in block
uint64_t pos_;
// crc32c values for all supported record types. These are
// pre-computed to reduce the overhead of computing the crc of the

+ 13
- 4
db/memtable.cc Просмотреть файл

@ -4,6 +4,7 @@
#include "db/memtable.h"
#include "db/dbformat.h"
#include <climits>
#include "leveldb/comparator.h"
#include "leveldb/env.h"
#include "leveldb/iterator.h"
@ -77,13 +78,14 @@ void MemTable::Add(SequenceNumber s, ValueType type, const Slice& key,
const Slice& value) {
// Format of an entry is concatenation of:
// key_size : varint32 of internal_key.size()
// key bytes : char[internal_key.size()]
// key bytes : char[internal_key.size() - 8 - 8]
// FileNumber uint64
// tag : uint64((sequence << 8) | type)
// value_size : varint32 of value.size()
// value bytes : char[value.size()]
size_t key_size = key.size();
size_t val_size = value.size();
size_t internal_key_size = key_size + 8;
size_t internal_key_size = key_size + 8 + 8;
const size_t encoded_len = VarintLength(internal_key_size) +
internal_key_size + VarintLength(val_size) +
val_size;
@ -91,6 +93,12 @@ void MemTable::Add(SequenceNumber s, ValueType type, const Slice& key,
char* p = EncodeVarint32(buf, internal_key_size);
std::memcpy(p, key.data(), key_size);
p += key_size;
if(value.size()) {
std::memcpy(p, value.data(), 8);
} else {
EncodeFixed64(p, UINT64_MAX);
}
p += 8;
EncodeFixed64(p, (s << 8) | type);
p += 8;
p = EncodeVarint32(p, val_size);
@ -106,7 +114,8 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s) {
if (iter.Valid()) {
// entry format is:
// klength varint32
// userkey char[klength]
// userkey char[klength - 8]
// FileNumber uint64
// tag uint64
// vlength varint32
// value char[vlength]
@ -117,7 +126,7 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s) {
uint32_t key_length;
const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
if (comparator_.comparator.user_comparator()->Compare(
Slice(key_ptr, key_length - 8), key.user_key()) == 0) {
Slice(key_ptr, key_length - 8 - 8), key.user_key()) == 0) {
// Correct user key
const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
switch (static_cast<ValueType>(tag & 0xff)) {

+ 30
- 1
db/version_edit.cc Просмотреть файл

@ -20,7 +20,9 @@ enum Tag {
kDeletedFile = 6,
kNewFile = 7,
// 8 was used for large value refs
kPrevLogNumber = 9
kPrevLogNumber = 9,
kNewKVLog = 10,
kDeletedKVLog = 11,
};
void VersionEdit::Clear() {
@ -82,6 +84,17 @@ void VersionEdit::EncodeTo(std::string* dst) const {
PutLengthPrefixedSlice(dst, f.smallest.Encode());
PutLengthPrefixedSlice(dst, f.largest.Encode());
}
for(int i = 0; i < new_kvlogs_.size(); i++) {
const FileMetaData& f = new_kvlogs_[i];
PutVarint32(dst, kNewKVLog);
PutVarint64(dst, f.number);
}
for(const auto& deleted_kvlog_number : deleted_kvlogs_) {
PutVarint32(dst,kDeletedKVLog);
PutVarint64(dst, deleted_kvlog_number);
}
}
static bool GetInternalKey(Slice* input, InternalKey* dst) {
@ -186,6 +199,22 @@ Status VersionEdit::DecodeFrom(const Slice& src) {
}
break;
case kNewKVLog:
if(GetVarint64(&input, &f.number)) {
new_kvlogs_.push_back(f);
} else {
msg = "new-kvlog entry";
}
break;
case kDeletedKVLog:
if(GetVarint64(&input, &number)) {
deleted_kvlogs_.insert(number);
} else {
msg = "deleted kvlog";
}
break;
default:
msg = "unknown tag";
break;

+ 16
- 0
db/version_edit.h Просмотреть файл

@ -5,6 +5,7 @@
#ifndef STORAGE_LEVELDB_DB_VERSION_EDIT_H_
#define STORAGE_LEVELDB_DB_VERSION_EDIT_H_
#include <cstdint>
#include <set>
#include <utility>
#include <vector>
@ -75,6 +76,18 @@ class VersionEdit {
deleted_files_.insert(std::make_pair(level, file));
}
//TODO:
void AddKVLogs(uint64_t file) {
FileMetaData f;
f.number = file;
new_kvlogs_.push_back(f);
}
void RemoveKVLogs(uint64_t file) {
deleted_kvlogs_.insert(file);
}
void EncodeTo(std::string* dst) const;
Status DecodeFrom(const Slice& src);
@ -99,6 +112,9 @@ class VersionEdit {
std::vector<std::pair<int, InternalKey>> compact_pointers_;
DeletedFileSet deleted_files_;
std::vector<std::pair<int, FileMetaData>> new_files_;
std::set<uint64_t> deleted_kvlogs_;
std::vector<FileMetaData> new_kvlogs_;
};
} // namespace leveldb

+ 55
- 0
db/version_set.cc Просмотреть файл

@ -4,7 +4,9 @@
#include "db/version_set.h"
#include "db/version_edit.h"
#include <algorithm>
#include <cstdint>
#include <cstdio>
#include "db/filename.h"
@ -244,6 +246,18 @@ void Version::AddIterators(const ReadOptions& options,
}
}
void Version::AddDeepIterators(const ReadOptions& options,
std::vector<Iterator*>* iters) {
// For levels > 0, we can use a concatenating iterator that sequentially
// walks through the non-overlapping files in the level, opening them
// lazily.
for (int level = 1; level < config::kNumLevels; level++) {
if (!files_[level].empty()) {
iters->push_back(NewConcatenatingIterator(options, level));
}
}
}
// Callback from TableCache::Get()
namespace {
enum SaverState {
@ -583,6 +597,12 @@ class VersionSet::Builder {
}
};
struct BySmallestFileNum {
bool operator()(FileMetaData* f1, FileMetaData* f2) const {
return (f1->number < f2->number);
}
};
typedef std::set<FileMetaData*, BySmallestKey> FileSet;
struct LevelState {
std::set<uint64_t> deleted_files;
@ -593,6 +613,9 @@ class VersionSet::Builder {
Version* base_;
LevelState levels_[config::kNumLevels];
std::set<uint64_t> deleted_kvlogs;
std::set<FileMetaData*,BySmallestFileNum> added_kvlogs;
public:
// Initialize a builder with the files from *base and other info from *vset
Builder(VersionSet* vset, Version* base) : vset_(vset), base_(base) {
@ -666,6 +689,17 @@ class VersionSet::Builder {
levels_[level].deleted_files.erase(f->number);
levels_[level].added_files->insert(f);
}
//Delete KVLogs
for(const auto& deleted_kvlog : edit->deleted_kvlogs_) {
deleted_kvlogs.insert(deleted_kvlog);
}
//Add new KVLogs
for(const auto& added_kvlog : edit->new_kvlogs_) {
FileMetaData *f = new FileMetaData(added_kvlog);
f->refs = 1;
added_kvlogs.insert(f);
}
}
// Save the current state in *v.
@ -712,6 +746,18 @@ class VersionSet::Builder {
}
#endif
}
//合并一个版本的kvlog以及当前edit的新增,但是不包含被删除的kvlog
for(const auto &kvlog : base_->kvlogs_) {
if(deleted_kvlogs.count(kvlog->number)) continue;
v->kvlogs_.push_back(kvlog);
kvlog->refs++;
}
for(const auto &kvlog : added_kvlogs) {
if(deleted_kvlogs.count(kvlog->number)) continue;
v->kvlogs_.push_back(kvlog);
kvlog->refs++;
}
}
void MaybeAddFile(Version* v, int level, FileMetaData* f) {
@ -1158,6 +1204,15 @@ void VersionSet::AddLiveFiles(std::set* live) {
}
}
void VersionSet::AddLiveKVLogs(std::set<uint64_t>* live_kvlogs_) {
for(Version* v = dummy_versions_.next_; v != &dummy_versions_;
v = v->next_) {
for(int i = 0; i < v->kvlogs_.size(); i++) {
live_kvlogs_->insert(v->kvlogs_[i]->number);
}
}
}
int64_t VersionSet::NumLevelBytes(int level) const {
assert(level >= 0);
assert(level < config::kNumLevels);

+ 7
- 0
db/version_set.h Просмотреть файл

@ -15,6 +15,8 @@
#ifndef STORAGE_LEVELDB_DB_VERSION_SET_H_
#define STORAGE_LEVELDB_DB_VERSION_SET_H_
#include "db/db_impl.h"
#include <cstdint>
#include <map>
#include <set>
#include <vector>
@ -68,6 +70,7 @@ class Version {
// yield the contents of this Version when merged together.
// REQUIRES: This version has been saved (see VersionSet::SaveTo)
void AddIterators(const ReadOptions&, std::vector<Iterator*>* iters);
void AddDeepIterators(const ReadOptions&, std::vector<Iterator*>* iters);
// Lookup the value for key. If found, store it in *val and
// return OK. Else return a non-OK status. Fills *stats.
@ -117,6 +120,7 @@ class Version {
private:
friend class Compaction;
friend class VersionSet;
friend class DBImpl;
class LevelFileNumIterator;
@ -152,6 +156,7 @@ class Version {
// List of files per level
std::vector<FileMetaData*> files_[config::kNumLevels];
std::vector<FileMetaData*> kvlogs_;
// Next file to compact based on seek stats.
FileMetaData* file_to_compact_;
@ -258,6 +263,8 @@ class VersionSet {
// May also mutate some internal state.
void AddLiveFiles(std::set<uint64_t>* live);
void AddLiveKVLogs(std::set<uint64_t>* live_kvlogs);
// Return the approximate offset in the database of the data for
// "key" as of version "v".
uint64_t ApproximateOffsetOf(Version* v, const InternalKey& key);

+ 44
- 0
db/write_batch.cc Просмотреть файл

@ -15,12 +15,18 @@
#include "leveldb/write_batch.h"
#include "db/db_impl.h"
#include "db/dbformat.h"
#include "db/memtable.h"
#include "db/write_batch_internal.h"
#include <cstdint>
#include "leveldb/db.h"
#include "leveldb/slice.h"
#include "leveldb/status.h"
#include "util/coding.h"
#include "kv_sep/kvlog.h"
namespace leveldb {
// WriteBatch header has an 8-byte sequence number followed by a 4-byte count.
@ -147,4 +153,42 @@ void WriteBatchInternal::Append(WriteBatch* dst, const WriteBatch* src) {
dst->rep_.append(src->rep_.data() + kHeader, src->rep_.size() - kHeader);
}
class KPBatchConstructor : public WriteBatch::Handler {
public:
void Put(const Slice& key, const Slice& value) override {
int addend_key = EncodeVarint32(buf, key.size()) - buf;
int addend_value = EncodeVarint32(buf, value.size()) - buf;
fp.FileOffset += 1 + addend_key + key.size() + addend_value;
fp.Size = value.size();
EncodeFP(fp, rep);
kp_batch->Put(key, Slice(rep,3 * 8));
fp.FileOffset += fp.Size;
// printf("key:%s, file num:%ld,offset:%ld,size:%ld\n",key.ToString().c_str(),fp.FileNumber,fp.FileOffset,fp.Size);
}
void Delete(const Slice& key) override {
kp_batch->Delete(key);
int addend= EncodeVarint32(buf, key.size()) - buf;
fp.FileOffset += 1 + addend + key.size();
}
WriteBatch *kp_batch;
FilePointer fp;
char rep[8 * 3];
char buf[5];
};
Status WriteBatchInternal::ConstructKPBatch(WriteBatch *kp_batch,
const WriteBatch *write_batch, FilePointer fp) {
kp_batch->Clear();
KPBatchConstructor constructor;
constructor.kp_batch = kp_batch;
constructor.fp.FileNumber = fp.FileNumber;
constructor.fp.FileOffset = fp.FileOffset + kHeader;
constructor.fp.Size = 0;
return write_batch->Iterate(&constructor);
}
} // namespace leveldb

+ 6
- 0
db/write_batch_internal.h Просмотреть файл

@ -5,7 +5,10 @@
#ifndef STORAGE_LEVELDB_DB_WRITE_BATCH_INTERNAL_H_
#define STORAGE_LEVELDB_DB_WRITE_BATCH_INTERNAL_H_
#include "db/db_impl.h"
#include "db/dbformat.h"
#include <cstdint>
#include "leveldb/status.h"
#include "leveldb/write_batch.h"
namespace leveldb {
@ -38,6 +41,9 @@ class WriteBatchInternal {
static Status InsertInto(const WriteBatch* batch, MemTable* memtable);
static void Append(WriteBatch* dst, const WriteBatch* src);
static Status ConstructKPBatch(WriteBatch* kp_batch,
const WriteBatch* write_batch,FilePointer fp);
};
} // namespace leveldb

+ 1
- 1
fielddb/field_db.h Просмотреть файл

@ -145,7 +145,7 @@ private:
// std::fflush(stdout);
// }
// }
// };
};
Status DestroyDB(const std::string& name,
const Options& options);

+ 2
- 0
include/leveldb/options.h Просмотреть файл

@ -162,6 +162,8 @@ struct LEVELDB_EXPORT ReadOptions {
// not have been released). If "snapshot" is null, use an implicit
// snapshot of the state at the beginning of this read operation.
const Snapshot* snapshot = nullptr;
bool decode = true; //FilePointer解析成为实际的value
};
// Options that control write operations

+ 109
- 0
kv_sep/kvlog.cc Просмотреть файл

@ -0,0 +1,109 @@
#include "kv_sep/kvlog.h"
#include "db/dbformat.h"
#include "db/filename.h"
#include "db/write_batch_internal.h"
#include <cstdint>
#include "leveldb/env.h"
#include "leveldb/slice.h"
#include "leveldb/status.h"
#include "util/coding.h"
namespace leveldb {
void EncodeFP(const struct FilePointer &fp, char *scratch) {
EncodeFixed64(scratch, fp.FileNumber);
EncodeFixed64(scratch + 8, fp.FileOffset);
EncodeFixed64(scratch + 16, fp.Size);
}
void DecodeFp(struct FilePointer &fp, char *src) {
fp.FileNumber = DecodeFixed64(src);
fp.FileOffset = DecodeFixed64(src + 8);
fp.Size = DecodeFixed64(src + 16);
}
KVLog::KVLog(WritableFile *dest,uint64_t file_number):
dest_(dest),pos_(0),file_number(file_number) {}
KVLog::KVLog(WritableFile *dest, uint64_t dest_length,uint64_t file_number):
dest_(dest),pos_(dest_length),file_number(file_number) {}
Status KVLog::AddRecord(const Slice &slice, FilePointer &fp) {
//写入slice大小
EncodeFixed64(buf, slice.size());
Status s = dest_->Append(Slice(buf,8));
pos_ += 8;
//写入slice
fp.FileNumber = file_number;
fp.FileOffset = pos_;
fp.Size = slice.size();
s = dest_->Append(slice);
pos_ += slice.size();
if(s.ok()) {
s = dest_->Flush();
}
return s;
}
void KVLogReader::Next() {
if(input.empty()) {
NextWriteBatch();
if(!Valid()) return;
}
NextKV();
}
void KVLogReader::NextWriteBatch() {
Slice num;
file->Read(8, &num, number);
if(num.size() !=8) {
valid = false;
return;
}
uint64_t batch_size = DecodeFixed64(number);
if(batch_size > rep_size) {
delete[] rep_;
rep_ = new char[batch_size];
rep_size = batch_size;
}
input.clear();
file->Read(batch_size,&input,rep_);
if(input.size() != batch_size) {
valid = false;
return;
}
seq = DecodeFixed64(input.data()) - 1;//-1是为了和后面的nextkv的实现对齐
input.remove_prefix(12); //remove writebatch header
}
void KVLogReader::NextKV() {
seq++;
type = (ValueType)input[0];
input.remove_prefix(1);
switch (type) {
case kTypeValue:
if(GetLengthPrefixedSlice(&input, &key) &&
GetLengthPrefixedSlice(&input, &value)) {
} else {
valid = false;
return;
}
break;
case kTypeDeletion:
if(GetLengthPrefixedSlice(&input, &key)) {
} else {
valid = false;
return;
}
break;
default:
valid = false;
return;
}
}
}

+ 72
- 0
kv_sep/kvlog.h Просмотреть файл

@ -0,0 +1,72 @@
#pragma once
#include "db/dbformat.h"
#include <cstddef>
#include <cstdint>
#include "leveldb/env.h"
#include "leveldb/slice.h"
#include "leveldb/status.h"
#include "leveldb/write_batch.h"
namespace leveldb {
struct FilePointer {
uint64_t FileNumber;
uint64_t FileOffset;
uint64_t Size;
};
void EncodeFP(const struct FilePointer &fp,char *scratch);
void DecodeFp(struct FilePointer &fp, char *src);
class KVLog {
public:
explicit KVLog(WritableFile *dest,uint64_t file_number);
KVLog(WritableFile *dest, uint64_t dest_length,uint64_t file_number);
KVLog(const KVLog&) = delete;
KVLog& operator=(const KVLog&) = delete;
~KVLog() = default;
Status AddRecord(const Slice& slice,FilePointer &fp);
private:
WritableFile* dest_;
uint64_t pos_;
uint64_t file_number;
char buf[8];
};
class KVLogReader {
public:
KVLogReader(SequentialFile *file):file(file),rep_(nullptr),rep_size(0),valid(true)
{ Next();};
~KVLogReader() {
delete rep_;
}
ValueType Type() {return type;}
Slice Key() {return key;}
Slice Value() {return value;}
SequenceNumber Seq() {return seq;}
bool Valid() {return valid;}
void Next();
private:
void NextWriteBatch();
void NextKV();
private:
Slice key;
Slice value;
SequenceNumber seq;
ValueType type;
bool valid;
SequentialFile *file;
Slice input;
uint64_t rep_size;
char *rep_;
char number[8];
};
}

+ 0
- 0
kv_sep/kvlog_cache.cc Просмотреть файл


+ 0
- 0
kv_sep/kvlog_cache.h Просмотреть файл


+ 86
- 0
test/test.cc Просмотреть файл

@ -0,0 +1,86 @@
#include "leveldb/db.h"
#include "leveldb/options.h"
#include "util/random.h"
#include <bits/stdc++.h>
#include <string>
using namespace std;
using namespace leveldb;
int main() {
DB* db = nullptr;
Options op;
op.create_if_missing = true;
DestroyDB("testdb_kv", Options());
Status status = DB::Open(op, "testdb_kv", &db);
// assert(status.ok());
// string key = "leveldb",value = to_string(0);
// string res;
// for(int i = 0; i < 5; i++) {
// db->Put(WriteOptions(),key,to_string(i));
// }
// db->Put(WriteOptions(),key+key,to_string(0));
// db->Put(WriteOptions(),key+key,to_string(1));
// sleep(1);
// auto snapshot = db->GetSnapshot();
// auto readopts = ReadOptions();
// readopts.snapshot = snapshot;
// db->Put(WriteOptions(),key,to_string(10));
// db->CompactRange(nullptr,nullptr);
// db->Get(readopts,key,&res);
// cout<<"with snapshot:"<<res<<endl;
// res.clear();
// db->Get(ReadOptions(),key,&res);
// cout<<"without snapshot:"<<res<<endl;
// string value = "leveldb",key = "abc";
// db->Delete(WriteOptions(),key);
// db->Put(WriteOptions(),key,value);
// const Snapshot *snapshot = db->GetSnapshot();
// ReadOptions read_op = ReadOptions();
// read_op.snapshot = snapshot;
// string result;
// db->Get(read_op,key,&result);
// cout<<result<<endl;
// db->Delete(WriteOptions(),key);
// result.clear();
// db->Get(read_op,key,&result);
// cout<<"with snapshot:"<<result<<endl;
// result.clear();
// db->Get(ReadOptions(),key,&result);
// cout<<"without snapshot:"<<result<<endl;
// db->ReleaseSnapshot(snapshot);
string key,value = "leveldb";
key.resize(1000,'a');
// value.resize(10,'b');
value += "index";
Random ran(0);
for(int i = 0; i < 100000; i++) {
for(int j = 0; j < 20; j++) {
int rand = ran.Uniform(200 * 10);
string tk = key + std::to_string(rand);
string tv = value + std::to_string(rand) + "_" + std::to_string(i);
db->Put(WriteOptions(),tk,tv);
}
if(i && i%1000 ==0) {
std::cout << "iteration:" << i << std::endl;
}
}
std::string res;
for(int i = 0; i < 10; i++) {
db->Get(ReadOptions(), key + std::to_string(i), &res);
std::cout << i << " " << res << std::endl;
}
delete db;
// DB::Open(op,"testdb",&db);
// key = "abc", value = "";
// db->Get(ReadOptions(),key,&value);
// cout<<value;
// // DestroyDB("testdb",op);
// delete db;
return 0;
}

Загрузка…
Отмена
Сохранить