ClientKafka messages emitted before client is ready fail silently #10449

Closed
3 of 15 tasks
nikolovjdmbh opened this issue Oct 24, 2022 · 5 comments
Labels
needs triage This issue has not been looked into

Comments

@nikolovjdmbh

Is there an existing issue for this?

  • I have searched the existing issues

Current behavior

If ClientKafka#emit is called inside a controller before the client is initialized and ready, only the first message is (eventually) sent; any subsequent messages sent before the client is ready fail silently.

Minimum reproduction code

https://github.com/donamo/nk

Steps to reproduce

  1. npm ci
  2. Start a Kafka instance (required) and set its broker address in storage.module.ts
  3. npm run dev
  4. Call /storage multiple times before ClientKafka is ready

Expected behavior

All messages sent before ClientKafka initialization should either throw an error or be sent after initialization, not just the first one.
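The first alternative (throwing instead of silently dropping) can be illustrated with a small readiness guard. This is a framework-agnostic sketch, not NestJS code: ConnectableClient and GuardedEmitter are hypothetical names, and the assumption is that in an application the wrapped client would be backed by ClientKafka and init() would run during bootstrap.

```typescript
// Hypothetical client shape: something that connects, then sends.
interface ConnectableClient<M> {
  connect(): Promise<void>;
  send(topic: string, message: M): Promise<void>;
}

// Fail-fast wrapper: emitting before init() has completed throws
// instead of silently dropping the message.
export class GuardedEmitter<M> {
  private ready = false;

  constructor(private readonly client: ConnectableClient<M>) {}

  // Intended to be called once at startup, before serving traffic.
  async init(): Promise<void> {
    await this.client.connect();
    this.ready = true;
  }

  async emit(topic: string, message: M): Promise<void> {
    if (!this.ready) {
      throw new Error(`Kafka client not ready; refusing to emit to "${topic}"`);
    }
    await this.client.send(topic, message);
  }
}
```

With a guard like this, an early call surfaces as a rejected promise that a controller can turn into an error response, rather than a message that never arrives.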

Package

  • I don't know. Or some 3rd-party package
  • @nestjs/common
  • @nestjs/core
  • @nestjs/microservices
  • @nestjs/platform-express
  • @nestjs/platform-fastify
  • @nestjs/platform-socket.io
  • @nestjs/platform-ws
  • @nestjs/testing
  • @nestjs/websockets
  • Other (see below)

Other package

No response

NestJS version

9.1.4

Packages versions

platform-express version : 9.1.4
microservices version    : 9.1.4
schematics version       : 9.0.3
testing version          : 9.1.4
common version           : 9.1.4
core version             : 9.1.4
cli version              : 9.1.4

Node.js version

16.17.0

In which operating systems have you tested?

  • macOS
  • Windows
  • Linux

Other

Logs showing the problem: https://github.com/donamo/nk/blob/master/log

Messages sent on lines 21, 26, and 31 fail silently.

@nikolovjdmbh added the "needs triage" label on Oct 24, 2022
@nikolovjdmbh changed the title from "Kafka Client eating emitted messages while Kafka is rebalancing" to "ClientKafka messages emitted before client is ready fail silently" on Oct 24, 2022
@FelipeCavichiolliSilvestre

Even more minimal reproduction code

https://github.com/FelipeCavichiolliSilvestre/kafka-client-nest-bug

NestJS version

9.0.0

Package versions

platform-express version : 9.0.0
microservices version    : 9.1.4
schematics version       : 9.0.0
common version           : 9.0.0
core version             : 9.0.0
cli version              : 9.0.0

Node.js version

16.15.1

In which operating systems have you tested?

  • macOS
  • Windows
  • Linux

Other

This bug does not happen when you pass the option "producerOnlyMode: true" to the Kafka client registered in app.module.ts.

This configuration works:

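import { Module } from '@nestjs/common';
import { ClientsModule, Transport } from '@nestjs/microservices';

@Module({
  imports: [
    ClientsModule.register([
      {
        name: 'KAFKA_CLIENT',
        transport: Transport.KAFKA,
        options: {
          client: {
            brokers: ['localhost:9094'],
          },
          // Producer only: the client does not join a consumer group
          producerOnlyMode: true,
        },
      },
    ]),
  ],
})
export class AppModule {}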

@tak1n

tak1n commented Nov 15, 2022

We faced a similar issue. We use a service that injects the Kafka client, and before emitting Kafka messages we explicitly call connect() on it, which returns a producer instance:

const producer = await this.client.connect();
await producer.send({ ... });

After restarting the application, it seemed some messages weren't emitted correctly, because awaiting connect() doesn't block until a connection is properly established. To "fix" this, we moved the connect() and disconnect() calls into NestJS lifecycle hooks (OnModuleInit, OnModuleDestroy).
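A minimal sketch of that lifecycle-hook approach, written against a small structural interface so it runs without NestJS installed. LifecycleClient and KafkaConnectionManager are hypothetical names; the assumption is that the real provider would be an @Injectable() class receiving ClientKafka through its injection token. Nest calls onModuleInit/onModuleDestroy on any provider that defines them and awaits the returned promises.

```typescript
// Structural stand-in for the two ClientKafka methods used here
// (assumption: both return promises, as ClientKafka's do).
interface LifecycleClient {
  connect(): Promise<unknown>;
  close(): Promise<unknown>;
}

// In a NestJS app this would be an @Injectable() provider that receives
// ClientKafka via @Inject(<token>); Nest calls these hooks by name.
export class KafkaConnectionManager {
  constructor(private readonly client: LifecycleClient) {}

  // Called during bootstrap; Nest awaits the promise, so startup
  // does not finish until connect() has resolved.
  async onModuleInit(): Promise<void> {
    await this.client.connect();
  }

  // Called when the app closes (for OS signals, app.enableShutdownHooks()
  // must be enabled); ClientKafka exposes close() for disconnecting.
  async onModuleDestroy(): Promise<void> {
    await this.client.close();
  }
}
```

Per the Nest documentation quoted further down, the same connect() call could instead live in an onApplicationBootstrap hook.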

In theory it could still happen, e.g. when the application starts up and immediately handles a request that has to produce a Kafka message before the producer is properly connected. As far as I can tell, we explicitly call connect()/disconnect() on the Kafka client, while none of the reproductions above do. I know GitHub issues aren't for questions, but I hope this one benefits other users.

Are we supposed to handle connect()/disconnect() explicitly ourselves, or is that normally not required? I could also provide another reproduction repo using that approach, if that's of interest.

EDIT: I should have read the docs more carefully before posting 😅

https://docs.nestjs.com/microservices/basics#client.

The ClientProxy is lazy. It doesn't initiate a connection immediately. Instead, it will be established before the first microservice call, and then reused across each subsequent call. However, if you want to delay the application bootstrapping process until a connection is established, you can manually initiate a connection using the ClientProxy object's connect() method inside the OnApplicationBootstrap lifecycle hook.

EDIT 2: It turns out the issue on our side was caused by emitting multiple messages in parallel, each of which calls this.client.connect(), and not by the issue described here. Sorry for hijacking the thread.

Pseudocode:

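import { Inject, Injectable } from '@nestjs/common';
import { ClientKafka } from '@nestjs/microservices';

@Injectable()
export class KafkaProducerService {
  // 'KAFKA_CLIENT' is an assumed token name from ClientsModule.register
  constructor(@Inject('KAFKA_CLIENT') private readonly client: ClientKafka) {}

  // Every call invokes connect(); parallel calls race each other here
  async emit(topic: string, message: { value: string }) {
    const producer = await this.client.connect();
    await producer.send({ topic, messages: [message] });
  }
  // ...
}

// Some service using KafkaProducerService
@Injectable()
export class SomeService {
  constructor(private readonly kafkaProducer: KafkaProducerService) {}

  async doSomething() {
    // Two emits in parallel trigger two concurrent connect() calls
    await Promise.all([
      this.kafkaProducer.emit('topic1', { value: '...' }),
      this.kafkaProducer.emit('topic2', { value: '...' }),
    ]);
  }
  // ...
}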
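One way to avoid that race, which is not taken from the thread, is to memoize the connection promise so that concurrent callers share a single connect() attempt. Below is a framework-agnostic sketch; shareConnection is a hypothetical helper, not part of NestJS or KafkaJS.

```typescript
// Hypothetical helper: wraps a connect function so concurrent callers
// share one in-flight connection attempt instead of each starting their own.
export function shareConnection<T>(connect: () => Promise<T>): () => Promise<T> {
  let pending: Promise<T> | undefined;

  return () => {
    if (pending === undefined) {
      // First caller starts the attempt; later callers reuse the same promise.
      pending = connect().catch((err: unknown) => {
        // Forget a failed attempt so the next call can retry.
        pending = undefined;
        throw err;
      });
    }
    return pending;
  };
}
```

In KafkaProducerService, the getter could be built in the constructor (for example, this.getProducer = shareConnection(() => this.client.connect())), and emit() would await this.getProducer() instead of calling connect() directly. Resetting on failure is a design choice: it lets a later emit retry rather than permanently caching a rejected promise.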

@tak1n

tak1n commented Nov 15, 2022

After the detour in my last comment, I was curious whether the same or a similar problem could occur with the "Nest way" of interacting with Kafka (i.e. using this.client.emit() instead of building a separate producer service). I took the liberty of forking the repo from @FelipeCavichiolliSilvestre and adapting it so that it emits multiple Kafka messages for the same request.

Forked repo: https://github.com/tak1n/kafka-client-nest-bug (note it uses producerOnlyMode: true). When I start it up and hit the endpoint twice, I get the following log output:

[3:16:17 PM] Starting compilation in watch mode...

[3:16:18 PM] Found 0 errors. Watching for file changes.

[Nest] 1782583  - 11/15/2022, 3:16:18 PM     LOG [NestFactory] Starting Nest application...
[Nest] 1782583  - 11/15/2022, 3:16:18 PM     LOG [InstanceLoader] ClientsModule dependencies initialized +21ms
[Nest] 1782583  - 11/15/2022, 3:16:18 PM     LOG [InstanceLoader] AppModule dependencies initialized +0ms
[Nest] 1782583  - 11/15/2022, 3:16:18 PM    WARN [ServerKafka] WARN [undefined] KafkaJS v2.0.0 switched default partitioner. To retain the same partitioning behavior as in previous versions, create the producer with the option "createPartitioner: Partitioners.LegacyPartitioner". See the migration guide at https://kafka.js.org/docs/migration-guide-v2.0.0#producer-new-default-partitioner for details. Silence this warning by setting the environment variable "KAFKAJS_NO_PARTITIONER_WARNING=1" {"timestamp":"2022-11-15T14:16:18.848Z","logger":"kafkajs"}
[Nest] 1782583  - 11/15/2022, 3:16:18 PM     LOG [ServerKafka] INFO [Consumer] Starting {"timestamp":"2022-11-15T14:16:18.869Z","logger":"kafkajs","groupId":"kafka-bug-server"}
[Nest] 1782583  - 11/15/2022, 3:16:39 PM     LOG [ServerKafka] INFO [ConsumerGroup] Consumer has joined the group {"timestamp":"2022-11-15T14:16:39.701Z","logger":"kafkajs","groupId":"kafka-bug-server","memberId":"kafka-bug-server-9f50b737-5be0-410c-a4ab-84c31400d3a3","leaderId":"kafka-bug-server-9f50b737-5be0-410c-a4ab-84c31400d3a3","isLeader":true,"memberAssignment":{"any":[0]},"groupProtocol":"RoundRobinAssigner","duration":20830}
[Nest] 1782583  - 11/15/2022, 3:16:39 PM     LOG [NestMicroservice] Nest microservice successfully started +5ms
[Nest] 1782583  - 11/15/2022, 3:16:39 PM     LOG [RoutesResolver] AppController {/}: +18ms
[Nest] 1782583  - 11/15/2022, 3:16:39 PM     LOG [RouterExplorer] Mapped {/kafka, GET} route +5ms
[Nest] 1782583  - 11/15/2022, 3:16:39 PM     LOG [NestApplication] Nest application successfully started +3ms
Emiting...
Received: 
{ count: 0 }
Emiting...
Received: 
{ count2: 1 }
Received: 
{ count: 1 }

Note that { count2: 0 } from the first request never arrives. When await this.client.connect() is called explicitly on application startup (e.g. in OnModuleInit), the problem is fixed:

[3:24:07 PM] Starting compilation in watch mode...

[3:24:09 PM] Found 0 errors. Watching for file changes.

[Nest] 1786949  - 11/15/2022, 3:24:09 PM     LOG [NestFactory] Starting Nest application...
[Nest] 1786949  - 11/15/2022, 3:24:09 PM     LOG [InstanceLoader] ClientsModule dependencies initialized +22ms
[Nest] 1786949  - 11/15/2022, 3:24:09 PM     LOG [InstanceLoader] AppModule dependencies initialized +1ms
[Nest] 1786949  - 11/15/2022, 3:24:09 PM    WARN [ServerKafka] WARN [undefined] KafkaJS v2.0.0 switched default partitioner. To retain the same partitioning behavior as in previous versions, create the producer with the option "createPartitioner: Partitioners.LegacyPartitioner". See the migration guide at https://kafka.js.org/docs/migration-guide-v2.0.0#producer-new-default-partitioner for details. Silence this warning by setting the environment variable "KAFKAJS_NO_PARTITIONER_WARNING=1" {"timestamp":"2022-11-15T14:24:09.801Z","logger":"kafkajs"}
[Nest] 1786949  - 11/15/2022, 3:24:09 PM     LOG [ServerKafka] INFO [Consumer] Starting {"timestamp":"2022-11-15T14:24:09.822Z","logger":"kafkajs","groupId":"kafka-bug-server"}
[Nest] 1786949  - 11/15/2022, 3:24:12 PM     LOG [ServerKafka] INFO [ConsumerGroup] Consumer has joined the group {"timestamp":"2022-11-15T14:24:12.862Z","logger":"kafkajs","groupId":"kafka-bug-server","memberId":"kafka-bug-server-8ffd0c81-292e-4ae4-8261-70e6418e7137","leaderId":"kafka-bug-server-8ffd0c81-292e-4ae4-8261-70e6418e7137","isLeader":true,"memberAssignment":{"any":[0]},"groupProtocol":"RoundRobinAssigner","duration":3038}
[Nest] 1786949  - 11/15/2022, 3:24:12 PM     LOG [NestMicroservice] Nest microservice successfully started +4ms
[Nest] 1786949  - 11/15/2022, 3:24:12 PM     LOG [RoutesResolver] AppController {/}: +9ms
[Nest] 1786949  - 11/15/2022, 3:24:12 PM     LOG [RouterExplorer] Mapped {/kafka, GET} route +2ms
[Nest] 1786949  - 11/15/2022, 3:24:12 PM     LOG [NestApplication] Nest application successfully started +13ms
Emiting...
Received: 
{ count2: 0 }
Received: 
{ count: 0 }
Emiting...
Received: 
{ count2: 1 }
Received: 
{ count: 1 }

Node.js versions
16.18.1
18.12.1

In which operating systems have you tested?
Linux (Ubuntu 22.04.1 LTS)

@overbit

overbit commented Jan 14, 2023

We are experiencing the same issue. Thanks @tak1n for the very clear bug report with two simple code examples.

@kamilmysliwiec
Member

Fixed in this PR #11026
