feat: resilient sitemap loading #2619

Merged · 9 commits · Aug 16, 2024
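Below is a minimal sketch of how the resilient-loading options introduced in this PR might be used, based on what is visible in the diff. The `sitemapUrls` option name and the iteration pattern are assumptions drawn from the wider `SitemapRequestList` API rather than from this diff itself.

```ts
import { SitemapRequestList } from '@crawlee/core';

// Open a sitemap-backed request list. The new `parseSitemapOptions` bag is
// forwarded to the underlying `parseSitemap()` call; `emitNestedSitemaps` and
// `maxDepth` are managed internally, so the accepted type omits them.
const requestList = await SitemapRequestList.open({
    sitemapUrls: ['https://example.com/sitemap.xml'], // assumed option name
    parseSitemapOptions: {
        sitemapRetries: 5, // retry each (nested) sitemap up to 5 times
        networkTimeouts: { request: 30_000 }, // Got-style timeout object
    },
});

// Requests become available while the sitemaps are still being parsed.
for await (const request of requestList) {
    console.log(request.url);
    await requestList.markRequestHandled(request);
}
```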
50 changes: 45 additions & 5 deletions packages/core/src/storages/sitemap_request_list.ts
@@ -1,14 +1,17 @@
import { Transform } from 'node:stream';

import defaultLog from '@apify/log';
import { parseSitemap } from '@crawlee/utils';
import { type ParseSitemapOptions, parseSitemap } from '@crawlee/utils';
import ow from 'ow';

import { KeyValueStore } from './key_value_store';
import type { IRequestList } from './request_list';
import { purgeDefaultStorages } from './utils';
import { Request } from '../request';

/** @internal */
const STATE_PERSISTENCE_KEY = 'SITEMAP_REQUEST_LIST_STATE';

export interface SitemapRequestListOptions {
/**
* List of sitemap URLs to parse.
@@ -37,6 +40,10 @@ export interface SitemapRequestListOptions {
* @default 200
*/
maxBufferSize?: number;
/**
* Advanced options for the underlying `parseSitemap` call.
*/
parseSitemapOptions?: Omit<ParseSitemapOptions, 'emitNestedSitemaps' | 'maxDepth'>;
}

interface SitemapParsingProgress {
@@ -50,6 +57,7 @@ interface SitemapRequestListState {
reclaimed: string[];
sitemapParsingProgress: Record<keyof SitemapParsingProgress, any>;
abortLoading: boolean;
closed: boolean;
requestData: [string, Request][];
}

@@ -118,6 +126,8 @@ export class SitemapRequestList implements IRequestList {

private store?: KeyValueStore;

private closed: boolean = false;

/**
* Proxy URL to be used for sitemap loading.
*/
@@ -139,6 +149,7 @@
signal: ow.optional.any(),
timeoutMillis: ow.optional.number,
maxBufferSize: ow.optional.number,
parseSitemapOptions: ow.optional.object,
}),
);

@@ -161,6 +172,10 @@
*/
private async pushNextUrl(url: string | null) {
return new Promise<void>((resolve) => {
if (this.closed) {
return resolve();
}

if (!this.urlQueueStream.push(url)) {
// This doesn't work with the 'drain' event (it's not emitted for some reason).
this.urlQueueStream.once('readdata', () => {
@@ -180,6 +195,10 @@
*/
private async readNextUrl(): Promise<string | null> {
return new Promise((resolve) => {
if (this.closed) {
return resolve(null);
}

const result = this.urlQueueStream.read();

if (!result && !this.isSitemapFullyLoaded()) {
@@ -211,14 +230,17 @@
*
* Resolves once all the sitemap URLs have been fully loaded (sets `isSitemapFullyLoaded` to `true`).
*/
private async load(): Promise<void> {
private async load({
parseSitemapOptions,
}: { parseSitemapOptions?: SitemapRequestListOptions['parseSitemapOptions'] }): Promise<void> {
while (!this.isSitemapFullyLoaded() && !this.abortLoading) {
const sitemapUrl =
this.sitemapParsingProgress.inProgressSitemapUrl ??
this.sitemapParsingProgress.pendingSitemapUrls.values().next().value;

try {
for await (const item of parseSitemap([{ type: 'url', url: sitemapUrl }], this.proxyUrl, {
...parseSitemapOptions,
maxDepth: 0,
emitNestedSitemaps: true,
})) {
@@ -253,9 +275,12 @@
* Track the loading progress using the `isSitemapFullyLoaded` property.
*/
static async open(options: SitemapRequestListOptions): Promise<SitemapRequestList> {
const requestList = new SitemapRequestList(options);
const requestList = new SitemapRequestList({
...options,
persistStateKey: options.persistStateKey ?? STATE_PERSISTENCE_KEY,
});
await requestList.restoreState();
void requestList.load();
void requestList.load({ parseSitemapOptions: options.parseSitemapOptions });

options?.signal?.addEventListener('abort', () => {
requestList.abortLoading = true;
@@ -334,6 +359,7 @@
reclaimed: [...this.inProgress, ...this.reclaimed], // In-progress and reclaimed requests will both be retried if the state is restored
requestData: Array.from(this.requestData.entries()),
abortLoading: this.abortLoading,
closed: this.closed,
} satisfies SitemapRequestListState);
}

@@ -365,6 +391,7 @@
}

this.abortLoading = state.abortLoading;
this.closed = state.closed;
}

/**
@@ -392,7 +419,7 @@
* @inheritDoc
*/
async *[Symbol.asyncIterator]() {
while ((!this.isSitemapFullyLoaded() && !this.abortLoading) || !(await this.isEmpty())) {
while (!(await this.isFinished())) {
const request = await this.fetchNextRequest();
if (!request) break;

@@ -409,6 +436,19 @@
this.inProgress.delete(request.url);
}

/**
* Aborts the internal sitemap loading, stops processing the sitemap contents, and drops all pending URLs.
*
* Calling `fetchNextRequest()` after this method will always return `null`.
*/
async teardown(): Promise<void> {
this.closed = true;
this.abortLoading = true;
await this.persistState();

this.urlQueueStream.emit('readdata'); // unblocks the potentially waiting `pushNextUrl` call
}

/**
* @inheritDoc
*/
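The `teardown()` method added above makes early termination possible: it closes the list, aborts sitemap loading, and persists state so a restored instance stays closed. A hedged sketch of the intended use, under the same assumptions as the first example:

```ts
// Stop after the first 100 URLs instead of draining the whole sitemap.
const list = await SitemapRequestList.open({
    sitemapUrls: ['https://example.com/sitemap-index.xml'], // assumed option name
});

let handled = 0;
for await (const request of list) {
    await list.markRequestHandled(request);

    if (++handled >= 100) {
        // Aborts loading and drops pending URLs; subsequent
        // fetchNextRequest() calls will resolve to null.
        await list.teardown();
    }
}
```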
126 changes: 86 additions & 40 deletions packages/utils/src/internals/sitemap.ts
@@ -5,6 +5,8 @@ import { StringDecoder } from 'node:string_decoder';
import { createGunzip } from 'node:zlib';

import log from '@apify/log';
// @ts-expect-error This throws a compilation error due to got-scraping being ESM only but we only import types
import type { Delays } from 'got-scraping';
import sax from 'sax';
import MIMEType from 'whatwg-mimetype';

@@ -166,15 +168,23 @@ class SitemapXmlParser extends Transform {
}
}

interface ParseSitemapOptions {
export interface ParseSitemapOptions {
/**
* If set to `true`, elements referring to other sitemaps will be emitted as special objects with a `bouba` property.
* If set to `true`, elements referring to other sitemaps will be emitted as special objects with `originSitemapUrl` set to `null`.
*/
emitNestedSitemaps?: true | false;
/**
* Maximum depth of nested sitemaps to follow.
*/
maxDepth?: number;
/**
* Number of retries for fetching sitemaps. The counter resets for each nested sitemap.
*/
sitemapRetries?: number;
/**
* Network timeouts for sitemap fetching. See [Got documentation](https://github.com/sindresorhus/got/blob/main/documentation/6-timeout.md) for more details.
*/
networkTimeouts?: Delays;
}

export async function* parseSitemap<T extends ParseSitemapOptions>(
@@ -184,6 +194,7 @@ export async function* parseSitemap<T extends ParseSitemapOptions>(
): AsyncIterable<T['emitNestedSitemaps'] extends true ? SitemapUrl | NestedSitemap : SitemapUrl> {
const { gotScraping } = await import('got-scraping');
const { fileTypeStream } = await import('file-type');
const { emitNestedSitemaps = false, maxDepth = Infinity, sitemapRetries = 3, networkTimeouts } = options ?? {};

const sources = [...initialSources];
const visitedSitemapUrls = new Set<string>();
@@ -211,9 +222,9 @@ export async function* parseSitemap<T extends ParseSitemapOptions>(
while (sources.length > 0) {
const source = sources.shift()!;

if ((source?.depth ?? 0) > (options?.maxDepth ?? Infinity)) {
if ((source?.depth ?? 0) > maxDepth) {
log.debug(
`Skipping sitemap ${source.type === 'url' ? source.url : ''} because it reached max depth ${options!.maxDepth!}.`,
`Skipping sitemap ${source.type === 'url' ? source.url : ''} because it reached max depth ${maxDepth}.`,
);
continue;
}
@@ -223,49 +234,75 @@
if (source.type === 'url') {
const sitemapUrl = new URL(source.url);
visitedSitemapUrls.add(sitemapUrl.toString());
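// The counter covers one initial attempt plus up to `sitemapRetries` retries; a successful parse breaks out of the loop.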
let retriesLeft = sitemapRetries + 1;

while (retriesLeft-- > 0) {
try {
const sitemapStream = await new Promise<ReturnType<typeof gotScraping.stream>>(
(resolve, reject) => {
const request = gotScraping.stream({
url: sitemapUrl,
proxyUrl,
method: 'GET',
timeout: networkTimeouts,
headers: {
'accept': 'text/plain, application/xhtml+xml, application/xml;q=0.9, */*;q=0.8',
},
});
request.on('response', () => resolve(request));
request.on('error', reject);
},
);

try {
const sitemapStream = await new Promise<ReturnType<typeof gotScraping.stream>>((resolve, reject) => {
const request = gotScraping.stream({ url: sitemapUrl, proxyUrl, method: 'GET' });
request.on('response', () => resolve(request));
request.on('error', reject);
});
let error: Error | null = null;

if (sitemapStream.response!.statusCode === 200) {
let contentType = sitemapStream.response!.headers['content-type'];
if (sitemapStream.response!.statusCode >= 200 && sitemapStream.response!.statusCode < 300) {
let contentType = sitemapStream.response!.headers['content-type'];

const streamWithType = await fileTypeStream(sitemapStream);
if (streamWithType.fileType !== undefined) {
contentType = streamWithType.fileType.mime;
}
const streamWithType = await fileTypeStream(sitemapStream);
if (streamWithType.fileType !== undefined) {
contentType = streamWithType.fileType.mime;
}

let isGzipped = false;
let isGzipped = false;

if (
contentType !== undefined
? contentType === 'application/gzip'
: sitemapUrl.pathname.endsWith('.gz')
) {
isGzipped = true;
if (
contentType !== undefined
? contentType === 'application/gzip'
: sitemapUrl.pathname.endsWith('.gz')
) {
isGzipped = true;

if (sitemapUrl.pathname.endsWith('.gz')) {
sitemapUrl.pathname = sitemapUrl.pathname.substring(0, sitemapUrl.pathname.length - 3);
if (sitemapUrl.pathname.endsWith('.gz')) {
sitemapUrl.pathname = sitemapUrl.pathname.substring(0, sitemapUrl.pathname.length - 3);
}
}

items = pipeline(
streamWithType,
isGzipped ? createGunzip() : new PassThrough(),
createParser(contentType, sitemapUrl),
(e) => {
if (e !== undefined) {
error = e;
}
},
);
} else {
error = new Error(
`Failed to fetch sitemap: ${sitemapUrl}, status code: ${sitemapStream.response!.statusCode}`,
);
}

items = pipeline(
streamWithType,
isGzipped ? createGunzip() : new PassThrough(),
createParser(contentType, sitemapUrl),
(error) => {
if (error !== undefined) {
log.warning(`Malformed sitemap content: ${sitemapUrl}, ${error}`);
}
},
if (error !== null) {
throw error;
}
break;
} catch (e) {
log.warning(
`Malformed sitemap content: ${sitemapUrl}, ${retriesLeft === 0 ? 'no retries left.' : 'retrying...'} (${e})`,
);
}
} catch (e) {
log.warning(`Malformed sitemap content: ${sitemapUrl}, ${e}`);
}
} else if (source.type === 'raw') {
items = pipeline(Readable.from([source.content]), createParser('text/xml'), (error) => {
Expand All @@ -282,7 +319,7 @@ export async function* parseSitemap<T extends ParseSitemapOptions>(
for await (const item of items) {
if (item.type === 'sitemapUrl' && !visitedSitemapUrls.has(item.url)) {
sources.push({ type: 'url', url: item.url, depth: (source.depth ?? 0) + 1 });
if (options?.emitNestedSitemaps) {
if (emitNestedSitemaps) {
// @ts-ignore
yield { loc: item.url, originSitemapUrl: null };
}
@@ -342,10 +379,15 @@ export class Sitemap {
* @param urls sitemap URL(s)
* @param proxyUrl URL of a proxy to be used for fetching sitemap contents
*/
static async load(urls: string | string[], proxyUrl?: string): Promise<Sitemap> {
static async load(
urls: string | string[],
proxyUrl?: string,
parseSitemapOptions?: ParseSitemapOptions,
): Promise<Sitemap> {
return await this.parse(
(Array.isArray(urls) ? urls : [urls]).map((url) => ({ type: 'url', url })),
proxyUrl,
parseSitemapOptions,
);
}

@@ -358,11 +400,15 @@
return await this.parse([{ type: 'raw', content }], proxyUrl);
}

protected static async parse(sources: SitemapSource[], proxyUrl?: string): Promise<Sitemap> {
protected static async parse(
sources: SitemapSource[],
proxyUrl?: string,
parseSitemapOptions?: ParseSitemapOptions,
): Promise<Sitemap> {
const urls: string[] = [];

try {
for await (const item of parseSitemap(sources, proxyUrl)) {
for await (const item of parseSitemap(sources, proxyUrl, parseSitemapOptions)) {
urls.push(item.loc);
}
} catch (e) {
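For the lower-level `@crawlee/utils` API, the new options thread through `Sitemap.load()` and `parseSitemap()` as shown in the signatures above. A brief sketch under those signatures; the URLs are placeholders, and `urls` is the array that `Sitemap.parse()` populates in this file:

```ts
import { parseSitemap, Sitemap } from '@crawlee/utils';

// High-level: fetch and fully parse a sitemap, with retries and Got-style
// network timeouts (the `Delays` type imported from got-scraping).
const sitemap = await Sitemap.load('https://example.com/sitemap.xml', undefined, {
    sitemapRetries: 2,
    networkTimeouts: { connect: 5_000, response: 10_000 },
});
console.log(`Loaded ${sitemap.urls.length} URLs`);

// Low-level: stream URLs as they are parsed, following nested sitemaps.
for await (const item of parseSitemap(
    [{ type: 'url', url: 'https://example.com/sitemap-index.xml' }],
    undefined, // proxyUrl
    { sitemapRetries: 1 },
)) {
    console.log(item.loc);
}
```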