Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor checkMaxConnections #3126

Merged
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ configure(project(':common')) {
configure(project(':p2p')) {
dependencies {
compile project(':common')
testCompile project(':core')
devinbileck marked this conversation as resolved.
Show resolved Hide resolved
compile("com.github.JesusMcCloud.netlayer:tor.native:$netlayerVersion") {
exclude(module: 'slf4j-api')
}
Expand Down
110 changes: 65 additions & 45 deletions p2p/src/main/java/bisq/network/p2p/peers/PeerManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@

import javax.inject.Inject;

import com.google.common.annotations.VisibleForTesting;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.Date;
Expand Down Expand Up @@ -300,65 +302,83 @@ private void doHouseKeeping() {
}
}

private boolean checkMaxConnections() {
@VisibleForTesting
boolean checkMaxConnections() {
Set<Connection> allConnections = new HashSet<>(networkNode.getAllConnections());
int size = allConnections.size();
log.info("We have {} connections open. Our limit is {}", size, maxConnections);

if (size > maxConnections) {
log.info("We have too many connections open. " +
"Lets try first to remove the inbound connections of type PEER.");
List<Connection> candidates = allConnections.stream()
.filter(e -> e instanceof InboundConnection)
if (size <= maxConnections) {
log.debug("We have not exceeded the maxConnections limit of {} " +
"so don't need to close any connections.", size);
return false;
}

log.info("We have too many connections open. " +
"Lets try first to remove the inbound connections of type PEER.");
List<Connection> candidates = allConnections.stream()
.filter(e -> e instanceof InboundConnection)
.filter(e -> e.getPeerType() == Connection.PeerType.PEER)
.collect(Collectors.toList());

if (candidates.isEmpty()) {
log.info("No candidates found. We check if we exceed our " +
"maxConnectionsPeer limit of {}", maxConnectionsPeer);
if (size <= maxConnectionsPeer) {
log.info("We have not exceeded maxConnectionsPeer limit of {} " +
"so don't need to close any connections", maxConnectionsPeer);
return false;
}

log.info("We have exceeded maxConnectionsPeer limit of {}. " +
"Lets try to remove ANY connection of type PEER.", maxConnectionsPeer);
candidates = allConnections.stream()
.filter(e -> e.getPeerType() == Connection.PeerType.PEER)
.collect(Collectors.toList());

if (candidates.isEmpty()) {
log.info("No candidates found. We check if we exceed our " +
"maxConnectionsPeer limit of {}", maxConnectionsPeer);
if (size > maxConnectionsPeer) {
log.info("Lets try to remove ANY connection of type PEER.");
candidates = allConnections.stream()
.filter(e -> e.getPeerType() == Connection.PeerType.PEER)
.collect(Collectors.toList());

if (candidates.isEmpty()) {
log.debug("No candidates found. We check if we exceed our " +
"maxConnectionsNonDirect limit of {}", maxConnectionsNonDirect);
if (size > maxConnectionsNonDirect) {
log.info("Lets try to remove any connection which is not of type DIRECT_MSG_PEER or INITIAL_DATA_REQUEST.");
candidates = allConnections.stream()
.filter(e -> e.getPeerType() != Connection.PeerType.DIRECT_MSG_PEER && e.getPeerType() != Connection.PeerType.INITIAL_DATA_REQUEST)
.collect(Collectors.toList());

if (candidates.isEmpty()) {
log.debug("No candidates found. We check if we exceed our " +
"maxConnectionsAbsolute limit of {}", maxConnectionsAbsolute);
if (size > maxConnectionsAbsolute) {
log.info("We reached abs. max. connections. Lets try to remove ANY connection.");
candidates = new ArrayList<>(allConnections);
}
}
}
"maxConnectionsNonDirect limit of {}", maxConnectionsNonDirect);
if (size <= maxConnectionsNonDirect) {
log.info("We have not exceeded maxConnectionsNonDirect limit of {} " +
"so don't need to close any connections", maxConnectionsNonDirect);
return false;
}

log.info("We have exceeded maxConnectionsNonDirect limit of {} " +
"Lets try to remove any connection which is not " +
"of type DIRECT_MSG_PEER or INITIAL_DATA_REQUEST.", maxConnectionsNonDirect);
candidates = allConnections.stream()
.filter(e -> e.getPeerType() != Connection.PeerType.DIRECT_MSG_PEER &&
e.getPeerType() != Connection.PeerType.INITIAL_DATA_REQUEST)
.collect(Collectors.toList());

if (candidates.isEmpty()) {
log.info("No candidates found. We check if we exceed our " +
"maxConnectionsAbsolute limit of {}", maxConnectionsAbsolute);
if (size <= maxConnectionsAbsolute) {
log.info("We have not exceeded maxConnectionsAbsolute limit of {} " +
"so don't need to close any connections", maxConnectionsAbsolute);
return false;
}

log.info("We reached abs. max. connections. Lets try to remove ANY connection.");
candidates = new ArrayList<>(allConnections);
}
}
}

if (!candidates.isEmpty()) {
candidates.sort(Comparator.comparingLong(o -> o.getStatistic().getLastActivityTimestamp()));
Connection connection = candidates.remove(0);
log.info("checkMaxConnections: Num candidates for shut down={}. We close oldest connection: {}", candidates.size(), connection);
log.debug("We are going to shut down the oldest connection.\n\tconnection={}", connection.toString());
if (!connection.isStopped())
connection.shutDown(CloseConnectionReason.TOO_MANY_CONNECTIONS_OPEN, () -> UserThread.runAfter(this::checkMaxConnections, 100, TimeUnit.MILLISECONDS));
return true;
} else {
log.debug("No candidates found to remove.\n\t" +
"size={}, allConnections={}", size, allConnections);
return false;
}
if (!candidates.isEmpty()) {
candidates.sort(Comparator.comparingLong(o -> o.getStatistic().getLastActivityTimestamp()));
Connection connection = candidates.remove(0);
log.info("checkMaxConnections: Num candidates for shut down={}. We close oldest connection: {}", candidates.size(), connection);
log.debug("We are going to shut down the oldest connection.\n\tconnection={}", connection.toString());
if (!connection.isStopped())
connection.shutDown(CloseConnectionReason.TOO_MANY_CONNECTIONS_OPEN, () -> UserThread.runAfter(this::checkMaxConnections, 100, TimeUnit.MILLISECONDS));
return true;
} else {
log.trace("We only have {} connections open and don't need to close any.", size);
log.info("No candidates found to remove.\n\t" +
"size={}, allConnections={}", size, allConnections);
return false;
}
}
Expand Down
86 changes: 86 additions & 0 deletions p2p/src/test/java/bisq/network/p2p/MockNode.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/

package bisq.network.p2p;

import bisq.core.network.p2p.seed.DefaultSeedNodeRepository;
devinbileck marked this conversation as resolved.
Show resolved Hide resolved

import bisq.network.p2p.network.Connection;
import bisq.network.p2p.network.InboundConnection;
import bisq.network.p2p.network.NetworkNode;
import bisq.network.p2p.network.OutboundConnection;
import bisq.network.p2p.network.Statistic;
import bisq.network.p2p.peers.PeerManager;
import bisq.network.p2p.peers.peerexchange.PeerList;

import bisq.common.ClockWatcher;
import bisq.common.proto.persistable.PersistenceProtoResolver;
import bisq.common.storage.CorruptedDatabaseFilesHandler;
import bisq.common.storage.Storage;

import java.io.File;

import java.util.HashSet;
import java.util.Set;

import lombok.Getter;

import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class MockNode {
@Getter
public NetworkNode networkNode;
@Getter
public PeerManager peerManager;
@Getter
public Set<Connection> connections;
@Getter
public int maxConnections;

public MockNode(int maxConnections) {
this.maxConnections = maxConnections;
networkNode = mock(NetworkNode.class);
Storage<PeerList> storage = new Storage<>(mock(File.class), mock(PersistenceProtoResolver.class), mock(CorruptedDatabaseFilesHandler.class));
peerManager = new PeerManager(networkNode, mock(DefaultSeedNodeRepository.class), new ClockWatcher(), maxConnections, storage);
devinbileck marked this conversation as resolved.
Show resolved Hide resolved
connections = new HashSet<>();
when(networkNode.getAllConnections()).thenReturn(connections);
}

public void addInboundConnection(Connection.PeerType peerType) {
InboundConnection inboundConnection = mock(InboundConnection.class);
when(inboundConnection.getPeerType()).thenReturn(peerType);
Statistic statistic = mock(Statistic.class);
long lastActivityTimestamp = System.currentTimeMillis();
when(statistic.getLastActivityTimestamp()).thenReturn(lastActivityTimestamp);
when(inboundConnection.getStatistic()).thenReturn(statistic);
doNothing().when(inboundConnection).run();
connections.add(inboundConnection);
}

public void addOutboundConnection(Connection.PeerType peerType) {
OutboundConnection outboundConnection = mock(OutboundConnection.class);
when(outboundConnection.getPeerType()).thenReturn(peerType);
Statistic statistic = mock(Statistic.class);
long lastActivityTimestamp = System.currentTimeMillis();
when(statistic.getLastActivityTimestamp()).thenReturn(lastActivityTimestamp);
when(outboundConnection.getStatistic()).thenReturn(statistic);
doNothing().when(outboundConnection).run();
connections.add(outboundConnection);
}
}
Loading