Skip to content

Commit

Permalink
Merge 08d9087 into ba3f9e9
Browse files Browse the repository at this point in the history
  • Loading branch information
chaitanyaprem authored Apr 4, 2024
2 parents ba3f9e9 + 08d9087 commit 568c3af
Show file tree
Hide file tree
Showing 7 changed files with 307 additions and 120 deletions.
22 changes: 6 additions & 16 deletions tests/waku_sync/sync_utils.nim
Original file line number Diff line number Diff line change
@@ -1,23 +1,13 @@
{.used.}

import
std/options,
chronos,
chronicles,
libp2p/crypto/crypto
import std/options, chronos, chronicles, libp2p/crypto/crypto

import
../../../waku/[
node/peer_manager,
waku_core,
waku_sync,
],
../testlib/[
common,
wakucore
]
../../../waku/[node/peer_manager, waku_core, waku_sync], ../testlib/[common, wakucore]

proc newTestWakuSync*(switch: Switch, handler: WakuSyncCallback): Future[WakuSync] {.async.} =
proc newTestWakuSync*(
switch: Switch, handler: WakuSyncCallback
): Future[WakuSync] {.async.} =
const DefaultFrameSize = 153600
let
peerManager = PeerManager.new(switch)
Expand All @@ -27,4 +17,4 @@ proc newTestWakuSync*(switch: Switch, handler: WakuSyncCallback): Future[WakuSyn
proto.start()
switch.mount(proto)

return proto
return proto
25 changes: 23 additions & 2 deletions tests/waku_sync/test_protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import
from std/os import sleep

import
../../../waku/[
../../waku/[
common/paging,
node/peer_manager,
waku_core,
Expand Down Expand Up @@ -118,6 +118,27 @@ suite "Waku Sync":
check:
hashes.value.len == 0

#[ asyncTest "sync 2 nodes duplicate hashes":
let msg1 = fakeWakuMessage(contentTopic = DefaultContentTopic)
let msg2 = fakeWakuMessage(contentTopic = DefaultContentTopic)
server.ingessMessage(DefaultPubsubTopic, msg1)
server.ingessMessage(DefaultPubsubTopic, msg1)
client.ingessMessage(DefaultPubsubTopic, msg1)
server.ingessMessage(DefaultPubsubTopic, msg2)
var hashes = await client.sync(serverPeerInfo)
require (hashes.isOk())
check:
hashes.value.len == 1
#hashes.value[0] == computeMessageHash(pubsubTopic = DefaultPubsubTopic, msg2)
#Assuming message is fetched from peer
client.ingessMessage(DefaultPubsubTopic, msg2)
sleep(1000)
hashes = await client.sync(serverPeerInfo)
require (hashes.isOk())
check:
hashes.value.len == 0 ]#
asyncTest "sync 2 nodes same hashes":
let msg1 = fakeWakuMessage(contentTopic = DefaultContentTopic)
let msg2 = fakeWakuMessage(contentTopic = DefaultContentTopic)
Expand Down Expand Up @@ -260,7 +281,7 @@ suite "Waku Sync":
client5.ingessMessage(DefaultPubsubTopic, msg)
server.ingessMessage(DefaultPubsubTopic, msg)
i = i + 1
info "client2 storage size", size = client2.storageSize()
#info "client2 storage size", size = client2.storageSize()

var timeBefore = cpuTime()
let hashes1 = await client.sync(serverPeerInfo)
Expand Down
12 changes: 12 additions & 0 deletions waku/waku_core/time.nim
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,15 @@ template nanosecondTime*(collector: Gauge, body: untyped) =
metrics.set(collector, nowInUnixFloat() - start)
else:
body

proc timestampInSeconds*(time: Timestamp): Timestamp =
let timeStr = $time
var timestamp: Timestamp = time

if timeStr.len() > 16:
timestamp = Timestamp(time div Timestamp(1_000_000_000))
elif timeStr.len() < 16 and timeStr.len() > 13:
timestamp = Timestamp(time div Timestamp(1_000_000))
elif timeStr.len() > 10:
timestamp = Timestamp(time div Timestamp(1000))
return timestamp
6 changes: 2 additions & 4 deletions waku/waku_sync.nim
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ when (NimMajor, NimMinor) < (1, 4):
else:
{.push raises: [].}

import
./waku_sync/protocol
import ./waku_sync/protocol, ./waku_sync/common

export
protocol
export common, protocol
146 changes: 48 additions & 98 deletions waku/waku_sync/protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ else:
{.push raises: [].}

import
std/options,
std/[options, times],
stew/results,
chronicles,
chronos,
Expand All @@ -20,98 +20,68 @@ import
../waku_enr,
../node/peer_manager/peer_manager,
./raw_bindings,
./common
./common,
./session,
./storage_manager

logScope:
topics = "waku sync"

const DefaultSyncInterval = 60.minutes
const DefaultSyncInterval: timer.Duration = Hour
const DefaultFrameSize = 153600

type
WakuSyncCallback* = proc(hashes: seq[WakuMessageHash], syncPeer: RemotePeerInfo) {.
async: (raises: []), closure, gcsafe
.}

WakuSync* = ref object of LPProtocol
storage: Storage
storageMgr: WakuSyncStorageManager
peerManager: PeerManager
maxFrameSize: int # Not sure if this should be protocol defined or not...
syncInterval: Duration
syncInterval: timer.Duration
callback: Option[WakuSyncCallback]
periodicSyncFut: Future[void]

proc ingessMessage*(self: WakuSync, pubsubTopic: PubsubTopic, msg: WakuMessage) =
if msg.ephemeral:
return

#TODO: Do we need to check if this message has already been ingessed?
# because what if messages is received via gossip and sync as well?
# Might 2 entries to be inserted into storage which is inefficient.
let msgHash: WakuMessageHash = computeMessageHash(pubsubTopic, msg)
trace "inserting message into storage ", hash = msgHash
let storageOpt = self.storageMgr.retrieveStorage(msg.timestamp).valueOr:
error "failed to ingess message as could not retrieve storage"
return
let storage = storageOpt.valueOr:
error "failed to ingess message as could not retrieve storage"
return
info "inserting message into storage ", hash = msgHash, timestamp = msg.timestamp

if self.storage.insert(msg.timestamp, msgHash).isErr():
if storage.insert(msg.timestamp, msgHash).isErr():
debug "failed to insert message ", hash = msgHash.toHex()

proc request(
self: WakuSync, conn: Connection
): Future[Result[seq[WakuMessageHash], string]] {.async, gcsafe.} =
let negentropy = Negentropy.new(self.storage, self.maxFrameSize).valueOr:
return err(error)

defer:
negentropy.delete()

let payload = negentropy.initiate().valueOr:
let syncSession = SyncSession(
sessType: SyncSessionType.CLIENT,
curState: SyncSessionState.INIT,
frameSize: DefaultFrameSize,
rangeStart: 0, #TODO: Pass start of this hour??
rangeEnd: times.getTime().toUnix(),
)
let hashes = (await syncSession.HandleClientSession(conn, self.storageMgr)).valueOr:
return err(error)

debug "Client sync session initialized", remotePeer = conn.peerId

let writeRes = catch:
await conn.writeLP(seq[byte](payload))

trace "request sent to server", payload = toHex(seq[byte](payload))

if writeRes.isErr():
return err(writeRes.error.msg)

var
haveHashes: seq[WakuMessageHash] # What to do with haves ???
needHashes: seq[WakuMessageHash]

while true:
let readRes = catch:
await conn.readLp(self.maxFrameSize)

let buffer: seq[byte] = readRes.valueOr:
return err(error.msg)

trace "Received Sync request from peer", payload = toHex(buffer)

let request = NegentropyPayload(buffer)

let responseOpt = negentropy.clientReconcile(request, haveHashes, needHashes).valueOr:
return err(error)

let response = responseOpt.valueOr:
debug "Closing connection, client sync session is done"
await conn.close()
break

trace "Sending Sync response to peer", payload = toHex(seq[byte](response))

let writeRes = catch:
await conn.writeLP(seq[byte](response))

if writeRes.isErr():
return err(writeRes.error.msg)

return ok(needHashes)
return ok(hashes)

proc sync*(
self: WakuSync
): Future[Result[(seq[WakuMessageHash], RemotePeerInfo), string]] {.async, gcsafe.} =
let peer: RemotePeerInfo = self.peerManager.selectPeer(WakuSyncCodec).valueOr:
return err("No suitable peer found for sync")

let conn: Connection = (await self.peerManager.dialPeer(peer, WakuSyncCodec)).valueOr:
let connOpt = await self.peerManager.dialPeer(peer, WakuSyncCodec)
let conn: Connection = connOpt.valueOr:
return err("Cannot establish sync connection")

let hashes: seq[WakuMessageHash] = (await self.request(conn)).valueOr:
Expand All @@ -122,7 +92,8 @@ proc sync*(
proc sync*(
self: WakuSync, peer: RemotePeerInfo
): Future[Result[seq[WakuMessageHash], string]] {.async, gcsafe.} =
let conn: Connection = (await self.peerManager.dialPeer(peer, WakuSyncCodec)).valueOr:
let connOpt = await self.peerManager.dialPeer(peer, WakuSyncCodec)
let conn: Connection = connOpt.valueOr:
return err("Cannot establish sync connection")

let hashes: seq[WakuMessageHash] = (await self.request(conn)).valueOr:
Expand All @@ -132,37 +103,16 @@ proc sync*(

proc initProtocolHandler(self: WakuSync) =
proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} =
let syncSession = SyncSession(
sessType: SyncSessionType.SERVER,
curState: SyncSessionState.INIT,
frameSize: DefaultFrameSize,
rangeStart: 0, #TODO: Pass start of this hour??
rangeEnd: 0,
)
debug "Server sync session requested", remotePeer = $conn.peerId

let negentropy = Negentropy.new(self.storage, self.maxFrameSize).valueOr:
error "Negentropy initialization error", error = error
return

defer:
negentropy.delete()

while not conn.isClosed:
let requestRes = catch:
await conn.readLp(self.maxFrameSize)

let buffer = requestRes.valueOr:
if error.name != $LPStreamRemoteClosedError or error.name != $LPStreamClosedError:
debug "Connection reading error", error = error.msg

break

#TODO: Once we receive needHashes or endOfSync, we should close this stream.
let request = NegentropyPayload(buffer)

let response = negentropy.serverReconcile(request).valueOr:
error "Reconciliation error", error = error
break

let writeRes = catch:
await conn.writeLP(seq[byte](response))
if writeRes.isErr():
error "Connection write error", error = writeRes.error.msg
break
await syncSession.HandleServerSession(conn, self.storageMgr)

debug "Server sync session ended"

Expand All @@ -173,15 +123,13 @@ proc new*(
T: type WakuSync,
peerManager: PeerManager,
maxFrameSize: int = DefaultFrameSize,
syncInterval: Duration = DefaultSyncInterval,
syncInterval: timer.Duration = DefaultSyncInterval,
callback: Option[WakuSyncCallback] = none(WakuSyncCallback),
): T =
let storage = Storage.new().valueOr:
error "storage creation failed"
return nil
let storageMgr = WakuSyncStorageManager.new()

let sync = WakuSync(
storage: storage,
storageMgr: storageMgr,
peerManager: peerManager,
maxFrameSize: maxFrameSize,
syncInterval: syncInterval,
Expand Down Expand Up @@ -209,11 +157,13 @@ proc periodicSync(self: WakuSync) {.async.} =

proc start*(self: WakuSync) =
self.started = true
if self.syncInterval > 0.seconds: # start periodic-sync only if interval is set.
if self.syncInterval > ZeroDuration:
# start periodic-sync only if interval is set.
self.periodicSyncFut = self.periodicSync()

proc stopWait*(self: WakuSync) {.async.} =
await self.periodicSyncFut.cancelAndWait()

proc storageSize*(self: WakuSync): int =
return self.storage.size()
#[ TODO:Fetch from storageManager??
proc storageSize*(self: WakuSync): int =
return self.storage.size() ]#
Loading

0 comments on commit 568c3af

Please sign in to comment.