Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - feat: introduce zstd compression #3185

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1032,7 +1032,6 @@ jobs:
max_attempts: 3
command: make cli-fluvio-smoke


# test smdk
- name: Download artifact - smdk
if: matrix.test == 'smdk'
Expand Down
28 changes: 24 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ fluvio = { version = "0.19.0", path = "crates/fluvio" }
fluvio-auth = { path = "crates/fluvio-auth" }
fluvio-channel = { path = "crates/fluvio-channel" }
fluvio-cli-common = { path = "crates/fluvio-cli-common"}
fluvio-compression = { version = "0.2", path = "crates/fluvio-compression" }
fluvio-compression = { version = "0.3", path = "crates/fluvio-compression" }
fluvio-connector-package = { path = "crates/fluvio-connector-package/" }
fluvio-controlplane = { path = "crates/fluvio-controlplane" }
fluvio-controlplane-metadata = { version = "0.22.0", default-features = false, path = "crates/fluvio-controlplane-metadata" }
Expand Down
3 changes: 1 addition & 2 deletions crates/fluvio-cli/src/client/produce/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ mod stats;
pub use cmd::ProduceOpt;

mod cmd {

use std::sync::Arc;
use std::io::{BufReader, BufRead};
use std::collections::BTreeMap;
Expand Down Expand Up @@ -91,7 +90,7 @@ mod cmd {
pub raw: bool,

/// Compression algorithm to use when sending records.
/// Supported values: none, gzip, snappy and lz4.
/// Supported values: none, gzip, snappy, zstd and lz4.
#[arg(long)]
pub compression: Option<Compression>,

Expand Down
4 changes: 2 additions & 2 deletions crates/fluvio-compression/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "fluvio-compression"
version = "0.2.5"
version = "0.3.0"
edition = "2021"
license = "Apache-2.0"
authors = ["Fluvio Contributors <[email protected]>"]
Expand All @@ -19,4 +19,4 @@ snap = { version = "1" }
serde = { workspace = true, features = ['derive'] }
lz4_flex = { version = "0.10.0", default-features = false, features = ["safe-decode", "safe-encode", "frame"] }
thiserror = { workspace = true }

zstd = { version = "0.12.3+zstd.1.5.2", features = ['wasm'], default-features = false }
2 changes: 1 addition & 1 deletion crates/fluvio-compression/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ Library with handlers to compress and uncompress data in the fluvio protocol.

In fluvio, compression is done in producer side, then consumers and SPU when it is using SmartModules, uncompress the data using the compression information that is in the attributes of the batch.

Currently, the supported compressions codecs are None (default), Gzip, Snappy and LZ4.
Currently, the supported compressions codecs are None (default), Gzip, Snappy, Zstd and LZ4.
10 changes: 10 additions & 0 deletions crates/fluvio-compression/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ mod error;
mod gzip;
mod snappy;
mod lz4;
mod zstd;

pub use error::CompressionError;
use serde::{Serialize, Deserialize};
Expand All @@ -21,6 +22,7 @@ pub enum Compression {
Gzip = 1,
Snappy = 2,
Lz4 = 3,
Zstd = 4,
}

impl TryFrom<i8> for Compression {
Expand All @@ -31,6 +33,7 @@ impl TryFrom<i8> for Compression {
1 => Ok(Compression::Gzip),
2 => Ok(Compression::Snappy),
3 => Ok(Compression::Lz4),
4 => Ok(Compression::Zstd),
_ => Err(CompressionError::UnknownCompressionFormat(format!(
"i8 representation: {v}"
))),
Expand All @@ -47,6 +50,7 @@ impl FromStr for Compression {
"gzip" => Ok(Compression::Gzip),
"snappy" => Ok(Compression::Snappy),
"lz4" => Ok(Compression::Lz4),
"zstd" => Ok(Compression::Zstd),
_ => Err(CompressionError::UnknownCompressionFormat(s.into())),
}
}
Expand All @@ -60,6 +64,7 @@ impl Compression {
Compression::Gzip => gzip::compress(src),
Compression::Snappy => snappy::compress(src),
Compression::Lz4 => lz4::compress(src),
Compression::Zstd => zstd::compress(src),
}
}

Expand All @@ -79,6 +84,10 @@ impl Compression {
let output = lz4::uncompress(src)?;
Ok(Some(output))
}
Compression::Zstd => {
let output = zstd::uncompress(src)?;
Ok(Some(output))
}
}
}
}
Expand All @@ -89,6 +98,7 @@ impl std::fmt::Display for Compression {
Compression::Gzip => write!(f, "gzip"),
Compression::Snappy => write!(f, "snappy"),
Compression::Lz4 => write!(f, "lz4"),
Compression::Zstd => write!(f, "zstd"),
}
}
}
37 changes: 37 additions & 0 deletions crates/fluvio-compression/src/zstd.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
use std::io::{Read, Write};

