From 97b50e70f7e17847066f95bcb1964fd856a88c8c Mon Sep 17 00:00:00 2001 From: SionoiS Date: Thu, 9 May 2024 10:14:24 -0400 Subject: [PATCH 1/6] msg transfer mechanism --- waku/node/waku_node.nim | 47 ++++++++++++++++++++++-- waku/waku_store/client.nim | 2 +- waku/waku_sync/common.nim | 12 +++---- waku/waku_sync/protocol.nim | 72 ++++++++++++++++++++----------------- waku/waku_sync/session.nim | 4 +-- 5 files changed, 92 insertions(+), 45 deletions(-) diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 1b5ab9f9d6..303e13c1eb 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -205,7 +205,7 @@ proc mountWakuSync*(node: WakuNode): Result[void, string] = pruneStart: Timestamp, pruneStop: Timestamp, cursor: Option[WakuMessageHash] ): Future[ Result[(seq[(WakuMessageHash, Timestamp)], Option[WakuMessageHash]), string] - ] {.async: (raises: []), closure, gcsafe.} = + ] {.async: (raises: []), closure.} = let archiveCursor = if cursor.isSome(): some(ArchiveCursor(hash: cursor.get())) @@ -243,8 +243,49 @@ proc mountWakuSync*(node: WakuNode): Result[void, string] = return ok((elements, cursor)) - #TODO add sync callback and options - node.wakuSync = WakuSync.new(peerManager = node.peerManager, pruneCB = some(prune)) + let transfer: TransferCallback = proc( + hashes: seq[WakuMessageHash], peerId: PeerId + ): Future[Result[void, string]] {.async: (raises: []), closure.} = + var query = StoreQueryRequest() + query.includeData = true + query.messageHashes = hashes + query.paginationLimit = some(uint64(100)) + + while true: + # Do we need a query retry mechanism?? + let queryRes = catch: + await node.wakuStoreClient.query(query, peerId) + + let res = + if queryRes.isErr(): + return err(queryRes.error.msg) + else: + queryRes.get() + + let response = res.valueOr: + return err($error) + + query.paginationCursor = response.paginationCursor + + for kv in response.messages: + let handleRes = catch: + await node.wakuArchive.handleMessage(kv.pubsubTopic.get(), kv.message.get()) + + if handleRes.isErr(): + trace "message transfer failed", error + # Messages can be synced next time since they are not added to storage yet. 
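+          # Illustrative aside, not part of this patch: kv.pubsubTopic and
+          # kv.message are Options that this loop reads with unguarded .get()
+          # calls; a defensive variant (hypothetical) would skip incomplete
+          # entries up front, e.g.
+          #   if kv.pubsubTopic.isNone() or kv.message.isNone():
+          #     continue
+          # so a malformed store response cannot raise a Defect inside this
+          # raises: [] callback.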
+ continue + + node.wakuSync.ingessMessage(kv.pubsubTopic.get(), kv.message.get()) + + if query.paginationCursor.isNone(): + break + + return ok() + + node.wakuSync = WakuSync.new( + peerManager = node.peerManager, pruneCB = some(prune), transferCB = some(transfer) + ) let catchRes = catch: node.switch.mount(node.wakuSync, protocolMatcher(WakuSyncCodec)) diff --git a/waku/waku_store/client.nim b/waku/waku_store/client.nim index 4f378ddbcf..48e35a543c 100644 --- a/waku/waku_store/client.nim +++ b/waku/waku_store/client.nim @@ -51,7 +51,7 @@ proc sendStoreRequest( return ok(res) proc query*( - self: WakuStoreClient, request: StoreQueryRequest, peer: RemotePeerInfo + self: WakuStoreClient, request: StoreQueryRequest, peer: RemotePeerInfo | PeerId ): Future[StoreQueryResult] {.async, gcsafe.} = if request.paginationCursor.isSome() and request.paginationCursor.get() == EmptyCursor: return err(StoreError(kind: ErrorCode.BAD_REQUEST, cause: "invalid cursor")) diff --git a/waku/waku_sync/common.nim b/waku/waku_sync/common.nim index 528d0e1855..e4f58a8527 100644 --- a/waku/waku_sync/common.nim +++ b/waku/waku_sync/common.nim @@ -3,25 +3,25 @@ when (NimMajor, NimMinor) < (1, 4): else: {.push raises: [].} -import std/[options], chronos +import std/[options], chronos, libp2p/peerId import ../waku_core const DefaultSyncInterval*: Duration = 1.hours - DefaultPruneInterval*: Duration = 30.minutes WakuSyncCodec* = "/vac/waku/sync/1.0.0" - DefaultFrameSize* = 153600 + DefaultMaxFrameSize* = 153600 #TODO change to something sensible DefaultGossipSubJitter*: Duration = 20.seconds type - SyncCallback* = - proc(hashes: seq[WakuMessageHash], syncPeer: RemotePeerInfo) {.async: (raises: []).} + TransferCallback* = proc( + hashes: seq[WakuMessageHash], peerId: PeerId + ): Future[Result[void, string]] {.async: (raises: []), closure.} PruneCallback* = proc( startTime: Timestamp, endTime: Timestamp, cursor = none(WakuMessageHash) ): Future[ Result[(seq[(WakuMessageHash, Timestamp)], Option[WakuMessageHash]), string] - ] {.async: (raises: []).} + ] {.async: (raises: []), closure.} SyncPayload* = object rangeStart*: Option[uint64] diff --git a/waku/waku_sync/protocol.nim b/waku/waku_sync/protocol.nim index 6ea78736bf..171d1df933 100644 --- a/waku/waku_sync/protocol.nim +++ b/waku/waku_sync/protocol.nim @@ -31,12 +31,15 @@ type WakuSync* = ref object of LPProtocol storage: Storage # Negentropy protocol storage peerManager: PeerManager maxFrameSize: int + syncInterval: timer.Duration # Time between each syncronisation attempt relayJitter: Duration # Time delay until all messages are mostly received network wide - syncCallBack: Option[SyncCallBack] # Callback with the result of the syncronisation + transferCallBack: Option[TransferCallback] # Callback for message transfer. 
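+    # The default callback (see mountWakuSync) store-queries the peer for
+    # the missing hashes, then ingests the results into the local archive
+    # and into this protocol's storage.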
+ pruneCallBack: Option[PruneCallBack] # Callback with the result of the archive query pruneStart: Timestamp # Last pruning start timestamp pruneInterval: Duration # Time between each pruning attempt + periodicSyncFut: Future[void] periodicPruneFut: Future[void] @@ -121,9 +124,7 @@ proc sync*( let hashes: seq[WakuMessageHash] = (await self.request(conn)).valueOr: debug "sync session ended", - server = self.peerManager.switch.peerInfo.peerId, - client = conn.peerId, - error = $error + server = self.peerManager.switch.peerInfo.peerId, client = conn.peerId, error return err("Sync request error: " & error) return ok((hashes, peer)) @@ -137,9 +138,7 @@ proc sync*( let hashes: seq[WakuMessageHash] = (await self.request(conn)).valueOr: debug "sync session ended", - server = self.peerManager.switch.peerInfo.peerId, - client = conn.peerId, - error = $error + server = self.peerManager.switch.peerInfo.peerId, client = conn.peerId, error return err("Sync request error: " & error) @@ -184,14 +183,16 @@ proc initProtocolHandler(self: WakuSync) = let hashes = (await self.handleLoop(conn)).valueOr: debug "sync session ended", - server = self.peerManager.switch.peerInfo.peerId, - client = conn.peerId, - error = $error + server = self.peerManager.switch.peerInfo.peerId, client = conn.peerId, error #TODO send error code and desc to client return - #TODO handle the hashes that the server need from the client + if self.transferCallBack.isSome(): + let callback = self.transferCallBack.get() + + (await callback(hashes, conn.peerId)).isOkOr: + debug "transfer callback failed", error debug "sync session ended gracefully", server = self.peerManager.switch.peerInfo.peerId, client = conn.peerId @@ -202,13 +203,11 @@ proc initProtocolHandler(self: WakuSync) = proc new*( T: type WakuSync, peerManager: PeerManager, - maxFrameSize: int = DefaultFrameSize, + maxFrameSize: int = DefaultMaxFrameSize, syncInterval: timer.Duration = DefaultSyncInterval, relayJitter: Duration = DefaultGossipSubJitter, - #Default gossipsub jitter in network. 
- syncCB: Option[SyncCallback] = none(SyncCallback), - pruneInterval: Duration = DefaultPruneInterval, pruneCB: Option[PruneCallBack] = none(PruneCallback), + transferCB: Option[TransferCallback] = none(TransferCallback), ): T = let storage = Storage.new().valueOr: debug "storage creation failed" @@ -220,10 +219,8 @@ proc new*( maxFrameSize: maxFrameSize, syncInterval: syncInterval, relayJitter: relayJitter, - syncCallBack: syncCB, pruneCallBack: pruneCB, - pruneStart: getNowInNanosecondTime(), - pruneInterval: pruneInterval, + transferCallBack: transferCB, ) sync.initProtocolHandler() @@ -232,34 +229,39 @@ proc new*( return sync -proc periodicSync(self: WakuSync) {.async.} = +proc periodicSync(self: WakuSync, callback: TransferCallback) {.async.} = while true: await sleepAsync(self.syncInterval) + debug "periodic sync started" + let (hashes, peer) = (await self.sync()).valueOr: - debug "periodic sync failed", error = error + debug "sync failed", error continue - let callback = self.syncCallBack.valueOr: + (await callback(hashes, peer.peerId)).isOkOr: + debug "transfer callback failed", error continue - await callback(hashes, peer) + debug "periodic sync done", hashSynced = hashes.len -proc periodicPrune(self: WakuSync) {.async.} = - let callback = self.pruneCallBack.valueOr: - return +proc periodicPrune(self: WakuSync, callback: PruneCallback) {.async.} = + await sleepAsync(self.syncInterval) + + var pruneStop = getNowInNanosecondTime() while true: - await sleepAsync(self.pruneInterval) + await sleepAsync(self.syncInterval) - let pruneStop = getNowInNanosecondTime() + debug "periodic prune started", + startTime = self.pruneStart, endTime = pruneStop, storageSize = self.storage.len var (elements, cursor) = (newSeq[(WakuMessageHash, Timestamp)](0), none(WakuMessageHash)) while true: (elements, cursor) = (await callback(self.pruneStart, pruneStop, cursor)).valueOr: - debug "storage pruning failed", error = $error + debug "pruning callback failed", error break if elements.len == 0: @@ -267,22 +269,26 @@ proc periodicPrune(self: WakuSync) {.async.} = for (hash, timestamp) in elements: self.storage.erase(timestamp, hash).isOkOr: - trace "element pruning failed", time = timestamp, hash = hash, error = error + trace "storage erase failed", timestamp, hash, error continue if cursor.isNone(): break self.pruneStart = pruneStop + pruneStop = getNowInNanosecondTime() + + debug "periodic prune done", storageSize = self.storage.len proc start*(self: WakuSync) = self.started = true + self.pruneStart = getNowInNanosecondTime() - if self.syncCallBack.isSome() and self.syncInterval > ZeroDuration: - self.periodicSyncFut = self.periodicSync() + if self.transferCallBack.isSome() and self.syncInterval > ZeroDuration: + self.periodicSyncFut = self.periodicSync(self.transferCallBack.get()) - if self.pruneCallBack.isSome() and self.pruneInterval > ZeroDuration: - self.periodicPruneFut = self.periodicPrune() + if self.pruneCallBack.isSome() and self.syncInterval > ZeroDuration: + self.periodicPruneFut = self.periodicPrune(self.pruneCallBack.get()) info "WakuSync protocol started" diff --git a/waku/waku_sync/session.nim b/waku/waku_sync/session.nim index 7ebe9e708d..ea8ee93c7a 100644 --- a/waku/waku_sync/session.nim +++ b/waku/waku_sync/session.nim @@ -53,7 +53,7 @@ type Completed*[T] = object proc clientInitialize*( store: Storage, conn: Connection, - frameSize = DefaultFrameSize, + frameSize = DefaultMaxFrameSize, start = int64.low, `end` = int64.high, ): Result[Reconciled[ClientSync], string] = @@ -80,7 +80,7 @@ 
proc clientInitialize*( proc serverInitialize*( store: Storage, conn: Connection, - frameSize = DefaultFrameSize, + frameSize = DefaultMaxFrameSize, start = int64.low, `end` = int64.high, ): Result[Sent[ServerSync], string] = From 3a1b588a8856d57b969809d130f8131515cb56c2 Mon Sep 17 00:00:00 2001 From: SionoiS Date: Thu, 9 May 2024 10:24:43 -0400 Subject: [PATCH 2/6] fixes --- waku/node/waku_node.nim | 2 +- waku/waku_sync/raw_bindings.nim | 3 +++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 303e13c1eb..9c6a38f776 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -272,7 +272,7 @@ proc mountWakuSync*(node: WakuNode): Result[void, string] = await node.wakuArchive.handleMessage(kv.pubsubTopic.get(), kv.message.get()) if handleRes.isErr(): - trace "message transfer failed", error + trace "message transfer failed", error = handleRes.error.msg # Messages can be synced next time since they are not added to storage yet. continue diff --git a/waku/waku_sync/raw_bindings.nim b/waku/waku_sync/raw_bindings.nim index df20f30715..b71bcb29a0 100644 --- a/waku/waku_sync/raw_bindings.nim +++ b/waku/waku_sync/raw_bindings.nim @@ -222,6 +222,9 @@ proc insert*(storage: Storage, id: int64, hash: WakuMessageHash): Result[void, s else: return err("insert error") +proc len*(storage: Storage): int = + int(storage.size) + proc `==`*(a: NegentropySubRange, b: pointer): bool {.borrow.} proc new*( From 94e626166b3579543b0abbcfa795de33be0597e2 Mon Sep 17 00:00:00 2001 From: SionoiS Date: Thu, 9 May 2024 11:59:35 -0400 Subject: [PATCH 3/6] prune test --- tests/waku_sync/sync_utils.nim | 13 +++- tests/waku_sync/test_protocol.nim | 117 ++++++++++++++++++++++-------- waku/waku_sync/protocol.nim | 3 + 3 files changed, 98 insertions(+), 35 deletions(-) diff --git a/tests/waku_sync/sync_utils.nim b/tests/waku_sync/sync_utils.nim index 0d01265aa1..5a6c26816b 100644 --- a/tests/waku_sync/sync_utils.nim +++ b/tests/waku_sync/sync_utils.nim @@ -5,12 +5,19 @@ import std/options, chronos, chronicles, libp2p/crypto/crypto import ../../../waku/[node/peer_manager, waku_core, waku_sync], ../testlib/wakucore proc newTestWakuSync*( - switch: Switch, handler: SyncCallback -): Future[WakuSync] {.async.} = + switch: Switch, + transfer: Option[TransferCallback] = none(TransferCallback), + prune: Option[PruneCallback] = none(PruneCallback), + interval: Duration = DefaultSyncInterval, +): WakuSync = let peerManager = PeerManager.new(switch) proto = WakuSync.new( - peerManager = peerManager, relayJitter = 0.seconds, syncCB = some(handler) + peerManager = peerManager, + relayJitter = 0.seconds, + transferCB = transfer, + pruneCB = prune, + syncInterval = interval, ) assert proto != nil diff --git a/tests/waku_sync/test_protocol.nim b/tests/waku_sync/test_protocol.nim index cd43cde602..fb401b4c46 100644 --- a/tests/waku_sync/test_protocol.nim +++ b/tests/waku_sync/test_protocol.nim @@ -1,7 +1,7 @@ {.used.} import - std/[options, times], + std/options, testutils/unittests, chronos, chronicles, @@ -20,7 +20,7 @@ import waku_sync, waku_sync/raw_bindings, ], - ../testlib/[common, wakucore, testasync], + ../testlib/[wakucore, testasync], ./sync_utils random.randomize() @@ -29,7 +29,7 @@ suite "Waku Sync": var serverSwitch {.threadvar.}: Switch var clientSwitch {.threadvar.}: Switch - var protoHandler {.threadvar.}: SyncCallback + var transferCallback {.threadvar.}: TransferCallback var server {.threadvar.}: WakuSync var client {.threadvar.}: WakuSync @@ -42,15 
+42,16 @@ suite "Waku Sync": await allFutures(serverSwitch.start(), clientSwitch.start()) - protoHandler = proc( - hashes: seq[WakuMessageHash], peer: RemotePeerInfo - ) {.async: (raises: []), closure, gcsafe.} = + transferCallback = proc( + hashes: seq[WakuMessageHash], peer: PeerId + ): Future[Result[void, string]] {.async: (raises: []), closure.} = debug "Received needHashes from peer:", len = hashes.len for hash in hashes: - debug "Hash received from peer:", hash = hash.to0xHex() + trace "Hash received from peer:", hash = hash.to0xHex() + return ok() - server = await newTestWakuSync(serverSwitch, handler = protoHandler) - client = await newTestWakuSync(clientSwitch, handler = protoHandler) + server = newTestWakuSync(serverSwitch, transfer = some(transferCallback)) + client = newTestWakuSync(clientSwitch, transfer = some(transferCallback)) serverPeerInfo = serverSwitch.peerInfo.toRemotePeerInfo() @@ -75,7 +76,7 @@ suite "Waku Sync": server.ingessMessage(DefaultPubsubTopic, msg3) var hashes = await client.sync(serverPeerInfo) - await sleepAsync(1) # to ensure graceful shutdown + #await sleepAsync(1) # to ensure graceful shutdown require (hashes.isOk()) check: hashes.value.len == 3 @@ -210,7 +211,7 @@ suite "Waku Sync": ## Setup let client2Switch = newTestSwitch() await client2Switch.start() - let client2 = await newTestWakuSync(client2Switch, handler = protoHandler) + let client2 = newTestWakuSync(client2Switch, transfer = some(transferCallback)) let msgCount = 10000 var i = 0 @@ -259,10 +260,10 @@ suite "Waku Sync": ) let - client2 = await newTestWakuSync(client2Switch, handler = protoHandler) - client3 = await newTestWakuSync(client3Switch, handler = protoHandler) - client4 = await newTestWakuSync(client4Switch, handler = protoHandler) - client5 = await newTestWakuSync(client5Switch, handler = protoHandler) + client2 = newTestWakuSync(client2Switch, transfer = some(transferCallback)) + client3 = newTestWakuSync(client3Switch, transfer = some(transferCallback)) + client4 = newTestWakuSync(client4Switch, transfer = some(transferCallback)) + client5 = newTestWakuSync(client5Switch, transfer = some(transferCallback)) let msgCount = 100000 var i = 0 @@ -283,9 +284,9 @@ suite "Waku Sync": i = i + 1 #info "client2 storage size", size = client2.storageSize() - var timeBefore = cpuTime() + var timeBefore = getNowInNanosecondTime() let hashes1 = await client.sync(serverPeerInfo) - var timeAfter = cpuTime() + var timeAfter = getNowInNanosecondTime() var syncTime = (timeAfter - timeBefore) info "sync time in seconds", msgsTotal = msgCount, diff = 1, syncTime = syncTime assert hashes1.isOk(), $hashes1.error @@ -293,9 +294,9 @@ suite "Waku Sync": hashes1.value.len == 1 #TODO: Check if all diffHashes are there in needHashes - timeBefore = cpuTime() + timeBefore = getNowInNanosecondTime() let hashes2 = await client2.sync(serverPeerInfo) - timeAfter = cpuTime() + timeAfter = getNowInNanosecondTime() syncTime = (timeAfter - timeBefore) info "sync time in seconds", msgsTotal = msgCount, diff = 10, syncTime = syncTime assert hashes2.isOk(), $hashes2.error @@ -303,9 +304,9 @@ suite "Waku Sync": hashes2.value.len == 10 #TODO: Check if all diffHashes are there in needHashes - timeBefore = cpuTime() + timeBefore = getNowInNanosecondTime() let hashes3 = await client3.sync(serverPeerInfo) - timeAfter = cpuTime() + timeAfter = getNowInNanosecondTime() syncTime = (timeAfter - timeBefore) info "sync time in seconds", msgsTotal = msgCount, diff = 100, syncTime = syncTime assert hashes3.isOk(), $hashes3.error @@ 
-313,9 +314,9 @@ suite "Waku Sync": hashes3.value.len == 100 #TODO: Check if all diffHashes are there in needHashes - timeBefore = cpuTime() + timeBefore = getNowInNanosecondTime() let hashes4 = await client4.sync(serverPeerInfo) - timeAfter = cpuTime() + timeAfter = getNowInNanosecondTime() syncTime = (timeAfter - timeBefore) info "sync time in seconds", msgsTotal = msgCount, diff = 1000, syncTime = syncTime @@ -324,9 +325,9 @@ suite "Waku Sync": hashes4.value.len == 1000 #TODO: Check if all diffHashes are there in needHashes - timeBefore = cpuTime() + timeBefore = getNowInNanosecondTime() let hashes5 = await client5.sync(serverPeerInfo) - timeAfter = cpuTime() + timeAfter = getNowInNanosecondTime() syncTime = (timeAfter - timeBefore) info "sync time in seconds", msgsTotal = msgCount, diff = 10000, syncTime = syncTime @@ -362,17 +363,18 @@ suite "Waku Sync": let msg3 = fakeWakuMessage(contentTopic = DefaultContentTopic) let hash3 = computeMessageHash(DefaultPubsubTopic, msg3) - let protoHandler: SyncCallback = proc( - hashes: seq[WakuMessageHash], peer: RemotePeerInfo - ) {.async: (raises: []), closure, gcsafe.} = + let transferCallback = proc( + hashes: seq[WakuMessageHash], peer: PeerId + ): Future[Result[void, string]] {.async: (raises: []), closure.} = debug "Received needHashes from peer:", len = hashes.len for hash in hashes: - debug "Hash received from peer:", hash = hash.to0xHex() + trace "Hash received from peer:", hash = hash.to0xHex() + return ok() let - node1 = await newTestWakuSync(node1Switch, handler = protoHandler) - node2 = await newTestWakuSync(node2Switch, handler = protoHandler) - node3 = await newTestWakuSync(node3Switch, handler = protoHandler) + node1 = newTestWakuSync(node1Switch, transfer = some(transferCallback)) + node2 = newTestWakuSync(node2Switch, transfer = some(transferCallback)) + node3 = newTestWakuSync(node3Switch, transfer = some(transferCallback)) node1.ingessMessage(DefaultPubsubTopic, msg1) node2.ingessMessage(DefaultPubsubTopic, msg1) @@ -405,6 +407,57 @@ suite "Waku Sync": await allFutures(node1.stop(), node2.stop(), node3.stop()) await allFutures(node1Switch.stop(), node2Switch.stop(), node3Switch.stop()) + asyncTest "pruning at interval": + let switch = newTestSwitch() + + await switch.start() + + let + msg1 = fakeWakuMessage(contentTopic = DefaultContentTopic) + msgHash1 = computeMessageHash(DefaultPubsubTopic, msg1) + msg2 = fakeWakuMessage(contentTopic = DefaultContentTopic) + msgHash2 = computeMessageHash(DefaultPubsubTopic, msg2) + msg3 = fakeWakuMessage(contentTopic = DefaultContentTopic) + msgHash3 = computeMessageHash(DefaultPubsubTopic, msg3) + msg4 = fakeWakuMessage(contentTopic = DefaultContentTopic) + msgHash4 = computeMessageHash(DefaultPubsubTopic, msg4) + + let prune: PruneCallback = proc( + pruneStart: Timestamp, pruneStop: Timestamp, cursor: Option[WakuMessageHash] + ): Future[ + Result[(seq[(WakuMessageHash, Timestamp)], Option[WakuMessageHash]), string] + ] {.async: (raises: []), closure.} = + let kvs = + @[ + (msgHash1, msg1.timestamp), + (msgHash2, msg2.timestamp), + (msgHash3, msg3.timestamp), + (msgHash4, msg4.timestamp), + ] + return ok((kvs, none(WakuMessageHash))) + + let interval = 1.seconds + + let wakuSync = newTestWakuSync( + switch = switch, + transfer = none(TransferCallback), + prune = some(prune), + interval = interval, + ) + + wakuSync.ingessMessage(DefaultPubsubTopic, msg1) + wakuSync.ingessMessage(DefaultPubsubTopic, msg2) + wakuSync.ingessMessage(DefaultPubsubTopic, msg3) + 
wakuSync.ingessMessage(DefaultPubsubTopic, msg4) + + await sleepAsync(3.seconds) + + check: + wakuSync.storageSize == 0 + + #asyncTest "transfering missing messages after a sync": + #TODO + suite "Bindings": asyncTest "test c integration": let s1Res = Storage.new() diff --git a/waku/waku_sync/protocol.nim b/waku/waku_sync/protocol.nim index 171d1df933..199fba7cab 100644 --- a/waku/waku_sync/protocol.nim +++ b/waku/waku_sync/protocol.nim @@ -43,6 +43,9 @@ type WakuSync* = ref object of LPProtocol periodicSyncFut: Future[void] periodicPruneFut: Future[void] +proc storageSize*(self: WakuSync): int = + self.storage.len + proc ingessMessage*(self: WakuSync, pubsubTopic: PubsubTopic, msg: WakuMessage) = if msg.ephemeral: return From 6e293f244e620eb42bd6e120e4024eea90ac8819 Mon Sep 17 00:00:00 2001 From: SionoiS Date: Thu, 9 May 2024 15:41:44 -0400 Subject: [PATCH 4/6] test clean-up --- tests/waku_sync/test_protocol.nim | 81 +++++++++++++------------------ 1 file changed, 33 insertions(+), 48 deletions(-) diff --git a/tests/waku_sync/test_protocol.nim b/tests/waku_sync/test_protocol.nim index fb401b4c46..1d97dd5b89 100644 --- a/tests/waku_sync/test_protocol.nim +++ b/tests/waku_sync/test_protocol.nim @@ -13,7 +13,6 @@ from std/os import sleep import ../../waku/[ - common/paging, node/peer_manager, waku_core, waku_core/message/digest, @@ -29,8 +28,6 @@ suite "Waku Sync": var serverSwitch {.threadvar.}: Switch var clientSwitch {.threadvar.}: Switch - var transferCallback {.threadvar.}: TransferCallback - var server {.threadvar.}: WakuSync var client {.threadvar.}: WakuSync @@ -42,16 +39,8 @@ suite "Waku Sync": await allFutures(serverSwitch.start(), clientSwitch.start()) - transferCallback = proc( - hashes: seq[WakuMessageHash], peer: PeerId - ): Future[Result[void, string]] {.async: (raises: []), closure.} = - debug "Received needHashes from peer:", len = hashes.len - for hash in hashes: - trace "Hash received from peer:", hash = hash.to0xHex() - return ok() - - server = newTestWakuSync(serverSwitch, transfer = some(transferCallback)) - client = newTestWakuSync(clientSwitch, transfer = some(transferCallback)) + server = newTestWakuSync(serverSwitch) + client = newTestWakuSync(clientSwitch) serverPeerInfo = serverSwitch.peerInfo.toRemotePeerInfo() @@ -76,7 +65,7 @@ suite "Waku Sync": server.ingessMessage(DefaultPubsubTopic, msg3) var hashes = await client.sync(serverPeerInfo) - #await sleepAsync(1) # to ensure graceful shutdown + require (hashes.isOk()) check: hashes.value.len == 3 @@ -106,18 +95,29 @@ suite "Waku Sync": client.ingessMessage(DefaultPubsubTopic, msg1) server.ingessMessage(DefaultPubsubTopic, msg2) - var hashes = await client.sync(serverPeerInfo) - require (hashes.isOk()) + var syncRes = await client.sync(serverPeerInfo) + check: - hashes.value.len == 1 - hashes.value[0] == computeMessageHash(pubsubTopic = DefaultPubsubTopic, msg2) + syncRes.isOk() + + var hashes = syncRes.get() + + check: + hashes.len == 1 + hashes[0] == computeMessageHash(pubsubTopic = DefaultPubsubTopic, msg2) + #Assuming message is fetched from peer client.ingessMessage(DefaultPubsubTopic, msg2) - sleep(1000) - hashes = await client.sync(serverPeerInfo) - require (hashes.isOk()) + + syncRes = await client.sync(serverPeerInfo) + check: - hashes.value.len == 0 + syncRes.isOk() + + hashes = syncRes.get() + + check: + hashes.len == 0 #[ asyncTest "sync 2 nodes duplicate hashes": let msg1 = fakeWakuMessage(contentTopic = DefaultContentTopic) @@ -211,7 +211,7 @@ suite "Waku Sync": ## Setup let client2Switch = 
newTestSwitch() await client2Switch.start() - let client2 = newTestWakuSync(client2Switch, transfer = some(transferCallback)) + let client2 = newTestWakuSync(client2Switch) let msgCount = 10000 var i = 0 @@ -260,10 +260,10 @@ suite "Waku Sync": ) let - client2 = newTestWakuSync(client2Switch, transfer = some(transferCallback)) - client3 = newTestWakuSync(client3Switch, transfer = some(transferCallback)) - client4 = newTestWakuSync(client4Switch, transfer = some(transferCallback)) - client5 = newTestWakuSync(client5Switch, transfer = some(transferCallback)) + client2 = newTestWakuSync(client2Switch) + client3 = newTestWakuSync(client3Switch) + client4 = newTestWakuSync(client4Switch) + client5 = newTestWakuSync(client5Switch) let msgCount = 100000 var i = 0 @@ -363,18 +363,10 @@ suite "Waku Sync": let msg3 = fakeWakuMessage(contentTopic = DefaultContentTopic) let hash3 = computeMessageHash(DefaultPubsubTopic, msg3) - let transferCallback = proc( - hashes: seq[WakuMessageHash], peer: PeerId - ): Future[Result[void, string]] {.async: (raises: []), closure.} = - debug "Received needHashes from peer:", len = hashes.len - for hash in hashes: - trace "Hash received from peer:", hash = hash.to0xHex() - return ok() - let - node1 = newTestWakuSync(node1Switch, transfer = some(transferCallback)) - node2 = newTestWakuSync(node2Switch, transfer = some(transferCallback)) - node3 = newTestWakuSync(node3Switch, transfer = some(transferCallback)) + node1 = newTestWakuSync(node1Switch) + node2 = newTestWakuSync(node2Switch) + node3 = newTestWakuSync(node3Switch) node1.ingessMessage(DefaultPubsubTopic, msg1) node2.ingessMessage(DefaultPubsubTopic, msg1) @@ -407,7 +399,7 @@ suite "Waku Sync": await allFutures(node1.stop(), node2.stop(), node3.stop()) await allFutures(node1Switch.stop(), node2Switch.stop(), node3Switch.stop()) - asyncTest "pruning at interval": + asyncTest "pruning": let switch = newTestSwitch() await switch.start() @@ -438,12 +430,8 @@ suite "Waku Sync": let interval = 1.seconds - let wakuSync = newTestWakuSync( - switch = switch, - transfer = none(TransferCallback), - prune = some(prune), - interval = interval, - ) + let wakuSync = + newTestWakuSync(switch = switch, prune = some(prune), interval = interval) wakuSync.ingessMessage(DefaultPubsubTopic, msg1) wakuSync.ingessMessage(DefaultPubsubTopic, msg2) @@ -455,9 +443,6 @@ suite "Waku Sync": check: wakuSync.storageSize == 0 - #asyncTest "transfering missing messages after a sync": - #TODO - suite "Bindings": asyncTest "test c integration": let s1Res = Storage.new() From 94cc5882747c73be2e63067cde01efb4716d4f29 Mon Sep 17 00:00:00 2001 From: SionoiS Date: Fri, 10 May 2024 11:54:27 -0400 Subject: [PATCH 5/6] tests and fixes --- tests/node/test_wakunode_sync.nim | 130 ++++++++++++++++++++++++++++++ waku/node/waku_node.nim | 96 +++++++++++++--------- waku/waku_sync/protocol.nim | 15 ++-- 3 files changed, 199 insertions(+), 42 deletions(-) create mode 100644 tests/node/test_wakunode_sync.nim diff --git a/tests/node/test_wakunode_sync.nim b/tests/node/test_wakunode_sync.nim new file mode 100644 index 0000000000..dcd0dc8bd7 --- /dev/null +++ b/tests/node/test_wakunode_sync.nim @@ -0,0 +1,130 @@ +{.used.} + +import stew/shims/net as stewNet, testutils/unittests, chronos, libp2p/crypto/crypto + +import + ../../../waku/ + [node/waku_node, node/peer_manager, waku_core, waku_store, waku_archive, waku_sync], + ../waku_store/store_utils, + ../waku_archive/archive_utils, + ../testlib/[wakucore, wakunode, testasync] + +suite "Waku Sync - End to End": + var 
server {.threadvar.}: WakuNode + var client {.threadvar.}: WakuNode + + var serverRemotePeerInfo {.threadvar.}: RemotePeerInfo + var clientRemotePeerInfo {.threadvar.}: RemotePeerInfo + + asyncSetup: + let timeOrigin = now() + + let messages = + @[ + fakeWakuMessage(@[byte 00], ts = ts(-90, timeOrigin)), + fakeWakuMessage(@[byte 01], ts = ts(-80, timeOrigin)), + fakeWakuMessage(@[byte 02], ts = ts(-70, timeOrigin)), + fakeWakuMessage(@[byte 03], ts = ts(-60, timeOrigin)), + fakeWakuMessage(@[byte 04], ts = ts(-50, timeOrigin)), + fakeWakuMessage(@[byte 05], ts = ts(-40, timeOrigin)), + fakeWakuMessage(@[byte 06], ts = ts(-30, timeOrigin)), + fakeWakuMessage(@[byte 07], ts = ts(-20, timeOrigin)), + fakeWakuMessage(@[byte 08], ts = ts(-10, timeOrigin)), + fakeWakuMessage(@[byte 09], ts = ts(00, timeOrigin)), + ] + + let + serverKey = generateSecp256k1Key() + clientKey = generateSecp256k1Key() + + server = newTestWakuNode(serverKey, ValidIpAddress.init("0.0.0.0"), Port(0)) + client = newTestWakuNode(clientKey, ValidIpAddress.init("0.0.0.0"), Port(0)) + + let serverArchiveDriver = newArchiveDriverWithMessages(DefaultPubsubTopic, messages) + let clientArchiveDriver = newArchiveDriverWithMessages(DefaultPubsubTopic, messages) + + let mountServerArchiveRes = server.mountArchive(serverArchiveDriver) + let mountClientArchiveRes = client.mountArchive(clientArchiveDriver) + + assert mountServerArchiveRes.isOk() + assert mountClientArchiveRes.isOk() + + await server.mountStore() + await client.mountStore() + + client.mountStoreClient() + server.mountStoreClient() + + let mountServerSync = server.mountWakuSync( + maxFrameSize = 0, + relayJitter = 0.seconds, + syncInterval = 1.hours, + enablePruning = false, + ) + let mountClientSync = client.mountWakuSync( + maxFrameSize = 0, + syncInterval = 1.seconds, + relayJitter = 0.seconds, + enablePruning = false, + ) + + assert mountServerSync.isOk() + assert mountClientSync.isOk() + + for msg in messages: + server.wakuSync.ingessMessage(DefaultPubsubTopic, msg) + client.wakuSync.ingessMessage(DefaultPubsubTopic, msg) + + await allFutures(server.start(), client.start()) + + await sleepAsync(chronos.milliseconds(500)) + + serverRemotePeerInfo = server.peerInfo.toRemotePeerInfo() + clientRemotePeerInfo = client.peerInfo.toRemotePeerInfo() + + client.peerManager.addServicePeer(serverRemotePeerInfo, WakuSyncCodec) + server.peerManager.addServicePeer(clientRemotePeerInfo, WakuSyncCodec) + + client.peerManager.addServicePeer(serverRemotePeerInfo, WakuStoreCodec) + server.peerManager.addServicePeer(clientRemotePeerInfo, WakuStoreCodec) + + asyncTeardown: + await allFutures(client.stop(), server.stop()) + + suite "Store Syncronization": + asyncTest "no message set differences": + check: + client.wakuSync.storageSize() == server.wakuSync.storageSize() + + await sleepAsync(1.seconds) + + check: + client.wakuSync.storageSize() == server.wakuSync.storageSize() + + asyncTest "client message set differences": + let msg = fakeWakuMessage(@[byte 10]) + + client.wakuSync.ingessMessage(DefaultPubsubTopic, msg) + await client.wakuArchive.handleMessage(DefaultPubsubTopic, msg) + + check: + client.wakuSync.storageSize() != server.wakuSync.storageSize() + + await sleepAsync(1.seconds) + + check: + client.wakuSync.storageSize() == server.wakuSync.storageSize() + + asyncTest "server message set differences": + let msg = fakeWakuMessage(@[byte 10]) + + server.wakuSync.ingessMessage(DefaultPubsubTopic, msg) + await server.wakuArchive.handleMessage(DefaultPubsubTopic, msg) + + check: + 
client.wakuSync.storageSize() != server.wakuSync.storageSize() + + await sleepAsync(1.seconds) + + check: + client.wakuSync.storageSize() == server.wakuSync.storageSize() diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 9c6a38f776..8c458abd70 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -197,51 +197,62 @@ proc connectToNodes*( ## Waku Sync -proc mountWakuSync*(node: WakuNode): Result[void, string] = +proc mountWakuSync*( + node: WakuNode, + maxFrameSize: int = DefaultMaxFrameSize, + syncInterval: timer.Duration = DefaultSyncInterval, + relayJitter: Duration = DefaultGossipSubJitter, + enablePruning: bool = true, # For testing purposes +): Result[void, string] = if not node.wakuSync.isNil(): return err("Waku sync already mounted, skipping") - let prune: PruneCallback = proc( - pruneStart: Timestamp, pruneStop: Timestamp, cursor: Option[WakuMessageHash] - ): Future[ - Result[(seq[(WakuMessageHash, Timestamp)], Option[WakuMessageHash]), string] - ] {.async: (raises: []), closure.} = - let archiveCursor = - if cursor.isSome(): - some(ArchiveCursor(hash: cursor.get())) - else: - none(ArchiveCursor) + var prune = none(PruneCallback) + + if enablePruning: + prune = some( + proc( + pruneStart: Timestamp, pruneStop: Timestamp, cursor: Option[WakuMessageHash] + ): Future[ + Result[(seq[(WakuMessageHash, Timestamp)], Option[WakuMessageHash]), string] + ] {.async: (raises: []), closure.} = + let archiveCursor = + if cursor.isSome(): + some(ArchiveCursor(hash: cursor.get())) + else: + none(ArchiveCursor) + + let query = ArchiveQuery( + cursor: archiveCursor, + startTime: some(pruneStart), + endTime: some(pruneStop), + pageSize: 100, + ) - let query = ArchiveQuery( - cursor: archiveCursor, - startTime: some(pruneStart), - endTime: some(pruneStop), - pageSize: 100, - ) + let catchable = catch: + await node.wakuArchive.findMessages(query) - let catchable = catch: - await node.wakuArchive.findMessages(query) + let res = + if catchable.isErr(): + return err(catchable.error.msg) + else: + catchable.get() - let res = - if catchable.isErr(): - return err(catchable.error.msg) - else: - catchable.get() + let response = res.valueOr: + return err($error) - let response = res.valueOr: - return err($error) + let elements = collect(newSeq): + for (hash, msg) in response.hashes.zip(response.messages): + (hash, msg.timestamp) - let elements = collect(newSeq): - for (hash, msg) in response.hashes.zip(response.messages): - (hash, msg.timestamp) + let cursor = + if response.cursor.isNone(): + none(WakuMessageHash) + else: + some(response.cursor.get().hash) - let cursor = - if response.cursor.isNone(): - none(WakuMessageHash) - else: - some(response.cursor.get().hash) - - return ok((elements, cursor)) + return ok((elements, cursor)) + ) let transfer: TransferCallback = proc( hashes: seq[WakuMessageHash], peerId: PeerId @@ -284,7 +295,12 @@ proc mountWakuSync*(node: WakuNode): Result[void, string] = return ok() node.wakuSync = WakuSync.new( - peerManager = node.peerManager, pruneCB = some(prune), transferCB = some(transfer) + peerManager = node.peerManager, + maxFrameSize = maxFrameSize, + syncInterval = syncInterval, + relayJitter = relayJitter, + pruneCB = prune, + transferCB = some(transfer), ) let catchRes = catch: @@ -292,6 +308,9 @@ proc mountWakuSync*(node: WakuNode): Result[void, string] = if catchRes.isErr(): return err(catchRes.error.msg) + if node.started: + node.wakuSync.start() + return ok() ## Waku Metadata @@ -1319,6 +1338,9 @@ proc start*(node: WakuNode) {.async.} 
= if not node.wakuMetadata.isNil(): node.wakuMetadata.start() + if not node.wakuSync.isNil(): + node.wakuSync.start() + ## The switch uses this mapper to update peer info addrs ## with announced addrs after start let addressMapper = proc( diff --git a/waku/waku_sync/protocol.nim b/waku/waku_sync/protocol.nim index 199fba7cab..72a0c02313 100644 --- a/waku/waku_sync/protocol.nim +++ b/waku/waku_sync/protocol.nim @@ -4,7 +4,7 @@ else: {.push raises: [].} import - std/[options], + std/options, stew/results, chronicles, chronos, @@ -191,7 +191,7 @@ proc initProtocolHandler(self: WakuSync) = #TODO send error code and desc to client return - if self.transferCallBack.isSome(): + if hashes.len > 0 and self.transferCallBack.isSome(): let callback = self.transferCallBack.get() (await callback(hashes, conn.peerId)).isOkOr: @@ -233,6 +233,8 @@ proc new*( return sync proc periodicSync(self: WakuSync, callback: TransferCallback) {.async.} = + debug "periodic sync initialized", interval = $self.syncInterval + while true: await sleepAsync(self.syncInterval) @@ -242,13 +244,16 @@ proc periodicSync(self: WakuSync, callback: TransferCallback) {.async.} = debug "sync failed", error continue - (await callback(hashes, peer.peerId)).isOkOr: - debug "transfer callback failed", error - continue + if hashes.len > 0: + (await callback(hashes, peer.peerId)).isOkOr: + debug "transfer callback failed", error + continue debug "periodic sync done", hashSynced = hashes.len proc periodicPrune(self: WakuSync, callback: PruneCallback) {.async.} = + debug "periodic prune initialized", interval = $self.syncInterval + await sleepAsync(self.syncInterval) var pruneStop = getNowInNanosecondTime() From 70d32e60f4dc5f5db4113fb19c882aecdd098ea7 Mon Sep 17 00:00:00 2001 From: SionoiS Date: Mon, 13 May 2024 09:46:07 -0400 Subject: [PATCH 6/6] prune test, prune offset & fixes --- tests/node/test_wakunode_sync.nim | 144 ++++++++++++++++++++++++------ waku/node/waku_node.nim | 1 + waku/waku_sync/protocol.nim | 13 ++- 3 files changed, 127 insertions(+), 31 deletions(-) diff --git a/tests/node/test_wakunode_sync.nim b/tests/node/test_wakunode_sync.nim index dcd0dc8bd7..8449842d26 100644 --- a/tests/node/test_wakunode_sync.nim +++ b/tests/node/test_wakunode_sync.nim @@ -9,13 +9,10 @@ import ../waku_archive/archive_utils, ../testlib/[wakucore, wakunode, testasync] -suite "Waku Sync - End to End": +suite "Store Sync - End to End": var server {.threadvar.}: WakuNode var client {.threadvar.}: WakuNode - var serverRemotePeerInfo {.threadvar.}: RemotePeerInfo - var clientRemotePeerInfo {.threadvar.}: RemotePeerInfo - asyncSetup: let timeOrigin = now() @@ -79,8 +76,8 @@ suite "Waku Sync - End to End": await sleepAsync(chronos.milliseconds(500)) - serverRemotePeerInfo = server.peerInfo.toRemotePeerInfo() - clientRemotePeerInfo = client.peerInfo.toRemotePeerInfo() + let serverRemotePeerInfo = server.peerInfo.toRemotePeerInfo() + let clientRemotePeerInfo = client.peerInfo.toRemotePeerInfo() client.peerManager.addServicePeer(serverRemotePeerInfo, WakuSyncCodec) server.peerManager.addServicePeer(clientRemotePeerInfo, WakuSyncCodec) @@ -91,40 +88,133 @@ suite "Waku Sync - End to End": asyncTeardown: await allFutures(client.stop(), server.stop()) - suite "Store Syncronization": - asyncTest "no message set differences": - check: - client.wakuSync.storageSize() == server.wakuSync.storageSize() + asyncTest "no message set differences": + check: + client.wakuSync.storageSize() == server.wakuSync.storageSize() + + await sleepAsync(1.seconds) + + check: + 
client.wakuSync.storageSize() == server.wakuSync.storageSize() + + asyncTest "client message set differences": + let msg = fakeWakuMessage(@[byte 10]) + + client.wakuSync.ingessMessage(DefaultPubsubTopic, msg) + await client.wakuArchive.handleMessage(DefaultPubsubTopic, msg) + + check: + client.wakuSync.storageSize() != server.wakuSync.storageSize() + + await sleepAsync(1.seconds) + + check: + client.wakuSync.storageSize() == server.wakuSync.storageSize() + + asyncTest "server message set differences": + let msg = fakeWakuMessage(@[byte 10]) + + server.wakuSync.ingessMessage(DefaultPubsubTopic, msg) + await server.wakuArchive.handleMessage(DefaultPubsubTopic, msg) + + check: + client.wakuSync.storageSize() != server.wakuSync.storageSize() + + await sleepAsync(1.seconds) + + check: + client.wakuSync.storageSize() == server.wakuSync.storageSize() + +suite "Waku Sync - Pruning": + var server {.threadvar.}: WakuNode + var client {.threadvar.}: WakuNode + + asyncSetup: + let + serverKey = generateSecp256k1Key() + clientKey = generateSecp256k1Key() + + server = newTestWakuNode(serverKey, ValidIpAddress.init("0.0.0.0"), Port(0)) + client = newTestWakuNode(clientKey, ValidIpAddress.init("0.0.0.0"), Port(0)) + + let serverArchiveDriver = newSqliteArchiveDriver() + let clientArchiveDriver = newSqliteArchiveDriver() + + let mountServerArchiveRes = server.mountArchive(serverArchiveDriver) + let mountClientArchiveRes = client.mountArchive(clientArchiveDriver) - await sleepAsync(1.seconds) + assert mountServerArchiveRes.isOk() + assert mountClientArchiveRes.isOk() - check: - client.wakuSync.storageSize() == server.wakuSync.storageSize() + await server.mountStore() + await client.mountStore() - asyncTest "client message set differences": - let msg = fakeWakuMessage(@[byte 10]) + client.mountStoreClient() + server.mountStoreClient() + let mountServerSync = server.mountWakuSync( + maxFrameSize = 0, + relayJitter = 0.seconds, + syncInterval = 1.hours, + enablePruning = false, + ) + let mountClientSync = client.mountWakuSync( + maxFrameSize = 0, + syncInterval = 1.seconds, + relayJitter = 0.seconds, + enablePruning = true, + ) + + assert mountServerSync.isOk() + assert mountClientSync.isOk() + + await allFutures(server.start(), client.start()) + + await sleepAsync(1.seconds) + + asyncTeardown: + await allFutures(client.stop(), server.stop()) + + asyncTest "pruning": + for _ in 0 ..< 10: + let msg = fakeWakuMessage() client.wakuSync.ingessMessage(DefaultPubsubTopic, msg) await client.wakuArchive.handleMessage(DefaultPubsubTopic, msg) - check: - client.wakuSync.storageSize() != server.wakuSync.storageSize() + server.wakuSync.ingessMessage(DefaultPubsubTopic, msg) + await server.wakuArchive.handleMessage(DefaultPubsubTopic, msg) - await sleepAsync(1.seconds) + await sleepAsync(1.seconds) - check: - client.wakuSync.storageSize() == server.wakuSync.storageSize() + for _ in 0 ..< 10: + let msg = fakeWakuMessage() + client.wakuSync.ingessMessage(DefaultPubsubTopic, msg) + await client.wakuArchive.handleMessage(DefaultPubsubTopic, msg) - asyncTest "server message set differences": - let msg = fakeWakuMessage(@[byte 10]) + server.wakuSync.ingessMessage(DefaultPubsubTopic, msg) + await server.wakuArchive.handleMessage(DefaultPubsubTopic, msg) + + await sleepAsync(1.seconds) + + for _ in 0 ..< 10: + let msg = fakeWakuMessage() + client.wakuSync.ingessMessage(DefaultPubsubTopic, msg) + await client.wakuArchive.handleMessage(DefaultPubsubTopic, msg) server.wakuSync.ingessMessage(DefaultPubsubTopic, msg) await 
server.wakuArchive.handleMessage(DefaultPubsubTopic, msg) - check: - client.wakuSync.storageSize() != server.wakuSync.storageSize() + await sleepAsync(1.seconds) + + for _ in 0 ..< 10: + let msg = fakeWakuMessage() + client.wakuSync.ingessMessage(DefaultPubsubTopic, msg) + await client.wakuArchive.handleMessage(DefaultPubsubTopic, msg) + + server.wakuSync.ingessMessage(DefaultPubsubTopic, msg) + await server.wakuArchive.handleMessage(DefaultPubsubTopic, msg) - await sleepAsync(1.seconds) + await sleepAsync(1.seconds) - check: - client.wakuSync.storageSize() == server.wakuSync.storageSize() + check: + client.wakuSync.storageSize() == 10 diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 8c458abd70..b7b8d9a7f5 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -223,6 +223,7 @@ proc mountWakuSync*( none(ArchiveCursor) let query = ArchiveQuery( + includeData: true, cursor: archiveCursor, startTime: some(pruneStart), endTime: some(pruneStop), diff --git a/waku/waku_sync/protocol.nim b/waku/waku_sync/protocol.nim index 72a0c02313..9a2b2500f0 100644 --- a/waku/waku_sync/protocol.nim +++ b/waku/waku_sync/protocol.nim @@ -37,8 +37,8 @@ type WakuSync* = ref object of LPProtocol transferCallBack: Option[TransferCallback] # Callback for message transfer. pruneCallBack: Option[PruneCallBack] # Callback with the result of the archive query - pruneStart: Timestamp # Last pruning start timestamp - pruneInterval: Duration # Time between each pruning attempt + pruneStart: Timestamp # Last pruning start timestamp + pruneOffset: timer.Duration # Offset to prune a bit more than necessary. periodicSyncFut: Future[void] periodicPruneFut: Future[void] @@ -223,6 +223,7 @@ proc new*( syncInterval: syncInterval, relayJitter: relayJitter, pruneCallBack: pruneCB, + pruneOffset: syncInterval div 2, transferCallBack: transferCB, ) @@ -262,13 +263,17 @@ proc periodicPrune(self: WakuSync, callback: PruneCallback) {.async.} = await sleepAsync(self.syncInterval) debug "periodic prune started", - startTime = self.pruneStart, endTime = pruneStop, storageSize = self.storage.len + startTime = self.pruneStart - self.pruneOffset.nanos, + endTime = pruneStop, + storageSize = self.storage.len var (elements, cursor) = (newSeq[(WakuMessageHash, Timestamp)](0), none(WakuMessageHash)) while true: - (elements, cursor) = (await callback(self.pruneStart, pruneStop, cursor)).valueOr: + (elements, cursor) = ( + await callback(self.pruneStart - self.pruneOffset.nanos, pruneStop, cursor) + ).valueOr: debug "pruning callback failed", error break
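
For reviewers, a minimal end-to-end wiring sketch of the flow this series
lands on. It is modeled on the test setup above; newTestWakuNode,
generateSecp256k1Key and newSqliteArchiveDriver are the existing test
helpers, and the parameter values are illustrative, not recommendations:

import chronos, stew/shims/net
import waku/[node/waku_node, node/peer_manager, waku_core, waku_store, waku_sync]

proc wireSyncNode(syncPeer: RemotePeerInfo): Future[WakuNode] {.async.} =
  let node =
    newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0))

  # Sync leans on both protocols: the transfer callback store-queries the
  # peer and the prune callback walks the local archive.
  doAssert node.mountArchive(newSqliteArchiveDriver()).isOk()
  await node.mountStore()
  node.mountStoreClient()

  doAssert node.mountWakuSync(
    maxFrameSize = DefaultMaxFrameSize,   # 153600-byte negentropy payloads
    syncInterval = 5.minutes,             # tighter than the 1.hours default
    relayJitter = DefaultGossipSubJitter, # let gossip settle for 20s first
    enablePruning = true,                 # prune on the same interval
  ).isOk()

  # The sync peer must also serve store queries for the transfer step.
  node.peerManager.addServicePeer(syncPeer, WakuSyncCodec)
  node.peerManager.addServicePeer(syncPeer, WakuStoreCodec)

  await node.start() # start() now also starts wakuSync (patch 5/6)
  return node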