Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: memory leak in the broker #10567

Merged
merged 4 commits into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions yarn-project/circuit-types/src/interfaces/prover-broker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,16 @@ export interface ProvingJobProducer {
enqueueProvingJob(job: ProvingJob): Promise<void>;

/**
* Cancels a proving job and clears all of its
* Cancels a proving job.
* @param id - The ID of the job to cancel
*/
removeAndCancelProvingJob(id: ProvingJobId): Promise<void>;
cancelProvingJob(id: ProvingJobId): Promise<void>;

/**
* Cleans up after a job has completed. Throws if the job is in-progress
* @param id - The ID of the job to cancel
*/
cleanUpProvingJobState(id: ProvingJobId): Promise<void>;

/**
* Returns the current status fof the proving job
Expand Down
2 changes: 1 addition & 1 deletion yarn-project/ethereum/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -91,4 +91,4 @@
"engines": {
"node": ">=18"
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ describe('CachingBrokerFacade', () => {
broker = mock<ProvingJobProducer>({
enqueueProvingJob: jest.fn<any>(),
getProvingJobStatus: jest.fn<any>(),
removeAndCancelProvingJob: jest.fn<any>(),
cancelProvingJob: jest.fn<any>(),
cleanUpProvingJobState: jest.fn<any>(),
waitForJobToSettle: jest.fn<any>(),
});
cache = new InMemoryProverCache();
Expand Down Expand Up @@ -101,4 +102,55 @@ describe('CachingBrokerFacade', () => {
await expect(facade.getBaseParityProof(inputs)).resolves.toEqual(result);
expect(broker.enqueueProvingJob).toHaveBeenCalledTimes(1); // job was only ever enqueued once
});

it('clears broker state after a job resolves', async () => {
const { promise, resolve } = promiseWithResolvers<any>();
broker.enqueueProvingJob.mockResolvedValue(Promise.resolve());
broker.waitForJobToSettle.mockResolvedValue(promise);

const inputs = makeBaseParityInputs();
void facade.getBaseParityProof(inputs);
await jest.advanceTimersToNextTimerAsync();

const job = broker.enqueueProvingJob.mock.calls[0][0];
const result = makePublicInputsAndRecursiveProof(
makeParityPublicInputs(),
makeRecursiveProof(RECURSIVE_PROOF_LENGTH),
VerificationKeyData.makeFakeHonk(),
);
const outputUri = await proofStore.saveProofOutput(job.id, ProvingRequestType.BASE_PARITY, result);
resolve({
status: 'fulfilled',
value: outputUri,
});

await jest.advanceTimersToNextTimerAsync();
expect(broker.cleanUpProvingJobState).toHaveBeenCalled();
});

it('clears broker state after a job is canceled', async () => {
const { promise, resolve } = promiseWithResolvers<any>();
const catchSpy = jest.fn();
broker.enqueueProvingJob.mockResolvedValue(Promise.resolve());
broker.waitForJobToSettle.mockResolvedValue(promise);

const inputs = makeBaseParityInputs();
const controller = new AbortController();
void facade.getBaseParityProof(inputs, controller.signal).catch(catchSpy);
await jest.advanceTimersToNextTimerAsync();

expect(broker.cancelProvingJob).not.toHaveBeenCalled();
controller.abort();
await jest.advanceTimersToNextTimerAsync();
expect(broker.cancelProvingJob).toHaveBeenCalled();

resolve({
status: 'rejected',
reason: 'Aborted',
});

await jest.advanceTimersToNextTimerAsync();
expect(broker.cleanUpProvingJobState).toHaveBeenCalled();
expect(catchSpy).toHaveBeenCalledWith(new Error('Aborted'));
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ export class CachingBrokerFacade implements ServerCircuitProver {
// notify broker of cancelled job
const abortFn = async () => {
signal?.removeEventListener('abort', abortFn);
await this.broker.removeAndCancelProvingJob(id);
await this.broker.cancelProvingJob(id);
};

signal?.addEventListener('abort', abortFn);
Expand Down Expand Up @@ -147,6 +147,8 @@ export class CachingBrokerFacade implements ServerCircuitProver {
}
} finally {
signal?.removeEventListener('abort', abortFn);
// we've saved the result in our cache. We can tell the broker to clear its state
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remind me: who's expected to be persisting proving data? I wouldn't want to clean up the broker data if we were counting on it to persist work info across crashes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤦 I misplaced my comment sorry about that #10567 (comment)

await this.broker.cleanUpProvingJobState(id);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I read this correctly. Does this mean the orchestrator effectively tells the broker when to clear state. This feels like it's quite coupled. Would a better approach be for the broker to simply e.g. delete all state for epochs < N - 1 when it is asked to prove something for epoch N.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, you that's right. I think your suggestion makes a lot of sense!

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,10 @@ describe.each([
type: ProvingRequestType.BASE_PARITY,
inputsUri: makeInputsUri(),
});
await expect(broker.getProvingJobStatus(id)).resolves.toEqual({ status: 'in-queue' });

await broker.removeAndCancelProvingJob(id);
await assertJobStatus(id, 'in-queue');

await expect(broker.getProvingJobStatus(id)).resolves.toEqual({ status: 'not-found' });
await broker.cancelProvingJob(id);
await assertJobStatus(id, 'rejected');
});

it('cancels jobs in-progress', async () => {
Expand All @@ -137,11 +136,11 @@ describe.each([
type: ProvingRequestType.BASE_PARITY,
inputsUri: makeInputsUri(),
});
await expect(broker.getProvingJobStatus(id)).resolves.toEqual({ status: 'in-queue' });
await assertJobStatus(id, 'in-queue');
await broker.getProvingJob();
await expect(broker.getProvingJobStatus(id)).resolves.toEqual({ status: 'in-progress' });
await broker.removeAndCancelProvingJob(id);
await expect(broker.getProvingJobStatus(id)).resolves.toEqual({ status: 'not-found' });
await assertJobStatus(id, 'in-progress');
await broker.cancelProvingJob(id);
await assertJobStatus(id, 'rejected');
});

it('returns job result if successful', async () => {
Expand Down Expand Up @@ -356,8 +355,8 @@ describe.each([
});
await broker.getProvingJob();
await assertJobStatus(id, 'in-progress');
await broker.removeAndCancelProvingJob(id);
await assertJobStatus(id, 'not-found');
await broker.cancelProvingJob(id);
await assertJobStatus(id, 'rejected');

const id2 = makeProvingJobId();
await broker.enqueueProvingJob({
Expand Down Expand Up @@ -691,22 +690,13 @@ describe.each([
};

await broker.enqueueProvingJob(provingJob);

await expect(broker.getProvingJobStatus(provingJob.id)).resolves.toEqual({
status: 'in-queue',
});
await assertJobStatus(provingJob.id, 'in-queue');

await expect(broker.getProvingJob()).resolves.toEqual({ job: provingJob, time: expect.any(Number) });

await expect(broker.getProvingJobStatus(provingJob.id)).resolves.toEqual({
status: 'in-progress',
});
await assertJobStatus(provingJob.id, 'in-progress');

await broker.reportProvingJobError(provingJob.id, 'test error', true);

await expect(broker.getProvingJobStatus(provingJob.id)).resolves.toEqual({
status: 'in-queue',
});
await assertJobStatus(provingJob.id, 'in-queue');
});

it('retries up to a maximum number of times', async () => {
Expand Down Expand Up @@ -892,8 +882,10 @@ describe.each([

jest.spyOn(database, 'deleteProvingJobAndResult');

await broker.removeAndCancelProvingJob(id1);
await broker.removeAndCancelProvingJob(id2);
await broker.cleanUpProvingJobState(id1);

await broker.cancelProvingJob(id2);
await broker.cleanUpProvingJobState(id2);

expect(database.deleteProvingJobAndResult).toHaveBeenCalledWith(id1);
expect(database.deleteProvingJobAndResult).toHaveBeenCalledWith(id2);
Expand Down
26 changes: 19 additions & 7 deletions yarn-project/prover-client/src/proving_broker/proving_broker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -159,15 +159,20 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer {
return promiseWithResolvers.promise;
}

public async removeAndCancelProvingJob(id: ProvingJobId): Promise<void> {
this.logger.info(`Cancelling job id=${id}`);
await this.database.deleteProvingJobAndResult(id);

public async cancelProvingJob(id: ProvingJobId): Promise<void> {
// notify listeners of the cancellation
if (!this.resultsCache.has(id)) {
this.promises.get(id)?.resolve({ status: 'rejected', reason: 'Aborted' });
this.logger.info(`Cancelling job id=${id}`);
await this.reportProvingJobError(id, 'Aborted', false);
}
}

public async cleanUpProvingJobState(id: ProvingJobId): Promise<void> {
if (!this.resultsCache.has(id)) {
throw new Error(`Can't cancel busy proving job: id=${id}`);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WDYT about not rethrowing and instead just logging the error here? Otherwise, if we go into the Failed to cache proving job branch in the caching facade, we won't have the result in the cache.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question! The cache gets reaped once the prover finishes the current epoch or starts working on the next one

await this.proverCacheManager.removeStaleCaches(epochNumber);

}

await this.database.deleteProvingJobAndResult(id);
this.jobsCache.delete(id);
this.promises.delete(id);
this.resultsCache.delete(id);
Expand Down Expand Up @@ -254,8 +259,10 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer {
return;
}

this.logger.debug(
`Marking proving job id=${id} type=${ProvingRequestType[item.type]} totalAttempts=${retries + 1} as failed`,
this.logger.warn(
`Marking proving job as failed id=${id} type=${ProvingRequestType[item.type]} totalAttempts=${
retries + 1
} err=${err}`,
);

await this.database.setProvingJobError(id, err);
Expand All @@ -281,6 +288,11 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer {
return filter ? this.getProvingJob(filter) : Promise.resolve(undefined);
}

if (this.resultsCache.has(id)) {
this.logger.warn(`Proving job id=${id} has already been completed`);
return filter ? this.getProvingJob(filter) : Promise.resolve(undefined);
}

const metadata = this.inProgress.get(id);
const now = this.timeSource();
if (!metadata) {
Expand Down
3 changes: 2 additions & 1 deletion yarn-project/prover-client/src/proving_broker/rpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ const GetProvingJobResponse = z.object({
export const ProvingJobProducerSchema: ApiSchemaFor<ProvingJobProducer> = {
enqueueProvingJob: z.function().args(ProvingJob).returns(z.void()),
getProvingJobStatus: z.function().args(ProvingJobId).returns(ProvingJobStatus),
removeAndCancelProvingJob: z.function().args(ProvingJobId).returns(z.void()),
cleanUpProvingJobState: z.function().args(ProvingJobId).returns(z.void()),
cancelProvingJob: z.function().args(ProvingJobId).returns(z.void()),
waitForJobToSettle: z.function().args(ProvingJobId).returns(ProvingJobSettledResult),
};

Expand Down
7 changes: 5 additions & 2 deletions yarn-project/prover-client/src/test/mock_prover.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,11 @@ export class TestBroker implements ProvingJobProducer {
getProvingJobStatus(id: ProvingJobId): Promise<ProvingJobStatus> {
return this.broker.getProvingJobStatus(id);
}
removeAndCancelProvingJob(id: ProvingJobId): Promise<void> {
return this.broker.removeAndCancelProvingJob(id);
cleanUpProvingJobState(id: ProvingJobId): Promise<void> {
return this.broker.cleanUpProvingJobState(id);
}
cancelProvingJob(id: string): Promise<void> {
return this.broker.cancelProvingJob(id);
}
waitForJobToSettle(id: ProvingJobId): Promise<ProvingJobSettledResult> {
return this.broker.waitForJobToSettle(id);
Expand Down
Loading