Browse Source

Add support for Zstd-based compression in LevelDB.

This change implements support for Zstd-based compression in LevelDB. Building
on the existing Snappy compression (which has been supported since inception),
this change adds Zstd as an alternative compression algorithm.

We are implementing this to provide an alternative for users who might
have different performance and efficiency requirements. For instance, the
Zstandard website (https://facebook.github.io/zstd/) claims that the Zstd
algorithm can achieve around 30% higher compression ratios than Snappy, with
a relatively small (~10%) slowdown in compression and decompression speed.

Benchmarking results:

$ blaze-bin/third_party/leveldb/db_bench
LevelDB:    version 1.23
Date:       Thu Feb  2 18:50:06 2023
CPU:        56 * Intel(R) Xeon(R) CPU E5-2690 v4 @ 2.60GHz
CPUCache:   35840 KB
Keys:       16 bytes each
Values:     100 bytes each (50 bytes after compression)
Entries:    1000000
RawSize:    110.6 MB (estimated)
FileSize:   62.9 MB (estimated)
------------------------------------------------
fillseq      :       2.613 micros/op;   42.3 MB/s
fillsync     :    3924.432 micros/op;    0.0 MB/s (1000 ops)
fillrandom   :       3.609 micros/op;   30.7 MB/s
overwrite    :       4.508 micros/op;   24.5 MB/s
readrandom   :       6.136 micros/op; (864322 of 1000000 found)
readrandom   :       5.446 micros/op; (864083 of 1000000 found)
readseq      :       0.180 micros/op;  613.3 MB/s
readreverse  :       0.321 micros/op;  344.7 MB/s
compact      :  827043.000 micros/op;
readrandom   :       4.603 micros/op; (864105 of 1000000 found)
readseq      :       0.169 micros/op;  656.3 MB/s
readreverse  :       0.315 micros/op;  350.8 MB/s
fill100K     :     854.009 micros/op;  111.7 MB/s (1000 ops)
crc32c       :       1.227 micros/op; 3184.0 MB/s (4K per op)
snappycomp   :       3.610 micros/op; 1081.9 MB/s (output: 55.2%)
snappyuncomp :       0.691 micros/op; 5656.3 MB/s
zstdcomp     :      15.731 micros/op;  248.3 MB/s (output: 44.1%)
zstduncomp   :       4.218 micros/op;  926.2 MB/s
PiperOrigin-RevId: 509957778
main
leveldb Team 1 year ago
committed by Victor Costan
parent
commit
1d6e8d64ee
10 changed files with 224 additions and 53 deletions
  1. +4
    -0
      CMakeLists.txt
  2. +1
    -1
      README.md
  3. +68
    -41
      benchmarks/db_bench.cc
  4. +2
    -1
      include/leveldb/options.h
  5. +6
    -1
      port/port_config.h.in
  6. +20
    -1
      port/port_example.h
  7. +67
    -0
      port/port_stdcxx.h
  8. +21
    -2
      table/format.cc
  9. +14
    -0
      table/table_builder.cc
  10. +21
    -6
      table/table_test.cc

+ 4
- 0
CMakeLists.txt View File

@ -40,6 +40,7 @@ check_include_file("unistd.h" HAVE_UNISTD_H)
include(CheckLibraryExists) include(CheckLibraryExists)
check_library_exists(crc32c crc32c_value "" HAVE_CRC32C) check_library_exists(crc32c crc32c_value "" HAVE_CRC32C)
check_library_exists(snappy snappy_compress "" HAVE_SNAPPY) check_library_exists(snappy snappy_compress "" HAVE_SNAPPY)
# libzstd exports its C API as ZSTD_* (upper-case prefix); symbol lookup is
# case-sensitive, so probe for ZSTD_compress rather than zstd_compress.
check_library_exists(zstd ZSTD_compress "" HAVE_ZSTD)
check_library_exists(tcmalloc malloc "" HAVE_TCMALLOC) check_library_exists(tcmalloc malloc "" HAVE_TCMALLOC)
include(CheckCXXSymbolExists) include(CheckCXXSymbolExists)
@ -273,6 +274,9 @@ endif(HAVE_CRC32C)
if(HAVE_SNAPPY) if(HAVE_SNAPPY)
target_link_libraries(leveldb snappy) target_link_libraries(leveldb snappy)
endif(HAVE_SNAPPY) endif(HAVE_SNAPPY)
if(HAVE_ZSTD)
target_link_libraries(leveldb zstd)
endif(HAVE_ZSTD)
if(HAVE_TCMALLOC) if(HAVE_TCMALLOC)
target_link_libraries(leveldb tcmalloc) target_link_libraries(leveldb tcmalloc)
endif(HAVE_TCMALLOC) endif(HAVE_TCMALLOC)

+ 1
- 1
README.md View File

@ -18,7 +18,7 @@ Authors: Sanjay Ghemawat (sanjay@google.com) and Jeff Dean (jeff@google.com)
* Multiple changes can be made in one atomic batch. * Multiple changes can be made in one atomic batch.
* Users can create a transient snapshot to get a consistent view of data. * Users can create a transient snapshot to get a consistent view of data.
* Forward and backward iteration is supported over the data. * Forward and backward iteration is supported over the data.
* Data is automatically compressed using the [Snappy compression library](https://google.github.io/snappy/).
* Data is automatically compressed using the [Snappy compression library](https://google.github.io/snappy/), but [Zstd compression](https://facebook.github.io/zstd/) is also supported.
* External activity (file system operations etc.) is relayed through a virtual interface so users can customize the operating system interactions. * External activity (file system operations etc.) is relayed through a virtual interface so users can customize the operating system interactions.
# Documentation # Documentation

+ 68
- 41
benchmarks/db_bench.cc View File

@ -60,7 +60,9 @@ static const char* FLAGS_benchmarks =
"fill100K," "fill100K,"
"crc32c," "crc32c,"
"snappycomp," "snappycomp,"
"snappyuncomp,";
"snappyuncomp,"
"zstdcomp,"
"zstduncomp,";
// Number of key/values to place in database // Number of key/values to place in database
static int FLAGS_num = 1000000; static int FLAGS_num = 1000000;
@ -367,6 +369,57 @@ struct ThreadState {
ThreadState(int index, int seed) : tid(index), rand(seed), shared(nullptr) {} ThreadState(int index, int seed) : tid(index), rand(seed), shared(nullptr) {}
}; };
// Shared driver for the compression benchmarks.
//
// Obtains one input block from RandomGenerator::Generate(Options().block_size)
// and feeds it to `compress_func` over and over until either a call fails or
// 1024 * 1048576 input bytes have been processed. Every iteration is recorded
// as one finished operation on thread->stats.
//
// On success the output/input size ratio is attached as a message and the
// processed byte count is added to the stats; on failure a
// "(<name> failure)" message is attached instead.
void Compress(
    ThreadState* thread, std::string name,
    std::function<bool(const char*, size_t, std::string*)> compress_func) {
  RandomGenerator generator;
  Slice block = generator.Generate(Options().block_size);

  const int64_t kInputLimit = 1024 * 1048576;  // Compress 1G
  int64_t consumed = 0;  // Input bytes handed to compress_func so far.
  int64_t emitted = 0;   // Compressed bytes produced so far.
  std::string scratch;
  bool succeeded = true;

  while (succeeded && consumed < kInputLimit) {
    succeeded = compress_func(block.data(), block.size(), &scratch);
    emitted += scratch.size();
    consumed += block.size();
    thread->stats.FinishedSingleOp();
  }

  if (succeeded) {
    char message[100];
    std::snprintf(message, sizeof(message), "(output: %.1f%%)",
                  (emitted * 100.0) / consumed);
    thread->stats.AddMessage(message);
    thread->stats.AddBytes(consumed);
    return;
  }
  thread->stats.AddMessage("(" + name + " failure)");
}
// Shared driver for the decompression benchmarks.
//
// Compresses one input block (from
// RandomGenerator::Generate(Options().block_size)) once with `compress_func`,
// then repeatedly decompresses that result with `uncompress_func` until either
// a call fails or 1024 * 1048576 bytes of uncompressed data have been
// accounted for. Every iteration is recorded as one finished operation on
// thread->stats.
//
// On success the processed byte count is added to the stats; if the initial
// compression or any decompression fails, a "(<name> failure)" message is
// attached instead.
void Uncompress(
    ThreadState* thread, std::string name,
    std::function<bool(const char*, size_t, std::string*)> compress_func,
    std::function<bool(const char*, size_t, char*)> uncompress_func) {
  RandomGenerator gen;
  Slice input = gen.Generate(Options().block_size);
  std::string compressed;
  bool ok = compress_func(input.data(), input.size(), &compressed);
  int64_t bytes = 0;
  // Destination buffer sized to the original input. Owned by a std::string so
  // it is released on every exit path without a manual delete[].
  std::string uncompressed(input.size(), '\0');
  while (ok && bytes < 1024 * 1048576) {  // Uncompress 1G
    ok = uncompress_func(compressed.data(), compressed.size(),
                         &uncompressed[0]);
    bytes += input.size();
    thread->stats.FinishedSingleOp();
  }

  if (!ok) {
    thread->stats.AddMessage("(" + name + " failure)");
  } else {
    thread->stats.AddBytes(bytes);
  }
}
} // namespace } // namespace
class Benchmark { class Benchmark {
@ -579,6 +632,10 @@ class Benchmark {
method = &Benchmark::SnappyCompress; method = &Benchmark::SnappyCompress;
} else if (name == Slice("snappyuncomp")) { } else if (name == Slice("snappyuncomp")) {
method = &Benchmark::SnappyUncompress; method = &Benchmark::SnappyUncompress;
} else if (name == Slice("zstdcomp")) {
method = &Benchmark::ZstdCompress;
} else if (name == Slice("zstduncomp")) {
method = &Benchmark::ZstdUncompress;
} else if (name == Slice("heapprofile")) { } else if (name == Slice("heapprofile")) {
HeapProfile(); HeapProfile();
} else if (name == Slice("stats")) { } else if (name == Slice("stats")) {
@ -713,50 +770,20 @@ class Benchmark {
} }
void SnappyCompress(ThreadState* thread) { void SnappyCompress(ThreadState* thread) {
RandomGenerator gen;
Slice input = gen.Generate(Options().block_size);
int64_t bytes = 0;
int64_t produced = 0;
bool ok = true;
std::string compressed;
while (ok && bytes < 1024 * 1048576) { // Compress 1G
ok = port::Snappy_Compress(input.data(), input.size(), &compressed);
produced += compressed.size();
bytes += input.size();
thread->stats.FinishedSingleOp();
}
if (!ok) {
thread->stats.AddMessage("(snappy failure)");
} else {
char buf[100];
std::snprintf(buf, sizeof(buf), "(output: %.1f%%)",
(produced * 100.0) / bytes);
thread->stats.AddMessage(buf);
thread->stats.AddBytes(bytes);
}
Compress(thread, "snappy", &port::Snappy_Compress);
} }
void SnappyUncompress(ThreadState* thread) { void SnappyUncompress(ThreadState* thread) {
RandomGenerator gen;
Slice input = gen.Generate(Options().block_size);
std::string compressed;
bool ok = port::Snappy_Compress(input.data(), input.size(), &compressed);
int64_t bytes = 0;
char* uncompressed = new char[input.size()];
while (ok && bytes < 1024 * 1048576) { // Compress 1G
ok = port::Snappy_Uncompress(compressed.data(), compressed.size(),
uncompressed);
bytes += input.size();
thread->stats.FinishedSingleOp();
}
delete[] uncompressed;
Uncompress(thread, "snappy", &port::Snappy_Compress,
&port::Snappy_Uncompress);
}
if (!ok) {
thread->stats.AddMessage("(snappy failure)");
} else {
thread->stats.AddBytes(bytes);
}
void ZstdCompress(ThreadState* thread) {
Compress(thread, "zstd", &port::Zstd_Compress);
}
void ZstdUncompress(ThreadState* thread) {
Uncompress(thread, "zstd", &port::Zstd_Compress, &port::Zstd_Uncompress);
} }
void Open() { void Open() {

+ 2
- 1
include/leveldb/options.h View File

@ -26,7 +26,8 @@ enum CompressionType {
// NOTE: do not change the values of existing entries, as these are // NOTE: do not change the values of existing entries, as these are
// part of the persistent format on disk. // part of the persistent format on disk.
kNoCompression = 0x0, kNoCompression = 0x0,
kSnappyCompression = 0x1
kSnappyCompression = 0x1,
kZstdCompression = 0x2,
}; };
// Options to control the behavior of a database (passed to DB::Open) // Options to control the behavior of a database (passed to DB::Open)

+ 6
- 1
port/port_config.h.in View File

@ -30,4 +30,9 @@
#cmakedefine01 HAVE_SNAPPY #cmakedefine01 HAVE_SNAPPY
#endif // !defined(HAVE_SNAPPY) #endif // !defined(HAVE_SNAPPY)
#endif // STORAGE_LEVELDB_PORT_PORT_CONFIG_H_
// Define to 1 if you have Zstd.
// The guard must test the same macro that is defined below (HAVE_ZSTD), so
// that a value supplied by the build (e.g. on the compiler command line) is
// respected instead of being redefined.
#if !defined(HAVE_ZSTD)
#cmakedefine01 HAVE_ZSTD
#endif  // !defined(HAVE_ZSTD)
#endif // STORAGE_LEVELDB_PORT_PORT_CONFIG_H_

+ 20
- 1
port/port_example.h View File

@ -72,7 +72,7 @@ bool Snappy_GetUncompressedLength(const char* input, size_t length,
size_t* result); size_t* result);
// Attempt to snappy uncompress input[0,input_length-1] into *output. // Attempt to snappy uncompress input[0,input_length-1] into *output.
// Returns true if successful, false if the input is invalid lightweight
// Returns true if successful, false if the input is invalid snappy
// compressed data. // compressed data.
// //
// REQUIRES: at least the first "n" bytes of output[] must be writable // REQUIRES: at least the first "n" bytes of output[] must be writable
@ -81,6 +81,25 @@ bool Snappy_GetUncompressedLength(const char* input, size_t length,
bool Snappy_Uncompress(const char* input_data, size_t input_length, bool Snappy_Uncompress(const char* input_data, size_t input_length,
char* output); char* output);
// Store the zstd compression of "input[0,input_length-1]" in *output.
// Returns false if zstd is not supported by this port.
bool Zstd_Compress(const char* input, size_t input_length, std::string* output);
// If input[0,length-1] looks like a valid zstd compressed
// buffer, store the size of the uncompressed data in *result and
// return true. Else return false.
bool Zstd_GetUncompressedLength(const char* input, size_t length,
size_t* result);
// Attempt to zstd uncompress input_data[0,input_length-1] into output[].
// Returns true if successful, false if the input is invalid zstd
// compressed data.
//
// REQUIRES: at least the first "n" bytes of output[] must be writable
// where "n" is the result of a successful call to
// Zstd_GetUncompressedLength.
bool Zstd_Uncompress(const char* input_data, size_t input_length, char* output);
// ------------------ Miscellaneous ------------------- // ------------------ Miscellaneous -------------------
// If heap profiling is not supported, returns false. // If heap profiling is not supported, returns false.

+ 67
- 0
port/port_stdcxx.h View File

@ -28,6 +28,9 @@
#if HAVE_SNAPPY #if HAVE_SNAPPY
#include <snappy.h> #include <snappy.h>
#endif // HAVE_SNAPPY #endif // HAVE_SNAPPY
#if HAVE_ZSTD
#include <zstd.h>
#endif // HAVE_ZSTD
#include <cassert> #include <cassert>
#include <condition_variable> // NOLINT #include <condition_variable> // NOLINT
@ -126,6 +129,70 @@ inline bool Snappy_Uncompress(const char* input, size_t length, char* output) {
#endif // HAVE_SNAPPY #endif // HAVE_SNAPPY
} }
// Stores the zstd compression of input[0,length-1] in *output.
//
// Returns true on success. Returns false if zstd support was not compiled in
// (HAVE_ZSTD is unset or 0), if a compression context cannot be created, or if
// zstd reports an error. On failure the contents of *output are unspecified.
inline bool Zstd_Compress(const char* input, size_t length,
                          std::string* output) {
#if HAVE_ZSTD
  // Get the MaxCompressedLength.
  size_t outlen = ZSTD_compressBound(length);
  if (ZSTD_isError(outlen)) {
    return false;
  }
  // Context creation can fail (returns NULL); bail out instead of passing a
  // null context to ZSTD_compress2().
  ZSTD_CCtx* ctx = ZSTD_createCCtx();
  if (ctx == nullptr) {
    return false;
  }
  output->resize(outlen);
  outlen = ZSTD_compress2(ctx, &(*output)[0], output->size(), input, length);
  ZSTD_freeCCtx(ctx);
  if (ZSTD_isError(outlen)) {
    return false;
  }
  // Shrink from the worst-case bound to the bytes actually written.
  output->resize(outlen);
  return true;
#else
  // Silence compiler warnings about unused arguments.
  (void)input;
  (void)length;
  (void)output;
  return false;
#endif  // HAVE_ZSTD
}
// If input[0,length-1] starts with a zstd frame that declares its content
// size, stores that size in *result and returns true. Otherwise returns false
// and leaves *result untouched.
//
// Returns false when zstd support was not compiled in, when zstd cannot parse
// the frame header (ZSTD_CONTENTSIZE_ERROR), when the frame does not record
// its content size (ZSTD_CONTENTSIZE_UNKNOWN), or when the declared size does
// not fit in size_t. A declared size of 0 is also rejected, as it always has
// been by this function.
inline bool Zstd_GetUncompressedLength(const char* input, size_t length,
                                       size_t* result) {
#if HAVE_ZSTD
  // ZSTD_getFrameContentSize() returns unsigned long long and reports failure
  // through the two sentinel values below, not through 0.
  const unsigned long long size = ZSTD_getFrameContentSize(input, length);
  if (size == ZSTD_CONTENTSIZE_ERROR || size == ZSTD_CONTENTSIZE_UNKNOWN) {
    return false;
  }
  if (size == 0) return false;
  // Reject sizes that would be truncated when narrowed to size_t (possible
  // where size_t is narrower than unsigned long long).
  const size_t narrowed = static_cast<size_t>(size);
  if (static_cast<unsigned long long>(narrowed) != size) {
    return false;
  }
  *result = narrowed;
  return true;
#else
  // Silence compiler warnings about unused arguments.
  (void)input;
  (void)length;
  (void)result;
  return false;
#endif  // HAVE_ZSTD
}
// Attempts to zstd-uncompress input[0,length-1] into output[].
//
// Returns true on success. Returns false if zstd support was not compiled in,
// if Zstd_GetUncompressedLength() rejects the input, if a decompression
// context cannot be created, or if zstd reports a decompression error.
//
// REQUIRES: at least the first "n" bytes of output[] must be writable, where
// "n" is the value produced by Zstd_GetUncompressedLength() for this input.
inline bool Zstd_Uncompress(const char* input, size_t length, char* output) {
#if HAVE_ZSTD
  size_t outlen;
  if (!Zstd_GetUncompressedLength(input, length, &outlen)) {
    return false;
  }
  // Context creation can fail (returns NULL); bail out instead of passing a
  // null context to ZSTD_decompressDCtx().
  ZSTD_DCtx* ctx = ZSTD_createDCtx();
  if (ctx == nullptr) {
    return false;
  }
  // outlen doubles as the destination capacity, so zstd never writes past the
  // size the caller was told to allocate.
  outlen = ZSTD_decompressDCtx(ctx, output, outlen, input, length);
  ZSTD_freeDCtx(ctx);
  if (ZSTD_isError(outlen)) {
    return false;
  }
  return true;
#else
  // Silence compiler warnings about unused arguments.
  (void)input;
  (void)length;
  (void)output;
  return false;
#endif  // HAVE_ZSTD
}
inline bool GetHeapProfile(void (*func)(void*, const char*, int), void* arg) { inline bool GetHeapProfile(void (*func)(void*, const char*, int), void* arg) {
// Silence compiler warnings about unused arguments. // Silence compiler warnings about unused arguments.
(void)func; (void)func;

+ 21
- 2
table/format.cc View File

@ -5,6 +5,7 @@
#include "table/format.h" #include "table/format.h"
#include "leveldb/env.h" #include "leveldb/env.h"
#include "leveldb/options.h"
#include "port/port.h" #include "port/port.h"
#include "table/block.h" #include "table/block.h"
#include "util/coding.h" #include "util/coding.h"
@ -116,13 +117,31 @@ Status ReadBlock(RandomAccessFile* file, const ReadOptions& options,
size_t ulength = 0; size_t ulength = 0;
if (!port::Snappy_GetUncompressedLength(data, n, &ulength)) { if (!port::Snappy_GetUncompressedLength(data, n, &ulength)) {
delete[] buf; delete[] buf;
return Status::Corruption("corrupted compressed block contents");
return Status::Corruption("corrupted snappy compressed block length");
} }
char* ubuf = new char[ulength]; char* ubuf = new char[ulength];
if (!port::Snappy_Uncompress(data, n, ubuf)) { if (!port::Snappy_Uncompress(data, n, ubuf)) {
delete[] buf; delete[] buf;
delete[] ubuf; delete[] ubuf;
return Status::Corruption("corrupted compressed block contents");
return Status::Corruption("corrupted snappy compressed block contents");
}
delete[] buf;
result->data = Slice(ubuf, ulength);
result->heap_allocated = true;
result->cachable = true;
break;
}
case kZstdCompression: {
size_t ulength = 0;
if (!port::Zstd_GetUncompressedLength(data, n, &ulength)) {
delete[] buf;
return Status::Corruption("corrupted zstd compressed block length");
}
char* ubuf = new char[ulength];
if (!port::Zstd_Uncompress(data, n, ubuf)) {
delete[] buf;
delete[] ubuf;
return Status::Corruption("corrupted zstd compressed block contents");
} }
delete[] buf; delete[] buf;
result->data = Slice(ubuf, ulength); result->data = Slice(ubuf, ulength);

+ 14
- 0
table/table_builder.cc View File

@ -168,6 +168,20 @@ void TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle) {
} }
break; break;
} }
case kZstdCompression: {
std::string* compressed = &r->compressed_output;
if (port::Zstd_Compress(raw.data(), raw.size(), compressed) &&
compressed->size() < raw.size() - (raw.size() / 8u)) {
block_contents = *compressed;
} else {
// Zstd not supported, or compressed less than 12.5%, so just
// store uncompressed form
block_contents = raw;
type = kNoCompression;
}
break;
}
} }
WriteRawBlock(block_contents, type, handle); WriteRawBlock(block_contents, type, handle);
r->compressed_output.clear(); r->compressed_output.clear();

