Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(NODE-6540): Add c++ zstd compression API #30

Merged
merged 21 commits into from
Nov 19, 2024
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .eslintrc.json
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,13 @@
"@typescript-eslint/no-require-imports": "off"
},
"overrides": [
{
"files": ["lib/*.js"],
"parserOptions": {
"ecmaVersion": 2019,
"sourceType": "commonjs"
}
},
{
"files": ["test/**/*ts"],
"rules": {
Expand Down
38 changes: 38 additions & 0 deletions .github/workflows/lint.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
name: Lint

on:
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
push:
branches: [ "main" ]
pull_request:
branches: [ "main" ]

jobs:
build:
runs-on: ubuntu-latest

name: ${{ matrix.lint-target }}
strategy:
matrix:
lint-target: ["c++", "typescript"]

steps:
- uses: actions/checkout@v4

- name: Use Node.js LTS
uses: actions/setup-node@v4
with:
node-version: 'lts/*'
cache: 'npm'

- name: Install dependencies
shell: bash
run: npm i --ignore-scripts

- if: matrix.lint-target == 'c++'
shell: bash
run: |
npm run check:clang-format
- if: matrix.lint-target == 'typescript'
shell: bash
run: |
npm run check:eslint
8 changes: 6 additions & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,12 @@ jobs:
cache: 'npm'
registry-url: 'https://registry.npmjs.org'

- name: Build with Node.js ${{ matrix.node }} on ${{ matrix.os }}
run: npm install && npm run compile
- name: Install zstd
run: npm run install-zstd
shell: bash

- name: install dependencies and compmile
run: npm install --loglevel verbose
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
shell: bash

- name: Test ${{ matrix.os }}
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ node_modules
build

npm-debug.log
deps
73 changes: 73 additions & 0 deletions addon/compression.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
#include "compression.h"

std::vector<uint8_t> mongodb_zstd::compress(const std::vector<uint8_t>& data,
size_t compression_level) {
size_t output_buffer_size = ZSTD_compressBound(data.size());
std::vector<uint8_t> output(output_buffer_size);

size_t result_code =
ZSTD_compress(output.data(), output.size(), data.data(), data.size(), compression_level);

if (ZSTD_isError(result_code)) {
throw std::runtime_error(ZSTD_getErrorName(result_code));
}

output.resize(result_code);

return output;
}

std::vector<uint8_t> mongodb_zstd::decompress(const std::vector<uint8_t>& compressed) {
std::vector<uint8_t> decompressed;

using DCTX_Deleter = void (*)(ZSTD_DCtx*);

std::unique_ptr<ZSTD_DCtx, DCTX_Deleter> decompression_context(
ZSTD_createDCtx(), [](ZSTD_DCtx* ctx) { ZSTD_freeDCtx(ctx); });

ZSTD_inBuffer input = {compressed.data(), compressed.size(), 0};
std::vector<uint8_t> output_buffer(ZSTD_DStreamOutSize());
ZSTD_outBuffer output = {output_buffer.data(), output_buffer.size(), 0};

// Source: https://facebook.github.io/zstd/zstd_manual.html#Chapter9
//
// Use ZSTD_decompressStream() repetitively to consume your input.
// The function will update both `pos` fields.
// If `input.pos < input.size`, some input has not been consumed.
// It's up to the caller to present again remaining data.
// The function tries to flush all data decoded immediately, respecting output buffer size.
// If `output.pos < output.size`, decoder has flushed everything it could.
// But if `output.pos == output.size`, there might be some data left within internal buffers.,
// In which case, call ZSTD_decompressStream() again to flush whatever remains in the buffer.
// Note : with no additional input provided, amount of data flushed is necessarily <=
// ZSTD_BLOCKSIZE_MAX.
// @return : 0 when a frame is completely decoded and fully flushed,
// or an error code, which can be tested using ZSTD_isError(),
// or any other value > 0, which means there is still some decoding or flushing to do to
// complete current frame :
// the return value is a suggested next input size (just a hint
// for better latency) that will never request more than the
// remaining frame size.
auto inputRemains = [](const ZSTD_inBuffer& input) { return input.pos < input.size; };
auto isOutputBufferFlushed = [](const ZSTD_outBuffer& output) {
return output.pos < output.size;
};

while (inputRemains(input) || !isOutputBufferFlushed(output)) {
size_t const ret = ZSTD_decompressStream(decompression_context.get(), &output, &input);
if (ZSTD_isError(ret)) {
throw std::runtime_error(ZSTD_getErrorName(ret));
}

size_t decompressed_size = decompressed.size();
decompressed.resize(decompressed_size + output.pos);
std::copy(output_buffer.data(),
output_buffer.data() + output.pos,
decompressed.data() + decompressed_size);

// move the position back go 0, to indicate that we are ready for more data
output.pos = 0;
}

return decompressed;
}
15 changes: 15 additions & 0 deletions addon/compression.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#ifndef MONGODB_ZSTD_COMPRESSION
#define MONGODB_ZSTD_COMPRESSION

#include <exception>
#include <vector>

#include "compression_worker.h"
#include "zstd.h"

namespace mongodb_zstd {
std::vector<uint8_t> compress(const std::vector<uint8_t>& data, size_t compression_level);
std::vector<uint8_t> decompress(const std::vector<uint8_t>& data);
} // namespace mongodb_zstd

#endif
46 changes: 46 additions & 0 deletions addon/compression_worker.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
#ifndef MONGODB_ZSTD_COMPRESSION_WORKER_H
#define MONGODB_ZSTD_COMPRESSION_WORKER_H
#include <napi.h>

#include <optional>
#include <variant>

using namespace Napi;
addaleax marked this conversation as resolved.
Show resolved Hide resolved

namespace mongodb_zstd {
/**
* @brief An asynchronous Napi::Worker that can be with any function that produces
* CompressionResults.
* */
class CompressionWorker final : public AsyncWorker {
public:
CompressionWorker(const Function& callback, std::function<std::vector<uint8_t>()> worker)
: AsyncWorker{callback, "compression worker"}, m_worker(worker), m_result{} {}

protected:
void Execute() final {
W-A-James marked this conversation as resolved.
Show resolved Hide resolved
m_result = m_worker();
}

void OnOK() final {
if (!m_result.has_value()) {
Callback().Call({Error::New(Env(),
"zstd runtime - async worker finished without "
"a compression or decompression result.")
.Value()});
return;
}
addaleax marked this conversation as resolved.
Show resolved Hide resolved

const std::vector<uint8_t>& data = m_result.value();
Buffer result = Buffer<uint8_t>::Copy(Env(), data.data(), data.size());

Callback().Call({Env().Undefined(), result});
}

private:
std::function<std::vector<uint8_t>()> m_worker;
std::optional<std::vector<uint8_t>> m_result;
};

} // namespace mongodb_zstd
#endif
58 changes: 49 additions & 9 deletions addon/zstd.cpp
Original file line number Diff line number Diff line change
@@ -1,20 +1,60 @@
#include "zstd.h"

#include <napi.h>

#include <vector>

#include "compression.h"
#include "compression_worker.h"

using namespace Napi;

Napi::String Compress(const Napi::CallbackInfo& info) {
auto string = Napi::String::New(info.Env(), "compress()");
return string;
namespace mongodb_zstd {
void Compress(const CallbackInfo& info) {
// Argument handling happens in JS
if (info.Length() != 3) {
const char* error_message = "Expected three arguments.";
throw TypeError::New(info.Env(), error_message);
}

Uint8Array to_compress = info[0].As<Uint8Array>();
std::vector<uint8_t> data(to_compress.Data(), to_compress.Data() + to_compress.ElementLength());

size_t compression_level = static_cast<size_t>(info[1].ToNumber().Int32Value());
Function callback = info[2].As<Function>();

CompressionWorker* worker =
new CompressionWorker(callback, [data = std::move(data), compression_level] {
return mongodb_zstd::compress(data, compression_level);
});

worker->Queue();
}
Napi::String Decompress(const Napi::CallbackInfo& info) {
auto string = Napi::String::New(info.Env(), "decompress()");
return string;

void Decompress(const CallbackInfo& info) {
// Argument handling happens in JS
if (info.Length() != 2) {
const char* error_message = "Expected two argument.";
throw TypeError::New(info.Env(), error_message);
}

Uint8Array compressed_data = info[0].As<Uint8Array>();
std::vector<uint8_t> data(compressed_data.Data(),
compressed_data.Data() + compressed_data.ElementLength());
Function callback = info[1].As<Function>();

CompressionWorker* worker = new CompressionWorker(
callback, [data = std::move(data)] { return mongodb_zstd::decompress(data); });

worker->Queue();
}

Napi::Object Init(Napi::Env env, Napi::Object exports) {
exports.Set(Napi::String::New(env, "compress"), Napi::Function::New(env, Compress));
exports.Set(Napi::String::New(env, "decompress"), Napi::Function::New(env, Decompress));
Object Init(Env env, Object exports) {
exports.Set(String::New(env, "compress"), Function::New(env, Compress));
exports.Set(String::New(env, "decompress"), Function::New(env, Decompress));
return exports;
}

NODE_API_MODULE(zstd, Init)

} // namespace mongodb_zstd
18 changes: 14 additions & 4 deletions binding.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,35 @@
'type': 'loadable_module',
'defines': ['ZSTD_STATIC_LINKING_ONLY'],
'include_dirs': [
"<!(node -p \"require('node-addon-api').include_dir\")"
"<!(node -p \"require('node-addon-api').include_dir\")",
"<(module_root_dir)/deps/zstd/lib",
],
'variables': {
'ARCH': '<(host_arch)',
'built_with_electron%': 0
},
'sources': [
'addon/zstd.cpp'
'addon/zstd.cpp',
'addon/compression_worker.h',
'addon/compression.h',
'addon/compression.cpp'
],
'xcode_settings': {
'GCC_ENABLE_CPP_EXCEPTIONS': 'YES',
'CLANG_CXX_LIBRARY': 'libc++',
'MACOSX_DEPLOYMENT_TARGET': '10.12',
'MACOSX_DEPLOYMENT_TARGET': '11',
'GCC_SYMBOLS_PRIVATE_EXTERN': 'YES', # -fvisibility=hidden
},
'cflags!': [ '-fno-exceptions' ],
'cflags_cc!': [ '-fno-exceptions' ],
'cflags_cc': ['-std=c++17'],
'msvs_settings': {
'VCCLCompilerTool': { 'ExceptionHandling': 1 },
}
},
'link_settings': {
'libraries': [
'<(module_root_dir)/deps/zstd/build/cmake/lib/libzstd.a',
]
},
}]
}
26 changes: 26 additions & 0 deletions etc/install-zstd.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#!/bin/sh
set -o xtrace

clean_deps() {
rm -rf deps
}

download_zstd() {
rm -rf deps
mkdir -p deps/zstd

curl -L "https://github.com/facebook/zstd/releases/download/v1.5.6/zstd-1.5.6.tar.gz" \
| tar -zxf - -C deps/zstd --strip-components 1
}

build_zstd() {
export MACOSX_DEPLOYMENT_TARGET=10.12
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
cd deps/zstd/build/cmake

cmake .
make
}

clean_deps
download_zstd
build_zstd
21 changes: 18 additions & 3 deletions lib/index.js
Original file line number Diff line number Diff line change
@@ -1,16 +1,31 @@
const { compress: _compress, decompress: _decompress } = require('bindings')('zstd');
'use strict';
const zstd = require('bindings')('zstd');
addaleax marked this conversation as resolved.
Show resolved Hide resolved
const { promisify } = require('util');

const _compress = promisify(zstd.compress);
const _decompress = promisify(zstd.decompress);
// Error objects created via napi don't have JS stacks; wrap them so .stack is present
// https://github.com/nodejs/node/issues/25318#issuecomment-451068073

exports.compress = async function compress(data) {
exports.compress = async function compress(data, compressionLevel) {
if (!Buffer.isBuffer(data)) {
throw new TypeError(`parameter 'data' must be a Buffer.`);
}

if (compressionLevel != null && typeof compressionLevel !== 'number') {
throw new TypeError(`parameter 'compressionLevel' must be a number.`);
}

try {
return await _compress(data);
return await _compress(data, compressionLevel ?? 3);
} catch (e) {
throw new Error(`zstd: ${e.message}`);
}
};
exports.decompress = async function decompress(data) {
if (!Buffer.isBuffer(data)) {
throw new TypeError(`parameter 'data' must be a Buffer.`);
}
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
try {
return await _decompress(data);
} catch (e) {
Expand Down
Loading