Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: custom websocket support #1696

Merged
merged 1 commit into from
Sep 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,28 @@ use the latest auth token, you must have some outside mechanism running that
handles application-level authentication refreshing so that the websocket
connection can simply grab the latest valid token or signed url.

#### Customize Websockets with `createWebsocket` (Websocket Only)

When you need to add a custom websocket subprotocol or header to open a connection
through a proxy with custom authentication this callback allows you to create your own
instance of a websocket which will be used in the mqtt client.

```js
const createWebsocket = createWebsocket(url, websocketSubProtocols, options) => {
const subProtocols = [
websocketSubProtocols[0],
'myCustomSubprotocolOrOAuthToken',
]
return new WebSocket(url, subProtocols)
}

const connection = await mqtt.connectAsync(<wss url>, {
...,
createWebsocket: createWebsocket,
});
```


#### Enabling Reconnection with `reconnectPeriod` option

To ensure that the mqtt client automatically tries to reconnect when the
Expand Down Expand Up @@ -429,6 +451,8 @@ The arguments are:
- `transformWsUrl` : optional `(url, options, client) => url` function
For ws/wss protocols only. Can be used to implement signing
urls which upon reconnect can have become expired.
- `createWebsocket` : optional `url, websocketSubProtocols, options) => Websocket` function
For ws/wss protocols only. Can be used to implement a custom websocket subprotocol or implementation.
- `resubscribe` : if connection is broken and reconnects,
subscribed topics are automatically subscribed again (default `true`)
- `messageIdProvider`: custom messageId provider. when `new UniqueMessageIdProvider()` is set, then non conflict messageId is provided.
Expand Down
7 changes: 7 additions & 0 deletions src/lib/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,13 @@ export interface IClientOptions extends ISecureClientOptions {
client: MqttClient,
) => string

/** when defined this function will be called to create the Websocket instance, used to add custom protocols or websocket implementations */
createWebsocket?: (
url: string,
websocketSubProtocols: string[],
options: IClientOptions,
) => any

/** Custom message id provider */
messageIdProvider?: IMessageIdProvider

Expand Down
22 changes: 16 additions & 6 deletions src/lib/connect/ws.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,16 @@ function createWebSocket(
debug(
`creating new Websocket for url: ${url} and protocol: ${websocketSubProtocol}`,
)
const socket = new WS(
url,
[websocketSubProtocol],
opts.wsOptions as ClientOptions,
)
let socket: WS
if (opts.createWebsocket) {
socket = opts.createWebsocket(url, [websocketSubProtocol], opts)
} else {
socket = new WS(
url,
[websocketSubProtocol],
opts.wsOptions as ClientOptions,
)
}
return socket
}

Expand All @@ -123,7 +128,12 @@ function createBrowserWebSocket(client: MqttClient, opts: IClientOptions) {
: 'mqtt'

const url = buildUrl(opts, client)
const socket = new WebSocket(url, [websocketSubProtocol])
let socket: WebSocket
if (opts.createWebsocket) {
socket = opts.createWebsocket(url, [websocketSubProtocol], opts)
} else {
socket = new WebSocket(url, [websocketSubProtocol])
}
socket.binaryType = 'arraybuffer'
return socket
}
Expand Down
34 changes: 33 additions & 1 deletion test/websocket_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,14 @@ import { IClientOptions } from '../src/lib/client'

const port = 9999
const httpServer = http.createServer()

let lastProcotols = new Set<string>()
function attachWebsocketServer(httpServer2) {
const webSocketServer = new WebSocket.Server({
server: httpServer2,
handleProtocols: (protocols: Set<string>, request: any) => {
lastProcotols = protocols
return [...protocols][0]
},
perMessageDeflate: false,
})

Expand Down Expand Up @@ -132,6 +136,34 @@ describe('Websocket Client', () => {
})
})

it('should be able to create custom Websocket instance', function test(done) {
const baseUrl = 'ws://localhost:9999/mqtt'
let urlInCallback: string
const opts = makeOptions({
path: '/mqtt',
createWebsocket(
url: string,
websocketSubProtocols: string[],
options: IClientOptions,
) {
urlInCallback = url
assert.equal(url, baseUrl)
const subProtocols = [
websocketSubProtocols[0],
'myCustomSubprotocol',
]
return new WebSocket(url, subProtocols)
},
})
const client = mqtt.connect(opts)
client.on('connect', () => {
assert.equal((client.stream as any).url, urlInCallback)
assert.equal(baseUrl, urlInCallback)
assert.equal('myCustomSubprotocol', [...lastProcotols][1])
client.end(true, (err) => done(err))
})
})

it('should use mqttv3.1 as the protocol if using v3.1', function test(done) {
httpServer.once('client', (client) => {
assert.strictEqual(client.protocol, 'mqttv3.1')
Expand Down