Skip to content

Commit

Permalink
Merge pull request #227 from MaterializeInc/format-json
Browse files Browse the repository at this point in the history
Support FORMAT JSON for sources
  • Loading branch information
benesch authored Jul 7, 2023
2 parents 9d0e684 + 8ed69e2 commit afd70f1
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 0 deletions.
4 changes: 4 additions & 0 deletions pkg/materialize/format_specs.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type SourceFormatSpecStruct struct {
Csv *CsvFormatSpec
Bytes bool
Text bool
Json bool
}

type SinkAvroFormatSpec struct {
Expand Down Expand Up @@ -88,6 +89,9 @@ func GetFormatSpecStruc(v interface{}) SourceFormatSpecStruct {
if v, ok := u["text"]; ok {
format.Text = v.(bool)
}
if v, ok := u["json"]; ok {
format.Json = v.(bool)
}
return format
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/materialize/source_kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,10 @@ func (b *SourceKafkaBuilder) Create() error {
q.WriteString(` FORMAT TEXT`)
}

if b.format.Json {
q.WriteString(` FORMAT JSON`)
}

if b.keyFormat.Avro != nil {
if b.keyFormat.Avro.SchemaRegistryConnection.Name != "" {
q.WriteString(fmt.Sprintf(` KEY FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION %s`, QualifiedName(b.keyFormat.Avro.SchemaRegistryConnection.DatabaseName, b.keyFormat.Avro.SchemaRegistryConnection.SchemaName, b.keyFormat.Avro.SchemaRegistryConnection.Name)))
Expand Down

0 comments on commit afd70f1

Please sign in to comment.