Skip to content

Commit

Permalink
[PAN-3155] Handle discovery peers with updated endpoints (#12)
Browse files Browse the repository at this point in the history
Update discovery logic to consider a peer with an unknown discovery endpoint to be unknown regardless of whether we've encountered a peer with the same node id before. This makes the discovery logic more forgiving in the face of node restarts.

If nodeA bonds with nodeB, then nodeB leaves the network and later comes back with a different ip address or listening port, nodeA would previously continue trying to communicate with nodeB at its original address. With these changes, nodeA will now treat the restarted nodeB as a new peer and communicate with it on its updated endpoint. Additionally, nodeB's information will be updated in the peer table so that neighbors requests return updated information on this node.

Signed-off-by: Danno Ferrin <[email protected]>
  • Loading branch information
mbaxter authored and shemnon committed Sep 17, 2019
1 parent e47edb5 commit 1d74e60
Show file tree
Hide file tree
Showing 8 changed files with 231 additions and 10 deletions.
2 changes: 0 additions & 2 deletions acceptance-tests/dsl/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,4 @@ dependencies {
implementation 'tech.pegasys.ethsigner.internal:core'
implementation 'tech.pegasys.ethsigner.internal:file-based'
implementation 'tech.pegasys.ethsigner.internal:signing-api'


}
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,11 @@ public Endpoint getEndpoint() {
return endpoint;
}

public boolean discoveryEndpointMatches(final DiscoveryPeer peer) {
return peer.getEndpoint().getHost().equals(endpoint.getHost())
&& peer.getEndpoint().getUdpPort() == endpoint.getUdpPort();
}

@Override
public String toString() {
final StringBuilder sb = new StringBuilder("DiscoveryPeer{");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ synchronized Optional<DiscoveryPeer> add(final DiscoveryPeer peer)

// Avoid duplicating the peer if it already exists in the bucket.
for (int i = 0; i <= tailIndex; i++) {
if (peer.equals(kBucket[i])) {
if (peer.getId().equals(kBucket[i].getId())) {
throw new IllegalArgumentException(
String.format("Tried to add duplicate peer to k-bucket: %s", peer.getId()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,8 @@ public void onMessage(final Packet packet, final DiscoveryPeer sender) {
}

// Load the peer from the table, or use the instance that comes in.
final Optional<DiscoveryPeer> maybeKnownPeer = peerTable.get(sender);
final Optional<DiscoveryPeer> maybeKnownPeer =
peerTable.get(sender).filter(known -> known.discoveryEndpointMatches(sender));
final DiscoveryPeer peer = maybeKnownPeer.orElse(sender);
final boolean peerKnown = maybeKnownPeer.isPresent();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import org.hyperledger.besu.crypto.SECP256K1;
import org.hyperledger.besu.crypto.SECP256K1.KeyPair;
import org.hyperledger.besu.ethereum.p2p.discovery.PeerDiscoveryTestHelper.AgentBuilder;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.FindNeighborsPacketData;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.MockPeerDiscoveryAgent;
Expand Down Expand Up @@ -308,6 +310,104 @@ public void bonding_disallowOutgoingBonding() {
assertThat(remoteIncomingPackets).isEmpty();
}

/**
* These tests simulates the case where a node crashes then comes back up with a new ip address or
* listening port.
*/
@Test
public void bonding_simulatePeerRestartingWithNewEndpoint_updatedPort() {
simulatePeerRestartingOnDifferentEndpoint(false, true);
}

@Test
public void bonding_simulatePeerRestartingWithNewEndpoint_updatedHost() {
simulatePeerRestartingOnDifferentEndpoint(true, false);
}

@Test
public void bonding_simulatePeerRestartingWithNewEndpoint_updatedHostAndPort() {
simulatePeerRestartingOnDifferentEndpoint(true, true);
}

public void simulatePeerRestartingOnDifferentEndpoint(
final boolean updateHost, final boolean updatePort) {
// Setup peer
final MockPeerDiscoveryAgent agent = helper.startDiscoveryAgent();
final DiscoveryPeer agentPeer = agent.getAdvertisedPeer().get();

final KeyPair remoteKeyPair = SECP256K1.KeyPair.generate();
final String remoteIp = "1.2.3.4";
final MockPeerDiscoveryAgent remoteAgent =
helper.createDiscoveryAgent(
helper
.agentBuilder()
.keyPair(remoteKeyPair)
.advertisedHost(remoteIp)
.bootstrapPeers(agentPeer));

agent.start(999);
remoteAgent.start(888);
final DiscoveryPeer remotePeer = remoteAgent.getAdvertisedPeer().get();

// Remote agent should have bonded with agent
assertThat(agent.streamDiscoveredPeers()).hasSize(1);
assertThat(agent.streamDiscoveredPeers()).contains(remoteAgent.getAdvertisedPeer().get());

// Create a new remote agent with same id, and new endpoint
remoteAgent.stop();
final int newPort = updatePort ? 0 : remotePeer.getEndpoint().getUdpPort();
final String newIp = updateHost ? "1.2.3.5" : remoteIp;
final MockPeerDiscoveryAgent updatedRemoteAgent =
helper.createDiscoveryAgent(
helper
.agentBuilder()
.keyPair(remoteKeyPair)
.advertisedHost(newIp)
.bindPort(newPort)
.bootstrapPeers(agentPeer));
updatedRemoteAgent.start(889);
final DiscoveryPeer updatedRemotePeer = updatedRemoteAgent.getAdvertisedPeer().get();

// Sanity check
assertThat(
updatedRemotePeer.getEndpoint().getUdpPort() == remotePeer.getEndpoint().getUdpPort())
.isEqualTo(!updatePort);
assertThat(updatedRemotePeer.getEndpoint().getHost().equals(remotePeer.getEndpoint().getHost()))
.isEqualTo(!updateHost);
assertThat(updatedRemotePeer.getId()).isEqualTo(remotePeer.getId());

// Check that our restarted agent receives a PONG response
final List<IncomingPacket> incomingPackets = updatedRemoteAgent.getIncomingPackets();
assertThat(incomingPackets).hasSizeGreaterThan(0);
final long pongCount =
incomingPackets.stream()
.filter(packet -> packet.fromAgent.equals(agent))
.filter(packet -> packet.packet.getType().equals(PacketType.PONG))
.count();
assertThat(pongCount).isGreaterThan(0);

// Check that agent has an endpoint matching the restarted node
final List<DiscoveryPeer> matchingPeers =
agent
.streamDiscoveredPeers()
.filter(peer -> peer.getId().equals(updatedRemotePeer.getId()))
.collect(toList());
// We should have only one peer matching this id
assertThat(matchingPeers.size()).isEqualTo(1);
final DiscoveryPeer discoveredPeer = matchingPeers.get(0);
assertThat(discoveredPeer.getEndpoint().getUdpPort())
.isEqualTo(updatedRemotePeer.getEndpoint().getUdpPort());
assertThat(discoveredPeer.getEndpoint().getHost())
.isEqualTo(updatedRemotePeer.getEndpoint().getHost());
// Check endpoint is consistent with enodeURL
assertThat(discoveredPeer.getEnodeURL().getDiscoveryPortOrZero())
.isEqualTo(updatedRemotePeer.getEndpoint().getUdpPort());
assertThat(discoveredPeer.getEnodeURL().getListeningPortOrZero())
.isEqualTo(updatedRemotePeer.getEndpoint().getFunctionalTcpPort());
assertThat(discoveredPeer.getEnodeURL().getIpAsString())
.isEqualTo(updatedRemotePeer.getEndpoint().getHost());
}

@Test
public void neighbors_allowOutgoingRequest() {
// Setup peer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
*/
package org.hyperledger.besu.ethereum.p2p.discovery;

import static com.google.common.base.Preconditions.checkNotNull;
import static java.util.Arrays.asList;

import org.hyperledger.besu.crypto.SECP256K1;
Expand All @@ -31,6 +32,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalInt;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -179,6 +181,9 @@ public static class AgentBuilder {
private List<EnodeURL> bootnodes = Collections.emptyList();
private boolean active = true;
private PeerPermissions peerPermissions = PeerPermissions.noop();
private String advertisedHost = "127.0.0.1";
private OptionalInt bindPort = OptionalInt.empty();
private KeyPair keyPair = SECP256K1.KeyPair.generate();

private AgentBuilder(
final Map<BytesValue, MockPeerDiscoveryAgent> agents,
Expand Down Expand Up @@ -215,14 +220,37 @@ public AgentBuilder active(final boolean active) {
return this;
}

public AgentBuilder advertisedHost(final String host) {
checkNotNull(host);
this.advertisedHost = host;
return this;
}

public AgentBuilder bindPort(final int bindPort) {
if (bindPort == 0) {
// Zero means pick the next available port
this.bindPort = OptionalInt.empty();
return this;
}
this.bindPort = OptionalInt.of(bindPort);
return this;
}

public AgentBuilder keyPair(final KeyPair keyPair) {
checkNotNull(keyPair);
this.keyPair = keyPair;
return this;
}

public MockPeerDiscoveryAgent build() {
final int port = bindPort.orElseGet(nextAvailablePort::incrementAndGet);
final DiscoveryConfiguration config = new DiscoveryConfiguration();
config.setBootnodes(bootnodes);
config.setBindPort(nextAvailablePort.incrementAndGet());
config.setAdvertisedHost(advertisedHost);
config.setBindPort(port);
config.setActive(active);

return new MockPeerDiscoveryAgent(
SECP256K1.KeyPair.generate(), config, peerPermissions, agents);
return new MockPeerDiscoveryAgent(keyPair, config, peerPermissions, agents);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public class MockPeerDiscoveryAgent extends PeerDiscoveryAgent {
// The set of known agents operating on the network
private final Map<BytesValue, MockPeerDiscoveryAgent> agentNetwork;
private final Deque<IncomingPacket> incomingPackets = new ArrayDeque<>();
private boolean isRunning = false;

public MockPeerDiscoveryAgent(
final KeyPair keyPair,
Expand Down Expand Up @@ -65,25 +66,45 @@ public List<IncomingPacket> getIncomingPackets() {

@Override
protected CompletableFuture<InetSocketAddress> listenForConnections() {
isRunning = true;
// Skip network setup for tests
InetSocketAddress address =
new InetSocketAddress(config.getAdvertisedHost(), config.getBindPort());
InetSocketAddress address = new InetSocketAddress(config.getBindHost(), config.getBindPort());
return CompletableFuture.completedFuture(address);
}

@Override
protected CompletableFuture<Void> sendOutgoingPacket(
final DiscoveryPeer toPeer, final Packet packet) {
CompletableFuture<Void> result = new CompletableFuture<>();
if (!this.isRunning) {
result.completeExceptionally(new Exception("Attempt to send message from an inactive agent"));
}

MockPeerDiscoveryAgent toAgent = agentNetwork.get(toPeer.getId());
if (toAgent == null) {
result.completeExceptionally(
new Exception(
"Attempt to send to unknown peer. Agents must be constructed through PeerDiscoveryTestHelper."));
return result;
}

final DiscoveryPeer agentPeer = toAgent.getAdvertisedPeer().get();
if (!toPeer.getEndpoint().getHost().equals(agentPeer.getEndpoint().getHost())) {
LOG.warn(
"Attempt to send packet to discovery peer using the wrong host address. Sending to {}, but discovery peer is listening on {}",
toPeer.getEndpoint().getHost(),
agentPeer.getEndpoint().getHost());
} else if (toPeer.getEndpoint().getUdpPort() != agentPeer.getEndpoint().getUdpPort()) {
LOG.warn(
"Attempt to send packet to discovery peer using the wrong udp port. Sending to {}, but discovery peer is listening on {}",
toPeer.getEndpoint().getUdpPort(),
agentPeer.getEndpoint().getUdpPort());
} else if (!toAgent.isRunning) {
LOG.warn("Attempt to send packet to an inactive peer.");
} else {
toAgent.processIncomingPacket(this, packet);
result.complete(null);
}
result.complete(null);
return result;
}

Expand All @@ -99,6 +120,7 @@ protected AsyncExecutor createWorkerExecutor() {

@Override
public CompletableFuture<?> stop() {
isRunning = false;
return CompletableFuture.completedFuture(null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,19 @@

import static org.assertj.core.api.Assertions.assertThat;

import org.hyperledger.besu.crypto.SECP256K1.KeyPair;
import org.hyperledger.besu.ethereum.p2p.discovery.DiscoveryPeer;
import org.hyperledger.besu.ethereum.p2p.discovery.Endpoint;
import org.hyperledger.besu.ethereum.p2p.discovery.PeerDiscoveryTestHelper;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.PeerTable.AddResult.AddOutcome;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.PeerTable.EvictResult;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.PeerTable.EvictResult.EvictOutcome;
import org.hyperledger.besu.ethereum.p2p.peers.EnodeURL;
import org.hyperledger.besu.ethereum.p2p.peers.Peer;
import org.hyperledger.besu.util.bytes.BytesValue;

import java.util.List;
import java.util.OptionalInt;

import org.junit.Test;

Expand Down Expand Up @@ -73,6 +77,69 @@ public void peerExists() {
});
}

@Test
public void peerExists_withDifferentIp() {
final PeerTable table = new PeerTable(Peer.randomId(), 16);
final BytesValue peerId = KeyPair.generate().getPublicKey().getEncodedBytes();
final DiscoveryPeer peer =
DiscoveryPeer.fromIdAndEndpoint(
peerId, new Endpoint("1.1.1.1", 30303, OptionalInt.empty()));

assertThat(table.tryAdd(peer).getOutcome()).isEqualTo(AddOutcome.ADDED);

final DiscoveryPeer duplicatePeer =
DiscoveryPeer.fromIdAndEndpoint(
peerId, new Endpoint("1.1.1.2", 30303, OptionalInt.empty()));
assertThat(table.tryAdd(duplicatePeer))
.satisfies(
result -> {
assertThat(result.getOutcome()).isEqualTo(AddOutcome.ALREADY_EXISTED);
assertThat(result.getEvictionCandidate()).isNull();
});
}

@Test
public void peerExists_withDifferentUdpPort() {
final PeerTable table = new PeerTable(Peer.randomId(), 16);
final BytesValue peerId = KeyPair.generate().getPublicKey().getEncodedBytes();
final DiscoveryPeer peer =
DiscoveryPeer.fromIdAndEndpoint(
peerId, new Endpoint("1.1.1.1", 30303, OptionalInt.empty()));

assertThat(table.tryAdd(peer).getOutcome()).isEqualTo(AddOutcome.ADDED);

final DiscoveryPeer duplicatePeer =
DiscoveryPeer.fromIdAndEndpoint(
peerId, new Endpoint("1.1.1.1", 30301, OptionalInt.empty()));
assertThat(table.tryAdd(duplicatePeer))
.satisfies(
result -> {
assertThat(result.getOutcome()).isEqualTo(AddOutcome.ALREADY_EXISTED);
assertThat(result.getEvictionCandidate()).isNull();
});
}

@Test
public void peerExists_withDifferentIdAndUdpPort() {
final PeerTable table = new PeerTable(Peer.randomId(), 16);
final BytesValue peerId = KeyPair.generate().getPublicKey().getEncodedBytes();
final DiscoveryPeer peer =
DiscoveryPeer.fromIdAndEndpoint(
peerId, new Endpoint("1.1.1.1", 30303, OptionalInt.empty()));

assertThat(table.tryAdd(peer).getOutcome()).isEqualTo(AddOutcome.ADDED);

final DiscoveryPeer duplicatePeer =
DiscoveryPeer.fromIdAndEndpoint(
peerId, new Endpoint("1.1.1.2", 30301, OptionalInt.empty()));
assertThat(table.tryAdd(duplicatePeer))
.satisfies(
result -> {
assertThat(result.getOutcome()).isEqualTo(AddOutcome.ALREADY_EXISTED);
assertThat(result.getEvictionCandidate()).isNull();
});
}

@Test
public void evictExistingPeerShouldEvict() {
final PeerTable table = new PeerTable(Peer.randomId(), 16);
Expand Down

0 comments on commit 1d74e60

Please sign in to comment.