diff --git a/misc/python/materialize/checks/identifiers.py b/misc/python/materialize/checks/identifiers.py
index d5f288b891f1d..ea69ba8e6297e 100644
--- a/misc/python/materialize/checks/identifiers.py
+++ b/misc/python/materialize/checks/identifiers.py
@@ -72,6 +72,8 @@ def _can_run(self, e: Executor) -> bool:
             "sink2": "17",
             "alias": "18",
             "role": "19",
+            "comment_table": "20",
+            "comment_column": "21",
         },
         {
             "db": "-1.0",
@@ -95,6 +97,8 @@ def _can_run(self, e: Executor) -> bool:
             "sink2": "17.0",
             "alias": "18.0",
             "role": "19.0",
+            "comment_table": "20.0",
+            "comment_column": "21.0",
         },
         {
             "db": "\u0001\u0002\u0003\u0004\u0005\u0006\u0007\b\u000e\u000f\u0010\u0011\u0012\u0013\u0014\u0015\u0016\u0017\u0018\u0019\u001a\u001b\u001c\u001d\u001e\u001f^?",
@@ -118,6 +122,8 @@ def _can_run(self, e: Executor) -> bool:
             "sink2": "",
             "alias": "₀₁₂",
             "role": "⁰⁴⁵₀₁₂",
+            "comment_table": "ด้้้้้็็็็็้้้้้็็็็็้้้้้้้้็็็็็้้้้้็็็็็้้้้้้้้็็็็็้้้้้็็็็็้้้้้้้้็็็็็้้้้้็็็็ ด้้้้้็็็็็้้้้้็็็็็้้้้้้้้็็็็็้้้้้็็็็็้้้้้้้้็็็็็้้้้้็็็็็้้้้้้้้็็็็็้้้้้็็็็ ด้้้้้็็็็็้้้้้็็็็็้้้้้้้้็็็็็้้้้้็็็็็้้้้้้้้็็็็็้้้้้็็็็็้้้้้้้้็็็็็้้้้้็็็็",
+            "comment_column": "٠١٢٣٤٥٦٧٨٩",
         },
         {
             "db": "찦차를 타고 온 펲시맨과 쑛다리 똠방각하",
@@ -141,6 +147,8 @@ def _can_run(self, e: Executor) -> bool:
             "sink2": "`ィ(´∀`∩",
             "alias": "⅛⅜⅝⅞",
             "role": "ЁЂЃЄЅІЇЈЉЊЋЌЍЎЏАБВГДЕЖЗИЙКЛМНОПРСТУФХЦЧШЩЪЫЬЭЮЯабвгдежзийклмнопрстуфхцчшщъыьэюя",
+            "comment_table": "`⁄€‹›fifl‡°·‚—±",
+            "comment_column": "Œ„´‰ˇÁ¨ˆØ∏”’",
         },
         {
             "db": "❤️ 💔 💌 💕 💞 💓 💗 💖 💘 💝 💟 💜 💛 💚 💙",
@@ -164,6 +172,8 @@ def _can_run(self, e: Executor) -> bool:
             "sink2": "‫test‫",
             "alias": "1#INF",
             "role": "0xffffffffffffffff",
+            "comment_table": "0xabad1dea",
+            "comment_column": "123456789012345678901234567890123456789",
         },
         {
             "db": "ﺍﻺﻃﻼﻗ ﻊﻟ ﺈﻳﻭ.",
@@ -186,7 +196,9 @@ def _can_run(self, e: Executor) -> bool:
             "sink1": "⁦test⁧",
             "sink2": "‪‪᚛                 ᚜‪",
             "alias": "0xabad1dea",
-            "role": "0xffffffffffffffff",
+            "role": "1.000.000,00",
+            "comment_table": "2.2250738585072011e-308",
+            "comment_column": "09",
         },
     ]
 
@@ -230,6 +242,12 @@ def initialize(self) -> Testdrive:
             cmds += f"""
             > CREATE SECRET {dq(self.ident["secret"])} as {sq(self.ident["secret_value"])};
             """
+        if self.base_version >= MzVersion(0, 72, 0):
+            cmds += f"""
+            > COMMENT ON TABLE {dq(self.ident["schema"])}.{dq(self.ident["table"])} IS {sq(self.ident["comment_table"])};
+
+            > COMMENT ON COLUMN {dq(self.ident["schema"])}.{dq(self.ident["table"])}.{dq(self.ident["column"])} IS {sq(self.ident["comment_column"])};
+            """
 
         return Testdrive(schemas() + cluster() + dedent(cmds))
 
@@ -293,6 +311,12 @@ def validate(self) -> Testdrive:
         > SELECT * FROM {dq(self.ident["source_view"])};
         U2 A 1000
         """
+        if self.base_version >= MzVersion(0, 72, 0):
+            cmds += f"""
+        > SELECT object_sub_id, comment FROM mz_internal.mz_comments JOIN mz_tables ON mz_internal.mz_comments.id = mz_tables.id WHERE name = {sq(self.ident["table"])};
+        <null> {dq_print(self.ident["comment_table"])}
+        1 {dq_print(self.ident["comment_column"])}
+        """
         if self.base_version >= MzVersion(0, 44, 0):
             cmds += f"""
         > SHOW SECRETS;
diff --git a/misc/python/materialize/checks/sink.py b/misc/python/materialize/checks/sink.py
index 12d7632e70c93..7f49716d7e089 100644
--- a/misc/python/materialize/checks/sink.py
+++ b/misc/python/materialize/checks/sink.py
@@ -578,3 +578,289 @@ def validate(self) -> Testdrive:
             """
             )
         )
+
+
+class SinkComments(Check):
+    """Check on an Avro sink with comments"""
+
+    def _can_run(self, e: Executor) -> bool:
+        return self.base_version >= MzVersion.parse("0.73.0-dev")
+
+    def initialize(self) -> Testdrive:
+        return Testdrive(
+            schemas_null()
+            + dedent(
+                """
+                $ kafka-create-topic topic=sink-source-comments
+
+                $ kafka-ingest format=avro key-format=avro topic=sink-source-comments key-schema=${keyschema} schema=${schema} repeat=1000
+                {"key1": "U2${kafka-ingest.iteration}"} {"f1": {"string": "A${kafka-ingest.iteration}"}, "f2": null}
+
+                $ kafka-ingest format=avro key-format=avro topic=sink-source-comments key-schema=${keyschema} schema=${schema} repeat=1000
+                {"key1": "D2${kafka-ingest.iteration}"} {"f1": null, "f2": {"long": ${kafka-ingest.iteration}}}
+
+                $ kafka-ingest format=avro key-format=avro topic=sink-source-comments key-schema=${keyschema} schema=${schema} repeat=1000
+                {"key1": "U3${kafka-ingest.iteration}"} {"f1": {"string": "A${kafka-ingest.iteration}"}, "f2": null}
+
+                $ kafka-ingest format=avro key-format=avro topic=sink-source-comments key-schema=${keyschema} schema=${schema} repeat=1000
+                {"key1": "D3${kafka-ingest.iteration}"} {"f1": null, "f2": {"long": ${kafka-ingest.iteration}}}
+
+                > CREATE CONNECTION IF NOT EXISTS kafka_conn FOR KAFKA BROKER '${testdrive.kafka-addr}';
+
+                > CREATE CONNECTION IF NOT EXISTS csr_conn FOR CONFLUENT SCHEMA REGISTRY URL '${testdrive.schema-registry-url}';
+
+                > CREATE SOURCE sink_source_comments
+                  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink-source-comments-${testdrive.seed}')
+                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
+                  ENVELOPE UPSERT
+
+                > CREATE MATERIALIZED VIEW sink_source_comments_view AS SELECT LEFT(key1, 2) as l_k, LEFT(f1, 1) AS l_v1, f2 / 100 AS l_v2, COUNT(*) AS c FROM sink_source_comments GROUP BY LEFT(key1, 2), LEFT(f1, 1), f2 / 100
+
+                > COMMENT ON MATERIALIZED VIEW sink_source_comments_view IS 'comment on view sink_source_comments_view'
+
+                > CREATE SINK sink_sink_comments1 FROM sink_source_comments_view
+                  INTO KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink-comments1')
+                  KEY (l_v2) NOT ENFORCED
+                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
+                  ( NULL DEFAULTS,
+                    DOC ON COLUMN sink_source_comments_view.l_v1 = 'doc on l_v1',
+                    VALUE DOC ON COLUMN sink_source_comments_view.l_v2 = 'value doc on l_v2',
+                    KEY DOC ON COLUMN sink_source_comments_view.l_v2 = 'key doc on l_v2'
+                  )
+                  ENVELOPE DEBEZIUM
+                """
+            )
+        )
+
+    def manipulate(self) -> list[Testdrive]:
+        return [
+            Testdrive(schemas_null() + dedent(s))
+            for s in [
+                """
+                $[version>=5200] postgres-execute connection=postgres://mz_system@materialized:6877/materialize
+                GRANT SELECT ON sink_source_comments_view TO materialize
+                GRANT USAGE ON CONNECTION kafka_conn TO materialize
+                GRANT USAGE ON CONNECTION csr_conn TO materialize
+
+                $ kafka-ingest format=avro key-format=avro topic=sink-source-comments key-schema=${keyschema} schema=${schema} repeat=1000
+                {"key1": "I2${kafka-ingest.iteration}"} {"f1": {"string": "B${kafka-ingest.iteration}"}, "f2": null}
+                {"key1": "U2${kafka-ingest.iteration}"} {"f1": null, "f2": {"long": ${kafka-ingest.iteration}}}
+                {"key1": "D2${kafka-ingest.iteration}"}
+
+                > CREATE CONNECTION IF NOT EXISTS kafka_conn FOR KAFKA BROKER '${testdrive.kafka-addr}';
+
+                > CREATE CONNECTION IF NOT EXISTS csr_conn FOR CONFLUENT SCHEMA REGISTRY URL '${testdrive.schema-registry-url}';
+
+                > CREATE SINK sink_sink_comments2 FROM sink_source_comments_view
+                  INTO KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink-comments2')
+                  KEY (l_v2) NOT ENFORCED
+                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
+                  ( NULL DEFAULTS,
+                    DOC ON COLUMN sink_source_comments_view.l_v1 = 'doc on l_v1',
+                    VALUE DOC ON COLUMN sink_source_comments_view.l_v2 = 'value doc on l_v2',
+                    KEY DOC ON COLUMN sink_source_comments_view.l_v2 = 'key doc on l_v2'
+                  )
+                  ENVELOPE DEBEZIUM
+                """,
+                """
+                $[version>=5200] postgres-execute connection=postgres://mz_system@materialized:6877/materialize
+                GRANT SELECT ON sink_source_comments_view TO materialize
+                GRANT USAGE ON CONNECTION kafka_conn TO materialize
+                GRANT USAGE ON CONNECTION csr_conn TO materialize
+
+                $ kafka-ingest format=avro key-format=avro topic=sink-source-comments key-schema=${keyschema} schema=${schema} repeat=1000
+                {"key1": "I2${kafka-ingest.iteration}"} {"f1": {"string": "B${kafka-ingest.iteration}"}, "f2": null}
+                {"key1": "U2${kafka-ingest.iteration}"} {"f1": null, "f2": {"long": ${kafka-ingest.iteration}}}
+                {"key1": "D2${kafka-ingest.iteration}"}
+
+                > CREATE SINK sink_sink_comments3 FROM sink_source_comments_view
+                  INTO KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink-comments3')
+                  KEY (l_v2) NOT ENFORCED
+                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
+                  ( NULL DEFAULTS,
+                    DOC ON COLUMN sink_source_comments_view.l_v1 = 'doc on l_v1',
+                    VALUE DOC ON COLUMN sink_source_comments_view.l_v2 = 'value doc on l_v2',
+                    KEY DOC ON COLUMN sink_source_comments_view.l_v2 = 'key doc on l_v2'
+                  )
+                  ENVELOPE DEBEZIUM
+                """,
+            ]
+        ]
+
+    def validate(self) -> Testdrive:
+        return Testdrive(
+            dedent(
+                """
+                $ schema-registry-verify schema-type=avro subject=sink-sink-comments1-key
+                {"type":"record","name":"row","doc":"comment on view sink_source_comments_view","fields":[{"name":"l_v2","type":["null","long"],"default":null,"doc":"key doc on l_v2"}]}
+
+                $ schema-registry-verify schema-type=avro subject=sink-sink-comments2-key
+                {"type":"record","name":"row","doc":"comment on view sink_source_comments_view","fields":[{"name":"l_v2","type":["null","long"],"default":null,"doc":"key doc on l_v2"}]}
+
+                $ schema-registry-verify schema-type=avro subject=sink-sink-comments3-key
+                {"type":"record","name":"row","doc":"comment on view sink_source_comments_view","fields":[{"name":"l_v2","type":["null","long"],"default":null,"doc":"key doc on l_v2"}]}
+
+                $ schema-registry-verify schema-type=avro subject=sink-sink-comments1-value
+                {"type":"record","name":"envelope","doc":"comment on view sink_source_comments_view","fields":[{"name":"before","type":["null",{"type":"record","name":"row","fields":[{"name":"l_k","type":"string"},{"name":"l_v1","type":["null","string"],"default":null,"doc":"doc on l_v1"},{"name":"l_v2","type":["null","long"],"default":null,"doc":"value doc on l_v2"},{"name":"c","type":"long"}]}],"default":null},{"name":"after","type":["null","row"],"default":null}]}
+
+                $ schema-registry-verify schema-type=avro subject=sink-sink-comments2-value
+                {"type":"record","name":"envelope","doc":"comment on view sink_source_comments_view","fields":[{"name":"before","type":["null",{"type":"record","name":"row","fields":[{"name":"l_k","type":"string"},{"name":"l_v1","type":["null","string"],"default":null,"doc":"doc on l_v1"},{"name":"l_v2","type":["null","long"],"default":null,"doc":"value doc on l_v2"},{"name":"c","type":"long"}]}],"default":null},{"name":"after","type":["null","row"],"default":null}]}
+
+                $ schema-registry-verify schema-type=avro subject=sink-sink-comments3-value
+                {"type":"record","name":"envelope","doc":"comment on view sink_source_comments_view","fields":[{"name":"before","type":["null",{"type":"record","name":"row","fields":[{"name":"l_k","type":"string"},{"name":"l_v1","type":["null","string"],"default":null,"doc":"doc on l_v1"},{"name":"l_v2","type":["null","long"],"default":null,"doc":"value doc on l_v2"},{"name":"c","type":"long"}]}],"default":null},{"name":"after","type":["null","row"],"default":null}]}
+
+                $ postgres-execute connection=postgres://mz_system@materialized:6877/materialize
+                GRANT SELECT ON sink_source_comments_view TO materialize
+                GRANT USAGE ON CONNECTION kafka_conn TO materialize
+                GRANT USAGE ON CONNECTION csr_conn TO materialize
+
+                $[version>=5900] postgres-execute connection=postgres://mz_system@materialized:6877/materialize
+                GRANT CREATECLUSTER ON SYSTEM TO materialize
+
+                $[version<5900] postgres-execute connection=postgres://mz_system@materialized:6877/materialize
+                ALTER ROLE materialize CREATECLUSTER
+
+                > SELECT * FROM sink_source_comments_view;
+                D3 <null> 0 100
+                D3 <null> 1 100
+                D3 <null> 2 100
+                D3 <null> 3 100
+                D3 <null> 4 100
+                D3 <null> 5 100
+                D3 <null> 6 100
+                D3 <null> 7 100
+                D3 <null> 8 100
+                D3 <null> 9 100
+                I2 B <null> 1000
+                U2 <null> 0 100
+                U2 <null> 1 100
+                U2 <null> 2 100
+                U2 <null> 3 100
+                U2 <null> 4 100
+                U2 <null> 5 100
+                U2 <null> 6 100
+                U2 <null> 7 100
+                U2 <null> 8 100
+                U2 <null> 9 100
+                U3 A <null> 1000
+
+                # We check the contents of the sink topics by re-ingesting them.
+
+                > CREATE CONNECTION IF NOT EXISTS kafka_conn FOR KAFKA BROKER '${testdrive.kafka-addr}';
+
+                > CREATE CONNECTION IF NOT EXISTS csr_conn FOR CONFLUENT SCHEMA REGISTRY URL '${testdrive.schema-registry-url}';
+
+                > CREATE SOURCE sink_view_comments1
+                  FROM KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink-comments1')
+                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
+                  ENVELOPE NONE
+
+                > CREATE SOURCE sink_view_comments2
+                  FROM KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink-comments2')
+                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
+                  ENVELOPE NONE
+
+                > CREATE SOURCE sink_view_comments3
+                  FROM KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink-comments3')
+                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
+                  ENVELOPE NONE
+
+                # Validate the sink by aggregating all the 'before' and 'after' records using SQL
+                > SELECT l_v1, l_v2, l_k, SUM(c)
+                  FROM (
+                    SELECT (after).l_v1, (after).l_v2, (after).l_k, (after).c FROM sink_view_comments1
+                    UNION ALL
+                    SELECT (before).l_v1, (before).l_v2, (before).l_k, -(before).c FROM sink_view_comments1
+                  ) GROUP BY l_v1, l_v2, l_k
+                  HAVING SUM(c) > 0;
+                <null> 0 D3 100
+                <null> 0 U2 100
+                <null> 1 D3 100
+                <null> 1 U2 100
+                <null> 2 D3 100
+                <null> 2 U2 100
+                <null> 3 D3 100
+                <null> 3 U2 100
+                <null> 4 D3 100
+                <null> 4 U2 100
+                <null> 5 D3 100
+                <null> 5 U2 100
+                <null> 6 D3 100
+                <null> 6 U2 100
+                <null> 7 D3 100
+                <null> 7 U2 100
+                <null> 8 D3 100
+                <null> 8 U2 100
+                <null> 9 D3 100
+                <null> 9 U2 100
+                A <null> U3 1000
+                B <null> I2 1000
+
+                > SELECT l_v1, l_v2, l_k, SUM(c)
+                  FROM (
+                    SELECT (after).l_v1, (after).l_v2, (after).l_k, (after).c FROM sink_view_comments2
+                    UNION ALL
+                    SELECT (before).l_v1, (before).l_v2, (before).l_k, -(before).c FROM sink_view_comments2
+                  ) GROUP BY l_v1, l_v2, l_k
+                  HAVING SUM(c) > 0;
+                <null> 0 D3 100
+                <null> 0 U2 100
+                <null> 1 D3 100
+                <null> 1 U2 100
+                <null> 2 D3 100
+                <null> 2 U2 100
+                <null> 3 D3 100
+                <null> 3 U2 100
+                <null> 4 D3 100
+                <null> 4 U2 100
+                <null> 5 D3 100
+                <null> 5 U2 100
+                <null> 6 D3 100
+                <null> 6 U2 100
+                <null> 7 D3 100
+                <null> 7 U2 100
+                <null> 8 D3 100
+                <null> 8 U2 100
+                <null> 9 D3 100
+                <null> 9 U2 100
+                A <null> U3 1000
+                B <null> I2 1000
+
+                > SELECT l_v1, l_v2, l_k, SUM(c)
+                  FROM (
+                    SELECT (after).l_v1, (after).l_v2, (after).l_k, (after).c FROM sink_view_comments3
+                    UNION ALL
+                    SELECT (before).l_v1, (before).l_v2, (before).l_k, -(before).c FROM sink_view_comments3
+                  ) GROUP BY l_v1, l_v2, l_k
+                  HAVING SUM(c) > 0;
+                <null> 0 D3 100
+                <null> 0 U2 100
+                <null> 1 D3 100
+                <null> 1 U2 100
+                <null> 2 D3 100
+                <null> 2 U2 100
+                <null> 3 D3 100
+                <null> 3 U2 100
+                <null> 4 D3 100
+                <null> 4 U2 100
+                <null> 5 D3 100
+                <null> 5 U2 100
+                <null> 6 D3 100
+                <null> 6 U2 100
+                <null> 7 D3 100
+                <null> 7 U2 100
+                <null> 8 D3 100
+                <null> 8 U2 100
+                <null> 9 D3 100
+                <null> 9 U2 100
+                A <null> U3 1000
+                B <null> I2 1000
+
+                > DROP SOURCE sink_view_comments1;
+
+                > DROP SOURCE sink_view_comments2;
+
+                > DROP SOURCE sink_view_comments3;
+            """
+            )
+        )
diff --git a/misc/python/materialize/mzcompose/__init__.py b/misc/python/materialize/mzcompose/__init__.py
index 25b4f1559555d..b8a52ab057e6f 100644
--- a/misc/python/materialize/mzcompose/__init__.py
+++ b/misc/python/materialize/mzcompose/__init__.py
@@ -69,6 +69,7 @@
     "persist_streaming_snapshot_and_fetch_enabled": "true",
     "enable_unified_clusters": "true",
     "enable_jemalloc_profiling": "true",
+    "enable_comment": "true",
 }
 
 DEFAULT_CRDB_ENVIRONMENT = [
diff --git a/misc/python/materialize/parallel_workload/action.py b/misc/python/materialize/parallel_workload/action.py
index c3d76d83185aa..5378a75bd3505 100644
--- a/misc/python/materialize/parallel_workload/action.py
+++ b/misc/python/materialize/parallel_workload/action.py
@@ -251,6 +251,19 @@ def run(self, exe: Executor) -> None:
                     table.num_rows = 0
 
 
+class CommentAction(Action):
+    def run(self, exe: Executor) -> None:
+        table = self.rng.choice(self.db.tables)
+
+        if self.rng.choice([True, False]):
+            column = self.rng.choice(table.columns)
+            query = f"COMMENT ON COLUMN {column} IS '{Text.value(self.rng)}'"
+        else:
+            query = f"COMMENT ON TABLE {table} IS '{Text.value(self.rng)}'"
+
+        exe.execute(query)
+
+
 class CreateIndexAction(Action):
     def errors_to_ignore(self) -> list[str]:
         return [
@@ -752,6 +765,7 @@ def __init__(
     [
         (DeleteAction, 10),
         (UpdateAction, 10),
+        (CommentAction, 5),
         (SetClusterAction, 1),
         (ReconnectAction, 1),
         # (TransactionIsolationAction, 1),
diff --git a/test/testdrive/kafka-avro-sinks-doc-comments.td b/test/testdrive/kafka-avro-sinks-doc-comments.td
index 823044541fb5a..d3451d2ee6a6a 100644
--- a/test/testdrive/kafka-avro-sinks-doc-comments.td
+++ b/test/testdrive/kafka-avro-sinks-doc-comments.td
@@ -17,9 +17,9 @@ ALTER SYSTEM SET enable_comment = true;
 > CREATE TABLE t (c1 point, c2 text NOT NULL, c3 custom_map, c4 point list);
 > INSERT INTO t SELECT ROW(1, 1)::point AS c1, 'text' AS c2, '{a=>true}'::custom_map as c3, LIST[ROW(1, 1)::point] as c4;
 
-> COMMENT ON TABLE t IS 'comment on table t';
-> COMMENT ON COLUMN t.c3 IS 'comment on column t.c3';
-> COMMENT ON COLUMN t.c4 IS 'comment on column t.c4';
+> COMMENT ON TABLE t IS 'comment on table t with a \\ \';
+> COMMENT ON COLUMN t.c3 IS 'comment on column t.c3 with a ''';
+> COMMENT ON COLUMN t.c4 IS 'comment on column t.c4 with an äöü';
 > COMMENT ON TYPE point IS 'comment on type point';
 
 > CREATE CONNECTION kafka_conn
@@ -47,10 +47,10 @@ ALTER SYSTEM SET enable_comment = true;
 > SHOW CREATE SINK sink1;
 name                            create_sql
 -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
- "materialize.public.sink1"  "CREATE SINK \"materialize\".\"public\".\"sink1\" FROM \"materialize\".\"public\".\"t\" INTO KAFKA CONNECTION \"materialize\".\"public\".\"kafka_conn\" (TOPIC = 'testdrive-sink1-${testdrive.seed}') KEY (\"c2\") NOT ENFORCED FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION \"materialize\".\"public\".\"csr_conn\" (DOC ON COLUMN \"materialize\".\"public\".\"t\".c1 = 'doc on t.c1', VALUE DOC ON COLUMN \"materialize\".\"public\".\"t\".c2 = 'value doc on t.c2', KEY DOC ON COLUMN \"materialize\".\"public\".\"t\".c2 = 'key doc on t.c2', DOC ON COLUMN \"materialize\".\"public\".\"t\".c4 = 'doc on t.c4', KEY DOC ON TYPE \"materialize\".\"public\".\"point\" = 'key doc on point', VALUE DOC ON TYPE \"materialize\".\"public\".\"point\" = 'value doc on point', KEY DOC ON TYPE \"materialize\".\"public\".\"t\" = 'key doc on t', DOC ON TYPE \"materialize\".\"public\".\"point\" = 'comment on type point', DOC ON TYPE \"materialize\".\"public\".\"t\" = 'comment on table t', DOC ON COLUMN \"materialize\".\"public\".\"t\".c3 = 'comment on column t.c3') ENVELOPE UPSERT"
+ "materialize.public.sink1"  "CREATE SINK \"materialize\".\"public\".\"sink1\" FROM \"materialize\".\"public\".\"t\" INTO KAFKA CONNECTION \"materialize\".\"public\".\"kafka_conn\" (TOPIC = 'testdrive-sink1-${testdrive.seed}') KEY (\"c2\") NOT ENFORCED FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION \"materialize\".\"public\".\"csr_conn\" (DOC ON COLUMN \"materialize\".\"public\".\"t\".c1 = 'doc on t.c1', VALUE DOC ON COLUMN \"materialize\".\"public\".\"t\".c2 = 'value doc on t.c2', KEY DOC ON COLUMN \"materialize\".\"public\".\"t\".c2 = 'key doc on t.c2', DOC ON COLUMN \"materialize\".\"public\".\"t\".c4 = 'doc on t.c4', KEY DOC ON TYPE \"materialize\".\"public\".\"point\" = 'key doc on point', VALUE DOC ON TYPE \"materialize\".\"public\".\"point\" = 'value doc on point', KEY DOC ON TYPE \"materialize\".\"public\".\"t\" = 'key doc on t', DOC ON TYPE \"materialize\".\"public\".\"point\" = 'comment on type point', DOC ON TYPE \"materialize\".\"public\".\"t\" = 'comment on table t with a \\\\ \\', DOC ON COLUMN \"materialize\".\"public\".\"t\".c3 = 'comment on column t.c3 with a ''') ENVELOPE UPSERT"
 
 $ schema-registry-verify schema-type=avro subject=testdrive-sink1-${testdrive.seed}-value
-{"type":"record","name":"envelope","doc":"comment on table t","fields":[{"name":"c1","type":["null",{"type":"record","name":"record0","namespace":"com.materialize.sink","doc":"value doc on point","fields":[{"name":"x","type":["null","int"]},{"name":"y","type":["null","int"]}]}],"doc":"doc on t.c1"},{"name":"c2","type":"string","doc":"value doc on t.c2"},{"name":"c3","type":["null",{"type":"map","values":["null","boolean"]}],"doc":"comment on column t.c3"},{"name":"c4","type":["null",{"type":"array","items":["null",{"type":"record","name":"record1","namespace":"com.materialize.sink","doc":"value doc on point","fields":[{"name":"x","type":["null","int"]},{"name":"y","type":["null","int"]}]}]}],"doc":"doc on t.c4"}]}
+{"type":"record","name":"envelope","doc":"comment on table t with a \\\\ \\","fields":[{"name":"c1","type":["null",{"type":"record","name":"record0","namespace":"com.materialize.sink","doc":"value doc on point","fields":[{"name":"x","type":["null","int"]},{"name":"y","type":["null","int"]}]}],"doc":"doc on t.c1"},{"name":"c2","type":"string","doc":"value doc on t.c2"},{"name":"c3","type":["null",{"type":"map","values":["null","boolean"]}],"doc":"comment on column t.c3 with a '"},{"name":"c4","type":["null",{"type":"array","items":["null",{"type":"record","name":"record1","namespace":"com.materialize.sink","doc":"value doc on point","fields":[{"name":"x","type":["null","int"]},{"name":"y","type":["null","int"]}]}]}],"doc":"doc on t.c4"}]}
 
 $ schema-registry-verify schema-type=avro subject=testdrive-sink1-${testdrive.seed}-key
 {"type":"record","name":"row","doc":"key doc on t","fields":[{"name":"c2","type":"string","doc":"key doc on t.c2"}]}
@@ -62,7 +62,7 @@ $ schema-registry-verify schema-type=avro subject=testdrive-sink1-${testdrive.se
   ENVELOPE UPSERT;
 
 $ schema-registry-verify schema-type=avro subject=testdrive-sink2-${testdrive.seed}-value
-{"type":"record","name":"envelope","doc":"comment on table t","fields":[{"name":"c1","type":["null",{"type":"record","name":"record0","namespace":"com.materialize.sink","doc":"comment on type point","fields":[{"name":"x","type":["null","int"]},{"name":"y","type":["null","int"]}]}]},{"name":"c2","type":"string"},{"name":"c3","type":["null",{"type":"map","values":["null","boolean"]}],"doc":"comment on column t.c3"},{"name":"c4","type":["null",{"type":"array","items":["null",{"type":"record","name":"record1","namespace":"com.materialize.sink","doc":"comment on type point","fields":[{"name":"x","type":["null","int"]},{"name":"y","type":["null","int"]}]}]}],"doc":"comment on column t.c4"}]}
+{"type":"record","name":"envelope","doc":"comment on table t with a \\\\ \\","fields":[{"name":"c1","type":["null",{"type":"record","name":"record0","namespace":"com.materialize.sink","doc":"comment on type point","fields":[{"name":"x","type":["null","int"]},{"name":"y","type":["null","int"]}]}]},{"name":"c2","type":"string"},{"name":"c3","type":["null",{"type":"map","values":["null","boolean"]}],"doc":"comment on column t.c3 with a '"},{"name":"c4","type":["null",{"type":"array","items":["null",{"type":"record","name":"record1","namespace":"com.materialize.sink","doc":"comment on type point","fields":[{"name":"x","type":["null","int"]},{"name":"y","type":["null","int"]}]}]}],"doc":"comment on column t.c4 with an äöü"}]}
 
 # errors
 ! CREATE SINK bad_sink FROM t
@@ -114,3 +114,13 @@ contains: option value: cannot be empty
   )
   ENVELOPE UPSERT
 contains: need to specify an object and a column
+
+! CREATE SINK bad_sink FROM t
+  INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink1-${testdrive.seed}')
+  KEY (c2) NOT ENFORCED
+  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
+  (
+   DOC ON COLUMN t.c1 = NULL
+  )
+  ENVELOPE UPSERT;
+contains: cannot use value as string