[JdbcIO] - Adding option for max batch buffering duration (#30259)
* Adding option for max batch buffering duration

* adding getter for max batch duration

* ran spotless

* adding more to the javadoc for both withBatchSize and withMaxBatchBufferingDuration

* running spotless

* using default value

---------

Co-authored-by: Scott Strong <[email protected]>
scott-strong and Scott Strong authored Feb 12, 2024
1 parent 47f7778 commit fa32492
Showing 1 changed file with 46 additions and 5 deletions.
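For context, here is a minimal usage sketch of the new knob (not part of the commit; the driver, connection string, table, and setter below are illustrative placeholders):

import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

PCollection<KV<Integer, String>> events = ...; // some unbounded input

events.apply(
    JdbcIO.<KV<Integer, String>>write()
        .withDataSourceConfiguration(
            JdbcIO.DataSourceConfiguration.create(
                "org.postgresql.Driver", "jdbc:postgresql://localhost:5432/db"))
        .withStatement("INSERT INTO events (id, payload) VALUES (?, ?)")
        .withPreparedStatementSetter(
            (element, statement) -> {
              statement.setInt(1, element.getKey());
              statement.setString(2, element.getValue());
            })
        .withBatchSize(1000L)
        // New in this commit: flush an incomplete batch after 500 ms instead of
        // the previously hard-coded 200 ms.
        .withMaxBatchBufferingDuration(500L));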
@@ -370,6 +370,7 @@ public static <T> ReadWithPartitions<T, Long> readWithPartitions() {
   }
 
   private static final long DEFAULT_BATCH_SIZE = 1000L;
+  private static final long DEFAULT_MAX_BATCH_BUFFERING_DURATION = 200L;
   private static final int DEFAULT_FETCH_SIZE = 50_000;
   // Default values used from fluent backoff.
   private static final Duration DEFAULT_INITIAL_BACKOFF = Duration.standardSeconds(1);
@@ -389,6 +390,7 @@ public static <T> Write<T> write() {
   public static <T> WriteVoid<T> writeVoid() {
     return new AutoValue_JdbcIO_WriteVoid.Builder<T>()
         .setBatchSize(DEFAULT_BATCH_SIZE)
+        .setMaxBatchBufferingDuration(DEFAULT_MAX_BATCH_BUFFERING_DURATION)
         .setRetryStrategy(new DefaultRetryStrategy())
         .setRetryConfiguration(RetryConfiguration.create(5, null, Duration.standardSeconds(5)))
         .build();
@@ -1686,6 +1688,11 @@ public Write<T> withBatchSize(long batchSize) {
       return new Write<>(inner.withBatchSize(batchSize));
     }
 
+    /** See {@link WriteVoid#withMaxBatchBufferingDuration(long)}. */
+    public Write<T> withMaxBatchBufferingDuration(long maxBatchBufferingDuration) {
+      return new Write<>(inner.withMaxBatchBufferingDuration(maxBatchBufferingDuration));
+    }
+
     /** See {@link WriteVoid#withRetryStrategy(RetryStrategy)}. */
     public Write<T> withRetryStrategy(RetryStrategy retryStrategy) {
       return new Write<>(inner.withRetryStrategy(retryStrategy));
@@ -1754,13 +1761,16 @@ public PDone expand(PCollection<T> input) {
   /* The maximum number of elements that will be included in a batch. */
 
   static <T> PCollection<Iterable<T>> batchElements(
-      PCollection<T> input, @Nullable Boolean withAutoSharding, long batchSize) {
+      PCollection<T> input,
+      @Nullable Boolean withAutoSharding,
+      long batchSize,
+      long maxBatchBufferingDuration) {
     PCollection<Iterable<T>> iterables;
     if (input.isBounded() == IsBounded.UNBOUNDED) {
       PCollection<KV<String, T>> keyedInput = input.apply(WithKeys.<String, T>of(""));
       GroupIntoBatches<String, T> groupTransform =
           GroupIntoBatches.<String, T>ofSize(batchSize)
-              .withMaxBufferingDuration(Duration.millis(200));
+              .withMaxBufferingDuration(Duration.millis(maxBatchBufferingDuration));
       if (withAutoSharding != null && withAutoSharding) {
         // unbounded and withAutoSharding enabled, group into batches with shardedKey
         iterables = keyedInput.apply(groupTransform.withShardedKey()).apply(Values.create());
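The change above threads the user-supplied duration into Beam's GroupIntoBatches, replacing the hard-coded Duration.millis(200). A standalone sketch of that primitive's dual trigger (illustrative values, not code from this commit): on an unbounded input, a batch is emitted once it holds 1000 elements or once 500 ms of buffering have elapsed, whichever comes first.

import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

PCollection<String> rows = ...; // unbounded input

PCollection<Iterable<String>> batches =
    rows.apply(WithKeys.<String, String>of("")) // single synthetic key, as in batchElements
        .apply(
            GroupIntoBatches.<String, String>ofSize(1000)
                .withMaxBufferingDuration(Duration.millis(500)))
        .apply(Values.create());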
@@ -1958,7 +1968,8 @@ public PCollection<V> expand(PCollection<T> input) {
           "Autosharding is only supported for streaming pipelines.");
 
       PCollection<Iterable<T>> iterables =
-          JdbcIO.<T>batchElements(input, autoSharding, DEFAULT_BATCH_SIZE);
+          JdbcIO.<T>batchElements(
+              input, autoSharding, DEFAULT_BATCH_SIZE, DEFAULT_MAX_BATCH_BUFFERING_DURATION);
       return iterables.apply(
           ParDo.of(
               new WriteFn<T, V>(
@@ -1971,6 +1982,7 @@ public PCollection<V> expand(PCollection<T> input) {
                       .setRetryConfiguration(getRetryConfiguration())
                       .setReturnResults(true)
                       .setBatchSize(1L)
+                      .setMaxBatchBufferingDuration(DEFAULT_MAX_BATCH_BUFFERING_DURATION)
                       .build())));
     }
   }
@@ -1990,6 +2002,8 @@ public abstract static class WriteVoid<T> extends PTransform<PCollection<T>, PCo
 
     abstract long getBatchSize();
 
+    abstract long getMaxBatchBufferingDuration();
+
     abstract @Nullable PreparedStatementSetter<T> getPreparedStatementSetter();
 
     abstract @Nullable RetryStrategy getRetryStrategy();
@@ -2011,6 +2025,8 @@ abstract Builder<T> setDataSourceProviderFn(
 
       abstract Builder<T> setBatchSize(long batchSize);
 
+      abstract Builder<T> setMaxBatchBufferingDuration(long maxBatchBufferingDuration);
+
       abstract Builder<T> setPreparedStatementSetter(PreparedStatementSetter<T> setter);
 
       abstract Builder<T> setRetryStrategy(RetryStrategy deadlockPredicate);
@@ -2049,7 +2065,9 @@ public WriteVoid<T> withPreparedStatementSetter(PreparedStatementSetter<T> sette
     }
 
     /**
-     * Provide a maximum size in number of SQL statement for the batch. Default is 1000.
+     * Provide a maximum size in number of SQL statements for the batch. Default is 1000. The
+     * pipeline commits a batch either when this maximum is reached or when its maximum
+     * buffering time has elapsed. See {@link #withMaxBatchBufferingDuration(long)}.
      *
      * @param batchSize maximum batch size in number of statements
      */
@@ -2058,6 +2076,21 @@ public WriteVoid<T> withBatchSize(long batchSize) {
       return toBuilder().setBatchSize(batchSize).build();
     }
 
+    /**
+     * Provide the maximum buffering time, in milliseconds, for batching elements before the SQL
+     * statements are committed. Default is 200. The pipeline commits a batch either when this
+     * maximum buffering time has elapsed or when the maximum number of elements has been
+     * collected. See {@link #withBatchSize(long)}.
+     *
+     * @param maxBatchBufferingDuration maximum time in milliseconds before a batch is committed
+     */
+    public WriteVoid<T> withMaxBatchBufferingDuration(long maxBatchBufferingDuration) {
+      checkArgument(
+          maxBatchBufferingDuration > 0,
+          "maxBatchBufferingDuration must be > 0, but was %s",
+          maxBatchBufferingDuration);
+      return toBuilder().setMaxBatchBufferingDuration(maxBatchBufferingDuration).build();
+    }
+
     /**
      * When a SQL exception occurs, {@link Write} uses this {@link RetryStrategy} to determine if it
      * will retry the statements. If {@link RetryStrategy#apply(SQLException)} returns {@code true},
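Taken together, the two javadocs describe a dual trigger: whichever of batch size or buffering time is hit first commits the batch. A hypothetical tuning sketch for the writeVoid() path (the statement, setter, and dataSourceConfiguration below are assumed placeholders): with these settings, a batch commits at 5000 statements or after one second, whichever comes first.

rows.apply(
    JdbcIO.<String>writeVoid()
        .withDataSourceConfiguration(dataSourceConfiguration) // assumed defined elsewhere
        .withStatement("INSERT INTO audit_log (line) VALUES (?)")
        .withPreparedStatementSetter((line, stmt) -> stmt.setString(1, line))
        .withBatchSize(5000L)
        .withMaxBatchBufferingDuration(1000L));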
@@ -2128,7 +2161,8 @@ public PCollection<Void> expand(PCollection<T> input) {
       }
 
       PCollection<Iterable<T>> iterables =
-          JdbcIO.<T>batchElements(input, getAutoSharding(), getBatchSize());
+          JdbcIO.<T>batchElements(
+              input, getAutoSharding(), getBatchSize(), getMaxBatchBufferingDuration());
 
       return iterables
           .apply(
@@ -2142,6 +2176,7 @@ public PCollection<Void> expand(PCollection<T> input) {
                           .setTable(spec.getTable())
                           .setStatement(spec.getStatement())
                           .setBatchSize(spec.getBatchSize())
+                          .setMaxBatchBufferingDuration(spec.getMaxBatchBufferingDuration())
                           .setReturnResults(false)
                           .build())))
           .setCoder(VoidCoder.of());
@@ -2448,6 +2483,9 @@ public void populateDisplayData(DisplayData.Builder builder) {
     @Pure
     abstract @Nullable Long getBatchSize();
 
+    @Pure
+    abstract @Nullable Long getMaxBatchBufferingDuration();
+
     @Pure
     abstract Boolean getReturnResults();
 
@@ -2477,6 +2515,9 @@ abstract Builder<T, V> setRetryConfiguration(
 
       abstract Builder<T, V> setBatchSize(@Nullable Long batchSize);
 
+      abstract Builder<T, V> setMaxBatchBufferingDuration(
+          @Nullable Long maxBatchBufferingDuration);
+
       abstract Builder<T, V> setReturnResults(Boolean returnResults);
 
       abstract WriteFnSpec<T, V> build();
