Browse Source

完成kv分离写功能

main
朴祉燕 9 months ago
parent
commit
49f2c85cb7
7 changed files with 2944 additions and 374 deletions
  1. +541
    -0
      table/CMakeLists.txt
  2. +63
    -26
      table/blob_file.cc
  3. +34
    -25
      table/blob_file.h
  4. +1616
    -0
      table/db_impl.cc
  5. +226
    -0
      table/db_impl.h
  6. +119
    -0
      table/kv_seperate_test.cc
  7. +345
    -323
      table/table_builder.cc

+ 541
- 0
table/CMakeLists.txt View File

@ -0,0 +1,541 @@
# Copyright 2017 The LevelDB Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file. See the AUTHORS file for names of contributors.
cmake_minimum_required(VERSION 3.9)
# Keep the version below in sync with the one in db.h
project(leveldb VERSION 1.23.0 LANGUAGES C CXX)
# C standard can be overridden when this is used as a sub-project.
if(NOT CMAKE_C_STANDARD)
# This project can use C11, but will gracefully decay down to C89.
set(CMAKE_C_STANDARD 11)
set(CMAKE_C_STANDARD_REQUIRED OFF)
set(CMAKE_C_EXTENSIONS OFF)
endif(NOT CMAKE_C_STANDARD)
# C++ standard can be overridden when this is used as a sub-project.
if(NOT CMAKE_CXX_STANDARD)
# This project requires C++11.
set(CMAKE_CXX_STANDARD 11)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
set(CMAKE_CXX_EXTENSIONS OFF)
endif(NOT CMAKE_CXX_STANDARD)
if (WIN32)
set(LEVELDB_PLATFORM_NAME LEVELDB_PLATFORM_WINDOWS)
# TODO(cmumford): Make UNICODE configurable for Windows.
add_definitions(-D_UNICODE -DUNICODE)
else (WIN32)
set(LEVELDB_PLATFORM_NAME LEVELDB_PLATFORM_POSIX)
endif (WIN32)
option(LEVELDB_BUILD_TESTS "Build LevelDB's unit tests" ON)
option(LEVELDB_BUILD_BENCHMARKS "Build LevelDB's benchmarks" ON)
option(LEVELDB_INSTALL "Install LevelDB's header and library" ON)
include(CheckIncludeFile)
check_include_file("unistd.h" HAVE_UNISTD_H)
include(CheckLibraryExists)
check_library_exists(crc32c crc32c_value "" HAVE_CRC32C)
check_library_exists(snappy snappy_compress "" HAVE_SNAPPY)
check_library_exists(zstd zstd_compress "" HAVE_ZSTD)
check_library_exists(tcmalloc malloc "" HAVE_TCMALLOC)
include(CheckCXXSymbolExists)
# Using check_cxx_symbol_exists() instead of check_c_symbol_exists() because
# we're including the header from C++, and feature detection should use the same
# compiler language that the project will use later. Principles aside, some
# versions of do not expose fdatasync() in <unistd.h> in standard C mode
# (-std=c11), but do expose the function in standard C++ mode (-std=c++11).
check_cxx_symbol_exists(fdatasync "unistd.h" HAVE_FDATASYNC)
check_cxx_symbol_exists(F_FULLFSYNC "fcntl.h" HAVE_FULLFSYNC)
check_cxx_symbol_exists(O_CLOEXEC "fcntl.h" HAVE_O_CLOEXEC)
if(CMAKE_CXX_COMPILER_ID STREQUAL "MSVC")
# Disable C++ exceptions.
string(REGEX REPLACE "/EH[a-z]+" "" CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS}")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /EHs-c-")
add_definitions(-D_HAS_EXCEPTIONS=0)
# Disable RTTI.
string(REGEX REPLACE "/GR" "" CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS}")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /GR-")
else(CMAKE_CXX_COMPILER_ID STREQUAL "MSVC")
# Enable strict prototype warnings for C code in clang and gcc.
if(NOT CMAKE_C_FLAGS MATCHES "-Wstrict-prototypes")
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wstrict-prototypes")
endif(NOT CMAKE_C_FLAGS MATCHES "-Wstrict-prototypes")
# Disable C++ exceptions.
string(REGEX REPLACE "-fexceptions" "" CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS}")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fno-exceptions")
# Disable RTTI.
string(REGEX REPLACE "-frtti" "" CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS}")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fno-rtti")
endif(CMAKE_CXX_COMPILER_ID STREQUAL "MSVC")
# Test whether -Wthread-safety is available. See
# https://clang.llvm.org/docs/ThreadSafetyAnalysis.html
include(CheckCXXCompilerFlag)
check_cxx_compiler_flag(-Wthread-safety HAVE_CLANG_THREAD_SAFETY)
# Used by googletest.
check_cxx_compiler_flag(-Wno-missing-field-initializers
LEVELDB_HAVE_NO_MISSING_FIELD_INITIALIZERS)
include(CheckCXXSourceCompiles)
# Test whether C++17 __has_include is available.
check_cxx_source_compiles("
#if defined(__has_include) && __has_include(<string>)
#include <string>
#endif
int main() { std::string str; return 0; }
" HAVE_CXX17_HAS_INCLUDE)
set(LEVELDB_PUBLIC_INCLUDE_DIR "include/leveldb")
set(LEVELDB_PORT_CONFIG_DIR "include/port")
configure_file(
"port/port_config.h.in"
"${PROJECT_BINARY_DIR}/${LEVELDB_PORT_CONFIG_DIR}/port_config.h"
)
include_directories(
"${PROJECT_BINARY_DIR}/include"
"."
)
if(BUILD_SHARED_LIBS)
# Only export LEVELDB_EXPORT symbols from the shared library.
add_compile_options(-fvisibility=hidden)
endif(BUILD_SHARED_LIBS)
# Must be included before CMAKE_INSTALL_INCLUDEDIR is used.
include(GNUInstallDirs)
add_library(leveldb "")
target_sources(leveldb
PRIVATE
"${PROJECT_BINARY_DIR}/${LEVELDB_PORT_CONFIG_DIR}/port_config.h"
"db/builder.cc"
"db/builder.h"
"db/c.cc"
"db/db_impl.cc"
"db/db_impl.h"
"db/db_iter.cc"
"db/db_iter.h"
"db/dbformat.cc"
"db/dbformat.h"
"db/dumpfile.cc"
"db/filename.cc"
"db/filename.h"
"db/log_format.h"
"db/log_reader.cc"
"db/log_reader.h"
"db/log_writer.cc"
"db/log_writer.h"
"db/memtable.cc"
"db/memtable.h"
"db/repair.cc"
"db/skiplist.h"
"db/snapshot.h"
"db/table_cache.cc"
"db/table_cache.h"
"db/version_edit.cc"
"db/version_edit.h"
"db/version_set.cc"
"db/version_set.h"
"db/write_batch_internal.h"
"db/write_batch.cc"
"port/port_stdcxx.h"
"port/port.h"
"port/thread_annotations.h"
"table/block_builder.cc"
"table/block_builder.h"
"table/block.cc"
"table/block.h"
"table/filter_block.cc"
"table/filter_block.h"
"table/format.cc"
"table/format.h"
"table/iterator_wrapper.h"
"table/iterator.cc"
"table/merger.cc"
"table/merger.h"
"table/table_builder.cc"
"table/table.cc"
"table/two_level_iterator.cc"
"table/two_level_iterator.h"
"util/arena.cc"
"util/arena.h"
"util/bloom.cc"
"util/cache.cc"
"util/coding.cc"
"util/coding.h"
"util/comparator.cc"
"util/crc32c.cc"
"util/crc32c.h"
"util/env.cc"
"util/filter_policy.cc"
"util/hash.cc"
"util/hash.h"
"util/logging.cc"
"util/logging.h"
"util/mutexlock.h"
"util/no_destructor.h"
"util/options.cc"
"util/random.h"
"util/status.cc"
# Only CMake 3.3+ supports PUBLIC sources in targets exported by "install".
$<$<VERSION_GREATER:CMAKE_VERSION,3.2>:PUBLIC>
"${LEVELDB_PUBLIC_INCLUDE_DIR}/c.h"
"${LEVELDB_PUBLIC_INCLUDE_DIR}/cache.h"
"${LEVELDB_PUBLIC_INCLUDE_DIR}/comparator.h"
"${LEVELDB_PUBLIC_INCLUDE_DIR}/db.h"
"${LEVELDB_PUBLIC_INCLUDE_DIR}/dumpfile.h"
"${LEVELDB_PUBLIC_INCLUDE_DIR}/env.h"
"${LEVELDB_PUBLIC_INCLUDE_DIR}/export.h"
"${LEVELDB_PUBLIC_INCLUDE_DIR}/filter_policy.h"
"${LEVELDB_PUBLIC_INCLUDE_DIR}/iterator.h"
"${LEVELDB_PUBLIC_INCLUDE_DIR}/options.h"
"${LEVELDB_PUBLIC_INCLUDE_DIR}/slice.h"
"${LEVELDB_PUBLIC_INCLUDE_DIR}/status.h"
"${LEVELDB_PUBLIC_INCLUDE_DIR}/table_builder.h"
"${LEVELDB_PUBLIC_INCLUDE_DIR}/table.h"
"${LEVELDB_PUBLIC_INCLUDE_DIR}/write_batch.h"
)
if (WIN32)
target_sources(leveldb
PRIVATE
"util/env_windows.cc"
"util/windows_logger.h"
)
else (WIN32)
target_sources(leveldb
PRIVATE
"util/env_posix.cc"
"util/posix_logger.h"
)
endif (WIN32)
# MemEnv is not part of the interface and could be pulled to a separate library.
target_sources(leveldb
PRIVATE
"helpers/memenv/memenv.cc"
"helpers/memenv/memenv.h"
)
target_include_directories(leveldb
PUBLIC
$<BUILD_INTERFACE:${PROJECT_SOURCE_DIR}/include>
$<INSTALL_INTERFACE:${CMAKE_INSTALL_INCLUDEDIR}>
)
set_target_properties(leveldb
PROPERTIES VERSION ${PROJECT_VERSION} SOVERSION ${PROJECT_VERSION_MAJOR})
target_compile_definitions(leveldb
PRIVATE
# Used by include/export.h when building shared libraries.
LEVELDB_COMPILE_LIBRARY
# Used by port/port.h.
${LEVELDB_PLATFORM_NAME}=1
)
if (NOT HAVE_CXX17_HAS_INCLUDE)
target_compile_definitions(leveldb
PRIVATE
LEVELDB_HAS_PORT_CONFIG_H=1
)
endif(NOT HAVE_CXX17_HAS_INCLUDE)
if(BUILD_SHARED_LIBS)
target_compile_definitions(leveldb
PUBLIC
# Used by include/export.h.
LEVELDB_SHARED_LIBRARY
)
endif(BUILD_SHARED_LIBS)
if(HAVE_CLANG_THREAD_SAFETY)
target_compile_options(leveldb
PUBLIC
-Werror -Wthread-safety)
endif(HAVE_CLANG_THREAD_SAFETY)
if(HAVE_CRC32C)
target_link_libraries(leveldb crc32c)
endif(HAVE_CRC32C)
if(HAVE_SNAPPY)
target_link_libraries(leveldb snappy)
endif(HAVE_SNAPPY)
if(HAVE_ZSTD)
target_link_libraries(leveldb zstd)
endif(HAVE_ZSTD)
if(HAVE_TCMALLOC)
target_link_libraries(leveldb tcmalloc)
endif(HAVE_TCMALLOC)
# Needed by port_stdcxx.h
find_package(Threads REQUIRED)
target_link_libraries(leveldb Threads::Threads)
add_executable(leveldbutil
"db/leveldbutil.cc"
)
target_link_libraries(leveldbutil leveldb)
if(LEVELDB_BUILD_TESTS)
enable_testing()
# Prevent overriding the parent project's compiler/linker settings on Windows.
set(gtest_force_shared_crt ON CACHE BOOL "" FORCE)
set(install_gtest OFF)
set(install_gmock OFF)
set(build_gmock ON)
# This project is tested using GoogleTest.
add_subdirectory("third_party/googletest")
# GoogleTest triggers a missing field initializers warning.
if(LEVELDB_HAVE_NO_MISSING_FIELD_INITIALIZERS)
set_property(TARGET gtest
APPEND PROPERTY COMPILE_OPTIONS -Wno-missing-field-initializers)
set_property(TARGET gmock
APPEND PROPERTY COMPILE_OPTIONS -Wno-missing-field-initializers)
endif(LEVELDB_HAVE_NO_MISSING_FIELD_INITIALIZERS)
add_executable(leveldb_tests "")
target_sources(leveldb_tests
PRIVATE
# "db/fault_injection_test.cc"
# "issues/issue178_test.cc"
# "issues/issue200_test.cc"
# "issues/issue320_test.cc"
"${PROJECT_BINARY_DIR}/${LEVELDB_PORT_CONFIG_DIR}/port_config.h"
# "util/env_test.cc"
"util/status_test.cc"
"util/no_destructor_test.cc"
"util/testutil.cc"
"util/testutil.h"
)
if(NOT BUILD_SHARED_LIBS)
target_sources(leveldb_tests
PRIVATE
"db/autocompact_test.cc"
"db/corruption_test.cc"
"db/db_test.cc"
"db/dbformat_test.cc"
"db/filename_test.cc"
"db/log_test.cc"
"db/recovery_test.cc"
"db/skiplist_test.cc"
"db/version_edit_test.cc"
"db/version_set_test.cc"
"db/write_batch_test.cc"
"helpers/memenv/memenv_test.cc"
"table/filter_block_test.cc"
"table/table_test.cc"
"util/arena_test.cc"
"util/bloom_test.cc"
"util/cache_test.cc"
"util/coding_test.cc"
"util/crc32c_test.cc"
"util/hash_test.cc"
"util/logging_test.cc"
)
endif(NOT BUILD_SHARED_LIBS)
target_link_libraries(leveldb_tests leveldb gmock gtest gtest_main)
target_compile_definitions(leveldb_tests
PRIVATE
${LEVELDB_PLATFORM_NAME}=1
)
if (NOT HAVE_CXX17_HAS_INCLUDE)
target_compile_definitions(leveldb_tests
PRIVATE
LEVELDB_HAS_PORT_CONFIG_H=1
)
endif(NOT HAVE_CXX17_HAS_INCLUDE)
add_test(NAME "leveldb_tests" COMMAND "leveldb_tests")
function(leveldb_test test_file)
get_filename_component(test_target_name "${test_file}" NAME_WE)
add_executable("${test_target_name}" "")
target_sources("${test_target_name}"
PRIVATE
"${PROJECT_BINARY_DIR}/${LEVELDB_PORT_CONFIG_DIR}/port_config.h"
"util/testutil.cc"
"util/testutil.h"
"${test_file}"
)
target_link_libraries("${test_target_name}" leveldb gmock gtest)
target_compile_definitions("${test_target_name}"
PRIVATE
${LEVELDB_PLATFORM_NAME}=1
)
if (NOT HAVE_CXX17_HAS_INCLUDE)
target_compile_definitions("${test_target_name}"
PRIVATE
LEVELDB_HAS_PORT_CONFIG_H=1
)
endif(NOT HAVE_CXX17_HAS_INCLUDE)
add_test(NAME "${test_target_name}" COMMAND "${test_target_name}")
endfunction(leveldb_test)
leveldb_test("db/c_test.c")
if(NOT BUILD_SHARED_LIBS)
# TODO(costan): This test also uses
# "util/env_{posix|windows}_test_helper.h"
if (WIN32)
leveldb_test("util/env_windows_test.cc")
else (WIN32)
leveldb_test("util/env_posix_test.cc")
endif (WIN32)
endif(NOT BUILD_SHARED_LIBS)
endif(LEVELDB_BUILD_TESTS)
if(LEVELDB_BUILD_BENCHMARKS)
# This project uses Google benchmark for benchmarking.
set(BENCHMARK_ENABLE_TESTING OFF CACHE BOOL "" FORCE)
set(BENCHMARK_ENABLE_EXCEPTIONS OFF CACHE BOOL "" FORCE)
add_subdirectory("third_party/benchmark")
function(leveldb_benchmark bench_file)
get_filename_component(bench_target_name "${bench_file}" NAME_WE)
add_executable("${bench_target_name}" "")
target_sources("${bench_target_name}"
PRIVATE
"${PROJECT_BINARY_DIR}/${LEVELDB_PORT_CONFIG_DIR}/port_config.h"
"util/histogram.cc"
"util/histogram.h"
"util/testutil.cc"
"util/testutil.h"
"${bench_file}"
)
target_link_libraries("${bench_target_name}" leveldb gmock gtest benchmark)
target_compile_definitions("${bench_target_name}"
PRIVATE
${LEVELDB_PLATFORM_NAME}=1
)
if (NOT HAVE_CXX17_HAS_INCLUDE)
target_compile_definitions("${bench_target_name}"
PRIVATE
LEVELDB_HAS_PORT_CONFIG_H=1
)
endif(NOT HAVE_CXX17_HAS_INCLUDE)
endfunction(leveldb_benchmark)
if(NOT BUILD_SHARED_LIBS)
leveldb_benchmark("benchmarks/db_bench.cc")
endif(NOT BUILD_SHARED_LIBS)
check_library_exists(sqlite3 sqlite3_open "" HAVE_SQLITE3)
if(HAVE_SQLITE3)
leveldb_benchmark("benchmarks/db_bench_sqlite3.cc")
target_link_libraries(db_bench_sqlite3 sqlite3)
endif(HAVE_SQLITE3)
# check_library_exists is insufficient here because the library names have
# different manglings when compiled with clang or gcc, at least when installed
# with Homebrew on Mac.
set(OLD_CMAKE_REQURED_LIBRARIES ${CMAKE_REQUIRED_LIBRARIES})
list(APPEND CMAKE_REQUIRED_LIBRARIES kyotocabinet)
check_cxx_source_compiles("
#include <kcpolydb.h>
int main() {
kyotocabinet::TreeDB* db = new kyotocabinet::TreeDB();
delete db;
return 0;
}
" HAVE_KYOTOCABINET)
set(CMAKE_REQUIRED_LIBRARIES ${OLD_CMAKE_REQURED_LIBRARIES})
if(HAVE_KYOTOCABINET)
leveldb_benchmark("benchmarks/db_bench_tree_db.cc")
target_link_libraries(db_bench_tree_db kyotocabinet)
endif(HAVE_KYOTOCABINET)
endif(LEVELDB_BUILD_BENCHMARKS)
if(LEVELDB_INSTALL)
install(TARGETS leveldb
EXPORT leveldbTargets
RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR}
LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR}
ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR}
)
install(
FILES
"${LEVELDB_PUBLIC_INCLUDE_DIR}/c.h"
"${LEVELDB_PUBLIC_INCLUDE_DIR}/cache.h"
"${LEVELDB_PUBLIC_INCLUDE_DIR}/comparator.h"
"${LEVELDB_PUBLIC_INCLUDE_DIR}/db.h"
"${LEVELDB_PUBLIC_INCLUDE_DIR}/dumpfile.h"
"${LEVELDB_PUBLIC_INCLUDE_DIR}/env.h"
"${LEVELDB_PUBLIC_INCLUDE_DIR}/export.h"
"${LEVELDB_PUBLIC_INCLUDE_DIR}/filter_policy.h"
"${LEVELDB_PUBLIC_INCLUDE_DIR}/iterator.h"
"${LEVELDB_PUBLIC_INCLUDE_DIR}/options.h"
"${LEVELDB_PUBLIC_INCLUDE_DIR}/slice.h"
"${LEVELDB_PUBLIC_INCLUDE_DIR}/status.h"
"${LEVELDB_PUBLIC_INCLUDE_DIR}/table_builder.h"
"${LEVELDB_PUBLIC_INCLUDE_DIR}/table.h"
"${LEVELDB_PUBLIC_INCLUDE_DIR}/write_batch.h"
DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/leveldb"
)
include(CMakePackageConfigHelpers)
configure_package_config_file(
"cmake/${PROJECT_NAME}Config.cmake.in"
"${PROJECT_BINARY_DIR}/cmake/${PROJECT_NAME}Config.cmake"
INSTALL_DESTINATION "${CMAKE_INSTALL_LIBDIR}/cmake/${PROJECT_NAME}"
)
write_basic_package_version_file(
"${PROJECT_BINARY_DIR}/cmake/${PROJECT_NAME}ConfigVersion.cmake"
COMPATIBILITY SameMajorVersion
)
install(
EXPORT leveldbTargets
NAMESPACE leveldb::
DESTINATION "${CMAKE_INSTALL_LIBDIR}/cmake/${PROJECT_NAME}"
)
install(
FILES
"${PROJECT_BINARY_DIR}/cmake/${PROJECT_NAME}Config.cmake"
"${PROJECT_BINARY_DIR}/cmake/${PROJECT_NAME}ConfigVersion.cmake"
DESTINATION "${CMAKE_INSTALL_LIBDIR}/cmake/${PROJECT_NAME}"
)
endif(LEVELDB_INSTALL)
add_executable(db_test2
"${PROJECT_SOURCE_DIR}/test/db_test2.cc"
)
target_link_libraries(db_test2 PRIVATE leveldb)
add_executable(ttl_test
"${PROJECT_SOURCE_DIR}/test/ttl_test.cc"
)
target_link_libraries(ttl_test PRIVATE leveldb gtest)
add_executable(field_test
"${PROJECT_SOURCE_DIR}/test/field_test.cc"
)
target_link_libraries(field_test PRIVATE leveldb gtest)
# add_executable(kv_seperate_test
# "${PROJECT_SOURCE_DIR}/test/kv_seperate_test.cc"
# )
# target_link_libraries(kv_seperate_test PRIVATE leveldb gtest)

