Skip to content

Commit

Permalink
The patched fetch function should not buffer a streamed response
Browse files Browse the repository at this point in the history
When our patched `fetch` function comes to the conclusion that it should
cache a response, it currently buffers the full response body before
returning a pseudo-cloned `Response` instance.

This is especially a problem in chat applications, where LLM responses
need to be streamed to the client immediately, without being buffered.

Since those chat requests are usually POSTs though, the buffering in
`createPatchedFetcher` did not create a problem because this was only
applied to GET requests. Although use cases where GET requests are
streamed do also exist, most prominently RSC requests. Those would have
been already affected by the buffering.

With the introduction of the Server Components HMR cache in #67527
(enabled per default in #67800), the patched `fetch` function was also
buffering POST response bodies, so that they can be stored in the HMR
cache. This made the buffering behaviour obvious because now Next.js
applications using the AI SDK to stream responses were affected, see
vercel/ai#2480 for example.

With this PR, we are now returning the original response immediately,
thus allowing streaming again, and cache a cloned response in the
background.

As an alternative, I considered to not cache POST requests in the Server
Components HMR cache. But I dismissed this solution, because I still
think that caching those requests is useful when editing server
components. In addition, this solution would not have addressed the
buffering issue for GET requests.
  • Loading branch information
unstubbable committed Aug 6, 2024
1 parent 326785d commit c05acf7
Show file tree
Hide file tree
Showing 2 changed files with 145 additions and 50 deletions.
99 changes: 99 additions & 0 deletions packages/next/src/server/lib/patch-fetch.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
import { AsyncLocalStorage } from 'node:async_hooks'
import type { RequestStore } from '../../client/components/request-async-storage.external'
import type { StaticGenerationStore } from '../../client/components/static-generation-async-storage.external'
import type { IncrementalCache } from './incremental-cache'
import { createPatchedFetcher } from './patch-fetch'

describe('createPatchedFetcher', () => {
it('should not buffer a streamed response', async () => {
const mockFetch: jest.MockedFunction<typeof fetch> = jest.fn()
let streamChunk: () => void

const readableStream = new ReadableStream({
start(controller) {
controller.enqueue(new TextEncoder().encode('stream start'))
streamChunk = () => {
controller.enqueue(new TextEncoder().encode('stream end'))
controller.close()
}
},
})

mockFetch.mockResolvedValue(new Response(readableStream))

const staticGenerationAsyncStorage =
new AsyncLocalStorage<StaticGenerationStore>()

const patchedFetch = createPatchedFetcher(mockFetch, {
// requestAsyncStorage does not need to provide a store for this test.
requestAsyncStorage: new AsyncLocalStorage<RequestStore>(),
staticGenerationAsyncStorage,
})

let resolveIncrementalCacheSet: () => void

const incrementalCacheSetPromise = new Promise<void>((resolve) => {
resolveIncrementalCacheSet = resolve
})

const incrementalCache = {
get: jest.fn(),
set: jest.fn(() => resolveIncrementalCacheSet()),
generateCacheKey: jest.fn(() => 'test-cache-key'),
lock: jest.fn(),
} as unknown as IncrementalCache

// We only need to provide a few of the StaticGenerationStore properties.
const staticGenerationStore: Partial<StaticGenerationStore> = {
page: '/',
route: '/',
incrementalCache,
}

await staticGenerationAsyncStorage.run(
staticGenerationStore as StaticGenerationStore,
async () => {
const response = await patchedFetch('https://example.com', {
cache: 'force-cache',
})

if (!response.body) {
throw new Error(`Response body is ${JSON.stringify(response.body)}.`)
}

const reader = response.body.getReader()
let result = await reader.read()
const textDecoder = new TextDecoder()
expect(textDecoder.decode(result.value)).toBe('stream start')
streamChunk()
result = await reader.read()
expect(textDecoder.decode(result.value)).toBe('stream end')

await incrementalCacheSetPromise

expect(incrementalCache.set).toHaveBeenCalledWith(
'test-cache-key',
{
data: {
body: btoa('stream startstream end'),
headers: {},
status: 200,
url: '', // the mocked response does not have a URL
},
kind: 'FETCH',
revalidate: 31536000, // default of one year
},
{
fetchCache: true,
fetchIdx: 1,
fetchUrl: 'https://example.com/',
revalidate: false,
tags: [],
}
)
}
)
// Setting a lower timeout than default, because the test will fail with a
// timeout when we regress and buffer the response.
}, 1000)
})
96 changes: 46 additions & 50 deletions packages/next/src/server/lib/patch-fetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ interface PatchableModule {
requestAsyncStorage: RequestAsyncStorage
}

function createPatchedFetcher(
export function createPatchedFetcher(
originFetch: Fetcher,
{ staticGenerationAsyncStorage, requestAsyncStorage }: PatchableModule
): PatchedFetcher {
Expand Down Expand Up @@ -488,17 +488,17 @@ function createPatchedFetcher(
finalRevalidate === false

let cacheKey: string | undefined
const { incrementalCache } = staticGenerationStore

if (
staticGenerationStore.incrementalCache &&
incrementalCache &&
(isCacheableRevalidate || requestStore?.serverComponentsHmrCache)
) {
try {
cacheKey =
await staticGenerationStore.incrementalCache.generateCacheKey(
fetchUrl,
isRequestInput ? (input as RequestInit) : init
)
cacheKey = await incrementalCache.generateCacheKey(
fetchUrl,
isRequestInput ? (input as RequestInit) : init
)
} catch (err) {
console.error(`Failed to generate cache key for`, input)
}
Expand Down Expand Up @@ -576,52 +576,49 @@ function createPatchedFetcher(
}
if (
res.status === 200 &&
staticGenerationStore.incrementalCache &&
incrementalCache &&
cacheKey &&
(isCacheableRevalidate || requestStore?.serverComponentsHmrCache)
) {
const bodyBuffer = Buffer.from(await res.arrayBuffer())

const cachedFetchData = {
headers: Object.fromEntries(res.headers.entries()),
body: bodyBuffer.toString('base64'),
status: res.status,
url: res.url,
}

requestStore?.serverComponentsHmrCache?.set(
cacheKey,
cachedFetchData
)
res
.clone()
.arrayBuffer()
.then(async (arrayBuffer) => {
const bodyBuffer = Buffer.from(arrayBuffer)

const cachedFetchData = {
headers: Object.fromEntries(res.headers.entries()),
body: bodyBuffer.toString('base64'),
status: res.status,
url: res.url,
}

if (isCacheableRevalidate) {
try {
await staticGenerationStore.incrementalCache.set(
requestStore?.serverComponentsHmrCache?.set(
cacheKey,
{
kind: 'FETCH',
data: cachedFetchData,
revalidate: normalizedRevalidate,
},
{
fetchCache: true,
revalidate: finalRevalidate,
fetchUrl,
fetchIdx,
tags,
}
cachedFetchData
)
} catch (err) {
console.warn(`Failed to set fetch cache`, input, err)
}
}

const response = new Response(bodyBuffer, {
headers: new Headers(res.headers),
status: res.status,
})
Object.defineProperty(response, 'url', { value: res.url })
return response
if (isCacheableRevalidate) {
await incrementalCache.set(
cacheKey,
{
kind: 'FETCH',
data: cachedFetchData,
revalidate: normalizedRevalidate,
},
{
fetchCache: true,
revalidate: finalRevalidate,
fetchUrl,
fetchIdx,
tags,
}
)
}
})
.catch((error) =>
console.warn(`Failed to set fetch cache`, input, error)
)
}
return res
})
Expand All @@ -632,7 +629,7 @@ function createPatchedFetcher(
let isForegroundRevalidate = false
let isHmrRefreshCache = false

if (cacheKey && staticGenerationStore.incrementalCache) {
if (cacheKey && incrementalCache) {
let cachedFetchData: CachedFetchData | undefined

if (
Expand All @@ -646,12 +643,11 @@ function createPatchedFetcher(
}

if (isCacheableRevalidate && !cachedFetchData) {
handleUnlock =
await staticGenerationStore.incrementalCache.lock(cacheKey)
handleUnlock = await incrementalCache.lock(cacheKey)

const entry = staticGenerationStore.isOnDemandRevalidate
? null
: await staticGenerationStore.incrementalCache.get(cacheKey, {
: await incrementalCache.get(cacheKey, {
kindHint: 'fetch',
revalidate: finalRevalidate,
fetchUrl,
Expand Down

0 comments on commit c05acf7

Please sign in to comment.