From 4d59a5004a8af5329ac645a496d2b3a62c58c908 Mon Sep 17 00:00:00 2001 From: papilonwang Date: Thu, 20 Jun 2024 11:00:54 +0800 Subject: [PATCH] [LIVY-1001][RSC] ContextLauncher has a potential NullPointerException exception during the livy session creating process. --- .../org/apache/livy/rsc/RSCClientFactory.java | 27 ++++++++++++++----- 1 file changed, 20 insertions(+), 7 deletions(-) diff --git a/rsc/src/main/java/org/apache/livy/rsc/RSCClientFactory.java b/rsc/src/main/java/org/apache/livy/rsc/RSCClientFactory.java index d9d56fb62..3decf3576 100644 --- a/rsc/src/main/java/org/apache/livy/rsc/RSCClientFactory.java +++ b/rsc/src/main/java/org/apache/livy/rsc/RSCClientFactory.java @@ -25,6 +25,9 @@ import io.netty.util.concurrent.ImmediateEventExecutor; import io.netty.util.concurrent.Promise; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.livy.LivyClient; import org.apache.livy.LivyClientFactory; import org.apache.livy.rsc.rpc.RpcServer; @@ -33,6 +36,7 @@ * Factory for RSC clients. */ public final class RSCClientFactory implements LivyClientFactory { + private static final Logger LOG = LoggerFactory.getLogger(RSCClientFactory.class); private final AtomicInteger refCount = new AtomicInteger(); private RpcServer server = null; @@ -87,27 +91,36 @@ RpcServer getServer() { } private synchronized void ref(RSCConf config) throws IOException { - if (refCount.get() != 0) { + if (refCount.get() > 0) { refCount.incrementAndGet(); return; } - Utils.checkState(server == null, "Server already running but ref count is 0."); + Utils.checkState(server == null, + String.format("Server already running but ref count is %s.", refCount.get())); if (server == null) { try { server = new RpcServer(config); + refCount.incrementAndGet(); } catch (InterruptedException ie) { throw Utils.propagate(ie); } } - - refCount.incrementAndGet(); } synchronized void unref() { - if (refCount.decrementAndGet() == 0) { - server.close(); - server = null; + if (refCount.decrementAndGet() <= 0) { + LOG.info("Un reference rpc server {} refCount {}", server, refCount.get()); + try { + if (server != null) { + server.close(); + } + } catch (Exception e) { + LOG.error("Un reference rpc server exception", e); + } finally { + server = null; + refCount.set(0); + } } }