From e558f47d3ff066c94ccc8ac612a44032b3168786 Mon Sep 17 00:00:00 2001 From: Ivan Folgueira Bande Date: Sun, 28 Apr 2024 18:29:17 +0200 Subject: [PATCH 01/10] log enhancement for message reliability analysis The next modules are touched: - waku_node.nim - archive.nim - waku_filter_v2/protocol.nim - waku_lightpush/protocol.nim - waku_relay/protocol.nim --- waku/node/waku_node.nim | 18 +++++---- waku/waku_archive/archive.nim | 15 +++++++- waku/waku_filter_v2/protocol.nim | 37 +++++++++++------- waku/waku_lightpush/protocol.nim | 65 +++++++++++++++++++++++--------- waku/waku_relay/protocol.nim | 34 +++++++++++++---- 5 files changed, 121 insertions(+), 48 deletions(-) diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 0a7abb4b0e..4eb46b3893 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -227,10 +227,10 @@ proc registerRelayDefaultHandler(node: WakuNode, topic: PubsubTopic) = return proc traceHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} = - trace "waku.relay received", - peerId = node.peerId, + info "waku.relay received", + my_peer_id = node.peerId, pubsubTopic = topic, - hash = topic.computeMessageHash(msg).to0xHex(), + msg_hash = topic.computeMessageHash(msg).to0xHex(), receivedTime = getNowInNanosecondTime(), payloadSizeBytes = msg.payload.len @@ -914,7 +914,7 @@ proc mountLightPush*( if publishedCount == 0: ## Agreed change expected to the lightpush protocol to better handle such case. https://github.com/waku-org/pm/issues/93 - debug "Lightpush request has not been published to any peers" + info "Lightpush request has not been published to any peers" return ok() @@ -954,15 +954,19 @@ proc lightpushPublish*( peer: RemotePeerInfo, ): Future[WakuLightPushResult[void]] {.async, gcsafe.} = if not node.wakuLightpushClient.isNil(): - debug "publishing message with lightpush", + info "publishing message with lightpush", pubsubTopic = pubsubTopic, contentTopic = message.contentTopic, - peer = peer.peerId + target_peer_id = peer.peerId, + msg_hash = pubsubTopic.computeMessageHash(message).to0xHex() return await node.wakuLightpushClient.publish(pubsubTopic, message, peer) if not node.wakuLightPush.isNil(): debug "publishing message with self hosted lightpush", - pubsubTopic = pubsubTopic, contentTopic = message.contentTopic + pubsubTopic = pubsubTopic, + contentTopic = message.contentTopic, + target_peer_id = peer.peerId, + msg_hash = pubsubTopic.computeMessageHash(message).to0xHex() return await node.wakuLightPush.handleSelfLightPushRequest(pubsubTopic, message) if pubsubTopic.isSome(): diff --git a/waku/waku_archive/archive.nim b/waku/waku_archive/archive.nim index efbf17e0a9..c66544ecff 100644 --- a/waku/waku_archive/archive.nim +++ b/waku/waku_archive/archive.nim @@ -94,7 +94,9 @@ proc handleMessage*( let msgDigest = computeDigest(msg) + msgDigestHex = msgDigest.data.to0xHex() msgHash = computeMessageHash(pubsubTopic, msg) + msgHashHex = msgHash.to0xHex() msgTimestamp = if msg.timestamp > 0: msg.timestamp @@ -102,18 +104,27 @@ proc handleMessage*( getNanosecondTime(getTime().toUnixFloat()) trace "handling message", + msg_hash = msgHashHex, pubsubTopic = pubsubTopic, contentTopic = msg.contentTopic, msgTimestamp = msg.timestamp, usedTimestamp = msgTimestamp, - digest = toHex(msgDigest.data), - messageHash = toHex(msgHash) + digest = msgDigestHex let insertStartTime = getTime().toUnixFloat() (await self.driver.put(pubsubTopic, msg, msgDigest, msgHash, msgTimestamp)).isOkOr: waku_archive_errors.inc(labelValues = [insertFailure]) debug "failed to insert message", err = error + + info "message archived", + msg_hash = msgHashHex, + pubsubTopic = pubsubTopic, + contentTopic = msg.contentTopic, + msgTimestamp = msg.timestamp, + usedTimestamp = msgTimestamp, + digest = msgDigestHex + let insertDuration = getTime().toUnixFloat() - insertStartTime waku_archive_insert_duration_seconds.observe(insertDuration) diff --git a/waku/waku_filter_v2/protocol.nim b/waku/waku_filter_v2/protocol.nim index 11dcca6c82..2aca825089 100644 --- a/waku/waku_filter_v2/protocol.nim +++ b/waku/waku_filter_v2/protocol.nim @@ -156,7 +156,7 @@ proc pushToPeer(wf: WakuFilter, peer: PeerId, buffer: seq[byte]) {.async.} = if not wf.peerManager.peerStore.hasPeer(peer, WakuFilterPushCodec): # Check that peer has not been removed from peer store - trace "no addresses for peer", peer = peer + error "no addresses for peer", peer_id = shortLog(peer) return ## TODO: Check if dial is necessary always??? @@ -164,7 +164,7 @@ proc pushToPeer(wf: WakuFilter, peer: PeerId, buffer: seq[byte]) {.async.} = if conn.isNone(): ## We do not remove this peer, but allow the underlying peer manager ## to do so if it is deemed necessary - trace "no connection to peer", peer = peer + error "no connection to peer", peer_id = shortLog(peer) return await conn.get().writeLp(buffer) @@ -172,11 +172,12 @@ proc pushToPeer(wf: WakuFilter, peer: PeerId, buffer: seq[byte]) {.async.} = proc pushToPeers( wf: WakuFilter, peers: seq[PeerId], messagePush: MessagePush ) {.async.} = - debug "pushing message to subscribed peers", + info "pushing message to subscribed peers", pubsubTopic = messagePush.pubsubTopic, contentTopic = messagePush.wakuMessage.contentTopic, - peers = peers, - hash = messagePush.pubsubTopic.computeMessageHash(messagePush.wakuMessage).to0xHex() + target_peer_ids = peers.mapIt(shortLog(it)), + msg_hash = + messagePush.pubsubTopic.computeMessageHash(messagePush.wakuMessage).to0xHex() let bufferToPublish = messagePush.encode().buffer @@ -210,7 +211,10 @@ const MessagePushTimeout = 20.seconds proc handleMessage*( wf: WakuFilter, pubsubTopic: PubsubTopic, message: WakuMessage ) {.async.} = - trace "handling message", pubsubTopic = pubsubTopic, message = message + let msgHash = computeMessageHash(pubsubTopic, message).to0xHex() + + info "handling message", + pubsubTopic = pubsubTopic, message = message, msg_hash = msgHash let handleMessageStartTime = Moment.now() @@ -219,7 +223,7 @@ proc handleMessage*( let subscribedPeers = wf.subscriptions.findSubscribedPeers(pubsubTopic, message.contentTopic) if subscribedPeers.len == 0: - trace "no subscribed peers found", + info "no subscribed peers found", pubsubTopic = pubsubTopic, contentTopic = message.contentTopic return @@ -228,16 +232,20 @@ proc handleMessage*( if not await wf.pushToPeers(subscribedPeers, messagePush).withTimeout( MessagePushTimeout ): - debug "timed out pushing message to peers", + info "timed out pushing message to peers", pubsubTopic = pubsubTopic, contentTopic = message.contentTopic, - hash = pubsubTopic.computeMessageHash(message).to0xHex() + msg_hash = msgHash, + numPeers = subscribedPeers.len, + target_peer_ids = subscribedPeers.mapIt(shortLog(it)) waku_filter_errors.inc(labelValues = [pushTimeoutFailure]) else: - debug "pushed message succesfully to all subscribers", + info "pushed message succesfully to all subscribers", pubsubTopic = pubsubTopic, contentTopic = message.contentTopic, - hash = pubsubTopic.computeMessageHash(message).to0xHex() + msg_hash = msgHash, + numPeers = subscribedPeers.len, + target_peer_ids = subscribedPeers.mapIt(shortLog(it)) let handleMessageDuration = Moment.now() - handleMessageStartTime @@ -247,14 +255,14 @@ proc handleMessage*( proc initProtocolHandler(wf: WakuFilter) = proc handler(conn: Connection, proto: string) {.async.} = - trace "filter subscribe request handler triggered", peerId = conn.peerId + trace "filter subscribe request handler triggered", peer_id = shortLog(conn.peerId) let buf = await conn.readLp(int(DefaultMaxSubscribeSize)) let decodeRes = FilterSubscribeRequest.decode(buf) if decodeRes.isErr(): error "Failed to decode filter subscribe request", - peerId = conn.peerId, err = decodeRes.error + peer_id = conn.peerId, err = decodeRes.error waku_filter_errors.inc(labelValues = [decodeRpcFailure]) return @@ -262,7 +270,8 @@ proc initProtocolHandler(wf: WakuFilter) = let response = wf.handleSubscribeRequest(conn.peerId, request) - debug "sending filter subscribe response", peerId = conn.peerId, response = response + info "sending filter subscribe response", + peer_id = shortLog(conn.peerId), response = response await conn.writeLp(response.encode().buffer) #TODO: toRPC() separation here return diff --git a/waku/waku_lightpush/protocol.nim b/waku/waku_lightpush/protocol.nim index 16be3beb50..a8a2e9a8e7 100644 --- a/waku/waku_lightpush/protocol.nim +++ b/waku/waku_lightpush/protocol.nim @@ -26,6 +26,24 @@ type WakuLightPush* = ref object of LPProtocol pushHandler*: PushMessageHandler requestRateLimiter*: Option[TokenBucket] +proc extractInfoFromReq( + self: PushRPC +): tuple[reqId: string, pubsubTopic: string, msgHash: string, message: WakuMessage] = + ## Simply extract a tuple with the underlying data stored in `PushRPC` + + let requestId = self.requestId + var + pubsubTopic = "" + msgHash = "" + message: WakuMessage + + if not self.request.isNone(): + message = self.request.get().message + pubSubTopic = self.request.get().pubSubTopic + msgHash = pubsubTopic.computeMessageHash(message).to0xHex() + + return (requestId, pubsubTopic, msgHash, message) + proc handleRequest*( wl: WakuLightPush, peerId: PeerId, buffer: seq[byte] ): Future[PushRPC] {.async.} = @@ -35,44 +53,57 @@ proc handleRequest*( isRejectedDueRateLimit = false pushResponseInfo = "" requestId = "" + pubsubTopic = "" + msgHash = "" if reqDecodeRes.isErr(): pushResponseInfo = decodeRpcFailure & ": " & $reqDecodeRes.error + error "bad lightpush request", error = $reqDecodeRes.error elif reqDecodeRes.get().request.isNone(): pushResponseInfo = emptyRequestBodyFailure + error "lightpush request is none" elif wl.requestRateLimiter.isSome() and not wl.requestRateLimiter.get().tryConsume(1): isRejectedDueRateLimit = true let pushRpcRequest = reqDecodeRes.get() - debug "lightpush request rejected due rate limit exceeded", - peerId = peerId, requestId = pushRpcRequest.requestId + + let reqInfo = pushRpcRequest.extractInfoFromReq() + + error "lightpush request rejected due rate limit exceeded", + peer_id = peerId, + requestId = reqInfo.reqId, + pubsubTopic = reqInfo.pubsubTopic, + msg_hash = reqInfo.msgHash + pushResponseInfo = TooManyRequestsMessage waku_service_requests_rejected.inc(labelValues = ["Lightpush"]) else: waku_service_requests.inc(labelValues = ["Lightpush"]) + waku_lightpush_messages.inc(labelValues = ["PushRequest"]) - let pushRpcRequest = reqDecodeRes.get() - - requestId = pushRpcRequest.requestId + let reqInfo = reqDecodeRes.get().extractInfoFromReq() - let - request = pushRpcRequest.request + requestId = reqInfo.reqId + pubsubTopic = reqInfo.pubsubTopic + msgHash = reqInfo.msgHash - pubSubTopic = request.get().pubSubTopic - message = request.get().message - waku_lightpush_messages.inc(labelValues = ["PushRequest"]) - debug "push request", - peerId = peerId, - requestId = requestId, - pubsubTopic = pubsubTopic, - hash = pubsubTopic.computeMessageHash(message).to0xHex() + let handleRes = await wl.pushHandler(peerId, pubsubTopic, reqInfo.message) - let handleRes = await wl.pushHandler(peerId, pubsubTopic, message) isSuccess = handleRes.isOk() pushResponseInfo = (if isSuccess: "OK" else: handleRes.error) if not isSuccess and not isRejectedDueRateLimit: waku_lightpush_errors.inc(labelValues = [pushResponseInfo]) - error "failed to push message", error = pushResponseInfo + + error "failed to push message", + pubsubTopic = pubsubTopic, msg_hash = msgHash, error = pushResponseInfo + + if isSuccess: + info "lightpush request processed correctly", + lightpush_client_peer_id = shortLog(peerId), + requestId = requestId, + pubsubTopic = pubsubTopic, + msg_hash = msgHash + let response = PushResponse(isSuccess: isSuccess, info: some(pushResponseInfo)) let rpc = PushRPC(requestId: requestId, response: some(response)) return rpc diff --git a/waku/waku_relay/protocol.nim b/waku/waku_relay/protocol.nim index f2537c58a4..f20d4df69c 100644 --- a/waku/waku_relay/protocol.nim +++ b/waku/waku_relay/protocol.nim @@ -9,7 +9,7 @@ else: import std/strformat, - stew/results, + stew/[results, byteutils], sequtils, chronos, chronicles, @@ -203,16 +203,22 @@ proc generateOrderedValidator(w: WakuRelay): auto {.gcsafe.} = # see nim-libp2p protobuf library let msgRes = WakuMessage.decode(message.data) if msgRes.isErr(): - trace "protocol generateOrderedValidator reject decode error", - error = msgRes.error + error "protocol generateOrderedValidator reject decode error", + pubsubTopic = pubsubTopic, error = msgRes.error return ValidationResult.Reject let msg = msgRes.get() + let msgHash = computeMessageHash(pubsubTopic, msg).to0xHex() # now sequentially validate the message for (validator, _) in w.wakuValidators: let validatorRes = await validator(pubsubTopic, msg) + if validatorRes != ValidationResult.Accept: + error "protocol generateOrderedValidator rejest waku validator", + msg_hash = msgHash, pubsubTopic = pubsubTopic, validatorRes = validatorRes + return validatorRes + return ValidationResult.Accept return wrappedValidator @@ -220,19 +226,29 @@ proc validateMessage*( w: WakuRelay, pubsubTopic: string, msg: WakuMessage ): Future[Result[void, string]] {.async.} = let messageSizeBytes = msg.encode().buffer.len + let msgHash = computeMessageHash(pubsubTopic, msg).to0xHex() if messageSizeBytes > w.maxMessageSize: - let message = fmt"Message size exceeded maximum of {w.maxMessageSize} bytes" - debug "Invalid Waku Message", error = message + let message = fmt"message size exceeded maximum of {w.maxMessageSize} bytes" + error "too large Waku message", + msg_hash = msgHash, + error = message, + messageSizeBytes = messageSizeBytes, + maxMessageSize = w.maxMessageSize + return err(message) for (validator, message) in w.wakuValidators: let validatorRes = await validator(pubsubTopic, msg) if validatorRes != ValidationResult.Accept: if message.len > 0: + error "invalid Waku message", msg_hash = msgHash, error = message return err(message) else: - return err("Validator failed") + ## This should never happen + error "uncertain invalid Waku message", msg_hash = msgHash, error = message + return err("validator failed") + return ok() proc subscribe*( @@ -248,7 +264,7 @@ proc subscribe*( if decMsg.isErr(): # fine if triggerSelf enabled, since validators are bypassed error "failed to decode WakuMessage, validator passed a wrong message", - error = decMsg.error + pubsubTopic = pubsubTopic, error = decMsg.error let fut = newFuture[void]() fut.complete() return fut @@ -288,7 +304,9 @@ proc unsubscribe*(w: WakuRelay, pubsubTopic: PubsubTopic, handler: TopicHandler) proc publish*( w: WakuRelay, pubsubTopic: PubsubTopic, message: WakuMessage ): Future[int] {.async.} = - trace "publish", pubsubTopic = pubsubTopic let data = message.encode().buffer + let msgHash = computeMessageHash(pubsubTopic, message).to0xHex() + + info "start publish Waku message", msg_hash = msgHash, pubsubTopic = pubsubTopic return await procCall GossipSub(w).publish(pubsubTopic, data) From 642bc6ffa88bc33041ba2fd267b68a50882b18cc Mon Sep 17 00:00:00 2001 From: Ivan Folgueira Bande Date: Sun, 28 Apr 2024 20:27:40 +0200 Subject: [PATCH 02/10] waku_relay/protocol.nim: restore back upper case --- waku/waku_relay/protocol.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/waku/waku_relay/protocol.nim b/waku/waku_relay/protocol.nim index f20d4df69c..5b924066cb 100644 --- a/waku/waku_relay/protocol.nim +++ b/waku/waku_relay/protocol.nim @@ -229,7 +229,7 @@ proc validateMessage*( let msgHash = computeMessageHash(pubsubTopic, msg).to0xHex() if messageSizeBytes > w.maxMessageSize: - let message = fmt"message size exceeded maximum of {w.maxMessageSize} bytes" + let message = fmt"Message size exceeded maximum of {w.maxMessageSize} bytes" error "too large Waku message", msg_hash = msgHash, error = message, From 45e46652e2d2bdbf2b729df07989f0e36afa0044 Mon Sep 17 00:00:00 2001 From: Ivan FB <128452529+Ivansete-status@users.noreply.github.com> Date: Mon, 29 Apr 2024 10:16:39 +0200 Subject: [PATCH 03/10] Update waku/waku_relay/protocol.nim Co-authored-by: gabrielmer <101006718+gabrielmer@users.noreply.github.com> --- waku/waku_relay/protocol.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/waku/waku_relay/protocol.nim b/waku/waku_relay/protocol.nim index 5b924066cb..2ee980a493 100644 --- a/waku/waku_relay/protocol.nim +++ b/waku/waku_relay/protocol.nim @@ -214,7 +214,7 @@ proc generateOrderedValidator(w: WakuRelay): auto {.gcsafe.} = let validatorRes = await validator(pubsubTopic, msg) if validatorRes != ValidationResult.Accept: - error "protocol generateOrderedValidator rejest waku validator", + error "protocol generateOrderedValidator reject waku validator", msg_hash = msgHash, pubsubTopic = pubsubTopic, validatorRes = validatorRes return validatorRes From aebe405c7d8f3ecccb6dfc988d1e781bf6539030 Mon Sep 17 00:00:00 2001 From: Ivan Folgueira Bande Date: Mon, 29 Apr 2024 11:08:57 +0200 Subject: [PATCH 04/10] =?UTF-8?q?waku=5Ffilter=5Fv2/protocol.nim:=20apply?= =?UTF-8?q?=20Zolt=C3=A1n's=20optimization=20suggestions?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- waku/waku_filter_v2/protocol.nim | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/waku/waku_filter_v2/protocol.nim b/waku/waku_filter_v2/protocol.nim index 2aca825089..76f8d7015b 100644 --- a/waku/waku_filter_v2/protocol.nim +++ b/waku/waku_filter_v2/protocol.nim @@ -172,12 +172,15 @@ proc pushToPeer(wf: WakuFilter, peer: PeerId, buffer: seq[byte]) {.async.} = proc pushToPeers( wf: WakuFilter, peers: seq[PeerId], messagePush: MessagePush ) {.async.} = + let targetPeerIds = peers.mapIt(shortLog(it)) + let msgHash = + messagePush.pubsubTopic.computeMessageHash(messagePush.wakuMessage).to0xHex() + info "pushing message to subscribed peers", pubsubTopic = messagePush.pubsubTopic, contentTopic = messagePush.wakuMessage.contentTopic, - target_peer_ids = peers.mapIt(shortLog(it)), - msg_hash = - messagePush.pubsubTopic.computeMessageHash(messagePush.wakuMessage).to0xHex() + target_peer_ids = targetPeerIds, + msg_hash = msgHash let bufferToPublish = messagePush.encode().buffer From a29db36a8d3ddc680762cf7f77336d1cc7d5e2dd Mon Sep 17 00:00:00 2001 From: Ivan Folgueira Bande Date: Mon, 29 Apr 2024 11:11:33 +0200 Subject: [PATCH 05/10] waku_filter_v2/protocol: log info -> error when timed out push --- waku/waku_filter_v2/protocol.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/waku/waku_filter_v2/protocol.nim b/waku/waku_filter_v2/protocol.nim index 76f8d7015b..ab0de301b0 100644 --- a/waku/waku_filter_v2/protocol.nim +++ b/waku/waku_filter_v2/protocol.nim @@ -235,7 +235,7 @@ proc handleMessage*( if not await wf.pushToPeers(subscribedPeers, messagePush).withTimeout( MessagePushTimeout ): - info "timed out pushing message to peers", + error "timed out pushing message to peers", pubsubTopic = pubsubTopic, contentTopic = message.contentTopic, msg_hash = msgHash, From b86fc11144d686a84dceb394bf560b545260b4aa Mon Sep 17 00:00:00 2001 From: Ivan Folgueira Bande Date: Tue, 30 Apr 2024 20:59:31 +0200 Subject: [PATCH 06/10] waku_node.nim: only calculate msgHash once in lightpushPublish --- waku/node/waku_node.nim | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 4eb46b3893..7762991247 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -953,20 +953,21 @@ proc lightpushPublish*( message: WakuMessage, peer: RemotePeerInfo, ): Future[WakuLightPushResult[void]] {.async, gcsafe.} = + let msgHash = pubsubTopic.computeMessageHash(message).to0xHex() if not node.wakuLightpushClient.isNil(): info "publishing message with lightpush", pubsubTopic = pubsubTopic, contentTopic = message.contentTopic, target_peer_id = peer.peerId, - msg_hash = pubsubTopic.computeMessageHash(message).to0xHex() + msg_hash = msgHash return await node.wakuLightpushClient.publish(pubsubTopic, message, peer) if not node.wakuLightPush.isNil(): - debug "publishing message with self hosted lightpush", + info "publishing message with self hosted lightpush", pubsubTopic = pubsubTopic, contentTopic = message.contentTopic, target_peer_id = peer.peerId, - msg_hash = pubsubTopic.computeMessageHash(message).to0xHex() + msg_hash = msgHash return await node.wakuLightPush.handleSelfLightPushRequest(pubsubTopic, message) if pubsubTopic.isSome(): From ef72b2ba55e40df4b689d50cf6d17a23a94aaef2 Mon Sep 17 00:00:00 2001 From: Ivan Folgueira Bande Date: Tue, 30 Apr 2024 21:35:52 +0200 Subject: [PATCH 07/10] change info to debug the log lines that were previously set to info --- waku/node/waku_node.nim | 10 ++++++---- waku/waku_archive/archive.nim | 4 ++-- waku/waku_filter_v2/protocol.nim | 10 +++++----- waku/waku_lightpush/protocol.nim | 8 ++++---- waku/waku_relay/protocol.nim | 10 +++++----- 5 files changed, 22 insertions(+), 20 deletions(-) diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 7762991247..3dd514f6c4 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -227,7 +227,7 @@ proc registerRelayDefaultHandler(node: WakuNode, topic: PubsubTopic) = return proc traceHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} = - info "waku.relay received", + debug "waku.relay received", my_peer_id = node.peerId, pubsubTopic = topic, msg_hash = topic.computeMessageHash(msg).to0xHex(), @@ -914,7 +914,9 @@ proc mountLightPush*( if publishedCount == 0: ## Agreed change expected to the lightpush protocol to better handle such case. https://github.com/waku-org/pm/issues/93 - info "Lightpush request has not been published to any peers" + let msgHash = computeMessageHash(pubsubTopic, message).to0xHex() + debug "Lightpush request has not been published to any peers", + msg_hash = msgHash return ok() @@ -955,7 +957,7 @@ proc lightpushPublish*( ): Future[WakuLightPushResult[void]] {.async, gcsafe.} = let msgHash = pubsubTopic.computeMessageHash(message).to0xHex() if not node.wakuLightpushClient.isNil(): - info "publishing message with lightpush", + debug "publishing message with lightpush", pubsubTopic = pubsubTopic, contentTopic = message.contentTopic, target_peer_id = peer.peerId, @@ -963,7 +965,7 @@ proc lightpushPublish*( return await node.wakuLightpushClient.publish(pubsubTopic, message, peer) if not node.wakuLightPush.isNil(): - info "publishing message with self hosted lightpush", + debug "publishing message with self hosted lightpush", pubsubTopic = pubsubTopic, contentTopic = message.contentTopic, target_peer_id = peer.peerId, diff --git a/waku/waku_archive/archive.nim b/waku/waku_archive/archive.nim index c66544ecff..1829ff0552 100644 --- a/waku/waku_archive/archive.nim +++ b/waku/waku_archive/archive.nim @@ -115,9 +115,9 @@ proc handleMessage*( (await self.driver.put(pubsubTopic, msg, msgDigest, msgHash, msgTimestamp)).isOkOr: waku_archive_errors.inc(labelValues = [insertFailure]) - debug "failed to insert message", err = error + error "failed to insert message", error = error - info "message archived", + debug "message archived", msg_hash = msgHashHex, pubsubTopic = pubsubTopic, contentTopic = msg.contentTopic, diff --git a/waku/waku_filter_v2/protocol.nim b/waku/waku_filter_v2/protocol.nim index ab0de301b0..a072a99871 100644 --- a/waku/waku_filter_v2/protocol.nim +++ b/waku/waku_filter_v2/protocol.nim @@ -176,7 +176,7 @@ proc pushToPeers( let msgHash = messagePush.pubsubTopic.computeMessageHash(messagePush.wakuMessage).to0xHex() - info "pushing message to subscribed peers", + debug "pushing message to subscribed peers", pubsubTopic = messagePush.pubsubTopic, contentTopic = messagePush.wakuMessage.contentTopic, target_peer_ids = targetPeerIds, @@ -216,7 +216,7 @@ proc handleMessage*( ) {.async.} = let msgHash = computeMessageHash(pubsubTopic, message).to0xHex() - info "handling message", + debug "handling message", pubsubTopic = pubsubTopic, message = message, msg_hash = msgHash let handleMessageStartTime = Moment.now() @@ -226,7 +226,7 @@ proc handleMessage*( let subscribedPeers = wf.subscriptions.findSubscribedPeers(pubsubTopic, message.contentTopic) if subscribedPeers.len == 0: - info "no subscribed peers found", + debug "no subscribed peers found", pubsubTopic = pubsubTopic, contentTopic = message.contentTopic return @@ -243,7 +243,7 @@ proc handleMessage*( target_peer_ids = subscribedPeers.mapIt(shortLog(it)) waku_filter_errors.inc(labelValues = [pushTimeoutFailure]) else: - info "pushed message succesfully to all subscribers", + debug "pushed message succesfully to all subscribers", pubsubTopic = pubsubTopic, contentTopic = message.contentTopic, msg_hash = msgHash, @@ -273,7 +273,7 @@ proc initProtocolHandler(wf: WakuFilter) = let response = wf.handleSubscribeRequest(conn.peerId, request) - info "sending filter subscribe response", + debug "sending filter subscribe response", peer_id = shortLog(conn.peerId), response = response await conn.writeLp(response.encode().buffer) #TODO: toRPC() separation here diff --git a/waku/waku_lightpush/protocol.nim b/waku/waku_lightpush/protocol.nim index a8a2e9a8e7..c2ab01265c 100644 --- a/waku/waku_lightpush/protocol.nim +++ b/waku/waku_lightpush/protocol.nim @@ -28,7 +28,7 @@ type WakuLightPush* = ref object of LPProtocol proc extractInfoFromReq( self: PushRPC -): tuple[reqId: string, pubsubTopic: string, msgHash: string, message: WakuMessage] = +): tuple[requestId: string, pubsubTopic: string, msgHash: string, message: WakuMessage] = ## Simply extract a tuple with the underlying data stored in `PushRPC` let requestId = self.requestId @@ -70,7 +70,7 @@ proc handleRequest*( error "lightpush request rejected due rate limit exceeded", peer_id = peerId, - requestId = reqInfo.reqId, + requestId = reqInfo.requestId, pubsubTopic = reqInfo.pubsubTopic, msg_hash = reqInfo.msgHash @@ -82,7 +82,7 @@ proc handleRequest*( let reqInfo = reqDecodeRes.get().extractInfoFromReq() - requestId = reqInfo.reqId + requestId = reqInfo.requestId pubsubTopic = reqInfo.pubsubTopic msgHash = reqInfo.msgHash @@ -98,7 +98,7 @@ proc handleRequest*( pubsubTopic = pubsubTopic, msg_hash = msgHash, error = pushResponseInfo if isSuccess: - info "lightpush request processed correctly", + debug "lightpush request processed correctly", lightpush_client_peer_id = shortLog(peerId), requestId = requestId, pubsubTopic = pubsubTopic, diff --git a/waku/waku_relay/protocol.nim b/waku/waku_relay/protocol.nim index 2ee980a493..171b1deeb5 100644 --- a/waku/waku_relay/protocol.nim +++ b/waku/waku_relay/protocol.nim @@ -201,12 +201,11 @@ proc generateOrderedValidator(w: WakuRelay): auto {.gcsafe.} = ): Future[ValidationResult] {.async.} = # can be optimized by checking if the message is a WakuMessage without allocating memory # see nim-libp2p protobuf library - let msgRes = WakuMessage.decode(message.data) - if msgRes.isErr(): + let msg = WakuMessage.decode(message.data).valueOr: error "protocol generateOrderedValidator reject decode error", - pubsubTopic = pubsubTopic, error = msgRes.error + pubsubTopic = pubsubTopic, error = $error return ValidationResult.Reject - let msg = msgRes.get() + let msgHash = computeMessageHash(pubsubTopic, msg).to0xHex() # now sequentially validate the message @@ -220,6 +219,7 @@ proc generateOrderedValidator(w: WakuRelay): auto {.gcsafe.} = return validatorRes return ValidationResult.Accept + return wrappedValidator proc validateMessage*( @@ -307,6 +307,6 @@ proc publish*( let data = message.encode().buffer let msgHash = computeMessageHash(pubsubTopic, message).to0xHex() - info "start publish Waku message", msg_hash = msgHash, pubsubTopic = pubsubTopic + debug "start publish Waku message", msg_hash = msgHash, pubsubTopic = pubsubTopic return await procCall GossipSub(w).publish(pubsubTopic, data) From e3ce42c81be1663b82c5dd78093ed4968cafb561 Mon Sep 17 00:00:00 2001 From: Ivan Folgueira Bande Date: Tue, 30 Apr 2024 22:04:40 +0200 Subject: [PATCH 08/10] revert changes in waku_lightpush/protocol --- waku/waku_lightpush/protocol.nim | 65 +++++++++----------------------- 1 file changed, 17 insertions(+), 48 deletions(-) diff --git a/waku/waku_lightpush/protocol.nim b/waku/waku_lightpush/protocol.nim index c2ab01265c..16be3beb50 100644 --- a/waku/waku_lightpush/protocol.nim +++ b/waku/waku_lightpush/protocol.nim @@ -26,24 +26,6 @@ type WakuLightPush* = ref object of LPProtocol pushHandler*: PushMessageHandler requestRateLimiter*: Option[TokenBucket] -proc extractInfoFromReq( - self: PushRPC -): tuple[requestId: string, pubsubTopic: string, msgHash: string, message: WakuMessage] = - ## Simply extract a tuple with the underlying data stored in `PushRPC` - - let requestId = self.requestId - var - pubsubTopic = "" - msgHash = "" - message: WakuMessage - - if not self.request.isNone(): - message = self.request.get().message - pubSubTopic = self.request.get().pubSubTopic - msgHash = pubsubTopic.computeMessageHash(message).to0xHex() - - return (requestId, pubsubTopic, msgHash, message) - proc handleRequest*( wl: WakuLightPush, peerId: PeerId, buffer: seq[byte] ): Future[PushRPC] {.async.} = @@ -53,57 +35,44 @@ proc handleRequest*( isRejectedDueRateLimit = false pushResponseInfo = "" requestId = "" - pubsubTopic = "" - msgHash = "" if reqDecodeRes.isErr(): pushResponseInfo = decodeRpcFailure & ": " & $reqDecodeRes.error - error "bad lightpush request", error = $reqDecodeRes.error elif reqDecodeRes.get().request.isNone(): pushResponseInfo = emptyRequestBodyFailure - error "lightpush request is none" elif wl.requestRateLimiter.isSome() and not wl.requestRateLimiter.get().tryConsume(1): isRejectedDueRateLimit = true let pushRpcRequest = reqDecodeRes.get() - - let reqInfo = pushRpcRequest.extractInfoFromReq() - - error "lightpush request rejected due rate limit exceeded", - peer_id = peerId, - requestId = reqInfo.requestId, - pubsubTopic = reqInfo.pubsubTopic, - msg_hash = reqInfo.msgHash - + debug "lightpush request rejected due rate limit exceeded", + peerId = peerId, requestId = pushRpcRequest.requestId pushResponseInfo = TooManyRequestsMessage waku_service_requests_rejected.inc(labelValues = ["Lightpush"]) else: waku_service_requests.inc(labelValues = ["Lightpush"]) - waku_lightpush_messages.inc(labelValues = ["PushRequest"]) - let reqInfo = reqDecodeRes.get().extractInfoFromReq() + let pushRpcRequest = reqDecodeRes.get() - requestId = reqInfo.requestId - pubsubTopic = reqInfo.pubsubTopic - msgHash = reqInfo.msgHash + requestId = pushRpcRequest.requestId - let handleRes = await wl.pushHandler(peerId, pubsubTopic, reqInfo.message) + let + request = pushRpcRequest.request + pubSubTopic = request.get().pubSubTopic + message = request.get().message + waku_lightpush_messages.inc(labelValues = ["PushRequest"]) + debug "push request", + peerId = peerId, + requestId = requestId, + pubsubTopic = pubsubTopic, + hash = pubsubTopic.computeMessageHash(message).to0xHex() + + let handleRes = await wl.pushHandler(peerId, pubsubTopic, message) isSuccess = handleRes.isOk() pushResponseInfo = (if isSuccess: "OK" else: handleRes.error) if not isSuccess and not isRejectedDueRateLimit: waku_lightpush_errors.inc(labelValues = [pushResponseInfo]) - - error "failed to push message", - pubsubTopic = pubsubTopic, msg_hash = msgHash, error = pushResponseInfo - - if isSuccess: - debug "lightpush request processed correctly", - lightpush_client_peer_id = shortLog(peerId), - requestId = requestId, - pubsubTopic = pubsubTopic, - msg_hash = msgHash - + error "failed to push message", error = pushResponseInfo let response = PushResponse(isSuccess: isSuccess, info: some(pushResponseInfo)) let rpc = PushRPC(requestId: requestId, response: some(response)) return rpc From 3d400a4087eea36dec920f844564057656f335aa Mon Sep 17 00:00:00 2001 From: Ivan Folgueira Bande Date: Wed, 1 May 2024 01:28:40 +0200 Subject: [PATCH 09/10] d --- waku/waku_lightpush/protocol.nim | 27 ++++++++++++++++++++------- 1 file changed, 20 insertions(+), 7 deletions(-) diff --git a/waku/waku_lightpush/protocol.nim b/waku/waku_lightpush/protocol.nim index 16be3beb50..74ac288447 100644 --- a/waku/waku_lightpush/protocol.nim +++ b/waku/waku_lightpush/protocol.nim @@ -38,13 +38,15 @@ proc handleRequest*( if reqDecodeRes.isErr(): pushResponseInfo = decodeRpcFailure & ": " & $reqDecodeRes.error + # error "bad lightpush request", error = $reqDecodeRes.error elif reqDecodeRes.get().request.isNone(): pushResponseInfo = emptyRequestBodyFailure + # error "lightpush request is none" elif wl.requestRateLimiter.isSome() and not wl.requestRateLimiter.get().tryConsume(1): isRejectedDueRateLimit = true let pushRpcRequest = reqDecodeRes.get() - debug "lightpush request rejected due rate limit exceeded", - peerId = peerId, requestId = pushRpcRequest.requestId + # debug "lightpush request rejected due rate limit exceeded", + # peerid = peerId, requestId = pushRpcRequest.requestId pushResponseInfo = TooManyRequestsMessage waku_service_requests_rejected.inc(labelValues = ["Lightpush"]) else: @@ -60,14 +62,25 @@ proc handleRequest*( pubSubTopic = request.get().pubSubTopic message = request.get().message waku_lightpush_messages.inc(labelValues = ["PushRequest"]) - debug "push request", - peerId = peerId, - requestId = requestId, - pubsubTopic = pubsubTopic, - hash = pubsubTopic.computeMessageHash(message).to0xHex() + + # let msgHash = pubsubTopic.computeMessageHash(message).to0xHex() + + # debug "lightpush request", + # peerId = peerId, + # requestId = requestId, + # pubsubTopic = pubsubTopic, + # msg_hash = msgHash let handleRes = await wl.pushHandler(peerId, pubsubTopic, message) isSuccess = handleRes.isOk() + + # if isSuccess: + # debug "lightpush request processed correctly", + # lightpush_client_peer_id = shortLog(peerId), + # requestId = requestId, + # pubsubTopic = pubsubTopic, + # msg_hash = msgHash + pushResponseInfo = (if isSuccess: "OK" else: handleRes.error) if not isSuccess and not isRejectedDueRateLimit: From 679544d8eb00f86dc73e28a16e3f03b75bdc180a Mon Sep 17 00:00:00 2001 From: Ivan Folgueira Bande Date: Wed, 1 May 2024 01:44:04 +0200 Subject: [PATCH 10/10] revert lightpush changes --- waku/waku_lightpush/protocol.nim | 27 +++++++-------------------- 1 file changed, 7 insertions(+), 20 deletions(-) diff --git a/waku/waku_lightpush/protocol.nim b/waku/waku_lightpush/protocol.nim index 74ac288447..16be3beb50 100644 --- a/waku/waku_lightpush/protocol.nim +++ b/waku/waku_lightpush/protocol.nim @@ -38,15 +38,13 @@ proc handleRequest*( if reqDecodeRes.isErr(): pushResponseInfo = decodeRpcFailure & ": " & $reqDecodeRes.error - # error "bad lightpush request", error = $reqDecodeRes.error elif reqDecodeRes.get().request.isNone(): pushResponseInfo = emptyRequestBodyFailure - # error "lightpush request is none" elif wl.requestRateLimiter.isSome() and not wl.requestRateLimiter.get().tryConsume(1): isRejectedDueRateLimit = true let pushRpcRequest = reqDecodeRes.get() - # debug "lightpush request rejected due rate limit exceeded", - # peerid = peerId, requestId = pushRpcRequest.requestId + debug "lightpush request rejected due rate limit exceeded", + peerId = peerId, requestId = pushRpcRequest.requestId pushResponseInfo = TooManyRequestsMessage waku_service_requests_rejected.inc(labelValues = ["Lightpush"]) else: @@ -62,25 +60,14 @@ proc handleRequest*( pubSubTopic = request.get().pubSubTopic message = request.get().message waku_lightpush_messages.inc(labelValues = ["PushRequest"]) - - # let msgHash = pubsubTopic.computeMessageHash(message).to0xHex() - - # debug "lightpush request", - # peerId = peerId, - # requestId = requestId, - # pubsubTopic = pubsubTopic, - # msg_hash = msgHash + debug "push request", + peerId = peerId, + requestId = requestId, + pubsubTopic = pubsubTopic, + hash = pubsubTopic.computeMessageHash(message).to0xHex() let handleRes = await wl.pushHandler(peerId, pubsubTopic, message) isSuccess = handleRes.isOk() - - # if isSuccess: - # debug "lightpush request processed correctly", - # lightpush_client_peer_id = shortLog(peerId), - # requestId = requestId, - # pubsubTopic = pubsubTopic, - # msg_hash = msgHash - pushResponseInfo = (if isSuccess: "OK" else: handleRes.error) if not isSuccess and not isRejectedDueRateLimit: