
Merge branch 'cyq' into ld

pull/2/head
augurier 8 months ago
parent
commit
4fb627403c
10 changed files with 209 additions and 66 deletions
  1. +14  -2   benchmarks/db_bench.cc
  2. +23  -3   benchmarks/db_bench_FieldDB.cc
  3. +14  -2   benchmarks/db_bench_testDB.cc
  4. +13  -1   db/db_impl.cc
  5. +30  -0   db/db_impl.h
  6. +24  -0   fielddb/SliceHashSet.h
  7. +15  -10  fielddb/field_db.cpp
  8. +7   -3   fielddb/field_db.h
  9. +62  -39  fielddb/request.cpp
 10. +7   -6   fielddb/request.h

+14 -2  benchmarks/db_bench.cc  View File

@@ -856,8 +856,20 @@ class Benchmark {
for (int j = 0; j < entries_per_batch_; j++) {
const int k = seq ? i + j : thread->rand.Uniform(FLAGS_num);
key.Set(k);
batch.Put(key.slice(), gen.Generate(value_size_));
bytes += value_size_ + key.slice().size();
std::string name = "customer#" + std::to_string(k);
// this field is used for lookups
std::string age = std::to_string(thread->rand.Uniform(FLAGS_num) % 100);
// this field pads the value to the target length
std::string tag = gen.Generate(value_size_).ToString();
FieldArray fields = {
{"name", name},
{"age", age},
{"tag", tag}
};
std::string value = SerializeValue(fields);
batch.Put(key.slice(), value);
bytes += value.size() + key.slice().size();
thread->stats.FinishedSingleOp();
}
s = db_->Write(write_options_, &batch);
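
The benchmark now packs each value as a serialized field list via SerializeValue() (declared elsewhere in the repo, not in this diff). A minimal sketch of such a serializer, assuming the length-prefixed layout that request.cpp later decodes with GetLengthPrefixedSlice; the FieldArray alias and the name SerializeValueSketch are illustrative, not the repo's exact definitions.

#include <string>
#include <utility>
#include <vector>
#include "leveldb/slice.h"
#include "util/coding.h"  // PutLengthPrefixedSlice

// Illustrative alias; the repo's own FieldArray may differ.
using FieldArray = std::vector<std::pair<std::string, std::string>>;

std::string SerializeValueSketch(const FieldArray& fields) {
  std::string out;
  for (const auto& [name, value] : fields) {
    // each field becomes <varint len><name><varint len><value>
    leveldb::PutLengthPrefixedSlice(&out, leveldb::Slice(name));
    leveldb::PutLengthPrefixedSlice(&out, leveldb::Slice(value));
  }
  return out;
}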

+23 -3  benchmarks/db_bench_FieldDB.cc  View File

@@ -77,7 +77,9 @@ static const char* FLAGS_benchmarks =
"ReadSeqWhileCreating,"
"ReadSeqWhileDeleting,"
"ReadRandomWhileCreating,"
"ReadRandomWhileDeleting,";
"ReadRandomWhileDeleting,"
"WriteRandomWithIndex,"
"WriteSeqWithIndex,";
// Number of key/values to place in database
static int FLAGS_num = 1000000;
@@ -340,8 +342,8 @@ class Stats {
// elapsed times.
double elapsed = (finish_ - start_) * 1e-6;
char rate[100];
std::snprintf(rate, sizeof(rate), "%6.1f MB/s",
(bytes_ / 1048576.0) / elapsed);
std::snprintf(rate, sizeof(rate), "%6.1f MB/s Bytes:%6.1f elapsed(s):%6.1f seconds:%6.1f ",
(bytes_ / 1048576.0) / elapsed,(bytes_ / 1048576.0),elapsed,seconds_);
extra = rate;
}
AppendWithSpace(&extra, message_);
@@ -678,6 +680,12 @@ class Benchmark {
} else if (name == Slice("ReadRandomWhileDeleting")) {
num_threads++;
method = &Benchmark::ReadRandomWhileDeleting;
} else if (name == Slice("WriteRandomWithIndex")) {
fresh_db = true;
method = &Benchmark::WriteRandomWithIndex;
} else if (name == Slice("WriteSeqWithIndex")) {
fresh_db = true;
method = &Benchmark::WriteSeqWithIndex;
} else if (name == Slice("snappycomp")) {
method = &Benchmark::SnappyCompress;
} else if (name == Slice("snappyuncomp")) {
@@ -1141,6 +1149,18 @@ class Benchmark {
db_->DeleteIndex("age", write_options_);
}
void WriteSeqWithIndex(ThreadState* thread) {
CreateIndex(thread);
thread->stats.Start();
WriteSeq(thread);
}
void WriteRandomWithIndex(ThreadState* thread) {
CreateIndex(thread);
thread->stats.Start();
WriteRandom(thread);
}
void WriteSeqWhileCreating(ThreadState* thread) {
if (thread->tid > 0) {
WriteSeq(thread);

+14 -2  benchmarks/db_bench_testDB.cc  View File

@@ -859,8 +859,20 @@ class Benchmark {
for (int j = 0; j < entries_per_batch_; j++) {
const int k = seq ? i + j : thread->rand.Uniform(FLAGS_num);
key.Set(k);
batch.Put(key.slice(), gen.Generate(value_size_));
bytes += value_size_ + key.slice().size();
std::string name = "customer#" + std::to_string(k);
// this field is used for lookups
std::string age = std::to_string(thread->rand.Uniform(FLAGS_num) % 100);
// this field pads the value to the target length
std::string tag = gen.Generate(value_size_).ToString();
FieldArray fields = {
{"name", name},
{"age", age},
{"tag", tag}
};
std::string value = SerializeValue(fields);
batch.Put(key.slice(), value);
bytes += value.size() + key.slice().size();
thread->stats.FinishedSingleOp();
}
s = db_->Write(write_options_, &batch);

+13 -1  db/db_impl.cc  View File

@@ -1235,12 +1235,18 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
w.sync = options.sync;
w.done = false;
uint64_t start_ = env_->NowMicros();
MutexLock l(&mutex_);
count ++;
writers_.push_back(&w);
while (!w.done && &w != writers_.front()) {
w.cv.Wait();
}
if (w.done) {
Waiting_elapsed += env_->NowMicros() - start_;
waited_count ++;
Total_elapsed += env_->NowMicros() - start_;
// dumpStatistics();
return w.status;
}
@@ -1259,6 +1265,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
// into mem_.
{
mutex_.Unlock();
uint64_t start_write = env_->NowMicros();
status = log_->AddRecord(WriteBatchInternal::Contents(write_batch));
bool sync_error = false;
if (status.ok() && options.sync) {
@@ -1270,6 +1277,8 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
if (status.ok()) {
status = WriteBatchInternal::InsertInto(write_batch, mem_);
}
BatchSize += write_batch->ApproximateSize();
write_elapsed += env_->NowMicros() - start_write;
mutex_.Lock();
if (sync_error) {
// The state of the log file is indeterminate: the log record we
@@ -1298,7 +1307,10 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
if (!writers_.empty()) {
writers_.front()->cv.Signal();
}
Total_elapsed += env_->NowMicros() - start_;
NoWaiting_elapsed += env_->NowMicros() - start_;
Nowaited_count ++;
// dumpStatistics();
return status;
}
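
The Write() path is now instrumented with manual env_->NowMicros() bookkeeping (Total_elapsed, Waiting_elapsed, write_elapsed, ...). A minimal sketch of an RAII scoped timer that expresses the same accumulate-elapsed-microseconds pattern; ScopedTimer is illustrative and not part of the commit.

#include <cstdint>
#include "leveldb/env.h"

class ScopedTimer {
 public:
  ScopedTimer(leveldb::Env* env, uint64_t* sink)
      : env_(env), sink_(sink), start_(env->NowMicros()) {}
  ~ScopedTimer() { *sink_ += env_->NowMicros() - start_; }  // add elapsed microseconds on scope exit

 private:
  leveldb::Env* env_;
  uint64_t* sink_;   // counter that accumulates elapsed microseconds
  uint64_t start_;
};

// e.g. around the log write:
//   { ScopedTimer t(env_, &write_elapsed); status = log_->AddRecord(...); }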

+30 -0  db/db_impl.h  View File

@@ -6,7 +6,10 @@
#define STORAGE_LEVELDB_DB_DB_IMPL_H_
#include <atomic>
#include <cstdint>
#include <cstdio>
#include <deque>
#include <ostream>
#include <set>
#include <string>
@@ -210,6 +213,33 @@ class DBImpl : public DB {
Status bg_error_ GUARDED_BY(mutex_);
CompactionStats stats_[config::kNumLevels] GUARDED_BY(mutex_);
int count = 0;
int waited_count = 0;
int Nowaited_count = 0;
uint64_t Total_elapsed = 0;
uint64_t Waiting_elapsed = 0;
uint64_t NoWaiting_elapsed = 0;
uint64_t write_elapsed = 0;
uint64_t BatchSize = 0;
const double MB = 1024 * 1024;
const double KB = 1024;
inline void dumpStatistics() {
if(count && count % 500000 == 0) {
printf("==================================\n");
printf("Count: Total:%d Waited:%d Nowaited:%d\n",count,waited_count,Nowaited_count);
printf("%ld %ld %ld\n",Total_elapsed,Waiting_elapsed,NoWaiting_elapsed);
printf("Average Total elapsed: %lf ms\n",Total_elapsed * 1.0 / count);
printf("Average Waiting elapsed: %lf ms\n",Waiting_elapsed * 1.0 / count);
printf("For waiting request: %lf ms\n",Waiting_elapsed * 1.0 / waited_count);
printf("For Nowait request: %lf ms\n",NoWaiting_elapsed * 1.0 / Nowaited_count);
printf("Write elapsed: %lf ms\n",write_elapsed * 1.0 / Nowaited_count);
printf("Average BatchSize: %lfKB\n",BatchSize / KB / count);
printf("Average BatchSize per write:%lfKB\n",BatchSize / KB / Nowaited_count);
printf("==================================\n");
std::fflush(stdout);
}
}
};
// Sanitize db options. The caller should delete result.info_log if
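
dumpStatistics() prints the uint64_t counters with "%ld", which happens to line up on LP64 Linux but is not portable. A small sketch of the portable form using the <cinttypes> macros; PrintElapsed is just an illustrative wrapper.

#include <cinttypes>
#include <cstdint>
#include <cstdio>

void PrintElapsed(uint64_t total, uint64_t waiting, uint64_t no_waiting) {
  // PRIu64 expands to the correct printf length modifier for uint64_t
  std::printf("%" PRIu64 " %" PRIu64 " %" PRIu64 "\n", total, waiting, no_waiting);
}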

+24 -0  fielddb/SliceHashSet.h  View File

@@ -0,0 +1,24 @@
# ifndef SLICE_HASH_SET_H
# define SLICE_HASH_SET_H
#include "leveldb/slice.h"
#include "util/hash.h"
#include <unordered_set>
using namespace leveldb;
class SliceHash {
public:
uint32_t operator()(const Slice &lhs) const {
return Hash(lhs.data(),lhs.size(),0x1234);
}
};
class SliceEq {
public:
bool operator()(const Slice &lhs, const Slice &rhs) const {
return lhs == rhs;
}
};
using SliceHashSet = std::unordered_set<Slice,SliceHash,SliceEq>;
#endif
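
SliceHashSet replaces the std::unordered_set<std::string> that was previously used to deduplicate keys, so keys are no longer copied into the set. The flip side is that leveldb::Slice does not own its bytes: whatever is inserted (here, the Key slices held by the queued requests) must stay alive for as long as the set is used. A minimal usage sketch:

#include <cassert>
#include <string>
#include "fielddb/SliceHashSet.h"

int main() {
  std::string k1 = "customer#1", k2 = "customer#2";  // backing storage must outlive the set
  SliceHashSet seen;
  seen.insert(leveldb::Slice(k1));                   // stores a view, not a copy
  assert(seen.find(leveldb::Slice(k1)) != seen.end());
  assert(seen.find(leveldb::Slice(k2)) == seen.end());
  return 0;
}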

+15 -10  fielddb/field_db.cpp  View File

@@ -19,6 +19,7 @@
#include "fielddb/encode_index.h"
#include "fielddb/meta.h"
#include "field_db.h"
#include "fielddb/SliceHashSet.h"
namespace fielddb {
using namespace leveldb;
@@ -36,15 +37,17 @@ Status FieldDB::OpenFieldDB(Options& options,
// options.block_cache = NewLRUCache(ULONG_MAX);
// options.max_open_files = 1000;
// options.write_buffer_size = 512 * 1024 * 1024;
// options.env = getPosixEnv();
// this gives each of the three databases its own independent background thread
options.env = getPosixEnv();
status = Open(options, name+"_indexDB", &indexdb);
if(!status.ok()) return status;
// options.env = getPosixEnv();
options.env = getPosixEnv();
status = Open(options, name+"_kvDB", &kvdb);
if(!status.ok()) return status;
// options.env = getPosixEnv();
options.env = getPosixEnv();
status = Open(options, name+"_metaDB", &metadb);
if(!status.ok()) return status;
@@ -126,15 +129,20 @@ Status FieldDB::HandleRequest(Request &req, const WriteOptions &op) {
MutexLock L(&mutex_);
taskqueue_.push_back(&req);
while(true){
while(!req.done && &req != taskqueue_.front()) {
uint64_t start_waiting = env_->NowMicros();
while(req.isPending() || !req.done && &req != taskqueue_.front()) {
req.cond_.Wait();
}
waiting_elasped += env_->NowMicros() - start_waiting;
if(req.done) {
elapsed += env_->NowMicros() - start_;
count ++;
// dumpStatistics();
return req.s; // the lock L is released automatically on return
}
Request *tail = GetHandleInterval();
WriteBatch KVBatch,IndexBatch,MetaBatch;
std::unordered_set<std::string> batchKeySet;
SliceHashSet batchKeySet;
Status status;
if(!tail->isiCreateReq() && !tail->isiDeleteReq()) {
// indicates that this interval involves no index creation or deletion
@@ -208,9 +216,6 @@ Status FieldDB::HandleRequest(Request &req, const WriteOptions &op) {
if (ready == tail) break;
}
elapsed += env_->NowMicros() - start_;
count ++;
//dumpStatistics();
if(!taskqueue_.empty()) {
taskqueue_.front()->cond_.Signal();
@@ -310,7 +315,7 @@ Status FieldDB::CreateIndexOnField(const std::string& field_name, const WriteOpt
return req.s;
}
WriteBatch KVBatch,IndexBatch,MetaBatch;
std::unordered_set<std::string> useless;
SliceHashSet useless;
req.ConstructBatch(KVBatch, IndexBatch, MetaBatch, this, useless);
indexDB_->Write(op, &IndexBatch);
req.Finalize(this);
@@ -326,7 +331,7 @@ Status FieldDB::DeleteIndex(const std::string &field_name, const WriteOptions &o
return req.s;
}
WriteBatch KVBatch,IndexBatch,MetaBatch;
std::unordered_set<std::string> useless;
SliceHashSet useless;
req.ConstructBatch(KVBatch, IndexBatch, MetaBatch, this, useless);
indexDB_->Write(op, &IndexBatch);
req.Finalize(this);
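
HandleRequest follows the same leader/follower queue pattern as DBImpl::Write: a request enqueues itself, waits until it is either completed by another thread or reaches the queue head, and the head then handles a whole interval of queued requests in one combined batch (GetHandleInterval). A simplified, self-contained sketch of that pattern; the names Work and Queue::Handle are illustrative, not the commit's API, and the pending/index-conflict handling is omitted.

#include <condition_variable>
#include <deque>
#include <mutex>

struct Work {
  bool done = false;
  std::condition_variable cv;
};

class Queue {
 public:
  void Handle(Work* w) {
    std::unique_lock<std::mutex> l(mu_);
    q_.push_back(w);
    // wait until another leader finished us, or we became the queue head
    w->cv.wait(l, [&] { return w->done || w == q_.front(); });
    if (w->done) return;
    // we are the leader: fold the whole queued interval into one batch
    for (Work* r : q_) { /* append r's updates to the combined batch */ r->done = true; }
    std::deque<Work*> finished;
    finished.swap(q_);
    l.unlock();
    for (Work* r : finished) {
      if (r != w) r->cv.notify_one();  // wake the followers we completed
    }
  }

 private:
  std::mutex mu_;
  std::deque<Work*> q_;
};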

+7 -3  fielddb/field_db.h  View File

@@ -111,10 +111,13 @@ private:
uint64_t write_clean_elapsed = 0;
uint64_t write_bytes = 0;
uint64_t write_bytes_lim = 50 * 1024 * 1024;
uint64_t write_step = 500 * 1024 * 1024;
uint64_t write_bytes_lim = write_step;
uint64_t temp_elapsed = 0;
uint64_t waiting_elasped = 0;
inline void dumpStatistics() {
if(count && count % 500000 == 0 || write_bytes && write_bytes > write_bytes_lim) {
std::cout << "=====================================================\n";
@@ -134,10 +137,11 @@ private:
std::cout << "\tWrite KV Time(ms) : " << write_kv_elapsed * 1.0 / count << std::endl;
std::cout << "\tWrite Clean Time(ms) : " << write_clean_elapsed * 1.0 / count << std::endl;
std::cout << "TaskQueue Size : " << taskqueue_.size() << std::endl;
std::cout << "temp_elased : " << temp_elapsed * 1.0 / count<< std::endl;
std::cout << "temp_elased : " << temp_elapsed * 1.0 / count << std::endl;
std::cout << "waiting elapsed : " << waiting_elasped * 1.0 / count << std::endl;
// std::cout << MetaBatch.ApproximateSize() << " " << IndexBatch.ApproximateSize() << " " << KVBatch.ApproximateSize() << std::endl;
std::cout << "=====================================================\n";
write_bytes_lim = write_bytes + 50 * 1024 * 1024;
write_bytes_lim = write_bytes + write_step;
std::fflush(stdout);
}
}

+62 -39  fielddb/request.cpp  View File

@@ -25,7 +25,7 @@ void Request::PendReq(Request *req) {
// provide a minimal default implementation for the virtual functions
void Request::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB,
std::unordered_set<std::string> &batchKeySet)
SliceHashSet &batchKeySet)
{
assert(0);
}
@@ -48,24 +48,35 @@ bool Request::isPending() {
/*******FieldsReq*******/
void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB,
std::unordered_set<std::string> &batchKeySet)
SliceHashSet &batchKeySet)
{
if (batchKeySet.find(Key.ToString()) != batchKeySet.end()){
if (batchKeySet.find(Key) != batchKeySet.end()){
return; // concurrent merged put/delete requests on the same key are handled only once
} else {
batchKeySet.insert(Key.ToString());
batchKeySet.insert(Key);
}
std::string val_str;
Status s = Status::NotFound("test");
uint64_t start_ = DB->env_->NowMicros();
s = DB->kvDB_->Get(ReadOptions(), Key.ToString(), &val_str);
s = DB->kvDB_->Get(ReadOptions(), Key, &val_str);
DB->construct_FieldsReq_Read_elapsed += DB->env_->NowMicros() - start_;
FieldArray *oldFields;
// FieldArray *oldFields;
FieldSliceArray oldFields;
if (s.IsNotFound()){
oldFields = nullptr;
// oldFields = nullptr;
} else if (s.ok()) { // got the fields previously stored under this key; decide whether any of their index entries need to be deleted
oldFields = new FieldArray;
oldFields = ParseValue(val_str,oldFields);
// oldFields = new FieldArray;
// oldFields = ParseValue(val_str,oldFields);
Slice nameSlice, valSlice;
Slice Value(val_str);
while(GetLengthPrefixedSlice(&Value, &nameSlice)) {
if(GetLengthPrefixedSlice(&Value, &valSlice)) {
oldFields.push_back({nameSlice,valSlice});
} else {
std::cout << "name and val not match! From FieldsReq Init" << std::endl;
}
nameSlice.clear(), valSlice.clear();
}
} else {
assert(0);
}
@@ -76,8 +87,8 @@ void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
// MutexLock L(&DB->index_mu); // exclusive access to the index status table
DB->index_mu.AssertHeld();
//1. pend conflicting puts onto the request that owns the index
for(auto [field_name,field_value] : SliceFields) {
if(field_name == EMPTY) break;
for(auto &[field_name,field_value] : SliceFields) {
if(field_name.data() == EMPTY) break;
if(DB->index_.count(field_name.ToString())) {
auto [index_status,parent_req] = DB->index_[field_name.ToString()];
if(index_status == IndexStatus::Creating || index_status == IndexStatus::Deleting) {
@@ -90,11 +101,11 @@ void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
}
}
// a conflict can also arise when an index over the old data must be deleted while that index is being created or deleted
if (oldFields != nullptr){
for(auto [field_name,field_value] : *oldFields) {
if(field_name == EMPTY) break;
if(DB->index_.count(field_name)) {
auto [index_status,parent_req] = DB->index_[field_name];
if (!oldFields.empty()){
for(auto &[field_name,field_value] : oldFields) {
if(field_name.data() == EMPTY) break;
if(DB->index_.count(field_name.ToString())) {
auto [index_status,parent_req] = DB->index_[field_name.ToString()];
if(index_status == IndexStatus::Creating || index_status == IndexStatus::Deleting) {
parent_req->PendReq(this->parent);
return;
@@ -118,9 +129,9 @@ void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
//3.1 delete index entries for old fields that have an index
if (HasOldIndex) {
for(auto [field_name,field_value] : *oldFields) {
if(field_name == EMPTY) continue;
if(DB->index_.count(field_name)) {
for(auto &[field_name,field_value] : oldFields) {
if(field_name.data() == EMPTY) continue;
if(DB->index_.count(field_name.ToString())) {
std::string indexKey;
AppendIndexKey(&indexKey, ParsedInternalIndexKey(
Key,field_name,field_value));
@@ -131,8 +142,8 @@ void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
//3.2 build index entries for new fields that have an index
if (HasIndex) {
for(auto [field_name,field_value] : SliceFields) {
if(field_name == EMPTY) continue;
for(auto &[field_name,field_value] : SliceFields) {
if(field_name.data() == EMPTY) continue;
if(DB->index_.count(field_name.ToString())) {
std::string indexKey;
AppendIndexKey(&indexKey, ParsedInternalIndexKey(
@@ -146,19 +157,19 @@ void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
// optimization: an index entry present in both 3.1 and 3.2 is written only once
}
if(oldFields) delete oldFields;
// if(oldFields) delete oldFields;
}
/*******DeleteReq*******/
void DeleteReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB,
std::unordered_set<std::string> &batchKeySet)
SliceHashSet &batchKeySet)
{
if (batchKeySet.find(Key.ToString()) != batchKeySet.end()){
if (batchKeySet.find(Key) != batchKeySet.end()){
return; // concurrent merged put/delete requests on the same key are handled only once
} else {
batchKeySet.insert(Key.ToString());
batchKeySet.insert(Key);
}
//1. read the latest value for this key and check whether it contains any fields
//2.1 if not, construct a plain delete
@@ -168,18 +179,29 @@ void DeleteReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
std::string val_str;
Status s = DB->kvDB_->Get(ReadOptions(), Key, &val_str);
if (s.IsNotFound()) return;
FieldArray *Fields = new FieldArray;
ParseValue(val_str,Fields);
// FieldArray *Fields = new FieldArray;
// ParseValue(val_str,Fields);
FieldSliceArray Fields;
Slice nameSlice, valSlice;
Slice Value(val_str);
while(GetLengthPrefixedSlice(&Value, &nameSlice)) {
if(GetLengthPrefixedSlice(&Value, &valSlice)) {
Fields.push_back({nameSlice,valSlice});
} else {
std::cout << "name and val not match! From FieldsReq Init" << std::endl;
}
nameSlice.clear(), valSlice.clear();
}
KVBatch.Delete(Slice(Key));
bool HasIndex = false;
{
// MutexLock L(&DB->index_mu); // exclusive access to the index status table
DB->index_mu.AssertHeld();
//1. pend conflicting deletes onto the request that owns the index
for(auto [field_name,field_value] : *Fields) {
if(field_name == EMPTY) break;
if(DB->index_.count(field_name)) {
auto [index_status,parent_req] = DB->index_[field_name];
for(auto &[field_name,field_value] : Fields) {
if(field_name.data() == EMPTY) break;
if(DB->index_.count(field_name.ToString())) {
auto [index_status,parent_req] = DB->index_[field_name.ToString()];
if(index_status == IndexStatus::Creating || index_status == IndexStatus::Deleting) {
parent_req->PendReq(this->parent);
return;
@@ -197,9 +219,9 @@ void DeleteReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
MKV.TransDelete(MetaKey); // a delete written into meta needs no value
MetaBatch.Put(MetaKey, Slice());
//3. delete index entries for fields that have an index
for(auto [field_name,field_value] : *Fields) {
if(field_name == EMPTY) continue;
if(DB->index_.count(field_name)) {
for(auto &[field_name,field_value] : Fields) {
if(field_name.data() == EMPTY) continue;
if(DB->index_.count(field_name.ToString())) {
std::string indexKey;
AppendIndexKey(&indexKey, ParsedInternalIndexKey(
Key,field_name,field_value));
@@ -208,7 +230,7 @@ void DeleteReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
}
}
}
delete Fields;
// delete Fields;
}
/*******iCreateReq*******/
@@ -241,7 +263,7 @@ void iCreateReq::PendReq(Request *req) {
void iCreateReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB,
std::unordered_set<std::string> &batchKeySet)
SliceHashSet &batchKeySet)
{
// scan the database and build the secondary index into IndexBatch (updating the metaDB metadata to an Index-type (Field, Creating) entry)
// written as a single index WriteBatch, so index creation/deletion should not need to interact with the metaDB
@@ -300,7 +322,7 @@ void iDeleteReq::PendReq(Request* req) {
}
void iDeleteReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set<std::string> &batchKeySet)
WriteBatch &MetaBatch,fielddb::FieldDB *DB,SliceHashSet &batchKeySet)
{
std::vector<std::pair<std::string, std::string>> keysAndVal =
DB->FindKeysAndValByFieldName(Field);
@@ -382,15 +404,16 @@ BatchReq::~BatchReq() {
}
void BatchReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set<std::string> &batchKeySet)
WriteBatch &MetaBatch,fielddb::FieldDB *DB,SliceHashSet &batchKeySet)
{
WriteBatch Sub_KVBatch,Sub_IndexBatch,Sub_MetaBatch;
std::unordered_set<std::string> Sub_batchKeySet;
SliceHashSet Sub_batchKeySet;
// since the batch is ordered and our current algorithm only handles the first occurrence of each key, we need to iterate in reverse here
uint64_t start_ = DB->env_->NowMicros();
for(auto subreq = sub_requests.rbegin(); subreq != sub_requests.rend(); subreq++ ) {
uint64_t start_sub = DB->env_->NowMicros();
(*subreq)->ConstructBatch(Sub_KVBatch, Sub_IndexBatch, Sub_MetaBatch, DB, Sub_batchKeySet);
// (*subreq)->ConstructBatch(KVBatch, IndexBatch, MetaBatch, DB, batchKeySet);
DB->construct_BatchReq_perSub_elapsed += DB->env_->NowMicros() - start_sub;
DB->count_Batch_Sub ++;
// all calls to PendReq now pass this->parent as the argument, so for the sub-requests,
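
The same GetLengthPrefixedSlice parsing loop now appears in both FieldsReq::ConstructBatch and DeleteReq::ConstructBatch (the second copy even reuses the "From FieldsReq Init" message). A sketch of how it could be factored into one helper; ParseFieldsInPlace and the local FieldSliceArray alias are illustrative, not the repo's own definitions. The returned Slices point into val_str, which must outlive them.

#include <string>
#include <utility>
#include <vector>
#include "leveldb/slice.h"
#include "util/coding.h"  // GetLengthPrefixedSlice

using FieldSliceArray = std::vector<std::pair<leveldb::Slice, leveldb::Slice>>;

static bool ParseFieldsInPlace(const std::string& val_str, FieldSliceArray* fields) {
  leveldb::Slice input(val_str);
  leveldb::Slice name, value;
  while (leveldb::GetLengthPrefixedSlice(&input, &name)) {
    if (!leveldb::GetLengthPrefixedSlice(&input, &value)) {
      return false;  // a field name without a matching value
    }
    fields->push_back({name, value});
  }
  return true;
}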

+7 -6  fielddb/request.h  View File

@@ -9,6 +9,7 @@
#include "util/serialize_value.h"
#include <unordered_set>
// #include "fielddb/field_db.h"
#include "fielddb/SliceHashSet.h"
#ifndef REQUEST_H
#define REQUEST_H
@@ -48,7 +49,7 @@ public:
// for Fields
virtual void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set<std::string> &batchKeySet);
WriteBatch &MetaBatch,fielddb::FieldDB *DB,SliceHashSet &batchKeySet);
// iCreate and iDelete register their current status while in the queue
virtual void Prepare(FieldDB *DB);
virtual void Finalize(FieldDB *DB);
@@ -87,7 +88,7 @@ public:
}
void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set<std::string> &batchKeySet) override;
WriteBatch &MetaBatch,fielddb::FieldDB *DB,SliceHashSet &batchKeySet) override;
Slice Key;
FieldSliceArray SliceFields;
@@ -112,7 +113,7 @@ public:
Field(Field),Request(iCreateReq_t, mu),Existed(false) { };
void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set<std::string> &batchKeySet) override;
WriteBatch &MetaBatch,fielddb::FieldDB *DB,SliceHashSet &batchKeySet) override;
void Prepare(FieldDB *DB) override;
void Finalize(FieldDB *DB) override;
void PendReq(Request *req) override;
@@ -129,7 +130,7 @@ public:
Field(Field),Request(iDeleteReq_t, mu),Deleted(false) { };
void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set<std::string> &batchKeySet) override;
WriteBatch &MetaBatch,fielddb::FieldDB *DB,SliceHashSet &batchKeySet) override;
void Prepare(FieldDB *DB) override;
void Finalize(FieldDB *DB) override;
void PendReq(Request *req) override;
@@ -146,7 +147,7 @@ public:
Key(Key),Request(DeleteReq_t,mu) { };
void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set<std::string> &batchKeySet) override;
WriteBatch &MetaBatch,fielddb::FieldDB *DB,SliceHashSet &batchKeySet) override;
Slice Key;
};
@@ -157,7 +158,7 @@ public:
~BatchReq();
void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set<std::string> &batchKeySet) override;
WriteBatch &MetaBatch,fielddb::FieldDB *DB,SliceHashSet &batchKeySet) override;
WriteBatch *Batch;
std::deque<Request *> sub_requests;
