Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add server ping #120

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@
"ws": "^8.4.0"
},
"devDependencies": {
"@types/sinon": "^17.0.3",
"aegir": "^44.0.1",
"delay": "^6.0.0",
"it-all": "^3.0.1",
Expand All @@ -207,6 +208,7 @@
"it-ndjson": "^1.0.0",
"it-pipe": "^3.0.1",
"p-defer": "^4.0.0",
"sinon": "^18.0.0",
"wherearewe": "^2.0.1",
"wsurl": "^1.0.0"
},
Expand Down
2 changes: 1 addition & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
export { default as duplex } from './duplex.js'
export { default as source } from './source.js'
export { default as sink } from './sink.js'
export { createServer } from './server.js'
export { createServer, type WebSocketServer } from './server.js'
export { connect } from './client.js'
42 changes: 40 additions & 2 deletions src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,23 @@ import duplex, { type DuplexWebSocket } from './duplex.js'
import type WebSocket from './web-socket.js'
import type { VerifyClientCallbackSync, VerifyClientCallbackAsync, AddressInfo } from 'ws'

export interface ClientWebSocket extends WebSocket {
alive?: boolean
}

export interface ServerOptions {
key?: string
cert?: string
server?: http.Server | https.Server
verifyClient?: VerifyClientCallbackAsync | VerifyClientCallbackSync
onConnection?(connection: DuplexWebSocket): void

/**
* If specified, send a PING to every connected client, if
* they do not respond with a PONG before the next interval,
* terminate the connection
*/
heartbeatMs?: number
}

export interface WebSocketServer extends EventEmitter {
Expand All @@ -23,6 +34,8 @@ export interface WebSocketServer extends EventEmitter {
class Server extends EventEmitter {
private readonly server: http.Server | https.Server
private readonly wsServer: WSServer
private readonly heartbeatMs?: number
private heartbeatInterval?: ReturnType<typeof setInterval>

constructor (server: http.Server | https.Server, opts?: ServerOptions) {
super()
Expand All @@ -34,9 +47,26 @@ class Server extends EventEmitter {
verifyClient: opts.verifyClient
})
this.wsServer.on('connection', this.onWsServerConnection.bind(this))
this.heartbeatMs = opts?.heartbeatMs
}

async listen (addrInfo: { port: number } | number): Promise<WebSocketServer> {
if (this.heartbeatMs != null) {
this.heartbeatInterval = setInterval(() => {
this.wsServer.clients.forEach((client: ClientWebSocket) => {
// the client did not send a pong since the last heartbeat so
// terminate the connection
if (client.alive === false) {
client.terminate()
return
}

client.alive = false
client.ping()
})
}, this.heartbeatMs)
}

return new Promise<WebSocketServer>((resolve, reject) => {
this.wsServer.once('error', (e) => { reject(e) })
this.wsServer.once('listening', () => { resolve(this) })
Expand All @@ -45,6 +75,10 @@ class Server extends EventEmitter {
}

async close (): Promise<void> {
if (this.heartbeatInterval != null) {
clearInterval(this.heartbeatInterval)
}

await new Promise<void>((resolve, reject) => {
this.server.close((err) => {
if (err != null) {
Expand All @@ -60,7 +94,7 @@ class Server extends EventEmitter {
return this.server.address()
}

onWsServerConnection (socket: WebSocket, req: http.IncomingMessage): void {
onWsServerConnection (socket: ClientWebSocket, req: http.IncomingMessage): void {
let addr: string | AddressInfo | null

try {
Expand All @@ -83,6 +117,10 @@ class Server extends EventEmitter {
return
}

socket.on('pong', () => {
socket.alive = true
})

const stream: DuplexWebSocket = {
...duplex(socket, {
remoteAddress: req.socket.remoteAddress,
Expand All @@ -100,7 +138,7 @@ export function createServer (opts?: ServerOptions): WebSocketServer {
opts = opts ?? {}

const server = opts.server ?? (opts.key != null && opts.cert != null ? https.createServer(opts) : http.createServer())
const wss = new Server(server)
const wss = new Server(server, opts)

if (opts.onConnection != null) {
wss.on('connection', opts.onConnection)
Expand Down
56 changes: 56 additions & 0 deletions test/server-ping.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import { expect } from 'aegir/chai'
import delay from 'delay'
import Sinon from 'sinon'
import { isNode, isElectronMain } from 'wherearewe'
import * as WS from '../src/index.js'
import WebSocket from '../src/web-socket.js'

describe('ping', () => {
if (!(isNode || isElectronMain)) {
return
}

let server: WS.WebSocketServer
let client: WebSocket

afterEach(async () => {
if (client != null) {
client.close()
}

if (server != null) {
await server.close()
}
})

it('server should ping connected clients', async () => {
server = WS.createServer({
heartbeatMs: 10
})
await server.listen(55214)

client = new WebSocket('http://127.0.0.1:55214')
const pongSpy = Sinon.spy(client, 'pong')

await delay(200)

expect(client).to.have.property('readyState', WebSocket.OPEN)
expect(pongSpy).to.have.property('called', true)
})

it('server should disconnected unresponsive clients', async () => {
server = WS.createServer({
heartbeatMs: 10
})
await server.listen(55214)

client = new WebSocket('http://127.0.0.1:55214')

// make sure the client will not respond to a ping
client.pong = () => {}

await delay(200)

expect(client).to.have.property('readyState', WebSocket.CLOSED)
})
})
Loading