Browse Source

实现字段功能

ckx
jjhengxin 9 months ago
parent
commit
a62b6d2842
11 changed files with 249 additions and 5 deletions
  1. +6
    -1
      CMakeLists.txt
  2. +16
    -0
      db/db_impl.cc
  3. +5
    -0
      db/db_impl.h
  4. +4
    -0
      db/db_test.cc
  5. +31
    -0
      db/write_batch.cc
  6. +8
    -0
      include/leveldb/db.h
  7. +3
    -0
      include/leveldb/write_batch.h
  8. +122
    -0
      test/field_test.cc
  9. +3
    -3
      test/ttl_test.cc
  10. +43
    -0
      util/coding.cc
  11. +8
    -1
      util/coding.h

+ 6
- 1
CMakeLists.txt View File

@ -528,4 +528,9 @@ target_link_libraries(db_test2 PRIVATE leveldb)
add_executable(ttl_test add_executable(ttl_test
"${PROJECT_SOURCE_DIR}/test/ttl_test.cc" "${PROJECT_SOURCE_DIR}/test/ttl_test.cc"
) )
target_link_libraries(ttl_test PRIVATE leveldb gtest)
target_link_libraries(ttl_test PRIVATE leveldb gtest)
add_executable(field_test
"${PROJECT_SOURCE_DIR}/test/field_test.cc"
)
target_link_libraries(field_test PRIVATE leveldb gtest)

+ 16
- 0
db/db_impl.cc View File

