diff --git a/src-tauri/src/kafka/consumer.rs b/src-tauri/src/kafka/consumer.rs index cae5e13e..a51b4e13 100644 --- a/src-tauri/src/kafka/consumer.rs +++ b/src-tauri/src/kafka/consumer.rs @@ -17,7 +17,7 @@ use self::{ state::{ ConsumerInfo, KafkaRecord, push_record }, parser::{ parse_record }, notification::notify_error, - setup_consumer::ConsumerConfig, + setup_consumer::{ ConsumerConfig, ConsumeFrom }, }; #[tauri::command] @@ -65,8 +65,21 @@ pub fn start_consumer( Some(Ok(msg)) => match parse_record(msg.detach()) { Ok(record) => { + if + let ConsumeFrom::Custom { + start_timestamp: _, + stop_timestamp: Some(stop_timestamp), + } = config.from + { + if let Some(current_timestamp) = record.timestamp { + if stop_timestamp < current_timestamp { + // skip push_record into the consumer record_state + //todo: disable consumption for the current partition + continue; + } + } + } push_record(record, records_state.clone(), &consumer_info); - //notification::notify_records_count(len, &app, &consumer_info); } Err(err) => { notify_error(err, &app); diff --git a/src-tauri/src/kafka/consumer/setup_consumer.rs b/src-tauri/src/kafka/consumer/setup_consumer.rs index 66a6a7d1..d6cc1940 100644 --- a/src-tauri/src/kafka/consumer/setup_consumer.rs +++ b/src-tauri/src/kafka/consumer/setup_consumer.rs @@ -1,13 +1,26 @@ -use rdkafka::{ TopicPartitionList, consumer::{ Consumer, StreamConsumer } }; +use std::time::Duration; + +use rdkafka::{ TopicPartitionList, consumer::{ Consumer, StreamConsumer }, Offset }; use serde::{ Serialize, Deserialize }; use super::{ create_consumer }; use crate::{ error::{ Result, TauriError }, configuration::Cluster, kafka::admin::{ list_topic_internal } }; +#[derive(Serialize, Deserialize, Debug)] +pub enum ConsumeFrom { + Beginning, + End, + Custom { + start_timestamp: i64, //time in ms + stop_timestamp: Option, //time in ms + }, +} + #[derive(Serialize, Deserialize, Debug)] pub struct ConsumerConfig { pub cluster: Cluster, pub topic: String, + pub from: ConsumeFrom, } pub(super) fn setup_consumer(config: &ConsumerConfig) -> Result { @@ -20,9 +33,44 @@ pub(super) fn setup_consumer(config: &ConsumerConfig) -> Result })?; let mut assignment = TopicPartitionList::new(); - // set the offset for each specified partition - for p in topic_info.partitions.iter() { - assignment.add_partition_offset(&config.topic, p.id, rdkafka::Offset::Offset(0))?; + topic_info.partitions.iter().for_each(|p| { + assignment.add_partition(&config.topic, p.id); + }); + println!("Assigning"); + match config.from { + ConsumeFrom::Beginning => { + topic_info.partitions.iter().for_each(|p| { + assignment + .set_partition_offset(&config.topic, p.id, Offset::Beginning) + .expect("Unable to configure the consumer to Beginning"); + }); + } + ConsumeFrom::End => + topic_info.partitions.iter().for_each(|p| { + assignment + .set_partition_offset(&config.topic, p.id, Offset::End) + .expect("Unable to configure the consumer to End"); + }), + ConsumeFrom::Custom { start_timestamp, stop_timestamp: _ } => { + // note: the offsets_for_times function takes a TopicPartitionList in which the + // offset is the timestamp in ms (instead of the actual offset) and returns a + // new TopicPartitionList with the actual offset + let mut timestamp_assignment = TopicPartitionList::new(); + topic_info.partitions.iter().for_each(|p| { + timestamp_assignment + .add_partition_offset(&config.topic, p.id, Offset::Offset(start_timestamp)) + .expect("Unable to configure the consumer to End"); + }); + consumer + .offsets_for_times(timestamp_assignment, Duration::from_secs(10))? + .elements() + .iter() + .for_each(|tp| { + assignment + .set_partition_offset(tp.topic(), tp.partition(), tp.offset()) + .expect("Unable to configure the consumer to starting offset"); + }); + } } consumer.assign(&assignment)?; Ok(consumer) diff --git a/src/tauri.ts b/src/tauri.ts index 6b62db5a..62cae3b2 100644 --- a/src/tauri.ts +++ b/src/tauri.ts @@ -75,6 +75,8 @@ export const stopConsumer = (clusterId: string, topic: string): Promise => ); export const startConsumer = (cluster: Cluster, topic: string): Promise => - invoke("start_consumer", { config: { cluster, topic } }).catch((err: TauriError) => + invoke("start_consumer", { + config: { cluster, topic, from: { Custom: { start_timestamp: 0 } } }, + }).catch((err: TauriError) => addNotification({ type: "error", title: "Start Kafka record", description: format(err) }) );