Skip to content

Commit

Permalink
feat: Kickoff tube circuits at the beginning of proving job (#11139)
Browse files Browse the repository at this point in the history
Adds the option in the orchestrator to kickoff tube circuits before
going through processed txs, allowing to start proving tubes without AVM
simulation being completed.

Fixes #10998
  • Loading branch information
spalladino authored Jan 10, 2025
1 parent b609f2e commit 85d389f
Show file tree
Hide file tree
Showing 7 changed files with 109 additions and 16 deletions.
7 changes: 7 additions & 0 deletions yarn-project/circuit-types/src/interfaces/epoch-prover.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { type BlockHeader, type Fr, type Proof } from '@aztec/circuits.js';
import { type RootRollupPublicInputs } from '@aztec/circuits.js/rollup';

import { type L2Block } from '../l2_block.js';
import { type Tx } from '../tx/tx.js';
import { type BlockBuilder } from './block-builder.js';

/** Coordinates the proving of an entire epoch. */
Expand All @@ -14,6 +15,12 @@ export interface EpochProver extends Omit<BlockBuilder, 'setBlockCompleted'> {
**/
startNewEpoch(epochNumber: number, firstBlockNumber: number, totalNumBlocks: number): void;

/**
* Kickstarts tube circuits for the specified txs. These will be used during epoch proving.
* Note that if the tube circuits are not started this way, they will be started nontheless after processing.
*/
startTubeCircuits(txs: Tx[]): void;

/** Pads the block with empty txs if it hasn't reached the declared number of txs. */
setBlockCompleted(blockNumber: number, expectedBlockHeader?: BlockHeader): Promise<L2Block>;

Expand Down
4 changes: 4 additions & 0 deletions yarn-project/circuits.js/src/structs/client_ivc_proof.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ export class ClientIvcProof {
return new ClientIvcProof(Buffer.from(''), Buffer.from(''));
}

static fake(fill = Math.floor(Math.random() * 255)) {
return new ClientIvcProof(Buffer.alloc(1, fill), Buffer.alloc(1, fill));
}

static get schema() {
return bufferSchemaFor(ClientIvcProof);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
import { type MerkleTreeId, type PublicInputsAndRecursiveProof } from '@aztec/circuit-types';
import {
type MerkleTreeId,
type ProofAndVerificationKey,
type PublicInputsAndRecursiveProof,
} from '@aztec/circuit-types';
import {
ARCHIVE_HEIGHT,
AppendOnlyTreeSnapshot,
Expand All @@ -9,6 +13,7 @@ import {
MembershipWitness,
type NESTED_RECURSIVE_ROLLUP_HONK_PROOF_LENGTH,
NUMBER_OF_L1_L2_MESSAGES_PER_ROLLUP,
type TUBE_PROOF_LENGTH,
VK_TREE_HEIGHT,
} from '@aztec/circuits.js';
import {
Expand Down Expand Up @@ -56,6 +61,9 @@ export class EpochProvingState {
private rootRollupProvingOutput: PublicInputsAndRecursiveProof<RootRollupPublicInputs> | undefined;
private provingStateLifecycle = PROVING_STATE_LIFECYCLE.PROVING_STATE_CREATED;

// Map from tx hash to tube proof promise. Used when kickstarting tube proofs before tx processing.
public readonly cachedTubeProofs = new Map<string, Promise<ProofAndVerificationKey<typeof TUBE_PROOF_LENGTH>>>();

public blocks: (BlockProvingState | undefined)[] = [];

constructor(
Expand Down
70 changes: 57 additions & 13 deletions yarn-project/prover-client/src/orchestrator/orchestrator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ import {
MerkleTreeId,
type ProcessedTx,
type ServerCircuitProver,
type Tx,
toNumBlobFields,
} from '@aztec/circuit-types';
import {
type EpochProver,
type ForkMerkleTreeOperations,
type MerkleTreeWriteOperations,
type ProofAndVerificationKey,
} from '@aztec/circuit-types/interfaces';
import { type CircuitName } from '@aztec/circuit-types/stats';
import {
Expand All @@ -26,6 +28,7 @@ import {
NUM_BASE_PARITY_PER_ROOT_PARITY,
PartialStateReference,
StateReference,
type TUBE_PROOF_LENGTH,
VerificationKeyData,
makeEmptyRecursiveProof,
} from '@aztec/circuits.js';
Expand All @@ -34,6 +37,7 @@ import {
EmptyBlockRootRollupInputs,
PrivateBaseRollupInputs,
SingleTxBlockRootRollupInputs,
TubeInputs,
} from '@aztec/circuits.js/rollup';
import { makeTuple } from '@aztec/foundation/array';
import { padArrayEnd } from '@aztec/foundation/collection';
Expand Down Expand Up @@ -267,7 +271,7 @@ export class ProvingOrchestrator implements EpochProver {
const [hints, treeSnapshots] = await this.prepareTransaction(tx, provingState);
const txProvingState = new TxProvingState(tx, hints, treeSnapshots);
const txIndex = provingState.addNewTx(txProvingState);
this.enqueueTube(provingState, txIndex);
this.getOrEnqueueTube(provingState, txIndex);
if (txProvingState.requireAvmProof) {
logger.debug(`Enqueueing public VM for tx ${txIndex}`);
this.enqueueVM(provingState, txIndex);
Expand All @@ -280,6 +284,25 @@ export class ProvingOrchestrator implements EpochProver {
}
}

/**
* Kickstarts tube circuits for the specified txs. These will be used during epoch proving.
* Note that if the tube circuits are not started this way, they will be started nontheless after processing.
*/
@trackSpan('ProvingOrchestrator.startTubeCircuits')
public startTubeCircuits(txs: Tx[]) {
if (!this.provingState?.verifyState()) {
throw new Error(`Invalid proving state, call startNewEpoch before starting tube circuits`);
}
for (const tx of txs) {
const txHash = tx.getTxHash().toString();
const tubeInputs = new TubeInputs(tx.clientIvcProof);
const tubeProof = promiseWithResolvers<ProofAndVerificationKey<typeof TUBE_PROOF_LENGTH>>();
logger.debug(`Starting tube circuit for tx ${txHash}`);
this.doEnqueueTube(txHash, tubeInputs, proof => tubeProof.resolve(proof));
this.provingState?.cachedTubeProofs.set(txHash, tubeProof.promise);
}
}

/**
* Marks the block as completed.
* Computes the block header and updates the archive tree.
Expand Down Expand Up @@ -567,37 +590,58 @@ export class ProvingOrchestrator implements EpochProver {
);
}

// Enqueues the tube circuit for a given transaction index
// Enqueues the tube circuit for a given transaction index, or reuses the one already enqueued
// Once completed, will enqueue the next circuit, either a public kernel or the base rollup
private enqueueTube(provingState: BlockProvingState, txIndex: number) {
private getOrEnqueueTube(provingState: BlockProvingState, txIndex: number) {
if (!provingState.verifyState()) {
logger.debug('Not running tube circuit, state invalid');
return;
}

const txProvingState = provingState.getTxProvingState(txIndex);
const txHash = txProvingState.processedTx.hash.toString();

const handleResult = (result: ProofAndVerificationKey<typeof TUBE_PROOF_LENGTH>) => {
logger.debug(`Got tube proof for tx index: ${txIndex}`, { txHash });
txProvingState.setTubeProof(result);
this.provingState?.cachedTubeProofs.delete(txHash);
this.checkAndEnqueueNextTxCircuit(provingState, txIndex);
};

if (this.provingState?.cachedTubeProofs.has(txHash)) {
logger.debug(`Tube proof already enqueued for tx index: ${txIndex}`, { txHash });
void this.provingState!.cachedTubeProofs.get(txHash)!.then(handleResult);
return;
}

logger.debug(`Enqueuing tube circuit for tx index: ${txIndex}`);
this.doEnqueueTube(txHash, txProvingState.getTubeInputs(), handleResult);
}

private doEnqueueTube(
txHash: string,
inputs: TubeInputs,
handler: (result: ProofAndVerificationKey<typeof TUBE_PROOF_LENGTH>) => void,
provingState: EpochProvingState | BlockProvingState = this.provingState!,
) {
if (!provingState?.verifyState()) {
logger.debug('Not running tube circuit, state invalid');
return;
}

this.deferredProving(
provingState,
wrapCallbackInSpan(
this.tracer,
'ProvingOrchestrator.prover.getTubeProof',
{
[Attributes.TX_HASH]: txProvingState.processedTx.hash.toString(),
[Attributes.TX_HASH]: txHash,
[Attributes.PROTOCOL_CIRCUIT_TYPE]: 'server',
[Attributes.PROTOCOL_CIRCUIT_NAME]: 'tube-circuit' satisfies CircuitName,
},
signal => {
const inputs = txProvingState.getTubeInputs();
return this.prover.getTubeProof(inputs, signal, provingState.epochNumber);
},
signal => this.prover.getTubeProof(inputs, signal, this.provingState!.epochNumber),
),
result => {
logger.debug(`Completed tube proof for tx index: ${txIndex}`);
txProvingState.setTubeProof(result);
this.checkAndEnqueueNextTxCircuit(provingState, txIndex);
},
handler,
);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import {
type PublicInputsAndRecursiveProof,
type ServerCircuitProver,
type Tx,
makePublicInputsAndRecursiveProof,
} from '@aztec/circuit-types';
import {
ClientIvcProof,
Fr,
type GlobalVariables,
NESTED_RECURSIVE_PROOF_LENGTH,
Expand All @@ -18,6 +20,7 @@ import { promiseWithResolvers } from '@aztec/foundation/promise';
import { sleep } from '@aztec/foundation/sleep';
import { ProtocolCircuitVks } from '@aztec/noir-protocol-circuits-types/vks';

import { jest } from '@jest/globals';
import { type MockProxy, mock } from 'jest-mock-extended';

import { TestContext } from '../mocks/test_context.js';
Expand Down Expand Up @@ -98,9 +101,11 @@ describe('prover/orchestrator', () => {
});

describe('with simulated prover', () => {
let prover: ServerCircuitProver;

beforeEach(async () => {
context = await TestContext.new(logger);
({ orchestrator, globalVariables } = context);
({ prover, orchestrator, globalVariables } = context);
});

it('waits for block to be completed before enqueueing block root proof', async () => {
Expand All @@ -119,6 +124,27 @@ describe('prover/orchestrator', () => {
const result = await orchestrator.finaliseEpoch();
expect(result.proof).toBeDefined();
});

it('can start tube proofs before adding processed txs', async () => {
const getTubeSpy = jest.spyOn(prover, 'getTubeProof');
orchestrator.startNewEpoch(1, 1, 1);
const processedTxs = [context.makeProcessedTx(1), context.makeProcessedTx(2)];
processedTxs.forEach((tx, i) => (tx.clientIvcProof = ClientIvcProof.fake(i + 1)));
const txs = processedTxs.map(tx => ({ getTxHash: () => tx.hash, clientIvcProof: tx.clientIvcProof } as Tx));
orchestrator.startTubeCircuits(txs);

await sleep(100);
expect(getTubeSpy).toHaveBeenCalledTimes(2);
getTubeSpy.mockReset();

await orchestrator.startNewBlock(globalVariables, []);
await context.setEndTreeRoots(processedTxs);
await orchestrator.addTxs(processedTxs);
await orchestrator.setBlockCompleted(context.blockNumber);
const result = await orchestrator.finaliseEpoch();
expect(result.proof).toBeDefined();
expect(getTubeSpy).toHaveBeenCalledTimes(0);
});
});
});
});
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { type EpochProver, type L2Block, type ProcessedTx } from '@aztec/circuit-types';
import { type EpochProver, type L2Block, type ProcessedTx, type Tx } from '@aztec/circuit-types';
import { type BlockHeader, type Fr, type GlobalVariables, type Proof } from '@aztec/circuits.js';
import { type RootRollupPublicInputs } from '@aztec/circuits.js/rollup';

Expand All @@ -13,6 +13,9 @@ export class ServerEpochProver implements EpochProver {
this.orchestrator.startNewEpoch(epochNumber, firstBlockNumber, totalNumBlocks);
this.facade.start();
}
startTubeCircuits(txs: Tx[]): void {
this.orchestrator.startTubeCircuits(txs);
}
setBlockCompleted(blockNumber: number, expectedBlockHeader?: BlockHeader): Promise<L2Block> {
return this.orchestrator.setBlockCompleted(blockNumber, expectedBlockHeader);
}
Expand Down
1 change: 1 addition & 0 deletions yarn-project/prover-node/src/job/epoch-proving-job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ export class EpochProvingJob implements Traceable {

try {
this.prover.startNewEpoch(epochNumber, fromBlock, epochSizeBlocks);
this.prover.startTubeCircuits(this.txs);

await asyncPool(this.config.parallelBlockLimit, this.blocks, async block => {
this.checkState();
Expand Down

0 comments on commit 85d389f

Please sign in to comment.