10225501448 李度 10225101546 陈胤遒 10215501422 高宇菲

#include "fielddb/request.h"
#include <cassert>
#include <cstdint>
#include <deque>
#include <string>
#include <unordered_set>
#include <utility>
#include <vector>
#include "leveldb/slice.h"
#include "leveldb/status.h"
#include "leveldb/write_batch.h"
#include "port/port_stdcxx.h"
#include "util/mutexlock.h"
#include "util/serialize_value.h"
#include "fielddb/encode_index.h"
#include "fielddb/field_db.h"
#include "fielddb/meta.h"
#include "request.h"
namespace fielddb {
using namespace leveldb;
const char EMPTY[1] = {0};
//Default base implementation of the virtual function; derived classes must override it
void Request::PendReq(Request *req) {
assert(0);
}
//Default base implementation of the virtual function; derived classes must override it
void Request::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB,
std::unordered_set<std::string> &batchKeySet)
{
assert(0);
}
void Request::Prepare(FieldDB *DB) {
assert(0);
}
void Request::Finalize(FieldDB *DB) {
assert(0);
}
//Default base implementation of the virtual function
bool Request::isPending() {
//A pending request's parent points to the request it is waiting on (iCreate/iDelete)
return parent != this;
}
/*******FieldsReq*******/
void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB,
std::unordered_set<std::string> &batchKeySet)
{
if (batchKeySet.find(*Key) != batchKeySet.end()){
return;//merged concurrent put/delete requests on the same key are processed only once
} else {
batchKeySet.insert(*Key);
}
std::string val_str;
Status s = Status::NotFound("test");
uint64_t start_ = DB->env_->NowMicros();
s = DB->kvDB_->Get(ReadOptions(), *Key, &val_str);
DB->construct_FieldsReq_Read_elapsed += DB->env_->NowMicros() - start_;
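FieldArray *oldFields = nullptr; //stays null when the key does not exist yet
if (s.ok()) { //fetch the fields previously stored under this key to decide whether stale index entries must be deleted
oldFields = new FieldArray;
oldFields = ParseValue(val_str,oldFields);
} else if (!s.IsNotFound()) {
assert(0); //unexpected read error
}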
bool HasIndex = false;
bool HasOldIndex = false;
{
// MutexLock L(&DB->index_mu); //mutually exclusive access to the index status table
DB->index_mu.AssertHeld();
//1. Pend a conflicting put onto the request it conflicts with
for(auto [field_name,field_value] : *Fields) {
if(field_name == EMPTY) break;
if(DB->index_.count(field_name)) {
auto [index_status,parent_req] = DB->index_[field_name];
if(index_status == IndexStatus::Creating || index_status == IndexStatus::Deleting) {
parent_req->PendReq(this->parent);
delete oldFields; //avoid leaking the parsed old fields on early return
return;
} else if(index_status == IndexStatus::Exist) {
HasIndex = true;
}
//assert(0);
}
}
//A conflict can also arise when the old data's index entry must be deleted while that index is being created or deleted
if (oldFields != nullptr){
for(auto [field_name,field_value] : *oldFields) {
if(field_name == EMPTY) break;
if(DB->index_.count(field_name)) {
auto [index_status,parent_req] = DB->index_[field_name];
if(index_status == IndexStatus::Creating || index_status == IndexStatus::Deleting) {
parent_req->PendReq(this->parent);
delete oldFields; //avoid leaking the parsed old fields on early return
return;
} else if(index_status == IndexStatus::Exist) {
HasOldIndex = true;
}
//assert(0);
}
}
}
KVBatch.Put(Slice(*Key), Slice(SerializeValue(*Fields)));
//2. For a put with no conflict that still touches an index, build the meta KV; the KV pair is simply encoded and written to metaDB
if(HasIndex || HasOldIndex) {
std::string MetaKey,MetaValue;
std::string serialized = SerializeValue(*Fields);
MetaKV MKV = MetaKV(Key,&serialized);
MKV.TransPut(MetaKey, MetaValue);
MetaBatch.Put(MetaKey, serialized);
//3.1 Delete the index entries of the indexed old fields
if (HasOldIndex) {
for(auto [field_name,field_value] : *oldFields) {
if(field_name == EMPTY) continue;
if(DB->index_.count(field_name)) {
std::string indexKey;
AppendIndexKey(&indexKey, ParsedInternalIndexKey(
*Key,field_name,field_value));
IndexBatch.Delete(indexKey);
}
}
}
//3.2 Create index entries for the indexed new fields
if (HasIndex) {
for(auto [field_name,field_value] : *Fields) {
if(field_name == EMPTY) continue;
if(DB->index_.count(field_name)) {
std::string indexKey;
AppendIndexKey(&indexKey, ParsedInternalIndexKey(
*Key,field_name,field_value));
IndexBatch.Put(indexKey, Slice());
}
}
}
}
//Possible optimization: write an index entry that appears in both 3.1 and 3.2 only once
}
delete oldFields; //deleting a null pointer is a no-op
}
/*******DeleteReq*******/
void DeleteReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB,
std::unordered_set<std::string> &batchKeySet)
{
if (batchKeySet.find(*Key) != batchKeySet.end()){
return;//merged concurrent put/delete requests on the same key are processed only once
} else {
batchKeySet.insert(*Key);
}
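//1. Read the latest key-value pair and check whether it contains any indexed field
//2.1 If it does not, construct a plain delete
//2.2 If every indexed field has index status Exist, write a KV_Deleting record to meta
//and write the corresponding delete to kvDB and metaDB
//2.3 If any field's index status is Creating or Deleting, wait on that request's queue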
std::string val_str;
Status s = DB->kvDB_->Get(ReadOptions(), *Key, &val_str);
if (s.IsNotFound()) return;
FieldArray *Fields = new FieldArray;
ParseValue(val_str,Fields);
//The delete is added to KVBatch only after the conflict check below, so a pended request writes nothing
bool HasIndex = false;
{
// MutexLock L(&DB->index_mu); //mutually exclusive access to the index status table
DB->index_mu.AssertHeld();
//1. Pend a conflicting delete onto the request it conflicts with
for(auto [field_name,field_value] : *Fields) {
if(field_name == EMPTY) break;
if(DB->index_.count(field_name)) {
auto [index_status,parent_req] = DB->index_[field_name];
if(index_status == IndexStatus::Creating || index_status == IndexStatus::Deleting) {
parent_req->PendReq(this->parent);
delete Fields; //avoid leaking the parsed fields on early return
return;
} else if(index_status == IndexStatus::Exist) {
HasIndex = true;
}
//assert(0);
}
}
KVBatch.Delete(Slice(*Key));
//2. For a delete with no conflict that still touches an index, build the meta KV; the KV pair is simply encoded and written to metaDB
if(HasIndex) {
std::string MetaKey;
MetaKV MKV = MetaKV(Key);
MKV.TransDelete(MetaKey); //a delete record written to meta needs no value
MetaBatch.Put(MetaKey, Slice());
//3. Delete the index entries of the indexed fields
for(auto [field_name,field_value] : *Fields) {
if(field_name == EMPTY) continue;
if(DB->index_.count(field_name)) {
std::string indexKey;
AppendIndexKey(&indexKey, ParsedInternalIndexKey(
*Key,field_name,field_value));
IndexBatch.Delete(indexKey);
}
}
}
}
delete Fields;
}
/*******iCreateReq*******/
void iCreateReq::Prepare(FieldDB *DB) {
//Update the index status in index_; doing it here also prevents creating the same index twice
DB->index_mu.AssertHeld();
if(DB->index_.count(*Field)) {
auto [istatus,parent] = DB->index_[*Field];
if(istatus == IndexStatus::Exist) {
//The index has already been built: return success
done = true;
Existed = true;
s = Status::OK();
} else {
//The index is being created or deleted: wait on that request
parent->PendReq(this->parent);
}
return;
}
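//The field is absent from the index status table, so the index does not exist yet; record the new status
//Setting done to true means the work required in the taskqueue is finished and no pending is needed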
DB->index_[*Field] = {IndexStatus::Creating,this};
done = true;
}
void iCreateReq::PendReq(Request *req) {
req->parent = this;
pending_list.push_back(req);
}
void iCreateReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB,
std::unordered_set<std::string> &batchKeySet)
{
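//Scan the database and build the secondary index into IndexBatch (and update the metaDB metadata to an Index-type (Field, Creating) record)
//Everything is written through a single index WriteBatch, so index creation and deletion should not need to interact with metaDB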
std::vector<std::pair<std::string, std::string>> keysAndVal =
DB->FindKeysAndValByFieldName(*Field);
Slice value = Slice();
for (auto &kvPair : keysAndVal){
std::string indexKey;
AppendIndexKey(&indexKey,
ParsedInternalIndexKey(kvPair.first, *Field, kvPair.second));
IndexBatch.Put(indexKey, value);
}
}
void iCreateReq::Finalize(FieldDB *DB) {
//1. After the write completes, update the index status table (and change the metaDB value to an Index-type (Field, Existing) record)
{
//Scoped lock: MutexLock releases index_mu in its destructor, so calling Unlock() as well would unlock twice
MutexLock iL(&DB->index_mu);
DB->index_[*Field] = {IndexStatus::Exist, nullptr};
}
if (pending_list.empty()) return;
//2. Re-enqueue every request in the pending list
MutexLock L(&DB->mutex_);
for (auto req : pending_list){
DB->taskqueue_.push_back(req);
req->parent = req; //detach from this request
}
if (pending_list[0] == DB->taskqueue_.front()) {
pending_list[0]->cond_.Signal();
}
this->s = Status::OK();
}
/*******iDeleteReq*******/
void iDeleteReq::Prepare(FieldDB *DB) {
DB->index_mu.AssertHeld();
if(DB->index_.count(*Field) == 0) {
done = true;
Deleted = true;
s = Status::OK();
return ;
}
auto [istatus,parent] = DB->index_[*Field];
if(istatus == IndexStatus::Exist) {
DB->index_[*Field] = {IndexStatus::Deleting,this};
done = true;
} else {
//The index is being created or deleted: pend onto that request
parent->PendReq(this->parent);
}
}
void iDeleteReq::PendReq(Request* req) {
req->parent = this;
pending_list.push_back(req);
}
void iDeleteReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set<std::string> &batchKeySet)
{
std::vector<std::pair<std::string, std::string>> keysAndVal =
DB->FindKeysAndValByFieldName(*Field);
Slice value = Slice();
for (auto &kvPair : keysAndVal){
std::string indexKey;
AppendIndexKey(&indexKey,
ParsedInternalIndexKey(kvPair.first, *Field, kvPair.second));
IndexBatch.Delete(indexKey);
}
}
void iDeleteReq::Finalize(FieldDB *DB) {
{
//Scoped lock: MutexLock releases index_mu in its destructor, so calling Unlock() as well would unlock twice
MutexLock iL(&DB->index_mu);
DB->index_.erase(*Field);
}
if (pending_list.empty()) return;
//2. Re-enqueue every request in the pending list
MutexLock L(&DB->mutex_);
for (auto req : pending_list){
DB->taskqueue_.push_back(req);
req->parent = req; //detach from this request
}
if (pending_list[0] == DB->taskqueue_.front()) {
pending_list[0]->cond_.Signal();
}
this->s = Status::OK();
}
BatchReq::BatchReq(WriteBatch *Batch,port::Mutex *mu):
Request(BatchReq_t, mu),Batch(Batch) { //the base class is initialized first, so list it first
struct BatchHandler : WriteBatch::Handler {
void Put(const Slice &key, const Slice &value) override {
//Keep owning copies of the key and value so the pointers handed to sub-requests stay valid after the temporary strings are destroyed
str_buf->push_back(key.ToString());
fa_buf->push_back({{EMPTY,value.ToString()}});
// FieldArray *field = new FieldArray;
// field = ParseValue(value.ToString(), field);
// if (field->empty()){ //the value in the batch has no fields
// } else {
// fa_buf->push_back(*field);
// }
sub_requests->emplace_back(new FieldsReq(&str_buf->back(),&fa_buf->back(),mu));
sub_requests->back()->parent = req;
// delete field;
}
void Delete(const Slice &key) override {
str_buf->push_back(key.ToString());
sub_requests->emplace_back(new DeleteReq(&str_buf->back(),mu));
sub_requests->back()->parent = req;
}
BatchReq *req;
port::Mutex *mu;
std::deque<std::string> *str_buf;
std::deque<FieldArray> *fa_buf;
std::deque<Request*> *sub_requests;
};
BatchHandler Handler;
Handler.req = this;
Handler.mu = mu;
Handler.str_buf = &str_buf;
Handler.fa_buf = &fa_buf;
Handler.sub_requests = &sub_requests;
Batch->Iterate(&Handler);
}
BatchReq::~BatchReq() {
while(!sub_requests.empty()) {
Request *req = sub_requests.front();
sub_requests.pop_front();
delete req;
}
}
void BatchReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set<std::string> &batchKeySet)
{
WriteBatch Sub_KVBatch,Sub_IndexBatch,Sub_MetaBatch;
std::unordered_set<std::string> Sub_batchKeySet;
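//A batch is ordered and the current algorithm handles only the first occurrence of each key, so iterate in reverse to make the last write to a key win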
uint64_t start_ = DB->env_->NowMicros();
for(auto subreq = sub_requests.rbegin(); subreq != sub_requests.rend(); subreq++ ) {
uint64_t start_sub = DB->env_->NowMicros();
(*subreq)->ConstructBatch(Sub_KVBatch, Sub_IndexBatch, Sub_MetaBatch, DB, Sub_batchKeySet);
DB->construct_BatchReq_perSub_elapsed += DB->env_->NowMicros() - start_sub;
DB->count_Batch_Sub ++;
//Every PendReq call now passes this->parent, which for a sub-request is the owning BatchReq,
//so checking whether this BatchReq is pending tells us whether a sub-request hit a conflict
if(isPending()) {
return;
}
}
DB->construct_BatchReq_Sub_elapsed += DB->env_->NowMicros() - start_;
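//A WriteBatch header is 12 bytes (8-byte sequence number + 4-byte count), so a size above 12 means the sub-batch is non-empty
if(Sub_KVBatch.ApproximateSize() > 12) {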
KVBatch.Append(Sub_KVBatch);
}
if(Sub_IndexBatch.ApproximateSize() > 12) {
IndexBatch.Append(Sub_IndexBatch);
}
if(Sub_MetaBatch.ApproximateSize() > 12) {
MetaBatch.Append(Sub_MetaBatch);
}
batchKeySet.insert(Sub_batchKeySet.begin(),Sub_batchKeySet.end());
DB->construct_BatchReq_elapsed += DB->env_->NowMicros() - start_;
}
} // namespace fielddb