diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index 7197086bbef6..7238cc5a50ab 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -890,7 +890,6 @@ static void setupExternalBuilder( builder.setRedistributeNumKeys(0); builder.setAllowDuplicates(false); } - System.out.println("xxx builder service" + builder.toString()); } private static Coder resolveCoder(Class> deserializer) { @@ -1697,17 +1696,17 @@ public PCollection> expand(PBegin input) { } if (kafkaRead.isRedistributed()) { - // fail here instead. - checkArgument( - !kafkaRead.isCommitOffsetsInFinalizeEnabled(), - "commitOffsetsInFinalize() can't be enabled with withRedistribute()"); - + if (kafkaRead.isCommitOffsetsInFinalizeEnabled()) { + LOG.warn( + "commitOffsetsInFinalize() will not capture all work processed if set with withRedistribute()"); + } if (Boolean.TRUE.equals( kafkaRead.getConsumerConfig().get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG))) { LOG.warn( "config.ENABLE_AUTO_COMMIT_CONFIG doesn't need to be set with withRedistribute()"); } PCollection> output = input.getPipeline().apply(transform); + if (kafkaRead.getRedistributeNumKeys() == 0) { return output.apply( "Insert Redistribute", @@ -2660,6 +2659,13 @@ public PCollection> expand(PCollection if (getRedistributeNumKeys() == 0) { LOG.warn("This will create a key per record, which is sub-optimal for most use cases."); } + // is another check here needed for with commit offsets + if (isCommitOffsetEnabled() || configuredKafkaCommit()) { + LOG.warn( + "Either auto_commit is set, or commitOffsetEnabled is enabled (or both), but since " + + "withRestribute() is enabled, the runner may have additional work processed that " + + "is ahead of the current checkpoint"); + } } if 
(getConsumerConfig().get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG) == null) { @@ -2693,8 +2699,7 @@ public PCollection> expand(PCollection .getSchemaCoder(KafkaSourceDescriptor.class), recordCoder)); - boolean applyCommitOffsets = - isCommitOffsetEnabled() && !configuredKafkaCommit() && !isRedistribute(); + boolean applyCommitOffsets = isCommitOffsetEnabled() && !configuredKafkaCommit(); if (!applyCommitOffsets) { return outputWithDescriptor .apply(MapElements.into(new TypeDescriptor>() {}).via(KV::getValue)) @@ -2716,6 +2721,15 @@ public PCollection> expand(PCollection if (Comparators.lexicographical(Comparator.naturalOrder()) .compare(requestedVersion, targetVersion) < 0) { + // Redistribute is not allowed with commits prior to 2.59.0, since there is a Reshuffle + // prior to the redistribute. The reshuffle will occur before offsets are committed and + // before outputting KafkaRecords. Adding a redistribute afterwards doesn't provide + // additional performance benefit. + checkArgument( + !isRedistribute(), + "Can not enable isRedistribute() while committing offsets prior to " + + String.join(".", targetVersion)); + return expand259Commits( outputWithDescriptor, recordCoder, input.getPipeline().getSchemaRegistry()); } diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java index 5c563b2bc688..ebc607609ee2 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java @@ -88,6 +88,7 @@ import org.apache.beam.sdk.metrics.SinkMetrics; import org.apache.beam.sdk.metrics.SourceMetrics; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.options.StreamingOptions; import org.apache.beam.sdk.testing.ExpectedLogs; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; @@ 
-628,10 +629,7 @@ public void testRiskyConfigurationWarnsProperly() { } @Test - public void testCommitOffsetsInFinalizeAndRedistributeErrors() { - thrown.expect(Exception.class); - thrown.expectMessage("commitOffsetsInFinalize() can't be enabled with withRedistribute()"); - + public void testCommitOffsetsInFinalizeAndRedistributeWarnings() { int numElements = 1000; PCollection input = @@ -645,6 +643,9 @@ public void testCommitOffsetsInFinalizeAndRedistributeErrors() { addCountingAsserts(input, numElements); p.run(); + + kafkaIOExpectedLogs.verifyWarn( + "commitOffsetsInFinalize() will not capture all work processed if set with withRedistribute()"); } @Test @@ -670,6 +671,32 @@ public void testNumKeysIgnoredWithRedistributeNotEnabled() { p.run(); } + @Test + public void testDisableRedistributeKafkaOffsetLegacy() { + thrown.expect(Exception.class); + thrown.expectMessage( + "Can not enable isRedistribute() while committing offsets prior to 2.60.0"); + p.getOptions().as(StreamingOptions.class).setUpdateCompatibilityVersion("2.59.0"); + + p.apply( + Create.of( + KafkaSourceDescriptor.of( + new TopicPartition("topic", 1), + null, + null, + null, + null, + ImmutableList.of("8.8.8.8:9092")))) + .apply( + KafkaIO.readSourceDescriptors() + .withKeyDeserializer(LongDeserializer.class) + .withValueDeserializer(LongDeserializer.class) + .withRedistribute() + .withProcessingTime() + .commitOffsets()); + p.run(); + } + @Test public void testEnableAutoCommitWithRedistribute() throws Exception { @@ -678,6 +705,7 @@ public void testEnableAutoCommitWithRedistribute() throws Exception { PCollection input = p.apply( mkKafkaReadTransform(numElements, numElements, new ValueAsTimestampFn(), true, 0) + .withRedistribute() .withRedistributeNumKeys(100) .withConsumerConfigUpdates( ImmutableMap.of(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true))