From bb660532eb16531253ed5476a6887cc471af5615 Mon Sep 17 00:00:00 2001 From: Xianhua Liu Date: Fri, 5 Apr 2024 20:29:24 +0000 Subject: [PATCH 1/9] [bug30870]: make consumer polling timeout configurable for KafkaIO.Read --- .../org/apache/beam/sdk/io/kafka/KafkaIO.java | 34 ++++++++++++++++++- .../beam/sdk/io/kafka/ReadFromKafkaDoFn.java | 11 ++++-- .../apache/beam/sdk/io/kafka/KafkaIOTest.java | 13 +++++++ .../sdk/io/kafka/ReadFromKafkaDoFnTest.java | 13 +++++++ 4 files changed, 68 insertions(+), 3 deletions(-) diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index 231a1b9e49e1..6ecce6430966 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -587,6 +587,7 @@ public static Read read() { .setCommitOffsetsInFinalizeEnabled(false) .setDynamicRead(false) .setTimestampPolicyFactory(TimestampPolicyFactory.withProcessingTime()) + .setConsumerPollingTimeout(Duration.standardSeconds(1l)) .build(); } @@ -706,6 +707,9 @@ public abstract static class Read @Pure public abstract @Nullable ErrorHandler getBadRecordErrorHandler(); + @Pure + public abstract @Nullable Duration getConsumerPollingTimeout(); + abstract Builder toBuilder(); @AutoValue.Builder @@ -762,6 +766,8 @@ Builder setCheckStopReadingFn( return setCheckStopReadingFn(CheckStopReadingFnWrapper.of(checkStopReadingFn)); } + abstract Builder setConsumerPollingTimeout(Duration consumerPollingTimeout); + abstract Read build(); static void setupExternalBuilder( @@ -1334,6 +1340,16 @@ public Read withBadRecordErrorHandler(ErrorHandler badRecord return toBuilder().setBadRecordErrorHandler(badRecordErrorHandler).build(); } + /** + * Sets the timeout time for Kafka consumer polling request in the {@link ReadFromKafkaDoFn}. + * The default is 1 second. + */ + public Read withConsumerPollingTimeout(Duration duration) { + checkState(duration == null || duration.compareTo(Duration.ZERO) > 0, + "Consumer polling timeout must be greater than 0."); + return toBuilder().setConsumerPollingTimeout(duration).build(); + } + /** Returns a {@link PTransform} for PCollection of {@link KV}, dropping Kafka metatdata. */ public PTransform>> withoutMetadata() { return new TypedWithoutMetadata<>(this); @@ -1596,7 +1612,8 @@ public PCollection> expand(PBegin input) { .withValueDeserializerProvider(kafkaRead.getValueDeserializerProvider()) .withManualWatermarkEstimator() .withTimestampPolicyFactory(kafkaRead.getTimestampPolicyFactory()) - .withCheckStopReadingFn(kafkaRead.getCheckStopReadingFn()); + .withCheckStopReadingFn(kafkaRead.getCheckStopReadingFn()) + .withConsumerPollingTimeout(kafkaRead.getConsumerPollingTimeout()); if (kafkaRead.isCommitOffsetsInFinalizeEnabled()) { readTransform = readTransform.commitOffsets(); } @@ -2036,6 +2053,9 @@ public abstract static class ReadSourceDescriptors @Pure abstract ErrorHandler getBadRecordErrorHandler(); + @Pure + abstract @Nullable Duration getConsumerPollingTimeout(); + abstract boolean isBounded(); abstract ReadSourceDescriptors.Builder toBuilder(); @@ -2086,6 +2106,9 @@ abstract ReadSourceDescriptors.Builder setBadRecordRouter( abstract ReadSourceDescriptors.Builder setBadRecordErrorHandler( ErrorHandler badRecordErrorHandler); + abstract ReadSourceDescriptors.Builder setConsumerPollingTimeout( + @Nullable Duration duration); + abstract ReadSourceDescriptors.Builder setBounded(boolean bounded); abstract ReadSourceDescriptors build(); @@ -2099,6 +2122,7 @@ public static ReadSourceDescriptors read() { .setBounded(false) .setBadRecordRouter(BadRecordRouter.THROWING_ROUTER) .setBadRecordErrorHandler(new ErrorHandler.DefaultErrorHandler<>()) + .setConsumerPollingTimeout(Duration.standardSeconds(1l)) .build() .withProcessingTime() .withMonotonicallyIncreasingWatermarkEstimator(); @@ -2360,6 +2384,14 @@ public ReadSourceDescriptors withBadRecordErrorHandler( .build(); } + /** + * Sets the timeout time for Kafka consumer polling request in the {@link ReadFromKafkaDoFn}. + * The default is 1 second. + */ + public ReadSourceDescriptors withConsumerPollingTimeout( @Nullable Duration duration ) { + return toBuilder().setConsumerPollingTimeout(duration).build(); + } + ReadAllFromRow forExternalBuild() { return new ReadAllFromRow<>(this); } diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java index 924833290f13..b3278f9db95c 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java @@ -191,6 +191,12 @@ private ReadFromKafkaDoFn( this.checkStopReadingFn = transform.getCheckStopReadingFn(); this.badRecordRouter = transform.getBadRecordRouter(); this.recordTag = recordTag; + if(transform.getConsumerPollingTimeout() != null) { + this.consumerPollingTimeout = + java.time.Duration.ofMillis(transform.getConsumerPollingTimeout().getMillis()); + } else { + this.consumerPollingTimeout = KAFKA_POLL_TIMEOUT; + } } private static final Logger LOG = LoggerFactory.getLogger(ReadFromKafkaDoFn.class); @@ -219,6 +225,7 @@ private ReadFromKafkaDoFn( private static final java.time.Duration KAFKA_POLL_TIMEOUT = java.time.Duration.ofSeconds(1); + @VisibleForTesting final java.time.Duration consumerPollingTimeout; @VisibleForTesting final DeserializerProvider keyDeserializerProvider; @VisibleForTesting final DeserializerProvider valueDeserializerProvider; @VisibleForTesting final Map consumerConfig; @@ -508,7 +515,7 @@ private ConsumerRecords poll( java.time.Duration elapsed = java.time.Duration.ZERO; while (true) { final ConsumerRecords rawRecords = - consumer.poll(KAFKA_POLL_TIMEOUT.minus(elapsed)); + consumer.poll(consumerPollingTimeout.minus(elapsed)); if (!rawRecords.isEmpty()) { // return as we have found some entries return rawRecords; @@ -518,7 +525,7 @@ private ConsumerRecords poll( return rawRecords; } elapsed = sw.elapsed(); - if (elapsed.toMillis() >= KAFKA_POLL_TIMEOUT.toMillis()) { + if (elapsed.toMillis() >= consumerPollingTimeout.toMillis()) { // timeout is over return rawRecords; } diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java index 9b15b86051f5..7577e05d3d54 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java @@ -2121,6 +2121,19 @@ public void testSinkMetrics() throws Exception { } } + @Test(expected = IllegalStateException.class) + public void testWithInvalidConsumerPollingTimeout(){ + KafkaIO.read() + .withConsumerPollingTimeout(Duration.standardSeconds(-5)); + } + + @Test + public void testWithValidConsumerPollingTimeout(){ + KafkaIO.Read reader = KafkaIO.read() + .withConsumerPollingTimeout(Duration.standardSeconds(15)); + assertEquals(15, reader.getConsumerPollingTimeout().getStandardSeconds()); + } + private static void verifyProducerRecords( MockProducer mockProducer, String topic, diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java index 48b5b060a295..4d36af64ae45 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java @@ -641,6 +641,19 @@ public void testUnbounded() { Assert.assertNotEquals(0, visitor.unboundedPCollections.size()); } + @Test + public void testConstructorWithPollTimeout(){ + ReadSourceDescriptors descriptors = + makeReadSourceDescriptor(consumer); + // default poll timeout = 1 scond + ReadFromKafkaDoFn dofnInstance = ReadFromKafkaDoFn.create(descriptors, RECORDS); + Assert.assertEquals(Duration.ofSeconds(1l), dofnInstance.consumerPollingTimeout); + // updated timeout = 5 seconds + descriptors = descriptors.withConsumerPollingTimeout(org.joda.time.Duration.standardSeconds(5l)); + ReadFromKafkaDoFn dofnInstanceNew = ReadFromKafkaDoFn.create(descriptors, RECORDS); + Assert.assertEquals(Duration.ofSeconds(5l), dofnInstanceNew.consumerPollingTimeout); + } + private BoundednessVisitor testBoundedness( Function, ReadSourceDescriptors> readSourceDescriptorsDecorator) { From b62e26eb9c77e4a4d1112b7291d3835bc56e2d17 Mon Sep 17 00:00:00 2001 From: Xianhua Liu Date: Fri, 5 Apr 2024 20:57:12 +0000 Subject: [PATCH 2/9] fixed spotless complains --- .../org/apache/beam/sdk/io/kafka/KafkaIO.java | 9 +++++---- .../beam/sdk/io/kafka/ReadFromKafkaDoFn.java | 2 +- .../org/apache/beam/sdk/io/kafka/KafkaIOTest.java | 11 +++++------ .../beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java | 15 ++++++++------- 4 files changed, 19 insertions(+), 18 deletions(-) diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index 6ecce6430966..33afca1f3a2f 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -587,7 +587,7 @@ public static Read read() { .setCommitOffsetsInFinalizeEnabled(false) .setDynamicRead(false) .setTimestampPolicyFactory(TimestampPolicyFactory.withProcessingTime()) - .setConsumerPollingTimeout(Duration.standardSeconds(1l)) + .setConsumerPollingTimeout(Duration.standardSeconds(1L)) .build(); } @@ -1345,7 +1345,8 @@ public Read withBadRecordErrorHandler(ErrorHandler badRecord * The default is 1 second. */ public Read withConsumerPollingTimeout(Duration duration) { - checkState(duration == null || duration.compareTo(Duration.ZERO) > 0, + checkState( + duration == null || duration.compareTo(Duration.ZERO) > 0, "Consumer polling timeout must be greater than 0."); return toBuilder().setConsumerPollingTimeout(duration).build(); } @@ -2122,7 +2123,7 @@ public static ReadSourceDescriptors read() { .setBounded(false) .setBadRecordRouter(BadRecordRouter.THROWING_ROUTER) .setBadRecordErrorHandler(new ErrorHandler.DefaultErrorHandler<>()) - .setConsumerPollingTimeout(Duration.standardSeconds(1l)) + .setConsumerPollingTimeout(Duration.standardSeconds(1L)) .build() .withProcessingTime() .withMonotonicallyIncreasingWatermarkEstimator(); @@ -2388,7 +2389,7 @@ public ReadSourceDescriptors withBadRecordErrorHandler( * Sets the timeout time for Kafka consumer polling request in the {@link ReadFromKafkaDoFn}. * The default is 1 second. */ - public ReadSourceDescriptors withConsumerPollingTimeout( @Nullable Duration duration ) { + public ReadSourceDescriptors withConsumerPollingTimeout(@Nullable Duration duration) { return toBuilder().setConsumerPollingTimeout(duration).build(); } diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java index b3278f9db95c..c18d30853ef2 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java @@ -191,7 +191,7 @@ private ReadFromKafkaDoFn( this.checkStopReadingFn = transform.getCheckStopReadingFn(); this.badRecordRouter = transform.getBadRecordRouter(); this.recordTag = recordTag; - if(transform.getConsumerPollingTimeout() != null) { + if (transform.getConsumerPollingTimeout() != null) { this.consumerPollingTimeout = java.time.Duration.ofMillis(transform.getConsumerPollingTimeout().getMillis()); } else { diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java index 7577e05d3d54..44c028f08a27 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java @@ -2122,15 +2122,14 @@ public void testSinkMetrics() throws Exception { } @Test(expected = IllegalStateException.class) - public void testWithInvalidConsumerPollingTimeout(){ - KafkaIO.read() - .withConsumerPollingTimeout(Duration.standardSeconds(-5)); + public void testWithInvalidConsumerPollingTimeout() { + KafkaIO.read().withConsumerPollingTimeout(Duration.standardSeconds(-5)); } @Test - public void testWithValidConsumerPollingTimeout(){ - KafkaIO.Read reader = KafkaIO.read() - .withConsumerPollingTimeout(Duration.standardSeconds(15)); + public void testWithValidConsumerPollingTimeout() { + KafkaIO.Read reader = + KafkaIO.read().withConsumerPollingTimeout(Duration.standardSeconds(15)); assertEquals(15, reader.getConsumerPollingTimeout().getStandardSeconds()); } diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java index 4d36af64ae45..874332555d86 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java @@ -642,16 +642,17 @@ public void testUnbounded() { } @Test - public void testConstructorWithPollTimeout(){ - ReadSourceDescriptors descriptors = - makeReadSourceDescriptor(consumer); + public void testConstructorWithPollTimeout() { + ReadSourceDescriptors descriptors = makeReadSourceDescriptor(consumer); // default poll timeout = 1 scond ReadFromKafkaDoFn dofnInstance = ReadFromKafkaDoFn.create(descriptors, RECORDS); - Assert.assertEquals(Duration.ofSeconds(1l), dofnInstance.consumerPollingTimeout); + Assert.assertEquals(Duration.ofSeconds(1L), dofnInstance.consumerPollingTimeout); // updated timeout = 5 seconds - descriptors = descriptors.withConsumerPollingTimeout(org.joda.time.Duration.standardSeconds(5l)); - ReadFromKafkaDoFn dofnInstanceNew = ReadFromKafkaDoFn.create(descriptors, RECORDS); - Assert.assertEquals(Duration.ofSeconds(5l), dofnInstanceNew.consumerPollingTimeout); + descriptors = + descriptors.withConsumerPollingTimeout(org.joda.time.Duration.standardSeconds(5L)); + ReadFromKafkaDoFn dofnInstanceNew = + ReadFromKafkaDoFn.create(descriptors, RECORDS); + Assert.assertEquals(Duration.ofSeconds(5L), dofnInstanceNew.consumerPollingTimeout); } private BoundednessVisitor testBoundedness( From 880a22ce6c6c74efde378431d462b8b36c562bb2 Mon Sep 17 00:00:00 2001 From: Xianhua Liu Date: Fri, 5 Apr 2024 23:10:19 +0000 Subject: [PATCH 3/9] fixed unit tests --- .../sdk/io/kafka/KafkaIOReadImplementationCompatibility.java | 1 + 1 file changed, 1 insertion(+) diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibility.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibility.java index a2cc9aaeb4d9..7e54407300d4 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibility.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibility.java @@ -112,6 +112,7 @@ Object getDefaultValue() { VALUE_DESERIALIZER_PROVIDER, CHECK_STOP_READING_FN(SDF), BAD_RECORD_ERROR_HANDLER(SDF), + CONSUMER_POLLING_TIMEOUT, ; @Nonnull private final ImmutableSet supportedImplementations; From dbcf9d232016df8971669021c6ca52121a8135f3 Mon Sep 17 00:00:00 2001 From: Xianhua Liu Date: Mon, 8 Apr 2024 15:13:03 +0000 Subject: [PATCH 4/9] added logs and increased default polling timeout from 1 to 2 seconds. --- .../main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 8 ++++---- .../org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java | 4 +++- .../apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java | 2 +- 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index 33afca1f3a2f..c56071e85adb 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -587,7 +587,7 @@ public static Read read() { .setCommitOffsetsInFinalizeEnabled(false) .setDynamicRead(false) .setTimestampPolicyFactory(TimestampPolicyFactory.withProcessingTime()) - .setConsumerPollingTimeout(Duration.standardSeconds(1L)) + .setConsumerPollingTimeout(Duration.standardSeconds(2L)) .build(); } @@ -1342,7 +1342,7 @@ public Read withBadRecordErrorHandler(ErrorHandler badRecord /** * Sets the timeout time for Kafka consumer polling request in the {@link ReadFromKafkaDoFn}. - * The default is 1 second. + * The default is 2 second. */ public Read withConsumerPollingTimeout(Duration duration) { checkState( @@ -2123,7 +2123,7 @@ public static ReadSourceDescriptors read() { .setBounded(false) .setBadRecordRouter(BadRecordRouter.THROWING_ROUTER) .setBadRecordErrorHandler(new ErrorHandler.DefaultErrorHandler<>()) - .setConsumerPollingTimeout(Duration.standardSeconds(1L)) + .setConsumerPollingTimeout(Duration.standardSeconds(2L)) .build() .withProcessingTime() .withMonotonicallyIncreasingWatermarkEstimator(); @@ -2387,7 +2387,7 @@ public ReadSourceDescriptors withBadRecordErrorHandler( /** * Sets the timeout time for Kafka consumer polling request in the {@link ReadFromKafkaDoFn}. - * The default is 1 second. + * The default is 2 second. */ public ReadSourceDescriptors withConsumerPollingTimeout(@Nullable Duration duration) { return toBuilder().setConsumerPollingTimeout(duration).build(); diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java index c18d30853ef2..e7d2771728c9 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java @@ -223,7 +223,7 @@ private ReadFromKafkaDoFn( private transient @Nullable LoadingCache avgRecordSize; - private static final java.time.Duration KAFKA_POLL_TIMEOUT = java.time.Duration.ofSeconds(1); + private static final java.time.Duration KAFKA_POLL_TIMEOUT = java.time.Duration.ofSeconds(2); @VisibleForTesting final java.time.Duration consumerPollingTimeout; @VisibleForTesting final DeserializerProvider keyDeserializerProvider; @@ -527,6 +527,8 @@ private ConsumerRecords poll( elapsed = sw.elapsed(); if (elapsed.toMillis() >= consumerPollingTimeout.toMillis()) { // timeout is over + LOG.warn("No messages retrieved with polling timeout {} seconds. Consider increasing the consumer polling timeout using withConsumerPollingTimeout method.", + consumerPollingTimeout.getSeconds()); return rawRecords; } } diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java index 874332555d86..8902f22164bc 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java @@ -646,7 +646,7 @@ public void testConstructorWithPollTimeout() { ReadSourceDescriptors descriptors = makeReadSourceDescriptor(consumer); // default poll timeout = 1 scond ReadFromKafkaDoFn dofnInstance = ReadFromKafkaDoFn.create(descriptors, RECORDS); - Assert.assertEquals(Duration.ofSeconds(1L), dofnInstance.consumerPollingTimeout); + Assert.assertEquals(Duration.ofSeconds(2L), dofnInstance.consumerPollingTimeout); // updated timeout = 5 seconds descriptors = descriptors.withConsumerPollingTimeout(org.joda.time.Duration.standardSeconds(5L)); From 2582e993e83dc01b49b43c0134a9176d63853fd8 Mon Sep 17 00:00:00 2001 From: Xianhua Liu Date: Mon, 8 Apr 2024 15:23:30 +0000 Subject: [PATCH 5/9] spotless apply changes --- .../java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java index e7d2771728c9..3a821ef9519e 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java @@ -527,7 +527,8 @@ private ConsumerRecords poll( elapsed = sw.elapsed(); if (elapsed.toMillis() >= consumerPollingTimeout.toMillis()) { // timeout is over - LOG.warn("No messages retrieved with polling timeout {} seconds. Consider increasing the consumer polling timeout using withConsumerPollingTimeout method.", + LOG.warn( + "No messages retrieved with polling timeout {} seconds. Consider increasing the consumer polling timeout using withConsumerPollingTimeout method.", consumerPollingTimeout.getSeconds()); return rawRecords; } From 049d9923ba5eeaaf7b30b6acb092d9a9716e9788 Mon Sep 17 00:00:00 2001 From: xianhualiu <122747878+xianhualiu@users.noreply.github.com> Date: Mon, 8 Apr 2024 11:42:59 -0400 Subject: [PATCH 6/9] Update CHANGES.md updated changes.md with changes to make consumer polling timeout configurable for KafkaIO.Read --- CHANGES.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGES.md b/CHANGES.md index 170d8d08da74..16e4f7f4b691 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -80,6 +80,7 @@ ## Bugfixes * Fixed locking issue when shutting down inactive bundle processors. Symptoms of this issue include slowness or stuckness in long-running jobs (Python) ([#30679](https://github.com/apache/beam/pull/30679)). +* Fixed kafka polling issue due to short consumer polling timeout time. Changes to make the consumer polling timeout configurable for KafkaIO.Read with new command: KafkaIO.read().withConsumerPollingTimeout(Duration duration) ( [#30870](https://github.com/apache/beam/pull/30877)). ## Security Fixes * Fixed (CVE-YYYY-NNNN)[https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN] (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)). From 78f0db8ead6d0efaf085bac9ab565a3dedca873b Mon Sep 17 00:00:00 2001 From: xianhualiu <122747878+xianhualiu@users.noreply.github.com> Date: Mon, 8 Apr 2024 11:45:25 -0400 Subject: [PATCH 7/9] Update CHANGES.md --- CHANGES.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index 16e4f7f4b691..7b26ce887b5b 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -80,7 +80,7 @@ ## Bugfixes * Fixed locking issue when shutting down inactive bundle processors. Symptoms of this issue include slowness or stuckness in long-running jobs (Python) ([#30679](https://github.com/apache/beam/pull/30679)). -* Fixed kafka polling issue due to short consumer polling timeout time. Changes to make the consumer polling timeout configurable for KafkaIO.Read with new command: KafkaIO.read().withConsumerPollingTimeout(Duration duration) ( [#30870](https://github.com/apache/beam/pull/30877)). +* Fixed kafka polling issue due to short consumer polling timeout time. Changes to make the consumer polling timeout configurable for KafkaIO.Read with new command: KafkaIO.read().withConsumerPollingTimeout(Duration duration). Default timeout has been increased from 1 second to 2 seconds( [#30870](https://github.com/apache/beam/pull/30877)). ## Security Fixes * Fixed (CVE-YYYY-NNNN)[https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN] (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)). From b1f5a96f7c1ee0aa5e39d1d7f62d48be0b6731f7 Mon Sep 17 00:00:00 2001 From: xianhualiu <122747878+xianhualiu@users.noreply.github.com> Date: Mon, 8 Apr 2024 15:35:27 -0400 Subject: [PATCH 8/9] Update CHANGES.md added break changes --- CHANGES.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index 7b26ce887b5b..f868cb3a2822 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -72,6 +72,7 @@ ## Breaking Changes * X behavior was changed ([#X](https://github.com/apache/beam/issues/X)). +* Default consumer polling timeout for KafkaIO.Read was increased from 1 second to 2 seconds. Use KafkaIO.read().withConsumerPollingTimeout(Duration duration) to configure this timeout value when necessary ([#30870](https://github.com/apache/beam/pull/30877)). ## Deprecations @@ -80,7 +81,6 @@ ## Bugfixes * Fixed locking issue when shutting down inactive bundle processors. Symptoms of this issue include slowness or stuckness in long-running jobs (Python) ([#30679](https://github.com/apache/beam/pull/30679)). -* Fixed kafka polling issue due to short consumer polling timeout time. Changes to make the consumer polling timeout configurable for KafkaIO.Read with new command: KafkaIO.read().withConsumerPollingTimeout(Duration duration). Default timeout has been increased from 1 second to 2 seconds( [#30870](https://github.com/apache/beam/pull/30877)). ## Security Fixes * Fixed (CVE-YYYY-NNNN)[https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN] (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)). From ffb52f382499002d45a7739f5e03db823b4ff51e Mon Sep 17 00:00:00 2001 From: xianhualiu <122747878+xianhualiu@users.noreply.github.com> Date: Tue, 9 Apr 2024 01:14:29 -0400 Subject: [PATCH 9/9] Update CHANGES.md --- CHANGES.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index f868cb3a2822..37390abdfe29 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -72,7 +72,7 @@ ## Breaking Changes * X behavior was changed ([#X](https://github.com/apache/beam/issues/X)). -* Default consumer polling timeout for KafkaIO.Read was increased from 1 second to 2 seconds. Use KafkaIO.read().withConsumerPollingTimeout(Duration duration) to configure this timeout value when necessary ([#30870](https://github.com/apache/beam/pull/30877)). +* Default consumer polling timeout for KafkaIO.Read was increased from 1 second to 2 seconds. Use KafkaIO.read().withConsumerPollingTimeout(Duration duration) to configure this timeout value when necessary ([#30870](https://github.com/apache/beam/issues/30870)). ## Deprecations