Retry support and additional fixes (#743)
- retries: for failed pages, set the retry limit to 5 in case multiple
retries are needed.
- redirect: if a page URL redirects from /path/ -> /path, don't add it as an
extra seed
- proxy: don't set the global dispatcher; pass the dispatcher explicitly when
using a proxy, as the proxy may interfere with local network requests (see
the dispatcher sketch below)
- final exit flag: if the crawl is done and is also interrupted, ensure the
WACZ is still written/uploaded by setting the final exit flag to true
- hashtag-only change forces reload: if loading a page with the same URL but a
different hashtag, e.g. `https://example.com/#B` after
`https://example.com/#A`, do a full reload (see the reload sketch below)
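
The proxy fix boils down to one pattern: keep the proxy dispatcher in module scope and pass it per request, instead of installing it globally via undici's setGlobalDispatcher(). A minimal sketch of that pattern, simplified from the src/util/proxy.ts change below (the fetchViaProxy helper is only for illustration):

```ts
import { Dispatcher, ProxyAgent, fetch } from "undici";

// Module-scoped dispatcher; undefined means no proxy was configured.
let proxyDispatcher: Dispatcher | undefined;

// Create the dispatcher once at startup instead of calling setGlobalDispatcher(),
// so fetches that don't opt in (e.g. local network requests) stay unproxied.
export function initProxy(proxyUrl?: string) {
  if (proxyUrl) {
    proxyDispatcher = new ProxyAgent({ uri: proxyUrl });
  }
}

export function getProxyDispatcher(): Dispatcher | undefined {
  return proxyDispatcher;
}

// Hypothetical helper: passing `dispatcher: undefined` falls back to undici's
// default agent, so callers can always pass the proxy dispatcher unconditionally.
export async function fetchViaProxy(url: string) {
  return fetch(url, { dispatcher: getProxyDispatcher() });
}
```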
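
For the hashtag-only case, the key observation is that a goto() to a URL differing from the current page only by its #fragment does not trigger a full navigation, so load events never fire; detecting that case and forcing a reload fixes it. A rough sketch of the check (the gotoWithHashtagReload wrapper is hypothetical; the crawler does this inline in crawler.ts):

```ts
import { Page } from "puppeteer-core";

// Hypothetical wrapper for illustration; mirrors the inline logic in crawler.ts.
async function gotoWithHashtagReload(
  page: Page,
  url: string,
  gotoOpts: Parameters<Page["goto"]>[1],
) {
  // Same URL up to the #fragment -> goto() won't renavigate, so force a reload.
  const urlNoHash = url.split("#")[0];
  const fullRefresh = urlNoHash === page.url().split("#")[0];

  let resp = await page.goto(url, gotoOpts);
  if (fullRefresh) {
    resp = await page.reload(gotoOpts);
  }
  return resp;
}
```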
ikreymer authored Jan 26, 2025
1 parent 5d9c62e commit f7cbf96
Showing 12 changed files with 212 additions and 74 deletions.
2 changes: 1 addition & 1 deletion package.json
@@ -1,6 +1,6 @@
 {
   "name": "browsertrix-crawler",
-  "version": "1.5.0-beta.2",
+  "version": "1.5.0-beta.3",
   "main": "browsertrix-crawler",
   "type": "module",
   "repository": "https://github.com/webrecorder/browsertrix-crawler",
74 changes: 52 additions & 22 deletions src/crawler.ts
@@ -46,6 +46,7 @@ import {
   ExtractSelector,
   PAGE_OP_TIMEOUT_SECS,
   SITEMAP_INITIAL_FETCH_TIMEOUT_SECS,
+  MAX_RETRY_FAILED,
 } from "./util/constants.js";
 
 import { AdBlockRules, BlockRuleDecl, BlockRules } from "./util/blockrules.js";
@@ -1152,13 +1153,13 @@ self.__bx_behaviors.selectMainBehavior();
   }
 
   async pageFinished(data: PageState) {
-    await this.writePage(data);
-
     // if page loaded, considered page finished successfully
     // (even if behaviors timed out)
-    const { loadState, logDetails, depth, url } = data;
+    const { loadState, logDetails, depth, url, retry } = data;
 
     if (data.loadState >= LoadState.FULL_PAGE_LOADED) {
+      await this.writePage(data);
+
       logger.info("Page Finished", { loadState, ...logDetails }, "pageStatus");
 
       await this.crawlState.markFinished(url);
@@ -1171,6 +1172,9 @@
 
       await this.checkLimits();
     } else {
+      if (retry >= MAX_RETRY_FAILED) {
+        await this.writePage(data);
+      }
       await this.crawlState.markFailed(url);
 
       if (this.healthChecker) {
@@ -1370,7 +1374,7 @@ self.__bx_behaviors.selectMainBehavior();
     }
 
     if (this.params.failOnFailedLimit) {
-      const numFailed = await this.crawlState.numFailed();
+      const numFailed = await this.crawlState.numFailedWillRetry();
       const failedLimit = this.params.failOnFailedLimit;
       if (numFailed >= failedLimit) {
         logger.fatal(
@@ -1498,6 +1502,7 @@ self.__bx_behaviors.selectMainBehavior();
       logger.info("crawl already finished, running post-crawl tasks", {
         state: initState,
       });
+      this.finalExit = true;
       await this.postCrawl();
       return;
     } else if (await this.crawlState.isCrawlStopped()) {
@@ -1581,8 +1586,11 @@ self.__bx_behaviors.selectMainBehavior();
 
     await this.writeStats();
 
-    // if crawl has been stopped, mark as final exit for post-crawl tasks
-    if (await this.crawlState.isCrawlStopped()) {
+    // if crawl has been stopped or finished, mark as final exit for post-crawl tasks
+    if (
+      (await this.crawlState.isCrawlStopped()) ||
+      (await this.crawlState.isFinished())
+    ) {
       this.finalExit = true;
     }
 
@@ -1822,16 +1830,19 @@ self.__bx_behaviors.selectMainBehavior();
 
     const realSize = await this.crawlState.queueSize();
     const pendingPages = await this.crawlState.getPendingList();
-    const done = await this.crawlState.numDone();
-    const failed = await this.crawlState.numFailed();
-    const total = realSize + pendingPages.length + done;
+    const pending = pendingPages.length;
+    const crawled = await this.crawlState.numDone();
+    const failedWillRetry = await this.crawlState.numFailedWillRetry();
+    const failed = await this.crawlState.numFailedNoRetry();
+    const total = realSize + pendingPages.length + crawled;
     const limit = { max: this.pageLimit || 0, hit: this.limitHit };
     const stats = {
-      crawled: done,
-      total: total,
-      pending: pendingPages.length,
-      failed: failed,
-      limit: limit,
+      crawled,
+      total,
+      pending,
+      failedWillRetry,
+      failed,
+      limit,
       pendingPages,
     };
 
@@ -1885,22 +1896,39 @@ self.__bx_behaviors.selectMainBehavior();
       }
     };
 
-    page.on("response", waitFirstResponse);
+    const handleFirstLoadEvents = () => {
+      page.on("response", waitFirstResponse);
 
-    // store that domcontentloaded was finished
-    page.once("domcontentloaded", () => {
-      data.loadState = LoadState.CONTENT_LOADED;
-    });
+      // store that domcontentloaded was finished
+      page.once("domcontentloaded", () => {
+        data.loadState = LoadState.CONTENT_LOADED;
+      });
+    };
 
     const gotoOpts = data.isHTMLPage
       ? this.gotoOpts
       : { waitUntil: "domcontentloaded" };
 
     logger.info("Awaiting page load", logDetails);
 
+    const urlNoHash = url.split("#")[0];
+
+    const fullRefresh = urlNoHash === page.url().split("#")[0];
+
     try {
+      if (!fullRefresh) {
+        handleFirstLoadEvents();
+      }
       // store the page load response when page fully loads
       fullLoadedResponse = await page.goto(url, gotoOpts);
+
+      if (fullRefresh) {
+        logger.debug("Hashtag-only change, doing full page reload");
+
+        handleFirstLoadEvents();
+
+        fullLoadedResponse = await page.reload(gotoOpts);
+      }
     } catch (e) {
       if (!(e instanceof Error)) {
         throw e;
@@ -1921,7 +1949,7 @@ self.__bx_behaviors.selectMainBehavior();
     } else if (!downloadResponse) {
       // log if not already log and rethrow, consider page failed
       if (msg !== "logged") {
-        logger.error("Page Load Failed, skipping page", {
+        logger.error("Page Load Failed, will retry", {
           msg,
           loadState: data.loadState,
           ...logDetails,
@@ -1944,7 +1972,8 @@ self.__bx_behaviors.selectMainBehavior();
     if (
       depth === 0 &&
       !isChromeError &&
-      respUrl !== url.split("#")[0] &&
+      respUrl !== urlNoHash &&
+      respUrl + "/" !== url &&
       !downloadResponse
     ) {
       data.seedId = await this.crawlState.addExtraSeed(
@@ -2652,8 +2681,9 @@ self.__bx_behaviors.selectMainBehavior();
     if (this.origConfig) {
       this.origConfig.state = state;
     }
-    const res = yaml.dump(this.origConfig, { lineWidth: -1 });
+
     try {
+      const res = yaml.dump(this.origConfig, { lineWidth: -1 });
       logger.info(`Saving crawl state to: ${filename}`);
       await fsp.writeFile(filename, res);
     } catch (e) {
4 changes: 3 additions & 1 deletion src/util/blockrules.ts
@@ -5,6 +5,7 @@ import { HTTPRequest, Page } from "puppeteer-core";
 import { Browser } from "./browser.js";
 
 import { fetch } from "undici";
+import { getProxyDispatcher } from "./proxy.js";
 
 const RULE_TYPES = ["block", "allowOnly"];
 
@@ -271,7 +272,7 @@ export class BlockRules {
     logDetails: Record<string, any>,
   ) {
     try {
-      const res = await fetch(reqUrl);
+      const res = await fetch(reqUrl, { dispatcher: getProxyDispatcher() });
       const text = await res.text();
 
       return !!text.match(frameTextMatch);
@@ -302,6 +303,7 @@ export class BlockRules {
       method: "PUT",
       headers: { "Content-Type": "text/html" },
       body,
+      dispatcher: getProxyDispatcher(),
     });
   }
 }
1 change: 1 addition & 0 deletions src/util/constants.ts
@@ -27,6 +27,7 @@ export const ADD_LINK_FUNC = "__bx_addLink";
 export const FETCH_FUNC = "__bx_fetch";
 
 export const MAX_DEPTH = 1000000;
+export const MAX_RETRY_FAILED = 5;
 
 export const FETCH_HEADERS_TIMEOUT_SECS = 30;
 export const PAGE_OP_TIMEOUT_SECS = 5;
3 changes: 2 additions & 1 deletion src/util/file_reader.ts
@@ -6,6 +6,7 @@ import util from "util";
 import { exec as execCallback } from "child_process";
 
 import { logger } from "./logger.js";
+import { getProxyDispatcher } from "./proxy.js";
 
 const exec = util.promisify(execCallback);
 
@@ -85,7 +86,7 @@ async function collectOnlineBehavior(url: string): Promise<FileSources> {
   const behaviorFilepath = `/app/behaviors/${filename}`;
 
   try {
-    const res = await fetch(url);
+    const res = await fetch(url, { dispatcher: getProxyDispatcher() });
     const fileContents = await res.text();
     await fsp.writeFile(behaviorFilepath, fileContents);
     logger.info(
6 changes: 5 additions & 1 deletion src/util/originoverride.ts
@@ -3,6 +3,7 @@ import { formatErr, logger } from "./logger.js";
 import { Browser } from "./browser.js";
 
 import { fetch } from "undici";
+import { getProxyDispatcher } from "./proxy.js";
 
 export class OriginOverride {
   originOverride: { origUrl: URL; destUrl: URL }[];
@@ -45,7 +46,10 @@ export class OriginOverride {
         headers.set("origin", orig.origin);
       }
 
-      const resp = await fetch(newUrl, { headers });
+      const resp = await fetch(newUrl, {
+        headers,
+        dispatcher: getProxyDispatcher(),
+      });
 
       const body = Buffer.from(await resp.arrayBuffer());
       const respHeaders = Object.fromEntries(resp.headers);
10 changes: 8 additions & 2 deletions src/util/proxy.ts
@@ -1,5 +1,5 @@
 import net from "net";
-import { Agent, Dispatcher, ProxyAgent, setGlobalDispatcher } from "undici";
+import { Agent, Dispatcher, ProxyAgent } from "undici";
 
 import child_process from "child_process";
 
@@ -13,6 +13,8 @@ const SSH_PROXY_LOCAL_PORT = 9722;
 
 const SSH_WAIT_TIMEOUT = 30000;
 
+let proxyDispatcher: Dispatcher | undefined = undefined;
+
 export function getEnvProxyUrl() {
   if (process.env.PROXY_SERVER) {
     return process.env.PROXY_SERVER;
@@ -46,10 +48,14 @@ export async function initProxy(
 
   // set global fetch() dispatcher (with proxy, if any)
   const dispatcher = createDispatcher(proxy, agentOpts);
-  setGlobalDispatcher(dispatcher);
+  proxyDispatcher = dispatcher;
   return proxy;
 }
 
+export function getProxyDispatcher() {
+  return proxyDispatcher;
+}
+
 export function createDispatcher(
   proxyUrl: string,
   opts: Agent.Options,
23 changes: 14 additions & 9 deletions src/util/recorder.ts
@@ -8,7 +8,7 @@ import {
   isRedirectStatus,
 } from "./reqresp.js";
 
-import { fetch, getGlobalDispatcher, Response } from "undici";
+import { fetch, Response } from "undici";
 
 import {
   getCustomRewriter,
@@ -23,6 +23,7 @@ import { WARCWriter } from "./warcwriter.js";
 import { RedisCrawlState, WorkerId } from "./state.js";
 import { CDPSession, Protocol } from "puppeteer-core";
 import { Crawler } from "../crawler.js";
+import { getProxyDispatcher } from "./proxy.js";
 
 const MAX_BROWSER_DEFAULT_FETCH_SIZE = 5_000_000;
 const MAX_TEXT_REWRITE_SIZE = 25_000_000;
@@ -1588,14 +1589,18 @@ class AsyncFetcher {
 
     const headers = reqresp.getRequestHeadersDict();
 
-    const dispatcher = getGlobalDispatcher().compose((dispatch) => {
-      return (opts, handler) => {
-        if (opts.headers) {
-          reqresp.requestHeaders = opts.headers as Record<string, string>;
-        }
-        return dispatch(opts, handler);
-      };
-    });
+    let dispatcher = getProxyDispatcher();
+
+    if (dispatcher) {
+      dispatcher = dispatcher.compose((dispatch) => {
+        return (opts, handler) => {
+          if (opts.headers) {
+            reqresp.requestHeaders = opts.headers as Record<string, string>;
+          }
+          return dispatch(opts, handler);
+        };
+      });
+    }
 
     const resp = await fetch(url!, {
       method,
6 changes: 5 additions & 1 deletion src/util/sitemapper.ts
@@ -10,6 +10,7 @@ import { DETECT_SITEMAP } from "./constants.js";
 import { sleep } from "./timing.js";
 
 import { fetch, Response } from "undici";
+import { getProxyDispatcher } from "./proxy.js";
 
 const SITEMAP_CONCURRENCY = 5;
 
@@ -65,7 +66,10 @@ export class SitemapReader extends EventEmitter {
 
   async _fetchWithRetry(url: string, message: string) {
     while (true) {
-      const resp = await fetch(url, { headers: this.headers });
+      const resp = await fetch(url, {
+        headers: this.headers,
+        dispatcher: getProxyDispatcher(),
+      });
 
       if (resp.ok) {
         return resp;