Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: sync storage manager #2561

Merged
merged 10 commits into from
Apr 4, 2024
26 changes: 16 additions & 10 deletions waku/waku_sync/protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ import
../waku_enr,
../node/peer_manager/peer_manager,
./raw_bindings,
./common
./common,
./storage

logScope:
topics = "waku sync"
Expand All @@ -33,7 +34,7 @@ type
.}

WakuSync* = ref object of LPProtocol
storage: Storage
storageMgr: StorageManager
peerManager: PeerManager
maxFrameSize: int # Not sure if this should be protocol defined or not...
syncInterval: Duration
Expand All @@ -46,14 +47,18 @@ proc ingessMessage*(self: WakuSync, pubsubTopic: PubsubTopic, msg: WakuMessage)

let msgHash: WakuMessageHash = computeMessageHash(pubsubTopic, msg)
trace "inserting message into storage ", hash = msgHash

if self.storage.insert(msg.timestamp, msgHash).isErr():
let storage = storageMgr.retrieveStorage(msg.timestamp).valueOr:
error "failed to ingess message"
if storage.insert(msg.timestamp, msgHash).isErr():
debug "failed to insert message ", hash = msgHash.toHex()

proc request(
self: WakuSync, conn: Connection
): Future[Result[seq[WakuMessageHash], string]] {.async, gcsafe.} =
let negentropy = Negentropy.new(self.storage, self.maxFrameSize).valueOr:
#Use latest storage to sync??, Need to rethink
let storage = self.storageMgr.getRecentStorage().valueOr:
return err(error)
let negentropy = Negentropy.new(storage, self.maxFrameSize).valueOr:
return err(error)

defer:
Expand Down Expand Up @@ -133,8 +138,11 @@ proc sync*(
proc initProtocolHandler(self: WakuSync) =
proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} =
debug "Server sync session requested", remotePeer = $conn.peerId

let negentropy = Negentropy.new(self.storage, self.maxFrameSize).valueOr:
#TODO: Find matching storage based on sync range and continue??
let storage = self.storageMgr.getRecentStorage().valueOr:
error "could not find latest storage"
return
let negentropy = Negentropy.new(storage, self.maxFrameSize).valueOr:
error "Negentropy initialization error", error = error
return

Expand Down Expand Up @@ -176,9 +184,7 @@ proc new*(
syncInterval: Duration = DefaultSyncInterval,
callback: Option[WakuSyncCallback] = none(WakuSyncCallback),
): T =
let storage = Storage.new().valueOr:
error "storage creation failed"
return nil
let storageMgr = StorageManager()

let sync = WakuSync(
storage: storage,
Expand Down
59 changes: 59 additions & 0 deletions waku/waku_sync/storage.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}

import std/[times, tables], chronicles, chronos, stew/results

import ./raw_bindings

logScope:
topics = "waku sync"

type WakuSyncStorageManager* = object
storages: OrderedTable[string, Storage] # Map of dateTime and Storage objects. DateTime is of the format YYYYMMDDHH
chaitanyaprem marked this conversation as resolved.
Show resolved Hide resolved
maxHours: times.Duration

proc new*(T: type WakuSyncStorageManager, hoursToStore: times.Duration = initDuration(minutes=120)): T =
return WakuSyncStorageManager(maxHours:hoursToStore.inHours)

#Time should be in YYYYMMDDHH format.
proc createStorage(
self: WakuSyncStorageManager, time: string
): Result[Storage, string] =
let storage:Storage = Storage.new().valueOr:
error "storage creation failed"
return err("storage creation failed")
self.storages[time] = storage
return storage
chaitanyaprem marked this conversation as resolved.
Show resolved Hide resolved

proc getRecentStorage(): Result[Storage, string]=


#Time should be in YYYYMMDDHH format.
proc getStorage(self: WakuSyncStorageManager, time: string): Result[Storage, string] =
if self.storages.hasKey(time):
return self.storages[time]
return err("no storage found for given dateTime")
chaitanyaprem marked this conversation as resolved.
Show resolved Hide resolved

proc retrieveStorage*(
self: WakuSyncStorageManager, time: int64
): Result[Storage, string] =
chaitanyaprem marked this conversation as resolved.
Show resolved Hide resolved
let unixf = times.fromUnixFloat(float(time))

let dateTime = times.format(unixf, "YYYYMMDDHH")
chaitanyaprem marked this conversation as resolved.
Show resolved Hide resolved
let storage = self.getStorage(dateTime).valueOr:
#create a new storage
# TODO: May need synchronization??
# Limit number of storages to configured duration
if initDuration(hours = self.storages.len()) == self.maxHours:
#TODO: Need to delete oldest storage at this point, but what if that is being synced?
error "number of storages reached, need to purge oldest"
return err("number of storages reached, need to purge oldest")
return self.createStorage(dateTime)

return storage

proc deleteStorage*(time:string)=
if self.storages.hasKey(time):
self.storages.del(time)
Loading