Skip to content

Commit

Permalink
feat: add consumer --mirror argument to consume only from the selected
Browse files Browse the repository at this point in the history
  • Loading branch information
fraidev committed May 30, 2024
1 parent 716f0a2 commit 2a7002d
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 6 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions crates/fluvio-cli/src/client/consume/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ mod cmd {
#[arg(short = 'p', long, value_name = "integer")]
pub partition: Vec<PartitionId>,

/// Remote cluster to consume from
#[arg(short = 'm', long)]
pub mirror: Option<String>,

/// Consume records from all partitions
#[arg(short = 'A', long = "all-partitions", conflicts_with_all = &["partition"])]
pub all_partitions: bool,
Expand Down Expand Up @@ -316,6 +320,10 @@ mod cmd {
builder.offset_flush(DEFAULT_OFFSET_FLUSH_INTERVAL);
}

if let Some(ref mirror) = self.mirror {
builder.mirror(mirror.clone());
}

if let Some(max_bytes) = self.max_bytes {
builder.max_bytes(max_bytes);
}
Expand Down Expand Up @@ -780,6 +788,7 @@ mod cmd {
ConsumeOpt {
topic: "TOPIC_NAME".to_string(),
partition: Default::default(),
mirror: Default::default(),
all_partitions: Default::default(),
disable_continuous: Default::default(),
disable_progressbar: Default::default(),
Expand Down
2 changes: 1 addition & 1 deletion crates/fluvio/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "fluvio"
version = "0.22.2"
version = "0.22.3"
edition = "2021"
license = "Apache-2.0"
authors = ["Fluvio Contributors <[email protected]>"]
Expand Down
4 changes: 4 additions & 0 deletions crates/fluvio/src/consumer/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ pub struct ConsumerConfigExt {
#[builder(default, setter(custom))]
pub partition: Vec<PartitionId>,
#[builder(default, setter(strip_option, into))]
pub mirror: Option<String>,
#[builder(default, setter(strip_option, into))]
pub offset_consumer: Option<String>,
pub offset_start: Offset,
#[builder(default)]
Expand Down Expand Up @@ -93,6 +95,7 @@ impl ConsumerConfigExt {
let Self {
topic: _,
partition: _,
mirror: _,
offset_consumer,
offset_start,
disable_continuous,
Expand Down Expand Up @@ -132,6 +135,7 @@ impl From<ConsumerConfigExt> for ConsumerConfig {
let ConsumerConfigExt {
topic: _,
partition: _,
mirror: _,
offset_consumer: _,
offset_start: _,
offset_strategy: _,
Expand Down
29 changes: 28 additions & 1 deletion crates/fluvio/src/fluvio.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
use std::convert::TryFrom;
use std::sync::Arc;

use fluvio_sc_schema::partition::PartitionMirrorConfig;
use fluvio_sc_schema::topic::MirrorConfig;
use fluvio_sc_schema::topic::PartitionMap;
use fluvio_sc_schema::topic::ReplicaSpec;
use tracing::{debug, info};
use tokio::sync::OnceCell;
use anyhow::{anyhow, Result};
Expand Down Expand Up @@ -323,7 +327,30 @@ impl Fluvio {
.await?
.ok_or_else(|| FluvioError::TopicNotFound(topic.to_string()))?
.spec;
let partitions = if config.partition.is_empty() {

let mirror_partition = if let Some(ref mirror) = &config.mirror {
match topic_spec.replicas() {
ReplicaSpec::Mirror(MirrorConfig::Home(home_mirror_config)) => {
let partitions_maps =
Vec::<PartitionMap>::from(home_mirror_config.as_partition_maps());
partitions_maps.iter().find_map(|p| {
if let Some(PartitionMirrorConfig::Home(remote)) = &p.mirror {
if remote.remote_cluster == *mirror {
return Some(p.id);
}
}
None
})
}
_ => None,
}
} else {
None
};

let partitions = if let Some(partition) = mirror_partition {
vec![partition]
} else if config.partition.is_empty() {
(0..topic_spec.partitions()).collect()
} else {
config.partition.clone()
Expand Down
11 changes: 8 additions & 3 deletions tests/cli/mirroring_smoke_tests/e2e/fluvio-core.bats
Original file line number Diff line number Diff line change
Expand Up @@ -171,19 +171,24 @@ setup_file() {
assert_success
}

#TODO: use --mirror argument when it is available
@test "Can consume message from mirror topic produced from remote 1" {
@test "Can consume message from mirror topic produced from remote 1 by partition or remote" {
sleep 5
run timeout 15s "$FLUVIO_BIN" consume "$TOPIC_NAME" -p 0 -B -d
assert_output 1$'\n'a$'\n'2$'\n'b
assert_success

run timeout 15s "$FLUVIO_BIN" consume "$TOPIC_NAME" --mirror "$REMOTE_NAME" -B -d
assert_output 1$'\n'a$'\n'2$'\n'b
assert_success
}

@test "Can consume message from mirror topic produced from remote 2" {
@test "Can consume message from mirror topic produced from remote 2 by partition or remote" {
sleep 5
run timeout 15s "$FLUVIO_BIN" consume "$TOPIC_NAME" -p 1 -B -d
assert_output 9$'\n'z$'\n'8$'\n'y
assert_success

run timeout 15s "$FLUVIO_BIN" consume "$TOPIC_NAME" --mirror "$REMOTE_NAME_2" -B -d
assert_output 9$'\n'z$'\n'8$'\n'y
assert_success
}
Expand Down

0 comments on commit 2a7002d

Please sign in to comment.