Skip to content

Commit

Permalink
GH-3276: Support async retry with @RetryableTopic
Browse files Browse the repository at this point in the history
Fixes: #3276
#3276

GH-3276: Draft and test codes.

Add test cases.

Add method

Fixes lint errors.

Minimize FailedRecordTuple dependency.

Refactor test codes and real codes.

Modify method name.

Fixes compile error.

Remove async retry callback from MessageListener contract.

Revert Copyright period.

Revert KafkaBackoffAwareMessageListenerAdapter.

Depends on general type on KafkaMessageListenerContainer.

Remove sleep(1) from async retry test.

Remove waitAWhile().

Add java docs.

Add author

Fixes flaky test.

Modify contract of callback for async failure.

Fix Javadoc for setCallbackForAsyncFailure.

Fix the tests for async failure retry.

Remove weed from async retry test and fix lint error.

Add Tags for conditional test.

Remove useless tag for async failure retry tests.

Remove unused import

Make a method static and add @SuppressWarnings.

Remove useless latch.

Use specific ListenerAdapter type for async retry feature.

* Some code clean up
  • Loading branch information
chickenchickenlove authored and artembilan committed Oct 16, 2024
1 parent c878099 commit 9eafa61
Show file tree
Hide file tree
Showing 6 changed files with 4,335 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledFuture;
Expand Down Expand Up @@ -73,6 +74,7 @@
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;

