[Segment Cache] Use LRU to manage cache data #73486

Merged · 1 commit · Dec 6, 2024

packages/next/src/client/components/router-reducer/fetch-server-response.ts

@@ -278,7 +278,7 @@ export function createFromNextReadableStream(
})
}

-export function createUnclosingPrefetchStream(
function createUnclosingPrefetchStream(
originalFlightStream: ReadableStream<Uint8Array>
): ReadableStream<Uint8Array> {
// When PPR is enabled, prefetch streams may contain references that never

packages/next/src/client/components/segment-cache/cache.ts (208 changes: 180 additions & 28 deletions)

@@ -14,7 +14,6 @@ import {
import {
createFetch,
createFromNextReadableStream,
-createUnclosingPrefetchStream,
urlToUrlWithoutFlightMarker,
type RequestHeaders,
} from '../router-reducer/fetch-server-response'
@@ -32,6 +31,7 @@ import type {
RouteCacheKey,
} from './cache-key'
import { createTupleMap, type TupleMap, type Prefix } from './tuple-map'
import { createLRU, type LRU } from './lru'

// A note on async/await when working in the prefetch cache:
//
@@ -60,6 +60,12 @@ type RouteCacheEntryShared = {
// true in all other cases, including on initialization when we haven't yet
// received a response from the server.
couldBeIntercepted: boolean

// LRU-related fields
keypath: null | Prefix<RouteCacheKeypath>
next: null | RouteCacheEntry
prev: null | RouteCacheEntry
size: number
}

export const enum EntryStatus {
Expand Down Expand Up @@ -99,6 +105,12 @@ export type RouteCacheEntry =

type SegmentCacheEntryShared = {
staleAt: number

// LRU-related fields
key: null | string
next: null | SegmentCacheEntry
prev: null | SegmentCacheEntry
size: number
}

type PendingSegmentCacheEntry = SegmentCacheEntryShared & {
@@ -136,9 +148,29 @@ type RouteCacheKeypath = [NormalizedHref, NormalizedNextUrl]
const routeCacheMap: TupleMap<RouteCacheKeypath, RouteCacheEntry> =
createTupleMap()

// We use an LRU for memory management. We must update it whenever we add or
// remove a cache entry, or when an entry changes size.
// TODO: I chose the max size somewhat arbitrarily. Consider setting this based
// on navigator.deviceMemory, or some other heuristic. We should make this
// customizable via the Next.js config, too.
const maxRouteLruSize = 10 * 1024 * 1024 // 10 MB
const routeCacheLru = createLRU<RouteCacheEntry>(
maxRouteLruSize,
onRouteLRUEviction
)

// TODO: We may eventually store segment entries in a tuple map, too, to
// account for search params.
-const segmentCache = new Map<string, SegmentCacheEntry>()
const segmentCacheMap = new Map<string, SegmentCacheEntry>()
// NOTE: Segment and route entries are managed by separate LRUs. We could
// combine them into a single LRU, but because they are separate types, we'd
// need to wrap each one in an extra LRU node (to maintain monomorphism, at the
// cost of additional memory).
const maxSegmentLruSize = 50 * 1024 * 1024 // 50 MB
const segmentCacheLru = createLRU<SegmentCacheEntry>(
maxSegmentLruSize,
onSegmentLRUEviction
)
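
The `./lru` module itself is not among the hunks shown here, so its exact API is an inference from the call sites (`put`, `delete`, `updateSize`, and an eviction callback). A minimal sketch of a contract that would satisfy them, assuming an intrusive doubly linked list (which would also explain the `prev`/`next`/`size` fields added to the entry types above):

```ts
// Hypothetical sketch of the contract cache.ts relies on. The real lru.ts is
// not shown in this diff and may differ. Entries embed their own list links
// ("intrusive" nodes), so no wrapper allocation is needed per entry.
type LRUNode<T> = {
  prev: T | null
  next: T | null
  size: number
}

export type LRU<T extends LRUNode<T>> = {
  put(entry: T): void
  delete(entry: T): void
  updateSize(entry: T, size: number): void
}

export function createLRU<T extends LRUNode<T>>(
  maxSize: number,
  onEviction: (entry: T) => void
): LRU<T> {
  let head: T | null = null // most recently used
  let tail: T | null = null // least recently used
  let totalSize = 0

  function isLinked(entry: T): boolean {
    return entry.prev !== null || entry.next !== null || head === entry
  }

  function unlink(entry: T): void {
    const { prev, next } = entry
    if (prev !== null) prev.next = next
    if (next !== null) next.prev = prev
    if (head === entry) head = next
    if (tail === entry) tail = prev
    entry.prev = null
    entry.next = null
  }

  function linkAtHead(entry: T): void {
    entry.next = head
    if (head !== null) head.prev = entry
    head = entry
    if (tail === null) tail = entry
  }

  function evictIfNeeded(): void {
    // Design choice in this sketch: never evict the head, so the most recent
    // entry survives even if it alone exceeds the budget.
    while (totalSize > maxSize && tail !== null && tail !== head) {
      const evicted = tail
      unlink(evicted)
      totalSize -= evicted.size
      onEviction(evicted)
    }
  }

  return {
    // Insert a new entry, or move an existing one to the front on access.
    put(entry) {
      if (isLinked(entry)) {
        unlink(entry)
      } else {
        totalSize += entry.size
      }
      linkAtHead(entry)
      evictIfNeeded()
    },
    // Explicit removal (e.g. a stale entry). Does not fire onEviction; the
    // caller cleans up the owning map itself, which is why the
    // deleteRouteFromCache helper below touches both structures.
    delete(entry) {
      if (isLinked(entry)) {
        unlink(entry)
        totalSize -= entry.size
      }
    },
    // Re-account an entry whose size changed (used while a prefetch
    // response streams in).
    updateSize(entry, size) {
      if (!isLinked(entry)) {
        return
      }
      totalSize += size - entry.size
      entry.size = size
      evictIfNeeded()
    },
  }
}
```

Note the asymmetry this sketch encodes: `delete` is for explicit removal and does not fire the callback, while capacity-driven eviction does, so the owner can clean up its lookup map (see `onRouteLRUEviction` and `onSegmentLRUEviction` below).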

export function readExactRouteCacheEntry(
now: number,
@@ -152,10 +184,14 @@ export function readExactRouteCacheEntry(
// Check if the entry is stale
if (existingEntry.staleAt > now) {
// Reuse the existing entry.

// Since this is an access, move the entry to the front of the LRU.
routeCacheLru.put(existingEntry)

return existingEntry
} else {
// Evict the stale entry from the cache.
-routeCacheMap.delete(keypath)
deleteRouteFromCache(existingEntry, keypath)
}
}
return null
@@ -180,15 +216,19 @@ export function readSegmentCacheEntry(
now: number,
path: string
): SegmentCacheEntry | null {
-const existingEntry = segmentCache.get(path)
const existingEntry = segmentCacheMap.get(path)
if (existingEntry !== undefined) {
// Check if the entry is stale
if (existingEntry.staleAt > now) {
// Reuse the existing entry.

// Since this is an access, move the entry to the front of the LRU.
segmentCacheLru.put(existingEntry)

return existingEntry
} else {
// Evict the stale entry from the cache.
-evictSegmentEntryFromCache(existingEntry, path)
deleteSegmentFromCache(existingEntry, path)
}
}
return null
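
To make the put-on-read pattern concrete, here is a hypothetical sequence against the sketch above (sizes and budget are invented): re-reading an entry moves it to the front of the LRU, so the least recently *read* entry is evicted first, not the least recently inserted one.

```ts
// Hypothetical illustration only; uses the createLRU sketch above.
type Entry = { key: string; prev: Entry | null; next: Entry | null; size: number }
const evicted: string[] = []
const lru = createLRU<Entry>(100, (entry) => evicted.push(entry.key))

const a: Entry = { key: 'a', prev: null, next: null, size: 40 }
const b: Entry = { key: 'b', prev: null, next: null, size: 40 }
const c: Entry = { key: 'c', prev: null, next: null, size: 40 }

lru.put(a) // 40 bytes total
lru.put(b) // 80 bytes total
lru.put(a) // a was read again: moved to front, b is now least recent
lru.put(c) // 120 > 100: evicts b, not a, despite a being older by insertion
console.log(evicted) // ['b']
```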
@@ -248,11 +288,21 @@ export function requestRouteCacheEntryFromCache(
// could be intercepted. It's only set to false once we receive a response
// from the server.
couldBeIntercepted: true,

// LRU-related fields
keypath: null,
next: null,
prev: null,
size: 0,
}
spawnPrefetchSubtask(fetchRouteOnCacheMiss(pendingEntry, task))
const keypath: Prefix<RouteCacheKeypath> =
key.nextUrl === null ? [key.href] : [key.href, key.nextUrl]
routeCacheMap.set(keypath, pendingEntry)
// Stash the keypath on the entry so we know how to remove it from the map
// if it gets evicted from the LRU.
pendingEntry.keypath = keypath
routeCacheLru.put(pendingEntry)
return pendingEntry
}

@@ -279,6 +329,12 @@ export function requestSegmentEntryFromCache(
loading: null,
staleAt: route.staleAt,
promise: null,

// LRU-related fields
key: null,
next: null,
prev: null,
size: 0,
}
spawnPrefetchSubtask(
fetchSegmentEntryOnCacheMiss(
@@ -289,15 +345,50 @@
accessToken
)
)
-segmentCache.set(path, pendingEntry)
segmentCacheMap.set(path, pendingEntry)
// Stash the key on the entry so we know how to remove it from the map
// if it gets evicted from the LRU.
pendingEntry.key = path
segmentCacheLru.put(pendingEntry)
return pendingEntry
}

-function evictSegmentEntryFromCache(
-entry: SegmentCacheEntry,
-key: string
function deleteRouteFromCache(
entry: RouteCacheEntry,
keypath: Prefix<RouteCacheKeypath>
): void {
-segmentCache.delete(key)
pingBlockedTasks(entry)
routeCacheMap.delete(keypath)
routeCacheLru.delete(entry)
}

function deleteSegmentFromCache(entry: SegmentCacheEntry, key: string): void {
cancelEntryListeners(entry)
segmentCacheMap.delete(key)
segmentCacheLru.delete(entry)
}

function onRouteLRUEviction(entry: RouteCacheEntry): void {
// The LRU evicted this entry. Remove it from the map.
const keypath = entry.keypath
if (keypath !== null) {
entry.keypath = null
pingBlockedTasks(entry)
routeCacheMap.delete(keypath)
}
}

function onSegmentLRUEviction(entry: SegmentCacheEntry): void {
// The LRU evicted this entry. Remove it from the map.
const key = entry.key
if (key !== null) {
entry.key = null
cancelEntryListeners(entry)
segmentCacheMap.delete(key)
}
}
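
Both eviction callbacks null out the entry's `keypath`/`key` and only act while the back-pointer is still set; one way to read this is as a guard against cleaning up an entry that has already been detached from the map. A hypothetical run mirroring `onSegmentLRUEviction`, again using the sketch above:

```ts
// Hypothetical illustration only; uses the createLRU sketch above.
type Node = { key: string | null; prev: Node | null; next: Node | null; size: number }
const segmentMap = new Map<string, Node>()
const segmentLru = createLRU<Node>(50, (node) => {
  // The LRU has already unlinked the node; only the map needs cleanup here.
  // Calling segmentLru.delete(node) again would be redundant.
  if (node.key !== null) {
    segmentMap.delete(node.key)
    node.key = null
  }
})

function addSegment(key: string, size: number): Node {
  const node: Node = { key, prev: null, next: null, size }
  segmentMap.set(key, node)
  segmentLru.put(node)
  return node
}

addSegment('/dashboard', 30)
addSegment('/settings', 30) // 60 > 50: '/dashboard' is evicted
console.log(segmentMap.has('/dashboard')) // false
console.log(segmentMap.has('/settings')) // true
```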

function cancelEntryListeners(entry: SegmentCacheEntry): void {
if (entry.status === EntryStatus.Pending && entry.promise !== null) {
// There were listeners for this entry. Resolve them with `null` to indicate
// that the prefetch failed. It's up to the listener to decide how to handle
@@ -309,6 +400,18 @@ function evictSegmentEntryFromCache(
}
}

function pingBlockedTasks(entry: {
blockedTasks: Set<PrefetchTask> | null
}): void {
const blockedTasks = entry.blockedTasks
if (blockedTasks !== null) {
for (const task of blockedTasks) {
pingPrefetchTask(task)
}
entry.blockedTasks = null
}
}
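
The `blockedTasks` mechanics live in the scheduler, which is outside this diff; `pingPrefetchTask` is imported from there. A hypothetical reading of the handshake, with stand-in types (nothing below is the real scheduler API):

```ts
// Hypothetical stand-ins; the real PrefetchTask/pingPrefetchTask live in the
// scheduler module, which this diff does not touch.
type FakeTask = { wakeUps: number }
const fakePing = (task: FakeTask) => task.wakeUps++

// A task that finds a pending route entry parks itself on the entry:
const entry = { blockedTasks: null as Set<FakeTask> | null }
const task: FakeTask = { wakeUps: 0 }
;(entry.blockedTasks ??= new Set()).add(task)

// Fulfillment, rejection, and LRU eviction all funnel through the same
// pingBlockedTasks shape: wake everyone exactly once, then drop the set so
// the entry does not retain dead task references.
const blockedTasks = entry.blockedTasks
if (blockedTasks !== null) {
  for (const t of blockedTasks) {
    fakePing(t)
  }
  entry.blockedTasks = null
}
console.log(task.wakeUps) // 1
```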

function fulfillRouteCacheEntry(
entry: PendingRouteCacheEntry,
tree: TreePrefetch,
@@ -324,13 +427,7 @@
fulfilledEntry.staleAt = staleAt
fulfilledEntry.couldBeIntercepted = couldBeIntercepted
fulfilledEntry.canonicalUrl = canonicalUrl
-const blockedTasks = entry.blockedTasks
-if (blockedTasks !== null) {
-for (const task of blockedTasks) {
-pingPrefetchTask(task)
-}
-fulfilledEntry.blockedTasks = null
-}
pingBlockedTasks(entry)
return fulfilledEntry
}

@@ -360,13 +457,7 @@ function rejectRouteCacheEntry(
const rejectedEntry: RejectedRouteCacheEntry = entry as any
rejectedEntry.status = EntryStatus.Rejected
rejectedEntry.staleAt = staleAt
-const blockedTasks = entry.blockedTasks
-if (blockedTasks !== null) {
-for (const task of blockedTasks) {
-pingPrefetchTask(task)
-}
-rejectedEntry.blockedTasks = null
-}
pingBlockedTasks(entry)
}

function rejectSegmentCacheEntry(
@@ -402,8 +493,13 @@ async function fetchRouteOnCacheMiss(
rejectRouteCacheEntry(entry, Date.now() + 10 * 1000)
return
}
const prefetchStream = createPrefetchResponseStream(
response.body,
routeCacheLru,
entry
)
const serverData: RootTreePrefetch = await (createFromNextReadableStream(
-response.body
prefetchStream
) as Promise<RootTreePrefetch>)
if (serverData.buildId !== getAppBuildId()) {
// The server build does not match the client. Treat as a 404. During
@@ -449,6 +545,10 @@
routeCacheMap.delete(currentKeypath)
const newKeypath: Prefix<RouteCacheKeypath> = [href]
routeCacheMap.set(newKeypath, entry)
// We don't need to update the LRU because the entry is already in it.
// But since we changed the keypath, we do need to update that, so we
// know how to remove it from the map if it gets evicted from the LRU.
entry.keypath = newKeypath
} else {
// Something else modified this entry already. Since the re-keying is
// just a performance optimization, we can safely skip it.
@@ -488,9 +588,12 @@ async function fetchSegmentEntryOnCacheMiss(
return
}
// Wrap the original stream in a new stream that never closes. That way the
-// Flight client doesn't error if there's a hanging promise. See comment in
-// createUnclosingPrefetchStream for more details.
-const prefetchStream = createUnclosingPrefetchStream(response.body)
// Flight client doesn't error if there's a hanging promise.
const prefetchStream = createPrefetchResponseStream(
response.body,
segmentCacheLru,
segmentCacheEntry
)
const serverData = await (createFromNextReadableStream(
prefetchStream
) as Promise<SegmentPrefetch>)
@@ -544,6 +647,55 @@ async function fetchSegmentPrefetchResponse(
return response
}

function createPrefetchResponseStream<
T extends RouteCacheEntry | SegmentCacheEntry,
>(
originalFlightStream: ReadableStream<Uint8Array>,
lru: LRU<T>,
lruEntry: T
): ReadableStream<Uint8Array> {
// When PPR is enabled, prefetch streams may contain references that never
// resolve, because that's how we encode dynamic data access. In the decoded
// object returned by the Flight client, these are reified into hanging
// promises that suspend during render, which is effectively what we want.
// The UI resolves when it switches to the dynamic data stream
// (via useDeferredValue(dynamic, static)).
//
// However, the Flight implementation currently errors if the server closes
// the response before all the references are resolved. As a cheat to work
// around this, we wrap the original stream in a new stream that never closes,
// and therefore doesn't error.
//
// While processing the original stream, we also incrementally update the size
// of the cache entry in the LRU.
let totalByteLength = 0
const reader = originalFlightStream.getReader()
return new ReadableStream({
async pull(controller) {
while (true) {
const { done, value } = await reader.read()
if (!done) {
// Pass to the target stream and keep consuming the Flight response
// from the server.
controller.enqueue(value)

// Incrementally update the size of the cache entry in the LRU.
// NOTE: Since prefetch responses are currently delivered in a single
// chunk, it's not strictly necessary to update the size incrementally,
// but we do it anyway in case this changes in the future.
totalByteLength += value.byteLength
lru.updateSize(lruEntry, totalByteLength)

continue
}
// The server stream has closed. Exit, but intentionally do not close
// the target stream.
return
}
},
})
}
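
A hypothetical demonstration of the wrapper's two jobs, using the `createLRU` sketch from earlier: the entry's size is accounted for as chunks arrive, and the wrapped stream stays open after the server response closes, so the Flight client never observes an early end of stream. The `as any` casts exist only because the real signature constrains `T` to the cache entry types:

```ts
// Hypothetical illustration only; not part of the PR.
type Node = { prev: Node | null; next: Node | null; size: number }
const lruEntry: Node = { prev: null, next: null, size: 0 }
const lru = createLRU<Node>(1024, () => {})
lru.put(lruEntry)

const serverStream = new ReadableStream<Uint8Array>({
  start(controller) {
    controller.enqueue(new TextEncoder().encode('flight'))
    controller.close() // the server response ends here...
  },
})

const prefetchStream = createPrefetchResponseStream(
  serverStream,
  lru as any, // cast: the real signature expects a cache-entry LRU
  lruEntry as any // cast: the real signature expects a cache entry
)
const reader = prefetchStream.getReader()
reader.read().then(({ value }) => {
  console.log(value?.byteLength) // 6
  console.log(lruEntry.size) // 6: updateSize ran as the chunk streamed through
  // ...but the wrapper never closes, so this second read never settles and
  // the Flight client never sees an "unexpected end of stream".
  reader.read().then(() => console.log('never reached for a prefetch stream'))
})
```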

function createPromiseWithResolvers<T>(): PromiseWithResolvers<T> {
// Shim of Stage 4 Promise.withResolvers proposal
let resolve: (value: T | PromiseLike<T>) => void