Skip to content

Commit

Permalink
wasn't possible to enable this before
Browse files Browse the repository at this point in the history
  • Loading branch information
Naireen committed Aug 30, 2024
1 parent 511f294 commit e016d25
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1699,8 +1699,14 @@ public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
if (kafkaRead.isRedistributed()) {
// fail here instead.
checkArgument(
kafkaRead.isCommitOffsetsInFinalizeEnabled(),
"commitOffsetsInFinalize() can't be enabled with isRedistributed");
!kafkaRead.isCommitOffsetsInFinalizeEnabled(),
"commitOffsetsInFinalize() can't be enabled with withRedistribute()");

if (Boolean.TRUE.equals(
kafkaRead.getConsumerConfig().get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG))) {
LOG.warn(
"config.ENABLE_AUTO_COMMIT_CONFIG doesn't need to be set with withRedistribute()");
}
PCollection<KafkaRecord<K, V>> output = input.getPipeline().apply(transform);
if (kafkaRead.getRedistributeNumKeys() == 0) {
return output.apply(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -630,15 +630,16 @@ public void testRiskyConfigurationWarnsProperly() {
@Test
public void testCommitOffsetsInFinalizeAndRedistributeErrors() {
thrown.expect(Exception.class);
thrown.expectMessage("commitOffsetsInFinalize() can't be enabled with isRedistributed");
thrown.expectMessage("commitOffsetsInFinalize() can't be enabled with withRedistribute()");

int numElements = 1000;

PCollection<Long> input =
p.apply(
mkKafkaReadTransform(numElements, numElements, new ValueAsTimestampFn(), true, 0)
.commitOffsetsInFinalize()
.withConsumerConfigUpdates(
ImmutableMap.of(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true))
ImmutableMap.of(ConsumerConfig.GROUP_ID_CONFIG, "group_id"))
.withoutMetadata())
.apply(Values.create());

Expand All @@ -648,11 +649,36 @@ public void testCommitOffsetsInFinalizeAndRedistributeErrors() {

@Test
public void testNumKeysIgnoredWithRedistributeNotEnabled() {
thrown.expect(Exception.class);
thrown.expectMessage(
"withRedistributeNumKeys is ignored if withRedistribute() is not enabled on the transform");

int numElements = 1000;

PCollection<Long> input =
p.apply(
mkKafkaReadTransform(numElements, numElements, new ValueAsTimestampFn(), false, 0)
.withRedistributeNumKeys(100)
.commitOffsetsInFinalize()
.withConsumerConfigUpdates(
ImmutableMap.of(ConsumerConfig.GROUP_ID_CONFIG, "group_id"))
.withoutMetadata())
.apply(Values.create());

addCountingAsserts(input, numElements);

p.run();
}

@Test
public void testEnableAutoCommitWithRedistribute() throws Exception {

int numElements = 1000;

PCollection<Long> input =
p.apply(
mkKafkaReadTransform(numElements, numElements, new ValueAsTimestampFn(), true, 0)
.withRedistributeNumKeys(100)
.withConsumerConfigUpdates(
ImmutableMap.of(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true))
.withoutMetadata())
Expand All @@ -661,6 +687,9 @@ public void testNumKeysIgnoredWithRedistributeNotEnabled() {
addCountingAsserts(input, numElements);

p.run();

kafkaIOExpectedLogs.verifyWarn(
"config.ENABLE_AUTO_COMMIT_CONFIG doesn't need to be set with withRedistribute()");
}

@Test
Expand Down

0 comments on commit e016d25

Please sign in to comment.