Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Commit

Permalink
Set disconnect flag immediately when disconnecting a peer (#1521)
Browse files Browse the repository at this point in the history
Previously was only set after callbacks were invoked, allowing additional messages to be sent while parts of the supporting infrastructure for the peer was being torn down.
  • Loading branch information
ajsutton authored Jun 3, 2019
1 parent afae31e commit 871256f
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ final class NettyPeerConnection implements PeerConnection {
private final PeerInfo peerInfo;
private final Set<Capability> agreedCapabilities;
private final Map<String, Capability> protocolToCapability = new HashMap<>();
private final AtomicBoolean disconnectDispatched = new AtomicBoolean(false);
private final AtomicBoolean disconnected = new AtomicBoolean(false);
private final Callbacks callbacks;
private final CapabilityMultiplexer multiplexer;
Expand Down Expand Up @@ -81,6 +80,10 @@ public void send(final Capability capability, final MessageData message) throws
if (isDisconnected()) {
throw new PeerNotConnected("Attempt to send message to a closed peer connection");
}
doSend(capability, message);
}

private void doSend(final Capability capability, final MessageData message) {
if (capability != null) {
// Validate message is valid for this capability
final SubProtocol subProtocol = multiplexer.subProtocol(capability);
Expand Down Expand Up @@ -133,10 +136,9 @@ public Set<Capability> getAgreedCapabilities() {

@Override
public void terminateConnection(final DisconnectReason reason, final boolean peerInitiated) {
if (disconnectDispatched.compareAndSet(false, true)) {
if (disconnected.compareAndSet(false, true)) {
LOG.debug("Disconnected ({}) from {}", reason, peerInfo);
callbacks.invokeDisconnect(this, reason, peerInitiated);
disconnected.set(true);
}
// Always ensure the context gets closed immediately even if we previously sent a disconnect
// message and are waiting to close.
Expand All @@ -145,16 +147,10 @@ public void terminateConnection(final DisconnectReason reason, final boolean pee

@Override
public void disconnect(final DisconnectReason reason) {
if (disconnectDispatched.compareAndSet(false, true)) {
if (disconnected.compareAndSet(false, true)) {
LOG.debug("Disconnecting ({}) from {}", reason, peerInfo);
callbacks.invokeDisconnect(this, reason, false);
try {
send(null, DisconnectMessage.create(reason));
} catch (final PeerNotConnected e) {
// The connection has already been closed - nothing left to do
return;
}
disconnected.set(true);
doSend(null, DisconnectMessage.create(reason));
ctx.channel().eventLoop().schedule((Callable<ChannelFuture>) ctx::close, 2L, SECONDS);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,12 @@
package tech.pegasys.pantheon.ethereum.p2p.network.netty;

import static java.util.Collections.emptyList;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection.PeerNotConnected;
Expand All @@ -30,7 +35,6 @@
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoop;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Test;

Expand Down Expand Up @@ -71,7 +75,21 @@ public void setUp() {
@Test
public void shouldThrowExceptionWhenAttemptingToSendMessageOnClosedConnection() {
connection.disconnect(DisconnectReason.SUBPROTOCOL_TRIGGERED);
Assertions.assertThatThrownBy(() -> connection.send(null, HelloMessage.create(peerInfo)))
assertThatThrownBy(() -> connection.send(null, HelloMessage.create(peerInfo)))
.isInstanceOfAny(PeerNotConnected.class);
}

@Test
public void shouldThrowExceptionWhenAttemptingToSendMessageWhileDisconnecting() {
doAnswer(
invocation ->
assertThatThrownBy(() -> connection.send(null, HelloMessage.create(peerInfo)))
.isInstanceOfAny(PeerNotConnected.class))
.when(callbacks)
.invokeDisconnect(any(), any(), anyBoolean());

connection.disconnect(DisconnectReason.USELESS_PEER);

verify(callbacks).invokeDisconnect(connection, DisconnectReason.USELESS_PEER, false);
}
}

0 comments on commit 871256f

Please sign in to comment.