Remove table handle retriesEnabled field #17074

Merged
@@ -758,7 +758,7 @@ public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorT
}

@Override
public void finishMerge(ConnectorSession session, ConnectorMergeTableHandle tableHandle, Collection<Slice> fragments, Collection<ComputedStatistics> computedStatistics) {}
public void finishMerge(ConnectorSession session, ConnectorMergeTableHandle mergeTableHandle, Collection<Slice> fragments, Collection<ComputedStatistics> computedStatistics) {}

@Override
public ConnectorTableProperties getTableProperties(ConnectorSession session, ConnectorTableHandle table)
@@ -686,11 +686,11 @@ default ConnectorMergeTableHandle beginMerge(ConnectorSession session, Connector
* Finish a merge query
*
* @param session The session
* @param tableHandle A ConnectorMergeTableHandle for the table that is the target of the merge
* @param mergeTableHandle A ConnectorMergeTableHandle for the table that is the target of the merge
* @param fragments All fragments returned by the merge plan
* @param computedStatistics Statistics for the table, meaningful only to the connector that produced them.
*/
default void finishMerge(ConnectorSession session, ConnectorMergeTableHandle tableHandle, Collection<Slice> fragments, Collection<ComputedStatistics> computedStatistics)
default void finishMerge(ConnectorSession session, ConnectorMergeTableHandle mergeTableHandle, Collection<Slice> fragments, Collection<ComputedStatistics> computedStatistics)
{
throw new TrinoException(GENERIC_INTERNAL_ERROR, "ConnectorMetadata beginMerge() is implemented without finishMerge()");
}
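
Note (not part of the diff): to make the renamed parameter's role concrete, here is a minimal, hypothetical sketch of a connector overriding this SPI method. ExampleMetadata and the commit logic are invented for illustration, and all other ConnectorMetadata methods are omitted.

    import io.airlift.slice.Slice;
    import io.trino.spi.connector.ConnectorMergeTableHandle;
    import io.trino.spi.connector.ConnectorMetadata;
    import io.trino.spi.connector.ConnectorSession;
    import io.trino.spi.connector.ConnectorTableHandle;
    import io.trino.spi.statistics.ComputedStatistics;

    import java.util.Collection;

    public class ExampleMetadata
            implements ConnectorMetadata
    {
        @Override
        public void finishMerge(ConnectorSession session, ConnectorMergeTableHandle mergeTableHandle, Collection<Slice> fragments, Collection<ComputedStatistics> computedStatistics)
        {
            // The engine hands back the handle returned by beginMerge(); each fragment is a
            // connector-defined byte payload produced by the merge sinks.
            ConnectorTableHandle target = mergeTableHandle.getTableHandle();
            commitMergedFiles(target, fragments);
        }

        private void commitMergedFiles(ConnectorTableHandle target, Collection<Slice> fragments)
        {
            // hypothetical: decode the fragments and commit the new table state atomically
        }
    }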
@@ -1056,10 +1056,10 @@ public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorT
}

@Override
public void finishMerge(ConnectorSession session, ConnectorMergeTableHandle tableHandle, Collection<Slice> fragments, Collection<ComputedStatistics> computedStatistics)
public void finishMerge(ConnectorSession session, ConnectorMergeTableHandle mergeTableHandle, Collection<Slice> fragments, Collection<ComputedStatistics> computedStatistics)
{
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
delegate.finishMerge(session, tableHandle, fragments, computedStatistics);
delegate.finishMerge(session, mergeTableHandle, fragments, computedStatistics);
}
}
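
Note (not part of the diff): the wrapper above runs every delegated call with the connector's class loader installed as the thread context class loader. As a rough plain-Java sketch of what that save/set/restore pattern amounts to (illustrative only; the real ThreadContextClassLoader helper is an AutoCloseable that does the same work via try-with-resources):

    public final class WithContextClassLoader
    {
        private WithContextClassLoader() {}

        // Install the given class loader as the thread context class loader for the
        // duration of the action, then restore whatever was there before.
        public static void run(ClassLoader classLoader, Runnable action)
        {
            Thread thread = Thread.currentThread();
            ClassLoader previous = thread.getContextClassLoader();
            thread.setContextClassLoader(classLoader);
            try {
                action.run();
            }
            finally {
                thread.setContextClassLoader(previous);
            }
        }
    }

The delegation above is then roughly WithContextClassLoader.run(classLoader, () -> delegate.finishMerge(session, mergeTableHandle, fragments, computedStatistics)).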

@@ -351,7 +351,7 @@ public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorT
}

@Override
public void finishMerge(ConnectorSession session, ConnectorMergeTableHandle tableHandle, Collection<Slice> fragments, Collection<ComputedStatistics> computedStatistics) {}
public void finishMerge(ConnectorSession session, ConnectorMergeTableHandle mergeTableHandle, Collection<Slice> fragments, Collection<ComputedStatistics> computedStatistics) {}

@Override
public void truncateTable(ConnectorSession session, ConnectorTableHandle tableHandle) {}
@@ -464,8 +464,7 @@ public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTable
Optional.empty(),
Optional.empty(),
Optional.empty(),
tableSnapshot.getVersion(),
false);
tableSnapshot.getVersion());
}

@Override
@@ -1477,10 +1476,10 @@ public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorT
}

@Override
public void finishMerge(ConnectorSession session, ConnectorMergeTableHandle tableHandle, Collection<Slice> fragments, Collection<ComputedStatistics> computedStatistics)
public void finishMerge(ConnectorSession session, ConnectorMergeTableHandle mergeTableHandle, Collection<Slice> fragments, Collection<ComputedStatistics> computedStatistics)
{
DeltaLakeMergeTableHandle mergeTableHandle = (DeltaLakeMergeTableHandle) tableHandle;
DeltaLakeTableHandle handle = mergeTableHandle.getTableHandle();
DeltaLakeMergeTableHandle mergeHandle = (DeltaLakeMergeTableHandle) mergeTableHandle;
DeltaLakeTableHandle handle = mergeHandle.getTableHandle();

List<DeltaLakeMergeResult> mergeResults = fragments.stream()
.map(Slice::getBytes)
@@ -1497,14 +1496,13 @@ public void finishMerge(ConnectorSession session, ConnectorMergeTableHandle tabl
.flatMap(Optional::stream)
.collect(toImmutableList());

Map<Boolean, List<DataFileInfo>> splitted = allFiles.stream()
Map<Boolean, List<DataFileInfo>> split = allFiles.stream()
.collect(partitioningBy(dataFile -> dataFile.getDataFileType() == DATA));

List<DataFileInfo> newFiles = ImmutableList.copyOf(splitted.get(true));

List<DataFileInfo> cdfFiles = ImmutableList.copyOf(splitted.get(false));
List<DataFileInfo> newFiles = ImmutableList.copyOf(split.get(true));
List<DataFileInfo> cdfFiles = ImmutableList.copyOf(split.get(false));

if (handle.isRetriesEnabled()) {
if (mergeHandle.getInsertTableHandle().isRetriesEnabled()) {
cleanExtraOutputFilesForUpdate(session, handle.getLocation(), allFiles);
}
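
Note (not part of the diff): the partitioningBy collector above separates ordinary data files from CDF (change data feed) files in one pass; get(true) and get(false) are always present. A self-contained illustration of the idiom, with invented types and file names:

    import java.util.List;
    import java.util.Map;

    import static java.util.stream.Collectors.partitioningBy;

    public class PartitioningExample
    {
        enum FileType { DATA, CHANGE_DATA }

        record OutputFile(String path, FileType type) {}

        public static void main(String[] args)
        {
            List<OutputFile> allFiles = List.of(
                    new OutputFile("part-0001.parquet", FileType.DATA),
                    new OutputFile("cdf-0001.parquet", FileType.CHANGE_DATA),
                    new OutputFile("part-0002.parquet", FileType.DATA));

            // Key true -> files matching the predicate (data files), key false -> the rest (CDF files).
            // Both keys are always present, so get(true)/get(false) never return null.
            Map<Boolean, List<OutputFile>> split = allFiles.stream()
                    .collect(partitioningBy(file -> file.type() == FileType.DATA));

            List<OutputFile> dataFiles = split.get(true);   // part-0001, part-0002
            List<OutputFile> cdfFiles = split.get(false);   // cdf-0001
            System.out.println(dataFiles.size() + " data files, " + cdfFiles.size() + " CDF files");
        }
    }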

@@ -1539,7 +1537,7 @@ public void finishMerge(ConnectorSession session, ConnectorMergeTableHandle tabl
}

List<String> partitionColumns = getColumnMappingMode(handle.getMetadataEntry()) == ColumnMappingMode.NAME
? getPartitionColumnsForNameMapping(handle.getMetadataEntry().getOriginalPartitionColumns(), mergeTableHandle.getInsertTableHandle().getInputColumns())
? getPartitionColumnsForNameMapping(handle.getMetadataEntry().getOriginalPartitionColumns(), mergeHandle.getInsertTableHandle().getInputColumns())
: handle.getMetadataEntry().getOriginalPartitionColumns();
appendAddFileEntries(transactionLogWriter, newFiles, partitionColumns, true);

@@ -2167,8 +2165,7 @@ public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(C
tableHandle.getUpdatedColumns(),
tableHandle.getUpdateRowIdColumns(),
Optional.empty(),
tableHandle.getReadVersion(),
tableHandle.isRetriesEnabled());
tableHandle.getReadVersion());

if (tableHandle.getEnforcedPartitionConstraint().equals(newHandle.getEnforcedPartitionConstraint()) &&
tableHandle.getNonPartitionConstraint().equals(newHandle.getNonPartitionConstraint())) {
@@ -2289,8 +2286,7 @@ public ConnectorAnalyzeMetadata getStatisticsCollectionMetadata(ConnectorSession
Optional.empty(),
Optional.empty(),
Optional.of(analyzeHandle),
handle.getReadVersion(),
false);
handle.getReadVersion());

TableStatisticsMetadata statisticsMetadata = getStatisticsCollectionMetadata(
columnsMetadata.stream().map(DeltaLakeColumnMetadata::getColumnMetadata).collect(toImmutableList()),
@@ -139,7 +139,7 @@ private Stream<DeltaLakeSplit> getSplits(

boolean splittable =
// Delta Lake handles updates and deletes by copying entire data files, minus updates/deletes. Because of this we can only have one Split/UpdatablePageSource
// per file.
// per file. TODO (https://github.com/trinodb/trino/issues/17063) use deletion vectors instead of copy-on-write and remove DeltaLakeTableHandle.writeType
tableHandle.getWriteType().isEmpty() &&
// When only partitioning columns projected, there is no point splitting the files
mayAnyDataColumnProjected(tableHandle);
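
Note (not part of the diff): the comment explains why splitting is disabled while a copy-on-write UPDATE/DELETE is in flight; the whole file has to be rewritten through a single page source. A simplified, hypothetical sketch of that kind of gating (not the actual DeltaLakeSplitManager logic):

    import java.util.ArrayList;
    import java.util.List;

    final class SplitGatingExample
    {
        record FileRange(String path, long start, long length) {}

        // When splittable is false, the file is read (and potentially rewritten) through a
        // single range; otherwise it is chopped into maxSplitSize ranges.
        static List<FileRange> toSplits(String path, long fileSize, boolean splittable, long maxSplitSize)
        {
            if (!splittable) {
                return List.of(new FileRange(path, 0, fileSize));
            }
            List<FileRange> splits = new ArrayList<>();
            for (long start = 0; start < fileSize; start += maxSplitSize) {
                splits.add(new FileRange(path, start, Math.min(maxSplitSize, fileSize - start)));
            }
            return splits;
        }
    }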
@@ -50,7 +50,6 @@ public enum WriteType
private final TupleDomain<DeltaLakeColumnHandle> nonPartitionConstraint;
private final Optional<WriteType> writeType;
private final long readVersion;
private final boolean retriesEnabled;

private final Optional<Set<ColumnHandle>> projectedColumns;
// UPDATE only: The list of columns being updated
@@ -78,8 +77,7 @@ public DeltaLakeTableHandle(
@JsonProperty("updatedColumns") Optional<List<DeltaLakeColumnHandle>> updatedColumns,
@JsonProperty("updateRowIdColumns") Optional<List<DeltaLakeColumnHandle>> updateRowIdColumns,
@JsonProperty("analyzeHandle") Optional<AnalyzeHandle> analyzeHandle,
@JsonProperty("readVersion") long readVersion,
@JsonProperty("retriesEnabled") boolean retriesEnabled)
@JsonProperty("readVersion") long readVersion)
{
this(
schemaName,
@@ -95,8 +93,7 @@ public DeltaLakeTableHandle(
analyzeHandle,
false,
Optional.empty(),
readVersion,
retriesEnabled);
readVersion);
}

public DeltaLakeTableHandle(
@@ -113,8 +110,7 @@ public DeltaLakeTableHandle(
Optional<AnalyzeHandle> analyzeHandle,
boolean recordScannedFiles,
Optional<DataSize> maxScannedFileSize,
long readVersion,
boolean retriesEnabled)
long readVersion)
{
this.schemaName = requireNonNull(schemaName, "schemaName is null");
this.tableName = requireNonNull(tableName, "tableName is null");
@@ -132,7 +128,6 @@ public DeltaLakeTableHandle(
this.recordScannedFiles = recordScannedFiles;
this.maxScannedFileSize = requireNonNull(maxScannedFileSize, "maxScannedFileSize is null");
this.readVersion = readVersion;
this.retriesEnabled = retriesEnabled;
}

public DeltaLakeTableHandle withProjectedColumns(Set<ColumnHandle> projectedColumns)
@@ -149,8 +144,7 @@ public DeltaLakeTableHandle withProjectedColumns(Set<ColumnHandle> projectedColu
updatedColumns,
updateRowIdColumns,
analyzeHandle,
readVersion,
retriesEnabled);
readVersion);
}

public DeltaLakeTableHandle forOptimize(boolean recordScannedFiles, DataSize maxScannedFileSize)
@@ -169,8 +163,7 @@ public DeltaLakeTableHandle forOptimize(boolean recordScannedFiles, DataSize max
analyzeHandle,
recordScannedFiles,
Optional.of(maxScannedFileSize),
readVersion,
false);
readVersion);
}

@JsonProperty
@@ -210,7 +203,7 @@ public TupleDomain<DeltaLakeColumnHandle> getNonPartitionConstraint()
}

@JsonProperty
public Optional<DeltaLakeTableHandle.WriteType> getWriteType()
public Optional<WriteType> getWriteType()
{
return writeType;
}
@@ -258,12 +251,6 @@ public long getReadVersion()
return readVersion;
}

@JsonProperty
public boolean isRetriesEnabled()
{
return retriesEnabled;
}

public SchemaTableName getSchemaTableName()
{
return new SchemaTableName(schemaName, tableName);
@@ -299,8 +286,7 @@ public boolean equals(Object o)
Objects.equals(updateRowIdColumns, that.updateRowIdColumns) &&
Objects.equals(analyzeHandle, that.analyzeHandle) &&
Objects.equals(maxScannedFileSize, that.maxScannedFileSize) &&
readVersion == that.readVersion &&
retriesEnabled == that.retriesEnabled;
readVersion == that.readVersion;
}

@Override
@@ -320,7 +306,6 @@ public int hashCode()
analyzeHandle,
recordScannedFiles,
maxScannedFileSize,
readVersion,
retriesEnabled);
readVersion);
}
}
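
Note (not part of the diff): most of the churn in this file is the mechanical cost of removing one field from a JSON-serialized handle — the annotated constructor, the copy-style factories (withProjectedColumns, forOptimize), equals() and hashCode() all change together. A reduced sketch of the pattern with invented fields (not the real DeltaLakeTableHandle):

    import com.fasterxml.jackson.annotation.JsonCreator;
    import com.fasterxml.jackson.annotation.JsonProperty;

    import java.util.Objects;

    import static java.util.Objects.requireNonNull;

    public final class ExampleTableHandle
    {
        private final String schemaName;
        private final String tableName;
        private final long readVersion;

        @JsonCreator
        public ExampleTableHandle(
                @JsonProperty("schemaName") String schemaName,
                @JsonProperty("tableName") String tableName,
                @JsonProperty("readVersion") long readVersion)
        {
            this.schemaName = requireNonNull(schemaName, "schemaName is null");
            this.tableName = requireNonNull(tableName, "tableName is null");
            this.readVersion = readVersion;
        }

        @JsonProperty
        public String getSchemaName()
        {
            return schemaName;
        }

        @JsonProperty
        public String getTableName()
        {
            return tableName;
        }

        @JsonProperty
        public long getReadVersion()
        {
            return readVersion;
        }

        // Every serialized field participates in equals/hashCode; removing a field means
        // removing it here too, exactly as the diff above does for retriesEnabled.
        @Override
        public boolean equals(Object o)
        {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            ExampleTableHandle that = (ExampleTableHandle) o;
            return readVersion == that.readVersion &&
                    Objects.equals(schemaName, that.schemaName) &&
                    Objects.equals(tableName, that.tableName);
        }

        @Override
        public int hashCode()
        {
            return Objects.hash(schemaName, tableName, readVersion);
        }
    }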
@@ -455,8 +455,7 @@ private static DeltaLakeTableHandle createDeltaLakeTableHandle(Set<ColumnHandle>
Optional.of(ImmutableList.of(BOOLEAN_COLUMN_HANDLE)),
Optional.of(ImmutableList.of(DOUBLE_COLUMN_HANDLE)),
Optional.empty(),
0,
false);
0);
}

private static TupleDomain<DeltaLakeColumnHandle> createConstrainedColumnsTuple(
@@ -76,8 +76,7 @@ public class TestDeltaLakeSplitManager
Optional.empty(),
Optional.empty(),
Optional.empty(),
0,
false);
0);

@Test
public void testInitialSplits()
@@ -153,8 +153,7 @@ private void setupTransactionLogAccess(String tableName, String tableLocation, D
Optional.empty(),
Optional.empty(),
Optional.empty(),
0,
false);
0);

tableSnapshot = transactionLogAccess.loadSnapshot(tableHandle.getSchemaTableName(), tableLocation, SESSION);
}
@@ -159,8 +159,7 @@ private DeltaLakeTableHandle registerTable(String tableName, String directoryNam
Optional.empty(),
Optional.empty(),
Optional.empty(),
0,
false);
0);
}

@Test
@@ -289,8 +288,7 @@ public void testStatisticsMultipleFiles()
tableHandle.getUpdatedColumns(),
tableHandle.getUpdateRowIdColumns(),
tableHandle.getAnalyzeHandle(),
0,
false);
0);
stats = deltaLakeMetastore.getTableStatistics(SESSION, tableHandleWithUnenforcedConstraint);
columnStatistics = stats.getColumnStatistics().get(COLUMN_HANDLE);
assertEquals(columnStatistics.getRange().get().getMin(), 0.0);
@@ -313,8 +311,7 @@ public void testStatisticsNoRecords()
tableHandle.getUpdatedColumns(),
tableHandle.getUpdateRowIdColumns(),
tableHandle.getAnalyzeHandle(),
0,
false);
0);
DeltaLakeTableHandle tableHandleWithNoneUnenforcedConstraint = new DeltaLakeTableHandle(
tableHandle.getSchemaName(),
tableHandle.getTableName(),
@@ -327,8 +324,7 @@ public void testStatisticsNoRecords()
tableHandle.getUpdatedColumns(),
tableHandle.getUpdateRowIdColumns(),
tableHandle.getAnalyzeHandle(),
0,
false);
0);
// If either the table handle's constraint or the provided Constraint are none, it will cause a 0 record count to be reported
assertEmptyStats(deltaLakeMetastore.getTableStatistics(SESSION, tableHandleWithNoneEnforcedConstraint));
assertEmptyStats(deltaLakeMetastore.getTableStatistics(SESSION, tableHandleWithNoneUnenforcedConstraint));
@@ -1930,9 +1930,9 @@ public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorT
}

@Override
public void finishMerge(ConnectorSession session, ConnectorMergeTableHandle tableHandle, Collection<Slice> fragments, Collection<ComputedStatistics> computedStatistics)
public void finishMerge(ConnectorSession session, ConnectorMergeTableHandle mergeTableHandle, Collection<Slice> fragments, Collection<ComputedStatistics> computedStatistics)
{
HiveMergeTableHandle mergeHandle = (HiveMergeTableHandle) tableHandle;
HiveMergeTableHandle mergeHandle = (HiveMergeTableHandle) mergeTableHandle;
HiveInsertTableHandle insertHandle = mergeHandle.getInsertHandle();
HiveTableHandle handle = mergeHandle.getTableHandle();
checkArgument(handle.isAcidMerge(), "handle should be a merge handle, but is %s", handle);
@@ -405,7 +405,6 @@ public IcebergTableHandle getTableHandle(
Optional.ofNullable(nameMappingJson),
table.location(),
table.properties(),
NO_RETRIES,
false,
Optional.empty());
}
@@ -1990,10 +1989,12 @@ public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorT
}

@Override
public void finishMerge(ConnectorSession session, ConnectorMergeTableHandle tableHandle, Collection<Slice> fragments, Collection<ComputedStatistics> computedStatistics)
public void finishMerge(ConnectorSession session, ConnectorMergeTableHandle mergeTableHandle, Collection<Slice> fragments, Collection<ComputedStatistics> computedStatistics)
{
IcebergTableHandle handle = ((IcebergMergeTableHandle) tableHandle).getTableHandle();
finishWrite(session, handle, fragments, true);
IcebergMergeTableHandle mergeHandle = (IcebergMergeTableHandle) mergeTableHandle;
IcebergTableHandle handle = mergeHandle.getTableHandle();
RetryMode retryMode = mergeHandle.getInsertTableHandle().getRetryMode();
finishWrite(session, handle, fragments, true, retryMode);
}
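
Note (not part of the diff): this is the core of the change for Iceberg, mirroring the Delta Lake hunk above — retry behavior is a property of the write started by beginMerge(), so it is read from the insert handle and passed explicitly into finishWrite() instead of living on the scan-time table handle. A simplified, hypothetical sketch of that flow (names and types invented; the SPI RetryMode enum is only approximated here):

    import java.util.List;

    final class RetryModeFlowExample
    {
        // Loose stand-in for the engine's RetryMode enum
        enum RetryMode { NO_RETRIES, RETRIES_ENABLED }

        record InsertHandle(String tableLocation, RetryMode retryMode) {}

        record MergeHandle(String tableName, InsertHandle insertHandle) {}

        static void finishMerge(MergeHandle mergeHandle, List<byte[]> fragments)
        {
            // Retry mode travels with the write, not with the table handle.
            RetryMode retryMode = mergeHandle.insertHandle().retryMode();
            finishWrite(mergeHandle.tableName(), fragments, retryMode);
        }

        static void finishWrite(String tableName, List<byte[]> fragments, RetryMode retryMode)
        {
            // ... commit the new data/delete files described by the fragments ...
            if (retryMode != RetryMode.NO_RETRIES) {
                // Fault-tolerant execution may have left duplicate output from retried tasks.
                cleanExtraOutputFiles(tableName);
            }
        }

        static void cleanExtraOutputFiles(String tableName)
        {
            // hypothetical cleanup of uncommitted files
        }
    }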

private static void verifyTableVersionForUpdate(IcebergTableHandle table)
@@ -2020,7 +2021,7 @@ public static void validateNotPartitionedByNestedField(Schema schema, PartitionS
}
}

private void finishWrite(ConnectorSession session, IcebergTableHandle table, Collection<Slice> fragments, boolean runUpdateValidations)
private void finishWrite(ConnectorSession session, IcebergTableHandle table, Collection<Slice> fragments, boolean runUpdateValidations, RetryMode retryMode)
{
Table icebergTable = transaction.table();

@@ -2113,7 +2114,7 @@ private void finishWrite(ConnectorSession session, IcebergTableHandle table, Col
}

// try to leave as little garbage as possible behind
if (table.getRetryMode() != NO_RETRIES) {
if (retryMode != NO_RETRIES) {
cleanExtraOutputFiles(session, writtenFiles.build());
}
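
Note (not part of the diff): the cleanup guarded here exists because, with retries enabled, failed-and-retried tasks can leave output files behind that the committed snapshot never references. A simplified, hypothetical sketch of such a cleanup over a local directory (the real cleanExtraOutputFiles in IcebergMetadata works against the table's file system abstraction and is more careful about which files it considers):

    import java.io.IOException;
    import java.io.UncheckedIOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.Set;
    import java.util.stream.Stream;

    final class ExtraOutputFileCleaner
    {
        private ExtraOutputFileCleaner() {}

        // Delete every file in the write directory that is not part of the committed set.
        static void cleanExtraOutputFiles(Path writeDirectory, Set<String> committedFileNames)
        {
            try (Stream<Path> files = Files.list(writeDirectory)) {
                files.filter(file -> !committedFileNames.contains(file.getFileName().toString()))
                        .forEach(ExtraOutputFileCleaner::deleteQuietly);
            }
            catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }

        private static void deleteQuietly(Path file)
        {
            try {
                Files.deleteIfExists(file);
            }
            catch (IOException e) {
                // best effort: leftover files are garbage, not a correctness issue
            }
        }
    }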

@@ -2322,7 +2323,6 @@ else if (isMetadataColumnId(columnHandle.getId())) {
table.getNameMappingJson(),
table.getTableLocation(),
table.getStorageProperties(),
table.getRetryMode(),
table.isRecordScannedFiles(),
table.getMaxScannedFileSize()),
remainingConstraint.transformKeys(ColumnHandle.class::cast),
@@ -2470,7 +2470,6 @@ public TableStatistics getTableStatistics(ConnectorSession session, ConnectorTab
originalHandle.getNameMappingJson(),
originalHandle.getTableLocation(),
originalHandle.getStorageProperties(),
NO_RETRIES, // retry mode doesn't affect stats
originalHandle.isRecordScannedFiles(),
originalHandle.getMaxScannedFileSize()),
handle -> {