10225501448 李度 10225101546 陈胤遒 10215501422 高宇菲
Vous ne pouvez pas sélectionner plus de 25 sujets Les noms de sujets doivent commencer par une lettre ou un nombre, peuvent contenir des tirets ('-') et peuvent comporter jusqu'à 35 caractères.
 
 

418 lignes
14 KiB

#include "fielddb/field_db.h"
#include <climits>
#include <cstdint>
#include <cstdio>
#include <iostream>
#include <string>
#include <sys/types.h>
#include <utility>
#include <vector>
#include "leveldb/c.h"
#include "leveldb/cache.h"
#include "leveldb/db.h"
#include "leveldb/env.h"
#include "leveldb/iterator.h"
#include "leveldb/options.h"
#include "leveldb/slice.h"
#include "leveldb/status.h"
#include "leveldb/write_batch.h"
#include "db/write_batch_internal.h"
#include "util/coding.h"
#include "util/mutexlock.h"
#include "util/serialize_value.h"
#include "fielddb/encode_index.h"
#include "fielddb/meta.h"
#include "field_db.h"
namespace fielddb {
using namespace leveldb;
// Opens the three underlying databases of a FieldDB ("<name>_indexDB",
// "<name>_kvDB" and "<name>_metaDB"), installs them into the caller-provided
// FieldDB object and runs Recover() on it.
//
// options: forwarded to every Open() call; its address is stored in options_,
//          so the caller must keep it alive as long as the FieldDB is used.
// name:    common prefix of the three database names.
// dbptr:   must point to an already constructed FieldDB; this function does
//          not allocate one.
//
// Returns NotSupported when no FieldDB object was supplied, the failing status
// of Open() when one of the databases cannot be opened, and otherwise the
// status of Recover().
Status FieldDB::OpenFieldDB(Options& options,
                            const std::string& name, FieldDB** dbptr) {
  if (dbptr == nullptr || *dbptr == nullptr) {
    return Status::NotSupported(name, "new a fieldDb first\n");
  }
  FieldDB* db = *dbptr;

  DB* indexdb = nullptr;
  DB* kvdb = nullptr;
  DB* metadb = nullptr;

  Status status = Open(options, name + "_indexDB", &indexdb);
  if (!status.ok()) return status;

  status = Open(options, name + "_kvDB", &kvdb);
  if (!status.ok()) {
    // Do not leak the handle that was opened successfully before the failure.
    delete indexdb;
    return status;
  }

  status = Open(options, name + "_metaDB", &metadb);
  if (!status.ok()) {
    delete kvdb;
    delete indexdb;
    return status;
  }

  db->indexDB_ = indexdb;
  db->kvDB_ = kvdb;
  db->metaDB_ = metadb;
  db->dbname_ = name;
  // options_ and env_ must be set before Recover(): recovery replays pending
  // requests through PutFields()/Delete(), and HandleRequest() reads env_.
  db->options_ = &options;
  db->env_ = options.env;

  return db->Recover();
}
// Restores the in-memory state of a freshly opened FieldDB.
//
//  1. Scans indexDB_ and marks every field name found there as Exist in
//     index_.
//  2. Scans metaDB_ and re-submits every recorded request: KV_Creating
//     entries through PutFields(), KV_Deleting entries through Delete().
//
// Returns Corruption when an index key or meta key cannot be decoded, the
// iterator status when a scan ends with an error, and OK otherwise. The
// statuses of the replayed PutFields()/Delete() calls are not inspected
// (unchanged from the previous behaviour).
Status FieldDB::Recover() {
  // 1. Rebuild the index_ table from the keys stored in indexDB_.
  Iterator* iter = indexDB_->NewIterator(ReadOptions());
  iter->SeekToFirst();
  while (iter->Valid()) {
    // Own a copy of the key: the parsed slices are still needed to build the
    // next seek target.
    const std::string index_key = iter->key().ToString();
    ParsedInternalIndexKey parsed;
    if (!ParseInternalIndexKey(Slice(index_key), &parsed)) {
      delete iter;
      return Status::Corruption("FieldDB::Recover", "malformed index key");
    }
    index_[parsed.name_.ToString()] = {Exist, nullptr};

    // Skip the remaining entries of this field: seek to the length-prefixed
    // field name followed by the largest byte value.
    // NOTE(review): assumes no key of the same field sorts at or after this
    // target - confirm against the index key encoding.
    std::string next_field;
    PutLengthPrefixedSlice(&next_field, parsed.name_);
    next_field.push_back(static_cast<char>(0xff));
    iter->Seek(Slice(next_field));
  }
  Status status = iter->status();
  delete iter;
  if (!status.ok()) return status;

  // 2. Re-submit every request that is still recorded in metaDB_.
  iter = metaDB_->NewIterator(ReadOptions());
  for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
    Slice meta_key = iter->key();
    // The key starts with a fixed 32-bit MetaType followed by the
    // length-prefixed user key; reject anything too short to hold the tag.
    if (meta_key.size() < sizeof(uint32_t)) {
      delete iter;
      return Status::Corruption("FieldDB::Recover", "meta key too short");
    }
    const MetaType type = MetaType(DecodeFixed32(meta_key.data()));
    meta_key.remove_prefix(sizeof(uint32_t));

    Slice user_key;
    if (!GetLengthPrefixedSlice(&meta_key, &user_key)) {
      delete iter;
      return Status::Corruption("FieldDB::Recover", "malformed meta key");
    }

    if (type == KV_Creating) {
      FieldArray fields;
      ParseValue(iter->value().ToString(), &fields);
      PutFields(WriteOptions(), user_key, fields);
    } else if (type == KV_Deleting) {
      Delete(WriteOptions(), user_key);
    } else {
      assert(0 && "Invalid MetaType");
    }
  }
  status = iter->status();
  delete iter;
  if (!status.ok()) return status;

