KafkaJSNonRetriableError Caused by: KafkaJSError: This is not the correct coordinator for this group at /usr/app/node_modules/kafkajs/src/consumer/consumerGroup.js:361:17 #1536

Open
Borduhh opened this issue Feb 23, 2023 · 8 comments

Comments

@Borduhh

Borduhh commented Feb 23, 2023

Describe the bug
Getting the following error when connecting to an AWS MSK cluster:

KafkaJSNonRetriableError
  Caused by: KafkaJSError: This is not the correct coordinator for this group
    at /usr/app/node_modules/kafkajs/src/consumer/consumerGroup.js:361:17
    at runMicrotasks (<anonymous>)
    at processTicksAndRejections (node:internal/process/task_queues:96:5)
    at async Runner.start (/usr/app/node_modules/kafkajs/src/consumer/runner.js:84:7)
    at async start (/usr/app/node_modules/kafkajs/src/consumer/index.js:243:7)

To Reproduce

  1. Start up an AWS MSK production system with 2 brokers
  2. Create an AWS ECS consumer with KafkaJS
  3. Consume data from topic X
  4. Delete topic X via the Kafka CLI
  5. Re-create topic X via the Kafka CLI with the same name and config
  6. Try to restart AWS ECS consumer
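Steps 4 and 5 use the Kafka CLI. To script the same delete-and-recreate cycle from Node.js, a rough equivalent with the KafkaJS admin client might look like the sketch below. It assumes `admin` is an already-connected admin client (from `kafka.admin()`); the helper name `recreateTopic` and the topic settings are hypothetical placeholders, not values from this report. Because the admin client is passed in, any object exposing `deleteTopics()` and `createTopics()` with the same argument shapes works. Kafka deletes topics asynchronously, so the re-create call can still fail if it runs before the deletion has completed.

```javascript
// Hypothetical helper mirroring reproduction steps 4 and 5.
// `admin` is assumed to be a connected KafkaJS admin client (kafka.admin()).
async function recreateTopic(admin, { topic, numPartitions, replicationFactor }) {
  // Step 4: delete topic X.
  await admin.deleteTopics({ topics: [topic] });

  // Step 5: re-create topic X with the same name and config.
  // KafkaJS resolves createTopics to true if the topic was created and
  // false if it already exists.
  const created = await admin.createTopics({
    topics: [{ topic, numPartitions, replicationFactor }],
    waitForLeaders: true,
  });
  return created;
}
```

Re-running the consumer against the recreated topic (step 6) is then the same as restarting the ECS task.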

Expected behavior
The consumer should connect just as it did before.

Observed behavior
The consumer failed with the following error:

KafkaJSNonRetriableError
  Caused by: KafkaJSError: This is not the correct coordinator for this group
    at /usr/app/node_modules/kafkajs/src/consumer/consumerGroup.js:361:17
    at runMicrotasks (<anonymous>)
    at processTicksAndRejections (node:internal/process/task_queues:96:5)
    at async Runner.start (/usr/app/node_modules/kafkajs/src/consumer/runner.js:84:7)
    at async start (/usr/app/node_modules/kafkajs/src/consumer/index.js:243:7)

@Borduhh
Author

Borduhh commented Feb 23, 2023

It seems like this was a Kafka issue. A broker restart did solve the problem. I am not sure if there is a better way to solve it, however.

@Borduhh
Author

Borduhh commented Jun 21, 2023

This happens whenever the brokers are updated, and restarting the consumer does not fix it. The only fix we have found to date is to restart every broker.

The odd part is that none of our Lambda MSK consumers have this issue; only our ECS consumers, which are the ones using KafkaJS, are affected. It seems like the KafkaJS client keeps contacting the wrong broker.
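Some background on what "the wrong broker" means here: Kafka assigns each consumer group to a coordinator broker, namely the leader of one partition of the internal `__consumer_offsets` topic, picked by hashing the group ID. One possible explanation for this error is that a client holds a stale view of which broker leads that partition (for example, after a broker update moves leadership); the broker it contacts then replies with a NOT_COORDINATOR error (protocol error code 16), which KafkaJS reports as "This is not the correct coordinator for this group". The sketch below reproduces the broker-side partition choice in JavaScript. It assumes Kafka's default of 50 partitions for `__consumer_offsets` (`offsets.topic.num.partitions`); the function names are hypothetical and this is not part of the KafkaJS API.

```javascript
// Java's String.hashCode(), which the broker applies to the group ID:
// h = 31 * h + charCode, with 32-bit signed integer overflow.
function javaStringHashCode(s) {
  let h = 0;
  for (let i = 0; i < s.length; i++) {
    // Math.imul multiplies as 32-bit ints; `| 0` wraps the sum to int32.
    // charCodeAt returns UTF-16 code units, like Java's char.
    h = (Math.imul(31, h) + s.charCodeAt(i)) | 0;
  }
  return h;
}

// Kafka's Utils.abs maps Integer.MIN_VALUE to 0 instead of overflowing.
function kafkaAbs(n) {
  return n === -2147483648 ? 0 : Math.abs(n);
}

// Index of the __consumer_offsets partition whose leader coordinates the
// group. 50 is Kafka's default; an MSK cluster may be configured differently.
function coordinatorPartitionFor(groupId, offsetsTopicPartitions = 50) {
  return kafkaAbs(javaStringHashCode(groupId)) % offsetsTopicPartitions;
}
```

Given the partition number, the broker currently leading that partition of `__consumer_offsets` (visible with `kafka-topics.sh --describe --topic __consumer_offsets`) is the group's coordinator, which can be compared with the broker the client is actually talking to.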

@shg95

shg95 commented Nov 28, 2023

Hi @Borduhh,

I see the above error and also other non-retriable errors, such as topic authorization failed and consumer group authorization failed errors.
The weird part is that this happens suddenly: my consumer is consuming messages and then, mid-consumption, throws these non-retriable authorization errors. Because of this, my consumer stalls and stops consuming messages until I manually restart my pod. As you know, that is not the right way to do it. Any suggestions?
I am also using MSK, and my pod runs in EKS using KafkaJS.

@Jabbar-Zoop

[Screenshot 2024-01-05 at 12 13 13 AM]

I am also getting the same issue: the consumer suddenly throws this error. I have also added a retry mechanism, but it still does not work as expected.

@shg95

shg95 commented Jan 5, 2024

@Jabbar-Zoop,
Instead of just retrying the consumer connection, create a completely new Kafka connection in your retry logic.
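A minimal sketch of that suggestion, written so it does not depend on KafkaJS directly: `createConsumer` is a hypothetical factory that must build a brand-new `Kafka` instance (and a consumer from it) on every call, and `start` is a hypothetical callback that subscribes and runs the connected consumer. The retry count and backoff delays are arbitrary placeholders, not recommended values.

```javascript
// Sleep helper used for the backoff between attempts.
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Try to start a consumer, building a completely new client on each attempt
// rather than reconnecting the one that failed.
async function startWithFreshClient(createConsumer, start, { retries = 5, initialDelayMs = 1000 } = {}) {
  let lastError;
  for (let attempt = 0; attempt <= retries; attempt++) {
    const consumer = createConsumer(); // hypothetical: does `new Kafka(...)` every call
    try {
      await consumer.connect();
      await start(consumer); // e.g. subscribe() + run()
      return consumer;
    } catch (error) {
      lastError = error;
      // Best-effort cleanup of the failed client before the next attempt.
      await consumer.disconnect().catch(() => {});
      if (attempt < retries) {
        // Exponential backoff: initialDelayMs, 2x, 4x, ...
        await sleep(initialDelayMs * 2 ** attempt);
      }
    }
  }
  throw lastError; // all attempts failed
}
```

With KafkaJS, the factory could be `() => new Kafka({ clientId: 'my-app', brokers }).consumer({ groupId })`, so each attempt starts from a fresh client instead of reusing the one that failed.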

@Jabbar-Zoop

@shg95 This is what my code looks like. Let me know what I am missing.

const { PartitionAssigners } = require('kafkajs');

// Excerpt from an async class method; this.queueInstance is the Kafka client.
this.consumer = this.queueInstance.consumer({
    groupId: this.kafkaConfig.groupId,
    rebalanceTimeout: 90000,
    retry: {
        initialRetryTime: 500,
        retries: 1,
    },
    heartbeatInterval: 30000,
    sessionTimeout: 90000,
    maxWaitTimeInMs: 10000,
    maxBytes: 1000 * 1000 * 500, // Partition * maxBytesPerPartition
    minBytes: 50 * 1000 * 1000,
    allowAutoTopicCreation: false,
    maxBytesPerPartition: 50 * 1000 * 1000,
    partitionAssigners: [PartitionAssigners.roundRobin],
});
await this.consumer.connect();

this.consumer.on('consumer.crash', async (payload) => {
    console.log({
        message: 'Consumer crashed event error',
        context: { ...payload },
    });
    try {
        await this.consumer.disconnect();
    } catch (error) {
        console.error({
            message: 'Error in consumer crash event',
            error: error?.message ?? error,
        });
    } finally {
        // Exit even if disconnect() failed, so the container gets restarted.
        process.exit(1);
    }
});
this.consumer.on('consumer.rebalancing', async (payload) => {
    console.log({
        message: 'Consumer is rebalancing',
        context: { ...payload },
    });
});
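A possible refinement of a crash handler like the one above, sketched under an assumption worth checking against your KafkaJS version: listeners receive an event object whose `payload` contains `error`, `groupId`, and a `restart` flag saying whether KafkaJS is going to restart the consumer itself. Exiting on every crash also kills consumers that would have recovered on their own. This sketch only escalates when `restart` is false; `makeCrashHandler` and its `onFatal` callback are hypothetical names.

```javascript
// Build a crash listener that only escalates when KafkaJS will NOT restart
// the consumer by itself. Returns true if onFatal was called.
function makeCrashHandler(onFatal, log = console) {
  return async (event) => {
    const { error, groupId, restart } = event.payload;
    if (restart) {
      // KafkaJS is expected to retry on its own; just record the error.
      log.warn({ message: 'Consumer crashed; KafkaJS will restart it', groupId, error: error?.message });
      return false;
    }
    log.error({ message: 'Consumer crashed and will not restart', groupId, error: error?.message });
    await onFatal(error); // e.g. disconnect and exit, or rebuild the client
    return true;
  };
}
```

It could be registered with `this.consumer.on(this.consumer.events.CRASH, makeCrashHandler(async () => { await this.consumer.disconnect(); process.exit(1); }))`.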

@shg95

shg95 commented Jan 5, 2024

@Jabbar-Zoop,
I see you are listening for the crash event, and in it you disconnect the consumer and exit the process.
What I was suggesting is that, on the crash event, you recreate the Kafka client instance. In other words, call this again:

const kafka = new Kafka({ clientId: 'my-app', brokers: ['kafka1:9092', 'kafka2:9092'] })

This creates a new Kafka client instance; you then create a new consumer from it and connect it.
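A rough sketch of that approach: a small hypothetical `ConsumerSupervisor` that, when a consumer crashes and the `consumer.crash` payload's `restart` flag is false (meaning, per the KafkaJS event payload, that KafkaJS will not restart it), discards the old client and builds a new one. `createConsumer` is assumed to run `new Kafka(...)` and return a consumer on every call, and `start` is assumed to subscribe and run the connected consumer; neither is a KafkaJS API.

```javascript
// Hypothetical supervisor that replaces the whole Kafka client after a
// fatal consumer crash, instead of reconnecting the old consumer.
class ConsumerSupervisor {
  constructor(createConsumer, start, log = console) {
    this.createConsumer = createConsumer; // must call `new Kafka(...)` each time
    this.start = start; // subscribes and runs a connected consumer
    this.log = log;
    this.consumer = null;
    this.generation = 0; // number of clients created so far
  }

  async launch() {
    const consumer = this.createConsumer();
    this.consumer = consumer;
    this.generation += 1;

    // 'consumer.crash' is the string value of consumer.events.CRASH in KafkaJS.
    consumer.on('consumer.crash', async (event) => {
      if (event.payload.restart) return; // KafkaJS restarts this one itself
      if (this.consumer !== consumer) return; // ignore an already-replaced client
      await consumer.disconnect().catch(() => {}); // best-effort cleanup
      try {
        await this.launch(); // brand-new Kafka instance + consumer
      } catch (error) {
        this.log.error({ message: 'Failed to relaunch consumer', error: error?.message ?? error });
      }
    });

    await consumer.connect();
    await this.start(consumer);
  }
}
```

The `this.consumer !== consumer` check guards against a late event from an old client triggering a second rebuild.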

@Jabbar-Zoop

@shg95 I am frequently getting the error below, at intervals of about 1 hour. What could be the reason?

{
  error: {
    code: 27
    helpUrl: "https://kafka.js.org/docs/faq#what-does-it-mean-to-get-rebalance-in-progress-errors"
    name: "KafkaJSProtocolError"
    retriable: false
    type: "REBALANCE_IN_PROGRESS"
  }
  error_message: "The group is rebalancing, so a rejoin is needed"
  level: "error"
  message: "Error occurred while consuming the queue_message."
}
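The log shows a `KafkaJSProtocolError` of type `REBALANCE_IN_PROGRESS` (protocol error code 27), which the broker returns while the group is rebalancing; the linked FAQ entry covers common causes. If such errors surface in application code (for example, from manual commits or `eachMessage` error handling), one option is to recognize them and log them at a lower level instead of treating them as fatal. The sketch below is a hypothetical helper that checks only the `type` and `code` fields visible in the log, and also follows a `cause` chain on the assumption that the protocol error may arrive wrapped in another KafkaJS error. Whether downgrading is appropriate depends on how the surrounding code reacts to the error.

```javascript
// REBALANCE_IN_PROGRESS is Kafka protocol error code 27.
const REBALANCE_IN_PROGRESS_CODE = 27;

// True if `error` (or something in its `cause` chain) looks like the
// KafkaJSProtocolError shown in the log above.
function isRebalanceInProgress(error) {
  // Depth limit guards against a cyclic cause chain.
  for (let e = error, depth = 0; e && typeof e === 'object' && depth < 10; e = e.cause, depth++) {
    if (e.type === 'REBALANCE_IN_PROGRESS' || e.code === REBALANCE_IN_PROGRESS_CODE) {
      return true;
    }
  }
  return false;
}

// Rebalances are expected whenever group membership changes, so log them as
// warnings; everything else stays an error.
function logLevelFor(error) {
  return isRebalanceInProgress(error) ? 'warn' : 'error';
}
```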


3 participants