diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcRecordSet.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcRecordSet.java index 9d3d3269055a..8b5b19fded03 100644 --- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcRecordSet.java +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcRecordSet.java @@ -14,6 +14,7 @@ package io.trino.plugin.jdbc; import com.google.common.collect.ImmutableList; +import dev.failsafe.RetryPolicy; import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.RecordCursor; import io.trino.spi.connector.RecordSet; @@ -22,6 +23,7 @@ import java.util.List; import java.util.concurrent.ExecutorService; +import static io.trino.plugin.jdbc.RetryingModule.retry; import static java.util.Objects.requireNonNull; public class JdbcRecordSet @@ -34,8 +36,16 @@ public class JdbcRecordSet private final List columnTypes; private final JdbcSplit split; private final ConnectorSession session; + private final RetryPolicy policy; - public JdbcRecordSet(JdbcClient jdbcClient, ExecutorService executor, ConnectorSession session, JdbcSplit split, BaseJdbcConnectorTableHandle table, List columnHandles) + public JdbcRecordSet( + JdbcClient jdbcClient, + ExecutorService executor, + ConnectorSession session, + RetryPolicy policy, + JdbcSplit split, + BaseJdbcConnectorTableHandle table, + List columnHandles) { this.jdbcClient = requireNonNull(jdbcClient, "jdbcClient is null"); this.executor = requireNonNull(executor, "executor is null"); @@ -49,6 +59,7 @@ public JdbcRecordSet(JdbcClient jdbcClient, ExecutorService executor, ConnectorS } this.columnTypes = types.build(); this.session = requireNonNull(session, "session is null"); + this.policy = requireNonNull(policy, "policy is null"); } @Override @@ -60,6 +71,6 @@ public List getColumnTypes() @Override public RecordCursor cursor() { - return new JdbcRecordCursor(jdbcClient, executor, session, split, table, columnHandles); + return retry(policy, () -> new JdbcRecordCursor(jdbcClient, executor, session, split, table, columnHandles)); } } diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcRecordSetProvider.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcRecordSetProvider.java index f32b04049224..0e09450e885f 100644 --- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcRecordSetProvider.java +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcRecordSetProvider.java @@ -16,6 +16,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.inject.Inject; +import dev.failsafe.RetryPolicy; import io.trino.plugin.base.MappedRecordSet; import io.trino.spi.connector.ColumnHandle; import io.trino.spi.connector.ConnectorRecordSetProvider; @@ -41,12 +42,14 @@ public class JdbcRecordSetProvider { private final JdbcClient jdbcClient; private final ExecutorService executor; + private final RetryPolicy policy; @Inject - public JdbcRecordSetProvider(JdbcClient jdbcClient, @ForRecordCursor ExecutorService executor) + public JdbcRecordSetProvider(JdbcClient jdbcClient, @ForRecordCursor ExecutorService executor, RetryPolicy policy) { this.jdbcClient = requireNonNull(jdbcClient, "jdbcClient is null"); this.executor = requireNonNull(executor, "executor is null"); + this.policy = requireNonNull(policy, "policy is null"); } @Override @@ -72,6 +75,7 @@ public RecordSet getRecordSet(ConnectorTransactionHandle transaction, ConnectorS jdbcClient, executor, session, + policy, jdbcSplit, jdbcTableHandle.intersectedWithConstraint(jdbcSplit.getDynamicFilter().transformKeys(ColumnHandle.class::cast)), handles.build()); @@ -88,6 +92,7 @@ public RecordSet getRecordSet(ConnectorTransactionHandle transaction, ConnectorS jdbcClient, executor, session, + policy, jdbcSplit, procedureHandle, sourceColumns), diff --git a/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestJdbcRecordSet.java b/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestJdbcRecordSet.java index c252771ce6ab..87aede67b24b 100644 --- a/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestJdbcRecordSet.java +++ b/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestJdbcRecordSet.java @@ -15,6 +15,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import dev.failsafe.RetryPolicy; import io.trino.spi.connector.RecordCursor; import io.trino.spi.connector.RecordSet; import io.trino.spi.connector.SchemaTableName; @@ -181,6 +182,6 @@ public void testIdempotentClose() private JdbcRecordSet createRecordSet(List columnHandles) { - return new JdbcRecordSet(jdbcClient, executor, SESSION, split, table, columnHandles); + return new JdbcRecordSet(jdbcClient, executor, SESSION, RetryPolicy.ofDefaults(), split, table, columnHandles); } } diff --git a/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestJdbcRecordSetProvider.java b/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestJdbcRecordSetProvider.java index dd6382beaaa6..5ff6896312e1 100644 --- a/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestJdbcRecordSetProvider.java +++ b/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestJdbcRecordSetProvider.java @@ -15,6 +15,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import dev.failsafe.RetryPolicy; import io.trino.spi.connector.ColumnHandle; import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.ConnectorSplitSource; @@ -103,7 +104,7 @@ public void tearDown() public void testGetRecordSet() { ConnectorTransactionHandle transaction = new JdbcTransactionHandle(); - JdbcRecordSetProvider recordSetProvider = new JdbcRecordSetProvider(jdbcClient, executor); + JdbcRecordSetProvider recordSetProvider = new JdbcRecordSetProvider(jdbcClient, executor, RetryPolicy.ofDefaults()); RecordSet recordSet = recordSetProvider.getRecordSet(transaction, SESSION, split, table, ImmutableList.of(textColumn, textShortColumn, valueColumn)); assertThat(recordSet).withFailMessage("recordSet is null").isNotNull(); @@ -214,7 +215,7 @@ private RecordCursor getCursor(JdbcTableHandle jdbcTableHandle, List policy) { - this.recordSetProvider = new JdbcRecordSetProvider(phoenixClient, executor); + this.recordSetProvider = new JdbcRecordSetProvider(phoenixClient, executor, policy); this.phoenixClient = requireNonNull(phoenixClient, "phoenixClient is null"); }