diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcModule.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcModule.java index 7d81a3cb90d9..b31790da736d 100644 --- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcModule.java +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcModule.java @@ -22,7 +22,6 @@ import io.airlift.configuration.AbstractConfigurationAwareModule; import io.trino.plugin.base.mapping.IdentifierMappingModule; import io.trino.plugin.base.session.SessionPropertiesProvider; -import io.trino.plugin.jdbc.jmx.StatisticsAwareJdbcClient; import io.trino.plugin.jdbc.logging.RemoteQueryModifierModule; import io.trino.plugin.jdbc.procedure.ExecuteProcedure; import io.trino.plugin.jdbc.procedure.FlushJdbcMetadataCacheProcedure; @@ -52,7 +51,7 @@ public void setup(Binder binder) install(new JdbcDiagnosticModule()); install(new IdentifierMappingModule()); install(new RemoteQueryModifierModule()); - install(new RetryingConnectionFactoryModule()); + install(new RetryingModule()); newOptionalBinder(binder, ConnectorAccessControl.class); newOptionalBinder(binder, QueryBuilder.class).setDefault().to(DefaultQueryBuilder.class).in(Scopes.SINGLETON); @@ -86,7 +85,7 @@ public void setup(Binder binder) newExporter(binder).export(DynamicFilteringStats.class) .as(generator -> generator.generatedNameOf(DynamicFilteringStats.class, catalogName.get().toString())); - binder.bind(JdbcClient.class).annotatedWith(ForCaching.class).to(Key.get(StatisticsAwareJdbcClient.class)).in(Scopes.SINGLETON); + binder.bind(JdbcClient.class).annotatedWith(ForCaching.class).to(Key.get(RetryingJdbcClient.class)).in(Scopes.SINGLETON); binder.bind(CachingJdbcClient.class).in(Scopes.SINGLETON); binder.bind(JdbcClient.class).to(Key.get(CachingJdbcClient.class)).in(Scopes.SINGLETON); diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RetryStrategy.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RetryStrategy.java new file mode 100644 index 000000000000..183912a9706f --- /dev/null +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RetryStrategy.java @@ -0,0 +1,19 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.jdbc; + +public interface RetryStrategy +{ + boolean isExceptionRecoverable(Throwable exception); +} diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RetryingConnectionFactory.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RetryingConnectionFactory.java index e4a21883be4c..77f1ee9dd018 100644 --- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RetryingConnectionFactory.java +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RetryingConnectionFactory.java @@ -13,48 +13,28 @@ */ package io.trino.plugin.jdbc; -import com.google.common.base.Throwables; import com.google.inject.Inject; import dev.failsafe.Failsafe; import dev.failsafe.FailsafeException; import dev.failsafe.RetryPolicy; -import io.trino.spi.TrinoException; import io.trino.spi.connector.ConnectorSession; import java.sql.Connection; import java.sql.SQLException; -import java.sql.SQLTransientException; -import java.util.Set; -import static java.time.temporal.ChronoUnit.MILLIS; -import static java.time.temporal.ChronoUnit.SECONDS; import static java.util.Objects.requireNonNull; public class RetryingConnectionFactory implements ConnectionFactory { - private final RetryPolicy retryPolicy; - private final ConnectionFactory delegate; + private final RetryPolicy retryPolicy; @Inject - public RetryingConnectionFactory(@ForRetrying ConnectionFactory delegate, Set retryStrategies) + public RetryingConnectionFactory(@ForRetrying ConnectionFactory delegate, RetryPolicy retryPolicy) { - requireNonNull(retryStrategies); this.delegate = requireNonNull(delegate, "delegate is null"); - this.retryPolicy = RetryPolicy.builder() - .withMaxDuration(java.time.Duration.of(30, SECONDS)) - .withMaxAttempts(5) - .withBackoff(50, 5_000, MILLIS, 4) - .handleIf(throwable -> isExceptionRecoverable(retryStrategies, throwable)) - .abortOn(TrinoException.class) - .build(); - } - - private static boolean isExceptionRecoverable(Set retryStrategies, Throwable throwable) - { - return retryStrategies.stream() - .anyMatch(retryStrategy -> retryStrategy.isExceptionRecoverable(throwable)); + this.retryPolicy = requireNonNull(retryPolicy, "retryPolicy is null"); } @Override @@ -79,20 +59,4 @@ public void close() { delegate.close(); } - - public interface RetryStrategy - { - boolean isExceptionRecoverable(Throwable exception); - } - - public static class DefaultRetryStrategy - implements RetryStrategy - { - @Override - public boolean isExceptionRecoverable(Throwable exception) - { - return Throwables.getCausalChain(exception).stream() - .anyMatch(SQLTransientException.class::isInstance); - } - } } diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RetryingConnectionFactoryModule.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RetryingConnectionFactoryModule.java deleted file mode 100644 index 26d5061be448..000000000000 --- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RetryingConnectionFactoryModule.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.trino.plugin.jdbc; - -import com.google.inject.AbstractModule; -import com.google.inject.Key; -import com.google.inject.Scopes; -import io.trino.plugin.jdbc.RetryingConnectionFactory.DefaultRetryStrategy; -import io.trino.plugin.jdbc.RetryingConnectionFactory.RetryStrategy; -import io.trino.plugin.jdbc.jmx.StatisticsAwareConnectionFactory; - -import static com.google.inject.multibindings.Multibinder.newSetBinder; - -public class RetryingConnectionFactoryModule - extends AbstractModule -{ - @Override - public void configure() - { - bind(ConnectionFactory.class).annotatedWith(ForRetrying.class).to(Key.get(StatisticsAwareConnectionFactory.class)).in(Scopes.SINGLETON); - bind(RetryingConnectionFactory.class).in(Scopes.SINGLETON); - newSetBinder(binder(), RetryStrategy.class).addBinding().to(DefaultRetryStrategy.class).in(Scopes.SINGLETON); - } -} diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RetryingJdbcClient.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RetryingJdbcClient.java new file mode 100644 index 000000000000..ac2fab50e744 --- /dev/null +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RetryingJdbcClient.java @@ -0,0 +1,524 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.jdbc; + +import com.google.inject.Inject; +import dev.failsafe.RetryPolicy; +import io.trino.plugin.jdbc.JdbcProcedureHandle.ProcedureQuery; +import io.trino.plugin.jdbc.expression.ParameterizedExpression; +import io.trino.spi.connector.AggregateFunction; +import io.trino.spi.connector.ColumnHandle; +import io.trino.spi.connector.ColumnMetadata; +import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.ConnectorSplitSource; +import io.trino.spi.connector.ConnectorTableMetadata; +import io.trino.spi.connector.JoinStatistics; +import io.trino.spi.connector.JoinType; +import io.trino.spi.connector.RelationColumnsMetadata; +import io.trino.spi.connector.RelationCommentMetadata; +import io.trino.spi.connector.SchemaTableName; +import io.trino.spi.connector.SystemTable; +import io.trino.spi.connector.TableScanRedirectApplicationResult; +import io.trino.spi.expression.ConnectorExpression; +import io.trino.spi.statistics.TableStatistics; +import io.trino.spi.type.Type; + +import java.sql.CallableStatement; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.OptionalLong; +import java.util.Set; + +import static io.trino.plugin.jdbc.RetryingModule.retry; +import static java.util.Objects.requireNonNull; + +public class RetryingJdbcClient + implements JdbcClient +{ + private final JdbcClient delegate; + private final RetryPolicy policy; + + @Inject + public RetryingJdbcClient(@ForRetrying JdbcClient delegate, RetryPolicy policy) + { + this.delegate = requireNonNull(delegate, "delegate is null"); + this.policy = requireNonNull(policy, "policy is null"); + } + + @Override + public boolean schemaExists(ConnectorSession session, String schema) + { + return retry(policy, () -> delegate.schemaExists(session, schema)); + } + + @Override + public Set getSchemaNames(ConnectorSession session) + { + return retry(policy, () -> delegate.getSchemaNames(session)); + } + + @Override + public List getTableNames(ConnectorSession session, Optional schema) + { + return retry(policy, () -> delegate.getTableNames(session, schema)); + } + + @Override + public Optional getTableHandle(ConnectorSession session, SchemaTableName schemaTableName) + { + return retry(policy, () -> delegate.getTableHandle(session, schemaTableName)); + } + + @Override + public JdbcTableHandle getTableHandle(ConnectorSession session, PreparedQuery preparedQuery) + { + return retry(policy, () -> delegate.getTableHandle(session, preparedQuery)); + } + + @Override + public JdbcProcedureHandle getProcedureHandle(ConnectorSession session, ProcedureQuery procedureQuery) + { + return retry(policy, () -> delegate.getProcedureHandle(session, procedureQuery)); + } + + @Override + public List getColumns(ConnectorSession session, JdbcTableHandle tableHandle) + { + return retry(policy, () -> delegate.getColumns(session, tableHandle)); + } + + @Override + public Iterator getAllTableColumns(ConnectorSession session, Optional schema) + { + return retry(policy, () -> delegate.getAllTableColumns(session, schema)); + } + + @Override + public List getAllTableComments(ConnectorSession session, Optional schema) + { + return retry(policy, () -> delegate.getAllTableComments(session, schema)); + } + + @Override + public Optional toColumnMapping(ConnectorSession session, Connection connection, JdbcTypeHandle typeHandle) + { + // no retrying as it could be not idempotent operation (connection could be not reusable after the first failure) + return delegate.toColumnMapping(session, connection, typeHandle); + } + + @Override + public List toColumnMappings(ConnectorSession session, List typeHandles) + { + return retry(policy, () -> delegate.toColumnMappings(session, typeHandles)); + } + + @Override + public WriteMapping toWriteMapping(ConnectorSession session, Type type) + { + return retry(policy, () -> delegate.toWriteMapping(session, type)); + } + + @Override + public Optional getSupportedType(ConnectorSession session, Type type) + { + // there should be no remote database interaction + return delegate.getSupportedType(session, type); + } + + @Override + public boolean supportsAggregationPushdown(ConnectorSession session, JdbcTableHandle table, List aggregates, Map assignments, List> groupingSets) + { + // there should be no remote database interaction + return delegate.supportsAggregationPushdown(session, table, aggregates, assignments, groupingSets); + } + + @Override + public Optional implementAggregation(ConnectorSession session, AggregateFunction aggregate, Map assignments) + { + // there should be no remote database interaction + return delegate.implementAggregation(session, aggregate, assignments); + } + + @Override + public Optional convertPredicate(ConnectorSession session, ConnectorExpression expression, Map assignments) + { + // there should be no remote database interaction + return delegate.convertPredicate(session, expression, assignments); + } + + @Override + public Optional convertProjection(ConnectorSession session, JdbcTableHandle handle, ConnectorExpression expression, Map assignments) + { + // there should be no remote database interaction + return delegate.convertProjection(session, handle, expression, assignments); + } + + @Override + public ConnectorSplitSource getSplits(ConnectorSession session, JdbcTableHandle tableHandle) + { + return retry(policy, () -> delegate.getSplits(session, tableHandle)); + } + + @Override + public ConnectorSplitSource getSplits(ConnectorSession session, JdbcProcedureHandle procedureHandle) + { + return retry(policy, () -> delegate.getSplits(session, procedureHandle)); + } + + @Override + public Connection getConnection(ConnectorSession session, JdbcSplit split, JdbcTableHandle tableHandle) + throws SQLException + { + // retry already implemented by RetryingConnectionFactory + return delegate.getConnection(session, split, tableHandle); + } + + @Override + public Connection getConnection(ConnectorSession session, JdbcSplit split, JdbcProcedureHandle procedureHandle) + throws SQLException + { + // retry already implemented by RetryingConnectionFactory + return delegate.getConnection(session, split, procedureHandle); + } + + @Override + public void abortReadConnection(Connection connection, ResultSet resultSet) + throws SQLException + { + // no retrying as it could be not idempotent operation (connection could be not reusable after the first failure) + delegate.abortReadConnection(connection, resultSet); + } + + @Override + public PreparedQuery prepareQuery(ConnectorSession session, JdbcTableHandle table, Optional>> groupingSets, List columns, Map columnExpressions) + { + // there should be no remote database interaction + return delegate.prepareQuery(session, table, groupingSets, columns, columnExpressions); + } + + @Override + public PreparedStatement buildSql(ConnectorSession session, Connection connection, JdbcSplit split, JdbcTableHandle table, List columns) + throws SQLException + { + // no retrying as it could be not idempotent operation (connection could be not reusable after the first failure) + return delegate.buildSql(session, connection, split, table, columns); + } + + @Override + public CallableStatement buildProcedure(ConnectorSession session, Connection connection, JdbcSplit split, JdbcProcedureHandle procedureHandle) + throws SQLException + { + // no retrying as it could be not idempotent operation (connection could be not reusable after the first failure) + return delegate.buildProcedure(session, connection, split, procedureHandle); + } + + @Override + public Optional implementJoin(ConnectorSession session, JoinType joinType, PreparedQuery leftSource, Map leftProjections, PreparedQuery rightSource, Map rightProjections, List joinConditions, JoinStatistics statistics) + { + // there should be no remote database interaction + return delegate.implementJoin(session, joinType, leftSource, leftProjections, rightSource, rightProjections, joinConditions, statistics); + } + + @Override + public Optional legacyImplementJoin(ConnectorSession session, JoinType joinType, PreparedQuery leftSource, PreparedQuery rightSource, List joinConditions, Map rightAssignments, Map leftAssignments, JoinStatistics statistics) + { + // there should be no remote database interaction + return delegate.legacyImplementJoin(session, joinType, leftSource, rightSource, joinConditions, rightAssignments, leftAssignments, statistics); + } + + @Override + public boolean supportsTopN(ConnectorSession session, JdbcTableHandle handle, List sortOrder) + { + // there should be no remote database interaction + return delegate.supportsTopN(session, handle, sortOrder); + } + + @Override + public boolean isTopNGuaranteed(ConnectorSession session) + { + // there should be no remote database interaction + return delegate.isTopNGuaranteed(session); + } + + @Override + public boolean supportsLimit() + { + // there should be no remote database interaction + return delegate.supportsLimit(); + } + + @Override + public boolean isLimitGuaranteed(ConnectorSession session) + { + // there should be no remote database interaction + return delegate.isLimitGuaranteed(session); + } + + @Override + public Optional getTableComment(ResultSet resultSet) + throws SQLException + { + // no retrying as it could be not idempotent operation + return delegate.getTableComment(resultSet); + } + + @Override + public void setTableComment(ConnectorSession session, JdbcTableHandle handle, Optional comment) + { + retry(policy, () -> delegate.setTableComment(session, handle, comment)); + } + + @Override + public void setColumnComment(ConnectorSession session, JdbcTableHandle handle, JdbcColumnHandle column, Optional comment) + { + // no retrying as it could be not idempotent operation + retry(policy, () -> delegate.setColumnComment(session, handle, column, comment)); + } + + @Override + public void addColumn(ConnectorSession session, JdbcTableHandle handle, ColumnMetadata column) + { + // no retrying as it could be not idempotent operation + delegate.addColumn(session, handle, column); + } + + @Override + public void dropColumn(ConnectorSession session, JdbcTableHandle handle, JdbcColumnHandle column) + { + // no retrying as it could be not idempotent operation + delegate.dropColumn(session, handle, column); + } + + @Override + public void renameColumn(ConnectorSession session, JdbcTableHandle handle, JdbcColumnHandle jdbcColumn, String newColumnName) + { + // no retrying as it could be not idempotent operation + delegate.renameColumn(session, handle, jdbcColumn, newColumnName); + } + + @Override + public void setColumnType(ConnectorSession session, JdbcTableHandle handle, JdbcColumnHandle column, Type type) + { + // no retrying as it could be not idempotent operation + delegate.setColumnType(session, handle, column, type); + } + + @Override + public void dropNotNullConstraint(ConnectorSession session, JdbcTableHandle handle, JdbcColumnHandle column) + { + // safe to retry, but retry needs to handle exceptions in case the constraint is already dropped + delegate.dropNotNullConstraint(session, handle, column); + } + + @Override + public void renameTable(ConnectorSession session, JdbcTableHandle handle, SchemaTableName newTableName) + { + // no retrying as it could be not idempotent operation + delegate.renameTable(session, handle, newTableName); + } + + @Override + public void setTableProperties(ConnectorSession session, JdbcTableHandle handle, Map> properties) + { + // no retrying as it could be not idempotent operation + delegate.setTableProperties(session, handle, properties); + } + + @Override + public void createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata) + { + // no retrying as it could be not idempotent operation + delegate.createTable(session, tableMetadata); + } + + @Override + public JdbcOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata) + { + // no retrying as it could be not idempotent operation + return delegate.beginCreateTable(session, tableMetadata); + } + + @Override + public void commitCreateTable(ConnectorSession session, JdbcOutputTableHandle handle, Set pageSinkIds) + { + // no retrying as it could be not idempotent operation + delegate.commitCreateTable(session, handle, pageSinkIds); + } + + @Override + public JdbcOutputTableHandle beginInsertTable(ConnectorSession session, JdbcTableHandle tableHandle, List columns) + { + // no retrying as it could be not idempotent operation + return delegate.beginInsertTable(session, tableHandle, columns); + } + + @Override + public void finishInsertTable(ConnectorSession session, JdbcOutputTableHandle handle, Set pageSinkIds) + { + // no retrying as it could be not idempotent operation + delegate.finishInsertTable(session, handle, pageSinkIds); + } + + @Override + public void dropTable(ConnectorSession session, JdbcTableHandle jdbcTableHandle) + { + // no retrying as it could be not idempotent operation + delegate.dropTable(session, jdbcTableHandle); + } + + @Override + public void rollbackCreateTable(ConnectorSession session, JdbcOutputTableHandle handle) + { + // no retrying as it could be not idempotent operation + delegate.rollbackCreateTable(session, handle); + } + + @Override + public boolean supportsRetries() + { + // there should be no remote database interaction + return delegate.supportsRetries(); + } + + @Override + public String buildInsertSql(JdbcOutputTableHandle handle, List columnWriters) + { + // there should be no remote database interaction + return delegate.buildInsertSql(handle, columnWriters); + } + + @Override + public Connection getConnection(ConnectorSession session) + throws SQLException + { + // retry already implemented by RetryingConnectionFactory + return delegate.getConnection(session); + } + + @Override + public Connection getConnection(ConnectorSession session, JdbcOutputTableHandle handle) + throws SQLException + { + // retry already implemented by RetryingConnectionFactory + return delegate.getConnection(session, handle); + } + + @Override + public PreparedStatement getPreparedStatement(Connection connection, String sql, Optional columnCount) + throws SQLException + { + // no retrying as it could be not idempotent operation (connection could be not reusable after the first failure) + return delegate.getPreparedStatement(connection, sql, columnCount); + } + + @Override + public TableStatistics getTableStatistics(ConnectorSession session, JdbcTableHandle handle) + { + return retry(policy, () -> delegate.getTableStatistics(session, handle)); + } + + @Override + public void createSchema(ConnectorSession session, String schemaName) + { + // no retrying as it could be not idempotent operation + delegate.createSchema(session, schemaName); + } + + @Override + public void dropSchema(ConnectorSession session, String schemaName, boolean cascade) + { + // no retrying as it could be not idempotent operation + delegate.dropSchema(session, schemaName, cascade); + } + + @Override + public void renameSchema(ConnectorSession session, String schemaName, String newSchemaName) + { + // no retrying as it could be not idempotent operation + delegate.renameSchema(session, schemaName, newSchemaName); + } + + @Override + public Optional getSystemTable(ConnectorSession session, SchemaTableName tableName) + { + // there should be no remote database interaction + return delegate.getSystemTable(session, tableName); + } + + @Override + public String quoted(String name) + { + // there should be no remote database interaction + return delegate.quoted(name); + } + + @Override + public String quoted(RemoteTableName remoteTableName) + { + // there should be no remote database interaction + return delegate.quoted(remoteTableName); + } + + @Override + public Map getTableProperties(ConnectorSession session, JdbcTableHandle tableHandle) + { + return retry(policy, () -> delegate.getTableProperties(session, tableHandle)); + } + + @Override + public Optional getTableScanRedirection(ConnectorSession session, JdbcTableHandle tableHandle) + { + return retry(policy, () -> delegate.getTableScanRedirection(session, tableHandle)); + } + + @Override + public OptionalLong delete(ConnectorSession session, JdbcTableHandle handle) + { + // no retrying as it could be not idempotent operation + return delegate.delete(session, handle); + } + + @Override + public void truncateTable(ConnectorSession session, JdbcTableHandle handle) + { + // no retrying as it could be not idempotent operation + delegate.truncateTable(session, handle); + } + + @Override + public OptionalLong update(ConnectorSession session, JdbcTableHandle handle) + { + // no retrying as it could be not idempotent operation + return delegate.update(session, handle); + } + + @Override + public OptionalInt getMaxWriteParallelism(ConnectorSession session) + { + return retry(policy, () -> delegate.getMaxWriteParallelism(session)); + } + + @Override + public OptionalInt getMaxColumnNameLength(ConnectorSession session) + { + return retry(policy, () -> delegate.getMaxColumnNameLength(session)); + } +} diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RetryingModule.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RetryingModule.java new file mode 100644 index 000000000000..809e8d7b6a99 --- /dev/null +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/RetryingModule.java @@ -0,0 +1,92 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.jdbc; + +import com.google.common.base.Throwables; +import com.google.inject.AbstractModule; +import com.google.inject.Key; +import com.google.inject.Provides; +import com.google.inject.Scopes; +import com.google.inject.Singleton; +import dev.failsafe.Failsafe; +import dev.failsafe.RetryPolicy; +import dev.failsafe.function.CheckedRunnable; +import dev.failsafe.function.CheckedSupplier; +import io.trino.plugin.jdbc.jmx.StatisticsAwareConnectionFactory; +import io.trino.plugin.jdbc.jmx.StatisticsAwareJdbcClient; +import io.trino.spi.TrinoException; + +import java.sql.SQLTransientException; +import java.time.Duration; +import java.util.Set; + +import static com.google.inject.multibindings.Multibinder.newSetBinder; +import static java.time.temporal.ChronoUnit.MILLIS; +import static java.time.temporal.ChronoUnit.SECONDS; + +public class RetryingModule + extends AbstractModule +{ + @Override + public void configure() + { + bind(ConnectionFactory.class).annotatedWith(ForRetrying.class).to(Key.get(StatisticsAwareConnectionFactory.class)).in(Scopes.SINGLETON); + bind(RetryingConnectionFactory.class).in(Scopes.SINGLETON); + bind(JdbcClient.class).annotatedWith(ForRetrying.class).to(Key.get(StatisticsAwareJdbcClient.class)).in(Scopes.SINGLETON); + bind(RetryingJdbcClient.class).in(Scopes.SINGLETON); + newSetBinder(binder(), RetryStrategy.class).addBinding().to(OnSqlTransientExceptionRetryStrategy.class).in(Scopes.SINGLETON); + } + + @Provides + @Singleton + public RetryPolicy createRetryPolicy(Set retryStrategies) + { + return RetryPolicy.builder() + .withMaxDuration(Duration.of(30, SECONDS)) + .withMaxAttempts(5) + .withBackoff(50, 5_000, MILLIS, 4) + .handleIf(throwable -> isExceptionRecoverable(retryStrategies, throwable)) + .abortOn(TrinoException.class) + .build(); + } + + public static T retry(RetryPolicy policy, CheckedSupplier supplier) + { + return Failsafe.with(policy) + .get(supplier); + } + + public static void retry(RetryPolicy policy, CheckedRunnable runnable) + { + Failsafe.with(policy) + .run(runnable); + } + + private static boolean isExceptionRecoverable(Set retryStrategies, Throwable throwable) + { + return retryStrategies.stream() + .anyMatch(retryStrategy -> retryStrategy.isExceptionRecoverable(throwable)); + } + + private static class OnSqlTransientExceptionRetryStrategy + implements RetryStrategy + { + @Override + public boolean isExceptionRecoverable(Throwable exception) + { + return Throwables.getCausalChain(exception).stream() + .anyMatch(SQLTransientException.class::isInstance); + } + } +} diff --git a/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestRetryingConnectionFactory.java b/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestRetryingConnectionFactory.java index 95e83c5d0a08..426b6b160286 100644 --- a/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestRetryingConnectionFactory.java +++ b/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestRetryingConnectionFactory.java @@ -14,12 +14,15 @@ package io.trino.plugin.jdbc; import com.google.common.base.Throwables; +import com.google.inject.Binder; import com.google.inject.Guice; import com.google.inject.Inject; import com.google.inject.Injector; import com.google.inject.Key; +import com.google.inject.Module; import com.google.inject.Scopes; -import io.trino.plugin.jdbc.RetryingConnectionFactory.RetryStrategy; +import io.trino.plugin.jdbc.jmx.StatisticsAwareConnectionFactory; +import io.trino.plugin.jdbc.jmx.StatisticsAwareJdbcClient; import io.trino.spi.StandardErrorCode; import io.trino.spi.TrinoException; import io.trino.spi.connector.ConnectorSession; @@ -156,21 +159,15 @@ public void testAdditionalRetryStrategyWorks() private static Injector createInjector(MockConnectorFactory.Action... actions) { - return Guice.createInjector(binder -> { + return Guice.createInjector(new TestingRetryingModule(), binder -> { binder.bind(MockConnectorFactory.Action[].class).toInstance(actions); - binder.bind(MockConnectorFactory.class).in(Scopes.SINGLETON); - binder.bind(ConnectionFactory.class).annotatedWith(ForBaseJdbc.class).to(Key.get(MockConnectorFactory.class)); - binder.install(new RetryingConnectionFactoryModule()); }); } private static Injector createInjectorWithAdditionalStrategy(MockConnectorFactory.Action... actions) { - return Guice.createInjector(binder -> { + return Guice.createInjector(new TestingRetryingModule(), binder -> { binder.bind(MockConnectorFactory.Action[].class).toInstance(actions); - binder.bind(MockConnectorFactory.class).in(Scopes.SINGLETON); - binder.bind(ConnectionFactory.class).annotatedWith(ForBaseJdbc.class).to(Key.get(MockConnectorFactory.class)); - binder.install(new RetryingConnectionFactoryModule()); newSetBinder(binder, RetryStrategy.class).addBinding().to(AdditionalRetryStrategy.class).in(Scopes.SINGLETON); }); } @@ -243,4 +240,25 @@ public enum Action RETURN, } } + + private static class TestingRetryingModule + implements Module + { + @Override + public void configure(Binder binder) + { + binder.bind(MockConnectorFactory.class).in(Scopes.SINGLETON); + binder.bind(ConnectionFactory.class).annotatedWith(ForBaseJdbc.class).to(Key.get(MockConnectorFactory.class)); + binder.bind(StatisticsAwareConnectionFactory.class).in(Scopes.SINGLETON); + binder.bind(StatisticsAwareJdbcClient.class).toInstance(new StatisticsAwareJdbcClient(new ForwardingJdbcClient() + { + @Override + protected JdbcClient delegate() + { + throw new UnsupportedOperationException(); + } + })); + binder.install(new RetryingModule()); + } + } } diff --git a/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestRetryingJdbcClient.java b/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestRetryingJdbcClient.java new file mode 100644 index 000000000000..384bd6edbd6b --- /dev/null +++ b/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestRetryingJdbcClient.java @@ -0,0 +1,35 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.jdbc; + +import dev.failsafe.RetryPolicy; +import org.junit.jupiter.api.Test; + +import static io.trino.spi.testing.InterfaceTestUtils.assertAllMethodsOverridden; +import static io.trino.spi.testing.InterfaceTestUtils.assertProperForwardingMethodsAreCalled; + +class TestRetryingJdbcClient +{ + @Test + public void testEverythingImplemented() + { + assertAllMethodsOverridden(JdbcClient.class, RetryingJdbcClient.class); + } + + @Test + public void testProperForwardingMethodsAreCalled() + { + assertProperForwardingMethodsAreCalled(JdbcClient.class, jdbcClient -> new RetryingJdbcClient(jdbcClient, RetryPolicy.ofDefaults())); + } +} diff --git a/plugin/trino-oracle/src/main/java/io/trino/plugin/oracle/OracleClientModule.java b/plugin/trino-oracle/src/main/java/io/trino/plugin/oracle/OracleClientModule.java index 219d5d25f297..8d858c915fbd 100644 --- a/plugin/trino-oracle/src/main/java/io/trino/plugin/oracle/OracleClientModule.java +++ b/plugin/trino-oracle/src/main/java/io/trino/plugin/oracle/OracleClientModule.java @@ -27,7 +27,7 @@ import io.trino.plugin.jdbc.ForBaseJdbc; import io.trino.plugin.jdbc.JdbcClient; import io.trino.plugin.jdbc.MaxDomainCompactionThreshold; -import io.trino.plugin.jdbc.RetryingConnectionFactory.RetryStrategy; +import io.trino.plugin.jdbc.RetryStrategy; import io.trino.plugin.jdbc.TimestampTimeZoneDomain; import io.trino.plugin.jdbc.credential.CredentialProvider; import io.trino.plugin.jdbc.ptf.Query; diff --git a/plugin/trino-oracle/src/test/java/io/trino/plugin/oracle/TestingOracleServer.java b/plugin/trino-oracle/src/test/java/io/trino/plugin/oracle/TestingOracleServer.java index 31831c4b99f6..8b089c5ce314 100644 --- a/plugin/trino-oracle/src/test/java/io/trino/plugin/oracle/TestingOracleServer.java +++ b/plugin/trino-oracle/src/test/java/io/trino/plugin/oracle/TestingOracleServer.java @@ -14,7 +14,6 @@ package io.trino.plugin.oracle; import com.google.common.base.Joiner; -import com.google.common.collect.ImmutableSet; import com.google.common.io.Files; import dev.failsafe.Failsafe; import dev.failsafe.RetryPolicy; @@ -22,7 +21,6 @@ import io.trino.plugin.jdbc.ConnectionFactory; import io.trino.plugin.jdbc.DriverConnectionFactory; import io.trino.plugin.jdbc.RetryingConnectionFactory; -import io.trino.plugin.jdbc.RetryingConnectionFactory.DefaultRetryStrategy; import io.trino.plugin.jdbc.credential.StaticCredentialProvider; import io.trino.plugin.jdbc.jmx.StatisticsAwareConnectionFactory; import oracle.jdbc.OracleDriver; @@ -135,7 +133,7 @@ private ConnectionFactory getConnectionFactory(String connectionUrl, String user { StatisticsAwareConnectionFactory connectionFactory = new StatisticsAwareConnectionFactory( DriverConnectionFactory.builder(new OracleDriver(), connectionUrl, StaticCredentialProvider.of(username, password)).build()); - return new RetryingConnectionFactory(connectionFactory, ImmutableSet.of(new DefaultRetryStrategy())); + return new RetryingConnectionFactory(connectionFactory, RetryPolicy.ofDefaults()); } @Override diff --git a/plugin/trino-phoenix5/pom.xml b/plugin/trino-phoenix5/pom.xml index 5dd7c87ecb97..77962512e366 100644 --- a/plugin/trino-phoenix5/pom.xml +++ b/plugin/trino-phoenix5/pom.xml @@ -33,6 +33,11 @@ guice + + dev.failsafe + failsafe + + io.airlift bootstrap diff --git a/plugin/trino-phoenix5/src/main/java/io/trino/plugin/phoenix5/PhoenixClientModule.java b/plugin/trino-phoenix5/src/main/java/io/trino/plugin/phoenix5/PhoenixClientModule.java index 1857863675ea..00ac2d9bc443 100644 --- a/plugin/trino-phoenix5/src/main/java/io/trino/plugin/phoenix5/PhoenixClientModule.java +++ b/plugin/trino-phoenix5/src/main/java/io/trino/plugin/phoenix5/PhoenixClientModule.java @@ -49,7 +49,7 @@ import io.trino.plugin.jdbc.MaxDomainCompactionThreshold; import io.trino.plugin.jdbc.QueryBuilder; import io.trino.plugin.jdbc.RetryingConnectionFactory; -import io.trino.plugin.jdbc.RetryingConnectionFactoryModule; +import io.trino.plugin.jdbc.RetryingModule; import io.trino.plugin.jdbc.ReusableConnectionFactoryModule; import io.trino.plugin.jdbc.TimestampTimeZoneDomain; import io.trino.plugin.jdbc.TypeHandlingJdbcConfig; @@ -104,7 +104,7 @@ public PhoenixClientModule(String catalogName) protected void setup(Binder binder) { install(new RemoteQueryModifierModule()); - install(new RetryingConnectionFactoryModule()); + install(new RetryingModule()); binder.bind(ConnectorSplitManager.class).annotatedWith(ForJdbcDynamicFiltering.class).to(PhoenixSplitManager.class).in(Scopes.SINGLETON); binder.bind(ConnectorSplitManager.class).annotatedWith(ForClassLoaderSafe.class).to(JdbcDynamicFilteringSplitManager.class).in(Scopes.SINGLETON); binder.bind(ConnectorSplitManager.class).to(ClassLoaderSafeConnectorSplitManager.class).in(Scopes.SINGLETON);