diff --git a/.DS_Store b/.DS_Store new file mode 100644 index 0000000..4c6598f Binary files /dev/null and b/.DS_Store differ diff --git a/.eslintrc.json b/.eslintrc.json deleted file mode 100644 index e3578aa..0000000 --- a/.eslintrc.json +++ /dev/null @@ -1,3 +0,0 @@ -{ - "extends": "standard" -} diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..b625466 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,12 @@ +{ + "editor.codeActionsOnSave": { + "source.organizeImports.biome": "explicit", + "quickfix.biome": "explicit" + }, + "[javascript]": { + "editor.defaultFormatter": "biomejs.biome" + }, + "[json]": { + "editor.defaultFormatter": "biomejs.biome" + } +} diff --git a/biome.json b/biome.json new file mode 100644 index 0000000..f1637b0 --- /dev/null +++ b/biome.json @@ -0,0 +1,94 @@ +{ + "$schema": "https://biomejs.dev/schemas/1.7.1/schema.json", + "files": { + "ignore": ["tests/fastify/**", "types/**", "dist/**"] + }, + "organizeImports": { "enabled": true }, + "formatter": { + "enabled": true + }, + "linter": { + "ignore": ["example.js"], + "enabled": true, + "rules": { + "recommended": false, + "complexity": { + "noExtraBooleanCast": "error", + "noMultipleSpacesInRegularExpressionLiterals": "error", + "noUselessCatch": "error", + "noUselessConstructor": "error", + "noUselessLoneBlockStatements": "error", + "noUselessRename": "error", + "noUselessTernary": "error", + "noVoid": "error", + "noWith": "error", + "useLiteralKeys": "error", + "useRegexLiterals": "error" + }, + "correctness": { + "noConstAssign": "error", + "noConstantCondition": "error", + "noEmptyCharacterClassInRegex": "error", + "noEmptyPattern": "error", + "noGlobalObjectCalls": "error", + "noInvalidConstructorSuper": "error", + "noInvalidUseBeforeDeclaration": "error", + "noNewSymbol": "error", + "noPrecisionLoss": "error", + "noSelfAssign": "error", + "noSwitchDeclarations": "error", + "noUndeclaredVariables": "error", + "noUnreachable": "error", + "noUnreachableSuper": "error", + "noUnsafeFinally": "error", + "noUnusedVariables": "error", + "useIsNan": "error" + }, + "security": { "noGlobalEval": "error" }, + "style": { + "noCommaOperator": "error", + "noVar": "warn", + "useBlockStatements": "off", + "useConst": "error", + "useSingleVarDeclarator": "error" + }, + "suspicious": { + "noAssignInExpressions": "error", + "noAsyncPromiseExecutor": "error", + "noCatchAssign": "error", + "noClassAssign": "error", + "noCompareNegZero": "error", + "noConfusingLabels": "error", + "noControlCharactersInRegex": "error", + "noDebugger": "error", + "noDoubleEquals": "error", + "noDuplicateCase": "error", + "noDuplicateClassMembers": "error", + "noDuplicateObjectKeys": "error", + "noDuplicateParameters": "error", + "noEmptyBlockStatements": "error", + "noFallthroughSwitchClause": "error", + "noFunctionAssign": "error", + "noGlobalAssign": "error", + "noImportAssign": "error", + "noMisleadingCharacterClass": "error", + "noPrototypeBuiltins": "error", + "noRedeclare": "error", + "noSelfCompare": "error", + "noShadowRestrictedNames": "error", + "noUnsafeNegation": "error", + "useDefaultSwitchClauseLast": "error", + "useValidTypeof": "error" + } + } + }, + "javascript": { + "globals": ["document", "navigator", "window"], + "formatter": { + "bracketSpacing": true, + "indentStyle": "space", + "quoteStyle": "single", + "semicolons": "asNeeded" + } + } +} diff --git a/example.js b/example.js index 6b1c3e3..a055bad 100644 --- a/example.js +++ b/example.js @@ -1,25 +1,24 @@ import fastify from 'fastify' -import { serverFactory, getUws, WebSocketStream } from './src/server.js' +import { WebSocketStream, getUws, serverFactory } from './src/server.js' import fastifyUwsPlugin from './src/plugin.js' const app = fastify({ - serverFactory + serverFactory, }) await app.register(fastifyUwsPlugin) app.addHook('onReady', async () => { // access to uws app - // eslint-disable-next-line no-unused-vars const uwsApp = getUws(app) }) -app.websocketServer.on('open', ws => { +app.websocketServer.on('open', (ws) => { console.log('OPEN') }) -app.websocketServer.on('close', ws => { +app.websocketServer.on('close', (ws) => { console.log('CLOSE') }) @@ -27,32 +26,32 @@ app .route({ method: 'GET', url: '/', - handler (req, reply) { + handler(req, reply) { return 'hello from http endpoint' }, uws: { // cache subscription topics to produce less memory allocations - topics: [ - 'home/sensors/ligth', - 'home/sensors/temp' - ] + topics: ['home/sensors/ligth', 'home/sensors/temp'], }, - uwsHandler (conn) { + uwsHandler(conn) { conn.subscribe('home/sensors/temp') conn.on('message', (message) => { conn.publish('home/sensors/temp', 'random message') }) conn.send(JSON.stringify({ hello: 'world' })) - } + }, }) .get('/stream', { uws: true }, (conn) => { const stream = new WebSocketStream(conn) - stream.on('data', data => { + stream.on('data', (data) => { console.log('stream data from /stream') }) }) - .listen({ - port: 3000 - }, (err) => { - err && console.error(err) - }) + .listen( + { + port: 3000, + }, + (err) => { + err && console.error(err) + }, + ) diff --git a/package.json b/package.json index 12faefd..71c0ba4 100644 --- a/package.json +++ b/package.json @@ -1,93 +1,80 @@ { - "name": "@geut/fastify-uws", - "version": "3.1.1", - "description": "uWebSockets.js for fastify", - "type": "module", - "exports": { - ".": { - "types": "./types/server.d.ts", - "import": "./src/server.js", - "require": "./dist/server.cjs" - }, - "./plugin": { - "types": "./types/plugin.d.ts", - "import": "./src/plugin.js", - "require": "./dist/plugin.cjs" - }, - "./package.json": "./package.json" - }, - "workspaces": [ - "tests/fastify/module" - ], - "files": [ - "types", - "dist", - "src" - ], - "scripts": { - "start": "node index.js", - "build": "rm -rf dist && tsup src/server.js src/plugin.js --splitting", - "test": "uvu --ignore tests/fastify", - "posttest": "npm run lint && tsc", - "lint": "standard", - "prepublishOnly": "npm test && npm run build && npm run types", - "types": "node scripts/generate-dts.js" - }, - "dependencies": { - "fastify-plugin": "^4.5.1", - "fastq": "^1.13.0", - "ipaddr.js": "^2.0.1", - "nanoerror": "^2.0.0", - "streamx": "^2.12.5", - "tempy": "^1.0.1", - "uWebSockets.js": "github:uNetworking/uWebSockets.js#v20.42.0" - }, - "devDependencies": { - "@types/events": "^3.0.2", - "@types/node": "^20.8.10", - "@types/streamx": "^2.9.3", - "execa": "^8.0.1", - "fastify": "^4.24.3", - "require-inject": "^1.4.4", - "simple-get": "^4.0.1", - "standard": "^17.0.0", - "tap": "^16.3.0", - "tsup": "^7.2.0", - "typescript": "^5.2.2", - "uvu": "^0.5.3", - "ws": "^8.9.0" - }, - "standard": { - "env": [ - "node", - "browser" - ], - "ignore": [ - "tests/fastify/**" - ] - }, - "repository": { - "type": "git", - "url": "git+https://github.com/geut/fastify-uws.git" - }, - "keywords": [ - "fastify", - "uWebSockets.js", - "fastify-plugin" - ], - "author": { - "name": "GEUT", - "email": "contact@geutstudio.com" - }, - "license": "MIT", - "bugs": { - "url": "https://github.com/geut/fastify-uws/issues" - }, - "homepage": "https://github.com/geut/fastify-uws#readme", - "publishConfig": { - "access": "public" - }, - "engines": { - "node": ">=18" - } + "name": "@geut/fastify-uws", + "version": "3.1.1", + "description": "uWebSockets.js for fastify", + "type": "module", + "exports": { + ".": { + "types": "./types/server.d.ts", + "import": "./src/server.js", + "require": "./dist/server.cjs" + }, + "./plugin": { + "types": "./types/plugin.d.ts", + "import": "./src/plugin.js", + "require": "./dist/plugin.cjs" + }, + "./package.json": "./package.json" + }, + "workspaces": ["tests/fastify/module"], + "files": ["types", "dist", "src"], + "scripts": { + "start": "node index.js", + "build": "rm -rf dist && tsup src/server.js src/plugin.js --splitting", + "test": "uvu --ignore tests/fastify", + "posttest": "npm run lint && tsc", + "lint": "biome check .", + "lint:fix": "biome check --apply .", + "prepublishOnly": "npm test && npm run build && npm run types", + "types": "node scripts/generate-dts.js" + }, + "dependencies": { + "fastify-plugin": "^4.5.1", + "fastq": "^1.13.0", + "ipaddr.js": "^2.0.1", + "nanoerror": "^2.0.0", + "streamx": "^2.12.5", + "tempy": "^1.0.1", + "uWebSockets.js": "github:uNetworking/uWebSockets.js#v20.42.0" + }, + "devDependencies": { + "@biomejs/biome": "^1.7.3", + "@fastify/static": "^7.0.3", + "@types/events": "^3.0.2", + "@types/node": "^20.8.10", + "@types/streamx": "^2.9.3", + "execa": "^8.0.1", + "fastify": "^4.24.3", + "require-inject": "^1.4.4", + "simple-get": "^4.0.1", + "tap": "^16.3.0", + "tsup": "^7.2.0", + "typescript": "^5.2.2", + "uvu": "^0.5.3", + "ws": "^8.9.0" + }, + "standard": { + "env": ["node", "browser"], + "ignore": ["tests/fastify/**"] + }, + "repository": { + "type": "git", + "url": "git+https://github.com/geut/fastify-uws.git" + }, + "keywords": ["fastify", "uWebSockets.js", "fastify-plugin"], + "author": { + "name": "GEUT", + "email": "contact@geutstudio.com" + }, + "license": "MIT", + "bugs": { + "url": "https://github.com/geut/fastify-uws/issues" + }, + "homepage": "https://github.com/geut/fastify-uws#readme", + "publishConfig": { + "access": "public" + }, + "engines": { + "node": ">=18" + } } diff --git a/scripts/generate-dts.js b/scripts/generate-dts.js index 7fe8b22..97db0c5 100644 --- a/scripts/generate-dts.js +++ b/scripts/generate-dts.js @@ -6,5 +6,5 @@ await $`tsc src/server.js src/plugin.js --declaration --allowJs --emitDeclaratio const types = fs.readFileSync('types/plugin.d.ts', 'utf-8') fs.writeFileSync( 'types/plugin.d.ts', - types + 'import "./fastify-overload.d.ts"' + types + 'import "./fastify-overload.d.ts"', ) diff --git a/src/errors.js b/src/errors.js index ed361f0..79f44e4 100644 --- a/src/errors.js +++ b/src/errors.js @@ -1,9 +1,30 @@ import nanoerror from 'nanoerror' -export const ERR_HEAD_SET = nanoerror('ERR_HEAD_SET', 'Cannot set headers after they are sent to the client') -export const ERR_ADDRINUSE = nanoerror('EADDRINUSE', 'listen EADDRINUSE: address already in use %s:%s') -export const ERR_UPGRADE = nanoerror('ERR_UPGRADE', 'Cannot upgrade to WebSocket protocol %o') -export const ERR_STREAM_DESTROYED = nanoerror('ERR_STREAM_DESTROYED', 'Stream destroyed') -export const ERR_UWS_APP_NOT_FOUND = nanoerror('ERR_UWS_APP_NOT_FOUND', 'uWebSockets app not found') -export const ERR_ENOTFOUND = nanoerror('ERR_ENOTFOUND', 'getaddrinfo ENOTFOUND %s') -export const ERR_SOCKET_BAD_PORT = nanoerror('ERR_SOCKET_BAD_PORT', 'RangeError [ERR_SOCKET_BAD_PORT]: options.port should be >= 0 and < 65536. Received (%s)') +export const ERR_HEAD_SET = nanoerror( + 'ERR_HEAD_SET', + 'Cannot set headers after they are sent to the client', +) +export const ERR_ADDRINUSE = nanoerror( + 'EADDRINUSE', + 'listen EADDRINUSE: address already in use %s:%s', +) +export const ERR_UPGRADE = nanoerror( + 'ERR_UPGRADE', + 'Cannot upgrade to WebSocket protocol %o', +) +export const ERR_STREAM_DESTROYED = nanoerror( + 'ERR_STREAM_DESTROYED', + 'Stream destroyed', +) +export const ERR_UWS_APP_NOT_FOUND = nanoerror( + 'ERR_UWS_APP_NOT_FOUND', + 'uWebSockets app not found', +) +export const ERR_ENOTFOUND = nanoerror( + 'ERR_ENOTFOUND', + 'getaddrinfo ENOTFOUND %s', +) +export const ERR_SOCKET_BAD_PORT = nanoerror( + 'ERR_SOCKET_BAD_PORT', + 'RangeError [ERR_SOCKET_BAD_PORT]: options.port should be >= 0 and < 65536. Received (%s)', +) diff --git a/src/http-socket.js b/src/http-socket.js index bbe9107..1a47e5c 100644 --- a/src/http-socket.js +++ b/src/http-socket.js @@ -2,76 +2,55 @@ import EventEmitter from 'events' import fastq from 'fastq' import { - kRes, - kHttps, - kServer, kAddress, - kRemoteAdress, kEncoding, - kTimeoutRef, + kHead, + kHttps, + kQueue, kReadyState, + kRemoteAdress, + kRes, + kServer, + kTimeoutRef, + kUwsRemoteAddress, kWriteOnly, kWs, - kUwsRemoteAddress, - kQueue, - kHead } from './symbols.js' import { ERR_STREAM_DESTROYED } from './errors.js' -const localAddressIpv6 = Buffer.from([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1]) +const localAddressIpv6 = Buffer.from([ + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, +]) const toHex = (buf, start, end) => buf.slice(start, end).toString('hex') +// biome-ignore lint/suspicious/noEmptyBlockStatements: noop const noop = () => {} /** * @this {HTTPSocket} */ -function onAbort () { +function onAbort() { this.aborted = true this.emit('aborted') this.errored && this.emit('error', this.errored) this.emit('close') } -function onDrain () { - this.emit('drain') +function onDrain(offset) { + this.emit('drain', offset) return true } -function onTimeout () { +function onTimeout() { if (!this.destroyed) { this.emit('timeout') this.abort() } } -/** - * @this {HTTPSocket} - */ -function onWrite (data, cb) { - const res = this[kRes] - - this[kReadyState].write = true - - res.cork(() => { - if (this[kHead]) { - writeHead(res, this[kHead]) - this[kHead] = null - } - - const drained = res.write(getChunk(data)) - if (drained) { - this.bytesWritten += byteLength(data) - return cb() - } - - drain(this, res, data, cb) - }) -} - -function end (socket, data) { +function end(socket, data) { socket._clearTimeout() const res = socket[kRes] @@ -88,7 +67,7 @@ function end (socket, data) { }) } -function drain (socket, res, data, cb) { +function drain(socket, cb) { socket.writableNeedDrain = true let done = false @@ -102,24 +81,17 @@ function drain (socket, res, data, cb) { const onDrain = () => { if (done) return - - res.cork(() => { - done = res.write(getChunk(data)) - if (done) { - socket.writableNeedDrain = false - socket.bytesWritten += byteLength(data) - socket.removeListener('close', onClose) - socket.removeListener('drain', onDrain) - cb() - } - }) + socket.writableNeedDrain = false + socket.removeListener('close', onClose) + socket.removeListener('drain', onDrain) + cb() } socket.on('drain', onDrain) socket.once('close', onClose) } -function writeHead (res, head) { +function writeHead(res, head) { if (head.status) res.writeStatus(head.status) if (head.headers) { for (const header of head.headers.values()) { @@ -128,18 +100,19 @@ function writeHead (res, head) { } } -function byteLength (data) { - if (data.byteLength !== undefined) return data.byteLength +function byteLength(data) { + if (data?.empty) return 0 + if (data?.byteLength !== undefined) return data.byteLength return Buffer.byteLength(data) } -function getChunk (data) { - if (data.chunk) return data.chunk +function getChunk(data) { + if (data?.chunk) return data.chunk return data } export class HTTPSocket extends EventEmitter { - constructor (server, res, writeOnly) { + constructor(server, res, writeOnly) { super() this.aborted = false @@ -153,7 +126,7 @@ export class HTTPSocket extends EventEmitter { this[kWriteOnly] = writeOnly this[kReadyState] = { read: false, - write: false + write: false, } this[kEncoding] = null this[kRemoteAdress] = null @@ -169,7 +142,7 @@ export class HTTPSocket extends EventEmitter { } } - get readyState () { + get readyState() { const state = this[kReadyState] if (state.read && !state.write) return 'readOnly' if (!state.read && state.write) return 'writeOnly' @@ -177,19 +150,19 @@ export class HTTPSocket extends EventEmitter { return 'opening' } - get writable () { + get writable() { return true } - get readable () { + get readable() { return true } - get encrypted () { + get encrypted() { return !!this[kServer][kHttps] } - get remoteAddress () { + get remoteAddress() { let remoteAddress = this[kRemoteAdress] if (remoteAddress) return remoteAddress @@ -199,13 +172,23 @@ export class HTTPSocket extends EventEmitter { } if (buf.length === 4) { - remoteAddress = `${buf.readUInt8(0)}.${buf.readUInt8(1)}.${buf.readUInt8(2)}.${buf.readUInt8(3)}` + remoteAddress = `${buf.readUInt8(0)}.${buf.readUInt8(1)}.${buf.readUInt8( + 2, + )}.${buf.readUInt8(3)}` } else { // avoid to call toHex if local if (buf.equals(localAddressIpv6)) { remoteAddress = '::1' } else { - remoteAddress = `${toHex(buf, 0, 2)}:${toHex(buf, 2, 4)}:${toHex(buf, 4, 6)}:${toHex(buf, 6, 8)}:${toHex(buf, 8, 10)}:${toHex(buf, 10, 12)}:${toHex(buf, 12, 14)}:${toHex(buf, 14)}` + remoteAddress = `${toHex(buf, 0, 2)}:${toHex(buf, 2, 4)}:${toHex( + buf, + 4, + 6, + )}:${toHex(buf, 6, 8)}:${toHex(buf, 8, 10)}:${toHex( + buf, + 10, + 12, + )}:${toHex(buf, 12, 14)}:${toHex(buf, 14)}` } } @@ -213,7 +196,7 @@ export class HTTPSocket extends EventEmitter { return remoteAddress } - get remoteFamily () { + get remoteFamily() { if (!this[kUwsRemoteAddress]) { this[kUwsRemoteAddress] = Buffer.from(this[kRes].getRemoteAddress()) } @@ -221,15 +204,15 @@ export class HTTPSocket extends EventEmitter { return this[kUwsRemoteAddress].length === 4 ? 'IPv4' : 'IPv6' } - get destroyed () { + get destroyed() { return this.writableEnded || this.aborted } - address () { + address() { return { ...this[kServer][kAddress] } } - abort () { + abort() { if (this.aborted) return this.aborted = true this[kQueue] && this[kQueue].kill() @@ -238,18 +221,18 @@ export class HTTPSocket extends EventEmitter { } } - setEncoding (encoding) { + setEncoding(encoding) { this[kEncoding] = encoding } - destroy (err) { + destroy(err) { if (this.aborted) return this._clearTimeout() this.errored = err this.abort() } - onRead (cb) { + onRead(cb) { if (this[kWriteOnly] || this.aborted) return cb(null, null) let done = false @@ -282,7 +265,7 @@ export class HTTPSocket extends EventEmitter { } } - end (data, _, cb = noop) { + end(data, _, cb = noop) { if (this.aborted) throw new ERR_STREAM_DESTROYED() if (!data) return this.abort() @@ -300,18 +283,37 @@ export class HTTPSocket extends EventEmitter { queue.push(data, cb) } - write (data, _, cb = noop) { + write(data, _, cb = noop) { if (this.destroyed) throw new ERR_STREAM_DESTROYED() if (!this[kQueue]) { - this[kQueue] = fastq(this, onWrite, 1) + this[kQueue] = fastq(this, this._onWrite, 1) } this[kQueue].push(data, cb) return !this.writableNeedDrain } - _clearTimeout () { + _clearTimeout() { this[kTimeoutRef] && clearTimeout(this[kTimeoutRef]) } + + _onWrite(data, cb) { + const res = this[kRes] + + this[kReadyState].write = true + + res.cork(() => { + if (this[kHead]) { + writeHead(res, this[kHead]) + this[kHead] = null + } + + const drained = res.write(getChunk(data)) + this.bytesWritten += byteLength(data) + + if (drained) return cb() + drain(this, cb) + }) + } } diff --git a/src/plugin.js b/src/plugin.js index f7a938f..abe269d 100644 --- a/src/plugin.js +++ b/src/plugin.js @@ -7,12 +7,9 @@ /** @typedef {import('./request.js').Request} Request */ import fp from 'fastify-plugin' -import { WebSocketServer, WebSocket } from './websocket-server.js' +import { WebSocket, WebSocketServer } from './websocket-server.js' -import { - kWs, - kRes -} from './symbols.js' +import { kRes, kWs } from './symbols.js' /** * @this {import('fastify').FastifyInstance} @@ -20,13 +17,14 @@ import { * @param {WebSocket} conn * @param {import('fastify').FastifyRequest} request */ -function defaultErrorHandler (err, conn, request) { +// biome-ignore lint/correctness/noUnusedVariables: ignore +function defaultErrorHandler(err, conn, request) { request.log.error(err) request.raw.destroy(err) } /** @type {FastifyPluginCallback<{ errorHandler?: typeof defaultErrorHandler } & WSOptions>} */ -function fastifyUws (fastify, opts, next) { +function fastifyUws(fastify, opts, next) { const { server } = fastify const { errorHandler = defaultErrorHandler, ...options } = opts || {} @@ -34,17 +32,20 @@ function fastifyUws (fastify, opts, next) { return next(new Error('invalid errorHandler function')) } - const websocketServer = server[kWs] = new WebSocketServer(options) + const websocketServer = new WebSocketServer(options) + server[kWs] = websocketServer fastify.decorate('websocketServer', websocketServer) - fastify.addHook('onRoute', routeOptions => { + fastify.addHook('onRoute', (routeOptions) => { const isWebSocket = !!routeOptions.uws || routeOptions.uwsHandler if (!isWebSocket || routeOptions.method !== 'GET') return - const wsOptions = typeof routeOptions.uws === 'object' ? routeOptions.uws : {} + const wsOptions = + typeof routeOptions.uws === 'object' ? routeOptions.uws : {} - let httpHandler, uwsHandler + let httpHandler + let uwsHandler if (routeOptions.uwsHandler) { httpHandler = routeOptions.handler uwsHandler = routeOptions.uwsHandler @@ -56,53 +57,59 @@ function fastifyUws (fastify, opts, next) { const topics = {} if (wsOptions.topics) { - wsOptions.topics.forEach(topic => { + wsOptions.topics.forEach((topic) => { topics[topic] = WebSocket.allocTopic(namespace, topic) }) } routeOptions.handler = function (request, reply) { - const requestRaw = /** @type {Request} */(/** @type {unknown} */(request.raw)) + const requestRaw = /** @type {Request} */ ( + /** @type {unknown} */ (request.raw) + ) if (requestRaw[kWs]) { reply.hijack() const uRes = requestRaw.socket[kRes] requestRaw.socket[kWs] = true if (requestRaw.socket.aborted || requestRaw.socket.destroyed) return - uRes.upgrade({ - req: requestRaw, - handler: (ws) => { - request.uws = true - - const conn = new WebSocket(namespace, ws, topics) - let result - try { - request.log.info('fastify-uws: websocket connection opened') - conn.once('close', () => { - request.log.info('fastify-uws: websocket connection closed') - }) - - requestRaw.once('error', () => { - conn.close() - }) - - requestRaw.once('close', () => { - conn.end() - }) - - result = uwsHandler.call(this, conn, request) - } catch (err) { - return errorHandler.call(this, err, conn, request) - } - - if (result && typeof result.catch === 'function') { - result.catch(err => errorHandler.call(this, err, conn, request)) - } - } - }, - requestRaw.headers['sec-websocket-key'], - requestRaw.headers['sec-websocket-protocol'], - requestRaw.headers['sec-websocket-extensions'], - requestRaw[kWs]) + uRes.upgrade( + { + req: requestRaw, + handler: (ws) => { + request.uws = true + + const conn = new WebSocket(namespace, ws, topics) + let result + try { + request.log.info('fastify-uws: websocket connection opened') + conn.once('close', () => { + request.log.info('fastify-uws: websocket connection closed') + }) + + requestRaw.once('error', () => { + conn.close() + }) + + requestRaw.once('close', () => { + conn.end() + }) + + result = uwsHandler.call(this, conn, request) + } catch (err) { + return errorHandler.call(this, err, conn, request) + } + + if (result && typeof result.catch === 'function') { + result.catch((err) => + errorHandler.call(this, err, conn, request), + ) + } + }, + }, + requestRaw.headers['sec-websocket-key'], + requestRaw.headers['sec-websocket-protocol'], + requestRaw.headers['sec-websocket-extensions'], + requestRaw[kWs], + ) } else { return httpHandler.call(this, request, reply) } @@ -115,5 +122,5 @@ function fastifyUws (fastify, opts, next) { /** @type {typeof fastifyUws} */ export default fp(fastifyUws, { fastify: '>= 4.0.0', - name: 'fastify-uws' + name: 'fastify-uws', }) diff --git a/src/request.js b/src/request.js index 5876601..4d0bb76 100644 --- a/src/request.js +++ b/src/request.js @@ -1,14 +1,15 @@ import { Readable } from 'streamx' -import { kReq, kHeaders, kUrl } from './symbols.js' +import { kHeaders, kReq, kUrl } from './symbols.js' +// biome-ignore lint/suspicious/noEmptyBlockStatements: noop const noop = () => {} -function onAbort () { +function onAbort() { this.emit('aborted') } export class Request extends Readable { - constructor (req, socket, method) { + constructor(req, socket, method) { super() this.socket = socket @@ -26,23 +27,24 @@ export class Request extends Readable { socket.once('aborted', onAbort.bind(this)) } - get aborted () { + get aborted() { return this.socket.aborted } - get url () { + get url() { let url = this[kUrl] if (url) return url const query = this[kReq].getQuery() - url = this[kUrl] = this[kReq].getUrl() + (query && query.length > 0 ? `?${query}` : '') + url = this[kUrl] = + this[kReq].getUrl() + (query && query.length > 0 ? `?${query}` : '') return url } - set url (url) { + set url(url) { this[kUrl] = url } - get headers () { + get headers() { let headers = this[kHeaders] if (headers) return headers headers = this[kHeaders] = {} @@ -52,20 +54,20 @@ export class Request extends Readable { return headers } - setEncoding (encoding) { + setEncoding(encoding) { this.socket.setEncoding(encoding) } - setTimeout (timeout) { + setTimeout(timeout) { this.socket.setTimeout(timeout) } - destroy (err) { + destroy(err) { if (this.destroyed || this.destroying) return this.socket.destroy(err) } - _read (cb) { + _read(cb) { if (this.destroyed || this.destroying || this.socket.destroyed) return cb() this.socket.onRead((err, data) => { diff --git a/src/response.js b/src/response.js index 110eed1..4d8d1db 100644 --- a/src/response.js +++ b/src/response.js @@ -3,10 +3,10 @@ import { STATUS_CODES } from 'http' import { Writable } from 'streamx' import { ERR_HEAD_SET, ERR_STREAM_DESTROYED } from './errors.js' -import { kHeaders, kHead } from './symbols.js' +import { kHead, kHeaders } from './symbols.js' class Header { - constructor (name, value) { + constructor(name, value) { this.name = name this.value = String(value) } @@ -14,7 +14,7 @@ class Header { const EMPTY = Buffer.alloc(0) class HTTPResponse { - constructor (chunk, end = false) { + constructor(chunk, end = false) { this.chunk = chunk || EMPTY this.empty = !chunk this.end = end @@ -22,20 +22,21 @@ class HTTPResponse { } } -function onAbort () { +function onAbort() { this.emit('aborted') } +// biome-ignore lint/suspicious/noEmptyBlockStatements: noop const noop = () => {} const options = { - byteLength (data) { + byteLength(data) { return data.byteLength - } + }, } export class Response extends Writable { - constructor (socket) { + constructor(socket) { super(options) this.socket = socket @@ -55,31 +56,33 @@ export class Response extends Writable { socket.once('aborted', onAbort.bind(this)) } - get aborted () { + get aborted() { return this.socket.aborted } - get finished () { + get finished() { return this.socket.writableEnded && !this.socket.aborted } - get status () { - return `${this.statusCode} ${this.statusMessage || STATUS_CODES[this.statusCode]}` + get status() { + return `${this.statusCode} ${ + this.statusMessage || STATUS_CODES[this.statusCode] + }` } - get bytesWritten () { + get bytesWritten() { return this.socket.bytesWritten } - hasHeader (name) { + hasHeader(name) { return this[kHeaders].has(name.toLowerCase()) } - getHeader (name) { + getHeader(name) { return this[kHeaders].get(name.toLowerCase())?.value } - getHeaders () { + getHeaders() { const headers = {} this[kHeaders].forEach((header, key) => { headers[key] = header.value @@ -87,7 +90,7 @@ export class Response extends Writable { return headers } - setHeader (name, value) { + setHeader(name, value) { if (this.headersSent) throw new ERR_HEAD_SET() const key = name.toLowerCase() @@ -105,13 +108,13 @@ export class Response extends Writable { this[kHeaders].set(key, new Header(name, value)) } - removeHeader (name) { + removeHeader(name) { if (this.headersSent) throw new ERR_HEAD_SET() this[kHeaders].delete(name.toLowerCase()) } - writeHead (statusCode, statusMessage, headers) { + writeHead(statusCode, statusMessage, headers) { if (this.headersSent) throw new ERR_HEAD_SET() this.statusCode = statusCode @@ -123,25 +126,25 @@ export class Response extends Writable { } if (headers) { - Object.keys(headers).forEach(key => { + Object.keys(headers).forEach((key) => { this.setHeader(key, headers[key]) }) } } - end (data) { + end(data) { if (this.aborted) return if (this.destroyed) throw new ERR_STREAM_DESTROYED() this.writableEnded = true return super.end(new HTTPResponse(data, true)) } - destroy (err) { + destroy(err) { if (this.destroyed || this.destroying || this.aborted) return this.socket.destroy(err) } - write (data) { + write(data) { if (this.aborted) return if (this.destroyed) throw new ERR_STREAM_DESTROYED() @@ -149,7 +152,11 @@ export class Response extends Writable { data = new HTTPResponse(data) // fast end - if (this.firstChunk && this.contentLength !== null && this.contentLength === data.byteLength) { + if ( + this.firstChunk && + this.contentLength !== null && + this.contentLength === data.byteLength + ) { data.end = true this.writableEnded = true super.end(data) @@ -160,14 +167,14 @@ export class Response extends Writable { return super.write(data) } - _write (data, cb) { + _write(data, cb) { if (this.aborted) return cb() if (!this.headersSent) { this.headersSent = true this.socket[kHead] = { headers: this[kHeaders], - status: this.status + status: this.status, } } @@ -179,7 +186,7 @@ export class Response extends Writable { this.socket.write(data, null, cb) } - _destroy (cb) { + _destroy(cb) { if (this.socket.destroyed) return cb() this.socket.once('close', cb) } diff --git a/src/server.js b/src/server.js index f9058ff..4bb3e23 100644 --- a/src/server.js +++ b/src/server.js @@ -17,31 +17,39 @@ * @typedef {import('fastify').FastifyServerFactory} FastifyServerFactory */ +import assert from 'assert' import EventEmitter from 'events' import { writeFileSync } from 'fs' -import assert from 'assert' import dns from 'dns/promises' -import uws from 'uWebSockets.js' import ipaddr from 'ipaddr.js' import tempy from 'tempy' +import uws from 'uWebSockets.js' -import { ERR_ADDRINUSE, ERR_UWS_APP_NOT_FOUND, ERR_ENOTFOUND, ERR_SOCKET_BAD_PORT } from './errors.js' +import { + ERR_ADDRINUSE, + ERR_ENOTFOUND, + ERR_SOCKET_BAD_PORT, + ERR_UWS_APP_NOT_FOUND, +} from './errors.js' import { HTTPSocket } from './http-socket.js' import { Request } from './request.js' import { Response } from './response.js' import { - kHttps, - kHandler, kAddress, - kListenSocket, - kListen, kApp, kClosed, - kWs + kHandler, + kHttps, + kListen, + kListenSocket, + kWs, } from './symbols.js' -function createApp (https) { +// biome-ignore lint/suspicious/noEmptyBlockStatements: noop +const noop = () => {} + +function createApp(https) { if (!https) return uws.App() if (!https.key) return uws.SSLApp(https) const keyFile = tempy.file() @@ -51,7 +59,7 @@ function createApp (https) { return uws.SSLApp({ key_file_name: keyFile, cert_file_name: certFile, - passphrase: https.passphrase + passphrase: https.passphrase, }) } @@ -62,12 +70,15 @@ export class Server extends EventEmitter { * @param {(req: Request, res: Response) => void} handler * @param {ServerOptions} opts */ - constructor (handler, opts = {}) { + constructor(handler, opts = {}) { super() const { connectionTimeout = 0, https = false } = opts - assert(!https || typeof https === 'object', 'https must be a valid object { key: string, cert: string } or follow the uws.AppOptions') + assert( + !https || typeof https === 'object', + 'https must be a valid object { key: string, cert: string } or follow the uws.AppOptions', + ) this[kHandler] = handler this.timeout = connectionTimeout @@ -81,21 +92,21 @@ export class Server extends EventEmitter { } /** @type {boolean} */ - get encrypted () { + get encrypted() { return !!this[kHttps] } /** * @param {number} timeout */ - setTimeout (timeout) { + setTimeout(timeout) { this.timeout = timeout } /** * @returns {{ address: string, port: number }} */ - address () { + address() { return this[kAddress] } @@ -104,10 +115,10 @@ export class Server extends EventEmitter { * @param {{ host: string, port: number }} listenOptions * @param {() => void} cb */ - listen (listenOptions, cb) { + listen(listenOptions, cb) { this[kListen](listenOptions) .then(() => cb && cb()) - .catch(err => { + .catch((err) => { this[kAddress] = null process.nextTick(() => this.emit('error', err)) }) @@ -116,7 +127,7 @@ export class Server extends EventEmitter { /** * @param {() => void} [cb] */ - close (cb = () => {}) { + close(cb = noop) { if (this[kClosed]) return cb() const port = this[kAddress]?.port if (port !== undefined && mainServer[port] === this) { @@ -129,7 +140,7 @@ export class Server extends EventEmitter { this[kListenSocket] = null } if (this[kWs]) { - this[kWs].connections.forEach(conn => conn.close()) + this[kWs].connections.forEach((conn) => conn.close()) } setTimeout(() => { this.emit('close') @@ -137,36 +148,43 @@ export class Server extends EventEmitter { }, 1) } - ref () {} - unref () {} + ref = noop + unref = noop - async [kListen] ({ port, host }) { + async [kListen]({ port, host }) { if (port !== undefined && port !== null && Number.isNaN(Number(port))) { throw new ERR_SOCKET_BAD_PORT(port) } - port = (port === undefined || port === null) ? 0 : Number(port) + port = port === undefined || port === null ? 0 : Number(port) const lookupAddress = await dns.lookup(host) this[kAddress] = { ...lookupAddress, - port + port, } - if (this[kAddress].address.startsWith('[')) throw new ERR_ENOTFOUND(this[kAddress].address) + if (this[kAddress].address.startsWith('[')) + throw new ERR_ENOTFOUND(this[kAddress].address) const parsedAddress = ipaddr.parse(this[kAddress].address) const longAddress = parsedAddress.toNormalizedString() const app = this[kApp] - const onRequest = method => (res, req) => { - const socket = new HTTPSocket(this, res, method === 'GET' || method === 'HEAD') + const onRequest = (method) => (res, req) => { + const socket = new HTTPSocket( + this, + res, + method === 'GET' || method === 'HEAD', + ) const request = new Request(req, socket, method) const response = new Response(socket) if (request.headers.upgrade) { + if (this[kWs]) return this.emit('upgrade', request, socket) + return } this[kHandler](request, response) } @@ -192,7 +210,8 @@ export class Server extends EventEmitter { return new Promise((resolve, reject) => { app.listen(longAddress, port, (listenSocket) => { - if (!listenSocket) return reject(new ERR_ADDRINUSE(this[kAddress].address, port)) + if (!listenSocket) + return reject(new ERR_ADDRINUSE(this[kAddress].address, port)) this[kListenSocket] = listenSocket port = this[kAddress].port = uws.us_socket_local_port(listenSocket) if (!mainServer[port]) { @@ -243,5 +262,5 @@ export { DEDICATED_DECOMPRESSOR_8KB, DISABLED, SHARED_COMPRESSOR, - SHARED_DECOMPRESSOR + SHARED_DECOMPRESSOR, } from 'uWebSockets.js' diff --git a/src/websocket-server.js b/src/websocket-server.js index d059583..9ce9656 100644 --- a/src/websocket-server.js +++ b/src/websocket-server.js @@ -59,18 +59,12 @@ import { Duplex } from 'streamx' import { HTTPSocket } from './http-socket.js' import { Request } from './request.js' import { Response } from './response.js' -import { - kApp, - kWs, - kHandler, - kTopic, - kEnded -} from './symbols.js' +import { kApp, kEnded, kHandler, kTopic, kWs } from './symbols.js' const defaultWebSocketConfig = { compression: uws.SHARED_COMPRESSOR, maxPayloadLength: 16 * 1024 * 1024, - idleTimeout: 16 + idleTimeout: 16, } const SEP = '!' @@ -82,20 +76,20 @@ export class WebSocket extends EventEmitter { * @param {Buffer | string} topic * @returns {Buffer} */ - static allocTopic (namespace, topic) { - if (topic[kTopic]) return /** @type {Buffer} */(topic) + static allocTopic(namespace, topic) { + if (topic[kTopic]) return /** @type {Buffer} */ (topic) const buf = Buffer.concat([ namespace, SEP_BUFFER, - Buffer.isBuffer(topic) ? topic : Buffer.from(topic) + Buffer.isBuffer(topic) ? topic : Buffer.from(topic), ]) buf[kTopic] = true return buf } - constructor (namespace, connection, topics = {}) { + constructor(namespace, connection, topics = {}) { super() /** @type {Buffer} */ @@ -107,7 +101,7 @@ export class WebSocket extends EventEmitter { this[kEnded] = false } - get uws () { + get uws() { return true } @@ -115,7 +109,7 @@ export class WebSocket extends EventEmitter { * @param {Buffer | string} topic * @returns {Buffer} */ - allocTopic (topic) { + allocTopic(topic) { if (this.topics[topic]) return this.topics[topic] return WebSocket.allocTopic(this.namespace, topic) } @@ -125,7 +119,7 @@ export class WebSocket extends EventEmitter { * @param {boolean} [isBinary] * @param {boolean} [compress] */ - send (message, isBinary, compress) { + send(message, isBinary, compress) { if (this[kEnded]) return return this.connection.send(message, isBinary, compress) } @@ -136,15 +130,20 @@ export class WebSocket extends EventEmitter { * @param {boolean} [isBinary] * @param {boolean} [compress] */ - publish (topic, message, isBinary, compress) { + publish(topic, message, isBinary, compress) { if (this[kEnded]) return - return this.connection.publish(this.allocTopic(topic), message, isBinary, compress) + return this.connection.publish( + this.allocTopic(topic), + message, + isBinary, + compress, + ) } /** * @param {Buffer | string} topic */ - subscribe (topic) { + subscribe(topic) { if (this[kEnded]) return return this.connection.subscribe(this.allocTopic(topic)) } @@ -152,7 +151,7 @@ export class WebSocket extends EventEmitter { /** * @param {Buffer | string} topic */ - unsubscribe (topic) { + unsubscribe(topic) { if (this[kEnded]) return return this.connection.unsubscribe(this.allocTopic(topic)) } @@ -160,17 +159,19 @@ export class WebSocket extends EventEmitter { /** * @param {Buffer | string} topic */ - isSubscribed (topic) { + isSubscribed(topic) { if (this[kEnded]) return false return this.connection.isSubscribed(this.allocTopic(topic)) } - getTopics () { + getTopics() { if (this[kEnded]) return [] - return this.connection.getTopics().map(topic => topic.slice(topic.indexOf(SEP) + 1)) + return this.connection + .getTopics() + .map((topic) => topic.slice(topic.indexOf(SEP) + 1)) } - close () { + close() { if (this[kEnded]) return this[kEnded] = true return this.connection.close() @@ -180,7 +181,7 @@ export class WebSocket extends EventEmitter { * @param {number} [code] * @param {RecognizedString} [shortMessage] */ - end (code, shortMessage) { + end(code, shortMessage) { if (this[kEnded]) return this[kEnded] = true return this.connection.end(code, shortMessage) @@ -189,12 +190,12 @@ export class WebSocket extends EventEmitter { /** * @param {() => void} cb */ - cork (cb) { + cork(cb) { if (this[kEnded]) return return this.connection.cork(cb) } - getBufferedAmount () { + getBufferedAmount() { if (this[kEnded]) return 0 return this.connection.getBufferedAmount() } @@ -202,7 +203,7 @@ export class WebSocket extends EventEmitter { /** * @param {RecognizedString} message */ - ping (message) { + ping(message) { if (this[kEnded]) return return this.connection.ping(message) } @@ -212,7 +213,7 @@ export class WebSocket extends EventEmitter { * @param {T} eventName * @param {WebsocketEvent[T]} listener */ - on (eventName, listener) { + on(eventName, listener) { return super.on(eventName, listener) } @@ -221,7 +222,7 @@ export class WebSocket extends EventEmitter { * @param {T} eventName * @param {WebsocketEvent[T]} listener */ - once (eventName, listener) { + once(eventName, listener) { return super.once(eventName, listener) } } @@ -239,7 +240,7 @@ export class WebSocketStream extends Duplex { * byteLengthWritable?: (packet: { data: any, isBinary: boolean, compress: boolean }) => number | 1024 // optional function that calculates the byte size of input data * }} opts */ - constructor (socket, opts = {}) { + constructor(socket, opts = {}) { const { compress = false } = opts super({ @@ -259,7 +260,7 @@ export class WebSocketStream extends Duplex { byteLengthWritable: (packet) => { if (opts.byteLengthWritable) return opts.byteLengthWritable(packet) return packet.isBinary ? packet.data.byteLength : 1024 - } + }, }) /** @type {WebSocket} */ @@ -267,22 +268,22 @@ export class WebSocketStream extends Duplex { this._onMessage = this._onMessage.bind(this) } - _open (cb) { + _open(cb) { this.socket.on('message', this._onMessage) cb() } - _close (cb) { + _close(cb) { this.socket.off('message', this._onMessage) this.socket.close() cb() } - _onMessage (data, isBinary) { + _onMessage(data, isBinary) { this.push({ data, isBinary }) } - _write (packet, cb) { + _write(packet, cb) { this.socket.send(packet.data, packet.isBinary, packet.compress) cb() } @@ -292,7 +293,7 @@ export class WebSocketServer extends EventEmitter { /** * @param {WSOptions} options */ - constructor (options = {}) { + constructor(options = {}) { super() this.options = { ...options, ...defaultWebSocketConfig } this.connections = new Set() @@ -301,7 +302,7 @@ export class WebSocketServer extends EventEmitter { /** * @param {import('./server.js').Server} server */ - addServer (server) { + addServer(server) { const { options } = this /** @type {TemplatedApp} */ const app = server[kApp] @@ -310,7 +311,11 @@ export class WebSocketServer extends EventEmitter { app.ws('/*', { upgrade: async (res, req, context) => { const method = req.getMethod().toUpperCase() - const socket = new HTTPSocket(server, res, method === 'GET' || method === 'HEAD') + const socket = new HTTPSocket( + server, + res, + method === 'GET' || method === 'HEAD', + ) const request = new Request(req, socket, method) const response = new Response(socket) request[kWs] = context @@ -320,7 +325,7 @@ export class WebSocketServer extends EventEmitter { /** * @param {UWSocket} ws */ - open: ws => { + open: (ws) => { this.connections.add(ws) // @ts-ignore ws.handler(ws) @@ -371,7 +376,7 @@ export class WebSocketServer extends EventEmitter { ws.websocket.emit('pong', message) this.emit('pong', ws, message) }, - ...options + ...options, }) } @@ -380,7 +385,7 @@ export class WebSocketServer extends EventEmitter { * @param {T} eventName * @param {WebsocketServerEvent[T]} listener */ - on (eventName, listener) { + on(eventName, listener) { return super.on(eventName, listener) } @@ -389,7 +394,7 @@ export class WebSocketServer extends EventEmitter { * @param {T} eventName * @param {WebsocketServerEvent[T]} listener */ - once (eventName, listener) { + once(eventName, listener) { return super.once(eventName, listener) } } diff --git a/tests/host.test.js b/tests/host.test.js index 9f471ca..c0f6252 100644 --- a/tests/host.test.js +++ b/tests/host.test.js @@ -1,6 +1,6 @@ +import fastify from 'fastify' import { test } from 'uvu' import * as assert from 'uvu/assert' -import fastify from 'fastify' import { serverFactory } from '../src/server.js' diff --git a/tests/http-socket.test.js b/tests/http-socket.test.js new file mode 100644 index 0000000..afe4ab7 --- /dev/null +++ b/tests/http-socket.test.js @@ -0,0 +1,32 @@ +import { Buffer } from 'buffer' +import { randomBytes } from 'crypto' +import { test } from 'uvu' +import * as assert from 'uvu/assert' +import { Server } from '../src/server.js' + +test('writable drain', async () => { + const chunks = [] + const server = new Server((_req, res) => { + for (let i = 0; i < 2; i++) { + const t = randomBytes(1024 * 1024 * 10) + chunks.push(t) + res.write(t) + } + res.end() + }) + + await new Promise((resolve) => server.listen({ host: 'localhost' }, resolve)) + + const res = await fetch(`http://localhost:${server.address().port}`).then( + (res) => res.arrayBuffer(), + ) + + const buf = Buffer.concat(chunks) + + assert.equal(res.byteLength, buf.length) + assert.equal(Buffer.from(res), buf) + + return new Promise((resolve) => server.close(() => resolve())) +}) + +test.run() diff --git a/tests/integration.test.js b/tests/integration.test.js index cce1948..b6a65c6 100644 --- a/tests/integration.test.js +++ b/tests/integration.test.js @@ -40,7 +40,7 @@ test('fastify integration tests', async () => { 'stream', 'https/https', 'url-rewriting', - 'trust-proxy' + 'trust-proxy', ], ignore: [ 'framework-unsupported method', @@ -54,10 +54,9 @@ test('fastify integration tests', async () => { 'Current opened connection should NOT continue to work after closing and return "connection: close" header - return503OnClosing: false, skip Node < v19.x', 'shutsdown while keep-alive connections are active (non-async, native)', 'shutsdown while keep-alive connections are active (non-async, idle, native)', - 'shutsdown while keep-alive connections are active (non-async, custom)' - ] + 'shutsdown while keep-alive connections are active (non-async, custom)', + ], }) - console.log(results) }) diff --git a/tests/plugin.test.js b/tests/plugin.test.js index ce690fe..6fa0266 100644 --- a/tests/plugin.test.js +++ b/tests/plugin.test.js @@ -1,18 +1,21 @@ import { once } from 'events' +import fastify from 'fastify' +import sget from 'simple-get' import { test } from 'uvu' import * as assert from 'uvu/assert' -import fastify from 'fastify' import WebSocket from 'ws' -import sget from 'simple-get' -import { serverFactory } from '../src/server.js' import fastifyUwsPlugin from '../src/plugin.js' +import { serverFactory } from '../src/server.js' -const get = (opts) => new Promise((resolve, reject) => sget.concat(opts, (err, _, data) => { - if (err) return reject(err) - resolve(data.toString()) -})) +const get = (opts) => + new Promise((resolve, reject) => + sget.concat(opts, (err, _, data) => { + if (err) return reject(err) + resolve(data.toString()) + }), + ) test.after.each(async (context) => { if (context.app) await context.app.close() @@ -21,11 +24,13 @@ test.after.each(async (context) => { test('basic websocket', async (context) => { let onGlobalMessage = 0 - const app = context.app = fastify({ + const app = fastify({ logger: true, - serverFactory + serverFactory, }) + context.app = app + await app.register(fastifyUwsPlugin) app.websocketServer.on('message', () => onGlobalMessage++) @@ -40,13 +45,15 @@ test('basic websocket', async (context) => { }, uwsHandler: (conn) => { conn.subscribe('home/sensors/temp') - conn.on('message', (message) => conn.publish('home/sensors/temp', message)) + conn.on('message', (message) => + conn.publish('home/sensors/temp', message), + ) conn.send(JSON.stringify(conn.getTopics())) - } + }, }) .listen({ port: 3000, - host: '127.0.0.1' + host: '127.0.0.1', }) const address = `${app.server.address().address}:${app.server.address().port}` @@ -55,13 +62,19 @@ test('basic websocket', async (context) => { const clientB = new WebSocket(`ws://${address}`) await Promise.all([ - once(clientA, 'message').then(([message]) => assert.is(message.toString(), JSON.stringify(['home/sensors/temp']))), - once(clientB, 'message').then(([message]) => assert.is(message.toString(), JSON.stringify(['home/sensors/temp']))), - once(app.websocketServer, 'open') + once(clientA, 'message').then(([message]) => + assert.is(message.toString(), JSON.stringify(['home/sensors/temp'])), + ), + once(clientB, 'message').then(([message]) => + assert.is(message.toString(), JSON.stringify(['home/sensors/temp'])), + ), + once(app.websocketServer, 'open'), ]) clientA.send('message from A') - await once(clientB, 'message').then(([message]) => assert.is(message.toString(), 'message from A')) + await once(clientB, 'message').then(([message]) => + assert.is(message.toString(), 'message from A'), + ) assert.is(onGlobalMessage, 1) diff --git a/tests/upgrade.test.js b/tests/upgrade.test.js index 678d594..934d88b 100644 --- a/tests/upgrade.test.js +++ b/tests/upgrade.test.js @@ -1,10 +1,10 @@ -import { lookup } from 'dns/promises' -import { connect } from 'net' import { once } from 'events' +import { connect } from 'net' +import { lookup } from 'dns/promises' +import Fastify from 'fastify' import { test } from 'uvu' import * as assert from 'uvu/assert' -import Fastify from 'fastify' import { serverFactory } from '../src/server.js' @@ -15,16 +15,22 @@ test('upgrade to both servers', async () => { } const app = Fastify({ - serverFactory + serverFactory, }) - app.get('/', (req, res) => { - }) + app.get('/', () => ({})) await app.listen() { const client = connect(app.server.address().port, '127.0.0.1') + const done = Promise.all([ + once(client, 'close'), + once(app.server, 'upgrade').then(([_req, socket, _head]) => { + assert.ok(`upgrade event ${JSON.stringify(socket.address())}`) + socket.end() + }), + ]) client.write('GET / HTTP/1.1\r\n') client.write('Upgrade: websocket\r\n') client.write('Connection: Upgrade\r\n') @@ -33,17 +39,18 @@ test('upgrade to both servers', async () => { client.write('Sec-WebSocket-Protocol: com.xxx.service.v1\r\n') client.write('Sec-WebSocket-Version: 13\r\n\r\n') client.write('\r\n\r\n') - await Promise.all([ - once(client, 'close'), - once(app.server, 'upgrade').then(([req, socket, head]) => { - assert.ok(`upgrade event ${JSON.stringify(socket.address())}`) - socket.end() - }) - ]) + await done } { const client = connect(app.server.address().port, '::1') + const done = Promise.all([ + once(client, 'close'), + once(app.server, 'upgrade').then(([_req, socket, _head]) => { + assert.ok(`upgrade event ${JSON.stringify(socket.address())}`) + socket.end() + }), + ]) client.write('GET / HTTP/1.1\r\n') client.write('Upgrade: websocket\r\n') client.write('Connection: Upgrade\r\n') @@ -51,16 +58,47 @@ test('upgrade to both servers', async () => { client.write('Sec-WebSocket-Key: x3JJHMbDL1EzLkh9GBhXDw==\r\n') client.write('Sec-WebSocket-Protocol: com.xxx.service.v1\r\n') client.write('Sec-WebSocket-Version: 13\r\n\r\n') - await Promise.all([ + await done + } + + await app.close() +}) + +test('upgrade to both servers 2', async () => { + const results = await lookup('localhost', { all: true }) + if (results.length !== 2) { + throw new Error('should test both servers') + } + + const app = Fastify() + + app.get('/', () => { + console.log('entra') + return {} + }) + + await app.listen() + + { + const client = connect(app.server.address().port, '127.0.0.1') + const done = Promise.all([ once(client, 'close'), - once(app.server, 'upgrade').then(([req, socket, head]) => { + once(app.server, 'upgrade').then(([_req, socket, _head]) => { assert.ok(`upgrade event ${JSON.stringify(socket.address())}`) socket.end() - }) + }), ]) + client.write('GET / HTTP/1.1\r\n') + client.write('Upgrade: websocket\r\n') + client.write('Connection: Upgrade\r\n') + client.write('Host: localhost\r\n') + client.write('Sec-WebSocket-Key: x3JJHMbDL1EzLkh9GBhXDw==\r\n') + client.write('Sec-WebSocket-Protocol: com.xxx.service.v1\r\n') + client.write('Sec-WebSocket-Version: 13\r\n\r\n') + client.write('\r\n\r\n') + await done } await app.close() }) - test.run() diff --git a/tsconfig.json b/tsconfig.json index b25734f..ecaa27b 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -1,17 +1,14 @@ { - "compilerOptions": { - "allowJs": true, - "checkJs": true, - "noEmit": true, - "module": "NodeNext", - "target": "ES2022", - "moduleResolution": "NodeNext", - "esModuleInterop": true, - "allowSyntheticDefaultImports": true, - "forceConsistentCasingInFileNames": true - }, - "include": [ - "./src/**/*.js", - "types/fastify-overload.d.ts" - ] + "compilerOptions": { + "allowJs": true, + "checkJs": true, + "noEmit": true, + "module": "NodeNext", + "target": "ES2022", + "moduleResolution": "NodeNext", + "esModuleInterop": true, + "allowSyntheticDefaultImports": true, + "forceConsistentCasingInFileNames": true + }, + "include": ["./src/**/*.js", "types/fastify-overload.d.ts"] }