Skip to content

Commit

Permalink
Pluggable retry strategy for RetryingConnectionFactory
Browse files Browse the repository at this point in the history
  • Loading branch information
dominikzalewski committed Oct 17, 2023
1 parent 1206ec6 commit da3f8ec
Show file tree
Hide file tree
Showing 9 changed files with 150 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public void setup(Binder binder)
install(new JdbcDiagnosticModule());
install(new IdentifierMappingModule());
install(new RemoteQueryModifierModule());
install(new RetryingConnectionFactoryModule());

newOptionalBinder(binder, ConnectorAccessControl.class);
newOptionalBinder(binder, QueryBuilder.class).setDefault().to(DefaultQueryBuilder.class).in(Scopes.SINGLETON);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import com.google.errorprone.annotations.ThreadSafe;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import com.google.inject.Inject;
import io.trino.plugin.jdbc.jmx.StatisticsAwareConnectionFactory;
import io.trino.spi.connector.ConnectorSession;
import jakarta.annotation.Nullable;

Expand All @@ -30,10 +29,10 @@
public final class LazyConnectionFactory
implements ConnectionFactory
{
private final StatisticsAwareConnectionFactory delegate;
private final ConnectionFactory delegate;

@Inject
public LazyConnectionFactory(StatisticsAwareConnectionFactory delegate)
public LazyConnectionFactory(RetryingConnectionFactory delegate)
{
this.delegate = requireNonNull(delegate, "delegate is null");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.SQLRecoverableException;
import java.sql.SQLTransientException;

import static java.time.temporal.ChronoUnit.MILLIS;
import static java.time.temporal.ChronoUnit.SECONDS;
Expand All @@ -33,28 +33,30 @@
public class RetryingConnectionFactory
implements ConnectionFactory
{
private static final RetryPolicy<Object> RETRY_POLICY = RetryPolicy.builder()
.withMaxDuration(java.time.Duration.of(30, SECONDS))
.withMaxAttempts(5)
.withBackoff(50, 5_000, MILLIS, 4)
.handleIf(RetryingConnectionFactory::isSqlRecoverableException)
.abortOn(TrinoException.class)
.build();
private final RetryPolicy<Object> retryPolicy;

private final StatisticsAwareConnectionFactory delegate;
private final ConnectionFactory delegate;

@Inject
public RetryingConnectionFactory(StatisticsAwareConnectionFactory delegate)
public RetryingConnectionFactory(StatisticsAwareConnectionFactory delegate, RetryStrategy retryStrategy)
{
requireNonNull(retryStrategy);
this.delegate = requireNonNull(delegate, "delegate is null");
this.retryPolicy = RetryPolicy.builder()
.withMaxDuration(java.time.Duration.of(30, SECONDS))
.withMaxAttempts(5)
.withBackoff(50, 5_000, MILLIS, 4)
.handleIf(retryStrategy::isExceptionRecoverable)
.abortOn(TrinoException.class)
.build();
}

@Override
public Connection openConnection(ConnectorSession session)
throws SQLException
{
try {
return Failsafe.with(RETRY_POLICY)
return Failsafe.with(retryPolicy)
.get(() -> delegate.openConnection(session));
}
catch (FailsafeException ex) {
Expand All @@ -72,9 +74,19 @@ public void close()
delegate.close();
}

private static boolean isSqlRecoverableException(Throwable exception)
public interface RetryStrategy
{
return Throwables.getCausalChain(exception).stream()
.anyMatch(SQLRecoverableException.class::isInstance);
boolean isExceptionRecoverable(Throwable exception);
}

public static class DefaultRetryStrategy
implements RetryStrategy
{
@Override
public boolean isExceptionRecoverable(Throwable exception)
{
return Throwables.getCausalChain(exception).stream()
.anyMatch(SQLTransientException.class::isInstance);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.jdbc;

import com.google.inject.AbstractModule;
import com.google.inject.Scopes;
import io.trino.plugin.jdbc.RetryingConnectionFactory.DefaultRetryStrategy;
import io.trino.plugin.jdbc.RetryingConnectionFactory.RetryStrategy;

import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder;

public class RetryingConnectionFactoryModule
extends AbstractModule
{
@Override
public void configure()
{
bind(RetryingConnectionFactory.class).in(Scopes.SINGLETON);
newOptionalBinder(binder(), RetryStrategy.class)
.setDefault()
.to(DefaultRetryStrategy.class)
.in(Scopes.SINGLETON);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,13 @@ public class TestLazyConnectionFactory
public void testNoConnectionIsCreated()
throws Exception
{
Injector injector = Guice.createInjector(binder -> binder.bind(ConnectionFactory.class).annotatedWith(ForBaseJdbc.class).toInstance(
session -> {
throw new AssertionError("Expected no connection creation");
}));
Injector injector = Guice.createInjector(binder -> {
binder.bind(ConnectionFactory.class).annotatedWith(ForBaseJdbc.class).toInstance(
session -> {
throw new AssertionError("Expected no connection creation");
});
binder.install(new RetryingConnectionFactoryModule());
});

try (LazyConnectionFactory lazyConnectionFactory = injector.getInstance(LazyConnectionFactory.class);
Connection ignored = lazyConnectionFactory.openConnection(SESSION)) {
Expand All @@ -50,8 +53,11 @@ public void testConnectionCannotBeReusedAfterClose()
BaseJdbcConfig config = new BaseJdbcConfig()
.setConnectionUrl(format("jdbc:h2:mem:test%s;DB_CLOSE_DELAY=-1", System.nanoTime() + ThreadLocalRandom.current().nextLong()));

Injector injector = Guice.createInjector(binder -> binder.bind(ConnectionFactory.class).annotatedWith(ForBaseJdbc.class).toInstance(
new DriverConnectionFactory(new Driver(), config, new EmptyCredentialProvider())));
Injector injector = Guice.createInjector(binder -> {
binder.bind(ConnectionFactory.class).annotatedWith(ForBaseJdbc.class).toInstance(
new DriverConnectionFactory(new Driver(), config, new EmptyCredentialProvider()));
binder.install(new RetryingConnectionFactoryModule());
});

try (LazyConnectionFactory lazyConnectionFactory = injector.getInstance(LazyConnectionFactory.class)) {
Connection connection = lazyConnectionFactory.openConnection(SESSION);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@
*/
package io.trino.plugin.jdbc;

import com.google.common.base.Throwables;
import com.google.inject.Guice;
import com.google.inject.Inject;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Scopes;
import io.trino.plugin.jdbc.RetryingConnectionFactory.RetryStrategy;
import io.trino.spi.StandardErrorCode;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ConnectorSession;
Expand All @@ -26,17 +28,20 @@
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.SQLRecoverableException;
import java.sql.SQLTransientException;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.stream.Stream;

import static com.google.common.reflect.Reflection.newProxy;
import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder;
import static io.trino.plugin.jdbc.TestRetryingConnectionFactory.MockConnectorFactory.Action.RETURN;
import static io.trino.plugin.jdbc.TestRetryingConnectionFactory.MockConnectorFactory.Action.THROW_NPE;
import static io.trino.plugin.jdbc.TestRetryingConnectionFactory.MockConnectorFactory.Action.THROW_SQL_EXCEPTION;
import static io.trino.plugin.jdbc.TestRetryingConnectionFactory.MockConnectorFactory.Action.THROW_SQL_RECOVERABLE_EXCEPTION;
import static io.trino.plugin.jdbc.TestRetryingConnectionFactory.MockConnectorFactory.Action.THROW_SQL_TRANSIENT_EXCEPTION;
import static io.trino.plugin.jdbc.TestRetryingConnectionFactory.MockConnectorFactory.Action.THROW_TRINO_EXCEPTION;
import static io.trino.plugin.jdbc.TestRetryingConnectionFactory.MockConnectorFactory.Action.THROW_WRAPPED_SQL_RECOVERABLE_EXCEPTION;
import static io.trino.plugin.jdbc.TestRetryingConnectionFactory.MockConnectorFactory.Action.THROW_WRAPPED_SQL_TRANSIENT_EXCEPTION;
import static io.trino.spi.block.TestingSession.SESSION;
import static io.trino.spi.testing.InterfaceTestUtils.assertAllMethodsOverridden;
import static java.util.Objects.requireNonNull;
Expand Down Expand Up @@ -68,7 +73,7 @@ public void testSimplyReturnConnection()
@Test
public void testRetryAndStopOnTrinoException()
{
Injector injector = createInjector(THROW_SQL_RECOVERABLE_EXCEPTION, THROW_TRINO_EXCEPTION);
Injector injector = createInjector(THROW_SQL_TRANSIENT_EXCEPTION, THROW_TRINO_EXCEPTION);
ConnectionFactory factory = injector.getInstance(RetryingConnectionFactory.class);
MockConnectorFactory mock = injector.getInstance(MockConnectorFactory.class);

Expand All @@ -82,7 +87,7 @@ public void testRetryAndStopOnTrinoException()
@Test
public void testRetryAndStopOnSqlException()
{
Injector injector = createInjector(THROW_SQL_RECOVERABLE_EXCEPTION, THROW_SQL_EXCEPTION);
Injector injector = createInjector(THROW_SQL_TRANSIENT_EXCEPTION, THROW_SQL_EXCEPTION);
ConnectionFactory factory = injector.getInstance(RetryingConnectionFactory.class);
MockConnectorFactory mock = injector.getInstance(MockConnectorFactory.class);

Expand Down Expand Up @@ -111,7 +116,7 @@ public void testNullPointerException()
public void testRetryAndReturn()
throws Exception
{
Injector injector = createInjector(THROW_SQL_RECOVERABLE_EXCEPTION, RETURN);
Injector injector = createInjector(THROW_SQL_TRANSIENT_EXCEPTION, RETURN);
ConnectionFactory factory = injector.getInstance(RetryingConnectionFactory.class);
MockConnectorFactory mock = injector.getInstance(MockConnectorFactory.class);

Expand All @@ -125,7 +130,21 @@ public void testRetryAndReturn()
public void testRetryOnWrappedAndReturn()
throws Exception
{
Injector injector = createInjector(THROW_WRAPPED_SQL_RECOVERABLE_EXCEPTION, RETURN);
Injector injector = createInjector(THROW_WRAPPED_SQL_TRANSIENT_EXCEPTION, RETURN);
ConnectionFactory factory = injector.getInstance(RetryingConnectionFactory.class);
MockConnectorFactory mock = injector.getInstance(MockConnectorFactory.class);

Connection connection = factory.openConnection(SESSION);

assertThat(connection).isNotNull();
assertThat(mock.getCallCount()).isEqualTo(2);
}

@Test
public void testOverridingRetryStrategyWorks()
throws Exception
{
Injector injector = createInjectorWithOverridenStrategy(THROW_SQL_RECOVERABLE_EXCEPTION, RETURN);
ConnectionFactory factory = injector.getInstance(RetryingConnectionFactory.class);
MockConnectorFactory mock = injector.getInstance(MockConnectorFactory.class);

Expand All @@ -141,9 +160,32 @@ private static Injector createInjector(MockConnectorFactory.Action... actions)
binder.bind(MockConnectorFactory.Action[].class).toInstance(actions);
binder.bind(MockConnectorFactory.class).in(Scopes.SINGLETON);
binder.bind(ConnectionFactory.class).annotatedWith(ForBaseJdbc.class).to(Key.get(MockConnectorFactory.class));
binder.install(new RetryingConnectionFactoryModule());
});
}

private static Injector createInjectorWithOverridenStrategy(MockConnectorFactory.Action... actions)
{
return Guice.createInjector(binder -> {
binder.bind(MockConnectorFactory.Action[].class).toInstance(actions);
binder.bind(MockConnectorFactory.class).in(Scopes.SINGLETON);
binder.bind(ConnectionFactory.class).annotatedWith(ForBaseJdbc.class).to(Key.get(MockConnectorFactory.class));
binder.install(new RetryingConnectionFactoryModule());
newOptionalBinder(binder, RetryStrategy.class).setBinding().to(OverrideRetryStrategy.class).in(Scopes.SINGLETON);
});
}

private static class OverrideRetryStrategy
implements RetryStrategy
{
@Override
public boolean isExceptionRecoverable(Throwable exception)
{
return Throwables.getCausalChain(exception).stream()
.anyMatch(SQLRecoverableException.class::isInstance);
}
}

public static class MockConnectorFactory
implements ConnectionFactory
{
Expand Down Expand Up @@ -181,6 +223,10 @@ public Connection openConnection(ConnectorSession session)
throw new SQLRecoverableException("Testing sql recoverable exception");
case THROW_WRAPPED_SQL_RECOVERABLE_EXCEPTION:
throw new RuntimeException(new SQLRecoverableException("Testing sql recoverable exception"));
case THROW_SQL_TRANSIENT_EXCEPTION:
throw new SQLTransientException("Testing sql transient exception");
case THROW_WRAPPED_SQL_TRANSIENT_EXCEPTION:
throw new RuntimeException(new SQLTransientException("Testing sql transient exception"));
}
throw new IllegalStateException("Unsupported action:" + action);
}
Expand All @@ -191,6 +237,8 @@ public enum Action
THROW_SQL_EXCEPTION,
THROW_SQL_RECOVERABLE_EXCEPTION,
THROW_WRAPPED_SQL_RECOVERABLE_EXCEPTION,
THROW_SQL_TRANSIENT_EXCEPTION,
THROW_WRAPPED_SQL_TRANSIENT_EXCEPTION,
THROW_NPE,
RETURN,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.trino.plugin.oracle;

import com.google.common.base.Throwables;
import com.google.inject.Binder;
import com.google.inject.Key;
import com.google.inject.Module;
Expand All @@ -26,15 +27,15 @@
import io.trino.plugin.jdbc.ForBaseJdbc;
import io.trino.plugin.jdbc.JdbcClient;
import io.trino.plugin.jdbc.MaxDomainCompactionThreshold;
import io.trino.plugin.jdbc.RetryingConnectionFactory;
import io.trino.plugin.jdbc.RetryingConnectionFactory.RetryStrategy;
import io.trino.plugin.jdbc.credential.CredentialProvider;
import io.trino.plugin.jdbc.jmx.StatisticsAwareConnectionFactory;
import io.trino.plugin.jdbc.ptf.Query;
import io.trino.spi.function.table.ConnectorTableFunction;
import oracle.jdbc.OracleConnection;
import oracle.jdbc.OracleDriver;

import java.sql.SQLException;
import java.sql.SQLRecoverableException;
import java.util.Properties;

import static com.google.inject.multibindings.Multibinder.newSetBinder;
Expand All @@ -54,6 +55,7 @@ public void configure(Binder binder)
configBinder(binder).bindConfig(OracleConfig.class);
newOptionalBinder(binder, Key.get(int.class, MaxDomainCompactionThreshold.class)).setBinding().toInstance(ORACLE_MAX_LIST_EXPRESSIONS);
newSetBinder(binder, ConnectorTableFunction.class).addBinding().toProvider(Query.class).in(Scopes.SINGLETON);
newOptionalBinder(binder, RetryStrategy.class).setBinding().to(OracleRetryStrategy.class).in(Scopes.SINGLETON);
}

@Provides
Expand All @@ -77,11 +79,22 @@ public static ConnectionFactory connectionFactory(BaseJdbcConfig config, Credent
openTelemetry);
}

return new RetryingConnectionFactory(new StatisticsAwareConnectionFactory(new DriverConnectionFactory(
return new DriverConnectionFactory(
new OracleDriver(),
config.getConnectionUrl(),
connectionProperties,
credentialProvider,
openTelemetry)));
openTelemetry);
}

private static class OracleRetryStrategy
implements RetryStrategy
{
@Override
public boolean isExceptionRecoverable(Throwable exception)
{
return Throwables.getCausalChain(exception).stream()
.anyMatch(SQLRecoverableException.class::isInstance);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.trino.plugin.jdbc.ConnectionFactory;
import io.trino.plugin.jdbc.DriverConnectionFactory;
import io.trino.plugin.jdbc.RetryingConnectionFactory;
import io.trino.plugin.jdbc.RetryingConnectionFactory.DefaultRetryStrategy;
import io.trino.plugin.jdbc.credential.StaticCredentialProvider;
import io.trino.plugin.jdbc.jmx.StatisticsAwareConnectionFactory;
import io.trino.testing.ResourcePresence;
Expand Down Expand Up @@ -130,7 +131,7 @@ private ConnectionFactory getConnectionFactory(String connectionUrl, String user
new OracleDriver(),
new BaseJdbcConfig().setConnectionUrl(connectionUrl),
StaticCredentialProvider.of(username, password)));
return new RetryingConnectionFactory(connectionFactory);
return new RetryingConnectionFactory(connectionFactory, new DefaultRetryStrategy());
}

@Override
Expand Down
Loading

0 comments on commit da3f8ec

Please sign in to comment.