Skip to content

Commit

Permalink
Set disconnect flag immediately when disconnecting a peer (PegaSysEng…
Browse files Browse the repository at this point in the history
…#1521)

Previously was only set after callbacks were invoked, allowing additional messages to be sent while parts of the supporting infrastructure for the peer was being torn down.
  • Loading branch information
ajsutton authored and iikirilov committed Jun 8, 2019
1 parent 087d04e commit 82f51fa
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ final class NettyPeerConnection implements PeerConnection {
private final PeerInfo peerInfo;
private final Set<Capability> agreedCapabilities;
private final Map<String, Capability> protocolToCapability = new HashMap<>();
private final AtomicBoolean disconnectDispatched = new AtomicBoolean(false);
private final AtomicBoolean disconnected = new AtomicBoolean(false);
private final Callbacks callbacks;
private final CapabilityMultiplexer multiplexer;
Expand Down Expand Up @@ -81,6 +80,10 @@ public void send(final Capability capability, final MessageData message) throws
if (isDisconnected()) {
throw new PeerNotConnected("Attempt to send message to a closed peer connection");
}
doSend(capability, message);
}

private void doSend(final Capability capability, final MessageData message) {
if (capability != null) {
// Validate message is valid for this capability
final SubProtocol subProtocol = multiplexer.subProtocol(capability);
Expand Down Expand Up @@ -133,10 +136,9 @@ public Set<Capability> getAgreedCapabilities() {

@Override
public void terminateConnection(final DisconnectReason reason, final boolean peerInitiated) {
if (disconnectDispatched.compareAndSet(false, true)) {
if (disconnected.compareAndSet(false, true)) {
LOG.debug("Disconnected ({}) from {}", reason, peerInfo);
callbacks.invokeDisconnect(this, reason, peerInitiated);
disconnected.set(true);
}
// Always ensure the context gets closed immediately even if we previously sent a disconnect
// message and are waiting to close.
Expand All @@ -145,16 +147,10 @@ public void terminateConnection(final DisconnectReason reason, final boolean pee

@Override
public void disconnect(final DisconnectReason reason) {
if (disconnectDispatched.compareAndSet(false, true)) {
if (disconnected.compareAndSet(false, true)) {
LOG.debug("Disconnecting ({}) from {}", reason, peerInfo);
callbacks.invokeDisconnect(this, reason, false);
try {
send(null, DisconnectMessage.create(reason));
} catch (final PeerNotConnected e) {
// The connection has already been closed - nothing left to do
return;
}
disconnected.set(true);
doSend(null, DisconnectMessage.create(reason));
ctx.channel().eventLoop().schedule((Callable<ChannelFuture>) ctx::close, 2L, SECONDS);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,12 @@
package tech.pegasys.pantheon.ethereum.p2p.network.netty;

import static java.util.Collections.emptyList;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection.PeerNotConnected;
Expand All @@ -30,7 +35,6 @@
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoop;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Test;

Expand Down Expand Up @@ -71,7 +75,21 @@ public void setUp() {
@Test
public void shouldThrowExceptionWhenAttemptingToSendMessageOnClosedConnection() {
connection.disconnect(DisconnectReason.SUBPROTOCOL_TRIGGERED);
Assertions.assertThatThrownBy(() -> connection.send(null, HelloMessage.create(peerInfo)))
assertThatThrownBy(() -> connection.send(null, HelloMessage.create(peerInfo)))
.isInstanceOfAny(PeerNotConnected.class);
}

@Test
public void shouldThrowExceptionWhenAttemptingToSendMessageWhileDisconnecting() {
doAnswer(
invocation ->
assertThatThrownBy(() -> connection.send(null, HelloMessage.create(peerInfo)))
.isInstanceOfAny(PeerNotConnected.class))
.when(callbacks)
.invokeDisconnect(any(), any(), anyBoolean());

connection.disconnect(DisconnectReason.USELESS_PEER);

verify(callbacks).invokeDisconnect(connection, DisconnectReason.USELESS_PEER, false);
}
}

0 comments on commit 82f51fa

Please sign in to comment.