Skip to content

Commit

Permalink
Fixing debezium case
Browse files Browse the repository at this point in the history
  • Loading branch information
Mouli Mukherjee committed Oct 10, 2023
1 parent 1d48efc commit 388e025
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 5 deletions.
6 changes: 3 additions & 3 deletions misc/python/materialize/checks/sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -701,13 +701,13 @@ def validate(self) -> Testdrive:
{"type":"record","name":"row","doc":"comment on view sink_source_comments_view","fields":[{"name":"l_v2","type":["null","long"],"default":null,"doc":"key doc on l_v2"}]}
$ schema-registry-verify schema-type=avro subject=sink-sink-comments1-value
{"type":"record","name":"envelope","doc":"comment on view sink_source_comments_view","fields":[{"name":"before","type":["null",{"type":"record","name":"row","fields":[{"name":"l_k","type":"string"},{"name":"l_v1","type":["null","string"],"default":null,"doc":"doc on l_v1"},{"name":"l_v2","type":["null","long"],"default":null,"doc":"value doc on l_v1"},{"name":"c","type":"long"}]}],"default":null},{"name":"after","type":["null","row"],"default":null}]}
{"type":"record","name":"envelope","fields":[{"name":"before","type":["null",{"type":"record","name":"row","doc":"comment on view sink_source_comments_view","fields":[{"name":"l_k","type":"string"},{"name":"l_v1","type":["null","string"],"default":null,"doc":"doc on l_v1"},{"name":"l_v2","type":["null","long"],"default":null,"doc":"value doc on l_v2"},{"name":"c","type":"long"}]}],"default":null},{"name":"after","type":["null","row"],"default":null}]}
$ schema-registry-verify schema-type=avro subject=sink-sink-comments2-value
{"type":"record","name":"envelope","doc":"comment on view sink_source_comments_view","fields":[{"name":"before","type":["null",{"type":"record","name":"row","fields":[{"name":"l_k","type":"string"},{"name":"l_v1","type":["null","string"],"default":null,"doc":"doc on l_v1"},{"name":"l_v2","type":["null","long"],"default":null,"doc":"value doc on l_v1"},{"name":"c","type":"long"}]}],"default":null},{"name":"after","type":["null","row"],"default":null}]}
{"type":"record","name":"envelope","fields":[{"name":"before","type":["null",{"type":"record","name":"row","doc":"comment on view sink_source_comments_view","fields":[{"name":"l_k","type":"string"},{"name":"l_v1","type":["null","string"],"default":null,"doc":"doc on l_v1"},{"name":"l_v2","type":["null","long"],"default":null,"doc":"value doc on l_v2"},{"name":"c","type":"long"}]}],"default":null},{"name":"after","type":["null","row"],"default":null}]}
$ schema-registry-verify schema-type=avro subject=sink-sink-comments3-value
{"type":"record","name":"envelope","doc":"comment on view sink_source_comments_view","fields":[{"name":"before","type":["null",{"type":"record","name":"row","fields":[{"name":"l_k","type":"string"},{"name":"l_v1","type":["null","string"],"default":null,"doc":"doc on l_v1"},{"name":"l_v2","type":["null","long"],"default":null,"doc":"value doc on l_v1"},{"name":"c","type":"long"}]}],"default":null},{"name":"after","type":["null","row"],"default":null}]}
{"type":"record","name":"envelope","fields":[{"name":"before","type":["null",{"type":"record","name":"row","doc":"comment on view sink_source_comments_view","fields":[{"name":"l_k","type":"string"},{"name":"l_v1","type":["null","string"],"default":null,"doc":"doc on l_v1"},{"name":"l_v2","type":["null","long"],"default":null,"doc":"value doc on l_v2"},{"name":"c","type":"long"}]}],"default":null},{"name":"after","type":["null","row"],"default":null}]}
$ postgres-execute connection=postgres://mz_system@materialized:6877/materialize
GRANT SELECT ON sink_source_comments_view TO materialize
Expand Down
40 changes: 38 additions & 2 deletions src/interchange/src/avro/encode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
use std::collections::BTreeMap;
use std::fmt;

use anyhow::Ok;
use byteorder::{NetworkEndian, WriteBytesExt};
use chrono::Timelike;
use itertools::Itertools;
Expand All @@ -23,7 +24,7 @@ use once_cell::sync::Lazy;
use serde_json::json;

use crate::encode::{column_names_and_types, Encode, TypedDatum};
use crate::envelopes::{self, ENVELOPE_CUSTOM_NAMES};
use crate::envelopes::{self, DBZ_ROW_TYPE_ID, ENVELOPE_CUSTOM_NAMES};
use crate::json::{build_row_schema_json, SchemaOptions};