+ 63
- 26
table/blob_file.cc View File

@ -1,26 +1,63 @@
#include "blob_file.h"
#include <fstream>
namespace leveldb {
BlobFile::BlobFile(const std::string& filename) : filename_(filename) {
// 初始化 BlobFile,例如打开文件
}
BlobFile::~BlobFile() {
// 关闭文件
}
Status BlobFile::Put(const Slice& key, const Slice& value) {
std::ofstream file(filename_, std::ios::app | std::ios::binary);
if (!file.is_open()) {
return Status::IOError("Failed to open blob file");
}
// 简单实现,将 key 和 value 写入文件
file.write(key.data(), key.size());
file.write(value.data(), value.size());
file.close();
return Status::OK();
}
} // namespace leveldb
#include "table/blob_file.h"
#include "util/coding.h"
#include "util/crc32c.h"
#include <cassert>
namespace leveldb {
namespace blob {
BlobFile::BlobFile(WritableFile* dest) : dest_(dest), head_(0) {}
BlobFile::BlobFile(WritableFile* dest, uint64_t dest_length)
: dest_(dest), head_(dest_length) {}
BlobFile::~BlobFile() = default;
Status BlobFile::AddRecord(const Slice& key, const Slice& value, uint64_t& offset) {
// 动态写入记录,返回写入的偏移量
return EmitDynamicRecord(key, value, offset);
}
Status BlobFile::EmitDynamicRecord(const Slice& key, const Slice& value, uint64_t& offset) {
// 记录头部,包括 key 和 value 的长度
char header[8]; // 4 字节 key 长度 + 4 字节 value 长度
uint32_t key_size = static_cast<uint32_t>(key.size());
uint32_t value_size = static_cast<uint32_t>(value.size());
// 编码 key 和 value 长度
EncodeFixed32(header, key_size);
EncodeFixed32(header + 4, value_size);
// 写入头部
Status s = dest_->Append(Slice(header, sizeof(header)));
if (!s.ok()) {
return s;
}
// 写入 key 和 value 数据
s = dest_->Append(key);
if (!s.ok()) {
return s;
}
s = dest_->Append(value);
if (!s.ok()) {
return s;
}
// 刷新文件到磁盘
s = dest_->Flush();
if (!s.ok()) {
return s;
}
// 更新偏移量
offset = head_;
head_ += sizeof(header) + key_size + value_size;
return Status::OK();
}
} // namespace blob
} // namespace leveldb

