Skip to content

Commit

Permalink
Merge cc9ef5c into 5ee4cba
Browse files Browse the repository at this point in the history
  • Loading branch information
shash256 authored May 20, 2024
2 parents 5ee4cba + cc9ef5c commit 9807c71
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 34 deletions.
29 changes: 1 addition & 28 deletions waku/node/waku_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -931,35 +931,8 @@ proc mountLightPush*(
) {.async.} =
info "mounting light push"

var pushHandler: PushMessageHandler
if node.wakuRelay.isNil():
debug "mounting lightpush without relay (nil)"
pushHandler = proc(
peer: PeerId, pubsubTopic: string, message: WakuMessage
): Future[WakuLightPushResult[void]] {.async.} =
return err("no waku relay found")
else:
pushHandler = proc(
peer: PeerId, pubsubTopic: string, message: WakuMessage
): Future[WakuLightPushResult[void]] {.async.} =
let validationRes = await node.wakuRelay.validateMessage(pubSubTopic, message)
if validationRes.isErr():
return err(validationRes.error)

let publishedCount =
await node.wakuRelay.publish(pubsubTopic, message.encode().buffer)

if publishedCount == 0:
## Agreed change expected to the lightpush protocol to better handle such case. https://github.com/waku-org/pm/issues/93
let msgHash = computeMessageHash(pubsubTopic, message).to0xHex()
debug "Lightpush request has not been published to any peers",
msg_hash = msgHash

return ok()

debug "mounting lightpush with relay"
node.wakuLightPush =
WakuLightPush.new(node.peerManager, node.rng, pushHandler, some(rateLimit))
WakuLightPush.new(node.peerManager, node.rng, node.wakuRelay, some(rateLimit))

if node.started:
# Node has started already. Let's start lightpush too.
Expand Down
28 changes: 22 additions & 6 deletions waku/waku_lightpush/protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import
import
../node/peer_manager/peer_manager,
../waku_core,
../waku_relay,
./common,
./rpc,
./rpc_codec,
Expand All @@ -23,7 +24,7 @@ logScope:
type WakuLightPush* = ref object of LPProtocol
rng*: ref rand.HmacDrbgContext
peerManager*: PeerManager
pushHandler*: PushMessageHandler
wakuRelay*: WakuRelay
requestRateLimiter*: Option[TokenBucket]

proc handleRequest*(
Expand Down Expand Up @@ -56,9 +57,24 @@ proc handleRequest*(
pubsubTopic = pubsubTopic,
hash = pubsubTopic.computeMessageHash(message).to0xHex()

let handleRes = await wl.pushHandler(peerId, pubsubTopic, message)
isSuccess = handleRes.isOk()
pushResponseInfo = (if isSuccess: "OK" else: handleRes.error)
# Validate the message using WakuRelay
let validationRes = await wl.wakuRelay.validateMessage(pubsubTopic, message)
if validationRes.isErr():
pushResponseInfo = validationRes.error
else:
# Publish the message using WakuRelay
let publishedCount = await wl.wakuRelay.publish(pubsubTopic, message)

if publishedCount == 0:
pushResponseInfo = "Message not published to any peers"
## Agreed change expected to the lightpush protocol to better handle such case. https://github.com/waku-org/pm/issues/93
let msgHash = computeMessageHash(pubsubTopic, message).to0xHex()
debug "Lightpush request has not been published to any peers",
msg_hash = msgHash

isSuccess = publishedCount > 0

pushResponseInfo = (if isSuccess: "OK" else: pushResponseInfo)

if not isSuccess:
waku_lightpush_errors.inc(labelValues = [pushResponseInfo])
Expand Down Expand Up @@ -105,13 +121,13 @@ proc new*(
T: type WakuLightPush,
peerManager: PeerManager,
rng: ref rand.HmacDrbgContext,
pushHandler: PushMessageHandler,
wakuRelay: WakuRelay,
rateLimitSetting: Option[RateLimitSetting] = none[RateLimitSetting](),
): T =
let wl = WakuLightPush(
rng: rng,
peerManager: peerManager,
pushHandler: pushHandler,
wakuRelay: wakuRelay,
requestRateLimiter: newTokenBucket(rateLimitSetting),
)
wl.initProtocolHandler()
Expand Down

0 comments on commit 9807c71

Please sign in to comment.