Skip to content

Commit

Permalink
Retry creating JdbcRecordCursor
Browse files Browse the repository at this point in the history
Creating PreparedStatement can fail due transient issues in remote
database. Let's retry it in the same as in JdbcClient.
  • Loading branch information
kokosing committed Oct 11, 2024
1 parent d4229fa commit 87122c1
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.trino.plugin.jdbc;

import com.google.common.collect.ImmutableList;
import dev.failsafe.RetryPolicy;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.RecordCursor;
import io.trino.spi.connector.RecordSet;
Expand All @@ -22,6 +23,7 @@
import java.util.List;
import java.util.concurrent.ExecutorService;

import static io.trino.plugin.jdbc.RetryingModule.retry;
import static java.util.Objects.requireNonNull;

public class JdbcRecordSet
Expand All @@ -34,8 +36,16 @@ public class JdbcRecordSet
private final List<Type> columnTypes;
private final JdbcSplit split;
private final ConnectorSession session;
private final RetryPolicy<Object> policy;

public JdbcRecordSet(JdbcClient jdbcClient, ExecutorService executor, ConnectorSession session, JdbcSplit split, BaseJdbcConnectorTableHandle table, List<JdbcColumnHandle> columnHandles)
public JdbcRecordSet(
JdbcClient jdbcClient,
ExecutorService executor,
ConnectorSession session,
RetryPolicy<Object> policy,
JdbcSplit split,
BaseJdbcConnectorTableHandle table,
List<JdbcColumnHandle> columnHandles)
{
this.jdbcClient = requireNonNull(jdbcClient, "jdbcClient is null");
this.executor = requireNonNull(executor, "executor is null");
Expand All @@ -49,6 +59,7 @@ public JdbcRecordSet(JdbcClient jdbcClient, ExecutorService executor, ConnectorS
}
this.columnTypes = types.build();
this.session = requireNonNull(session, "session is null");
this.policy = requireNonNull(policy, "policy is null");
}

@Override
Expand All @@ -60,6 +71,6 @@ public List<Type> getColumnTypes()
@Override
public RecordCursor cursor()
{
return new JdbcRecordCursor(jdbcClient, executor, session, split, table, columnHandles);
return retry(policy, () -> new JdbcRecordCursor(jdbcClient, executor, session, split, table, columnHandles));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Inject;
import dev.failsafe.RetryPolicy;
import io.trino.plugin.base.MappedRecordSet;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ConnectorRecordSetProvider;
Expand All @@ -41,12 +42,14 @@ public class JdbcRecordSetProvider
{
private final JdbcClient jdbcClient;
private final ExecutorService executor;
private final RetryPolicy<Object> policy;

@Inject
public JdbcRecordSetProvider(JdbcClient jdbcClient, @ForRecordCursor ExecutorService executor)
public JdbcRecordSetProvider(JdbcClient jdbcClient, @ForRecordCursor ExecutorService executor, RetryPolicy<Object> policy)
{
this.jdbcClient = requireNonNull(jdbcClient, "jdbcClient is null");
this.executor = requireNonNull(executor, "executor is null");
this.policy = requireNonNull(policy, "policy is null");
}

@Override
Expand All @@ -72,6 +75,7 @@ public RecordSet getRecordSet(ConnectorTransactionHandle transaction, ConnectorS
jdbcClient,
executor,
session,
policy,
jdbcSplit,
jdbcTableHandle.intersectedWithConstraint(jdbcSplit.getDynamicFilter().transformKeys(ColumnHandle.class::cast)),
handles.build());
Expand All @@ -88,6 +92,7 @@ public RecordSet getRecordSet(ConnectorTransactionHandle transaction, ConnectorS
jdbcClient,
executor,
session,
policy,
jdbcSplit,
procedureHandle,
sourceColumns),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import dev.failsafe.RetryPolicy;
import io.trino.spi.connector.RecordCursor;
import io.trino.spi.connector.RecordSet;
import io.trino.spi.connector.SchemaTableName;
Expand Down Expand Up @@ -181,6 +182,6 @@ public void testIdempotentClose()

private JdbcRecordSet createRecordSet(List<JdbcColumnHandle> columnHandles)
{
return new JdbcRecordSet(jdbcClient, executor, SESSION, split, table, columnHandles);
return new JdbcRecordSet(jdbcClient, executor, SESSION, RetryPolicy.ofDefaults(), split, table, columnHandles);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import dev.failsafe.RetryPolicy;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorSplitSource;
Expand Down Expand Up @@ -103,7 +104,7 @@ public void tearDown()
public void testGetRecordSet()
{
ConnectorTransactionHandle transaction = new JdbcTransactionHandle();
JdbcRecordSetProvider recordSetProvider = new JdbcRecordSetProvider(jdbcClient, executor);
JdbcRecordSetProvider recordSetProvider = new JdbcRecordSetProvider(jdbcClient, executor, RetryPolicy.ofDefaults());
RecordSet recordSet = recordSetProvider.getRecordSet(transaction, SESSION, split, table, ImmutableList.of(textColumn, textShortColumn, valueColumn));
assertThat(recordSet).withFailMessage("recordSet is null").isNotNull();

Expand Down Expand Up @@ -214,7 +215,7 @@ private RecordCursor getCursor(JdbcTableHandle jdbcTableHandle, List<JdbcColumnH
JdbcSplit split = (JdbcSplit) getOnlyElement(getFutureValue(splits.getNextBatch(1000)).getSplits());

ConnectorTransactionHandle transaction = new JdbcTransactionHandle();
JdbcRecordSetProvider recordSetProvider = new JdbcRecordSetProvider(jdbcClient, executor);
JdbcRecordSetProvider recordSetProvider = new JdbcRecordSetProvider(jdbcClient, executor, RetryPolicy.ofDefaults());
RecordSet recordSet = recordSetProvider.getRecordSet(transaction, SESSION, split, jdbcTableHandle, columns);

return recordSet.cursor();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import dev.failsafe.RetryPolicy;
import io.trino.plugin.jdbc.ForRecordCursor;
import io.trino.plugin.jdbc.JdbcColumnHandle;
import io.trino.plugin.jdbc.JdbcRecordSetProvider;
Expand Down Expand Up @@ -48,9 +49,9 @@ public class PhoenixPageSourceProvider
private final PhoenixClient phoenixClient;

@Inject
public PhoenixPageSourceProvider(PhoenixClient phoenixClient, @ForRecordCursor ExecutorService executor)
public PhoenixPageSourceProvider(PhoenixClient phoenixClient, @ForRecordCursor ExecutorService executor, RetryPolicy<Object> policy)
{
this.recordSetProvider = new JdbcRecordSetProvider(phoenixClient, executor);
this.recordSetProvider = new JdbcRecordSetProvider(phoenixClient, executor, policy);
this.phoenixClient = requireNonNull(phoenixClient, "phoenixClient is null");
}

Expand Down

0 comments on commit 87122c1

Please sign in to comment.