Skip to content

Commit

Permalink
feat: multi partition consumer in sink connectors (#3470)
Browse files Browse the repository at this point in the history
Added an ability to configure which partitions the connector should read.
Supported options:
1. String value `all` for consuming all partitions: 
```yaml
consumer:
  partition: all
```
2. List of partitions:
```yaml
consumer:
  partition: 
    - 2
    - 3
```
3. One partition (supported previously):
```yaml
consumer:
  partition: 1
```

Closes #3467
  • Loading branch information
Alexander Galibey committed Aug 16, 2023
1 parent 2c70ff8 commit f37f00c
Show file tree
Hide file tree
Showing 8 changed files with 455 additions and 28 deletions.
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions crates/fluvio-connector-common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub fn get_value(value: Value, root: Option<&str>) -> Result<Value> {

#[cfg(test)]
mod tests {
use fluvio_connector_package::config::MetaConfigV1;
use fluvio_connector_package::config::{MetaConfigV1, ConsumerPartitionConfig};

#[test]
fn test_from_value() {
Expand All @@ -57,7 +57,7 @@ mod tests {
.consumer
.expect("expected some consume config")
.partition,
Some(0)
ConsumerPartitionConfig::One(0)
);
}
}
53 changes: 38 additions & 15 deletions crates/fluvio-connector-common/src/consumer.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use fluvio::{FluvioConfig, Fluvio};
use fluvio::{FluvioConfig, Fluvio, PartitionSelectionStrategy};
use fluvio::dataplane::record::ConsumerRecord;
use fluvio_connector_package::config::ConsumerPartitionConfig;
use fluvio_sc_schema::errors::ErrorCode;
use futures::StreamExt;
use crate::{config::ConnectorConfig, Result};
Expand All @@ -24,28 +25,50 @@ pub async fn consumer_stream_from_config(

let fluvio = Fluvio::connect_with_config(&cluster_config).await?;
ensure_topic_exists(config).await?;
let consumer = fluvio
.partition_consumer(
config.meta().topic(),
config
.meta()
.consumer()
.and_then(|c| c.partition)
.unwrap_or_default(),
)
.await?;

let mut builder = fluvio::ConsumerConfig::builder();

if let Some(max_bytes) = config.meta().consumer().and_then(|c| c.max_bytes) {
builder.max_bytes(max_bytes.as_u64() as i32);
}

if let Some(smartmodules) = smartmodule_vec_from_config(config) {
builder.smartmodule(smartmodules);
}
let config = builder.build()?;
let consumer_config = builder.build()?;
let consumer_partition = config
.meta()
.consumer()
.map(|c| c.partition.clone())
.unwrap_or_default();
let offset = fluvio::Offset::end();
let stream = consumer.stream_with_config(offset, config).await?;
let topic = config.meta().topic().to_string();
let stream = match consumer_partition {
ConsumerPartitionConfig::One(partition) => {
let consumer = fluvio.partition_consumer(topic, partition).await?;
consumer
.stream_with_config(offset, consumer_config)
.await?
.boxed()
}
ConsumerPartitionConfig::All => {
let consumer = fluvio
.consumer(PartitionSelectionStrategy::All(topic))
.await?;
consumer
.stream_with_config(offset, consumer_config)
.await?
.boxed()
}
ConsumerPartitionConfig::Many(partitions) => {
let partitions = partitions.into_iter().map(|p| (topic.clone(), p)).collect();
let consumer = fluvio
.consumer(PartitionSelectionStrategy::Multiple(partitions))
.await?;
consumer
.stream_with_config(offset, consumer_config)
.await?
.boxed()
}
};

Ok((fluvio, stream))
}
207 changes: 200 additions & 7 deletions crates/fluvio-connector-package/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ use std::time::Duration;

use fluvio_controlplane_metadata::topic::config::TopicConfig;
use fluvio_types::PartitionId;
use serde::de::{Visitor, SeqAccess};
use serde::ser::SerializeSeq;
use tracing::debug;
use anyhow::Result;
use serde::{Deserialize, Serialize};
use serde::{Deserialize, Serialize, Deserializer, Serializer};
use bytesize::ByteSize;

use fluvio_smartengine::transformation::TransformationConfig;
Expand Down Expand Up @@ -272,8 +274,8 @@ impl MetaConfig<'_> {

#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
pub struct ConsumerParameters {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub partition: Option<PartitionId>,
#[serde(default, skip_serializing_if = "ConsumerPartitionConfig::is_default")]
pub partition: ConsumerPartitionConfig,
#[serde(
with = "bytesize_serde",
skip_serializing_if = "Option::is_none",
Expand Down Expand Up @@ -386,6 +388,102 @@ impl Serialize for SecretName {
}
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ConsumerPartitionConfig {
All,
One(PartitionId),
Many(Vec<PartitionId>),
}

impl Default for ConsumerPartitionConfig {
fn default() -> Self {
Self::One(0)
}
}

impl ConsumerPartitionConfig {
pub fn is_default(&self) -> bool {
matches!(self, ConsumerPartitionConfig::One(partition) if partition.eq(&PartitionId::default()))
}
}

struct PartitionConfigVisitor;
impl<'de> Visitor<'de> for PartitionConfigVisitor {
type Value = ConsumerPartitionConfig;

fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
formatter.write_str("integer, sequence of integers or `all` string")
}

fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
if v.eq("all") {
Ok(ConsumerPartitionConfig::All)
} else {
Err(serde::de::Error::invalid_value(
serde::de::Unexpected::Str(v),
&self,
))
}
}

fn visit_u32<E>(self, v: u32) -> std::result::Result<Self::Value, E>
where
E: serde::de::Error,
{
Ok(ConsumerPartitionConfig::One(v))
}

fn visit_u64<E>(self, v: u64) -> std::result::Result<Self::Value, E>
where
E: serde::de::Error,
{
let partition = PartitionId::try_from(v).map_err(E::custom)?;
Ok(ConsumerPartitionConfig::One(partition))
}

fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
where
A: SeqAccess<'de>,
{
let mut partitions = Vec::with_capacity(seq.size_hint().unwrap_or(2));
while let Some(next) = seq.next_element()? {
partitions.push(next);
}
Ok(ConsumerPartitionConfig::Many(partitions))
}
}

impl<'de> Deserialize<'de> for ConsumerPartitionConfig {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
deserializer.deserialize_any(PartitionConfigVisitor)
}
}

