Skip to content

Commit

Permalink
HBASE-26666. Use AtomicReference to prevent race on connection channel
Browse files: browse the repository at this point in the history
  • Loading branch information
anmolnar committed Mar 1, 2022
1 parent 8f350cf commit 4debffc
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 143 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -20,7 +20,6 @@
import static org.apache.hadoop.hbase.io.crypto.tls.X509Util.HBASE_NETTY_RPCSERVER_TLS_ENABLED;
import static org.apache.hadoop.hbase.ipc.CallEvent.Type.CANCELLED;
import static org.apache.hadoop.hbase.ipc.CallEvent.Type.TIMEOUT;
import static org.apache.hadoop.hbase.ipc.IPCUtil.execute;
import static org.apache.hadoop.hbase.ipc.IPCUtil.setCancelled;
import static org.apache.hadoop.hbase.ipc.IPCUtil.toIOE;
import java.io.IOException;
Expand All @@ -30,6 +29,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -58,7 +58,6 @@
import org.apache.hbase.thirdparty.io.netty.channel.ChannelInitializer;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
import org.apache.hbase.thirdparty.io.netty.channel.socket.SocketChannel;
import org.apache.hbase.thirdparty.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import org.apache.hbase.thirdparty.io.netty.handler.ssl.SslHandler;
Expand All @@ -72,12 +71,7 @@

/**
* RPC connection implementation based on netty.
* <p/>
* Most operations are executed in handlers. Netty handler is always executed in the same
* thread(EventLoop) so no lock is needed.
* <p/>
* <strong>Implementation assumptions:</strong> All the private methods should be called in the
* {@link #eventLoop} thread, otherwise there will be races.
*
* @since 2.0.0
*/
@InterfaceAudience.Private
Expand All @@ -91,24 +85,17 @@ class NettyRpcConnection extends RpcConnection {

private final NettyRpcClient rpcClient;

// the event loop used to set up the connection, we will also execute other operations for this
// connection in this event loop, to avoid locking everywhere.
private final EventLoop eventLoop;

private ByteBuf connectionHeaderPreamble;

private ByteBuf connectionHeaderWithLength;

// make it volatile so in the isActive method below we do not need to switch to the event loop
// thread to access this field.
private volatile Channel channel;
private final AtomicReference<Channel> channelRef = new AtomicReference<>();

NettyRpcConnection(NettyRpcClient rpcClient, ConnectionId remoteId) throws IOException {
super(rpcClient.conf, AbstractRpcClient.WHEEL_TIMER, remoteId, rpcClient.clusterId,
rpcClient.userProvider.isHBaseSecurityEnabled(), rpcClient.codec, rpcClient.compressor,
rpcClient.metrics);
this.rpcClient = rpcClient;
this.eventLoop = rpcClient.group.next();
byte[] connectionHeaderPreamble = getConnectionHeaderPreamble();
this.connectionHeaderPreamble =
Unpooled.directBuffer(connectionHeaderPreamble.length).writeBytes(connectionHeaderPreamble);
Expand All @@ -120,43 +107,40 @@ class NettyRpcConnection extends RpcConnection {

/**
 * Notifies the channel pipeline that {@code call} has timed out by firing a
 * {@code CallEvent(TIMEOUT, call)} user event, provided a channel is currently set.
 * <p/>
 * NOTE(review): this text looks like a diff rendering with the +/- markers stripped, so two
 * variants of the body appear back to back. Presumably the {@code execute(eventLoop, ...)}
 * variant is the pre-change side and the {@code channelRef} variant is the post-change side;
 * confirm against the real file.
 *
 * @param call the call whose timeout is being reported
 */
@Override
protected void callTimeout(Call call) {
// Variant 1: hand the notification to execute(...) together with eventLoop; the lambda reads
// the `channel` field.
execute(eventLoop, () -> {
if (channel != null) {
channel.pipeline().fireUserEventTriggered(new CallEvent(TIMEOUT, call));
}
});
// Variant 2: read the reference once into a local so the null check and the use below see
// the same Channel instance, then fire the event directly from the calling thread.
Channel channel = channelRef.get();
if (channel != null) {
channel.pipeline().fireUserEventTriggered(new CallEvent(TIMEOUT, call));
}
}

/**
 * Reports whether this connection currently holds a channel.
 * <p/>
 * NOTE(review): two {@code return} statements appear in a row, which only makes sense as the
 * two sides of a diff whose +/- markers were stripped. Presumably the {@code channel} form is
 * the old side and the {@code channelRef} form is the new side; confirm against the real file.
 *
 * @return {@code true} if the channel reference is non-null
 */
@Override
public boolean isActive() {
// Variant 1: null check on the `channel` field.
return channel != null;
// Variant 2: null check on the value held by channelRef.
return channelRef.get() != null;
}

/**
 * Closes the current channel, if any, and clears the stored reference to it.
 * <p/>
 * NOTE(review): this text looks like a diff rendering with the +/- markers stripped, so lines
 * from both the old and the new implementation are interleaved. Presumably the {@code assert}
 * and the {@code channel = null;} assignment belong to the old side; confirm against the real
 * file.
 */
private void shutdown0() {
// Asserts eventLoop.inEventLoop() before touching the channel.
assert eventLoop.inEventLoop();
// Snapshot the reference so the close and the compare-and-set below refer to the same value.
Channel channel = channelRef.get();
if (channel != null) {
channel.close();
// NOTE(review): read literally, this nulls the local just declared above, so the
// compareAndSet on the next line would be called as compareAndSet(null, null) and could not
// clear a non-null reference. Verify this assignment is not present together with the
// compareAndSet in the real file.
channel = null;
// Clears the reference only if it still holds the expected value passed as first argument.
channelRef.compareAndSet(channel, null);
}
}

/**
 * Shuts this connection down by delegating to {@code shutdown0()}.
 * <p/>
 * NOTE(review): two invocations of {@code shutdown0} appear back to back, which looks like the
 * two sides of a diff whose +/- markers were stripped. Presumably the {@code execute(...)} form
 * is the old side and the direct call is the new side; confirm against the real file.
 */
@Override
public void shutdown() {
// Variant 1: pass shutdown0 to execute(...) together with eventLoop.
execute(eventLoop, this::shutdown0);
// Variant 2: invoke shutdown0 directly on the calling thread.
this.shutdown0();
}

/**
 * Releases the two connection header buffers ({@code connectionHeaderPreamble} and
 * {@code connectionHeaderWithLength}) and nulls the fields so each buffer is released at most
 * once through this method.
 * <p/>
 * NOTE(review): the same release logic appears twice, once wrapped in
 * {@code execute(eventLoop, ...)} and once inline. This looks like the two sides of a diff
 * whose +/- markers were stripped; presumably the wrapped form is the old side. Confirm
 * against the real file.
 */
@Override
public void cleanupConnection() {
// Variant 1: perform the release inside a lambda handed to execute(...) with eventLoop.
execute(eventLoop, () -> {
if (connectionHeaderPreamble != null) {
ReferenceCountUtil.safeRelease(connectionHeaderPreamble);
connectionHeaderPreamble = null;
}
if (connectionHeaderWithLength != null) {
ReferenceCountUtil.safeRelease(connectionHeaderWithLength);
connectionHeaderWithLength = null;
}
});
// Variant 2: perform the same release inline on the calling thread.
if (connectionHeaderPreamble != null) {
ReferenceCountUtil.safeRelease(connectionHeaderPreamble);
// Null the field so a later call skips this buffer.
connectionHeaderPreamble = null;
}
if (connectionHeaderWithLength != null) {
ReferenceCountUtil.safeRelease(connectionHeaderWithLength);
// Null the field so a later call skips this buffer.
connectionHeaderWithLength = null;
}
}

private void established(Channel ch) {
Expand Down Expand Up @@ -190,9 +174,7 @@ private void scheduleRelogin(Throwable error) {
} catch (IOException e) {
LOG.warn("Relogin failed", e);
}
eventLoop.execute(() -> {
reloginInProgress = false;
});
reloginInProgress = false;
}, ThreadLocalRandom.current().nextInt(reloginMaxBackoff), TimeUnit.MILLISECONDS);
}

Expand Down Expand Up @@ -276,7 +258,7 @@ public void operationComplete(Future<Boolean> future) throws Exception {
});
}

