From 2f883f88e5264964a09f6bf19bef68a410745c33 Mon Sep 17 00:00:00 2001 From: rymnc <43716372+rymnc@users.noreply.github.com> Date: Tue, 17 Jan 2023 16:35:08 +0530 Subject: [PATCH] feat(rln-relay): init group manager integration fix(rln-relay): integrate group manager. todo spam and reg handlers fix(rln-relay): decouple waku-relay and waku-rln-relay fix(rln-relay): compiles now fix(chat2): compilation fix(rln-relay): wip segfault fix(rln-relay): segfault fix(chat2|wakunode2): use optional field fix(rln-relay): wakunode test fix(rln-relay): uncomment fields in proto decode fix(rln-relay): used pragma on tests fix(rln-relay): include cred processing fix(rln-relay): add reg callback fix(rln-relay): args to mount fix(rln-relay): add timeout to waitForExit fix(rln-relay): use osproc term instead of posix kill fix(rln-relay): use poParentStream to prevent deadlock fix(rln-relay): remove poParentStream, remove ganache log output --- .gitignore | 4 +- apps/chat2/chat2.nim | 16 +- apps/wakunode2/wakunode2.nim | 2 +- tests/all_tests_v2.nim | 3 +- tests/v2/test_rln_group_manager_onchain.nim | 218 +++- tests/v2/test_rln_group_manager_static.nim | 39 +- tests/v2/test_waku_rln_relay.nim | 436 +------ tests/v2/test_waku_rln_relay_onchain.nim | 634 ---------- tests/v2/test_wakunode_rln_relay.nim | 27 +- waku/v2/node/waku_node.nim | 20 +- waku/v2/protocol/waku_rln_relay.nim | 6 +- .../waku_rln_relay/conversion_utils.nim | 46 +- .../group_manager/group_manager_base.nim | 87 +- .../group_manager/on_chain/group_manager.nim | 150 ++- .../group_manager/static/group_manager.nim | 40 +- .../waku_rln_relay/protocol_types.nim | 48 +- waku/v2/protocol/waku_rln_relay/rln_relay.nim | 421 +++++++ waku/v2/protocol/waku_rln_relay/utils.nim | 1042 ----------------- 18 files changed, 934 insertions(+), 2305 deletions(-) delete mode 100644 tests/v2/test_waku_rln_relay_onchain.nim create mode 100644 waku/v2/protocol/waku_rln_relay/rln_relay.nim delete mode 100644 waku/v2/protocol/waku_rln_relay/utils.nim diff --git a/.gitignore b/.gitignore index d4792a976b..c37a3a3790 100644 --- a/.gitignore +++ b/.gitignore @@ -37,8 +37,10 @@ node_modules/ # Ignore Jetbrains IDE files .idea/ + +# RLN / keystore rlnCredentials.txt -testPath.txt +rlnKeystore.json # Nimbus Build System nimbus-build-system.paths diff --git a/apps/chat2/chat2.nim b/apps/chat2/chat2.nim index 93e1cb5da7..eb988dae59 100644 --- a/apps/chat2/chat2.nim +++ b/apps/chat2/chat2.nim @@ -9,7 +9,7 @@ when (NimMajor, NimMinor) < (1, 4): else: {.push raises: [].} -import std/[tables, strformat, strutils, times, json, options, random] +import std/[strformat, strutils, times, json, options, random] import confutils, chronicles, chronos, stew/shims/net as stewNet, eth/keys, bearssl, stew/[byteutils, results], nimcrypto/pbkdf2 @@ -562,7 +562,7 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} = rlnRelayDynamic: conf.rlnRelayDynamic, rlnRelayPubsubTopic: conf.rlnRelayPubsubTopic, rlnRelayContentTopic: conf.rlnRelayContentTopic, - rlnRelayMembershipIndex: conf.rlnRelayMembershipIndex, + rlnRelayMembershipIndex: some(conf.rlnRelayMembershipIndex), rlnRelayEthContractAddress: conf.rlnRelayEthContractAddress, rlnRelayEthClientAddress: conf.rlnRelayEthClientAddress, rlnRelayEthAccountPrivateKey: conf.rlnRelayEthAccountPrivateKey, @@ -575,11 +575,13 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} = spamHandler=some(spamHandler), registrationHandler=some(registrationHandler)) - echo "your membership index is: ", node.wakuRlnRelay.membershipIndex - echo "your rln identity trapdoor is: ", node.wakuRlnRelay.identityCredential.idTrapdoor.inHex() - echo "your rln identity nullifier is: ", node.wakuRlnRelay.identityCredential.idNullifier.inHex() - echo "your rln identity secret hash is: ", node.wakuRlnRelay.identityCredential.idSecretHash.inHex() - echo "your rln identity commitment key is: ", node.wakuRlnRelay.identityCredential.idCommitment.inHex() + let membershipIndex = node.wakuRlnRelay.groupManager.membershipIndex.get() + let identityCredential = node.wakuRlnRelay.groupManager.idCredentials.get() + echo "your membership index is: ", membershipIndex + echo "your rln identity trapdoor is: ", identityCredential.idTrapdoor.inHex() + echo "your rln identity nullifier is: ", identityCredential.idNullifier.inHex() + echo "your rln identity secret hash is: ", identityCredential.idSecretHash.inHex() + echo "your rln identity commitment key is: ", identityCredential.idCommitment.inHex() else: info "WakuRLNRelay is disabled" if conf.rlnRelay: diff --git a/apps/wakunode2/wakunode2.nim b/apps/wakunode2/wakunode2.nim index aa42ae8800..4c7486b910 100644 --- a/apps/wakunode2/wakunode2.nim +++ b/apps/wakunode2/wakunode2.nim @@ -397,7 +397,7 @@ proc setupProtocols(node: WakuNode, conf: WakuNodeConf, rlnRelayDynamic: conf.rlnRelayDynamic, rlnRelayPubsubTopic: conf.rlnRelayPubsubTopic, rlnRelayContentTopic: conf.rlnRelayContentTopic, - rlnRelayMembershipIndex: conf.rlnRelayMembershipIndex, + rlnRelayMembershipIndex: some(conf.rlnRelayMembershipIndex), rlnRelayEthContractAddress: conf.rlnRelayEthContractAddress, rlnRelayEthClientAddress: conf.rlnRelayEthClientAddress, rlnRelayEthAccountPrivateKey: conf.rlnRelayEthAccountPrivateKey, diff --git a/tests/all_tests_v2.nim b/tests/all_tests_v2.nim index 8063669d0c..85d6394dec 100644 --- a/tests/all_tests_v2.nim +++ b/tests/all_tests_v2.nim @@ -96,7 +96,8 @@ when defined(rln): import ./v2/test_waku_rln_relay, ./v2/test_wakunode_rln_relay, - ./v2/test_waku_rln_relay_onchain + ./v2/test_rln_group_manager_onchain, + ./v2/test_rln_group_manager_static # Waku swap test suite import diff --git a/tests/v2/test_rln_group_manager_onchain.nim b/tests/v2/test_rln_group_manager_onchain.nim index d571595979..600dc5b93b 100644 --- a/tests/v2/test_rln_group_manager_onchain.nim +++ b/tests/v2/test_rln_group_manager_onchain.nim @@ -1,3 +1,5 @@ +{.used.} + when (NimMajor, NimMinor) < (1, 4): {.push raises: [Defect].} else: @@ -5,7 +7,7 @@ else: import std/[options, osproc, streams, strutils, sequtils], - stew/results, + stew/[results, byteutils], stew/shims/net as stewNet, testutils/unittests, chronos, @@ -25,8 +27,6 @@ import ./testlib/common, ./test_utils -from posix import kill, SIGINT - proc generateCredentials(rlnInstance: ptr RLN): IdentityCredential = let credRes = membershipKeyGen(rlnInstance) return credRes.get() @@ -145,20 +145,18 @@ proc runGanache(): Process = proc stopGanache(runGanache: Process) {.used.} = let ganachePID = runGanache.processID - - # We gracefully terminate Ganache daemon by sending a SIGINT signal to the runGanache PID to trigger RPC server termination and clean-up - let returnCodeSIGINT = kill(ganachePID.int32, SIGINT) - debug "Sent SIGINT to Ganache", ganachePID=ganachePID, returnCode=returnCodeSIGINT - # We wait the daemon to exit try: - let returnCodeExit = runGanache.waitForExit() - debug "Ganache daemon terminated", returnCode=returnCodeExit - debug "Ganache daemon run log", log=runGanache.outputstream.readAll() + # We terminate Ganache daemon by sending a SIGTERM signal to the runGanache PID to trigger RPC server termination and clean-up + terminate(runGanache) + # NOTE: the below line must remain commented out, otherwise it will cause a deadlocked state + # ref: https://nim-lang.org/docs/osproc.html#waitForExit%2CProcess%2Cint + # debug "ganache logs", logs=runGanache.outputstream.readAll() + debug "Sent SIGTERM to Ganache", ganachePID=ganachePID except: - error "Ganache daemon termination failed" + error "Ganache daemon termination failed: ", err = getCurrentExceptionMsg() -proc setup(): Future[OnchainGroupManager] {.async.} = +proc setup(signer = true): Future[OnchainGroupManager] {.async.} = let rlnInstanceRes = createRlnInstance() require: rlnInstanceRes.isOk() @@ -172,14 +170,16 @@ proc setup(): Future[OnchainGroupManager] {.async.} = let accounts = await web3.provider.eth_accounts() web3.defaultAccount = accounts[1] - let (pk, _) = await createEthAccount() - - let onchainConfig = OnchainGroupManagerConfig(ethClientUrl: EthClient, - ethContractAddress: $contractAddress, - ethPrivateKey: some($pk)) + var pk = none(string) + if signer: + let (privateKey, _) = await createEthAccount() + pk = some($privateKey) - let manager {.used.} = OnchainGroupManager(config: onchainConfig, - rlnInstance: rlnInstance) + let manager = OnchainGroupManager(ethClientUrl: EthClient, + ethContractAddress: $contractAddress, + ethPrivateKey: pk, + rlnInstance: rlnInstance, + saveKeystore: false) return manager @@ -192,9 +192,9 @@ suite "Onchain group manager": await manager.init() check: - manager.config.ethRpc.isSome() - manager.config.rlnContract.isSome() - manager.config.membershipFee.isSome() + manager.ethRpc.isSome() + manager.rlnContract.isSome() + manager.membershipFee.isSome() manager.initialized asyncTest "startGroupSync: should start group sync": @@ -211,9 +211,7 @@ suite "Onchain group manager": asyncTest "startGroupSync: should sync to the state of the group": let manager = await setup() - let credentials = generateCredentials(manager.rlnInstance) - manager.idCredentials = some(credentials) await manager.init() let merkleRootBeforeRes = manager.rlnInstance.getMerkleRoot() @@ -221,21 +219,22 @@ suite "Onchain group manager": merkleRootBeforeRes.isOk() let merkleRootBefore = merkleRootBeforeRes.get() - let future = newFuture[void]("startGroupSync") + let fut = newFuture[void]("startGroupSync") - proc generateCallback(fut: Future[void], idCommitment: IDCommitment): OnRegisterCallback = + proc generateCallback(fut: Future[void]): OnRegisterCallback = proc callback(registrations: seq[Membership]): Future[void] {.async.} = require: registrations.len == 1 - registrations[0].idCommitment == idCommitment + registrations[0].idCommitment == manager.idCredentials.get().idCommitment registrations[0].index == 0 fut.complete() return callback - manager.onRegister(generateCallback(future, credentials.idCommitment)) + manager.onRegister(generateCallback(fut)) + await manager.startGroupSync() - await future + await fut let merkleRootAfterRes = manager.rlnInstance.getMerkleRoot() require: @@ -260,12 +259,11 @@ suite "Onchain group manager": proc generateCallback(futs: array[0..4, Future[system.void]], credentials: seq[IdentityCredential]): OnRegisterCallback = var futureIndex = 0 proc callback(registrations: seq[Membership]): Future[void] {.async.} = - require: - registrations.len == 1 - registrations[0].idCommitment == credentials[futureIndex].idCommitment - registrations[0].index == MembershipIndex(futureIndex) - futs[futureIndex].complete() - futureIndex += 1 + if registrations.len == 1 and + registrations[0].idCommitment == credentials[futureIndex].idCommitment and + registrations[0].index == MembershipIndex(futureIndex + 1): + futs[futureIndex].complete() + futureIndex += 1 return callback manager.onRegister(generateCallback(futures, credentials)) @@ -292,7 +290,7 @@ suite "Onchain group manager": await manager.register(dummyCommitment) asyncTest "register: should register successfully": - let manager = await setup() + let manager = await setup(false) await manager.init() await manager.startGroupSync() @@ -311,9 +309,8 @@ suite "Onchain group manager": manager.latestIndex == 1 asyncTest "register: callback is called": - let manager = await setup() + let manager = await setup(false) - var callbackCalled = false let idCommitment = generateCredentials(manager.rlnInstance).idCommitment let fut = newFuture[void]() @@ -323,7 +320,6 @@ suite "Onchain group manager": registrations.len == 1 registrations[0].idCommitment == idCommitment registrations[0].index == 0 - callbackCalled = true fut.complete() manager.onRegister(callback) @@ -333,8 +329,6 @@ suite "Onchain group manager": await manager.register(idCommitment) await fut - check: - callbackCalled asyncTest "withdraw: should guard against uninitialized state": let manager = await setup() @@ -343,6 +337,146 @@ suite "Onchain group manager": expect(ValueError): await manager.withdraw(idSecretHash) + asyncTest "validateRoot: should validate good root": + let manager = await setup() + await manager.init() + + let fut = newFuture[void]() + + proc callback(registrations: seq[Membership]): Future[void] {.async.} = + if registrations.len == 1 and + registrations[0].idCommitment == manager.idCredentials.get().idCommitment and + registrations[0].index == 0: + fut.complete() + + manager.onRegister(callback) + + await manager.startGroupSync() + await fut + + let messageBytes = "Hello".toBytes() + + # prepare the epoch + let epoch = default(Epoch) + debug "epoch in bytes", epochHex = epoch.inHex() + + # generate proof + let validProofRes = manager.generateProof(data = messageBytes, + epoch = epoch) + require: + validProofRes.isOk() + let validProof = validProofRes.get() + + # validate the root (should be true) + let validated = manager.validateRoot(validProof.merkleRoot) + + check: + validated + + asyncTest "validateRoot: should reject bad root": + let manager = await setup() + await manager.init() + await manager.startGroupSync() + + let idCredential = generateCredentials(manager.rlnInstance) + + ## Assume the registration occured out of band + manager.idCredentials = some(idCredential) + manager.membershipIndex = some(MembershipIndex(0)) + + let messageBytes = "Hello".toBytes() + + # prepare the epoch + let epoch = default(Epoch) + debug "epoch in bytes", epochHex = epoch.inHex() + + # generate proof + let validProofRes = manager.generateProof(data = messageBytes, + epoch = epoch) + require: + validProofRes.isOk() + let validProof = validProofRes.get() + + # validate the root (should be false) + let validated = manager.validateRoot(validProof.merkleRoot) + + check: + validated == false + + asyncTest "verifyProof: should verify valid proof": + let manager = await setup() + await manager.init() + + let fut = newFuture[void]() + + proc callback(registrations: seq[Membership]): Future[void] {.async.} = + if registrations.len == 1 and + registrations[0].idCommitment == manager.idCredentials.get().idCommitment and + registrations[0].index == 0: + fut.complete() + + manager.onRegister(callback) + + await manager.startGroupSync() + await fut + + let messageBytes = "Hello".toBytes() + + # prepare the epoch + let epoch = default(Epoch) + debug "epoch in bytes", epochHex = epoch.inHex() + + # generate proof + let validProofRes = manager.generateProof(data = messageBytes, + epoch = epoch) + require: + validProofRes.isOk() + let validProof = validProofRes.get() + + # verify the proof (should be true) + let verifiedRes = manager.verifyProof(messageBytes, validProof) + require: + verifiedRes.isOk() + + check: + verifiedRes.get() + + asyncTest "verifyProof: should reject invalid proof": + let manager = await setup() + await manager.init() + await manager.startGroupSync() + + let idCredential = generateCredentials(manager.rlnInstance) + await manager.register(idCredential.idCommitment) + + let idCredential2 = generateCredentials(manager.rlnInstance) + + ## Assume the registration occured out of band + manager.idCredentials = some(idCredential2) + manager.membershipIndex = some(MembershipIndex(0)) + + let messageBytes = "Hello".toBytes() + + # prepare the epoch + let epoch = default(Epoch) + debug "epoch in bytes", epochHex = epoch.inHex() + + # generate proof + let invalidProofRes = manager.generateProof(data = messageBytes, + epoch = epoch) + require: + invalidProofRes.isOk() + let invalidProof = invalidProofRes.get() + + + # verify the proof (should be false) + let verifiedRes = manager.verifyProof(messageBytes, invalidProof) + require: + verifiedRes.isOk() + + check: + verifiedRes.get() == false + ################################ ## Terminating/removing Ganache ################################ diff --git a/tests/v2/test_rln_group_manager_static.nim b/tests/v2/test_rln_group_manager_static.nim index d80841b64c..ca57b13746 100644 --- a/tests/v2/test_rln_group_manager_static.nim +++ b/tests/v2/test_rln_group_manager_static.nim @@ -1,3 +1,5 @@ +{.used.} + when (NimMajor, NimMinor) < (1, 4): {.push raises: [Defect].} else: @@ -38,12 +40,10 @@ suite "Static group manager": let rlnInstance = rlnInstanceRes.get() let credentials = generateCredentials(rlnInstance, 10) - let staticConfig = StaticGroupManagerConfig(groupSize: 10, - membershipIndex: 5, - groupKeys: credentials) - - let manager {.used.} = StaticGroupManager(config: staticConfig, - rlnInstance: rlnInstance) + let manager {.used.} = StaticGroupManager(rlnInstance: rlnInstance, + groupSize: 10, + membershipIndex: some(MembershipIndex(5)), + groupKeys: credentials) asyncTest "should initialize successfully": let merkleRootBeforeRes = manager.rlnInstance.getMerkleRoot() @@ -58,34 +58,33 @@ suite "Static group manager": let merkleRootAfter = merkleRootAfterRes.get() check: manager.idCredentials.isSome() - manager.config.groupKeys.len == 10 - manager.config.groupSize == 10 - manager.config.membershipIndex == 5 - manager.config.groupKeys[5] == manager.idCredentials.get() + manager.groupKeys.len == 10 + manager.groupSize == 10 + manager.membershipIndex == some(MembershipIndex(5)) + manager.groupKeys[5] == manager.idCredentials.get() manager.latestIndex == 9 merkleRootAfter.inHex() != merkleRootBefore.inHex() asyncTest "startGroupSync: should start group sync": await manager.init() + require: + manager.validRoots.len() == 1 + manager.rlnInstance.getMerkleRoot().get() == manager.validRoots[0] await manager.startGroupSync() asyncTest "startGroupSync: should guard against uninitialized state": - let staticConfig = StaticGroupManagerConfig(groupSize: 0, - membershipIndex: 0, - groupKeys: @[]) - - let manager = StaticGroupManager(config: staticConfig, + let manager = StaticGroupManager(groupSize: 0, + membershipIndex: some(MembershipIndex(0)), + groupKeys: @[], rlnInstance: rlnInstance) expect(ValueError): await manager.startGroupSync() asyncTest "register: should guard against uninitialized state": - let staticConfig = StaticGroupManagerConfig(groupSize: 0, - membershipIndex: 0, - groupKeys: @[]) - - let manager = StaticGroupManager(config: staticConfig, + let manager = StaticGroupManager(groupSize: 0, + membershipIndex: some(MembershipIndex(0)), + groupKeys: @[], rlnInstance: rlnInstance) let dummyCommitment = default(IDCommitment) diff --git a/tests/v2/test_waku_rln_relay.nim b/tests/v2/test_waku_rln_relay.nim index 1def75f5e2..7b9b79d0aa 100644 --- a/tests/v2/test_waku_rln_relay.nim +++ b/tests/v2/test_waku_rln_relay.nim @@ -11,79 +11,14 @@ import stint, libp2p/crypto/crypto import - ../../waku/v2/node/waku_node, ../../waku/v2/protocol/waku_message, ../../waku/v2/protocol/waku_rln_relay, ../../waku/v2/protocol/waku_keystore, - ./testlib/waku2 + ./testlib/common const RlnRelayPubsubTopic = "waku/2/rlnrelay/proto" const RlnRelayContentTopic = "waku/2/rlnrelay/proto" - -suite "Waku rln relay": - - asyncTest "mount waku-rln-relay in the off-chain mode": - let - nodeKey = generateSecp256k1Key() - node = WakuNode.new(nodeKey, ValidIpAddress.init("0.0.0.0"), Port(0)) - await node.start() - - # preparing inputs to mount rln-relay - - # create a group of 100 membership keys - let memListRes = createMembershipList(100) - require: - memListRes.isOk() - - let (groupCredentials, root) = memListRes.get() - require: - groupCredentials.len == 100 - let - # convert the keys to IdentityCredential structs - groupIdCredentialsRes = groupCredentials.toIdentityCredentials() - require: - groupIdCredentialsRes.isOk() - - let - groupIdCredentials = groupIdCredentialsRes.get() - # extract the id commitments - groupIDCommitments = groupIdCredentials.mapIt(it.idCommitment) - debug "groupIdCredentials", groupIdCredentials - debug "groupIDCommitments", groupIDCommitments - - # index indicates the position of a membership credential in the static list of group keys i.e., groupIdCredentials - # the corresponding credential will be used to mount rlnRelay on the current node - # index also represents the index of the leaf in the Merkle tree that contains node's commitment key - let index = MembershipIndex(5) - - # -------- mount rln-relay in the off-chain mode - await node.mountRelay(@[RlnRelayPubsubTopic]) - let mountRes = node.wakuRelay.mountRlnRelayStatic(group = groupIDCommitments, - memIdCredential = groupIdCredentials[index], - memIndex = index, - pubsubTopic = RlnRelayPubsubTopic, - contentTopic = RlnRelayContentTopic) - require: - mountRes.isOk() - - let wakuRlnRelay = mountRes.get() - - # get the root of Merkle tree which is constructed inside the mountRlnRelay proc - let calculatedRootRes = wakuRlnRelay.rlnInstance.getMerkleRoot() - require: - calculatedRootRes.isOk() - let calculatedRoot = calculatedRootRes.get().inHex() - debug "calculated root by mountRlnRelay", calculatedRoot - - # this part checks whether the Merkle tree is constructed correctly inside the mountRlnRelay proc - # this check is done by comparing the tree root resulted from mountRlnRelay i.e., calculatedRoot - # against the root which is the expected root - check: - calculatedRoot == root - - await node.stop() - suite "Waku rln relay": test "key_gen Nim Wrappers": @@ -452,7 +387,7 @@ suite "Waku rln relay": list.len == groupSize # check the number of keys root.len == HashHexSize # check the size of the calculated tree root - test "check correctness of toIdentityCredentials and calcMerkleRoot": + test "check correctness of toIdentityCredentials": let groupKeys = StaticGroupKeys # create a set of IdentityCredentials objects from groupKeys @@ -464,12 +399,21 @@ suite "Waku rln relay": # extract the id commitments let groupIDCommitments = groupIdCredentials.mapIt(it.idCommitment) # calculate the Merkle tree root out of the extracted id commitments - let rootRes = calcMerkleRoot(groupIDCommitments) + let rlnInstance = createRLNInstance() + require: + rlnInstance.isOk() + let rln = rlnInstance.get() + + # create a Merkle tree + let membersAdded = rln.insertMembers(0, groupIDCommitments) + require: + membersAdded + let rootRes = rln.getMerkleRoot() require: rootRes.isOk() - let root = rootRes.get() + let root = rootRes.get().inHex() debug "groupIdCredentials", groupIdCredentials debug "groupIDCommitments", groupIDCommitments @@ -516,314 +460,6 @@ suite "Waku rln relay": check: decodednsp.value == rateLimitProof - test "test proofVerify and proofGen for a valid proof": - let rlnInstance = createRLNInstance() - require: - rlnInstance.isOk() - let rln = rlnInstance.get() - - let - # peer's index in the Merkle Tree - index = 5'u - # create an identity credential - idCredentialRes = membershipKeyGen(rln) - - require: - idCredentialRes.isOk() - - let idCredential = idCredentialRes.get() - - var members = newSeq[IDCommitment]() - # Create a Merkle tree with random members - for i in 0'u..10'u: - if (i == index): - # insert the current peer's pk - members.add(idCredential.idCommitment) - else: - # create a new identity credential - let idCredentialRes = rln.membershipKeyGen() - require: - idCredentialRes.isOk() - members.add(idCredentialRes.get().idCommitment) - - # Batch the insert - let batchInsertRes = rln.insertMembers(0, members) - require: - batchInsertRes - - # prepare the message - let messageBytes = "Hello".toBytes() - - # prepare the epoch - let epoch = default(Epoch) - debug "epoch", epochHex = epoch.inHex() - - # generate proof - let proofRes = rln.proofGen(data = messageBytes, - memKeys = idCredential, - memIndex = MembershipIndex(index), - epoch = epoch) - require: - proofRes.isOk() - let proof = proofRes.value - - # verify the proof - let verified = rln.proofVerify(data = messageBytes, - proof = proof, - validRoots = @[rln.getMerkleRoot().value()]) - - # Ensure the proof verification did not error out - - require: - verified.isOk() - - check: - verified.value() == true - - test "test proofVerify and proofGen for an invalid proof": - let rlnInstance = createRLNInstance() - require: - rlnInstance.isOk() - let rln = rlnInstance.get() - - let - # peer's index in the Merkle Tree - index = 5'u - # create an identity credential - idCredentialRes = membershipKeyGen(rln) - - require: - idCredentialRes.isOk() - - let idCredential = idCredentialRes.get() - - # Create a Merkle tree with random members - for i in 0'u..10'u: - var memberAdded: bool = false - if (i == index): - # insert the current peer's pk - memberAdded = rln.insertMembers(i, @[idCredential.idCommitment]) - else: - # create a new identity credential - let idCredentialRes = rln.membershipKeyGen() - require: - idCredentialRes.isOk() - memberAdded = rln.insertMembers(i, @[idCredentialRes.get().idCommitment]) - # check the member is added - require: - memberAdded - - # prepare the message - let messageBytes = "Hello".toBytes() - - # prepare the epoch - let epoch = default(Epoch) - debug "epoch in bytes", epochHex = epoch.inHex() - - - let badIndex = 4 - # generate proof - let proofRes = rln.proofGen(data = messageBytes, - memKeys = idCredential, - memIndex = MembershipIndex(badIndex), - epoch = epoch) - require: - proofRes.isOk() - let proof = proofRes.value - - # verify the proof (should not be verified) against the internal RLN tree root - let verified = rln.proofVerify(data = messageBytes, - proof = proof, - validRoots = @[rln.getMerkleRoot().value()]) - - require: - verified.isOk() - check: - verified.value() == false - - test "validate roots which are part of the acceptable window": - # Setup: - # This step consists of creating the rln instance and waku-rln-relay, - # Inserting members, and creating a valid proof with the merkle root - # create an RLN instance - let rlnInstance = createRLNInstance() - require: - rlnInstance.isOk() - let rln = rlnInstance.get() - - let rlnRelay = WakuRLNRelay(rlnInstance:rln) - - let - # peer's index in the Merkle Tree. - index = 5'u - # create an identity credential - idCredentialRes = membershipKeyGen(rlnRelay.rlnInstance) - - require: - idCredentialRes.isOk() - - let idCredential = idCredentialRes.get() - - let membershipCount: uint = AcceptableRootWindowSize + 5'u - - var members = newSeq[IdentityCredential]() - - # Generate membership keys - for i in 0'u..membershipCount: - if (i == index): - # insert the current peer's pk - members.add(idCredential) - else: - # create a new identity credential - let idCredentialRes = rlnRelay.rlnInstance.membershipKeyGen() - require: - idCredentialRes.isOk() - members.add(idCredentialRes.get()) - - # Batch inserts into the tree - let insertedRes = rlnRelay.insertMembers(0, members.mapIt(it.idCommitment)) - require: - insertedRes.isOk() - - # Given: - # This step includes constructing a valid message with the latest merkle root - # prepare the message - let messageBytes = "Hello".toBytes() - - # prepare the epoch - let epoch = default(Epoch) - debug "epoch in bytes", epochHex = epoch.inHex() - - # generate proof - let validProofRes = rlnRelay.rlnInstance.proofGen(data = messageBytes, - memKeys = idCredential, - memIndex = MembershipIndex(index), - epoch = epoch) - require: - validProofRes.isOk() - let validProof = validProofRes.value - - # validate the root (should be true) - let verified = rlnRelay.validateRoot(validProof.merkleRoot) - - require: - verified == true - - # When: - # This test depends on the local merkle tree root being part of a - # acceptable set of roots, which is denoted by AcceptableRootWindowSize - # The following action is equivalent to a member being removed upon listening to the events emitted by the contract - - # Progress the local tree by removing members - for i in 0..AcceptableRootWindowSize - 2: - let res = rlnRelay.removeMember(MembershipIndex(i)) - # Ensure the local tree root has changed - let currentMerkleRoot = rlnRelay.rlnInstance.getMerkleRoot() - - require: - res.isOk() - currentMerkleRoot.isOk() - currentMerkleRoot.value() != validProof.merkleRoot - - # Then: - # we try to verify a root against this window, - # which should return true - let olderRootVerified = rlnRelay.validateRoot(validProof.merkleRoot) - - check: - olderRootVerified == true - - test "invalidate roots which are not part of the acceptable window": - # Setup: - # This step consists of creating the rln instance and waku-rln-relay, - # Inserting members, and creating a valid proof with the merkle root - - require: - AcceptableRootWindowSize < 10 - - # create an RLN instance - let rlnInstance = createRLNInstance() - require: - rlnInstance.isOk() - let rln = rlnInstance.get() - - let rlnRelay = WakuRLNRelay(rlnInstance:rln) - - let - # peer's index in the Merkle Tree. - index = 6'u - # create an identity credential - idCredentialRes = membershipKeyGen(rlnRelay.rlnInstance) - - require: - idCredentialRes.isOk() - - let idCredential = idCredentialRes.get() - - let membershipCount: uint = AcceptableRootWindowSize + 5'u - - # Create a Merkle tree with random members - for i in 0'u..membershipCount: - var memberIsAdded: RlnRelayResult[void] - if (i == index): - # insert the current peer's pk - memberIsAdded = rlnRelay.insertMembers(i, @[idCredential.idCommitment]) - else: - # create a new identity credential - let idCredentialRes = rlnRelay.rlnInstance.membershipKeyGen() - require: - idCredentialRes.isOk() - memberIsAdded = rlnRelay.insertMembers(i, @[idCredentialRes.get().idCommitment]) - # require that the member is added - require: - memberIsAdded.isOk() - - # Given: - # This step includes constructing a valid message with the latest merkle root - # prepare the message - let messageBytes = "Hello".toBytes() - - # prepare the epoch - let epoch = default(Epoch) - debug "epoch in bytes", epochHex = epoch.inHex() - - # generate proof - let validProofRes = rlnRelay.rlnInstance.proofGen(data = messageBytes, - memKeys = idCredential, - memIndex = MembershipIndex(index), - epoch = epoch) - require: - validProofRes.isOk() - let validProof = validProofRes.value - - # validate the root (should be true) - let verified = rlnRelay.validateRoot(validProof.merkleRoot) - - require: - verified == true - - # When: - # This test depends on the local merkle tree root being part of a - # acceptable set of roots, which is denoted by AcceptableRootWindowSize - # The following action is equivalent to a member being removed upon listening to the events emitted by the contract - - # Progress the local tree by removing members - for i in 0..AcceptableRootWindowSize: - discard rlnRelay.removeMember(MembershipIndex(i)) - # Ensure the local tree root has changed - let currentMerkleRoot = rlnRelay.rlnInstance.getMerkleRoot() - require: - currentMerkleRoot.isOk() - currentMerkleRoot.value() != validProof.merkleRoot - - # Then: - # we try to verify a proof against this window, - # which should return false - let olderRootVerified = rlnRelay.validateRoot(validProof.merkleRoot) - - check: - olderRootVerified == false - test "toEpoch and fromEpoch consistency check": # check edge cases let @@ -913,47 +549,17 @@ suite "Waku rln relay": # it is a duplicate result3.value == true - test "validateMessage test": - # setup a wakurlnrelay peer with a static group---------- - - # create a group of 100 membership keys - let memListRes = createMembershipList(100) - - require: - memListRes.isOk() - let - (groupKeys, _) = memListRes.get() - # convert the keys to IdentityCredential structs - groupIdCredentialsRes = groupKeys.toIdentityCredentials() - - require: - groupIdCredentialsRes.isOk() - - let groupIdCredentials = groupIdCredentialsRes.get() - # extract the id commitments - let groupIDCommitments = groupIdCredentials.mapIt(it.idCommitment) - debug "groupIdCredentials", groupIdCredentials - debug "groupIDCommitments", groupIDCommitments - - # index indicates the position of an identity credential in the static list of group keys i.e., groupIdCredentials - # the corresponding identity credential will be used to mount rlnRelay on the current node - # index also represents the index of the leaf in the Merkle tree that contains node's commitment key + asyncTest "validateMessage test": let index = MembershipIndex(5) - # create an RLN instance - let rlnInstance = createRLNInstance() - require: - rlnInstance.isOk() - let rln = rlnInstance.get() - - let - wakuRlnRelay = WakuRLNRelay(membershipIndex: index, - identityCredential: groupIdCredentials[index], rlnInstance: rln) - - # add members - let commitmentAddRes = wakuRlnRelay.addAll(groupIDCommitments) + let rlnConf = WakuRlnConfig(rlnRelayDynamic: false, + rlnRelayPubsubTopic: RlnRelayPubsubTopic, + rlnRelayContentTopic: RlnRelayContentTopic, + rlnRelayMembershipIndex: some(index.uint)) + let wakuRlnRelayRes = await WakuRlnRelay.new(rlnConf) require: - commitmentAddRes.isOk() + wakuRlnRelayRes.isOk() + let wakuRlnRelay = wakuRlnRelayRes.get() # get the current epoch time let time = epochTime() diff --git a/tests/v2/test_waku_rln_relay_onchain.nim b/tests/v2/test_waku_rln_relay_onchain.nim deleted file mode 100644 index d643484375..0000000000 --- a/tests/v2/test_waku_rln_relay_onchain.nim +++ /dev/null @@ -1,634 +0,0 @@ - -# contains rln-relay tests that require interaction with Ganache i.e., onchain tests -{.used.} - -import - std/[options, osproc, streams, strutils, sequtils], - testutils/unittests, chronos, chronicles, stint, web3, json, - stew/byteutils, stew/shims/net as stewNet, - libp2p/crypto/crypto, - eth/keys, - ../../waku/v2/protocol/waku_keystore, - ../../waku/v2/protocol/waku_rln_relay, - ../../waku/v2/node/waku_node, - ./testlib/common, - ./testlib/waku2, - ./test_utils - -from posix import kill, SIGINT - -const RlnRelayPubsubTopic = "waku/2/rlnrelay/proto" -const RlnRelayContentTopic = "waku/2/rlnrelay/proto" - -# contract ABI -contract(MembershipContract): - proc register(pubkey: Uint256) # external payable - proc MemberRegistered(pubkey: Uint256, index: Uint256) {.event.} - # proc registerBatch(pubkeys: seq[Uint256]) # external payable - # proc withdraw(secret: Uint256, pubkeyIndex: Uint256, receiver: Address) - # proc withdrawBatch( secrets: seq[Uint256], pubkeyIndex: seq[Uint256], receiver: seq[Address]) - -# a util function used for testing purposes -# it deploys membership contract on Ganache (or any Eth client available on EthClient address) -# must be edited if used for a different contract than membership contract -proc uploadRLNContract*(ethClientAddress: string): Future[Address] {.async.} = - let web3 = await newWeb3(ethClientAddress) - debug "web3 connected to", ethClientAddress - - # fetch the list of registered accounts - let accounts = await web3.provider.eth_accounts() - web3.defaultAccount = accounts[1] - let add = web3.defaultAccount - debug "contract deployer account address ", add - - let balance = await web3.provider.eth_getBalance(web3.defaultAccount, "latest") - debug "Initial account balance: ", balance - - # deploy the poseidon hash contract and gets its address - let - hasherReceipt = await web3.deployContract(PoseidonHasherCode) - hasherAddress = hasherReceipt.contractAddress.get - debug "hasher address: ", hasherAddress - - - # encode membership contract inputs to 32 bytes zero-padded - let - membershipFeeEncoded = encode(MembershipFee).data - depthEncoded = encode(MerkleTreeDepth.u256).data - hasherAddressEncoded = encode(hasherAddress).data - # this is the contract constructor input - contractInput = membershipFeeEncoded & depthEncoded & hasherAddressEncoded - - - debug "encoded membership fee: ", membershipFeeEncoded - debug "encoded depth: ", depthEncoded - debug "encoded hasher address: ", hasherAddressEncoded - debug "encoded contract input:", contractInput - - # deploy membership contract with its constructor inputs - let receipt = await web3.deployContract(MembershipContractCode, - contractInput = contractInput) - let contractAddress = receipt.contractAddress.get - debug "Address of the deployed membership contract: ", contractAddress - - let newBalance = await web3.provider.eth_getBalance(web3.defaultAccount, "latest") - debug "Account balance after the contract deployment: ", newBalance - - await web3.close() - debug "disconnected from ", ethClientAddress - - return contractAddress - - -proc createEthAccount(): Future[(keys.PrivateKey, Address)] {.async.} = - let web3 = await newWeb3(EthClient) - let accounts = await web3.provider.eth_accounts() - let gasPrice = int(await web3.provider.eth_gasPrice()) - web3.defaultAccount = accounts[0] - - let pk = keys.PrivateKey.random(rng[]) - let acc = Address(toCanonicalAddress(pk.toPublicKey())) - - var tx:EthSend - tx.source = accounts[0] - tx.value = some(ethToWei(10.u256)) - tx.to = some(acc) - tx.gasPrice = some(gasPrice) - - # Send 10 eth to acc - discard await web3.send(tx) - let balance = await web3.provider.eth_getBalance(acc, "latest") - assert(balance == ethToWei(10.u256)) - - return (pk, acc) - - -# Installs Ganache Daemon -proc installGanache() = - # We install Ganache. - # Packages will be installed to the ./build folder through the --prefix option - let installGanache = startProcess("npm", args = ["install", "ganache", "--prefix", "./build"], options = {poUsePath}) - let returnCode = installGanache.waitForExit() - debug "Ganache install log", returnCode=returnCode, log=installGanache.outputstream.readAll() - -# Uninstalls Ganache Daemon -proc uninstallGanache() = - # We uninstall Ganache - # Packages will be uninstalled from the ./build folder through the --prefix option. - # Passed option is - # --save: Package will be removed from your dependencies. - # See npm documentation https://docs.npmjs.com/cli/v6/commands/npm-uninstall for further details - let uninstallGanache = startProcess("npm", args = ["uninstall", "ganache", "--save", "--prefix", "./build"], options = {poUsePath}) - let returnCode = uninstallGanache.waitForExit() - debug "Ganache uninstall log", returnCode=returnCode, log=uninstallGanache.outputstream.readAll() - -# Runs Ganache daemon -proc runGanache(): Process = - # We run directly "node node_modules/ganache/dist/node/cli.js" rather than using "npx ganache", so that the daemon does not spawn in a new child process. - # In this way, we can directly send a SIGINT signal to the corresponding PID to gracefully terminate Ganache without dealing with multiple processes. - # Passed options are - # --port Port to listen on. - # --miner.blockGasLimit Sets the block gas limit in WEI. - # --wallet.defaultBalance The default account balance, specified in ether. - # See ganache documentation https://www.npmjs.com/package/ganache for more details - let runGanache = startProcess("node", args = ["./build/node_modules/ganache/dist/node/cli.js", "--port", "8540", "--miner.blockGasLimit", "300000000000000", "--wallet.defaultBalance", "10000"], options = {poUsePath}) - let ganachePID = runGanache.processID - - # We read stdout from Ganache to see when daemon is ready - var ganacheStartLog: string - var cmdline: string - while true: - if runGanache.outputstream.readLine(cmdline): - ganacheStartLog.add(cmdline) - if cmdline.contains("Listening on 127.0.0.1:8540"): - break - debug "Ganache daemon is running and ready", pid=ganachePID, startLog=ganacheStartLog - return runGanache - - -# Stops Ganache daemon -proc stopGanache(runGanache: Process) = - - let ganachePID = runGanache.processID - - # We gracefully terminate Ganache daemon by sending a SIGINT signal to the runGanache PID to trigger RPC server termination and clean-up - let returnCodeSIGINT = kill(ganachePID.int32, SIGINT) - debug "Sent SIGINT to Ganache", ganachePID=ganachePID, returnCode=returnCodeSIGINT - - # We wait the daemon to exit - let returnCodeExit = runGanache.waitForExit() - debug "Ganache daemon terminated", returnCode=returnCodeExit - debug "Ganache daemon run log", log=runGanache.outputstream.readAll() - -procSuite "Waku-rln-relay": - - ################################ - ## Installing/running Ganache - ################################ - - # We install Ganache - installGanache() - - # We run Ganache - let runGanache = runGanache() - - asyncTest "event subscription": - # preparation ------------------------------ - debug "ethereum client address", EthClient - let contractAddress = await uploadRLNContract(EthClient) - # connect to the eth client - let web3 = await newWeb3(EthClient) - debug "web3 connected to", EthClient - - # fetch the list of registered accounts - let accounts = await web3.provider.eth_accounts() - web3.defaultAccount = accounts[1] - debug "contract deployer account address ", - defaultAccount = web3.defaultAccount - - # prepare a contract sender to interact with it - let contractObj = web3.contractSender(MembershipContract, - contractAddress) # creates a Sender object with a web3 field and contract address of type Address - - # create an RLN instance - let rlnInstance = createRLNInstance() - require: - rlnInstance.isOk() - # generate the membership keys - let identityCredentialRes = membershipKeyGen(rlnInstance.get()) - require: - identityCredentialRes.isOk() - let identityCredential = identityCredentialRes.get() - let pk = identityCredential.idCommitment.toUInt256() - debug "membership commitment key", pk = pk - - # test ------------------------------ - let fut = newFuture[void]() - let s = await contractObj.subscribe(MemberRegistered, %*{"fromBlock": "0x0", - "address": contractAddress}) do( - idCommitment: Uint256, index: Uint256){.raises: [Defect], gcsafe.}: - try: - debug "onRegister", idCommitment = idCommitment, index = index - require: - idCommitment == pk - fut.complete() - except Exception as err: - # chronos still raises exceptions which inherit directly from Exception - doAssert false, err.msg - do (err: CatchableError): - echo "Error from subscription: ", err.msg - - # register a member - let tx = await contractObj.register(pk).send(value = MembershipFee) - debug "a member is registered", tx = tx - - # wait for the event to be received - await fut - - # release resources ----------------------- - await web3.close() - asyncTest "dynamic group management": - # preparation ------------------------------ - debug "ethereum client address", EthClient - let contractAddress = await uploadRLNContract(EthClient) - # connect to the eth client - let web3 = await newWeb3(EthClient) - debug "web3 connected to", EthClient - - # fetch the list of registered accounts - let accounts = await web3.provider.eth_accounts() - web3.defaultAccount = accounts[1] - debug "contract deployer account address ", - defaultAccount = web3.defaultAccount - - # prepare a contract sender to interact with it - let contractObj = web3.contractSender(MembershipContract, - contractAddress) # creates a Sender object with a web3 field and contract address of type Address - - # test ------------------------------ - # create an RLN instance - let rlnInstance = createRLNInstance() - require: - rlnInstance.isOk() - let rln = rlnInstance.get() - - let idCredentialRes = rln.membershipKeyGen() - require: - idCredentialRes.isOk() - let idCredential = idCredentialRes.get() - let pk = idCredential.idCommitment.toUInt256() - debug "membership commitment key", pk = pk - - # initialize the WakuRLNRelay - let rlnPeer = WakuRLNRelay(identityCredential: idCredential, - membershipIndex: MembershipIndex(0), - ethClientAddress: EthClient, - ethAccountAddress: some(accounts[0]), - membershipContractAddress: contractAddress, - rlnInstance: rln) - - # generate another identity credential - let idCredential2Res = rln.membershipKeyGen() - require: - idCredential2Res.isOk() - let idCredential2 = idCredential2Res.get() - let pk2 = idCredential2.idCommitment.toUInt256() - debug "membership commitment key", pk2 = pk2 - - let events = [newFuture[void](), newFuture[void]()] - var futIndex = 0 - var handler: GroupUpdateHandler - handler = proc (blockNumber: BlockNumber, - members: seq[MembershipTuple]): RlnRelayResult[void] = - debug "handler is called", members = members - events[futIndex].complete() - futIndex += 1 - let index = members[0].index - let insertRes = rlnPeer.insertMembers(index, members.mapIt(it.idComm)) - check: - insertRes.isOk() - return ok() - - # mount the handler for listening to the contract events - await subscribeToGroupEvents(ethClientUri = EthClient, - ethAccountAddress = some(accounts[0]), - contractAddress = contractAddress, - blockNumber = "0x0", - handler = handler) - - # register a member to the contract - let tx = await contractObj.register(pk).send(value = MembershipFee) - debug "a member is registered", tx = tx - - # register another member to the contract - let tx2 = await contractObj.register(pk2).send(value = MembershipFee) - debug "a member is registered", tx2 = tx2 - - # wait for the events to be processed - await allFutures(events) - - # release resources ----------------------- - await web3.close() - - asyncTest "insert a key to the membership contract": - # preparation ------------------------------ - debug "ethereum client address", EthClient - let contractAddress = await uploadRLNContract(EthClient) - # connect to the eth client - let web3 = await newWeb3(EthClient) - debug "web3 connected to", EthClient - - # fetch the list of registered accounts - let accounts = await web3.provider.eth_accounts() - web3.defaultAccount = accounts[1] - let add = web3.defaultAccount - debug "contract deployer account address ", add - - # prepare a contract sender to interact with it - let sender = web3.contractSender(MembershipContract, - contractAddress) # creates a Sender object with a web3 field and contract address of type Address - - # send takes the following parameters, c: ContractCallBase, value = 0.u256, gas = 3000000'u64 gasPrice = 0 - # should use send proc for the contract functions that update the state of the contract - let tx = await sender.register(20.u256).send(value = MembershipFee) # value is the membership fee - debug "The hash of registration tx: ", tx - - # let members: array[2, uint256] = [20.u256, 21.u256] - # debug "This is the batch registration result ", await sender.registerBatch(members).send(value = (members.len * MembershipFee)) # value is the membership fee - - let balance = await web3.provider.eth_getBalance(web3.defaultAccount, "latest") - debug "Balance after registration: ", balance - - await web3.close() - debug "disconnected from", EthClient - - asyncTest "registration procedure": - # preparation ------------------------------ - # deploy the contract - let contractAddress = await uploadRLNContract(EthClient) - - # prepare rln-relay peer inputs - let - web3 = await newWeb3(EthClient) - await web3.close() - - # create an RLN instance - let rlnInstance = createRLNInstance() - require: - rlnInstance.isOk() - - # generate the membership keys - let identityCredentialRes = membershipKeyGen(rlnInstance.get()) - require: - identityCredentialRes.isOk() - let identityCredential = identityCredentialRes.get() - - # create an Ethereum private key and the corresponding account - let (ethPrivKey, ethacc) = await createEthAccount() - - # test ------------------------------ - # initialize the WakuRLNRelay - let rlnPeer = WakuRLNRelay(identityCredential: identityCredential, - membershipIndex: MembershipIndex(0), - ethClientAddress: EthClient, - ethAccountPrivateKey: some(ethPrivKey), - ethAccountAddress: some(ethacc), - membershipContractAddress: contractAddress) - - # register the rln-relay peer to the membership contract - let isSuccessful = await rlnPeer.register() - check: - isSuccessful.isOk() - - - asyncTest "mounting waku rln-relay: check correct Merkle tree construction in the static/off-chain group management": - # preparation ------------------------------ - let - nodeKey = generateSecp256k1Key() - node = WakuNode.new(nodeKey, ValidIpAddress.init("0.0.0.0"), Port(0)) - await node.start() - - # create current peer's pk - let rlnInstance = createRLNInstance() - require: - rlnInstance.isOk() - let rln = rlnInstance.get() - # generate an identity credential - let idCredentialRes = rln.membershipKeyGen() - require: - idCredentialRes.isOk() - - let idCredential = idCredentialRes.get() - # current peer index in the Merkle tree - let index = uint(5) - - # Create a group of 10 members - var group = newSeq[IDCommitment]() - for i in 0'u..10'u: - var memberAdded: bool = false - if (i == index): - # insert the current peer's pk - group.add(idCredential.idCommitment) - memberAdded = rln.insertMembers(i, @[idCredential.idCommitment]) - doAssert(memberAdded) - debug "member key", key = idCredential.idCommitment.inHex - else: - let idCredentialRes = rln.membershipKeyGen() - require: - idCredentialRes.isOk() - let idCredential = idCredentialRes.get() - group.add(idCredential.idCommitment) - let memberAdded = rln.insertMembers(i, @[idCredential.idCommitment]) - require: - memberAdded - debug "member key", key = idCredential.idCommitment.inHex - - let expectedRoot = rln.getMerkleRoot().value().inHex - debug "expected root ", expectedRoot - - # test ------------------------------ - # start rln-relay - await node.mountRelay(@[RlnRelayPubsubTopic]) - let mountRes = mountRlnRelayStatic(wakuRelay = node.wakuRelay, - group = group, - memIdCredential = idCredential, - memIndex = index, - pubsubTopic = RlnRelayPubsubTopic, - contentTopic = RlnRelayContentTopic) - - require: - mountRes.isOk() - - let wakuRlnRelay = mountRes.get() - - let calculatedRoot = wakuRlnRelay.rlnInstance.getMerkleRoot().value().inHex() - debug "calculated root ", calculatedRoot - - check: - expectedRoot == calculatedRoot - - await node.stop() - - asyncTest "mounting waku rln-relay: check correct Merkle tree construction in the dynamic/onchain group management": - # preparation ------------------------------ - let - nodeKey = generateSecp256k1Key() - node = WakuNode.new(nodeKey, ValidIpAddress.init("0.0.0.0"), Port(0)) - await node.start() - - # deploy the contract - let contractAddress = await uploadRLNContract(EthClient) - - # prepare rln-relay inputs - let - web3 = await newWeb3(EthClient) - accounts = await web3.provider.eth_accounts() - # choose one of the existing accounts for the rln-relay peer - ethAccountAddress = accounts[0] - web3.defaultAccount = accounts[0] - - - - # create an rln instance - let rlnInstance = createRLNInstance() - require: - rlnInstance.isOk() - let rln = rlnInstance.get() - - # create two identity credentials - let - idCredential1Res = rln.membershipKeyGen() - idCredential2Res = rln.membershipKeyGen() - require: - idCredential1Res.isOk() - idCredential2Res.isOk() - - let - idCredential1 = idCredential1Res.get() - idCredential2 = idCredential2Res.get() - pk1 = idCredential1.idCommitment.toUInt256() - pk2 = idCredential2.idCommitment.toUInt256() - debug "member key1", key = idCredential1.idCommitment.inHex - debug "member key2", key = idCredential2.idCommitment.inHex - - # add the rln keys to the Merkle tree - let - memberIsAdded1 = rln.insertMembers(0, @[idCredential1.idCommitment]) - memberIsAdded2 = rln.insertMembers(1, @[idCredential2.idCommitment]) - - require: - memberIsAdded1 - memberIsAdded2 - - # get the Merkle root - let expectedRoot = rln.getMerkleRoot().value().inHex - - # prepare a contract sender to interact with it - let contractObj = web3.contractSender(MembershipContract, - contractAddress) # creates a Sender object with a web3 field and contract address of type Address - - # register the members to the contract - let tx1Hash = await contractObj.register(pk1).send(value = MembershipFee) - debug "a member is registered", tx1 = tx1Hash - - # register another member to the contract - let tx2Hash = await contractObj.register(pk2).send(value = MembershipFee) - debug "a member is registered", tx2 = tx2Hash - - # create an Ethereum private key and the corresponding account - let (ethPrivKey, ethacc) = await createEthAccount() - - - # test ------------------------------ - # start rln-relay - await node.mountRelay(@[RlnRelayPubsubTopic]) - let mountRes = await mountRlnRelayDynamic(wakuRelay = node.wakuRelay, - ethClientAddr = EthClient, - ethAccountAddress = some(ethacc), - ethAccountPrivKeyOpt = some(ethPrivKey), - memContractAddr = contractAddress, - memIdCredential = some(idCredential1), - memIndex = some(MembershipIndex(0)), - pubsubTopic = RlnRelayPubsubTopic, - contentTopic = RlnRelayContentTopic) - - require: - mountRes.isOk() - - let wakuRlnRelay = mountRes.get() - - await sleepAsync(2000.milliseconds()) # wait for the event to reach the group handler - - # rln pks are inserted into the rln peer's Merkle tree and the resulting root - # is expected to be the same as the calculatedRoot i.e., the one calculated outside of the mountRlnRelayDynamic proc - let calculatedRoot = wakuRlnRelay.rlnInstance.getMerkleRoot().value().inHex - debug "calculated root ", calculatedRoot=calculatedRoot - debug "expected root ", expectedRoot=expectedRoot - - check: - expectedRoot == calculatedRoot - - - await web3.close() - await node.stop() - - asyncTest "mounting waku rln-relay: check correct registration of peers without rln-relay credentials in dynamic/on-chain mode": - # deploy the contract - let contractAddress = await uploadRLNContract(EthClient) - - # prepare rln-relay inputs - let - web3 = await newWeb3(EthClient) - accounts = await web3.provider.eth_accounts() - # choose two of the existing accounts for the rln-relay peers - ethAccountAddress1 = accounts[0] - ethAccountAddress2 = accounts[1] - await web3.close() - - # prepare two nodes - let - nodeKey = generateSecp256k1Key() - node = WakuNode.new(nodeKey, ValidIpAddress.init("0.0.0.0"), Port(0)) - await node.start() - - let - nodeKey2 = generateSecp256k1Key() - node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(0)) - await node2.start() - - # create an Ethereum private key and the corresponding account - let (ethPrivKey, ethacc) = await createEthAccount() - - # start rln-relay on the first node, leave rln-relay credentials empty - await node.mountRelay(@[RlnRelayPubsubTopic]) - let mountRes = await mountRlnRelayDynamic(wakuRelay=node.wakuRelay, - ethClientAddr = EthClient, - ethAccountAddress = some(ethacc), - ethAccountPrivKeyOpt = some(ethPrivKey), - memContractAddr = contractAddress, - memIdCredential = none(IdentityCredential), - memIndex = none(MembershipIndex), - pubsubTopic = RlnRelayPubsubTopic, - contentTopic = RlnRelayContentTopic) - - require: - mountRes.isOk() - - let wakuRlnRelay = mountRes.get() - - # start rln-relay on the second node, leave rln-relay credentials empty - await node2.mountRelay(@[RlnRelayPubsubTopic]) - let mountRes2 = await mountRlnRelayDynamic(wakuRelay=node2.wakuRelay, - ethClientAddr = EthClient, - ethAccountAddress = some(ethacc), - ethAccountPrivKeyOpt = some(ethPrivKey), - memContractAddr = contractAddress, - memIdCredential = none(IdentityCredential), - memIndex = none(MembershipIndex), - pubsubTopic = RlnRelayPubsubTopic, - contentTopic = RlnRelayContentTopic) - - require: - mountRes2.isOk() - - let wakuRlnRelay2 = mountRes2.get() - - # the two nodes should be registered into the contract - # since nodes are spun up sequentially - # the first node has index 0 whereas the second node gets index 1 - check: - wakuRlnRelay.membershipIndex == MembershipIndex(0) - wakuRlnRelay2.membershipIndex == MembershipIndex(1) - - await node.stop() - await node2.stop() - - ################################ - ## Terminating/removing Ganache - ################################ - - # We stop Ganache daemon - stopGanache(runGanache) - - # We uninstall Ganache - uninstallGanache() diff --git a/tests/v2/test_wakunode_rln_relay.nim b/tests/v2/test_wakunode_rln_relay.nim index 631bb3b6aa..7e82522c38 100644 --- a/tests/v2/test_wakunode_rln_relay.nim +++ b/tests/v2/test_wakunode_rln_relay.nim @@ -24,7 +24,6 @@ import from std/times import epochTime - const RlnRelayPubsubTopic = "waku/2/rlnrelay/proto" procSuite "WakuNode - RLN relay": @@ -52,7 +51,7 @@ procSuite "WakuNode - RLN relay": await node1.mountRlnRelay(WakuRlnConfig(rlnRelayDynamic: false, rlnRelayPubsubTopic: rlnRelayPubSubTopic, rlnRelayContentTopic: contentTopic, - rlnRelayMembershipIndex: MembershipIndex(1), + rlnRelayMembershipIndex: some(MembershipIndex(1)), )) await node1.start() @@ -63,7 +62,7 @@ procSuite "WakuNode - RLN relay": await node2.mountRlnRelay(WakuRlnConfig(rlnRelayDynamic: false, rlnRelayPubsubTopic: rlnRelayPubSubTopic, rlnRelayContentTopic: contentTopic, - rlnRelayMembershipIndex: MembershipIndex(2), + rlnRelayMembershipIndex: some(MembershipIndex(2)), )) await node2.start() @@ -74,7 +73,7 @@ procSuite "WakuNode - RLN relay": await node3.mountRlnRelay(WakuRlnConfig(rlnRelayDynamic: false, rlnRelayPubsubTopic: rlnRelayPubSubTopic, rlnRelayContentTopic: contentTopic, - rlnRelayMembershipIndex: MembershipIndex(3), + rlnRelayMembershipIndex: some(MembershipIndex(3)), )) await node3.start() @@ -132,8 +131,6 @@ procSuite "WakuNode - RLN relay": rlnRelayPubSubTopic = RlnRelayPubsubTopic contentTopic = ContentTopic("/waku/2/default-content/proto") - # set up three nodes - # node1 # set up three nodes # node1 await node1.mountRelay(@[DefaultPubsubTopic, rlnRelayPubSubTopic]) @@ -142,7 +139,7 @@ procSuite "WakuNode - RLN relay": await node1.mountRlnRelay(WakuRlnConfig(rlnRelayDynamic: false, rlnRelayPubsubTopic: rlnRelayPubSubTopic, rlnRelayContentTopic: contentTopic, - rlnRelayMembershipIndex: MembershipIndex(1), + rlnRelayMembershipIndex: some(MembershipIndex(1)), )) await node1.start() @@ -153,7 +150,7 @@ procSuite "WakuNode - RLN relay": await node2.mountRlnRelay(WakuRlnConfig(rlnRelayDynamic: false, rlnRelayPubsubTopic: rlnRelayPubSubTopic, rlnRelayContentTopic: contentTopic, - rlnRelayMembershipIndex: MembershipIndex(2), + rlnRelayMembershipIndex: some(MembershipIndex(2)), )) await node2.start() @@ -164,7 +161,7 @@ procSuite "WakuNode - RLN relay": await node3.mountRlnRelay(WakuRlnConfig(rlnRelayDynamic: false, rlnRelayPubsubTopic: rlnRelayPubSubTopic, rlnRelayContentTopic: contentTopic, - rlnRelayMembershipIndex: MembershipIndex(3), + rlnRelayMembershipIndex: some(MembershipIndex(3)), )) await node3.start() @@ -197,10 +194,8 @@ procSuite "WakuNode - RLN relay": contentTopicBytes = contentTopic.toBytes input = concat(payload, contentTopicBytes) extraBytes: seq[byte] = @[byte(1),2,3] - rateLimitProofRes = node1.wakuRlnRelay.rlnInstance.proofGen(data = concat(input, extraBytes), # we add extra bytes to invalidate proof verification against original payload - memKeys = node1.wakuRlnRelay.identityCredential, - memIndex = MembershipIndex(1), - epoch = epoch) + rateLimitProofRes = node1.wakuRlnRelay.groupManager.generateProof(concat(input, extraBytes), # we add extra bytes to invalidate proof verification against original payload + epoch) require: rateLimitProofRes.isOk() let rateLimitProof = rateLimitProofRes.get().encode().buffer @@ -249,7 +244,7 @@ procSuite "WakuNode - RLN relay": await node1.mountRlnRelay(WakuRlnConfig(rlnRelayDynamic: false, rlnRelayPubsubTopic: rlnRelayPubSubTopic, rlnRelayContentTopic: contentTopic, - rlnRelayMembershipIndex: MembershipIndex(1), + rlnRelayMembershipIndex: some(MembershipIndex(1)), )) await node1.start() @@ -261,7 +256,7 @@ procSuite "WakuNode - RLN relay": await node2.mountRlnRelay(WakuRlnConfig(rlnRelayDynamic: false, rlnRelayPubsubTopic: rlnRelayPubSubTopic, rlnRelayContentTopic: contentTopic, - rlnRelayMembershipIndex: MembershipIndex(2), + rlnRelayMembershipIndex: some(MembershipIndex(2)), )) await node2.start() @@ -273,7 +268,7 @@ procSuite "WakuNode - RLN relay": await node3.mountRlnRelay(WakuRlnConfig(rlnRelayDynamic: false, rlnRelayPubsubTopic: rlnRelayPubSubTopic, rlnRelayContentTopic: contentTopic, - rlnRelayMembershipIndex: MembershipIndex(3), + rlnRelayMembershipIndex: some(MembershipIndex(3)), )) await node3.start() diff --git a/waku/v2/node/waku_node.nim b/waku/v2/node/waku_node.nim index 3fa97ff147..3c7f247cfe 100644 --- a/waku/v2/node/waku_node.nim +++ b/waku/v2/node/waku_node.nim @@ -933,14 +933,24 @@ when defined(rln): registrationHandler: Option[RegistrationHandler] = none(RegistrationHandler)) {.async.} = info "mounting rln relay" - let rlnRelayRes = await WakuRlnRelay.new(node.wakuRelay, - rlnConf, - spamHandler, + if node.wakuRelay.isNil(): + error "WakuRelay protocol is not mounted, cannot mount WakuRlnRelay" + return + # TODO: check whether the pubsub topic is supported at the relay level + # if rlnConf.rlnRelayPubsubTopic notin node.wakuRelay.defaultPubsubTopics: + # error "The relay protocol does not support the configured pubsub topic for WakuRlnRelay" + + let rlnRelayRes = await WakuRlnRelay.new(rlnConf, registrationHandler) if rlnRelayRes.isErr(): - error "failed to mount rln relay", error=rlnRelayRes.error + error "failed to mount WakuRlnRelay", error=rlnRelayRes.error return - node.wakuRlnRelay = rlnRelayRes.get() + let rlnRelay = rlnRelayRes.get() + let validator = generateRlnValidator(rlnRelay, spamHandler) + let pb = PubSub(node.wakuRelay) + pb.addValidator(rlnRelay.pubsubTopic, validator) + node.wakuRlnRelay = rlnRelay + ## Waku peer-exchange diff --git a/waku/v2/protocol/waku_rln_relay.nim b/waku/v2/protocol/waku_rln_relay.nim index 47b79a524d..c14681a064 100644 --- a/waku/v2/protocol/waku_rln_relay.nim +++ b/waku/v2/protocol/waku_rln_relay.nim @@ -2,16 +2,18 @@ import ./waku_rln_relay/rln, ./waku_rln_relay/constants, ./waku_rln_relay/protocol_types, + ./waku_rln_relay/group_manager, ./waku_rln_relay/protocol_metrics, ./waku_rln_relay/conversion_utils, - ./waku_rln_relay/utils, + ./waku_rln_relay/rln_relay, ./waku_rln_relay/contract export rln, constants, protocol_types, + group_manager, protocol_metrics, conversion_utils, - utils, + rln_relay, contract diff --git a/waku/v2/protocol/waku_rln_relay/conversion_utils.nim b/waku/v2/protocol/waku_rln_relay/conversion_utils.nim index 3b9c647dd4..54f24d57ae 100644 --- a/waku/v2/protocol/waku_rln_relay/conversion_utils.nim +++ b/waku/v2/protocol/waku_rln_relay/conversion_utils.nim @@ -4,21 +4,22 @@ else: {.push raises: [].} import - std/[sequtils], + std/[sequtils, strutils, algorithm], web3, chronicles, stew/[arrayops, results, endians2], stint import - ./constants, - ./protocol_types + ./constants, + ./protocol_types import ../waku_keystore export web3, chronicles, - stint + stint, + constants logScope: topics = "waku rln_relay conversion_utils" @@ -35,6 +36,22 @@ proc toMembershipIndex*(v: UInt256): MembershipIndex = let membershipIndex: MembershipIndex = cast[MembershipIndex](v) return membershipIndex +proc inHex*(value: IdentityTrapdoor or + IdentityNullifier or + IdentitySecretHash or + IDCommitment or + MerkleNode or + Nullifier or + Epoch or + RlnIdentifier): string = + var valueHex = "" #UInt256.fromBytesLE(value) + for b in value.reversed(): + valueHex = valueHex & b.toHex() + # We pad leading zeroes + while valueHex.len < value.len * 2: + valueHex = "0" & valueHex + return toLowerAscii(valueHex) + proc appendLength*(input: openArray[byte]): seq[byte] = ## returns length prefixed version of the input ## with the following format [len<8>|input] @@ -120,27 +137,6 @@ proc toIdentityCredentials*(groupKeys: seq[(string, string, string, string)]): R return err("could not convert the group key to bytes: " & err.msg) return ok(groupIdCredentials) -# Converts a sequence of tuples containing 2 string (i.e. identity secret hash and commitment) to an IndentityCredential -proc toIdentityCredentials*(groupKeys: seq[(string, string)]): RlnRelayResult[seq[ - IdentityCredential]] = - ## groupKeys is sequence of membership key tuples in the form of (identity key, identity commitment) all in the hexadecimal format - ## the toIdentityCredentials proc populates a sequence of IdentityCredentials using the supplied groupKeys - ## Returns an error if the conversion fails - - var groupIdCredentials = newSeq[IdentityCredential]() - - for i in 0..groupKeys.len-1: - try: - let - idSecretHash = IdentitySecretHash(@(hexToUint[CredentialByteSize](groupKeys[i][0]).toBytesLE())) - idCommitment = IDCommitment(@(hexToUint[CredentialByteSize](groupKeys[i][1]).toBytesLE())) - groupIdCredentials.add(IdentityCredential(idSecretHash: idSecretHash, - idCommitment: idCommitment)) - except ValueError as err: - warn "could not convert the group key to bytes", err = err.msg - return err("could not convert the group key to bytes: " & err.msg) - return ok(groupIdCredentials) - proc toEpoch*(t: uint64): Epoch = ## converts `t` to `Epoch` in little-endian order let bytes = toBytes(t, Endianness.littleEndian) diff --git a/waku/v2/protocol/waku_rln_relay/group_manager/group_manager_base.nim b/waku/v2/protocol/waku_rln_relay/group_manager/group_manager_base.nim index 12518e9623..80a043d62e 100644 --- a/waku/v2/protocol/waku_rln_relay/group_manager/group_manager_base.nim +++ b/waku/v2/protocol/waku_rln_relay/group_manager/group_manager_base.nim @@ -1,15 +1,18 @@ import - ../protocol_types + ../protocol_types, + ../rln import options, chronos, - stew/results + stew/results, + std/[deques, sequtils] export options, chronos, results, - protocol_types + protocol_types, + deques # This module contains the GroupManager interface # The GroupManager is responsible for managing the group state @@ -26,42 +29,43 @@ type OnWithdrawCallback* = proc (withdrawals: seq[Membership]): Future[void] {.g type GroupManagerResult*[T] = Result[T, string] type - GroupManager*[Config] = ref object of RootObj + GroupManager* = ref object of RootObj idCredentials*: Option[IdentityCredential] + membershipIndex*: Option[MembershipIndex] registerCb*: Option[OnRegisterCallback] withdrawCb*: Option[OnWithdrawCallback] - config*: Config rlnInstance*: ptr RLN initialized*: bool latestIndex*: MembershipIndex + validRoots*: Deque[MerkleNode] # This proc is used to initialize the group manager # Any initialization logic should be implemented here method init*(g: GroupManager): Future[void] {.base,gcsafe.} = - return err("init proc for " & $g.kind & " is not implemented yet") + raise newException(CatchableError, "init proc for " & $g.type & " is not implemented yet") # This proc is used to start the group sync process # It should be used to sync the group state with the rest of the group members method startGroupSync*(g: GroupManager): Future[void] {.base,gcsafe.} = - return err("startGroupSync proc for " & $g.kind & " is not implemented yet") + raise newException(CatchableError, "startGroupSync proc for " & $g.type & " is not implemented yet") # This proc is used to register a new identity commitment into the merkle tree # The user may or may not have the identity secret to this commitment # It should be used when detecting new members in the group, and syncing the group state method register*(g: GroupManager, idCommitment: IDCommitment): Future[void] {.base,gcsafe.} = - return err("register proc for " & $g.kind & " is not implemented yet") + raise newException(CatchableError, "register proc for " & $g.type & " is not implemented yet") # This proc is used to register a new identity commitment into the merkle tree # The user should have the identity secret to this commitment # It should be used when the user wants to join the group method register*(g: GroupManager, credentials: IdentityCredential): Future[void] {.base,gcsafe.} = - return err("register proc for " & $g.kind & " is not implemented yet") + raise newException(CatchableError, "register proc for " & $g.type & " is not implemented yet") # This proc is used to register a batch of new identity commitments into the merkle tree # The user may or may not have the identity secret to these commitments # It should be used when detecting a batch of new members in the group, and syncing the group state method registerBatch*(g: GroupManager, idCommitments: seq[IDCommitment]): Future[void] {.base,gcsafe.} = - return err("registerBatch proc for " & $g.kind & " is not implemented yet") + raise newException(CatchableError, "registerBatch proc for " & $g.type & " is not implemented yet") # This proc is used to set a callback that will be called when a new identity commitment is registered # The callback may be called multiple times, and should be used to for any post processing @@ -71,14 +75,73 @@ method onRegister*(g: GroupManager, cb: OnRegisterCallback) {.base,gcsafe.} = # This proc is used to withdraw/remove an identity commitment from the merkle tree # The user should have the identity secret hash to this commitment, by either deriving it, or owning it method withdraw*(g: GroupManager, identitySecretHash: IdentitySecretHash): Future[void] {.base,gcsafe.} = - return err("withdraw proc for " & $g.kind & " is not implemented yet") + raise newException(CatchableError, "withdraw proc for " & $g.type & " is not implemented yet") # This proc is used to withdraw/remove a batch of identity commitments from the merkle tree # The user should have the identity secret hash to these commitments, by either deriving them, or owning them method withdrawBatch*(g: GroupManager, identitySecretHashes: seq[IdentitySecretHash]): Future[void] {.base,gcsafe.} = - return err("withdrawBatch proc for " & $g.kind & " is not implemented yet") + raise newException(CatchableError, "withdrawBatch proc for " & $g.type & " is not implemented yet") # This proc is used to set a callback that will be called when an identity commitment is withdrawn # The callback may be called multiple times, and should be used to for any post processing method onWithdraw*(g: GroupManager, cb: OnWithdrawCallback) {.base,gcsafe.} = g.withdrawCb = some(cb) + +# Acceptable roots for merkle root validation of incoming messages +const AcceptableRootWindowSize* = 5 + +proc updateValidRootQueue*(rootQueue: var Deque[MerkleNode], root: MerkleNode): void = + ## updates the root queue with the latest root and pops the oldest one when the capacity of `AcceptableRootWindowSize` is reached + let overflowCount = rootQueue.len() - AcceptableRootWindowSize + if overflowCount >= 0: + # Delete the oldest `overflowCount` elements in the deque (index 0..`overflowCount`) + for i in 0..overflowCount: + rootQueue.popFirst() + # Push the next root into the queue + rootQueue.addLast(root) + +method indexOfRoot*(g: GroupManager, root: MerkleNode): int {.base,gcsafe,raises:[].} = + ## returns the index of the root in the merkle tree. + ## returns -1 if the root is not found + return g.validRoots.find(root) + +method validateRoot*(g: GroupManager, root: MerkleNode): bool {.base,gcsafe,raises:[].} = + ## validates the root against the valid roots queue + # Check if the root is in the valid roots queue + if g.indexOfRoot(root) >= 0: + return true + return false + +template updateValidRootQueue*(g: GroupManager) = + let rootRes = g.rlnInstance.getMerkleRoot() + if rootRes.isErr(): + raise newException(ValueError, "failed to get merkle root") + let rootAfterUpdate = rootRes.get() + updateValidRootQueue(g.validRoots, rootAfterUpdate) + +method verifyProof*(g: GroupManager, + input: openArray[byte], + proof: RateLimitProof): GroupManagerResult[bool] {.base,gcsafe,raises:[].} = + ## verifies the proof against the input and the current merkle root + let proofVerifyRes = g.rlnInstance.proofVerify(input, proof, g.validRoots.items().toSeq()) + if proofVerifyRes.isErr(): + return err("proof verification failed: " & $proofVerifyRes.error()) + return ok(proofVerifyRes.value()) + +method generateProof*(g: GroupManager, + data: openArray[byte], + epoch: Epoch): GroupManagerResult[RateLimitProof] {.base,gcsafe,raises:[].} = + ## generates a proof for the given data and epoch + ## the proof is generated using the current merkle root + if g.idCredentials.isNone(): + return err("identity credentials are not set") + if g.membershipIndex.isNone(): + return err("membership index is not set") + let proofGenRes = proofGen(rlnInstance = g.rlnInstance, + data = data, + memKeys = g.idCredentials.get(), + memIndex = g.membershipIndex.get(), + epoch = epoch) + if proofGenRes.isErr(): + return err("proof generation failed: " & $proofGenRes.error()) + return ok(proofGenRes.value()) diff --git a/waku/v2/protocol/waku_rln_relay/group_manager/on_chain/group_manager.nim b/waku/v2/protocol/waku_rln_relay/group_manager/on_chain/group_manager.nim index 371987913c..4e96b620af 100644 --- a/waku/v2/protocol/waku_rln_relay/group_manager/on_chain/group_manager.nim +++ b/waku/v2/protocol/waku_rln_relay/group_manager/on_chain/group_manager.nim @@ -14,6 +14,7 @@ import stew/[byteutils, arrayops], sequtils import + ../../../waku_keystore, ../../rln, ../../conversion_utils, ../group_manager_base @@ -37,23 +38,29 @@ contract(RlnContract): type RlnContractWithSender = Sender[RlnContract] - OnchainGroupManagerConfig* = object + OnchainGroupManager* = ref object of GroupManager ethClientUrl*: string ethPrivateKey*: Option[string] ethContractAddress*: string ethRpc*: Option[Web3] rlnContract*: Option[RlnContractWithSender] membershipFee*: Option[Uint256] - membershipIndex*: Option[MembershipIndex] latestProcessedBlock*: Option[BlockNumber] + registrationTxHash*: Option[TxHash] + chainId*: Option[Quantity] + keystorePath*: Option[string] + keystorePassword*: Option[string] + saveKeystore*: bool + registrationHandler*: Option[RegistrationHandler] - OnchainGroupManager* = ref object of GroupManager[OnchainGroupManagerConfig] +const DefaultKeyStorePath* = "rlnKeystore.json" +const DefaultKeyStorePassword* = "password" template initializedGuard*(g: OnchainGroupManager): untyped = if not g.initialized: raise newException(ValueError, "OnchainGroupManager is not initialized") -proc register*(g: OnchainGroupManager, idCommitment: IDCommitment): Future[void] {.async.} = +method register*(g: OnchainGroupManager, idCommitment: IDCommitment): Future[void] {.async.} = initializedGuard(g) let memberInserted = g.rlnInstance.insertMember(idCommitment) @@ -63,11 +70,13 @@ proc register*(g: OnchainGroupManager, idCommitment: IDCommitment): Future[void] if g.registerCb.isSome(): await g.registerCb.get()(@[Membership(idCommitment: idCommitment, index: g.latestIndex)]) + g.updateValidRootQueue() + g.latestIndex += 1 return -proc registerBatch*(g: OnchainGroupManager, idCommitments: seq[IDCommitment]): Future[void] {.async.} = +method registerBatch*(g: OnchainGroupManager, idCommitments: seq[IDCommitment]): Future[void] {.async.} = initializedGuard(g) let membersInserted = g.rlnInstance.insertMembers(g.latestIndex, idCommitments) @@ -83,16 +92,18 @@ proc registerBatch*(g: OnchainGroupManager, idCommitments: seq[IDCommitment]): F membersSeq.add(member) await g.registerCb.get()(membersSeq) + g.updateValidRootQueue() + g.latestIndex += MembershipIndex(idCommitments.len()) return -proc register*(g: OnchainGroupManager, identityCredentials: IdentityCredential): Future[void] {.async.} = +method register*(g: OnchainGroupManager, identityCredentials: IdentityCredential): Future[void] {.async.} = initializedGuard(g) - let ethRpc = g.config.ethRpc.get() - let rlnContract = g.config.rlnContract.get() - let membershipFee = g.config.membershipFee.get() + let ethRpc = g.ethRpc.get() + let rlnContract = g.rlnContract.get() + let membershipFee = g.membershipFee.get() let gasPrice = int(await ethRpc.provider.eth_gasPrice()) * 2 let idCommitment = identityCredentials.idCommitment.toUInt256() @@ -104,8 +115,11 @@ proc register*(g: OnchainGroupManager, identityCredentials: IdentityCredential): except ValueError as e: raise newException(ValueError, "could not register the member: " & e.msg) + # wait for the transaction to be mined let tsReceipt = await ethRpc.getMinedTransactionReceipt(txHash) + g.registrationTxHash = some(txHash) + # the receipt topic holds the hash of signature of the raised events # TODO: make this robust. search within the event list for the event let firstTopic = tsReceipt.logs[0].topics[0] @@ -122,17 +136,17 @@ proc register*(g: OnchainGroupManager, identityCredentials: IdentityCredential): # In TX log data, uints are encoded in big endian eventIndex = UInt256.fromBytesBE(argumentsBytes[32..^1]) - g.config.membershipIndex = some(eventIndex.toMembershipIndex()) + g.membershipIndex = some(eventIndex.toMembershipIndex()) # don't handle member insertion into the tree here, it will be handled by the event listener return -proc withdraw*(g: OnchainGroupManager, idCommitment: IDCommitment): Future[void] {.async.} = +method withdraw*(g: OnchainGroupManager, idCommitment: IDCommitment): Future[void] {.async.} = initializedGuard(g) # TODO: after slashing is enabled on the contract -proc withdrawBatch*(g: OnchainGroupManager, idCommitments: seq[IDCommitment]): Future[void] {.async.} = +method withdrawBatch*(g: OnchainGroupManager, idCommitments: seq[IDCommitment]): Future[void] {.async.} = initializedGuard(g) # TODO: after slashing is enabled on the contract @@ -164,8 +178,8 @@ type BlockTable* = OrderedTable[BlockNumber, seq[Membership]] proc getEvents*(g: OnchainGroupManager, fromBlock: BlockNumber, toBlock: Option[BlockNumber] = none(BlockNumber)): Future[BlockTable] {.async.} = initializedGuard(g) - let ethRpc = g.config.ethRpc.get() - let rlnContract = g.config.rlnContract.get() + let ethRpc = g.ethRpc.get() + let rlnContract = g.rlnContract.get() var normalizedToBlock: BlockNumber if toBlock.isSome(): @@ -214,9 +228,9 @@ proc seedBlockTableIntoTree*(g: OnchainGroupManager, blockTable: BlockTable): Fu let indexGap = startingIndex - latestIndex if not (toSeq(startingIndex..lastIndex) == members.mapIt(it.index)): raise newException(ValueError, "membership indices are not sequential") - if indexGap != 1.uint and lastIndex != latestIndex: + if indexGap != 1.uint and lastIndex != latestIndex and startingIndex != 0.uint: warn "membership index gap, may have lost connection", lastIndex, currIndex=latestIndex, indexGap = indexGap - g.config.latestProcessedBlock = some(blockNumber) + g.latestProcessedBlock = some(blockNumber) return @@ -244,7 +258,7 @@ proc newHeadErrCallback(error: CatchableError) = proc startListeningToEvents*(g: OnchainGroupManager): Future[void] {.async.} = initializedGuard(g) - let ethRpc = g.config.ethRpc.get() + let ethRpc = g.ethRpc.get() let newHeadCallback = g.getNewHeadCallback() try: discard await ethRpc.subscribeForBlockHeaders(newHeadCallback, newHeadErrCallback) @@ -265,7 +279,45 @@ proc startOnchainSync*(g: OnchainGroupManager, fromBlock: BlockNumber = BlockNum except: raise newException(ValueError, "failed to start listening to events: " & getCurrentExceptionMsg()) -proc startGroupSync*(g: OnchainGroupManager): Future[void] {.async.} = +proc persistCredentials*(g: OnchainGroupManager): GroupManagerResult[void] = + if not g.saveKeystore: + return ok() + if g.idCredentials.isNone(): + return err("no credentials to persist") + + let index = g.membershipIndex.get() + let idCredential = g.idCredentials.get() + var path = DefaultKeystorePath + var password = DefaultKeystorePassword + + if g.keystorePath.isSome(): + path = g.keystorePath.get() + else: + warn "keystore: no credentials path set, using default path", path=DefaultKeystorePath + + if g.keystorePassword.isSome(): + password = g.keystorePassword.get() + else: + warn "keystore: no credentials password set, using default password", password=DefaultKeystorePassword + + let keystoreCred = MembershipCredentials( + identityCredential: idCredential, + membershipGroups: @[MembershipGroup( + membershipContract: MembershipContract( + chainId: $g.chainId.get(), + address: g.ethContractAddress + ), + treeIndex: index + )] + ) + + let persistRes = addMembershipCredentials(path, @[keystoreCred], password, RLNAppInfo) + if persistRes.isErr(): + error "keystore: failed to persist credentials", error=persistRes.error() + + return ok() + +method startGroupSync*(g: OnchainGroupManager): Future[void] {.async.} = initializedGuard(g) # Get archive history try: @@ -273,28 +325,46 @@ proc startGroupSync*(g: OnchainGroupManager): Future[void] {.async.} = except: raise newException(ValueError, "failed to start onchain sync service: " & getCurrentExceptionMsg()) - if g.config.ethPrivateKey.isSome() and g.idCredentials.isSome(): + if g.ethPrivateKey.isSome() and g.idCredentials.isNone(): + let idCredentialRes = g.rlnInstance.membershipKeyGen() + if idCredentialRes.isErr(): + raise newException(CatchableError, "Identity credential generation failed") + let idCredential = idCredentialRes.get() + g.idCredentials = some(idCredential) + debug "registering commitment on contract" - await g.register(g.idCredentials.get()) + await g.register(idCredential) + if g.registrationHandler.isSome(): + # We need to callback with the tx hash + let handler = g.registrationHandler.get() + handler($g.registrationTxHash.get()) + + let persistRes = g.persistCredentials() + if persistRes.isErr(): + error "failed to persist credentials", error=persistRes.error() return -proc onRegister*(g: OnchainGroupManager, cb: OnRegisterCallback) {.gcsafe.} = +method onRegister*(g: OnchainGroupManager, cb: OnRegisterCallback) {.gcsafe.} = g.registerCb = some(cb) -proc onWithdraw*(g: OnchainGroupManager, cb: OnWithdrawCallback) {.gcsafe.} = +method onWithdraw*(g: OnchainGroupManager, cb: OnWithdrawCallback) {.gcsafe.} = g.withdrawCb = some(cb) -proc init*(g: OnchainGroupManager): Future[void] {.async.} = +method init*(g: OnchainGroupManager): Future[void] {.async.} = var ethRpc: Web3 var contract: RlnContractWithSender # check if the Ethereum client is reachable try: - ethRpc = await newWeb3(g.config.ethClientUrl) + ethRpc = await newWeb3(g.ethClientUrl) except: raise newException(ValueError, "could not connect to the Ethereum client") - let contractAddress = web3.fromHex(web3.Address, g.config.ethContractAddress) + # Set the chain id + let chainId = await ethRpc.provider.eth_chainId() + g.chainId = some(chainId) + + let contractAddress = web3.fromHex(web3.Address, g.ethContractAddress) contract = ethRpc.contractSender(RlnContract, contractAddress) # check if the contract exists by calling a static function @@ -304,25 +374,39 @@ proc init*(g: OnchainGroupManager): Future[void] {.async.} = except: raise newException(ValueError, "could not get the membership deposit") - if g.config.ethPrivateKey.isSome(): - let pk = string(g.config.ethPrivateKey.get()) + if g.ethPrivateKey.isSome(): + let pk = g.ethPrivateKey.get() let pkParseRes = keys.PrivateKey.fromHex(pk) if pkParseRes.isErr(): raise newException(ValueError, "could not parse the private key") ethRpc.privateKey = some(pkParseRes.get()) - g.config.ethRpc = some(ethRpc) - g.config.rlnContract = some(contract) - g.config.membershipFee = some(membershipFee) - + g.ethRpc = some(ethRpc) + g.rlnContract = some(contract) + g.membershipFee = some(membershipFee) + + if g.keystorePath.isSome() and g.keystorePassword.isSome(): + let parsedCredsRes = getMembershipCredentials(path = g.keystorePath.get(), + password = g.keystorePassword.get(), + filterMembershipContracts = @[MembershipContract(chainId: $chainId, + address: g.ethContractAddress)], + appInfo = RLNAppInfo) + if parsedCredsRes.isErr(): + raise newException(ValueError, "could not parse the keystore: " & $parsedCredsRes.error()) + let parsedCreds = parsedCredsRes.get() + if parsedCreds.len == 0: + raise newException(ValueError, "keystore is empty") + # TODO: accept an index from the config (related: https://github.com/waku-org/nwaku/pull/1466) + g.idCredentials = some(parsedCreds[0].identityCredential) + g.membershipIndex = some(parsedCreds[0].membershipGroups[0].treeIndex) ethRpc.ondisconnect = proc() = error "Ethereum client disconnected" - let fromBlock = g.config.latestProcessedBlock.get() + let fromBlock = g.latestProcessedBlock.get() info "reconnecting with the Ethereum client, and restarting group sync", fromBlock = fromBlock try: asyncSpawn g.startOnchainSync(fromBlock) except: - error "failed to restart group sync" + error "failed to restart group sync", error = getCurrentExceptionMsg() g.initialized = true diff --git a/waku/v2/protocol/waku_rln_relay/group_manager/static/group_manager.nim b/waku/v2/protocol/waku_rln_relay/group_manager/static/group_manager.nim index 0de9e619dd..213f89bd3e 100644 --- a/waku/v2/protocol/waku_rln_relay/group_manager/static/group_manager.nim +++ b/waku/v2/protocol/waku_rln_relay/group_manager/static/group_manager.nim @@ -7,22 +7,22 @@ export group_manager_base type - StaticGroupManagerConfig* = object + StaticGroupManager* = ref object of GroupManager groupKeys*: seq[IdentityCredential] groupSize*: uint - membershipIndex*: MembershipIndex - - StaticGroupManager* = ref object of GroupManager[StaticGroupManagerConfig] template initializedGuard*(g: StaticGroupManager): untyped = if not g.initialized: raise newException(ValueError, "StaticGroupManager is not initialized") -proc init*(g: StaticGroupManager): Future[void] {.async,gcsafe.} = +method init*(g: StaticGroupManager): Future[void] {.async,gcsafe.} = + if g.membershipIndex.isNone(): + raise newException(ValueError, "Membership index is not set") + let - groupSize = g.config.groupSize - groupKeys = g.config.groupKeys - membershipIndex = g.config.membershipIndex + groupSize = g.groupSize + groupKeys = g.groupKeys + membershipIndex = g.membershipIndex.get() if membershipIndex < MembershipIndex(0) or membershipIndex >= MembershipIndex(groupSize): raise newException(ValueError, "Invalid membership index. Must be within 0 and " & $(groupSize - 1) & "but was " & $membershipIndex) @@ -34,33 +34,37 @@ proc init*(g: StaticGroupManager): Future[void] {.async,gcsafe.} = if not membersInserted: raise newException(ValueError, "Failed to insert members into the merkle tree") + g.updateValidRootQueue() + g.latestIndex += MembershipIndex(idCommitments.len() - 1) g.initialized = true return -proc startGroupSync*(g: StaticGroupManager): Future[void] = +method startGroupSync*(g: StaticGroupManager): Future[void] = initializedGuard(g) - var retFuture = newFuture[void]("StaticGroupManager.sta rtGroupSync") + var retFuture = newFuture[void]("StaticGroupManager.startGroupSync") # No-op retFuture.complete() return retFuture -proc register*(g: StaticGroupManager, idCommitment: IDCommitment): Future[void] {.async.} = +method register*(g: StaticGroupManager, idCommitment: IDCommitment): Future[void] {.async.} = initializedGuard(g) let memberInserted = g.rlnInstance.insertMember(idCommitment) if not memberInserted: raise newException(ValueError, "Failed to insert member into the merkle tree") + g.updateValidRootQueue() + g.latestIndex += 1 if g.registerCb.isSome(): await g.registerCb.get()(@[Membership(idCommitment: idCommitment, index: g.latestIndex)]) return -proc registerBatch*(g: StaticGroupManager, idCommitments: seq[IDCommitment]): Future[void] {.async.} = +method registerBatch*(g: StaticGroupManager, idCommitments: seq[IDCommitment]): Future[void] {.async.} = initializedGuard(g) let membersInserted = g.rlnInstance.insertMembers(g.latestIndex + 1, idCommitments) @@ -73,14 +77,16 @@ proc registerBatch*(g: StaticGroupManager, idCommitments: seq[IDCommitment]): Fu memberSeq.add(Membership(idCommitment: idCommitments[i], index: g.latestIndex + MembershipIndex(i))) await g.registerCb.get()(memberSeq) + g.updateValidRootQueue() + g.latestIndex += MembershipIndex(idCommitments.len() - 1) return -proc withdraw*(g: StaticGroupManager, idSecretHash: IdentitySecretHash): Future[void] {.async.} = +method withdraw*(g: StaticGroupManager, idSecretHash: IdentitySecretHash): Future[void] {.async.} = initializedGuard(g) - let groupKeys = g.config.groupKeys + let groupKeys = g.groupKeys for i in 0.. epoch2: + return epoch1 - epoch2 + else: + return epoch2 - epoch1 + +proc validateMessage*(rlnPeer: WakuRLNRelay, msg: WakuMessage, + timeOption: Option[float64] = none(float64)): MessageValidationResult = + ## validate the supplied `msg` based on the waku-rln-relay routing protocol i.e., + ## the `msg`'s epoch is within MaxEpochGap of the current epoch + ## the `msg` has valid rate limit proof + ## the `msg` does not violate the rate limit + ## `timeOption` indicates Unix epoch time (fractional part holds sub-seconds) + ## if `timeOption` is supplied, then the current epoch is calculated based on that + let decodeRes = RateLimitProof.init(msg.proof) + if decodeRes.isErr(): + return MessageValidationResult.Invalid + + let proof = decodeRes.get() + + # track message count for metrics + waku_rln_messages_total.inc() + + # checks if the `msg`'s epoch is far from the current epoch + # it corresponds to the validation of rln external nullifier + var epoch: Epoch + if timeOption.isSome(): + epoch = calcEpoch(timeOption.get()) + else: + # get current rln epoch + epoch = getCurrentEpoch() + + debug "current epoch", currentEpoch = fromEpoch(epoch) + let + msgEpoch = proof.epoch + # calculate the gaps + gap = absDiff(epoch, msgEpoch) + + debug "message epoch", msgEpoch = fromEpoch(msgEpoch) + + # validate the epoch + if gap > MaxEpochGap: + # message's epoch is too old or too ahead + # accept messages whose epoch is within +-MaxEpochGap from the current epoch + warn "invalid message: epoch gap exceeds a threshold", gap = gap, + payload = string.fromBytes(msg.payload), msgEpoch = fromEpoch(proof.epoch) + waku_rln_invalid_messages_total.inc(labelValues=["invalid_epoch"]) + return MessageValidationResult.Invalid + + let rootValidationRes = rlnPeer.groupManager.validateRoot(proof.merkleRoot) + if not rootValidationRes: + debug "invalid message: provided root does not belong to acceptable window of roots", provided=proof.merkleRoot, validRoots=rlnPeer.groupManager.validRoots.mapIt(it.inHex()) + waku_rln_invalid_messages_total.inc(labelValues=["invalid_root"]) + return MessageValidationResult.Invalid + + # verify the proof + let + contentTopicBytes = msg.contentTopic.toBytes + input = concat(msg.payload, contentTopicBytes) + + waku_rln_proof_verification_total.inc() + waku_rln_proof_verification_duration_seconds.nanosecondTime: + let proofVerificationRes = rlnPeer.groupManager.verifyProof(input, proof) + + if proofVerificationRes.isErr(): + waku_rln_errors_total.inc(labelValues=["proof_verification"]) + warn "invalid message: proof verification failed", payload = string.fromBytes(msg.payload) + return MessageValidationResult.Invalid + if not proofVerificationRes.value(): + # invalid proof + debug "invalid message: invalid proof", payload = string.fromBytes(msg.payload) + waku_rln_invalid_messages_total.inc(labelValues=["invalid_proof"]) + return MessageValidationResult.Invalid + + # check if double messaging has happened + let hasDup = rlnPeer.hasDuplicate(msg) + if hasDup.isErr(): + waku_rln_errors_total.inc(labelValues=["duplicate_check"]) + elif hasDup.value == true: + debug "invalid message: message is spam", payload = string.fromBytes(msg.payload) + waku_rln_spam_messages_total.inc() + return MessageValidationResult.Spam + + # insert the message to the log + # the result of `updateLog` is discarded because message insertion is guaranteed by the implementation i.e., + # it will never error out + discard rlnPeer.updateLog(msg) + debug "message is valid", payload = string.fromBytes(msg.payload) + let rootIndex = rlnPeer.groupManager.indexOfRoot(proof.merkleRoot) + waku_rln_valid_messages_total.observe(rootIndex.toFloat()) + return MessageValidationResult.Valid + +proc toRLNSignal*(wakumessage: WakuMessage): seq[byte] = + ## it is a utility proc that prepares the `data` parameter of the proof generation procedure i.e., `proofGen` that resides in the current module + ## it extracts the `contentTopic` and the `payload` of the supplied `wakumessage` and serializes them into a byte sequence + let + contentTopicBytes = wakumessage.contentTopic.toBytes() + output = concat(wakumessage.payload, contentTopicBytes) + return output + +proc appendRLNProof*(rlnPeer: WakuRLNRelay, + msg: var WakuMessage, + senderEpochTime: float64): bool = + ## returns true if it can create and append a `RateLimitProof` to the supplied `msg` + ## returns false otherwise + ## `senderEpochTime` indicates the number of seconds passed since Unix epoch. The fractional part holds sub-seconds. + ## The `epoch` field of `RateLimitProof` is derived from the provided `senderEpochTime` (using `calcEpoch()`) + + let input = msg.toRLNSignal() + let epoch = calcEpoch(senderEpochTime) + + let proofGenRes = rlnPeer.groupManager.generateProof(input, epoch) + + if proofGenRes.isErr(): + return false + + msg.proof = proofGenRes.get().encode().buffer + return true + +proc generateRlnValidator*(wakuRlnRelay: WakuRLNRelay, + spamHandler: Option[SpamHandler] = none(SpamHandler)): pubsub.ValidatorHandler = + ## this procedure is a thin wrapper for the pubsub addValidator method + ## it sets a validator for the waku messages published on the supplied pubsubTopic and contentTopic + ## if contentTopic is empty, then validation takes place for All the messages published on the given pubsubTopic + ## the message validation logic is according to https://rfc.vac.dev/spec/17/ + let contentTopic = wakuRlnRelay.contentTopic + proc validator(topic: string, message: messages.Message): Future[pubsub.ValidationResult] {.async.} = + trace "rln-relay topic validator is called" + let decodeRes = WakuMessage.decode(message.data) + if decodeRes.isOk(): + let + wakumessage = decodeRes.value + payload = string.fromBytes(wakumessage.payload) + + # check the contentTopic + if (wakumessage.contentTopic != "") and (contentTopic != "") and (wakumessage.contentTopic != contentTopic): + trace "content topic did not match:", contentTopic=wakumessage.contentTopic, payload=payload + return pubsub.ValidationResult.Accept + + + let decodeRes = RateLimitProof.init(wakumessage.proof) + if decodeRes.isErr(): + return pubsub.ValidationResult.Reject + + let msgProof = decodeRes.get() + + # validate the message + let + validationRes = wakuRlnRelay.validateMessage(wakumessage) + proof = toHex(msgProof.proof) + epoch = fromEpoch(msgProof.epoch) + root = inHex(msgProof.merkleRoot) + shareX = inHex(msgProof.shareX) + shareY = inHex(msgProof.shareY) + nullifier = inHex(msgProof.nullifier) + case validationRes: + of Valid: + debug "message validity is verified, relaying:", contentTopic=wakumessage.contentTopic, epoch=epoch, timestamp=wakumessage.timestamp, payload=payload + trace "message validity is verified, relaying:", proof=proof, root=root, shareX=shareX, shareY=shareY, nullifier=nullifier + return pubsub.ValidationResult.Accept + of Invalid: + debug "message validity could not be verified, discarding:", contentTopic=wakumessage.contentTopic, epoch=epoch, timestamp=wakumessage.timestamp, payload=payload + trace "message validity could not be verified, discarding:", proof=proof, root=root, shareX=shareX, shareY=shareY, nullifier=nullifier + return pubsub.ValidationResult.Reject + of Spam: + debug "A spam message is found! yay! discarding:", contentTopic=wakumessage.contentTopic, epoch=epoch, timestamp=wakumessage.timestamp, payload=payload + trace "A spam message is found! yay! discarding:", proof=proof, root=root, shareX=shareX, shareY=shareY, nullifier=nullifier + if spamHandler.isSome(): + let handler = spamHandler.get() + handler(wakumessage) + return pubsub.ValidationResult.Reject + return validator + +proc mount(conf: WakuRlnConfig, + registrationHandler: Option[RegistrationHandler] = none(RegistrationHandler) + ): Future[WakuRlnRelay] {.async.} = + var + groupManager: GroupManager + credentials: MembershipCredentials + persistCredentials = false + # create an RLN instance + let rlnInstanceRes = createRLNInstance() + if rlnInstanceRes.isErr(): + raise newException(CatchableError, "RLN instance creation failed") + let rlnInstance = rlnInstanceRes.get() + if not conf.rlnRelayDynamic: + # static setup + let parsedGroupKeysRes = StaticGroupKeys.toIdentityCredentials() + if parsedGroupKeysRes.isErr(): + raise newException(ValueError, "Static group keys are not valid") + groupManager = StaticGroupManager(groupSize: StaticGroupSize, + groupKeys: parsedGroupKeysRes.get(), + membershipIndex: conf.rlnRelayMembershipIndex, + rlnInstance: rlnInstance) + # we don't persist credentials in static mode since they exist in ./constants.nim + else: + # dynamic setup + proc useValueOrNone(s: string): Option[string] = + if s == "": none(string) else: some(s) + let + ethPrivateKey = useValueOrNone(conf.rlnRelayEthAccountPrivateKey) + rlnRelayCredPath = useValueOrNone(conf.rlnRelayCredPath) + rlnRelayCredentialsPassword = useValueOrNone(conf.rlnRelayCredentialsPassword) + groupManager = OnchainGroupManager(ethClientUrl: conf.rlnRelayEthClientAddress, + ethContractAddress: $conf.rlnRelayEthContractAddress, + ethPrivateKey: ethPrivateKey, + rlnInstance: rlnInstance, + registrationHandler: registrationHandler, + keystorePath: rlnRelayCredPath, + keystorePassword: rlnRelayCredentialsPassword, + saveKeystore: true) + + # Initialize the groupManager + await groupManager.init() + # Start the group sync + await groupManager.startGroupSync() + + return WakuRLNRelay(pubsubTopic: conf.rlnRelayPubsubTopic, + contentTopic: conf.rlnRelayContentTopic, + groupManager: groupManager) + + +proc new*(T: type WakuRlnRelay, + conf: WakuRlnConfig, + registrationHandler: Option[RegistrationHandler] = none(RegistrationHandler) + ): Future[RlnRelayResult[WakuRlnRelay]] {.async.} = + ## Mounts the rln-relay protocol on the node. + ## The rln-relay protocol can be mounted in two modes: on-chain and off-chain. + ## Returns an error if the rln-relay protocol could not be mounted. + debug "rln-relay input validation passed" + try: + waku_rln_relay_mounting_duration_seconds.nanosecondTime: + let rlnRelay = await mount(conf, + registrationHandler) + return ok(rlnRelay) + except CatchableError as e: + return err(e.msg) + diff --git a/waku/v2/protocol/waku_rln_relay/utils.nim b/waku/v2/protocol/waku_rln_relay/utils.nim deleted file mode 100644 index 66a788ee02..0000000000 --- a/waku/v2/protocol/waku_rln_relay/utils.nim +++ /dev/null @@ -1,1042 +0,0 @@ -when (NimMajor, NimMinor) < (1, 4): - {.push raises: [Defect].} -else: - {.push raises: [].} - -import - std/[algorithm, sequtils, strutils, tables, times, os, deques], - chronicles, options, chronos, stint, - confutils, - strutils, - web3, json, - web3/ethtypes, - eth/keys, - libp2p/protocols/pubsub/rpc/messages, - libp2p/protocols/pubsub/pubsub, - stew/results, - stew/[byteutils, arrayops] -import - ./rln, - ./conversion_utils, - ./constants, - ./protocol_types, - ./protocol_metrics -import - ../../utils/time, - ../waku_keystore, - ../waku_message, - ../waku_relay - -logScope: - topics = "waku rln_relay" - -type WakuRlnConfig* = object - rlnRelayDynamic*: bool - rlnRelayPubsubTopic*: PubsubTopic - rlnRelayContentTopic*: ContentTopic - rlnRelayMembershipIndex*: uint - rlnRelayEthContractAddress*: string - rlnRelayEthClientAddress*: string - rlnRelayEthAccountPrivateKey*: string - rlnRelayEthAccountAddress*: string - rlnRelayCredPath*: string - rlnRelayCredentialsPassword*: string - -type - SpamHandler* = proc(wakuMessage: WakuMessage): void {.gcsafe, closure, raises: [Defect].} - RegistrationHandler* = proc(txHash: string): void {.gcsafe, closure, raises: [Defect].} - GroupUpdateHandler* = proc(blockNumber: BlockNumber, - members: seq[MembershipTuple]): RlnRelayResult[void] {.gcsafe.} - MembershipTuple* = tuple[index: MembershipIndex, idComm: IDCommitment] - -# membership contract interface -contract(MembershipContractInterface): - proc register(pubkey: Uint256) # external payable - proc MemberRegistered(pubkey: Uint256, index: Uint256) {.event.} - # TODO the followings are to be supported - # proc registerBatch(pubkeys: seq[Uint256]) # external payable - # proc withdraw(secret: Uint256, pubkeyIndex: Uint256, receiver: Address) - # proc withdrawBatch( secrets: seq[Uint256], pubkeyIndex: seq[Uint256], receiver: seq[Address]) - -proc inHex*(value: IdentityTrapdoor or - IdentityNullifier or - IdentitySecretHash or - IDCommitment or - MerkleNode or - Nullifier or - Epoch or - RlnIdentifier): string = - var valueHex = "" #UInt256.fromBytesLE(value) - for b in value.reversed(): - valueHex = valueHex & b.toHex() - # We pad leading zeroes - while valueHex.len < value.len * 2: - valueHex = "0" & valueHex - return toLowerAscii(valueHex) - -proc register*(idComm: IDCommitment, ethAccountAddress: Option[Address], ethAccountPrivKey: keys.PrivateKey, ethClientAddress: string, membershipContractAddress: Address, registrationHandler: Option[RegistrationHandler] = none(RegistrationHandler)): Future[Result[MembershipIndex, string]] {.async.} = - # TODO may need to also get eth Account Private Key as PrivateKey - ## registers the idComm into the membership contract whose address is in rlnPeer.membershipContractAddress - - var web3: Web3 - try: # check if the Ethereum client is reachable - web3 = await newWeb3(ethClientAddress) - except: - return err("could not connect to the Ethereum client") - - if ethAccountAddress.isSome(): - web3.defaultAccount = ethAccountAddress.get() - # set the account private key - web3.privateKey = some(ethAccountPrivKey) - # set the gas price twice the suggested price in order for the fast mining - let gasPrice = int(await web3.provider.eth_gasPrice()) * 2 - - # when the private key is set in a web3 instance, the send proc (sender.register(pk).send(MembershipFee)) - # does the signing using the provided key - # web3.privateKey = some(ethAccountPrivateKey) - var sender = web3.contractSender(MembershipContractInterface, membershipContractAddress) # creates a Sender object with a web3 field and contract address of type Address - - debug "registering an id commitment", idComm=idComm.inHex() - let pk = idComm.toUInt256() - - var txHash: TxHash - try: # send the registration transaction and check if any error occurs - txHash = await sender.register(pk).send(value = MembershipFee, gasPrice = gasPrice) - except ValueError as e: - return err("registration transaction failed: " & e.msg) - - let tsReceipt = await web3.getMinedTransactionReceipt(txHash) - - # the receipt topic holds the hash of signature of the raised events - let firstTopic = tsReceipt.logs[0].topics[0] - # the hash of the signature of MemberRegistered(uint256,uint256) event is equal to the following hex value - if firstTopic[0..65] != "0x5a92c2530f207992057b9c3e544108ffce3beda4a63719f316967c49bf6159d2": - return err("invalid event signature hash") - - # the arguments of the raised event i.e., MemberRegistered are encoded inside the data field - # data = pk encoded as 256 bits || index encoded as 256 bits - let arguments = tsReceipt.logs[0].data - debug "tx log data", arguments=arguments - let - argumentsBytes = arguments.hexToSeqByte() - # In TX log data, uints are encoded in big endian - eventIdCommUint = UInt256.fromBytesBE(argumentsBytes[0..31]) - eventIndex = UInt256.fromBytesBE(argumentsBytes[32..^1]) - eventIdComm = eventIdCommUint.toIDCommitment() - debug "the identity commitment key extracted from tx log", eventIdComm=eventIdComm.inHex() - debug "the index of registered identity commitment key", eventIndex=eventIndex - - if eventIdComm != idComm: - return err("invalid id commitment key") - - await web3.close() - - if registrationHandler.isSome(): - let handler = registrationHandler.get - handler(toHex(txHash)) - return ok(toMembershipIndex(eventIndex)) - -proc register*(rlnPeer: WakuRLNRelay, registrationHandler: Option[RegistrationHandler] = none(RegistrationHandler)): Future[RlnRelayResult[bool]] {.async.} = - ## registers the public key of the rlnPeer which is rlnPeer.identityCredential.publicKey - ## into the membership contract whose address is in rlnPeer.membershipContractAddress - let pk = rlnPeer.identityCredential.idCommitment - let regResult = await register(idComm = pk, ethAccountAddress = rlnPeer.ethAccountAddress, ethAccountPrivKey = rlnPeer.ethAccountPrivateKey.get(), ethClientAddress = rlnPeer.ethClientAddress, membershipContractAddress = rlnPeer.membershipContractAddress, registrationHandler = registrationHandler) - if regResult.isErr: - return err(regResult.error()) - return ok(true) - -proc updateValidRootQueue*(wakuRlnRelay: WakuRLNRelay, root: MerkleNode): void = - ## updates the valid Merkle root queue with the latest root and pops the oldest one when the capacity of `AcceptableRootWindowSize` is reached - let overflowCount = wakuRlnRelay.validMerkleRoots.len() - AcceptableRootWindowSize - if overflowCount >= 0: - # Delete the oldest `overflowCount` elements in the deque (index 0..`overflowCount`) - for i in 0..overflowCount: - wakuRlnRelay.validMerkleRoots.popFirst() - # Push the next root into the queue - wakuRlnRelay.validMerkleRoots.addLast(root) - -proc insertMembers*(wakuRlnRelay: WakuRLNRelay, - index: MembershipIndex, - idComms: seq[IDCommitment]): RlnRelayResult[void] = - ## inserts a sequence of id commitments into the local merkle tree, and adds the changed root to the - ## queue of valid roots - ## Returns an error if the insertion fails - waku_rln_membership_insertion_duration_seconds.nanosecondTime: - let actionSucceeded = wakuRlnRelay.rlnInstance.insertMembers(index, idComms) - if not actionSucceeded: - return err("could not insert id commitments into the merkle tree") - - let rootAfterUpdate = ?wakuRlnRelay.rlnInstance.getMerkleRoot() - wakuRlnRelay.updateValidRootQueue(rootAfterUpdate) - return ok() - -proc removeMember*(wakuRlnRelay: WakuRLNRelay, index: MembershipIndex): RlnRelayResult[void] = - ## removes a commitment from the local merkle tree at `index`, and adds the changed root to the - ## queue of valid roots - ## Returns an error if the removal fails - - let actionSucceeded = wakuRlnRelay.rlnInstance.removeMember(index) - if not actionSucceeded: - return err("could not remove id commitment from the merkle tree") - - let rootAfterUpdate = ?wakuRlnRelay.rlnInstance.getMerkleRoot() - wakuRlnRelay.updateValidRootQueue(rootAfterUpdate) - return ok() - -proc validateRoot*(wakuRlnRelay: WakuRLNRelay, root: MerkleNode): bool = - ## Validate against the window of roots stored in wakuRlnRelay.validMerkleRoots - return root in wakuRlnRelay.validMerkleRoots - -proc calcMerkleRoot*(list: seq[IDCommitment]): RlnRelayResult[string] = - ## returns the root of the Merkle tree that is computed from the supplied list - ## the root is in hexadecimal format - ## Returns an error if the computation fails - - let rlnInstance = createRLNInstance() - if rlnInstance.isErr(): - return err("could not create rln instance: " & rlnInstance.error()) - let rln = rlnInstance.get() - - # create a Merkle tree - let membersAdded = rln.insertMembers(0, list) - if not membersAdded: - return err("could not insert members into the tree") - let root = rln.getMerkleRoot().value().inHex() - return ok(root) - -proc createMembershipList*(n: int): RlnRelayResult[( - seq[(string, string, string, string)], string - )] = - ## createMembershipList produces a sequence of identity credentials in the form of (identity trapdoor, identity nullifier, identity secret hash, id commitment) in the hexadecimal format - ## this proc also returns the root of a Merkle tree constructed out of the identity commitment keys of the generated list - ## the output of this proc is used to initialize a static group keys (to test waku-rln-relay in the off-chain mode) - ## Returns an error if it cannot create the membership list - - # initialize a Merkle tree - let rlnInstance = createRLNInstance() - if rlnInstance.isErr(): - return err("could not create rln instance: " & rlnInstance.error()) - let rln = rlnInstance.get() - - var output = newSeq[(string, string, string, string)]() - var idCommitments = newSeq[IDCommitment]() - - for i in 0..n-1: - # generate an identity credential - let idCredentialRes = rln.membershipKeyGen() - if idCredentialRes.isErr(): - return err("could not generate an identity credential: " & idCredentialRes.error()) - let idCredential = idCredentialRes.get() - let idTuple = (idCredential.idTrapdoor.inHex(), idCredential.idNullifier.inHex(), idCredential.idSecretHash.inHex(), idCredential.idCommitment.inHex()) - output.add(idTuple) - idCommitments.add(idCredential.idCommitment) - - # Insert members into tree - let membersAdded = rln.insertMembers(0, idCommitments) - if not membersAdded: - return err("could not insert members into the tree") - - let root = rln.getMerkleRoot().value().inHex() - return ok((output, root)) - -proc rlnRelayStaticSetUp*(rlnRelayMembershipIndex: MembershipIndex): RlnRelayResult[(Option[seq[ - IDCommitment]], Option[IdentityCredential], Option[ - MembershipIndex])] = - ## rlnRelayStaticSetUp is a proc that is used to initialize the static group keys and the static membership index - ## this proc is used to test waku-rln-relay in the off-chain mode - ## it returns the static group id commitments, the static identity credentials, and the static membership indexes - ## Returns an error if it cannot initialize the static group keys and the static membership index - let - # static group - groupKeys = StaticGroupKeys - groupSize = StaticGroupSize - - debug "rln-relay membership index", rlnRelayMembershipIndex - - # validate the user-supplied membership index - if rlnRelayMembershipIndex < MembershipIndex(0) or rlnRelayMembershipIndex >= - MembershipIndex(groupSize): - error "wrong membership index" - return ok((none(seq[IDCommitment]), none(IdentityCredential), none(MembershipIndex))) - - # prepare the outputs from the static group keys - let - # create a sequence of IdentityCredentials from the group keys (group keys are in string format) - groupIdCredentialsRes = groupKeys.toIdentityCredentials() - - if groupIdCredentialsRes.isErr(): - return err("could not convert the group keys to IdentityCredentials: " & - groupIdCredentialsRes.error()) - - let - groupIdCredentials = groupIdCredentialsRes.get() - # extract id commitment keys - groupIDCommitments = groupIdCredentials.mapIt(it.idCommitment) - groupIDCommitmentsOpt = some(groupIDCommitments) - # user selected rln membership credential - groupIdCredentialsOpt = some(groupIdCredentials[rlnRelayMembershipIndex]) - memIndexOpt = some(rlnRelayMembershipIndex) - - return ok((groupIDCommitmentsOpt, groupIdCredentialsOpt, memIndexOpt)) - -proc calcEpoch*(t: float64): Epoch = - ## gets time `t` as `flaot64` with subseconds resolution in the fractional part - ## and returns its corresponding rln `Epoch` value - let e = uint64(t/EpochUnitSeconds) - return toEpoch(e) - -proc hasDuplicate*(rlnPeer: WakuRLNRelay, msg: WakuMessage): RlnRelayResult[bool] = - ## returns true if there is another message in the `nullifierLog` of the `rlnPeer` with the same - ## epoch and nullifier as `msg`'s epoch and nullifier but different Shamir secret shares - ## otherwise, returns false - ## Returns an error if it cannot check for duplicates - - let decodeRes = RateLimitProof.init(msg.proof) - if decodeRes.isErr(): - return err("failed to decode the RLN proof") - - let proof = decodeRes.get() - - # extract the proof metadata of the supplied `msg` - let proofMD = ProofMetadata( - nullifier: proof.nullifier, - shareX: proof.shareX, - shareY: proof.shareY - ) - - # check if the epoch exists - if not rlnPeer.nullifierLog.hasKey(proof.epoch): - return ok(false) - try: - if rlnPeer.nullifierLog[proof.epoch].contains(proofMD): - # there is an identical record, ignore rhe mag - return ok(false) - - # check for a message with the same nullifier but different secret shares - let matched = rlnPeer.nullifierLog[proof.epoch].filterIt(( - it.nullifier == proofMD.nullifier) and ((it.shareX != proofMD.shareX) or - (it.shareY != proofMD.shareY))) - - if matched.len != 0: - # there is a duplicate - return ok(true) - - # there is no duplicate - return ok(false) - - except KeyError as e: - return err("the epoch was not found") - -proc updateLog*(rlnPeer: WakuRLNRelay, msg: WakuMessage): RlnRelayResult[bool] = - ## extracts the `ProofMetadata` of the supplied messages `msg` and - ## saves it in the `nullifierLog` of the `rlnPeer` - ## Returns an error if it cannot update the log - - let decodeRes = RateLimitProof.init(msg.proof) - if decodeRes.isErr(): - return err("failed to decode the RLN proof") - - let proof = decodeRes.get() - - # extract the proof metadata of the supplied `msg` - let proofMD = ProofMetadata( - nullifier: proof.nullifier, - shareX: proof.shareX, - shareY: proof.shareY - ) - debug "proof metadata", proofMD = proofMD - - # check if the epoch exists - if not rlnPeer.nullifierLog.hasKey(proof.epoch): - rlnPeer.nullifierLog[proof.epoch] = @[proofMD] - return ok(true) - - try: - # check if an identical record exists - if rlnPeer.nullifierLog[proof.epoch].contains(proofMD): - return ok(true) - # add proofMD to the log - rlnPeer.nullifierLog[proof.epoch].add(proofMD) - return ok(true) - except KeyError as e: - return err("the epoch was not found") - -proc getCurrentEpoch*(): Epoch = - ## gets the current rln Epoch time - return calcEpoch(epochTime()) - -proc absDiff*(e1, e2: Epoch): uint64 = - ## returns the absolute difference between the two rln `Epoch`s `e1` and `e2` - ## i.e., e1 - e2 - - # convert epochs to their corresponding unsigned numerical values - let - epoch1 = fromEpoch(e1) - epoch2 = fromEpoch(e2) - - # Manually perform an `abs` calculation - if epoch1 > epoch2: - return epoch1 - epoch2 - else: - return epoch2 - epoch1 - -proc validateMessage*(rlnPeer: WakuRLNRelay, msg: WakuMessage, - timeOption: Option[float64] = none(float64)): MessageValidationResult = - ## validate the supplied `msg` based on the waku-rln-relay routing protocol i.e., - ## the `msg`'s epoch is within MaxEpochGap of the current epoch - ## the `msg` has valid rate limit proof - ## the `msg` does not violate the rate limit - ## `timeOption` indicates Unix epoch time (fractional part holds sub-seconds) - ## if `timeOption` is supplied, then the current epoch is calculated based on that - let decodeRes = RateLimitProof.init(msg.proof) - if decodeRes.isErr(): - return MessageValidationResult.Invalid - - let proof = decodeRes.get() - - # track message count for metrics - waku_rln_messages_total.inc() - - # checks if the `msg`'s epoch is far from the current epoch - # it corresponds to the validation of rln external nullifier - var epoch: Epoch - if timeOption.isSome(): - epoch = calcEpoch(timeOption.get()) - else: - # get current rln epoch - epoch = getCurrentEpoch() - - debug "current epoch", currentEpoch = fromEpoch(epoch) - let - msgEpoch = proof.epoch - # calculate the gaps - gap = absDiff(epoch, msgEpoch) - - debug "message epoch", msgEpoch = fromEpoch(msgEpoch) - - # validate the epoch - if gap > MaxEpochGap: - # message's epoch is too old or too ahead - # accept messages whose epoch is within +-MaxEpochGap from the current epoch - warn "invalid message: epoch gap exceeds a threshold", gap = gap, - payload = string.fromBytes(msg.payload) - waku_rln_invalid_messages_total.inc(labelValues=["invalid_epoch"]) - return MessageValidationResult.Invalid - - if not rlnPeer.validateRoot(proof.merkleRoot): - debug "invalid message: provided root does not belong to acceptable window of roots", provided=proof.merkleRoot, validRoots=rlnPeer.validMerkleRoots.mapIt(it.inHex()) - waku_rln_invalid_messages_total.inc(labelValues=["invalid_root"]) - return MessageValidationResult.Invalid - - # verify the proof - let - contentTopicBytes = msg.contentTopic.toBytes - input = concat(msg.payload, contentTopicBytes) - - waku_rln_proof_verification_total.inc() - waku_rln_proof_verification_duration_seconds.nanosecondTime: - let proofVerificationRes = rlnPeer.rlnInstance.proofVerify(input, proof) - - if proofVerificationRes.isErr(): - waku_rln_errors_total.inc(labelValues=["proof_verification"]) - warn "invalid message: proof verification failed", payload = string.fromBytes(msg.payload) - return MessageValidationResult.Invalid - if not proofVerificationRes.value(): - # invalid proof - debug "invalid message: invalid proof", payload = string.fromBytes(msg.payload) - waku_rln_invalid_messages_total.inc(labelValues=["invalid_proof"]) - return MessageValidationResult.Invalid - - # check if double messaging has happened - let hasDup = rlnPeer.hasDuplicate(msg) - if hasDup.isErr(): - waku_rln_errors_total.inc(labelValues=["duplicate_check"]) - elif hasDup.value == true: - debug "invalid message: message is spam", payload = string.fromBytes(msg.payload) - waku_rln_spam_messages_total.inc() - return MessageValidationResult.Spam - - # insert the message to the log - # the result of `updateLog` is discarded because message insertion is guaranteed by the implementation i.e., - # it will never error out - discard rlnPeer.updateLog(msg) - debug "message is valid", payload = string.fromBytes(msg.payload) - let rootIndex = rlnPeer.validMerkleRoots.find(proof.merkleRoot) - waku_rln_valid_messages_total.observe(rootIndex.toFloat()) - return MessageValidationResult.Valid - -proc toRLNSignal*(wakumessage: WakuMessage): seq[byte] = - ## it is a utility proc that prepares the `data` parameter of the proof generation procedure i.e., `proofGen` that resides in the current module - ## it extracts the `contentTopic` and the `payload` of the supplied `wakumessage` and serializes them into a byte sequence - let - contentTopicBytes = wakumessage.contentTopic.toBytes - output = concat(wakumessage.payload, contentTopicBytes) - return output - -proc appendRLNProof*(rlnPeer: WakuRLNRelay, msg: var WakuMessage, - senderEpochTime: float64): bool = - ## returns true if it can create and append a `RateLimitProof` to the supplied `msg` - ## returns false otherwise - ## `senderEpochTime` indicates the number of seconds passed since Unix epoch. The fractional part holds sub-seconds. - ## The `epoch` field of `RateLimitProof` is derived from the provided `senderEpochTime` (using `calcEpoch()`) - - let input = msg.toRLNSignal() - - var proof: RateLimitProofResult = proofGen(rlnInstance = rlnPeer.rlnInstance, data = input, - memKeys = rlnPeer.identityCredential, - memIndex = rlnPeer.membershipIndex, - epoch = calcEpoch(senderEpochTime)) - - if proof.isErr(): - return false - - msg.proof = proof.value.encode().buffer - return true - -proc addAll*(wakuRlnRelay: WakuRLNRelay, list: seq[IDCommitment]): RlnRelayResult[void] = - # add members to the Merkle tree of the `rlnInstance` - ## Returns an error if it cannot add any member to the Merkle tree - let membersAdded = wakuRlnRelay.insertMembers(0, list) - if not membersAdded.isOk(): - return err("failed to add members to the Merkle tree") - return ok() - -proc generateGroupUpdateHandler(rlnPeer: WakuRLNRelay): GroupUpdateHandler = - ## assuming all the members arrive in order - ## TODO: check the index and the pubkey depending on - ## the group update operation - var handler: GroupUpdateHandler - handler = proc(blockNumber: BlockNumber, members: seq[MembershipTuple]): RlnRelayResult[void] = - let startingIndex = members[0].index - debug "starting index", startingIndex = startingIndex, members = members.mapIt(it.idComm.inHex()) - let isSuccessful = rlnPeer.insertMembers(startingIndex, members.mapIt(it.idComm)) - if isSuccessful.isErr(): - return err("failed to add new members to the Merkle tree") - else: - debug "new members added to the Merkle tree", pubkeys=members.mapIt(it.idComm.inHex()) , startingIndex=startingIndex - debug "acceptable window", validRoots=rlnPeer.validMerkleRoots.mapIt(it.inHex()) - let lastIndex = members[0].index + members.len.uint - 1 - let indexGap = startingIndex - rlnPeer.lastSeenMembershipIndex - if not (toSeq(startingIndex..lastIndex) == members.mapIt(it.index)): - return err("the indexes of the new members are not in order") - if indexGap != 1.uint: - warn "membership index gap, may have lost connection", lastIndex, currIndex=rlnPeer.lastSeenMembershipIndex, indexGap = indexGap - rlnPeer.lastSeenMembershipIndex = lastIndex - rlnPeer.lastProcessedBlock = blockNumber - debug "last processed block", blockNumber = blockNumber - return ok() - return handler - -proc parse*(event: type MemberRegistered, - log: JsonNode): RlnRelayResult[MembershipTuple] = - ## parses the `data` parameter of the `MemberRegistered` event `log` - ## returns an error if it cannot parse the `data` parameter - var pubkey: UInt256 - var index: UInt256 - var data: string - # Remove the 0x prefix - try: - data = strip0xPrefix(log["data"].getStr()) - except CatchableError: - return err("failed to parse the data field of the MemberRegistered event: " & getCurrentExceptionMsg()) - var offset = 0 - try: - # Parse the pubkey - offset += decode(data, offset, pubkey) - # Parse the index - offset += decode(data, offset, index) - return ok((index: index.toMembershipIndex(), - idComm: pubkey.toIDCommitment())) - except: - return err("failed to parse the data field of the MemberRegistered event") - -type BlockTable = OrderedTable[BlockNumber, seq[MembershipTuple]] -proc getHistoricalEvents*(ethClientUri: string, - contractAddress: Address, - fromBlock: string = "0x0", - toBlock: string = "latest"): Future[RlnRelayResult[BlockTable]] {.async, gcsafe.} = - ## `ethClientUri` is the URI of the Ethereum client - ## `contractAddress` is the address of the contract - ## `fromBlock` is the block number from which the events are fetched - ## `toBlock` is the block number to which the events are fetched - ## returns a table that maps block numbers to the list of members registered in that block - ## returns an error if it cannot retrieve the historical events - let web3 = await newWeb3(ethClientUri) - let contract = web3.contractSender(MembershipContractInterface, contractAddress) - # Get the historical events, and insert memberships into the tree - let historicalEvents = await contract.getJsonLogs(MemberRegistered, - fromBlock=some(fromBlock.blockId()), - toBlock=some(toBlock.blockId())) - # Create a table that maps block numbers to the list of members registered in that block - var blockTable = OrderedTable[BlockNumber, seq[MembershipTuple]]() - for log in historicalEvents: - # batch according to log.blockNumber - let blockNumber = parseHexInt(log["blockNumber"].getStr()).uint - let parsedEventRes = parse(MemberRegistered, log) - - if parsedEventRes.isErr(): - error "failed to parse the MemberRegistered event", error=parsedEventRes.error() - return err("failed to parse the MemberRegistered event") - let parsedEvent = parsedEventRes.get() - # Add the parsed event to the table - if blockTable.hasKey(blockNumber): - blockTable[blockNumber].add(parsedEvent) - else: - blockTable[blockNumber] = @[parsedEvent] - return ok(blockTable) - -proc subscribeToGroupEvents*(ethClientUri: string, - ethAccountAddress: Option[Address] = none(Address), - contractAddress: Address, - blockNumber: string = "0x0", - handler: GroupUpdateHandler) {.async, gcsafe.} = - ## connects to the eth client whose URI is supplied as `ethClientUri` - ## subscribes to the `MemberRegistered` event emitted from the `MembershipContractInterface` which is available on the supplied `contractAddress` - ## it collects all the events starting from the given `blockNumber` - ## for every received block, it calls the `handler` - let web3 = await newWeb3(ethClientUri) - let contract = web3.contractSender(MembershipContractInterface, contractAddress) - - let blockTableRes = await getHistoricalEvents(ethClientUri, - contractAddress, - fromBlock=blockNumber) - if blockTableRes.isErr(): - error "failed to retrieve historical events", error=blockTableRes.error - return - let blockTable = blockTableRes.get() - # Update MT by batch - for blockNumber, members in blockTable.pairs(): - debug "updating the Merkle tree", blockNumber=blockNumber, members=members - let res = handler(blockNumber, members) - if res.isErr(): - error "failed to update the Merkle tree", error=res.error - - # We don't need the block table after this point - discard blockTable - - var latestBlock: BlockNumber - let handleLog = proc(blockHeader: BlockHeader) {.async, gcsafe.} = - try: - let membershipRegistrationLogs = await contract.getJsonLogs(MemberRegistered, - blockHash = some(blockheader.hash)) - if membershipRegistrationLogs.len == 0: - return - var members: seq[MembershipTuple] - for log in membershipRegistrationLogs: - let parsedEventRes = parse(MemberRegistered, log) - if parsedEventRes.isErr(): - fatal "failed to parse the MemberRegistered event", error=parsedEventRes.error() - return - let parsedEvent = parsedEventRes.get() - members.add(parsedEvent) - let res = handler(blockHeader.number.uint, members) - if res.isErr(): - error "failed to update the Merkle tree", error=res.error - except CatchableError: - warn "failed to get logs", error=getCurrentExceptionMsg() - return - let newHeadCallback = proc (blockheader: BlockHeader) {.gcsafe.} = - latestBlock = blockheader.number.uint - debug "block received", blockNumber = latestBlock - # get logs from the last block - try: - asyncSpawn handleLog(blockHeader) - except CatchableError: - warn "failed to handle log: ", error=getCurrentExceptionMsg() - - let newHeadErrorHandler = proc (err: CatchableError) {.gcsafe.} = - error "Error from subscription: ", err=err.msg - discard await web3.subscribeForBlockHeaders(newHeadCallback, newHeadErrorHandler) - - web3.onDisconnect = proc() = - debug "connection to ethereum node dropped", lastBlock = latestBlock - -proc handleGroupUpdates*(rlnPeer: WakuRLNRelay) {.async, gcsafe.} = - ## generates the groupUpdateHandler which is called when a new member is registered, - ## and has the WakuRLNRelay instance as a closure - let handler = generateGroupUpdateHandler(rlnPeer) - await subscribeToGroupEvents(ethClientUri = rlnPeer.ethClientAddress, - ethAccountAddress = rlnPeer.ethAccountAddress, - contractAddress = rlnPeer.membershipContractAddress, - handler = handler) - -proc addRLNRelayValidator*(wakuRlnRelay: WakuRLNRelay, - wakuRelay: WakuRelay, - pubsubTopic: PubsubTopic, - contentTopic: ContentTopic, - spamHandler: Option[SpamHandler] = none(SpamHandler)) = - ## this procedure is a thin wrapper for the pubsub addValidator method - ## it sets a validator for the waku messages published on the supplied pubsubTopic and contentTopic - ## if contentTopic is empty, then validation takes place for All the messages published on the given pubsubTopic - ## the message validation logic is according to https://rfc.vac.dev/spec/17/ - proc validator(topic: string, message: messages.Message): Future[pubsub.ValidationResult] {.async.} = - trace "rln-relay topic validator is called" - let decodeRes = WakuMessage.decode(message.data) - if decodeRes.isOk(): - let - wakumessage = decodeRes.value - payload = string.fromBytes(wakumessage.payload) - - # check the contentTopic - if (wakumessage.contentTopic != "") and (contentTopic != "") and (wakumessage.contentTopic != contentTopic): - trace "content topic did not match:", contentTopic=wakumessage.contentTopic, payload=payload - return pubsub.ValidationResult.Accept - - - let decodeRes = RateLimitProof.init(wakumessage.proof) - if decodeRes.isErr(): - return pubsub.ValidationResult.Reject - - let msgProof = decodeRes.get() - - # validate the message - let - validationRes = wakuRlnRelay.validateMessage(wakumessage) - proof = toHex(msgProof.proof) - epoch = fromEpoch(msgProof.epoch) - root = inHex(msgProof.merkleRoot) - shareX = inHex(msgProof.shareX) - shareY = inHex(msgProof.shareY) - nullifier = inHex(msgProof.nullifier) - case validationRes: - of Valid: - debug "message validity is verified, relaying:", contentTopic=wakumessage.contentTopic, epoch=epoch, timestamp=wakumessage.timestamp, payload=payload - trace "message validity is verified, relaying:", proof=proof, root=root, shareX=shareX, shareY=shareY, nullifier=nullifier - return pubsub.ValidationResult.Accept - of Invalid: - debug "message validity could not be verified, discarding:", contentTopic=wakumessage.contentTopic, epoch=epoch, timestamp=wakumessage.timestamp, payload=payload - trace "message validity could not be verified, discarding:", proof=proof, root=root, shareX=shareX, shareY=shareY, nullifier=nullifier - return pubsub.ValidationResult.Reject - of Spam: - debug "A spam message is found! yay! discarding:", contentTopic=wakumessage.contentTopic, epoch=epoch, timestamp=wakumessage.timestamp, payload=payload - trace "A spam message is found! yay! discarding:", proof=proof, root=root, shareX=shareX, shareY=shareY, nullifier=nullifier - if spamHandler.isSome(): - let handler = spamHandler.get() - handler(wakumessage) - return pubsub.ValidationResult.Reject - # set a validator for the supplied pubsubTopic - wakuRelay.addValidator(pubsubTopic, validator) - -proc mountRlnRelayStatic*(wakuRelay: WakuRelay, - group: seq[IDCommitment], - memIdCredential: IdentityCredential, - memIndex: MembershipIndex, - pubsubTopic: PubsubTopic, - contentTopic: ContentTopic, - spamHandler: Option[SpamHandler] = none(SpamHandler)): RlnRelayResult[WakuRlnRelay] = - # Returns RlnRelayResult[void] to indicate the success of the call - - debug "mounting rln-relay in off-chain/static mode" - # check the peer's index and the inclusion of user's identity commitment in the group - if memIdCredential.idCommitment != group[int(memIndex)]: - return err("The peer's index is not consistent with the group") - - # create an RLN instance - let rlnInstance = createRLNInstance() - if rlnInstance.isErr(): - return err("RLN instance creation failed") - let rln = rlnInstance.get() - - # create the WakuRLNRelay - let rlnPeer = WakuRLNRelay(identityCredential: memIdCredential, - membershipIndex: memIndex, - rlnInstance: rln, - pubsubTopic: pubsubTopic, - contentTopic: contentTopic) - - # add members to the Merkle tree - let membersAdded = rlnPeer.insertMembers(0, group) - if membersAdded.isErr(): - return err("member addition to the Merkle tree failed: " & membersAdded.error) - - # adds a topic validator for the supplied pubsub topic at the relay protocol - # messages published on this pubsub topic will be relayed upon a successful validation, otherwise they will be dropped - # the topic validator checks for the correct non-spamming proof of the message - rlnPeer.addRLNRelayValidator(wakuRelay, pubsubTopic, contentTopic, spamHandler) - debug "rln relay topic validator is mounted successfully", pubsubTopic=pubsubTopic, contentTopic=contentTopic - - return ok(rlnPeer) - -proc mountRlnRelayDynamic*(wakuRelay: WakuRelay, - ethClientAddr: string = "", - ethAccountAddress: Option[web3.Address] = none(web3.Address), - ethAccountPrivKeyOpt: Option[keys.PrivateKey], - memContractAddr: web3.Address, - memIdCredential: Option[IdentityCredential] = none(IdentityCredential), - memIndex: Option[MembershipIndex] = none(MembershipIndex), - pubsubTopic: PubsubTopic, - contentTopic: ContentTopic, - spamHandler: Option[SpamHandler] = none(SpamHandler), - registrationHandler: Option[RegistrationHandler] = none(RegistrationHandler)) : Future[RlnRelayResult[WakuRlnRelay]] {.async.} = - debug "mounting rln-relay in on-chain/dynamic mode" - # create an RLN instance - let rlnInstance = createRLNInstance() - - if rlnInstance.isErr(): - return err("RLN instance creation failed.") - let rln = rlnInstance.get() - - # prepare rln membership credential - var - idCredential: IdentityCredential - rlnIndex: MembershipIndex - if memIdCredential.isNone: # no rln credentials provided - if ethAccountPrivKeyOpt.isSome: # if an ethereum private key is supplied, then create rln credentials and register to the membership contract - trace "no rln-relay key is provided, generating one" - let idCredentialRes = rln.membershipKeyGen() - if idCredentialRes.isErr(): - error "failed to generate rln-relay identity credential" - return err("failed to generate rln-relay identity credential: " & idCredentialRes.error()) - idCredential = idCredentialRes.value() - # register the rln-relay peer to the membership contract - waku_rln_registration_duration_seconds.nanosecondTime: - let regIndexRes = await register(idComm = idCredential.idCommitment, - ethAccountAddress = ethAccountAddress, - ethAccountPrivKey = ethAccountPrivKeyOpt.get(), - ethClientAddress = ethClientAddr, - membershipContractAddress = memContractAddr, - registrationHandler = registrationHandler) - # check whether registration is done - if regIndexRes.isErr(): - debug "membership registration failed", err=regIndexRes.error() - return err("membership registration failed: " & regIndexRes.error()) - rlnIndex = regIndexRes.value - debug "peer is successfully registered into the membership contract" - else: # if no eth private key is available, skip registration - debug "running waku-rln-relay in relay-only mode" - else: - debug "Peer is already registered to the membership contract" - idCredential = memIdCredential.get() - rlnIndex = memIndex.get() - - # create the WakuRLNRelay - var rlnPeer = WakuRLNRelay(identityCredential: idCredential, - membershipIndex: rlnIndex, - membershipContractAddress: memContractAddr, - ethClientAddress: ethClientAddr, - ethAccountAddress: ethAccountAddress, - ethAccountPrivateKey: ethAccountPrivKeyOpt, - rlnInstance: rln, - pubsubTopic: pubsubTopic, - contentTopic: contentTopic) - - asyncSpawn rlnPeer.handleGroupUpdates() - debug "dynamic group management is started" - # adds a topic validator for the supplied pubsub topic at the relay protocol - # messages published on this pubsub topic will be relayed upon a successful validation, otherwise they will be dropped - # the topic validator checks for the correct non-spamming proof of the message - rlnPeer.addRLNRelayValidator(wakuRelay, pubsubTopic, contentTopic, spamHandler) - debug "rln relay topic validator is mounted successfully", pubsubTopic=pubsubTopic, contentTopic=contentTopic - - return ok(rlnPeer) - -proc mount(wakuRelay: WakuRelay, - conf: WakuRlnConfig, - spamHandler: Option[SpamHandler] = none(SpamHandler), - registrationHandler: Option[RegistrationHandler] = none(RegistrationHandler) - ): Future[RlnRelayResult[WakuRlnRelay]] {.async.} = - - if not conf.rlnRelayDynamic: - info " setting up waku-rln-relay in off-chain mode... " - # set up rln relay inputs - let staticSetupRes = rlnRelayStaticSetUp(MembershipIndex(conf.rlnRelayMembershipIndex)) - if staticSetupRes.isErr(): - return err("rln relay static setup failed: " & staticSetupRes.error()) - let (groupOpt, idCredentialOpt, memIndexOpt) = staticSetupRes.get() - if memIndexOpt.isNone(): - error "failed to mount WakuRLNRelay" - return err("failed to mount WakuRLNRelay") - else: - # mount rlnrelay in off-chain mode with a static group of users - let mountRes = mountRlnRelayStatic(wakuRelay, - group = groupOpt.get(), - memIdCredential = idCredentialOpt.get(), - memIndex = memIndexOpt.get(), - pubsubTopic = conf.rlnRelayPubsubTopic, - contentTopic = conf.rlnRelayContentTopic, - spamHandler = spamHandler) - - if mountRes.isErr(): - return err("Failed to mount WakuRLNRelay: " & mountRes.error()) - - let rlnRelay = mountRes.get() - - info "membership id key", idkey=idCredentialOpt.get().idSecretHash.inHex() - info "membership id commitment key", idCommitmentkey=idCredentialOpt.get().idCommitment.inHex() - - # check the correct construction of the tree by comparing the calculated root against the expected root - # no error should happen as it is already captured in the unit tests - # TODO have added this check to account for unseen corner cases, will remove it later - let - rootRes = rlnRelay.rlnInstance.getMerkleRoot() - expectedRoot = StaticGroupMerkleRoot - - if rootRes.isErr(): - return err(rootRes.error()) - - let root = rootRes.value() - - if root.inHex() != expectedRoot: - error "root mismatch: something went wrong not in Merkle tree construction" - debug "the calculated root", root - info "WakuRLNRelay is mounted successfully", pubsubtopic=conf.rlnRelayPubsubTopic, contentTopic=conf.rlnRelayContentTopic - return ok(rlnRelay) - else: # mount the rln relay protocol in the on-chain/dynamic mode - debug "setting up waku-rln-relay in on-chain mode... " - - debug "on-chain setup parameters", contractAddress=conf.rlnRelayEthContractAddress - # read related inputs to run rln-relay in on-chain mode and do type conversion when needed - let - ethClientAddr = conf.rlnRelayEthClientAddress - - var ethMemContractAddress: web3.Address - try: - ethMemContractAddress = web3.fromHex(web3.Address, conf.rlnRelayEthContractAddress) - except ValueError as err: - return err("invalid eth contract address: " & err.msg) - var ethAccountPrivKeyOpt = none(keys.PrivateKey) - var ethAccountAddressOpt = none(Address) - var credentials = none(MembershipCredentials) - var rlnRelayRes: RlnRelayResult[WakuRlnRelay] - var rlnRelayCredPath: string - var persistCredentials: bool = false - # The RLN membership contract - let rlnMembershipContract = MembershipContract(chainId: "5", # This is Goerli ChainID. TODO: pass chainId to web3 as config option - address: conf.rlnRelayEthContractAddress) - - - if conf.rlnRelayEthAccountPrivateKey != "": - ethAccountPrivKeyOpt = some(keys.PrivateKey(SkSecretKey.fromHex(conf.rlnRelayEthAccountPrivateKey).value)) - - if conf.rlnRelayEthAccountAddress != "": - var ethAccountAddress: web3.Address - try: - ethAccountAddress = web3.fromHex(web3.Address, conf.rlnRelayEthAccountAddress) - except ValueError as err: - return err("invalid eth account address: " & err.msg) - ethAccountAddressOpt = some(ethAccountAddress) - - # if the rlnRelayCredPath config option is non-empty, then rln-relay credentials should be persisted - # if the path does not contain any credential file, then a new set is generated and pesisted in the same path - # if there is a credential file, then no new credentials are generated, instead the content of the file is read and used to mount rln-relay - if conf.rlnRelayCredPath != "": - - rlnRelayCredPath = joinPath(conf.rlnRelayCredPath, RlnCredentialsFilename) - debug "rln-relay credential path", rlnRelayCredPath - - # check if there is an rln-relay credential file in the supplied path - if fileExists(rlnRelayCredPath): - - info "A RLN credential file exists in provided path", path=rlnRelayCredPath - - # retrieve rln-relay credential - waku_rln_membership_credentials_import_duration_seconds.nanosecondTime: - let readCredentialsRes = getMembershipCredentials(path = rlnRelayCredPath, - password = conf.rlnRelayCredentialsPassword, - filterMembershipContracts = @[rlnMembershipContract], - # TODO: the following can be embedded in conf - appInfo = RLNAppInfo) - - if readCredentialsRes.isErr(): - return err("RLN credentials cannot be read") - - # getMembershipCredentials returns all credentials in keystore as sequence matching the filter - let allMatchingCredentials = readCredentialsRes.get() - # if any is found, we return the first credential, otherwise credentials is none - if allMatchingCredentials.len() > 0: - credentials = some(allMatchingCredentials[0]) - else: - credentials = none(MembershipCredentials) - - else: # there is no credential file available in the supplied path - # mount the rln-relay protocol leaving rln-relay credentials arguments unassigned - # this infroms mountRlnRelayDynamic proc that new credentials should be generated and registered to the membership contract - info "no rln credential is provided" - - if credentials.isSome(): - # mount rln-relay in on-chain mode, with credentials that were read or generated - rlnRelayRes = await mountRlnRelayDynamic(wakuRelay, - memContractAddr = ethMemContractAddress, - ethClientAddr = ethClientAddr, - ethAccountAddress = ethAccountAddressOpt, - ethAccountPrivKeyOpt = ethAccountPrivKeyOpt, - pubsubTopic = conf.rlnRelayPubsubTopic, - contentTopic = conf.rlnRelayContentTopic, - spamHandler = spamHandler, - registrationHandler = registrationHandler, - memIdCredential = some(credentials.get().identityCredential), - memIndex = some(credentials.get().membershipGroups[0].treeIndex)) # TODO: use a proper proc to get a certain membership index - else: - # mount rln-relay in on-chain mode, with the provided private key - rlnRelayRes = await mountRlnRelayDynamic(wakuRelay, - memContractAddr = ethMemContractAddress, - ethClientAddr = ethClientAddr, - ethAccountAddress = ethAccountAddressOpt, - ethAccountPrivKeyOpt = ethAccountPrivKeyOpt, - pubsubTopic = conf.rlnRelayPubsubTopic, - contentTopic = conf.rlnRelayContentTopic, - spamHandler = spamHandler, - registrationHandler = registrationHandler) - - persistCredentials = true - - else: - # do not persist or use a persisted rln-relay credential - # a new credential will be generated during the mount process but will not be persisted - info "no need to persist or use a persisted rln-relay credential" - rlnRelayRes = await mountRlnRelayDynamic(wakuRelay, - memContractAddr = ethMemContractAddress, - ethClientAddr = ethClientAddr, - ethAccountAddress = ethAccountAddressOpt, - ethAccountPrivKeyOpt = ethAccountPrivKeyOpt, - pubsubTopic = conf.rlnRelayPubsubTopic, - contentTopic = conf.rlnRelayContentTopic, - spamHandler = spamHandler, - registrationHandler = registrationHandler) - - if rlnRelayRes.isErr(): - return err("dynamic rln-relay could not be mounted: " & rlnRelayRes.error()) - let wakuRlnRelay = rlnRelayRes.get() - if persistCredentials: - - credentials = some(MembershipCredentials(identityCredential: wakuRlnRelay.identityCredential, - membershipGroups: @[MembershipGroup(membershipContract: rlnMembershipContract, treeIndex: wakuRlnRelay.membershipIndex)] - )) - - if addMembershipCredentials(path = rlnRelayCredPath, - credentials = @[credentials.get()], - password = conf.rlnRelayCredentialsPassword, - # TODO: the following can be embedded in conf - appInfo = RLNAppInfo).isErr(): - return err("error in storing rln credentials") - return ok(wakuRlnRelay) - -proc new*(T: type WakuRlnRelay, - wakuRelay: WakuRelay, - conf: WakuRlnConfig, - spamHandler: Option[SpamHandler] = none(SpamHandler), - registrationHandler: Option[RegistrationHandler] = none(RegistrationHandler) - ): Future[RlnRelayResult[WakuRlnRelay]] {.async.} = - ## Mounts the rln-relay protocol on the node. - ## The rln-relay protocol can be mounted in two modes: on-chain and off-chain. - ## Returns an error if the rln-relay protocol could not be mounted. - - # check whether inputs are provided - # relay protocol is the prerequisite of rln-relay - if wakuRelay.isNil(): - return err("WakuRelay protocol is not mounted") - - # TODO: Review this. The Waku Relay instance is no longer keeping track of the default pubsub topics - # # check whether the pubsub topic is supported at the relay level - # if conf.rlnRelayPubsubTopic notin wakuRelay.defaultPubsubTopics: - # return err("The relay protocol does not support the configured pubsub topic") - - debug "rln-relay input validation passed" - waku_rln_relay_mounting_duration_seconds.nanosecondTime: - let rlnRelayRes = await mount( - wakuRelay, - conf, - spamHandler, - registrationHandler - ) - return rlnRelayRes