diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/GroupIntoBatchesTranslation.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/GroupIntoBatchesTranslation.java index 499c7fd21f51..7129854d44cc 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/GroupIntoBatchesTranslation.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/GroupIntoBatchesTranslation.java @@ -83,7 +83,7 @@ private static GroupIntoBatchesPayload getPayloadFromParameters( return RunnerApi.GroupIntoBatchesPayload.newBuilder() .setBatchSize(params.getBatchSize()) .setBatchSizeBytes(params.getBatchSizeBytes()) - .setMaxBufferingDurationMillis(params.getMaxBufferingDuration().getStandardSeconds() * 1000) + .setMaxBufferingDurationMillis(params.getMaxBufferingDuration().getMillis()) .build(); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/construction/GroupIntoBatchesTranslationTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/construction/GroupIntoBatchesTranslationTest.java index cb2054e09144..65f571467ca4 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/construction/GroupIntoBatchesTranslationTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/construction/GroupIntoBatchesTranslationTest.java @@ -71,6 +71,7 @@ public static Iterable> transform() { ImmutableSet.of( // gib -> gib, // gib -> gib.withMaxBufferingDuration(Duration.ZERO), // + gib -> gib.withMaxBufferingDuration(Duration.millis(200)), // gib -> gib.withMaxBufferingDuration(Duration.standardSeconds(10))); return Sets.cartesianProduct( @@ -150,7 +151,7 @@ private void verifyPayload( assertThat(payload.getBatchSize(), equalTo(params.getBatchSize())); assertThat(payload.getBatchSizeBytes(), equalTo(params.getBatchSizeBytes())); assertThat( - payload.getMaxBufferingDurationMillis(), - equalTo(params.getMaxBufferingDuration().getStandardSeconds() * 1000)); + Duration.millis(payload.getMaxBufferingDurationMillis()), + equalTo(params.getMaxBufferingDuration())); } }