From f7cbf9645be90f101677692c4e37be8903da336f Mon Sep 17 00:00:00 2001
From: Ilya Kreymer <ikreymer@users.noreply.github.com>
Date: Sat, 25 Jan 2025 22:55:49 -0800
Subject: [PATCH] Retry support and additional fixes (#743)

- retries: for failed pages, allow up to 5 retries in cases where multiple retries may be needed.
- redirect: if a page URL redirects from /path/ -> /path, don't add it as an extra seed
- proxy: don't use the global dispatcher; pass the dispatcher explicitly when using a proxy, as the proxy may interfere with local network requests
- final exit flag: if the crawl is done and also interrupted, ensure the WACZ is still written/uploaded by setting final exit to true
- hashtag-only change forces reload: if loading a page with the same URL but a different hashtag, e.g. `https://example.com/#B` after `https://example.com/#A`, do a full reload
---
 package.json               |  2 +-
 src/crawler.ts             | 74 +++++++++++++++++++++++----------
 src/util/blockrules.ts     |  4 +-
 src/util/constants.ts      |  1 +
 src/util/file_reader.ts    |  3 +-
 src/util/originoverride.ts |  6 ++-
 src/util/proxy.ts          | 10 ++++-
 src/util/recorder.ts       | 23 +++++++----
 src/util/sitemapper.ts     |  6 ++-
 src/util/state.ts          | 85 ++++++++++++++++++++++----------
 tests/file_stats.test.js   |  1 +
 tests/retry-failed.test.js | 71 +++++++++++++++++++++++++++++++
 12 files changed, 212 insertions(+), 74 deletions(-)
 create mode 100644 tests/retry-failed.test.js

diff --git a/package.json b/package.json
index 8f71b907..edcca228 100644
--- a/package.json
+++ b/package.json
@@ -1,6 +1,6 @@
 {
   "name": "browsertrix-crawler",
-  "version": "1.5.0-beta.2",
+  "version": "1.5.0-beta.3",
   "main": "browsertrix-crawler",
   "type": "module",
   "repository": "https://github.com/webrecorder/browsertrix-crawler",
diff --git a/src/crawler.ts b/src/crawler.ts
index b1f12a08..10f1f81f 100644
--- a/src/crawler.ts
+++ b/src/crawler.ts
@@ -46,6 +46,7 @@ import {
   ExtractSelector,
   PAGE_OP_TIMEOUT_SECS,
   SITEMAP_INITIAL_FETCH_TIMEOUT_SECS,
+  MAX_RETRY_FAILED,
 } from "./util/constants.js";
 
 import { AdBlockRules, BlockRuleDecl, BlockRules } from "./util/blockrules.js";
@@ -1152,13 +1153,13 @@ self.__bx_behaviors.selectMainBehavior();
   }
 
   async pageFinished(data: PageState) {
-    await this.writePage(data);
-
     // if page loaded, considered page finished successfully
     // (even if behaviors timed out)
-    const { loadState, logDetails, depth, url } = data;
+    const { loadState, logDetails, depth, url, retry } = data;
 
     if (data.loadState >= LoadState.FULL_PAGE_LOADED) {
+      await this.writePage(data);
+
       logger.info("Page Finished", { loadState, ...logDetails }, "pageStatus");
 
       await this.crawlState.markFinished(url);
@@ -1171,6 +1172,9 @@ self.__bx_behaviors.selectMainBehavior();
 
       await this.checkLimits();
     } else {
+      if (retry >= MAX_RETRY_FAILED) {
+        await this.writePage(data);
+      }
       await this.crawlState.markFailed(url);
 
       if (this.healthChecker) {
@@ -1370,7 +1374,7 @@ self.__bx_behaviors.selectMainBehavior();
     }
 
     if (this.params.failOnFailedLimit) {
-      const numFailed = await this.crawlState.numFailed();
+      const numFailed = await this.crawlState.numFailedWillRetry();
       const failedLimit = this.params.failOnFailedLimit;
       if (numFailed >= failedLimit) {
         logger.fatal(
@@ -1498,6 +1502,7 @@ self.__bx_behaviors.selectMainBehavior();
       logger.info("crawl already finished, running post-crawl tasks", {
         state: initState,
       });
+      this.finalExit = true;
       await this.postCrawl();
       return;
     } else if (await this.crawlState.isCrawlStopped()) {
@@ -1581,8 +1586,11 @@ self.__bx_behaviors.selectMainBehavior();
 
     await this.writeStats();
 
-    // if crawl has been stopped, mark as final exit for post-crawl tasks
-    if (await this.crawlState.isCrawlStopped()) {
+    // if crawl has been stopped or finished, mark as final exit for post-crawl tasks
+    if (
+      (await this.crawlState.isCrawlStopped()) ||
+      (await this.crawlState.isFinished())
+    ) {
       this.finalExit = true;
     }
 
@@ -1822,16 +1830,19 @@ self.__bx_behaviors.selectMainBehavior();
     const realSize = await this.crawlState.queueSize();
     const pendingPages = await this.crawlState.getPendingList();
-    const done = await this.crawlState.numDone();
-    const failed = await this.crawlState.numFailed();
-    const total = realSize + pendingPages.length + done;
+    const pending = pendingPages.length;
+    const crawled = await this.crawlState.numDone();
+    const failedWillRetry = await this.crawlState.numFailedWillRetry();
+    const failed = await this.crawlState.numFailedNoRetry();
+    const total = realSize + pendingPages.length + crawled;
 
     const limit = { max: this.pageLimit || 0, hit: this.limitHit };
 
     const stats = {
-      crawled: done,
-      total: total,
-      pending: pendingPages.length,
-      failed: failed,
-      limit: limit,
+      crawled,
+      total,
+      pending,
+      failedWillRetry,
+      failed,
+      limit,
       pendingPages,
     };
 
@@ -1885,12 +1896,14 @@ self.__bx_behaviors.selectMainBehavior();
       }
     };
 
-    page.on("response", waitFirstResponse);
+    const handleFirstLoadEvents = () => {
+      page.on("response", waitFirstResponse);
 
-    // store that domcontentloaded was finished
-    page.once("domcontentloaded", () => {
-      data.loadState = LoadState.CONTENT_LOADED;
-    });
+      // store that domcontentloaded was finished
+      page.once("domcontentloaded", () => {
+        data.loadState = LoadState.CONTENT_LOADED;
+      });
+    };
 
     const gotoOpts = data.isHTMLPage
       ? this.gotoOpts
 
     logger.info("Awaiting page load", logDetails);
 
+    const urlNoHash = url.split("#")[0];
+
+    const fullRefresh = urlNoHash === page.url().split("#")[0];
+
     try {
+      if (!fullRefresh) {
+        handleFirstLoadEvents();
+      }
       // store the page load response when page fully loads
       fullLoadedResponse = await page.goto(url, gotoOpts);
+
+      if (fullRefresh) {
+        logger.debug("Hashtag-only change, doing full page reload");
+
+        handleFirstLoadEvents();
+
+        fullLoadedResponse = await page.reload(gotoOpts);
+      }
     } catch (e) {
       if (!(e instanceof Error)) {
         throw e;
       }
     } else if (!downloadResponse) {
       // log if not already log and rethrow, consider page failed
       if (msg !== "logged") {
-        logger.error("Page Load Failed, skipping page", {
+        logger.error("Page Load Failed, will retry", {
           msg,
           loadState: data.loadState,
           ...logDetails,
         });
     if (
       depth === 0 &&
       !isChromeError &&
-      respUrl !== url.split("#")[0] &&
+      respUrl !== urlNoHash &&
+      respUrl + "/" !== url &&
       !downloadResponse
     ) {
       data.seedId = await this.crawlState.addExtraSeed(
@@ -2652,8 +2681,9 @@ self.__bx_behaviors.selectMainBehavior();
     if (this.origConfig) {
       this.origConfig.state = state;
     }
-    const res = yaml.dump(this.origConfig, { lineWidth: -1 });
+
     try {
+      const res = yaml.dump(this.origConfig, { lineWidth: -1 });
       logger.info(`Saving crawl state to: ${filename}`);
       await fsp.writeFile(filename, res);
     } catch (e) {
diff --git a/src/util/blockrules.ts b/src/util/blockrules.ts
index 5d3238fb..0e7fb511 100644
--- a/src/util/blockrules.ts
+++ b/src/util/blockrules.ts
@@ -5,6 +5,7 @@ import { HTTPRequest, Page } from "puppeteer-core";
 import { Browser } from "./browser.js";
 
 import { fetch } from "undici";
+import { getProxyDispatcher } from "./proxy.js";
 
 const RULE_TYPES = ["block", "allowOnly"];
 
@@ -271,7 +272,7 @@ export class BlockRules {
     logDetails: Record<string, any>,
   ) {
     try {
-      const res = await fetch(reqUrl);
+      const res = await fetch(reqUrl, { dispatcher: getProxyDispatcher() });
       const text = await res.text();
 
       return !!text.match(frameTextMatch);
@@ -302,6 +303,7 @@ export class BlockRules {
       method: "PUT",
       headers: { "Content-Type": "text/html" },
       body,
+      dispatcher: getProxyDispatcher(),
     });
   }
 }
diff --git a/src/util/constants.ts b/src/util/constants.ts
index 72506c6c..0c5d6faf 100644
--- a/src/util/constants.ts
+++ b/src/util/constants.ts
@@ -27,6 +27,7 @@ export const ADD_LINK_FUNC = "__bx_addLink";
 export const FETCH_FUNC = "__bx_fetch";
 
 export const MAX_DEPTH = 1000000;
+export const MAX_RETRY_FAILED = 5;
 
 export const FETCH_HEADERS_TIMEOUT_SECS = 30;
 export const PAGE_OP_TIMEOUT_SECS = 5;
diff --git a/src/util/file_reader.ts b/src/util/file_reader.ts
index 7eea4162..284f0dd8 100644
--- a/src/util/file_reader.ts
+++ b/src/util/file_reader.ts
@@ -6,6 +6,7 @@ import util from "util";
 import { exec as execCallback } from "child_process";
 
 import { logger } from "./logger.js";
+import { getProxyDispatcher } from "./proxy.js";
 
 const exec = util.promisify(execCallback);
 
@@ -85,7 +86,7 @@ async function collectOnlineBehavior(url: string): Promise<FileSources> {
   const behaviorFilepath = `/app/behaviors/${filename}`;
 
   try {
-    const res = await fetch(url);
+    const res = await fetch(url, { dispatcher: getProxyDispatcher() });
     const fileContents = await res.text();
     await fsp.writeFile(behaviorFilepath, fileContents);
     logger.info(
diff --git a/src/util/originoverride.ts b/src/util/originoverride.ts
index a00a2b54..1b2b8c41 100644
--- a/src/util/originoverride.ts
+++ b/src/util/originoverride.ts
@@ -3,6 +3,7 @@ import { formatErr, logger } from "./logger.js";
 import { Browser } from "./browser.js";
 
 import { fetch } from "undici";
+import { getProxyDispatcher } from "./proxy.js";
 
 export class OriginOverride {
   originOverride: { origUrl: URL; destUrl: URL }[];
@@ -45,7 +46,10 @@ export class OriginOverride {
         headers.set("origin", orig.origin);
       }
 
-      const resp = await fetch(newUrl, { headers });
+      const resp = await fetch(newUrl, {
+        headers,
+        dispatcher: getProxyDispatcher(),
+      });
 
       const body = Buffer.from(await resp.arrayBuffer());
       const respHeaders = Object.fromEntries(resp.headers);
diff --git a/src/util/proxy.ts b/src/util/proxy.ts
index cf6a3437..5b15d6e2 100644
--- a/src/util/proxy.ts
+++ b/src/util/proxy.ts
@@ -1,5 +1,5 @@
 import net from "net";
-import { Agent, Dispatcher, ProxyAgent, setGlobalDispatcher } from "undici";
+import { Agent, Dispatcher, ProxyAgent } from "undici";
 
 import child_process from "child_process";
 
@@ -13,6 +13,8 @@ const SSH_PROXY_LOCAL_PORT = 9722;
 
 const SSH_WAIT_TIMEOUT = 30000;
 
+let proxyDispatcher: Dispatcher | undefined = undefined;
+
 export function getEnvProxyUrl() {
   if (process.env.PROXY_SERVER) {
     return process.env.PROXY_SERVER;
@@ -46,10 +48,14 @@ export async function initProxy(
   // set global fetch() dispatcher (with proxy, if any)
   const dispatcher = createDispatcher(proxy, agentOpts);
-  setGlobalDispatcher(dispatcher);
+  proxyDispatcher = dispatcher;
   return proxy;
 }
 
+export function getProxyDispatcher() {
+  return proxyDispatcher;
+}
+
 export function createDispatcher(
   proxyUrl: string,
   opts: Agent.Options,
diff --git a/src/util/recorder.ts b/src/util/recorder.ts
index a10c8175..2e0005b6 100644
--- a/src/util/recorder.ts
+++ b/src/util/recorder.ts
@@ -8,7 +8,7 @@ import {
   isRedirectStatus,
 } from "./reqresp.js";
 
-import { fetch, getGlobalDispatcher, Response } from "undici";
+import { fetch, Response } from "undici";
 
 import {
   getCustomRewriter,
@@ -23,6 +23,7 @@ import { WARCWriter } from "./warcwriter.js";
 import { RedisCrawlState, WorkerId } from "./state.js";
 import { CDPSession, Protocol } from "puppeteer-core";
 import { Crawler } from "../crawler.js";
+import { getProxyDispatcher } from "./proxy.js";
 
 const MAX_BROWSER_DEFAULT_FETCH_SIZE = 5_000_000;
 const MAX_TEXT_REWRITE_SIZE = 25_000_000;
@@ -1588,14 +1589,18 @@ class AsyncFetcher {
 
     const headers = reqresp.getRequestHeadersDict();
 
-    const dispatcher = getGlobalDispatcher().compose((dispatch) => {
-      return (opts, handler) => {
-        if (opts.headers) {
-          reqresp.requestHeaders = opts.headers as Record<string, string>;
-        }
-        return dispatch(opts, handler);
-      };
-    });
+    let dispatcher = getProxyDispatcher();
+
+    if (dispatcher) {
+      dispatcher = dispatcher.compose((dispatch) => {
+        return (opts, handler) => {
+          if (opts.headers) {
+            reqresp.requestHeaders = opts.headers as Record<string, string>;
+          }
+          return dispatch(opts, handler);
+        };
+      });
+    }
 
     const resp = await fetch(url!, {
       method,
diff --git a/src/util/sitemapper.ts b/src/util/sitemapper.ts
index a13eae16..3ffb40c7 100644
--- a/src/util/sitemapper.ts
+++ b/src/util/sitemapper.ts
@@ -10,6 +10,7 @@ import { DETECT_SITEMAP } from "./constants.js";
 import { sleep } from "./timing.js";
 
 import { fetch, Response } from "undici";
+import { getProxyDispatcher } from "./proxy.js";
 
 const SITEMAP_CONCURRENCY = 5;
 
@@ -65,7 +66,10 @@ export class SitemapReader extends EventEmitter {
 
   async _fetchWithRetry(url: string, message: string) {
     while (true) {
-      const resp = await fetch(url, { headers: this.headers });
+      const resp = await fetch(url, {
+        headers: this.headers,
+        dispatcher: getProxyDispatcher(),
+      });
 
       if (resp.ok) {
         return resp;
diff --git a/src/util/state.ts b/src/util/state.ts
index 9388e478..e5676f0b 100644
--- a/src/util/state.ts
+++ b/src/util/state.ts
@@ -3,7 +3,7 @@ import { v4 as uuidv4 } from "uuid";
 
 import { logger } from "./logger.js";
 
-import { MAX_DEPTH } from "./constants.js";
+import { MAX_DEPTH, MAX_RETRY_FAILED } from "./constants.js";
 import { ScopedSeed } from "./seeds.js";
 import { Frame } from "puppeteer-core";
 
@@ -35,6 +35,7 @@ export type QueueEntry = {
   extraHops: number;
   ts?: number;
   pageid?: string;
+  retry?: number;
 };
 
 // ============================================================================
@@ -54,6 +55,7 @@ export class PageState {
   seedId: number;
   depth: number;
   extraHops: number;
+  retry: number;
 
   status: number;
 
@@ -87,6 +89,7 @@
     }
     this.pageid = redisData.pageid || uuidv4();
     this.status = 0;
+    this.retry = redisData.retry || 0;
   }
 }
 
@@ -115,17 +118,12 @@ declare module "ioredis" {
       uid: string,
     ): Result<void, Context>;
 
-    movefailed(
-      pkey: string,
-      fkey: string,
-      url: string,
-      value: string,
-      state: string,
-    ): Result<void, Context>;
+    movefailed(pkey: string, fkey: string, url: string): Result<void, Context>;
 
     requeuefailed(
       fkey: string,
       qkey: string,
+      ffkey: string,
       maxRetryPending: number,
       maxRegularDepth: number,
     ): Result<number, Context>;
@@ -170,7 +168,7 @@ export type SaveState = {
 // ============================================================================
 export class RedisCrawlState {
   redis: Redis;
-  maxRetryPending = 1;
+  maxRetryPending = MAX_RETRY_FAILED;
 
   uid: string;
   key: string;
@@ -181,6 +179,7 @@ export class RedisCrawlState {
   skey: string;
   dkey: string;
   fkey: string;
+  ffkey: string;
   ekey: string;
   pageskey: string;
   esKey: string;
@@ -202,6 +201,8 @@ export class RedisCrawlState {
     this.dkey = this.key + ":d";
     // failed
     this.fkey = this.key + ":f";
+    // failed final, no more retry
+    this.ffkey = this.key + ":ff";
     // crawler errors
     this.ekey = this.key + ":e";
     // pages
@@ -283,7 +284,6 @@ local json = redis.call('hget', KEYS[1], ARGV[1]);
 
 if json then
   local data = cjson.decode(json);
-  data[ARGV[3]] = ARGV[2];
   json = cjson.encode(data);
 
   redis.call('lpush', KEYS[2], json);
@@ -294,23 +294,25 @@ end
 });
 
 redis.defineCommand("requeuefailed", {
-  numberOfKeys: 2,
+  numberOfKeys: 3,
   lua: `
 local json = redis.call('rpop', KEYS[1]);
 
 if json then
   local data = cjson.decode(json);
   data['retry'] = (data['retry'] or 0) + 1;
-  if tonumber(data['retry']) <= tonumber(ARGV[1]) then
-    json = cjson.encode(data);
+
+  if data['retry'] <= tonumber(ARGV[1]) then
+    local json = cjson.encode(data);
     local score = (data['depth'] or 0) + ((data['extraHops'] or 0) * ARGV[2]);
     redis.call('zadd', KEYS[2], score, json);
-    return 1;
+    return data['retry'];
   else
-    return 2;
+    redis.call('lpush', KEYS[3], json);
+    return 0;
   end
 end
 
-return 0;
+return -1;
 `,
 });
@@ -382,9 +384,7 @@ return inx;
   }
 
   async markFailed(url: string) {
-    await this.redis.movefailed(this.pkey, this.fkey, url, "1", "failed");
-
-    return await this.redis.incr(this.dkey);
+    await this.redis.movefailed(this.pkey, this.fkey, url);
   }
 
   async markExcluded(url: string) {
@@ -400,7 +400,10 @@ return inx;
   }
 
   async isFinished() {
-    return (await this.queueSize()) == 0 && (await this.numDone()) > 0;
+    return (
+      (await this.queueSize()) + (await this.numFailedWillRetry()) == 0 &&
+      (await this.numDone()) + (await this.numFailedNoRetry()) > 0
+    );
   }
 
   async setStatus(status_: string) {
@@ -572,25 +575,22 @@ return inx;
 
   async nextFromQueue() {
     let json = await this._getNext();
-    let retryFailed = false;
+    let retry = 0;
 
     if (!json) {
-      const res = await this.redis.requeuefailed(
+      retry = await this.redis.requeuefailed(
         this.fkey,
         this.qkey,
+        this.ffkey,
         this.maxRetryPending,
         MAX_DEPTH,
       );
 
-      switch (res) {
-        case 1:
-          json = await this._getNext();
-          retryFailed = true;
-          break;
-
-        case 2:
-          logger.debug("Did not retry failed, already retried", {}, "state");
-          return null;
+      if (retry > 0) {
+        json = await this._getNext();
+      } else if (retry === 0) {
+        logger.debug("Did not retry failed, already retried", {}, "state");
+        return null;
       }
     }
@@ -607,8 +607,8 @@ return inx;
       return null;
     }
 
-    if (retryFailed) {
-      logger.debug("Retring failed URL", { url: data.url }, "state");
+    if (retry) {
+      logger.debug("Retrying failed URL", { url: data.url, retry }, "state");
     }
 
     await this.markStarted(data.url);
@@ -626,11 +626,14 @@ return inx;
     const seen = await this._iterSet(this.skey);
     const queued = await this._iterSortedKey(this.qkey, seen);
     const pending = await this.getPendingList();
-    const failed = await this._iterListKeys(this.fkey, seen);
+    const failedWillRetry = await this._iterListKeys(this.fkey, seen);
+    const failedNoRetry = await this._iterListKeys(this.ffkey, seen);
     const errors = await this.getErrorList();
     const extraSeeds = await this._iterListKeys(this.esKey, seen);
     const sitemapDone = await this.isSitemapDone();
 
+    const failed = failedWillRetry.concat(failedNoRetry);
+
     const finished = [...seen.values()];
 
     return {
@@ -721,6 +724,7 @@ return inx;
     await this.redis.del(this.pkey);
     await this.redis.del(this.dkey);
     await this.redis.del(this.fkey);
+    await this.redis.del(this.ffkey);
     await this.redis.del(this.skey);
     await this.redis.del(this.ekey);
 
@@ -803,7 +807,12 @@ return inx;
 
     for (const json of state.failed) {
       const data = JSON.parse(json);
-      await this.redis.zadd(this.qkey, this._getScore(data), json);
+      const retry = data.retry || 0;
+      if (retry <= this.maxRetryPending) {
+        await this.redis.zadd(this.qkey, this._getScore(data), json);
+      } else {
+        await this.redis.rpush(this.ffkey, json);
+      }
       seen.push(data.url);
     }
 
@@ -831,10 +840,14 @@ return inx;
     return res;
   }
 
-  async numFailed() {
+  async numFailedWillRetry() {
     return await this.redis.llen(this.fkey);
   }
 
+  async numFailedNoRetry() {
+    return await this.redis.llen(this.ffkey);
+  }
+
   async getPendingList() {
     return await this.redis.hvals(this.pkey);
   }
diff --git a/tests/file_stats.test.js b/tests/file_stats.test.js
index 61042e38..ca13d8f4 100644
--- a/tests/file_stats.test.js
+++ b/tests/file_stats.test.js
@@ -50,6 +50,7 @@ test("check that stats file format is correct", () => {
   expect(dataJSON.total).toEqual(3);
   expect(dataJSON.pending).toEqual(0);
   expect(dataJSON.failed).toEqual(0);
+  expect(dataJSON.failedWillRetry).toEqual(0);
   expect(dataJSON.limit.max).toEqual(3);
   expect(dataJSON.limit.hit).toBe(true);
   expect(dataJSON.pendingPages.length).toEqual(0);
diff --git a/tests/retry-failed.test.js b/tests/retry-failed.test.js
new file mode 100644
index 00000000..0256e4c3
--- /dev/null
+++ b/tests/retry-failed.test.js
@@ -0,0 +1,71 @@
+import { execSync, spawn } from "child_process";
+import fs from "fs";
+import Redis from "ioredis";
+
+const DOCKER_HOST_NAME = process.env.DOCKER_HOST_NAME || "host.docker.internal";
+
+async function sleep(time) {
+  await new Promise((resolve) => setTimeout(resolve, time));
+}
+
+test("run crawl", async () => {
+  let status = 0;
+  execSync(`docker run -d -v $PWD/test-crawls:/crawls -e CRAWL_ID=test -p 36387:6379 --rm webrecorder/browsertrix-crawler crawl --url http://${DOCKER_HOST_NAME}:31501 --url https://example.com/ --limit 2 --pageExtraDelay 10 --debugAccessRedis --collection retry-fail`);
+
+/*
+  async function runServer() {
+    console.log("Waiting to start server");
+    await sleep(2000);
+
+    console.log("Starting server");
+    //spawn("../../node_modules/.bin/http-server", ["-p", "31501", "--username", "user", "--password", "pass"], {cwd: "./docs/site"});
+  }
+*/
+  const redis = new Redis("redis://127.0.0.1:36387/0", { lazyConnect: true, retryStrategy: () => null });
+
+  await sleep(3000);
+
+  let numRetries = 0;
+
+  try {
+    await redis.connect({
+      maxRetriesPerRequest: 100,
+    });
+
+    //runServer();
+
+    while (true) {
+      const res = await redis.lrange("test:ff", 0, -1);
+      if (res.length) {
+        const data = JSON.parse(res);
+        if (data.retry) {
+          numRetries = data.retry;
+          break;
+        }
+      }
+      await sleep(20);
+    }
+  } catch (e) {
+    console.error(e);
+  } finally {
+    expect(numRetries).toBe(5);
+  }
+});
+
+test("check only one failed page entry is made", () => {
+  expect(
+    fs.existsSync("test-crawls/collections/retry-fail/pages/pages.jsonl"),
+  ).toBe(true);
+
+  expect(
+    fs
+      .readFileSync(
+        "test-crawls/collections/retry-fail/pages/pages.jsonl",
+        "utf8",
+      ).trim().split("\n").length
+  ).toBe(3);
+});
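
Some notes on the changes above, with illustrative sketches (these are not part of the patch).

On the retry flow: `markFailed` now moves a failed URL onto the `:f` list via the `movefailed` script, `requeuefailed` pops it back onto the queue with an incremented `retry` counter, and once the counter exceeds `maxRetryPending` (now `MAX_RETRY_FAILED` = 5) the entry is shunted onto the new `:ff` ("failed final") list. The sketch below is a minimal in-memory model of that state machine, for illustration only — the real implementation keeps these structures in Redis and executes the transitions atomically in Lua:

```ts
// Minimal in-memory sketch of the retry state machine above (illustrative
// only; the real crawler stores JSON entries in Redis and runs the
// movefailed/requeuefailed transitions as atomic Lua scripts).
const MAX_RETRY_FAILED = 5;

type Entry = { url: string; retry?: number };

const queue: Entry[] = [];       // ":q"  - waiting to be crawled
const failed: Entry[] = [];      // ":f"  - failed, may still be retried
const failedFinal: Entry[] = []; // ":ff" - failed, out of retries

// mirrors movefailed: a failed page goes onto the retryable list
function markFailed(entry: Entry): void {
  failed.push(entry);
}

// mirrors requeuefailed's return convention:
// -1 = nothing to retry, 0 = moved to failed-final, >0 = retry count requeued
function requeueFailed(): number {
  const entry = failed.shift();
  if (!entry) return -1;
  entry.retry = (entry.retry ?? 0) + 1;
  if (entry.retry <= MAX_RETRY_FAILED) {
    queue.push(entry); // the real code re-adds to a sorted set scored by depth
    return entry.retry;
  }
  failedFinal.push(entry);
  return 0;
}

// mirrors the updated isFinished(): retryable failures still count as work
function isFinished(numDone: number): boolean {
  return queue.length + failed.length === 0 && numDone + failedFinal.length > 0;
}
```

The three-way return value is what lets `nextFromQueue()` distinguish "nothing left to retry" (`-1`) from "retries exhausted" (`0`) without a second round trip to Redis.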
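
On the redirect fix: it amounts to the single added condition `respUrl + "/" !== url` in the extra-seed check, so a seed whose only redirect strips a trailing slash is still treated as the same seed. The same comparison in isolation (the helper name is hypothetical, for illustration):

```ts
// Hypothetical helper showing the guard added to the extra-seed check above:
// a redirect that only strips the trailing slash is not a new seed.
function isTrailingSlashRedirect(seedUrl: string, respUrl: string): boolean {
  return respUrl + "/" === seedUrl;
}

isTrailingSlashRedirect("https://example.com/path/", "https://example.com/path"); // true  -> skip addExtraSeed
isTrailingSlashRedirect("https://example.com/path", "https://example.com/other"); // false -> add extra seed
```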
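
On the proxy change: it drops undici's `setGlobalDispatcher()` in favor of a module-level dispatcher that each `fetch()` call opts into via undici's `dispatcher` request option. A condensed sketch of the pattern, assuming `proxyUrl` points at a reachable HTTP(S) proxy (`fetchViaProxy` is a hypothetical caller, not part of the patch):

```ts
import { fetch, ProxyAgent, type Dispatcher } from "undici";

let proxyDispatcher: Dispatcher | undefined;

// Before: setGlobalDispatcher(new ProxyAgent(proxyUrl)) routed *every*
// fetch() through the proxy, including requests to local services.
// After: hold the dispatcher in a module-level variable instead.
export function initProxy(proxyUrl?: string): void {
  if (proxyUrl) {
    proxyDispatcher = new ProxyAgent(proxyUrl);
  }
}

export function getProxyDispatcher(): Dispatcher | undefined {
  return proxyDispatcher;
}

// Callers opt in per request; when the dispatcher is undefined, undici
// falls back to its default (non-proxied) agent, so local traffic is
// unaffected even when a proxy is configured.
export async function fetchViaProxy(url: string) {
  return fetch(url, { dispatcher: getProxyDispatcher() });
}
```

This is also why `recorder.ts` now composes its header-capturing interceptor onto the proxy dispatcher only when one exists, rather than always composing onto the global one.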
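
On the hashtag-only reload: a `page.goto()` that changes nothing but the URL fragment does not produce a new network load, so nothing would be captured. The crawler detects this by comparing the current and target URLs with their fragments stripped — the `fullRefresh` check above — and then forces a `page.reload()` after the `goto()`. The comparison condensed into a hypothetical helper:

```ts
// Condensed from the fullRefresh check in crawler.ts above: when only the
// fragment differs, page.goto() would just scroll, so force page.reload().
function isHashOnlyChange(currentUrl: string, targetUrl: string): boolean {
  return currentUrl.split("#")[0] === targetUrl.split("#")[0];
}

isHashOnlyChange("https://example.com/#A", "https://example.com/#B"); // true  -> reload after goto
isHashOnlyChange("https://example.com/a", "https://example.com/b");   // false -> normal goto
```

Note that the first-load event handlers are only attached once per navigation: before `goto()` in the normal case, but after it (just before `reload()`) in the hash-only case, so load state is measured against the reload that actually hits the network.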