Skip to content

Commit

Permalink
feat: resilient sitemap loading (#2619)
Browse files Browse the repository at this point in the history
  • Loading branch information
barjin authored Aug 16, 2024
1 parent 2815de1 commit 1dd7660
Show file tree
Hide file tree
Showing 3 changed files with 254 additions and 47 deletions.
50 changes: 45 additions & 5 deletions packages/core/src/storages/sitemap_request_list.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
import { Transform } from 'node:stream';

import defaultLog from '@apify/log';
import { parseSitemap } from '@crawlee/utils';
import { type ParseSitemapOptions, parseSitemap } from '@crawlee/utils';
import ow from 'ow';

import { KeyValueStore } from './key_value_store';
import type { IRequestList } from './request_list';
import { purgeDefaultStorages } from './utils';
import { Request } from '../request';

/** @internal */
const STATE_PERSISTENCE_KEY = 'SITEMAP_REQUEST_LIST_STATE';

export interface SitemapRequestListOptions {
/**
* List of sitemap URLs to parse.
Expand Down Expand Up @@ -37,6 +40,10 @@ export interface SitemapRequestListOptions {
* @default 200
*/
maxBufferSize?: number;
/**
* Advanced options for the underlying `parseSitemap` call.
*/
parseSitemapOptions?: Omit<ParseSitemapOptions, 'emitNestedSitemaps' | 'maxDepth'>;
}

interface SitemapParsingProgress {
Expand All @@ -50,6 +57,7 @@ interface SitemapRequestListState {
reclaimed: string[];
sitemapParsingProgress: Record<keyof SitemapParsingProgress, any>;
abortLoading: boolean;
closed: boolean;
requestData: [string, Request][];
}

Expand Down Expand Up @@ -118,6 +126,8 @@ export class SitemapRequestList implements IRequestList {

private store?: KeyValueStore;

private closed: boolean = false;

/**
* Proxy URL to be used for sitemap loading.
*/
Expand All @@ -139,6 +149,7 @@ export class SitemapRequestList implements IRequestList {
signal: ow.optional.any(),
timeoutMillis: ow.optional.number,
maxBufferSize: ow.optional.number,
parseSitemapOptions: ow.optional.object,
}),
);

Expand All @@ -161,6 +172,10 @@ export class SitemapRequestList implements IRequestList {
*/
private async pushNextUrl(url: string | null) {
return new Promise<void>((resolve) => {
if (this.closed) {
return resolve();
}

if (!this.urlQueueStream.push(url)) {
// This doesn't work with the 'drain' event (it's not emitted for some reason).
this.urlQueueStream.once('readdata', () => {
Expand All @@ -180,6 +195,10 @@ export class SitemapRequestList implements IRequestList {
*/
private async readNextUrl(): Promise<string | null> {
return new Promise((resolve) => {
if (this.closed) {
return resolve(null);
}

const result = this.urlQueueStream.read();

if (!result && !this.isSitemapFullyLoaded()) {
Expand Down Expand Up @@ -211,14 +230,17 @@ export class SitemapRequestList implements IRequestList {
*
* Resolves once all the sitemaps URLs have been fully loaded (sets `isSitemapFullyLoaded` to `true`).
*/
private async load(): Promise<void> {
private async load({
parseSitemapOptions,
}: { parseSitemapOptions?: SitemapRequestListOptions['parseSitemapOptions'] }): Promise<void> {
while (!this.isSitemapFullyLoaded() && !this.abortLoading) {
const sitemapUrl =
this.sitemapParsingProgress.inProgressSitemapUrl ??
this.sitemapParsingProgress.pendingSitemapUrls.values().next().value;

try {
for await (const item of parseSitemap([{ type: 'url', url: sitemapUrl }], this.proxyUrl, {
...parseSitemapOptions,
maxDepth: 0,
emitNestedSitemaps: true,
})) {
Expand Down Expand Up @@ -253,9 +275,12 @@ export class SitemapRequestList implements IRequestList {
* Track the loading progress using the `isSitemapFullyLoaded` property.
*/
static async open(options: SitemapRequestListOptions): Promise<SitemapRequestList> {
const requestList = new SitemapRequestList(options);
const requestList = new SitemapRequestList({
...options,
persistStateKey: options.persistStateKey ?? STATE_PERSISTENCE_KEY,
});
await requestList.restoreState();
void requestList.load();
void requestList.load({ parseSitemapOptions: options.parseSitemapOptions });

options?.signal?.addEventListener('abort', () => {
requestList.abortLoading = true;
Expand Down Expand Up @@ -334,6 +359,7 @@ export class SitemapRequestList implements IRequestList {
reclaimed: [...this.inProgress, ...this.reclaimed], // In-progress and reclaimed requests will be both retried if state is restored
requestData: Array.from(this.requestData.entries()),
abortLoading: this.abortLoading,
closed: this.closed,
} satisfies SitemapRequestListState);
}

Expand Down Expand Up @@ -365,6 +391,7 @@ export class SitemapRequestList implements IRequestList {
}

this.abortLoading = state.abortLoading;
this.closed = state.closed;
}

/**
Expand Down Expand Up @@ -392,7 +419,7 @@ export class SitemapRequestList implements IRequestList {
* @inheritDoc
*/
async *[Symbol.asyncIterator]() {
while ((!this.isSitemapFullyLoaded() && !this.abortLoading) || !(await this.isEmpty())) {
while (!(await this.isFinished())) {
const request = await this.fetchNextRequest();
if (!request) break;

Expand All @@ -409,6 +436,19 @@ export class SitemapRequestList implements IRequestList {
this.inProgress.delete(request.url);
}

/**
 * Permanently closes the request list: stops the internal sitemap loading,
 * halts processing of the sitemap contents and discards every pending URL.
 *
 * After this resolves, `fetchNextRequest()` will always return `null`.
 * The final state is persisted before the method returns.
 */
async teardown(): Promise<void> {
    // Flip both flags before persisting so the stored state already reflects the shutdown.
    this.abortLoading = true;
    this.closed = true;

    await this.persistState();

    // Wake up a `pushNextUrl()` call that may be parked waiting for stream capacity.
    this.urlQueueStream.emit('readdata');
}

/**
* @inheritDoc
*/
Expand Down
126 changes: 86 additions & 40 deletions packages/utils/src/internals/sitemap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import { StringDecoder } from 'node:string_decoder';
import { createGunzip } from 'node:zlib';

import log from '@apify/log';
// @ts-expect-error This throws a compilation error due to got-scraping being ESM only but we only import types
import type { Delays } from 'got-scraping';
import sax from 'sax';
import MIMEType from 'whatwg-mimetype';

Expand Down Expand Up @@ -166,15 +168,23 @@ class SitemapXmlParser extends Transform {
}
}

interface ParseSitemapOptions {
export interface ParseSitemapOptions {
/**
 * If set to `true`, elements referring to other (nested) sitemaps will also be emitted as special
 * objects with the nested sitemap URL in `loc` and with `originSitemapUrl` set to `null`.
 */
emitNestedSitemaps?: true | false;
/**
 * Maximum depth of nested sitemaps to follow.
 */
maxDepth?: number;
/**
 * Number of retries for fetching sitemaps. The counter resets for each nested sitemap.
 */
sitemapRetries?: number;
/**
 * Network timeouts for sitemap fetching. See [Got documentation](https://github.com/sindresorhus/got/blob/main/documentation/6-timeout.md) for more details.
 */
networkTimeouts?: Delays;
}

export async function* parseSitemap<T extends ParseSitemapOptions>(
Expand All @@ -184,6 +194,7 @@ export async function* parseSitemap<T extends ParseSitemapOptions>(
): AsyncIterable<T['emitNestedSitemaps'] extends true ? SitemapUrl | NestedSitemap : SitemapUrl> {
const { gotScraping } = await import('got-scraping');
const { fileTypeStream } = await import('file-type');
const { emitNestedSitemaps = false, maxDepth = Infinity, sitemapRetries = 3, networkTimeouts } = options ?? {};

const sources = [...initialSources];
const visitedSitemapUrls = new Set<string>();
Expand Down Expand Up @@ -211,9 +222,9 @@ export async function* parseSitemap<T extends ParseSitemapOptions>(
while (sources.length > 0) {
const source = sources.shift()!;

if ((source?.depth ?? 0) > (options?.maxDepth ?? Infinity)) {
if ((source?.depth ?? 0) > maxDepth) {
log.debug(
`Skipping sitemap ${source.type === 'url' ? source.url : ''} because it reached max depth ${options!.maxDepth!}.`,
`Skipping sitemap ${source.type === 'url' ? source.url : ''} because it reached max depth ${maxDepth}.`,
);
continue;
}
Expand All @@ -223,49 +234,75 @@ export async function* parseSitemap<T extends ParseSitemapOptions>(
if (source.type === 'url') {
const sitemapUrl = new URL(source.url);
visitedSitemapUrls.add(sitemapUrl.toString());
let retriesLeft = sitemapRetries + 1;

while (retriesLeft-- > 0) {
try {
const sitemapStream = await new Promise<ReturnType<typeof gotScraping.stream>>(
(resolve, reject) => {
const request = gotScraping.stream({
url: sitemapUrl,
proxyUrl,
method: 'GET',
timeout: networkTimeouts,
headers: {
'accept': 'text/plain, application/xhtml+xml, application/xml;q=0.9, */*;q=0.8',
},
});
request.on('response', () => resolve(request));
request.on('error', reject);
},
);

try {
const sitemapStream = await new Promise<ReturnType<typeof gotScraping.stream>>((resolve, reject) => {
const request = gotScraping.stream({ url: sitemapUrl, proxyUrl, method: 'GET' });
request.on('response', () => resolve(request));
request.on('error', reject);
});
let error: Error | null = null;

if (sitemapStream.response!.statusCode === 200) {
let contentType = sitemapStream.response!.headers['content-type'];
if (sitemapStream.response!.statusCode >= 200 && sitemapStream.response!.statusCode < 300) {
let contentType = sitemapStream.response!.headers['content-type'];

const streamWithType = await fileTypeStream(sitemapStream);
if (streamWithType.fileType !== undefined) {
contentType = streamWithType.fileType.mime;
}
const streamWithType = await fileTypeStream(sitemapStream);
if (streamWithType.fileType !== undefined) {
contentType = streamWithType.fileType.mime;
}

let isGzipped = false;
let isGzipped = false;

if (
contentType !== undefined
? contentType === 'application/gzip'
: sitemapUrl.pathname.endsWith('.gz')
) {
isGzipped = true;
if (
contentType !== undefined
? contentType === 'application/gzip'
: sitemapUrl.pathname.endsWith('.gz')
) {
isGzipped = true;

if (sitemapUrl.pathname.endsWith('.gz')) {
sitemapUrl.pathname = sitemapUrl.pathname.substring(0, sitemapUrl.pathname.length - 3);
if (sitemapUrl.pathname.endsWith('.gz')) {
sitemapUrl.pathname = sitemapUrl.pathname.substring(0, sitemapUrl.pathname.length - 3);
}
}

items = pipeline(
streamWithType,
isGzipped ? createGunzip() : new PassThrough(),
createParser(contentType, sitemapUrl),
(e) => {
if (e !== undefined) {
error = e;
}
},
);
} else {
error = new Error(
`Failed to fetch sitemap: ${sitemapUrl}, status code: ${sitemapStream.response!.statusCode}`,
);
}

items = pipeline(
streamWithType,
isGzipped ? createGunzip() : new PassThrough(),
createParser(contentType, sitemapUrl),
(error) => {
if (error !== undefined) {
log.warning(`Malformed sitemap content: ${sitemapUrl}, ${error}`);
}
},
if (error !== null) {
throw error;
}
break;
} catch (e) {
log.warning(
`Malformed sitemap content: ${sitemapUrl}, ${retriesLeft === 0 ? 'no retries left.' : 'retrying...'} (${e})`,
);
}
} catch (e) {
log.warning(`Malformed sitemap content: ${sitemapUrl}, ${e}`);
}
} else if (source.type === 'raw') {
items = pipeline(Readable.from([source.content]), createParser('text/xml'), (error) => {
Expand All @@ -282,7 +319,7 @@ export async function* parseSitemap<T extends ParseSitemapOptions>(
for await (const item of items) {
if (item.type === 'sitemapUrl' && !visitedSitemapUrls.has(item.url)) {
sources.push({ type: 'url', url: item.url, depth: (source.depth ?? 0) + 1 });
if (options?.emitNestedSitemaps) {
if (emitNestedSitemaps) {
// @ts-ignore
yield { loc: item.url, originSitemapUrl: null };
}
Expand Down Expand Up @@ -342,10 +379,15 @@ export class Sitemap {
* @param urls sitemap URL(s)
* @param proxyUrl URL of a proxy to be used for fetching sitemap contents
*/
static async load(urls: string | string[], proxyUrl?: string): Promise<Sitemap> {
static async load(
urls: string | string[],
proxyUrl?: string,
parseSitemapOptions?: ParseSitemapOptions,
): Promise<Sitemap> {
return await this.parse(
(Array.isArray(urls) ? urls : [urls]).map((url) => ({ type: 'url', url })),
proxyUrl,
parseSitemapOptions,
);
}

Expand All @@ -358,11 +400,15 @@ export class Sitemap {
return await this.parse([{ type: 'raw', content }], proxyUrl);
}

protected static async parse(sources: SitemapSource[], proxyUrl?: string): Promise<Sitemap> {
protected static async parse(
sources: SitemapSource[],
proxyUrl?: string,
parseSitemapOptions?: ParseSitemapOptions,
): Promise<Sitemap> {
const urls: string[] = [];

try {
for await (const item of parseSitemap(sources, proxyUrl)) {
for await (const item of parseSitemap(sources, proxyUrl, parseSitemapOptions)) {
urls.push(item.loc);
}
} catch (e) {
Expand Down
Loading

0 comments on commit 1dd7660

Please sign in to comment.