Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

UnsatisfiedArgumentException on Kafka Listener using Kotlin #52

Closed
Chase22 opened this issue Jul 18, 2019 · 3 comments · Fixed by #814
Closed

UnsatisfiedArgumentException on Kafka Listener using Kotlin #52

Chase22 opened this issue Jul 18, 2019 · 3 comments · Fixed by #814
Assignees

Comments

@Chase22
Copy link

Chase22 commented Jul 18, 2019

When defining a message processing method using @Topic i get an "UnsatisfiedArgumentException: Required argument [String key] not specified"

Micronaut-Version: 1.1.3
JVM Version: Java 12

@KafkaListener(offsetStrategy = OffsetStrategy.SYNC, batch = true, threads = 7)
open class Consumer {
    @Topic("\${application.kafka.topic.name}")
    fun processMessage(
        @KafkaKey key: String, 
        message: String, 
        offset: Long, 
        partition: Int, 
        topic: String
    ): Boolean {
        \\...
}
@paulolieuthier
Copy link

Have you tried disabling batching (just to help debugging)? I filed a similar issue in #54.

@hmatt1
Copy link
Contributor

hmatt1 commented Jul 29, 2019

To add to paulolieuthier's point,

Check out the batch section of the docs:
https://micronaut-projects.github.io/micronaut-kafka/latest/guide/#kafkaListenerBatch

When batch = true, you receive a list of messages, a list of offsets, etc.

The example method signature looks like this:

@Topic("all-the-books")
public void receive(
        List<Book> books,
        List<Long> offsets,
        List<Integer> partitions,
        List<String> topics,
        Consumer kafkaConsumer) {

@guillermocalvo
Copy link
Contributor

Thanks for for bringing this issue to our attention.

Current documentation does not accurately reflect the way kafka batch listeners work.

The proper way to handle this use case should be by defining a consumer method that receives a list of consumer records:

@Topic("all-the-books")
public void receive(List<ConsumerRecord<String, Book>> records, Consumer kafkaConsumer) {
    /* ... */
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
No open projects
Status: Done
Development

Successfully merging a pull request may close this issue.

4 participants