diff --git a/apps/wakunode2/wakunode2.nim b/apps/wakunode2/wakunode2.nim index 0e4ffc9642..35d28807f3 100644 --- a/apps/wakunode2/wakunode2.nim +++ b/apps/wakunode2/wakunode2.nim @@ -18,7 +18,7 @@ import ../../waku/common/logging, ../../waku/factory/external_config, ../../waku/factory/networks_config, - ../../waku/factory/app, + ../../waku/factory/waku, ../../waku/node/health_monitor, ../../waku/node/waku_metrics, ../../waku/waku_api/rest/builder as rest_server_builder @@ -63,7 +63,7 @@ when isMainModule: ## 5. Start monitoring tools and external interfaces ## 6. Setup graceful shutdown hooks - const versionString = "version / git commit hash: " & app.git_version + const versionString = "version / git commit hash: " & waku.git_version let confRes = WakuNodeConf.load(version = versionString) if confRes.isErr(): @@ -119,7 +119,7 @@ when isMainModule: else: discard - info "Running nwaku node", version = app.git_version + info "Running nwaku node", version = waku.git_version logConfig(conf) # NOTE: {.threadvar.} is used to make the global variable GC safe for the closure uses it @@ -135,25 +135,25 @@ when isMainModule: error "Starting esential REST server failed.", error = $error quit(QuitFailure) - var wakunode2 = App.init(conf).valueOr: - error "App initialization failed", error = error + var waku = Waku.init(conf).valueOr: + error "Waku initialization failed", error = error quit(QuitFailure) - wakunode2.restServer = restServer + waku.restServer = restServer - nodeHealthMonitor.setNode(wakunode2.node) + nodeHealthMonitor.setNode(waku.node) - wakunode2.startApp().isOkOr: - error "Starting app failed", error = error + (waitFor startWaku(addr waku)).isOkOr: + error "Starting waku failed", error = error quit(QuitFailure) rest_server_builder.startRestServerProtocolSupport( - restServer, wakunode2.node, wakunode2.wakuDiscv5, conf + restServer, waku.node, waku.wakuDiscv5, conf ).isOkOr: error "Starting protocols support REST server failed.", error = $error quit(QuitFailure) - wakunode2.metricsServer = waku_metrics.startMetricsServerAndLogging(conf).valueOr: + waku.metricsServer = waku_metrics.startMetricsServerAndLogging(conf).valueOr: error "Starting monitoring and external interfaces failed", error = error quit(QuitFailure) @@ -163,7 +163,7 @@ when isMainModule: ## Setup shutdown hooks for this process. ## Stop node gracefully on shutdown. - proc asyncStopper(node: App) {.async: (raises: [Exception]).} = + proc asyncStopper(node: Waku) {.async: (raises: [Exception]).} = nodeHealthMonitor.setOverallHealth(HealthStatus.SHUTTING_DOWN) await node.stop() quit(QuitSuccess) @@ -174,7 +174,7 @@ when isMainModule: # workaround for https://github.com/nim-lang/Nim/issues/4057 setupForeignThreadGc() notice "Shutting down after receiving SIGINT" - asyncSpawn asyncStopper(wakunode2) + asyncSpawn asyncStopper(waku) setControlCHook(handleCtrlC) @@ -182,7 +182,7 @@ when isMainModule: when defined(posix): proc handleSigterm(signal: cint) {.noconv.} = notice "Shutting down after receiving SIGTERM" - asyncSpawn asyncStopper(wakunode2) + asyncSpawn asyncStopper(waku) c_signal(ansi_c.SIGTERM, handleSigterm) @@ -195,7 +195,7 @@ when isMainModule: # Not available in -d:release mode writeStackTrace() - waitFor wakunode2.stop() + waitFor waku.stop() quit(QuitFailure) c_signal(ansi_c.SIGSEGV, handleSigsegv) diff --git a/examples/wakustealthcommitments/node_spec.nim b/examples/wakustealthcommitments/node_spec.nim index f13f1cbba5..a080b5107e 100644 --- a/examples/wakustealthcommitments/node_spec.nim +++ b/examples/wakustealthcommitments/node_spec.nim @@ -3,8 +3,8 @@ when (NimMajor, NimMinor) < (1, 4): else: {.push raises: [].} -import ../../apps/wakunode2/[networks_config, app, external_config] -import ../../waku/common/logging +import + ../../waku/common/logging, ../../waku/factory/[waku, networks_config, external_config] import std/[options, strutils, os, sequtils], stew/shims/net as stewNet, @@ -15,11 +15,11 @@ import libp2p/crypto/crypto export - networks_config, app, logging, options, strutils, os, sequtils, stewNet, chronicles, + networks_config, waku, logging, options, strutils, os, sequtils, stewNet, chronicles, chronos, metrics, libbacktrace, crypto -proc setup*(): App = - const versionString = "version / git commit hash: " & app.git_version +proc setup*(): Waku = + const versionString = "version / git commit hash: " & waku.git_version let rng = crypto.newRng() let confRes = WakuNodeConf.load(version = versionString) @@ -48,48 +48,17 @@ proc setup*(): App = conf.rlnEpochSizeSec = twnClusterConf.rlnEpochSizeSec conf.rlnRelayUserMessageLimit = twnClusterConf.rlnRelayUserMessageLimit - var wakunode2 = App.init(rng, conf) - ## Peer persistence - let res1 = wakunode2.setupPeerPersistence() - if res1.isErr(): - error "1/5 Setting up storage failed", error = $res1.error - quit(QuitFailure) - - debug "2/5 Retrieve dynamic bootstrap nodes" - - let res3 = wakunode2.setupDyamicBootstrapNodes() - if res3.isErr(): - error "2/5 Retrieving dynamic bootstrap nodes failed", error = $res3.error + debug "Starting node" + var waku = Waku.init(conf).valueOr: + error "Waku initialization failed", error = error quit(QuitFailure) - debug "3/5 Initializing node" - - let res4 = wakunode2.setupWakuApp() - if res4.isErr(): - error "3/5 Initializing node failed", error = $res4.error + (waitFor startWaku(addr waku)).isOkOr: + error "Starting waku failed", error = error quit(QuitFailure) - debug "4/5 Mounting protocols" - - var res5: Result[void, string] - try: - res5 = waitFor wakunode2.setupAndMountProtocols() - if res5.isErr(): - error "4/5 Mounting protocols failed", error = $res5.error - quit(QuitFailure) - except Exception: - error "4/5 Mounting protocols failed", error = getCurrentExceptionMsg() - quit(QuitFailure) - - debug "5/5 Starting node and mounted protocols" - # set triggerSelf to false, we don't want to process our own stealthCommitments - wakunode2.node.wakuRelay.triggerSelf = false - - let res6 = wakunode2.startApp() - if res6.isErr(): - error "5/5 Starting node and protocols failed", error = $res6.error - quit(QuitFailure) + waku.node.wakuRelay.triggerSelf = false info "Node setup complete" - return wakunode2 + return waku diff --git a/examples/wakustealthcommitments/stealth_commitment_protocol.nim b/examples/wakustealthcommitments/stealth_commitment_protocol.nim index 2f4f066a0e..1b1c9cd468 100644 --- a/examples/wakustealthcommitments/stealth_commitment_protocol.nim +++ b/examples/wakustealthcommitments/stealth_commitment_protocol.nim @@ -15,7 +15,7 @@ import export wire_spec, logging type StealthCommitmentProtocol* = object - wakuApp: App + waku: Waku contentTopic: string spendingKeyPair: StealthCommitmentFFI.KeyPair viewingKeyPair: StealthCommitmentFFI.KeyPair @@ -51,10 +51,10 @@ proc sendThruWaku*( timestamp: getNanosecondTime(time), ) - (self.wakuApp.node.wakuRlnRelay.appendRLNProof(message, float64(time))).isOkOr: + (self.waku.node.wakuRlnRelay.appendRLNProof(message, float64(time))).isOkOr: return err("could not append rate limit proof to the message: " & $error) - (await self.wakuApp.node.publish(some(DefaultPubsubTopic), message)).isOkOr: + (await self.waku.node.publish(some(DefaultPubsubTopic), message)).isOkOr: return err("failed to publish message: " & $error) debug "rate limit proof is appended to the message" @@ -167,7 +167,7 @@ proc getSCPHandler(self: StealthCommitmentProtocol): SCPHandler = return handler proc new*( - wakuApp: App, contentTopic = ContentTopic("/wakustealthcommitments/1/app/proto") + waku: Waku, contentTopic = ContentTopic("/wakustealthcommitments/1/app/proto") ): Result[StealthCommitmentProtocol, string] = let spendingKeyPair = StealthCommitmentFFI.generateKeyPair().valueOr: return err("could not generate spending key pair: " & $error) @@ -178,7 +178,7 @@ proc new*( info "viewing public key", publicKey = viewingKeyPair.publicKey let SCP = StealthCommitmentProtocol( - wakuApp: wakuApp, + waku: waku, contentTopic: contentTopic, spendingKeyPair: spendingKeyPair, viewingKeyPair: viewingKeyPair, @@ -192,5 +192,5 @@ proc new*( except CatchableError: error "could not handle SCP message: ", err = getCurrentExceptionMsg() - wakuApp.node.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(handler)) + waku.node.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(handler)) return ok(SCP) diff --git a/library/waku_thread/inter_thread_communication/requests/debug_node_request.nim b/library/waku_thread/inter_thread_communication/requests/debug_node_request.nim index a60bad2350..236efb8e6b 100644 --- a/library/waku_thread/inter_thread_communication/requests/debug_node_request.nim +++ b/library/waku_thread/inter_thread_communication/requests/debug_node_request.nim @@ -1,6 +1,6 @@ import std/[options, sequtils, strutils, json] import chronicles, chronos, stew/results, stew/shims/net -import ../../../../waku/node/waku_node, ../../../alloc +import ../../../../waku/factory/waku, ../../../../waku/node/waku_node, ../../../alloc type DebugNodeMsgType* = enum RETRIEVE_LISTENING_ADDRESSES @@ -20,13 +20,13 @@ proc getMultiaddresses(node: WakuNode): seq[string] = return node.info().listenAddresses proc process*( - self: ptr DebugNodeRequest, node: WakuNode + self: ptr DebugNodeRequest, waku: Waku ): Future[Result[string, string]] {.async.} = defer: destroyShared(self) case self.operation of RETRIEVE_LISTENING_ADDRESSES: - return ok($(%*node.getMultiaddresses())) + return ok($(%*waku.node.getMultiaddresses())) return err("unsupported operation in DebugNodeRequest") diff --git a/library/waku_thread/inter_thread_communication/requests/node_lifecycle_request.nim b/library/waku_thread/inter_thread_communication/requests/node_lifecycle_request.nim index 9a09fe101d..b7fb18e7a6 100644 --- a/library/waku_thread/inter_thread_communication/requests/node_lifecycle_request.nim +++ b/library/waku_thread/inter_thread_communication/requests/node_lifecycle_request.nim @@ -12,7 +12,7 @@ import ../../../../waku/node/peer_manager/peer_manager, ../../../../waku/waku_core, ../../../../waku/factory/external_config, - ../../../../waku/node/waku_node, + ../../../../waku/factory/waku, ../../../../waku/node/config, ../../../../waku/waku_archive/driver/builder, ../../../../waku/waku_archive/driver, @@ -48,16 +48,12 @@ proc destroyShared(self: ptr NodeLifecycleRequest) = deallocShared(self[].configJson) deallocShared(self) -proc createNode(configJson: cstring): Future[Result[WakuNode, string]] {.async.} = +proc createWaku(configJson: cstring): Future[Result[Waku, string]] {.async.} = var conf: WakuNodeConf var errorResp: string try: - if not parseConfig( - $configJson, - conf, - errorResp, - ): + if not parseConfig($configJson, conf, errorResp): return err(errorResp) except Exception: return err("exception calling parseConfig: " & getCurrentExceptionMsg()) @@ -69,6 +65,7 @@ proc createNode(configJson: cstring): Future[Result[WakuNode, string]] {.async.} # The Waku Network config (cluster-id=1) if conf.clusterId == 1: + ## TODO: This section is duplicated in wakunode2.nim. We need to move this to a common module let twnClusterConf = ClusterConf.TheWakuNetworkConf() if len(conf.shards) != 0: conf.pubsubTopics = conf.shards.mapIt(twnClusterConf.pubsubTopics[it.uint16]) @@ -88,31 +85,28 @@ proc createNode(configJson: cstring): Future[Result[WakuNode, string]] {.async.} conf.rlnEpochSizeSec = twnClusterConf.rlnEpochSizeSec conf.rlnRelayUserMessageLimit = twnClusterConf.rlnRelayUserMessageLimit + let wakuRes = Waku.init(conf).valueOr: + error "waku initialization failed", error = error + return err("Failed setting up Waku: " & $error) - let nodeRes = setupNode(conf).valueOr(): - error "Failed setting up node", error = error - return err("Failed setting up node: " & $error) - - return ok(nodeRes) + return ok(wakuRes) proc process*( - self: ptr NodeLifecycleRequest, node: ptr WakuNode + self: ptr NodeLifecycleRequest, waku: ptr Waku ): Future[Result[string, string]] {.async.} = defer: destroyShared(self) case self.operation of CREATE_NODE: - let newNodeRes = await createNode(self.configJson) - if newNodeRes.isErr(): - return err(newNodeRes.error) - - node[] = newNodeRes.get() + waku[] = (await createWaku(self.configJson)).valueOr: + return err("error processing createWaku request: " & $error) of START_NODE: - await node[].start() + (await waku.startWaku()).isOkOr: + return err("problem starting waku: " & $error) of STOP_NODE: try: - await node[].stop() + await waku[].stop() except Exception: return err("exception stopping node: " & getCurrentExceptionMsg()) diff --git a/library/waku_thread/inter_thread_communication/requests/peer_manager_request.nim b/library/waku_thread/inter_thread_communication/requests/peer_manager_request.nim index 05d27800ca..36a70e24d0 100644 --- a/library/waku_thread/inter_thread_communication/requests/peer_manager_request.nim +++ b/library/waku_thread/inter_thread_communication/requests/peer_manager_request.nim @@ -1,6 +1,6 @@ import std/[options, sequtils, strutils] import chronicles, chronos, stew/results, stew/shims/net -import ../../../../waku/node/waku_node, ../../../alloc +import ../../../../waku/factory/waku, ../../../../waku/node/waku_node, ../../../alloc type PeerManagementMsgType* = enum CONNECT_TO @@ -43,14 +43,14 @@ proc connectTo( return ok() proc process*( - self: ptr PeerManagementRequest, node: WakuNode + self: ptr PeerManagementRequest, waku: Waku ): Future[Result[string, string]] {.async.} = defer: destroyShared(self) case self.operation of CONNECT_TO: - let ret = node.connectTo($self[].peerMultiAddr, self[].dialTimeout) + let ret = waku.node.connectTo($self[].peerMultiAddr, self[].dialTimeout) if ret.isErr(): return err(ret.error) diff --git a/library/waku_thread/inter_thread_communication/requests/protocols/relay_request.nim b/library/waku_thread/inter_thread_communication/requests/protocols/relay_request.nim index 5ec361f5e9..2f7b85107b 100644 --- a/library/waku_thread/inter_thread_communication/requests/protocols/relay_request.nim +++ b/library/waku_thread/inter_thread_communication/requests/protocols/relay_request.nim @@ -2,7 +2,7 @@ import std/[options, sequtils, strutils] import chronicles, chronos, stew/byteutils, stew/results, stew/shims/net import ../../../../../waku/waku_core/message/message, - ../../../../../waku/node/waku_node, + ../../../../../waku/factory/waku, ../../../../../waku/waku_core/message, ../../../../../waku/waku_core/time, # Timestamp ../../../../../waku/waku_core/topics/pubsub_topic, @@ -79,26 +79,26 @@ proc toWakuMessage(m: ThreadSafeWakuMessage): WakuMessage = return wakuMessage proc process*( - self: ptr RelayRequest, node: ptr WakuNode + self: ptr RelayRequest, waku: ptr Waku ): Future[Result[string, string]] {.async.} = defer: destroyShared(self) - if node.wakuRelay.isNil(): + if waku.node.wakuRelay.isNil(): return err("Operation not supported without Waku Relay enabled.") case self.operation of SUBSCRIBE: # TO DO: properly perform 'subscribe' - discard node.wakuRelay.subscribe($self.pubsubTopic, self.relayEventCallback) + discard waku.node.wakuRelay.subscribe($self.pubsubTopic, self.relayEventCallback) of UNSUBSCRIBE: # TODO: properly perform 'unsubscribe' - node.wakuRelay.unsubscribeAll($self.pubsubTopic) + waku.node.wakuRelay.unsubscribeAll($self.pubsubTopic) of PUBLISH: let msg = self.message.toWakuMessage() let pubsubTopic = $self.pubsubTopic - let numPeers = await node.wakuRelay.publish(pubsubTopic, msg) + let numPeers = await waku.node.wakuRelay.publish(pubsubTopic, msg) if numPeers == 0: return err("Message not sent because no peers found.") elif numPeers > 0: diff --git a/library/waku_thread/inter_thread_communication/requests/protocols/store_request.nim b/library/waku_thread/inter_thread_communication/requests/protocols/store_request.nim index d92c2a5dd4..274a6839ad 100644 --- a/library/waku_thread/inter_thread_communication/requests/protocols/store_request.nim +++ b/library/waku_thread/inter_thread_communication/requests/protocols/store_request.nim @@ -1,7 +1,7 @@ import std/[options, sequtils, strutils] import chronos, stew/results, stew/shims/net import - ../../../../../waku/node/waku_node, + ../../../../../waku/factory/waku, ../../../../../waku/waku_archive/driver/builder, ../../../../../waku/waku_archive/driver, ../../../../../waku/waku_archive/retention_policy/builder, @@ -50,20 +50,20 @@ proc destroyShared(self: ptr StoreQueryRequest) = deallocShared(self) proc process( - self: ptr StoreQueryRequest, node: ptr WakuNode + self: ptr StoreQueryRequest, waku: ptr Waku ): Future[Result[string, string]] {.async.} = defer: destroyShared(self) proc process*( - self: ptr StoreRequest, node: ptr WakuNode + self: ptr StoreRequest, waku: ptr Waku ): Future[Result[string, string]] {.async.} = defer: deallocShared(self) case self.operation of REMOTE_QUERY: - return await cast[ptr StoreQueryRequest](self[].storeReq).process(node) + return await cast[ptr StoreQueryRequest](self[].storeReq).process(waku) of LOCAL_QUERY: discard # cast[ptr StoreQueryRequest](request[].reqContent).process(node) diff --git a/library/waku_thread/inter_thread_communication/waku_thread_request.nim b/library/waku_thread/inter_thread_communication/waku_thread_request.nim index 146d46fc9e..3b99224528 100644 --- a/library/waku_thread/inter_thread_communication/waku_thread_request.nim +++ b/library/waku_thread/inter_thread_communication/waku_thread_request.nim @@ -5,7 +5,7 @@ import std/json, stew/results import chronos import - ../../../waku/node/waku_node, + ../../../waku/factory/waku, ./requests/node_lifecycle_request, ./requests/peer_manager_request, ./requests/protocols/relay_request, @@ -32,7 +32,7 @@ proc createShared*( return ret proc process*( - T: type InterThreadRequest, request: ptr InterThreadRequest, node: ptr WakuNode + T: type InterThreadRequest, request: ptr InterThreadRequest, waku: ptr Waku ): Future[Result[string, string]] {.async.} = ## Processes the request and deallocates its memory defer: @@ -43,15 +43,15 @@ proc process*( let retFut = case request[].reqType of LIFECYCLE: - cast[ptr NodeLifecycleRequest](request[].reqContent).process(node) + cast[ptr NodeLifecycleRequest](request[].reqContent).process(waku) of PEER_MANAGER: - cast[ptr PeerManagementRequest](request[].reqContent).process(node[]) + cast[ptr PeerManagementRequest](request[].reqContent).process(waku[]) of RELAY: - cast[ptr RelayRequest](request[].reqContent).process(node) + cast[ptr RelayRequest](request[].reqContent).process(waku) of STORE: - cast[ptr StoreRequest](request[].reqContent).process(node) + cast[ptr StoreRequest](request[].reqContent).process(waku) of DEBUG: - cast[ptr DebugNodeRequest](request[].reqContent).process(node[]) + cast[ptr DebugNodeRequest](request[].reqContent).process(waku[]) return await retFut diff --git a/library/waku_thread/waku_thread.nim b/library/waku_thread/waku_thread.nim index eb5b49bc06..8dbaca14a4 100644 --- a/library/waku_thread/waku_thread.nim +++ b/library/waku_thread/waku_thread.nim @@ -11,7 +11,7 @@ import stew/results, stew/shims/net import - ../../../waku/node/waku_node, + ../../../waku/factory/waku, ../events/[json_message_event, json_base_event], ./inter_thread_communication/waku_thread_request, ./inter_thread_communication/waku_thread_response @@ -48,7 +48,7 @@ proc run(ctx: ptr Context) {.thread.} = ## This is the worker thread body. This thread runs the Waku node ## and attends library user requests (stop, connect_to, etc.) - var node: WakuNode + var waku: Waku while running.load == true: ## Trying to get a request from the libwaku main thread @@ -57,7 +57,7 @@ proc run(ctx: ptr Context) {.thread.} = waitFor ctx.reqSignal.wait() let recvOk = ctx.reqChannel.tryRecv(request) if recvOk == true: - let resultResponse = waitFor InterThreadRequest.process(request, addr node) + let resultResponse = waitFor InterThreadRequest.process(request, addr waku) ## Converting a `Result` into a thread-safe transferable response type let threadSafeResp = InterThreadResponse.createShared(resultResponse) diff --git a/tests/wakunode2/test_app.nim b/tests/wakunode2/test_app.nim index f96dea3f61..9503169021 100644 --- a/tests/wakunode2/test_app.nim +++ b/tests/wakunode2/test_app.nim @@ -11,51 +11,51 @@ import libp2p/switch import ../testlib/common, ../testlib/wakucore, ../testlib/wakunode -include ../../waku/factory/app +include ../../waku/factory/waku -suite "Wakunode2 - App": +suite "Wakunode2 - Waku": test "compilation version should be reported": ## Given let conf = defaultTestWakuNodeConf() - let wakunode2 = App.init(conf).valueOr: + let waku = Waku.init(conf).valueOr: raiseAssert error ## When - let version = wakunode2.version + let version = waku.version ## Then check: version == git_version -suite "Wakunode2 - App initialization": +suite "Wakunode2 - Waku initialization": test "peer persistence setup should be successfully mounted": ## Given var conf = defaultTestWakuNodeConf() conf.peerPersistence = true - let wakunode2 = App.init(conf).valueOr: + let waku = Waku.init(conf).valueOr: raiseAssert error check: - not wakunode2.node.peerManager.storage.isNil() + not waku.node.peerManager.storage.isNil() test "node setup is successful with default configuration": ## Given let conf = defaultTestWakuNodeConf() ## When - var wakunode2 = App.init(conf).valueOr: + var waku = Waku.init(conf).valueOr: raiseAssert error - wakunode2.startApp().isOkOr: + (waitFor startWaku(addr waku)).isOkOr: raiseAssert error - wakunode2.metricsServer = waku_metrics.startMetricsServerAndLogging(conf).valueOr: + waku.metricsServer = waku_metrics.startMetricsServerAndLogging(conf).valueOr: raiseAssert error ## Then - let node = wakunode2.node + let node = waku.node check: not node.isNil() node.wakuArchive.isNil() @@ -64,7 +64,7 @@ suite "Wakunode2 - App initialization": not node.rendezvous.isNil() ## Cleanup - waitFor wakunode2.stop() + waitFor waku.stop() test "app properly handles dynamic port configuration": ## Given @@ -72,21 +72,21 @@ suite "Wakunode2 - App initialization": conf.tcpPort = Port(0) ## When - var wakunode2 = App.init(conf).valueOr: + var waku = Waku.init(conf).valueOr: raiseAssert error - wakunode2.startApp().isOkOr: + (waitFor startWaku(addr waku)).isOkOr: raiseAssert error ## Then let - node = wakunode2.node + node = waku.node typedNodeEnr = node.enr.toTypedRecord() assert typedNodeEnr.isOk(), $typedNodeEnr.error check: - # App started properly + # Waku started properly not node.isNil() node.wakuArchive.isNil() node.wakuStore.isNil() @@ -97,4 +97,4 @@ suite "Wakunode2 - App initialization": typedNodeEnr.get().tcp.get() != 0 ## Cleanup - waitFor wakunode2.stop() + waitFor waku.stop() diff --git a/waku/factory/node_factory.nim b/waku/factory/node_factory.nim index 905f4b8316..7a14c9626b 100644 --- a/waku/factory/node_factory.nim +++ b/waku/factory/node_factory.nim @@ -337,7 +337,7 @@ proc setupProtocols( proc startNode*( node: WakuNode, conf: WakuNodeConf, dynamicBootstrapNodes: seq[RemotePeerInfo] = @[] -): Future[Result[void, string]] {.async.} = +): Future[Result[void, string]] {.async: (raises: []).} = ## Start a configured node and all mounted protocols. ## Connect to static nodes and start ## keep-alive, if configured. diff --git a/waku/factory/app.nim b/waku/factory/waku.nim similarity index 64% rename from waku/factory/app.nim rename to waku/factory/waku.nim index 4b72a23deb..44f15e4165 100644 --- a/waku/factory/app.nim +++ b/waku/factory/waku.nim @@ -46,37 +46,31 @@ import ../../waku/factory/external_config logScope: - topics = "wakunode app" + topics = "wakunode waku" # Git version in git describe format (defined at compile time) const git_version* {.strdefine.} = "n/a" -type - App* = object - version: string - conf: WakuNodeConf - rng: ref HmacDrbgContext - key: crypto.PrivateKey +type Waku* = object + version: string + conf: WakuNodeConf + rng: ref HmacDrbgContext + key: crypto.PrivateKey - wakuDiscv5*: WakuDiscoveryV5 - dynamicBootstrapNodes: seq[RemotePeerInfo] + wakuDiscv5*: WakuDiscoveryV5 + dynamicBootstrapNodes: seq[RemotePeerInfo] - node: WakuNode + node*: WakuNode - restServer*: WakuRestServerRef - metricsServer*: MetricsHttpServerRef + restServer*: WakuRestServerRef + metricsServer*: MetricsHttpServerRef - AppResult*[T] = Result[T, string] - -func node*(app: App): WakuNode = - app.node - -func version*(app: App): string = - app.version +func version*(waku: Waku): string = + waku.version ## Initialisation -proc init*(T: type App, conf: WakuNodeConf): Result[App, string] = +proc init*(T: type Waku, conf: WakuNodeConf): Result[Waku, string] = var confCopy = conf let rng = crypto.newRng() @@ -103,7 +97,7 @@ proc init*(T: type App, conf: WakuNodeConf): Result[App, string] = error "Failed setting up node", error = nodeRes.error return err("Failed setting up node: " & nodeRes.error) - var app = App( + var waku = Waku( version: git_version, conf: confCopy, rng: rng, @@ -112,11 +106,11 @@ proc init*(T: type App, conf: WakuNodeConf): Result[App, string] = dynamicBootstrapNodes: dynamicBootstrapNodesRes.get(), ) - ok(app) + ok(waku) proc getPorts( listenAddrs: seq[MultiAddress] -): AppResult[tuple[tcpPort, websocketPort: Option[Port]]] = +): Result[tuple[tcpPort, websocketPort: Option[Port]], string] = var tcpPort, websocketPort = none(Port) for a in listenAddrs: @@ -132,9 +126,9 @@ proc getPorts( return ok((tcpPort: tcpPort, websocketPort: websocketPort)) -proc getRunningNetConfig(app: App): AppResult[NetConfig] = - var conf = app.conf - let (tcpPort, websocketPort) = getPorts(app.node.switch.peerInfo.listenAddrs).valueOr: +proc getRunningNetConfig(waku: ptr Waku): Result[NetConfig, string] = + var conf = waku[].conf + let (tcpPort, websocketPort) = getPorts(waku[].node.switch.peerInfo.listenAddrs).valueOr: return err("Could not retrieve ports " & error) if tcpPort.isSome(): @@ -149,67 +143,62 @@ proc getRunningNetConfig(app: App): AppResult[NetConfig] = return ok(netConf) -proc updateEnr(app: var App, netConf: NetConfig): AppResult[void] = - let record = enrConfiguration(app.conf, netConf, app.key).valueOr: +proc updateEnr(waku: ptr Waku, netConf: NetConfig): Result[void, string] = + let record = enrConfiguration(waku[].conf, netConf, waku[].key).valueOr: return err("ENR setup failed: " & error) - if isClusterMismatched(record, app.conf.clusterId): + if isClusterMismatched(record, waku[].conf.clusterId): return err("cluster id mismatch configured shards") - app.node.enr = record + waku[].node.enr = record return ok() -proc updateApp(app: var App): AppResult[void] = - if app.conf.tcpPort == Port(0) or app.conf.websocketPort == Port(0): - let netConf = getRunningNetConfig(app).valueOr: +proc updateWaku(waku: ptr Waku): Result[void, string] = + if waku[].conf.tcpPort == Port(0) or waku[].conf.websocketPort == Port(0): + let netConf = getRunningNetConfig(waku).valueOr: return err("error calling updateNetConfig: " & $error) - updateEnr(app, netConf).isOkOr: + updateEnr(waku, netConf).isOkOr: return err("error calling updateEnr: " & $error) - app.node.announcedAddresses = netConf.announcedAddresses + waku[].node.announcedAddresses = netConf.announcedAddresses - printNodeNetworkInfo(app.node) + printNodeNetworkInfo(waku[].node) return ok() -proc startApp*(app: var App): AppResult[void] = - let nodeRes = catch: - (waitFor startNode(app.node, app.conf, app.dynamicBootstrapNodes)) - if nodeRes.isErr(): - return err("exception starting node: " & nodeRes.error.msg) - - nodeRes.get().isOkOr: - return err("exception starting node: " & error) +proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async: (raises: []).} = + (await startNode(waku.node, waku.conf, waku.dynamicBootstrapNodes)).isOkOr: + return err("error while calling startNode: " & $error) - # Update app data that is set dynamically on node start - app.updateApp().isOkOr: + # Update waku data that is set dynamically on node start + updateWaku(waku).isOkOr: return err("Error in updateApp: " & $error) ## Discv5 - if app.conf.discv5Discovery: - app.wakuDiscV5 = waku_discv5.setupDiscoveryV5( - app.node.enr, app.node.peerManager, app.node.topicSubscriptionQueue, app.conf, - app.dynamicBootstrapNodes, app.rng, app.key, + if waku[].conf.discv5Discovery: + waku[].wakuDiscV5 = waku_discv5.setupDiscoveryV5( + waku.node.enr, waku.node.peerManager, waku.node.topicSubscriptionQueue, waku.conf, + waku.dynamicBootstrapNodes, waku.rng, waku.key, ) - (waitFor app.wakuDiscV5.start()).isOkOr: + (await waku.wakuDiscV5.start()).isOkOr: return err("failed to start waku discovery v5: " & $error) return ok() -# App shutdown +# Waku shutdown -proc stop*(app: App): Future[void] {.async: (raises: [Exception]).} = - if not app.restServer.isNil(): - await app.restServer.stop() +proc stop*(waku: Waku): Future[void] {.async: (raises: [Exception]).} = + if not waku.restServer.isNil(): + await waku.restServer.stop() - if not app.metricsServer.isNil(): - await app.metricsServer.stop() + if not waku.metricsServer.isNil(): + await waku.metricsServer.stop() - if not app.wakuDiscv5.isNil(): - await app.wakuDiscv5.stop() + if not waku.wakuDiscv5.isNil(): + await waku.wakuDiscv5.stop() - if not app.node.isNil(): - await app.node.stop() + if not waku.node.isNil(): + await waku.node.stop() diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 6c49a730f8..89c7d92c84 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -1063,7 +1063,7 @@ proc mountPeerExchange*(node: WakuNode) {.async, raises: [Defect, LPError].} = proc fetchPeerExchangePeers*( node: Wakunode, amount: uint64 -): Future[Result[int, string]] {.async, raises: [Defect].} = +): Future[Result[int, string]] {.async: (raises: []).} = if node.wakuPeerExchange.isNil(): error "could not get peers from px, waku peer-exchange is nil" return err("PeerExchange is not mounted") diff --git a/waku/waku.nim b/waku/waku.nim deleted file mode 100644 index 6a7669aab9..0000000000 --- a/waku/waku.nim +++ /dev/null @@ -1,9 +0,0 @@ -# Waku -# -# Licenses: -# - MIT ([LICENSE-MIT](../LICENSE-MIT) or http://opensource.org/licenses/MIT) -# - APACHEv2 ([LICENSE-APACHEv2](../LICENSE-APACHEv2) or https://www.apache.org/licenses/LICENSE-2.0) - -## An implementation of [Waku v2](https://rfc.vac.dev/spec/10/) in nim. -import waku_node as wakunode2 -export wakunode2 diff --git a/waku/waku_peer_exchange/protocol.nim b/waku/waku_peer_exchange/protocol.nim index c7d750751a..7fdb69ea11 100644 --- a/waku/waku_peer_exchange/protocol.nim +++ b/waku/waku_peer_exchange/protocol.nim @@ -54,7 +54,7 @@ type proc request*( wpx: WakuPeerExchange, numPeers: uint64, conn: Connection -): Future[WakuPeerExchangeResult[PeerExchangeResponse]] {.async, gcsafe.} = +): Future[WakuPeerExchangeResult[PeerExchangeResponse]] {.async: (raises: []).} = let rpc = PeerExchangeRpc(request: PeerExchangeRequest(numPeers: numPeers)) var buffer: seq[byte] @@ -79,15 +79,18 @@ proc request*( proc request*( wpx: WakuPeerExchange, numPeers: uint64, peer: RemotePeerInfo -): Future[WakuPeerExchangeResult[PeerExchangeResponse]] {.async, gcsafe.} = - let connOpt = await wpx.peerManager.dialPeer(peer, WakuPeerExchangeCodec) - if connOpt.isNone(): - return err(dialFailure) - return await wpx.request(numPeers, connOpt.get()) +): Future[WakuPeerExchangeResult[PeerExchangeResponse]] {.async: (raises: []).} = + try: + let connOpt = await wpx.peerManager.dialPeer(peer, WakuPeerExchangeCodec) + if connOpt.isNone(): + return err(dialFailure) + return await wpx.request(numPeers, connOpt.get()) + except CatchableError: + return err("exception dialing peer: " & getCurrentExceptionMsg()) proc request*( wpx: WakuPeerExchange, numPeers: uint64 -): Future[WakuPeerExchangeResult[PeerExchangeResponse]] {.async, gcsafe.} = +): Future[WakuPeerExchangeResult[PeerExchangeResponse]] {.async: (raises: []).} = let peerOpt = wpx.peerManager.selectPeer(WakuPeerExchangeCodec) if peerOpt.isNone(): waku_px_errors.inc(labelValues = [peerNotFoundFailure])