From 388e025ead4736c8f4a374432725e0e095af124a Mon Sep 17 00:00:00 2001 From: Mouli Mukherjee Date: Fri, 6 Oct 2023 20:14:20 -0700 Subject: [PATCH] Fixing debezium case --- misc/python/materialize/checks/sink.py | 6 +-- src/interchange/src/avro/encode.rs | 40 ++++++++++++++++++- .../kafka-avro-sinks-doc-comments.td | 15 +++++++ 3 files changed, 56 insertions(+), 5 deletions(-) diff --git a/misc/python/materialize/checks/sink.py b/misc/python/materialize/checks/sink.py index 7f49716d7e089..5ec5cba57d5d2 100644 --- a/misc/python/materialize/checks/sink.py +++ b/misc/python/materialize/checks/sink.py @@ -701,13 +701,13 @@ def validate(self) -> Testdrive: {"type":"record","name":"row","doc":"comment on view sink_source_comments_view","fields":[{"name":"l_v2","type":["null","long"],"default":null,"doc":"key doc on l_v2"}]} $ schema-registry-verify schema-type=avro subject=sink-sink-comments1-value - {"type":"record","name":"envelope","doc":"comment on view sink_source_comments_view","fields":[{"name":"before","type":["null",{"type":"record","name":"row","fields":[{"name":"l_k","type":"string"},{"name":"l_v1","type":["null","string"],"default":null,"doc":"doc on l_v1"},{"name":"l_v2","type":["null","long"],"default":null,"doc":"value doc on l_v1"},{"name":"c","type":"long"}]}],"default":null},{"name":"after","type":["null","row"],"default":null}]} + {"type":"record","name":"envelope","fields":[{"name":"before","type":["null",{"type":"record","name":"row","doc":"comment on view sink_source_comments_view","fields":[{"name":"l_k","type":"string"},{"name":"l_v1","type":["null","string"],"default":null,"doc":"doc on l_v1"},{"name":"l_v2","type":["null","long"],"default":null,"doc":"value doc on l_v2"},{"name":"c","type":"long"}]}],"default":null},{"name":"after","type":["null","row"],"default":null}]} $ schema-registry-verify schema-type=avro subject=sink-sink-comments2-value - {"type":"record","name":"envelope","doc":"comment on view sink_source_comments_view","fields":[{"name":"before","type":["null",{"type":"record","name":"row","fields":[{"name":"l_k","type":"string"},{"name":"l_v1","type":["null","string"],"default":null,"doc":"doc on l_v1"},{"name":"l_v2","type":["null","long"],"default":null,"doc":"value doc on l_v1"},{"name":"c","type":"long"}]}],"default":null},{"name":"after","type":["null","row"],"default":null}]} + {"type":"record","name":"envelope","fields":[{"name":"before","type":["null",{"type":"record","name":"row","doc":"comment on view sink_source_comments_view","fields":[{"name":"l_k","type":"string"},{"name":"l_v1","type":["null","string"],"default":null,"doc":"doc on l_v1"},{"name":"l_v2","type":["null","long"],"default":null,"doc":"value doc on l_v2"},{"name":"c","type":"long"}]}],"default":null},{"name":"after","type":["null","row"],"default":null}]} $ schema-registry-verify schema-type=avro subject=sink-sink-comments3-value - {"type":"record","name":"envelope","doc":"comment on view sink_source_comments_view","fields":[{"name":"before","type":["null",{"type":"record","name":"row","fields":[{"name":"l_k","type":"string"},{"name":"l_v1","type":["null","string"],"default":null,"doc":"doc on l_v1"},{"name":"l_v2","type":["null","long"],"default":null,"doc":"value doc on l_v1"},{"name":"c","type":"long"}]}],"default":null},{"name":"after","type":["null","row"],"default":null}]} + {"type":"record","name":"envelope","fields":[{"name":"before","type":["null",{"type":"record","name":"row","doc":"comment on view sink_source_comments_view","fields":[{"name":"l_k","type":"string"},{"name":"l_v1","type":["null","string"],"default":null,"doc":"doc on l_v1"},{"name":"l_v2","type":["null","long"],"default":null,"doc":"value doc on l_v2"},{"name":"c","type":"long"}]}],"default":null},{"name":"after","type":["null","row"],"default":null}]} $ postgres-execute connection=postgres://mz_system@materialized:6877/materialize GRANT SELECT ON sink_source_comments_view TO materialize diff --git a/src/interchange/src/avro/encode.rs b/src/interchange/src/avro/encode.rs index 35fa3184da8e6..e775cacbc12c7 100644 --- a/src/interchange/src/avro/encode.rs +++ b/src/interchange/src/avro/encode.rs @@ -10,6 +10,7 @@ use std::collections::BTreeMap; use std::fmt; +use anyhow::Ok; use byteorder::{NetworkEndian, WriteBytesExt}; use chrono::Timelike; use itertools::Itertools; @@ -23,7 +24,7 @@ use once_cell::sync::Lazy; use serde_json::json; use crate::encode::{column_names_and_types, Encode, TypedDatum}; -use crate::envelopes::{self, ENVELOPE_CUSTOM_NAMES}; +use crate::envelopes::{self, DBZ_ROW_TYPE_ID, ENVELOPE_CUSTOM_NAMES}; use crate::json::{build_row_schema_json, SchemaOptions}; // TODO(rkhaitan): this schema intentionally omits the data_collections field @@ -140,6 +141,15 @@ pub enum DocTarget { }, } +impl DocTarget { + fn id(&self) -> GlobalId { + match self { + DocTarget::Type(object_id) => *object_id, + DocTarget::Field { object_id, .. } => *object_id, + } + } +} + /// Generates key and value Avro schemas pub struct AvroSchemaGenerator { value_columns: Vec<(ColumnName, ColumnType)>, @@ -157,13 +167,39 @@ impl AvroSchemaGenerator { avro_key_fullname, set_null_defaults, sink_from, - value_doc_options, + mut value_doc_options, key_doc_options, }: AvroSchemaOptions, ) -> Result { let mut value_columns = column_names_and_types(value_desc); if is_debezium { value_columns = envelopes::dbz_envelope(value_columns); + // With DEBEZIUM envelope the message is wrapped into "before" and "after" + // with `DBZ_ROW_TYPE_ID` instead of `sink_from`. + // Replacing comments for the columns and type in `sink_from` to `DBZ_ROW_TYPE_ID`. + if let Some(sink_from_id) = sink_from { + let mut new_column_docs = BTreeMap::new(); + value_doc_options.iter().for_each(|(k, v)| { + if k.id() == sink_from_id { + match k { + DocTarget::Field { column_name, .. } => { + new_column_docs.insert( + DocTarget::Field { + object_id: DBZ_ROW_TYPE_ID, + column_name: column_name.clone(), + }, + v.clone(), + ); + } + DocTarget::Type(_) => { + new_column_docs.insert(DocTarget::Type(DBZ_ROW_TYPE_ID), v.clone()); + } + } + } + }); + value_doc_options.append(&mut new_column_docs); + value_doc_options.retain(|k, _v| k.id() != sink_from_id); + } } let row_schema = build_row_schema_json( &value_columns, diff --git a/test/testdrive/kafka-avro-sinks-doc-comments.td b/test/testdrive/kafka-avro-sinks-doc-comments.td index d3451d2ee6a6a..13f4922b5f178 100644 --- a/test/testdrive/kafka-avro-sinks-doc-comments.td +++ b/test/testdrive/kafka-avro-sinks-doc-comments.td @@ -64,6 +64,21 @@ $ schema-registry-verify schema-type=avro subject=testdrive-sink1-${testdrive.se $ schema-registry-verify schema-type=avro subject=testdrive-sink2-${testdrive.seed}-value {"type":"record","name":"envelope","doc":"comment on table t with a \\\\ \\","fields":[{"name":"c1","type":["null",{"type":"record","name":"record0","namespace":"com.materialize.sink","doc":"comment on type point","fields":[{"name":"x","type":["null","int"]},{"name":"y","type":["null","int"]}]}]},{"name":"c2","type":"string"},{"name":"c3","type":["null",{"type":"map","values":["null","boolean"]}],"doc":"comment on column t.c3 with a '"},{"name":"c4","type":["null",{"type":"array","items":["null",{"type":"record","name":"record1","namespace":"com.materialize.sink","doc":"comment on type point","fields":[{"name":"x","type":["null","int"]},{"name":"y","type":["null","int"]}]}]}],"doc":"comment on column t.c4 with an äöü"}]} +> CREATE SINK sink3 FROM t + INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink3-${testdrive.seed}') + KEY (c2) NOT ENFORCED + FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn + ( + DOC ON COLUMN t.c2 = 'doc on t.c2' + ) + ENVELOPE DEBEZIUM; + +$ schema-registry-verify schema-type=avro subject=testdrive-sink3-${testdrive.seed}-value +{"type":"record","name":"envelope","fields":[{"name":"before","type":["null",{"type":"record","name":"row","doc":"comment on table t with a \\\\ \\","fields":[{"name":"c1","type":["null",{"type":"record","name":"record0","namespace":"com.materialize.sink","doc":"comment on type point","fields":[{"name":"x","type":["null","int"]},{"name":"y","type":["null","int"]}]}]},{"name":"c2","type":"string","doc":"doc on t.c2"},{"name":"c3","type":["null",{"type":"map","values":["null","boolean"]}],"doc":"comment on column t.c3 with a '"},{"name":"c4","type":["null",{"type":"array","items":["null",{"type":"record","name":"record1","namespace":"com.materialize.sink","doc":"comment on type point","fields":[{"name":"x","type":["null","int"]},{"name":"y","type":["null","int"]}]}]}],"doc":"comment on column t.c4 with an äöü"}]}]},{"name":"after","type":["null","row"]}]} + +$ schema-registry-verify schema-type=avro subject=testdrive-sink3-${testdrive.seed}-key +{"type":"record","name":"row","doc":"comment on table t with a \\\\ \\","fields":[{"name":"c2","type":"string","doc":"doc on t.c2"}]} + # errors ! CREATE SINK bad_sink FROM t INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink1-${testdrive.seed}')