impl Serialize for ConsumerPartitionConfig {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
match self {
ConsumerPartitionConfig::All => serializer.serialize_str("all"),
ConsumerPartitionConfig::One(partition) => serializer.serialize_u32(*partition),
ConsumerPartitionConfig::Many(partitions) => {
let mut seq_serializer = serializer.serialize_seq(Some(partitions.len()))?;
for p in partitions {
seq_serializer.serialize_element(p)?;
}
seq_serializer.end()
}
}
}
}

impl ConnectorConfig {
pub fn from_file<P: Into<PathBuf>>(path: P) -> Result<Self> {
let mut file = File::open(path.into())?;
Expand Down Expand Up @@ -498,7 +596,7 @@ mod tests {
batch_size: Some(ByteSize::mb(44)),
}),
consumer: Some(ConsumerParameters {
partition: Some(10),
partition: ConsumerPartitionConfig::One(10),
max_bytes: Some(ByteSize::mb(1)),
}),
secrets: Some(vec![SecretConfig {
Expand Down Expand Up @@ -577,7 +675,7 @@ mod tests {
batch_size: Some(ByteSize::mb(44)),
}),
consumer: Some(ConsumerParameters {
partition: Some(10),
partition: ConsumerPartitionConfig::One(10),
max_bytes: Some(ByteSize::mb(1)),
}),
secrets: Some(vec![SecretConfig {
Expand Down Expand Up @@ -784,7 +882,7 @@ mod tests {
}),
consumer: Some(ConsumerParameters {
max_bytes: Some(ByteSize::b(1400)),
partition: None,
partition: Default::default(),
}),
secrets: None,
},
Expand Down Expand Up @@ -993,7 +1091,7 @@ mod tests {
}),
consumer: Some(ConsumerParameters {
max_bytes: Some(ByteSize::b(1400)),
partition: None,
partition: Default::default(),
}),
secrets: None,
},
Expand All @@ -1004,4 +1102,99 @@ mod tests {
assert_eq!(have.version(), "0.1.0");
assert_eq!(have.r#type(), "mqtt-source");
}

#[test]
fn test_deser_partition_config() {
//when

let with_one_partition: ConsumerParameters = serde_yaml::from_str(
r#"
partition: 1
"#,
)
.expect("one partition");

let with_multiple_partitions: ConsumerParameters = serde_yaml::from_str(
r#"
partition:
- 120
- 230
"#,
)
.expect("sequence of partitions");

let with_all_partitions: ConsumerParameters = serde_yaml::from_str(
r#"
partition: all
"#,
)
.expect("all partitions");

let connector_cfg_all_partitions =
ConnectorConfig::from_file("test-data/connectors/all-partitions.yaml")
.expect("Failed to load test config");

let connector_cfg_many_partitions =
ConnectorConfig::from_file("test-data/connectors/many-partitions.yaml")
.expect("Failed to load test config");

//then
assert_eq!(
with_one_partition.partition,
ConsumerPartitionConfig::One(1)
);

assert_eq!(
with_multiple_partitions.partition,
ConsumerPartitionConfig::Many(vec![120, 230])
);

assert_eq!(with_all_partitions.partition, ConsumerPartitionConfig::All);

assert_eq!(
connector_cfg_all_partitions
.meta()
.consumer()
.unwrap()
.partition,
ConsumerPartitionConfig::All
);

assert_eq!(
connector_cfg_many_partitions
.meta()
.consumer()
.unwrap()
.partition,
ConsumerPartitionConfig::Many(vec![0, 1])
);
}

#[test]
fn test_ser_partition_config() {
//given
let one = ConsumerParameters {
partition: ConsumerPartitionConfig::One(1),
max_bytes: Default::default(),
};
let many = ConsumerParameters {
partition: ConsumerPartitionConfig::Many(vec![2, 3]),
max_bytes: Default::default(),
};

let all = ConsumerParameters {
partition: ConsumerPartitionConfig::All,
max_bytes: Default::default(),
};

//when
let one_ser = serde_yaml::to_string(&one).expect("one");
let many_ser = serde_yaml::to_string(&many).expect("many");
let all_ser = serde_yaml::to_string(&all).expect("all");

//then
assert_eq!(one_ser, "partition: 1\n");
assert_eq!(many_ser, "partition:\n- 2\n- 3\n");
assert_eq!(all_ser, "partition: all\n");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
apiVersion: 0.2.0
meta:
version: 0.1.0
name: my-test-mqtt
type: mqtt
topic:
version: 0.1.0
meta:
name: test-topic
consumer:
partition: all
max_bytes: "1 MB"
Loading

0 comments on commit f37f00c

Please sign in to comment.