From 77e9620b3b90944be171afe70a7d426827cf839c Mon Sep 17 00:00:00 2001 From: Esteban Borai Date: Sun, 23 Apr 2023 01:51:47 -0400 Subject: [PATCH 1/2] feat: introduce zstd compression --- .github/workflows/ci.yml | 1 - Cargo.lock | 28 ++++++++++++-- Cargo.toml | 2 +- crates/fluvio-cli/src/client/produce/mod.rs | 3 +- crates/fluvio-compression/Cargo.toml | 4 +- crates/fluvio-compression/README.md | 2 +- crates/fluvio-compression/src/lib.rs | 10 +++++ crates/fluvio-compression/src/zstd.rs | 37 +++++++++++++++++++ .../src/config/mod.rs | 2 +- .../fluvio-controlplane-metadata/Cargo.toml | 2 +- .../src/topic/spec.rs | 4 ++ .../src/services/public/produce_handler.rs | 1 + crates/fluvio/src/producer/memory_batch.rs | 4 +- crates/fluvio/src/producer/mod.rs | 9 ++++- .../fluvio-sys/templates/crd_partition.yaml | 1 + .../helm/fluvio-sys/templates/crd_topic.yaml | 1 + tests/cli/fluvio_smoke_tests/e2e-basic.bats | 29 +++++++++++++++ 17 files changed, 125 insertions(+), 15 deletions(-) create mode 100644 crates/fluvio-compression/src/zstd.rs diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 57d7507f44..40ebe1ac22 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -1032,7 +1032,6 @@ jobs: max_attempts: 3 command: make cli-fluvio-smoke - # test smdk - name: Download artifact - smdk if: matrix.test == 'smdk' diff --git a/Cargo.lock b/Cargo.lock index 0e66967fcd..6ac6c55fa5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2600,7 +2600,7 @@ dependencies = [ [[package]] name = "fluvio-compression" -version = "0.2.5" +version = "0.3.0" dependencies = [ "bytes 1.4.0", "flate2", @@ -2608,6 +2608,7 @@ dependencies = [ "serde", "snap", "thiserror", + "zstd 0.12.3+zstd.1.5.2", ] [[package]] @@ -2686,7 +2687,7 @@ dependencies = [ [[package]] name = "fluvio-controlplane-metadata" -version = "0.22.1" +version = "0.22.2" dependencies = [ "async-trait", "base64 0.21.0", @@ -7919,7 +7920,7 @@ dependencies = [ "sha2 0.10.6", "toml 0.5.11", "windows-sys 0.45.0", - "zstd", + "zstd 0.11.2+zstd.1.5.2", ] [[package]] @@ -8615,7 +8616,16 @@ version = "0.11.2+zstd.1.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "20cc960326ece64f010d2d2107537f26dc589a6573a316bd5b1dba685fa5fde4" dependencies = [ - "zstd-safe", + "zstd-safe 5.0.2+zstd.1.5.2", +] + +[[package]] +name = "zstd" +version = "0.12.3+zstd.1.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76eea132fb024e0e13fd9c2f5d5d595d8a967aa72382ac2f9d39fcc95afd0806" +dependencies = [ + "zstd-safe 6.0.5+zstd.1.5.4", ] [[package]] @@ -8628,6 +8638,16 @@ dependencies = [ "zstd-sys", ] +[[package]] +name = "zstd-safe" +version = "6.0.5+zstd.1.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d56d9e60b4b1758206c238a10165fbcae3ca37b01744e394c463463f6529d23b" +dependencies = [ + "libc", + "zstd-sys", +] + [[package]] name = "zstd-sys" version = "2.0.8+zstd.1.5.5" diff --git a/Cargo.toml b/Cargo.toml index 8f5e7a4eb8..1543588c37 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -138,7 +138,7 @@ fluvio = { version = "0.19.0", path = "crates/fluvio" } fluvio-auth = { path = "crates/fluvio-auth" } fluvio-channel = { path = "crates/fluvio-channel" } fluvio-cli-common = { path = "crates/fluvio-cli-common"} -fluvio-compression = { version = "0.2", path = "crates/fluvio-compression" } +fluvio-compression = { version = "0.3", path = "crates/fluvio-compression" } fluvio-connector-package = { path = "crates/fluvio-connector-package/" } fluvio-controlplane = { path = "crates/fluvio-controlplane" } fluvio-controlplane-metadata = { version = "0.22.0", default-features = false, path = "crates/fluvio-controlplane-metadata" } diff --git a/crates/fluvio-cli/src/client/produce/mod.rs b/crates/fluvio-cli/src/client/produce/mod.rs index 5d32d668a9..8107497a92 100644 --- a/crates/fluvio-cli/src/client/produce/mod.rs +++ b/crates/fluvio-cli/src/client/produce/mod.rs @@ -4,7 +4,6 @@ mod stats; pub use cmd::ProduceOpt; mod cmd { - use std::sync::Arc; use std::io::{BufReader, BufRead}; use std::collections::BTreeMap; @@ -91,7 +90,7 @@ mod cmd { pub raw: bool, /// Compression algorithm to use when sending records. - /// Supported values: none, gzip, snappy and lz4. + /// Supported values: none, gzip, snappy, zstd and lz4. #[arg(long)] pub compression: Option, diff --git a/crates/fluvio-compression/Cargo.toml b/crates/fluvio-compression/Cargo.toml index 2e8ea953f7..02b5b175c2 100644 --- a/crates/fluvio-compression/Cargo.toml +++ b/crates/fluvio-compression/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "fluvio-compression" -version = "0.2.5" +version = "0.3.0" edition = "2021" license = "Apache-2.0" authors = ["Fluvio Contributors "] @@ -19,4 +19,4 @@ snap = { version = "1" } serde = { workspace = true, features = ['derive'] } lz4_flex = { version = "0.10.0", default-features = false, features = ["safe-decode", "safe-encode", "frame"] } thiserror = { workspace = true } - +zstd = { version = "0.12.3+zstd.1.5.2", features = ['wasm'], default-features = false } diff --git a/crates/fluvio-compression/README.md b/crates/fluvio-compression/README.md index e49e03dd43..8966595600 100644 --- a/crates/fluvio-compression/README.md +++ b/crates/fluvio-compression/README.md @@ -4,4 +4,4 @@ Library with handlers to compress and uncompress data in the fluvio protocol. In fluvio, compression is done in producer side, then consumers and SPU when it is using SmartModules, uncompress the data using the compression information that is in the attributes of the batch. -Currently, the supported compressions codecs are None (default), Gzip, Snappy and LZ4. +Currently, the supported compressions codecs are None (default), Gzip, Snappy, Zstd and LZ4. diff --git a/crates/fluvio-compression/src/lib.rs b/crates/fluvio-compression/src/lib.rs index 9d9fc1b0fb..832c888565 100644 --- a/crates/fluvio-compression/src/lib.rs +++ b/crates/fluvio-compression/src/lib.rs @@ -6,6 +6,7 @@ mod error; mod gzip; mod snappy; mod lz4; +mod zstd; pub use error::CompressionError; use serde::{Serialize, Deserialize}; @@ -21,6 +22,7 @@ pub enum Compression { Gzip = 1, Snappy = 2, Lz4 = 3, + Zstd = 4, } impl TryFrom for Compression { @@ -31,6 +33,7 @@ impl TryFrom for Compression { 1 => Ok(Compression::Gzip), 2 => Ok(Compression::Snappy), 3 => Ok(Compression::Lz4), + 4 => Ok(Compression::Zstd), _ => Err(CompressionError::UnknownCompressionFormat(format!( "i8 representation: {v}" ))), @@ -47,6 +50,7 @@ impl FromStr for Compression { "gzip" => Ok(Compression::Gzip), "snappy" => Ok(Compression::Snappy), "lz4" => Ok(Compression::Lz4), + "zstd" => Ok(Compression::Zstd), _ => Err(CompressionError::UnknownCompressionFormat(s.into())), } } @@ -60,6 +64,7 @@ impl Compression { Compression::Gzip => gzip::compress(src), Compression::Snappy => snappy::compress(src), Compression::Lz4 => lz4::compress(src), + Compression::Zstd => zstd::compress(src), } } @@ -79,6 +84,10 @@ impl Compression { let output = lz4::uncompress(src)?; Ok(Some(output)) } + Compression::Zstd => { + let output = zstd::uncompress(src)?; + Ok(Some(output)) + } } } } @@ -89,6 +98,7 @@ impl std::fmt::Display for Compression { Compression::Gzip => write!(f, "gzip"), Compression::Snappy => write!(f, "snappy"), Compression::Lz4 => write!(f, "lz4"), + Compression::Zstd => write!(f, "zstd"), } } } diff --git a/crates/fluvio-compression/src/zstd.rs b/crates/fluvio-compression/src/zstd.rs new file mode 100644 index 0000000000..8a532a27f9 --- /dev/null +++ b/crates/fluvio-compression/src/zstd.rs @@ -0,0 +1,37 @@ +use std::io::{Read, Write}; + +use bytes::{BufMut, Bytes, BytesMut}; +use zstd::{Decoder, Encoder}; + +use crate::error::CompressionError; + +pub fn compress(src: &[u8]) -> Result { + let mut encoder = Encoder::new(BytesMut::new().writer(), 1)?; + encoder.write_all(src)?; + Ok(encoder.finish()?.into_inner().freeze()) +} + +pub fn uncompress(src: T) -> Result, CompressionError> { + let mut decoder = Decoder::new(src)?; + let mut buffer: Vec = Vec::new(); + decoder.read_to_end(&mut buffer)?; + Ok(buffer) +} + +#[cfg(test)] +mod tests { + use bytes::Buf; + use super::*; + + #[test] + fn test_compress_decompress() { + let text = "FLUVIO_AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"; + let compressed = compress(text.as_bytes()).unwrap(); + + assert!(compressed.len() < text.as_bytes().len()); + + let uncompressed = String::from_utf8(uncompress(compressed.reader()).unwrap()).unwrap(); + + assert_eq!(uncompressed, text); + } +} diff --git a/crates/fluvio-connector-package/src/config/mod.rs b/crates/fluvio-connector-package/src/config/mod.rs index 4b7e381377..014c19151a 100644 --- a/crates/fluvio-connector-package/src/config/mod.rs +++ b/crates/fluvio-connector-package/src/config/mod.rs @@ -417,7 +417,7 @@ mod tests { .expect_err("This yaml should error"); #[cfg(unix)] assert_eq!( - "unknown variant `gzipaoeu`, expected one of `none`, `gzip`, `snappy`, `lz4`", + "unknown variant `gzipaoeu`, expected one of `none`, `gzip`, `snappy`, `lz4`, `zstd`", format!("{connector_cfg}") ); diff --git a/crates/fluvio-controlplane-metadata/Cargo.toml b/crates/fluvio-controlplane-metadata/Cargo.toml index 59b7964547..b7c57f6baa 100644 --- a/crates/fluvio-controlplane-metadata/Cargo.toml +++ b/crates/fluvio-controlplane-metadata/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "fluvio-controlplane-metadata" edition = "2021" -version = "0.22.1" +version = "0.22.2" authors = ["Fluvio Contributors "] description = "Metadata definition for Fluvio control plane" repository = "https://github.com/infinyon/fluvio" diff --git a/crates/fluvio-controlplane-metadata/src/topic/spec.rs b/crates/fluvio-controlplane-metadata/src/topic/spec.rs index 63b453164e..795768b124 100644 --- a/crates/fluvio-controlplane-metadata/src/topic/spec.rs +++ b/crates/fluvio-controlplane-metadata/src/topic/spec.rs @@ -619,6 +619,8 @@ pub enum CompressionAlgorithm { #[default] #[fluvio(tag = 4)] Any, + #[fluvio(tag = 5)] + Zstd, } #[derive(Debug, thiserror::Error)] @@ -635,6 +637,7 @@ impl std::str::FromStr for CompressionAlgorithm { "snappy" => Ok(CompressionAlgorithm::Snappy), "lz4" => Ok(CompressionAlgorithm::Lz4), "any" => Ok(CompressionAlgorithm::Any), + "zstd" => Ok(CompressionAlgorithm::Zstd), _ => Err(InvalidCompressionAlgorithm), } } @@ -647,6 +650,7 @@ impl std::fmt::Display for CompressionAlgorithm { Self::Snappy => write!(f, "snappy"), Self::Lz4 => write!(f, "lz4"), Self::Any => write!(f, "any"), + Self::Zstd => write!(f, "zstd"), } } } diff --git a/crates/fluvio-spu/src/services/public/produce_handler.rs b/crates/fluvio-spu/src/services/public/produce_handler.rs index 33beaf2624..1261534b98 100644 --- a/crates/fluvio-spu/src/services/public/produce_handler.rs +++ b/crates/fluvio-spu/src/services/public/produce_handler.rs @@ -259,6 +259,7 @@ fn validate_records( CompressionAlgorithm::Gzip => batch_compression == Compression::Gzip, CompressionAlgorithm::Snappy => batch_compression == Compression::Snappy, CompressionAlgorithm::Lz4 => batch_compression == Compression::Lz4, + CompressionAlgorithm::Zstd => batch_compression == Compression::Zstd, } }) { Ok(()) diff --git a/crates/fluvio/src/producer/memory_batch.rs b/crates/fluvio/src/producer/memory_batch.rs index 97c791f0fb..5f1e90be7c 100644 --- a/crates/fluvio/src/producer/memory_batch.rs +++ b/crates/fluvio/src/producer/memory_batch.rs @@ -76,7 +76,9 @@ impl MemoryBatch { (self.current_size_uncompressed as f32 * match self.compression { Compression::None => 1.0, - Compression::Gzip | Compression::Snappy | Compression::Lz4 => 0.5, + Compression::Gzip | Compression::Snappy | Compression::Lz4 | Compression::Zstd => { + 0.5 + } }) as usize + Batch::::default().write_size(0) } diff --git a/crates/fluvio/src/producer/mod.rs b/crates/fluvio/src/producer/mod.rs index cc5fe08ca3..0ed1de677e 100644 --- a/crates/fluvio/src/producer/mod.rs +++ b/crates/fluvio/src/producer/mod.rs @@ -14,13 +14,14 @@ use fluvio_types::event::StickyEvent; mod accumulator; mod config; mod error; -pub mod event; mod output; mod record; mod partitioning; mod partition_producer; mod memory_batch; +pub mod event; + pub use fluvio_protocol::record::{RecordKey, RecordData}; use crate::FluvioError; @@ -343,6 +344,12 @@ impl TopicProducer { format!("Compression in the producer ({compression_config}) does not match with topic level compression (lz4)"), )).into()), }, + CompressionAlgorithm::Zstd => match config.compression { + Some(Compression::Zstd) | None => Compression::Zstd, + Some(compression_config) => return Err(FluvioError::Producer(ProducerError::InvalidConfiguration( + format!("Compression in the producer ({compression_config}) does not match with topic level compression (zstd)" ), + )).into()), + }, CompressionAlgorithm::None => match config.compression { Some(Compression::None) | None => Compression::None, Some(compression_config) => return Err(FluvioError::Producer(ProducerError::InvalidConfiguration( diff --git a/k8-util/helm/fluvio-sys/templates/crd_partition.yaml b/k8-util/helm/fluvio-sys/templates/crd_partition.yaml index a147f4a90a..0bd3f6fd6e 100644 --- a/k8-util/helm/fluvio-sys/templates/crd_partition.yaml +++ b/k8-util/helm/fluvio-sys/templates/crd_partition.yaml @@ -53,6 +53,7 @@ spec: - Gzip - Snappy - Lz4 + - Zstd status: type: object x-kubernetes-preserve-unknown-fields: true diff --git a/k8-util/helm/fluvio-sys/templates/crd_topic.yaml b/k8-util/helm/fluvio-sys/templates/crd_topic.yaml index 0fbfcebce8..060310a0db 100644 --- a/k8-util/helm/fluvio-sys/templates/crd_topic.yaml +++ b/k8-util/helm/fluvio-sys/templates/crd_topic.yaml @@ -80,6 +80,7 @@ spec: - Gzip - Snappy - Lz4 + - Zstd storage: type: object properties: diff --git a/tests/cli/fluvio_smoke_tests/e2e-basic.bats b/tests/cli/fluvio_smoke_tests/e2e-basic.bats index ffa808fb22..50505795a8 100644 --- a/tests/cli/fluvio_smoke_tests/e2e-basic.bats +++ b/tests/cli/fluvio_smoke_tests/e2e-basic.bats @@ -60,6 +60,10 @@ setup_file() { export TOPIC_NAME_12 debug_msg "Topic name: $TOPIC_NAME_12" + TOPIC_NAME_13=$(random_string) + export TOPIC_NAME_13 + debug_msg "Topic name: $TOPIC_NAME_13" + MESSAGE="$(random_string 7)" export MESSAGE debug_msg "$MESSAGE" @@ -79,6 +83,9 @@ setup_file() { LZ4_MESSAGE="$MESSAGE-LZ4" export LZ4_MESSAGE + ZSTD_MESSAGE="$MESSAGE-ZSTD" + export ZSTD_MESSAGE + LINGER_MESSAGE="$MESSAGE-LINGER" export LINGER_MESSAGE @@ -129,6 +136,11 @@ teardown_file() { assert_success run timeout 15s "$FLUVIO_BIN" topic create "$TOPIC_NAME_12" assert_success + + if [[ "$FLUVIO_CLI_RELEASE_CHANNEL" != "stable" && "$FLUVIO_CLUSTER_RELEASE_CHANNEL" != "stable" ]]; then + run timeout 15s "$FLUVIO_BIN" topic create "$TOPIC_NAME_13" --compression-type zstd + assert_success + fi } # Produce message @@ -145,6 +157,12 @@ teardown_file() { assert_success run bash -c 'echo -e "$LZ4_MESSAGE" | timeout 15s "$FLUVIO_BIN" produce "$TOPIC_NAME_6" --compression lz4' assert_success + + if [[ "$FLUVIO_CLI_RELEASE_CHANNEL" != "stable" && "$FLUVIO_CLUSTER_RELEASE_CHANNEL" != "stable" ]]; then + run bash -c 'echo -e "$ZSTD_MESSAGE" | timeout 15s "$FLUVIO_BIN" produce "$TOPIC_NAME_13" --compression zstd' + assert_success + fi + run bash -c 'echo -e "$LINGER_MESSAGE" | timeout 15s "$FLUVIO_BIN" produce "$TOPIC_NAME_7" --linger 0s' assert_success run bash -c 'echo -e "$BATCH_MESSAGE" | timeout 15s "$FLUVIO_BIN" produce "$TOPIC_NAME_8" --batch-size 100' @@ -211,6 +229,17 @@ teardown_file() { assert_success } +@test "Consume zstd message" { + if [[ "$FLUVIO_CLI_RELEASE_CHANNEL" == "stable" || "$FLUVIO_CLUSTER_RELEASE_CHANNEL" == "stable" ]]; then + skip "don't run on stable version" + fi + + run timeout 15s "$FLUVIO_BIN" consume "$TOPIC_NAME_13" -B -d + + assert_output --partial "$ZSTD_MESSAGE" + assert_success +} + @test "ReadCommitted Consume ReadCommitted message" { run timeout 15s "$FLUVIO_BIN" consume "$TOPIC_NAME_9" -B -d --isolation read_committed From 653ebbac82c8a93a4ba3cd0b28843a5f860519cf Mon Sep 17 00:00:00 2001 From: Esteban Borai Date: Tue, 16 May 2023 17:22:44 -0400 Subject: [PATCH 2/2] chore: bump version --- crates/fluvio/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/fluvio/Cargo.toml b/crates/fluvio/Cargo.toml index 3e0fd796f5..3e7477c736 100644 --- a/crates/fluvio/Cargo.toml +++ b/crates/fluvio/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "fluvio" -version = "0.19.0" +version = "0.19.1" edition = "2021" license = "Apache-2.0" authors = ["Fluvio Contributors "]