From 8ed69e23deea223946b4b2b6b1fc1ebc7a8e1877 Mon Sep 17 00:00:00 2001 From: Nikhil Benesch Date: Fri, 7 Jul 2023 01:41:07 -0400 Subject: [PATCH] Support FORMAT JSON for sources Part of MaterializeInc/materialize#20387. --- pkg/materialize/format_specs.go | 4 ++++ pkg/materialize/source_kafka.go | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/pkg/materialize/format_specs.go b/pkg/materialize/format_specs.go index 1760f604..22bb5a5b 100644 --- a/pkg/materialize/format_specs.go +++ b/pkg/materialize/format_specs.go @@ -23,6 +23,7 @@ type SourceFormatSpecStruct struct { Csv *CsvFormatSpec Bytes bool Text bool + Json bool } type SinkAvroFormatSpec struct { @@ -88,6 +89,9 @@ func GetFormatSpecStruc(v interface{}) SourceFormatSpecStruct { if v, ok := u["text"]; ok { format.Text = v.(bool) } + if v, ok := u["json"]; ok { + format.Json = v.(bool) + } return format } diff --git a/pkg/materialize/source_kafka.go b/pkg/materialize/source_kafka.go index 3b12d792..9b2b43a0 100644 --- a/pkg/materialize/source_kafka.go +++ b/pkg/materialize/source_kafka.go @@ -202,6 +202,10 @@ func (b *SourceKafkaBuilder) Create() error { q.WriteString(` FORMAT TEXT`) } + if b.format.Json { + q.WriteString(` FORMAT JSON`) + } + if b.keyFormat.Avro != nil { if b.keyFormat.Avro.SchemaRegistryConnection.Name != "" { q.WriteString(fmt.Sprintf(` KEY FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION %s`, QualifiedName(b.keyFormat.Avro.SchemaRegistryConnection.DatabaseName, b.keyFormat.Avro.SchemaRegistryConnection.SchemaName, b.keyFormat.Avro.SchemaRegistryConnection.Name)))