Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: RLN proofs as a lightpush service #2768

Merged
merged 9 commits into from
Jun 13, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 83 additions & 3 deletions tests/node/test_wakunode_lightpush.nim
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{.used.}

import
std/[options, tables, sequtils],
std/[options, tables, sequtils, tempfiles],
stew/shims/net as stewNet,
testutils/unittests,
chronos,
Expand All @@ -23,6 +23,7 @@ import
waku_lightpush/client,
waku_lightpush/protocol_metrics,
waku_lightpush/rpc,
waku_rln_relay
],
../testlib/[assertions, common, wakucore, wakunode, testasync, futures, testutils],
../resources/payloads
Expand Down Expand Up @@ -59,7 +60,7 @@ suite "Waku Lightpush - End To End":
await server.start()

await server.mountRelay()
await server.mountLightpush()
await server.mountLightpush() # without rln-relay
client.mountLightpushClient()

serverRemotePeerInfo = server.peerInfo.toRemotePeerInfo()
Expand Down Expand Up @@ -103,4 +104,83 @@ suite "Waku Lightpush - End To End":

check:
publishResponse.isErr()
publishResponse.error == fmt"Message size exceeded maximum of {DefaultMaxWakuMessageSize} bytes"
publishResponse.error == fmt"Message size exceeded maximum of {DefaultMaxWakuMessageSize} bytes"

suite "RLN Proofs as a Lightpush Service":
var
handlerFuture {.threadvar.}: Future[(PubsubTopic, WakuMessage)]
handler {.threadvar.}: PushMessageHandler

server {.threadvar.}: WakuNode
client {.threadvar.}: WakuNode

serverRemotePeerInfo {.threadvar.}: RemotePeerInfo
pubsubTopic {.threadvar.}: PubsubTopic
contentTopic {.threadvar.}: ContentTopic
message {.threadvar.}: WakuMessage

asyncSetup:
handlerFuture = newPushHandlerFuture()
handler = proc(
peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage
): Future[WakuLightPushResult[void]] {.async.} =
handlerFuture.complete((pubsubTopic, message))
return ok()

let
serverKey = generateSecp256k1Key()
clientKey = generateSecp256k1Key()

server = newTestWakuNode(serverKey, ValidIpAddress.init("0.0.0.0"), Port(0))
client = newTestWakuNode(clientKey, ValidIpAddress.init("0.0.0.0"), Port(0))

# mount rln-relay
when defined(rln_v2):
let wakuRlnConfig = WakuRlnConfig(
rlnRelayDynamic: false,
rlnRelayCredIndex: some(1.uint),
rlnRelayUserMessageLimit: 1,
rlnEpochSizeSec: 1,
rlnRelayTreePath: genTempPath("rln_tree", "wakunode"),
)
else:
let wakuRlnConfig = WakuRlnConfig(
rlnRelayDynamic: false,
rlnRelayCredIndex: some(1.uint),
rlnEpochSizeSec: 1,
rlnRelayTreePath: genTempPath("rln_tree", "wakunode"),
)

await allFutures(server.start(), client.start())
await server.start()

await server.mountRelay()
await server.mountRlnRelay(wakuRlnConfig)
await server.mountLightpush()
client.mountLightpushClient()

serverRemotePeerInfo = server.peerInfo.toRemotePeerInfo()
pubsubTopic = DefaultPubsubTopic
contentTopic = DefaultContentTopic
message = fakeWakuMessage()

asyncTeardown:
await server.stop()

suite "Lightpush attaching RLN proofs":
asyncTest "Message is published when RLN enabled":
# Given a light lightpush client
let lightpushClient =
newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0))
lightpushClient.mountLightpushClient()

# When the client publishes a message
let publishResponse = await lightpushClient.lightpushPublish(
some(pubsubTopic), message, serverRemotePeerInfo
)

if not publishResponse.isOk():
echo "Publish failed: ", publishResponse.error()

# Then the message is relayed to the server
NagyZoltanPeter marked this conversation as resolved.
Show resolved Hide resolved
assertResultOk publishResponse
36 changes: 10 additions & 26 deletions waku/node/waku_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import
../waku_lightpush/common,
../waku_lightpush/protocol,
../waku_lightpush/self_req_handler,
../waku_lightpush/callbacks,
../waku_enr,
../waku_peer_exchange,
../waku_rln_relay,
Expand Down Expand Up @@ -931,33 +932,16 @@ proc mountLightPush*(
) {.async.} =
info "mounting light push"