// TODO(rkhaitan): this schema intentionally omits the data_collections field
Expand Down Expand Up @@ -140,6 +141,15 @@ pub enum DocTarget {
},
}

impl DocTarget {
fn id(&self) -> GlobalId {
match self {
DocTarget::Type(object_id) => *object_id,
DocTarget::Field { object_id, .. } => *object_id,
}
}
}

/// Generates key and value Avro schemas
pub struct AvroSchemaGenerator {
value_columns: Vec<(ColumnName, ColumnType)>,
Expand All @@ -157,13 +167,39 @@ impl AvroSchemaGenerator {
avro_key_fullname,
set_null_defaults,
sink_from,
value_doc_options,
mut value_doc_options,
key_doc_options,
}: AvroSchemaOptions,
) -> Result<Self, anyhow::Error> {
let mut value_columns = column_names_and_types(value_desc);
if is_debezium {
value_columns = envelopes::dbz_envelope(value_columns);
// With DEBEZIUM envelope the message is wrapped into "before" and "after"
// with `DBZ_ROW_TYPE_ID` instead of `sink_from`.
// Replacing comments for the columns and type in `sink_from` to `DBZ_ROW_TYPE_ID`.
if let Some(sink_from_id) = sink_from {
let mut new_column_docs = BTreeMap::new();
value_doc_options.iter().for_each(|(k, v)| {
if k.id() == sink_from_id {
match k {
DocTarget::Field { column_name, .. } => {
new_column_docs.insert(
DocTarget::Field {
object_id: DBZ_ROW_TYPE_ID,
column_name: column_name.clone(),
},
v.clone(),
);
}
DocTarget::Type(_) => {
new_column_docs.insert(DocTarget::Type(DBZ_ROW_TYPE_ID), v.clone());
}
}
}
});
value_doc_options.append(&mut new_column_docs);
value_doc_options.retain(|k, _v| k.id() != sink_from_id);
}
}
let row_schema = build_row_schema_json(
&value_columns,
Expand Down
15 changes: 15 additions & 0 deletions test/testdrive/kafka-avro-sinks-doc-comments.td
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,21 @@ $ schema-registry-verify schema-type=avro subject=testdrive-sink1-${testdrive.se
$ schema-registry-verify schema-type=avro subject=testdrive-sink2-${testdrive.seed}-value
{"type":"record","name":"envelope","doc":"comment on table t with a \\\\ \\","fields":[{"name":"c1","type":["null",{"type":"record","name":"record0","namespace":"com.materialize.sink","doc":"comment on type point","fields":[{"name":"x","type":["null","int"]},{"name":"y","type":["null","int"]}]}]},{"name":"c2","type":"string"},{"name":"c3","type":["null",{"type":"map","values":["null","boolean"]}],"doc":"comment on column t.c3 with a '"},{"name":"c4","type":["null",{"type":"array","items":["null",{"type":"record","name":"record1","namespace":"com.materialize.sink","doc":"comment on type point","fields":[{"name":"x","type":["null","int"]},{"name":"y","type":["null","int"]}]}]}],"doc":"comment on column t.c4 with an äöü"}]}

> CREATE SINK sink3 FROM t
INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink3-${testdrive.seed}')
KEY (c2) NOT ENFORCED
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
(
DOC ON COLUMN t.c2 = 'doc on t.c2'
)
ENVELOPE DEBEZIUM;

$ schema-registry-verify schema-type=avro subject=testdrive-sink3-${testdrive.seed}-value
{"type":"record","name":"envelope","fields":[{"name":"before","type":["null",{"type":"record","name":"row","doc":"comment on table t with a \\\\ \\","fields":[{"name":"c1","type":["null",{"type":"record","name":"record0","namespace":"com.materialize.sink","doc":"comment on type point","fields":[{"name":"x","type":["null","int"]},{"name":"y","type":["null","int"]}]}]},{"name":"c2","type":"string","doc":"doc on t.c2"},{"name":"c3","type":["null",{"type":"map","values":["null","boolean"]}],"doc":"comment on column t.c3 with a '"},{"name":"c4","type":["null",{"type":"array","items":["null",{"type":"record","name":"record1","namespace":"com.materialize.sink","doc":"comment on type point","fields":[{"name":"x","type":["null","int"]},{"name":"y","type":["null","int"]}]}]}],"doc":"comment on column t.c4 with an äöü"}]}]},{"name":"after","type":["null","row"]}]}

$ schema-registry-verify schema-type=avro subject=testdrive-sink3-${testdrive.seed}-key
{"type":"record","name":"row","doc":"comment on table t with a \\\\ \\","fields":[{"name":"c2","type":"string","doc":"doc on t.c2"}]}

# errors
! CREATE SINK bad_sink FROM t
INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink1-${testdrive.seed}')
Expand Down

0 comments on commit 388e025

Please sign in to comment.