diff --git a/Makefile b/Makefile index bf6f9b3861..4573a9035a 100644 --- a/Makefile +++ b/Makefile @@ -342,6 +342,33 @@ docker-image: docker-push: docker push $(DOCKER_IMAGE_NAME) +#################################### +## Container lite-protocol-tester ## +#################################### +# -d:insecure - Necessary to enable Prometheus HTTP endpoint for metrics +# -d:chronicles_colors:none - Necessary to disable colors in logs for Docker +DOCKER_LPT_NIMFLAGS ?= -d:chronicles_colors:none -d:insecure + +# build a docker image for the fleet +docker-liteprotocoltester: DOCKER_LPT_TAG ?= latest +docker-liteprotocoltester: DOCKER_LPT_NAME ?= wakuorg/liteprotocoltester:$(DOCKER_LPT_TAG) +docker-liteprotocoltester: + docker build \ + --no-cache \ + --build-arg="MAKE_TARGET=liteprotocoltester" \ + --build-arg="NIMFLAGS=$(DOCKER_LPT_NIMFLAGS)" \ + --build-arg="NIM_COMMIT=$(DOCKER_NIM_COMMIT)" \ + --build-arg="LOG_LEVEL=TRACE" \ + --label="commit=$(shell git rev-parse HEAD)" \ + --label="version=$(GIT_VERSION)" \ + --target $(TARGET) \ + --tag $(DOCKER_LPT_NAME) \ + --file apps/liteprotocoltester/Dockerfile.liteprotocoltester.compile \ + . + +docker-liteprotocoltester-push: + docker push $(DOCKER_LPT_NAME) + ################ ## C Bindings ## diff --git a/apps/liteprotocoltester/.env b/apps/liteprotocoltester/.env index 0d0d0ef6c4..b2f08188b4 100644 --- a/apps/liteprotocoltester/.env +++ b/apps/liteprotocoltester/.env @@ -17,11 +17,11 @@ MAX_MESSAGE_SIZE=145Kb #CLUSTER_ID=66 ## for status.prod -#PUBSUB=/waku/2/rs/16/32 -#CONTENT_TOPIC=/tester/2/light-pubsub-test/fleet -#CLUSTER_ID=16 +PUBSUB=/waku/2/rs/16/32 +CONTENT_TOPIC=/tester/2/light-pubsub-test/fleet +CLUSTER_ID=16 ## for TWN -PUBSUB=/waku/2/rs/1/4 -CONTENT_TOPIC=/tester/2/light-pubsub-test/twn -CLUSTER_ID=1 +#PUBSUB=/waku/2/rs/1/4 +#CONTENT_TOPIC=/tester/2/light-pubsub-test/twn +#CLUSTER_ID=1 diff --git a/apps/liteprotocoltester/Dockerfile.liteprotocoltester.compile b/apps/liteprotocoltester/Dockerfile.liteprotocoltester.compile index 6e3184c9ac..4c64265dd2 100644 --- a/apps/liteprotocoltester/Dockerfile.liteprotocoltester.compile +++ b/apps/liteprotocoltester/Dockerfile.liteprotocoltester.compile @@ -1,58 +1,57 @@ # BUILD NIM APP ---------------------------------------------------------------- - FROM rust:1.77.1-alpine3.18 AS nim-build +FROM rust:1.77.1-alpine3.18 AS nim-build - ARG NIMFLAGS - ARG MAKE_TARGET=liteprotocoltester - ARG NIM_COMMIT - ARG LOG_LEVEL=DEBUG +ARG NIMFLAGS +ARG MAKE_TARGET=liteprotocoltester +ARG NIM_COMMIT +ARG LOG_LEVEL=TRACE - # Get build tools and required header files - RUN apk add --no-cache bash git build-base pcre-dev linux-headers curl jq +# Get build tools and required header files +RUN apk add --no-cache bash git build-base openssl-dev pcre-dev linux-headers curl jq - WORKDIR /app - COPY . . +WORKDIR /app +COPY . . - # workaround for alpine issue: https://github.com/alpinelinux/docker-alpine/issues/383 - RUN apk update && apk upgrade +# workaround for alpine issue: https://github.com/alpinelinux/docker-alpine/issues/383 +RUN apk update && apk upgrade - # Ran separately from 'make' to avoid re-doing - RUN git submodule update --init --recursive +# Ran separately from 'make' to avoid re-doing +RUN git submodule update --init --recursive - # Slowest build step for the sake of caching layers - RUN make -j$(nproc) deps QUICK_AND_DIRTY_COMPILER=1 ${NIM_COMMIT} +# Slowest build step for the sake of caching layers +RUN make -j$(nproc) deps QUICK_AND_DIRTY_COMPILER=1 ${NIM_COMMIT} - # Build the final node binary - RUN make -j$(nproc) ${NIM_COMMIT} $MAKE_TARGET LOG_LEVEL=${LOG_LEVEL} NIMFLAGS="${NIMFLAGS}" +# Build the final node binary +RUN make -j$(nproc) ${NIM_COMMIT} $MAKE_TARGET LOG_LEVEL=${LOG_LEVEL} NIMFLAGS="${NIMFLAGS}" - # PRODUCTION IMAGE ------------------------------------------------------------- +# PRODUCTION IMAGE ------------------------------------------------------------- - FROM alpine:3.18 AS prod +FROM alpine:3.18 AS prod - ARG MAKE_TARGET=liteprotocoltester +ARG MAKE_TARGET=liteprotocoltester - LABEL maintainer="jakub@status.im" - LABEL source="https://github.com/waku-org/nwaku" - LABEL description="Lite Protocol Tester: Waku light-client" - LABEL commit="unknown" - LABEL version="unknown" +LABEL maintainer="zoltan@status.im" +LABEL source="https://github.com/waku-org/nwaku" +LABEL description="Lite Protocol Tester: Waku light-client" +LABEL commit="unknown" +LABEL version="unknown" - # DevP2P, LibP2P, and JSON RPC ports - EXPOSE 30303 60000 8545 +# DevP2P, LibP2P, and JSON RPC ports +EXPOSE 30303 60000 8545 - # Referenced in the binary - RUN apk add --no-cache libgcc pcre-dev libpq-dev +# Referenced in the binary +RUN apk add --no-cache libgcc pcre-dev libpq-dev \ + wget \ + iproute2 - # Fix for 'Error loading shared library libpcre.so.3: No such file or directory' - RUN ln -s /usr/lib/libpcre.so /usr/lib/libpcre.so.3 +# Fix for 'Error loading shared library libpcre.so.3: No such file or directory' +RUN ln -s /usr/lib/libpcre.so /usr/lib/libpcre.so.3 - # Copy to separate location to accomodate different MAKE_TARGET values - COPY --from=nim-build /app/build/$MAKE_TARGET /usr/bin/ +COPY --from=nim-build /app/build/liteprotocoltester /usr/bin/ +COPY --from=nim-build /app/apps/liteprotocoltester/run_tester_node.sh /usr/bin/ - # Copy migration scripts for DB upgrades - COPY --from=nim-build /app/migrations/ /app/migrations/ +ENTRYPOINT ["/usr/bin/run_tester_node.sh", "/usr/bin/liteprotocoltester"] - ENTRYPOINT ["/usr/bin/liteprotocoltester"] - - # By default just show help if called without arguments - CMD ["--help"] +# # By default just show help if called without arguments +CMD ["--help"] diff --git a/apps/liteprotocoltester/README.md b/apps/liteprotocoltester/README.md index 1e7d45f5cf..1fad6eb750 100644 --- a/apps/liteprotocoltester/README.md +++ b/apps/liteprotocoltester/README.md @@ -17,43 +17,28 @@ and multiple receivers. Publishers are fill all message payloads with information about the test message and sender, helping the receiver side to calculate results. -## Phases of development - -### Phase 1 - -At the first phase we aims to demonstrate the concept of the testing all boundled into a docker-compose environment where we run -one service (full)node and a publisher and a receiver node. -At this stage we can only configure number of messages and fixed frequency of the message pump. We do not expect message losses and any significant latency hence the test setup is very simple. - -### Further plans - -- Add more configurability (randomized message sizes, usage of more content topics and support for static sharding). -- Extend collected metrics and polish reporting. - - Add test metrics to graphana dashboard. -- Support for static sharding and auto sharding for being able to test under different conditions. -- ... - ## Usage -### Phase 1 - -> NOTICE: This part is obsolate due integration with waku-simulator. -> It needs some rework to make it work again standalone. +### Using lpt-runner -Lite Protocol Tester application is built under name `liteprotocoltester` in apps/liteprotocoltester folder. +For ease of use, you can clone lpt-runner repository. That will utilize previously pushed liteprotocoltester docker image. +It is recommended to use this method for fleet testing. -Starting from nwaku repository root: ```bash -make liteprotocoltester -cd apps/liteprotocoltester -docker compose build +git clone https://github.com/waku-org/lpt-runner.git +cd lpt-runner + +# check Reame.md for more information +# edit .env file to your needs + docker compose up -d -docker compose logs -f receivernode + +# navigate localhost:3033 to see the lite-protocol-tester dashboard ``` -### Phase 2 +> See more detailed examples below. -> Integration with waku-simulator! +### Integration with waku-simulator! - For convenience, integration is done in cooperation with waku-simulator repository, but nothing is tightly coupled. - waku-simulator must be started separately with its own configuration. @@ -100,9 +85,7 @@ docker compose -f docker-compose-on-simularor.yml logs -f receivernode Navigate to http://localhost:3033 to see the lite-protocol-tester dashboard. -### Phase 3 - -> Run independently on a chosen waku fleet +### Run independently on a chosen waku fleet This option is simple as is just to run the built liteprotocoltester binary with run_tester_node.sh script. @@ -136,7 +119,7 @@ Run a SENDER role liteprotocoltester and a RECEIVER role one on different termin > RECEIVER side will periodically print statistics to standard output. -## Configure +## Configuration ### Environment variables for docker compose runs @@ -158,6 +141,7 @@ Run a SENDER role liteprotocoltester and a RECEIVER role one on different termin | :--- | :--- | :--- | | --test_func | separation of PUBLISHER or RECEIVER mode | RECEIVER | | --service-node| Address of the service node to use for lightpush and/or filter service | - | +| --bootstrap-node| Address of the fleet's bootstrap node to use to determine service peer randomly choosen from the network. `--service-node` switch has precedence over this | - | | --num-messages | Number of message to publish | 120 | | --delay-messages | Frequency of messages in milliseconds | 1000 | | --min-message-size | Minimum message size in bytes | 1KiB | @@ -173,8 +157,17 @@ Run a SENDER role liteprotocoltester and a RECEIVER role one on different termin | --rest-allow-origin | For convenience rest configuration can be done here | * | | --log-level | Log level for the application | DEBUG | | --log-format | Logging output format (TEXT or JSON) | TEXT | +| --metrics-port | Metrics scarpe port | 8003 | +### Specifying peer addresses +Service node or bootstrap addresses can be specified in multiadress or ENR form. + +### Using bootstrap nodes + +There are multiple benefits of using bootstrap nodes. By using them liteprotocoltester will use Peer Exchange protocol to get possible peers from the network that are capable to serve as service peers for testing. Additionally it will test dial them to verify their connectivity - this will be reported in the logs and on dashboard metrics. +Also by using bootstrap node and peer exchange discovery, litprotocoltester will be able to simulate service peer switch in case of failures. There are built in tresholds count for service peer failures (3) after service peer will be switched during the test. Also there will be max 10 trials of switching peer before test declared failed and quit. +These service peer failures are reported, thus extending network reliability measures. ### Docker image notice @@ -182,13 +175,13 @@ Run a SENDER role liteprotocoltester and a RECEIVER role one on different termin Please note that currently to ease testing and development tester application docker image is based on ubuntu and uses the externally pre-built binary of 'liteprotocoltester'. This speeds up image creation. Another dokcer build file is provided for proper build of boundle image. -> `Dockerfile.liteprotocoltester.copy` will create an image with the binary copied from the build directory. +> `Dockerfile.liteprotocoltester` will create an ubuntu based image with the binary copied from the build directory. -> `Dockerfile.liteprotocoltester.compile` will create an image completely compiled from source. This can be quite slow. +> `Dockerfile.liteprotocoltester.compile` will create an ubuntu based image completely compiled from source. This can be slow. #### Creating standalone runner docker image -To ease the work with lite-proto-tester, a docker image is possible to build. +To ease the work with lite-protocol-tester, a docker image is possible to build. With that image it is easy to run the application in a container. > `Dockerfile.liteprotocoltester` will create an ubuntu image with the binary copied from the build directory. You need to pre-build the application. @@ -205,7 +198,104 @@ docker build -t liteprotocoltester:latest -f Dockerfile.liteprotocoltester ../.. # edit and adjust .env file to your needs and for the network configuration -docker run --env-file .env liteprotocoltester:latest RECEIVER +docker run --env-file .env liteprotocoltester:latest RECEIVER + +docker run --env-file .env liteprotocoltester:latest SENDER +``` + +#### Run test with auto service peer selection from a fleet using bootstrap node + +```bash + +docker run --env-file .env liteprotocoltester:latest RECEIVER BOOTSTRAP + +docker run --env-file .env liteprotocoltester:latest SENDER BOOTSTRAP +``` + +> Notice that official image is also available at harbor.status.im/wakuorg/liteprotocoltester:latest + +## Examples + +### Bootstrap or Service node selection + +The easiest way to get the proper bootstrap nodes for the tests from https://fleets.status.im page. +Adjust on which fleets you would like to run the tests. + +> Please note that not all of them configured to support Peer Exchange protocol, those ones cannot be for bootstrap nodes for `liteprotocoltester`. + +### Environment variables +You need not necessary to use .env file, although it can be more convenient. +Anytime you can override all or part of the environment variables defined in the .env file. + +### Run standalone + +Example of running the liteprotocoltester in standalone mode on status.stagin network. +Testing includes using bootstrap nodes to gather service peers from the network via Peer Exchange protocol. +Both parties will test-dial all the peers retrieved with the corresponding protocol. +Sender will start publishing messages after 60 seconds, sending 200 messages with 1 second delay between them. +Message size will be between 15KiB and 145KiB. +Cluster id and Pubsub-topic must be accurately set according to the network configuration. + +The example shows that either multiaddress or ENR form accepted. + +```bash +export START_PUBLISHING_AFTER=60 +export NUM_MESSAGES=200 +export DELAY_MESSAGES=1000 +export MIN_MESSAGE_SIZE=15Kb +export MAX_MESSAGE_SIZE=145Kb +export PUBSUB=/waku/2/rs/16/32 +export CONTENT_TOPIC=/tester/2/light-pubsub-test/fleet +export CLUSTER_ID=16 + +docker run harbor.status.im/wakuorg/liteprotocoltester:latest RECEIVER /dns4/boot-01.do-ams3.status.staging.status.im/tcp/30303/p2p/16Uiu2HAmQE7FXQc6iZHdBzYfw3qCSDa9dLc1wsBJKoP4aZvztq2d BOOTSTRAP + +# in different terminal session, repeat the exports and run the other party of the test. +docker run harbor.status.im/wakuorg/liteprotocoltester:latest SENDER enr:-QEiuECJPv2vL00Jp5sTEMAFyW7qXkK2cFgphlU_G8-FJuJqoW_D5aWIy3ylGdv2K8DkiG7PWgng4Ql_VI7Qc2RhBdwfAYJpZIJ2NIJpcIQvTKi6im11bHRpYWRkcnO4cgA2NjFib290LTAxLmFjLWNuLWhvbmdrb25nLWMuc3RhdHVzLnN0YWdpbmcuc3RhdHVzLmltBnZfADg2MWJvb3QtMDEuYWMtY24taG9uZ2tvbmctYy5zdGF0dXMuc3RhZ2luZy5zdGF0dXMuaW0GAbveA4Jyc40AEAUAAQAgAEAAgAEAiXNlY3AyNTZrMaEDkbgV7oqPNmFtX5FzSPi9WH8kkmrPB1R3n9xRXge91M-DdGNwgnZfg3VkcIIjKIV3YWt1Mg0 BOOTSTRAP -docker run --env-file .env liteprotocoltester:latest SENDER ``` + +### Use of lpt-runner + +Another method is to use [lpt-runner repository](https://github.com/waku-org/lpt-runner/tree/master). +This extends testing with grafana dashboard and ease the test setup. +Please read the corresponding [README](https://github.com/waku-org/lpt-runner/blob/master/README.md) there as well. + +In this example we will run similar test as above but there will be 3 instances of publisher nodes and 1 receiver node. +This test uses waku.sandbox fleet which is connected to TWN. This implies lower message rates due to the RLN rate limation. +Also leave a gap of 120 seconds before starting to publish messages to let receiver side fully finish peer test-dialing. +For TWN network it is always wise to use bootstrap nodes with Peer Exchange support. + +> Theoritically we can use the same bootstrap nodes for both parties, but it is recommended to use different ones to simulate different network edges, thus getting more meaningful results. + +```bash +git clone https://github.com/waku-org/lpt-runner.git +cd lpt-runner + +export NUM_PUBLISHER_NODES=3 +export NUM_RECEIVER_NODES=1 +export START_PUBLISHING_AFTER=120 +export NUM_MESSAGES=300 +export DELAY_MESSAGES=7000 +export MIN_MESSAGE_SIZE=15Kb +export MAX_MESSAGE_SIZE=145Kb +export PUBSUB=/waku/2/rs/1/4 +export CONTENT_TOPIC=/tester/2/light-pubsub-test/twn +export CLUSTER_ID=1 + +export FILTER_BOOTSTRAP=/dns4/node-01.ac-cn-hongkong-c.waku.sandbox.status.im/tcp/30303/p2p/16Uiu2HAmQYiojgZ8APsh9wqbWNyCstVhnp9gbeNrxSEQnLJchC92 +export LIGHTPUSH_BOOTSTRAP=/dns4/node-01.do-ams3.waku.sandbox.status.im/tcp/30303/p2p/16Uiu2HAmNaeL4p3WEYzC9mgXBmBWSgWjPHRvatZTXnp8Jgv3iKsb + +docker compose up -d + +# we can check logs from one or all SENDER +docker compose logs -f --index 1 publishernode + +# for checking receiver side performance +docker compose logs -f receivernode + +# when test completed +docker compose down +``` + +For dashboard navigate to http://localhost:3033 diff --git a/apps/liteprotocoltester/diagnose_connections.nim b/apps/liteprotocoltester/diagnose_connections.nim index 66718be7c7..9f255764fb 100644 --- a/apps/liteprotocoltester/diagnose_connections.nim +++ b/apps/liteprotocoltester/diagnose_connections.nim @@ -4,93 +4,74 @@ else: {.push raises: [].} import - std/[options, strutils, os, sequtils, net, strformat], + std/[options, net, strformat], chronicles, chronos, metrics, libbacktrace, - system/ansi_c, libp2p/crypto/crypto, confutils, libp2p/wire import - ../../waku/common/logging, - ../../waku/factory/waku, - ../../waku/factory/external_config, - ../../waku/node/health_monitor, - ../../waku/node/waku_metrics, - ../../waku/waku_api/rest/builder as rest_server_builder, - ../../waku/node/peer_manager, - ../../waku/waku_lightpush/common, - ../../waku/waku_relay, - ../../waku/waku_filter_v2, - ../../waku/waku_api/rest/client, - ../../waku/waku_api/rest/admin/client, - ./tester_config, - ./lightpush_publisher, - ./filter_subscriber - + waku/[ + factory/external_config, + node/peer_manager, + waku_lightpush/common, + waku_relay, + waku_filter_v2, + waku_peer_exchange/protocol, + waku_core/multiaddrstr, + waku_enr/capabilities, + ] logScope: topics = "diagnose connections" -proc logSelfPeersLoop(pm: PeerManager, interval: Duration) {.async.} = - trace "Starting logSelfPeersLoop diagnosis loop" - while true: - let selfLighpushPeers = pm.wakuPeerStore.getPeersByProtocol(WakuLightPushCodec) - let selfRelayPeers = pm.wakuPeerStore.getPeersByProtocol(WakuRelayCodec) - let selfFilterPeers = pm.wakuPeerStore.getPeersByProtocol(WakuFilterSubscribeCodec) +proc `$`*(cap: Capabilities): string = + case cap + of Capabilities.Relay: + return "Relay" + of Capabilities.Store: + return "Store" + of Capabilities.Filter: + return "Filter" + of Capabilities.Lightpush: + return "Lightpush" + of Capabilities.Sync: + return "Sync" + +proc allPeers(pm: PeerManager): string = + var allStr: string = "" + for idx, peer in pm.wakuPeerStore.peers(): + allStr.add( + " " & $idx & ". | " & constructMultiaddrStr(peer) & " | protos: " & + $peer.protocols & " | caps: " & $peer.enr.map(getCapabilities) & "\n" + ) + return allStr - let printable = catch: - """*------------------------------------------------------------------------------------------* -| Self ({pm.switch.peerInfo}) peers: +proc logSelfPeers*(pm: PeerManager) = + let selfLighpushPeers = pm.wakuPeerStore.getPeersByProtocol(WakuLightPushCodec) + let selfRelayPeers = pm.wakuPeerStore.getPeersByProtocol(WakuRelayCodec) + let selfFilterPeers = pm.wakuPeerStore.getPeersByProtocol(WakuFilterSubscribeCodec) + let selfPxPeers = pm.wakuPeerStore.getPeersByProtocol(WakuPeerExchangeCodec) + + let printable = catch: + """*------------------------------------------------------------------------------------------* +| Self ({constructMultiaddrStr(pm.switch.peerInfo)}) peers: *------------------------------------------------------------------------------------------* | Lightpush peers({selfLighpushPeers.len()}): ${selfLighpushPeers} *------------------------------------------------------------------------------------------* | Filter peers({selfFilterPeers.len()}): ${selfFilterPeers} *------------------------------------------------------------------------------------------* | Relay peers({selfRelayPeers.len()}): ${selfRelayPeers} +*------------------------------------------------------------------------------------------* +| PX peers({selfPxPeers.len()}): ${selfPxPeers} +*------------------------------------------------------------------------------------------* +| All peers with protocol support: +{allPeers(pm)} *------------------------------------------------------------------------------------------*""".fmt() - if printable.isErr(): - echo "Error while printing statistics: " & printable.error().msg - else: - echo printable.get() - - await sleepAsync(interval) - -proc logServiceRelayPeers( - pm: PeerManager, codec: string, interval: Duration -) {.async.} = - trace "Starting service node connectivity diagnosys loop" - while true: - echo "*------------------------------------------------------------------------------------------*" - echo "| Service peer connectivity:" - let selfLighpushPeers = pm.selectPeer(codec) - if selfLighpushPeers.isSome(): - let ma = selfLighpushPeers.get().addrs[0] - var serviceIp = initTAddress(ma).valueOr: - echo "Error while parsing multiaddress: " & $error - continue - - serviceIp.port = Port(8645) - let restClient = newRestHttpClient(initTAddress($serviceIp)) - - let getPeersRes = await restClient.getPeers() - - if getPeersRes.status == 200: - let nrOfPeers = getPeersRes.data.len() - echo "Service node (@" & $ma & ") peers: " & $getPeersRes.data - else: - echo "Error while fetching service node (@" & $ma & ") peers: " & - $getPeersRes.data - else: - echo "No service node peers found" - - echo "*------------------------------------------------------------------------------------------*" - - await sleepAsync(interval) - -proc startPeriodicPeerDiagnostic*(pm: PeerManager, codec: string) {.async.} = - asyncSpawn logSelfPeersLoop(pm, chronos.seconds(60)) - # asyncSpawn logServiceRelayPeers(pm, codec, chronos.seconds(20)) + if printable.isErr(): + echo "Error while printing statistics: " & printable.error().msg + else: + echo printable.get() diff --git a/apps/liteprotocoltester/docker-compose.yml b/apps/liteprotocoltester/docker-compose.yml index 32f67fea1a..9f3bd380e5 100644 --- a/apps/liteprotocoltester/docker-compose.yml +++ b/apps/liteprotocoltester/docker-compose.yml @@ -23,6 +23,9 @@ x-test-running-conditions: &test_running_conditions MAX_MESSAGE_SIZE: ${MAX_MESSAGE_SIZE:-150Kb} START_PUBLISHING_AFTER: ${START_PUBLISHING_AFTER:-5} # seconds STANDALONE: ${STANDALONE:-1} + RECEIVER_METRICS_PORT: 8003 + PUBLISHER_METRICS_PORT: 8003 + # Services definitions services: diff --git a/apps/liteprotocoltester/filter_subscriber.nim b/apps/liteprotocoltester/filter_subscriber.nim index dca8eb880b..fa8c38a4c5 100644 --- a/apps/liteprotocoltester/filter_subscriber.nim +++ b/apps/liteprotocoltester/filter_subscriber.nim @@ -10,23 +10,33 @@ import stew/byteutils, results, serialization, - json_serialization as js, - times + json_serialization as js + import - waku/[common/logging, node/peer_manager, waku_node, waku_core, waku_filter_v2/client], + waku/[ + common/logging, + node/peer_manager, + waku_node, + waku_core, + waku_filter_v2/client, + waku_filter_v2/common, + waku_core/multiaddrstr, + ], ./tester_config, ./tester_message, - ./statistics + ./statistics, + ./diagnose_connections, + ./service_peer_management, + ./lpt_metrics + +var actualFilterPeer {.threadvar.}: RemotePeerInfo proc unsubscribe( - wakuNode: WakuNode, - filterPeer: RemotePeerInfo, - filterPubsubTopic: PubsubTopic, - filterContentTopic: ContentTopic, + wakuNode: WakuNode, filterPubsubTopic: PubsubTopic, filterContentTopic: ContentTopic ) {.async.} = notice "unsubscribing from filter" let unsubscribeRes = await wakuNode.wakuFilterClient.unsubscribe( - filterPeer, filterPubsubTopic, @[filterContentTopic] + actualFilterPeer, filterPubsubTopic, @[filterContentTopic] ) if unsubscribeRes.isErr: notice "unsubscribe request failed", err = unsubscribeRes.error @@ -34,47 +44,86 @@ proc unsubscribe( notice "unsubscribe request successful" proc maintainSubscription( - wakuNode: WakuNode, - filterPeer: RemotePeerInfo, - filterPubsubTopic: PubsubTopic, - filterContentTopic: ContentTopic, + wakuNode: WakuNode, filterPubsubTopic: PubsubTopic, filterContentTopic: ContentTopic ) {.async.} = + const maxFailedSubscribes = 3 + const maxFailedServiceNodeSwitches = 10 + var noFailedSubscribes = 0 + var noFailedServiceNodeSwitches = 0 + var isFirstPingOnNewPeer = true while true: - trace "maintaining subscription" + info "maintaining subscription at", peer = constructMultiaddrStr(actualFilterPeer) # First use filter-ping to check if we have an active subscription - let pingRes = await wakuNode.wakuFilterClient.ping(filterPeer) + let pingRes = await wakuNode.wakuFilterClient.ping(actualFilterPeer) if pingRes.isErr(): + if isFirstPingOnNewPeer == false: + # Very first ping expected to fail as we have not yet subscribed at all + lpt_receiver_lost_subscription_count.inc() + isFirstPingOnNewPeer = false # No subscription found. Let's subscribe. error "ping failed.", err = pingRes.error trace "no subscription found. Sending subscribe request" let subscribeRes = await wakuNode.filterSubscribe( - some(filterPubsubTopic), filterContentTopic, filterPeer + some(filterPubsubTopic), filterContentTopic, actualFilterPeer ) if subscribeRes.isErr(): - error "subscribe request failed. Quitting.", err = subscribeRes.error - break + noFailedSubscribes += 1 + lpt_service_peer_failure_count.inc(labelValues = ["receiver"]) + error "Subscribe request failed.", + err = subscribeRes.error, + peer = actualFilterPeer, + failCount = noFailedSubscribes + + # TODO: disconnet from failed actualFilterPeer + # asyncSpawn(wakuNode.peerManager.switch.disconnect(p)) + # wakunode.peerManager.peerStore.delete(actualFilterPeer) + + if noFailedSubscribes < maxFailedSubscribes: + await sleepAsync(2.seconds) # Wait a bit before retrying + continue + else: + let peerOpt = selectRandomServicePeer( + wakuNode.peerManager, some(actualFilterPeer), WakuFilterSubscribeCodec + ) + if peerOpt.isOk(): + actualFilterPeer = peerOpt.get() + + info "Found new peer for codec", + codec = filterPubsubTopic, peer = constructMultiaddrStr(actualFilterPeer) + + noFailedSubscribes = 0 + lpt_change_service_peer_count.inc(labelValues = ["receiver"]) + isFirstPingOnNewPeer = true + continue # try again with new peer without delay + else: + error "Failed to find new service peer. Exiting." + noFailedServiceNodeSwitches += 1 + break else: + if noFailedSubscribes > 0: + noFailedSubscribes -= 1 + notice "subscribe request successful." else: - trace "subscription found." + info "subscription is live." - await sleepAsync(chtimer.seconds(60)) # Subscription maintenance interval + await sleepAsync(30.seconds) # Subscription maintenance interval -proc setupAndSubscribe*(wakuNode: WakuNode, conf: LiteProtocolTesterConf) = +proc setupAndSubscribe*( + wakuNode: WakuNode, conf: LiteProtocolTesterConf, servicePeer: RemotePeerInfo +) = if isNil(wakuNode.wakuFilterClient): - error "WakuFilterClient not initialized" - return + # if we have not yet initialized lightpush client, then do it as the only way we can get here is + # by having a service peer discovered. + waitFor wakuNode.mountFilterClient() - info "Start receiving messages to service node using lightpush", - serviceNode = conf.serviceNode + info "Start receiving messages to service node using filter", + servicePeer = servicePeer var stats: PerPeerStatistics - - let remotePeer = parsePeerInfo(conf.serviceNode).valueOr: - error "Couldn't parse the peer info properly", error = error - return + actualFilterPeer = servicePeer let pushHandler = proc(pubsubTopic: PubsubTopic, message: WakuMessage) {.async.} = let payloadStr = string.fromBytes(message.payload) @@ -103,9 +152,7 @@ proc setupAndSubscribe*(wakuNode: WakuNode, conf: LiteProtocolTesterConf) = stats.echoStats() if conf.numMessages > 0 and waitFor stats.checkIfAllMessagesReceived(): - waitFor unsubscribe( - wakuNode, remotePeer, conf.pubsubTopics[0], conf.contentTopics[0] - ) + waitFor unsubscribe(wakuNode, conf.pubsubTopics[0], conf.contentTopics[0]) info "All messages received. Exiting." ## for gracefull shutdown through signal hooks @@ -117,6 +164,4 @@ proc setupAndSubscribe*(wakuNode: WakuNode, conf: LiteProtocolTesterConf) = discard setTimer(Moment.fromNow(interval), printStats) # Start maintaining subscription - asyncSpawn maintainSubscription( - wakuNode, remotePeer, conf.pubsubTopics[0], conf.contentTopics[0] - ) + asyncSpawn maintainSubscription(wakuNode, conf.pubsubTopics[0], conf.contentTopics[0]) diff --git a/apps/liteprotocoltester/lightpush_publisher.nim b/apps/liteprotocoltester/lightpush_publisher.nim index ddb2946ca5..739b0298d8 100644 --- a/apps/liteprotocoltester/lightpush_publisher.nim +++ b/apps/liteprotocoltester/lightpush_publisher.nim @@ -1,5 +1,5 @@ import - std/[strformat, sysrand, random, sequtils], + std/[strformat, sysrand, random, strutils, sequtils], system/ansi_c, chronicles, chronos, @@ -14,11 +14,14 @@ import node/peer_manager, waku_core, waku_lightpush/client, + waku_lightpush/common, common/utils/parse_size_units, ], ./tester_config, ./tester_message, - ./lpt_metrics + ./lpt_metrics, + ./diagnose_connections, + ./service_peer_management randomize() @@ -73,43 +76,46 @@ var failedToSendCause {.threadvar.}: Table[string, uint32] var failedToSendCount {.threadvar.}: uint32 var numMessagesToSend {.threadvar.}: uint32 var messagesSent {.threadvar.}: uint32 +var noOfServicePeerSwitches {.threadvar.}: uint32 -proc reportSentMessages() {.async.} = - while true: - await sleepAsync(chtimer.seconds(60)) - let report = catch: - """*----------------------------------------* +proc reportSentMessages() = + let report = catch: + """*----------------------------------------* +| Service Peer Switches: {noOfServicePeerSwitches:>15} | +*----------------------------------------* | Expected | Sent | Failed | |{numMessagesToSend+failedToSendCount:>11} |{messagesSent:>11} |{failedToSendCount:>11} | *----------------------------------------*""".fmt() - if report.isErr: - echo "Error while printing statistics" - else: - echo report.get() - - echo "*--------------------------------------------------------------------------------------------------*" - echo "| Failur cause | count |" - for (cause, count) in failedToSendCause.pairs: - echo fmt"|{cause:<87}|{count:>10}|" - echo "*--------------------------------------------------------------------------------------------------*" - - echo "*--------------------------------------------------------------------------------------------------*" - echo "| Index | Relayed | Hash |" - for (index, info) in sentMessages.pairs: - echo fmt"|{index:>10}|{info.relayed:<9}| {info.hash:<76}|" - echo "*--------------------------------------------------------------------------------------------------*" - # evere sent message hash should logged once - sentMessages.clear() + if report.isErr: + echo "Error while printing statistics" + else: + echo report.get() + + echo "*--------------------------------------------------------------------------------------------------*" + echo "| Failure cause | count |" + for (cause, count) in failedToSendCause.pairs: + echo fmt"|{cause:<87}|{count:>10}|" + echo "*--------------------------------------------------------------------------------------------------*" + + echo "*--------------------------------------------------------------------------------------------------*" + echo "| Index | Relayed | Hash |" + for (index, info) in sentMessages.pairs: + echo fmt"|{index+1:>10}|{info.relayed:<9}| {info.hash:<76}|" + echo "*--------------------------------------------------------------------------------------------------*" + # evere sent message hash should logged once + sentMessages.clear() proc publishMessages( wakuNode: WakuNode, + servicePeer: RemotePeerInfo, lightpushPubsubTopic: PubsubTopic, lightpushContentTopic: ContentTopic, numMessages: uint32, messageSizeRange: SizeRange, delayMessages: Duration, ) {.async.} = + var actualServicePeer = servicePeer let startedAt = getNowInNanosecondTime() var prevMessageAt = startedAt var renderMsgSize = messageSizeRange @@ -119,24 +125,35 @@ proc publishMessages( renderMsgSize.min = min(renderMsgSize.min, renderMsgSize.max) renderMsgSize.max = max(renderMsgSize.min, renderMsgSize.max) + const maxFailedPush = 3 + var noFailedPush = 0 + var noFailedServiceNodeSwitches = 0 + let selfPeerId = $wakuNode.switch.peerInfo.peerId failedToSendCount = 0 numMessagesToSend = if numMessages == 0: uint32.high else: numMessages - messagesSent = 1 + messagesSent = 0 - while numMessagesToSend >= messagesSent: + while messagesSent < numMessagesToSend: let (message, msgSize) = prepareMessage( - selfPeerId, messagesSent, numMessagesToSend, startedAt, prevMessageAt, - lightpushContentTopic, renderMsgSize, + selfPeerId, + messagesSent + 1, + numMessagesToSend, + startedAt, + prevMessageAt, + lightpushContentTopic, + renderMsgSize, + ) + let wlpRes = await wakuNode.lightpushPublish( + some(lightpushPubsubTopic), message, actualServicePeer ) - let wlpRes = await wakuNode.lightpushPublish(some(lightpushPubsubTopic), message) let msgHash = computeMessageHash(lightpushPubsubTopic, message).to0xHex if wlpRes.isOk(): sentMessages[messagesSent] = (hash: msgHash, relayed: true) notice "published message using lightpush", - index = messagesSent, + index = messagesSent + 1, count = numMessagesToSend, size = msgSize, pubsubTopic = lightpushPubsubTopic, @@ -144,6 +161,8 @@ proc publishMessages( inc(messagesSent) lpt_publisher_sent_messages_count.inc() lpt_publisher_sent_bytes.inc(amount = msgSize.int64) + if noFailedPush > 0: + noFailedPush -= 1 else: sentMessages[messagesSent] = (hash: msgHash, relayed: false) failedToSendCause.mgetOrPut(wlpRes.error, 1).inc() @@ -151,17 +170,43 @@ proc publishMessages( err = wlpRes.error, hash = msgHash inc(failedToSendCount) lpt_publisher_failed_messages_count.inc(labelValues = [wlpRes.error]) + if not wlpRes.error.toLower().contains("dial"): + # retry sending after shorter wait + await sleepAsync(2.seconds) + continue + else: + noFailedPush += 1 + lpt_service_peer_failure_count.inc(labelValues = ["publisher"]) + if noFailedPush > maxFailedPush: + info "Max push failure limit reached, Try switching peer." + let peerOpt = selectRandomServicePeer( + wakuNode.peerManager, some(actualServicePeer), WakuLightPushCodec + ) + if peerOpt.isOk(): + actualServicePeer = peerOpt.get() - await sleepAsync(delayMessages) + info "New service peer in use", + codec = lightpushPubsubTopic, + peer = constructMultiaddrStr(actualServicePeer) - waitFor reportSentMessages() + noFailedPush = 0 + noOfServicePeerSwitches += 1 + lpt_change_service_peer_count.inc(labelValues = ["publisher"]) + continue # try again with new peer without delay + else: + error "Failed to find new service peer. Exiting." + noFailedServiceNodeSwitches += 1 + break - discard c_raise(ansi_c.SIGTERM) + await sleepAsync(delayMessages) -proc setupAndPublish*(wakuNode: WakuNode, conf: LiteProtocolTesterConf) = +proc setupAndPublish*( + wakuNode: WakuNode, conf: LiteProtocolTesterConf, servicePeer: RemotePeerInfo +) = if isNil(wakuNode.wakuLightpushClient): - error "WakuFilterClient not initialized" - return + # if we have not yet initialized lightpush client, then do it as the only way we can get here is + # by having a service peer discovered. + wakuNode.mountLightPushClient() # give some time to receiver side to set up let waitTillStartTesting = conf.startPublishingAfter.seconds @@ -180,14 +225,32 @@ proc setupAndPublish*(wakuNode: WakuNode, conf: LiteProtocolTesterConf) = info "Start sending messages to service node using lightpush" sentMessages.sort(system.cmp) + + let interval = secs(60) + var printStats: CallbackFunc + + printStats = CallbackFunc( + proc(udata: pointer) {.gcsafe.} = + reportSentMessages() + + if messagesSent >= numMessagesToSend: + info "All messages are sent. Exiting." + + ## for gracefull shutdown through signal hooks + discard c_raise(ansi_c.SIGTERM) + else: + discard setTimer(Moment.fromNow(interval), printStats) + ) + + discard setTimer(Moment.fromNow(interval), printStats) + # Start maintaining subscription asyncSpawn publishMessages( wakuNode, + servicePeer, conf.pubsubTopics[0], conf.contentTopics[0], conf.numMessages, (min: parsedMinMsgSize, max: parsedMaxMsgSize), conf.delayMessages.milliseconds, ) - - asyncSpawn reportSentMessages() diff --git a/apps/liteprotocoltester/liteprotocoltester.nim b/apps/liteprotocoltester/liteprotocoltester.nim index a109a7bb0e..4d9f190d61 100644 --- a/apps/liteprotocoltester/liteprotocoltester.nim +++ b/apps/liteprotocoltester/liteprotocoltester.nim @@ -12,19 +12,26 @@ import import waku/[ + common/enr, common/logging, factory/waku, factory/external_config, + waku_node, node/health_monitor, node/waku_metrics, + node/peer_manager, waku_api/rest/builder as rest_server_builder, waku_lightpush/common, waku_filter_v2, + waku_peer_exchange/protocol, + waku_core/peers, + waku_core/multiaddrstr, ], ./tester_config, ./lightpush_publisher, ./filter_subscriber, - ./diagnose_connections + ./diagnose_connections, + ./service_peer_management logScope: topics = "liteprotocoltester main" @@ -83,13 +90,15 @@ when isMainModule: wakuConf.logLevel = conf.logLevel wakuConf.logFormat = conf.logFormat - wakuConf.staticnodes = @[conf.serviceNode] wakuConf.nat = conf.nat wakuConf.maxConnections = 500 wakuConf.restAddress = conf.restAddress wakuConf.restPort = conf.restPort wakuConf.restAllowOrigin = conf.restAllowOrigin + wakuConf.dnsAddrs = true + wakuConf.dnsAddrsNameServers = @[parseIpAddress("8.8.8.8"), parseIpAddress("1.1.1.1")] + wakuConf.pubsubTopics = conf.pubsubTopics wakuConf.contentTopics = conf.contentTopics wakuConf.clusterId = conf.clusterId @@ -97,13 +106,12 @@ when isMainModule: wakuConf.metricsServer = true wakuConf.metricsServerAddress = parseIpAddress("0.0.0.0") - wakuConf.metricsServerPort = 8003 - - if conf.testFunc == TesterFunctionality.SENDER: - wakuConf.lightpushnode = conf.serviceNode - else: - wakuConf.filterNode = conf.serviceNode + wakuConf.metricsServerPort = conf.metricsPort + # If bootstrap option is chosen we expect our clients will not mounted + # so we will mount PeerExchange manually to gather possible service peers, + # if got some we will mount the client protocols afterward. + wakuConf.peerExchange = false wakuConf.relay = false wakuConf.filter = false wakuConf.lightpush = false @@ -191,13 +199,50 @@ when isMainModule: info "Node setup complete" + var codec = WakuLightPushCodec + # mounting relevant client, for PX filter client must be mounted ahead + if conf.testFunc == TesterFunctionality.SENDER: + wakuApp.node.mountLightPushClient() + codec = WakuLightPushCodec + else: + waitFor wakuApp.node.mountFilterClient() + codec = WakuFilterSubscribeCodec + + var lookForServiceNode = false + var serviceNodePeerInfo: RemotePeerInfo + if conf.serviceNode.len == 0: + if conf.bootstrapNode.len > 0: + info "Bootstrapping with PeerExchange to gather random service node" + let futForServiceNode = pxLookupServiceNode(wakuApp.node, conf) + if not (waitFor futForServiceNode.withTimeout(20.minutes)): + error "Service node not found in time via PX" + quit(QuitFailure) + + if futForServiceNode.read().isErr(): + error "Service node for test not found via PX" + quit(QuitFailure) + + serviceNodePeerInfo = selectRandomServicePeer( + wakuApp.node.peerManager, none(RemotePeerInfo), codec + ).valueOr: + error "Service node selection failed" + quit(QuitFailure) + else: + error "No service or bootstrap node provided" + quit(QuitFailure) + else: + # support for both ENR and URI formatted service node addresses + serviceNodePeerInfo = translateToRemotePeerInfo(conf.serviceNode).valueOr: + error "failed to parse service-node", node = conf.serviceNode + quit(QuitFailure) + + info "Service node to be used", serviceNode = $serviceNodePeerInfo + + logSelfPeers(wakuApp.node.peerManager) + if conf.testFunc == TesterFunctionality.SENDER: - waitFor startPeriodicPeerDiagnostic(wakuApp.node.peerManager, WakuLightPushCodec) - setupAndPublish(wakuApp.node, conf) + setupAndPublish(wakuApp.node, conf, serviceNodePeerInfo) else: - waitFor startPeriodicPeerDiagnostic( - wakuApp.node.peerManager, WakuFilterSubscribeCodec - ) - setupAndSubscribe(wakuApp.node, conf) + setupAndSubscribe(wakuApp.node, conf, serviceNodePeerInfo) runForever() diff --git a/apps/liteprotocoltester/lpt_metrics.nim b/apps/liteprotocoltester/lpt_metrics.nim index 655ec098c9..2cdc515b65 100644 --- a/apps/liteprotocoltester/lpt_metrics.nim +++ b/apps/liteprotocoltester/lpt_metrics.nim @@ -22,9 +22,25 @@ declarePublicCounter lpt_receiver_duplicate_messages_count, declarePublicGauge lpt_receiver_distinct_duplicate_messages_count, "number of distinct duplicate messages per peer", ["peer"] +declarePublicCounter lpt_receiver_lost_subscription_count, + "number of filter service peer failed PING requests - lost subscription" + declarePublicCounter lpt_publisher_sent_messages_count, "number of messages published" declarePublicCounter lpt_publisher_failed_messages_count, "number of messages failed to publish per failure cause", ["cause"] declarePublicCounter lpt_publisher_sent_bytes, "number of total bytes sent" + +declarePublicCounter lpt_service_peer_failure_count, + "number of failure during using service peer [publisher/receiever]", ["role"] + +declarePublicCounter lpt_change_service_peer_count, + "number of times [publisher/receiver] had to change service peer", ["role"] + +declarePublicGauge lpt_px_peers, + "Number of peers PeerExchange discovered and can be dialed" + +declarePublicGauge lpt_dialed_peers, "Number of peers successfully dialed" + +declarePublicGauge lpt_dial_failures, "Number of dial failures by cause" diff --git a/apps/liteprotocoltester/monitoring/configuration/dashboards/liter-protocol-test-monitoring.json b/apps/liteprotocoltester/monitoring/configuration/dashboards/liter-protocol-test-monitoring.json index 96d574cfe0..22770e27c4 100644 --- a/apps/liteprotocoltester/monitoring/configuration/dashboards/liter-protocol-test-monitoring.json +++ b/apps/liteprotocoltester/monitoring/configuration/dashboards/liter-protocol-test-monitoring.json @@ -28,19 +28,588 @@ "x": 0, "y": 0 }, + "id": 13, + "panels": [], + "title": "Peer statistics", + "type": "row" + }, + { + "datasource": { + "default": true, + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "continuous-GrYlRd" + }, + "fieldMinMax": false, + "mappings": [], + "max": 100, + "min": 0, + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "none" + }, + "overrides": [] + }, + "gridPos": { + "h": 15, + "w": 5, + "x": 0, + "y": 1 + }, + "id": 15, + "options": { + "displayMode": "lcd", + "maxVizHeight": 300, + "minVizHeight": 16, + "minVizWidth": 8, + "namePlacement": "auto", + "orientation": "horizontal", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "showUnfilled": true, + "sizing": "auto", + "valueMode": "color" + }, + "pluginVersion": "11.2.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "disableTextWrap": false, + "editorMode": "code", + "expr": "lpt_px_peers{instance=~\".*publisher.*\"}", + "fullMetaSearch": false, + "includeNullMetadata": true, + "instant": false, + "legendFormat": "{{instance}}", + "range": true, + "refId": "A", + "useBackend": false + } + ], + "title": "Lightpush capable peers found via PX", + "type": "bargauge" + }, + { + "datasource": { + "default": true, + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "none" + }, + "overrides": [] + }, + "gridPos": { + "h": 15, + "w": 7, + "x": 5, + "y": 1 + }, + "id": 22, + "options": { + "legend": { + "calcs": [ + "lastNotNull" + ], + "displayMode": "table", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "11.2.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "disableTextWrap": false, + "editorMode": "code", + "expr": "lpt_dialed_peers{instance=~\".*publisher.*\"}", + "fullMetaSearch": false, + "hide": false, + "includeNullMetadata": true, + "instant": false, + "legendFormat": "Working filter peers {{instance}}", + "range": true, + "refId": "B", + "useBackend": false + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "disableTextWrap": false, + "editorMode": "code", + "expr": "lpt_dial_failures{instance=~\".*publisher.*\"}", + "fullMetaSearch": false, + "hide": false, + "includeNullMetadata": true, + "instant": false, + "legendFormat": "Failed to dial {{instance}}", + "range": true, + "refId": "C", + "useBackend": false + } + ], + "title": "Tested lightpush peers", + "type": "timeseries" + }, + { + "datasource": { + "default": true, + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "continuous-GrYlRd" + }, + "fieldMinMax": false, + "mappings": [], + "max": 100, + "min": 0, + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "none" + }, + "overrides": [] + }, + "gridPos": { + "h": 15, + "w": 5, + "x": 12, + "y": 1 + }, + "id": 21, + "options": { + "displayMode": "lcd", + "maxVizHeight": 300, + "minVizHeight": 16, + "minVizWidth": 8, + "namePlacement": "auto", + "orientation": "horizontal", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "showUnfilled": true, + "sizing": "auto", + "valueMode": "color" + }, + "pluginVersion": "11.2.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "disableTextWrap": false, + "editorMode": "code", + "expr": "lpt_px_peers{instance=~\".*receiver.*\"}", + "fullMetaSearch": false, + "includeNullMetadata": true, + "instant": false, + "legendFormat": "{{instance}}", + "range": true, + "refId": "A", + "useBackend": false + } + ], + "title": "Filter capable peers found via PX", + "type": "bargauge" + }, + { + "datasource": { + "default": true, + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "none" + }, + "overrides": [] + }, + "gridPos": { + "h": 15, + "w": 7, + "x": 17, + "y": 1 + }, + "id": 14, + "options": { + "legend": { + "calcs": [ + "lastNotNull" + ], + "displayMode": "table", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "11.2.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "disableTextWrap": false, + "editorMode": "code", + "expr": "lpt_dialed_peers{instance=~\".*receivernode.*\"}", + "fullMetaSearch": false, + "hide": false, + "includeNullMetadata": true, + "instant": false, + "legendFormat": "Working filter peers {{instance}}", + "range": true, + "refId": "B", + "useBackend": false + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "disableTextWrap": false, + "editorMode": "code", + "expr": "lpt_dial_failures{instance=~\".*receivernode.*\"}", + "fullMetaSearch": false, + "hide": false, + "includeNullMetadata": true, + "instant": false, + "legendFormat": "Failed to dial {{instance}}", + "range": true, + "refId": "C", + "useBackend": false + } + ], + "title": "Tested filter peers", + "type": "timeseries" + }, + { + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 16 + }, "id": 12, "title": "Test publisher monitor", "type": "row" }, { "datasource": { + "default": true, + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 23, + "gradientMode": "hue", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "normal" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "none" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 17 + }, + "id": 16, + "options": { + "legend": { + "calcs": [ + "last" + ], + "displayMode": "table", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "disableTextWrap": false, + "editorMode": "code", + "exemplar": false, + "expr": "sum by(instace) (lpt_service_peer_failure_count_total{instance=~\".*publishernode.*\", role=\"publisher\"})", + "format": "time_series", + "fullMetaSearch": false, + "includeNullMetadata": true, + "instant": false, + "legendFormat": "Push failed", + "range": true, + "refId": "A", + "useBackend": false + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", + "expr": "sum by(instace) (lpt_change_service_peer_count_total{instance=~\".*publishernode.*\", role=\"publisher\"})", + "hide": false, + "instant": false, + "legendFormat": "Peer switch", + "range": true, + "refId": "B" + } + ], + "title": "Lightpush service peer failures and switches", + "type": "timeseries" + }, + { + "datasource": { + "default": true, "type": "prometheus", "uid": "PBFA97CFB590B2093" }, "fieldConfig": { "defaults": { "color": { - "mode": "thresholds" + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 23, + "gradientMode": "hue", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "normal" + }, + "thresholdsStyle": { + "mode": "off" + } }, "mappings": [], "thresholds": { @@ -55,17 +624,117 @@ "value": 80 } ] - } + }, + "unit": "none" }, "overrides": [] }, "gridPos": { "h": 8, "w": 12, + "x": 12, + "y": 17 + }, + "id": 17, + "options": { + "legend": { + "calcs": [ + "last" + ], + "displayMode": "table", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "disableTextWrap": false, + "editorMode": "code", + "exemplar": false, + "expr": "sum by(instace) (lpt_service_peer_failure_count_total{instance=~\".*receivernode.*\", role=\"receiver\"})", + "format": "time_series", + "fullMetaSearch": false, + "includeNullMetadata": true, + "instant": false, + "legendFormat": "Subscribe failed", + "range": true, + "refId": "A", + "useBackend": false + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", + "expr": "sum by(instace) (lpt_change_service_peer_count_total{instance=~\".*receivernode.*\", role=\"receiver\"})", + "hide": false, + "instant": false, + "legendFormat": "Peer switch", + "range": true, + "refId": "B" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "code", + "expr": "sum by(instace) (lpt_receiver_lost_subscription_count_total{instance=~\".*receivernode.*\"})", + "hide": false, + "instant": false, + "legendFormat": "Subscription loss - ping fail", + "range": true, + "refId": "C" + } + ], + "title": "Filter service peer failures and switches", + "type": "timeseries" + }, + { + "datasource": { + "default": true, + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "mappings": [], + "thresholds": { + "mode": "percentage", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "orange", + "value": 70 + }, + { + "color": "red", + "value": 85 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 12, + "w": 12, "x": 0, - "y": 1 + "y": 25 }, - "id": 8, + "id": 18, "options": { "minVizHeight": 75, "minVizWidth": 75, @@ -78,10 +747,10 @@ "values": false }, "showThresholdLabels": false, - "showThresholdMarkers": true, + "showThresholdMarkers": false, "sizing": "auto" }, - "pluginVersion": "10.4.2", + "pluginVersion": "11.2.0", "targets": [ { "datasource": { @@ -89,27 +758,120 @@ "uid": "PBFA97CFB590B2093" }, "disableTextWrap": false, - "editorMode": "builder", - "expr": "lpt_receiver_sender_peer_count{instance=\"receivernode:8003\"}", + "editorMode": "code", + "expr": "count(\n group(\n last_over_time(lpt_px_peers{instance=~\".*publishernode.*\"}[24h])\n ) by (instance)\n)", "fullMetaSearch": false, "includeNullMetadata": true, "instant": false, - "legendFormat": "__auto", + "legendFormat": "Number or publishers", "range": true, "refId": "A", "useBackend": false + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "disableTextWrap": false, + "editorMode": "code", + "expr": "count(\n group(\n last_over_time(lpt_px_peers{instance=~\".*receivernode.*\"}[24h])\n ) by (instance)\n)", + "fullMetaSearch": false, + "hide": false, + "includeNullMetadata": true, + "instant": false, + "legendFormat": "Number or receivers", + "range": true, + "refId": "B", + "useBackend": false } ], - "title": "Number of publisher peers", + "title": "Number of tester nodes", "type": "gauge" }, + { + "datasource": { + "default": true, + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "none" + }, + "overrides": [] + }, + "gridPos": { + "h": 12, + "w": 12, + "x": 12, + "y": 25 + }, + "id": 8, + "options": { + "displayMode": "lcd", + "maxVizHeight": 300, + "minVizHeight": 16, + "minVizWidth": 8, + "namePlacement": "top", + "orientation": "horizontal", + "reduceOptions": { + "calcs": [ + "last" + ], + "fields": "", + "values": false + }, + "showUnfilled": true, + "sizing": "auto", + "valueMode": "color" + }, + "pluginVersion": "11.2.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "disableTextWrap": false, + "editorMode": "code", + "expr": "lpt_receiver_sender_peer_count{instance=~\".*receivernode.*\"}", + "fullMetaSearch": false, + "includeNullMetadata": true, + "instant": false, + "legendFormat": "{{instance}}", + "range": true, + "refId": "A", + "useBackend": false + } + ], + "title": "Receiver detected message from number of publisher peers", + "type": "bargauge" + }, { "collapsed": true, "gridPos": { "h": 1, "w": 24, "x": 0, - "y": 9 + "y": 37 }, "id": 11, "panels": [], @@ -118,6 +880,7 @@ }, { "datasource": { + "default": true, "type": "prometheus", "uid": "PBFA97CFB590B2093" }, @@ -133,6 +896,7 @@ "axisLabel": "", "axisPlacement": "auto", "barAlignment": 0, + "barWidthFactor": 0.6, "drawStyle": "line", "fillOpacity": 0, "gradientMode": "none", @@ -171,7 +935,8 @@ "value": 80 } ] - } + }, + "unit": "none" }, "overrides": [ { @@ -198,6 +963,10 @@ "group": "A", "mode": "normal" } + }, + { + "id": "unit", + "value": "reqps" } ] } @@ -207,13 +976,15 @@ "h": 8, "w": 12, "x": 0, - "y": 10 + "y": 38 }, "id": 1, "options": { "legend": { - "calcs": [], - "displayMode": "list", + "calcs": [ + "last" + ], + "displayMode": "table", "placement": "bottom", "showLegend": true }, @@ -229,9 +1000,9 @@ "uid": "PBFA97CFB590B2093" }, "disableTextWrap": false, - "editorMode": "builder", + "editorMode": "code", "exemplar": false, - "expr": "sum by(job) (lpt_publisher_sent_messages_count_total)", + "expr": "sum by(instace) (lpt_publisher_sent_messages_count_total{instance=~\".*publishernode.*\"})", "format": "time_series", "fullMetaSearch": false, "includeNullMetadata": true, @@ -247,9 +1018,9 @@ "uid": "PBFA97CFB590B2093" }, "disableTextWrap": false, - "editorMode": "builder", + "editorMode": "code", "exemplar": false, - "expr": "sum by(job) (rate(lpt_publisher_sent_messages_count_total[$__rate_interval]))", + "expr": "sum by(instance) (rate(lpt_publisher_sent_messages_count_total{instance=~\".*publishernode.*\"}[$__rate_interval]))", "format": "time_series", "fullMetaSearch": false, "hide": false, @@ -261,11 +1032,12 @@ "useBackend": false } ], - "title": "Publishes test messages", + "title": "Published test messages", "type": "timeseries" }, { "datasource": { + "default": true, "type": "prometheus", "uid": "PBFA97CFB590B2093" }, @@ -281,6 +1053,7 @@ "axisLabel": "", "axisPlacement": "auto", "barAlignment": 0, + "barWidthFactor": 0.6, "drawStyle": "line", "fillOpacity": 0, "gradientMode": "none", @@ -319,7 +1092,8 @@ "value": 80 } ] - } + }, + "unit": "none" }, "overrides": [ { @@ -345,6 +1119,10 @@ { "id": "custom.gradientMode", "value": "hue" + }, + { + "id": "unit", + "value": "reqps" } ] } @@ -354,13 +1132,15 @@ "h": 8, "w": 12, "x": 12, - "y": 10 + "y": 38 }, "id": 2, "options": { "legend": { - "calcs": [], - "displayMode": "list", + "calcs": [ + "max" + ], + "displayMode": "table", "placement": "bottom", "showLegend": true }, @@ -376,8 +1156,8 @@ "uid": "PBFA97CFB590B2093" }, "disableTextWrap": false, - "editorMode": "builder", - "expr": "sum by(instance) (lpt_receiver_received_messages_count_total{instance=\"receivernode:8003\"})", + "editorMode": "code", + "expr": "sum by(instance) (lpt_receiver_received_messages_count_total{instance=~\".*receivernode.*\"})", "fullMetaSearch": false, "includeNullMetadata": true, "instant": false, @@ -392,8 +1172,8 @@ "uid": "PBFA97CFB590B2093" }, "disableTextWrap": false, - "editorMode": "builder", - "expr": "sum by(instance) (rate(lpt_receiver_received_messages_count_total{instance=\"receivernode:8003\"}[$__rate_interval]))", + "editorMode": "code", + "expr": "sum by(instance) (rate(lpt_receiver_received_messages_count_total{instance=~\".*receivernode.*\"}[$__rate_interval]))", "fullMetaSearch": false, "hide": false, "includeNullMetadata": true, @@ -409,6 +1189,7 @@ }, { "datasource": { + "default": true, "type": "prometheus", "uid": "PBFA97CFB590B2093" }, @@ -425,6 +1206,7 @@ "axisLabel": "", "axisPlacement": "left", "barAlignment": 0, + "barWidthFactor": 0.6, "drawStyle": "line", "fillOpacity": 0, "gradientMode": "none", @@ -470,7 +1252,7 @@ { "matcher": { "id": "byName", - "options": "Test message transfer rate" + "options": "Send message transfer rate" }, "properties": [ { @@ -497,6 +1279,10 @@ "group": "A", "mode": "normal" } + }, + { + "id": "unit", + "value": "KiBs" } ] } @@ -506,13 +1292,15 @@ "h": 8, "w": 12, "x": 0, - "y": 18 + "y": 46 }, "id": 5, "options": { "legend": { - "calcs": [], - "displayMode": "list", + "calcs": [ + "last" + ], + "displayMode": "table", "placement": "bottom", "showLegend": true }, @@ -528,13 +1316,13 @@ "uid": "PBFA97CFB590B2093" }, "disableTextWrap": false, - "editorMode": "builder", + "editorMode": "code", "exemplar": false, - "expr": "sum by(job) (lpt_publisher_sent_bytes_total)", + "expr": "sum by(instance) (lpt_publisher_sent_bytes_total{instance=~\".*publishernode.*\"})", "fullMetaSearch": false, "includeNullMetadata": true, "instant": false, - "legendFormat": "Total received bytes", + "legendFormat": "Total sent bytes", "range": true, "refId": "A", "useBackend": false @@ -545,15 +1333,15 @@ "uid": "PBFA97CFB590B2093" }, "disableTextWrap": false, - "editorMode": "builder", + "editorMode": "code", "exemplar": false, - "expr": "sum by(job) (rate(lpt_publisher_sent_bytes_total[$__rate_interval]))", + "expr": "sum by(instance) (rate(lpt_publisher_sent_bytes_total{instance=~\".*publishernode.*\"}[$__rate_interval]))", "format": "time_series", "fullMetaSearch": false, "hide": false, "includeNullMetadata": true, "instant": false, - "legendFormat": "Test message transfer rate", + "legendFormat": "Send message transfer rate", "range": true, "refId": "B", "useBackend": false @@ -564,6 +1352,7 @@ }, { "datasource": { + "default": true, "type": "prometheus", "uid": "PBFA97CFB590B2093" }, @@ -580,6 +1369,7 @@ "axisLabel": "", "axisPlacement": "left", "barAlignment": 0, + "barWidthFactor": 0.6, "drawStyle": "line", "fillOpacity": 0, "gradientMode": "none", @@ -652,6 +1442,10 @@ "group": "A", "mode": "normal" } + }, + { + "id": "unit", + "value": "KiBs" } ] } @@ -661,13 +1455,15 @@ "h": 8, "w": 12, "x": 12, - "y": 18 + "y": 46 }, "id": 4, "options": { "legend": { - "calcs": [], - "displayMode": "list", + "calcs": [ + "last" + ], + "displayMode": "table", "placement": "bottom", "showLegend": true }, @@ -683,9 +1479,9 @@ "uid": "PBFA97CFB590B2093" }, "disableTextWrap": false, - "editorMode": "builder", + "editorMode": "code", "exemplar": false, - "expr": "sum by(instance) (lpt_receiver_received_bytes_total{instance=\"receivernode:8003\"})", + "expr": "sum by(instance) (lpt_receiver_received_bytes_total{instance=~\".*receivernode.*\"})", "fullMetaSearch": false, "includeNullMetadata": true, "instant": false, @@ -700,9 +1496,9 @@ "uid": "PBFA97CFB590B2093" }, "disableTextWrap": false, - "editorMode": "builder", + "editorMode": "code", "exemplar": false, - "expr": "sum by(instance) (rate(lpt_receiver_received_bytes_total{instance=\"receivernode:8003\"}[$__rate_interval]))", + "expr": "sum by(instance) (rate(lpt_receiver_received_bytes_total{instance=~\".*receivernode.*\"}[$__rate_interval]))", "format": "time_series", "fullMetaSearch": false, "hide": false, @@ -723,7 +1519,7 @@ "h": 1, "w": 24, "x": 0, - "y": 26 + "y": 54 }, "id": 10, "panels": [], @@ -732,6 +1528,7 @@ }, { "datasource": { + "default": true, "type": "prometheus", "uid": "PBFA97CFB590B2093" }, @@ -747,6 +1544,7 @@ "axisLabel": "", "axisPlacement": "auto", "barAlignment": 0, + "barWidthFactor": 0.6, "drawStyle": "line", "fillOpacity": 23, "gradientMode": "hue", @@ -785,7 +1583,8 @@ "value": 80 } ] - } + }, + "unit": "none" }, "overrides": [ { @@ -814,13 +1613,15 @@ "h": 8, "w": 12, "x": 0, - "y": 27 + "y": 55 }, "id": 6, "options": { "legend": { - "calcs": [], - "displayMode": "list", + "calcs": [ + "last" + ], + "displayMode": "table", "placement": "bottom", "showLegend": true }, @@ -838,12 +1639,12 @@ "disableTextWrap": false, "editorMode": "code", "exemplar": false, - "expr": "lpt_publisher_failed_messages_count_total{instance=\"publishernode:8003\"}", + "expr": "lpt_publisher_failed_messages_count_total{instance=~\".*publishernode.*\"}", "format": "time_series", "fullMetaSearch": false, "includeNullMetadata": true, "instant": false, - "legendFormat": "Failed to publish count", + "legendFormat": "{{instance}} - {{cause}}", "range": true, "refId": "A", "useBackend": false @@ -854,6 +1655,7 @@ }, { "datasource": { + "default": true, "type": "prometheus", "uid": "PBFA97CFB590B2093" }, @@ -870,6 +1672,7 @@ "axisLabel": "", "axisPlacement": "auto", "barAlignment": 0, + "barWidthFactor": 0.6, "drawStyle": "line", "fillOpacity": 0, "gradientMode": "none", @@ -908,7 +1711,8 @@ "value": 80 } ] - } + }, + "unit": "none" }, "overrides": [] }, @@ -916,13 +1720,15 @@ "h": 8, "w": 12, "x": 12, - "y": 27 + "y": 55 }, "id": 7, "options": { "legend": { - "calcs": [], - "displayMode": "list", + "calcs": [ + "last" + ], + "displayMode": "table", "placement": "bottom", "showLegend": true }, @@ -939,11 +1745,11 @@ }, "disableTextWrap": false, "editorMode": "code", - "expr": "sum by(instance) (lpt_receiver_duplicate_messages_count_total{instance=\"receivernode:8003\"})", + "expr": "lpt_receiver_duplicate_messages_count_total{instance=~\".*receivernode.*\"}", "fullMetaSearch": false, "includeNullMetadata": true, "instant": false, - "legendFormat": "Total duplicates", + "legendFormat": "Total duplicates at {{instance}}", "range": true, "refId": "A", "useBackend": false @@ -955,12 +1761,12 @@ }, "disableTextWrap": false, "editorMode": "code", - "expr": "sum by(instance) (lpt_receiver_distinct_duplicate_messages_count{instance=\"receivernode:8003\"})", + "expr": "lpt_receiver_distinct_duplicate_messages_count{instance=~\".*receivernode.*\"}", "fullMetaSearch": false, "hide": false, "includeNullMetadata": true, "instant": false, - "legendFormat": "Distinct duplicates", + "legendFormat": "Distinct duplicates at {{instance}}", "range": true, "refId": "B", "useBackend": false @@ -971,6 +1777,7 @@ }, { "datasource": { + "default": true, "type": "prometheus", "uid": "PBFA97CFB590B2093" }, @@ -987,6 +1794,7 @@ "axisLabel": "", "axisPlacement": "auto", "barAlignment": 0, + "barWidthFactor": 0.6, "drawStyle": "line", "fillOpacity": 21, "gradientMode": "none", @@ -1025,7 +1833,8 @@ "value": 80 } ] - } + }, + "unit": "none" }, "overrides": [] }, @@ -1033,14 +1842,16 @@ "h": 8, "w": 12, "x": 12, - "y": 35 + "y": 63 }, "id": 9, "options": { "legend": { - "calcs": [], - "displayMode": "list", - "placement": "right", + "calcs": [ + "lastNotNull" + ], + "displayMode": "table", + "placement": "bottom", "showLegend": true }, "tooltip": { @@ -1055,14 +1866,14 @@ "uid": "PBFA97CFB590B2093" }, "disableTextWrap": false, - "editorMode": "builder", - "expr": "lpt_receiver_missing_messages_count", + "editorMode": "code", + "expr": "lpt_receiver_missing_messages_count{instance=~\".*receivernode.*\"}", "format": "time_series", "fullMetaSearch": false, "hide": false, "includeNullMetadata": true, "instant": false, - "legendFormat": "Publisher {{peer}}", + "legendFormat": "Receiver {{instance}}:Publisher {{peer}}", "range": true, "refId": "B", "useBackend": false @@ -1072,15 +1883,62 @@ "type": "timeseries" } ], - "refresh": "5s", + "refresh": "", "schemaVersion": 39, "tags": [], "templating": { - "list": [] + "list": [ + { + "current": { + "selected": true, + "text": "lpt-runner-publishernode-1:8003", + "value": "lpt-runner-publishernode-1:8003" + }, + "definition": "label_values({instance=~\".*publishernode.*\"},instance)", + "hide": 0, + "includeAll": false, + "multi": false, + "name": "publisher", + "options": [], + "query": { + "qryType": 1, + "query": "label_values({instance=~\".*publishernode.*\"},instance)", + "refId": "PrometheusVariableQueryEditor-VariableQuery" + }, + "refresh": 1, + "regex": "", + "skipUrlSync": false, + "sort": 0, + "type": "query" + }, + { + "current": { + "selected": true, + "text": "lpt-runner-receivernode-1:8003", + "value": "lpt-runner-receivernode-1:8003" + }, + "definition": "label_values({instance=~\".*receivernode.*\"},instance)", + "hide": 0, + "includeAll": false, + "multi": false, + "name": "receiver", + "options": [], + "query": { + "qryType": 1, + "query": "label_values({instance=~\".*receivernode.*\"},instance)", + "refId": "PrometheusVariableQueryEditor-VariableQuery" + }, + "refresh": 1, + "regex": "", + "skipUrlSync": false, + "sort": 0, + "type": "query" + } + ] }, "time": { - "from": "now-15m", - "to": "now" + "from": "2024-10-02T22:07:37.000Z", + "to": "2024-10-02T22:23:21.000Z" }, "timepicker": {}, "timezone": "browser", diff --git a/apps/liteprotocoltester/monitoring/prometheus-config.yml b/apps/liteprotocoltester/monitoring/prometheus-config.yml index 7ea2d1747a..d04eaf0c60 100644 --- a/apps/liteprotocoltester/monitoring/prometheus-config.yml +++ b/apps/liteprotocoltester/monitoring/prometheus-config.yml @@ -7,12 +7,29 @@ global: scrape_configs: - job_name: "liteprotocoltester" static_configs: - - targets: ["lightpush-service:8003", - "filter-service:8003", - "liteprotocoltester-publishernode-1:8003", + - targets: ["liteprotocoltester-publishernode-1:8003", "liteprotocoltester-publishernode-2:8003", "liteprotocoltester-publishernode-3:8003", "liteprotocoltester-publishernode-4:8003", "liteprotocoltester-publishernode-5:8003", "liteprotocoltester-publishernode-6:8003", - "receivernode:8003"] + "liteprotocoltester-receivernode-1:8003", + "liteprotocoltester-receivernode-2:8003", + "liteprotocoltester-receivernode-3:8003", + "liteprotocoltester-receivernode-4:8003", + "liteprotocoltester-receivernode-5:8003", + "liteprotocoltester-receivernode-6:8003", + "publishernode:8003", + "publishernode-1:8003", + "publishernode-2:8003", + "publishernode-3:8003", + "publishernode-4:8003", + "publishernode-5:8003", + "publishernode-6:8003", + "receivernode:8003", + "receivernode-1:8003", + "receivernode-2:8003", + "receivernode-3:8003", + "receivernode-4:8003", + "receivernode-5:8003", + "receivernode-6:8003",] diff --git a/apps/liteprotocoltester/run_tester_node.sh b/apps/liteprotocoltester/run_tester_node.sh index 146eb1b25d..d278af1fcd 100755 --- a/apps/liteprotocoltester/run_tester_node.sh +++ b/apps/liteprotocoltester/run_tester_node.sh @@ -40,6 +40,16 @@ if [ -z "${SERIVCE_NODE_ADDR}" ]; then exit 1 fi +SELECTOR=$4 +if [ -z "${SELECTOR}" ] || [ "${SELECTOR}" = "SERVICE" ]; then + SERVICE_NODE_DIRECT=true +elif [ "${SELECTOR}" = "BOOTSTRAP" ]; then + SERVICE_NODE_DIRECT=false +else + echo "Invalid selector '${SELECTOR}'. Failing" + exit 1 +fi + DO_DETECT_SERVICENODE=0 if [ "${SERIVCE_NODE_ADDR}" = "servicenode" ]; then @@ -77,6 +87,12 @@ if [ -z "${SERIVCE_NODE_ADDR}" ]; then exit 1 fi +if $SERVICE_NODE_DIRECT; then + FULL_NODE=--service-node="${SERIVCE_NODE_ADDR}" +else + FULL_NODE=--bootstrap-node="${SERIVCE_NODE_ADDR}" +fi + if [ -n "${PUBSUB}" ]; then PUBSUB=--pubsub-topic="${PUBSUB}" else @@ -119,8 +135,8 @@ echo "My external IP: ${MY_EXT_IP}" exec "${BINARY_PATH}"\ --log-level=INFO\ - --service-node="${SERIVCE_NODE_ADDR}"\ --nat=extip:${MY_EXT_IP}\ + ${FULL_NODE}\ ${DELAY_MESSAGES}\ ${NUM_MESSAGES}\ ${PUBSUB}\ diff --git a/apps/liteprotocoltester/service_peer_management.nim b/apps/liteprotocoltester/service_peer_management.nim new file mode 100644 index 0000000000..6286b79132 --- /dev/null +++ b/apps/liteprotocoltester/service_peer_management.nim @@ -0,0 +1,217 @@ +{.push raises: [].} + +import + std/[options, net, sysrand, random, strformat, strutils, sequtils], + chronicles, + chronos, + metrics, + libbacktrace, + libp2p/crypto/crypto, + confutils, + libp2p/wire + +import + waku/[ + factory/external_config, + common/enr, + waku_node, + node/peer_manager, + waku_lightpush/common, + waku_relay, + waku_filter_v2, + waku_peer_exchange/protocol, + waku_core/multiaddrstr, + waku_core/topics/pubsub_topic, + waku_enr/capabilities, + waku_enr/sharding, + ], + ./tester_config, + ./diagnose_connections, + ./lpt_metrics + +logScope: + topics = "service peer mgmt" + +randomize() + +proc translateToRemotePeerInfo*(peerAddress: string): Result[RemotePeerInfo, void] = + var peerInfo: RemotePeerInfo + var enrRec: enr.Record + if enrRec.fromURI(peerAddress): + trace "Parsed ENR", enrRec = $enrRec + peerInfo = enrRec.toRemotePeerInfo().valueOr: + error "failed to convert ENR to RemotePeerInfo", error = error + return err() + else: + peerInfo = parsePeerInfo(peerAddress).valueOr: + error "failed to parse node waku peer-exchange peerId", error = error + return err() + + return ok(peerInfo) + +## To retrieve peers from PeerExchange partner and return one randomly selected one +## among the ones successfully dialed +## Note: This is kept for future use. +proc selectRandomCapablePeer*( + pm: PeerManager, codec: string, pubsubTopic: PubsubTopic +): Future[Option[RemotePeerInfo]] {.async.} = + var cap = Capabilities.Filter + if codec.contains("lightpush"): + cap = Capabilities.Lightpush + elif codec.contains("filter"): + cap = Capabilities.Filter + + var supportivePeers = pm.wakuPeerStore.getPeersByCapability(cap) + + trace "Found supportive peers count", count = supportivePeers.len() + trace "Found supportive peers", supportivePeers = $supportivePeers + if supportivePeers.len == 0: + return none(RemotePeerInfo) + + var found = none(RemotePeerInfo) + while found.isNone() and supportivePeers.len > 0: + let rndPeerIndex = rand(0 .. supportivePeers.len - 1) + let randomPeer = supportivePeers[rndPeerIndex] + + debug "Dialing random peer", + idx = $rndPeerIndex, peer = constructMultiaddrStr(randomPeer) + + supportivePeers.delete(rndPeerIndex .. rndPeerIndex) + + let connOpt = pm.dialPeer(randomPeer, codec) + if (await connOpt.withTimeout(10.seconds)): + if connOpt.value().isSome(): + found = some(randomPeer) + debug "Dialing successful", + peer = constructMultiaddrStr(randomPeer), codec = codec + else: + debug "Dialing failed", peer = constructMultiaddrStr(randomPeer), codec = codec + else: + debug "Timeout dialing service peer", + peer = constructMultiaddrStr(randomPeer), codec = codec + + return found + +# Debugging PX gathered peers connectivity +proc tryCallAllPxPeers*( + pm: PeerManager, codec: string, pubsubTopic: PubsubTopic +): Future[Option[seq[RemotePeerInfo]]] {.async.} = + var capability = Capabilities.Filter + if codec.contains("lightpush"): + capability = Capabilities.Lightpush + elif codec.contains("filter"): + capability = Capabilities.Filter + + var supportivePeers = pm.wakuPeerStore.getPeersByCapability(capability) + + lpt_px_peers.set(supportivePeers.len) + debug "Found supportive peers count", count = supportivePeers.len() + debug "Found supportive peers", supportivePeers = $supportivePeers + if supportivePeers.len == 0: + return none(seq[RemotePeerInfo]) + + var okPeers: seq[RemotePeerInfo] = @[] + + while supportivePeers.len > 0: + let rndPeerIndex = rand(0 .. supportivePeers.len - 1) + let randomPeer = supportivePeers[rndPeerIndex] + + debug "Dialing random peer", + idx = $rndPeerIndex, peer = constructMultiaddrStr(randomPeer) + + supportivePeers.delete(rndPeerIndex, rndPeerIndex) + + let connOpt = pm.dialPeer(randomPeer, codec) + if (await connOpt.withTimeout(10.seconds)): + if connOpt.value().isSome(): + okPeers.add(randomPeer) + info "Dialing successful", + peer = constructMultiaddrStr(randomPeer), codec = codec + lpt_dialed_peers.inc() + else: + lpt_dial_failures.inc() + error "Dialing failed", peer = constructMultiaddrStr(randomPeer), codec = codec + else: + lpt_dial_failures.inc() + error "Timeout dialing service peer", + peer = constructMultiaddrStr(randomPeer), codec = codec + + var okPeersStr: string = "" + for idx, peer in okPeers: + okPeersStr.add( + " " & $idx & ". | " & constructMultiaddrStr(peer) & " | protos: " & + $peer.protocols & " | caps: " & $peer.enr.map(getCapabilities) & "\n" + ) + echo "PX returned peers found callable for " & codec & " / " & $capability & ":\n" + echo okPeersStr + + return some(okPeers) + +proc pxLookupServiceNode*( + node: WakuNode, conf: LiteProtocolTesterConf +): Future[Result[bool, void]] {.async.} = + var codec: string = WakuLightPushCodec + if conf.testFunc == TesterFunctionality.RECEIVER: + codec = WakuFilterSubscribeCodec + + if node.wakuPeerExchange.isNil(): + let peerExchangeNode = translateToRemotePeerInfo(conf.bootstrapNode).valueOr: + error "Failed to parse bootstrap node - cannot use PeerExchange.", + node = conf.bootstrapNode + return err() + info "PeerExchange node", peer = constructMultiaddrStr(peerExchangeNode) + node.peerManager.addServicePeer(peerExchangeNode, WakuPeerExchangeCodec) + + try: + await node.mountPeerExchange(some(conf.clusterId)) + except CatchableError: + error "failed to mount waku peer-exchange protocol", + error = getCurrentExceptionMsg() + return err() + + var trialCount = 5 + while trialCount > 0: + let futPeers = node.fetchPeerExchangePeers(conf.reqPxPeers) + if not await futPeers.withTimeout(30.seconds): + notice "Cannot get peers from PX", round = 5 - trialCount + else: + if futPeers.value().isErr(): + info "PeerExchange reported error", error = futPeers.read().error + return err() + + if conf.testPeers: + let peersOpt = + await tryCallAllPxPeers(node.peerManager, codec, conf.pubsubTopics[0]) + if peersOpt.isSome(): + info "Found service peers for codec", + codec = codec, peer_count = peersOpt.get().len() + return ok(peersOpt.get().len > 0) + else: + let peerOpt = + await selectRandomCapablePeer(node.peerManager, codec, conf.pubsubTopics[0]) + if peerOpt.isSome(): + info "Found service peer for codec", codec = codec, peer = peerOpt.get() + return ok(true) + + await sleepAsync(5.seconds) + trialCount -= 1 + + return err() + +var alreadyUsedServicePeers {.threadvar.}: seq[RemotePeerInfo] + +## Select service peers by codec from peer store randomly. +proc selectRandomServicePeer*( + pm: PeerManager, actualPeer: Option[RemotePeerInfo], codec: string +): Result[RemotePeerInfo, void] = + if actualPeer.isSome(): + alreadyUsedServicePeers.add(actualPeer.get()) + + let supportivePeers = pm.wakuPeerStore.getPeersByProtocol(codec).filterIt( + it notin alreadyUsedServicePeers + ) + if supportivePeers.len == 0: + return err() + + let rndPeerIndex = rand(0 .. supportivePeers.len - 1) + return ok(supportivePeers[rndPeerIndex]) diff --git a/apps/liteprotocoltester/statistics.nim b/apps/liteprotocoltester/statistics.nim index db4a8a81a1..333ed04c4e 100644 --- a/apps/liteprotocoltester/statistics.nim +++ b/apps/liteprotocoltester/statistics.nim @@ -1,7 +1,7 @@ {.push raises: [].} import - std/[sets, tables, strutils, sequtils, options, strformat], + std/[sets, tables, sequtils, options, strformat], chronos/timer as chtimer, chronicles, chronos, @@ -290,6 +290,6 @@ proc checkIfAllMessagesReceived*(self: PerPeerStatistics): Future[bool] {.async. shallWait = true if shallWait: - await sleepAsync(chtimer.seconds(20)) + await sleepAsync(20.seconds) return true diff --git a/apps/liteprotocoltester/tester_config.nim b/apps/liteprotocoltester/tester_config.nim index 54221e7950..b21bf44f68 100644 --- a/apps/liteprotocoltester/tester_config.nim +++ b/apps/liteprotocoltester/tester_config.nim @@ -1,8 +1,6 @@ import - std/[strutils, strformat], results, chronos, - regex, confutils, confutils/defs, confutils/std/net, @@ -11,9 +9,8 @@ import libp2p/crypto/crypto, libp2p/crypto/secp, libp2p/multiaddress, - nimcrypto/utils, - secp256k1, - json + secp256k1 + import waku/[ common/confutils/envvar/defs as confEnvvarDefs, @@ -58,8 +55,16 @@ type LiteProtocolTesterConf* = object .}: logging.LogFormat ## Test configuration - servicenode* {.desc: "Peer multiaddr of the service node.", name: "service-node".}: - string + serviceNode* {. + desc: "Peer multiaddr of the service node.", defaultValue: "", name: "service-node" + .}: string + + bootstrapNode* {. + desc: + "Peer multiaddr of the bootstrap node. If `service-node` not set, it is used to retrieve potential service nodes of the network.", + defaultValue: "", + name: "bootstrap-node" + .}: string nat* {. desc: @@ -135,6 +140,18 @@ type LiteProtocolTesterConf* = object name: "rest-address" .}: IpAddress + testPeers* {. + desc: "Run dial test on gathered PeerExchange peers.", + defaultValue: true, + name: "test-peers" + .}: bool + + reqPxPeers* {. + desc: "Number of peers to request on PeerExchange.", + defaultValue: 100, + name: "req-px-peers" + .}: uint16 + restPort* {. desc: "Listening port of the REST HTTP server.", defaultValue: 8654, @@ -150,6 +167,12 @@ type LiteProtocolTesterConf* = object name: "rest-allow-origin" .}: seq[string] + metricsPort* {. + desc: "Listening port of the Metrics HTTP server.", + defaultValue: 8003, + name: "metrics-port" + .}: uint16 + {.push warning[ProveInit]: off.} proc load*(T: type LiteProtocolTesterConf, version = ""): ConfResult[T] = diff --git a/ci/Jenkinsfile.lpt b/ci/Jenkinsfile.lpt new file mode 100644 index 0000000000..9e1357e5b4 --- /dev/null +++ b/ci/Jenkinsfile.lpt @@ -0,0 +1,93 @@ +#!/usr/bin/env groovy +library 'status-jenkins-lib@v1.8.17' + +pipeline { + agent { label 'linux' } + + options { + timestamps() + timeout(time: 20, unit: 'MINUTES') + buildDiscarder(logRotator( + numToKeepStr: '10', + daysToKeepStr: '30', + )) + } + + parameters { + string( + name: 'IMAGE_TAG', + description: 'Name of Docker tag to push. Optional Parameter.', + defaultValue: 'latest' + ) + string( + name: 'IMAGE_NAME', + description: 'Name of Docker image to push.', + defaultValue: params.IMAGE_NAME ?: 'wakuorg/liteprotocoltester', + ) + string( + name: 'DOCKER_CRED', + description: 'Name of Docker Registry credential.', + defaultValue: params.DOCKER_CRED ?: 'harbor-telemetry-robot', + ) + string( + name: 'DOCKER_REGISTRY', + description: 'URL of the Docker Registry', + defaultValue: params.DOCKER_REGISTRY ?: 'harbor.status.im' + ) + string( + name: 'NIMFLAGS', + description: 'Flags for Nim compilation.', + defaultValue: params.NIMFLAGS ?: [ + '--colors:off', + '-d:disableMarchNative', + '-d:chronicles_colors:none', + '-d:insecure', + ].join(' ') + ) + choice( + name: "LOWEST_LOG_LEVEL_ALLOWED", + choices: ['TRACE', 'DEGUG', 'INFO', 'NOTICE', 'WARN', 'ERROR', 'FATAL'], + description: "Defines the log level, which will be available at runtime (Chronicles log level)" + ) + } + + stages { + stage('Build') { + steps { script { + image = docker.build( + "${DOCKER_REGISTRY}/${params.IMAGE_NAME}:${params.IMAGE_TAG ?: env.GIT_COMMIT.take(8)}", + "--label=commit='${git.commit()}' " + + "--label=version='${git.describe('--tags')}' " + + "--build-arg=MAKE_TARGET='liteprotocoltester' " + + "--build-arg=NIMFLAGS='${params.NIMFLAGS}' " + + "--build-arg=LOG_LEVEL='${params.LOWEST_LOG_LEVEL_ALLOWED}' " + + "--file=apps/liteprotocoltester/Dockerfile.liteprotocoltester.compile " + + " ." + ) + } } + } + + stage('Check') { + steps { script { + image.inside('--entrypoint=""') { c -> + sh '/usr/bin/liteprotocoltester --version' + } + } } + } + + stage('Push') { + when { expression { params.IMAGE_TAG != '' } } + steps { script { + withDockerRegistry([ + credentialsId: params.DOCKER_CRED, url: "https://${DOCKER_REGISTRY}" + ]) { + image.push(params.IMAGE_TAG) + } + } } + } + } // stages + + post { + cleanup { cleanWs() } + } // post +} // pipeline diff --git a/waku/factory/external_config.nim b/waku/factory/external_config.nim index 2446352c10..904dee509c 100644 --- a/waku/factory/external_config.nim +++ b/waku/factory/external_config.nim @@ -160,7 +160,7 @@ type WakuNodeConf* = object .}: uint16 agentString* {. - defaultValue: "nwaku-" & git_version, + defaultValue: "nwaku-" & external_config.git_version, desc: "Node agent string which is used as identifier in network", name: "agent-string" .}: string diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 060237a9dc..a91ad874d8 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -1072,20 +1072,22 @@ proc lightpushPublish*( target_peer_id = peer.peerId, msg_hash = msgHash return await node.wakuLightPush.handleSelfLightPushRequest(pubsubTopic, message) + try: + if pubsubTopic.isSome(): + return await internalPublish(node, pubsubTopic.get(), message, peer) - if pubsubTopic.isSome(): - return await internalPublish(node, pubsubTopic.get(), message, peer) - - let topicMapRes = node.wakuSharding.parseSharding(pubsubTopic, message.contentTopic) + let topicMapRes = node.wakuSharding.parseSharding(pubsubTopic, message.contentTopic) - let topicMap = - if topicMapRes.isErr(): - return err(topicMapRes.error) - else: - topicMapRes.get() + let topicMap = + if topicMapRes.isErr(): + return err(topicMapRes.error) + else: + topicMapRes.get() - for pubsub, _ in topicMap.pairs: # There's only one pair anyway - return await internalPublish(node, $pubsub, message, peer) + for pubsub, _ in topicMap.pairs: # There's only one pair anyway + return await internalPublish(node, $pubsub, message, peer) + except CatchableError: + return err(getCurrentExceptionMsg()) # TODO: Move to application module (e.g., wakunode2.nim) proc lightpushPublish*(