Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix struct order for schema updates when using upsert/delete mode #368

Open
wants to merge 5 commits into
base: master
Choose a base branch
from

Conversation

jurgispods
Copy link

When using the connector in upsert/delete mode, it can fail under certain circumstances when the schema is updated in such a way that the intermediate table and the destination table have differently ordered nested struct fields.

Example scenario

Schema version 1

Assume the Kafka source topic has the following Avro schema (version 1):

{
   "type":"record",
   "name":"Message",
   "fields":[
      {
         "name":"data",
         "type":{
            "type":"record",
            "name":"Data",
            "fields":[
               {
                  "name":"minAmount",
                  "type":[
                     "null",
                     "int"
                  ],
                  "default":null
               },
               {
                  "name":"name",
                  "type":[
                     "null",
                     "string"
                  ],
                  "default":null
               }
            ]
         }
      }
   ]
}

The corresponding Bigquery destination table schema:

[  
  {
    "fields": [
      {
        "name": "minAmount",
        "type": "INTEGER"
      },
      {
        "name": "name",
        "type": "STRING"
      }
    ],
    "name": "data",
    "type": "RECORD"
  }
  // additional fields for Kafka key, Kafka data
]

Schema version 2

Now, the source table schema is updated to version 2:

{
   "type":"record",
   "name":"Message",
   "fields":[
      {
         "name":"data",
         "type":{
            "type":"record",
            "name":"Data",
            "fields":[
               {
                  "name":"minAmount",
                  "type":[
                     "null",
                     "int"
                  ],
                  "default":null
               },
               {
                  "name":"maxAmount",
                  "type":[
                     "null",
                     "int"
                  ],
                  "default":null
               },
               {
                  "name":"name",
                  "type":[
                     "null",
                     "string"
                  ],
                  "default":null
               }
            ]
         }
      }
   ]
}

The problem now is that the Bigquery schemas of the intermediate and destination tables will have different orders of nested fields.

Bigquery schema of the intermediate table after creation:

[  
  {
    "fields": [
      {
        "name": "minAmount",
        "type": "INTEGER"
      },
      {
        "name": "maxAmount",
        "type": "INTEGER"
      },
      {
        "name": "name",
        "type": "STRING"
      },
    "name": "data",
    "type": "RECORD"
  }
  // additional fields for Kafka key, Kafka data
]

Updated Bigquery destination table schema - note that the new field maxAmount is appended at the end:

[  
  {
    "fields": [
      {
        "name": "minAmount",
        "type": "INTEGER"
      },
      {
        "name": "name",
        "type": "STRING"
      },
      {
        "name": "maxAmount",
        "type": "INTEGER"
      }
    ],
    "name": "data",
    "type": "RECORD"
  }
  // additional fields for Kafka key, Kafka data
]

The connector will subsequently fail during the periodic merge flush:

Value of type STRUCT<minAmount INT64, maxAmount INT64, name STRING> cannot be assigned to dstTableAlias.data, which has type STRUCT<minAmount INT64, name STRING, maxAmount INT64>

This can be easily seen by looking at the executed MERGE queries.

Comparison of executed MERGE queries

This query will fail due to different orders of nested fields:

BEGIN
    CREATE TEMP TABLE _SESSION.destination AS (        
        select 'foo' as key, struct(1 as minAmount, 'v1' as name, null as maxAmount) as data
    );
    CREATE TEMP TABLE _SESSION.intermediate AS (        
        select 'foo' as key, struct(1 as minAmount, 10 as maxAmount, 'v2' as name) as data
    );
   
MERGE _SESSION.destination dstTableAlias 
USING _SESSION.intermediate src 
ON dstTableAlias.key=src.key 
WHEN MATCHED THEN UPDATE SET dstTableAlias.data=src.data
WHEN NOT MATCHED THEN INSERT (`key`,`data`) VALUES (src.key, src.data);

select * from _SESSION.destination;

END

In contrast, this query succeeds:

BEGIN
    CREATE TEMP TABLE _SESSION.destination AS (        
        select 'foo' as key, struct(1 as minAmount, 'v1' as name, null as maxAmount) as data
    );
    CREATE TEMP TABLE _SESSION.intermediate_reordered AS (
        select 'foo' as key, struct(1 as minAmount, 'v2' as name, 10 as maxAmount) as data
    );
   
MERGE _SESSION.destination dstTableAlias 
USING _SESSION.intermediate_reordered src 
ON dstTableAlias.key=src.key 
WHEN MATCHED THEN UPDATE SET dstTableAlias.data=src.data
WHEN NOT MATCHED THEN INSERT (`key`,`data`) VALUES (src.key, src.data);

select * from _SESSION.destination;

END

We can see that for upserts, the order of struct fields matters.

Proposed changes

In this PR, I have added the destination table schema to the list returned by SchemaManager.getSchemasList when it is called for an intermediate table in upsert/merge mode. That way, the intermediate table schema is forced to respect the order of nested fields in the destination table schema - schema updates are simply applied on top of it, ensuring the same field order in both tables when new fields are added.

Please let me know what you think of this approach.

@jurgispods jurgispods requested a review from a team as a code owner December 1, 2023 13:43
Copy link

cla-assistant bot commented Dec 1, 2023

CLA assistant check
All committers have signed the CLA.

@b-goyal
Copy link
Member

b-goyal commented Dec 5, 2023

Thanks @jurgispods for taking time to write the detailed example and explaining the issue. I will take sometime to go over the changes and get back in 2 weeks time. Meanwhile, would you be able to add an integration test for this change please.

@jurgispods
Copy link
Author

@b-goyal Sure, I can add an integration test.

@jurgispods
Copy link
Author

@b-goyal I just added an integration test that reproduces the issue (and the fix).

I found out it only shows under certain circumstances, i.e. when a schema update happens after the intermediate table has been deleted. Otherwise, schemas of destination and intermediate tables are always in sync, as they are updated using the same logic.

As far as I can seen, deletion of intermediate table only happens when the connector is stopped. So in order to replicate the error, I had to write an IT test that is quite involved:

  1. Create records + table for record with schema v1
  2. Stop the connector to force deletion of intermediate tables
  3. Start the connector again and feed in records with updated schema v2 -> exception
  4. Apply fix, stop + start connector again -> success

In order to show that the connector indeed fails, I added a config for toggling my fix on or off. That might not be necessary in the final PR, as in reality, it should always be on. We could instead test that with a unit test and remove the added config.

@b-goyal
Copy link
Member

b-goyal commented Dec 20, 2023

Thanks for adding the integration test @jurgispods.
Had an initial look but did not follow the root cause and resolution. Will need some more time to review this.

@jurgispods
Copy link
Author

Hi @b-goyal, is there an update on this?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants