From eedabe8dc32b9efb2e5f97f72d7e91315581fbf0 Mon Sep 17 00:00:00 2001 From: VirgilZhu <94546750@qq.com> Date: Sun, 5 Jan 2025 01:26:19 +0800 Subject: [PATCH] tmp work --- db/db_impl.cc | 24 +++---- db/db_impl.h | 5 +- db/fields.cpp | 176 ++++++++++++++++++++++++++--------------------- db/fields.h | 6 +- test/kv_test.cc | 22 +----- test/value_field_test.cc | 12 +++- 6 files changed, 128 insertions(+), 117 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index 47e212e..8ad3874 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -231,6 +231,14 @@ Status DBImpl::NewDB() { return s; } +Env* DBImpl::GetEnv() const { + return env_; +} + +std::string DBImpl::GetDBName() const { + return dbname_; +} + void DBImpl::MaybeIgnoreError(Status* s) const { if (s->ok() || options_.paranoid_checks) { // No change needed @@ -1570,7 +1578,6 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key, VlogReader vlogReader(file, &reporter); Slice key_value; - Slice ret_value; char* scratch = new char[encoded_len]; if (vlogReader.ReadValue(offset, encoded_len, &key_value, scratch)) { @@ -1583,6 +1590,7 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key, } delete file; + file = nullptr; } return s; @@ -1672,6 +1680,8 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) { if (status.ok() && updates != nullptr) { // nullptr batch is for compactions WriteBatch* write_batch = BuildBatchGroup(&last_writer); + WriteBatchInternal::SetSequence(write_batch, last_sequence + 1); + last_sequence += WriteBatchInternal::Count(write_batch); // TODO begin gc中的batch全部都是设置好的。此时是不需要设置的。 if( !write_batch->IsGarbageColletion() ){ @@ -1685,22 +1695,12 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) { MaybeScheduleGarbageCollection(); } //SetSequence在write_batch中写入本次的sequence - WriteBatchInternal::SetSequence(write_batch, last_sequence + 1); - // Count返回write_batch中的key-value个数 last_sequence += WriteBatchInternal::Count(write_batch); } - vlog_kv_numbers_ += WriteBatchInternal::Count(write_batch); // TODO 这里设置last_sequence 是为了照顾离线回收的时候,在map存在的时候需要调用 ConvertQueue 给回收任务分配sequence。 // TODO 针对多线程调用put的时候,为了避免给gc回收的时候分配的sequence重叠。 - versions_->SetLastSequence(last_sequence); - // TODO end - - - WriteBatchInternal::SetSequence(write_batch, last_sequence + 1); - last_sequence += WriteBatchInternal::Count(write_batch); - - /* TODO */ vlog_kv_numbers_ += WriteBatchInternal::Count(write_batch); + // TODO end // Add to log and apply to memtable. We can release the lock // during this phase since &w is currently responsible for logging diff --git a/db/db_impl.h b/db/db_impl.h index 5d0b274..f7a4a1d 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -56,7 +56,10 @@ class DBImpl : public DB { void GetApproximateSizes(const Range* range, int n, uint64_t* sizes) override; void CompactRange(const Slice* begin, const Slice* end) override; - bool ParseVlogValue(Slice key_value, Slice key, std::string& value, uint64_t val_size); + static bool ParseVlogValue(Slice key_value, Slice key, std::string& value, uint64_t val_size); + + Env* GetEnv() const; + std::string GetDBName() const; // Extra methods (for testing) that are not in the public DB interface diff --git a/db/fields.cpp b/db/fields.cpp index fb845eb..cd66004 100644 --- a/db/fields.cpp +++ b/db/fields.cpp @@ -1,11 +1,16 @@ #include #include -//#include +#include "db/filename.h" + +#include "leveldb/env.h" -#include "fields.h" #include "util/coding.h" + #include "dbformat.h" +#include "fields.h" +#include "vlog_reader.h" +#include "db_impl.h" namespace leveldb { @@ -171,97 +176,23 @@ std::string& Fields::operator[](const std::string& field_name) { } /* 通过若干个字段查询 Key */ -std::vector Fields::FindKeysByFields(leveldb::DB* db, const FieldArray& fields) { - Fields to_fields = Fields(fields); - to_fields.Fields::SortFields(); - FieldArray search_fields_ = to_fields.fields_; - - std::vector find_keys; - - Iterator* it = db->NewIterator(leveldb::ReadOptions()); - for (it->SeekToFirst(); it->Valid(); it->Next()) { - - std::string iter_key = it->key().ToString(); - if (std::find(find_keys.begin(), find_keys.end(), iter_key) != find_keys.end()){ - continue; - } - - FieldArray iter_fields_ = Fields::ParseValue(it->value().ToString()).fields_; - - if (iter_fields_ == search_fields_ || - std::includes(iter_fields_.begin(), iter_fields_.end(), - search_fields_.begin(), search_fields_.end())) { - find_keys.emplace_back(iter_key); - } - } - - assert(it->status().ok()); - delete it; - - return find_keys; -} - -//std::vector Fields::FindKeysByFields(leveldb::DB* db, const FieldArray& fields) { +//std::vector Fields::FindKeysByFields(leveldb::DB* db, const FieldArray& fields, const std::string& dbname, Env* env) { // Fields to_fields = Fields(fields); // to_fields.Fields::SortFields(); // FieldArray search_fields_ = to_fields.fields_; // // std::vector find_keys; -// std::vector deleted_keys; -// -// std::ofstream outfile; -// outfile.open("/home/workspace/leveldb_kv/test/log.txt"); -// outfile.clear(); -// outfile << "aaaaaaaaaaaaaaaaaaa\n"; // // Iterator* it = db->NewIterator(leveldb::ReadOptions()); // for (it->SeekToFirst(); it->Valid(); it->Next()) { // -// Slice iter_key_slice = it->key(); -// outfile << "\niter_key_slice: " + iter_key_slice.ToString() + "\n"; -// const char* p = iter_key_slice.data(); -// const char* limit = p + iter_key_slice.size(); -//// const char* limit = p + 5; -// -// uint32_t key_length; -// const char* key_ptr = GetVarint32Ptr(p, limit, &key_length); -// -// iter_key_slice = Slice(p, iter_key_slice.size() - 8); -// -//// outfile << "\niter_key_slice: " + iter_key_slice.ToString() + "\n"; -// -// Slice iter_key_for_parse; -// if (!GetLengthPrefixedSlice(&iter_key_slice, &iter_key_for_parse)) { -// continue; -// } -// -// std::string iter_key = iter_key_for_parse.ToString(); -// -// outfile << "\niter_key_str: " + iter_key + "\n"; -// -// if (std::find(deleted_keys.begin(), deleted_keys.end(), iter_key) != deleted_keys.end() -// || std::find(find_keys.begin(), find_keys.end(), iter_key) != find_keys.end()) { -// continue; -// } -// -// const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8); -// if ((tag & 0xff) == kTypeDeletion) { -// deleted_keys.emplace_back(iter_key); -// continue; -// } -// -// outfile << "\niter_tag_str: " + std::to_string(tag) + "\n"; -// -// Slice iter_value_slice = it->value(); -// Slice iter_value_for_parse; -// if (!GetLengthPrefixedSlice(&iter_value_slice, &iter_value_for_parse)) { +// std::string iter_key = it->key().ToString(); +// if (std::find(find_keys.begin(), find_keys.end(), iter_key) != find_keys.end()){ // continue; // } // -// outfile << "\niter_value_str: " + iter_value_for_parse.ToString() + "\n"; +// FieldArray iter_fields_ = Fields::ParseValue(it->value().ToString()).fields_; // -// FieldArray iter_fields_ = Fields::ParseValue(iter_value_for_parse.ToString()).fields_; -// // FieldArray iter_fields_ = Fields::ParseValue(it->value().ToString()).fields_; // if (iter_fields_ == search_fields_ || // std::includes(iter_fields_.begin(), iter_fields_.end(), // search_fields_.begin(), search_fields_.end())) { @@ -275,4 +206,89 @@ std::vector Fields::FindKeysByFields(leveldb::DB* db, const FieldAr // return find_keys; //} +std::vector Fields::FindKeysByFields(leveldb::DB* db, const FieldArray& fields, DBImpl* impl) { + Fields to_fields = Fields(fields); + to_fields.Fields::SortFields(); + FieldArray search_fields_ = to_fields.fields_; + + std::vector find_keys; + + Iterator* it = db->NewIterator(leveldb::ReadOptions()); + for (it->SeekToFirst(); it->Valid(); it->Next()) { + + std::string iter_key = it->key().ToString(); + if (std::find(find_keys.begin(), find_keys.end(), iter_key) != find_keys.end()){ + continue; + } + + FieldArray iter_value_ = Fields::ParseValue(it->value().ToString()).fields_; + + if (!iter_value_.empty()) { + if (iter_value_ == search_fields_ || + std::includes(iter_value_.begin(), iter_value_.end(), + search_fields_.begin(), search_fields_.end())) { + find_keys.emplace_back(iter_key); + } + } else { + uint64_t fid; + uint64_t kv_offset; + uint64_t val_size; + Slice vlog_ptr = it->value(); + if (!(GetVarint64(&vlog_ptr, &fid) + && GetVarint64(&vlog_ptr, &kv_offset) + && GetVarint64(&vlog_ptr, &val_size))) { + continue; + } + uint64_t encoded_len = 1 + VarintLength(it->key().size()) + it->key().size() + VarintLength(val_size) + val_size; + + Env* env = impl->GetEnv(); + std::string dbname = impl->GetDBName(); + + std::string fname = LogFileName(dbname, fid); + RandomAccessFile* file; + + Status s = env->NewRandomAccessFile(fname, &file); + if (!s.ok()) { + continue; + } + struct VlogReporter : public log::VlogReader::Reporter { + Status* status; + void Corruption(size_t bytes, const Status& s) override { + if (this->status->ok()) *this->status = s; + } + }; + VlogReporter reporter; + log::VlogReader vlogReader(file, &reporter); + Slice key_value; + std::string vlog_value; + char* scratch = new char[encoded_len]; + + if (vlogReader.ReadValue(kv_offset, encoded_len, &key_value, scratch)) { + if (!DBImpl::ParseVlogValue(key_value, it->key(), vlog_value, val_size)) { + s = Status::Corruption("value in vlog isn't match with given key"); + } + } else { + s = Status::Corruption("read vlog error"); + } + + delete file; + file = nullptr; + + iter_value_ = Fields::ParseValue(vlog_value).fields_; + if (iter_value_ == search_fields_ || + std::includes(iter_value_.begin(), iter_value_.end(), + search_fields_.begin(), search_fields_.end())) { + find_keys.emplace_back(iter_key); + } + } + } + +// assert(it->status().ok()); + delete it; + + return find_keys; +} + + + } // namespace leveldb diff --git a/db/fields.h b/db/fields.h index e7412da..c9d7e21 100644 --- a/db/fields.h +++ b/db/fields.h @@ -1,9 +1,11 @@ #ifndef LEVELDB_FIELDS_H #define LEVELDB_FIELDS_H -#include "vector" #include "leveldb/db.h" +#include "db_impl.h" +#include "vector" + namespace leveldb { using Field = std::pair; @@ -58,7 +60,7 @@ class Fields { std::string& operator[](const std::string& field_name); /* 通过若干个字段查询 Key */ - static std::vector FindKeysByFields(leveldb::DB* db, const FieldArray& fields); + static std::vector FindKeysByFields(leveldb::DB* db, const FieldArray& fields, leveldb::DBImpl* impl); using iterator = std::vector::iterator; using const_iterator = std::vector::const_iterator; diff --git a/test/kv_test.cc b/test/kv_test.cc index c39a8a0..65ae46d 100644 --- a/test/kv_test.cc +++ b/test/kv_test.cc @@ -8,7 +8,7 @@ using namespace leveldb; constexpr int short_value_size = 4; constexpr int long_value_size = 32; -constexpr int data_size = 32; +constexpr int data_size = 512; Status OpenDB(std::string dbName, DB **db) { std::string rm_command = "rm -rf " + dbName; @@ -33,20 +33,6 @@ void InsertData(DB *db, int value_size) { } - void GetData(DB *db, int size = (1 << 30), int value_size = 0) { - ReadOptions readOptions; - int key_num = data_size / value_size; - - // 点查 - srand(42); - for (int i = 0; i < 100; i++) { - int key_ = rand() % key_num+1; - std::string key = std::to_string(key_); - std::string value; - db->Get(readOptions, key, &value); - } - } - TEST(TestKV, GetValue) { DB *db; if(OpenDB("testdb_TestKV_short_value", &db).ok() == false) { @@ -83,14 +69,12 @@ TEST(TestKV, GetLongValue) { Status status; int key_num = data_size / long_value_size; for (int i = 0; i < key_num; i++) { -// for (int i = 0; i < key_num - 1; i++) { - // int key_ = rand() % key_num+1; std::string key = std::to_string(i); std::string value; std::string expected_value(long_value_size, 'a'); status = db->Get(readOptions, key, &value); - std::cout << key << std::endl; - std::cout << status.ToString() << std::endl; +// std::cout << key << std::endl; +// std::cout << status.ToString() << std::endl; ASSERT_TRUE(status.ok()); EXPECT_EQ(expected_value, value); } diff --git a/test/value_field_test.cc b/test/value_field_test.cc index 6772e57..15bdb1c 100644 --- a/test/value_field_test.cc +++ b/test/value_field_test.cc @@ -18,7 +18,7 @@ Status OpenDB(const std::string& dbName, DB** db) { class FieldsTest : public ::testing::Test { protected: void SetUp() override { - Status s = OpenDB("testdb", &db_); + Status s = OpenDB(dbname_ , &db_); EXPECT_TRUE(s.ok()) << "Failed to open database: " << s.ToString(); } @@ -28,6 +28,7 @@ class FieldsTest : public ::testing::Test { } DB* db_ = nullptr; // 数据库实例指针。 + std::string dbname_ = "testdb"; // 记录数据库路径 }; // 测试各种构造函数 @@ -225,7 +226,12 @@ TEST_F(FieldsTest, TestBulkInsertSerializeDeleteAndFindKeys) { // 使用 FindKeysByFields 查找包含特定字段的键 FieldArray fields_to_find = {{"field2", "value2_"}}; - std::vector found_keys = Fields::FindKeysByFields(db_, fields_to_find); + + Options options; + options.create_if_missing = true; + DBImpl* impl = new DBImpl(options, dbname_); + + std::vector found_keys = Fields::FindKeysByFields(db_, fields_to_find, impl); // 验证找到的键是否正确 EXPECT_EQ(found_keys.size(), num_entries - 1) << "Expected " << num_entries - 1 << " keys but found " << found_keys.size(); @@ -237,7 +243,7 @@ TEST_F(FieldsTest, TestBulkInsertSerializeDeleteAndFindKeys) { // 再次查找,这次没有符合条件的字段 FieldArray no_match_fields = {{"nonexistent_field", ""}}; - found_keys = Fields::FindKeysByFields(db_, no_match_fields); + found_keys = Fields::FindKeysByFields(db_, no_match_fields, impl); EXPECT_TRUE(found_keys.empty()) << "Expected an empty result for non-matching fields."; }