From 4cc0a6d832e6ee1c3fcc6876517ed3f743f59d4b Mon Sep 17 00:00:00 2001 From: Santiago Palladino Date: Fri, 13 Dec 2024 11:11:08 -0300 Subject: [PATCH] chore: Trace and handle errors in running promises (#10645) - Adds a try/catch to every iteration of a running promise, and logs any exceptions - Adds a `trackSpan` to every running promise job - Adds a `simulatePublicCalls` to the aztec node server just because --- .../archiver/src/archiver/archiver.test.ts | 4 +- .../archiver/src/archiver/archiver.ts | 21 +-- .../archiver/src/archiver/instrumentation.ts | 4 + .../aztec-node/src/aztec-node/server.ts | 11 +- .../aztec.js/src/utils/anvil_test_watcher.ts | 2 +- yarn-project/aztec/src/cli/cmds/start_bot.ts | 10 +- yarn-project/aztec/src/cli/cmds/start_node.ts | 6 +- yarn-project/bot/package.json | 1 + yarn-project/bot/src/runner.ts | 16 +- yarn-project/bot/tsconfig.json | 3 + .../l2_block_downloader/l2_block_stream.ts | 2 +- .../src/promise/running-promise.test.ts | 16 +- .../foundation/src/promise/running-promise.ts | 13 +- yarn-project/p2p/src/client/p2p_client.ts | 13 +- .../p2p/src/service/libp2p_service.ts | 10 +- yarn-project/p2p/src/service/peer_manager.ts | 5 +- .../proof-verifier/src/proof_verifier.ts | 19 ++- .../prover-client/src/mocks/test_context.ts | 2 +- .../src/prover-agent/memory-proving-queue.ts | 12 +- .../src/prover-agent/prover-agent.ts | 111 +++++++----- .../src/proving_broker/proving_agent.ts | 138 +++++++-------- .../src/proving_broker/proving_broker.ts | 13 +- yarn-project/prover-node/src/factory.ts | 4 +- .../src/monitors/claims-monitor.test.ts | 3 +- .../src/monitors/claims-monitor.ts | 16 +- .../src/monitors/epoch-monitor.test.ts | 3 +- .../prover-node/src/monitors/epoch-monitor.ts | 14 +- .../prover-node/src/prover-node.test.ts | 5 +- .../src/sequencer/sequencer.ts | 3 +- .../simulator/src/public/public_processor.ts | 161 ++++++++++-------- yarn-project/telemetry-client/package.json | 1 + .../telemetry-client/src/attributes.ts | 5 + yarn-project/telemetry-client/src/index.ts | 1 + .../telemetry-client/src/wrappers/index.ts | 1 + .../src/wrappers/l2_block_stream.ts | 37 ++++ yarn-project/telemetry-client/tsconfig.json | 3 + .../validator-client/src/validator.ts | 12 +- .../src/synchronizer/instrumentation.ts | 2 +- .../server_world_state_synchronizer.ts | 10 +- yarn-project/yarn.lock | 2 + 40 files changed, 451 insertions(+), 264 deletions(-) create mode 100644 yarn-project/telemetry-client/src/wrappers/index.ts create mode 100644 yarn-project/telemetry-client/src/wrappers/l2_block_stream.ts diff --git a/yarn-project/archiver/src/archiver/archiver.test.ts b/yarn-project/archiver/src/archiver/archiver.test.ts index 87173270dc5..0a5943d603e 100644 --- a/yarn-project/archiver/src/archiver/archiver.test.ts +++ b/yarn-project/archiver/src/archiver/archiver.test.ts @@ -5,6 +5,7 @@ import { EthAddress } from '@aztec/foundation/eth-address'; import { Fr } from '@aztec/foundation/fields'; import { sleep } from '@aztec/foundation/sleep'; import { type InboxAbi, RollupAbi } from '@aztec/l1-artifacts'; +import { NoopTelemetryClient } from '@aztec/telemetry-client/noop'; import { jest } from '@jest/globals'; import { type MockProxy, mock } from 'jest-mock-extended'; @@ -84,7 +85,8 @@ describe('Archiver', () => { }) as any, }); - instrumentation = mock({ isEnabled: () => true }); + const tracer = new NoopTelemetryClient().getTracer(); + instrumentation = mock({ isEnabled: () => true, tracer }); archiverStore = new MemoryArchiverStore(1000); archiver = new Archiver( diff --git a/yarn-project/archiver/src/archiver/archiver.ts b/yarn-project/archiver/src/archiver/archiver.ts index 0c9a1c7e175..85c362642b9 100644 --- a/yarn-project/archiver/src/archiver/archiver.ts +++ b/yarn-project/archiver/src/archiver/archiver.ts @@ -51,7 +51,7 @@ import { PrivateFunctionBroadcastedEvent, UnconstrainedFunctionBroadcastedEvent, } from '@aztec/protocol-contracts'; -import { type TelemetryClient } from '@aztec/telemetry-client'; +import { Attributes, type TelemetryClient, type Traceable, type Tracer, trackSpan } from '@aztec/telemetry-client'; import groupBy from 'lodash.groupby'; import { @@ -85,7 +85,7 @@ export type ArchiveSource = L2BlockSource & * Responsible for handling robust L1 polling so that other components do not need to * concern themselves with it. */ -export class Archiver implements ArchiveSource { +export class Archiver implements ArchiveSource, Traceable { /** * A promise in which we will be continually fetching new L2 blocks. */ @@ -99,6 +99,8 @@ export class Archiver implements ArchiveSource { public l1BlockNumber: bigint | undefined; public l1Timestamp: bigint | undefined; + public readonly tracer: Tracer; + /** * Creates a new instance of the Archiver. * @param publicClient - A client for interacting with the Ethereum node. @@ -118,6 +120,7 @@ export class Archiver implements ArchiveSource { private readonly l1constants: L1RollupConstants, private readonly log: Logger = createLogger('archiver'), ) { + this.tracer = instrumentation.tracer; this.store = new ArchiverStoreHelper(dataStore); this.rollup = getContract({ @@ -194,24 +197,14 @@ export class Archiver implements ArchiveSource { await this.sync(blockUntilSynced); } - this.runningPromise = new RunningPromise(() => this.safeSync(), this.config.pollingIntervalMs); + this.runningPromise = new RunningPromise(() => this.sync(false), this.log, this.config.pollingIntervalMs); this.runningPromise.start(); } - /** - * Syncs and catches exceptions. - */ - private async safeSync() { - try { - await this.sync(false); - } catch (error) { - this.log.error('Error syncing archiver', error); - } - } - /** * Fetches logs from L1 contracts and processes them. */ + @trackSpan('Archiver.sync', initialRun => ({ [Attributes.INITIAL_SYNC]: initialRun })) private async sync(initialRun: boolean) { /** * We keep track of three "pointers" to L1 blocks: diff --git a/yarn-project/archiver/src/archiver/instrumentation.ts b/yarn-project/archiver/src/archiver/instrumentation.ts index 5dc3ab844c5..716f2948d0a 100644 --- a/yarn-project/archiver/src/archiver/instrumentation.ts +++ b/yarn-project/archiver/src/archiver/instrumentation.ts @@ -8,11 +8,14 @@ import { type LmdbStatsCallback, Metrics, type TelemetryClient, + type Tracer, type UpDownCounter, ValueType, } from '@aztec/telemetry-client'; export class ArchiverInstrumentation { + public readonly tracer: Tracer; + private blockHeight: Gauge; private blockSize: Gauge; private syncDuration: Histogram; @@ -24,6 +27,7 @@ export class ArchiverInstrumentation { private log = createLogger('archiver:instrumentation'); private constructor(private telemetry: TelemetryClient, lmdbStats?: LmdbStatsCallback) { + this.tracer = telemetry.getTracer('Archiver'); const meter = telemetry.getMeter('Archiver'); this.blockHeight = meter.createGauge(Metrics.ARCHIVER_BLOCK_HEIGHT, { description: 'The height of the latest block processed by the archiver', diff --git a/yarn-project/aztec-node/src/aztec-node/server.ts b/yarn-project/aztec-node/src/aztec-node/server.ts index fa7904296cc..3a2cd472738 100644 --- a/yarn-project/aztec-node/src/aztec-node/server.ts +++ b/yarn-project/aztec-node/src/aztec-node/server.ts @@ -74,7 +74,7 @@ import { import { ProtocolContractAddress } from '@aztec/protocol-contracts'; import { GlobalVariableBuilder, type L1Publisher, SequencerClient } from '@aztec/sequencer-client'; import { PublicProcessorFactory } from '@aztec/simulator'; -import { type TelemetryClient } from '@aztec/telemetry-client'; +import { Attributes, type TelemetryClient, type Traceable, type Tracer, trackSpan } from '@aztec/telemetry-client'; import { NoopTelemetryClient } from '@aztec/telemetry-client/noop'; import { createValidatorClient } from '@aztec/validator-client'; import { createWorldStateSynchronizer } from '@aztec/world-state'; @@ -85,11 +85,12 @@ import { NodeMetrics } from './node_metrics.js'; /** * The aztec node. */ -export class AztecNodeService implements AztecNode { +export class AztecNodeService implements AztecNode, Traceable { private packageVersion: string; - private metrics: NodeMetrics; + public readonly tracer: Tracer; + constructor( protected config: AztecNodeConfig, protected readonly p2pClient: P2P, @@ -109,6 +110,7 @@ export class AztecNodeService implements AztecNode { ) { this.packageVersion = getPackageInfo().version; this.metrics = new NodeMetrics(telemetry, 'AztecNodeService'); + this.tracer = telemetry.getTracer('AztecNodeService'); this.log.info(`Aztec Node started on chain 0x${l1ChainId.toString(16)}`, config.l1Contracts); } @@ -782,6 +784,9 @@ export class AztecNodeService implements AztecNode { * Simulates the public part of a transaction with the current state. * @param tx - The transaction to simulate. **/ + @trackSpan('AztecNodeService.simulatePublicCalls', (tx: Tx) => ({ + [Attributes.TX_HASH]: tx.tryGetTxHash()?.toString(), + })) public async simulatePublicCalls(tx: Tx): Promise { const txHash = tx.getTxHash(); const blockNumber = (await this.blockSource.getBlockNumber()) + 1; diff --git a/yarn-project/aztec.js/src/utils/anvil_test_watcher.ts b/yarn-project/aztec.js/src/utils/anvil_test_watcher.ts index 33d32ae42fd..307d52a18f0 100644 --- a/yarn-project/aztec.js/src/utils/anvil_test_watcher.ts +++ b/yarn-project/aztec.js/src/utils/anvil_test_watcher.ts @@ -46,7 +46,7 @@ export class AnvilTestWatcher { const isAutoMining = await this.cheatcodes.isAutoMining(); if (isAutoMining) { - this.filledRunningPromise = new RunningPromise(() => this.mineIfSlotFilled(), 1000); + this.filledRunningPromise = new RunningPromise(() => this.mineIfSlotFilled(), this.logger, 1000); this.filledRunningPromise.start(); this.logger.info(`Watcher started for rollup at ${this.rollup.address}`); } else { diff --git a/yarn-project/aztec/src/cli/cmds/start_bot.ts b/yarn-project/aztec/src/cli/cmds/start_bot.ts index 1d3e110db33..635549c189f 100644 --- a/yarn-project/aztec/src/cli/cmds/start_bot.ts +++ b/yarn-project/aztec/src/cli/cmds/start_bot.ts @@ -2,6 +2,11 @@ import { type BotConfig, BotRunner, botConfigMappings, getBotRunnerApiHandler } import { type AztecNode, type PXE } from '@aztec/circuit-types'; import { type NamespacedApiHandlers } from '@aztec/foundation/json-rpc/server'; import { type LogFn } from '@aztec/foundation/log'; +import { type TelemetryClient } from '@aztec/telemetry-client'; +import { + createAndStartTelemetryClient, + getConfigEnvVars as getTelemetryClientConfig, +} from '@aztec/telemetry-client/start'; import { extractRelevantOptions } from '../util.js'; @@ -25,14 +30,15 @@ export async function startBot( pxe = await addPXE(options, signalHandlers, services, userLog); } - await addBot(options, signalHandlers, services, { pxe }); + const telemetry = await createAndStartTelemetryClient(getTelemetryClientConfig()); + await addBot(options, signalHandlers, services, { pxe, telemetry }); } export function addBot( options: any, signalHandlers: (() => Promise)[], services: NamespacedApiHandlers, - deps: { pxe?: PXE; node?: AztecNode } = {}, + deps: { pxe?: PXE; node?: AztecNode; telemetry: TelemetryClient }, ) { const config = extractRelevantOptions(options, botConfigMappings, 'bot'); diff --git a/yarn-project/aztec/src/cli/cmds/start_node.ts b/yarn-project/aztec/src/cli/cmds/start_node.ts index 2459533aa9f..1758ea35a38 100644 --- a/yarn-project/aztec/src/cli/cmds/start_node.ts +++ b/yarn-project/aztec/src/cli/cmds/start_node.ts @@ -88,10 +88,10 @@ export async function startNode( } const telemetryConfig = extractRelevantOptions(options, telemetryClientConfigMappings, 'tel'); - const telemetryClient = await createAndStartTelemetryClient(telemetryConfig); + const telemetry = await createAndStartTelemetryClient(telemetryConfig); // Create and start Aztec Node - const node = await createAztecNode(nodeConfig, telemetryClient); + const node = await createAztecNode(nodeConfig, telemetry); // Add node and p2p to services list services.node = [node, AztecNodeApiSchema]; @@ -110,6 +110,6 @@ export async function startNode( // Add a txs bot if requested if (options.bot) { const { addBot } = await import('./start_bot.js'); - await addBot(options, signalHandlers, services, { pxe, node }); + await addBot(options, signalHandlers, services, { pxe, node, telemetry }); } } diff --git a/yarn-project/bot/package.json b/yarn-project/bot/package.json index f2f8c952824..bbd80898817 100644 --- a/yarn-project/bot/package.json +++ b/yarn-project/bot/package.json @@ -64,6 +64,7 @@ "@aztec/foundation": "workspace:^", "@aztec/noir-contracts.js": "workspace:^", "@aztec/protocol-contracts": "workspace:^", + "@aztec/telemetry-client": "workspace:^", "@aztec/types": "workspace:^", "source-map-support": "^0.5.21", "tslib": "^2.4.0", diff --git a/yarn-project/bot/src/runner.ts b/yarn-project/bot/src/runner.ts index a15a1ffba85..626f8f6582c 100644 --- a/yarn-project/bot/src/runner.ts +++ b/yarn-project/bot/src/runner.ts @@ -1,11 +1,12 @@ import { type AztecNode, type PXE, createAztecNodeClient, createLogger } from '@aztec/aztec.js'; import { RunningPromise } from '@aztec/foundation/running-promise'; +import { type TelemetryClient, type Traceable, type Tracer, trackSpan } from '@aztec/telemetry-client'; import { Bot } from './bot.js'; import { type BotConfig } from './config.js'; import { type BotRunnerApi } from './interface.js'; -export class BotRunner implements BotRunnerApi { +export class BotRunner implements BotRunnerApi, Traceable { private log = createLogger('bot'); private bot?: Promise; private pxe?: PXE; @@ -14,13 +15,19 @@ export class BotRunner implements BotRunnerApi { private consecutiveErrors = 0; private healthy = true; - public constructor(private config: BotConfig, dependencies: { pxe?: PXE; node?: AztecNode }) { + public readonly tracer: Tracer; + + public constructor( + private config: BotConfig, + dependencies: { pxe?: PXE; node?: AztecNode; telemetry: TelemetryClient }, + ) { + this.tracer = dependencies.telemetry.getTracer('Bot'); this.pxe = dependencies.pxe; if (!dependencies.node && !config.nodeUrl) { throw new Error(`Missing node URL in config or dependencies`); } this.node = dependencies.node ?? createAztecNodeClient(config.nodeUrl!); - this.runningPromise = new RunningPromise(() => this.#work(), config.txIntervalSeconds * 1000); + this.runningPromise = new RunningPromise(() => this.#work(), this.log, config.txIntervalSeconds * 1000); } /** Initializes the bot if needed. Blocks until the bot setup is finished. */ @@ -126,6 +133,7 @@ export class BotRunner implements BotRunnerApi { } } + @trackSpan('Bot.work') async #work() { if (this.config.maxPendingTxs > 0) { const pendingTxs = await this.node.getPendingTxs(); @@ -146,7 +154,7 @@ export class BotRunner implements BotRunnerApi { } if (!this.healthy && this.config.stopWhenUnhealthy) { - this.log.error(`Stopping bot due to errors`); + this.log.fatal(`Stopping bot due to errors`); process.exit(1); // workaround docker not restarting the container if its unhealthy. We have to exit instead } } diff --git a/yarn-project/bot/tsconfig.json b/yarn-project/bot/tsconfig.json index eb2c5396d8a..fadf21845ae 100644 --- a/yarn-project/bot/tsconfig.json +++ b/yarn-project/bot/tsconfig.json @@ -30,6 +30,9 @@ { "path": "../protocol-contracts" }, + { + "path": "../telemetry-client" + }, { "path": "../types" } diff --git a/yarn-project/circuit-types/src/l2_block_downloader/l2_block_stream.ts b/yarn-project/circuit-types/src/l2_block_downloader/l2_block_stream.ts index 00e81a388ff..10724c9b545 100644 --- a/yarn-project/circuit-types/src/l2_block_downloader/l2_block_stream.ts +++ b/yarn-project/circuit-types/src/l2_block_downloader/l2_block_stream.ts @@ -21,7 +21,7 @@ export class L2BlockStream { startingBlock?: number; } = {}, ) { - this.runningPromise = new RunningPromise(() => this.work(), this.opts.pollIntervalMS ?? 1000); + this.runningPromise = new RunningPromise(() => this.work(), log, this.opts.pollIntervalMS ?? 1000); } public start() { diff --git a/yarn-project/foundation/src/promise/running-promise.test.ts b/yarn-project/foundation/src/promise/running-promise.test.ts index f1ac79297b7..f3ebe10d973 100644 --- a/yarn-project/foundation/src/promise/running-promise.test.ts +++ b/yarn-project/foundation/src/promise/running-promise.test.ts @@ -1,3 +1,4 @@ +import { type Logger, createLogger } from '../log/pino-logger.js'; import { sleep } from '../sleep/index.js'; import { RunningPromise } from './running-promise.js'; @@ -5,6 +6,7 @@ describe('RunningPromise', () => { let runningPromise: RunningPromise; let counter: number; let fn: () => Promise; + let logger: Logger; beforeEach(() => { counter = 0; @@ -12,7 +14,8 @@ describe('RunningPromise', () => { counter++; await sleep(100); }; - runningPromise = new RunningPromise(fn, 50); + logger = createLogger('test'); + runningPromise = new RunningPromise(fn, logger, 50); }); afterEach(async () => { @@ -40,5 +43,16 @@ describe('RunningPromise', () => { await runningPromise.trigger(); expect(counter).toEqual(2); }); + + it('handles errors', async () => { + const failingFn = async () => { + await fn(); + throw new Error('ouch'); + }; + runningPromise = new RunningPromise(failingFn, logger, 50); + runningPromise.start(); + await sleep(90); + expect(counter).toEqual(1); + }); }); }); diff --git a/yarn-project/foundation/src/promise/running-promise.ts b/yarn-project/foundation/src/promise/running-promise.ts index 47f7d1e643d..69288fcb0ca 100644 --- a/yarn-project/foundation/src/promise/running-promise.ts +++ b/yarn-project/foundation/src/promise/running-promise.ts @@ -1,3 +1,4 @@ +import { createLogger } from '../log/pino-logger.js'; import { InterruptibleSleep } from '../sleep/index.js'; import { type PromiseWithResolvers, promiseWithResolvers } from './utils.js'; @@ -12,7 +13,11 @@ export class RunningPromise { private interruptibleSleep = new InterruptibleSleep(); private requested: PromiseWithResolvers | undefined = undefined; - constructor(private fn: () => void | Promise, private pollingIntervalMS = 10000) {} + constructor( + private fn: () => void | Promise, + private logger = createLogger('running-promise'), + private pollingIntervalMS = 10000, + ) {} /** * Starts the running promise. @@ -23,7 +28,11 @@ export class RunningPromise { const poll = async () => { while (this.running) { const hasRequested = this.requested !== undefined; - await this.fn(); + try { + await this.fn(); + } catch (err) { + this.logger.error('Error in running promise', err); + } // If an immediate run had been requested *before* the function started running, resolve the request. if (hasRequested) { diff --git a/yarn-project/p2p/src/client/p2p_client.ts b/yarn-project/p2p/src/client/p2p_client.ts index db1ced07f76..eda78826d7a 100644 --- a/yarn-project/p2p/src/client/p2p_client.ts +++ b/yarn-project/p2p/src/client/p2p_client.ts @@ -5,7 +5,6 @@ import { type L2Block, type L2BlockId, type L2BlockSource, - L2BlockStream, type L2BlockStreamEvent, type L2Tips, type P2PApi, @@ -16,7 +15,13 @@ import { import { INITIAL_L2_BLOCK_NUM } from '@aztec/circuits.js/constants'; import { createLogger } from '@aztec/foundation/log'; import { type AztecKVStore, type AztecMap, type AztecSingleton } from '@aztec/kv-store'; -import { Attributes, type TelemetryClient, WithTracer, trackSpan } from '@aztec/telemetry-client'; +import { + Attributes, + type TelemetryClient, + TraceableL2BlockStream, + WithTracer, + trackSpan, +} from '@aztec/telemetry-client'; import { NoopTelemetryClient } from '@aztec/telemetry-client/noop'; import { type ENR } from '@chainsafe/enr'; @@ -221,7 +226,9 @@ export class P2PClient extends WithTracer implements P2P { this.keepAttestationsInPoolFor = keepAttestationsInPoolFor; - this.blockStream = new L2BlockStream(l2BlockSource, this, this, createLogger('p2p:block_stream'), { + const tracer = telemetry.getTracer('P2PL2BlockStream'); + const logger = createLogger('p2p:l2-block-stream'); + this.blockStream = new TraceableL2BlockStream(l2BlockSource, this, this, tracer, 'P2PL2BlockStream', logger, { batchSize: blockRequestBatchSize, pollIntervalMS: blockCheckIntervalMS, }); diff --git a/yarn-project/p2p/src/service/libp2p_service.ts b/yarn-project/p2p/src/service/libp2p_service.ts index 03bf7fc4ae6..393bae440ec 100644 --- a/yarn-project/p2p/src/service/libp2p_service.ts +++ b/yarn-project/p2p/src/service/libp2p_service.ts @@ -93,7 +93,7 @@ export class LibP2PService extends WithTracer implements P2PService { ) { super(telemetry, 'LibP2PService'); - this.peerManager = new PeerManager(node, peerDiscoveryService, config, logger); + this.peerManager = new PeerManager(node, peerDiscoveryService, config, this.tracer, logger); this.node.services.pubsub.score.params.appSpecificScore = (peerId: string) => { return this.peerManager.getPeerScore(peerId); }; @@ -144,9 +144,11 @@ export class LibP2PService extends WithTracer implements P2PService { }); // Start running promise for peer discovery - this.discoveryRunningPromise = new RunningPromise(() => { - this.peerManager.heartbeat(); - }, this.config.peerCheckIntervalMS); + this.discoveryRunningPromise = new RunningPromise( + () => this.peerManager.heartbeat(), + this.logger, + this.config.peerCheckIntervalMS, + ); this.discoveryRunningPromise.start(); // Define the sub protocol validators - This is done within this start() method to gain a callback to the existing validateTx function diff --git a/yarn-project/p2p/src/service/peer_manager.ts b/yarn-project/p2p/src/service/peer_manager.ts index 0a174d54258..e2cc4db28cc 100644 --- a/yarn-project/p2p/src/service/peer_manager.ts +++ b/yarn-project/p2p/src/service/peer_manager.ts @@ -1,5 +1,6 @@ import { type PeerInfo } from '@aztec/circuit-types'; import { createLogger } from '@aztec/foundation/log'; +import { type Traceable, type Tracer, trackSpan } from '@aztec/telemetry-client'; import { type ENR } from '@chainsafe/enr'; import { type PeerId } from '@libp2p/interface'; @@ -21,7 +22,7 @@ type CachedPeer = { dialAttempts: number; }; -export class PeerManager { +export class PeerManager implements Traceable { private cachedPeers: Map = new Map(); private peerScoring: PeerScoring; private heartbeatCounter: number = 0; @@ -30,6 +31,7 @@ export class PeerManager { private libP2PNode: PubSubLibp2p, private peerDiscoveryService: PeerDiscoveryService, private config: P2PConfig, + public readonly tracer: Tracer, private logger = createLogger('p2p:peer-manager'), ) { this.peerScoring = new PeerScoring(config); @@ -59,6 +61,7 @@ export class PeerManager { }); } + @trackSpan('PeerManager.heartbeat') public heartbeat() { this.heartbeatCounter++; this.discover(); diff --git a/yarn-project/proof-verifier/src/proof_verifier.ts b/yarn-project/proof-verifier/src/proof_verifier.ts index f87f50612a0..11ba8bfa203 100644 --- a/yarn-project/proof-verifier/src/proof_verifier.ts +++ b/yarn-project/proof-verifier/src/proof_verifier.ts @@ -3,7 +3,16 @@ import { BBCircuitVerifier } from '@aztec/bb-prover'; import { createEthereumChain } from '@aztec/ethereum'; import { type Logger, createLogger } from '@aztec/foundation/log'; import { RunningPromise } from '@aztec/foundation/running-promise'; -import { Attributes, Metrics, type TelemetryClient, type UpDownCounter, ValueType } from '@aztec/telemetry-client'; +import { + Attributes, + Metrics, + type TelemetryClient, + type Traceable, + type Tracer, + type UpDownCounter, + ValueType, + trackSpan, +} from '@aztec/telemetry-client'; import { type PublicClient, createPublicClient, http } from 'viem'; @@ -11,12 +20,14 @@ import { type ProofVerifierConfig } from './config.js'; const EXPECTED_PROOF_SIZE = 13988; -export class ProofVerifier { +export class ProofVerifier implements Traceable { private runningPromise: RunningPromise; private synchedToL1Block = 0n; private proofVerified: UpDownCounter; + public readonly tracer: Tracer; + constructor( private config: ProofVerifierConfig, private client: PublicClient, @@ -24,7 +35,8 @@ export class ProofVerifier { telemetryClient: TelemetryClient, private logger: Logger, ) { - this.runningPromise = new RunningPromise(this.work.bind(this), config.pollIntervalMs); + this.tracer = telemetryClient.getTracer('ProofVerifier'); + this.runningPromise = new RunningPromise(this.work.bind(this), this.logger, config.pollIntervalMs); this.proofVerified = telemetryClient.getMeter('ProofVerifier').createUpDownCounter(Metrics.PROOF_VERIFIER_COUNT, { valueType: ValueType.INT, description: 'The number of proofs verified by the block verifier bot', @@ -53,6 +65,7 @@ export class ProofVerifier { await this.runningPromise.stop(); } + @trackSpan('ProofVerifier.work') private async work() { const startBlock = this.synchedToL1Block + 1n; this.logger.debug(`Fetching proofs from L1 block ${startBlock}`); diff --git a/yarn-project/prover-client/src/mocks/test_context.ts b/yarn-project/prover-client/src/mocks/test_context.ts index 52181faf594..f1af29653b5 100644 --- a/yarn-project/prover-client/src/mocks/test_context.ts +++ b/yarn-project/prover-client/src/mocks/test_context.ts @@ -114,7 +114,7 @@ export class TestContext { const queue = new MemoryProvingQueue(telemetry); const orchestrator = new TestProvingOrchestrator(ws, queue, telemetry, Fr.ZERO); - const agent = new ProverAgent(localProver, proverCount); + const agent = new ProverAgent(localProver, proverCount, undefined, telemetry); queue.start(); agent.start(queue); diff --git a/yarn-project/prover-client/src/prover-agent/memory-proving-queue.ts b/yarn-project/prover-client/src/prover-agent/memory-proving-queue.ts index dd7b05a21cf..19657111997 100644 --- a/yarn-project/prover-client/src/prover-agent/memory-proving-queue.ts +++ b/yarn-project/prover-client/src/prover-agent/memory-proving-queue.ts @@ -35,7 +35,7 @@ import { AbortError, TimeoutError } from '@aztec/foundation/error'; import { createLogger } from '@aztec/foundation/log'; import { type PromiseWithResolvers, RunningPromise, promiseWithResolvers } from '@aztec/foundation/promise'; import { PriorityMemoryQueue } from '@aztec/foundation/queue'; -import { type TelemetryClient } from '@aztec/telemetry-client'; +import { type TelemetryClient, type Tracer, trackSpan } from '@aztec/telemetry-client'; import { InlineProofStore, type ProofStore } from '../proving_broker/proof_store.js'; import { ProvingQueueMetrics } from './queue_metrics.js'; @@ -65,6 +65,8 @@ export class MemoryProvingQueue implements ServerCircuitProver, ProvingJobSource private runningPromise: RunningPromise; private metrics: ProvingQueueMetrics; + public readonly tracer: Tracer; + constructor( client: TelemetryClient, /** Timeout the job if an agent doesn't report back in this time */ @@ -75,8 +77,9 @@ export class MemoryProvingQueue implements ServerCircuitProver, ProvingJobSource private timeSource = defaultTimeSource, private proofStore: ProofStore = new InlineProofStore(), ) { + this.tracer = client.getTracer('MemoryProvingQueue'); this.metrics = new ProvingQueueMetrics(client, 'MemoryProvingQueue'); - this.runningPromise = new RunningPromise(this.poll, pollingIntervalMs); + this.runningPromise = new RunningPromise(this.poll.bind(this), this.log, pollingIntervalMs); } public start() { @@ -202,7 +205,8 @@ export class MemoryProvingQueue implements ServerCircuitProver, ProvingJobSource return this.jobsInProgress.has(jobId); } - private poll = () => { + @trackSpan('MemoryProvingQueue.poll') + private poll() { const now = this.timeSource(); this.metrics.recordQueueSize(this.queue.length()); @@ -220,7 +224,7 @@ export class MemoryProvingQueue implements ServerCircuitProver, ProvingJobSource this.queue.put(job); } } - }; + } private async enqueue( type: T, diff --git a/yarn-project/prover-client/src/prover-agent/prover-agent.ts b/yarn-project/prover-client/src/prover-agent/prover-agent.ts index b2c6aebbf27..235d8264b93 100644 --- a/yarn-project/prover-client/src/prover-agent/prover-agent.ts +++ b/yarn-project/prover-client/src/prover-agent/prover-agent.ts @@ -11,26 +11,29 @@ import { import { createLogger } from '@aztec/foundation/log'; import { RunningPromise } from '@aztec/foundation/running-promise'; import { elapsed } from '@aztec/foundation/timer'; +import { Attributes, type TelemetryClient, type Traceable, type Tracer, trackSpan } from '@aztec/telemetry-client'; +import { NoopTelemetryClient } from '@aztec/telemetry-client/noop'; import { InlineProofStore } from '../proving_broker/proof_store.js'; const PRINT_THRESHOLD_NS = 6e10; // 60 seconds +type InFlightPromise = { + id: string; + type: ProvingRequestType; + promise: Promise; +}; + /** * A helper class that encapsulates a circuit prover and connects it to a job source. */ -export class ProverAgent implements ProverAgentApi { - private inFlightPromises = new Map< - string, - { - id: string; - type: ProvingRequestType; - promise: Promise; - } - >(); +export class ProverAgent implements ProverAgentApi, Traceable { + private inFlightPromises = new Map(); private runningPromise?: RunningPromise; private proofInputsDatabase = new InlineProofStore(); + public readonly tracer: Tracer; + constructor( /** The prover implementation to defer jobs to */ private circuitProver: ServerCircuitProver, @@ -38,8 +41,13 @@ export class ProverAgent implements ProverAgentApi { private maxConcurrency = 1, /** How long to wait between jobs */ private pollIntervalMs = 100, + /** Telemetry client */ + private telemetry: TelemetryClient = new NoopTelemetryClient(), + /** Logger */ private log = createLogger('prover-client:prover-agent'), - ) {} + ) { + this.tracer = telemetry.getTracer('ProverAgent'); + } setMaxConcurrency(maxConcurrency: number): Promise { if (maxConcurrency < 1) { @@ -74,49 +82,53 @@ export class ProverAgent implements ProverAgentApi { let lastPrint = process.hrtime.bigint(); - this.runningPromise = new RunningPromise(async () => { - for (const jobId of this.inFlightPromises.keys()) { - await jobSource.heartbeat(jobId); - } - - const now = process.hrtime.bigint(); - - if (now - lastPrint >= PRINT_THRESHOLD_NS) { - // only log if we're actually doing work - if (this.inFlightPromises.size > 0) { - const jobs = Array.from(this.inFlightPromises.values()) - .map(job => `id=${job.id},type=${ProvingRequestType[job.type]}`) - .join(' '); - this.log.info(`Agent is running with ${this.inFlightPromises.size} in-flight jobs: ${jobs}`); + this.runningPromise = new RunningPromise( + async () => { + for (const jobId of this.inFlightPromises.keys()) { + await jobSource.heartbeat(jobId); } - lastPrint = now; - } - while (this.inFlightPromises.size < this.maxConcurrency) { - try { - const job = await jobSource.getProvingJob(); - if (!job) { - // job source is fully drained, sleep for a bit and try again - return; + const now = process.hrtime.bigint(); + + if (now - lastPrint >= PRINT_THRESHOLD_NS) { + // only log if we're actually doing work + if (this.inFlightPromises.size > 0) { + const jobs = Array.from(this.inFlightPromises.values()) + .map(job => `id=${job.id},type=${ProvingRequestType[job.type]}`) + .join(' '); + this.log.info(`Agent is running with ${this.inFlightPromises.size} in-flight jobs: ${jobs}`); } + lastPrint = now; + } + while (this.inFlightPromises.size < this.maxConcurrency) { try { - const promise = this.work(jobSource, job).finally(() => this.inFlightPromises.delete(job.id)); - this.inFlightPromises.set(job.id, { - id: job.id, - type: job.type, - promise, - }); + const job = await jobSource.getProvingJob(); + if (!job) { + // job source is fully drained, sleep for a bit and try again + return; + } + + try { + const promise = this.work(jobSource, job).finally(() => this.inFlightPromises.delete(job.id)); + this.inFlightPromises.set(job.id, { + id: job.id, + type: job.type, + promise, + }); + } catch (err) { + this.log.warn( + `Error processing job! type=${ProvingRequestType[job.type]}: ${err}. ${(err as Error).stack}`, + ); + } } catch (err) { - this.log.warn( - `Error processing job! type=${ProvingRequestType[job.type]}: ${err}. ${(err as Error).stack}`, - ); + this.log.error(`Error fetching job`, err); } - } catch (err) { - this.log.error(`Error fetching job`, err); } - } - }, this.pollIntervalMs); + }, + this.log, + this.pollIntervalMs, + ); this.runningPromise.start(); this.log.info(`Agent started with concurrency=${this.maxConcurrency}`); @@ -133,9 +145,16 @@ export class ProverAgent implements ProverAgentApi { this.log.info('Agent stopped'); } + @trackSpan('ProverAgent.work', (_jobSoure, job) => ({ + [Attributes.PROVING_JOB_ID]: job.id, + [Attributes.PROVING_JOB_TYPE]: ProvingRequestType[job.type], + })) private async work(jobSource: ProvingJobSource, job: ProvingJob): Promise { try { - this.log.debug(`Picked up proving job id=${job.id} type=${ProvingRequestType[job.type]}`); + this.log.debug(`Picked up proving job ${job.id} ${ProvingRequestType[job.type]}`, { + jobId: job.id, + jobType: ProvingRequestType[job.type], + }); const type = job.type; const inputs = await this.proofInputsDatabase.getProofInput(job.inputsUri); const [time, result] = await elapsed(this.getProof(inputs)); diff --git a/yarn-project/prover-client/src/proving_broker/proving_agent.ts b/yarn-project/prover-client/src/proving_broker/proving_agent.ts index 7cb29c7446a..4d6ad75165a 100644 --- a/yarn-project/prover-client/src/proving_broker/proving_agent.ts +++ b/yarn-project/prover-client/src/proving_broker/proving_agent.ts @@ -11,7 +11,7 @@ import { import { createLogger } from '@aztec/foundation/log'; import { RunningPromise } from '@aztec/foundation/running-promise'; import { Timer } from '@aztec/foundation/timer'; -import { type TelemetryClient } from '@aztec/telemetry-client'; +import { type TelemetryClient, type Traceable, type Tracer, trackSpan } from '@aztec/telemetry-client'; import { type ProofStore } from './proof_store.js'; import { ProvingAgentInstrumentation } from './proving_agent_instrumentation.js'; @@ -20,12 +20,14 @@ import { ProvingJobController, ProvingJobControllerStatus } from './proving_job_ /** * A helper class that encapsulates a circuit prover and connects it to a job source. */ -export class ProvingAgent { +export class ProvingAgent implements Traceable { private currentJobController?: ProvingJobController; private runningPromise: RunningPromise; private instrumentation: ProvingAgentInstrumentation; private idleTimer: Timer | undefined; + public readonly tracer: Tracer; + constructor( /** The source of proving jobs */ private broker: ProvingJobConsumer, @@ -41,8 +43,9 @@ export class ProvingAgent { private pollIntervalMs = 1000, private log = createLogger('prover-client:proving-agent'), ) { + this.tracer = client.getTracer('ProvingAgent'); this.instrumentation = new ProvingAgentInstrumentation(client); - this.runningPromise = new RunningPromise(this.safeWork, this.pollIntervalMs); + this.runningPromise = new RunningPromise(this.work.bind(this), this.log, this.pollIntervalMs); } public setCircuitProver(circuitProver: ServerCircuitProver): void { @@ -63,76 +66,73 @@ export class ProvingAgent { await this.runningPromise.stop(); } - private safeWork = async () => { - try { - // every tick we need to - // (1) either do a heartbeat, telling the broker that we're working - // (2) get a new job - // If during (1) the broker returns a new job that means we can cancel the current job and start the new one - let maybeJob: { job: ProvingJob; time: number } | undefined; - if (this.currentJobController?.getStatus() === ProvingJobControllerStatus.PROVING) { - maybeJob = await this.broker.reportProvingJobProgress( - this.currentJobController.getJobId(), - this.currentJobController.getStartedAt(), - { allowList: this.proofAllowList }, - ); - } else { - maybeJob = await this.broker.getProvingJob({ allowList: this.proofAllowList }); - } - - if (!maybeJob) { - return; - } - - let abortedProofJobId: string | undefined; - let abortedProofName: string | undefined; - if (this.currentJobController?.getStatus() === ProvingJobControllerStatus.PROVING) { - abortedProofJobId = this.currentJobController.getJobId(); - abortedProofName = this.currentJobController.getProofTypeName(); - this.currentJobController?.abort(); - } - - const { job, time } = maybeJob; - let inputs: ProvingJobInputs; - try { - inputs = await this.proofStore.getProofInput(job.inputsUri); - } catch (err) { - await this.broker.reportProvingJobError(job.id, 'Failed to load proof inputs', true); - return; - } - - this.currentJobController = new ProvingJobController( - job.id, - inputs, - time, - this.circuitProver, - this.handleJobResult, + @trackSpan('ProvingAgent.safeWork') + private async work() { + // every tick we need to + // (1) either do a heartbeat, telling the broker that we're working + // (2) get a new job + // If during (1) the broker returns a new job that means we can cancel the current job and start the new one + let maybeJob: { job: ProvingJob; time: number } | undefined; + if (this.currentJobController?.getStatus() === ProvingJobControllerStatus.PROVING) { + maybeJob = await this.broker.reportProvingJobProgress( + this.currentJobController.getJobId(), + this.currentJobController.getStartedAt(), + { allowList: this.proofAllowList }, ); + } else { + maybeJob = await this.broker.getProvingJob({ allowList: this.proofAllowList }); + } + + if (!maybeJob) { + return; + } - if (abortedProofJobId) { - this.log.info( - `Aborting job id=${abortedProofJobId} type=${abortedProofName} to start new job id=${this.currentJobController.getJobId()} type=${this.currentJobController.getProofTypeName()} inputsUri=${truncateString( - job.inputsUri, - )}`, - ); - } else { - this.log.info( - `Starting job id=${this.currentJobController.getJobId()} type=${this.currentJobController.getProofTypeName()} inputsUri=${truncateString( - job.inputsUri, - )}`, - ); - } - - if (this.idleTimer) { - this.instrumentation.recordIdleTime(this.idleTimer); - } - this.idleTimer = undefined; - - this.currentJobController.start(); + let abortedProofJobId: string | undefined; + let abortedProofName: string | undefined; + if (this.currentJobController?.getStatus() === ProvingJobControllerStatus.PROVING) { + abortedProofJobId = this.currentJobController.getJobId(); + abortedProofName = this.currentJobController.getProofTypeName(); + this.currentJobController?.abort(); + } + + const { job, time } = maybeJob; + let inputs: ProvingJobInputs; + try { + inputs = await this.proofStore.getProofInput(job.inputsUri); } catch (err) { - this.log.error(`Error in ProvingAgent: ${String(err)}`); + await this.broker.reportProvingJobError(job.id, 'Failed to load proof inputs', true); + return; } - }; + + this.currentJobController = new ProvingJobController( + job.id, + inputs, + time, + this.circuitProver, + this.handleJobResult, + ); + + if (abortedProofJobId) { + this.log.info( + `Aborting job id=${abortedProofJobId} type=${abortedProofName} to start new job id=${this.currentJobController.getJobId()} type=${this.currentJobController.getProofTypeName()} inputsUri=${truncateString( + job.inputsUri, + )}`, + ); + } else { + this.log.info( + `Starting job id=${this.currentJobController.getJobId()} type=${this.currentJobController.getProofTypeName()} inputsUri=${truncateString( + job.inputsUri, + )}`, + ); + } + + if (this.idleTimer) { + this.instrumentation.recordIdleTime(this.idleTimer); + } + this.idleTimer = undefined; + + this.currentJobController.start(); + } handleJobResult = async ( jobId: ProvingJobId, diff --git a/yarn-project/prover-client/src/proving_broker/proving_broker.ts b/yarn-project/prover-client/src/proving_broker/proving_broker.ts index 3487f83e7e8..b23b05a5573 100644 --- a/yarn-project/prover-client/src/proving_broker/proving_broker.ts +++ b/yarn-project/prover-client/src/proving_broker/proving_broker.ts @@ -14,7 +14,7 @@ import { createLogger } from '@aztec/foundation/log'; import { type PromiseWithResolvers, RunningPromise, promiseWithResolvers } from '@aztec/foundation/promise'; import { PriorityMemoryQueue } from '@aztec/foundation/queue'; import { Timer } from '@aztec/foundation/timer'; -import { type TelemetryClient } from '@aztec/telemetry-client'; +import { type TelemetryClient, type Traceable, type Tracer, trackSpan } from '@aztec/telemetry-client'; import assert from 'assert'; @@ -41,7 +41,7 @@ type EnqueuedProvingJob = Pick; * A broker that manages proof requests and distributes them to workers based on their priority. * It takes a backend that is responsible for storing and retrieving proof requests and results. */ -export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer { +export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer, Traceable { private queues: ProvingQueues = { [ProvingRequestType.PUBLIC_VM]: new PriorityMemoryQueue(provingJobComparator), [ProvingRequestType.TUBE_PROOF]: new PriorityMemoryQueue(provingJobComparator), @@ -87,6 +87,7 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer { private maxRetries: number; private instrumentation: ProvingBrokerInstrumentation; + public readonly tracer: Tracer; private maxParallelCleanUps: number; @@ -115,8 +116,9 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer { }: ProofRequestBrokerConfig = {}, private logger = createLogger('prover-client:proving-broker'), ) { + this.tracer = client.getTracer('ProvingBroker'); this.instrumentation = new ProvingBrokerInstrumentation(client); - this.cleanupPromise = new RunningPromise(this.cleanupPass, timeoutIntervalMs); + this.cleanupPromise = new RunningPromise(this.cleanupPass.bind(this), this.logger, timeoutIntervalMs); this.jobTimeoutMs = jobTimeoutMs; this.maxRetries = maxRetries; this.maxEpochsToKeepResultsFor = maxEpochsToKeepResultsFor; @@ -399,10 +401,11 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer { this.instrumentation.incResolvedJobs(item.type); } - private cleanupPass = async () => { + @trackSpan('ProvingBroker.cleanupPass') + private async cleanupPass() { await this.cleanupStaleJobs(); await this.reEnqueueExpiredJobs(); - }; + } private async cleanupStaleJobs() { const jobIds = Array.from(this.jobsCache.keys()); diff --git a/yarn-project/prover-node/src/factory.ts b/yarn-project/prover-node/src/factory.ts index 57429290be7..7a4ec700721 100644 --- a/yarn-project/prover-node/src/factory.ts +++ b/yarn-project/prover-node/src/factory.ts @@ -71,8 +71,8 @@ export async function createProverNode( maxParallelBlocksPerEpoch: config.proverNodeMaxParallelBlocksPerEpoch, }; - const claimsMonitor = new ClaimsMonitor(publisher, proverNodeConfig); - const epochMonitor = new EpochMonitor(archiver, proverNodeConfig); + const claimsMonitor = new ClaimsMonitor(publisher, telemetry, proverNodeConfig); + const epochMonitor = new EpochMonitor(archiver, telemetry, proverNodeConfig); const rollupContract = publisher.getRollupContract(); const walletClient = publisher.getClient(); diff --git a/yarn-project/prover-node/src/monitors/claims-monitor.test.ts b/yarn-project/prover-node/src/monitors/claims-monitor.test.ts index 610284e3caa..0771faaf161 100644 --- a/yarn-project/prover-node/src/monitors/claims-monitor.test.ts +++ b/yarn-project/prover-node/src/monitors/claims-monitor.test.ts @@ -2,6 +2,7 @@ import { type EpochProofClaim } from '@aztec/circuit-types'; import { EthAddress } from '@aztec/circuits.js'; import { sleep } from '@aztec/foundation/sleep'; import { type L1Publisher } from '@aztec/sequencer-client'; +import { NoopTelemetryClient } from '@aztec/telemetry-client/noop'; import { type MockProxy, mock } from 'jest-mock-extended'; @@ -21,7 +22,7 @@ describe('ClaimsMonitor', () => { publisherAddress = EthAddress.random(); l1Publisher.getSenderAddress.mockReturnValue(publisherAddress); - claimsMonitor = new ClaimsMonitor(l1Publisher, { pollingIntervalMs: 10 }); + claimsMonitor = new ClaimsMonitor(l1Publisher, new NoopTelemetryClient(), { pollingIntervalMs: 10 }); }); afterEach(async () => { diff --git a/yarn-project/prover-node/src/monitors/claims-monitor.ts b/yarn-project/prover-node/src/monitors/claims-monitor.ts index 8e2781f86c0..9aecb95b639 100644 --- a/yarn-project/prover-node/src/monitors/claims-monitor.ts +++ b/yarn-project/prover-node/src/monitors/claims-monitor.ts @@ -3,20 +3,28 @@ import { type EthAddress } from '@aztec/circuits.js'; import { createLogger } from '@aztec/foundation/log'; import { RunningPromise } from '@aztec/foundation/running-promise'; import { type L1Publisher } from '@aztec/sequencer-client'; +import { type TelemetryClient, type Traceable, type Tracer, trackSpan } from '@aztec/telemetry-client'; export interface ClaimsMonitorHandler { handleClaim(proofClaim: EpochProofClaim): Promise; } -export class ClaimsMonitor { +export class ClaimsMonitor implements Traceable { private runningPromise: RunningPromise; private log = createLogger('prover-node:claims-monitor'); private handler: ClaimsMonitorHandler | undefined; private lastClaimEpochNumber: bigint | undefined; - constructor(private readonly l1Publisher: L1Publisher, private options: { pollingIntervalMs: number }) { - this.runningPromise = new RunningPromise(this.work.bind(this), this.options.pollingIntervalMs); + public readonly tracer: Tracer; + + constructor( + private readonly l1Publisher: L1Publisher, + telemetry: TelemetryClient, + private options: { pollingIntervalMs: number }, + ) { + this.tracer = telemetry.getTracer('ClaimsMonitor'); + this.runningPromise = new RunningPromise(this.work.bind(this), this.log, this.options.pollingIntervalMs); } public start(handler: ClaimsMonitorHandler) { @@ -31,9 +39,11 @@ export class ClaimsMonitor { this.log.info('Stopped ClaimsMonitor'); } + @trackSpan('ClaimsMonitor.work') public async work() { const proofClaim = await this.l1Publisher.getProofClaim(); if (!proofClaim) { + this.log.trace(`Found no proof claim`); return; } diff --git a/yarn-project/prover-node/src/monitors/epoch-monitor.test.ts b/yarn-project/prover-node/src/monitors/epoch-monitor.test.ts index f7a0d7dc406..033cc74d1b3 100644 --- a/yarn-project/prover-node/src/monitors/epoch-monitor.test.ts +++ b/yarn-project/prover-node/src/monitors/epoch-monitor.test.ts @@ -1,5 +1,6 @@ import { type L2BlockSource } from '@aztec/circuit-types'; import { sleep } from '@aztec/foundation/sleep'; +import { NoopTelemetryClient } from '@aztec/telemetry-client/noop'; import { type MockProxy, mock } from 'jest-mock-extended'; @@ -20,7 +21,7 @@ describe('EpochMonitor', () => { }, }); - epochMonitor = new EpochMonitor(l2BlockSource, { pollingIntervalMs: 10 }); + epochMonitor = new EpochMonitor(l2BlockSource, new NoopTelemetryClient(), { pollingIntervalMs: 10 }); }); afterEach(async () => { diff --git a/yarn-project/prover-node/src/monitors/epoch-monitor.ts b/yarn-project/prover-node/src/monitors/epoch-monitor.ts index 332923ddbab..503429cf853 100644 --- a/yarn-project/prover-node/src/monitors/epoch-monitor.ts +++ b/yarn-project/prover-node/src/monitors/epoch-monitor.ts @@ -1,22 +1,29 @@ import { type L2BlockSource } from '@aztec/circuit-types'; import { createLogger } from '@aztec/foundation/log'; import { RunningPromise } from '@aztec/foundation/running-promise'; +import { type TelemetryClient, type Traceable, type Tracer, trackSpan } from '@aztec/telemetry-client'; export interface EpochMonitorHandler { handleInitialEpochSync(epochNumber: bigint): Promise; handleEpochCompleted(epochNumber: bigint): Promise; } -export class EpochMonitor { +export class EpochMonitor implements Traceable { private runningPromise: RunningPromise; private log = createLogger('prover-node:epoch-monitor'); + public readonly tracer: Tracer; private handler: EpochMonitorHandler | undefined; private latestEpochNumber: bigint | undefined; - constructor(private readonly l2BlockSource: L2BlockSource, private options: { pollingIntervalMs: number }) { - this.runningPromise = new RunningPromise(this.work.bind(this), this.options.pollingIntervalMs); + constructor( + private readonly l2BlockSource: L2BlockSource, + telemetry: TelemetryClient, + private options: { pollingIntervalMs: number }, + ) { + this.tracer = telemetry.getTracer('EpochMonitor'); + this.runningPromise = new RunningPromise(this.work.bind(this), this.log, this.options.pollingIntervalMs); } public start(handler: EpochMonitorHandler) { @@ -30,6 +37,7 @@ export class EpochMonitor { this.log.info('Stopped EpochMonitor'); } + @trackSpan('EpochMonitor.work') public async work() { if (!this.latestEpochNumber) { const epochNumber = await this.l2BlockSource.getL2EpochNumber(); diff --git a/yarn-project/prover-node/src/prover-node.test.ts b/yarn-project/prover-node/src/prover-node.test.ts index 98714c2e1d0..a674c6f4068 100644 --- a/yarn-project/prover-node/src/prover-node.test.ts +++ b/yarn-project/prover-node/src/prover-node.test.ts @@ -249,8 +249,9 @@ describe('prover-node', () => { let lastEpochComplete: bigint = 0n; beforeEach(() => { - claimsMonitor = new ClaimsMonitor(publisher, config); - epochMonitor = new EpochMonitor(l2BlockSource, config); + const telemetry = new NoopTelemetryClient(); + claimsMonitor = new ClaimsMonitor(publisher, telemetry, config); + epochMonitor = new EpochMonitor(l2BlockSource, telemetry, config); l2BlockSource.isEpochComplete.mockImplementation(epochNumber => Promise.resolve(epochNumber <= lastEpochComplete), diff --git a/yarn-project/sequencer-client/src/sequencer/sequencer.ts b/yarn-project/sequencer-client/src/sequencer/sequencer.ts index d13515124d3..4db869b72d3 100644 --- a/yarn-project/sequencer-client/src/sequencer/sequencer.ts +++ b/yarn-project/sequencer-client/src/sequencer/sequencer.ts @@ -192,7 +192,7 @@ export class Sequencer { * Starts the sequencer and moves to IDLE state. */ public start() { - this.runningPromise = new RunningPromise(this.work.bind(this), this.pollingIntervalMs); + this.runningPromise = new RunningPromise(this.work.bind(this), this.log, this.pollingIntervalMs); this.setState(SequencerState.IDLE, 0n, true /** force */); this.runningPromise.start(); this.log.info(`Sequencer started with address ${this.publisher.getSenderAddress().toString()}`); @@ -339,6 +339,7 @@ export class Sequencer { this.setState(SequencerState.IDLE, 0n); } + @trackSpan('Sequencer.work') protected async work() { try { await this.doRealWork(); diff --git a/yarn-project/simulator/src/public/public_processor.ts b/yarn-project/simulator/src/public/public_processor.ts index 0462bb66239..83f0e0a8fcb 100644 --- a/yarn-project/simulator/src/public/public_processor.ts +++ b/yarn-project/simulator/src/public/public_processor.ts @@ -26,7 +26,7 @@ import { padArrayEnd } from '@aztec/foundation/collection'; import { createLogger } from '@aztec/foundation/log'; import { Timer } from '@aztec/foundation/timer'; import { ContractClassRegisteredEvent, ProtocolContractAddress } from '@aztec/protocol-contracts'; -import { Attributes, type TelemetryClient, type Tracer, trackSpan } from '@aztec/telemetry-client'; +import { Attributes, type TelemetryClient, type Traceable, type Tracer, trackSpan } from '@aztec/telemetry-client'; import { computeFeePayerBalanceLeafSlot, computeFeePayerBalanceStorageSlot } from './fee_payment.js'; import { WorldStateDB } from './public_db_sources.js'; @@ -76,7 +76,7 @@ export class PublicProcessorFactory { * Converts Txs lifted from the P2P module into ProcessedTx objects by executing * any public function calls in them. Txs with private calls only are unaffected. */ -export class PublicProcessor { +export class PublicProcessor implements Traceable { private metrics: PublicProcessorMetrics; constructor( protected db: MerkleTreeWriteOperations, @@ -118,80 +118,9 @@ export class PublicProcessor { break; } try { - const [processedTx, returnValues] = !tx.hasPublicCalls() - ? await this.processPrivateOnlyTx(tx) - : await this.processTxWithPublicCalls(tx); - - this.log.verbose( - !tx.hasPublicCalls() - ? `Processed tx ${processedTx.hash} with no public calls` - : `Processed tx ${processedTx.hash} with ${tx.enqueuedPublicFunctionCalls.length} public calls`, - { - txHash: processedTx.hash, - txFee: processedTx.txEffect.transactionFee.toBigInt(), - revertCode: processedTx.txEffect.revertCode.getCode(), - revertReason: processedTx.revertReason, - gasUsed: processedTx.gasUsed, - publicDataWriteCount: processedTx.txEffect.publicDataWrites.length, - nullifierCount: processedTx.txEffect.nullifiers.length, - noteHashCount: processedTx.txEffect.noteHashes.length, - contractClassLogCount: processedTx.txEffect.contractClassLogs.getTotalLogCount(), - unencryptedLogCount: processedTx.txEffect.unencryptedLogs.getTotalLogCount(), - privateLogCount: processedTx.txEffect.privateLogs.length, - l2ToL1MessageCount: processedTx.txEffect.l2ToL1Msgs.length, - }, - ); - - // Commit the state updates from this transaction - await this.worldStateDB.commit(); - - // Re-validate the transaction - if (txValidator) { - // Only accept processed transactions that are not double-spends, - // public functions emitting nullifiers would pass earlier check but fail here. - // Note that we're checking all nullifiers generated in the private execution twice, - // we could store the ones already checked and skip them here as an optimization. - const [_, invalid] = await txValidator.validateTxs([processedTx]); - if (invalid.length) { - throw new Error(`Transaction ${invalid[0].hash} invalid after processing public functions`); - } - } - // if we were given a handler then send the transaction to it for block building or proving - if (processedTxHandler) { - await processedTxHandler.addNewTx(processedTx); - } - // Update the state so that the next tx in the loop has the correct .startState - // NB: before this change, all .startStates were actually incorrect, but the issue was never caught because we either: - // a) had only 1 tx with public calls per block, so this loop had len 1 - // b) always had a txHandler with the same db passed to it as this.db, which updated the db in buildBaseRollupHints in this loop - // To see how this ^ happens, move back to one shared db in test_context and run orchestrator_multi_public_functions.test.ts - // The below is taken from buildBaseRollupHints: - await this.db.appendLeaves( - MerkleTreeId.NOTE_HASH_TREE, - padArrayEnd(processedTx.txEffect.noteHashes, Fr.ZERO, MAX_NOTE_HASHES_PER_TX), - ); - try { - await this.db.batchInsert( - MerkleTreeId.NULLIFIER_TREE, - padArrayEnd(processedTx.txEffect.nullifiers, Fr.ZERO, MAX_NULLIFIERS_PER_TX).map(n => n.toBuffer()), - NULLIFIER_SUBTREE_HEIGHT, - ); - } catch (error) { - if (txValidator) { - // Ideally the validator has already caught this above, but just in case: - throw new Error(`Transaction ${processedTx.hash} invalid after processing public functions`); - } else { - // We have no validator and assume this call should blindly process txs with duplicates being caught later - this.log.warn(`Detected duplicate nullifier after public processing for: ${processedTx.hash}.`); - } - } - - await this.db.sequentialInsert( - MerkleTreeId.PUBLIC_DATA_TREE, - processedTx.txEffect.publicDataWrites.map(x => x.toBuffer()), - ); + const [processedTx, returnValues] = await this.processTx(tx, processedTxHandler, txValidator); result.push(processedTx); - returns = returns.concat(returnValues ?? []); + returns = returns.concat(returnValues); } catch (err: any) { const errorMessage = err instanceof Error ? err.message : 'Unknown error'; this.log.warn(`Failed to process tx ${tx.getTxHash()}: ${errorMessage} ${err?.stack}`); @@ -207,6 +136,88 @@ export class PublicProcessor { return [result, failed, returns]; } + @trackSpan('PublicProcessor.processTx', tx => ({ [Attributes.TX_HASH]: tx.tryGetTxHash()?.toString() })) + private async processTx( + tx: Tx, + processedTxHandler?: ProcessedTxHandler, + txValidator?: TxValidator, + ): Promise<[ProcessedTx, NestedProcessReturnValues[]]> { + const [processedTx, returnValues] = !tx.hasPublicCalls() + ? await this.processPrivateOnlyTx(tx) + : await this.processTxWithPublicCalls(tx); + + this.log.verbose( + !tx.hasPublicCalls() + ? `Processed tx ${processedTx.hash} with no public calls` + : `Processed tx ${processedTx.hash} with ${tx.enqueuedPublicFunctionCalls.length} public calls`, + { + txHash: processedTx.hash, + txFee: processedTx.txEffect.transactionFee.toBigInt(), + revertCode: processedTx.txEffect.revertCode.getCode(), + revertReason: processedTx.revertReason, + gasUsed: processedTx.gasUsed, + publicDataWriteCount: processedTx.txEffect.publicDataWrites.length, + nullifierCount: processedTx.txEffect.nullifiers.length, + noteHashCount: processedTx.txEffect.noteHashes.length, + contractClassLogCount: processedTx.txEffect.contractClassLogs.getTotalLogCount(), + unencryptedLogCount: processedTx.txEffect.unencryptedLogs.getTotalLogCount(), + privateLogCount: processedTx.txEffect.privateLogs.length, + l2ToL1MessageCount: processedTx.txEffect.l2ToL1Msgs.length, + }, + ); + + // Commit the state updates from this transaction + await this.worldStateDB.commit(); + + // Re-validate the transaction + if (txValidator) { + // Only accept processed transactions that are not double-spends, + // public functions emitting nullifiers would pass earlier check but fail here. + // Note that we're checking all nullifiers generated in the private execution twice, + // we could store the ones already checked and skip them here as an optimization. + const [_, invalid] = await txValidator.validateTxs([processedTx]); + if (invalid.length) { + throw new Error(`Transaction ${invalid[0].hash} invalid after processing public functions`); + } + } + // if we were given a handler then send the transaction to it for block building or proving + if (processedTxHandler) { + await processedTxHandler.addNewTx(processedTx); + } + // Update the state so that the next tx in the loop has the correct .startState + // NB: before this change, all .startStates were actually incorrect, but the issue was never caught because we either: + // a) had only 1 tx with public calls per block, so this loop had len 1 + // b) always had a txHandler with the same db passed to it as this.db, which updated the db in buildBaseRollupHints in this loop + // To see how this ^ happens, move back to one shared db in test_context and run orchestrator_multi_public_functions.test.ts + // The below is taken from buildBaseRollupHints: + await this.db.appendLeaves( + MerkleTreeId.NOTE_HASH_TREE, + padArrayEnd(processedTx.txEffect.noteHashes, Fr.ZERO, MAX_NOTE_HASHES_PER_TX), + ); + try { + await this.db.batchInsert( + MerkleTreeId.NULLIFIER_TREE, + padArrayEnd(processedTx.txEffect.nullifiers, Fr.ZERO, MAX_NULLIFIERS_PER_TX).map(n => n.toBuffer()), + NULLIFIER_SUBTREE_HEIGHT, + ); + } catch (error) { + if (txValidator) { + // Ideally the validator has already caught this above, but just in case: + throw new Error(`Transaction ${processedTx.hash} invalid after processing public functions`); + } else { + // We have no validator and assume this call should blindly process txs with duplicates being caught later + this.log.warn(`Detected duplicate nullifier after public processing for: ${processedTx.hash}.`); + } + } + + await this.db.sequentialInsert( + MerkleTreeId.PUBLIC_DATA_TREE, + processedTx.txEffect.publicDataWrites.map(x => x.toBuffer()), + ); + + return [processedTx, returnValues ?? []]; + } + /** * Creates the public data write for paying the tx fee. * This is used in private only txs, since for txs with public calls diff --git a/yarn-project/telemetry-client/package.json b/yarn-project/telemetry-client/package.json index 15ef986d543..fccd13f4fa3 100644 --- a/yarn-project/telemetry-client/package.json +++ b/yarn-project/telemetry-client/package.json @@ -27,6 +27,7 @@ "!*.test.*" ], "dependencies": { + "@aztec/circuit-types": "workspace:^", "@aztec/foundation": "workspace:^", "@opentelemetry/api": "^1.9.0", "@opentelemetry/api-logs": "^0.55.0", diff --git a/yarn-project/telemetry-client/src/attributes.ts b/yarn-project/telemetry-client/src/attributes.ts index 1fcfdbcf926..8ca97e5ba9b 100644 --- a/yarn-project/telemetry-client/src/attributes.ts +++ b/yarn-project/telemetry-client/src/attributes.ts @@ -72,6 +72,8 @@ export const L1_SENDER = 'aztec.l1.sender'; export const TX_PHASE_NAME = 'aztec.tx.phase_name'; /** The proving job type */ export const PROVING_JOB_TYPE = 'aztec.proving.job_type'; +/** The proving job id */ +export const PROVING_JOB_ID = 'aztec.proving.job_id'; export const MERKLE_TREE_NAME = 'aztec.merkle_tree.name'; /** The prover-id in a root rollup proof. */ @@ -89,6 +91,9 @@ export const TARGET_ADDRESS = 'aztec.address.target'; export const SENDER_ADDRESS = 'aztec.address.sender'; export const MANA_USED = 'aztec.mana.used'; +/** Whether a sync process is the initial run, which is usually slower than iterative ones. */ +export const INITIAL_SYNC = 'aztec.initial_sync'; + /** Identifier for the tables in a world state DB */ export const WS_DB_DATA_TYPE = 'aztec.world_state.db_type'; diff --git a/yarn-project/telemetry-client/src/index.ts b/yarn-project/telemetry-client/src/index.ts index ce7d17939bf..acd6b5363b4 100644 --- a/yarn-project/telemetry-client/src/index.ts +++ b/yarn-project/telemetry-client/src/index.ts @@ -3,3 +3,4 @@ export * from './histogram_utils.js'; export * from './with_tracer.js'; export * from './prom_otel_adapter.js'; export * from './lmdb_metrics.js'; +export * from './wrappers/index.js'; diff --git a/yarn-project/telemetry-client/src/wrappers/index.ts b/yarn-project/telemetry-client/src/wrappers/index.ts new file mode 100644 index 00000000000..3a6efb1f42e --- /dev/null +++ b/yarn-project/telemetry-client/src/wrappers/index.ts @@ -0,0 +1 @@ +export * from './l2_block_stream.js'; diff --git a/yarn-project/telemetry-client/src/wrappers/l2_block_stream.ts b/yarn-project/telemetry-client/src/wrappers/l2_block_stream.ts new file mode 100644 index 00000000000..e09cce9df36 --- /dev/null +++ b/yarn-project/telemetry-client/src/wrappers/l2_block_stream.ts @@ -0,0 +1,37 @@ +import { + type L2BlockSource, + L2BlockStream, + type L2BlockStreamEventHandler, + type L2BlockStreamLocalDataProvider, +} from '@aztec/circuit-types'; +import { createLogger } from '@aztec/foundation/log'; +import { type Traceable, type Tracer, trackSpan } from '@aztec/telemetry-client'; + +/** Extends an L2BlockStream with a tracer to create a new trace per iteration. */ +export class TraceableL2BlockStream extends L2BlockStream implements Traceable { + constructor( + l2BlockSource: Pick, + localData: L2BlockStreamLocalDataProvider, + handler: L2BlockStreamEventHandler, + public readonly tracer: Tracer, + private readonly name: string = 'L2BlockStream', + log = createLogger('types:block_stream'), + opts: { + proven?: boolean; + pollIntervalMS?: number; + batchSize?: number; + startingBlock?: number; + } = {}, + ) { + super(l2BlockSource, localData, handler, log, opts); + } + + // We need to use a non-arrow function to be able to access `this` + // See https://www.typescriptlang.org/docs/handbook/2/functions.html#declaring-this-in-a-function + @trackSpan(function () { + return `${this!.name}.work`; + }) + override work() { + return super.work(); + } +} diff --git a/yarn-project/telemetry-client/tsconfig.json b/yarn-project/telemetry-client/tsconfig.json index 63f8ab3e9f7..904b812a155 100644 --- a/yarn-project/telemetry-client/tsconfig.json +++ b/yarn-project/telemetry-client/tsconfig.json @@ -6,6 +6,9 @@ "tsBuildInfoFile": ".tsbuildinfo" }, "references": [ + { + "path": "../circuit-types" + }, { "path": "../foundation" } diff --git a/yarn-project/validator-client/src/validator.ts b/yarn-project/validator-client/src/validator.ts index 6e2890334f8..42e673058b9 100644 --- a/yarn-project/validator-client/src/validator.ts +++ b/yarn-project/validator-client/src/validator.ts @@ -86,9 +86,15 @@ export class ValidatorClient extends WithTracer implements Validator { this.validationService = new ValidationService(keyStore); // Refresh epoch cache every second to trigger commiteeChanged event - this.epochCacheUpdateLoop = new RunningPromise(async () => { - await this.epochCache.getCommittee().catch(err => log.error('Error updating validator committee', err)); - }, 1000); + this.epochCacheUpdateLoop = new RunningPromise( + () => + this.epochCache + .getCommittee() + .then(() => {}) + .catch(err => log.error('Error updating validator committee', err)), + log, + 1000, + ); // Listen to commiteeChanged event to alert operator when their validator has entered the committee this.epochCache.on('committeeChanged', (newCommittee, epochNumber) => { diff --git a/yarn-project/world-state/src/synchronizer/instrumentation.ts b/yarn-project/world-state/src/synchronizer/instrumentation.ts index 93fc76823ce..a57ae79fe86 100644 --- a/yarn-project/world-state/src/synchronizer/instrumentation.ts +++ b/yarn-project/world-state/src/synchronizer/instrumentation.ts @@ -15,7 +15,7 @@ export class WorldStateInstrumentation { private dbNumItems: Gauge; private dbUsedSize: Gauge; - constructor(telemetry: TelemetryClient, private log = createLogger('world-state:instrumentation')) { + constructor(public readonly telemetry: TelemetryClient, private log = createLogger('world-state:instrumentation')) { const meter = telemetry.getMeter('World State'); this.dbMapSize = meter.createGauge(`aztec.world_state.db_map_size`, { description: `The current configured map size for each merkle tree`, diff --git a/yarn-project/world-state/src/synchronizer/server_world_state_synchronizer.ts b/yarn-project/world-state/src/synchronizer/server_world_state_synchronizer.ts index 901dcb96229..c43ce6e8bde 100644 --- a/yarn-project/world-state/src/synchronizer/server_world_state_synchronizer.ts +++ b/yarn-project/world-state/src/synchronizer/server_world_state_synchronizer.ts @@ -3,7 +3,7 @@ import { type L2Block, type L2BlockId, type L2BlockSource, - L2BlockStream, + type L2BlockStream, type L2BlockStreamEvent, type L2BlockStreamEventHandler, type L2BlockStreamLocalDataProvider, @@ -23,7 +23,7 @@ import { createLogger } from '@aztec/foundation/log'; import { promiseWithResolvers } from '@aztec/foundation/promise'; import { elapsed } from '@aztec/foundation/timer'; import { SHA256Trunc } from '@aztec/merkle-tree'; -import { type TelemetryClient } from '@aztec/telemetry-client'; +import { type TelemetryClient, TraceableL2BlockStream } from '@aztec/telemetry-client'; import { type WorldStateStatusFull } from '../native/message.js'; import { type MerkleTreeAdminDatabase } from '../world-state-db/merkle_tree_db.js'; @@ -110,8 +110,10 @@ export class ServerWorldStateSynchronizer return this.syncPromise.promise; } - protected createBlockStream() { - return new L2BlockStream(this.l2BlockSource, this, this, createLogger('world_state:block_stream'), { + protected createBlockStream(): L2BlockStream { + const tracer = this.instrumentation.telemetry.getTracer('WorldStateL2BlockStream'); + const logger = createLogger('world-state:block_stream'); + return new TraceableL2BlockStream(this.l2BlockSource, this, this, tracer, 'WorldStateL2BlockStream', logger, { proven: this.config.worldStateProvenBlocksOnly, pollIntervalMS: this.config.worldStateBlockCheckIntervalMS, batchSize: this.config.worldStateBlockRequestBatchSize, diff --git a/yarn-project/yarn.lock b/yarn-project/yarn.lock index 9a83f7b8135..2d5cab8dbf8 100644 --- a/yarn-project/yarn.lock +++ b/yarn-project/yarn.lock @@ -343,6 +343,7 @@ __metadata: "@aztec/foundation": "workspace:^" "@aztec/noir-contracts.js": "workspace:^" "@aztec/protocol-contracts": "workspace:^" + "@aztec/telemetry-client": "workspace:^" "@aztec/types": "workspace:^" "@jest/globals": ^29.5.0 "@types/jest": ^29.5.0 @@ -1237,6 +1238,7 @@ __metadata: version: 0.0.0-use.local resolution: "@aztec/telemetry-client@workspace:telemetry-client" dependencies: + "@aztec/circuit-types": "workspace:^" "@aztec/foundation": "workspace:^" "@jest/globals": ^29.5.0 "@opentelemetry/api": ^1.9.0