import org.springframework.aop.support.AopUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.context.ApplicationContext;
Expand Down Expand Up @@ -106,6 +108,7 @@
import org.springframework.kafka.listener.ContainerProperties.AssignmentCommitOption;
import org.springframework.kafka.listener.ContainerProperties.EOSMode;
import org.springframework.kafka.listener.adapter.AsyncRepliesAware;
import org.springframework.kafka.listener.adapter.KafkaBackoffAwareMessageListenerAdapter;
import org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
Expand Down Expand Up @@ -168,6 +171,7 @@
* @author Mikael Carlstedt
* @author Borahm Lee
* @author Lokesh Alamuri
* @author Sanghyeok An
*/
public class KafkaMessageListenerContainer<K, V> // NOSONAR line count
extends AbstractMessageListenerContainer<K, V> implements ConsumerPauseResumeEventPublisher {
Expand Down Expand Up @@ -841,6 +845,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume

private volatile long lastPoll = System.currentTimeMillis();

// Records whose asynchronous processing failed, paired with the failure.
// Appended by callbackForAsyncFailure() and consumed by handleAsyncFailure().
private final ConcurrentLinkedDeque<FailedRecordTuple<K, V>> failedRecords = new ConcurrentLinkedDeque<>();

@SuppressWarnings(UNCHECKED)
ListenerConsumer(GenericMessageListener<?> listener, ListenerType listenerType,
ObservationRegistry observationRegistry) {
Expand Down Expand Up @@ -896,6 +902,16 @@ else if (listener instanceof MessageListener) {
this.wantsFullRecords = false;
this.pollThreadStateProcessor = setUpPollProcessor(false);
this.observationEnabled = this.containerProperties.isObservationEnabled();

if (!AopUtils.isAopProxy(this.genericListener) &&
this.genericListener instanceof KafkaBackoffAwareMessageListenerAdapter<?, ?>) {
KafkaBackoffAwareMessageListenerAdapter<K, V> genListener =
(KafkaBackoffAwareMessageListenerAdapter<K, V>) this.genericListener;
if (genListener.getDelegate() instanceof RecordMessagingMessageListenerAdapter<K, V> adapterListener) {
// This means that the async retry feature is supported only for SingleRecordListener with @RetryableTopic.
adapterListener.setCallbackForAsyncFailure(this::callbackForAsyncFailure);
}
}
}
else {
throw new IllegalArgumentException("Listener must be one of 'MessageListener', "
Expand Down Expand Up @@ -1299,6 +1315,15 @@ public void run() {
boolean failedAuthRetry = false;
this.lastReceive = System.currentTimeMillis();
while (isRunning()) {

try {
handleAsyncFailure();
}
catch (Exception e) {
ListenerConsumer.this.logger.error(
"Failed to process async retry messages. skip this time, try it again next loop.");
}

try {
pollAndInvoke();
if (failedAuthRetry) {
Expand Down Expand Up @@ -1440,6 +1465,26 @@ protected void pollAndInvoke() {
}
}

/**
 * Pass every queued async-failed record to the error handler, one record at a time.
 * <p>
 * The queue is drained with {@code pollFirst()} rather than copied and then cleared:
 * with copy-then-clear, a record appended by the async failure callback between the
 * copy and the {@code clear()} would be removed without ever being handled.
 * <p>
 * A failure while handling one record is logged and does not prevent the remaining
 * drained records from being handled.
 */
protected void handleAsyncFailure() {
	// Atomically take each queued element; anything added after the drain ends
	// simply stays in the queue for the next invocation.
	List<FailedRecordTuple<K, V>> drainedRecords = new ArrayList<>();
	FailedRecordTuple<K, V> next;
	while ((next = this.failedRecords.pollFirst()) != null) {
		drainedRecords.add(next);
	}

	for (FailedRecordTuple<K, V> failedRecord : drainedRecords) {
		try {
			invokeErrorHandlerBySingleRecord(failedRecord);
		}
		catch (Exception e) {
			// Keep the exception as the log cause so the stack trace is not lost.
			this.logger.warn(e, () ->
					"Async failed record failed to complete, thus skip it. record :"
							+ failedRecord
							+ ", Exception : "
							+ e.getMessage());
		}
	}
}

private void doProcessCommits() {
if (!this.autoCommit && !this.isRecordAck) {
try {
Expand Down Expand Up @@ -2840,6 +2885,44 @@ private void doInvokeOnMessage(final ConsumerRecord<K, V> recordArg) {
}
}

/**
 * Run the common error handler for a single record whose asynchronous processing failed.
 * <p>
 * When the error handler seeks after handling, or the failure is a
 * {@link CommitFailedException}, pending commits are processed first (only when no
 * producer is set) and the record is passed to {@code handleRemaining}. Otherwise
 * {@code handleOne} is called; if it does not report the record as handled, the
 * record is stored as the remaining records and {@code pauseForPending} is set.
 * @param failedRecordTuple the failed record together with its exception.
 */
private void invokeErrorHandlerBySingleRecord(FailedRecordTuple<K, V> failedRecordTuple) {
	final ConsumerRecord<K, V> failed = failedRecordTuple.record();
	final RuntimeException cause = failedRecordTuple.ex();

	if (this.commonErrorHandler.seeksAfterHandling() || cause instanceof CommitFailedException) {
		if (this.producer == null) {
			try {
				processCommits();
			}
			catch (Exception ex) { // NO SONAR
				this.logger.error(ex, "Failed to commit before handling error");
			}
		}
		List<ConsumerRecord<?, ?>> remaining = new ArrayList<>();
		remaining.add(failed);
		this.commonErrorHandler.handleRemaining(cause, remaining, this.consumer,
				KafkaMessageListenerContainer.this.thisOrParentContainer);
		return;
	}

	boolean handled = false;
	try {
		handled = this.commonErrorHandler.handleOne(cause, failed, this.consumer,
				KafkaMessageListenerContainer.this.thisOrParentContainer);
	}
	catch (Exception ex) {
		this.logger.error(ex, "ErrorHandler threw unexpected exception");
	}
	if (handled) {
		return;
	}

	// Not handled: keep the record as the single pending record for its partition.
	List<ConsumerRecord<K, V>> partitionRecords = new ArrayList<>();
	partitionRecords.add(failed);
	Map<TopicPartition, List<ConsumerRecord<K, V>>> pending = new LinkedHashMap<>();
	pending.put(new TopicPartition(failed.topic(), failed.partition()), partitionRecords);
	this.remainingRecords = new ConsumerRecords<>(pending);
	this.pauseForPending = true;
}

private void invokeErrorHandler(final ConsumerRecord<K, V> cRecord,
Iterator<ConsumerRecord<K, V>> iterator, RuntimeException rte) {

Expand Down Expand Up @@ -3312,6 +3395,10 @@ private Collection<ConsumerRecord<K, V>> getHighestOffsetRecords(ConsumerRecords
.values();
}

/**
 * Queue a record whose asynchronous processing failed so that
 * {@code handleAsyncFailure()} can pass it to the error handler later.
 * @param cRecord the record that failed.
 * @param ex the exception the record failed with.
 */
private void callbackForAsyncFailure(ConsumerRecord<K, V> cRecord, RuntimeException ex) {
	FailedRecordTuple<K, V> failure = new FailedRecordTuple<>(cRecord, ex);
	this.failedRecords.addLast(failure);
}

@Override
public void seek(String topic, int partition, long offset) {
this.seeks.add(new TopicPartitionOffset(topic, partition, offset));
Expand Down Expand Up @@ -3926,4 +4013,6 @@ private static class StopAfterFenceException extends KafkaException {

}

/**
 * Pairs a consumer record with the exception its processing failed with.
 * @param record the failed record.
 * @param ex the exception raised for the record.
 * @param <K> the record key type.
 * @param <V> the record value type.
 */
private record FailedRecordTuple<K, V>(ConsumerRecord<K, V> record, RuntimeException ex) {
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

import org.apache.commons.logging.LogFactory;
Expand Down Expand Up @@ -158,6 +159,8 @@ public abstract class MessagingMessageListenerAdapter<K, V> implements ConsumerS

private ObservationRegistry observationRegistry = ObservationRegistry.NOOP;

/**
 * Optional callback that receives a record and the exception raised while handling
 * its asynchronous failure. Named to match every use in this class
 * ({@code asyncFailure} and {@code setCallbackForAsyncFailure}); it may be
 * {@code null} when no callback has been set.
 */
@Nullable
private BiConsumer<ConsumerRecord<K, V>, RuntimeException> callbackForAsyncFailure;

/**
* Create an instance with the provided bean and method.
* @param bean the bean.
Expand Down Expand Up @@ -707,16 +710,27 @@ protected void asyncFailure(Object request, @Nullable Acknowledgment acknowledgm
Throwable t, Message<?> source) {

try {
Throwable cause = t instanceof CompletionException ? t.getCause() : t;
handleException(request, acknowledgment, consumer, source,
new ListenerExecutionFailedException(createMessagingErrorMessage(
"Async Fail", source.getPayload()), t));
new ListenerExecutionFailedException(createMessagingErrorMessage(
"Async Fail", source.getPayload()), cause));
}
catch (Throwable ex) {
this.logger.error(t, () -> "Future, Mono, or suspend function was completed with an exception for " + source);
acknowledge(acknowledgment);
if (canAsyncRetry(request, ex) && this.callbackForAsyncFailure != null) {
@SuppressWarnings("unchecked")
ConsumerRecord<K, V> record = (ConsumerRecord<K, V>) request;
this.callbackForAsyncFailure.accept(record, (RuntimeException) ex);
}
}
}

/**
 * Decide whether an async failure can be passed to the async failure callback.
 * Only a single {@link ConsumerRecord} request that failed with a
 * {@link RuntimeException} qualifies; batch requests are not supported.
 * @param request the listener request.
 * @param exception the failure.
 * @return true if the request is a single record and the failure is a runtime exception.
 */
private static boolean canAsyncRetry(Object request, Throwable exception) {
	if (!(request instanceof ConsumerRecord)) {
		// The async retry with @RetryableTopic is only supported for SingleRecord Listener.
		return false;
	}
	return exception instanceof RuntimeException;
}

protected void handleException(Object records, @Nullable Acknowledgment acknowledgment, Consumer<?, ?> consumer,
Message<?> message, ListenerExecutionFailedException e) {

Expand Down Expand Up @@ -788,7 +802,8 @@ protected final String createMessagingErrorMessage(String description, Object pa
* @param method the method.
* @return the type.
*/
protected Type determineInferredType(Method method) { // NOSONAR complexity
@Nullable
protected Type determineInferredType(@Nullable Method method) { // NOSONAR complexity
if (method == null) {
return null;
}
Expand Down Expand Up @@ -911,6 +926,20 @@ private boolean rawByParameterIsType(Type parameterType, Type type) {
return parameterType instanceof ParameterizedType pType && pType.getRawType().equals(type);
}

/**
 * Set the callback to notify about failed asynchronous results, i.e. a
 * {@link CompletableFuture} or {@link Mono} that completed exceptionally.
 * {@link MessagingMessageListenerAdapter#asyncFailure(Object, Acknowledgment, Consumer, Throwable, Message)}
 * invokes this callback when handling such a failure throws a
 * {@link RuntimeException} and the request is a single {@link ConsumerRecord}.
 * Passing {@code null} leaves the adapter without a callback.
 * @param asyncRetryCallback the callback for async retry; may be {@code null}.
 * @since 3.3
 */
public void setCallbackForAsyncFailure(
		@Nullable BiConsumer<ConsumerRecord<K, V>, RuntimeException> asyncRetryCallback) {
	this.callbackForAsyncFailure = asyncRetryCallback;
}

/**
* Root object for reply expression evaluation.
* @param request the request.
Expand Down
Loading

0 comments on commit 9eafa61

Please sign in to comment.