diff --git a/CMakeLists.txt b/CMakeLists.txt
index b829c94..fda9e01 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -40,6 +40,7 @@ check_include_file("unistd.h" HAVE_UNISTD_H)
 include(CheckLibraryExists)
 check_library_exists(crc32c crc32c_value "" HAVE_CRC32C)
 check_library_exists(snappy snappy_compress "" HAVE_SNAPPY)
+check_library_exists(zstd ZSTD_compress "" HAVE_ZSTD)
 check_library_exists(tcmalloc malloc "" HAVE_TCMALLOC)
 
 include(CheckCXXSymbolExists)
@@ -273,6 +274,9 @@ endif(HAVE_CRC32C)
 if(HAVE_SNAPPY)
   target_link_libraries(leveldb snappy)
 endif(HAVE_SNAPPY)
+if(HAVE_ZSTD)
+  target_link_libraries(leveldb zstd)
+endif(HAVE_ZSTD)
 if(HAVE_TCMALLOC)
   target_link_libraries(leveldb tcmalloc)
 endif(HAVE_TCMALLOC)
diff --git a/README.md b/README.md
index 3088c55..a5e5416 100644
--- a/README.md
+++ b/README.md
@@ -18,7 +18,7 @@ Authors: Sanjay Ghemawat (sanjay@google.com) and Jeff Dean (jeff@google.com)
   * Multiple changes can be made in one atomic batch.
   * Users can create a transient snapshot to get a consistent view of data.
   * Forward and backward iteration is supported over the data.
