Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: stop bot in case of tx errors #9421

Merged
merged 12 commits into from
Oct 30, 2024
4 changes: 3 additions & 1 deletion docker-compose.provernet.yml
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@ services:
BOT_PUBLIC_TRANSFERS_PER_TX: 0
BOT_NO_WAIT_FOR_TRANSFERS: true
BOT_NO_START: false
BOT_MAX_CONSECUTIVE_ERRORS: 3
BOT_STOP_WHEN_UNHEALTHY: true
PXE_PROVER_ENABLED: "${PROVER_REAL_PROOFS:-false}"
PROVER_REAL_PROOFS: "${PROVER_REAL_PROOFS:-false}"
BB_SKIP_CLEANUP: "${BB_SKIP_CLEANUP:-0}" # Persist tmp dirs for debugging
Expand All @@ -150,7 +152,7 @@ services:
test: [ "CMD", "curl", "-fSs", "http://127.0.0.1:80/status" ]
interval: 3s
timeout: 30s
start_period: 10s
start_period: 90s
restart: on-failure:5
command: [ "start", "--bot", "--pxe" ]

Expand Down
2 changes: 1 addition & 1 deletion yarn-project/archiver/src/archiver/archiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ export class Archiver implements ArchiveSource {
localBlockForDestinationProvenBlockNumber &&
provenArchive === localBlockForDestinationProvenBlockNumber.archive.root.toString()
) {
this.log.info(`Updating the proven block number to ${provenBlockNumber} and epoch to ${provenEpochNumber}`);
this.log.verbose(`Updating the proven block number to ${provenBlockNumber} and epoch to ${provenEpochNumber}`);
await this.store.setProvenL2BlockNumber(Number(provenBlockNumber));
// if we are here then we must have a valid proven epoch number
await this.store.setProvenL2EpochNumber(Number(provenEpochNumber));
Expand Down
4 changes: 2 additions & 2 deletions yarn-project/archiver/src/test/mock_l2_block_source.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ export class MockL2BlockSource implements L2BlockSource {
return Promise.resolve(this.l2Blocks.length);
}

public async getProvenBlockNumber(): Promise<number> {
return this.provenBlockNumber ?? (await this.getBlockNumber());
public getProvenBlockNumber(): Promise<number> {
return Promise.resolve(this.provenBlockNumber);
}

public getProvenL2EpochNumber(): Promise<number | undefined> {
Expand Down
2 changes: 1 addition & 1 deletion yarn-project/aztec/src/cli/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ export function injectAztecCommands(program: Command, userLog: LogFn, debugLogge

const app = rpcServer.getApp(options.apiPrefix);
// add status route
const statusRouter = createStatusRouter(options.apiPrefix);
const statusRouter = createStatusRouter(() => rpcServer.isHealthy(), options.apiPrefix);
app.use(statusRouter.routes()).use(statusRouter.allowedMethods());

const httpServer = http.createServer(app.callback());
Expand Down
2 changes: 1 addition & 1 deletion yarn-project/aztec/src/cli/cmds/start_txe.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ export const startTXE = (options: any, debugLogger: DebugLogger) => {
const txeServer = createTXERpcServer(debugLogger);
const app = txeServer.getApp();
// add status route
const statusRouter = createStatusRouter();
const statusRouter = createStatusRouter(() => txeServer.isHealthy());
app.use(statusRouter.routes()).use(statusRouter.allowedMethods());

const httpServer = http.createServer(app.callback());
Expand Down
14 changes: 14 additions & 0 deletions yarn-project/bot/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ export type BotConfig = {
daGasLimit: number | undefined;
/** Token contract to use */
contract: SupportedTokenContracts;
/** The maximum number of consecutive errors before the bot shuts down */
maxConsecutiveErrors: number;
/** Stops the bot if service becomes unhealthy */
stopWhenUnhealthy: boolean;
};

export const botConfigMappings: ConfigMappingsType<BotConfig> = {
Expand Down Expand Up @@ -164,6 +168,16 @@ export const botConfigMappings: ConfigMappingsType<BotConfig> = {
return val as SupportedTokenContracts;
},
},
maxConsecutiveErrors: {
env: 'BOT_MAX_CONSECUTIVE_ERRORS',
description: 'The maximum number of consecutive errors before the bot shuts down',
...numberConfigHelper(0),
},
stopWhenUnhealthy: {
env: 'BOT_STOP_WHEN_UNHEALTHY',
description: 'Stops the bot if service becomes unhealthy',
...booleanConfigHelper(false),
},
};

export function getBotConfigFromEnv(): BotConfig {
Expand Down
2 changes: 1 addition & 1 deletion yarn-project/bot/src/rpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ import { type BotRunner } from './runner.js';
* @returns A JSON-RPC HTTP server
*/
export function createBotRunnerRpcServer(botRunner: BotRunner) {
return new JsonRpcServer(botRunner, { AztecAddress, EthAddress, Fr, TxHash }, {}, []);
return new JsonRpcServer(botRunner, { AztecAddress, EthAddress, Fr, TxHash }, {}, [], () => botRunner.isHealthy());
}
19 changes: 18 additions & 1 deletion yarn-project/bot/src/runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ export class BotRunner {
private pxe?: PXE;
private node: AztecNode;
private runningPromise: RunningPromise;
private consecutiveErrors = 0;
private healthy = true;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: I think we can do without the healthy property and just recompute it on the fly based on consecutiveErrors and config

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to tackle this @alexghr?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be fair it's a nitpick, fine for me if it goes untackled


public constructor(private config: BotConfig, dependencies: { pxe?: PXE; node?: AztecNode }) {
this.pxe = dependencies.pxe;
Expand Down Expand Up @@ -52,6 +54,10 @@ export class BotRunner {
this.log.info(`Stopped bot`);
}

/**
 * Returns whether the bot is running and has not been flagged as unhealthy.
 * The `healthy` flag is cleared once the consecutive error count reaches
 * `config.maxConsecutiveErrors` (when that limit is greater than zero).
 */
public isHealthy() {
return this.runningPromise.isRunning() && this.healthy;
}

/** Returns whether the bot is running. */
public isRunning() {
return this.runningPromise.isRunning();
Expand Down Expand Up @@ -96,8 +102,10 @@ export class BotRunner {

try {
await bot.run();
this.consecutiveErrors = 0;
} catch (err) {
this.log.error(`Error running bot: ${err}`);
this.consecutiveErrors += 1;
this.log.error(`Error running bot consecutiveCount=${this.consecutiveErrors}: ${err}`);
throw err;
}
}
Expand Down Expand Up @@ -130,6 +138,15 @@ export class BotRunner {
await this.run();
} catch (err) {
// Already logged in run()
if (this.config.maxConsecutiveErrors > 0 && this.consecutiveErrors >= this.config.maxConsecutiveErrors) {
this.log.error(`Too many errors bot is unhealthy`);
this.healthy = false;
}
}

if (!this.healthy && this.config.stopWhenUnhealthy) {
this.log.error(`Stopping bot due to errors`);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need a fatal log level :-P

process.exit(1); // workaround docker not restarting the container if its unhealthy. We have to exit instead
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ export class L2BlockStream {
while (latestBlockNumber < sourceTips.latest.number) {
const from = latestBlockNumber + 1;
const limit = Math.min(this.opts.batchSize ?? 20, sourceTips.latest.number - from + 1);
this.log.debug(`Requesting blocks from ${from} limit ${limit}`);
this.log.debug(`Requesting blocks from ${from} limit ${limit} proven=${this.opts.proven}`);
const blocks = await this.l2BlockSource.getBlocks(from, limit, this.opts.proven);
if (blocks.length === 0) {
break;
Expand Down
21 changes: 17 additions & 4 deletions yarn-project/end-to-end/src/e2e_block_building.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
Fr,
L1NotePayload,
type PXE,
TxStatus,
type Wallet,
deriveKeys,
retryUntil,
Expand Down Expand Up @@ -442,21 +443,33 @@ describe('e2e_block_building', () => {
expect(tx1.blockNumber).toEqual(initialBlockNumber + 1);
expect(await contract.methods.get_public_value(ownerAddress).simulate()).toEqual(20n);

// Now move to a new epoch and past the proof claim window
// Now move to a new epoch and past the proof claim window to cause a reorg
logger.info('Advancing past the proof claim window');
await cheatCodes.rollup.advanceToNextEpoch();
await cheatCodes.rollup.advanceSlots(AZTEC_EPOCH_PROOF_CLAIM_WINDOW_IN_L2_SLOTS + 1); // off-by-one?

// Wait a bit before spawning a new pxe
await sleep(2000);

// tx1 is valid because it was built against a proven block number
// the sequencer will bring it back on chain
await retryUntil(
async () => (await aztecNode.getTxReceipt(tx1.txHash)).status === TxStatus.SUCCESS,
'wait for re-inclusion',
60,
1,
);

const newTx1Receipt = await aztecNode.getTxReceipt(tx1.txHash);
expect(newTx1Receipt.blockNumber).toEqual(tx1.blockNumber);
expect(newTx1Receipt.blockHash).not.toEqual(tx1.blockHash);

// Send another tx which should be mined in a block that is built on the reorg'd chain
// We need to send it from a new pxe since pxe doesn't detect reorgs (yet)
logger.info(`Creating new PXE service`);
const pxeServiceConfig = { ...getPXEServiceConfig() };
const newPxe = await createPXEService(aztecNode, pxeServiceConfig);
const newWallet = await createAccount(newPxe);
expect(await pxe.getBlockNumber()).toEqual(initialBlockNumber + 1);

// TODO: Contract.at should automatically register the instance in the pxe
logger.info(`Registering contract at ${contract.address} in new pxe`);
Expand All @@ -465,8 +478,8 @@ describe('e2e_block_building', () => {

logger.info('Sending new tx on reorgd chain');
const tx2 = await contractFromNewPxe.methods.increment_public_value(ownerAddress, 10).send().wait();
expect(await contractFromNewPxe.methods.get_public_value(ownerAddress).simulate()).toEqual(10n);
expect(tx2.blockNumber).toEqual(initialBlockNumber + 2);
expect(await contractFromNewPxe.methods.get_public_value(ownerAddress).simulate()).toEqual(30n);
expect(tx2.blockNumber).toEqual(initialBlockNumber + 3);
});
});
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
type DebugLogger,
EpochProofQuote,
EpochProofQuotePayload,
TxStatus,
createDebugLogger,
sleep,
} from '@aztec/aztec.js';
Expand Down Expand Up @@ -387,12 +388,12 @@ describe('e2e_prover_coordination', () => {
// Progress epochs with a block in each until we hit a reorg
// Note tips are block numbers, not slots
await expectTips({ pending: 3n, proven: 3n });
await contract.methods.create_note(recipient, recipient, 10).send().wait();
const tx2BeforeReorg = await contract.methods.create_note(recipient, recipient, 10).send().wait();
await expectTips({ pending: 4n, proven: 3n });

// Go to epoch 3
await advanceToNextEpoch();
await contract.methods.create_note(recipient, recipient, 10).send().wait();
const tx3BeforeReorg = await contract.methods.create_note(recipient, recipient, 10).send().wait();
await expectTips({ pending: 5n, proven: 3n });

// Go to epoch 4 !!! REORG !!! ay caramba !!!
Expand All @@ -401,15 +402,32 @@ describe('e2e_prover_coordination', () => {
// Wait a bit for the sequencer / node to notice a re-org
await sleep(2000);

// the sequencer will add valid txs again but in a new block
const tx2AfterReorg = await ctx.aztecNode.getTxReceipt(tx2BeforeReorg.txHash);
const tx3AfterReorg = await ctx.aztecNode.getTxReceipt(tx3BeforeReorg.txHash);

// the tx from epoch 2 is still valid since it references a proven block
// this will be added back onto the chain
expect(tx2AfterReorg.status).toEqual(TxStatus.SUCCESS);
expect(tx2AfterReorg.blockNumber).toEqual(tx2BeforeReorg.blockNumber);
expect(tx2AfterReorg.blockHash).not.toEqual(tx2BeforeReorg.blockHash);

// the tx from epoch 3 is not valid anymore, since it was built against a reorged block
// should be dropped
expect(tx3AfterReorg.status).toEqual(TxStatus.DROPPED);

// new pxe, as it does not support reorgs
const pxeServiceConfig = { ...getPXEServiceConfig() };
const newPxe = await createPXEService(ctx.aztecNode, pxeServiceConfig);
const newWallet = await createAccount(newPxe);
const newWalletAddress = newWallet.getAddress();

// The chain will re-org back to block 3, but creating a new account will produce a block, so we expect
// 4 blocks in the pending chain here!
await expectTips({ pending: 4n, proven: 3n });
// The chain will prune back to block 3
// then include the txs from the pruned epochs that are still valid
// bringing us back to block 4 (same number, different hash)
// creating a new account will produce another block
// so we expect 5 blocks in the pending chain here!
await expectTips({ pending: 5n, proven: 3n });

// Submit proof claim for the new epoch
const quoteForEpoch4 = await makeEpochProofQuote({
Expand All @@ -427,7 +445,7 @@ describe('e2e_prover_coordination', () => {

logger.info('Sending new tx on reorged chain');
await contractFromNewPxe.methods.create_note(newWalletAddress, newWalletAddress, 10).send().wait();
await expectTips({ pending: 5n, proven: 3n });
await expectTips({ pending: 6n, proven: 3n });

// Expect the proof claim to be accepted for the chain after the reorg
await expectProofClaimOnL1({ ...quoteForEpoch4.payload, proposer: publisherAddress });
Expand Down
3 changes: 3 additions & 0 deletions yarn-project/foundation/src/config/env_var.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ export type EnvVar =
| 'BOT_TOKEN_SALT'
| 'BOT_TX_INTERVAL_SECONDS'
| 'BOT_TX_MINED_WAIT_SECONDS'
| 'BOT_MAX_CONSECUTIVE_ERRORS'
| 'BOT_STOP_WHEN_UNHEALTHY'
| 'COINBASE'
| 'DATA_DIRECTORY'
| 'DEBUG'
Expand Down Expand Up @@ -61,6 +63,7 @@ export type EnvVar =
| 'OTEL_SERVICE_NAME'
| 'OUTBOX_CONTRACT_ADDRESS'
| 'P2P_BLOCK_CHECK_INTERVAL_MS'
| 'P2P_BLOCK_REQUEST_BATCH_SIZE'
| 'P2P_ENABLED'
| 'P2P_GOSSIPSUB_D'
| 'P2P_GOSSIPSUB_DHI'
Expand Down
40 changes: 36 additions & 4 deletions yarn-project/foundation/src/json-rpc/server/json_rpc_server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,16 @@ export class JsonRpcServer {
private objectClassMap: JsonClassConverterInput,
/** List of methods to disallow from calling remotely */
public readonly disallowedMethods: string[] = [],
private healthCheck: StatusCheckFn = () => true,
private log = createDebugLogger('json-rpc:server'),
) {
this.proxy = new JsonProxy(handler, stringClassMap, objectClassMap);
}

/**
 * Runs the health check callback supplied to the constructor.
 * @returns The callback's result, which may be a boolean or a promise of one.
 */
public isHealthy(): boolean | Promise<boolean> {
return this.healthCheck();
}

/**
* Get an express app object.
* @param prefix - Our server prefix.
Expand Down Expand Up @@ -205,15 +210,25 @@ export class JsonRpcServer {
}
}

/** A health check callback that returns, or resolves to, a boolean health status. */
export type StatusCheckFn = () => boolean | Promise<boolean>;

/**
* Creates a router for handling a plain status request that will return 200 status when running.
* @param getCurrentStatus - Health check function to run; the route responds with 200 when it resolves to true and 500 otherwise (including when it throws).
* @param apiPrefix - The prefix to use for all api requests
* @returns - The router for handling status requests.
*/
export function createStatusRouter(apiPrefix = '') {
export function createStatusRouter(getCurrentStatus: StatusCheckFn, apiPrefix = '') {
const router = new Router({ prefix: `${apiPrefix}` });
router.get('/status', (ctx: Koa.Context) => {
ctx.status = 200;
router.get('/status', async (ctx: Koa.Context) => {
let ok: boolean;
try {
ok = (await getCurrentStatus()) === true;
} catch (err) {
ok = false;
}

ctx.status = ok ? 200 : 500;
});
return router;
}
Expand Down Expand Up @@ -296,5 +311,22 @@ export function createNamespacedJsonRpcServer(
{ stringClassMap: {}, objectClassMap: {} } as ClassMaps,
);

return new JsonRpcServer(Object.create(handler), classMaps.stringClassMap, classMaps.objectClassMap, [], log);
// Aggregate health check: reports healthy only when every namespaced service's
// isHealthy() check fulfils with a truthy value.
const aggregateHealthCheck = async () => {
const statuses = await Promise.allSettled(
servers.flatMap(services =>
Object.entries(services).map(async ([name, service]) => ({ name, healthy: await service.isHealthy() })),
),
);
// allSettled never rejects, so a health check that throws shows up as a 'rejected' entry and counts as unhealthy.
const allHealthy = statuses.every(result => result.status === 'fulfilled' && result.value.healthy);
return allHealthy;
};

return new JsonRpcServer(
Object.create(handler),
classMaps.stringClassMap,
classMaps.objectClassMap,
[],
aggregateHealthCheck,
log,
);
}
Loading
Loading