From c120338d42b24c27b9898953586604dc373fa5bf Mon Sep 17 00:00:00 2001 From: Taku Amano Date: Wed, 26 Jun 2024 08:24:37 +0900 Subject: [PATCH 01/11] feat(utils/stream): enable to abort streaming manually --- src/utils/stream.ts | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/utils/stream.ts b/src/utils/stream.ts index f3d434e5e..13626fef1 100644 --- a/src/utils/stream.ts +++ b/src/utils/stream.ts @@ -30,7 +30,7 @@ export class StreamingApi { done ? controller.close() : controller.enqueue(value) }, cancel: () => { - this.abortSubscribers.forEach((subscriber) => subscriber()) + this.abort() }, }) } @@ -73,4 +73,8 @@ export class StreamingApi { onAbort(listener: () => void | Promise) { this.abortSubscribers.push(listener) } + + abort() { + this.abortSubscribers.forEach((subscriber) => subscriber()) + } } From 21c94b0e948f900a2868c72cb22c5161ef7291a0 Mon Sep 17 00:00:00 2001 From: Taku Amano Date: Wed, 26 Jun 2024 08:25:26 +0900 Subject: [PATCH 02/11] feat(utils/stream): prevent multiple aborts, and enable to get the abort status --- src/utils/stream.ts | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/utils/stream.ts b/src/utils/stream.ts index 13626fef1..ee12473d3 100644 --- a/src/utils/stream.ts +++ b/src/utils/stream.ts @@ -9,6 +9,7 @@ export class StreamingApi { private writable: WritableStream private abortSubscribers: (() => void | Promise)[] = [] responseReadable: ReadableStream + aborted: boolean = false constructor(writable: WritableStream, _readable: ReadableStream) { this.writable = writable @@ -75,6 +76,9 @@ export class StreamingApi { } abort() { - this.abortSubscribers.forEach((subscriber) => subscriber()) + if (!this.aborted) { + this.aborted = true + this.abortSubscribers.forEach((subscriber) => subscriber()) + } } } From 9f5a78c4fb9f70e712dba18836f9995b5bb5e5c5 Mon Sep 17 00:00:00 2001 From: Taku Amano Date: Wed, 26 Jun 2024 08:35:50 +0900 Subject: [PATCH 03/11] fix(streaming): call `stream.abort()` explicitly when request is aborted --- src/helper/streaming/sse.ts | 9 +++++++++ src/helper/streaming/stream.ts | 9 +++++++++ 2 files changed, 18 insertions(+) diff --git a/src/helper/streaming/sse.ts b/src/helper/streaming/sse.ts index 6498648d4..f34ebbde8 100644 --- a/src/helper/streaming/sse.ts +++ b/src/helper/streaming/sse.ts @@ -66,6 +66,15 @@ export const streamSSE = ( const { readable, writable } = new TransformStream() const stream = new SSEStreamingApi(writable, readable) + // bun does not cancel response stream when request is canceled, so detect abort by signal + c.req.raw.signal.addEventListener('abort', () => { + // "referencing a `c` that is never null in a condition" is a work around for bun (maybe JIT). + // If `c` is not referenced in this closure, this event will not fire. + if (c) { + stream.abort() + } + }) + c.header('Transfer-Encoding', 'chunked') c.header('Content-Type', 'text/event-stream') c.header('Cache-Control', 'no-cache') diff --git a/src/helper/streaming/stream.ts b/src/helper/streaming/stream.ts index ed7393369..e89454500 100644 --- a/src/helper/streaming/stream.ts +++ b/src/helper/streaming/stream.ts @@ -8,6 +8,15 @@ export const stream = ( ): Response => { const { readable, writable } = new TransformStream() const stream = new StreamingApi(writable, readable) + + // bun does not cancel response stream when request is canceled, so detect abort by signal + c.req.raw.signal.addEventListener('abort', () => { + // "referencing a `c` that is never null in a condition" is a work around for bun (maybe JIT). + // If `c` is not referenced in this closure, this event will not fire. + if (c) { + stream.abort() + } + }) ;(async () => { try { await cb(stream) From 842664573e97242fd9141b4ef2e79ac0066b9feb Mon Sep 17 00:00:00 2001 From: Taku Amano Date: Thu, 27 Jun 2024 22:21:18 +0900 Subject: [PATCH 04/11] test: add tests for streaming --- package.json | 2 +- runtime_tests/bun/index.test.tsx | 69 +++++++++++++++++++++++++++++ runtime_tests/deno/stream.test.ts | 69 +++++++++++++++++++++++++++++ runtime_tests/node/index.test.ts | 67 ++++++++++++++++++++++++++++ src/helper/streaming/sse.test.ts | 27 +++++++++++ src/helper/streaming/stream.test.ts | 25 +++++++++++ src/utils/stream.test.ts | 22 +++++++++ 7 files changed, 280 insertions(+), 1 deletion(-) create mode 100644 runtime_tests/deno/stream.test.ts diff --git a/package.json b/package.json index 57bd2ae6e..591330d23 100644 --- a/package.json +++ b/package.json @@ -12,7 +12,7 @@ "scripts": { "test": "tsc --noEmit && vitest --run && vitest -c .vitest.config/jsx-runtime-default.ts --run && vitest -c .vitest.config/jsx-runtime-dom.ts --run", "test:watch": "vitest --watch", - "test:deno": "deno test --allow-read --allow-env --allow-write -c runtime_tests/deno/deno.json runtime_tests/deno && deno test --no-lock -c runtime_tests/deno-jsx/deno.precompile.json runtime_tests/deno-jsx && deno test --no-lock -c runtime_tests/deno-jsx/deno.react-jsx.json runtime_tests/deno-jsx", + "test:deno": "deno test --allow-read --allow-env --allow-write --allow-net -c runtime_tests/deno/deno.json runtime_tests/deno && deno test --no-lock -c runtime_tests/deno-jsx/deno.precompile.json runtime_tests/deno-jsx && deno test --no-lock -c runtime_tests/deno-jsx/deno.react-jsx.json runtime_tests/deno-jsx", "test:bun": "bun test --jsx-import-source ../../src/jsx runtime_tests/bun/index.test.tsx", "test:fastly": "vitest --run --config ./runtime_tests/fastly/vitest.config.ts", "test:node": "vitest --run --config ./runtime_tests/node/vitest.config.ts", diff --git a/runtime_tests/bun/index.test.tsx b/runtime_tests/bun/index.test.tsx index 3904c9edf..2ec57d6a0 100644 --- a/runtime_tests/bun/index.test.tsx +++ b/runtime_tests/bun/index.test.tsx @@ -11,6 +11,7 @@ import { jsx } from '../../src/jsx' import { basicAuth } from '../../src/middleware/basic-auth' import { jwt } from '../../src/middleware/jwt' import { HonoRequest } from '../../src/request' +import { stream, streamSSE } from '../..//src/helper/streaming' // Test just only minimal patterns. // Because others are tested well in Cloudflare Workers environment already. @@ -316,3 +317,71 @@ async function deleteDirectory(dirPath) { await fs.unlink(dirPath) } } + +describe('streaming', () => { + const app = new Hono() + let aborted = false + + app.get('/stream', (c) => { + return stream(c, async (stream) => { + stream.onAbort(() => { + aborted = true + }) + return new Promise((resolve) => { + stream.onAbort(resolve) + }) + }) + }) + app.get('/streamSSE', (c) => { + return streamSSE(c, async (stream) => { + stream.onAbort(() => { + aborted = true + }) + return new Promise((resolve) => { + stream.onAbort(resolve) + }) + }) + }) + + beforeEach(() => { + aborted = false + }) + + describe('stream', () => { + it('Should call onAbort', async () => { + const server = Bun.serve({ port: 0, fetch: app.fetch }) + const ac = new AbortController() + const req = new Request(`http://localhost:${server.port}/stream`, { + signal: ac.signal, + }) + expect(aborted).toBe(false) + const res = fetch(req).catch(() => {}) + await new Promise((resolve) => setTimeout(resolve, 10)) + ac.abort() + await res + while (!aborted) { + await new Promise((resolve) => setTimeout(resolve)) + } + expect(aborted).toBe(true) + }) + }) + + describe('streamSSE', () => { + it('Should call onAbort', async () => { + const server = Bun.serve({ port: 0, fetch: app.fetch }) + const ac = new AbortController() + const req = new Request(`http://localhost:${server.port}/streamSSE`, { + signal: ac.signal, + }) + expect(aborted).toBe(false) + const res = fetch(req).catch(() => {}) + await new Promise((resolve) => setTimeout(resolve, 10)) + ac.abort() + await res + while (!aborted) { + await new Promise((resolve) => setTimeout(resolve)) + } + expect(aborted).toBe(true) + }) + }) +}) diff --git a/runtime_tests/deno/stream.test.ts b/runtime_tests/deno/stream.test.ts new file mode 100644 index 000000000..8e48f51be --- /dev/null +++ b/runtime_tests/deno/stream.test.ts @@ -0,0 +1,69 @@ +import { Hono } from '../../src/hono.ts' +import { assertEquals } from './deps.ts' +import { stream, streamSSE } from '../../src/helper/streaming/index.ts' + +Deno.test('Shuld call onAbort via stream', async () => { + const app = new Hono() + let aborted = false + app.get('/stream', (c) => { + return stream(c, async (stream) => { + stream.onAbort(() => { + aborted = true + }) + return new Promise((resolve) => { + stream.onAbort(resolve) + }) + }) + }) + + const server = Deno.serve({ port: 0 }, app.fetch) + const ac = new AbortController() + const req = new Request(`http://localhost:${server.addr.port}/stream`, { + signal: ac.signal, + }) + assertEquals + const res = fetch(req).catch(() => {}) + assertEquals(aborted, false) + await new Promise((resolve) => setTimeout(resolve, 10)) + ac.abort() + await res + while (!aborted) { + await new Promise((resolve) => setTimeout(resolve)) + } + assertEquals(aborted, true) + + await server.shutdown() +}) + +Deno.test('Shuld call onAbort via streamSSE', async () => { + const app = new Hono() + let aborted = false + app.get('/stream', (c) => { + return streamSSE(c, async (stream) => { + stream.onAbort(() => { + aborted = true + }) + return new Promise((resolve) => { + stream.onAbort(resolve) + }) + }) + }) + + const server = Deno.serve({ port: 0 }, app.fetch) + const ac = new AbortController() + const req = new Request(`http://localhost:${server.addr.port}/stream`, { + signal: ac.signal, + }) + assertEquals + const res = fetch(req).catch(() => {}) + assertEquals(aborted, false) + await new Promise((resolve) => setTimeout(resolve, 10)) + ac.abort() + await res + while (!aborted) { + await new Promise((resolve) => setTimeout(resolve)) + } + assertEquals(aborted, true) + + await server.shutdown() +}) diff --git a/runtime_tests/node/index.test.ts b/runtime_tests/node/index.test.ts index 436a4dcd3..3b891da18 100644 --- a/runtime_tests/node/index.test.ts +++ b/runtime_tests/node/index.test.ts @@ -6,6 +6,7 @@ import { env, getRuntimeKey } from '../../src/helper/adapter' import { basicAuth } from '../../src/middleware/basic-auth' import { jwt } from '../../src/middleware/jwt' import { HonoRequest } from '../../src/request' +import { stream, streamSSE } from '../../src/helper/streaming' // Test only minimal patterns. // See for more tests and information. @@ -96,3 +97,69 @@ describe('JWT Auth Middleware', () => { expect(res.text).toBe('auth') }) }) + +describe('stream', () => { + const app = new Hono() + + let aborted = false + + app.get('/stream', (c) => { + return stream(c, async (stream) => { + stream.onAbort(() => { + aborted = true + }) + return new Promise((resolve) => { + stream.onAbort(resolve) + }) + }) + }) + + const server = createAdaptorServer(app) + + it('Should call onAbort', async () => { + const req = request(server) + .get('/stream') + .end(() => {}) + + expect(aborted).toBe(false) + await new Promise((resolve) => setTimeout(resolve, 10)) + req.abort() + while (!aborted) { + await new Promise((resolve) => setTimeout(resolve)) + } + expect(aborted).toBe(true) + }) +}) + +describe('streamSSE', () => { + const app = new Hono() + + let aborted = false + + app.get('/stream', (c) => { + return streamSSE(c, async (stream) => { + stream.onAbort(() => { + aborted = true + }) + return new Promise((resolve) => { + stream.onAbort(resolve) + }) + }) + }) + + const server = createAdaptorServer(app) + + it('Should call onAbort', async () => { + const req = request(server) + .get('/stream') + .end(() => {}) + + expect(aborted).toBe(false) + await new Promise((resolve) => setTimeout(resolve, 10)) + req.abort() + while (!aborted) { + await new Promise((resolve) => setTimeout(resolve)) + } + expect(aborted).toBe(true) + }) +}) diff --git a/src/helper/streaming/sse.test.ts b/src/helper/streaming/sse.test.ts index c6f6ae599..d1ec48160 100644 --- a/src/helper/streaming/sse.test.ts +++ b/src/helper/streaming/sse.test.ts @@ -74,6 +74,33 @@ describe('SSE Streaming helper', () => { expect(aborted).toBeTruthy() }) + it('Check streamSSE Response if aborted by abort signal', async () => { + const ac = new AbortController() + const req = new HonoRequest(new Request('http://localhost/', { signal: ac.signal })) + const c = new Context(req) + + let aborted = false + const res = streamSSE(c, async (stream) => { + stream.onAbort(() => { + aborted = true + }) + for (let i = 0; i < 3; i++) { + await stream.writeSSE({ + data: `Message ${i}`, + }) + await stream.sleep(1) + } + }) + if (!res.body) { + throw new Error('Body is null') + } + const reader = res.body.getReader() + const { value } = await reader.read() + expect(value).toEqual(new TextEncoder().encode('data: Message 0\n\n')) + ac.abort() + expect(aborted).toBeTruthy() + }) + it('Should include retry in the SSE message', async () => { const retryTime = 3000 // 3 seconds const res = streamSSE(c, async (stream) => { diff --git a/src/helper/streaming/stream.test.ts b/src/helper/streaming/stream.test.ts index 7dedb1541..3f886f1f8 100644 --- a/src/helper/streaming/stream.test.ts +++ b/src/helper/streaming/stream.test.ts @@ -47,6 +47,31 @@ describe('Basic Streaming Helper', () => { expect(aborted).toBeTruthy() }) + it('Check stream Response if aborted by abort signal', async () => { + const ac = new AbortController() + const req = new HonoRequest(new Request('http://localhost/', { signal: ac.signal })) + const c = new Context(req) + + let aborted = false + const res = stream(c, async (stream) => { + stream.onAbort(() => { + aborted = true + }) + for (let i = 0; i < 3; i++) { + await stream.write(new Uint8Array([i])) + await stream.sleep(1) + } + }) + if (!res.body) { + throw new Error('Body is null') + } + const reader = res.body.getReader() + const { value } = await reader.read() + expect(value).toEqual(new Uint8Array([0])) + ac.abort() + expect(aborted).toBeTruthy() + }) + it('Check stream Response if error occurred', async () => { const onError = vi.fn() const res = stream( diff --git a/src/utils/stream.test.ts b/src/utils/stream.test.ts index 5ce92ef17..f2b9b6d3f 100644 --- a/src/utils/stream.test.ts +++ b/src/utils/stream.test.ts @@ -96,4 +96,26 @@ describe('StreamingApi', () => { expect(handleAbort1).toBeCalled() expect(handleAbort2).toBeCalled() }) + + it('abort()', async () => { + const { readable, writable } = new TransformStream() + const handleAbort1 = vi.fn() + const handleAbort2 = vi.fn() + const api = new StreamingApi(writable, readable) + api.onAbort(handleAbort1) + api.onAbort(handleAbort2) + expect(handleAbort1).not.toBeCalled() + expect(handleAbort2).not.toBeCalled() + expect(api.aborted).toBe(false) + + api.abort() + expect(handleAbort1).toHaveBeenCalledOnce() + expect(handleAbort2).toHaveBeenCalledOnce() + expect(api.aborted).toBe(true) + + api.abort() + expect(handleAbort1).toHaveBeenCalledOnce() + expect(handleAbort2).toHaveBeenCalledOnce() + expect(api.aborted).toBe(true) + }) }) From 172fca262e890b4cfaf5b5666d17ee8325f2a810 Mon Sep 17 00:00:00 2001 From: Taku Amano Date: Fri, 28 Jun 2024 06:38:32 +0900 Subject: [PATCH 05/11] docs(stream): add comments --- src/utils/stream.ts | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/utils/stream.ts b/src/utils/stream.ts index ee12473d3..2bed9badd 100644 --- a/src/utils/stream.ts +++ b/src/utils/stream.ts @@ -9,6 +9,9 @@ export class StreamingApi { private writable: WritableStream private abortSubscribers: (() => void | Promise)[] = [] responseReadable: ReadableStream + /** + * Whether the stream has been aborted. + */ aborted: boolean = false constructor(writable: WritableStream, _readable: ReadableStream) { @@ -75,6 +78,10 @@ export class StreamingApi { this.abortSubscribers.push(listener) } + /** + * Abort the stream. + * You can call this method when stream is aborted by external event. + */ abort() { if (!this.aborted) { this.aborted = true From 92703995681fe96ee09059c68b5887e4c93f1409 Mon Sep 17 00:00:00 2001 From: Taku Amano Date: Fri, 28 Jun 2024 06:54:30 +0900 Subject: [PATCH 06/11] test: add --allow-net to deno test command in ci.yml --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 21f051697..e1523a10c 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -77,7 +77,7 @@ jobs: - uses: denoland/setup-deno@v1 with: deno-version: v1.x - - run: env NAME=Deno deno test --coverage=coverage/raw/deno-runtime --allow-read --allow-env --allow-write -c runtime_tests/deno/deno.json runtime_tests/deno + - run: env NAME=Deno deno test --coverage=coverage/raw/deno-runtime --allow-read --allow-env --allow-write --allow-net -c runtime_tests/deno/deno.json runtime_tests/deno - run: deno test -c runtime_tests/deno-jsx/deno.precompile.json --coverage=coverage/raw/deno-precompile-jsx runtime_tests/deno-jsx - run: deno test -c runtime_tests/deno-jsx/deno.react-jsx.json --coverage=coverage/raw/deno-react-jsx runtime_tests/deno-jsx - uses: actions/upload-artifact@v4 From dce4b42fbdffbc59d236e63ddaabaaebeadfd528 Mon Sep 17 00:00:00 2001 From: Taku Amano Date: Fri, 28 Jun 2024 06:56:29 +0900 Subject: [PATCH 07/11] test(streaming): update test code --- src/helper/streaming/sse.test.ts | 2 +- src/helper/streaming/stream.test.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/helper/streaming/sse.test.ts b/src/helper/streaming/sse.test.ts index 2e1e12d2c..eb7bbb897 100644 --- a/src/helper/streaming/sse.test.ts +++ b/src/helper/streaming/sse.test.ts @@ -75,7 +75,7 @@ describe('SSE Streaming helper', () => { it('Check streamSSE Response if aborted by abort signal', async () => { const ac = new AbortController() - const req = new HonoRequest(new Request('http://localhost/', { signal: ac.signal })) + const req = new Request('http://localhost/', { signal: ac.signal }) const c = new Context(req) let aborted = false diff --git a/src/helper/streaming/stream.test.ts b/src/helper/streaming/stream.test.ts index ea17888fc..820579de5 100644 --- a/src/helper/streaming/stream.test.ts +++ b/src/helper/streaming/stream.test.ts @@ -48,7 +48,7 @@ describe('Basic Streaming Helper', () => { it('Check stream Response if aborted by abort signal', async () => { const ac = new AbortController() - const req = new HonoRequest(new Request('http://localhost/', { signal: ac.signal })) + const req = new Request('http://localhost/', { signal: ac.signal }) const c = new Context(req) let aborted = false From 0b15369e7c33f12bc8949fe5cc2c362f5db75a6f Mon Sep 17 00:00:00 2001 From: Taku Amano Date: Fri, 28 Jun 2024 08:49:17 +0900 Subject: [PATCH 08/11] test(stream): retry flaky test up to 3 times at "bun" --- runtime_tests/bun/index.test.tsx | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/runtime_tests/bun/index.test.tsx b/runtime_tests/bun/index.test.tsx index 2ec57d6a0..62b7788f6 100644 --- a/runtime_tests/bun/index.test.tsx +++ b/runtime_tests/bun/index.test.tsx @@ -369,18 +369,20 @@ describe('streaming', () => { describe('streamSSE', () => { it('Should call onAbort', async () => { const server = Bun.serve({ port: 0, fetch: app.fetch }) - const ac = new AbortController() - const req = new Request(`http://localhost:${server.port}/streamSSE`, { - signal: ac.signal, - }) - expect(aborted).toBe(false) - const res = fetch(req).catch(() => {}) - await new Promise((resolve) => setTimeout(resolve, 10)) - ac.abort() - await res - while (!aborted) { + + // It's a flaky test, so we try up to 3 times + for (let i = 0; !aborted && i < 3; i++) { + const ac = new AbortController() + const req = new Request(`http://localhost:${server.port}/streamSSE`, { + signal: ac.signal, + }) + const res = fetch(req).catch(() => {}) + await new Promise((resolve) => setTimeout(resolve, 10)) + ac.abort() + await res await new Promise((resolve) => setTimeout(resolve)) } + expect(aborted).toBe(true) }) }) From c43b89aa80940c486a4286a7b451b920737a5684 Mon Sep 17 00:00:00 2001 From: Taku Amano Date: Fri, 28 Jun 2024 09:14:04 +0900 Subject: [PATCH 09/11] test(streaming): refactor test to use afterEach --- runtime_tests/bun/index.test.tsx | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/runtime_tests/bun/index.test.tsx b/runtime_tests/bun/index.test.tsx index 62b7788f6..c96ff837f 100644 --- a/runtime_tests/bun/index.test.tsx +++ b/runtime_tests/bun/index.test.tsx @@ -1,4 +1,4 @@ -import { afterAll, beforeEach, describe, expect, it, vi } from 'vitest' +import { afterAll, afterEach, beforeEach, describe, expect, it, vi } from 'vitest' import { serveStatic, toSSG } from '../../src/adapter/bun' import { createBunWebSocket } from '../../src/adapter/bun/websocket' import type { BunWebSocketData } from '../../src/adapter/bun/websocket' @@ -320,6 +320,7 @@ async function deleteDirectory(dirPath) { describe('streaming', () => { const app = new Hono() + let server: ReturnType let aborted = false app.get('/stream', (c) => { @@ -345,11 +346,15 @@ describe('streaming', () => { beforeEach(() => { aborted = false + server = Bun.serve({ port: 0, fetch: app.fetch }) + }) + + afterEach(() => { + server.stop() }) describe('stream', () => { it('Should call onAbort', async () => { - const server = Bun.serve({ port: 0, fetch: app.fetch }) const ac = new AbortController() const req = new Request(`http://localhost:${server.port}/stream`, { signal: ac.signal, @@ -368,8 +373,6 @@ describe('streaming', () => { describe('streamSSE', () => { it('Should call onAbort', async () => { - const server = Bun.serve({ port: 0, fetch: app.fetch }) - // It's a flaky test, so we try up to 3 times for (let i = 0; !aborted && i < 3; i++) { const ac = new AbortController() From 55b6c8c22b9dbd9c4018d33cdc62d71b6801cc15 Mon Sep 17 00:00:00 2001 From: Taku Amano Date: Fri, 28 Jun 2024 12:35:06 +0900 Subject: [PATCH 10/11] fix(streaming): in bun, `c` is destroyed when the request is returned, so hold it until the end of streaming --- runtime_tests/bun/index.test.tsx | 20 +++++++++----------- src/helper/streaming/sse.ts | 9 ++++----- src/helper/streaming/stream.ts | 11 ++++++----- 3 files changed, 19 insertions(+), 21 deletions(-) diff --git a/runtime_tests/bun/index.test.tsx b/runtime_tests/bun/index.test.tsx index c96ff837f..8caaf186f 100644 --- a/runtime_tests/bun/index.test.tsx +++ b/runtime_tests/bun/index.test.tsx @@ -373,19 +373,17 @@ describe('streaming', () => { describe('streamSSE', () => { it('Should call onAbort', async () => { - // It's a flaky test, so we try up to 3 times - for (let i = 0; !aborted && i < 3; i++) { - const ac = new AbortController() - const req = new Request(`http://localhost:${server.port}/streamSSE`, { - signal: ac.signal, - }) - const res = fetch(req).catch(() => {}) - await new Promise((resolve) => setTimeout(resolve, 10)) - ac.abort() - await res + const ac = new AbortController() + const req = new Request(`http://localhost:${server.port}/streamSSE`, { + signal: ac.signal, + }) + const res = fetch(req).catch(() => {}) + await new Promise((resolve) => setTimeout(resolve, 10)) + ac.abort() + await res + while (!aborted) { await new Promise((resolve) => setTimeout(resolve)) } - expect(aborted).toBe(true) }) }) diff --git a/src/helper/streaming/sse.ts b/src/helper/streaming/sse.ts index f34ebbde8..1ec5d5e73 100644 --- a/src/helper/streaming/sse.ts +++ b/src/helper/streaming/sse.ts @@ -58,6 +58,7 @@ const run = async ( } } +const contextStash = new WeakMap() export const streamSSE = ( c: Context, cb: (stream: SSEStreamingApi) => Promise, @@ -68,11 +69,7 @@ export const streamSSE = ( // bun does not cancel response stream when request is canceled, so detect abort by signal c.req.raw.signal.addEventListener('abort', () => { - // "referencing a `c` that is never null in a condition" is a work around for bun (maybe JIT). - // If `c` is not referenced in this closure, this event will not fire. - if (c) { - stream.abort() - } + stream.abort() }) c.header('Transfer-Encoding', 'chunked') @@ -80,6 +77,8 @@ export const streamSSE = ( c.header('Cache-Control', 'no-cache') c.header('Connection', 'keep-alive') + // in bun, `c` is destroyed when the request is returned, so hold it until the end of streaming + contextStash.set(stream.responseReadable, c) run(stream, cb, onError) return c.newResponse(stream.responseReadable) diff --git a/src/helper/streaming/stream.ts b/src/helper/streaming/stream.ts index e89454500..51f5f880d 100644 --- a/src/helper/streaming/stream.ts +++ b/src/helper/streaming/stream.ts @@ -1,6 +1,7 @@ import type { Context } from '../../context' import { StreamingApi } from '../../utils/stream' +const contextStash = new WeakMap() export const stream = ( c: Context, cb: (stream: StreamingApi) => Promise, @@ -11,12 +12,11 @@ export const stream = ( // bun does not cancel response stream when request is canceled, so detect abort by signal c.req.raw.signal.addEventListener('abort', () => { - // "referencing a `c` that is never null in a condition" is a work around for bun (maybe JIT). - // If `c` is not referenced in this closure, this event will not fire. - if (c) { - stream.abort() - } + stream.abort() }) + + // in bun, `c` is destroyed when the request is returned, so hold it until the end of streaming + contextStash.set(stream.responseReadable, c) ;(async () => { try { await cb(stream) @@ -30,5 +30,6 @@ export const stream = ( stream.close() } })() + return c.newResponse(stream.responseReadable) } From 49b770bd2827209c19f6f475ff43a9dcda5117d2 Mon Sep 17 00:00:00 2001 From: Taku Amano Date: Fri, 28 Jun 2024 12:42:01 +0900 Subject: [PATCH 11/11] refactor(streaming): tweaks code layout --- src/helper/streaming/sse.ts | 4 ++-- src/helper/streaming/stream.ts | 1 - 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/src/helper/streaming/sse.ts b/src/helper/streaming/sse.ts index 1ec5d5e73..9bc18a615 100644 --- a/src/helper/streaming/sse.ts +++ b/src/helper/streaming/sse.ts @@ -71,14 +71,14 @@ export const streamSSE = ( c.req.raw.signal.addEventListener('abort', () => { stream.abort() }) + // in bun, `c` is destroyed when the request is returned, so hold it until the end of streaming + contextStash.set(stream.responseReadable, c) c.header('Transfer-Encoding', 'chunked') c.header('Content-Type', 'text/event-stream') c.header('Cache-Control', 'no-cache') c.header('Connection', 'keep-alive') - // in bun, `c` is destroyed when the request is returned, so hold it until the end of streaming - contextStash.set(stream.responseReadable, c) run(stream, cb, onError) return c.newResponse(stream.responseReadable) diff --git a/src/helper/streaming/stream.ts b/src/helper/streaming/stream.ts index 51f5f880d..f1264effc 100644 --- a/src/helper/streaming/stream.ts +++ b/src/helper/streaming/stream.ts @@ -14,7 +14,6 @@ export const stream = ( c.req.raw.signal.addEventListener('abort', () => { stream.abort() }) - // in bun, `c` is destroyed when the request is returned, so hold it until the end of streaming contextStash.set(stream.responseReadable, c) ;(async () => {