diff --git a/api/src/main/java/org/apache/livy/LivyClient.java b/api/src/main/java/org/apache/livy/LivyClient.java index fc03a1f7a..5181fba45 100644 --- a/api/src/main/java/org/apache/livy/LivyClient.java +++ b/api/src/main/java/org/apache/livy/LivyClient.java @@ -107,4 +107,18 @@ public interface LivyClient { */ Future addFile(URI uri); + /** + * Retrieves the session id internally created by Livy. + * + * @return An integer representing the session id of the running session + */ + int getSessionId(); + + /** + * Retrieves the internally generated session app id. + * + * @return A string representing the livy session app id. + */ + String getSessionAppId(); + } diff --git a/api/src/test/java/org/apache/livy/TestClientFactory.java b/api/src/test/java/org/apache/livy/TestClientFactory.java index 622908c04..793a87b4f 100644 --- a/api/src/test/java/org/apache/livy/TestClientFactory.java +++ b/api/src/test/java/org/apache/livy/TestClientFactory.java @@ -91,6 +91,11 @@ public Future addFile(URI uri) { throw new UnsupportedOperationException(); } + @Override + public int getSessionId(){throw new UnsupportedOperationException();} + + @Override + public String getSessionAppId(){throw new UnsupportedOperationException();} } } diff --git a/client-http/src/main/java/org/apache/livy/client/http/HttpClient.java b/client-http/src/main/java/org/apache/livy/client/http/HttpClient.java index f40148f94..7587d6ec3 100644 --- a/client-http/src/main/java/org/apache/livy/client/http/HttpClient.java +++ b/client-http/src/main/java/org/apache/livy/client/http/HttpClient.java @@ -44,7 +44,7 @@ class HttpClient implements LivyClient { private final HttpConf config; private final LivyConnection conn; - private final int sessionId; + private final SessionInfo session; private final ScheduledExecutorService executor; private final Serializer serializer; @@ -66,8 +66,8 @@ class HttpClient implements LivyClient { m.group(1), uri.getQuery(), uri.getFragment()); this.conn = new LivyConnection(base, httpConf); - this.sessionId = Integer.parseInt(m.group(2)); - conn.post(null, SessionInfo.class, "/%d/connect", sessionId); + int sessionId = Integer.parseInt(m.group(2)); + session = conn.post(null, SessionInfo.class, "/%d/connect", sessionId); } else { Map sessionConf = new HashMap<>(); for (Map.Entry e : config) { @@ -76,7 +76,7 @@ class HttpClient implements LivyClient { ClientMessage create = new CreateClientRequest(sessionConf); this.conn = new LivyConnection(uri, httpConf); - this.sessionId = conn.post(create, SessionInfo.class, "/").id; + this.session = conn.post(create, SessionInfo.class, "/"); } } catch (Exception e) { throw propagate(e); @@ -87,7 +87,7 @@ class HttpClient implements LivyClient { this.executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { @Override public Thread newThread(Runnable r) { - Thread t = new Thread(r, "HttpClient-" + sessionId); + Thread t = new Thread(r, "HttpClient-" + session.id); t.setDaemon(true); return t; } @@ -112,7 +112,7 @@ public synchronized void stop(boolean shutdownContext) { executor.shutdownNow(); try { if (shutdownContext) { - conn.delete(Map.class, "/%s", sessionId); + conn.delete(Map.class, "/%s", session.id); } } catch (Exception e) { throw propagate(e); @@ -149,7 +149,7 @@ private Future uploadResource(final File file, final String command, final St Callable task = new Callable() { @Override public Void call() throws Exception { - conn.post(file, Void.class, paramName, "/%d/%s", sessionId, command); + conn.post(file, Void.class, paramName, "/%d/%s", session.id, command); return null; } }; @@ -161,7 +161,7 @@ private Future addResource(final String command, final URI resource) { @Override public Void call() throws Exception { ClientMessage msg = new AddResource(resource.toString()); - conn.post(msg, Void.class, "/%d/%s", sessionId, command); + conn.post(msg, Void.class, "/%d/%s", session.id, command); return null; } }; @@ -170,7 +170,7 @@ public Void call() throws Exception { private JobHandleImpl sendJob(final String command, Job job) { final ByteBuffer serializedJob = serializer.serialize(job); - JobHandleImpl handle = new JobHandleImpl(config, conn, sessionId, executor, serializer); + JobHandleImpl handle = new JobHandleImpl(config, conn, session.id, executor, serializer); handle.start(command, serializedJob); return handle; } @@ -183,9 +183,12 @@ private RuntimeException propagate(Exception cause) { } } - // For testing. - int getSessionId() { - return sessionId; + + public int getSessionId() { + return session.id; } + + public String getSessionAppId() { return session.appId;} + } diff --git a/client-http/src/test/scala/org/apache/livy/client/http/HttpClientSpec.scala b/client-http/src/test/scala/org/apache/livy/client/http/HttpClientSpec.scala index f53d9f5b4..dcd46ec86 100644 --- a/client-http/src/test/scala/org/apache/livy/client/http/HttpClientSpec.scala +++ b/client-http/src/test/scala/org/apache/livy/client/http/HttpClientSpec.scala @@ -181,6 +181,30 @@ class HttpClientSpec extends FunSpecLike with BeforeAndAfterAll with LivyBaseUni testJob(false, response = Some(null)) } + withClient("should retrieve session id and sessionAppID") { + var id = client.getSessionId() + assert(id === sessionId) + + var appId = client.getSessionAppId() + assert(Some(appId) === session.appId) + + } + + withClient("should retrieve session id when reconnecting to a session") { + var sid = client.asInstanceOf[HttpClient].getSessionId() + val uri = s"http://${InetAddress.getLocalHost.getHostAddress}:${server.port}" + + s"${LivyConnection.SESSIONS_URI}/$sid" + val newClient = new LivyClientBuilder(false).setURI(new URI(uri)).build() + newClient.stop(false) + + var id = client.getSessionId() + assert(id === sessionId) + + var appId = client.getSessionAppId() + assert(Some(appId) === session.appId) + + } + withClient("should connect to existing sessions") { var sid = client.asInstanceOf[HttpClient].getSessionId() val uri = s"http://${InetAddress.getLocalHost.getHostAddress}:${server.port}" + @@ -256,6 +280,7 @@ private object HttpClientSpec { // Hack warning: keep the session object available so that individual tests can mock // the desired behavior before making requests to the server. var session: InteractiveSession = _ + var sessionId: Int = _ } @@ -272,9 +297,10 @@ private class HttpClientTestBootstrap extends LifeCycle { override protected def createSession(req: HttpServletRequest): InteractiveSession = { val session = mock(classOf[InteractiveSession]) val id = sessionManager.nextId() + val sessionAppId: String = "ASD" when(session.id).thenReturn(id) when(session.name).thenReturn(None) - when(session.appId).thenReturn(None) + when(session.appId).thenReturn(Some(sessionAppId)) when(session.appInfo).thenReturn(AppInfo()) when(session.state).thenReturn(SessionState.Idle) when(session.proxyUser).thenReturn(None) @@ -282,6 +308,7 @@ private class HttpClientTestBootstrap extends LifeCycle { when(session.stop()).thenReturn(Future.successful(())) require(HttpClientSpec.session == null, "Session already created?") HttpClientSpec.session = session + HttpClientSpec.sessionId = id session } } diff --git a/rsc/pom.xml b/rsc/pom.xml index 833ad8366..613de06f6 100644 --- a/rsc/pom.xml +++ b/rsc/pom.xml @@ -121,6 +121,12 @@ slf4j-api provided + + org.apache.livy + livy-api + 0.8.0-incubating-SNAPSHOT + compile + diff --git a/rsc/src/main/java/org/apache/livy/rsc/RSCClient.java b/rsc/src/main/java/org/apache/livy/rsc/RSCClient.java index c1c953400..cf7a92739 100644 --- a/rsc/src/main/java/org/apache/livy/rsc/RSCClient.java +++ b/rsc/src/main/java/org/apache/livy/rsc/RSCClient.java @@ -46,7 +46,10 @@ import org.apache.livy.rsc.rpc.Rpc; import org.apache.livy.sessions.SessionState; -import static org.apache.livy.rsc.RSCConf.Entry.*; + + +import static org.apache.livy.rsc.RSCConf.Entry.CLIENT_SHUTDOWN_TIMEOUT; +import static org.apache.livy.rsc.RSCConf.Entry.RPC_MAX_THREADS; public class RSCClient implements LivyClient { private static final Logger LOG = LoggerFactory.getLogger(RSCClient.class); @@ -426,10 +429,15 @@ private void handle(ChannelHandlerContext ctx, ReplState msg) { LOG.trace("Received repl state for {}", msg.state); // Update last activity timestamp when state change is from busy to idle. if (SessionState.Busy$.MODULE$.state().equals(replState) && msg != null && - SessionState.Idle$.MODULE$.state().equals(msg.state)) { + SessionState.Idle$.MODULE$.state().equals(msg.state)) { replLastActivity = System.nanoTime(); } replState = msg.state; } } + @Override + public String getSessionAppId(){throw new UnsupportedOperationException();} + + @Override + public int getSessionId(){throw new UnsupportedOperationException();} }