+ 34
- 25
table/blob_file.h View File

@ -1,25 +1,34 @@
#ifndef LEVELDB_BLOB_FILE_H_
#define LEVELDB_BLOB_FILE_H_
#include <string>
#include "leveldb/status.h"
#include "leveldb/slice.h"
namespace leveldb {
class BlobFile {
public:
BlobFile(const std::string& filename);
~BlobFile();
//
Status Put(const Slice& key, const Slice& value);
private:
std::string filename_;
//
};
} // namespace leveldb
#endif // LEVELDB_BLOB_FILE_H_
#ifndef LEVELDB_BLOB_FILE_H_
#define LEVELDB_BLOB_FILE_H_
#include <string>
#include "leveldb/status.h"
#include "leveldb/slice.h"
#include "leveldb/env.h"
namespace leveldb {
namespace blob {
class BlobFile {
public:
explicit BlobFile(WritableFile* dest);
BlobFile(WritableFile* dest, uint64_t dest_length);
~BlobFile();
// 添加一条记录,记录写入的偏移量
Status AddRecord(const Slice& key, const Slice& value, uint64_t& offset);
private:
WritableFile* dest_; // 用于写入数据的目标文件
uint64_t head_; // 当前写入位置的偏移量
uint64_t bfid_; // 用于标识 BlobFile 的唯一 ID
// uint64_t head_; // 当前写入文件的偏移量
Status EmitDynamicRecord(const Slice& key, const Slice& value, uint64_t& offset);
};
} // namespace blob
} // namespace leveldb
#endif // LEVELDB_BLOB_FILE_H_

