Skip to content

Commit

Permalink
chore: catch up note processors could be synced more efficiently (Azt…
Browse files Browse the repository at this point in the history
…ecProtocol#3933)

In this PR, when catching up note processors, instead of processing them
sequentially, we sort them to know the oldest block we need, then start
at that point and fetch `limit` blocks.

When we have the blocks, we provide them to all processors that need it.
  • Loading branch information
just-mitch authored Jan 11, 2024
1 parent bdeb10c commit df54f33
Show file tree
Hide file tree
Showing 8 changed files with 127 additions and 62 deletions.
7 changes: 5 additions & 2 deletions yarn-project/foundation/src/fifo/memory_fifo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,18 @@ export class MemoryFifo<T> {
/**
* Put an item onto back of the queue.
* @param item - The item to enqueue.
* @returns A boolean indicating whether the item was successfully added to the queue.
*/
public put(item: T) {
public put(item: T): boolean {
if (this.flushing) {
this.log.warn('Discarding item because queue is flushing');
return;
return false;
} else if (this.waiting.length) {
this.waiting.shift()!(item);
return true;
} else {
this.items.push(item);
return true;
}
}

Expand Down
7 changes: 5 additions & 2 deletions yarn-project/foundation/src/fifo/serial_queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,18 +54,21 @@ export class SerialQueue {
* Enqueues fn for execution on the serial queue.
* Returns the result of the function after execution.
* @param fn - The function to enqueue.
* @returns A resolution promise.
* @returns A resolution promise. Rejects if the function does, or if the function could not be enqueued.
*/
public put<T>(fn: () => Promise<T>): Promise<T> {
return new Promise((resolve, reject) => {
this.queue.put(async () => {
const accepted = this.queue.put(async () => {
try {
const res = await fn();
resolve(res);
} catch (e) {
reject(e);
}
});
if (!accepted) {
reject(new Error('Could not enqueue function'));
}
});
}

Expand Down
9 changes: 0 additions & 9 deletions yarn-project/pxe/src/pxe_service/pxe_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ export class PXEService implements PXE {
// serialize synchronizer and calls to simulateTx.
// ensures that state is not changed while simulating
private jobQueue = new SerialQueue();
private running = false;

constructor(
private keyStore: KeyStore,
Expand Down Expand Up @@ -105,7 +104,6 @@ export class PXEService implements PXE {
await this.restoreNoteProcessors();
const info = await this.getNodeInfo();
this.log.info(`Started PXE connected to chain ${info.chainId} version ${info.protocolVersion}`);
this.running = true;
}

private async restoreNoteProcessors() {
Expand Down Expand Up @@ -355,9 +353,6 @@ export class PXEService implements PXE {
if (txRequest.functionData.isInternal === undefined) {
throw new Error(`Unspecified internal are not allowed`);
}
if (!this.running) {
throw new Error('PXE Service is not running');
}

// all simulations must be serialized w.r.t. the synchronizer
return await this.jobQueue.put(async () => {
Expand Down Expand Up @@ -398,10 +393,6 @@ export class PXEService implements PXE {
to: AztecAddress,
_from?: AztecAddress,
): Promise<DecodedReturn> {
if (!this.running) {
throw new Error('PXE Service is not running');
}

// all simulations must be serialized w.r.t. the synchronizer
return await this.jobQueue.put(async () => {
// TODO - Should check if `from` has the permission to call the view function.
Expand Down
71 changes: 50 additions & 21 deletions yarn-project/pxe/src/synchronizer/synchronizer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,47 +91,76 @@ describe('Synchronizer', () => {
});

it('note processor successfully catches up', async () => {
const block = L2Block.random(1, 4);
const blocks = [L2Block.random(1, 4), L2Block.random(2, 4)];

aztecNode.getBlocks
// called by synchronizer.work
.mockResolvedValueOnce([L2Block.fromFields(omit(blocks[0], 'newEncryptedLogs', 'newUnencryptedLogs'))])
.mockResolvedValueOnce([L2Block.fromFields(omit(blocks[1], 'newEncryptedLogs', 'newUnencryptedLogs'))])
// called by synchronizer.workNoteProcessorCatchUp
.mockResolvedValueOnce([L2Block.fromFields(omit(blocks[0], 'newEncryptedLogs', 'newUnencryptedLogs'))])
.mockResolvedValueOnce([L2Block.fromFields(omit(blocks[1], 'newEncryptedLogs', 'newUnencryptedLogs'))]);

// getBlocks is called by both synchronizer.work and synchronizer.workNoteProcessorCatchUp
aztecNode.getBlocks.mockResolvedValue([L2Block.fromFields(omit(block, 'newEncryptedLogs', 'newUnencryptedLogs'))]);
aztecNode.getLogs
.mockResolvedValueOnce([block.newEncryptedLogs!]) // called by synchronizer.work
.mockResolvedValueOnce([block.newUnencryptedLogs!]) // called by synchronizer.work
.mockResolvedValueOnce([block.newEncryptedLogs!]); // called by synchronizer.workNoteProcessorCatchUp
// called by synchronizer.work
.mockResolvedValueOnce([blocks[0].newEncryptedLogs!])
.mockResolvedValueOnce([blocks[0].newUnencryptedLogs!])
.mockResolvedValueOnce([blocks[1].newEncryptedLogs!])
.mockResolvedValueOnce([blocks[1].newUnencryptedLogs!])
// called by synchronizer.workNoteProcessorCatchUp
.mockResolvedValueOnce([blocks[0].newEncryptedLogs!])
.mockResolvedValueOnce([blocks[1].newEncryptedLogs!]);

// Sync the synchronizer so that note processor has something to catch up to
await synchronizer.work();
aztecNode.getBlockNumber.mockResolvedValue(INITIAL_L2_BLOCK_NUM + 1);

// Used in synchronizer.isAccountStateSynchronized
aztecNode.getBlockNumber.mockResolvedValueOnce(1);
// Sync the synchronizer so that note processor has something to catch up to
// There are two blocks, and we have a limit of 1 block per work call
await synchronizer.work(1);
expect(await synchronizer.isGlobalStateSynchronized()).toBe(false);
await synchronizer.work(1);
expect(await synchronizer.isGlobalStateSynchronized()).toBe(true);

// Manually adding account to database so that we can call synchronizer.isAccountStateSynchronized
const keyStore = new TestKeyStore(new Grumpkin(), await AztecLmdbStore.create(EthAddress.random()));
const privateKey = GrumpkinScalar.random();
await keyStore.addAccount(privateKey);
const completeAddress = CompleteAddress.fromPrivateKeyAndPartialAddress(privateKey, Fr.random());
await database.addCompleteAddress(completeAddress);
const addAddress = async (startingBlockNum: number) => {
const privateKey = GrumpkinScalar.random();
await keyStore.addAccount(privateKey);
const completeAddress = CompleteAddress.fromPrivateKeyAndPartialAddress(privateKey, Fr.random());
await database.addCompleteAddress(completeAddress);
synchronizer.addAccount(completeAddress.publicKey, keyStore, startingBlockNum);
return completeAddress;
};

const [completeAddressA, completeAddressB, completeAddressC] = await Promise.all([
addAddress(INITIAL_L2_BLOCK_NUM),
addAddress(INITIAL_L2_BLOCK_NUM),
addAddress(INITIAL_L2_BLOCK_NUM + 1),
]);

await synchronizer.workNoteProcessorCatchUp();

// Add the account which will add the note processor to the synchronizer
synchronizer.addAccount(completeAddress.publicKey, keyStore, INITIAL_L2_BLOCK_NUM);
expect(await synchronizer.isAccountStateSynchronized(completeAddressA.address)).toBe(false);
expect(await synchronizer.isAccountStateSynchronized(completeAddressB.address)).toBe(false);
expect(await synchronizer.isAccountStateSynchronized(completeAddressC.address)).toBe(false);

await synchronizer.workNoteProcessorCatchUp();

expect(await synchronizer.isAccountStateSynchronized(completeAddress.address)).toBe(true);
expect(await synchronizer.isAccountStateSynchronized(completeAddressA.address)).toBe(true);
expect(await synchronizer.isAccountStateSynchronized(completeAddressB.address)).toBe(true);
expect(await synchronizer.isAccountStateSynchronized(completeAddressC.address)).toBe(true);
});
});

class TestSynchronizer extends Synchronizer {
public work() {
return super.work();
public work(limit = 1) {
return super.work(limit);
}

public initialSync(): Promise<void> {
return super.initialSync();
}

public workNoteProcessorCatchUp(): Promise<boolean> {
return super.workNoteProcessorCatchUp();
public workNoteProcessorCatchUp(limit = 1): Promise<boolean> {
return super.workNoteProcessorCatchUp(limit);
}
}
87 changes: 63 additions & 24 deletions yarn-project/pxe/src/synchronizer/synchronizer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -159,27 +159,45 @@ export class Synchronizer {
}

/**
* Catch up a note processor that is lagging behind the main sync,
* Catch up note processors that are lagging behind the main sync.
* e.g. because we just added a new account.
*
* @param limit - the maximum number of encrypted, unencrypted logs and blocks to fetch in each iteration.
* @returns true if there could be more work, false if we're caught up or there was an error.
* @returns true if there could be more work, false if there was an error which allows a retry with delay.
*/
protected async workNoteProcessorCatchUp(limit = 1): Promise<boolean> {
const noteProcessor = this.noteProcessorsToCatchUp[0];
const toBlockNumber = this.getSynchedBlockNumber();

if (noteProcessor.status.syncedToBlock >= toBlockNumber) {
// Note processor already synched, nothing to do
this.noteProcessorsToCatchUp.shift();
this.noteProcessors.push(noteProcessor);
// could be more work if there are more note processors to catch up
// filter out note processors that are already caught up
// and sort them by the block number they are lagging behind in ascending order
this.noteProcessorsToCatchUp = this.noteProcessorsToCatchUp.filter(noteProcessor => {
if (noteProcessor.status.syncedToBlock >= toBlockNumber) {
// Note processor is ahead of main sync, nothing to do
this.noteProcessors.push(noteProcessor);
return false;
}
return true;
});

if (!this.noteProcessorsToCatchUp.length) {
// No note processors to catch up, nothing to do here,
// but we return true to continue with the normal flow.
return true;
}

const from = noteProcessor.status.syncedToBlock + 1;
// create a copy so that:
// 1. we can modify the original array while iterating over it
// 2. we don't need to serialize insertions into the array
const catchUpGroup = this.noteProcessorsToCatchUp
.slice()
// sort by the block number they are lagging behind
.sort((a, b) => a.status.syncedToBlock - b.status.syncedToBlock);

// grab the note processor that is lagging behind the most
const from = catchUpGroup[0].status.syncedToBlock + 1;
// Ensuring that the note processor does not sync further than the main sync.
limit = Math.min(limit, toBlockNumber - from + 1);
// this.log(`Catching up ${catchUpGroup.length} note processors by up to ${limit} blocks starting at block ${from}`);

if (limit < 1) {
throw new Error(`Unexpected limit ${limit} for note processor catch up`);
Expand Down Expand Up @@ -209,22 +227,43 @@ export class Synchronizer {
const blockContexts = blocks.map(block => new L2BlockContext(block));

const logCount = L2BlockL2Logs.getTotalLogCount(encryptedLogs);
this.log(`Forwarding ${logCount} encrypted logs and blocks to note processor in catch up mode`);
await noteProcessor.process(blockContexts, encryptedLogs);

if (noteProcessor.status.syncedToBlock === toBlockNumber) {
// Note processor caught up, move it to `noteProcessors` from `noteProcessorsToCatchUp`.
this.log(`Note processor for ${noteProcessor.publicKey.toString()} has caught up`, {
eventName: 'note-processor-caught-up',
publicKey: noteProcessor.publicKey.toString(),
duration: noteProcessor.timer.ms(),
dbSize: this.db.estimateSize(),
...noteProcessor.stats,
} satisfies NoteProcessorCaughtUpStats);
this.noteProcessorsToCatchUp.shift();
this.noteProcessors.push(noteProcessor);
this.log(`Forwarding ${logCount} encrypted logs and blocks to note processors in catch up mode`);

for (const noteProcessor of catchUpGroup) {
// find the index of the first block that the note processor is not yet synced to
const index = blockContexts.findIndex(block => block.block.number > noteProcessor.status.syncedToBlock);
if (index === -1) {
// Due to the limit, we might not have fetched a new enough block for the note processor.
// And since the group is sorted, we break as soon as we find a note processor
// that needs blocks newer than the newest block we fetched.
break;
}

this.log.debug(
`Catching up note processor ${noteProcessor.publicKey.toString()} by processing ${
blockContexts.length - index
} blocks`,
);
await noteProcessor.process(blockContexts.slice(index), encryptedLogs.slice(index));

if (noteProcessor.status.syncedToBlock === toBlockNumber) {
// Note processor caught up, move it to `noteProcessors` from `noteProcessorsToCatchUp`.
this.log(`Note processor for ${noteProcessor.publicKey.toString()} has caught up`, {
eventName: 'note-processor-caught-up',
publicKey: noteProcessor.publicKey.toString(),
duration: noteProcessor.timer.ms(),
dbSize: this.db.estimateSize(),
...noteProcessor.stats,
} satisfies NoteProcessorCaughtUpStats);

this.noteProcessorsToCatchUp = this.noteProcessorsToCatchUp.filter(
np => !np.publicKey.equals(noteProcessor.publicKey),
);
this.noteProcessors.push(noteProcessor);
}
}
return true;

return true; // could be more work, immediately continue syncing
} catch (err) {
this.log.error(`Error in synchronizer workNoteProcessorCatchUp`, err);
return false;
Expand Down
2 changes: 1 addition & 1 deletion yarn-project/types/src/l2_block_source.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { EthAddress } from '@aztec/circuits.js';

import { L2Block } from './l2_block.js';
import { L2Tx } from './l2_tx.js';
import { TxHash } from './tx/index.js';
import { TxHash } from './tx/tx_hash.js';

/**
* Interface of classes allowing for the retrieval of L2 blocks.
Expand Down
4 changes: 2 additions & 2 deletions yarn-project/types/src/notes/extended_note.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { AztecAddress, Fr } from '@aztec/circuits.js';
import { BufferReader } from '@aztec/foundation/serialize';

import { Note } from '../logs/index.js';
import { TxHash } from '../tx/index.js';
import { Note } from '../logs/l1_note_payload/note.js';
import { TxHash } from '../tx/tx_hash.js';

/**
* A note with contextual data.
Expand Down
2 changes: 1 addition & 1 deletion yarn-project/types/src/tx/tx_receipt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { AztecAddress } from '@aztec/foundation/aztec-address';
import { Fr } from '@aztec/foundation/fields';

import { ContractData } from '../contract_data.js';
import { ExtendedNote } from '../notes/index.js';
import { ExtendedNote } from '../notes/extended_note.js';
import { PublicDataWrite } from '../public_data_write.js';
import { TxHash } from './tx_hash.js';

Expand Down

0 comments on commit df54f33

Please sign in to comment.