Skip to content

Commit

Permalink
feat(NODE-6540): Add c++ zstd compression API (#30)
Browse files Browse the repository at this point in the history
  • Loading branch information
baileympearson authored Nov 19, 2024
1 parent 8c40b08 commit 6673245
Show file tree
Hide file tree
Showing 14 changed files with 2,120 additions and 93 deletions.
7 changes: 7 additions & 0 deletions .eslintrc.json
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,13 @@
"@typescript-eslint/no-require-imports": "off"
},
"overrides": [
{
"files": ["lib/*.js"],
"parserOptions": {
"ecmaVersion": 2019,
"sourceType": "commonjs"
}
},
{
"files": ["test/**/*ts"],
"rules": {
Expand Down
38 changes: 38 additions & 0 deletions .github/workflows/lint.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
name: Lint

on:
push:
branches: [ "main" ]
pull_request:
branches: [ "main" ]

jobs:
build:
runs-on: ubuntu-latest

name: ${{ matrix.lint-target }}
strategy:
matrix:
lint-target: ["c++", "typescript"]

steps:
- uses: actions/checkout@v4

- name: Use Node.js LTS
uses: actions/setup-node@v4
with:
node-version: 'lts/*'
cache: 'npm'

- name: Install dependencies
shell: bash
run: npm i --ignore-scripts

- if: matrix.lint-target == 'c++'
shell: bash
run: |
npm run check:clang-format
- if: matrix.lint-target == 'typescript'
shell: bash
run: |
npm run check:eslint
8 changes: 6 additions & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,12 @@ jobs:
cache: 'npm'
registry-url: 'https://registry.npmjs.org'

- name: Build with Node.js ${{ matrix.node }} on ${{ matrix.os }}
run: npm install && npm run compile
- name: Install zstd
run: npm run install-zstd
shell: bash

- name: install dependencies and compmile
run: npm install --loglevel verbose
shell: bash

- name: Test ${{ matrix.os }}
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ node_modules
build

npm-debug.log
deps
73 changes: 73 additions & 0 deletions addon/compression.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
#include "compression.h"

std::vector<uint8_t> mongodb_zstd::compress(const std::vector<uint8_t>& data,
size_t compression_level) {
size_t output_buffer_size = ZSTD_compressBound(data.size());
std::vector<uint8_t> output(output_buffer_size);

size_t result_code =
ZSTD_compress(output.data(), output.size(), data.data(), data.size(), compression_level);

if (ZSTD_isError(result_code)) {
throw std::runtime_error(ZSTD_getErrorName(result_code));
}

output.resize(result_code);

return output;
}

std::vector<uint8_t> mongodb_zstd::decompress(const std::vector<uint8_t>& compressed) {
std::vector<uint8_t> decompressed;

using DCTX_Deleter = void (*)(ZSTD_DCtx*);

std::unique_ptr<ZSTD_DCtx, DCTX_Deleter> decompression_context(
ZSTD_createDCtx(), [](ZSTD_DCtx* ctx) { ZSTD_freeDCtx(ctx); });

ZSTD_inBuffer input = {compressed.data(), compressed.size(), 0};
std::vector<uint8_t> output_buffer(ZSTD_DStreamOutSize());
ZSTD_outBuffer output = {output_buffer.data(), output_buffer.size(), 0};

// Source: https://facebook.github.io/zstd/zstd_manual.html#Chapter9
//
// Use ZSTD_decompressStream() repetitively to consume your input.
// The function will update both `pos` fields.
// If `input.pos < input.size`, some input has not been consumed.
// It's up to the caller to present again remaining data.
// The function tries to flush all data decoded immediately, respecting output buffer size.
// If `output.pos < output.size`, decoder has flushed everything it could.
// But if `output.pos == output.size`, there might be some data left within internal buffers.,
// In which case, call ZSTD_decompressStream() again to flush whatever remains in the buffer.
// Note : with no additional input provided, amount of data flushed is necessarily <=
// ZSTD_BLOCKSIZE_MAX.
// @return : 0 when a frame is completely decoded and fully flushed,
// or an error code, which can be tested using ZSTD_isError(),
// or any other value > 0, which means there is still some decoding or flushing to do to
// complete current frame :
// the return value is a suggested next input size (just a hint
// for better latency) that will never request more than the
// remaining frame size.
auto inputRemains = [](const ZSTD_inBuffer& input) { return input.pos < input.size; };
auto isOutputBufferFlushed = [](const ZSTD_outBuffer& output) {
return output.pos < output.size;
};

while (inputRemains(input) || !isOutputBufferFlushed(output)) {
size_t const ret = ZSTD_decompressStream(decompression_context.get(), &output, &input);
if (ZSTD_isError(ret)) {
throw std::runtime_error(ZSTD_getErrorName(ret));
}

size_t decompressed_size = decompressed.size();
decompressed.resize(decompressed_size + output.pos);
std::copy(output_buffer.data(),
output_buffer.data() + output.pos,
decompressed.data() + decompressed_size);

// move the position back go 0, to indicate that we are ready for more data
output.pos = 0;
}

return decompressed;
}
15 changes: 15 additions & 0 deletions addon/compression.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#ifndef MONGODB_ZSTD_COMPRESSION
#define MONGODB_ZSTD_COMPRESSION

#include <exception>
#include <vector>

#include "compression_worker.h"
#include "zstd.h"

namespace mongodb_zstd {
std::vector<uint8_t> compress(const std::vector<uint8_t>& data, size_t compression_level);
std::vector<uint8_t> decompress(const std::vector<uint8_t>& data);
} // namespace mongodb_zstd

#endif
46 changes: 46 additions & 0 deletions addon/compression_worker.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
#ifndef MONGODB_ZSTD_COMPRESSION_WORKER_H
#define MONGODB_ZSTD_COMPRESSION_WORKER_H
#include <napi.h>

#include <optional>
#include <variant>

using namespace Napi;

namespace mongodb_zstd {
/**
* @brief An asynchronous Napi::Worker that can be with any function that produces
* CompressionResults.
* */
class CompressionWorker final : public AsyncWorker {
public:
CompressionWorker(const Function& callback, std::function<std::vector<uint8_t>()> worker)
: AsyncWorker{callback, "compression worker"}, m_worker(worker), m_result{} {}

protected:
void Execute() final {
m_result = m_worker();
}

void OnOK() final {
if (!m_result.has_value()) {
Callback().Call({Error::New(Env(),
"zstd runtime - async worker finished without "
"a compression or decompression result.")
.Value()});
return;
}

const std::vector<uint8_t>& data = m_result.value();
Buffer result = Buffer<uint8_t>::Copy(Env(), data.data(), data.size());

Callback().Call({Env().Undefined(), result});
}

private:
std::function<std::vector<uint8_t>()> m_worker;
std::optional<std::vector<uint8_t>> m_result;
};

} // namespace mongodb_zstd
#endif
58 changes: 49 additions & 9 deletions addon/zstd.cpp
Original file line number Diff line number Diff line change
@@ -1,20 +1,60 @@
#include "zstd.h"

#include <napi.h>

#include <vector>

#include "compression.h"
#include "compression_worker.h"

using namespace Napi;

Napi::String Compress(const Napi::CallbackInfo& info) {
auto string = Napi::String::New(info.Env(), "compress()");
return string;
namespace mongodb_zstd {
void Compress(const CallbackInfo& info) {
// Argument handling happens in JS
if (info.Length() != 3) {
const char* error_message = "Expected three arguments.";
throw TypeError::New(info.Env(), error_message);
}

Uint8Array to_compress = info[0].As<Uint8Array>();
std::vector<uint8_t> data(to_compress.Data(), to_compress.Data() + to_compress.ElementLength());

size_t compression_level = static_cast<size_t>(info[1].ToNumber().Int32Value());
Function callback = info[2].As<Function>();

CompressionWorker* worker =
new CompressionWorker(callback, [data = std::move(data), compression_level] {
return mongodb_zstd::compress(data, compression_level);
});

worker->Queue();
}
Napi::String Decompress(const Napi::CallbackInfo& info) {
auto string = Napi::String::New(info.Env(), "decompress()");
return string;

void Decompress(const CallbackInfo& info) {
// Argument handling happens in JS
if (info.Length() != 2) {
const char* error_message = "Expected two argument.";
throw TypeError::New(info.Env(), error_message);
}

Uint8Array compressed_data = info[0].As<Uint8Array>();
std::vector<uint8_t> data(compressed_data.Data(),
compressed_data.Data() + compressed_data.ElementLength());
Function callback = info[1].As<Function>();

CompressionWorker* worker = new CompressionWorker(
callback, [data = std::move(data)] { return mongodb_zstd::decompress(data); });

worker->Queue();
}

Napi::Object Init(Napi::Env env, Napi::Object exports) {
exports.Set(Napi::String::New(env, "compress"), Napi::Function::New(env, Compress));
exports.Set(Napi::String::New(env, "decompress"), Napi::Function::New(env, Decompress));
Object Init(Env env, Object exports) {
exports.Set(String::New(env, "compress"), Function::New(env, Compress));
exports.Set(String::New(env, "decompress"), Function::New(env, Decompress));
return exports;
}

NODE_API_MODULE(zstd, Init)

} // namespace mongodb_zstd
18 changes: 14 additions & 4 deletions binding.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,35 @@
'type': 'loadable_module',
'defines': ['ZSTD_STATIC_LINKING_ONLY'],
'include_dirs': [
"<!(node -p \"require('node-addon-api').include_dir\")"
"<!(node -p \"require('node-addon-api').include_dir\")",
"<(module_root_dir)/deps/zstd/lib",
],
'variables': {
'ARCH': '<(host_arch)',
'built_with_electron%': 0
},
'sources': [
'addon/zstd.cpp'
'addon/zstd.cpp',
'addon/compression_worker.h',
'addon/compression.h',
'addon/compression.cpp'
],
'xcode_settings': {
'GCC_ENABLE_CPP_EXCEPTIONS': 'YES',
'CLANG_CXX_LIBRARY': 'libc++',
'MACOSX_DEPLOYMENT_TARGET': '10.12',
'MACOSX_DEPLOYMENT_TARGET': '11',
'GCC_SYMBOLS_PRIVATE_EXTERN': 'YES', # -fvisibility=hidden
},
'cflags!': [ '-fno-exceptions' ],
'cflags_cc!': [ '-fno-exceptions' ],
'cflags_cc': ['-std=c++17'],
'msvs_settings': {
'VCCLCompilerTool': { 'ExceptionHandling': 1 },
}
},
'link_settings': {
'libraries': [
'<(module_root_dir)/deps/zstd/build/cmake/lib/libzstd.a',
]
},
}]
}
26 changes: 26 additions & 0 deletions etc/install-zstd.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#!/bin/sh
set -o xtrace

clean_deps() {
rm -rf deps
}

download_zstd() {
rm -rf deps
mkdir -p deps/zstd

curl -L "https://github.com/facebook/zstd/releases/download/v1.5.6/zstd-1.5.6.tar.gz" \
| tar -zxf - -C deps/zstd --strip-components 1
}

build_zstd() {
export MACOSX_DEPLOYMENT_TARGET=11
cd deps/zstd/build/cmake

cmake .
make
}

clean_deps
download_zstd
build_zstd
22 changes: 19 additions & 3 deletions lib/index.js
Original file line number Diff line number Diff line change
@@ -1,16 +1,32 @@
const { compress: _compress, decompress: _decompress } = require('bindings')('zstd');
'use strict';
const zstd = require('bindings')('zstd');
const { promisify } = require('util');
const { isUint8Array } = require('util/types');

const _compress = promisify(zstd.compress);
const _decompress = promisify(zstd.decompress);
// Error objects created via napi don't have JS stacks; wrap them so .stack is present
// https://github.com/nodejs/node/issues/25318#issuecomment-451068073

exports.compress = async function compress(data) {
exports.compress = async function compress(data, compressionLevel) {
if (!isUint8Array(data)) {
throw new TypeError(`parameter 'data' must be a Uint8Array.`);
}

if (compressionLevel != null && typeof compressionLevel !== 'number') {
throw new TypeError(`parameter 'compressionLevel' must be a number.`);
}

try {
return await _compress(data);
return await _compress(data, compressionLevel ?? 3);
} catch (e) {
throw new Error(`zstd: ${e.message}`);
}
};
exports.decompress = async function decompress(data) {
if (!isUint8Array(data)) {
throw new TypeError(`parameter 'data' must be a Uint8Array.`);
}
try {
return await _decompress(data);
} catch (e) {
Expand Down
Loading

0 comments on commit 6673245

Please sign in to comment.