Skip to content

Commit

Permalink
chore: Trace and handle errors in running promises (#10645)
Browse files Browse the repository at this point in the history
- Adds a try/catch to every iteration of a running promise, and logs any
exceptions
- Adds a `trackSpan` to every running promise job
- Adds a `trackSpan` to `simulatePublicCalls` in the aztec node server as well
  • Loading branch information
spalladino authored Dec 13, 2024
1 parent b16945b commit 4cc0a6d
Show file tree
Hide file tree
Showing 40 changed files with 451 additions and 264 deletions.
4 changes: 3 additions & 1 deletion yarn-project/archiver/src/archiver/archiver.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { EthAddress } from '@aztec/foundation/eth-address';
import { Fr } from '@aztec/foundation/fields';
import { sleep } from '@aztec/foundation/sleep';
import { type InboxAbi, RollupAbi } from '@aztec/l1-artifacts';
import { NoopTelemetryClient } from '@aztec/telemetry-client/noop';

import { jest } from '@jest/globals';
import { type MockProxy, mock } from 'jest-mock-extended';
Expand Down Expand Up @@ -84,7 +85,8 @@ describe('Archiver', () => {
}) as any,
});

instrumentation = mock({ isEnabled: () => true });
const tracer = new NoopTelemetryClient().getTracer();
instrumentation = mock<ArchiverInstrumentation>({ isEnabled: () => true, tracer });
archiverStore = new MemoryArchiverStore(1000);

