10225501448 李度 10225101546 陈胤遒 10215501422 高宇菲
 
 

449 lines
17 KiB

#include "fielddb/request.h"
#include <cassert>
#include <cstdint>
#include <deque>
#include <string>
#include <unordered_set>
#include "leveldb/slice.h"
#include "leveldb/status.h"
#include "leveldb/write_batch.h"
#include "port/port_stdcxx.h"
#include "util/mutexlock.h"
#include "util/serialize_value.h"
#include "fielddb/encode_index.h"
#include "fielddb/field_db.h"
#include "fielddb/meta.h"
#include "request.h"
namespace fielddb {
using namespace leveldb;
// Provides the most basic implementation for the virtual function.
// The base class keeps no pending list, so reaching this body trips the
// assertion; iCreateReq and iDeleteReq in this file supply real versions.
void Request::PendReq(Request *req) {
assert(0);
}
// Provides the most basic implementation for the virtual function.
// Request types that contribute to the KV / index / meta write batches
// define their own ConstructBatch; this fallback only asserts.
void Request::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB,
SliceHashSet &batchKeySet)
{
assert(0);
}
// Fallback for Prepare: only asserts. iCreateReq, iDeleteReq and SnapshotReq
// in this file define their own Prepare.
void Request::Prepare(FieldDB *DB) {
assert(0);
}
// Fallback for Finalize: only asserts. iCreateReq and iDeleteReq in this file
// define their own Finalize.
void Request::Finalize(FieldDB *DB) {
assert(0);
}
// Provides the most basic implementation for the virtual function.
// Returns true when this request is currently parked on another request.
bool Request::isPending() {
// A pending request's parent points at the request it is waiting for
// (iCreate/iDelete); the PendReq/Finalize code in this file sets
// parent back to the request itself when it is released.
return parent != this;
}
/*******FieldsReq*******/
/**
 * Translates this put of `Key` / `SliceFields` into entries of the KV, index
 * and meta write batches.
 *
 * Steps:
 *   1. If `Key` is already in `batchKeySet` the request is skipped; otherwise
 *      the key is recorded.
 *   2. The value currently stored under `Key` is read and decoded into its
 *      (name, value) fields.
 *   3. If any new or old field has an index whose status is Creating or
 *      Deleting, this request's parent is pended on the request that owns
 *      that index and the function returns without touching the batches.
 *   4. Otherwise the put is queued into `KVBatch`. When an existing index is
 *      involved, a meta record is queued, index entries of old
 *      (name, value) pairs that are absent from the new fields are deleted,
 *      and index entries for the new fields are added.
 *
 * The caller must hold DB->index_mu (asserted below).
 *
 * @param KVBatch      receives the put of the serialized fields.
 * @param IndexBatch   receives index-entry deletions and insertions.
 * @param MetaBatch    receives the meta record when an index is involved.
 * @param DB           database whose kvDB_ and index_ table are consulted.
 * @param batchKeySet  keys already handled while building the current batch.
 */
