Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dekaf runtime changes #1854

Merged
merged 5 commits into from
Jan 6, 2025
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

35 changes: 27 additions & 8 deletions crates/agent/src/connector_tags.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@ impl Handler for TagHandler {
/// connector_tags without having to push to a registry.
pub const LOCAL_IMAGE_TAG: &str = ":local";

/// Image-name prefix that identifies Dekaf-type materializations.
///
/// Connectors whose image name starts with this value have no image to pull (it won't exist),
/// so `TagHandler::process` skips the pull step for them. `spec_materialization` instead marks
/// them as having `connector_type: ConnectorType::Dekaf`, so that `Runtime` will invoke Dekaf's
/// in-tree connector implementation.
pub const DEKAF_IMAGE_NAME_PREFIX: &str = "ghcr.io/estuary/dekaf-";

impl TagHandler {
#[tracing::instrument(err, skip_all, fields(id=?row.tag_id))]
async fn process(
Expand All @@ -100,7 +105,8 @@ impl TagHandler {
);
let image_composed = format!("{}{}", row.image_name, row.image_tag);

if row.image_tag != LOCAL_IMAGE_TAG {
if row.image_tag != LOCAL_IMAGE_TAG && !row.image_name.starts_with(DEKAF_IMAGE_NAME_PREFIX)
{
// Pull the image.
let pull = jobs::run(
"pull",
Expand All @@ -117,15 +123,20 @@ impl TagHandler {
}
}

let proto_type = match runtime::flow_runtime_protocol(&image_composed).await {
Ok(ct) => ct,
Err(err) => {
tracing::warn!(image = %image_composed, error = %err, "failed to determine connector protocol");
return Ok((row.tag_id, JobStatus::SpecFailed));
let proto_type = if row.image_name.starts_with(DEKAF_IMAGE_NAME_PREFIX) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I kinda feel like this conditional should be in runtime::flow_runtime_protocol. Is there a reason to prefer it to be in agent?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, you're right. I read runtime::flow_runtime_protocol and parse_image_inspection briefly and saw that docker inspect call, but it's trivial to check the image first. 👍

RuntimeProtocol::Materialize
} else {
match runtime::flow_runtime_protocol(&image_composed).await {
Ok(ct) => ct,
Err(err) => {
tracing::warn!(image = %image_composed, error = %err, "failed to determine connector protocol");
return Ok((row.tag_id, JobStatus::SpecFailed));
}
}
};
let log_handler =
logs::ops_handler(self.logs_tx.clone(), "spec".to_string(), row.logs_token);

let runtime = Runtime::new(
self.allow_local,
self.connector_network.clone(),
Expand Down Expand Up @@ -166,7 +177,9 @@ impl TagHandler {
// Validate that there is an x-collection-name annotation in the resource config schema
// of materialization connectors
if proto_type == RuntimeProtocol::Materialize {
if let Err(err) = crate::resource_configs::pointer_for_schema(resource_config_schema.get()) {
if let Err(err) =
crate::resource_configs::pointer_for_schema(resource_config_schema.get())
{
tracing::warn!(image = %image_composed, error = %err, "resource schema does not have x-collection-name annotation");
return Ok((row.tag_id, JobStatus::SpecFailed));
}
Expand Down Expand Up @@ -216,9 +229,15 @@ async fn spec_materialization(
) -> anyhow::Result<ConnectorSpec> {
use proto_flow::materialize;

let connector_type = if image.starts_with(DEKAF_IMAGE_NAME_PREFIX) {
flow::materialization_spec::ConnectorType::Dekaf as i32
} else {
flow::materialization_spec::ConnectorType::Image as i32
};

let req = materialize::Request {
spec: Some(materialize::request::Spec {
connector_type: flow::materialization_spec::ConnectorType::Image as i32,
connector_type,
config_json: serde_json::to_string(&serde_json::json!({"image": image, "config":{}}))
.unwrap(),
}),
Expand Down
9 changes: 1 addition & 8 deletions crates/build/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -376,14 +376,7 @@ impl<L: runtime::LogHandler> validation::Connectors for RuntimeConnectors<L> {
request: materialize::Request,
_data_plane: &'a tables::DataPlane,
) -> BoxFuture<'a, anyhow::Result<materialize::Response>> {
match flow::materialization_spec::ConnectorType::try_from(
request.validate.as_ref().unwrap().connector_type,
) {
Ok(flow::materialization_spec::ConnectorType::Dekaf) => {
dekaf::connector::unary_materialize(request).boxed()
}
_ => self.runtime.clone().unary_materialize(request).boxed(),
}
self.runtime.clone().unary_materialize(request).boxed()
}
}

Expand Down
1 change: 1 addition & 0 deletions crates/dekaf/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ models = { path = "../models" }
ops = { path = "../ops" }
proto-flow = { path = "../proto-flow" }
proto-gazette = { path = "../proto-gazette" }
unseal = { path = "../unseal" }
simd-doc = { path = "../simd-doc" }

aes-siv = { workspace = true }
Expand Down
34 changes: 28 additions & 6 deletions crates/dekaf/src/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,6 @@ impl Default for DeletionMode {
/// Configures the behavior of a whole dekaf task
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, JsonSchema)]
pub struct DekafConfig {
/// Whether or not to expose topic names in a strictly Kafka-compliant format
/// for systems that require it. Off by default.
pub strict_topic_names: bool,
/// The password that will authenticate Kafka consumers to this task.
// TODO(jshearer): Uncomment when schemars 1.0 is out and we upgrade
// #[schemars(extend("secret" = true))]
Expand All @@ -39,7 +36,13 @@ pub struct DekafConfig {
/// with empty string and `_is_deleted` header set to `1`. Setting this value
/// will also cause all other non-deletions to have an `_is_deleted` header of `0`.
#[serde(default)]
#[schemars(title = "Deletion Mode")]
pub deletions: DeletionMode,
/// Whether or not to expose topic names in a strictly Kafka-compliant format
/// for systems that require it. Off by default.
#[serde(default)]
#[schemars(title = "Strict Topic Names")]
pub strict_topic_names: bool,
}

/// Configures a particular binding in a Dekaf-type materialization
Expand All @@ -54,23 +57,42 @@ pub struct DekafResourceConfig {
/// Builds a JSON schema describing a `string` annotated with `x-collection-name: true`.
///
/// The `_gen` argument is not used. Panics if the JSON literal cannot be
/// deserialized into a `schemars::schema::Schema`.
// NOTE(review): presumably referenced through a `#[schemars(schema_with = ...)]`
// attribute on a `DekafResourceConfig` field — confirm against the struct definition.
fn collection_name(_gen: &mut schemars::gen::SchemaGenerator) -> schemars::schema::Schema {
serde_json::from_value(serde_json::json!({
"x-collection-name": true,
"type": "string"
}))
.unwrap()
}

fn token_secret(_gen: &mut schemars::gen::SchemaGenerator) -> schemars::schema::Schema {
serde_json::from_value(serde_json::json!({
"title": "Dekaf Auth Token",
"title": "Auth Token",
"secret": true,
"type": "string",
"order": 0
}))
.unwrap()
}

pub async fn unary_materialize(
request: materialize::Request,
) -> anyhow::Result<materialize::Response> {
use proto_flow::materialize::response::validated;
if let Some(mut validate) = request.validate {
if let Some(_) = request.spec {
let config_schema = schemars::schema_for!(DekafConfig);
let resource_schema = schemars::schema_for!(DekafResourceConfig);

return Ok(materialize::Response {
spec: Some(materialize::response::Spec {
protocol: 3032023,
config_schema_json: serde_json::to_string(&config_schema)?,
resource_config_schema_json: serde_json::to_string(&resource_schema)?,
documentation_url:
"https://docs.estuary.dev/guides/dekaf_reading_collections_from_kafka"
.to_string(),
oauth2: None,
}),
..Default::default()
});
} else if let Some(mut validate) = request.validate {
use proto_flow::materialize::response::validated;
match materialization_spec::ConnectorType::try_from(validate.connector_type)? {
materialization_spec::ConnectorType::Dekaf => {}
other => bail!("invalid connector type: {}", other.as_str_name()),
Expand Down
2 changes: 2 additions & 0 deletions crates/runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ assemble = { path = "../assemble" }
async-process = { path = "../async-process" }
connector-init = { path = "../connector-init" }
coroutines = { path = "../coroutines" }
dekaf = { path = "../dekaf" }
derive-sqlite = { path = "../derive-sqlite" }
doc = { path = "../doc" }
extractors = { path = "../extractors" }
Expand All @@ -35,6 +36,7 @@ proto-grpc = { path = "../proto-grpc", features = [
] }
simd-doc = { path = "../simd-doc" }
tuple = { path = "../tuple" }
unseal = { path = "../unseal" }

anyhow = { workspace = true }
bytes = { workspace = true }
Expand Down
3 changes: 2 additions & 1 deletion crates/runtime/src/capture/connector.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use crate::{unseal, verify, LogHandler, Runtime};
use crate::{verify, LogHandler, Runtime};
use anyhow::Context;
use futures::{channel::mpsc, stream::BoxStream, FutureExt, StreamExt};
use proto_flow::{
capture::{Request, Response},
flow::capture_spec::ConnectorType,
};
use unseal;

// Start a capture connector as indicated by the `initial` Request.
// Returns a pair of Streams for sending Requests and receiving Responses.
Expand Down
3 changes: 2 additions & 1 deletion crates/runtime/src/derive/connector.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use crate::{unseal, LogHandler, Runtime};
use crate::{LogHandler, Runtime};
use anyhow::Context;
use futures::{channel::mpsc, stream::BoxStream, FutureExt, StreamExt};
use proto_flow::{
derive::{Request, Response},
flow::collection_spec::derivation::ConnectorType,
};
use unseal;

// Start a derivation connector as indicated by the `initial` Request.
// Returns a pair of Streams for sending Requests and receiving Responses.
Expand Down
1 change: 0 additions & 1 deletion crates/runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ mod rocksdb;
mod task_service;
mod tokio_context;
mod unary;
mod unseal;
pub mod uuid;

pub use container::flow_runtime_protocol;
Expand Down
3 changes: 2 additions & 1 deletion crates/runtime/src/materialize/connector.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use crate::{unseal, LogHandler, Runtime};
use crate::{LogHandler, Runtime};
use anyhow::{bail, Context};
use futures::{channel::mpsc, stream::BoxStream, FutureExt, StreamExt};
use proto_flow::{
flow::materialization_spec::ConnectorType,
materialize::{Request, Response},
};
use unseal;

// Start a materialization connector as indicated by the `initial` Request.
// Returns a pair of Streams for sending Requests and receiving Responses.
Expand Down
22 changes: 19 additions & 3 deletions crates/runtime/src/unary.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::{LogHandler, Runtime};
use futures::{stream::BoxStream, StreamExt, TryStreamExt};
use proto_flow::{capture, derive, materialize};
use proto_flow::{capture, derive, flow::materialization_spec, materialize};

impl<L: LogHandler> Runtime<L> {
pub async fn unary_capture(
Expand All @@ -20,8 +20,24 @@ impl<L: LogHandler> Runtime<L> {
self,
request: materialize::Request,
) -> anyhow::Result<materialize::Response> {
let response = self.serve_materialize(unary_in(request)).boxed();
unary_out(response).await
let is_dekaf = request.spec.as_ref().is_some_and(|spec| {
matches!(
spec.connector_type(),
materialization_spec::ConnectorType::Dekaf
)
}) || request.validate.as_ref().is_some_and(|validate| {
matches!(
validate.connector_type(),
materialization_spec::ConnectorType::Dekaf
)
});

if is_dekaf {
dekaf::connector::unary_materialize(request).await
} else {
let unary_resp = self.serve_materialize(unary_in(request)).boxed();
unary_out(unary_resp).await
}
}
}

Expand Down
22 changes: 22 additions & 0 deletions crates/unseal/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Manifest for the `unseal` crate. All package metadata other than the name is
# inherited from the workspace manifest.
[package]
name = "unseal"
version.workspace = true
rust-version.workspace = true
edition.workspace = true
authors.workspace = true
homepage.workspace = true
repository.workspace = true
license.workspace = true

[dependencies]
# Sibling crates, referenced by relative path.
async-process = { path = "../async-process" }
models = { path = "../models" }
locate-bin = { path = "../locate-bin" }

# External crates, with their specifications inherited from the workspace manifest.
anyhow = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
zeroize = { workspace = true }
futures = { workspace = true }
tokio = { workspace = true }
# NOTE(review): `insta` is a snapshot-testing crate; if it is only used from tests,
# it presumably belongs under `[dev-dependencies]` — confirm against the crate's sources.
insta = { workspace = true }
File renamed without changes.
Loading