Skip to content

Commit

Permalink
feat(BE): allow to start consuming from different points
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewinci committed Oct 2, 2022
1 parent 5534ab0 commit dac0dbd
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 7 deletions.
17 changes: 15 additions & 2 deletions src-tauri/src/kafka/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use self::{
state::{ ConsumerInfo, KafkaRecord, push_record },
parser::{ parse_record },
notification::notify_error,
setup_consumer::ConsumerConfig,
setup_consumer::{ ConsumerConfig, ConsumeFrom },
};

#[tauri::command]
Expand Down Expand Up @@ -65,8 +65,21 @@ pub fn start_consumer(
Some(Ok(msg)) =>
match parse_record(msg.detach()) {
Ok(record) => {
if
let ConsumeFrom::Custom {
start_timestamp: _,
stop_timestamp: Some(stop_timestamp),
} = config.from
{
if let Some(current_timestamp) = record.timestamp {
if stop_timestamp < current_timestamp {
// skip push_record into the consumer record_state
//todo: disable consumption for the current partition
continue;
}
}
}
push_record(record, records_state.clone(), &consumer_info);
//notification::notify_records_count(len, &app, &consumer_info);
}
Err(err) => {
notify_error(err, &app);
Expand Down
56 changes: 52 additions & 4 deletions src-tauri/src/kafka/consumer/setup_consumer.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,26 @@
use rdkafka::{ TopicPartitionList, consumer::{ Consumer, StreamConsumer } };
use std::time::Duration;

use rdkafka::{ TopicPartitionList, consumer::{ Consumer, StreamConsumer }, Offset };
use serde::{ Serialize, Deserialize };

use super::{ create_consumer };
use crate::{ error::{ Result, TauriError }, configuration::Cluster, kafka::admin::{ list_topic_internal } };

#[derive(Serialize, Deserialize, Debug)]
pub enum ConsumeFrom {
Beginning,
End,
Custom {
start_timestamp: i64, //time in ms
stop_timestamp: Option<i64>, //time in ms
},
}

#[derive(Serialize, Deserialize, Debug)]
pub struct ConsumerConfig {
pub cluster: Cluster,
pub topic: String,
pub from: ConsumeFrom,
}

pub(super) fn setup_consumer(config: &ConsumerConfig) -> Result<StreamConsumer> {
Expand All @@ -20,9 +33,44 @@ pub(super) fn setup_consumer(config: &ConsumerConfig) -> Result<StreamConsumer>
})?;
let mut assignment = TopicPartitionList::new();

// set the offset for each specified partition
for p in topic_info.partitions.iter() {
assignment.add_partition_offset(&config.topic, p.id, rdkafka::Offset::Offset(0))?;
topic_info.partitions.iter().for_each(|p| {
assignment.add_partition(&config.topic, p.id);
});
println!("Assigning");
match config.from {
ConsumeFrom::Beginning => {
topic_info.partitions.iter().for_each(|p| {
assignment
.set_partition_offset(&config.topic, p.id, Offset::Beginning)
.expect("Unable to configure the consumer to Beginning");
});
}
ConsumeFrom::End =>
topic_info.partitions.iter().for_each(|p| {
assignment
.set_partition_offset(&config.topic, p.id, Offset::End)
.expect("Unable to configure the consumer to End");
}),
ConsumeFrom::Custom { start_timestamp, stop_timestamp: _ } => {
// note: the offsets_for_times function takes a TopicPartitionList in which the
// offset is the timestamp in ms (instead of the actual offset) and returns a
// new TopicPartitionList with the actual offset
let mut timestamp_assignment = TopicPartitionList::new();
topic_info.partitions.iter().for_each(|p| {
timestamp_assignment
.add_partition_offset(&config.topic, p.id, Offset::Offset(start_timestamp))
.expect("Unable to configure the consumer to End");
});
consumer
.offsets_for_times(timestamp_assignment, Duration::from_secs(10))?
.elements()
.iter()
.for_each(|tp| {
assignment
.set_partition_offset(tp.topic(), tp.partition(), tp.offset())
.expect("Unable to configure the consumer to starting offset");
});
}
}
consumer.assign(&assignment)?;
Ok(consumer)
Expand Down
4 changes: 3 additions & 1 deletion src/tauri.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ export const stopConsumer = (clusterId: string, topic: string): Promise<void> =>
);

export const startConsumer = (cluster: Cluster, topic: string): Promise<void> =>
invoke<void>("start_consumer", { config: { cluster, topic } }).catch((err: TauriError) =>
invoke<void>("start_consumer", {
config: { cluster, topic, from: { Custom: { start_timestamp: 0 } } },
}).catch((err: TauriError) =>
addNotification({ type: "error", title: "Start Kafka record", description: format(err) })
);

0 comments on commit dac0dbd

Please sign in to comment.