Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Option to have null defaults for avro sink schema #21842

5 changes: 3 additions & 2 deletions src/interchange/src/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ mod schema;
pub use crate::avro::decode::{Decoder, DiffPair};
pub use crate::avro::encode::{
encode_datums_as_avro, encode_debezium_transaction_unchecked, get_debezium_transaction_schema,
AvroEncoder, AvroSchemaGenerator,
AvroEncoder, AvroSchemaGenerator, AvroSchemaOptions,
};
pub use crate::avro::envelope_cdc_v2 as cdc_v2;
pub use crate::avro::schema::{parse_schema, schema_to_relationdesc, ConfluentAvroResolver};
Expand Down Expand Up @@ -163,7 +163,8 @@ mod tests {
];
for (typ, datum, expected) in valid_pairings {
let desc = RelationDesc::empty().with_column("column1", typ.nullable(false));
let schema_generator = AvroSchemaGenerator::new(None, None, None, desc, false).unwrap();
let schema_generator =
AvroSchemaGenerator::new(None, desc, Default::default()).unwrap();
let avro_value =
encode_datums_as_avro(std::iter::once(datum), schema_generator.value_columns());
assert_eq!(
Expand Down
24 changes: 18 additions & 6 deletions src/interchange/src/avro/encode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,18 @@ fn encode_message_unchecked(
buf
}

/// Options controlling how Avro key/value schemas are generated for a sink.
#[derive(Debug, Default)]
pub struct AvroSchemaOptions {
/// Optional Avro fullname for the generated key schema; when `None`, the
/// generator falls back to the name `"row"`.
pub avro_key_fullname: Option<String>,
/// Optional Avro fullname for the generated value schema; when `None`, the
/// generator falls back to the name `"envelope"`.
pub avro_value_fullname: Option<String>,
/// When `true`, nullable (`["null", T]` union) fields in the generated
/// schema carry an explicit `"default": null`.
pub set_null_defaults: bool,
/// When `true`, the value columns are wrapped in a Debezium envelope
/// before schema generation.
pub is_debezium: bool,
}

/// Generates key and value Avro schemas
pub struct AvroSchemaGenerator {
value_columns: Vec<(ColumnName, ColumnType)>,
Expand All @@ -119,20 +131,19 @@ pub struct AvroSchemaGenerator {

impl AvroSchemaGenerator {
pub fn new(
key_fullname: Option<&str>,
value_fullname: Option<&str>,
key_desc: Option<RelationDesc>,
value_desc: RelationDesc,
debezium: bool,
options: AvroSchemaOptions,
moulimukherjee marked this conversation as resolved.
Show resolved Hide resolved
) -> Result<Self, anyhow::Error> {
let mut value_columns = column_names_and_types(value_desc);
if debezium {
if options.is_debezium {
value_columns = envelopes::dbz_envelope(value_columns);
}
let row_schema = build_row_schema_json(
&value_columns,
value_fullname.unwrap_or("envelope"),
options.avro_value_fullname.as_deref().unwrap_or("envelope"),
&ENVELOPE_CUSTOM_NAMES,
options.set_null_defaults,
)?;
let writer_schema = Schema::parse(&row_schema).expect("valid schema constructed");
let key_info = match key_desc {
Expand All @@ -141,8 +152,9 @@ impl AvroSchemaGenerator {
let columns = column_names_and_types(key_desc);
let row_schema = build_row_schema_json(
&columns,
key_fullname.unwrap_or("row"),
options.avro_key_fullname.as_deref().unwrap_or("row"),
&BTreeMap::new(),
options.set_null_defaults,
)?;
Some(KeyInfo {
schema: Schema::parse(&row_schema).expect("valid schema constructed"),
Expand Down
1 change: 1 addition & 0 deletions src/interchange/src/avro/envelope_cdc_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,7 @@ mod tests {
&crate::encode::column_names_and_types(desc),
"data",
&BTreeMap::new(),
false,
)
.unwrap();
let schema = build_schema(row_schema);
Expand Down
51 changes: 40 additions & 11 deletions src/interchange/src/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ impl fmt::Debug for JsonEncoder {
"schema",
&format!(
"{:?}",
build_row_schema_json(&self.value_columns, "schema", &BTreeMap::new())
build_row_schema_json(&self.value_columns, "schema", &BTreeMap::new(), false)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto here––using boolean params in the outermost API isn't ideal

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am leaving this one in. In the following avro updates, there will be more configs like user defined comments and eventually user defined default values which should live in a struct along with this flag.

),
)
.finish()
Expand Down Expand Up @@ -247,10 +247,11 @@ impl ToJson for TypedDatum<'_> {
}
}

fn build_row_schema_field(
fn build_row_schema_field_type(
type_namer: &mut Namer,
custom_names: &BTreeMap<GlobalId, String>,
typ: &ColumnType,
set_null_defaults: bool,
) -> serde_json::Value {
let mut field_type = match &typ.scalar_type {
ScalarType::AclItem => json!("string"),
Expand Down Expand Up @@ -304,27 +305,29 @@ fn build_row_schema_field(
"logicalType": "uuid",
}),
ty @ (ScalarType::Array(..) | ScalarType::Int2Vector | ScalarType::List { .. }) => {
let inner = build_row_schema_field(
let inner = build_row_schema_field_type(
type_namer,
custom_names,
&ColumnType {
nullable: true,
scalar_type: ty.unwrap_collection_element_type().clone(),
},
set_null_defaults,
);
json!({
"type": "array",
"items": inner
})
}
ScalarType::Map { value_type, .. } => {
let inner = build_row_schema_field(
let inner = build_row_schema_field_type(
type_namer,
custom_names,
&ColumnType {
nullable: true,
scalar_type: (**value_type).clone(),
},
set_null_defaults,
);
json!({
"type": "map",
Expand All @@ -342,7 +345,8 @@ fn build_row_schema_field(
json!(name)
} else {
let fields = fields.to_vec();
let json_fields = build_row_schema_fields(&fields, type_namer, custom_names);
let json_fields =
build_row_schema_fields(&fields, type_namer, custom_names, set_null_defaults);
json!({
"type": "record",
"name": name,
Expand Down Expand Up @@ -377,16 +381,35 @@ fn build_row_schema_fields(
columns: &[(ColumnName, ColumnType)],
type_namer: &mut Namer,
custom_names: &BTreeMap<GlobalId, String>,
set_null_defaults: bool,
) -> Vec<serde_json::Value> {
let mut fields = Vec::new();
let mut field_namer = Namer::default();
for (name, typ) in columns.iter() {
let (name, _seen) = field_namer.valid_name(name.as_str());
let field_type = build_row_schema_field(type_namer, custom_names, typ);
fields.push(json!({
"name": name,
"type": field_type,
}));
let field_type =
build_row_schema_field_type(type_namer, custom_names, typ, set_null_defaults);

let is_nullable_union = field_type.is_array();
moulimukherjee marked this conversation as resolved.
Show resolved Hide resolved
if is_nullable_union {
// currently the only supported union types are nullable ones
let array = field_type.as_array().unwrap();
mz_ore::soft_assert!(array.len() == 2);
mz_ore::soft_assert!(array.first().is_some_and(|v| v == &json!("null")));
}

if set_null_defaults && is_nullable_union {
fields.push(json!({
"name": name,
"type": field_type,
"default": null,
}));
} else {
fields.push(json!({
"name": name,
"type": field_type,
}));
}
}
fields
}
Expand All @@ -396,8 +419,14 @@ pub fn build_row_schema_json(
columns: &[(ColumnName, ColumnType)],
name: &str,
custom_names: &BTreeMap<GlobalId, String>,
set_null_defaults: bool,
) -> Result<serde_json::Value, anyhow::Error> {
let fields = build_row_schema_fields(columns, &mut Namer::default(), custom_names);
let fields = build_row_schema_fields(
columns,
&mut Namer::default(),
custom_names,
set_null_defaults,
);
let _ = mz_avro::schema::Name::parse_simple(name)?;
Ok(json!({
"type": "record",
Expand Down
1 change: 1 addition & 0 deletions src/sql-lexer/src/keywords.txt
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ Decimal
Declare
Decorrelated
Default
Defaults
moulimukherjee marked this conversation as resolved.
Show resolved Hide resolved
Delete
Delimited
Delimiter
Expand Down
2 changes: 2 additions & 0 deletions src/sql-parser/src/ast/defs/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,13 +139,15 @@ impl_display_t!(ProtobufSchema);
/// The name of an option in a Confluent Schema Registry (CSR) config clause.
pub enum CsrConfigOptionName {
/// `AVRO KEY FULLNAME`: Avro fullname to use for the generated key schema.
AvroKeyFullname,
/// `AVRO VALUE FULLNAME`: Avro fullname to use for the generated value schema.
AvroValueFullname,
/// `NULL DEFAULTS`: whether nullable fields in generated Avro schemas get
/// an explicit `null` default.
NullDefaults,
}

impl AstDisplay for CsrConfigOptionName {
    /// Renders the option name as the SQL keyword sequence it is parsed from.
    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
        let keywords = match self {
            Self::AvroKeyFullname => "AVRO KEY FULLNAME",
            Self::AvroValueFullname => "AVRO VALUE FULLNAME",
            Self::NullDefaults => "NULL DEFAULTS",
        };
        f.write_str(keywords);
    }
}
Expand Down
6 changes: 5 additions & 1 deletion src/sql-parser/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2034,7 +2034,7 @@ impl<'a> Parser<'a> {
}

fn parse_csr_config_option(&mut self) -> Result<CsrConfigOption<Raw>, ParserError> {
let name = match self.expect_one_of_keywords(&[AVRO])? {
let name = match self.expect_one_of_keywords(&[AVRO, NULL])? {
AVRO => {
let name = match self.expect_one_of_keywords(&[KEY, VALUE])? {
KEY => CsrConfigOptionName::AvroKeyFullname,
Expand All @@ -2044,6 +2044,10 @@ impl<'a> Parser<'a> {
self.expect_keyword(FULLNAME)?;
name
}
NULL => {
self.expect_keyword(DEFAULTS)?;
moulimukherjee marked this conversation as resolved.
Show resolved Hide resolved
CsrConfigOptionName::NullDefaults
}
_ => unreachable!(),
};
Ok(CsrConfigOption {
Expand Down
35 changes: 35 additions & 0 deletions src/sql-parser/tests/testdata/ddl
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,41 @@ CREATE SINK foo FROM bar INTO KAFKA CONNECTION baz (TOPIC = 'topic') FORMAT BYTE
=>
CreateSink(CreateSinkStatement { name: UnresolvedItemName([Ident("foo")]), in_cluster: None, if_not_exists: false, from: Name(UnresolvedItemName([Ident("bar")])), connection: Kafka { connection: KafkaConnection { connection: Name(UnresolvedItemName([Ident("baz")])), options: [KafkaConfigOption { name: Topic, value: Some(Value(String("topic"))) }] }, key: None }, format: Some(Bytes), envelope: None, with_options: [CreateSinkOption { name: Size, value: Some(Value(String("xlarge"))) }, CreateSinkOption { name: Snapshot, value: Some(Value(Boolean(true))) }] })

parse-statement
CREATE SINK foo FROM bar INTO KAFKA CONNECTION baz (TOPIC 'topic') FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION conn2 (NULL DEFAULTS = TRUE) ENVELOPE UPSERT
----
CREATE SINK foo FROM bar INTO KAFKA CONNECTION baz (TOPIC = 'topic') FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION conn2 (NULL DEFAULTS = true) ENVELOPE UPSERT
=>
CreateSink(CreateSinkStatement { name: UnresolvedItemName([Ident("foo")]), in_cluster: None, if_not_exists: false, from: Name(UnresolvedItemName([Ident("bar")])), connection: Kafka { connection: KafkaConnection { connection: Name(UnresolvedItemName([Ident("baz")])), options: [KafkaConfigOption { name: Topic, value: Some(Value(String("topic"))) }] }, key: None }, format: Some(Avro(Csr { csr_connection: CsrConnectionAvro { connection: CsrConnection { connection: Name(UnresolvedItemName([Ident("conn2")])), options: [CsrConfigOption { name: NullDefaults, value: Some(Value(Boolean(true))) }] }, key_strategy: None, value_strategy: None, seed: None } })), envelope: Some(Upsert), with_options: [] })

parse-statement
CREATE SINK foo FROM bar INTO KAFKA CONNECTION baz (TOPIC 'topic') FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION conn2 (NULL DEFAULTS TRUE) ENVELOPE UPSERT
----
CREATE SINK foo FROM bar INTO KAFKA CONNECTION baz (TOPIC = 'topic') FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION conn2 (NULL DEFAULTS = true) ENVELOPE UPSERT
=>
CreateSink(CreateSinkStatement { name: UnresolvedItemName([Ident("foo")]), in_cluster: None, if_not_exists: false, from: Name(UnresolvedItemName([Ident("bar")])), connection: Kafka { connection: KafkaConnection { connection: Name(UnresolvedItemName([Ident("baz")])), options: [KafkaConfigOption { name: Topic, value: Some(Value(String("topic"))) }] }, key: None }, format: Some(Avro(Csr { csr_connection: CsrConnectionAvro { connection: CsrConnection { connection: Name(UnresolvedItemName([Ident("conn2")])), options: [CsrConfigOption { name: NullDefaults, value: Some(Value(Boolean(true))) }] }, key_strategy: None, value_strategy: None, seed: None } })), envelope: Some(Upsert), with_options: [] })

parse-statement
CREATE SINK foo FROM bar INTO KAFKA CONNECTION baz (TOPIC 'topic') FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION conn2 (NULL DEFAULTS) ENVELOPE UPSERT
----
CREATE SINK foo FROM bar INTO KAFKA CONNECTION baz (TOPIC = 'topic') FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION conn2 (NULL DEFAULTS) ENVELOPE UPSERT
=>
CreateSink(CreateSinkStatement { name: UnresolvedItemName([Ident("foo")]), in_cluster: None, if_not_exists: false, from: Name(UnresolvedItemName([Ident("bar")])), connection: Kafka { connection: KafkaConnection { connection: Name(UnresolvedItemName([Ident("baz")])), options: [KafkaConfigOption { name: Topic, value: Some(Value(String("topic"))) }] }, key: None }, format: Some(Avro(Csr { csr_connection: CsrConnectionAvro { connection: CsrConnection { connection: Name(UnresolvedItemName([Ident("conn2")])), options: [CsrConfigOption { name: NullDefaults, value: None }] }, key_strategy: None, value_strategy: None, seed: None } })), envelope: Some(Upsert), with_options: [] })

parse-statement
CREATE SINK foo FROM bar INTO KAFKA CONNECTION baz (TOPIC 'topic') FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION conn2 (NULL VALUES) ENVELOPE UPSERT
----
error: Expected DEFAULTS, found VALUES
CREATE SINK foo FROM bar INTO KAFKA CONNECTION baz (TOPIC 'topic') FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION conn2 (NULL VALUES) ENVELOPE UPSERT
^

parse-statement
CREATE SINK foo FROM bar INTO KAFKA CONNECTION baz (TOPIC 'topic') FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION conn2 (NULL DEFAULTS = FALSE) ENVELOPE UPSERT
----
CREATE SINK foo FROM bar INTO KAFKA CONNECTION baz (TOPIC = 'topic') FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION conn2 (NULL DEFAULTS = false) ENVELOPE UPSERT
=>
CreateSink(CreateSinkStatement { name: UnresolvedItemName([Ident("foo")]), in_cluster: None, if_not_exists: false, from: Name(UnresolvedItemName([Ident("bar")])), connection: Kafka { connection: KafkaConnection { connection: Name(UnresolvedItemName([Ident("baz")])), options: [KafkaConfigOption { name: Topic, value: Some(Value(String("topic"))) }] }, key: None }, format: Some(Avro(Csr { csr_connection: CsrConnectionAvro { connection: CsrConnection { connection: Name(UnresolvedItemName([Ident("conn2")])), options: [CsrConfigOption { name: NullDefaults, value: Some(Value(Boolean(false))) }] }, key_strategy: None, value_strategy: None, seed: None } })), envelope: Some(Upsert), with_options: [] })

parse-statement
CREATE INDEX foo ON myschema.bar (a, b)
----
Expand Down
17 changes: 12 additions & 5 deletions src/sql/src/plan/statement/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::iter;
use itertools::Itertools;
use mz_controller_types::{ClusterId, ReplicaId, DEFAULT_REPLICA_LOGGING_INTERVAL_MICROS};
use mz_expr::CollectionPlan;
use mz_interchange::avro::AvroSchemaGenerator;
use mz_interchange::avro::{AvroSchemaGenerator, AvroSchemaOptions};
use mz_ore::cast::{self, CastFrom, TryCastFrom};
use mz_ore::collections::HashSet;
use mz_ore::str::StrExt;
Expand Down Expand Up @@ -2344,7 +2344,8 @@ fn key_constraint_err(desc: &RelationDesc, user_keys: &[ColumnName]) -> PlanErro
generate_extracted_config!(
CsrConfigOption,
(AvroKeyFullname, String),
(AvroValueFullname, String)
(AvroValueFullname, String),
(NullDefaults, bool, Default(false))
);

fn kafka_sink_builder(
Expand Down Expand Up @@ -2444,6 +2445,7 @@ fn kafka_sink_builder(
let CsrConfigOptionExtracted {
avro_key_fullname,
avro_value_fullname,
null_defaults,
..
} = options.try_into()?;

Expand All @@ -2457,14 +2459,19 @@ fn kafka_sink_builder(
sql_bail!("Must specify both AVRO KEY FULLNAME and AVRO VALUE FULLNAME when specifying generated schema names");
}

let options = AvroSchemaOptions {
avro_key_fullname,
avro_value_fullname,
set_null_defaults: null_defaults,
is_debezium: matches!(envelope, SinkEnvelope::Debezium),
};

let schema_generator = AvroSchemaGenerator::new(
avro_key_fullname.as_deref(),
avro_value_fullname.as_deref(),
key_desc_and_indices
.as_ref()
.map(|(desc, _indices)| desc.clone()),
value_desc.clone(),
matches!(envelope, SinkEnvelope::Debezium),
options,
)?;
let value_schema = schema_generator.value_writer_schema().to_string();
let key_schema = schema_generator
Expand Down
18 changes: 9 additions & 9 deletions src/storage/src/sink/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use anyhow::{anyhow, bail, Context};
use differential_dataflow::{Collection, Hashable};
use futures::{StreamExt, TryFutureExt};
use maplit::btreemap;
use mz_interchange::avro::{AvroEncoder, AvroSchemaGenerator};
use mz_interchange::avro::{AvroEncoder, AvroSchemaGenerator, AvroSchemaOptions};
use mz_interchange::encode::Encode;
use mz_interchange::json::JsonEncoder;
use mz_kafka_util::client::{
Expand Down Expand Up @@ -1007,14 +1007,14 @@ where
key_schema_id,
value_schema_id,
}) => {
let schema_generator = AvroSchemaGenerator::new(
None,
None,
key_desc,
value_desc,
matches!(envelope, Some(SinkEnvelope::Debezium)),
)
.expect("avro schema validated");
let options = AvroSchemaOptions {
avro_key_fullname: None,
avro_value_fullname: None,
set_null_defaults: false,
is_debezium: matches!(envelope, Some(SinkEnvelope::Debezium)),
};
let schema_generator = AvroSchemaGenerator::new(key_desc, value_desc, options)
.expect("avro schema validated");
let encoder = AvroEncoder::new(schema_generator, key_schema_id, value_schema_id);
encode_stream(
stream,
Expand Down
Loading