diff --git a/Cargo.lock b/Cargo.lock index 81ec3b249c..b28c84a386 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2590,7 +2590,7 @@ dependencies = [ "serde_json", "serde_yaml 0.9.25", "tracing", - "trybuild 1.0.82", + "trybuild 1.0.42", ] [[package]] @@ -2788,7 +2788,7 @@ dependencies = [ [[package]] name = "fluvio-package-index" -version = "0.7.4" +version = "0.7.5" dependencies = [ "http", "once_cell", @@ -2802,7 +2802,7 @@ dependencies = [ [[package]] name = "fluvio-protocol" -version = "0.10.5" +version = "0.10.6" dependencies = [ "bytes 1.4.0", "content_inspector", diff --git a/crates/fluvio-connector-common/src/config.rs b/crates/fluvio-connector-common/src/config.rs index 98f19fdc31..b603366543 100644 --- a/crates/fluvio-connector-common/src/config.rs +++ b/crates/fluvio-connector-common/src/config.rs @@ -30,7 +30,7 @@ pub fn get_value(value: Value, root: Option<&str>) -> Result { #[cfg(test)] mod tests { - use fluvio_connector_package::config::MetaConfigV1; + use fluvio_connector_package::config::{MetaConfigV1, ConsumerPartitionConfig}; #[test] fn test_from_value() { @@ -57,7 +57,7 @@ mod tests { .consumer .expect("expected some consume config") .partition, - Some(0) + ConsumerPartitionConfig::One(0) ); } } diff --git a/crates/fluvio-connector-common/src/consumer.rs b/crates/fluvio-connector-common/src/consumer.rs index 07559a1b6d..ae888433a9 100644 --- a/crates/fluvio-connector-common/src/consumer.rs +++ b/crates/fluvio-connector-common/src/consumer.rs @@ -1,5 +1,6 @@ -use fluvio::{FluvioConfig, Fluvio}; +use fluvio::{FluvioConfig, Fluvio, PartitionSelectionStrategy}; use fluvio::dataplane::record::ConsumerRecord; +use fluvio_connector_package::config::ConsumerPartitionConfig; use fluvio_sc_schema::errors::ErrorCode; use futures::StreamExt; use crate::{config::ConnectorConfig, Result}; @@ -24,28 +25,50 @@ pub async fn consumer_stream_from_config( let fluvio = Fluvio::connect_with_config(&cluster_config).await?; ensure_topic_exists(config).await?; - let consumer = fluvio - .partition_consumer( - config.meta().topic(), - config - .meta() - .consumer() - .and_then(|c| c.partition) - .unwrap_or_default(), - ) - .await?; let mut builder = fluvio::ConsumerConfig::builder(); - if let Some(max_bytes) = config.meta().consumer().and_then(|c| c.max_bytes) { builder.max_bytes(max_bytes.as_u64() as i32); } - if let Some(smartmodules) = smartmodule_vec_from_config(config) { builder.smartmodule(smartmodules); } - let config = builder.build()?; + let consumer_config = builder.build()?; + let consumer_partition = config + .meta() + .consumer() + .map(|c| c.partition.clone()) + .unwrap_or_default(); let offset = fluvio::Offset::end(); - let stream = consumer.stream_with_config(offset, config).await?; + let topic = config.meta().topic().to_string(); + let stream = match consumer_partition { + ConsumerPartitionConfig::One(partition) => { + let consumer = fluvio.partition_consumer(topic, partition).await?; + consumer + .stream_with_config(offset, consumer_config) + .await? + .boxed() + } + ConsumerPartitionConfig::All => { + let consumer = fluvio + .consumer(PartitionSelectionStrategy::All(topic)) + .await?; + consumer + .stream_with_config(offset, consumer_config) + .await? + .boxed() + } + ConsumerPartitionConfig::Many(partitions) => { + let partitions = partitions.into_iter().map(|p| (topic.clone(), p)).collect(); + let consumer = fluvio + .consumer(PartitionSelectionStrategy::Multiple(partitions)) + .await?; + consumer + .stream_with_config(offset, consumer_config) + .await? + .boxed() + } + }; + Ok((fluvio, stream)) } diff --git a/crates/fluvio-connector-package/src/config/mod.rs b/crates/fluvio-connector-package/src/config/mod.rs index 6b3a7d2961..fd1196c751 100644 --- a/crates/fluvio-connector-package/src/config/mod.rs +++ b/crates/fluvio-connector-package/src/config/mod.rs @@ -8,9 +8,11 @@ use std::time::Duration; use fluvio_controlplane_metadata::topic::config::TopicConfig; use fluvio_types::PartitionId; +use serde::de::{Visitor, SeqAccess}; +use serde::ser::SerializeSeq; use tracing::debug; use anyhow::Result; -use serde::{Deserialize, Serialize}; +use serde::{Deserialize, Serialize, Deserializer, Serializer}; use bytesize::ByteSize; use fluvio_smartengine::transformation::TransformationConfig; @@ -272,8 +274,8 @@ impl MetaConfig<'_> { #[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)] pub struct ConsumerParameters { - #[serde(default, skip_serializing_if = "Option::is_none")] - pub partition: Option, + #[serde(default, skip_serializing_if = "ConsumerPartitionConfig::is_default")] + pub partition: ConsumerPartitionConfig, #[serde( with = "bytesize_serde", skip_serializing_if = "Option::is_none", @@ -386,6 +388,102 @@ impl Serialize for SecretName { } } +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum ConsumerPartitionConfig { + All, + One(PartitionId), + Many(Vec), +} + +impl Default for ConsumerPartitionConfig { + fn default() -> Self { + Self::One(0) + } +} + +impl ConsumerPartitionConfig { + pub fn is_default(&self) -> bool { + matches!(self, ConsumerPartitionConfig::One(partition) if partition.eq(&PartitionId::default())) + } +} + +struct PartitionConfigVisitor; +impl<'de> Visitor<'de> for PartitionConfigVisitor { + type Value = ConsumerPartitionConfig; + + fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { + formatter.write_str("integer, sequence of integers or `all` string") + } + + fn visit_str(self, v: &str) -> Result + where + E: serde::de::Error, + { + if v.eq("all") { + Ok(ConsumerPartitionConfig::All) + } else { + Err(serde::de::Error::invalid_value( + serde::de::Unexpected::Str(v), + &self, + )) + } + } + + fn visit_u32(self, v: u32) -> std::result::Result + where + E: serde::de::Error, + { + Ok(ConsumerPartitionConfig::One(v)) + } + + fn visit_u64(self, v: u64) -> std::result::Result + where + E: serde::de::Error, + { + let partition = PartitionId::try_from(v).map_err(E::custom)?; + Ok(ConsumerPartitionConfig::One(partition)) + } + + fn visit_seq(self, mut seq: A) -> Result + where + A: SeqAccess<'de>, + { + let mut partitions = Vec::with_capacity(seq.size_hint().unwrap_or(2)); + while let Some(next) = seq.next_element()? { + partitions.push(next); + } + Ok(ConsumerPartitionConfig::Many(partitions)) + } +} + +impl<'de> Deserialize<'de> for ConsumerPartitionConfig { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + deserializer.deserialize_any(PartitionConfigVisitor) + } +} + +impl Serialize for ConsumerPartitionConfig { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + match self { + ConsumerPartitionConfig::All => serializer.serialize_str("all"), + ConsumerPartitionConfig::One(partition) => serializer.serialize_u32(*partition), + ConsumerPartitionConfig::Many(partitions) => { + let mut seq_serializer = serializer.serialize_seq(Some(partitions.len()))?; + for p in partitions { + seq_serializer.serialize_element(p)?; + } + seq_serializer.end() + } + } + } +} + impl ConnectorConfig { pub fn from_file>(path: P) -> Result { let mut file = File::open(path.into())?; @@ -498,7 +596,7 @@ mod tests { batch_size: Some(ByteSize::mb(44)), }), consumer: Some(ConsumerParameters { - partition: Some(10), + partition: ConsumerPartitionConfig::One(10), max_bytes: Some(ByteSize::mb(1)), }), secrets: Some(vec![SecretConfig { @@ -577,7 +675,7 @@ mod tests { batch_size: Some(ByteSize::mb(44)), }), consumer: Some(ConsumerParameters { - partition: Some(10), + partition: ConsumerPartitionConfig::One(10), max_bytes: Some(ByteSize::mb(1)), }), secrets: Some(vec![SecretConfig { @@ -784,7 +882,7 @@ mod tests { }), consumer: Some(ConsumerParameters { max_bytes: Some(ByteSize::b(1400)), - partition: None, + partition: Default::default(), }), secrets: None, }, @@ -993,7 +1091,7 @@ mod tests { }), consumer: Some(ConsumerParameters { max_bytes: Some(ByteSize::b(1400)), - partition: None, + partition: Default::default(), }), secrets: None, }, @@ -1004,4 +1102,99 @@ mod tests { assert_eq!(have.version(), "0.1.0"); assert_eq!(have.r#type(), "mqtt-source"); } + + #[test] + fn test_deser_partition_config() { + //when + + let with_one_partition: ConsumerParameters = serde_yaml::from_str( + r#" + partition: 1 + "#, + ) + .expect("one partition"); + + let with_multiple_partitions: ConsumerParameters = serde_yaml::from_str( + r#" + partition: + - 120 + - 230 + "#, + ) + .expect("sequence of partitions"); + + let with_all_partitions: ConsumerParameters = serde_yaml::from_str( + r#" + partition: all + "#, + ) + .expect("all partitions"); + + let connector_cfg_all_partitions = + ConnectorConfig::from_file("test-data/connectors/all-partitions.yaml") + .expect("Failed to load test config"); + + let connector_cfg_many_partitions = + ConnectorConfig::from_file("test-data/connectors/many-partitions.yaml") + .expect("Failed to load test config"); + + //then + assert_eq!( + with_one_partition.partition, + ConsumerPartitionConfig::One(1) + ); + + assert_eq!( + with_multiple_partitions.partition, + ConsumerPartitionConfig::Many(vec![120, 230]) + ); + + assert_eq!(with_all_partitions.partition, ConsumerPartitionConfig::All); + + assert_eq!( + connector_cfg_all_partitions + .meta() + .consumer() + .unwrap() + .partition, + ConsumerPartitionConfig::All + ); + + assert_eq!( + connector_cfg_many_partitions + .meta() + .consumer() + .unwrap() + .partition, + ConsumerPartitionConfig::Many(vec![0, 1]) + ); + } + + #[test] + fn test_ser_partition_config() { + //given + let one = ConsumerParameters { + partition: ConsumerPartitionConfig::One(1), + max_bytes: Default::default(), + }; + let many = ConsumerParameters { + partition: ConsumerPartitionConfig::Many(vec![2, 3]), + max_bytes: Default::default(), + }; + + let all = ConsumerParameters { + partition: ConsumerPartitionConfig::All, + max_bytes: Default::default(), + }; + + //when + let one_ser = serde_yaml::to_string(&one).expect("one"); + let many_ser = serde_yaml::to_string(&many).expect("many"); + let all_ser = serde_yaml::to_string(&all).expect("all"); + + //then + assert_eq!(one_ser, "partition: 1\n"); + assert_eq!(many_ser, "partition:\n- 2\n- 3\n"); + assert_eq!(all_ser, "partition: all\n"); + } } diff --git a/crates/fluvio-connector-package/test-data/connectors/all-partitions.yaml b/crates/fluvio-connector-package/test-data/connectors/all-partitions.yaml new file mode 100644 index 0000000000..922eb450cf --- /dev/null +++ b/crates/fluvio-connector-package/test-data/connectors/all-partitions.yaml @@ -0,0 +1,12 @@ +apiVersion: 0.2.0 +meta: + version: 0.1.0 + name: my-test-mqtt + type: mqtt + topic: + version: 0.1.0 + meta: + name: test-topic + consumer: + partition: all + max_bytes: "1 MB" diff --git a/crates/fluvio-connector-package/test-data/connectors/many-partitions.yaml b/crates/fluvio-connector-package/test-data/connectors/many-partitions.yaml new file mode 100644 index 0000000000..dab67916e3 --- /dev/null +++ b/crates/fluvio-connector-package/test-data/connectors/many-partitions.yaml @@ -0,0 +1,14 @@ +apiVersion: 0.2.0 +meta: + version: 0.1.0 + name: my-test-mqtt + type: mqtt + topic: + version: 0.1.0 + meta: + name: test-topic + consumer: + partition: + - 0 + - 1 + max_bytes: "1 MB" diff --git a/tests/cli/cdk_smoke_tests/cdk-multi-partition-consumer.bats b/tests/cli/cdk_smoke_tests/cdk-multi-partition-consumer.bats new file mode 100644 index 0000000000..c067c277f9 --- /dev/null +++ b/tests/cli/cdk_smoke_tests/cdk-multi-partition-consumer.bats @@ -0,0 +1,162 @@ +#!/usr/bin/env bats + +SKIP_CLUSTER_START=true +export SKIP_CLUSTER_START + +TEST_HELPER_DIR="$BATS_TEST_DIRNAME/../test_helper" +export TEST_HELPER_DIR + +load "$TEST_HELPER_DIR"/tools_check.bash +load "$TEST_HELPER_DIR"/fluvio_dev.bash +load "$TEST_HELPER_DIR"/bats-support/load.bash +load "$TEST_HELPER_DIR"/bats-assert/load.bash + +setup_file() { + PROJECT_NAME_PREFIX="$(random_string)" + export PROJECT_NAME_PREFIX + TEST_DIR="$(mktemp -d -t cdk-consumer-test.XXXXX)" + export TEST_DIR + + CONNECTOR_DIR="$(pwd)/connector/sink-test-connector" + export CONNECTOR_DIR + LOG_PATH="$CONNECTOR_DIR/sink-test-connector.log" + export LOG_PATH + +} + +setup() { + rm $LOG_PATH | true +} + +@test "Topic with 2 partitions. Consumer reads one partition" { + # Prepare config + TOPIC_NAME=$(random_string) + debug_msg "Topic name: $TOPIC_NAME" + CONFIG_PATH="$TEST_DIR/$TOPIC_NAME.yaml" + cat <$CONFIG_PATH +apiVersion: 0.2.0 +meta: + version: 0.1.0 + name: $TOPIC_NAME + type: test-sink + topic: + meta: + name: $TOPIC_NAME + partition: + count: 2 + consumer: + partition: 1 +custom: + api_key: api_key + client_id: client_id +EOF + # Test + cd $CONNECTOR_DIR + run $CDK_BIN deploy --target x86_64-unknown-linux-gnu start --config $CONFIG_PATH --log-level info + assert_success + assert_output --partial "Connector runs with process id" + + wait_for_line_in_file "succesfully created" $LOG_PATH 30 + wait_for_line_in_file "monitoring started" $LOG_PATH 30 + + echo 1:1 | fluvio produce $TOPIC_NAME --key-separator ":" + echo 4:4 | fluvio produce $TOPIC_NAME --key-separator ":" + + wait_for_line_in_file "Received record: 4" $LOG_PATH 30 + + run cat $LOG_PATH + + refute_output --partial 'Received record: 1' + + run $CDK_BIN deploy shutdown --name $TOPIC_NAME + assert_success +} + +@test "Topic with 2 partitions. Consumer reads all partitions" { + # Prepare config + TOPIC_NAME=$(random_string) + debug_msg "Topic name: $TOPIC_NAME" + CONFIG_PATH="$TEST_DIR/$TOPIC_NAME.yaml" + cat <$CONFIG_PATH +apiVersion: 0.2.0 +meta: + version: 0.1.0 + name: $TOPIC_NAME + type: test-sink + topic: + meta: + name: $TOPIC_NAME + partition: + count: 2 + consumer: + partition: all +custom: + api_key: api_key + client_id: client_id +EOF + # Test + cd $CONNECTOR_DIR + run $CDK_BIN deploy --target x86_64-unknown-linux-gnu start --config $CONFIG_PATH --log-level info + assert_success + assert_output --partial "Connector runs with process id" + + wait_for_line_in_file "succesfully created" $LOG_PATH 30 + wait_for_line_in_file "monitoring started" $LOG_PATH 30 + + echo 1:1 | fluvio produce $TOPIC_NAME --key-separator ":" + echo 4:4 | fluvio produce $TOPIC_NAME --key-separator ":" + + wait_for_line_in_file "Received record: 4" $LOG_PATH 30 + wait_for_line_in_file "Received record: 1" $LOG_PATH 2 + + run $CDK_BIN deploy shutdown --name $TOPIC_NAME + assert_success +} + +@test "Topic with 3 partitions. Consumer reads 2 partitions" { + # Prepare config + TOPIC_NAME=$(random_string) + debug_msg "Topic name: $TOPIC_NAME" + CONFIG_PATH="$TEST_DIR/$TOPIC_NAME.yaml" + cat <$CONFIG_PATH +apiVersion: 0.2.0 +meta: + version: 0.1.0 + name: $TOPIC_NAME + type: test-sink + topic: + meta: + name: $TOPIC_NAME + partition: + count: 3 + consumer: + partition: + - 1 + - 2 +custom: + api_key: api_key + client_id: client_id +EOF + # Test + cd $CONNECTOR_DIR + run $CDK_BIN deploy --target x86_64-unknown-linux-gnu start --config $CONFIG_PATH --log-level info + assert_success + assert_output --partial "Connector runs with process id" + + wait_for_line_in_file "succesfully created" $LOG_PATH 30 + wait_for_line_in_file "monitoring started" $LOG_PATH 30 + + echo 3:3 | fluvio produce $TOPIC_NAME --key-separator ":" + echo 1:1 | fluvio produce $TOPIC_NAME --key-separator ":" + echo 2:2 | fluvio produce $TOPIC_NAME --key-separator ":" + + wait_for_line_in_file "Received record: 2" $LOG_PATH 30 + wait_for_line_in_file "Received record: 1" $LOG_PATH 2 + + run cat $LOG_PATH + refute_output --partial 'Received record: 3' + + run $CDK_BIN deploy shutdown --name $TOPIC_NAME + assert_success +} + diff --git a/tests/cli/test_helper/tools_check.bash b/tests/cli/test_helper/tools_check.bash index ddf8448406..1903be529e 100644 --- a/tests/cli/test_helper/tools_check.bash +++ b/tests/cli/test_helper/tools_check.bash @@ -87,4 +87,27 @@ function check_timeout_bin() { fi } -main; \ No newline at end of file +function wait_for_line_in_file() { + LINE="$1" + FILE="$2" + MAX_SECONDS="${3:-30}" # 30 seconds default value + + echo "Waiting for file $FILE containing $LINE" + + ELAPSED=0; + until grep -q "$LINE" "$FILE" + do + sleep 1 + let ELAPSED=$ELAPSED+1 + if [[ $ELAPSED -ge MAX_SECONDS ]] + then + echo "timeout $MAX_SECONDS seconds elapsed" + exit 1 + fi + done + echo "Done waiting for file $FILE containing $LINE" +} + + + +main;