From 0e69a72718214a1d8c91071bc3b7c866db790444 Mon Sep 17 00:00:00 2001
From: Piotr Findeisen
Date: Mon, 17 Apr 2023 13:42:03 +0200
Subject: [PATCH] Remove table handle retriesEnabled field

The table handle describes reads, and retries do not matter for reads.
---
 .../plugin/deltalake/DeltaLakeMetadata.java   | 11 +++----
 .../deltalake/DeltaLakeSplitManager.java      |  2 +-
 .../deltalake/DeltaLakeTableHandle.java       | 29 +++++--------------
 .../deltalake/TestDeltaLakeMetadata.java      |  3 +-
 .../deltalake/TestDeltaLakeSplitManager.java  |  3 +-
 .../deltalake/TestTransactionLogAccess.java   |  3 +-
 .../TestDeltaLakeMetastoreStatistics.java     | 12 +++-----
 .../trino/plugin/iceberg/IcebergMetadata.java | 13 ++++-----
 .../plugin/iceberg/IcebergTableHandle.java    | 18 +-----------
 ...stIcebergNodeLocalDynamicSplitPruning.java |  2 --
 .../iceberg/TestIcebergSplitSource.java       |  2 --
 ...TestConnectorPushdownRulesWithIceberg.java |  5 ----
 12 files changed, 26 insertions(+), 77 deletions(-)

diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java
index 837a75dddb5f..78996dee8483 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java
@@ -464,8 +464,7 @@ public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTable
                 Optional.empty(),
                 Optional.empty(),
                 Optional.empty(),
-                tableSnapshot.getVersion(),
-                false);
+                tableSnapshot.getVersion());
     }
 
     @Override
@@ -1503,7 +1502,7 @@ public void finishMerge(ConnectorSession session, ConnectorMergeTableHandle merg
         List<DataFileInfo> newFiles = ImmutableList.copyOf(split.get(true));
         List<DataFileInfo> cdfFiles = ImmutableList.copyOf(split.get(false));
 
-        if (handle.isRetriesEnabled()) {
+        if (mergeHandle.getInsertTableHandle().isRetriesEnabled()) {
             cleanExtraOutputFilesForUpdate(session, handle.getLocation(), allFiles);
         }
 
@@ -2166,8 +2165,7 @@ public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(C
                 tableHandle.getUpdatedColumns(),
                 tableHandle.getUpdateRowIdColumns(),
                 Optional.empty(),
-                tableHandle.getReadVersion(),
-                tableHandle.isRetriesEnabled());
+                tableHandle.getReadVersion());
 
         if (tableHandle.getEnforcedPartitionConstraint().equals(newHandle.getEnforcedPartitionConstraint())
                 && tableHandle.getNonPartitionConstraint().equals(newHandle.getNonPartitionConstraint())) {
@@ -2288,8 +2286,7 @@ public ConnectorAnalyzeMetadata getStatisticsCollectionMetadata(ConnectorSession
                 Optional.empty(),
                 Optional.empty(),
                 Optional.of(analyzeHandle),
-                handle.getReadVersion(),
-                false);
+                handle.getReadVersion());
 
         TableStatisticsMetadata statisticsMetadata = getStatisticsCollectionMetadata(
                 columnsMetadata.stream().map(DeltaLakeColumnMetadata::getColumnMetadata).collect(toImmutableList()),
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java
index 1f4797116e45..b6de98c3ddf1 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java
@@ -139,7 +139,7 @@ private Stream<DeltaLakeSplit> getSplits(
 
         boolean splittable =
                 // Delta Lake handles updates and deletes by copying entire data files, minus updates/deletes. Because of this we can only have one Split/UpdatablePageSource
-                // per file.
+                // per file. TODO (https://github.com/trinodb/trino/issues/17063) use deletion vectors instead of copy-on-write and remove DeltaLakeTableHandle.writeType
                 tableHandle.getWriteType().isEmpty() &&
                         // When only partitioning columns projected, there is no point splitting the files
                         mayAnyDataColumnProjected(tableHandle);
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeTableHandle.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeTableHandle.java
index 93cf0b89f01f..9be838ffe741 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeTableHandle.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeTableHandle.java
@@ -50,7 +50,6 @@ public enum WriteType
     private final TupleDomain<DeltaLakeColumnHandle> nonPartitionConstraint;
     private final Optional<WriteType> writeType;
     private final long readVersion;
-    private final boolean retriesEnabled;
 
     private final Optional<Set<DeltaLakeColumnHandle>> projectedColumns;
     // UPDATE only: The list of columns being updated
@@ -78,8 +77,7 @@ public DeltaLakeTableHandle(
             @JsonProperty("updatedColumns") Optional<List<DeltaLakeColumnHandle>> updatedColumns,
             @JsonProperty("updateRowIdColumns") Optional<List<DeltaLakeColumnHandle>> updateRowIdColumns,
             @JsonProperty("analyzeHandle") Optional<AnalyzeHandle> analyzeHandle,
-            @JsonProperty("readVersion") long readVersion,
-            @JsonProperty("retriesEnabled") boolean retriesEnabled)
+            @JsonProperty("readVersion") long readVersion)
     {
         this(
                 schemaName,
@@ -95,8 +93,7 @@ public DeltaLakeTableHandle(
                 analyzeHandle,
                 false,
                 Optional.empty(),
-                readVersion,
-                retriesEnabled);
+                readVersion);
     }
 
     public DeltaLakeTableHandle(
@@ -113,8 +110,7 @@ public DeltaLakeTableHandle(
             Optional<AnalyzeHandle> analyzeHandle,
             boolean recordScannedFiles,
             Optional<DataSize> maxScannedFileSize,
-            long readVersion,
-            boolean retriesEnabled)
+            long readVersion)
     {
         this.schemaName = requireNonNull(schemaName, "schemaName is null");
         this.tableName = requireNonNull(tableName, "tableName is null");
@@ -132,7 +128,6 @@ public DeltaLakeTableHandle(
         this.recordScannedFiles = recordScannedFiles;
         this.maxScannedFileSize = requireNonNull(maxScannedFileSize, "maxScannedFileSize is null");
         this.readVersion = readVersion;
-        this.retriesEnabled = retriesEnabled;
     }
 
     public DeltaLakeTableHandle withProjectedColumns(Set<DeltaLakeColumnHandle> projectedColumns)
@@ -149,8 +144,7 @@ public DeltaLakeTableHandle withProjectedColumns(Set<DeltaLakeColumnHandle> projectedColu
                 updatedColumns,
                 updateRowIdColumns,
                 analyzeHandle,
-                readVersion,
-                retriesEnabled);
+                readVersion);
     }
 
     public DeltaLakeTableHandle forOptimize(boolean recordScannedFiles, DataSize maxScannedFileSize)
@@ -169,8 +163,7 @@ public DeltaLakeTableHandle forOptimize(boolean recordScannedFiles, DataSize max
                 analyzeHandle,
                 recordScannedFiles,
                 Optional.of(maxScannedFileSize),
-                readVersion,
-                false);
+                readVersion);
     }
 
     @JsonProperty
@@ -258,12 +251,6 @@ public long getReadVersion()
         return readVersion;
     }
 
-    @JsonProperty
-    public boolean isRetriesEnabled()
-    {
-        return retriesEnabled;
-    }
-
     public SchemaTableName getSchemaTableName()
     {
         return new SchemaTableName(schemaName, tableName);
@@ -299,8 +286,7 @@ public boolean equals(Object o)
                 Objects.equals(updateRowIdColumns, that.updateRowIdColumns) &&
                 Objects.equals(analyzeHandle, that.analyzeHandle) &&
                 Objects.equals(maxScannedFileSize, that.maxScannedFileSize) &&
-                readVersion == that.readVersion &&
-                retriesEnabled == that.retriesEnabled;
+                readVersion == that.readVersion;
     }
 
     @Override
@@ -320,7 +306,6 @@ public int hashCode()
                 analyzeHandle,
                 recordScannedFiles,
                 maxScannedFileSize,
-                readVersion,
-                retriesEnabled);
+                readVersion);
     }
 }
diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeMetadata.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeMetadata.java
index 93df2d2a06b5..00e558937df6 100644
--- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeMetadata.java
+++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeMetadata.java
@@ -455,8 +455,7 @@ private static DeltaLakeTableHandle createDeltaLakeTableHandle(Set
                 Optional.of(ImmutableList.of(BOOLEAN_COLUMN_HANDLE)),
                 Optional.of(ImmutableList.of(DOUBLE_COLUMN_HANDLE)),
                 Optional.empty(),
-                0,
-                false);
+                0);
     }
 
     private static TupleDomain<DeltaLakeColumnHandle> createConstrainedColumnsTuple(
diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSplitManager.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSplitManager.java
index 933b939dbcac..4b5d5d81b7e6 100644
--- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSplitManager.java
+++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSplitManager.java
@@ -76,8 +76,7 @@ public class TestDeltaLakeSplitManager
             Optional.empty(),
             Optional.empty(),
             Optional.empty(),
-            0,
-            false);
+            0);
 
     @Test
     public void testInitialSplits()
diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestTransactionLogAccess.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestTransactionLogAccess.java
index 1d3dd45303e2..cc85e4d520d6 100644
--- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestTransactionLogAccess.java
+++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestTransactionLogAccess.java
@@ -153,8 +153,7 @@ private void setupTransactionLogAccess(String tableName, String tableLocation, D
                 Optional.empty(),
                 Optional.empty(),
                 Optional.empty(),
-                0,
-                false);
+                0);
 
         tableSnapshot = transactionLogAccess.loadSnapshot(tableHandle.getSchemaTableName(), tableLocation, SESSION);
     }
diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/metastore/TestDeltaLakeMetastoreStatistics.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/metastore/TestDeltaLakeMetastoreStatistics.java
index 4c71580fb6f5..0de94956133b 100644
--- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/metastore/TestDeltaLakeMetastoreStatistics.java
+++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/metastore/TestDeltaLakeMetastoreStatistics.java
@@ -159,8 +159,7 @@ private DeltaLakeTableHandle registerTable(String tableName, String directoryNam
                 Optional.empty(),
                 Optional.empty(),
                 Optional.empty(),
-                0,
-                false);
+                0);
     }
 
     @Test
@@ -289,8 +288,7 @@ public void testStatisticsMultipleFiles()
                 tableHandle.getUpdatedColumns(),
                 tableHandle.getUpdateRowIdColumns(),
                 tableHandle.getAnalyzeHandle(),
-                0,
-                false);
+                0);
         stats = deltaLakeMetastore.getTableStatistics(SESSION, tableHandleWithUnenforcedConstraint);
         columnStatistics = stats.getColumnStatistics().get(COLUMN_HANDLE);
         assertEquals(columnStatistics.getRange().get().getMin(), 0.0);
@@ -313,8 +311,7 @@ public void testStatisticsNoRecords()
                 tableHandle.getUpdatedColumns(),
                 tableHandle.getUpdateRowIdColumns(),
                 tableHandle.getAnalyzeHandle(),
-                0,
-                false);
+                0);
         DeltaLakeTableHandle tableHandleWithNoneUnenforcedConstraint = new DeltaLakeTableHandle(
                 tableHandle.getSchemaName(),
                 tableHandle.getTableName(),
@@ -327,8 +324,7 @@ public void testStatisticsNoRecords()
                 tableHandle.getUpdatedColumns(),
                 tableHandle.getUpdateRowIdColumns(),
                 tableHandle.getAnalyzeHandle(),
-                0,
-                false);
+                0);
         // If either the table handle's constraint or the provided Constraint are none, it will cause a 0 record count to be reported
         assertEmptyStats(deltaLakeMetastore.getTableStatistics(SESSION, tableHandleWithNoneEnforcedConstraint));
         assertEmptyStats(deltaLakeMetastore.getTableStatistics(SESSION, tableHandleWithNoneUnenforcedConstraint));
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
index ddceb2599c3b..c445f20ca17d 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
@@ -405,7 +405,6 @@ public IcebergTableHandle getTableHandle(
                 Optional.ofNullable(nameMappingJson),
                 table.location(),
                 table.properties(),
-                NO_RETRIES,
                 false,
                 Optional.empty());
     }
 
@@ -1992,8 +1991,10 @@ public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorT
     @Override
     public void finishMerge(ConnectorSession session, ConnectorMergeTableHandle mergeTableHandle, Collection<Slice> fragments, Collection<ComputedStatistics> computedStatistics)
     {
-        IcebergTableHandle handle = ((IcebergMergeTableHandle) mergeTableHandle).getTableHandle();
-        finishWrite(session, handle, fragments, true);
+        IcebergMergeTableHandle mergeHandle = (IcebergMergeTableHandle) mergeTableHandle;
+        IcebergTableHandle handle = mergeHandle.getTableHandle();
+        RetryMode retryMode = mergeHandle.getInsertTableHandle().getRetryMode();
+        finishWrite(session, handle, fragments, true, retryMode);
     }
 
     private static void verifyTableVersionForUpdate(IcebergTableHandle table)
@@ -2020,7 +2021,7 @@ public static void validateNotPartitionedByNestedField(Schema schema, PartitionS
         }
     }
 
-    private void finishWrite(ConnectorSession session, IcebergTableHandle table, Collection<Slice> fragments, boolean runUpdateValidations)
+    private void finishWrite(ConnectorSession session, IcebergTableHandle table, Collection<Slice> fragments, boolean runUpdateValidations, RetryMode retryMode)
     {
         Table icebergTable = transaction.table();
 
@@ -2113,7 +2114,7 @@ private void finishWrite(ConnectorSession session, IcebergTableHandle table, Col
         }
 
         // try to leave as little garbage as possible behind
-        if (table.getRetryMode() != NO_RETRIES) {
+        if (retryMode != NO_RETRIES) {
             cleanExtraOutputFiles(session, writtenFiles.build());
         }
 
@@ -2322,7 +2323,6 @@ else if (isMetadataColumnId(columnHandle.getId())) {
                         table.getNameMappingJson(),
                         table.getTableLocation(),
                         table.getStorageProperties(),
-                        table.getRetryMode(),
                         table.isRecordScannedFiles(),
                         table.getMaxScannedFileSize()),
                 remainingConstraint.transformKeys(ColumnHandle.class::cast),
@@ -2470,7 +2470,6 @@ public TableStatistics getTableStatistics(ConnectorSession session, ConnectorTab
                         originalHandle.getNameMappingJson(),
                         originalHandle.getTableLocation(),
                         originalHandle.getStorageProperties(),
-                        NO_RETRIES, // retry mode doesn't affect stats
                         originalHandle.isRecordScannedFiles(),
                         originalHandle.getMaxScannedFileSize()),
                 handle -> {
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableHandle.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableHandle.java
index 055dc7943fe5..d8c4edd903cf 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableHandle.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableHandle.java
@@ -49,7 +49,6 @@ public class IcebergTableHandle
     private final int formatVersion;
     private final String tableLocation;
    private final Map<String, String> storageProperties;
-    private final RetryMode retryMode;
 
     // Filter used during split generation and table scan, but not required to be strictly enforced by Iceberg Connector
     private final TupleDomain<IcebergColumnHandle> unenforcedPredicate;
@@ -79,8 +78,7 @@ public static IcebergTableHandle fromJsonForDeserializationOnly(
             @JsonProperty("projectedColumns") Set<IcebergColumnHandle> projectedColumns,
             @JsonProperty("nameMappingJson") Optional<String> nameMappingJson,
             @JsonProperty("tableLocation") String tableLocation,
-            @JsonProperty("storageProperties") Map<String, String> storageProperties,
-            @JsonProperty("retryMode") RetryMode retryMode)
+            @JsonProperty("storageProperties") Map<String, String> storageProperties)
     {
         return new IcebergTableHandle(
                 schemaName,
@@ -97,7 +95,6 @@ public static IcebergTableHandle fromJsonForDeserializationOnly(
                 nameMappingJson,
                 tableLocation,
                 storageProperties,
-                retryMode,
                 false,
                 Optional.empty());
     }
@@ -117,7 +114,6 @@ public IcebergTableHandle(
             Optional<String> nameMappingJson,
             String tableLocation,
             Map<String, String> storageProperties,
-            RetryMode retryMode,
             boolean recordScannedFiles,
             Optional<DataSize> maxScannedFileSize)
     {
@@ -135,7 +131,6 @@ public IcebergTableHandle(
         this.nameMappingJson = requireNonNull(nameMappingJson, "nameMappingJson is null");
         this.tableLocation = requireNonNull(tableLocation, "tableLocation is null");
         this.storageProperties = ImmutableMap.copyOf(requireNonNull(storageProperties, "storageProperties is null"));
-        this.retryMode = requireNonNull(retryMode, "retryMode is null");
         this.recordScannedFiles = recordScannedFiles;
         this.maxScannedFileSize = requireNonNull(maxScannedFileSize, "maxScannedFileSize is null");
     }
@@ -225,12 +220,6 @@ public Map<String, String> getStorageProperties()
         return storageProperties;
     }
 
-    @JsonProperty
-    public RetryMode getRetryMode()
-    {
-        return retryMode;
-    }
-
     @JsonIgnore
     public boolean isRecordScannedFiles()
     {
@@ -270,7 +259,6 @@ public IcebergTableHandle withProjectedColumns(Set<IcebergColumnHandle> projecte
                 nameMappingJson,
                 tableLocation,
                 storageProperties,
-                retryMode,
                 recordScannedFiles,
                 maxScannedFileSize);
     }
@@ -292,7 +280,6 @@ public IcebergTableHandle withRetryMode(RetryMode retryMode)
                 nameMappingJson,
                 tableLocation,
                 storageProperties,
-                retryMode,
                 recordScannedFiles,
                 maxScannedFileSize);
     }
@@ -314,7 +301,6 @@ public IcebergTableHandle forOptimize(boolean recordScannedFiles, DataSize maxSc
                 nameMappingJson,
                 tableLocation,
                 storageProperties,
-                retryMode,
                 recordScannedFiles,
                 Optional.of(maxScannedFileSize));
     }
@@ -344,7 +330,6 @@ public boolean equals(Object o)
                 Objects.equals(projectedColumns, that.projectedColumns) &&
                 Objects.equals(nameMappingJson, that.nameMappingJson) &&
                 Objects.equals(tableLocation, that.tableLocation) &&
-                Objects.equals(retryMode, that.retryMode) &&
                 Objects.equals(storageProperties, that.storageProperties) &&
                 Objects.equals(maxScannedFileSize, that.maxScannedFileSize);
     }
@@ -367,7 +352,6 @@ public int hashCode()
                 nameMappingJson,
                 tableLocation,
                 storageProperties,
-                retryMode,
                 recordScannedFiles,
                 maxScannedFileSize);
     }
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergNodeLocalDynamicSplitPruning.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergNodeLocalDynamicSplitPruning.java
index c325944dd0bd..3acb35d56685 100644
--- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergNodeLocalDynamicSplitPruning.java
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergNodeLocalDynamicSplitPruning.java
@@ -39,7 +39,6 @@
 import io.trino.spi.connector.ColumnHandle;
 import io.trino.spi.connector.ConnectorPageSource;
 import io.trino.spi.connector.DynamicFilter;
-import io.trino.spi.connector.RetryMode;
 import io.trino.spi.predicate.Domain;
 import io.trino.spi.predicate.TupleDomain;
 import io.trino.spi.type.Type;
@@ -189,7 +188,6 @@ private static ConnectorPageSource createTestingPageSource(HiveTransactionHandle
                         Optional.empty(),
                         tablePath,
                         ImmutableMap.of(),
-                        RetryMode.NO_RETRIES,
                         false,
                         Optional.empty()),
                 transaction);
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java
index 889b193d5e60..d66f7f16ac22 100644
--- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java
@@ -60,7 +60,6 @@
 import static io.trino.plugin.hive.metastore.cache.CachingHiveMetastore.memoizeMetastore;
 import static io.trino.plugin.hive.metastore.file.FileHiveMetastore.createTestingFileHiveMetastore;
 import static io.trino.spi.connector.Constraint.alwaysTrue;
-import static io.trino.spi.connector.RetryMode.NO_RETRIES;
 import static io.trino.spi.type.BigintType.BIGINT;
 import static io.trino.testing.TestingConnectorSession.SESSION;
 import static io.trino.tpch.TpchTable.NATION;
@@ -129,7 +128,6 @@ public void testIncompleteDynamicFilterTimeout()
                 Optional.empty(),
                 nationTable.location(),
                 nationTable.properties(),
-                NO_RETRIES,
                 false,
                 Optional.empty());
 
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/optimizer/TestConnectorPushdownRulesWithIceberg.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/optimizer/TestConnectorPushdownRulesWithIceberg.java
index c50a6bb24db2..ac763f1b40e0 100644
--- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/optimizer/TestConnectorPushdownRulesWithIceberg.java
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/optimizer/TestConnectorPushdownRulesWithIceberg.java
@@ -65,7 +65,6 @@
 import static io.trino.plugin.iceberg.ColumnIdentity.TypeCategory.STRUCT;
 import static io.trino.plugin.iceberg.ColumnIdentity.primitiveColumnIdentity;
 import static io.trino.plugin.iceberg.TableType.DATA;
-import static io.trino.spi.connector.RetryMode.NO_RETRIES;
 import static io.trino.spi.type.BigintType.BIGINT;
 import static io.trino.spi.type.IntegerType.INTEGER;
 import static io.trino.spi.type.RowType.field;
@@ -168,7 +167,6 @@ public void testProjectionPushdown()
                 Optional.empty(),
                 "",
                 ImmutableMap.of(),
-                NO_RETRIES,
                 false,
                 Optional.empty());
         TableHandle table = new TableHandle(catalogHandle, icebergTable, new HiveTransactionHandle(false));
@@ -250,7 +248,6 @@ public void testPredicatePushdown()
                 Optional.empty(),
                 "",
                 ImmutableMap.of(),
-                NO_RETRIES,
                 false,
                 Optional.empty());
         TableHandle table = new TableHandle(catalogHandle, icebergTable, new HiveTransactionHandle(false));
@@ -299,7 +296,6 @@ public void testColumnPruningProjectionPushdown()
                 Optional.empty(),
                 "",
                 ImmutableMap.of(),
-                NO_RETRIES,
                 false,
                 Optional.empty());
         TableHandle table = new TableHandle(catalogHandle, icebergTable, new HiveTransactionHandle(false));
@@ -359,7 +355,6 @@ public void testPushdownWithDuplicateExpressions()
                 Optional.empty(),
                 "",
                 ImmutableMap.of(),
-                NO_RETRIES,
                 false,
                 Optional.empty());
         TableHandle table = new TableHandle(catalogHandle, icebergTable, new HiveTransactionHandle(false));
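
The pattern above generalizes: retry behavior is a property of a single write operation, so it belongs on the insert/merge handle that exists only for that operation, while the table handle stays a pure description of what to read. A minimal sketch of that shape, using hypothetical MyTableHandle/MyInsertTableHandle/WriteFinisher types for illustration rather than Trino's actual SPI classes:

import static java.util.Objects.requireNonNull;

// Hypothetical stand-in; Trino's real io.trino.spi.connector.RetryMode is analogous.
enum RetryMode { NO_RETRIES, RETRIES_ENABLED }

// Read-side handle: describes what to scan. With no retry state, handle
// equality depends only on the data being read.
record MyTableHandle(String schemaName, String tableName, long readVersion) {}

// Write-side handle: created per insert/merge operation, so it is the
// natural owner of execution-time concerns such as the retry mode.
record MyInsertTableHandle(MyTableHandle table, RetryMode retryMode)
{
    MyInsertTableHandle
    {
        requireNonNull(table, "table is null");
        requireNonNull(retryMode, "retryMode is null");
    }
}

class WriteFinisher
{
    // Mirrors the finishWrite(...) change above: the retry mode arrives as an
    // explicit argument read off the write handle, not off the table handle.
    void finishWrite(MyInsertTableHandle insertHandle)
    {
        if (insertHandle.retryMode() != RetryMode.NO_RETRIES) {
            // Retried tasks may have left duplicate output files behind;
            // clean them up on a best-effort basis.
            cleanExtraOutputFiles(insertHandle.table());
        }
    }

    private void cleanExtraOutputFiles(MyTableHandle table) {}
}

With this split, the equals/hashCode hunks above fall out naturally: two handles for the same read compare equal regardless of whether the session runs with retries enabled.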