From 9443e8ea62237201342f111d846d321612fa2bb3 Mon Sep 17 00:00:00 2001
From: Santiago Palladino
Date: Fri, 6 Dec 2024 12:08:28 -0300
Subject: [PATCH] chore: Batch archiver requests (#10442)

Instead of downloading every block and keeping them all in memory, the archiver now downloads in batches of a configurable size (100 blocks by default) and stores each batch as it is retrieved. Blocks within a batch are also downloaded in parallel once their logs have been retrieved.

The following is `info`-level logging from the archiver with a batch size of 5 (for testing) and an artificial 1s delay for fetching logs. Notice the timestamps:

```
[11:10:38.855] INFO: archiver Starting archiver sync to rollup contract 0x2b09a171300b3820c14f2faea559acc9270787de from L1 block 20 to current L1 block 265
[11:10:41.907] INFO: archiver Downloaded L2 block 2 blockHash: "0x0d088ff05560afa67b8a1a9d7099310ba3268426dd4a488bcfdff6fdef59607f" blockNumber: 2
[11:10:41.907] INFO: archiver Downloaded L2 block 1 blockHash: "0x150e8994ed59731daaed645e9e3d1687d7eb3648d08eca027f3dacf2deafdd71" blockNumber: 1
[11:10:43.957] INFO: archiver Downloaded L2 block 3 blockHash: "0x1b0fe9562164a3e081dede1a47765be7f7f1d53842af58b4240686dad4270878" blockNumber: 3
[11:10:43.958] INFO: archiver Downloaded L2 block 5 blockHash: "0x27be6c799c01cd97c203c5141deae0f9a1eb694fc5aaf57a06a90bff9d910916" blockNumber: 5
[11:10:43.959] INFO: archiver Downloaded L2 block 4 blockHash: "0x22904cda410acba1ed66b180bbd0057b2a2cf61564267ac87ba77a237f961986" blockNumber: 4
[11:10:43.959] INFO: archiver Downloaded L2 block 6 blockHash: "0x12857e4319f643ad58f7e2a95470b482b166897038d23263a97c277dbeb82b54" blockNumber: 6
[11:10:44.991] INFO: archiver Downloaded L2 block 7 blockHash: "0x2e82579f660cae4a1c38c80f781d4dc03adf97aa8098b2d6e4157b56c79e415b" blockNumber: 7
[11:10:44.991] INFO: archiver Downloaded L2 block 9 blockHash: "0x01848df8c72f6c51e23e5aa898bdae14a381ff4de7a28e592b0529e1a114a3af" blockNumber: 9
[11:10:44.991] INFO: archiver Downloaded L2 block 8 blockHash: "0x29ca2d4898d1a540d46f45e9a20c029d6765167e031ff7d51701be109189f8ae" blockNumber: 8
[11:10:44.992] INFO: archiver Downloaded L2 block 10 blockHash: "0x0b286a06ffef588628974f94c376829a51f36aa59668f766c1bc4896f0690e75" blockNumber: 10
[11:10:46.027] INFO: archiver Downloaded L2 block 11 blockHash: "0x2a05ceb6fd3c9400a73480b4bf796dbdd8f46cbb35e37e1165e50beb514bb3a4" blockNumber: 11
[11:10:46.027] INFO: archiver Downloaded L2 block 14 blockHash: "0x179ad2642bce681dabbf4a277ef2cd5e1f7b6812bd9fac9358a6346187b81eb6" blockNumber: 14
[11:10:46.027] INFO: archiver Downloaded L2 block 12 blockHash: "0x2242c7eaa8133409069ca6c6f466b08672f0c3308a0ce89c4306908f1ebd2f4d" blockNumber: 12
[11:10:46.028] INFO: archiver Downloaded L2 block 13 blockHash: "0x19070934599ca22124e158f29e3476fd69575773dc987cd93aa3dc25341c2ce0" blockNumber: 13
[11:10:47.060] INFO: archiver Downloaded L2 block 18 blockHash: "0x0b3b73690622ab3110b633d306e77e7ba17dfc52727e47177f274211e85faa5c" blockNumber: 18
[11:10:47.060] INFO: archiver Downloaded L2 block 15 blockHash: "0x1641ce6f8bf798f9e9adb91b74fc0f1a13b822156309e5c9b4139b705ca65854" blockNumber: 15
[11:10:47.060] INFO: archiver Downloaded L2 block 16 blockHash: "0x11d802bf4c814458d463479420dc88fc5fa0ccd114bab910f23a8e028d6af907" blockNumber: 16
[11:10:47.061] INFO: archiver Downloaded L2 block 17 blockHash: "0x2f16f9fe977fd0202b46b494f35a313353eb90eb91402925110422419ada04f3" blockNumber: 17
[11:10:48.094] INFO: archiver Downloaded L2 block 21 blockHash: "0x2bd6a5e4433f87eb5290c0d9daef97ab0819137e67a1b83a2d232c95f887bac5" blockNumber: 21
[11:10:48.094] INFO: archiver Downloaded L2 block 22 blockHash: "0x0fba866a869b819e181f95743047998e6f9d516fc4c40247e72ca76ecf6cafe3" blockNumber: 22
[11:10:48.094] INFO: archiver Downloaded L2 block 20 blockHash: "0x168ca8474eef62fc291eb9c1e73973ffc2b820961d9285b00df3666d617b8f8e" blockNumber: 20
[11:10:48.095] INFO: archiver Downloaded L2 block 19 blockHash: "0x1aa0fdd5d089aae60efb7c1f27d4dff86ba6c0d0d5021e2cec30e94d957df26d" blockNumber: 19
[11:10:49.127] INFO: archiver Downloaded L2 block 25 blockHash: "0x00d65386773e5786f39883d9ce9905a17ab77abe30d1567bec168793ba5af1b8" blockNumber: 25
[11:10:49.128] INFO: archiver Downloaded L2 block 23 blockHash: "0x26fe1e7196f76d6b4ae0342b878ec21525e44ab6b9038d8014039807decf6ab5" blockNumber: 23
[11:10:49.128] INFO: archiver Downloaded L2 block 26 blockHash: "0x052ba994594f33ffa0a3801547351d8af8e1657888ff175381180b801041452f" blockNumber: 26
[11:10:49.128] INFO: archiver Downloaded L2 block 24 blockHash: "0x086bacb80c17ca96234eb727e931195916f6fb5fb5b145d8f96559394dde9e56" blockNumber: 24
[11:10:49.129] INFO: archiver Downloaded L2 block 22 blockHash: "0x0fba866a869b819e181f95743047998e6f9d516fc4c40247e72ca76ecf6cafe3" blockNumber: 22
[11:10:50.172] INFO: archiver Downloaded L2 block 30 blockHash: "0x091c1f8bcaf206eae58921ff2859320ed9e7822a82a95ff99fcc60fca0614a42" blockNumber: 30
[11:10:50.173] INFO: archiver Downloaded L2 block 29 blockHash: "0x2614dc5fc6408580f564b057fac6c3afa2d0e8fd2be428a3481d12ae89ade7e9" blockNumber: 29
[11:10:50.173] INFO: archiver Downloaded L2 block 27 blockHash: "0x140a16edfa828dc786246cb3a834c3103538a19c79d21f6298c4090c49fa8e0e" blockNumber: 27
[11:10:50.173] INFO: archiver Downloaded L2 block 28 blockHash: "0x1d7793e2df27dfad0f8a41bbb4d13e1701be133ecc117b2c8a2c02abb51dd513" blockNumber: 28
[11:10:50.173] INFO: archiver Downloaded L2 block 26 blockHash: "0x052ba994594f33ffa0a3801547351d8af8e1657888ff175381180b801041452f" blockNumber: 26
[11:10:51.207] INFO: archiver Downloaded L2 block 30 blockHash: "0x091c1f8bcaf206eae58921ff2859320ed9e7822a82a95ff99fcc60fca0614a42" blockNumber: 30
[11:10:51.207] INFO: archiver Downloaded L2 block 32 blockHash: "0x204264d3fe9f82b61bb1405bb02cfc59299178dfdebeba75e0691597fd59c000" blockNumber: 32
[11:10:51.208] INFO: archiver Downloaded L2 block 34 blockHash: "0x164d62ba59c01e9a29740d4c83368e15de8bb5a5b8621e8bdd1d0e0978e99d79" blockNumber: 34
[11:10:51.208] INFO: archiver Downloaded L2 block 33 blockHash: "0x007dcb5015c13006cabda887a0e023899e5c327a340179dbd3ec56edc58e146a" blockNumber: 33
[11:10:51.208] INFO: archiver Downloaded L2 block 31 blockHash: "0x00ad45cc4b2b7f0916bd86c11d12628321715b43e204662d15c2c174285ff608" blockNumber: 31
[11:10:53.242] INFO: archiver Downloaded L2 block 34 blockHash: "0x164d62ba59c01e9a29740d4c83368e15de8bb5a5b8621e8bdd1d0e0978e99d79" blockNumber: 34
[11:10:53.243] INFO: archiver Downloaded L2 block 35 blockHash: "0x19252e14d4fd8af5e47651cf31abaeb310a6a59c9754d642fba30ac6853fd076" blockNumber: 35
[11:10:53.244] INFO: archiver Downloaded L2 block 37 blockHash: "0x17ee8d4ba85eb5b2f6a0bcb70115bdd94c90c1dc8fc89f1c9d2418ab944395ac" blockNumber: 37
[11:10:53.244] INFO: archiver Downloaded L2 block 36 blockHash: "0x0b0b0f4694fe81717cb30f59f5dc5d6dc5b632c25832ac424eea074d0c3c19a7" blockNumber: 36
[11:10:55.285] INFO: archiver Downloaded L2 block 41 blockHash: "0x2f7f979d0bae806fcd0bfaed001c1286ae569ecf85aa4063b0906f2097dd5c4c" blockNumber: 41
[11:10:55.285] INFO: archiver Downloaded L2 block 38 blockHash: "0x016418f770c61aa2e649433ab59c2a124acb8e10ac395bb3e1f8dbf84f405039" blockNumber: 38
[11:10:55.286] INFO: archiver Downloaded L2 block 39 blockHash: "0x1588ad911757f334ad2e969588f7fe149bde4a49c763c636af0c6ffb4f475660" blockNumber: 39
[11:10:55.287] INFO: archiver Downloaded L2 block 40 blockHash: "0x06e69f6ba327f6acb88de656058f68eb4998dbee0fec04cf891aabf3b047b071" blockNumber: 40
[11:10:57.327] INFO: archiver Downloaded L2 block 42 blockHash: "0x1ea461af6e875e1bb91dd729f196245ac54b2652799172bba10f77fa2a2fb248" blockNumber: 42
[11:10:57.328] INFO: archiver Downloaded L2 block 45 blockHash: "0x1f7e13643d2286e77dc8de2e37c73d230b77f4cbd7e11fdb9eb7b1c136fe180a" blockNumber: 45
[11:10:57.328] INFO: archiver Downloaded L2 block 43 blockHash: "0x0d89250055f022310144c869ecfdf8da7099fd9bd5e5680687a64b8ad7e588ce" blockNumber: 43
[11:10:57.329] INFO: archiver Downloaded L2 block 44 blockHash: "0x2212d3ae031d4549789c39a403f91f4f06e4e031244700dbb56b381d7730c203" blockNumber: 44
[11:10:59.363] INFO: archiver Downloaded L2 block 46 blockHash: "0x04028bca36e3e6644ceef2425c636ba594e73f4e50d673766e6d4ce8e8b256fb" blockNumber: 46
[11:10:59.364] INFO: archiver Downloaded L2 block 49 blockHash: "0x035b4fc20e48c41bc850e3a4c3c17f4d2004ef3704c660275b63c04925a7e139" blockNumber: 49
[11:10:59.364] INFO: archiver Downloaded L2 block 48 blockHash: "0x2993838786ed7f76d994e9e86f8801657666857b2e46ba9c874c9512a701d112" blockNumber: 48
[11:10:59.365] INFO: archiver Downloaded L2 block 47 blockHash: "0x075078e4ad9c68625585d02bc501bf5645c7cd6601c91090daa88915c8c97a8d" blockNumber: 47
[11:11:00.405] INFO: archiver Downloaded L2 block 53 blockHash: "0x17bd954fbe9ed7c3c376fb340c302dd97297284ebc5c3a5c65f1693a5b26fa0a" blockNumber: 53
[11:11:00.405] INFO: archiver Downloaded L2 block 50 blockHash: "0x0c95eeb66fa8e7704940d81faa1f6d084bc38dc7ffd7a697f1344c50507088dc" blockNumber: 50
[11:11:00.406] INFO: archiver Downloaded L2 block 52 blockHash: "0x2841bc4f270bbc36e31336bbb1e22d90de24134beb16568eda8352b80d364b3b" blockNumber: 52
[11:11:00.406] INFO: archiver Downloaded L2 block 51 blockHash: "0x2dd14899c2e30294e35e795d267a36890f5971d2e265e59645ed5df9acf1b336" blockNumber: 51
[11:11:01.460] INFO: archiver Downloaded L2 block 57 blockHash: "0x1b55c1c5375a29d7f5793c19af6fbf0a5b8183eab97695298e0b1eafd61693b5" blockNumber: 57
[11:11:01.461] INFO: archiver Downloaded L2 block 55 blockHash: "0x19635ee907e378546db1a15810db50b5e77f526d68a19040c482d5ccefb0127e" blockNumber: 55
[11:11:01.461] INFO: archiver Downloaded L2 block 56 blockHash: "0x0847cee0bce17dface027efbb368ac35773f372fef20d9a4afa0947f216ac20d" blockNumber: 56
[11:11:01.463] INFO: archiver Downloaded L2 block 54 blockHash: "0x29043b57317ba3f06994a6638d2185b88b0b1f868e4ccbaf510671a951a3f424" blockNumber: 54
[11:11:02.517] INFO: archiver Downloaded L2 block 58 blockHash: "0x00e95202a2906628cba2c0061519b064757b7a7884e6a8a584b08cfc3baf3825" blockNumber: 58
[11:11:02.518] INFO: archiver Downloaded L2 block 61 blockHash: "0x1b3425cb6f5e199f6f5e0848d3f741a8980fef5fb5fff3a142163c460726dfa9" blockNumber: 61
[11:11:02.518] INFO: archiver Downloaded L2 block 59 blockHash: "0x1e80799672dbfd1fad11e0607cc063e9c0749e8f8b82474f0d579ceb66d5db91" blockNumber: 59
[11:11:02.519] INFO: archiver Downloaded L2 block 60 blockHash: "0x2212c79084a2793836b5fdf446bdd2c4def00d68ec64ebae90b64a16664c6b30" blockNumber: 60
[11:11:03.572] INFO: archiver Downloaded L2 block 62 blockHash: "0x0fd061e3ee9e44a697d4fe81160897cda6df121937b385db7d0b24900ef62a23" blockNumber: 62
[11:11:03.573] INFO: archiver Downloaded L2 block 63 blockHash: "0x2dffde0e911c1f41427d1122cbf6dd28823560c826381094c7f6f26c01f97271" blockNumber: 63
[11:11:03.574] INFO: archiver Downloaded L2 block 65 blockHash: "0x228f48d523eafdc673152ae8e9510eaf7bb856390a0c3d43d8cca97eda0ed217" blockNumber: 65
[11:11:03.574] INFO: archiver Downloaded L2 block 64 blockHash: "0x2bcd0a71d39f8d2e5a290608d8c719e20b264e62227c68152fcd5541f753e25e" blockNumber: 64
[11:11:04.638] INFO: archiver Downloaded L2 block 69 blockHash: "0x1cb98870e6ffbab13afb58add2b024ef29cf9345cced66cd8f0f8f1e0e675ac1" blockNumber: 69
[11:11:04.639] INFO: archiver Downloaded L2 block 67 blockHash: "0x03c0de0563d63f8365e58d34f3ff55f50c85cafdbf97d31b4e698be2720483b9" blockNumber: 67
[11:11:04.640] INFO: archiver Downloaded L2 block 66 blockHash: "0x164d1b0a7ff82e959330bb7a9af22e9f3bf9aa85351891a108cd87586fba048d" blockNumber: 66
[11:11:04.640] INFO: archiver Downloaded L2 block 68 blockHash: "0x0aa4502055e2ee2f44548ece76c8829fdf79b5a514bf40ae238ff62685f8a23b" blockNumber: 68
[11:11:04.641] INFO: archiver Downloaded L2 block 65 blockHash: "0x228f48d523eafdc673152ae8e9510eaf7bb856390a0c3d43d8cca97eda0ed217" blockNumber: 65
[11:11:05.672] INFO: archiver Downloaded L2 block 71 blockHash: "0x2b86465312009ffea5dbc436e97b47e11817068520b3d7f04c1117fe6e969630" blockNumber: 71
[11:11:05.672] INFO: archiver Downloaded L2 block 70 blockHash: "0x2951e898e112dcde4d91d1ca32ef23522baed0aa064a53864f7ea87770b44b32" blockNumber: 70
[11:11:05.672] INFO: archiver Downloaded L2 block 69 blockHash: "0x1cb98870e6ffbab13afb58add2b024ef29cf9345cced66cd8f0f8f1e0e675ac1" blockNumber: 69
[11:11:05.673] INFO: archiver Downloaded L2 block 73 blockHash: "0x2c95db4f54323cb8ef5e9668570752a237b3b176c9d298e3cae650909593514d" blockNumber: 73
[11:11:05.673] INFO: archiver Downloaded L2 block 72 blockHash: "0x210e66af85dc45471729d1ed625eef96ece7a15f54a0dfb18aac0ee35721ec1c" blockNumber: 72
[11:11:06.721] INFO: archiver Downloaded L2 block 77 blockHash: "0x15f71bdd862b0d0016d1001a70b912cf32321e7347e02c3827a64f1bee139922" blockNumber: 77
[11:11:06.721] INFO: archiver Downloaded L2 block 73 blockHash: "0x2c95db4f54323cb8ef5e9668570752a237b3b176c9d298e3cae650909593514d" blockNumber: 73
[11:11:06.722] INFO: archiver Downloaded L2 block 74 blockHash: "0x29fda4905834eb581d02541456bb48653d29e89790ab1203a348cb0d663d4636" blockNumber: 74
[11:11:06.722] INFO: archiver Downloaded L2 block 75 blockHash: "0x1f4118d232a014a3c7776cac40fd1dc1baf219e811726002f715e9abe1120f09" blockNumber: 75
[11:11:06.722] INFO: archiver Downloaded L2 block 76 blockHash: "0x0342ab801e16f9146b109adb704b3d442b45877bf4cf0f38ecbe65e53e9ff447" blockNumber: 76
[11:11:07.767] INFO: archiver Downloaded L2 block 77 blockHash: "0x15f71bdd862b0d0016d1001a70b912cf32321e7347e02c3827a64f1bee139922" blockNumber: 77
[11:11:07.767] INFO: archiver Downloaded L2 block 79 blockHash: "0x1897627bbe054c2dbf1c19062c50b4bab839fc4f119d0683af625c9ab6eef20a" blockNumber: 79
[11:11:07.767] INFO: archiver Downloaded L2 block 81 blockHash: "0x0f502ab8cdb5ab5dcc5dd194e53775c4e512ffdd0645ea363ab52921072c2003" blockNumber: 81
[11:11:07.768] INFO: archiver Downloaded L2 block 78 blockHash: "0x263a83bf5c2410135a08fb48b0323fad959742c5d3d0376b11c55009e0961003" blockNumber: 78
[11:11:07.768] INFO: archiver Downloaded L2 block 80 blockHash: "0x15669498212e68da4e015a4eb90fe1b9404f7e33fa922b242b2c3982c2ac975a" blockNumber: 80
[11:11:08.806] INFO: archiver Downloaded L2 block 81 blockHash: "0x0f502ab8cdb5ab5dcc5dd194e53775c4e512ffdd0645ea363ab52921072c2003" blockNumber: 81
[11:11:08.807] INFO: archiver Downloaded L2 block 82 blockHash: "0x24f6959cf5f228e97ff2934de0b68ee6bdd4b0bc9cef9f6fb60da7e60b354dfb" blockNumber: 82
[11:11:08.811] INFO: archiver Updated proven chain to block 81 (epoch 7) provenBlockNumber: 81 provenEpochNumber: 7
[11:11:08.813] INFO: archiver Initial archiver sync to L1 block 265 complete.
```
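In outline, the new sync loop works as follows (a minimal sketch, not the actual implementation: `fetchBlockProposedLogs`, `downloadBlock`, `storeBatch`, and `L2BlockStandIn` are hypothetical stand-ins for the archiver internals, the real code caps parallel downloads with an async pool of size 10 rather than a bare `Promise.all`, and the same batching is applied to L1-to-L2 messages):

```typescript
// Illustrative sketch of the batched sync loop introduced in this PR.
// The helper names below are stand-ins, not the archiver's actual API.

type L1Range = [start: bigint, end: bigint];
type L2BlockStandIn = { number: bigint };

// A batch of `batchSize` L2 blocks spans roughly
// batchSize * (slotDuration / ethereumSlotDuration) L1 blocks.
function nextRange(
  end: bigint,
  limit: bigint,
  batchSize: number,
  slotDuration: number,
  ethereumSlotDuration: number,
): L1Range {
  const l1BlocksPerBatch = BigInt((batchSize * slotDuration) / ethereumSlotDuration);
  const nextStart = end + 1n;
  const nextEnd = nextStart + l1BlocksPerBatch;
  return nextEnd > limit ? [nextStart, limit] : [nextStart, nextEnd];
}

async function syncInBatches(
  syncedTo: bigint,
  currentL1Block: bigint,
  fetchBlockProposedLogs: (range: L1Range) => Promise<bigint[]>,
  downloadBlock: (l2BlockNumber: bigint) => Promise<L2BlockStandIn>,
  storeBatch: (blocks: L2BlockStandIn[]) => Promise<void>,
): Promise<void> {
  let searchEndBlock = syncedTo;
  do {
    // Each iteration handles one batch: fetch the proposal logs for the
    // L1 range, download the blocks they reference in parallel, then
    // persist the whole batch before moving on to the next range.
    const [searchStartBlock, nextEnd] = nextRange(searchEndBlock, currentL1Block, 100, 24, 12);
    searchEndBlock = nextEnd;
    const l2BlockNumbers = await fetchBlockProposedLogs([searchStartBlock, searchEndBlock]);
    const blocks = await Promise.all(l2BlockNumbers.map(downloadBlock));
    await storeBatch(blocks);
  } while (searchEndBlock < currentL1Block);
}
```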
Fixes #10428
Fixes https://github.com/AztecProtocol/aztec-packages/issues/10242
---
 yarn-project/archiver/package.json            |   2 -
 .../archiver/src/archiver/archiver.test.ts    | 205 ++++++++----------
 .../archiver/src/archiver/archiver.ts         | 194 +++++++++--------
 yarn-project/archiver/src/archiver/config.ts  |  24 +-
 .../archiver/src/archiver/data_retrieval.ts   |  29 +--
 yarn-project/archiver/src/index.ts            |  64 +-----
 yarn-project/foundation/src/config/env_var.ts |   1 +
 7 files changed, 220 insertions(+), 299 deletions(-)

diff --git a/yarn-project/archiver/package.json b/yarn-project/archiver/package.json
index 3366d4b3f1b..22579ff402e 100644
--- a/yarn-project/archiver/package.json
+++ b/yarn-project/archiver/package.json
@@ -22,8 +22,6 @@
     "formatting": "run -T prettier --check ./src && run -T eslint ./src",
     "formatting:fix": "run -T eslint --fix ./src && run -T prettier -w ./src",
     "test": "NODE_NO_WARNINGS=1 node --experimental-vm-modules ../node_modules/.bin/jest --passWithNoTests",
-    "start": "node ./dest",
-    "start:dev": "tsc-watch -p tsconfig.json --onSuccess 'yarn start'",
     "test:integration": "concurrently -k -s first -c reset,dim -n test,anvil \"yarn test:integration:run\" \"anvil\"",
     "test:integration:run": "NODE_NO_WARNINGS=1 node --experimental-vm-modules $(yarn bin jest) --no-cache --config jest.integration.config.json"
   },
diff --git a/yarn-project/archiver/src/archiver/archiver.test.ts b/yarn-project/archiver/src/archiver/archiver.test.ts
index bf8621b6e5d..87173270dc5 100644
--- a/yarn-project/archiver/src/archiver/archiver.test.ts
+++ b/yarn-project/archiver/src/archiver/archiver.test.ts
@@ -56,14 +56,32 @@ describe('Archiver', () => {
let archiver: Archiver; let blocks: L2Block[]; + let l2BlockProposedLogs: Log[]; + let l2MessageSentLogs: Log[]; + const GENESIS_ROOT = new Fr(GENESIS_ARCHIVE_ROOT).toString(); beforeEach(() => { now = +new Date(); publicClient = mock>({ + // Return a block with a reasonable timestamp getBlock: ((args: any) => ({ timestamp: args.blockNumber * BigInt(DefaultL1ContractsConfig.ethereumSlotDuration) + BigInt(now), })) as any, + // Return the logs mocked whenever the public client is queried + getLogs: ((args: any) => { + let logs = undefined; + if (args!.event!.name === 'MessageSent') { + logs = l2MessageSentLogs; + } else if (args!.event!.name === 'L2BlockProposed') { + logs = l2BlockProposedLogs; + } else { + throw new Error(`Unknown event: ${args!.event!.name}`); + } + return Promise.resolve( + logs.filter(log => log.blockNumber >= args.fromBlock && log.blockNumber <= args.toBlock), + ); + }) as any, }); instrumentation = mock({ isEnabled: () => true }); @@ -71,12 +89,17 @@ describe('Archiver', () => { archiver = new Archiver( publicClient, - rollupAddress, - inboxAddress, - registryAddress, + { rollupAddress, inboxAddress, registryAddress }, archiverStore, - 1000, + { pollingIntervalMs: 1000, batchSize: 1000 }, instrumentation, + { + l1GenesisTime: BigInt(now), + l1StartBlock: 0n, + epochDuration: 4, + slotDuration: 24, + ethereumSlotDuration: 12, + }, ); blocks = blockNumbers.map(x => L2Block.random(x, txsPerBlock, x + 1, 2)); @@ -97,6 +120,9 @@ describe('Archiver', () => { inboxRead = mock(); ((archiver as any).inbox as
any).read = inboxRead; + + l2MessageSentLogs = []; + l2BlockProposedLogs = []; }); afterEach(async () => { @@ -127,27 +153,16 @@ describe('Archiver', () => { inboxRead.totalMessagesInserted.mockResolvedValueOnce(2n).mockResolvedValueOnce(6n); - mockGetLogs({ - messageSent: [ - makeMessageSentEventWithIndexInL2BlockSubtree(98n, 1n, 0n), - makeMessageSentEventWithIndexInL2BlockSubtree(99n, 1n, 1n), - ], - L2BlockProposed: [makeL2BlockProposedEvent(101n, 1n, blocks[0].archive.root.toString())], - }); - - mockGetLogs({ - messageSent: [ - makeMessageSentEventWithIndexInL2BlockSubtree(2504n, 2n, 0n), - makeMessageSentEventWithIndexInL2BlockSubtree(2505n, 2n, 1n), - makeMessageSentEventWithIndexInL2BlockSubtree(2505n, 2n, 2n), - makeMessageSentEventWithIndexInL2BlockSubtree(2506n, 3n, 1n), - ], - L2BlockProposed: [ - makeL2BlockProposedEvent(2510n, 2n, blocks[1].archive.root.toString()), - makeL2BlockProposedEvent(2520n, 3n, blocks[2].archive.root.toString()), - ], - }); + makeMessageSentEvent(98n, 1n, 0n); + makeMessageSentEvent(99n, 1n, 1n); + makeL2BlockProposedEvent(101n, 1n, blocks[0].archive.root.toString()); + makeMessageSentEvent(2504n, 2n, 0n); + makeMessageSentEvent(2505n, 2n, 1n); + makeMessageSentEvent(2505n, 2n, 2n); + makeMessageSentEvent(2506n, 3n, 1n); + makeL2BlockProposedEvent(2510n, 2n, blocks[1].archive.root.toString()); + makeL2BlockProposedEvent(2520n, 3n, blocks[2].archive.root.toString()); publicClient.getTransaction.mockResolvedValueOnce(rollupTxs[0]); rollupTxs.slice(1).forEach(tx => publicClient.getTransaction.mockResolvedValueOnce(tx)); @@ -228,17 +243,11 @@ describe('Archiver', () => { inboxRead.totalMessagesInserted.mockResolvedValueOnce(2n).mockResolvedValueOnce(2n); - mockGetLogs({ - messageSent: [ - makeMessageSentEventWithIndexInL2BlockSubtree(66n, 1n, 0n), - makeMessageSentEventWithIndexInL2BlockSubtree(68n, 1n, 1n), - ], - L2BlockProposed: [ - makeL2BlockProposedEvent(70n, 1n, blocks[0].archive.root.toString()), - makeL2BlockProposedEvent(80n, 2n, blocks[1].archive.root.toString()), - makeL2BlockProposedEvent(90n, 3n, badArchive), - ], - }); + makeMessageSentEvent(66n, 1n, 0n); + makeMessageSentEvent(68n, 1n, 1n); + makeL2BlockProposedEvent(70n, 1n, blocks[0].archive.root.toString()); + makeL2BlockProposedEvent(80n, 2n, blocks[1].archive.root.toString()); + makeL2BlockProposedEvent(90n, 3n, badArchive); rollupTxs.forEach(tx => publicClient.getTransaction.mockResolvedValueOnce(tx)); @@ -250,8 +259,10 @@ describe('Archiver', () => { latestBlockNum = await archiver.getBlockNumber(); expect(latestBlockNum).toEqual(numL2BlocksInTest); - const errorMessage = `Archive mismatch matching, ignoring block ${3} with archive: ${badArchive}, expected ${blocks[2].archive.root.toString()}`; - expect(loggerSpy).toHaveBeenCalledWith(errorMessage); + expect(loggerSpy).toHaveBeenCalledWith(expect.stringMatching(/archive root mismatch/i), { + actual: badArchive, + expected: blocks[2].archive.root.toString(), + }); }, 10_000); it('skip event search if no changes found', async () => { @@ -271,16 +282,10 @@ describe('Archiver', () => { inboxRead.totalMessagesInserted.mockResolvedValueOnce(0n).mockResolvedValueOnce(2n); - mockGetLogs({ - messageSent: [ - makeMessageSentEventWithIndexInL2BlockSubtree(66n, 1n, 0n), - makeMessageSentEventWithIndexInL2BlockSubtree(68n, 1n, 1n), - ], - L2BlockProposed: [ - makeL2BlockProposedEvent(70n, 1n, blocks[0].archive.root.toString()), - makeL2BlockProposedEvent(80n, 2n, blocks[1].archive.root.toString()), - ], - }); + makeMessageSentEvent(66n, 1n, 
0n); + makeMessageSentEvent(68n, 1n, 1n); + makeL2BlockProposedEvent(70n, 1n, blocks[0].archive.root.toString()); + makeL2BlockProposedEvent(80n, 2n, blocks[1].archive.root.toString()); rollupTxs.forEach(tx => publicClient.getTransaction.mockResolvedValueOnce(tx)); @@ -292,10 +297,7 @@ describe('Archiver', () => { latestBlockNum = await archiver.getBlockNumber(); expect(latestBlockNum).toEqual(numL2BlocksInTest); - - // For some reason, this is 1-indexed. - expect(loggerSpy).toHaveBeenNthCalledWith(1, `Retrieved no new L1 to L2 messages between L1 blocks 1 and 50.`); - expect(loggerSpy).toHaveBeenNthCalledWith(2, `No blocks to retrieve from 1 to 50`); + expect(loggerSpy).toHaveBeenCalledWith(`No blocks to retrieve from 1 to 50`); }, 10_000); it('handles L2 reorg', async () => { @@ -328,16 +330,10 @@ describe('Archiver', () => { .mockResolvedValueOnce(2n) .mockResolvedValueOnce(2n); - mockGetLogs({ - messageSent: [ - makeMessageSentEventWithIndexInL2BlockSubtree(66n, 1n, 0n), - makeMessageSentEventWithIndexInL2BlockSubtree(68n, 1n, 1n), - ], - L2BlockProposed: [ - makeL2BlockProposedEvent(70n, 1n, blocks[0].archive.root.toString()), - makeL2BlockProposedEvent(80n, 2n, blocks[1].archive.root.toString()), - ], - }); + makeMessageSentEvent(66n, 1n, 0n); + makeMessageSentEvent(68n, 1n, 1n); + makeL2BlockProposedEvent(70n, 1n, blocks[0].archive.root.toString()); + makeL2BlockProposedEvent(80n, 2n, blocks[1].archive.root.toString()); rollupTxs.forEach(tx => publicClient.getTransaction.mockResolvedValueOnce(tx)); @@ -350,14 +346,12 @@ describe('Archiver', () => { latestBlockNum = await archiver.getBlockNumber(); expect(latestBlockNum).toEqual(numL2BlocksInTest); - // For some reason, this is 1-indexed. - expect(loggerSpy).toHaveBeenNthCalledWith(1, `Retrieved no new L1 to L2 messages between L1 blocks 1 and 50.`); - expect(loggerSpy).toHaveBeenNthCalledWith(2, `No blocks to retrieve from 1 to 50`); + expect(loggerSpy).toHaveBeenCalledWith(`No blocks to retrieve from 1 to 50`); // Lets take a look to see if we can find re-org stuff! await sleep(1000); - expect(loggerSpy).toHaveBeenNthCalledWith(9, `L2 prune has been detected.`); + expect(loggerSpy).toHaveBeenCalledWith(`L2 prune has been detected.`); // Should also see the block number be reduced latestBlockNum = await archiver.getBlockNumber(); @@ -375,57 +369,40 @@ describe('Archiver', () => { // TODO(palla/reorg): Add a unit test for the archiver handleEpochPrune xit('handles an upcoming L2 prune', () => {}); - // logs should be created in order of how archiver syncs. - const mockGetLogs = (logs: { - messageSent?: ReturnType[]; - L2BlockProposed?: ReturnType[]; - }) => { - if (logs.messageSent) { - publicClient.getLogs.mockResolvedValueOnce(logs.messageSent); - } - if (logs.L2BlockProposed) { - publicClient.getLogs.mockResolvedValueOnce(logs.L2BlockProposed); - } + /** + * Makes a fake L2BlockProposed event for testing purposes and registers it to be returned by the public client. + * @param l1BlockNum - L1 block number. + * @param l2BlockNum - L2 Block number. + */ + const makeL2BlockProposedEvent = (l1BlockNum: bigint, l2BlockNum: bigint, archive: `0x${string}`) => { + const log = { + blockNumber: l1BlockNum, + args: { blockNumber: l2BlockNum, archive }, + transactionHash: `0x${l2BlockNum}`, + } as Log; + l2BlockProposedLogs.push(log); }; -}); - -/** - * Makes a fake L2BlockProposed event for testing purposes. - * @param l1BlockNum - L1 block number. - * @param l2BlockNum - L2 Block number. - * @returns An L2BlockProposed event log. 
- */ -function makeL2BlockProposedEvent(l1BlockNum: bigint, l2BlockNum: bigint, archive: `0x${string}`) { - return { - blockNumber: l1BlockNum, - args: { blockNumber: l2BlockNum, archive }, - transactionHash: `0x${l2BlockNum}`, - } as Log; -} -/** - * Makes fake L1ToL2 MessageSent events for testing purposes. - * @param l1BlockNum - L1 block number. - * @param l2BlockNumber - The L2 block number for which the message was included. - * @param indexInSubtree - the index in the l2Block's subtree in the L1 to L2 Messages Tree. - * @returns MessageSent event logs. - */ -function makeMessageSentEventWithIndexInL2BlockSubtree( - l1BlockNum: bigint, - l2BlockNumber: bigint, - indexInSubtree: bigint, -) { - const index = indexInSubtree + InboxLeaf.smallestIndexFromL2Block(l2BlockNumber); - return { - blockNumber: l1BlockNum, - args: { - l2BlockNumber, - index, - hash: Fr.random().toString(), - }, - transactionHash: `0x${l1BlockNum}`, - } as Log; -} + /** + * Makes a fake L1ToL2 MessageSent event for testing purposes and registers it to be returned by the public client. + * @param l1BlockNum - L1 block number. + * @param l2BlockNumber - The L2 block number for which the message was included. + * @param indexInSubtree - the index in the l2Block's subtree in the L1 to L2 Messages Tree. + */ + const makeMessageSentEvent = (l1BlockNum: bigint, l2BlockNumber: bigint, indexInSubtree: bigint) => { + const index = indexInSubtree + InboxLeaf.smallestIndexFromL2Block(l2BlockNumber); + const log = { + blockNumber: l1BlockNum, + args: { + l2BlockNumber, + index, + hash: Fr.random().toString(), + }, + transactionHash: `0x${l1BlockNum}`, + } as Log; + l2MessageSentLogs.push(log); + }; +}); /** * Makes a fake rollup tx for testing purposes. diff --git a/yarn-project/archiver/src/archiver/archiver.ts b/yarn-project/archiver/src/archiver/archiver.ts index 831295da0be..83510020b72 100644 --- a/yarn-project/archiver/src/archiver/archiver.ts +++ b/yarn-project/archiver/src/archiver/archiver.ts @@ -38,7 +38,7 @@ import { Fr } from '@aztec/foundation/fields'; import { type DebugLogger, createDebugLogger } from '@aztec/foundation/log'; import { RunningPromise } from '@aztec/foundation/running-promise'; import { count } from '@aztec/foundation/string'; -import { Timer } from '@aztec/foundation/timer'; +import { elapsed } from '@aztec/foundation/timer'; import { InboxAbi, RollupAbi } from '@aztec/l1-artifacts'; import { ContractClassRegisteredEvent, @@ -61,7 +61,7 @@ import { import { type ArchiverDataStore, type ArchiverL1SynchPoint } from './archiver_store.js'; import { type ArchiverConfig } from './config.js'; -import { retrieveBlockFromRollup, retrieveL1ToL2Messages } from './data_retrieval.js'; +import { retrieveBlocksFromRollup, retrieveL1ToL2Messages } from './data_retrieval.js'; import { getEpochNumberAtTimestamp, getSlotAtTimestamp, @@ -112,25 +112,23 @@ export class Archiver implements ArchiveSource { */ constructor( private readonly publicClient: PublicClient, - private readonly rollupAddress: EthAddress, - readonly inboxAddress: EthAddress, - private readonly registryAddress: EthAddress, + private readonly l1Addresses: { rollupAddress: EthAddress; inboxAddress: EthAddress; registryAddress: EthAddress }, readonly dataStore: ArchiverDataStore, - private readonly pollingIntervalMs: number, + private readonly config: { pollingIntervalMs: number; batchSize: number }, private readonly instrumentation: ArchiverInstrumentation, - private readonly l1constants: L1RollupConstants = EmptyL1RollupConstants, + private readonly
l1constants: L1RollupConstants, private readonly log: DebugLogger = createDebugLogger('aztec:archiver'), ) { this.store = new ArchiverStoreHelper(dataStore); this.rollup = getContract({ - address: rollupAddress.toString(), + address: l1Addresses.rollupAddress.toString(), abi: RollupAbi, client: publicClient, }); this.inbox = getContract({ - address: inboxAddress.toString(), + address: l1Addresses.inboxAddress.toString(), abi: InboxAbi, client: publicClient, }); @@ -171,11 +169,12 @@ export class Archiver implements ArchiveSource { const archiver = new Archiver( publicClient, - config.l1Contracts.rollupAddress, - config.l1Contracts.inboxAddress, - config.l1Contracts.registryAddress, + config.l1Contracts, archiverStore, - config.archiverPollingIntervalMS ?? 10_000, + { + pollingIntervalMs: config.archiverPollingIntervalMS ?? 10_000, + batchSize: config.archiverBatchSize ?? 100, + }, new ArchiverInstrumentation(telemetry, () => archiverStore.estimateSize()), { l1StartBlock, l1GenesisTime, epochDuration, slotDuration, ethereumSlotDuration }, ); @@ -196,7 +195,7 @@ export class Archiver implements ArchiveSource { await this.sync(blockUntilSynced); } - this.runningPromise = new RunningPromise(() => this.safeSync(), this.pollingIntervalMs); + this.runningPromise = new RunningPromise(() => this.safeSync(), this.config.pollingIntervalMs); this.runningPromise.start(); } @@ -213,9 +212,8 @@ export class Archiver implements ArchiveSource { /** * Fetches logs from L1 contracts and processes them. - * @param blockUntilSynced - If true, blocks until the archiver has fully synced. */ - private async sync(blockUntilSynced: boolean) { + private async sync(initialRun: boolean) { /** * We keep track of three "pointers" to L1 blocks: * 1. the last L1 block that published an L2 block @@ -232,9 +230,9 @@ export class Archiver implements ArchiveSource { const { blocksSynchedTo = l1StartBlock, messagesSynchedTo = l1StartBlock } = await this.store.getSynchPoint(); const currentL1BlockNumber = await this.publicClient.getBlockNumber(); - if (blockUntilSynced) { + if (initialRun) { this.log.info( - `Starting archiver sync to rollup contract ${this.rollupAddress.toString()} from L1 block ${Math.min( + `Starting archiver sync to rollup contract ${this.l1Addresses.rollupAddress.toString()} from L1 block ${Math.min( Number(blocksSynchedTo), Number(messagesSynchedTo), )} to current L1 block ${currentL1BlockNumber}`, @@ -261,7 +259,7 @@ export class Archiver implements ArchiveSource { */ // ********** Events that are processed per L1 block ********** - await this.handleL1ToL2Messages(blockUntilSynced, messagesSynchedTo, currentL1BlockNumber); + await this.handleL1ToL2Messages(messagesSynchedTo, currentL1BlockNumber); // Store latest l1 block number and timestamp seen. Used for epoch and slots calculations. if (!this.l1BlockNumber || this.l1BlockNumber < currentL1BlockNumber) { @@ -272,7 +270,7 @@ export class Archiver implements ArchiveSource { // ********** Events that are processed per L2 block ********** if (currentL1BlockNumber > blocksSynchedTo) { // First we retrieve new L2 blocks - const { provenBlockNumber } = await this.handleL2blocks(blockUntilSynced, blocksSynchedTo, currentL1BlockNumber); + const { provenBlockNumber } = await this.handleL2blocks(blocksSynchedTo, currentL1BlockNumber); // And then we prune the current epoch if it'd reorg on next submission. 
// Note that we don't do this before retrieving L2 blocks because we may need to retrieve // blocks from more than 2 epochs ago, so we want to make sure we have the latest view of @@ -281,7 +279,7 @@ export class Archiver implements ArchiveSource { await this.handleEpochPrune(provenBlockNumber, currentL1BlockNumber); } - if (blockUntilSynced) { + if (initialRun) { this.log.info(`Initial archiver sync to L1 block ${currentL1BlockNumber} complete.`); } } @@ -311,11 +309,18 @@ export class Archiver implements ArchiveSource { } } - private async handleL1ToL2Messages( - blockUntilSynced: boolean, - messagesSynchedTo: bigint, - currentL1BlockNumber: bigint, - ) { + private nextRange(end: bigint, limit: bigint): [bigint, bigint] { + const batchSize = (this.config.batchSize * this.l1constants.slotDuration) / this.l1constants.ethereumSlotDuration; + const nextStart = end + 1n; + const nextEnd = nextStart + BigInt(batchSize); + if (nextEnd > limit) { + return [nextStart, limit]; + } + return [nextStart, nextEnd]; + } + + private async handleL1ToL2Messages(messagesSynchedTo: bigint, currentL1BlockNumber: bigint) { + this.log.trace(`Handling L1 to L2 messages from ${messagesSynchedTo} to ${currentL1BlockNumber}.`); if (currentL1BlockNumber <= messagesSynchedTo) { return; } @@ -325,30 +330,30 @@ export class Archiver implements ArchiveSource { if (localTotalMessageCount === destinationTotalMessageCount) { await this.store.setMessageSynchedL1BlockNumber(currentL1BlockNumber); - this.log.debug( + this.log.trace( `Retrieved no new L1 to L2 messages between L1 blocks ${messagesSynchedTo + 1n} and ${currentL1BlockNumber}.`, ); return; } - const retrievedL1ToL2Messages = await retrieveL1ToL2Messages( - this.inbox, - blockUntilSynced, - messagesSynchedTo + 1n, - currentL1BlockNumber, - ); - - await this.store.addL1ToL2Messages(retrievedL1ToL2Messages); - - this.log.verbose( - `Retrieved ${retrievedL1ToL2Messages.retrievedData.length} new L1 to L2 messages between L1 blocks ${ - messagesSynchedTo + 1n - } and ${currentL1BlockNumber}.`, - ); + // Retrieve messages in batches. 
Each batch is estimated to accommodate up to 'batchSize' L2 blocks. + let searchStartBlock: bigint = messagesSynchedTo; + let searchEndBlock: bigint = messagesSynchedTo; + do { + [searchStartBlock, searchEndBlock] = this.nextRange(searchEndBlock, currentL1BlockNumber); + this.log.trace(`Retrieving L1 to L2 messages between L1 blocks ${searchStartBlock} and ${searchEndBlock}.`); + const retrievedL1ToL2Messages = await retrieveL1ToL2Messages(this.inbox, searchStartBlock, searchEndBlock); + this.log.verbose( + `Retrieved ${retrievedL1ToL2Messages.retrievedData.length} new L1 to L2 messages between L1 blocks ${searchStartBlock} and ${searchEndBlock}.`, + ); + await this.store.addL1ToL2Messages(retrievedL1ToL2Messages); + for (const msg of retrievedL1ToL2Messages.retrievedData) { + this.log.debug(`Downloaded L1 to L2 message`, { leaf: msg.leaf.toString(), index: msg.index }); + } + } while (searchEndBlock < currentL1BlockNumber); } private async handleL2blocks( - blockUntilSynced: boolean, blocksSynchedTo: bigint, currentL1BlockNumber: bigint, ): Promise<{ provenBlockNumber: bigint }> { @@ -371,7 +376,10 @@ await this.store.setProvenL2BlockNumber(Number(provenBlockNumber)); // if we are here then we must have a valid proven epoch number await this.store.setProvenL2EpochNumber(Number(provenEpochNumber)); - this.log.info(`Updated proven chain`, { provenBlockNumber, provenEpochNumber }); + this.log.info(`Updated proven chain to block ${provenBlockNumber} (epoch ${provenEpochNumber})`, { + provenBlockNumber, + provenEpochNumber, + }); } this.instrumentation.updateLastProvenBlock(Number(provenBlockNumber)); }; @@ -436,56 +444,60 @@ } } - // TODO(palla/log) Downgrade to trace - this.log.debug(`Retrieving L2 blocks from L1 block ${blocksSynchedTo + 1n} to ${currentL1BlockNumber}`); - const retrievedBlocks = await retrieveBlockFromRollup( - this.rollup, - this.publicClient, - blockUntilSynced, - blocksSynchedTo + 1n, // TODO(palla/reorg): If the L2 reorg was due to an L1 reorg, we need to start search earlier - currentL1BlockNumber, - this.log, - ); + // Retrieve L2 blocks in batches. Each batch is estimated to accommodate up to 'batchSize' L2 blocks, + // computed using the L2 block time vs the L1 block time. + let searchStartBlock: bigint = blocksSynchedTo; + let searchEndBlock: bigint = blocksSynchedTo; + + do { + [searchStartBlock, searchEndBlock] = this.nextRange(searchEndBlock, currentL1BlockNumber); + + this.log.trace(`Retrieving L2 blocks from L1 block ${searchStartBlock} to ${searchEndBlock}`); + const retrievedBlocks = await retrieveBlocksFromRollup( + this.rollup, + this.publicClient, + searchStartBlock, // TODO(palla/reorg): If the L2 reorg was due to an L1 reorg, we need to start search earlier + searchEndBlock, + this.log, + ); - if (retrievedBlocks.length === 0) { - // We are not calling `setBlockSynchedL1BlockNumber` because it may cause sync issues if based off infura. - // See further details in earlier comments. - // TODO(palla/log) Downgrade to trace - this.log.debug(`Retrieved no new L2 blocks from L1 block ${blocksSynchedTo + 1n} to ${currentL1BlockNumber}`); - return { provenBlockNumber }; - } + if (retrievedBlocks.length === 0) { + // We are not calling `setBlockSynchedL1BlockNumber` because it may cause sync issues if based off infura. + // See further details in earlier comments.
+ this.log.trace(`Retrieved no new L2 blocks from L1 block ${searchStartBlock} to ${searchEndBlock}`); + continue; + } - const lastProcessedL1BlockNumber = retrievedBlocks[retrievedBlocks.length - 1].l1.blockNumber; - this.log.debug( - `Retrieved ${retrievedBlocks.length} new L2 blocks between L1 blocks ${ - blocksSynchedTo + 1n - } and ${currentL1BlockNumber} with last processed L1 block ${lastProcessedL1BlockNumber}.`, - ); + const lastProcessedL1BlockNumber = retrievedBlocks[retrievedBlocks.length - 1].l1.blockNumber; + this.log.debug( + `Retrieved ${retrievedBlocks.length} new L2 blocks between L1 blocks ${searchStartBlock} and ${searchEndBlock} with last processed L1 block ${lastProcessedL1BlockNumber}.`, + ); - for (const block of retrievedBlocks) { - this.log.debug(`Ingesting new L2 block ${block.data.number}`, { - ...block.data.header.globalVariables.toInspect(), - blockHash: block.data.hash, - l1BlockNumber: block.l1.blockNumber, - }); - } + for (const block of retrievedBlocks) { + this.log.debug(`Ingesting new L2 block ${block.data.number} with ${block.data.body.txEffects.length} txs`, { + blockHash: block.data.hash(), + l1BlockNumber: block.l1.blockNumber, + ...block.data.header.globalVariables.toInspect(), + ...block.data.getStats(), + }); + } - const timer = new Timer(); - await this.store.addBlocks(retrievedBlocks); + const [processDuration] = await elapsed(() => this.store.addBlocks(retrievedBlocks)); + this.instrumentation.processNewBlocks( + processDuration / retrievedBlocks.length, + retrievedBlocks.map(b => b.data), + ); - for (const block of retrievedBlocks) { - this.log.info(`Downloaded L2 block ${block.data.number}`, { - blockHash: block.data.hash(), - blockNumber: block.data.number, - }); - } + for (const block of retrievedBlocks) { + this.log.info(`Downloaded L2 block ${block.data.number}`, { + blockHash: block.data.hash(), + blockNumber: block.data.number, + }); + } + } while (searchEndBlock < currentL1BlockNumber); // Important that we update AFTER inserting the blocks. await updateProvenBlock(); - this.instrumentation.processNewBlocks( - timer.ms() / retrievedBlocks.length, - retrievedBlocks.map(b => b.data), - ); return { provenBlockNumber }; } @@ -503,11 +515,11 @@ export class Archiver implements ArchiveSource { } public getRollupAddress(): Promise { - return Promise.resolve(this.rollupAddress); + return Promise.resolve(this.l1Addresses.rollupAddress); } public getRegistryAddress(): Promise { - return Promise.resolve(this.registryAddress); + return Promise.resolve(this.l1Addresses.registryAddress); } public getL1BlockNumber(): bigint { @@ -1096,11 +1108,3 @@ type L1RollupConstants = { epochDuration: number; ethereumSlotDuration: number; }; - -const EmptyL1RollupConstants: L1RollupConstants = { - l1StartBlock: 0n, - l1GenesisTime: 0n, - epochDuration: 0, - slotDuration: 0, - ethereumSlotDuration: 0, -}; diff --git a/yarn-project/archiver/src/archiver/config.ts b/yarn-project/archiver/src/archiver/config.ts index 6aa953cd087..d739314d468 100644 --- a/yarn-project/archiver/src/archiver/config.ts +++ b/yarn-project/archiver/src/archiver/config.ts @@ -18,24 +18,19 @@ import { type ConfigMappingsType, getConfigFromMappings, numberConfigHelper } fr * The archiver configuration. */ export type ArchiverConfig = { - /** - * URL for an archiver service. If set, will return an archiver client as opposed to starting a new one. - */ + /** URL for an archiver service. If set, will return an archiver client as opposed to starting a new one. 
*/ archiverUrl?: string; - /** - * The polling interval in ms for retrieving new L2 blocks and encrypted logs. - */ + /** The polling interval in ms for retrieving new L2 blocks and encrypted logs. */ archiverPollingIntervalMS?: number; - /** - * The polling interval viem uses in ms - */ + /** The number of L2 blocks the archiver will attempt to download at a time. */ + archiverBatchSize?: number; + + /** The polling interval viem uses in ms */ viemPollingIntervalMS?: number; - /** - * The deployed L1 contract addresses - */ + /** The deployed L1 contract addresses */ l1Contracts: L1ContractAddresses; /** The max number of logs that can be obtained in 1 "getUnencryptedLogs" call. */ @@ -54,6 +49,11 @@ export const archiverConfigMappings: ConfigMappingsType = { description: 'The polling interval in ms for retrieving new L2 blocks and encrypted logs.', ...numberConfigHelper(1_000), }, + archiverBatchSize: { + env: 'ARCHIVER_BATCH_SIZE', + description: 'The number of L2 blocks the archiver will attempt to download at a time.', + ...numberConfigHelper(100), + }, maxLogs: { env: 'ARCHIVER_MAX_LOGS', description: 'The max number of logs that can be obtained in 1 "getUnencryptedLogs" call.', diff --git a/yarn-project/archiver/src/archiver/data_retrieval.ts b/yarn-project/archiver/src/archiver/data_retrieval.ts index 3249a5fc541..de09a98ac23 100644 --- a/yarn-project/archiver/src/archiver/data_retrieval.ts +++ b/yarn-project/archiver/src/archiver/data_retrieval.ts @@ -1,5 +1,6 @@ import { Body, InboxLeaf, L2Block } from '@aztec/circuit-types'; import { AppendOnlyTreeSnapshot, Fr, Header, Proof } from '@aztec/circuits.js'; +import { asyncPool } from '@aztec/foundation/async-pool'; import { type EthAddress } from '@aztec/foundation/eth-address'; import { type ViemSignature } from '@aztec/foundation/eth-signature'; import { type DebugLogger, createDebugLogger } from '@aztec/foundation/log'; @@ -25,16 +26,14 @@ import { type L1Published, type L1PublishedData } from './structs/published.js'; * Fetches new L2 blocks. * @param publicClient - The viem public client to use for transaction retrieval. * @param rollupAddress - The address of the rollup contract. - * @param blockUntilSynced - If true, blocks until the archiver has fully synced. * @param searchStartBlock - The block number to use for starting the search. * @param searchEndBlock - The highest block number that we should search up to. * @param expectedNextL2BlockNum - The next L2 block number that we expect to find. * @returns An array of block; as well as the next eth block to search from. 
*/ -export async function retrieveBlockFromRollup( +export async function retrieveBlocksFromRollup( rollup: GetContractReturnType>, publicClient: PublicClient, - blockUntilSynced: boolean, searchStartBlock: bigint, searchEndBlock: bigint, logger: DebugLogger = createDebugLogger('aztec:archiver'), @@ -58,13 +57,13 @@ export async function retrieveBlockFromRollup( const lastLog = l2BlockProposedLogs[l2BlockProposedLogs.length - 1]; logger.debug( - `Got L2 block processed logs for ${l2BlockProposedLogs[0].blockNumber}-${lastLog.blockNumber} between ${searchStartBlock}-${searchEndBlock} L1 blocks`, + `Got ${l2BlockProposedLogs.length} L2 block processed logs for L2 blocks ${l2BlockProposedLogs[0].args.blockNumber}-${lastLog.args.blockNumber} between L1 blocks ${searchStartBlock}-${searchEndBlock}`, ); const newBlocks = await processL2BlockProposedLogs(rollup, publicClient, l2BlockProposedLogs, logger); retrievedBlocks.push(...newBlocks); searchStartBlock = lastLog.blockNumber! + 1n; - } while (blockUntilSynced && searchStartBlock <= searchEndBlock); + } while (searchStartBlock <= searchEndBlock); return retrievedBlocks; } @@ -82,14 +81,13 @@ export async function processL2BlockProposedLogs( logger: DebugLogger, ): Promise[]> { const retrievedBlocks: L1Published[] = []; - for (const log of logs) { + await asyncPool(10, logs, async log => { const l2BlockNumber = log.args.blockNumber!; const archive = log.args.archive!; const archiveFromChain = await rollup.read.archiveAt([l2BlockNumber]); // The value from the event and contract will match only if the block is in the chain. if (archive === archiveFromChain) { - // TODO: Fetch blocks from calldata in parallel const block = await getBlockFromRollupTx(publicClient, log.transactionHash!, l2BlockNumber); const l1: L1PublishedData = { @@ -100,11 +98,12 @@ export async function processL2BlockProposedLogs( retrievedBlocks.push({ data: block, l1 }); } else { - logger.warn( - `Archive mismatch matching, ignoring block ${l2BlockNumber} with archive: ${archive}, expected ${archiveFromChain}`, - ); + logger.warn(`Ignoring L2 block ${l2BlockNumber} due to archive root mismatch`, { + actual: archive, + expected: archiveFromChain, + }); } - } + }); return retrievedBlocks; } @@ -129,10 +128,7 @@ async function getBlockFromRollupTx( l2BlockNum: bigint, ): Promise { const { input: data } = await publicClient.getTransaction({ hash: txHash }); - const { functionName, args } = decodeFunctionData({ - abi: RollupAbi, - data, - }); + const { functionName, args } = decodeFunctionData({ abi: RollupAbi, data }); const allowedMethods = ['propose', 'proposeAndClaim']; @@ -184,7 +180,6 @@ async function getBlockFromRollupTx( */ export async function retrieveL1ToL2Messages( inbox: GetContractReturnType>, - blockUntilSynced: boolean, searchStartBlock: bigint, searchEndBlock: bigint, ): Promise> { @@ -213,7 +208,7 @@ export async function retrieveL1ToL2Messages( // handles the case when there are no new messages: searchStartBlock = (messageSentLogs.findLast(msgLog => !!msgLog)?.blockNumber || searchStartBlock) + 1n; - } while (blockUntilSynced && searchStartBlock <= searchEndBlock); + } while (searchStartBlock <= searchEndBlock); return { lastProcessedL1BlockNumber: searchStartBlock - 1n, retrievedData: retrievedL1ToL2Messages }; } diff --git a/yarn-project/archiver/src/index.ts b/yarn-project/archiver/src/index.ts index 24112863fc1..4aa32e6d591 100644 --- a/yarn-project/archiver/src/index.ts +++ b/yarn-project/archiver/src/index.ts @@ -1,62 +1,8 @@ -import { jsonStringify } 
from '@aztec/foundation/json-rpc'; -import { createDebugLogger } from '@aztec/foundation/log'; -import { fileURLToPath } from '@aztec/foundation/url'; -import { NoopTelemetryClient } from '@aztec/telemetry-client/noop'; - -import { createPublicClient, http } from 'viem'; -import { localhost } from 'viem/chains'; - -import { Archiver, getArchiverConfigFromEnv } from './archiver/index.js'; -import { ArchiverInstrumentation } from './archiver/instrumentation.js'; -import { MemoryArchiverStore } from './archiver/memory_archiver_store/memory_archiver_store.js'; - export * from './archiver/index.js'; -export * from './rpc/index.js'; export * from './factory.js'; +export * from './rpc/index.js'; -export { retrieveL2ProofVerifiedEvents, retrieveBlockFromRollup } from './archiver/data_retrieval.js'; - -const log = createDebugLogger('aztec:archiver'); - -/** - * A function which instantiates and starts Archiver. - */ -// eslint-disable-next-line require-await -async function main() { - const config = getArchiverConfigFromEnv(); - const { l1RpcUrl: rpcUrl, l1Contracts } = config; - - log.info(`Starting archiver in main(): ${jsonStringify(config)}`); - const publicClient = createPublicClient({ - chain: localhost, - transport: http(rpcUrl), - }); - - const archiverStore = new MemoryArchiverStore(1000); - - const archiver = new Archiver( - publicClient, - l1Contracts.rollupAddress, - l1Contracts.inboxAddress, - l1Contracts.registryAddress, - archiverStore, - 1000, - new ArchiverInstrumentation(new NoopTelemetryClient()), - ); - - const shutdown = async () => { - await archiver.stop(); - process.exit(0); - }; - process.once('SIGINT', shutdown); - process.once('SIGTERM', shutdown); -} - -// See https://twitter.com/Rich_Harris/status/1355289863130673153 -if (process.argv[1] === fileURLToPath(import.meta.url).replace(/\/index\.js$/, '')) { - // eslint-disable-next-line @typescript-eslint/no-floating-promises - main().catch(err => { - log.error(err); - process.exit(1); - }); -} +export { + retrieveBlocksFromRollup as retrieveBlockFromRollup, + retrieveL2ProofVerifiedEvents, +} from './archiver/data_retrieval.js'; diff --git a/yarn-project/foundation/src/config/env_var.ts b/yarn-project/foundation/src/config/env_var.ts index b12e7c3ea78..0d36c4cd300 100644 --- a/yarn-project/foundation/src/config/env_var.ts +++ b/yarn-project/foundation/src/config/env_var.ts @@ -8,6 +8,7 @@ export type EnvVar = | 'ARCHIVER_POLLING_INTERVAL_MS' | 'ARCHIVER_URL' | 'ARCHIVER_VIEM_POLLING_INTERVAL_MS' + | 'ARCHIVER_BATCH_SIZE' | 'ASSUME_PROVEN_THROUGH_BLOCK_NUMBER' | 'AZTEC_NODE_URL' | 'AZTEC_PORT'
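For operators, the batch size introduced here is controlled by the new `ARCHIVER_BATCH_SIZE` environment variable, mapped to the `archiverBatchSize` config field added above (default 100 via `numberConfigHelper`). A minimal usage sketch, assuming only the existing `getArchiverConfigFromEnv` export from this package:

```typescript
import { getArchiverConfigFromEnv } from '@aztec/archiver';

// With e.g. ARCHIVER_BATCH_SIZE=50 set in the environment, the archiver
// attempts to download at most 50 L2 blocks per batch; when the variable
// is unset, the default of 100 from the config mapping applies.
const config = getArchiverConfigFromEnv();
console.log(config.archiverBatchSize);
```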