
Support DML operations on Delta Lake tables with id column mapping #16600

Merged
merged 1 commit into from
May 15, 2023

Conversation

findinpath
Contributor

@findinpath findinpath commented Mar 16, 2023

Description

This PR is a follow-up of #16183 and relates to #12638

Additional context and related issues

https://books.japila.pl/delta-lake-internals/DeltaConfigs/#COLUMN_MAPPING_MODE

id A column ID is the identifier of a column. This mode is used for tables converted from Iceberg and parquet files in this mode will also have corresponding field Ids for each column in their file schema.

I intended to test this column mapping mode for tables migrated from Iceberg and used

https://docs.databricks.com/sql/language-manual/delta-convert-to-delta.html

Applying CLONE to an Iceberg table created by Trino caused internal errors on Databricks, so I used Spark with Iceberg to create the table on AWS and then applied CLONE via Databricks.

CREATE OR REPLACE TABLE default.findinpath_table_migrated CLONE iceberg.`s3://trino-bucket/iceberg-warehouse/findinpath.db/findinpath_table/metadata/00001-0b3c1ecd-e5b9-479f-9ddd-e286b9efd472.metadata.json`

The outcome was a table with writer version 7, which can't be written by Trino. :(

Nevertheless, some relevant info: the Parquet files written via Iceberg look like this:

➜  ~ parquet-tools schema  /Users/marius/Downloads/00001-3-c0e1f76e-1234-4de8-9348-b18789e0720f-00001.parquet 
message table {
  required binary c (STRING) = 1;
}

The new Parquet schema for the files written by Databricks (on INSERT statements after the CLONE statement) looks like this:

➜  ~ parquet-tools schema /Users/marius/Downloads/part-00000-cdd011ba-6032-4012-9cb7-364ca60a5b1f-c000.snappy\ \(1\).parquet
message spark_schema {
  optional binary col-34630ee4-4dbf-4916-a464-5f13f9d3ff46 (STRING) = 1;
}

The Parquet files produced by Trino with the column mapping mode set to `id` are also consistent with this schema.
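The `id` mapping shown in the dumps above can be pictured as a small stand-alone sketch (the types here are hypothetical; in Delta Lake the mapping is stored in the table's metadata entry): each logical column is tied to a generated physical name and a Parquet field id, and the field id (the `= 1` in the schema dumps) is what survives across renames and migrations.

```java
import java.util.Map;

// Hypothetical model of the 'id' column mapping mode: logical column name ->
// (physical Parquet column name, Parquet field id). Not the Trino/Delta types.
record MappedColumn(String physicalName, int fieldId) {}

public class ColumnMappingIdSketch
{
    public static void main(String[] args)
    {
        // Logical column "c" from the Iceberg-migrated table above
        Map<String, MappedColumn> mapping = Map.of(
                "c", new MappedColumn("col-34630ee4-4dbf-4916-a464-5f13f9d3ff46", 1));

        MappedColumn c = mapping.get("c");
        // Readers match columns by field id, not by the physical name
        System.out.println(c.physicalName() + " = " + c.fieldId());
    }
}
```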

Release notes

( ) This is not user-visible or docs only and no release notes are required.
( ) Release notes are required, please propose a release note for me.
(x) Release notes are required, with the following suggested text:

# Delta Lake
* Support DML operations on Delta Lake tables with `id` column mapping ({issue}`issuenumber`)

@cla-bot cla-bot bot added the cla-signed label Mar 16, 2023
@github-actions github-actions bot added delta-lake Delta Lake connector tests:hive labels Mar 16, 2023
@findinpath findinpath force-pushed the findinpath/delta-column-mapping-id branch from 357f148 to f09f502 Compare March 17, 2023 05:25
@findinpath findinpath self-assigned this Mar 17, 2023
@findinpath findinpath force-pushed the findinpath/delta-column-mapping-id branch from 6db5fe5 to 4e6dae7 Compare March 17, 2023 11:20
@findinpath findinpath requested a review from ebyhr March 20, 2023 12:10
@findinpath findinpath marked this pull request as ready for review March 20, 2023 12:10
@findepi
Member

findepi commented Mar 20, 2023

/test-with-secrets sha=4e6dae7c622f3fffa809c8cce7fff6c587334505

import static org.apache.parquet.schema.Type.Repetition.REQUIRED;

public final class DeltaLakeMetadataSchemas
{
Contributor Author

@raunaqmorarka this class is a stripped-down version of ParquetSchemaConverter, needed to create Parquet schemas for Delta Lake column handles with nested types (the id information comes from the Delta Lake metadata entry schema items).

@github-actions

The CI workflow run with tests that require additional secrets finished as failure: https://github.com/trinodb/trino/actions/runs/4468845163

OptionalInt id,
DeltaLakeSchemaSupport.ColumnMappingMode columnMappingMode,
List<String> parent,
BiConsumer<List<String>, Type> primitiveTypesConsumer)
Member

Why not pass in the builder instead of a BiConsumer? I'm not sure why we would need a more generic parameter here.

@ebyhr ebyhr mentioned this pull request Mar 24, 2023
10 tasks
@findinpath findinpath force-pushed the findinpath/delta-column-mapping-id branch from 4e6dae7 to b7c07a0 Compare March 28, 2023 09:14
@findinpath findinpath force-pushed the findinpath/delta-column-mapping-id branch from b7c07a0 to 8f00561 Compare March 28, 2023 09:43
@findinpath findinpath force-pushed the findinpath/delta-column-mapping-id branch 3 times, most recently from f00d515 to 4c24030 Compare April 18, 2023 10:16
@findinpath
Contributor Author

@ebyhr could you please run this PR with secrets?

@findinpath findinpath requested a review from pajaks April 18, 2023 11:36
@findinpath findinpath force-pushed the findinpath/delta-column-mapping-id branch from 4c24030 to 11686fc Compare April 18, 2023 13:21
}
case "timestamp" -> {
// Spark/DeltaLake stores timestamps in UTC, but renders them in session time zone.
// For more info, see https://delta-users.slack.com/archives/GKTUWT03T/p1585760533005400
Member

Is this link still accessible? I can't access the page. They use https://linen.delta.io/ to archive Slack messages.

Contributor Author

I've taken it from here

// Spark/DeltaLake stores timestamps in UTC, but renders them in session time zone.
// For more info, see https://delta-users.slack.com/archives/GKTUWT03T/p1585760533005400
// and https://cwiki.apache.org/confluence/display/Hive/Different+TIMESTAMP+types

Should I remove it from there as well?

Comment on lines 1542 to 1547
ColumnMappingMode columnMappingMode = getColumnMappingMode(handle.getMetadataEntry());
List<String> partitionColumns = switch (columnMappingMode) {
case NAME, ID -> getPartitionColumnsForNameOrIdMapping(handle.getMetadataEntry().getOriginalPartitionColumns(), mergeHandle.getInsertTableHandle().getInputColumns());
case NONE -> handle.getMetadataEntry().getOriginalPartitionColumns();
case UNKNOWN -> throw new TrinoException(NOT_SUPPORTED, "Unsupported column mapping mode");
};
Member

This part of the code is very similar to the code at line 1370. Can we extract it into a function?
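A minimal stand-alone sketch of the extraction the reviewer suggests (names and signature are hypothetical; in Trino, ColumnMappingMode lives in DeltaLakeSchemaSupport and the physical partition column names come from the insert table handle's input columns):

```java
import java.util.List;

// Hypothetical extracted helper: resolve partition column names per column
// mapping mode, mirroring the switch duplicated in the two code paths.
public class PartitionColumnsHelper
{
    enum ColumnMappingMode { NONE, NAME, ID, UNKNOWN }

    static List<String> getPartitionColumns(
            ColumnMappingMode mode,
            List<String> originalPartitionColumns,
            List<String> physicalPartitionColumns)
    {
        return switch (mode) {
            // 'name' and 'id' modes write physical column names into the Parquet files
            case NAME, ID -> physicalPartitionColumns;
            // 'none' mode uses the logical column names directly
            case NONE -> originalPartitionColumns;
            case UNKNOWN -> throw new UnsupportedOperationException("Unsupported column mapping mode");
        };
    }

    public static void main(String[] args)
    {
        System.out.println(getPartitionColumns(
                ColumnMappingMode.ID,
                List.of("region"),
                List.of("col-34630ee4")));
    }
}
```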

@findinpath
Contributor Author

findinpath commented Apr 19, 2023

suite-delta-lake-oss shows several failures caused by the fact that DeltaLakeMergeSink writes Parquet using the schema corresponding to the table metadata, but omits the CDF-related column _change_type (see io.trino.plugin.deltalake.DeltaLakeCdfPageSink#addSpecialColumns).

I'll need to rethink the Parquet schema conversion for Delta Lake a bit.

In any case, Hive's ParquetSchemaConverter builds on the Trino Type (which doesn't include any id metadata), so a Delta Lake-specific alternative probably needs to be considered.

cc @ebyhr @pajaks

@findinpath findinpath force-pushed the findinpath/delta-column-mapping-id branch 2 times, most recently from c608b7b to 723ee73 Compare April 20, 2023 05:26
@findinpath
Contributor Author

findinpath commented Apr 20, 2023

Regarding #16600 (comment), I created the method io.trino.plugin.deltalake.DeltaLakeCdfPageSink#adapt to add the _change_type column to the Parquet schema of the data table when building the Parquet output schema for CDF files.

It feels like a patch method to me, but I didn't find a better alternative that doesn't involve more refactoring.
Another option would have been a similar hierarchy of TypeParquetMetadata (PrimitiveTypeParquetMetadata, RowTypeParquetMetadata, ArrayTypeParquetMetadata, MapTypeParquetMetadata) passed from DeltaLakeSchemaSupport into DeltaLakeColumnMetadata and DeltaLakeColumnHandle, so that it could be handed to ParquetSchemaConverter and used to make use of id and physicalName.
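The adaptation described above, appending the CDF-only _change_type column to the data columns before the output schema is built, can be sketched roughly like this (hypothetical stand-alone helper; the real code works on Parquet schema types, not plain column names):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of adapting the data-table column list for CDF files
// by appending the special _change_type column at the end.
public class CdfSchemaSketch
{
    static final String CHANGE_TYPE_COLUMN_NAME = "_change_type";

    static List<String> withChangeTypeColumn(List<String> dataColumns)
    {
        List<String> columns = new ArrayList<>(dataColumns);
        columns.add(CHANGE_TYPE_COLUMN_NAME); // CDF output files carry this extra column
        return List.copyOf(columns);
    }

    public static void main(String[] args)
    {
        System.out.println(withChangeTypeColumn(List.of("id", "name")));
    }
}
```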

@findinpath findinpath force-pushed the findinpath/delta-column-mapping-id branch from 723ee73 to 112dd5f Compare April 20, 2023 07:27
@findinpath findinpath requested a review from ebyhr April 20, 2023 07:28
@findinpath findinpath force-pushed the findinpath/delta-column-mapping-id branch from 112dd5f to 55e2d90 Compare April 20, 2023 08:07
@findinpath
Contributor Author

Given that adding the _change_type column is a rather exceptional use case, I've decided to handle it in the method
io.trino.plugin.deltalake.DeltaLakeParquetSchemas#createParquetSchemaMapping(io.trino.plugin.deltalake.transactionlog.MetadataEntry, io.trino.spi.type.TypeManager, boolean).

I think the PR is now ready for review.

@findinpath findinpath force-pushed the findinpath/delta-column-mapping-id branch 2 times, most recently from 613a0a7 to 01cc453 Compare April 24, 2023 09:33
@findinpath
Contributor Author

Rebased on master to resolve code conflicts.

@@ -1778,7 +1791,7 @@ private void checkWriteSupported(ConnectorSession session, SchemaTableName schem
checkSupportedWriterVersion(session, schemaTableName);
checkUnsupportedGeneratedColumns(metadataEntry);
ColumnMappingMode columnMappingMode = getColumnMappingMode(metadataEntry);
if (!(columnMappingMode == NONE || columnMappingMode == ColumnMappingMode.NAME)) {
if (!(columnMappingMode == NONE || columnMappingMode == ColumnMappingMode.NAME || columnMappingMode == ColumnMappingMode.ID)) {
Member

Can you add static imports for NAME and ID, similar to NONE?

assertQueryFailure(() -> onTrino().executeQuery("UPDATE delta.default." + tableName + " SET a_string = 'test'"))
.hasMessageContaining("Writing with column mapping id is not supported");
}
assertThat(onTrino().executeQuery("INSERT INTO delta.default." + tableName + " VALUES (1, 'one'), (2, 'two')"))
Member

Since the test is named testUnsupportedOperationsColumnMappingMode, can we have a separate test for the write operations and test only unsupported ones here?

Contributor Author

Good point. The DML operations here are actually irrelevant now.
I'll remove them.

@@ -840,33 +960,17 @@ public void testUnsupportedColumnMappingModeChangeDataFeed(String mode)
onDelta().executeQuery("INSERT INTO default." + targetTableName + " VALUES (3, 'nation3', 300)");

// Column mapping mode 'none' is tested in TestDeltaLakeDatabricksChangeDataFeedCompatibility
// TODO: Remove these failure check and update TestDeltaLakeDatabricksChangeDataFeedCompatibility when adding support the column mapping mode
Member

Is this TODO still valid?

@findinpath findinpath force-pushed the findinpath/delta-column-mapping-id branch from 01cc453 to cdd8150 Compare April 27, 2023 13:08
Contributor Author

@findinpath findinpath left a comment

AC

@findinpath
Contributor Author

Rebasing on master to address conflicts.

@findinpath findinpath force-pushed the findinpath/delta-column-mapping-id branch from cdd8150 to b734bdf Compare April 27, 2023 14:01
@findinpath findinpath force-pushed the findinpath/delta-column-mapping-id branch from b734bdf to 1e886c9 Compare May 9, 2023 10:41
@findinpath
Contributor Author

findinpath commented May 9, 2023

Rebased again on master to address conflicts.
Changed io.trino.plugin.deltalake.DeltaLakePageSinkProvider#createPageSink(io.trino.spi.connector.ConnectorTransactionHandle, io.trino.spi.connector.ConnectorSession, io.trino.spi.connector.ConnectorOutputTableHandle, io.trino.spi.connector.ConnectorPageSinkId) to reference DeltaLakeColumnHandle#getColumnName (instead of DeltaLakeColumnHandle#getName) as a consequence of 1b9b988.

@findinpath findinpath requested review from ebyhr and findepi May 9, 2023 10:44
@findepi
Member

findepi commented May 10, 2023

/test-with-secrets sha=1e886c943c9b3f9f4649cb1da2df9addf0f386a3

@findinpath
Contributor Author

Rebasing on top of master (no conflicts encountered) to ensure that the functionality still behaves as expected with the latest updates.

@findinpath findinpath force-pushed the findinpath/delta-column-mapping-id branch from 1e886c9 to e23f4b6 Compare May 13, 2023 07:50
@ebyhr
Member

ebyhr commented May 14, 2023

/test-with-secrets sha=e23f4b626ac58f8a31c07068fc6ba49c774e9a24

https://github.com/trinodb/trino/actions/runs/4975047889

@ebyhr ebyhr merged commit 39d82f9 into trinodb:master May 15, 2023
@ebyhr ebyhr mentioned this pull request May 15, 2023
@github-actions github-actions bot added this to the 418 milestone May 15, 2023
Labels
cla-signed delta-lake Delta Lake connector
Development

Successfully merging this pull request may close these issues.

5 participants