Skip to content

Commit

Permalink
Add tests for avro sink comments
Browse files Browse the repository at this point in the history
platform-checks: Explicit sink comment check (currently fails)
platform-checks: Added comments to Identifiers check
parallel-workload: Enabled COMMENT ON
testdrive: Added failure case on NULL and escaping
sqlsmith: will come separately in MaterializeInc/sqlsmith#3
  • Loading branch information
def- committed Oct 6, 2023
1 parent 952cdec commit e761c56
Show file tree
Hide file tree
Showing 5 changed files with 342 additions and 7 deletions.
26 changes: 25 additions & 1 deletion misc/python/materialize/checks/identifiers.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ def _can_run(self, e: Executor) -> bool:
"sink2": "17",
"alias": "18",
"role": "19",
"comment_table": "20",
"comment_column": "21",
},
{
"db": "-1.0",
Expand All @@ -95,6 +97,8 @@ def _can_run(self, e: Executor) -> bool:
"sink2": "17.0",
"alias": "18.0",
"role": "19.0",
"comment_table": "20.0",
"comment_column": "21.0",
},
{
"db": "\u0001\u0002\u0003\u0004\u0005\u0006\u0007\b\u000e\u000f\u0010\u0011\u0012\u0013\u0014\u0015\u0016\u0017\u0018\u0019\u001a\u001b\u001c\u001d\u001e\u001f^?",
Expand All @@ -118,6 +122,8 @@ def _can_run(self, e: Executor) -> bool:
"sink2": "",
"alias": "₀₁₂",
"role": "⁰⁴⁵₀₁₂",
"comment_table": "ด้้้้้็็็็็้้้้้็็็็็้้้้้้้้็็็็็้้้้้็็็็็้้้้้้้้็็็็็้้้้้็็็็็้้้้้้้้็็็็็้้้้้็็็็ ด้้้้้็็็็็้้้้้็็็็็้้้้้้้้็็็็็้้้้้็็็็็้้้้้้้้็็็็็้้้้้็็็็็้้้้้้้้็็็็็้้้้้็็็็ ด้้้้้็็็็็้้้้้็็็็็้้้้้้้้็็็็็้้้้้็็็็็้้้้้้้้็็็็็้้้้้็็็็็้้้้้้้้็็็็็้้้้้็็็็",
"comment_column": "٠١٢٣٤٥٦٧٨٩",
},
{
"db": "찦차를 타고 온 펲시맨과 쑛다리 똠방각하",
Expand All @@ -141,6 +147,8 @@ def _can_run(self, e: Executor) -> bool:
"sink2": "`ィ(´∀`∩",
"alias": "⅛⅜⅝⅞",
"role": "ЁЂЃЄЅІЇЈЉЊЋЌЍЎЏАБВГДЕЖЗИЙКЛМНОПРСТУФХЦЧШЩЪЫЬЭЮЯабвгдежзийклмнопрстуфхцчшщъыьэюя",
"comment_table": "`⁄€‹›fifl‡°·‚—±",
"comment_column": "Œ„´‰ˇÁ¨ˆØ∏”’",
},
{
"db": "❤️ 💔 💌 💕 💞 💓 💗 💖 💘 💝 💟 💜 💛 💚 💙",
Expand All @@ -164,6 +172,8 @@ def _can_run(self, e: Executor) -> bool:
"sink2": "‫test‫",
"alias": "1#INF",
"role": "0xffffffffffffffff",
"comment_table": "0xabad1dea",
"comment_column": "123456789012345678901234567890123456789",
},
{
"db": "ﺍﻺﻃﻼﻗ ﻊﻟ ﺈﻳﻭ.",
Expand All @@ -186,7 +196,9 @@ def _can_run(self, e: Executor) -> bool:
"sink1": "⁦test⁧",
"sink2": "‪‪᚛                 ᚜‪",
"alias": "0xabad1dea",
"role": "0xffffffffffffffff",
"role": "1.000.000,00",
"comment_table": "2.2250738585072011e-308",
"comment_column": "09",
},
]

