From 82e996685da8ecb75f327d7faf52096b5953c3c3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Filipe=20Caba=C3=A7o?= Date: Tue, 8 Oct 2024 21:44:58 +0100 Subject: [PATCH] fix: Add Web Workers as a way to run health check (#431) Added capacity to use Worker for heart beats. We also allow users to override the worker URL so they are able to modify it as they see fit. --- package-lock.json | 10 ++++++- package.json | 3 +- src/RealtimeClient.ts | 70 ++++++++++++++++++++++++++++++++++++++----- test/channel.test.ts | 42 +++++++++++++++++++++++++- tsconfig.json | 2 +- 5 files changed, 116 insertions(+), 11 deletions(-) diff --git a/package-lock.json b/package-lock.json index d231a87..add57a8 100644 --- a/package-lock.json +++ b/package-lock.json @@ -30,7 +30,8 @@ "sinon": "^18.0.0", "typedoc": "^0.22.16", "typescript": "^4.0.3", - "vitest": "^2.0.5" + "vitest": "^2.0.5", + "web-worker": "1.2.0" } }, "node_modules/@ampproject/remapping": { @@ -7247,6 +7248,13 @@ "node": ">=10" } }, + "node_modules/web-worker": { + "version": "1.2.0", + "resolved": "https://registry.npmjs.org/web-worker/-/web-worker-1.2.0.tgz", + "integrity": "sha512-PgF341avzqyx60neE9DD+XS26MMNMoUQRz9NOZwW32nPQrF6p77f1htcnjBSEV8BGMKZ16choqUG4hyI0Hx7mA==", + "dev": true, + "license": "Apache-2.0" + }, "node_modules/webidl-conversions": { "version": "6.1.0", "resolved": "https://registry.npmjs.org/webidl-conversions/-/webidl-conversions-6.1.0.tgz", diff --git a/package.json b/package.json index 3b78a5d..24831cb 100644 --- a/package.json +++ b/package.json @@ -59,6 +59,7 @@ "sinon": "^18.0.0", "typedoc": "^0.22.16", "typescript": "^4.0.3", - "vitest": "^2.0.5" + "vitest": "^2.0.5", + "web-worker": "1.2.0" } } \ No newline at end of file diff --git a/src/RealtimeClient.ts b/src/RealtimeClient.ts index 6e0bc49..842ee5b 100755 --- a/src/RealtimeClient.ts +++ b/src/RealtimeClient.ts @@ -38,6 +38,8 @@ export type RealtimeClientOptions = { params?: { [key: string]: any } log_level?: 'info' | 'debug' | 'warn' | 'error' fetch?: Fetch + worker?: boolean + workerUrl?: string } export type RealtimeMessage = { @@ -69,7 +71,12 @@ interface WebSocketLikeError { } const NATIVE_WEBSOCKET_AVAILABLE = typeof WebSocket !== 'undefined' - +const WORKER_SCRIPT = ` + addEventListener("message", (e) => { + if (e.data.event === "start") { + setInterval(() => postMessage({ event: "keepAlive" }), e.data.interval); + } + });` export default class RealtimeClient { accessToken: string | null = null apiKey: string | null = null @@ -104,6 +111,9 @@ export default class RealtimeClient { message: [], } fetch: Fetch + worker?: boolean + workerUrl?: string + workerRef?: Worker /** * Initializes the Socket. @@ -119,6 +129,8 @@ export default class RealtimeClient { * @param options.encode The function to encode outgoing messages. Defaults to JSON: (payload, callback) => callback(JSON.stringify(payload)) * @param options.decode The function to decode incoming messages. Defaults to Serializer's decode. * @param options.reconnectAfterMs he optional function that returns the millsec reconnect interval. Defaults to stepped backoff off. + * @param options.worker Use Web Worker to set a side flow. Defaults to false. + * @param options.workerUrl The URL of the worker script. Defaults to https://realtime.supabase.com/worker.js that includes a heartbeat event call to keep the connection alive. */ constructor(endPoint: string, options?: RealtimeClientOptions) { this.endPoint = `${endPoint}/${TRANSPORTS.websocket}` @@ -160,6 +172,13 @@ export default class RealtimeClient { }, this.reconnectAfterMs) this.fetch = this._resolveFetch(options?.fetch) + if (options?.worker) { + if (typeof window !== 'undefined' && !window.Worker) { + throw new Error('Web Worker is not supported') + } + this.worker = options?.worker || false + this.workerUrl = options?.workerUrl + } } /** @@ -448,19 +467,45 @@ export default class RealtimeClient { } /** @internal */ - private _onConnOpen() { + private async _onConnOpen() { this.log('transport', `connected to ${this._endPointURL()}`) this._flushSendBuffer() this.reconnectTimer.reset() - this.heartbeatTimer && clearInterval(this.heartbeatTimer) - this.heartbeatTimer = setInterval( - () => this._sendHeartbeat(), - this.heartbeatIntervalMs - ) + if (!this.worker) { + this.heartbeatTimer && clearInterval(this.heartbeatTimer) + this.heartbeatTimer = setInterval( + () => this._sendHeartbeat(), + this.heartbeatIntervalMs + ) + } else { + if (this.workerUrl) { + this.log('worker', `starting worker for from ${this.workerUrl}`) + } else { + this.log('worker', `starting default worker`) + } + + const objectUrl = this._workerObjectUrl(this.workerUrl!) + this.workerRef = new Worker(objectUrl) + this.workerRef.onerror = (error) => { + this.log('worker', 'worker error', error.message) + this.workerRef!.terminate() + } + this.workerRef.onmessage = (event) => { + if (event.data.event === 'keepAlive') { + this._sendHeartbeat() + } + } + this.workerRef.postMessage({ + event: 'start', + interval: this.heartbeatIntervalMs, + }) + } + this.stateChangeCallbacks.open.forEach((callback) => callback())! } /** @internal */ + private _onConnClose(event: any) { this.log('transport', 'close', event) this._triggerChanError() @@ -527,6 +572,17 @@ export default class RealtimeClient { }) this.setAuth(this.accessToken) } + + private _workerObjectUrl(url: string | undefined): string { + let result_url: string + if (url) { + result_url = url + } else { + const blob = new Blob([WORKER_SCRIPT], { type: 'application/javascript' }) + result_url = URL.createObjectURL(blob) + } + return result_url + } } class WSWebSocketDummy { diff --git a/test/channel.test.ts b/test/channel.test.ts index 1b6a6ee..bb3c93a 100755 --- a/test/channel.test.ts +++ b/test/channel.test.ts @@ -1,10 +1,19 @@ import assert from 'assert' import sinon from 'sinon' -import { describe, beforeEach, afterEach, test } from 'vitest' +import { + describe, + beforeEach, + afterEach, + test, + beforeAll, + afterAll, +} from 'vitest' import RealtimeClient from '../src/RealtimeClient' import RealtimeChannel from '../src/RealtimeChannel' import { Response } from '@supabase/node-fetch' +import { WebSocketServer } from 'ws' +import Worker from 'web-worker' let channel, socket const defaultRef = '1' @@ -1416,3 +1425,34 @@ describe('trigger', () => { assert.equal(client.accessToken, '123') }) }) + +describe('worker', () => { + let client + let mockServer + + beforeAll(() => { + window.Worker = Worker + mockServer = new WebSocketServer({ port: 8080 }) + }) + + afterAll(() => { + window.Worker = undefined + mockServer.close() + }) + + beforeEach(() => { + client = new RealtimeClient('ws://localhost:8080/socket', { + worker: true, + workerUrl: 'https://realtime.supabase.com/worker.js', + heartbeatIntervalMs: 10, + }) + }) + + test('sets worker flag', () => { + assert.ok(client.worker) + }) + + test('sets worker URL', () => { + assert.equal(client.workerUrl, 'https://realtime.supabase.com/worker.js') + }) +}) diff --git a/tsconfig.json b/tsconfig.json index 5a1b5e0..1b4cd1c 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -12,6 +12,6 @@ "esModuleInterop": true, "moduleResolution": "Node", "forceConsistentCasingInFileNames": true, - "stripInternal": true + "stripInternal": true, } }