From b4d082d0dd533a37c9bae3a023382931a8660809 Mon Sep 17 00:00:00 2001 From: rymnc <43716372+rymnc@users.noreply.github.com> Date: Fri, 8 Sep 2023 12:10:14 +0530 Subject: [PATCH] fix(rln-relay): missed roots during sync --- .../test_rln_group_manager_onchain.nim | 19 +++++++++ .../group_manager/on_chain/group_manager.nim | 39 +++++++------------ 2 files changed, 34 insertions(+), 24 deletions(-) diff --git a/tests/waku_rln_relay/test_rln_group_manager_onchain.nim b/tests/waku_rln_relay/test_rln_group_manager_onchain.nim index 600b3e1b72..75a80bd305 100644 --- a/tests/waku_rln_relay/test_rln_group_manager_onchain.nim +++ b/tests/waku_rln_relay/test_rln_group_manager_onchain.nim @@ -193,6 +193,8 @@ suite "Onchain group manager": manager.initialized manager.rlnContractDeployedBlockNumber > 0 + await manager.stop() + asyncTest "should error on initialization when loaded metadata does not match": let manager = await setup() await manager.init() @@ -220,12 +222,14 @@ suite "Onchain group manager": await manager.init() await manager.startGroupSync() + await manager.stop() asyncTest "startGroupSync: should guard against uninitialized state": let manager = await setup() expect(ValueError): await manager.startGroupSync() + await manager.stop() asyncTest "startGroupSync: should sync to the state of the group": let manager = await setup() @@ -241,6 +245,7 @@ suite "Onchain group manager": proc generateCallback(fut: Future[void]): OnRegisterCallback = proc callback(registrations: seq[Membership]): Future[void] {.async.} = + debug "registrations", registrations require: registrations.len == 1 registrations[0].idCommitment == credentials.idCommitment @@ -262,6 +267,7 @@ suite "Onchain group manager": check: merkleRootBefore != merkleRootAfter + await manager.stop() asyncTest "startGroupSync: should fetch history correctly": let manager = await setup() @@ -303,6 +309,7 @@ suite "Onchain group manager": check: merkleRootBefore != merkleRootAfter manager.validRootBuffer.len() == credentialCount - AcceptableRootWindowSize + await manager.stop() asyncTest "register: should guard against uninitialized state": let manager = await setup() @@ -310,6 +317,7 @@ suite "Onchain group manager": expect(ValueError): await manager.register(dummyCommitment) + await manager.stop() asyncTest "register: should register successfully": let manager = await setup() @@ -329,6 +337,7 @@ suite "Onchain group manager": check: merkleRootAfter.inHex() != merkleRootBefore.inHex() manager.latestIndex == 1 + await manager.stop() asyncTest "register: callback is called": let manager = await setup() @@ -354,6 +363,7 @@ suite "Onchain group manager": check: manager.rlnInstance.getMetadata().get().validRoots == manager.validRoots.toSeq() + await manager.stop() asyncTest "withdraw: should guard against uninitialized state": let manager = await setup() @@ -361,6 +371,7 @@ suite "Onchain group manager": expect(ValueError): await manager.withdraw(idSecretHash) + await manager.stop() asyncTest "validateRoot: should validate good root": let manager = await setup() @@ -402,6 +413,7 @@ suite "Onchain group manager": check: validated + await manager.stop() asyncTest "validateRoot: should reject bad root": let manager = await setup() @@ -432,6 +444,7 @@ suite "Onchain group manager": check: validated == false + await manager.stop() asyncTest "verifyProof: should verify valid proof": let manager = await setup() @@ -474,6 +487,7 @@ suite "Onchain group manager": check: verifiedRes.get() + await manager.stop() asyncTest "verifyProof: should reject invalid proof": let manager = await setup() @@ -510,6 +524,7 @@ suite "Onchain group manager": check: verifiedRes.get() == false + await manager.stop() asyncTest "backfillRootQueue: should backfill roots in event of chain reorg": let manager = await setup() @@ -554,6 +569,7 @@ suite "Onchain group manager": manager.validRoots.len() == credentialCount - 1 manager.validRootBuffer.len() == 0 manager.validRoots[credentialCount - 2] == expectedLastRoot + await manager.stop() asyncTest "isReady should return false if ethRpc is none": var manager = await setup() @@ -563,6 +579,7 @@ suite "Onchain group manager": check: (await manager.isReady()) == false + await manager.stop() asyncTest "isReady should return false if lastSeenBlockHead > lastProcessed": var manager = await setup() @@ -570,6 +587,7 @@ suite "Onchain group manager": check: (await manager.isReady()) == false + await manager.stop() asyncTest "isReady should return true if ethRpc is ready": var manager = await setup() @@ -579,6 +597,7 @@ suite "Onchain group manager": check: (await manager.isReady()) == true + await manager.stop() ################################ diff --git a/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim b/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim index f5375f0066..f273d03d6c 100644 --- a/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim +++ b/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim @@ -315,12 +315,7 @@ proc getAndHandleEvents(g: OnchainGroupManager, initializedGuard(g) proc getLatestBlockNumber(): BlockNumber = if toBlock.isSome(): - # if toBlock = 0, that implies the latest block - # which is the case when we are syncing block-by-block - # therefore, toBlock = fromBlock + 1 - # if toBlock != 0, then we are chunking blocks - # therefore, toBlock = fromBlock + blockChunkSize (which is handled) - return max(fromBlock + 1, toBlock.get()) + return toBlock.get() return fromBlock let blockTable = await g.getBlockTable(fromBlock, toBlock) @@ -337,11 +332,12 @@ proc getAndHandleEvents(g: OnchainGroupManager, proc getNewHeadCallback(g: OnchainGroupManager): BlockHeaderHandler = proc newHeadCallback(blockheader: BlockHeader) {.gcsafe.} = - let latestBlock = blockheader.number.uint + let latestBlock = BlockNumber(blockheader.number) trace "block received", blockNumber = latestBlock # get logs from the last block try: - asyncSpawn g.getAndHandleEvents(latestBlock) + asyncSpawn g.getAndHandleEvents(min(g.latestProcessedBlock, latestBlock), + some(latestBlock)) except CatchableError: warn "failed to handle log: ", error=getCurrentExceptionMsg() return newHeadCallback @@ -368,28 +364,23 @@ proc startOnchainSync(g: OnchainGroupManager): Future[void] {.async.} = let blockChunkSize = 2_000 var fromBlock = if g.latestProcessedBlock > g.rlnContractDeployedBlockNumber: - info "resuming onchain sync from block", fromBlock = g.latestProcessedBlock g.latestProcessedBlock + 1 else: - info "starting onchain sync from deployed block number", deployedBlockNumber = g.rlnContractDeployedBlockNumber g.rlnContractDeployedBlockNumber - let latestBlock = cast[BlockNumber](await ethRpc.provider.eth_blockNumber()) try: # we always want to sync from last processed block => latest - if fromBlock == BlockNumber(0) or - fromBlock + BlockNumber(blockChunkSize) < latestBlock: - # chunk events - while true: - let currentLatestBlock = cast[BlockNumber](await g.ethRpc.get().provider.eth_blockNumber()) - let toBlock = min(fromBlock + BlockNumber(blockChunkSize), currentLatestBlock) - info "chunking events", fromBlock = fromBlock, toBlock = toBlock - await g.getAndHandleEvents(fromBlock, some(toBlock)) - fromBlock = toBlock + 1 - if fromBlock >= currentLatestBlock: - break - else: - await g.getAndHandleEvents(fromBlock, some(BlockNumber(0))) + # chunk events + while true: + let currentLatestBlock = cast[BlockNumber](await ethRpc.provider.eth_blockNumber()) + if fromBlock >= currentLatestBlock: + break + + let toBlock = min(fromBlock + BlockNumber(blockChunkSize), currentLatestBlock) + debug "fetching events", fromBlock = fromBlock, toBlock = toBlock + await g.getAndHandleEvents(fromBlock, some(toBlock)) + fromBlock = toBlock + 1 + except CatchableError: raise newException(ValueError, "failed to get the history/reconcile missed blocks: " & getCurrentExceptionMsg())