Skip to content

Commit

Permalink
feat: pruning storage mehcanism (#2673)
Browse files Browse the repository at this point in the history
  • Loading branch information
SionoiS committed Aug 13, 2024
1 parent ade92d7 commit e81d7c3
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 43 deletions.
10 changes: 5 additions & 5 deletions tests/waku_sync/sync_utils.nim
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,16 @@

import std/options, chronos, chronicles, libp2p/crypto/crypto

import
../../../waku/[node/peer_manager, waku_core, waku_sync], ../testlib/[common, wakucore]
import ../../../waku/[node/peer_manager, waku_core, waku_sync], ../testlib/wakucore

proc newTestWakuSync*(
switch: Switch, handler: WakuSyncCallback
switch: Switch, handler: SyncCallback
): Future[WakuSync] {.async.} =
const DefaultFrameSize = 153600
let
peerManager = PeerManager.new(switch)
proto = WakuSync.new(peerManager, DefaultFrameSize, 0.seconds, 0, some(handler))
proto = WakuSync.new(
peerManager = peerManager, relayJitter = 0.seconds, syncCB = some(handler)
)
assert proto != nil

proto.start()
Expand Down
4 changes: 2 additions & 2 deletions tests/waku_sync/test_protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ suite "Waku Sync":
var serverSwitch {.threadvar.}: Switch
var clientSwitch {.threadvar.}: Switch

var protoHandler {.threadvar.}: WakuSyncCallback
var protoHandler {.threadvar.}: SyncCallback

var server {.threadvar.}: WakuSync
var client {.threadvar.}: WakuSync
Expand Down Expand Up @@ -362,7 +362,7 @@ suite "Waku Sync":
let msg3 = fakeWakuMessage(contentTopic = DefaultContentTopic)
let hash3 = computeMessageHash(DefaultPubsubTopic, msg3)

let protoHandler: WakuSyncCallback = proc(
let protoHandler: SyncCallback = proc(
hashes: seq[WakuMessageHash], peer: RemotePeerInfo
) {.async: (raises: []), closure, gcsafe.} =
debug "Received needHashes from peer:", len = hashes.len
Expand Down
45 changes: 43 additions & 2 deletions waku/node/waku_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,50 @@ proc mountWakuSync*(node: WakuNode): Result[void, string] =
if not node.wakuSync.isNil():
return err("Waku sync already mounted, skipping")

let sync = WakuSync.new(node.peerManager) #TODO add the callback and the options
let prune: PruneCallback = proc(
pruneStart: Timestamp, pruneStop: Timestamp, cursor: Option[WakuMessageHash]
): Future[
Result[(seq[(WakuMessageHash, Timestamp)], Option[WakuMessageHash]), string]
] {.async: (raises: []), closure, gcsafe.} =
let archiveCursor =
if cursor.isSome():
some(ArchiveCursor(hash: cursor.get()))
else:
none(ArchiveCursor)

let query = ArchiveQuery(
cursor: archiveCursor,
startTime: some(pruneStart),
endTime: some(pruneStop),
pageSize: 100,
)

let catchable = catch:
await node.wakuArchive.findMessages(query)

let res =
if catchable.isErr():
return err(catchable.error.msg)
else:
catchable.get()

let response = res.valueOr:
return err($error)

let elements = collect(newSeq):
for (hash, msg) in response.hashes.zip(response.messages):
(hash, msg.timestamp)

let cursor =
if response.cursor.isNone():
none(WakuMessageHash)
else:
some(response.cursor.get().hash)

return ok((elements, cursor))

node.wakuSync = sync
#TODO add sync callback and options
node.wakuSync = WakuSync.new(peerManager = node.peerManager, pruneCB = some(prune))

let catchRes = catch:
node.switch.mount(node.wakuSync, protocolMatcher(WakuSyncCodec))
Expand Down
33 changes: 21 additions & 12 deletions waku/waku_sync/common.nim
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,29 @@ else:
import std/[options], chronos
import ../waku_core

const DefaultSyncInterval*: timer.Duration = Hour
const WakuSyncCodec* = "/vac/waku/sync/1.0.0"
const DefaultFrameSize* = 153600
const
DefaultSyncInterval*: Duration = 1.hours
DefaultPruneInterval*: Duration = 30.minutes
WakuSyncCodec* = "/vac/waku/sync/1.0.0"
DefaultFrameSize* = 153600
DefaultGossipSubJitter*: Duration = 20.seconds

type WakuSyncCallback* = proc(hashes: seq[WakuMessageHash], syncPeer: RemotePeerInfo) {.
async: (raises: []), closure
.}
type
SyncCallback* =
proc(hashes: seq[WakuMessageHash], syncPeer: RemotePeerInfo) {.async: (raises: []).}

type SyncPayload* = object
rangeStart*: Option[uint64]
rangeEnd*: Option[uint64]
PruneCallback* = proc(
startTime: Timestamp, endTime: Timestamp, cursor = none(WakuMessageHash)
): Future[
Result[(seq[(WakuMessageHash, Timestamp)], Option[WakuMessageHash]), string]
] {.async: (raises: []).}

frameSize*: Option[uint64]
SyncPayload* = object
rangeStart*: Option[uint64]
rangeEnd*: Option[uint64]

negentropy*: seq[byte] # negentropy protocol payload
frameSize*: Option[uint64]

hashes*: seq[WakuMessageHash]
negentropy*: seq[byte] # negentropy protocol payload

hashes*: seq[WakuMessageHash]
85 changes: 63 additions & 22 deletions waku/waku_sync/protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ else:
{.push raises: [].}

import
std/[options, times],
std/[options],
stew/results,
chronicles,
chronos,
Expand All @@ -30,11 +30,15 @@ logScope:
type WakuSync* = ref object of LPProtocol
storage: Storage # Negentropy protocol storage
peerManager: PeerManager
maxFrameSize: int # Not sure if this should be protocol defined or not...
maxFrameSize: int
syncInterval: timer.Duration # Time between each syncronisation attempt
relayJitter: int64 # Time delay until all messages are mostly received network wide
callback: Option[WakuSyncCallback] # Callback with the result of the syncronisation
relayJitter: Duration # Time delay until all messages are mostly received network wide
syncCallBack: Option[SyncCallBack] # Callback with the result of the syncronisation
pruneCallBack: Option[PruneCallBack] # Callback with the result of the archive query
pruneStart: Timestamp # Last pruning start timestamp
pruneInterval: Duration # Time between each pruning attempt
periodicSyncFut: Future[void]
periodicPruneFut: Future[void]

proc ingessMessage*(self: WakuSync, pubsubTopic: PubsubTopic, msg: WakuMessage) =
if msg.ephemeral:
Expand All @@ -50,13 +54,13 @@ proc ingessMessage*(self: WakuSync, pubsubTopic: PubsubTopic, msg: WakuMessage)
if self.storage.insert(msg.timestamp, msgHash).isErr():
debug "failed to insert message ", hash = msgHash.toHex()

proc calculateRange(relayJitter: int64): (int64, int64) =
proc calculateRange(jitter: Duration, syncRange: Duration = 1.hours): (int64, int64) =
var now = getNowInNanosecondTime()

# Because of message jitter inherent to GossipSub
now -= relayJitter
# Because of message jitter inherent to Relay protocol
now -= jitter.nanos

let range = getNanosecondTime(3600) # 1 hour
let range = syncRange.nanos

let start = now - range
let `end` = now
Expand All @@ -68,14 +72,13 @@ proc request(
): Future[Result[seq[WakuMessageHash], string]] {.async, gcsafe.} =
let (start, `end`) = calculateRange(self.relayJitter)

let frameSize = DefaultFrameSize

let initialized = ?clientInitialize(self.storage, conn, frameSize, start, `end`)
let initialized =
?clientInitialize(self.storage, conn, self.maxFrameSize, start, `end`)

debug "sync session initialized",
client = self.peerManager.switch.peerInfo.peerId,
server = conn.peerId,
frameSize = frameSize,
frameSize = self.maxFrameSize,
timeStart = start,
timeEnd = `end`

Expand Down Expand Up @@ -147,9 +150,8 @@ proc handleLoop(
): Future[Result[seq[WakuMessageHash], string]] {.async.} =
let (start, `end`) = calculateRange(self.relayJitter)

let frameSize = DefaultFrameSize

let initialized = ?serverInitialize(self.storage, conn, frameSize, start, `end`)
let initialized =
?serverInitialize(self.storage, conn, self.maxFrameSize, start, `end`)

var sent = initialized

Expand Down Expand Up @@ -202,8 +204,11 @@ proc new*(
peerManager: PeerManager,
maxFrameSize: int = DefaultFrameSize,
syncInterval: timer.Duration = DefaultSyncInterval,
relayJitter: int64 = 20, #Default gossipsub jitter in network.
callback: Option[WakuSyncCallback] = none(WakuSyncCallback),
relayJitter: Duration = DefaultGossipSubJitter,
#Default gossipsub jitter in network.
syncCB: Option[SyncCallback] = none(SyncCallback),
pruneInterval: Duration = DefaultPruneInterval,
pruneCB: Option[PruneCallBack] = none(PruneCallback),
): T =
let storage = Storage.new().valueOr:
debug "storage creation failed"
Expand All @@ -214,8 +219,11 @@ proc new*(
peerManager: peerManager,
maxFrameSize: maxFrameSize,
syncInterval: syncInterval,
callback: callback,
relayJitter: relayJitter,
syncCallBack: syncCB,
pruneCallBack: pruneCB,
pruneStart: getNowInNanosecondTime(),
pruneInterval: pruneInterval,
)

sync.initProtocolHandler()
Expand All @@ -229,20 +237,53 @@ proc periodicSync(self: WakuSync) {.async.} =
await sleepAsync(self.syncInterval)

let (hashes, peer) = (await self.sync()).valueOr:
debug "periodic sync error", error = error
debug "periodic sync failed", error = error
continue

let callback = self.callback.valueOr:
let callback = self.syncCallBack.valueOr:
continue

await callback(hashes, peer)

proc periodicPrune(self: WakuSync) {.async.} =
let callback = self.pruneCallBack.valueOr:
return

while true:
await sleepAsync(self.pruneInterval)

let pruneStop = getNowInNanosecondTime()

var (elements, cursor) =
(newSeq[(WakuMessageHash, Timestamp)](0), none(WakuMessageHash))

while true:
(elements, cursor) = (await callback(self.pruneStart, pruneStop, cursor)).valueOr:
debug "storage pruning failed", error = $error
break

if elements.len == 0:
break

for (hash, timestamp) in elements:
self.storage.erase(timestamp, hash).isOkOr:
trace "element pruning failed", time = timestamp, hash = hash, error = error
continue

if cursor.isNone():
break

self.pruneStart = pruneStop

proc start*(self: WakuSync) =
self.started = true
if self.syncInterval > ZeroDuration:
# start periodic-sync only if interval is set.

if self.syncCallBack.isSome() and self.syncInterval > ZeroDuration:
self.periodicSyncFut = self.periodicSync()

if self.pruneCallBack.isSome() and self.pruneInterval > ZeroDuration:
self.periodicPruneFut = self.periodicPrune()

info "WakuSync protocol started"

proc stopWait*(self: WakuSync) {.async.} =
Expand Down

0 comments on commit e81d7c3

Please sign in to comment.