
When an exception is thrown from a @KafkaListener, the remaining messages in the poll loop are skipped #372

Closed
kmangan opened this issue May 17, 2021 · 15 comments · Fixed by #441

Comments

@kmangan

kmangan commented May 17, 2021

Steps to Reproduce

  1. Use a @KafkaListener to consume messages from a topic
  2. Throw an exception from the listener when one of the messages is being processed
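
A minimal listener along these lines reproduces the behaviour; the class, topic, and group names below are illustrative and not taken from the linked example application:

    import io.micronaut.configuration.kafka.annotation.KafkaListener;
    import io.micronaut.configuration.kafka.annotation.OffsetReset;
    import io.micronaut.configuration.kafka.annotation.Topic;

    // Illustrative only: a listener that fails while processing one specific message.
    @KafkaListener(groupId = "demo-group", offsetReset = OffsetReset.EARLIEST)
    public class DemoListener {

        @Topic("demo-topic")
        public void receive(String message) {
            if ("2".equals(message)) {
                // simulate a processing failure for message number 2
                throw new RuntimeException("failed to process message " + message);
            }
            // otherwise process the message normally...
        }
    }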

Expected Behaviour

The next message on the partition is processed

Actual Behaviour

The remaining messages in the poll loop are skipped, and message consumption resumes from the next poll.
For example:

  • Given 20 messages are on a partition
  • And the poll size is set to 10
  • When an exception is thrown while consuming message number 2
  • Then messages 3 to 9 are skipped
  • And the next message to be processed is 10 (the first message in the next poll loop)

Environment Information

  • Operating System: macOS
  • Micronaut Kafka Version: 3.3.1 (the issue is also present in previous versions)
  • JDK Version: 11

Example Application

Thanks for taking a look at this issue; I'd be interested to hear your thoughts. I was not sure whether this behaviour is intended, but developers on teams I have worked with who use @KafkaListener would not expect it.

'At least once' processing is probably the most common durability requirement when working with Kafka, so it would be good if it were not necessary to catch all exceptions in a @KafkaListener to achieve it.

@jaksah

jaksah commented Jun 10, 2021

We have observed this behaviour as well on Micronaut Kafka version 3.3.3. We had set up an example application before finding that this issue was already reported: https://github.com/littlewoo/micronaut-kafka-message-skipping

@graemerocher
Contributor

@jaksah I guess maybe I am misunderstanding something here, but consider this scenario:

  • Record 1 is processed, an exception is thrown and caught by Micronaut Kafka, and it moves on to the next consumer record in the current poll loop
  • Record 2 is processed successfully and the offsets are committed

At this point, the act of committing the offset tells Kafka it has moved past both Record 1 and Record 2, does it not, and hence Record 1 will never be processed? Or am I missing something?

@jaksah

jaksah commented Jun 10, 2021

@graemerocher Our example application adds each message to a map before throwing the exception, and at application shutdown it prints out which messages were received. The test produces 50 messages, but the consumer ends up reporting that it received ~40 (this varies between runs).

I think it's due to this line, which returns false from processConsumerRecords and skips any other record in ConsumerRecords<?, ?> consumerRecords. That breaks the iteration through the remaining consumer records in the batch from the previous consumer.poll(), but the poll loop continues. Nothing is committed until a record is processed successfully, yet consumer.poll() moves the consumer position, so at the next poll iteration we permanently skip everything in the previous batch after the failing record.
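
To illustrate, here is a rough sketch of that control flow in terms of the plain kafka-clients API (this is not the actual KafkaConsumerProcessor source, just the shape of the problem; all names are illustrative):

    import java.time.Duration;
    import java.util.Map;
    import java.util.function.Consumer;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    // Sketch of the poll loop shape described above -- not the real KafkaConsumerProcessor code.
    class PollLoopSketch {

        static void run(KafkaConsumer<String, String> kafkaConsumer,
                        Consumer<ConsumerRecord<String, String>> listener) {
            while (true) {
                // poll() advances the consumer's position past the whole batch,
                // regardless of what happens while processing it
                ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    try {
                        listener.accept(record);
                        // commit up to (and including) the record just processed
                        kafkaConsumer.commitSync(Map.of(
                                new TopicPartition(record.topic(), record.partition()),
                                new OffsetAndMetadata(record.offset() + 1)));
                    } catch (Exception e) {
                        // the "return false" in processConsumerRecords corresponds to
                        // abandoning the rest of this batch here...
                        break;
                    }
                }
                // ...but the next poll() continues from the position after the whole
                // batch, so the records after the failed one are never redelivered.
            }
        }
    }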

@kmangan
Author

kmangan commented Jul 6, 2021

Yes @jaksah , this is my understanding too.

Hi @graemerocher , regarding your first bullet point - this isn't actually how it behaves (it will skip remaining messages in the poll loop). The example application has a test that demonstrates this.

@seancarroll
Contributor

Given issue #315 I was curious about this as well. I updated KafkaErrorHandlingSpec (link) to be as close as possible to https://github.com/kmangan/micronaut-kafka-missed-messages/blob/main/src/test/java/com/example/MissedMessageTest.java and wasn't able to reproduce the issue locally. Granted, I am on a branch with a fix for #315, but I think that should really only affect shutdown scenarios. The updated KafkaErrorHandlingSpec basically looks like:

    protected Map<String, Object> getConfiguration() {
        super.configuration +
                [(EMBEDDED_TOPICS): ["errors"], ('kafka.consumers.default.' + MAX_POLL_RECORDS_CONFIG): "10"]
    }

    void "test an exception that is thrown is not committed"() {
        when:"A consumer throws an exception"
        ErrorClient myClient = context.getBean(ErrorClient)
        IntStream.range(0, 30).forEach(i -> myClient.sendMessage(String.valueOf(i)))

        ErrorCausingConsumer myConsumer = context.getBean(ErrorCausingConsumer)

        then:"The message is re-delivered and eventually handled"
        conditions.eventually {
            myConsumer.received.size() == 30
            myConsumer.count.get() == 31
        }
    }

@sayandigital

sayandigital commented Oct 16, 2021

Faced the same issue. The application can have a feedback loop for the unprocessed messages, but it should not explicitly throw away the rest of the messages in the poll.

@cwebbtw

cwebbtw commented Oct 20, 2021

Any thoughts about a configuration property to seek back to the failed offset in the default exception handler if the exception is not a deserialisation error?
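
For context, "seek back to the failed offset" amounts to something like the following in terms of the plain kafka-clients API (illustrative only; a real implementation would hook into Micronaut Kafka's exception handling rather than live in a standalone helper):

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.TopicPartition;

    // Illustrative helper: reposition the consumer so the failed record (and
    // everything after it) is returned again by the next poll() instead of
    // being skipped along with the rest of its batch.
    final class SeekToFailedOffset {

        static void seekBack(Consumer<?, ?> consumer, ConsumerRecord<?, ?> failedRecord) {
            TopicPartition partition = new TopicPartition(failedRecord.topic(), failedRecord.partition());
            consumer.seek(partition, failedRecord.offset());
        }
    }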

@graemerocher
Contributor

that seems reasonable

@cwebbtw

cwebbtw commented Oct 20, 2021

We've been discussing this and we think this would be better than skipping over messages, because of the order of consumption. I believe the return false in the consumer processor is the correct behaviour, since there is no checking against the key partitioner algorithm and we could end up consuming messages out of order.

We will look at contributing a PR to Micronaut Kafka with the proposed suggestion.

@cwebbtw

cwebbtw commented Oct 20, 2021

I've also been discussing this with a colleague, and the point quoted below from @jaksah comes down to a decision that the user should make about prioritising consistency or availability.

I think it's due to this line, returning false for processConsumerRecords

My thoughts are that if we do not return false when consuming the polled records, we would skip the failing message (which is what Spring does by default), thus prioritising availability over consistency (especially when order is important). If we return false and seek back to the failed offset, we prioritise consistency but reduce availability (no further messages will be consumed).

I think both options are sensible but depend on the use case.

@kmangan
Author

kmangan commented Oct 20, 2021

We have the @Retryable annotation for retry semantics, and maybe this can be used in conjunction with @KafkaListener. The implication would then be that if the @Retryable annotation is not present, any message that causes an exception is skipped.
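
Purely as an illustration of this proposal (not current behaviour; the retry semantics described here are the suggestion, and all names are illustrative), a listener might be declared like:

    import io.micronaut.configuration.kafka.annotation.KafkaListener;
    import io.micronaut.configuration.kafka.annotation.Topic;
    import io.micronaut.retry.annotation.Retryable;

    // Sketch of the suggestion: @Retryable expresses the retry semantics, and its
    // absence would mean a message whose processing throws is simply skipped.
    @KafkaListener(groupId = "demo-group")
    public class RetryingListener {

        @Retryable(attempts = "3", delay = "1s")
        @Topic("demo-topic")
        public void receive(String message) {
            // process the message; a thrown exception would trigger the retries above
        }
    }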

@vsulibhavi

vsulibhavi commented Oct 20, 2021

Thanks, @cwebbtw, for summarizing the availability vs consistency trade-off.

I see 4 ways this can be improved:

  1. Fix the current issue. By default, failed records should simply be logged, and we should move on to the next records in the same poll for consumption.
  2. Add the ability to seek back to the failed offset in the default exception handler if the exception is not a deserialisation error. @cwebbtw
  3. Spring Kafka's SeekToCurrentErrorHandler discards the remaining records from the poll() and performs seek operations on the consumer to reset the offsets so that the discarded records are fetched again on the next poll. By default, the error handler tracks the failed record, gives up after 10 delivery attempts, and logs the failed record. However, we can also send the failed message to another topic; we call this a dead letter topic.
  4. @Retryable topics.

To fix this bug, is it ok if we pick the first option and move on? The additional error handling could then be a new feature request. @graemerocher

@graemerocher
Contributor

@vsulibhavi if option 1) is chosen, I assume a subsequent poll loop will pick up the failed one again if the offsets are not committed?

@cwebbtw

cwebbtw commented Oct 20, 2021

The implication would then be that if the @Retryable annotation is not present any message that causes an exception should be skipped.

I like this a lot, actually, rather than a new configuration property - I think this is different from what retryable topics in Spring do, which is what @vsulibhavi is referring to in point 4.

I quite like @Retryable being a solution for the following, but I also quite like the idea of the @KafkaListener annotation having a retry strategy of skip or retry, since if you're opting for consistency over availability you'll want to retry infinitely until the message can be consumed. The alternative is the configuration property, which doesn't feel quite right.

Ability to seek back to the failed offset in the default exception handler if the exception is not a deserialisation error. @cwebbtw

Referring to what @graemerocher added:

@vsulibhavi if option 1) is chosen I assume a subsequent poll loop will pick up the failed on again if the offsets are not committed?

In the case where the consumer repeatedly errors for each record in the poll loop, nothing would be committed and the subsequent poll loop would pick up from the last committed offset. Transient errors in downstream APIs can cause havoc here, though, if messages are skipped (and only logged), which goes back to the point about prioritising availability over consistency.

@dstepanov
Contributor

@cwebbtw Thanks for your contribution! I have combined your changes with mine into a separate PR that should solve this by introducing error strategies.
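
For reference, the error strategy configuration introduced by that work looks roughly like the following in later Micronaut Kafka releases (a sketch from memory; check the current documentation for the exact annotation and enum names):

    import io.micronaut.configuration.kafka.annotation.ErrorStrategy;
    import io.micronaut.configuration.kafka.annotation.ErrorStrategyValue;
    import io.micronaut.configuration.kafka.annotation.KafkaListener;
    import io.micronaut.configuration.kafka.annotation.Topic;

    // Sketch only: opt into retrying the failed record instead of skipping the
    // remainder of the batch.
    @KafkaListener(errorStrategy = @ErrorStrategy(value = ErrorStrategyValue.RETRY_ON_ERROR))
    public class ResilientListener {

        @Topic("demo-topic")
        public void receive(String message) {
            // process the message
        }
    }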
