diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java index 35d94442f37a..2e7334f183a8 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java @@ -370,6 +370,7 @@ public static ReadWithPartitions readWithPartitions() { } private static final long DEFAULT_BATCH_SIZE = 1000L; + private static final long DEFAULT_MAX_BATCH_BUFFERING_DURATION = 200L; private static final int DEFAULT_FETCH_SIZE = 50_000; // Default values used from fluent backoff. private static final Duration DEFAULT_INITIAL_BACKOFF = Duration.standardSeconds(1); @@ -389,6 +390,7 @@ public static Write write() { public static WriteVoid writeVoid() { return new AutoValue_JdbcIO_WriteVoid.Builder() .setBatchSize(DEFAULT_BATCH_SIZE) + .setMaxBatchBufferingDuration(DEFAULT_MAX_BATCH_BUFFERING_DURATION) .setRetryStrategy(new DefaultRetryStrategy()) .setRetryConfiguration(RetryConfiguration.create(5, null, Duration.standardSeconds(5))) .build(); @@ -1686,6 +1688,11 @@ public Write withBatchSize(long batchSize) { return new Write<>(inner.withBatchSize(batchSize)); } + /** See {@link WriteVoid#withMaxBatchBufferingDuration(long)}. */ + public Write withMaxBatchBufferingDuration(long maxBatchBufferingDuration) { + return new Write<>(inner.withMaxBatchBufferingDuration(maxBatchBufferingDuration)); + } + /** See {@link WriteVoid#withRetryStrategy(RetryStrategy)}. */ public Write withRetryStrategy(RetryStrategy retryStrategy) { return new Write<>(inner.withRetryStrategy(retryStrategy)); @@ -1754,13 +1761,16 @@ public PDone expand(PCollection input) { /* The maximum number of elements that will be included in a batch. */ static PCollection> batchElements( - PCollection input, @Nullable Boolean withAutoSharding, long batchSize) { + PCollection input, + @Nullable Boolean withAutoSharding, + long batchSize, + long maxBatchBufferingDuration) { PCollection> iterables; if (input.isBounded() == IsBounded.UNBOUNDED) { PCollection> keyedInput = input.apply(WithKeys.of("")); GroupIntoBatches groupTransform = GroupIntoBatches.ofSize(batchSize) - .withMaxBufferingDuration(Duration.millis(200)); + .withMaxBufferingDuration(Duration.millis(maxBatchBufferingDuration)); if (withAutoSharding != null && withAutoSharding) { // unbounded and withAutoSharding enabled, group into batches with shardedKey iterables = keyedInput.apply(groupTransform.withShardedKey()).apply(Values.create()); @@ -1958,7 +1968,8 @@ public PCollection expand(PCollection input) { "Autosharding is only supported for streaming pipelines."); PCollection> iterables = - JdbcIO.batchElements(input, autoSharding, DEFAULT_BATCH_SIZE); + JdbcIO.batchElements( + input, autoSharding, DEFAULT_BATCH_SIZE, DEFAULT_MAX_BATCH_BUFFERING_DURATION); return iterables.apply( ParDo.of( new WriteFn( @@ -1971,6 +1982,7 @@ public PCollection expand(PCollection input) { .setRetryConfiguration(getRetryConfiguration()) .setReturnResults(true) .setBatchSize(1L) + .setMaxBatchBufferingDuration(DEFAULT_MAX_BATCH_BUFFERING_DURATION) .build()))); } } @@ -1990,6 +2002,8 @@ public abstract static class WriteVoid extends PTransform, PCo abstract long getBatchSize(); + abstract long getMaxBatchBufferingDuration(); + abstract @Nullable PreparedStatementSetter getPreparedStatementSetter(); abstract @Nullable RetryStrategy getRetryStrategy(); @@ -2011,6 +2025,8 @@ abstract Builder setDataSourceProviderFn( abstract Builder setBatchSize(long batchSize); + abstract Builder setMaxBatchBufferingDuration(long maxBatchBufferingDuration); + abstract Builder setPreparedStatementSetter(PreparedStatementSetter setter); abstract Builder setRetryStrategy(RetryStrategy deadlockPredicate); @@ -2049,7 +2065,9 @@ public WriteVoid withPreparedStatementSetter(PreparedStatementSetter sette } /** - * Provide a maximum size in number of SQL statement for the batch. Default is 1000. + * Provide a maximum size in number of SQL statement for the batch. Default is 1000. The + * pipeline will either commit a batch when this maximum is reached or its maximum buffering + * time has been reached. See {@link #withMaxBatchBufferingDuration(long)} * * @param batchSize maximum batch size in number of statements */ @@ -2058,6 +2076,21 @@ public WriteVoid withBatchSize(long batchSize) { return toBuilder().setBatchSize(batchSize).build(); } + /** + * Provide maximum buffering time to batch elements before committing SQL statement. Default is + * 200 The pipeline will either commit a batch when this maximum buffering time has been reached + * or the maximum amount of elements has been collected. See {@link #withBatchSize(long)} + * + * @param maxBatchBufferingDuration maximum time in milliseconds before batch is committed + */ + public WriteVoid withMaxBatchBufferingDuration(long maxBatchBufferingDuration) { + checkArgument( + maxBatchBufferingDuration > 0, + "maxBatchBufferingDuration must be > 0, but was %s", + maxBatchBufferingDuration); + return toBuilder().setMaxBatchBufferingDuration(maxBatchBufferingDuration).build(); + } + /** * When a SQL exception occurs, {@link Write} uses this {@link RetryStrategy} to determine if it * will retry the statements. If {@link RetryStrategy#apply(SQLException)} returns {@code true}, @@ -2128,7 +2161,8 @@ public PCollection expand(PCollection input) { } PCollection> iterables = - JdbcIO.batchElements(input, getAutoSharding(), getBatchSize()); + JdbcIO.batchElements( + input, getAutoSharding(), getBatchSize(), getMaxBatchBufferingDuration()); return iterables .apply( @@ -2142,6 +2176,7 @@ public PCollection expand(PCollection input) { .setTable(spec.getTable()) .setStatement(spec.getStatement()) .setBatchSize(spec.getBatchSize()) + .setMaxBatchBufferingDuration(spec.getMaxBatchBufferingDuration()) .setReturnResults(false) .build()))) .setCoder(VoidCoder.of()); @@ -2448,6 +2483,9 @@ public void populateDisplayData(DisplayData.Builder builder) { @Pure abstract @Nullable Long getBatchSize(); + @Pure + abstract @Nullable Long getMaxBatchBufferingDuration(); + @Pure abstract Boolean getReturnResults(); @@ -2477,6 +2515,9 @@ abstract Builder setRetryConfiguration( abstract Builder setBatchSize(@Nullable Long batchSize); + abstract Builder setMaxBatchBufferingDuration( + @Nullable Long maxBatchBufferingDuration); + abstract Builder setReturnResults(Boolean returnResults); abstract WriteFnSpec build();