Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(microservices): messages emitted before kafka client is ready fail #11026

Merged
merged 2 commits into from
Feb 3, 2023

Conversation

kamilmysliwiec
Copy link
Member

@kamilmysliwiec kamilmysliwiec commented Feb 3, 2023

PR Checklist

Please check if your PR fulfills the following requirements:

PR Type

What kind of change does this PR introduce?

  • Bugfix
  • Feature
  • Code style update (formatting, local variables)
  • Refactoring (no functional changes, no api changes)
  • Build related changes
  • CI related changes
  • Other... Please describe:

What is the current behavior?

Issue Number: #10449

What is the new behavior?

Does this PR introduce a breaking change?

  • Yes
  • No

Other information

@@ -94,46 +94,56 @@ export class ClientKafka extends ClientProxy {
this.consumer && (await this.consumer.disconnect());
this.producer = null;
this.consumer = null;
this.initialized$ = null;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kamilmysliwiec is there any convention defined in Nestjs for the use of the $ suffix?
I'm aware that is an Angular convention to define Observable but doesn't seem to express any special type here

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might be somewhat confusing indeed @overbit. We usually use $ for observables, we don't have any specific convention dedicated to promises. Let me fix this up

@coveralls
Copy link

Pull Request Test Coverage Report for Build 9ca3f05f-9a28-4b4b-9c3a-9d826bebd6fa

  • 22 of 24 (91.67%) changed or added relevant lines in 1 file are covered.
  • 4 unchanged lines in 1 file lost coverage.
  • Overall coverage increased (+0.02%) to 92.958%

Changes Missing Coverage Covered Lines Changed/Added Lines %
packages/microservices/client/client-kafka.ts 22 24 91.67%
Files with Coverage Reduction New Missed Lines %
packages/microservices/server/server-nats.ts 4 88.07%
Totals Coverage Status
Change from base Build 6032d3ea-d47b-430d-a0be-73d44b81f028: 0.02%
Covered Lines: 6389
Relevant Lines: 6873

💛 - Coveralls

@kamilmysliwiec kamilmysliwiec merged commit 452a71b into master Feb 3, 2023
@delete-merged-branch delete-merged-branch bot deleted the fix/kafka-client-parallel-messages branch February 3, 2023 09:40
this.client = null;
}

public async connect(): Promise<Producer> {
if (this.client) {
return this.producer;
return this.initialized$.then(() => this.producer);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another question around consistency:
in the rest of the function, we use async await but in there we leverage .then.

I'd suggest keeping the same approach here to make the code more readable even if the execution is not a 100% replacement of the promise chaining.

Suggested change
return this.initialized$.then(() => this.producer);
await this.initialized$;
return this.producer;

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.then is intentionally used here to indicate the intent. The Promise here is used for slightly different purpose than other promises in this class

groupId: this.groupId,
},
);
this.initialized$ = new Promise(async (resolve, reject) => {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make more sense to split the initialized promise into two promises consumer and producer?
Conceptually, a service can start producing without the need to wait for a Kafka consumer connection to be established.
In Event-driven architecture, we need to aim for produce as soon as required but consume only when the service is ready to accept.
@kamilmysliwiec what do you think?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually we could just create a promise for the consumer.connect and consumer.bind and not await for it so the function connect can continue to execute and await only for producer.connect.
I'll draft a PR with the idea to make it more clear

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Conceptually, a service can start producing without the need to wait for a Kafka consumer connection to be established.

Well, it depends. If you're (for some reason) using the message-based approach then you should wait for a Kafka consumer connection to be established before you start producing messages.

If you don't need to consume messages on the other hand, you could just use the producerOnly mode which skips the initialization of the consumer - in this case we don't need 2 promises either.

So IMO we should leave it as is

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. That is unfortunately to force the same approach for both but it would require a major refactor so I get it.
Thanks @kamilmysliwiec

@overbit
Copy link

overbit commented Feb 3, 2023

#11029 @kamilmysliwiec if that makes sense I'll properly fix the PR and we can revert this PR

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants