Skip to content

Commit

Permalink
Implement WebSocketServer Adapter (parse-community#5866)
Browse files Browse the repository at this point in the history
* Implement WebSocketServerAdapter

* lint

* clean up
  • Loading branch information
dplewis authored Jul 30, 2019
1 parent 4859895 commit 510464f
Show file tree
Hide file tree
Showing 10 changed files with 593 additions and 544 deletions.
24 changes: 12 additions & 12 deletions spec/ParseWebSocket.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,26 +9,26 @@ describe('ParseWebSocket', function() {
expect(parseWebSocket.ws).toBe(ws);
});

it('can handle events defined in typeMap', function() {
it('can handle disconnect event', function(done) {
const ws = {
on: jasmine.createSpy('on'),
onclose: () => {},
};
const callback = {};
const parseWebSocket = new ParseWebSocket(ws);
parseWebSocket.on('disconnect', callback);

expect(parseWebSocket.ws.on).toHaveBeenCalledWith('close', callback);
parseWebSocket.on('disconnect', () => {
done();
});
ws.onclose();
});

it('can handle events which are not defined in typeMap', function() {
it('can handle message event', function(done) {
const ws = {
on: jasmine.createSpy('on'),
onmessage: () => {},
};
const callback = {};
const parseWebSocket = new ParseWebSocket(ws);
parseWebSocket.on('open', callback);

expect(parseWebSocket.ws.on).toHaveBeenCalledWith('open', callback);
parseWebSocket.on('message', () => {
done();
});
ws.onmessage();
});

it('can send a message', function() {
Expand Down
7 changes: 3 additions & 4 deletions spec/ParseWebSocketServer.spec.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
const ParseWebSocketServer = require('../lib/LiveQuery/ParseWebSocketServer')
.ParseWebSocketServer;
const { ParseWebSocketServer } = require('../lib/LiveQuery/ParseWebSocketServer');

describe('ParseWebSocketServer', function() {
beforeEach(function(done) {
Expand All @@ -19,14 +18,14 @@ describe('ParseWebSocketServer', function() {
const parseWebSocketServer = new ParseWebSocketServer(
server,
onConnectCallback,
5
{ websocketTimeout: 5 }
).server;
const ws = {
readyState: 0,
OPEN: 0,
ping: jasmine.createSpy('ping'),
};
parseWebSocketServer.emit('connection', ws);
parseWebSocketServer.onConnection(ws);

// Make sure callback is called
expect(onConnectCallback).toHaveBeenCalled();
Expand Down
22 changes: 22 additions & 0 deletions src/Adapters/WebSocketServer/WSAdapter.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*eslint no-unused-vars: "off"*/
import { WSSAdapter } from './WSSAdapter';
const WebSocketServer = require('ws').Server;

/**
* Wrapper for ws node module
*/
export class WSAdapter extends WSSAdapter {
constructor(options: any) {
super(options);
const wss = new WebSocketServer({ server: options.server });
wss.on('listening', this.onListen);
wss.on('connection', this.onConnection);
}

onListen() {}
onConnection(ws) {}
start() {}
close() {}
}

export default WSAdapter;
52 changes: 52 additions & 0 deletions src/Adapters/WebSocketServer/WSSAdapter.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*eslint no-unused-vars: "off"*/
// WebSocketServer Adapter
//
// Adapter classes must implement the following functions:
// * onListen()
// * onConnection(ws)
// * start()
// * close()
//
// Default is WSAdapter. The above functions will be binded.

/**
* @module Adapters
*/
/**
* @interface WSSAdapter
*/
export class WSSAdapter {
/**
* @param {Object} options - {http.Server|https.Server} server
*/
constructor(options) {
this.onListen = () => {}
this.onConnection = () => {}
}

// /**
// * Emitted when the underlying server has been bound.
// */
// onListen() {}

// /**
// * Emitted when the handshake is complete.
// *
// * @param {WebSocket} ws - RFC 6455 WebSocket.
// */
// onConnection(ws) {}

/**
* Initialize Connection.
*
* @param {Object} options
*/
start(options) {}

/**
* Closes server.
*/
close() {}
}

export default WSSAdapter;
2 changes: 1 addition & 1 deletion src/LiveQuery/ParseLiveQueryServer.js
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ class ParseLiveQueryServer {
this.parseWebSocketServer = new ParseWebSocketServer(
server,
parseWebsocket => this._onConnect(parseWebsocket),
config.websocketTimeout
config
);

// Initialize subscriber
Expand Down
45 changes: 26 additions & 19 deletions src/LiveQuery/ParseWebSocketServer.js
Original file line number Diff line number Diff line change
@@ -1,24 +1,26 @@
import { loadAdapter } from '../Adapters/AdapterLoader';
import { WSAdapter } from '../Adapters/WebSocketServer/WSAdapter';
import logger from '../logger';

const typeMap = new Map([['disconnect', 'close']]);
const getWS = function(){
return require('ws');
};
import events from 'events';

export class ParseWebSocketServer {
server: Object;

constructor(
server: any,
onConnect: Function,
websocketTimeout: number = 10 * 1000
config
) {
const WebSocketServer = getWS().Server;
const wss = new WebSocketServer({ server: server });
wss.on('listening', () => {
config.server = server;
const wss = loadAdapter(
config.wssAdapter,
WSAdapter,
config,
);
wss.onListen = () => {
logger.info('Parse LiveQuery Server starts running');
});
wss.on('connection', ws => {
};
wss.onConnection = (ws) => {
onConnect(new ParseWebSocket(ws));
// Send ping to client periodically
const pingIntervalId = setInterval(() => {
Expand All @@ -27,24 +29,29 @@ export class ParseWebSocketServer {
} else {
clearInterval(pingIntervalId);
}
}, websocketTimeout);
});
}, config.websocketTimeout || 10 * 1000);
};
wss.start();
this.server = wss;
}

close() {
if (this.server && this.server.close) {
this.server.close();
}
}
}

export class ParseWebSocket {
export class ParseWebSocket extends events.EventEmitter {
ws: any;

constructor(ws: any) {
super();
ws.onmessage = (request) => this.emit('message', request);
ws.onclose = () => this.emit('disconnect');
this.ws = ws;
}

on(type: string, callback): void {
const wsType = typeMap.has(type) ? typeMap.get(type) : type;
this.ws.on(wsType, callback);
}

send(message: any): void {
this.ws.send(message);
}
Expand Down
Loading

0 comments on commit 510464f

Please sign in to comment.