From 1d6e8d64ee8489a85ce939b819d106d2b54abb15 Mon Sep 17 00:00:00 2001 From: leveldb Team Date: Wed, 15 Feb 2023 16:04:43 -0800 Subject: [PATCH] Add support for Zstd-based compression in LevelDB. This change implements support for Zstd-based compression in LevelDB. Building up from the Snappy compression (which has been supported since inception), this change adds Zstd as an alternate compression algorithm. We are implementing this to provide alternative options for users who might have different performance and efficiency requirements. For instance, the Zstandard website (https://facebook.github.io/zstd/) claims that the Zstd algorithm can achieve around 30% higher compression ratios than Snappy, with relatively smaller (~10%) slowdowns in de/compression speeds. Benchmarking results: $ blaze-bin/third_party/leveldb/db_bench LevelDB: version 1.23 Date: Thu Feb 2 18:50:06 2023 CPU: 56 * Intel(R) Xeon(R) CPU E5-2690 v4 @ 2.60GHz CPUCache: 35840 KB Keys: 16 bytes each Values: 100 bytes each (50 bytes after compression) Entries: 1000000 RawSize: 110.6 MB (estimated) FileSize: 62.9 MB (estimated) ------------------------------------------------ fillseq : 2.613 micros/op; 42.3 MB/s fillsync : 3924.432 micros/op; 0.0 MB/s (1000 ops) fillrandom : 3.609 micros/op; 30.7 MB/s overwrite : 4.508 micros/op; 24.5 MB/s readrandom : 6.136 micros/op; (864322 of 1000000 found) readrandom : 5.446 micros/op; (864083 of 1000000 found) readseq : 0.180 micros/op; 613.3 MB/s readreverse : 0.321 micros/op; 344.7 MB/s compact : 827043.000 micros/op; readrandom : 4.603 micros/op; (864105 of 1000000 found) readseq : 0.169 micros/op; 656.3 MB/s readreverse : 0.315 micros/op; 350.8 MB/s fill100K : 854.009 micros/op; 111.7 MB/s (1000 ops) crc32c : 1.227 micros/op; 3184.0 MB/s (4K per op) snappycomp : 3.610 micros/op; 1081.9 MB/s (output: 55.2%) snappyuncomp : 0.691 micros/op; 5656.3 MB/s zstdcomp : 15.731 micros/op; 248.3 MB/s (output: 44.1%) zstduncomp : 4.218 micros/op; 926.2 MB/s PiperOrigin-RevId: 509957778 --- CMakeLists.txt | 4 ++ README.md | 2 +- benchmarks/db_bench.cc | 109 +++++++++++++++++++++++++++++----------------- include/leveldb/options.h | 3 +- port/port_config.h.in | 7 ++- port/port_example.h | 21 ++++++++- port/port_stdcxx.h | 67 ++++++++++++++++++++++++++++ table/format.cc | 23 +++++++++- table/table_builder.cc | 14 ++++++ table/table_test.cc | 27 +++++++++--- 10 files changed, 224 insertions(+), 53 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index b829c94..fda9e01 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -40,6 +40,7 @@ check_include_file("unistd.h" HAVE_UNISTD_H) include(CheckLibraryExists) check_library_exists(crc32c crc32c_value "" HAVE_CRC32C) check_library_exists(snappy snappy_compress "" HAVE_SNAPPY) +check_library_exists(zstd zstd_compress "" HAVE_ZSTD) check_library_exists(tcmalloc malloc "" HAVE_TCMALLOC) include(CheckCXXSymbolExists) @@ -273,6 +274,9 @@ endif(HAVE_CRC32C) if(HAVE_SNAPPY) target_link_libraries(leveldb snappy) endif(HAVE_SNAPPY) +if(HAVE_ZSTD) + target_link_libraries(leveldb zstd) +endif(HAVE_ZSTD) if(HAVE_TCMALLOC) target_link_libraries(leveldb tcmalloc) endif(HAVE_TCMALLOC) diff --git a/README.md b/README.md index 3088c55..a5e5416 100644 --- a/README.md +++ b/README.md @@ -18,7 +18,7 @@ Authors: Sanjay Ghemawat (sanjay@google.com) and Jeff Dean (jeff@google.com) * Multiple changes can be made in one atomic batch. * Users can create a transient snapshot to get a consistent view of data. * Forward and backward iteration is supported over the data. - * Data is automatically compressed using the [Snappy compression library](https://google.github.io/snappy/). + * Data is automatically compressed using the [Snappy compression library](https://google.github.io/snappy/), but [Zstd compression](https://facebook.github.io/zstd/) is also supported. * External activity (file system operations etc.) is relayed through a virtual interface so users can customize the operating system interactions. # Documentation diff --git a/benchmarks/db_bench.cc b/benchmarks/db_bench.cc index 43ff43f..db866f5 100644 --- a/benchmarks/db_bench.cc +++ b/benchmarks/db_bench.cc @@ -60,7 +60,9 @@ static const char* FLAGS_benchmarks = "fill100K," "crc32c," "snappycomp," - "snappyuncomp,"; + "snappyuncomp," + "zstdcomp," + "zstduncomp,"; // Number of key/values to place in database static int FLAGS_num = 1000000; @@ -367,6 +369,57 @@ struct ThreadState { ThreadState(int index, int seed) : tid(index), rand(seed), shared(nullptr) {} }; +void Compress( + ThreadState* thread, std::string name, + std::function compress_func) { + RandomGenerator gen; + Slice input = gen.Generate(Options().block_size); + int64_t bytes = 0; + int64_t produced = 0; + bool ok = true; + std::string compressed; + while (ok && bytes < 1024 * 1048576) { // Compress 1G + ok = compress_func(input.data(), input.size(), &compressed); + produced += compressed.size(); + bytes += input.size(); + thread->stats.FinishedSingleOp(); + } + + if (!ok) { + thread->stats.AddMessage("(" + name + " failure)"); + } else { + char buf[100]; + std::snprintf(buf, sizeof(buf), "(output: %.1f%%)", + (produced * 100.0) / bytes); + thread->stats.AddMessage(buf); + thread->stats.AddBytes(bytes); + } +} + +void Uncompress( + ThreadState* thread, std::string name, + std::function compress_func, + std::function uncompress_func) { + RandomGenerator gen; + Slice input = gen.Generate(Options().block_size); + std::string compressed; + bool ok = compress_func(input.data(), input.size(), &compressed); + int64_t bytes = 0; + char* uncompressed = new char[input.size()]; + while (ok && bytes < 1024 * 1048576) { // Compress 1G + ok = uncompress_func(compressed.data(), compressed.size(), uncompressed); + bytes += input.size(); + thread->stats.FinishedSingleOp(); + } + delete[] uncompressed; + + if (!ok) { + thread->stats.AddMessage("(" + name + " failure)"); + } else { + thread->stats.AddBytes(bytes); + } +} + } // namespace class Benchmark { @@ -579,6 +632,10 @@ class Benchmark { method = &Benchmark::SnappyCompress; } else if (name == Slice("snappyuncomp")) { method = &Benchmark::SnappyUncompress; + } else if (name == Slice("zstdcomp")) { + method = &Benchmark::ZstdCompress; + } else if (name == Slice("zstduncomp")) { + method = &Benchmark::ZstdUncompress; } else if (name == Slice("heapprofile")) { HeapProfile(); } else if (name == Slice("stats")) { @@ -713,50 +770,20 @@ class Benchmark { } void SnappyCompress(ThreadState* thread) { - RandomGenerator gen; - Slice input = gen.Generate(Options().block_size); - int64_t bytes = 0; - int64_t produced = 0; - bool ok = true; - std::string compressed; - while (ok && bytes < 1024 * 1048576) { // Compress 1G - ok = port::Snappy_Compress(input.data(), input.size(), &compressed); - produced += compressed.size(); - bytes += input.size(); - thread->stats.FinishedSingleOp(); - } - - if (!ok) { - thread->stats.AddMessage("(snappy failure)"); - } else { - char buf[100]; - std::snprintf(buf, sizeof(buf), "(output: %.1f%%)", - (produced * 100.0) / bytes); - thread->stats.AddMessage(buf); - thread->stats.AddBytes(bytes); - } + Compress(thread, "snappy", &port::Snappy_Compress); } void SnappyUncompress(ThreadState* thread) { - RandomGenerator gen; - Slice input = gen.Generate(Options().block_size); - std::string compressed; - bool ok = port::Snappy_Compress(input.data(), input.size(), &compressed); - int64_t bytes = 0; - char* uncompressed = new char[input.size()]; - while (ok && bytes < 1024 * 1048576) { // Compress 1G - ok = port::Snappy_Uncompress(compressed.data(), compressed.size(), - uncompressed); - bytes += input.size(); - thread->stats.FinishedSingleOp(); - } - delete[] uncompressed; + Uncompress(thread, "snappy", &port::Snappy_Compress, + &port::Snappy_Uncompress); + } - if (!ok) { - thread->stats.AddMessage("(snappy failure)"); - } else { - thread->stats.AddBytes(bytes); - } + void ZstdCompress(ThreadState* thread) { + Compress(thread, "zstd", &port::Zstd_Compress); + } + + void ZstdUncompress(ThreadState* thread) { + Uncompress(thread, "zstd", &port::Zstd_Compress, &port::Zstd_Uncompress); } void Open() { diff --git a/include/leveldb/options.h b/include/leveldb/options.h index 97c6b0b..79bcdbb 100644 --- a/include/leveldb/options.h +++ b/include/leveldb/options.h @@ -26,7 +26,8 @@ enum CompressionType { // NOTE: do not change the values of existing entries, as these are // part of the persistent format on disk. kNoCompression = 0x0, - kSnappyCompression = 0x1 + kSnappyCompression = 0x1, + kZstdCompression = 0x2, }; // Options to control the behavior of a database (passed to DB::Open) diff --git a/port/port_config.h.in b/port/port_config.h.in index 272671d..34bf66a 100644 --- a/port/port_config.h.in +++ b/port/port_config.h.in @@ -30,4 +30,9 @@ #cmakedefine01 HAVE_SNAPPY #endif // !defined(HAVE_SNAPPY) -#endif // STORAGE_LEVELDB_PORT_PORT_CONFIG_H_ \ No newline at end of file +// Define to 1 if you have Zstd. +#if !defined(HAVE_Zstd) +#cmakedefine01 HAVE_ZSTD +#endif // !defined(HAVE_ZSTD) + +#endif // STORAGE_LEVELDB_PORT_PORT_CONFIG_H_ diff --git a/port/port_example.h b/port/port_example.h index 704aa24..b1a1c32 100644 --- a/port/port_example.h +++ b/port/port_example.h @@ -72,7 +72,7 @@ bool Snappy_GetUncompressedLength(const char* input, size_t length, size_t* result); // Attempt to snappy uncompress input[0,input_length-1] into *output. -// Returns true if successful, false if the input is invalid lightweight +// Returns true if successful, false if the input is invalid snappy // compressed data. // // REQUIRES: at least the first "n" bytes of output[] must be writable @@ -81,6 +81,25 @@ bool Snappy_GetUncompressedLength(const char* input, size_t length, bool Snappy_Uncompress(const char* input_data, size_t input_length, char* output); +// Store the zstd compression of "input[0,input_length-1]" in *output. +// Returns false if zstd is not supported by this port. +bool Zstd_Compress(const char* input, size_t input_length, std::string* output); + +// If input[0,input_length-1] looks like a valid zstd compressed +// buffer, store the size of the uncompressed data in *result and +// return true. Else return false. +bool Zstd_GetUncompressedLength(const char* input, size_t length, + size_t* result); + +// Attempt to zstd uncompress input[0,input_length-1] into *output. +// Returns true if successful, false if the input is invalid zstd +// compressed data. +// +// REQUIRES: at least the first "n" bytes of output[] must be writable +// where "n" is the result of a successful call to +// Zstd_GetUncompressedLength. +bool Zstd_Uncompress(const char* input_data, size_t input_length, char* output); + // ------------------ Miscellaneous ------------------- // If heap profiling is not supported, returns false. diff --git a/port/port_stdcxx.h b/port/port_stdcxx.h index 2bda48d..ca961e6 100644 --- a/port/port_stdcxx.h +++ b/port/port_stdcxx.h @@ -28,6 +28,9 @@ #if HAVE_SNAPPY #include #endif // HAVE_SNAPPY +#if HAVE_ZSTD +#include +#endif // HAVE_ZSTD #include #include // NOLINT @@ -126,6 +129,70 @@ inline bool Snappy_Uncompress(const char* input, size_t length, char* output) { #endif // HAVE_SNAPPY } +inline bool Zstd_Compress(const char* input, size_t length, + std::string* output) { +#if HAVE_ZSTD + // Get the MaxCompressedLength. + size_t outlen = ZSTD_compressBound(length); + if (ZSTD_isError(outlen)) { + return false; + } + output->resize(outlen); + ZSTD_CCtx* ctx = ZSTD_createCCtx(); + outlen = ZSTD_compress2(ctx, &(*output)[0], output->size(), input, length); + ZSTD_freeCCtx(ctx); + if (ZSTD_isError(outlen)) { + return false; + } + output->resize(outlen); + return true; +#else + // Silence compiler warnings about unused arguments. + (void)input; + (void)length; + (void)output; + return false; +#endif // HAVE_ZSTD +} + +inline bool Zstd_GetUncompressedLength(const char* input, size_t length, + size_t* result) { +#if HAVE_ZSTD + size_t size = ZSTD_getFrameContentSize(input, length); + if (size == 0) return false; + *result = size; + return true; +#else + // Silence compiler warnings about unused arguments. + (void)input; + (void)length; + (void)result; + return false; +#endif // HAVE_ZSTD +} + +inline bool Zstd_Uncompress(const char* input, size_t length, char* output) { +#if HAVE_ZSTD + size_t outlen; + if (!Zstd_GetUncompressedLength(input, length, &outlen)) { + return false; + } + ZSTD_DCtx* ctx = ZSTD_createDCtx(); + outlen = ZSTD_decompressDCtx(ctx, output, outlen, input, length); + ZSTD_freeDCtx(ctx); + if (ZSTD_isError(outlen)) { + return false; + } + return true; +#else + // Silence compiler warnings about unused arguments. + (void)input; + (void)length; + (void)output; + return false; +#endif // HAVE_ZSTD +} + inline bool GetHeapProfile(void (*func)(void*, const char*, int), void* arg) { // Silence compiler warnings about unused arguments. (void)func; diff --git a/table/format.cc b/table/format.cc index e183977..7647372 100644 --- a/table/format.cc +++ b/table/format.cc @@ -5,6 +5,7 @@ #include "table/format.h" #include "leveldb/env.h" +#include "leveldb/options.h" #include "port/port.h" #include "table/block.h" #include "util/coding.h" @@ -116,13 +117,31 @@ Status ReadBlock(RandomAccessFile* file, const ReadOptions& options, size_t ulength = 0; if (!port::Snappy_GetUncompressedLength(data, n, &ulength)) { delete[] buf; - return Status::Corruption("corrupted compressed block contents"); + return Status::Corruption("corrupted snappy compressed block length"); } char* ubuf = new char[ulength]; if (!port::Snappy_Uncompress(data, n, ubuf)) { delete[] buf; delete[] ubuf; - return Status::Corruption("corrupted compressed block contents"); + return Status::Corruption("corrupted snappy compressed block contents"); + } + delete[] buf; + result->data = Slice(ubuf, ulength); + result->heap_allocated = true; + result->cachable = true; + break; + } + case kZstdCompression: { + size_t ulength = 0; + if (!port::Zstd_GetUncompressedLength(data, n, &ulength)) { + delete[] buf; + return Status::Corruption("corrupted zstd compressed block length"); + } + char* ubuf = new char[ulength]; + if (!port::Zstd_Uncompress(data, n, ubuf)) { + delete[] buf; + delete[] ubuf; + return Status::Corruption("corrupted zstd compressed block contents"); } delete[] buf; result->data = Slice(ubuf, ulength); diff --git a/table/table_builder.cc b/table/table_builder.cc index 29a619d..ba3df9e 100644 --- a/table/table_builder.cc +++ b/table/table_builder.cc @@ -168,6 +168,20 @@ void TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle) { } break; } + + case kZstdCompression: { + std::string* compressed = &r->compressed_output; + if (port::Zstd_Compress(raw.data(), raw.size(), compressed) && + compressed->size() < raw.size() - (raw.size() / 8u)) { + block_contents = *compressed; + } else { + // Zstd not supported, or compressed less than 12.5%, so just + // store uncompressed form + block_contents = raw; + type = kNoCompression; + } + break; + } } WriteRawBlock(block_contents, type, handle); r->compressed_output.clear(); diff --git a/table/table_test.cc b/table/table_test.cc index a405586..b3baf95 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -14,6 +14,7 @@ #include "leveldb/db.h" #include "leveldb/env.h" #include "leveldb/iterator.h" +#include "leveldb/options.h" #include "leveldb/table_builder.h" #include "table/block.h" #include "table/block_builder.h" @@ -784,15 +785,29 @@ TEST(TableTest, ApproximateOffsetOfPlain) { ASSERT_TRUE(Between(c.ApproximateOffsetOf("xyz"), 610000, 612000)); } -static bool SnappyCompressionSupported() { +static bool CompressionSupported(CompressionType type) { std::string out; Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; - return port::Snappy_Compress(in.data(), in.size(), &out); + if (type == kSnappyCompression) { + return port::Snappy_Compress(in.data(), in.size(), &out); + } else if (type == kZstdCompression) { + return port::Zstd_Compress(in.data(), in.size(), &out); + } + return false; } -TEST(TableTest, ApproximateOffsetOfCompressed) { - if (!SnappyCompressionSupported()) - GTEST_SKIP() << "skipping compression tests"; +class CompressionTableTest + : public ::testing::TestWithParam> {}; + +INSTANTIATE_TEST_SUITE_P(CompressionTests, CompressionTableTest, + ::testing::Values(kSnappyCompression, + kZstdCompression)); + +TEST_P(CompressionTableTest, ApproximateOffsetOfCompressed) { + CompressionType type = ::testing::get<0>(GetParam()); + if (!CompressionSupported(type)) { + GTEST_SKIP() << "skipping compression test: " << type; + } Random rnd(301); TableConstructor c(BytewiseComparator()); @@ -805,7 +820,7 @@ TEST(TableTest, ApproximateOffsetOfCompressed) { KVMap kvmap; Options options; options.block_size = 1024; - options.compression = kSnappyCompression; + options.compression = type; c.Finish(options, &keys, &kvmap); // Expected upper and lower bounds of space used by compressible strings.