[JdbcIO] - Adding option for max batch buffering duration #30259

Merged · 6 commits · Feb 12, 2024

Changes from all commits
@@ -370,6 +370,7 @@ public static <T> ReadWithPartitions<T, Long> readWithPartitions() {
}

private static final long DEFAULT_BATCH_SIZE = 1000L;
+ private static final long DEFAULT_MAX_BATCH_BUFFERING_DURATION = 200L;
private static final int DEFAULT_FETCH_SIZE = 50_000;
// Default values used from fluent backoff.
private static final Duration DEFAULT_INITIAL_BACKOFF = Duration.standardSeconds(1);
@@ -389,6 +390,7 @@ public static <T> Write<T> write() {
public static <T> WriteVoid<T> writeVoid() {
return new AutoValue_JdbcIO_WriteVoid.Builder<T>()
.setBatchSize(DEFAULT_BATCH_SIZE)
+ .setMaxBatchBufferingDuration(DEFAULT_MAX_BATCH_BUFFERING_DURATION)
.setRetryStrategy(new DefaultRetryStrategy())
.setRetryConfiguration(RetryConfiguration.create(5, null, Duration.standardSeconds(5)))
.build();
@@ -1686,6 +1688,11 @@ public Write<T> withBatchSize(long batchSize) {
return new Write<>(inner.withBatchSize(batchSize));
}

+ /** See {@link WriteVoid#withMaxBatchBufferingDuration(long)}. */
+ public Write<T> withMaxBatchBufferingDuration(long maxBatchBufferingDuration) {
+ return new Write<>(inner.withMaxBatchBufferingDuration(maxBatchBufferingDuration));
+ }
+
/** See {@link WriteVoid#withRetryStrategy(RetryStrategy)}. */
public Write<T> withRetryStrategy(RetryStrategy retryStrategy) {
return new Write<>(inner.withRetryStrategy(retryStrategy));
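
For orientation, a minimal usage sketch of the new option on a streaming write (not part of the diff). The driver, URL, statement, and input collection are illustrative placeholders:

// Hypothetical unbounded input; elements arrive continuously.
PCollection<KV<Integer, String>> rows = ...;

rows.apply(
    JdbcIO.<KV<Integer, String>>write()
        .withDataSourceConfiguration(
            JdbcIO.DataSourceConfiguration.create(
                "org.postgresql.Driver", "jdbc:postgresql://localhost/example"))
        .withStatement("INSERT INTO results VALUES (?, ?)")
        .withPreparedStatementSetter(
            (element, statement) -> {
              statement.setInt(1, element.getKey());
              statement.setString(2, element.getValue());
            })
        .withBatchSize(500L) // flush after 500 buffered statements...
        .withMaxBatchBufferingDuration(1000L)); // ...or after 1 s, whichever comes first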
@@ -1754,13 +1761,16 @@ public PDone expand(PCollection<T> input) {
/* The maximum number of elements that will be included in a batch. */

static <T> PCollection<Iterable<T>> batchElements(
- PCollection<T> input, @Nullable Boolean withAutoSharding, long batchSize) {
+ PCollection<T> input,
+ @Nullable Boolean withAutoSharding,
+ long batchSize,
+ long maxBatchBufferingDuration) {
PCollection<Iterable<T>> iterables;
if (input.isBounded() == IsBounded.UNBOUNDED) {
PCollection<KV<String, T>> keyedInput = input.apply(WithKeys.<String, T>of(""));
GroupIntoBatches<String, T> groupTransform =
GroupIntoBatches.<String, T>ofSize(batchSize)
- .withMaxBufferingDuration(Duration.millis(200));
+ .withMaxBufferingDuration(Duration.millis(maxBatchBufferingDuration));
if (withAutoSharding != null && withAutoSharding) {
// unbounded and withAutoSharding enabled, group into batches with shardedKey
iterables = keyedInput.apply(groupTransform.withShardedKey()).apply(Values.create());
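
The buffering behavior above comes straight from GroupIntoBatches: a batch is emitted as soon as either the size bound or the buffering duration is reached. The same dual trigger in isolation, with illustrative values and a placeholder input of type PCollection<String>:

// Emit a batch at 1000 elements or after 200 ms of buffering, whichever comes first.
PCollection<Iterable<String>> batches =
    input
        .apply(WithKeys.<String, String>of(""))
        .apply(
            GroupIntoBatches.<String, String>ofSize(1000L)
                .withMaxBufferingDuration(Duration.millis(200)))
        .apply(Values.create());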
@@ -1958,7 +1968,8 @@ public PCollection<V> expand(PCollection<T> input) {
"Autosharding is only supported for streaming pipelines.");

PCollection<Iterable<T>> iterables =
- JdbcIO.<T>batchElements(input, autoSharding, DEFAULT_BATCH_SIZE);
+ JdbcIO.<T>batchElements(
+ input, autoSharding, DEFAULT_BATCH_SIZE, DEFAULT_MAX_BATCH_BUFFERING_DURATION);
return iterables.apply(
ParDo.of(
new WriteFn<T, V>(
@@ -1971,6 +1982,7 @@ public PCollection<V> expand(PCollection<T> input) {
.setRetryConfiguration(getRetryConfiguration())
.setReturnResults(true)
.setBatchSize(1L)
+ .setMaxBatchBufferingDuration(DEFAULT_MAX_BATCH_BUFFERING_DURATION)
.build())));
}
}
@@ -1990,6 +2002,8 @@ public abstract static class WriteVoid<T> extends PTransform<PCollection<T>, PCollection<Void>> {

abstract long getBatchSize();

+ abstract long getMaxBatchBufferingDuration();
+
abstract @Nullable PreparedStatementSetter<T> getPreparedStatementSetter();

abstract @Nullable RetryStrategy getRetryStrategy();
@@ -2011,6 +2025,8 @@ abstract Builder<T> setDataSourceProviderFn(

abstract Builder<T> setBatchSize(long batchSize);

+ abstract Builder<T> setMaxBatchBufferingDuration(long maxBatchBufferingDuration);
+
abstract Builder<T> setPreparedStatementSetter(PreparedStatementSetter<T> setter);

abstract Builder<T> setRetryStrategy(RetryStrategy deadlockPredicate);
@@ -2049,7 +2065,9 @@ public WriteVoid<T> withPreparedStatementSetter(PreparedStatementSetter<T> setter) {
}

/**
- * Provide a maximum size in number of SQL statement for the batch. Default is 1000.
+ * Provide a maximum size in number of SQL statements for the batch. Default is 1000. The
+ * pipeline commits a batch either when this maximum is reached or when its maximum buffering
+ * time has been reached. See {@link #withMaxBatchBufferingDuration(long)}.
*
* @param batchSize maximum batch size in number of statements
*/
@@ -2058,6 +2076,21 @@ public WriteVoid<T> withBatchSize(long batchSize) {
return toBuilder().setBatchSize(batchSize).build();
}

+ /**
+ * Provide the maximum buffering time, in milliseconds, to batch elements before committing the
+ * SQL statement. Default is 200. The pipeline commits a batch either when this maximum
+ * buffering time has been reached or when the maximum number of elements has been collected.
+ * See {@link #withBatchSize(long)}.
+ *
+ * @param maxBatchBufferingDuration maximum time in milliseconds before a batch is committed
+ */
+ public WriteVoid<T> withMaxBatchBufferingDuration(long maxBatchBufferingDuration) {
+ checkArgument(
+ maxBatchBufferingDuration > 0,
+ "maxBatchBufferingDuration must be > 0, but was %s",
+ maxBatchBufferingDuration);
+ return toBuilder().setMaxBatchBufferingDuration(maxBatchBufferingDuration).build();
+ }
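
A quick illustration of the guard above: invalid durations fail fast at pipeline-construction time rather than on the runner. The transform instance here is hypothetical:

JdbcIO.WriteVoid<String> write = JdbcIO.<String>writeVoid();
write = write.withMaxBatchBufferingDuration(500L); // OK: flush at least every 500 ms
write.withMaxBatchBufferingDuration(0L); // throws IllegalArgumentException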

/**
* When a SQL exception occurs, {@link Write} uses this {@link RetryStrategy} to determine if it
* will retry the statements. If {@link RetryStrategy#apply(SQLException)} returns {@code true},
@@ -2128,7 +2161,8 @@ public PCollection<Void> expand(PCollection<T> input) {
}

PCollection<Iterable<T>> iterables =
- JdbcIO.<T>batchElements(input, getAutoSharding(), getBatchSize());
+ JdbcIO.<T>batchElements(
+ input, getAutoSharding(), getBatchSize(), getMaxBatchBufferingDuration());

return iterables
.apply(
@@ -2142,6 +2176,7 @@ public PCollection<Void> expand(PCollection<T> input) {
.setTable(spec.getTable())
.setStatement(spec.getStatement())
.setBatchSize(spec.getBatchSize())
+ .setMaxBatchBufferingDuration(spec.getMaxBatchBufferingDuration())
.setReturnResults(false)
.build())))
.setCoder(VoidCoder.of());
@@ -2448,6 +2483,9 @@ public void populateDisplayData(DisplayData.Builder builder) {
@Pure
abstract @Nullable Long getBatchSize();

+ @Pure
+ abstract @Nullable Long getMaxBatchBufferingDuration();
+
@Pure
abstract Boolean getReturnResults();

@@ -2477,6 +2515,9 @@ abstract Builder<T, V> setRetryConfiguration(

abstract Builder<T, V> setBatchSize(@Nullable Long batchSize);

+ abstract Builder<T, V> setMaxBatchBufferingDuration(
+ @Nullable Long maxBatchBufferingDuration);
+
abstract Builder<T, V> setReturnResults(Boolean returnResults);

abstract WriteFnSpec<T, V> build();