From 1e6722dcc8d6c038a42ad135aa57e442867b0857 Mon Sep 17 00:00:00 2001
From: Dennis Hume
Date: Mon, 18 Dec 2023 12:50:22 -0600
Subject: [PATCH 1/4] Rename Singular Attributes

---
 integration/source.tf                      |  2 +-
 .../acceptance_source_postgres_test.go     |  6 +-
 pkg/resources/resource_source_postgres.go  | 87 +++++++++++++++++--
 .../resource_source_postgres_test.go       |  2 +-
 4 files changed, 87 insertions(+), 10 deletions(-)

diff --git a/integration/source.tf b/integration/source.tf
index 73cb2c2e..d8e30133 100644
--- a/integration/source.tf
+++ b/integration/source.tf
@@ -91,7 +91,7 @@ resource "materialize_source_postgres" "example_source_postgres_schema" {
   name        = "source_postgres_schema"
   size        = "3xsmall"
   publication = "mz_source"
-  schema      = ["PUBLIC"]
+  schemas     = ["PUBLIC"]
 
   postgres_connection {
     name = materialize_connection_postgres.postgres_connection.name
diff --git a/pkg/provider/acceptance_source_postgres_test.go b/pkg/provider/acceptance_source_postgres_test.go
index a165fc3d..5e6f4f09 100644
--- a/pkg/provider/acceptance_source_postgres_test.go
+++ b/pkg/provider/acceptance_source_postgres_test.go
@@ -75,8 +75,8 @@ func TestAccSourcePostgresSchema_basic(t *testing.T) {
 					resource.TestCheckResourceAttr("materialize_source_postgres.test", "qualified_sql_name", fmt.Sprintf(`"materialize"."public"."%s"`, sourceName+"_source")),
 					resource.TestCheckResourceAttr("materialize_source_postgres.test", "cluster_name", sourceName+"_cluster"),
 					resource.TestCheckResourceAttr("materialize_source_postgres.test", "publication", "mz_source"),
-					resource.TestCheckResourceAttr("materialize_source_postgres.test", "schema.#", "1"),
-					resource.TestCheckResourceAttr("materialize_source_postgres.test", "schema.0", "PUBLIC"),
+					resource.TestCheckResourceAttr("materialize_source_postgres.test", "schemas.#", "1"),
+					resource.TestCheckResourceAttr("materialize_source_postgres.test", "schemas.0", "PUBLIC"),
 				),
 			},
 			{
@@ -353,7 +353,7 @@ func testAccSourcePostgresResourceSchema(sourceName string) string {
 			database_name = materialize_connection_postgres.test.database_name
 		}
 		publication = "mz_source"
-		schema = ["PUBLIC"]
+		schemas = ["PUBLIC"]
 	}
 	`, sourceName)
 }
diff --git a/pkg/resources/resource_source_postgres.go b/pkg/resources/resource_source_postgres.go
index 6fba1493..6c5c2ec7 100644
--- a/pkg/resources/resource_source_postgres.go
+++ b/pkg/resources/resource_source_postgres.go
@@ -2,6 +2,7 @@ package resources
 
 import (
 	"context"
+	"fmt"
 	"log"
 
 	"github.com/MaterializeInc/terraform-provider-materialize/pkg/materialize"
@@ -12,7 +13,67 @@ import (
 	"github.com/jmoiron/sqlx"
 )
 
-var sourcePostgresSchema = map[string]*schema.Schema{
+func sourcePostgresSchemaV0() *schema.Resource {
+	return &schema.Resource{
+		Schema: map[string]*schema.Schema{
+			"name":                ObjectNameSchema("source", true, false),
+			"schema_name":         SchemaNameSchema("source", false),
+			"database_name":       DatabaseNameSchema("source", false),
+			"qualified_sql_name":  QualifiedNameSchema("source"),
+			"comment":             CommentSchema(false),
+			"cluster_name":        ObjectClusterNameSchema("source"),
+			"size":                ObjectSizeSchema("source"),
+			"postgres_connection": IdentifierSchema("postgres_connection", "The PostgreSQL connection to use in the source.", true),
+			"publication": {
+				Description: "The PostgreSQL publication (the replication data set containing the tables to be streamed to Materialize).",
+				Type:        schema.TypeString,
+				Required:    true,
+				ForceNew:    true,
+			},
+			"text_columns": {
+				Description: "Decode data as text for specific columns that contain PostgreSQL types that are unsupported in Materialize. Can only be updated in place when also updating a corresponding `table` attribute.",
+				Type:        schema.TypeList,
+				Elem:        &schema.Schema{Type: schema.TypeString},
+				Optional:    true,
+			},
+			"table": {
+				Description: "Creates subsources for specific tables. If neither table or schema is specified, will default to ALL TABLES",
+				Type:        schema.TypeList,
+				Elem: &schema.Resource{
+					Schema: map[string]*schema.Schema{
+						"name": {
+							Description: "The name of the table.",
+							Type:        schema.TypeString,
+							Required:    true,
+						},
+						"alias": {
+							Description: "The alias of the table.",
+							Type:        schema.TypeString,
+							Optional:    true,
+						},
+					},
+				},
+				Optional:      true,
+				MinItems:      1,
+				ConflictsWith: []string{"schema"},
+			},
+			"schema": {
+				Description:   "Creates subsources for specific schemas. If neither table or schema is specified, will default to ALL TABLES",
+				Type:          schema.TypeList,
+				Elem:          &schema.Schema{Type: schema.TypeString},
+				Optional:      true,
+				ForceNew:      true,
+				MinItems:      1,
+				ConflictsWith: []string{"table"},
+			},
+			"expose_progress": IdentifierSchema("expose_progress", "The name of the progress subsource for the source. If this is not specified, the subsource will be named `<src_name>_progress`.", false),
+			"subsource":       SubsourceSchema(),
+			"ownership_role":  OwnershipRoleSchema(),
+		},
+	}
+}
+
+var sourcePostgresSchemaV1 = map[string]*schema.Schema{
 	"name":               ObjectNameSchema("source", true, false),
 	"schema_name":        SchemaNameSchema("source", false),
 	"database_name":      DatabaseNameSchema("source", false),
@@ -52,9 +113,9 @@ var sourcePostgresSchema = map[string]*schema.Schema{
 		},
 		Optional:      true,
 		MinItems:      1,
-		ConflictsWith: []string{"schema"},
+		ConflictsWith: []string{"schemas"},
 	},
-	"schema": {
+	"schemas": {
 		Description:   "Creates subsources for specific schemas. If neither table or schema is specified, will default to ALL TABLES",
 		Type:          schema.TypeList,
 		Elem:          &schema.Schema{Type: schema.TypeString},
@@ -68,6 +129,14 @@ var sourcePostgresSchema = map[string]*schema.Schema{
 	"ownership_role": OwnershipRoleSchema(),
 }
 
+func postgresSourceStateUpgradeV0(_ context.Context, rawState map[string]interface{}, _ interface{}) (map[string]interface{}, error) {
+	if rawState == nil {
+		return nil, fmt.Errorf("SourcePostgres resource state upgrade failed, state is nil")
+	}
+	rawState["schemas"] = rawState["schema"]
+	return rawState, nil
+}
+
 func SourcePostgres() *schema.Resource {
 	return &schema.Resource{
 		Description: "A Postgres source describes a PostgreSQL instance you want Materialize to read data from.",
@@ -81,7 +150,15 @@ func SourcePostgres() *schema.Resource {
 		StateContext: schema.ImportStatePassthroughContext,
 	},
 
-	Schema: sourcePostgresSchema,
+	Schema:        sourcePostgresSchemaV1,
+	SchemaVersion: 1,
+	StateUpgraders: []schema.StateUpgrader{
+		{
+			Version: 0,
+			Type:    sourcePostgresSchemaV0().CoreConfigSchema().ImpliedType(),
+			Upgrade: postgresSourceStateUpgradeV0,
+		},
+	},
 	}
 }
 
@@ -115,7 +192,7 @@ func sourcePostgresCreate(ctx context.Context, d *schema.ResourceData, meta any)
 		b.Table(tables)
 	}
 
-	if v, ok := d.GetOk("schema"); ok {
+	if v, ok := d.GetOk("schemas"); ok {
 		schemas := materialize.GetSliceValueString(v.([]interface{}))
 		b.Schema(schemas)
 	}
diff --git a/pkg/resources/resource_source_postgres_test.go b/pkg/resources/resource_source_postgres_test.go
index 246caa59..f412b617 100644
--- a/pkg/resources/resource_source_postgres_test.go
+++ b/pkg/resources/resource_source_postgres_test.go
@@ -75,7 +75,7 @@ var inSourcePostgresSchema = map[string]interface{}{
 	},
 	"publication":  "mz_source",
 	"text_columns": []interface{}{"table.unsupported_type_1"},
-	"schema":       []interface{}{"schema1", "schema2"},
+	"schemas":      []interface{}{"schema1", "schema2"},
 }
 
 func TestResourceSourcePostgresCreateSchema(t *testing.T) {

From 32aaa4396dad35c1359550ffe9249724303262ee Mon Sep 17 00:00:00 2001
From: dehume
Date: Tue, 19 Dec 2023 14:51:22 +0000
Subject: [PATCH 2/4] Terraform Docs

---
 docs/resources/source_postgres.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/resources/source_postgres.md b/docs/resources/source_postgres.md
index e9a4c75f..1b50d707 100644
--- a/docs/resources/source_postgres.md
+++ b/docs/resources/source_postgres.md
@@ -59,8 +59,8 @@ resource "materialize_source_postgres" "example_source_postgres" {
 - `database_name` (String) The identifier for the source database. Defaults to `MZ_DATABASE` environment variable if set or `materialize` if environment variable is not set.
 - `expose_progress` (Block List, Max: 1) The name of the progress subsource for the source. If this is not specified, the subsource will be named `<src_name>_progress`. (see [below for nested schema](#nestedblock--expose_progress))
 - `ownership_role` (String) The owernship role of the object.
-- `schema` (List of String) Creates subsources for specific schemas. If neither table or schema is specified, will default to ALL TABLES
 - `schema_name` (String) The identifier for the source schema. Defaults to `public`.
+- `schemas` (List of String) Creates subsources for specific schemas. If neither table or schema is specified, will default to ALL TABLES
 - `size` (String) The size of the source. If not specified, the `cluster_name` option must be specified.
 - `table` (Block List) Creates subsources for specific tables. If neither table or schema is specified, will default to ALL TABLES (see [below for nested schema](#nestedblock--table))
 - `text_columns` (List of String) Decode data as text for specific columns that contain PostgreSQL types that are unsupported in Materialize. Can only be updated in place when also updating a corresponding `table` attribute.

From c51d479f51edd9aee823a80ad63ce5c319edddcc Mon Sep 17 00:00:00 2001
From: Dennis Hume
Date: Tue, 19 Dec 2023 11:04:03 -0600
Subject: [PATCH 3/4] Check

---
 .../acceptance_source_kafka_migration_test.go | 111 ++++++++++++
 pkg/provider/acceptance_source_kafka_test.go  |   4 +-
 pkg/resources/resource_source_kafka.go        | 159 +++++++++++++++++-
 pkg/resources/resource_source_kafka_test.go   |   2 +-
 pkg/resources/resource_source_postgres.go     |   8 +-
 5 files changed, 273 insertions(+), 11 deletions(-)
 create mode 100644 pkg/provider/acceptance_source_kafka_migration_test.go

diff --git a/pkg/provider/acceptance_source_kafka_migration_test.go b/pkg/provider/acceptance_source_kafka_migration_test.go
new file mode 100644
index 00000000..8835dc3a
--- /dev/null
+++ b/pkg/provider/acceptance_source_kafka_migration_test.go
@@ -0,0 +1,111 @@
+package provider
+
+import (
+	"fmt"
+	"testing"
+
+	"github.com/hashicorp/terraform-plugin-testing/helper/acctest"
+	"github.com/hashicorp/terraform-plugin-testing/helper/resource"
+)
+
+func TestAccSourceKafkaMigration_basic(t *testing.T) {
+	addTestTopic()
+	sourceName := acctest.RandStringFromCharSet(10, acctest.CharSetAlpha)
+	resource.ParallelTest(t, resource.TestCase{
+		PreCheck:     func() { testAccPreCheck(t) },
+		CheckDestroy: nil,
+		Steps: []resource.TestStep{
+			{
+				ExternalProviders: map[string]resource.ExternalProvider{
+					"materialize": {
+						VersionConstraint: "0.4.1",
+						Source:            "MaterializeInc/materialize",
+					},
+				},
+				Config: testAccSourceKafkaMigrationV0Resource(sourceName),
+				Check: resource.ComposeTestCheckFunc(
+					testAccCheckSourceKafkaExists("materialize_source_kafka.test"),
+					resource.TestCheckResourceAttr("materialize_source_kafka.test", "start_offset.#", "1"),
+				),
+			},
+			{
+				ProviderFactories: testAccProviderFactories,
+				Config:            testAccSourceKafkaMigrationV1Resource(sourceName),
+				Check: resource.ComposeTestCheckFunc(
+					testAccCheckSourceKafkaExists("materialize_source_kafka.test"),
+					resource.TestCheckResourceAttr("materialize_source_kafka.test", "start_offsets.#", "1"),
+				),
+			},
+			{
+				ProviderFactories: testAccProviderFactories,
+				ResourceName:      "materialize_source_kafka.test",
+				ImportState:       true,
+				ImportStateVerify: false,
+			},
+		},
+	})
+}
+
+func testAccSourceKafkaMigrationV0Resource(sourceName string) string {
+	return fmt.Sprintf(`
+	resource "materialize_connection_kafka" "test" {
+		name = "%[1]s"
+		kafka_broker {
+			broker = "redpanda:9092"
+		}
+		security_protocol = "PLAINTEXT"
+	}
+
+	resource "materialize_source_kafka" "test" {
+		name = "%[1]s"
+		kafka_connection {
+			name = materialize_connection_kafka.test.name
+		}
+
+		size  = "3xsmall"
+		topic = "terraform"
+		key_format {
+			text = true
+		}
+		value_format {
+			text = true
+		}
+		envelope {
+			none = true
+		}
+		start_offset = [0]
+	}
+`, sourceName)
+}
+
+func testAccSourceKafkaMigrationV1Resource(sourceName string) string {
+	return fmt.Sprintf(`
+	resource "materialize_connection_kafka" "test" {
+		name = "%[1]s"
+		kafka_broker {
+			broker = "redpanda:9092"
+		}
+		security_protocol = "PLAINTEXT"
+	}
+
+	resource "materialize_source_kafka" "test" {
+		name = "%[1]s"
+		kafka_connection {
+			name = materialize_connection_kafka.test.name
+		}
+
+		size  = "3xsmall"
= "3xsmall" + topic = "terraform" + key_format { + text = true + } + value_format { + text = true + } + envelope { + none = true + } + start_offset = [0] + } +`, sourceName) +} diff --git a/pkg/provider/acceptance_source_kafka_test.go b/pkg/provider/acceptance_source_kafka_test.go index 1b98d115..c15bd048 100644 --- a/pkg/provider/acceptance_source_kafka_test.go +++ b/pkg/provider/acceptance_source_kafka_test.go @@ -54,7 +54,7 @@ func TestAccSourceKafka_basic(t *testing.T) { resource.TestCheckResourceAttr("materialize_source_kafka.test", "kafka_connection.0.name", connName), resource.TestCheckResourceAttr("materialize_source_kafka.test", "kafka_connection.0.database_name", "materialize"), resource.TestCheckResourceAttr("materialize_source_kafka.test", "kafka_connection.0.schema_name", "public"), - resource.TestCheckResourceAttr("materialize_source_kafka.test", "start_offset.#", "1"), + resource.TestCheckResourceAttr("materialize_source_kafka.test", "start_offsets.#", "1"), resource.TestCheckResourceAttr("materialize_source_kafka.test", "include_timestamp_alias", "timestamp_alias"), resource.TestCheckResourceAttr("materialize_source_kafka.test", "include_offset", "true"), resource.TestCheckResourceAttr("materialize_source_kafka.test", "include_offset_alias", "offset_alias"), @@ -212,7 +212,7 @@ func testAccSourceKafkaResource(roleName, connName, sourceName, source2Name, sou none = true } - start_offset = [0] + start_offsets = [0] include_timestamp_alias = "timestamp_alias" include_offset = true include_offset_alias = "offset_alias" diff --git a/pkg/resources/resource_source_kafka.go b/pkg/resources/resource_source_kafka.go index ebf77526..63406a61 100644 --- a/pkg/resources/resource_source_kafka.go +++ b/pkg/resources/resource_source_kafka.go @@ -2,6 +2,7 @@ package resources import ( "context" + "fmt" "log" "github.com/MaterializeInc/terraform-provider-materialize/pkg/materialize" @@ -12,6 +13,140 @@ import ( "github.com/jmoiron/sqlx" ) +func sourceKafkaSchemaV0() *schema.Resource { + return &schema.Resource{ + Schema: map[string]*schema.Schema{"name": ObjectNameSchema("source", true, false), + "schema_name": SchemaNameSchema("source", false), + "database_name": DatabaseNameSchema("source", false), + "qualified_sql_name": QualifiedNameSchema("source"), + "comment": CommentSchema(false), + "cluster_name": ObjectClusterNameSchema("source"), + "size": ObjectSizeSchema("source"), + "kafka_connection": IdentifierSchema("kafka_connection", "The Kafka connection to use in the source.", true), + "topic": { + Description: "The Kafka topic you want to subscribe to.", + Type: schema.TypeString, + Required: true, + ForceNew: true, + }, + "include_key": { + Description: "Include a column containing the Kafka message key.", + Type: schema.TypeBool, + Optional: true, + ForceNew: true, + }, + "include_key_alias": { + Description: "Provide an alias for the key column.", + Type: schema.TypeString, + Optional: true, + ForceNew: true, + }, + "include_headers": { + Description: "Include message headers.", + Type: schema.TypeBool, + Optional: true, + ForceNew: true, + Default: false, + }, + "include_headers_alias": { + Description: "Provide an alias for the headers column.", + Type: schema.TypeString, + Optional: true, + ForceNew: true, + }, + "include_partition": { + Description: "Include a partition column containing the Kafka message partition", + Type: schema.TypeBool, + Optional: true, + ForceNew: true, + }, + "include_partition_alias": { + Description: "Provide an alias for the partition 
column.", + Type: schema.TypeString, + Optional: true, + ForceNew: true, + }, + "include_offset": { + Description: "Include an offset column containing the Kafka message offset.", + Type: schema.TypeBool, + Optional: true, + ForceNew: true, + }, + "include_offset_alias": { + Description: "Provide an alias for the offset column.", + Type: schema.TypeString, + Optional: true, + ForceNew: true, + }, + "include_timestamp": { + Description: "Include a timestamp column containing the Kafka message timestamp.", + Type: schema.TypeBool, + Optional: true, + ForceNew: true, + }, + "include_timestamp_alias": { + Description: "Provide an alias for the timestamp column.", + Type: schema.TypeString, + Optional: true, + ForceNew: true, + }, + "format": FormatSpecSchema("format", "How to decode raw bytes from different formats into data structures Materialize can understand at runtime.", false), + "key_format": FormatSpecSchema("key_format", "Set the key format explicitly.", false), + "value_format": FormatSpecSchema("value_format", "Set the value format explicitly.", false), + "envelope": { + Description: "How Materialize should interpret records (e.g. append-only, upsert)..", + Type: schema.TypeList, + MaxItems: 1, + Elem: &schema.Resource{ + Schema: map[string]*schema.Schema{ + "upsert": { + Description: "Use the upsert envelope, which uses message keys to handle CRUD operations.", + Type: schema.TypeBool, + Optional: true, + ForceNew: true, + ConflictsWith: []string{"envelope.0.debezium", "envelope.0.none"}, + }, + "debezium": { + Description: "Use the Debezium envelope, which uses a diff envelope to handle CRUD operations.", + Type: schema.TypeBool, + Optional: true, + ForceNew: true, + ConflictsWith: []string{"envelope.0.upsert", "envelope.0.none"}, + }, + "none": { + Description: "Use an append-only envelope. This means that records will only be appended and cannot be updated or deleted.", + Type: schema.TypeBool, + Optional: true, + ForceNew: true, + ConflictsWith: []string{"envelope.0.upsert", "envelope.0.debezium"}, + }, + }, + }, + Optional: true, + ForceNew: true, + }, + "start_offset": { + Description: "Read partitions from the specified offset.", + Type: schema.TypeList, + Elem: &schema.Schema{Type: schema.TypeInt}, + Optional: true, + ForceNew: true, + ConflictsWith: []string{"start_timestamp"}, + }, + "start_timestamp": { + Description: "Use the specified value to set `START OFFSET` based on the Kafka timestamp.", + Type: schema.TypeInt, + Optional: true, + ForceNew: true, + ConflictsWith: []string{"start_offset"}, + }, + "expose_progress": IdentifierSchema("expose_progress", "The name of the progress subsource for the source. 
+			"subsource":       SubsourceSchema(),
+			"ownership_role":  OwnershipRoleSchema(),
+		},
+	}
+}
+
 var sourceKafkaSchema = map[string]*schema.Schema{
 	"name":               ObjectNameSchema("source", true, false),
 	"schema_name":        SchemaNameSchema("source", false),
@@ -123,7 +258,7 @@ var sourceKafkaSchema = map[string]*schema.Schema{
 		Optional: true,
 		ForceNew: true,
 	},
-	"start_offset": {
+	"start_offsets": {
 		Description:   "Read partitions from the specified offset.",
 		Type:          schema.TypeList,
 		Elem:          &schema.Schema{Type: schema.TypeInt},
@@ -136,13 +271,21 @@ var sourceKafkaSchema = map[string]*schema.Schema{
 		Type:          schema.TypeInt,
 		Optional:      true,
 		ForceNew:      true,
-		ConflictsWith: []string{"start_offset"},
+		ConflictsWith: []string{"start_offsets"},
 	},
 	"expose_progress": IdentifierSchema("expose_progress", "The name of the progress subsource for the source. If this is not specified, the subsource will be named `<src_name>_progress`.", false),
 	"subsource":       SubsourceSchema(),
 	"ownership_role":  OwnershipRoleSchema(),
 }
 
+func sourceKafkaStateUpgradeV0(_ context.Context, rawState map[string]interface{}, _ interface{}) (map[string]interface{}, error) {
+	if rawState == nil {
+		return nil, fmt.Errorf("SourceKafka resource state upgrade failed, state is nil")
+	}
+	rawState["start_offsets"] = rawState["start_offset"]
+	return rawState, nil
+}
+
 func SourceKafka() *schema.Resource {
 	return &schema.Resource{
 		Description: "A Kafka source describes a Kafka cluster you want Materialize to read data from.",
@@ -156,7 +299,15 @@ func SourceKafka() *schema.Resource {
 		StateContext: schema.ImportStatePassthroughContext,
 	},
 
-	Schema: sourceKafkaSchema,
+	Schema:        sourceKafkaSchema,
+	SchemaVersion: 1,
+	StateUpgraders: []schema.StateUpgrader{
+		{
+			Version: 0,
+			Type:    sourceKafkaSchemaV0().CoreConfigSchema().ImpliedType(),
+			Upgrade: sourceKafkaStateUpgradeV0,
+		},
+	},
 	}
 }
 
@@ -245,7 +396,7 @@ func sourceKafkaCreate(ctx context.Context, d *schema.ResourceData, meta any) di
 		b.Envelope(envelope)
 	}
 
-	if v, ok := d.GetOk("start_offset"); ok {
+	if v, ok := d.GetOk("start_offsets"); ok {
 		so := materialize.GetSliceValueInt(v.([]interface{}))
 		b.StartOffset(so)
 	}
diff --git a/pkg/resources/resource_source_kafka_test.go b/pkg/resources/resource_source_kafka_test.go
index 0e008b16..a68d43fd 100644
--- a/pkg/resources/resource_source_kafka_test.go
+++ b/pkg/resources/resource_source_kafka_test.go
@@ -48,7 +48,7 @@ var inSourceKafka = map[string]interface{}{
 		},
 	},
 	"envelope":        []interface{}{map[string]interface{}{"upsert": true}},
-	"start_offset":    []interface{}{1, 2, 3},
+	"start_offsets":   []interface{}{1, 2, 3},
 	"start_timestamp": -1000,
 }
 
diff --git a/pkg/resources/resource_source_postgres.go b/pkg/resources/resource_source_postgres.go
index 6c5c2ec7..39c55b0d 100644
--- a/pkg/resources/resource_source_postgres.go
+++ b/pkg/resources/resource_source_postgres.go
@@ -73,7 +73,7 @@ func sourcePostgresSchemaV0() *schema.Resource {
 	}
 }
 
-var sourcePostgresSchemaV1 = map[string]*schema.Schema{
+var sourcePostgresSchema = map[string]*schema.Schema{
 	"name":               ObjectNameSchema("source", true, false),
 	"schema_name":        SchemaNameSchema("source", false),
 	"database_name":      DatabaseNameSchema("source", false),
@@ -129,7 +129,7 @@ var sourcePostgresSchema = map[string]*schema.Schema{
 	"ownership_role": OwnershipRoleSchema(),
 }
 
-func postgresSourceStateUpgradeV0(_ context.Context, rawState map[string]interface{}, _ interface{}) (map[string]interface{}, error) {
+func sourcePostgresStateUpgradeV0(_ context.Context, rawState map[string]interface{}, _ interface{}) (map[string]interface{}, error) {
 	if rawState == nil {
 		return nil, fmt.Errorf("SourcePostgres resource state upgrade failed, state is nil")
 	}
@@ -150,13 +150,13 @@ func SourcePostgres() *schema.Resource {
 		StateContext: schema.ImportStatePassthroughContext,
 	},
 
-	Schema:        sourcePostgresSchemaV1,
+	Schema:        sourcePostgresSchema,
 	SchemaVersion: 1,
 	StateUpgraders: []schema.StateUpgrader{
 		{
 			Version: 0,
 			Type:    sourcePostgresSchemaV0().CoreConfigSchema().ImpliedType(),
-			Upgrade: postgresSourceStateUpgradeV0,
+			Upgrade: sourcePostgresStateUpgradeV0,
 		},
 	},
 	}
 }

From a353c52a80c1778a1ff9f54ef57d4f000398f21d Mon Sep 17 00:00:00 2001
From: dehume
Date: Wed, 20 Dec 2023 16:26:39 +0000
Subject: [PATCH 4/4] Terraform Docs

---
 docs/resources/source_kafka.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/resources/source_kafka.md b/docs/resources/source_kafka.md
index 04e821f3..f4708714 100644
--- a/docs/resources/source_kafka.md
+++ b/docs/resources/source_kafka.md
@@ -74,7 +74,7 @@ resource "materialize_source_kafka" "example_source_kafka" {
 - `ownership_role` (String) The owernship role of the object.
 - `schema_name` (String) The identifier for the source schema. Defaults to `public`.
 - `size` (String) The size of the source. If not specified, the `cluster_name` option must be specified.
-- `start_offset` (List of Number) Read partitions from the specified offset.
+- `start_offsets` (List of Number) Read partitions from the specified offset.
 - `start_timestamp` (Number) Use the specified value to set `START OFFSET` based on the Kafka timestamp.
 - `value_format` (Block List, Max: 1) Set the value format explicitly. (see [below for nested schema](#nestedblock--value_format))
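
---

Note on the user-facing change in this series: `materialize_source_postgres.schema`
becomes `schemas` and `materialize_source_kafka.start_offset` becomes `start_offsets`,
with `SchemaVersion: 1` plus a V0 state upgrader on each resource so existing state is
migrated in place on the first refresh rather than forcing a destroy and recreate.
A minimal HCL sketch of a config before and after the rename (resource and attribute
names are taken from the diffs above; the `postgres_connection` reference is
illustrative and assumes a connection resource defined elsewhere):

    # Before this series (provider 0.4.x and earlier): singular attribute name.
    resource "materialize_source_postgres" "example" {
      name        = "source_postgres_schema"
      size        = "3xsmall"
      publication = "mz_source"
      schema      = ["PUBLIC"]

      postgres_connection {
        # Assumes a materialize_connection_postgres.pg resource exists.
        name = materialize_connection_postgres.pg.name
      }
    }

    # After this series: the plural attribute. The V0 upgrader copies the old
    # `schema` value into `schemas` in state, so only the config line changes.
    resource "materialize_source_postgres" "example" {
      name        = "source_postgres_schema"
      size        = "3xsmall"
      publication = "mz_source"
      schemas     = ["PUBLIC"]

      postgres_connection {
        name = materialize_connection_postgres.pg.name
      }
    }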