void FieldsReq::ConstructBatch(WriteBatch &KVBatch, WriteBatch &IndexBatch,
                               WriteBatch &MetaBatch, fielddb::FieldDB *DB,
                               SliceHashSet &batchKeySet)
{
  // Merged concurrent put/delete requests handle each key only once.
  if (batchKeySet.find(Key) != batchKeySet.end()) {
    return;
  }
  batchKeySet.insert(Key);

  // Read the value currently stored under Key; the status is kept in the
  // member `s`.
  std::string val_str;
  s = DB->kvDB_->Get(ReadOptions(), Key, &val_str);

  // Decode the previously stored fields so we can tell whether index entries
  // that belong to them have to be removed. The slices point into val_str.
  FieldSliceArray oldFields;
  if (s.IsNotFound()) {
    // No previous value: oldFields stays empty.
  } else if (s.ok()) {
    Slice nameSlice, valSlice;
    Slice Value(val_str);
    while (GetLengthPrefixedSlice(&Value, &nameSlice)) {
      if (GetLengthPrefixedSlice(&Value, &valSlice)) {
        oldFields.push_back({nameSlice, valSlice});
      } else {
        // A field name without a matching value means the stored record is
        // malformed.
        std::cout << "name and val not match! From FieldsReq Init" << std::endl;
        assert(0);
      }
      nameSlice.clear();
      valSlice.clear();
    }
  } else {
    // Any other read error is treated as a programming error in debug builds.
    assert(0);
  }

  bool HasIndex = false;
  bool HasOldIndex = false;

  // The index status table is accessed under index_mu, owned by the caller.
  DB->index_mu.AssertHeld();

  // 1. Pend a put that conflicts with an index being created or deleted.
  for (auto &[field_name, field_value] : SliceFields) {
    if (field_name.data() == EMPTY) break;
    auto entry = DB->index_.find(field_name.ToString());
    if (entry == DB->index_.end()) continue;
    auto [index_status, parent_req] = entry->second;
    if (index_status == IndexStatus::Creating ||
        index_status == IndexStatus::Deleting) {
      parent_req->PendReq(this->parent);
      return;
    }
    if (index_status == IndexStatus::Exist) {
      HasIndex = true;
    }
  }

  // A conflict may also come from the old data: its index entry would have to
  // be deleted while that index is itself being created or deleted.
  for (auto &[field_name, field_value] : oldFields) {
    if (field_name.data() == EMPTY) break;
    auto entry = DB->index_.find(field_name.ToString());
    if (entry == DB->index_.end()) continue;
    auto [index_status, parent_req] = entry->second;
    if (index_status == IndexStatus::Creating ||
        index_status == IndexStatus::Deleting) {
      parent_req->PendReq(this->parent);
      return;
    }
    if (index_status == IndexStatus::Exist) {
      HasOldIndex = true;
    }
  }

  // Serialize the new fields once; the same bytes go to the KV batch and,
  // when needed, to the meta batch.
  std::string serialized = SerializeValue(SliceFields);
  KVBatch.Put(Slice(Key), Slice(serialized));

  // 2. A conflict-free put that touches an index also writes a meta record;
  //    the KV pair is simply encoded and written to the meta batch.
  if (!HasIndex && !HasOldIndex) {
    return;
  }
  std::string MetaKey, MetaValue;
  MetaKV MKV = MetaKV(Key, serialized);
  MKV.TransPut(MetaKey, MetaValue);
  // NOTE(review): TransPut fills MetaValue, yet the serialized fields are what
  // gets stored here (as in the original code) - confirm this is intended.
  MetaBatch.Put(MetaKey, serialized);

  // 3.1 Remove index entries of old fields that carry an index.
  if (HasOldIndex) {
    for (auto &[field_name, field_value] : oldFields) {
      if (field_name.data() == EMPTY) continue;
      // Only (name, value) pairs present in the old data but absent from the
      // new data lose their index entry.
      if (DB->index_.count(field_name.ToString()) &&
          std::find(SliceFields.begin(), SliceFields.end(),
                    std::make_pair(field_name, field_value)) ==
              SliceFields.end()) {
        std::string indexKey;
        AppendIndexKey(&indexKey,
                       ParsedInternalIndexKey(Key, field_name, field_value));
        IndexBatch.Delete(indexKey);
      }
    }
  }

  // 3.2 Add index entries for new fields that carry an index.
  if (HasIndex) {
    for (auto &[field_name, field_value] : SliceFields) {
      if (field_name.data() == EMPTY) continue;
      if (DB->index_.count(field_name.ToString())) {
        std::string indexKey;
        AppendIndexKey(&indexKey,
                       ParsedInternalIndexKey(Key, field_name, field_value));
        IndexBatch.Put(indexKey, Slice());
      }
    }
  }
}
/*******DeleteReq*******/
/**
 * Translates this delete of `Key` into entries of the KV, index and meta
 * write batches.
 *
 * Steps:
 *   1. If `Key` is already in `batchKeySet` the request is skipped; otherwise
 *      the key is recorded.
 *   2. The current value is read; if the key does not exist there is nothing
 *      to delete and the function returns.
 *   3. If any stored field has an index whose status is Creating or Deleting,
 *      this request's parent is pended on the request that owns that index
 *      and the function returns WITHOUT adding anything to the batches.
 *   4. Otherwise the delete is queued into `KVBatch`. When a stored field has
 *      an existing index, a meta delete record is queued and the matching
 *      index entries are deleted.
 *
 * The KV delete is queued exactly once and only after the conflict check, so
 * a pended request never leaks a delete into the batch being built.
 *
 * The caller must hold DB->index_mu (asserted below).
 *
 * @param KVBatch      receives the delete of `Key`.
 * @param IndexBatch   receives the deletions of index entries.
 * @param MetaBatch    receives the meta record when an index is involved.
 * @param DB           database whose kvDB_ and index_ table are consulted.
 * @param batchKeySet  keys already handled while building the current batch.
 */
