Skip to content

Commit

Permalink
refactor: archiver store & synch
Browse files Browse the repository at this point in the history
  • Loading branch information
alexghr committed Jan 15, 2024
1 parent fd1f619 commit 9eae4b7
Show file tree
Hide file tree
Showing 34 changed files with 1,244 additions and 1,150 deletions.
5 changes: 3 additions & 2 deletions yarn-project/archiver/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@
"@aztec/circuits.js": "workspace:^",
"@aztec/ethereum": "workspace:^",
"@aztec/foundation": "workspace:^",
"@aztec/kv-store": "workspace:^",
"@aztec/l1-artifacts": "workspace:^",
"@types/lodash.omit": "^4.5.7",
"@aztec/types": "workspace:^",
"debug": "^4.3.4",
"lmdb": "^2.9.1",
"lodash.omit": "^4.5.0",
"tsc-watch": "^6.0.0",
"tslib": "^2.5.0",
Expand All @@ -52,6 +52,7 @@
"@jest/globals": "^29.5.0",
"@types/debug": "^4.1.7",
"@types/jest": "^29.5.0",
"@types/lodash.omit": "^4.5.7",
"@types/node": "^18.15.11",
"@types/ws": "^8.5.4",
"concurrently": "^8.0.1",
Expand Down
34 changes: 20 additions & 14 deletions yarn-project/archiver/src/archiver/archiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ export class Archiver implements L2BlockSource, L2LogsSource, ContractDataSource
*
*/

// ********** Events that are processed in between blocks **********
// ********** Events that are processed per L1 block **********

// Process l1ToL2Messages, these are consumed as time passes, not each block
const retrievedPendingL1ToL2Messages = await retrieveNewPendingL1ToL2Messages(
Expand All @@ -189,24 +189,29 @@ export class Archiver implements L2BlockSource, L2LogsSource, ContractDataSource
currentL1BlockNumber,
);

// TODO (#717): optimize this - there could be messages in confirmed that are also in pending.
// Or messages in pending that are also cancelled in the same block. No need to modify storage for them.
const messagesByBlock = new Map<bigint, [L1ToL2Message[], Fr[]]>();
for (const [message, blockNumber] of retrievedPendingL1ToL2Messages.retrievedData) {
const messages = messagesByBlock.get(blockNumber) || [[], []];
messages[0].push(message);
messagesByBlock.set(blockNumber, messages);
}