@ -11,6 +11,7 @@
#include <set> #include <set>
#include <string> #include <string>
#include <vector> #include <vector>
#include<iostream>
#include "db/builder.h" #include "db/builder.h"
#include "db/db_iter.h" #include "db/db_iter.h"
@ -1193,11 +1194,16 @@ void DBImpl::ReleaseSnapshot(const Snapshot* snapshot) {
snapshots_.Delete(static_cast<const SnapshotImpl*>(snapshot)); snapshots_.Delete(static_cast<const SnapshotImpl*>(snapshot));
} }
// Convenience methods // Convenience methods
Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) { Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) {
return DB::Put(o, key, val); return DB::Put(o, key, val);
} }
Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val, uint64_t ttl) {
return DB::Put(o, key, val, ttl);
} // 实现新的put接口,心
Status DBImpl::Delete(const WriteOptions& options, const Slice& key) { Status DBImpl::Delete(const WriteOptions& options, const Slice& key) {
return DB::Delete(options, key); return DB::Delete(options, key);
} }
@ -1491,12 +1497,22 @@ Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
return Write(opt, &batch); return Write(opt, &batch);
} }
// 假设增加一个新的Put接口,包含TTL参数, 单位(秒)
Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value, uint64_t ttl){
WriteBatch batch;
batch.Put(key, value, ttl);
return Write(opt, &batch);
} // 这里应该是新的PUT接口的真正实现的地方,还是由本来的DB类实现,怪?心
Status DB::Delete(const WriteOptions& opt, const Slice& key) { Status DB::Delete(const WriteOptions& opt, const Slice& key) {
WriteBatch batch; WriteBatch batch;
batch.Delete(key); batch.Delete(key);
return Write(opt, &batch); return Write(opt, &batch);
} }
DB::~DB() = default; DB::~DB() = default;
Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) { Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {

+ 5
- 0
db/db_impl.h View File

@ -38,6 +38,8 @@ class DBImpl : public DB {
// Implementations of the DB interface // Implementations of the DB interface
Status Put(const WriteOptions&, const Slice& key, Status Put(const WriteOptions&, const Slice& key,
const Slice& value) override; const Slice& value) override;
Status Put(const WriteOptions&, const Slice& key,
const Slice& value, uint64_t ttl) override; //put接口
Status Delete(const WriteOptions&, const Slice& key) override; Status Delete(const WriteOptions&, const Slice& key) override;
Status Write(const WriteOptions& options, WriteBatch* updates) override; Status Write(const WriteOptions& options, WriteBatch* updates) override;
Status Get(const ReadOptions& options, const Slice& key, Status Get(const ReadOptions& options, const Slice& key,
@ -50,6 +52,7 @@ class DBImpl : public DB {
void CompactRange(const Slice* begin, const Slice* end) override; void CompactRange(const Slice* begin, const Slice* end) override;
// Extra methods (for testing) that are not in the public DB interface // Extra methods (for testing) that are not in the public DB interface
// Compact any files in the named level that overlap [*begin,*end] // Compact any files in the named level that overlap [*begin,*end]
void TEST_CompactRange(int level, const Slice* begin, const Slice* end); void TEST_CompactRange(int level, const Slice* begin, const Slice* end);
@ -212,6 +215,8 @@ Options SanitizeOptions(const std::string& db,
const InternalFilterPolicy* ipolicy, const InternalFilterPolicy* ipolicy,
const Options& src); const Options& src);
} // namespace leveldb } // namespace leveldb
#endif // STORAGE_LEVELDB_DB_DB_IMPL_H_ #endif // STORAGE_LEVELDB_DB_DB_IMPL_H_

+ 4
- 0
db/db_test.cc View File

@ -2117,6 +2117,10 @@ class ModelDB : public DB {
Status Put(const WriteOptions& o, const Slice& k, const Slice& v) override { Status Put(const WriteOptions& o, const Slice& k, const Slice& v) override {
return DB::Put(o, k, v); return DB::Put(o, k, v);
} }
Status Put(const WriteOptions& o, const Slice& k,
const Slice& v, uint64_t ttl) override {
return DB::Put(o, k, v);
}// 实现的是DB里的新put接口,心
Status Delete(const WriteOptions& o, const Slice& key) override { Status Delete(const WriteOptions& o, const Slice& key) override {
return DB::Delete(o, key); return DB::Delete(o, key);
} }

+ 31
- 0
db/write_batch.cc View File

@ -21,6 +21,9 @@
#include "leveldb/db.h" #include "leveldb/db.h"
#include "util/coding.h" #include "util/coding.h"
#include <sstream> // For std::ostringstream 心
#include <cstdint>
namespace leveldb { namespace leveldb {
// WriteBatch header has an 8-byte sequence number followed by a 4-byte count. // WriteBatch header has an 8-byte sequence number followed by a 4-byte count.
@ -102,6 +105,34 @@ void WriteBatch::Put(const Slice& key, const Slice& value) {
PutLengthPrefixedSlice(&rep_, value); PutLengthPrefixedSlice(&rep_, value);
} }
void WriteBatch::Put(const Slice& key, const Slice& value, uint64_t ttl) {
WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
rep_.push_back(static_cast<char>(kTypeValue));
PutLengthPrefixedSlice(&rep_, key);
// 获取当前时间
auto now = std::chrono::system_clock::now();
// 加上ttl
auto future_time = now + std::chrono::seconds(ttl);
// 转换为 time_t
std::time_t future_time_t = std::chrono::system_clock::to_time_t(future_time);
// 将 time_t 转换为 tm 结构
std::tm* local_tm = std::localtime(&future_time_t);
// 格式化为字符串
char buffer[20]; // 格式化字符串的缓冲区
std::strftime(buffer, sizeof(buffer), "%Y-%m-%d %H:%M:%S", local_tm);
std::string future_time_str(buffer);
// 拼接原本的值和时间字符串
std::string combined_str = value.ToString() + future_time_str;
PutLengthPrefixedSlice(&rep_, Slice(combined_str));
} // 心
void WriteBatch::Delete(const Slice& key) { void WriteBatch::Delete(const Slice& key) {
WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1); WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
rep_.push_back(static_cast<char>(kTypeDeletion)); rep_.push_back(static_cast<char>(kTypeDeletion));

+ 8
- 0
include/leveldb/db.h View File

@ -11,8 +11,16 @@
#include "leveldb/export.h" #include "leveldb/export.h"
#include "leveldb/iterator.h" #include "leveldb/iterator.h"
#include "leveldb/options.h" #include "leveldb/options.h"
#include "util/coding.h"
#include <vector>
namespace leveldb { namespace leveldb {
//
using Field = std::pair<std::string, std::string>; // field_name:field_value
using FieldArray = std::vector<std::pair<std::string, std::string>>;
// Update CMakeLists.txt if you change these // Update CMakeLists.txt if you change these
static const int kMajorVersion = 1; static const int kMajorVersion = 1;

+ 3
- 0
include/leveldb/write_batch.h View File

@ -25,6 +25,7 @@
#include "leveldb/export.h" #include "leveldb/export.h"
#include "leveldb/status.h" #include "leveldb/status.h"
#include <cstdint>
namespace leveldb { namespace leveldb {
@ -50,6 +51,8 @@ class LEVELDB_EXPORT WriteBatch {
// Store the mapping "key->value" in the database. // Store the mapping "key->value" in the database.
void Put(const Slice& key, const Slice& value); void Put(const Slice& key, const Slice& value);
void Put(const Slice& key, const Slice& value, uint64_t ttl); //
// If the database contains a mapping for "key", erase it. Else do nothing. // If the database contains a mapping for "key", erase it. Else do nothing.
void Delete(const Slice& key); void Delete(const Slice& key);

+ 122
- 0
test/field_test.cc View File

@ -0,0 +1,122 @@
#include "gtest/gtest.h"
#include "leveldb/env.h"
#include "leveldb/db.h"
#include "util/coding.h"
#include <iostream>
using namespace leveldb;
constexpr int value_size = 2048;
constexpr int data_size = 128 << 20;
// 根据字段值查找所有包含该字段的 key
std::vector<std::string> FindKeysByField(leveldb::DB* db, Field &field) {
Iterator* iter = db->NewIterator(ReadOptions());
std::vector<std::string> ret_keys;
int64_t bytes = 0;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
auto fields_ret = ParseValue(iter->value().data());
for (Field each_field : fields_ret)
{
std::cout << each_field.first << " " << each_field.second << std::endl;
if (field.first.compare(each_field.first) == 0) {
if (field.second.compare(each_field.second)==0)
{
ret_keys.push_back(iter->key().data());
}
else
break;
}
}
}
delete iter;
return ret_keys;
}
Status OpenDB(std::string dbName, DB **db) {
Options options;
options.create_if_missing = true;
return DB::Open(options, dbName, db);
}
TEST(TestField, Read) {
DB *db;
if(OpenDB("testdb", &db).ok() == false) {
std::cerr << "open db failed" << std::endl;
abort();
}
std::string key = "k_1";
FieldArray fields = {
{"name", "Customer#000000001"},
{"address", "IVhzIApeRb"},
{"phone", "25-989-741-2988"}
};
// 序列化并插入
std::string value = SerializeValue(fields);
db->Put(WriteOptions(), key, value);
// 读取并反序列化
std::string value_ret;
db->Get(ReadOptions(), key, &value_ret);
auto fields_ret = ParseValue(value_ret);
std::cout << "第一个字段名:"<< fields_ret[0].first << "第一个字段值" << fields_ret[0].second<< std::endl;
delete db;
}
TEST(TestField, Find) {
DB *db;
if(OpenDB("testdb", &db).ok() == false) {
std::cerr << "open db failed" << std::endl;
abort();
}
std::vector<std::string> keys = {"s_1", "s_2", "s_3", "s_4"};
// 构造一组字段数组
std::vector<FieldArray> FieldArrays = {
{
{"name", "Sarah"},{"sex", "f"},{"age", "20"}
},
{
{"name", "Mike"},{"sex", "m"},{"age", "19"},{"hobby", "badminton"}
},
{
{"name", "Amy"},{"sex", "f"},{"age", "21"},{"talent", "sing"}
},
{
{"name", "John"}, {"sex", "m"},{"age", "20"}
}
};
// 序列化并插入
for (int i=0; i<FieldArrays.size(); i++)
{
std::string key = keys[i];
FieldArray fields = FieldArrays[i];
std::string value = SerializeValue(fields);
db->Put(WriteOptions(), key, value);
}
// 构建目标字段
Field field = {"sex", "f"};
std::vector<std::string> key_ret;
// 查询得到对应的key
key_ret = FindKeysByField(db, field);
for (int i = 0; i < key_ret.size(); i++) {
std::cout << "找到的键:" << key_ret[i] << std::endl;
}
delete db;
}
int main(int argc, char** argv) {
// All tests currently run with the same read-only file limits.
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

+ 3
- 3
test/ttl_test.cc View File

@ -74,7 +74,7 @@ TEST(TestTTL, ReadTTL) {
std::string key = std::to_string(key_); std::string key = std::to_string(key_);
std::string value; std::string value;
status = db->Get(readOptions, key, &value); status = db->Get(readOptions, key, &value);
ASSERT_FALSE(status.ok());
ASSERT_FALSE(status.ok()); // 经过超长时间之后所有的键值对应该都过期了,心
} }
} }
@ -99,9 +99,9 @@ TEST(TestTTL, CompactionTTL) {
db->CompactRange(nullptr, nullptr); db->CompactRange(nullptr, nullptr);
leveldb::Range ranges[1];
// leveldb::Range ranges[1]; // 这里为什么要重复定义?心
ranges[0] = leveldb::Range("-", "A"); ranges[0] = leveldb::Range("-", "A");
uint64_t sizes[1];
// uint64_t sizes[1]; // 心
db->GetApproximateSizes(ranges, 1, sizes); db->GetApproximateSizes(ranges, 1, sizes);
ASSERT_EQ(sizes[0], 0); ASSERT_EQ(sizes[0], 0);
} }

+ 43
- 0
util/coding.cc View File

@ -3,6 +3,10 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors. // found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "util/coding.h" #include "util/coding.h"
#include "leveldb/db.h"
#include <iostream>
#include<vector>
namespace leveldb { namespace leveldb {
@ -49,7 +53,9 @@ char* EncodeVarint32(char* dst, uint32_t v) {
void PutVarint32(std::string* dst, uint32_t v) { void PutVarint32(std::string* dst, uint32_t v) {
char buf[5]; char buf[5];
char* ptr = EncodeVarint32(buf, v); char* ptr = EncodeVarint32(buf, v);
// printf("加入的长度%d\n", ptr - buf);
dst->append(buf, ptr - buf); dst->append(buf, ptr - buf);
// printf("插入之后的长度为:%d", dst->length());
} }
char* EncodeVarint64(char* dst, uint64_t v) { char* EncodeVarint64(char* dst, uint64_t v) {
@ -152,5 +158,42 @@ bool GetLengthPrefixedSlice(Slice* input, Slice* result) {
return false; return false;
} }
} }
// 序列化为字符串
std::string SerializeValue(const FieldArray& fields) {
std::string serialized_value = "";
uint32_t num = fields.size();
// std::cout<< "加入的值中包含的字段数:"<<num<<std::endl;
PutVarint32(&serialized_value, num); // 编码字段个数
for (uint32_t i = 0; i < num; i++) { // 依次编码字段名和字段值
PutLengthPrefixedSlice(&serialized_value, fields[i].first);
PutLengthPrefixedSlice(&serialized_value, fields[i].second);
}
return serialized_value;
}
// 反序列化为字段数组
FieldArray ParseValue(const std::string& value_str) {
uint32_t num=0;
FieldArray fields;
Slice value_sl(value_str); // 把字符串类型的值转为slice类型
if (GetVarint32(&value_sl, &num)) {
for (uint32_t i = 0; i < num; i++)
{
Slice field_name, field_value;
GetLengthPrefixedSlice(&value_sl, &field_name);
GetLengthPrefixedSlice(&value_sl, &field_value);
fields.push_back({field_name.ToString().substr(0, field_name.size()), field_value.ToString().substr(0, field_value.size())});
// std::string str = field_name.data().substr(0, field_name.size());
//std::cout << "字段名:"<< strncpy(str1, field_name.data(),field_name.size())<< std::endl;
// std::cout<<field_name.data()<<"__"<<field_value.data()<< std::endl;
}
}
return fields;
}
} // namespace leveldb } // namespace leveldb

+ 8
- 1
util/coding.h View File

@ -13,11 +13,17 @@
#include <cstdint> #include <cstdint>
#include <cstring> #include <cstring>
#include <string> #include <string>
#include<vector>
#include "util/coding.h"
#include "leveldb/db.h"
#include <iostream>
#include "leveldb/slice.h" #include "leveldb/slice.h"
#include "port/port.h" #include "port/port.h"
namespace leveldb { namespace leveldb {
using Field = std::pair<std::string, std::string>; // field_name:field_value
using FieldArray = std::vector<std::pair<std::string, std::string>>;
// Standard Put... routines append to a string // Standard Put... routines append to a string
void PutFixed32(std::string* dst, uint32_t value); void PutFixed32(std::string* dst, uint32_t value);
@ -116,7 +122,8 @@ inline const char* GetVarint32Ptr(const char* p, const char* limit,
} }
return GetVarint32PtrFallback(p, limit, value); return GetVarint32PtrFallback(p, limit, value);
} }
std::string SerializeValue(const FieldArray& fields);
FieldArray ParseValue(const std::string& value_str);
} // namespace leveldb } // namespace leveldb
#endif // STORAGE_LEVELDB_UTIL_CODING_H_ #endif // STORAGE_LEVELDB_UTIL_CODING_H_

Loading…
Cancel
Save