Skip to content

Commit

Permalink
Option to have null defaults for avro sink schema (#21842)
Browse files Browse the repository at this point in the history
This adds a way to specify a NULL DEFAULTS option in the CREATE SINK SQL statement. If it is set, the nullable fields of the generated Avro schema will have a default value of null.
  • Loading branch information
Mouli Mukherjee authored Sep 26, 2023
1 parent becfcb5 commit e20ae46
Show file tree
Hide file tree
Showing 11 changed files with 285 additions and 34 deletions.
5 changes: 3 additions & 2 deletions src/interchange/src/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ mod schema;
pub use crate::avro::decode::{Decoder, DiffPair};
pub use crate::avro::encode::{
encode_datums_as_avro, encode_debezium_transaction_unchecked, get_debezium_transaction_schema,
AvroEncoder, AvroSchemaGenerator,
AvroEncoder, AvroSchemaGenerator, AvroSchemaOptions,
};
pub use crate::avro::envelope_cdc_v2 as cdc_v2;
pub use crate::avro::schema::{parse_schema, schema_to_relationdesc, ConfluentAvroResolver};
Expand Down Expand Up @@ -163,7 +163,8 @@ mod tests {
];
for (typ, datum, expected) in valid_pairings {
let desc = RelationDesc::empty().with_column("column1", typ.nullable(false));
let schema_generator = AvroSchemaGenerator::new(None, None, None, desc, false).unwrap();
let schema_generator =
AvroSchemaGenerator::new(None, desc, Default::default()).unwrap();
let avro_value =
encode_datums_as_avro(std::iter::once(datum), schema_generator.value_columns());
assert_eq!(
Expand Down
29 changes: 23 additions & 6 deletions src/interchange/src/avro/encode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,18 @@ fn encode_message_unchecked(
buf
}

/// Options controlling how key and value Avro schemas are generated for a sink.
///
/// `Default` yields no custom fullnames, no null defaults, and a non-Debezium
/// envelope.
#[derive(Debug, Default)]
pub struct AvroSchemaOptions {
    /// Optional Avro fullname on the generated key schema.
    /// When `None`, the generator falls back to a built-in name.
    pub avro_key_fullname: Option<String>,
    /// Optional Avro fullname on the generated value schema.
    /// When `None`, the generator falls back to a built-in name.
    pub avro_value_fullname: Option<String>,
    /// Whether nullable fields should get an explicit `"default": null`
    /// in the generated schema (the SQL `NULL DEFAULTS` option).
    pub set_null_defaults: bool,
    /// Whether the value columns should be wrapped in a Debezium envelope.
    pub is_debezium: bool,
}

/// Generates key and value Avro schemas
pub struct AvroSchemaGenerator {
value_columns: Vec<(ColumnName, ColumnType)>,
Expand All @@ -119,20 +131,24 @@ pub struct AvroSchemaGenerator {

impl AvroSchemaGenerator {
pub fn new(
key_fullname: Option<&str>,
value_fullname: Option<&str>,
key_desc: Option<RelationDesc>,
value_desc: RelationDesc,
debezium: bool,
AvroSchemaOptions {
is_debezium,
avro_value_fullname,
avro_key_fullname,
set_null_defaults,
}: AvroSchemaOptions,
) -> Result<Self, anyhow::Error> {
let mut value_columns = column_names_and_types(value_desc);
if debezium {
if is_debezium {
value_columns = envelopes::dbz_envelope(value_columns);
}
let row_schema = build_row_schema_json(
&value_columns,
value_fullname.unwrap_or("envelope"),
avro_value_fullname.as_deref().unwrap_or("envelope"),
&ENVELOPE_CUSTOM_NAMES,
set_null_defaults,
)?;
let writer_schema = Schema::parse(&row_schema).expect("valid schema constructed");
let key_info = match key_desc {
Expand All @@ -141,8 +157,9 @@ impl AvroSchemaGenerator {
let columns = column_names_and_types(key_desc);
let row_schema = build_row_schema_json(
&columns,
key_fullname.unwrap_or("row"),
avro_key_fullname.as_deref().unwrap_or("row"),
&BTreeMap::new(),
set_null_defaults,
)?;
Some(KeyInfo {
schema: Schema::parse(&row_schema).expect("valid schema constructed"),
Expand Down
1 change: 1 addition & 0 deletions src/interchange/src/avro/envelope_cdc_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,7 @@ mod tests {
&crate::encode::column_names_and_types(desc),
"data",
&BTreeMap::new(),
false,
)
.unwrap();
let schema = build_schema(row_schema);
Expand Down
51 changes: 40 additions & 11 deletions src/interchange/src/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ impl fmt::Debug for JsonEncoder {
"schema",
&format!(
"{:?}",
build_row_schema_json(&self.value_columns, "schema", &BTreeMap::new())
build_row_schema_json(&self.value_columns, "schema", &BTreeMap::new(), false)
),
)
.finish()
Expand Down Expand Up @@ -247,10 +247,11 @@ impl ToJson for TypedDatum<'_> {
}
}

fn build_row_schema_field(
fn build_row_schema_field_type(
type_namer: &mut Namer,
custom_names: &BTreeMap<GlobalId, String>,
typ: &ColumnType,
set_null_defaults: bool,
) -> serde_json::Value {
let mut field_type = match &typ.scalar_type {
ScalarType::AclItem => json!("string"),
Expand Down Expand Up @@ -304,27 +305,29 @@ fn build_row_schema_field(
"logicalType": "uuid",
}),
ty @ (ScalarType::Array(..) | ScalarType::Int2Vector | ScalarType::List { .. }) => {
let inner = build_row_schema_field(
let inner = build_row_schema_field_type(
type_namer,
custom_names,
&ColumnType {
nullable: true,
scalar_type: ty.unwrap_collection_element_type().clone(),
},
set_null_defaults,
);
json!({
"type": "array",
"items": inner
})
}
ScalarType::Map { value_type, .. } => {
let inner = build_row_schema_field(
let inner = build_row_schema_field_type(
type_namer,
custom_names,
&ColumnType {
nullable: true,
scalar_type: (**value_type).clone(),
},
set_null_defaults,
);
json!({
"type": "map",
Expand All @@ -342,7 +345,8 @@ fn build_row_schema_field(
json!(name)
} else {
let fields = fields.to_vec();
let json_fields = build_row_schema_fields(&fields, type_namer, custom_names);
let json_fields =
build_row_schema_fields(&fields, type_namer, custom_names, set_null_defaults);
json!({
"type": "record",
"name": name,
Expand All @@ -368,6 +372,9 @@ fn build_row_schema_field(
ScalarType::MzAclItem => json!("string"),
};
if typ.nullable {
// Should be revisited if we ever support a different kind of union scheme.
// Currently adding the "null" at the beginning means we can set the default
// value to "null" if such a preference is set.
field_type = json!(["null", field_type]);
}
field_type
Expand All @@ -377,16 +384,32 @@ fn build_row_schema_fields(
columns: &[(ColumnName, ColumnType)],
type_namer: &mut Namer,
custom_names: &BTreeMap<GlobalId, String>,
set_null_defaults: bool,
) -> Vec<serde_json::Value> {
let mut fields = Vec::new();
let mut field_namer = Namer::default();
for (name, typ) in columns.iter() {
let (name, _seen) = field_namer.valid_name(name.as_str());
let field_type = build_row_schema_field(type_namer, custom_names, typ);
fields.push(json!({
"name": name,
"type": field_type,
}));
let field_type =
build_row_schema_field_type(type_namer, custom_names, typ, set_null_defaults);

// It's a nullable union if the type is an array and the first option is "null"
let is_nullable_union = field_type
.as_array()
.is_some_and(|array| array.first().is_some_and(|first| first == &json!("null")));

if set_null_defaults && is_nullable_union {
fields.push(json!({
"name": name,
"type": field_type,
"default": null,
}));
} else {
fields.push(json!({
"name": name,
"type": field_type,
}));
}
}
fields
}
Expand All @@ -396,8 +419,14 @@ pub fn build_row_schema_json(
columns: &[(ColumnName, ColumnType)],
name: &str,
custom_names: &BTreeMap<GlobalId, String>,
set_null_defaults: bool,
) -> Result<serde_json::Value, anyhow::Error> {
let fields = build_row_schema_fields(columns, &mut Namer::default(), custom_names);
let fields = build_row_schema_fields(
columns,
&mut Namer::default(),
custom_names,
set_null_defaults,
);
let _ = mz_avro::schema::Name::parse_simple(name)?;
Ok(json!({
"type": "record",
Expand Down
1 change: 1 addition & 0 deletions src/sql-lexer/src/keywords.txt
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ Decimal
Declare
Decorrelated
Default
Defaults
Delete
Delimited
Delimiter
Expand Down
2 changes: 2 additions & 0 deletions src/sql-parser/src/ast/defs/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,13 +139,15 @@ impl_display_t!(ProtobufSchema);
/// The name of an option in a `CONFLUENT SCHEMA REGISTRY` connection clause.
pub enum CsrConfigOptionName {
    /// `AVRO KEY FULLNAME`: sets the fullname of the generated Avro key schema.
    AvroKeyFullname,
    /// `AVRO VALUE FULLNAME`: sets the fullname of the generated Avro value schema.
    AvroValueFullname,
    /// `NULL DEFAULTS`: requests null defaults for nullable fields in the
    /// generated Avro schema.
    NullDefaults,
}

impl AstDisplay for CsrConfigOptionName {
    /// Renders the option name as the SQL keyword sequence it was parsed from.
    fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
        // Name the keyword text first, then emit it in a single write.
        let keywords = match self {
            CsrConfigOptionName::AvroKeyFullname => "AVRO KEY FULLNAME",
            CsrConfigOptionName::AvroValueFullname => "AVRO VALUE FULLNAME",
            CsrConfigOptionName::NullDefaults => "NULL DEFAULTS",
        };
        f.write_str(keywords)
    }
}
Expand Down
6 changes: 5 additions & 1 deletion src/sql-parser/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2034,7 +2034,7 @@ impl<'a> Parser<'a> {
}

fn parse_csr_config_option(&mut self) -> Result<CsrConfigOption<Raw>, ParserError> {
let name = match self.expect_one_of_keywords(&[AVRO])? {
let name = match self.expect_one_of_keywords(&[AVRO, NULL])? {
AVRO => {
let name = match self.expect_one_of_keywords(&[KEY, VALUE])? {
KEY => CsrConfigOptionName::AvroKeyFullname,
Expand All @@ -2044,6 +2044,10 @@ impl<'a> Parser<'a> {
self.expect_keyword(FULLNAME)?;
name
}
NULL => {
self.expect_keyword(DEFAULTS)?;
CsrConfigOptionName::NullDefaults
}
_ => unreachable!(),
};
Ok(CsrConfigOption {
Expand Down
35 changes: 35 additions & 0 deletions src/sql-parser/tests/testdata/ddl
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,41 @@ CREATE SINK foo FROM bar INTO KAFKA CONNECTION baz (TOPIC = 'topic') FORMAT BYTE
=>
CreateSink(CreateSinkStatement { name: UnresolvedItemName([Ident("foo")]), in_cluster: None, if_not_exists: false, from: Name(UnresolvedItemName([Ident("bar")])), connection: Kafka { connection: KafkaConnection { connection: Name(UnresolvedItemName([Ident("baz")])), options: [KafkaConfigOption { name: Topic, value: Some(Value(String("topic"))) }] }, key: None }, format: Some(Bytes), envelope: None, with_options: [CreateSinkOption { name: Size, value: Some(Value(String("xlarge"))) }, CreateSinkOption { name: Snapshot, value: Some(Value(Boolean(true))) }] })

parse-statement
CREATE SINK foo FROM bar INTO KAFKA CONNECTION baz (TOPIC 'topic') FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION conn2 (NULL DEFAULTS = TRUE) ENVELOPE UPSERT
----
CREATE SINK foo FROM bar INTO KAFKA CONNECTION baz (TOPIC = 'topic') FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION conn2 (NULL DEFAULTS = true) ENVELOPE UPSERT
=>
CreateSink(CreateSinkStatement { name: UnresolvedItemName([Ident("foo")]), in_cluster: None, if_not_exists: false, from: Name(UnresolvedItemName([Ident("bar")])), connection: Kafka { connection: KafkaConnection { connection: Name(UnresolvedItemName([Ident("baz")])), options: [KafkaConfigOption { name: Topic, value: Some(Value(String("topic"))) }] }, key: None }, format: Some(Avro(Csr { csr_connection: CsrConnectionAvro { connection: CsrConnection { connection: Name(UnresolvedItemName([Ident("conn2")])), options: [CsrConfigOption { name: NullDefaults, value: Some(Value(Boolean(true))) }] }, key_strategy: None, value_strategy: None, seed: None } })), envelope: Some(Upsert), with_options: [] })

parse-statement
CREATE SINK foo FROM bar INTO KAFKA CONNECTION baz (TOPIC 'topic') FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION conn2 (NULL DEFAULTS TRUE) ENVELOPE UPSERT
----
CREATE SINK foo FROM bar INTO KAFKA CONNECTION baz (TOPIC = 'topic') FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION conn2 (NULL DEFAULTS = true) ENVELOPE UPSERT
=>
CreateSink(CreateSinkStatement { name: UnresolvedItemName([Ident("foo")]), in_cluster: None, if_not_exists: false, from: Name(UnresolvedItemName([Ident("bar")])), connection: Kafka { connection: KafkaConnection { connection: Name(UnresolvedItemName([Ident("baz")])), options: [KafkaConfigOption { name: Topic, value: Some(Value(String("topic"))) }] }, key: None }, format: Some(Avro(Csr { csr_connection: CsrConnectionAvro { connection: CsrConnection { connection: Name(UnresolvedItemName([Ident("conn2")])), options: [CsrConfigOption { name: NullDefaults, value: Some(Value(Boolean(true))) }] }, key_strategy: None, value_strategy: None, seed: None } })), envelope: Some(Upsert), with_options: [] })

parse-statement
CREATE SINK foo FROM bar INTO KAFKA CONNECTION baz (TOPIC 'topic') FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION conn2 (NULL DEFAULTS) ENVELOPE UPSERT
----
CREATE SINK foo FROM bar INTO KAFKA CONNECTION baz (TOPIC = 'topic') FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION conn2 (NULL DEFAULTS) ENVELOPE UPSERT
=>
CreateSink(CreateSinkStatement { name: UnresolvedItemName([Ident("foo")]), in_cluster: None, if_not_exists: false, from: Name(UnresolvedItemName([Ident("bar")])), connection: Kafka { connection: KafkaConnection { connection: Name(UnresolvedItemName([Ident("baz")])), options: [KafkaConfigOption { name: Topic, value: Some(Value(String("topic"))) }] }, key: None }, format: Some(Avro(Csr { csr_connection: CsrConnectionAvro { connection: CsrConnection { connection: Name(UnresolvedItemName([Ident("conn2")])), options: [CsrConfigOption { name: NullDefaults, value: None }] }, key_strategy: None, value_strategy: None, seed: None } })), envelope: Some(Upsert), with_options: [] })

parse-statement
CREATE SINK foo FROM bar INTO KAFKA CONNECTION baz (TOPIC 'topic') FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION conn2 (NULL VALUES) ENVELOPE UPSERT
----
error: Expected DEFAULTS, found VALUES
CREATE SINK foo FROM bar INTO KAFKA CONNECTION baz (TOPIC 'topic') FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION conn2 (NULL VALUES) ENVELOPE UPSERT
^

parse-statement
CREATE SINK foo FROM bar INTO KAFKA CONNECTION baz (TOPIC 'topic') FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION conn2 (NULL DEFAULTS = FALSE) ENVELOPE UPSERT
----
CREATE SINK foo FROM bar INTO KAFKA CONNECTION baz (TOPIC = 'topic') FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION conn2 (NULL DEFAULTS = false) ENVELOPE UPSERT
=>
CreateSink(CreateSinkStatement { name: UnresolvedItemName([Ident("foo")]), in_cluster: None, if_not_exists: false, from: Name(UnresolvedItemName([Ident("bar")])), connection: Kafka { connection: KafkaConnection { connection: Name(UnresolvedItemName([Ident("baz")])), options: [KafkaConfigOption { name: Topic, value: Some(Value(String("topic"))) }] }, key: None }, format: Some(Avro(Csr { csr_connection: CsrConnectionAvro { connection: CsrConnection { connection: Name(UnresolvedItemName([Ident("conn2")])), options: [CsrConfigOption { name: NullDefaults, value: Some(Value(Boolean(false))) }] }, key_strategy: None, value_strategy: None, seed: None } })), envelope: Some(Upsert), with_options: [] })

parse-statement
CREATE INDEX foo ON myschema.bar (a, b)
----
Expand Down
17 changes: 12 additions & 5 deletions src/sql/src/plan/statement/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::iter;
use itertools::Itertools;
use mz_controller_types::{ClusterId, ReplicaId, DEFAULT_REPLICA_LOGGING_INTERVAL_MICROS};
use mz_expr::CollectionPlan;
use mz_interchange::avro::AvroSchemaGenerator;
use mz_interchange::avro::{AvroSchemaGenerator, AvroSchemaOptions};
use mz_ore::cast::{self, CastFrom, TryCastFrom};
use mz_ore::collections::HashSet;
use mz_ore::str::StrExt;
Expand Down Expand Up @@ -2344,7 +2344,8 @@ fn key_constraint_err(desc: &RelationDesc, user_keys: &[ColumnName]) -> PlanErro
generate_extracted_config!(
CsrConfigOption,
(AvroKeyFullname, String),
(AvroValueFullname, String)
(AvroValueFullname, String),
(NullDefaults, bool, Default(false))
);

fn kafka_sink_builder(
Expand Down Expand Up @@ -2444,6 +2445,7 @@ fn kafka_sink_builder(
let CsrConfigOptionExtracted {
avro_key_fullname,
avro_value_fullname,
null_defaults,
..
} = options.try_into()?;

Expand All @@ -2457,14 +2459,19 @@ fn kafka_sink_builder(
sql_bail!("Must specify both AVRO KEY FULLNAME and AVRO VALUE FULLNAME when specifying generated schema names");
}

let options = AvroSchemaOptions {
avro_key_fullname,
avro_value_fullname,
set_null_defaults: null_defaults,
is_debezium: matches!(envelope, SinkEnvelope::Debezium),
};

let schema_generator = AvroSchemaGenerator::new(
avro_key_fullname.as_deref(),
avro_value_fullname.as_deref(),
key_desc_and_indices
.as_ref()
.map(|(desc, _indices)| desc.clone()),
value_desc.clone(),
matches!(envelope, SinkEnvelope::Debezium),
options,
)?;
let value_schema = schema_generator.value_writer_schema().to_string();
let key_schema = schema_generator
Expand Down
18 changes: 9 additions & 9 deletions src/storage/src/sink/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use anyhow::{anyhow, bail, Context};
use differential_dataflow::{Collection, Hashable};
use futures::{StreamExt, TryFutureExt};
use maplit::btreemap;
use mz_interchange::avro::{AvroEncoder, AvroSchemaGenerator};
use mz_interchange::avro::{AvroEncoder, AvroSchemaGenerator, AvroSchemaOptions};
use mz_interchange::encode::Encode;
use mz_interchange::json::JsonEncoder;
use mz_kafka_util::client::{
Expand Down Expand Up @@ -1007,14 +1007,14 @@ where
key_schema_id,
value_schema_id,
}) => {
let schema_generator = AvroSchemaGenerator::new(
None,
None,
key_desc,
value_desc,
matches!(envelope, Some(SinkEnvelope::Debezium)),
)
.expect("avro schema validated");
let options = AvroSchemaOptions {
avro_key_fullname: None,
avro_value_fullname: None,
set_null_defaults: false,
is_debezium: matches!(envelope, Some(SinkEnvelope::Debezium)),
};
let schema_generator = AvroSchemaGenerator::new(key_desc, value_desc, options)
.expect("avro schema validated");
let encoder = AvroEncoder::new(schema_generator, key_schema_id, value_schema_id);
encode_stream(
stream,
Expand Down
Loading

0 comments on commit e20ae46

Please sign in to comment.