Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[LIVY-891] Expose the sessionID via the client APIs [WIP] #348

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions api/src/main/java/org/apache/livy/LivyClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -107,4 +107,18 @@ public interface LivyClient {
*/
Future<?> addFile(URI uri);

/**
* Retrieves the session id internally created by Livy.
*
* @return An integer representing the session id of the running session
*/
int getSessionId();

/**
* Retrieves the internally generated session app id.
*
* @return A string representing the livy session app id.
*/
String getSessionAppId();

}
5 changes: 5 additions & 0 deletions api/src/test/java/org/apache/livy/TestClientFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@ public Future<?> addFile(URI uri) {
throw new UnsupportedOperationException();
}

@Override
public int getSessionId(){throw new UnsupportedOperationException();}

@Override
public String getSessionAppId(){throw new UnsupportedOperationException();}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class HttpClient implements LivyClient {

private final HttpConf config;
private final LivyConnection conn;
private final int sessionId;
private final SessionInfo session;
private final ScheduledExecutorService executor;
private final Serializer serializer;

Expand All @@ -66,8 +66,8 @@ class HttpClient implements LivyClient {
m.group(1), uri.getQuery(), uri.getFragment());

this.conn = new LivyConnection(base, httpConf);
this.sessionId = Integer.parseInt(m.group(2));
conn.post(null, SessionInfo.class, "/%d/connect", sessionId);
int sessionId = Integer.parseInt(m.group(2));
session = conn.post(null, SessionInfo.class, "/%d/connect", sessionId);
} else {
Map<String, String> sessionConf = new HashMap<>();
for (Map.Entry<String, String> e : config) {
Expand All @@ -76,7 +76,7 @@ class HttpClient implements LivyClient {

ClientMessage create = new CreateClientRequest(sessionConf);
this.conn = new LivyConnection(uri, httpConf);
this.sessionId = conn.post(create, SessionInfo.class, "/").id;
this.session = conn.post(create, SessionInfo.class, "/");
}
} catch (Exception e) {
throw propagate(e);
Expand All @@ -87,7 +87,7 @@ class HttpClient implements LivyClient {
this.executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "HttpClient-" + sessionId);
Thread t = new Thread(r, "HttpClient-" + session.id);
t.setDaemon(true);
return t;
}
Expand All @@ -112,7 +112,7 @@ public synchronized void stop(boolean shutdownContext) {
executor.shutdownNow();
try {
if (shutdownContext) {
conn.delete(Map.class, "/%s", sessionId);
conn.delete(Map.class, "/%s", session.id);
}
} catch (Exception e) {
throw propagate(e);
Expand Down Expand Up @@ -149,7 +149,7 @@ private Future<?> uploadResource(final File file, final String command, final St
Callable<Void> task = new Callable<Void>() {
@Override
public Void call() throws Exception {
conn.post(file, Void.class, paramName, "/%d/%s", sessionId, command);
conn.post(file, Void.class, paramName, "/%d/%s", session.id, command);
return null;
}
};
Expand All @@ -161,7 +161,7 @@ private Future<?> addResource(final String command, final URI resource) {
@Override
public Void call() throws Exception {
ClientMessage msg = new AddResource(resource.toString());
conn.post(msg, Void.class, "/%d/%s", sessionId, command);
conn.post(msg, Void.class, "/%d/%s", session.id, command);
return null;
}
};
Expand All @@ -170,7 +170,7 @@ public Void call() throws Exception {

private <T> JobHandleImpl<T> sendJob(final String command, Job<T> job) {
final ByteBuffer serializedJob = serializer.serialize(job);
JobHandleImpl<T> handle = new JobHandleImpl<T>(config, conn, sessionId, executor, serializer);
JobHandleImpl<T> handle = new JobHandleImpl<T>(config, conn, session.id, executor, serializer);
handle.start(command, serializedJob);
return handle;
}
Expand All @@ -183,9 +183,12 @@ private RuntimeException propagate(Exception cause) {
}
}

// For testing.
int getSessionId() {
return sessionId;

public int getSessionId() {
return session.id;
}


public String getSessionAppId() { return session.appId;}

}
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,30 @@ class HttpClientSpec extends FunSpecLike with BeforeAndAfterAll with LivyBaseUni
testJob(false, response = Some(null))
}

withClient("should retrieve session id and sessionAppID") {
var id = client.getSessionId()
assert(id === sessionId)

var appId = client.getSessionAppId()
assert(Some(appId) === session.appId)

}

withClient("should retrieve session id when reconnecting to a session") {
var sid = client.asInstanceOf[HttpClient].getSessionId()
val uri = s"http://${InetAddress.getLocalHost.getHostAddress}:${server.port}" +
s"${LivyConnection.SESSIONS_URI}/$sid"
val newClient = new LivyClientBuilder(false).setURI(new URI(uri)).build()
newClient.stop(false)

var id = client.getSessionId()
assert(id === sessionId)

var appId = client.getSessionAppId()
assert(Some(appId) === session.appId)

}

withClient("should connect to existing sessions") {
var sid = client.asInstanceOf[HttpClient].getSessionId()
val uri = s"http://${InetAddress.getLocalHost.getHostAddress}:${server.port}" +
Expand Down Expand Up @@ -256,6 +280,7 @@ private object HttpClientSpec {
// Hack warning: keep the session object available so that individual tests can mock
// the desired behavior before making requests to the server.
var session: InteractiveSession = _
var sessionId: Int = _

}

Expand All @@ -272,16 +297,18 @@ private class HttpClientTestBootstrap extends LifeCycle {
override protected def createSession(req: HttpServletRequest): InteractiveSession = {
val session = mock(classOf[InteractiveSession])
val id = sessionManager.nextId()
val sessionAppId: String = "ASD"
when(session.id).thenReturn(id)
when(session.name).thenReturn(None)
when(session.appId).thenReturn(None)
when(session.appId).thenReturn(Some(sessionAppId))
when(session.appInfo).thenReturn(AppInfo())
when(session.state).thenReturn(SessionState.Idle)
when(session.proxyUser).thenReturn(None)
when(session.kind).thenReturn(Spark)
when(session.stop()).thenReturn(Future.successful(()))
require(HttpClientSpec.session == null, "Session already created?")
HttpClientSpec.session = session
HttpClientSpec.sessionId = id
session
}
}
Expand Down
6 changes: 6 additions & 0 deletions rsc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,12 @@
<artifactId>slf4j-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.livy</groupId>
<artifactId>livy-api</artifactId>
<version>0.8.0-incubating-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
</dependencies>

<build>
Expand Down
12 changes: 10 additions & 2 deletions rsc/src/main/java/org/apache/livy/rsc/RSCClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,10 @@
import org.apache.livy.rsc.rpc.Rpc;
import org.apache.livy.sessions.SessionState;

import static org.apache.livy.rsc.RSCConf.Entry.*;


import static org.apache.livy.rsc.RSCConf.Entry.CLIENT_SHUTDOWN_TIMEOUT;
import static org.apache.livy.rsc.RSCConf.Entry.RPC_MAX_THREADS;

public class RSCClient implements LivyClient {
private static final Logger LOG = LoggerFactory.getLogger(RSCClient.class);
Expand Down Expand Up @@ -426,10 +429,15 @@ private void handle(ChannelHandlerContext ctx, ReplState msg) {
LOG.trace("Received repl state for {}", msg.state);
// Update last activity timestamp when state change is from busy to idle.
if (SessionState.Busy$.MODULE$.state().equals(replState) && msg != null &&
SessionState.Idle$.MODULE$.state().equals(msg.state)) {
SessionState.Idle$.MODULE$.state().equals(msg.state)) {
replLastActivity = System.nanoTime();
}
replState = msg.state;
}
}
@Override
public String getSessionAppId(){throw new UnsupportedOperationException();}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems to me that this is pointing to the fact that it is an inappropriate extension to the interface.
If there are only 2 implementations of the interface and 1 of them throws exceptions for this methods then I don't think they are appropriate for the interface. What is the story here? Session ID isn't known within RSCClient usage?


@Override
public int getSessionId(){throw new UnsupportedOperationException();}
}