Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: big refactor to add waku component in libwaku instead of only waku node #2658

Merged
merged 4 commits into from
May 3, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 15 additions & 15 deletions apps/wakunode2/wakunode2.nim
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import
../../waku/common/logging,
../../waku/factory/external_config,
../../waku/factory/networks_config,
../../waku/factory/app,
../../waku/factory/waku,
../../waku/node/health_monitor,
../../waku/node/waku_metrics,
../../waku/waku_api/rest/builder as rest_server_builder
Expand Down Expand Up @@ -63,7 +63,7 @@ when isMainModule:
## 5. Start monitoring tools and external interfaces
## 6. Setup graceful shutdown hooks

const versionString = "version / git commit hash: " & app.git_version
const versionString = "version / git commit hash: " & waku.git_version

let confRes = WakuNodeConf.load(version = versionString)
if confRes.isErr():
Expand Down Expand Up @@ -119,7 +119,7 @@ when isMainModule:
else:
discard

info "Running nwaku node", version = app.git_version
info "Running nwaku node", version = waku.git_version
logConfig(conf)

# NOTE: {.threadvar.} is used to make the global variable GC safe for the closure uses it
Expand All @@ -135,25 +135,25 @@ when isMainModule:
error "Starting esential REST server failed.", error = $error
quit(QuitFailure)

var wakunode2 = App.init(conf).valueOr:
error "App initialization failed", error = error
var waku = Waku.init(conf).valueOr:
error "Waku initialization failed", error = error
quit(QuitFailure)

wakunode2.restServer = restServer
waku.restServer = restServer

nodeHealthMonitor.setNode(wakunode2.node)
nodeHealthMonitor.setNode(waku.node)

wakunode2.startApp().isOkOr:
error "Starting app failed", error = error
(waitFor startWaku(addr waku)).isOkOr:
error "Starting waku failed", error = error
quit(QuitFailure)

rest_server_builder.startRestServerProtocolSupport(
restServer, wakunode2.node, wakunode2.wakuDiscv5, conf
restServer, waku.node, waku.wakuDiscv5, conf
).isOkOr:
error "Starting protocols support REST server failed.", error = $error
quit(QuitFailure)

wakunode2.metricsServer = waku_metrics.startMetricsServerAndLogging(conf).valueOr:
waku.metricsServer = waku_metrics.startMetricsServerAndLogging(conf).valueOr:
error "Starting monitoring and external interfaces failed", error = error
quit(QuitFailure)

Expand All @@ -163,7 +163,7 @@ when isMainModule:
## Setup shutdown hooks for this process.
## Stop node gracefully on shutdown.

proc asyncStopper(node: App) {.async: (raises: [Exception]).} =
proc asyncStopper(node: Waku) {.async: (raises: [Exception]).} =
nodeHealthMonitor.setOverallHealth(HealthStatus.SHUTTING_DOWN)
await node.stop()
quit(QuitSuccess)
Expand All @@ -174,15 +174,15 @@ when isMainModule:
# workaround for https://github.com/nim-lang/Nim/issues/4057
setupForeignThreadGc()
notice "Shutting down after receiving SIGINT"
asyncSpawn asyncStopper(wakunode2)
asyncSpawn asyncStopper(waku)

setControlCHook(handleCtrlC)

# Handle SIGTERM
when defined(posix):
proc handleSigterm(signal: cint) {.noconv.} =
notice "Shutting down after receiving SIGTERM"
asyncSpawn asyncStopper(wakunode2)
asyncSpawn asyncStopper(waku)

c_signal(ansi_c.SIGTERM, handleSigterm)

Expand All @@ -195,7 +195,7 @@ when isMainModule:
# Not available in -d:release mode
writeStackTrace()

waitFor wakunode2.stop()
waitFor waku.stop()
quit(QuitFailure)

c_signal(ansi_c.SIGSEGV, handleSigsegv)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import std/[options, sequtils, strutils, json]
import chronicles, chronos, stew/results, stew/shims/net
import ../../../../waku/node/waku_node, ../../../alloc
import ../../../../waku/factory/waku, ../../../../waku/node/waku_node, ../../../alloc

type DebugNodeMsgType* = enum
RETRIEVE_LISTENING_ADDRESSES
Expand All @@ -20,13 +20,13 @@ proc getMultiaddresses(node: WakuNode): seq[string] =
return node.info().listenAddresses

proc process*(
self: ptr DebugNodeRequest, node: WakuNode
self: ptr DebugNodeRequest, waku: Waku
): Future[Result[string, string]] {.async.} =
defer:
destroyShared(self)

case self.operation
of RETRIEVE_LISTENING_ADDRESSES:
return ok($(%*node.getMultiaddresses()))
return ok($(%*waku.node.getMultiaddresses()))

