Skip to content

Commit

Permalink
feat: add heartbeat mechanism (#14)
Browse files Browse the repository at this point in the history
* feat: add heartbeat mechanism

* feat: add heartbeat to core

* test: fix ts

* test: remove timeReceived

* test: add timeout for await message
  • Loading branch information
melishev authored Feb 10, 2024
1 parent 8e14d4b commit ff6d8c5
Show file tree
Hide file tree
Showing 7 changed files with 163 additions and 50 deletions.
27 changes: 27 additions & 0 deletions src/heartbeat.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import { send } from './send'

const heartbeatMessage = 'ping'
const heartbeatInterval = 1000

let heartbeatTimeout: ReturnType<typeof setTimeout> | undefined

export function heartbeatStart(ws: WebSocket): void {
heartbeatStop()

heartbeatTimeout = setTimeout(() => {
send(heartbeatMessage, heartbeatMessage, ws)
}, heartbeatInterval)
}

export function heartbeatStop(): void {
clearTimeout(heartbeatTimeout)
heartbeatTimeout = undefined
}

export function listenHeartbeat(ws: WebSocket, e: MessageEvent<any>): void {
if (e.data === heartbeatMessage) {
heartbeatStart(ws)
// eslint-disable-next-line no-useless-return
return
}
}
70 changes: 45 additions & 25 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,31 +1,47 @@
import { type WSGOConfig, type WSGOSubscriptions } from './types'
import { type RemoveFirstFromTuple } from './types/utils'
import { type WSGOEventName, type WSGOConfig, type WSGOSubscriptions } from './types'

import { type WSGOSubscribeCallback } from './subscribe'

import { send } from './send'
import { subscribe } from './subscribe'
import { heartbeat } from './utils'
import { heartbeatStart, heartbeatStop, listenHeartbeat } from './heartbeat'

/** Method allows you create new WebSocket connection */
export default function create(
url: string,
config?: WSGOConfig,
config: Partial<WSGOConfig> = {},
): {
ws: WebSocket | undefined
// status: 'OPEN' | 'CLOSED' | 'CONNECTING'

open: () => void
close: () => void
send: (eventName: Parameters<typeof send>[0], data?: Parameters<typeof send>[1]) => ReturnType<typeof send>
subscribe: <T>(...args: RemoveFirstFromTuple<Parameters<typeof subscribe<T>>>) => ReturnType<typeof subscribe<T>>
send: (eventName: WSGOEventName, data?: Parameters<typeof send>[1]) => void
subscribe: <T>(eventName: WSGOEventName, callback: WSGOSubscribeCallback<T>) => void
} {
let ws: WebSocket | undefined
const subscriptions: WSGOSubscriptions = {}

if (config?.immediate ?? true) {
ws = open(url)
const _config = config as WSGOConfig
for (const option of ['debugging', 'immediate', 'heartbeat'] as Array<keyof typeof config>) {
if (_config[option] !== undefined) continue

if (option === 'debugging') {
_config[option] = false
continue
}

if (option === 'immediate' || option === 'heartbeat') {
_config[option] = true
continue
}
}

if (_config.immediate) {
ws = open(url, _config)

if (ws !== undefined) {
_listen(ws, subscriptions)
_listen(ws, subscriptions, _config)
}
}

Expand All @@ -34,66 +50,69 @@ export default function create(
return ws
},
open: () => {
ws = open(url)
ws = open(url, _config)

if (ws !== undefined) {
_listen(ws, subscriptions)
_listen(ws, subscriptions, _config)
}
},
close: () => {
close(ws)
},
send: (...args) => {
send(...args, ws, config)
send(...args, ws, _config)
},
subscribe: (...args) => {
subscribe(subscriptions, ...args)
},
}
}

function open(url?: string): WebSocket | undefined {
if (url === undefined) return

function open(url: string, _config: WSGOConfig): WebSocket {
// close()

const ws = new WebSocket(url)
// initialize heartbeat interval
heartbeat(ws)

// if (config.heartbeat) {
// heartbeatStart(ws)
// }

return ws
}

function _listen(ws: WebSocket, subscriptions: WSGOSubscriptions, config?: WSGOConfig): void {
// TODO: если добавится логика, то можно оставить
function _listen(ws: WebSocket, subscriptions: WSGOSubscriptions, _config: WSGOConfig): void {
ws.onopen = (ev) => {
config?.onConnected?.(ws, ev)
_config.onConnected?.(ws, ev)

heartbeatStart(ws)
}

ws.onclose = (ev) => {
config?.onDisconnected?.(ws, ev)
_config.onDisconnected?.(ws, ev)

heartbeatStop()
}

ws.onerror = (ev) => {
config?.onError?.(ws, ev)
_config.onError?.(ws, ev)
}

ws.onmessage = (e: MessageEvent<any>): any => {
if (e.data === 'pong') return
listenHeartbeat(ws, e)

let message

try {
message = JSON.parse(e.data)
} catch (e) {
if (config?.debugging ?? false) {
if (_config.debugging) {
console.error(e)
}

return
}

if (config?.debugging ?? false) {
if (_config.debugging) {
if (message.event === 'exception') {
console.error(message.data)
} else {
Expand All @@ -112,6 +131,7 @@ function close(ws?: WebSocket, ...[code = 1000, reason]: Parameters<WebSocket['c
if (ws === undefined) return

// stop heartbeat interval
heartbeatStop()

// close websocket connection
ws.close(code, reason)
Expand Down
2 changes: 1 addition & 1 deletion src/subscribe.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { type WSGOSubscriptions } from './types'

type WSGOSubscribeCallback<T> = (message: WSGOSubscribeResponse<T>) => any
export type WSGOSubscribeCallback<T> = (message: WSGOSubscribeResponse<T>) => any

export interface WSGOSubscribeResponse<T = any> {
/** Event name */
Expand Down
7 changes: 5 additions & 2 deletions src/types/index.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
export type WSGOSubscriptions = Record<string, (message: any) => any>

export type WSGOEventName = string

export interface WSGOConfig {
onConnected?: (ws: WebSocket, event: Event) => void
onDisconnected?: (ws: WebSocket, event: CloseEvent) => void
onError?: (ws: WebSocket, event: Event) => void

debugging?: boolean
immediate?: boolean
debugging: boolean
immediate: boolean
heartbeat: boolean
}

export type WSGOHeartbeat =
Expand Down
22 changes: 0 additions & 22 deletions src/utils.ts

This file was deleted.

57 changes: 57 additions & 0 deletions test/heartbeat.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import { describe, it, expect, vi, beforeAll, afterAll } from 'vitest'
import WSGO from '../src/index'
import ws from 'ws'
import { createMockWSServer } from './utils'

describe('open', () => {
const date = new Date(2000, 1, 1)

let port: number = 0
let server: ws.Server

beforeAll(() => {
const mockWSServer = createMockWSServer(port)

server = mockWSServer.server
port = mockWSServer.port
})

afterAll(() => {
server.close()
})

it('should send a ping event and receive a pong response', async () => {
const eventName = 'pong'

let event: any

// Arrange
const wsgo = WSGO(`ws://localhost:${port}`)
await vi.waitFor(() => {
vi.setSystemTime(date)
if (wsgo.ws?.readyState !== window.WebSocket.OPEN) {
throw new Error()
}
})

// Act
wsgo.subscribe(eventName, (e) => (event = e))
await vi.waitFor(
() => {
vi.setSystemTime(date)
if (event === undefined) {
throw new Error('Message not received back')
}
},
{
timeout: 5000,
interval: 250,
},
)

// Assert
expect(event).toStrictEqual({ event: eventName, timeSended: Date.now() })
})

it.todo('must close the connection if no response is received from the server')
})
28 changes: 28 additions & 0 deletions test/utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import ws from 'ws'

export function createMockWSServer(port: number = 0): { server: ws.WebSocketServer; port: number } {
let server = new ws.WebSocketServer({ port })

server.on('connection', (ws) => {
ws.on('message', (data, isBinary) => {
const parsedData = isBinary ? data : (JSON.parse(data.toString()) as any)

if (parsedData.event === 'ping') {
const message = { event: 'pong', timeSended: Date.now() }
ws.send(JSON.stringify(message))
}

const message = { ...data, timeSended: Date.now() }

// setTimeout(() => {
// ws.send(JSON.stringify(message))
// }, 1000)
ws.send(JSON.stringify(message))
})
})

return {
server,
port: (server.address() as ws.AddressInfo).port,
}
}

0 comments on commit ff6d8c5

Please sign in to comment.