Skip to content

Commit

Permalink
fix: stomp reconnection and missing promises
Browse files Browse the repository at this point in the history
  • Loading branch information
eddort committed Feb 16, 2023
1 parent 8c8233b commit ecf1c7a
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 24 deletions.
52 changes: 31 additions & 21 deletions src/transport/stomp/stomp.client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { LoggerService } from '@nestjs/common';

// https://stomp.github.io/stomp-specification-1.1.html#Overview
const VERSIONS = '1.0,1.1';
const STOMP_DIED_ERROR = 'STOMP died';

export default class StompClient {
private ws: WebSocket;
Expand Down Expand Up @@ -44,20 +45,25 @@ export default class StompClient {
body = '',
) {
const msg = StompFrame.marshall(command, headers, body);
this.ws.send(msg.toString());
return new Promise((res, rej) => {
this.ws.send(msg.toString(), (error) => (error ? rej(error) : res(null)));
});
}

private onOpen() {
this.opened = true;
}

private async onClose(code: number, reason: Buffer) {
const closeReason = reason.toString();

const isClosedNormally = code === 1000;
const closeReasonMessage = reason.toString();
this.logger?.warn('WS connection is closed', { code, closeReasonMessage });
// check code and closeReasonMessage because we have a case
// with 1000 and closeReasonMessage as STOMP_DIED_ERROR
// and we need to reconnect in this case
const isClosedNormally =
code === 1000 && closeReasonMessage !== STOMP_DIED_ERROR;
if (isClosedNormally) return;

this.logger?.warn('WS connection is closed', { code, closeReason });
await this.reconnect();
}

Expand All @@ -67,6 +73,8 @@ export default class StompClient {
}

private async reconnect() {
this.logger?.warn('WS connection is reconnecting');

this.cleanUp();
await sleep(10000);

Expand All @@ -83,7 +91,9 @@ export default class StompClient {
this.opened = false;
try {
this.ws.close();
} catch (error) {}
} catch (error) {
console.log(error, 'error!');
}
}

public async connect(headers = {}, timeout = 10000) {
Expand All @@ -101,7 +111,7 @@ export default class StompClient {
headers['passcode'] = this.passcode;
}

this.transmit('CONNECT', headers);
await this.transmit('CONNECT', headers);
}

private async _connect(timeout: number) {
Expand Down Expand Up @@ -131,14 +141,14 @@ export default class StompClient {
await sleep(1000);
}
headers['destination'] = destination;
return this.transmit('SEND', headers, body);
return await this.transmit('SEND', headers, body);
}

public subscribe(
public async subscribe(
destination: string,
callback: (frame) => void,
callback: (frame: StompFrame) => void,
headers: Record<string, string> = {},
): string {
) {
if (!('id' in headers)) {
headers['id'] = `sub-${this.counter}`;
this.counter += 1;
Expand All @@ -147,25 +157,25 @@ export default class StompClient {
headers['destination'] = destination;
this.subscriptions[headers['id']] = callback;

this.transmit('SUBSCRIBE', headers);
await this.transmit('SUBSCRIBE', headers);

return headers['id'];
}

public unsubscribe(subscriptionId: string): void {
public async unsubscribe(subscriptionId: string) {
delete this.subscriptions[subscriptionId];
this.transmit('UNSUBSCRIBE', { id: subscriptionId });
await this.transmit('UNSUBSCRIBE', { id: subscriptionId });
}

private acknowledged(
private async acknowledged(
acknowledgedType: 'ACK' | 'NACK',
messageId: string,
subscriptionId: string,
headers: Record<string, string> = {},
): void {
) {
headers['message-id'] = messageId;
headers['subscription'] = subscriptionId;
this.transmit(acknowledgedType, headers);
await this.transmit(acknowledgedType, headers);
}

private onMessage(event): void {
Expand All @@ -190,11 +200,11 @@ export default class StompClient {
const onReceive = this.subscriptions[subscription];
const messageId = frame.headers['message-id'];

const ack = (headers: Record<string, string> = {}) => {
this.acknowledged('ACK', messageId, subscription, headers);
const ack = async (headers: Record<string, string> = {}) => {
await this.acknowledged('ACK', messageId, subscription, headers);
};
const nack = (headers: Record<string, string> = {}) => {
this.acknowledged('NACK', messageId, subscription, headers);
const nack = async (headers: Record<string, string> = {}) => {
await this.acknowledged('NACK', messageId, subscription, headers);
};

frame.ack = ack;
Expand Down
8 changes: 6 additions & 2 deletions src/transport/stomp/stomp.frame.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,14 @@ export class StompFrame {
/**
* Use ack and nack to confirm delivery outside client.
*/
public ack: (headers?: Record<string, string>) => void = (
public ack: (
headers?: Record<string, string>,
) => Promise<void | Record<string, string>> = async (
headers: Record<string, string> = {},
) => headers;
public nack: (headers?: Record<string, string>) => void = (
public nack: (
headers?: Record<string, string>,
) => Promise<void | Record<string, string>> = async (
headers: Record<string, string> = {},
) => headers;

Expand Down
2 changes: 1 addition & 1 deletion src/transport/stomp/stomp.transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ export default class StompTransport implements TransportInterface {
cb: (message: T) => Promise<void>,
): Promise<void> {
const destination = `/exchange/${topic}/${messageType}`;
this.client.subscribe(destination, (frame) => {
await this.client.subscribe(destination, (frame) => {
cb(JSON.parse(frame.body));
});
}
Expand Down

0 comments on commit ecf1c7a

Please sign in to comment.