Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: message transfer mechanism #2688

Merged
merged 6 commits into from
May 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
220 changes: 220 additions & 0 deletions tests/node/test_wakunode_sync.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
{.used.}

import stew/shims/net as stewNet, testutils/unittests, chronos, libp2p/crypto/crypto

import
../../../waku/
[node/waku_node, node/peer_manager, waku_core, waku_store, waku_archive, waku_sync],
../waku_store/store_utils,
../waku_archive/archive_utils,
../testlib/[wakucore, wakunode, testasync]

suite "Store Sync - End to End":
var server {.threadvar.}: WakuNode
var client {.threadvar.}: WakuNode

asyncSetup:
let timeOrigin = now()

let messages =
@[
fakeWakuMessage(@[byte 00], ts = ts(-90, timeOrigin)),
fakeWakuMessage(@[byte 01], ts = ts(-80, timeOrigin)),
fakeWakuMessage(@[byte 02], ts = ts(-70, timeOrigin)),
fakeWakuMessage(@[byte 03], ts = ts(-60, timeOrigin)),
fakeWakuMessage(@[byte 04], ts = ts(-50, timeOrigin)),
fakeWakuMessage(@[byte 05], ts = ts(-40, timeOrigin)),
fakeWakuMessage(@[byte 06], ts = ts(-30, timeOrigin)),
fakeWakuMessage(@[byte 07], ts = ts(-20, timeOrigin)),
fakeWakuMessage(@[byte 08], ts = ts(-10, timeOrigin)),
fakeWakuMessage(@[byte 09], ts = ts(00, timeOrigin)),
]

let
serverKey = generateSecp256k1Key()
clientKey = generateSecp256k1Key()

server = newTestWakuNode(serverKey, ValidIpAddress.init("0.0.0.0"), Port(0))
client = newTestWakuNode(clientKey, ValidIpAddress.init("0.0.0.0"), Port(0))

let serverArchiveDriver = newArchiveDriverWithMessages(DefaultPubsubTopic, messages)
let clientArchiveDriver = newArchiveDriverWithMessages(DefaultPubsubTopic, messages)

let mountServerArchiveRes = server.mountArchive(serverArchiveDriver)
let mountClientArchiveRes = client.mountArchive(clientArchiveDriver)

assert mountServerArchiveRes.isOk()
assert mountClientArchiveRes.isOk()

await server.mountStore()
await client.mountStore()

client.mountStoreClient()
server.mountStoreClient()

let mountServerSync = server.mountWakuSync(
maxFrameSize = 0,
relayJitter = 0.seconds,
syncInterval = 1.hours,
enablePruning = false,
)
let mountClientSync = client.mountWakuSync(
maxFrameSize = 0,
syncInterval = 1.seconds,
relayJitter = 0.seconds,
enablePruning = false,
)

assert mountServerSync.isOk()
assert mountClientSync.isOk()

for msg in messages:
server.wakuSync.ingessMessage(DefaultPubsubTopic, msg)
client.wakuSync.ingessMessage(DefaultPubsubTopic, msg)

await allFutures(server.start(), client.start())

await sleepAsync(chronos.milliseconds(500))

let serverRemotePeerInfo = server.peerInfo.toRemotePeerInfo()
let clientRemotePeerInfo = client.peerInfo.toRemotePeerInfo()

client.peerManager.addServicePeer(serverRemotePeerInfo, WakuSyncCodec)
server.peerManager.addServicePeer(clientRemotePeerInfo, WakuSyncCodec)

client.peerManager.addServicePeer(serverRemotePeerInfo, WakuStoreCodec)
server.peerManager.addServicePeer(clientRemotePeerInfo, WakuStoreCodec)

asyncTeardown:
await allFutures(client.stop(), server.stop())

asyncTest "no message set differences":
check:
client.wakuSync.storageSize() == server.wakuSync.storageSize()

await sleepAsync(1.seconds)

check:
client.wakuSync.storageSize() == server.wakuSync.storageSize()

asyncTest "client message set differences":
let msg = fakeWakuMessage(@[byte 10])

client.wakuSync.ingessMessage(DefaultPubsubTopic, msg)
await client.wakuArchive.handleMessage(DefaultPubsubTopic, msg)

check:
client.wakuSync.storageSize() != server.wakuSync.storageSize()

await sleepAsync(1.seconds)

check:
client.wakuSync.storageSize() == server.wakuSync.storageSize()

asyncTest "server message set differences":
let msg = fakeWakuMessage(@[byte 10])

