Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

[PAN-2239] Disconnect peer removed from node whitelist #877

Merged
merged 16 commits into from
Feb 18, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.net.URI;
import java.util.Collections;

import com.google.common.collect.Lists;
import org.junit.Before;
import org.junit.Test;

Expand Down Expand Up @@ -53,6 +54,18 @@ public void permissionedNodeShouldDiscoverOnlyAllowedNode() {
permissionedNode.verify(net.awaitPeerCount(1));
}

@Test
public void permissionedNodeShouldDisconnectFromNodeRemovedFromWhitelist() {
permissionedNode.verify(net.awaitPeerCount(1));

// remove allowed node from the whitelist
permissionedNode.verify(
perm.removeNodesFromWhitelist(Lists.newArrayList(((PantheonNode) allowedNode).enodeUrl())));

// node should not be connected to any peers
permissionedNode.verify(net.awaitPeerCount(0));
}

private URI getEnodeURI(final Node node) {
return URI.create(((PantheonNode) node).getConfiguration().enodeUrl());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection;
import tech.pegasys.pantheon.ethereum.p2p.config.DiscoveryConfiguration;
import tech.pegasys.pantheon.ethereum.p2p.discovery.PeerDiscoveryEvent.PeerBondedEvent;
import tech.pegasys.pantheon.ethereum.p2p.discovery.PeerDiscoveryEvent.PeerDroppedEvent;
import tech.pegasys.pantheon.ethereum.p2p.discovery.internal.Packet;
import tech.pegasys.pantheon.ethereum.p2p.discovery.internal.PeerDiscoveryController;
import tech.pegasys.pantheon.ethereum.p2p.discovery.internal.PeerRequirement;
Expand Down Expand Up @@ -86,6 +87,7 @@ public abstract class PeerDiscoveryAgent implements DisconnectCallback {
/* Is discovery enabled? */
private boolean isActive = false;
private final Subscribers<Consumer<PeerBondedEvent>> peerBondedObservers = new Subscribers<>();
private final Subscribers<Consumer<PeerDroppedEvent>> peerDroppedObservers = new Subscribers<>();

public PeerDiscoveryAgent(
final SECP256K1.KeyPair keyPair,
Expand Down Expand Up @@ -164,7 +166,8 @@ private PeerDiscoveryController createController() {
peerRequirement,
peerBlacklist,
nodeWhitelistController,
peerBondedObservers);
peerBondedObservers,
peerDroppedObservers);
}

protected boolean validatePacketSize(final int packetSize) {
Expand Down Expand Up @@ -261,6 +264,29 @@ public boolean removePeerBondedObserver(final long observerId) {
return peerBondedObservers.unsubscribe(observerId);
}

/**
* Adds an observer that will get called when a peer is dropped from the peer table.
*
* <p><i>No guarantees are made about the order in which observers are invoked.</i>
*
* @param observer The observer to call.
* @return A unique ID identifying this observer, to that it can be removed later.
*/
public long observePeerDroppedEvents(final Consumer<PeerDroppedEvent> observer) {
checkNotNull(observer);
return peerDroppedObservers.subscribe(observer);
}

/**
* Removes an previously added peer dropped observer.
*
* @param observerId The unique ID identifying the observer to remove.
* @return Whether the observer was located and removed.
*/
public boolean removePeerDroppedObserver(final long observerId) {
return peerDroppedObservers.unsubscribe(observerId);
}

/**
* Returns the count of observers that are registered on this controller.
*
Expand Down Expand Up @@ -292,7 +318,7 @@ public void onDisconnect(
final DisconnectMessage.DisconnectReason reason,
final boolean initiatedByPeer) {
final BytesValue nodeId = connection.getPeer().getNodeId();
peerTable.evict(new DefaultPeerId(nodeId));
peerTable.tryEvict(new DefaultPeerId(nodeId));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ synchronized boolean evict(final PeerId peer) {
}
// If found, shift all subsequent elements to the left, and decrement tailIndex.
for (int i = 0; i <= tailIndex; i++) {
if (peer.equals(kBucket[i])) {
// Peer comparison here must be done by peer id
if (peer.getId().equals(kBucket[i].getId())) {
arraycopy(kBucket, i + 1, kBucket, i, tailIndex - i);
kBucket[tailIndex--] = null;
return true;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.p2p.discovery.internal;

import tech.pegasys.pantheon.ethereum.p2p.peers.Peer;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class DiscoveryProtocolLogger {

private static final Logger LOG = LogManager.getLogger();

static void logSendingPacket(final Peer peer, final Packet packet) {
LOG.trace(
"<<< Sending {} packet from peer {} ({}): {}",
shortenPacketType(packet),
peer.getId().slice(0, 16),
peer.getEndpoint(),
packet);
}

static void logReceivedPacket(final Peer peer, final Packet packet) {
LOG.trace(
">>> Received {} packet from peer {} ({}): {}",
shortenPacketType(packet),
peer.getId().slice(0, 16),
peer.getEndpoint(),
packet);
}

private static String shortenPacketType(final Packet packet) {
switch (packet.getType()) {
case PING:
return "PING ";
case PONG:
return "PONG ";
case FIND_NEIGHBORS:
return "FINDN";
case NEIGHBORS:
return "NEIGH";
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,23 @@

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static tech.pegasys.pantheon.ethereum.p2p.discovery.internal.PeerTable.AddResult.Outcome;
import static tech.pegasys.pantheon.ethereum.p2p.discovery.internal.DiscoveryProtocolLogger.logReceivedPacket;
import static tech.pegasys.pantheon.ethereum.p2p.discovery.internal.DiscoveryProtocolLogger.logSendingPacket;
import static tech.pegasys.pantheon.ethereum.p2p.discovery.internal.PeerTable.AddResult.AddOutcome;

import tech.pegasys.pantheon.crypto.SECP256K1;
import tech.pegasys.pantheon.crypto.SECP256K1.KeyPair;
import tech.pegasys.pantheon.ethereum.p2p.discovery.DiscoveryPeer;
import tech.pegasys.pantheon.ethereum.p2p.discovery.PeerDiscoveryEvent;
import tech.pegasys.pantheon.ethereum.p2p.discovery.PeerDiscoveryEvent.PeerBondedEvent;
import tech.pegasys.pantheon.ethereum.p2p.discovery.PeerDiscoveryEvent.PeerDroppedEvent;
import tech.pegasys.pantheon.ethereum.p2p.discovery.PeerDiscoveryStatus;
import tech.pegasys.pantheon.ethereum.p2p.discovery.internal.PeerTable.EvictResult;
import tech.pegasys.pantheon.ethereum.p2p.discovery.internal.PeerTable.EvictResult.EvictOutcome;
import tech.pegasys.pantheon.ethereum.p2p.peers.Peer;
import tech.pegasys.pantheon.ethereum.p2p.peers.PeerBlacklist;
import tech.pegasys.pantheon.ethereum.permissioning.NodeWhitelistController;
import tech.pegasys.pantheon.ethereum.permissioning.node.NodeWhitelistUpdatedEvent;
import tech.pegasys.pantheon.util.Subscribers;
import tech.pegasys.pantheon.util.bytes.BytesValue;

Expand Down Expand Up @@ -123,6 +129,7 @@ public class PeerDiscoveryController {

// Observers for "peer bonded" discovery events.
private final Subscribers<Consumer<PeerBondedEvent>> peerBondedObservers;
private final Subscribers<Consumer<PeerDroppedEvent>> peerDroppedObservers;

private RecursivePeerRefreshState recursivePeerRefreshState;

Expand All @@ -137,7 +144,8 @@ public PeerDiscoveryController(
final PeerRequirement peerRequirement,
final PeerBlacklist peerBlacklist,
final Optional<NodeWhitelistController> nodeWhitelistController,
final Subscribers<Consumer<PeerBondedEvent>> peerBondedObservers) {
final Subscribers<Consumer<PeerBondedEvent>> peerBondedObservers,
final Subscribers<Consumer<PeerDroppedEvent>> peerDroppedObservers) {
this.timerUtil = timerUtil;
this.keypair = keypair;
this.localPeer = localPeer;
Expand All @@ -149,6 +157,7 @@ public PeerDiscoveryController(
this.nodeWhitelistController = nodeWhitelistController;
this.outboundMessageHandler = outboundMessageHandler;
this.peerBondedObservers = peerBondedObservers;
this.peerDroppedObservers = peerDroppedObservers;
}

public CompletableFuture<?> start() {
Expand Down Expand Up @@ -184,6 +193,9 @@ public CompletableFuture<?> start() {
this::refreshTableIfRequired);
tableRefreshTimerId = OptionalLong.of(timerId);

nodeWhitelistController.ifPresent(
c -> c.subscribeToListUpdatedEvent(this::handleNodeWhitelistUpdatedEvent));

return CompletableFuture.completedFuture(null);
}

Expand Down Expand Up @@ -217,12 +229,7 @@ private boolean whitelistIfPresentIsNodePermitted(final DiscoveryPeer sender) {
* @param sender The sender.
*/
public void onMessage(final Packet packet, final DiscoveryPeer sender) {
LOG.trace(
"<<< Received {} discovery packet from {} ({}): {}",
packet.getType(),
sender.getEndpoint(),
sender.getId().slice(0, 16),
packet);
logReceivedPacket(sender, packet);

// Message from self. This should not happen.
if (sender.getId().equals(localPeer.getId())) {
Expand All @@ -242,14 +249,12 @@ public void onMessage(final Packet packet, final DiscoveryPeer sender) {

switch (packet.getType()) {
case PING:
LOG.trace("Received PING packet from {}", sender.getEnodeURI());
if (!peerBlacklisted && addToPeerTable(peer)) {
final PingPacketData ping = packet.getPacketData(PingPacketData.class).get();
respondToPing(ping, packet.getHash(), peer);
}
break;
case PONG:
LOG.trace("Received PONG packet from {}", sender.getEnodeURI());
matchInteraction(packet)
.ifPresent(
interaction -> {
Expand All @@ -261,15 +266,13 @@ public void onMessage(final Packet packet, final DiscoveryPeer sender) {
});
break;
case NEIGHBORS:
LOG.trace("Received NEIGHBORS packet from {}", sender.getEnodeURI());
matchInteraction(packet)
.ifPresent(
interaction ->
recursivePeerRefreshState.onNeighboursPacketReceived(
peer, packet.getPacketData(NeighborsPacketData.class).orElse(null)));
break;
case FIND_NEIGHBORS:
LOG.trace("Received FIND_NEIGHBORS packet from {}", sender.getEnodeURI());
if (!peerKnown || peerBlacklisted) {
break;
}
Expand All @@ -282,7 +285,7 @@ public void onMessage(final Packet packet, final DiscoveryPeer sender) {

private boolean addToPeerTable(final DiscoveryPeer peer) {
final PeerTable.AddResult result = peerTable.tryAdd(peer);
if (result.getOutcome() == Outcome.SELF) {
if (result.getOutcome() == AddOutcome.SELF) {
return false;
}

Expand All @@ -298,23 +301,45 @@ private boolean addToPeerTable(final DiscoveryPeer peer) {
notifyPeerBonded(peer, now);
}

if (result.getOutcome() == Outcome.ALREADY_EXISTED) {
if (result.getOutcome() == AddOutcome.ALREADY_EXISTED) {
// Bump peer.
peerTable.evict(peer);
peerTable.tryEvict(peer);
peerTable.tryAdd(peer);
} else if (result.getOutcome() == Outcome.BUCKET_FULL) {
peerTable.evict(result.getEvictionCandidate());
} else if (result.getOutcome() == AddOutcome.BUCKET_FULL) {
peerTable.tryEvict(result.getEvictionCandidate());
peerTable.tryAdd(peer);
}

return true;
}

private void handleNodeWhitelistUpdatedEvent(final NodeWhitelistUpdatedEvent event) {
event.getRemovedNodes().stream()
.map(e -> new DiscoveryPeer(DiscoveryPeer.fromURI(e.toURI())))
.forEach(this::dropFromPeerTable);
}

@VisibleForTesting
boolean dropFromPeerTable(final DiscoveryPeer peer) {
final EvictResult evictResult = peerTable.tryEvict(peer);
if (evictResult.getOutcome() == EvictOutcome.EVICTED) {
notifyPeerDropped(peer, System.currentTimeMillis());
return true;
} else {
return false;
}
}

private void notifyPeerBonded(final DiscoveryPeer peer, final long now) {
final PeerBondedEvent event = new PeerBondedEvent(peer, now);
dispatchEvent(peerBondedObservers, event);
}

private void notifyPeerDropped(final DiscoveryPeer peer, final long now) {
final PeerDroppedEvent event = new PeerDroppedEvent(peer, now);
dispatchEvent(peerDroppedObservers, event);
}

private Optional<PeerInteractionState> matchInteraction(final Packet packet) {
final PeerInteractionState interaction = inflightInteractions.get(packet.getNodeId());
if (interaction == null || !interaction.test(packet)) {
Expand Down Expand Up @@ -389,10 +414,12 @@ void bond(final DiscoveryPeer peer) {

private void sendPacket(final DiscoveryPeer peer, final PacketType type, final PacketData data) {
Packet packet = createPacket(type, data);
logSendingPacket(peer, packet);
outboundMessageHandler.send(peer, packet);
}

private void sendPacket(final DiscoveryPeer peer, final Packet packet) {
logSendingPacket(peer, packet);
outboundMessageHandler.send(peer, packet);
}

Expand Down
Loading