Переглянути джерело

Merge branch 'cyq' into ld

pull/2/head
augurier 8 місяці тому
джерело
коміт
d31a28257d
8 змінених файлів з 122 додано та 87 видалено
  1. +15
    -18
      fielddb/field_db.cpp
  2. +3
    -1
      fielddb/field_db.h
  3. +2
    -2
      fielddb/meta.cpp
  4. +3
    -3
      fielddb/meta.h
  5. +42
    -44
      fielddb/request.cpp
  6. +31
    -12
      fielddb/request.h
  7. +20
    -4
      util/serialize_value.cc
  8. +6
    -3
      util/serialize_value.h

+ 15
- 18
fielddb/field_db.cpp Переглянути файл

@ -2,13 +2,10 @@
#include <climits> #include <climits>
#include <cstdint> #include <cstdint>
#include <cstdio> #include <cstdio>
#include <iostream>
#include <string> #include <string>
#include <sys/types.h> #include <sys/types.h>
#include <utility> #include <utility>
#include <vector> #include <vector>
#include "leveldb/c.h"
#include "leveldb/cache.h"
#include "leveldb/db.h" #include "leveldb/db.h"
#include "leveldb/env.h" #include "leveldb/env.h"
#include "leveldb/iterator.h" #include "leveldb/iterator.h"
@ -16,7 +13,6 @@
#include "leveldb/slice.h" #include "leveldb/slice.h"
#include "leveldb/status.h" #include "leveldb/status.h"
#include "leveldb/write_batch.h" #include "leveldb/write_batch.h"
#include "db/write_batch_internal.h"
#include "util/coding.h" #include "util/coding.h"
#include "util/mutexlock.h" #include "util/mutexlock.h"
#include "util/serialize_value.h" #include "util/serialize_value.h"
@ -226,7 +222,7 @@ Status FieldDB::HandleRequest(Request &req, const WriteOptions &op) {
// 这里把一个空串作为常规put的name // 这里把一个空串作为常规put的name
Status FieldDB::Put(const WriteOptions &options, const Slice &key, const Slice &value) { Status FieldDB::Put(const WriteOptions &options, const Slice &key, const Slice &value) {
FieldArray FA = {{"",value.ToString()}};
FieldArray FA = {{EMPTY,value.ToString()}};
return PutFields(options, key, FA); return PutFields(options, key, FA);
// return kvDB_->Put(options, key, value); // return kvDB_->Put(options, key, value);
} }
@ -234,10 +230,10 @@ Status FieldDB::Put(const WriteOptions &options, const Slice &key, const Slice &
// 需要对是否进行index更新做处理 // 需要对是否进行index更新做处理
Status FieldDB::PutFields(const WriteOptions &Options, Status FieldDB::PutFields(const WriteOptions &Options,
const Slice &key, const FieldArray &fields) { const Slice &key, const FieldArray &fields) {
std::string key_ = key.ToString();
FieldArray fields_ = fields;
// std::string key_ = key.ToString();
// FieldArray fields_ = fields;
FieldsReq req(&key_,&fields_,&mutex_);
FieldsReq req(key,fields,&mutex_);
Status status = HandleRequest(req, Options); Status status = HandleRequest(req, Options);
return status; return status;
@ -246,8 +242,8 @@ Status FieldDB::PutFields(const WriteOptions &Options,
// 删除有索引的key时indexdb也要同步 // 删除有索引的key时indexdb也要同步
Status FieldDB::Delete(const WriteOptions &options, const Slice &key) { Status FieldDB::Delete(const WriteOptions &options, const Slice &key) {
std::string key_ = key.ToString();
DeleteReq req(&key_,&mutex_);
// std::string key_ = key.ToString();
DeleteReq req(key,&mutex_);
Status status = HandleRequest(req, options); Status status = HandleRequest(req, options);
return status; return status;
} }
@ -289,15 +285,15 @@ std::vector FieldDB::FindKeysByField(Field &field) {
} }
std::vector<std::pair<std::string, std::string>> FieldDB::FindKeysAndValByFieldName ( std::vector<std::pair<std::string, std::string>> FieldDB::FindKeysAndValByFieldName (
const std::string &fieldName){
const Slice fieldName){
std::vector<std::pair<std::string, std::string>> result; std::vector<std::pair<std::string, std::string>> result;
auto iter = kvDB_->NewIterator(ReadOptions()); auto iter = kvDB_->NewIterator(ReadOptions());
std::string val;
Slice val;
for(iter->SeekToFirst();iter->Valid();iter->Next()) { for(iter->SeekToFirst();iter->Valid();iter->Next()) {
InternalFieldArray fields(iter->value()); InternalFieldArray fields(iter->value());
val = fields.ValOfName(fieldName);
val = fields.ValOfName(fieldName.ToString());
if(!val.empty()) { if(!val.empty()) {
result.push_back(std::make_pair(iter->key().ToString(), val));
result.push_back(std::make_pair(iter->key().ToString(), val.ToString()));
} }
} }
delete iter; delete iter;
@ -305,8 +301,9 @@ std::vector> FieldDB::FindKeysAndValByFieldN
} }
Status FieldDB::CreateIndexOnField(const std::string& field_name, const WriteOptions &op) { Status FieldDB::CreateIndexOnField(const std::string& field_name, const WriteOptions &op) {
std::string Field = field_name;
iCreateReq req(&Field,&mutex_);
// std::string Field = field_name;
// iCreateReq req(&Field,&mutex_);
iCreateReq req(field_name,&mutex_);
HandleRequest(req, op); HandleRequest(req, op);
//如果已经存在索引,那么直接返回 //如果已经存在索引,那么直接返回
if(req.Existed) { if(req.Existed) {
@ -321,8 +318,8 @@ Status FieldDB::CreateIndexOnField(const std::string& field_name, const WriteOpt
} }
Status FieldDB::DeleteIndex(const std::string &field_name, const WriteOptions &op) { Status FieldDB::DeleteIndex(const std::string &field_name, const WriteOptions &op) {
std::string Field = field_name;
iDeleteReq req(&Field,&mutex_);
// std::string Field = field_name;
iDeleteReq req(field_name,&mutex_);
HandleRequest(req, op); HandleRequest(req, op);
//如果已经被删除或者不存在,那么可以直接返回 //如果已经被删除或者不存在,那么可以直接返回
if(req.Deleted) { if(req.Deleted) {

+ 3
- 1
fielddb/field_db.h Переглянути файл

@ -18,6 +18,8 @@
namespace fielddb { namespace fielddb {
using namespace leveldb; using namespace leveldb;
const char EMPTY[1] = {0};
enum IndexStatus{ enum IndexStatus{
Creating, Creating,
Deleting, Deleting,
@ -83,7 +85,7 @@ private:
std::deque<Request *> taskqueue_; std::deque<Request *> taskqueue_;
std::vector<std::pair<std::string, std::string>> FindKeysAndValByFieldName ( std::vector<std::pair<std::string, std::string>> FindKeysAndValByFieldName (
const std::string &fieldName);
const Slice fieldName);
/*For request handling*/ /*For request handling*/
Status HandleRequest(Request &req, const WriteOptions &op); // Status HandleRequest(Request &req, const WriteOptions &op); //

+ 2
- 2
fielddb/meta.cpp Переглянути файл

@ -29,7 +29,7 @@ void MetaKV::TransPut(std::string &MetaKey,std::string &MetaValue) {
//但是slice中的指针指向的是析构的string对象的部分内存 //但是slice中的指针指向的是析构的string对象的部分内存
std::string &buf = MetaKey; std::string &buf = MetaKey;
PutFixed32(&buf, KV_Creating); PutFixed32(&buf, KV_Creating);
PutLengthPrefixedSlice(&buf, Slice(*name));
PutLengthPrefixedSlice(&buf, Slice(name));
// MetaKey = Slice(buf); // MetaKey = Slice(buf);
// MetaValue = Slice(*value); // MetaValue = Slice(*value);
} }
@ -38,7 +38,7 @@ void MetaKV::TransDelete(std::string &MetaKey) {
MetaKey.clear(); MetaKey.clear();
std::string &buf = MetaKey; std::string &buf = MetaKey;
PutFixed32(&buf, KV_Deleting); PutFixed32(&buf, KV_Deleting);
PutLengthPrefixedSlice(&buf, Slice(*name));
PutLengthPrefixedSlice(&buf, Slice(name));
// MetaKey = Slice(buf); // MetaKey = Slice(buf);
} }

+ 3
- 3
fielddb/meta.h Переглянути файл

@ -35,13 +35,13 @@ enum MetaType {
//(field_name,field_value)metaDB中的KV表示 //(field_name,field_value)metaDB中的KV表示
class MetaKV { class MetaKV {
public: public:
MetaKV(std::string *field_name,std::string *field_value = nullptr):
MetaKV(Slice field_name,Slice field_value = Slice()):
name(field_name),value(field_value) { } name(field_name),value(field_value) { }
void TransPut(std::string &MetaKey,std::string &MetaValue); void TransPut(std::string &MetaKey,std::string &MetaValue);
void TransDelete(std::string &MetaKey); void TransDelete(std::string &MetaKey);
private: private:
std::string *name;
std::string *value;
Slice name;
Slice value;
}; };
class MetaCleaner { class MetaCleaner {

+ 42
- 44
fielddb/request.cpp Переглянути файл

@ -17,8 +17,6 @@
namespace fielddb { namespace fielddb {
using namespace leveldb; using namespace leveldb;
const char EMPTY[1] = {0};
//为虚函数提供最基本的实现 //为虚函数提供最基本的实现
void Request::PendReq(Request *req) { void Request::PendReq(Request *req) {
assert(0); assert(0);
@ -52,15 +50,15 @@ void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB, WriteBatch &MetaBatch,fielddb::FieldDB *DB,
std::unordered_set<std::string> &batchKeySet) std::unordered_set<std::string> &batchKeySet)
{ {
if (batchKeySet.find(*Key) != batchKeySet.end()){
if (batchKeySet.find(Key.ToString()) != batchKeySet.end()){
return;//并发的被合并的put/delete请求只处理一次 return;//并发的被合并的put/delete请求只处理一次
} else { } else {
batchKeySet.insert(*Key);
batchKeySet.insert(Key.ToString());
} }
std::string val_str; std::string val_str;
Status s = Status::NotFound("test"); Status s = Status::NotFound("test");
uint64_t start_ = DB->env_->NowMicros(); uint64_t start_ = DB->env_->NowMicros();
s = DB->kvDB_->Get(ReadOptions(), *Key, &val_str);
s = DB->kvDB_->Get(ReadOptions(), Key.ToString(), &val_str);
DB->construct_FieldsReq_Read_elapsed += DB->env_->NowMicros() - start_; DB->construct_FieldsReq_Read_elapsed += DB->env_->NowMicros() - start_;
FieldArray *oldFields; FieldArray *oldFields;
if (s.IsNotFound()){ if (s.IsNotFound()){
@ -78,10 +76,10 @@ void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
// MutexLock L(&DB->index_mu); //互斥访问索引状态表 // MutexLock L(&DB->index_mu); //互斥访问索引状态表
DB->index_mu.AssertHeld(); DB->index_mu.AssertHeld();
//1.将存在冲突的put pend到对应的请求 //1.将存在冲突的put pend到对应的请求
for(auto [field_name,field_value] : *Fields) {
for(auto [field_name,field_value] : SliceFields) {
if(field_name == EMPTY) break; if(field_name == EMPTY) break;
if(DB->index_.count(field_name)) {
auto [index_status,parent_req] = DB->index_[field_name];
if(DB->index_.count(field_name.ToString())) {
auto [index_status,parent_req] = DB->index_[field_name.ToString()];
if(index_status == IndexStatus::Creating || index_status == IndexStatus::Deleting) { if(index_status == IndexStatus::Creating || index_status == IndexStatus::Deleting) {
parent_req->PendReq(this->parent); parent_req->PendReq(this->parent);
return; return;
@ -107,13 +105,13 @@ void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
} }
} }
} }
KVBatch.Put(Slice(*Key), Slice(SerializeValue(*Fields)));
std::string scrach = SerializeValue(SliceFields);
KVBatch.Put(Slice(Key), Slice(scrach));
//2.对于没有冲突但含有索引操作的put,构建metaKV,这里直接将KV对简单编码后写入metaDB //2.对于没有冲突但含有索引操作的put,构建metaKV,这里直接将KV对简单编码后写入metaDB
if(HasIndex || HasOldIndex) { if(HasIndex || HasOldIndex) {
std::string MetaKey,MetaValue; std::string MetaKey,MetaValue;
std::string serialized = SerializeValue(*Fields);
MetaKV MKV = MetaKV(Key,&serialized);
std::string serialized = SerializeValue(SliceFields);
MetaKV MKV = MetaKV(Key,serialized);
MKV.TransPut(MetaKey, MetaValue); MKV.TransPut(MetaKey, MetaValue);
MetaBatch.Put(MetaKey, serialized); MetaBatch.Put(MetaKey, serialized);
@ -125,7 +123,7 @@ void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
if(DB->index_.count(field_name)) { if(DB->index_.count(field_name)) {
std::string indexKey; std::string indexKey;
AppendIndexKey(&indexKey, ParsedInternalIndexKey( AppendIndexKey(&indexKey, ParsedInternalIndexKey(
*Key,field_name,field_value));
Key,field_name,field_value));
IndexBatch.Delete(indexKey); IndexBatch.Delete(indexKey);
} }
} }
@ -133,12 +131,12 @@ void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
//3.2对于含有索引的field建立索引 //3.2对于含有索引的field建立索引
if (HasIndex) { if (HasIndex) {
for(auto [field_name,field_value] : *Fields) {
for(auto [field_name,field_value] : SliceFields) {
if(field_name == EMPTY) continue; if(field_name == EMPTY) continue;
if(DB->index_.count(field_name)) {
if(DB->index_.count(field_name.ToString())) {
std::string indexKey; std::string indexKey;
AppendIndexKey(&indexKey, ParsedInternalIndexKey( AppendIndexKey(&indexKey, ParsedInternalIndexKey(
*Key,field_name,field_value));
Key,field_name,field_value));
IndexBatch.Put(indexKey, Slice()); IndexBatch.Put(indexKey, Slice());
} }
} }
@ -157,10 +155,10 @@ void DeleteReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB, WriteBatch &MetaBatch,fielddb::FieldDB *DB,
std::unordered_set<std::string> &batchKeySet) std::unordered_set<std::string> &batchKeySet)
{ {
if (batchKeySet.find(*Key) != batchKeySet.end()){
if (batchKeySet.find(Key.ToString()) != batchKeySet.end()){
return;//并发的被合并的put/delete请求只处理一次 return;//并发的被合并的put/delete请求只处理一次
} else { } else {
batchKeySet.insert(*Key);
batchKeySet.insert(Key.ToString());
} }
//1. 读取当前的最新的键值对,判断是否存在含有键值对的field //1. 读取当前的最新的键值对,判断是否存在含有键值对的field
//2.1 如果无,则正常构造delete //2.1 如果无,则正常构造delete
@ -168,11 +166,11 @@ void DeleteReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
//在kvDB和metaDB中写入对应的delete //在kvDB和metaDB中写入对应的delete
//2.3 如果存在field的索引状态是Creating或者Deleting,那么在那个队列上面进行等待 //2.3 如果存在field的索引状态是Creating或者Deleting,那么在那个队列上面进行等待
std::string val_str; std::string val_str;
Status s = DB->kvDB_->Get(ReadOptions(), *Key, &val_str);
Status s = DB->kvDB_->Get(ReadOptions(), Key, &val_str);
if (s.IsNotFound()) return; if (s.IsNotFound()) return;
FieldArray *Fields = new FieldArray; FieldArray *Fields = new FieldArray;
ParseValue(val_str,Fields); ParseValue(val_str,Fields);
KVBatch.Delete(Slice(*Key));
KVBatch.Delete(Slice(Key));
bool HasIndex = false; bool HasIndex = false;
{ {
// MutexLock L(&DB->index_mu); //互斥访问索引状态表 // MutexLock L(&DB->index_mu); //互斥访问索引状态表
@ -191,7 +189,7 @@ void DeleteReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
//assert(0); //assert(0);
} }
} }
KVBatch.Delete(Slice(*Key));
KVBatch.Delete(Slice(Key));
//2.对于没有冲突但含有索引操作的delete,构建metaKV,这里直接将KV对简单编码后写入metaDB //2.对于没有冲突但含有索引操作的delete,构建metaKV,这里直接将KV对简单编码后写入metaDB
if(HasIndex) { if(HasIndex) {
std::string MetaKey; std::string MetaKey;
@ -204,7 +202,7 @@ void DeleteReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
if(DB->index_.count(field_name)) { if(DB->index_.count(field_name)) {
std::string indexKey; std::string indexKey;
AppendIndexKey(&indexKey, ParsedInternalIndexKey( AppendIndexKey(&indexKey, ParsedInternalIndexKey(
*Key,field_name,field_value));
Key,field_name,field_value));
IndexBatch.Delete(indexKey); IndexBatch.Delete(indexKey);
} }
} }
@ -217,8 +215,8 @@ void DeleteReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
void iCreateReq::Prepare(FieldDB *DB) { void iCreateReq::Prepare(FieldDB *DB) {
//在index_中完成索引状态更新,在这里可以避免重复创建 //在index_中完成索引状态更新,在这里可以避免重复创建
DB->index_mu.AssertHeld(); DB->index_mu.AssertHeld();
if(DB->index_.count(*Field)) {
auto [istatus,parent] = DB->index_[*Field];
if(DB->index_.count(Field.ToString())) {
auto [istatus,parent] = DB->index_[Field.ToString()];
if(istatus == IndexStatus::Exist) { if(istatus == IndexStatus::Exist) {
//如果已经完成建立索引,则返回成功 //如果已经完成建立索引,则返回成功
done = true; done = true;
@ -232,7 +230,7 @@ void iCreateReq::Prepare(FieldDB *DB) {
} }
//如果索引状态表中没有,则表示尚未创建,更新相应的状态 //如果索引状态表中没有,则表示尚未创建,更新相应的状态
//这里将done设置为true表示在taskqueue中需要完成的部分已经完成,不需要pend //这里将done设置为true表示在taskqueue中需要完成的部分已经完成,不需要pend
DB->index_[*Field] = {IndexStatus::Creating,this};
DB->index_[Field.ToString()] = {IndexStatus::Creating,this};
done = true; done = true;
} }
@ -248,12 +246,12 @@ void iCreateReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
//遍历数据库,构建二级索引到indexbatch,(更新metaDB中的元数据为Index类型的(Field,Creating)) //遍历数据库,构建二级索引到indexbatch,(更新metaDB中的元数据为Index类型的(Field,Creating))
//一个indexwritebatch写入,那么索引创建删除应该和metadb没有交互 //一个indexwritebatch写入,那么索引创建删除应该和metadb没有交互
std::vector<std::pair<std::string, std::string>> keysAndVal = std::vector<std::pair<std::string, std::string>> keysAndVal =
DB->FindKeysAndValByFieldName(*Field);
DB->FindKeysAndValByFieldName(Field.ToString());
Slice value = Slice(); Slice value = Slice();
for (auto &kvPair : keysAndVal){ for (auto &kvPair : keysAndVal){
std::string indexKey; std::string indexKey;
AppendIndexKey(&indexKey, AppendIndexKey(&indexKey,
ParsedInternalIndexKey(kvPair.first, *Field, kvPair.second));
ParsedInternalIndexKey(kvPair.first, Field, kvPair.second));
IndexBatch.Put(indexKey, value); IndexBatch.Put(indexKey, value);
} }
} }
@ -261,7 +259,7 @@ void iCreateReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
void iCreateReq::Finalize(FieldDB *DB) { void iCreateReq::Finalize(FieldDB *DB) {
//1. 写入完成后,更新index状态表,(并将metaDB的值改为Index类型的(Field,Existing)) //1. 写入完成后,更新index状态表,(并将metaDB的值改为Index类型的(Field,Existing))
MutexLock iL(&DB->index_mu); MutexLock iL(&DB->index_mu);
DB->index_[*Field] = {IndexStatus::Exist, nullptr};
DB->index_[Field.ToString()] = {IndexStatus::Exist, nullptr};
DB->index_mu.Unlock(); DB->index_mu.Unlock();
if (pending_list.empty()) return; if (pending_list.empty()) return;
@ -280,15 +278,15 @@ void iCreateReq::Finalize(FieldDB *DB) {
/*******iDeleteReq*******/ /*******iDeleteReq*******/
void iDeleteReq::Prepare(FieldDB *DB) { void iDeleteReq::Prepare(FieldDB *DB) {
DB->index_mu.AssertHeld(); DB->index_mu.AssertHeld();
if(DB->index_.count(*Field) == 0) {
if(DB->index_.count(Field.ToString()) == 0) {
done = true; done = true;
Deleted = true; Deleted = true;
s = Status::OK(); s = Status::OK();
return ; return ;
} }
auto [istatus,parent] = DB->index_[*Field];
auto [istatus,parent] = DB->index_[Field.ToString()];
if(istatus == IndexStatus::Exist) { if(istatus == IndexStatus::Exist) {
DB->index_[*Field] = {IndexStatus::Deleting,this};
DB->index_[Field.ToString()] = {IndexStatus::Deleting,this};
done = true; done = true;
} else { } else {
//如果正在创建或者删除,那么pend到对应的请求上 //如果正在创建或者删除,那么pend到对应的请求上
@ -305,19 +303,19 @@ void iDeleteReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set<std::string> &batchKeySet) WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set<std::string> &batchKeySet)
{ {
std::vector<std::pair<std::string, std::string>> keysAndVal = std::vector<std::pair<std::string, std::string>> keysAndVal =
DB->FindKeysAndValByFieldName(*Field);
DB->FindKeysAndValByFieldName(Field);
Slice value = Slice(); Slice value = Slice();
for (auto &kvPair : keysAndVal){ for (auto &kvPair : keysAndVal){
std::string indexKey; std::string indexKey;
AppendIndexKey(&indexKey, AppendIndexKey(&indexKey,
ParsedInternalIndexKey(kvPair.first, *Field, kvPair.second));
ParsedInternalIndexKey(kvPair.first, Field, kvPair.second));
IndexBatch.Delete(indexKey); IndexBatch.Delete(indexKey);
} }
} }
void iDeleteReq::Finalize(FieldDB *DB) { void iDeleteReq::Finalize(FieldDB *DB) {
MutexLock iL(&DB->index_mu); MutexLock iL(&DB->index_mu);
DB->index_.erase(*Field);
DB->index_.erase(Field.ToString());
DB->index_mu.Unlock(); DB->index_mu.Unlock();
if (pending_list.empty()) return; if (pending_list.empty()) return;
@ -339,37 +337,37 @@ BatchReq::BatchReq(WriteBatch *Batch,port::Mutex *mu):
struct BatchHandler : WriteBatch::Handler { struct BatchHandler : WriteBatch::Handler {
void Put(const Slice &key, const Slice &value) override { void Put(const Slice &key, const Slice &value) override {
//为key和value构造存储的地方,防止由于string的析构造成可能得内存访问错误 //为key和value构造存储的地方,防止由于string的析构造成可能得内存访问错误
str_buf->push_back(key.ToString());
fa_buf->push_back({{EMPTY,value.ToString()}});
// str_buf->push_back(key.ToString());
// FieldArray *field = new FieldArray; // FieldArray *field = new FieldArray;
// field = ParseValue(value.ToString(), field); // field = ParseValue(value.ToString(), field);
// if (field->empty()){ //batch中的value没有field // if (field->empty()){ //batch中的value没有field
// fa_buf->push_back({{EMPTY,value.ToString()}});
// } else { // } else {
// fa_buf->push_back(*field); // fa_buf->push_back(*field);
// } // }
sub_requests->emplace_back(new FieldsReq(&str_buf->back(),&fa_buf->back(),mu));
//默认所有WriteBatch中的东西都是有Field的!!!!!
sub_requests->emplace_back(new FieldsReq(key,value,mu));
sub_requests->back()->parent = req; sub_requests->back()->parent = req;
// delete field; // delete field;
} }
void Delete(const Slice &key) override { void Delete(const Slice &key) override {
str_buf->push_back(key.ToString());
sub_requests->emplace_back(new DeleteReq(&str_buf->back(),mu));
// str_buf->push_back(key.ToString());
sub_requests->emplace_back(new DeleteReq(key,mu));
sub_requests->back()->parent = req; sub_requests->back()->parent = req;
} }
BatchReq *req; BatchReq *req;
port::Mutex *mu; port::Mutex *mu;
std::deque<std::string> *str_buf;
std::deque<FieldArray> *fa_buf;
// std::deque<std::string> *str_buf;
// std::deque<FieldArray> *fa_buf;
std::deque<Request*> *sub_requests; std::deque<Request*> *sub_requests;
}; };
BatchHandler Handler; BatchHandler Handler;
Handler.req = this; Handler.req = this;
Handler.mu = mu; Handler.mu = mu;
Handler.str_buf = &str_buf;
Handler.fa_buf = &fa_buf;
// Handler.str_buf = &str_buf;
// Handler.fa_buf = &fa_buf;
Handler.sub_requests = &sub_requests; Handler.sub_requests = &sub_requests;
Batch->Iterate(&Handler); Batch->Iterate(&Handler);

+ 31
- 12
fielddb/request.h Переглянути файл

@ -1,8 +1,10 @@
#include <deque> #include <deque>
#include <string> #include <string>
#include "leveldb/slice.h"
#include "leveldb/status.h" #include "leveldb/status.h"
#include "leveldb/write_batch.h" #include "leveldb/write_batch.h"
#include "port/port_stdcxx.h" #include "port/port_stdcxx.h"
#include "util/coding.h"
#include "util/mutexlock.h" #include "util/mutexlock.h"
#include "util/serialize_value.h" #include "util/serialize_value.h"
#include <unordered_set> #include <unordered_set>
@ -64,14 +66,31 @@ public:
//field的put //field的put
class FieldsReq : public Request { class FieldsReq : public Request {
public: public:
FieldsReq(std::string *Key,FieldArray *Fields,port::Mutex *mu):
Key(Key),Fields(Fields),Request(FieldsReq_t,mu) { };
FieldsReq(Slice Key,const FieldArray &Fields,port::Mutex *mu):
Key(Key),Request(FieldsReq_t,mu) {
for(auto &[name,value] : Fields) {
SliceFields.push_back({name,value});
}
};
FieldsReq(Slice Key, Slice Value,port::Mutex *mu):
Key(Key),Request(FieldsReq_t,mu) {
Slice nameSlice, valSlice;
while(GetLengthPrefixedSlice(&Value, &nameSlice)) {
if(GetLengthPrefixedSlice(&Value, &valSlice)) {
SliceFields.push_back({nameSlice,valSlice});
} else {
std::cout << "name and val not match! From FieldsReq Init" << std::endl;
}
nameSlice.clear(), valSlice.clear();
}
}
void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set<std::string> &batchKeySet) override; WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set<std::string> &batchKeySet) override;
std::string *Key;
FieldArray *Fields;
Slice Key;
FieldSliceArray SliceFields;
}; };
//field的put //field的put
@ -89,7 +108,7 @@ public:
//request //request
class iCreateReq : public Request { class iCreateReq : public Request {
public: public:
iCreateReq(std::string *Field,port::Mutex *mu):
iCreateReq(Slice Field,port::Mutex *mu):
Field(Field),Request(iCreateReq_t, mu),Existed(false) { }; Field(Field),Request(iCreateReq_t, mu),Existed(false) { };
void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
@ -99,14 +118,14 @@ public:
void PendReq(Request *req) override; void PendReq(Request *req) override;
bool Existed; bool Existed;
std::string *Field;
Slice Field;
std::deque<Request *> pending_list; std::deque<Request *> pending_list;
}; };
//request //request
class iDeleteReq : public Request { class iDeleteReq : public Request {
public: public:
iDeleteReq(std::string *Field,port::Mutex *mu):
iDeleteReq(Slice Field,port::Mutex *mu):
Field(Field),Request(iDeleteReq_t, mu),Deleted(false) { }; Field(Field),Request(iDeleteReq_t, mu),Deleted(false) { };
void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
@ -116,20 +135,20 @@ public:
void PendReq(Request *req) override; void PendReq(Request *req) override;
bool Deleted; bool Deleted;
std::string *Field;
Slice Field;
std::deque<Request *> pending_list; std::deque<Request *> pending_list;
}; };
//key的request //key的request
class DeleteReq : public Request { class DeleteReq : public Request {
public: public:
DeleteReq(std::string *Key,port::Mutex *mu):
DeleteReq(Slice Key,port::Mutex *mu):
Key(Key),Request(DeleteReq_t,mu) { }; Key(Key),Request(DeleteReq_t,mu) { };
void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set<std::string> &batchKeySet) override; WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set<std::string> &batchKeySet) override;
std::string *Key;
Slice Key;
}; };
class BatchReq : public Request { class BatchReq : public Request {
@ -142,8 +161,8 @@ public:
WriteBatch *Batch; WriteBatch *Batch;
std::deque<Request *> sub_requests; std::deque<Request *> sub_requests;
std::deque<std::string> str_buf;
std::deque<FieldArray> fa_buf;
// std::deque<std::string> str_buf;
// std::deque<FieldArray> fa_buf;
}; };
} }

+ 20
- 4
util/serialize_value.cc Переглянути файл

@ -3,6 +3,7 @@
#include <string> #include <string>
#include "util/coding.h" #include "util/coding.h"
#include <iostream> #include <iostream>
#include "leveldb/slice.h"
namespace leveldb{ namespace leveldb{
bool compareByFirst(const Field& a, const Field& b) { bool compareByFirst(const Field& a, const Field& b) {
@ -20,6 +21,21 @@ std::string SerializeValue(const FieldArray& fields){
return result; return result;
} }
std::string SerializeValue(const FieldSliceArray& fields) {
using pss = std::pair<Slice,Slice>;
FieldSliceArray sortFields = fields;
std::sort(sortFields.begin(), sortFields.end(),
[&](const pss &lhs, const pss &rhs) {
return lhs.first.compare(rhs.first);
});
std::string result;
for (const pss& pairs : sortFields) {
PutLengthPrefixedSlice(&result, pairs.first);
PutLengthPrefixedSlice(&result, pairs.second);
}
return result;
}
FieldArray *ParseValue(const std::string& value_str,FieldArray *fields){ FieldArray *ParseValue(const std::string& value_str,FieldArray *fields){
Slice valueSlice(value_str); Slice valueSlice(value_str);
// FieldArray *res = new FieldArray; // FieldArray *res = new FieldArray;
@ -75,22 +91,22 @@ bool InternalFieldArray::HasField(const Field& field) {
return std::find(fields.begin(),fields.end(),field) != fields.end(); return std::find(fields.begin(),fields.end(),field) != fields.end();
} }
std::string InternalFieldArray::ValOfName(const std::string &name) {
Slice InternalFieldArray::ValOfName(const std::string &name) {
if(isMapped) { if(isMapped) {
if(map.count(name)) { if(map.count(name)) {
return map[name]; return map[name];
} }
return std::string();
return Slice();
} }
for (auto iter = fields.begin(); iter != fields.end(); iter++){ for (auto iter = fields.begin(); iter != fields.end(); iter++){
if (iter->first == name) { if (iter->first == name) {
return iter->second; return iter->second;
} else if (iter->first > name) { } else if (iter->first > name) {
return std::string();
return Slice();
} }
} }
return std::string();
return Slice();
} }
} }

+ 6
- 3
util/serialize_value.h Переглянути файл

@ -3,6 +3,7 @@
#include <iostream> #include <iostream>
#include <string> #include <string>
#include <utility>
#include <vector> #include <vector>
#include <map> #include <map>
#include "leveldb/slice.h" #include "leveldb/slice.h"
@ -10,13 +11,15 @@
namespace leveldb{ namespace leveldb{
using Field = std::pair<std::string, std::string>; // field_name:field_value using Field = std::pair<std::string, std::string>; // field_name:field_value
using FieldArray = std::vector<std::pair<std::string, std::string>>; using FieldArray = std::vector<std::pair<std::string, std::string>>;
using FieldSliceArray = std::vector<std::pair<Slice, Slice>>;
std::string SerializeValue(const FieldArray& fields); std::string SerializeValue(const FieldArray& fields);
std::string SerializeValue(const FieldSliceArray& fields);
FieldArray *ParseValue(const std::string& value_str, FieldArray *fields); FieldArray *ParseValue(const std::string& value_str, FieldArray *fields);
class InternalFieldArray { class InternalFieldArray {
public: public:
using FieldMap = std::map<std::string,std::string>;
using FieldMap = std::map<std::string,Slice>;
InternalFieldArray(const FieldArray &fields, bool to_map = false): InternalFieldArray(const FieldArray &fields, bool to_map = false):
fields(fields),isMapped(false) { fields(fields),isMapped(false) {
@ -29,7 +32,7 @@ public:
Slice nameSlice, valSlice; Slice nameSlice, valSlice;
while(GetLengthPrefixedSlice(&valueSlice, &nameSlice)) { while(GetLengthPrefixedSlice(&valueSlice, &nameSlice)) {
if(GetLengthPrefixedSlice(&valueSlice, &valSlice)) { if(GetLengthPrefixedSlice(&valueSlice, &valSlice)) {
map[nameSlice.ToString()] = valSlice.ToString();
map[nameSlice.ToString()] = valSlice;
} else { } else {
std::cout << "name and val not match! From InternalFieldArray" << std::endl; std::cout << "name and val not match! From InternalFieldArray" << std::endl;
} }
@ -48,7 +51,7 @@ public:
std::string Serialize(); std::string Serialize();
bool HasField(const Field& field); bool HasField(const Field& field);
std::string ValOfName(const std::string& name);
Slice ValOfName(const std::string& name);
private: private:
bool isMapped; bool isMapped;

Завантаження…
Відмінити
Зберегти