Skip to content

Commit

Permalink
feat: RLN proofs as a lightpush service (#2768)
Browse files Browse the repository at this point in the history
  • Loading branch information
shash256 authored Jun 13, 2024
1 parent b522865 commit 0561e5b
Show file tree
Hide file tree
Showing 4 changed files with 163 additions and 31 deletions.
86 changes: 83 additions & 3 deletions tests/node/test_wakunode_lightpush.nim
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{.used.}

import
std/[options, tables, sequtils],
std/[options, tables, sequtils, tempfiles],
stew/shims/net as stewNet,
testutils/unittests,
chronos,
Expand All @@ -23,6 +23,7 @@ import
waku_lightpush/client,
waku_lightpush/protocol_metrics,
waku_lightpush/rpc,
waku_rln_relay
],
../testlib/[assertions, common, wakucore, wakunode, testasync, futures, testutils],
../resources/payloads
Expand Down Expand Up @@ -59,7 +60,7 @@ suite "Waku Lightpush - End To End":
await server.start()

await server.mountRelay()
await server.mountLightpush()
await server.mountLightpush() # without rln-relay
client.mountLightpushClient()

serverRemotePeerInfo = server.peerInfo.toRemotePeerInfo()
Expand Down Expand Up @@ -103,4 +104,83 @@ suite "Waku Lightpush - End To End":

check:
publishResponse.isErr()
publishResponse.error == fmt"Message size exceeded maximum of {DefaultMaxWakuMessageSize} bytes"
publishResponse.error == fmt"Message size exceeded maximum of {DefaultMaxWakuMessageSize} bytes"

suite "RLN Proofs as a Lightpush Service":
var
handlerFuture {.threadvar.}: Future[(PubsubTopic, WakuMessage)]
handler {.threadvar.}: PushMessageHandler

server {.threadvar.}: WakuNode
client {.threadvar.}: WakuNode

serverRemotePeerInfo {.threadvar.}: RemotePeerInfo
pubsubTopic {.threadvar.}: PubsubTopic
contentTopic {.threadvar.}: ContentTopic
message {.threadvar.}: WakuMessage

asyncSetup:
handlerFuture = newPushHandlerFuture()
handler = proc(
peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage
): Future[WakuLightPushResult[void]] {.async.} =
handlerFuture.complete((pubsubTopic, message))
return ok()

let
serverKey = generateSecp256k1Key()
clientKey = generateSecp256k1Key()

server = newTestWakuNode(serverKey, ValidIpAddress.init("0.0.0.0"), Port(0))
client = newTestWakuNode(clientKey, ValidIpAddress.init("0.0.0.0"), Port(0))

# mount rln-relay
when defined(rln_v2):
let wakuRlnConfig = WakuRlnConfig(
rlnRelayDynamic: false,
rlnRelayCredIndex: some(1.uint),
rlnRelayUserMessageLimit: 1,
rlnEpochSizeSec: 1,
rlnRelayTreePath: genTempPath("rln_tree", "wakunode"),
)
else:
let wakuRlnConfig = WakuRlnConfig(
rlnRelayDynamic: false,
rlnRelayCredIndex: some(1.uint),
rlnEpochSizeSec: 1,
rlnRelayTreePath: genTempPath("rln_tree", "wakunode"),
)

await allFutures(server.start(), client.start())
await server.start()

await server.mountRelay()
await server.mountRlnRelay(wakuRlnConfig)
await server.mountLightpush()
client.mountLightpushClient()

serverRemotePeerInfo = server.peerInfo.toRemotePeerInfo()
pubsubTopic = DefaultPubsubTopic
contentTopic = DefaultContentTopic
message = fakeWakuMessage()

asyncTeardown:
await server.stop()

suite "Lightpush attaching RLN proofs":
asyncTest "Message is published when RLN enabled":
# Given a light lightpush client
let lightpushClient =
newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0))
lightpushClient.mountLightpushClient()

# When the client publishes a message
let publishResponse = await lightpushClient.lightpushPublish(
some(pubsubTopic), message, serverRemotePeerInfo
)

if not publishResponse.isOk():
echo "Publish failed: ", publishResponse.error()

