Skip to content

Commit

Permalink
fix: do not reconnect ws on normal close
Browse files Browse the repository at this point in the history
  • Loading branch information
avsetsin committed Nov 2, 2022
1 parent f03acad commit 5897e61
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 21 deletions.
5 changes: 2 additions & 3 deletions src/transport/kafka/kafka.transport.e2e-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { Test, TestingModule } from '@nestjs/testing';
import { LoggerService } from '@nestjs/common';
import { LoggerModule } from 'common/logger';
import { ConfigModule } from 'common/config';
import { sleep } from 'utils';
import { MockProviderModule } from 'provider';
import { KafkaTransport } from './kafka.transport';
import { Kafka } from 'kafkajs';
Expand Down Expand Up @@ -57,9 +58,7 @@ describe('KafkaTransport', () => {
await transport.publish('test', { label: 'first' }, MessageType.PING);
await transport.publish('test', { label: 'second' }, MessageType.PING);

await new Promise<void>(async (resolve) => {
setTimeout(resolve, 5000);
});
await sleep(5000);

expect(receivedMessages.length).toBe(2);
expect(receivedMessages[0]).toHaveProperty('label');
Expand Down
23 changes: 16 additions & 7 deletions src/transport/stomp/stomp.client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,11 @@ export default class StompClient {
return ws;
}

private async transmit(command, headers, body = '') {
private async transmit(
command: string,
headers: Record<string, string>,
body = '',
) {
const msg = StompFrame.marshall(command, headers, body);
this.ws.send(msg.toString());
}
Expand All @@ -47,13 +51,18 @@ export default class StompClient {
this.opened = true;
}

private async onClose(event) {
this.logger?.warn('WS connection is closed', { closeReason: event.body });
private async onClose(code: number, reason: Buffer) {
const closeReason = reason.toString();

const isClosedNormally = code === 1000;
if (isClosedNormally) return;

this.logger?.warn('WS connection is closed', { code, closeReason });
await this.reconnect();
}

private async onError(event) {
this.logger?.warn('WS connection error', { error: event.body });
private async onError(error: Error) {
this.logger?.warn('WS connection error', { error });
await this.reconnect();
}

Expand Down Expand Up @@ -108,8 +117,8 @@ export default class StompClient {
}
}

public disconnect(headers: Record<string, string> = {}) {
this.transmit('DISCONNECT', headers);
public async disconnect(headers: Record<string, string> = {}) {
await this.transmit('DISCONNECT', headers);
this.cleanUp();
}

Expand Down
18 changes: 7 additions & 11 deletions src/transport/stomp/stomp.transport.e2e-spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import { Test, TestingModule } from '@nestjs/testing';
import { ConfigModule } from 'common/config';
import { LoggerModule } from 'common/logger';
import { sleep } from 'utils';
import { MessageType } from '../../messages';
import StompTransport from './stomp.transport';
import StompClient from './stomp.client';
Expand All @@ -10,7 +12,7 @@ describe('StompTransport', () => {

beforeEach(async () => {
moduleRef = await Test.createTestingModule({
imports: [ConfigModule.forRoot()],
imports: [ConfigModule.forRoot(), LoggerModule],
providers: [
StompTransport,
{
Expand All @@ -20,10 +22,8 @@ describe('StompTransport', () => {
'ws://127.0.0.1:15674/ws',
'guest', // lgtm[js/hardcoded-credentials]
'guest', // lgtm[js/hardcoded-credentials]
// eslint-disable-next-line @typescript-eslint/no-empty-function
() => {},
// eslint-disable-next-line @typescript-eslint/no-empty-function
() => {},
() => void 0,
() => void 0,
);
},
},
Expand All @@ -42,9 +42,7 @@ describe('StompTransport', () => {
it('should send two messages to topic and read two messages from topic', async () => {
const receivedMessages: any[] = [];

await new Promise<void>(async (resolve) => {
setTimeout(resolve, 1000);
});
await sleep(2000);

await transport.subscribe('amq.direct', MessageType.PING, async (msg) => {
receivedMessages.push(msg);
Expand All @@ -61,9 +59,7 @@ describe('StompTransport', () => {
MessageType.PING,
);

await new Promise<void>(async (resolve) => {
setTimeout(resolve, 2000);
});
await sleep(2000);

expect(receivedMessages.length).toBe(2);
expect(receivedMessages[0]).toHaveProperty('label');
Expand Down

0 comments on commit 5897e61

Please sign in to comment.