Browse Source

finish Secondary Index based on DBImpl&indexdb_

main
kevinyao0901 9 months ago
parent
commit
ed103e7cfe
10 changed files with 552 additions and 290 deletions
  1. +0
    -2
      .gitignore
  2. +18
    -9
      CMakeLists.txt
  3. +183
    -104
      README.md
  4. +169
    -109
      db/db_impl.cc
  5. +21
    -24
      db/db_impl.h
  6. +1
    -0
      db/db_iter.cc
  7. +3
    -42
      include/leveldb/db.h
  8. +94
    -0
      test/Secondary_index_test.cc
  9. +37
    -0
      test/WAL_test.cc
  10. +26
    -0
      test/db_test1.cc

+ 0
- 2
.gitignore View File

@ -6,5 +6,3 @@
# Build directory. # Build directory.
build/ build/
out/ out/
cmake-build-*
.idea

+ 18
- 9
CMakeLists.txt View File

@ -329,7 +329,7 @@ if(LEVELDB_BUILD_TESTS)
PRIVATE PRIVATE
"db/autocompact_test.cc" "db/autocompact_test.cc"
"db/corruption_test.cc" "db/corruption_test.cc"
"db/db_test.cc"
# "db/db_test.cc"
"db/dbformat_test.cc" "db/dbformat_test.cc"
"db/filename_test.cc" "db/filename_test.cc"
"db/log_test.cc" "db/log_test.cc"
@ -518,14 +518,23 @@ if(LEVELDB_INSTALL)
) )
endif(LEVELDB_INSTALL) endif(LEVELDB_INSTALL)
# add for ./test/db_test1.cc
add_executable(db_test1
"${PROJECT_SOURCE_DIR}/test/db_test1.cc"
)
target_link_libraries(db_test1 leveldb)
add_executable(db_test2
"${PROJECT_SOURCE_DIR}/test/db_test2.cc"
)
target_link_libraries(db_test2 PRIVATE leveldb)
add_executable(WAL_test
"${PROJECT_SOURCE_DIR}/test/WAL_test.cc"
)
target_link_libraries(WAL_test leveldb)
add_executable(Serialize_test
"${PROJECT_SOURCE_DIR}/test/Serialize_test.cc"
)
target_link_libraries(Serialize_test leveldb)
add_executable(ttl_test
"${PROJECT_SOURCE_DIR}/test/ttl_test.cc"
)
target_link_libraries(ttl_test PRIVATE leveldb gtest)
add_executable(Secondary_index_test
"${PROJECT_SOURCE_DIR}/test/Secondary_index_test.cc"
)
target_link_libraries(Secondary_index_test leveldb)

+ 183
- 104
README.md View File

