From 866e09d1fc251f5b14721a53b736cc33cd8a17d1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Grzegorz=20Kokosi=C5=84ski?= Date: Sat, 7 Sep 2024 23:20:09 +0200 Subject: [PATCH] Introduce RetryingJdbcClient So far JDBC connectors were able to retry in case of transient issues when establishing connections. RetryingJdbcClient is able to retry an operation on remote database during other situations. Here we can have nested retrying. One in jdbc client and other in connection factory. It is ok as as in both places we are using the retry policy with max duration to 30 seconds. The outer retries won't retry if the operation takes longer than 30s. --- .../java/io/trino/plugin/jdbc/JdbcModule.java | 5 +- .../io/trino/plugin/jdbc/RetryStrategy.java | 19 + .../jdbc/RetryingConnectionFactory.java | 42 +- .../jdbc/RetryingConnectionFactoryModule.java | 35 -- .../trino/plugin/jdbc/RetryingJdbcClient.java | 524 ++++++++++++++++++ .../io/trino/plugin/jdbc/RetryingModule.java | 92 +++ .../jdbc/TestRetryingConnectionFactory.java | 36 +- .../plugin/jdbc/TestRetryingJdbcClient.java | 35 ++ .../plugin/oracle/OracleClientModule.java | 2 +- .../plugin/oracle/TestingOracleServer.java | 4 +- plugin/trino-phoenix5/pom.xml | 5 + .../plugin/phoenix5/PhoenixClientModule.java | 4 +- 12 files changed, 711 insertions(+), 92 deletions(-) create mode 100644 plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RetryStrategy.java delete mode 100644 plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RetryingConnectionFactoryModule.java create mode 100644 plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RetryingJdbcClient.java create mode 100644 plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RetryingModule.java create mode 100644 plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestRetryingJdbcClient.java diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcModule.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcModule.java index 7d81a3cb90d9..b31790da736d 100644 --- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcModule.java +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcModule.java @@ -22,7 +22,6 @@ import io.airlift.configuration.AbstractConfigurationAwareModule; import io.trino.plugin.base.mapping.IdentifierMappingModule; import io.trino.plugin.base.session.SessionPropertiesProvider; -import io.trino.plugin.jdbc.jmx.StatisticsAwareJdbcClient; import io.trino.plugin.jdbc.logging.RemoteQueryModifierModule; import io.trino.plugin.jdbc.procedure.ExecuteProcedure; import io.trino.plugin.jdbc.procedure.FlushJdbcMetadataCacheProcedure; @@ -52,7 +51,7 @@ public void setup(Binder binder) install(new JdbcDiagnosticModule()); install(new IdentifierMappingModule()); install(new RemoteQueryModifierModule()); - install(new RetryingConnectionFactoryModule()); + install(new RetryingModule()); newOptionalBinder(binder, ConnectorAccessControl.class); newOptionalBinder(binder, QueryBuilder.class).setDefault().to(DefaultQueryBuilder.class).in(Scopes.SINGLETON); @@ -86,7 +85,7 @@ public void setup(Binder binder) newExporter(binder).export(DynamicFilteringStats.class) .as(generator -> generator.generatedNameOf(DynamicFilteringStats.class, catalogName.get().toString())); - binder.bind(JdbcClient.class).annotatedWith(ForCaching.class).to(Key.get(StatisticsAwareJdbcClient.class)).in(Scopes.SINGLETON); + binder.bind(JdbcClient.class).annotatedWith(ForCaching.class).to(Key.get(RetryingJdbcClient.class)).in(Scopes.SINGLETON); binder.bind(CachingJdbcClient.class).in(Scopes.SINGLETON); binder.bind(JdbcClient.class).to(Key.get(CachingJdbcClient.class)).in(Scopes.SINGLETON); diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RetryStrategy.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RetryStrategy.java new file mode 100644 index 000000000000..183912a9706f --- /dev/null +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RetryStrategy.java @@ -0,0 +1,19 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.jdbc; + +public interface RetryStrategy +{ + boolean isExceptionRecoverable(Throwable exception); +} diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RetryingConnectionFactory.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RetryingConnectionFactory.java index e4a21883be4c..77f1ee9dd018 100644 --- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RetryingConnectionFactory.java +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RetryingConnectionFactory.java @@ -13,48 +13,28 @@ */ package io.trino.plugin.jdbc; -import com.google.common.base.Throwables; import com.google.inject.Inject; import dev.failsafe.Failsafe; import dev.failsafe.FailsafeException; import dev.failsafe.RetryPolicy; -import io.trino.spi.TrinoException; import io.trino.spi.connector.ConnectorSession; import java.sql.Connection; import java.sql.SQLException; -import java.sql.SQLTransientException; -import java.util.Set; -import static java.time.temporal.ChronoUnit.MILLIS; -import static java.time.temporal.ChronoUnit.SECONDS; import static java.util.Objects.requireNonNull; public class RetryingConnectionFactory implements ConnectionFactory { - private final RetryPolicy retryPolicy; - private final ConnectionFactory delegate; + private final RetryPolicy retryPolicy; @Inject - public RetryingConnectionFactory(@ForRetrying ConnectionFactory delegate, Set retryStrategies) + public RetryingConnectionFactory(@ForRetrying ConnectionFactory delegate, RetryPolicy retryPolicy) { - requireNonNull(retryStrategies); this.delegate = requireNonNull(delegate, "delegate is null"); - this.retryPolicy = RetryPolicy.builder() - .withMaxDuration(java.time.Duration.of(30, SECONDS)) - .withMaxAttempts(5) - .withBackoff(50, 5_000, MILLIS, 4) - .handleIf(throwable -> isExceptionRecoverable(retryStrategies, throwable)) - .abortOn(TrinoException.class) - .build(); - } - - private static boolean isExceptionRecoverable(Set retryStrategies, Throwable throwable) - { - return retryStrategies.stream() - .anyMatch(retryStrategy -> retryStrategy.isExceptionRecoverable(throwable)); + this.retryPolicy = requireNonNull(retryPolicy, "retryPolicy is null"); } @Override @@ -79,20 +59,4 @@ public void close() { delegate.close(); } - - public interface RetryStrategy - { - boolean isExceptionRecoverable(Throwable exception); - } - - public static class DefaultRetryStrategy - implements RetryStrategy - { - @Override - public boolean isExceptionRecoverable(Throwable exception) - { - return Throwables.getCausalChain(exception).stream() - .anyMatch(SQLTransientException.class::isInstance); - } - } } diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RetryingConnectionFactoryModule.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RetryingConnectionFactoryModule.java deleted file mode 100644 index 26d5061be448..000000000000 --- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RetryingConnectionFactoryModule.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.trino.plugin.jdbc; - -import com.google.inject.AbstractModule; -import com.google.inject.Key; -import com.google.inject.Scopes; -import io.trino.plugin.jdbc.RetryingConnectionFactory.DefaultRetryStrategy; -import io.trino.plugin.jdbc.RetryingConnectionFactory.RetryStrategy; -import io.trino.plugin.jdbc.jmx.StatisticsAwareConnectionFactory; - -import static com.google.inject.multibindings.Multibinder.newSetBinder; - -public class RetryingConnectionFactoryModule - extends AbstractModule -{ - @Override - public void configure() - { - bind(ConnectionFactory.class).annotatedWith(ForRetrying.class).to(Key.get(StatisticsAwareConnectionFactory.class)).in(Scopes.SINGLETON); - bind(RetryingConnectionFactory.class).in(Scopes.SINGLETON); - newSetBinder(binder(), RetryStrategy.class).addBinding().to(DefaultRetryStrategy.class).in(Scopes.SINGLETON); - } -} diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RetryingJdbcClient.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RetryingJdbcClient.java new file mode 100644 index 000000000000..ac2fab50e744 --- /dev/null +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RetryingJdbcClient.java @@ -0,0 +1,524 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.jdbc; + +import com.google.inject.Inject; +import dev.failsafe.RetryPolicy; +import io.trino.plugin.jdbc.JdbcProcedureHandle.ProcedureQuery; +import io.trino.plugin.jdbc.expression.ParameterizedExpression; +import io.trino.spi.connector.AggregateFunction; +import io.trino.spi.connector.ColumnHandle; +import io.trino.spi.connector.ColumnMetadata; +import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.ConnectorSplitSource; +import io.trino.spi.connector.ConnectorTableMetadata; +import io.trino.spi.connector.JoinStatistics; +import io.trino.spi.connector.JoinType; +import io.trino.spi.connector.RelationColumnsMetadata; +import io.trino.spi.connector.RelationCommentMetadata; +import io.trino.spi.connector.SchemaTableName; +import io.trino.spi.connector.SystemTable; +import io.trino.spi.connector.TableScanRedirectApplicationResult; +import io.trino.spi.expression.ConnectorExpression; +import io.trino.spi.statistics.TableStatistics; +import io.trino.spi.type.Type; + +import java.sql.CallableStatement; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.OptionalLong; +import java.util.Set; + +import static io.trino.plugin.jdbc.RetryingModule.retry; +import static java.util.Objects.requireNonNull; + +public class RetryingJdbcClient + implements JdbcClient +{ + private final JdbcClient delegate; + private final RetryPolicy policy; + + @Inject + public RetryingJdbcClient(@ForRetrying JdbcClient delegate, RetryPolicy policy) + { + this.delegate = requireNonNull(delegate, "delegate is null"); + this.policy = requireNonNull(policy, "policy is null"); + } + + @Override + public boolean schemaExists(ConnectorSession session, String schema) + { + return retry(policy, () -> delegate.schemaExists(session, schema)); + } + + @Override + public Set getSchemaNames(ConnectorSession session) + { + return retry(policy, () -> delegate.getSchemaNames(session)); + } + + @Override + public List getTableNames(ConnectorSession session, Optional schema) + { + return retry(policy, () -> delegate.getTableNames(session, schema)); + } + + @Override + public Optional getTableHandle(ConnectorSession session, SchemaTableName schemaTableName) + { + return retry(policy, () -> delegate.getTableHandle(session, schemaTableName)); + } + + @Override + public JdbcTableHandle getTableHandle(ConnectorSession session, PreparedQuery preparedQuery) + { + return retry(policy, () -> delegate.getTableHandle(session, preparedQuery)); + } + + @Override + public JdbcProcedureHandle getProcedureHandle(ConnectorSession session, ProcedureQuery procedureQuery) + { + return retry(policy, () -> delegate.getProcedureHandle(session, procedureQuery)); + } + + @Override + public List getColumns(ConnectorSession session, JdbcTableHandle tableHandle) + { + return retry(policy, () -> delegate.getColumns(session, tableHandle)); + } + + @Override + public Iterator getAllTableColumns(ConnectorSession session, Optional schema) + { + return retry(policy, () -> delegate.getAllTableColumns(session, schema)); + } + + @Override + public List getAllTableComments(ConnectorSession session, Optional schema) + { + return retry(policy, () -> delegate.getAllTableComments(session, schema)); + } + + @Override + public Optional toColumnMapping(ConnectorSession session, Connection connection, JdbcTypeHandle typeHandle) + { + // no retrying as it could be not idempotent operation (connection could be not reusable after the first failure) + return delegate.toColumnMapping(session, connection, typeHandle); + } + + @Override + public List toColumnMappings(ConnectorSession session, List typeHandles) + { + return retry(policy, () -> delegate.toColumnMappings(session, typeHandles)); + } + + @Override + public WriteMapping toWriteMapping(ConnectorSession session, Type type) + { + return retry(policy, () -> delegate.toWriteMapping(session, type)); + } + + @Override + public Optional getSupportedType(ConnectorSession session, Type type) + { + // there should be no remote database interaction + return delegate.getSupportedType(session, type); + } + + @Override + public boolean supportsAggregationPushdown(ConnectorSession session, JdbcTableHandle table, List aggregates, Map assignments, List> groupingSets) + { + // there should be no remote database interaction + return delegate.supportsAggregationPushdown(session, table, aggregates, assignments, groupingSets); + } + + @Override + public Optional implementAggregation(ConnectorSession session, AggregateFunction aggregate, Map assignments) + { + // there should be no remote database interaction + return delegate.implementAggregation(session, aggregate, assignments); + } + + @Override + public Optional convertPredicate(ConnectorSession session, ConnectorExpression expression, Map assignments) + { + // there should be no remote database interaction + return delegate.convertPredicate(session, expression, assignments); + } + + @Override + public Optional convertProjection(ConnectorSession session, JdbcTableHandle handle, ConnectorExpression expression, Map assignments) + { + // there should be no remote database interaction + return delegate.convertProjection(session, handle, expression, assignments); + } + + @Override + public ConnectorSplitSource getSplits(ConnectorSession session, JdbcTableHandle tableHandle) + { + return retry(policy, () -> delegate.getSplits(session, tableHandle)); + } + + @Override + public ConnectorSplitSource getSplits(ConnectorSession session, JdbcProcedureHandle procedureHandle) + { + return retry(policy, () -> delegate.getSplits(session, procedureHandle)); + } + + @Override + public Connection getConnection(ConnectorSession session, JdbcSplit split, JdbcTableHandle tableHandle) + throws SQLException + { + // retry already implemented by RetryingConnectionFactory + return delegate.getConnection(session, split, tableHandle); + } + + @Override + public Connection getConnection(ConnectorSession session, JdbcSplit split, JdbcProcedureHandle procedureHandle) + throws SQLException + { + // retry already implemented by RetryingConnectionFactory + return delegate.getConnection(session, split, procedureHandle); + } + + @Override + public void abortReadConnection(Connection connection, ResultSet resultSet) + throws SQLException + { + // no retrying as it could be not idempotent operation (connection could be not reusable after the first failure) + delegate.abortReadConnection(connection, resultSet); + } + + @Override + public PreparedQuery prepareQuery(ConnectorSession session, JdbcTableHandle table, Optional>> groupingSets, List columns, Map columnExpressions) + { + // there should be no remote database interaction + return delegate.prepareQuery(session, table, groupingSets, columns, columnExpressions); + } + + @Override + public PreparedStatement buildSql(ConnectorSession session, Connection connection, JdbcSplit split, JdbcTableHandle table, List columns) + throws SQLException + { + // no retrying as it could be not idempotent operation (connection could be not reusable after the first failure) + return delegate.buildSql(session, connection, split, table, columns); + } + + @Override + public CallableStatement buildProcedure(ConnectorSession session, Connection connection, JdbcSplit split, JdbcProcedureHandle procedureHandle) + throws SQLException + { + // no retrying as it could be not idempotent operation (connection could be not reusable after the first failure) + return delegate.buildProcedure(session, connection, split, procedureHandle); + } + + @Override + public Optional implementJoin(ConnectorSession session, JoinType joinType, PreparedQuery leftSource, Map leftProjections, PreparedQuery rightSource, Map rightProjections, List joinConditions, JoinStatistics statistics) + { + // there should be no remote database interaction + return delegate.implementJoin(session, joinType, leftSource, leftProjections, rightSource, rightProjections, joinConditions, statistics); + } + + @Override + public Optional legacyImplementJoin(ConnectorSession session, JoinType joinType, PreparedQuery leftSource, PreparedQuery rightSource, List joinConditions, Map rightAssignments, Map leftAssignments, JoinStatistics statistics) + { + // there should be no remote database interaction + return delegate.legacyImplementJoin(session, joinType, leftSource, rightSource, joinConditions, rightAssignments, leftAssignments, statistics); + } + + @Override + public boolean supportsTopN(ConnectorSession session, JdbcTableHandle handle, List sortOrder) + { + // there should be no remote database interaction + return delegate.supportsTopN(session, handle, sortOrder); + } + + @Override + public boolean isTopNGuaranteed(ConnectorSession session) + { + // there should be no remote database interaction + return delegate.isTopNGuaranteed(session); + } + + @Override + public boolean supportsLimit() + { + // there should be no remote database interaction + return delegate.supportsLimit(); + } + + @Override + public boolean isLimitGuaranteed(ConnectorSession session) + { + // there should be no remote database interaction + return delegate.isLimitGuaranteed(session); + } + + @Override + public Optional getTableComment(ResultSet resultSet) + throws SQLException + { + // no retrying as it could be not idempotent operation + return delegate.getTableComment(resultSet); + } + + @Override + public void setTableComment(ConnectorSession session, JdbcTableHandle handle, Optional comment) + { + retry(policy, () -> delegate.setTableComment(session, handle, comment)); + } + + @Override + public void setColumnComment(ConnectorSession session, JdbcTableHandle handle, JdbcColumnHandle column, Optional comment) + { + // no retrying as it could be not idempotent operation + retry(policy, () -> delegate.setColumnComment(session, handle, column, comment)); + } + + @Override + public void addColumn(ConnectorSession session, JdbcTableHandle handle, ColumnMetadata column) + { + // no retrying as it could be not idempotent operation + delegate.addColumn(session, handle, column); + } + + @Override + public void dropColumn(ConnectorSession session, JdbcTableHandle handle, JdbcColumnHandle column) + { + // no retrying as it could be not idempotent operation + delegate.dropColumn(session, handle, column); + } + + @Override + public void renameColumn(ConnectorSession session, JdbcTableHandle handle, JdbcColumnHandle jdbcColumn, String newColumnName) + { + // no retrying as it could be not idempotent operation + delegate.renameColumn(session, handle, jdbcColumn, newColumnName); + } + + @Override + public void setColumnType(ConnectorSession session, JdbcTableHandle handle, JdbcColumnHandle column, Type type) + { + // no retrying as it could be not idempotent operation + delegate.setColumnType(session, handle, column, type); + } + + @Override + public void dropNotNullConstraint(ConnectorSession session, JdbcTableHandle handle, JdbcColumnHandle column) + { + // safe to retry, but retry needs to handle exceptions in case the constraint is already dropped + delegate.dropNotNullConstraint(session, handle, column); + } + + @Override + public void renameTable(ConnectorSession session, JdbcTableHandle handle, SchemaTableName newTableName) + { + // no retrying as it could be not idempotent operation + delegate.renameTable(session, handle, newTableName); + } + + @Override + public void setTableProperties(ConnectorSession session, JdbcTableHandle handle, Map> properties) + { + // no retrying as it could be not idempotent operation + delegate.setTableProperties(session, handle, properties); + } + + @Override + public void createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata) + { + // no retrying as it could be not idempotent operation + delegate.createTable(session, tableMetadata); + } + + @Override + public JdbcOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata) + { + // no retrying as it could be not idempotent operation + return delegate.beginCreateTable(session, tableMetadata); + } + + @Override + public void commitCreateTable(ConnectorSession session, JdbcOutputTableHandle handle, Set pageSinkIds) + { + // no retrying as it could be not idempotent operation + delegate.commitCreateTable(session, handle, pageSinkIds); + } + + @Override + public JdbcOutputTableHandle beginInsertTable(ConnectorSession session, JdbcTableHandle tableHandle, List columns) + { + // no retrying as it could be not idempotent operation + return delegate.beginInsertTable(session, tableHandle, columns); + } + + @Override + public void finishInsertTable(ConnectorSession session, JdbcOutputTableHandle handle, Set pageSinkIds) + { + // no retrying as it could be not idempotent operation + delegate.finishInsertTable(session, handle, pageSinkIds); + } + + @Override + public void dropTable(ConnectorSession session, JdbcTableHandle jdbcTableHandle) + { + // no retrying as it could be not idempotent operation + delegate.dropTable(session, jdbcTableHandle); + } + + @Override + public void rollbackCreateTable(ConnectorSession session, JdbcOutputTableHandle handle) + { + // no retrying as it could be not idempotent operation + delegate.rollbackCreateTable(session, handle); + } + + @Override + public boolean supportsRetries() + { + // there should be no remote database interaction + return delegate.supportsRetries(); + } + + @Override + public String buildInsertSql(JdbcOutputTableHandle handle, List columnWriters) + { + // there should be no remote database interaction + return delegate.buildInsertSql(handle, columnWriters); + } + + @Override + public Connection getConnection(ConnectorSession session) + throws SQLException + { + // retry already implemented by RetryingConnectionFactory + return delegate.getConnection(session); + } + + @Override + public Connection getConnection(ConnectorSession session, JdbcOutputTableHandle handle) + throws SQLException + { + // retry already implemented by RetryingConnectionFactory + return delegate.getConnection(session, handle); + } + + @Override + public PreparedStatement getPreparedStatement(Connection connection, String sql, Optional columnCount) + throws SQLException + { + // no retrying as it could be not idempotent operation (connection could be not reusable after the first failure) + return delegate.getPreparedStatement(connection, sql, columnCount); + } + + @Override + public TableStatistics getTableStatistics(ConnectorSession session, JdbcTableHandle handle) + { + return retry(policy, () -> delegate.getTableStatistics(session, handle)); + } + + @Override + public void createSchema(ConnectorSession session, String schemaName) + { + // no retrying as it could be not idempotent operation + delegate.createSchema(session, schemaName); + } + + @Override + public void dropSchema(ConnectorSession session, String schemaName, boolean cascade) + { + // no retrying as it could be not idempotent operation + delegate.dropSchema(session, schemaName, cascade); + } + + @Override + public void renameSchema(ConnectorSession session, String schemaName, String newSchemaName) + { + // no retrying as it could be not idempotent operation + delegate.renameSchema(session, schemaName, newSchemaName); + } + + @Override + public Optional getSystemTable(ConnectorSession session, SchemaTableName tableName) + { + // there should be no remote database interaction + return delegate.getSystemTable(session, tableName); + } + + @Override + public String quoted(String name) + { + // there should be no remote database interaction + return delegate.quoted(name); + } + + @Override + public String quoted(RemoteTableName remoteTableName) + { + // there should be no remote database interaction + return delegate.quoted(remoteTableName); + } + + @Override + public Map getTableProperties(ConnectorSession session, JdbcTableHandle tableHandle) + { + return retry(policy, () -> delegate.getTableProperties(session, tableHandle)); + } + + @Override + public Optional getTableScanRedirection(ConnectorSession session, JdbcTableHandle tableHandle) + { + return retry(policy, () -> delegate.getTableScanRedirection(session, tableHandle)); + } + + @Override + public OptionalLong delete(ConnectorSession session, JdbcTableHandle handle) + { + // no retrying as it could be not idempotent operation + return delegate.delete(session, handle); + } + + @Override + public void truncateTable(ConnectorSession session, JdbcTableHandle handle) + { + // no retrying as it could be not idempotent operation + delegate.truncateTable(session, handle); + } + + @Override + public OptionalLong update(ConnectorSession session, JdbcTableHandle handle) + { + // no retrying as it could be not idempotent operation + return delegate.update(session, handle); + } + + @Override + public OptionalInt getMaxWriteParallelism(ConnectorSession session) + { + return retry(policy, () -> delegate.getMaxWriteParallelism(session)); + } + + @Override + public OptionalInt getMaxColumnNameLength(ConnectorSession session) + { + return retry(policy, () -> delegate.getMaxColumnNameLength(session)); + } +} diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RetryingModule.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RetryingModule.java new file mode 100644 index 000000000000..809e8d7b6a99 --- /dev/null +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RetryingModule.java @@ -0,0 +1,92 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.jdbc; + +import com.google.common.base.Throwables; +import com.google.inject.AbstractModule; +import com.google.inject.Key; +import com.google.inject.Provides; +import com.google.inject.Scopes; +import com.google.inject.Singleton; +import dev.failsafe.Failsafe; +import dev.failsafe.RetryPolicy; +import dev.failsafe.function.CheckedRunnable; +import dev.failsafe.function.CheckedSupplier; +import io.trino.plugin.jdbc.jmx.StatisticsAwareConnectionFactory; +import io.trino.plugin.jdbc.jmx.StatisticsAwareJdbcClient; +import io.trino.spi.TrinoException; + +import java.sql.SQLTransientException; +import java.time.Duration; +import java.util.Set; + +import static com.google.inject.multibindings.Multibinder.newSetBinder; +import static java.time.temporal.ChronoUnit.MILLIS; +import static java.time.temporal.ChronoUnit.SECONDS; + +public class RetryingModule + extends AbstractModule +{ + @Override + public void configure() + { + bind(ConnectionFactory.class).annotatedWith(ForRetrying.class).to(Key.get(StatisticsAwareConnectionFactory.class)).in(Scopes.SINGLETON); + bind(RetryingConnectionFactory.class).in(Scopes.SINGLETON); + bind(JdbcClient.class).annotatedWith(ForRetrying.class).to(Key.get(StatisticsAwareJdbcClient.class)).in(Scopes.SINGLETON); + bind(RetryingJdbcClient.class).in(Scopes.SINGLETON); + newSetBinder(binder(), RetryStrategy.class).addBinding().to(OnSqlTransientExceptionRetryStrategy.class).in(Scopes.SINGLETON); + } + + @Provides + @Singleton + public RetryPolicy createRetryPolicy(Set retryStrategies) + { + return RetryPolicy.builder() + .withMaxDuration(Duration.of(30, SECONDS)) + .withMaxAttempts(5) + .withBackoff(50, 5_000, MILLIS, 4) + .handleIf(throwable -> isExceptionRecoverable(retryStrategies, throwable)) + .abortOn(TrinoException.class) + .build(); + } + + public static T retry(RetryPolicy policy, CheckedSupplier supplier) + { + return Failsafe.with(policy) + .get(supplier); + } + + public static void retry(RetryPolicy policy, CheckedRunnable runnable) + { + Failsafe.with(policy) + .run(runnable); + } + + private static boolean isExceptionRecoverable(Set retryStrategies, Throwable throwable) + { + return retryStrategies.stream() + .anyMatch(retryStrategy -> retryStrategy.isExceptionRecoverable(throwable)); + } + + private static class OnSqlTransientExceptionRetryStrategy + implements RetryStrategy + { + @Override + public boolean isExceptionRecoverable(Throwable exception) + { + return Throwables.getCausalChain(exception).stream() + .anyMatch(SQLTransientException.class::isInstance); + } + } +} diff --git a/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestRetryingConnectionFactory.java b/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestRetryingConnectionFactory.java index 95e83c5d0a08..426b6b160286 100644 --- a/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestRetryingConnectionFactory.java +++ b/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestRetryingConnectionFactory.java @@ -14,12 +14,15 @@ package io.trino.plugin.jdbc; import com.google.common.base.Throwables; +import com.google.inject.Binder; import com.google.inject.Guice; import com.google.inject.Inject; import com.google.inject.Injector; import com.google.inject.Key; +import com.google.inject.Module; import com.google.inject.Scopes; -import io.trino.plugin.jdbc.RetryingConnectionFactory.RetryStrategy; +import io.trino.plugin.jdbc.jmx.StatisticsAwareConnectionFactory; +import io.trino.plugin.jdbc.jmx.StatisticsAwareJdbcClient; import io.trino.spi.StandardErrorCode; import io.trino.spi.TrinoException; import io.trino.spi.connector.ConnectorSession; @@ -156,21 +159,15 @@ public void testAdditionalRetryStrategyWorks() private static Injector createInjector(MockConnectorFactory.Action... actions) { - return Guice.createInjector(binder -> { + return Guice.createInjector(new TestingRetryingModule(), binder -> { binder.bind(MockConnectorFactory.Action[].class).toInstance(actions); - binder.bind(MockConnectorFactory.class).in(Scopes.SINGLETON); - binder.bind(ConnectionFactory.class).annotatedWith(ForBaseJdbc.class).to(Key.get(MockConnectorFactory.class)); - binder.install(new RetryingConnectionFactoryModule()); }); } private static Injector createInjectorWithAdditionalStrategy(MockConnectorFactory.Action... actions) { - return Guice.createInjector(binder -> { + return Guice.createInjector(new TestingRetryingModule(), binder -> { binder.bind(MockConnectorFactory.Action[].class).toInstance(actions); - binder.bind(MockConnectorFactory.class).in(Scopes.SINGLETON); - binder.bind(ConnectionFactory.class).annotatedWith(ForBaseJdbc.class).to(Key.get(MockConnectorFactory.class)); - binder.install(new RetryingConnectionFactoryModule()); newSetBinder(binder, RetryStrategy.class).addBinding().to(AdditionalRetryStrategy.class).in(Scopes.SINGLETON); }); } @@ -243,4 +240,25 @@ public enum Action RETURN, } } + + private static class TestingRetryingModule + implements Module + { + @Override + public void configure(Binder binder) + { + binder.bind(MockConnectorFactory.class).in(Scopes.SINGLETON); + binder.bind(ConnectionFactory.class).annotatedWith(ForBaseJdbc.class).to(Key.get(MockConnectorFactory.class)); + binder.bind(StatisticsAwareConnectionFactory.class).in(Scopes.SINGLETON); + binder.bind(StatisticsAwareJdbcClient.class).toInstance(new StatisticsAwareJdbcClient(new ForwardingJdbcClient() + { + @Override + protected JdbcClient delegate() + { + throw new UnsupportedOperationException(); + } + })); + binder.install(new RetryingModule()); + } + } } diff --git a/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestRetryingJdbcClient.java b/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestRetryingJdbcClient.java new file mode 100644 index 000000000000..384bd6edbd6b --- /dev/null +++ b/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestRetryingJdbcClient.java @@ -0,0 +1,35 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.jdbc; + +import dev.failsafe.RetryPolicy; +import org.junit.jupiter.api.Test; + +import static io.trino.spi.testing.InterfaceTestUtils.assertAllMethodsOverridden; +import static io.trino.spi.testing.InterfaceTestUtils.assertProperForwardingMethodsAreCalled; + +class TestRetryingJdbcClient +{ + @Test + public void testEverythingImplemented() + { + assertAllMethodsOverridden(JdbcClient.class, RetryingJdbcClient.class); + } + + @Test + public void testProperForwardingMethodsAreCalled() + { + assertProperForwardingMethodsAreCalled(JdbcClient.class, jdbcClient -> new RetryingJdbcClient(jdbcClient, RetryPolicy.ofDefaults())); + } +} diff --git a/plugin/trino-oracle/src/main/java/io/trino/plugin/oracle/OracleClientModule.java b/plugin/trino-oracle/src/main/java/io/trino/plugin/oracle/OracleClientModule.java index 219d5d25f297..8d858c915fbd 100644 --- a/plugin/trino-oracle/src/main/java/io/trino/plugin/oracle/OracleClientModule.java +++ b/plugin/trino-oracle/src/main/java/io/trino/plugin/oracle/OracleClientModule.java @@ -27,7 +27,7 @@ import io.trino.plugin.jdbc.ForBaseJdbc; import io.trino.plugin.jdbc.JdbcClient; import io.trino.plugin.jdbc.MaxDomainCompactionThreshold; -import io.trino.plugin.jdbc.RetryingConnectionFactory.RetryStrategy; +import io.trino.plugin.jdbc.RetryStrategy; import io.trino.plugin.jdbc.TimestampTimeZoneDomain; import io.trino.plugin.jdbc.credential.CredentialProvider; import io.trino.plugin.jdbc.ptf.Query; diff --git a/plugin/trino-oracle/src/test/java/io/trino/plugin/oracle/TestingOracleServer.java b/plugin/trino-oracle/src/test/java/io/trino/plugin/oracle/TestingOracleServer.java index 31831c4b99f6..8b089c5ce314 100644 --- a/plugin/trino-oracle/src/test/java/io/trino/plugin/oracle/TestingOracleServer.java +++ b/plugin/trino-oracle/src/test/java/io/trino/plugin/oracle/TestingOracleServer.java @@ -14,7 +14,6 @@ package io.trino.plugin.oracle; import com.google.common.base.Joiner; -import com.google.common.collect.ImmutableSet; import com.google.common.io.Files; import dev.failsafe.Failsafe; import dev.failsafe.RetryPolicy; @@ -22,7 +21,6 @@ import io.trino.plugin.jdbc.ConnectionFactory; import io.trino.plugin.jdbc.DriverConnectionFactory; import io.trino.plugin.jdbc.RetryingConnectionFactory; -import io.trino.plugin.jdbc.RetryingConnectionFactory.DefaultRetryStrategy; import io.trino.plugin.jdbc.credential.StaticCredentialProvider; import io.trino.plugin.jdbc.jmx.StatisticsAwareConnectionFactory; import oracle.jdbc.OracleDriver; @@ -135,7 +133,7 @@ private ConnectionFactory getConnectionFactory(String connectionUrl, String user { StatisticsAwareConnectionFactory connectionFactory = new StatisticsAwareConnectionFactory( DriverConnectionFactory.builder(new OracleDriver(), connectionUrl, StaticCredentialProvider.of(username, password)).build()); - return new RetryingConnectionFactory(connectionFactory, ImmutableSet.of(new DefaultRetryStrategy())); + return new RetryingConnectionFactory(connectionFactory, RetryPolicy.ofDefaults()); } @Override diff --git a/plugin/trino-phoenix5/pom.xml b/plugin/trino-phoenix5/pom.xml index 5dd7c87ecb97..77962512e366 100644 --- a/plugin/trino-phoenix5/pom.xml +++ b/plugin/trino-phoenix5/pom.xml @@ -33,6 +33,11 @@ guice + + dev.failsafe + failsafe + + io.airlift bootstrap diff --git a/plugin/trino-phoenix5/src/main/java/io/trino/plugin/phoenix5/PhoenixClientModule.java b/plugin/trino-phoenix5/src/main/java/io/trino/plugin/phoenix5/PhoenixClientModule.java index 1857863675ea..00ac2d9bc443 100644 --- a/plugin/trino-phoenix5/src/main/java/io/trino/plugin/phoenix5/PhoenixClientModule.java +++ b/plugin/trino-phoenix5/src/main/java/io/trino/plugin/phoenix5/PhoenixClientModule.java @@ -49,7 +49,7 @@ import io.trino.plugin.jdbc.MaxDomainCompactionThreshold; import io.trino.plugin.jdbc.QueryBuilder; import io.trino.plugin.jdbc.RetryingConnectionFactory; -import io.trino.plugin.jdbc.RetryingConnectionFactoryModule; +import io.trino.plugin.jdbc.RetryingModule; import io.trino.plugin.jdbc.ReusableConnectionFactoryModule; import io.trino.plugin.jdbc.TimestampTimeZoneDomain; import io.trino.plugin.jdbc.TypeHandlingJdbcConfig; @@ -104,7 +104,7 @@ public PhoenixClientModule(String catalogName) protected void setup(Binder binder) { install(new RemoteQueryModifierModule()); - install(new RetryingConnectionFactoryModule()); + install(new RetryingModule()); binder.bind(ConnectorSplitManager.class).annotatedWith(ForJdbcDynamicFiltering.class).to(PhoenixSplitManager.class).in(Scopes.SINGLETON); binder.bind(ConnectorSplitManager.class).annotatedWith(ForClassLoaderSafe.class).to(JdbcDynamicFilteringSplitManager.class).in(Scopes.SINGLETON); binder.bind(ConnectorSplitManager.class).to(ClassLoaderSafeConnectorSplitManager.class).in(Scopes.SINGLETON);