From 3f6fc4e0b78a21988c840008ee530365117d5b79 Mon Sep 17 00:00:00 2001 From: Naireen Date: Wed, 11 Sep 2024 17:14:12 +0000 Subject: [PATCH] address review comments --- .../org/apache/beam/sdk/io/kafka/KafkaIO.java | 8 +----- .../apache/beam/sdk/io/kafka/KafkaIOTest.java | 25 +------------------ 2 files changed, 2 insertions(+), 31 deletions(-) diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index 7238cc5a50ab..ae5373e5cd23 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -1698,12 +1698,7 @@ public PCollection> expand(PBegin input) { if (kafkaRead.isRedistributed()) { if (kafkaRead.isCommitOffsetsInFinalizeEnabled()) { LOG.warn( - "commitOffsetsInFinalize() will not capture all work processed if set with withRedistribute()"); - } - if (Boolean.TRUE.equals( - kafkaRead.getConsumerConfig().get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG))) { - LOG.warn( - "config.ENABLE_AUTO_COMMIT_CONFIG doesn't need to be set with withRedistribute()"); + "Offsets committed due to usage of commitOffsetsInFinalize() may not capture all work processed due to use of withRedistribute()"); } PCollection> output = input.getPipeline().apply(transform); @@ -2659,7 +2654,6 @@ public PCollection> expand(PCollection if (getRedistributeNumKeys() == 0) { LOG.warn("This will create a key per record, which is sub-optimal for most use cases."); } - // is another check here needed for with commit offsets if (isCommitOffsetEnabled() || configuredKafkaCommit()) { LOG.warn( "Either auto_commit is set, or commitOffsetEnabled is enabled (or both), but since " diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java index ebc607609ee2..73d05c812dca 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java @@ -645,7 +645,7 @@ public void testCommitOffsetsInFinalizeAndRedistributeWarnings() { p.run(); kafkaIOExpectedLogs.verifyWarn( - "commitOffsetsInFinalize() will not capture all work processed if set with withRedistribute()"); + "Offsets committed due to usage of commitOffsetsInFinalize() may not capture all work processed due to use of withRedistribute()"); } @Test @@ -697,29 +697,6 @@ public void testDisableRedistributeKafkaOffsetLegacy() { p.run(); } - @Test - public void testEnableAutoCommitWithRedistribute() throws Exception { - - int numElements = 1000; - - PCollection input = - p.apply( - mkKafkaReadTransform(numElements, numElements, new ValueAsTimestampFn(), true, 0) - .withRedistribute() - .withRedistributeNumKeys(100) - .withConsumerConfigUpdates( - ImmutableMap.of(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true)) - .withoutMetadata()) - .apply(Values.create()); - - addCountingAsserts(input, numElements); - - p.run(); - - kafkaIOExpectedLogs.verifyWarn( - "config.ENABLE_AUTO_COMMIT_CONFIG doesn't need to be set with withRedistribute()"); - } - @Test public void testUnreachableKafkaBrokers() { // Expect an exception when the Kafka brokers are not reachable on the workers.