var pushHandler: PushMessageHandler
if node.wakuRelay.isNil():
debug "mounting lightpush without relay (nil)"
pushHandler = proc(
peer: PeerId, pubsubTopic: string, message: WakuMessage
): Future[WakuLightPushResult[void]] {.async.} =
return err("no waku relay found")
else:
pushHandler = proc(
peer: PeerId, pubsubTopic: string, message: WakuMessage
): Future[WakuLightPushResult[void]] {.async.} =
let validationRes = await node.wakuRelay.validateMessage(pubSubTopic, message)
if validationRes.isErr():
return err(validationRes.error)

let publishedCount =
await node.wakuRelay.publish(pubsubTopic, message.encode().buffer)

if publishedCount == 0:
## Agreed change expected to the lightpush protocol to better handle such case. https://github.com/waku-org/pm/issues/93
let msgHash = computeMessageHash(pubsubTopic, message).to0xHex()
debug "Lightpush request has not been published to any peers",
msg_hash = msgHash

return ok()
let rlnPeer =
if node.wakuRlnRelay.isNil:
shash256 marked this conversation as resolved.
Show resolved Hide resolved
debug "mounting lightpush without rln-relay"
none(WakuRLNRelay)
else:
debug "mounting lightpush with rln-relay"
some(node.wakuRlnRelay)

var pushHandler = getPushHandler(node.wakuRelay, rlnPeer)
shash256 marked this conversation as resolved.
Show resolved Hide resolved

debug "mounting lightpush with relay"
node.wakuLightPush =
WakuLightPush.new(node.peerManager, node.rng, pushHandler, some(rateLimit))

Expand Down
66 changes: 66 additions & 0 deletions waku/waku_lightpush/callbacks.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}

import
../waku_core,
../waku_relay,
./common,
./protocol,
../waku_rln_relay,
../waku_rln_relay/protocol_types,
../common/ratelimit
import
std/times,
libp2p/peerid,
stew/byteutils

proc generateAndValidateRLNProof*(rlnPeer: Option[WakuRLNRelay], message: WakuMessage): (string, WakuMessage) =
shash256 marked this conversation as resolved.
Show resolved Hide resolved
var rlnResultInfo = ""
# TODO: check and validate if the message already has RLN proof?
shash256 marked this conversation as resolved.
Show resolved Hide resolved
if rlnPeer.isNone():
return (rlnResultInfo, message) # publishing message without RLN proof

# generate and append RLN proof
let
time = getTime().toUnix()
senderEpochTime = float64(time)
var msgWithProof = message
let appendProofRes = rlnPeer.get().appendRLNProof(msgWithProof, senderEpochTime)
if appendProofRes.isErr():
rlnResultInfo = "RLN proof generation failed: " & appendProofRes.error

return (rlnResultInfo, msgWithProof)

proc getPushHandler*(
shash256 marked this conversation as resolved.
Show resolved Hide resolved
wakuRelay: WakuRelay,
rlnPeer: Option[WakuRLNRelay] = none[WakuRLNRelay]()
): PushMessageHandler =
if wakuRelay.isNil():
debug "mounting lightpush without relay (nil)"
return proc(
peer: PeerId, pubsubTopic: string, message: WakuMessage
): Future[WakuLightPushResult[void]] {.async.} =
return err("no waku relay found")
else:
debug "mounting lightpush with relay"
shash256 marked this conversation as resolved.
Show resolved Hide resolved
return proc(
peer: PeerId, pubsubTopic: string, message: WakuMessage
): Future[WakuLightPushResult[void]] {.async.} =
# append RLN proof
let (rlnResultInfo, msgWithProof) = generateAndValidateRLNProof(rlnPeer, message)
if rlnResultInfo == "":
shash256 marked this conversation as resolved.
Show resolved Hide resolved
let validationRes = await wakuRelay.validateMessage(pubSubTopic, msgWithProof)
if validationRes.isErr():
return err(validationRes.error)

let publishedCount = await wakuRelay.publish(pubsubTopic, msgWithProof)
if publishedCount == 0:
## Agreed change expected to the lightpush protocol to better handle such case. https://github.com/waku-org/pm/issues/93
let msgHash = computeMessageHash(pubsubTopic, message).to0xHex()
debug "Lightpush request has not been published to any peers", msg_hash = msgHash

return ok()
else:
return err(rlnResultInfo)
2 changes: 1 addition & 1 deletion waku/waku_lightpush/protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -115,4 +115,4 @@ proc new*(
requestRateLimiter: newTokenBucket(rateLimitSetting),
)
wl.initProtocolHandler()
return wl
return wl
Loading