Remove table handle retriesEnabled field
The table handle describes reads, and retries do not matter for reads.
findepi committed Apr 18, 2023
1 parent 9b4fd7b commit 0e69a72
Showing 12 changed files with 26 additions and 77 deletions.
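
In rough terms, the change moves retry handling off the handle that describes a read and onto the handles that describe a write. The sketch below illustrates the intended shape with simplified stand-in types, not the actual Trino classes (which carry many more fields):

```java
// Minimal sketch with stand-in types: retry state lives only on the write-side
// (insert) handle; the table handle describes a read and knows nothing about retries.
record TableHandle(String schemaName, String tableName, long readVersion) {}

record InsertTableHandle(TableHandle table, boolean retriesEnabled) {}

record MergeTableHandle(TableHandle table, InsertTableHandle insertTableHandle) {}

class MergeFinisher
{
    public void finishMerge(MergeTableHandle mergeHandle)
    {
        // Decide on retry-aware cleanup from the insert handle carried by the merge
        // handle, instead of a retriesEnabled flag on the table handle.
        if (mergeHandle.insertTableHandle().retriesEnabled()) {
            cleanExtraOutputFiles(mergeHandle.table());
        }
        commitTransactionLogEntry(mergeHandle.table());
    }

    private void cleanExtraOutputFiles(TableHandle table)
    {
        // Remove output files written by failed attempts so retries leave no garbage behind.
    }

    private void commitTransactionLogEntry(TableHandle table)
    {
        // Record the new and removed data files in the table's transaction log.
    }
}
```

A side effect visible in the diff: plan-time rewrites of the table handle (applyFilter, statistics collection) no longer have to carry or reset a write-only flag.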
@@ -464,8 +464,7 @@ public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTable
Optional.empty(),
Optional.empty(),
Optional.empty(),
tableSnapshot.getVersion(),
false);
tableSnapshot.getVersion());
}

@Override
@@ -1503,7 +1502,7 @@ public void finishMerge(ConnectorSession session, ConnectorMergeTableHandle merg
List<DataFileInfo> newFiles = ImmutableList.copyOf(split.get(true));
List<DataFileInfo> cdfFiles = ImmutableList.copyOf(split.get(false));

if (handle.isRetriesEnabled()) {
if (mergeHandle.getInsertTableHandle().isRetriesEnabled()) {
cleanExtraOutputFilesForUpdate(session, handle.getLocation(), allFiles);
}

@@ -2166,8 +2165,7 @@ public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(C
tableHandle.getUpdatedColumns(),
tableHandle.getUpdateRowIdColumns(),
Optional.empty(),
tableHandle.getReadVersion(),
tableHandle.isRetriesEnabled());
tableHandle.getReadVersion());

if (tableHandle.getEnforcedPartitionConstraint().equals(newHandle.getEnforcedPartitionConstraint()) &&
tableHandle.getNonPartitionConstraint().equals(newHandle.getNonPartitionConstraint())) {
@@ -2288,8 +2286,7 @@ public ConnectorAnalyzeMetadata getStatisticsCollectionMetadata(ConnectorSession
Optional.empty(),
Optional.empty(),
Optional.of(analyzeHandle),
handle.getReadVersion(),
false);
handle.getReadVersion());

TableStatisticsMetadata statisticsMetadata = getStatisticsCollectionMetadata(
columnsMetadata.stream().map(DeltaLakeColumnMetadata::getColumnMetadata).collect(toImmutableList()),
@@ -139,7 +139,7 @@ private Stream<DeltaLakeSplit> getSplits(

boolean splittable =
// Delta Lake handles updates and deletes by copying entire data files, minus updates/deletes. Because of this we can only have one Split/UpdatablePageSource
// per file.
// per file. TODO (https://github.com/trinodb/trino/issues/17063) use deletion vectors instead of copy-on-write and remove DeltaLakeTableHandle.writeType
tableHandle.getWriteType().isEmpty() &&
// When only partitioning columns projected, there is no point splitting the files
mayAnyDataColumnProjected(tableHandle);
@@ -50,7 +50,6 @@ public enum WriteType
private final TupleDomain<DeltaLakeColumnHandle> nonPartitionConstraint;
private final Optional<WriteType> writeType;
private final long readVersion;
private final boolean retriesEnabled;

private final Optional<Set<ColumnHandle>> projectedColumns;
// UPDATE only: The list of columns being updated
@@ -78,8 +77,7 @@ public DeltaLakeTableHandle(
@JsonProperty("updatedColumns") Optional<List<DeltaLakeColumnHandle>> updatedColumns,
@JsonProperty("updateRowIdColumns") Optional<List<DeltaLakeColumnHandle>> updateRowIdColumns,
@JsonProperty("analyzeHandle") Optional<AnalyzeHandle> analyzeHandle,
@JsonProperty("readVersion") long readVersion,
@JsonProperty("retriesEnabled") boolean retriesEnabled)
@JsonProperty("readVersion") long readVersion)
{
this(
schemaName,
@@ -95,8 +93,7 @@ public DeltaLakeTableHandle(
analyzeHandle,
false,
Optional.empty(),
readVersion,
retriesEnabled);
readVersion);
}

public DeltaLakeTableHandle(
@@ -113,8 +110,7 @@ public DeltaLakeTableHandle(
Optional<AnalyzeHandle> analyzeHandle,
boolean recordScannedFiles,
Optional<DataSize> maxScannedFileSize,
long readVersion,
boolean retriesEnabled)
long readVersion)
{
this.schemaName = requireNonNull(schemaName, "schemaName is null");
this.tableName = requireNonNull(tableName, "tableName is null");
@@ -132,7 +128,6 @@ public DeltaLakeTableHandle(
this.recordScannedFiles = recordScannedFiles;
this.maxScannedFileSize = requireNonNull(maxScannedFileSize, "maxScannedFileSize is null");
this.readVersion = readVersion;
this.retriesEnabled = retriesEnabled;
}

public DeltaLakeTableHandle withProjectedColumns(Set<ColumnHandle> projectedColumns)
@@ -149,8 +144,7 @@ public DeltaLakeTableHandle withProjectedColumns(Set<ColumnHandle> projectedColu
updatedColumns,
updateRowIdColumns,
analyzeHandle,
readVersion,
retriesEnabled);
readVersion);
}

public DeltaLakeTableHandle forOptimize(boolean recordScannedFiles, DataSize maxScannedFileSize)
@@ -169,8 +163,7 @@ public DeltaLakeTableHandle forOptimize(boolean recordScannedFiles, DataSize max
analyzeHandle,
recordScannedFiles,
Optional.of(maxScannedFileSize),
readVersion,
false);
readVersion);
}

@JsonProperty
@@ -258,12 +251,6 @@ public long getReadVersion()
return readVersion;
}

@JsonProperty
public boolean isRetriesEnabled()
{
return retriesEnabled;
}

public SchemaTableName getSchemaTableName()
{
return new SchemaTableName(schemaName, tableName);
@@ -299,8 +286,7 @@ public boolean equals(Object o)
Objects.equals(updateRowIdColumns, that.updateRowIdColumns) &&
Objects.equals(analyzeHandle, that.analyzeHandle) &&
Objects.equals(maxScannedFileSize, that.maxScannedFileSize) &&
readVersion == that.readVersion &&
retriesEnabled == that.retriesEnabled;
readVersion == that.readVersion;
}

@Override
@@ -320,7 +306,6 @@ public int hashCode()
analyzeHandle,
recordScannedFiles,
maxScannedFileSize,
readVersion,
retriesEnabled);
readVersion);
}
}
@@ -455,8 +455,7 @@ private static DeltaLakeTableHandle createDeltaLakeTableHandle(Set<ColumnHandle>
Optional.of(ImmutableList.of(BOOLEAN_COLUMN_HANDLE)),
Optional.of(ImmutableList.of(DOUBLE_COLUMN_HANDLE)),
Optional.empty(),
0,
false);
0);
}

private static TupleDomain<DeltaLakeColumnHandle> createConstrainedColumnsTuple(
@@ -76,8 +76,7 @@ public class TestDeltaLakeSplitManager
Optional.empty(),
Optional.empty(),
Optional.empty(),
0,
false);
0);

@Test
public void testInitialSplits()
@@ -153,8 +153,7 @@ private void setupTransactionLogAccess(String tableName, String tableLocation, D
Optional.empty(),
Optional.empty(),
Optional.empty(),
0,
false);
0);

tableSnapshot = transactionLogAccess.loadSnapshot(tableHandle.getSchemaTableName(), tableLocation, SESSION);
}
@@ -159,8 +159,7 @@ private DeltaLakeTableHandle registerTable(String tableName, String directoryNam
Optional.empty(),
Optional.empty(),
Optional.empty(),
0,
false);
0);
}

@Test
@@ -289,8 +288,7 @@ public void testStatisticsMultipleFiles()
tableHandle.getUpdatedColumns(),
tableHandle.getUpdateRowIdColumns(),
tableHandle.getAnalyzeHandle(),
0,
false);
0);
stats = deltaLakeMetastore.getTableStatistics(SESSION, tableHandleWithUnenforcedConstraint);
columnStatistics = stats.getColumnStatistics().get(COLUMN_HANDLE);
assertEquals(columnStatistics.getRange().get().getMin(), 0.0);
@@ -313,8 +311,7 @@ public void testStatisticsNoRecords()
tableHandle.getUpdatedColumns(),
tableHandle.getUpdateRowIdColumns(),
tableHandle.getAnalyzeHandle(),
0,
false);
0);
DeltaLakeTableHandle tableHandleWithNoneUnenforcedConstraint = new DeltaLakeTableHandle(
tableHandle.getSchemaName(),
tableHandle.getTableName(),
@@ -327,8 +324,7 @@ public void testStatisticsNoRecords()
tableHandle.getUpdatedColumns(),
tableHandle.getUpdateRowIdColumns(),
tableHandle.getAnalyzeHandle(),
0,
false);
0);
// If either the table handle's constraint or the provided Constraint are none, it will cause a 0 record count to be reported
assertEmptyStats(deltaLakeMetastore.getTableStatistics(SESSION, tableHandleWithNoneEnforcedConstraint));
assertEmptyStats(deltaLakeMetastore.getTableStatistics(SESSION, tableHandleWithNoneUnenforcedConstraint));
@@ -405,7 +405,6 @@ public IcebergTableHandle getTableHandle(
Optional.ofNullable(nameMappingJson),
table.location(),
table.properties(),
NO_RETRIES,
false,
Optional.empty());
}
@@ -1992,8 +1991,10 @@ public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorT
@Override
public void finishMerge(ConnectorSession session, ConnectorMergeTableHandle mergeTableHandle, Collection<Slice> fragments, Collection<ComputedStatistics> computedStatistics)
{
IcebergTableHandle handle = ((IcebergMergeTableHandle) mergeTableHandle).getTableHandle();
finishWrite(session, handle, fragments, true);
IcebergMergeTableHandle mergeHandle = (IcebergMergeTableHandle) mergeTableHandle;
IcebergTableHandle handle = mergeHandle.getTableHandle();
RetryMode retryMode = mergeHandle.getInsertTableHandle().getRetryMode();
finishWrite(session, handle, fragments, true, retryMode);
}

private static void verifyTableVersionForUpdate(IcebergTableHandle table)
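
The Iceberg connector applies the same idea by threading the retry mode through as an argument: finishMerge reads it from the insert handle inside the merge handle and hands it to finishWrite, so IcebergTableHandle no longer needs a retryMode field. A rough sketch of that call shape, using simplified stand-in types rather than the real signatures (which also take the session, fragments, and a validation flag):

```java
// Simplified stand-in types; the SPI's RetryMode and the real handle classes differ.
class IcebergMergeSketch
{
    enum RetryMode { NO_RETRIES, RETRIES_ENABLED }

    record TableHandle(String schemaName, String tableName) {}
    record InsertTableHandle(TableHandle table, RetryMode retryMode) {}
    record MergeTableHandle(TableHandle table, InsertTableHandle insertTableHandle) {}

    void finishMerge(MergeTableHandle mergeHandle)
    {
        TableHandle table = mergeHandle.table();
        // Retry mode comes from the write-side insert handle, not from the table handle.
        RetryMode retryMode = mergeHandle.insertTableHandle().retryMode();
        finishWrite(table, retryMode);
    }

    void finishWrite(TableHandle table, RetryMode retryMode)
    {
        // ... commit the new data/delete files to the Iceberg transaction ...

        // Try to leave as little garbage as possible behind when retries are in play.
        if (retryMode != RetryMode.NO_RETRIES) {
            cleanExtraOutputFiles(table);
        }
    }

    private void cleanExtraOutputFiles(TableHandle table)
    {
        // Remove output files from failed attempts of retried tasks.
    }
}
```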
@@ -2020,7 +2021,7 @@ public static void validateNotPartitionedByNestedField(Schema schema, PartitionS
}
}

private void finishWrite(ConnectorSession session, IcebergTableHandle table, Collection<Slice> fragments, boolean runUpdateValidations)
private void finishWrite(ConnectorSession session, IcebergTableHandle table, Collection<Slice> fragments, boolean runUpdateValidations, RetryMode retryMode)
{
Table icebergTable = transaction.table();

@@ -2113,7 +2114,7 @@ private void finishWrite(ConnectorSession session, IcebergTableHandle table, Col
}

// try to leave as little garbage as possible behind
if (table.getRetryMode() != NO_RETRIES) {
if (retryMode != NO_RETRIES) {
cleanExtraOutputFiles(session, writtenFiles.build());
}

@@ -2322,7 +2323,6 @@ else if (isMetadataColumnId(columnHandle.getId())) {
table.getNameMappingJson(),
table.getTableLocation(),
table.getStorageProperties(),
table.getRetryMode(),
table.isRecordScannedFiles(),
table.getMaxScannedFileSize()),
remainingConstraint.transformKeys(ColumnHandle.class::cast),
@@ -2470,7 +2470,6 @@ public TableStatistics getTableStatistics(ConnectorSession session, ConnectorTab
originalHandle.getNameMappingJson(),
originalHandle.getTableLocation(),
originalHandle.getStorageProperties(),
NO_RETRIES, // retry mode doesn't affect stats
originalHandle.isRecordScannedFiles(),
originalHandle.getMaxScannedFileSize()),
handle -> {
@@ -49,7 +49,6 @@ public class IcebergTableHandle
private final int formatVersion;
private final String tableLocation;
private final Map<String, String> storageProperties;
private final RetryMode retryMode;

// Filter used during split generation and table scan, but not required to be strictly enforced by Iceberg Connector
private final TupleDomain<IcebergColumnHandle> unenforcedPredicate;
@@ -79,8 +78,7 @@ public static IcebergTableHandle fromJsonForDeserializationOnly(
@JsonProperty("projectedColumns") Set<IcebergColumnHandle> projectedColumns,
@JsonProperty("nameMappingJson") Optional<String> nameMappingJson,
@JsonProperty("tableLocation") String tableLocation,
@JsonProperty("storageProperties") Map<String, String> storageProperties,
@JsonProperty("retryMode") RetryMode retryMode)
@JsonProperty("storageProperties") Map<String, String> storageProperties)
{
return new IcebergTableHandle(
schemaName,
@@ -97,7 +95,6 @@ public static IcebergTableHandle fromJsonForDeserializationOnly(
nameMappingJson,
tableLocation,
storageProperties,
retryMode,
false,
Optional.empty());
}
@@ -117,7 +114,6 @@ public IcebergTableHandle(
Optional<String> nameMappingJson,
String tableLocation,
Map<String, String> storageProperties,
RetryMode retryMode,
boolean recordScannedFiles,
Optional<DataSize> maxScannedFileSize)
{
@@ -135,7 +131,6 @@ public IcebergTableHandle(
this.nameMappingJson = requireNonNull(nameMappingJson, "nameMappingJson is null");
this.tableLocation = requireNonNull(tableLocation, "tableLocation is null");
this.storageProperties = ImmutableMap.copyOf(requireNonNull(storageProperties, "storageProperties is null"));
this.retryMode = requireNonNull(retryMode, "retryMode is null");
this.recordScannedFiles = recordScannedFiles;
this.maxScannedFileSize = requireNonNull(maxScannedFileSize, "maxScannedFileSize is null");
}
@@ -225,12 +220,6 @@ public Map<String, String> getStorageProperties()
return storageProperties;
}

@JsonProperty
public RetryMode getRetryMode()
{
return retryMode;
}

@JsonIgnore
public boolean isRecordScannedFiles()
{
@@ -270,7 +259,6 @@ public IcebergTableHandle withProjectedColumns(Set<IcebergColumnHandle> projecte
nameMappingJson,
tableLocation,
storageProperties,
retryMode,
recordScannedFiles,
maxScannedFileSize);
}
@@ -292,7 +280,6 @@ public IcebergTableHandle withRetryMode(RetryMode retryMode)
nameMappingJson,
tableLocation,
storageProperties,
retryMode,
recordScannedFiles,
maxScannedFileSize);
}
@@ -314,7 +301,6 @@ public IcebergTableHandle forOptimize(boolean recordScannedFiles, DataSize maxSc
nameMappingJson,
tableLocation,
storageProperties,
retryMode,
recordScannedFiles,
Optional.of(maxScannedFileSize));
}
@@ -344,7 +330,6 @@ public boolean equals(Object o)
Objects.equals(projectedColumns, that.projectedColumns) &&
Objects.equals(nameMappingJson, that.nameMappingJson) &&
Objects.equals(tableLocation, that.tableLocation) &&
Objects.equals(retryMode, that.retryMode) &&
Objects.equals(storageProperties, that.storageProperties) &&
Objects.equals(maxScannedFileSize, that.maxScannedFileSize);
}
@@ -367,7 +352,6 @@ public int hashCode()
nameMappingJson,
tableLocation,
storageProperties,
retryMode,
recordScannedFiles,
maxScannedFileSize);
}
@@ -39,7 +39,6 @@
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ConnectorPageSource;
import io.trino.spi.connector.DynamicFilter;
import io.trino.spi.connector.RetryMode;
import io.trino.spi.predicate.Domain;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.type.Type;
@@ -189,7 +188,6 @@ private static ConnectorPageSource createTestingPageSource(HiveTransactionHandle
Optional.empty(),
tablePath,
ImmutableMap.of(),
RetryMode.NO_RETRIES,
false,
Optional.empty()),
transaction);
@@ -60,7 +60,6 @@
import static io.trino.plugin.hive.metastore.cache.CachingHiveMetastore.memoizeMetastore;
import static io.trino.plugin.hive.metastore.file.FileHiveMetastore.createTestingFileHiveMetastore;
import static io.trino.spi.connector.Constraint.alwaysTrue;
import static io.trino.spi.connector.RetryMode.NO_RETRIES;
import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.testing.TestingConnectorSession.SESSION;
import static io.trino.tpch.TpchTable.NATION;
@@ -129,7 +128,6 @@ public void testIncompleteDynamicFilterTimeout()
Optional.empty(),
nationTable.location(),
nationTable.properties(),
NO_RETRIES,
false,
Optional.empty());
