Skip to content

Commit

Permalink
Use queue for sync jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
ascorbic committed Aug 29, 2024
1 parent 04ecf3f commit 19ef892
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 25 deletions.
1 change: 1 addition & 0 deletions packages/astro/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@
"esbuild": "^0.21.5",
"estree-walker": "^3.0.3",
"fast-glob": "^3.3.2",
"fastq": "^1.17.1",
"flattie": "^1.1.1",
"github-slugger": "^2.0.0",
"gray-matter": "^4.0.3",
Expand Down
44 changes: 24 additions & 20 deletions packages/astro/src/content/content-layer.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { promises as fs, existsSync } from 'node:fs';
import { isAbsolute } from 'node:path';
import { fileURLToPath } from 'node:url';
import * as fastq from 'fastq';
import type { FSWatcher } from 'vite';
import xxhash from 'xxhash-wasm';
import type { AstroSettings, ContentEntryType, RefreshContentOptions } from '../@types/astro.js';
Expand Down Expand Up @@ -38,7 +39,8 @@ export class ContentLayer {

#generateDigest?: (data: Record<string, unknown> | string) => string;

#loading = false;
#queue: fastq.queueAsPromised<RefreshContentOptions, void>;

constructor({ settings, logger, store, watcher }: ContentLayerOptions) {
// The default max listeners is 10, which can be exceeded when using a lot of loaders
watcher?.setMaxListeners(50);
Expand All @@ -47,20 +49,23 @@ export class ContentLayer {
this.#store = store;
this.#settings = settings;
this.#watcher = watcher;
this.#queue = fastq.promise(this.#doSync.bind(this), 1);
}

/**
* Whether the content layer is currently loading content
*/
get loading() {
return !this.#queue.idle();
}

/**
* Watch for changes to the content config and trigger a sync when it changes.
*/
watchContentConfig() {
this.#unsubscribe?.();
this.#unsubscribe = globalContentConfigObserver.subscribe(async (ctx) => {
if (
!this.#loading &&
ctx.status === 'loaded' &&
ctx.config.digest !== this.#lastConfigDigest
) {
if (ctx.status === 'loaded' && ctx.config.digest !== this.#lastConfigDigest) {
this.sync();
}
});
Expand All @@ -70,7 +75,6 @@ export class ContentLayer {
this.#unsubscribe?.();
}


async #getGenerateDigest() {
if (this.#generateDigest) {
return this.#generateDigest;
Expand Down Expand Up @@ -116,12 +120,17 @@ export class ContentLayer {
}

/**
* Run the `load()` method of each collection's loader, which will load the data and save it in the data store.
* Enqueues a sync job that runs the `load()` method of each collection's loader, which will load the data and save it in the data store.
* The loader itself is responsible for deciding whether this will clear and reload the full collection, or
* perform an incremental update. After the data is loaded, the data store is written to disk.
* perform an incremental update. After the data is loaded, the data store is written to disk. Jobs are queued,
* so that only one sync can run at a time. The function returns a promise that resolves when this sync job is complete.
*/
async sync(options?: RefreshContentOptions) {


sync(options: RefreshContentOptions = {}): Promise<void> {
return this.#queue.push(options);
}

async #doSync(options: RefreshContentOptions) {
const contentConfig = globalContentConfigObserver.get();
const logger = this.#logger.forkIntegrationLogger('content');
if (contentConfig?.status !== 'loaded') {
Expand Down Expand Up @@ -170,7 +179,8 @@ export class ContentLayer {
// If loaders are specified, only sync the specified loaders
if (
options?.loaders &&
(typeof collection.loader !== 'object' || !options.loaders.includes(collection.loader.name))
(typeof collection.loader !== 'object' ||
!options.loaders.includes(collection.loader.name))
) {
return;
}
Expand Down Expand Up @@ -208,7 +218,7 @@ export class ContentLayer {
collectionName: name,
parseData,
loaderName: collection.loader.name,
refreshContextData: options?.context
refreshContextData: options?.context,
});

if (typeof collection.loader === 'function') {
Expand Down Expand Up @@ -289,18 +299,12 @@ export async function simpleLoader<TData extends { id: string }>(
function contentLayerSingleton() {
let instance: ContentLayer | null = null;
return {
initialized: () => Boolean(instance),
init: (options: ContentLayerOptions) => {
instance?.unwatchContentConfig();
instance = new ContentLayer(options);
return instance;
},
get: () => {
if (!instance) {
throw new Error('Content layer not initialized');
}
return instance;
},
get: () => instance,
dispose: () => {
instance?.unwatchContentConfig();
instance = null;
Expand Down
4 changes: 1 addition & 3 deletions packages/astro/src/core/dev/restart.ts
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,7 @@ export async function createContainerWithAutomaticRestart({
key: 's',
description: 'sync content layer',
action: () => {
if (globalContentLayer.initialized()) {
globalContentLayer.get().sync();
}
globalContentLayer.get()?.sync();
},
});
}
Expand Down
4 changes: 2 additions & 2 deletions packages/astro/src/integrations/hooks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@ import type {
RouteData,
RouteOptions,
} from '../@types/astro.js';
import { globalContentLayer } from '../content/content-layer.js';
import type { SerializedSSRManifest } from '../core/app/types.js';
import type { PageBuildData } from '../core/build/types.js';
import { buildClientDirectiveEntrypoint } from '../core/client-directive/index.js';
import { mergeConfig } from '../core/config/index.js';
import type { AstroIntegrationLogger, Logger } from '../core/logger/core.js';
import { isServerLikeOutput } from '../core/util.js';
import { validateSupportedFeatures } from './features-validation.js';
import { globalContentLayer } from '../content/content-layer.js';

async function withTakingALongTimeMsg<T>({
name,
Expand Down Expand Up @@ -378,7 +378,7 @@ export async function runHookServerSetup({
if (config.experimental?.contentLayer) {
refreshContent = async (options: RefreshContentOptions) => {
const contentLayer = await globalContentLayer.get();
await contentLayer.sync(options);
await contentLayer?.sync(options);
};
}

Expand Down
9 changes: 9 additions & 0 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 19ef892

Please sign in to comment.