-
Notifications
You must be signed in to change notification settings - Fork 57
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* feat: Waku Sync Protocol * feat: state machine (#2656) * feat: pruning storage mehcanism (#2673) * feat: message transfer mechanism & tests (#2688) * update docker files * added ENR filed for sync & misc. fixes * adding new sync range param & fixes --------- Co-authored-by: Ivan FB <[email protected]> Co-authored-by: Prem Chaitanya Prathi <[email protected]>
- Loading branch information
1 parent
54b5222
commit 2cc86c5
Showing
27 changed files
with
2,334 additions
and
8 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,188 @@ | ||
{.used.} | ||
|
||
import std/net, testutils/unittests, chronos, libp2p/crypto/crypto | ||
|
||
import | ||
../../waku/ | ||
[node/waku_node, node/peer_manager, waku_core, waku_store, waku_archive, waku_sync], | ||
../waku_store/store_utils, | ||
../waku_archive/archive_utils, | ||
../testlib/[wakucore, wakunode, testasync] | ||
|
||
suite "Store Sync - End to End": | ||
var server {.threadvar.}: WakuNode | ||
var client {.threadvar.}: WakuNode | ||
|
||
asyncSetup: | ||
let timeOrigin = now() | ||
|
||
let messages = | ||
@[ | ||
fakeWakuMessage(@[byte 00], ts = ts(-90, timeOrigin)), | ||
fakeWakuMessage(@[byte 01], ts = ts(-80, timeOrigin)), | ||
fakeWakuMessage(@[byte 02], ts = ts(-70, timeOrigin)), | ||
fakeWakuMessage(@[byte 03], ts = ts(-60, timeOrigin)), | ||
fakeWakuMessage(@[byte 04], ts = ts(-50, timeOrigin)), | ||
fakeWakuMessage(@[byte 05], ts = ts(-40, timeOrigin)), | ||
fakeWakuMessage(@[byte 06], ts = ts(-30, timeOrigin)), | ||
fakeWakuMessage(@[byte 07], ts = ts(-20, timeOrigin)), | ||
fakeWakuMessage(@[byte 08], ts = ts(-10, timeOrigin)), | ||
fakeWakuMessage(@[byte 09], ts = ts(00, timeOrigin)), | ||
] | ||
|
||
let | ||
serverKey = generateSecp256k1Key() | ||
clientKey = generateSecp256k1Key() | ||
|
||
server = newTestWakuNode(serverKey, IPv4_any(), Port(0)) | ||
client = newTestWakuNode(clientKey, IPv4_any(), Port(0)) | ||
|
||
let serverArchiveDriver = newArchiveDriverWithMessages(DefaultPubsubTopic, messages) | ||
let clientArchiveDriver = newArchiveDriverWithMessages(DefaultPubsubTopic, messages) | ||
|
||
let mountServerArchiveRes = server.mountArchive(serverArchiveDriver) | ||
let mountClientArchiveRes = client.mountArchive(clientArchiveDriver) | ||
|
||
assert mountServerArchiveRes.isOk() | ||
assert mountClientArchiveRes.isOk() | ||
|
||
await server.mountStore() | ||
await client.mountStore() | ||
|
||
client.mountStoreClient() | ||
server.mountStoreClient() | ||
|
||
let mountServerSync = await server.mountWakuSync( | ||
maxFrameSize = 0, syncInterval = 1.hours, relayJitter = 0.seconds | ||
) | ||
let mountClientSync = await client.mountWakuSync( | ||
maxFrameSize = 0, syncInterval = 2.milliseconds, relayJitter = 0.seconds | ||
) | ||
|
||
assert mountServerSync.isOk(), mountServerSync.error | ||
assert mountClientSync.isOk(), mountClientSync.error | ||
|
||
# messages are retreived when mounting Waku sync | ||
# but based on interval so this is needed for client only | ||
for msg in messages: | ||
client.wakuSync.messageIngress(DefaultPubsubTopic, msg) | ||
|
||
await allFutures(server.start(), client.start()) | ||
|
||
let serverRemotePeerInfo = server.peerInfo.toRemotePeerInfo() | ||
let clientRemotePeerInfo = client.peerInfo.toRemotePeerInfo() | ||
|
||
client.peerManager.addServicePeer(serverRemotePeerInfo, WakuSyncCodec) | ||
server.peerManager.addServicePeer(clientRemotePeerInfo, WakuSyncCodec) | ||
|
||
client.peerManager.addServicePeer(serverRemotePeerInfo, WakuStoreCodec) | ||
server.peerManager.addServicePeer(clientRemotePeerInfo, WakuStoreCodec) | ||
|
||
asyncTeardown: | ||
# prevent premature channel shutdown | ||
await sleepAsync(10.milliseconds) | ||
|
||
await allFutures(client.stop(), server.stop()) | ||
|
||
asyncTest "no message set differences": | ||
check: | ||
client.wakuSync.storageSize() == server.wakuSync.storageSize() | ||
|
||
await sleepAsync(10.milliseconds) | ||
|
||
check: | ||
client.wakuSync.storageSize() == server.wakuSync.storageSize() | ||
|
||
asyncTest "client message set differences": | ||
let msg = fakeWakuMessage(@[byte 10]) | ||
|
||
client.wakuSync.messageIngress(DefaultPubsubTopic, msg) | ||
await client.wakuArchive.handleMessage(DefaultPubsubTopic, msg) | ||
|
||
check: | ||
client.wakuSync.storageSize() != server.wakuSync.storageSize() | ||
|
||
await sleepAsync(10.milliseconds) | ||
|
||
check: | ||
client.wakuSync.storageSize() == server.wakuSync.storageSize() | ||
|
||
asyncTest "server message set differences": | ||
let msg = fakeWakuMessage(@[byte 10]) | ||
|
||
server.wakuSync.messageIngress(DefaultPubsubTopic, msg) | ||
await server.wakuArchive.handleMessage(DefaultPubsubTopic, msg) | ||
|
||
check: | ||
client.wakuSync.storageSize() != server.wakuSync.storageSize() | ||
|
||
await sleepAsync(10.milliseconds) | ||
|
||
check: | ||
client.wakuSync.storageSize() == server.wakuSync.storageSize() | ||
|
||
suite "Waku Sync - Pruning": | ||
var server {.threadvar.}: WakuNode | ||
var client {.threadvar.}: WakuNode | ||
|
||
asyncSetup: | ||
let | ||
serverKey = generateSecp256k1Key() | ||
clientKey = generateSecp256k1Key() | ||
|
||
server = newTestWakuNode(serverKey, IPv4_any(), Port(0)) | ||
client = newTestWakuNode(clientKey, IPv4_any(), Port(0)) | ||
|
||
let serverArchiveDriver = newSqliteArchiveDriver() | ||
let clientArchiveDriver = newSqliteArchiveDriver() | ||
|
||
let mountServerArchiveRes = server.mountArchive(serverArchiveDriver) | ||
let mountClientArchiveRes = client.mountArchive(clientArchiveDriver) | ||
|
||
assert mountServerArchiveRes.isOk() | ||
assert mountClientArchiveRes.isOk() | ||
|
||
await server.mountStore() | ||
await client.mountStore() | ||
|
||
client.mountStoreClient() | ||
server.mountStoreClient() | ||
|
||
let mountServerSync = await server.mountWakuSync( | ||
maxFrameSize = 0, | ||
relayJitter = 0.seconds, | ||
syncRange = 1.hours, | ||
syncInterval = 5.minutes, | ||
) | ||
let mountClientSync = await client.mountWakuSync( | ||
maxFrameSize = 0, | ||
syncRange = 10.milliseconds, | ||
syncInterval = 10.milliseconds, | ||
relayJitter = 0.seconds, | ||
) | ||
|
||
assert mountServerSync.isOk(), mountServerSync.error | ||
assert mountClientSync.isOk(), mountClientSync.error | ||
|
||
await allFutures(server.start(), client.start()) | ||
|
||
asyncTeardown: | ||
await sleepAsync(10.milliseconds) | ||
|
||
await allFutures(client.stop(), server.stop()) | ||
|
||
asyncTest "pruning": | ||
for _ in 0 ..< 4: | ||
for _ in 0 ..< 10: | ||
let msg = fakeWakuMessage() | ||
client.wakuSync.messageIngress(DefaultPubsubTopic, msg) | ||
await client.wakuArchive.handleMessage(DefaultPubsubTopic, msg) | ||
|
||
server.wakuSync.messageIngress(DefaultPubsubTopic, msg) | ||
await server.wakuArchive.handleMessage(DefaultPubsubTopic, msg) | ||
|
||
await sleepAsync(10.milliseconds) | ||
|
||
check: | ||
client.wakuSync.storageSize() == 10 | ||
server.wakuSync.storageSize() == 40 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
{.used.} | ||
|
||
import std/options, chronos, chronicles, libp2p/crypto/crypto | ||
|
||
import waku/[node/peer_manager, waku_core, waku_sync], ../testlib/wakucore | ||
|
||
proc newTestWakuSync*( | ||
switch: Switch, | ||
transfer: Option[TransferCallback] = none(TransferCallback), | ||
prune: Option[PruneCallback] = none(PruneCallback), | ||
interval: Duration = DefaultSyncInterval, | ||
): Future[WakuSync] {.async.} = | ||
let peerManager = PeerManager.new(switch) | ||
|
||
let fakePruneCallback = proc( | ||
pruneStart: Timestamp, pruneStop: Timestamp, cursor: Option[WakuMessageHash] | ||
): Future[ | ||
Result[(seq[(WakuMessageHash, Timestamp)], Option[WakuMessageHash]), string] | ||
] {.async: (raises: []), closure.} = | ||
return ok((@[], none(WakuMessageHash))) | ||
|
||
let res = await WakuSync.new( | ||
peerManager = peerManager, | ||
relayJitter = 0.seconds, | ||
syncInterval = interval, | ||
wakuArchive = nil, | ||
wakuStoreClient = nil, | ||
pruneCallback = some(fakePruneCallback), | ||
transferCallback = none(TransferCallback), | ||
) | ||
|
||
let proto = res.get() | ||
|
||
proto.start() | ||
switch.mount(proto) | ||
|
||
return proto |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
{.used.} | ||
|
||
import ./test_protocol, ./test_bindings |
Oops, something went wrong.