Skip to content

Commit

Permalink
Retry queries on Databricks when dealing with a communication failure
Browse files Browse the repository at this point in the history
  • Loading branch information
findinpath authored and mosabua committed Apr 30, 2024
1 parent 7ed571e commit 7003975
Showing 1 changed file with 27 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@
import org.intellij.lang.annotations.Language;
import org.testng.SkipException;

import java.sql.SQLException;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Pattern;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
Expand All @@ -50,10 +52,17 @@ public final class DeltaLakeTestUtils
@Language("RegExp")
public static final String DATABRICKS_COMMUNICATION_FAILURE_MATCH =
"\\Q[Databricks][\\E(DatabricksJDBCDriver|JDBCDriver)\\Q](500593) Communication link failure. Failed to connect to server. Reason: \\E" +
"(" +
"(HTTP retry after response received with no Retry-After header, error: HTTP Response code: 503|HTTP Response code: 504), Error message: Unknown." +
"|java.net.SocketTimeoutException: Read timed out." +
")";
"(" +
"(HTTP retry after response received with no Retry-After header, error: HTTP Response code: 503|HTTP Response code: 504), Error message: Unknown." +
"|java.net.SocketTimeoutException: Read timed out." +
")";
// Compiled once: the message predicate below may run on every failed attempt.
private static final Pattern DATABRICKS_COMMUNICATION_FAILURE_PATTERN = Pattern.compile(DATABRICKS_COMMUNICATION_FAILURE_MATCH);

/**
 * Retry policy for queries executed against Databricks.
 * <p>
 * Two handling conditions are registered: the root cause is a
 * {@link SQLException}, and the root cause message contains a match for
 * {@link #DATABRICKS_COMMUNICATION_FAILURE_MATCH}. A handled failure is retried
 * at most 3 times, using a backoff configured with a delay of 1 and a maximum
 * delay of 10 seconds. Every retry is logged as a warning together with the
 * last exception.
 */
// NOTE(review): Failsafe documents multiple handle conditions as alternatives,
// so the SQLException condition alone would already cover every SQL failure -
// confirm whether both conditions were meant to be required together.
private static final RetryPolicy<QueryResult> DATABRICKS_COMMUNICATION_FAILURE_RETRY_POLICY = RetryPolicy.<QueryResult>builder()
        .handleIf(throwable -> Throwables.getRootCause(throwable) instanceof SQLException)
        .handleIf(throwable -> {
            // getMessage() may be null; Pattern.matcher(null) would throw instead of simply not matching
            String message = Throwables.getRootCause(throwable).getMessage();
            return message != null && DATABRICKS_COMMUNICATION_FAILURE_PATTERN.matcher(message).find();
        })
        .withBackoff(1, 10, ChronoUnit.SECONDS)
        .withMaxRetries(3)
        .onRetry(event -> log.warn(event.getLastException(), "Query failed on attempt %d, will retry.", event.getAttemptCount()))
        .build();
private static final RetryPolicy<QueryResult> CONCURRENT_MODIFICATION_EXCEPTION_RETRY_POLICY = RetryPolicy.<QueryResult>builder()
.handleIf(throwable -> Throwables.getRootCause(throwable) instanceof ConcurrentModificationException)
.handleIf(throwable -> throwable.getMessage() != null && throwable.getMessage().contains("Table being modified concurrently"))
Expand All @@ -66,7 +75,10 @@ private DeltaLakeTestUtils() {}

