Skip to content

Commit

Permalink
dekaf: Add support for fully suspended journals
Browse files Browse the repository at this point in the history
We can still fetch suspended journals with a regular `ListRequest`. This will return journal specs which contain a `suspend` field. If `journal.spec.suspend.level` is `FULL`, it's not possible to read from that journal. So we need to:
* Report both the low and high watermarks as `journal.spec.suspend.offset`
* Serve empty result sets for any read against this partition
  • Loading branch information
jshearer committed Feb 7, 2025
1 parent 5ccab76 commit 966303f
Showing 1 changed file with 75 additions and 50 deletions.
125 changes: 75 additions & 50 deletions crates/dekaf/src/topology.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ use crate::{
};
use anyhow::{anyhow, bail, Context};
use futures::{StreamExt, TryStreamExt};
use gazette::{broker, journal, uuid};
use gazette::{
broker::{self, journal_spec},
journal, uuid,
};
use models::RawValue;
use proto_flow::flow;

Expand Down Expand Up @@ -99,13 +102,23 @@ pub struct Partition {
pub route: broker::Route,
}

/// Offset information resolved for a single partition (journal).
///
/// Populated either from the first fragment returned by a fragment listing, or
/// from the journal's suspension offset when the journal is fully suspended.
#[derive(Clone, Copy, Default, Debug)]
#[derive(Clone, Copy, Debug)]
pub struct PartitionOffset {
/// Begin offset of the fragment the lookup resolved to (`spec.begin`), or the
/// suspension offset for a fully suspended journal.
pub fragment_start: i64,
/// The resolved offset itself: `spec.end - 1` when the latest offset was
/// requested, `spec.begin` otherwise, or the suspension offset for a fully
/// suspended journal.
pub offset: i64,
/// Modification time copied from the fragment spec (`spec.mod_time`), or `-1`
/// when no timestamp is known.
pub mod_time: i64,
}

impl Default for PartitionOffset {
fn default() -> Self {
Self {
mod_time: -1, // UNKNOWN_TIMESTAMP
fragment_start: 0,
offset: 0,
}
}
}

impl Collection {
/// Build a Collection by fetching its spec, an authenticated data-plane access token, and its partitions.
pub async fn new(
Expand Down Expand Up @@ -305,60 +318,72 @@ impl Collection {
let Some(partition) = self.partitions.get(partition_index) else {
return Ok(None);
};
let (not_before_sec, _) = self.not_before.to_unix();

let begin_mod_time = if timestamp_millis == -1 {
i64::MAX // Sentinel for "largest available offset",
} else if timestamp_millis == -2 {
0 // Sentinel for "first available offset"
} else {
let timestamp = timestamp_millis / 1_000;
if timestamp < not_before_sec as i64 {
not_before_sec as i64
} else {
timestamp as i64
match partition.spec.suspend {
Some(suspend) if suspend.level == journal_spec::suspend::Level::Full as i32 => {
return Ok(Some(PartitionOffset {
fragment_start: suspend.offset,
offset: suspend.offset,
mod_time: -1, //UNKNOWN_TIMESTAMP
}));
}
};
_ => {
let (not_before_sec, _) = self.not_before.to_unix();

let request = broker::FragmentsRequest {
journal: partition.spec.name.clone(),
begin_mod_time,
page_limit: 1,
..Default::default()
};
let response = self.journal_client.list_fragments(request).await?;

let offset_data = match response.fragments.get(0) {
Some(broker::fragments_response::Fragment {
spec: Some(spec), ..
}) => {
if timestamp_millis == -1 {
PartitionOffset {
fragment_start: spec.begin,
// Subtract one to reflect the largest fetch-able offset of the fragment.
offset: spec.end - 1,
mod_time: spec.mod_time,
}
let begin_mod_time = if timestamp_millis == -1 {
i64::MAX // Sentinel for "largest available offset",
} else if timestamp_millis == -2 {
0 // Sentinel for "first available offset"
} else {
PartitionOffset {
fragment_start: spec.begin,
offset: spec.begin,
mod_time: spec.mod_time,
let timestamp = timestamp_millis / 1_000;
if timestamp < not_before_sec as i64 {
not_before_sec as i64
} else {
timestamp as i64
}
};

let request = broker::FragmentsRequest {
journal: partition.spec.name.clone(),
begin_mod_time,
page_limit: 1,
..Default::default()
};
let response = self.journal_client.list_fragments(request).await?;

let offset_data = match response.fragments.get(0) {
Some(broker::fragments_response::Fragment {
spec: Some(spec), ..
}) => {
if timestamp_millis == -1 {
PartitionOffset {
fragment_start: spec.begin,
// Subtract one to reflect the largest fetch-able offset of the fragment.
offset: spec.end - 1,
mod_time: spec.mod_time,
}
} else {
PartitionOffset {
fragment_start: spec.begin,
offset: spec.begin,
mod_time: spec.mod_time,
}
}
}
}
_ => PartitionOffset::default(),
};

tracing::debug!(
collection = self.spec.name,
?offset_data,
partition_index,
timestamp_millis,
"fetched offset"
);

return Ok(Some(offset_data));
}
_ => PartitionOffset::default(),
};

tracing::debug!(
collection = self.spec.name,
?offset_data,
partition_index,
timestamp_millis,
"fetched offset"
);

Ok(Some(offset_data))
}
}

/// Build a journal client by resolving the collections data-plane gateway and an access token.
Expand Down

0 comments on commit 966303f

Please sign in to comment.