void DeleteReq::ConstructBatch(WriteBatch &KVBatch, WriteBatch &IndexBatch,
                               WriteBatch &MetaBatch, fielddb::FieldDB *DB,
                               SliceHashSet &batchKeySet)
{
  // Merged concurrent put/delete requests handle each key only once.
  if (batchKeySet.find(Key) != batchKeySet.end()) {
    return;
  }
  batchKeySet.insert(Key);

  // Read the latest value stored under Key. A local status is used on
  // purpose, as in the original code.
  std::string val_str;
  Status getStatus = DB->kvDB_->Get(ReadOptions(), Key, &val_str);
  if (getStatus.IsNotFound()) return;

  // Decode the stored fields; the slices point into val_str.
  FieldSliceArray Fields;
  Slice nameSlice, valSlice;
  Slice Value(val_str);
  while (GetLengthPrefixedSlice(&Value, &nameSlice)) {
    if (GetLengthPrefixedSlice(&Value, &valSlice)) {
      Fields.push_back({nameSlice, valSlice});
    } else {
      std::cout << "name and val not match! From FieldsReq Init" << std::endl;
    }
    nameSlice.clear();
    valSlice.clear();
  }

  bool HasIndex = false;

  // The index status table is accessed under index_mu, owned by the caller.
  DB->index_mu.AssertHeld();

  // 1. Pend a delete that conflicts with an index being created or deleted.
  for (auto &[field_name, field_value] : Fields) {
    if (field_name.data() == EMPTY) break;
    auto entry = DB->index_.find(field_name.ToString());
    if (entry == DB->index_.end()) continue;
    auto [index_status, parent_req] = entry->second;
    if (index_status == IndexStatus::Creating ||
        index_status == IndexStatus::Deleting) {
      parent_req->PendReq(this->parent);
      return;
    }
    if (index_status == IndexStatus::Exist) {
      HasIndex = true;
    }
  }

  // No conflict: queue the delete of the record itself.
  KVBatch.Delete(Slice(Key));

  // 2. A conflict-free delete that touches an index also writes a meta record.
  if (!HasIndex) {
    return;
  }
  std::string MetaKey;
  MetaKV MKV = MetaKV(Key);
  MKV.TransDelete(MetaKey);  // a delete record in meta needs no value
  MetaBatch.Put(MetaKey, Slice());

  // 3. Delete the index entries of every stored field that carries an index.
  for (auto &[field_name, field_value] : Fields) {
    if (field_name.data() == EMPTY) continue;
    if (DB->index_.count(field_name.ToString())) {
      std::string indexKey;
      AppendIndexKey(&indexKey,
                     ParsedInternalIndexKey(Key, field_name, field_value));
      IndexBatch.Delete(indexKey);
    }
  }
}
/*******iCreateReq*******/
/**
 * Registers this index creation in the index status table, which also
 * prevents creating the same index twice.
 *
 * - Field not in the table: mark it Creating, owned by this request.
 * - Field already Exist: report success immediately (done, Existed, OK).
 * - Field being created or deleted: pend this request's parent on the request
 *   that owns the entry.
 *
 * The caller must hold DB->index_mu (asserted below).
 */