+ 1616
- 0
table/db_impl.cc
File diff suppressed because it is too large
View File


+ 226
- 0
table/db_impl.h View File

@ -0,0 +1,226 @@
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#ifndef STORAGE_LEVELDB_DB_DB_IMPL_H_
#define STORAGE_LEVELDB_DB_DB_IMPL_H_
#include <atomic>
#include <deque>
#include <set>
#include <string>
#include "db/dbformat.h"
#include "db/log_writer.h"
#include "db/snapshot.h"
#include "leveldb/db.h"
#include "leveldb/env.h"
#include "port/port.h"
#include "port/thread_annotations.h"
namespace leveldb {
class MemTable;
class TableCache;
class Version;
class VersionEdit;
class VersionSet;
class DBImpl : public DB {
public:
DBImpl(const Options& options, const std::string& dbname);
DBImpl(const DBImpl&) = delete;
DBImpl& operator=(const DBImpl&) = delete;
~DBImpl() override;
// Implementations of the DB interface
Status Put(const WriteOptions&, const Slice& key,
const Slice& value) override;
Status Put(const WriteOptions&, const Slice& key,
const Slice& value, uint64_t ttl) override; //实现新的put接口,心
Status Delete(const WriteOptions&, const Slice& key) override;
Status Write(const WriteOptions& options, WriteBatch* updates) override;
Status Get(const ReadOptions& options, const Slice& key,
std::string* value) override;
Iterator* NewIterator(const ReadOptions&) override;
const Snapshot* GetSnapshot() override;
void ReleaseSnapshot(const Snapshot* snapshot) override;
bool GetProperty(const Slice& property, std::string* value) override;
void GetApproximateSizes(const Range* range, int n, uint64_t* sizes) override;
void CompactRange(const Slice* begin, const Slice* end) override;
// 朴,添加是否kv分离接口,12.07
bool static key_value_separated_;
// Extra methods (for testing) that are not in the public DB interface
// Compact any files in the named level that overlap [*begin,*end]
void TEST_CompactRange(int level, const Slice* begin, const Slice* end);
// Force current memtable contents to be compacted.
Status TEST_CompactMemTable();
// Return an internal iterator over the current state of the database.
// The keys of this iterator are internal keys (see format.h).
// The returned iterator should be deleted when no longer needed.
Iterator* TEST_NewInternalIterator();
// Return the maximum overlapping data (in bytes) at next level for any
// file at a level >= 1.
int64_t TEST_MaxNextLevelOverlappingBytes();
// Record a sample of bytes read at the specified internal key.
// Samples are taken approximately once every config::kReadBytesPeriod
// bytes.
void RecordReadSample(Slice key);
private:
friend class DB;
struct CompactionState;
struct Writer;
// Information for a manual compaction
struct ManualCompaction {
int level;
bool done;
const InternalKey* begin; // null means beginning of key range
const InternalKey* end; // null means end of key range
InternalKey tmp_storage; // Used to keep track of compaction progress
};
// Per level compaction stats. stats_[level] stores the stats for
// compactions that produced data for the specified "level".
struct CompactionStats {
CompactionStats() : micros(0), bytes_read(0), bytes_written(0) {}
void Add(const CompactionStats& c) {
this->micros += c.micros;
this->bytes_read += c.bytes_read;
this->bytes_written += c.bytes_written;
}
int64_t micros;
int64_t bytes_read;
int64_t bytes_written;
};
Iterator* NewInternalIterator(const ReadOptions&,
SequenceNumber* latest_snapshot,
uint32_t* seed);
Status NewDB();
// Recover the descriptor from persistent storage. May do a significant
// amount of work to recover recently logged updates. Any changes to
// be made to the descriptor are added to *edit.
Status Recover(VersionEdit* edit, bool* save_manifest)
EXCLUSIVE_LOCKS_REQUIRED(mutex_);
void MaybeIgnoreError(Status* s) const;
// Delete any unneeded files and stale in-memory entries.
void RemoveObsoleteFiles() EXCLUSIVE_LOCKS_REQUIRED(mutex_);
// Compact the in-memory write buffer to disk. Switches to a new
// log-file/memtable and writes a new descriptor iff successful.
// Errors are recorded in bg_error_.
void CompactMemTable() EXCLUSIVE_LOCKS_REQUIRED(mutex_);
Status RecoverLogFile(uint64_t log_number, bool last_log, bool* save_manifest,
VersionEdit* edit, SequenceNumber* max_sequence)
EXCLUSIVE_LOCKS_REQUIRED(mutex_);
Status WriteLevel0Table(MemTable* mem, VersionEdit* edit, Version* base)
EXCLUSIVE_LOCKS_REQUIRED(mutex_);
Status MakeRoomForWrite(bool force /* compact even if there is room? */)
EXCLUSIVE_LOCKS_REQUIRED(mutex_);
WriteBatch* BuildBatchGroup(Writer** last_writer)
EXCLUSIVE_LOCKS_REQUIRED(mutex_);
void RecordBackgroundError(const Status& s);
void MaybeScheduleCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_);
static void BGWork(void* db);
void BackgroundCall();
void BackgroundCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_);
void CleanupCompaction(CompactionState* compact)
EXCLUSIVE_LOCKS_REQUIRED(mutex_);
Status DoCompactionWork(CompactionState* compact)
EXCLUSIVE_LOCKS_REQUIRED(mutex_);
Status OpenCompactionOutputFile(CompactionState* compact);
Status FinishCompactionOutputFile(CompactionState* compact, Iterator* input);
Status InstallCompactionResults(CompactionState* compact)
EXCLUSIVE_LOCKS_REQUIRED(mutex_);
const Comparator* user_comparator() const {
return internal_comparator_.user_comparator();
}
// Constant after construction
Env* const env_;
const InternalKeyComparator internal_comparator_;
const InternalFilterPolicy internal_filter_policy_;
const Options options_; // options_.comparator == &internal_comparator_
const bool owns_info_log_;
const bool owns_cache_;
const std::string dbname_;
// table_cache_ provides its own synchronization
TableCache* const table_cache_;
// Lock over the persistent DB state. Non-null iff successfully acquired.
FileLock* db_lock_;
// State below is protected by mutex_
port::Mutex mutex_;
std::atomic<bool> shutting_down_;
port::CondVar background_work_finished_signal_ GUARDED_BY(mutex_);
MemTable* mem_;
MemTable* imm_ GUARDED_BY(mutex_); // Memtable being compacted
std::atomic<bool> has_imm_; // So bg thread can detect non-null imm_
WritableFile* logfile_;
uint64_t logfile_number_ GUARDED_BY(mutex_);
log::Writer* log_;
uint32_t seed_ GUARDED_BY(mutex_); // For sampling.
// Queue of writers.
std::deque<Writer*> writers_ GUARDED_BY(mutex_);
WriteBatch* tmp_batch_ GUARDED_BY(mutex_);
SnapshotList snapshots_ GUARDED_BY(mutex_);
// Set of table files to protect from deletion because they are
// part of ongoing compactions.
std::set<uint64_t> pending_outputs_ GUARDED_BY(mutex_);
// Has a background compaction been scheduled or is running?
bool background_compaction_scheduled_ GUARDED_BY(mutex_);
ManualCompaction* manual_compaction_ GUARDED_BY(mutex_);
VersionSet* const versions_ GUARDED_BY(mutex_);
// Have we encountered a background error in paranoid mode?
Status bg_error_ GUARDED_BY(mutex_);
CompactionStats stats_[config::kNumLevels] GUARDED_BY(mutex_);
};
// Sanitize db options. The caller should delete result.info_log if
// it is not equal to src.info_log.
Options SanitizeOptions(const std::string& db,
const InternalKeyComparator* icmp,
const InternalFilterPolicy* ipolicy,
const Options& src);
} // namespace leveldb
#endif // STORAGE_LEVELDB_DB_DB_IMPL_H_

