From 456c7050022fef33c5d5e41e464fea12dd51054f Mon Sep 17 00:00:00 2001 From: Joseph Shearer Date: Wed, 18 Dec 2024 14:29:51 -0500 Subject: [PATCH] dekaf: Fix endpoint config parsing This got left out of #1793 and is needed for Dekaf materialization endpoints --- crates/dekaf/src/connector.rs | 21 ++++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/crates/dekaf/src/connector.rs b/crates/dekaf/src/connector.rs index e945966814..827c0fb620 100644 --- a/crates/dekaf/src/connector.rs +++ b/crates/dekaf/src/connector.rs @@ -1,5 +1,5 @@ use anyhow::{bail, Context}; -use proto_flow::materialize; +use proto_flow::{flow::materialization_spec, materialize}; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use std::collections::BTreeMap; @@ -38,6 +38,7 @@ pub struct DekafConfig { /// tombstones with null values, and "Header" emits then as a kafka document /// with empty string and `_is_deleted` header set to `1`. Setting this value /// will also cause all other non-deletions to have an `_is_deleted` header of `0`. + #[serde(default)] pub deletions: DeletionMode, } @@ -70,8 +71,22 @@ pub async fn unary_materialize( ) -> anyhow::Result { use proto_flow::materialize::response::validated; if let Some(mut validate) = request.validate { - serde_json::de::from_str::(&validate.config_json) - .context("validating endpoint config")?; + match materialization_spec::ConnectorType::try_from(validate.connector_type)? { + materialization_spec::ConnectorType::Dekaf => {} + other => bail!("invalid connector type: {}", other.as_str_name()), + }; + + let parsed_outer_config = + serde_json::from_str::(&validate.config_json) + .context("validating dekaf config")?; + + let _parsed_inner_config = serde_json::from_value::( + parsed_outer_config.config.to_value(), + ) + .context(format!( + "validating dekaf endpoint config for variant {}", + parsed_outer_config.variant + ))?; // Largely copied from crates/validation/src/noop.rs let validated_bindings = std::mem::take(&mut validate.bindings)