diff --git a/crates/dekaf/src/topology.rs b/crates/dekaf/src/topology.rs
index 03f5571d59..23085e64af 100644
--- a/crates/dekaf/src/topology.rs
+++ b/crates/dekaf/src/topology.rs
@@ -3,7 +3,10 @@ use crate::{
 };
 use anyhow::{anyhow, bail, Context};
 use futures::{StreamExt, TryStreamExt};
-use gazette::{broker, journal, uuid};
+use gazette::{
+    broker::{self, journal_spec},
+    journal, uuid,
+};
 use models::RawValue;
 use proto_flow::flow;
 
@@ -99,13 +102,23 @@ pub struct Partition {
     pub route: broker::Route,
 }
 
-#[derive(Clone, Copy, Default, Debug)]
+#[derive(Clone, Copy, Debug)]
 pub struct PartitionOffset {
     pub fragment_start: i64,
     pub offset: i64,
     pub mod_time: i64,
 }
 
+impl Default for PartitionOffset {
+    fn default() -> Self {
+        Self {
+            mod_time: -1, // UNKNOWN_TIMESTAMP
+            fragment_start: 0,
+            offset: 0,
+        }
+    }
+}
+
 impl Collection {
     /// Build a Collection by fetching its spec, an authenticated data-plane access token, and its partitions.
     pub async fn new(
@@ -305,60 +318,72 @@ impl Collection {
         let Some(partition) = self.partitions.get(partition_index) else {
             return Ok(None);
         };
 
-        let (not_before_sec, _) = self.not_before.to_unix();
-        let begin_mod_time = if timestamp_millis == -1 {
-            i64::MAX // Sentinel for "largest available offset",
-        } else if timestamp_millis == -2 {
-            0 // Sentinel for "first available offset"
-        } else {
-            let timestamp = timestamp_millis / 1_000;
-            if timestamp < not_before_sec as i64 {
-                not_before_sec as i64
-            } else {
-                timestamp as i64
+        match partition.spec.suspend {
+            Some(suspend) if suspend.level == journal_spec::suspend::Level::Full as i32 => {
+                return Ok(Some(PartitionOffset {
+                    fragment_start: suspend.offset,
+                    offset: suspend.offset,
+                    mod_time: -1, //UNKNOWN_TIMESTAMP
+                }));
             }
-        };
+            _ => {
+                let (not_before_sec, _) = self.not_before.to_unix();
 
-        let request = broker::FragmentsRequest {
-            journal: partition.spec.name.clone(),
-            begin_mod_time,
-            page_limit: 1,
-            ..Default::default()
-        };
-        let response = self.journal_client.list_fragments(request).await?;
-
-        let offset_data = match response.fragments.get(0) {
-            Some(broker::fragments_response::Fragment {
-                spec: Some(spec), ..
-            }) => {
-                if timestamp_millis == -1 {
-                    PartitionOffset {
-                        fragment_start: spec.begin,
-                        // Subtract one to reflect the largest fetch-able offset of the fragment.
-                        offset: spec.end - 1,
-                        mod_time: spec.mod_time,
-                    }
+                let begin_mod_time = if timestamp_millis == -1 {
+                    i64::MAX // Sentinel for "largest available offset",
+                } else if timestamp_millis == -2 {
+                    0 // Sentinel for "first available offset"
                 } else {
-                    PartitionOffset {
-                        fragment_start: spec.begin,
-                        offset: spec.begin,
-                        mod_time: spec.mod_time,
+                    let timestamp = timestamp_millis / 1_000;
+                    if timestamp < not_before_sec as i64 {
+                        not_before_sec as i64
+                    } else {
+                        timestamp as i64
+                    }
+                };
+
+                let request = broker::FragmentsRequest {
+                    journal: partition.spec.name.clone(),
+                    begin_mod_time,
+                    page_limit: 1,
+                    ..Default::default()
+                };
+                let response = self.journal_client.list_fragments(request).await?;
+
+                let offset_data = match response.fragments.get(0) {
+                    Some(broker::fragments_response::Fragment {
+                        spec: Some(spec), ..
+                    }) => {
+                        if timestamp_millis == -1 {
+                            PartitionOffset {
+                                fragment_start: spec.begin,
+                                // Subtract one to reflect the largest fetch-able offset of the fragment.
+                                offset: spec.end - 1,
+                                mod_time: spec.mod_time,
+                            }
+                        } else {
+                            PartitionOffset {
+                                fragment_start: spec.begin,
+                                offset: spec.begin,
+                                mod_time: spec.mod_time,
+                            }
+                        }
                     }
-                }
+                    _ => PartitionOffset::default(),
+                };
+
+                tracing::debug!(
+                    collection = self.spec.name,
+                    ?offset_data,
+                    partition_index,
+                    timestamp_millis,
+                    "fetched offset"
+                );
+
+                return Ok(Some(offset_data));
             }
-            _ => PartitionOffset::default(),
-        };
-
-        tracing::debug!(
-            collection = self.spec.name,
-            ?offset_data,
-            partition_index,
-            timestamp_millis,
-            "fetched offset"
-        );
-
-        Ok(Some(offset_data))
+        }
     }
 
     /// Build a journal client by resolving the collections data-plane gateway and an access token.