Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Naireen committed Aug 31, 2024
1 parent e016d25 commit 9c37865
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -890,7 +890,6 @@ static <K, V> void setupExternalBuilder(
builder.setRedistributeNumKeys(0);
builder.setAllowDuplicates(false);
}
System.out.println("xxx builder service" + builder.toString());
}

private static <T> Coder<T> resolveCoder(Class<Deserializer<T>> deserializer) {
Expand Down Expand Up @@ -1697,17 +1696,17 @@ public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
}

if (kafkaRead.isRedistributed()) {
// fail here instead.
checkArgument(
!kafkaRead.isCommitOffsetsInFinalizeEnabled(),
"commitOffsetsInFinalize() can't be enabled with withRedistribute()");

if (kafkaRead.isCommitOffsetsInFinalizeEnabled()) {
LOG.warn(
"commitOffsetsInFinalize() will not capture all work processed if set with withRedistribute()");
}
if (Boolean.TRUE.equals(
kafkaRead.getConsumerConfig().get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG))) {
LOG.warn(
"config.ENABLE_AUTO_COMMIT_CONFIG doesn't need to be set with withRedistribute()");
}
PCollection<KafkaRecord<K, V>> output = input.getPipeline().apply(transform);

if (kafkaRead.getRedistributeNumKeys() == 0) {
return output.apply(
"Insert Redistribute",
Expand Down Expand Up @@ -2660,6 +2659,13 @@ public PCollection<KafkaRecord<K, V>> expand(PCollection<KafkaSourceDescriptor>
if (getRedistributeNumKeys() == 0) {
LOG.warn("This will create a key per record, which is sub-optimal for most use cases.");
}
// Is another check needed here for when commit offsets are enabled?
if (isCommitOffsetEnabled() || configuredKafkaCommit()) {
LOG.warn(
"Either auto_commit is set, or commitOffsetEnabled is enabled (or both), but since "
+ "withRestribute() is enabled, the runner may have additional work processed that "
+ "is ahead of the current checkpoint");
}
}

if (getConsumerConfig().get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG) == null) {
Expand Down Expand Up @@ -2693,8 +2699,7 @@ public PCollection<KafkaRecord<K, V>> expand(PCollection<KafkaSourceDescriptor>
.getSchemaCoder(KafkaSourceDescriptor.class),
recordCoder));

boolean applyCommitOffsets =
isCommitOffsetEnabled() && !configuredKafkaCommit() && !isRedistribute();
boolean applyCommitOffsets = isCommitOffsetEnabled() && !configuredKafkaCommit();
if (!applyCommitOffsets) {
return outputWithDescriptor
.apply(MapElements.into(new TypeDescriptor<KafkaRecord<K, V>>() {}).via(KV::getValue))
Expand All @@ -2716,6 +2721,15 @@ public PCollection<KafkaRecord<K, V>> expand(PCollection<KafkaSourceDescriptor>
if (Comparators.lexicographical(Comparator.<String>naturalOrder())
.compare(requestedVersion, targetVersion)
< 0) {
// Redistribute is not allowed with commits prior to 2.60.0, since there is a Reshuffle
// prior to the redistribute. The reshuffle will occur before offsets are committed and
// before outputting KafkaRecords. Adding a redistribute afterwards doesn't provide
// additional performance benefit.
checkArgument(
!isRedistribute(),
"Can not enable isRedistribute() while committing offsets prior to "
+ String.join(".", targetVersion));

return expand259Commits(
outputWithDescriptor, recordCoder, input.getPipeline().getSchemaRegistry());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
import org.apache.beam.sdk.metrics.SinkMetrics;
import org.apache.beam.sdk.metrics.SourceMetrics;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.testing.ExpectedLogs;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
Expand Down Expand Up @@ -628,10 +629,7 @@ public void testRiskyConfigurationWarnsProperly() {
}

@Test
public void testCommitOffsetsInFinalizeAndRedistributeErrors() {
thrown.expect(Exception.class);
thrown.expectMessage("commitOffsetsInFinalize() can't be enabled with withRedistribute()");

public void testCommitOffsetsInFinalizeAndRedistributeWarnings() {
int numElements = 1000;

PCollection<Long> input =
Expand All @@ -645,6 +643,9 @@ public void testCommitOffsetsInFinalizeAndRedistributeErrors() {

addCountingAsserts(input, numElements);
p.run();

kafkaIOExpectedLogs.verifyWarn(
"commitOffsetsInFinalize() will not capture all work processed if set with withRedistribute()");
}

@Test
Expand All @@ -670,6 +671,32 @@ public void testNumKeysIgnoredWithRedistributeNotEnabled() {
p.run();
}

/**
 * Verifies that combining {@code withRedistribute()} and {@code commitOffsets()} on
 * {@code readSourceDescriptors()} is rejected when the pipeline's update compatibility version
 * is set to "2.59.0".
 */
@Test
public void testDisableRedistributeKafkaOffsetLegacy() {
  // Register the expected failure before any pipeline construction happens.
  thrown.expect(Exception.class);
  thrown.expectMessage(
      "Can not enable isRedistribute() while committing offsets prior to 2.60.0");

  // Request the older update-compatibility version for this pipeline.
  StreamingOptions streamingOptions = p.getOptions().as(StreamingOptions.class);
  streamingOptions.setUpdateCompatibilityVersion("2.59.0");

  // A single descriptor: only the topic partition and bootstrap servers are supplied, the
  // remaining arguments are left null.
  TopicPartition partition = new TopicPartition("topic", 1);
  KafkaSourceDescriptor descriptor =
      KafkaSourceDescriptor.of(
          partition, null, null, null, null, ImmutableList.of("8.8.8.8:9092"));

  p.apply(Create.of(descriptor))
      .apply(
          KafkaIO.<Long, Long>readSourceDescriptors()
              .withKeyDeserializer(LongDeserializer.class)
              .withValueDeserializer(LongDeserializer.class)
              .withRedistribute()
              .withProcessingTime()
              .commitOffsets());
  p.run();
}

@Test
public void testEnableAutoCommitWithRedistribute() throws Exception {

Expand All @@ -678,6 +705,7 @@ public void testEnableAutoCommitWithRedistribute() throws Exception {
PCollection<Long> input =
p.apply(
mkKafkaReadTransform(numElements, numElements, new ValueAsTimestampFn(), true, 0)
.withRedistribute()
.withRedistributeNumKeys(100)
.withConsumerConfigUpdates(
ImmutableMap.of(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true))
Expand Down

0 comments on commit 9c37865

Please sign in to comment.