+ 119
- 0
table/kv_seperate_test.cc View File

@ -0,0 +1,119 @@
#include "gtest/gtest.h"
#include "leveldb/env.h"
#include "leveldb/db.h"
#include "table/blob_file.h" // 假设 BlobFile 的头文件
using namespace leveldb;
constexpr int value_size = 2048; // 单个值的大小
constexpr int data_size = 128 << 20; // 总数据大小
constexpr int min_blob_size = 1024; // KV 分离的阈值
Status OpenDB(std::string dbName, DB** db) {
Options options;
options.create_if_missing = true;
options.key_value_separated = true; // 启用 KV 分离
return DB::Open(options, dbName, db);
}
// 插入数据,模拟 KV 分离
void InsertData(DB* db) {
WriteOptions writeOptions;
int key_num = data_size / value_size;
srand(static_cast<unsigned int>(time(0)));
for (int i = 0; i < key_num; i++) {
int key_ = rand() % key_num + 1;
std::string key = std::to_string(key_);
std::string value(value_size, 'a'); // 大 value
db->Put(writeOptions, key, value); // 使用标准 Put 接口插入
}
}
// 检查数据是否被正确存入 BlobFile
void VerifyBlobFile(const std::string& blob_file_path, int expected_entries) {
BlobFile blobfile(blob_file_path, BlobFile::kReadMode);
Status status = blobfile.Open();
ASSERT_TRUE(status.ok());
int entry_count = 0;
BlobFile::Iterator it = blobfile.NewIterator();
for (it.SeekToFirst(); it.Valid(); it.Next()) {
++entry_count;
const Slice& key = it.key();
const Slice& value = it.value();
ASSERT_GT(value.size(), min_blob_size); // 确认 value 大于阈值
}
ASSERT_EQ(entry_count, expected_entries); // 确认条目数是否正确
blobfile.Close();
}
// KV 分离读写测试
TEST(TestKVSeparation, WriteAndRead) {
DB* db;
if (OpenDB("testdb", &db).ok() == false) {
std::cerr << "open db failed" << std::endl;
abort();
}
// 插入数据
InsertData(db);
// 验证 BlobFile 内容
VerifyBlobFile("blob_data", data_size / value_size);
// 随机点查数据
ReadOptions readOptions;
srand(static_cast<unsigned int>(time(0)));
int key_num = data_size / value_size;
for (int i = 0; i < 100; i++) {
int key_ = rand() % key_num + 1;
std::string key = std::to_string(key_);
std::string value;
Status status = db->Get(readOptions, key, &value);
ASSERT_TRUE(status.ok()); // 验证是否成功读取
if (value.size() > min_blob_size) {
ASSERT_TRUE(value == std::string(value_size, 'a')); // 验证大 value 的内容
}
}
delete db;
}
// KV 分离压缩测试
TEST(TestKVSeparation, Compaction) {
DB* db;
if (OpenDB("testdb", &db).ok() == false) {
std::cerr << "open db failed" << std::endl;
abort();
}
// 插入数据
InsertData(db);
leveldb::Range ranges[1];
ranges[0] = leveldb::Range("-", "A");
uint64_t sizes[1];
db->GetApproximateSizes(ranges, 1, sizes);
ASSERT_GT(sizes[0], 0);
// 执行压缩
db->CompactRange(nullptr, nullptr);
// 验证压缩后主数据区的大小
ranges[0] = leveldb::Range("-", "A");
db->GetApproximateSizes(ranges, 1, sizes);
ASSERT_EQ(sizes[0], 0);
// 验证 BlobFile 内容仍然有效
VerifyBlobFile("blob_data", data_size / value_size);
delete db;
}
int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

