
Log conversion failures in batch listeners #3555

Closed
schmidti159 opened this issue Oct 11, 2024 · 3 comments · Fixed by #3611

@schmidti159
schmidti159 commented Oct 11, 2024

Current Behavior

When using a batch listener, the BatchMessagingMessageConverter puts any ConversionException into the CONVERSION_FAILURES header and provides a null payload for that message in the batch passed to the listener.

The exception is not visible in the log at all. The error handling is left entirely to the listener, which needs to inject the header and act on it.

Expected Behavior

In addition to putting the exception in the header, it should also be logged at error or warning level.

Optional: provide a configuration property to skip such messages entirely after logging the error, instead of passing them to the listener with a null payload.

@sobychacko
Contributor

Logging the error in addition to the header is reasonable. However, skipping the record based on a configuration property might be more invasive, and we would need to make sure there are no side effects. Feel free to submit a PR for this if you are up to it.

@artembilan
Member

I don't think this is something that has to be done in the framework unconditionally.
Moreover, it feels more like a responsibility of the target project.
For that purpose we provide a strategy: RecordFilterStrategy.
See the docs for more info: https://docs.spring.io/spring-kafka/reference/kafka/receiving-messages/filtering.html#page-title.
As you can see, there is a FilteringBatchMessageListenerAdapter under the hood.
So it is up to you to filter out those records with errors and log them at the same time (see the sketch after this comment).

I would close this as Works as Designed, but I'll wait for your feedback.
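
For reference, a minimal sketch of that filtering approach, assuming a ConcurrentKafkaListenerContainerFactory bean configured for batch listening; the key/value types, the validity check, and the log field are placeholders, since the actual filtering criteria depend on the project:

    // Illustrative only: register a RecordFilterStrategy on the container factory.
    // Records for which the strategy returns true are dropped before they reach the listener.
    @Bean
    ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        var factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
        factory.setConsumerFactory(consumerFactory);
        factory.setBatchListener(true);
        factory.setRecordFilterStrategy(consumerRecord -> {
            boolean invalid = consumerRecord.value() == null; // placeholder check
            if (invalid) {
                log.warn("Filtering record for topic={}, partition={}, offset={}",
                        consumerRecord.topic(), consumerRecord.partition(), consumerRecord.offset());
            }
            return invalid; // true = discard the record
        });
        return factory;
    }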

@schmidti159
Author

schmidti159 commented Oct 15, 2024

The issue with RecordFilterStrategy is that it is executed earlier.
It works on ConsumerRecords, but the conversion failure only happens later in the RecordMessageConverter. So to log the errors in the RecordFilterStrategy I would actually have to do the conversion twice: once in the filter strategy to decide whether to log the error, and then again in the message converter.

I could implement it on my side by overriding the BatchMessagingMessageConverter with something like this:

    @Override
    protected Object convert(
            ConsumerRecord<?, ?> consumerRecord,
            Type type,
            List<ConversionException> conversionFailures) {
        var payload = super.convert(consumerRecord, type, conversionFailures);
        if (payload == null) {
            // The failure for this record, if any, is the most recent entry in conversionFailures.
            var conversionFailure = conversionFailures.getLast();
            if (conversionFailure != null) {
                log.error("Could not convert message for topic={}, partition={}, offset={}",
                        consumerRecord.topic(),
                        consumerRecord.partition(),
                        consumerRecord.offset(),
                        conversionFailure);
            }
            return null;
        }
        return payload;
    }

Another option I see, and probably how this mechanism with exceptions as headers is intended to be used: inject all needed headers into every listener (in this case topic, partition, offset and conversion failures) and call a helper method from the listener to log the conversion failures.
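
For illustration, a rough sketch of that listener-side variant; the topic name, payload type, and process helper are placeholders, and it assumes the CONVERSION_FAILURES header holds one entry per record (null for records that converted fine), which is what the override above relies on as well:

    // Illustrative only: a batch listener that injects the per-record headers and
    // logs any conversion failures before processing the remaining payloads.
    @KafkaListener(topics = "my-topic", batch = "true")
    public void listen(List<MyPayload> payloads,
            @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
            @Header(KafkaHeaders.RECEIVED_PARTITION) List<Integer> partitions,
            @Header(KafkaHeaders.OFFSET) List<Long> offsets,
            @Header(name = KafkaHeaders.CONVERSION_FAILURES, required = false)
            List<ConversionException> conversionFailures) {

        for (int i = 0; i < payloads.size(); i++) {
            if (conversionFailures != null && conversionFailures.get(i) != null) {
                log.error("Could not convert message for topic={}, partition={}, offset={}",
                        topics.get(i), partitions.get(i), offsets.get(i), conversionFailures.get(i));
                continue; // payloads.get(i) is null for this record
            }
            process(payloads.get(i)); // placeholder helper
        }
    }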

If you feel this should be handled by the target project you can close this issue.

Still, it might also be valuable to add a note to https://docs.spring.io/spring-kafka/reference/kafka/serdes.html#payload-conversion-with-batch that, by default, messages that can't be converted will be passed with a null payload. Or even add a code snippet showing how to handle this error situation inside the listener method.

artembilan added this to the 3.3.0 milestone Nov 1, 2024
artembilan added a commit to artembilan/spring-kafka that referenced this issue Nov 1, 2024
…LURES`

Fixes: spring-projects#3555
Issue link: spring-projects#3555

The batch might be processed in the `@KafkaListener` silently, without looking into the conversion failures header.
So, that might give the impression that nothing is wrong with the batch.

* Mention `KafkaHeaders.CONVERSION_FAILURES` in the docs
* Add `warn` for the failed conversion in the `BatchMessagingMessageConverter`
* Some other code optimization and clean-up in the `BatchMessagingMessageConverter`