Bläddra i källkod

使用SliceHashTable代替unordered_set<string>,并增加了一些benchmark

pull/2/head
cyq 8 månader sedan
förälder
incheckning
9e6d5eb832
6 ändrade filer med 56 tillägg och 27 borttagningar
  1. +1
    -1
      db/db_impl.cc
  2. +24
    -0
      fielddb/SliceHashSet.h
  3. +10
    -7
      fielddb/field_db.cpp
  4. +3
    -2
      fielddb/field_db.h
  5. +11
    -11
      fielddb/request.cpp
  6. +7
    -6
      fielddb/request.h

+ 1
- 1
db/db_impl.cc Visa fil

@ -1310,7 +1310,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
Total_elapsed += env_->NowMicros() - start_;
NoWaiting_elapsed += env_->NowMicros() - start_;
Nowaited_count ++;
dumpStatistics();
// dumpStatistics();
return status;
}

+ 24
- 0
fielddb/SliceHashSet.h Visa fil

@ -0,0 +1,24 @@
# ifndef SLICE_HASH_SET_H
# define SLICE_HASH_SET_H
#include "leveldb/slice.h"
#include "util/hash.h"
#include <unordered_set>
using namespace leveldb;
class SliceHash {
public:
uint32_t operator()(const Slice &lhs) const {
return Hash(lhs.data(),lhs.size(),0x1234);
}
};
class SliceEq {
public:
bool operator()(const Slice &lhs, const Slice &rhs) const {
return lhs == rhs;
}
};
using SliceHashSet = std::unordered_set<Slice,SliceHash,SliceEq>;
#endif

+ 10
- 7
fielddb/field_db.cpp Visa fil

