Skip to content

Commit

Permalink
Check
Browse files Browse the repository at this point in the history
  • Loading branch information
Dennis Hume committed Nov 30, 2023
1 parent fd360a8 commit e714b24
Show file tree
Hide file tree
Showing 2 changed files with 161 additions and 15 deletions.
40 changes: 25 additions & 15 deletions pkg/materialize/format_specs.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,24 +106,34 @@ func GetSinkFormatSpecStruc(v interface{}) SinkFormatSpecStruct {
if csr, ok := avro.([]interface{})[0].(map[string]interface{})["schema_registry_connection"]; ok {
key := avro.([]interface{})[0].(map[string]interface{})["avro_key_fullname"].(string)
value := avro.([]interface{})[0].(map[string]interface{})["avro_value_fullname"].(string)
dType := avro.([]interface{})[0].(map[string]interface{})["avro_doc_type"].([]interface{})
dColumn := avro.([]interface{})[0].(map[string]interface{})["avro_doc_column"].([]interface{})

docType := AvroDocType{
Object: GetIdentifierSchemaStruct(dType[0].(map[string]interface{})["object"]),
Doc: dType[0].(map[string]interface{})["doc"].(string),
Key: dType[0].(map[string]interface{})["key"].(bool),
Value: dType[0].(map[string]interface{})["value"].(bool),
var docType AvroDocType
if adt, ok := avro.([]interface{})[0].(map[string]interface{})["avro_doc_type"].([]interface{}); ok {
if v, ok := adt[0].(map[string]interface{})["object"]; ok {
docType.Object = GetIdentifierSchemaStruct(v)
}
if v, ok := adt[0].(map[string]interface{})["doc"]; ok {
docType.Doc = v.(string)
}
if v, ok := adt[0].(map[string]interface{})["key"]; ok {
docType.Key = v.(bool)
}
if v, ok := adt[0].(map[string]interface{})["value"]; ok {
docType.Value = v.(bool)
}
}

var docColumn []AvroDocColumn
for _, column := range dColumn {
docColumn = append(docColumn, AvroDocColumn{
Object: GetIdentifierSchemaStruct(column.(map[string]interface{})["object"]),
Column: column.(map[string]interface{})["column"].(string),
Doc: column.(map[string]interface{})["doc"].(string),
Key: column.(map[string]interface{})["key"].(bool),
Value: column.(map[string]interface{})["value"].(bool),
})
if adc, ok := avro.([]interface{})[0].(map[string]interface{})["avro_doc_column"]; ok {
for _, column := range adc.([]interface{}) {
docColumn = append(docColumn, AvroDocColumn{
Object: GetIdentifierSchemaStruct(column.(map[string]interface{})["object"]),
Column: column.(map[string]interface{})["column"].(string),
Doc: column.(map[string]interface{})["doc"].(string),
Key: column.(map[string]interface{})["key"].(bool),
Value: column.(map[string]interface{})["value"].(bool),
})
}
}

format.Avro = &SinkAvroFormatSpec{
Expand Down
136 changes: 136 additions & 0 deletions pkg/provider/acceptance_sink_kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,55 @@ func TestAccSinkKafka_basic(t *testing.T) {
})
}

// TestAccSinkKafkaAvro_basic provisions a Kafka sink with an Avro format
// spec (schema registry connection, avro_doc_type, and two avro_doc_column
// entries) and verifies every attribute lands in state, then confirms the
// resource imports.
func TestAccSinkKafkaAvro_basic(t *testing.T) {
	sinkName := acctest.RandStringFromCharSet(10, acctest.CharSetAlpha)

	// Single resource address under test; every attribute check targets it.
	const sinkAddr = "materialize_sink_kafka.test"
	attr := func(key, want string) resource.TestCheckFunc {
		return resource.TestCheckResourceAttr(sinkAddr, key, want)
	}

	checks := []resource.TestCheckFunc{
		testAccCheckSinkKafkaExists(sinkAddr),
		attr("name", sinkName+"_sink"),
		attr("database_name", "materialize"),
		attr("schema_name", "public"),
		attr("qualified_sql_name", fmt.Sprintf(`"materialize"."public"."%s"`, sinkName+"_sink")),
		attr("cluster_name", sinkName+"_cluster"),
		attr("topic", "topic1"),
		attr("key.0", "counter"),
		attr("key_not_enforced", "true"),
		// Schema registry connection reference.
		attr("format.0.avro.0.schema_registry_connection.0.name", sinkName+"_conn_schema"),
		attr("format.0.avro.0.schema_registry_connection.0.database_name", "materialize"),
		attr("format.0.avro.0.schema_registry_connection.0.schema_name", "public"),
		// Top-level avro doc type.
		attr("format.0.avro.0.avro_doc_type.0.object.0.name", sinkName+"_load_gen"),
		attr("format.0.avro.0.avro_doc_type.0.object.0.database_name", "materialize"),
		attr("format.0.avro.0.avro_doc_type.0.object.0.schema_name", "public"),
		attr("format.0.avro.0.avro_doc_type.0.doc", "top level comment"),
		// First doc column (key side).
		attr("format.0.avro.0.avro_doc_column.0.object.0.name", sinkName+"_load_gen"),
		attr("format.0.avro.0.avro_doc_column.0.object.0.database_name", "materialize"),
		attr("format.0.avro.0.avro_doc_column.0.object.0.schema_name", "public"),
		attr("format.0.avro.0.avro_doc_column.0.column", "counter"),
		attr("format.0.avro.0.avro_doc_column.0.doc", "comment key"),
		attr("format.0.avro.0.avro_doc_column.0.key", "true"),
		// Second doc column (value side).
		attr("format.0.avro.0.avro_doc_column.1.object.0.name", sinkName+"_load_gen"),
		attr("format.0.avro.0.avro_doc_column.1.object.0.database_name", "materialize"),
		attr("format.0.avro.0.avro_doc_column.1.object.0.schema_name", "public"),
		attr("format.0.avro.0.avro_doc_column.1.column", "counter"),
		attr("format.0.avro.0.avro_doc_column.1.doc", "comment value"),
		attr("format.0.avro.0.avro_doc_column.1.value", "true"),
	}

	resource.ParallelTest(t, resource.TestCase{
		PreCheck:          func() { testAccPreCheck(t) },
		ProviderFactories: testAccProviderFactories,
		CheckDestroy:      nil,
		Steps: []resource.TestStep{
			{
				Config: testAccSinkKafkaAvroResource(sinkName),
				Check:  resource.ComposeTestCheckFunc(checks...),
			},
			{
				ResourceName:      sinkAddr,
				ImportState:       true,
				ImportStateVerify: false,
			},
		},
	})
}

func TestAccSinkKafka_update(t *testing.T) {
slug := acctest.RandStringFromCharSet(5, acctest.CharSetAlpha)
sinkName := fmt.Sprintf("old_%s", slug)
Expand Down Expand Up @@ -194,6 +243,93 @@ func testAccSinkKafkaResource(roleName, connName, tableName, sinkName, sink2Name
`, roleName, connName, tableName, sinkName, sink2Name, sinkOwner, comment)
}

// testAccSinkKafkaAvroResource renders the Terraform configuration for the
// Avro sink acceptance test. Every resource name is derived from sinkName
// via the indexed verb %[1]s so a single random slug namespaces the whole
// fixture: a cluster, a COUNTER load generator source, a plaintext Kafka
// connection, a Confluent schema registry connection, and the sink itself.
//
// The sink's format block exercises the new Avro doc options under test:
// one avro_doc_type (top-level comment on the source object) and two
// avro_doc_column entries for the same "counter" column — one with
// key = true, one with value = true — matching the attribute checks in
// TestAccSinkKafkaAvro_basic.
func testAccSinkKafkaAvroResource(sinkName string) string {
	// NOTE(review): the raw string below is emitted verbatim as HCL; do not
	// re-indent or edit it without updating the corresponding state checks.
	return fmt.Sprintf(`
resource "materialize_cluster" "test" {
name = "%[1]s_cluster"
size = "3xsmall"
}
resource "materialize_source_load_generator" "test" {
name = "%[1]s_load_gen"
size = "3xsmall"
load_generator_type = "COUNTER"
}
resource "materialize_connection_kafka" "test" {
name = "%[1]s_conn"
security_protocol = "PLAINTEXT"
kafka_broker {
broker = "redpanda:9092"
}
validate = true
}
resource "materialize_connection_confluent_schema_registry" "test" {
name = "%[1]s_conn_schema"
url = "http://redpanda:8081"
}
resource "materialize_sink_kafka" "test" {
name = "%[1]s_sink"
cluster_name = materialize_cluster.test.name
topic = "topic1"
key = ["counter"]
key_not_enforced = true
from {
name = materialize_source_load_generator.test.name
database_name = materialize_source_load_generator.test.database_name
schema_name = materialize_source_load_generator.test.schema_name
}
kafka_connection {
name = materialize_connection_kafka.test.name
database_name = materialize_connection_kafka.test.database_name
schema_name = materialize_connection_kafka.test.schema_name
}
format {
avro {
schema_registry_connection {
name = materialize_connection_confluent_schema_registry.test.name
database_name = materialize_connection_confluent_schema_registry.test.database_name
schema_name = materialize_connection_confluent_schema_registry.test.schema_name
}
avro_doc_type {
object {
name = materialize_source_load_generator.test.name
database_name = materialize_source_load_generator.test.database_name
schema_name = materialize_source_load_generator.test.schema_name
}
doc = "top level comment"
}
avro_doc_column {
object {
name = materialize_source_load_generator.test.name
database_name = materialize_source_load_generator.test.database_name
schema_name = materialize_source_load_generator.test.schema_name
}
column = "counter"
doc = "comment key"
key = true
}
avro_doc_column {
object {
name = materialize_source_load_generator.test.name
database_name = materialize_source_load_generator.test.database_name
schema_name = materialize_source_load_generator.test.schema_name
}
column = "counter"
doc = "comment value"
value = true
}
}
}
envelope {
debezium = true
}
}
`, sinkName)
}

func testAccCheckSinkKafkaExists(name string) resource.TestCheckFunc {
return func(s *terraform.State) error {
db := testAccProvider.Meta().(*sqlx.DB)
Expand Down

0 comments on commit e714b24

Please sign in to comment.