Skip to content

Commit

Permalink
fix: Add Web Workers as a way to run health check (#431)
Browse files Browse the repository at this point in the history
Added capacity to use Worker for heart beats. We also allow users to override the worker URL so they are able to modify it as they see fit.
  • Loading branch information
filipecabaco authored Oct 8, 2024
1 parent 176ccab commit 82e9966
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 11 deletions.
10 changes: 9 additions & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
"sinon": "^18.0.0",
"typedoc": "^0.22.16",
"typescript": "^4.0.3",
"vitest": "^2.0.5"
"vitest": "^2.0.5",
"web-worker": "1.2.0"
}
}
70 changes: 63 additions & 7 deletions src/RealtimeClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ export type RealtimeClientOptions = {
params?: { [key: string]: any }
log_level?: 'info' | 'debug' | 'warn' | 'error'
fetch?: Fetch
worker?: boolean
workerUrl?: string
}

export type RealtimeMessage = {
Expand Down Expand Up @@ -69,7 +71,12 @@ interface WebSocketLikeError {
}

const NATIVE_WEBSOCKET_AVAILABLE = typeof WebSocket !== 'undefined'

const WORKER_SCRIPT = `
addEventListener("message", (e) => {
if (e.data.event === "start") {
setInterval(() => postMessage({ event: "keepAlive" }), e.data.interval);
}
});`
export default class RealtimeClient {
accessToken: string | null = null
apiKey: string | null = null
Expand Down Expand Up @@ -104,6 +111,9 @@ export default class RealtimeClient {
message: [],
}
fetch: Fetch
worker?: boolean
workerUrl?: string
workerRef?: Worker

/**
* Initializes the Socket.
Expand All @@ -119,6 +129,8 @@ export default class RealtimeClient {
* @param options.encode The function to encode outgoing messages. Defaults to JSON: (payload, callback) => callback(JSON.stringify(payload))
* @param options.decode The function to decode incoming messages. Defaults to Serializer's decode.
* @param options.reconnectAfterMs he optional function that returns the millsec reconnect interval. Defaults to stepped backoff off.
* @param options.worker Use Web Worker to set a side flow. Defaults to false.
* @param options.workerUrl The URL of the worker script. Defaults to https://realtime.supabase.com/worker.js that includes a heartbeat event call to keep the connection alive.
*/
constructor(endPoint: string, options?: RealtimeClientOptions) {
this.endPoint = `${endPoint}/${TRANSPORTS.websocket}`
Expand Down Expand Up @@ -160,6 +172,13 @@ export default class RealtimeClient {
}, this.reconnectAfterMs)

this.fetch = this._resolveFetch(options?.fetch)
if (options?.worker) {
if (typeof window !== 'undefined' && !window.Worker) {
throw new Error('Web Worker is not supported')
}
this.worker = options?.worker || false
this.workerUrl = options?.workerUrl
}
}

/**
Expand Down Expand Up @@ -448,19 +467,45 @@ export default class RealtimeClient {
}

/** @internal */
private _onConnOpen() {
private async _onConnOpen() {
this.log('transport', `connected to ${this._endPointURL()}`)
this._flushSendBuffer()
this.reconnectTimer.reset()
this.heartbeatTimer && clearInterval(this.heartbeatTimer)
this.heartbeatTimer = setInterval(
() => this._sendHeartbeat(),
this.heartbeatIntervalMs
)
if (!this.worker) {
this.heartbeatTimer && clearInterval(this.heartbeatTimer)
this.heartbeatTimer = setInterval(
() => this._sendHeartbeat(),
this.heartbeatIntervalMs
)
} else {
if (this.workerUrl) {
this.log('worker', `starting worker for from ${this.workerUrl}`)
} else {
this.log('worker', `starting default worker`)
}

const objectUrl = this._workerObjectUrl(this.workerUrl!)
this.workerRef = new Worker(objectUrl)
this.workerRef.onerror = (error) => {
this.log('worker', 'worker error', error.message)
this.workerRef!.terminate()
}
this.workerRef.onmessage = (event) => {
if (event.data.event === 'keepAlive') {
this._sendHeartbeat()
}
}
this.workerRef.postMessage({
event: 'start',
interval: this.heartbeatIntervalMs,
})
}

this.stateChangeCallbacks.open.forEach((callback) => callback())!
}

/** @internal */

private _onConnClose(event: any) {
this.log('transport', 'close', event)
this._triggerChanError()
Expand Down Expand Up @@ -527,6 +572,17 @@ export default class RealtimeClient {
})
this.setAuth(this.accessToken)
}

private _workerObjectUrl(url: string | undefined): string {
let result_url: string
if (url) {
result_url = url
} else {
const blob = new Blob([WORKER_SCRIPT], { type: 'application/javascript' })
result_url = URL.createObjectURL(blob)
}
return result_url
}
}

class WSWebSocketDummy {
Expand Down
42 changes: 41 additions & 1 deletion test/channel.test.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,19 @@
import assert from 'assert'
import sinon from 'sinon'
import { describe, beforeEach, afterEach, test } from 'vitest'
import {
describe,
beforeEach,
afterEach,
test,
beforeAll,
afterAll,
} from 'vitest'

import RealtimeClient from '../src/RealtimeClient'
import RealtimeChannel from '../src/RealtimeChannel'
import { Response } from '@supabase/node-fetch'
import { WebSocketServer } from 'ws'
import Worker from 'web-worker'

let channel, socket
const defaultRef = '1'
Expand Down Expand Up @@ -1416,3 +1425,34 @@ describe('trigger', () => {
assert.equal(client.accessToken, '123')
})
})

describe('worker', () => {
let client
let mockServer

beforeAll(() => {
window.Worker = Worker
mockServer = new WebSocketServer({ port: 8080 })
})

afterAll(() => {
window.Worker = undefined
mockServer.close()
})

beforeEach(() => {
client = new RealtimeClient('ws://localhost:8080/socket', {
worker: true,
workerUrl: 'https://realtime.supabase.com/worker.js',
heartbeatIntervalMs: 10,
})
})

test('sets worker flag', () => {
assert.ok(client.worker)
})

test('sets worker URL', () => {
assert.equal(client.workerUrl, 'https://realtime.supabase.com/worker.js')
})
})
2 changes: 1 addition & 1 deletion tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,6 @@
"esModuleInterop": true,
"moduleResolution": "Node",
"forceConsistentCasingInFileNames": true,
"stripInternal": true
"stripInternal": true,
}
}

0 comments on commit 82e9966

Please sign in to comment.