Skip to content

Commit

Permalink
Add retry to test connections (apache#23757)
Browse files Browse the repository at this point in the history
* Move connection setup logic for JDBCIO WriteFn to @startBundle to limit parallel calls to datasource.getConnection()

* move connection setup logic back to processElement.
Put a retry into the DatabaseTestHelper

* run spotless

* use fluent backoff instead of manual implementation

* refactor to manual resource management

* run spotless
  • Loading branch information
johnjcasey authored Nov 22, 2022
1 parent fe4a98a commit 67e2008
Showing 1 changed file with 39 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,13 @@
import java.util.Optional;
import java.util.stream.Collectors;
import javax.sql.DataSource;
import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.sdk.util.BackOffUtils;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.Sleeper;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.joda.time.Duration;
import org.postgresql.ds.PGSimpleDataSource;
import org.testcontainers.containers.JdbcDatabaseContainer;

Expand Down Expand Up @@ -86,9 +91,40 @@ public static void createTable(
fieldsAndTypes.stream()
.map(kv -> kv.getKey() + " " + kv.getValue())
.collect(Collectors.joining(", "));
try (Connection connection = dataSource.getConnection()) {
try (Statement statement = connection.createStatement()) {
statement.execute(String.format("create table %s (%s)", tableName, fieldsList));
SQLException exception = null;
Sleeper sleeper = Sleeper.DEFAULT;
BackOff backoff =
FluentBackoff.DEFAULT
.withInitialBackoff(Duration.standardSeconds(1))
.withMaxCumulativeBackoff(Duration.standardMinutes(5))
.withMaxRetries(4)
.backoff();
while (true) {
// This is not implemented as try-with-resources because it appears that try-with-resources is
// not correctly catching the PSQLException thrown by dataSource.getConnection()
Connection connection = null;
try {
connection = dataSource.getConnection();
try (Statement statement = connection.createStatement()) {
statement.execute(String.format("create table %s (%s)", tableName, fieldsList));
return;
}
} catch (SQLException e) {
exception = e;
} finally {
if (connection != null) {
connection.close();
}
}
boolean hasNext;
try {
hasNext = BackOffUtils.next(sleeper, backoff);
} catch (Exception e) {
throw new RuntimeException(e);
}
if (!hasNext) {
// we tried the max number of times
throw exception;
}
}
}
Expand Down

0 comments on commit 67e2008

Please sign in to comment.