Skip to content

Commit

Permalink
feat: store resume (#2919)
Browse files Browse the repository at this point in the history
Co-authored-by: Ivan FB <[email protected]>
  • Loading branch information
SionoiS and Ivansete-status authored Jul 30, 2024
1 parent 04027e5 commit aed2a11
Show file tree
Hide file tree
Showing 6 changed files with 363 additions and 1 deletion.
7 changes: 6 additions & 1 deletion tests/waku_store/test_all.nim
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
{.used.}

import ./test_client, ./test_rpc_codec, ./test_waku_store, ./test_wakunode_store
import
./test_client,
./test_rpc_codec,
./test_waku_store,
./test_wakunode_store,
./test_resume
113 changes: 113 additions & 0 deletions tests/waku_store/test_resume.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
{.used.}

import std/[options, net], testutils/unittests, chronos, results

import
waku/[
node/peer_manager,
node/waku_node,
waku_core,
waku_store/resume,
waku_store/common,
waku_archive/driver,
],
../testlib/[wakucore, testasync, wakunode],
./store_utils,
../waku_archive/archive_utils

suite "Store Resume":
var resume {.threadvar.}: StoreResume

asyncSetup:
let resumeRes: Result[StoreResume, string] =
StoreResume.new(peerManager = nil, wakuArchive = nil, wakuStoreClient = nil)

assert resumeRes.isOk(), $resumeRes.error

resume = resumeRes.get()

asyncTeardown:
await resume.stopWait()

asyncTest "get set roundtrip":
let ts = getNowInNanosecondTime()

let setRes = resume.setLastOnlineTimestamp(ts)
assert setRes.isOk(), $setRes.error

let getRes = resume.getLastOnlineTimestamp()
assert getRes.isOk(), $getRes.error

let getTs = getRes.get()

assert getTs == ts, "wrong timestamp"

suite "Store Resume - End to End":
var server {.threadvar.}: WakuNode
var client {.threadvar.}: WakuNode

var serverDriver {.threadvar.}: ArchiveDriver
var clientDriver {.threadvar.}: ArchiveDriver

asyncSetup:
let messages =
@[
fakeWakuMessage(@[byte 00]),
fakeWakuMessage(@[byte 01]),
fakeWakuMessage(@[byte 02]),
fakeWakuMessage(@[byte 03]),
fakeWakuMessage(@[byte 04]),
fakeWakuMessage(@[byte 05]),
fakeWakuMessage(@[byte 06]),
fakeWakuMessage(@[byte 07]),
fakeWakuMessage(@[byte 08]),
fakeWakuMessage(@[byte 09]),
]

let
serverKey = generateSecp256k1Key()
clientKey = generateSecp256k1Key()

server = newTestWakuNode(serverKey, IPv4_any(), Port(0))
client = newTestWakuNode(clientKey, IPv4_any(), Port(0))

serverDriver = newArchiveDriverWithMessages(DefaultPubsubTopic, messages)
clientDriver = newSqliteArchiveDriver()

let mountServerArchiveRes = server.mountArchive(serverDriver)
let mountClientArchiveRes = client.mountArchive(clientDriver)

assert mountServerArchiveRes.isOk()
assert mountClientArchiveRes.isOk()

await server.mountStore()
await client.mountStore()

client.mountStoreClient()
server.mountStoreClient()

client.setupStoreResume()

await server.start()

let serverRemotePeerInfo = server.peerInfo.toRemotePeerInfo()

client.peerManager.addServicePeer(serverRemotePeerInfo, WakuStoreCodec)

asyncTeardown:
await allFutures(client.stop(), server.stop())

asyncTest "10 messages resume":
var countRes = await clientDriver.getMessagesCount()
assert countRes.isOk(), $countRes.error

check:
countRes.get() == 0

await client.start()

countRes = await clientDriver.getMessagesCount()
assert countRes.isOk(), $countRes.error

check:
countRes.get() == 10
6 changes: 6 additions & 0 deletions waku/factory/external_config.nim
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,12 @@ type WakuNodeConf* = object
name: "store-max-num-db-connections"
.}: int

storeResume* {.
desc: "Enable store resume functionality",
defaultValue: false,
name: "store-resume"
.}: bool

## Filter config
filter* {.
desc: "Enable filter protocol: true|false", defaultValue: false, name: "filter"
Expand Down
3 changes: 3 additions & 0 deletions waku/factory/node_factory.nim
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,9 @@ proc setupProtocols(
else:
return err("failed to set node waku legacy store peer: " & storeNode.error)

if conf.store and conf.storeResume:
node.setupStoreResume()

# NOTE Must be mounted after relay
if conf.lightpush:
try:
Expand Down
15 changes: 15 additions & 0 deletions waku/node/waku_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import
../waku_store/protocol as store,
../waku_store/client as store_client,
../waku_store/common as store_common,
../waku_store/resume,
../waku_filter_v2,
../waku_filter_v2/client as filter_client,
../waku_filter_v2/subscriptions as filter_subscriptions,
Expand Down Expand Up @@ -94,6 +95,7 @@ type
wakuLegacyStoreClient*: legacy_store_client.WakuStoreClient
wakuStore*: store.WakuStore
wakuStoreClient*: store_client.WakuStoreClient
wakuStoreResume*: StoreResume
wakuFilter*: waku_filter_v2.WakuFilter
wakuFilterClient*: filter_client.WakuFilterClient
wakuRlnRelay*: WakuRLNRelay
Expand Down Expand Up @@ -955,6 +957,13 @@ proc query*(

return ok(response)

proc setupStoreResume*(node: WakuNode) =
node.wakuStoreResume = StoreResume.new(
node.peerManager, node.wakuArchive, node.wakuStoreClient
).valueOr:
error "Failed to setup Store Resume", error = $error
return

## Waku lightpush

proc mountLightPush*(
Expand Down Expand Up @@ -1280,6 +1289,9 @@ proc start*(node: WakuNode) {.async.} =
if not node.wakuMetadata.isNil():
node.wakuMetadata.start()

if not node.wakuStoreResume.isNil():
await node.wakuStoreResume.start()

## The switch uses this mapper to update peer info addrs
## with announced addrs after start
let addressMapper = proc(
Expand Down Expand Up @@ -1315,6 +1327,9 @@ proc stop*(node: WakuNode) {.async.} =
if not node.wakuArchive.isNil():
await node.wakuArchive.stopWait()

if not node.wakuStoreResume.isNil():
await node.wakuStoreResume.stopWait()

node.started = false

proc isReady*(node: WakuNode): Future[bool] {.async: (raises: [Exception]).} =
Expand Down
Loading

0 comments on commit aed2a11

Please sign in to comment.