diff --git a/packages/core/src/storages/sitemap_request_list.ts b/packages/core/src/storages/sitemap_request_list.ts
index b7f85900fd80..c30642e9541f 100644
--- a/packages/core/src/storages/sitemap_request_list.ts
+++ b/packages/core/src/storages/sitemap_request_list.ts
@@ -1,7 +1,7 @@
 import { Transform } from 'node:stream';
 
 import defaultLog from '@apify/log';
-import { parseSitemap } from '@crawlee/utils';
+import { type ParseSitemapOptions, parseSitemap } from '@crawlee/utils';
 import ow from 'ow';
 
 import { KeyValueStore } from './key_value_store';
@@ -9,6 +9,9 @@ import type { IRequestList } from './request_list';
 import { purgeDefaultStorages } from './utils';
 import { Request } from '../request';
 
+/** @internal */
+const STATE_PERSISTENCE_KEY = 'SITEMAP_REQUEST_LIST_STATE';
+
 export interface SitemapRequestListOptions {
     /**
      * List of sitemap URLs to parse.
@@ -37,6 +40,10 @@ export interface SitemapRequestListOptions {
      * @default 200
      */
    maxBufferSize?: number;
+    /**
+     * Advanced options for the underlying `parseSitemap` call.
+     */
+    parseSitemapOptions?: Omit<ParseSitemapOptions, 'emitNestedSitemaps' | 'maxDepth'>;
 }
 
 interface SitemapParsingProgress {
@@ -50,6 +57,7 @@ interface SitemapRequestListState {
     reclaimed: string[];
     sitemapParsingProgress: Record<keyof SitemapParsingProgress, any>;
     abortLoading: boolean;
+    closed: boolean;
     requestData: [string, Request][];
 }
 
@@ -118,6 +126,8 @@ export class SitemapRequestList implements IRequestList {
 
     private store?: KeyValueStore;
 
+    private closed: boolean = false;
+
     /**
      * Proxy URL to be used for sitemap loading.
     */
@@ -139,6 +149,7 @@ export class SitemapRequestList implements IRequestList {
                 signal: ow.optional.any(),
                 timeoutMillis: ow.optional.number,
                 maxBufferSize: ow.optional.number,
+                parseSitemapOptions: ow.optional.object,
             }),
         );
 
@@ -161,6 +172,10 @@ export class SitemapRequestList implements IRequestList {
      */
     private async pushNextUrl(url: string | null) {
         return new Promise<void>((resolve) => {
+            if (this.closed) {
+                return resolve();
+            }
+
             if (!this.urlQueueStream.push(url)) {
                 // This doesn't work with the 'drain' event (it's not emitted for some reason).
                 this.urlQueueStream.once('readdata', () => {
@@ -180,6 +195,10 @@ export class SitemapRequestList implements IRequestList {
      */
     private async readNextUrl(): Promise<string | null> {
         return new Promise((resolve) => {
+            if (this.closed) {
+                return resolve(null);
+            }
+
             const result = this.urlQueueStream.read();
 
             if (!result && !this.isSitemapFullyLoaded()) {
@@ -211,7 +230,9 @@ export class SitemapRequestList implements IRequestList {
      *
      * Resolves once all the sitemaps URLs have been fully loaded (sets `isSitemapFullyLoaded` to `true`).
      */
-    private async load(): Promise<void> {
+    private async load({
+        parseSitemapOptions,
+    }: { parseSitemapOptions?: SitemapRequestListOptions['parseSitemapOptions'] }): Promise<void> {
         while (!this.isSitemapFullyLoaded() && !this.abortLoading) {
             const sitemapUrl =
                 this.sitemapParsingProgress.inProgressSitemapUrl ??
@@ -219,6 +240,7 @@ export class SitemapRequestList implements IRequestList {
 
             try {
                 for await (const item of parseSitemap([{ type: 'url', url: sitemapUrl }], this.proxyUrl, {
+                    ...parseSitemapOptions,
                     maxDepth: 0,
                     emitNestedSitemaps: true,
                 })) {
@@ -253,9 +275,12 @@ export class SitemapRequestList implements IRequestList {
      * Track the loading progress using the `isSitemapFullyLoaded` property.
     */
     static async open(options: SitemapRequestListOptions): Promise<SitemapRequestList> {
-        const requestList = new SitemapRequestList(options);
+        const requestList = new SitemapRequestList({
+            ...options,
+            persistStateKey: options.persistStateKey ?? STATE_PERSISTENCE_KEY,
+        });
         await requestList.restoreState();
-        void requestList.load();
+        void requestList.load({ parseSitemapOptions: options.parseSitemapOptions });
 
         options?.signal?.addEventListener('abort', () => {
             requestList.abortLoading = true;
@@ -334,6 +359,7 @@ export class SitemapRequestList implements IRequestList {
             reclaimed: [...this.inProgress, ...this.reclaimed], // In-progress and reclaimed requests will be both retried if state is restored
             requestData: Array.from(this.requestData.entries()),
             abortLoading: this.abortLoading,
+            closed: this.closed,
         } satisfies SitemapRequestListState);
     }
 
@@ -365,6 +391,7 @@ export class SitemapRequestList implements IRequestList {
         }
 
         this.abortLoading = state.abortLoading;
+        this.closed = state.closed;
     }
 
     /**
@@ -392,7 +419,7 @@ export class SitemapRequestList implements IRequestList {
      * @inheritDoc
      */
     async *[Symbol.asyncIterator]() {
-        while ((!this.isSitemapFullyLoaded() && !this.abortLoading) || !(await this.isEmpty())) {
+        while (!(await this.isFinished())) {
             const request = await this.fetchNextRequest();
             if (!request) break;
 
@@ -409,6 +436,19 @@ export class SitemapRequestList implements IRequestList {
         this.inProgress.delete(request.url);
     }
 
+    /**
+     * Aborts the internal sitemap loading, stops the processing of the sitemap contents and drops all the pending URLs.
+     *
+     * Calling `fetchNextRequest()` after this method will always return `null`.
+     */
+    async teardown(): Promise<void> {
+        this.closed = true;
+        this.abortLoading = true;
+        await this.persistState();
+
+        this.urlQueueStream.emit('readdata'); // unblocks the potentially waiting `pushNextUrl` call
+    }
+
     /**
      * @inheritDoc
      */
diff --git a/packages/utils/src/internals/sitemap.ts b/packages/utils/src/internals/sitemap.ts
index 6a4ef544f351..7c3cc1249018 100644
--- a/packages/utils/src/internals/sitemap.ts
+++ b/packages/utils/src/internals/sitemap.ts
@@ -5,6 +5,8 @@ import { StringDecoder } from 'node:string_decoder';
 import { createGunzip } from 'node:zlib';
 
 import log from '@apify/log';
+// @ts-expect-error This throws a compilation error due to got-scraping being ESM only but we only import types
+import type { Delays } from 'got-scraping';
 import sax from 'sax';
 import MIMEType from 'whatwg-mimetype';
 
@@ -166,15 +168,23 @@ class SitemapXmlParser extends Transform {
     }
 }
 
-interface ParseSitemapOptions {
+export interface ParseSitemapOptions {
     /**
      * If set to `true`, elements referring to other sitemaps will be emitted as special objects with `originSitemapUrl` set to `null`.
     */
     emitNestedSitemaps?: true | false;
     /**
      * Maximum depth of nested sitemaps to follow.
     */
     maxDepth?: number;
+    /**
+     * Number of retries for fetching sitemaps. The counter resets for each nested sitemap.
+     */
+    sitemapRetries?: number;
+    /**
+     * Network timeouts for sitemap fetching. See [Got documentation](https://github.com/sindresorhus/got/blob/main/documentation/6-timeout.md) for more details.
+     */
+    networkTimeouts?: Delays;
 }
 
 export async function* parseSitemap(
@@ -184,6 +194,7 @@ export async function* parseSitemap(
 ): AsyncIterable<SitemapItem> {
     const { gotScraping } = await import('got-scraping');
     const { fileTypeStream } = await import('file-type');
+    const { emitNestedSitemaps = false, maxDepth = Infinity, sitemapRetries = 3, networkTimeouts } = options ?? {};
 
     const sources = [...initialSources];
     const visitedSitemapUrls = new Set<string>();
 
@@ -211,9 +222,9 @@ export async function* parseSitemap(
     while (sources.length > 0) {
         const source = sources.shift()!;
 
-        if ((source?.depth ?? 0) > (options?.maxDepth ?? Infinity)) {
+        if ((source?.depth ?? 0) > maxDepth) {
             log.debug(
-                `Skipping sitemap ${source.type === 'url' ? source.url : ''} because it reached max depth ${options!.maxDepth!}.`,
+                `Skipping sitemap ${source.type === 'url' ? source.url : ''} because it reached max depth ${maxDepth}.`,
             );
             continue;
         }
@@ -223,49 +234,75 @@ export async function* parseSitemap(
         if (source.type === 'url') {
             const sitemapUrl = new URL(source.url);
             visitedSitemapUrls.add(sitemapUrl.toString());
+            let retriesLeft = sitemapRetries + 1;
+
+            while (retriesLeft-- > 0) {
+                try {
+                    const sitemapStream = await new Promise<ReturnType<typeof gotScraping.stream>>(
+                        (resolve, reject) => {
+                            const request = gotScraping.stream({
+                                url: sitemapUrl,
+                                proxyUrl,
+                                method: 'GET',
+                                timeout: networkTimeouts,
+                                headers: {
+                                    'accept': 'text/plain, application/xhtml+xml, application/xml;q=0.9, */*;q=0.8',
+                                },
+                            });
+                            request.on('response', () => resolve(request));
+                            request.on('error', reject);
+                        },
+                    );
 
-            try {
-                const sitemapStream = await new Promise<ReturnType<typeof gotScraping.stream>>((resolve, reject) => {
-                    const request = gotScraping.stream({ url: sitemapUrl, proxyUrl, method: 'GET' });
-                    request.on('response', () => resolve(request));
-                    request.on('error', reject);
-                });
+                    let error: Error | null = null;
 
-                if (sitemapStream.response!.statusCode === 200) {
-                    let contentType = sitemapStream.response!.headers['content-type'];
+                    if (sitemapStream.response!.statusCode >= 200 && sitemapStream.response!.statusCode < 300) {
+                        let contentType = sitemapStream.response!.headers['content-type'];
 
-                    const streamWithType = await fileTypeStream(sitemapStream);
-                    if (streamWithType.fileType !== undefined) {
-                        contentType = streamWithType.fileType.mime;
-                    }
+                        const streamWithType = await fileTypeStream(sitemapStream);
+                        if (streamWithType.fileType !== undefined) {
+                            contentType = streamWithType.fileType.mime;
+                        }
 
-                    let isGzipped = false;
+                        let isGzipped = false;
 
-                    if (
-                        contentType !== undefined
-                            ? contentType === 'application/gzip'
-                            : sitemapUrl.pathname.endsWith('.gz')
-                    ) {
-                        isGzipped = true;
+                        if (
+                            contentType !== undefined
+                                ? contentType === 'application/gzip'
+                                : sitemapUrl.pathname.endsWith('.gz')
+                        ) {
+                            isGzipped = true;
 
-                        if (sitemapUrl.pathname.endsWith('.gz')) {
-                            sitemapUrl.pathname = sitemapUrl.pathname.substring(0, sitemapUrl.pathname.length - 3);
+                            if (sitemapUrl.pathname.endsWith('.gz')) {
+                                sitemapUrl.pathname = sitemapUrl.pathname.substring(0, sitemapUrl.pathname.length - 3);
+                            }
                         }
+
+                        items = pipeline(
+                            streamWithType,
+                            isGzipped ? createGunzip() : new PassThrough(),
+                            createParser(contentType, sitemapUrl),
+                            (e) => {
+                                if (e !== undefined) {
+                                    error = e;
+                                }
+                            },
+                        );
+                    } else {
+                        error = new Error(
+                            `Failed to fetch sitemap: ${sitemapUrl}, status code: ${sitemapStream.response!.statusCode}`,
+                        );
                     }
 
-                    items = pipeline(
-                        streamWithType,
-                        isGzipped ? createGunzip() : new PassThrough(),
-                        createParser(contentType, sitemapUrl),
-                        (error) => {
-                            if (error !== undefined) {
-                                log.warning(`Malformed sitemap content: ${sitemapUrl}, ${error}`);
-                            }
-                        },
+                    if (error !== null) {
+                        throw error;
+                    }
+                    break;
+                } catch (e) {
+                    log.warning(
+                        `Malformed sitemap content: ${sitemapUrl}, ${retriesLeft === 0 ? 'no retries left.' : 'retrying...'} (${e})`,
                     );
                 }
-            } catch (e) {
-                log.warning(`Malformed sitemap content: ${sitemapUrl}, ${e}`);
             }
         } else if (source.type === 'raw') {
             items = pipeline(Readable.from([source.content]), createParser('text/xml'), (error) => {
@@ -282,7 +319,7 @@ export async function* parseSitemap(
         for await (const item of items) {
             if (item.type === 'sitemapUrl' && !visitedSitemapUrls.has(item.url)) {
                 sources.push({ type: 'url', url: item.url, depth: (source.depth ?? 0) + 1 });
-                if (options?.emitNestedSitemaps) {
+                if (emitNestedSitemaps) {
                     // @ts-ignore
                     yield { loc: item.url, originSitemapUrl: null };
                 }
@@ -342,10 +379,15 @@ export class Sitemap {
      * @param urls sitemap URL(s)
      * @param proxyUrl URL of a proxy to be used for fetching sitemap contents
     */
-    static async load(urls: string | string[], proxyUrl?: string): Promise<Sitemap> {
+    static async load(
+        urls: string | string[],
+        proxyUrl?: string,
+        parseSitemapOptions?: ParseSitemapOptions,
+    ): Promise<Sitemap> {
         return await this.parse(
             (Array.isArray(urls) ? urls : [urls]).map((url) => ({ type: 'url', url })),
             proxyUrl,
+            parseSitemapOptions,
         );
     }
 
@@ -358,11 +400,15 @@ export class Sitemap {
         return await this.parse([{ type: 'raw', content }], proxyUrl);
     }
 
-    protected static async parse(sources: SitemapSource[], proxyUrl?: string): Promise<Sitemap> {
+    protected static async parse(
+        sources: SitemapSource[],
+        proxyUrl?: string,
+        parseSitemapOptions?: ParseSitemapOptions,
+    ): Promise<Sitemap> {
         const urls: string[] = [];
 
         try {
-            for await (const item of parseSitemap(sources, proxyUrl)) {
+            for await (const item of parseSitemap(sources, proxyUrl, parseSitemapOptions)) {
                 urls.push(item.loc);
             }
         } catch (e) {
diff --git a/test/core/sitemap_request_list.test.ts b/test/core/sitemap_request_list.test.ts
index 7f0a6d04829c..0a2b915a67ea 100644
--- a/test/core/sitemap_request_list.test.ts
+++ b/test/core/sitemap_request_list.test.ts
@@ -18,6 +18,40 @@ beforeAll(async () => {
     server = await startExpressAppPromise(app, 0);
     url = `http://localhost:${(server.address() as AddressInfo).port}`;
 
+    let attemptCount = 0;
+
+    app.get('/sitemap-unreliable.xml', async (req, res) => {
+        attemptCount += 1;
+        if (attemptCount % 2 === 1) {
+            res.status(500).end();
+            return;
+        }
+
+        res.setHeader('content-type', 'text/xml');
+        res.write(
+            [
+                '<?xml version="1.0" encoding="UTF-8"?>',
+                '<urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">',
+                '<url>',
+                '<loc>http://not-exists.com/</loc>',
+                '</url>',
+                '<url>',
+                '<loc>http://not-exists.com/catalog?item=12&amp;desc=vacation_hawaii</loc>',
+                '</url>',
+                '<url>',
+                '<loc>http://not-exists.com/catalog?item=73&amp;desc=vacation_new_zealand</loc>',
+                '</url>',
+                '<url>',
+                '<loc>http://not-exists.com/catalog?item=74&amp;desc=vacation_newfoundland</loc>',
+                '</url>',
+                '<url>',
+                '<loc>http://not-exists.com/catalog?item=83&amp;desc=vacation_usa</loc>',
+                '</url>',
+                '</urlset>',
+            ].join('\n'),
+        );
+        res.end();
+    });
 
     app.get('/sitemap.xml', async (req, res) => {
         res.setHeader('content-type', 'text/xml');
@@ -76,6 +110,45 @@ beforeAll(async () => {
         res.end();
     });
 
+    app.get('/sitemap-unreliable-break-off.xml', async (req, res) => {
+        attemptCount += 1;
+        res.setHeader('content-type', 'text/xml');
+
+        res.write(
+            [
+                '<?xml version="1.0" encoding="UTF-8"?>',
+                '<urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">',
+                '<url>',
+                '<loc>http://not-exists.com/</loc>',
+                '</url>',
+                '<url>',
+                '<loc>http://not-exists.com/catalog?item=12&amp;desc=vacation_hawaii</loc>',
+                '</url>',
+                '<url>',
+                '<loc>http://not-exists.com/catalog?item=73&amp;desc=vacation_new_zealand</loc>',
+            ].join('\n'),
+        );
+
+        if (attemptCount % 2 === 1) {
+            res.destroy();
+            return;
+        }
+
+        res.write(
+            [
+                '</url>',
+                '<url>',
+                '<loc>http://not-exists.com/catalog?item=74&amp;desc=vacation_newfoundland</loc>',
+                '</url>',
+                '<url>',
+                '<loc>http://not-exists.com/catalog?item=83&amp;desc=vacation_usa</loc>',
+                '</url>',
+                '</urlset>',
+            ].join('\n'),
+        );
+        res.end();
+    });
+
     app.get('/sitemap-stream-linger.xml', async (req, res) => {
         async function* stream() {
             yield [
@@ -128,7 +201,7 @@ afterAll(async () => {
 
 // Storage emulator for persistence
 const emulator = new MemoryStorageEmulator();
 
-beforeAll(async () => {
+beforeEach(async () => {
     await emulator.init();
 });
 
@@ -157,6 +230,54 @@ describe('SitemapRequestList', () => {
         expect(thirdRequest).not.toBe(null);
     });
 
+    test('retry sitemap load on error', async () => {
+        const list = await SitemapRequestList.open({ sitemapUrls: [`${url}/sitemap-unreliable.xml`] });
+
+        for await (const request of list) {
+            await list.markRequestHandled(request);
+        }
+
+        expect(list.handledCount()).toBe(5);
+    });
+
+    test('broken off sitemap load resurrects correctly and does not duplicate / lose requests', async () => {
+        const list = await SitemapRequestList.open({ sitemapUrls: [`${url}/sitemap-unreliable-break-off.xml`] });
+
+        const urls = new Set<string>();
+
+        for await (const request of list) {
+            await list.markRequestHandled(request);
+            urls.add(request.url);
+        }
+
+        expect(list.handledCount()).toBe(5);
+        expect(urls).toEqual(
+            new Set([
+                'http://not-exists.com/',
+                'http://not-exists.com/catalog?item=12&desc=vacation_hawaii',
+                'http://not-exists.com/catalog?item=73&desc=vacation_new_zealand',
+                'http://not-exists.com/catalog?item=74&desc=vacation_newfoundland',
+                'http://not-exists.com/catalog?item=83&desc=vacation_usa',
+            ]),
+        );
+    });
+
+    test('teardown works', async () => {
+        const list = await SitemapRequestList.open({ sitemapUrls: [`${url}/sitemap-index.xml`] });
+
+        for await (const request of list) {
+            await list.markRequestHandled(request);
+
+            if (list.handledCount() >= 2) {
+                await list.teardown();
+            }
+        }
+
+        expect(list.handledCount()).toBe(2);
+        await expect(list.isFinished()).resolves.toBe(true);
+        await expect(list.fetchNextRequest()).resolves.toBe(null);
+    });
+
     test('draining the request list between sitemaps', async () => {
         const list = await SitemapRequestList.open({ sitemapUrls: [`${url}/sitemap-index.xml`] });
 
@@ -192,7 +313,7 @@ describe('SitemapRequestList', () => {
         expect(list.handledCount()).toBe(7);
     });
 
-    test('for..await syntax works with requestIterator', async () => {
+    test('for..await syntax works with SitemapRequestList', async () => {
         const list = await SitemapRequestList.open({ sitemapUrls: [`${url}/sitemap-index.xml`] });
 
         for await (const request of list) {
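Usage sketch (not part of the patch): the example below shows how the pieces added in this diff fit together — `parseSitemapOptions` forwarding retry and timeout tuning into the underlying `parseSitemap` call, and `teardown()` for an early exit. The sitemap URL and the handled-count threshold are hypothetical placeholders.

```ts
import { SitemapRequestList } from '@crawlee/core';

const list = await SitemapRequestList.open({
    sitemapUrls: ['https://example.com/sitemap.xml'], // placeholder URL
    parseSitemapOptions: {
        // retry each (nested) sitemap fetch up to 5 times before giving up
        sitemapRetries: 5,
        // Got-style timeout object, passed through to gotScraping.stream()
        networkTimeouts: { request: 30_000 },
    },
});

for await (const request of list) {
    // ... process the request ...
    await list.markRequestHandled(request);

    if (list.handledCount() >= 100) {
        // Stops sitemap loading, drops pending URLs and persists state;
        // subsequent fetchNextRequest() calls resolve to null.
        await list.teardown();
    }
}
```

The same options object can also be handed to the static `Sitemap.load()` helper through its new third parameter, e.g. `await Sitemap.load(urls, proxyUrl, { sitemapRetries: 5 })`.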