Skip to content

Commit

Permalink
dekaf: Small style cleanups
Browse files Browse the repository at this point in the history
  • Loading branch information
jshearer committed Feb 6, 2025
1 parent fdcae6f commit d1c933d
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 33 deletions.
3 changes: 1 addition & 2 deletions crates/dekaf/src/api_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -371,8 +371,7 @@ impl KafkaApiClient {
)
.await?;

let (coord_host, coord_port) = if resp.coordinators.len() > 0 {
let coord = resp.coordinators.get(0).expect("already checked length");
let (coord_host, coord_port) = if let Some(coord) = resp.coordinators.first() {
(coord.host.as_str(), coord.port)
} else {
(resp.host.as_str(), resp.port)
Expand Down
68 changes: 37 additions & 31 deletions crates/dekaf/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,14 +184,14 @@ impl Session {
let topics = collections
.into_iter()
.map(|name| {
MetadataResponseTopic::default()
.with_name(Some(self.encode_topic_name(name)))
Ok(MetadataResponseTopic::default()
.with_name(Some(self.encode_topic_name(name)?))
.with_is_internal(false)
.with_partitions(vec![MetadataResponsePartition::default()
.with_partition_index(0)
.with_leader_id(0.into())])
.with_leader_id(0.into())]))
})
.collect();
.collect::<anyhow::Result<_>>()?;

Ok(topics)
}
Expand Down Expand Up @@ -232,7 +232,7 @@ impl Session {
let Some(collection) = maybe_collection else {
topics.push(
MetadataResponseTopic::default()
.with_name(Some(self.encode_topic_name(name.to_string())))
.with_name(Some(self.encode_topic_name(name.to_string())?))
.with_error_code(ResponseError::UnknownTopicOrPartition.code()),
);
continue;
Expand Down Expand Up @@ -825,11 +825,12 @@ impl Session {
consumer_protocol_subscription_msg
.topics
.iter_mut()
.for_each(|topic| {
let transformed = self.encrypt_topic_name(topic.to_owned().into()).into();
.try_for_each(|topic| {
let transformed = self.encrypt_topic_name(topic.to_owned().into())?.into();
tracing::info!(topic_name = ?topic, "Request to join group");
*topic = transformed;
});
Ok::<(), anyhow::Error>(())
})?;

let mut new_protocol_subscription = BytesMut::new();

Expand Down Expand Up @@ -874,7 +875,10 @@ impl Session {
consumer_protocol_subscription_msg
.topics
.iter_mut()
.for_each(|topic| *topic = self.decrypt_topic_name(topic.to_owned().into()).into());
.try_for_each(|topic| {
*topic = self.decrypt_topic_name(topic.to_owned().into())?.into();
Ok::<(), anyhow::Error>(())
})?;

let mut new_protocol_subscription = BytesMut::new();

Expand Down Expand Up @@ -971,11 +975,11 @@ impl Session {
.assigned_partitions
.into_iter()
.map(|part| {
let transformed_topic = self.encrypt_topic_name(part.topic.to_owned());
let transformed_topic = self.encrypt_topic_name(part.topic.to_owned())?;
tracing::info!(topic_name = ?part.topic, "Syncing group");
part.with_topic(transformed_topic)
Ok(part.with_topic(transformed_topic))
})
.collect();
.collect::<anyhow::Result<_>>()?;

let mut new_protocol_assignment = BytesMut::new();

Expand Down Expand Up @@ -1015,10 +1019,10 @@ impl Session {
.assigned_partitions
.into_iter()
.map(|part| {
let transformed_topic = self.decrypt_topic_name(part.topic.to_owned());
part.with_topic(transformed_topic)
let transformed_topic = self.decrypt_topic_name(part.topic.to_owned())?;
Ok(part.with_topic(transformed_topic))
})
.collect();
.collect::<anyhow::Result<_>>()?;

let mut new_protocol_assignment = BytesMut::new();

Expand Down Expand Up @@ -1067,7 +1071,7 @@ impl Session {
) -> anyhow::Result<messages::OffsetCommitResponse> {
let mut mutated_req = req.clone();
for topic in &mut mutated_req.topics {
let encrypted = self.encrypt_topic_name(topic.name.clone());
let encrypted = self.encrypt_topic_name(topic.name.clone())?;
tracing::info!(topic_name = ?topic.name, partitions = ?topic.partitions, "Committing offset");
topic.name = encrypted;
}
Expand Down Expand Up @@ -1101,7 +1105,7 @@ impl Session {
let auth = self.auth.as_ref().unwrap();

for topic in resp.topics.iter_mut() {
topic.name = self.decrypt_topic_name(topic.name.to_owned());
topic.name = self.decrypt_topic_name(topic.name.to_owned())?;

let collection_partitions = Collection::new(
&self.app,
Expand Down Expand Up @@ -1157,7 +1161,7 @@ impl Session {
let mut mutated_req = req.clone();
if let Some(ref mut topics) = mutated_req.topics {
for topic in topics {
topic.name = self.encrypt_topic_name(topic.name.clone());
topic.name = self.encrypt_topic_name(topic.name.clone())?;
}
}

Expand All @@ -1175,7 +1179,7 @@ impl Session {
let mut resp = client.send_request(mutated_req, Some(header)).await?;

for topic in resp.topics.iter_mut() {
topic.name = self.decrypt_topic_name(topic.name.to_owned());
topic.name = self.decrypt_topic_name(topic.name.to_owned())?;
}

Ok(resp)
Expand Down Expand Up @@ -1245,35 +1249,37 @@ impl Session {
Ok(res)
}

fn encrypt_topic_name(&self, name: TopicName) -> TopicName {
to_upstream_topic_name(
fn encrypt_topic_name(&self, name: TopicName) -> anyhow::Result<TopicName> {
Ok(to_upstream_topic_name(
name,
self.secret.to_owned(),
match self.auth.as_ref().expect("Must be authenticated") {
match self.auth.as_ref().context("Must be authenticated")? {
SessionAuthentication::User(auth) => auth.claims.sub.to_string(),
SessionAuthentication::Task(auth) => auth.config.token.to_string(),
},
)
))
}
fn decrypt_topic_name(&self, name: TopicName) -> TopicName {
from_upstream_topic_name(
fn decrypt_topic_name(&self, name: TopicName) -> anyhow::Result<TopicName> {
Ok(from_upstream_topic_name(
name,
self.secret.to_owned(),
match self.auth.as_ref().expect("Must be authenticated") {
match self.auth.as_ref().context("Must be authenticated")? {
SessionAuthentication::User(auth) => auth.claims.sub.to_string(),
SessionAuthentication::Task(auth) => auth.config.token.to_string(),
},
)
))
}

fn encode_topic_name(&self, name: String) -> TopicName {
if match self.auth.as_ref().expect("Must be authenticated") {
fn encode_topic_name(&self, name: String) -> anyhow::Result<TopicName> {
if match self.auth.as_ref().context("Must be authenticated")? {
SessionAuthentication::User(auth) => auth.config.strict_topic_names,
SessionAuthentication::Task(auth) => auth.config.strict_topic_names,
} {
to_downstream_topic_name(TopicName(StrBytes::from_string(name)))
Ok(to_downstream_topic_name(TopicName(StrBytes::from_string(
name,
))))
} else {
TopicName(StrBytes::from_string(name))
Ok(TopicName(StrBytes::from_string(name)))
}
}

Expand Down

0 comments on commit d1c933d

Please sign in to comment.