diff --git a/tests/waku_sync/sync_utils.nim b/tests/waku_sync/sync_utils.nim
index 64f31cb498..c0d88b5fb8 100644
--- a/tests/waku_sync/sync_utils.nim
+++ b/tests/waku_sync/sync_utils.nim
@@ -1,23 +1,13 @@
 {.used.}
 
-import
-  std/options,
-  chronos,
-  chronicles,
-  libp2p/crypto/crypto
+import std/options, chronos, chronicles, libp2p/crypto/crypto
 
 import
-  ../../../waku/[
-    node/peer_manager,
-    waku_core,
-    waku_sync,
-  ],
-  ../testlib/[
-    common,
-    wakucore
-  ]
+  ../../../waku/[node/peer_manager, waku_core, waku_sync], ../testlib/[common, wakucore]
 
-proc newTestWakuSync*(switch: Switch, handler: WakuSyncCallback): Future[WakuSync] {.async.} =
+proc newTestWakuSync*(
+    switch: Switch, handler: WakuSyncCallback
+): Future[WakuSync] {.async.} =
   const DefaultFrameSize = 153600
 
   let peerManager = PeerManager.new(switch)
@@ -27,4 +17,4 @@ proc newTestWakuSync*(switch: Switch, handler: WakuSyncCallback): Future[WakuSyn
   proto.start()
   switch.mount(proto)
 
-  return proto
\ No newline at end of file
+  return proto
diff --git a/tests/waku_sync/test_protocol.nim b/tests/waku_sync/test_protocol.nim
index 8e040e055e..f92d193304 100644
--- a/tests/waku_sync/test_protocol.nim
+++ b/tests/waku_sync/test_protocol.nim
@@ -12,7 +12,7 @@ import
 from std/os import sleep
 
 import
-  ../../../waku/[
+  ../../waku/[
     common/paging,
     node/peer_manager,
     waku_core,
@@ -118,6 +118,27 @@ suite "Waku Sync":
     check:
       hashes.value.len == 0
 
+  #[ asyncTest "sync 2 nodes duplicate hashes":
+    let msg1 = fakeWakuMessage(contentTopic = DefaultContentTopic)
+    let msg2 = fakeWakuMessage(contentTopic = DefaultContentTopic)
+
+    server.ingessMessage(DefaultPubsubTopic, msg1)
+    server.ingessMessage(DefaultPubsubTopic, msg1)
+    client.ingessMessage(DefaultPubsubTopic, msg1)
+    server.ingessMessage(DefaultPubsubTopic, msg2)
+
+    var hashes = await client.sync(serverPeerInfo)
+    require (hashes.isOk())
+    check:
+      hashes.value.len == 1
+      #hashes.value[0] == computeMessageHash(pubsubTopic = DefaultPubsubTopic, msg2)
+    #Assuming message is fetched from peer
+    client.ingessMessage(DefaultPubsubTopic, msg2)
+    sleep(1000)
+    hashes = await client.sync(serverPeerInfo)
+    require (hashes.isOk())
+    check:
+      hashes.value.len == 0 ]#
   asyncTest "sync 2 nodes same hashes":
     let msg1 = fakeWakuMessage(contentTopic = DefaultContentTopic)
     let msg2 = fakeWakuMessage(contentTopic = DefaultContentTopic)
@@ -260,7 +281,7 @@ suite "Waku Sync":
       client5.ingessMessage(DefaultPubsubTopic, msg)
       server.ingessMessage(DefaultPubsubTopic, msg)
       i = i + 1
-    info "client2 storage size", size = client2.storageSize()
+    #info "client2 storage size", size = client2.storageSize()
 
     var timeBefore = cpuTime()
     let hashes1 = await client.sync(serverPeerInfo)
diff --git a/waku/waku_core/time.nim b/waku/waku_core/time.nim
index b1396015c4..5bbd5bcdc3 100644
--- a/waku/waku_core/time.nim
+++ b/waku/waku_core/time.nim
@@ -32,3 +32,15 @@ template nanosecondTime*(collector: Gauge, body: untyped) =
     metrics.set(collector, nowInUnixFloat() - start)
   else:
     body
+
+proc timestampInSeconds*(time: Timestamp): Timestamp =
+  let timeStr = $time
+  var timestamp: Timestamp = time
+
+  if timeStr.len() > 16: # nanoseconds
+    timestamp = Timestamp(time div Timestamp(1_000_000_000))
+  elif timeStr.len() > 13: # microseconds
+    timestamp = Timestamp(time div Timestamp(1_000_000))
+  elif timeStr.len() > 10: # milliseconds
+    timestamp = Timestamp(time div Timestamp(1000))
+  return timestamp
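Note on timestampInSeconds above: the unit of an incoming Unix timestamp is inferred from its decimal digit count (for present-day dates: 10 digits = seconds, 13 = milliseconds, 14-16 = microseconds, 17+ = nanoseconds) and the value is normalized to seconds. The middle branch is `> 13` rather than `< 16 and > 13` so that 16-digit microsecond values do not fall through to the millisecond branch. A quick sanity sketch of the intended behaviour, not part of the patch; the literal values are illustrative:

    import ../../waku/waku_core/time # path as seen from the tests tree

    # All four represent 2023-11-14T22:13:20 UTC at different resolutions.
    assert timestampInSeconds(Timestamp(1_700_000_000)) == Timestamp(1_700_000_000) # s
    assert timestampInSeconds(Timestamp(1_700_000_000_000)) == Timestamp(1_700_000_000) # ms
    assert timestampInSeconds(Timestamp(1_700_000_000_000_000)) == Timestamp(1_700_000_000) # us
    assert timestampInSeconds(Timestamp(1_700_000_000_000_000_000)) == Timestamp(1_700_000_000) # ns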
diff --git a/waku/waku_sync.nim b/waku/waku_sync.nim
index dc60abeaee..55e6f80eee 100644
--- a/waku/waku_sync.nim
+++ b/waku/waku_sync.nim
@@ -3,8 +3,6 @@ when (NimMajor, NimMinor) < (1, 4):
 else:
   {.push raises: [].}
 
-import
-  ./waku_sync/protocol
+import ./waku_sync/protocol, ./waku_sync/common
 
-export
-  protocol
+export common, protocol
diff --git a/waku/waku_sync/protocol.nim b/waku/waku_sync/protocol.nim
index 45771cb2e9..de46e329e0 100644
--- a/waku/waku_sync/protocol.nim
+++ b/waku/waku_sync/protocol.nim
@@ -4,7 +4,7 @@ else:
   {.push raises: [].}
 
 import
-  std/options,
+  std/[options, times],
   stew/results,
   chronicles,
   chronos,
@@ -20,12 +20,15 @@ import
   ../waku_enr,
   ../node/peer_manager/peer_manager,
   ./raw_bindings,
-  ./common
+  ./common,
+  ./session,
+  ./storage_manager
 
 logScope:
   topics = "waku sync"
 
-const DefaultSyncInterval = 60.minutes
+const DefaultSyncInterval: timer.Duration = Hour
+const DefaultFrameSize = 153600
 
 type
   WakuSyncCallback* = proc(hashes: seq[WakuMessageHash], syncPeer: RemotePeerInfo) {.
@@ -33,85 +36,52 @@ type
   .}
 
   WakuSync* = ref object of LPProtocol
-    storage: Storage
+    storageMgr: WakuSyncStorageManager
     peerManager: PeerManager
     maxFrameSize: int # Not sure if this should be protocol defined or not...
-    syncInterval: Duration
+    syncInterval: timer.Duration
     callback: Option[WakuSyncCallback]
     periodicSyncFut: Future[void]
 
 proc ingessMessage*(self: WakuSync, pubsubTopic: PubsubTopic, msg: WakuMessage) =
   if msg.ephemeral:
     return
-
+  #TODO: Do we need to check whether this message has already been ingessed?
+  # A message can arrive via both gossip and sync, which would insert
+  # two entries into storage and waste space.
   let msgHash: WakuMessageHash = computeMessageHash(pubsubTopic, msg)
-  trace "inserting message into storage ", hash = msgHash
+  let storageOpt = self.storageMgr.retrieveStorage(msg.timestamp).valueOr:
+    error "failed to ingess message: could not retrieve storage"
+    return
+  let storage = storageOpt.valueOr:
+    error "failed to ingess message: could not retrieve storage"
+    return
+  info "inserting message into storage ", hash = msgHash, timestamp = msg.timestamp
 
-  if self.storage.insert(msg.timestamp, msgHash).isErr():
+  if storage.insert(msg.timestamp, msgHash).isErr():
     debug "failed to insert message ", hash = msgHash.toHex()
 
 proc request(
     self: WakuSync, conn: Connection
 ): Future[Result[seq[WakuMessageHash], string]] {.async, gcsafe.} =
-  let negentropy = Negentropy.new(self.storage, self.maxFrameSize).valueOr:
-    return err(error)
-
-  defer:
-    negentropy.delete()
-
-  let payload = negentropy.initiate().valueOr:
+  let syncSession = SyncSession(
+    sessType: SyncSessionType.CLIENT,
+    curState: SyncSessionState.INIT,
+    frameSize: DefaultFrameSize,
+    rangeStart: 0, #TODO: Pass start of this hour??
+    rangeEnd: times.getTime().toUnix(),
+  )
+  let hashes = (await syncSession.HandleClientSession(conn, self.storageMgr)).valueOr:
     return err(error)
-
-  debug "Client sync session initialized", remotePeer = conn.peerId
-
-  let writeRes = catch:
-    await conn.writeLP(seq[byte](payload))
-
-  trace "request sent to server", payload = toHex(seq[byte](payload))
-
-  if writeRes.isErr():
-    return err(writeRes.error.msg)
-
-  var
-    haveHashes: seq[WakuMessageHash] # What to do with haves ???
-    needHashes: seq[WakuMessageHash]
-
-  while true:
-    let readRes = catch:
-      await conn.readLp(self.maxFrameSize)
-
-    let buffer: seq[byte] = readRes.valueOr:
-      return err(error.msg)
-
-    trace "Received Sync request from peer", payload = toHex(buffer)
-
-    let request = NegentropyPayload(buffer)
-
-    let responseOpt = negentropy.clientReconcile(request, haveHashes, needHashes).valueOr:
-      return err(error)
-
-    let response = responseOpt.valueOr:
-      debug "Closing connection, client sync session is done"
-      await conn.close()
-      break
-
-    trace "Sending Sync response to peer", payload = toHex(seq[byte](response))
-
-    let writeRes = catch:
-      await conn.writeLP(seq[byte](response))
-
-    if writeRes.isErr():
-      return err(writeRes.error.msg)
-
-  return ok(needHashes)
+  return ok(hashes)
 
 proc sync*(
     self: WakuSync
 ): Future[Result[(seq[WakuMessageHash], RemotePeerInfo), string]] {.async, gcsafe.} =
   let peer: RemotePeerInfo = self.peerManager.selectPeer(WakuSyncCodec).valueOr:
     return err("No suitable peer found for sync")
-
-  let conn: Connection = (await self.peerManager.dialPeer(peer, WakuSyncCodec)).valueOr:
+  let connOpt = await self.peerManager.dialPeer(peer, WakuSyncCodec)
+  let conn: Connection = connOpt.valueOr:
     return err("Cannot establish sync connection")
 
   let hashes: seq[WakuMessageHash] = (await self.request(conn)).valueOr:
@@ -122,7 +92,8 @@ proc sync*(
 proc sync*(
     self: WakuSync, peer: RemotePeerInfo
 ): Future[Result[seq[WakuMessageHash], string]] {.async, gcsafe.} =
-  let conn: Connection = (await self.peerManager.dialPeer(peer, WakuSyncCodec)).valueOr:
+  let connOpt = await self.peerManager.dialPeer(peer, WakuSyncCodec)
+  let conn: Connection = connOpt.valueOr:
     return err("Cannot establish sync connection")
 
   let hashes: seq[WakuMessageHash] = (await self.request(conn)).valueOr:
@@ -132,37 +103,16 @@ proc sync*(
 
 proc initProtocolHandler(self: WakuSync) =
   proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} =
+    let syncSession = SyncSession(
+      sessType: SyncSessionType.SERVER,
+      curState: SyncSessionState.INIT,
+      frameSize: DefaultFrameSize,
+      rangeStart: 0, #TODO: Pass start of this hour??
+      rangeEnd: 0,
+    )
     debug "Server sync session requested", remotePeer = $conn.peerId
 
-    let negentropy = Negentropy.new(self.storage, self.maxFrameSize).valueOr:
-      error "Negentropy initialization error", error = error
-      return
-
-    defer:
-      negentropy.delete()
-
-    while not conn.isClosed:
-      let requestRes = catch:
-        await conn.readLp(self.maxFrameSize)
-
-      let buffer = requestRes.valueOr:
-        if error.name != $LPStreamRemoteClosedError or error.name != $LPStreamClosedError:
-          debug "Connection reading error", error = error.msg
-
-        break
-
-      #TODO: Once we receive needHashes or endOfSync, we should close this stream.
-      let request = NegentropyPayload(buffer)
-
-      let response = negentropy.serverReconcile(request).valueOr:
-        error "Reconciliation error", error = error
-        break
-
-      let writeRes = catch:
-        await conn.writeLP(seq[byte](response))
-      if writeRes.isErr():
-        error "Connection write error", error = writeRes.error.msg
-        break
+    await syncSession.HandleServerSession(conn, self.storageMgr)
 
     debug "Server sync session ended"
 
@@ -173,15 +123,13 @@ proc new*(
     T: type WakuSync,
     peerManager: PeerManager,
     maxFrameSize: int = DefaultFrameSize,
-    syncInterval: Duration = DefaultSyncInterval,
+    syncInterval: timer.Duration = DefaultSyncInterval,
     callback: Option[WakuSyncCallback] = none(WakuSyncCallback),
 ): T =
-  let storage = Storage.new().valueOr:
-    error "storage creation failed"
-    return nil
+  let storageMgr = WakuSyncStorageManager.new()
 
   let sync = WakuSync(
-    storage: storage,
+    storageMgr: storageMgr,
     peerManager: peerManager,
     maxFrameSize: maxFrameSize,
     syncInterval: syncInterval,
@@ -209,11 +157,13 @@ proc periodicSync(self: WakuSync) {.async.} =
 proc start*(self: WakuSync) =
   self.started = true
 
-  if self.syncInterval > 0.seconds: # start periodic-sync only if interval is set.
+  if self.syncInterval > ZeroDuration:
+    # start periodic-sync only if interval is set.
     self.periodicSyncFut = self.periodicSync()
 
 proc stopWait*(self: WakuSync) {.async.} =
   await self.periodicSyncFut.cancelAndWait()
 
-proc storageSize*(self: WakuSync): int =
-  return self.storage.size()
+#[ TODO: Fetch from storageManager??
+  proc storageSize*(self: WakuSync): int =
+    return self.storage.size() ]#
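For orientation, WakuSync.new keeps its public call shape after this change; only the internal storage wiring moved to WakuSyncStorageManager. A minimal, hedged usage sketch (assumes an existing libp2p `switch` and the imports used by tests/waku_sync/sync_utils.nim above):

    let peerManager = PeerManager.new(switch)
    # maxFrameSize, syncInterval and callback keep their defaults
    # (DefaultFrameSize, Hour, none(WakuSyncCallback)).
    let sync = WakuSync.new(peerManager)
    sync.start() # spawns periodicSync, since the default interval > ZeroDuration
    switch.mount(sync)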
diff --git a/waku/waku_sync/session.nim b/waku/waku_sync/session.nim
new file mode 100644
index 0000000000..5509832340
--- /dev/null
+++ b/waku/waku_sync/session.nim
@@ -0,0 +1,142 @@
+when (NimMajor, NimMinor) < (1, 4):
+  {.push raises: [Defect].}
+else:
+  {.push raises: [].}
+
+import std/options, stew/results, chronicles, chronos, libp2p/stream/connection
+
+import ../common/nimchronos, ../waku_core, ./raw_bindings, ./storage_manager
+
+logScope:
+  topics = "waku sync"
+
+type SyncSessionType* = enum
+  CLIENT = 1
+  SERVER = 2
+
+type SyncSessionState* = enum
+  INIT = 1
+  NEGENTROPY_SYNC = 2
+  COMPLETE = 3
+
+type SyncSession* = ref object
+  sessType*: SyncSessionType
+  curState*: SyncSessionState
+  frameSize*: int
+  rangeStart*: int64
+  rangeEnd*: int64
+  negentropy*: Negentropy
+
+#[
+  Session State Machine
+  1. Negotiate sync params
+  2. Start negentropy sync
+  3. Find out the local needHashes
+  4. If client, share the peer's needHashes with the peer
+]#
+
+proc initializeNegentropy(
+    self: SyncSession, storageMgr: WakuSyncStorageManager
+): Result[void, string] =
+  # Use the most recent storage to sync?? Needs a rethink:
+  # is this the best approach, or should it be improved?
+  let storageOpt = storageMgr.retrieveStorage(self.rangeEnd).valueOr:
+    return err(error)
+  let storage = storageOpt.valueOr:
+    error "failed to handle request: could not retrieve recent storage"
+    return err("could not retrieve recent storage")
+  let negentropy = Negentropy.new(storage, self.frameSize).valueOr:
+    return err(error)
+
+  self.negentropy = negentropy
+
+  return ok()
+
+proc HandleClientSession*(
+    self: SyncSession, conn: Connection, storageMgr: WakuSyncStorageManager
+): Future[Result[seq[WakuMessageHash], string]] {.async, gcsafe.} =
+  if self.initializeNegentropy(storageMgr).isErr():
+    return err("negentropy initialization failed")
+  defer:
+    self.negentropy.delete()
+
+  let payload = self.negentropy.initiate().valueOr:
+    return err(error)
+  debug "Client sync session initialized", remotePeer = conn.peerId
+
+  let writeRes = catch:
+    await conn.writeLP(seq[byte](payload))
+
+  trace "request sent to server", payload = toHex(seq[byte](payload))
+
+  if writeRes.isErr():
+    return err(writeRes.error.msg)
+
+  var
+    haveHashes: seq[WakuMessageHash] # Sent across to the server at the end of sync
+    needHashes: seq[WakuMessageHash]
+
+  while true:
+    let readRes = catch:
+      await conn.readLp(self.frameSize)
+
+    let buffer: seq[byte] = readRes.valueOr:
+      return err(error.msg)
+
+    trace "Received Sync request from peer", payload = toHex(buffer)
+
+    let request = NegentropyPayload(buffer)
+
+    let responseOpt = self.negentropy.clientReconcile(request, haveHashes, needHashes).valueOr:
+      return err(error)
+
+    let response = responseOpt.valueOr:
+      debug "Closing connection, client sync session is done"
+      await conn.close()
+      break
+
+    trace "Sending Sync response to peer", payload = toHex(seq[byte](response))
+
+    let writeRes = catch:
+      await conn.writeLP(seq[byte](response))
+
+    if writeRes.isErr():
+      return err(writeRes.error.msg)
+
+  return ok(needHashes)
+
+proc HandleServerSession*(
+    self: SyncSession, conn: Connection, storageMgr: WakuSyncStorageManager
+) {.async, gcsafe.} =
+  #TODO: Find matching storage based on sync range and continue??
+  #TODO: Return an error rather than closing the stream abruptly?
+  if self.initializeNegentropy(storageMgr).isErr():
+    return
+  defer:
+    self.negentropy.delete()
+
+  while not conn.isClosed:
+    let requestRes = catch:
+      await conn.readLp(self.frameSize)
+
+    let buffer = requestRes.valueOr:
+      if error.name != $LPStreamRemoteClosedError and error.name != $LPStreamClosedError:
+        debug "Connection reading error", error = error.msg
+
+      break
+
+    #TODO: Once we receive needHashes or endOfSync, we should close this stream.
+    let request = NegentropyPayload(buffer)
+
+    let response = self.negentropy.serverReconcile(request).valueOr:
+      error "Reconciliation error", error = error
+      break
+
+    let writeRes = catch:
+      await conn.writeLP(seq[byte](response))
+
+    if writeRes.isErr():
+      error "Connection write error", error = writeRes.error.msg
+      break
+
+  return
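The session module above is driven from protocol.nim as sketched below: the client builds a CLIENT session covering up to the current time, while the server passes rangeEnd = 0 so initializeNegentropy falls back to the storage bucket for the current hour. Condensed from the request proc, inside an async proc returning a Result; `conn` and `storageMgr` are assumed to exist:

    let clientSession = SyncSession(
      sessType: SyncSessionType.CLIENT,
      curState: SyncSessionState.INIT,
      frameSize: 153600, # DefaultFrameSize
      rangeStart: 0, # TODO in the patch: start of the current hour
      rangeEnd: times.getTime().toUnix(),
    )
    let needHashes = (await clientSession.HandleClientSession(conn, storageMgr)).valueOr:
      return err(error)
    # needHashes now holds the hashes the peer has and we are missing.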
diff --git a/waku/waku_sync/storage_manager.nim b/waku/waku_sync/storage_manager.nim
new file mode 100644
index 0000000000..64fea97842
--- /dev/null
+++ b/waku/waku_sync/storage_manager.nim
@@ -0,0 +1,74 @@
+when (NimMajor, NimMinor) < (1, 4):
+  {.push raises: [Defect].}
+else:
+  {.push raises: [].}
+
+import std/[times, tables, options], chronicles, chronos, stew/results
+
+import ./raw_bindings, ../waku_core/time
+
+logScope:
+  topics = "waku sync"
+
+type WakuSyncStorageManager* = ref object
+  storages: OrderedTable[string, Storage]
+    # Map of dateTime to Storage objects. DateTime is of the format yyyyMMddHH
+  maxHours: int64
+
+proc new*(
+    T: type WakuSyncStorageManager,
+    hoursToStore: times.Duration = initDuration(hours = 2),
+): T =
+  return WakuSyncStorageManager(maxHours: hoursToStore.inHours)
+
+proc getRecentStorage*(self: WakuSyncStorageManager): Result[Option[Storage], string] =
+  if self.storages.len() == 0:
+    return ok(none(Storage))
+  var storageToFetch: Storage
+  # Is there a more efficient way to fetch the last element?
+  for k, storage in self.storages:
+    storageToFetch = storage
+
+  return ok(some(storageToFetch))
+
+proc deleteOldestStorage*(self: WakuSyncStorageManager) =
+  var storageToDelete: Storage
+  var time: string
+  # Is there a more efficient way to fetch the first element?
+  for k, storage in self.storages:
+    storageToDelete = storage
+    time = k
+    break
+
+  if self.storages.pop(time, storageToDelete):
+    delete(storageToDelete)
+
+proc retrieveStorage*(
+    self: WakuSyncStorageManager, time: Timestamp
+): Result[Option[Storage], string] =
+  var timestamp: Timestamp
+  if time == 0:
+    timestamp = timestampInSeconds(getNowInNanosecondTime())
+    debug "timestamp not provided, using now to fetch storage", timestamp = timestamp
+  else:
+    timestamp = timestampInSeconds(time)
+  let tsTime = times.fromUnix(timestamp)
+  let dateTime = times.format(tsTime, "yyyyMMddHH", utc())
+
+  var storage: Storage = self.storages.getOrDefault(dateTime)
+  if storage == nil:
+    # Create a new storage.
+    # TODO: May need synchronization??
+    # Limit the number of storages to the configured duration.
+    let hours = self.storages.len()
+    if hours >= self.maxHours:
+      # Need to delete the oldest storage at this point, but what if it is being synced?
+      self.deleteOldestStorage()
+      info "max number of storages reached, deleted the oldest"
+    info "creating a new storage", time = dateTime
+    storage = Storage.new().valueOr:
+      error "storage creation failed"
+      return err(error)
+    self.storages[dateTime] = storage
+
+  return ok(some(storage))
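Bucketing recap for storage_manager.nim: every timestamp is normalized to seconds and formatted as a UTC yyyyMMddHH key, so all messages from the same UTC hour share one Storage instance, and the oldest bucket is evicted once maxHours buckets exist. A hedged sketch of the lookup path (names mirror the code above; assumes std/[times, options] plus the waku_sync modules are imported):

    let mgr = WakuSyncStorageManager.new()
    let ts = getNowInNanosecondTime()

    # The key the manager derives internally for ts:
    let key = times.format(times.fromUnix(timestampInSeconds(ts)), "yyyyMMddHH", utc())

    # retrieveStorage creates the hourly bucket on first use and reuses it afterwards:
    let storageRes = mgr.retrieveStorage(ts)
    assert storageRes.isOk() and storageRes.get().isSome()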