Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Monitor fixes #3045

Merged
merged 8 commits into from
Aug 9, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions monitor/src/main/java/bisq/monitor/Metric.java
Original file line number Diff line number Diff line change
Expand Up @@ -119,13 +119,17 @@ else if (!configuration.containsKey(INTERVAL))

@Override
public void run() {
Thread.currentThread().setName("Metric: " + getName());
try {
Thread.currentThread().setName("Metric: " + getName());

// execute all the things
synchronized (this) {
log.info("{} started", getName());
execute();
log.info("{} done", getName());
// execute all the things
synchronized (this) {
log.info("{} started", getName());
execute();
log.info("{} done", getName());
}
} catch(Throwable e) {
log.error("A metric misbehaved!", e);
}
}

Expand Down
1 change: 0 additions & 1 deletion monitor/src/main/java/bisq/monitor/metric/MarketStats.java
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ protected void execute() {
}
amount.find();
timestamp.find();
System.err.println(getName() + ".volume." + market.group(1) + " " + amount.group(1) + " " + timestamp.group(1).substring(0, timestamp.group(1).length() - 3));
reporter.report("volume." + market.group(1), amount.group(1), timestamp.group(1), getName());
});
} catch (IllegalStateException ignore) {
Expand Down
165 changes: 47 additions & 118 deletions monitor/src/main/java/bisq/monitor/metric/P2PRoundTripTime.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,166 +17,95 @@

package bisq.monitor.metric;

import bisq.monitor.AvailableTor;
import bisq.monitor.Metric;
import bisq.monitor.Monitor;
import bisq.monitor.OnionParser;
import bisq.monitor.Reporter;
import bisq.monitor.StatisticsHelper;
import bisq.monitor.ThreadGate;

import bisq.core.proto.network.CoreNetworkProtoResolver;

import bisq.network.p2p.CloseConnectionMessage;
import bisq.network.p2p.NodeAddress;
import bisq.network.p2p.network.CloseConnectionReason;
import bisq.network.p2p.network.Connection;
import bisq.network.p2p.network.MessageListener;
import bisq.network.p2p.network.NetworkNode;
import bisq.network.p2p.network.SetupListener;
import bisq.network.p2p.network.TorNetworkNode;
import bisq.network.p2p.peers.keepalive.messages.Ping;
import bisq.network.p2p.peers.keepalive.messages.Pong;

import bisq.common.proto.network.NetworkEnvelope;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.SettableFuture;

import java.io.File;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

import lombok.extern.slf4j.Slf4j;
import static com.google.common.base.Preconditions.checkNotNull;

import org.jetbrains.annotations.NotNull;

@Slf4j
public class P2PRoundTripTime extends Metric implements MessageListener, SetupListener {
public class P2PRoundTripTime extends P2PSeedNodeSnapshotBase {

private static final String SAMPLE_SIZE = "run.sampleSize";
private static final String HOSTS = "run.hosts";
private static final String TOR_PROXY_PORT = "run.torProxyPort";
private NetworkNode networkNode;
private final File torHiddenServiceDir = new File("metric_" + getName());
private int nonce;
private long start;
private List<Long> samples;
private final ThreadGate gate = new ThreadGate();
private final ThreadGate hsReady = new ThreadGate();
private Map<Integer, Long> sentAt = new HashMap<>();
private Map<NodeAddress, Statistics> measurements = new HashMap<>();

public P2PRoundTripTime(Reporter reporter) {
super(reporter);
}

@Override
protected void execute() {

if (null == networkNode) {
// close the gate
hsReady.engage();
/**
* Use a counter to do statistics.
*/
private class Statistics {

networkNode = new TorNetworkNode(Integer.parseInt(configuration.getProperty(TOR_PROXY_PORT, "9052")),
new CoreNetworkProtoResolver(), false,
new AvailableTor(Monitor.TOR_WORKING_DIR, torHiddenServiceDir.getName()));
networkNode.start(this);
private final List<Long> samples = new ArrayList<>();

// wait for the gate to be reopened
hsReady.await();
public synchronized void log(Object message) {
Pong pong = (Pong) message;
Long start = sentAt.get(pong.getRequestNonce());
if(start != null)
samples.add(System.currentTimeMillis() - start);
}

// for each configured host
for (String current : configuration.getProperty(HOSTS, "").split(",")) {
try {
// parse Url
NodeAddress target = OnionParser.getNodeAddress(current);

// init sample bucket
samples = new ArrayList<>();

while (samples.size() < Integer.parseInt(configuration.getProperty(SAMPLE_SIZE, "1"))) {
// so we do not get disconnected due to DoS protection mechanisms
Thread.sleep(200);

nonce = new Random().nextInt();

// close the gate
gate.engage();

start = System.currentTimeMillis();
SettableFuture<Connection> future = networkNode.sendMessage(target, new Ping(nonce, 42));

Futures.addCallback(future, new FutureCallback<>() {
@Override
public void onSuccess(Connection connection) {
connection.addMessageListener(P2PRoundTripTime.this);
}

@Override
public void onFailure(@NotNull Throwable throwable) {
gate.proceed();
log.error("Sending ping failed. That is expected if the peer is offline.\n\tException="
+ throwable.getMessage());
}
});

// wait for the gate to open again
gate.await();

// remove the message listener so we do not get messages we are not interested in anymore
// (especially relevant when gate.await() times out)
future.get().removeMessageListener(this);
}

// report
reporter.report(StatisticsHelper.process(samples),
getName() + "." + OnionParser.prettyPrint(target));
} catch (Exception e) {
gate.proceed(); // release the gate on error
e.printStackTrace();
}
public List<Long> values() {
return samples;
}
}

@Override
public void onMessage(NetworkEnvelope networkEnvelope, Connection connection) {
if (networkEnvelope instanceof Pong) {
Pong pong = (Pong) networkEnvelope;
if (pong.getRequestNonce() == nonce) {
samples.add(System.currentTimeMillis() - start);
} else {
log.warn("Nonce not matching. That should never happen.\n\t" +
"We drop that message. nonce={} / requestNonce={}",
nonce, pong.getRequestNonce());
}
protected List<NetworkEnvelope> getRequests() {
List<NetworkEnvelope> result = new ArrayList<>();

connection.shutDown(CloseConnectionReason.APP_SHUT_DOWN);
// open the gate
gate.proceed();
} else if (networkEnvelope instanceof CloseConnectionMessage) {
gate.unlock();
} else {
log.warn("Got a message of type <{}>, expected <Pong>", networkEnvelope.getClass().getSimpleName());
}
}
Random random = new Random();
for (int i = 0; i < Integer.parseInt(configuration.getProperty(SAMPLE_SIZE, "1")); i++)
result.add(new Ping(random.nextInt(), 42));

@Override
public void onTorNodeReady() {
return result;
}

@Override
public void onHiddenServicePublished() {
hsReady.proceed();
protected void aboutToSend(NetworkEnvelope message) {
sentAt.put(((Ping) message).getNonce(), System.currentTimeMillis());
}

@Override
public void onSetupFailed(Throwable throwable) {
protected boolean treatMessage(NetworkEnvelope networkEnvelope, Connection connection) {
if (networkEnvelope instanceof Pong) {
checkNotNull(connection.getPeersNodeAddressProperty(),
"although the property is nullable, we need it to not be null");

measurements.putIfAbsent(connection.getPeersNodeAddressProperty().getValue(), new Statistics());

measurements.get(connection.getPeersNodeAddressProperty().getValue()).log(networkEnvelope);

connection.shutDown(CloseConnectionReason.APP_SHUT_DOWN);
return true;
}
return false;
}

@Override
public void onRequestCustomBridges() {
void report() {
// report
measurements.forEach(((nodeAddress, samples) ->
reporter.report(StatisticsHelper.process(samples.values()),
getName() + "." + OnionParser.prettyPrint(nodeAddress))
));
// clean up for next round
measurements = new HashMap<>();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,8 @@ void report() {
try {
report.put(OnionParser.prettyPrint(host) + ".relativeNumberOfMessages." + messageType,
String.valueOf(((Counter) count).value() - referenceValues.get(messageType).value()));
} catch (MalformedURLException ignore) {
log.error("we should never got here");
} catch (MalformedURLException | NullPointerException ignore) {
log.error("we should never have gotten here", ignore);
}
});
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,21 +118,20 @@ protected void send(NetworkNode networkNode, NetworkEnvelope message) {
NodeAddress target = OnionParser.getNodeAddress(current);

// do the data request
aboutToSend(message);
SettableFuture<Connection> future = networkNode.sendMessage(target, message);

Futures.addCallback(future, new FutureCallback<>() {
@Override
public void onSuccess(Connection connection) {
connection.removeMessageListener(P2PSeedNodeSnapshotBase.this);
connection.addMessageListener(P2PSeedNodeSnapshotBase.this);
}

@Override
public void onFailure(@NotNull Throwable throwable) {
gate.proceed();
log.error(
"Sending PreliminaryDataRequest failed. That is expected if the peer is offline.\n\tException="
+ throwable.getMessage());
"Sending {} failed. That is expected if the peer is offline.\n\tException={}", message.getClass().getSimpleName(), throwable.getMessage());
}
});

Expand All @@ -153,6 +152,8 @@ public void onFailure(@NotNull Throwable throwable) {
gate.await();
}

protected void aboutToSend(NetworkEnvelope message) { };

/**
* Report all the stuff. Uses the configured reporter directly.
*/
Expand All @@ -168,6 +169,7 @@ public void onMessage(NetworkEnvelope networkEnvelope, Connection connection) {
log.warn("Got an unexpected message of type <{}>",
networkEnvelope.getClass().getSimpleName());
}
connection.removeMessageListener(this);
}

abstract protected boolean treatMessage(NetworkEnvelope networkEnvelope, Connection connection);
Expand Down