Streaming without ENABLE_SCHEMA_EVOLUTION; postgres JSON mapped to snowflake VARIANT #536

Open: wants to merge 14 commits into base: master

@acristu acristu commented Feb 6, 2023

Hi,

We needed to map the PostgreSQL JSON data type to Snowflake VARIANT, and we will need further tweaks to the schema evolution logic in the future. We therefore propose an approach in which Snowpipe Streaming is still used, but schema evolution is done with ALTER TABLE statements via JDBC. Will this approach be supported in the future?

This is a draft pull request with the following changes:

  • New config parameter snowflake.schematization.auto, true by default and only relevant when "snowflake.enable.schematization": "true". When it is false, the connector does not try to set ENABLE_SCHEMA_EVOLUTION.
  • Added a PostgreSQL JSON to VARIANT mapping in SchematizationUtils.convertToSnowflakeType. This can be made configurable or extensible with other mappings; the draft is specific to the Debezium semantic type naming.
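
For readers who want to see the JSON mapping in isolation, here is a minimal, self-contained sketch of the idea. It is not the connector's code: the ConnectType enum and the TypeMappingSketch class are hypothetical stand-ins for the Kafka Connect schema type and for SchematizationUtils.convertToSnowflakeType, and only the two cases discussed in this PR are covered.

```java
/** Hypothetical stand-in for the Kafka Connect schema types used in this sketch. */
enum ConnectType { BOOLEAN, STRING }

final class TypeMappingSketch {
    // Debezium's semantic type name for JSON columns, as referenced in this PR.
    static final String DEBEZIUM_JSON = "io.debezium.data.Json";

    /**
     * Maps a Connect type plus an optional semantic type name to a Snowflake
     * column type. A STRING tagged as Debezium JSON becomes VARIANT; any other
     * STRING stays VARCHAR.
     */
    static String toSnowflakeType(ConnectType type, String semanticType) {
        switch (type) {
            case BOOLEAN:
                return "BOOLEAN";
            case STRING:
                // Constant-first equals() is null-safe when no semantic type is set.
                return DEBEZIUM_JSON.equals(semanticType) ? "VARIANT" : "VARCHAR";
            default:
                throw new IllegalArgumentException("Type not covered by this sketch: " + type);
        }
    }

    public static void main(String[] args) {
        System.out.println(toSnowflakeType(ConnectType.STRING, DEBEZIUM_JSON));
        System.out.println(toSnowflakeType(ConnectType.STRING, null));
    }
}
```

Keying the decision on the semantic type name keeps plain strings untouched, which is why the draft is tied to Debezium's naming.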

@acristu acristu marked this pull request as ready for review February 6, 2023 17:46
Author

acristu commented Feb 8, 2023

While testing a PostgreSQL source (Debezium) -> Snowflake sink pipeline, the Snowflake sink task crashes every time a new column is added to the source PostgreSQL table:

java.lang.IllegalStateException: No current assignment for partition tsttbl-0
	at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:370)
	at org.apache.kafka.clients.consumer.internals.SubscriptionState.seekUnvalidated(SubscriptionState.java:387)
	at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1604)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.rewind(WorkerSinkTask.java:625)
. . . .

The topic has one partition, "tasks.max": "1", and there is one Connect worker. Troubleshooting is ongoing; if anyone has seen this before or has any hints, please share.

Comment on lines +154 to +166
    if (autoSchematization) {
      // Enable schema evolution by default if the table is created by the connector
      String enableSchemaEvolutionQuery =
          "alter table identifier(?) set ENABLE_SCHEMA_EVOLUTION = true";
      try {
        PreparedStatement stmt = conn.prepareStatement(enableSchemaEvolutionQuery);
        stmt.setString(1, tableName);
        stmt.executeQuery();
      } catch (SQLException e) {
        // Skip the error given that schema evolution is still under PrPr
        LOG_WARN_MSG(
            "Enable schema evolution failed on table: {}, message: {}", tableName, e.getMessage());
      }

Contributor

This won't work because we rely on schema evolution to create the table with the correct schema. If you don't want schema evolution on the table, you should create the table yourself, and it will have schema evolution turned off by default.

Author

@acristu acristu Feb 9, 2023

Thank you for the reply. The schema is evolved here: https://github.com/streamkap-com/snowflake-kafka-connector/blob/63bc190dd75692e6423d3f50b25143dafaa40d1a/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java#L653. The main point of this draft PR is to assess whether "manual schema evolution" via SchematizationUtils.evolveSchemaIfNeeded can be supported as an alternative to "automatic schema evolution". Our main concern is that "manual schema evolution" is currently done on the error path, when insertRow fails.

We can contribute this "manual schema evolution" option: with snowflake.schematization.auto=false, the connector would use the Connect schema, taken from the schema registry or embedded in the records, to evolve the Snowflake schema before inserting the data. But we wanted to check first whether this approach is acceptable to you going forward.

Contributor

@sfc-gh-tzhang sfc-gh-tzhang Feb 10, 2023

Our main concern is that "manual schema evolution" is currently done on the error path, if the insertRow fails.

This is by design: insertRow will always fail first, because during connector startup we create the table with only the RECORD_METADATA column.
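
To make the error-path flow concrete, here is a rough in-memory sketch of "insert fails, evolve the schema, then retry". Everything in it is hypothetical: the Table class stands in for a Snowflake table, the addAll call stands in for ALTER TABLE, and the second insertRow call stands in for rewinding the consumer and re-reading the record. It illustrates the control flow only and is not the connector's implementation.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class ErrorPathEvolutionSketch {
    /** In-memory stand-in for a table; it starts with RECORD_METADATA only. */
    static final class Table {
        final Set<String> columns = new LinkedHashSet<>();
        final List<Map<String, Object>> rows = new ArrayList<>();

        Table() {
            columns.add("RECORD_METADATA");
        }

        /** Rejects any row that carries a column the table does not have yet. */
        void insertRow(Map<String, Object> row) {
            for (String column : row.keySet()) {
                if (!columns.contains(column)) {
                    throw new IllegalStateException("Unknown column: " + column);
                }
            }
            rows.add(row);
        }
    }

    /**
     * Tries the insert; on failure adds the missing columns and retries once.
     * Returns the number of insert attempts that were needed.
     */
    static int insertWithEvolution(Table table, Map<String, Object> row) {
        try {
            table.insertRow(row);
            return 1;
        } catch (IllegalStateException e) {
            table.columns.addAll(row.keySet()); // stands in for ALTER TABLE ... ADD COLUMN
            table.insertRow(row);               // stands in for seek-back and re-delivery
            return 2;
        }
    }

    public static void main(String[] args) {
        Table table = new Table();
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("RECORD_METADATA", "{}");
        row.put("ID", 1);
        System.out.println("attempts for first row: " + insertWithEvolution(table, row));
        System.out.println("attempts for second row: " + insertWithEvolution(table, row));
    }
}
```

In this sketch only the first row with a new column pays for the extra attempt; later rows with the same shape go through directly.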

Author

OK, that should not be a problem, since schema changes should not happen often. The problem is that, without these proposed changes (https://github.com/streamkap-com/snowflake-kafka-connector/blob/63bc190dd75692e6423d3f50b25143dafaa40d1a/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java#L264), we cannot enable schema evolution without also setting ENABLE_SCHEMA_EVOLUTION. We would like to use only SchematizationUtils.evolveSchemaIfNeeded and not ENABLE_SCHEMA_EVOLUTION.

    @@ -209,6 +209,9 @@ private static String convertToSnowflakeType(Type kafkaType) {
           case BOOLEAN:
             return "BOOLEAN";
           case STRING:
             if (semanticType != null && semanticType.equals("io.debezium.data.Json")) {

Contributor

What's the issue with using VARCHAR in this case?

Author

With VARCHAR you have to parse the JSON each time you want to pull out a field. With VARIANT you can use json.field directly, which skips that extra processing.

Contributor

I see, this makes sense. I guess this is a general issue for JSON values, since they will all be mapped to STRING. I will see what we can do here, thanks!

Author

acristu commented Feb 10, 2023

The IllegalStateException crash was caused by an SMT configuration we used:

    "transforms": "changeTopicName",
    "transforms.changeTopicName.regex": "^[^.]\\w+.\\w+.(.*)",
    "transforms.changeTopicName.replacement": "$1",
    "transforms.changeTopicName.type": "org.apache.kafka.connect.transforms.RegexRouter",

Because schema evolution is applied on the error path, a consumer.seek back to the previous offsets is needed. That seek used the wrong (renamed) topic name, which caused the exception. After removing the above SMT, things work well.
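
The renaming can be checked outside Kafka Connect with plain java.util.regex, using the regex and replacement from the configuration above and the match-then-replace behaviour of a regex-based router. This is only a sketch: the class name is made up, and the source topic name pgserver.public.tsttbl is an assumption based on the usual Debezium server.schema.table naming, not a value taken from this thread.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class TopicRenameSketch {
    // Regex and replacement copied from the connector configuration above.
    static final Pattern REGEX = Pattern.compile("^[^.]\\w+.\\w+.(.*)");
    static final String REPLACEMENT = "$1";

    /** Renames the topic only when the whole name matches the regex. */
    static String rename(String topic) {
        Matcher matcher = REGEX.matcher(topic);
        return matcher.matches() ? matcher.replaceFirst(REPLACEMENT) : topic;
    }

    public static void main(String[] args) {
        // Hypothetical Debezium-style topic name (an assumption, see the lead-in).
        String original = "pgserver.public.tsttbl";
        System.out.println(original + " -> " + rename(original));
    }
}
```

With that assumed name, the sink task sees records for tsttbl while the consumer is still assigned partitions of the original topic, so a seek on tsttbl-0 has no assignment to act on, which matches the exception text.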

@sfc-gh-tzhang sfc-gh-tzhang self-assigned this Feb 24, 2023
Contributor

@sfc-gh-tzhang sfc-gh-tzhang left a comment

Quick update:

  • The wildcard support is a good one, and we will look into it separately.
  • Using the semanticType to support more data types won't work in general, but there appears to be a doc field in ConnectSchema that we could use as a hint.
  • We don't plan to support automatic schema evolution for existing tables: enabling it is a one-time step, and we want customers to do it manually to make sure it is something they want. It will be enabled automatically for tables created by KC.

Author

acristu commented Mar 24, 2023

Hi @sfc-gh-tzhang ,

Thank you for the update.
It is difficult to use the connector without regex support for topic-to-table mappings and without JSON/date/timestamp support in schema evolution for all Debezium sources. For now we are using the workarounds in this PR.

@sfc-gh-achyzy
Contributor

Hi @acristu. Do you need more help with this PR, or can it be closed now that you've solved the issue with a workaround?
