diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java index 53be2714dc1f..72c17a04e637 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java @@ -1741,26 +1741,20 @@ public ConnectorInsertTableHandle beginInsert(ConnectorSession session, Connecto // This check acts as a safeguard in cases where the input columns may differ from the table metadata case-sensitively checkAllColumnsPassedOnInsert(tableMetadata, inputColumns); - return createInsertHandle(session, retryMode, table, inputColumns); + return createInsertHandle(retryMode, table, inputColumns); } - private DeltaLakeInsertTableHandle createInsertHandle(ConnectorSession session, RetryMode retryMode, DeltaLakeTableHandle table, List inputColumns) + private DeltaLakeInsertTableHandle createInsertHandle(RetryMode retryMode, DeltaLakeTableHandle table, List inputColumns) { String tableLocation = table.getLocation(); - try { - TrinoFileSystem fileSystem = fileSystemFactory.create(session); - return new DeltaLakeInsertTableHandle( - table.getSchemaTableName(), - tableLocation, - table.getMetadataEntry(), - table.getProtocolEntry(), - inputColumns, - getMandatoryCurrentVersion(fileSystem, tableLocation, table.getReadVersion()), - retryMode != NO_RETRIES); - } - catch (IOException e) { - throw new TrinoException(GENERIC_INTERNAL_ERROR, e); - } + return new DeltaLakeInsertTableHandle( + table.getSchemaTableName(), + tableLocation, + table.getMetadataEntry(), + table.getProtocolEntry(), + inputColumns, + table.getReadVersion(), + retryMode != NO_RETRIES); } private void checkAllColumnsPassedOnInsert(ConnectorTableMetadata tableMetadata, List insertColumns) @@ -2046,7 +2040,7 @@ public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorT .filter(column -> column.getColumnType() != SYNTHESIZED) .collect(toImmutableList()); - DeltaLakeInsertTableHandle insertHandle = createInsertHandle(session, retryMode, handle, inputColumns); + DeltaLakeInsertTableHandle insertHandle = createInsertHandle(retryMode, handle, inputColumns); return new DeltaLakeMergeTableHandle(handle, insertHandle); } diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java index 9044ebf527cf..8bfc959ee905 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java @@ -2406,11 +2406,11 @@ private void testConcurrentInsertsSelectingFromTheSameTable(boolean partitioned) .map("(%d, 10)"::formatted) .collect(joining(", ", ", ", ""))); assertQuery( - "SELECT version, operation, isolation_level, read_version FROM \"" + tableName + "$history\"", - "VALUES (0, 'CREATE TABLE AS SELECT', 'WriteSerializable', 0)" + + "SELECT version, operation, isolation_level, read_version, is_blind_append FROM \"" + tableName + "$history\"", + "VALUES (0, 'CREATE TABLE AS SELECT', 'WriteSerializable', 0, true)" + LongStream.rangeClosed(1, successfulInsertsCount) .boxed() - .map(version -> "(%s, 'WRITE', 'WriteSerializable', %s)".formatted(version, version - 1)) + .map(version -> "(%s, 'WRITE', 'WriteSerializable', %s, false)".formatted(version, version - 1)) .collect(joining(", ", ", ", ""))); } finally { @@ -2458,13 +2458,13 @@ public void testConcurrentInsertsReconciliationForMixedInserts() "SELECT * FROM " + tableName, "VALUES (0, 10), (1, 10), (11, 20), (1, 20), (22, 30)"); assertQuery( - "SELECT version, operation, isolation_level, read_version FROM \"" + tableName + "$history\"", + "SELECT operation, isolation_level, is_blind_append FROM \"" + tableName + "$history\"", """ VALUES - (0, 'CREATE TABLE AS SELECT', 'WriteSerializable', 0), - (1, 'WRITE', 'WriteSerializable', 0), - (2, 'WRITE', 'WriteSerializable', 1), - (3, 'WRITE', 'WriteSerializable', 2) + ('CREATE TABLE AS SELECT', 'WriteSerializable', true), + ('WRITE', 'WriteSerializable', false), + ('WRITE', 'WriteSerializable', false), + ('WRITE', 'WriteSerializable', true) """); } finally { diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeFileOperations.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeFileOperations.java index a1b68dded2ef..b62385b281c0 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeFileOperations.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeFileOperations.java @@ -447,7 +447,7 @@ public void testDeleteWithNonPartitionFilter() .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.newStream")) .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.newStream")) .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.newStream")) - .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000004.json", "InputFile.exists"), 2) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000004.json", "InputFile.exists")) .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000004.json", "OutputFile.createOrOverwrite")) .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000004.json", "InputFile.newStream")) .addCopies(new FileOperation(DATA, "key=domain1/", "InputFile.newInput"), 2) diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeLocalConcurrentWritesTest.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeLocalConcurrentWritesTest.java index 5be0a4cbc0af..a2acf10cbdf7 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeLocalConcurrentWritesTest.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeLocalConcurrentWritesTest.java @@ -153,13 +153,13 @@ private void testConcurrentInsertsReconciliationForBlindInserts(boolean partitio .forEach(MoreFutures::getDone); assertThat(query("SELECT * FROM " + tableName)).matches("VALUES (1, 10), (11, 20), (21, 30)"); - assertQuery("SELECT version, operation, isolation_level, read_version FROM \"" + tableName + "$history\"", + assertQuery("SELECT version, operation, isolation_level, read_version, is_blind_append FROM \"" + tableName + "$history\"", """ VALUES - (0, 'CREATE TABLE', 'WriteSerializable', 0), - (1, 'WRITE', 'WriteSerializable', 0), - (2, 'WRITE', 'WriteSerializable', 1), - (3, 'WRITE', 'WriteSerializable', 2) + (0, 'CREATE TABLE', 'WriteSerializable', 0, true), + (1, 'WRITE', 'WriteSerializable', 0, true), + (2, 'WRITE', 'WriteSerializable', 1, true), + (3, 'WRITE', 'WriteSerializable', 2, true) """); } finally { @@ -233,11 +233,11 @@ private void testConcurrentInsertsSelectingFromTheSameTable(boolean partitioned) .map("(%d, 10)"::formatted) .collect(joining(", ", ", ", ""))); assertQuery( - "SELECT version, operation, isolation_level, read_version FROM \"" + tableName + "$history\"", - "VALUES (0, 'CREATE TABLE AS SELECT', 'WriteSerializable', 0)" + + "SELECT version, operation, isolation_level, read_version, is_blind_append FROM \"" + tableName + "$history\"", + "VALUES (0, 'CREATE TABLE AS SELECT', 'WriteSerializable', 0, true)" + LongStream.rangeClosed(1, successfulInsertsCount) .boxed() - .map(version -> "(%s, 'WRITE', 'WriteSerializable', %s)".formatted(version, version - 1)) + .map(version -> "(%s, 'WRITE', 'WriteSerializable', %s, false)".formatted(version, version - 1)) .collect(joining(", ", ", ", ""))); } finally { @@ -303,11 +303,11 @@ public void testConcurrentInsertsSelectingFromTheSamePartition() .map("(%d, 10)"::formatted) .collect(joining(", ", ", ", ""))); assertQuery( - "SELECT version, operation, isolation_level, read_version FROM \"" + tableName + "$history\"", - "VALUES (0, 'CREATE TABLE AS SELECT', 'WriteSerializable', 0)" + + "SELECT version, operation, isolation_level, read_version, is_blind_append FROM \"" + tableName + "$history\"", + "VALUES (0, 'CREATE TABLE AS SELECT', 'WriteSerializable', 0, true)" + LongStream.rangeClosed(1, successfulInsertsCount) .boxed() - .map(version -> "(%s, 'WRITE', 'WriteSerializable', %s)".formatted(version, version - 1)) + .map(version -> "(%s, 'WRITE', 'WriteSerializable', %s, false)".formatted(version, version - 1)) .collect(joining(", ", ", ", ""))); } finally { @@ -356,13 +356,13 @@ public void testConcurrentInsertsReconciliationForMixedInserts() "SELECT * FROM " + tableName, "VALUES (0, 10), (1, 10), (11, 20), (1, 20), (22, 30)"); assertQuery( - "SELECT version, operation, isolation_level, read_version FROM \"" + tableName + "$history\"", + "SELECT operation, isolation_level, is_blind_append FROM \"" + tableName + "$history\"", """ VALUES - (0, 'CREATE TABLE AS SELECT', 'WriteSerializable', 0), - (1, 'WRITE', 'WriteSerializable', 0), - (2, 'WRITE', 'WriteSerializable', 1), - (3, 'WRITE', 'WriteSerializable', 2) + ('CREATE TABLE AS SELECT', 'WriteSerializable', true), + ('WRITE', 'WriteSerializable', false), + ('WRITE', 'WriteSerializable', false), + ('WRITE', 'WriteSerializable', true) """); } finally { @@ -409,13 +409,13 @@ public void testConcurrentInsertsSelectingFromDifferentPartitionsOfSameTable() "SELECT * FROM " + tableName, "VALUES (0, 10), (11, 20), (22, 30), (1, 40), (13, 40), (25, 40)"); assertQuery( - "SELECT version, operation, isolation_level, read_version FROM \"" + tableName + "$history\"", + "SELECT operation, isolation_level, is_blind_append FROM \"" + tableName + "$history\"", """ VALUES - (0, 'CREATE TABLE AS SELECT', 'WriteSerializable', 0), - (1, 'WRITE', 'WriteSerializable', 0), - (2, 'WRITE', 'WriteSerializable', 1), - (3, 'WRITE', 'WriteSerializable', 2) + ('CREATE TABLE AS SELECT', 'WriteSerializable', true), + ('WRITE', 'WriteSerializable', false), + ('WRITE', 'WriteSerializable', false), + ('WRITE', 'WriteSerializable', false) """); } finally { @@ -472,14 +472,14 @@ public void testConcurrentInsertsSelectingFromMultipleNonoverlappingPartitionsOf (55, 60), (56,60), (57, 60), (58,60) """); assertQuery( - "SELECT version, operation, isolation_level, read_version FROM \"" + tableName + "$history\"", + "SELECT version, operation, isolation_level, read_version, is_blind_append FROM \"" + tableName + "$history\"", """ VALUES - (0, 'CREATE TABLE AS SELECT', 'WriteSerializable', 0), - (1, 'WRITE', 'WriteSerializable', 0), - (2, 'WRITE', 'WriteSerializable', 1), - (3, 'WRITE', 'WriteSerializable', 2), - (4, 'WRITE', 'WriteSerializable', 3) + (0, 'CREATE TABLE AS SELECT', 'WriteSerializable', 0, true), + (1, 'WRITE', 'WriteSerializable', 0, true), + (2, 'WRITE', 'WriteSerializable', 1, false), + (3, 'WRITE', 'WriteSerializable', 2, false), + (4, 'WRITE', 'WriteSerializable', 3, false) """); } finally {