Expand Down Expand Up @@ -230,6 +242,12 @@ def initialize(self) -> Testdrive:
cmds += f"""
> CREATE SECRET {dq(self.ident["secret"])} as {sq(self.ident["secret_value"])};
"""
if self.base_version >= MzVersion(0, 72, 0):
cmds += f"""
> COMMENT ON TABLE {dq(self.ident["schema"])}.{dq(self.ident["table"])} IS {sq(self.ident["comment_table"])};
> COMMENT ON COLUMN {dq(self.ident["schema"])}.{dq(self.ident["table"])}.{dq(self.ident["column"])} IS {sq(self.ident["comment_column"])};
"""

return Testdrive(schemas() + cluster() + dedent(cmds))

Expand Down Expand Up @@ -293,6 +311,12 @@ def validate(self) -> Testdrive:
> SELECT * FROM {dq(self.ident["source_view"])};
U2 A 1000
"""
if self.base_version >= MzVersion(0, 72, 0):
cmds += f"""
> SELECT object_sub_id, comment FROM mz_internal.mz_comments JOIN mz_tables ON mz_internal.mz_comments.id = mz_tables.id WHERE name = {sq(self.ident["table"])};
<null> {dq_print(self.ident["comment_table"])}
1 {dq_print(self.ident["comment_column"])}
"""
if self.base_version >= MzVersion(0, 44, 0):
cmds += f"""
> SHOW SECRETS;
Expand Down
286 changes: 286 additions & 0 deletions misc/python/materialize/checks/sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -578,3 +578,289 @@ def validate(self) -> Testdrive:
"""
)
)


class SinkComments(Check):
    """Check that comments are propagated into the Avro schemas of a sink.

    Creates a materialized view with a ``COMMENT ON`` and a Kafka sink with
    explicit ``DOC ON`` / ``KEY DOC ON`` / ``VALUE DOC ON`` options, then
    verifies (via the schema registry) that the registered key/value schemas
    carry the expected ``doc`` attributes, and (by re-ingesting the sink
    topics) that the sinked data itself is correct.
    """

    def _can_run(self, e: Executor) -> bool:
        # Sink DOC ON comment options were introduced in v0.73.0.
        return self.base_version >= MzVersion.parse("0.73.0-dev")

    def initialize(self) -> Testdrive:
        # NOTE: the created topic name must match the topic every
        # kafka-ingest below targets ("sink-source-comments"); the original
        # spelled it "sink-sourcecomments", creating an unused topic.
        return Testdrive(
            schemas_null()
            + dedent(
                """
                $ kafka-create-topic topic=sink-source-comments
                $ kafka-ingest format=avro key-format=avro topic=sink-source-comments key-schema=${keyschema} schema=${schema} repeat=1000
                {"key1": "U2${kafka-ingest.iteration}"} {"f1": {"string": "A${kafka-ingest.iteration}"}, "f2": null}
                $ kafka-ingest format=avro key-format=avro topic=sink-source-comments key-schema=${keyschema} schema=${schema} repeat=1000
                {"key1": "D2${kafka-ingest.iteration}"} {"f1": null, "f2": {"long": ${kafka-ingest.iteration}}}
                $ kafka-ingest format=avro key-format=avro topic=sink-source-comments key-schema=${keyschema} schema=${schema} repeat=1000
                {"key1": "U3${kafka-ingest.iteration}"} {"f1": {"string": "A${kafka-ingest.iteration}"}, "f2": null}
                $ kafka-ingest format=avro key-format=avro topic=sink-source-comments key-schema=${keyschema} schema=${schema} repeat=1000
                {"key1": "D3${kafka-ingest.iteration}"} {"f1": null, "f2": {"long": ${kafka-ingest.iteration}}}
                > CREATE CONNECTION IF NOT EXISTS kafka_conn FOR KAFKA BROKER '${testdrive.kafka-addr}';
                > CREATE CONNECTION IF NOT EXISTS csr_conn FOR CONFLUENT SCHEMA REGISTRY URL '${testdrive.schema-registry-url}';
                > CREATE SOURCE sink_source_comments
                  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink-source-comments-${testdrive.seed}')
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ENVELOPE UPSERT
                > CREATE MATERIALIZED VIEW sink_source_comments_view AS SELECT LEFT(key1, 2) as l_k, LEFT(f1, 1) AS l_v1, f2 / 100 AS l_v2, COUNT(*) AS c FROM sink_source_comments GROUP BY LEFT(key1, 2), LEFT(f1, 1), f2 / 100
                > COMMENT ON MATERIALIZED VIEW sink_source_comments_view IS 'comment on view sink_source_comments_view'
                > CREATE SINK sink_sink_comments1 FROM sink_source_comments_view
                  INTO KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink-comments1')
                  KEY (l_v2) NOT ENFORCED
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ( NULL DEFAULTS,
                    DOC ON COLUMN sink_source_comments_view.l_v1 = 'doc on l_v1',
                    VALUE DOC ON COLUMN sink_source_comments_view.l_v2 = 'value doc on l_v2',
                    KEY DOC ON COLUMN sink_source_comments_view.l_v2 = 'key doc on l_v2'
                  )
                  ENVELOPE DEBEZIUM
                """
            )
        )

    def manipulate(self) -> list[Testdrive]:
        # Two further sinks with identical DOC ON options, created in
        # separate phases so upgrade/restart scenarios are covered.
        return [
            Testdrive(schemas_null() + dedent(s))
            for s in [
                """
                $[version>=5200] postgres-execute connection=postgres://mz_system@materialized:6877/materialize
                GRANT SELECT ON sink_source_comments_view TO materialize
                GRANT USAGE ON CONNECTION kafka_conn TO materialize
                GRANT USAGE ON CONNECTION csr_conn TO materialize

                $ kafka-ingest format=avro key-format=avro topic=sink-source-comments key-schema=${keyschema} schema=${schema} repeat=1000
                {"key1": "I2${kafka-ingest.iteration}"} {"f1": {"string": "B${kafka-ingest.iteration}"}, "f2": null}
                {"key1": "U2${kafka-ingest.iteration}"} {"f1": null, "f2": {"long": ${kafka-ingest.iteration}}}
                {"key1": "D2${kafka-ingest.iteration}"}

                > CREATE CONNECTION IF NOT EXISTS kafka_conn FOR KAFKA BROKER '${testdrive.kafka-addr}';
                > CREATE CONNECTION IF NOT EXISTS csr_conn FOR CONFLUENT SCHEMA REGISTRY URL '${testdrive.schema-registry-url}';
                > CREATE SINK sink_sink_comments2 FROM sink_source_comments_view
                  INTO KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink-comments2')
                  KEY (l_v2) NOT ENFORCED
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ( NULL DEFAULTS,
                    DOC ON COLUMN sink_source_comments_view.l_v1 = 'doc on l_v1',
                    VALUE DOC ON COLUMN sink_source_comments_view.l_v2 = 'value doc on l_v2',
                    KEY DOC ON COLUMN sink_source_comments_view.l_v2 = 'key doc on l_v2'
                  )
                  ENVELOPE DEBEZIUM
                """,
                """
                $[version>=5200] postgres-execute connection=postgres://mz_system@materialized:6877/materialize
                GRANT SELECT ON sink_source_comments_view TO materialize
                GRANT USAGE ON CONNECTION kafka_conn TO materialize
                GRANT USAGE ON CONNECTION csr_conn TO materialize

                $ kafka-ingest format=avro key-format=avro topic=sink-source-comments key-schema=${keyschema} schema=${schema} repeat=1000
                {"key1": "I2${kafka-ingest.iteration}"} {"f1": {"string": "B${kafka-ingest.iteration}"}, "f2": null}
                {"key1": "U2${kafka-ingest.iteration}"} {"f1": null, "f2": {"long": ${kafka-ingest.iteration}}}
                {"key1": "D2${kafka-ingest.iteration}"}

                > CREATE SINK sink_sink_comments3 FROM sink_source_comments_view
                  INTO KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink-comments3')
                  KEY (l_v2) NOT ENFORCED
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ( NULL DEFAULTS,
                    DOC ON COLUMN sink_source_comments_view.l_v1 = 'doc on l_v1',
                    VALUE DOC ON COLUMN sink_source_comments_view.l_v2 = 'value doc on l_v2',
                    KEY DOC ON COLUMN sink_source_comments_view.l_v2 = 'key doc on l_v2'
                  )
                  ENVELOPE DEBEZIUM
                """,
            ]
        ]

    def validate(self) -> Testdrive:
        # The expected value-schema doc for the l_v2 field must be
        # 'value doc on l_v2', matching the VALUE DOC ON option above (the
        # original expectation said 'value doc on l_v1').
        return Testdrive(
            dedent(
                """
                $ schema-registry-verify schema-type=avro subject=sink-sink-comments1-key
                {"type":"record","name":"row","doc":"comment on view sink_source_comments_view","fields":[{"name":"l_v2","type":["null","long"],"default":null,"doc":"key doc on l_v2"}]}

                $ schema-registry-verify schema-type=avro subject=sink-sink-comments2-key
                {"type":"record","name":"row","doc":"comment on view sink_source_comments_view","fields":[{"name":"l_v2","type":["null","long"],"default":null,"doc":"key doc on l_v2"}]}

                $ schema-registry-verify schema-type=avro subject=sink-sink-comments3-key
                {"type":"record","name":"row","doc":"comment on view sink_source_comments_view","fields":[{"name":"l_v2","type":["null","long"],"default":null,"doc":"key doc on l_v2"}]}

                $ schema-registry-verify schema-type=avro subject=sink-sink-comments1-value
                {"type":"record","name":"envelope","doc":"comment on view sink_source_comments_view","fields":[{"name":"before","type":["null",{"type":"record","name":"row","fields":[{"name":"l_k","type":"string"},{"name":"l_v1","type":["null","string"],"default":null,"doc":"doc on l_v1"},{"name":"l_v2","type":["null","long"],"default":null,"doc":"value doc on l_v2"},{"name":"c","type":"long"}]}],"default":null},{"name":"after","type":["null","row"],"default":null}]}

                $ schema-registry-verify schema-type=avro subject=sink-sink-comments2-value
                {"type":"record","name":"envelope","doc":"comment on view sink_source_comments_view","fields":[{"name":"before","type":["null",{"type":"record","name":"row","fields":[{"name":"l_k","type":"string"},{"name":"l_v1","type":["null","string"],"default":null,"doc":"doc on l_v1"},{"name":"l_v2","type":["null","long"],"default":null,"doc":"value doc on l_v2"},{"name":"c","type":"long"}]}],"default":null},{"name":"after","type":["null","row"],"default":null}]}

                $ schema-registry-verify schema-type=avro subject=sink-sink-comments3-value
                {"type":"record","name":"envelope","doc":"comment on view sink_source_comments_view","fields":[{"name":"before","type":["null",{"type":"record","name":"row","fields":[{"name":"l_k","type":"string"},{"name":"l_v1","type":["null","string"],"default":null,"doc":"doc on l_v1"},{"name":"l_v2","type":["null","long"],"default":null,"doc":"value doc on l_v2"},{"name":"c","type":"long"}]}],"default":null},{"name":"after","type":["null","row"],"default":null}]}

                $ postgres-execute connection=postgres://mz_system@materialized:6877/materialize
                GRANT SELECT ON sink_source_comments_view TO materialize
                GRANT USAGE ON CONNECTION kafka_conn TO materialize
                GRANT USAGE ON CONNECTION csr_conn TO materialize

                $[version>=5900] postgres-execute connection=postgres://mz_system@materialized:6877/materialize
                GRANT CREATECLUSTER ON SYSTEM TO materialize

                $[version<5900] postgres-execute connection=postgres://mz_system@materialized:6877/materialize
                ALTER ROLE materialize CREATECLUSTER

                > SELECT * FROM sink_source_comments_view;
                D3 <null> 0 100
                D3 <null> 1 100
                D3 <null> 2 100
                D3 <null> 3 100
                D3 <null> 4 100
                D3 <null> 5 100
                D3 <null> 6 100
                D3 <null> 7 100
                D3 <null> 8 100
                D3 <null> 9 100
                I2 B <null> 1000
                U2 <null> 0 100
                U2 <null> 1 100
                U2 <null> 2 100
                U2 <null> 3 100
                U2 <null> 4 100
                U2 <null> 5 100
                U2 <null> 6 100
                U2 <null> 7 100
                U2 <null> 8 100
                U2 <null> 9 100
                U3 A <null> 1000

                # We check the contents of the sink topics by re-ingesting them.

                > CREATE CONNECTION IF NOT EXISTS kafka_conn FOR KAFKA BROKER '${testdrive.kafka-addr}';
                > CREATE CONNECTION IF NOT EXISTS csr_conn FOR CONFLUENT SCHEMA REGISTRY URL '${testdrive.schema-registry-url}';
                > CREATE SOURCE sink_view_comments1
                  FROM KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink-comments1')
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ENVELOPE NONE

                > CREATE SOURCE sink_view_comments2
                  FROM KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink-comments2')
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ENVELOPE NONE

                > CREATE SOURCE sink_view_comments3
                  FROM KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink-comments3')
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ENVELOPE NONE

                # Validate the sink by aggregating all the 'before' and 'after' records using SQL

                > SELECT l_v1, l_v2, l_k, SUM(c)
                  FROM (
                    SELECT (after).l_v1, (after).l_v2, (after).l_k, (after).c FROM sink_view_comments1
                    UNION ALL
                    SELECT (before).l_v1, (before).l_v2, (before).l_k, -(before).c FROM sink_view_comments1
                  ) GROUP BY l_v1, l_v2, l_k
                  HAVING SUM(c) > 0;
                <null> 0 D3 100
                <null> 0 U2 100
                <null> 1 D3 100
                <null> 1 U2 100
                <null> 2 D3 100
                <null> 2 U2 100
                <null> 3 D3 100
                <null> 3 U2 100
                <null> 4 D3 100
                <null> 4 U2 100
                <null> 5 D3 100
                <null> 5 U2 100
                <null> 6 D3 100
                <null> 6 U2 100
                <null> 7 D3 100
                <null> 7 U2 100
                <null> 8 D3 100
                <null> 8 U2 100
                <null> 9 D3 100
                <null> 9 U2 100
                A <null> U3 1000
                B <null> I2 1000

                > SELECT l_v1, l_v2, l_k, SUM(c)
                  FROM (
                    SELECT (after).l_v1, (after).l_v2, (after).l_k, (after).c FROM sink_view_comments2
                    UNION ALL
                    SELECT (before).l_v1, (before).l_v2, (before).l_k, -(before).c FROM sink_view_comments2
                  ) GROUP BY l_v1, l_v2, l_k
                  HAVING SUM(c) > 0;
                <null> 0 D3 100
                <null> 0 U2 100
                <null> 1 D3 100
                <null> 1 U2 100
                <null> 2 D3 100
                <null> 2 U2 100
                <null> 3 D3 100
                <null> 3 U2 100
                <null> 4 D3 100
                <null> 4 U2 100
                <null> 5 D3 100
                <null> 5 U2 100
                <null> 6 D3 100
                <null> 6 U2 100
                <null> 7 D3 100
                <null> 7 U2 100
                <null> 8 D3 100
                <null> 8 U2 100
                <null> 9 D3 100
                <null> 9 U2 100
                A <null> U3 1000
                B <null> I2 1000

                > SELECT l_v1, l_v2, l_k, SUM(c)
                  FROM (
                    SELECT (after).l_v1, (after).l_v2, (after).l_k, (after).c FROM sink_view_comments3
                    UNION ALL
                    SELECT (before).l_v1, (before).l_v2, (before).l_k, -(before).c FROM sink_view_comments3
                  ) GROUP BY l_v1, l_v2, l_k
                  HAVING SUM(c) > 0;
                <null> 0 D3 100
                <null> 0 U2 100
                <null> 1 D3 100
                <null> 1 U2 100
                <null> 2 D3 100
                <null> 2 U2 100
                <null> 3 D3 100
                <null> 3 U2 100
                <null> 4 D3 100
                <null> 4 U2 100
                <null> 5 D3 100
                <null> 5 U2 100
                <null> 6 D3 100
                <null> 6 U2 100
                <null> 7 D3 100
                <null> 7 U2 100
                <null> 8 D3 100
                <null> 8 U2 100
                <null> 9 D3 100
                <null> 9 U2 100
                A <null> U3 1000
                B <null> I2 1000

                > DROP SOURCE sink_view_comments1;

                > DROP SOURCE sink_view_comments2;

                > DROP SOURCE sink_view_comments3;
                """
            )
        )
1 change: 1 addition & 0 deletions misc/python/materialize/mzcompose/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
"persist_streaming_snapshot_and_fetch_enabled": "true",
"enable_unified_clusters": "true",
"enable_jemalloc_profiling": "true",
"enable_comment": "true",
}

DEFAULT_CRDB_ENVIRONMENT = [
Expand Down
Loading

0 comments on commit e761c56

Please sign in to comment.