public static Optional<DatabricksVersion> getDatabricksRuntimeVersion()
{
String version = (String) onDelta().executeQuery("SELECT java_method('java.lang.System', 'getenv', 'DATABRICKS_RUNTIME_VERSION')").getOnlyValue();
String version = (String) Failsafe.with(DATABRICKS_COMMUNICATION_FAILURE_RETRY_POLICY)
.get(() -> onDelta().executeQuery("SELECT java_method('java.lang.System', 'getenv', 'DATABRICKS_RUNTIME_VERSION')"))
.getOnlyValue();

// OSS Spark returns null
if (version.equals("null")) {
return Optional.empty();
Expand All @@ -91,7 +103,8 @@ public static void skipTestUnlessUnsupportedWriterVersionExists()

/**
 * Runs {@code SHOW COLUMNS IN <schemaName>.<tableName>} on Delta and returns
 * the values of result column 1.
 * <p>
 * The query is executed through
 * {@code DATABRICKS_COMMUNICATION_FAILURE_RETRY_POLICY}, so failures handled by
 * that policy are retried.
 *
 * @param schemaName schema containing the table; concatenated into the statement as-is
 * @param tableName table whose columns are listed; concatenated into the statement as-is
 * @return the values of result column 1 (presumably the column names - the index looks 1-based, confirm against QueryResult)
 */
public static List<String> getColumnNamesOnDelta(String schemaName, String tableName)
{
    // Single declaration: the query is issued only through the retrying call.
    QueryResult result = Failsafe.with(DATABRICKS_COMMUNICATION_FAILURE_RETRY_POLICY)
            .get(() -> onDelta().executeQuery("SHOW COLUMNS IN " + schemaName + "." + tableName));
    return result.column(1);
}

Expand All @@ -104,7 +117,8 @@ public static String getColumnCommentOnTrino(String schemaName, String tableName

/**
 * Runs {@code DESCRIBE <schemaName>.<tableName> <columnName>} on Delta and
 * returns one cell of the result as the column comment.
 * <p>
 * The query is executed through
 * {@code DATABRICKS_COMMUNICATION_FAILURE_RETRY_POLICY}, so failures handled by
 * that policy are retried.
 *
 * @param schemaName schema containing the table; formatted into the statement as-is
 * @param tableName table containing the column; formatted into the statement as-is
 * @param columnName column to describe; formatted into the statement as-is
 * @return the value at {@code row(2).get(1)} of the result, cast to {@code String}
 */
public static String getColumnCommentOnDelta(String schemaName, String tableName, String columnName)
{
    // Single declaration: the query is issued only through the retrying call.
    QueryResult result = Failsafe.with(DATABRICKS_COMMUNICATION_FAILURE_RETRY_POLICY)
            .get(() -> onDelta().executeQuery(format("DESCRIBE %s.%s %s", schemaName, tableName, columnName)));
    // NOTE(review): presumably the comment is the value cell of the third DESCRIBE row - confirm the result layout
    return (String) result.row(2).get(1);
}

Expand All @@ -116,7 +130,8 @@ public static String getTableCommentOnTrino(String schemaName, String tableName)

public static String getTableCommentOnDelta(String schemaName, String tableName)
{
QueryResult result = onDelta().executeQuery(format("DESCRIBE EXTENDED %s.%s", schemaName, tableName));
QueryResult result = Failsafe.with(DATABRICKS_COMMUNICATION_FAILURE_RETRY_POLICY)
.get(() -> onDelta().executeQuery(format("DESCRIBE EXTENDED %s.%s", schemaName, tableName)));
return (String) result.rows().stream()
.filter(row -> row.get(0).equals("Comment"))
.map(row -> row.get(1))
Expand All @@ -125,15 +140,17 @@ public static String getTableCommentOnDelta(String schemaName, String tableName)

/**
 * Runs {@code SHOW TBLPROPERTIES <schemaName>.<tableName>} on Delta and
 * collects the result rows into an immutable map.
 * <p>
 * For every row, cell 0 becomes the key and cell 1 the value. The query is
 * executed through {@code DATABRICKS_COMMUNICATION_FAILURE_RETRY_POLICY}, so
 * failures handled by that policy are retried.
 *
 * @param schemaName schema containing the table; formatted into the statement as-is
 * @param tableName table whose properties are read; formatted into the statement as-is
 * @return immutable map built from cells 0 and 1 of every result row
 */
public static Map<String, String> getTablePropertiesOnDelta(String schemaName, String tableName)
{
    // Single declaration: the query is issued only through the retrying call.
    QueryResult result = Failsafe.with(DATABRICKS_COMMUNICATION_FAILURE_RETRY_POLICY)
            .get(() -> onDelta().executeQuery("SHOW TBLPROPERTIES %s.%s".formatted(schemaName, tableName)));
    return result.rows().stream()
            .map(row -> Map.entry((String) row.get(0), (String) row.get(1)))
            .collect(toImmutableMap(Map.Entry::getKey, Map.Entry::getValue));
}

/**
 * Runs {@code SHOW TBLPROPERTIES <schemaName>.<tableName>(<propertyName>)} on
 * Delta and returns cell 1 of the only result row.
 * <p>
 * The query is executed through
 * {@code DATABRICKS_COMMUNICATION_FAILURE_RETRY_POLICY}, so failures handled by
 * that policy are retried. {@code getOnlyElement} is applied to the result
 * rows, so exactly one row is expected.
 *
 * @param schemaName schema containing the table; formatted into the statement as-is
 * @param tableName table whose property is read; formatted into the statement as-is
 * @param propertyName property to look up; formatted into the statement as-is
 * @return cell 1 of the single result row, cast to {@code String}
 */
public static String getTablePropertyOnDelta(String schemaName, String tableName, String propertyName)
{
    // Single declaration: the query is issued only through the retrying call.
    QueryResult result = Failsafe.with(DATABRICKS_COMMUNICATION_FAILURE_RETRY_POLICY)
            .get(() -> onDelta().executeQuery("SHOW TBLPROPERTIES %s.%s(%s)".formatted(schemaName, tableName, propertyName)));
    return (String) getOnlyElement(result.rows()).get(1);
}

Expand Down

0 comments on commit 7003975

Please sign in to comment.