From f3ab40384bae88c1dbfc70e759804d83e6cc28ec Mon Sep 17 00:00:00 2001 From: Felipe Cardozo Date: Fri, 31 May 2024 16:06:42 -0300 Subject: [PATCH] feat: add consumer --mirror argument to consume only from the selected (#4048) --- Cargo.lock | 2 +- crates/fluvio-cli/src/client/consume/mod.rs | 9 ++++++ crates/fluvio/Cargo.toml | 2 +- crates/fluvio/src/consumer/config.rs | 4 +++ crates/fluvio/src/fluvio.rs | 29 ++++++++++++++++++- .../e2e/fluvio-core.bats | 11 +++++-- 6 files changed, 51 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index de058a6289..ef632f8bcb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2195,7 +2195,7 @@ dependencies = [ [[package]] name = "fluvio" -version = "0.22.2" +version = "0.22.3" dependencies = [ "anyhow", "async-channel 1.9.0", diff --git a/crates/fluvio-cli/src/client/consume/mod.rs b/crates/fluvio-cli/src/client/consume/mod.rs index 92cf2d48d3..ea789948b9 100644 --- a/crates/fluvio-cli/src/client/consume/mod.rs +++ b/crates/fluvio-cli/src/client/consume/mod.rs @@ -84,6 +84,10 @@ mod cmd { #[arg(short = 'p', long, value_name = "integer")] pub partition: Vec, + /// Remote cluster to consume from + #[arg(short = 'm', long)] + pub mirror: Option, + /// Consume records from all partitions #[arg(short = 'A', long = "all-partitions", conflicts_with_all = &["partition"])] pub all_partitions: bool, @@ -316,6 +320,10 @@ mod cmd { builder.offset_flush(DEFAULT_OFFSET_FLUSH_INTERVAL); } + if let Some(ref mirror) = self.mirror { + builder.mirror(mirror.clone()); + } + if let Some(max_bytes) = self.max_bytes { builder.max_bytes(max_bytes); } @@ -780,6 +788,7 @@ mod cmd { ConsumeOpt { topic: "TOPIC_NAME".to_string(), partition: Default::default(), + mirror: Default::default(), all_partitions: Default::default(), disable_continuous: Default::default(), disable_progressbar: Default::default(), diff --git a/crates/fluvio/Cargo.toml b/crates/fluvio/Cargo.toml index 317f10ca3a..53762b01b2 100644 --- a/crates/fluvio/Cargo.toml +++ b/crates/fluvio/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "fluvio" -version = "0.22.2" +version = "0.22.3" edition = "2021" license = "Apache-2.0" authors = ["Fluvio Contributors "] diff --git a/crates/fluvio/src/consumer/config.rs b/crates/fluvio/src/consumer/config.rs index dec5a73214..3a8a0d5429 100644 --- a/crates/fluvio/src/consumer/config.rs +++ b/crates/fluvio/src/consumer/config.rs @@ -60,6 +60,8 @@ pub struct ConsumerConfigExt { #[builder(default, setter(custom))] pub partition: Vec, #[builder(default, setter(strip_option, into))] + pub mirror: Option, + #[builder(default, setter(strip_option, into))] pub offset_consumer: Option, pub offset_start: Offset, #[builder(default)] @@ -93,6 +95,7 @@ impl ConsumerConfigExt { let Self { topic: _, partition: _, + mirror: _, offset_consumer, offset_start, disable_continuous, @@ -132,6 +135,7 @@ impl From for ConsumerConfig { let ConsumerConfigExt { topic: _, partition: _, + mirror: _, offset_consumer: _, offset_start: _, offset_strategy: _, diff --git a/crates/fluvio/src/fluvio.rs b/crates/fluvio/src/fluvio.rs index 9b9141a5e9..0d117e2578 100644 --- a/crates/fluvio/src/fluvio.rs +++ b/crates/fluvio/src/fluvio.rs @@ -1,6 +1,10 @@ use std::convert::TryFrom; use std::sync::Arc; +use fluvio_sc_schema::partition::PartitionMirrorConfig; +use fluvio_sc_schema::topic::MirrorConfig; +use fluvio_sc_schema::topic::PartitionMap; +use fluvio_sc_schema::topic::ReplicaSpec; use tracing::{debug, info}; use tokio::sync::OnceCell; use anyhow::{anyhow, Result}; @@ -323,7 +327,30 @@ impl Fluvio { .await? .ok_or_else(|| FluvioError::TopicNotFound(topic.to_string()))? .spec; - let partitions = if config.partition.is_empty() { + + let mirror_partition = if let Some(ref mirror) = &config.mirror { + match topic_spec.replicas() { + ReplicaSpec::Mirror(MirrorConfig::Home(home_mirror_config)) => { + let partitions_maps = + Vec::::from(home_mirror_config.as_partition_maps()); + partitions_maps.iter().find_map(|p| { + if let Some(PartitionMirrorConfig::Home(remote)) = &p.mirror { + if remote.remote_cluster == *mirror { + return Some(p.id); + } + } + None + }) + } + _ => None, + } + } else { + None + }; + + let partitions = if let Some(partition) = mirror_partition { + vec![partition] + } else if config.partition.is_empty() { (0..topic_spec.partitions()).collect() } else { config.partition.clone() diff --git a/tests/cli/mirroring_smoke_tests/e2e/fluvio-core.bats b/tests/cli/mirroring_smoke_tests/e2e/fluvio-core.bats index 791d3f80f6..d8ca93fa7a 100644 --- a/tests/cli/mirroring_smoke_tests/e2e/fluvio-core.bats +++ b/tests/cli/mirroring_smoke_tests/e2e/fluvio-core.bats @@ -178,19 +178,24 @@ setup_file() { assert_success } -#TODO: use --mirror argument when it is available -@test "Can consume message from mirror topic produced from remote 1" { +@test "Can consume message from mirror topic produced from remote 1 by partition or remote" { sleep 5 run timeout 15s "$FLUVIO_BIN" consume "$TOPIC_NAME" -p 0 -B -d + assert_output 1$'\n'a$'\n'2$'\n'b + assert_success + run timeout 15s "$FLUVIO_BIN" consume "$TOPIC_NAME" --mirror "$REMOTE_NAME" -B -d assert_output 1$'\n'a$'\n'2$'\n'b assert_success } -@test "Can consume message from mirror topic produced from remote 2" { +@test "Can consume message from mirror topic produced from remote 2 by partition or remote" { sleep 5 run timeout 15s "$FLUVIO_BIN" consume "$TOPIC_NAME" -p 1 -B -d + assert_output 9$'\n'z$'\n'8$'\n'y + assert_success + run timeout 15s "$FLUVIO_BIN" consume "$TOPIC_NAME" --mirror "$REMOTE_NAME_2" -B -d assert_output 9$'\n'z$'\n'8$'\n'y assert_success }