server.wakuSync.ingessMessage(DefaultPubsubTopic, msg)
await server.wakuArchive.handleMessage(DefaultPubsubTopic, msg)

check:
client.wakuSync.storageSize() != server.wakuSync.storageSize()

await sleepAsync(1.seconds)

check:
client.wakuSync.storageSize() == server.wakuSync.storageSize()

suite "Waku Sync - Pruning":
var server {.threadvar.}: WakuNode
var client {.threadvar.}: WakuNode

asyncSetup:
let
serverKey = generateSecp256k1Key()
clientKey = generateSecp256k1Key()

server = newTestWakuNode(serverKey, ValidIpAddress.init("0.0.0.0"), Port(0))
client = newTestWakuNode(clientKey, ValidIpAddress.init("0.0.0.0"), Port(0))

let serverArchiveDriver = newSqliteArchiveDriver()
let clientArchiveDriver = newSqliteArchiveDriver()

let mountServerArchiveRes = server.mountArchive(serverArchiveDriver)
let mountClientArchiveRes = client.mountArchive(clientArchiveDriver)

assert mountServerArchiveRes.isOk()
assert mountClientArchiveRes.isOk()

await server.mountStore()
await client.mountStore()

client.mountStoreClient()
server.mountStoreClient()

let mountServerSync = server.mountWakuSync(
maxFrameSize = 0,
relayJitter = 0.seconds,
syncInterval = 1.hours,
enablePruning = false,
)
let mountClientSync = client.mountWakuSync(
maxFrameSize = 0,
syncInterval = 1.seconds,
relayJitter = 0.seconds,
enablePruning = true,
)

assert mountServerSync.isOk()
assert mountClientSync.isOk()

await allFutures(server.start(), client.start())

await sleepAsync(1.seconds)

asyncTeardown:
await allFutures(client.stop(), server.stop())

asyncTest "pruning":
for _ in 0 ..< 10:
let msg = fakeWakuMessage()
client.wakuSync.ingessMessage(DefaultPubsubTopic, msg)
await client.wakuArchive.handleMessage(DefaultPubsubTopic, msg)

server.wakuSync.ingessMessage(DefaultPubsubTopic, msg)
await server.wakuArchive.handleMessage(DefaultPubsubTopic, msg)

await sleepAsync(1.seconds)

for _ in 0 ..< 10:
let msg = fakeWakuMessage()
client.wakuSync.ingessMessage(DefaultPubsubTopic, msg)
await client.wakuArchive.handleMessage(DefaultPubsubTopic, msg)

server.wakuSync.ingessMessage(DefaultPubsubTopic, msg)
await server.wakuArchive.handleMessage(DefaultPubsubTopic, msg)

await sleepAsync(1.seconds)

for _ in 0 ..< 10:
let msg = fakeWakuMessage()
client.wakuSync.ingessMessage(DefaultPubsubTopic, msg)
await client.wakuArchive.handleMessage(DefaultPubsubTopic, msg)

server.wakuSync.ingessMessage(DefaultPubsubTopic, msg)
await server.wakuArchive.handleMessage(DefaultPubsubTopic, msg)

await sleepAsync(1.seconds)

for _ in 0 ..< 10:
let msg = fakeWakuMessage()
client.wakuSync.ingessMessage(DefaultPubsubTopic, msg)
await client.wakuArchive.handleMessage(DefaultPubsubTopic, msg)

server.wakuSync.ingessMessage(DefaultPubsubTopic, msg)
await server.wakuArchive.handleMessage(DefaultPubsubTopic, msg)

await sleepAsync(1.seconds)

check:
client.wakuSync.storageSize() == 10
13 changes: 10 additions & 3 deletions tests/waku_sync/sync_utils.nim
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,19 @@ import std/options, chronos, chronicles, libp2p/crypto/crypto
import ../../../waku/[node/peer_manager, waku_core, waku_sync], ../testlib/wakucore

proc newTestWakuSync*(
switch: Switch, handler: SyncCallback
): Future[WakuSync] {.async.} =
switch: Switch,
transfer: Option[TransferCallback] = none(TransferCallback),
prune: Option[PruneCallback] = none(PruneCallback),
interval: Duration = DefaultSyncInterval,
): WakuSync =
let
peerManager = PeerManager.new(switch)
proto = WakuSync.new(
peerManager = peerManager, relayJitter = 0.seconds, syncCB = some(handler)
peerManager = peerManager,
relayJitter = 0.seconds,
transferCB = transfer,
pruneCB = prune,
syncInterval = interval,
)
assert proto != nil

Expand Down
Loading
Loading