diff --git a/misc/python/materialize/checks/sink.py b/misc/python/materialize/checks/sink.py index f3e7f83bf3eff..fcbe1aa485ed4 100644 --- a/misc/python/materialize/checks/sink.py +++ b/misc/python/materialize/checks/sink.py @@ -11,6 +11,7 @@ from materialize.checks.actions import Testdrive from materialize.checks.checks import Check from materialize.checks.common import KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD +from materialize.checks.executors import Executor from materialize.util import MzVersion @@ -18,6 +19,29 @@ def schemas() -> str: return dedent(KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD) +def schemas_null() -> str: + return dedent( + """ + $ set keyschema={ + "type": "record", + "name": "Key", + "fields": [ + {"name": "key1", "type": "string"} + ] + } + + $ set schema={ + "type" : "record", + "name" : "test", + "fields" : [ + {"name":"f1", "type":["null", "string"]}, + {"name":"f2", "type":["long", "null"]} + ] + } + """ + ) + + class SinkUpsert(Check): """Basic Check on sinks from an upsert source""" @@ -294,3 +318,263 @@ def validate(self) -> Testdrive: """ ) ) + + +class SinkNullDefaults(Check): + """Check on an Avro sink with NULL DEFAULTS""" + + def _can_run(self, e: Executor) -> bool: + return self.base_version >= MzVersion.parse("0.71.0-dev") + + def initialize(self) -> Testdrive: + return Testdrive( + schemas_null() + + dedent( + """ + $ kafka-create-topic topic=sink-source-null + + $ kafka-ingest format=avro key-format=avro topic=sink-source-null key-schema=${keyschema} schema=${schema} repeat=1000 + {"key1": "U2${kafka-ingest.iteration}"} {"f1": {"string": "A${kafka-ingest.iteration}"}, "f2": null} + + $ kafka-ingest format=avro key-format=avro topic=sink-source-null key-schema=${keyschema} schema=${schema} repeat=1000 + {"key1": "D2${kafka-ingest.iteration}"} {"f1": null, "f2": {"long": ${kafka-ingest.iteration}}} + + $ kafka-ingest format=avro key-format=avro topic=sink-source-null key-schema=${keyschema} schema=${schema} repeat=1000 + {"key1": 
"U3${kafka-ingest.iteration}"} {"f1": {"string": "A${kafka-ingest.iteration}"}, "f2": null} + + $ kafka-ingest format=avro key-format=avro topic=sink-source-null key-schema=${keyschema} schema=${schema} repeat=1000 + {"key1": "D3${kafka-ingest.iteration}"} {"f1": null, "f2": {"long": ${kafka-ingest.iteration}}} + + > CREATE CONNECTION IF NOT EXISTS kafka_conn FOR KAFKA BROKER '${testdrive.kafka-addr}'; + + > CREATE CONNECTION IF NOT EXISTS csr_conn FOR CONFLUENT SCHEMA REGISTRY URL '${testdrive.schema-registry-url}'; + + > CREATE SOURCE sink_source_null + FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink-source-null-${testdrive.seed}') + FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn + ENVELOPE UPSERT + + > CREATE MATERIALIZED VIEW sink_source_null_view AS SELECT LEFT(key1, 2) as l_k, LEFT(f1, 1) AS l_v1, f2 / 100 AS l_v2, COUNT(*) AS c FROM sink_source_null GROUP BY LEFT(key1, 2), LEFT(f1, 1), f2 / 100; + + > CREATE SINK sink_sink_null1 FROM sink_source_null_view + INTO KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink1') + FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn + ( NULL DEFAULTS ) + ENVELOPE DEBEZIUM + """ + ) + ) + + def manipulate(self) -> list[Testdrive]: + return [ + Testdrive(schemas_null() + dedent(s)) + for s in [ + """ + $[version>=5200] postgres-execute connection=postgres://mz_system@materialized:6877/materialize + GRANT SELECT ON sink_source_null_view TO materialize + GRANT USAGE ON CONNECTION kafka_conn TO materialize + GRANT USAGE ON CONNECTION csr_conn TO materialize + + $ kafka-ingest format=avro key-format=avro topic=sink-source-null key-schema=${keyschema} schema=${schema} repeat=1000 + {"key1": "I2${kafka-ingest.iteration}"} {"f1": {"string": "B${kafka-ingest.iteration}"}, "f2": null} + {"key1": "U2${kafka-ingest.iteration}"} {"f1": null, "f2": {"long": ${kafka-ingest.iteration}}} + {"key1": "D2${kafka-ingest.iteration}"} + + > CREATE CONNECTION IF NOT EXISTS kafka_conn FOR KAFKA BROKER 
'${testdrive.kafka-addr}'; + + > CREATE CONNECTION IF NOT EXISTS csr_conn FOR CONFLUENT SCHEMA REGISTRY URL '${testdrive.schema-registry-url}'; + + > CREATE SINK sink_sink_null2 FROM sink_source_null_view + INTO KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink2') + FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn + ( NULL DEFAULTS ) + ENVELOPE DEBEZIUM + """, + """ + $[version>=5200] postgres-execute connection=postgres://mz_system@materialized:6877/materialize + GRANT SELECT ON sink_source_null_view TO materialize + GRANT USAGE ON CONNECTION kafka_conn TO materialize + GRANT USAGE ON CONNECTION csr_conn TO materialize + + $ kafka-ingest format=avro key-format=avro topic=sink-source-null key-schema=${keyschema} schema=${schema} repeat=1000 + {"key1": "I2${kafka-ingest.iteration}"} {"f1": {"string": "B${kafka-ingest.iteration}"}, "f2": null} + {"key1": "U2${kafka-ingest.iteration}"} {"f1": null, "f2": {"long": ${kafka-ingest.iteration}}} + {"key1": "D2${kafka-ingest.iteration}"} + + > CREATE SINK sink_sink_null3 FROM sink_source_null_view + INTO KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink3') + FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn + ( NULL DEFAULTS ) + ENVELOPE DEBEZIUM + """, + ] + ] + + def validate(self) -> Testdrive: + return Testdrive( + dedent( + """ + $ schema-registry-verify schema-type=avro subject=sink-sink1-value + {"type":"record","name":"envelope","fields":[{"name":"before","type":["null",{"type":"record","name":"row","fields":[{"name":"l_k","type":"string"},{"name":"l_v1","type":["null","string"],"default":null},{"name":"l_v2","type":["null","long"],"default":null},{"name":"c","type":"long"}]}],"default":null},{"name":"after","type":["null","row"],"default":null}]} + + $ schema-registry-verify schema-type=avro subject=sink-sink2-value + 
{"type":"record","name":"envelope","fields":[{"name":"before","type":["null",{"type":"record","name":"row","fields":[{"name":"l_k","type":"string"},{"name":"l_v1","type":["null","string"],"default":null},{"name":"l_v2","type":["null","long"],"default":null},{"name":"c","type":"long"}]}],"default":null},{"name":"after","type":["null","row"],"default":null}]} + + $ schema-registry-verify schema-type=avro subject=sink-sink3-value + {"type":"record","name":"envelope","fields":[{"name":"before","type":["null",{"type":"record","name":"row","fields":[{"name":"l_k","type":"string"},{"name":"l_v1","type":["null","string"],"default":null},{"name":"l_v2","type":["null","long"],"default":null},{"name":"c","type":"long"}]}],"default":null},{"name":"after","type":["null","row"],"default":null}]} + + $ postgres-execute connection=postgres://mz_system@materialized:6877/materialize + GRANT SELECT ON sink_source_null_view TO materialize + GRANT USAGE ON CONNECTION kafka_conn TO materialize + GRANT USAGE ON CONNECTION csr_conn TO materialize + + $[version>=5900] postgres-execute connection=postgres://mz_system@materialized:6877/materialize + GRANT CREATECLUSTER ON SYSTEM TO materialize + + $[version<5900] postgres-execute connection=postgres://mz_system@materialized:6877/materialize + ALTER ROLE materialize CREATECLUSTER + + > SELECT * FROM sink_source_null_view; + D3 <null> 0 100 + D3 <null> 1 100 + D3 <null> 2 100 + D3 <null> 3 100 + D3 <null> 4 100 + D3 <null> 5 100 + D3 <null> 6 100 + D3 <null> 7 100 + D3 <null> 8 100 + D3 <null> 9 100 + I2 B <null> 1000 + U2 <null> 0 100 + U2 <null> 1 100 + U2 <null> 2 100 + U2 <null> 3 100 + U2 <null> 4 100 + U2 <null> 5 100 + U2 <null> 6 100 + U2 <null> 7 100 + U2 <null> 8 100 + U2 <null> 9 100 + U3 A <null> 1000 + + # We check the contents of the sink topics by re-ingesting them. 
+ + > CREATE CONNECTION IF NOT EXISTS kafka_conn FOR KAFKA BROKER '${testdrive.kafka-addr}'; + + > CREATE CONNECTION IF NOT EXISTS csr_conn FOR CONFLUENT SCHEMA REGISTRY URL '${testdrive.schema-registry-url}'; + + > CREATE SOURCE sink_view1 + FROM KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink1') + FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn + ENVELOPE NONE + + > CREATE SOURCE sink_view2 + FROM KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink2') + FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn + ENVELOPE NONE + + > CREATE SOURCE sink_view3 + FROM KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink3') + FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn + ENVELOPE NONE + + # Validate the sink by aggregating all the 'before' and 'after' records using SQL + > SELECT l_v1, l_v2, l_k, SUM(c) + FROM ( + SELECT (after).l_v1, (after).l_v2, (after).l_k, (after).c FROM sink_view1 + UNION ALL + SELECT (before).l_v1, (before).l_v2, (before).l_k, -(before).c FROM sink_view1 + ) GROUP BY l_v1, l_v2, l_k + HAVING SUM(c) > 0; + <null> 0 D3 100 + <null> 0 U2 100 + <null> 1 D3 100 + <null> 1 U2 100 + <null> 2 D3 100 + <null> 2 U2 100 + <null> 3 D3 100 + <null> 3 U2 100 + <null> 4 D3 100 + <null> 4 U2 100 + <null> 5 D3 100 + <null> 5 U2 100 + <null> 6 D3 100 + <null> 6 U2 100 + <null> 7 D3 100 + <null> 7 U2 100 + <null> 8 D3 100 + <null> 8 U2 100 + <null> 9 D3 100 + <null> 9 U2 100 + A <null> U3 1000 + B <null> I2 1000 + + > SELECT l_v1, l_v2, l_k, SUM(c) + FROM ( + SELECT (after).l_v1, (after).l_v2, (after).l_k, (after).c FROM sink_view2 + UNION ALL + SELECT (before).l_v1, (before).l_v2, (before).l_k, -(before).c FROM sink_view2 + ) GROUP BY l_v1, l_v2, l_k + HAVING SUM(c) > 0; + <null> 0 D3 100 + <null> 0 U2 100 + <null> 1 D3 100 + <null> 1 U2 100 + <null> 2 D3 100 + <null> 2 U2 100 + <null> 3 D3 100 + <null> 3 U2 100 + <null> 4 D3 100 + <null> 4 U2 100 + <null> 5 D3 100 + <null> 5 U2 100 + <null> 6 D3 100 + <null> 6 U2 100 + <null> 7 D3 100 + <null> 7 U2 100 + <null> 8 D3 100 + <null> 8 U2 100 + <null> 9 D3 100 + <null> 9 U2 100 + A <null> U3 1000 + B <null> I2 1000 + + > SELECT l_v1, l_v2, l_k, SUM(c) + FROM ( + SELECT (after).l_v1, (after).l_v2, (after).l_k, (after).c FROM sink_view3 + UNION ALL + SELECT (before).l_v1, (before).l_v2, 
(before).l_k, -(before).c FROM sink_view3 + ) GROUP BY l_v1, l_v2, l_k + HAVING SUM(c) > 0; + <null> 0 D3 100 + <null> 0 U2 100 + <null> 1 D3 100 + <null> 1 U2 100 + <null> 2 D3 100 + <null> 2 U2 100 + <null> 3 D3 100 + <null> 3 U2 100 + <null> 4 D3 100 + <null> 4 U2 100 + <null> 5 D3 100 + <null> 5 U2 100 + <null> 6 D3 100 + <null> 6 U2 100 + <null> 7 D3 100 + <null> 7 U2 100 + <null> 8 D3 100 + <null> 8 U2 100 + <null> 9 D3 100 + <null> 9 U2 100 + A <null> U3 1000 + B <null> I2 1000 + + > DROP SOURCE sink_view1; + + > DROP SOURCE sink_view2; + + > DROP SOURCE sink_view3; + """ + ) + ) diff --git a/test/testdrive/kafka-avro-sinks-defaults.td b/test/testdrive/kafka-avro-sinks-defaults.td index 494fcef1e209f..f82e117f8f7b2 100644 --- a/test/testdrive/kafka-avro-sinks-defaults.td +++ b/test/testdrive/kafka-avro-sinks-defaults.td @@ -142,6 +142,26 @@ $ schema-registry-verify schema-type=avro subject=testdrive-sink1-${testdrive.se ENVELOPE UPSERT contains: invalid NULL DEFAULTS: cannot use value as boolean +! CREATE SINK bad_sink FROM v + INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink1-${testdrive.seed}') + KEY (c2) NOT ENFORCED + FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn + ( + NULL DEFAULTS = "" + ) + ENVELOPE UPSERT +contains: Expected option value, found identifier "" + +! CREATE SINK bad_sink FROM v + INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink1-${testdrive.seed}') + KEY (c2) NOT ENFORCED + FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn + ( + NULL DEFAULTS = NULL + ) + ENVELOPE UPSERT +contains: invalid NULL DEFAULTS: cannot use value as boolean + ! CREATE SINK bad_sink FROM v INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink1-${testdrive.seed}') KEY (c2) NOT ENFORCED