diff --git a/monitor/src/main/java/bisq/monitor/Metric.java b/monitor/src/main/java/bisq/monitor/Metric.java index 7ef00f9a397..342980602f6 100644 --- a/monitor/src/main/java/bisq/monitor/Metric.java +++ b/monitor/src/main/java/bisq/monitor/Metric.java @@ -119,13 +119,17 @@ else if (!configuration.containsKey(INTERVAL)) @Override public void run() { - Thread.currentThread().setName("Metric: " + getName()); + try { + Thread.currentThread().setName("Metric: " + getName()); - // execute all the things - synchronized (this) { - log.info("{} started", getName()); - execute(); - log.info("{} done", getName()); + // execute all the things + synchronized (this) { + log.info("{} started", getName()); + execute(); + log.info("{} done", getName()); + } + } catch(Throwable e) { + log.error("A metric misbehaved!", e); } } diff --git a/monitor/src/main/java/bisq/monitor/metric/MarketStats.java b/monitor/src/main/java/bisq/monitor/metric/MarketStats.java index acee92f92fd..b17e78ee123 100644 --- a/monitor/src/main/java/bisq/monitor/metric/MarketStats.java +++ b/monitor/src/main/java/bisq/monitor/metric/MarketStats.java @@ -103,7 +103,6 @@ protected void execute() { } amount.find(); timestamp.find(); - System.err.println(getName() + ".volume." + market.group(1) + " " + amount.group(1) + " " + timestamp.group(1).substring(0, timestamp.group(1).length() - 3)); reporter.report("volume." + market.group(1), amount.group(1), timestamp.group(1), getName()); }); } catch (IllegalStateException ignore) { diff --git a/monitor/src/main/java/bisq/monitor/metric/P2PRoundTripTime.java b/monitor/src/main/java/bisq/monitor/metric/P2PRoundTripTime.java index 0cd7ff75951..a0c1fc0a96a 100644 --- a/monitor/src/main/java/bisq/monitor/metric/P2PRoundTripTime.java +++ b/monitor/src/main/java/bisq/monitor/metric/P2PRoundTripTime.java @@ -17,166 +17,95 @@ package bisq.monitor.metric; -import bisq.monitor.AvailableTor; -import bisq.monitor.Metric; -import bisq.monitor.Monitor; import bisq.monitor.OnionParser; import bisq.monitor.Reporter; import bisq.monitor.StatisticsHelper; -import bisq.monitor.ThreadGate; -import bisq.core.proto.network.CoreNetworkProtoResolver; - -import bisq.network.p2p.CloseConnectionMessage; import bisq.network.p2p.NodeAddress; import bisq.network.p2p.network.CloseConnectionReason; import bisq.network.p2p.network.Connection; -import bisq.network.p2p.network.MessageListener; -import bisq.network.p2p.network.NetworkNode; -import bisq.network.p2p.network.SetupListener; -import bisq.network.p2p.network.TorNetworkNode; import bisq.network.p2p.peers.keepalive.messages.Ping; import bisq.network.p2p.peers.keepalive.messages.Pong; import bisq.common.proto.network.NetworkEnvelope; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.SettableFuture; - -import java.io.File; - import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Random; -import lombok.extern.slf4j.Slf4j; +import static com.google.common.base.Preconditions.checkNotNull; -import org.jetbrains.annotations.NotNull; - -@Slf4j -public class P2PRoundTripTime extends Metric implements MessageListener, SetupListener { +public class P2PRoundTripTime extends P2PSeedNodeSnapshotBase { private static final String SAMPLE_SIZE = "run.sampleSize"; - private static final String HOSTS = "run.hosts"; - private static final String TOR_PROXY_PORT = "run.torProxyPort"; - private NetworkNode networkNode; - private final File torHiddenServiceDir = new File("metric_" + getName()); - private int nonce; - private long start; - private List samples; - private final ThreadGate gate = new ThreadGate(); - private final ThreadGate hsReady = new ThreadGate(); + private Map sentAt = new HashMap<>(); + private Map measurements = new HashMap<>(); public P2PRoundTripTime(Reporter reporter) { super(reporter); } - @Override - protected void execute() { - - if (null == networkNode) { - // close the gate - hsReady.engage(); + /** + * Use a counter to do statistics. + */ + private class Statistics { - networkNode = new TorNetworkNode(Integer.parseInt(configuration.getProperty(TOR_PROXY_PORT, "9052")), - new CoreNetworkProtoResolver(), false, - new AvailableTor(Monitor.TOR_WORKING_DIR, torHiddenServiceDir.getName())); - networkNode.start(this); + private final List samples = new ArrayList<>(); - // wait for the gate to be reopened - hsReady.await(); + public synchronized void log(Object message) { + Pong pong = (Pong) message; + Long start = sentAt.get(pong.getRequestNonce()); + if(start != null) + samples.add(System.currentTimeMillis() - start); } - // for each configured host - for (String current : configuration.getProperty(HOSTS, "").split(",")) { - try { - // parse Url - NodeAddress target = OnionParser.getNodeAddress(current); - - // init sample bucket - samples = new ArrayList<>(); - - while (samples.size() < Integer.parseInt(configuration.getProperty(SAMPLE_SIZE, "1"))) { - // so we do not get disconnected due to DoS protection mechanisms - Thread.sleep(200); - - nonce = new Random().nextInt(); - - // close the gate - gate.engage(); - - start = System.currentTimeMillis(); - SettableFuture future = networkNode.sendMessage(target, new Ping(nonce, 42)); - - Futures.addCallback(future, new FutureCallback<>() { - @Override - public void onSuccess(Connection connection) { - connection.addMessageListener(P2PRoundTripTime.this); - } - - @Override - public void onFailure(@NotNull Throwable throwable) { - gate.proceed(); - log.error("Sending ping failed. That is expected if the peer is offline.\n\tException=" - + throwable.getMessage()); - } - }); - - // wait for the gate to open again - gate.await(); - - // remove the message listener so we do not get messages we are not interested in anymore - // (especially relevant when gate.await() times out) - future.get().removeMessageListener(this); - } - - // report - reporter.report(StatisticsHelper.process(samples), - getName() + "." + OnionParser.prettyPrint(target)); - } catch (Exception e) { - gate.proceed(); // release the gate on error - e.printStackTrace(); - } + public List values() { + return samples; } } @Override - public void onMessage(NetworkEnvelope networkEnvelope, Connection connection) { - if (networkEnvelope instanceof Pong) { - Pong pong = (Pong) networkEnvelope; - if (pong.getRequestNonce() == nonce) { - samples.add(System.currentTimeMillis() - start); - } else { - log.warn("Nonce not matching. That should never happen.\n\t" + - "We drop that message. nonce={} / requestNonce={}", - nonce, pong.getRequestNonce()); - } + protected List getRequests() { + List result = new ArrayList<>(); - connection.shutDown(CloseConnectionReason.APP_SHUT_DOWN); - // open the gate - gate.proceed(); - } else if (networkEnvelope instanceof CloseConnectionMessage) { - gate.unlock(); - } else { - log.warn("Got a message of type <{}>, expected ", networkEnvelope.getClass().getSimpleName()); - } - } + Random random = new Random(); + for (int i = 0; i < Integer.parseInt(configuration.getProperty(SAMPLE_SIZE, "1")); i++) + result.add(new Ping(random.nextInt(), 42)); - @Override - public void onTorNodeReady() { + return result; } @Override - public void onHiddenServicePublished() { - hsReady.proceed(); + protected void aboutToSend(NetworkEnvelope message) { + sentAt.put(((Ping) message).getNonce(), System.currentTimeMillis()); } @Override - public void onSetupFailed(Throwable throwable) { + protected boolean treatMessage(NetworkEnvelope networkEnvelope, Connection connection) { + if (networkEnvelope instanceof Pong) { + checkNotNull(connection.getPeersNodeAddressProperty(), + "although the property is nullable, we need it to not be null"); + + measurements.putIfAbsent(connection.getPeersNodeAddressProperty().getValue(), new Statistics()); + + measurements.get(connection.getPeersNodeAddressProperty().getValue()).log(networkEnvelope); + + connection.shutDown(CloseConnectionReason.APP_SHUT_DOWN); + return true; + } + return false; } @Override - public void onRequestCustomBridges() { + void report() { + // report + measurements.forEach(((nodeAddress, samples) -> + reporter.report(StatisticsHelper.process(samples.values()), + getName() + "." + OnionParser.prettyPrint(nodeAddress)) + )); + // clean up for next round + measurements = new HashMap<>(); } } diff --git a/monitor/src/main/java/bisq/monitor/metric/P2PSeedNodeSnapshot.java b/monitor/src/main/java/bisq/monitor/metric/P2PSeedNodeSnapshot.java index 2dbc6b68b31..02fa2f5e248 100644 --- a/monitor/src/main/java/bisq/monitor/metric/P2PSeedNodeSnapshot.java +++ b/monitor/src/main/java/bisq/monitor/metric/P2PSeedNodeSnapshot.java @@ -190,8 +190,8 @@ void report() { try { report.put(OnionParser.prettyPrint(host) + ".relativeNumberOfMessages." + messageType, String.valueOf(((Counter) count).value() - referenceValues.get(messageType).value())); - } catch (MalformedURLException ignore) { - log.error("we should never got here"); + } catch (MalformedURLException | NullPointerException ignore) { + log.error("we should never have gotten here", ignore); } }); try { diff --git a/monitor/src/main/java/bisq/monitor/metric/P2PSeedNodeSnapshotBase.java b/monitor/src/main/java/bisq/monitor/metric/P2PSeedNodeSnapshotBase.java index 84733aed460..d379fb2fde8 100644 --- a/monitor/src/main/java/bisq/monitor/metric/P2PSeedNodeSnapshotBase.java +++ b/monitor/src/main/java/bisq/monitor/metric/P2PSeedNodeSnapshotBase.java @@ -118,12 +118,12 @@ protected void send(NetworkNode networkNode, NetworkEnvelope message) { NodeAddress target = OnionParser.getNodeAddress(current); // do the data request + aboutToSend(message); SettableFuture future = networkNode.sendMessage(target, message); Futures.addCallback(future, new FutureCallback<>() { @Override public void onSuccess(Connection connection) { - connection.removeMessageListener(P2PSeedNodeSnapshotBase.this); connection.addMessageListener(P2PSeedNodeSnapshotBase.this); } @@ -131,8 +131,7 @@ public void onSuccess(Connection connection) { public void onFailure(@NotNull Throwable throwable) { gate.proceed(); log.error( - "Sending PreliminaryDataRequest failed. That is expected if the peer is offline.\n\tException=" - + throwable.getMessage()); + "Sending {} failed. That is expected if the peer is offline.\n\tException={}", message.getClass().getSimpleName(), throwable.getMessage()); } }); @@ -153,6 +152,8 @@ public void onFailure(@NotNull Throwable throwable) { gate.await(); } + protected void aboutToSend(NetworkEnvelope message) { }; + /** * Report all the stuff. Uses the configured reporter directly. */ @@ -168,6 +169,7 @@ public void onMessage(NetworkEnvelope networkEnvelope, Connection connection) { log.warn("Got an unexpected message of type <{}>", networkEnvelope.getClass().getSimpleName()); } + connection.removeMessageListener(this); } abstract protected boolean treatMessage(NetworkEnvelope networkEnvelope, Connection connection);