From f6666ff91af284f4f8bfb66a0349a813b9c604d2 Mon Sep 17 00:00:00 2001 From: Marius Grama Date: Sat, 30 Mar 2024 08:47:04 +0100 Subject: [PATCH 1/2] Fix potentially incorrect table read version for writes on Delta Lake In the context of concurrent operations, it may happen that, between the time when the version of the table is initially read for scanning and the time when the planning of the write operation begins, a new version of the table becomes current. Nevertheless, when performing the write operations, use the version of the table that was initially read as the reference for beginning the operation, to ensure the consistency of the operation. This strategy helps avoid unintentionally skipping the processing of transaction log information corresponding to other concurrent operations. --- .../plugin/deltalake/DeltaLakeMetadata.java | 28 ++++++++----------- .../TestDeltaLakeFileOperations.java | 2 +- 2 files changed, 12 insertions(+), 18 deletions(-) diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java index 53be2714dc1f..72c17a04e637 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java @@ -1741,26 +1741,20 @@ public ConnectorInsertTableHandle beginInsert(ConnectorSession session, Connecto // This check acts as a safeguard in cases where the input columns may differ from the table metadata case-sensitively checkAllColumnsPassedOnInsert(tableMetadata, inputColumns); - return createInsertHandle(session, retryMode, table, inputColumns); + return createInsertHandle(retryMode, table, inputColumns); } - private DeltaLakeInsertTableHandle createInsertHandle(ConnectorSession session, RetryMode retryMode, DeltaLakeTableHandle table, List inputColumns) + private DeltaLakeInsertTableHandle 
createInsertHandle(RetryMode retryMode, DeltaLakeTableHandle table, List inputColumns) { String tableLocation = table.getLocation(); - try { - TrinoFileSystem fileSystem = fileSystemFactory.create(session); - return new DeltaLakeInsertTableHandle( - table.getSchemaTableName(), - tableLocation, - table.getMetadataEntry(), - table.getProtocolEntry(), - inputColumns, - getMandatoryCurrentVersion(fileSystem, tableLocation, table.getReadVersion()), - retryMode != NO_RETRIES); - } - catch (IOException e) { - throw new TrinoException(GENERIC_INTERNAL_ERROR, e); - } + return new DeltaLakeInsertTableHandle( + table.getSchemaTableName(), + tableLocation, + table.getMetadataEntry(), + table.getProtocolEntry(), + inputColumns, + table.getReadVersion(), + retryMode != NO_RETRIES); } private void checkAllColumnsPassedOnInsert(ConnectorTableMetadata tableMetadata, List insertColumns) @@ -2046,7 +2040,7 @@ public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorT .filter(column -> column.getColumnType() != SYNTHESIZED) .collect(toImmutableList()); - DeltaLakeInsertTableHandle insertHandle = createInsertHandle(session, retryMode, handle, inputColumns); + DeltaLakeInsertTableHandle insertHandle = createInsertHandle(retryMode, handle, inputColumns); return new DeltaLakeMergeTableHandle(handle, insertHandle); } diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeFileOperations.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeFileOperations.java index a1b68dded2ef..b62385b281c0 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeFileOperations.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeFileOperations.java @@ -447,7 +447,7 @@ public void testDeleteWithNonPartitionFilter() .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.newStream")) .add(new 
FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.newStream")) .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.newStream")) - .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000004.json", "InputFile.exists"), 2) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000004.json", "InputFile.exists")) .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000004.json", "OutputFile.createOrOverwrite")) .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000004.json", "InputFile.newStream")) .addCopies(new FileOperation(DATA, "key=domain1/", "InputFile.newInput"), 2) From 1941f3f496604a2a5c9caac8cbfee85357b9953f Mon Sep 17 00:00:00 2001 From: Marius Grama Date: Sun, 31 Mar 2024 08:02:03 +0200 Subject: [PATCH 2/2] Verify whether `is_blind_append` is calculated accurately Without the fix in the previous commit, in the concurrency tests for the non-blind INSERT operations it sometimes happened that the source table handles processed in the `finishInsert` operation were wrongly skipped because they were considered time travel references of the target table, which led to the situation in which the INSERT operation was inaccurately considered blind. 
--- .../BaseDeltaLakeConnectorSmokeTest.java | 16 +++--- ...estDeltaLakeLocalConcurrentWritesTest.java | 54 +++++++++---------- 2 files changed, 35 insertions(+), 35 deletions(-) diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java index 9044ebf527cf..8bfc959ee905 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java @@ -2406,11 +2406,11 @@ private void testConcurrentInsertsSelectingFromTheSameTable(boolean partitioned) .map("(%d, 10)"::formatted) .collect(joining(", ", ", ", ""))); assertQuery( - "SELECT version, operation, isolation_level, read_version FROM \"" + tableName + "$history\"", - "VALUES (0, 'CREATE TABLE AS SELECT', 'WriteSerializable', 0)" + + "SELECT version, operation, isolation_level, read_version, is_blind_append FROM \"" + tableName + "$history\"", + "VALUES (0, 'CREATE TABLE AS SELECT', 'WriteSerializable', 0, true)" + LongStream.rangeClosed(1, successfulInsertsCount) .boxed() - .map(version -> "(%s, 'WRITE', 'WriteSerializable', %s)".formatted(version, version - 1)) + .map(version -> "(%s, 'WRITE', 'WriteSerializable', %s, false)".formatted(version, version - 1)) .collect(joining(", ", ", ", ""))); } finally { @@ -2458,13 +2458,13 @@ public void testConcurrentInsertsReconciliationForMixedInserts() "SELECT * FROM " + tableName, "VALUES (0, 10), (1, 10), (11, 20), (1, 20), (22, 30)"); assertQuery( - "SELECT version, operation, isolation_level, read_version FROM \"" + tableName + "$history\"", + "SELECT operation, isolation_level, is_blind_append FROM \"" + tableName + "$history\"", """ VALUES - (0, 'CREATE TABLE AS SELECT', 'WriteSerializable', 0), - (1, 'WRITE', 'WriteSerializable', 0), - (2, 'WRITE', 
'WriteSerializable', 1), - (3, 'WRITE', 'WriteSerializable', 2) + ('CREATE TABLE AS SELECT', 'WriteSerializable', true), + ('WRITE', 'WriteSerializable', false), + ('WRITE', 'WriteSerializable', false), + ('WRITE', 'WriteSerializable', true) """); } finally { diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeLocalConcurrentWritesTest.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeLocalConcurrentWritesTest.java index 5be0a4cbc0af..a2acf10cbdf7 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeLocalConcurrentWritesTest.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeLocalConcurrentWritesTest.java @@ -153,13 +153,13 @@ private void testConcurrentInsertsReconciliationForBlindInserts(boolean partitio .forEach(MoreFutures::getDone); assertThat(query("SELECT * FROM " + tableName)).matches("VALUES (1, 10), (11, 20), (21, 30)"); - assertQuery("SELECT version, operation, isolation_level, read_version FROM \"" + tableName + "$history\"", + assertQuery("SELECT version, operation, isolation_level, read_version, is_blind_append FROM \"" + tableName + "$history\"", """ VALUES - (0, 'CREATE TABLE', 'WriteSerializable', 0), - (1, 'WRITE', 'WriteSerializable', 0), - (2, 'WRITE', 'WriteSerializable', 1), - (3, 'WRITE', 'WriteSerializable', 2) + (0, 'CREATE TABLE', 'WriteSerializable', 0, true), + (1, 'WRITE', 'WriteSerializable', 0, true), + (2, 'WRITE', 'WriteSerializable', 1, true), + (3, 'WRITE', 'WriteSerializable', 2, true) """); } finally { @@ -233,11 +233,11 @@ private void testConcurrentInsertsSelectingFromTheSameTable(boolean partitioned) .map("(%d, 10)"::formatted) .collect(joining(", ", ", ", ""))); assertQuery( - "SELECT version, operation, isolation_level, read_version FROM \"" + tableName + "$history\"", - "VALUES (0, 'CREATE TABLE AS SELECT', 'WriteSerializable', 0)" + + "SELECT version, operation, isolation_level, 
read_version, is_blind_append FROM \"" + tableName + "$history\"", + "VALUES (0, 'CREATE TABLE AS SELECT', 'WriteSerializable', 0, true)" + LongStream.rangeClosed(1, successfulInsertsCount) .boxed() - .map(version -> "(%s, 'WRITE', 'WriteSerializable', %s)".formatted(version, version - 1)) + .map(version -> "(%s, 'WRITE', 'WriteSerializable', %s, false)".formatted(version, version - 1)) .collect(joining(", ", ", ", ""))); } finally { @@ -303,11 +303,11 @@ public void testConcurrentInsertsSelectingFromTheSamePartition() .map("(%d, 10)"::formatted) .collect(joining(", ", ", ", ""))); assertQuery( - "SELECT version, operation, isolation_level, read_version FROM \"" + tableName + "$history\"", - "VALUES (0, 'CREATE TABLE AS SELECT', 'WriteSerializable', 0)" + + "SELECT version, operation, isolation_level, read_version, is_blind_append FROM \"" + tableName + "$history\"", + "VALUES (0, 'CREATE TABLE AS SELECT', 'WriteSerializable', 0, true)" + LongStream.rangeClosed(1, successfulInsertsCount) .boxed() - .map(version -> "(%s, 'WRITE', 'WriteSerializable', %s)".formatted(version, version - 1)) + .map(version -> "(%s, 'WRITE', 'WriteSerializable', %s, false)".formatted(version, version - 1)) .collect(joining(", ", ", ", ""))); } finally { @@ -356,13 +356,13 @@ public void testConcurrentInsertsReconciliationForMixedInserts() "SELECT * FROM " + tableName, "VALUES (0, 10), (1, 10), (11, 20), (1, 20), (22, 30)"); assertQuery( - "SELECT version, operation, isolation_level, read_version FROM \"" + tableName + "$history\"", + "SELECT operation, isolation_level, is_blind_append FROM \"" + tableName + "$history\"", """ VALUES - (0, 'CREATE TABLE AS SELECT', 'WriteSerializable', 0), - (1, 'WRITE', 'WriteSerializable', 0), - (2, 'WRITE', 'WriteSerializable', 1), - (3, 'WRITE', 'WriteSerializable', 2) + ('CREATE TABLE AS SELECT', 'WriteSerializable', true), + ('WRITE', 'WriteSerializable', false), + ('WRITE', 'WriteSerializable', false), + ('WRITE', 'WriteSerializable', true) """); } 
finally { @@ -409,13 +409,13 @@ public void testConcurrentInsertsSelectingFromDifferentPartitionsOfSameTable() "SELECT * FROM " + tableName, "VALUES (0, 10), (11, 20), (22, 30), (1, 40), (13, 40), (25, 40)"); assertQuery( - "SELECT version, operation, isolation_level, read_version FROM \"" + tableName + "$history\"", + "SELECT operation, isolation_level, is_blind_append FROM \"" + tableName + "$history\"", """ VALUES - (0, 'CREATE TABLE AS SELECT', 'WriteSerializable', 0), - (1, 'WRITE', 'WriteSerializable', 0), - (2, 'WRITE', 'WriteSerializable', 1), - (3, 'WRITE', 'WriteSerializable', 2) + ('CREATE TABLE AS SELECT', 'WriteSerializable', true), + ('WRITE', 'WriteSerializable', false), + ('WRITE', 'WriteSerializable', false), + ('WRITE', 'WriteSerializable', false) """); } finally { @@ -472,14 +472,14 @@ public void testConcurrentInsertsSelectingFromMultipleNonoverlappingPartitionsOf (55, 60), (56,60), (57, 60), (58,60) """); assertQuery( - "SELECT version, operation, isolation_level, read_version FROM \"" + tableName + "$history\"", + "SELECT version, operation, isolation_level, read_version, is_blind_append FROM \"" + tableName + "$history\"", """ VALUES - (0, 'CREATE TABLE AS SELECT', 'WriteSerializable', 0), - (1, 'WRITE', 'WriteSerializable', 0), - (2, 'WRITE', 'WriteSerializable', 1), - (3, 'WRITE', 'WriteSerializable', 2), - (4, 'WRITE', 'WriteSerializable', 3) + (0, 'CREATE TABLE AS SELECT', 'WriteSerializable', 0, true), + (1, 'WRITE', 'WriteSerializable', 0, true), + (2, 'WRITE', 'WriteSerializable', 1, false), + (3, 'WRITE', 'WriteSerializable', 2, false), + (4, 'WRITE', 'WriteSerializable', 3, false) """); } finally {