From 75ef73cb2665319ddc142ef9792038424c0e242d Mon Sep 17 00:00:00 2001 From: SionoiS Date: Mon, 21 Oct 2024 16:05:05 -0400 Subject: [PATCH 1/3] add log and archive message ingress for sync --- waku/waku_archive/archive.nim | 35 ++++++++++++++++++++++++++++++++++- waku/waku_sync/protocol.nim | 4 +++- 2 files changed, 37 insertions(+), 2 deletions(-) diff --git a/waku/waku_archive/archive.nim b/waku/waku_archive/archive.nim index 53053585f9..7b976eff07 100644 --- a/waku/waku_archive/archive.nim +++ b/waku/waku_archive/archive.nim @@ -53,7 +53,7 @@ type WakuArchive* = ref object proc validate*(msg: WakuMessage): Result[void, string] = if msg.ephemeral: # Ephemeral message, do not store - return + return ok() let now = getNanosecondTime(getTime().toUnixFloat()) @@ -87,6 +87,12 @@ proc handleMessage*( ) {.async.} = self.validator(msg).isOkOr: waku_archive_errors.inc(labelValues = [error]) + trace "failed to insert message", + hash_hash = computeMessageHash(pubsubTopic, msg).to0xHex(), + pubsubTopic = pubsubTopic, + contentTopic = msg.contentTopic, + timestamp = msg.timestamp, + error = error return let msgHash = computeMessageHash(pubsubTopic, msg) @@ -111,6 +117,33 @@ proc handleMessage*( let insertDuration = getTime().toUnixFloat() - insertStartTime waku_archive_insert_duration_seconds.observe(insertDuration) +proc syncMessageIngress*( + self: WakuArchive, + msgHash: WakuMessageHash, + pubsubTopic: PubsubTopic, + msg: WakuMessage, +) {.async.} = + let insertStartTime = getTime().toUnixFloat() + + (await self.driver.put(msgHash, pubsubTopic, msg)).isOkOr: + waku_archive_errors.inc(labelValues = [insertFailure]) + trace "failed to insert message", + hash_hash = msgHash.to0xHex(), + pubsubTopic = pubsubTopic, + contentTopic = msg.contentTopic, + timestamp = msg.timestamp, + error = error + return + + trace "message archived", + hash_hash = msgHash.to0xHex(), + pubsubTopic = pubsubTopic, + contentTopic = msg.contentTopic, + timestamp = msg.timestamp + + let insertDuration = getTime().toUnixFloat() - insertStartTime + waku_archive_insert_duration_seconds.observe(insertDuration) + proc findMessages*( self: WakuArchive, query: ArchiveQuery ): Future[ArchiveResult] {.async, gcsafe.} = diff --git a/waku/waku_sync/protocol.nim b/waku/waku_sync/protocol.nim index 0a5e6e49d8..620ff1fa6c 100644 --- a/waku/waku_sync/protocol.nim +++ b/waku/waku_sync/protocol.nim @@ -289,7 +289,9 @@ proc createTransferCallback( for kv in response.messages: let handleRes = catch: - await wakuArchive.handleMessage(kv.pubsubTopic.get(), kv.message.get()) + await wakuArchive.syncMessageIngress( + kv.messageHash, kv.pubsubTopic.get(), kv.message.get() + ) if handleRes.isErr(): error "message transfer failed", error = handleRes.error.msg From b44a6562b1dd7f6973748dc211f35f4e7aaf65bd Mon Sep 17 00:00:00 2001 From: SionoiS Date: Tue, 22 Oct 2024 07:43:42 -0400 Subject: [PATCH 2/3] renaming --- waku/waku_archive/archive.nim | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/waku/waku_archive/archive.nim b/waku/waku_archive/archive.nim index 7b976eff07..b9264908d2 100644 --- a/waku/waku_archive/archive.nim +++ b/waku/waku_archive/archive.nim @@ -87,8 +87,8 @@ proc handleMessage*( ) {.async.} = self.validator(msg).isOkOr: waku_archive_errors.inc(labelValues = [error]) - trace "failed to insert message", - hash_hash = computeMessageHash(pubsubTopic, msg).to0xHex(), + trace "invalid message", + msg_hash = computeMessageHash(pubsubTopic, msg).to0xHex(), pubsubTopic = pubsubTopic, contentTopic = msg.contentTopic, timestamp = msg.timestamp, @@ -101,7 +101,7 @@ proc handleMessage*( (await self.driver.put(msgHash, pubsubTopic, msg)).isOkOr: waku_archive_errors.inc(labelValues = [insertFailure]) trace "failed to insert message", - hash_hash = msgHash.to0xHex(), + msg_hash = msgHash.to0xHex(), pubsubTopic = pubsubTopic, contentTopic = msg.contentTopic, timestamp = msg.timestamp, @@ -109,7 +109,7 @@ proc handleMessage*( return trace "message archived", - hash_hash = msgHash.to0xHex(), + msg_hash = msgHash.to0xHex(), pubsubTopic = pubsubTopic, contentTopic = msg.contentTopic, timestamp = msg.timestamp @@ -128,7 +128,7 @@ proc syncMessageIngress*( (await self.driver.put(msgHash, pubsubTopic, msg)).isOkOr: waku_archive_errors.inc(labelValues = [insertFailure]) trace "failed to insert message", - hash_hash = msgHash.to0xHex(), + msg_hash = msgHash.to0xHex(), pubsubTopic = pubsubTopic, contentTopic = msg.contentTopic, timestamp = msg.timestamp, @@ -136,7 +136,7 @@ proc syncMessageIngress*( return trace "message archived", - hash_hash = msgHash.to0xHex(), + msg_hash = msgHash.to0xHex(), pubsubTopic = pubsubTopic, contentTopic = msg.contentTopic, timestamp = msg.timestamp From db5ef740e05571b4317bcda7adfc8e5ccefc56a7 Mon Sep 17 00:00:00 2001 From: SionoiS Date: Tue, 22 Oct 2024 09:12:30 -0400 Subject: [PATCH 3/3] remove ok --- waku/waku_archive/archive.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/waku/waku_archive/archive.nim b/waku/waku_archive/archive.nim index b9264908d2..914c7366db 100644 --- a/waku/waku_archive/archive.nim +++ b/waku/waku_archive/archive.nim @@ -53,7 +53,7 @@ type WakuArchive* = ref object proc validate*(msg: WakuMessage): Result[void, string] = if msg.ephemeral: # Ephemeral message, do not store - return ok() + return let now = getNanosecondTime(getTime().toUnixFloat())