From ecc037f31fc2a5a02484762fdf90302059b34502 Mon Sep 17 00:00:00 2001 From: Alex Gherghisan Date: Wed, 11 Dec 2024 20:31:30 +0000 Subject: [PATCH] fix: memory leak in the broker (#10567) This PR fixes a memory leak in the prover broker by cleaning up jobs after their result is saved by the orchestrator. The orchestrator then does clean up on its own after the epoch is finished. --- .../src/interfaces/prover-broker.ts | 10 +- .../src/interfaces/proving-job-source.test.ts | 1 + .../src/interfaces/proving-job.ts | 2 +- .../src/prover-agent/memory-proving-queue.ts | 3 +- .../caching_broker_facade.test.ts | 54 ++- .../proving_broker/caching_broker_facade.ts | 42 ++- .../src/proving_broker/proving_agent.test.ts | 2 +- .../src/proving_broker/proving_broker.test.ts | 346 ++++++++++++------ .../src/proving_broker/proving_broker.ts | 187 +++++++--- .../prover-client/src/proving_broker/rpc.ts | 3 +- .../prover-client/src/test/mock_prover.ts | 7 +- 11 files changed, 483 insertions(+), 174 deletions(-) diff --git a/yarn-project/circuit-types/src/interfaces/prover-broker.ts b/yarn-project/circuit-types/src/interfaces/prover-broker.ts index 5f11be3347e..fb2fdadef67 100644 --- a/yarn-project/circuit-types/src/interfaces/prover-broker.ts +++ b/yarn-project/circuit-types/src/interfaces/prover-broker.ts @@ -56,10 +56,16 @@ export interface ProvingJobProducer { enqueueProvingJob(job: ProvingJob): Promise; /** - * Cancels a proving job and clears all of its + * Cancels a proving job. * @param id - The ID of the job to cancel */ - removeAndCancelProvingJob(id: ProvingJobId): Promise; + cancelProvingJob(id: ProvingJobId): Promise; + + /** + * Cleans up after a job has completed. Throws if the job is in-progress + * @param id - The ID of the job to cancel + */ + cleanUpProvingJobState(id: ProvingJobId): Promise; /** * Returns the current status fof the proving job diff --git a/yarn-project/circuit-types/src/interfaces/proving-job-source.test.ts b/yarn-project/circuit-types/src/interfaces/proving-job-source.test.ts index 57b7d2192be..e372b6bef03 100644 --- a/yarn-project/circuit-types/src/interfaces/proving-job-source.test.ts +++ b/yarn-project/circuit-types/src/interfaces/proving-job-source.test.ts @@ -70,6 +70,7 @@ class MockProvingJobSource implements ProvingJobSource { id: 'a-job-id', type: ProvingRequestType.PRIVATE_BASE_ROLLUP, inputsUri: 'inputs-uri' as ProofUri, + epochNumber: 1, }); } heartbeat(jobId: string): Promise { diff --git a/yarn-project/circuit-types/src/interfaces/proving-job.ts b/yarn-project/circuit-types/src/interfaces/proving-job.ts index f2013799dac..8c2b44b98a0 100644 --- a/yarn-project/circuit-types/src/interfaces/proving-job.ts +++ b/yarn-project/circuit-types/src/interfaces/proving-job.ts @@ -241,7 +241,7 @@ export type ProvingJobId = z.infer; export const ProvingJob = z.object({ id: ProvingJobId, type: z.nativeEnum(ProvingRequestType), - blockNumber: z.number().optional(), + epochNumber: z.number(), inputsUri: ProofUri, }); diff --git a/yarn-project/prover-client/src/prover-agent/memory-proving-queue.ts b/yarn-project/prover-client/src/prover-agent/memory-proving-queue.ts index c98aed86da3..dd7b05a21cf 100644 --- a/yarn-project/prover-client/src/prover-agent/memory-proving-queue.ts +++ b/yarn-project/prover-client/src/prover-agent/memory-proving-queue.ts @@ -120,6 +120,7 @@ export class MemoryProvingQueue implements ServerCircuitProver, ProvingJobSource id: job.id, type: job.type, inputsUri: job.inputsUri, + epochNumber: job.epochNumber, }; } catch (err) { if (err instanceof 
TimeoutError) { @@ -244,7 +245,7 @@ export class MemoryProvingQueue implements ServerCircuitProver, ProvingJobSource reject, attempts: 1, heartbeat: 0, - epochNumber, + epochNumber: epochNumber ?? 0, }; if (signal) { diff --git a/yarn-project/prover-client/src/proving_broker/caching_broker_facade.test.ts b/yarn-project/prover-client/src/proving_broker/caching_broker_facade.test.ts index f4782e092ac..a72918ade09 100644 --- a/yarn-project/prover-client/src/proving_broker/caching_broker_facade.test.ts +++ b/yarn-project/prover-client/src/proving_broker/caching_broker_facade.test.ts @@ -24,7 +24,8 @@ describe('CachingBrokerFacade', () => { broker = mock({ enqueueProvingJob: jest.fn(), getProvingJobStatus: jest.fn(), - removeAndCancelProvingJob: jest.fn(), + cancelProvingJob: jest.fn(), + cleanUpProvingJobState: jest.fn(), waitForJobToSettle: jest.fn(), }); cache = new InMemoryProverCache(); @@ -101,4 +102,55 @@ describe('CachingBrokerFacade', () => { await expect(facade.getBaseParityProof(inputs)).resolves.toEqual(result); expect(broker.enqueueProvingJob).toHaveBeenCalledTimes(1); // job was only ever enqueued once }); + + it('clears broker state after a job resolves', async () => { + const { promise, resolve } = promiseWithResolvers(); + broker.enqueueProvingJob.mockResolvedValue(Promise.resolve()); + broker.waitForJobToSettle.mockResolvedValue(promise); + + const inputs = makeBaseParityInputs(); + void facade.getBaseParityProof(inputs); + await jest.advanceTimersToNextTimerAsync(); + + const job = broker.enqueueProvingJob.mock.calls[0][0]; + const result = makePublicInputsAndRecursiveProof( + makeParityPublicInputs(), + makeRecursiveProof(RECURSIVE_PROOF_LENGTH), + VerificationKeyData.makeFakeHonk(), + ); + const outputUri = await proofStore.saveProofOutput(job.id, ProvingRequestType.BASE_PARITY, result); + resolve({ + status: 'fulfilled', + value: outputUri, + }); + + await jest.advanceTimersToNextTimerAsync(); + expect(broker.cleanUpProvingJobState).toHaveBeenCalled(); + }); + + it('clears broker state after a job is canceled', async () => { + const { promise, resolve } = promiseWithResolvers(); + const catchSpy = jest.fn(); + broker.enqueueProvingJob.mockResolvedValue(Promise.resolve()); + broker.waitForJobToSettle.mockResolvedValue(promise); + + const inputs = makeBaseParityInputs(); + const controller = new AbortController(); + void facade.getBaseParityProof(inputs, controller.signal).catch(catchSpy); + await jest.advanceTimersToNextTimerAsync(); + + expect(broker.cancelProvingJob).not.toHaveBeenCalled(); + controller.abort(); + await jest.advanceTimersToNextTimerAsync(); + expect(broker.cancelProvingJob).toHaveBeenCalled(); + + resolve({ + status: 'rejected', + reason: 'Aborted', + }); + + await jest.advanceTimersToNextTimerAsync(); + expect(broker.cleanUpProvingJobState).toHaveBeenCalled(); + expect(catchSpy).toHaveBeenCalledWith(new Error('Aborted')); + }); }); diff --git a/yarn-project/prover-client/src/proving_broker/caching_broker_facade.ts b/yarn-project/prover-client/src/proving_broker/caching_broker_facade.ts index a2ead87ecaa..82c59e7ea0f 100644 --- a/yarn-project/prover-client/src/proving_broker/caching_broker_facade.ts +++ b/yarn-project/prover-client/src/proving_broker/caching_broker_facade.ts @@ -59,6 +59,7 @@ export class CachingBrokerFacade implements ServerCircuitProver { id: ProvingJobId, type: T, inputs: ProvingJobInputsMap[T], + epochNumber = 0, signal?: AbortSignal, ): Promise { // first try the cache @@ -95,6 +96,7 @@ export class CachingBrokerFacade implements 
ServerCircuitProver { id, type, inputsUri, + epochNumber, }); await this.cache.setProvingJobStatus(id, { status: 'in-queue' }); } catch (err) { @@ -107,7 +109,7 @@ export class CachingBrokerFacade implements ServerCircuitProver { // notify broker of cancelled job const abortFn = async () => { signal?.removeEventListener('abort', abortFn); - await this.broker.removeAndCancelProvingJob(id); + await this.broker.cancelProvingJob(id); }; signal?.addEventListener('abort', abortFn); @@ -147,18 +149,21 @@ export class CachingBrokerFacade implements ServerCircuitProver { } } finally { signal?.removeEventListener('abort', abortFn); + // we've saved the result in our cache. We can tell the broker to clear its state + await this.broker.cleanUpProvingJobState(id); } } getAvmProof( inputs: AvmCircuitInputs, signal?: AbortSignal, - _blockNumber?: number, + epochNumber?: number, ): Promise> { return this.enqueueAndWaitForJob( this.generateId(ProvingRequestType.PUBLIC_VM, inputs), ProvingRequestType.PUBLIC_VM, inputs, + epochNumber, signal, ); } @@ -166,12 +171,13 @@ export class CachingBrokerFacade implements ServerCircuitProver { getBaseParityProof( inputs: BaseParityInputs, signal?: AbortSignal, - _epochNumber?: number, + epochNumber?: number, ): Promise> { return this.enqueueAndWaitForJob( this.generateId(ProvingRequestType.BASE_PARITY, inputs), ProvingRequestType.BASE_PARITY, inputs, + epochNumber, signal, ); } @@ -179,12 +185,13 @@ export class CachingBrokerFacade implements ServerCircuitProver { getBlockMergeRollupProof( input: BlockMergeRollupInputs, signal?: AbortSignal, - _epochNumber?: number, + epochNumber?: number, ): Promise> { return this.enqueueAndWaitForJob( this.generateId(ProvingRequestType.BLOCK_MERGE_ROLLUP, input), ProvingRequestType.BLOCK_MERGE_ROLLUP, input, + epochNumber, signal, ); } @@ -192,12 +199,13 @@ export class CachingBrokerFacade implements ServerCircuitProver { getBlockRootRollupProof( input: BlockRootRollupInputs, signal?: AbortSignal, - _epochNumber?: number, + epochNumber?: number, ): Promise> { return this.enqueueAndWaitForJob( this.generateId(ProvingRequestType.BLOCK_ROOT_ROLLUP, input), ProvingRequestType.BLOCK_ROOT_ROLLUP, input, + epochNumber, signal, ); } @@ -205,12 +213,13 @@ export class CachingBrokerFacade implements ServerCircuitProver { getEmptyBlockRootRollupProof( input: EmptyBlockRootRollupInputs, signal?: AbortSignal, - _epochNumber?: number, + epochNumber?: number, ): Promise> { return this.enqueueAndWaitForJob( this.generateId(ProvingRequestType.EMPTY_BLOCK_ROOT_ROLLUP, input), ProvingRequestType.EMPTY_BLOCK_ROOT_ROLLUP, input, + epochNumber, signal, ); } @@ -218,12 +227,13 @@ export class CachingBrokerFacade implements ServerCircuitProver { getEmptyPrivateKernelProof( inputs: PrivateKernelEmptyInputData, signal?: AbortSignal, - _epochNumber?: number, + epochNumber?: number, ): Promise> { return this.enqueueAndWaitForJob( this.generateId(ProvingRequestType.PRIVATE_KERNEL_EMPTY, inputs), ProvingRequestType.PRIVATE_KERNEL_EMPTY, inputs, + epochNumber, signal, ); } @@ -231,24 +241,26 @@ export class CachingBrokerFacade implements ServerCircuitProver { getMergeRollupProof( input: MergeRollupInputs, signal?: AbortSignal, - _epochNumber?: number, + epochNumber?: number, ): Promise> { return this.enqueueAndWaitForJob( this.generateId(ProvingRequestType.MERGE_ROLLUP, input), ProvingRequestType.MERGE_ROLLUP, input, + epochNumber, signal, ); } getPrivateBaseRollupProof( baseRollupInput: PrivateBaseRollupInputs, signal?: AbortSignal, - _epochNumber?: number, + 
epochNumber?: number, ): Promise> { return this.enqueueAndWaitForJob( this.generateId(ProvingRequestType.PRIVATE_BASE_ROLLUP, baseRollupInput), ProvingRequestType.PRIVATE_BASE_ROLLUP, baseRollupInput, + epochNumber, signal, ); } @@ -256,12 +268,13 @@ export class CachingBrokerFacade implements ServerCircuitProver { getPublicBaseRollupProof( inputs: PublicBaseRollupInputs, signal?: AbortSignal, - _epochNumber?: number, + epochNumber?: number, ): Promise> { return this.enqueueAndWaitForJob( this.generateId(ProvingRequestType.PUBLIC_BASE_ROLLUP, inputs), ProvingRequestType.PUBLIC_BASE_ROLLUP, inputs, + epochNumber, signal, ); } @@ -269,12 +282,13 @@ export class CachingBrokerFacade implements ServerCircuitProver { getRootParityProof( inputs: RootParityInputs, signal?: AbortSignal, - _epochNumber?: number, + epochNumber?: number, ): Promise> { return this.enqueueAndWaitForJob( this.generateId(ProvingRequestType.ROOT_PARITY, inputs), ProvingRequestType.ROOT_PARITY, inputs, + epochNumber, signal, ); } @@ -282,12 +296,13 @@ export class CachingBrokerFacade implements ServerCircuitProver { getRootRollupProof( input: RootRollupInputs, signal?: AbortSignal, - _epochNumber?: number, + epochNumber?: number, ): Promise> { return this.enqueueAndWaitForJob( this.generateId(ProvingRequestType.ROOT_ROLLUP, input), ProvingRequestType.ROOT_ROLLUP, input, + epochNumber, signal, ); } @@ -295,12 +310,13 @@ export class CachingBrokerFacade implements ServerCircuitProver { getTubeProof( tubeInput: TubeInputs, signal?: AbortSignal, - _epochNumber?: number, + epochNumber?: number, ): Promise> { return this.enqueueAndWaitForJob( this.generateId(ProvingRequestType.TUBE_PROOF, tubeInput), ProvingRequestType.TUBE_PROOF, tubeInput, + epochNumber, signal, ); } diff --git a/yarn-project/prover-client/src/proving_broker/proving_agent.test.ts b/yarn-project/prover-client/src/proving_broker/proving_agent.test.ts index 5a33598a31d..405d16d5c1e 100644 --- a/yarn-project/prover-client/src/proving_broker/proving_agent.test.ts +++ b/yarn-project/prover-client/src/proving_broker/proving_agent.test.ts @@ -239,7 +239,7 @@ describe('ProvingAgent', () => { const inputs: ProvingJobInputs = { type: ProvingRequestType.BASE_PARITY, inputs: makeBaseParityInputs() }; const job: ProvingJob = { id: randomBytes(8).toString('hex') as ProvingJobId, - blockNumber: 1, + epochNumber: 1, type: ProvingRequestType.BASE_PARITY, inputsUri: randomBytes(8).toString('hex') as ProofUri, }; diff --git a/yarn-project/prover-client/src/proving_broker/proving_broker.test.ts b/yarn-project/prover-client/src/proving_broker/proving_broker.test.ts index 304d30a3b37..454e840543f 100644 --- a/yarn-project/prover-client/src/proving_broker/proving_broker.test.ts +++ b/yarn-project/prover-client/src/proving_broker/proving_broker.test.ts @@ -1,5 +1,6 @@ import { type ProofUri, type ProvingJob, type ProvingJobId, ProvingRequestType } from '@aztec/circuit-types'; import { randomBytes } from '@aztec/foundation/crypto'; +import { sleep } from '@aztec/foundation/sleep'; import { openTmpStore } from '@aztec/kv-store/lmdb'; import { NoopTelemetryClient } from '@aztec/telemetry-client/noop'; @@ -10,10 +11,6 @@ import { type ProvingBrokerDatabase } from './proving_broker_database.js'; import { InMemoryBrokerDatabase } from './proving_broker_database/memory.js'; import { KVBrokerDatabase } from './proving_broker_database/persisted.js'; -beforeAll(() => { - jest.useFakeTimers(); -}); - describe.each([ () => ({ database: new InMemoryBrokerDatabase(), cleanup: undefined }), () => { 
@@ -24,21 +21,23 @@ describe.each([ }, ])('ProvingBroker', createDb => { let broker: ProvingBroker; + let brokerIntervalMs: number; let jobTimeoutMs: number; let maxRetries: number; let database: ProvingBrokerDatabase; let cleanup: undefined | (() => Promise | void); - const now = () => Math.floor(Date.now() / 1000); + const now = () => Date.now(); beforeEach(() => { - jobTimeoutMs = 10_000; + jobTimeoutMs = 100; maxRetries = 2; + brokerIntervalMs = jobTimeoutMs / 4; ({ database, cleanup } = createDb()); broker = new ProvingBroker(database, new NoopTelemetryClient(), { jobTimeoutMs, - timeoutIntervalMs: jobTimeoutMs / 4, + timeoutIntervalMs: brokerIntervalMs, maxRetries, }); }); @@ -62,7 +61,7 @@ describe.each([ const id = makeProvingJobId(); await broker.enqueueProvingJob({ id, - blockNumber: 1, + epochNumber: 1, type: ProvingRequestType.BASE_PARITY, inputsUri: makeInputsUri(), }); @@ -71,7 +70,7 @@ describe.each([ const id2 = makeProvingJobId(); await broker.enqueueProvingJob({ id: id2, - blockNumber: 1, + epochNumber: 1, type: ProvingRequestType.PRIVATE_BASE_ROLLUP, inputsUri: makeInputsUri(), }); @@ -82,7 +81,7 @@ describe.each([ const provingJob: ProvingJob = { id: makeProvingJobId(), type: ProvingRequestType.BASE_PARITY, - blockNumber: 1, + epochNumber: 1, inputsUri: makeInputsUri(), }; @@ -95,14 +94,14 @@ describe.each([ const id = makeProvingJobId(); await broker.enqueueProvingJob({ id, - blockNumber: 1, + epochNumber: 1, type: ProvingRequestType.BASE_PARITY, inputsUri: makeInputsUri(), }); await expect( broker.enqueueProvingJob({ id, - blockNumber: 1, + epochNumber: 1, type: ProvingRequestType.BASE_PARITY, inputsUri: makeInputsUri(), }), @@ -118,37 +117,36 @@ describe.each([ const id = makeProvingJobId(); await broker.enqueueProvingJob({ id, - blockNumber: 1, + epochNumber: 1, type: ProvingRequestType.BASE_PARITY, inputsUri: makeInputsUri(), }); - await expect(broker.getProvingJobStatus(id)).resolves.toEqual({ status: 'in-queue' }); - - await broker.removeAndCancelProvingJob(id); + await assertJobStatus(id, 'in-queue'); - await expect(broker.getProvingJobStatus(id)).resolves.toEqual({ status: 'not-found' }); + await broker.cancelProvingJob(id); + await assertJobStatus(id, 'rejected'); }); it('cancels jobs in-progress', async () => { const id = makeProvingJobId(); await broker.enqueueProvingJob({ id, - blockNumber: 1, + epochNumber: 1, type: ProvingRequestType.BASE_PARITY, inputsUri: makeInputsUri(), }); - await expect(broker.getProvingJobStatus(id)).resolves.toEqual({ status: 'in-queue' }); + await assertJobStatus(id, 'in-queue'); await broker.getProvingJob(); - await expect(broker.getProvingJobStatus(id)).resolves.toEqual({ status: 'in-progress' }); - await broker.removeAndCancelProvingJob(id); - await expect(broker.getProvingJobStatus(id)).resolves.toEqual({ status: 'not-found' }); + await assertJobStatus(id, 'in-progress'); + await broker.cancelProvingJob(id); + await assertJobStatus(id, 'rejected'); }); it('returns job result if successful', async () => { const provingJob: ProvingJob = { id: makeProvingJobId(), type: ProvingRequestType.BASE_PARITY, - blockNumber: 1, + epochNumber: 1, inputsUri: makeInputsUri(), }; @@ -164,7 +162,7 @@ describe.each([ const provingJob: ProvingJob = { id: makeProvingJobId(), type: ProvingRequestType.BASE_PARITY, - blockNumber: 1, + epochNumber: 1, inputsUri: makeInputsUri(), }; @@ -195,21 +193,21 @@ describe.each([ const provingJob1: ProvingJob = { id: makeProvingJobId(), type: ProvingRequestType.BASE_PARITY, - blockNumber: 1, + epochNumber: 1, 
inputsUri: makeInputsUri(), }; const provingJob2: ProvingJob = { id: makeProvingJobId(), type: ProvingRequestType.BASE_PARITY, - blockNumber: 2, + epochNumber: 2, inputsUri: makeInputsUri(), }; const provingJob3: ProvingJob = { id: makeProvingJobId(), type: ProvingRequestType.BASE_PARITY, - blockNumber: 3, + epochNumber: 3, inputsUri: makeInputsUri(), }; @@ -224,7 +222,7 @@ describe.each([ await broker.enqueueProvingJob({ id: makeProvingJobId(), type: ProvingRequestType.BASE_PARITY, - blockNumber: 1, + epochNumber: 1, inputsUri: makeInputsUri(), }); @@ -238,7 +236,7 @@ describe.each([ await broker.enqueueProvingJob({ id: baseParity1, type: ProvingRequestType.BASE_PARITY, - blockNumber: 1, + epochNumber: 1, inputsUri: makeInputsUri(), }); @@ -246,7 +244,7 @@ describe.each([ await broker.enqueueProvingJob({ id: baseRollup1, type: ProvingRequestType.PRIVATE_BASE_ROLLUP, - blockNumber: 1, + epochNumber: 1, inputsUri: makeInputsUri(), }); @@ -254,7 +252,7 @@ describe.each([ await broker.enqueueProvingJob({ id: baseRollup2, type: ProvingRequestType.PRIVATE_BASE_ROLLUP, - blockNumber: 2, + epochNumber: 2, inputsUri: makeInputsUri(), }); @@ -262,7 +260,7 @@ describe.each([ await broker.enqueueProvingJob({ id: rootParity1, type: ProvingRequestType.ROOT_PARITY, - blockNumber: 1, + epochNumber: 1, inputsUri: makeInputsUri(), }); @@ -274,7 +272,7 @@ describe.each([ await broker.enqueueProvingJob({ id: baseParity1, type: ProvingRequestType.BASE_PARITY, - blockNumber: 1, + epochNumber: 1, inputsUri: makeInputsUri(), }); @@ -282,7 +280,7 @@ describe.each([ await broker.enqueueProvingJob({ id: baseRollup1, type: ProvingRequestType.PRIVATE_BASE_ROLLUP, - blockNumber: 1, + epochNumber: 1, inputsUri: makeInputsUri(), }); @@ -290,7 +288,7 @@ describe.each([ await broker.enqueueProvingJob({ id: baseRollup2, type: ProvingRequestType.PRIVATE_BASE_ROLLUP, - blockNumber: 2, + epochNumber: 2, inputsUri: makeInputsUri(), }); @@ -298,7 +296,7 @@ describe.each([ await broker.enqueueProvingJob({ id: rootParity1, type: ProvingRequestType.ROOT_PARITY, - blockNumber: 1, + epochNumber: 1, inputsUri: makeInputsUri(), }); @@ -315,7 +313,7 @@ describe.each([ await broker.enqueueProvingJob({ id: baseParity1, type: ProvingRequestType.BASE_PARITY, - blockNumber: 1, + epochNumber: 1, inputsUri: makeInputsUri(), }); @@ -323,7 +321,7 @@ describe.each([ await broker.enqueueProvingJob({ id: baseRollup1, type: ProvingRequestType.PRIVATE_BASE_ROLLUP, - blockNumber: 1, + epochNumber: 1, inputsUri: makeInputsUri(), }); @@ -331,7 +329,7 @@ describe.each([ await broker.enqueueProvingJob({ id: baseRollup2, type: ProvingRequestType.PRIVATE_BASE_ROLLUP, - blockNumber: 2, + epochNumber: 2, inputsUri: makeInputsUri(), }); @@ -339,7 +337,7 @@ describe.each([ await broker.enqueueProvingJob({ id: rootParity1, type: ProvingRequestType.ROOT_PARITY, - blockNumber: 1, + epochNumber: 1, inputsUri: makeInputsUri(), }); @@ -351,19 +349,19 @@ describe.each([ await broker.enqueueProvingJob({ id, type: ProvingRequestType.BASE_PARITY, - blockNumber: 1, + epochNumber: 1, inputsUri: makeInputsUri(), }); await broker.getProvingJob(); await assertJobStatus(id, 'in-progress'); - await broker.removeAndCancelProvingJob(id); - await assertJobStatus(id, 'not-found'); + await broker.cancelProvingJob(id); + await assertJobStatus(id, 'rejected'); const id2 = makeProvingJobId(); await broker.enqueueProvingJob({ id: id2, type: ProvingRequestType.BASE_PARITY, - blockNumber: 1, + epochNumber: 1, inputsUri: makeInputsUri(), }); await expect( @@ -376,14 +374,14 @@ 
describe.each([ const job1: ProvingJob = { id: makeProvingJobId(), type: ProvingRequestType.BASE_PARITY, - blockNumber: 1, + epochNumber: 1, inputsUri: makeInputsUri(), }; const job2: ProvingJob = { id: makeProvingJobId(), type: ProvingRequestType.BASE_PARITY, - blockNumber: 2, + epochNumber: 2, inputsUri: makeInputsUri(), }; @@ -397,7 +395,7 @@ describe.each([ expect(firstAgentJob).toEqual(job1); await assertJobStatus(job1.id, 'in-progress'); - await jest.advanceTimersByTimeAsync(jobTimeoutMs / 2); + await sleep(jobTimeoutMs / 2); await expect( broker.reportProvingJobProgress(job1.id, firstAgentStartedAt, { allowList: [ProvingRequestType.BASE_PARITY], @@ -407,8 +405,8 @@ describe.each([ // restart the broker! await broker.stop(); - // fake some time passing while the broker restarts - await jest.advanceTimersByTimeAsync(10_000); + // time passes while the broker restarts + await sleep(10 * jobTimeoutMs); broker = new ProvingBroker(database, new NoopTelemetryClient()); await broker.start(); @@ -444,14 +442,14 @@ describe.each([ const job1: ProvingJob = { id: makeProvingJobId(), type: ProvingRequestType.BASE_PARITY, - blockNumber: 1, + epochNumber: 1, inputsUri: makeInputsUri(), }; const job2: ProvingJob = { id: makeProvingJobId(), type: ProvingRequestType.BASE_PARITY, - blockNumber: 2, + epochNumber: 2, inputsUri: makeInputsUri(), }; @@ -468,8 +466,8 @@ describe.each([ // restart the broker! await broker.stop(); - // fake some time passing while the broker restarts - await jest.advanceTimersByTimeAsync(10_000); + // time passes while the broker restarts + await sleep(10 * jobTimeoutMs); broker = new ProvingBroker(database, new NoopTelemetryClient()); await broker.start(); @@ -499,14 +497,14 @@ describe.each([ const job1: ProvingJob = { id: makeProvingJobId(), type: ProvingRequestType.BASE_PARITY, - blockNumber: 1, + epochNumber: 1, inputsUri: makeInputsUri(), }; const job2: ProvingJob = { id: makeProvingJobId(), type: ProvingRequestType.BASE_PARITY, - blockNumber: 2, + epochNumber: 2, inputsUri: makeInputsUri(), }; @@ -519,8 +517,8 @@ describe.each([ // restart the broker! 
await broker.stop(); - // fake some time passing while the broker restarts - await jest.advanceTimersByTimeAsync(100 * jobTimeoutMs); + // time passes while the broker restarts + await sleep(10 * jobTimeoutMs); broker = new ProvingBroker(database, new NoopTelemetryClient()); await broker.start(); @@ -545,13 +543,13 @@ describe.each([ await broker.enqueueProvingJob({ id: id1, type: ProvingRequestType.BASE_PARITY, - blockNumber: 1, + epochNumber: 1, inputsUri: makeInputsUri(), }); await broker.enqueueProvingJob({ id: id2, type: ProvingRequestType.BASE_PARITY, - blockNumber: 2, + epochNumber: 2, inputsUri: makeInputsUri(), }); @@ -572,13 +570,13 @@ describe.each([ await broker.enqueueProvingJob({ id: id1, type: ProvingRequestType.BASE_PARITY, - blockNumber: 1, + epochNumber: 1, inputsUri: makeInputsUri(), }); await broker.enqueueProvingJob({ id: id2, type: ProvingRequestType.BASE_PARITY, - blockNumber: 2, + epochNumber: 2, inputsUri: makeInputsUri(), }); @@ -618,7 +616,7 @@ describe.each([ await broker.enqueueProvingJob({ id, type: ProvingRequestType.BASE_PARITY, - blockNumber: 1, + epochNumber: 1, inputsUri: makeInputsUri(), }); @@ -632,7 +630,7 @@ describe.each([ await broker.enqueueProvingJob({ id, type: ProvingRequestType.BASE_PARITY, - blockNumber: 1, + epochNumber: 1, inputsUri: makeInputsUri(), }); @@ -641,18 +639,54 @@ describe.each([ await assertJobStatus(id, 'in-progress'); // advance time so job times out because of no heartbeats - await jest.advanceTimersByTimeAsync(jobTimeoutMs); + await sleep(jobTimeoutMs + brokerIntervalMs); // should be back in the queue now await assertJobStatus(id, 'in-queue'); }); + it('cancel stale jobs that time out', async () => { + const id = makeProvingJobId(); + await broker.enqueueProvingJob({ + id, + type: ProvingRequestType.BASE_PARITY, + epochNumber: 1, + inputsUri: makeInputsUri(), + }); + + await assertJobStatus(id, 'in-queue'); + await getAndAssertNextJobId(id); + await assertJobStatus(id, 'in-progress'); + + // advance time so job times out because of no heartbeats + await sleep(jobTimeoutMs + brokerIntervalMs); + + // should be back in the queue now + await assertJobStatus(id, 'in-queue'); + + // another agent picks it up + await getAndAssertNextJobId(id); + await assertJobStatus(id, 'in-progress'); + + // epoch has advances + await broker.enqueueProvingJob({ + id: makeProvingJobId(), + type: ProvingRequestType.BASE_PARITY, + epochNumber: 10, + inputsUri: makeInputsUri(), + }); + + // advance time again so job times out. 
This time it should be rejected + await sleep(jobTimeoutMs + brokerIntervalMs); + await assertJobStatus(id, 'rejected'); + }); + it('keeps the jobs in progress while it is alive', async () => { const id = makeProvingJobId(); await broker.enqueueProvingJob({ id, type: ProvingRequestType.BASE_PARITY, - blockNumber: 1, + epochNumber: 1, inputsUri: makeInputsUri(), }); @@ -662,7 +696,7 @@ describe.each([ await assertJobStatus(id, 'in-progress'); // advance the time slightly, not enough for the request to timeout - await jest.advanceTimersByTimeAsync(jobTimeoutMs / 2); + await sleep(jobTimeoutMs / 2); await assertJobStatus(id, 'in-progress'); @@ -670,13 +704,13 @@ describe.each([ await broker.reportProvingJobProgress(id, time); // advance the time again - await jest.advanceTimersByTimeAsync(jobTimeoutMs / 2); + await sleep(jobTimeoutMs / 2); // should still be our request to process await assertJobStatus(id, 'in-progress'); // advance the time again and lose the request - await jest.advanceTimersByTimeAsync(jobTimeoutMs); + await sleep(jobTimeoutMs); await assertJobStatus(id, 'in-queue'); }); }); @@ -686,27 +720,18 @@ describe.each([ const provingJob: ProvingJob = { id: makeProvingJobId(), type: ProvingRequestType.BASE_PARITY, - blockNumber: 1, + epochNumber: 1, inputsUri: makeInputsUri(), }; await broker.enqueueProvingJob(provingJob); - - await expect(broker.getProvingJobStatus(provingJob.id)).resolves.toEqual({ - status: 'in-queue', - }); + await assertJobStatus(provingJob.id, 'in-queue'); await expect(broker.getProvingJob()).resolves.toEqual({ job: provingJob, time: expect.any(Number) }); - - await expect(broker.getProvingJobStatus(provingJob.id)).resolves.toEqual({ - status: 'in-progress', - }); + await assertJobStatus(provingJob.id, 'in-progress'); await broker.reportProvingJobError(provingJob.id, 'test error', true); - - await expect(broker.getProvingJobStatus(provingJob.id)).resolves.toEqual({ - status: 'in-queue', - }); + await assertJobStatus(provingJob.id, 'in-queue'); }); it('retries up to a maximum number of times', async () => { @@ -714,7 +739,7 @@ describe.each([ await broker.enqueueProvingJob({ id, type: ProvingRequestType.BASE_PARITY, - blockNumber: 1, + epochNumber: 1, inputsUri: makeInputsUri(), }); @@ -736,7 +761,7 @@ describe.each([ await broker.enqueueProvingJob({ id, type: ProvingRequestType.BASE_PARITY, - blockNumber: 1, + epochNumber: 1, inputsUri: makeInputsUri(), }); @@ -748,6 +773,41 @@ describe.each([ reason: 'test error', }); }); + + it('does not retry if job is stale', async () => { + const id = makeProvingJobId(); + await broker.enqueueProvingJob({ + id, + type: ProvingRequestType.BASE_PARITY, + epochNumber: 1, + inputsUri: makeInputsUri(), + }); + + await getAndAssertNextJobId(id); + await assertJobStatus(id, 'in-progress'); + + await broker.reportProvingJobError(id, 'test error', true); + // gets retried once + await assertJobStatus(id, 'in-queue'); + + // pick up the job again + await getAndAssertNextJobId(id); + await assertJobStatus(id, 'in-progress'); + + // advance the epoch height + await broker.enqueueProvingJob({ + id: makeProvingJobId(), + type: ProvingRequestType.BASE_PARITY, + epochNumber: 3, + inputsUri: makeInputsUri(), + }); + + await broker.reportProvingJobError(id, 'test error', true); + await expect(broker.getProvingJobStatus(id)).resolves.toEqual({ + status: 'rejected', + reason: 'test error', + }); + }); }); describe('Database management', () => { @@ -761,7 +821,7 @@ describe.each([ await database.addProvingJob({ id: id1, type: 
ProvingRequestType.BASE_PARITY, - blockNumber: 1, + epochNumber: 1, inputsUri: makeInputsUri(), }); @@ -769,7 +829,7 @@ describe.each([ await database.addProvingJob({ id: id2, type: ProvingRequestType.PRIVATE_BASE_ROLLUP, - blockNumber: 2, + epochNumber: 2, inputsUri: makeInputsUri(), }); @@ -782,7 +842,7 @@ describe.each([ job: { id: id1, type: ProvingRequestType.BASE_PARITY, - blockNumber: 1, + epochNumber: 1, inputsUri: expect.any(String), }, time: expect.any(Number), @@ -792,7 +852,7 @@ describe.each([ job: { id: id2, type: ProvingRequestType.PRIVATE_BASE_ROLLUP, - blockNumber: 2, + epochNumber: 2, inputsUri: expect.any(String), }, time: expect.any(Number), @@ -812,7 +872,7 @@ describe.each([ await database.addProvingJob({ id: id1, type: ProvingRequestType.BASE_PARITY, - blockNumber: 1, + epochNumber: 1, inputsUri: makeInputsUri(), }); @@ -820,7 +880,7 @@ describe.each([ await database.addProvingJob({ id: id2, type: ProvingRequestType.PRIVATE_BASE_ROLLUP, - blockNumber: 2, + epochNumber: 2, inputsUri: makeInputsUri(), }); @@ -846,7 +906,7 @@ describe.each([ await database.addProvingJob({ id: id1, type: ProvingRequestType.BASE_PARITY, - blockNumber: 1, + epochNumber: 1, inputsUri: makeInputsUri(), }); await database.setProvingJobResult(id1, makeOutputsUri()); @@ -855,7 +915,7 @@ describe.each([ await database.addProvingJob({ id: id2, type: ProvingRequestType.PRIVATE_BASE_ROLLUP, - blockNumber: 2, + epochNumber: 2, inputsUri: makeInputsUri(), }); @@ -872,7 +932,7 @@ describe.each([ await database.addProvingJob({ id: id1, type: ProvingRequestType.BASE_PARITY, - blockNumber: 1, + epochNumber: 1, inputsUri: makeInputsUri(), }); await database.setProvingJobResult(id1, makeOutputsUri()); @@ -881,7 +941,7 @@ describe.each([ await database.addProvingJob({ id: id2, type: ProvingRequestType.PRIVATE_BASE_ROLLUP, - blockNumber: 2, + epochNumber: 2, inputsUri: makeInputsUri(), }); @@ -892,11 +952,15 @@ describe.each([ jest.spyOn(database, 'deleteProvingJobAndResult'); - await broker.removeAndCancelProvingJob(id1); - await broker.removeAndCancelProvingJob(id2); + await broker.cleanUpProvingJobState(id1); + await sleep(brokerIntervalMs); + expect(database.deleteProvingJobAndResult).toHaveBeenNthCalledWith(1, id1); - expect(database.deleteProvingJobAndResult).toHaveBeenCalledWith(id1); - expect(database.deleteProvingJobAndResult).toHaveBeenCalledWith(id2); + await broker.cancelProvingJob(id2); + await broker.cleanUpProvingJobState(id2); + await sleep(brokerIntervalMs); + + expect(database.deleteProvingJobAndResult).toHaveBeenNthCalledWith(2, id2); await expect(broker.getProvingJobStatus(id1)).resolves.toEqual({ status: 'not-found' }); await expect(broker.getProvingJobStatus(id2)).resolves.toEqual({ status: 'not-found' }); @@ -909,7 +973,7 @@ describe.each([ const job: ProvingJob = { id: makeProvingJobId(), type: ProvingRequestType.BASE_PARITY, - blockNumber: 1, + epochNumber: 1, inputsUri: makeInputsUri(), }; @@ -928,7 +992,7 @@ describe.each([ broker.enqueueProvingJob({ id, type: ProvingRequestType.BASE_PARITY, - blockNumber: 1, + epochNumber: 1, inputsUri: makeInputsUri(), }), ).rejects.toThrow(new Error('db error')); @@ -941,7 +1005,7 @@ describe.each([ const job: ProvingJob = { id: makeProvingJobId(), type: ProvingRequestType.BASE_PARITY, - blockNumber: 1, + epochNumber: 1, inputsUri: makeInputsUri(), }; jest.spyOn(database, 'setProvingJobResult'); @@ -960,7 +1024,7 @@ describe.each([ await broker.enqueueProvingJob({ id, type: ProvingRequestType.BASE_PARITY, - blockNumber: 1, + epochNumber: 1, 
inputsUri: makeInputsUri(), }); await expect(broker.reportProvingJobSuccess(id, makeOutputsUri())).rejects.toThrow(new Error('db error')); @@ -976,7 +1040,7 @@ describe.each([ await broker.enqueueProvingJob({ id, type: ProvingRequestType.BASE_PARITY, - blockNumber: 1, + epochNumber: 1, inputsUri: makeInputsUri(), }); @@ -993,7 +1057,7 @@ describe.each([ await broker.enqueueProvingJob({ id, type: ProvingRequestType.BASE_PARITY, - blockNumber: 1, + epochNumber: 1, inputsUri: makeInputsUri(), }); await expect(broker.reportProvingJobError(id, 'test error')).rejects.toThrow(new Error('db error')); @@ -1025,6 +1089,82 @@ describe.each([ expect(database.setProvingJobError).not.toHaveBeenCalled(); expect(database.addProvingJob).not.toHaveBeenCalled(); }); + + it('cleans up old jobs periodically', async () => { + await broker.start(); + jest.spyOn(database, 'deleteProvingJobAndResult'); + const id1 = 'epoch1' as ProvingJobId; // makeProvingJobId(); // epoch 1 + const id2 = 'epoch2' as ProvingJobId; //makeProvingJobId(); // 2 + const id3 = 'epoch3' as ProvingJobId; //makeProvingJobId(); // 3 + const id4 = 'epoch4' as ProvingJobId; //makeProvingJobId(); // 4 + const id5 = 'epoch5' as ProvingJobId; //makeProvingJobId(); // 4 + + await sleep(10); + await broker.enqueueProvingJob({ + id: id1, + epochNumber: 1, + type: ProvingRequestType.BASE_PARITY, + inputsUri: '' as ProofUri, + }); + await broker.reportProvingJobSuccess(id1, '' as ProofUri); + + await sleep(10); + await broker.enqueueProvingJob({ + id: id2, + epochNumber: 2, + type: ProvingRequestType.BASE_PARITY, + inputsUri: '' as ProofUri, + }); + await broker.reportProvingJobSuccess(id2, '' as ProofUri); + + // nothing got cleaned up yet. The broker first needs to advance to the next epoch + await sleep(brokerIntervalMs); + expect(database.deleteProvingJobAndResult).not.toHaveBeenCalled(); + + await sleep(10); + await broker.enqueueProvingJob({ + id: id3, + epochNumber: 3, + type: ProvingRequestType.BASE_PARITY, + inputsUri: '' as ProofUri, + }); + + // we got a job for epoch 3, we can clean up jobs from epoch 1 + await sleep(brokerIntervalMs); + expect(database.deleteProvingJobAndResult).toHaveBeenCalledWith(id1); + expect(database.deleteProvingJobAndResult).not.toHaveBeenCalledWith(id2); + + await sleep(10); + await broker.enqueueProvingJob({ + id: id4, + epochNumber: 4, + type: ProvingRequestType.BASE_PARITY, + inputsUri: '' as ProofUri, + }); + + // once we advance to epoch 4 we can clean up finished jobs for epoch 2 + await sleep(brokerIntervalMs); + expect(database.deleteProvingJobAndResult).toHaveBeenCalledWith(id2); + + await sleep(10); + await broker.enqueueProvingJob({ + id: id5, + epochNumber: 5, + type: ProvingRequestType.BASE_PARITY, + inputsUri: '' as ProofUri, + }); + + // advancing to epoch 5 does not automatically clean up unfinished jobs for epoch 3 + await sleep(brokerIntervalMs); + expect(database.deleteProvingJobAndResult).not.toHaveBeenCalledWith(id3); + + await broker.cancelProvingJob(id3); // now job 3 is settled (aborted) + await sleep(brokerIntervalMs); + expect(database.deleteProvingJobAndResult).toHaveBeenCalledWith(id3); // and we can clean it up + + await broker.cancelProvingJob(id4); + await broker.cancelProvingJob(id5); + }); }); async function assertJobStatus(id: ProvingJobId, status: string) { diff --git a/yarn-project/prover-client/src/proving_broker/proving_broker.ts b/yarn-project/prover-client/src/proving_broker/proving_broker.ts index dfa605838a8..3487f83e7e8 100644 --- 
a/yarn-project/prover-client/src/proving_broker/proving_broker.ts +++ b/yarn-project/prover-client/src/proving_broker/proving_broker.ts @@ -9,6 +9,7 @@ import { type ProvingJobStatus, ProvingRequestType, } from '@aztec/circuit-types'; +import { asyncPool } from '@aztec/foundation/async-pool'; import { createLogger } from '@aztec/foundation/log'; import { type PromiseWithResolvers, RunningPromise, promiseWithResolvers } from '@aztec/foundation/promise'; import { PriorityMemoryQueue } from '@aztec/foundation/queue'; @@ -30,29 +31,33 @@ type ProofRequestBrokerConfig = { timeoutIntervalMs?: number; jobTimeoutMs?: number; maxRetries?: number; + maxEpochsToKeepResultsFor?: number; + maxParallelCleanUps?: number; }; +type EnqueuedProvingJob = Pick; + /** * A broker that manages proof requests and distributes them to workers based on their priority. * It takes a backend that is responsible for storing and retrieving proof requests and results. */ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer { private queues: ProvingQueues = { - [ProvingRequestType.PUBLIC_VM]: new PriorityMemoryQueue(provingJobComparator), - [ProvingRequestType.TUBE_PROOF]: new PriorityMemoryQueue(provingJobComparator), - [ProvingRequestType.PRIVATE_KERNEL_EMPTY]: new PriorityMemoryQueue(provingJobComparator), + [ProvingRequestType.PUBLIC_VM]: new PriorityMemoryQueue(provingJobComparator), + [ProvingRequestType.TUBE_PROOF]: new PriorityMemoryQueue(provingJobComparator), + [ProvingRequestType.PRIVATE_KERNEL_EMPTY]: new PriorityMemoryQueue(provingJobComparator), - [ProvingRequestType.PRIVATE_BASE_ROLLUP]: new PriorityMemoryQueue(provingJobComparator), - [ProvingRequestType.PUBLIC_BASE_ROLLUP]: new PriorityMemoryQueue(provingJobComparator), - [ProvingRequestType.MERGE_ROLLUP]: new PriorityMemoryQueue(provingJobComparator), - [ProvingRequestType.ROOT_ROLLUP]: new PriorityMemoryQueue(provingJobComparator), + [ProvingRequestType.PRIVATE_BASE_ROLLUP]: new PriorityMemoryQueue(provingJobComparator), + [ProvingRequestType.PUBLIC_BASE_ROLLUP]: new PriorityMemoryQueue(provingJobComparator), + [ProvingRequestType.MERGE_ROLLUP]: new PriorityMemoryQueue(provingJobComparator), + [ProvingRequestType.ROOT_ROLLUP]: new PriorityMemoryQueue(provingJobComparator), - [ProvingRequestType.BLOCK_MERGE_ROLLUP]: new PriorityMemoryQueue(provingJobComparator), - [ProvingRequestType.BLOCK_ROOT_ROLLUP]: new PriorityMemoryQueue(provingJobComparator), - [ProvingRequestType.EMPTY_BLOCK_ROOT_ROLLUP]: new PriorityMemoryQueue(provingJobComparator), + [ProvingRequestType.BLOCK_MERGE_ROLLUP]: new PriorityMemoryQueue(provingJobComparator), + [ProvingRequestType.BLOCK_ROOT_ROLLUP]: new PriorityMemoryQueue(provingJobComparator), + [ProvingRequestType.EMPTY_BLOCK_ROOT_ROLLUP]: new PriorityMemoryQueue(provingJobComparator), - [ProvingRequestType.BASE_PARITY]: new PriorityMemoryQueue(provingJobComparator), - [ProvingRequestType.ROOT_PARITY]: new PriorityMemoryQueue(provingJobComparator), + [ProvingRequestType.BASE_PARITY]: new PriorityMemoryQueue(provingJobComparator), + [ProvingRequestType.ROOT_PARITY]: new PriorityMemoryQueue(provingJobComparator), }; // holds a copy of the database in memory in order to quickly fulfill requests @@ -76,23 +81,46 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer { // a map of promises that will be resolved when a job is settled private promises = new Map>(); - private timeoutPromise: RunningPromise; - private timeSource = () => Math.floor(Date.now() / 1000); + private 
cleanupPromise: RunningPromise; private msTimeSource = () => Date.now(); private jobTimeoutMs: number; private maxRetries: number; private instrumentation: ProvingBrokerInstrumentation; + private maxParallelCleanUps: number; + + /** + * The broker keeps track of the highest epoch it has seen. + * This information is used for garbage collection: once it reaches the next epoch, it can start pruning the database of old state. + * This clean-up pass is only done against _settled_ jobs. This pass will not cancel jobs that are in-progress or in-queue. + * It is the client's responsibility to cancel jobs if they are no longer necessary. + * Example: + * proving epoch 11 - the broker will wipe all settled jobs for epochs 9 and lower + * finished proving epoch 11 and got first job for epoch 12 -> the broker will wipe all settled jobs for epochs 10 and lower + * reorged back to end of epoch 10 -> epoch 11 is skipped and epoch 12 starts -> the broker will wipe all settled jobs for epochs 10 and lower + */ + private epochHeight = 0; + private maxEpochsToKeepResultsFor = 1; + public constructor( private database: ProvingBrokerDatabase, client: TelemetryClient, - { jobTimeoutMs = 30_000, timeoutIntervalMs = 10_000, maxRetries = 3 }: ProofRequestBrokerConfig = {}, + { + jobTimeoutMs = 30_000, + timeoutIntervalMs = 10_000, + maxRetries = 3, + maxEpochsToKeepResultsFor = 1, + maxParallelCleanUps = 20, + }: ProofRequestBrokerConfig = {}, private logger = createLogger('prover-client:proving-broker'), ) { this.instrumentation = new ProvingBrokerInstrumentation(client); - this.timeoutPromise = new RunningPromise(this.timeoutCheck, timeoutIntervalMs); + this.cleanupPromise = new RunningPromise(this.cleanupPass, timeoutIntervalMs); this.jobTimeoutMs = jobTimeoutMs; this.maxRetries = maxRetries; + this.maxEpochsToKeepResultsFor = maxEpochsToKeepResultsFor; + this.maxParallelCleanUps = maxParallelCleanUps; } private measureQueueDepth: MonitorCallback = (type: ProvingRequestType) => { @@ -127,7 +155,7 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer { } } - this.timeoutPromise.start(); + this.cleanupPromise.start(); this.instrumentation.monitorQueueDepth(this.measureQueueDepth); this.instrumentation.monitorActiveJobs(this.countActiveJobs); @@ -135,8 +163,8 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer { return Promise.resolve(); } - public stop(): Promise { - return this.timeoutPromise.stop(); + public async stop(): Promise { + await this.cleanupPromise.stop(); } public async enqueueProvingJob(job: ProvingJob): Promise { @@ -159,15 +187,22 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer { return promiseWithResolvers.promise; } - public async removeAndCancelProvingJob(id: ProvingJobId): Promise { - this.logger.info(`Cancelling job id=${id}`); - await this.database.deleteProvingJobAndResult(id); - + public async cancelProvingJob(id: ProvingJobId): Promise { // notify listeners of the cancellation if (!this.resultsCache.has(id)) { - this.promises.get(id)?.resolve({ status: 'rejected', reason: 'Aborted' }); + this.logger.info(`Cancelling job id=${id}`); + await this.reportProvingJobError(id, 'Aborted', false); + } + } + + public async cleanUpProvingJobState(id: ProvingJobId): Promise { + if (!this.resultsCache.has(id)) { + this.logger.warn(`Can't cleanup busy proving job: id=${id}`); + return; + } + this.logger.debug(`Cleaning up state for job id=${id}`); + await this.database.deleteProvingJobAndResult(id);
this.jobsCache.delete(id); this.promises.delete(id); this.resultsCache.delete(id); @@ -204,14 +239,15 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer { for (const proofType of allowedProofs) { const queue = this.queues[proofType]; - let job: ProvingJob | undefined; + let enqueuedJob: EnqueuedProvingJob | undefined; // exhaust the queue and make sure we're not sending a job that's already in progress // or has already been completed // this can happen if the broker crashes and restarts // it's possible agents will report progress or results for jobs that are in the queue (after the restart) - while ((job = queue.getImmediate())) { - if (!this.inProgress.has(job.id) && !this.resultsCache.has(job.id)) { - const time = this.timeSource(); + while ((enqueuedJob = queue.getImmediate())) { + const job = this.jobsCache.get(enqueuedJob.id); + if (job && !this.inProgress.has(enqueuedJob.id) && !this.resultsCache.has(enqueuedJob.id)) { + const time = this.msTimeSource(); this.inProgress.set(job.id, { id: job.id, startedAt: time, @@ -246,7 +282,12 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer { this.inProgress.delete(id); } - if (retry && retries + 1 < this.maxRetries) { + if (this.resultsCache.has(id)) { + this.logger.warn(`Proving job id=${id} already is already settled, ignoring error`); + return; + } + + if (retry && retries + 1 < this.maxRetries && !this.isJobStale(item)) { this.logger.info(`Retrying proving job id=${id} type=${ProvingRequestType[item.type]} retry=${retries + 1}`); this.retries.set(id, retries + 1); this.enqueueJobInternal(item); @@ -254,8 +295,10 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer { return; } - this.logger.debug( - `Marking proving job id=${id} type=${ProvingRequestType[item.type]} totalAttempts=${retries + 1} as failed`, + this.logger.warn( + `Marking proving job as failed id=${id} type=${ProvingRequestType[item.type]} totalAttempts=${ + retries + 1 + } err=${err}`, ); await this.database.setProvingJobError(id, err); @@ -265,8 +308,8 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer { this.promises.get(id)!.resolve(result); this.instrumentation.incRejectedJobs(item.type); if (info) { - const duration = this.timeSource() - info.startedAt; - this.instrumentation.recordJobDuration(item.type, duration * 1000); + const duration = this.msTimeSource() - info.startedAt; + this.instrumentation.recordJobDuration(item.type, duration); } } @@ -281,8 +324,13 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer { return filter ? this.getProvingJob(filter) : Promise.resolve(undefined); } + if (this.resultsCache.has(id)) { + this.logger.warn(`Proving job id=${id} has already been completed`); + return filter ? 
this.getProvingJob(filter) : Promise.resolve(undefined); + } + const metadata = this.inProgress.get(id); - const now = this.timeSource(); + const now = this.msTimeSource(); if (!metadata) { this.logger.warn( `Proving job id=${id} type=${ProvingRequestType[job.type]} not found in the in-progress cache, adding it`, @@ -293,7 +341,7 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer { this.inProgress.set(id, { id, startedAt, - lastUpdatedAt: this.timeSource(), + lastUpdatedAt: this.msTimeSource(), }); return Promise.resolve(undefined); } else if (startedAt <= metadata.startedAt) { @@ -334,6 +382,11 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer { this.inProgress.delete(id); } + if (this.resultsCache.has(id)) { + this.logger.warn(`Proving job id=${id} already settled, ignoring result`); + return; + } + this.logger.debug( `Proving job complete id=${id} type=${ProvingRequestType[item.type]} totalAttempts=${retries + 1}`, ); @@ -346,7 +399,31 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer { this.instrumentation.incResolvedJobs(item.type); } - private timeoutCheck = () => { + private cleanupPass = async () => { + await this.cleanupStaleJobs(); + await this.reEnqueueExpiredJobs(); + }; + + private async cleanupStaleJobs() { + const jobIds = Array.from(this.jobsCache.keys()); + const jobsToClean: ProvingJobId[] = []; + for (const id of jobIds) { + const job = this.jobsCache.get(id)!; + const isComplete = this.resultsCache.has(id); + if (isComplete && this.isJobStale(job)) { + jobsToClean.push(id); + } + } + + if (jobsToClean.length > 0) { + this.logger.info(`Cleaning up [${jobsToClean.join(',')}]`); + await asyncPool(this.maxParallelCleanUps, jobsToClean, async jobId => { + await this.cleanUpProvingJobState(jobId); + }); + } + } + + private async reEnqueueExpiredJobs() { const inProgressEntries = Array.from(this.inProgress.entries()); for (const [id, metadata] of inProgressEntries) { const item = this.jobsCache.get(id); @@ -356,28 +433,42 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer { continue; } - const msSinceLastUpdate = (this.timeSource() - metadata.lastUpdatedAt) * 1000; + const now = this.msTimeSource(); + const msSinceLastUpdate = now - metadata.lastUpdatedAt; if (msSinceLastUpdate >= this.jobTimeoutMs) { - this.logger.warn(`Proving job id=${id} timed out. Adding it back to the queue.`); - this.inProgress.delete(id); - this.enqueueJobInternal(item); - this.instrumentation.incTimedOutJobs(item.type); + if (this.isJobStale(item)) { + // the job has timed out and it's also old, just cancel and move on + await this.cancelProvingJob(item.id); + } else { + this.logger.warn(`Proving job id=${id} timed out. 
Adding it back to the queue.`); + this.inProgress.delete(id); + this.enqueueJobInternal(item); + this.instrumentation.incTimedOutJobs(item.type); + } } } - }; + } private enqueueJobInternal(job: ProvingJob): void { if (!this.promises.has(job.id)) { this.promises.set(job.id, promiseWithResolvers()); } - this.queues[job.type].put(job); + this.queues[job.type].put({ + epochNumber: job.epochNumber, + id: job.id, + }); this.enqueuedAt.set(job.id, new Timer()); + this.epochHeight = Math.max(this.epochHeight, job.epochNumber); this.logger.debug(`Enqueued new proving job id=${job.id}`); } + + private isJobStale(job: ProvingJob) { + return job.epochNumber < this.epochHeight - this.maxEpochsToKeepResultsFor; + } } type ProvingQueues = { - [K in ProvingRequestType]: PriorityMemoryQueue; + [K in ProvingRequestType]: PriorityMemoryQueue; }; /** @@ -386,12 +477,10 @@ type ProvingQueues = { * @param b - Another proving job * @returns A number indicating the relative priority of the two proving jobs */ -function provingJobComparator(a: ProvingJob, b: ProvingJob): -1 | 0 | 1 { - const aBlockNumber = a.blockNumber ?? 0; - const bBlockNumber = b.blockNumber ?? 0; - if (aBlockNumber < bBlockNumber) { +function provingJobComparator(a: EnqueuedProvingJob, b: EnqueuedProvingJob): -1 | 0 | 1 { + if (a.epochNumber < b.epochNumber) { return -1; - } else if (aBlockNumber > bBlockNumber) { + } else if (a.epochNumber > b.epochNumber) { return 1; } else { return 0; diff --git a/yarn-project/prover-client/src/proving_broker/rpc.ts b/yarn-project/prover-client/src/proving_broker/rpc.ts index 9895e7937dc..0d63dcebeac 100644 --- a/yarn-project/prover-client/src/proving_broker/rpc.ts +++ b/yarn-project/prover-client/src/proving_broker/rpc.ts @@ -28,7 +28,8 @@ const GetProvingJobResponse = z.object({ export const ProvingJobProducerSchema: ApiSchemaFor = { enqueueProvingJob: z.function().args(ProvingJob).returns(z.void()), getProvingJobStatus: z.function().args(ProvingJobId).returns(ProvingJobStatus), - removeAndCancelProvingJob: z.function().args(ProvingJobId).returns(z.void()), + cleanUpProvingJobState: z.function().args(ProvingJobId).returns(z.void()), + cancelProvingJob: z.function().args(ProvingJobId).returns(z.void()), waitForJobToSettle: z.function().args(ProvingJobId).returns(ProvingJobSettledResult), }; diff --git a/yarn-project/prover-client/src/test/mock_prover.ts b/yarn-project/prover-client/src/test/mock_prover.ts index 30a26cd7838..a4ab77fc24d 100644 --- a/yarn-project/prover-client/src/test/mock_prover.ts +++ b/yarn-project/prover-client/src/test/mock_prover.ts @@ -82,8 +82,11 @@ export class TestBroker implements ProvingJobProducer { getProvingJobStatus(id: ProvingJobId): Promise { return this.broker.getProvingJobStatus(id); } - removeAndCancelProvingJob(id: ProvingJobId): Promise { - return this.broker.removeAndCancelProvingJob(id); + cleanUpProvingJobState(id: ProvingJobId): Promise { + return this.broker.cleanUpProvingJobState(id); + } + cancelProvingJob(id: string): Promise { + return this.broker.cancelProvingJob(id); } waitForJobToSettle(id: ProvingJobId): Promise { return this.broker.waitForJobToSettle(id);
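A minimal sketch of the producer-side lifecycle this patch sets up: enqueue with an epoch number, wait for the job to settle, persist the output, then explicitly release the broker's copy, cancelling on abort. The reduced ProvingJobProducer shape and the proveAndCleanUp helper below are illustrative stand-ins rather than code from this PR; only the method names (enqueueProvingJob, waitForJobToSettle, cancelProvingJob, cleanUpProvingJobState) come from the interface changed above.

// Sketch only: simplified types standing in for the real ones in @aztec/circuit-types.
type ProvingJobId = string;
type ProofUri = string;
type ProvingJobSettledResult =
  | { status: 'fulfilled'; value: ProofUri }
  | { status: 'rejected'; reason: string };

interface ProvingJobProducer {
  enqueueProvingJob(job: { id: ProvingJobId; type: number; epochNumber: number; inputsUri: ProofUri }): Promise<void>;
  waitForJobToSettle(id: ProvingJobId): Promise<ProvingJobSettledResult>;
  cancelProvingJob(id: ProvingJobId): Promise<void>;
  cleanUpProvingJobState(id: ProvingJobId): Promise<void>;
}

// Hypothetical helper mirroring CachingBrokerFacade.enqueueAndWaitForJob.
async function proveAndCleanUp(
  broker: ProvingJobProducer,
  job: { id: ProvingJobId; type: number; epochNumber: number; inputsUri: ProofUri },
  signal?: AbortSignal,
): Promise<ProofUri> {
  await broker.enqueueProvingJob(job);

  // On abort, ask the broker to reject the job so waitForJobToSettle resolves.
  const abortFn = () => void broker.cancelProvingJob(job.id);
  signal?.addEventListener('abort', abortFn);

  try {
    const result = await broker.waitForJobToSettle(job.id);
    if (result.status === 'rejected') {
      throw new Error(result.reason);
    }
    // ...persist result.value (the proof output URI) to the caller's own cache here...
    return result.value;
  } finally {
    signal?.removeEventListener('abort', abortFn);
    // The result now lives on the caller's side, so the broker can drop its copy.
    // Skipping this call is what previously let settled jobs accumulate in the broker.
    await broker.cleanUpProvingJobState(job.id);
  }
}

This mirrors what the facade does in the patch: cancel on abort, and always call cleanUpProvingJobState in the finally block once the output has been saved.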
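The broker-side half of the fix is the epoch-based garbage collection documented on ProvingBroker. The sketch below condenses that rule using in-memory stand-ins for the broker's caches; EpochGc and its fields are hypothetical names, but the staleness predicate (epochNumber < epochHeight - maxEpochsToKeepResultsFor) and the settled-only restriction follow the patch.

// Sketch only: a condensed model of the broker's periodic clean-up rule.
type JobId = string;

class EpochGc {
  private epochHeight = 0;                    // highest epoch seen so far
  private settled = new Set<JobId>();         // jobs with a saved result or error
  private epochOf = new Map<JobId, number>(); // job id -> epoch it belongs to

  constructor(private maxEpochsToKeepResultsFor = 1) {}

  onEnqueue(id: JobId, epochNumber: number): void {
    this.epochOf.set(id, epochNumber);
    // Enqueuing is what advances the broker's notion of the current epoch.
    this.epochHeight = Math.max(this.epochHeight, epochNumber);
  }

  onSettled(id: JobId): void {
    this.settled.add(id);
  }

  private isStale(epochNumber: number): boolean {
    return epochNumber < this.epochHeight - this.maxEpochsToKeepResultsFor;
  }

  // Run periodically (the broker drives this from its cleanup RunningPromise).
  // Only jobs that are BOTH settled and stale are removed, so unfinished work
  // from old epochs is never dropped behind the client's back.
  collect(): JobId[] {
    const toDelete: JobId[] = [];
    for (const [id, epoch] of this.epochOf) {
      if (this.settled.has(id) && this.isStale(epoch)) {
        toDelete.push(id);
      }
    }
    for (const id of toDelete) {
      this.epochOf.delete(id);
      this.settled.delete(id);
    }
    return toDelete;
  }
}

With the default maxEpochsToKeepResultsFor of 1 this reproduces the behaviour exercised in the 'cleans up old jobs periodically' test: a settled epoch-1 job becomes collectable once the broker sees its first epoch-3 job, while an unsettled epoch-3 job survives until the client cancels it.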