Skip to content

Commit

Permalink
Merge pull request #3045 from freimair/monitor-bsqblocks
Browse files Browse the repository at this point in the history
Monitor fixes
  • Loading branch information
sqrrm authored Aug 9, 2019
2 parents 89072d9 + bc73844 commit b1ebe07
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 130 deletions.
16 changes: 10 additions & 6 deletions monitor/src/main/java/bisq/monitor/Metric.java
Original file line number Diff line number Diff line change
Expand Up @@ -119,13 +119,17 @@ else if (!configuration.containsKey(INTERVAL))

@Override
public void run() {
Thread.currentThread().setName("Metric: " + getName());
try {
Thread.currentThread().setName("Metric: " + getName());

// execute all the things
synchronized (this) {
log.info("{} started", getName());
execute();
log.info("{} done", getName());
// execute all the things
synchronized (this) {
log.info("{} started", getName());
execute();
log.info("{} done", getName());
}
} catch(Throwable e) {
log.error("A metric misbehaved!", e);
}
}

Expand Down
1 change: 0 additions & 1 deletion monitor/src/main/java/bisq/monitor/metric/MarketStats.java
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ protected void execute() {
}
amount.find();
timestamp.find();
System.err.println(getName() + ".volume." + market.group(1) + " " + amount.group(1) + " " + timestamp.group(1).substring(0, timestamp.group(1).length() - 3));
reporter.report("volume." + market.group(1), amount.group(1), timestamp.group(1), getName());
});
} catch (IllegalStateException ignore) {
Expand Down
165 changes: 47 additions & 118 deletions monitor/src/main/java/bisq/monitor/metric/P2PRoundTripTime.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,166 +17,95 @@

package bisq.monitor.metric;

import bisq.monitor.AvailableTor;
import bisq.monitor.Metric;
import bisq.monitor.Monitor;
import bisq.monitor.OnionParser;
import bisq.monitor.Reporter;
import bisq.monitor.StatisticsHelper;
import bisq.monitor.ThreadGate;

import bisq.core.proto.network.CoreNetworkProtoResolver;

import bisq.network.p2p.CloseConnectionMessage;
import bisq.network.p2p.NodeAddress;
import bisq.network.p2p.network.CloseConnectionReason;
import bisq.network.p2p.network.Connection;
import bisq.network.p2p.network.MessageListener;
import bisq.network.p2p.network.NetworkNode;
import bisq.network.p2p.network.SetupListener;
import bisq.network.p2p.network.TorNetworkNode;
import bisq.network.p2p.peers.keepalive.messages.Ping;
import bisq.network.p2p.peers.keepalive.messages.Pong;

import bisq.common.proto.network.NetworkEnvelope;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.SettableFuture;

import java.io.File;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

import lombok.extern.slf4j.Slf4j;
import static com.google.common.base.Preconditions.checkNotNull;

import org.jetbrains.annotations.NotNull;

@Slf4j
public class P2PRoundTripTime extends Metric implements MessageListener, SetupListener {
public class P2PRoundTripTime extends P2PSeedNodeSnapshotBase {

private static final String SAMPLE_SIZE = "run.sampleSize";
private static final String HOSTS = "run.hosts";
private static final String TOR_PROXY_PORT = "run.torProxyPort";
private NetworkNode networkNode;
private final File torHiddenServiceDir = new File("metric_" + getName());
private int nonce;
private long start;
private List<Long> samples;
private final ThreadGate gate = new ThreadGate();
private final ThreadGate hsReady = new ThreadGate();
private Map<Integer, Long> sentAt = new HashMap<>();
private Map<NodeAddress, Statistics> measurements = new HashMap<>();

public P2PRoundTripTime(Reporter reporter) {
super(reporter);
}

@Override
protected void execute() {

if (null == networkNode) {
// close the gate
hsReady.engage();
/**
* Use a counter to do statistics.
*/
private class Statistics {

networkNode = new TorNetworkNode(Integer.parseInt(configuration.getProperty(TOR_PROXY_PORT, "9052")),
new CoreNetworkProtoResolver(), false,
new AvailableTor(Monitor.TOR_WORKING_DIR, torHiddenServiceDir.getName()));
networkNode.start(this);
private final List<Long> samples = new ArrayList<>();

// wait for the gate to be reopened
hsReady.await();
public synchronized void log(Object message) {
Pong pong = (Pong) message;
Long start = sentAt.get(pong.getRequestNonce());
if(start != null)
samples.add(System.currentTimeMillis() - start);
}

// for each configured host
for (String current : configuration.getProperty(HOSTS, "").split(",")) {
try {
// parse Url
NodeAddress target = OnionParser.getNodeAddress(current);

// init sample bucket
samples = new ArrayList<>();

while (samples.size() < Integer.parseInt(configuration.getProperty(SAMPLE_SIZE, "1"))) {
// so we do not get disconnected due to DoS protection mechanisms
Thread.sleep(200);

nonce = new Random().nextInt();

// close the gate
gate.engage();

start = System.currentTimeMillis();
SettableFuture<Connection> future = networkNode.sendMessage(target, new Ping(nonce, 42));

Futures.addCallback(future, new FutureCallback<>() {
@Override
public void onSuccess(Connection connection) {
connection.addMessageListener(P2PRoundTripTime.this);
}

@Override
public void onFailure(@NotNull Throwable throwable) {
gate.proceed();
log.error("Sending ping failed. That is expected if the peer is offline.\n\tException="
+ throwable.getMessage());
}
});

// wait for the gate to open again
gate.await();

// remove the message listener so we do not get messages we are not interested in anymore
// (especially relevant when gate.await() times out)
future.get().removeMessageListener(this);
}

// report
reporter.report(StatisticsHelper.process(samples),
getName() + "." + OnionParser.prettyPrint(target));
} catch (Exception e) {
gate.proceed(); // release the gate on error
e.printStackTrace();
}
public List<Long> values() {
return samples;
}
}

@Override
public void onMessage(NetworkEnvelope networkEnvelope, Connection connection) {
if (networkEnvelope instanceof Pong) {
Pong pong = (Pong) networkEnvelope;
if (pong.getRequestNonce() == nonce) {
samples.add(System.currentTimeMillis() - start);
} else {
log.warn("Nonce not matching. That should never happen.\n\t" +
"We drop that message. nonce={} / requestNonce={}",
nonce, pong.getRequestNonce());
}
protected List<NetworkEnvelope> getRequests() {
List<NetworkEnvelope> result = new ArrayList<>();

connection.shutDown(CloseConnectionReason.APP_SHUT_DOWN);
// open the gate
gate.proceed();
} else if (networkEnvelope instanceof CloseConnectionMessage) {
gate.unlock();
} else {
log.warn("Got a message of type <{}>, expected <Pong>", networkEnvelope.getClass().getSimpleName());
}
}
Random random = new Random();
for (int i = 0; i < Integer.parseInt(configuration.getProperty(SAMPLE_SIZE, "1")); i++)
result.add(new Ping(random.nextInt(), 42));

@Override
public void onTorNodeReady() {
return result;
}

@Override
public void onHiddenServicePublished() {
hsReady.proceed();
protected void aboutToSend(NetworkEnvelope message) {
sentAt.put(((Ping) message).getNonce(), System.currentTimeMillis());
}

@Override
public void onSetupFailed(Throwable throwable) {
protected boolean treatMessage(NetworkEnvelope networkEnvelope, Connection connection) {
if (networkEnvelope instanceof Pong) {
checkNotNull(connection.getPeersNodeAddressProperty(),
"although the property is nullable, we need it to not be null");

measurements.putIfAbsent(connection.getPeersNodeAddressProperty().getValue(), new Statistics());

measurements.get(connection.getPeersNodeAddressProperty().getValue()).log(networkEnvelope);

connection.shutDown(CloseConnectionReason.APP_SHUT_DOWN);
return true;
}
return false;
}

@Override
public void onRequestCustomBridges() {
void report() {
// report
measurements.forEach(((nodeAddress, samples) ->
reporter.report(StatisticsHelper.process(samples.values()),
getName() + "." + OnionParser.prettyPrint(nodeAddress))
));
// clean up for next round
measurements = new HashMap<>();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,8 @@ void report() {
try {
report.put(OnionParser.prettyPrint(host) + ".relativeNumberOfMessages." + messageType,
String.valueOf(((Counter) count).value() - referenceValues.get(messageType).value()));
} catch (MalformedURLException ignore) {
log.error("we should never got here");
} catch (MalformedURLException | NullPointerException ignore) {
log.error("we should never have gotten here", ignore);
}
});
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,21 +118,20 @@ protected void send(NetworkNode networkNode, NetworkEnvelope message) {
NodeAddress target = OnionParser.getNodeAddress(current);

// do the data request
aboutToSend(message);
SettableFuture<Connection> future = networkNode.sendMessage(target, message);

Futures.addCallback(future, new FutureCallback<>() {
@Override
public void onSuccess(Connection connection) {
connection.removeMessageListener(P2PSeedNodeSnapshotBase.this);
connection.addMessageListener(P2PSeedNodeSnapshotBase.this);
}

@Override
public void onFailure(@NotNull Throwable throwable) {
gate.proceed();
log.error(
"Sending PreliminaryDataRequest failed. That is expected if the peer is offline.\n\tException="
+ throwable.getMessage());
"Sending {} failed. That is expected if the peer is offline.\n\tException={}", message.getClass().getSimpleName(), throwable.getMessage());
}
});

Expand All @@ -153,6 +152,8 @@ public void onFailure(@NotNull Throwable throwable) {
gate.await();
}

protected void aboutToSend(NetworkEnvelope message) { };

/**
* Report all the stuff. Uses the configured reporter directly.
*/
Expand All @@ -168,6 +169,7 @@ public void onMessage(NetworkEnvelope networkEnvelope, Connection connection) {
log.warn("Got an unexpected message of type <{}>",
networkEnvelope.getClass().getSimpleName());
}
connection.removeMessageListener(this);
}

abstract protected boolean treatMessage(NetworkEnvelope networkEnvelope, Connection connection);
Expand Down

0 comments on commit b1ebe07

Please sign in to comment.