Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfix/603 hono node ws duplicate event listeners #605

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/olive-ducks-end.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@hono/node-ws': patch
---

Fixed bug with multiple connections in node-ws
108 changes: 89 additions & 19 deletions packages/node-ws/src/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,35 +5,105 @@ import { WebSocket } from 'ws'
import { createNodeWebSocket } from '.'

describe('WebSocket helper', () => {
const app = new Hono()
const { injectWebSocket, upgradeWebSocket } = createNodeWebSocket({ app })
let app: Hono
let server: ServerType
let injectWebSocket: ReturnType<typeof createNodeWebSocket>['injectWebSocket']
let upgradeWebSocket: ReturnType<typeof createNodeWebSocket>['upgradeWebSocket']

beforeEach(async () => {
app = new Hono()
;({ injectWebSocket, upgradeWebSocket } = createNodeWebSocket({ app }))

server = await new Promise<ServerType>((resolve) => {
const server = serve({ fetch: app.fetch, port: 3030 }, () => resolve(server))
})
injectWebSocket(server)
})

afterEach(() => {
server.close()
})

it('Should be able to connect', async () => {
const mainPromise = new Promise<boolean>((resolve) =>
app.get(
'/',
upgradeWebSocket(() => ({
onOpen() {
resolve(true)
},
}))
)
)

new WebSocket('ws://localhost:3030/')

expect(await mainPromise).toBe(true)
})

it('Should be able to send and receive messages', async () => {
const mainPromise = new Promise((resolve) =>
app.get(
'/',
upgradeWebSocket(() => ({
onMessage(data) {
resolve(data.data)
},
}))
)
)

const ws = new WebSocket('ws://localhost:3030/')
await new Promise<void>((resolve) => ws.on('open', resolve))
ws.send('Hello')

expect(await mainPromise).toBe('Hello')
})

it('Should handle multiple concurrent connections', async () => {
const connectionCount = 5
let openConnections = 0
const messages: string[] = []

const mainPromise = new Promise((resolve) =>
app.get(
'/',
upgradeWebSocket(() => ({
onOpen() {
resolve(true)
openConnections++
},
onMessage(data, ws) {
messages.push(data.data as string)
ws.send(data.data as string)
},
}))
)
)

it('Should be able to connect', async () => {
const server = await new Promise<ServerType>((resolve) => {
const server = serve(
{
fetch: app.fetch,
port: 3030,
},
() => {
resolve(server)
}
)
const connections = await Promise.all(
Array(connectionCount)
.fill(null)
.map(async () => {
const ws = new WebSocket('ws://localhost:3030/')
await new Promise<void>((resolve) => ws.on('open', resolve))
return ws
})
)

expect(openConnections).toBe(connectionCount)

await Promise.all(
connections.map((ws, index) => {
return new Promise<void>((resolve) => {
ws.send(`Hello from connection ${index + 1}`)
ws.on('message', () => resolve())
})
})
)

expect(messages.length).toBe(connectionCount)
messages.forEach((msg, index) => {
expect(msg).toBe(`Hello from connection ${index + 1}`)
})
injectWebSocket(server)
new WebSocket('ws://localhost:3030/')

expect(await mainPromise).toBe(true)
connections.forEach((ws) => ws.close())
})
})
36 changes: 29 additions & 7 deletions packages/node-ws/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ import type { Server } from 'node:http'
import type { Http2SecureServer, Http2Server } from 'node:http2'
import type { Hono } from 'hono'
import type { UpgradeWebSocket, WSContext } from 'hono/ws'
import type { WebSocket } from 'ws'
import { WebSocketServer } from 'ws'
import type { IncomingMessage } from 'http'

export interface NodeWebSocket {
upgradeWebSocket: UpgradeWebSocket
Expand All @@ -20,7 +22,22 @@ export interface NodeWebSocketInit {
* @returns NodeWebSocket
*/
export const createNodeWebSocket = (init: NodeWebSocketInit): NodeWebSocket => {
const wss = new WebSocketServer({noServer: true})
const wss = new WebSocketServer({ noServer: true })
const waiter = new Map<IncomingMessage, (ws: WebSocket) => void>()

wss.on('connection', (ws, request) => {
const waiterFn = waiter.get(request)
if (waiterFn) {
waiterFn(ws)
waiter.delete(request)
}
})

const nodeUpgradeWebSocket = (request: IncomingMessage) => {
return new Promise<WebSocket>((resolve) => {
waiter.set(request, resolve)
})
}

return {
injectWebSocket(server) {
Expand All @@ -34,9 +51,11 @@ export const createNodeWebSocket = (init: NodeWebSocketInit): NodeWebSocket => {
}
headers.append(key, Array.isArray(value) ? value[0] : value)
}
await init.app.request(url, {
headers: headers,
})
await init.app.request(
url,
{ headers: headers },
{ incoming: request, outgoing: undefined }
)
wss.handleUpgrade(request, socket, head, (ws) => {
wss.emit('connection', ws, request)
})
Expand All @@ -49,8 +68,11 @@ export const createNodeWebSocket = (init: NodeWebSocketInit): NodeWebSocket => {
await next()
return
}
const events = await createEvents(c)
wss.on('connection', (ws) => {

;(async () => {
const events = await createEvents(c)
const ws = await nodeUpgradeWebSocket(c.env.incoming)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think about accepting { server: { incoming: request, outgoing: undefined } } as Env?

When user use env as custom value, WebSocket helper may not work.

References: honojs/hono#2645, honojs/hono#2696, honojs/hono#2812, honojs/hono#2595 (comment).

Copy link
Contributor Author

@inaridiy inaridiy Jul 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for your review.

First, there are many theories about the implementation of { server: { incoming: request, outgoing: undefined } }, but I consider this a minimal change to fix a bug.

Secondly, this is just my subjective view.
I see env as an area to store values given by the runtime.
Rather than storing values in the env area, I think a Variables area and middleware is a more natural way of doing this.

Again, I am not familiar with HonoJS core ideas, so this is subjective.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nakasyou @inaridiy

Accepting { server: { incoming: request, outgoing: undefined } for Env is a good idea! But, as @inaridiy said, we don't have to add the change about it to this PR. And you can create another PR after merging this. Shall we go with it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you do about this problem? Once we have discussed how to use env, should I reflect that discussion?

@nakasyou @yusukebe

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's merge this first without including Env things!


const ctx: WSContext = {
binaryType: 'arraybuffer',
close(code, reason) {
Expand Down Expand Up @@ -92,7 +114,7 @@ export const createNodeWebSocket = (init: NodeWebSocketInit): NodeWebSocket => {
ctx
)
})
})
})()

return new Response()
},
Expand Down