Skip to content

Commit

Permalink
feat: use retryWait strategy
Browse files Browse the repository at this point in the history
  • Loading branch information
enisdenjo committed Dec 9, 2020
1 parent f7fbb07 commit e061d2a
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 50 deletions.
96 changes: 67 additions & 29 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,13 @@ export interface ClientOptions {
* @default 5
*/
retryAttempts?: number;
/**
* Control the wait time between retries. You may implement your own strategy
* by timing the resolution of the returned promise.
*
* @default Randomised exponential backoff
*/
retryWait?: (tries: number) => Promise<void>;
/**
* Register listeners before initialising the client. This way
* you can ensure to catch all client relevant emitted events.
Expand Down Expand Up @@ -148,6 +155,23 @@ export function createClient(options: ClientOptions): Client {
lazy = true,
keepAlive = 0,
retryAttempts = 5,
/**
* Retry with randomised exponential backoff.
*/
retryWait = async function retryWait(tries) {
let retryDelay = 1000; // 1s
for (let i = 0; i < tries; i++) {
retryDelay *= 2;
}
await new Promise((resolve) =>
setTimeout(
resolve,
retryDelay +
// add random timeout from 300ms to 3s
Math.floor(Math.random() * (3000 - 300) + 300),
),
);
},
on,
webSocketImpl,
/**
Expand Down Expand Up @@ -175,11 +199,15 @@ export function createClient(options: ClientOptions): Client {
} else if (typeof WebSocket !== 'undefined') {
ws = WebSocket;
} else if (typeof global !== 'undefined') {
// @ts-expect-error: Support more browsers
ws = global.WebSocket || global.MozWebSocket;
ws =
global.WebSocket ||
// @ts-expect-error: Support more browsers
global.MozWebSocket;
} else if (typeof window !== 'undefined') {
// @ts-expect-error: Support more browsers
ws = window.WebSocket || window.MozWebSocket;
ws =
window.WebSocket ||
// @ts-expect-error: Support more browsers
window.MozWebSocket;
}
if (!ws) {
throw new Error('WebSocket implementation missing');
Expand Down Expand Up @@ -222,18 +250,12 @@ export function createClient(options: ClientOptions): Client {
socket: null as WebSocket | null,
acknowledged: false,
locks: 0,
tries: 0,
retrying: false,
retries: 0,
};

/**
* Retry with randomised exponential backoff.
*/
const initRetryDelay = 1000;
let currRetryDelay = initRetryDelay;
function retryDelay() {
if (state.tries > 0) currRetryDelay *= 2;
return currRetryDelay + Math.floor(Math.random() * (3000 - 300) + 300); // from 300ms to 3s
}
// all those waiting for the `retryWait` to resolve
const retryWaiting: (() => void)[] = [];

type ConnectReturn = Promise<
[
Expand All @@ -250,6 +272,24 @@ export function createClient(options: ClientOptions): Client {
throw new Error('Kept trying to connect but the socket never settled.');
}

// retry wait strategy only on root caller
if (state.retrying && callDepth === 0) {
if (retryWaiting.length) {
// if others are waiting for retry, I'll wait too
await new Promise((resolve) => retryWaiting.push(resolve));
} else {
retryWaiting.push(() => {
/** fake waiter to lead following connects in the `retryWaiting` queue */
});
// use retry wait strategy
await retryWait(state.retries);
// complete all waiting and clear the queue
while (retryWaiting.length) {
retryWaiting.pop()?.();
}
}
}

// socket already exists. can be ready or pending, check and behave accordingly
if (state.socket) {
switch (state.socket.readyState) {
Expand Down Expand Up @@ -285,7 +325,7 @@ export function createClient(options: ClientOptions): Client {
...state,
acknowledged: false,
socket,
tries: state.tries + 1,
retries: state.retrying ? state.retries + 1 : state.retries,
};
emitter.emit('connecting');

Expand Down Expand Up @@ -328,8 +368,13 @@ export function createClient(options: ClientOptions): Client {
}

clearTimeout(tooLong);
state = { ...state, acknowledged: true, socket, tries: 0 };
currRetryDelay = initRetryDelay; // reset retry delays
state = {
...state,
acknowledged: true,
socket,
retrying: false,
retries: 0,
};
emitter.emit('connected', socket, message.payload); // connected = socket opened + acknowledged
return resolve();
} catch (err) {
Expand Down Expand Up @@ -455,11 +500,12 @@ export function createClient(options: ClientOptions): Client {
}

// retries are not allowed or we tried to many times, report error
if (!retryAttempts || state.tries > retryAttempts) {
if (!retryAttempts || state.retries >= retryAttempts) {
throw errOrCloseEvent;
}

// looks good, please retry
// looks good, start retrying
state.retrying = true;
return true;
}

Expand All @@ -476,11 +522,7 @@ export function createClient(options: ClientOptions): Client {
return;
} catch (errOrCloseEvent) {
// return if shouldnt try again
if (!shouldRetryConnectOrThrow(errOrCloseEvent)) {
return;
}
// if should try again, wait a bit and continue loop
await new Promise((resolve) => setTimeout(resolve, retryDelay()));
if (!shouldRetryConnectOrThrow(errOrCloseEvent)) return;
}
}
})();
Expand Down Expand Up @@ -579,11 +621,7 @@ export function createClient(options: ClientOptions): Client {
return;
} catch (errOrCloseEvent) {
// return if shouldnt try again
if (!shouldRetryConnectOrThrow(errOrCloseEvent)) {
return;
}
// if should try again, wait a bit and continue loop
await new Promise((resolve) => setTimeout(resolve, retryDelay()));
if (!shouldRetryConnectOrThrow(errOrCloseEvent)) return;
}
}
})()
Expand Down
8 changes: 8 additions & 0 deletions src/tests/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -664,6 +664,8 @@ describe('reconnecting', () => {
it('should reconnect silently after socket closes', async () => {
const { url, ...server } = await startTServer();

console.log(url);

const sub = tsubscribe(
createClient({
url,
Expand All @@ -674,15 +676,21 @@ describe('reconnecting', () => {
},
);

console.log('1111');

await server.waitForClient((client) => {
client.close();
});

console.log('2222');

// tried again
await server.waitForClient((client) => {
console.log('4444');
client.close();
});

console.log('3333');
// client reported the error, meaning it wont retry anymore
expect.assertions(1);
await sub.waitForError((err) => {
Expand Down
42 changes: 21 additions & 21 deletions src/tests/fixtures/simple.ts
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ export async function startTServer(
server: httpServer,
path,
});
const server = await useServer(
const server = useServer(
{
schema,
execute,
Expand Down Expand Up @@ -180,6 +180,26 @@ export async function startTServer(
keepAlive,
);

// disposes of all started servers
const dispose: Dispose = (beNice) => {
return new Promise((resolve, reject) => {
if (!beNice) {
for (const socket of sockets) {
socket.destroy();
sockets.delete(socket);
}
}
const disposing = server.dispose() as Promise<void>;
disposing.catch(reject).then(() => {
httpServer.close(() => {
leftovers.splice(leftovers.indexOf(dispose), 1);
resolve();
});
});
});
};
leftovers.push(dispose);

// search for open port from the starting port
let tried = 0;
for (;;) {
Expand Down Expand Up @@ -220,26 +240,6 @@ export async function startTServer(
});
});

// disposes of all started servers
const dispose: Dispose = (beNice) => {
return new Promise((resolve, reject) => {
if (!beNice) {
for (const socket of sockets) {
socket.destroy();
sockets.delete(socket);
}
}
const disposing = server.dispose() as Promise<void>;
disposing.catch(reject).then(() => {
httpServer.close(() => {
leftovers.splice(leftovers.indexOf(dispose), 1);
resolve();
});
});
});
};
leftovers.push(dispose);

const addr = httpServer.address();
if (!addr || typeof addr !== 'object') {
throw new Error(`Unexpected http server address ${addr}`);
Expand Down

0 comments on commit e061d2a

Please sign in to comment.