void iCreateReq::Prepare(FieldDB *DB) {
  DB->index_mu.AssertHeld();

  const auto fieldName = Field.ToString();
  auto entry = DB->index_.find(fieldName);

  if (entry == DB->index_.end()) {
    // Not in the table yet: the index has not been created, so record the
    // new state. done = true means the work that must happen in the task
    // queue is finished and no pending is required.
    DB->index_[fieldName] = {IndexStatus::Creating, this};
    done = true;
    return;
  }

  auto [istatus, owner] = entry->second;
  if (istatus != IndexStatus::Exist) {
    // The index is being created or deleted: wait on the owning request.
    owner->PendReq(this->parent);
    return;
  }

  // The index has already been built: report success.
  done = true;
  Existed = true;
  s = Status::OK();
}
// Parks `req` on this index-creation request: its parent is pointed at this
// request and it is appended to pending_list, which Finalize later re-queues.
void iCreateReq::PendReq(Request *req) {
req->parent = this;
pending_list.push_back(req);
}
/**
 * Fills `IndexBatch` with one index entry per pair returned by
 * DB->FindKeysAndValByFieldName for `Field`.
 *
 * Each entry's key is built by AppendIndexKey from (pair.first, Field,
 * pair.second) and its value is an empty Slice. The other batches and
 * `batchKeySet` are not touched. Per the original note, index creation goes
 * through a single index write batch and does not interact with the meta DB.
 */
void iCreateReq::ConstructBatch(WriteBatch &KVBatch, WriteBatch &IndexBatch,
                                WriteBatch &MetaBatch, fielddb::FieldDB *DB,
                                SliceHashSet &batchKeySet)
{
  // presumably (user key, field value) for every record that carries Field -
  // verify against FieldDB::FindKeysAndValByFieldName
  std::vector<std::pair<std::string, std::string>> matches =
      DB->FindKeysAndValByFieldName(Field.ToString());

  const Slice emptyValue;
  for (auto &[userKey, fieldValue] : matches) {
    std::string encodedKey;
    AppendIndexKey(&encodedKey,
                   ParsedInternalIndexKey(userKey, Field, fieldValue));
    IndexBatch.Put(encodedKey, emptyValue);
  }
}
/**
 * Completes an index creation after its batch has been written.
 *
 *   1. Marks the field's index as Exist in the index status table.
 *   2. Re-queues every request that was pended on this one onto
 *      DB->taskqueue_, resetting each request's parent to itself, and
 *      signals the first of them if it is now at the head of the queue.
 *
 * index_mu is held only for the table update. The guard is scoped so the
 * mutex is released exactly once; pairing a MutexLock with a manual Unlock()
 * would unlock it a second time when the guard is destroyed.
 */
void iCreateReq::Finalize(FieldDB *DB) {
  // 1. Update the index status table.
  {
    MutexLock iL(&DB->index_mu);
    DB->index_[Field.ToString()] = {IndexStatus::Exist, nullptr};
  }

  if (pending_list.empty()) return;

  // 2. Put every pended request back into the task queue.
  MutexLock L(&DB->mutex_);
  for (auto req : pending_list) {
    DB->taskqueue_.push_back(req);
    req->parent = req;  // unbind from this request
  }
  // Wake the first re-queued request only if it became the queue head.
  if (pending_list[0] == DB->taskqueue_.front()) {
    pending_list[0]->cond_.Signal();
  }
  this->s = Status::OK();
}
/*******iDeleteReq*******/
/**
 * Registers this index deletion in the index status table.
 *
 * - Field not in the table: nothing to delete, report success immediately
 *   (done, Deleted, OK).
 * - Field Exist: mark it Deleting, owned by this request.
 * - Field being created or deleted: pend this request's parent on the request
 *   that owns the entry.
 *
 * The caller must hold DB->index_mu (asserted below).
 */