  // Diagnostic output kept from the original implementation: reports whether
  // metaDB_ still holds entries after the replay.
  iter = metaDB_->NewIterator(ReadOptions());
  iter->SeekToFirst();
  std::cout << "Iter Valid : " << iter->Valid() << std::endl;
  delete iter;

  return Status::OK();
}
// Determines the last request of the batch that the queue head will process.
//
// Walks taskqueue_ from the front and stops at the first index-create or
// index-delete request. The request seen just before it is returned; when the
// very first request is itself such a request, the front of the queue is
// returned. If no such request is queued, the last queued request is returned.
// mutex_ must be held by the caller.
Request* FieldDB::GetHandleInterval() {
  mutex_.AssertHeld();  // the queue may only be inspected under mutex_

  Request* interval_end = taskqueue_.front();
  for (auto it = taskqueue_.begin(); it != taskqueue_.end(); ++it) {
    Request* candidate = *it;
    const bool changes_index =
        candidate->isiDeleteReq() || candidate->isiCreateReq();
    if (changes_index) {
      break;
    }
    interval_end = candidate;
  }
  return interval_end;
}
// Submits `req` to the task queue and blocks until it has been marked done,
// then returns req.s.
//
// The request at the front of the queue acts as the worker for a whole
// interval of queued requests (front .. GetHandleInterval()):
//  - if the interval end is not an index create/delete request, the batches of
//    all requests in the interval are built under index_mu and written to
//    metaDB_, indexDB_ and kvDB_ with mutex_ released, after which the meta
//    entries are cleaned;
//  - otherwise only req.Prepare(this) is run under index_mu.
// Afterwards the interval is popped from the queue, finished requests are
// marked done and signalled, and the new queue front is woken up.
Status FieldDB::HandleRequest(Request &req) {
uint64_t start_ = env_->NowMicros();
MutexLock L(&mutex_);
taskqueue_.push_back(&req);
while(true){
// Sleep until another worker completed this request or it reached the front.
while(!req.done && &req != taskqueue_.front()) {
req.cond_.Wait();
}
if(req.done) {
return req.s; // lock L is released automatically on return
}
Request *tail = GetHandleInterval();
WriteBatch KVBatch,IndexBatch,MetaBatch;
std::unordered_set<std::string> batchKeySet;
Status status;
if(!tail->isiCreateReq() && !tail->isiDeleteReq()) {
// This interval does not involve creating or deleting an index.
{
// 1. Build the batches. The index state has to stay consistent while they
//    are built, so index_mu is held.
MutexLock iL(&index_mu);
uint64_t start_construct = env_->NowMicros();
for(auto *req_ptr : taskqueue_) {
req_ptr->ConstructBatch(KVBatch, IndexBatch, MetaBatch, this, batchKeySet);
if(req_ptr == tail) break;
}
construct_elapsed += env_->NowMicros() - start_construct;
}
// 2. Write meta first, then index and kv; clear the meta data afterwards.
// The lock can be released here because the queue guarantees write ordering.
mutex_.Unlock();
uint64_t start_write = env_->NowMicros();
WriteOptions op;
// NOTE(review): 12 is presumably the size of an empty WriteBatch, so each
// write below is skipped when its batch holds no records - confirm.
if(MetaBatch.ApproximateSize() > 12) {
uint64_t start_meta = env_->NowMicros();
status = metaDB_->Write(op, &MetaBatch);
write_meta_elapsed += env_->NowMicros() - start_meta;
write_bytes += MetaBatch.ApproximateSize();
assert(status.ok());
}
// TODO: the index write should be done concurrently in another thread
if(IndexBatch.ApproximateSize() > 12) {
uint64_t start_index = env_->NowMicros();
status = indexDB_->Write(op, &IndexBatch);
write_index_elapsed += env_->NowMicros() - start_index;
write_bytes += IndexBatch.ApproximateSize();
assert(status.ok());
}
if(KVBatch.ApproximateSize() > 12) {
uint64_t start_kv = env_->NowMicros();
status = kvDB_->Write(op, &KVBatch);
write_kv_elapsed += env_->NowMicros() - start_kv;
write_bytes += KVBatch.ApproximateSize();
assert(status.ok());
}
// 3. Clear the meta data written in step 2.
if(MetaBatch.ApproximateSize() > 12) {
uint64_t start_clean = env_->NowMicros();
MetaCleaner cleaner;
cleaner.Collect(MetaBatch);
cleaner.CleanMetaBatch(metaDB_);
write_clean_elapsed += env_->NowMicros() - start_clean;
}
write_elapsed += env_->NowMicros() - start_write;
mutex_.Lock();
} else {
// For index create/delete requests, Prepare updates the index state.
MutexLock iL(&index_mu);
req.Prepare(this);
}
// {
// static int count = 0;
// if(count++ % 100000 == 0) {
// std::cout << "TaskQueue Size : " << taskqueue_.size() << std::endl;
// }
// }
// Pop every request of the handled interval (front .. tail) from the queue.
while(true) {
Request *ready = taskqueue_.front();
// int debug = tail->type_;
taskqueue_.pop_front();
// Complete the request only if it is not pending and this worker is not an
// index create/delete request; `status` is whatever the last write above
// returned (default-constructed when nothing was written).
if(!ready->isPending() && !req.isiCreateReq() && !req.isiDeleteReq()) {
ready->s = status;
ready->done = true;
if (ready != &req) ready->cond_.Signal();
}
if (ready == tail) break;
}
elapsed += env_->NowMicros() - start_;
count ++;
dumpStatistics();
// Hand the worker role to the next queued request.
if(!taskqueue_.empty()) {
taskqueue_.front()->cond_.Signal();
}
// If done == true, the next loop iteration returns without waiting.
// If the request sits in some request's pending list, it keeps waiting until
// it is queued again.
}
}
// Stores a plain key/value pair. The value is wrapped into a one-element
// FieldArray whose field name is the empty string and handed to PutFields().
Status FieldDB::Put(const WriteOptions& options, const Slice& key,
                    const Slice& value) {
  const FieldArray unnamed_field = {{"", value.ToString()}};
  return PutFields(options, key, unnamed_field);
}
// Writes `fields` under `key` by submitting a FieldsReq to HandleRequest(),
// which decides whether the index has to be updated as well.
// The key and the fields are copied into locals because the request only
// stores pointers to them. `Options` is not used by this implementation.
Status FieldDB::PutFields(const WriteOptions& Options, const Slice& key,
                          const FieldArray& fields) {
  std::string owned_key(key.ToString());
  FieldArray owned_fields(fields);
  FieldsReq request(&owned_key, &owned_fields, &mutex_);
  return HandleRequest(request);
}
// Removes `key` by submitting a DeleteReq to HandleRequest(); deleting an
// indexed key has to keep indexDB in sync as well.
// The key is copied into a local because the request only stores a pointer to
// it. `options` is not used by this implementation.
Status FieldDB::Delete(const WriteOptions& options, const Slice& key) {
  std::string owned_key(key.ToString());
  DeleteReq request(&owned_key, &mutex_);
  return HandleRequest(request);
}
// Applies a WriteBatch by wrapping it into a BatchReq and submitting it to
// HandleRequest(); whether the index needs updating depends on the contents
// of `updates`. The time spent constructing the BatchReq is accumulated in
// construct_BatchReq_init_elapsed. `options` is not used by this
// implementation.
Status FieldDB::Write(const WriteOptions& options, WriteBatch* updates) {
  const uint64_t init_begin = env_->NowMicros();
  BatchReq request(updates, &mutex_);
  construct_BatchReq_init_elapsed += env_->NowMicros() - init_begin;

  return HandleRequest(request);
}
// Reads the plain value stored under `key`.
//
// A plain Put() stores its value as a single field with an empty name, so the
// value of the first field returned by GetFields() is handed back.
//
// Returns the status of GetFields(). On success `*value` receives the value of
// the first field; when the record contains no fields at all, `*value` is
// cleared instead of indexing into an empty array.
Status FieldDB::Get(const ReadOptions& options, const Slice& key,
                    std::string* value) {
  FieldArray fields;
  Status s = GetFields(options, key, &fields);
  if (!s.ok()) {
    return s;
  }
  if (fields.empty()) {
    // Record exists but holds no field: fields[0] would be out of bounds.
    value->clear();
    return s;
  }
  *value = fields[0].second;
  return s;
}
// Reads all fields stored under `key` into `*fields` by forwarding the call
// unchanged to kvDB_; returns the status reported by kvDB_.
Status FieldDB::GetFields(const ReadOptions &options, const Slice &key, FieldArray *fields) {
return kvDB_->GetFields(options, key, fields);
}
// Forwards the lookup for `field` unchanged to kvDB_ and returns the keys it
// reports. indexDB_ is not consulted here.
std::vector<std::string> FieldDB::FindKeysByField(Field &field) {
return kvDB_->FindKeysByField(field);
}
std::vector<std::pair<std::string, std::string>> FieldDB::FindKeysAndValByFieldName (
const std::string &fieldName){
std::vector<std::pair<std::string, std::string>> result;
auto iter = kvDB_->NewIterator(ReadOptions());
std::string val;
for(iter->SeekToFirst();iter->Valid();iter->Next()) {
InternalFieldArray fields(iter->value());
val = fields.ValOfName(fieldName);
if(!val.empty()) {
result.push_back(std::make_pair(iter->key().ToString(), val));
}
}
delete iter;
return result;
}
// Creates an index on `field_name`.
//
// An iCreateReq is first submitted through HandleRequest(). If the request
// reports that the index already existed, its status is returned right away.
// Otherwise the index entries are built with ConstructBatch(), written to
// indexDB_, and the request is finalized.
//
// Returns the failing status of the indexDB_ write if that write fails, and
// req.s otherwise.
Status FieldDB::CreateIndexOnField(const std::string& field_name) {
  std::string Field = field_name;
  iCreateReq req(&Field, &mutex_);
  HandleRequest(req);
  // The index already exists: nothing left to build.
  if (req.Existed) {
    return req.s;
  }

  WriteBatch KVBatch, IndexBatch, MetaBatch;
  std::unordered_set<std::string> useless;
  req.ConstructBatch(KVBatch, IndexBatch, MetaBatch, this, useless);
  const Status write_status = indexDB_->Write(WriteOptions(), &IndexBatch);

  // Finalize() is still invoked after a failed write, exactly as before; its
  // effects are not visible here (presumably it publishes the new index state
  // and releases requests waiting on it - confirm).
  req.Finalize(this);

  // Surface a failed index write instead of silently dropping it.
  if (!write_status.ok()) {
    return write_status;
  }
  return req.s;
}
// Removes the index on `field_name`.
//
// An iDeleteReq is first submitted through HandleRequest(). If the request
// reports that the index was already deleted or never existed, its status is
// returned right away. Otherwise the batch produced by ConstructBatch() is
// written to indexDB_ and the request is finalized.
//
// Returns the failing status of the indexDB_ write if that write fails, and
// req.s otherwise.
Status FieldDB::DeleteIndex(const std::string& field_name) {
  std::string Field = field_name;
  iDeleteReq req(&Field, &mutex_);
  HandleRequest(req);
  // Already deleted or not existing: nothing left to remove.
  if (req.Deleted) {
    return req.s;
  }

  WriteBatch KVBatch, IndexBatch, MetaBatch;
  std::unordered_set<std::string> useless;
  req.ConstructBatch(KVBatch, IndexBatch, MetaBatch, this, useless);
  const Status write_status = indexDB_->Write(WriteOptions(), &IndexBatch);

  // Finalize() is still invoked after a failed write, exactly as before; its
  // effects are not visible here (presumably it publishes the new index state
  // and releases requests waiting on it - confirm).
  req.Finalize(this);

  // Surface a failed index write instead of silently dropping it.
  if (!write_status.ok()) {
    return write_status;
  }
  return req.s;
}
std::vector<std::string> FieldDB::QueryByIndex(const Field &field, Status *s) {
if (index_.count(field.first) == 0 || index_[field.first].first != Exist){
*s = Status::NotFound(Slice());
return std::vector<std::string>();
}
std::string indexKey;
AppendIndexKey(&indexKey,
ParsedInternalIndexKey(Slice(), field.first, field.second));
Iterator *indexIterator = indexDB_->NewIterator(ReadOptions());
indexIterator->Seek(indexKey);
std::vector<std::string> result;
for (; indexIterator->Valid(); indexIterator->Next()) {
ParsedInternalIndexKey iterKey;
if (ParseInternalIndexKey(indexIterator->key(), &iterKey)){
if (iterKey.name_ == field.first && iterKey.val_ == field.second){
result.push_back(iterKey.user_key_.ToString());
continue; //查到说明在范围里,否则break
}
}
break;
}
delete indexIterator;
*s = Status::OK();
return result;
}
// Looks up the status recorded in index_ for `fieldName`; a field without an
// entry is reported as IndexStatus::NotExist.
IndexStatus FieldDB::GetIndexStatus(const std::string& fieldName) {
  const auto entry = index_.find(fieldName);
  if (entry == index_.end()) {
    return IndexStatus::NotExist;
  }
  return entry->second.first;
}
// Returns an iterator created by kvDB_ with the given read options; entries of
// indexDB_ and metaDB_ are not part of it.
Iterator * FieldDB::NewIterator(const ReadOptions &options) {
return kvDB_->NewIterator(options);
}
// TODO: manage snapshots with a unified sequence number
// Returns a snapshot taken from kvDB_ only; indexDB_ and metaDB_ are not
// captured by it.
const Snapshot * FieldDB::GetSnapshot() {
return kvDB_->GetSnapshot();
}
// TODO: manage snapshots with a unified sequence number (same TODO as for
// GetSnapshot)
// Hands `snapshot` back to kvDB_ for release.
void FieldDB::ReleaseSnapshot(const Snapshot *snapshot) {
kvDB_->ReleaseSnapshot(snapshot);
}
// Queries a database property.
//
// kvDB_ is asked first; indexDB_ is consulted only when kvDB_ does not
// provide the property. The previous bitwise `|` always evaluated both calls,
// so the indexDB_ call overwrote the value that kvDB_ had just produced.
//
// Returns true when either database filled `*value`.
bool FieldDB::GetProperty(const Slice& property, std::string* value) {
  return kvDB_->GetProperty(property, value) ||
         indexDB_->GetProperty(property, value);
}
// Fills sizes[0..n-1] with the approximate space used by each of the `n`
// ranges, summing what kvDB_ and indexDB_ report for the same range.
//
// range: array of `n` ranges.
// n:     number of ranges; nothing is written when n <= 0.
// sizes: output array with room for `n` entries.
void FieldDB::GetApproximateSizes(const Range* range, int n, uint64_t* sizes) {
  if (n <= 0) {
    return;
  }
  kvDB_->GetApproximateSizes(range, n, sizes);

  // indexDB_ writes n results as well, so it needs a buffer of n entries; a
  // single uint64_t would be overrun for n > 1.
  std::vector<uint64_t> index_sizes(static_cast<std::size_t>(n));
  indexDB_->GetApproximateSizes(range, n, index_sizes.data());

  for (int i = 0; i < n; ++i) {
    sizes[i] += index_sizes[i];
  }
}
// Compacts the key range [*begin, *end] of kvDB_ by forwarding the call
// unchanged; indexDB_ and metaDB_ are not compacted here.
void FieldDB::CompactRange(const Slice *begin, const Slice *end) {
kvDB_->CompactRange(begin, end);
}
// Destroys the three databases that make up the FieldDB called `name`
// ("<name>_kvDB", "<name>_indexDB" and "<name>_metaDB").
//
// All three are attempted even if an earlier one fails, so that a single
// failure does not leave the remaining databases behind. Returns the first
// failing status, or OK when every database was destroyed.
Status DestroyDB(const std::string& name, const Options& options) {
  static const char* const kSuffixes[] = {"_kvDB", "_indexDB", "_metaDB"};

  Status first_error;  // default-constructed Status is OK
  for (const char* suffix : kSuffixes) {
    const Status s = leveldb::DestroyDB(name + suffix, options);
    if (!s.ok() && first_error.ok()) {
      first_error = s;
    }
  }
  return first_error;
}
// Releases the three underlying database handles held by this FieldDB.
FieldDB::~FieldDB() {
delete indexDB_;
delete kvDB_;
delete metaDB_;
}
} // namespace fielddb