Skip to content

Commit

Permalink
Merge pull request #9290 from radekmikeska/kafka-server-event-pattern…
Browse files Browse the repository at this point in the history
…-fix

fix(microservices): fix event pattern behavior
  • Loading branch information
kamilmysliwiec authored Mar 14, 2022
2 parents 26b3d1d + 4d689ee commit f3530ee
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 3 deletions.
5 changes: 3 additions & 2 deletions packages/microservices/server/server-kafka.ts
Original file line number Diff line number Diff line change
Expand Up @@ -163,9 +163,10 @@ export class ServerKafka extends Server implements CustomTransportStrategy {
payload.partition,
payload.topic,
]);
const handler = this.getHandlerByPattern(packet.pattern);
// if the correlation id or reply topic is not set
// then this is an event (events could still have correlation id)
if (!correlationId || !replyTopic) {
if (handler?.isEventHandler || !correlationId || !replyTopic) {
return this.handleEvent(packet.pattern, packet, kafkaContext);
}

Expand All @@ -174,7 +175,7 @@ export class ServerKafka extends Server implements CustomTransportStrategy {
replyPartition,
correlationId,
);
const handler = this.getHandlerByPattern(packet.pattern);

if (!handler) {
return publish({
id: correlationId,
Expand Down
24 changes: 23 additions & 1 deletion packages/microservices/test/server/server-kafka.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -283,12 +283,34 @@ describe('ServerKafka', () => {
expect(handleEventSpy.called).to.be.true;
});

it('should call "handleEvent" if correlation identifier is present by the reply topic is not present', async () => {
it('should call "handleEvent" if correlation identifier is present but the reply topic is not present', async () => {
const handleEventSpy = sinon.spy(server, 'handleEvent');
await server.handleMessage(eventWithCorrelationIdPayload);
expect(handleEventSpy.called).to.be.true;
});

it('should call "handleEvent" if correlation identifier and reply topic are present but the handler is of type eventHandler', async () => {
const handler = sinon.spy();
(handler as any).isEventHandler = true;
(server as any).messageHandlers = objectToMap({
[topic]: handler,
});
const handleEventSpy = sinon.spy(server, 'handleEvent');
await server.handleMessage(payload);
expect(handleEventSpy.called).to.be.true;
});

it('should NOT call "handleEvent" if correlation identifier and reply topic are present but the handler is not of type eventHandler', async () => {
const handler = sinon.spy();
(handler as any).isEventHandler = false;
(server as any).messageHandlers = objectToMap({
[topic]: handler,
});
const handleEventSpy = sinon.spy(server, 'handleEvent');
await server.handleMessage(payload);
expect(handleEventSpy.called).to.be.false;
});

it(`should publish NO_MESSAGE_HANDLER if pattern not exists in messageHandlers object`, async () => {
await server.handleMessage(payload);
expect(
Expand Down

0 comments on commit f3530ee

Please sign in to comment.