diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index e8c88dc6b792..e40ab426f913 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -383,6 +383,7 @@ jobs:
- { modules: plugin/trino-cassandra }
- { modules: plugin/trino-clickhouse }
- { modules: plugin/trino-delta-lake }
+ - { modules: plugin/trino-delta-lake, profile: test-failure-recovery }
- { modules: plugin/trino-hive }
- { modules: plugin/trino-hive, profile: test-parquet }
- { modules: plugin/trino-hive, profile: test-failure-recovery }
diff --git a/plugin/trino-delta-lake/pom.xml b/plugin/trino-delta-lake/pom.xml
index ce11d1c5e3a1..570e1e0d3d8b 100644
--- a/plugin/trino-delta-lake/pom.xml
+++ b/plugin/trino-delta-lake/pom.xml
@@ -197,6 +197,18 @@
+        <dependency>
+            <groupId>io.trino</groupId>
+            <artifactId>trino-exchange</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>io.trino</groupId>
+            <artifactId>trino-exchange</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>

         <dependency>
             <groupId>io.trino</groupId>
@@ -243,6 +255,12 @@
             <scope>test</scope>
         </dependency>

+        <dependency>
+            <groupId>io.trino</groupId>
+            <artifactId>trino-testing-containers</artifactId>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>io.trino</groupId>
             <artifactId>trino-testing-services</artifactId>
@@ -289,6 +307,18 @@
             <artifactId>azure-storage-blob</artifactId>
             <version>12.10.0</version>
             <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>io.netty</groupId>
+                    <artifactId>netty-transport-classes-epoll</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>io.netty</groupId>
+                    <artifactId>netty-transport-native-epoll</artifactId>
+                </exclusion>
+            </exclusions>
@@ -380,9 +410,22 @@
                         <exclude>**/TestDeltaLakeAdlsStorage.java</exclude>
                         <exclude>**/TestDeltaLakeAdlsConnectorSmokeTest.java</exclude>
                         <exclude>**/TestDeltaLakeGlueMetastore.java</exclude>
+                        <exclude>**/TestDelta*FailureRecoveryTest.java</exclude>
                     </excludes>
                 </configuration>
             </plugin>
+            <plugin>
+                <groupId>org.basepom.maven</groupId>
+                <artifactId>duplicate-finder-maven-plugin</artifactId>
+                <configuration>
+                    <ignoredResourcePatterns>
+                        <ignoredResourcePattern>mime.types</ignoredResourcePattern>
+                        <ignoredResourcePattern>about.html</ignoredResourcePattern>
+                    </ignoredResourcePatterns>
+                </configuration>
+            </plugin>
@@ -405,5 +448,25 @@
+    <profiles>
+        <profile>
+            <id>test-failure-recovery</id>
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-surefire-plugin</artifactId>
+                        <configuration>
+                            <threadCount>4</threadCount>
+                            <includes>
+                                <include>**/TestDelta*FailureRecoveryTest.java</include>
+                            </includes>
+                        </configuration>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+    </profiles>
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeInsertTableHandle.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeInsertTableHandle.java
index 629da2a766ee..95425a4ae400 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeInsertTableHandle.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeInsertTableHandle.java
@@ -32,6 +32,7 @@ public class DeltaLakeInsertTableHandle
private final MetadataEntry metadataEntry;
private final List<DeltaLakeColumnHandle> inputColumns;
private final long readVersion;
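+ // set when the originating query runs with retries enabled (retry mode other than NO_RETRIES);
+ // finishInsert uses it to clean up output files left behind by failed task attempts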
+ private final boolean retriesEnabled;
@JsonCreator
public DeltaLakeInsertTableHandle(
@@ -40,7 +41,8 @@ public DeltaLakeInsertTableHandle(
@JsonProperty("location") String location,
@JsonProperty("metadataEntry") MetadataEntry metadataEntry,
@JsonProperty("inputColumns") List inputColumns,
- @JsonProperty("readVersion") long readVersion)
+ @JsonProperty("readVersion") long readVersion,
+ @JsonProperty("retriesEnabled") boolean retriesEnabled)
{
this.schemaName = requireNonNull(schemaName, "schemaName is null");
this.tableName = requireNonNull(tableName, "tableName is null");
@@ -48,6 +50,7 @@ public DeltaLakeInsertTableHandle(
this.inputColumns = ImmutableList.copyOf(inputColumns);
this.location = requireNonNull(location, "location is null");
this.readVersion = readVersion;
+ this.retriesEnabled = retriesEnabled;
}
@JsonProperty
@@ -85,4 +88,10 @@ public long getReadVersion()
{
return readVersion;
}
+
+ @JsonProperty
+ public boolean isRetriesEnabled()
+ {
+ return retriesEnabled;
+ }
}
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java
index b715c8508dcc..89ef27e873ac 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java
@@ -74,6 +74,7 @@
import io.trino.spi.connector.Constraint;
import io.trino.spi.connector.ConstraintApplicationResult;
import io.trino.spi.connector.ProjectionApplicationResult;
+import io.trino.spi.connector.RetryMode;
import io.trino.spi.connector.SchemaNotFoundException;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.SchemaTablePrefix;
@@ -99,6 +100,7 @@
import io.trino.spi.type.VarcharType;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
@@ -107,9 +109,12 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.time.Instant;
+import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Collections;
+import java.util.Deque;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@@ -167,6 +172,7 @@
import static io.trino.spi.StandardErrorCode.INVALID_SCHEMA_PROPERTY;
import static io.trino.spi.StandardErrorCode.INVALID_TABLE_PROPERTY;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
+import static io.trino.spi.connector.RetryMode.NO_RETRIES;
import static io.trino.spi.connector.SchemaTableName.schemaTableName;
import static io.trino.spi.predicate.Range.greaterThanOrEqual;
import static io.trino.spi.predicate.Range.lessThanOrEqual;
@@ -345,7 +351,8 @@ public DeltaLakeTableHandle getTableHandle(ConnectorSession session, SchemaTable
Optional.empty(),
Optional.empty(),
Optional.empty(),
- tableSnapshot.getVersion());
+ tableSnapshot.getVersion(),
+ false);
}
@Override
@@ -675,7 +682,7 @@ private Path getExternalPath(HdfsContext context, String location)
}
@Override
- public DeltaLakeOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Optional layout)
+ public DeltaLakeOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Optional layout, RetryMode retryMode)
{
validateTableColumns(tableMetadata);
@@ -941,7 +948,7 @@ private static void appendAddFileEntries(TransactionLogWriter transactionLogWrit
}
@Override
- public ConnectorInsertTableHandle beginInsert(ConnectorSession session, ConnectorTableHandle tableHandle, List<ColumnHandle> columns)
+ public ConnectorInsertTableHandle beginInsert(ConnectorSession session, ConnectorTableHandle tableHandle, List<ColumnHandle> columns, RetryMode retryMode)
{
DeltaLakeTableHandle table = (DeltaLakeTableHandle) tableHandle;
if (!allowWrite(session, table)) {
@@ -968,7 +975,8 @@ public ConnectorInsertTableHandle beginInsert(ConnectorSession session, Connecto
tableLocation,
table.getMetadataEntry(),
inputColumns,
- getMandatoryCurrentVersion(fileSystem, new Path(tableLocation)));
+ getMandatoryCurrentVersion(fileSystem, new Path(tableLocation)),
+ retryMode != NO_RETRIES);
}
catch (IOException e) {
throw new TrinoException(GENERIC_INTERNAL_ERROR, e);
@@ -1003,6 +1011,10 @@ public Optional finishInsert(
.map(dataFileInfoCodec::fromJson)
.collect(toImmutableList());
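+ // with retries enabled, failed or speculative task attempts may have written data files that are not part of
+ // this commit; drop every file created by this query that is not in the committed set before logging the insert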
+ if (handle.isRetriesEnabled()) {
+ cleanExtraOutputFiles(session, handle.getLocation(), dataFileInfos);
+ }
+
boolean writeCommitted = false;
try {
TransactionLogWriter transactionLogWriter = transactionLogWriterFactory.newWriter(session, handle.getLocation());
@@ -1058,7 +1070,7 @@ public ColumnHandle getDeleteRowIdColumnHandle(ConnectorSession session, Connect
}
@Override
- public ConnectorTableHandle beginDelete(ConnectorSession session, ConnectorTableHandle tableHandle)
+ public ConnectorTableHandle beginDelete(ConnectorSession session, ConnectorTableHandle tableHandle, RetryMode retryMode)
{
DeltaLakeTableHandle handle = (DeltaLakeTableHandle) tableHandle;
if (!allowWrite(session, handle)) {
@@ -1075,7 +1087,8 @@ public ConnectorTableHandle beginDelete(ConnectorSession session, ConnectorTable
handle.getEnforcedPartitionConstraint(),
handle.getNonPartitionConstraint(),
handle.getProjectedColumns(),
- handle.getReadVersion());
+ handle.getReadVersion(),
+ retryMode != NO_RETRIES);
}
@Override
@@ -1109,7 +1122,7 @@ public ColumnHandle getUpdateRowIdColumnHandle(ConnectorSession session, Connect
}
@Override
- public ConnectorTableHandle beginUpdate(ConnectorSession session, ConnectorTableHandle tableHandle, List<ColumnHandle> updatedColumns)
+ public ConnectorTableHandle beginUpdate(ConnectorSession session, ConnectorTableHandle tableHandle, List<ColumnHandle> updatedColumns, RetryMode retryMode)
{
DeltaLakeTableHandle handle = (DeltaLakeTableHandle) tableHandle;
if (!allowWrite(session, handle)) {
@@ -1141,7 +1154,8 @@ public ConnectorTableHandle beginUpdate(ConnectorSession session, ConnectorTable
handle.getProjectedColumns(),
updatedColumnHandles,
unmodifiedColumns,
- handle.getReadVersion());
+ handle.getReadVersion(),
+ retryMode != NO_RETRIES);
}
@Override
@@ -1155,7 +1169,8 @@ public Optional getTableHandleForExecute(
ConnectorSession session,
ConnectorTableHandle connectorTableHandle,
String procedureName,
- Map<String, Object> executeProperties)
+ Map<String, Object> executeProperties,
+ RetryMode retryMode)
{
DeltaLakeTableHandle tableHandle = (DeltaLakeTableHandle) connectorTableHandle;
@@ -1169,13 +1184,13 @@ public Optional getTableHandleForExecute(
switch (procedureId) {
case OPTIMIZE:
- return getTableHandleForOptimize(tableHandle, executeProperties);
+ return getTableHandleForOptimize(tableHandle, executeProperties, retryMode);
}
throw new IllegalArgumentException("Unknown procedure: " + procedureId);
}
- private Optional<ConnectorTableExecuteHandle> getTableHandleForOptimize(DeltaLakeTableHandle tableHandle, Map<String, Object> executeProperties)
+ private Optional<ConnectorTableExecuteHandle> getTableHandleForOptimize(DeltaLakeTableHandle tableHandle, Map<String, Object> executeProperties, RetryMode retryMode)
{
DataSize maxScannedFileSize = (DataSize) executeProperties.get("file_size_threshold");
@@ -1191,7 +1206,8 @@ private Optional getTableHandleForOptimize(DeltaLak
columns,
tableHandle.getMetadataEntry().getOriginalPartitionColumns(),
maxScannedFileSize,
- Optional.empty()),
+ Optional.empty(),
+ retryMode != NO_RETRIES),
tableHandle.getLocation()));
}
@@ -1286,6 +1302,10 @@ private void finishOptimize(ConnectorSession session, DeltaLakeTableExecuteHandl
.map(dataFileInfoCodec::fromJson)
.collect(toImmutableList());
+ if (optimizeHandle.isRetriesEnabled()) {
+ cleanExtraOutputFiles(session, executeHandle.getTableLocation(), dataFileInfos);
+ }
+
boolean writeCommitted = false;
try {
TransactionLogWriter transactionLogWriter = transactionLogWriterFactory.newWriter(session, tableLocation);
@@ -1372,6 +1392,10 @@ private void finishWrite(ConnectorSession session, ConnectorTableHandle tableHan
.map(deleteResultJsonCodec::fromJson)
.collect(toImmutableList());
+ if (handle.isRetriesEnabled()) {
+ cleanExtraOutputFilesForUpdate(session, handle.getLocation(), updateResults);
+ }
+
String tableLocation = metastore.getTableLocation(handle.getSchemaTableName(), session);
DeltaLakeTableHandle.WriteType writeType = handle.getWriteType().orElseThrow();
@@ -1589,7 +1613,8 @@ public Optional> applyFilter(C
tableHandle.getUpdatedColumns(),
tableHandle.getUpdateRowIdColumns(),
Optional.empty(),
- tableHandle.getReadVersion());
+ tableHandle.getReadVersion(),
+ tableHandle.isRetriesEnabled());
if (tableHandle.getEnforcedPartitionConstraint().equals(newHandle.getEnforcedPartitionConstraint()) &&
tableHandle.getNonPartitionConstraint().equals(newHandle.getNonPartitionConstraint())) {
@@ -1710,7 +1735,8 @@ public ConnectorTableHandle getTableHandleForStatisticsCollection(ConnectorSessi
Optional.empty(),
Optional.empty(),
Optional.of(analyzeHandle),
- version);
+ version,
+ false);
}
@Override
@@ -1814,6 +1840,96 @@ public void finishStatisticsCollection(ConnectorSession session, ConnectorTableH
statisticsAccess.updateDeltaLakeStatistics(session, location, mergedDeltaLakeStatistics);
}
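+
+ // Removes files under the table location that were written by this query but are not part of the committed
+ // file set, typically leftovers of failed or retried task attempts.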
+ private void cleanExtraOutputFiles(ConnectorSession session, String baseLocation, List<DataFileInfo> validDataFiles)
+ {
+ Set<String> writtenFilePaths = validDataFiles.stream()
+ .map(dataFileInfo -> baseLocation + "/" + dataFileInfo.getPath())
+ .collect(toImmutableSet());
+
+ cleanExtraOutputFiles(session, writtenFilePaths);
+ }
+
+ private void cleanExtraOutputFilesForUpdate(ConnectorSession session, String baseLocation, List<DeltaLakeUpdateResult> validUpdateResults)
+ {
+ Set<String> writtenFilePaths = validUpdateResults.stream()
+ .map(DeltaLakeUpdateResult::getNewFile)
+ .filter(Optional::isPresent)
+ .map(Optional::get)
+ .map(dataFileInfo -> baseLocation + "/" + dataFileInfo.getPath())
+ .collect(toImmutableSet());
+
+ cleanExtraOutputFiles(session, writtenFilePaths);
+ }
+
+ private void cleanExtraOutputFiles(ConnectorSession session, Set<String> validWrittenFilePaths)
+ {
+ HdfsContext hdfsContext = new HdfsContext(session);
+
+ Set<String> fileLocations = validWrittenFilePaths.stream()
+ .map(path -> {
+ int fileNameSeparatorPos = path.lastIndexOf("/");
+ verify(fileNameSeparatorPos != -1 && fileNameSeparatorPos != 0, "invalid data file path: %s", path);
+ return path.substring(0, fileNameSeparatorPos);
+ })
+ .collect(toImmutableSet());
+
+ for (String location : fileLocations) {
+ cleanExtraOutputFiles(hdfsContext, session.getQueryId(), location, validWrittenFilePaths);
+ }
+ }
+
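+ // Lists the given directory (non-recursively) and deletes every file carrying this query's id prefix
+ // that is not present in filesToKeep.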
+ private void cleanExtraOutputFiles(HdfsContext hdfsContext, String queryId, String location, Set<String> filesToKeep)
+ {
+ Deque<String> filesToDelete = new ArrayDeque<>();
+ try {
+ LOG.debug("Deleting failed attempt files from %s for query %s", location, queryId);
+ FileSystem fileSystem = hdfsEnvironment.getFileSystem(hdfsContext, new Path(location));
+ if (!fileSystem.exists(new Path(location))) {
+ // directory may not exist if no files were actually written
+ return;
+ }
+
+ // data files are written flat (not nested) into the location, so a non-recursive listing is sufficient
+ RemoteIterator<LocatedFileStatus> iterator = fileSystem.listFiles(new Path(location), false);
+ while (iterator.hasNext()) {
+ Path file = iterator.next().getPath();
+ if (isFileCreatedByQuery(file.getName(), queryId) && !filesToKeep.contains(location + "/" + file.getName())) {
+ filesToDelete.add(file.getName());
+ }
+ }
+
+ if (filesToDelete.isEmpty()) {
+ return;
+ }
+
+ LOG.info("Found %s files to delete and %s to retain in location %s for query %s", filesToDelete.size(), filesToKeep.size(), location, queryId);
+ ImmutableList.Builder<String> deletedFilesBuilder = ImmutableList.builder();
+ Iterator<String> filesToDeleteIterator = filesToDelete.iterator();
+ while (filesToDeleteIterator.hasNext()) {
+ String fileName = filesToDeleteIterator.next();
+ LOG.debug("Deleting failed attempt file %s/%s for query %s", location, fileName, queryId);
+ fileSystem.delete(new Path(location, fileName), false);
+ deletedFilesBuilder.add(fileName);
+ filesToDeleteIterator.remove();
+ }
+
+ List<String> deletedFiles = deletedFilesBuilder.build();
+ if (!deletedFiles.isEmpty()) {
+ LOG.info("Deleted failed attempt files %s from %s for query %s", deletedFiles, location, queryId);
+ }
+ }
+ catch (IOException e) {
+ throw new TrinoException(GENERIC_INTERNAL_ERROR,
+ format("Could not clean up extraneous output files; remaining files: %s", filesToDelete), e);
+ }
+ }
+
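+ // Writer file names are prefixed with the query id followed by a hyphen, and query ids never contain hyphens,
+ // so a simple prefix check identifies the query that created a file.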
+ private boolean isFileCreatedByQuery(String fileName, String queryId)
+ {
+ verify(!queryId.contains("-"), "queryId(%s) should not contain hyphens", queryId);
+ return fileName.startsWith(queryId + "-");
+ }
+
private static Map toDeltaLakeColumnStatistics(Collection computedStatistics)
{
// Only statistics for whole table are collected
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeTableHandle.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeTableHandle.java
index 3e402be12a7c..d757deac6a42 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeTableHandle.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeTableHandle.java
@@ -52,6 +52,7 @@ public enum WriteType
private final TupleDomain<DeltaLakeColumnHandle> nonPartitionConstraint;
private final Optional<WriteType> writeType;
private final long readVersion;
+ private final boolean retriesEnabled;
private final Optional<Set<DeltaLakeColumnHandle>> projectedColumns;
// UPDATE only: The list of columns being updated
@@ -79,7 +80,8 @@ public DeltaLakeTableHandle(
@JsonProperty("updatedColumns") Optional> updatedColumns,
@JsonProperty("updateRowIdColumns") Optional> updateRowIdColumns,
@JsonProperty("analyzeHandle") Optional analyzeHandle,
- @JsonProperty("readVersion") long readVersion)
+ @JsonProperty("readVersion") long readVersion,
+ @JsonProperty("retriesEnabled") boolean retriesEnabled)
{
this(
schemaName,
@@ -95,7 +97,8 @@ public DeltaLakeTableHandle(
analyzeHandle,
false,
Optional.empty(),
- readVersion);
+ readVersion,
+ retriesEnabled);
}
public DeltaLakeTableHandle(
@@ -112,7 +115,8 @@ public DeltaLakeTableHandle(
Optional<AnalyzeHandle> analyzeHandle,
boolean recordScannedFiles,
Optional<DataSize> maxScannedFileSize,
- long readVersion)
+ long readVersion,
+ boolean retriesEnabled)
{
this.schemaName = requireNonNull(schemaName, "schemaName is null");
this.tableName = requireNonNull(tableName, "tableName is null");
@@ -130,6 +134,7 @@ public DeltaLakeTableHandle(
this.recordScannedFiles = recordScannedFiles;
this.maxScannedFileSize = requireNonNull(maxScannedFileSize, "maxScannedFileSize is null");
this.readVersion = readVersion;
+ this.retriesEnabled = retriesEnabled;
}
public static DeltaLakeTableHandle forDelete(
@@ -140,7 +145,8 @@ public static DeltaLakeTableHandle forDelete(
TupleDomain<DeltaLakeColumnHandle> enforcedConstraint,
TupleDomain<DeltaLakeColumnHandle> unenforcedConstraint,
Optional<Set<DeltaLakeColumnHandle>> projectedColumns,
- long readVersion)
+ long readVersion,
+ boolean retriesEnabled)
{
return new DeltaLakeTableHandle(
schemaName,
@@ -154,7 +160,8 @@ public static DeltaLakeTableHandle forDelete(
Optional.empty(),
Optional.empty(),
Optional.empty(),
- readVersion);
+ readVersion,
+ retriesEnabled);
}
public static DeltaLakeTableHandle forUpdate(
@@ -167,7 +174,8 @@ public static DeltaLakeTableHandle forUpdate(
Optional<Set<DeltaLakeColumnHandle>> projectedColumns,
List<DeltaLakeColumnHandle> updatedColumns,
List<DeltaLakeColumnHandle> updateRowIdColumns,
- long readVersion)
+ long readVersion,
+ boolean retriesEnabled)
{
checkArgument(!updatedColumns.isEmpty(), "Update must specify at least one column to set");
return new DeltaLakeTableHandle(
@@ -182,7 +190,8 @@ public static DeltaLakeTableHandle forUpdate(
Optional.of(updatedColumns),
Optional.of(updateRowIdColumns),
Optional.empty(),
- readVersion);
+ readVersion,
+ retriesEnabled);
}
public DeltaLakeTableHandle withProjectedColumns(Set<DeltaLakeColumnHandle> projectedColumns)
@@ -199,7 +208,8 @@ public DeltaLakeTableHandle withProjectedColumns(Set projectedColu
getUpdatedColumns(),
getUpdateRowIdColumns(),
getAnalyzeHandle(),
- getReadVersion());
+ getReadVersion(),
+ isRetriesEnabled());
}
public DeltaLakeTableHandle forOptimize(boolean recordScannedFiles, DataSize maxScannedFileSize)
@@ -218,7 +228,8 @@ public DeltaLakeTableHandle forOptimize(boolean recordScannedFiles, DataSize max
analyzeHandle,
recordScannedFiles,
Optional.of(maxScannedFileSize),
- readVersion);
+ readVersion,
+ false);
}
@JsonProperty
@@ -306,6 +317,12 @@ public long getReadVersion()
return readVersion;
}
+ @JsonProperty
+ public boolean isRetriesEnabled()
+ {
+ return retriesEnabled;
+ }
+
public SchemaTableName getSchemaTableName()
{
return new SchemaTableName(schemaName, tableName);
@@ -341,7 +358,8 @@ public boolean equals(Object o)
Objects.equals(updateRowIdColumns, that.updateRowIdColumns) &&
Objects.equals(analyzeHandle, that.analyzeHandle) &&
Objects.equals(maxScannedFileSize, that.maxScannedFileSize) &&
- readVersion == that.readVersion;
+ readVersion == that.readVersion &&
+ retriesEnabled == that.retriesEnabled;
}
@Override
@@ -361,6 +379,7 @@ public int hashCode()
analyzeHandle,
recordScannedFiles,
maxScannedFileSize,
- readVersion);
+ readVersion,
+ retriesEnabled);
}
}
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/DeltaTableOptimizeHandle.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/DeltaTableOptimizeHandle.java
index 802a6f8660ab..839311885c5b 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/DeltaTableOptimizeHandle.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/DeltaTableOptimizeHandle.java
@@ -34,6 +34,7 @@ public class DeltaTableOptimizeHandle
private final List<String> originalPartitionColumns;
private final DataSize maxScannedFileSize;
private final Optional<Long> currentVersion;
+ private final boolean retriesEnabled;
@JsonCreator
public DeltaTableOptimizeHandle(
@@ -41,13 +42,15 @@ public DeltaTableOptimizeHandle(
List<DeltaLakeColumnHandle> tableColumns,
List<String> originalPartitionColumns,
DataSize maxScannedFileSize,
- Optional<Long> currentVersion)
+ Optional<Long> currentVersion,
+ boolean retriesEnabled)
{
this.metadataEntry = requireNonNull(metadataEntry, "metadataEntry is null");
this.tableColumns = ImmutableList.copyOf(requireNonNull(tableColumns, "tableColumns is null"));
this.originalPartitionColumns = ImmutableList.copyOf(requireNonNull(originalPartitionColumns, "originalPartitionColumns is null"));
this.maxScannedFileSize = requireNonNull(maxScannedFileSize, "maxScannedFileSize is null");
this.currentVersion = requireNonNull(currentVersion, "currentVersion is null");
+ this.retriesEnabled = retriesEnabled;
}
public DeltaTableOptimizeHandle withCurrentVersion(long currentVersion)
@@ -58,7 +61,8 @@ public DeltaTableOptimizeHandle withCurrentVersion(long currentVersion)
tableColumns,
originalPartitionColumns,
maxScannedFileSize,
- Optional.of(currentVersion));
+ Optional.of(currentVersion),
+ retriesEnabled);
}
@JsonProperty
@@ -93,4 +97,10 @@ public DataSize getMaxScannedFileSize()
{
return maxScannedFileSize;
}
+
+ @JsonProperty
+ public boolean isRetriesEnabled()
+ {
+ return retriesEnabled;
+ }
}
diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaFailureRecoveryTest.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaFailureRecoveryTest.java
new file mode 100644
index 000000000000..bf84cf41aeb4
--- /dev/null
+++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaFailureRecoveryTest.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.deltalake;
+
+import io.trino.operator.RetryPolicy;
+import io.trino.spi.ErrorType;
+import io.trino.testing.BaseFailureRecoveryTest;
+import org.testng.annotations.Test;
+
+import java.util.List;
+import java.util.Optional;
+
+import static io.trino.execution.FailureInjector.FAILURE_INJECTION_MESSAGE;
+import static io.trino.execution.FailureInjector.InjectedFailureType.TASK_FAILURE;
+import static io.trino.execution.FailureInjector.InjectedFailureType.TASK_GET_RESULTS_REQUEST_FAILURE;
+import static io.trino.execution.FailureInjector.InjectedFailureType.TASK_GET_RESULTS_REQUEST_TIMEOUT;
+import static io.trino.execution.FailureInjector.InjectedFailureType.TASK_MANAGEMENT_REQUEST_FAILURE;
+import static io.trino.execution.FailureInjector.InjectedFailureType.TASK_MANAGEMENT_REQUEST_TIMEOUT;
+import static java.lang.String.format;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
+
+public abstract class BaseDeltaFailureRecoveryTest
+ extends BaseFailureRecoveryTest
+{
+ protected BaseDeltaFailureRecoveryTest(RetryPolicy retryPolicy)
+ {
+ super(retryPolicy);
+ }
+
+ @Override
+ protected boolean areWriteRetriesSupported()
+ {
+ return true;
+ }
+
+ @Override
+ public void testDelete()
+ {
+ // This test is overridden because the superclass method assumes a more complex plan for the `DELETE` query.
+ // Its assertions do not hold when the plan consists of just two fragments.
+
+ Optional<String> setupQuery = Optional.of("CREATE TABLE <table> AS SELECT * FROM orders");
+ Optional<String> cleanupQuery = Optional.of("DROP TABLE <table>");
+ String deleteQuery = "DELETE FROM <table> WHERE orderkey = 1";
+
+ assertThatQuery(deleteQuery)
+ .withSetupQuery(setupQuery)
+ .withCleanupQuery(cleanupQuery)
+ .experiencing(TASK_FAILURE, Optional.of(ErrorType.INTERNAL_ERROR))
+ .at(boundaryCoordinatorStage())
+ .failsAlways(failure -> failure.hasMessageContaining(FAILURE_INJECTION_MESSAGE));
+
+ assertThatQuery(deleteQuery)
+ .withSetupQuery(setupQuery)
+ .withCleanupQuery(cleanupQuery)
+ .experiencing(TASK_FAILURE, Optional.of(ErrorType.INTERNAL_ERROR))
+ .at(rootStage())
+ .failsAlways(failure -> failure.hasMessageContaining(FAILURE_INJECTION_MESSAGE));
+
+ assertThatQuery(deleteQuery)
+ .withSetupQuery(setupQuery)
+ .withCleanupQuery(cleanupQuery)
+ .experiencing(TASK_FAILURE, Optional.of(ErrorType.INTERNAL_ERROR))
+ .at(leafStage())
+ .failsWithoutRetries(failure -> failure.hasMessageContaining(FAILURE_INJECTION_MESSAGE))
+ .finishesSuccessfully();
+
+ // note: this is effectively the same as the `leafStage` case above; should it be dropped?
+ assertThatQuery(deleteQuery)
+ .withSetupQuery(setupQuery)
+ .withCleanupQuery(cleanupQuery)
+ .experiencing(TASK_FAILURE, Optional.of(ErrorType.INTERNAL_ERROR))
+ .at(boundaryDistributedStage())
+ .failsWithoutRetries(failure -> failure.hasMessageContaining(FAILURE_INJECTION_MESSAGE))
+ .finishesSuccessfully();
+
+ // DELETE plan is too simplistic for testing with `intermediateDistributedStage`
+ assertThatThrownBy(() ->
+ assertThatQuery(deleteQuery)
+ .withSetupQuery(setupQuery)
+ .withCleanupQuery(cleanupQuery)
+ .experiencing(TASK_FAILURE, Optional.of(ErrorType.INTERNAL_ERROR))
+ .at(intermediateDistributedStage())
+ .failsWithoutRetries(failure -> failure.hasMessageContaining(FAILURE_INJECTION_MESSAGE)))
+ .hasMessageContaining("stage not found");
+
+ assertThatQuery(deleteQuery)
+ .withSetupQuery(setupQuery)
+ .withCleanupQuery(cleanupQuery)
+ .experiencing(TASK_MANAGEMENT_REQUEST_FAILURE)
+ .at(boundaryDistributedStage())
+ .failsWithoutRetries(failure -> failure.hasMessageFindingMatch("Error 500 Internal Server Error|Error closing remote buffer, expected 204 got 500"))
+ .finishesSuccessfully();
+
+ assertThatQuery(deleteQuery)
+ .withSetupQuery(setupQuery)
+ .withCleanupQuery(cleanupQuery)
+ .experiencing(TASK_GET_RESULTS_REQUEST_FAILURE)
+ .at(boundaryDistributedStage())
+ .failsWithoutRetries(failure -> failure.hasMessageFindingMatch("Error 500 Internal Server Error|Error closing remote buffer, expected 204 got 500"))
+ .finishesSuccessfully();
+
+ assertThatQuery(deleteQuery)
+ .withSetupQuery(setupQuery)
+ .withCleanupQuery(cleanupQuery)
+ .experiencing(TASK_MANAGEMENT_REQUEST_TIMEOUT)
+ .at(boundaryDistributedStage())
+ .failsWithoutRetries(failure -> failure.hasMessageContaining("Encountered too many errors talking to a worker node"))
+ .finishesSuccessfully();
+
+ assertThatQuery(deleteQuery)
+ .withSetupQuery(setupQuery)
+ .withCleanupQuery(cleanupQuery)
+ .experiencing(TASK_GET_RESULTS_REQUEST_TIMEOUT)
+ .at(boundaryDistributedStage())
+ .failsWithoutRetries(failure -> failure.hasMessageContaining("Encountered too many errors talking to a worker node"))
+ .finishesSuccessfully();
+ }
+
+ @Override
+ public void testUpdate()
+ {
+ // This test is overridden because the superclass method assumes a more complex plan for the `UPDATE` query.
+ // Its assertions do not hold when the plan consists of just two fragments.
+
+ Optional<String> setupQuery = Optional.of("CREATE TABLE <table> AS SELECT * FROM orders");
+ Optional<String> cleanupQuery = Optional.of("DROP TABLE <table>");
+ String updateQuery = "UPDATE <table> SET shippriority = 101 WHERE custkey = 1";
+
+ assertThatQuery(updateQuery)
+ .withSetupQuery(setupQuery)
+ .withCleanupQuery(cleanupQuery)
+ .experiencing(TASK_FAILURE, Optional.of(ErrorType.INTERNAL_ERROR))
+ .at(boundaryCoordinatorStage())
+ .failsAlways(failure -> failure.hasMessageContaining(FAILURE_INJECTION_MESSAGE));
+
+ assertThatQuery(updateQuery)
+ .withSetupQuery(setupQuery)
+ .withCleanupQuery(cleanupQuery)
+ .experiencing(TASK_FAILURE, Optional.of(ErrorType.INTERNAL_ERROR))
+ .at(rootStage())
+ .failsAlways(failure -> failure.hasMessageContaining(FAILURE_INJECTION_MESSAGE));
+
+ assertThatQuery(updateQuery)
+ .withSetupQuery(setupQuery)
+ .withCleanupQuery(cleanupQuery)
+ .experiencing(TASK_FAILURE, Optional.of(ErrorType.INTERNAL_ERROR))
+ .at(leafStage())
+ .failsWithoutRetries(failure -> failure.hasMessageContaining(FAILURE_INJECTION_MESSAGE))
+ .finishesSuccessfully();
+
+ assertThatQuery(updateQuery)
+ .withSetupQuery(setupQuery)
+ .withCleanupQuery(cleanupQuery)
+ .experiencing(TASK_FAILURE, Optional.of(ErrorType.INTERNAL_ERROR))
+ .at(boundaryDistributedStage())
+ .failsWithoutRetries(failure -> failure.hasMessageContaining(FAILURE_INJECTION_MESSAGE))
+ .finishesSuccessfully();
+
+ // UPDATE plan is too simplistic for testing with `intermediateDistributedStage`
+ assertThatThrownBy(() ->
+ assertThatQuery(updateQuery)
+ .withSetupQuery(setupQuery)
+ .withCleanupQuery(cleanupQuery)
+ .experiencing(TASK_FAILURE, Optional.of(ErrorType.INTERNAL_ERROR))
+ .at(intermediateDistributedStage())
+ .failsWithoutRetries(failure -> failure.hasMessageContaining(FAILURE_INJECTION_MESSAGE)))
+ .hasMessageContaining("stage not found");
+
+ assertThatQuery(updateQuery)
+ .withSetupQuery(setupQuery)
+ .withCleanupQuery(cleanupQuery)
+ .experiencing(TASK_MANAGEMENT_REQUEST_FAILURE)
+ .at(boundaryDistributedStage())
+ .failsWithoutRetries(failure -> failure.hasMessageFindingMatch("Error 500 Internal Server Error|Error closing remote buffer, expected 204 got 500"))
+ .finishesSuccessfully();
+
+ assertThatQuery(updateQuery)
+ .withSetupQuery(setupQuery)
+ .withCleanupQuery(cleanupQuery)
+ .experiencing(TASK_GET_RESULTS_REQUEST_FAILURE)
+ .at(boundaryDistributedStage())
+ .failsWithoutRetries(failure -> failure.hasMessageFindingMatch("Error 500 Internal Server Error|Error closing remote buffer, expected 204 got 500"))
+ .finishesSuccessfully();
+
+ assertThatQuery(updateQuery)
+ .withSetupQuery(setupQuery)
+ .withCleanupQuery(cleanupQuery)
+ .experiencing(TASK_MANAGEMENT_REQUEST_TIMEOUT)
+ .at(boundaryDistributedStage())
+ .failsWithoutRetries(failure -> failure.hasMessageContaining("Encountered too many errors talking to a worker node"))
+ .finishesSuccessfully();
+
+ assertThatQuery(updateQuery)
+ .withSetupQuery(setupQuery)
+ .withCleanupQuery(cleanupQuery)
+ .experiencing(TASK_GET_RESULTS_REQUEST_TIMEOUT)
+ .at(boundaryDistributedStage())
+ .failsWithoutRetries(failure -> failure.hasMessageContaining("Encountered too many errors talking to a worker node"))
+ .finishesSuccessfully();
+ }
+
+ @Override
+ // materialized views are currently not implemented by Delta connector
+ public void testRefreshMaterializedView()
+ {
+ assertThatThrownBy(super::testRefreshMaterializedView)
+ .hasMessageContaining("This connector does not support creating materialized views");
+ }
+
+ @Override
+ protected void createPartitionedLineitemTable(String tableName, List<String> columns, String partitionColumn)
+ {
+ String sql = format(
+ "CREATE TABLE %s WITH (partitioned_by = array['%s']) AS SELECT %s FROM tpch.tiny.lineitem",
+ tableName,
+ partitionColumn,
+ String.join(",", columns));
+ getQueryRunner().execute(sql);
+ }
+
+ @Test(invocationCount = INVOCATION_COUNT)
+ public void testCreatePartitionedTable()
+ {
+ testTableModification(
+ Optional.empty(),
+ "CREATE TABLE WITH (partitioned_by = ARRAY['p']) AS SELECT *, 'partition1' p FROM orders",
+ Optional.of("DROP TABLE "));
+ }
+
+ @Test(invocationCount = INVOCATION_COUNT)
+ public void testInsertIntoNewPartition()
+ {
+ testTableModification(
+ Optional.of("CREATE TABLE WITH (partitioned_by = ARRAY['p']) AS SELECT *, 'partition1' p FROM orders"),
+ "INSERT INTO SELECT *, 'partition2' p FROM orders",
+ Optional.of("DROP TABLE "));
+ }
+
+ @Test(invocationCount = INVOCATION_COUNT)
+ public void testInsertIntoExistingPartition()
+ {
+ testTableModification(
+ Optional.of("CREATE TABLE WITH (partitioned_by = ARRAY['p']) AS SELECT *, 'partition1' p FROM orders"),
+ "INSERT INTO SELECT *, 'partition1' p FROM orders",
+ Optional.of("DROP TABLE "));
+ }
+}
diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeMetadata.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeMetadata.java
index 8d5fa2fd002c..cee1fdc1d02b 100644
--- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeMetadata.java
+++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeMetadata.java
@@ -325,7 +325,8 @@ public void testGetInsertLayoutTableNotFound()
Optional.empty(),
Optional.empty(),
Optional.empty(),
- 0);
+ 0,
+ false);
assertThatThrownBy(() -> deltaLakeMetadataFactory.create(SESSION.getIdentity())
.getInsertLayout(SESSION, missingTableHandle))
@@ -449,7 +450,8 @@ private static DeltaLakeTableHandle createDeltaLakeTableHandle(Set
Optional.of(ImmutableList.of(BOOLEAN_COLUMN_HANDLE)),
Optional.of(ImmutableList.of(DOUBLE_COLUMN_HANDLE)),
Optional.empty(),
- 0);
+ 0,
+ false);
}
private static TupleDomain createConstrainedColumnsTuple(
diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSplitManager.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSplitManager.java
index 19b383cafdd7..ca91146cb47c 100644
--- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSplitManager.java
+++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSplitManager.java
@@ -77,7 +77,8 @@ public class TestDeltaLakeSplitManager
Optional.empty(),
Optional.empty(),
Optional.empty(),
- 0);
+ 0,
+ false);
@Test
public void testInitialSplits()
diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaTaskFailureRecoveryTest.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaTaskFailureRecoveryTest.java
new file mode 100644
index 000000000000..4fa5bdd5919b
--- /dev/null
+++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaTaskFailureRecoveryTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.deltalake;
+
+import com.google.common.collect.ImmutableMap;
+import io.trino.operator.RetryPolicy;
+import io.trino.plugin.deltalake.util.DockerizedMinioDataLake;
+import io.trino.plugin.exchange.FileSystemExchangePlugin;
+import io.trino.plugin.exchange.containers.MinioStorage;
+import io.trino.testing.DistributedQueryRunner;
+import io.trino.testing.QueryRunner;
+import io.trino.tpch.TpchTable;
+
+import java.util.List;
+import java.util.Map;
+
+import static io.trino.plugin.deltalake.DeltaLakeDockerizedMinioDataLake.createDockerizedMinioDataLakeForDeltaLake;
+import static io.trino.plugin.deltalake.DeltaLakeQueryRunner.DELTA_CATALOG;
+import static io.trino.plugin.exchange.containers.MinioStorage.getExchangeManagerProperties;
+import static io.trino.testing.sql.TestTable.randomTableSuffix;
+import static java.lang.String.format;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+public class TestDeltaTaskFailureRecoveryTest
+ extends BaseDeltaFailureRecoveryTest
+{
+ private static final String SCHEMA = "task_failure_recovery";
+ private final String bucketName = "test-delta-lake-task-failure-recovery-" + randomTableSuffix();
+
+ protected TestDeltaTaskFailureRecoveryTest()
+ {
+ super(RetryPolicy.TASK);
+ }
+
+ @Override
+ protected QueryRunner createQueryRunner(
+ List<TpchTable<?>> requiredTpchTables,
+ Map<String, String> configProperties,
+ Map<String, String> coordinatorProperties)
+ throws Exception
+ {
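+ // The Delta Lake data itself lives in the Dockerized MinIO data lake, while a separate MinIO bucket provides
+ // the spooling storage required by the file system exchange manager that task retries depend on.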
+ DockerizedMinioDataLake dockerizedMinioDataLake = closeAfterClass(createDockerizedMinioDataLakeForDeltaLake(bucketName));
+ MinioStorage minioStorage = closeAfterClass(new MinioStorage("test-exchange-spooling-" + randomTableSuffix()));
+ minioStorage.start();
+
+ DistributedQueryRunner queryRunner = DeltaLakeQueryRunner.createS3DeltaLakeQueryRunner(
+ DELTA_CATALOG,
+ SCHEMA,
+ ImmutableMap.<String, String>builder()
+ .putAll(configProperties)
+ // currently not supported for fault tolerant execution mode
+ .put("enable-dynamic-filtering", "false")
+ .buildOrThrow(),
+ coordinatorProperties,
+ ImmutableMap.of("delta.enable-non-concurrent-writes", "true"),
+ dockerizedMinioDataLake.getMinioAddress(),
+ dockerizedMinioDataLake.getTestingHadoop(),
+ runner -> {
+ runner.installPlugin(new FileSystemExchangePlugin());
+ runner.loadExchangeManager("filesystem", getExchangeManagerProperties(minioStorage));
+ });
+ queryRunner.execute(format("CREATE SCHEMA %s WITH (location = 's3://%s/%s')", SCHEMA, bucketName, SCHEMA));
+ requiredTpchTables.forEach(table -> queryRunner.execute(format("CREATE TABLE %s AS SELECT * FROM tpch.tiny.%1$s", table.getTableName())));
+
+ return queryRunner;
+ }
+
+ @Override
+ public void testJoinDynamicFilteringEnabled()
+ {
+ assertThatThrownBy(super::testJoinDynamicFilteringEnabled)
+ .hasMessageContaining("Dynamic filtering is not supported with automatic task retries enabled");
+ }
+}
diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestTransactionLogAccess.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestTransactionLogAccess.java
index e8580ae7eddf..d972640c770c 100644
--- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestTransactionLogAccess.java
+++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestTransactionLogAccess.java
@@ -142,7 +142,8 @@ private void setupTransactionLogAccess(String tableName, Path tableLocation)
Optional.empty(),
Optional.empty(),
Optional.empty(),
- 0);
+ 0,
+ false);
tableSnapshot = transactionLogAccess.loadSnapshot(tableHandle.getSchemaTableName(), tableLocation, SESSION);
}
diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/metastore/TestDeltaLakeMetastoreStatistics.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/metastore/TestDeltaLakeMetastoreStatistics.java
index a74396fe72c0..76b79232ebd4 100644
--- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/metastore/TestDeltaLakeMetastoreStatistics.java
+++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/metastore/TestDeltaLakeMetastoreStatistics.java
@@ -179,7 +179,8 @@ private DeltaLakeTableHandle registerTable(String tableName, String directoryNam
Optional.empty(),
Optional.empty(),
Optional.empty(),
- 0);
+ 0,
+ false);
}
@Test
@@ -308,7 +309,8 @@ public void testStatisticsMultipleFiles()
tableHandle.getUpdatedColumns(),
tableHandle.getUpdateRowIdColumns(),
tableHandle.getAnalyzeHandle(),
- 0);
+ 0,
+ false);
stats = deltaLakeMetastore.getTableStatistics(SESSION, tableHandleWithUnenforcedConstraint, Constraint.alwaysTrue());
columnStatistics = stats.getColumnStatistics().get(COLUMN_HANDLE);
assertEquals(columnStatistics.getRange().get().getMin(), 0.0);
@@ -331,7 +333,8 @@ public void testStatisticsNoRecords()
tableHandle.getUpdatedColumns(),
tableHandle.getUpdateRowIdColumns(),
tableHandle.getAnalyzeHandle(),
- 0);
+ 0,
+ false);
DeltaLakeTableHandle tableHandleWithNoneUnenforcedConstraint = new DeltaLakeTableHandle(
tableHandle.getSchemaName(),
tableHandle.getTableName(),
@@ -344,7 +347,8 @@ public void testStatisticsNoRecords()
tableHandle.getUpdatedColumns(),
tableHandle.getUpdateRowIdColumns(),
tableHandle.getAnalyzeHandle(),
- 0);
+ 0,
+ false);
// If either the table handle's constraint or the provided Constraint are none, it will cause a 0 record count to be reported
assertEmptyStats(deltaLakeMetastore.getTableStatistics(SESSION, tableHandleWithNoneEnforcedConstraint, Constraint.alwaysTrue()));
assertEmptyStats(deltaLakeMetastore.getTableStatistics(SESSION, tableHandleWithNoneUnenforcedConstraint, Constraint.alwaysTrue()));
diff --git a/testing/trino-testing/src/main/java/io/trino/testing/BaseFailureRecoveryTest.java b/testing/trino-testing/src/main/java/io/trino/testing/BaseFailureRecoveryTest.java
index fde06369d41b..d69e017ef1df 100644
--- a/testing/trino-testing/src/main/java/io/trino/testing/BaseFailureRecoveryTest.java
+++ b/testing/trino-testing/src/main/java/io/trino/testing/BaseFailureRecoveryTest.java
@@ -456,7 +456,7 @@ protected void testNonSelect(Optional session, Optional setupQu
.finishesSuccessfully();
}
- private FailureRecoveryAssert assertThatQuery(String query)
+ protected FailureRecoveryAssert assertThatQuery(String query)
{
return new FailureRecoveryAssert(query);
}