# Then the message is relayed to the server
assertResultOk publishResponse
43 changes: 16 additions & 27 deletions waku/node/waku_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import
../waku_lightpush/common,
../waku_lightpush/protocol,
../waku_lightpush/self_req_handler,
../waku_lightpush/callbacks,
../waku_enr,
../waku_peer_exchange,
../waku_rln_relay,
Expand Down Expand Up @@ -976,34 +977,22 @@ proc mountLightPush*(
node: WakuNode, rateLimit: RateLimitSetting = DefaultGlobalNonRelayRateLimit
) {.async.} =
info "mounting light push"

var pushHandler =
if node.wakuRelay.isNil:
debug "mounting lightpush without relay (nil)"
getNilPushHandler()
else:
debug "mounting lightpush with relay"
let rlnPeer =
if isNil(node.wakuRlnRelay):
debug "mounting lightpush without rln-relay"
none(WakuRLNRelay)
else:
debug "mounting lightpush with rln-relay"
some(node.wakuRlnRelay)
getRelayPushHandler(node.wakuRelay, rlnPeer)

var pushHandler: PushMessageHandler
if node.wakuRelay.isNil():
debug "mounting lightpush without relay (nil)"
pushHandler = proc(
peer: PeerId, pubsubTopic: string, message: WakuMessage
): Future[WakuLightPushResult[void]] {.async.} =
return err("no waku relay found")
else:
pushHandler = proc(
peer: PeerId, pubsubTopic: string, message: WakuMessage
): Future[WakuLightPushResult[void]] {.async.} =
let validationRes = await node.wakuRelay.validateMessage(pubSubTopic, message)
if validationRes.isErr():
return err(validationRes.error)

let publishedCount =
await node.wakuRelay.publish(pubsubTopic, message.encode().buffer)

if publishedCount == 0:
## Agreed change expected to the lightpush protocol to better handle such case. https://github.com/waku-org/pm/issues/93
let msgHash = computeMessageHash(pubsubTopic, message).to0xHex()
notice "Lightpush request has not been published to any peers",
msg_hash = msgHash

return ok()

debug "mounting lightpush with relay"
node.wakuLightPush =
WakuLightPush.new(node.peerManager, node.rng, pushHandler, some(rateLimit))

Expand Down
63 changes: 63 additions & 0 deletions waku/waku_lightpush/callbacks.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}

import
../waku_core,
../waku_relay,
./common,
./protocol,
../waku_rln_relay,
../waku_rln_relay/protocol_types,
../common/ratelimit
import
std/times,
libp2p/peerid,
stew/byteutils

proc checkAndGenerateRLNProof*(rlnPeer: Option[WakuRLNRelay], message: WakuMessage): Result[WakuMessage, string] =
# check if the message already has RLN proof
if message.proof.len > 0:
return ok(message)

if rlnPeer.isNone():
notice "Publishing message without RLN proof"
return ok(message)
# generate and append RLN proof
let
time = getTime().toUnix()
senderEpochTime = float64(time)
var msgWithProof = message
rlnPeer.get().appendRLNProof(msgWithProof, senderEpochTime).isOkOr:
return err(error)
return ok(msgWithProof)

proc getNilPushHandler*(): PushMessageHandler =
return proc(
peer: PeerId, pubsubTopic: string, message: WakuMessage
): Future[WakuLightPushResult[void]] {.async.} =
return err("no waku relay found")

proc getRelayPushHandler*(
wakuRelay: WakuRelay,
rlnPeer: Option[WakuRLNRelay] = none[WakuRLNRelay]()
): PushMessageHandler =
return proc(
peer: PeerId, pubsubTopic: string, message: WakuMessage
): Future[WakuLightPushResult[void]] {.async.} =
# append RLN proof
let msgWithProof = checkAndGenerateRLNProof(rlnPeer, message)
if msgWithProof.isErr():
return err(msgWithProof.error)

(await wakuRelay.validateMessage(pubSubTopic, msgWithProof.value)).isOkOr:
return err(error)

let publishedCount = await wakuRelay.publish(pubsubTopic, msgWithProof.value)
if publishedCount == 0:
## Agreed change expected to the lightpush protocol to better handle such case. https://github.com/waku-org/pm/issues/93
let msgHash = computeMessageHash(pubsubTopic, message).to0xHex()
notice "Lightpush request has not been published to any peers", msg_hash = msgHash

return ok()
2 changes: 1 addition & 1 deletion waku/waku_lightpush/protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -115,4 +115,4 @@ proc new*(
requestRateLimiter: newTokenBucket(rateLimitSetting),
)
wl.initProtocolHandler()
return wl
return wl

0 comments on commit 0561e5b

Please sign in to comment.