diff --git a/src/client.ts b/src/client.ts index 4e4ed32f..efcf5654 100644 --- a/src/client.ts +++ b/src/client.ts @@ -101,6 +101,13 @@ export interface ClientOptions { * @default 5 */ retryAttempts?: number; + /** + * Control the wait time between retries. You may implement your own strategy + * by timing the resolution of the returned promise. + * + * @default Randomised exponential backoff + */ + retryWait?: (tries: number) => Promise; /** * Register listeners before initialising the client. This way * you can ensure to catch all client relevant emitted events. @@ -148,6 +155,23 @@ export function createClient(options: ClientOptions): Client { lazy = true, keepAlive = 0, retryAttempts = 5, + /** + * Retry with randomised exponential backoff. + */ + retryWait = async function retryWait(tries) { + let retryDelay = 1000; // 1s + for (let i = 0; i < tries; i++) { + retryDelay *= 2; + } + await new Promise((resolve) => + setTimeout( + resolve, + retryDelay + + // add random timeout from 300ms to 3s + Math.floor(Math.random() * (3000 - 300) + 300), + ), + ); + }, on, webSocketImpl, /** @@ -175,11 +199,15 @@ export function createClient(options: ClientOptions): Client { } else if (typeof WebSocket !== 'undefined') { ws = WebSocket; } else if (typeof global !== 'undefined') { - // @ts-expect-error: Support more browsers - ws = global.WebSocket || global.MozWebSocket; + ws = + global.WebSocket || + // @ts-expect-error: Support more browsers + global.MozWebSocket; } else if (typeof window !== 'undefined') { - // @ts-expect-error: Support more browsers - ws = window.WebSocket || window.MozWebSocket; + ws = + window.WebSocket || + // @ts-expect-error: Support more browsers + window.MozWebSocket; } if (!ws) { throw new Error('WebSocket implementation missing'); @@ -222,18 +250,12 @@ export function createClient(options: ClientOptions): Client { socket: null as WebSocket | null, acknowledged: false, locks: 0, - tries: 0, + retrying: false, + retries: 0, }; - /** - * Retry with randomised exponential backoff. - */ - const initRetryDelay = 1000; - let currRetryDelay = initRetryDelay; - function retryDelay() { - if (state.tries > 0) currRetryDelay *= 2; - return currRetryDelay + Math.floor(Math.random() * (3000 - 300) + 300); // from 300ms to 3s - } + // all those waiting for the `retryWait` to resolve + const retryWaiting: (() => void)[] = []; type ConnectReturn = Promise< [ @@ -250,6 +272,24 @@ export function createClient(options: ClientOptions): Client { throw new Error('Kept trying to connect but the socket never settled.'); } + // retry wait strategy only on root caller + if (state.retrying && callDepth === 0) { + if (retryWaiting.length) { + // if others are waiting for retry, I'll wait too + await new Promise((resolve) => retryWaiting.push(resolve)); + } else { + retryWaiting.push(() => { + /** fake waiter to lead following connects in the `retryWaiting` queue */ + }); + // use retry wait strategy + await retryWait(state.retries); + // complete all waiting and clear the queue + while (retryWaiting.length) { + retryWaiting.pop()?.(); + } + } + } + // socket already exists. can be ready or pending, check and behave accordingly if (state.socket) { switch (state.socket.readyState) { @@ -285,7 +325,7 @@ export function createClient(options: ClientOptions): Client { ...state, acknowledged: false, socket, - tries: state.tries + 1, + retries: state.retrying ? state.retries + 1 : state.retries, }; emitter.emit('connecting'); @@ -328,8 +368,13 @@ export function createClient(options: ClientOptions): Client { } clearTimeout(tooLong); - state = { ...state, acknowledged: true, socket, tries: 0 }; - currRetryDelay = initRetryDelay; // reset retry delays + state = { + ...state, + acknowledged: true, + socket, + retrying: false, + retries: 0, + }; emitter.emit('connected', socket, message.payload); // connected = socket opened + acknowledged return resolve(); } catch (err) { @@ -455,11 +500,12 @@ export function createClient(options: ClientOptions): Client { } // retries are not allowed or we tried to many times, report error - if (!retryAttempts || state.tries > retryAttempts) { + if (!retryAttempts || state.retries >= retryAttempts) { throw errOrCloseEvent; } - // looks good, please retry + // looks good, start retrying + state.retrying = true; return true; } @@ -476,11 +522,7 @@ export function createClient(options: ClientOptions): Client { return; } catch (errOrCloseEvent) { // return if shouldnt try again - if (!shouldRetryConnectOrThrow(errOrCloseEvent)) { - return; - } - // if should try again, wait a bit and continue loop - await new Promise((resolve) => setTimeout(resolve, retryDelay())); + if (!shouldRetryConnectOrThrow(errOrCloseEvent)) return; } } })(); @@ -579,11 +621,7 @@ export function createClient(options: ClientOptions): Client { return; } catch (errOrCloseEvent) { // return if shouldnt try again - if (!shouldRetryConnectOrThrow(errOrCloseEvent)) { - return; - } - // if should try again, wait a bit and continue loop - await new Promise((resolve) => setTimeout(resolve, retryDelay())); + if (!shouldRetryConnectOrThrow(errOrCloseEvent)) return; } } })() diff --git a/src/tests/client.ts b/src/tests/client.ts index 21fb889a..5aae7fdd 100644 --- a/src/tests/client.ts +++ b/src/tests/client.ts @@ -664,6 +664,8 @@ describe('reconnecting', () => { it('should reconnect silently after socket closes', async () => { const { url, ...server } = await startTServer(); + console.log(url); + const sub = tsubscribe( createClient({ url, @@ -674,15 +676,21 @@ describe('reconnecting', () => { }, ); + console.log('1111'); + await server.waitForClient((client) => { client.close(); }); + console.log('2222'); + // tried again await server.waitForClient((client) => { + console.log('4444'); client.close(); }); + console.log('3333'); // client reported the error, meaning it wont retry anymore expect.assertions(1); await sub.waitForError((err) => { diff --git a/src/tests/fixtures/simple.ts b/src/tests/fixtures/simple.ts index 3bc8b8f1..64e2d398 100644 --- a/src/tests/fixtures/simple.ts +++ b/src/tests/fixtures/simple.ts @@ -147,7 +147,7 @@ export async function startTServer( server: httpServer, path, }); - const server = await useServer( + const server = useServer( { schema, execute, @@ -180,6 +180,26 @@ export async function startTServer( keepAlive, ); + // disposes of all started servers + const dispose: Dispose = (beNice) => { + return new Promise((resolve, reject) => { + if (!beNice) { + for (const socket of sockets) { + socket.destroy(); + sockets.delete(socket); + } + } + const disposing = server.dispose() as Promise; + disposing.catch(reject).then(() => { + httpServer.close(() => { + leftovers.splice(leftovers.indexOf(dispose), 1); + resolve(); + }); + }); + }); + }; + leftovers.push(dispose); + // search for open port from the starting port let tried = 0; for (;;) { @@ -220,26 +240,6 @@ export async function startTServer( }); }); - // disposes of all started servers - const dispose: Dispose = (beNice) => { - return new Promise((resolve, reject) => { - if (!beNice) { - for (const socket of sockets) { - socket.destroy(); - sockets.delete(socket); - } - } - const disposing = server.dispose() as Promise; - disposing.catch(reject).then(() => { - httpServer.close(() => { - leftovers.splice(leftovers.indexOf(dispose), 1); - resolve(); - }); - }); - }); - }; - leftovers.push(dispose); - const addr = httpServer.address(); if (!addr || typeof addr !== 'object') { throw new Error(`Unexpected http server address ${addr}`);