Skip to content

Commit

Permalink
[PAN-2560] Cleanup PeerConnection interface (PegaSysEng#1282)
Browse files Browse the repository at this point in the history
  • Loading branch information
mbaxter authored and notlesh committed May 4, 2019
1 parent 9131d4a commit a2518a2
Show file tree
Hide file tree
Showing 27 changed files with 99 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public class StubbedPeerConnection {
public static PeerConnection create(final BytesValue nodeId) {
PeerConnection peerConnection = mock(PeerConnection.class);
PeerInfo peerInfo = new PeerInfo(0, "IbftIntTestPeer", emptyList(), 0, nodeId);
when(peerConnection.getPeer()).thenReturn(peerInfo);
when(peerConnection.getPeerInfo()).thenReturn(peerInfo);
return peerConnection;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public void send(final Message message) {
}
final List<Address> excludeAddressesList =
Lists.newArrayList(
message.getConnection().getPeer().getAddress(), decodedMessage.getAuthor());
message.getConnection().getPeerInfo().getAddress(), decodedMessage.getAuthor());

multicaster.send(messageData, excludeAddressesList);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,13 @@ public ValidatorPeers(final VoteTallyCache voteTallyCache) {

@Override
public void add(final PeerConnection newConnection) {
final Address peerAddress = newConnection.getPeer().getAddress();
final Address peerAddress = newConnection.getPeerInfo().getAddress();
peerConnections.put(peerAddress, newConnection);
}

@Override
public void remove(final PeerConnection removedConnection) {
final Address peerAddress = removedConnection.getPeer().getAddress();
final Address peerAddress = removedConnection.getPeerInfo().getAddress();
peerConnections.remove(peerAddress);
}

Expand Down Expand Up @@ -85,7 +85,7 @@ private void sendMessageToSpecificAddresses(
LOG.trace(
"Lost connection to a validator. remoteAddress={} peerInfo={}",
connection.getRemoteAddress(),
connection.getPeer());
connection.getPeerInfo());
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public class MockPeerFactory {
public static PeerConnection create(final Address address) {
final PeerConnection peerConnection = mock(PeerConnection.class);
final PeerInfo peerInfo = createPeerInfo(address);
when(peerConnection.getPeer()).thenReturn(peerInfo);
when(peerConnection.getPeerInfo()).thenReturn(peerInfo);
return peerConnection;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public void setup() {

final PeerInfo peerInfo = mock(PeerInfo.class);
final PeerConnection peerConnection = mock(PeerConnection.class);
when(peerConnection.getPeer()).thenReturn(peerInfo);
when(peerConnection.getPeerInfo()).thenReturn(peerInfo);
when(peerInfo.getAddress()).thenReturn(address);

peerConnections.add(peerConnection);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ public int outstandingRequests() {
}

public BytesValue nodeId() {
return connection.getPeer().getNodeId();
return connection.getPeerInfo().getNodeId();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.DisconnectReason;
import tech.pegasys.pantheon.util.bytes.Bytes32;

import java.net.SocketAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -65,7 +65,7 @@ public Set<Capability> getAgreedCapabilities() {
}

@Override
public PeerInfo getPeer() {
public PeerInfo getPeerInfo() {
return new PeerInfo(5, "Mock", new ArrayList<>(caps), 0, nodeId);
}

Expand All @@ -80,12 +80,12 @@ public void disconnect(final DisconnectReason reason) {
}

@Override
public SocketAddress getLocalAddress() {
public InetSocketAddress getLocalAddress() {
throw new UnsupportedOperationException();
}

@Override
public SocketAddress getRemoteAddress() {
public InetSocketAddress getRemoteAddress() {
throw new UnsupportedOperationException();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ public void connectAndAssertAll()
final TestNode destination = nodes.get(j);
try {
LOG.info("Attempting to connect source " + source.shortId() + " to dest " + destination);
assertThat(source.connect(destination).get(30L, TimeUnit.SECONDS).getPeer().getNodeId())
assertThat(
source.connect(destination).get(30L, TimeUnit.SECONDS).getPeerInfo().getNodeId())
.isEqualTo(destination.id());
// Wait for the destination node to finish bonding.
Awaitility.await()
Expand Down Expand Up @@ -118,7 +119,7 @@ public void assertPeerCounts() {

private boolean hasConnection(final TestNode node1, final TestNode node2) {
for (final PeerConnection peer : node1.network.getPeers()) {
if (node2.id().equals(peer.getPeer().getNodeId())) {
if (node2.id().equals(peer.getPeerInfo().getNodeId())) {
return true;
}
}
Expand Down Expand Up @@ -221,7 +222,7 @@ public void assertNoNetworkDisconnections() {
for (final Map.Entry<PeerConnection, DisconnectReason> entry :
node.disconnections.entrySet()) {
final PeerConnection peer = entry.getKey();
final String peerString = peer.getPeer().getNodeId() + "@" + peer.getRemoteAddress();
final String peerString = peer.getPeerInfo().getNodeId() + "@" + peer.getRemoteAddress();
final String unsentTxMsg =
"Node "
+ node.shortId()
Expand All @@ -245,7 +246,7 @@ public void logPeerConnections() {
for (final PeerConnection peer : node.network.getPeers()) {
final String localString = node.shortId() + "@" + peer.getLocalAddress();
final String peerString =
shortId(peer.getPeer().getNodeId()) + "@" + peer.getRemoteAddress();
shortId(peer.getPeerInfo().getNodeId()) + "@" + peer.getRemoteAddress();
connStr.add("Connection: " + localString + " to " + peerString);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,16 @@ public class PeerResult {
private final String id;

public PeerResult(final PeerConnection peer) {
this.version = Quantity.create(peer.getPeer().getVersion());
this.name = peer.getPeer().getClientId();
this.version = Quantity.create(peer.getPeerInfo().getVersion());
this.name = peer.getPeerInfo().getClientId();
this.caps =
peer.getPeer().getCapabilities().stream()
peer.getPeerInfo().getCapabilities().stream()
.map(Capability::toString)
.map(TextNode::new)
.collect(Collectors.toList());
this.network = new NetworkResult(peer.getLocalAddress(), peer.getRemoteAddress());
this.port = Quantity.create(peer.getPeer().getPort());
this.id = peer.getPeer().getNodeId().toString();
this.port = Quantity.create(peer.getPeerInfo().getPort());
this.id = peer.getPeerInfo().getNodeId().toString();
}

@JsonGetter(value = "version")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ protected void assertPeerResultMatchesPeer(
final BytesValue jsonNodeId = BytesValue.fromHexString(peerJson.getString("id"));

final PeerInfo jsonPeer = new PeerInfo(jsonVersion, jsonClient, caps, jsonPort, jsonNodeId);
assertThat(peerConn.getPeer()).isEqualTo(jsonPeer);
assertThat(peerConn.getPeerInfo()).isEqualTo(jsonPeer);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public static PeerConnection create(
final InetSocketAddress localAddress,
final InetSocketAddress remoteAddress) {
PeerConnection peerConnection = mock(PeerConnection.class);
when(peerConnection.getPeer()).thenReturn(peerInfo);
when(peerConnection.getPeerInfo()).thenReturn(peerInfo);
when(peerConnection.getLocalAddress()).thenReturn(localAddress);
when(peerConnection.getRemoteAddress()).thenReturn(remoteAddress);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import tech.pegasys.pantheon.util.Subscribers;
import tech.pegasys.pantheon.util.enode.EnodeURL;

import java.net.SocketAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
Expand Down Expand Up @@ -260,7 +260,7 @@ public Set<Capability> getAgreedCapabilities() {
}

@Override
public PeerInfo getPeer() {
public PeerInfo getPeerInfo() {
return new PeerInfo(
5,
"mock-network-client",
Expand All @@ -287,12 +287,12 @@ public boolean isDisconnected() {
}

@Override
public SocketAddress getLocalAddress() {
public InetSocketAddress getLocalAddress() {
throw new UnsupportedOperationException();
}

@Override
public SocketAddress getRemoteAddress() {
public InetSocketAddress getRemoteAddress() {
throw new UnsupportedOperationException();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ public void exchangeMessages() throws Exception {
final CompletableFuture<Message> messageFuture = new CompletableFuture<>();
network1.subscribe(cap, messageFuture::complete);
final Predicate<PeerConnection> isPeerOne =
peerConnection -> peerConnection.getPeer().getNodeId().equals(one.getId());
peerConnection -> peerConnection.getPeerInfo().getNodeId().equals(one.getId());
final Predicate<PeerConnection> isPeerTwo =
peerConnection -> peerConnection.getPeer().getNodeId().equals(two.getId());
peerConnection -> peerConnection.getPeerInfo().getNodeId().equals(two.getId());
Assertions.assertThat(network1.getPeers().stream().filter(isPeerTwo).findFirst())
.isNotPresent();
Assertions.assertThat(network2.getPeers().stream().filter(isPeerOne).findFirst())
Expand All @@ -60,8 +60,8 @@ public void exchangeMessages() throws Exception {
final CompletableFuture<PeerConnection> peer1Future = new CompletableFuture<>();
network2.subscribeConnect(peer1Future::complete);
network1.connect(two).get();
Assertions.assertThat(peer1Future.get().getPeer().getNodeId()).isEqualTo(one.getId());
Assertions.assertThat(peer2Future.get().getPeer().getNodeId()).isEqualTo(two.getId());
Assertions.assertThat(peer1Future.get().getPeerInfo().getNodeId()).isEqualTo(one.getId());
Assertions.assertThat(peer2Future.get().getPeerInfo().getNodeId()).isEqualTo(two.getId());
Assertions.assertThat(network1.getPeers().stream().filter(isPeerTwo).findFirst()).isPresent();
final Optional<PeerConnection> optionalConnection =
network2.getPeers().stream().filter(isPeerOne).findFirst();
Expand All @@ -78,7 +78,7 @@ public void exchangeMessages() throws Exception {
final MessageData receivedMessageData = receivedMessage.getData();
Assertions.assertThat(receivedMessageData.getData().compareTo(BytesValue.wrap(data)))
.isEqualTo(0);
Assertions.assertThat(receivedMessage.getConnection().getPeer().getNodeId())
Assertions.assertThat(receivedMessage.getConnection().getPeerInfo().getNodeId())
.isEqualTo(two.getId());
Assertions.assertThat(receivedMessageData.getSize()).isEqualTo(size);
Assertions.assertThat(receivedMessageData.getCode()).isEqualTo(code);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ public InsufficientPeersPermissioningProvider(
}

private boolean isNotABootnode(final PeerConnection peerConnection) {
return bootnodeEnodes.stream().noneMatch(peerConnection::isRemoteEnode);
return bootnodeEnodes.stream()
.noneMatch((bootNode) -> peerConnection.getRemoteEnode().sameEndpoint(bootNode));
}

private long countP2PNetworkNonBootnodeConnections() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Set;

/** A P2P connection to another node. */
Expand Down Expand Up @@ -70,7 +69,7 @@ default void sendForProtocol(final String protocol, final MessageData message)
*
* @return Peer Description
*/
PeerInfo getPeer();
PeerInfo getPeerInfo();

/**
* Immediately terminate the connection without sending a disconnect message.
Expand All @@ -90,9 +89,9 @@ default void sendForProtocol(final String protocol, final MessageData message)
/** @return True if the peer is disconnected */
boolean isDisconnected();

SocketAddress getLocalAddress();
InetSocketAddress getLocalAddress();

SocketAddress getRemoteAddress();
InetSocketAddress getRemoteAddress();

class PeerNotConnected extends IOException {

Expand All @@ -101,11 +100,11 @@ public PeerNotConnected(final String message) {
}
}

default boolean isRemoteEnode(final EnodeURL remoteEnodeUrl) {
return ((remoteEnodeUrl.getNodeId().equals(this.getPeer().getAddress()))
&& (remoteEnodeUrl.getListeningPort() == this.getPeer().getPort())
&& (remoteEnodeUrl
.getInetAddress()
.equals(((InetSocketAddress) this.getRemoteAddress()).getAddress())));
default EnodeURL getRemoteEnode() {
return EnodeURL.builder()
.nodeId(getPeerInfo().getNodeId())
.listeningPort(getPeerInfo().getPort())
.ipAddress(getRemoteAddress().getAddress())
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ public void onDisconnect(
final PeerConnection connection,
final DisconnectMessage.DisconnectReason reason,
final boolean initiatedByPeer) {
final BytesValue nodeId = connection.getPeer().getNodeId();
final BytesValue nodeId = connection.getPeerInfo().getNodeId();
peerTable.tryEvict(new DefaultPeerId(nodeId));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,15 +80,15 @@ protected void channelRead0(final ChannelHandlerContext ctx, final MessageData o
LOG.debug(
"Received Wire DISCONNECT ({}) from peer: {}",
reason.name(),
connection.getPeer().getClientId());
connection.getPeerInfo().getClientId());
} catch (final RLPException e) {
LOG.debug(
"Received Wire DISCONNECT with invalid RLP. Peer: {}",
connection.getPeer().getClientId());
connection.getPeerInfo().getClientId());
} catch (final Exception e) {
LOG.error(
"Received Wire DISCONNECT, but unable to parse reason. Peer: {}",
connection.getPeer().getClientId(),
connection.getPeerInfo().getClientId(),
e);
}
connection.terminateConnection(reason, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ protected void initChannel(final SocketChannel ch) {
LOG.debug(
"Disconnecting incoming connection because connection limit of {} has been reached: {}",
maxPeers,
connection.getPeer().getNodeId());
connection.getPeerInfo().getNodeId());
connection.disconnect(DisconnectReason.TOO_MANY_PEERS);
return;
}
Expand All @@ -377,7 +377,7 @@ protected void initChannel(final SocketChannel ch) {

onConnectionEstablished(connection);
LOG.debug(
"Successfully accepted connection from {}", connection.getPeer().getNodeId());
"Successfully accepted connection from {}", connection.getPeerInfo().getNodeId());
logConnections();
});
}
Expand Down Expand Up @@ -591,7 +591,7 @@ private Consumer<PeerDroppedEvent> handlePeerDroppedEvents() {
return event -> {
final Peer peer = event.getPeer();
getPeers().stream()
.filter(p -> p.getPeer().getNodeId().equals(peer.getId()))
.filter(p -> p.getPeerInfo().getNodeId().equals(peer.getId()))
.findFirst()
.ifPresent(p -> p.disconnect(DisconnectReason.REQUESTED));
};
Expand Down Expand Up @@ -626,18 +626,17 @@ private boolean isPeerConnectionAllowed(final PeerConnection peerConnection) {
}

LOG.trace(
"Checking if connection with peer {} is permitted", peerConnection.getPeer().getNodeId());
"Checking if connection with peer {} is permitted",
peerConnection.getPeerInfo().getNodeId());

return nodePermissioningController
.map(
c -> {
final EnodeURL localPeerEnodeURL =
peerInfoToEnodeURL(
ourPeerInfo, (InetSocketAddress) peerConnection.getLocalAddress());
peerInfoToEnodeURL(ourPeerInfo, peerConnection.getLocalAddress());
final EnodeURL remotePeerEnodeURL =
peerInfoToEnodeURL(
peerConnection.getPeer(),
(InetSocketAddress) peerConnection.getRemoteAddress());
peerConnection.getPeerInfo(), peerConnection.getRemoteAddress());
return c.isPermitted(localPeerEnodeURL, remotePeerEnodeURL);
})
.orElse(true);
Expand Down
Loading

0 comments on commit a2518a2

Please sign in to comment.