Skip to content

Commit

Permalink
GH-2179: Exit RetryingBatchErrorHandler on Stop
Browse files Browse the repository at this point in the history
Resolves #2179
  • Loading branch information
garyrussell authored and artembilan committed Mar 21, 2022
1 parent e9310c0 commit a78a3cd
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ public static void retryBatch(Exception thrownException, ConsumerRecords<?, ?> r
seeker.handleBatch(thrownException, records, consumer, container, () -> { });
throw new KafkaException("Interrupted during retry", logLevel, e1);
}
if (!container.isRunning()) {
throw new KafkaException("Container stopped during retries");
}
try {
invokeListener.run();
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.willAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand All @@ -29,13 +31,15 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.Test;

import org.springframework.kafka.KafkaException;
import org.springframework.util.backoff.FixedBackOff;

/**
Expand Down Expand Up @@ -63,6 +67,7 @@ void recover() {
ConsumerRecords<?, ?> records = new ConsumerRecords<>(map);
Consumer<?, ?> consumer = mock(Consumer.class);
MessageListenerContainer container = mock(MessageListenerContainer.class);
given(container.isRunning()).willReturn(true);
eh.handle(new RuntimeException(), records, consumer, container, () -> {
this.invoked++;
throw new RuntimeException();
Expand Down Expand Up @@ -92,6 +97,7 @@ void successOnRetry() {
ConsumerRecords<?, ?> records = new ConsumerRecords<>(map);
Consumer<?, ?> consumer = mock(Consumer.class);
MessageListenerContainer container = mock(MessageListenerContainer.class);
given(container.isRunning()).willReturn(true);
eh.handle(new RuntimeException(), records, consumer, container, () -> this.invoked++);
assertThat(this.invoked).isEqualTo(1);
assertThat(recovered).hasSize(0);
Expand Down Expand Up @@ -119,6 +125,7 @@ void recoveryFails() {
ConsumerRecords<?, ?> records = new ConsumerRecords<>(map);
Consumer<?, ?> consumer = mock(Consumer.class);
MessageListenerContainer container = mock(MessageListenerContainer.class);
given(container.isRunning()).willReturn(true);
assertThatExceptionOfType(RuntimeException.class).isThrownBy(() ->
eh.handle(new RuntimeException(), records, consumer, container, () -> {
this.invoked++;
Expand All @@ -134,4 +141,31 @@ void recoveryFails() {
verify(consumer).seek(new TopicPartition("foo", 1), 0L);
}

@Test
void exitOnContainerStop() {
this.invoked = 0;
List<ConsumerRecord<?, ?>> recovered = new ArrayList<>();
FallbackBatchErrorHandler eh = new FallbackBatchErrorHandler(new FixedBackOff(0, 99999), (cr, ex) -> {
recovered.add(cr);
});
Map<TopicPartition, List<ConsumerRecord<Object, Object>>> map = new HashMap<>();
map.put(new TopicPartition("foo", 0),
Collections.singletonList(new ConsumerRecord<>("foo", 0, 0L, "foo", "bar")));
map.put(new TopicPartition("foo", 1),
Collections.singletonList(new ConsumerRecord<>("foo", 1, 0L, "foo", "bar")));
ConsumerRecords<?, ?> records = new ConsumerRecords<>(map);
Consumer<?, ?> consumer = mock(Consumer.class);
MessageListenerContainer container = mock(MessageListenerContainer.class);
AtomicBoolean stopped = new AtomicBoolean(true);
willAnswer(inv -> stopped.get()).given(container).isRunning();
assertThatExceptionOfType(KafkaException.class).isThrownBy(() ->
eh.handle(new RuntimeException(), records, consumer, container, () -> {
this.invoked++;
stopped.set(false);
throw new RuntimeException();
})
).withMessage("Container stopped during retries");
assertThat(this.invoked).isEqualTo(1);
}

}

0 comments on commit a78a3cd

Please sign in to comment.