diff --git a/src/client.ts b/src/client.ts index d0337b0..fe6bcb4 100644 --- a/src/client.ts +++ b/src/client.ts @@ -19,6 +19,26 @@ import { /** This file is the entry point for browsers, re-export common elements. */ export * from './common'; +/** @category Client */ +export interface EventListeners { + /** + * Emitted when the client starts connecting to the server. + * + * @param reconnecting - Whether the client is reconnecting after the connection was broken. + */ + connecting?: (reconnecting: boolean) => void; + /** + * Emitted when the client receives a message from the server. + */ + message?: (message: StreamMessage) => void; + /** + * Emitted when the client has successfully connected to the server. + * + * @param reconnecting - Whether the client has reconnected after the connection was broken. + */ + connected?: (reconnected: boolean) => void; +} + /** @category Client */ export interface ClientOptions { /** @@ -44,7 +64,7 @@ export interface ClientOptions { * - `true`: Establish a connection on first subscribe and close on last unsubscribe. * * Note that the `lazy` option has NO EFFECT when using the client - * in "distinct connection mode" (`singleConnection = false`). + * in "distinct connections mode" (`singleConnection = false`). * * @default true */ @@ -56,7 +76,7 @@ export interface ClientOptions { * Meant to be used in combination with `lazy`. * * Note that the `lazy` option has NO EFFECT when using the client - * in "distinct connection mode" (`singleConnection = false`). + * in "distinct connections mode" (`singleConnection = false`). * * @default 0 */ @@ -190,28 +210,44 @@ export interface ClientOptions { * and because `graphql-sse` implements a custom SSE parser - received messages will **not** appear in browser's DevTools. * * Use this function if you want to inspect valid messages received through the active SSE connection. + * + * @deprecated Consider using {@link ClientOptions.on} instead. */ onMessage?: (message: StreamMessage) => void; + /** + * Event listeners for events happening in teh SSE connection. + * + * Will emit events for both the "single connection mode" and the default "distinct connections mode". + * + * Beware that the `connecting` event will be called for **each** subscription when using with "distinct connections mode". + */ + on?: EventListeners; } /** @category Client */ -export interface Client { +export interface Client { /** * Subscribes to receive through a SSE connection. * * It uses the `sink` to emit received data or errors. Returns a _dispose_ * function used for dropping the subscription and cleaning up. + * + * @param on - The event listener for "distinct connections mode". Note that **no events will be emitted** in "single connection mode"; for that, consider using the event listener in {@link ClientOptions}. */ subscribe, Extensions = unknown>( request: RequestParams, sink: Sink>, + on?: SingleConnection extends true ? never : EventListeners, ): () => void; /** * Subscribes and iterates over emitted results from an SSE connection * through the returned async iterator. + * + * @param on - The event listener for "distinct connections mode". Note that **no events will be emitted** in "single connection mode"; for that, consider using the event listener in {@link ClientOptions}. */ iterate, Extensions = unknown>( request: RequestParams, + on?: SingleConnection extends true ? never : EventListeners, ): AsyncIterableIterator>; /** * Dispose of the client, destroy connections and clean up resources. @@ -235,7 +271,7 @@ export interface Client { */ export function createClient( options: ClientOptions, -): Client { +): Client { const { singleConnection = false, lazy = true, @@ -274,6 +310,7 @@ export function createClient( referrer, referrerPolicy, onMessage, + on: clientOn, } = options; const fetchFn = (options.fetchFn || fetch) as typeof fetch; const AbortControllerImpl = (options.abortControllerImpl || @@ -333,6 +370,8 @@ export function createClient( retries++; } + clientOn?.connecting?.(!!retryingErr); + // we must create a new controller here because lazy mode aborts currently active ones connCtrl = new AbortControllerImpl(); const unlistenDispose = client.onDispose(() => connCtrl.abort()); @@ -381,9 +420,14 @@ export function createClient( referrerPolicy, url, fetchFn, - onMessage, + onMessage: (msg) => { + clientOn?.message?.(msg); + onMessage?.(msg); // @deprecated + }, }); + clientOn?.connected?.(!!retryingErr); + connected.waitForThrow().catch(() => (conn = undefined)); return connected; @@ -423,7 +467,11 @@ export function createClient( })(); } - function subscribe(request: RequestParams, sink: Sink) { + function subscribe( + request: RequestParams, + sink: Sink, + on?: EventListeners, + ) { if (!singleConnection) { // distinct connections mode @@ -449,6 +497,9 @@ export function createClient( retries++; } + clientOn?.connecting?.(!!retryingErr); + on?.connecting?.(!!retryingErr); + const url = typeof options.url === 'function' ? await options.url() @@ -475,9 +526,16 @@ export function createClient( url, body: JSON.stringify(request), fetchFn, - onMessage, + onMessage: (msg) => { + clientOn?.message?.(msg); + on?.message?.(msg); + onMessage?.(msg); // @deprecated + }, }); + clientOn?.connected?.(!!retryingErr); + on?.connected?.(!!retryingErr); + for await (const result of getResults()) { // only after receiving results are future connects not considered retries. // this is because a client might successfully connect, but the server @@ -645,7 +703,7 @@ export function createClient( return { subscribe, - iterate(request) { + iterate(request, on) { const pending: ExecutionResult< // TODO: how to not use `any` and not have a redundant function signature? // eslint-disable-next-line @typescript-eslint/no-explicit-any @@ -660,22 +718,26 @@ export function createClient( // noop }, }; - const dispose = subscribe(request, { - next(val) { - // eslint-disable-next-line @typescript-eslint/no-explicit-any - pending.push(val as any); - deferred.resolve(); - }, - error(err) { - deferred.done = true; - deferred.error = err; - deferred.resolve(); - }, - complete() { - deferred.done = true; - deferred.resolve(); + const dispose = subscribe( + request, + { + next(val) { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + pending.push(val as any); + deferred.resolve(); + }, + error(err) { + deferred.done = true; + deferred.error = err; + deferred.resolve(); + }, + complete() { + deferred.done = true; + deferred.resolve(); + }, }, - }); + on, + ); const iterator = (async function* iterator() { for (;;) { diff --git a/tests/client.test.ts b/tests/client.test.ts index 8690839..539ab5c 100644 --- a/tests/client.test.ts +++ b/tests/client.test.ts @@ -74,7 +74,7 @@ it('should supply all valid messages received to onMessage', async () => { // single connection mode let msgs: StreamMessage[] = []; - let client = createClient({ + const singleConnClient = createClient({ singleConnection: true, url: 'http://localhost', fetchFn: fetch, @@ -82,7 +82,7 @@ it('should supply all valid messages received to onMessage', async () => { generateID: () => 'veryunique', onMessage: (msg) => msgs.push(msg), }); - let sub = tsubscribe(client, { + let sub = tsubscribe(singleConnClient, { query: '{ getValue }', }); await Promise.race([sub.throwOnError(), sub.waitForComplete()]); @@ -110,7 +110,7 @@ it('should supply all valid messages received to onMessage', async () => { // distinct connection mode msgs = []; - client = createClient({ + const distinctConnsClient = createClient({ singleConnection: false, url: 'http://localhost', fetchFn: fetch, @@ -118,7 +118,7 @@ it('should supply all valid messages received to onMessage', async () => { generateID: () => 'veryunique', onMessage: (msg) => msgs.push(msg), }); - sub = tsubscribe(client, { + sub = tsubscribe(distinctConnsClient, { query: '{ getValue }', }); await Promise.race([sub.throwOnError(), sub.waitForComplete()]); @@ -597,35 +597,35 @@ describe('retries', () => { // lazy tried = 0; - let client = createClient({ + const singleConnClient = createClient({ singleConnection: true, url: 'http://localhost', fetchFn: fetch, retryAttempts: 2, retry: () => Promise.resolve(), }); - let sub = tsubscribe(client, { query: '{ getValue }' }); + let sub = tsubscribe(singleConnClient, { query: '{ getValue }' }); await expect(sub.waitForError()).resolves.toMatchInlineSnapshot( `[NetworkError: Server responded with 403: Forbidden]`, ); expect(tried).toBe(3); // initial + 2 retries - client.dispose(); + singleConnClient.dispose(); // distinct connections mode tried = 0; - client = createClient({ + const distinctConnsClient = createClient({ singleConnection: false, url: 'http://localhost', fetchFn: fetch, retryAttempts: 2, retry: () => Promise.resolve(), }); - sub = tsubscribe(client, { query: '{ getValue }' }); + sub = tsubscribe(distinctConnsClient, { query: '{ getValue }' }); await expect(sub.waitForError()).resolves.toMatchInlineSnapshot( `[NetworkError: Server responded with 403: Forbidden]`, ); expect(tried).toBe(3); // initial + 2 retries - client.dispose(); + distinctConnsClient.dispose(); }); it('should retry network errors even if they occur during event emission', async () => { @@ -982,3 +982,319 @@ it('should support distinct connections mode with EventSource', async () => { ] `); }); + +describe('event listeners', () => { + describe('single connection mode', () => { + it('should emit connection events', async () => { + const { fetch } = createTFetch(); + + const onConnecting = vitest.fn(); + const onConnected = vitest.fn(); + + const client = createClient({ + singleConnection: true, + url: 'http://localhost', + fetchFn: fetch, + retryAttempts: 0, + on: { + connecting: onConnecting, + connected: onConnected, + }, + }); + + const iter = client.iterate({ query: '{ getValue }' }); + + await iter.next(); // next + await iter.next(); // complete + + expect(onConnecting).toHaveBeenCalledWith(false); + expect(onConnected).toHaveBeenCalledWith(false); + }); + + it('should emit connection events when reconnecting', async () => { + const { fetch, dispose } = createTFetch(); + + const onConnecting = vitest.fn(); + const onConnected = vitest.fn(); + + const client = createClient({ + singleConnection: true, + url: 'http://localhost', + fetchFn: fetch, + retryAttempts: 1, + retry: () => Promise.resolve(), + on: { + connecting: onConnecting, + connected: onConnected, + }, + }); + + const pingId = Math.random() + ''; + const iter = client.iterate({ + query: `subscription { ping(key: "${pingId}") }`, + }); + pong(pingId); + + await iter.next(); // next + + expect(onConnecting).toHaveBeenCalledWith(false); + expect(onConnected).toHaveBeenCalledWith(false); + + await dispose(); + pong(pingId); + + await iter.next(); // next + + expect(onConnecting).toHaveBeenCalledWith(true); + expect(onConnected).toHaveBeenCalledWith(true); + }); + + it('should emit "message" events', async () => { + const { fetch } = createTFetch(); + + const onMessage = vitest.fn(); + + const client = createClient({ + singleConnection: true, + url: 'http://localhost', + fetchFn: fetch, + retryAttempts: 0, + generateID() { + return '1'; + }, + on: { + message: onMessage, + }, + }); + + const iter = client.iterate({ query: '{ getValue }' }); + + await iter.next(); // next + await iter.next(); // complete + + expect(onMessage.mock.calls).toMatchInlineSnapshot(` + [ + [ + { + "data": { + "id": "1", + "payload": { + "data": { + "getValue": "value", + }, + }, + }, + "event": "next", + }, + ], + [ + { + "data": { + "id": "1", + }, + "event": "complete", + }, + ], + ] + `); + }); + + it('should not emit events on subscription listeners', async () => { + const { fetch } = createTFetch(); + + const onConnecting = vitest.fn(); + const onMessage = vitest.fn(); + const onConnected = vitest.fn(); + + const client = createClient({ + singleConnection: true, + url: 'http://localhost', + fetchFn: fetch, + retryAttempts: 0, + generateID() { + return '1'; + }, + on: { + connecting: onConnecting, + message: onMessage, + connected: onConnected, + }, + }); + + const iter = client.iterate( + { query: '{ getValue }' }, + // @ts-expect-error testing + { + connecting: onConnecting, + message: onMessage, + connected: onConnected, + }, + ); + await iter.next(); // next + await iter.next(); // complete + + expect(onConnecting).toBeCalledTimes(1); + expect(onMessage).toBeCalledTimes(2); // next + complete + expect(onConnected).toBeCalledTimes(1); + }); + }); + + describe('distinct connections mode', () => { + it('should emit connection events', async () => { + const { fetch } = createTFetch(); + + const onConnecting = vitest.fn(); + const onConnected = vitest.fn(); + + const client = createClient({ + singleConnection: false, + url: 'http://localhost', + fetchFn: fetch, + retryAttempts: 0, + on: { + connecting: onConnecting, + connected: onConnected, + }, + }); + + const iter = client.iterate( + { query: '{ getValue }' }, + { connecting: onConnecting, connected: onConnected }, + ); + await iter.next(); // next + await iter.next(); // complete + + expect(onConnecting).toHaveBeenCalledTimes(2); + expect(onConnecting).toHaveBeenCalledWith(false); + expect(onConnected).toHaveBeenCalledTimes(2); + expect(onConnected).toHaveBeenCalledWith(false); + }); + + it('should emit connection events when reconnecting', async () => { + const { fetch, waitForRequest, dispose } = createTFetch(); + + const onConnecting = vitest.fn(); + const onConnected = vitest.fn(); + + const client = createClient({ + singleConnection: false, + url: 'http://localhost', + fetchFn: fetch, + retryAttempts: 1, + retry: () => Promise.resolve(), + on: { + connecting: onConnecting, + connected: onConnected, + }, + }); + + client.iterate( + { + query: `subscription { ping(key: "${Math.random()}") }`, + }, + { connecting: onConnecting, connected: onConnected }, + ); + await waitForRequest(); + + await dispose(); + + await waitForRequest(); + + expect(onConnecting.mock.calls).toMatchInlineSnapshot(` + [ + [ + false, + ], + [ + false, + ], + [ + true, + ], + [ + true, + ], + ] + `); + expect(onConnected.mock.calls).toMatchInlineSnapshot(` + [ + [ + false, + ], + [ + false, + ], + [ + true, + ], + [ + true, + ], + ] + `); + }); + + it('should emit "message" events', async () => { + const { fetch } = createTFetch(); + + const onMessage = vitest.fn(); + + const client = createClient({ + singleConnection: false, + url: 'http://localhost', + fetchFn: fetch, + retryAttempts: 1, + retry: () => Promise.resolve(), + on: { + message: onMessage, + }, + }); + + const iter = client.iterate( + { + query: '{ getValue }', + }, + { message: onMessage }, + ); + await iter.next(); // next + await iter.next(); // complete + + expect(onMessage.mock.calls).toMatchInlineSnapshot(` + [ + [ + { + "data": { + "data": { + "getValue": "value", + }, + }, + "event": "next", + }, + ], + [ + { + "data": { + "data": { + "getValue": "value", + }, + }, + "event": "next", + }, + ], + [ + { + "data": null, + "event": "complete", + }, + ], + [ + { + "data": null, + "event": "complete", + }, + ], + ] + `); + }); + }); +}); diff --git a/tests/utils/tsubscribe.ts b/tests/utils/tsubscribe.ts index 32daee2..793d542 100644 --- a/tests/utils/tsubscribe.ts +++ b/tests/utils/tsubscribe.ts @@ -11,10 +11,10 @@ interface TSubscribe { dispose: () => void; } -export function tsubscribe( - client: Client, - payload: RequestParams, -): TSubscribe { +export function tsubscribe< + SingleConnection extends boolean = false, + T = unknown, +>(client: Client, payload: RequestParams): TSubscribe { const emitter = new EventEmitter(); const results: ExecutionResult[] = []; let error: unknown,