From 5933730056dc32b4d9a2a5a55b27032b04775d75 Mon Sep 17 00:00:00 2001 From: Alex Gherghisan Date: Tue, 10 Dec 2024 12:07:57 +0000 Subject: [PATCH 1/3] fix: memory leak in the broker --- .../src/interfaces/prover-broker.ts | 10 +++- yarn-project/ethereum/package.json | 2 +- .../caching_broker_facade.test.ts | 54 ++++++++++++++++++- .../proving_broker/caching_broker_facade.ts | 4 +- .../src/proving_broker/proving_broker.test.ts | 40 ++++++-------- .../src/proving_broker/proving_broker.ts | 26 ++++++--- .../prover-client/src/proving_broker/rpc.ts | 3 +- .../prover-client/src/test/mock_prover.ts | 7 ++- 8 files changed, 107 insertions(+), 39 deletions(-) diff --git a/yarn-project/circuit-types/src/interfaces/prover-broker.ts b/yarn-project/circuit-types/src/interfaces/prover-broker.ts index 5f11be3347e..fb2fdadef67 100644 --- a/yarn-project/circuit-types/src/interfaces/prover-broker.ts +++ b/yarn-project/circuit-types/src/interfaces/prover-broker.ts @@ -56,10 +56,16 @@ export interface ProvingJobProducer { enqueueProvingJob(job: ProvingJob): Promise; /** - * Cancels a proving job and clears all of its + * Cancels a proving job. * @param id - The ID of the job to cancel */ - removeAndCancelProvingJob(id: ProvingJobId): Promise; + cancelProvingJob(id: ProvingJobId): Promise; + + /** + * Cleans up after a job has completed. Throws if the job is in-progress + * @param id - The ID of the job to cancel + */ + cleanUpProvingJobState(id: ProvingJobId): Promise; /** * Returns the current status fof the proving job diff --git a/yarn-project/ethereum/package.json b/yarn-project/ethereum/package.json index 0e34d8dcbf1..3300f21bba8 100644 --- a/yarn-project/ethereum/package.json +++ b/yarn-project/ethereum/package.json @@ -91,4 +91,4 @@ "engines": { "node": ">=18" } -} \ No newline at end of file +} diff --git a/yarn-project/prover-client/src/proving_broker/caching_broker_facade.test.ts b/yarn-project/prover-client/src/proving_broker/caching_broker_facade.test.ts index f4782e092ac..a72918ade09 100644 --- a/yarn-project/prover-client/src/proving_broker/caching_broker_facade.test.ts +++ b/yarn-project/prover-client/src/proving_broker/caching_broker_facade.test.ts @@ -24,7 +24,8 @@ describe('CachingBrokerFacade', () => { broker = mock({ enqueueProvingJob: jest.fn(), getProvingJobStatus: jest.fn(), - removeAndCancelProvingJob: jest.fn(), + cancelProvingJob: jest.fn(), + cleanUpProvingJobState: jest.fn(), waitForJobToSettle: jest.fn(), }); cache = new InMemoryProverCache(); @@ -101,4 +102,55 @@ describe('CachingBrokerFacade', () => { await expect(facade.getBaseParityProof(inputs)).resolves.toEqual(result); expect(broker.enqueueProvingJob).toHaveBeenCalledTimes(1); // job was only ever enqueued once }); + + it('clears broker state after a job resolves', async () => { + const { promise, resolve } = promiseWithResolvers(); + broker.enqueueProvingJob.mockResolvedValue(Promise.resolve()); + broker.waitForJobToSettle.mockResolvedValue(promise); + + const inputs = makeBaseParityInputs(); + void facade.getBaseParityProof(inputs); + await jest.advanceTimersToNextTimerAsync(); + + const job = broker.enqueueProvingJob.mock.calls[0][0]; + const result = makePublicInputsAndRecursiveProof( + makeParityPublicInputs(), + makeRecursiveProof(RECURSIVE_PROOF_LENGTH), + VerificationKeyData.makeFakeHonk(), + ); + const outputUri = await proofStore.saveProofOutput(job.id, ProvingRequestType.BASE_PARITY, result); + resolve({ + status: 'fulfilled', + value: outputUri, + }); + + await jest.advanceTimersToNextTimerAsync(); + expect(broker.cleanUpProvingJobState).toHaveBeenCalled(); + }); + + it('clears broker state after a job is canceled', async () => { + const { promise, resolve } = promiseWithResolvers(); + const catchSpy = jest.fn(); + broker.enqueueProvingJob.mockResolvedValue(Promise.resolve()); + broker.waitForJobToSettle.mockResolvedValue(promise); + + const inputs = makeBaseParityInputs(); + const controller = new AbortController(); + void facade.getBaseParityProof(inputs, controller.signal).catch(catchSpy); + await jest.advanceTimersToNextTimerAsync(); + + expect(broker.cancelProvingJob).not.toHaveBeenCalled(); + controller.abort(); + await jest.advanceTimersToNextTimerAsync(); + expect(broker.cancelProvingJob).toHaveBeenCalled(); + + resolve({ + status: 'rejected', + reason: 'Aborted', + }); + + await jest.advanceTimersToNextTimerAsync(); + expect(broker.cleanUpProvingJobState).toHaveBeenCalled(); + expect(catchSpy).toHaveBeenCalledWith(new Error('Aborted')); + }); }); diff --git a/yarn-project/prover-client/src/proving_broker/caching_broker_facade.ts b/yarn-project/prover-client/src/proving_broker/caching_broker_facade.ts index a2ead87ecaa..73d15b082c7 100644 --- a/yarn-project/prover-client/src/proving_broker/caching_broker_facade.ts +++ b/yarn-project/prover-client/src/proving_broker/caching_broker_facade.ts @@ -107,7 +107,7 @@ export class CachingBrokerFacade implements ServerCircuitProver { // notify broker of cancelled job const abortFn = async () => { signal?.removeEventListener('abort', abortFn); - await this.broker.removeAndCancelProvingJob(id); + await this.broker.cancelProvingJob(id); }; signal?.addEventListener('abort', abortFn); @@ -147,6 +147,8 @@ export class CachingBrokerFacade implements ServerCircuitProver { } } finally { signal?.removeEventListener('abort', abortFn); + // we've saved the result in our cache. We can tell the broker to clear its state + await this.broker.cleanUpProvingJobState(id); } } diff --git a/yarn-project/prover-client/src/proving_broker/proving_broker.test.ts b/yarn-project/prover-client/src/proving_broker/proving_broker.test.ts index 304d30a3b37..ce488e2085e 100644 --- a/yarn-project/prover-client/src/proving_broker/proving_broker.test.ts +++ b/yarn-project/prover-client/src/proving_broker/proving_broker.test.ts @@ -122,11 +122,10 @@ describe.each([ type: ProvingRequestType.BASE_PARITY, inputsUri: makeInputsUri(), }); - await expect(broker.getProvingJobStatus(id)).resolves.toEqual({ status: 'in-queue' }); - - await broker.removeAndCancelProvingJob(id); + await assertJobStatus(id, 'in-queue'); - await expect(broker.getProvingJobStatus(id)).resolves.toEqual({ status: 'not-found' }); + await broker.cancelProvingJob(id); + await assertJobStatus(id, 'rejected'); }); it('cancels jobs in-progress', async () => { @@ -137,11 +136,11 @@ describe.each([ type: ProvingRequestType.BASE_PARITY, inputsUri: makeInputsUri(), }); - await expect(broker.getProvingJobStatus(id)).resolves.toEqual({ status: 'in-queue' }); + await assertJobStatus(id, 'in-queue'); await broker.getProvingJob(); - await expect(broker.getProvingJobStatus(id)).resolves.toEqual({ status: 'in-progress' }); - await broker.removeAndCancelProvingJob(id); - await expect(broker.getProvingJobStatus(id)).resolves.toEqual({ status: 'not-found' }); + await assertJobStatus(id, 'in-progress'); + await broker.cancelProvingJob(id); + await assertJobStatus(id, 'rejected'); }); it('returns job result if successful', async () => { @@ -356,8 +355,8 @@ describe.each([ }); await broker.getProvingJob(); await assertJobStatus(id, 'in-progress'); - await broker.removeAndCancelProvingJob(id); - await assertJobStatus(id, 'not-found'); + await broker.cancelProvingJob(id); + await assertJobStatus(id, 'rejected'); const id2 = makeProvingJobId(); await broker.enqueueProvingJob({ @@ -691,22 +690,13 @@ describe.each([ }; await broker.enqueueProvingJob(provingJob); - - await expect(broker.getProvingJobStatus(provingJob.id)).resolves.toEqual({ - status: 'in-queue', - }); + await assertJobStatus(provingJob.id, 'in-queue'); await expect(broker.getProvingJob()).resolves.toEqual({ job: provingJob, time: expect.any(Number) }); - - await expect(broker.getProvingJobStatus(provingJob.id)).resolves.toEqual({ - status: 'in-progress', - }); + await assertJobStatus(provingJob.id, 'in-progress'); await broker.reportProvingJobError(provingJob.id, 'test error', true); - - await expect(broker.getProvingJobStatus(provingJob.id)).resolves.toEqual({ - status: 'in-queue', - }); + await assertJobStatus(provingJob.id, 'in-queue'); }); it('retries up to a maximum number of times', async () => { @@ -892,8 +882,10 @@ describe.each([ jest.spyOn(database, 'deleteProvingJobAndResult'); - await broker.removeAndCancelProvingJob(id1); - await broker.removeAndCancelProvingJob(id2); + await broker.cleanUpProvingJobState(id1); + + await broker.cancelProvingJob(id2); + await broker.cleanUpProvingJobState(id2); expect(database.deleteProvingJobAndResult).toHaveBeenCalledWith(id1); expect(database.deleteProvingJobAndResult).toHaveBeenCalledWith(id2); diff --git a/yarn-project/prover-client/src/proving_broker/proving_broker.ts b/yarn-project/prover-client/src/proving_broker/proving_broker.ts index dfa605838a8..2aaef89cd38 100644 --- a/yarn-project/prover-client/src/proving_broker/proving_broker.ts +++ b/yarn-project/prover-client/src/proving_broker/proving_broker.ts @@ -159,15 +159,20 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer { return promiseWithResolvers.promise; } - public async removeAndCancelProvingJob(id: ProvingJobId): Promise { - this.logger.info(`Cancelling job id=${id}`); - await this.database.deleteProvingJobAndResult(id); - + public async cancelProvingJob(id: ProvingJobId): Promise { // notify listeners of the cancellation if (!this.resultsCache.has(id)) { - this.promises.get(id)?.resolve({ status: 'rejected', reason: 'Aborted' }); + this.logger.info(`Cancelling job id=${id}`); + await this.reportProvingJobError(id, 'Aborted', false); + } + } + + public async cleanUpProvingJobState(id: ProvingJobId): Promise { + if (!this.resultsCache.has(id)) { + throw new Error(`Can't cancel busy proving job: id=${id}`); } + await this.database.deleteProvingJobAndResult(id); this.jobsCache.delete(id); this.promises.delete(id); this.resultsCache.delete(id); @@ -254,8 +259,10 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer { return; } - this.logger.debug( - `Marking proving job id=${id} type=${ProvingRequestType[item.type]} totalAttempts=${retries + 1} as failed`, + this.logger.warn( + `Marking proving job as failed id=${id} type=${ProvingRequestType[item.type]} totalAttempts=${ + retries + 1 + } err=${err}`, ); await this.database.setProvingJobError(id, err); @@ -281,6 +288,11 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer { return filter ? this.getProvingJob(filter) : Promise.resolve(undefined); } + if (this.resultsCache.has(id)) { + this.logger.warn(`Proving job id=${id} has already been completed`); + return filter ? this.getProvingJob(filter) : Promise.resolve(undefined); + } + const metadata = this.inProgress.get(id); const now = this.timeSource(); if (!metadata) { diff --git a/yarn-project/prover-client/src/proving_broker/rpc.ts b/yarn-project/prover-client/src/proving_broker/rpc.ts index 9895e7937dc..0d63dcebeac 100644 --- a/yarn-project/prover-client/src/proving_broker/rpc.ts +++ b/yarn-project/prover-client/src/proving_broker/rpc.ts @@ -28,7 +28,8 @@ const GetProvingJobResponse = z.object({ export const ProvingJobProducerSchema: ApiSchemaFor = { enqueueProvingJob: z.function().args(ProvingJob).returns(z.void()), getProvingJobStatus: z.function().args(ProvingJobId).returns(ProvingJobStatus), - removeAndCancelProvingJob: z.function().args(ProvingJobId).returns(z.void()), + cleanUpProvingJobState: z.function().args(ProvingJobId).returns(z.void()), + cancelProvingJob: z.function().args(ProvingJobId).returns(z.void()), waitForJobToSettle: z.function().args(ProvingJobId).returns(ProvingJobSettledResult), }; diff --git a/yarn-project/prover-client/src/test/mock_prover.ts b/yarn-project/prover-client/src/test/mock_prover.ts index 30a26cd7838..a4ab77fc24d 100644 --- a/yarn-project/prover-client/src/test/mock_prover.ts +++ b/yarn-project/prover-client/src/test/mock_prover.ts @@ -82,8 +82,11 @@ export class TestBroker implements ProvingJobProducer { getProvingJobStatus(id: ProvingJobId): Promise { return this.broker.getProvingJobStatus(id); } - removeAndCancelProvingJob(id: ProvingJobId): Promise { - return this.broker.removeAndCancelProvingJob(id); + cleanUpProvingJobState(id: ProvingJobId): Promise { + return this.broker.cleanUpProvingJobState(id); + } + cancelProvingJob(id: string): Promise { + return this.broker.cancelProvingJob(id); } waitForJobToSettle(id: ProvingJobId): Promise { return this.broker.waitForJobToSettle(id); From 27adf5fbf401db01c4679d6ba6d660725377fb7f Mon Sep 17 00:00:00 2001 From: Alex Gherghisan Date: Wed, 11 Dec 2024 09:46:22 +0000 Subject: [PATCH 2/3] feat: broker auto-cleans stale jobs --- .../src/interfaces/proving-job-source.test.ts | 1 + .../src/interfaces/proving-job.ts | 2 +- .../src/prover-agent/memory-proving-queue.ts | 3 +- .../proving_broker/caching_broker_facade.ts | 38 ++- .../src/proving_broker/proving_agent.test.ts | 2 +- .../src/proving_broker/proving_broker.test.ts | 235 ++++++++++++------ .../src/proving_broker/proving_broker.ts | 134 +++++++--- 7 files changed, 283 insertions(+), 132 deletions(-) diff --git a/yarn-project/circuit-types/src/interfaces/proving-job-source.test.ts b/yarn-project/circuit-types/src/interfaces/proving-job-source.test.ts index 57b7d2192be..e372b6bef03 100644 --- a/yarn-project/circuit-types/src/interfaces/proving-job-source.test.ts +++ b/yarn-project/circuit-types/src/interfaces/proving-job-source.test.ts @@ -70,6 +70,7 @@ class MockProvingJobSource implements ProvingJobSource { id: 'a-job-id', type: ProvingRequestType.PRIVATE_BASE_ROLLUP, inputsUri: 'inputs-uri' as ProofUri, + epochNumber: 1, }); } heartbeat(jobId: string): Promise { diff --git a/yarn-project/circuit-types/src/interfaces/proving-job.ts b/yarn-project/circuit-types/src/interfaces/proving-job.ts index f2013799dac..8c2b44b98a0 100644 --- a/yarn-project/circuit-types/src/interfaces/proving-job.ts +++ b/yarn-project/circuit-types/src/interfaces/proving-job.ts @@ -241,7 +241,7 @@ export type ProvingJobId = z.infer; export const ProvingJob = z.object({ id: ProvingJobId, type: z.nativeEnum(ProvingRequestType), - blockNumber: z.number().optional(), + epochNumber: z.number(), inputsUri: ProofUri, }); diff --git a/yarn-project/prover-client/src/prover-agent/memory-proving-queue.ts b/yarn-project/prover-client/src/prover-agent/memory-proving-queue.ts index c98aed86da3..dd7b05a21cf 100644 --- a/yarn-project/prover-client/src/prover-agent/memory-proving-queue.ts +++ b/yarn-project/prover-client/src/prover-agent/memory-proving-queue.ts @@ -120,6 +120,7 @@ export class MemoryProvingQueue implements ServerCircuitProver, ProvingJobSource id: job.id, type: job.type, inputsUri: job.inputsUri, + epochNumber: job.epochNumber, }; } catch (err) { if (err instanceof TimeoutError) { @@ -244,7 +245,7 @@ export class MemoryProvingQueue implements ServerCircuitProver, ProvingJobSource reject, attempts: 1, heartbeat: 0, - epochNumber, + epochNumber: epochNumber ?? 0, }; if (signal) { diff --git a/yarn-project/prover-client/src/proving_broker/caching_broker_facade.ts b/yarn-project/prover-client/src/proving_broker/caching_broker_facade.ts index 73d15b082c7..82c59e7ea0f 100644 --- a/yarn-project/prover-client/src/proving_broker/caching_broker_facade.ts +++ b/yarn-project/prover-client/src/proving_broker/caching_broker_facade.ts @@ -59,6 +59,7 @@ export class CachingBrokerFacade implements ServerCircuitProver { id: ProvingJobId, type: T, inputs: ProvingJobInputsMap[T], + epochNumber = 0, signal?: AbortSignal, ): Promise { // first try the cache @@ -95,6 +96,7 @@ export class CachingBrokerFacade implements ServerCircuitProver { id, type, inputsUri, + epochNumber, }); await this.cache.setProvingJobStatus(id, { status: 'in-queue' }); } catch (err) { @@ -155,12 +157,13 @@ export class CachingBrokerFacade implements ServerCircuitProver { getAvmProof( inputs: AvmCircuitInputs, signal?: AbortSignal, - _blockNumber?: number, + epochNumber?: number, ): Promise> { return this.enqueueAndWaitForJob( this.generateId(ProvingRequestType.PUBLIC_VM, inputs), ProvingRequestType.PUBLIC_VM, inputs, + epochNumber, signal, ); } @@ -168,12 +171,13 @@ export class CachingBrokerFacade implements ServerCircuitProver { getBaseParityProof( inputs: BaseParityInputs, signal?: AbortSignal, - _epochNumber?: number, + epochNumber?: number, ): Promise> { return this.enqueueAndWaitForJob( this.generateId(ProvingRequestType.BASE_PARITY, inputs), ProvingRequestType.BASE_PARITY, inputs, + epochNumber, signal, ); } @@ -181,12 +185,13 @@ export class CachingBrokerFacade implements ServerCircuitProver { getBlockMergeRollupProof( input: BlockMergeRollupInputs, signal?: AbortSignal, - _epochNumber?: number, + epochNumber?: number, ): Promise> { return this.enqueueAndWaitForJob( this.generateId(ProvingRequestType.BLOCK_MERGE_ROLLUP, input), ProvingRequestType.BLOCK_MERGE_ROLLUP, input, + epochNumber, signal, ); } @@ -194,12 +199,13 @@ export class CachingBrokerFacade implements ServerCircuitProver { getBlockRootRollupProof( input: BlockRootRollupInputs, signal?: AbortSignal, - _epochNumber?: number, + epochNumber?: number, ): Promise> { return this.enqueueAndWaitForJob( this.generateId(ProvingRequestType.BLOCK_ROOT_ROLLUP, input), ProvingRequestType.BLOCK_ROOT_ROLLUP, input, + epochNumber, signal, ); } @@ -207,12 +213,13 @@ export class CachingBrokerFacade implements ServerCircuitProver { getEmptyBlockRootRollupProof( input: EmptyBlockRootRollupInputs, signal?: AbortSignal, - _epochNumber?: number, + epochNumber?: number, ): Promise> { return this.enqueueAndWaitForJob( this.generateId(ProvingRequestType.EMPTY_BLOCK_ROOT_ROLLUP, input), ProvingRequestType.EMPTY_BLOCK_ROOT_ROLLUP, input, + epochNumber, signal, ); } @@ -220,12 +227,13 @@ export class CachingBrokerFacade implements ServerCircuitProver { getEmptyPrivateKernelProof( inputs: PrivateKernelEmptyInputData, signal?: AbortSignal, - _epochNumber?: number, + epochNumber?: number, ): Promise> { return this.enqueueAndWaitForJob( this.generateId(ProvingRequestType.PRIVATE_KERNEL_EMPTY, inputs), ProvingRequestType.PRIVATE_KERNEL_EMPTY, inputs, + epochNumber, signal, ); } @@ -233,24 +241,26 @@ export class CachingBrokerFacade implements ServerCircuitProver { getMergeRollupProof( input: MergeRollupInputs, signal?: AbortSignal, - _epochNumber?: number, + epochNumber?: number, ): Promise> { return this.enqueueAndWaitForJob( this.generateId(ProvingRequestType.MERGE_ROLLUP, input), ProvingRequestType.MERGE_ROLLUP, input, + epochNumber, signal, ); } getPrivateBaseRollupProof( baseRollupInput: PrivateBaseRollupInputs, signal?: AbortSignal, - _epochNumber?: number, + epochNumber?: number, ): Promise> { return this.enqueueAndWaitForJob( this.generateId(ProvingRequestType.PRIVATE_BASE_ROLLUP, baseRollupInput), ProvingRequestType.PRIVATE_BASE_ROLLUP, baseRollupInput, + epochNumber, signal, ); } @@ -258,12 +268,13 @@ export class CachingBrokerFacade implements ServerCircuitProver { getPublicBaseRollupProof( inputs: PublicBaseRollupInputs, signal?: AbortSignal, - _epochNumber?: number, + epochNumber?: number, ): Promise> { return this.enqueueAndWaitForJob( this.generateId(ProvingRequestType.PUBLIC_BASE_ROLLUP, inputs), ProvingRequestType.PUBLIC_BASE_ROLLUP, inputs, + epochNumber, signal, ); } @@ -271,12 +282,13 @@ export class CachingBrokerFacade implements ServerCircuitProver { getRootParityProof( inputs: RootParityInputs, signal?: AbortSignal, - _epochNumber?: number, + epochNumber?: number, ): Promise> { return this.enqueueAndWaitForJob( this.generateId(ProvingRequestType.ROOT_PARITY, inputs), ProvingRequestType.ROOT_PARITY, inputs, + epochNumber, signal, ); } @@ -284,12 +296,13 @@ export class CachingBrokerFacade implements ServerCircuitProver { getRootRollupProof( input: RootRollupInputs, signal?: AbortSignal, - _epochNumber?: number, + epochNumber?: number, ): Promise> { return this.enqueueAndWaitForJob( this.generateId(ProvingRequestType.ROOT_ROLLUP, input), ProvingRequestType.ROOT_ROLLUP, input, + epochNumber, signal, ); } @@ -297,12 +310,13 @@ export class CachingBrokerFacade implements ServerCircuitProver { getTubeProof( tubeInput: TubeInputs, signal?: AbortSignal, - _epochNumber?: number, + epochNumber?: number, ): Promise> { return this.enqueueAndWaitForJob( this.generateId(ProvingRequestType.TUBE_PROOF, tubeInput), ProvingRequestType.TUBE_PROOF, tubeInput, + epochNumber, signal, ); } diff --git a/yarn-project/prover-client/src/proving_broker/proving_agent.test.ts b/yarn-project/prover-client/src/proving_broker/proving_agent.test.ts index 5a33598a31d..405d16d5c1e 100644 --- a/yarn-project/prover-client/src/proving_broker/proving_agent.test.ts +++ b/yarn-project/prover-client/src/proving_broker/proving_agent.test.ts @@ -239,7 +239,7 @@ describe('ProvingAgent', () => { const inputs: ProvingJobInputs = { type: ProvingRequestType.BASE_PARITY, inputs: makeBaseParityInputs() }; const job: ProvingJob = { id: randomBytes(8).toString('hex') as ProvingJobId, - blockNumber: 1, + epochNumber: 1, type: ProvingRequestType.BASE_PARITY, inputsUri: randomBytes(8).toString('hex') as ProofUri, }; diff --git a/yarn-project/prover-client/src/proving_broker/proving_broker.test.ts b/yarn-project/prover-client/src/proving_broker/proving_broker.test.ts index ce488e2085e..e88c69f5afa 100644 --- a/yarn-project/prover-client/src/proving_broker/proving_broker.test.ts +++ b/yarn-project/prover-client/src/proving_broker/proving_broker.test.ts @@ -1,5 +1,6 @@ import { type ProofUri, type ProvingJob, type ProvingJobId, ProvingRequestType } from '@aztec/circuit-types'; import { randomBytes } from '@aztec/foundation/crypto'; +import { sleep } from '@aztec/foundation/sleep'; import { openTmpStore } from '@aztec/kv-store/lmdb'; import { NoopTelemetryClient } from '@aztec/telemetry-client/noop'; @@ -10,10 +11,6 @@ import { type ProvingBrokerDatabase } from './proving_broker_database.js'; import { InMemoryBrokerDatabase } from './proving_broker_database/memory.js'; import { KVBrokerDatabase } from './proving_broker_database/persisted.js'; -beforeAll(() => { - jest.useFakeTimers(); -}); - describe.each([ () => ({ database: new InMemoryBrokerDatabase(), cleanup: undefined }), () => { @@ -24,21 +21,23 @@ describe.each([ }, ])('ProvingBroker', createDb => { let broker: ProvingBroker; + let brokerIntervalMs: number; let jobTimeoutMs: number; let maxRetries: number; let database: ProvingBrokerDatabase; let cleanup: undefined | (() => Promise | void); - const now = () => Math.floor(Date.now() / 1000); + const now = () => Date.now(); beforeEach(() => { - jobTimeoutMs = 10_000; + jobTimeoutMs = 100; maxRetries = 2; + brokerIntervalMs = jobTimeoutMs / 4; ({ database, cleanup } = createDb()); broker = new ProvingBroker(database, new NoopTelemetryClient(), { jobTimeoutMs, - timeoutIntervalMs: jobTimeoutMs / 4, + timeoutIntervalMs: brokerIntervalMs, maxRetries, }); }); @@ -62,7 +61,7 @@ describe.each([ const id = makeProvingJobId(); await broker.enqueueProvingJob({ id, - blockNumber: 1, + epochNumber: 1, type: ProvingRequestType.BASE_PARITY, inputsUri: makeInputsUri(), }); @@ -71,7 +70,7 @@ describe.each([ const id2 = makeProvingJobId(); await broker.enqueueProvingJob({ id: id2, - blockNumber: 1, + epochNumber: 1, type: ProvingRequestType.PRIVATE_BASE_ROLLUP, inputsUri: makeInputsUri(), }); @@ -82,7 +81,7 @@ describe.each([ const provingJob: ProvingJob = { id: makeProvingJobId(), type: ProvingRequestType.BASE_PARITY, - blockNumber: 1, + epochNumber: 1, inputsUri: makeInputsUri(), }; @@ -95,14 +94,14 @@ describe.each([ const id = makeProvingJobId(); await broker.enqueueProvingJob({ id, - blockNumber: 1, + epochNumber: 1, type: ProvingRequestType.BASE_PARITY, inputsUri: makeInputsUri(), }); await expect( broker.enqueueProvingJob({ id, - blockNumber: 1, + epochNumber: 1, type: ProvingRequestType.BASE_PARITY, inputsUri: makeInputsUri(), }), @@ -118,7 +117,7 @@ describe.each([ const id = makeProvingJobId(); await broker.enqueueProvingJob({ id, - blockNumber: 1, + epochNumber: 1, type: ProvingRequestType.BASE_PARITY, inputsUri: makeInputsUri(), }); @@ -132,7 +131,7 @@ describe.each([ const id = makeProvingJobId(); await broker.enqueueProvingJob({ id, - blockNumber: 1, + epochNumber: 1, type: ProvingRequestType.BASE_PARITY, inputsUri: makeInputsUri(), }); @@ -147,7 +146,7 @@ describe.each([ const provingJob: ProvingJob = { id: makeProvingJobId(), type: ProvingRequestType.BASE_PARITY, - blockNumber: 1, + epochNumber: 1, inputsUri: makeInputsUri(), }; @@ -163,7 +162,7 @@ describe.each([ const provingJob: ProvingJob = { id: makeProvingJobId(), type: ProvingRequestType.BASE_PARITY, - blockNumber: 1, + epochNumber: 1, inputsUri: makeInputsUri(), }; @@ -194,21 +193,21 @@ describe.each([ const provingJob1: ProvingJob = { id: makeProvingJobId(), type: ProvingRequestType.BASE_PARITY, - blockNumber: 1, + epochNumber: 1, inputsUri: makeInputsUri(), }; const provingJob2: ProvingJob = { id: makeProvingJobId(), type: ProvingRequestType.BASE_PARITY, - blockNumber: 2, + epochNumber: 2, inputsUri: makeInputsUri(), }; const provingJob3: ProvingJob = { id: makeProvingJobId(), type: ProvingRequestType.BASE_PARITY, - blockNumber: 3, + epochNumber: 3, inputsUri: makeInputsUri(), }; @@ -223,7 +222,7 @@ describe.each([ await broker.enqueueProvingJob({ id: makeProvingJobId(), type: ProvingRequestType.BASE_PARITY, - blockNumber: 1, + epochNumber: 1, inputsUri: makeInputsUri(), }); @@ -237,7 +236,7 @@ describe.each([ await broker.enqueueProvingJob({ id: baseParity1, type: ProvingRequestType.BASE_PARITY, - blockNumber: 1, + epochNumber: 1, inputsUri: makeInputsUri(), }); @@ -245,7 +244,7 @@ describe.each([ await broker.enqueueProvingJob({ id: baseRollup1, type: ProvingRequestType.PRIVATE_BASE_ROLLUP, - blockNumber: 1, + epochNumber: 1, inputsUri: makeInputsUri(), }); @@ -253,7 +252,7 @@ describe.each([ await broker.enqueueProvingJob({ id: baseRollup2, type: ProvingRequestType.PRIVATE_BASE_ROLLUP, - blockNumber: 2, + epochNumber: 2, inputsUri: makeInputsUri(), }); @@ -261,7 +260,7 @@ describe.each([ await broker.enqueueProvingJob({ id: rootParity1, type: ProvingRequestType.ROOT_PARITY, - blockNumber: 1, + epochNumber: 1, inputsUri: makeInputsUri(), }); @@ -273,7 +272,7 @@ describe.each([ await broker.enqueueProvingJob({ id: baseParity1, type: ProvingRequestType.BASE_PARITY, - blockNumber: 1, + epochNumber: 1, inputsUri: makeInputsUri(), }); @@ -281,7 +280,7 @@ describe.each([ await broker.enqueueProvingJob({ id: baseRollup1, type: ProvingRequestType.PRIVATE_BASE_ROLLUP, - blockNumber: 1, + epochNumber: 1, inputsUri: makeInputsUri(), }); @@ -289,7 +288,7 @@ describe.each([ await broker.enqueueProvingJob({ id: baseRollup2, type: ProvingRequestType.PRIVATE_BASE_ROLLUP, - blockNumber: 2, + epochNumber: 2, inputsUri: makeInputsUri(), }); @@ -297,7 +296,7 @@ describe.each([ await broker.enqueueProvingJob({ id: rootParity1, type: ProvingRequestType.ROOT_PARITY, - blockNumber: 1, + epochNumber: 1, inputsUri: makeInputsUri(), }); @@ -314,7 +313,7 @@ describe.each([ await broker.enqueueProvingJob({ id: baseParity1, type: ProvingRequestType.BASE_PARITY, - blockNumber: 1, + epochNumber: 1, inputsUri: makeInputsUri(), }); @@ -322,7 +321,7 @@ describe.each([ await broker.enqueueProvingJob({ id: baseRollup1, type: ProvingRequestType.PRIVATE_BASE_ROLLUP, - blockNumber: 1, + epochNumber: 1, inputsUri: makeInputsUri(), }); @@ -330,7 +329,7 @@ describe.each([ await broker.enqueueProvingJob({ id: baseRollup2, type: ProvingRequestType.PRIVATE_BASE_ROLLUP, - blockNumber: 2, + epochNumber: 2, inputsUri: makeInputsUri(), }); @@ -338,7 +337,7 @@ describe.each([ await broker.enqueueProvingJob({ id: rootParity1, type: ProvingRequestType.ROOT_PARITY, - blockNumber: 1, + epochNumber: 1, inputsUri: makeInputsUri(), }); @@ -350,7 +349,7 @@ describe.each([ await broker.enqueueProvingJob({ id, type: ProvingRequestType.BASE_PARITY, - blockNumber: 1, + epochNumber: 1, inputsUri: makeInputsUri(), }); await broker.getProvingJob(); @@ -362,7 +361,7 @@ describe.each([ await broker.enqueueProvingJob({ id: id2, type: ProvingRequestType.BASE_PARITY, - blockNumber: 1, + epochNumber: 1, inputsUri: makeInputsUri(), }); await expect( @@ -375,14 +374,14 @@ describe.each([ const job1: ProvingJob = { id: makeProvingJobId(), type: ProvingRequestType.BASE_PARITY, - blockNumber: 1, + epochNumber: 1, inputsUri: makeInputsUri(), }; const job2: ProvingJob = { id: makeProvingJobId(), type: ProvingRequestType.BASE_PARITY, - blockNumber: 2, + epochNumber: 2, inputsUri: makeInputsUri(), }; @@ -396,7 +395,7 @@ describe.each([ expect(firstAgentJob).toEqual(job1); await assertJobStatus(job1.id, 'in-progress'); - await jest.advanceTimersByTimeAsync(jobTimeoutMs / 2); + await sleep(jobTimeoutMs / 2); await expect( broker.reportProvingJobProgress(job1.id, firstAgentStartedAt, { allowList: [ProvingRequestType.BASE_PARITY], @@ -406,8 +405,8 @@ describe.each([ // restart the broker! await broker.stop(); - // fake some time passing while the broker restarts - await jest.advanceTimersByTimeAsync(10_000); + // time passes while the broker restarts + await sleep(10 * jobTimeoutMs); broker = new ProvingBroker(database, new NoopTelemetryClient()); await broker.start(); @@ -443,14 +442,14 @@ describe.each([ const job1: ProvingJob = { id: makeProvingJobId(), type: ProvingRequestType.BASE_PARITY, - blockNumber: 1, + epochNumber: 1, inputsUri: makeInputsUri(), }; const job2: ProvingJob = { id: makeProvingJobId(), type: ProvingRequestType.BASE_PARITY, - blockNumber: 2, + epochNumber: 2, inputsUri: makeInputsUri(), }; @@ -467,8 +466,8 @@ describe.each([ // restart the broker! await broker.stop(); - // fake some time passing while the broker restarts - await jest.advanceTimersByTimeAsync(10_000); + // time passes while the broker restarts + await sleep(10 * jobTimeoutMs); broker = new ProvingBroker(database, new NoopTelemetryClient()); await broker.start(); @@ -498,14 +497,14 @@ describe.each([ const job1: ProvingJob = { id: makeProvingJobId(), type: ProvingRequestType.BASE_PARITY, - blockNumber: 1, + epochNumber: 1, inputsUri: makeInputsUri(), }; const job2: ProvingJob = { id: makeProvingJobId(), type: ProvingRequestType.BASE_PARITY, - blockNumber: 2, + epochNumber: 2, inputsUri: makeInputsUri(), }; @@ -518,8 +517,8 @@ describe.each([ // restart the broker! await broker.stop(); - // fake some time passing while the broker restarts - await jest.advanceTimersByTimeAsync(100 * jobTimeoutMs); + // time passes while the broker restarts + await sleep(10 * jobTimeoutMs); broker = new ProvingBroker(database, new NoopTelemetryClient()); await broker.start(); @@ -544,13 +543,13 @@ describe.each([ await broker.enqueueProvingJob({ id: id1, type: ProvingRequestType.BASE_PARITY, - blockNumber: 1, + epochNumber: 1, inputsUri: makeInputsUri(), }); await broker.enqueueProvingJob({ id: id2, type: ProvingRequestType.BASE_PARITY, - blockNumber: 2, + epochNumber: 2, inputsUri: makeInputsUri(), }); @@ -571,13 +570,13 @@ describe.each([ await broker.enqueueProvingJob({ id: id1, type: ProvingRequestType.BASE_PARITY, - blockNumber: 1, + epochNumber: 1, inputsUri: makeInputsUri(), }); await broker.enqueueProvingJob({ id: id2, type: ProvingRequestType.BASE_PARITY, - blockNumber: 2, + epochNumber: 2, inputsUri: makeInputsUri(), }); @@ -617,7 +616,7 @@ describe.each([ await broker.enqueueProvingJob({ id, type: ProvingRequestType.BASE_PARITY, - blockNumber: 1, + epochNumber: 1, inputsUri: makeInputsUri(), }); @@ -631,7 +630,7 @@ describe.each([ await broker.enqueueProvingJob({ id, type: ProvingRequestType.BASE_PARITY, - blockNumber: 1, + epochNumber: 1, inputsUri: makeInputsUri(), }); @@ -640,7 +639,7 @@ describe.each([ await assertJobStatus(id, 'in-progress'); // advance time so job times out because of no heartbeats - await jest.advanceTimersByTimeAsync(jobTimeoutMs); + await sleep(jobTimeoutMs + brokerIntervalMs); // should be back in the queue now await assertJobStatus(id, 'in-queue'); @@ -651,7 +650,7 @@ describe.each([ await broker.enqueueProvingJob({ id, type: ProvingRequestType.BASE_PARITY, - blockNumber: 1, + epochNumber: 1, inputsUri: makeInputsUri(), }); @@ -661,7 +660,7 @@ describe.each([ await assertJobStatus(id, 'in-progress'); // advance the time slightly, not enough for the request to timeout - await jest.advanceTimersByTimeAsync(jobTimeoutMs / 2); + await sleep(jobTimeoutMs / 2); await assertJobStatus(id, 'in-progress'); @@ -669,13 +668,13 @@ describe.each([ await broker.reportProvingJobProgress(id, time); // advance the time again - await jest.advanceTimersByTimeAsync(jobTimeoutMs / 2); + await sleep(jobTimeoutMs / 2); // should still be our request to process await assertJobStatus(id, 'in-progress'); // advance the time again and lose the request - await jest.advanceTimersByTimeAsync(jobTimeoutMs); + await sleep(jobTimeoutMs); await assertJobStatus(id, 'in-queue'); }); }); @@ -685,7 +684,7 @@ describe.each([ const provingJob: ProvingJob = { id: makeProvingJobId(), type: ProvingRequestType.BASE_PARITY, - blockNumber: 1, + epochNumber: 1, inputsUri: makeInputsUri(), }; @@ -704,7 +703,7 @@ describe.each([ await broker.enqueueProvingJob({ id, type: ProvingRequestType.BASE_PARITY, - blockNumber: 1, + epochNumber: 1, inputsUri: makeInputsUri(), }); @@ -726,7 +725,7 @@ describe.each([ await broker.enqueueProvingJob({ id, type: ProvingRequestType.BASE_PARITY, - blockNumber: 1, + epochNumber: 1, inputsUri: makeInputsUri(), }); @@ -751,7 +750,7 @@ describe.each([ await database.addProvingJob({ id: id1, type: ProvingRequestType.BASE_PARITY, - blockNumber: 1, + epochNumber: 1, inputsUri: makeInputsUri(), }); @@ -759,7 +758,7 @@ describe.each([ await database.addProvingJob({ id: id2, type: ProvingRequestType.PRIVATE_BASE_ROLLUP, - blockNumber: 2, + epochNumber: 2, inputsUri: makeInputsUri(), }); @@ -772,7 +771,7 @@ describe.each([ job: { id: id1, type: ProvingRequestType.BASE_PARITY, - blockNumber: 1, + epochNumber: 1, inputsUri: expect.any(String), }, time: expect.any(Number), @@ -782,7 +781,7 @@ describe.each([ job: { id: id2, type: ProvingRequestType.PRIVATE_BASE_ROLLUP, - blockNumber: 2, + epochNumber: 2, inputsUri: expect.any(String), }, time: expect.any(Number), @@ -802,7 +801,7 @@ describe.each([ await database.addProvingJob({ id: id1, type: ProvingRequestType.BASE_PARITY, - blockNumber: 1, + epochNumber: 1, inputsUri: makeInputsUri(), }); @@ -810,7 +809,7 @@ describe.each([ await database.addProvingJob({ id: id2, type: ProvingRequestType.PRIVATE_BASE_ROLLUP, - blockNumber: 2, + epochNumber: 2, inputsUri: makeInputsUri(), }); @@ -836,7 +835,7 @@ describe.each([ await database.addProvingJob({ id: id1, type: ProvingRequestType.BASE_PARITY, - blockNumber: 1, + epochNumber: 1, inputsUri: makeInputsUri(), }); await database.setProvingJobResult(id1, makeOutputsUri()); @@ -845,7 +844,7 @@ describe.each([ await database.addProvingJob({ id: id2, type: ProvingRequestType.PRIVATE_BASE_ROLLUP, - blockNumber: 2, + epochNumber: 2, inputsUri: makeInputsUri(), }); @@ -862,7 +861,7 @@ describe.each([ await database.addProvingJob({ id: id1, type: ProvingRequestType.BASE_PARITY, - blockNumber: 1, + epochNumber: 1, inputsUri: makeInputsUri(), }); await database.setProvingJobResult(id1, makeOutputsUri()); @@ -871,7 +870,7 @@ describe.each([ await database.addProvingJob({ id: id2, type: ProvingRequestType.PRIVATE_BASE_ROLLUP, - blockNumber: 2, + epochNumber: 2, inputsUri: makeInputsUri(), }); @@ -883,12 +882,14 @@ describe.each([ jest.spyOn(database, 'deleteProvingJobAndResult'); await broker.cleanUpProvingJobState(id1); + await sleep(brokerIntervalMs); + expect(database.deleteProvingJobAndResult).toHaveBeenNthCalledWith(1, id1); await broker.cancelProvingJob(id2); await broker.cleanUpProvingJobState(id2); + await sleep(brokerIntervalMs); - expect(database.deleteProvingJobAndResult).toHaveBeenCalledWith(id1); - expect(database.deleteProvingJobAndResult).toHaveBeenCalledWith(id2); + expect(database.deleteProvingJobAndResult).toHaveBeenNthCalledWith(2, id2); await expect(broker.getProvingJobStatus(id1)).resolves.toEqual({ status: 'not-found' }); await expect(broker.getProvingJobStatus(id2)).resolves.toEqual({ status: 'not-found' }); @@ -901,7 +902,7 @@ describe.each([ const job: ProvingJob = { id: makeProvingJobId(), type: ProvingRequestType.BASE_PARITY, - blockNumber: 1, + epochNumber: 1, inputsUri: makeInputsUri(), }; @@ -920,7 +921,7 @@ describe.each([ broker.enqueueProvingJob({ id, type: ProvingRequestType.BASE_PARITY, - blockNumber: 1, + epochNumber: 1, inputsUri: makeInputsUri(), }), ).rejects.toThrow(new Error('db error')); @@ -933,7 +934,7 @@ describe.each([ const job: ProvingJob = { id: makeProvingJobId(), type: ProvingRequestType.BASE_PARITY, - blockNumber: 1, + epochNumber: 1, inputsUri: makeInputsUri(), }; jest.spyOn(database, 'setProvingJobResult'); @@ -952,7 +953,7 @@ describe.each([ await broker.enqueueProvingJob({ id, type: ProvingRequestType.BASE_PARITY, - blockNumber: 1, + epochNumber: 1, inputsUri: makeInputsUri(), }); await expect(broker.reportProvingJobSuccess(id, makeOutputsUri())).rejects.toThrow(new Error('db error')); @@ -968,7 +969,7 @@ describe.each([ await broker.enqueueProvingJob({ id, type: ProvingRequestType.BASE_PARITY, - blockNumber: 1, + epochNumber: 1, inputsUri: makeInputsUri(), }); @@ -985,7 +986,7 @@ describe.each([ await broker.enqueueProvingJob({ id, type: ProvingRequestType.BASE_PARITY, - blockNumber: 1, + epochNumber: 1, inputsUri: makeInputsUri(), }); await expect(broker.reportProvingJobError(id, 'test error')).rejects.toThrow(new Error('db error')); @@ -1017,6 +1018,82 @@ describe.each([ expect(database.setProvingJobError).not.toHaveBeenCalled(); expect(database.addProvingJob).not.toHaveBeenCalled(); }); + + it('cleans up old jobs periodically', async () => { + await broker.start(); + jest.spyOn(database, 'deleteProvingJobAndResult'); + const id1 = 'epoch1' as ProvingJobId; // makeProvingJobId(); // epoch 1 + const id2 = 'epoch2' as ProvingJobId; //makeProvingJobId(); // 2 + const id3 = 'epoch3' as ProvingJobId; //makeProvingJobId(); // 3 + const id4 = 'epoch4' as ProvingJobId; //makeProvingJobId(); // 4 + const id5 = 'epoch5' as ProvingJobId; //makeProvingJobId(); // 4 + + await sleep(10); + await broker.enqueueProvingJob({ + id: id1, + epochNumber: 1, + type: ProvingRequestType.BASE_PARITY, + inputsUri: '' as ProofUri, + }); + await broker.reportProvingJobSuccess(id1, '' as ProofUri); + + await sleep(10); + await broker.enqueueProvingJob({ + id: id2, + epochNumber: 2, + type: ProvingRequestType.BASE_PARITY, + inputsUri: '' as ProofUri, + }); + await broker.reportProvingJobSuccess(id2, '' as ProofUri); + + // nothing got cleaned up yet. The broker first needs to advance to the next epoch + await sleep(brokerIntervalMs); + expect(database.deleteProvingJobAndResult).not.toHaveBeenCalled(); + + await sleep(10); + await broker.enqueueProvingJob({ + id: id3, + epochNumber: 3, + type: ProvingRequestType.BASE_PARITY, + inputsUri: '' as ProofUri, + }); + + // we got a job for epoch 3, we can clean up jobs from epoch 1 + await sleep(brokerIntervalMs); + expect(database.deleteProvingJobAndResult).toHaveBeenCalledWith(id1); + expect(database.deleteProvingJobAndResult).not.toHaveBeenCalledWith(id2); + + await sleep(10); + await broker.enqueueProvingJob({ + id: id4, + epochNumber: 4, + type: ProvingRequestType.BASE_PARITY, + inputsUri: '' as ProofUri, + }); + + // once we advance to epoch 4 we can clean up finished jobs for epoch 2 + await sleep(brokerIntervalMs); + expect(database.deleteProvingJobAndResult).toHaveBeenCalledWith(id2); + + await sleep(10); + await broker.enqueueProvingJob({ + id: id5, + epochNumber: 5, + type: ProvingRequestType.BASE_PARITY, + inputsUri: '' as ProofUri, + }); + + // advancing to epoch 5 does not automatically clean up unfinished jobs for epoch 3 + await sleep(brokerIntervalMs); + expect(database.deleteProvingJobAndResult).not.toHaveBeenCalledWith(id3); + + await broker.cancelProvingJob(id3); // now job 3 is settled (aborted) + await sleep(brokerIntervalMs); + expect(database.deleteProvingJobAndResult).toHaveBeenCalledWith(id3); // and we can clean it up + + await broker.cancelProvingJob(id4); + await broker.cancelProvingJob(id5); + }); }); async function assertJobStatus(id: ProvingJobId, status: string) { diff --git a/yarn-project/prover-client/src/proving_broker/proving_broker.ts b/yarn-project/prover-client/src/proving_broker/proving_broker.ts index 2aaef89cd38..66ca164b30a 100644 --- a/yarn-project/prover-client/src/proving_broker/proving_broker.ts +++ b/yarn-project/prover-client/src/proving_broker/proving_broker.ts @@ -9,6 +9,7 @@ import { type ProvingJobStatus, ProvingRequestType, } from '@aztec/circuit-types'; +import { asyncPool } from '@aztec/foundation/async-pool'; import { createLogger } from '@aztec/foundation/log'; import { type PromiseWithResolvers, RunningPromise, promiseWithResolvers } from '@aztec/foundation/promise'; import { PriorityMemoryQueue } from '@aztec/foundation/queue'; @@ -30,29 +31,33 @@ type ProofRequestBrokerConfig = { timeoutIntervalMs?: number; jobTimeoutMs?: number; maxRetries?: number; + maxEpochsToKeepResultsFor?: number; + maxParallelCleanUps?: number; }; +type EnqueuedProvingJob = Pick; + /** * A broker that manages proof requests and distributes them to workers based on their priority. * It takes a backend that is responsible for storing and retrieving proof requests and results. */ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer { private queues: ProvingQueues = { - [ProvingRequestType.PUBLIC_VM]: new PriorityMemoryQueue(provingJobComparator), - [ProvingRequestType.TUBE_PROOF]: new PriorityMemoryQueue(provingJobComparator), - [ProvingRequestType.PRIVATE_KERNEL_EMPTY]: new PriorityMemoryQueue(provingJobComparator), + [ProvingRequestType.PUBLIC_VM]: new PriorityMemoryQueue(provingJobComparator), + [ProvingRequestType.TUBE_PROOF]: new PriorityMemoryQueue(provingJobComparator), + [ProvingRequestType.PRIVATE_KERNEL_EMPTY]: new PriorityMemoryQueue(provingJobComparator), - [ProvingRequestType.PRIVATE_BASE_ROLLUP]: new PriorityMemoryQueue(provingJobComparator), - [ProvingRequestType.PUBLIC_BASE_ROLLUP]: new PriorityMemoryQueue(provingJobComparator), - [ProvingRequestType.MERGE_ROLLUP]: new PriorityMemoryQueue(provingJobComparator), - [ProvingRequestType.ROOT_ROLLUP]: new PriorityMemoryQueue(provingJobComparator), + [ProvingRequestType.PRIVATE_BASE_ROLLUP]: new PriorityMemoryQueue(provingJobComparator), + [ProvingRequestType.PUBLIC_BASE_ROLLUP]: new PriorityMemoryQueue(provingJobComparator), + [ProvingRequestType.MERGE_ROLLUP]: new PriorityMemoryQueue(provingJobComparator), + [ProvingRequestType.ROOT_ROLLUP]: new PriorityMemoryQueue(provingJobComparator), - [ProvingRequestType.BLOCK_MERGE_ROLLUP]: new PriorityMemoryQueue(provingJobComparator), - [ProvingRequestType.BLOCK_ROOT_ROLLUP]: new PriorityMemoryQueue(provingJobComparator), - [ProvingRequestType.EMPTY_BLOCK_ROOT_ROLLUP]: new PriorityMemoryQueue(provingJobComparator), + [ProvingRequestType.BLOCK_MERGE_ROLLUP]: new PriorityMemoryQueue(provingJobComparator), + [ProvingRequestType.BLOCK_ROOT_ROLLUP]: new PriorityMemoryQueue(provingJobComparator), + [ProvingRequestType.EMPTY_BLOCK_ROOT_ROLLUP]: new PriorityMemoryQueue(provingJobComparator), - [ProvingRequestType.BASE_PARITY]: new PriorityMemoryQueue(provingJobComparator), - [ProvingRequestType.ROOT_PARITY]: new PriorityMemoryQueue(provingJobComparator), + [ProvingRequestType.BASE_PARITY]: new PriorityMemoryQueue(provingJobComparator), + [ProvingRequestType.ROOT_PARITY]: new PriorityMemoryQueue(provingJobComparator), }; // holds a copy of the database in memory in order to quickly fulfill requests @@ -76,23 +81,46 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer { // a map of promises that will be resolved when a job is settled private promises = new Map>(); - private timeoutPromise: RunningPromise; - private timeSource = () => Math.floor(Date.now() / 1000); + private cleanupPromise: RunningPromise; + private msTimeSource = () => Date.now(); private jobTimeoutMs: number; private maxRetries: number; private instrumentation: ProvingBrokerInstrumentation; + private maxParallelCleanUps: number; + + /** + * The broker keeps track of the highest epoch its seen. + * This information is used for garbage collection: once it reaches the next epoch, it can start pruning the database of old state. + * This clean up pass is only done against _settled_ jobs. This pass will not cancel jobs that are in-progress or in-queue. + * It is a client responsibility to cancel jobs if they are no longer necessary. + * Example: + * proving epoch 11 - the broker will wipe all setlled jobs for epochs 9 and lower + * finished proving epoch 11 and got first job for epoch 12 -> the broker will wipe all setlled jobs for epochs 10 and lower + * reorged back to end of epoch 10 -> epoch 11 is skipped and epoch 12 starts -> the broker will wipe all setlled jobs for epochs 10 and lower + */ + private epochHeight = 0; + private maxEpochsToKeepResultsFor = 1; + public constructor( private database: ProvingBrokerDatabase, client: TelemetryClient, - { jobTimeoutMs = 30_000, timeoutIntervalMs = 10_000, maxRetries = 3 }: ProofRequestBrokerConfig = {}, + { + jobTimeoutMs = 30_000, + timeoutIntervalMs = 10_000, + maxRetries = 3, + maxEpochsToKeepResultsFor = 1, + maxParallelCleanUps = 20, + }: ProofRequestBrokerConfig = {}, private logger = createLogger('prover-client:proving-broker'), ) { this.instrumentation = new ProvingBrokerInstrumentation(client); - this.timeoutPromise = new RunningPromise(this.timeoutCheck, timeoutIntervalMs); + this.cleanupPromise = new RunningPromise(this.cleanupPass, timeoutIntervalMs); this.jobTimeoutMs = jobTimeoutMs; this.maxRetries = maxRetries; + this.maxEpochsToKeepResultsFor = maxEpochsToKeepResultsFor; + this.maxParallelCleanUps = maxParallelCleanUps; } private measureQueueDepth: MonitorCallback = (type: ProvingRequestType) => { @@ -127,7 +155,7 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer { } } - this.timeoutPromise.start(); + this.cleanupPromise.start(); this.instrumentation.monitorQueueDepth(this.measureQueueDepth); this.instrumentation.monitorActiveJobs(this.countActiveJobs); @@ -135,8 +163,8 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer { return Promise.resolve(); } - public stop(): Promise { - return this.timeoutPromise.stop(); + public async stop(): Promise { + await this.cleanupPromise.stop(); } public async enqueueProvingJob(job: ProvingJob): Promise { @@ -169,9 +197,11 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer { public async cleanUpProvingJobState(id: ProvingJobId): Promise { if (!this.resultsCache.has(id)) { - throw new Error(`Can't cancel busy proving job: id=${id}`); + this.logger.warn(`Can't cleanup busy proving job: id=${id}`); + return; } + this.logger.debug(`Cleaning up state for job id=${id}`); await this.database.deleteProvingJobAndResult(id); this.jobsCache.delete(id); this.promises.delete(id); @@ -209,14 +239,15 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer { for (const proofType of allowedProofs) { const queue = this.queues[proofType]; - let job: ProvingJob | undefined; + let enqueuedJob: EnqueuedProvingJob | undefined; // exhaust the queue and make sure we're not sending a job that's already in progress // or has already been completed // this can happen if the broker crashes and restarts // it's possible agents will report progress or results for jobs that are in the queue (after the restart) - while ((job = queue.getImmediate())) { - if (!this.inProgress.has(job.id) && !this.resultsCache.has(job.id)) { - const time = this.timeSource(); + while ((enqueuedJob = queue.getImmediate())) { + const job = this.jobsCache.get(enqueuedJob.id); + if (job && !this.inProgress.has(enqueuedJob.id) && !this.resultsCache.has(enqueuedJob.id)) { + const time = this.msTimeSource(); this.inProgress.set(job.id, { id: job.id, startedAt: time, @@ -272,8 +303,8 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer { this.promises.get(id)!.resolve(result); this.instrumentation.incRejectedJobs(item.type); if (info) { - const duration = this.timeSource() - info.startedAt; - this.instrumentation.recordJobDuration(item.type, duration * 1000); + const duration = this.msTimeSource() - info.startedAt; + this.instrumentation.recordJobDuration(item.type, duration); } } @@ -294,7 +325,7 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer { } const metadata = this.inProgress.get(id); - const now = this.timeSource(); + const now = this.msTimeSource(); if (!metadata) { this.logger.warn( `Proving job id=${id} type=${ProvingRequestType[job.type]} not found in the in-progress cache, adding it`, @@ -305,7 +336,7 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer { this.inProgress.set(id, { id, startedAt, - lastUpdatedAt: this.timeSource(), + lastUpdatedAt: this.msTimeSource(), }); return Promise.resolve(undefined); } else if (startedAt <= metadata.startedAt) { @@ -358,7 +389,31 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer { this.instrumentation.incResolvedJobs(item.type); } - private timeoutCheck = () => { + private cleanupPass = async () => { + await this.cleanupStaleJobs(); + this.reEnqueueExpiredJobs(); + }; + + private async cleanupStaleJobs() { + const jobIds = Array.from(this.jobsCache.keys()); + const jobsToClean: ProvingJobId[] = []; + for (const id of jobIds) { + const job = this.jobsCache.get(id)!; + const isComplete = this.resultsCache.has(id); + if (isComplete && job.epochNumber < this.epochHeight - this.maxEpochsToKeepResultsFor) { + jobsToClean.push(id); + } + } + + if (jobsToClean.length > 0) { + this.logger.info(`Cleaning up [${jobsToClean.join(',')}]`); + await asyncPool(this.maxParallelCleanUps, jobsToClean, async jobId => { + await this.cleanUpProvingJobState(jobId); + }); + } + } + + private reEnqueueExpiredJobs() { const inProgressEntries = Array.from(this.inProgress.entries()); for (const [id, metadata] of inProgressEntries) { const item = this.jobsCache.get(id); @@ -368,7 +423,8 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer { continue; } - const msSinceLastUpdate = (this.timeSource() - metadata.lastUpdatedAt) * 1000; + const now = this.msTimeSource(); + const msSinceLastUpdate = now - metadata.lastUpdatedAt; if (msSinceLastUpdate >= this.jobTimeoutMs) { this.logger.warn(`Proving job id=${id} timed out. Adding it back to the queue.`); this.inProgress.delete(id); @@ -376,20 +432,24 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer { this.instrumentation.incTimedOutJobs(item.type); } } - }; + } private enqueueJobInternal(job: ProvingJob): void { if (!this.promises.has(job.id)) { this.promises.set(job.id, promiseWithResolvers()); } - this.queues[job.type].put(job); + this.queues[job.type].put({ + epochNumber: job.epochNumber, + id: job.id, + }); this.enqueuedAt.set(job.id, new Timer()); + this.epochHeight = Math.max(this.epochHeight, job.epochNumber); this.logger.debug(`Enqueued new proving job id=${job.id}`); } } type ProvingQueues = { - [K in ProvingRequestType]: PriorityMemoryQueue; + [K in ProvingRequestType]: PriorityMemoryQueue; }; /** @@ -398,12 +458,10 @@ type ProvingQueues = { * @param b - Another proving job * @returns A number indicating the relative priority of the two proving jobs */ -function provingJobComparator(a: ProvingJob, b: ProvingJob): -1 | 0 | 1 { - const aBlockNumber = a.blockNumber ?? 0; - const bBlockNumber = b.blockNumber ?? 0; - if (aBlockNumber < bBlockNumber) { +function provingJobComparator(a: EnqueuedProvingJob, b: EnqueuedProvingJob): -1 | 0 | 1 { + if (a.epochNumber < b.epochNumber) { return -1; - } else if (aBlockNumber > bBlockNumber) { + } else if (a.epochNumber > b.epochNumber) { return 1; } else { return 0; From 0faec6bcd8b048695cd3ac062961c4d978c183ad Mon Sep 17 00:00:00 2001 From: Alex Gherghisan Date: Wed, 11 Dec 2024 15:12:58 +0000 Subject: [PATCH 3/3] fix: don't retry stale jobs --- .../src/proving_broker/proving_broker.test.ts | 71 +++++++++++++++++++ .../src/proving_broker/proving_broker.ts | 35 ++++++--- 2 files changed, 98 insertions(+), 8 deletions(-) diff --git a/yarn-project/prover-client/src/proving_broker/proving_broker.test.ts b/yarn-project/prover-client/src/proving_broker/proving_broker.test.ts index e88c69f5afa..454e840543f 100644 --- a/yarn-project/prover-client/src/proving_broker/proving_broker.test.ts +++ b/yarn-project/prover-client/src/proving_broker/proving_broker.test.ts @@ -645,6 +645,42 @@ describe.each([ await assertJobStatus(id, 'in-queue'); }); + it('cancel stale jobs that time out', async () => { + const id = makeProvingJobId(); + await broker.enqueueProvingJob({ + id, + type: ProvingRequestType.BASE_PARITY, + epochNumber: 1, + inputsUri: makeInputsUri(), + }); + + await assertJobStatus(id, 'in-queue'); + await getAndAssertNextJobId(id); + await assertJobStatus(id, 'in-progress'); + + // advance time so job times out because of no heartbeats + await sleep(jobTimeoutMs + brokerIntervalMs); + + // should be back in the queue now + await assertJobStatus(id, 'in-queue'); + + // another agent picks it up + await getAndAssertNextJobId(id); + await assertJobStatus(id, 'in-progress'); + + // epoch has advances + await broker.enqueueProvingJob({ + id: makeProvingJobId(), + type: ProvingRequestType.BASE_PARITY, + epochNumber: 10, + inputsUri: makeInputsUri(), + }); + + // advance time again so job times out. This time it should be rejected + await sleep(jobTimeoutMs + brokerIntervalMs); + await assertJobStatus(id, 'rejected'); + }); + it('keeps the jobs in progress while it is alive', async () => { const id = makeProvingJobId(); await broker.enqueueProvingJob({ @@ -737,6 +773,41 @@ describe.each([ reason: 'test error', }); }); + + it('does not retry if job is stale', async () => { + const id = makeProvingJobId(); + await broker.enqueueProvingJob({ + id, + type: ProvingRequestType.BASE_PARITY, + epochNumber: 1, + inputsUri: makeInputsUri(), + }); + + await getAndAssertNextJobId(id); + await assertJobStatus(id, 'in-progress'); + + await broker.reportProvingJobError(id, 'test error', true); + // gets retried once + await assertJobStatus(id, 'in-queue'); + + // pick up the job again + await getAndAssertNextJobId(id); + await assertJobStatus(id, 'in-progress'); + + // advance the epoch height + await broker.enqueueProvingJob({ + id: makeProvingJobId(), + type: ProvingRequestType.BASE_PARITY, + epochNumber: 3, + inputsUri: makeInputsUri(), + }); + + await broker.reportProvingJobError(id, 'test error', true); + await expect(broker.getProvingJobStatus(id)).resolves.toEqual({ + status: 'rejected', + reason: 'test error', + }); + }); }); describe('Database management', () => { diff --git a/yarn-project/prover-client/src/proving_broker/proving_broker.ts b/yarn-project/prover-client/src/proving_broker/proving_broker.ts index 66ca164b30a..3487f83e7e8 100644 --- a/yarn-project/prover-client/src/proving_broker/proving_broker.ts +++ b/yarn-project/prover-client/src/proving_broker/proving_broker.ts @@ -282,7 +282,12 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer { this.inProgress.delete(id); } - if (retry && retries + 1 < this.maxRetries) { + if (this.resultsCache.has(id)) { + this.logger.warn(`Proving job id=${id} already is already settled, ignoring error`); + return; + } + + if (retry && retries + 1 < this.maxRetries && !this.isJobStale(item)) { this.logger.info(`Retrying proving job id=${id} type=${ProvingRequestType[item.type]} retry=${retries + 1}`); this.retries.set(id, retries + 1); this.enqueueJobInternal(item); @@ -377,6 +382,11 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer { this.inProgress.delete(id); } + if (this.resultsCache.has(id)) { + this.logger.warn(`Proving job id=${id} already settled, ignoring result`); + return; + } + this.logger.debug( `Proving job complete id=${id} type=${ProvingRequestType[item.type]} totalAttempts=${retries + 1}`, ); @@ -391,7 +401,7 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer { private cleanupPass = async () => { await this.cleanupStaleJobs(); - this.reEnqueueExpiredJobs(); + await this.reEnqueueExpiredJobs(); }; private async cleanupStaleJobs() { @@ -400,7 +410,7 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer { for (const id of jobIds) { const job = this.jobsCache.get(id)!; const isComplete = this.resultsCache.has(id); - if (isComplete && job.epochNumber < this.epochHeight - this.maxEpochsToKeepResultsFor) { + if (isComplete && this.isJobStale(job)) { jobsToClean.push(id); } } @@ -413,7 +423,7 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer { } } - private reEnqueueExpiredJobs() { + private async reEnqueueExpiredJobs() { const inProgressEntries = Array.from(this.inProgress.entries()); for (const [id, metadata] of inProgressEntries) { const item = this.jobsCache.get(id); @@ -426,10 +436,15 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer { const now = this.msTimeSource(); const msSinceLastUpdate = now - metadata.lastUpdatedAt; if (msSinceLastUpdate >= this.jobTimeoutMs) { - this.logger.warn(`Proving job id=${id} timed out. Adding it back to the queue.`); - this.inProgress.delete(id); - this.enqueueJobInternal(item); - this.instrumentation.incTimedOutJobs(item.type); + if (this.isJobStale(item)) { + // the job has timed out and it's also old, just cancel and move on + await this.cancelProvingJob(item.id); + } else { + this.logger.warn(`Proving job id=${id} timed out. Adding it back to the queue.`); + this.inProgress.delete(id); + this.enqueueJobInternal(item); + this.instrumentation.incTimedOutJobs(item.type); + } } } } @@ -446,6 +461,10 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer { this.epochHeight = Math.max(this.epochHeight, job.epochNumber); this.logger.debug(`Enqueued new proving job id=${job.id}`); } + + private isJobStale(job: ProvingJob) { + return job.epochNumber < this.epochHeight - this.maxEpochsToKeepResultsFor; + } } type ProvingQueues = {