return err("unsupported operation in DebugNodeRequest")
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import
../../../../waku/node/peer_manager/peer_manager,
../../../../waku/waku_core,
../../../../waku/factory/external_config,
../../../../waku/node/waku_node,
../../../../waku/factory/waku,
../../../../waku/node/config,
../../../../waku/waku_archive/driver/builder,
../../../../waku/waku_archive/driver,
Expand Down Expand Up @@ -48,16 +48,12 @@ proc destroyShared(self: ptr NodeLifecycleRequest) =
deallocShared(self[].configJson)
deallocShared(self)

proc createNode(configJson: cstring): Future[Result[WakuNode, string]] {.async.} =
proc createWaku(configJson: cstring): Future[Result[Waku, string]] {.async.} =
var conf: WakuNodeConf
var errorResp: string

try:
if not parseConfig(
$configJson,
conf,
errorResp,
):
if not parseConfig($configJson, conf, errorResp):
return err(errorResp)
except Exception:
return err("exception calling parseConfig: " & getCurrentExceptionMsg())
Expand All @@ -69,6 +65,7 @@ proc createNode(configJson: cstring): Future[Result[WakuNode, string]] {.async.}

# The Waku Network config (cluster-id=1)
if conf.clusterId == 1:
## TODO: This section is duplicated in wakunode2.nim. We need to move this to a common module
let twnClusterConf = ClusterConf.TheWakuNetworkConf()
if len(conf.shards) != 0:
conf.pubsubTopics = conf.shards.mapIt(twnClusterConf.pubsubTopics[it.uint16])
Expand All @@ -88,31 +85,28 @@ proc createNode(configJson: cstring): Future[Result[WakuNode, string]] {.async.}
conf.rlnEpochSizeSec = twnClusterConf.rlnEpochSizeSec
conf.rlnRelayUserMessageLimit = twnClusterConf.rlnRelayUserMessageLimit

let wakuRes = Waku.init(conf).valueOr:
error "waku initialization failed", error = error
return err("Failed setting up Waku: " & $error)

let nodeRes = setupNode(conf).valueOr():
error "Failed setting up node", error = error
return err("Failed setting up node: " & $error)

return ok(nodeRes)
return ok(wakuRes)

proc process*(
self: ptr NodeLifecycleRequest, node: ptr WakuNode
self: ptr NodeLifecycleRequest, waku: ptr Waku
): Future[Result[string, string]] {.async.} =
defer:
destroyShared(self)

case self.operation
of CREATE_NODE:
let newNodeRes = await createNode(self.configJson)
if newNodeRes.isErr():
return err(newNodeRes.error)

node[] = newNodeRes.get()
waku[] = (await createWaku(self.configJson)).valueOr:
return err("error processing createWaku request: " & $error)
of START_NODE:
await node[].start()
(await waku.startWaku()).isOkOr:
return err("problem starting waku: " & $error)
of STOP_NODE:
try:
await node[].stop()
await waku[].stop()
except Exception:
return err("exception stopping node: " & getCurrentExceptionMsg())

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import std/[options, sequtils, strutils]
import chronicles, chronos, stew/results, stew/shims/net
import ../../../../waku/node/waku_node, ../../../alloc
import ../../../../waku/factory/waku, ../../../../waku/node/waku_node, ../../../alloc

