Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: log enhancement for message reliability analysis #2640

Merged
merged 10 commits into from
May 1, 2024
19 changes: 13 additions & 6 deletions waku/node/waku_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -227,10 +227,10 @@ proc registerRelayDefaultHandler(node: WakuNode, topic: PubsubTopic) =
return

proc traceHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
trace "waku.relay received",
peerId = node.peerId,
debug "waku.relay received",
my_peer_id = node.peerId,
pubsubTopic = topic,
hash = topic.computeMessageHash(msg).to0xHex(),
msg_hash = topic.computeMessageHash(msg).to0xHex(),
receivedTime = getNowInNanosecondTime(),
payloadSizeBytes = msg.payload.len

Expand Down Expand Up @@ -914,7 +914,9 @@ proc mountLightPush*(

if publishedCount == 0:
## Agreed change expected to the lightpush protocol to better handle such case. https://github.com/waku-org/pm/issues/93
debug "Lightpush request has not been published to any peers"
let msgHash = computeMessageHash(pubsubTopic, message).to0xHex()
debug "Lightpush request has not been published to any peers",
msg_hash = msgHash

return ok()

Expand Down Expand Up @@ -953,16 +955,21 @@ proc lightpushPublish*(
message: WakuMessage,
peer: RemotePeerInfo,
): Future[WakuLightPushResult[void]] {.async, gcsafe.} =
let msgHash = pubsubTopic.computeMessageHash(message).to0xHex()
if not node.wakuLightpushClient.isNil():
debug "publishing message with lightpush",
pubsubTopic = pubsubTopic,
contentTopic = message.contentTopic,
peer = peer.peerId
target_peer_id = peer.peerId,
msg_hash = msgHash
return await node.wakuLightpushClient.publish(pubsubTopic, message, peer)

if not node.wakuLightPush.isNil():
debug "publishing message with self hosted lightpush",

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this supposed to be debug?

Because in the "publishing message with lightpush" case, it is being logged at info level.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As per previous conversations we had in PM meeting, I will move back all info to debug

pubsubTopic = pubsubTopic, contentTopic = message.contentTopic
pubsubTopic = pubsubTopic,
contentTopic = message.contentTopic,
target_peer_id = peer.peerId,
msg_hash = msgHash
return await node.wakuLightPush.handleSelfLightPushRequest(pubsubTopic, message)

if pubsubTopic.isSome():
Expand Down
17 changes: 14 additions & 3 deletions waku/waku_archive/archive.nim
Original file line number Diff line number Diff line change
Expand Up @@ -94,26 +94,37 @@ proc handleMessage*(

let
msgDigest = computeDigest(msg)
msgDigestHex = msgDigest.data.to0xHex()
msgHash = computeMessageHash(pubsubTopic, msg)
msgHashHex = msgHash.to0xHex()
msgTimestamp =
if msg.timestamp > 0:
msg.timestamp
else:
getNanosecondTime(getTime().toUnixFloat())

trace "handling message",
msg_hash = msgHashHex,
pubsubTopic = pubsubTopic,
contentTopic = msg.contentTopic,
msgTimestamp = msg.timestamp,
usedTimestamp = msgTimestamp,
digest = toHex(msgDigest.data),
messageHash = toHex(msgHash)
digest = msgDigestHex

let insertStartTime = getTime().toUnixFloat()

(await self.driver.put(pubsubTopic, msg, msgDigest, msgHash, msgTimestamp)).isOkOr:
waku_archive_errors.inc(labelValues = [insertFailure])
debug "failed to insert message", err = error
error "failed to insert message", error = error

debug "message archived",
msg_hash = msgHashHex,
pubsubTopic = pubsubTopic,
contentTopic = msg.contentTopic,
msgTimestamp = msg.timestamp,
usedTimestamp = msgTimestamp,
digest = msgDigestHex

let insertDuration = getTime().toUnixFloat() - insertStartTime
waku_archive_insert_duration_seconds.observe(insertDuration)

Expand Down
36 changes: 24 additions & 12 deletions waku/waku_filter_v2/protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -156,27 +156,31 @@ proc pushToPeer(wf: WakuFilter, peer: PeerId, buffer: seq[byte]) {.async.} =

if not wf.peerManager.peerStore.hasPeer(peer, WakuFilterPushCodec):
# Check that peer has not been removed from peer store
trace "no addresses for peer", peer = peer
error "no addresses for peer", peer_id = shortLog(peer)
return

## TODO: Check if dial is necessary always???
let conn = await wf.peerManager.dialPeer(peer, WakuFilterPushCodec)
if conn.isNone():
## We do not remove this peer, but allow the underlying peer manager
## to do so if it is deemed necessary
trace "no connection to peer", peer = peer
error "no connection to peer", peer_id = shortLog(peer)
return

await conn.get().writeLp(buffer)

proc pushToPeers(
wf: WakuFilter, peers: seq[PeerId], messagePush: MessagePush
) {.async.} =
let targetPeerIds = peers.mapIt(shortLog(it))
let msgHash =
messagePush.pubsubTopic.computeMessageHash(messagePush.wakuMessage).to0xHex()

debug "pushing message to subscribed peers",
pubsubTopic = messagePush.pubsubTopic,
contentTopic = messagePush.wakuMessage.contentTopic,
peers = peers,
hash = messagePush.pubsubTopic.computeMessageHash(messagePush.wakuMessage).to0xHex()
target_peer_ids = targetPeerIds,
msg_hash = msgHash

let bufferToPublish = messagePush.encode().buffer

Expand Down Expand Up @@ -210,7 +214,10 @@ const MessagePushTimeout = 20.seconds
proc handleMessage*(
wf: WakuFilter, pubsubTopic: PubsubTopic, message: WakuMessage
) {.async.} =
trace "handling message", pubsubTopic = pubsubTopic, message = message
let msgHash = computeMessageHash(pubsubTopic, message).to0xHex()

debug "handling message",
pubsubTopic = pubsubTopic, message = message, msg_hash = msgHash

let handleMessageStartTime = Moment.now()

Expand All @@ -219,7 +226,7 @@ proc handleMessage*(
let subscribedPeers =
wf.subscriptions.findSubscribedPeers(pubsubTopic, message.contentTopic)
if subscribedPeers.len == 0:
trace "no subscribed peers found",
debug "no subscribed peers found",
pubsubTopic = pubsubTopic, contentTopic = message.contentTopic
return

Expand All @@ -228,16 +235,20 @@ proc handleMessage*(
if not await wf.pushToPeers(subscribedPeers, messagePush).withTimeout(
MessagePushTimeout
):
debug "timed out pushing message to peers",
error "timed out pushing message to peers",
pubsubTopic = pubsubTopic,
contentTopic = message.contentTopic,
hash = pubsubTopic.computeMessageHash(message).to0xHex()
msg_hash = msgHash,
numPeers = subscribedPeers.len,
target_peer_ids = subscribedPeers.mapIt(shortLog(it))
waku_filter_errors.inc(labelValues = [pushTimeoutFailure])
else:
debug "pushed message succesfully to all subscribers",
pubsubTopic = pubsubTopic,
contentTopic = message.contentTopic,
hash = pubsubTopic.computeMessageHash(message).to0xHex()
msg_hash = msgHash,
numPeers = subscribedPeers.len,
target_peer_ids = subscribedPeers.mapIt(shortLog(it))

let
handleMessageDuration = Moment.now() - handleMessageStartTime
Expand All @@ -247,22 +258,23 @@ proc handleMessage*(

proc initProtocolHandler(wf: WakuFilter) =
proc handler(conn: Connection, proto: string) {.async.} =
trace "filter subscribe request handler triggered", peerId = conn.peerId
trace "filter subscribe request handler triggered", peer_id = shortLog(conn.peerId)

let buf = await conn.readLp(int(DefaultMaxSubscribeSize))

let decodeRes = FilterSubscribeRequest.decode(buf)
if decodeRes.isErr():
error "Failed to decode filter subscribe request",
peerId = conn.peerId, err = decodeRes.error
peer_id = conn.peerId, err = decodeRes.error
waku_filter_errors.inc(labelValues = [decodeRpcFailure])
return

let request = decodeRes.value #TODO: toAPI() split here

let response = wf.handleSubscribeRequest(conn.peerId, request)

debug "sending filter subscribe response", peerId = conn.peerId, response = response
debug "sending filter subscribe response",
peer_id = shortLog(conn.peerId), response = response

await conn.writeLp(response.encode().buffer) #TODO: toRPC() separation here
return
Expand Down
38 changes: 28 additions & 10 deletions waku/waku_relay/protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ else:

import
std/strformat,
stew/results,
stew/[results, byteutils],
sequtils,
chronos,
chronicles,
Expand Down Expand Up @@ -201,38 +201,54 @@ proc generateOrderedValidator(w: WakuRelay): auto {.gcsafe.} =
): Future[ValidationResult] {.async.} =
# can be optimized by checking if the message is a WakuMessage without allocating memory
# see nim-libp2p protobuf library
let msgRes = WakuMessage.decode(message.data)
if msgRes.isErr():
trace "protocol generateOrderedValidator reject decode error",
error = msgRes.error
let msg = WakuMessage.decode(message.data).valueOr:
error "protocol generateOrderedValidator reject decode error",
pubsubTopic = pubsubTopic, error = $error
return ValidationResult.Reject
let msg = msgRes.get()

let msgHash = computeMessageHash(pubsubTopic, msg).to0xHex()

# now sequentially validate the message
for (validator, _) in w.wakuValidators:
let validatorRes = await validator(pubsubTopic, msg)

if validatorRes != ValidationResult.Accept:
error "protocol generateOrderedValidator reject waku validator",
msg_hash = msgHash, pubsubTopic = pubsubTopic, validatorRes = validatorRes

return validatorRes

return ValidationResult.Accept

return wrappedValidator

proc validateMessage*(
w: WakuRelay, pubsubTopic: string, msg: WakuMessage
): Future[Result[void, string]] {.async.} =
let messageSizeBytes = msg.encode().buffer.len
let msgHash = computeMessageHash(pubsubTopic, msg).to0xHex()

if messageSizeBytes > w.maxMessageSize:
let message = fmt"Message size exceeded maximum of {w.maxMessageSize} bytes"
debug "Invalid Waku Message", error = message
error "too large Waku message",
msg_hash = msgHash,
error = message,
messageSizeBytes = messageSizeBytes,
maxMessageSize = w.maxMessageSize

return err(message)

for (validator, message) in w.wakuValidators:
let validatorRes = await validator(pubsubTopic, msg)
if validatorRes != ValidationResult.Accept:
if message.len > 0:
error "invalid Waku message", msg_hash = msgHash, error = message
return err(message)
else:
return err("Validator failed")
## This should never happen
error "uncertain invalid Waku message", msg_hash = msgHash, error = message
return err("validator failed")

return ok()

proc subscribe*(
Expand All @@ -248,7 +264,7 @@ proc subscribe*(
if decMsg.isErr():
# fine if triggerSelf enabled, since validators are bypassed
error "failed to decode WakuMessage, validator passed a wrong message",
error = decMsg.error
pubsubTopic = pubsubTopic, error = decMsg.error
let fut = newFuture[void]()
fut.complete()
return fut
Expand Down Expand Up @@ -288,7 +304,9 @@ proc unsubscribe*(w: WakuRelay, pubsubTopic: PubsubTopic, handler: TopicHandler)
proc publish*(
w: WakuRelay, pubsubTopic: PubsubTopic, message: WakuMessage
): Future[int] {.async.} =
trace "publish", pubsubTopic = pubsubTopic
let data = message.encode().buffer
let msgHash = computeMessageHash(pubsubTopic, message).to0xHex()

debug "start publish Waku message", msg_hash = msgHash, pubsubTopic = pubsubTopic

return await procCall GossipSub(w).publish(pubsubTopic, data)
Loading