Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Source versioning: Postgres, MySQL and Load generator #647

Open
wants to merge 45 commits into
base: main
Choose a base branch
from

Conversation

bobbyiliev
Copy link
Contributor

@bobbyiliev bobbyiliev commented Sep 3, 2024

Initial implementation for the source versioning refactor as per #646

The main changes to consider:

  • Marking the table attribute as optional and deprecated for both the MySQL and Postgres sources
  • Introduced a new all_tables bool attribute for the MySQL and the Loadgen sources, as in the past this was defaulting always using FOR ALL TABLES in the load gen sources (auction, marketing, tpch) and in the MySQL case whenever no table blocks were defined, we defaulted to FOR ALL TABLES. This all_tables bool attribute allows us to create sources without any tables defined as per the source versioning work
  • Introducing the new materialize_source_table_{mysql|postgres|load_generator} resource which allows us to do CREATE TABLE ... FROM SOURCE ...

Things that are still pending: #646

@bobbyiliev bobbyiliev changed the title Source versioning [WIP] Source versioning Sep 3, 2024
@bobbyiliev bobbyiliev force-pushed the source-versioning branch 3 times, most recently from c4a61c8 to e202851 Compare September 9, 2024 13:33
@@ -29,7 +29,7 @@ description: |-
- `ownership_role` (String) The owernship role of the object.
- `region` (String) The region to use for the resource connection. If not set, the default region is used.
- `schema_name` (String) The identifier for the table schema in Materialize. Defaults to `public`.
- `text_columns` (List of String) Columns to be decoded as text.
- `text_columns` (List of String) Columns to be decoded as text. Not supported for the load generator sources, if the source is a load generator, the attribute will be ignored.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to sources, we might want a source table resource per source type? The existing source-level options will basically shift to source table-level.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes indeed, I was just thinking about this. With the MySQL and Postgres sources, it is probably fine, but as soon as we add Kafka and Webhook sources, the logic will get out of hand.

Will refactor this to have a separate source table resource per source!

@bobbyiliev bobbyiliev changed the title [WIP] Source versioning Source versioning: Postgres, MySQL and Load generator Sep 13, 2024
@bobbyiliev bobbyiliev marked this pull request as ready for review September 13, 2024 12:29
@bobbyiliev bobbyiliev requested a review from a team as a code owner September 13, 2024 12:29
@bobbyiliev bobbyiliev requested review from arusahni and rjobanp and removed request for a team September 13, 2024 12:29
Copy link

@rjobanp rjobanp left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice work!

Comment on lines 76 to 77
- `start_offset` (List of Number, Deprecated) Read partitions from the specified offset. Deprecated: Use the new materialize_source_table_kafka resource instead.
- `start_timestamp` (Number, Deprecated) Use the specified value to set `START OFFSET` based on the Kafka timestamp. Deprecated: Use the new materialize_source_table_kafka resource instead.
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these two options are still currently only possible on the top-level CREATE SOURCE statement for kafka sources -- not yet on a per-table basis. It will require a non-trivial amount more refactoring to allow them on a per-table basis so I'm unsure if we will do that work until it's requested by a customer

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yes! Good catch! Thank you!

@@ -53,12 +53,12 @@ resource "materialize_source_mysql" "test" {
- `comment` (String) **Public Preview** Comment on an object in the database.
- `database_name` (String) The identifier for the source database in Materialize. Defaults to `MZ_DATABASE` environment variable if set or `materialize` if environment variable is not set.
- `expose_progress` (Block List, Max: 1) The name of the progress collection for the source. If this is not specified, the collection will be named `<src_name>_progress`. (see [below for nested schema](#nestedblock--expose_progress))
- `ignore_columns` (List of String, Deprecated) Ignore specific columns when reading data from MySQL. Can only be updated in place when also updating a corresponding `table` attribute. Deprecated: Use the new materialize_source_table resource instead.
- `ignore_columns` (List of String, Deprecated) Ignore specific columns when reading data from MySQL. Can only be updated in place when also updating a corresponding `table` attribute. Deprecated: Use the new materialize_source_table_mysql resource instead.
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fyi this option is also being renamed MaterializeInc/materialize#29438 but the old name will be aliased to the new one, so this shouldn't break

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good! I will go ahead and use the exclude columns for the new table source resource!

Comment on lines 45 to 46
- `start_offset` (List of Number) Read partitions from the specified offset.
- `start_timestamp` (Number) Use the specified value to set `START OFFSET` based on the Kafka timestamp.
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these aren't currently available on a per-table basis for kafka sources

- `schema_name` (String) The identifier for the source schema in Materialize. Defaults to `public`.
- `start_offset` (List of Number) Read partitions from the specified offset.
- `start_timestamp` (Number) Use the specified value to set `START OFFSET` based on the Kafka timestamp.
- `upstream_schema_name` (String) The schema of the table in the upstream database.
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what does this refer to for kafka sources? we might just want to omit it since the upstream reference should just be the kafka topic name

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, this was an overlook on my end in the schema for the Kafka source table resource.

Comment on lines 26 to 27
startOffset []int
startTimestamp int
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these two aren't used below and also aren't possible on the statement


This guide will walk you through the process of migrating your existing source table definitions to the new `materialize_source_table_{source}` resource.

For each source type (e.g., MySQL, Postgres, etc.), you will need to create a new `materialize_source_table_{source}` resource for each table that was previously defined within the source resource. This ensures that the tables are preserved during the migration process.
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
For each source type (e.g., MySQL, Postgres, etc.), you will need to create a new `materialize_source_table_{source}` resource for each table that was previously defined within the source resource. This ensures that the tables are preserved during the migration process.
For each source type (e.g., MySQL, Postgres, etc.), you will need to create a new `materialize_source_table_{source}` resource for each table that was previously defined within the source resource. This ensures that the tables are preserved during the migration process. For Kafka sources, you will need to create at least one `materialize_source_table_kafka` table to hold data for the kafka topic.

@morsapaes might have better wording for this but I think we should be clear that this migration needs to happen for sources that previously didn't have subsources too (e.g. kafka)


The same approach can be used for other source types such as Postgres, eg. `materialize_source_table_postgres`.

## Automated Migration Process (TBD)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice - this is great! We will probably want to figure out how to tell them that they will be able to coordinate the 'automated' migration process with their field-engineer representative if they go down this path

@rjobanp
Copy link

rjobanp commented Sep 17, 2024

@morsapaes @bobbyiliev let's discuss this PR at the sources & sinks meeting this week - we should decide when it makes sense to merge this - my thinking is we should do so whenever we move into private preview for the source versioning feature. But if we want to merge sooner and just have a disclaimer that the things mentioned as 'deprecated' here are not actually yet deprecated, that could work too

@bobbyiliev bobbyiliev force-pushed the source-versioning branch 3 times, most recently from 61cd55c to 9b96939 Compare September 24, 2024 09:43
@bobbyiliev
Copy link
Contributor Author

One thing that we can consider here as per this old tracking issue: #391 is take the chance and decide if we still want to rename some of the attributes in the new source table resources:

Lists For attributes that use list, we have more cases of singular than plural.

Attribute Resource Type Plural Comment
start_offset materialize_source_kafka List of Strings In Materialize the attribute is singular START OFFSET even though it is a list of strings
header materialize_source_kafka List of Strings In Materialize the attribute is singular HEADER even though it is a list of strings
text_columns materialize_source_postgres List of Strings X

Blocks We had decided should be singular. There are some blocks that use plural so this could be a good chance to rename those attributes in the new source table load gen resource:

Attribute Resource Type Plural
auction_options materialize_source_load_generator Block X
counter_options materialize_source_load_generator Block X
marketing_options materialize_source_load_generator Block X
tpch_options materialize_source_load_generator Block X
check_options materialize_webhook Block X

Copy link
Contributor

@arusahni arusahni left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't let the ~51 comments scare you off -- they're mostly nits. Great job on this!!!

options = append(options, fmt.Sprintf(`FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION %s`, QualifiedName(b.format.Avro.SchemaRegistryConnection.DatabaseName, b.format.Avro.SchemaRegistryConnection.SchemaName, b.format.Avro.SchemaRegistryConnection.Name)))
}
if b.format.Avro.KeyStrategy != "" {
options = append(options, fmt.Sprintf(`KEY STRATEGY %s`, b.format.Avro.KeyStrategy))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we be quoting/escaping this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we would not need to escape this is a keyword. If we were to escape it it would result in the following error on the Materialize side:

Expected one of ID or LATEST or INLINE, found string literal "INLINE"

We do however have validation in place on the terraform schema side already, so a user will only be able to specify one of the following values:

var strategy = []string{
"INLINE",
"ID",
"LATEST",
}

options = append(options, fmt.Sprintf(`KEY STRATEGY %s`, b.format.Avro.KeyStrategy))
}
if b.format.Avro.ValueStrategy != "" {
options = append(options, fmt.Sprintf(`VALUE STRATEGY %s`, b.format.Avro.ValueStrategy))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we be quoting/escaping this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above:

Here we would not need to escape this is a keyword. If we were to escape it it would result in the following error on the Materialize side:

Expected one of ID or LATEST or INLINE, found string literal "INLINE"

We do however have validation in place on the terraform schema side already, so a user will only be able to specify one of the following values:

var strategy = []string{
"INLINE",
"ID",
"LATEST",
}


if b.format.Protobuf != nil {
if b.format.Protobuf.SchemaRegistryConnection.Name != "" && b.format.Protobuf.MessageName != "" {
options = append(options, fmt.Sprintf(`FORMAT PROTOBUF MESSAGE '%s' USING CONFLUENT SCHEMA REGISTRY CONNECTION %s`, b.format.Protobuf.MessageName, QualifiedName(b.format.Protobuf.SchemaRegistryConnection.DatabaseName, b.format.Protobuf.SchemaRegistryConnection.SchemaName, b.format.Protobuf.SchemaRegistryConnection.Name)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably quote the MessageName

Comment on lines 212 to 218
options = append(options, fmt.Sprintf(`FORMAT CSV WITH %d COLUMNS`, b.format.Csv.Columns))
}
if b.format.Csv.Header != nil {
options = append(options, fmt.Sprintf(`FORMAT CSV WITH HEADER ( %s )`, strings.Join(b.format.Csv.Header, ", ")))
}
if b.format.Csv.DelimitedBy != "" {
options = append(options, fmt.Sprintf(`DELIMITER '%s'`, b.format.Csv.DelimitedBy))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we be quoting/escaping these?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the format.Csv.Columns is of TypeInt so no need to quote it, but good catch for the other ones! 🙇


if b.keyFormat.Protobuf != nil {
if b.keyFormat.Protobuf.SchemaRegistryConnection.Name != "" && b.keyFormat.Protobuf.MessageName != "" {
options = append(options, fmt.Sprintf(`KEY FORMAT PROTOBUF MESSAGE '%s' USING CONFLUENT SCHEMA REGISTRY CONNECTION %s`, b.keyFormat.Protobuf.MessageName, QualifiedName(b.keyFormat.Protobuf.SchemaRegistryConnection.DatabaseName, b.keyFormat.Protobuf.SchemaRegistryConnection.SchemaName, b.keyFormat.Protobuf.SchemaRegistryConnection.Name)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should quote the MessageName

Comment on lines 44 to 45
Description: "Specify the tables to be included in the source. Deprecated: Use the new materialize_source_table_mysql resource instead.",
Deprecated: "Use the new materialize_source_table_mysql resource instead.",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Description: "Specify the tables to be included in the source. Deprecated: Use the new materialize_source_table_mysql resource instead.",
Deprecated: "Use the new materialize_source_table_mysql resource instead.",
Description: "Specify the tables to be included in the source. Deprecated: Use the new `materialize_source_table_mysql` resource instead.",
Deprecated: "Use the new `materialize_source_table_mysql` resource instead.",

@@ -76,6 +79,13 @@ var sourceMySQLSchema = map[string]*schema.Schema{
},
},
},
"all_tables": {
Description: "Include all tables in the source. If `table` is specified, this will be ignored.",
Deprecated: "Use the new materialize_source_table_mysql resource instead.",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Deprecated: "Use the new materialize_source_table_mysql resource instead.",
Deprecated: "Use the new `materialize_source_table_mysql` resource instead.",

Comment on lines 36 to 37
Description: "Decode data as text for specific columns that contain PostgreSQL types that are unsupported in Materialize. Can only be updated in place when also updating a corresponding `table` attribute. Deprecated: Use the new materialize_source_table_postgres resource instead.",
Deprecated: "Use the new materialize_source_table_postgres resource instead.",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Description: "Decode data as text for specific columns that contain PostgreSQL types that are unsupported in Materialize. Can only be updated in place when also updating a corresponding `table` attribute. Deprecated: Use the new materialize_source_table_postgres resource instead.",
Deprecated: "Use the new materialize_source_table_postgres resource instead.",
Description: "Decode data as text for specific columns that contain PostgreSQL types that are unsupported in Materialize. Can only be updated in place when also updating a corresponding `table` attribute. Deprecated: Use the new `materialize_source_table_postgres` resource instead.",
Deprecated: "Use the new `materialize_source_table_postgres` resource instead.",

Comment on lines 43 to 44
Description: "Creates subsources for specific tables in the Postgres connection. Deprecated: Use the new materialize_source_table_postgres resource instead.",
Deprecated: "Use the new materialize_source_table_postgres resource instead.",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Description: "Creates subsources for specific tables in the Postgres connection. Deprecated: Use the new materialize_source_table_postgres resource instead.",
Deprecated: "Use the new materialize_source_table_postgres resource instead.",
Description: "Creates subsources for specific tables in the Postgres connection. Deprecated: Use the new `materialize_source_table_postgres` resource instead.",
Deprecated: "Use the new `materialize_source_table_postgres` resource instead.",

ForceNew: true,
},
"exclude_columns": {
Description: "Exclude specific columns when reading data from MySQL. The option used to be called `ignore_columns`.",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Description: "Exclude specific columns when reading data from MySQL. The option used to be called `ignore_columns`.",
Description: "Exclude specific columns when reading data from MySQL. This option used to be called `ignore_columns`.",

Copy link

@rjobanp rjobanp left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks solid! Just a few minor comments on some things that have been updated, and one about the migration guide

# generated by https://github.com/hashicorp/terraform-plugin-docs
page_title: "materialize_source_reference Data Source - terraform-provider-materialize"
subcategory: ""
description: |-
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we populate this to explain that these are 'available' source references? Such that these expose all the possible upstream references that this source can create a table for, not necessarily all the references it is already ingesting


This guide will walk you through the process of migrating your existing source table definitions to the new `materialize_source_table_{source_type}` resource.

For each source type (e.g., MySQL, Postgres, etc.), you will need to create a new `materialize_source_table_{source_type}` resource for each table that was previously defined within the source resource. This ensures that the tables are preserved during the migration process. For Kafka sources, you will need to create at least one `materialize_source_table_kafka` table to hold data for the kafka topic.
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
For each source type (e.g., MySQL, Postgres, etc.), you will need to create a new `materialize_source_table_{source_type}` resource for each table that was previously defined within the source resource. This ensures that the tables are preserved during the migration process. For Kafka sources, you will need to create at least one `materialize_source_table_kafka` table to hold data for the kafka topic.
For each MySQL and Postgres source, you will need to create a new `materialize_source_table_{source_type}` resource for each table that was previously defined within the source resource. This ensures that the tables are preserved during the migration process. For Kafka sources, you will need to create a `materialize_source_table_kafka` table with the same name as the kafka source to contain the data for the kafka topic.


In previous versions of the Materialize Terraform provider, source tables were defined within the source resource itself and were considered subsources of the source rather than separate entities.

This guide will walk you through the process of migrating your existing source table definitions to the new `materialize_source_table_{source_type}` resource.
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm confused on whether this guide is meant to explain how to migrate using just the manual process (which is outlined below and looks correct), or also to explain how to reconcile your terraform configuration with the results of the 'automatic' migration process that we would do for a customer with a catalog migration. Specifically, the paragraph below sounds like it is explaining what to do to handle the automatic migration process, but it doesn't explain that the manual process would then be unnecessary.

It might be simpler if this guide just explained the manual process and we had a separate one to explain the 'automatic' process. Or we could just assume any terraform user would only do the manual process. @morsapaes thoughts?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point! I've removed some of the confusing text so that this should focus on the manual migration!

Happy to work on a follow up migration guide for the automated migration if we decide to do that!


## Future Improvements

The Kafka and Webhooks sources are currently being implemented. Once these changes, the migration process will be updated to include them.
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
The Kafka and Webhooks sources are currently being implemented. Once these changes, the migration process will be updated to include them.
Webhooks sources have not yet been migrated to the new model. Once this changes, the migration process will be updated to include them.


- `name` (String) The identifier for the source table.
- `source` (Block List, Min: 1, Max: 1) The source this table is created from. (see [below for nested schema](#nestedblock--source))
- `topic` (String) The name of the Kafka topic in the Kafka cluster.
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is actually no longer necessary to include for Kafka sources -- if the (REFERENCE ..) option in the statement is omitted it will still work since there is only one possible kafka topic that the table can reference

p := map[string]string{
"sr.source_id": sourceId,
}
q := sourceReferenceQuery.QueryPredicate(p)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this would be a bonus, but there is a PR to implement a REFRESH SOURCE REFERENCES <source> statement here MaterializeInc/materialize#29923 and we could automatically run that before listing the references which would mean we always get the most 'up to date' view of them

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just implemented this!

func refreshSourceReferences(conn *sqlx.DB, sourceName, schemaName, databaseName string) error {
query := fmt.Sprintf(`ALTER SOURCE %s REFRESH REFERENCES`, QualifiedName(databaseName, schemaName, sourceName))
_, err := conn.Exec(query)
return err
}

Comment on lines 159 to 166
q.WriteString(` (REFERENCE `)

if b.upstreamSchemaName != "" {
q.WriteString(fmt.Sprintf(`%s.`, QuoteIdentifier(b.upstreamSchemaName)))
}
q.WriteString(QuoteIdentifier(b.upstreamName))

q.WriteString(")")
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mentioned above that this can be made optional for kafka and single-output load generator sources

mz_sources.name AS source_name,
source_schemas.name AS source_schema_name,
source_databases.name AS source_database_name,
mz_kafka_source_tables.topic AS upstream_table_name,
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a PR open now to add the envelope, key_format and value_format columns to this table MaterializeInc/materialize#30076

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Neat! I created a tracking issue for this for the moment: #665

Should be ok to handle in a follow up PR later on!

@bobbyiliev bobbyiliev force-pushed the source-versioning branch 2 times, most recently from 36cd7d1 to 147606a Compare October 28, 2024 19:50
Copy link
Member

@ParkMyCar ParkMyCar left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Really just skimmed the entire PR but I think it all looks good! Biggest feedback would be following up on quoting identifiers that Aru commented on, but if there aren't any concerns there I would say we're good to go!

resource.TestCheckResourceAttr("materialize_source_table_kafka.test_kafka", "name", nameSpace+"_table_kafka"),
resource.TestCheckResourceAttr("materialize_source_table_kafka.test_kafka", "database_name", "materialize"),
resource.TestCheckResourceAttr("materialize_source_table_kafka.test_kafka", "schema_name", "public"),
// resource.TestCheckResourceAttr("materialize_source_table_kafka.test_kafka", "qualified_sql_name", fmt.Sprintf(`"materialize"."public"."%s_table_kafka"`, nameSpace)),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

left behind?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants