10225501448 李度 10225101546 陈胤遒 10215501422 高宇菲
選択できるのは25トピックまでです。 トピックは、先頭が英数字で、英数字とダッシュ('-')を使用した35文字以内のものにしてください。

330 行
11 KiB

#include "fielddb/field_db.h"
#include <cstdint>
#include <cstdio>
#include <string>
#include <utility>
#include <vector>
#include "leveldb/db.h"
#include "leveldb/env.h"
#include "leveldb/options.h"
#include "leveldb/status.h"
#include "leveldb/write_batch.h"
#include "db/write_batch_internal.h"
#include "util/mutexlock.h"
#include "util/serialize_value.h"
#include "fielddb/encode_index.h"
#include "fielddb/meta.h"
namespace fielddb {
using namespace leveldb;
//TODO:打开fieldDB
Status FieldDB::OpenFieldDB(const Options& options,
const std::string& name, FieldDB** dbptr) {
// options.env->CreateDir("./abc")
if(*dbptr == nullptr){
return Status::NotSupported(name, "new a fieldDb first\n");
}
//
Status status;
DB *indexdb, *kvdb, *metadb;
status = Open(options, name+"_indexDB", &indexdb);
if(!status.ok()) return status;
status = Open(options, name+"_kvDB", &kvdb);
if(!status.ok()) return status;
status = Open(options, name+"_metaDB", &metadb);
if(!status.ok()) return status;
(*dbptr)->indexDB_ = indexdb;
(*dbptr)->kvDB_ = kvdb;
(*dbptr)->metaDB_ = metadb;
(*dbptr)->dbname_ = name;
status = (*dbptr)->Recover();
(*dbptr)->options_ = &options;
(*dbptr)->env_ = options.env;
return status;
}
// TODO:Recover
Status FieldDB::Recover() {
//TODO:
//1. 遍历所有Index类型的meta,重建内存中的index_状态表
//2. 寻找所有KV类型的meta,再次提交一遍请求
//3. 等待所有请求完成
return Status::OK();
}
Request *FieldDB::GetHandleInterval() {
mutex_.AssertHeld(); //保证队列是互斥访问的
Request *tail = taskqueue_.front();
for(auto *req_ptr : taskqueue_) {
if(req_ptr->isDeleteReq() || req_ptr->isiCreateReq()) {
return tail;
}
tail = req_ptr;
}
return tail;
}
Status FieldDB::HandleRequest(Request &req) {
MutexLock L(&mutex_);
taskqueue_.push_back(&req);
Again:
while(!req.done && &req != taskqueue_.front()) {
req.cond_.Wait();
}
if(req.done) {
return req.s; //在返回时自动释放锁L
}
Request *tail = GetHandleInterval();
WriteBatch KVBatch,IndexBatch,MetaBatch;
Status status;
if(!tail->isiCreateReq() && !tail->isiDeleteReq()) {
//表明这一个区间并没有涉及index的创建删除
{
//1. 构建各个Batch。构建的过程中要保证索引状态的一致性,需要上锁。
MutexLock iL(&index_mu);
for(auto *req_ptr : taskqueue_) {
req_ptr->ConstructBatch(KVBatch, IndexBatch, MetaBatch, this);
if(req_ptr == tail) break;
}
}
//2. 首先写入meta,再并发写入index和kv,完成之后清除meta数据
//此处可以放锁是因为写入的有序性可以通过队列来保证
mutex_.Unlock();
WriteOptions op;
status = metaDB_->Write(op, &MetaBatch);
//TODO:index的写入需要在另外一个线程中同时完成
status = indexDB_->Write(op, &IndexBatch);
status = kvDB_->Write(op, &KVBatch);
//3. 将meta数据清除
MetaCleaner cleaner;
cleaner.Collect(MetaBatch);
cleaner.CleanMetaBatch(metaDB_);
mutex_.Lock();
} else {
//对于创建和删除索引的请求,通过prepare完成索引状态的更新
MutexLock iL(&index_mu);
req.Prepare(this);
}
while(true) {
Request *ready = taskqueue_.front();
taskqueue_.pop_front();
//当前ready不是队首,不是和index的创建有关
if(ready != &req && !ready->isPending() &&
!req.isiCreateReq() && !req.isiDeleteReq()) {
ready->s = status;
ready->done = true;
ready->cond_.Signal();
}
if (ready == tail) break;
}
if(!taskqueue_.empty()) {
taskqueue_.front()->cond_.Signal();
}
//如果done==true,那么就不会继续等待直接退出
//如果处于某个请求的pending list里面,那么就会继续等待重新入队
//这里用了万恶的goto,蛤蛤
goto Again;
// return status;
}
//这里把一个空串作为常规put的name
Status FieldDB::Put(const WriteOptions &options, const Slice &key, const Slice &value) {
FieldArray FA = {{"",value.ToString()}};
return PutFields(options, key, FA);
// return kvDB_->Put(options, key, value);
}
// TODO:需要对是否进行index更新做处理
Status FieldDB::PutFields(const WriteOptions &Options,
const Slice &key, const FieldArray &fields) {
//这里是为了const和slice-string的转换被迫搞得
std::string key_ = key.ToString();
FieldArray fields_ = fields;
FieldsReq req(&key_,&fields_,&mutex_);
Status status = HandleRequest(req);
return status;
// return kvDB_->PutFields(Options, key, fields);
}
// todo: 删除有索引的key时indexdb也要同步
Status FieldDB::Delete(const WriteOptions &options, const Slice &key) {
//
std::string key_ = key.ToString();
DeleteReq req(&key_,&mutex_);
Status status = HandleRequest(req);
return status;
// return kvDB_->Delete(options, key);
}
// TODO:根据updates里面的东西,要对是否需要更新index进行分别处理
Status FieldDB::Write(const WriteOptions &options, WriteBatch *updates) {
//或许应该再做一个接口?或者基于现有的接口进行改造
return Status::OK();
}
//由于常规put将空串作为name,这里也需要适当修改
Status FieldDB::Get(const ReadOptions &options, const Slice &key, std::string *value) {
// return kvDB_->Get(options, key, value);
FieldArray fields;
Status s = GetFields(options, key, &fields);
if(!s.ok()) {
return s;
}
*value = fields[0].second;
return s;
}
Status FieldDB::GetFields(const ReadOptions &options, const Slice &key, FieldArray *fields) {
return kvDB_->GetFields(options, key, fields);
}
std::vector<std::string> FieldDB::FindKeysByField(Field &field) {
return kvDB_->FindKeysByField(field);
}
std::vector<std::pair<std::string, std::string>> FieldDB::FindKeysAndValByFieldName (
const std::string &fieldName){
std::vector<std::pair<std::string, std::string>> result;
auto iter = kvDB_->NewIterator(ReadOptions());
std::string val;
for(iter->SeekToFirst();iter->Valid();iter->Next()) {
InternalFieldArray fields(iter->value());
val = fields.ValOfName(fieldName);
if(!val.empty()) {
result.push_back(std::make_pair(iter->key().ToString(), val));
}
}
return result;
}
Status FieldDB::CreateIndexOnField(const std::string& field_name) {
//taskQueue相关
//写锁 是不是只需要给putfields设置一把锁就行
// std::vector<std::pair<std::string, std::string>> keysAndVal =
// FindKeysAndValByFieldName(field_name);
// WriteBatch writeBatch;
// Slice value = Slice();
// for (auto &kvPair : keysAndVal){
// std::string indexKey;
// AppendIndexKey(&indexKey,
// ParsedInternalIndexKey(kvPair.first, field_name, kvPair.second));
// writeBatch.Put(indexKey, value);
// }
// Status s = indexDB_->Write(WriteOptions(), &writeBatch);
// if (!s.ok()) return s;
// index_[field_name].first = Exist;
// //唤醒taskqueue
// return s;
std::string Field = field_name;
iCreateReq req(&Field,&mutex_);
HandleRequest(req);
//如果已经存在索引,那么可以直接返回
if(req.Existed) {
return req.s;
}
WriteBatch KVBatch,IndexBatch,MetaBatch;
req.ConstructBatch(KVBatch, IndexBatch, MetaBatch, this);
indexDB_->Write(WriteOptions(), &IndexBatch);
req.Finalize(this);
return req.s;
}
Status FieldDB::DeleteIndex(const std::string &field_name) {
//taskQueue相关
//写锁
// std::vector<std::pair<std::string, std::string>> keysAndVal =
// FindKeysAndValByFieldName(field_name);
// WriteBatch writeBatch;
// for (auto &kvPair : keysAndVal){
// std::string indexKey;
// AppendIndexKey(&indexKey,
// ParsedInternalIndexKey(kvPair.first, field_name, kvPair.second));
// writeBatch.Delete(indexKey);
// }
// Status s = indexDB_->Write(WriteOptions(), &writeBatch);
// if (!s.ok()) return s;
// index_.erase(field_name);
// //唤醒taskqueue
// return s;
std::string Field = field_name;
iDeleteReq req(&Field,&mutex_);
HandleRequest(req);
//如果已经被删除或者不存在,那么可以直接返回
if(req.Deleted) {
return req.s;
}
WriteBatch KVBatch,IndexBatch,MetaBatch;
req.ConstructBatch(KVBatch, IndexBatch, MetaBatch, this);
indexDB_->Write(WriteOptions(), &IndexBatch);
req.Finalize(this);
return req.s;
}
std::vector<std::string> FieldDB::QueryByIndex(const Field &field, Status *s) {
if (index_.count(field.first) == 0 || index_[field.first].first != Exist){
*s = Status::NotFound(Slice());
return std::vector<std::string>();
}
std::string indexKey;
AppendIndexKey(&indexKey,
ParsedInternalIndexKey(Slice(), field.first, field.second));
Iterator *indexIterator = indexDB_->NewIterator(ReadOptions());
indexIterator->Seek(indexKey);
std::vector<std::string> result;
for (; indexIterator->Valid(); indexIterator->Next()) {
ParsedInternalIndexKey iterKey;
if (ParseInternalIndexKey(indexIterator->key(), &iterKey)){
if (iterKey.name_ == field.first && iterKey.val_ == field.second){
result.push_back(iterKey.user_key_.ToString());
continue; //查到说明在范围里,否则break
}
}
break;
}
*s = Status::OK();
return result;
}
Iterator * FieldDB::NewIterator(const ReadOptions &options) {
return kvDB_->NewIterator(options);
}
// TODO:使用统一seq进行snapshot管理
const Snapshot * FieldDB::GetSnapshot() {
return kvDB_->GetSnapshot();
}
// TODO:同上
void FieldDB::ReleaseSnapshot(const Snapshot *snapshot) {
kvDB_->ReleaseSnapshot(snapshot);
}
bool FieldDB::GetProperty(const Slice &property, std::string *value) {
return kvDB_->GetProperty(property, value) | indexDB_->GetProperty(property, value);
}
void FieldDB::GetApproximateSizes(const Range *range, int n, uint64_t *sizes) {
uint64_t temp = 0;
kvDB_->GetApproximateSizes(range, n, sizes);
indexDB_->GetApproximateSizes(range, n, &temp);
*sizes += temp;
}
void FieldDB::CompactRange(const Slice *begin, const Slice *end) {
kvDB_->CompactRange(begin, end);
}
} // end of namespace