+ 21
- 6
table/table_test.cc View File

@ -14,6 +14,7 @@
#include "leveldb/db.h" #include "leveldb/db.h"
#include "leveldb/env.h" #include "leveldb/env.h"
#include "leveldb/iterator.h" #include "leveldb/iterator.h"
#include "leveldb/options.h"
#include "leveldb/table_builder.h" #include "leveldb/table_builder.h"
#include "table/block.h" #include "table/block.h"
#include "table/block_builder.h" #include "table/block_builder.h"
@ -784,15 +785,29 @@ TEST(TableTest, ApproximateOffsetOfPlain) {
ASSERT_TRUE(Between(c.ApproximateOffsetOf("xyz"), 610000, 612000)); ASSERT_TRUE(Between(c.ApproximateOffsetOf("xyz"), 610000, 612000));
} }
static bool SnappyCompressionSupported() {
static bool CompressionSupported(CompressionType type) {
std::string out; std::string out;
Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
return port::Snappy_Compress(in.data(), in.size(), &out);
if (type == kSnappyCompression) {
return port::Snappy_Compress(in.data(), in.size(), &out);
} else if (type == kZstdCompression) {
return port::Zstd_Compress(in.data(), in.size(), &out);
}
return false;
} }
TEST(TableTest, ApproximateOffsetOfCompressed) {
if (!SnappyCompressionSupported())
GTEST_SKIP() << "skipping compression tests";
class CompressionTableTest
: public ::testing::TestWithParam<std::tuple<CompressionType>> {};
INSTANTIATE_TEST_SUITE_P(CompressionTests, CompressionTableTest,
::testing::Values(kSnappyCompression,
kZstdCompression));
TEST_P(CompressionTableTest, ApproximateOffsetOfCompressed) {
CompressionType type = ::testing::get<0>(GetParam());
if (!CompressionSupported(type)) {
GTEST_SKIP() << "skipping compression test: " << type;
}
Random rnd(301); Random rnd(301);
TableConstructor c(BytewiseComparator()); TableConstructor c(BytewiseComparator());
@ -805,7 +820,7 @@ TEST(TableTest, ApproximateOffsetOfCompressed) {
KVMap kvmap; KVMap kvmap;
Options options; Options options;
options.block_size = 1024; options.block_size = 1024;
options.compression = kSnappyCompression;
options.compression = type;
c.Finish(options, &keys, &kvmap); c.Finish(options, &keys, &kvmap);
// Expected upper and lower bounds of space used by compressible strings. // Expected upper and lower bounds of space used by compressible strings.

Loading…
Cancel
Save