From 9e447bfaa2eaf29925bfb279b83e1ad202d1b514 Mon Sep 17 00:00:00 2001 From: GiantCroc Date: Sun, 17 Mar 2024 21:53:10 +0800 Subject: [PATCH] contrib: add qatzstd compressor (#32166) Signed-off-by: giantcroc --- api/BUILD | 1 + .../qatzstd/compressor/v3alpha/BUILD | 9 ++ .../qatzstd/compressor/v3alpha/qatzstd.proto | 69 +++++++++++ api/versioning/BUILD | 1 + bazel/foreign_cc/qatzstd.patch | 13 +++ bazel/repositories.bzl | 9 ++ bazel/repository_locations.bzl | 17 ++- changelogs/current.yaml | 3 + contrib/all_contrib_extensions.bzl | 2 + contrib/contrib_build_config.bzl | 1 + contrib/extensions_metadata.yaml | 5 + contrib/qat/BUILD | 6 +- .../qatzstd/compressor/source/BUILD | 69 +++++++++++ .../qatzstd/compressor/source/config.cc | 81 +++++++++++++ .../qatzstd/compressor/source/config.h | 83 +++++++++++++ .../source/qatzstd_compressor_impl.cc | 85 ++++++++++++++ .../source/qatzstd_compressor_impl.h | 51 ++++++++ .../compression/qatzstd/compressor/test/BUILD | 20 ++++ .../test/qatzstd_compressor_impl_test.cc | 110 ++++++++++++++++++ docs/BUILD | 1 + docs/root/api-v3/config/contrib/qat/qat.rst | 1 + .../other_features/_include/qatzstd.yaml | 63 ++++++++++ .../other_features/other_features.rst | 1 + .../configuration/other_features/qatzstd.rst | 34 ++++++ source/common/common/logger.h | 1 + source/common/compression/zstd/common/BUILD | 19 +++ .../compression/zstd/common/base.cc | 4 +- .../compression/zstd/common/base.h | 2 - .../common/compression/zstd/compressor/BUILD | 20 ++++ .../compressor/zstd_compressor_impl_base.cc | 59 ++++++++++ .../compressor/zstd_compressor_impl_base.h | 43 +++++++ .../extensions/compression/zstd/common/BUILD | 10 -- .../compression/zstd/compressor/BUILD | 3 +- .../zstd/compressor/zstd_compressor_impl.cc | 47 ++------ .../zstd/compressor/zstd_compressor_impl.h | 20 ++-- .../compression/zstd/decompressor/BUILD | 2 +- .../decompressor/zstd_decompressor_impl.cc | 2 +- .../decompressor/zstd_decompressor_impl.h | 4 +- test/test_common/utility.cc | 6 +- test/test_common/utility.h | 3 +- tools/spelling/spelling_dictionary.txt | 1 + 41 files changed, 909 insertions(+), 72 deletions(-) create mode 100644 api/contrib/envoy/extensions/compression/qatzstd/compressor/v3alpha/BUILD create mode 100644 api/contrib/envoy/extensions/compression/qatzstd/compressor/v3alpha/qatzstd.proto create mode 100644 bazel/foreign_cc/qatzstd.patch create mode 100644 contrib/qat/compression/qatzstd/compressor/source/BUILD create mode 100644 contrib/qat/compression/qatzstd/compressor/source/config.cc create mode 100644 contrib/qat/compression/qatzstd/compressor/source/config.h create mode 100644 contrib/qat/compression/qatzstd/compressor/source/qatzstd_compressor_impl.cc create mode 100644 contrib/qat/compression/qatzstd/compressor/source/qatzstd_compressor_impl.h create mode 100644 contrib/qat/compression/qatzstd/compressor/test/BUILD create mode 100644 contrib/qat/compression/qatzstd/compressor/test/qatzstd_compressor_impl_test.cc create mode 100644 docs/root/configuration/other_features/_include/qatzstd.yaml create mode 100644 docs/root/configuration/other_features/qatzstd.rst create mode 100644 source/common/compression/zstd/common/BUILD rename source/{extensions => common}/compression/zstd/common/base.cc (86%) rename source/{extensions => common}/compression/zstd/common/base.h (92%) create mode 100644 source/common/compression/zstd/compressor/BUILD create mode 100644 source/common/compression/zstd/compressor/zstd_compressor_impl_base.cc create mode 100644 source/common/compression/zstd/compressor/zstd_compressor_impl_base.h diff --git a/api/BUILD b/api/BUILD index e0efeebce4f7..829715418693 100644 --- a/api/BUILD +++ b/api/BUILD @@ -73,6 +73,7 @@ proto_library( visibility = ["//visibility:public"], deps = [ "//contrib/envoy/extensions/compression/qatzip/compressor/v3alpha:pkg", + "//contrib/envoy/extensions/compression/qatzstd/compressor/v3alpha:pkg", "//contrib/envoy/extensions/filters/http/checksum/v3alpha:pkg", "//contrib/envoy/extensions/filters/http/dynamo/v3:pkg", "//contrib/envoy/extensions/filters/http/golang/v3alpha:pkg", diff --git a/api/contrib/envoy/extensions/compression/qatzstd/compressor/v3alpha/BUILD b/api/contrib/envoy/extensions/compression/qatzstd/compressor/v3alpha/BUILD new file mode 100644 index 000000000000..29ebf0741406 --- /dev/null +++ b/api/contrib/envoy/extensions/compression/qatzstd/compressor/v3alpha/BUILD @@ -0,0 +1,9 @@ +# DO NOT EDIT. This file is generated by tools/proto_format/proto_sync.py. + +load("@envoy_api//bazel:api_build_system.bzl", "api_proto_package") + +licenses(["notice"]) # Apache 2 + +api_proto_package( + deps = ["@com_github_cncf_xds//udpa/annotations:pkg"], +) diff --git a/api/contrib/envoy/extensions/compression/qatzstd/compressor/v3alpha/qatzstd.proto b/api/contrib/envoy/extensions/compression/qatzstd/compressor/v3alpha/qatzstd.proto new file mode 100644 index 000000000000..182722d01e4f --- /dev/null +++ b/api/contrib/envoy/extensions/compression/qatzstd/compressor/v3alpha/qatzstd.proto @@ -0,0 +1,69 @@ +syntax = "proto3"; + +package envoy.extensions.compression.qatzstd.compressor.v3alpha; + +import "google/protobuf/wrappers.proto"; + +import "udpa/annotations/status.proto"; +import "validate/validate.proto"; + +option java_package = "io.envoyproxy.envoy.extensions.compression.qatzstd.compressor.v3alpha"; +option java_outer_classname = "QatzstdProto"; +option java_multiple_files = true; +option go_package = "github.com/envoyproxy/go-control-plane/contrib/envoy/extensions/compression/qatzstd/compressor/v3alpha"; +option (udpa.annotations.file_status).package_version_status = ACTIVE; + +// [#protodoc-title: Qatzstd Compressor] +// Qatzstd :ref:`configuration overview `. +// [#extension: envoy.compression.qatzstd.compressor] + +// [#next-free-field: 8] +message Qatzstd { + // Reference to http://facebook.github.io/zstd/zstd_manual.html + enum Strategy { + DEFAULT = 0; + FAST = 1; + DFAST = 2; + GREEDY = 3; + LAZY = 4; + LAZY2 = 5; + BTLAZY2 = 6; + BTOPT = 7; + BTULTRA = 8; + BTULTRA2 = 9; + } + + // Set compression parameters according to pre-defined compression level table. + // Note that exact compression parameters are dynamically determined, + // depending on both compression level and source content size (when known). + // Value 0 means default, and default level is 3. + // + // Setting a level does not automatically set all other compression parameters + // to default. Setting this will however eventually dynamically impact the compression + // parameters which have not been manually set. The manually set + // ones will 'stick'. + google.protobuf.UInt32Value compression_level = 1 [(validate.rules).uint32 = {lte: 22 gte: 1}]; + + // A 32-bits checksum of content is written at end of frame. If not set, defaults to false. + bool enable_checksum = 2; + + // The higher the value of selected strategy, the more complex it is, + // resulting in stronger and slower compression. + // + // Special: value 0 means "use default strategy". + Strategy strategy = 3 [(validate.rules).enum = {defined_only: true}]; + + // Value for compressor's next output buffer. If not set, defaults to 4096. + google.protobuf.UInt32Value chunk_size = 5 [(validate.rules).uint32 = {lte: 65536 gte: 4096}]; + + // Enable QAT to accelerate Zstd compression or not. If not set, defaults to false. + // + // This is useful in the case that users want to enable QAT for a period of time and disable QAT for another period of time, + // they don't have to change the config too much or prepare for another config that has software zstd compressor and just changing the value of this filed. + bool enable_qat_zstd = 6; + + // Fallback to software for Qatzstd when input size is less than this value. + // Valid only ``enable_qat_zstd`` is ``true``. 0 means no fallback at all. If not set, defaults to 4000. + google.protobuf.UInt32Value qat_zstd_fallback_threshold = 7 + [(validate.rules).uint32 = {lte: 65536 gte: 0}]; +} diff --git a/api/versioning/BUILD b/api/versioning/BUILD index eb4569353709..b74e3df2a867 100644 --- a/api/versioning/BUILD +++ b/api/versioning/BUILD @@ -10,6 +10,7 @@ proto_library( visibility = ["//visibility:public"], deps = [ "//contrib/envoy/extensions/compression/qatzip/compressor/v3alpha:pkg", + "//contrib/envoy/extensions/compression/qatzstd/compressor/v3alpha:pkg", "//contrib/envoy/extensions/config/v3alpha:pkg", "//contrib/envoy/extensions/filters/http/checksum/v3alpha:pkg", "//contrib/envoy/extensions/filters/http/dynamo/v3:pkg", diff --git a/bazel/foreign_cc/qatzstd.patch b/bazel/foreign_cc/qatzstd.patch new file mode 100644 index 000000000000..84bd231db5e9 --- /dev/null +++ b/bazel/foreign_cc/qatzstd.patch @@ -0,0 +1,13 @@ +diff --git a/src/Makefile b/src/Makefile +index 1abf10d..c5fa3a6 100644 +--- a/src/Makefile ++++ b/src/Makefile +@@ -54,7 +54,7 @@ ifneq ($(ICP_ROOT), ) + endif + else + QATFLAGS = -DINTREE +- LDFLAGS = -lqat ++ LDFLAGS += -lqat + ifneq ($(ENABLE_USDM_DRV), 0) + QATFLAGS += -DENABLE_USDM_DRV + LDFLAGS += -lusdm diff --git a/bazel/repositories.bzl b/bazel/repositories.bzl index bec5940badf3..d231f276c7ab 100644 --- a/bazel/repositories.bzl +++ b/bazel/repositories.bzl @@ -313,6 +313,7 @@ def envoy_dependencies(skip_targets = []): _com_github_intel_ipp_crypto_crypto_mb_fips() _com_github_intel_qatlib() _com_github_intel_qatzip() + _com_github_qat_zstd() _com_github_lz4_lz4() _com_github_jbeder_yaml_cpp() _com_github_libevent_libevent() @@ -585,6 +586,14 @@ def _com_github_intel_qatzip(): build_file_content = BUILD_ALL_CONTENT, ) +def _com_github_qat_zstd(): + external_http_archive( + name = "com_github_qat_zstd", + build_file_content = BUILD_ALL_CONTENT, + patch_args = ["-p1"], + patches = ["@envoy//bazel/foreign_cc:qatzstd.patch"], + ) + def _com_github_lz4_lz4(): external_http_archive( name = "com_github_lz4_lz4", diff --git a/bazel/repository_locations.bzl b/bazel/repository_locations.bzl index b81d31b33a9d..6cddbf98ad60 100644 --- a/bazel/repository_locations.bzl +++ b/bazel/repository_locations.bzl @@ -446,7 +446,7 @@ REPOSITORY_LOCATIONS_SPEC = dict( urls = ["https://github.com/intel/qatlib/archive/refs/tags/{version}.tar.gz"], use_category = ["dataplane_ext"], release_date = "2024-02-20", - extensions = ["envoy.tls.key_providers.qat", "envoy.compression.qatzip.compressor"], + extensions = ["envoy.tls.key_providers.qat", "envoy.compression.qatzip.compressor", "envoy.compression.qatzstd.compressor"], cpe = "N/A", license = "BSD-3-Clause", license_url = "https://github.com/intel/qatlib/blob/{version}/LICENSE", @@ -466,6 +466,21 @@ REPOSITORY_LOCATIONS_SPEC = dict( license = "BSD-3-Clause", license_url = "https://github.com/intel/QATzip/blob/v{version}/LICENSE", ), + com_github_qat_zstd = dict( + project_name = "QAT-ZSTD-Plugin", + project_desc = "Intel® QuickAssist Technology ZSTD Plugin (QAT ZSTD Plugin)", + project_url = "https://github.com/intel/QAT-ZSTD-Plugin/", + version = "0.1.0", + sha256 = "74c5bfbb3b0c6f1334e128ee0b43958d1d34751a4762e54e8f970c443e445f33", + strip_prefix = "QAT-ZSTD-Plugin-{version}", + urls = ["https://github.com/intel/QAT-ZSTD-Plugin/archive/refs/tags/v{version}.tar.gz"], + use_category = ["dataplane_ext"], + extensions = [ + "envoy.compression.qatzstd.compressor", + ], + release_date = "2023-09-08", + cpe = "N/A", + ), com_github_luajit_luajit = dict( project_name = "LuaJIT", project_desc = "Just-In-Time compiler for Lua", diff --git a/changelogs/current.yaml b/changelogs/current.yaml index 4bb71b22796c..092048320fae 100644 --- a/changelogs/current.yaml +++ b/changelogs/current.yaml @@ -246,6 +246,9 @@ new_features: change: | added support for :ref:`%UPSTREAM_CONNECTION_ID% ` for the upstream connection identifier. +- area: compression + change: | + Added Qatzstd :ref:`compressor `. - area: aws_lambda change: | Added :ref:`host_rewrite ` config to be used diff --git a/contrib/all_contrib_extensions.bzl b/contrib/all_contrib_extensions.bzl index 6b7994a2623b..7caf371f6032 100644 --- a/contrib/all_contrib_extensions.bzl +++ b/contrib/all_contrib_extensions.bzl @@ -18,6 +18,7 @@ ARM64_SKIP_CONTRIB_TARGETS = [ "envoy.tls.key_providers.qat", "envoy.network.connection_balance.dlb", "envoy.compression.qatzip.compressor", + "envoy.compression.qatzstd.compressor", ] PPC_SKIP_CONTRIB_TARGETS = [ "envoy.tls.key_providers.cryptomb", @@ -26,6 +27,7 @@ PPC_SKIP_CONTRIB_TARGETS = [ "envoy.network.connection_balance.dlb", "envoy.regex_engines.hyperscan", "envoy.compression.qatzip.compressor", + "envoy.compression.qatzstd.compressor", ] FIPS_LINUX_X86_SKIP_CONTRIB_TARGETS = [ diff --git a/contrib/contrib_build_config.bzl b/contrib/contrib_build_config.bzl index b8e7811a168c..49b1f7afde79 100644 --- a/contrib/contrib_build_config.bzl +++ b/contrib/contrib_build_config.bzl @@ -5,6 +5,7 @@ CONTRIB_EXTENSIONS = { # "envoy.compression.qatzip.compressor": "//contrib/qat/compression/qatzip/compressor/source:config", + "envoy.compression.qatzstd.compressor": "//contrib/qat/compression/qatzstd/compressor/source:config", # # HTTP filters diff --git a/contrib/extensions_metadata.yaml b/contrib/extensions_metadata.yaml index 8168b063da58..d8aee9d710fb 100644 --- a/contrib/extensions_metadata.yaml +++ b/contrib/extensions_metadata.yaml @@ -18,6 +18,11 @@ envoy.compression.qatzip.compressor: - envoy.compression.compressor security_posture: robust_to_untrusted_downstream_and_upstream status: alpha +envoy.compression.qatzstd.compressor: + categories: + - envoy.compression.compressor + security_posture: robust_to_untrusted_downstream_and_upstream + status: alpha envoy.filters.http.squash: categories: - envoy.filters.http diff --git a/contrib/qat/BUILD b/contrib/qat/BUILD index d435976e4953..08ddf04136c3 100644 --- a/contrib/qat/BUILD +++ b/contrib/qat/BUILD @@ -20,12 +20,16 @@ configure_make( configure_options = [ "--disable-fast-crc-in-assembler", "--disable-systemd", - "--disable-shared", "--with-pic", + "--enable-shared", "--enable-static", "--enable-samples=no", ], lib_source = "@com_github_intel_qatlib//:all", + out_shared_libs = [ + "libqat.so", + "libusdm.so", + ], out_static_libs = [ "libqat.a", "libusdm.a", diff --git a/contrib/qat/compression/qatzstd/compressor/source/BUILD b/contrib/qat/compression/qatzstd/compressor/source/BUILD new file mode 100644 index 000000000000..960fa2b9b55e --- /dev/null +++ b/contrib/qat/compression/qatzstd/compressor/source/BUILD @@ -0,0 +1,69 @@ +load("@rules_foreign_cc//foreign_cc:defs.bzl", "make") +load( + "//bazel:envoy_build_system.bzl", + "envoy_cc_contrib_extension", + "envoy_cc_library", + "envoy_contrib_package", +) + +licenses(["notice"]) # Apache 2 + +envoy_contrib_package() + +make( + name = "qat-zstd", + build_data = ["@com_github_qat_zstd//:all"], + env = select({ + "//bazel:clang_build": { + "CFLAGS": "-Wno-error=unused-parameter -Wno-error=unused-command-line-argument -I$$EXT_BUILD_DEPS/qatlib/include -I$$EXT_BUILD_DEPS/zstd/include", + }, + "//conditions:default": { + "CFLAGS": "-I$$EXT_BUILD_DEPS/qatlib/include -I$$EXT_BUILD_DEPS/zstd/include", + }, + }), + includes = [], + lib_source = "@com_github_qat_zstd//:all", + out_static_libs = ["libqatseqprod.a"], + tags = ["skip_on_windows"], + target_compatible_with = [ + "@platforms//os:linux", + "@platforms//cpu:x86_64", + ], + targets = [ + "ENABLE_USDM_DRV=1", + "install", + ], + deps = [ + "//contrib/qat:qatlib", + "//external:zstd", + ], +) + +envoy_cc_library( + name = "compressor_lib", + srcs = ["qatzstd_compressor_impl.cc"], + hdrs = ["qatzstd_compressor_impl.h"], + deps = [ + ":qat-zstd", + "//envoy/compression/compressor:compressor_interface", + "//envoy/server:factory_context_interface", + "//source/common/buffer:buffer_lib", + "//source/common/compression/zstd/common:zstd_base_lib", + "//source/common/compression/zstd/compressor:compressor_base", + ], +) + +envoy_cc_contrib_extension( + name = "config", + srcs = ["config.cc"], + hdrs = ["config.h"], + deps = [ + ":compressor_lib", + ":qat-zstd", + "//envoy/event:dispatcher_interface", + "//envoy/thread_local:thread_local_interface", + "//source/common/http:headers_lib", + "//source/extensions/compression/common/compressor:compressor_factory_base_lib", + "@envoy_api//contrib/envoy/extensions/compression/qatzstd/compressor/v3alpha:pkg_cc_proto", + ], +) diff --git a/contrib/qat/compression/qatzstd/compressor/source/config.cc b/contrib/qat/compression/qatzstd/compressor/source/config.cc new file mode 100644 index 000000000000..67a24d530d1b --- /dev/null +++ b/contrib/qat/compression/qatzstd/compressor/source/config.cc @@ -0,0 +1,81 @@ +#include "contrib/qat/compression/qatzstd/compressor/source/config.h" + +namespace Envoy { +namespace Extensions { +namespace Compression { +namespace Qatzstd { +namespace Compressor { + +QatzstdCompressorFactory::QatzstdCompressorFactory( + const envoy::extensions::compression::qatzstd::compressor::v3alpha::Qatzstd& qatzstd, + ThreadLocal::SlotAllocator& tls) + : compression_level_( + PROTOBUF_GET_WRAPPED_OR_DEFAULT(qatzstd, compression_level, ZSTD_CLEVEL_DEFAULT)), + enable_checksum_(qatzstd.enable_checksum()), strategy_(qatzstd.strategy()), + chunk_size_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(qatzstd, chunk_size, ZSTD_CStreamOutSize())), + enable_qat_zstd_(qatzstd.enable_qat_zstd()), + qat_zstd_fallback_threshold_(PROTOBUF_GET_WRAPPED_OR_DEFAULT( + qatzstd, qat_zstd_fallback_threshold, DefaultQatZstdFallbackThreshold)), + tls_slot_(nullptr) { + if (enable_qat_zstd_) { + tls_slot_ = ThreadLocal::TypedSlot::makeUnique(tls); + tls_slot_->set([](Event::Dispatcher&) { return std::make_shared(); }); + } +} + +QatzstdCompressorFactory::QatzstdThreadLocal::QatzstdThreadLocal() + : initialized_(false), sequenceProducerState_(nullptr) {} + +QatzstdCompressorFactory::QatzstdThreadLocal::~QatzstdThreadLocal() { + if (initialized_) { + /* Free sequence producer state */ + QZSTD_freeSeqProdState(sequenceProducerState_); + /* Stop QAT device, please call this function when + you won't use QAT anymore or before the process exits */ + QZSTD_stopQatDevice(); + } +} + +void* QatzstdCompressorFactory::QatzstdThreadLocal::GetQATSession() { + // The session must be initialized only once in every worker thread. + if (!initialized_) { + + int status = QZSTD_startQatDevice(); + // RELEASE_ASSERT(status == QZSTD_OK, "failed to initialize hardware"); + if (status != QZSTD_OK) { + ENVOY_LOG(warn, "Failed to initialize qat hardware"); + } else { + ENVOY_LOG(debug, "Initialize qat hardware successful"); + } + sequenceProducerState_ = QZSTD_createSeqProdState(); + initialized_ = true; + } + + return sequenceProducerState_; +} + +Envoy::Compression::Compressor::CompressorPtr QatzstdCompressorFactory::createCompressor() { + return std::make_unique( + compression_level_, enable_checksum_, strategy_, chunk_size_, enable_qat_zstd_, + qat_zstd_fallback_threshold_, enable_qat_zstd_ ? tls_slot_->get()->GetQATSession() : nullptr); +} + +Envoy::Compression::Compressor::CompressorFactoryPtr +QatzstdCompressorLibraryFactory::createCompressorFactoryFromProtoTyped( + const envoy::extensions::compression::qatzstd::compressor::v3alpha::Qatzstd& proto_config, + Server::Configuration::FactoryContext& context) { + return std::make_unique(proto_config, + context.serverFactoryContext().threadLocal()); +} + +/** + * Static registration for the zstd compressor library. @see NamedCompressorLibraryConfigFactory. + */ +REGISTER_FACTORY(QatzstdCompressorLibraryFactory, + Envoy::Compression::Compressor::NamedCompressorLibraryConfigFactory); + +} // namespace Compressor +} // namespace Qatzstd +} // namespace Compression +} // namespace Extensions +} // namespace Envoy diff --git a/contrib/qat/compression/qatzstd/compressor/source/config.h b/contrib/qat/compression/qatzstd/compressor/source/config.h new file mode 100644 index 000000000000..8e78cd8123be --- /dev/null +++ b/contrib/qat/compression/qatzstd/compressor/source/config.h @@ -0,0 +1,83 @@ +#pragma once + +#include "envoy/compression/compressor/factory.h" +#include "envoy/event/dispatcher.h" +#include "envoy/thread_local/thread_local.h" + +#include "source/common/common/logger.h" +#include "source/common/http/headers.h" +#include "source/extensions/compression/common/compressor/factory_base.h" + +#include "contrib/envoy/extensions/compression/qatzstd/compressor/v3alpha/qatzstd.pb.h" +#include "contrib/envoy/extensions/compression/qatzstd/compressor/v3alpha/qatzstd.pb.validate.h" +#include "contrib/qat/compression/qatzstd/compressor/source/qatzstd_compressor_impl.h" +#include "qatseqprod.h" + +namespace Envoy { +namespace Extensions { +namespace Compression { +namespace Qatzstd { +namespace Compressor { + +// Default threshold for qat_zstd fallback to software. +const uint32_t DefaultQatZstdFallbackThreshold = 4000; + +namespace { + +const std::string& qatzstdStatsPrefix() { CONSTRUCT_ON_FIRST_USE(std::string, "qatzstd."); } +const std::string& qatzstdExtensionName() { + CONSTRUCT_ON_FIRST_USE(std::string, "envoy.compression.qatzstd.compressor"); +} + +} // namespace + +class QatzstdCompressorFactory : public Envoy::Compression::Compressor::CompressorFactory { +public: + QatzstdCompressorFactory( + const envoy::extensions::compression::qatzstd::compressor::v3alpha::Qatzstd& qatzstd, + ThreadLocal::SlotAllocator& tls); + + // Envoy::Compression::Compressor::CompressorFactory + Envoy::Compression::Compressor::CompressorPtr createCompressor() override; + const std::string& statsPrefix() const override { return qatzstdStatsPrefix(); } + const std::string& contentEncoding() const override { + return Http::CustomHeaders::get().ContentEncodingValues.Zstd; + } + +private: + struct QatzstdThreadLocal : public ThreadLocal::ThreadLocalObject, + public Logger::Loggable { + QatzstdThreadLocal(); + ~QatzstdThreadLocal() override; + void* GetQATSession(); + bool initialized_; + void* sequenceProducerState_; + }; + const uint32_t compression_level_; + const bool enable_checksum_; + const uint32_t strategy_; + const uint32_t chunk_size_; + const bool enable_qat_zstd_; + const uint32_t qat_zstd_fallback_threshold_; + ThreadLocal::TypedSlotPtr tls_slot_; +}; + +class QatzstdCompressorLibraryFactory + : public Compression::Common::Compressor::CompressorLibraryFactoryBase< + envoy::extensions::compression::qatzstd::compressor::v3alpha::Qatzstd> { +public: + QatzstdCompressorLibraryFactory() : CompressorLibraryFactoryBase(qatzstdExtensionName()) {} + +private: + Envoy::Compression::Compressor::CompressorFactoryPtr createCompressorFactoryFromProtoTyped( + const envoy::extensions::compression::qatzstd::compressor::v3alpha::Qatzstd& config, + Server::Configuration::FactoryContext& context) override; +}; + +DECLARE_FACTORY(QatzstdCompressorLibraryFactory); + +} // namespace Compressor +} // namespace Qatzstd +} // namespace Compression +} // namespace Extensions +} // namespace Envoy diff --git a/contrib/qat/compression/qatzstd/compressor/source/qatzstd_compressor_impl.cc b/contrib/qat/compression/qatzstd/compressor/source/qatzstd_compressor_impl.cc new file mode 100644 index 000000000000..7b1cb85a8aae --- /dev/null +++ b/contrib/qat/compression/qatzstd/compressor/source/qatzstd_compressor_impl.cc @@ -0,0 +1,85 @@ +#include "contrib/qat/compression/qatzstd/compressor/source/qatzstd_compressor_impl.h" + +namespace Envoy { +namespace Extensions { +namespace Compression { +namespace Qatzstd { +namespace Compressor { + +QatzstdCompressorImpl::QatzstdCompressorImpl(uint32_t compression_level, bool enable_checksum, + uint32_t strategy, uint32_t chunk_size, + bool enable_qat_zstd, + uint32_t qat_zstd_fallback_threshold, + void* sequenceProducerState) + : ZstdCompressorImplBase(compression_level, enable_checksum, strategy, chunk_size), + enable_qat_zstd_(enable_qat_zstd), qat_zstd_fallback_threshold_(qat_zstd_fallback_threshold), + sequenceProducerState_(sequenceProducerState), input_ptr_{std::make_unique( + chunk_size)}, + input_len_(0), chunk_size_(chunk_size) { + size_t result; + result = ZSTD_CCtx_setParameter(cctx_.get(), ZSTD_c_compressionLevel, compression_level_); + RELEASE_ASSERT(!ZSTD_isError(result), ""); + + ENVOY_LOG(debug, + "zstd new ZstdCompressorImpl, compression_level: {}, strategy: {}, chunk_size: " + "{}, enable_qat_zstd: {}, qat_zstd_fallback_threshold: {}", + compression_level, strategy, chunk_size, enable_qat_zstd, qat_zstd_fallback_threshold); + if (enable_qat_zstd_) { + /* register qatSequenceProducer */ + ZSTD_registerSequenceProducer(cctx_.get(), sequenceProducerState_, qatSequenceProducer); + result = ZSTD_CCtx_setParameter(cctx_.get(), ZSTD_c_enableSeqProducerFallback, 1); + RELEASE_ASSERT(!ZSTD_isError(result), ""); + } +} + +void QatzstdCompressorImpl::compressPreprocess(Buffer::Instance& buffer, + Envoy::Compression::Compressor::State state) { + ENVOY_LOG(debug, "zstd compress input size {}", buffer.length()); + if (enable_qat_zstd_ && state == Envoy::Compression::Compressor::State::Flush) { + // Fall back to software if input size less than threshold to achieve better performance. + if (buffer.length() < qat_zstd_fallback_threshold_) { + ENVOY_LOG(debug, "zstd compress fall back to software"); + ZSTD_registerSequenceProducer(cctx_.get(), nullptr, nullptr); + } + } +} + +void QatzstdCompressorImpl::setInput(const uint8_t* input, size_t size) { + input_.src = input; + input_.pos = 0; + input_.size = size; + input_len_ = 0; +} + +void QatzstdCompressorImpl::compressProcess(const Buffer::Instance& buffer, + const Buffer::RawSlice& slice, + Buffer::Instance& accumulation_buffer) { + if (slice.len_ == buffer.length() || slice.len_ > chunk_size_) { + if (input_len_ > 0) { + setInput(input_ptr_.get(), input_len_); + process(accumulation_buffer, ZSTD_e_continue); + } + setInput(static_cast(slice.mem_), slice.len_); + process(accumulation_buffer, ZSTD_e_continue); + } else { + if (input_len_ + slice.len_ > chunk_size_) { + setInput(input_ptr_.get(), input_len_); + process(accumulation_buffer, ZSTD_e_continue); + } + memcpy(input_ptr_.get() + input_len_, slice.mem_, slice.len_); // NOLINT(safe-memcpy) + input_len_ += slice.len_; + } +} + +void QatzstdCompressorImpl::compressPostprocess(Buffer::Instance& accumulation_buffer) { + if (input_len_ > 0) { + setInput(input_ptr_.get(), input_len_); + process(accumulation_buffer, ZSTD_e_continue); + } +} + +} // namespace Compressor +} // namespace Qatzstd +} // namespace Compression +} // namespace Extensions +} // namespace Envoy diff --git a/contrib/qat/compression/qatzstd/compressor/source/qatzstd_compressor_impl.h b/contrib/qat/compression/qatzstd/compressor/source/qatzstd_compressor_impl.h new file mode 100644 index 000000000000..1960b13b7671 --- /dev/null +++ b/contrib/qat/compression/qatzstd/compressor/source/qatzstd_compressor_impl.h @@ -0,0 +1,51 @@ +#pragma once + +#include "envoy/compression/compressor/compressor.h" +#include "envoy/server/factory_context.h" + +#include "source/common/common/logger.h" +#include "source/common/compression/zstd/common/base.h" +#include "source/common/compression/zstd/compressor/zstd_compressor_impl_base.h" + +#include "qatseqprod.h" + +namespace Envoy { +namespace Extensions { +namespace Compression { +namespace Qatzstd { +namespace Compressor { + +/** + * Implementation of compressor's interface. + */ +class QatzstdCompressorImpl : public Envoy::Compression::Zstd::Compressor::ZstdCompressorImplBase, + public Logger::Loggable { +public: + QatzstdCompressorImpl(uint32_t compression_level, bool enable_checksum, uint32_t strategy, + uint32_t chunk_size, bool enable_qat_zstd, + uint32_t qat_zstd_fallback_threshold, void* sequenceProducerState); + +private: + void compressPreprocess(Buffer::Instance& buffer, + Envoy::Compression::Compressor::State state) override; + + void compressProcess(const Buffer::Instance& buffer, const Buffer::RawSlice& slice, + Buffer::Instance& accumulation_buffer) override; + + void compressPostprocess(Buffer::Instance& accumulation_buffer) override; + + void setInput(const uint8_t* input, size_t size); + + bool enable_qat_zstd_; + const uint32_t qat_zstd_fallback_threshold_; + void* sequenceProducerState_; + std::unique_ptr input_ptr_; + uint64_t input_len_; + uint64_t chunk_size_; +}; + +} // namespace Compressor +} // namespace Qatzstd +} // namespace Compression +} // namespace Extensions +} // namespace Envoy diff --git a/contrib/qat/compression/qatzstd/compressor/test/BUILD b/contrib/qat/compression/qatzstd/compressor/test/BUILD new file mode 100644 index 000000000000..85297caa141a --- /dev/null +++ b/contrib/qat/compression/qatzstd/compressor/test/BUILD @@ -0,0 +1,20 @@ +load( + "//bazel:envoy_build_system.bzl", + "envoy_cc_test", + "envoy_contrib_package", +) + +licenses(["notice"]) # Apache 2 + +envoy_contrib_package() + +envoy_cc_test( + name = "compressor_test", + srcs = ["qatzstd_compressor_impl_test.cc"], + deps = [ + "//contrib/qat/compression/qatzstd/compressor/source:config", + "//source/extensions/compression/zstd/decompressor:decompressor_lib", + "//test/mocks/server:factory_context_mocks", + "//test/test_common:utility_lib", + ], +) diff --git a/contrib/qat/compression/qatzstd/compressor/test/qatzstd_compressor_impl_test.cc b/contrib/qat/compression/qatzstd/compressor/test/qatzstd_compressor_impl_test.cc new file mode 100644 index 000000000000..257a23061ed2 --- /dev/null +++ b/contrib/qat/compression/qatzstd/compressor/test/qatzstd_compressor_impl_test.cc @@ -0,0 +1,110 @@ +#include "source/common/buffer/buffer_impl.h" +#include "source/common/stats/isolated_store_impl.h" +#include "source/extensions/compression/zstd/decompressor/zstd_decompressor_impl.h" + +#include "test/mocks/server/factory_context.h" +#include "test/test_common/utility.h" + +#include "contrib/qat/compression/qatzstd/compressor/source/config.h" +#include "gtest/gtest.h" +#include "qatseqprod.h" + +namespace Envoy { +namespace Extensions { +namespace Compression { +namespace Qatzstd { +namespace Compressor { +namespace { + +class QatzstdCompressorImplTest : public testing::Test { +protected: + void drainBuffer(Buffer::OwnedImpl& buffer) { + buffer.drain(buffer.length()); + ASSERT_EQ(0, buffer.length()); + } + + void verifyWithDecompressor(Envoy::Compression::Compressor::CompressorPtr compressor) { + Buffer::OwnedImpl buffer; + Buffer::OwnedImpl accumulation_buffer; + std::string original_text{}; + for (uint64_t i = 0; i < 10; i++) { + TestUtility::feedBufferWithRandomCharacters(buffer, default_input_size_ * i, i, i); + original_text.append(buffer.toString()); + ASSERT_EQ(default_input_size_ * i * i, buffer.length()); + compressor->compress(buffer, Envoy::Compression::Compressor::State::Flush); + accumulation_buffer.add(buffer); + drainBuffer(buffer); + } + + compressor->compress(buffer, Envoy::Compression::Compressor::State::Finish); + accumulation_buffer.add(buffer); + drainBuffer(buffer); + + Stats::IsolatedStoreImpl stats_store{}; + Zstd::Decompressor::ZstdDecompressorImpl decompressor{*stats_store.rootScope(), "test.", + default_ddict_manager_, 4096}; + + decompressor.decompress(accumulation_buffer, buffer); + std::string decompressed_text{buffer.toString()}; + + ASSERT_EQ(original_text.length(), decompressed_text.length()); + EXPECT_EQ(original_text, decompressed_text); + } + + Envoy::Compression::Compressor::CompressorFactoryPtr + createQatzstdCompressorFactoryFromConfig(const std::string& json) { + envoy::extensions::compression::qatzstd::compressor::v3alpha::Qatzstd qatzstd_config; + TestUtility::loadFromJson(json, qatzstd_config); + + return qatzstd_compressor_library_factory_.createCompressorFactoryFromProto(qatzstd_config, + context_); + } + + uint32_t default_input_size_{796}; + Zstd::Decompressor::ZstdDDictManagerPtr default_ddict_manager_{nullptr}; + QatzstdCompressorLibraryFactory qatzstd_compressor_library_factory_; + NiceMock context_; +}; + +class QatzstdConfigTest : public QatzstdCompressorImplTest, + public ::testing::WithParamInterface> {}; + +// These tests should pass even if required hardware or setup steps required for qatzstd are +// missing. Qatzstd uses a sofware fallback in this case. +INSTANTIATE_TEST_SUITE_P( + QatzstdConfigTestInstantiation, QatzstdConfigTest, + // First tuple has all default values. + ::testing::Values(std::make_tuple(1, 4096, true, 4096), std::make_tuple(2, 4096, true, 4096), + std::make_tuple(3, 65536, true, 4096), std::make_tuple(4, 4096, true, 4096), + std::make_tuple(5, 8192, true, 1024), std::make_tuple(6, 4096, false, 1024), + std::make_tuple(7, 4096, true, 1024), std::make_tuple(8, 8192, true, 4096), + std::make_tuple(9, 8192, true, 1024), std::make_tuple(10, 16384, true, 1024), + std::make_tuple(11, 8192, true, 8192), + std::make_tuple(12, 4096, true, 1024))); + +TEST_P(QatzstdConfigTest, LoadConfigAndVerifyWithDecompressor) { + std::tuple config_value_tuple = GetParam(); + std::string json{fmt::format(R"EOF({{ + "compression_level": {}, + "chunk_size": {}, + "enable_qat_zstd": {}, + "qat_zstd_fallback_threshold": {}, +}})EOF", + std::get<0>(config_value_tuple), std::get<1>(config_value_tuple), + std::get<2>(config_value_tuple), std::get<3>(config_value_tuple))}; + + Envoy::Compression::Compressor::CompressorFactoryPtr qatzstd_compressor_factory = + createQatzstdCompressorFactoryFromConfig(json); + + EXPECT_EQ("zstd", qatzstd_compressor_factory->contentEncoding()); + EXPECT_EQ("qatzstd.", qatzstd_compressor_factory->statsPrefix()); + + verifyWithDecompressor(qatzstd_compressor_factory->createCompressor()); +} + +} // namespace +} // namespace Compressor +} // namespace Qatzstd +} // namespace Compression +} // namespace Extensions +} // namespace Envoy diff --git a/docs/BUILD b/docs/BUILD index 16df02c3e5a6..3497d8d9f186 100644 --- a/docs/BUILD +++ b/docs/BUILD @@ -35,6 +35,7 @@ filegroup( "root/configuration/other_features/_include/hyperscan_matcher_multiple.yaml", "root/configuration/other_features/_include/hyperscan_regex_engine.yaml", "root/configuration/other_features/_include/qatzip.yaml", + "root/configuration/other_features/_include/qatzstd.yaml", "root/intro/arch_overview/security/_include/ssl.yaml", "root/configuration/listeners/network_filters/_include/generic_proxy_filter.yaml", "root/configuration/overview/_include/xds_api/oauth-sds-example.yaml", diff --git a/docs/root/api-v3/config/contrib/qat/qat.rst b/docs/root/api-v3/config/contrib/qat/qat.rst index 3e524d700b13..c03e01dd4397 100644 --- a/docs/root/api-v3/config/contrib/qat/qat.rst +++ b/docs/root/api-v3/config/contrib/qat/qat.rst @@ -4,3 +4,4 @@ ../../../extensions/private_key_providers/qat/v3alpha/* ../../../extensions/compression/qatzip/compressor/v3alpha/* + ../../../extensions/compression/qatzstd/compressor/v3alpha/* diff --git a/docs/root/configuration/other_features/_include/qatzstd.yaml b/docs/root/configuration/other_features/_include/qatzstd.yaml new file mode 100644 index 000000000000..4a33cfe65094 --- /dev/null +++ b/docs/root/configuration/other_features/_include/qatzstd.yaml @@ -0,0 +1,63 @@ +static_resources: + listeners: + - address: + socket_address: + address: 0.0.0.0 + port_value: 10000 + filter_chains: + - filters: + - name: envoy.filters.network.http_connection_manager + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager + stat_prefix: ingress_http + route_config: + name: local_route + virtual_hosts: + - name: backend + domains: + - "*" + routes: + - match: + prefix: "/" + route: + cluster: service + http_filters: + - name: envoy.filters.http.compressor + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.http.compressor.v3.Compressor + response_direction_config: + common_config: + min_content_length: 100 + content_type: + - application/octet-stream + disable_on_etag_header: false + compressor_library: + name: text_optimized + typed_config: + "@type": type.googleapis.com/envoy.extensions.compression.qatzstd.compressor.v3alpha.Qatzstd + compression_level: 9 + enable_qat_zstd: true + chunk_size: 65536 + qat_zstd_fallback_threshold: 0 + - name: envoy.filters.http.router + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router + clusters: + - name: service + connect_timeout: 0.25s + type: STRICT_DNS + lb_policy: ROUND_ROBIN + load_assignment: + cluster_name: service + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: 127.0.0.1 + port_value: 1234 +admin: + address: + socket_address: + address: 0.0.0.0 + port_value: 9901 diff --git a/docs/root/configuration/other_features/other_features.rst b/docs/root/configuration/other_features/other_features.rst index bd54f4e7a129..81c5ca3046cc 100644 --- a/docs/root/configuration/other_features/other_features.rst +++ b/docs/root/configuration/other_features/other_features.rst @@ -12,4 +12,5 @@ Other features wasm wasm_service qatzip + qatzstd string_matcher diff --git a/docs/root/configuration/other_features/qatzstd.rst b/docs/root/configuration/other_features/qatzstd.rst new file mode 100644 index 000000000000..e963c862e6d7 --- /dev/null +++ b/docs/root/configuration/other_features/qatzstd.rst @@ -0,0 +1,34 @@ +.. _config_qatzstd: + +Qatzstd Compressor +======================= + +* :ref:`v3 API reference ` + + +Qatzstd compressor provides Envoy with faster hardware-accelerated zstd compression by integrating with `Intel® QuickAssist Technology (Intel® QAT) `_ through the qatlib and QAT-ZSTD-Plugin libraries. + +Example configuration +--------------------- + +An example for Qatzstd compressor configuration is: + +.. literalinclude:: _include/qatzstd.yaml + :language: yaml + + +How it works +------------ + +If enabled, the Qatzstd compressor will: + +- attach Qat hardware +- create Threadlocal Qatzstd context for each worker thread + +When new http package come, one worker thread will process it using its Qatzstd context and send the data needed to be compressed to +Qat hardware using standard zstd api. + +Installing and using QAT-ZSTD-Plugin +------------------------------------ + +For information on how to build/install and use QAT-ZSTD-Plugin see `introduction `_. diff --git a/source/common/common/logger.h b/source/common/common/logger.h index a90e43628e1e..4f0e7375f459 100644 --- a/source/common/common/logger.h +++ b/source/common/common/logger.h @@ -45,6 +45,7 @@ const static bool should_log = true; FUNCTION(config) \ FUNCTION(connection) \ FUNCTION(conn_handler) \ + FUNCTION(compression) \ FUNCTION(decompression) \ FUNCTION(dns) \ FUNCTION(dubbo) \ diff --git a/source/common/compression/zstd/common/BUILD b/source/common/compression/zstd/common/BUILD new file mode 100644 index 000000000000..e9ecaa5338b2 --- /dev/null +++ b/source/common/compression/zstd/common/BUILD @@ -0,0 +1,19 @@ +load( + "//bazel:envoy_build_system.bzl", + "envoy_cc_library", + "envoy_package", +) + +licenses(["notice"]) # Apache 2 + +envoy_package() + +envoy_cc_library( + name = "zstd_base_lib", + srcs = ["base.cc"], + hdrs = ["base.h"], + external_deps = ["zstd"], + deps = [ + "//source/common/buffer:buffer_lib", + ], +) diff --git a/source/extensions/compression/zstd/common/base.cc b/source/common/compression/zstd/common/base.cc similarity index 86% rename from source/extensions/compression/zstd/common/base.cc rename to source/common/compression/zstd/common/base.cc index 3682f3626934..89dc2941cb2d 100644 --- a/source/extensions/compression/zstd/common/base.cc +++ b/source/common/compression/zstd/common/base.cc @@ -1,7 +1,6 @@ -#include "source/extensions/compression/zstd/common/base.h" +#include "source/common/compression/zstd/common/base.h" namespace Envoy { -namespace Extensions { namespace Compression { namespace Zstd { namespace Common { @@ -27,5 +26,4 @@ void Base::getOutput(Buffer::Instance& output_buffer) { } // namespace Common } // namespace Zstd } // namespace Compression -} // namespace Extensions } // namespace Envoy diff --git a/source/extensions/compression/zstd/common/base.h b/source/common/compression/zstd/common/base.h similarity index 92% rename from source/extensions/compression/zstd/common/base.h rename to source/common/compression/zstd/common/base.h index 6a3c626a3701..8582fe3f86f3 100644 --- a/source/extensions/compression/zstd/common/base.h +++ b/source/common/compression/zstd/common/base.h @@ -7,7 +7,6 @@ #include "zstd.h" namespace Envoy { -namespace Extensions { namespace Compression { namespace Zstd { namespace Common { @@ -29,5 +28,4 @@ struct Base { } // namespace Common } // namespace Zstd } // namespace Compression -} // namespace Extensions } // namespace Envoy diff --git a/source/common/compression/zstd/compressor/BUILD b/source/common/compression/zstd/compressor/BUILD new file mode 100644 index 000000000000..16a3fbe97f1e --- /dev/null +++ b/source/common/compression/zstd/compressor/BUILD @@ -0,0 +1,20 @@ +load( + "//bazel:envoy_build_system.bzl", + "envoy_cc_library", + "envoy_package", +) + +licenses(["notice"]) # Apache 2 + +envoy_package() + +envoy_cc_library( + name = "compressor_base", + srcs = ["zstd_compressor_impl_base.cc"], + hdrs = ["zstd_compressor_impl_base.h"], + deps = [ + "//envoy/compression/compressor:compressor_interface", + "//source/common/buffer:buffer_lib", + "//source/common/compression/zstd/common:zstd_base_lib", + ], +) diff --git a/source/common/compression/zstd/compressor/zstd_compressor_impl_base.cc b/source/common/compression/zstd/compressor/zstd_compressor_impl_base.cc new file mode 100644 index 000000000000..2a04f72b83c0 --- /dev/null +++ b/source/common/compression/zstd/compressor/zstd_compressor_impl_base.cc @@ -0,0 +1,59 @@ +#include "source/common/compression/zstd/compressor/zstd_compressor_impl_base.h" + +#include "source/common/buffer/buffer_impl.h" + +namespace Envoy { +namespace Compression { +namespace Zstd { +namespace Compressor { + +ZstdCompressorImplBase::ZstdCompressorImplBase(uint32_t compression_level, bool enable_checksum, + uint32_t strategy, uint32_t chunk_size) + : Common::Base(chunk_size), cctx_(ZSTD_createCCtx(), &ZSTD_freeCCtx), + compression_level_(compression_level) { + size_t result; + result = ZSTD_CCtx_setParameter(cctx_.get(), ZSTD_c_checksumFlag, enable_checksum); + RELEASE_ASSERT(!ZSTD_isError(result), ""); + + result = ZSTD_CCtx_setParameter(cctx_.get(), ZSTD_c_strategy, strategy); + RELEASE_ASSERT(!ZSTD_isError(result), ""); +} + +void ZstdCompressorImplBase::compress(Buffer::Instance& buffer, + Envoy::Compression::Compressor::State state) { + compressPreprocess(buffer, state); + + Buffer::OwnedImpl accumulation_buffer; + for (const Buffer::RawSlice& input_slice : buffer.getRawSlices()) { + if (input_slice.len_ > 0) { + compressProcess(buffer, input_slice, accumulation_buffer); + buffer.drain(input_slice.len_); + } + } + + compressPostprocess(accumulation_buffer); + + ASSERT(buffer.length() == 0); + buffer.move(accumulation_buffer); + + if (state == Envoy::Compression::Compressor::State::Finish) { + process(buffer, ZSTD_e_end); + } +} + +void ZstdCompressorImplBase::process(Buffer::Instance& output_buffer, ZSTD_EndDirective mode) { + bool finished; + do { + const size_t remaining = ZSTD_compressStream2(cctx_.get(), &output_, &input_, mode); + getOutput(output_buffer); + // If we're on the last chunk we're finished when zstd returns 0, + // which means its consumed all the input AND finished the frame. + // Otherwise, we're finished when we've consumed all the input. + finished = (ZSTD_e_end == mode) ? (remaining == 0) : (input_.pos == input_.size); + } while (!finished); +} + +} // namespace Compressor +} // namespace Zstd +} // namespace Compression +} // namespace Envoy diff --git a/source/common/compression/zstd/compressor/zstd_compressor_impl_base.h b/source/common/compression/zstd/compressor/zstd_compressor_impl_base.h new file mode 100644 index 000000000000..954b3aa17f9a --- /dev/null +++ b/source/common/compression/zstd/compressor/zstd_compressor_impl_base.h @@ -0,0 +1,43 @@ +#pragma once + +#include "envoy/compression/compressor/compressor.h" + +#include "source/common/compression/zstd/common/base.h" + +namespace Envoy { +namespace Compression { +namespace Zstd { +namespace Compressor { + +/** + * Implementation of compressor's interface. + */ +class ZstdCompressorImplBase : public Common::Base, + public Envoy::Compression::Compressor::Compressor, + NonCopyable { +public: + ZstdCompressorImplBase(uint32_t compression_level, bool enable_checksum, uint32_t strategy, + uint32_t chunk_size); + + // Compression::Compressor::Compressor + void compress(Buffer::Instance& buffer, Envoy::Compression::Compressor::State state) override; + + void process(Buffer::Instance& output_buffer, ZSTD_EndDirective mode); + +protected: + virtual void compressPreprocess(Buffer::Instance& buffer, + Envoy::Compression::Compressor::State state) PURE; + + virtual void compressProcess(const Buffer::Instance& buffer, const Buffer::RawSlice& input_slice, + Buffer::Instance& accumulation_buffer) PURE; + + virtual void compressPostprocess(Buffer::Instance& accumulation_buffer) PURE; + + std::unique_ptr cctx_; + const uint32_t compression_level_; +}; + +} // namespace Compressor +} // namespace Zstd +} // namespace Compression +} // namespace Envoy diff --git a/source/extensions/compression/zstd/common/BUILD b/source/extensions/compression/zstd/common/BUILD index 98b6a3abb62f..a0aa94556434 100644 --- a/source/extensions/compression/zstd/common/BUILD +++ b/source/extensions/compression/zstd/common/BUILD @@ -8,16 +8,6 @@ licenses(["notice"]) # Apache 2 envoy_extension_package() -envoy_cc_library( - name = "zstd_base_lib", - srcs = ["base.cc"], - hdrs = ["base.h"], - external_deps = ["zstd"], - deps = [ - "//source/common/buffer:buffer_lib", - ], -) - envoy_cc_library( name = "zstd_dictionary_manager_lib", hdrs = ["dictionary_manager.h"], diff --git a/source/extensions/compression/zstd/compressor/BUILD b/source/extensions/compression/zstd/compressor/BUILD index e6167ff1a1cb..76c3343a3202 100644 --- a/source/extensions/compression/zstd/compressor/BUILD +++ b/source/extensions/compression/zstd/compressor/BUILD @@ -16,7 +16,8 @@ envoy_cc_library( deps = [ "//envoy/compression/compressor:compressor_interface", "//source/common/buffer:buffer_lib", - "//source/extensions/compression/zstd/common:zstd_base_lib", + "//source/common/compression/zstd/common:zstd_base_lib", + "//source/common/compression/zstd/compressor:compressor_base", "//source/extensions/compression/zstd/common:zstd_dictionary_manager_lib", ], ) diff --git a/source/extensions/compression/zstd/compressor/zstd_compressor_impl.cc b/source/extensions/compression/zstd/compressor/zstd_compressor_impl.cc index 89d3ed754e65..55ece012d086 100644 --- a/source/extensions/compression/zstd/compressor/zstd_compressor_impl.cc +++ b/source/extensions/compression/zstd/compressor/zstd_compressor_impl.cc @@ -1,7 +1,5 @@ #include "source/extensions/compression/zstd/compressor/zstd_compressor_impl.h" -#include "source/common/buffer/buffer_impl.h" - namespace Envoy { namespace Extensions { namespace Compression { @@ -11,15 +9,9 @@ namespace Compressor { ZstdCompressorImpl::ZstdCompressorImpl(uint32_t compression_level, bool enable_checksum, uint32_t strategy, const ZstdCDictManagerPtr& cdict_manager, uint32_t chunk_size) - : Common::Base(chunk_size), cctx_(ZSTD_createCCtx(), &ZSTD_freeCCtx), - cdict_manager_(cdict_manager), compression_level_(compression_level) { + : ZstdCompressorImplBase(compression_level, enable_checksum, strategy, chunk_size), + cdict_manager_(cdict_manager) { size_t result; - result = ZSTD_CCtx_setParameter(cctx_.get(), ZSTD_c_checksumFlag, enable_checksum); - RELEASE_ASSERT(!ZSTD_isError(result), ""); - - result = ZSTD_CCtx_setParameter(cctx_.get(), ZSTD_c_strategy, strategy); - RELEASE_ASSERT(!ZSTD_isError(result), ""); - if (cdict_manager_) { ZSTD_CDict* cdict = cdict_manager_->getFirstDictionary(); result = ZSTD_CCtx_refCDict(cctx_.get(), cdict); @@ -29,36 +21,17 @@ ZstdCompressorImpl::ZstdCompressorImpl(uint32_t compression_level, bool enable_c RELEASE_ASSERT(!ZSTD_isError(result), ""); } -void ZstdCompressorImpl::compress(Buffer::Instance& buffer, - Envoy::Compression::Compressor::State state) { - Buffer::OwnedImpl accumulation_buffer; - for (const Buffer::RawSlice& input_slice : buffer.getRawSlices()) { - if (input_slice.len_ > 0) { - setInput(input_slice); - process(accumulation_buffer, ZSTD_e_continue); - buffer.drain(input_slice.len_); - } - } - - ASSERT(buffer.length() == 0); - buffer.move(accumulation_buffer); +void ZstdCompressorImpl::compressPreprocess(Buffer::Instance&, + Envoy::Compression::Compressor::State) {} - if (state == Envoy::Compression::Compressor::State::Finish) { - process(buffer, ZSTD_e_end); - } +void ZstdCompressorImpl::compressProcess(const Buffer::Instance&, + const Buffer::RawSlice& input_slice, + Buffer::Instance& accumulation_buffer) { + setInput(input_slice); + process(accumulation_buffer, ZSTD_e_continue); } -void ZstdCompressorImpl::process(Buffer::Instance& output_buffer, ZSTD_EndDirective mode) { - bool finished; - do { - const size_t remaining = ZSTD_compressStream2(cctx_.get(), &output_, &input_, mode); - getOutput(output_buffer); - // If we're on the last chunk we're finished when zstd returns 0, - // which means its consumed all the input AND finished the frame. - // Otherwise, we're finished when we've consumed all the input. - finished = (ZSTD_e_end == mode) ? (remaining == 0) : (input_.pos == input_.size); - } while (!finished); -} +void ZstdCompressorImpl::compressPostprocess(Buffer::Instance&) {} } // namespace Compressor } // namespace Zstd diff --git a/source/extensions/compression/zstd/compressor/zstd_compressor_impl.h b/source/extensions/compression/zstd/compressor/zstd_compressor_impl.h index e0adaec7caaf..95da3d1a8e12 100644 --- a/source/extensions/compression/zstd/compressor/zstd_compressor_impl.h +++ b/source/extensions/compression/zstd/compressor/zstd_compressor_impl.h @@ -2,7 +2,8 @@ #include "envoy/compression/compressor/compressor.h" -#include "source/extensions/compression/zstd/common/base.h" +#include "source/common/compression/zstd/common/base.h" +#include "source/common/compression/zstd/compressor/zstd_compressor_impl_base.h" #include "source/extensions/compression/zstd/common/dictionary_manager.h" namespace Envoy { @@ -18,22 +19,21 @@ using ZstdCDictManagerPtr = std::unique_ptr; /** * Implementation of compressor's interface. */ -class ZstdCompressorImpl : public Common::Base, - public Envoy::Compression::Compressor::Compressor, - NonCopyable { +class ZstdCompressorImpl : public Envoy::Compression::Zstd::Compressor::ZstdCompressorImplBase { public: ZstdCompressorImpl(uint32_t compression_level, bool enable_checksum, uint32_t strategy, const ZstdCDictManagerPtr& cdict_manager, uint32_t chunk_size); - // Compression::Compressor::Compressor - void compress(Buffer::Instance& buffer, Envoy::Compression::Compressor::State state) override; - private: - void process(Buffer::Instance& output_buffer, ZSTD_EndDirective mode); + void compressPreprocess(Buffer::Instance& buffer, + Envoy::Compression::Compressor::State state) override; + + void compressProcess(const Buffer::Instance& buffer, const Buffer::RawSlice& input_slice, + Buffer::Instance& accumulation_buffer) override; + + void compressPostprocess(Buffer::Instance& accumulation_buffer) override; - std::unique_ptr cctx_; const ZstdCDictManagerPtr& cdict_manager_; - const uint32_t compression_level_; }; } // namespace Compressor diff --git a/source/extensions/compression/zstd/decompressor/BUILD b/source/extensions/compression/zstd/decompressor/BUILD index 9fc4383b939a..361a3b866f23 100644 --- a/source/extensions/compression/zstd/decompressor/BUILD +++ b/source/extensions/compression/zstd/decompressor/BUILD @@ -18,7 +18,7 @@ envoy_cc_library( "//envoy/stats:stats_interface", "//envoy/stats:stats_macros", "//source/common/buffer:buffer_lib", - "//source/extensions/compression/zstd/common:zstd_base_lib", + "//source/common/compression/zstd/common:zstd_base_lib", "//source/extensions/compression/zstd/common:zstd_dictionary_manager_lib", ], ) diff --git a/source/extensions/compression/zstd/decompressor/zstd_decompressor_impl.cc b/source/extensions/compression/zstd/decompressor/zstd_decompressor_impl.cc index 7a165da1973c..e2a00232c120 100644 --- a/source/extensions/compression/zstd/decompressor/zstd_decompressor_impl.cc +++ b/source/extensions/compression/zstd/decompressor/zstd_decompressor_impl.cc @@ -21,7 +21,7 @@ constexpr uint64_t MaxInflateRatio = 100; ZstdDecompressorImpl::ZstdDecompressorImpl(Stats::Scope& scope, const std::string& stats_prefix, const ZstdDDictManagerPtr& ddict_manager, uint32_t chunk_size) - : Common::Base(chunk_size), dctx_(ZSTD_createDCtx(), &ZSTD_freeDCtx), + : Envoy::Compression::Zstd::Common::Base(chunk_size), dctx_(ZSTD_createDCtx(), &ZSTD_freeDCtx), ddict_manager_(ddict_manager), stats_(generateStats(stats_prefix, scope)) {} void ZstdDecompressorImpl::decompress(const Buffer::Instance& input_buffer, diff --git a/source/extensions/compression/zstd/decompressor/zstd_decompressor_impl.h b/source/extensions/compression/zstd/decompressor/zstd_decompressor_impl.h index 597a8c72ad0e..5f7ed470e3e4 100644 --- a/source/extensions/compression/zstd/decompressor/zstd_decompressor_impl.h +++ b/source/extensions/compression/zstd/decompressor/zstd_decompressor_impl.h @@ -5,7 +5,7 @@ #include "envoy/stats/stats_macros.h" #include "source/common/common/logger.h" -#include "source/extensions/compression/zstd/common/base.h" +#include "source/common/compression/zstd/common/base.h" #include "source/extensions/compression/zstd/common/dictionary_manager.h" #include "zstd_errors.h" @@ -39,7 +39,7 @@ struct ZstdDecompressorStats { /** * Implementation of decompressor's interface. */ -class ZstdDecompressorImpl : public Common::Base, +class ZstdDecompressorImpl : public Envoy::Compression::Zstd::Common::Base, public Envoy::Compression::Decompressor::Decompressor, public Logger::Loggable, NonCopyable { diff --git a/test/test_common/utility.cc b/test/test_common/utility.cc index 7f40c6e3efd3..8b1843373671 100644 --- a/test/test_common/utility.cc +++ b/test/test_common/utility.cc @@ -130,7 +130,7 @@ bool TestUtility::rawSlicesEqual(const Buffer::RawSlice* lhs, const Buffer::RawS } void TestUtility::feedBufferWithRandomCharacters(Buffer::Instance& buffer, uint64_t n_char, - uint64_t seed) { + uint64_t seed, uint64_t n_slice) { const std::string sample = "Neque porro quisquam est qui dolorem ipsum.."; std::mt19937 generate(seed); std::uniform_int_distribution<> distribute(1, sample.length() - 1); @@ -138,7 +138,9 @@ void TestUtility::feedBufferWithRandomCharacters(Buffer::Instance& buffer, uint6 for (uint64_t n = 0; n < n_char; ++n) { str += sample.at(distribute(generate)); } - buffer.add(str); + for (uint64_t n = 0; n < n_slice; ++n) { + buffer.add(str); + } } Stats::CounterSharedPtr TestUtility::findCounter(Stats::Store& store, const std::string& name) { diff --git a/test/test_common/utility.h b/test/test_common/utility.h index c365450e83a4..77e0785ea210 100644 --- a/test/test_common/utility.h +++ b/test/test_common/utility.h @@ -178,9 +178,10 @@ class TestUtility { * @param buffer supplies the buffer to be fed. * @param n_char number of characters that should be added to the supplied buffer. * @param seed seeds pseudo-random number generator (default = 0). + * @param n_slice number of slices (default = 1). */ static void feedBufferWithRandomCharacters(Buffer::Instance& buffer, uint64_t n_char, - uint64_t seed = 0); + uint64_t seed = 0, uint64_t n_slice = 1); /** * Finds a stat in a vector with the given name. diff --git a/tools/spelling/spelling_dictionary.txt b/tools/spelling/spelling_dictionary.txt index 1a1eae3faa56..1edc5139573f 100644 --- a/tools/spelling/spelling_dictionary.txt +++ b/tools/spelling/spelling_dictionary.txt @@ -354,6 +354,7 @@ QUIC QoS qat qatzip +qatzstd RAII RANLUX RBAC