void iDeleteReq::Prepare(FieldDB *DB) {
  DB->index_mu.AssertHeld();

  const auto fieldName = Field.ToString();
  auto entry = DB->index_.find(fieldName);

  if (entry == DB->index_.end()) {
    done = true;
    Deleted = true;
    s = Status::OK();
    return;
  }

  auto [istatus, owner] = entry->second;
  if (istatus != IndexStatus::Exist) {
    // The index is being created or deleted: pend on the owning request.
    owner->PendReq(this->parent);
    return;
  }

  DB->index_[fieldName] = {IndexStatus::Deleting, this};
  done = true;
}
// Parks `req` on this index-deletion request: its parent is pointed at this
// request and it is appended to pending_list, which Finalize later re-queues.
void iDeleteReq::PendReq(Request* req) {
req->parent = this;
pending_list.push_back(req);
}
/**
 * Fills `IndexBatch` with one deletion per pair returned by
 * DB->FindKeysAndValByFieldName for `Field`.
 *
 * Each deleted key is built by AppendIndexKey from (pair.first, Field,
 * pair.second). The other batches and `batchKeySet` are not touched.
 */
void iDeleteReq::ConstructBatch(WriteBatch &KVBatch, WriteBatch &IndexBatch,
                                WriteBatch &MetaBatch, fielddb::FieldDB *DB,
                                SliceHashSet &batchKeySet)
{
  // presumably (user key, field value) for every record that carries Field -
  // verify against FieldDB::FindKeysAndValByFieldName
  std::vector<std::pair<std::string, std::string>> matches =
      DB->FindKeysAndValByFieldName(Field);

  for (auto &[userKey, fieldValue] : matches) {
    std::string encodedKey;
    AppendIndexKey(&encodedKey,
                   ParsedInternalIndexKey(userKey, Field, fieldValue));
    IndexBatch.Delete(encodedKey);
  }
}
/**
 * Completes an index deletion after its batch has been written.
 *
 *   1. Erases the field from the index status table.
 *   2. Re-queues every request that was pended on this one onto
 *      DB->taskqueue_, resetting each request's parent to itself, and
 *      signals the first of them if it is now at the head of the queue.
 *
 * index_mu is held only for the table update. The guard is scoped so the
 * mutex is released exactly once; pairing a MutexLock with a manual Unlock()
 * would unlock it a second time when the guard is destroyed.
 */
void iDeleteReq::Finalize(FieldDB *DB) {
  // 1. Drop the entry from the index status table.
  {
    MutexLock iL(&DB->index_mu);
    DB->index_.erase(Field.ToString());
  }

  if (pending_list.empty()) return;

  // 2. Put every pended request back into the task queue.
  MutexLock L(&DB->mutex_);
  for (auto req : pending_list) {
    DB->taskqueue_.push_back(req);
    req->parent = req;  // unbind from this request
  }
  // Wake the first re-queued request only if it became the queue head.
  if (pending_list[0] == DB->taskqueue_.front()) {
    pending_list[0]->cond_.Signal();
  }
  this->s = Status::OK();
}
/**
 * Builds a batch request by walking `Batch` and creating one sub-request per
 * entry: a FieldsReq for every Put and a DeleteReq for every Delete, in
 * iteration order. Each sub-request's parent is set to this BatchReq.
 *
 * Every value in the WriteBatch is assumed to carry fields.
 *
 * NOTE(review): the key/value slices handed to the sub-requests come straight
 * from WriteBatch::Iterate - confirm that the sub-requests copy them or that
 * `Batch` outlives the sub-requests.
 */
