From a4a949ab8b4c8b33ecdcf94020162960302deb7e Mon Sep 17 00:00:00 2001 From: Joseph Shearer Date: Fri, 3 Jan 2025 15:20:23 -0500 Subject: [PATCH 1/5] agent: Add Dekaf `connector_tags` support --- crates/agent/src/connector_tags.rs | 100 +++++++++++++++++++++++------ crates/dekaf/src/connector.rs | 34 ++++++++-- 2 files changed, 108 insertions(+), 26 deletions(-) diff --git a/crates/agent/src/connector_tags.rs b/crates/agent/src/connector_tags.rs index 9993ed2a29..dc63448975 100644 --- a/crates/agent/src/connector_tags.rs +++ b/crates/agent/src/connector_tags.rs @@ -83,6 +83,11 @@ impl Handler for TagHandler { /// connector_tags without having to push to a registry. pub const LOCAL_IMAGE_TAG: &str = ":local"; +/// Connectors with an image name starting with this value are Dekaf-type materializations and we should +/// not pull the image, as it won't exist. Instead, we mark them as having `connector_type: ConnectorType::Dekaf` +/// so that `Runtime` will invoke Dekaf's in-tree connector implementation +pub const DEKAF_IMAGE_NAME_PREFIX: &str = "ghcr.io/estuary/dekaf-"; + impl TagHandler { #[tracing::instrument(err, skip_all, fields(id=?row.tag_id))] async fn process( @@ -100,7 +105,8 @@ impl TagHandler { ); let image_composed = format!("{}{}", row.image_name, row.image_tag); - if row.image_tag != LOCAL_IMAGE_TAG { + if row.image_tag != LOCAL_IMAGE_TAG && !row.image_name.starts_with(DEKAF_IMAGE_NAME_PREFIX) + { // Pull the image. let pull = jobs::run( "pull", @@ -117,29 +123,40 @@ impl TagHandler { } } - let proto_type = match runtime::flow_runtime_protocol(&image_composed).await { - Ok(ct) => ct, - Err(err) => { - tracing::warn!(image = %image_composed, error = %err, "failed to determine connector protocol"); - return Ok((row.tag_id, JobStatus::SpecFailed)); + let proto_type = if row.image_name.starts_with(DEKAF_IMAGE_NAME_PREFIX) { + RuntimeProtocol::Materialize + } else { + match runtime::flow_runtime_protocol(&image_composed).await { + Ok(ct) => ct, + Err(err) => { + tracing::warn!(image = %image_composed, error = %err, "failed to determine connector protocol"); + return Ok((row.tag_id, JobStatus::SpecFailed)); + } } }; let log_handler = logs::ops_handler(self.logs_tx.clone(), "spec".to_string(), row.logs_token); - let runtime = Runtime::new( - self.allow_local, - self.connector_network.clone(), - log_handler, - None, // no need to change log level - "ops/connector-tags-job".to_string(), - ); - let spec_result = match proto_type { - RuntimeProtocol::Capture => spec_capture(&image_composed, runtime).await, - RuntimeProtocol::Materialize => spec_materialization(&image_composed, runtime).await, - RuntimeProtocol::Derive => { - tracing::warn!(image = %image_composed, "unhandled Spec RPC for derivation connector image"); - return Ok((row.tag_id, JobStatus::SpecFailed)); + let spec_result = if row.image_name.starts_with(DEKAF_IMAGE_NAME_PREFIX) { + spec_dekaf(&image_composed).await + } else { + let runtime = Runtime::new( + self.allow_local, + self.connector_network.clone(), + log_handler, + None, // no need to change log level + "ops/connector-tags-job".to_string(), + ); + + match proto_type { + RuntimeProtocol::Capture => spec_capture(&image_composed, runtime).await, + RuntimeProtocol::Materialize => { + spec_materialization(&image_composed, runtime).await + } + RuntimeProtocol::Derive => { + tracing::warn!(image = %image_composed, "unhandled Spec RPC for derivation connector image"); + return Ok((row.tag_id, JobStatus::SpecFailed)); + } } }; @@ -166,7 +183,9 @@ impl TagHandler { // Validate that there is an x-collection-name annotation in the resource config schema // of materialization connectors if proto_type == RuntimeProtocol::Materialize { - if let Err(err) = crate::resource_configs::pointer_for_schema(resource_config_schema.get()) { + if let Err(err) = + crate::resource_configs::pointer_for_schema(resource_config_schema.get()) + { tracing::warn!(image = %image_composed, error = %err, "resource schema does not have x-collection-name annotation"); return Ok((row.tag_id, JobStatus::SpecFailed)); } @@ -258,6 +277,47 @@ async fn spec_materialization( }) } +async fn spec_dekaf(image: &str) -> anyhow::Result { + use proto_flow::materialize; + + let req = materialize::Request { + spec: Some(materialize::request::Spec { + connector_type: flow::materialization_spec::ConnectorType::Image as i32, + config_json: serde_json::to_string(&serde_json::json!({"image": image, "config":{}})) + .unwrap(), + }), + ..Default::default() + }; + + let spec = dekaf::connector::unary_materialize(req) + .await? + .spec + .ok_or_else(|| anyhow::anyhow!("connector didn't send expected Spec response"))?; + + let materialize::response::Spec { + protocol: _, + config_schema_json, + resource_config_schema_json, + documentation_url, + oauth2, + } = spec; + + let oauth = if let Some(oa) = oauth2 { + Some(serde_json::value::to_raw_value(&oa).expect("serializing oauth2 config")) + } else { + None + }; + Ok(ConnectorSpec { + documentation_url, + endpoint_config_schema: RawValue::from_string(config_schema_json) + .context("parsing endpoint config schema")?, + resource_config_schema: RawValue::from_string(resource_config_schema_json) + .context("parsing resource config schema")?, + resource_path_pointers: Vec::new(), + oauth2: oauth, + }) +} + async fn spec_capture( image: &str, runtime: Runtime, diff --git a/crates/dekaf/src/connector.rs b/crates/dekaf/src/connector.rs index 827c0fb620..34fecc8b97 100644 --- a/crates/dekaf/src/connector.rs +++ b/crates/dekaf/src/connector.rs @@ -26,9 +26,6 @@ impl Default for DeletionMode { /// Configures the behavior of a whole dekaf task #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, JsonSchema)] pub struct DekafConfig { - /// Whether or not to expose topic names in a strictly Kafka-compliant format - /// for systems that require it. Off by default. - pub strict_topic_names: bool, /// The password that will authenticate Kafka consumers to this task. // TODO(jshearer): Uncomment when schemars 1.0 is out and we upgrade // #[schemars(extend("secret" = true))] @@ -39,7 +36,13 @@ pub struct DekafConfig { /// with empty string and `_is_deleted` header set to `1`. Setting this value /// will also cause all other non-deletions to have an `_is_deleted` header of `0`. #[serde(default)] + #[schemars(title = "Deletion Mode")] pub deletions: DeletionMode, + /// Whether or not to expose topic names in a strictly Kafka-compliant format + /// for systems that require it. Off by default. + #[serde(default)] + #[schemars(title = "Strict Topic Names")] + pub strict_topic_names: bool, } /// Configures a particular binding in a Dekaf-type materialization @@ -54,14 +57,17 @@ pub struct DekafResourceConfig { fn collection_name(_gen: &mut schemars::gen::SchemaGenerator) -> schemars::schema::Schema { serde_json::from_value(serde_json::json!({ "x-collection-name": true, + "type": "string" })) .unwrap() } fn token_secret(_gen: &mut schemars::gen::SchemaGenerator) -> schemars::schema::Schema { serde_json::from_value(serde_json::json!({ - "title": "Dekaf Auth Token", + "title": "Auth Token", "secret": true, + "type": "string", + "order": 0 })) .unwrap() } @@ -69,8 +75,24 @@ fn token_secret(_gen: &mut schemars::gen::SchemaGenerator) -> schemars::schema:: pub async fn unary_materialize( request: materialize::Request, ) -> anyhow::Result { - use proto_flow::materialize::response::validated; - if let Some(mut validate) = request.validate { + if let Some(_) = request.spec { + let config_schema = schemars::schema_for!(DekafConfig); + let resource_schema = schemars::schema_for!(DekafResourceConfig); + + return Ok(materialize::Response { + spec: Some(materialize::response::Spec { + protocol: 3032023, + config_schema_json: serde_json::to_string(&config_schema)?, + resource_config_schema_json: serde_json::to_string(&resource_schema)?, + documentation_url: + "https://docs.estuary.dev/guides/dekaf_reading_collections_from_kafka" + .to_string(), + oauth2: None, + }), + ..Default::default() + }); + } else if let Some(mut validate) = request.validate { + use proto_flow::materialize::response::validated; match materialization_spec::ConnectorType::try_from(validate.connector_type)? { materialization_spec::ConnectorType::Dekaf => {} other => bail!("invalid connector type: {}", other.as_str_name()), From 466a716638767512d5516dd73b28478039400771 Mon Sep 17 00:00:00 2001 From: Joseph Shearer Date: Fri, 3 Jan 2025 18:00:49 -0500 Subject: [PATCH 2/5] runtime: Move `unseal` out of runtime into its own crate We need `runtime` to depend on `dekaf`, and they both need `unseal`, so it needed to be moved somewhere, and there didn't seem to be anywhere good to put it other than its own crate. --- Cargo.lock | 18 +++++++++++++++ crates/dekaf/Cargo.toml | 1 + crates/runtime/Cargo.toml | 1 + crates/runtime/src/capture/connector.rs | 3 ++- crates/runtime/src/derive/connector.rs | 3 ++- crates/runtime/src/lib.rs | 1 - crates/runtime/src/materialize/connector.rs | 3 ++- crates/unseal/Cargo.toml | 22 +++++++++++++++++++ .../src/unseal/mod.rs => unseal/src/lib.rs} | 0 .../src}/testdata/empty-input.json | 0 .../src}/testdata/hyphen-suffix.json | 0 .../src}/testdata/no-suffix.json | 0 .../src}/testdata/not-encrypted.json | 0 .../src}/testdata/under-suffix.json | 0 14 files changed, 48 insertions(+), 4 deletions(-) create mode 100644 crates/unseal/Cargo.toml rename crates/{runtime/src/unseal/mod.rs => unseal/src/lib.rs} (100%) rename crates/{runtime/src/unseal => unseal/src}/testdata/empty-input.json (100%) rename crates/{runtime/src/unseal => unseal/src}/testdata/hyphen-suffix.json (100%) rename crates/{runtime/src/unseal => unseal/src}/testdata/no-suffix.json (100%) rename crates/{runtime/src/unseal => unseal/src}/testdata/not-encrypted.json (100%) rename crates/{runtime/src/unseal => unseal/src}/testdata/under-suffix.json (100%) diff --git a/Cargo.lock b/Cargo.lock index d419459a3d..f17efc967e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1990,6 +1990,7 @@ dependencies = [ "tracing", "tracing-subscriber", "typestate", + "unseal", "url", "webpki", ] @@ -5437,6 +5438,7 @@ dependencies = [ "tracing", "tracing-subscriber", "tuple", + "unseal", "uuid 1.10.0", "xxhash-rust", "zeroize", @@ -7050,6 +7052,22 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39ec24b3121d976906ece63c9daad25b85969647682eee313cb5779fdd69e14e" +[[package]] +name = "unseal" +version = "0.0.0" +dependencies = [ + "anyhow", + "async-process", + "futures", + "insta", + "locate-bin", + "models", + "serde", + "serde_json", + "tokio", + "zeroize", +] + [[package]] name = "untrusted" version = "0.9.0" diff --git a/crates/dekaf/Cargo.toml b/crates/dekaf/Cargo.toml index d9eda59926..66eb29e321 100644 --- a/crates/dekaf/Cargo.toml +++ b/crates/dekaf/Cargo.toml @@ -21,6 +21,7 @@ models = { path = "../models" } ops = { path = "../ops" } proto-flow = { path = "../proto-flow" } proto-gazette = { path = "../proto-gazette" } +unseal = { path = "../unseal" } simd-doc = { path = "../simd-doc" } aes-siv = { workspace = true } diff --git a/crates/runtime/Cargo.toml b/crates/runtime/Cargo.toml index 6116c3591a..2bbe9ec921 100644 --- a/crates/runtime/Cargo.toml +++ b/crates/runtime/Cargo.toml @@ -35,6 +35,7 @@ proto-grpc = { path = "../proto-grpc", features = [ ] } simd-doc = { path = "../simd-doc" } tuple = { path = "../tuple" } +unseal = { path = "../unseal" } anyhow = { workspace = true } bytes = { workspace = true } diff --git a/crates/runtime/src/capture/connector.rs b/crates/runtime/src/capture/connector.rs index 9ad16564a0..ff280b3622 100644 --- a/crates/runtime/src/capture/connector.rs +++ b/crates/runtime/src/capture/connector.rs @@ -1,10 +1,11 @@ -use crate::{unseal, verify, LogHandler, Runtime}; +use crate::{verify, LogHandler, Runtime}; use anyhow::Context; use futures::{channel::mpsc, stream::BoxStream, FutureExt, StreamExt}; use proto_flow::{ capture::{Request, Response}, flow::capture_spec::ConnectorType, }; +use unseal; // Start a capture connector as indicated by the `initial` Request. // Returns a pair of Streams for sending Requests and receiving Responses. diff --git a/crates/runtime/src/derive/connector.rs b/crates/runtime/src/derive/connector.rs index 212f4c5176..b6437bcee7 100644 --- a/crates/runtime/src/derive/connector.rs +++ b/crates/runtime/src/derive/connector.rs @@ -1,10 +1,11 @@ -use crate::{unseal, LogHandler, Runtime}; +use crate::{LogHandler, Runtime}; use anyhow::Context; use futures::{channel::mpsc, stream::BoxStream, FutureExt, StreamExt}; use proto_flow::{ derive::{Request, Response}, flow::collection_spec::derivation::ConnectorType, }; +use unseal; // Start a derivation connector as indicated by the `initial` Request. // Returns a pair of Streams for sending Requests and receiving Responses. diff --git a/crates/runtime/src/lib.rs b/crates/runtime/src/lib.rs index 0352ec2c25..7761054807 100644 --- a/crates/runtime/src/lib.rs +++ b/crates/runtime/src/lib.rs @@ -14,7 +14,6 @@ mod rocksdb; mod task_service; mod tokio_context; mod unary; -mod unseal; pub mod uuid; pub use container::flow_runtime_protocol; diff --git a/crates/runtime/src/materialize/connector.rs b/crates/runtime/src/materialize/connector.rs index 6d1c215830..6261fdda5a 100644 --- a/crates/runtime/src/materialize/connector.rs +++ b/crates/runtime/src/materialize/connector.rs @@ -1,10 +1,11 @@ -use crate::{unseal, LogHandler, Runtime}; +use crate::{LogHandler, Runtime}; use anyhow::{bail, Context}; use futures::{channel::mpsc, stream::BoxStream, FutureExt, StreamExt}; use proto_flow::{ flow::materialization_spec::ConnectorType, materialize::{Request, Response}, }; +use unseal; // Start a materialization connector as indicated by the `initial` Request. // Returns a pair of Streams for sending Requests and receiving Responses. diff --git a/crates/unseal/Cargo.toml b/crates/unseal/Cargo.toml new file mode 100644 index 0000000000..4b1c885a97 --- /dev/null +++ b/crates/unseal/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "unseal" +version.workspace = true +rust-version.workspace = true +edition.workspace = true +authors.workspace = true +homepage.workspace = true +repository.workspace = true +license.workspace = true + +[dependencies] +async-process = { path = "../async-process" } +models = { path = "../models" } +locate-bin = { path = "../locate-bin" } + +anyhow = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +zeroize = { workspace = true } +futures = { workspace = true } +tokio = { workspace = true } +insta = { workspace = true } diff --git a/crates/runtime/src/unseal/mod.rs b/crates/unseal/src/lib.rs similarity index 100% rename from crates/runtime/src/unseal/mod.rs rename to crates/unseal/src/lib.rs diff --git a/crates/runtime/src/unseal/testdata/empty-input.json b/crates/unseal/src/testdata/empty-input.json similarity index 100% rename from crates/runtime/src/unseal/testdata/empty-input.json rename to crates/unseal/src/testdata/empty-input.json diff --git a/crates/runtime/src/unseal/testdata/hyphen-suffix.json b/crates/unseal/src/testdata/hyphen-suffix.json similarity index 100% rename from crates/runtime/src/unseal/testdata/hyphen-suffix.json rename to crates/unseal/src/testdata/hyphen-suffix.json diff --git a/crates/runtime/src/unseal/testdata/no-suffix.json b/crates/unseal/src/testdata/no-suffix.json similarity index 100% rename from crates/runtime/src/unseal/testdata/no-suffix.json rename to crates/unseal/src/testdata/no-suffix.json diff --git a/crates/runtime/src/unseal/testdata/not-encrypted.json b/crates/unseal/src/testdata/not-encrypted.json similarity index 100% rename from crates/runtime/src/unseal/testdata/not-encrypted.json rename to crates/unseal/src/testdata/not-encrypted.json diff --git a/crates/runtime/src/unseal/testdata/under-suffix.json b/crates/unseal/src/testdata/under-suffix.json similarity index 100% rename from crates/runtime/src/unseal/testdata/under-suffix.json rename to crates/unseal/src/testdata/under-suffix.json From e470818b53e20a486b113e169e23e1b5aa21ba8e Mon Sep 17 00:00:00 2001 From: Joseph Shearer Date: Fri, 3 Jan 2025 18:05:01 -0500 Subject: [PATCH 3/5] runtime: Move Dekaf invocations into `Runtime::unary_materialize()` In order for field selection to work in the UI, it needs to be able to validate a draft with a Dekaf connector in it. This detects when the request is for a Dekaf materialization and re-routes it to Dekaf's `unary_materialize`. It also means that the `connector_tags` job can work for Dekaf with a small change to detect image names starting in `ghcr.io/estuary/dekaf-*` and mark them as `ConnectorType::Dekaf`. --- Cargo.lock | 1 + crates/agent/src/connector_tags.rs | 83 ++++++++---------------------- crates/build/src/lib.rs | 9 +--- crates/runtime/Cargo.toml | 1 + crates/runtime/src/unary.rs | 22 ++++++-- 5 files changed, 43 insertions(+), 73 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f17efc967e..25de0124d4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5409,6 +5409,7 @@ dependencies = [ "clap 4.5.17", "connector-init", "coroutines", + "dekaf", "derive-sqlite", "doc", "extractors", diff --git a/crates/agent/src/connector_tags.rs b/crates/agent/src/connector_tags.rs index dc63448975..ba8f827c4a 100644 --- a/crates/agent/src/connector_tags.rs +++ b/crates/agent/src/connector_tags.rs @@ -137,26 +137,20 @@ impl TagHandler { let log_handler = logs::ops_handler(self.logs_tx.clone(), "spec".to_string(), row.logs_token); - let spec_result = if row.image_name.starts_with(DEKAF_IMAGE_NAME_PREFIX) { - spec_dekaf(&image_composed).await - } else { - let runtime = Runtime::new( - self.allow_local, - self.connector_network.clone(), - log_handler, - None, // no need to change log level - "ops/connector-tags-job".to_string(), - ); - - match proto_type { - RuntimeProtocol::Capture => spec_capture(&image_composed, runtime).await, - RuntimeProtocol::Materialize => { - spec_materialization(&image_composed, runtime).await - } - RuntimeProtocol::Derive => { - tracing::warn!(image = %image_composed, "unhandled Spec RPC for derivation connector image"); - return Ok((row.tag_id, JobStatus::SpecFailed)); - } + let runtime = Runtime::new( + self.allow_local, + self.connector_network.clone(), + log_handler, + None, // no need to change log level + "ops/connector-tags-job".to_string(), + ); + + let spec_result = match proto_type { + RuntimeProtocol::Capture => spec_capture(&image_composed, runtime).await, + RuntimeProtocol::Materialize => spec_materialization(&image_composed, runtime).await, + RuntimeProtocol::Derive => { + tracing::warn!(image = %image_composed, "unhandled Spec RPC for derivation connector image"); + return Ok((row.tag_id, JobStatus::SpecFailed)); } }; @@ -235,9 +229,15 @@ async fn spec_materialization( ) -> anyhow::Result { use proto_flow::materialize; + let connector_type = if image.starts_with(DEKAF_IMAGE_NAME_PREFIX) { + flow::materialization_spec::ConnectorType::Dekaf as i32 + } else { + flow::materialization_spec::ConnectorType::Image as i32 + }; + let req = materialize::Request { spec: Some(materialize::request::Spec { - connector_type: flow::materialization_spec::ConnectorType::Image as i32, + connector_type, config_json: serde_json::to_string(&serde_json::json!({"image": image, "config":{}})) .unwrap(), }), @@ -277,47 +277,6 @@ async fn spec_materialization( }) } -async fn spec_dekaf(image: &str) -> anyhow::Result { - use proto_flow::materialize; - - let req = materialize::Request { - spec: Some(materialize::request::Spec { - connector_type: flow::materialization_spec::ConnectorType::Image as i32, - config_json: serde_json::to_string(&serde_json::json!({"image": image, "config":{}})) - .unwrap(), - }), - ..Default::default() - }; - - let spec = dekaf::connector::unary_materialize(req) - .await? - .spec - .ok_or_else(|| anyhow::anyhow!("connector didn't send expected Spec response"))?; - - let materialize::response::Spec { - protocol: _, - config_schema_json, - resource_config_schema_json, - documentation_url, - oauth2, - } = spec; - - let oauth = if let Some(oa) = oauth2 { - Some(serde_json::value::to_raw_value(&oa).expect("serializing oauth2 config")) - } else { - None - }; - Ok(ConnectorSpec { - documentation_url, - endpoint_config_schema: RawValue::from_string(config_schema_json) - .context("parsing endpoint config schema")?, - resource_config_schema: RawValue::from_string(resource_config_schema_json) - .context("parsing resource config schema")?, - resource_path_pointers: Vec::new(), - oauth2: oauth, - }) -} - async fn spec_capture( image: &str, runtime: Runtime, diff --git a/crates/build/src/lib.rs b/crates/build/src/lib.rs index 0ea4cdb116..292e6b9886 100644 --- a/crates/build/src/lib.rs +++ b/crates/build/src/lib.rs @@ -376,14 +376,7 @@ impl validation::Connectors for RuntimeConnectors { request: materialize::Request, _data_plane: &'a tables::DataPlane, ) -> BoxFuture<'a, anyhow::Result> { - match flow::materialization_spec::ConnectorType::try_from( - request.validate.as_ref().unwrap().connector_type, - ) { - Ok(flow::materialization_spec::ConnectorType::Dekaf) => { - dekaf::connector::unary_materialize(request).boxed() - } - _ => self.runtime.clone().unary_materialize(request).boxed(), - } + self.runtime.clone().unary_materialize(request).boxed() } } diff --git a/crates/runtime/Cargo.toml b/crates/runtime/Cargo.toml index 2bbe9ec921..d256d071ed 100644 --- a/crates/runtime/Cargo.toml +++ b/crates/runtime/Cargo.toml @@ -13,6 +13,7 @@ assemble = { path = "../assemble" } async-process = { path = "../async-process" } connector-init = { path = "../connector-init" } coroutines = { path = "../coroutines" } +dekaf = { path = "../dekaf" } derive-sqlite = { path = "../derive-sqlite" } doc = { path = "../doc" } extractors = { path = "../extractors" } diff --git a/crates/runtime/src/unary.rs b/crates/runtime/src/unary.rs index 7ad7e28b82..8dc4204fe8 100644 --- a/crates/runtime/src/unary.rs +++ b/crates/runtime/src/unary.rs @@ -1,6 +1,6 @@ use super::{LogHandler, Runtime}; use futures::{stream::BoxStream, StreamExt, TryStreamExt}; -use proto_flow::{capture, derive, materialize}; +use proto_flow::{capture, derive, flow::materialization_spec, materialize}; impl Runtime { pub async fn unary_capture( @@ -20,8 +20,24 @@ impl Runtime { self, request: materialize::Request, ) -> anyhow::Result { - let response = self.serve_materialize(unary_in(request)).boxed(); - unary_out(response).await + let is_dekaf = request.spec.as_ref().is_some_and(|spec| { + matches!( + spec.connector_type(), + materialization_spec::ConnectorType::Dekaf + ) + }) || request.validate.as_ref().is_some_and(|validate| { + matches!( + validate.connector_type(), + materialization_spec::ConnectorType::Dekaf + ) + }); + + if is_dekaf { + dekaf::connector::unary_materialize(request).await + } else { + let unary_resp = self.serve_materialize(unary_in(request)).boxed(); + unary_out(unary_resp).await + } } } From 8fd0c27ee3794321d0f95f907130b36afc3ef889 Mon Sep 17 00:00:00 2001 From: Joseph Shearer Date: Mon, 6 Jan 2025 12:19:06 -0500 Subject: [PATCH 4/5] runtime: Move dekaf image check to `flow_runtime_protocol` --- crates/agent/src/connector_tags.rs | 25 +++++++++---------------- crates/runtime/src/container.rs | 9 +++++++++ crates/runtime/src/lib.rs | 2 +- 3 files changed, 19 insertions(+), 17 deletions(-) diff --git a/crates/agent/src/connector_tags.rs b/crates/agent/src/connector_tags.rs index ba8f827c4a..dd1cfa1af9 100644 --- a/crates/agent/src/connector_tags.rs +++ b/crates/agent/src/connector_tags.rs @@ -83,11 +83,6 @@ impl Handler for TagHandler { /// connector_tags without having to push to a registry. pub const LOCAL_IMAGE_TAG: &str = ":local"; -/// Connectors with an image name starting with this value are Dekaf-type materializations and we should -/// not pull the image, as it won't exist. Instead, we mark them as having `connector_type: ConnectorType::Dekaf` -/// so that `Runtime` will invoke Dekaf's in-tree connector implementation -pub const DEKAF_IMAGE_NAME_PREFIX: &str = "ghcr.io/estuary/dekaf-"; - impl TagHandler { #[tracing::instrument(err, skip_all, fields(id=?row.tag_id))] async fn process( @@ -105,7 +100,8 @@ impl TagHandler { ); let image_composed = format!("{}{}", row.image_name, row.image_tag); - if row.image_tag != LOCAL_IMAGE_TAG && !row.image_name.starts_with(DEKAF_IMAGE_NAME_PREFIX) + if row.image_tag != LOCAL_IMAGE_TAG + && !row.image_name.starts_with(runtime::DEKAF_IMAGE_NAME_PREFIX) { // Pull the image. let pull = jobs::run( @@ -123,17 +119,14 @@ impl TagHandler { } } - let proto_type = if row.image_name.starts_with(DEKAF_IMAGE_NAME_PREFIX) { - RuntimeProtocol::Materialize - } else { - match runtime::flow_runtime_protocol(&image_composed).await { - Ok(ct) => ct, - Err(err) => { - tracing::warn!(image = %image_composed, error = %err, "failed to determine connector protocol"); - return Ok((row.tag_id, JobStatus::SpecFailed)); - } + let proto_type = match runtime::flow_runtime_protocol(&image_composed).await { + Ok(ct) => ct, + Err(err) => { + tracing::warn!(image = %image_composed, error = %err, "failed to determine connector protocol"); + return Ok((row.tag_id, JobStatus::SpecFailed)); } }; + let log_handler = logs::ops_handler(self.logs_tx.clone(), "spec".to_string(), row.logs_token); @@ -229,7 +222,7 @@ async fn spec_materialization( ) -> anyhow::Result { use proto_flow::materialize; - let connector_type = if image.starts_with(DEKAF_IMAGE_NAME_PREFIX) { + let connector_type = if image.starts_with(runtime::DEKAF_IMAGE_NAME_PREFIX) { flow::materialization_spec::ConnectorType::Dekaf as i32 } else { flow::materialization_spec::ConnectorType::Image as i32 diff --git a/crates/runtime/src/container.rs b/crates/runtime/src/container.rs index 94cb5149b3..067cb9263f 100644 --- a/crates/runtime/src/container.rs +++ b/crates/runtime/src/container.rs @@ -21,11 +21,20 @@ const PORT_PROTO_LABEL_PREFIX: &str = "dev.estuary.port-proto."; const CONNECTOR_INIT_IMAGE: &str = "ghcr.io/estuary/flow:v0.5.7-119-g552f6c0ee2"; const CONNECTOR_INIT_IMAGE_PATH: &str = "/usr/local/bin/flow-connector-init"; +/// Connectors with an image name starting with this value are Dekaf-type materializations and we should +/// not pull the image, as it won't exist. Instead, we mark them as having `connector_type: ConnectorType::Dekaf` +/// so that `Runtime` will invoke Dekaf's in-tree connector implementation +pub const DEKAF_IMAGE_NAME_PREFIX: &str = "ghcr.io/estuary/dekaf-"; + /// Determines the protocol of an image. If the image has a `FLOW_RUNTIME_PROTOCOL` label, /// then it's value is used. Otherwise, this will apply a simple heuristic based on the image name, /// for backward compatibility purposes. An error will be returned if it fails to inspect the image /// or parse the label. The image must already have been pulled before calling this function. pub async fn flow_runtime_protocol(image: &str) -> anyhow::Result { + if image.starts_with(DEKAF_IMAGE_NAME_PREFIX) { + return Ok(RuntimeProtocol::Materialize); + } + let inspect_output = docker_cmd(&["inspect", image]) .await .context("inspecting image")?; diff --git a/crates/runtime/src/lib.rs b/crates/runtime/src/lib.rs index 7761054807..b8f126ac24 100644 --- a/crates/runtime/src/lib.rs +++ b/crates/runtime/src/lib.rs @@ -16,7 +16,7 @@ mod tokio_context; mod unary; pub mod uuid; -pub use container::flow_runtime_protocol; +pub use container::{flow_runtime_protocol, DEKAF_IMAGE_NAME_PREFIX}; pub use task_service::TaskService; pub use tokio_context::TokioContext; From 50a2bdbb615a12441441f3aafd011e37592707fb Mon Sep 17 00:00:00 2001 From: Joseph Shearer Date: Mon, 6 Jan 2025 13:52:35 -0500 Subject: [PATCH 5/5] dekaf: decrypt endpoint config when validating --- crates/dekaf/src/connector.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/crates/dekaf/src/connector.rs b/crates/dekaf/src/connector.rs index 34fecc8b97..b6ce4d3b42 100644 --- a/crates/dekaf/src/connector.rs +++ b/crates/dekaf/src/connector.rs @@ -103,7 +103,13 @@ pub async fn unary_materialize( .context("validating dekaf config")?; let _parsed_inner_config = serde_json::from_value::( - parsed_outer_config.config.to_value(), + unseal::decrypt_sops(&parsed_outer_config.config) + .await + .context(format!( + "decrypting dekaf endpoint config for variant {}", + parsed_outer_config.variant + ))? + .to_value(), ) .context(format!( "validating dekaf endpoint config for variant {}",