Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix small p2p network issues #3154

Merged
merged 8 commits into from
Aug 29, 2019
10 changes: 9 additions & 1 deletion common/src/main/java/bisq/common/app/Capabilities.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ public class Capabilities {
*/
public static final Capabilities app = new Capabilities();

// Defines which most recent capability any node need to support.
// This helps to clean network from very old inactive but still running nodes.
private static final Capability mandatoryCapability = Capability.DAO_STATE;

protected final Set<Capability> capabilities = new HashSet<>();

public Capabilities(Capability... capabilities) {
Expand Down Expand Up @@ -71,7 +75,7 @@ public void addAll(Capability... capabilities) {
}

public void addAll(Capabilities capabilities) {
if(capabilities != null)
if (capabilities != null)
this.capabilities.addAll(capabilities.capabilities);
}

Expand Down Expand Up @@ -111,6 +115,10 @@ public static Capabilities fromIntList(List<Integer> capabilities) {
.collect(Collectors.toSet()));
}

public static boolean hasMandatoryCapability(Capabilities capabilities) {
return capabilities.capabilities.stream().anyMatch(c -> c == mandatoryCapability);
}

@Override
public String toString() {
return Arrays.toString(Capabilities.toIntList(this).toArray());
Expand Down
15 changes: 12 additions & 3 deletions core/src/main/java/bisq/core/setup/CoreNetworkCapabilities.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,20 @@
@Slf4j
public class CoreNetworkCapabilities {
public static void setSupportedCapabilities(BisqEnvironment bisqEnvironment) {
Capabilities.app.addAll(Capability.TRADE_STATISTICS, Capability.TRADE_STATISTICS_2, Capability.ACCOUNT_AGE_WITNESS, Capability.ACK_MSG);
Capabilities.app.addAll(Capability.BUNDLE_OF_ENVELOPES, Capability.SIGNED_ACCOUNT_AGE_WITNESS);
Capabilities.app.addAll(
Capability.TRADE_STATISTICS,
Capability.TRADE_STATISTICS_2,
Capability.ACCOUNT_AGE_WITNESS,
Capability.ACK_MSG,
Capability.BUNDLE_OF_ENVELOPES
);

if (BisqEnvironment.isDaoActivated(bisqEnvironment)) {
Capabilities.app.addAll(Capability.PROPOSAL, Capability.BLIND_VOTE, Capability.DAO_STATE);
Capabilities.app.addAll(
Capability.PROPOSAL,
Capability.BLIND_VOTE,
Capability.DAO_STATE
);

maybeApplyDaoFullMode(bisqEnvironment);
}
Expand Down
52 changes: 35 additions & 17 deletions p2p/src/main/java/bisq/network/p2p/network/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,8 @@ public enum PeerType {
private static ConnectionConfig connectionConfig;

// Leaving some constants package-private for tests to know limits.
static final int PERMITTED_MESSAGE_SIZE = 200 * 1024; // 200 kb
static final int MAX_PERMITTED_MESSAGE_SIZE = 10 * 1024 * 1024; // 10 MB (425 offers resulted in about 660 kb, mailbox msg will add more to it) offer has usually 2 kb, mailbox 3kb.
private static final int PERMITTED_MESSAGE_SIZE = 200 * 1024; // 200 kb
private static final int MAX_PERMITTED_MESSAGE_SIZE = 10 * 1024 * 1024; // 10 MB (425 offers resulted in about 660 kb, mailbox msg will add more to it) offer has usually 2 kb, mailbox 3kb.
//TODO decrease limits again after testing
private static final int SOCKET_TIMEOUT = (int) TimeUnit.SECONDS.toMillis(120);

Expand Down Expand Up @@ -172,7 +172,7 @@ public static int getPermittedMessageSize() {
private RuleViolation ruleViolation;
private final ConcurrentHashMap<RuleViolation, Integer> ruleViolations = new ConcurrentHashMap<>();

private Capabilities capabilities = new Capabilities();
private final Capabilities capabilities = new Capabilities();


///////////////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -233,9 +233,9 @@ public Capabilities getCapabilities() {
return capabilities;
}

Object lock = new Object();
Queue<BundleOfEnvelopes> queueOfBundles = new ConcurrentLinkedQueue<>();
ScheduledExecutorService bundleSender = Executors.newSingleThreadScheduledExecutor();
private final Object lock = new Object();
private final Queue<BundleOfEnvelopes> queueOfBundles = new ConcurrentLinkedQueue<>();
private final ScheduledExecutorService bundleSender = Executors.newSingleThreadScheduledExecutor();

// Called from various threads
public void sendMessage(NetworkEnvelope networkEnvelope) {
Expand All @@ -250,7 +250,7 @@ public void sendMessage(NetworkEnvelope networkEnvelope) {
log.debug("Sending message: {}", Utilities.toTruncatedString(proto.toString(), 10000));

if (networkEnvelope instanceof Ping | networkEnvelope instanceof RefreshOfferMessage) {
// pings and offer refresh msg we dont want to log in production
// pings and offer refresh msg we don't want to log in production
log.trace("\n\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n" +
"Sending direct message to peer" +
"Write object to outputStream to peer: {} (uid={})\ntruncated message={} / size={}" +
Expand Down Expand Up @@ -298,10 +298,13 @@ public void sendMessage(NetworkEnvelope networkEnvelope) {
if (!stopped) {
synchronized (lock) {
BundleOfEnvelopes current = queueOfBundles.poll();
if(current.getEnvelopes().size() == 1)
protoOutputStream.writeEnvelope(current.getEnvelopes().get(0));
else
protoOutputStream.writeEnvelope(current);
if (current != null) {
if (current.getEnvelopes().size() == 1) {
protoOutputStream.writeEnvelope(current.getEnvelopes().get(0));
} else {
protoOutputStream.writeEnvelope(current);
}
}
}
}
}, lastSendTimeStamp - now, TimeUnit.MILLISECONDS);
Expand Down Expand Up @@ -386,10 +389,10 @@ private boolean violatesThrottleLimit() {
messageTimeStamps.add(now);

// clean list
while(messageTimeStamps.size() > msgThrottlePer10Sec)
while (messageTimeStamps.size() > msgThrottlePer10Sec)
messageTimeStamps.remove(0);

return violatesThrottleLimit(now,1, msgThrottlePerSec) || violatesThrottleLimit(now,10, msgThrottlePer10Sec);
return violatesThrottleLimit(now, 1, msgThrottlePerSec) || violatesThrottleLimit(now, 10, msgThrottlePer10Sec);
}

private boolean violatesThrottleLimit(long now, int seconds, int messageCountLimit) {
Expand All @@ -399,7 +402,7 @@ private boolean violatesThrottleLimit(long now, int seconds, int messageCountLim
long compareValue = messageTimeStamps.get(messageTimeStamps.size() - messageCountLimit);

// if duration < seconds sec we received too much network_messages
if(now - compareValue < TimeUnit.SECONDS.toMillis(seconds)) {
if (now - compareValue < TimeUnit.SECONDS.toMillis(seconds)) {
log.error("violatesThrottleLimit {}/{} second(s)", messageCountLimit, seconds);

return true;
Expand Down Expand Up @@ -436,7 +439,7 @@ public void setPeerType(PeerType peerType) {
this.peerType = peerType;
}

public void setPeersNodeAddress(NodeAddress peerNodeAddress) {
private void setPeersNodeAddress(NodeAddress peerNodeAddress) {
checkNotNull(peerNodeAddress, "peerAddress must not be null");
peersNodeAddressOptional = Optional.of(peerNodeAddress);

Expand Down Expand Up @@ -494,6 +497,7 @@ public void shutDown(CloseConnectionReason closeConnectionReason, @Nullable Runn

stopped = true;

//noinspection UnstableApiUsage
Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
} catch (Throwable t) {
log.error(t.getMessage());
Expand Down Expand Up @@ -534,6 +538,7 @@ private void doShutDown(CloseConnectionReason closeConnectionReason, @Nullable R
e.printStackTrace();
}

//noinspection UnstableApiUsage
MoreExecutors.shutdownAndAwaitTermination(singleThreadExecutor, 500, TimeUnit.MILLISECONDS);

log.debug("Connection shutdown complete " + this.toString());
Expand Down Expand Up @@ -705,7 +710,7 @@ public void run() {
Thread.sleep(20);
}

// Reading the protobuffer message from the inputstream
// Reading the protobuffer message from the inputStream
protobuf.NetworkEnvelope proto = protobuf.NetworkEnvelope.parseDelimitedFrom(protoInputStream);

if (proto == null) {
Expand Down Expand Up @@ -793,7 +798,20 @@ && reportInvalidRequest(RuleViolation.WRONG_NETWORK_ID)) {
if (networkEnvelope instanceof SupportedCapabilitiesMessage) {
Capabilities supportedCapabilities = ((SupportedCapabilitiesMessage) networkEnvelope).getSupportedCapabilities();
if (supportedCapabilities != null) {
capabilities.set(supportedCapabilities);
if (!capabilities.equals(supportedCapabilities)) {
if (!Capabilities.hasMandatoryCapability(capabilities)) {
shutDown(CloseConnectionReason.RULE_VIOLATION);
return;
}

capabilities.set(supportedCapabilities);
capabilitiesListeners.forEach(weakListener -> {
SupportedCapabilitiesListener supportedCapabilitiesListener = weakListener.get();
if (supportedCapabilitiesListener != null) {
UserThread.execute(() -> supportedCapabilitiesListener.onChanged(supportedCapabilities));
}
});
}
}
}

Expand Down
13 changes: 8 additions & 5 deletions p2p/src/main/java/bisq/network/p2p/network/NetworkNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ public abstract class NetworkNode implements MessageListener {
// when the events happen.
abstract public void start(@Nullable SetupListener setupListener);

public SettableFuture<Connection> sendMessage(@NotNull NodeAddress peersNodeAddress, NetworkEnvelope networkEnvelope) {
public SettableFuture<Connection> sendMessage(@NotNull NodeAddress peersNodeAddress,
NetworkEnvelope networkEnvelope) {
log.debug("sendMessage: peersNodeAddress=" + peersNodeAddress + "\n\tmessage=" + Utilities.toTruncatedString(networkEnvelope));
checkNotNull(peersNodeAddress, "peerAddress must not be null");

Expand All @@ -112,9 +113,9 @@ public SettableFuture<Connection> sendMessage(@NotNull NodeAddress peersNodeAddr

final SettableFuture<Connection> resultFuture = SettableFuture.create();
ListenableFuture<Connection> future = executorService.submit(() -> {
Thread.currentThread().setName("NetworkNode:SendMessage-to-" + peersNodeAddress);
Thread.currentThread().setName("NetworkNode:SendMessage-to-" + peersNodeAddress.getFullAddress());

if(peersNodeAddress.equals(getNodeAddress())){
if (peersNodeAddress.equals(getNodeAddress())) {
throw new ConnectException("We do not send a message to ourselves");
}

Expand Down Expand Up @@ -162,7 +163,8 @@ public void onConnection(Connection connection) {
}

@Override
public void onDisconnect(CloseConnectionReason closeConnectionReason, Connection connection) {
public void onDisconnect(CloseConnectionReason closeConnectionReason,
Connection connection) {
log.trace("onDisconnect connectionListener\n\tconnection={}" + connection);
//noinspection SuspiciousMethodCalls
outBoundConnections.remove(connection);
Expand Down Expand Up @@ -264,7 +266,8 @@ public Socks5Proxy getSocksProxy() {
public SettableFuture<Connection> sendMessage(Connection connection, NetworkEnvelope networkEnvelope) {
// connection.sendMessage might take a bit (compression, write to stream), so we use a thread to not block
ListenableFuture<Connection> future = executorService.submit(() -> {
Thread.currentThread().setName("NetworkNode:SendMessage-to-" + connection.getUid());
String id = connection.getPeersNodeAddressOptional().isPresent() ? connection.getPeersNodeAddressOptional().get().getFullAddress() : connection.getUid();
Thread.currentThread().setName("NetworkNode:SendMessage-to-" + id);
connection.sendMessage(networkEnvelope);
return connection;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,9 @@ public Date getDate() {

@Override
public void onChanged(Capabilities supportedCapabilities) {
if (!supportedCapabilities.isEmpty())
if (!supportedCapabilities.isEmpty()) {
capabilities.set(supportedCapabilities);
}
}


Expand Down
Loading