From e81d7c3d32bdec1736099eae6b34edf3e1e05b0d Mon Sep 17 00:00:00 2001 From: Simon-Pierre Vivier Date: Mon, 6 May 2024 11:14:48 -0400 Subject: [PATCH] feat: pruning storage mehcanism (#2673) --- tests/waku_sync/sync_utils.nim | 10 ++-- tests/waku_sync/test_protocol.nim | 4 +- waku/node/waku_node.nim | 45 +++++++++++++++- waku/waku_sync/common.nim | 33 +++++++----- waku/waku_sync/protocol.nim | 85 +++++++++++++++++++++++-------- 5 files changed, 134 insertions(+), 43 deletions(-) diff --git a/tests/waku_sync/sync_utils.nim b/tests/waku_sync/sync_utils.nim index 124c543e1e..0d01265aa1 100644 --- a/tests/waku_sync/sync_utils.nim +++ b/tests/waku_sync/sync_utils.nim @@ -2,16 +2,16 @@ import std/options, chronos, chronicles, libp2p/crypto/crypto -import - ../../../waku/[node/peer_manager, waku_core, waku_sync], ../testlib/[common, wakucore] +import ../../../waku/[node/peer_manager, waku_core, waku_sync], ../testlib/wakucore proc newTestWakuSync*( - switch: Switch, handler: WakuSyncCallback + switch: Switch, handler: SyncCallback ): Future[WakuSync] {.async.} = - const DefaultFrameSize = 153600 let peerManager = PeerManager.new(switch) - proto = WakuSync.new(peerManager, DefaultFrameSize, 0.seconds, 0, some(handler)) + proto = WakuSync.new( + peerManager = peerManager, relayJitter = 0.seconds, syncCB = some(handler) + ) assert proto != nil proto.start() diff --git a/tests/waku_sync/test_protocol.nim b/tests/waku_sync/test_protocol.nim index be12902060..cd43cde602 100644 --- a/tests/waku_sync/test_protocol.nim +++ b/tests/waku_sync/test_protocol.nim @@ -29,7 +29,7 @@ suite "Waku Sync": var serverSwitch {.threadvar.}: Switch var clientSwitch {.threadvar.}: Switch - var protoHandler {.threadvar.}: WakuSyncCallback + var protoHandler {.threadvar.}: SyncCallback var server {.threadvar.}: WakuSync var client {.threadvar.}: WakuSync @@ -362,7 +362,7 @@ suite "Waku Sync": let msg3 = fakeWakuMessage(contentTopic = DefaultContentTopic) let hash3 = computeMessageHash(DefaultPubsubTopic, msg3) - let protoHandler: WakuSyncCallback = proc( + let protoHandler: SyncCallback = proc( hashes: seq[WakuMessageHash], peer: RemotePeerInfo ) {.async: (raises: []), closure, gcsafe.} = debug "Received needHashes from peer:", len = hashes.len diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 7c2fbad918..22af67b416 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -202,9 +202,50 @@ proc mountWakuSync*(node: WakuNode): Result[void, string] = if not node.wakuSync.isNil(): return err("Waku sync already mounted, skipping") - let sync = WakuSync.new(node.peerManager) #TODO add the callback and the options + let prune: PruneCallback = proc( + pruneStart: Timestamp, pruneStop: Timestamp, cursor: Option[WakuMessageHash] + ): Future[ + Result[(seq[(WakuMessageHash, Timestamp)], Option[WakuMessageHash]), string] + ] {.async: (raises: []), closure, gcsafe.} = + let archiveCursor = + if cursor.isSome(): + some(ArchiveCursor(hash: cursor.get())) + else: + none(ArchiveCursor) + + let query = ArchiveQuery( + cursor: archiveCursor, + startTime: some(pruneStart), + endTime: some(pruneStop), + pageSize: 100, + ) + + let catchable = catch: + await node.wakuArchive.findMessages(query) + + let res = + if catchable.isErr(): + return err(catchable.error.msg) + else: + catchable.get() + + let response = res.valueOr: + return err($error) + + let elements = collect(newSeq): + for (hash, msg) in response.hashes.zip(response.messages): + (hash, msg.timestamp) + + let cursor = + if response.cursor.isNone(): + none(WakuMessageHash) + else: + some(response.cursor.get().hash) + + return ok((elements, cursor)) - node.wakuSync = sync + #TODO add sync callback and options + node.wakuSync = WakuSync.new(peerManager = node.peerManager, pruneCB = some(prune)) let catchRes = catch: node.switch.mount(node.wakuSync, protocolMatcher(WakuSyncCodec)) diff --git a/waku/waku_sync/common.nim b/waku/waku_sync/common.nim index 37502f8050..528d0e1855 100644 --- a/waku/waku_sync/common.nim +++ b/waku/waku_sync/common.nim @@ -6,20 +6,29 @@ else: import std/[options], chronos import ../waku_core -const DefaultSyncInterval*: timer.Duration = Hour -const WakuSyncCodec* = "/vac/waku/sync/1.0.0" -const DefaultFrameSize* = 153600 +const + DefaultSyncInterval*: Duration = 1.hours + DefaultPruneInterval*: Duration = 30.minutes + WakuSyncCodec* = "/vac/waku/sync/1.0.0" + DefaultFrameSize* = 153600 + DefaultGossipSubJitter*: Duration = 20.seconds -type WakuSyncCallback* = proc(hashes: seq[WakuMessageHash], syncPeer: RemotePeerInfo) {. - async: (raises: []), closure -.} +type + SyncCallback* = + proc(hashes: seq[WakuMessageHash], syncPeer: RemotePeerInfo) {.async: (raises: []).} -type SyncPayload* = object - rangeStart*: Option[uint64] - rangeEnd*: Option[uint64] + PruneCallback* = proc( + startTime: Timestamp, endTime: Timestamp, cursor = none(WakuMessageHash) + ): Future[ + Result[(seq[(WakuMessageHash, Timestamp)], Option[WakuMessageHash]), string] + ] {.async: (raises: []).} - frameSize*: Option[uint64] + SyncPayload* = object + rangeStart*: Option[uint64] + rangeEnd*: Option[uint64] - negentropy*: seq[byte] # negentropy protocol payload + frameSize*: Option[uint64] - hashes*: seq[WakuMessageHash] + negentropy*: seq[byte] # negentropy protocol payload + + hashes*: seq[WakuMessageHash] diff --git a/waku/waku_sync/protocol.nim b/waku/waku_sync/protocol.nim index 13632ade97..6ea78736bf 100644 --- a/waku/waku_sync/protocol.nim +++ b/waku/waku_sync/protocol.nim @@ -4,7 +4,7 @@ else: {.push raises: [].} import - std/[options, times], + std/[options], stew/results, chronicles, chronos, @@ -30,11 +30,15 @@ logScope: type WakuSync* = ref object of LPProtocol storage: Storage # Negentropy protocol storage peerManager: PeerManager - maxFrameSize: int # Not sure if this should be protocol defined or not... + maxFrameSize: int syncInterval: timer.Duration # Time between each syncronisation attempt - relayJitter: int64 # Time delay until all messages are mostly received network wide - callback: Option[WakuSyncCallback] # Callback with the result of the syncronisation + relayJitter: Duration # Time delay until all messages are mostly received network wide + syncCallBack: Option[SyncCallBack] # Callback with the result of the syncronisation + pruneCallBack: Option[PruneCallBack] # Callback with the result of the archive query + pruneStart: Timestamp # Last pruning start timestamp + pruneInterval: Duration # Time between each pruning attempt periodicSyncFut: Future[void] + periodicPruneFut: Future[void] proc ingessMessage*(self: WakuSync, pubsubTopic: PubsubTopic, msg: WakuMessage) = if msg.ephemeral: @@ -50,13 +54,13 @@ proc ingessMessage*(self: WakuSync, pubsubTopic: PubsubTopic, msg: WakuMessage) if self.storage.insert(msg.timestamp, msgHash).isErr(): debug "failed to insert message ", hash = msgHash.toHex() -proc calculateRange(relayJitter: int64): (int64, int64) = +proc calculateRange(jitter: Duration, syncRange: Duration = 1.hours): (int64, int64) = var now = getNowInNanosecondTime() - # Because of message jitter inherent to GossipSub - now -= relayJitter + # Because of message jitter inherent to Relay protocol + now -= jitter.nanos - let range = getNanosecondTime(3600) # 1 hour + let range = syncRange.nanos let start = now - range let `end` = now @@ -68,14 +72,13 @@ proc request( ): Future[Result[seq[WakuMessageHash], string]] {.async, gcsafe.} = let (start, `end`) = calculateRange(self.relayJitter) - let frameSize = DefaultFrameSize - - let initialized = ?clientInitialize(self.storage, conn, frameSize, start, `end`) + let initialized = + ?clientInitialize(self.storage, conn, self.maxFrameSize, start, `end`) debug "sync session initialized", client = self.peerManager.switch.peerInfo.peerId, server = conn.peerId, - frameSize = frameSize, + frameSize = self.maxFrameSize, timeStart = start, timeEnd = `end` @@ -147,9 +150,8 @@ proc handleLoop( ): Future[Result[seq[WakuMessageHash], string]] {.async.} = let (start, `end`) = calculateRange(self.relayJitter) - let frameSize = DefaultFrameSize - - let initialized = ?serverInitialize(self.storage, conn, frameSize, start, `end`) + let initialized = + ?serverInitialize(self.storage, conn, self.maxFrameSize, start, `end`) var sent = initialized @@ -202,8 +204,11 @@ proc new*( peerManager: PeerManager, maxFrameSize: int = DefaultFrameSize, syncInterval: timer.Duration = DefaultSyncInterval, - relayJitter: int64 = 20, #Default gossipsub jitter in network. - callback: Option[WakuSyncCallback] = none(WakuSyncCallback), + relayJitter: Duration = DefaultGossipSubJitter, + #Default gossipsub jitter in network. + syncCB: Option[SyncCallback] = none(SyncCallback), + pruneInterval: Duration = DefaultPruneInterval, + pruneCB: Option[PruneCallBack] = none(PruneCallback), ): T = let storage = Storage.new().valueOr: debug "storage creation failed" @@ -214,8 +219,11 @@ proc new*( peerManager: peerManager, maxFrameSize: maxFrameSize, syncInterval: syncInterval, - callback: callback, relayJitter: relayJitter, + syncCallBack: syncCB, + pruneCallBack: pruneCB, + pruneStart: getNowInNanosecondTime(), + pruneInterval: pruneInterval, ) sync.initProtocolHandler() @@ -229,20 +237,53 @@ proc periodicSync(self: WakuSync) {.async.} = await sleepAsync(self.syncInterval) let (hashes, peer) = (await self.sync()).valueOr: - debug "periodic sync error", error = error + debug "periodic sync failed", error = error continue - let callback = self.callback.valueOr: + let callback = self.syncCallBack.valueOr: continue await callback(hashes, peer) +proc periodicPrune(self: WakuSync) {.async.} = + let callback = self.pruneCallBack.valueOr: + return + + while true: + await sleepAsync(self.pruneInterval) + + let pruneStop = getNowInNanosecondTime() + + var (elements, cursor) = + (newSeq[(WakuMessageHash, Timestamp)](0), none(WakuMessageHash)) + + while true: + (elements, cursor) = (await callback(self.pruneStart, pruneStop, cursor)).valueOr: + debug "storage pruning failed", error = $error + break + + if elements.len == 0: + break + + for (hash, timestamp) in elements: + self.storage.erase(timestamp, hash).isOkOr: + trace "element pruning failed", time = timestamp, hash = hash, error = error + continue + + if cursor.isNone(): + break + + self.pruneStart = pruneStop + proc start*(self: WakuSync) = self.started = true - if self.syncInterval > ZeroDuration: - # start periodic-sync only if interval is set. + + if self.syncCallBack.isSome() and self.syncInterval > ZeroDuration: self.periodicSyncFut = self.periodicSync() + if self.pruneCallBack.isSome() and self.pruneInterval > ZeroDuration: + self.periodicPruneFut = self.periodicPrune() + info "WakuSync protocol started" proc stopWait*(self: WakuSync) {.async.} =