-  * Data is automatically compressed using the [Snappy compression library](https://google.github.io/snappy/).
+  * Data is automatically compressed using the [Snappy compression library](https://google.github.io/snappy/), but [Zstd compression](https://facebook.github.io/zstd/) is also supported.
   * External activity (file system operations etc.) is relayed through a virtual interface so users can customize the operating system interactions.
 
 # Documentation
diff --git a/benchmarks/db_bench.cc b/benchmarks/db_bench.cc
index 43ff43f..db866f5 100644
--- a/benchmarks/db_bench.cc
+++ b/benchmarks/db_bench.cc
@@ -60,7 +60,9 @@ static const char* FLAGS_benchmarks =
     "fill100K,"
     "crc32c,"
     "snappycomp,"
-    "snappyuncomp,";
+    "snappyuncomp,"
+    "zstdcomp,"
+    "zstduncomp,";
 
 // Number of key/values to place in database
 static int FLAGS_num = 1000000;
@@ -367,6 +369,57 @@ struct ThreadState {
   ThreadState(int index, int seed) : tid(index), rand(seed), shared(nullptr) {}
 };
 
+void Compress(
+    ThreadState* thread, std::string name,
+    std::function<bool(const char*, size_t, std::string*)> compress_func) {
+  RandomGenerator gen;
+  Slice input = gen.Generate(Options().block_size);
+  int64_t bytes = 0;
+  int64_t produced = 0;
+  bool ok = true;
+  std::string compressed;
+  while (ok && bytes < 1024 * 1048576) {  // Compress 1G
+    ok = compress_func(input.data(), input.size(), &compressed);
+    produced += compressed.size();
+    bytes += input.size();
+    thread->stats.FinishedSingleOp();
+  }
+
+  if (!ok) {
+    thread->stats.AddMessage("(" + name + " failure)");
+  } else {
+    char buf[100];
+    std::snprintf(buf, sizeof(buf), "(output: %.1f%%)",
+                  (produced * 100.0) / bytes);
+    thread->stats.AddMessage(buf);
+    thread->stats.AddBytes(bytes);
+  }
+}
+
+void Uncompress(
+    ThreadState* thread, std::string name,
+    std::function<bool(const char*, size_t, std::string*)> compress_func,
+    std::function<bool(const char*, size_t, char*)> uncompress_func) {
+  RandomGenerator gen;
+  Slice input = gen.Generate(Options().block_size);
+  std::string compressed;
+  bool ok = compress_func(input.data(), input.size(), &compressed);
+  int64_t bytes = 0;
+  char* uncompressed = new char[input.size()];
+  while (ok && bytes < 1024 * 1048576) {  // Uncompress 1G
+    ok = uncompress_func(compressed.data(), compressed.size(), uncompressed);
+    bytes += input.size();
+    thread->stats.FinishedSingleOp();
+  }
+  delete[] uncompressed;
+
+  if (!ok) {
+    thread->stats.AddMessage("(" + name + " failure)");
+  } else {
+    thread->stats.AddBytes(bytes);
+  }
+}
+
 }  // namespace
 
 class Benchmark {
@@ -579,6 +632,10 @@ class Benchmark {
         method = &Benchmark::SnappyCompress;
       } else if (name == Slice("snappyuncomp")) {
         method = &Benchmark::SnappyUncompress;
+      } else if (name == Slice("zstdcomp")) {
+        method = &Benchmark::ZstdCompress;
+      } else if (name == Slice("zstduncomp")) {
+        method = &Benchmark::ZstdUncompress;
       } else if (name == Slice("heapprofile")) {
         HeapProfile();
       } else if (name == Slice("stats")) {
@@ -713,50 +770,20 @@ class Benchmark {
   }
 
   void SnappyCompress(ThreadState* thread) {
-    RandomGenerator gen;
-    Slice input = gen.Generate(Options().block_size);
-    int64_t bytes = 0;
-    int64_t produced = 0;
-    bool ok = true;
-    std::string compressed;
-    while (ok && bytes < 1024 * 1048576) {  // Compress 1G
-      ok = port::Snappy_Compress(input.data(), input.size(), &compressed);
-      produced += compressed.size();
-      bytes += input.size();
-      thread->stats.FinishedSingleOp();
-    }
-
-    if (!ok) {
-      thread->stats.AddMessage("(snappy failure)");
-    } else {
-      char buf[100];
-      std::snprintf(buf, sizeof(buf), "(output: %.1f%%)",
-                    (produced * 100.0) / bytes);
-      thread->stats.AddMessage(buf);
-      thread->stats.AddBytes(bytes);
-    }
+    Compress(thread, "snappy", &port::Snappy_Compress);
   }
 
   void SnappyUncompress(ThreadState* thread) {
-    RandomGenerator gen;
-    Slice input = gen.Generate(Options().block_size);
-    std::string compressed;
-    bool ok = port::Snappy_Compress(input.data(), input.size(), &compressed);
-    int64_t bytes = 0;
-    char* uncompressed = new char[input.size()];
-    while (ok && bytes < 1024 * 1048576) {  // Compress 1G
-      ok = port::Snappy_Uncompress(compressed.data(), compressed.size(),
-                                   uncompressed);
-      bytes += input.size();
-      thread->stats.FinishedSingleOp();
-    }
-    delete[] uncompressed;
+    Uncompress(thread, "snappy", &port::Snappy_Compress,
+               &port::Snappy_Uncompress);
+  }
 
-    if (!ok) {
-      thread->stats.AddMessage("(snappy failure)");
-    } else {
-      thread->stats.AddBytes(bytes);
-    }
+  void ZstdCompress(ThreadState* thread) {
+    Compress(thread, "zstd", &port::Zstd_Compress);
+  }
+
+  void ZstdUncompress(ThreadState* thread) {
+    Uncompress(thread, "zstd", &port::Zstd_Compress, &port::Zstd_Uncompress);
   }
 
   void Open() {
diff --git a/include/leveldb/options.h b/include/leveldb/options.h
index 97c6b0b..79bcdbb 100644
--- a/include/leveldb/options.h
+++ b/include/leveldb/options.h
@@ -26,7 +26,8 @@ enum CompressionType {
   // NOTE: do not change the values of existing entries, as these are
   // part of the persistent format on disk.
   kNoCompression = 0x0,
-  kSnappyCompression = 0x1
+  kSnappyCompression = 0x1,
+  kZstdCompression = 0x2,
 };
 
 // Options to control the behavior of a database (passed to DB::Open)
diff --git a/port/port_config.h.in b/port/port_config.h.in
index 272671d..34bf66a 100644
--- a/port/port_config.h.in
+++ b/port/port_config.h.in
@@ -30,4 +30,9 @@
 #cmakedefine01 HAVE_SNAPPY
 #endif  // !defined(HAVE_SNAPPY)
 
-#endif  // STORAGE_LEVELDB_PORT_PORT_CONFIG_H_
\ No newline at end of file
+// Define to 1 if you have Zstd.
+#if !defined(HAVE_ZSTD)
+#cmakedefine01 HAVE_ZSTD
+#endif  // !defined(HAVE_ZSTD)
+
+#endif  // STORAGE_LEVELDB_PORT_PORT_CONFIG_H_
diff --git a/port/port_example.h b/port/port_example.h
index 704aa24..b1a1c32 100644
--- a/port/port_example.h
+++ b/port/port_example.h
@@ -72,7 +72,7 @@ bool Snappy_GetUncompressedLength(const char* input, size_t length,
                                   size_t* result);
 
 // Attempt to snappy uncompress input[0,input_length-1] into *output.
-// Returns true if successful, false if the input is invalid lightweight
+// Returns true if successful, false if the input is invalid snappy
 // compressed data.
 //
 // REQUIRES: at least the first "n" bytes of output[] must be writable
@@ -81,6 +81,25 @@ bool Snappy_GetUncompressedLength(const char* input, size_t length,
 bool Snappy_Uncompress(const char* input_data, size_t input_length,
                        char* output);
 
+// Store the zstd compression of "input[0,input_length-1]" in *output.
+// Returns false if zstd is not supported by this port.
+bool Zstd_Compress(const char* input, size_t input_length, std::string* output);
+
+// If input[0,input_length-1] looks like a valid zstd compressed
+// buffer, store the size of the uncompressed data in *result and
+// return true.  Else return false.
+bool Zstd_GetUncompressedLength(const char* input, size_t length,
+                                size_t* result);
+
+// Attempt to zstd uncompress input[0,input_length-1] into *output.
+// Returns true if successful, false if the input is invalid zstd
+// compressed data.
+//
+// REQUIRES: at least the first "n" bytes of output[] must be writable
+// where "n" is the result of a successful call to
+// Zstd_GetUncompressedLength.
+bool Zstd_Uncompress(const char* input_data, size_t input_length, char* output);
+
 // ------------------ Miscellaneous -------------------
 
 // If heap profiling is not supported, returns false.
diff --git a/port/port_stdcxx.h b/port/port_stdcxx.h
index 2bda48d..ca961e6 100644
--- a/port/port_stdcxx.h
+++ b/port/port_stdcxx.h
@@ -28,6 +28,9 @@
 #if HAVE_SNAPPY
 #include <snappy.h>
 #endif  // HAVE_SNAPPY
+#if HAVE_ZSTD
+#include <zstd.h>
+#endif  // HAVE_ZSTD
 
 #include <cassert>
 #include <condition_variable>  // NOLINT
@@ -126,6 +129,76 @@ inline bool Snappy_Uncompress(const char* input, size_t length, char* output) {
 #endif  // HAVE_SNAPPY
 }
 
+inline bool Zstd_Compress(const char* input, size_t length,
+                          std::string* output) {
+#if HAVE_ZSTD
+  // Get the MaxCompressedLength.
+  size_t outlen = ZSTD_compressBound(length);
+  if (ZSTD_isError(outlen)) {
+    return false;
+  }
+  output->resize(outlen);
+  ZSTD_CCtx* ctx = ZSTD_createCCtx();
+  outlen = ZSTD_compress2(ctx, &(*output)[0], output->size(), input, length);
+  ZSTD_freeCCtx(ctx);
+  if (ZSTD_isError(outlen)) {
+    return false;
+  }
+  output->resize(outlen);
+  return true;
+#else
+  // Silence compiler warnings about unused arguments.
+  (void)input;
+  (void)length;
+  (void)output;
+  return false;
+#endif  // HAVE_ZSTD
+}
+
+inline bool Zstd_GetUncompressedLength(const char* input, size_t length,
+                                       size_t* result) {
+#if HAVE_ZSTD
+  // ZSTD_getFrameContentSize() reports failure through sentinel values
+  // rather than 0: reject an unknown size and a malformed frame header.
+  unsigned long long size = ZSTD_getFrameContentSize(input, length);
+  if (size == 0 || size == ZSTD_CONTENTSIZE_UNKNOWN ||
+      size == ZSTD_CONTENTSIZE_ERROR) {
+    return false;
+  }
+  *result = static_cast<size_t>(size);
+  return true;
+#else
+  // Silence compiler warnings about unused arguments.
+  (void)input;
+  (void)length;
+  (void)result;
+  return false;
+#endif  // HAVE_ZSTD
+}
+
+inline bool Zstd_Uncompress(const char* input, size_t length, char* output) {
+#if HAVE_ZSTD
+  // The frame's declared content size is used as the capacity of output[].
+  size_t outlen;
+  if (!Zstd_GetUncompressedLength(input, length, &outlen)) {
+    return false;
+  }
+  ZSTD_DCtx* ctx = ZSTD_createDCtx();
+  outlen = ZSTD_decompressDCtx(ctx, output, outlen, input, length);
+  ZSTD_freeDCtx(ctx);
+  if (ZSTD_isError(outlen)) {
+    return false;
+  }
+  return true;
+#else
+  // Silence compiler warnings about unused arguments.
+  (void)input;
+  (void)length;
+  (void)output;
+  return false;
+#endif  // HAVE_ZSTD
+}
+
 inline bool GetHeapProfile(void (*func)(void*, const char*, int), void* arg) {
   // Silence compiler warnings about unused arguments.
   (void)func;
diff --git a/table/format.cc b/table/format.cc
index e183977..7647372 100644
--- a/table/format.cc
+++ b/table/format.cc
@@ -5,6 +5,7 @@
 #include "table/format.h"
 
 #include "leveldb/env.h"
+#include "leveldb/options.h"
 #include "port/port.h"
 #include "table/block.h"
 #include "util/coding.h"
@@ -116,13 +117,31 @@ Status ReadBlock(RandomAccessFile* file, const ReadOptions& options,
       size_t ulength = 0;
       if (!port::Snappy_GetUncompressedLength(data, n, &ulength)) {
         delete[] buf;
-        return Status::Corruption("corrupted compressed block contents");
+        return Status::Corruption("corrupted snappy compressed block length");
       }
       char* ubuf = new char[ulength];
       if (!port::Snappy_Uncompress(data, n, ubuf)) {
         delete[] buf;
         delete[] ubuf;
-        return Status::Corruption("corrupted compressed block contents");
+        return Status::Corruption("corrupted snappy compressed block contents");
+      }
+      delete[] buf;
+      result->data = Slice(ubuf, ulength);
+      result->heap_allocated = true;
+      result->cachable = true;
+      break;
+    }
+    case kZstdCompression: {
+      size_t ulength = 0;
+      if (!port::Zstd_GetUncompressedLength(data, n, &ulength)) {
+        delete[] buf;
+        return Status::Corruption("corrupted zstd compressed block length");
+      }
+      char* ubuf = new char[ulength];
+      if (!port::Zstd_Uncompress(data, n, ubuf)) {
+        delete[] buf;
+        delete[] ubuf;
+        return Status::Corruption("corrupted zstd compressed block contents");
       }
       delete[] buf;
       result->data = Slice(ubuf, ulength);
diff --git a/table/table_builder.cc b/table/table_builder.cc
index 29a619d..ba3df9e 100644
--- a/table/table_builder.cc
+++ b/table/table_builder.cc
@@ -168,6 +168,20 @@ void TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle) {
       }
       break;
     }
+
+    case kZstdCompression: {
+      std::string* compressed = &r->compressed_output;
+      if (port::Zstd_Compress(raw.data(), raw.size(), compressed) &&
+          compressed->size() < raw.size() - (raw.size() / 8u)) {
+        block_contents = *compressed;
+      } else {
+        // Zstd not supported, or compressed less than 12.5%, so just
+        // store uncompressed form
+        block_contents = raw;
+        type = kNoCompression;
+      }
+      break;
+    }
   }
   WriteRawBlock(block_contents, type, handle);
   r->compressed_output.clear();
diff --git a/table/table_test.cc b/table/table_test.cc
index a405586..b3baf95 100644
--- a/table/table_test.cc
+++ b/table/table_test.cc
@@ -14,6 +14,7 @@
 #include "leveldb/db.h"
 #include "leveldb/env.h"
 #include "leveldb/iterator.h"
+#include "leveldb/options.h"
 #include "leveldb/table_builder.h"
 #include "table/block.h"
 #include "table/block_builder.h"
@@ -784,15 +785,29 @@ TEST(TableTest, ApproximateOffsetOfPlain) {
   ASSERT_TRUE(Between(c.ApproximateOffsetOf("xyz"), 610000, 612000));
 }
 
-static bool SnappyCompressionSupported() {
+static bool CompressionSupported(CompressionType type) {
   std::string out;
   Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
-  return port::Snappy_Compress(in.data(), in.size(), &out);
+  if (type == kSnappyCompression) {
+    return port::Snappy_Compress(in.data(), in.size(), &out);
+  } else if (type == kZstdCompression) {
+    return port::Zstd_Compress(in.data(), in.size(), &out);
+  }
+  return false;
 }
 
-TEST(TableTest, ApproximateOffsetOfCompressed) {
-  if (!SnappyCompressionSupported())
-    GTEST_SKIP() << "skipping compression tests";
+class CompressionTableTest
+    : public ::testing::TestWithParam<std::tuple<CompressionType>> {};
+
+INSTANTIATE_TEST_SUITE_P(CompressionTests, CompressionTableTest,
+                         ::testing::Values(kSnappyCompression,
+                                           kZstdCompression));
+
+TEST_P(CompressionTableTest, ApproximateOffsetOfCompressed) {
+  CompressionType type = ::testing::get<0>(GetParam());
+  if (!CompressionSupported(type)) {
+    GTEST_SKIP() << "skipping compression test: " << type;
+  }
 
   Random rnd(301);
   TableConstructor c(BytewiseComparator());
@@ -805,7 +820,7 @@ TEST(TableTest, ApproximateOffsetOfCompressed) {
   KVMap kvmap;
   Options options;
   options.block_size = 1024;
-  options.compression = kSnappyCompression;
+  options.compression = type;
   c.Finish(options, &keys, &kvmap);
 
   // Expected upper and lower bounds of space used by compressible strings.