From 7e0ee54fa35eafac7735d037a854b87823044134 Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Thu, 28 Mar 2024 17:04:18 +0530 Subject: [PATCH 01/10] feat: draft changes to use a storage manager for sync --- waku/waku_sync/protocol.nim | 26 +++++++++------- waku/waku_sync/storage.nim | 59 +++++++++++++++++++++++++++++++++++++ 2 files changed, 75 insertions(+), 10 deletions(-) create mode 100644 waku/waku_sync/storage.nim diff --git a/waku/waku_sync/protocol.nim b/waku/waku_sync/protocol.nim index 45771cb2e9..1b85a1b31a 100644 --- a/waku/waku_sync/protocol.nim +++ b/waku/waku_sync/protocol.nim @@ -20,7 +20,8 @@ import ../waku_enr, ../node/peer_manager/peer_manager, ./raw_bindings, - ./common + ./common, + ./storage logScope: topics = "waku sync" @@ -33,7 +34,7 @@ type .} WakuSync* = ref object of LPProtocol - storage: Storage + storageMgr: StorageManager peerManager: PeerManager maxFrameSize: int # Not sure if this should be protocol defined or not... syncInterval: Duration @@ -46,14 +47,18 @@ proc ingessMessage*(self: WakuSync, pubsubTopic: PubsubTopic, msg: WakuMessage) let msgHash: WakuMessageHash = computeMessageHash(pubsubTopic, msg) trace "inserting message into storage ", hash = msgHash - - if self.storage.insert(msg.timestamp, msgHash).isErr(): + let storage = storageMgr.retrieveStorage(msg.timestamp).valueOr: + error "failed to ingess message" + if storage.insert(msg.timestamp, msgHash).isErr(): debug "failed to insert message ", hash = msgHash.toHex() proc request( self: WakuSync, conn: Connection ): Future[Result[seq[WakuMessageHash], string]] {.async, gcsafe.} = - let negentropy = Negentropy.new(self.storage, self.maxFrameSize).valueOr: + #Use latest storage to sync??, Need to rethink + let storage = self.storageMgr.getRecentStorage().valueOr: + return err(error) + let negentropy = Negentropy.new(storage, self.maxFrameSize).valueOr: return err(error) defer: @@ -133,8 +138,11 @@ proc sync*( proc initProtocolHandler(self: WakuSync) = proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} = debug "Server sync session requested", remotePeer = $conn.peerId - - let negentropy = Negentropy.new(self.storage, self.maxFrameSize).valueOr: + #TODO: Find matching storage based on sync range and continue?? + let storage = self.storageMgr.getRecentStorage().valueOr: + error "could not find latest storage" + return + let negentropy = Negentropy.new(storage, self.maxFrameSize).valueOr: error "Negentropy initialization error", error = error return @@ -176,9 +184,7 @@ proc new*( syncInterval: Duration = DefaultSyncInterval, callback: Option[WakuSyncCallback] = none(WakuSyncCallback), ): T = - let storage = Storage.new().valueOr: - error "storage creation failed" - return nil + let storageMgr = StorageManager() let sync = WakuSync( storage: storage, diff --git a/waku/waku_sync/storage.nim b/waku/waku_sync/storage.nim new file mode 100644 index 0000000000..a70510cad8 --- /dev/null +++ b/waku/waku_sync/storage.nim @@ -0,0 +1,59 @@ +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import std/[times, tables], chronicles, chronos, stew/results + +import ./raw_bindings + +logScope: + topics = "waku sync" + +type WakuSyncStorageManager* = object + storages: OrderedTable[string, Storage] # Map of dateTime and Storage objects. DateTime is of the format YYYYMMDDHH + maxHours: times.Duration + +proc new*(T: type WakuSyncStorageManager, hoursToStore: times.Duration = initDuration(minutes=120)): T = + return WakuSyncStorageManager(maxHours:hoursToStore.inHours) + +#Time should be in YYYYMMDDHH format. +proc createStorage( + self: WakuSyncStorageManager, time: string +): Result[Storage, string] = + let storage:Storage = Storage.new().valueOr: + error "storage creation failed" + return err("storage creation failed") + self.storages[time] = storage + return storage + +proc getRecentStorage(): Result[Storage, string]= + + +#Time should be in YYYYMMDDHH format. +proc getStorage(self: WakuSyncStorageManager, time: string): Result[Storage, string] = + if self.storages.hasKey(time): + return self.storages[time] + return err("no storage found for given dateTime") + +proc retrieveStorage*( + self: WakuSyncStorageManager, time: int64 +): Result[Storage, string] = + let unixf = times.fromUnixFloat(float(time)) + + let dateTime = times.format(unixf, "YYYYMMDDHH") + let storage = self.getStorage(dateTime).valueOr: + #create a new storage + # TODO: May need synchronization?? + # Limit number of storages to configured duration + if initDuration(hours = self.storages.len()) == self.maxHours: + #TODO: Need to delete oldest storage at this point, but what if that is being synced? + error "number of storages reached, need to purge oldest" + return err("number of storages reached, need to purge oldest") + return self.createStorage(dateTime) + + return storage + +proc deleteStorage*(time:string)= + if self.storages.hasKey(time): + self.storages.del(time) From 1d978aa07aeb8367a120095a4ce1b3cfc755440b Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Fri, 29 Mar 2024 13:50:48 +0530 Subject: [PATCH 02/10] chore: address comments and resolve compilation issues --- waku/waku_sync/protocol.nim | 44 ++++++++++++------ waku/waku_sync/raw_bindings.nim | 2 +- waku/waku_sync/storage.nim | 59 ------------------------ waku/waku_sync/storage_manager.nim | 73 ++++++++++++++++++++++++++++++ 4 files changed, 103 insertions(+), 75 deletions(-) delete mode 100644 waku/waku_sync/storage.nim create mode 100644 waku/waku_sync/storage_manager.nim diff --git a/waku/waku_sync/protocol.nim b/waku/waku_sync/protocol.nim index 1b85a1b31a..a7eeadd5f6 100644 --- a/waku/waku_sync/protocol.nim +++ b/waku/waku_sync/protocol.nim @@ -4,7 +4,7 @@ else: {.push raises: [].} import - std/options, + std/[options, times], stew/results, chronicles, chronos, @@ -21,12 +21,13 @@ import ../node/peer_manager/peer_manager, ./raw_bindings, ./common, - ./storage + ./storage_manager logScope: topics = "waku sync" -const DefaultSyncInterval = 60.minutes +const DefaultSyncInterval: timer.Duration = Hour +const DefaultFrameSize = 153600 type WakuSyncCallback* = proc(hashes: seq[WakuMessageHash], syncPeer: RemotePeerInfo) {. @@ -34,10 +35,10 @@ type .} WakuSync* = ref object of LPProtocol - storageMgr: StorageManager + storageMgr: WakuSyncStorageManager peerManager: PeerManager maxFrameSize: int # Not sure if this should be protocol defined or not... - syncInterval: Duration + syncInterval: timer.Duration callback: Option[WakuSyncCallback] periodicSyncFut: Future[void] @@ -47,8 +48,12 @@ proc ingessMessage*(self: WakuSync, pubsubTopic: PubsubTopic, msg: WakuMessage) let msgHash: WakuMessageHash = computeMessageHash(pubsubTopic, msg) trace "inserting message into storage ", hash = msgHash - let storage = storageMgr.retrieveStorage(msg.timestamp).valueOr: - error "failed to ingess message" + let storageOpt = self.storageMgr.retrieveStorage(msg.timestamp).valueOr: + error "failed to ingess message as could not retrieve storage" + return + let storage = storageOpt.valueOr: + error "failed to ingess message as could not retrieve storage" + return if storage.insert(msg.timestamp, msgHash).isErr(): debug "failed to insert message ", hash = msgHash.toHex() @@ -56,8 +61,11 @@ proc request( self: WakuSync, conn: Connection ): Future[Result[seq[WakuMessageHash], string]] {.async, gcsafe.} = #Use latest storage to sync??, Need to rethink - let storage = self.storageMgr.getRecentStorage().valueOr: + let storageOpt = self.storageMgr.retrieveStorage(times.getTime().toUnix()).valueOr: return err(error) + let storage = storageOpt.valueOr: + error "failed to handle request as could not retrieve recent storage" + return let negentropy = Negentropy.new(storage, self.maxFrameSize).valueOr: return err(error) @@ -139,7 +147,11 @@ proc initProtocolHandler(self: WakuSync) = proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} = debug "Server sync session requested", remotePeer = $conn.peerId #TODO: Find matching storage based on sync range and continue?? - let storage = self.storageMgr.getRecentStorage().valueOr: + #TODO: Return error rather than closing stream abruptly? + let storageOpt = self.storageMgr.getRecentStorage().valueOr: + error "could not find latest storage" + return + let storage = storageOpt.valueOr: error "could not find latest storage" return let negentropy = Negentropy.new(storage, self.maxFrameSize).valueOr: @@ -181,13 +193,13 @@ proc new*( T: type WakuSync, peerManager: PeerManager, maxFrameSize: int = DefaultFrameSize, - syncInterval: Duration = DefaultSyncInterval, + syncInterval: timer.Duration = DefaultSyncInterval, callback: Option[WakuSyncCallback] = none(WakuSyncCallback), ): T = - let storageMgr = StorageManager() + let storageMgr = WakuSyncStorageManager.new() let sync = WakuSync( - storage: storage, + storageMgr: storageMgr, peerManager: peerManager, maxFrameSize: maxFrameSize, syncInterval: syncInterval, @@ -215,11 +227,13 @@ proc periodicSync(self: WakuSync) {.async.} = proc start*(self: WakuSync) = self.started = true - if self.syncInterval > 0.seconds: # start periodic-sync only if interval is set. + if self.syncInterval > ZeroDuration: + # start periodic-sync only if interval is set. self.periodicSyncFut = self.periodicSync() proc stopWait*(self: WakuSync) {.async.} = await self.periodicSyncFut.cancelAndWait() -proc storageSize*(self: WakuSync): int = - return self.storage.size() +#[ TODO:Fetch from storageManager?? + proc storageSize*(self: WakuSync): int = + return self.storage.size() ]# diff --git a/waku/waku_sync/raw_bindings.nim b/waku/waku_sync/raw_bindings.nim index 7e505df930..28d1006a44 100644 --- a/waku/waku_sync/raw_bindings.nim +++ b/waku/waku_sync/raw_bindings.nim @@ -5,7 +5,7 @@ else: from os import DirSep -import std/[strutils], chronicles, std/options, stew/[results, byteutils], confutils +import std/[strutils], chronicles, std/options, stew/results, confutils import ../waku_core/message const negentropyPath = diff --git a/waku/waku_sync/storage.nim b/waku/waku_sync/storage.nim deleted file mode 100644 index a70510cad8..0000000000 --- a/waku/waku_sync/storage.nim +++ /dev/null @@ -1,59 +0,0 @@ -when (NimMajor, NimMinor) < (1, 4): - {.push raises: [Defect].} -else: - {.push raises: [].} - -import std/[times, tables], chronicles, chronos, stew/results - -import ./raw_bindings - -logScope: - topics = "waku sync" - -type WakuSyncStorageManager* = object - storages: OrderedTable[string, Storage] # Map of dateTime and Storage objects. DateTime is of the format YYYYMMDDHH - maxHours: times.Duration - -proc new*(T: type WakuSyncStorageManager, hoursToStore: times.Duration = initDuration(minutes=120)): T = - return WakuSyncStorageManager(maxHours:hoursToStore.inHours) - -#Time should be in YYYYMMDDHH format. -proc createStorage( - self: WakuSyncStorageManager, time: string -): Result[Storage, string] = - let storage:Storage = Storage.new().valueOr: - error "storage creation failed" - return err("storage creation failed") - self.storages[time] = storage - return storage - -proc getRecentStorage(): Result[Storage, string]= - - -#Time should be in YYYYMMDDHH format. -proc getStorage(self: WakuSyncStorageManager, time: string): Result[Storage, string] = - if self.storages.hasKey(time): - return self.storages[time] - return err("no storage found for given dateTime") - -proc retrieveStorage*( - self: WakuSyncStorageManager, time: int64 -): Result[Storage, string] = - let unixf = times.fromUnixFloat(float(time)) - - let dateTime = times.format(unixf, "YYYYMMDDHH") - let storage = self.getStorage(dateTime).valueOr: - #create a new storage - # TODO: May need synchronization?? - # Limit number of storages to configured duration - if initDuration(hours = self.storages.len()) == self.maxHours: - #TODO: Need to delete oldest storage at this point, but what if that is being synced? - error "number of storages reached, need to purge oldest" - return err("number of storages reached, need to purge oldest") - return self.createStorage(dateTime) - - return storage - -proc deleteStorage*(time:string)= - if self.storages.hasKey(time): - self.storages.del(time) diff --git a/waku/waku_sync/storage_manager.nim b/waku/waku_sync/storage_manager.nim new file mode 100644 index 0000000000..a71b840e21 --- /dev/null +++ b/waku/waku_sync/storage_manager.nim @@ -0,0 +1,73 @@ +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import std/[times, tables, options], chronicles, chronos, stew/results + +import ./raw_bindings + +logScope: + topics = "waku sync" + +type WakuSyncStorageManager* = ref object + storages: OrderedTable[string, Storage] + # Map of dateTime and Storage objects. DateTime is of the format YYYYMMDDHH + maxHours: int64 + +proc new*( + T: type WakuSyncStorageManager, + hoursToStore: times.Duration = initDuration(minutes = 120), +): T = + return WakuSyncStorageManager(maxHours: hoursToStore.inHours) + +proc getRecentStorage*(self: WakuSyncStorageManager): Result[Option[Storage], string] = + if self.storages.len() == 0: + return ok(none(Storage)) + var storageToFetch: Storage + #is there a more effective way to fetch last element? + for k, storage in self.storages: + storageToFetch = storage + + return ok(some(storageToFetch)) + +proc deleteStorage*(self: WakuSyncStorageManager, time: string) = + var storageToDelete: Storage + + if self.storages.pop(time, storageToDelete): + delete(storageToDelete) + +proc deleteOldestStorage*(self: WakuSyncStorageManager) = + var storageToDelete: Storage + var time: string + #is there a more effective way to fetch first element? + for k, storage in self.storages: + storageToDelete = storage + time = k + break + + if self.storages.pop(time, storageToDelete): + delete(storageToDelete) + +proc retrieveStorage*( + self: WakuSyncStorageManager, time: int64 +): Result[Option[Storage], string] = + let unixf = times.fromUnixFloat(float(time)) + + let dateTime = times.format(unixf, "yyyyMMddHH") + var storage: Storage = self.storages.getOrDefault(dateTime) + if storage == nil: + #create a new storage + # TODO: May need synchronization?? + # Limit number of storages to configured duration + let hours = self.storages.len() + if hours == self.maxHours: + #Need to delete oldest storage at this point, but what if that is being synced? + self.deleteOldestStorage() + info "number of storages reached, deleting the oldest" + storage = Storage.new().valueOr: + error "storage creation failed" + return err(error) + self.storages[dateTime] = storage + + return ok(some(storage)) From 068a628b2ea2ddac85f9671e25c4becb24680413 Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Fri, 29 Mar 2024 13:52:40 +0530 Subject: [PATCH 03/10] chore: fixing test in progress --- tests/waku_sync/test_protocol.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/waku_sync/test_protocol.nim b/tests/waku_sync/test_protocol.nim index 8e040e055e..ac38df6c7e 100644 --- a/tests/waku_sync/test_protocol.nim +++ b/tests/waku_sync/test_protocol.nim @@ -260,7 +260,7 @@ suite "Waku Sync": client5.ingessMessage(DefaultPubsubTopic, msg) server.ingessMessage(DefaultPubsubTopic, msg) i = i + 1 - info "client2 storage size", size = client2.storageSize() + #info "client2 storage size", size = client2.storageSize() var timeBefore = cpuTime() let hashes1 = await client.sync(serverPeerInfo) From 8953744a4bbae1079194edfb07f00125d1dafcbf Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Mon, 1 Apr 2024 16:11:00 +0530 Subject: [PATCH 04/10] chore: in progress time based storage creation --- tests/waku_sync/test_protocol.nim | 21 +++++++++++++++++++++ waku/waku_core/time.nim | 12 ++++++++++++ waku/waku_sync/protocol.nim | 9 ++++++--- waku/waku_sync/storage_manager.nim | 10 ++++++---- 4 files changed, 45 insertions(+), 7 deletions(-) diff --git a/tests/waku_sync/test_protocol.nim b/tests/waku_sync/test_protocol.nim index ac38df6c7e..2c24fe84ca 100644 --- a/tests/waku_sync/test_protocol.nim +++ b/tests/waku_sync/test_protocol.nim @@ -118,6 +118,27 @@ suite "Waku Sync": check: hashes.value.len == 0 + #[ asyncTest "sync 2 nodes duplicate hashes": + let msg1 = fakeWakuMessage(contentTopic = DefaultContentTopic) + let msg2 = fakeWakuMessage(contentTopic = DefaultContentTopic) + + server.ingessMessage(DefaultPubsubTopic, msg1) + server.ingessMessage(DefaultPubsubTopic, msg1) + client.ingessMessage(DefaultPubsubTopic, msg1) + server.ingessMessage(DefaultPubsubTopic, msg2) + + var hashes = await client.sync(serverPeerInfo) + require (hashes.isOk()) + check: + hashes.value.len == 1 + #hashes.value[0] == computeMessageHash(pubsubTopic = DefaultPubsubTopic, msg2) + #Assuming message is fetched from peer + client.ingessMessage(DefaultPubsubTopic, msg2) + sleep(1000) + hashes = await client.sync(serverPeerInfo) + require (hashes.isOk()) + check: + hashes.value.len == 0 ]# asyncTest "sync 2 nodes same hashes": let msg1 = fakeWakuMessage(contentTopic = DefaultContentTopic) let msg2 = fakeWakuMessage(contentTopic = DefaultContentTopic) diff --git a/waku/waku_core/time.nim b/waku/waku_core/time.nim index b1396015c4..ccf20ecd54 100644 --- a/waku/waku_core/time.nim +++ b/waku/waku_core/time.nim @@ -32,3 +32,15 @@ template nanosecondTime*(collector: Gauge, body: untyped) = metrics.set(collector, nowInUnixFloat() - start) else: body + +proc timestampInSeconds*(time: Timestamp): Timestamp = + let timeStr = $time + var timestamp: Timestamp + + if timeStr.len() > 16: + timestamp = Timestamp(time div Timestamp(1_000_000_000)) + elif timeStr.len() < 16 and timeStr.len() > 13: + timestamp = Timestamp(time div Timestamp(1_000_000)) + elif timeStr.len() > 10: + timestamp = Timestamp(time div Timestamp(1000)) + return timestamp diff --git a/waku/waku_sync/protocol.nim b/waku/waku_sync/protocol.nim index a7eeadd5f6..2c9bae632a 100644 --- a/waku/waku_sync/protocol.nim +++ b/waku/waku_sync/protocol.nim @@ -45,15 +45,18 @@ type proc ingessMessage*(self: WakuSync, pubsubTopic: PubsubTopic, msg: WakuMessage) = if msg.ephemeral: return - + #TODO: Do we need to check if this message has already been ingessed? + # because what if messages is received via gossip and sync as well? + # Might 2 entries to be inserted into storage which is inefficient. let msgHash: WakuMessageHash = computeMessageHash(pubsubTopic, msg) - trace "inserting message into storage ", hash = msgHash let storageOpt = self.storageMgr.retrieveStorage(msg.timestamp).valueOr: error "failed to ingess message as could not retrieve storage" return let storage = storageOpt.valueOr: error "failed to ingess message as could not retrieve storage" return + info "inserting message into storage ", hash = msgHash, timestamp = msg.timestamp + if storage.insert(msg.timestamp, msgHash).isErr(): debug "failed to insert message ", hash = msgHash.toHex() @@ -148,7 +151,7 @@ proc initProtocolHandler(self: WakuSync) = debug "Server sync session requested", remotePeer = $conn.peerId #TODO: Find matching storage based on sync range and continue?? #TODO: Return error rather than closing stream abruptly? - let storageOpt = self.storageMgr.getRecentStorage().valueOr: + let storageOpt = self.storageMgr.retrieveStorage(getNowInNanosecondTime()).valueOr: error "could not find latest storage" return let storage = storageOpt.valueOr: diff --git a/waku/waku_sync/storage_manager.nim b/waku/waku_sync/storage_manager.nim index a71b840e21..03010dbfcc 100644 --- a/waku/waku_sync/storage_manager.nim +++ b/waku/waku_sync/storage_manager.nim @@ -5,7 +5,7 @@ else: import std/[times, tables, options], chronicles, chronos, stew/results -import ./raw_bindings +import ./raw_bindings, ../waku_core/time logScope: topics = "waku sync" @@ -50,11 +50,12 @@ proc deleteOldestStorage*(self: WakuSyncStorageManager) = delete(storageToDelete) proc retrieveStorage*( - self: WakuSyncStorageManager, time: int64 + self: WakuSyncStorageManager, time: Timestamp ): Result[Option[Storage], string] = - let unixf = times.fromUnixFloat(float(time)) + var timestamp: Timestamp = timestampInSeconds(time) + let tsTime = times.fromUnix(timestamp) + let dateTime = times.format(tsTime, "yyyyMMddHH", utc()) - let dateTime = times.format(unixf, "yyyyMMddHH") var storage: Storage = self.storages.getOrDefault(dateTime) if storage == nil: #create a new storage @@ -65,6 +66,7 @@ proc retrieveStorage*( #Need to delete oldest storage at this point, but what if that is being synced? self.deleteOldestStorage() info "number of storages reached, deleting the oldest" + info "creating a new storage for ", time = dateTime storage = Storage.new().valueOr: error "storage creation failed" return err(error) From 84e3923d0a6eac59814e8b37a816a4d11cf054b5 Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Mon, 1 Apr 2024 16:25:59 +0530 Subject: [PATCH 05/10] fix: handle server not having storage --- waku/waku_core/time.nim | 2 +- waku/waku_sync/protocol.nim | 2 +- waku/waku_sync/storage_manager.nim | 7 ++++++- 3 files changed, 8 insertions(+), 3 deletions(-) diff --git a/waku/waku_core/time.nim b/waku/waku_core/time.nim index ccf20ecd54..5bbd5bcdc3 100644 --- a/waku/waku_core/time.nim +++ b/waku/waku_core/time.nim @@ -35,7 +35,7 @@ template nanosecondTime*(collector: Gauge, body: untyped) = proc timestampInSeconds*(time: Timestamp): Timestamp = let timeStr = $time - var timestamp: Timestamp + var timestamp: Timestamp = time if timeStr.len() > 16: timestamp = Timestamp(time div Timestamp(1_000_000_000)) diff --git a/waku/waku_sync/protocol.nim b/waku/waku_sync/protocol.nim index 2c9bae632a..ef9b1e00f2 100644 --- a/waku/waku_sync/protocol.nim +++ b/waku/waku_sync/protocol.nim @@ -151,7 +151,7 @@ proc initProtocolHandler(self: WakuSync) = debug "Server sync session requested", remotePeer = $conn.peerId #TODO: Find matching storage based on sync range and continue?? #TODO: Return error rather than closing stream abruptly? - let storageOpt = self.storageMgr.retrieveStorage(getNowInNanosecondTime()).valueOr: + let storageOpt = self.storageMgr.retrieveStorage(0).valueOr: error "could not find latest storage" return let storage = storageOpt.valueOr: diff --git a/waku/waku_sync/storage_manager.nim b/waku/waku_sync/storage_manager.nim index 03010dbfcc..c0167b143a 100644 --- a/waku/waku_sync/storage_manager.nim +++ b/waku/waku_sync/storage_manager.nim @@ -52,7 +52,12 @@ proc deleteOldestStorage*(self: WakuSyncStorageManager) = proc retrieveStorage*( self: WakuSyncStorageManager, time: Timestamp ): Result[Option[Storage], string] = - var timestamp: Timestamp = timestampInSeconds(time) + var timestamp: Timestamp + if time == 0: + timestamp = timestampInSeconds(getNowInNanosecondTime()) + debug "timestamp not provided, using now to fetch storage", timestamp = timestamp + else: + timestamp = timestampInSeconds(time) let tsTime = times.fromUnix(timestamp) let dateTime = times.format(tsTime, "yyyyMMddHH", utc()) From e2247eee6be10c0f1c7c7522cf614f509146d3ba Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Tue, 2 Apr 2024 10:37:01 +0530 Subject: [PATCH 06/10] chore: remove unused methods --- tests/waku_sync/sync_utils.nim | 22 ++++++---------------- waku/waku_sync/storage_manager.nim | 6 ------ 2 files changed, 6 insertions(+), 22 deletions(-) diff --git a/tests/waku_sync/sync_utils.nim b/tests/waku_sync/sync_utils.nim index 64f31cb498..c0d88b5fb8 100644 --- a/tests/waku_sync/sync_utils.nim +++ b/tests/waku_sync/sync_utils.nim @@ -1,23 +1,13 @@ {.used.} -import - std/options, - chronos, - chronicles, - libp2p/crypto/crypto +import std/options, chronos, chronicles, libp2p/crypto/crypto import - ../../../waku/[ - node/peer_manager, - waku_core, - waku_sync, - ], - ../testlib/[ - common, - wakucore - ] + ../../../waku/[node/peer_manager, waku_core, waku_sync], ../testlib/[common, wakucore] -proc newTestWakuSync*(switch: Switch, handler: WakuSyncCallback): Future[WakuSync] {.async.} = +proc newTestWakuSync*( + switch: Switch, handler: WakuSyncCallback +): Future[WakuSync] {.async.} = const DefaultFrameSize = 153600 let peerManager = PeerManager.new(switch) @@ -27,4 +17,4 @@ proc newTestWakuSync*(switch: Switch, handler: WakuSyncCallback): Future[WakuSyn proto.start() switch.mount(proto) - return proto \ No newline at end of file + return proto diff --git a/waku/waku_sync/storage_manager.nim b/waku/waku_sync/storage_manager.nim index c0167b143a..64fea97842 100644 --- a/waku/waku_sync/storage_manager.nim +++ b/waku/waku_sync/storage_manager.nim @@ -31,12 +31,6 @@ proc getRecentStorage*(self: WakuSyncStorageManager): Result[Option[Storage], st return ok(some(storageToFetch)) -proc deleteStorage*(self: WakuSyncStorageManager, time: string) = - var storageToDelete: Storage - - if self.storages.pop(time, storageToDelete): - delete(storageToDelete) - proc deleteOldestStorage*(self: WakuSyncStorageManager) = var storageToDelete: Storage var time: string From ce3256e232c5e57bb3490e44a0695fb218363f28 Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Tue, 2 Apr 2024 10:41:22 +0530 Subject: [PATCH 07/10] fix: import for trace logging --- waku/waku_sync/raw_bindings.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/waku/waku_sync/raw_bindings.nim b/waku/waku_sync/raw_bindings.nim index 28d1006a44..7e505df930 100644 --- a/waku/waku_sync/raw_bindings.nim +++ b/waku/waku_sync/raw_bindings.nim @@ -5,7 +5,7 @@ else: from os import DirSep -import std/[strutils], chronicles, std/options, stew/results, confutils +import std/[strutils], chronicles, std/options, stew/[results, byteutils], confutils import ../waku_core/message const negentropyPath = From 4b8b7ecfda8ca9755eb2290360c7c0d1a9ff9d29 Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Wed, 3 Apr 2024 12:42:50 +0530 Subject: [PATCH 08/10] fix: sync exports --- waku/waku_sync.nim | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/waku/waku_sync.nim b/waku/waku_sync.nim index dc60abeaee..55e6f80eee 100644 --- a/waku/waku_sync.nim +++ b/waku/waku_sync.nim @@ -3,8 +3,6 @@ when (NimMajor, NimMinor) < (1, 4): else: {.push raises: [].} -import - ./waku_sync/protocol +import ./waku_sync/protocol, ./waku_sync/common -export - protocol +export common, protocol From 77fc6e9ab086ccfd2ed0db28e1ca6976b7558be4 Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Wed, 3 Apr 2024 21:21:12 +0530 Subject: [PATCH 09/10] fix: duplicate invocation of dial due to a bug in valueOr https://github.com/vacp2p/nim-libp2p/pull/1079 --- tests/waku_sync/test_protocol.nim | 2 +- waku/waku_sync/protocol.nim | 7 ++++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/tests/waku_sync/test_protocol.nim b/tests/waku_sync/test_protocol.nim index 2c24fe84ca..f92d193304 100644 --- a/tests/waku_sync/test_protocol.nim +++ b/tests/waku_sync/test_protocol.nim @@ -12,7 +12,7 @@ import from std/os import sleep import - ../../../waku/[ + ../../waku/[ common/paging, node/peer_manager, waku_core, diff --git a/waku/waku_sync/protocol.nim b/waku/waku_sync/protocol.nim index ef9b1e00f2..69a8dd9916 100644 --- a/waku/waku_sync/protocol.nim +++ b/waku/waku_sync/protocol.nim @@ -126,8 +126,8 @@ proc sync*( ): Future[Result[(seq[WakuMessageHash], RemotePeerInfo), string]] {.async, gcsafe.} = let peer: RemotePeerInfo = self.peerManager.selectPeer(WakuSyncCodec).valueOr: return err("No suitable peer found for sync") - - let conn: Connection = (await self.peerManager.dialPeer(peer, WakuSyncCodec)).valueOr: + let connOpt = await self.peerManager.dialPeer(peer, WakuSyncCodec) + let conn: Connection = connOpt.valueOr: return err("Cannot establish sync connection") let hashes: seq[WakuMessageHash] = (await self.request(conn)).valueOr: @@ -138,7 +138,8 @@ proc sync*( proc sync*( self: WakuSync, peer: RemotePeerInfo ): Future[Result[seq[WakuMessageHash], string]] {.async, gcsafe.} = - let conn: Connection = (await self.peerManager.dialPeer(peer, WakuSyncCodec)).valueOr: + let connOpt = await self.peerManager.dialPeer(peer, WakuSyncCodec) + let conn: Connection = connOpt.valueOr: return err("Cannot establish sync connection") let hashes: seq[WakuMessageHash] = (await self.request(conn)).valueOr: From 08d908741fffe9b366c25f8328c59017cfaf0d93 Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Thu, 4 Apr 2024 13:34:31 +0530 Subject: [PATCH 10/10] chore: refactor and introduce sync session --- waku/waku_sync/protocol.nim | 112 +++++----------------------- waku/waku_sync/session.nim | 142 ++++++++++++++++++++++++++++++++++++ 2 files changed, 161 insertions(+), 93 deletions(-) create mode 100644 waku/waku_sync/session.nim diff --git a/waku/waku_sync/protocol.nim b/waku/waku_sync/protocol.nim index 69a8dd9916..de46e329e0 100644 --- a/waku/waku_sync/protocol.nim +++ b/waku/waku_sync/protocol.nim @@ -21,6 +21,7 @@ import ../node/peer_manager/peer_manager, ./raw_bindings, ./common, + ./session, ./storage_manager logScope: @@ -63,63 +64,16 @@ proc ingessMessage*(self: WakuSync, pubsubTopic: PubsubTopic, msg: WakuMessage) proc request( self: WakuSync, conn: Connection ): Future[Result[seq[WakuMessageHash], string]] {.async, gcsafe.} = - #Use latest storage to sync??, Need to rethink - let storageOpt = self.storageMgr.retrieveStorage(times.getTime().toUnix()).valueOr: - return err(error) - let storage = storageOpt.valueOr: - error "failed to handle request as could not retrieve recent storage" - return - let negentropy = Negentropy.new(storage, self.maxFrameSize).valueOr: - return err(error) - - defer: - negentropy.delete() - - let payload = negentropy.initiate().valueOr: + let syncSession = SyncSession( + sessType: SyncSessionType.CLIENT, + curState: SyncSessionState.INIT, + frameSize: DefaultFrameSize, + rangeStart: 0, #TODO: Pass start of this hour?? + rangeEnd: times.getTime().toUnix(), + ) + let hashes = (await syncSession.HandleClientSession(conn, self.storageMgr)).valueOr: return err(error) - - debug "Client sync session initialized", remotePeer = conn.peerId - - let writeRes = catch: - await conn.writeLP(seq[byte](payload)) - - trace "request sent to server", payload = toHex(seq[byte](payload)) - - if writeRes.isErr(): - return err(writeRes.error.msg) - - var - haveHashes: seq[WakuMessageHash] # What to do with haves ??? - needHashes: seq[WakuMessageHash] - - while true: - let readRes = catch: - await conn.readLp(self.maxFrameSize) - - let buffer: seq[byte] = readRes.valueOr: - return err(error.msg) - - trace "Received Sync request from peer", payload = toHex(buffer) - - let request = NegentropyPayload(buffer) - - let responseOpt = negentropy.clientReconcile(request, haveHashes, needHashes).valueOr: - return err(error) - - let response = responseOpt.valueOr: - debug "Closing connection, client sync session is done" - await conn.close() - break - - trace "Sending Sync response to peer", payload = toHex(seq[byte](response)) - - let writeRes = catch: - await conn.writeLP(seq[byte](response)) - - if writeRes.isErr(): - return err(writeRes.error.msg) - - return ok(needHashes) + return ok(hashes) proc sync*( self: WakuSync @@ -149,44 +103,16 @@ proc sync*( proc initProtocolHandler(self: WakuSync) = proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} = + let syncSession = SyncSession( + sessType: SyncSessionType.SERVER, + curState: SyncSessionState.INIT, + frameSize: DefaultFrameSize, + rangeStart: 0, #TODO: Pass start of this hour?? + rangeEnd: 0, + ) debug "Server sync session requested", remotePeer = $conn.peerId - #TODO: Find matching storage based on sync range and continue?? - #TODO: Return error rather than closing stream abruptly? - let storageOpt = self.storageMgr.retrieveStorage(0).valueOr: - error "could not find latest storage" - return - let storage = storageOpt.valueOr: - error "could not find latest storage" - return - let negentropy = Negentropy.new(storage, self.maxFrameSize).valueOr: - error "Negentropy initialization error", error = error - return - - defer: - negentropy.delete() - - while not conn.isClosed: - let requestRes = catch: - await conn.readLp(self.maxFrameSize) - - let buffer = requestRes.valueOr: - if error.name != $LPStreamRemoteClosedError or error.name != $LPStreamClosedError: - debug "Connection reading error", error = error.msg - - break - - #TODO: Once we receive needHashes or endOfSync, we should close this stream. - let request = NegentropyPayload(buffer) - - let response = negentropy.serverReconcile(request).valueOr: - error "Reconciliation error", error = error - break - - let writeRes = catch: - await conn.writeLP(seq[byte](response)) - if writeRes.isErr(): - error "Connection write error", error = writeRes.error.msg - break + + await syncSession.HandleServerSession(conn, self.storageMgr) debug "Server sync session ended" diff --git a/waku/waku_sync/session.nim b/waku/waku_sync/session.nim new file mode 100644 index 0000000000..5509832340 --- /dev/null +++ b/waku/waku_sync/session.nim @@ -0,0 +1,142 @@ +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import std/options, stew/results, chronicles, chronos, libp2p/stream/connection + +import ../common/nimchronos, ../waku_core, ./raw_bindings, ./storage_manager + +logScope: + topics = "waku sync" + +type SyncSessionType* = enum + CLIENT = 1 + SERVER = 2 + +type SyncSessionState* = enum + INIT = 1 + NEGENTROPY_SYNC = 2 + COMPLETE = 3 + +type SyncSession* = ref object + sessType*: SyncSessionType + curState*: SyncSessionState + frameSize*: int + rangeStart*: int64 + rangeEnd*: int64 + negentropy*: Negentropy + +#[ + Session State Machine + 1. negotiate sync params + 2. start negentropy sync + 3. find out local needhashes + 4. If client, share peer's needhashes to peer +]# + +proc initializeNegentropy( + self: SyncSession, storageMgr: WakuSyncStorageManager +): Result[void, string] = + #Use latest storage to sync??, Need to rethink + #Is this the best approach?? Maybe need to improve this. + let storageOpt = storageMgr.retrieveStorage(self.rangeEnd).valueOr: + return err(error) + let storage = storageOpt.valueOr: + error "failed to handle request as could not retrieve recent storage" + return + let negentropy = Negentropy.new(storage, self.frameSize).valueOr: + return err(error) + + self.negentropy = negentropy + + return ok() + +proc HandleClientSession*( + self: SyncSession, conn: Connection, storageMgr: WakuSyncStorageManager +): Future[Result[seq[WakuMessageHash], string]] {.async, gcsafe.} = + if self.initializeNegentropy(storageMgr).isErr(): + return + defer: + self.negentropy.delete() + + let payload = self.negentropy.initiate().valueOr: + return err(error) + debug "Client sync session initialized", remotePeer = conn.peerId + + let writeRes = catch: + await conn.writeLP(seq[byte](payload)) + + trace "request sent to server", payload = toHex(seq[byte](payload)) + + if writeRes.isErr(): + return err(writeRes.error.msg) + + var + haveHashes: seq[WakuMessageHash] # Send it across to Server at the end of sync + needHashes: seq[WakuMessageHash] + + while true: + let readRes = catch: + await conn.readLp(self.frameSize) + + let buffer: seq[byte] = readRes.valueOr: + return err(error.msg) + + trace "Received Sync request from peer", payload = toHex(buffer) + + let request = NegentropyPayload(buffer) + + let responseOpt = self.negentropy.clientReconcile(request, haveHashes, needHashes).valueOr: + return err(error) + + let response = responseOpt.valueOr: + debug "Closing connection, client sync session is done" + await conn.close() + break + + trace "Sending Sync response to peer", payload = toHex(seq[byte](response)) + + let writeRes = catch: + await conn.writeLP(seq[byte](response)) + + if writeRes.isErr(): + return err(writeRes.error.msg) + + return ok(needHashes) + +proc HandleServerSession*( + self: SyncSession, conn: Connection, storageMgr: WakuSyncStorageManager +) {.async, gcsafe.} = + #TODO: Find matching storage based on sync range and continue?? + #TODO: Return error rather than closing stream abruptly? + if self.initializeNegentropy(storageMgr).isErr(): + return + defer: + self.negentropy.delete() + + while not conn.isClosed: + let requestRes = catch: + await conn.readLp(self.frameSize) + + let buffer = requestRes.valueOr: + if error.name != $LPStreamRemoteClosedError or error.name != $LPStreamClosedError: + debug "Connection reading error", error = error.msg + + break + + #TODO: Once we receive needHashes or endOfSync, we should close this stream. + let request = NegentropyPayload(buffer) + + let response = self.negentropy.serverReconcile(request).valueOr: + error "Reconciliation error", error = error + break + + let writeRes = catch: + await conn.writeLP(seq[byte](response)) + + if writeRes.isErr(): + error "Connection write error", error = writeRes.error.msg + break + + return