From 0725da0b6685950796b6fed08c120cd621942aa1 Mon Sep 17 00:00:00 2001 From: Lorenzo Delgado Date: Thu, 27 Oct 2022 17:29:09 +0200 Subject: [PATCH] chore(node): waku node code reorganization --- apps/chat2/chat2.nim | 2 +- apps/wakubridge/wakubridge.nim | 4 +- examples/v2/basic2.nim | 4 +- waku/v2/node/waku_node.nim | 537 +++++++++++++++++---------------- 4 files changed, 280 insertions(+), 267 deletions(-) diff --git a/apps/chat2/chat2.nim b/apps/chat2/chat2.nim index 986f5f3e45..f1b38acd8d 100644 --- a/apps/chat2/chat2.nim +++ b/apps/chat2/chat2.nim @@ -67,7 +67,7 @@ type Chat = ref object type PrivateKey* = crypto.PrivateKey - Topic* = waku_node.Topic + Topic* = waku_node.PubsubTopic ##################### ## chat2 protobufs ## diff --git a/apps/wakubridge/wakubridge.nim b/apps/wakubridge/wakubridge.nim index c91f284557..69fd656535 100644 --- a/apps/wakubridge/wakubridge.nim +++ b/apps/wakubridge/wakubridge.nim @@ -51,7 +51,7 @@ type WakuBridge* = ref object of RootObj nodev1*: EthereumNode nodev2*: WakuNode - nodev2PubsubTopic: waku_node.Topic # Pubsub topic to bridge to/from + nodev2PubsubTopic: waku_node.PubsubTopic # Pubsub topic to bridge to/from seen: seq[hashes.Hash] # FIFO queue of seen WakuMessages. Used for deduplication. rng: ref HmacDrbgContext v1Pool: seq[Node] # Pool of v1 nodes for possible connections @@ -228,7 +228,7 @@ proc new*(T: type WakuBridge, nodev2ExtIp = none[ValidIpAddress](), nodev2ExtPort = none[Port](), nameResolver: NameResolver = nil, # Bridge configuration - nodev2PubsubTopic: waku_node.Topic, + nodev2PubsubTopic: waku_node.PubsubTopic, v1Pool: seq[Node] = @[], targetV1Peers = 0): T {.raises: [Defect,IOError, TLSStreamProtocolError, LPError].} = diff --git a/examples/v2/basic2.nim b/examples/v2/basic2.nim index 4756567342..50cc522dc6 100644 --- a/examples/v2/basic2.nim +++ b/examples/v2/basic2.nim @@ -29,8 +29,8 @@ proc runBackground() {.async.} = await node.mountRelay() # Subscribe to a topic - let topic = cast[Topic]("foobar") - proc handler(topic: Topic, data: seq[byte]) {.async, gcsafe.} = + let topic = cast[PubsubTopic]("foobar") + proc handler(topic: PubsubTopic, data: seq[byte]) {.async, gcsafe.} = let message = WakuMessage.init(data).value let payload = cast[string](message.payload) info "Hit subscribe handler", topic=topic, payload=payload, contentTopic=message.contentTopic diff --git a/waku/v2/node/waku_node.nim b/waku/v2/node/waku_node.nim index 59ee67c7f1..93cfc817d4 100644 --- a/waku/v2/node/waku_node.nim +++ b/waku/v2/node/waku_node.nim @@ -56,8 +56,8 @@ const WakuFilterTimeout: Duration = 1.days # key and crypto modules different type - # XXX: Weird type, should probably be using pubsub Topic object name? - Topic* = string + # XXX: Weird type, should probably be using pubsub PubsubTopic object name? + PubsubTopic* = string Message* = seq[byte] WakuInfo* = object @@ -67,7 +67,7 @@ type #multiaddrStrings*: seq[string] # NOTE based on Eth2Node in NBC eth2_network.nim - WakuNode* = ref object of RootObj + WakuNode* = ref object peerManager*: PeerManager switch*: Switch wakuRelay*: WakuRelay @@ -128,27 +128,25 @@ template wsFlag(wssEnabled: bool): MultiAddress = if wssEnabled: MultiAddress.init("/wss").tryGet() else: MultiAddress.init("/ws").tryGet() -proc new*(T: type WakuNode, nodeKey: crypto.PrivateKey, - bindIp: ValidIpAddress, bindPort: Port, - extIp = none(ValidIpAddress), extPort = none(Port), - peerStorage: PeerStorage = nil, - maxConnections = builders.MaxConnections, - wsBindPort: Port = (Port)8000, - wsEnabled: bool = false, - wssEnabled: bool = false, - secureKey: string = "", - secureCert: string = "", - wakuFlags = none(WakuEnrBitfield), - nameResolver: NameResolver = nil, - sendSignedPeerRecord = false, - dns4DomainName = none(string), - discv5UdpPort = none(Port) - ): T - {.raises: [Defect, LPError, IOError, TLSStreamProtocolError].} = - ## Creates a Waku Node. - ## - ## Status: Implemented. - ## +proc new*(T: type WakuNode, + nodeKey: crypto.PrivateKey, + bindIp: ValidIpAddress, + bindPort: Port, + extIp = none(ValidIpAddress), + extPort = none(Port), + peerStorage: PeerStorage = nil, + maxConnections = builders.MaxConnections, + wsBindPort: Port = (Port)8000, + wsEnabled: bool = false, + wssEnabled: bool = false, + secureKey: string = "", + secureCert: string = "", + wakuFlags = none(WakuEnrBitfield), + nameResolver: NameResolver = nil, + sendSignedPeerRecord = false, + dns4DomainName = none(string), + discv5UdpPort = none(Port)): T {.raises: [Defect, LPError, IOError, TLSStreamProtocolError].} = + ## Creates a Waku Node instance. ## Initialize addresses let @@ -176,14 +174,14 @@ proc new*(T: type WakuNode, nodeKey: crypto.PrivateKey, wsExtAddress = some(ip4TcpEndPoint(extIp.get(), wsBindPort) & wsFlag(wssEnabled)) var announcedAddresses: seq[MultiAddress] - if hostExtAddress.isSome: + if hostExtAddress.isSome(): announcedAddresses.add(hostExtAddress.get()) else: announcedAddresses.add(hostAddress) # We always have at least a bind address for the host - if wsExtAddress.isSome: + if wsExtAddress.isSome(): announcedAddresses.add(wsExtAddress.get()) - elif wsHostAddress.isSome: + elif wsHostAddress.isSome(): announcedAddresses.add(wsHostAddress.get()) ## Initialize peer @@ -193,8 +191,8 @@ proc new*(T: type WakuNode, nodeKey: crypto.PrivateKey, else: some(bindIp) enrTcpPort = if extPort.isSome(): extPort else: some(bindPort) - enrMultiaddrs = if wsExtAddress.isSome: @[wsExtAddress.get()] # Only add ws/wss to `multiaddrs` field - elif wsHostAddress.isSome: @[wsHostAddress.get()] + enrMultiaddrs = if wsExtAddress.isSome(): @[wsExtAddress.get()] # Only add ws/wss to `multiaddrs` field + elif wsHostAddress.isSome(): @[wsHostAddress.get()] else: @[] enr = initEnr(nodeKey, enrIp, @@ -205,7 +203,8 @@ proc new*(T: type WakuNode, nodeKey: crypto.PrivateKey, info "Initializing networking", addrs=announcedAddresses - var switch = newWakuSwitch(some(nodekey), + let switch = newWakuSwitch( + some(nodekey), hostAddress, wsHostAddress, transportFlags = {ServerFlags.ReuseAddr}, @@ -215,7 +214,8 @@ proc new*(T: type WakuNode, nodeKey: crypto.PrivateKey, secureKeyPath = secureKey, secureCertPath = secureCert, nameResolver = nameResolver, - sendSignedPeerRecord = sendSignedPeerRecord) + sendSignedPeerRecord = sendSignedPeerRecord + ) let wakuNode = WakuNode( peerManager: PeerManager.new(switch, peerStorage), @@ -228,13 +228,36 @@ proc new*(T: type WakuNode, nodeKey: crypto.PrivateKey, return wakuNode + proc peerInfo*(node: WakuNode): PeerInfo = node.switch.peerInfo -proc subscribe(node: WakuNode, topic: Topic, handler: Option[TopicHandler]) = - if node.wakuRelay.isNil: +# TODO: Extend with more relevant info: topics, peers, memory usage, online time, etc +proc info*(node: WakuNode): WakuInfo = + ## Returns information about the Node, such as what multiaddress it can be reached at. + + let peerInfo = node.switch.peerInfo + + var listenStr : seq[string] + for address in node.announcedAddresses: + var fulladdr = $address & "/p2p/" & $peerInfo.peerId + listenStr &= fulladdr + let enrUri = node.enr.toUri() + let wakuInfo = WakuInfo(listenAddresses: listenStr, enrUri: enrUri) + return wakuInfo + +proc connectToNodes*(node: WakuNode, nodes: seq[RemotePeerInfo] | seq[string], source = "api") {.async.} = + ## `source` indicates source of node addrs (static config, api call, discovery, etc) + # NOTE This is dialing on WakuRelay protocol specifically + await connectToNodes(node.peerManager, nodes, WakuRelayCodec, source) + + +## Waku relay + +proc subscribe(node: WakuNode, topic: PubsubTopic, handler: Option[TopicHandler]) = + if node.wakuRelay.isNil(): error "Invalid API call to `subscribe`. WakuRelay not mounted." - # @TODO improved error handling + # TODO: improved error handling return info "subscribe", topic=topic @@ -266,55 +289,22 @@ proc subscribe(node: WakuNode, topic: Topic, handler: Option[TopicHandler]) = debug "Registering default handler", topic=topic wakuRelay.subscribe(topic, defaultHandler) - if handler.isSome: + if handler.isSome(): debug "Registering handler", topic=topic wakuRelay.subscribe(topic, handler.get()) -proc subscribe*(node: WakuNode, topic: Topic, handler: TopicHandler) = +proc subscribe*(node: WakuNode, topic: PubsubTopic, handler: TopicHandler) = ## Subscribes to a PubSub topic. Triggers handler when receiving messages on ## this topic. TopicHandler is a method that takes a topic and some data. ## ## NOTE The data field SHOULD be decoded as a WakuMessage. - ## Status: Implemented. node.subscribe(topic, some(handler)) -proc subscribe*(node: WakuNode, request: FilterRequest, handler: ContentFilterHandler) {.async, gcsafe.} = - ## Registers for messages that match a specific filter. Triggers the handler whenever a message is received. - ## FilterHandler is a method that takes a MessagePush. - ## - ## Status: Implemented. - - # Sanity check for well-formed subscribe FilterRequest - doAssert(request.subscribe, "invalid subscribe request") - - info "subscribe content", filter=request - - var id = generateRequestId(node.rng) - - if node.wakuFilter.isNil == false: - let - pubsubTopic = request.pubsubTopic - contentTopics = request.contentFilters.mapIt(it.contentTopic) - let resSubscription = await node.wakuFilter.subscribe(pubsubTopic, contentTopics) - - if resSubscription.isOk(): - id = resSubscription.get() - else: - # Failed to subscribe - error "remote subscription to filter failed", filter = request - waku_node_errors.inc(labelValues = ["subscribe_filter_failure"]) - - # Register handler for filter, whether remote subscription succeeded or not - node.filters.addContentFilters(id, request.pubSubTopic, request.contentFilters, handler) - waku_node_filters.set(node.filters.len.int64) - -proc unsubscribe*(node: WakuNode, topic: Topic, handler: TopicHandler) = +proc unsubscribe*(node: WakuNode, topic: PubsubTopic, handler: TopicHandler) = ## Unsubscribes a handler from a PubSub topic. - ## - ## Status: Implemented. - if node.wakuRelay.isNil: + if node.wakuRelay.isNil(): error "Invalid API call to `unsubscribe`. WakuRelay not mounted." - # @TODO improved error handling + # TODO: improved error handling return info "unsubscribe", topic=topic @@ -322,14 +312,12 @@ proc unsubscribe*(node: WakuNode, topic: Topic, handler: TopicHandler) = let wakuRelay = node.wakuRelay wakuRelay.unsubscribe(@[(topic, handler)]) -proc unsubscribeAll*(node: WakuNode, topic: Topic) = +proc unsubscribeAll*(node: WakuNode, topic: PubsubTopic) = ## Unsubscribes all handlers registered on a specific PubSub topic. - ## - ## Status: Implemented. - if node.wakuRelay.isNil: + if node.wakuRelay.isNil(): error "Invalid API call to `unsubscribeAll`. WakuRelay not mounted." - # @TODO improved error handling + # TODO: improved error handling return info "unsubscribeAll", topic=topic @@ -337,27 +325,7 @@ proc unsubscribeAll*(node: WakuNode, topic: Topic) = let wakuRelay = node.wakuRelay wakuRelay.unsubscribeAll(topic) - -proc unsubscribe*(node: WakuNode, request: FilterRequest) {.async, gcsafe.} = - ## Unsubscribe from a content filter. - ## - ## Status: Implemented. - - # Sanity check for well-formed unsubscribe FilterRequest - doAssert(request.subscribe == false, "invalid unsubscribe request") - - info "unsubscribe content", filter=request - - let - pubsubTopic = request.pubsubTopic - contentTopics = request.contentFilters.mapIt(it.contentTopic) - discard await node.wakuFilter.unsubscribe(pubsubTopic, contentTopics) - node.filters.removeContentFilters(request.contentFilters) - - waku_node_filters.set(node.filters.len.int64) - - -proc publish*(node: WakuNode, topic: Topic, message: WakuMessage) {.async, gcsafe.} = +proc publish*(node: WakuNode, topic: PubsubTopic, message: WakuMessage) {.async, gcsafe.} = ## Publish a `WakuMessage` to a PubSub topic. `WakuMessage` should contain a ## `contentTopic` field for light node functionality. This field may be also ## be omitted. @@ -372,66 +340,79 @@ proc publish*(node: WakuNode, topic: Topic, message: WakuMessage) {.async, gcsaf let data = message.encode().buffer discard await node.wakuRelay.publish(topic, data) -proc lightpush*(node: WakuNode, topic: Topic, message: WakuMessage): Future[WakuLightpushResult[PushResponse]] {.async, gcsafe.} = - ## Pushes a `WakuMessage` to a node which relays it further on PubSub topic. - ## Returns whether relaying was successful or not. - ## `WakuMessage` should contain a `contentTopic` field for light node - ## functionality. - debug "Publishing with lightpush", topic=topic, contentTopic=message.contentTopic +proc startRelay*(node: WakuNode) {.async.} = + if node.wakuRelay.isNil(): + trace "Failed to start relay. Not mounted." + return - let rpc = PushRequest(pubSubTopic: topic, message: message) - return await node.wakuLightPush.request(rpc) + ## Setup and start relay protocol + info "starting relay" + + # PubsubTopic subscriptions + for topic in node.wakuRelay.defaultTopics: + node.subscribe(topic, none(TopicHandler)) -proc lightpush2*(node: WakuNode, topic: Topic, message: WakuMessage) {.async, gcsafe.} = - discard await node.lightpush(topic, message) + # Resume previous relay connections + if node.peerManager.hasPeers(protocolMatcher(WakuRelayCodec)): + info "Found previous WakuRelay peers. Reconnecting." + + # Reconnect to previous relay peers. This will respect a backoff period, if necessary + let backoffPeriod = node.wakuRelay.parameters.pruneBackoff + chronos.seconds(BackoffSlackTime) -proc query*(node: WakuNode, query: HistoryQuery): Future[WakuStoreResult[HistoryResponse]] {.async, gcsafe.} = - ## Queries known nodes for historical messages + await node.peerManager.reconnectPeers(WakuRelayCodec, + protocolMatcher(WakuRelayCodec), + backoffPeriod) + + # Start the WakuRelay protocol + await node.wakuRelay.start() - # TODO: Once waku swap is less experimental, this can simplified - if node.wakuSwap.isNil: - debug "Using default query" - return await node.wakuStore.query(query) - else: - debug "Using SWAP accounting query" - # TODO: wakuSwap now part of wakuStore object - return await node.wakuStore.queryWithAccounting(query) + info "relay started successfully" -proc resume*(node: WakuNode, peerList: Option[seq[RemotePeerInfo]] = none(seq[RemotePeerInfo])) {.async, gcsafe.} = - ## resume proc retrieves the history of waku messages published on the default waku pubsub topic since the last time the waku node has been online - ## for resume to work properly the waku node must have the store protocol mounted in the full mode (i.e., persisting messages) - ## messages are stored in the the wakuStore's messages field and in the message db - ## the offline time window is measured as the difference between the current time and the timestamp of the most recent persisted waku message - ## an offset of 20 second is added to the time window to count for nodes asynchrony - ## peerList indicates the list of peers to query from. The history is fetched from the first available peer in this list. Such candidates should be found through a discovery method (to be developed). - ## if no peerList is passed, one of the peers in the underlying peer manager unit of the store protocol is picked randomly to fetch the history from. - ## The history gets fetched successfully if the dialed peer has been online during the queried time window. - if node.wakuStore.isNil(): - return +proc mountRelay*(node: WakuNode, + topics: seq[string] = newSeq[string](), + triggerSelf = true, + peerExchangeHandler = none(RoutingRecordsHandler)) + # TODO: Better error handling: CatchableError is raised by `waitFor` + {.async, gcsafe, raises: [Defect, InitializationError, LPError, CatchableError].} = - let retrievedMessages = await node.wakuStore.resume(peerList) - if retrievedMessages.isErr(): - error "failed to resume store", error=retrievedMessages.error - return + proc msgIdProvider(m: messages.Message): Result[MessageID, ValidationResult] = + let mh = MultiHash.digest("sha2-256", m.data) + if mh.isOk(): + return ok(mh[].data.buffer) + else: + return ok(($m.data.hash).toBytes()) + + let wakuRelay = WakuRelay.init( + switch = node.switch, + msgIdProvider = msgIdProvider, + triggerSelf = triggerSelf, + sign = false, + verifySignature = false, + maxMessageSize = MaxWakuMessageSize + ) - info "the number of retrieved messages since the last online time: ", number=retrievedMessages.value + info "mounting relay" -# TODO: Extend with more relevant info: topics, peers, memory usage, online time, etc -proc info*(node: WakuNode): WakuInfo = - ## Returns information about the Node, such as what multiaddress it can be reached at. - ## - ## Status: Implemented. - ## + ## The default relay topics is the union of + ## all configured topics plus the hard-coded defaultTopic(s) + wakuRelay.defaultTopics = concat(@[defaultTopic], topics) + + ## Add peer exchange handler + if peerExchangeHandler.isSome(): + wakuRelay.parameters.enablePX = true # Feature flag for peer exchange in nim-libp2p + wakuRelay.routingRecordsHandler.add(peerExchangeHandler.get()) + + node.wakuRelay = wakuRelay + if node.started: + # Node has started already. Let's start relay too. + await node.startRelay() + + node.switch.mount(wakuRelay, protocolMatcher(WakuRelayCodec)) + + info "relay mounted successfully" - let peerInfo = node.switch.peerInfo - - var listenStr : seq[string] - for address in node.announcedAddresses: - var fulladdr = $address & "/p2p/" & $peerInfo.peerId - listenStr &= fulladdr - let enrUri = node.enr.toUri() - let wakuInfo = WakuInfo(listenAddresses: listenStr, enrUri: enrUri) - return wakuInfo + +## Waku filter proc mountFilter*(node: WakuNode, filterTimeout: Duration = WakuFilterTimeout) {.async, raises: [Defect, LPError]} = info "mounting filter" @@ -454,6 +435,63 @@ proc mountFilter*(node: WakuNode, filterTimeout: Duration = WakuFilterTimeout) { node.switch.mount(node.wakuFilter, protocolMatcher(WakuFilterCodec)) +proc setFilterPeer*(node: WakuNode, peer: RemotePeerInfo|string) {.raises: [Defect, ValueError, LPError].} = + if node.wakuFilter.isNil(): + error "could not set peer, waku filter is nil" + return + + info "Set filter peer", peer=peer + + let remotePeer = when peer is string: parseRemotePeerInfo(peer) + else: peer + node.wakuFilter.setPeer(remotePeer) + +proc subscribe*(node: WakuNode, request: FilterRequest, handler: ContentFilterHandler) {.async, gcsafe.} = + ## Registers for messages that match a specific filter. Triggers the handler whenever a message is received. + ## FilterHandler is a method that takes a MessagePush. + + # Sanity check for well-formed subscribe FilterRequest + doAssert(request.subscribe, "invalid subscribe request") + + info "subscribe content", filter=request + + var id = generateRequestId(node.rng) + + if not node.wakuFilter.isNil(): + let + pubsubTopic = request.pubsubTopic + contentTopics = request.contentFilters.mapIt(it.contentTopic) + + let resSubscription = await node.wakuFilter.subscribe(pubsubTopic, contentTopics) + if resSubscription.isOk(): + id = resSubscription.get() + else: + # Failed to subscribe + error "remote subscription to filter failed", filter = request + waku_node_errors.inc(labelValues = ["subscribe_filter_failure"]) + + # Register handler for filter, whether remote subscription succeeded or not + node.filters.addContentFilters(id, request.pubSubTopic, request.contentFilters, handler) + waku_node_filters.set(node.filters.len.int64) + +proc unsubscribe*(node: WakuNode, request: FilterRequest) {.async, gcsafe.} = + ## Unsubscribe from a content filter. + + # Sanity check for well-formed unsubscribe FilterRequest + doAssert(request.subscribe == false, "invalid unsubscribe request") + + info "unsubscribe content", filter=request + + let + pubsubTopic = request.pubsubTopic + contentTopics = request.contentFilters.mapIt(it.contentTopic) + discard await node.wakuFilter.unsubscribe(pubsubTopic, contentTopics) + node.filters.removeContentFilters(request.contentFilters) + + waku_node_filters.set(node.filters.len.int64) + + +## Waku swap # NOTE: If using the swap protocol, it must be mounted before store. This is # because store is using a reference to the swap protocol. @@ -468,6 +506,8 @@ proc mountSwap*(node: WakuNode, swapConfig: SwapConfig = SwapConfig.init()) {.as node.switch.mount(node.wakuSwap, protocolMatcher(WakuSwapCodec)) +## Waku store + const MessageStoreDefaultRetentionPolicyInterval* = 30.minutes proc executeMessageRetentionPolicy*(node: WakuNode) = @@ -517,77 +557,50 @@ proc mountStore*(node: WakuNode, store: MessageStore = nil, retentionPolicy=none node.switch.mount(node.wakuStore, protocolMatcher(WakuStoreCodec)) - -proc startRelay*(node: WakuNode) {.async.} = - if node.wakuRelay.isNil: - trace "Failed to start relay. Not mounted." +proc setStorePeer*(node: WakuNode, peer: RemotePeerInfo|string) {.raises: [Defect, ValueError, LPError].} = + if node.wakuStore.isNil(): + error "could not set peer, waku store is nil" return - ## Setup and start relay protocol - info "starting relay" - - # Topic subscriptions - for topic in node.wakuRelay.defaultTopics: - node.subscribe(topic, none(TopicHandler)) + info "Set store peer", peer=peer - # Resume previous relay connections - if node.peerManager.hasPeers(protocolMatcher(WakuRelayCodec)): - info "Found previous WakuRelay peers. Reconnecting." - - # Reconnect to previous relay peers. This will respect a backoff period, if necessary - let backoffPeriod = node.wakuRelay.parameters.pruneBackoff + chronos.seconds(BackoffSlackTime) + let remotePeer = when peer is string: parseRemotePeerInfo(peer) + else: peer + node.wakuStore.setPeer(remotePeer) - await node.peerManager.reconnectPeers(WakuRelayCodec, - protocolMatcher(WakuRelayCodec), - backoffPeriod) - - # Start the WakuRelay protocol - await node.wakuRelay.start() - - info "relay started successfully" +proc query*(node: WakuNode, query: HistoryQuery): Future[WakuStoreResult[HistoryResponse]] {.async, gcsafe.} = + ## Queries known nodes for historical messages -proc mountRelay*(node: WakuNode, - topics: seq[string] = newSeq[string](), - triggerSelf = true, - peerExchangeHandler = none(RoutingRecordsHandler)) - # @TODO: Better error handling: CatchableError is raised by `waitFor` - {.async, gcsafe, raises: [Defect, InitializationError, LPError, CatchableError].} = + # TODO: Once waku swap is less experimental, this can simplified + if node.wakuSwap.isNil(): + debug "Using default query" + return await node.wakuStore.query(query) + else: + debug "Using SWAP accounting query" + # TODO: wakuSwap now part of wakuStore object + return await node.wakuStore.queryWithAccounting(query) - proc msgIdProvider(m: messages.Message): Result[MessageID, ValidationResult] = - let mh = MultiHash.digest("sha2-256", m.data) - if mh.isOk(): - return ok(mh[].data.buffer) - else: - return ok(($m.data.hash).toBytes()) +proc resume*(node: WakuNode, peerList: Option[seq[RemotePeerInfo]] = none(seq[RemotePeerInfo])) {.async, gcsafe.} = + ## resume proc retrieves the history of waku messages published on the default waku pubsub topic since the last time the waku node has been online + ## for resume to work properly the waku node must have the store protocol mounted in the full mode (i.e., persisting messages) + ## messages are stored in the the wakuStore's messages field and in the message db + ## the offline time window is measured as the difference between the current time and the timestamp of the most recent persisted waku message + ## an offset of 20 second is added to the time window to count for nodes asynchrony + ## peerList indicates the list of peers to query from. The history is fetched from the first available peer in this list. Such candidates should be found through a discovery method (to be developed). + ## if no peerList is passed, one of the peers in the underlying peer manager unit of the store protocol is picked randomly to fetch the history from. + ## The history gets fetched successfully if the dialed peer has been online during the queried time window. + if node.wakuStore.isNil(): + return - let wakuRelay = WakuRelay.init( - switch = node.switch, - msgIdProvider = msgIdProvider, - triggerSelf = triggerSelf, - sign = false, - verifySignature = false, - maxMessageSize = MaxWakuMessageSize - ) + let retrievedMessages = await node.wakuStore.resume(peerList) + if retrievedMessages.isErr(): + error "failed to resume store", error=retrievedMessages.error + return - info "mounting relay" - - ## The default relay topics is the union of - ## all configured topics plus the hard-coded defaultTopic(s) - wakuRelay.defaultTopics = concat(@[defaultTopic], topics) + info "the number of retrieved messages since the last online time: ", number=retrievedMessages.value - ## Add peer exchange handler - if peerExchangeHandler.isSome(): - wakuRelay.parameters.enablePX = true # Feature flag for peer exchange in nim-libp2p - wakuRelay.routingRecordsHandler.add(peerExchangeHandler.get()) - node.wakuRelay = wakuRelay - if node.started: - # Node has started already. Let's start relay too. - await node.startRelay() - - node.switch.mount(wakuRelay, protocolMatcher(WakuRelayCodec)) - - info "relay mounted successfully" +## Waku lightpush proc mountLightPush*(node: WakuNode) {.async.} = info "mounting light push" @@ -612,6 +625,33 @@ proc mountLightPush*(node: WakuNode) {.async.} = node.switch.mount(node.wakuLightPush, protocolMatcher(WakuLightPushCodec)) +proc setLightPushPeer*(node: WakuNode, peer: RemotePeerInfo|string) {.raises: [Defect, ValueError, LPError].} = + if node.wakuLightPush.isNil(): + error "could not set peer, waku lightpush is nil" + return + + info "Set lightpush peer", peer=peer + + let remotePeer = when peer is string: parseRemotePeerInfo(peer) + else: peer + node.wakuLightPush.setPeer(remotePeer) + +proc lightpush*(node: WakuNode, topic: PubsubTopic, message: WakuMessage): Future[WakuLightpushResult[PushResponse]] {.async, gcsafe.} = + ## Pushes a `WakuMessage` to a node which relays it further on PubSub topic. + ## Returns whether relaying was successful or not. + ## `WakuMessage` should contain a `contentTopic` field for light node + ## functionality. + debug "Publishing with lightpush", topic=topic, contentTopic=message.contentTopic + + let rpc = PushRequest(pubSubTopic: topic, message: message) + return await node.wakuLightPush.request(rpc) + +proc lightpush2*(node: WakuNode, topic: PubsubTopic, message: WakuMessage) {.async, gcsafe.} = + discard await node.lightpush(topic, message) + + +## Waku peer-exchange + proc mountWakuPeerExchange*(node: WakuNode) {.async, raises: [Defect, LPError].} = info "mounting waku peer exchange" @@ -626,6 +666,20 @@ proc mountWakuPeerExchange*(node: WakuNode) {.async, raises: [Defect, LPError].} node.switch.mount(node.wakuPeerExchange, protocolMatcher(WakuPeerExchangeCodec)) +proc setPeerExchangePeer*(node: WakuNode, peer: RemotePeerInfo|string) {.raises: [Defect, ValueError, LPError].} = + if node.wakuPeerExchange.isNil(): + error "could not set peer, waku peer-exchange is nil" + return + + info "Set peer-exchange peer", peer=peer + + let remotePeer = when peer is string: parseRemotePeerInfo(peer) + else: peer + node.wakuPeerExchange.setPeer(remotePeer) + + +## Other protocols + proc mountLibp2pPing*(node: WakuNode) {.async, raises: [Defect, LPError].} = info "mounting libp2p ping protocol" @@ -656,8 +710,8 @@ proc keepaliveLoop(node: WakuNode, keepalive: chronos.Duration) {.async.} = for peer in peers: let connOpt = await node.peerManager.dialPeer(peer, PingCodec) - if connOpt.isNone: - # @TODO more sophisticated error handling here + if connOpt.isNone(): + # TODO: more sophisticated error handling here debug "failed to connect to remote peer", peer=peer waku_node_errors.inc(labelValues = ["keep_alive_failure"]) return @@ -673,46 +727,6 @@ proc startKeepalive*(node: WakuNode) = asyncSpawn node.keepaliveLoop(defaultKeepalive) -proc setStorePeer*(n: WakuNode, peer: RemotePeerInfo) = - n.wakuStore.setPeer(peer) - -proc setStorePeer*(n: WakuNode, address: string) {.raises: [Defect, ValueError, LPError].} = - info "Set store peer", address = address - - let peer = parseRemotePeerInfo(address) - n.setStorePeer(peer) - -proc setFilterPeer*(n: WakuNode, peer: RemotePeerInfo) = - n.wakuFilter.setPeer(peer) - -proc setFilterPeer*(n: WakuNode, address: string) {.raises: [Defect, ValueError, LPError].} = - info "Set filter peer", address = address - - let peer = parseRemotePeerInfo(address) - n.setFilterPeer(peer) - -proc setLightPushPeer*(n: WakuNode, peer: RemotePeerInfo) = - n.wakuLightPush.setPeer(peer) - -proc setLightPushPeer*(n: WakuNode, address: string) {.raises: [Defect, ValueError, LPError].} = - info "Set lightpush peer", address = address - - let peer = parseRemotePeerInfo(address) - n.wakuLightPush.setPeer(peer) - -proc setPeerExchangePeer*(n: WakuNode, address: string) {.raises: [Defect, ValueError, LPError].} = - info "Set peer exchange peer", address = address - - let remotePeer = parseRemotePeerInfo(address) - - n.wakuPeerExchange.setPeer(remotePeer) - -proc connectToNodes*(n: WakuNode, nodes: seq[RemotePeerInfo] | seq[string], source = "api") {.async.} = - ## `source` indicates source of node addrs (static config, api call, discovery, etc) - # NOTE This is dialing on WakuRelay protocol specifically - await connectToNodes(n.peerManager, nodes, WakuRelayCodec, source) - - proc runDiscv5Loop(node: WakuNode) {.async.} = ## Continuously add newly discovered nodes ## using Node Discovery v5 @@ -725,9 +739,9 @@ proc runDiscv5Loop(node: WakuNode) {.async.} = while node.wakuDiscv5.listening: trace "Running discovery loop" ## Query for a random target and collect all discovered nodes - ## @TODO: we could filter nodes here + ## TODO: we could filter nodes here let discoveredPeers = await node.wakuDiscv5.findRandomPeers() - if discoveredPeers.isOk: + if discoveredPeers.isOk(): ## Let's attempt to connect to peers we ## have not encountered before @@ -738,7 +752,7 @@ proc runDiscv5Loop(node: WakuNode) {.async.} = if newPeers.len > 0: debug "Connecting to newly discovered peers", count=newPeers.len() - await connectToNodes(node, newPeers, "discv5") + await node.connectToNodes(newPeers, "discv5") # Discovery `queryRandom` can have a synchronous fast path for example # when no peers are in the routing table. Don't run it in continuous loop. @@ -751,7 +765,7 @@ proc startDiscv5*(node: WakuNode): Future[bool] {.async.} = info "Starting discovery v5 service" - if not node.wakuDiscv5.isNil: + if not node.wakuDiscv5.isNil(): ## First start listening on configured port try: trace "Start listening on discv5 port" @@ -776,7 +790,7 @@ proc startDiscv5*(node: WakuNode): Future[bool] {.async.} = proc stopDiscv5*(node: WakuNode): Future[bool] {.async.} = ## Stop Discovery v5 service - if not node.wakuDiscv5.isNil: + if not node.wakuDiscv5.isNil(): info "Stopping discovery v5 service" ## Stop Discovery v5 process and close listening port @@ -786,11 +800,10 @@ proc stopDiscv5*(node: WakuNode): Future[bool] {.async.} = debug "Successfully stopped discovery v5 service" + proc start*(node: WakuNode) {.async.} = ## Starts a created Waku Node and ## all its mounted protocols. - ## - ## Status: Implemented. waku_version.set(1, labelValues=[git_version]) @@ -810,7 +823,7 @@ proc start*(node: WakuNode) {.async.} = info "DNS: discoverable ENR ", enr = node.enr.toUri() # Perform relay-specific startup tasks TODO: this should be rethought - if not node.wakuRelay.isNil: + if not node.wakuRelay.isNil(): await node.startRelay() ## Update switch peer info with announced addrs @@ -821,10 +834,10 @@ proc start*(node: WakuNode) {.async.} = info "Node started successfully" proc stop*(node: WakuNode) {.async.} = - if not node.wakuRelay.isNil: + if not node.wakuRelay.isNil(): await node.wakuRelay.stop() - if not node.wakuDiscv5.isNil: + if not node.wakuDiscv5.isNil(): discard await node.stopDiscv5() await node.switch.stop()