private void connect() throws UnknownHostException, InterruptedException {
private Channel connect() throws UnknownHostException, InterruptedException {
LOG.trace("Connecting to {}", remoteId.getAddress());
InetSocketAddress remoteAddr = getRemoteInetAddress(rpcClient.metrics);
Bootstrap bootstrap = new Bootstrap()
Expand All @@ -291,7 +273,7 @@ private void connect() throws UnknownHostException, InterruptedException {

bootstrap.validate();

this.channel = bootstrap
return bootstrap
.localAddress(rpcClient.localAddr)
.remoteAddress(remoteAddr)
.connect()
Expand Down Expand Up @@ -325,6 +307,7 @@ private void sendRequest0(Call call, HBaseRpcController hrc) throws IOException
@Override
public void run(Object parameter) {
setCancelled(call);
Channel channel = channelRef.get();
if (channel != null) {
channel.pipeline().fireUserEventTriggered(new CallEvent(CANCELLED, call));
}
Expand All @@ -336,9 +319,11 @@ public void run(boolean cancelled) throws IOException {
if (cancelled) {
setCancelled(call);
} else {
Channel channel = channelRef.get();
if (channel == null) {
try {
connect();
channelRef.compareAndSet(null, connect());
channel = channelRef.get();
} catch (InterruptedException e) {
LOG.warn("Connection has been interrupted", e);
setCancelled(call);
Expand Down

This file was deleted.

0 comments on commit 4debffc

Please sign in to comment.