From 49f2c85cb7ba10b139c6272f9b5ce1a27c16b7b0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=B4=E7=A5=89=E7=87=95?= <10224602413@stu.ecnu.edu.cn> Date: Wed, 11 Dec 2024 08:23:36 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E6=88=90kv=E5=88=86=E7=A6=BB=E5=86=99?= =?UTF-8?q?=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- table/CMakeLists.txt | 541 +++++++++++++++ table/blob_file.cc | 89 ++- table/blob_file.h | 59 +- table/db_impl.cc | 1616 +++++++++++++++++++++++++++++++++++++++++++++ table/db_impl.h | 226 +++++++ table/kv_seperate_test.cc | 119 ++++ table/table_builder.cc | 668 ++++++++++--------- 7 files changed, 2944 insertions(+), 374 deletions(-) create mode 100644 table/CMakeLists.txt create mode 100644 table/db_impl.cc create mode 100644 table/db_impl.h create mode 100644 table/kv_seperate_test.cc diff --git a/table/CMakeLists.txt b/table/CMakeLists.txt new file mode 100644 index 0000000..edab122 --- /dev/null +++ b/table/CMakeLists.txt @@ -0,0 +1,541 @@ +# Copyright 2017 The LevelDB Authors. All rights reserved. +# Use of this source code is governed by a BSD-style license that can be +# found in the LICENSE file. See the AUTHORS file for names of contributors. + +cmake_minimum_required(VERSION 3.9) +# Keep the version below in sync with the one in db.h +project(leveldb VERSION 1.23.0 LANGUAGES C CXX) + +# C standard can be overridden when this is used as a sub-project. +if(NOT CMAKE_C_STANDARD) + # This project can use C11, but will gracefully decay down to C89. + set(CMAKE_C_STANDARD 11) + set(CMAKE_C_STANDARD_REQUIRED OFF) + set(CMAKE_C_EXTENSIONS OFF) +endif(NOT CMAKE_C_STANDARD) + +# C++ standard can be overridden when this is used as a sub-project. +if(NOT CMAKE_CXX_STANDARD) + # This project requires C++11. + set(CMAKE_CXX_STANDARD 11) + set(CMAKE_CXX_STANDARD_REQUIRED ON) + set(CMAKE_CXX_EXTENSIONS OFF) +endif(NOT CMAKE_CXX_STANDARD) + +if (WIN32) + set(LEVELDB_PLATFORM_NAME LEVELDB_PLATFORM_WINDOWS) + # TODO(cmumford): Make UNICODE configurable for Windows. + add_definitions(-D_UNICODE -DUNICODE) +else (WIN32) + set(LEVELDB_PLATFORM_NAME LEVELDB_PLATFORM_POSIX) +endif (WIN32) + +option(LEVELDB_BUILD_TESTS "Build LevelDB's unit tests" ON) +option(LEVELDB_BUILD_BENCHMARKS "Build LevelDB's benchmarks" ON) +option(LEVELDB_INSTALL "Install LevelDB's header and library" ON) + +include(CheckIncludeFile) +check_include_file("unistd.h" HAVE_UNISTD_H) + +include(CheckLibraryExists) +check_library_exists(crc32c crc32c_value "" HAVE_CRC32C) +check_library_exists(snappy snappy_compress "" HAVE_SNAPPY) +check_library_exists(zstd zstd_compress "" HAVE_ZSTD) +check_library_exists(tcmalloc malloc "" HAVE_TCMALLOC) + +include(CheckCXXSymbolExists) +# Using check_cxx_symbol_exists() instead of check_c_symbol_exists() because +# we're including the header from C++, and feature detection should use the same +# compiler language that the project will use later. Principles aside, some +# versions of do not expose fdatasync() in in standard C mode +# (-std=c11), but do expose the function in standard C++ mode (-std=c++11). +check_cxx_symbol_exists(fdatasync "unistd.h" HAVE_FDATASYNC) +check_cxx_symbol_exists(F_FULLFSYNC "fcntl.h" HAVE_FULLFSYNC) +check_cxx_symbol_exists(O_CLOEXEC "fcntl.h" HAVE_O_CLOEXEC) + +if(CMAKE_CXX_COMPILER_ID STREQUAL "MSVC") + # Disable C++ exceptions. + string(REGEX REPLACE "/EH[a-z]+" "" CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS}") + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /EHs-c-") + add_definitions(-D_HAS_EXCEPTIONS=0) + + # Disable RTTI. + string(REGEX REPLACE "/GR" "" CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS}") + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /GR-") +else(CMAKE_CXX_COMPILER_ID STREQUAL "MSVC") + # Enable strict prototype warnings for C code in clang and gcc. + if(NOT CMAKE_C_FLAGS MATCHES "-Wstrict-prototypes") + set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wstrict-prototypes") + endif(NOT CMAKE_C_FLAGS MATCHES "-Wstrict-prototypes") + + # Disable C++ exceptions. + string(REGEX REPLACE "-fexceptions" "" CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS}") + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fno-exceptions") + + # Disable RTTI. + string(REGEX REPLACE "-frtti" "" CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS}") + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fno-rtti") +endif(CMAKE_CXX_COMPILER_ID STREQUAL "MSVC") + +# Test whether -Wthread-safety is available. See +# https://clang.llvm.org/docs/ThreadSafetyAnalysis.html +include(CheckCXXCompilerFlag) +check_cxx_compiler_flag(-Wthread-safety HAVE_CLANG_THREAD_SAFETY) + +# Used by googletest. +check_cxx_compiler_flag(-Wno-missing-field-initializers + LEVELDB_HAVE_NO_MISSING_FIELD_INITIALIZERS) + +include(CheckCXXSourceCompiles) + +# Test whether C++17 __has_include is available. +check_cxx_source_compiles(" +#if defined(__has_include) && __has_include() +#include +#endif +int main() { std::string str; return 0; } +" HAVE_CXX17_HAS_INCLUDE) + +set(LEVELDB_PUBLIC_INCLUDE_DIR "include/leveldb") +set(LEVELDB_PORT_CONFIG_DIR "include/port") + +configure_file( + "port/port_config.h.in" + "${PROJECT_BINARY_DIR}/${LEVELDB_PORT_CONFIG_DIR}/port_config.h" +) + +include_directories( + "${PROJECT_BINARY_DIR}/include" + "." +) + +if(BUILD_SHARED_LIBS) + # Only export LEVELDB_EXPORT symbols from the shared library. + add_compile_options(-fvisibility=hidden) +endif(BUILD_SHARED_LIBS) + +# Must be included before CMAKE_INSTALL_INCLUDEDIR is used. +include(GNUInstallDirs) + +add_library(leveldb "") +target_sources(leveldb + PRIVATE + "${PROJECT_BINARY_DIR}/${LEVELDB_PORT_CONFIG_DIR}/port_config.h" + "db/builder.cc" + "db/builder.h" + "db/c.cc" + "db/db_impl.cc" + "db/db_impl.h" + "db/db_iter.cc" + "db/db_iter.h" + "db/dbformat.cc" + "db/dbformat.h" + "db/dumpfile.cc" + "db/filename.cc" + "db/filename.h" + "db/log_format.h" + "db/log_reader.cc" + "db/log_reader.h" + "db/log_writer.cc" + "db/log_writer.h" + "db/memtable.cc" + "db/memtable.h" + "db/repair.cc" + "db/skiplist.h" + "db/snapshot.h" + "db/table_cache.cc" + "db/table_cache.h" + "db/version_edit.cc" + "db/version_edit.h" + "db/version_set.cc" + "db/version_set.h" + "db/write_batch_internal.h" + "db/write_batch.cc" + "port/port_stdcxx.h" + "port/port.h" + "port/thread_annotations.h" + "table/block_builder.cc" + "table/block_builder.h" + "table/block.cc" + "table/block.h" + "table/filter_block.cc" + "table/filter_block.h" + "table/format.cc" + "table/format.h" + "table/iterator_wrapper.h" + "table/iterator.cc" + "table/merger.cc" + "table/merger.h" + "table/table_builder.cc" + "table/table.cc" + "table/two_level_iterator.cc" + "table/two_level_iterator.h" + "util/arena.cc" + "util/arena.h" + "util/bloom.cc" + "util/cache.cc" + "util/coding.cc" + "util/coding.h" + "util/comparator.cc" + "util/crc32c.cc" + "util/crc32c.h" + "util/env.cc" + "util/filter_policy.cc" + "util/hash.cc" + "util/hash.h" + "util/logging.cc" + "util/logging.h" + "util/mutexlock.h" + "util/no_destructor.h" + "util/options.cc" + "util/random.h" + "util/status.cc" + + # Only CMake 3.3+ supports PUBLIC sources in targets exported by "install". + $<$:PUBLIC> + "${LEVELDB_PUBLIC_INCLUDE_DIR}/c.h" + "${LEVELDB_PUBLIC_INCLUDE_DIR}/cache.h" + "${LEVELDB_PUBLIC_INCLUDE_DIR}/comparator.h" + "${LEVELDB_PUBLIC_INCLUDE_DIR}/db.h" + "${LEVELDB_PUBLIC_INCLUDE_DIR}/dumpfile.h" + "${LEVELDB_PUBLIC_INCLUDE_DIR}/env.h" + "${LEVELDB_PUBLIC_INCLUDE_DIR}/export.h" + "${LEVELDB_PUBLIC_INCLUDE_DIR}/filter_policy.h" + "${LEVELDB_PUBLIC_INCLUDE_DIR}/iterator.h" + "${LEVELDB_PUBLIC_INCLUDE_DIR}/options.h" + "${LEVELDB_PUBLIC_INCLUDE_DIR}/slice.h" + "${LEVELDB_PUBLIC_INCLUDE_DIR}/status.h" + "${LEVELDB_PUBLIC_INCLUDE_DIR}/table_builder.h" + "${LEVELDB_PUBLIC_INCLUDE_DIR}/table.h" + "${LEVELDB_PUBLIC_INCLUDE_DIR}/write_batch.h" +) + +if (WIN32) + target_sources(leveldb + PRIVATE + "util/env_windows.cc" + "util/windows_logger.h" + ) +else (WIN32) + target_sources(leveldb + PRIVATE + "util/env_posix.cc" + "util/posix_logger.h" + ) +endif (WIN32) + +# MemEnv is not part of the interface and could be pulled to a separate library. +target_sources(leveldb + PRIVATE + "helpers/memenv/memenv.cc" + "helpers/memenv/memenv.h" +) + +target_include_directories(leveldb + PUBLIC + $ + $ +) + +set_target_properties(leveldb + PROPERTIES VERSION ${PROJECT_VERSION} SOVERSION ${PROJECT_VERSION_MAJOR}) + +target_compile_definitions(leveldb + PRIVATE + # Used by include/export.h when building shared libraries. + LEVELDB_COMPILE_LIBRARY + # Used by port/port.h. + ${LEVELDB_PLATFORM_NAME}=1 +) +if (NOT HAVE_CXX17_HAS_INCLUDE) + target_compile_definitions(leveldb + PRIVATE + LEVELDB_HAS_PORT_CONFIG_H=1 + ) +endif(NOT HAVE_CXX17_HAS_INCLUDE) + +if(BUILD_SHARED_LIBS) + target_compile_definitions(leveldb + PUBLIC + # Used by include/export.h. + LEVELDB_SHARED_LIBRARY + ) +endif(BUILD_SHARED_LIBS) + +if(HAVE_CLANG_THREAD_SAFETY) + target_compile_options(leveldb + PUBLIC + -Werror -Wthread-safety) +endif(HAVE_CLANG_THREAD_SAFETY) + +if(HAVE_CRC32C) + target_link_libraries(leveldb crc32c) +endif(HAVE_CRC32C) +if(HAVE_SNAPPY) + target_link_libraries(leveldb snappy) +endif(HAVE_SNAPPY) +if(HAVE_ZSTD) + target_link_libraries(leveldb zstd) +endif(HAVE_ZSTD) +if(HAVE_TCMALLOC) + target_link_libraries(leveldb tcmalloc) +endif(HAVE_TCMALLOC) + +# Needed by port_stdcxx.h +find_package(Threads REQUIRED) +target_link_libraries(leveldb Threads::Threads) + +add_executable(leveldbutil + "db/leveldbutil.cc" +) +target_link_libraries(leveldbutil leveldb) + +if(LEVELDB_BUILD_TESTS) + enable_testing() + + # Prevent overriding the parent project's compiler/linker settings on Windows. + set(gtest_force_shared_crt ON CACHE BOOL "" FORCE) + set(install_gtest OFF) + set(install_gmock OFF) + set(build_gmock ON) + + # This project is tested using GoogleTest. + add_subdirectory("third_party/googletest") + + # GoogleTest triggers a missing field initializers warning. + if(LEVELDB_HAVE_NO_MISSING_FIELD_INITIALIZERS) + set_property(TARGET gtest + APPEND PROPERTY COMPILE_OPTIONS -Wno-missing-field-initializers) + set_property(TARGET gmock + APPEND PROPERTY COMPILE_OPTIONS -Wno-missing-field-initializers) + endif(LEVELDB_HAVE_NO_MISSING_FIELD_INITIALIZERS) + + add_executable(leveldb_tests "") + target_sources(leveldb_tests + PRIVATE + # "db/fault_injection_test.cc" + # "issues/issue178_test.cc" + # "issues/issue200_test.cc" + # "issues/issue320_test.cc" + "${PROJECT_BINARY_DIR}/${LEVELDB_PORT_CONFIG_DIR}/port_config.h" + # "util/env_test.cc" + "util/status_test.cc" + "util/no_destructor_test.cc" + "util/testutil.cc" + "util/testutil.h" + ) + if(NOT BUILD_SHARED_LIBS) + target_sources(leveldb_tests + PRIVATE + "db/autocompact_test.cc" + "db/corruption_test.cc" + "db/db_test.cc" + "db/dbformat_test.cc" + "db/filename_test.cc" + "db/log_test.cc" + "db/recovery_test.cc" + "db/skiplist_test.cc" + "db/version_edit_test.cc" + "db/version_set_test.cc" + "db/write_batch_test.cc" + "helpers/memenv/memenv_test.cc" + "table/filter_block_test.cc" + "table/table_test.cc" + "util/arena_test.cc" + "util/bloom_test.cc" + "util/cache_test.cc" + "util/coding_test.cc" + "util/crc32c_test.cc" + "util/hash_test.cc" + "util/logging_test.cc" + ) + endif(NOT BUILD_SHARED_LIBS) + target_link_libraries(leveldb_tests leveldb gmock gtest gtest_main) + target_compile_definitions(leveldb_tests + PRIVATE + ${LEVELDB_PLATFORM_NAME}=1 + ) + if (NOT HAVE_CXX17_HAS_INCLUDE) + target_compile_definitions(leveldb_tests + PRIVATE + LEVELDB_HAS_PORT_CONFIG_H=1 + ) + endif(NOT HAVE_CXX17_HAS_INCLUDE) + + add_test(NAME "leveldb_tests" COMMAND "leveldb_tests") + + function(leveldb_test test_file) + get_filename_component(test_target_name "${test_file}" NAME_WE) + + add_executable("${test_target_name}" "") + target_sources("${test_target_name}" + PRIVATE + "${PROJECT_BINARY_DIR}/${LEVELDB_PORT_CONFIG_DIR}/port_config.h" + "util/testutil.cc" + "util/testutil.h" + + "${test_file}" + ) + target_link_libraries("${test_target_name}" leveldb gmock gtest) + target_compile_definitions("${test_target_name}" + PRIVATE + ${LEVELDB_PLATFORM_NAME}=1 + ) + if (NOT HAVE_CXX17_HAS_INCLUDE) + target_compile_definitions("${test_target_name}" + PRIVATE + LEVELDB_HAS_PORT_CONFIG_H=1 + ) + endif(NOT HAVE_CXX17_HAS_INCLUDE) + + add_test(NAME "${test_target_name}" COMMAND "${test_target_name}") + endfunction(leveldb_test) + + leveldb_test("db/c_test.c") + + if(NOT BUILD_SHARED_LIBS) + # TODO(costan): This test also uses + # "util/env_{posix|windows}_test_helper.h" + if (WIN32) + leveldb_test("util/env_windows_test.cc") + else (WIN32) + leveldb_test("util/env_posix_test.cc") + endif (WIN32) + endif(NOT BUILD_SHARED_LIBS) +endif(LEVELDB_BUILD_TESTS) + +if(LEVELDB_BUILD_BENCHMARKS) + # This project uses Google benchmark for benchmarking. + set(BENCHMARK_ENABLE_TESTING OFF CACHE BOOL "" FORCE) + set(BENCHMARK_ENABLE_EXCEPTIONS OFF CACHE BOOL "" FORCE) + add_subdirectory("third_party/benchmark") + + function(leveldb_benchmark bench_file) + get_filename_component(bench_target_name "${bench_file}" NAME_WE) + + add_executable("${bench_target_name}" "") + target_sources("${bench_target_name}" + PRIVATE + "${PROJECT_BINARY_DIR}/${LEVELDB_PORT_CONFIG_DIR}/port_config.h" + "util/histogram.cc" + "util/histogram.h" + "util/testutil.cc" + "util/testutil.h" + + "${bench_file}" + ) + target_link_libraries("${bench_target_name}" leveldb gmock gtest benchmark) + target_compile_definitions("${bench_target_name}" + PRIVATE + ${LEVELDB_PLATFORM_NAME}=1 + ) + if (NOT HAVE_CXX17_HAS_INCLUDE) + target_compile_definitions("${bench_target_name}" + PRIVATE + LEVELDB_HAS_PORT_CONFIG_H=1 + ) + endif(NOT HAVE_CXX17_HAS_INCLUDE) + endfunction(leveldb_benchmark) + + if(NOT BUILD_SHARED_LIBS) + leveldb_benchmark("benchmarks/db_bench.cc") + endif(NOT BUILD_SHARED_LIBS) + + check_library_exists(sqlite3 sqlite3_open "" HAVE_SQLITE3) + if(HAVE_SQLITE3) + leveldb_benchmark("benchmarks/db_bench_sqlite3.cc") + target_link_libraries(db_bench_sqlite3 sqlite3) + endif(HAVE_SQLITE3) + + # check_library_exists is insufficient here because the library names have + # different manglings when compiled with clang or gcc, at least when installed + # with Homebrew on Mac. + set(OLD_CMAKE_REQURED_LIBRARIES ${CMAKE_REQUIRED_LIBRARIES}) + list(APPEND CMAKE_REQUIRED_LIBRARIES kyotocabinet) + check_cxx_source_compiles(" +#include + +int main() { + kyotocabinet::TreeDB* db = new kyotocabinet::TreeDB(); + delete db; + return 0; +} + " HAVE_KYOTOCABINET) + set(CMAKE_REQUIRED_LIBRARIES ${OLD_CMAKE_REQURED_LIBRARIES}) + if(HAVE_KYOTOCABINET) + leveldb_benchmark("benchmarks/db_bench_tree_db.cc") + target_link_libraries(db_bench_tree_db kyotocabinet) + endif(HAVE_KYOTOCABINET) +endif(LEVELDB_BUILD_BENCHMARKS) + +if(LEVELDB_INSTALL) + install(TARGETS leveldb + EXPORT leveldbTargets + RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR} + LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR} + ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR} + ) + install( + FILES + "${LEVELDB_PUBLIC_INCLUDE_DIR}/c.h" + "${LEVELDB_PUBLIC_INCLUDE_DIR}/cache.h" + "${LEVELDB_PUBLIC_INCLUDE_DIR}/comparator.h" + "${LEVELDB_PUBLIC_INCLUDE_DIR}/db.h" + "${LEVELDB_PUBLIC_INCLUDE_DIR}/dumpfile.h" + "${LEVELDB_PUBLIC_INCLUDE_DIR}/env.h" + "${LEVELDB_PUBLIC_INCLUDE_DIR}/export.h" + "${LEVELDB_PUBLIC_INCLUDE_DIR}/filter_policy.h" + "${LEVELDB_PUBLIC_INCLUDE_DIR}/iterator.h" + "${LEVELDB_PUBLIC_INCLUDE_DIR}/options.h" + "${LEVELDB_PUBLIC_INCLUDE_DIR}/slice.h" + "${LEVELDB_PUBLIC_INCLUDE_DIR}/status.h" + "${LEVELDB_PUBLIC_INCLUDE_DIR}/table_builder.h" + "${LEVELDB_PUBLIC_INCLUDE_DIR}/table.h" + "${LEVELDB_PUBLIC_INCLUDE_DIR}/write_batch.h" + DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/leveldb" + ) + + include(CMakePackageConfigHelpers) + configure_package_config_file( + "cmake/${PROJECT_NAME}Config.cmake.in" + "${PROJECT_BINARY_DIR}/cmake/${PROJECT_NAME}Config.cmake" + INSTALL_DESTINATION "${CMAKE_INSTALL_LIBDIR}/cmake/${PROJECT_NAME}" + ) + write_basic_package_version_file( + "${PROJECT_BINARY_DIR}/cmake/${PROJECT_NAME}ConfigVersion.cmake" + COMPATIBILITY SameMajorVersion + ) + install( + EXPORT leveldbTargets + NAMESPACE leveldb:: + DESTINATION "${CMAKE_INSTALL_LIBDIR}/cmake/${PROJECT_NAME}" + ) + install( + FILES + "${PROJECT_BINARY_DIR}/cmake/${PROJECT_NAME}Config.cmake" + "${PROJECT_BINARY_DIR}/cmake/${PROJECT_NAME}ConfigVersion.cmake" + DESTINATION "${CMAKE_INSTALL_LIBDIR}/cmake/${PROJECT_NAME}" + ) +endif(LEVELDB_INSTALL) + + +add_executable(db_test2 + "${PROJECT_SOURCE_DIR}/test/db_test2.cc" +) +target_link_libraries(db_test2 PRIVATE leveldb) + + +add_executable(ttl_test + "${PROJECT_SOURCE_DIR}/test/ttl_test.cc" +) +target_link_libraries(ttl_test PRIVATE leveldb gtest) + +add_executable(field_test + "${PROJECT_SOURCE_DIR}/test/field_test.cc" +) +target_link_libraries(field_test PRIVATE leveldb gtest) + +# add_executable(kv_seperate_test +# "${PROJECT_SOURCE_DIR}/test/kv_seperate_test.cc" +# ) +# target_link_libraries(kv_seperate_test PRIVATE leveldb gtest) \ No newline at end of file diff --git a/table/blob_file.cc b/table/blob_file.cc index a23c92c..31d9be7 100644 --- a/table/blob_file.cc +++ b/table/blob_file.cc @@ -1,26 +1,63 @@ -#include "blob_file.h" -#include - -namespace leveldb { - -BlobFile::BlobFile(const std::string& filename) : filename_(filename) { - // 初始化 BlobFile,例如打开文件 -} - -BlobFile::~BlobFile() { - // 关闭文件 -} - -Status BlobFile::Put(const Slice& key, const Slice& value) { - std::ofstream file(filename_, std::ios::app | std::ios::binary); - if (!file.is_open()) { - return Status::IOError("Failed to open blob file"); - } - // 简单实现,将 key 和 value 写入文件 - file.write(key.data(), key.size()); - file.write(value.data(), value.size()); - file.close(); - return Status::OK(); -} - -} // namespace leveldb +#include "table/blob_file.h" +#include "util/coding.h" +#include "util/crc32c.h" +#include + +namespace leveldb { +namespace blob { + +BlobFile::BlobFile(WritableFile* dest) : dest_(dest), head_(0) {} + +BlobFile::BlobFile(WritableFile* dest, uint64_t dest_length) + : dest_(dest), head_(dest_length) {} + +BlobFile::~BlobFile() = default; + +Status BlobFile::AddRecord(const Slice& key, const Slice& value, uint64_t& offset) { + // 动态写入记录,返回写入的偏移量 + return EmitDynamicRecord(key, value, offset); +} + +Status BlobFile::EmitDynamicRecord(const Slice& key, const Slice& value, uint64_t& offset) { + // 记录头部,包括 key 和 value 的长度 + char header[8]; // 4 字节 key 长度 + 4 字节 value 长度 + + uint32_t key_size = static_cast(key.size()); + uint32_t value_size = static_cast(value.size()); + + // 编码 key 和 value 长度 + EncodeFixed32(header, key_size); + EncodeFixed32(header + 4, value_size); + + // 写入头部 + Status s = dest_->Append(Slice(header, sizeof(header))); + if (!s.ok()) { + return s; + } + + // 写入 key 和 value 数据 + s = dest_->Append(key); + if (!s.ok()) { + return s; + } + + s = dest_->Append(value); + if (!s.ok()) { + return s; + } + + // 刷新文件到磁盘 + s = dest_->Flush(); + if (!s.ok()) { + return s; + } + + // 更新偏移量 + offset = head_; + head_ += sizeof(header) + key_size + value_size; + + return Status::OK(); +} + +} // namespace blob +} // namespace leveldb diff --git a/table/blob_file.h b/table/blob_file.h index 45fa3ef..75b8133 100644 --- a/table/blob_file.h +++ b/table/blob_file.h @@ -1,25 +1,34 @@ -#ifndef LEVELDB_BLOB_FILE_H_ -#define LEVELDB_BLOB_FILE_H_ - -#include -#include "leveldb/status.h" -#include "leveldb/slice.h" - -namespace leveldb { - -class BlobFile { -public: - BlobFile(const std::string& filename); - ~BlobFile(); - - // 写入键值对 - Status Put(const Slice& key, const Slice& value); - -private: - std::string filename_; - // 内部实现,例如文件指针或缓冲区 -}; - -} // namespace leveldb - -#endif // LEVELDB_BLOB_FILE_H_ +#ifndef LEVELDB_BLOB_FILE_H_ +#define LEVELDB_BLOB_FILE_H_ + +#include +#include "leveldb/status.h" +#include "leveldb/slice.h" +#include "leveldb/env.h" + +namespace leveldb { +namespace blob { + +class BlobFile { + public: + explicit BlobFile(WritableFile* dest); + BlobFile(WritableFile* dest, uint64_t dest_length); + + ~BlobFile(); + + // 添加一条记录,记录写入的偏移量 + Status AddRecord(const Slice& key, const Slice& value, uint64_t& offset); + + private: + WritableFile* dest_; // 用于写入数据的目标文件 + uint64_t head_; // 当前写入位置的偏移量 + uint64_t bfid_; // 用于标识 BlobFile 的唯一 ID + // uint64_t head_; // 当前写入文件的偏移量 + + Status EmitDynamicRecord(const Slice& key, const Slice& value, uint64_t& offset); +}; + +} // namespace blob +} // namespace leveldb + +#endif // LEVELDB_BLOB_FILE_H_ diff --git a/table/db_impl.cc b/table/db_impl.cc new file mode 100644 index 0000000..856c2a6 --- /dev/null +++ b/table/db_impl.cc @@ -0,0 +1,1616 @@ +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "db/db_impl.h" + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "db/builder.h" +#include "db/db_iter.h" +#include "db/dbformat.h" +#include "db/filename.h" +#include "db/log_reader.h" +#include "db/log_writer.h" +#include "db/memtable.h" +#include "db/table_cache.h" +#include "db/version_set.h" +#include "db/write_batch_internal.h" +#include "leveldb/db.h" +#include "leveldb/env.h" +#include "leveldb/status.h" +#include "leveldb/table.h" +#include "leveldb/table_builder.h" +#include "port/port.h" +#include "table/block.h" +#include "table/merger.h" +#include "table/two_level_iterator.h" +#include "util/coding.h" +#include "util/logging.h" +#include "util/mutexlock.h" + +namespace leveldb { + +const int kNumNonTableCacheFiles = 10; + +// Information kept for every waiting writer +struct DBImpl::Writer { + explicit Writer(port::Mutex* mu) + : batch(nullptr), sync(false), done(false), cv(mu) {} + + Status status; + WriteBatch* batch; + bool sync; + bool done; + port::CondVar cv; +}; + +struct DBImpl::CompactionState { + // Files produced by compaction + struct Output { + uint64_t number; + uint64_t file_size; + InternalKey smallest, largest; + }; + + Output* current_output() { return &outputs[outputs.size() - 1]; } + + explicit CompactionState(Compaction* c) + : compaction(c), + smallest_snapshot(0), + outfile(nullptr), + builder(nullptr), + total_bytes(0) {} + + Compaction* const compaction; + + // Sequence numbers < smallest_snapshot are not significant since we + // will never have to service a snapshot below smallest_snapshot. + // Therefore if we have seen a sequence number S <= smallest_snapshot, + // we can drop all entries for the same key with sequence numbers < S. + SequenceNumber smallest_snapshot; + + std::vector outputs; + + // State kept for output being generated + WritableFile* outfile; + TableBuilder* builder; + + uint64_t total_bytes; +}; + +// Fix user-supplied options to be reasonable +template +static void ClipToRange(T* ptr, V minvalue, V maxvalue) { + if (static_cast(*ptr) > maxvalue) *ptr = maxvalue; + if (static_cast(*ptr) < minvalue) *ptr = minvalue; +} +Options SanitizeOptions(const std::string& dbname, + const InternalKeyComparator* icmp, + const InternalFilterPolicy* ipolicy, + const Options& src) { + Options result = src; + result.comparator = icmp; + result.filter_policy = (src.filter_policy != nullptr) ? ipolicy : nullptr; + ClipToRange(&result.max_open_files, 64 + kNumNonTableCacheFiles, 50000); + ClipToRange(&result.write_buffer_size, 64 << 10, 1 << 30); + ClipToRange(&result.max_file_size, 1 << 20, 1 << 30); + ClipToRange(&result.block_size, 1 << 10, 4 << 20); + if (result.info_log == nullptr) { + // Open a log file in the same directory as the db + src.env->CreateDir(dbname); // In case it does not exist + src.env->RenameFile(InfoLogFileName(dbname), OldInfoLogFileName(dbname)); + Status s = src.env->NewLogger(InfoLogFileName(dbname), &result.info_log); + if (!s.ok()) { + // No place suitable for logging + result.info_log = nullptr; + } + } + if (result.block_cache == nullptr) { + result.block_cache = NewLRUCache(8 << 20); + } + return result; +} + +static int TableCacheSize(const Options& sanitized_options) { + // Reserve ten files or so for other uses and give the rest to TableCache. + return sanitized_options.max_open_files - kNumNonTableCacheFiles; +} + +DBImpl::DBImpl(const Options& raw_options, const std::string& dbname) + : env_(raw_options.env), + internal_comparator_(raw_options.comparator), + internal_filter_policy_(raw_options.filter_policy), + options_(SanitizeOptions(dbname, &internal_comparator_, + &internal_filter_policy_, raw_options)), + owns_info_log_(options_.info_log != raw_options.info_log), + owns_cache_(options_.block_cache != raw_options.block_cache), + dbname_(dbname), + table_cache_(new TableCache(dbname_, options_, TableCacheSize(options_))), + db_lock_(nullptr), + shutting_down_(false), + background_work_finished_signal_(&mutex_), + mem_(nullptr), + imm_(nullptr), + has_imm_(false), + logfile_(nullptr), + logfile_number_(0), + log_(nullptr), + seed_(0), + tmp_batch_(new WriteBatch), + background_compaction_scheduled_(false), + manual_compaction_(nullptr), + versions_(new VersionSet(dbname_, &options_, table_cache_, + &internal_comparator_)) {} + bool static key_value_separated_; //朴,添加是否kv分离,12.07 + + +DBImpl::~DBImpl() { + // Wait for background work to finish. + mutex_.Lock(); + shutting_down_.store(true, std::memory_order_release); + while (background_compaction_scheduled_) { + background_work_finished_signal_.Wait(); + } + mutex_.Unlock(); + + if (db_lock_ != nullptr) { + env_->UnlockFile(db_lock_); + } + + delete versions_; + if (mem_ != nullptr) mem_->Unref(); + if (imm_ != nullptr) imm_->Unref(); + delete tmp_batch_; + delete log_; + delete logfile_; + delete table_cache_; + + if (owns_info_log_) { + delete options_.info_log; + } + if (owns_cache_) { + delete options_.block_cache; + } +} + +Status DBImpl::NewDB() { + VersionEdit new_db; + new_db.SetComparatorName(user_comparator()->Name()); + new_db.SetLogNumber(0); + new_db.SetNextFile(2); + new_db.SetLastSequence(0); + + const std::string manifest = DescriptorFileName(dbname_, 1); + WritableFile* file; + Status s = env_->NewWritableFile(manifest, &file); + if (!s.ok()) { + return s; + } + { + log::Writer log(file); + std::string record; + new_db.EncodeTo(&record); + s = log.AddRecord(record); + if (s.ok()) { + s = file->Sync(); + } + if (s.ok()) { + s = file->Close(); + } + } + delete file; + if (s.ok()) { + // Make "CURRENT" file that points to the new manifest file. + s = SetCurrentFile(env_, dbname_, 1); + } else { + env_->RemoveFile(manifest); + } + return s; +} + +void DBImpl::MaybeIgnoreError(Status* s) const { + if (s->ok() || options_.paranoid_checks) { + // No change needed + } else { + Log(options_.info_log, "Ignoring error %s", s->ToString().c_str()); + *s = Status::OK(); + } +} + +void DBImpl::RemoveObsoleteFiles() { + mutex_.AssertHeld(); + + if (!bg_error_.ok()) { + // After a background error, we don't know whether a new version may + // or may not have been committed, so we cannot safely garbage collect. + return; + } + + // Make a set of all of the live files + std::set live = pending_outputs_; + versions_->AddLiveFiles(&live); + + std::vector filenames; + env_->GetChildren(dbname_, &filenames); // Ignoring errors on purpose + uint64_t number; + FileType type; + std::vector files_to_delete; + for (std::string& filename : filenames) { + if (ParseFileName(filename, &number, &type)) { + bool keep = true; + switch (type) { + case kLogFile: + keep = ((number >= versions_->LogNumber()) || + (number == versions_->PrevLogNumber())); + break; + case kDescriptorFile: + // Keep my manifest file, and any newer incarnations' + // (in case there is a race that allows other incarnations) + keep = (number >= versions_->ManifestFileNumber()); + break; + case kTableFile: + keep = (live.find(number) != live.end()); + break; + case kTempFile: + // Any temp files that are currently being written to must + // be recorded in pending_outputs_, which is inserted into "live" + keep = (live.find(number) != live.end()); + break; + case kCurrentFile: + case kDBLockFile: + case kInfoLogFile: + keep = true; + break; + } + + if (!keep) { + files_to_delete.push_back(std::move(filename)); + if (type == kTableFile) { + table_cache_->Evict(number); + } + Log(options_.info_log, "Delete type=%d #%lld\n", static_cast(type), + static_cast(number)); + } + } + } + + // While deleting all files unblock other threads. All files being deleted + // have unique names which will not collide with newly created files and + // are therefore safe to delete while allowing other threads to proceed. + mutex_.Unlock(); + for (const std::string& filename : files_to_delete) { + env_->RemoveFile(dbname_ + "/" + filename); + } + mutex_.Lock(); +} + +Status DBImpl::Recover(VersionEdit* edit, bool* save_manifest) { + mutex_.AssertHeld(); + + // Ignore error from CreateDir since the creation of the DB is + // committed only when the descriptor is created, and this directory + // may already exist from a previous failed creation attempt. + env_->CreateDir(dbname_); + assert(db_lock_ == nullptr); + Status s = env_->LockFile(LockFileName(dbname_), &db_lock_); + if (!s.ok()) { + return s; + } + + if (!env_->FileExists(CurrentFileName(dbname_))) { + if (options_.create_if_missing) { + Log(options_.info_log, "Creating DB %s since it was missing.", + dbname_.c_str()); + s = NewDB(); + if (!s.ok()) { + return s; + } + } else { + return Status::InvalidArgument( + dbname_, "does not exist (create_if_missing is false)"); + } + } else { + if (options_.error_if_exists) { + return Status::InvalidArgument(dbname_, + "exists (error_if_exists is true)"); + } + } + + s = versions_->Recover(save_manifest); + if (!s.ok()) { + return s; + } + SequenceNumber max_sequence(0); + + // Recover from all newer log files than the ones named in the + // descriptor (new log files may have been added by the previous + // incarnation without registering them in the descriptor). + // + // Note that PrevLogNumber() is no longer used, but we pay + // attention to it in case we are recovering a database + // produced by an older version of leveldb. + const uint64_t min_log = versions_->LogNumber(); + const uint64_t prev_log = versions_->PrevLogNumber(); + std::vector filenames; + s = env_->GetChildren(dbname_, &filenames); + if (!s.ok()) { + return s; + } + std::set expected; + versions_->AddLiveFiles(&expected); + uint64_t number; + FileType type; + std::vector logs; + for (size_t i = 0; i < filenames.size(); i++) { + if (ParseFileName(filenames[i], &number, &type)) { + expected.erase(number); + if (type == kLogFile && ((number >= min_log) || (number == prev_log))) + logs.push_back(number); + } + } + if (!expected.empty()) { + char buf[50]; + std::snprintf(buf, sizeof(buf), "%d missing files; e.g.", + static_cast(expected.size())); + return Status::Corruption(buf, TableFileName(dbname_, *(expected.begin()))); + } + + // Recover in the order in which the logs were generated + std::sort(logs.begin(), logs.end()); + for (size_t i = 0; i < logs.size(); i++) { + s = RecoverLogFile(logs[i], (i == logs.size() - 1), save_manifest, edit, + &max_sequence); + if (!s.ok()) { + return s; + } + + // The previous incarnation may not have written any MANIFEST + // records after allocating this log number. So we manually + // update the file number allocation counter in VersionSet. + versions_->MarkFileNumberUsed(logs[i]); + } + + if (versions_->LastSequence() < max_sequence) { + versions_->SetLastSequence(max_sequence); + } + + return Status::OK(); +} + +Status DBImpl::RecoverLogFile(uint64_t log_number, bool last_log, + bool* save_manifest, VersionEdit* edit, + SequenceNumber* max_sequence) { + struct LogReporter : public log::Reader::Reporter { + Env* env; + Logger* info_log; + const char* fname; + Status* status; // null if options_.paranoid_checks==false + void Corruption(size_t bytes, const Status& s) override { + Log(info_log, "%s%s: dropping %d bytes; %s", + (this->status == nullptr ? "(ignoring error) " : ""), fname, + static_cast(bytes), s.ToString().c_str()); + if (this->status != nullptr && this->status->ok()) *this->status = s; + } + }; + + mutex_.AssertHeld(); + + // Open the log file + std::string fname = LogFileName(dbname_, log_number); + SequentialFile* file; + Status status = env_->NewSequentialFile(fname, &file); + if (!status.ok()) { + MaybeIgnoreError(&status); + return status; + } + + // Create the log reader. + LogReporter reporter; + reporter.env = env_; + reporter.info_log = options_.info_log; + reporter.fname = fname.c_str(); + reporter.status = (options_.paranoid_checks ? &status : nullptr); + // We intentionally make log::Reader do checksumming even if + // paranoid_checks==false so that corruptions cause entire commits + // to be skipped instead of propagating bad information (like overly + // large sequence numbers). + log::Reader reader(file, &reporter, true /*checksum*/, 0 /*initial_offset*/); + Log(options_.info_log, "Recovering log #%llu", + (unsigned long long)log_number); + + // Read all the records and add to a memtable + std::string scratch; + Slice record; + WriteBatch batch; + int compactions = 0; + MemTable* mem = nullptr; + while (reader.ReadRecord(&record, &scratch) && status.ok()) { + if (record.size() < 12) { + reporter.Corruption(record.size(), + Status::Corruption("log record too small")); + continue; + } + WriteBatchInternal::SetContents(&batch, record); + + if (mem == nullptr) { + mem = new MemTable(internal_comparator_); + mem->Ref(); + } + status = WriteBatchInternal::InsertInto(&batch, mem); + MaybeIgnoreError(&status); + if (!status.ok()) { + break; + } + const SequenceNumber last_seq = WriteBatchInternal::Sequence(&batch) + + WriteBatchInternal::Count(&batch) - 1; + if (last_seq > *max_sequence) { + *max_sequence = last_seq; + } + + if (mem->ApproximateMemoryUsage() > options_.write_buffer_size) { + compactions++; + *save_manifest = true; + status = WriteLevel0Table(mem, edit, nullptr); + mem->Unref(); + mem = nullptr; + if (!status.ok()) { + // Reflect errors immediately so that conditions like full + // file-systems cause the DB::Open() to fail. + break; + } + } + } + + delete file; + + // See if we should keep reusing the last log file. + if (status.ok() && options_.reuse_logs && last_log && compactions == 0) { + assert(logfile_ == nullptr); + assert(log_ == nullptr); + assert(mem_ == nullptr); + uint64_t lfile_size; + if (env_->GetFileSize(fname, &lfile_size).ok() && + env_->NewAppendableFile(fname, &logfile_).ok()) { + Log(options_.info_log, "Reusing old log %s \n", fname.c_str()); + log_ = new log::Writer(logfile_, lfile_size); + logfile_number_ = log_number; + if (mem != nullptr) { + mem_ = mem; + mem = nullptr; + } else { + // mem can be nullptr if lognum exists but was empty. + mem_ = new MemTable(internal_comparator_); + mem_->Ref(); + } + } + } + + if (mem != nullptr) { + // mem did not get reused; compact it. + if (status.ok()) { + *save_manifest = true; + status = WriteLevel0Table(mem, edit, nullptr); + } + mem->Unref(); + } + + return status; +} + +Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit, + Version* base) { + mutex_.AssertHeld(); + const uint64_t start_micros = env_->NowMicros(); + FileMetaData meta; + meta.number = versions_->NewFileNumber(); + pending_outputs_.insert(meta.number); + Iterator* iter = mem->NewIterator(); + Log(options_.info_log, "Level-0 table #%llu: started", + (unsigned long long)meta.number); + + Status s; + { + mutex_.Unlock(); + s = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta); + mutex_.Lock(); + } + + Log(options_.info_log, "Level-0 table #%llu: %lld bytes %s", + (unsigned long long)meta.number, (unsigned long long)meta.file_size, + s.ToString().c_str()); + delete iter; + pending_outputs_.erase(meta.number); + + // Note that if file_size is zero, the file has been deleted and + // should not be added to the manifest. + int level = 0; + if (s.ok() && meta.file_size > 0) { + const Slice min_user_key = meta.smallest.user_key(); + const Slice max_user_key = meta.largest.user_key(); + if (base != nullptr) { + level = base->PickLevelForMemTableOutput(min_user_key, max_user_key); + } + edit->AddFile(level, meta.number, meta.file_size, meta.smallest, + meta.largest); + } + + CompactionStats stats; + stats.micros = env_->NowMicros() - start_micros; + stats.bytes_written = meta.file_size; + stats_[level].Add(stats); + return s; +} + +void DBImpl::CompactMemTable() { + mutex_.AssertHeld(); + assert(imm_ != nullptr); + + // Save the contents of the memtable as a new Table + VersionEdit edit; + Version* base = versions_->current(); + base->Ref(); + Status s = WriteLevel0Table(imm_, &edit, base); + base->Unref(); + + if (s.ok() && shutting_down_.load(std::memory_order_acquire)) { + s = Status::IOError("Deleting DB during memtable compaction"); + } + + // Replace immutable memtable with the generated Table + if (s.ok()) { + edit.SetPrevLogNumber(0); + edit.SetLogNumber(logfile_number_); // Earlier logs no longer needed + s = versions_->LogAndApply(&edit, &mutex_); + } + + if (s.ok()) { + // Commit to the new state + imm_->Unref(); + imm_ = nullptr; + has_imm_.store(false, std::memory_order_release); + RemoveObsoleteFiles(); + } else { + RecordBackgroundError(s); + } +} + +void DBImpl::CompactRange(const Slice* begin, const Slice* end) { + int max_level_with_files = 1; + { + MutexLock l(&mutex_); + Version* base = versions_->current(); + for (int level = 1; level < config::kNumLevels; level++) { + if (base->OverlapInLevel(level, begin, end)) { + max_level_with_files = level; + } + } + } + TEST_CompactMemTable(); // TODO(sanjay): Skip if memtable does not overlap + for (int level = 0; level < max_level_with_files; level++) { + TEST_CompactRange(level, begin, end); + } +} + +void DBImpl::TEST_CompactRange(int level, const Slice* begin, + const Slice* end) { + assert(level >= 0); + assert(level + 1 < config::kNumLevels); + + InternalKey begin_storage, end_storage; + + ManualCompaction manual; + manual.level = level; + manual.done = false; + if (begin == nullptr) { + manual.begin = nullptr; + } else { + begin_storage = InternalKey(*begin, kMaxSequenceNumber, kValueTypeForSeek); + manual.begin = &begin_storage; + } + if (end == nullptr) { + manual.end = nullptr; + } else { + end_storage = InternalKey(*end, 0, static_cast(0)); + manual.end = &end_storage; + } + + MutexLock l(&mutex_); + while (!manual.done && !shutting_down_.load(std::memory_order_acquire) && + bg_error_.ok()) { + if (manual_compaction_ == nullptr) { // Idle + manual_compaction_ = &manual; + MaybeScheduleCompaction(); + } else { // Running either my compaction or another compaction. + background_work_finished_signal_.Wait(); + } + } + // Finish current background compaction in the case where + // `background_work_finished_signal_` was signalled due to an error. + while (background_compaction_scheduled_) { + background_work_finished_signal_.Wait(); + } + if (manual_compaction_ == &manual) { + // Cancel my manual compaction since we aborted early for some reason. + manual_compaction_ = nullptr; + } +} + +Status DBImpl::TEST_CompactMemTable() { + // nullptr batch means just wait for earlier writes to be done + Status s = Write(WriteOptions(), nullptr); + if (s.ok()) { + // Wait until the compaction completes + MutexLock l(&mutex_); + while (imm_ != nullptr && bg_error_.ok()) { + background_work_finished_signal_.Wait(); + } + if (imm_ != nullptr) { + s = bg_error_; + } + } + return s; +} + +void DBImpl::RecordBackgroundError(const Status& s) { + mutex_.AssertHeld(); + if (bg_error_.ok()) { + bg_error_ = s; + background_work_finished_signal_.SignalAll(); + } +} + +void DBImpl::MaybeScheduleCompaction() { + mutex_.AssertHeld(); + if (background_compaction_scheduled_) { + // Already scheduled + } else if (shutting_down_.load(std::memory_order_acquire)) { + // DB is being deleted; no more background compactions + } else if (!bg_error_.ok()) { + // Already got an error; no more changes + } else if (imm_ == nullptr && manual_compaction_ == nullptr && + !versions_->NeedsCompaction()) { + // No work to be done + } else { + background_compaction_scheduled_ = true; + env_->Schedule(&DBImpl::BGWork, this); + } +} + +void DBImpl::BGWork(void* db) { + reinterpret_cast(db)->BackgroundCall(); +} + +void DBImpl::BackgroundCall() { + MutexLock l(&mutex_); + assert(background_compaction_scheduled_); + if (shutting_down_.load(std::memory_order_acquire)) { + // No more background work when shutting down. + } else if (!bg_error_.ok()) { + // No more background work after a background error. + } else { + BackgroundCompaction(); + } + + background_compaction_scheduled_ = false; + + // Previous compaction may have produced too many files in a level, + // so reschedule another compaction if needed. + MaybeScheduleCompaction(); + background_work_finished_signal_.SignalAll(); +} + +void DBImpl::BackgroundCompaction() { + mutex_.AssertHeld(); + + if (imm_ != nullptr) { + CompactMemTable(); + return; + } + + Compaction* c; + bool is_manual = (manual_compaction_ != nullptr); + InternalKey manual_end; + if (is_manual) { + ManualCompaction* m = manual_compaction_; + c = versions_->CompactRange(m->level, m->begin, m->end); + m->done = (c == nullptr); + if (c != nullptr) { + manual_end = c->input(0, c->num_input_files(0) - 1)->largest; + } + Log(options_.info_log, + "Manual compaction at level-%d from %s .. %s; will stop at %s\n", + m->level, (m->begin ? m->begin->DebugString().c_str() : "(begin)"), + (m->end ? m->end->DebugString().c_str() : "(end)"), + (m->done ? "(end)" : manual_end.DebugString().c_str())); + } else { + c = versions_->PickCompaction(); + } + + Status status; + if (c == nullptr) { + // Nothing to do + } else if (!is_manual && c->IsTrivialMove()) { + // Move file to next level + assert(c->num_input_files(0) == 1); + FileMetaData* f = c->input(0, 0); + c->edit()->RemoveFile(c->level(), f->number); + c->edit()->AddFile(c->level() + 1, f->number, f->file_size, f->smallest, + f->largest); + status = versions_->LogAndApply(c->edit(), &mutex_); + if (!status.ok()) { + RecordBackgroundError(status); + } + VersionSet::LevelSummaryStorage tmp; + Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n", + static_cast(f->number), c->level() + 1, + static_cast(f->file_size), + status.ToString().c_str(), versions_->LevelSummary(&tmp)); + } else { + CompactionState* compact = new CompactionState(c); + status = DoCompactionWork(compact); + if (!status.ok()) { + RecordBackgroundError(status); + } + CleanupCompaction(compact); + c->ReleaseInputs(); + RemoveObsoleteFiles(); + } + delete c; + + if (status.ok()) { + // Done + } else if (shutting_down_.load(std::memory_order_acquire)) { + // Ignore compaction errors found during shutting down + } else { + Log(options_.info_log, "Compaction error: %s", status.ToString().c_str()); + } + + if (is_manual) { + ManualCompaction* m = manual_compaction_; + if (!status.ok()) { + m->done = true; + } + if (!m->done) { + // We only compacted part of the requested range. Update *m + // to the range that is left to be compacted. + m->tmp_storage = manual_end; + m->begin = &m->tmp_storage; + } + manual_compaction_ = nullptr; + } +} + +void DBImpl::CleanupCompaction(CompactionState* compact) { + mutex_.AssertHeld(); + if (compact->builder != nullptr) { + // May happen if we get a shutdown call in the middle of compaction + compact->builder->Abandon(); + delete compact->builder; + } else { + assert(compact->outfile == nullptr); + } + delete compact->outfile; + for (size_t i = 0; i < compact->outputs.size(); i++) { + const CompactionState::Output& out = compact->outputs[i]; + pending_outputs_.erase(out.number); + } + delete compact; +} + +Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) { + assert(compact != nullptr); + assert(compact->builder == nullptr); + uint64_t file_number; + { + mutex_.Lock(); + file_number = versions_->NewFileNumber(); + pending_outputs_.insert(file_number); + CompactionState::Output out; + out.number = file_number; + out.smallest.Clear(); + out.largest.Clear(); + compact->outputs.push_back(out); + mutex_.Unlock(); + } + + // Make the output file + std::string fname = TableFileName(dbname_, file_number); + Status s = env_->NewWritableFile(fname, &compact->outfile); + if (s.ok()) { + compact->builder = new TableBuilder(options_, compact->outfile); + } + return s; +} + +Status DBImpl::FinishCompactionOutputFile(CompactionState* compact, + Iterator* input) { + assert(compact != nullptr); + assert(compact->outfile != nullptr); + assert(compact->builder != nullptr); + + const uint64_t output_number = compact->current_output()->number; + assert(output_number != 0); + + // Check for iterator errors + Status s = input->status(); + const uint64_t current_entries = compact->builder->NumEntries(); + if (s.ok()) { + s = compact->builder->Finish(); + } else { + compact->builder->Abandon(); + } + const uint64_t current_bytes = compact->builder->FileSize(); + compact->current_output()->file_size = current_bytes; + compact->total_bytes += current_bytes; + delete compact->builder; + compact->builder = nullptr; + + // Finish and check for file errors + if (s.ok()) { + s = compact->outfile->Sync(); + } + if (s.ok()) { + s = compact->outfile->Close(); + } + delete compact->outfile; + compact->outfile = nullptr; + + if (s.ok() && current_entries > 0) { + // Verify that the table is usable + Iterator* iter = + table_cache_->NewIterator(ReadOptions(), output_number, current_bytes); + s = iter->status(); + delete iter; + if (s.ok()) { + Log(options_.info_log, "Generated table #%llu@%d: %lld keys, %lld bytes", + (unsigned long long)output_number, compact->compaction->level(), + (unsigned long long)current_entries, + (unsigned long long)current_bytes); + } + } + return s; +} + +Status DBImpl::InstallCompactionResults(CompactionState* compact) { + mutex_.AssertHeld(); + Log(options_.info_log, "Compacted %d@%d + %d@%d files => %lld bytes", + compact->compaction->num_input_files(0), compact->compaction->level(), + compact->compaction->num_input_files(1), compact->compaction->level() + 1, + static_cast(compact->total_bytes)); + + // Add compaction outputs + compact->compaction->AddInputDeletions(compact->compaction->edit()); + const int level = compact->compaction->level(); + for (size_t i = 0; i < compact->outputs.size(); i++) { + const CompactionState::Output& out = compact->outputs[i]; + compact->compaction->edit()->AddFile(level + 1, out.number, out.file_size, + out.smallest, out.largest); + } + return versions_->LogAndApply(compact->compaction->edit(), &mutex_); +} + +Status DBImpl::DoCompactionWork(CompactionState* compact) { + const uint64_t start_micros = env_->NowMicros(); + int64_t imm_micros = 0; // Micros spent doing imm_ compactions + + Log(options_.info_log, "Compacting %d@%d + %d@%d files", + compact->compaction->num_input_files(0), compact->compaction->level(), + compact->compaction->num_input_files(1), + compact->compaction->level() + 1); + + assert(versions_->NumLevelFiles(compact->compaction->level()) > 0); + assert(compact->builder == nullptr); + assert(compact->outfile == nullptr); + if (snapshots_.empty()) { + compact->smallest_snapshot = versions_->LastSequence(); + } else { + compact->smallest_snapshot = snapshots_.oldest()->sequence_number(); + } + + Iterator* input = versions_->MakeInputIterator(compact->compaction); + + // Release mutex while we're actually doing the compaction work + mutex_.Unlock(); + + input->SeekToFirst(); + Status status; + ParsedInternalKey ikey; + std::string current_user_key; + bool has_current_user_key = false; + SequenceNumber last_sequence_for_key = kMaxSequenceNumber; + while (input->Valid() && !shutting_down_.load(std::memory_order_acquire)) { + // Prioritize immutable compaction work + if (has_imm_.load(std::memory_order_relaxed)) { + const uint64_t imm_start = env_->NowMicros(); + mutex_.Lock(); + if (imm_ != nullptr) { + CompactMemTable(); + // Wake up MakeRoomForWrite() if necessary. + background_work_finished_signal_.SignalAll(); + } + mutex_.Unlock(); + imm_micros += (env_->NowMicros() - imm_start); + } + + Slice key = input->key(); + if (compact->compaction->ShouldStopBefore(key) && + compact->builder != nullptr) { + status = FinishCompactionOutputFile(compact, input); + if (!status.ok()) { + break; + } + } + + // Handle key/value, add to state, etc. + bool drop = false; + if (!ParseInternalKey(key, &ikey)) { + // Do not hide error keys + current_user_key.clear(); + has_current_user_key = false; + last_sequence_for_key = kMaxSequenceNumber; + } else { + if (!has_current_user_key || + user_comparator()->Compare(ikey.user_key, Slice(current_user_key)) != + 0) { + // First occurrence of this user key + current_user_key.assign(ikey.user_key.data(), ikey.user_key.size()); + has_current_user_key = true; + last_sequence_for_key = kMaxSequenceNumber; + } + + if (last_sequence_for_key <= compact->smallest_snapshot) { + // Hidden by an newer entry for same user key + drop = true; // (A) + } else if (ikey.type == kTypeDeletion && + ikey.sequence <= compact->smallest_snapshot && + compact->compaction->IsBaseLevelForKey(ikey.user_key)) { + // For this user key: + // (1) there is no data in higher levels + // (2) data in lower levels will have larger sequence numbers + // (3) data in layers that are being compacted here and have + // smaller sequence numbers will be dropped in the next + // few iterations of this loop (by rule (A) above). + // Therefore this deletion marker is obsolete and can be dropped. + drop = true; + } + + last_sequence_for_key = ikey.sequence; + } +#if 0 + Log(options_.info_log, + " Compact: %s, seq %d, type: %d %d, drop: %d, is_base: %d, " + "%d smallest_snapshot: %d", + ikey.user_key.ToString().c_str(), + (int)ikey.sequence, ikey.type, kTypeValue, drop, + compact->compaction->IsBaseLevelForKey(ikey.user_key), + (int)last_sequence_for_key, (int)compact->smallest_snapshot); +#endif + + if (!drop) { + // Open output file if necessary + if (compact->builder == nullptr) { + status = OpenCompactionOutputFile(compact); + if (!status.ok()) { + break; + } + } + if (compact->builder->NumEntries() == 0) { + compact->current_output()->smallest.DecodeFrom(key); + } + compact->current_output()->largest.DecodeFrom(key); + compact->builder->Add(key, input->value()); + + // Close output file if it is big enough + if (compact->builder->FileSize() >= + compact->compaction->MaxOutputFileSize()) { + status = FinishCompactionOutputFile(compact, input); + if (!status.ok()) { + break; + } + } + } + + input->Next(); + } + + if (status.ok() && shutting_down_.load(std::memory_order_acquire)) { + status = Status::IOError("Deleting DB during compaction"); + } + if (status.ok() && compact->builder != nullptr) { + status = FinishCompactionOutputFile(compact, input); + } + if (status.ok()) { + status = input->status(); + } + delete input; + input = nullptr; + + CompactionStats stats; + stats.micros = env_->NowMicros() - start_micros - imm_micros; + for (int which = 0; which < 2; which++) { + for (int i = 0; i < compact->compaction->num_input_files(which); i++) { + stats.bytes_read += compact->compaction->input(which, i)->file_size; + } + } + for (size_t i = 0; i < compact->outputs.size(); i++) { + stats.bytes_written += compact->outputs[i].file_size; + } + + mutex_.Lock(); + stats_[compact->compaction->level() + 1].Add(stats); + + if (status.ok()) { + status = InstallCompactionResults(compact); + } + if (!status.ok()) { + RecordBackgroundError(status); + } + VersionSet::LevelSummaryStorage tmp; + Log(options_.info_log, "compacted to: %s", versions_->LevelSummary(&tmp)); + return status; +} + +namespace { + +struct IterState { + port::Mutex* const mu; + Version* const version GUARDED_BY(mu); + MemTable* const mem GUARDED_BY(mu); + MemTable* const imm GUARDED_BY(mu); + + IterState(port::Mutex* mutex, MemTable* mem, MemTable* imm, Version* version) + : mu(mutex), version(version), mem(mem), imm(imm) {} +}; + +static void CleanupIteratorState(void* arg1, void* arg2) { + IterState* state = reinterpret_cast(arg1); + state->mu->Lock(); + state->mem->Unref(); + if (state->imm != nullptr) state->imm->Unref(); + state->version->Unref(); + state->mu->Unlock(); + delete state; +} + +} // anonymous namespace + +Iterator* DBImpl::NewInternalIterator(const ReadOptions& options, + SequenceNumber* latest_snapshot, + uint32_t* seed) { + mutex_.Lock(); + *latest_snapshot = versions_->LastSequence(); + + // Collect together all needed child iterators + std::vector list; + list.push_back(mem_->NewIterator()); + mem_->Ref(); + if (imm_ != nullptr) { + list.push_back(imm_->NewIterator()); + imm_->Ref(); + } + versions_->current()->AddIterators(options, &list); + Iterator* internal_iter = + NewMergingIterator(&internal_comparator_, &list[0], list.size()); + versions_->current()->Ref(); + + IterState* cleanup = new IterState(&mutex_, mem_, imm_, versions_->current()); + internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, nullptr); + + *seed = ++seed_; + mutex_.Unlock(); + return internal_iter; +} + +Iterator* DBImpl::TEST_NewInternalIterator() { + SequenceNumber ignored; + uint32_t ignored_seed; + return NewInternalIterator(ReadOptions(), &ignored, &ignored_seed); +} + +int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() { + MutexLock l(&mutex_); + return versions_->MaxNextLevelOverlappingBytes(); +} + +Status DBImpl::Get(const ReadOptions& options, const Slice& key, + std::string* value) { + Status s; + MutexLock l(&mutex_); + SequenceNumber snapshot; + if (options.snapshot != nullptr) { + snapshot = + static_cast(options.snapshot)->sequence_number(); + } else { + snapshot = versions_->LastSequence(); + } + + MemTable* mem = mem_; + MemTable* imm = imm_; + Version* current = versions_->current(); + mem->Ref(); + if (imm != nullptr) imm->Ref(); + current->Ref(); + + bool have_stat_update = false; + Version::GetStats stats; + + // Unlock while reading from files and memtables + { + mutex_.Unlock(); + // First look in the memtable, then in the immutable memtable (if any). + LookupKey lkey(key, snapshot); + if (mem->Get(lkey, value, &s)) { + // Done + } else if (imm != nullptr && imm->Get(lkey, value, &s)) { + // Done + } else { + s = current->Get(options, lkey, value, &stats); + have_stat_update = true; + } + mutex_.Lock(); + } + + if (have_stat_update && current->UpdateStats(stats)) { + MaybeScheduleCompaction(); + } + mem->Unref(); + if (imm != nullptr) imm->Unref(); + current->Unref(); + return s; +} + +Iterator* DBImpl::NewIterator(const ReadOptions& options) { + SequenceNumber latest_snapshot; + uint32_t seed; + Iterator* iter = NewInternalIterator(options, &latest_snapshot, &seed); + return NewDBIterator(this, user_comparator(), iter, + (options.snapshot != nullptr + ? static_cast(options.snapshot) + ->sequence_number() + : latest_snapshot), + seed); +} + +void DBImpl::RecordReadSample(Slice key) { + MutexLock l(&mutex_); + if (versions_->current()->RecordReadSample(key)) { + MaybeScheduleCompaction(); + } +} + +const Snapshot* DBImpl::GetSnapshot() { + MutexLock l(&mutex_); + return snapshots_.New(versions_->LastSequence()); +} + +void DBImpl::ReleaseSnapshot(const Snapshot* snapshot) { + MutexLock l(&mutex_); + snapshots_.Delete(static_cast(snapshot)); +} + + +// Convenience methods +// Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) { +// return DB::Put(o, key, val); +// } + +Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) { + if (key_value_separated_) { + // 分离key和value的逻辑,朴,12.07 + //... + } else { + // 不分离key和value的逻辑 + return DB::Put(o, key, val); + } +} + + +Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val, uint64_t ttl) { + return DB::Put(o, key, val, ttl); +} // 实现新的put接口,心 + +Status DBImpl::Delete(const WriteOptions& options, const Slice& key) { + return DB::Delete(options, key); +} + +Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) { + Writer w(&mutex_); + w.batch = updates; + w.sync = options.sync; + w.done = false; + + MutexLock l(&mutex_); + writers_.push_back(&w); + while (!w.done && &w != writers_.front()) { + w.cv.Wait(); + } + if (w.done) { + return w.status; + } + + // May temporarily unlock and wait. + Status status = MakeRoomForWrite(updates == nullptr); + uint64_t last_sequence = versions_->LastSequence(); + Writer* last_writer = &w; + if (status.ok() && updates != nullptr) { // nullptr batch is for compactions + WriteBatch* write_batch = BuildBatchGroup(&last_writer); + WriteBatchInternal::SetSequence(write_batch, last_sequence + 1); + last_sequence += WriteBatchInternal::Count(write_batch); + + // Add to log and apply to memtable. We can release the lock + // during this phase since &w is currently responsible for logging + // and protects against concurrent loggers and concurrent writes + // into mem_. + { + mutex_.Unlock(); + status = log_->AddRecord(WriteBatchInternal::Contents(write_batch)); + bool sync_error = false; + if (status.ok() && options.sync) { + status = logfile_->Sync(); + if (!status.ok()) { + sync_error = true; + } + } + if (status.ok()) { + status = WriteBatchInternal::InsertInto(write_batch, mem_); + } + mutex_.Lock(); + if (sync_error) { + // The state of the log file is indeterminate: the log record we + // just added may or may not show up when the DB is re-opened. + // So we force the DB into a mode where all future writes fail. + RecordBackgroundError(status); + } + } + if (write_batch == tmp_batch_) tmp_batch_->Clear(); + + versions_->SetLastSequence(last_sequence); + } + + while (true) { + Writer* ready = writers_.front(); + writers_.pop_front(); + if (ready != &w) { + ready->status = status; + ready->done = true; + ready->cv.Signal(); + } + if (ready == last_writer) break; + } + + // Notify new head of write queue + if (!writers_.empty()) { + writers_.front()->cv.Signal(); + } + + return status; +} + +// REQUIRES: Writer list must be non-empty +// REQUIRES: First writer must have a non-null batch +WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) { + mutex_.AssertHeld(); + assert(!writers_.empty()); + Writer* first = writers_.front(); + WriteBatch* result = first->batch; + assert(result != nullptr); + + size_t size = WriteBatchInternal::ByteSize(first->batch); + + // Allow the group to grow up to a maximum size, but if the + // original write is small, limit the growth so we do not slow + // down the small write too much. + size_t max_size = 1 << 20; + if (size <= (128 << 10)) { + max_size = size + (128 << 10); + } + + *last_writer = first; + std::deque::iterator iter = writers_.begin(); + ++iter; // Advance past "first" + for (; iter != writers_.end(); ++iter) { + Writer* w = *iter; + if (w->sync && !first->sync) { + // Do not include a sync write into a batch handled by a non-sync write. + break; + } + + if (w->batch != nullptr) { + size += WriteBatchInternal::ByteSize(w->batch); + if (size > max_size) { + // Do not make batch too big + break; + } + + // Append to *result + if (result == first->batch) { + // Switch to temporary batch instead of disturbing caller's batch + result = tmp_batch_; + assert(WriteBatchInternal::Count(result) == 0); + WriteBatchInternal::Append(result, first->batch); + } + WriteBatchInternal::Append(result, w->batch); + } + *last_writer = w; + } + return result; +} + +// REQUIRES: mutex_ is held +// REQUIRES: this thread is currently at the front of the writer queue +Status DBImpl::MakeRoomForWrite(bool force) { + mutex_.AssertHeld(); + assert(!writers_.empty()); + bool allow_delay = !force; + Status s; + while (true) { + if (!bg_error_.ok()) { + // Yield previous error + s = bg_error_; + break; + } else if (allow_delay && versions_->NumLevelFiles(0) >= + config::kL0_SlowdownWritesTrigger) { + // We are getting close to hitting a hard limit on the number of + // L0 files. Rather than delaying a single write by several + // seconds when we hit the hard limit, start delaying each + // individual write by 1ms to reduce latency variance. Also, + // this delay hands over some CPU to the compaction thread in + // case it is sharing the same core as the writer. + mutex_.Unlock(); + env_->SleepForMicroseconds(1000); + allow_delay = false; // Do not delay a single write more than once + mutex_.Lock(); + } else if (!force && + (mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) { + // There is room in current memtable + break; + } else if (imm_ != nullptr) { + // We have filled up the current memtable, but the previous + // one is still being compacted, so we wait. + Log(options_.info_log, "Current memtable full; waiting...\n"); + background_work_finished_signal_.Wait(); + } else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) { + // There are too many level-0 files. + Log(options_.info_log, "Too many L0 files; waiting...\n"); + background_work_finished_signal_.Wait(); + } else { + // Attempt to switch to a new memtable and trigger compaction of old + assert(versions_->PrevLogNumber() == 0); + uint64_t new_log_number = versions_->NewFileNumber(); + WritableFile* lfile = nullptr; + s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile); + if (!s.ok()) { + // Avoid chewing through file number space in a tight loop. + versions_->ReuseFileNumber(new_log_number); + break; + } + + delete log_; + + s = logfile_->Close(); + if (!s.ok()) { + // We may have lost some data written to the previous log file. + // Switch to the new log file anyway, but record as a background + // error so we do not attempt any more writes. + // + // We could perhaps attempt to save the memtable corresponding + // to log file and suppress the error if that works, but that + // would add more complexity in a critical code path. + RecordBackgroundError(s); + } + delete logfile_; + + logfile_ = lfile; + logfile_number_ = new_log_number; + log_ = new log::Writer(lfile); + imm_ = mem_; + has_imm_.store(true, std::memory_order_release); + mem_ = new MemTable(internal_comparator_); + mem_->Ref(); + force = false; // Do not force another compaction if have room + MaybeScheduleCompaction(); + } + } + return s; +} + +bool DBImpl::GetProperty(const Slice& property, std::string* value) { + value->clear(); + + MutexLock l(&mutex_); + Slice in = property; + Slice prefix("leveldb."); + if (!in.starts_with(prefix)) return false; + in.remove_prefix(prefix.size()); + + if (in.starts_with("num-files-at-level")) { + in.remove_prefix(strlen("num-files-at-level")); + uint64_t level; + bool ok = ConsumeDecimalNumber(&in, &level) && in.empty(); + if (!ok || level >= config::kNumLevels) { + return false; + } else { + char buf[100]; + std::snprintf(buf, sizeof(buf), "%d", + versions_->NumLevelFiles(static_cast(level))); + *value = buf; + return true; + } + } else if (in == "stats") { + char buf[200]; + std::snprintf(buf, sizeof(buf), + " Compactions\n" + "Level Files Size(MB) Time(sec) Read(MB) Write(MB)\n" + "--------------------------------------------------\n"); + value->append(buf); + for (int level = 0; level < config::kNumLevels; level++) { + int files = versions_->NumLevelFiles(level); + if (stats_[level].micros > 0 || files > 0) { + std::snprintf(buf, sizeof(buf), "%3d %8d %8.0f %9.0f %8.0f %9.0f\n", + level, files, versions_->NumLevelBytes(level) / 1048576.0, + stats_[level].micros / 1e6, + stats_[level].bytes_read / 1048576.0, + stats_[level].bytes_written / 1048576.0); + value->append(buf); + } + } + return true; + } else if (in == "sstables") { + *value = versions_->current()->DebugString(); + return true; + } else if (in == "approximate-memory-usage") { + size_t total_usage = options_.block_cache->TotalCharge(); + if (mem_) { + total_usage += mem_->ApproximateMemoryUsage(); + } + if (imm_) { + total_usage += imm_->ApproximateMemoryUsage(); + } + char buf[50]; + std::snprintf(buf, sizeof(buf), "%llu", + static_cast(total_usage)); + value->append(buf); + return true; + } + + return false; +} + +void DBImpl::GetApproximateSizes(const Range* range, int n, uint64_t* sizes) { + // TODO(opt): better implementation + MutexLock l(&mutex_); + Version* v = versions_->current(); + v->Ref(); + + for (int i = 0; i < n; i++) { + // Convert user_key into a corresponding internal key. + InternalKey k1(range[i].start, kMaxSequenceNumber, kValueTypeForSeek); + InternalKey k2(range[i].limit, kMaxSequenceNumber, kValueTypeForSeek); + uint64_t start = versions_->ApproximateOffsetOf(v, k1); + uint64_t limit = versions_->ApproximateOffsetOf(v, k2); + sizes[i] = (limit >= start ? limit - start : 0); + } + + v->Unref(); +} + +// Default implementations of convenience methods that subclasses of DB +// can call if they wish +Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) { //朴 + WriteBatch batch; + batch.Put(key, value); + return Write(opt, &batch); +} + +// 假设增加一个新的Put接口,包含TTL参数, 单位(秒) +Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value, uint64_t ttl){ + WriteBatch batch; + batch.Put(key, value, ttl); + return Write(opt, &batch); + } // 这里应该是新的PUT接口的真正实现的地方,还是由本来的DB类实现,怪?心 + + + + + +Status DB::Delete(const WriteOptions& opt, const Slice& key) { + WriteBatch batch; + batch.Delete(key); + return Write(opt, &batch); +} + + + + +DB::~DB() = default; + +int DB::NewBlobNum(){ + static int counter = blob_num; // 使用一个静态变量来存储计数器 + counter++; + return counter; +} //12.08 +Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) { + *dbptr = nullptr; + + DBImpl* impl = new DBImpl(options, dbname); + impl->mutex_.Lock(); + VersionEdit edit; + // Recover handles create_if_missing, error_if_exists + bool save_manifest = false; + Status s = impl->Recover(&edit, &save_manifest); + if (s.ok() && impl->mem_ == nullptr) { + // Create new log and a corresponding memtable. + uint64_t new_log_number = impl->versions_->NewFileNumber(); + WritableFile* lfile; + s = options.env->NewWritableFile(LogFileName(dbname, new_log_number), + &lfile); + if (s.ok()) { + edit.SetLogNumber(new_log_number); + impl->logfile_ = lfile; + impl->logfile_number_ = new_log_number; + impl->log_ = new log::Writer(lfile); + impl->mem_ = new MemTable(impl->internal_comparator_); + impl->mem_->Ref(); + } + } + if (s.ok() && save_manifest) { + edit.SetPrevLogNumber(0); // No older logs needed after recovery. + edit.SetLogNumber(impl->logfile_number_); + s = impl->versions_->LogAndApply(&edit, &impl->mutex_); + } + if (s.ok()) { + impl->RemoveObsoleteFiles(); + impl->MaybeScheduleCompaction(); + } + impl->mutex_.Unlock(); + if (s.ok()) { + assert(impl->mem_ != nullptr); + *dbptr = impl; + } else { + delete impl; + } + return s; +} + +Snapshot::~Snapshot() = default; + +Status DestroyDB(const std::string& dbname, const Options& options) { + Env* env = options.env; + std::vector filenames; + Status result = env->GetChildren(dbname, &filenames); + if (!result.ok()) { + // Ignore error in case directory does not exist + return Status::OK(); + } + + FileLock* lock; + const std::string lockname = LockFileName(dbname); + result = env->LockFile(lockname, &lock); + if (result.ok()) { + uint64_t number; + FileType type; + for (size_t i = 0; i < filenames.size(); i++) { + if (ParseFileName(filenames[i], &number, &type) && + type != kDBLockFile) { // Lock file will be deleted at end + Status del = env->RemoveFile(dbname + "/" + filenames[i]); + if (result.ok() && !del.ok()) { + result = del; + } + } + } + env->UnlockFile(lock); // Ignore error since state is already gone + env->RemoveFile(lockname); + env->RemoveDir(dbname); // Ignore error in case dir contains other files + } + return result; +} + +} // namespace leveldb diff --git a/table/db_impl.h b/table/db_impl.h new file mode 100644 index 0000000..98c2f8c --- /dev/null +++ b/table/db_impl.h @@ -0,0 +1,226 @@ +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#ifndef STORAGE_LEVELDB_DB_DB_IMPL_H_ +#define STORAGE_LEVELDB_DB_DB_IMPL_H_ + +#include +#include +#include +#include + +#include "db/dbformat.h" +#include "db/log_writer.h" +#include "db/snapshot.h" +#include "leveldb/db.h" +#include "leveldb/env.h" +#include "port/port.h" +#include "port/thread_annotations.h" + +namespace leveldb { + +class MemTable; +class TableCache; +class Version; +class VersionEdit; +class VersionSet; + +class DBImpl : public DB { + public: + DBImpl(const Options& options, const std::string& dbname); + + DBImpl(const DBImpl&) = delete; + DBImpl& operator=(const DBImpl&) = delete; + + ~DBImpl() override; + + // Implementations of the DB interface + Status Put(const WriteOptions&, const Slice& key, + const Slice& value) override; + Status Put(const WriteOptions&, const Slice& key, + const Slice& value, uint64_t ttl) override; //实现新的put接口,心 + Status Delete(const WriteOptions&, const Slice& key) override; + Status Write(const WriteOptions& options, WriteBatch* updates) override; + Status Get(const ReadOptions& options, const Slice& key, + std::string* value) override; + Iterator* NewIterator(const ReadOptions&) override; + const Snapshot* GetSnapshot() override; + void ReleaseSnapshot(const Snapshot* snapshot) override; + bool GetProperty(const Slice& property, std::string* value) override; + void GetApproximateSizes(const Range* range, int n, uint64_t* sizes) override; + void CompactRange(const Slice* begin, const Slice* end) override; + // 朴,添加是否kv分离接口,12.07 + bool static key_value_separated_; + + // Extra methods (for testing) that are not in the public DB interface + + + // Compact any files in the named level that overlap [*begin,*end] + void TEST_CompactRange(int level, const Slice* begin, const Slice* end); + + // Force current memtable contents to be compacted. + Status TEST_CompactMemTable(); + + // Return an internal iterator over the current state of the database. + // The keys of this iterator are internal keys (see format.h). + // The returned iterator should be deleted when no longer needed. + Iterator* TEST_NewInternalIterator(); + + // Return the maximum overlapping data (in bytes) at next level for any + // file at a level >= 1. + int64_t TEST_MaxNextLevelOverlappingBytes(); + + // Record a sample of bytes read at the specified internal key. + // Samples are taken approximately once every config::kReadBytesPeriod + // bytes. + void RecordReadSample(Slice key); + + private: + friend class DB; + struct CompactionState; + struct Writer; + + + + // Information for a manual compaction + struct ManualCompaction { + int level; + bool done; + const InternalKey* begin; // null means beginning of key range + const InternalKey* end; // null means end of key range + InternalKey tmp_storage; // Used to keep track of compaction progress + }; + + // Per level compaction stats. stats_[level] stores the stats for + // compactions that produced data for the specified "level". + struct CompactionStats { + CompactionStats() : micros(0), bytes_read(0), bytes_written(0) {} + + void Add(const CompactionStats& c) { + this->micros += c.micros; + this->bytes_read += c.bytes_read; + this->bytes_written += c.bytes_written; + } + + int64_t micros; + int64_t bytes_read; + int64_t bytes_written; + }; + + Iterator* NewInternalIterator(const ReadOptions&, + SequenceNumber* latest_snapshot, + uint32_t* seed); + + Status NewDB(); + + // Recover the descriptor from persistent storage. May do a significant + // amount of work to recover recently logged updates. Any changes to + // be made to the descriptor are added to *edit. + Status Recover(VersionEdit* edit, bool* save_manifest) + EXCLUSIVE_LOCKS_REQUIRED(mutex_); + + void MaybeIgnoreError(Status* s) const; + + // Delete any unneeded files and stale in-memory entries. + void RemoveObsoleteFiles() EXCLUSIVE_LOCKS_REQUIRED(mutex_); + + // Compact the in-memory write buffer to disk. Switches to a new + // log-file/memtable and writes a new descriptor iff successful. + // Errors are recorded in bg_error_. + void CompactMemTable() EXCLUSIVE_LOCKS_REQUIRED(mutex_); + + Status RecoverLogFile(uint64_t log_number, bool last_log, bool* save_manifest, + VersionEdit* edit, SequenceNumber* max_sequence) + EXCLUSIVE_LOCKS_REQUIRED(mutex_); + + Status WriteLevel0Table(MemTable* mem, VersionEdit* edit, Version* base) + EXCLUSIVE_LOCKS_REQUIRED(mutex_); + + Status MakeRoomForWrite(bool force /* compact even if there is room? */) + EXCLUSIVE_LOCKS_REQUIRED(mutex_); + WriteBatch* BuildBatchGroup(Writer** last_writer) + EXCLUSIVE_LOCKS_REQUIRED(mutex_); + + void RecordBackgroundError(const Status& s); + + void MaybeScheduleCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_); + static void BGWork(void* db); + void BackgroundCall(); + void BackgroundCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_); + void CleanupCompaction(CompactionState* compact) + EXCLUSIVE_LOCKS_REQUIRED(mutex_); + Status DoCompactionWork(CompactionState* compact) + EXCLUSIVE_LOCKS_REQUIRED(mutex_); + + Status OpenCompactionOutputFile(CompactionState* compact); + Status FinishCompactionOutputFile(CompactionState* compact, Iterator* input); + Status InstallCompactionResults(CompactionState* compact) + EXCLUSIVE_LOCKS_REQUIRED(mutex_); + + const Comparator* user_comparator() const { + return internal_comparator_.user_comparator(); + } + + // Constant after construction + Env* const env_; + const InternalKeyComparator internal_comparator_; + const InternalFilterPolicy internal_filter_policy_; + const Options options_; // options_.comparator == &internal_comparator_ + const bool owns_info_log_; + const bool owns_cache_; + const std::string dbname_; + + // table_cache_ provides its own synchronization + TableCache* const table_cache_; + + // Lock over the persistent DB state. Non-null iff successfully acquired. + FileLock* db_lock_; + + // State below is protected by mutex_ + port::Mutex mutex_; + std::atomic shutting_down_; + port::CondVar background_work_finished_signal_ GUARDED_BY(mutex_); + MemTable* mem_; + MemTable* imm_ GUARDED_BY(mutex_); // Memtable being compacted + std::atomic has_imm_; // So bg thread can detect non-null imm_ + WritableFile* logfile_; + uint64_t logfile_number_ GUARDED_BY(mutex_); + log::Writer* log_; + uint32_t seed_ GUARDED_BY(mutex_); // For sampling. + + // Queue of writers. + std::deque writers_ GUARDED_BY(mutex_); + WriteBatch* tmp_batch_ GUARDED_BY(mutex_); + + SnapshotList snapshots_ GUARDED_BY(mutex_); + + // Set of table files to protect from deletion because they are + // part of ongoing compactions. + std::set pending_outputs_ GUARDED_BY(mutex_); + + // Has a background compaction been scheduled or is running? + bool background_compaction_scheduled_ GUARDED_BY(mutex_); + + ManualCompaction* manual_compaction_ GUARDED_BY(mutex_); + + VersionSet* const versions_ GUARDED_BY(mutex_); + + // Have we encountered a background error in paranoid mode? + Status bg_error_ GUARDED_BY(mutex_); + + CompactionStats stats_[config::kNumLevels] GUARDED_BY(mutex_); +}; + +// Sanitize db options. The caller should delete result.info_log if +// it is not equal to src.info_log. +Options SanitizeOptions(const std::string& db, + const InternalKeyComparator* icmp, + const InternalFilterPolicy* ipolicy, + const Options& src); + + + +} // namespace leveldb + +#endif // STORAGE_LEVELDB_DB_DB_IMPL_H_ diff --git a/table/kv_seperate_test.cc b/table/kv_seperate_test.cc new file mode 100644 index 0000000..88b59fc --- /dev/null +++ b/table/kv_seperate_test.cc @@ -0,0 +1,119 @@ +#include "gtest/gtest.h" +#include "leveldb/env.h" +#include "leveldb/db.h" +#include "table/blob_file.h" // 假设 BlobFile 的头文件 + +using namespace leveldb; + +constexpr int value_size = 2048; // 单个值的大小 +constexpr int data_size = 128 << 20; // 总数据大小 +constexpr int min_blob_size = 1024; // KV 分离的阈值 + +Status OpenDB(std::string dbName, DB** db) { + Options options; + options.create_if_missing = true; + options.key_value_separated = true; // 启用 KV 分离 + return DB::Open(options, dbName, db); +} + +// 插入数据,模拟 KV 分离 +void InsertData(DB* db) { + WriteOptions writeOptions; + int key_num = data_size / value_size; + srand(static_cast(time(0))); + + for (int i = 0; i < key_num; i++) { + int key_ = rand() % key_num + 1; + std::string key = std::to_string(key_); + std::string value(value_size, 'a'); // 大 value + db->Put(writeOptions, key, value); // 使用标准 Put 接口插入 + } +} + +// 检查数据是否被正确存入 BlobFile +void VerifyBlobFile(const std::string& blob_file_path, int expected_entries) { + BlobFile blobfile(blob_file_path, BlobFile::kReadMode); + Status status = blobfile.Open(); + ASSERT_TRUE(status.ok()); + + int entry_count = 0; + BlobFile::Iterator it = blobfile.NewIterator(); + for (it.SeekToFirst(); it.Valid(); it.Next()) { + ++entry_count; + const Slice& key = it.key(); + const Slice& value = it.value(); + ASSERT_GT(value.size(), min_blob_size); // 确认 value 大于阈值 + } + + ASSERT_EQ(entry_count, expected_entries); // 确认条目数是否正确 + blobfile.Close(); +} + +// KV 分离读写测试 +TEST(TestKVSeparation, WriteAndRead) { + DB* db; + if (OpenDB("testdb", &db).ok() == false) { + std::cerr << "open db failed" << std::endl; + abort(); + } + + // 插入数据 + InsertData(db); + + // 验证 BlobFile 内容 + VerifyBlobFile("blob_data", data_size / value_size); + + // 随机点查数据 + ReadOptions readOptions; + srand(static_cast(time(0))); + int key_num = data_size / value_size; + for (int i = 0; i < 100; i++) { + int key_ = rand() % key_num + 1; + std::string key = std::to_string(key_); + std::string value; + Status status = db->Get(readOptions, key, &value); + ASSERT_TRUE(status.ok()); // 验证是否成功读取 + if (value.size() > min_blob_size) { + ASSERT_TRUE(value == std::string(value_size, 'a')); // 验证大 value 的内容 + } + } + + delete db; +} + +// KV 分离压缩测试 +TEST(TestKVSeparation, Compaction) { + DB* db; + + if (OpenDB("testdb", &db).ok() == false) { + std::cerr << "open db failed" << std::endl; + abort(); + } + + // 插入数据 + InsertData(db); + + leveldb::Range ranges[1]; + ranges[0] = leveldb::Range("-", "A"); + uint64_t sizes[1]; + db->GetApproximateSizes(ranges, 1, sizes); + ASSERT_GT(sizes[0], 0); + + // 执行压缩 + db->CompactRange(nullptr, nullptr); + + // 验证压缩后主数据区的大小 + ranges[0] = leveldb::Range("-", "A"); + db->GetApproximateSizes(ranges, 1, sizes); + ASSERT_EQ(sizes[0], 0); + + // 验证 BlobFile 内容仍然有效 + VerifyBlobFile("blob_data", data_size / value_size); + + delete db; +} + +int main(int argc, char** argv) { + testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/table/table_builder.cc b/table/table_builder.cc index 7eafa6a..7001b94 100644 --- a/table/table_builder.cc +++ b/table/table_builder.cc @@ -1,323 +1,345 @@ -// Copyright (c) 2011 The LevelDB Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. See the AUTHORS file for names of contributors. - -#include "leveldb/table_builder.h" - -#include - -#include "leveldb/comparator.h" -#include "leveldb/env.h" -#include "leveldb/filter_policy.h" -#include "leveldb/options.h" -#include "table/block_builder.h" -#include "table/filter_block.h" -#include "table/format.h" -#include "util/coding.h" -#include "util/crc32c.h" -#include "db/db_impl.h" //朴 -#include "table/blob_file.h" //朴 -#include "table/block.h" //朴 - -const size_t min_blob_size = 1024; // 设定值大小阈值为 1KB,朴 - -namespace leveldb { - - BlobFile* blobfile = new BlobFile("blob_data"); // 初始化全局 blobfile 对象,朴 - class BlobFileManager { - public: - static BlobFile* GetInstance() { - static BlobFile instance("blob_data"); - return &instance; - } - }; - -struct TableBuilder::Rep { - Rep(const Options& opt, WritableFile* f) - : options(opt), - index_block_options(opt), - file(f), - offset(0), - data_block(&options), - index_block(&index_block_options), - num_entries(0), - closed(false), - filter_block(opt.filter_policy == nullptr - ? nullptr - : new FilterBlockBuilder(opt.filter_policy)), - pending_index_entry(false) { - index_block_options.block_restart_interval = 1; - } - - Options options; - Options index_block_options; - WritableFile* file; - uint64_t offset; - Status status; - BlockBuilder data_block; - BlockBuilder index_block; - std::string last_key; - int64_t num_entries; - bool closed; // Either Finish() or Abandon() has been called. - FilterBlockBuilder* filter_block; - - // We do not emit the index entry for a block until we have seen the - // first key for the next data block. This allows us to use shorter - // keys in the index block. For example, consider a block boundary - // between the keys "the quick brown fox" and "the who". We can use - // "the r" as the key for the index block entry since it is >= all - // entries in the first block and < all entries in subsequent - // blocks. - // - // Invariant: r->pending_index_entry is true only if data_block is empty. - bool pending_index_entry; - BlockHandle pending_handle; // Handle to add to index block - - std::string compressed_output; -}; - -TableBuilder::TableBuilder(const Options& options, WritableFile* file) - : rep_(new Rep(options, file)) { - if (rep_->filter_block != nullptr) { - rep_->filter_block->StartBlock(0); - } -} - -TableBuilder::~TableBuilder() { - assert(rep_->closed); // Catch errors where caller forgot to call Finish() - delete rep_->filter_block; - delete rep_; -} - -Status TableBuilder::ChangeOptions(const Options& options) { - // Note: if more fields are added to Options, update - // this function to catch changes that should not be allowed to - // change in the middle of building a Table. - if (options.comparator != rep_->options.comparator) { - return Status::InvalidArgument("changing comparator while building table"); - } - - // Note that any live BlockBuilders point to rep_->options and therefore - // will automatically pick up the updated options. - rep_->options = options; - rep_->index_block_options = options; - rep_->index_block_options.block_restart_interval = 1; - return Status::OK(); -} - -void TableBuilder::Add(const Slice& key, const Slice& value) { - Rep* r = rep_; - assert(!r->closed); - if (!ok()) return; - if (r->num_entries > 0) { - assert(r->options.comparator->Compare(key, Slice(r->last_key)) > 0); - } - - if (r->pending_index_entry) { - assert(r->data_block.empty()); - r->options.comparator->FindShortestSeparator(&r->last_key, key); - std::string handle_encoding; - r->pending_handle.EncodeTo(&handle_encoding); - r->index_block.Add(r->last_key, Slice(handle_encoding)); - r->pending_index_entry = false; - } - - if (r->filter_block != nullptr) { - r->filter_block->AddKey(key); - } - - r->last_key.assign(key.data(), key.size()); - r->num_entries++; - r->data_block.Add(key, value); - - const size_t estimated_block_size = r->data_block.CurrentSizeEstimate(); - if (estimated_block_size >= r->options.block_size) { - Flush(); - } -} - -void TableBuilder::Flush() { - Rep* r = rep_; - assert(!r->closed); - if (!ok()) return; - if (r->data_block.empty()) return; //朴,正常判断 - assert(!r->pending_index_entry); - - if (DBImpl::key_value_separated_) { - // 这里获取数据块内容并初始化 Block 对象,朴 - Slice block_content = r->data_block.Finish(); - BlockContents contents; - contents.data = block_content; - contents.heap_allocated = false; - contents.cachable = false; - - // 初始化 Block - Block data_block(contents); - std::unique_ptr iter(data_block.NewIterator(Options().comparator)); - - // 遍历数据块中的键值对 - for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { - const Slice& key = iter->key(); - const Slice& value = iter->value(); - - // 检查值是否大于阈值 - if (value.size() > min_blob_size) { - // 将值存储到 blobfile 中 - Status status = blobfile->Put(key, value); - if (!status.ok()) { - r->status = status; - } - } - } - } - - WriteBlock(&r->data_block, &r->pending_handle); //将数据块写入文件,并获取数据块的句柄。 - if (ok()) { - r->pending_index_entry = true; - r->status = r->file->Flush(); //刷新 - } - if (r->filter_block != nullptr) { - r->filter_block->StartBlock(r->offset); - } -} - -void TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle) { - // File format contains a sequence of blocks where each block has: - // block_data: uint8[n] - // type: uint8 - // crc: uint32 - assert(ok()); - Rep* r = rep_; - Slice raw = block->Finish(); - - Slice block_contents; - CompressionType type = r->options.compression; - // TODO(postrelease): Support more compression options: zlib? - switch (type) { - case kNoCompression: - block_contents = raw; - break; - - case kSnappyCompression: { - std::string* compressed = &r->compressed_output; - if (port::Snappy_Compress(raw.data(), raw.size(), compressed) && - compressed->size() < raw.size() - (raw.size() / 8u)) { - block_contents = *compressed; - } else { - // Snappy not supported, or compressed less than 12.5%, so just - // store uncompressed form - block_contents = raw; - type = kNoCompression; - } - break; - } - - case kZstdCompression: { - std::string* compressed = &r->compressed_output; - if (port::Zstd_Compress(r->options.zstd_compression_level, raw.data(), - raw.size(), compressed) && - compressed->size() < raw.size() - (raw.size() / 8u)) { - block_contents = *compressed; - } else { - // Zstd not supported, or compressed less than 12.5%, so just - // store uncompressed form - block_contents = raw; - type = kNoCompression; - } - break; - } - } - WriteRawBlock(block_contents, type, handle); - r->compressed_output.clear(); - block->Reset(); -} - -void TableBuilder::WriteRawBlock(const Slice& block_contents, - CompressionType type, BlockHandle* handle) { - Rep* r = rep_; - handle->set_offset(r->offset); - handle->set_size(block_contents.size()); - r->status = r->file->Append(block_contents); - if (r->status.ok()) { - char trailer[kBlockTrailerSize]; - trailer[0] = type; - uint32_t crc = crc32c::Value(block_contents.data(), block_contents.size()); - crc = crc32c::Extend(crc, trailer, 1); // Extend crc to cover block type - EncodeFixed32(trailer + 1, crc32c::Mask(crc)); - r->status = r->file->Append(Slice(trailer, kBlockTrailerSize)); - if (r->status.ok()) { - r->offset += block_contents.size() + kBlockTrailerSize; - } - } -} - -Status TableBuilder::status() const { return rep_->status; } - -Status TableBuilder::Finish() { - Rep* r = rep_; - Flush(); - assert(!r->closed); - r->closed = true; - - BlockHandle filter_block_handle, metaindex_block_handle, index_block_handle; - - // Write filter block - if (ok() && r->filter_block != nullptr) { - WriteRawBlock(r->filter_block->Finish(), kNoCompression, - &filter_block_handle); - } - - // Write metaindex block - if (ok()) { - BlockBuilder meta_index_block(&r->options); - if (r->filter_block != nullptr) { - // Add mapping from "filter.Name" to location of filter data - std::string key = "filter."; - key.append(r->options.filter_policy->Name()); - std::string handle_encoding; - filter_block_handle.EncodeTo(&handle_encoding); - meta_index_block.Add(key, handle_encoding); - } - - // TODO(postrelease): Add stats and other meta blocks - WriteBlock(&meta_index_block, &metaindex_block_handle); - } - - // Write index block - if (ok()) { - if (r->pending_index_entry) { - r->options.comparator->FindShortSuccessor(&r->last_key); - std::string handle_encoding; - r->pending_handle.EncodeTo(&handle_encoding); - r->index_block.Add(r->last_key, Slice(handle_encoding)); - r->pending_index_entry = false; - } - WriteBlock(&r->index_block, &index_block_handle); - } - - // Write footer - if (ok()) { - Footer footer; - footer.set_metaindex_handle(metaindex_block_handle); - footer.set_index_handle(index_block_handle); - std::string footer_encoding; - footer.EncodeTo(&footer_encoding); - r->status = r->file->Append(footer_encoding); - if (r->status.ok()) { - r->offset += footer_encoding.size(); - } - } - return r->status; -} - -void TableBuilder::Abandon() { - Rep* r = rep_; - assert(!r->closed); - r->closed = true; -} - -uint64_t TableBuilder::NumEntries() const { return rep_->num_entries; } - -uint64_t TableBuilder::FileSize() const { return rep_->offset; } - -} // namespace leveldb +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "leveldb/table_builder.h" + +#include + +#include "leveldb/comparator.h" +#include "leveldb/env.h" +#include "leveldb/filter_policy.h" +#include "leveldb/options.h" +#include "table/block_builder.h" +#include "table/filter_block.h" +#include "table/format.h" +#include "util/coding.h" +#include "util/crc32c.h" +#include "db/db_impl.h" //朴 +#include "table/blob_file.h" //朴 +#include "table/block.h" //朴 + +const size_t min_blob_size = 1024; // 设定值大小阈值为 1KB,朴 + +namespace leveldb { + + + +struct TableBuilder::Rep { + Rep(const Options& opt, WritableFile* f) + : options(opt), + index_block_options(opt), + file(f), + offset(0), + data_block(&options), + index_block(&index_block_options), + num_entries(0), + closed(false), + filter_block(opt.filter_policy == nullptr + ? nullptr + : new FilterBlockBuilder(opt.filter_policy)), + pending_index_entry(false) { + index_block_options.block_restart_interval = 1; + } + + Options options; + Options index_block_options; + WritableFile* file; + uint64_t offset; + Status status; + BlockBuilder data_block; + BlockBuilder index_block; + std::string last_key; + int64_t num_entries; + bool closed; // Either Finish() or Abandon() has been called. + FilterBlockBuilder* filter_block; + + // We do not emit the index entry for a block until we have seen the + // first key for the next data block. This allows us to use shorter + // keys in the index block. For example, consider a block boundary + // between the keys "the quick brown fox" and "the who". We can use + // "the r" as the key for the index block entry since it is >= all + // entries in the first block and < all entries in subsequent + // blocks. + // + // Invariant: r->pending_index_entry is true only if data_block is empty. + bool pending_index_entry; + BlockHandle pending_handle; // Handle to add to index block + + std::string compressed_output; +}; + +TableBuilder::TableBuilder(const Options& options, WritableFile* file) + : rep_(new Rep(options, file)) { + if (rep_->filter_block != nullptr) { + rep_->filter_block->StartBlock(0); + } +} + +TableBuilder::~TableBuilder() { + assert(rep_->closed); // Catch errors where caller forgot to call Finish() + delete rep_->filter_block; + delete rep_; +} + +Status TableBuilder::ChangeOptions(const Options& options) { + // Note: if more fields are added to Options, update + // this function to catch changes that should not be allowed to + // change in the middle of building a Table. + if (options.comparator != rep_->options.comparator) { + return Status::InvalidArgument("changing comparator while building table"); + } + + // Note that any live BlockBuilders point to rep_->options and therefore + // will automatically pick up the updated options. + rep_->options = options; + rep_->index_block_options = options; + rep_->index_block_options.block_restart_interval = 1; + return Status::OK(); +} + +void TableBuilder::Add(const Slice& key, const Slice& value) { + Rep* r = rep_; + assert(!r->closed); + if (!ok()) return; + if (r->num_entries > 0) { + assert(r->options.comparator->Compare(key, Slice(r->last_key)) > 0); + } + + if (r->pending_index_entry) { + assert(r->data_block.empty()); + r->options.comparator->FindShortestSeparator(&r->last_key, key); + std::string handle_encoding; + r->pending_handle.EncodeTo(&handle_encoding); + r->index_block.Add(r->last_key, Slice(handle_encoding)); + r->pending_index_entry = false; + } + + if (r->filter_block != nullptr) { + r->filter_block->AddKey(key); + } + + r->last_key.assign(key.data(), key.size()); + r->num_entries++; + r->data_block.Add(key, value); + + const size_t estimated_block_size = r->data_block.CurrentSizeEstimate(); + if (estimated_block_size >= r->options.block_size) { + Flush(); + } +} + +void TableBuilder::Flush() { + Rep* r = rep_; + assert(!r->closed); + if (!ok()) return; + if (r->data_block.empty()) return; //朴,正常判断 + assert(!r->pending_index_entry); + + if (DBImpl::key_value_separated_) { + // 这里获取数据块内容并初始化 Block 对象,朴 + Slice block_content = r->data_block.Finish(); + BlockContents contents; + contents.data = block_content; + contents.heap_allocated = false; + contents.cachable = false; + Rep* new_rep = new Rep(r->options, r->file); // 创建一个新的 Rep 实例 + new_rep->offset = r->offset; // 新的 offset 初始化为当前的 offset + new_rep->num_entries = r->num_entries; + + // 初始化 Block + Block data_block(contents); + leveldb::WritableFile* dest = nullptr; + leveldb::blob::BlobFile blobfile(dest); // 可以动态生成文件名以防止重复 // 初始化 BlobFile 对象,朴 + leveldb::WritableFile* file; + int bfid = DBImpl::NewBlobNum(); // 生成唯一的 blobfile id + std::unique_ptr iter(data_block.NewIterator(Options().comparator)); + + // 遍历数据块中的键值对 + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + const Slice& key = iter->key(); + const Slice& value = iter->value(); + + // 检查值是否大于阈值 + if (value.size() > min_blob_size) { + // 将值存储到 blobfile 中 + uint64_t offset; // 局部变量存储偏移量 + Status status = blobfile.AddRecord(key, value, offset); + if (!status.ok()) { + r->status = status; + } + // 这里修改 value,存储 Blob 的 offset 和 bfid + std::string new_value = EncodeBlobValue(offset, bfid); + new_rep->data_block.Add(key, Slice(new_value)); + } + else{ + // 不需要 Blob 存储,直接处理普通值 + new_rep->data_block.Add(key, value); + } + } + } + + WriteBlock(&r->data_block, &r->pending_handle); //将数据块写入文件,并获取数据块的句柄。 + if (ok()) { + r->pending_index_entry = true; + r->status = r->file->Flush(); //刷新 + } + if (r->filter_block != nullptr) { + r->filter_block->StartBlock(r->offset); + } +} + +std::string TableBuilder::EncodeBlobValue(uint64_t offset, int bfid) { + // 自定义方法:编码新的 Blob 值 + std::string result; + + // 为 result 分配空间 + result.resize(8 + 4); // 64位 (8字节) + 32位 (4字节) + // 将 offset 和 bfid 编码成一个新的值 + std::string result; + EncodeFixed64(&result[0], offset); // 编码 offset + EncodeFixed32(&result[8], bfid); // 编码 bfid + return result; +} + + +void TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle) { + // File format contains a sequence of blocks where each block has: + // block_data: uint8[n] + // type: uint8 + // crc: uint32 + assert(ok()); + Rep* r = rep_; + Slice raw = block->Finish(); + + Slice block_contents; + CompressionType type = r->options.compression; + // TODO(postrelease): Support more compression options: zlib? + switch (type) { + case kNoCompression: + block_contents = raw; + break; + + case kSnappyCompression: { + std::string* compressed = &r->compressed_output; + if (port::Snappy_Compress(raw.data(), raw.size(), compressed) && + compressed->size() < raw.size() - (raw.size() / 8u)) { + block_contents = *compressed; + } else { + // Snappy not supported, or compressed less than 12.5%, so just + // store uncompressed form + block_contents = raw; + type = kNoCompression; + } + break; + } + + case kZstdCompression: { + std::string* compressed = &r->compressed_output; + if (port::Zstd_Compress(r->options.zstd_compression_level, raw.data(), + raw.size(), compressed) && + compressed->size() < raw.size() - (raw.size() / 8u)) { + block_contents = *compressed; + } else { + // Zstd not supported, or compressed less than 12.5%, so just + // store uncompressed form + block_contents = raw; + type = kNoCompression; + } + break; + } + } + WriteRawBlock(block_contents, type, handle); + r->compressed_output.clear(); + block->Reset(); +} + +void TableBuilder::WriteRawBlock(const Slice& block_contents, + CompressionType type, BlockHandle* handle) { + Rep* r = rep_; + handle->set_offset(r->offset); + handle->set_size(block_contents.size()); + r->status = r->file->Append(block_contents); + if (r->status.ok()) { + char trailer[kBlockTrailerSize]; + trailer[0] = type; + uint32_t crc = crc32c::Value(block_contents.data(), block_contents.size()); + crc = crc32c::Extend(crc, trailer, 1); // Extend crc to cover block type + EncodeFixed32(trailer + 1, crc32c::Mask(crc)); + r->status = r->file->Append(Slice(trailer, kBlockTrailerSize)); + if (r->status.ok()) { + r->offset += block_contents.size() + kBlockTrailerSize; + } + } +} + +Status TableBuilder::status() const { return rep_->status; } + +Status TableBuilder::Finish() { + Rep* r = rep_; + Flush(); + assert(!r->closed); + r->closed = true; + + BlockHandle filter_block_handle, metaindex_block_handle, index_block_handle; + + // Write filter block + if (ok() && r->filter_block != nullptr) { + WriteRawBlock(r->filter_block->Finish(), kNoCompression, + &filter_block_handle); + } + + // Write metaindex block + if (ok()) { + BlockBuilder meta_index_block(&r->options); + if (r->filter_block != nullptr) { + // Add mapping from "filter.Name" to location of filter data + std::string key = "filter."; + key.append(r->options.filter_policy->Name()); + std::string handle_encoding; + filter_block_handle.EncodeTo(&handle_encoding); + meta_index_block.Add(key, handle_encoding); + } + + // TODO(postrelease): Add stats and other meta blocks + WriteBlock(&meta_index_block, &metaindex_block_handle); + } + + // Write index block + if (ok()) { + if (r->pending_index_entry) { + r->options.comparator->FindShortSuccessor(&r->last_key); + std::string handle_encoding; + r->pending_handle.EncodeTo(&handle_encoding); + r->index_block.Add(r->last_key, Slice(handle_encoding)); + r->pending_index_entry = false; + } + WriteBlock(&r->index_block, &index_block_handle); + } + + // Write footer + if (ok()) { + Footer footer; + footer.set_metaindex_handle(metaindex_block_handle); + footer.set_index_handle(index_block_handle); + std::string footer_encoding; + footer.EncodeTo(&footer_encoding); + r->status = r->file->Append(footer_encoding); + if (r->status.ok()) { + r->offset += footer_encoding.size(); + } + } + return r->status; +} + +void TableBuilder::Abandon() { + Rep* r = rep_; + assert(!r->closed); + r->closed = true; +} + +uint64_t TableBuilder::NumEntries() const { return rep_->num_entries; } + +uint64_t TableBuilder::FileSize() const { return rep_->offset; } + +} // namespace leveldb