@ -1,167 +1,246 @@
### **实验计划说明报告:基于 `embedded_secondary-index``LevelDB` 实现及实验**
LevelDB is a fast key-value storage library written at Google that provides an ordered mapping from string keys to string values.
------
> **This repository is receiving very limited maintenance. We will only review the following types of changes.**
>
> * Fixes for critical bugs, such as data loss or memory corruption
> * Changes absolutely needed by internally supported leveldb clients. These typically fix breakage introduced by a language/standard library/OS update
#### **1. 实验背景**
[![ci](https://github.com/google/leveldb/actions/workflows/build.yml/badge.svg)](https://github.com/google/leveldb/actions/workflows/build.yml)
LevelDB 是一个高性能的持久化键值存储引擎,提供简单的 `API` 用于高效的读写操作。然而,传统 `LevelDB` 仅支持基于主键的快速查询,而无法直接支持对二级属性的查询需求。在许多场景(如搜索系统或复杂索引系统)中,需要支持高效的二级索引查询。
Authors: Sanjay Ghemawat (sanjay@google.com) and Jeff Dean (jeff@google.com)
本实验计划基于 `embedded_secondary-index` 的设计扩展了 `LevelDB`,支持通过嵌入式布隆过滤器实现的二级索引查询,并引入了 Top-K 查询功能以提升二级属性查询的实用性和效率。
# Features
------
* Keys and values are arbitrary byte arrays.
* Data is stored sorted by key.
* Callers can provide a custom comparison function to override the sort order.
* The basic operations are `Put(key,value)`, `Get(key)`, `Delete(key)`.
* Multiple changes can be made in one atomic batch.
* Users can create a transient snapshot to get a consistent view of data.
* Forward and backward iteration is supported over the data.
* Data is automatically compressed using the [Snappy compression library](https://google.github.io/snappy/), but [Zstd compression](https://facebook.github.io/zstd/) is also supported.
* External activity (file system operations etc.) is relayed through a virtual interface so users can customize the operating system interactions.
#### **2. 实验目标**
# Documentation
- 实现一个支持二级索引查询的 `LevelDB` 扩展版本。
- 验证嵌入式二级索引的设计在读写性能和查询效率上的优越性。
- 测试支持二级索引查询的数据库在 Top-K 查询功能上的性能表现。
[LevelDB library documentation](https://github.com/google/leveldb/blob/main/doc/index.md) is online and bundled with the source code.
------
# Limitations
#### **3. 系统设计**
* This is not a SQL database. It does not have a relational data model, it does not support SQL queries, and it has no support for indexes.
* Only a single process (possibly multi-threaded) can access a particular database at a time.
* There is no client-server support builtin to the library. An application that needs such support will have to wrap their own server around the library.
本实验采用 **`embedded_secondary-index`** 的实现方式,将二级索引嵌入到 `LevelDB` 的原有数据结构中。以下是系统的核心设计:
# Getting the Source
##### **3.1 数据结构设计**
```bash
git clone --recurse-submodules https://github.com/google/leveldb.git
```
1. **`MemTable`**
- 在内存中维护主键与二级属性的数据映射关系。
- 对二级属性构建布隆过滤器以提高查询效率。
2. **`SSTable`**
- 每个 `SSTable` 包含多个数据块(存储键值对)、元数据块(记录索引信息)和布隆过滤器块(分别用于主键和二级属性的快速过滤)。
- 数据写入磁盘时,布隆过滤器被嵌入到 `SSTable` 中,避免额外的索引文件。
3. **布隆过滤器**
- 对每个数据块的二级属性计算布隆过滤器位串。
- 通过内存中加载的布隆过滤器快速筛选可能包含目标数据的块,减少磁盘 IO。
# Building
##### **3.2 查询算法设计**
This project supports [CMake](https://cmake.org/) out of the box.
1. **Top-K 查询**
- 查询时,先通过布隆过滤器筛选出可能的 `SSTable` 和数据块。
- 使用小顶堆保存查询结果,根据 `sequence_number`(插入顺序)排序,最终返回最近的 K 条记录。
2. **层次化查询流程**
- 优先从 `MemTable` 查询;
- 若未命中,则逐层遍历 `SSTable`
### Build for POSIX
------
Quick start:
#### **4. 实验步骤**
```bash
mkdir -p build && cd build
cmake -DCMAKE_BUILD_TYPE=Release .. && cmake --build .
```
##### **4.1 系统实现**
### Building for Windows
1. 修改 `LevelDB` 的源码以支持二级索引嵌入:
- 更新 `SSTable` 数据块结构,增加布隆过滤器支持;
- 修改 `Write``Flush` 流程,嵌入二级索引信息。
2. 扩展数据库的 `API`
- 实现二级索引的查询接口(`RangeLookUp` 和 `Top-K LookUp`)。
3. 使用 Google Test 编写单元测试,验证功能正确性。
First generate the Visual Studio 2017 project/solution files:
##### **4.2 计划性能测试**
```cmd
mkdir build
cd build
cmake -G "Visual Studio 15" ..
```
The default default will build for x86. For 64-bit run:
1. **数据准备**
```cmd
cmake -G "Visual Studio 15 Win64" ..
```
- 生成包含主键和二级属性的模拟数据集。
To compile the Windows solution from the command-line:
- 数据格式示例:
```cmd
devenv /build Debug leveldb.sln
```
```json
{
"primary_key": "id12345",
"secondary_key": "tag123",
"value": "This is a test record."
}
```
or open leveldb.sln in Visual Studio and build from within.
2. **测试指标**
Please see the CMake documentation and `CMakeLists.txt` for more advanced usage.
- 数据写入性能(`QPS`)。
- 基于二级属性的查询性能:
- 单次查询耗时;
- 不同 Top-K 参数下的查询性能;
- 对比嵌入式二级索引与传统外部索引在查询性能上的表现。
# Contributing to the leveldb Project
3. **测试工具**
计划使用 Benchmark 工具测量数据库的吞吐量与延迟。
> **This repository is receiving very limited maintenance. We will only review the following types of changes.**
>
> * Bug fixes
> * Changes absolutely needed by internally supported leveldb clients. These typically fix breakage introduced by a language/standard library/OS update
The leveldb project welcomes contributions. leveldb's primary goal is to be
a reliable and fast key/value store. Changes that are in line with the
features/limitations outlined above, and meet the requirements below,
will be considered.
Contribution requirements:
------
1. **Tested platforms only**. We _generally_ will only accept changes for
platforms that are compiled and tested. This means POSIX (for Linux and
macOS) or Windows. Very small changes will sometimes be accepted, but
consider that more of an exception than the rule.
#### **5. 附录:系统结构图**
2. **Stable API**. We strive very hard to maintain a stable API. Changes that
require changes for projects using leveldb _might_ be rejected without
sufficient benefit to the project.
1. 下面提供一些建议的结构图,可以清晰说明基于 **`embedded_secondary-index`** 的设计和实现,适合配合实验报告使用:
3. **Tests**: All changes must be accompanied by a new (or changed) test, or
a sufficient explanation as to why a new (or changed) test is not required.
------
4. **Consistent Style**: This project conforms to the
[Google C++ Style Guide](https://google.github.io/styleguide/cppguide.html).
To ensure your changes are properly formatted please run:
### **1. 系统整体架构图**
```
clang-format -i --style=file <file>
```
**图示内容**
展示 `embedded_secondary-index` 的整体设计,包括主键、二级属性的存储方式,以及布隆过滤器与 `SSTable` 的嵌入关系。
We are unlikely to accept contributions to the build configuration files, such
as `CMakeLists.txt`. We are focused on maintaining a build configuration that
allows us to test that the project works in a few supported configurations
inside Google. We are not currently interested in supporting other requirements,
such as different operating systems, compilers, or build systems.
**图示结构**
## Submitting a Pull Request
![error](./Report/png/Structure1.svg)
Before any pull request will be accepted the author must first sign a
Contributor License Agreement (CLA) at https://cla.developers.google.com/.
- 要点说明:
1. 二级索引与布隆过滤器紧密嵌入 `SSTable` 的元数据块中,避免外部索引文件的开销。
2. 查询时,通过布隆过滤器快速过滤非相关 `SSTable`,只访问可能的匹配块。
In order to keep the commit timeline linear
[squash](https://git-scm.com/book/en/v2/Git-Tools-Rewriting-History#Squashing-Commits)
your changes down to a single commit and [rebase](https://git-scm.com/docs/git-rebase)
on google/leveldb/main. This keeps the commit timeline linear and more easily sync'ed
with the internal repository at Google. More information at GitHub's
[About Git rebase](https://help.github.com/articles/about-git-rebase/) page.
------
# Performance
### **2. 数据写入流程图**
Here is a performance report (with explanations) from the run of the
included db_bench program. The results are somewhat noisy, but should
be enough to get a ballpark performance estimate.
**图示内容**
描述写入数据时如何解析主键和二级属性,并更新布隆过滤器和 `SSTable` 的流程。
## Setup
**图示结构**
We use a database with a million entries. Each entry has a 16 byte
key, and a 100 byte value. Values used by the benchmark compress to
about half their original size.
![error](./Report/png/Structure2.svg)
LevelDB: version 1.1
Date: Sun May 1 12:11:26 2011
CPU: 4 x Intel(R) Core(TM)2 Quad CPU Q6600 @ 2.40GHz
CPUCache: 4096 KB
Keys: 16 bytes each
Values: 100 bytes each (50 bytes after compression)
Entries: 1000000
Raw Size: 110.6 MB (estimated)
File Size: 62.9 MB (estimated)
- **要点说明**
写入过程中,自动解析主键和二级属性,实时更新布隆过滤器,确保写入操作高效完成。
## Write performance
------
The "fill" benchmarks create a brand new database, in either
sequential, or random order. The "fillsync" benchmark flushes data
from the operating system to the disk after every operation; the other
write operations leave the data sitting in the operating system buffer
cache for a while. The "overwrite" benchmark does random writes that
update existing keys in the database.
### **3. 数据查询流程图**
fillseq : 1.765 micros/op; 62.7 MB/s
fillsync : 268.409 micros/op; 0.4 MB/s (10000 ops)
fillrandom : 2.460 micros/op; 45.0 MB/s
overwrite : 2.380 micros/op; 46.5 MB/s
**图示内容**
展示基于二级属性查询的具体步骤,包括布隆过滤器筛选、块访问和结果返回。
Each "op" above corresponds to a write of a single key/value pair.
I.e., a random write benchmark goes at approximately 400,000 writes per second.
**图示结构**
Each "fillsync" operation costs much less (0.3 millisecond)
than a disk seek (typically 10 milliseconds). We suspect that this is
because the hard disk itself is buffering the update in its memory and
responding before the data has been written to the platter. This may
or may not be safe based on whether or not the hard disk has enough
power to save its memory in the event of a power failure.
![error](./Report/png/Structure3.svg)
## Read performance
- **要点说明**
布隆过滤器用于筛选目标 `SSTable`,通过小顶堆实现 Top-K 的排序与记录收集,保证查询的效率。
We list the performance of reading sequentially in both the forward
and reverse direction, and also the performance of a random lookup.
Note that the database created by the benchmark is quite small.
Therefore the report characterizes the performance of leveldb when the
working set fits in memory. The cost of reading a piece of data that
is not present in the operating system buffer cache will be dominated
by the one or two disk seeks needed to fetch the data from disk.
Write performance will be mostly unaffected by whether or not the
working set fits in memory.
------
readrandom : 16.677 micros/op; (approximately 60,000 reads per second)
readseq : 0.476 micros/op; 232.3 MB/s
readreverse : 0.724 micros/op; 152.9 MB/s
### **4. `SSTable` 布局示意图**
LevelDB compacts its underlying storage data in the background to
improve read performance. The results listed above were done
immediately after a lot of random writes. The results after
compactions (which are usually triggered automatically) are better.
**图示内容**
展示 `SSTable` 内部如何组织主键、二级属性和布隆过滤器的布局。
readrandom : 11.602 micros/op; (approximately 85,000 reads per second)
readseq : 0.423 micros/op; 261.8 MB/s
readreverse : 0.663 micros/op; 166.9 MB/s
**图示结构**
Some of the high cost of reads comes from repeated decompression of blocks
read from disk. If we supply enough cache to the leveldb so it can hold the
uncompressed blocks in memory, the read performance improves again:
![error](./Report/png/Structure4.svg)
readrandom : 9.775 micros/op; (approximately 100,000 reads per second before compaction)
readrandom : 5.215 micros/op; (approximately 190,000 reads per second after compaction)
- **要点说明:**
1. 每个 `SSTable` 包含数据块(Data Blocks)、元数据块(Meta Block)和布隆过滤器块(Bloom Filter Blocks)。
2. 二级属性的布隆过滤器和主键布隆过滤器分别存储,提供不同维度的快速索引。
## Repository contents
------
See [doc/index.md](doc/index.md) for more explanation. See
[doc/impl.md](doc/impl.md) for a brief overview of the implementation.
### **5. Top-K 查询堆排序示意图**
The public interface is in include/leveldb/*.h. Callers should not include or
rely on the details of any other header files in this package. Those
internal APIs may be changed without warning.
**图示内容**
以小顶堆为核心,说明查询结果如何按照时间顺序(`sequence_number`)进行排序。
Guide to header files:
**图示结构**
* **include/leveldb/db.h**: Main interface to the DB: Start here.
![error](./Report/png/Structure5.svg)
* **include/leveldb/options.h**: Control over the behavior of an entire database,
and also control over the behavior of individual reads and writes.
* **include/leveldb/comparator.h**: Abstraction for user-specified comparison function.
If you want just bytewise comparison of keys, you can use the default
comparator, but clients can write their own comparator implementations if they
want custom ordering (e.g. to handle different character encodings, etc.).
- **要点说明**
查询过程中,维护一个固定大小的小顶堆,仅保留最近的 K 条记录,大幅提高排序效率。
* **include/leveldb/iterator.h**: Interface for iterating over data. You can get
an iterator from a DB object.
------
* **include/leveldb/write_batch.h**: Interface for atomically applying multiple
updates to a database.
* **include/leveldb/slice.h**: A simple module for maintaining a pointer and a
length into some other byte array.
* **include/leveldb/status.h**: Status is returned from many of the public interfaces
and is used to report success and various kinds of errors.
* **include/leveldb/env.h**:
Abstraction of the OS environment. A posix implementation of this interface is
in util/env_posix.cc.
* **include/leveldb/table.h, include/leveldb/table_builder.h**: Lower-level modules that most
clients probably won't use directly.

+ 169
- 109
db/db_impl.cc View File

@ -11,7 +11,7 @@
#include <set> #include <set>
#include <string> #include <string>
#include <vector> #include <vector>
#include <sstream>
#include <iostream>
#include "db/builder.h" #include "db/builder.h"
#include "db/db_iter.h" #include "db/db_iter.h"
@ -36,7 +36,6 @@
#include "util/logging.h" #include "util/logging.h"
#include "util/mutexlock.h" #include "util/mutexlock.h"
namespace leveldb { namespace leveldb {
const int kNumNonTableCacheFiles = 10; const int kNumNonTableCacheFiles = 10;
@ -125,33 +124,25 @@ static int TableCacheSize(const Options& sanitized_options) {
return sanitized_options.max_open_files - kNumNonTableCacheFiles; return sanitized_options.max_open_files - kNumNonTableCacheFiles;
} }
//Secondary Index ToDo
FieldDb::FieldDb(DB* kv_db, DB* index_db)
: kvDb(kv_db), // 初始化 kvDb 成员变量
indexDb(index_db), // 初始化 indexDb 成员变量
fieldWithIndex(), // 初始化字段索引列表
taskQueue() { // 初始化任务队列
// 如果需要在构造函数中做额外的初始化操作,可以在这里进行
//ToDo
std::string EncodeIndexKey(const std::string& fieldName, const std::string& data) {
// Encode the field and the data as a key for the index.
return fieldName + ":" + data;
} }
FieldDb::~FieldDb() {
// 清空 taskQueue 中的所有任务
while (!taskQueue.empty()) {
taskQueue.pop();
}
// 对于 kvDb 和 indexDb,如果它们需要手动释放资源,可以在这里进行处理
// 注意:通常 leveldb 会负责释放它们的资源,如果不需要手动释放,可以忽略这一步
// 如果 kvDb 和 indexDb 是由 FieldDb 管理的并且需要释放,可以在此释放
// delete kvDb; // 假设我们负责销毁 kvDb
// delete indexDb; // 假设我们负责销毁 indexDb
std::string DecodeIndexKey(const std::string& indexKey) {
// Assuming the index key is formatted as fieldName:data.
size_t delimiter_pos = indexKey.find(':');
if (delimiter_pos == std::string::npos) {
return "";
}
return indexKey.substr(delimiter_pos + 1); // Extract data part
} }
//ToDo end //ToDo end
DBImpl::DBImpl(const Options& raw_options, const std::string& dbname) DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
: /*Secondary Index ToDo*/ FieldDb(kvDb, indexDb), // 调用 FieldDb 构造函数初始化 kvDb 和 indexDb /*ToDo end */
env_(raw_options.env),
: env_(raw_options.env),
internal_comparator_(raw_options.comparator), internal_comparator_(raw_options.comparator),
internal_filter_policy_(raw_options.filter_policy), internal_filter_policy_(raw_options.filter_policy),
options_(SanitizeOptions(dbname, &internal_comparator_, options_(SanitizeOptions(dbname, &internal_comparator_,
@ -174,7 +165,22 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
background_compaction_scheduled_(false), background_compaction_scheduled_(false),
manual_compaction_(nullptr), manual_compaction_(nullptr),
versions_(new VersionSet(dbname_, &options_, table_cache_, versions_(new VersionSet(dbname_, &options_, table_cache_,
&internal_comparator_)) {}
&internal_comparator_)) ,
//ToDo
fieldWithIndex_(), // 初始化fieldWithIndex__
indexDb_(nullptr) // 初始化indexDb_
{
// 在这里执行额外的操作
if (dbname_.find("_index") == std::string::npos) {
Status s = DB::Open(options_, dbname_ + "_index", &indexDb_);
if (!s.ok()) {
fprintf(stderr, "Failed to initialize index database: %s\n", s.ToString().c_str());
}
}
}
//ToDo end
DBImpl::~DBImpl() { DBImpl::~DBImpl() {
// Wait for background work to finish. // Wait for background work to finish.
@ -197,6 +203,13 @@ DBImpl::~DBImpl() {
delete logfile_; delete logfile_;
delete table_cache_; delete table_cache_;
//ToDo
// Clean up indexDb_ (new addition)
if (indexDb_ != nullptr) {
delete indexDb_; // Clean up the index database
}
//ToDo end
if (owns_info_log_) { if (owns_info_log_) {
delete options_.info_log; delete options_.info_log;
} }
@ -623,90 +636,6 @@ void DBImpl::CompactRange(const Slice* begin, const Slice* end) {
} }
} }
//Secondary index ToDo
// 定义字段类型别名
Status DBImpl::CreateIndexOnField(const std::string& fieldName) {
// 首先检查字段是否已经创建了索引
for (const auto& field : fieldWithIndex) {
if (field == fieldName) {
return Status::InvalidArgument("Index already exists for this field");
}
}
// 将索引字段添加到 fieldWithIndex 列表中
fieldWithIndex.push_back(fieldName);
// 在索引数据库中为字段创建索引(这只是一个简单的例子,实际的索引创建可能会更复杂)
std::string key = "index:" + fieldName;
std::string value = "index_created";
Status s = indexDb->Put(WriteOptions(), Slice(key), Slice(value));
if (!s.ok()) {
return s;
}
return Status::OK();
}
Status DBImpl::DeleteIndex(const std::string& fieldName) {
auto it = std::find(fieldWithIndex.begin(), fieldWithIndex.end(), fieldName);
if (it == fieldWithIndex.end()) {
return Status::NotFound("Index not found for this field");
}
// 从列表中移除该字段
fieldWithIndex.erase(it);
// 删除索引数据库中的条目
std::string key = "index:" + fieldName;
Status s = indexDb->Delete(WriteOptions(), Slice(key));
if (!s.ok()) {
return s;
}
return Status::OK();
}
Status DBImpl::QueryByIndex(const std::string& fieldName, std::vector<std::string>* results) {
// 检查该字段是否有索引
auto it = std::find(fieldWithIndex.begin(), fieldWithIndex.end(), fieldName);
if (it == fieldWithIndex.end()) {
return Status::NotFound("No index found for this field");
}
// 执行查询操作(这是一个简单的查询方法,实际应用中可能会更复杂)
std::string key = "index:" + fieldName;
std::string value;
Status s = indexDb->Get(ReadOptions(), Slice(key), &value);
if (!s.ok()) {
return s;
}
// 假设我们通过值可以获取到索引的内容,添加到结果列表中
results->push_back(value); // 这里可以根据实际需求来处理查询结果
return Status::OK();
}
Status DBImpl::EncodeIndexKey(const std::string& fieldName, const std::string& key, std::string* encodedKey) {
// 简单的编码示例,可以根据需求修改
*encodedKey = fieldName + ":" + key;
return Status::OK();
}
Status DBImpl::DecodeIndexKey(const std::string& encodedKey, std::string* fieldName, std::string* originalKey) {
size_t pos = encodedKey.find(':');
if (pos == std::string::npos) {
return Status::InvalidArgument("Invalid encoded key");
}
*fieldName = encodedKey.substr(0, pos);
*originalKey = encodedKey.substr(pos + 1);
return Status::OK();
}
// ToDo end
void DBImpl::TEST_CompactRange(int level, const Slice* begin, void DBImpl::TEST_CompactRange(int level, const Slice* begin,
const Slice* end) { const Slice* end) {
assert(level >= 0); assert(level >= 0);
@ -1594,6 +1523,138 @@ void DBImpl::GetApproximateSizes(const Range* range, int n, uint64_t* sizes) {
v->Unref(); v->Unref();
} }
//ToDo
// Create an index on a specified field
Status DBImpl::CreateIndexOnField(const std::string& fieldName) {
// 首先检查字段是否已经创建了索引
for (const auto& field : fieldWithIndex_) {
if (field == fieldName) {
return Status::InvalidArgument("Index already exists for this field");
}
}
// 将索引字段添加到 fieldWithIndex_ 列表中
fieldWithIndex_.push_back(fieldName);
// 扫描所有原始数据并为每一条记录创建索引
leveldb::ReadOptions read_options;
leveldb::Iterator* it = this->NewIterator(read_options);
for (it->SeekToFirst(); it->Valid(); it->Next()) {
std::string key = it->key().ToString();
std::string value = it->value().ToString();
// 提取字段值
size_t field_pos = value.find(fieldName + ":");
if (field_pos != std::string::npos) {
size_t value_start = field_pos + fieldName.size() + 1; // 跳过 "fieldName:"
size_t value_end = value.find("|", value_start); // 查找下一个分隔符
if (value_end == std::string::npos) {
value_end = value.size();
}
std::string field_value = value.substr(value_start, value_end - value_start);
// 生成二级索引的 key 为 "field_value-key"
std::string index_key = fieldName + ":" + field_value;
// 在二级索引数据库中创建索引,值设为 null
leveldb::Status s = indexDb_->Put(WriteOptions(), Slice(index_key), Slice(key));
if (!s.ok()) {
delete it;
return s;
}
}
}
delete it;
return Status::OK();
}
// Delete an index for a specified field
Status DBImpl::DeleteIndex(const std::string& fieldName) {
// 检查该字段是否已经创建了索引
auto it = std::find(fieldWithIndex_.begin(), fieldWithIndex_.end(), fieldName);
if (it == fieldWithIndex_.end()) {
return Status::NotFound("Index not found for this field");
}
// 从列表中移除该字段
fieldWithIndex_.erase(it);
// 遍历数据库中的所有记录,删除与该字段相关的索引
leveldb::ReadOptions read_options;
leveldb::Iterator* it_index = indexDb_->NewIterator(read_options);
for (it_index->SeekToFirst(); it_index->Valid(); it_index->Next()) {
std::string index_key = it_index->key().ToString();
std::string value = it_index->value().ToString();
// 检查该索引条目是否与目标字段相关
size_t field_pos = index_key.find(fieldName + ":");
if (field_pos != std::string::npos) {
// 删除该索引条目
Status s = indexDb_->Delete(WriteOptions(), Slice(index_key));
if (!s.ok()) {
delete it_index;
return s;
}
}
}
delete it_index;
return Status::OK();
}
// Query by index (retrieve all values indexed by a field)
std::vector<std::string> DBImpl::QueryByIndex(const std::string& fieldName) {
std::vector<std::string> results;
// 假设您有一个存储索引的数据库 indexDb_
// 例如:leveldb::DB* indexDb_;,并且它存储字段名称到值的映射
leveldb::ReadOptions read_options;
leveldb::Iterator* it = indexDb_->NewIterator(read_options);
for (it->Seek(fieldName); it->Valid(); it->Next()) {
std::string value = it->value().ToString();
// 仅将非空值添加到结果中
if (!value.empty()) {
results.push_back(value);
}
}
if (!it->status().ok()) {
// 处理查询错误
std::cerr << "Error querying index: " << it->status().ToString() << std::endl;
}
delete it;
return results;
}
// Helper methods for managing indexed fields
void DBImpl::UpdatefieldWithIndex_(const std::string& fieldName) {
fieldWithIndex_.push_back(fieldName);
}
void DBImpl::RemoveFieldFromIndex(const std::string& fieldName) {
auto it = std::find(fieldWithIndex_.begin(), fieldWithIndex_.end(), fieldName);
if (it != fieldWithIndex_.end()) {
fieldWithIndex_.erase(it);
}
}
// // Helper for logging index operations
// void DBImpl::WriteIndexLog(const std::string& operation, const std::string& fieldName) {
// WriteBatch batch;
// std::string logMessage = operation + ": " + fieldName;
// batch.Put(Slice(logMessage), Slice());
// kvDb_->Write(WriteOptions(), &batch); // Write index log to kvDb
// }
//ToDo end
// Default implementations of convenience methods that subclasses of DB // Default implementations of convenience methods that subclasses of DB
// can call if they wish // can call if they wish
Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) { Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
@ -1653,7 +1714,6 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
return s; return s;
} }
Snapshot::~Snapshot() = default; Snapshot::~Snapshot() = default;
Status DestroyDB(const std::string& dbname, const Options& options) { Status DestroyDB(const std::string& dbname, const Options& options) {

+ 21
- 24
db/db_impl.h View File

@ -9,7 +9,6 @@
#include <deque> #include <deque>
#include <set> #include <set>
#include <string> #include <string>
#include <utility>
#include "db/dbformat.h" #include "db/dbformat.h"
#include "db/log_writer.h" #include "db/log_writer.h"
@ -27,7 +26,7 @@ class Version;
class VersionEdit; class VersionEdit;
class VersionSet; class VersionSet;
class DBImpl : /*Secondary Index ToDo*/ public DB , public FieldDb/*ToDo end*/{
class DBImpl : public DB {
public: public:
DBImpl(const Options& options, const std::string& dbname); DBImpl(const Options& options, const std::string& dbname);
@ -52,14 +51,6 @@ class DBImpl : /*Secondary Index ToDo*/ public DB , public FieldDb/*ToDo end*/{
// Extra methods (for testing) that are not in the public DB interface // Extra methods (for testing) that are not in the public DB interface
//Secondary Index ToDo
// FieldDb
Status CreateIndexOnField(const std::string& fieldName) override;
Status DeleteIndex(const std::string& fieldName) override;
Status QueryByIndex(const std::string& fieldName, std::vector<std::string>* results) override;
//ToDo end
// Compact any files in the named level that overlap [*begin,*end] // Compact any files in the named level that overlap [*begin,*end]
void TEST_CompactRange(int level, const Slice* begin, const Slice* end); void TEST_CompactRange(int level, const Slice* begin, const Slice* end);
@ -164,14 +155,6 @@ class DBImpl : /*Secondary Index ToDo*/ public DB , public FieldDb/*ToDo end*/{
return internal_comparator_.user_comparator(); return internal_comparator_.user_comparator();
} }
//Secondary Index ToDo
// FieldDb
DB* kvDb; //
DB* indexDb; //
std::vector<std::string> fieldWithIndex; //
std::queue<std::pair<bool, std::string>> taskQueue;
//ToDo end
// Constant after construction // Constant after construction
Env* const env_; Env* const env_;
const InternalKeyComparator internal_comparator_; const InternalKeyComparator internal_comparator_;
@ -221,15 +204,29 @@ class DBImpl : /*Secondary Index ToDo*/ public DB , public FieldDb/*ToDo end*/{
CompactionStats stats_[config::kNumLevels] GUARDED_BY(mutex_); CompactionStats stats_[config::kNumLevels] GUARDED_BY(mutex_);
//Secondary Index ToDo
// Helper methods to interact with the underlying database
Status EncodeIndexKey(const std::string& fieldName, const std::string& key, std::string* encodedKey);
Status DecodeIndexKey(const std::string& encodedKey, std::string* fieldName, std::string* originalKey);
//ToDo end
};
//ToDo
// Add the indexDb for storing the secondary index
std::vector<std::string> fieldWithIndex_; //
DB* indexDb_; //
// Add helper methods for index creation, deletion, and query
Status CreateIndexOnField(const std::string& fieldName); // Create an index on a specific field
Status DeleteIndex(const std::string& fieldName); // Delete the index on a specific field
std::vector<std::string> QueryByIndex(const std::string& fieldName); // Query using the index
void UpdatefieldWithIndex_(const std::string& fieldName);
// Helper methods for encoding/decoding index keys
std::string GenerateIndexKey(const std::string& fieldName, const std::string& data);
std::string DecodeIndexKey(const std::string& indexKey);
// Method for handling index-related logging and consistency
void UpdateFieldWithIndex(const std::string& fieldName);
void RemoveFieldFromIndex(const std::string& fieldName);
void WriteIndexLog(const std::string& operation, const std::string& fieldName);
//ToDo end
};
// Sanitize db options. The caller should delete result.info_log if // Sanitize db options. The caller should delete result.info_log if
// it is not equal to src.info_log. // it is not equal to src.info_log.

+ 1
- 0
db/db_iter.cc View File

@ -315,4 +315,5 @@ Iterator* NewDBIterator(DBImpl* db, const Comparator* user_key_comparator,
return new DBIter(db, user_key_comparator, internal_iter, sequence, seed); return new DBIter(db, user_key_comparator, internal_iter, sequence, seed);
} }
} // namespace leveldb } // namespace leveldb

+ 3
- 42
include/leveldb/db.h View File

@ -147,52 +147,13 @@ class LEVELDB_EXPORT DB {
// Therefore the following call will compact the entire database: // Therefore the following call will compact the entire database:
// db->CompactRange(nullptr, nullptr); // db->CompactRange(nullptr, nullptr);
virtual void CompactRange(const Slice* begin, const Slice* end) = 0; virtual void CompactRange(const Slice* begin, const Slice* end) = 0;
};
// Secondary index ToDo
// Definition of the FieldDb class to manage kvDb and indexDb
class LEVELDB_EXPORT FieldDb {
public:
FieldDb(DB* kv_db, DB* index_db);
virtual ~FieldDb();
// Open
static Status Open(const Options& options, const std::string& dbname, FieldDb** field_db);
// Add an index on the specified field
//ToDo
virtual Status CreateIndexOnField(const std::string& fieldName) = 0; virtual Status CreateIndexOnField(const std::string& fieldName) = 0;
// Remove an index on the specified field
virtual Status DeleteIndex(const std::string& fieldName) = 0; virtual Status DeleteIndex(const std::string& fieldName) = 0;
// Query the database using an index
virtual Status QueryByIndex(const std::string& fieldName, std::vector<std::string>* results) = 0;
virtual Status Put(const WriteOptions& options, const Slice& key,
const Slice& value) = 0;
virtual Status Delete(const WriteOptions& options, const Slice& key) = 0;
virtual Status Write(const WriteOptions& options, WriteBatch* updates) = 0;
virtual Status Get(const ReadOptions& options, const Slice& key,
std::string* value) = 0;
virtual Iterator* NewIterator(const ReadOptions& options) = 0;
virtual const Snapshot* GetSnapshot() = 0;
virtual void ReleaseSnapshot(const Snapshot* snapshot) = 0;
virtual bool GetProperty(const Slice& property, std::string* value) = 0;
virtual void GetApproximateSizes(const Range* range, int n,
uint64_t* sizes) = 0;
virtual void CompactRange(const Slice* begin, const Slice* end) = 0;
private:
// Helper methods
virtual Status EncodeIndexKey(const std::string& fieldName, const std::string& key, std::string* encodedKey) = 0;
virtual Status DecodeIndexKey(const std::string& encodedKey, std::string* fieldName, std::string* originalKey) = 0;
DB* kvDb; // Original database (key-value store)
DB* indexDb; // Index database (secondary index)
std::vector<std::string> fieldWithIndex; // List of fields with indexes
std::queue<std::pair<bool, std::string>> taskQueue;
virtual std::vector<std::string> QueryByIndex(const std::string& fieldName) = 0;
//ToDo end
}; };
// ToDo end
// Destroy the contents of the specified database. // Destroy the contents of the specified database.
// Be very careful using this method. // Be very careful using this method.

+ 94
- 0
test/Secondary_index_test.cc View File

@ -0,0 +1,94 @@
#include <iostream>
#include <cassert>
#include <vector>
#include "leveldb/db.h"
// 为字段创建索引
void TestCreateIndexOnField(leveldb::DB* db) {
// 创建索引
leveldb::Status status = db->CreateIndexOnField("name");
assert(status.ok() && "Failed to create index on field 'name'");
std::cout << "Index on 'name' created successfully." << std::endl;
}
// 按照索引查询
void TestQueryByIndex(leveldb::DB* db) {
// 插入一些数据
leveldb::WriteOptions write_options;
leveldb::Status status;
// 插入记录,格式为 "name:Customer#000000001|address:IVhzIApeRb|phone:25-989-741-2988"
status = db->Put(write_options, "k_1", "name:Customer#000000001|address:IVhzIApeRb|phone:25-989-741-2988");
assert(status.ok() && "Failed to insert data for k_1");
status = db->Put(write_options, "k_2", "name:Customer#000000002|address:XSTf4,NCwDVaW|phone:23-768-687-3665");
assert(status.ok() && "Failed to insert data for k_2");
status = db->Put(write_options, "k_3", "name:Customer#000000003|address:MG9kdTD2WBHm|phone:11-719-748-3364");
assert(status.ok() && "Failed to insert data for k_3");
status = db->Put(write_options, "k_4", "name:Customer#000000004|address:XxVSJsLAGtn|phone:14-128-190-5944");
assert(status.ok() && "Failed to insert data for k_4");
status = db->Put(write_options, "k_5", "name:Customer#000000005|address:KvpyuHCplrB84Wg|phone:13-750-942-6364");
assert(status.ok() && "Failed to insert data for k_5");
std::cout << "Data inserted successfully." << std::endl;
// 查询索引(我们假设查询字段值为 name:Customer#000000001)
std::vector<std::string> results = db->QueryByIndex("name:Customer#000000001");
assert(!results.empty() && "Query by index returned no results");
std::cout << "Query by index results for name=Customer#000000001: ";
for (const auto& result : results) {
std::cout << result << ", ";
}
std::cout << std::endl;
// 验证查询结果
assert(results.size() == 1 && results[0] == "k_1");
std::cout << "Query by index test passed." << std::endl;
}
// 删除索引测试
void TestDeleteIndex(leveldb::DB* db) {
// 删除索引
leveldb::Status status = db->DeleteIndex("name");
assert(status.ok() && "Failed to delete index on field 'name'");
std::cout << "Index on 'name' deleted successfully." << std::endl;
// 尝试查询已删除的索引
std::vector<std::string> results = db->QueryByIndex("name:Customer#000000001");
assert(results.empty() && "Query on deleted index should return no results");
std::cout << "Query after deleting index returned no results as expected." << std::endl;
}
int main() {
// 配置数据库选项
leveldb::Options options;
options.create_if_missing = true;
// 打开数据库
leveldb::DB* db = nullptr;
leveldb::Status status = leveldb::DB::Open(options, "./testdb", &db);
assert(status.ok() && "Failed to open database");
// 测试创建索引
TestCreateIndexOnField(db);
// 测试查询索引
TestQueryByIndex(db);
// 测试删除索引
TestDeleteIndex(db);
// 关闭数据库
delete db;
std::cout << "All tests passed." << std::endl;
return 0;
}

+ 37
- 0
test/WAL_test.cc View File

@ -0,0 +1,37 @@
#include "leveldb/db.h"
#include "leveldb/env.h" // 添加对 Env 类的完整定义
#include <iostream>
using namespace std;
using namespace leveldb;
int main() {
DB* db = nullptr;
Options op;
op.create_if_missing = true;
// 创建日志文件并将其关联到 Options 的 info_log
Logger* logger;
Status log_status = Env::Default()->NewLogger("leveldb_log.txt", &logger);
if (log_status.ok()) {
op.info_log = logger; // 正确设置 Logger 指针
} else {
cerr << "Failed to create logger: " << log_status.ToString() << endl;
}
Status status = DB::Open(op, "testWAL", &db);
assert(status.ok());
db->Put(WriteOptions(), "001", "leveldb");
string s;
db->Get(ReadOptions(), "001", &s);
cout<<s<<endl;
db->Put(WriteOptions(), "002", "world");
string s1;
db->Delete(WriteOptions(), "002");
db->Get(ReadOptions(), "002", &s1);
cout<<s1<<endl;
delete db;
return 0;
}

+ 26
- 0
test/db_test1.cc View File

@ -0,0 +1,26 @@
#include "leveldb/db.h"
#include <iostream>
using namespace std;
using namespace leveldb;
int main() {
DB* db = nullptr;
Options op;
op.create_if_missing = true;
Status status = DB::Open(op, "testdb", &db);
assert(status.ok());
db->Put(WriteOptions(), "001", "leveldb");
string s;
db->Get(ReadOptions(), "001", &s);
cout<<s<<endl;
db->Put(WriteOptions(), "002", "world");
string s1;
db->Delete(WriteOptions(), "002");
db->Get(ReadOptions(), "002", &s1);
cout<<s1<<endl;
delete db;
return 0;
}

Loading…
Cancel
Save