+ 345
- 323
table/table_builder.cc View File

@ -1,323 +1,345 @@
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "leveldb/table_builder.h"
#include <cassert>
#include "leveldb/comparator.h"
#include "leveldb/env.h"
#include "leveldb/filter_policy.h"
#include "leveldb/options.h"
#include "table/block_builder.h"
#include "table/filter_block.h"
#include "table/format.h"
#include "util/coding.h"
#include "util/crc32c.h"
#include "db/db_impl.h" //朴
#include "table/blob_file.h" //朴
#include "table/block.h" //朴
const size_t min_blob_size = 1024; // 设定值大小阈值为 1KB,朴
namespace leveldb {
BlobFile* blobfile = new BlobFile("blob_data"); // 初始化全局 blobfile 对象,朴
class BlobFileManager {
public:
static BlobFile* GetInstance() {
static BlobFile instance("blob_data");
return &instance;
}
};
struct TableBuilder::Rep {
Rep(const Options& opt, WritableFile* f)
: options(opt),
index_block_options(opt),
file(f),
offset(0),
data_block(&options),
index_block(&index_block_options),
num_entries(0),
closed(false),
filter_block(opt.filter_policy == nullptr
? nullptr
: new FilterBlockBuilder(opt.filter_policy)),
pending_index_entry(false) {
index_block_options.block_restart_interval = 1;
}
Options options;
Options index_block_options;
WritableFile* file;
uint64_t offset;
Status status;
BlockBuilder data_block;
BlockBuilder index_block;
std::string last_key;
int64_t num_entries;
bool closed; // Either Finish() or Abandon() has been called.
FilterBlockBuilder* filter_block;
// We do not emit the index entry for a block until we have seen the
// first key for the next data block. This allows us to use shorter
// keys in the index block. For example, consider a block boundary
// between the keys "the quick brown fox" and "the who". We can use
// "the r" as the key for the index block entry since it is >= all
// entries in the first block and < all entries in subsequent
// blocks.
//
// Invariant: r->pending_index_entry is true only if data_block is empty.
bool pending_index_entry;
BlockHandle pending_handle; // Handle to add to index block
std::string compressed_output;
};
TableBuilder::TableBuilder(const Options& options, WritableFile* file)
: rep_(new Rep(options, file)) {
if (rep_->filter_block != nullptr) {
rep_->filter_block->StartBlock(0);
}
}
TableBuilder::~TableBuilder() {
assert(rep_->closed); // Catch errors where caller forgot to call Finish()
delete rep_->filter_block;
delete rep_;
}
Status TableBuilder::ChangeOptions(const Options& options) {
// Note: if more fields are added to Options, update
// this function to catch changes that should not be allowed to
// change in the middle of building a Table.
if (options.comparator != rep_->options.comparator) {
return Status::InvalidArgument("changing comparator while building table");
}
// Note that any live BlockBuilders point to rep_->options and therefore
// will automatically pick up the updated options.
rep_->options = options;
rep_->index_block_options = options;
rep_->index_block_options.block_restart_interval = 1;
return Status::OK();
}
void TableBuilder::Add(const Slice& key, const Slice& value) {
Rep* r = rep_;
assert(!r->closed);
if (!ok()) return;
if (r->num_entries > 0) {
assert(r->options.comparator->Compare(key, Slice(r->last_key)) > 0);
}
if (r->pending_index_entry) {
assert(r->data_block.empty());
r->options.comparator->FindShortestSeparator(&r->last_key, key);
std::string handle_encoding;
r->pending_handle.EncodeTo(&handle_encoding);
r->index_block.Add(r->last_key, Slice(handle_encoding));
r->pending_index_entry = false;
}
if (r->filter_block != nullptr) {
r->filter_block->AddKey(key);
}
r->last_key.assign(key.data(), key.size());
r->num_entries++;
r->data_block.Add(key, value);
const size_t estimated_block_size = r->data_block.CurrentSizeEstimate();
if (estimated_block_size >= r->options.block_size) {
Flush();
}
}
void TableBuilder::Flush() {
Rep* r = rep_;
assert(!r->closed);
if (!ok()) return;
if (r->data_block.empty()) return; //朴,正常判断
assert(!r->pending_index_entry);
if (DBImpl::key_value_separated_) {
// 这里获取数据块内容并初始化 Block 对象,朴
Slice block_content = r->data_block.Finish();
BlockContents contents;
contents.data = block_content;
contents.heap_allocated = false;
contents.cachable = false;
// 初始化 Block
Block data_block(contents);
std::unique_ptr<Iterator> iter(data_block.NewIterator(Options().comparator));
// 遍历数据块中的键值对
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
const Slice& key = iter->key();
const Slice& value = iter->value();
// 检查值是否大于阈值
if (value.size() > min_blob_size) {
// 将值存储到 blobfile 中
Status status = blobfile->Put(key, value);
if (!status.ok()) {
r->status = status;
}
}
}
}
WriteBlock(&r->data_block, &r->pending_handle); //将数据块写入文件,并获取数据块的句柄。
if (ok()) {
r->pending_index_entry = true;
r->status = r->file->Flush(); //刷新
}
if (r->filter_block != nullptr) {
r->filter_block->StartBlock(r->offset);
}
}
void TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle) {
// File format contains a sequence of blocks where each block has:
// block_data: uint8[n]
// type: uint8
// crc: uint32
assert(ok());
Rep* r = rep_;
Slice raw = block->Finish();
Slice block_contents;
CompressionType type = r->options.compression;
// TODO(postrelease): Support more compression options: zlib?
switch (type) {
case kNoCompression:
block_contents = raw;
break;
case kSnappyCompression: {
std::string* compressed = &r->compressed_output;
if (port::Snappy_Compress(raw.data(), raw.size(), compressed) &&
compressed->size() < raw.size() - (raw.size() / 8u)) {
block_contents = *compressed;
} else {
// Snappy not supported, or compressed less than 12.5%, so just
// store uncompressed form
block_contents = raw;
type = kNoCompression;
}
break;
}
case kZstdCompression: {
std::string* compressed = &r->compressed_output;
if (port::Zstd_Compress(r->options.zstd_compression_level, raw.data(),
raw.size(), compressed) &&
compressed->size() < raw.size() - (raw.size() / 8u)) {
block_contents = *compressed;
} else {
// Zstd not supported, or compressed less than 12.5%, so just
// store uncompressed form
block_contents = raw;
type = kNoCompression;
}
break;
}
}
WriteRawBlock(block_contents, type, handle);
r->compressed_output.clear();
block->Reset();
}
void TableBuilder::WriteRawBlock(const Slice& block_contents,
CompressionType type, BlockHandle* handle) {
Rep* r = rep_;
handle->set_offset(r->offset);
handle->set_size(block_contents.size());
r->status = r->file->Append(block_contents);
if (r->status.ok()) {
char trailer[kBlockTrailerSize];
trailer[0] = type;
uint32_t crc = crc32c::Value(block_contents.data(), block_contents.size());
crc = crc32c::Extend(crc, trailer, 1); // Extend crc to cover block type
EncodeFixed32(trailer + 1, crc32c::Mask(crc));
r->status = r->file->Append(Slice(trailer, kBlockTrailerSize));
if (r->status.ok()) {
r->offset += block_contents.size() + kBlockTrailerSize;
}
}
}
Status TableBuilder::status() const { return rep_->status; }
Status TableBuilder::Finish() {
Rep* r = rep_;
Flush();
assert(!r->closed);
r->closed = true;
BlockHandle filter_block_handle, metaindex_block_handle, index_block_handle;
// Write filter block
if (ok() && r->filter_block != nullptr) {
WriteRawBlock(r->filter_block->Finish(), kNoCompression,
&filter_block_handle);
}
// Write metaindex block
if (ok()) {
BlockBuilder meta_index_block(&r->options);
if (r->filter_block != nullptr) {
// Add mapping from "filter.Name" to location of filter data
std::string key = "filter.";
key.append(r->options.filter_policy->Name());
std::string handle_encoding;
filter_block_handle.EncodeTo(&handle_encoding);
meta_index_block.Add(key, handle_encoding);
}
// TODO(postrelease): Add stats and other meta blocks
WriteBlock(&meta_index_block, &metaindex_block_handle);
}
// Write index block
if (ok()) {
if (r->pending_index_entry) {
r->options.comparator->FindShortSuccessor(&r->last_key);
std::string handle_encoding;
r->pending_handle.EncodeTo(&handle_encoding);
r->index_block.Add(r->last_key, Slice(handle_encoding));
r->pending_index_entry = false;
}
WriteBlock(&r->index_block, &index_block_handle);
}
// Write footer
if (ok()) {
Footer footer;
footer.set_metaindex_handle(metaindex_block_handle);
footer.set_index_handle(index_block_handle);
std::string footer_encoding;
footer.EncodeTo(&footer_encoding);
r->status = r->file->Append(footer_encoding);
if (r->status.ok()) {
r->offset += footer_encoding.size();
}
}
return r->status;
}
void TableBuilder::Abandon() {
Rep* r = rep_;
assert(!r->closed);
r->closed = true;
}
uint64_t TableBuilder::NumEntries() const { return rep_->num_entries; }
uint64_t TableBuilder::FileSize() const { return rep_->offset; }
} // namespace leveldb
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "leveldb/table_builder.h"
#include <cassert>
#include "leveldb/comparator.h"
#include "leveldb/env.h"
#include "leveldb/filter_policy.h"
#include "leveldb/options.h"
#include "table/block_builder.h"
#include "table/filter_block.h"
#include "table/format.h"
#include "util/coding.h"
#include "util/crc32c.h"
#include "db/db_impl.h" //朴
#include "table/blob_file.h" //朴
#include "table/block.h" //朴
const size_t min_blob_size = 1024; // 设定值大小阈值为 1KB,朴
namespace leveldb {
struct TableBuilder::Rep {
Rep(const Options& opt, WritableFile* f)
: options(opt),
index_block_options(opt),
file(f),
offset(0),
data_block(&options),
index_block(&index_block_options),
num_entries(0),
closed(false),
filter_block(opt.filter_policy == nullptr
? nullptr
: new FilterBlockBuilder(opt.filter_policy)),
pending_index_entry(false) {
index_block_options.block_restart_interval = 1;
}
Options options;
Options index_block_options;
WritableFile* file;
uint64_t offset;
Status status;
BlockBuilder data_block;
BlockBuilder index_block;
std::string last_key;
int64_t num_entries;
bool closed; // Either Finish() or Abandon() has been called.
FilterBlockBuilder* filter_block;
// We do not emit the index entry for a block until we have seen the
// first key for the next data block. This allows us to use shorter
// keys in the index block. For example, consider a block boundary
// between the keys "the quick brown fox" and "the who". We can use
// "the r" as the key for the index block entry since it is >= all
// entries in the first block and < all entries in subsequent
// blocks.
//
// Invariant: r->pending_index_entry is true only if data_block is empty.
bool pending_index_entry;
BlockHandle pending_handle; // Handle to add to index block
std::string compressed_output;
};
TableBuilder::TableBuilder(const Options& options, WritableFile* file)
: rep_(new Rep(options, file)) {
if (rep_->filter_block != nullptr) {
rep_->filter_block->StartBlock(0);
}
}
TableBuilder::~TableBuilder() {
assert(rep_->closed); // Catch errors where caller forgot to call Finish()
delete rep_->filter_block;
delete rep_;
}
Status TableBuilder::ChangeOptions(const Options& options) {
// Note: if more fields are added to Options, update
// this function to catch changes that should not be allowed to
// change in the middle of building a Table.
if (options.comparator != rep_->options.comparator) {
return Status::InvalidArgument("changing comparator while building table");
}
// Note that any live BlockBuilders point to rep_->options and therefore
// will automatically pick up the updated options.
rep_->options = options;
rep_->index_block_options = options;
rep_->index_block_options.block_restart_interval = 1;
return Status::OK();
}
void TableBuilder::Add(const Slice& key, const Slice& value) {
Rep* r = rep_;
assert(!r->closed);
if (!ok()) return;
if (r->num_entries > 0) {
assert(r->options.comparator->Compare(key, Slice(r->last_key)) > 0);
}
if (r->pending_index_entry) {
assert(r->data_block.empty());
r->options.comparator->FindShortestSeparator(&r->last_key, key);
std::string handle_encoding;
r->pending_handle.EncodeTo(&handle_encoding);
r->index_block.Add(r->last_key, Slice(handle_encoding));
r->pending_index_entry = false;
}
if (r->filter_block != nullptr) {
r->filter_block->AddKey(key);
}
r->last_key.assign(key.data(), key.size());
r->num_entries++;
r->data_block.Add(key, value);
const size_t estimated_block_size = r->data_block.CurrentSizeEstimate();
if (estimated_block_size >= r->options.block_size) {
Flush();
}
}
void TableBuilder::Flush() {
Rep* r = rep_;
assert(!r->closed);
if (!ok()) return;
if (r->data_block.empty()) return; //朴,正常判断
assert(!r->pending_index_entry);
if (DBImpl::key_value_separated_) {
// 这里获取数据块内容并初始化 Block 对象,朴
Slice block_content = r->data_block.Finish();
BlockContents contents;
contents.data = block_content;
contents.heap_allocated = false;
contents.cachable = false;
Rep* new_rep = new Rep(r->options, r->file); // 创建一个新的 Rep 实例
new_rep->offset = r->offset; // 新的 offset 初始化为当前的 offset
new_rep->num_entries = r->num_entries;
// 初始化 Block
Block data_block(contents);
leveldb::WritableFile* dest = nullptr;
leveldb::blob::BlobFile blobfile(dest); // 可以动态生成文件名以防止重复 // 初始化 BlobFile 对象,朴
leveldb::WritableFile* file;
int bfid = DBImpl::NewBlobNum(); // 生成唯一的 blobfile id
std::unique_ptr<Iterator> iter(data_block.NewIterator(Options().comparator));
// 遍历数据块中的键值对
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
const Slice& key = iter->key();
const Slice& value = iter->value();
// 检查值是否大于阈值
if (value.size() > min_blob_size) {
// 将值存储到 blobfile 中
uint64_t offset; // 局部变量存储偏移量
Status status = blobfile.AddRecord(key, value, offset);
if (!status.ok()) {
r->status = status;
}
// 这里修改 value,存储 Blob 的 offset 和 bfid
std::string new_value = EncodeBlobValue(offset, bfid);
new_rep->data_block.Add(key, Slice(new_value));
}
else{
// 不需要 Blob 存储,直接处理普通值
new_rep->data_block.Add(key, value);
}
}
}
WriteBlock(&r->data_block, &r->pending_handle); //将数据块写入文件,并获取数据块的句柄。
if (ok()) {
r->pending_index_entry = true;
r->status = r->file->Flush(); //刷新
}
if (r->filter_block != nullptr) {
r->filter_block->StartBlock(r->offset);
}
}
std::string TableBuilder::EncodeBlobValue(uint64_t offset, int bfid) {
// 自定义方法:编码新的 Blob 值
std::string result;
// 为 result 分配空间
result.resize(8 + 4); // 64位 (8字节) + 32位 (4字节)
// 将 offset 和 bfid 编码成一个新的值
std::string result;
EncodeFixed64(&result[0], offset); // 编码 offset
EncodeFixed32(&result[8], bfid); // 编码 bfid
return result;
}
void TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle) {
// File format contains a sequence of blocks where each block has:
// block_data: uint8[n]
// type: uint8
// crc: uint32
assert(ok());
Rep* r = rep_;
Slice raw = block->Finish();
Slice block_contents;
CompressionType type = r->options.compression;
// TODO(postrelease): Support more compression options: zlib?
switch (type) {
case kNoCompression:
block_contents = raw;
break;
case kSnappyCompression: {
std::string* compressed = &r->compressed_output;
if (port::Snappy_Compress(raw.data(), raw.size(), compressed) &&
compressed->size() < raw.size() - (raw.size() / 8u)) {
block_contents = *compressed;
} else {
// Snappy not supported, or compressed less than 12.5%, so just
// store uncompressed form
block_contents = raw;
type = kNoCompression;
}
break;
}
case kZstdCompression: {
std::string* compressed = &r->compressed_output;
if (port::Zstd_Compress(r->options.zstd_compression_level, raw.data(),
raw.size(), compressed) &&
compressed->size() < raw.size() - (raw.size() / 8u)) {
block_contents = *compressed;
} else {
// Zstd not supported, or compressed less than 12.5%, so just
// store uncompressed form
block_contents = raw;
type = kNoCompression;
}
break;
}
}
WriteRawBlock(block_contents, type, handle);
r->compressed_output.clear();
block->Reset();
}
void TableBuilder::WriteRawBlock(const Slice& block_contents,
CompressionType type, BlockHandle* handle) {
Rep* r = rep_;
handle->set_offset(r->offset);
handle->set_size(block_contents.size());
r->status = r->file->Append(block_contents);
if (r->status.ok()) {
char trailer[kBlockTrailerSize];
trailer[0] = type;
uint32_t crc = crc32c::Value(block_contents.data(), block_contents.size());
crc = crc32c::Extend(crc, trailer, 1); // Extend crc to cover block type
EncodeFixed32(trailer + 1, crc32c::Mask(crc));
r->status = r->file->Append(Slice(trailer, kBlockTrailerSize));
if (r->status.ok()) {
r->offset += block_contents.size() + kBlockTrailerSize;
}
}
}
Status TableBuilder::status() const { return rep_->status; }
Status TableBuilder::Finish() {
Rep* r = rep_;
Flush();
assert(!r->closed);
r->closed = true;
BlockHandle filter_block_handle, metaindex_block_handle, index_block_handle;
// Write filter block
if (ok() && r->filter_block != nullptr) {
WriteRawBlock(r->filter_block->Finish(), kNoCompression,
&filter_block_handle);
}
// Write metaindex block
if (ok()) {
BlockBuilder meta_index_block(&r->options);
if (r->filter_block != nullptr) {
// Add mapping from "filter.Name" to location of filter data
std::string key = "filter.";
key.append(r->options.filter_policy->Name());
std::string handle_encoding;
filter_block_handle.EncodeTo(&handle_encoding);
meta_index_block.Add(key, handle_encoding);
}
// TODO(postrelease): Add stats and other meta blocks
WriteBlock(&meta_index_block, &metaindex_block_handle);
}
// Write index block
if (ok()) {
if (r->pending_index_entry) {
r->options.comparator->FindShortSuccessor(&r->last_key);
std::string handle_encoding;
r->pending_handle.EncodeTo(&handle_encoding);
r->index_block.Add(r->last_key, Slice(handle_encoding));
r->pending_index_entry = false;
}
WriteBlock(&r->index_block, &index_block_handle);
}
// Write footer
if (ok()) {
Footer footer;
footer.set_metaindex_handle(metaindex_block_handle);
footer.set_index_handle(index_block_handle);
std::string footer_encoding;
footer.EncodeTo(&footer_encoding);
r->status = r->file->Append(footer_encoding);
if (r->status.ok()) {
r->offset += footer_encoding.size();
}
}
return r->status;
}
void TableBuilder::Abandon() {
Rep* r = rep_;
assert(!r->closed);
r->closed = true;
}
uint64_t TableBuilder::NumEntries() const { return rep_->num_entries; }
uint64_t TableBuilder::FileSize() const { return rep_->offset; }
} // namespace leveldb

Loading…
Cancel
Save