diff --git a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/DatabaseTestHelper.java b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/DatabaseTestHelper.java index 3cfe08ba4d11..1204fa7ada61 100644 --- a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/DatabaseTestHelper.java +++ b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/DatabaseTestHelper.java @@ -34,8 +34,13 @@ import java.util.Optional; import java.util.stream.Collectors; import javax.sql.DataSource; +import org.apache.beam.sdk.util.BackOff; +import org.apache.beam.sdk.util.BackOffUtils; +import org.apache.beam.sdk.util.FluentBackoff; +import org.apache.beam.sdk.util.Sleeper; import org.apache.beam.sdk.values.KV; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; +import org.joda.time.Duration; import org.postgresql.ds.PGSimpleDataSource; import org.testcontainers.containers.JdbcDatabaseContainer; @@ -86,9 +91,40 @@ public static void createTable( fieldsAndTypes.stream() .map(kv -> kv.getKey() + " " + kv.getValue()) .collect(Collectors.joining(", ")); - try (Connection connection = dataSource.getConnection()) { - try (Statement statement = connection.createStatement()) { - statement.execute(String.format("create table %s (%s)", tableName, fieldsList)); + SQLException exception = null; + Sleeper sleeper = Sleeper.DEFAULT; + BackOff backoff = + FluentBackoff.DEFAULT + .withInitialBackoff(Duration.standardSeconds(1)) + .withMaxCumulativeBackoff(Duration.standardMinutes(5)) + .withMaxRetries(4) + .backoff(); + while (true) { + // This is not implemented as try-with-resources because it appears that try-with-resources is + // not correctly catching the PSQLException thrown by dataSource.getConnection() + Connection connection = null; + try { + connection = dataSource.getConnection(); + try (Statement statement = connection.createStatement()) { + statement.execute(String.format("create table %s (%s)", tableName, fieldsList)); + return; + } + } catch (SQLException e) { + exception = e; + } finally { + if (connection != null) { + connection.close(); + } + } + boolean hasNext; + try { + hasNext = BackOffUtils.next(sleeper, backoff); + } catch (Exception e) { + throw new RuntimeException(e); + } + if (!hasNext) { + // we tried the max number of times + throw exception; } } }