use bytes::{BufMut, Bytes, BytesMut};
use zstd::{Decoder, Encoder};

use crate::error::CompressionError;

pub fn compress(src: &[u8]) -> Result<Bytes, CompressionError> {
let mut encoder = Encoder::new(BytesMut::new().writer(), 1)?;
encoder.write_all(src)?;
Ok(encoder.finish()?.into_inner().freeze())
}

pub fn uncompress<T: Read>(src: T) -> Result<Vec<u8>, CompressionError> {
let mut decoder = Decoder::new(src)?;
let mut buffer: Vec<u8> = Vec::new();
decoder.read_to_end(&mut buffer)?;
Ok(buffer)
}

#[cfg(test)]
mod tests {
use bytes::Buf;
use super::*;

#[test]
fn test_compress_decompress() {
let text = "FLUVIO_AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA";
let compressed = compress(text.as_bytes()).unwrap();

assert!(compressed.len() < text.as_bytes().len());

let uncompressed = String::from_utf8(uncompress(compressed.reader()).unwrap()).unwrap();

assert_eq!(uncompressed, text);
}
}
2 changes: 1 addition & 1 deletion crates/fluvio-connector-package/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ mod tests {
.expect_err("This yaml should error");
#[cfg(unix)]
assert_eq!(
"unknown variant `gzipaoeu`, expected one of `none`, `gzip`, `snappy`, `lz4`",
"unknown variant `gzipaoeu`, expected one of `none`, `gzip`, `snappy`, `lz4`, `zstd`",
format!("{connector_cfg}")
);

Expand Down
2 changes: 1 addition & 1 deletion crates/fluvio-controlplane-metadata/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "fluvio-controlplane-metadata"
edition = "2021"
version = "0.22.1"
version = "0.22.2"
authors = ["Fluvio Contributors <[email protected]>"]
description = "Metadata definition for Fluvio control plane"
repository = "https://github.com/infinyon/fluvio"
Expand Down
4 changes: 4 additions & 0 deletions crates/fluvio-controlplane-metadata/src/topic/spec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,8 @@ pub enum CompressionAlgorithm {
#[default]
#[fluvio(tag = 4)]
Any,
#[fluvio(tag = 5)]
Zstd,
}

#[derive(Debug, thiserror::Error)]
Expand All @@ -635,6 +637,7 @@ impl std::str::FromStr for CompressionAlgorithm {
"snappy" => Ok(CompressionAlgorithm::Snappy),
"lz4" => Ok(CompressionAlgorithm::Lz4),
"any" => Ok(CompressionAlgorithm::Any),
"zstd" => Ok(CompressionAlgorithm::Zstd),
_ => Err(InvalidCompressionAlgorithm),
}
}
Expand All @@ -647,6 +650,7 @@ impl std::fmt::Display for CompressionAlgorithm {
Self::Snappy => write!(f, "snappy"),
Self::Lz4 => write!(f, "lz4"),
Self::Any => write!(f, "any"),
Self::Zstd => write!(f, "zstd"),
}
}
}
Expand Down
1 change: 1 addition & 0 deletions crates/fluvio-spu/src/services/public/produce_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ fn validate_records<R: BatchRecords>(
CompressionAlgorithm::Gzip => batch_compression == Compression::Gzip,
CompressionAlgorithm::Snappy => batch_compression == Compression::Snappy,
CompressionAlgorithm::Lz4 => batch_compression == Compression::Lz4,
CompressionAlgorithm::Zstd => batch_compression == Compression::Zstd,
}
}) {
Ok(())
Expand Down
2 changes: 1 addition & 1 deletion crates/fluvio/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "fluvio"
version = "0.19.0"
version = "0.19.1"
edition = "2021"
license = "Apache-2.0"
authors = ["Fluvio Contributors <[email protected]>"]
Expand Down
4 changes: 3 additions & 1 deletion crates/fluvio/src/producer/memory_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,9 @@ impl MemoryBatch {
(self.current_size_uncompressed as f32
* match self.compression {
Compression::None => 1.0,
Compression::Gzip | Compression::Snappy | Compression::Lz4 => 0.5,
Compression::Gzip | Compression::Snappy | Compression::Lz4 | Compression::Zstd => {
0.5
}
}) as usize
+ Batch::<RawRecords>::default().write_size(0)
}
Expand Down
9 changes: 8 additions & 1 deletion crates/fluvio/src/producer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,14 @@ use fluvio_types::event::StickyEvent;
mod accumulator;
mod config;
mod error;
pub mod event;
mod output;
mod record;
mod partitioning;
mod partition_producer;
mod memory_batch;

pub mod event;

pub use fluvio_protocol::record::{RecordKey, RecordData};

use crate::FluvioError;
Expand Down Expand Up @@ -343,6 +344,12 @@ impl TopicProducer {
format!("Compression in the producer ({compression_config}) does not match with topic level compression (lz4)"),
)).into()),
},
CompressionAlgorithm::Zstd => match config.compression {
Some(Compression::Zstd) | None => Compression::Zstd,
Some(compression_config) => return Err(FluvioError::Producer(ProducerError::InvalidConfiguration(
format!("Compression in the producer ({compression_config}) does not match with topic level compression (zstd)" ),
)).into()),
},
CompressionAlgorithm::None => match config.compression {
Some(Compression::None) | None => Compression::None,
Some(compression_config) => return Err(FluvioError::Producer(ProducerError::InvalidConfiguration(
Expand Down
1 change: 1 addition & 0 deletions k8-util/helm/fluvio-sys/templates/crd_partition.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ spec:
- Gzip
- Snappy
- Lz4
- Zstd
status:
type: object
x-kubernetes-preserve-unknown-fields: true
Expand Down
1 change: 1 addition & 0 deletions k8-util/helm/fluvio-sys/templates/crd_topic.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ spec:
- Gzip
- Snappy
- Lz4
- Zstd
storage:
type: object
properties:
Expand Down
29 changes: 29 additions & 0 deletions tests/cli/fluvio_smoke_tests/e2e-basic.bats
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ setup_file() {
export TOPIC_NAME_12
debug_msg "Topic name: $TOPIC_NAME_12"

TOPIC_NAME_13=$(random_string)
export TOPIC_NAME_13
debug_msg "Topic name: $TOPIC_NAME_13"

MESSAGE="$(random_string 7)"
export MESSAGE
debug_msg "$MESSAGE"
Expand All @@ -79,6 +83,9 @@ setup_file() {
LZ4_MESSAGE="$MESSAGE-LZ4"
export LZ4_MESSAGE

ZSTD_MESSAGE="$MESSAGE-ZSTD"
export ZSTD_MESSAGE

LINGER_MESSAGE="$MESSAGE-LINGER"
export LINGER_MESSAGE

Expand Down Expand Up @@ -129,6 +136,11 @@ teardown_file() {
assert_success
run timeout 15s "$FLUVIO_BIN" topic create "$TOPIC_NAME_12"
assert_success

if [[ "$FLUVIO_CLI_RELEASE_CHANNEL" != "stable" && "$FLUVIO_CLUSTER_RELEASE_CHANNEL" != "stable" ]]; then
run timeout 15s "$FLUVIO_BIN" topic create "$TOPIC_NAME_13" --compression-type zstd
assert_success
fi
}

# Produce message
Expand All @@ -145,6 +157,12 @@ teardown_file() {
assert_success
run bash -c 'echo -e "$LZ4_MESSAGE" | timeout 15s "$FLUVIO_BIN" produce "$TOPIC_NAME_6" --compression lz4'
assert_success

if [[ "$FLUVIO_CLI_RELEASE_CHANNEL" != "stable" && "$FLUVIO_CLUSTER_RELEASE_CHANNEL" != "stable" ]]; then
run bash -c 'echo -e "$ZSTD_MESSAGE" | timeout 15s "$FLUVIO_BIN" produce "$TOPIC_NAME_13" --compression zstd'
assert_success
fi

run bash -c 'echo -e "$LINGER_MESSAGE" | timeout 15s "$FLUVIO_BIN" produce "$TOPIC_NAME_7" --linger 0s'
assert_success
run bash -c 'echo -e "$BATCH_MESSAGE" | timeout 15s "$FLUVIO_BIN" produce "$TOPIC_NAME_8" --batch-size 100'
Expand Down Expand Up @@ -211,6 +229,17 @@ teardown_file() {
assert_success
}

@test "Consume zstd message" {
EstebanBorai marked this conversation as resolved.
Show resolved Hide resolved
if [[ "$FLUVIO_CLI_RELEASE_CHANNEL" == "stable" || "$FLUVIO_CLUSTER_RELEASE_CHANNEL" == "stable" ]]; then
skip "don't run on stable version"
fi
morenol marked this conversation as resolved.
Show resolved Hide resolved

run timeout 15s "$FLUVIO_BIN" consume "$TOPIC_NAME_13" -B -d

assert_output --partial "$ZSTD_MESSAGE"
assert_success
}

@test "ReadCommitted Consume ReadCommitted message" {
run timeout 15s "$FLUVIO_BIN" consume "$TOPIC_NAME_9" -B -d --isolation read_committed

Expand Down