Skip to content

Commit

Permalink
tests: Add platform-check for NULL DEFAULTS sinks
Browse files Browse the repository at this point in the history
Also added 2 small parsing checks

Follow-up to MaterializeInc#21842 which I missed
  • Loading branch information
def- committed Sep 27, 2023
1 parent 4c8dc90 commit d1fe912
Show file tree
Hide file tree
Showing 2 changed files with 304 additions and 0 deletions.
284 changes: 284 additions & 0 deletions misc/python/materialize/checks/sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,37 @@
from materialize.checks.actions import Testdrive
from materialize.checks.checks import Check
from materialize.checks.common import KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD
from materialize.checks.executors import Executor
from materialize.util import MzVersion


def schemas() -> str:
return dedent(KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD)


def schemas_null() -> str:
return dedent(
"""
$ set keyschema={
"type": "record",
"name": "Key",
"fields": [
{"name": "key1", "type": "string"}
]
}
$ set schema={
"type" : "record",
"name" : "test",
"fields" : [
{"name":"f1", "type":["null", "string"]},
{"name":"f2", "type":["long", "null"]}
]
}
"""
)


class SinkUpsert(Check):
"""Basic Check on sinks from an upsert source"""

Expand Down Expand Up @@ -294,3 +318,263 @@ def validate(self) -> Testdrive:
"""
)
)


class SinkNullDefaults(Check):
"""Check on an Avro sink with NULL DEFAULTS"""

def _can_run(self, e: Executor) -> bool:
return self.base_version >= MzVersion.parse("0.71.0-dev")

def initialize(self) -> Testdrive:
return Testdrive(
schemas_null()
+ dedent(
"""
$ kafka-create-topic topic=sink-source-null
$ kafka-ingest format=avro key-format=avro topic=sink-source-null key-schema=${keyschema} schema=${schema} repeat=1000
{"key1": "U2${kafka-ingest.iteration}"} {"f1": {"string": "A${kafka-ingest.iteration}"}, "f2": null}
$ kafka-ingest format=avro key-format=avro topic=sink-source-null key-schema=${keyschema} schema=${schema} repeat=1000
{"key1": "D2${kafka-ingest.iteration}"} {"f1": null, "f2": {"long": ${kafka-ingest.iteration}}}
$ kafka-ingest format=avro key-format=avro topic=sink-source-null key-schema=${keyschema} schema=${schema} repeat=1000
{"key1": "U3${kafka-ingest.iteration}"} {"f1": {"string": "A${kafka-ingest.iteration}"}, "f2": null}
$ kafka-ingest format=avro key-format=avro topic=sink-source-null key-schema=${keyschema} schema=${schema} repeat=1000
{"key1": "D3${kafka-ingest.iteration}"} {"f1": null, "f2": {"long": ${kafka-ingest.iteration}}}
> CREATE CONNECTION IF NOT EXISTS kafka_conn FOR KAFKA BROKER '${testdrive.kafka-addr}';
> CREATE CONNECTION IF NOT EXISTS csr_conn FOR CONFLUENT SCHEMA REGISTRY URL '${testdrive.schema-registry-url}';
> CREATE SOURCE sink_source_null
FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink-source-null-${testdrive.seed}')
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
ENVELOPE UPSERT
> CREATE MATERIALIZED VIEW sink_source_null_view AS SELECT LEFT(key1, 2) as l_k, LEFT(f1, 1) AS l_v1, f2 / 100 AS l_v2, COUNT(*) AS c FROM sink_source_null GROUP BY LEFT(key1, 2), LEFT(f1, 1), f2 / 100;
> CREATE SINK sink_sink_null1 FROM sink_source_null_view
INTO KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink1')
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
( NULL DEFAULTS )
ENVELOPE DEBEZIUM
"""
)
)

def manipulate(self) -> list[Testdrive]:
return [
Testdrive(schemas_null() + dedent(s))
for s in [
"""
$[version>=5200] postgres-execute connection=postgres://mz_system@materialized:6877/materialize
GRANT SELECT ON sink_source_null_view TO materialize
GRANT USAGE ON CONNECTION kafka_conn TO materialize
GRANT USAGE ON CONNECTION csr_conn TO materialize
$ kafka-ingest format=avro key-format=avro topic=sink-source-null key-schema=${keyschema} schema=${schema} repeat=1000
{"key1": "I2${kafka-ingest.iteration}"} {"f1": {"string": "B${kafka-ingest.iteration}"}, "f2": null}
{"key1": "U2${kafka-ingest.iteration}"} {"f1": null, "f2": {"long": ${kafka-ingest.iteration}}}
{"key1": "D2${kafka-ingest.iteration}"}
> CREATE CONNECTION IF NOT EXISTS kafka_conn FOR KAFKA BROKER '${testdrive.kafka-addr}';
> CREATE CONNECTION IF NOT EXISTS csr_conn FOR CONFLUENT SCHEMA REGISTRY URL '${testdrive.schema-registry-url}';
> CREATE SINK sink_sink_null2 FROM sink_source_null_view
INTO KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink2')
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
( NULL DEFAULTS )
ENVELOPE DEBEZIUM
""",
"""
$[version>=5200] postgres-execute connection=postgres://mz_system@materialized:6877/materialize
GRANT SELECT ON sink_source_null_view TO materialize
GRANT USAGE ON CONNECTION kafka_conn TO materialize
GRANT USAGE ON CONNECTION csr_conn TO materialize
$ kafka-ingest format=avro key-format=avro topic=sink-source-null key-schema=${keyschema} schema=${schema} repeat=1000
{"key1": "I2${kafka-ingest.iteration}"} {"f1": {"string": "B${kafka-ingest.iteration}"}, "f2": null}
{"key1": "U2${kafka-ingest.iteration}"} {"f1": null, "f2": {"long": ${kafka-ingest.iteration}}}
{"key1": "D2${kafka-ingest.iteration}"}
> CREATE SINK sink_sink_null3 FROM sink_source_null_view
INTO KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink3')
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
( NULL DEFAULTS )
ENVELOPE DEBEZIUM
""",
]
]

def validate(self) -> Testdrive:
return Testdrive(
dedent(
"""
$ schema-registry-verify schema-type=avro subject=sink-sink1-value
{"type":"record","name":"envelope","fields":[{"name":"before","type":["null",{"type":"record","name":"row","fields":[{"name":"l_k","type":"string"},{"name":"l_v1","type":["null","string"],"default":null},{"name":"l_v2","type":["null","long"],"default":null},{"name":"c","type":"long"}]}],"default":null},{"name":"after","type":["null","row"],"default":null}]}
$ schema-registry-verify schema-type=avro subject=sink-sink2-value
{"type":"record","name":"envelope","fields":[{"name":"before","type":["null",{"type":"record","name":"row","fields":[{"name":"l_k","type":"string"},{"name":"l_v1","type":["null","string"],"default":null},{"name":"l_v2","type":["null","long"],"default":null},{"name":"c","type":"long"}]}],"default":null},{"name":"after","type":["null","row"],"default":null}]}
$ schema-registry-verify schema-type=avro subject=sink-sink3-value
{"type":"record","name":"envelope","fields":[{"name":"before","type":["null",{"type":"record","name":"row","fields":[{"name":"l_k","type":"string"},{"name":"l_v1","type":["null","string"],"default":null},{"name":"l_v2","type":["null","long"],"default":null},{"name":"c","type":"long"}]}],"default":null},{"name":"after","type":["null","row"],"default":null}]}
$ postgres-execute connection=postgres://mz_system@materialized:6877/materialize
GRANT SELECT ON sink_source_null_view TO materialize
GRANT USAGE ON CONNECTION kafka_conn TO materialize
GRANT USAGE ON CONNECTION csr_conn TO materialize
$[version>=5900] postgres-execute connection=postgres://mz_system@materialized:6877/materialize
GRANT CREATECLUSTER ON SYSTEM TO materialize
$[version<5900] postgres-execute connection=postgres://mz_system@materialized:6877/materialize
ALTER ROLE materialize CREATECLUSTER
> SELECT * FROM sink_source_null_view;
D3 <null> 0 100
D3 <null> 1 100
D3 <null> 2 100
D3 <null> 3 100
D3 <null> 4 100
D3 <null> 5 100
D3 <null> 6 100
D3 <null> 7 100
D3 <null> 8 100
D3 <null> 9 100
I2 B <null> 1000
U2 <null> 0 100
U2 <null> 1 100
U2 <null> 2 100
U2 <null> 3 100
U2 <null> 4 100
U2 <null> 5 100
U2 <null> 6 100
U2 <null> 7 100
U2 <null> 8 100
U2 <null> 9 100
U3 A <null> 1000
# We check the contents of the sink topics by re-ingesting them.
> CREATE CONNECTION IF NOT EXISTS kafka_conn FOR KAFKA BROKER '${testdrive.kafka-addr}';
> CREATE CONNECTION IF NOT EXISTS csr_conn FOR CONFLUENT SCHEMA REGISTRY URL '${testdrive.schema-registry-url}';
> CREATE SOURCE sink_view1
FROM KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink1')
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
ENVELOPE NONE
> CREATE SOURCE sink_view2
FROM KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink2')
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
ENVELOPE NONE
> CREATE SOURCE sink_view3
FROM KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink3')
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
ENVELOPE NONE
# Validate the sink by aggregating all the 'before' and 'after' records using SQL
> SELECT l_v1, l_v2, l_k, SUM(c)
FROM (
SELECT (after).l_v1, (after).l_v2, (after).l_k, (after).c FROM sink_view1
UNION ALL
SELECT (before).l_v1, (before).l_v2, (before).l_k, -(before).c FROM sink_view1
) GROUP BY l_v1, l_v2, l_k
HAVING SUM(c) > 0;
<null> 0 D3 100
<null> 0 U2 100
<null> 1 D3 100
<null> 1 U2 100
<null> 2 D3 100
<null> 2 U2 100
<null> 3 D3 100
<null> 3 U2 100
<null> 4 D3 100
<null> 4 U2 100
<null> 5 D3 100
<null> 5 U2 100
<null> 6 D3 100
<null> 6 U2 100
<null> 7 D3 100
<null> 7 U2 100
<null> 8 D3 100
<null> 8 U2 100
<null> 9 D3 100
<null> 9 U2 100
A <null> U3 1000
B <null> I2 1000
> SELECT l_v1, l_v2, l_k, SUM(c)
FROM (
SELECT (after).l_v1, (after).l_v2, (after).l_k, (after).c FROM sink_view2
UNION ALL
SELECT (before).l_v1, (before).l_v2, (before).l_k, -(before).c FROM sink_view2
) GROUP BY l_v1, l_v2, l_k
HAVING SUM(c) > 0;
<null> 0 D3 100
<null> 0 U2 100
<null> 1 D3 100
<null> 1 U2 100
<null> 2 D3 100
<null> 2 U2 100
<null> 3 D3 100
<null> 3 U2 100
<null> 4 D3 100
<null> 4 U2 100
<null> 5 D3 100
<null> 5 U2 100
<null> 6 D3 100
<null> 6 U2 100
<null> 7 D3 100
<null> 7 U2 100
<null> 8 D3 100
<null> 8 U2 100
<null> 9 D3 100
<null> 9 U2 100
A <null> U3 1000
B <null> I2 1000
> SELECT l_v1, l_v2, l_k, SUM(c)
FROM (
SELECT (after).l_v1, (after).l_v2, (after).l_k, (after).c FROM sink_view3
UNION ALL
SELECT (before).l_v1, (before).l_v2, (before).l_k, -(before).c FROM sink_view3
) GROUP BY l_v1, l_v2, l_k
HAVING SUM(c) > 0;
<null> 0 D3 100
<null> 0 U2 100
<null> 1 D3 100
<null> 1 U2 100
<null> 2 D3 100
<null> 2 U2 100
<null> 3 D3 100
<null> 3 U2 100
<null> 4 D3 100
<null> 4 U2 100
<null> 5 D3 100
<null> 5 U2 100
<null> 6 D3 100
<null> 6 U2 100
<null> 7 D3 100
<null> 7 U2 100
<null> 8 D3 100
<null> 8 U2 100
<null> 9 D3 100
<null> 9 U2 100
A <null> U3 1000
B <null> I2 1000
> DROP SOURCE sink_view1;
> DROP SOURCE sink_view2;
> DROP SOURCE sink_view3;
"""
)
)
20 changes: 20 additions & 0 deletions test/testdrive/kafka-avro-sinks-defaults.td
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,26 @@ $ schema-registry-verify schema-type=avro subject=testdrive-sink1-${testdrive.se
ENVELOPE UPSERT
contains: invalid NULL DEFAULTS: cannot use value as boolean

! CREATE SINK bad_sink FROM v
INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink1-${testdrive.seed}')
KEY (c2) NOT ENFORCED
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
(
NULL DEFAULTS = ""
)
ENVELOPE UPSERT
contains: Expected option value, found identifier ""

! CREATE SINK bad_sink FROM v
INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink1-${testdrive.seed}')
KEY (c2) NOT ENFORCED
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
(
NULL DEFAULTS = NULL
)
ENVELOPE UPSERT
contains: invalid NULL DEFAULTS: cannot use value as boolean

! CREATE SINK bad_sink FROM v
INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink1-${testdrive.seed}')
KEY (c2) NOT ENFORCED
Expand Down

0 comments on commit d1fe912

Please sign in to comment.