diff --git a/PROTOCOL.md b/PROTOCOL.md index 323a1f8e..3af2e020 100644 --- a/PROTOCOL.md +++ b/PROTOCOL.md @@ -56,6 +56,36 @@ interface ConnectionAckMessage { The client is now **ready** to request subscription operations. +### `Ping` + +Direction: **bidirectional** + +Useful for detecting failed connections, displaying latency metrics or other types of network probing. + +A `Pong` must be sent in response from the receiving party as soon as possible. + +The `Ping` message can be sent at any time within the established socket. + +```typescript +interface PingMessage { + type: 'ping'; +} +``` + +### `Pong` + +Direction: **bidirectional** + +The response to the `Ping` message. Must be sent as soon as the `Ping` message is received. + +The `Pong` message can be sent at any time within the established socket. Furthermore, the `Pong` message may even be sent unsolicited as an unidirectional heartbeat. + +```typescript +interface PongMessage { + type: 'pong'; +} +``` + ### `Subscribe` Direction: **Client -> Server** diff --git a/README.md b/README.md index d7783411..5989da94 100644 --- a/README.md +++ b/README.md @@ -589,6 +589,42 @@ const client = createRestartableClient({ +
+🔗 Client usage with ping/pong timeout and latency metrics + +```typescript +import { createClient } from 'graphql-ws'; + +let activeSocket, + timedOut, + pingSentAt = 0, + latency = 0; +createClient({ + url: 'ws://i.time.out:4000/and-measure/latency', + keepAlive: 10_000, // ping server every 10 seconds + on: { + connected: (socket) => (activeSocket = socket), + ping: (received) => { + if (!received /* sent */) { + pingSentAt = Date.now(); + timedOut = setTimeout(() => { + if (activeSocket.readyState === WebSocket.OPEN) + activeSocket.close(4408, 'Request Timeout'); + }, 5_000); // wait 5 seconds for the pong and then close the connection + } + }, + pong: (received) => { + if (received) { + latency = Date.now() - pingSentAt; + clearTimeout(timedOut); // pong is received, clear connection close timeout + } + }, + }, +}); +``` + +
+
🔗 Client usage in browser @@ -1267,124 +1303,6 @@ const client = createClient({
-
-🔗 ws server and client with client to server pings and latency - -```typescript -// 🛸 server - -import { - GraphQLSchema, - GraphQLObjectType, - GraphQLNonNull, - GraphQLString, -} from 'graphql'; -import ws from 'ws'; // yarn add ws -import { useServer } from 'graphql-ws/lib/use/ws'; -import { schema } from './my-graphql-schema'; - -// a custom graphql schema that holds just the ping query. -// used exclusively when the client sends a ping to the server. -// if you want to send/receive more details, simply adjust the pinger schema. -const pinger = new GraphQLSchema({ - query: new GraphQLObjectType({ - name: 'Query', - fields: { - ping: { - type: new GraphQLNonNull(GraphQLString), - resolve: () => 'pong', - }, - }, - }), -}); - -const wsServer = new WebSocket.Server({ - port: 4000, - path: '/graphql', -}); - -useServer( - { - schema: (_ctx, msg) => { - if (msg.payload.query === '{ ping }') return pinger; - return schema; - }, - }, - wsServer, -); -``` - -```typescript -// 📺 client - -import { createClient } from 'graphql-ws'; - -let connection: WebSocket | undefined; -const client = createClient({ - url: 'ws://client.can:4000/send-pings/too', - on: { - connected: (socket) => (connection = socket as WebSocket), - closed: () => (connection = undefined), - }, -}); - -async function ping() { - // record the ping sent at moment for calculating latency - const pinged = Date.now(); - - // if the client went offline or the server is unresponsive - // close the active WebSocket connection as soon as the pong - // wait timeout expires and have the client silently reconnect. - // there is no need to dispose of the subscription since it - // will eventually settle because either: - // - the client reconnected and a new pong is received - // - the retry attempts were exceeded and the close is reported - // because if this, the latency accounts for retry waits too. - // if you do not want this, simply dispose of the ping subscription - // as soon as the pong timeout is exceeded - const pongTimeout = setTimeout( - () => connection?.close(4408, 'Pong Timeout'), - 2000, // expect a pong within 2 seconds of the ping - ); - - // wait for the pong. the promise is guaranteed to settle - await new Promise((resolve, reject) => { - client.subscribe<{ data: { ping: string } }>( - { query: '{ ping }' }, - { - next: () => { - /* not interested in the pong */ - }, - error: reject, - complete: resolve, - }, - ); - // whatever happens to the promise, clear the pong timeout - }).finally(() => clearTimeout(pongTimeout)); - - // record when pong has been received - const ponged = Date.now(); - - // how long it took for the pong to arrive after sending the ping - return ponged - pinged; -} - -// keep pinging until a fatal problem occurs -(async () => { - for (;;) { - const latency = await ping(); - - // or send to your favourite logger - the user - console.info('GraphQL WebSocket connection latency', latency); - - // ping every 3 seconds - await new Promise((resolve) => setTimeout(resolve, 3000)); - } -})(); -``` - -
-
🔗 ws server and client auth usage with token expiration, validation and refresh diff --git a/docs/enums/common.messagetype.md b/docs/enums/common.messagetype.md index 529dbb1e..0638c8eb 100644 --- a/docs/enums/common.messagetype.md +++ b/docs/enums/common.messagetype.md @@ -15,6 +15,8 @@ Types of messages allowed to be sent by the client/server over the WS protocol. - [ConnectionInit](common.messagetype.md#connectioninit) - [Error](common.messagetype.md#error) - [Next](common.messagetype.md#next) +- [Ping](common.messagetype.md#ping) +- [Pong](common.messagetype.md#pong) - [Subscribe](common.messagetype.md#subscribe) ## Enumeration members @@ -49,6 +51,18 @@ ___ ___ +### Ping + +• **Ping** = "ping" + +___ + +### Pong + +• **Pong** = "pong" + +___ + ### Subscribe • **Subscribe** = "subscribe" diff --git a/docs/interfaces/client.clientoptions.md b/docs/interfaces/client.clientoptions.md index ea69fcca..4f63baa9 100644 --- a/docs/interfaces/client.clientoptions.md +++ b/docs/interfaces/client.clientoptions.md @@ -15,6 +15,7 @@ Configuration used for the GraphQL over WebSocket client. - [isFatalConnectionProblem](client.clientoptions.md#isfatalconnectionproblem) - [jsonMessageReplacer](client.clientoptions.md#jsonmessagereplacer) - [jsonMessageReviver](client.clientoptions.md#jsonmessagereviver) +- [keepAlive](client.clientoptions.md#keepalive) - [lazy](client.clientoptions.md#lazy) - [lazyCloseTimeout](client.clientoptions.md#lazyclosetimeout) - [on](client.clientoptions.md#on) @@ -118,6 +119,48 @@ out of the incoming JSON. ___ +### keepAlive + +• `Optional` **keepAlive**: `number` + +The timout between dispatched keep-alive messages, naimly server pings. Internally +dispatches the `PingMessage` type to the server and expects a `PongMessage` in response. +This helps with making sure that the connection with the server is alive and working. + +Timeout countdown starts from the moment the socket was opened and subsequently +after every received `PongMessage`. + +Note that NOTHING will happen automatically with the client if the server never +responds to a `PingMessage` with a `PongMessage`. If you want the connection to close, +you should implement your own logic on top of the client. A simple example looks like this: + +```js +import { createClient } from 'graphql-ws'; + +let activeSocket, timedOut; +createClient({ + url: 'ws://i.time.out:4000/after-5/seconds', + keepAlive: 10_000, // ping server every 10 seconds + on: { + connected: (socket) => (activeSocket = socket), + ping: (received) => { + if (!received) // sent + timedOut = setTimeout(() => { + if (activeSocket.readyState === WebSocket.OPEN) + activeSocket.close(4408, 'Request Timeout'); + }, 5_000); // wait 5 seconds for the pong and then close the connection + }, + pong: (received) => { + if (received) clearTimeout(timedOut); // pong is received, clear connection close timeout + }, + }, +}); +``` + +**`default`** 0 + +___ + ### lazy • `Optional` **lazy**: `boolean` diff --git a/docs/interfaces/common.pingmessage.md b/docs/interfaces/common.pingmessage.md new file mode 100644 index 00000000..23860a81 --- /dev/null +++ b/docs/interfaces/common.pingmessage.md @@ -0,0 +1,17 @@ +[graphql-ws](../README.md) / [common](../modules/common.md) / PingMessage + +# Interface: PingMessage + +[common](../modules/common.md).PingMessage + +## Table of contents + +### Properties + +- [type](common.pingmessage.md#type) + +## Properties + +### type + +• `Readonly` **type**: [Ping](../enums/common.messagetype.md#ping) diff --git a/docs/interfaces/common.pongmessage.md b/docs/interfaces/common.pongmessage.md new file mode 100644 index 00000000..92d0d509 --- /dev/null +++ b/docs/interfaces/common.pongmessage.md @@ -0,0 +1,17 @@ +[graphql-ws](../README.md) / [common](../modules/common.md) / PongMessage + +# Interface: PongMessage + +[common](../modules/common.md).PongMessage + +## Table of contents + +### Properties + +- [type](common.pongmessage.md#type) + +## Properties + +### type + +• `Readonly` **type**: [Pong](../enums/common.messagetype.md#pong) diff --git a/docs/modules/client.md b/docs/modules/client.md index 7f2cddab..54f13581 100644 --- a/docs/modules/client.md +++ b/docs/modules/client.md @@ -18,6 +18,8 @@ - [Message](client.md#message) - [MessageType](client.md#messagetype) - [NextMessage](client.md#nextmessage) +- [PingMessage](client.md#pingmessage) +- [PongMessage](client.md#pongmessage) - [Sink](client.md#sink) - [SubscribeMessage](client.md#subscribemessage) - [SubscribePayload](client.md#subscribepayload) @@ -44,6 +46,10 @@ - [EventListener](client.md#eventlistener) - [EventMessage](client.md#eventmessage) - [EventMessageListener](client.md#eventmessagelistener) +- [EventPing](client.md#eventping) +- [EventPingListener](client.md#eventpinglistener) +- [EventPong](client.md#eventpong) +- [EventPongListener](client.md#eventponglistener) ### Functions @@ -53,7 +59,7 @@ ### Event -Ƭ **Event**: [EventConnecting](client.md#eventconnecting) \| [EventConnected](client.md#eventconnected) \| [EventMessage](client.md#eventmessage) \| [EventClosed](client.md#eventclosed) \| [EventError](client.md#eventerror) +Ƭ **Event**: [EventConnecting](client.md#eventconnecting) \| [EventConnected](client.md#eventconnected) \| [EventPing](client.md#eventping) \| [EventPong](client.md#eventpong) \| [EventMessage](client.md#eventmessage) \| [EventClosed](client.md#eventclosed) \| [EventError](client.md#eventerror) ___ @@ -174,7 +180,7 @@ ___ ### EventListener -Ƭ **EventListener**: `E` extends [EventConnecting](client.md#eventconnecting) ? [EventConnectingListener](client.md#eventconnectinglistener) : `E` extends [EventConnected](client.md#eventconnected) ? [EventConnectedListener](client.md#eventconnectedlistener) : `E` extends [EventMessage](client.md#eventmessage) ? [EventMessageListener](client.md#eventmessagelistener) : `E` extends [EventClosed](client.md#eventclosed) ? [EventClosedListener](client.md#eventclosedlistener) : `E` extends [EventError](client.md#eventerror) ? [EventErrorListener](client.md#eventerrorlistener) : `never` +Ƭ **EventListener**: `E` extends [EventConnecting](client.md#eventconnecting) ? [EventConnectingListener](client.md#eventconnectinglistener) : `E` extends [EventConnected](client.md#eventconnected) ? [EventConnectedListener](client.md#eventconnectedlistener) : `E` extends [EventPing](client.md#eventping) ? [EventPingListener](client.md#eventpinglistener) : `E` extends [EventPong](client.md#eventpong) ? [EventPongListener](client.md#eventponglistener) : `E` extends [EventMessage](client.md#eventmessage) ? [EventMessageListener](client.md#eventmessagelistener) : `E` extends [EventClosed](client.md#eventclosed) ? [EventClosedListener](client.md#eventclosedlistener) : `E` extends [EventError](client.md#eventerror) ? [EventErrorListener](client.md#eventerrorlistener) : `never` #### Type parameters @@ -213,6 +219,64 @@ debugging and logging received messages. ___ +### EventPing + +Ƭ **EventPing**: ``"ping"`` + +___ + +### EventPingListener + +Ƭ **EventPingListener**: (`received`: `boolean`) => `void` + +The first argument communicates whether the ping was received from the server. +If `false`, the ping was sent by the client. + +#### Type declaration + +▸ (`received`): `void` + +##### Parameters + +| Name | Type | +| :------ | :------ | +| `received` | `boolean` | + +##### Returns + +`void` + +___ + +### EventPong + +Ƭ **EventPong**: ``"pong"`` + +___ + +### EventPongListener + +Ƭ **EventPongListener**: (`received`: `boolean`) => `void` + +The first argument communicates whether the pong was received from the server. +If `false`, the pong was sent by the client. + +#### Type declaration + +▸ (`received`): `void` + +##### Parameters + +| Name | Type | +| :------ | :------ | +| `received` | `boolean` | + +##### Returns + +`void` + +___ + ### createClient ▸ **createClient**(`options`): [Client](../interfaces/client.client-1.md) @@ -303,6 +367,18 @@ Re-exports: [NextMessage](../interfaces/common.nextmessage.md) ___ +### PingMessage + +Re-exports: [PingMessage](../interfaces/common.pingmessage.md) + +___ + +### PongMessage + +Re-exports: [PongMessage](../interfaces/common.pongmessage.md) + +___ + ### Sink Re-exports: [Sink](../interfaces/common.sink.md) diff --git a/docs/modules/common.md b/docs/modules/common.md index b4adcefd..3071fe87 100644 --- a/docs/modules/common.md +++ b/docs/modules/common.md @@ -16,6 +16,8 @@ - [Disposable](../interfaces/common.disposable.md) - [ErrorMessage](../interfaces/common.errormessage.md) - [NextMessage](../interfaces/common.nextmessage.md) +- [PingMessage](../interfaces/common.pingmessage.md) +- [PongMessage](../interfaces/common.pongmessage.md) - [Sink](../interfaces/common.sink.md) - [SubscribeMessage](../interfaces/common.subscribemessage.md) - [SubscribePayload](../interfaces/common.subscribepayload.md) @@ -107,7 +109,7 @@ ___ ### Message -Ƭ **Message**: `T` extends [ConnectionAck](../enums/common.messagetype.md#connectionack) ? [ConnectionAckMessage](../interfaces/common.connectionackmessage.md) : `T` extends [ConnectionInit](../enums/common.messagetype.md#connectioninit) ? [ConnectionInitMessage](../interfaces/common.connectioninitmessage.md) : `T` extends [Subscribe](../enums/common.messagetype.md#subscribe) ? [SubscribeMessage](../interfaces/common.subscribemessage.md) : `T` extends [Next](../enums/common.messagetype.md#next) ? [NextMessage](../interfaces/common.nextmessage.md) : `T` extends [Error](../enums/common.messagetype.md#error) ? [ErrorMessage](../interfaces/common.errormessage.md) : `T` extends [Complete](../enums/common.messagetype.md#complete) ? [CompleteMessage](../interfaces/common.completemessage.md) : `never` +Ƭ **Message**: `T` extends [ConnectionAck](../enums/common.messagetype.md#connectionack) ? [ConnectionAckMessage](../interfaces/common.connectionackmessage.md) : `T` extends [ConnectionInit](../enums/common.messagetype.md#connectioninit) ? [ConnectionInitMessage](../interfaces/common.connectioninitmessage.md) : `T` extends [Ping](../enums/common.messagetype.md#ping) ? [PingMessage](../interfaces/common.pingmessage.md) : `T` extends [Pong](../enums/common.messagetype.md#pong) ? [PongMessage](../interfaces/common.pongmessage.md) : `T` extends [Subscribe](../enums/common.messagetype.md#subscribe) ? [SubscribeMessage](../interfaces/common.subscribemessage.md) : `T` extends [Next](../enums/common.messagetype.md#next) ? [NextMessage](../interfaces/common.nextmessage.md) : `T` extends [Error](../enums/common.messagetype.md#error) ? [ErrorMessage](../interfaces/common.errormessage.md) : `T` extends [Complete](../enums/common.messagetype.md#complete) ? [CompleteMessage](../interfaces/common.completemessage.md) : `never` #### Type parameters @@ -127,7 +129,7 @@ ___ ### isMessage -▸ **isMessage**(`val`): val is ConnectionInitMessage \| ConnectionAckMessage \| SubscribeMessage \| NextMessage \| ErrorMessage \| CompleteMessage +▸ **isMessage**(`val`): val is ConnectionInitMessage \| ConnectionAckMessage \| PingMessage \| PongMessage \| SubscribeMessage \| NextMessage \| ErrorMessage \| CompleteMessage Checks if the provided value is a message. @@ -139,7 +141,7 @@ Checks if the provided value is a message. #### Returns -val is ConnectionInitMessage \| ConnectionAckMessage \| SubscribeMessage \| NextMessage \| ErrorMessage \| CompleteMessage +val is ConnectionInitMessage \| ConnectionAckMessage \| PingMessage \| PongMessage \| SubscribeMessage \| NextMessage \| ErrorMessage \| CompleteMessage ___ diff --git a/src/client.ts b/src/client.ts index 2753dcad..ba69d634 100644 --- a/src/client.ts +++ b/src/client.ts @@ -28,6 +28,12 @@ export type EventConnecting = 'connecting'; /** @category Client */ export type EventConnected = 'connected'; // connected = socket opened + acknowledged +/** @category Client */ +export type EventPing = 'ping'; + +/** @category Client */ +export type EventPong = 'pong'; + /** @category Client */ export type EventMessage = 'message'; @@ -41,6 +47,8 @@ export type EventError = 'error'; export type Event = | EventConnecting | EventConnected + | EventPing + | EventPong | EventMessage | EventClosed | EventError; @@ -63,6 +71,22 @@ export type EventConnectedListener = ( /** @category Client */ export type EventConnectingListener = () => void; +/** + * The first argument communicates whether the ping was received from the server. + * If `false`, the ping was sent by the client. + * + * @category Client + */ +export type EventPingListener = (received: boolean) => void; + +/** + * The first argument communicates whether the pong was received from the server. + * If `false`, the pong was sent by the client. + * + * @category Client + */ +export type EventPongListener = (received: boolean) => void; + /** * Called for all **valid** messages received by the client. Mainly useful for * debugging and logging received messages. @@ -95,6 +119,10 @@ export type EventListener = E extends EventConnecting ? EventConnectingListener : E extends EventConnected ? EventConnectedListener + : E extends EventPing + ? EventPingListener + : E extends EventPong + ? EventPongListener : E extends EventMessage ? EventMessageListener : E extends EventClosed @@ -173,6 +201,44 @@ export interface ClientOptions { * @default 0 // close immediately */ lazyCloseTimeout?: number; + /** + * The timout between dispatched keep-alive messages, naimly server pings. Internally + * dispatches the `PingMessage` type to the server and expects a `PongMessage` in response. + * This helps with making sure that the connection with the server is alive and working. + * + * Timeout countdown starts from the moment the socket was opened and subsequently + * after every received `PongMessage`. + * + * Note that NOTHING will happen automatically with the client if the server never + * responds to a `PingMessage` with a `PongMessage`. If you want the connection to close, + * you should implement your own logic on top of the client. A simple example looks like this: + * + * ```js + * import { createClient } from 'graphql-ws'; + * + * let activeSocket, timedOut; + * createClient({ + * url: 'ws://i.time.out:4000/after-5/seconds', + * keepAlive: 10_000, // ping server every 10 seconds + * on: { + * connected: (socket) => (activeSocket = socket), + * ping: (received) => { + * if (!received) // sent + * timedOut = setTimeout(() => { + * if (activeSocket.readyState === WebSocket.OPEN) + * activeSocket.close(4408, 'Request Timeout'); + * }, 5_000); // wait 5 seconds for the pong and then close the connection + * }, + * pong: (received) => { + * if (received) clearTimeout(timedOut); // pong is received, clear connection close timeout + * }, + * }, + * }); + * ``` + * + * @default 0 + */ + keepAlive?: number; /** * How many times should the client try to reconnect on abnormal socket closure before it errors out? * @@ -278,6 +344,7 @@ export function createClient(options: ClientOptions): Client { lazy = true, onNonLazyError = console.error, lazyCloseTimeout = 0, + keepAlive = 0, retryAttempts = 5, retryWait = async function randomisedExponentialBackoff(retries) { let retryDelay = 1000; // start with 1s delay @@ -357,6 +424,8 @@ export function createClient(options: ClientOptions): Client { const listeners: { [event in Event]: EventListener[] } = { connecting: on?.connecting ? [on.connecting] : [], connected: on?.connected ? [on.connected] : [], + ping: on?.ping ? [on.ping] : [], + pong: on?.pong ? [on.pong] : [], message: on?.message ? [message.emit, on.message] : [message.emit], closed: on?.closed ? [on.closed] : [], error: on?.error ? [on.error] : [], @@ -414,6 +483,19 @@ export function createClient(options: ClientOptions): Client { GRAPHQL_TRANSPORT_WS_PROTOCOL, ); + let queuedPing: ReturnType; + function enqueuePing() { + if (isFinite(keepAlive) && keepAlive > 0) { + clearTimeout(queuedPing); // in case where a pong was received before a ping (this is valid behaviour) + queuedPing = setTimeout(() => { + if (socket.readyState === WebSocketImpl.OPEN) { + socket.send(stringifyMessage({ type: MessageType.Ping })); + emitter.emit('ping', false); + } + }, keepAlive); + } + } + socket.onerror = (err) => { // we let the onclose reject the promise for correct retry handling emitter.emit('error', err); @@ -421,6 +503,7 @@ export function createClient(options: ClientOptions): Client { socket.onclose = (event) => { connecting = undefined; + clearTimeout(queuedPing); emitter.emit('closed', event); denied(event); }; @@ -439,6 +522,7 @@ export function createClient(options: ClientOptions): Client { replacer, ), ); + enqueuePing(); // enqueue ping (noop if disabled) } catch (err) { socket.close( 4400, @@ -452,6 +536,15 @@ export function createClient(options: ClientOptions): Client { try { const message = parseMessage(data, reviver); emitter.emit('message', message); + if (message.type === 'ping' || message.type === 'pong') { + emitter.emit(message.type, true); // received + if (message.type === 'ping') { + // respond with pong on ping + socket.send(stringifyMessage({ type: MessageType.Pong })); + emitter.emit('pong', false); + } else enqueuePing(); // enqueue next ping on pong (noop if disabled) + return; // ping and pongs can be received whenever + } if (acknowledged) return; // already connected and acknowledged if (message.type !== MessageType.ConnectionAck) diff --git a/src/common.ts b/src/common.ts index 891c185e..a7b8786d 100644 --- a/src/common.ts +++ b/src/common.ts @@ -64,6 +64,9 @@ export enum MessageType { ConnectionInit = 'connection_init', // Client -> Server ConnectionAck = 'connection_ack', // Server -> Client + Ping = 'ping', // bidirectional + Pong = 'pong', /// bidirectional + Subscribe = 'subscribe', // Client -> Server Next = 'next', // Server -> Client Error = 'error', // Server -> Client @@ -82,6 +85,16 @@ export interface ConnectionAckMessage { readonly payload?: Record; } +/** @category Common */ +export interface PingMessage { + readonly type: MessageType.Ping; +} + +/** @category Common */ +export interface PongMessage { + readonly type: MessageType.Pong; +} + /** @category Common */ export interface SubscribeMessage { readonly id: ID; @@ -123,6 +136,10 @@ export type Message = ? ConnectionAckMessage : T extends MessageType.ConnectionInit ? ConnectionInitMessage + : T extends MessageType.Ping + ? PingMessage + : T extends MessageType.Pong + ? PongMessage : T extends MessageType.Subscribe ? SubscribeMessage : T extends MessageType.Next @@ -160,6 +177,10 @@ export function isMessage(val: unknown): val is Message { val.payload === undefined || isObject(val.payload) ); + case MessageType.Ping: + case MessageType.Pong: + // ping and pong types are simply valid + return true; case MessageType.Subscribe: return ( hasOwnStringProperty(val, 'id') && diff --git a/src/server.ts b/src/server.ts index 3d93d1bc..3a4cb60c 100644 --- a/src/server.ts +++ b/src/server.ts @@ -585,6 +585,12 @@ export function makeServer(options: ServerOptions): Server { ctx.acknowledged = true; return; } + case MessageType.Ping: { + await socket.send(stringifyMessage({ type: MessageType.Pong })); + return; + } + case MessageType.Pong: + return; case MessageType.Subscribe: { if (!ctx.acknowledged) return socket.close(4401, 'Unauthorized'); diff --git a/src/tests/client.ts b/src/tests/client.ts index e2d77b65..3d174d71 100644 --- a/src/tests/client.ts +++ b/src/tests/client.ts @@ -5,7 +5,7 @@ import WebSocket from 'ws'; import { EventEmitter } from 'events'; import { createClient, Client, EventListener } from '../client'; -import { SubscribePayload } from '../common'; +import { MessageType, stringifyMessage, SubscribePayload } from '../common'; import { startWSTServer as startTServer, waitForDone } from './utils'; // simulate browser environment for easier client testing @@ -401,6 +401,84 @@ it( }), ); +describe('ping/pong', () => { + it('should respond with a pong to a ping', async () => { + expect.assertions(1); + + const { url, waitForConnect, waitForClient, waitForClientClose } = + await startTServer(); + + createClient({ + url, + lazy: false, + retryAttempts: 0, + onNonLazyError: noop, + }); + + await waitForConnect(); + + await waitForClient((client) => { + client.send(stringifyMessage({ type: MessageType.Ping })); + client.onMessage((data) => { + expect(data).toBe('{"type":"pong"}'); + }); + }); + + await waitForClientClose(() => { + fail("Shouldn't have closed"); + }, 20); + }); + + it('should not react to a pong', async () => { + const { url, waitForConnect, waitForClient, waitForClientClose } = + await startTServer(); + + createClient({ + url, + lazy: false, + retryAttempts: 0, + onNonLazyError: noop, + }); + + await waitForConnect(); + + await waitForClient((client) => { + client.send(stringifyMessage({ type: MessageType.Pong })); + client.onMessage(() => { + fail("Shouldn't have received a message"); + }); + }); + + await waitForClientClose(() => { + fail("Shouldn't have closed"); + }, 20); + }); + + it( + 'should ping the server after the keepAlive timeout', + waitForDone(async (done) => { + const { url, waitForConnect, waitForClient } = await startTServer(); + + createClient({ + url, + lazy: false, + retryAttempts: 0, + onNonLazyError: noop, + keepAlive: 20, + }); + + await waitForConnect(); + + await waitForClient((client) => { + client.onMessage((data) => { + expect(data).toBe('{"type":"ping"}'); + done(); + }); + }); + }), + ); +}); + describe('query operation', () => { it('should execute the query, "next" the result and then complete', async () => { const { url } = await startTServer(); @@ -1480,4 +1558,84 @@ describe('events', () => { }, ); }); + + it('should emit ping and pong events when pinging server', async () => { + const { url, ...server } = await startTServer(); + + const pingFn = jest.fn(noop as EventListener<'ping'>); + const pongFn = jest.fn(noop as EventListener<'pong'>); + + const client = createClient({ + url, + lazy: false, + retryAttempts: 0, + onNonLazyError: noop, + keepAlive: 20, + on: { + ping: pingFn, + pong: pongFn, + }, + }); + client.on('ping', pingFn); + client.on('pong', pongFn); + + await server.waitForConnect(); + + await new Promise((resolve, reject) => { + server.waitForClient((client) => { + client.onMessage((data) => { + if (data === stringifyMessage({ type: MessageType.Ping })) resolve(); + else reject('Unexpected message'); + }); + }); + }); + + await new Promise((resolve) => { + client.on('pong', () => resolve()); + }); + + expect(pingFn).toBeCalledTimes(2); + expect(pingFn.mock.calls[0][0]).toBeFalsy(); + expect(pingFn.mock.calls[1][0]).toBeFalsy(); + + expect(pongFn).toBeCalledTimes(2); + expect(pongFn.mock.calls[0][0]).toBeTruthy(); + expect(pongFn.mock.calls[1][0]).toBeTruthy(); + }); + + it('should emit ping and pong events when receiving server pings', async () => { + const { url, ...server } = await startTServer(); + + const pingFn = jest.fn(noop as EventListener<'ping'>); + const pongFn = jest.fn(noop as EventListener<'pong'>); + + const client = createClient({ + url, + lazy: false, + retryAttempts: 0, + onNonLazyError: noop, + on: { + ping: pingFn, + pong: pongFn, + }, + }); + client.on('ping', pingFn); + client.on('pong', pongFn); + + await server.waitForClient((client) => { + client.send(stringifyMessage({ type: MessageType.Ping })); + }); + + await new Promise((resolve) => { + client.on('pong', () => resolve()); + }); + + expect(pingFn).toBeCalledTimes(2); + expect(pingFn.mock.calls[0][0]).toBeTruthy(); + expect(pingFn.mock.calls[1][0]).toBeTruthy(); + + expect(pongFn).toBeCalledTimes(2); + expect(pongFn.mock.calls[0][0]).toBeFalsy(); + expect(pongFn.mock.calls[1][0]).toBeFalsy(); + }); }); diff --git a/src/tests/server.ts b/src/tests/server.ts index de62d3a4..9cb80113 100644 --- a/src/tests/server.ts +++ b/src/tests/server.ts @@ -643,6 +643,46 @@ describe('Connect', () => { }); }); +describe('Ping/Pong', () => { + it('should respond with a pong to a ping', async () => { + const { url } = await startTServer(); + + const client = await createTClient(url); + + client.ws.send( + stringifyMessage({ + type: MessageType.Ping, + }), + ); + + await client.waitForMessage(({ data }) => { + expect(parseMessage(data)).toEqual({ + type: MessageType.Pong, + }); + }); + }); + + it('should not react to a pong', async () => { + const { url } = await startTServer(); + + const client = await createTClient(url); + + client.ws.send( + stringifyMessage({ + type: MessageType.Pong, + }), + ); + + await client.waitForMessage(() => { + fail('Shouldt have received a message'); + }, 20); + + await client.waitForClose(() => { + fail('Shouldt have closed'); + }, 20); + }); +}); + describe('Subscribe', () => { it('should close the socket on request if connection is not acknowledged', async () => { const { url } = await startTServer(); diff --git a/src/tests/utils/tservers.ts b/src/tests/utils/tservers.ts index 31dfb665..a30112ab 100644 --- a/src/tests/utils/tservers.ts +++ b/src/tests/utils/tservers.ts @@ -31,6 +31,7 @@ afterEach(async () => { }); export interface TServerClient { + send(data: string): void; onMessage(cb: (message: string) => void): () => void; close(code?: number, data?: string): void; } @@ -176,6 +177,7 @@ export async function startWSTServer( function toClient(socket: ws): TServerClient { return { + send: (data) => socket.send(data), onMessage: (cb) => { socket.on('message', cb); return () => socket.off('message', cb); @@ -435,6 +437,7 @@ export async function startFastifyWSTServer( function toClient(socket: ws): TServerClient { return { + send: (data) => socket.send(data), onMessage: (cb) => { socket.on('message', cb); return () => socket.off('message', cb);