if (retrievedPendingL1ToL2Messages.retrievedData.length) {
// Store l1 to l2 messages
this.log(`Adding ${retrievedPendingL1ToL2Messages.retrievedData.length} pending l1 to l2 messages to store`);
await this.store.addPendingL1ToL2Messages(retrievedPendingL1ToL2Messages.retrievedData);
for (const [messageKey, blockNumber] of retrievedCancelledL1ToL2Messages.retrievedData) {
const messages = messagesByBlock.get(blockNumber) || [[], []];
messages[1].push(messageKey);
messagesByBlock.set(blockNumber, messages);
}

if (retrievedCancelledL1ToL2Messages.retrievedData.length) {
// remove cancelled messages from the pending message store:
const l1Blocks = Array.from(messagesByBlock.keys()).sort((a, b) => Number(a - b));
for (const l1Block of l1Blocks) {
const [newMessages, cancelledMessages] = messagesByBlock.get(l1Block)!;
this.log(
`Removing ${retrievedCancelledL1ToL2Messages.retrievedData.length} pending l1 to l2 messages from store where messages were cancelled`,
`Adding ${newMessages.length} new messages and ${cancelledMessages.length} cancelled messages in L1 block ${l1Block}`,
);
await this.store.cancelPendingL1ToL2Messages(retrievedCancelledL1ToL2Messages.retrievedData);
await this.store.processL1ToL2Messages(newMessages, cancelledMessages, l1Block);
}

// ********** Events that are processed per block **********
// ********** Events that are processed per L2 block **********

// Read all data from chain and then write to our stores at the end
const nextExpectedL2BlockNum = BigInt((await this.store.getBlockNumber()) + 1);
Expand Down Expand Up @@ -264,9 +269,10 @@ export class Archiver implements L2BlockSource, L2LogsSource, ContractDataSource

// from retrieved L2Blocks, confirm L1 to L2 messages that have been published
// from each l2block fetch all messageKeys in a flattened array:
const messageKeysToRemove = retrievedBlocks.retrievedData.map(l2block => l2block.newL1ToL2Messages).flat();
this.log(`Confirming l1 to l2 messages in store`);
await this.store.confirmL1ToL2Messages(messageKeysToRemove);
for (const block of retrievedBlocks.retrievedData) {
await this.store.confirmL1ToL2Messages(block.newL1ToL2Messages, block.number);
}

// store retrieved L2 blocks after removing new logs information.
// remove logs to serve "lightweight" block information. Logs can be fetched separately if needed.
Expand Down
21 changes: 7 additions & 14 deletions yarn-project/archiver/src/archiver/archiver_store.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import {
CancelledL1ToL2Message,
ContractData,
ExtendedContractData,
GetUnencryptedLogsResponse,
Expand All @@ -9,7 +8,6 @@ import {
L2Tx,
LogFilter,
LogType,
PendingL1ToL2Message,
TxHash,
} from '@aztec/circuit-types';
import { Fr } from '@aztec/circuits.js';
Expand Down Expand Up @@ -56,26 +54,21 @@ export interface ArchiverDataStore {
): Promise<boolean>;

/**
* Append new pending L1 to L2 messages to the store.
* @param messages - The L1 to L2 messages to be added to the store.
* @returns True if the operation is successful.
*/
addPendingL1ToL2Messages(messages: PendingL1ToL2Message[]): Promise<boolean>;

/**
* Remove pending L1 to L2 messages from the store (if they were cancelled).
* @param message - The message keys to be removed from the store.
* @returns True if the operation is successful.
* Processes new messages from an L1 block.
* @param newMessages - New L1 to L2 messages to be added to the store.
* @param cancelledMessages - Cancelled L1 to L2 messages to be removed from the store.
* @param l1BlockNumber - The block number of the L1 block that contains the messages.
*/
cancelPendingL1ToL2Messages(message: CancelledL1ToL2Message[]): Promise<boolean>;
processL1ToL2Messages(newMessages: L1ToL2Message[], cancelledMessages: Fr[], l1BlockNumber: bigint): Promise<boolean>;

/**
* Messages that have been published in an L2 block are confirmed.
* Add them to the confirmed store, also remove them from the pending store.
* @param messageKeys - The message keys to be removed from the store.
* @param l2BlockNumber - The block number of the L2 block that contains the messages.
* @returns True if the operation is successful.
*/
confirmL1ToL2Messages(messageKeys: Fr[]): Promise<boolean>;
confirmL1ToL2Messages(messageKeys: Fr[], l2BlockNumber: number): Promise<boolean>;

/**
* Gets up to `limit` amount of pending L1 to L2 messages, sorted by fee
Expand Down
162 changes: 53 additions & 109 deletions yarn-project/archiver/src/archiver/archiver_store_test_suite.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
import {
CancelledL1ToL2Message,
ExtendedContractData,
INITIAL_L2_BLOCK_NUM,
L1ToL2Message,
L2Block,
L2BlockContext,
LogId,
LogType,
PendingL1ToL2Message,
TxHash,
UnencryptedL2Log,
} from '@aztec/circuit-types';
Expand Down Expand Up @@ -151,66 +149,48 @@ export function describeArchiverDataStore(testName: string, getStore: () => Arch

describe('addPendingL1ToL2Messages', () => {
it('stores pending L1 to L2 messages', async () => {
await expect(
store.addPendingL1ToL2Messages([new PendingL1ToL2Message(L1ToL2Message.random(Fr.random()), 1n, 0)]),
).resolves.toEqual(true);
await expect(store.processL1ToL2Messages([L1ToL2Message.random(Fr.random())], [], 1n)).resolves.toEqual(true);
});

it('allows duplicate pending messages in different positions in the same block', async () => {
it('allows duplicate pending messages in the same block', async () => {
const message = L1ToL2Message.random(Fr.random());
await expect(
store.addPendingL1ToL2Messages([
new PendingL1ToL2Message(message, 1n, 0),
new PendingL1ToL2Message(message, 1n, 1),
]),
).resolves.toEqual(true);

await expect(store.processL1ToL2Messages([message, message], [], 1n)).resolves.toEqual(true);
await expect(store.getPendingL1ToL2MessageKeys(2)).resolves.toEqual([message.entryKey!, message.entryKey!]);
});

it('allows duplicate pending messages in different blocks', async () => {
const message = L1ToL2Message.random(Fr.random());
await expect(
store.addPendingL1ToL2Messages([
new PendingL1ToL2Message(message, 1n, 0),
new PendingL1ToL2Message(message, 2n, 0),
]),
).resolves.toEqual(true);

await expect(store.processL1ToL2Messages([message], [], 1n)).resolves.toEqual(true);
await expect(store.processL1ToL2Messages([message], [], 2n)).resolves.toEqual(true);
await expect(store.getPendingL1ToL2MessageKeys(2)).resolves.toEqual([message.entryKey!, message.entryKey!]);
});

it('is idempotent', async () => {
const message = L1ToL2Message.random(Fr.random());
await expect(
store.addPendingL1ToL2Messages([
new PendingL1ToL2Message(message, 1n, 0),
new PendingL1ToL2Message(message, 1n, 0),
]),
).resolves.toEqual(true);
await expect(store.processL1ToL2Messages([message], [], 1n)).resolves.toEqual(true);
await expect(store.processL1ToL2Messages([message], [], 1n)).resolves.toEqual(false);
await expect(store.getPendingL1ToL2MessageKeys(2)).resolves.toEqual([message.entryKey!]);
});
});

describe('getPendingL1ToL2Messages', () => {
it('returns previously stored pending L1 to L2 messages', async () => {
const messageCtx = new PendingL1ToL2Message(L1ToL2Message.random(Fr.random()), 1n, 0);
await store.addPendingL1ToL2Messages([messageCtx]);
await expect(store.getPendingL1ToL2MessageKeys(1)).resolves.toEqual([messageCtx.message.entryKey!]);
const message = L1ToL2Message.random(Fr.random());
await store.processL1ToL2Messages([message], [], 1n);
await expect(store.getPendingL1ToL2MessageKeys(1)).resolves.toEqual([message.entryKey!]);
});

it('returns messages ordered by fee', async () => {
const messageCtxs = Array.from({ length: 3 }).map(
(_, i) => new PendingL1ToL2Message(L1ToL2Message.random(Fr.random()), 1n, i),
);
const messages = Array.from({ length: 3 }, () => L1ToL2Message.random(Fr.random()));

// add a duplicate message
messageCtxs.push(new PendingL1ToL2Message(messageCtxs[0].message, 1n, 3));
messages.push(messages[0]);

await store.addPendingL1ToL2Messages(messageCtxs);
await store.processL1ToL2Messages(messages, [], 1n);

messageCtxs.sort((a, b) => b.message.fee - a.message.fee);
await expect(store.getPendingL1ToL2MessageKeys(messageCtxs.length)).resolves.toEqual(
messageCtxs.map(({ message }) => message.entryKey!),
messages.sort((a, b) => b.fee - a.fee);
await expect(store.getPendingL1ToL2MessageKeys(messages.length)).resolves.toEqual(
messages.map(message => message.entryKey!),
);
});

Expand All @@ -221,117 +201,82 @@ export function describeArchiverDataStore(testName: string, getStore: () => Arch

describe('confirmL1ToL2Messages', () => {
it('updates a message from pending to confirmed', async () => {
const messageCtx = new PendingL1ToL2Message(L1ToL2Message.random(Fr.random()), 1n, 0);
await store.addPendingL1ToL2Messages([messageCtx]);
await expect(store.confirmL1ToL2Messages([messageCtx.message.entryKey!])).resolves.toEqual(true);
const message = L1ToL2Message.random(Fr.random());
await store.processL1ToL2Messages([message], [], 1n);
await expect(store.confirmL1ToL2Messages([message.entryKey!], 1)).resolves.toEqual(true);
});

it('once confirmed, a message is no longer pending', async () => {
const pendingMessage = new PendingL1ToL2Message(L1ToL2Message.random(Fr.random()), 1n, 0);
await store.addPendingL1ToL2Messages([pendingMessage]);
await store.confirmL1ToL2Messages([pendingMessage.message.entryKey!]);
const pendingMessage = L1ToL2Message.random(Fr.random());
await store.processL1ToL2Messages([pendingMessage], [], 1n);
await store.confirmL1ToL2Messages([pendingMessage.entryKey!], 1);
await expect(store.getPendingL1ToL2MessageKeys(1)).resolves.toEqual([]);
});

it('once confirmed a message can also be pending if added again', async () => {
const message = L1ToL2Message.random(Fr.random());
await store.addPendingL1ToL2Messages([new PendingL1ToL2Message(message, 1n, 0)]);
await store.confirmL1ToL2Messages([message.entryKey!]);
await store.addPendingL1ToL2Messages([new PendingL1ToL2Message(message, 2n, 0)]);
await store.processL1ToL2Messages([message], [], 1n);
await store.confirmL1ToL2Messages([message.entryKey!], 1);
await store.processL1ToL2Messages([message], [], 2n);
await expect(store.getPendingL1ToL2MessageKeys(2)).resolves.toEqual([message.entryKey!]);
});

it('once confirmed a message can remain pending if more of it were pending', async () => {
const message = L1ToL2Message.random(Fr.random());
await store.addPendingL1ToL2Messages([
new PendingL1ToL2Message(message, 1n, 0),
new PendingL1ToL2Message(message, 1n, 1),
]);

await store.confirmL1ToL2Messages([message.entryKey!]);
await store.processL1ToL2Messages([message, message], [], 1n);
await store.confirmL1ToL2Messages([message.entryKey!], 1);
await expect(store.getPendingL1ToL2MessageKeys(1)).resolves.toEqual([message.entryKey!]);
});
});

describe('cancelL1ToL2Messages', () => {
it('cancels a pending message', async () => {
it('cancels a pending message in the same block', async () => {
const message = L1ToL2Message.random(Fr.random());
await store.addPendingL1ToL2Messages([new PendingL1ToL2Message(message, 1n, 0)]);
await store.cancelPendingL1ToL2Messages([new CancelledL1ToL2Message(message.entryKey!, 1n, 0)]);
await store.processL1ToL2Messages([message], [message.entryKey!], 1n);
await expect(store.getPendingL1ToL2MessageKeys(1)).resolves.toEqual([]);
});

it('cancels only one of the pending messages if duplicates exist', async () => {
it('cancels only one of the pending messages if duplicates exist in the same block', async () => {
const message = L1ToL2Message.random(Fr.random());
await store.addPendingL1ToL2Messages([
new PendingL1ToL2Message(message, 1n, 0),
new PendingL1ToL2Message(message, 1n, 1),
]);
await store.cancelPendingL1ToL2Messages([new CancelledL1ToL2Message(message.entryKey!, 2n, 0)]);
await store.processL1ToL2Messages([message, message], [message.entryKey!], 1n);
await expect(store.getPendingL1ToL2MessageKeys(2)).resolves.toEqual([message.entryKey]);
});

it('once canceled a message can also be pending if added again', async () => {
it('cancels multiple pending messages in the same block', async () => {
const message = L1ToL2Message.random(Fr.random());
await store.addPendingL1ToL2Messages([new PendingL1ToL2Message(message, 1n, 0)]);

await store.cancelPendingL1ToL2Messages([new CancelledL1ToL2Message(message.entryKey!, 1n, 0)]);
await expect(store.getPendingL1ToL2MessageKeys(1)).resolves.toEqual([]);

await store.addPendingL1ToL2Messages([new PendingL1ToL2Message(message, 2n, 0)]);
await expect(store.getPendingL1ToL2MessageKeys(1)).resolves.toEqual([message.entryKey!]);
await store.processL1ToL2Messages([message, message], [message.entryKey!, message.entryKey!], 1n);
await expect(store.getPendingL1ToL2MessageKeys(2)).resolves.toEqual([]);
});

it('allows adding and cancelling in the same block', async () => {
it('cancels a pending message in different blocks', async () => {
const message = L1ToL2Message.random(Fr.random());
await store.addPendingL1ToL2Messages([new PendingL1ToL2Message(message, 1n, 0)]);
await store.cancelPendingL1ToL2Messages([new CancelledL1ToL2Message(message.entryKey!, 1n, 0)]);
await store.processL1ToL2Messages([message], [], 1n);
await store.processL1ToL2Messages([], [message.entryKey!], 2n);
await expect(store.getPendingL1ToL2MessageKeys(1)).resolves.toEqual([]);
});

it('allows duplicates cancellations in different positions in the same block', async () => {
it('cancels pending messages added in a previous block', async () => {
const message = L1ToL2Message.random(Fr.random());
await store.addPendingL1ToL2Messages([
new PendingL1ToL2Message(message, 1n, 0),
new PendingL1ToL2Message(message, 1n, 1),
]);

await store.cancelPendingL1ToL2Messages([
new CancelledL1ToL2Message(message.entryKey!, 2n, 0),
new CancelledL1ToL2Message(message.entryKey!, 2n, 1),
]);

await expect(store.getPendingL1ToL2MessageKeys(2)).resolves.toEqual([]);
await store.processL1ToL2Messages([message, message], [], 1n);
await store.processL1ToL2Messages([], [message.entryKey!], 2n);
await expect(store.getPendingL1ToL2MessageKeys(2)).resolves.toEqual([message.entryKey]);
});

it('allows duplicates cancellations in different blocks', async () => {
it('cancels multiple pending messages added in a previous block', async () => {
const message = L1ToL2Message.random(Fr.random());
await store.addPendingL1ToL2Messages([
new PendingL1ToL2Message(message, 1n, 0),
new PendingL1ToL2Message(message, 1n, 1),
]);

await store.cancelPendingL1ToL2Messages([
new CancelledL1ToL2Message(message.entryKey!, 2n, 0),
new CancelledL1ToL2Message(message.entryKey!, 3n, 0),
]);

await store.processL1ToL2Messages([message, message], [], 1n);
await store.processL1ToL2Messages([], [message.entryKey!, message.entryKey!], 2n);
await expect(store.getPendingL1ToL2MessageKeys(2)).resolves.toEqual([]);
});

it('is idempotent', async () => {
it('once canceled a message can also be pending if added again', async () => {
const message = L1ToL2Message.random(Fr.random());
await store.addPendingL1ToL2Messages([
new PendingL1ToL2Message(message, 1n, 0),
new PendingL1ToL2Message(message, 1n, 1),
]);
await store.processL1ToL2Messages([message], [message.entryKey!], 1n);

await store.cancelPendingL1ToL2Messages([
new CancelledL1ToL2Message(message.entryKey!, 2n, 0),
new CancelledL1ToL2Message(message.entryKey!, 2n, 0),
]);
await expect(store.getPendingL1ToL2MessageKeys(1)).resolves.toEqual([]);

await expect(store.getPendingL1ToL2MessageKeys(2)).resolves.toEqual([message.entryKey!]);
await store.processL1ToL2Messages([message], [], 2n);
await expect(store.getPendingL1ToL2MessageKeys(1)).resolves.toEqual([message.entryKey!]);
});
});

Expand Down Expand Up @@ -392,10 +337,9 @@ export function describeArchiverDataStore(testName: string, getStore: () => Arch
const secondContract = ExtendedContractData.random(block.newContractData[1]);
await store.addExtendedContractData([secondContract], block.number);

await expect(store.getExtendedContractDataInBlock(block.number)).resolves.toEqual([
firstContract,
secondContract,
]);
await expect(store.getExtendedContractDataInBlock(block.number)).resolves.toEqual(
expect.arrayContaining([firstContract, secondContract]),
);
});
});

Expand Down
Loading

0 comments on commit 9eae4b7

Please sign in to comment.