Skip to content

Commit

Permalink
fixup! dekaf: Implement capturing and appending stats
Browse files Browse the repository at this point in the history
  • Loading branch information
jshearer committed Feb 10, 2025
1 parent d038c06 commit a118c5e
Showing 1 changed file with 31 additions and 12 deletions.
43 changes: 31 additions & 12 deletions crates/dekaf/src/log_appender.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{dekaf_shard_template_id, topology::fetch_dekaf_task_auth, App};
use anyhow::Context;
use async_trait::async_trait;
use bytes::Bytes;
use bytes::{Bytes, BytesMut};
use flow_client::fetch_task_authorization;
use futures::{StreamExt, TryStreamExt};
use gazette::{
Expand Down Expand Up @@ -289,6 +289,7 @@ const WELL_KNOWN_LOG_FIELDS: &'static [&'static str] = &[
SESSION_CLIENT_ID_FIELD_MARKER,
];
/// Size of the log message queue.
///
/// NOTE(review): presumably the capacity of the channel that feeds log
/// messages to the forwarder; its use is outside this view — confirm.
pub const LOG_MESSAGE_QUEUE_SIZE: usize = 50;
/// Upper bound, in bytes, on serialized logs buffered between appends:
/// 2^22 bytes = 4 MiB.
///
/// Written as a shift because `^` is bitwise XOR in Rust, not exponentiation:
/// `2 ^ 22` evaluates to 20, which would cap the buffer at 20 bytes and cause
/// nearly every log to be dropped as "buffer full".
///
/// NOTE(review): the previous trailing comment said "2mb", but an exponent of
/// 22 is 4 MiB. The exponent is preserved here; use `1 << 21` if 2 MiB was
/// the intended limit.
pub const LOG_BUFFER_SIZE: usize = 1 << 22;

impl<W: TaskWriter + 'static> TaskForwarder<W> {
pub fn new(producer: Producer, writer: W) -> Self {
Expand Down Expand Up @@ -347,11 +348,29 @@ impl<W: TaskWriter + 'static> TaskForwarder<W> {
)
.chain(ReceiverStream::new(logs_rx));

let mut log_buffer = BytesMut::with_capacity(LOG_BUFFER_SIZE);

// TODO(jshearer): Do we want to make this configurable?
let mut stats_interval = tokio::time::interval(std::time::Duration::from_secs(30));

loop {
tokio::select! {
biased;

// Always start another append as soon as possible after the previous one completes.
// This way we both have more prompt log delivery, and only need to buffer logs
// for at most the duration of a single append round-trip.
resp = writer.append_logs(log_buffer.split().freeze()), if log_buffer.len() > 0 => {
resp?
},
_ = stats_interval.tick() => {
// Take current stats and write if non-zero
if let Some(current_stats) = stats.take(){
let data = Self::serialize_stats(uuid_producer, current_stats, task_name.to_owned());
writer.append_stats(data.into()).await?;
}
}

msg = event_stream.next() => {
match msg {
Some(TaskWriterMessage::SetTaskName(new_name)) => {
Expand All @@ -369,9 +388,13 @@ impl<W: TaskWriter + 'static> TaskForwarder<W> {
}
}

writer
.append_logs(Self::serialize_log(uuid_producer, log, task_name.to_owned()).into())
.await?;
let serialized = Self::serialize_log(uuid_producer, log, task_name.to_owned());

if (log_buffer.len() + serialized.len()) < LOG_BUFFER_SIZE {
log_buffer.extend_from_slice(serialized.as_slice());
} else {
tracing::error!(task_name, "Log buffer full, dropping log on the ground. Are we unable to send logs?");
}
}
Some(TaskWriterMessage::Stats((collection_name, new_stats))) => {
stats.add(collection_name, new_stats);
Expand All @@ -380,17 +403,13 @@ impl<W: TaskWriter + 'static> TaskForwarder<W> {
None => break,
}
},
_ = stats_interval.tick() => {
// Take current stats and write if non-zero
if let Some(current_stats) = stats.take(){
let data = Self::serialize_stats(uuid_producer, current_stats, task_name.to_owned());
writer.append_stats(data.into()).await?;
}
}
}
}

// Flush any remaining stats after stream ends
// Flush any remaining logs and stats after stream ends
if log_buffer.len() > 0 {
writer.append_logs(log_buffer.freeze()).await?;
}
if let Some(remaining_stats) = stats.take() {
let data = Self::serialize_stats(uuid_producer, remaining_stats, task_name);
writer.append_stats(data.into()).await?;
Expand Down

0 comments on commit a118c5e

Please sign in to comment.