Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
) (opensearch-project#2319)"

This reverts commit b3c2e94.
  • Loading branch information
vmmusings committed Nov 13, 2023
1 parent b9e8910 commit 77af17c
Show file tree
Hide file tree
Showing 19 changed files with 169 additions and 1,055 deletions.
5 changes: 2 additions & 3 deletions spark/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,8 @@ jacocoTestCoverageVerification {
'org.opensearch.sql.spark.dispatcher.model.*',
'org.opensearch.sql.spark.flint.FlintIndexType',
// ignore because XContext IOException
'org.opensearch.sql.spark.execution.statestore.StateStore',
'org.opensearch.sql.spark.execution.session.SessionModel',
'org.opensearch.sql.spark.execution.statement.StatementModel'
'org.opensearch.sql.spark.execution.statestore.SessionStateStore',
'org.opensearch.sql.spark.execution.session.SessionModel'
]
limit {
counter = 'LINE'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,6 @@
package org.opensearch.sql.spark.execution.session;

import static org.opensearch.sql.spark.execution.session.SessionModel.initInteractiveSession;
import static org.opensearch.sql.spark.execution.session.SessionState.END_STATE;
import static org.opensearch.sql.spark.execution.statement.StatementId.newStatementId;
import static org.opensearch.sql.spark.execution.statestore.StateStore.createSession;
import static org.opensearch.sql.spark.execution.statestore.StateStore.getSession;

import java.util.Optional;
import lombok.Builder;
Expand All @@ -18,11 +14,7 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.index.engine.VersionConflictEngineException;
import org.opensearch.sql.spark.client.EMRServerlessClient;
import org.opensearch.sql.spark.execution.statement.QueryRequest;
import org.opensearch.sql.spark.execution.statement.Statement;
import org.opensearch.sql.spark.execution.statement.StatementId;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.rest.model.LangType;
import org.opensearch.sql.spark.execution.statestore.SessionStateStore;

/**
* Interactive session.
Expand All @@ -35,8 +27,9 @@ public class InteractiveSession implements Session {
private static final Logger LOG = LogManager.getLogger();

private final SessionId sessionId;
private final StateStore stateStore;
private final SessionStateStore sessionStateStore;
private final EMRServerlessClient serverlessClient;

private SessionModel sessionModel;

@Override
Expand All @@ -48,75 +41,21 @@ public void open(CreateSessionRequest createSessionRequest) {
sessionModel =
initInteractiveSession(
applicationId, jobID, sessionId, createSessionRequest.getDatasourceName());
createSession(stateStore).apply(sessionModel);
sessionStateStore.create(sessionModel);
} catch (VersionConflictEngineException e) {
String errorMsg = "session already exist. " + sessionId;
LOG.error(errorMsg);
throw new IllegalStateException(errorMsg);
}
}

/** todo. StatementSweeper will delete doc. */
@Override
public void close() {
Optional<SessionModel> model = getSession(stateStore).apply(sessionModel.getId());
Optional<SessionModel> model = sessionStateStore.get(sessionModel.getSessionId());
if (model.isEmpty()) {
throw new IllegalStateException("session does not exist. " + sessionModel.getSessionId());
throw new IllegalStateException("session not exist. " + sessionModel.getSessionId());
} else {
serverlessClient.cancelJobRun(sessionModel.getApplicationId(), sessionModel.getJobId());
}
}

/** Submit statement. If submit successfully, Statement in waiting state. */
public StatementId submit(QueryRequest request) {
Optional<SessionModel> model = getSession(stateStore).apply(sessionModel.getId());
if (model.isEmpty()) {
throw new IllegalStateException("session does not exist. " + sessionModel.getSessionId());
} else {
sessionModel = model.get();
if (!END_STATE.contains(sessionModel.getSessionState())) {
StatementId statementId = newStatementId();
Statement st =
Statement.builder()
.sessionId(sessionId)
.applicationId(sessionModel.getApplicationId())
.jobId(sessionModel.getJobId())
.stateStore(stateStore)
.statementId(statementId)
.langType(LangType.SQL)
.query(request.getQuery())
.queryId(statementId.getId())
.build();
st.open();
return statementId;
} else {
String errMsg =
String.format(
"can't submit statement, session should not be in end state, "
+ "current session state is: %s",
sessionModel.getSessionState().getSessionState());
LOG.debug(errMsg);
throw new IllegalStateException(errMsg);
}
}
}

@Override
public Optional<Statement> get(StatementId stID) {
return StateStore.getStatement(stateStore)
.apply(stID.getId())
.map(
model ->
Statement.builder()
.sessionId(sessionId)
.applicationId(model.getApplicationId())
.jobId(model.getJobId())
.statementId(model.getStatementId())
.langType(model.getLangType())
.query(model.getQuery())
.queryId(model.getQueryId())
.stateStore(stateStore)
.statementModel(model)
.build());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,6 @@

package org.opensearch.sql.spark.execution.session;

import java.util.Optional;
import org.opensearch.sql.spark.execution.statement.QueryRequest;
import org.opensearch.sql.spark.execution.statement.Statement;
import org.opensearch.sql.spark.execution.statement.StatementId;

/** Session define the statement execution context. Each session is binding to one Spark Job. */
public interface Session {
/** open session. */
Expand All @@ -18,22 +13,6 @@ public interface Session {
/** close session. */
void close();

/**
* submit {@link QueryRequest}.
*
* @param request {@link QueryRequest}
* @return {@link StatementId}
*/
StatementId submit(QueryRequest request);

/**
* get {@link Statement}.
*
* @param stID {@link StatementId}
* @return {@link Statement}
*/
Optional<Statement> get(StatementId stID);

SessionModel getSessionModel();

SessionId getSessionId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import java.util.Optional;
import lombok.RequiredArgsConstructor;
import org.opensearch.sql.spark.client.EMRServerlessClient;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.execution.statestore.SessionStateStore;

/**
* Singleton Class
Expand All @@ -19,27 +19,27 @@
*/
@RequiredArgsConstructor
public class SessionManager {
private final StateStore stateStore;
private final SessionStateStore stateStore;
private final EMRServerlessClient emrServerlessClient;

public Session createSession(CreateSessionRequest request) {
InteractiveSession session =
InteractiveSession.builder()
.sessionId(newSessionId())
.stateStore(stateStore)
.sessionStateStore(stateStore)
.serverlessClient(emrServerlessClient)
.build();
session.open(request);
return session;
}

public Optional<Session> getSession(SessionId sid) {
Optional<SessionModel> model = StateStore.getSession(stateStore).apply(sid.getSessionId());
Optional<SessionModel> model = stateStore.get(sid);
if (model.isPresent()) {
InteractiveSession session =
InteractiveSession.builder()
.sessionId(sid)
.stateStore(stateStore)
.sessionStateStore(stateStore)
.serverlessClient(emrServerlessClient)
.sessionModel(model.get())
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,16 @@
import lombok.Builder;
import lombok.Data;
import lombok.SneakyThrows;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.core.xcontent.XContentParserUtils;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.sql.spark.execution.statestore.StateModel;

/** Session data in flint.ql.sessions index. */
@Data
@Builder
public class SessionModel extends StateModel {
public class SessionModel implements ToXContentObject {
public static final String VERSION = "version";
public static final String TYPE = "type";
public static final String SESSION_TYPE = "sessionType";
Expand Down Expand Up @@ -73,27 +73,6 @@ public static SessionModel of(SessionModel copy, long seqNo, long primaryTerm) {
.sessionId(new SessionId(copy.sessionId.getSessionId()))
.sessionState(copy.sessionState)
.datasourceName(copy.datasourceName)
.applicationId(copy.getApplicationId())
.jobId(copy.jobId)
.error(UNKNOWN)
.lastUpdateTime(copy.getLastUpdateTime())
.seqNo(seqNo)
.primaryTerm(primaryTerm)
.build();
}

public static SessionModel copyWithState(
SessionModel copy, SessionState state, long seqNo, long primaryTerm) {
return builder()
.version(copy.version)
.sessionType(copy.sessionType)
.sessionId(new SessionId(copy.sessionId.getSessionId()))
.sessionState(state)
.datasourceName(copy.datasourceName)
.applicationId(copy.getApplicationId())
.jobId(copy.jobId)
.error(UNKNOWN)
.lastUpdateTime(copy.getLastUpdateTime())
.seqNo(seqNo)
.primaryTerm(primaryTerm)
.build();
Expand Down Expand Up @@ -161,9 +140,4 @@ public static SessionModel initInteractiveSession(
.primaryTerm(SequenceNumbers.UNASSIGNED_PRIMARY_TERM)
.build();
}

@Override
public String getId() {
return sessionId.getSessionId();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@

package org.opensearch.sql.spark.execution.session;

import com.google.common.collect.ImmutableList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import lombok.Getter;
Expand All @@ -19,8 +17,6 @@ public enum SessionState {
DEAD("dead"),
FAIL("fail");

public static List<SessionState> END_STATE = ImmutableList.of(DEAD, FAIL);

private final String sessionState;

SessionState(String sessionState) {
Expand Down

This file was deleted.

This file was deleted.

Loading

0 comments on commit 77af17c

Please sign in to comment.