谢瑞阳&徐翔宇的KV分离LEVELDB实现
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

106 lines
2.7 KiB

// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/builder.h"
#include "db/dbformat.h"
#include "db/filename.h"
#include "db/table_cache.h"
#include "db/version_edit.h"
#include "leveldb/db.h"
#include "leveldb/env.h"
#include "leveldb/iterator.h"
namespace leveldb {
// Builds a table file from the entries yielded by *iter, moving large values
// out of the table and into a separate value-log file.
//
// The table file is named from meta->number and the value-log file from
// meta->valuelog_id. On entry meta->file_size is reset to 0. If *iter yields
// at least one entry and everything succeeds, meta->smallest, meta->largest
// and meta->file_size are filled in and the table is opened once through
// table_cache to verify it is usable.
//
// Returns the first error encountered (file creation, value-log write, table
// build, sync/close, verification, or the input iterator's own status). If no
// usable table was produced, the table file is removed, as is the value-log
// file when this call created it.
Status BuildTable(const std::string& dbname, Env* env, const Options& options,
                  TableCache* table_cache, Iterator* iter, FileMetaData* meta) {
  // Values whose stored size (including the leading tag byte) exceeds this
  // many bytes are written to the value log instead of the table.
  const size_t kValueLogThreshold = 100;

  Status s;
  meta->file_size = 0;
  iter->SeekToFirst();

  std::string fname = TableFileName(dbname, meta->number);
  std::string value_fname = ValueLogFileName(dbname, meta->valuelog_id);
  // Tracks whether this call created the value log, so that it is only
  // removed on failure when it is ours.
  bool value_log_created = false;

  if (iter->Valid()) {
    WritableFile* file;
    s = env->NewWritableFile(fname, &file);
    if (!s.ok()) {
      return s;
    }

    WritableFile* value_file;
    s = env->NewWritableFile(value_fname, &value_file);
    if (!s.ok()) {
      // Do not leak the table file handle or leave an empty table behind.
      delete file;
      env->RemoveFile(fname);
      return s;
    }
    value_log_created = true;

    TableBuilder* builder = new TableBuilder(options, file);
    meta->smallest.DecodeFrom(iter->key());

    Slice key;
    uint64_t valuelog_offset = 0;
    // Holds the encoded value-log pointer. It is declared outside the loop
    // body's inner scope because the Slice handed to builder->Add() refers to
    // this storage and must stay valid until Add() returns.
    std::string pointer_buf;
    for (; iter->Valid(); iter->Next()) {
      key = iter->key();
      Slice value = iter->value();
      Slice new_value = value;
      if (value.size() > kValueLogThreshold) {
        // Drop the leading byte before storing the payload.
        // NOTE(review): presumably a one-byte "inline vs. pointer" tag written
        // by the caller -- confirm against the write path.
        value.remove_prefix(1);
        s = value_file->Append(value);
        if (!s.ok()) {
          // Never emit a pointer to data that was not written.
          break;
        }
        // Replacement value: tag 0x01, then varint64 (valuelog id, offset,
        // payload length).
        pointer_buf.clear();
        pointer_buf.push_back(static_cast<char>(0x01));
        PutVarint64(&pointer_buf, meta->valuelog_id);
        PutVarint64(&pointer_buf, valuelog_offset);
        PutVarint64(&pointer_buf, value.size());
        valuelog_offset += value.size();
        new_value = Slice(pointer_buf);
      }
      builder->Add(key, new_value);
    }

    // Finish the value log first so the table never references unsynced data.
    if (s.ok()) {
      s = value_file->Flush();
    }
    if (s.ok()) {
      s = value_file->Sync();
    }
    Status value_close_status = value_file->Close();
    if (s.ok()) {
      s = value_close_status;
    }
    delete value_file;
    value_file = nullptr;

    if (!key.empty()) {
      meta->largest.DecodeFrom(key);
    }

    if (s.ok()) {
      // Finish and check for builder errors
      s = builder->Finish();
      if (s.ok()) {
        meta->file_size = builder->FileSize();
        assert(meta->file_size > 0);
      }
    } else {
      // A builder must be finished or abandoned before it is destroyed.
      builder->Abandon();
    }
    delete builder;

    // Finish and check for file errors
    if (s.ok()) {
      s = file->Sync();
    }
    if (s.ok()) {
      s = file->Close();
    }
    delete file;
    file = nullptr;

    if (s.ok()) {
      // Verify that the table is usable
      Iterator* it = table_cache->NewIterator(ReadOptions(), meta->number,
                                              meta->file_size);
      s = it->status();
      delete it;
    }
  }

  // Check for input iterator errors
  if (!iter->status().ok()) {
    s = iter->status();
  }

  if (s.ok() && meta->file_size > 0) {
    // Keep it
  } else {
    env->RemoveFile(fname);
    if (value_log_created) {
      // Nothing references this value log once the table is discarded.
      env->RemoveFile(value_fname);
    }
  }
  return s;
}
} // namespace leveldb