chore(networkmonitor): add metric listing content topics + messages #1335

Merged
merged 2 commits into from
Nov 14, 2022
19 changes: 13 additions & 6 deletions tools/README.md
@@ -53,8 +53,8 @@ Monitoring tool to run in an existing `waku` network with the following features
* Keeps discovering new peers using `discv5`
* Tracks advertised capabilities of each node as per stored in the ENR `waku` field
* Attempts to connect to all nodes, tracking which protocols each node supports
* Presents grafana-ready metrics showing the state of the network in terms of locations, ips, number discovered peers, number of peers we could connect to, user-agent that each peer contains, etc.
* Metrics are exposed through prometheus metrics but also with a custom rest api, presenting detailed information about each peer.
* Presents grafana-ready metrics showing the state of the network in terms of locations, ips, number discovered peers, number of peers we could connect to, user-agent that each peer contains, content topics and the amount of rx messages in each one.
* Metrics are exposed through prometheus metrics but also with a custom rest api, presenting detailed information about each peer. These metrics are exposed via a rest api.

### Usage

@@ -79,14 +79,19 @@ The following options are available:

### Example

Connect to the network through a given bootstrap node, with default parameters. Once its running, metrics will be live at `localhost:8008/metrics`
Connect to the network through a given bootstrap node, with default parameters. See metrics section for the data that it exposes.

```console
./build/networkmonitor --log-level=INFO --b="enr:-Nm4QOdTOKZJKTUUZ4O_W932CXIET-M9NamewDnL78P5u9DOGnZlK0JFZ4k0inkfe6iY-0JAaJVovZXc575VV3njeiABgmlkgnY0gmlwhAjS3ueKbXVsdGlhZGRyc7g6ADg2MW5vZGUtMDEuYWMtY24taG9uZ2tvbmctYy53YWt1djIucHJvZC5zdGF0dXNpbS5uZXQGH0DeA4lzZWNwMjU2azGhAo0C-VvfgHiXrxZi3umDiooXMGY9FvYj5_d1Q4EeS7eyg3RjcIJ2X4N1ZHCCIyiFd2FrdTIP"
```

### Metrics

### metrics
Metrics are divided into two categories:
* Prometheus metrics, exposed as i.e. gauges.
Review comment (Contributor):

Suggested change
* Prometheus metrics, exposed as i.e. gauges.
* Prometheus metrics, exposed as e.g. gauges.

perhaps? :)

* Custom metrics, used for unconstrained labels such as peer information or content topics. These metrics are not exposed through prometheus because their labels are unconstrained: a new datapoint is generated for each label value, which can grow to the point where the backend can no longer handle it.
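
To illustrate why unconstrained labels are kept out of prometheus, the following minimal sketch counts how many distinct series a label-per-content-topic metric would produce. The metric name `messages_total`, the label `content_topic`, and the topic strings are hypothetical and only serve the illustration; this is not how the monitor stores its data.

```nim
import std/sets

# Each distinct label value becomes its own time series in a
# Prometheus-style backend. We model a series by its full identifier.
var series = initHashSet[string]()

proc observe(metric, contentTopic: string) =
  # Hypothetical metric and label names, used only for illustration.
  series.incl(metric & "{content_topic=\"" & contentTopic & "\"}")

# 1000 different content topics lead to 1000 separate series.
for i in 0 ..< 1000:
  observe("messages_total", "/app/1/topic-" & $i & "/proto")

echo series.len
```

Since anyone can publish on a new content topic, the number of series is not bounded by the operator, which is the motivation for serving this data from a separate endpoint.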

#### Prometheus Metrics

The following metrics are available. See `http://localhost:8008/metrics`

@@ -98,6 +103,8 @@ Other relevant metrics reused from `nim-eth`:
* routing_table_nodes: Inherited from nim-eth, number of nodes in the routing table
* discovery_message_requests_outgoing_total: Inherited from nim-eth, number of outgoing discovery requests, useful to know if the node is actively looking for new peers

The following metrics are exposed via a custom rest api. See `http://localhost:8009/allpeersinfo`
#### Custom Metrics

* json list of all peers with extra information such as ip, locatio, supported protocols and last connection time.
The following endpoints are available:
* `http://localhost:8009/allpeersinfo`: json list of all peers with extra information such as ip, location, supported protocols and last connection time.
* `http://localhost:8009/contenttopics`: content topics and the number of messages received on each one.
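
As a rough sketch of how a client might consume the `/contenttopics` response, the snippet below parses a payload into a table. It assumes the endpoint returns a flat JSON object mapping each content topic to its count; the sample payload and the helper name `parseContentTopicCounts` are made up for illustration.

```nim
import std/[json, tables]

# Hypothetical payload; real content topics and counts will differ.
const samplePayload =
  """{"/toy-chat/2/huilong/proto": 12, "/waku/2/example/proto": 3}"""

proc parseContentTopicCounts(payload: string): Table[string, int] =
  ## Converts a flat JSON object into a content topic -> count table.
  ## Assumes every value in the object is an integer.
  let node = parseJson(payload)
  for topic, count in node.pairs:
    result[topic] = count.getInt()

let counts = parseContentTopicCounts(samplePayload)
for topic, count in counts.pairs:
  echo topic, ": ", count
```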
64 changes: 54 additions & 10 deletions tools/networkmonitor/networkmonitor.nim
@@ -22,6 +22,7 @@ import
../../waku/v2/node/peer_manager/peer_manager,
../../waku/v2/node/waku_node,
../../waku/v2/utils/wakuenr,
../../waku/v2/protocol/waku_message,
../../waku/v2/utils/peers,
./networkmonitor_metrics,
./networkmonitor_config,
@@ -94,6 +95,7 @@ proc setConnectedPeersMetrics(discoveredNodes: seq[Node],
continue

# try to connect to the peer
# TODO: check last connection time and if not > x, skip connecting
let timedOut = not await node.connectToNodes(@[peer.get()]).withTimeout(timeout)
if timedOut:
warn "could not connect to peer, timedout", timeout=timeout, peer=peer.get()
@@ -146,7 +148,7 @@ proc crawlNetwork(node: WakuNode,
conf: NetworkMonitorConf,
allPeersRef: CustomPeersTableRef) {.async.} =

let crawlInterval = conf.refreshInterval * 1000 * 60
let crawlInterval = conf.refreshInterval * 1000
let client = newHttpClient()
while true:
# discover new random nodes
@@ -201,21 +203,53 @@ proc initAndStartNode(conf: NetworkMonitorConf): Result[WakuNode, string] =
error("could not start node")

proc startRestApiServer(conf: NetworkMonitorConf,
allPeersRef: CustomPeersTableRef): Result[void, string] =
allPeersInfo: CustomPeersTableRef,
numMessagesPerContentTopic: ContentTopicMessageTableRef
): Result[void, string] =
try:
let serverAddress = initTAddress(conf.metricsRestAddress & ":" & $conf.metricsRestPort)
proc validate(pattern: string, value: string): int =
if pattern.startsWith("{") and pattern.endsWith("}"): 0
else: 1
var router = RestRouter.init(validate)
router.installHandler(allPeersRef)
router.installHandler(allPeersInfo, numMessagesPerContentTopic)
var sres = RestServerRef.new(router, serverAddress)
let restServer = sres.get()
restServer.start()
except:
error("could not start rest api server")
ok()

# handles rx of messages over a topic (see subscribe)
# counts the number of messages per content topic
proc subscribeAndHandleMessages(node: WakuNode,
                                pubsubTopic: PubsubTopic,
                                msgPerContentTopic: ContentTopicMessageTableRef) =

  # handle function
  proc handler(pubsubTopic: PubsubTopic, data: seq[byte]) {.async, gcsafe.} =
    let messageRes = WakuMessage.decode(data)
    if messageRes.isErr():
      warn "could not decode message", data=data, pubsubTopic=pubsubTopic
      # skip undecodable messages: calling get() on an error result would raise
      return

    let message = messageRes.get()
    trace "rx message", pubsubTopic=pubsubTopic, contentTopic=message.contentTopic

    # If we reach the table size limit, remove the content topic with the fewest messages.
    let tableSize = 100
    if msgPerContentTopic.len > (tableSize - 1):
      let minIndex = toSeq(msgPerContentTopic.values()).minIndex()
      msgPerContentTopic.del(toSeq(msgPerContentTopic.keys())[minIndex])

    # TODO: Will overflow at some point
    # +1 if content topic existed, init to 1 otherwise
    if msgPerContentTopic.hasKey(message.contentTopic):
      msgPerContentTopic[message.contentTopic] += 1
    else:
      msgPerContentTopic[message.contentTopic] = 1

  node.subscribe(pubsubTopic, handler)
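
The bounded counting idea in the handler can be tried in isolation with the standard library alone. The sketch below is a variation rather than the PR's exact logic: it evicts the least-seen topic only when a previously unseen topic arrives at the limit, and it uses a limit of 3 instead of 100 to keep the example short. The helper name `countMessage` and the topic strings are hypothetical.

```nim
import std/tables

const tableSize = 3  # small limit for illustration; the PR uses 100

proc countMessage(counts: TableRef[string, int], contentTopic: string) =
  # Evict the topic with the fewest messages, but only when a new topic
  # would push the table past its limit.
  if not counts.hasKey(contentTopic) and counts.len >= tableSize:
    var minTopic = ""
    var minCount = high(int)
    for topic, count in counts.pairs:
      if count < minCount:
        minTopic = topic
        minCount = count
    counts.del(minTopic)
  # Start at 0 for an unseen topic, then add one.
  counts.mgetOrPut(contentTopic, 0) += 1

let counts = newTable[string, int]()
for topic in ["a", "a", "b", "b", "c"]:
  countMessage(counts, topic)

# The table is full, so "d" replaces "c", the topic with the lowest count.
countMessage(counts, "d")
echo counts.len
```

Scanning the table once avoids building two temporary sequences per message, although either approach is linear in the table size.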

when isMainModule:
# known issue: confutils.nim(775, 17) Error: can raise an unlisted exception: ref IOError
{.pop.}
@@ -231,28 +265,38 @@ when isMainModule:
setLogLevel(conf.logLevel)

# list of peers that we have discovered/connected
var allPeersRef = CustomPeersTableRef()
var allPeersInfo = CustomPeersTableRef()

# content topic and the number of messages that were received
var msgPerContentTopic = ContentTopicMessageTableRef()

# start metrics server
if conf.metricsServer:
let res = startMetricsServer(conf.metricsServerAddress, Port(conf.metricsServerPort))
if res.isErr:
if res.isErr():
error "could not start metrics server", err=res.error
quit(1)

# start rest server for custom metrics
let res = startRestApiServer(conf, allPeersRef)
if res.isErr:
let res = startRestApiServer(conf, allPeersInfo, msgPerContentTopic)
if res.isErr():
error "could not start rest api server", err=res.error

# start waku node
let node = initAndStartNode(conf)
if node.isErr:
let nodeRes = initAndStartNode(conf)
if nodeRes.isErr():
error "could not start node"
quit 1

let node = nodeRes.get()

waitFor node.mountRelay()

# Subscribe the node to the default pubsubtopic, to count messages
subscribeAndHandleMessages(node, DefaultPubsubTopic, msgPerContentTopic)

# spawn the routine that crawls the network
# TODO: split into 3 routines (discovery, connections, ip2location)
asyncSpawn crawlNetwork(node.get(), conf, allPeersRef)
asyncSpawn crawlNetwork(node, conf, allPeersInfo)

runForever()
4 changes: 2 additions & 2 deletions tools/networkmonitor/networkmonitor_config.nim
@@ -28,8 +28,8 @@ type
abbr: "b" }: seq[string]

refreshInterval* {.
desc: "How often new peers are discovered and connected to (in minutes)",
defaultValue: 10,
desc: "How often new peers are discovered and connected to (in seconds)",
defaultValue: 5,
name: "refresh-interval",
abbr: "r" }: int
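
The unit change from minutes to seconds goes together with the `crawlInterval` change in `networkmonitor.nim`. A small worked example of both conversions to milliseconds, using the old and new default values from this diff; the helper names are hypothetical and exist only for this illustration:

```nim
# Hypothetical helpers, named here only for illustration.
proc oldCrawlIntervalMs(refreshIntervalMinutes: int): int =
  # before this PR: minutes -> milliseconds
  refreshIntervalMinutes * 1000 * 60

proc newCrawlIntervalMs(refreshIntervalSeconds: int): int =
  # after this PR: seconds -> milliseconds
  refreshIntervalSeconds * 1000

echo oldCrawlIntervalMs(10)  # old default of 10 minutes: 600000
echo newCrawlIntervalMs(5)   # new default of 5 seconds: 5000
```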

12 changes: 10 additions & 2 deletions tools/networkmonitor/networkmonitor_metrics.nim
@@ -53,13 +53,21 @@ type
supportedProtocols*: seq[string]
userAgent*: string

# Stores information about all discovered/connected peers
CustomPeersTableRef* = TableRef[string, CustomPeerInfo]

# GET /allpeersinfo
proc installHandler*(router: var RestRouter, allPeers: CustomPeersTableRef) =
# stores the content topic and the count of rx messages
ContentTopicMessageTableRef* = TableRef[string, int]

proc installHandler*(router: var RestRouter,
allPeers: CustomPeersTableRef,
numMessagesPerContentTopic: ContentTopicMessageTableRef) =
router.api(MethodGet, "/allpeersinfo") do () -> RestApiResponse:
let values = toSeq(allPeers.values())
return RestApiResponse.response(values.toJson(), contentType="application/json")
router.api(MethodGet, "/contenttopics") do () -> RestApiResponse:
# TODO: toJson() includes the hash
return RestApiResponse.response($(%numMessagesPerContentTopic), contentType="application/json")

proc startMetricsServer*(serverIp: ValidIpAddress, serverPort: Port): Result[void, string] =
info "Starting metrics HTTP server", serverIp, serverPort