diff --git a/src/client.ts b/src/client.ts index 0ffb5951..d96198d8 100644 --- a/src/client.ts +++ b/src/client.ts @@ -33,85 +33,160 @@ export interface Client extends Disposable { subscribe(payload: SubscribePayload, sink: Sink): () => void; } +/** The nifty internal socket state manager: Socky 🧦. */ +function createSocky() { + let socket: WebSocket | undefined; + let state = { + connecting: false, + connected: false, + disconnecting: false, + }; + + return { + async beginConnecting(): Promise { + if (state.connecting) { + let waitedTimes = 0; + while (state.connecting) { + await new Promise((resolve) => setTimeout(resolve, 100)); + // 100ms * 50 = 5sec + if (waitedTimes >= 50) { + throw new Error('Waited 10 seconds but socket never connected'); + } + waitedTimes++; + } + } + + if (state.disconnecting) { + let waitedTimes = 0; + while (state.disconnecting) { + await new Promise((resolve) => setTimeout(resolve, 100)); + // 100ms * 50 = 5sec + if (waitedTimes >= 50) { + throw new Error('Waited 10 seconds but socket never disconnected'); + } + waitedTimes++; + } + } + + // the state could've changed while waiting for `connecting` or + // `disconnecting`, if it did - start connecting again + if (state.connecting || state.disconnecting) { + return await this.beginConnecting(); + } + + // the socket could've connected in the meantime + if (state.connected) { + return false; + } + + state = { ...state, connecting: true }; + return true; + }, + connected(connectedSocket: WebSocket) { + socket = connectedSocket; + state = { ...state, connected: true, connecting: false }; + }, + registerMessageListener( + listener: (event: MessageEvent) => void, + ): Disposable { + if (!socket) { + throw new Error( + 'Illegal socket access while registering a message listener. Has Socky been prepared?', + ); + } + + socket.addEventListener('message', listener); + return { + dispose: () => { + // we use the internal socket here because the connection + // might have been lost before the deregistration is requested + if (socket) { + socket.removeEventListener('message', listener); + } + }, + }; + }, + send(data: string) { + // TODO-db-200827 decide if accessing missing socket during send is illegal + if (!socket) { + throw new Error( + 'Illegal socket access while sending a message. Preparation skipped?', + ); + } + + if (socket.readyState === WebSocket.OPEN) { + socket.send(data); + } + }, + dispose() { + if (!state.disconnecting) { + state = { ...state, disconnecting: true }; + + if (socket && socket.readyState === WebSocket.OPEN) { + socket.close(1000, 'Normal Closure'); + } + socket = undefined; + + state = { ...state, disconnecting: false, connected: false }; + } + }, + }; +} + /** Creates a disposable GQL subscriptions client. */ export function createClient(options: ClientOptions): Client { const { url, connectionParams } = options; - // holds all currently subscribed sinks, will use this map - // to dispatch messages to the correct destination + const pendingSinks: Record = {}; const subscribedSinks: Record = {}; - function errorAllSinks(err: Error | CloseEvent) { - Object.entries(subscribedSinks).forEach(([, sink]) => sink.error(err)); - } - function completeAllSinks() { - Object.entries(subscribedSinks).forEach(([, sink]) => sink.complete()); - } + const socky = createSocky(); + async function prepare(): Promise { + if (await socky.beginConnecting()) { + return new Promise((resolve, reject) => { + let done = false; + const socket = new WebSocket(url, GRAPHQL_TRANSPORT_WS_PROTOCOL); - // Lazily uses the socket singleton to establishes a connection described by the protocol. - let socket: WebSocket | null = null, - connected = false, - connecting = false; - async function connect(): Promise { - if (connected) { - return; - } + /** + * `onerror` handler is unnecessary because even if an error occurs, the `onclose` handler will be called + * + * From: https://developer.mozilla.org/en-US/docs/Web/API/WebSockets_API/Writing_WebSocket_client_applications + * > If an error occurs while attempting to connect, first a simple event with the name error is sent to the + * > WebSocket object (thereby invoking its onerror handler), and then the CloseEvent is sent to the WebSocket + * > object (thereby invoking its onclose handler) to indicate the reason for the connection's closing. + */ - if (connecting) { - let waitedTimes = 0; - while (!connected) { - await new Promise((resolve) => setTimeout(resolve, 100)); - // 100ms * 50 = 5sec - if (waitedTimes >= 50) { - throw new Error('Waited 10 seconds but socket never connected'); - } - waitedTimes++; - } + socket.onclose = (closeEvent) => { + socky.dispose(); - // connected === true - return; - } + if (closeEvent.code === 1000 || closeEvent.code === 1001) { + // close event `1000: Normal Closure` is ok and so is `1001: Going Away` (maybe the server is restarting) + // complete only subscribed sinks because pending ones want a new connection + Object.entries(subscribedSinks).forEach(([, sink]) => + sink.complete(), + ); + } else { + // all other close events are considered erroneous for all sinks - connected = false; - connecting = true; - return new Promise((resolve, reject) => { - let done = false; - socket = new WebSocket(url, GRAPHQL_TRANSPORT_WS_PROTOCOL); - - /** - * `onerror` handler is unnecessary because even if an error occurs, the `onclose` handler will be called - * - * From: https://developer.mozilla.org/en-US/docs/Web/API/WebSockets_API/Writing_WebSocket_client_applications - * > If an error occurs while attempting to connect, first a simple event with the name error is sent to the - * > WebSocket object (thereby invoking its onerror handler), and then the CloseEvent is sent to the WebSocket - * > object (thereby invoking its onclose handler) to indicate the reason for the connection's closing. - */ - - socket.onclose = (closeEvent) => { - // NOTE: reading the `CloseEvent.reason` either trows or empties the whole error message - // (if trying to pass the reason in the `Error` message). let the user handle the close event - - if (closeEvent.code === 1000 || closeEvent.code === 1001) { - // close event `1000: Normal Closure` is ok and so is `1001: Going Away` (maybe the server is restarting) - completeAllSinks(); - } else { - // all other close events are considered erroneous - errorAllSinks(closeEvent); - } + // reading the `CloseEvent.reason` can either throw or empty the whole error message + // (if trying to pass the reason in the `Error` message). having this in mind, + // simply let the user handle the close event... + Object.entries(pendingSinks).forEach(([, sink]) => + sink.error(closeEvent), + ); + Object.entries(subscribedSinks).forEach(([, sink]) => + sink.error(closeEvent), + ); + } - if (!done) { - done = true; - connecting = false; - connected = false; - socket = null; - reject(closeEvent); - } - }; - socket.onopen = () => { - try { - if (!socket) { - throw new Error('Opened a socket on nothing'); + if (!done) { + done = true; + reject(closeEvent); } + }; + + socket.onopen = () => { + // as soon as the socket opens, send the connection initalisation request socket.send( stringifyMessage({ type: MessageType.ConnectionInit, @@ -121,158 +196,137 @@ export function createClient(options: ClientOptions): Client { : connectionParams, }), ); - } catch (err) { - errorAllSinks(err); - if (!done) { - done = true; - connecting = false; - if (socket) { - socket.close(); - socket = null; - } - reject(err); - } - } - }; + }; - socket.addEventListener('message', handleMessage); - function handleMessage({ data }: MessageEvent) { - try { - if (!socket) { - throw new Error('Received a message on nothing'); - } + socket.addEventListener('message', handleMessage); + function handleMessage({ data }: MessageEvent) { + try { + const message = parseMessage(data); + if (message.type !== MessageType.ConnectionAck) { + throw new Error( + `First message cannot be of type ${message.type}`, + ); + } - const message = parseMessage(data); - if (message.type !== MessageType.ConnectionAck) { - throw new Error(`First message cannot be of type ${message.type}`); - } + socky.connected(socket); + if (!done) { + done = true; + resolve(); + } + } catch (err) { + socky.dispose(); - // message.type === MessageType.ConnectionAck - if (!done) { - done = true; - connecting = false; - connected = true; // only now is the connection ready - resolve(); - } - } catch (err) { - errorAllSinks(err); - if (!done) { - done = true; - connecting = false; - if (socket) { - socket.close(); - socket = null; + Object.entries(pendingSinks).forEach(([, sink]) => sink.error(err)); + Object.entries(subscribedSinks).forEach(([, sink]) => + sink.error(err), + ); + if (!done) { + done = true; + reject(err); } - reject(err); - } - } finally { - if (socket) { + } finally { socket.removeEventListener('message', handleMessage); } } - } - }); + }); + } } return { subscribe: (payload, sink) => { const uuid = generateUUID(); - if (subscribedSinks[uuid]) { + if (pendingSinks[uuid] || subscribedSinks[uuid]) { sink.error(new Error(`Sink with ID ${uuid} already registered`)); return noop; } - subscribedSinks[uuid] = sink; - - function handleMessage({ data }: MessageEvent) { - const message = parseMessage(data); - switch (message.type) { - case MessageType.Next: { - if (message.id === uuid) { - // eslint-disable-next-line @typescript-eslint/no-explicit-any - sink.next(message.payload as any); - } - break; - } - case MessageType.Error: { - if (message.id === uuid) { - sink.error(message.payload); - } - break; - } - case MessageType.Complete: { - if (message.id === uuid) { - sink.complete(); - } - break; - } - } - } + pendingSinks[uuid] = sink; + + let messageListener: Disposable | undefined, + disposed = false; + prepare() + .then(() => { + delete pendingSinks[uuid]; - (async () => { - try { - await connect(); - if (!socket) { - throw new Error('Socket connected but empty'); + // the sink might have been disposed before the socket became ready + if (disposed) { + return; } - socket.addEventListener('message', handleMessage); - socket.send( + subscribedSinks[uuid] = sink; + messageListener = socky.registerMessageListener(({ data }) => { + const message = parseMessage(data); + switch (message.type) { + case MessageType.Next: { + if (message.id === uuid) { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + sink.next(message.payload as any); + } + break; + } + case MessageType.Error: { + if (message.id === uuid) { + sink.error(message.payload); + } + break; + } + case MessageType.Complete: { + if (message.id === uuid) { + sink.complete(); + } + break; + } + } + }); + + socky.send( stringifyMessage({ id: uuid, type: MessageType.Subscribe, payload, }), ); - } catch (err) { - sink.error(err); - } - })(); + }) + .catch(sink.error); return () => { - if (socket) { - if (socket.readyState === WebSocket.OPEN) { - socket.send( - stringifyMessage({ - id: uuid, - type: MessageType.Complete, - }), - ); - } - - socket.removeEventListener('message', handleMessage); + disposed = true; - // equal to 1 because this sink is the last one. - // the deletion from the map happens afterwards - if (Object.entries(subscribedSinks).length === 1) { - connected = false; - connecting = false; - if (socket.readyState === WebSocket.OPEN) { - socket.close(1000, 'Normal Closure'); - } - socket = null; - } + // having a message listener indicates that prepare has resolved + if (messageListener) { + messageListener.dispose(); + socky.send( + stringifyMessage({ + id: uuid, + type: MessageType.Complete, + }), + ); } sink.complete(); + delete pendingSinks[uuid]; delete subscribedSinks[uuid]; + + if (Object.entries(subscribedSinks).length === 0) { + // dispose of socky if no subscribers are left + socky.dispose(); + } }; }, dispose: async () => { // TODO-db-200817 complete or error? the sinks should be completed BEFORE the client gets disposed - completeAllSinks(); - + Object.entries(pendingSinks).forEach(([, sink]) => sink.complete()); + Object.keys(pendingSinks).forEach((uuid) => { + delete pendingSinks[uuid]; + }); + Object.entries(subscribedSinks).forEach(([, sink]) => { + sink.complete(); + }); Object.keys(subscribedSinks).forEach((uuid) => { delete subscribedSinks[uuid]; }); - // if there is an active socket, close it with a normal closure - if (socket && socket.readyState === WebSocket.OPEN) { - connected = false; - connecting = false; - // TODO-db-200817 decide if `1001: Going Away` should be used instead - socket.close(1000, 'Normal Closure'); - socket = null; - } + // bye bye 👋 + socky.dispose(); }, }; } diff --git a/src/tests/client.ts b/src/tests/client.ts index 538a4d27..151dfe1f 100644 --- a/src/tests/client.ts +++ b/src/tests/client.ts @@ -143,7 +143,7 @@ describe('subscription operation', () => { complete: completeFnForHappy, }, ); - await wait(5); + await wait(10); const nextFnForBananas = jest.fn(); const completeFnForBananas = jest.fn(); @@ -165,7 +165,7 @@ describe('subscription operation', () => { complete: completeFnForBananas, }, ); - await wait(5); + await wait(10); pubsub.publish('becameHappy', { becameHappy: { @@ -228,3 +228,72 @@ describe('subscription operation', () => { expect(completeFnForBananas).toBeCalled(); }); }); + +it('should dispatch and receive messages even if one subscriber disposes while another one subscribes', async () => { + const client = createClient({ url }); + + const nextFnForHappy = jest.fn(); + const completeFnForHappy = jest.fn(); + let disposeOfHappy: () => void; + setTimeout(() => { + disposeOfHappy = client.subscribe( + { + operationName: 'BecomingHappy', + query: `subscription BecomingHappy { + becameHappy { + name + } + }`, + variables: {}, + }, + { + next: nextFnForHappy, + error: () => { + fail(`Unexpected error call`); + }, + complete: completeFnForHappy, + }, + ); + }); + + const nextFnForBananas = jest.fn(); + const completeFnForBananas = jest.fn(); + setTimeout(async () => { + disposeOfHappy(); + + await wait(5); + + client.subscribe( + { + operationName: 'BoughtBananas', + query: `subscription BoughtBananas { + boughtBananas { + name + } + }`, + variables: {}, + }, + { + next: nextFnForBananas, + error: () => { + fail(`Unexpected error call`); + }, + complete: completeFnForBananas, + }, + ); + + await wait(10); + + pubsub.publish('boughtBananas', { + boughtBananas: { + name: 'john', + }, + }); + }); + + await wait(25); + + expect(nextFnForHappy).not.toBeCalled(); + expect(completeFnForHappy).toBeCalled(); + expect(nextFnForBananas).toBeCalled(); +}); diff --git a/src/tests/fixtures/simple.ts b/src/tests/fixtures/simple.ts index eb790c8f..6ed1dcab 100644 --- a/src/tests/fixtures/simple.ts +++ b/src/tests/fixtures/simple.ts @@ -80,12 +80,12 @@ export async function startServer( server, () => new Promise((resolve, reject) => { - server - .dispose() - .catch(reject) - .then(() => { + const disposing = server.dispose(); + if (disposing instanceof Promise) { + disposing.catch(reject).then(() => { httpServer.close((err) => (err ? reject(err) : resolve())); }); + } }), ]; } diff --git a/src/types.ts b/src/types.ts index a8c42e8c..bf3ab66a 100644 --- a/src/types.ts +++ b/src/types.ts @@ -14,7 +14,7 @@ export type UUID = string; export interface Disposable { /** Dispose of the instance and clear up resources. */ - dispose: () => Promise; + dispose: () => void | Promise; } /**