NoSuchFieldException when using consumer inherited from KafkaConsumer #195
Comments
I would looooove to do that, but when I implemented it, I couldn't think of any other way to make sure it's not enabled...
The root issue here, however, is: what should PC do with deserialization errors? Why don't we just add some handling logic to PC itself? Also, in your code, aren't you actually skipping over ALL records if /any/ have a deserialization issue? @JorgenRingen how are you handling this issue?
Yes, just skip and log the error.
We use kafka-streams in front with avro, so it's not a practical issue for us as we control the input topic. Our de-serde would also just log the error and return null. However, it would be really nice if this was handled by the framework with sensible defaults. I kind of like the kafka-streams approach with default and customizable exception handlers through config: https://docs.confluent.io/platform/current/installation/configuration/streams-configs.html#streamsconfigs_default.deserialization.exception.handler It uses There's also "production-exception-handler" for serialization errors.
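To make the "default and customizable exception handler" idea concrete, here is a minimal sketch in plain Java. It is not Parallel Consumer or Kafka Streams code: `DeserializationPolicyDemo`, `Response`, `ErrorHandler`, and `decodeAll` are hypothetical names invented for illustration, loosely modeled on the continue-or-fail choice that Kafka Streams exposes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in types; none of these are Parallel Consumer or Kafka Streams APIs.
class DeserializationPolicyDemo {

    /** What to do with a record that cannot be deserialized. */
    enum Response { CONTINUE, FAIL }

    /** Pluggable policy, consulted once per failed record. */
    interface ErrorHandler {
        Response handle(byte[] rawValue, Exception error);
    }

    /** One possible default policy: log and stop processing. */
    static final ErrorHandler LOG_AND_FAIL = (raw, e) -> {
        System.err.println("Deserialization failed, stopping: " + e);
        return Response.FAIL;
    };

    /** Alternative policy: log and skip only the offending record. */
    static final ErrorHandler LOG_AND_CONTINUE = (raw, e) -> {
        System.err.println("Deserialization failed, skipping: " + e);
        return Response.CONTINUE;
    };

    /** Decodes each raw value, delegating failures to the configured handler. */
    static <T> List<T> decodeAll(List<byte[]> rawValues,
                                 Function<byte[], T> decoder,
                                 ErrorHandler handler) {
        List<T> out = new ArrayList<>();
        for (byte[] raw : rawValues) {
            try {
                out.add(decoder.apply(raw));
            } catch (RuntimeException e) {
                if (handler.handle(raw, e) == Response.FAIL) {
                    throw new IllegalStateException("Stopping on bad record", e);
                }
                // CONTINUE: drop only this record and move on to the next one.
            }
        }
        return out;
    }
}
```

Because the handler is consulted per record, a single malformed record does not cause the rest of the batch to be skipped, which is the concern raised earlier in the thread.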
Btw, why isn't this a core feature in kafka-consumer? It seems like the only option is to create a "non-throwing" serde or create some sort of seek-logic. Or to use kafka-spring, which seems like the most popular solution on SO 😆
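For reference, the "non-throwing" serde option could look roughly like the sketch below. To keep it self-contained it uses a local `Deserializer` interface that only mirrors the shape of Kafka's `deserialize(topic, data)` method; `NonThrowingDeserializerDemo`, `NonThrowing`, and `STRICT_INT` are hypothetical names, and a real version would implement `org.apache.kafka.common.serialization.Deserializer` instead.

```java
import java.nio.charset.StandardCharsets;

class NonThrowingDeserializerDemo {

    /** Minimal stand-in shaped like Kafka's Deserializer#deserialize(topic, data). */
    interface Deserializer<T> {
        T deserialize(String topic, byte[] data);
    }

    /** Wraps a delegate so that failures are logged and mapped to null. */
    static final class NonThrowing<T> implements Deserializer<T> {
        private final Deserializer<T> delegate;

        NonThrowing(Deserializer<T> delegate) {
            this.delegate = delegate;
        }

        @Override
        public T deserialize(String topic, byte[] data) {
            try {
                return delegate.deserialize(topic, data);
            } catch (RuntimeException e) {
                System.err.println("Skipping malformed record on " + topic + ": " + e);
                return null; // the caller must treat null as "skip this record"
            }
        }
    }

    /** Example strict delegate: parses the payload as a UTF-8 decimal integer. */
    static final Deserializer<Integer> STRICT_INT =
            (topic, data) -> Integer.parseInt(new String(data, StandardCharsets.UTF_8));
}
```

One trade-off of this approach: the processing function can no longer tell a malformed record from one whose value was genuinely null, so it has to skip both or carry extra context.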
See #291 for implementation of SKIP and SHUTDOWN for user function failure - will be released in next feature release. Keen to get feedback! We should open a separate issue for deserialization issues:
I run into the same start-up error when using PC and quarkus-native build.
…afkaConsumer Corrects use of reflection for field access, by using the KafkaConsumer class directly, instead of trying to find it from the provided consumer.
Issue description
Hey!
I'm trying to skip malformed messages by inheriting from KafkaConsumer and altering poll() method behavior.
But then I get this error on startup:
Possible solutions
Workaround
Use composition over inheritance, so AbstractParallelEoSStreamProcessor will skip this check.
Proper fix
Probably implement the enable.auto.commit check without using reflection: there's no guarantee that the consumer directly inherits from KafkaConsumer, so we can't reliably use #getSuperclass() on it.
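The sketch below illustrates why a lookup through `getSuperclass()` is fragile, and how naming the declaring class directly avoids the problem. It uses plain stand-in classes rather than Kafka itself: `BaseConsumer` plays the role of KafkaConsumer, and the field name `autoCommitEnabled` is invented for the example, not the real internal field.

```java
import java.lang.reflect.Field;

class ReflectionLookupDemo {

    /** Hypothetical stand-in for KafkaConsumer; the field name is invented. */
    static class BaseConsumer {
        private final boolean autoCommitEnabled;

        BaseConsumer(boolean autoCommitEnabled) {
            this.autoCommitEnabled = autoCommitEnabled;
        }
    }

    /** Direct child, e.g. a consumer that overrides poll(). */
    static class SkippingConsumer extends BaseConsumer {
        SkippingConsumer(boolean autoCommitEnabled) {
            super(autoCommitEnabled);
        }
    }

    /** Second-level child: its superclass is no longer BaseConsumer. */
    static class LoggingSkippingConsumer extends SkippingConsumer {
        LoggingSkippingConsumer(boolean autoCommitEnabled) {
            super(autoCommitEnabled);
        }
    }

    /** Fragile: only works when the runtime class is a direct child of BaseConsumer. */
    static boolean viaSuperclass(BaseConsumer consumer) throws ReflectiveOperationException {
        Field f = consumer.getClass().getSuperclass().getDeclaredField("autoCommitEnabled");
        f.setAccessible(true);
        return f.getBoolean(consumer);
    }

    /** More robust: names the declaring class, so subclass depth is irrelevant. */
    static boolean viaDeclaringClass(BaseConsumer consumer) throws ReflectiveOperationException {
        Field f = BaseConsumer.class.getDeclaredField("autoCommitEnabled");
        f.setAccessible(true);
        return f.getBoolean(consumer);
    }
}
```

With these stand-ins, `viaSuperclass` works for a `SkippingConsumer` but throws NoSuchFieldException for a `LoggingSkippingConsumer` or a plain `BaseConsumer`, while `viaDeclaringClass` works for all three. It still relies on a private field, so it is a mitigation rather than the reflection-free check proposed above.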