Moves content layer sync to a queue and supports selective sync #11767

Merged
merged 8 commits into from
Aug 29, 2024
5 changes: 5 additions & 0 deletions .changeset/sixty-masks-lie.md
@@ -0,0 +1,5 @@
---
'astro': patch
---

Refactors content layer sync to use a queue
1 change: 1 addition & 0 deletions packages/astro/package.json
@@ -152,6 +152,7 @@
"esbuild": "^0.21.5",
"estree-walker": "^3.0.3",
"fast-glob": "^3.3.2",
"fastq": "^1.17.1",
Contributor Author

This is already a transitive dependency via about a million of our deps

"flattie": "^1.1.1",
"github-slugger": "^2.0.0",
"gray-matter": "^4.0.3",
5 changes: 5 additions & 0 deletions packages/astro/src/@types/astro.ts
@@ -3242,6 +3242,11 @@ export interface SSRLoadedRenderer extends Pick<AstroRenderer, 'name' | 'clientE
ssr: SSRLoadedRendererValue;
}

export interface RefreshContentOptions {
loaders?: Array<string>;
context?: Record<string, any>;
}

export type HookParameters<
Hook extends keyof AstroIntegration['hooks'],
Fn = AstroIntegration['hooks'][Hook],
66 changes: 33 additions & 33 deletions packages/astro/src/content/content-layer.ts
@@ -1,9 +1,10 @@
import { promises as fs, existsSync } from 'node:fs';
import { isAbsolute } from 'node:path';
import { fileURLToPath } from 'node:url';
import * as fastq from 'fastq';
import type { FSWatcher } from 'vite';
import xxhash from 'xxhash-wasm';
import type { AstroSettings, ContentEntryType } from '../@types/astro.js';
import type { AstroSettings, ContentEntryType, RefreshContentOptions } from '../@types/astro.js';
import { AstroUserError } from '../core/errors/errors.js';
import type { Logger } from '../core/logger/core.js';
import {
@@ -38,7 +39,8 @@ export class ContentLayer {

#generateDigest?: (data: Record<string, unknown> | string) => string;

#loading = false;
#queue: fastq.queueAsPromised<RefreshContentOptions, void>;

constructor({ settings, logger, store, watcher }: ContentLayerOptions) {
// The default max listeners is 10, which can be exceeded when using a lot of loaders
watcher?.setMaxListeners(50);
@@ -47,13 +49,14 @@ export class ContentLayer {
this.#store = store;
this.#settings = settings;
this.#watcher = watcher;
this.#queue = fastq.promise(this.#doSync.bind(this), 1);
}

/**
* Whether the content layer is currently loading content
*/
get loading() {
return this.#loading;
return !this.#queue.idle();
}

/**
@@ -62,11 +65,7 @@ export class ContentLayer {
watchContentConfig() {
this.#unsubscribe?.();
this.#unsubscribe = globalContentConfigObserver.subscribe(async (ctx) => {
if (
!this.#loading &&
ctx.status === 'loaded' &&
ctx.config.digest !== this.#lastConfigDigest
) {
if (ctx.status === 'loaded' && ctx.config.digest !== this.#lastConfigDigest) {
this.sync();
}
});
@@ -76,23 +75,6 @@ export class ContentLayer {
this.#unsubscribe?.();
}

/**
* Run the `load()` method of each collection's loader, which will load the data and save it in the data store.
* The loader itself is responsible for deciding whether this will clear and reload the full collection, or
* perform an incremental update. After the data is loaded, the data store is written to disk.
*/
async sync() {
if (this.#loading) {
return;
}
this.#loading = true;
try {
await this.#doSync();
} finally {
this.#loading = false;
}
}

async #getGenerateDigest() {
if (this.#generateDigest) {
return this.#generateDigest;
@@ -113,10 +95,12 @@ export class ContentLayer {
collectionName,
loaderName = 'content',
parseData,
refreshContextData,
}: {
collectionName: string;
loaderName: string;
parseData: LoaderContext['parseData'];
refreshContextData?: Record<string, unknown>;
}): Promise<LoaderContext> {
return {
collection: collectionName,
@@ -127,14 +111,26 @@ export class ContentLayer {
parseData,
generateDigest: await this.#getGenerateDigest(),
watcher: this.#watcher,
refreshContextData,
entryTypes: getEntryConfigByExtMap([
...this.#settings.contentEntryTypes,
...this.#settings.dataEntryTypes,
] as Array<ContentEntryType>),
};
}

async #doSync() {
/**
* Enqueues a sync job that runs the `load()` method of each collection's loader, which will load the data and save it in the data store.
* The loader itself is responsible for deciding whether this will clear and reload the full collection, or
* perform an incremental update. After the data is loaded, the data store is written to disk. Jobs are queued,
* so that only one sync can run at a time. The function returns a promise that resolves when this sync job is complete.
*/

sync(options: RefreshContentOptions = {}): Promise<void> {
return this.#queue.push(options);
Member

Pardon the question, I don't know how fastq works, and the docs are a bit difficult to grasp. Does push execute the task by itself?

Contributor Author

Yeah, pushing it onto the queue will start the queue running

}
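
To illustrate the queueing behavior discussed in the review thread, here is a minimal sketch of a promise-based queue with a concurrency of 1. It is a hand-rolled stand-in and not fastq's implementation: `SerialQueue`, `demo`, and the job names are hypothetical, and the class only mimics the shape used in the diff, where `push` returns a promise and starts processing, and `idle()` reports whether any job is pending or running.

```typescript
// Hypothetical stand-in for a concurrency-1 promise queue. This is NOT fastq itself.
type Worker<T, R> = (task: T) => Promise<R>;

class SerialQueue<T, R> {
  private worker: Worker<T, R>;
  // Promise chain that settles when the most recently queued job has finished.
  private tail: Promise<unknown> = Promise.resolve();
  // Number of jobs that are queued or currently running.
  private pending = 0;

  constructor(worker: Worker<T, R>) {
    this.worker = worker;
  }

  // Enqueue a job. It starts once every earlier job has settled.
  push(task: T): Promise<R> {
    this.pending++;
    const run = () => this.worker(task);
    const result = this.tail.then(run, run);
    const settle = () => {
      this.pending--;
    };
    // Keep the chain going even if a job rejects.
    this.tail = result.then(settle, settle);
    return result;
  }

  // True when nothing is queued or running.
  idle(): boolean {
    return this.pending === 0;
  }
}

async function demo(): Promise<string[]> {
  const log: string[] = [];
  const queue = new SerialQueue<string, void>(async (name) => {
    log.push(`start ${name}`);
    await Promise.resolve(); // simulate asynchronous work
    log.push(`end ${name}`);
  });

  const first = queue.push('a');
  const second = queue.push('b');
  log.push(`idle right after push: ${queue.idle()}`);
  await Promise.all([first, second]);
  log.push(`idle after completion: ${queue.idle()}`);
  return log;
}

demo().then((log) => console.log(log.join('\n')));
```

In this sketch the second job starts only after the first has ended, which is the property that lets the PR drop the manual `#loading` flag and derive `loading` from the queue state instead.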

async #doSync(options: RefreshContentOptions) {
const contentConfig = globalContentConfigObserver.get();
const logger = this.#logger.forkIntegrationLogger('content');
if (contentConfig?.status !== 'loaded') {
@@ -180,6 +176,15 @@ export class ContentLayer {
}
}

// If loaders are specified, only sync the specified loaders
if (
options?.loaders &&
(typeof collection.loader !== 'object' ||
!options.loaders.includes(collection.loader.name))
Member

I think I understand !options.loaders.includes(collection.loader.name), but I ask just in case: why do we add this check?

Contributor Author

Yeah, this will allow selective sync: if loaders is passed then only loaders named in that list will be refreshed

) {
return;
}
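
The selective-sync check above can be read in isolation as a predicate. The following is a minimal sketch under simplifying assumptions: `CollectionLike`, `LoaderLike`, and `shouldSyncCollection` are hypothetical names that do not appear in the PR, and the collection shape is reduced to just the loader.

```typescript
interface RefreshContentOptions {
  loaders?: Array<string>;
  context?: Record<string, any>;
}

// Simplified, hypothetical shapes: a loader is either an object with a name or a bare function.
type LoaderLike = { name: string } | (() => unknown);
interface CollectionLike {
  loader: LoaderLike;
}

// Mirrors the condition in the diff, inverted: returns true when the collection should be synced.
function shouldSyncCollection(
  collection: CollectionLike,
  options: RefreshContentOptions = {},
): boolean {
  // No filter given: every collection is synced.
  if (!options.loaders) {
    return true;
  }
  // With a filter, only object loaders whose name is listed are synced.
  return typeof collection.loader === 'object' && options.loaders.includes(collection.loader.name);
}

const feed: CollectionLike = { loader: { name: 'feed-loader' } };
const inline: CollectionLike = { loader: () => [] };

console.log(shouldSyncCollection(feed)); // true
console.log(shouldSyncCollection(feed, { loaders: ['feed-loader'] })); // true
console.log(shouldSyncCollection(feed, { loaders: ['other-loader'] })); // false
console.log(shouldSyncCollection(inline, { loaders: ['feed-loader'] })); // false
```

Note one consequence of the condition as written: function loaders have no name to match, so they are skipped whenever `loaders` is passed, and an empty `loaders` array skips every collection.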

const collectionWithResolvedSchema = { ...collection, schema };

const parseData: LoaderContext['parseData'] = async ({ id, data, filePath = '' }) => {
@@ -213,6 +218,7 @@ export class ContentLayer {
collectionName: name,
parseData,
loaderName: collection.loader.name,
refreshContextData: options?.context,
});

if (typeof collection.loader === 'function') {
@@ -293,18 +299,12 @@ export async function simpleLoader<TData extends { id: string }>(
function contentLayerSingleton() {
let instance: ContentLayer | null = null;
return {
initialized: () => Boolean(instance),
init: (options: ContentLayerOptions) => {
instance?.unwatchContentConfig();
instance = new ContentLayer(options);
return instance;
},
get: () => {
if (!instance) {
throw new Error('Content layer not initialized');
}
return instance;
},
get: () => instance,
dispose: () => {
instance?.unwatchContentConfig();
instance = null;
1 change: 1 addition & 0 deletions packages/astro/src/content/loaders/types.ts
@@ -31,6 +31,7 @@ export interface LoaderContext {
/** When running in dev, this is a filesystem watcher that can be used to trigger updates */
watcher?: FSWatcher;

refreshContextData?: Record<string, unknown>;
entryTypes: Map<string, ContentEntryType>;
}
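
As an illustration of how a loader might consume the new `refreshContextData` field, here is a minimal sketch. Everything in it is an assumption made for the example: `MiniLoaderContext` is a trimmed-down stand-in for `LoaderContext`, the store is a plain `Map` rather than Astro's data store, the `changedIds` key is a hypothetical convention, and the loader is synchronous for brevity.

```typescript
// Trimmed-down, hypothetical subset of LoaderContext for illustration only.
interface MiniLoaderContext {
  store: Map<string, string>;
  refreshContextData?: Record<string, unknown>;
}

// Hypothetical loader body: refresh only the entries named in the context,
// or fall back to a full reload when no context is supplied.
function load(ctx: MiniLoaderContext, source: Record<string, string>): void {
  const ids = ctx.refreshContextData?.changedIds; // `changedIds` is an assumed key
  if (Array.isArray(ids)) {
    // Incremental update: touch only the listed entries.
    for (const id of ids) {
      if (typeof id === 'string' && Object.prototype.hasOwnProperty.call(source, id)) {
        const value = source[id];
        if (value !== undefined) {
          ctx.store.set(id, value);
        }
      }
    }
    return;
  }
  // Full reload: clear the store and load everything.
  ctx.store.clear();
  for (const [id, value] of Object.entries(source)) {
    ctx.store.set(id, value);
  }
}

const store = new Map<string, string>();
load({ store }, { a: 'Alpha', b: 'Beta' });
load({ store, refreshContextData: { changedIds: ['a'] } }, { a: 'Alpha v2', b: 'Beta v2' });
console.log(store.get('a'), store.get('b'));
```

Because the field is typed as `Record<string, unknown>`, a loader has to validate whatever it reads from it, which is why the sketch checks `Array.isArray` and `typeof id` before use.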

4 changes: 1 addition & 3 deletions packages/astro/src/core/dev/restart.ts
@@ -185,9 +185,7 @@ export async function createContainerWithAutomaticRestart({
key: 's',
description: 'sync content layer',
action: () => {
if (globalContentLayer.initialized()) {
globalContentLayer.get().sync();
}
globalContentLayer.get()?.sync();
},
});
}
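
The simplification above depends on the singleton's `get()` returning `null` instead of throwing when nothing has been initialized. A minimal sketch of that pattern follows; `createSingleton`, `Syncable`, `globalLayer`, and `syncCalls` are hypothetical names used only for this example and are not part of the PR.

```typescript
interface Syncable {
  sync(): Promise<void>;
}

// Hypothetical generic singleton holder: `get()` returns null rather than throwing.
function createSingleton<T>(create: () => T) {
  let instance: T | null = null;
  return {
    init: (): T => {
      instance = create();
      return instance;
    },
    get: (): T | null => instance,
    dispose: (): void => {
      instance = null;
    },
  };
}

let syncCalls = 0;
const globalLayer = createSingleton<Syncable>(() => ({
  sync: async () => {
    syncCalls++;
  },
}));

// Before init: optional chaining makes this a silent no-op.
globalLayer.get()?.sync();

// After init: the same call reaches the instance.
globalLayer.init();
globalLayer.get()?.sync();

console.log(syncCalls);
```

The trade-off is that callers no longer get an error for using the layer too early, so any call site that genuinely requires an instance has to check for `null` itself.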
3 changes: 3 additions & 0 deletions pnpm-lock.yaml

Some generated files are not rendered by default.