type PeerManagementMsgType* = enum
CONNECT_TO
Expand Down Expand Up @@ -43,14 +43,14 @@ proc connectTo(
return ok()

proc process*(
self: ptr PeerManagementRequest, node: WakuNode
self: ptr PeerManagementRequest, waku: Waku
): Future[Result[string, string]] {.async.} =
defer:
destroyShared(self)

case self.operation
of CONNECT_TO:
let ret = node.connectTo($self[].peerMultiAddr, self[].dialTimeout)
let ret = waku.node.connectTo($self[].peerMultiAddr, self[].dialTimeout)
if ret.isErr():
return err(ret.error)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import std/[options, sequtils, strutils]
import chronicles, chronos, stew/byteutils, stew/results, stew/shims/net
import
../../../../../waku/waku_core/message/message,
../../../../../waku/node/waku_node,
../../../../../waku/factory/waku,
../../../../../waku/waku_core/message,
../../../../../waku/waku_core/time, # Timestamp
../../../../../waku/waku_core/topics/pubsub_topic,
Expand Down Expand Up @@ -79,26 +79,26 @@ proc toWakuMessage(m: ThreadSafeWakuMessage): WakuMessage =
return wakuMessage

proc process*(
self: ptr RelayRequest, node: ptr WakuNode
self: ptr RelayRequest, waku: ptr Waku
): Future[Result[string, string]] {.async.} =
defer:
destroyShared(self)

if node.wakuRelay.isNil():
if waku.node.wakuRelay.isNil():
return err("Operation not supported without Waku Relay enabled.")

case self.operation
of SUBSCRIBE:
# TO DO: properly perform 'subscribe'
discard node.wakuRelay.subscribe($self.pubsubTopic, self.relayEventCallback)
discard waku.node.wakuRelay.subscribe($self.pubsubTopic, self.relayEventCallback)
of UNSUBSCRIBE:
# TODO: properly perform 'unsubscribe'
node.wakuRelay.unsubscribeAll($self.pubsubTopic)
waku.node.wakuRelay.unsubscribeAll($self.pubsubTopic)
of PUBLISH:
let msg = self.message.toWakuMessage()
let pubsubTopic = $self.pubsubTopic

let numPeers = await node.wakuRelay.publish(pubsubTopic, msg)
let numPeers = await waku.node.wakuRelay.publish(pubsubTopic, msg)
if numPeers == 0:
return err("Message not sent because no peers found.")
elif numPeers > 0:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import std/[options, sequtils, strutils]
import chronos, stew/results, stew/shims/net
import
../../../../../waku/node/waku_node,
../../../../../waku/factory/waku,
../../../../../waku/waku_archive/driver/builder,
../../../../../waku/waku_archive/driver,
../../../../../waku/waku_archive/retention_policy/builder,
Expand Down Expand Up @@ -50,20 +50,20 @@ proc destroyShared(self: ptr StoreQueryRequest) =
deallocShared(self)

proc process(
self: ptr StoreQueryRequest, node: ptr WakuNode
self: ptr StoreQueryRequest, waku: ptr Waku
): Future[Result[string, string]] {.async.} =
defer:
destroyShared(self)

proc process*(
self: ptr StoreRequest, node: ptr WakuNode
self: ptr StoreRequest, waku: ptr Waku
): Future[Result[string, string]] {.async.} =
defer:
deallocShared(self)

case self.operation
of REMOTE_QUERY:
return await cast[ptr StoreQueryRequest](self[].storeReq).process(node)
return await cast[ptr StoreQueryRequest](self[].storeReq).process(waku)
of LOCAL_QUERY:
discard
# cast[ptr StoreQueryRequest](request[].reqContent).process(node)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import std/json, stew/results
import chronos
import
../../../waku/node/waku_node,
../../../waku/factory/waku,
./requests/node_lifecycle_request,
./requests/peer_manager_request,
./requests/protocols/relay_request,
Expand All @@ -32,7 +32,7 @@ proc createShared*(
return ret

proc process*(
T: type InterThreadRequest, request: ptr InterThreadRequest, node: ptr WakuNode
T: type InterThreadRequest, request: ptr InterThreadRequest, waku: ptr Waku
): Future[Result[string, string]] {.async.} =
## Processes the request and deallocates its memory
defer:
Expand All @@ -43,15 +43,15 @@ proc process*(
let retFut =
case request[].reqType
of LIFECYCLE:
cast[ptr NodeLifecycleRequest](request[].reqContent).process(node)
cast[ptr NodeLifecycleRequest](request[].reqContent).process(waku)
of PEER_MANAGER:
cast[ptr PeerManagementRequest](request[].reqContent).process(node[])
cast[ptr PeerManagementRequest](request[].reqContent).process(waku[])
of RELAY:
cast[ptr RelayRequest](request[].reqContent).process(node)
cast[ptr RelayRequest](request[].reqContent).process(waku)
of STORE:
cast[ptr StoreRequest](request[].reqContent).process(node)
cast[ptr StoreRequest](request[].reqContent).process(waku)
of DEBUG:
cast[ptr DebugNodeRequest](request[].reqContent).process(node[])
cast[ptr DebugNodeRequest](request[].reqContent).process(waku[])

return await retFut

Expand Down
6 changes: 3 additions & 3 deletions library/waku_thread/waku_thread.nim
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import
stew/results,
stew/shims/net
import
../../../waku/node/waku_node,
../../../waku/factory/waku,
../events/[json_message_event, json_base_event],
./inter_thread_communication/waku_thread_request,
./inter_thread_communication/waku_thread_response
Expand Down Expand Up @@ -48,7 +48,7 @@ proc run(ctx: ptr Context) {.thread.} =
## This is the worker thread body. This thread runs the Waku node
## and attends library user requests (stop, connect_to, etc.)

var node: WakuNode
var waku: Waku

while running.load == true:
## Trying to get a request from the libwaku main thread
Expand All @@ -57,7 +57,7 @@ proc run(ctx: ptr Context) {.thread.} =
waitFor ctx.reqSignal.wait()
let recvOk = ctx.reqChannel.tryRecv(request)
if recvOk == true:
let resultResponse = waitFor InterThreadRequest.process(request, addr node)
let resultResponse = waitFor InterThreadRequest.process(request, addr waku)

## Converting a `Result` into a thread-safe transferable response type
let threadSafeResp = InterThreadResponse.createShared(resultResponse)
Expand Down
Loading
Loading