@ -19,6 +19,7 @@
#include "fielddb/encode_index.h"
#include "fielddb/meta.h"
#include "field_db.h"
#include "fielddb/SliceHashSet.h"
namespace fielddb {
using namespace leveldb;
@ -36,15 +37,17 @@ Status FieldDB::OpenFieldDB(Options& options,
// options.block_cache = NewLRUCache(ULONG_MAX);
// options.max_open_files = 1000;
// options.write_buffer_size = 512 * 1024 * 1024;
// options.env = getPosixEnv();
//这里是为了让3个数据库有独立的的Background thread
options.env = getPosixEnv();
status = Open(options, name+"_indexDB", &indexdb);
if(!status.ok()) return status;
// options.env = getPosixEnv();
options.env = getPosixEnv();
status = Open(options, name+"_kvDB", &kvdb);
if(!status.ok()) return status;
// options.env = getPosixEnv();
options.env = getPosixEnv();
status = Open(options, name+"_metaDB", &metadb);
if(!status.ok()) return status;
@ -127,7 +130,7 @@ Status FieldDB::HandleRequest(Request &req, const WriteOptions &op) {
taskqueue_.push_back(&req);
while(true){
uint64_t start_waiting = env_->NowMicros();
while(!req.done && &req != taskqueue_.front()) {
while(req.isPending() || !req.done && &req != taskqueue_.front()) {
req.cond_.Wait();
}
waiting_elasped += env_->NowMicros() - start_waiting;
@ -139,7 +142,7 @@ Status FieldDB::HandleRequest(Request &req, const WriteOptions &op) {
}
Request *tail = GetHandleInterval();
WriteBatch KVBatch,IndexBatch,MetaBatch;
std::unordered_set<std::string> batchKeySet;
SliceHashSet batchKeySet;
Status status;
if(!tail->isiCreateReq() && !tail->isiDeleteReq()) {
//表明这一个区间并没有涉及index的创建删除
@ -312,7 +315,7 @@ Status FieldDB::CreateIndexOnField(const std::string& field_name, const WriteOpt
return req.s;
}
WriteBatch KVBatch,IndexBatch,MetaBatch;
std::unordered_set<std::string> useless;
SliceHashSet useless;
req.ConstructBatch(KVBatch, IndexBatch, MetaBatch, this, useless);
indexDB_->Write(op, &IndexBatch);
req.Finalize(this);
@ -328,7 +331,7 @@ Status FieldDB::DeleteIndex(const std::string &field_name, const WriteOptions &o
return req.s;
}
WriteBatch KVBatch,IndexBatch,MetaBatch;
std::unordered_set<std::string> useless;
SliceHashSet useless;
req.ConstructBatch(KVBatch, IndexBatch, MetaBatch, this, useless);
indexDB_->Write(op, &IndexBatch);
req.Finalize(this);

+ 3
- 2
fielddb/field_db.h Visa fil

@ -111,7 +111,8 @@ private:
uint64_t write_clean_elapsed = 0;
uint64_t write_bytes = 0;
uint64_t write_bytes_lim = 50 * 1024 * 1024;
uint64_t write_step = 500 * 1024 * 1024;
uint64_t write_bytes_lim = write_step;
uint64_t temp_elapsed = 0;
@ -140,7 +141,7 @@ private:
std::cout << "waiting elapsed : " << waiting_elasped * 1.0 / count << std::endl;
// std::cout << MetaBatch.ApproximateSize() << " " << IndexBatch.ApproximateSize() << " " << KVBatch.ApproximateSize() << std::endl;
std::cout << "=====================================================\n";
write_bytes_lim = write_bytes + 50 * 1024 * 1024;
write_bytes_lim = write_bytes + write_step;
std::fflush(stdout);
}
}

+ 11
- 11
fielddb/request.cpp Visa fil

@ -25,7 +25,7 @@ void Request::PendReq(Request *req) {
//为虚函数提供最基本的实现
void Request::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB,
std::unordered_set<std::string> &batchKeySet)
SliceHashSet &batchKeySet)
{
assert(0);
}
@ -48,12 +48,12 @@ bool Request::isPending() {
/*******FieldsReq*******/
void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB,
std::unordered_set<std::string> &batchKeySet)
SliceHashSet &batchKeySet)
{
if (batchKeySet.find(Key.ToString()) != batchKeySet.end()){
if (batchKeySet.find(Key) != batchKeySet.end()){
return;//并发的被合并的put/delete请求只处理一次
} else {
batchKeySet.insert(Key.ToString());
batchKeySet.insert(Key);
}
std::string val_str;
Status s = Status::NotFound("test");
@ -164,12 +164,12 @@ void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
/*******DeleteReq*******/
void DeleteReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB,
std::unordered_set<std::string> &batchKeySet)
SliceHashSet &batchKeySet)
{
if (batchKeySet.find(Key.ToString()) != batchKeySet.end()){
if (batchKeySet.find(Key) != batchKeySet.end()){
return;//并发的被合并的put/delete请求只处理一次
} else {
batchKeySet.insert(Key.ToString());
batchKeySet.insert(Key);
}
//1. 读取当前的最新的键值对,判断是否存在含有键值对的field
//2.1 如果无,则正常构造delete
@ -263,7 +263,7 @@ void iCreateReq::PendReq(Request *req) {
void iCreateReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB,
std::unordered_set<std::string> &batchKeySet)
SliceHashSet &batchKeySet)
{
//遍历数据库,构建二级索引到indexbatch,(更新metaDB中的元数据为Index类型的(Field,Creating))
//一个indexwritebatch写入,那么索引创建删除应该和metadb没有交互
@ -322,7 +322,7 @@ void iDeleteReq::PendReq(Request* req) {
}
void iDeleteReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set<std::string> &batchKeySet)
WriteBatch &MetaBatch,fielddb::FieldDB *DB,SliceHashSet &batchKeySet)
{
std::vector<std::pair<std::string, std::string>> keysAndVal =
DB->FindKeysAndValByFieldName(Field);
@ -404,10 +404,10 @@ BatchReq::~BatchReq() {
}
void BatchReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set<std::string> &batchKeySet)
WriteBatch &MetaBatch,fielddb::FieldDB *DB,SliceHashSet &batchKeySet)
{
WriteBatch Sub_KVBatch,Sub_IndexBatch,Sub_MetaBatch;
std::unordered_set<std::string> Sub_batchKeySet;
SliceHashSet Sub_batchKeySet;
//由于batch是有顺序的,根据我们现在的一个key只处理最开始的算法,这里需要反向迭代
uint64_t start_ = DB->env_->NowMicros();
for(auto subreq = sub_requests.rbegin(); subreq != sub_requests.rend(); subreq++ ) {

+ 7
- 6
fielddb/request.h Visa fil

@ -9,6 +9,7 @@
#include "util/serialize_value.h"
#include <unordered_set>
// #include "fielddb/field_db.h"
#include "fielddb/SliceHashSet.h"
#ifndef REQUEST_H
#define REQUEST_H
@ -48,7 +49,7 @@ public:
//Fields的
virtual void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set<std::string> &batchKeySet);
WriteBatch &MetaBatch,fielddb::FieldDB *DB,SliceHashSet &batchKeySet);
//icreate和idelete在队列中的注册当前状态
virtual void Prepare(FieldDB *DB);
virtual void Finalize(FieldDB *DB);
@ -87,7 +88,7 @@ public:
}
void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set<std::string> &batchKeySet) override;
WriteBatch &MetaBatch,fielddb::FieldDB *DB,SliceHashSet &batchKeySet) override;
Slice Key;
FieldSliceArray SliceFields;
@ -112,7 +113,7 @@ public:
Field(Field),Request(iCreateReq_t, mu),Existed(false) { };
void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set<std::string> &batchKeySet) override;
WriteBatch &MetaBatch,fielddb::FieldDB *DB,SliceHashSet &batchKeySet) override;
void Prepare(FieldDB *DB) override;
void Finalize(FieldDB *DB) override;
void PendReq(Request *req) override;
@ -129,7 +130,7 @@ public:
Field(Field),Request(iDeleteReq_t, mu),Deleted(false) { };
void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set<std::string> &batchKeySet) override;
WriteBatch &MetaBatch,fielddb::FieldDB *DB,SliceHashSet &batchKeySet) override;
void Prepare(FieldDB *DB) override;
void Finalize(FieldDB *DB) override;
void PendReq(Request *req) override;
@ -146,7 +147,7 @@ public:
Key(Key),Request(DeleteReq_t,mu) { };
void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set<std::string> &batchKeySet) override;
WriteBatch &MetaBatch,fielddb::FieldDB *DB,SliceHashSet &batchKeySet) override;
Slice Key;
};
@ -157,7 +158,7 @@ public:
~BatchReq();
void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set<std::string> &batchKeySet) override;
WriteBatch &MetaBatch,fielddb::FieldDB *DB,SliceHashSet &batchKeySet) override;
WriteBatch *Batch;
std::deque<Request *> sub_requests;

Laddar…
Avbryt
Spara