|
|
@ -1,9 +1,14 @@ |
|
|
|
#include "fielddb/field_db.h"
|
|
|
|
#include <climits>
|
|
|
|
#include <cstdint>
|
|
|
|
#include <cstdio>
|
|
|
|
#include <iostream>
|
|
|
|
#include <string>
|
|
|
|
#include <sys/types.h>
|
|
|
|
#include <utility>
|
|
|
|
#include <vector>
|
|
|
|
#include "leveldb/c.h"
|
|
|
|
#include "leveldb/cache.h"
|
|
|
|
#include "leveldb/db.h"
|
|
|
|
#include "leveldb/env.h"
|
|
|
|
#include "leveldb/iterator.h"
|
|
|
@ -22,7 +27,7 @@ |
|
|
|
namespace fielddb { |
|
|
|
using namespace leveldb; |
|
|
|
//TODO:打开fieldDB
|
|
|
|
Status FieldDB::OpenFieldDB(const Options& options, |
|
|
|
Status FieldDB::OpenFieldDB(Options& options, |
|
|
|
const std::string& name, FieldDB** dbptr) { |
|
|
|
// options.env->CreateDir("./abc")
|
|
|
|
if(*dbptr == nullptr){ |
|
|
@ -32,11 +37,18 @@ Status FieldDB::OpenFieldDB(const Options& options, |
|
|
|
//
|
|
|
|
Status status; |
|
|
|
DB *indexdb, *kvdb, *metadb; |
|
|
|
// options.block_cache = NewLRUCache(ULONG_MAX);
|
|
|
|
// options.max_open_files = 1000;
|
|
|
|
// options.write_buffer_size = 512 * 1024 * 1024;
|
|
|
|
// options.env = getPosixEnv();
|
|
|
|
status = Open(options, name+"_indexDB", &indexdb); |
|
|
|
if(!status.ok()) return status; |
|
|
|
|
|
|
|
|
|
|
|
// options.env = getPosixEnv();
|
|
|
|
status = Open(options, name+"_kvDB", &kvdb); |
|
|
|
if(!status.ok()) return status; |
|
|
|
|
|
|
|
// options.env = getPosixEnv();
|
|
|
|
status = Open(options, name+"_metaDB", &metadb); |
|
|
|
if(!status.ok()) return status; |
|
|
|
|
|
|
@ -45,7 +57,7 @@ Status FieldDB::OpenFieldDB(const Options& options, |
|
|
|
(*dbptr)->metaDB_ = metadb; |
|
|
|
(*dbptr)->dbname_ = name; |
|
|
|
|
|
|
|
status = (*dbptr)->Recover(); |
|
|
|
// status = (*dbptr)->Recover();
|
|
|
|
|
|
|
|
(*dbptr)->options_ = &options; |
|
|
|
(*dbptr)->env_ = options.env; |
|
|
@ -118,6 +130,7 @@ Request *FieldDB::GetHandleInterval() { |
|
|
|
} |
|
|
|
|
|
|
|
Status FieldDB::HandleRequest(Request &req) { |
|
|
|
uint64_t start_ = env_->NowMicros(); |
|
|
|
MutexLock L(&mutex_); |
|
|
|
taskqueue_.push_back(&req); |
|
|
|
Again: |
|
|
@ -136,33 +149,61 @@ Again: |
|
|
|
{ |
|
|
|
//1. 构建各个Batch。构建的过程中要保证索引状态的一致性,需要上锁。
|
|
|
|
MutexLock iL(&index_mu); |
|
|
|
uint64_t start_construct = env_->NowMicros(); |
|
|
|
for(auto *req_ptr : taskqueue_) { |
|
|
|
req_ptr->ConstructBatch(KVBatch, IndexBatch, MetaBatch, this, batchKeySet); |
|
|
|
if(req_ptr == tail) break; |
|
|
|
} |
|
|
|
construct_elapsed += env_->NowMicros() - start_construct; |
|
|
|
} |
|
|
|
//2. 首先写入meta,再并发写入index和kv,完成之后清除meta数据
|
|
|
|
//此处可以放锁是因为写入的有序性可以通过队列来保证
|
|
|
|
mutex_.Unlock(); |
|
|
|
uint64_t start_write = env_->NowMicros(); |
|
|
|
WriteOptions op; |
|
|
|
status = metaDB_->Write(op, &MetaBatch); |
|
|
|
assert(status.ok()); |
|
|
|
if(MetaBatch.ApproximateSize() > 12) { |
|
|
|
uint64_t start_meta = env_->NowMicros(); |
|
|
|
status = metaDB_->Write(op, &MetaBatch); |
|
|
|
write_meta_elapsed += env_->NowMicros() - start_meta; |
|
|
|
write_bytes += MetaBatch.ApproximateSize(); |
|
|
|
assert(status.ok()); |
|
|
|
} |
|
|
|
//TODO:index的写入需要在另外一个线程中同时完成
|
|
|
|
status = indexDB_->Write(op, &IndexBatch); |
|
|
|
assert(status.ok()); |
|
|
|
status = kvDB_->Write(op, &KVBatch); |
|
|
|
assert(status.ok()); |
|
|
|
if(IndexBatch.ApproximateSize() > 12) { |
|
|
|
uint64_t start_index = env_->NowMicros(); |
|
|
|
status = indexDB_->Write(op, &IndexBatch); |
|
|
|
write_index_elapsed += env_->NowMicros() - start_index; |
|
|
|
write_bytes += IndexBatch.ApproximateSize(); |
|
|
|
assert(status.ok()); |
|
|
|
} |
|
|
|
if(KVBatch.ApproximateSize() > 12) { |
|
|
|
uint64_t start_kv = env_->NowMicros(); |
|
|
|
status = kvDB_->Write(op, &KVBatch); |
|
|
|
write_kv_elapsed += env_->NowMicros() - start_kv; |
|
|
|
write_bytes += KVBatch.ApproximateSize(); |
|
|
|
assert(status.ok()); |
|
|
|
} |
|
|
|
//3. 将meta数据清除
|
|
|
|
MetaCleaner cleaner; |
|
|
|
cleaner.Collect(MetaBatch); |
|
|
|
cleaner.CleanMetaBatch(metaDB_); |
|
|
|
if(MetaBatch.ApproximateSize() > 12) { |
|
|
|
uint64_t start_clean = env_->NowMicros(); |
|
|
|
MetaCleaner cleaner; |
|
|
|
cleaner.Collect(MetaBatch); |
|
|
|
cleaner.CleanMetaBatch(metaDB_); |
|
|
|
write_clean_elapsed += env_->NowMicros() - start_clean; |
|
|
|
} |
|
|
|
write_elapsed += env_->NowMicros() - start_write; |
|
|
|
mutex_.Lock(); |
|
|
|
} else { |
|
|
|
//对于创建和删除索引的请求,通过prepare完成索引状态的更新
|
|
|
|
MutexLock iL(&index_mu); |
|
|
|
req.Prepare(this); |
|
|
|
} |
|
|
|
|
|
|
|
// {
|
|
|
|
// static int count = 0;
|
|
|
|
// if(count++ % 100000 == 0) {
|
|
|
|
// std::cout << "TaskQueue Size : " << taskqueue_.size() << std::endl;
|
|
|
|
// }
|
|
|
|
// }
|
|
|
|
while(true) { |
|
|
|
Request *ready = taskqueue_.front(); |
|
|
|
// int debug = tail->type_;
|
|
|
@ -175,6 +216,11 @@ Again: |
|
|
|
} |
|
|
|
if (ready == tail) break; |
|
|
|
} |
|
|
|
|
|
|
|
elapsed += env_->NowMicros() - start_; |
|
|
|
count ++; |
|
|
|
dumpStatistics(); |
|
|
|
|
|
|
|
if(!taskqueue_.empty()) { |
|
|
|
taskqueue_.front()->cond_.Signal(); |
|
|
|
} |
|
|
@ -218,8 +264,19 @@ Status FieldDB::Delete(const WriteOptions &options, const Slice &key) { |
|
|
|
} |
|
|
|
// TODO:根据updates里面的东西,要对是否需要更新index进行分别处理
|
|
|
|
Status FieldDB::Write(const WriteOptions &options, WriteBatch *updates) { |
|
|
|
{ |
|
|
|
uint64_t start_ = env_->NowMicros(); |
|
|
|
Status status = kvDB_->Write(options, updates); |
|
|
|
temp_elapsed += env_->NowMicros() - start_; |
|
|
|
count ++; |
|
|
|
dumpStatistics(); |
|
|
|
return status; |
|
|
|
} |
|
|
|
|
|
|
|
//或许应该再做一个接口?或者基于现有的接口进行改造
|
|
|
|
uint64_t start_ = env_->NowMicros(); |
|
|
|
BatchReq req(updates,&mutex_); |
|
|
|
construct_BatchReq_init_elapsed += env_->NowMicros() - start_; |
|
|
|
Status status = HandleRequest(req); |
|
|
|
return status; |
|
|
|
assert(0); |
|
|
|