Skip to content
This repository has been archived by the owner on Jul 31, 2020. It is now read-only.

Commit

Permalink
feat(socket): use round robin for endpoints when the connection fails (
Browse files Browse the repository at this point in the history
…#92)

* feat(socket): use round robin for endpoints when the connection fails

* address feedback & reconnect on 1xxx codes
  • Loading branch information
EthanWaite authored and ProbablePrime committed Jan 26, 2018
1 parent 00a823f commit 264abaa
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 29 deletions.
18 changes: 9 additions & 9 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions src/Client.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ setWebSocket(WebSocket);
const port = process.env.SERVER_PORT || 1339;

describe('client', () => {
const url = `ws://127.0.0.1:${port}/`;
const urls = [`ws://127.0.0.1:${port}/`];
let client: Client;
let server: WebSocket.Server;
let ws: WebSocket;
Expand All @@ -24,7 +24,7 @@ describe('client', () => {
}

const socketOptions = {
url,
urls,
};
function createClient(): Client {
return new Client(ClientType.GameClient);
Expand Down
2 changes: 1 addition & 1 deletion src/GameClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ export class GameClient extends Client {
.then(endpoints => {
return super.open({
authToken: options.authToken,
url: endpoints[0].address,
urls: endpoints.map(({ address }) => address),
extraHeaders: extraHeaders,
});
});
Expand Down
2 changes: 1 addition & 1 deletion src/ParticipantClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ export class ParticipantClient extends Client {

public open(options: IParticipantOptions): Promise<this> {
return super.open({
url: options.url,
urls: [options.url],
reconnectChecker: options.reconnectChecker,
queryParams: {
'x-protocol-version': '2.0',
Expand Down
30 changes: 26 additions & 4 deletions src/wire/Socket.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ describe('socket', () => {
let server: WebSocketModule.Server;
let socket: InteractiveSocket;

const url = `ws://127.0.0.1:${port}/`;
const urls = [`ws://127.0.0.1:${port}/`];

beforeEach(ready => {
server = new WebSocketModule.Server({ port }, ready);
Expand All @@ -45,7 +45,7 @@ describe('socket', () => {

describe('connecting', () => {
it('connects with no auth', done => {
socket = new InteractiveSocket({ url }).connect();
socket = new InteractiveSocket({ urls }).connect();
server.on('connection', (ws: WebSocketModule) => {
expect(ws.upgradeReq.url).to.equal('/');
expect(ws.upgradeReq.headers.authorization).to.equal(
Expand All @@ -59,7 +59,7 @@ describe('socket', () => {

it('connects with an OAuth token', done => {
socket = new InteractiveSocket({
url,
urls,
authToken: 'asdf!',
}).connect();
server.on('connection', (ws: WebSocketModule) => {
Expand Down Expand Up @@ -126,7 +126,7 @@ describe('socket', () => {
checker = sinon.stub();
checker.resolves();
socket = new InteractiveSocket({
url,
urls,
pingInterval: 100,
replyTimeout: 50,
}).connect();
Expand Down Expand Up @@ -176,6 +176,28 @@ describe('socket', () => {
});
});

it('reconnects to the next server on disconnection', done => {
socket.setOptions({ urls: [...urls, `ws://127.0.0.1:${port + 1}/`] });

// Connect to the first server.
socket.once('open', () => {
const fallbackServer = new WebSocketModule.Server({ port: port + 1 }, () => {
closeNormal(ws);

// Connect to the second server.
fallbackServer.once('connection', (ws2: WebSocketModule) => {
closeNormal(ws2);

// Connect to the first server again.
awaitConnect((ws3: WebSocketModule) => {
closeNormal(ws3);
fallbackServer.close(done);
});
});
});
});
});

it('respects closing the socket during a reconnection', done => {
greet();
resolveOn(socket, 'method')
Expand Down
26 changes: 14 additions & 12 deletions src/wire/Socket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,6 @@ import {
IReconnectionPolicy,
} from './reconnection';

/**
* Close codes that are deemed to be recoverable by the reconnection policy
*/
export const recoverableCloseCodes = [1000, 1011];

//We don't support lz4 due to time constraints right now
export type CompressionScheme = 'none' | 'gzip';

Expand All @@ -27,8 +22,8 @@ export interface ISocketOptions {
reconnectionPolicy?: IReconnectionPolicy;
autoReconnect?: boolean;

// Websocket URL to connect to, defaults to <TODO>
url?: string;
// Array of possible websocket URLs to connect to.
urls?: string[];

//compression scheme, defaults to none, Will remain none until pako typings are updated
compressionScheme?: CompressionScheme;
Expand Down Expand Up @@ -92,7 +87,7 @@ export enum SocketState {

function getDefaults(): ISocketOptions {
return {
url: '',
urls: [],
replyTimeout: 10000,
compressionScheme: 'none',
autoReconnect: true,
Expand All @@ -119,6 +114,7 @@ export class InteractiveSocket extends EventEmitter {
private socket: any;
private queue: Set<Packet> = new Set<Packet>();
private lastSequenceNumber = 0;
private endpointIndex = 0;

constructor(options: ISocketOptions = {}) {
super();
Expand All @@ -143,9 +139,10 @@ export class InteractiveSocket extends EventEmitter {
});

this.on('close', (evt: ICloseEvent) => {
// If this close event's code is not within our recoverable code array
// We raise it as an error and refuse to connect.
if (recoverableCloseCodes.indexOf(evt.code) === -1) {
// If this close event's code is an application error (e.g. bad authentication)
// or invalid status code (for Edge), we raise it as an error and refuse to
// reconnect.
if (evt.code < 1000 || evt.code > 1999 || evt.code === 1005) {
const err = InteractiveError.fromSocketMessage({
code: evt.code,
message: evt.reason,
Expand Down Expand Up @@ -213,7 +210,7 @@ export class InteractiveSocket extends EventEmitter {
headers,
};

const url = Url.parse(this.options.url, true);
const url = Url.parse(this.getURL(), true);
// Clear out search so it populates query using the query
// https://nodejs.org/api/url.html#url_url_format_urlobject
url.search = null;
Expand Down Expand Up @@ -369,6 +366,11 @@ export class InteractiveSocket extends EventEmitter {
this.socket.send(payload);
}

private getURL(): string {
const addresses = this.options.urls;
return this.options.urls[this.endpointIndex++ % addresses.length];
}

private extractMessage(packet: string | Buffer) {
let messageString: string;
messageString = <string>packet;
Expand Down

0 comments on commit 264abaa

Please sign in to comment.