Skip to content

Commit

Permalink
fix(websocket): browser in ws (#1145)
Browse files Browse the repository at this point in the history
  • Loading branch information
Yoseph Maguire authored Aug 24, 2020
1 parent abc7339 commit 40177ca
Show file tree
Hide file tree
Showing 4 changed files with 196 additions and 39 deletions.
2 changes: 1 addition & 1 deletion examples/ws/client.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
'use strict'

var mqtt = require('../../types')
var mqtt = require('../../')

var clientId = 'mqttjs_' + Math.random().toString(16).substr(2, 8)

Expand Down
225 changes: 191 additions & 34 deletions lib/connect/ws.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
'use strict'

var WebSocket = require('ws')
var debug = require('debug')('mqttjs:ws')
var urlModule = require('url')
var WSS_OPTIONS = [
const WS = require('ws')
const debug = require('debug')('mqttjs:ws')
const duplexify = require('duplexify')
const Buffer = require('safe-buffer').Buffer
const urlModule = require('url')
const Transform = require('readable-stream').Transform

let WSS_OPTIONS = [
'rejectUnauthorized',
'ca',
'cert',
Expand All @@ -12,85 +16,238 @@ var WSS_OPTIONS = [
'passphrase'
]
// eslint-disable-next-line camelcase
var IS_BROWSER = (typeof process !== 'undefined' && process.title === 'browser') || typeof __webpack_require__ === 'function'
const IS_BROWSER = (typeof process !== 'undefined' && process.title === 'browser') || typeof __webpack_require__ === 'function'
function buildUrl (opts, client) {
var url = opts.protocol + '://' + opts.hostname + ':' + opts.port + opts.path
let url = opts.protocol + '://' + opts.hostname + ':' + opts.port + opts.path
if (typeof (opts.transformWsUrl) === 'function') {
url = opts.transformWsUrl(url, opts, client)
}
return url
}

function setDefaultOpts (opts) {
let options = opts
if (!opts.hostname) {
opts.hostname = 'localhost'
options.hostname = 'localhost'
}
if (!opts.port) {
if (opts.protocol === 'wss') {
opts.port = 443
options.port = 443
} else {
opts.port = 80
options.port = 80
}
}
if (!opts.path) {
opts.path = '/'
options.path = '/'
}

if (!opts.wsOptions) {
opts.wsOptions = {}
options.wsOptions = {}
}
if (!IS_BROWSER && opts.protocol === 'wss') {
// Add cert/key/ca etc options
WSS_OPTIONS.forEach(function (prop) {
if (opts.hasOwnProperty(prop) && !opts.wsOptions.hasOwnProperty(prop)) {
opts.wsOptions[prop] = opts[prop]
options.wsOptions[prop] = opts[prop]
}
})
}

return options
}

function createWebSocket (client, opts) {
function setDefaultBrowserOpts (opts) {
let options = setDefaultOpts(opts)

if (!options.hostname) {
options.hostname = options.host
}

if (!options.hostname) {
// Throwing an error in a Web Worker if no `hostname` is given, because we
// can not determine the `hostname` automatically. If connecting to
// localhost, please supply the `hostname` as an argument.
if (typeof (document) === 'undefined') {
throw new Error('Could not determine host. Specify host manually.')
}
const parsed = urlModule.parse(document.URL)
options.hostname = parsed.hostname

if (!options.port) {
options.port = parsed.port
}
}

// objectMode should be defined for logic
if (options.objectMode === undefined) {
options.objectMode = !(options.binary === true || options.binary === undefined)
}

return options
}

function createWebSocket (client, url, opts) {
debug('createWebSocket')
debug('protocol: ' + opts.protocolId + ' ' + opts.protocolVersion)
var websocketSubProtocol =
const websocketSubProtocol =
(opts.protocolId === 'MQIsdp') && (opts.protocolVersion === 3)
? 'mqttv3.1'
: 'mqtt'

setDefaultOpts(opts)
var url = buildUrl(opts, client)
debug('creating new Websocket for url: ' + url + ' and protocol: ' + websocketSubProtocol)
var ws = new WebSocket(url, [websocketSubProtocol], opts.wsOptions)
var duplex = WebSocket.createWebSocketStream(ws, opts.wsOptions)
duplex.url = url
return duplex
let socket = new WS(url, [websocketSubProtocol], opts.wsOptions)
return socket
}

function createBrowserWebSocket (client, opts) {
const websocketSubProtocol =
(opts.protocolId === 'MQIsdp') && (opts.protocolVersion === 3)
? 'mqttv3.1'
: 'mqtt'

let url = buildUrl(opts, client)
/* global WebSocket */
let socket = new WebSocket(url, [websocketSubProtocol])
socket.binaryType = 'arraybuffer'
return socket
}

function streamBuilder (client, opts) {
return createWebSocket(client, opts)
debug('streamBuilder')
let options = setDefaultOpts(opts)
const url = buildUrl(options, client)
let socket = createWebSocket(client, url, options)
let webSocketStream = WS.createWebSocketStream(socket, options.wsOptions)
webSocketStream.url = url
return webSocketStream
}

function browserStreamBuilder (client, opts) {
debug('browserStreamBuilder')
if (!opts.hostname) {
opts.hostname = opts.host
let stream
let options = setDefaultBrowserOpts(opts)
// sets the maximum socket buffer size before throttling
const bufferSize = options.browserBufferSize || 1024 * 512

const bufferTimeout = opts.browserBufferTimeout || 1000

const coerceToBuffer = !opts.objectMode

let socket = createBrowserWebSocket(client, opts)

let proxy = buildProxy(opts, socketWriteBrowser, socketEndBrowser)

if (!opts.objectMode) {
proxy._writev = writev
}
proxy.on('close', () => { socket.close() })

if (!opts.hostname) {
// Throwing an error in a Web Worker if no `hostname` is given, because we
// can not determine the `hostname` automatically. If connecting to
// localhost, please supply the `hostname` as an argument.
if (typeof (document) === 'undefined') {
throw new Error('Could not determine host. Specify host manually.')
const eventListenerSupport = (typeof socket.addEventListener === 'undefined')

// was already open when passed in
if (socket.readyState === socket.OPEN) {
stream = proxy
} else {
stream = stream = duplexify(undefined, undefined, opts)
if (!opts.objectMode) {
stream._writev = writev
}
var parsed = urlModule.parse(document.URL)
opts.hostname = parsed.hostname

if (!opts.port) {
opts.port = parsed.port
if (eventListenerSupport) {
socket.addEventListener('open', onopen)
} else {
socket.onopen = onopen
}
}
return createWebSocket(client, opts)

stream.socket = socket

if (eventListenerSupport) {
socket.addEventListener('close', onclose)
socket.addEventListener('error', onerror)
socket.addEventListener('message', onmessage)
} else {
socket.onclose = onclose
socket.onerror = onerror
socket.onmessage = onmessage
}

// methods for browserStreamBuilder

function buildProxy (options, socketWrite, socketEnd) {
let proxy = new Transform({
objectModeMode: options.objectMode
})

proxy._write = socketWrite
proxy._flush = socketEnd

return proxy
}

function onopen () {
stream.setReadable(proxy)
stream.setWritable(proxy)
stream.emit('connect')
}

function onclose () {
stream.end()
stream.destroy()
}

function onerror (err) {
stream.destroy(err)
}

function onmessage (event) {
let data = event.data
if (data instanceof ArrayBuffer) data = Buffer.from(data)
else data = Buffer.from(data, 'utf8')
proxy.push(data)
}

// this is to be enabled only if objectMode is false
function writev (chunks, cb) {
const buffers = new Array(chunks.length)
for (let i = 0; i < chunks.length; i++) {
if (typeof chunks[i].chunk === 'string') {
buffers[i] = Buffer.from(chunks[i], 'utf8')
} else {
buffers[i] = chunks[i].chunk
}
}

this._write(Buffer.concat(buffers), 'binary', cb)
}

function socketWriteBrowser (chunk, enc, next) {
if (socket.bufferedAmount > bufferSize) {
// throttle data until buffered amount is reduced.
setTimeout(socketWriteBrowser, bufferTimeout, chunk, enc, next)
}

if (coerceToBuffer && typeof chunk === 'string') {
chunk = Buffer.from(chunk, 'utf8')
}

try {
socket.send(chunk)
} catch (err) {
return next(err)
}

next()
}

function socketEndBrowser (done) {
socket.close()
done()
}

// end methods for browserStreamBuilder

return stream
}

if (IS_BROWSER) {
Expand Down
6 changes: 3 additions & 3 deletions test/browser/server.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
'use strict'

var handleClient
var websocket = require('websocket-stream')
var WebSocketServer = require('ws').Server
var WS = require('ws')
var WebSocketServer = WS.Server
var Connection = require('mqtt-connection')
var http = require('http')

Expand Down Expand Up @@ -109,7 +109,7 @@ function start (startPort, done) {
return ws.close()
}

stream = websocket(ws)
stream = WS.createWebSocketStream(ws)
connection = new Connection(stream)
handleClient.call(server, connection)
})
Expand Down
2 changes: 1 addition & 1 deletion test/websocket_client.js
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ describe('Websocket Client', function () {
})
})

it('should be able transform the url (for e.g. to sign it)', function (done) {
it('should be able to transform the url (for e.g. to sign it)', function (done) {
var baseUrl = 'ws://localhost:9999/mqtt'
var sig = '?AUTH=token'
var expected = baseUrl + sig
Expand Down

0 comments on commit 40177ca

Please sign in to comment.