BatchReq::BatchReq(WriteBatch *Batch, port::Mutex *mu)
    : Request(BatchReq_t, mu), Batch(Batch) {
  // Adapter that turns WriteBatch entries into sub-requests of `owner_`.
  struct SubRequestCollector : WriteBatch::Handler {
    SubRequestCollector(BatchReq *owner, port::Mutex *mutex,
                        std::deque<Request*> *out)
        : owner_(owner), mu_(mutex), out_(out) {}

    void Put(const Slice &key, const Slice &value) override {
      Request *sub = new FieldsReq(key, value, mu_);
      sub->parent = owner_;
      out_->push_back(sub);
    }

    void Delete(const Slice &key) override {
      Request *sub = new DeleteReq(key, mu_);
      sub->parent = owner_;
      out_->push_back(sub);
    }

    BatchReq *owner_;
    port::Mutex *mu_;
    std::deque<Request*> *out_;
  };

  SubRequestCollector collector(this, mu, &sub_requests);
  Batch->Iterate(&collector);
}
// Destroys every sub-request created by the constructor (front to back) and
// leaves sub_requests empty.
BatchReq::~BatchReq() {
  for (Request *sub : sub_requests) {
    delete sub;
  }
  sub_requests.clear();
}
/**
 * Builds the batches of all sub-requests into private staging batches and,
 * if none of them conflicts, appends the staged content to the caller's
 * batches and merges the handled keys into `batchKeySet`.
 *
 * If any sub-request pends (which makes this BatchReq pending), the function
 * returns immediately and the staged content is discarded, so the caller's
 * batches stay untouched.
 */
void BatchReq::ConstructBatch(WriteBatch &KVBatch, WriteBatch &IndexBatch,
                              WriteBatch &MetaBatch, fielddb::FieldDB *DB,
                              SliceHashSet &batchKeySet)
{
  WriteBatch stagedKV, stagedIndex, stagedMeta;
  SliceHashSet stagedKeys;

  // A batch is ordered, and the per-key rule is "only the first request seen
  // for a key is processed", so the sub-requests are visited in reverse.
  auto cursor = sub_requests.rbegin();
  const auto last = sub_requests.rend();
  while (cursor != last) {
    (*cursor)->ConstructBatch(stagedKV, stagedIndex, stagedMeta, DB, stagedKeys);
    // Sub-requests pass this->parent to PendReq, i.e. this BatchReq, so
    // checking whether the BatchReq is pending tells us whether a
    // sub-request ran into a conflict.
    if (isPending()) {
      return;
    }
    ++cursor;
  }

  // A staged batch is appended only when it is larger than 12 bytes
  // (presumably the size of an empty WriteBatch header - verify against
  // leveldb's write_batch.cc).
  const auto appendIfNotEmpty = [](WriteBatch &target, WriteBatch &staged) {
    if (staged.ApproximateSize() > 12) {
      target.Append(staged);
    }
  };
  appendIfNotEmpty(KVBatch, stagedKV);
  appendIfNotEmpty(IndexBatch, stagedIndex);
  appendIfNotEmpty(MetaBatch, stagedMeta);

  batchKeySet.insert(stagedKeys.begin(), stagedKeys.end());
}
/**
 * Takes a combined snapshot of kvDB_ and indexDB_, unless an index is being
 * created or deleted.
 *
 * If any entry of the index status table is Creating or Deleting, this
 * request is pended on the request owning that entry and no snapshot is
 * taken. Otherwise xSnapshot is allocated from both databases' snapshots and
 * done is set.
 *
 * The caller must hold DB->index_mu (asserted below).
 */
void SnapshotReq::Prepare(FieldDB *DB) {
  DB->index_mu.AssertHeld();

  for (const auto &entry : DB->index_) {
    const auto &[status, req] = entry.second;
    const bool inFlight = status == IndexStatus::Creating ||
                          status == IndexStatus::Deleting;
    if (inFlight) {
      req->PendReq(this);
      return;
    }
  }

  xSnapshot = new XSnapshot(DB->kvDB_->GetSnapshot(),
                            DB->indexDB_->GetSnapshot());
  done = true;
}
} // namespace fielddb