archiver = new Archiver(
Expand Down
21 changes: 7 additions & 14 deletions yarn-project/archiver/src/archiver/archiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ import {
PrivateFunctionBroadcastedEvent,
UnconstrainedFunctionBroadcastedEvent,
} from '@aztec/protocol-contracts';
import { type TelemetryClient } from '@aztec/telemetry-client';
import { Attributes, type TelemetryClient, type Traceable, type Tracer, trackSpan } from '@aztec/telemetry-client';

import groupBy from 'lodash.groupby';
import {
Expand Down Expand Up @@ -85,7 +85,7 @@ export type ArchiveSource = L2BlockSource &
* Responsible for handling robust L1 polling so that other components do not need to
* concern themselves with it.
*/
export class Archiver implements ArchiveSource {
export class Archiver implements ArchiveSource, Traceable {
/**
* A promise in which we will be continually fetching new L2 blocks.
*/
Expand All @@ -99,6 +99,8 @@ export class Archiver implements ArchiveSource {
public l1BlockNumber: bigint | undefined;
public l1Timestamp: bigint | undefined;

public readonly tracer: Tracer;

/**
* Creates a new instance of the Archiver.
* @param publicClient - A client for interacting with the Ethereum node.
Expand All @@ -118,6 +120,7 @@ export class Archiver implements ArchiveSource {
private readonly l1constants: L1RollupConstants,
private readonly log: Logger = createLogger('archiver'),
) {
this.tracer = instrumentation.tracer;
this.store = new ArchiverStoreHelper(dataStore);

this.rollup = getContract({
Expand Down Expand Up @@ -194,24 +197,14 @@ export class Archiver implements ArchiveSource {
await this.sync(blockUntilSynced);
}

this.runningPromise = new RunningPromise(() => this.safeSync(), this.config.pollingIntervalMs);
this.runningPromise = new RunningPromise(() => this.sync(false), this.log, this.config.pollingIntervalMs);
this.runningPromise.start();
}

/**
* Runs a non-initial sync (`this.sync(false)`) and catches any exception it throws.
* A caught error is logged through `this.log.error` and is not rethrown, so the
* returned promise resolves even when the sync fails.
*/
private async safeSync() {
try {
await this.sync(false);
} catch (error) {
// Swallow the failure after logging it so the caller never sees a rejection.
this.log.error('Error syncing archiver', error);
}
}

/**
* Fetches logs from L1 contracts and processes them.
*/
@trackSpan('Archiver.sync', initialRun => ({ [Attributes.INITIAL_SYNC]: initialRun }))
private async sync(initialRun: boolean) {
/**
* We keep track of three "pointers" to L1 blocks:
Expand Down
4 changes: 4 additions & 0 deletions yarn-project/archiver/src/archiver/instrumentation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,14 @@ import {
type LmdbStatsCallback,
Metrics,
type TelemetryClient,
type Tracer,
type UpDownCounter,
ValueType,
} from '@aztec/telemetry-client';

export class ArchiverInstrumentation {
public readonly tracer: Tracer;

private blockHeight: Gauge;
private blockSize: Gauge;
private syncDuration: Histogram;
Expand All @@ -24,6 +27,7 @@ export class ArchiverInstrumentation {
private log = createLogger('archiver:instrumentation');

private constructor(private telemetry: TelemetryClient, lmdbStats?: LmdbStatsCallback) {
this.tracer = telemetry.getTracer('Archiver');
const meter = telemetry.getMeter('Archiver');
this.blockHeight = meter.createGauge(Metrics.ARCHIVER_BLOCK_HEIGHT, {
description: 'The height of the latest block processed by the archiver',
Expand Down
11 changes: 8 additions & 3 deletions yarn-project/aztec-node/src/aztec-node/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ import {
import { ProtocolContractAddress } from '@aztec/protocol-contracts';
import { GlobalVariableBuilder, type L1Publisher, SequencerClient } from '@aztec/sequencer-client';
import { PublicProcessorFactory } from '@aztec/simulator';
import { type TelemetryClient } from '@aztec/telemetry-client';
import { Attributes, type TelemetryClient, type Traceable, type Tracer, trackSpan } from '@aztec/telemetry-client';
import { NoopTelemetryClient } from '@aztec/telemetry-client/noop';
import { createValidatorClient } from '@aztec/validator-client';
import { createWorldStateSynchronizer } from '@aztec/world-state';
Expand All @@ -85,11 +85,12 @@ import { NodeMetrics } from './node_metrics.js';
/**
* The aztec node.
*/
export class AztecNodeService implements AztecNode {
export class AztecNodeService implements AztecNode, Traceable {
private packageVersion: string;

private metrics: NodeMetrics;

public readonly tracer: Tracer;

constructor(
protected config: AztecNodeConfig,
protected readonly p2pClient: P2P,
Expand All @@ -109,6 +110,7 @@ export class AztecNodeService implements AztecNode {
) {
this.packageVersion = getPackageInfo().version;
this.metrics = new NodeMetrics(telemetry, 'AztecNodeService');
this.tracer = telemetry.getTracer('AztecNodeService');

this.log.info(`Aztec Node started on chain 0x${l1ChainId.toString(16)}`, config.l1Contracts);
}
Expand Down Expand Up @@ -782,6 +784,9 @@ export class AztecNodeService implements AztecNode {
* Simulates the public part of a transaction with the current state.
* @param tx - The transaction to simulate.
**/
@trackSpan('AztecNodeService.simulatePublicCalls', (tx: Tx) => ({
[Attributes.TX_HASH]: tx.tryGetTxHash()?.toString(),
}))
public async simulatePublicCalls(tx: Tx): Promise<PublicSimulationOutput> {
const txHash = tx.getTxHash();
const blockNumber = (await this.blockSource.getBlockNumber()) + 1;
Expand Down
2 changes: 1 addition & 1 deletion yarn-project/aztec.js/src/utils/anvil_test_watcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ export class AnvilTestWatcher {
const isAutoMining = await this.cheatcodes.isAutoMining();

if (isAutoMining) {
this.filledRunningPromise = new RunningPromise(() => this.mineIfSlotFilled(), 1000);
this.filledRunningPromise = new RunningPromise(() => this.mineIfSlotFilled(), this.logger, 1000);
this.filledRunningPromise.start();
this.logger.info(`Watcher started for rollup at ${this.rollup.address}`);
} else {
Expand Down
10 changes: 8 additions & 2 deletions yarn-project/aztec/src/cli/cmds/start_bot.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@ import { type BotConfig, BotRunner, botConfigMappings, getBotRunnerApiHandler }
import { type AztecNode, type PXE } from '@aztec/circuit-types';
import { type NamespacedApiHandlers } from '@aztec/foundation/json-rpc/server';
import { type LogFn } from '@aztec/foundation/log';
import { type TelemetryClient } from '@aztec/telemetry-client';
import {
createAndStartTelemetryClient,
getConfigEnvVars as getTelemetryClientConfig,
} from '@aztec/telemetry-client/start';

import { extractRelevantOptions } from '../util.js';

Expand All @@ -25,14 +30,15 @@ export async function startBot(
pxe = await addPXE(options, signalHandlers, services, userLog);
}

await addBot(options, signalHandlers, services, { pxe });
const telemetry = await createAndStartTelemetryClient(getTelemetryClientConfig());
await addBot(options, signalHandlers, services, { pxe, telemetry });
}

export function addBot(
options: any,
signalHandlers: (() => Promise<void>)[],
services: NamespacedApiHandlers,
deps: { pxe?: PXE; node?: AztecNode } = {},
deps: { pxe?: PXE; node?: AztecNode; telemetry: TelemetryClient },
) {
const config = extractRelevantOptions<BotConfig>(options, botConfigMappings, 'bot');

Expand Down
6 changes: 3 additions & 3 deletions yarn-project/aztec/src/cli/cmds/start_node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,10 @@ export async function startNode(
}

const telemetryConfig = extractRelevantOptions<TelemetryClientConfig>(options, telemetryClientConfigMappings, 'tel');
const telemetryClient = await createAndStartTelemetryClient(telemetryConfig);
const telemetry = await createAndStartTelemetryClient(telemetryConfig);

// Create and start Aztec Node
const node = await createAztecNode(nodeConfig, telemetryClient);
const node = await createAztecNode(nodeConfig, telemetry);

// Add node and p2p to services list
services.node = [node, AztecNodeApiSchema];
Expand All @@ -110,6 +110,6 @@ export async function startNode(
// Add a txs bot if requested
if (options.bot) {
const { addBot } = await import('./start_bot.js');
await addBot(options, signalHandlers, services, { pxe, node });
await addBot(options, signalHandlers, services, { pxe, node, telemetry });
}
}
1 change: 1 addition & 0 deletions yarn-project/bot/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
"@aztec/foundation": "workspace:^",
"@aztec/noir-contracts.js": "workspace:^",
"@aztec/protocol-contracts": "workspace:^",
"@aztec/telemetry-client": "workspace:^",
"@aztec/types": "workspace:^",
"source-map-support": "^0.5.21",
"tslib": "^2.4.0",
Expand Down
16 changes: 12 additions & 4 deletions yarn-project/bot/src/runner.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import { type AztecNode, type PXE, createAztecNodeClient, createLogger } from '@aztec/aztec.js';
import { RunningPromise } from '@aztec/foundation/running-promise';
import { type TelemetryClient, type Traceable, type Tracer, trackSpan } from '@aztec/telemetry-client';

import { Bot } from './bot.js';
import { type BotConfig } from './config.js';
import { type BotRunnerApi } from './interface.js';

export class BotRunner implements BotRunnerApi {
export class BotRunner implements BotRunnerApi, Traceable {
private log = createLogger('bot');
private bot?: Promise<Bot>;
private pxe?: PXE;
Expand All @@ -14,13 +15,19 @@ export class BotRunner implements BotRunnerApi {
private consecutiveErrors = 0;
private healthy = true;

public constructor(private config: BotConfig, dependencies: { pxe?: PXE; node?: AztecNode }) {
public readonly tracer: Tracer;

public constructor(
private config: BotConfig,
dependencies: { pxe?: PXE; node?: AztecNode; telemetry: TelemetryClient },
) {
this.tracer = dependencies.telemetry.getTracer('Bot');
this.pxe = dependencies.pxe;
if (!dependencies.node && !config.nodeUrl) {
throw new Error(`Missing node URL in config or dependencies`);
}
this.node = dependencies.node ?? createAztecNodeClient(config.nodeUrl!);
this.runningPromise = new RunningPromise(() => this.#work(), config.txIntervalSeconds * 1000);
this.runningPromise = new RunningPromise(() => this.#work(), this.log, config.txIntervalSeconds * 1000);
}

/** Initializes the bot if needed. Blocks until the bot setup is finished. */
Expand Down Expand Up @@ -126,6 +133,7 @@ export class BotRunner implements BotRunnerApi {
}
}

@trackSpan('Bot.work')
async #work() {
if (this.config.maxPendingTxs > 0) {
const pendingTxs = await this.node.getPendingTxs();
Expand All @@ -146,7 +154,7 @@ export class BotRunner implements BotRunnerApi {
}

if (!this.healthy && this.config.stopWhenUnhealthy) {
this.log.error(`Stopping bot due to errors`);
this.log.fatal(`Stopping bot due to errors`);
process.exit(1); // workaround docker not restarting the container if its unhealthy. We have to exit instead
}
}
Expand Down
3 changes: 3 additions & 0 deletions yarn-project/bot/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@
{
"path": "../protocol-contracts"
},
{
"path": "../telemetry-client"
},
{
"path": "../types"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ export class L2BlockStream {
startingBlock?: number;
} = {},
) {
this.runningPromise = new RunningPromise(() => this.work(), this.opts.pollIntervalMS ?? 1000);
this.runningPromise = new RunningPromise(() => this.work(), log, this.opts.pollIntervalMS ?? 1000);
}

public start() {
Expand Down
16 changes: 15 additions & 1 deletion yarn-project/foundation/src/promise/running-promise.test.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
import { type Logger, createLogger } from '../log/pino-logger.js';
import { sleep } from '../sleep/index.js';
import { RunningPromise } from './running-promise.js';

describe('RunningPromise', () => {
let runningPromise: RunningPromise;
let counter: number;
let fn: () => Promise<void>;
let logger: Logger;

beforeEach(() => {
counter = 0;
fn = async () => {
counter++;
await sleep(100);
};
runningPromise = new RunningPromise(fn, 50);
logger = createLogger('test');
runningPromise = new RunningPromise(fn, logger, 50);
});

afterEach(async () => {
Expand Down Expand Up @@ -40,5 +43,16 @@ describe('RunningPromise', () => {
await runningPromise.trigger();
expect(counter).toEqual(2);
});

// Starts a RunningPromise whose function always throws and checks the call count after 90ms.
it('handles errors', async () => {
// Runs the shared `fn` (increments `counter`, then sleeps 100ms) and throws once it completes.
const failingFn = async () => {
await fn();
throw new Error('ouch');
};
runningPromise = new RunningPromise(failingFn, logger, 50);
runningPromise.start();
// NOTE(review): `fn` sleeps 100ms before `failingFn` throws, so at 90ms the first call has
// presumably not thrown yet; this assertion alone may not exercise the error path — confirm
// whether a longer wait (and a check that a later iteration ran) is intended.
await sleep(90);
expect(counter).toEqual(1);
});
});
});
13 changes: 11 additions & 2 deletions yarn-project/foundation/src/promise/running-promise.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { createLogger } from '../log/pino-logger.js';
import { InterruptibleSleep } from '../sleep/index.js';
import { type PromiseWithResolvers, promiseWithResolvers } from './utils.js';

Expand All @@ -12,7 +13,11 @@ export class RunningPromise {
private interruptibleSleep = new InterruptibleSleep();
private requested: PromiseWithResolvers<void> | undefined = undefined;

constructor(private fn: () => void | Promise<void>, private pollingIntervalMS = 10000) {}
/**
* @param fn - Function awaited on every iteration of the polling loop.
* @param logger - Logger that receives any error thrown by `fn`; defaults to a 'running-promise' logger.
* @param pollingIntervalMS - Polling interval in milliseconds, 10000 by default
* (presumably the pause between iterations — the sleep call is not visible here; confirm).
*/
constructor(
private fn: () => void | Promise<void>,
private logger = createLogger('running-promise'),
private pollingIntervalMS = 10000,
) {}

/**
* Starts the running promise.
Expand All @@ -23,7 +28,11 @@ export class RunningPromise {
const poll = async () => {
while (this.running) {
const hasRequested = this.requested !== undefined;
await this.fn();
try {
await this.fn();
} catch (err) {
this.logger.error('Error in running promise', err);
}

// If an immediate run had been requested *before* the function started running, resolve the request.
if (hasRequested) {
Expand Down
13 changes: 10 additions & 3 deletions yarn-project/p2p/src/client/p2p_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import {
type L2Block,
type L2BlockId,
type L2BlockSource,
L2BlockStream,
type L2BlockStreamEvent,
type L2Tips,
type P2PApi,
Expand All @@ -16,7 +15,13 @@ import {
import { INITIAL_L2_BLOCK_NUM } from '@aztec/circuits.js/constants';
import { createLogger } from '@aztec/foundation/log';
import { type AztecKVStore, type AztecMap, type AztecSingleton } from '@aztec/kv-store';
import { Attributes, type TelemetryClient, WithTracer, trackSpan } from '@aztec/telemetry-client';
import {
Attributes,
type TelemetryClient,
TraceableL2BlockStream,
WithTracer,
trackSpan,
} from '@aztec/telemetry-client';
import { NoopTelemetryClient } from '@aztec/telemetry-client/noop';

import { type ENR } from '@chainsafe/enr';
Expand Down Expand Up @@ -221,7 +226,9 @@ export class P2PClient extends WithTracer implements P2P {

this.keepAttestationsInPoolFor = keepAttestationsInPoolFor;

this.blockStream = new L2BlockStream(l2BlockSource, this, this, createLogger('p2p:block_stream'), {
const tracer = telemetry.getTracer('P2PL2BlockStream');
const logger = createLogger('p2p:l2-block-stream');
this.blockStream = new TraceableL2BlockStream(l2BlockSource, this, this, tracer, 'P2PL2BlockStream', logger, {
batchSize: blockRequestBatchSize,
pollIntervalMS: blockCheckIntervalMS,
});
Expand Down
Loading

0 comments on commit 4cc0a6d

Please sign in to comment.