[Backport 2.x] Make models free of XContent (#2677) (#2684)
* Make models free of XContent

Signed-off-by: Tomoyuki Morita <[email protected]>
(cherry picked from commit afec315)

* Add comments

Signed-off-by: Tomoyuki Morita <[email protected]>
(cherry picked from commit 2c849f7)
ykmr1224 authored May 20, 2024
1 parent af74fe2 · commit fa6d824
Showing 35 changed files with 1,221 additions and 472 deletions.
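
The theme of the change: state-model classes such as AsyncQueryJobMetadata, SessionModel, and IndexDMLResult stop carrying their own toXContent/fromXContent logic, which moves into dedicated serializer classes under org.opensearch.sql.spark.execution.xcontent (the new imports of StatementModelXContentSerializer and XContentCommonAttributes in the hunks below point at that package). A minimal sketch of what such a per-model serializer contract could look like; the interface name and method signatures here are illustrative assumptions, not copied from the PR:

import java.io.IOException;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.sql.spark.execution.statestore.StateModel;

/** Hypothetical contract for the per-model serializers this change moves toward. */
public interface XContentSerializer<T extends StateModel> {

  /** Writes the model into an XContentBuilder (previously Model.toXContent). */
  XContentBuilder toXContent(T object, XContentBuilder builder, ToXContent.Params params)
      throws IOException;

  /** Rebuilds the model from a parser plus optimistic-concurrency metadata (previously Model.fromXContent). */
  T fromXContent(XContentParser parser, long seqNo, long primaryTerm) throws IOException;
}
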
@@ -7,19 +7,9 @@

package org.opensearch.sql.spark.asyncquery.model;

import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.sql.spark.execution.session.SessionModel.DATASOURCE_NAME;
import static org.opensearch.sql.spark.execution.statement.StatementModel.QUERY_ID;

import com.google.gson.Gson;
import java.io.IOException;
import java.util.Locale;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.SneakyThrows;
import org.opensearch.core.common.Strings;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.sql.spark.dispatcher.model.JobType;
import org.opensearch.sql.spark.execution.statestore.StateModel;
@@ -28,10 +18,6 @@
@Data
@EqualsAndHashCode(callSuper = false)
public class AsyncQueryJobMetadata extends StateModel {
public static final String TYPE_JOBMETA = "jobmeta";
public static final String JOB_TYPE = "jobType";
public static final String INDEX_NAME = "indexName";

private final AsyncQueryId queryId;
private final String applicationId;
private final String jobId;
@@ -134,29 +120,6 @@ public String toString() {
return new Gson().toJson(this);
}

/**
* Converts JobMetadata to XContentBuilder.
*
* @return XContentBuilder {@link XContentBuilder}
* @throws Exception Exception.
*/
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder
.startObject()
.field(QUERY_ID, queryId.getId())
.field("type", TYPE_JOBMETA)
.field("jobId", jobId)
.field("applicationId", applicationId)
.field("resultIndex", resultIndex)
.field("sessionId", sessionId)
.field(DATASOURCE_NAME, datasourceName)
.field(JOB_TYPE, jobType.getText().toLowerCase(Locale.ROOT))
.field(INDEX_NAME, indexName)
.endObject();
return builder;
}

/** copy builder. update seqNo and primaryTerm */
public static AsyncQueryJobMetadata copy(
AsyncQueryJobMetadata copy, long seqNo, long primaryTerm) {
@@ -173,72 +136,6 @@ public static AsyncQueryJobMetadata copy(
primaryTerm);
}

/**
* Convert xcontent parser to JobMetadata.
*
* @param parser parser.
* @return JobMetadata {@link AsyncQueryJobMetadata}
* @throws IOException IOException.
*/
@SneakyThrows
public static AsyncQueryJobMetadata fromXContent(
XContentParser parser, long seqNo, long primaryTerm) {
AsyncQueryId queryId = null;
String jobId = null;
String applicationId = null;
String resultIndex = null;
String sessionId = null;
String datasourceName = null;
String jobTypeStr = null;
String indexName = null;
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
while (!XContentParser.Token.END_OBJECT.equals(parser.nextToken())) {
String fieldName = parser.currentName();
parser.nextToken();
switch (fieldName) {
case QUERY_ID:
queryId = new AsyncQueryId(parser.textOrNull());
break;
case "jobId":
jobId = parser.textOrNull();
break;
case "applicationId":
applicationId = parser.textOrNull();
break;
case "resultIndex":
resultIndex = parser.textOrNull();
break;
case "sessionId":
sessionId = parser.textOrNull();
break;
case DATASOURCE_NAME:
datasourceName = parser.textOrNull();
case JOB_TYPE:
jobTypeStr = parser.textOrNull();
case INDEX_NAME:
indexName = parser.textOrNull();
case "type":
break;
default:
throw new IllegalArgumentException("Unknown field: " + fieldName);
}
}
if (jobId == null || applicationId == null) {
throw new IllegalArgumentException("jobId and applicationId are required fields.");
}
return new AsyncQueryJobMetadata(
queryId,
applicationId,
jobId,
resultIndex,
sessionId,
datasourceName,
Strings.isNullOrEmpty(jobTypeStr) ? null : JobType.fromString(jobTypeStr),
indexName,
seqNo,
primaryTerm);
}

@Override
public String getId() {
return queryId.docId();
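
With toXContent and fromXContent deleted from AsyncQueryJobMetadata, the mapping above presumably lands in a dedicated serializer, following the <Model>XContentSerializer naming visible elsewhere in this diff. A hedged sketch of the write half, reusing the field names from the removed method; the class name and the local constants are assumptions (QUERY_ID and DATASOURCE_NAME were previously imported from StatementModel and SessionModel):

import java.io.IOException;
import java.util.Locale;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata;

/** Hypothetical home for the XContent mapping removed from AsyncQueryJobMetadata. */
public class AsyncQueryJobMetadataXContentSerializer {
  public static final String QUERY_ID = "queryId";
  public static final String DATASOURCE_NAME = "dataSourceName";
  public static final String TYPE_JOBMETA = "jobmeta";
  public static final String JOB_TYPE = "jobType";
  public static final String INDEX_NAME = "indexName";

  public XContentBuilder toXContent(
      AsyncQueryJobMetadata jobMetadata, XContentBuilder builder, ToXContent.Params params)
      throws IOException {
    // Same field layout as the removed AsyncQueryJobMetadata.toXContent.
    return builder
        .startObject()
        .field(QUERY_ID, jobMetadata.getQueryId().getId())
        .field("type", TYPE_JOBMETA)
        .field("jobId", jobMetadata.getJobId())
        .field("applicationId", jobMetadata.getApplicationId())
        .field("resultIndex", jobMetadata.getResultIndex())
        .field("sessionId", jobMetadata.getSessionId())
        .field(DATASOURCE_NAME, jobMetadata.getDatasourceName())
        .field(JOB_TYPE, jobMetadata.getJobType().getText().toLowerCase(Locale.ROOT))
        .field(INDEX_NAME, jobMetadata.getIndexName())
        .endObject();
  }
}
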
@@ -13,7 +13,6 @@
import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_LAKEFORMATION_ENABLED;
import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_ROLE_ARN;
import static org.opensearch.sql.spark.data.constants.SparkConstants.*;
import static org.opensearch.sql.spark.execution.statestore.StateStore.DATASOURCE_TO_REQUEST_INDEX;

import java.net.URI;
import java.net.URISyntaxException;
@@ -27,6 +26,7 @@
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.datasource.model.DataSourceType;
import org.opensearch.sql.datasources.auth.AuthenticationType;
import org.opensearch.sql.spark.execution.statestore.OpenSearchStateStoreUtil;

/** Define Spark Submit Parameters. */
@AllArgsConstructor
@@ -181,7 +181,7 @@ public Builder extraParameters(String params) {
}

public Builder sessionExecution(String sessionId, String datasourceName) {
config.put(FLINT_JOB_REQUEST_INDEX, DATASOURCE_TO_REQUEST_INDEX.apply(datasourceName));
config.put(FLINT_JOB_REQUEST_INDEX, OpenSearchStateStoreUtil.getIndexName(datasourceName));
config.put(FLINT_JOB_SESSION_ID, sessionId);
return this;
}
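
One functional swap in the hunk above: sessionExecution no longer derives the request index through StateStore.DATASOURCE_TO_REQUEST_INDEX but through OpenSearchStateStoreUtil.getIndexName. The utility itself is not shown in this excerpt; the sketch below is a guess at its shape, assuming it simply joins a request-index prefix with the lower-cased datasource name (the prefix constant and its value are assumptions):

import java.util.Locale;

/** Assumed shape of the index-name helper used by sessionExecution above. */
public class OpenSearchStateStoreUtil {

  // Assumed prefix; in the plugin this constant would live with the other Spark constants.
  private static final String SPARK_REQUEST_BUFFER_INDEX_NAME = ".query_execution_request";

  public static String getIndexName(String datasourceName) {
    // e.g. "MyGlue" -> ".query_execution_request_myglue"
    return String.format(
        "%s_%s", SPARK_REQUEST_BUFFER_INDEX_NAME, datasourceName.toLowerCase(Locale.ROOT));
  }
}
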
@@ -5,8 +5,8 @@

package org.opensearch.sql.spark.cluster;

import static org.opensearch.sql.spark.execution.session.SessionModel.LAST_UPDATE_TIME;
import static org.opensearch.sql.spark.execution.statement.StatementModel.SUBMIT_TIME;
import static org.opensearch.sql.spark.execution.xcontent.StatementModelXContentSerializer.SUBMIT_TIME;
import static org.opensearch.sql.spark.execution.xcontent.XContentCommonAttributes.LAST_UPDATE_TIME;

import java.time.Clock;
import java.time.Duration;
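
The static-import swap above shows where the field-name constants went: SUBMIT_TIME now lives on StatementModelXContentSerializer and LAST_UPDATE_TIME on XContentCommonAttributes rather than on the model classes. A sketch of what such a shared constants holder might contain, with values lifted from the constants removed from SessionModel and IndexDMLResult further down this diff; the exact membership of the real class is an assumption:

/** Hypothetical holder for field names shared by the new XContent serializers. */
public final class XContentCommonAttributes {
  public static final String VERSION = "version";
  public static final String TYPE = "type";
  public static final String QUERY_ID = "queryId";
  public static final String SESSION_ID = "sessionId";
  public static final String APPLICATION_ID = "applicationId";
  public static final String JOB_ID = "jobId";
  public static final String DATASOURCE_NAME = "dataSourceName";
  public static final String LAST_UPDATE_TIME = "lastUpdateTime";
  public static final String ERROR = "error";

  private XContentCommonAttributes() {}
}
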
@@ -5,24 +5,16 @@

package org.opensearch.sql.spark.dispatcher.model;

import static org.opensearch.sql.spark.execution.session.SessionModel.DATASOURCE_NAME;

import com.google.common.collect.ImmutableList;
import java.io.IOException;
import lombok.Data;
import lombok.EqualsAndHashCode;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.sql.spark.execution.statestore.StateModel;

/** Plugin create Index DML result. */
@Data
@EqualsAndHashCode(callSuper = false)
public class IndexDMLResult extends StateModel {
private static final String QUERY_ID = "queryId";
private static final String QUERY_RUNTIME = "queryRunTime";
private static final String UPDATE_TIME = "updateTime";
private static final String DOC_ID_PREFIX = "index";
public static final String DOC_ID_PREFIX = "index";

private final String queryId;
private final String status;
@@ -55,20 +47,4 @@ public long getSeqNo() {
public long getPrimaryTerm() {
return SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder
.startObject()
.field(QUERY_ID, queryId)
.field("status", status)
.field("error", error)
.field(DATASOURCE_NAME, datasourceName)
.field(QUERY_RUNTIME, queryRunTime)
.field(UPDATE_TIME, updateTime)
.field("result", ImmutableList.of())
.field("schema", ImmutableList.of())
.endObject();
return builder;
}
}
@@ -8,32 +8,17 @@
import static org.opensearch.sql.spark.execution.session.SessionState.NOT_STARTED;
import static org.opensearch.sql.spark.execution.session.SessionType.INTERACTIVE;

import java.io.IOException;
import lombok.Builder;
import lombok.Data;
import lombok.SneakyThrows;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.core.xcontent.XContentParserUtils;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.sql.spark.execution.statestore.StateModel;

/** Session data in flint.ql.sessions index. */
@Data
@Builder
public class SessionModel extends StateModel {
public static final String VERSION = "version";
public static final String TYPE = "type";
public static final String SESSION_TYPE = "sessionType";
public static final String SESSION_ID = "sessionId";
public static final String SESSION_STATE = "state";
public static final String DATASOURCE_NAME = "dataSourceName";
public static final String LAST_UPDATE_TIME = "lastUpdateTime";
public static final String APPLICATION_ID = "applicationId";
public static final String JOB_ID = "jobId";
public static final String ERROR = "error";

public static final String UNKNOWN = "unknown";
public static final String SESSION_DOC_TYPE = "session";

private final String version;
private final SessionType sessionType;
@@ -48,24 +33,6 @@ public class SessionModel extends StateModel {
private final long seqNo;
private final long primaryTerm;

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder
.startObject()
.field(VERSION, version)
.field(TYPE, SESSION_DOC_TYPE)
.field(SESSION_TYPE, sessionType.getSessionType())
.field(SESSION_ID, sessionId.getSessionId())
.field(SESSION_STATE, sessionState.getSessionState())
.field(DATASOURCE_NAME, datasourceName)
.field(APPLICATION_ID, applicationId)
.field(JOB_ID, jobId)
.field(LAST_UPDATE_TIME, lastUpdateTime)
.field(ERROR, error)
.endObject();
return builder;
}

public static SessionModel of(SessionModel copy, long seqNo, long primaryTerm) {
return builder()
.version(copy.version)
@@ -99,52 +66,6 @@ public static SessionModel copyWithState(
.build();
}

@SneakyThrows
public static SessionModel fromXContent(XContentParser parser, long seqNo, long primaryTerm) {
SessionModelBuilder builder = new SessionModelBuilder();
XContentParserUtils.ensureExpectedToken(
XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
while (!XContentParser.Token.END_OBJECT.equals(parser.nextToken())) {
String fieldName = parser.currentName();
parser.nextToken();
switch (fieldName) {
case VERSION:
builder.version(parser.text());
break;
case SESSION_TYPE:
builder.sessionType(SessionType.fromString(parser.text()));
break;
case SESSION_ID:
builder.sessionId(new SessionId(parser.text()));
break;
case SESSION_STATE:
builder.sessionState(SessionState.fromString(parser.text()));
break;
case DATASOURCE_NAME:
builder.datasourceName(parser.text());
break;
case ERROR:
builder.error(parser.text());
break;
case APPLICATION_ID:
builder.applicationId(parser.text());
break;
case JOB_ID:
builder.jobId(parser.text());
break;
case LAST_UPDATE_TIME:
builder.lastUpdateTime(parser.longValue());
break;
case TYPE:
// do nothing.
break;
}
}
builder.seqNo(seqNo);
builder.primaryTerm(primaryTerm);
return builder.build();
}

public static SessionModel initInteractiveSession(
String applicationId, String jobId, SessionId sid, String datasourceName) {
return builder()
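
SessionModel keeps its builder, copy helpers, and initInteractiveSession factory, but its field-name constants and XContent methods are gone, so persistence code can no longer serialize the model directly. A hedged sketch of the caller side, assuming a SessionModelXContentSerializer that follows the interface sketched at the top of this diff (both names are illustrative):

import java.io.IOException;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.sql.spark.execution.session.SessionModel;

/** Hypothetical call site: the state store delegates serialization to a per-model serializer. */
class SessionModelPersistenceExample {
  XContentBuilder serialize(SessionModel sessionModel) throws IOException {
    // Before this change: sessionModel.toXContent(builder, params).
    // After it (assumed): a dedicated serializer owns the field mapping.
    XContentSerializer<SessionModel> serializer = new SessionModelXContentSerializer();
    return serializer.toXContent(
        sessionModel, XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS);
  }
}
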