diff --git a/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/AsyncQueryJobMetadata.java b/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/AsyncQueryJobMetadata.java index 1c7fd35c5e..cbb5779699 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/AsyncQueryJobMetadata.java +++ b/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/AsyncQueryJobMetadata.java @@ -7,19 +7,9 @@ package org.opensearch.sql.spark.asyncquery.model; -import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken; -import static org.opensearch.sql.spark.execution.session.SessionModel.DATASOURCE_NAME; -import static org.opensearch.sql.spark.execution.statement.StatementModel.QUERY_ID; - import com.google.gson.Gson; -import java.io.IOException; -import java.util.Locale; import lombok.Data; import lombok.EqualsAndHashCode; -import lombok.SneakyThrows; -import org.opensearch.core.common.Strings; -import org.opensearch.core.xcontent.XContentBuilder; -import org.opensearch.core.xcontent.XContentParser; import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.sql.spark.dispatcher.model.JobType; import org.opensearch.sql.spark.execution.statestore.StateModel; @@ -134,29 +124,6 @@ public String toString() { return new Gson().toJson(this); } - /** - * Converts JobMetadata to XContentBuilder. - * - * @return XContentBuilder {@link XContentBuilder} - * @throws Exception Exception. - */ - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder - .startObject() - .field(QUERY_ID, queryId.getId()) - .field("type", TYPE_JOBMETA) - .field("jobId", jobId) - .field("applicationId", applicationId) - .field("resultIndex", resultIndex) - .field("sessionId", sessionId) - .field(DATASOURCE_NAME, datasourceName) - .field(JOB_TYPE, jobType.getText().toLowerCase(Locale.ROOT)) - .field(INDEX_NAME, indexName) - .endObject(); - return builder; - } - /** copy builder. 
update seqNo and primaryTerm */ public static AsyncQueryJobMetadata copy( AsyncQueryJobMetadata copy, long seqNo, long primaryTerm) { @@ -173,72 +140,6 @@ public static AsyncQueryJobMetadata copy( primaryTerm); } - /** - * Convert xcontent parser to JobMetadata. - * - * @param parser parser. - * @return JobMetadata {@link AsyncQueryJobMetadata} - * @throws IOException IOException. - */ - @SneakyThrows - public static AsyncQueryJobMetadata fromXContent( - XContentParser parser, long seqNo, long primaryTerm) { - AsyncQueryId queryId = null; - String jobId = null; - String applicationId = null; - String resultIndex = null; - String sessionId = null; - String datasourceName = null; - String jobTypeStr = null; - String indexName = null; - ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser); - while (!XContentParser.Token.END_OBJECT.equals(parser.nextToken())) { - String fieldName = parser.currentName(); - parser.nextToken(); - switch (fieldName) { - case QUERY_ID: - queryId = new AsyncQueryId(parser.textOrNull()); - break; - case "jobId": - jobId = parser.textOrNull(); - break; - case "applicationId": - applicationId = parser.textOrNull(); - break; - case "resultIndex": - resultIndex = parser.textOrNull(); - break; - case "sessionId": - sessionId = parser.textOrNull(); - break; - case DATASOURCE_NAME: - datasourceName = parser.textOrNull(); - case JOB_TYPE: - jobTypeStr = parser.textOrNull(); - case INDEX_NAME: - indexName = parser.textOrNull(); - case "type": - break; - default: - throw new IllegalArgumentException("Unknown field: " + fieldName); - } - } - if (jobId == null || applicationId == null) { - throw new IllegalArgumentException("jobId and applicationId are required fields."); - } - return new AsyncQueryJobMetadata( - queryId, - applicationId, - jobId, - resultIndex, - sessionId, - datasourceName, - Strings.isNullOrEmpty(jobTypeStr) ? 
null : JobType.fromString(jobTypeStr), - indexName, - seqNo, - primaryTerm); - } - @Override public String getId() { return queryId.docId(); diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexDMLResult.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexDMLResult.java index b01ecf55ba..8fe0eecc9d 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexDMLResult.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexDMLResult.java @@ -5,13 +5,8 @@ package org.opensearch.sql.spark.dispatcher.model; -import static org.opensearch.sql.spark.execution.session.SessionModel.DATASOURCE_NAME; - -import com.google.common.collect.ImmutableList; -import java.io.IOException; import lombok.Data; import lombok.EqualsAndHashCode; -import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.sql.spark.execution.statestore.StateModel; @@ -19,10 +14,10 @@ @Data @EqualsAndHashCode(callSuper = false) public class IndexDMLResult extends StateModel { - private static final String QUERY_ID = "queryId"; - private static final String QUERY_RUNTIME = "queryRunTime"; - private static final String UPDATE_TIME = "updateTime"; - private static final String DOC_ID_PREFIX = "index"; + public static final String QUERY_ID = "queryId"; + public static final String QUERY_RUNTIME = "queryRunTime"; + public static final String UPDATE_TIME = "updateTime"; + public static final String DOC_ID_PREFIX = "index"; private final String queryId; private final String status; @@ -55,20 +50,4 @@ public long getSeqNo() { public long getPrimaryTerm() { return SequenceNumbers.UNASSIGNED_PRIMARY_TERM; } - - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder - .startObject() - .field(QUERY_ID, queryId) - .field("status", status) - .field("error", error) - .field(DATASOURCE_NAME, datasourceName) - 
.field(QUERY_RUNTIME, queryRunTime) - .field(UPDATE_TIME, updateTime) - .field("result", ImmutableList.of()) - .field("schema", ImmutableList.of()) - .endObject(); - return builder; - } } diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionModel.java b/spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionModel.java index 806cdb083e..db78ddbd9d 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionModel.java +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionModel.java @@ -8,13 +8,8 @@ import static org.opensearch.sql.spark.execution.session.SessionState.NOT_STARTED; import static org.opensearch.sql.spark.execution.session.SessionType.INTERACTIVE; -import java.io.IOException; import lombok.Builder; import lombok.Data; -import lombok.SneakyThrows; -import org.opensearch.core.xcontent.XContentBuilder; -import org.opensearch.core.xcontent.XContentParser; -import org.opensearch.core.xcontent.XContentParserUtils; import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.sql.spark.execution.statestore.StateModel; @@ -48,24 +43,6 @@ public class SessionModel extends StateModel { private final long seqNo; private final long primaryTerm; - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder - .startObject() - .field(VERSION, version) - .field(TYPE, SESSION_DOC_TYPE) - .field(SESSION_TYPE, sessionType.getSessionType()) - .field(SESSION_ID, sessionId.getSessionId()) - .field(SESSION_STATE, sessionState.getSessionState()) - .field(DATASOURCE_NAME, datasourceName) - .field(APPLICATION_ID, applicationId) - .field(JOB_ID, jobId) - .field(LAST_UPDATE_TIME, lastUpdateTime) - .field(ERROR, error) - .endObject(); - return builder; - } - public static SessionModel of(SessionModel copy, long seqNo, long primaryTerm) { return builder() .version(copy.version) @@ -99,52 +76,6 @@ public static SessionModel 
copyWithState( .build(); } - @SneakyThrows - public static SessionModel fromXContent(XContentParser parser, long seqNo, long primaryTerm) { - SessionModelBuilder builder = new SessionModelBuilder(); - XContentParserUtils.ensureExpectedToken( - XContentParser.Token.START_OBJECT, parser.currentToken(), parser); - while (!XContentParser.Token.END_OBJECT.equals(parser.nextToken())) { - String fieldName = parser.currentName(); - parser.nextToken(); - switch (fieldName) { - case VERSION: - builder.version(parser.text()); - break; - case SESSION_TYPE: - builder.sessionType(SessionType.fromString(parser.text())); - break; - case SESSION_ID: - builder.sessionId(new SessionId(parser.text())); - break; - case SESSION_STATE: - builder.sessionState(SessionState.fromString(parser.text())); - break; - case DATASOURCE_NAME: - builder.datasourceName(parser.text()); - break; - case ERROR: - builder.error(parser.text()); - break; - case APPLICATION_ID: - builder.applicationId(parser.text()); - break; - case JOB_ID: - builder.jobId(parser.text()); - break; - case LAST_UPDATE_TIME: - builder.lastUpdateTime(parser.longValue()); - break; - case TYPE: - // do nothing. 
- break; - } - } - builder.seqNo(seqNo); - builder.primaryTerm(primaryTerm); - return builder.build(); - } - public static SessionModel initInteractiveSession( String applicationId, String jobId, SessionId sid, String datasourceName) { return builder() diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/statement/StatementModel.java b/spark/src/main/java/org/opensearch/sql/spark/execution/statement/StatementModel.java index adc147c905..a0eae1a4b9 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/execution/statement/StatementModel.java +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/statement/StatementModel.java @@ -5,18 +5,10 @@ package org.opensearch.sql.spark.execution.statement; -import static org.opensearch.sql.spark.execution.session.SessionModel.APPLICATION_ID; -import static org.opensearch.sql.spark.execution.session.SessionModel.DATASOURCE_NAME; -import static org.opensearch.sql.spark.execution.session.SessionModel.JOB_ID; import static org.opensearch.sql.spark.execution.statement.StatementState.WAITING; -import java.io.IOException; import lombok.Builder; import lombok.Data; -import lombok.SneakyThrows; -import org.opensearch.core.xcontent.XContentBuilder; -import org.opensearch.core.xcontent.XContentParser; -import org.opensearch.core.xcontent.XContentParserUtils; import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.sql.spark.execution.session.SessionId; import org.opensearch.sql.spark.execution.statestore.StateModel; @@ -55,27 +47,6 @@ public class StatementModel extends StateModel { private final long seqNo; private final long primaryTerm; - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder - .startObject() - .field(VERSION, version) - .field(TYPE, STATEMENT_DOC_TYPE) - .field(STATEMENT_STATE, statementState.getState()) - .field(STATEMENT_ID, statementId.getId()) - .field(SESSION_ID, sessionId.getSessionId()) - 
.field(APPLICATION_ID, applicationId) - .field(JOB_ID, jobId) - .field(LANG, langType.getText()) - .field(DATASOURCE_NAME, datasourceName) - .field(QUERY, query) - .field(QUERY_ID, queryId) - .field(SUBMIT_TIME, submitTime) - .field(ERROR, error) - .endObject(); - return builder; - } - public static StatementModel copy(StatementModel copy, long seqNo, long primaryTerm) { return builder() .version("1.0") @@ -115,61 +86,6 @@ public static StatementModel copyWithState( .build(); } - @SneakyThrows - public static StatementModel fromXContent(XContentParser parser, long seqNo, long primaryTerm) { - StatementModel.StatementModelBuilder builder = StatementModel.builder(); - XContentParserUtils.ensureExpectedToken( - XContentParser.Token.START_OBJECT, parser.currentToken(), parser); - while (!XContentParser.Token.END_OBJECT.equals(parser.nextToken())) { - String fieldName = parser.currentName(); - parser.nextToken(); - switch (fieldName) { - case VERSION: - builder.version(parser.text()); - break; - case TYPE: - // do nothing - break; - case STATEMENT_STATE: - builder.statementState(StatementState.fromString(parser.text())); - break; - case STATEMENT_ID: - builder.statementId(new StatementId(parser.text())); - break; - case SESSION_ID: - builder.sessionId(new SessionId(parser.text())); - break; - case APPLICATION_ID: - builder.applicationId(parser.text()); - break; - case JOB_ID: - builder.jobId(parser.text()); - break; - case LANG: - builder.langType(LangType.fromString(parser.text())); - break; - case DATASOURCE_NAME: - builder.datasourceName(parser.text()); - break; - case QUERY: - builder.query(parser.text()); - break; - case QUERY_ID: - builder.queryId(parser.text()); - break; - case SUBMIT_TIME: - builder.submitTime(parser.longValue()); - break; - case ERROR: - builder.error(parser.text()); - break; - } - } - builder.seqNo(seqNo); - builder.primaryTerm(primaryTerm); - return builder.build(); - } - public static StatementModel submitStatement( SessionId sid, String 
applicationId, diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/CopyBuilder.java b/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/CopyBuilder.java new file mode 100644 index 0000000000..6ae980086d --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/CopyBuilder.java @@ -0,0 +1,5 @@ +package org.opensearch.sql.spark.execution.statestore; + +public interface CopyBuilder { + T of(T copy, long seqNo, long primaryTerm); +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/FromXContent.java b/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/FromXContent.java new file mode 100644 index 0000000000..56bc4b6bbc --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/FromXContent.java @@ -0,0 +1,7 @@ +package org.opensearch.sql.spark.execution.statestore; + +import org.opensearch.core.xcontent.XContentParser; + +public interface FromXContent { + T fromXContent(XContentParser parser, long seqNo, long primaryTerm); +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/StateCopyBuilder.java b/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/StateCopyBuilder.java new file mode 100644 index 0000000000..ff90f02115 --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/StateCopyBuilder.java @@ -0,0 +1,5 @@ +package org.opensearch.sql.spark.execution.statestore; + +public interface StateCopyBuilder { + T of(T copy, S state, long seqNo, long primaryTerm); +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/StateModel.java b/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/StateModel.java index fe105cc8e4..71ee27bd91 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/StateModel.java +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/StateModel.java @@ -5,10 +5,7 
@@ package org.opensearch.sql.spark.execution.statestore; -import org.opensearch.core.xcontent.ToXContentObject; -import org.opensearch.core.xcontent.XContentParser; - -public abstract class StateModel implements ToXContentObject { +public abstract class StateModel { public static final String VERSION_1_0 = "1.0"; public static final String TYPE = "type"; public static final String STATE = "state"; @@ -19,16 +16,4 @@ public abstract class StateModel implements ToXContentObject { public abstract long getSeqNo(); public abstract long getPrimaryTerm(); - - public interface CopyBuilder { - T of(T copy, long seqNo, long primaryTerm); - } - - public interface StateCopyBuilder { - T of(T copy, S state, long seqNo, long primaryTerm); - } - - public interface FromXContent { - T fromXContent(XContentParser parser, long seqNo, long primaryTerm); - } } diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/StateStore.java b/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/StateStore.java index e50a2837d9..d422e809a1 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/StateStore.java +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/StateStore.java @@ -55,6 +55,12 @@ import org.opensearch.sql.spark.execution.session.SessionType; import org.opensearch.sql.spark.execution.statement.StatementModel; import org.opensearch.sql.spark.execution.statement.StatementState; +import org.opensearch.sql.spark.execution.xcontent.AsyncQueryJobMetadataXContentSerializer; +import org.opensearch.sql.spark.execution.xcontent.FlintIndexStateModelXContentSerializer; +import org.opensearch.sql.spark.execution.xcontent.IndexDMLResultXContentSerializer; +import org.opensearch.sql.spark.execution.xcontent.SessionModelXContentSerializer; +import org.opensearch.sql.spark.execution.xcontent.StatementModelXContentSerializer; +import org.opensearch.sql.spark.execution.xcontent.XContentSerializer; import 
org.opensearch.sql.spark.flint.FlintIndexState; import org.opensearch.sql.spark.flint.FlintIndexStateModel; @@ -78,16 +84,17 @@ public class StateStore { private final ClusterService clusterService; @VisibleForTesting - public T create( - T st, StateModel.CopyBuilder builder, String indexName) { + public T create(T st, CopyBuilder builder, String indexName) { try { if (!this.clusterService.state().routingTable().hasIndex(indexName)) { createIndex(indexName); } + XContentSerializer serializer = getXContentSerializer(st); IndexRequest indexRequest = new IndexRequest(indexName) .id(st.getId()) - .source(st.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) + .source( + serializer.toXContent(st, XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) .setIfSeqNo(st.getSeqNo()) .setIfPrimaryTerm(st.getPrimaryTerm()) .create(true) @@ -114,7 +121,7 @@ public T create( @VisibleForTesting public Optional get( - String sid, StateModel.FromXContent builder, String indexName) { + String sid, FromXContent builder, String indexName) { try { if (!this.clusterService.state().routingTable().hasIndex(indexName)) { createIndex(indexName); @@ -146,16 +153,18 @@ public Optional get( @VisibleForTesting public T updateState( - T st, S state, StateModel.StateCopyBuilder builder, String indexName) { + T st, S state, StateCopyBuilder builder, String indexName) { try { T model = builder.of(st, state, st.getSeqNo(), st.getPrimaryTerm()); + XContentSerializer serializer = getXContentSerializer(st); UpdateRequest updateRequest = new UpdateRequest() .index(indexName) .id(model.getId()) .setIfSeqNo(model.getSeqNo()) .setIfPrimaryTerm(model.getPrimaryTerm()) - .doc(model.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) + .doc( + serializer.toXContent(/* must serialize the updated copy, not the stale 'st' */ model, XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) .fetchSource(true) .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); try (ThreadContext.StoredContext ignored = @@ -260,9 +269,10 @@ public
static Function createStatement( public static Function> getStatement( StateStore stateStore, String datasourceName) { + StatementModelXContentSerializer serializer = new StatementModelXContentSerializer(); return (docId) -> stateStore.get( - docId, StatementModel::fromXContent, DATASOURCE_TO_REQUEST_INDEX.apply(datasourceName)); + docId, serializer::fromXContent, DATASOURCE_TO_REQUEST_INDEX.apply(datasourceName)); } public static BiFunction updateStatementState( @@ -284,9 +294,10 @@ public static Function createSession( public static Function> getSession( StateStore stateStore, String datasourceName) { + SessionModelXContentSerializer serializer = new SessionModelXContentSerializer(); return (docId) -> stateStore.get( - docId, SessionModel::fromXContent, DATASOURCE_TO_REQUEST_INDEX.apply(datasourceName)); + docId, serializer::fromXContent, DATASOURCE_TO_REQUEST_INDEX.apply(datasourceName)); } public static BiFunction updateSessionState( @@ -310,10 +321,12 @@ public static Function createJobMe public static Function> getJobMetaData( StateStore stateStore, String datasourceName) { + AsyncQueryJobMetadataXContentSerializer asyncQueryJobMetadataXContentSerializer = + new AsyncQueryJobMetadataXContentSerializer(); return (docId) -> stateStore.get( docId, - AsyncQueryJobMetadata::fromXContent, + asyncQueryJobMetadataXContentSerializer::fromXContent, DATASOURCE_TO_REQUEST_INDEX.apply(datasourceName)); } @@ -396,4 +409,22 @@ public static Supplier activeStatementsCount(StateStore stateStore, String StatementState.RUNNING.getState(), StatementState.WAITING.getState()))); } + + @SuppressWarnings("unchecked") + private XContentSerializer getXContentSerializer(T st) { + if (st instanceof StatementModel) { + return (XContentSerializer) new StatementModelXContentSerializer(); + } else if (st instanceof SessionModel) { + return (XContentSerializer) new SessionModelXContentSerializer(); + } else if (st instanceof FlintIndexStateModel) { + return (XContentSerializer) new 
FlintIndexStateModelXContentSerializer(); + } else if (st instanceof AsyncQueryJobMetadata) { + return (XContentSerializer) new AsyncQueryJobMetadataXContentSerializer(); + } else if (st instanceof IndexDMLResult) { + return (XContentSerializer) new IndexDMLResultXContentSerializer(); + } else { + throw new IllegalArgumentException( + "Unsupported StateModel subclass: " + st.getClass().getSimpleName()); + } + } } diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/xcontent/AsyncQueryJobMetadataXContentSerializer.java b/spark/src/main/java/org/opensearch/sql/spark/execution/xcontent/AsyncQueryJobMetadataXContentSerializer.java new file mode 100644 index 0000000000..d8a1ac2a00 --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/xcontent/AsyncQueryJobMetadataXContentSerializer.java @@ -0,0 +1,100 @@ +package org.opensearch.sql.spark.execution.xcontent; + +import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken; +import static org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata.INDEX_NAME; +import static org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata.JOB_TYPE; +import static org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata.TYPE_JOBMETA; +import static org.opensearch.sql.spark.execution.session.SessionModel.DATASOURCE_NAME; +import static org.opensearch.sql.spark.execution.statement.StatementModel.QUERY_ID; + +import java.io.IOException; +import java.util.Locale; +import lombok.SneakyThrows; +import org.opensearch.core.common.Strings; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.sql.spark.asyncquery.model.AsyncQueryId; +import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata; +import org.opensearch.sql.spark.dispatcher.model.JobType; + +public class AsyncQueryJobMetadataXContentSerializer + implements 
XContentSerializer { + @Override + public XContentBuilder toXContent( + AsyncQueryJobMetadata jobMetadata, XContentBuilder builder, ToXContent.Params params) + throws IOException { + builder + .startObject() + .field(QUERY_ID, jobMetadata.getQueryId().getId()) + .field("type", TYPE_JOBMETA) + .field("jobId", jobMetadata.getJobId()) + .field("applicationId", jobMetadata.getApplicationId()) + .field("resultIndex", jobMetadata.getResultIndex()) + .field("sessionId", jobMetadata.getSessionId()) + .field(DATASOURCE_NAME, jobMetadata.getDatasourceName()) + .field(JOB_TYPE, jobMetadata.getJobType().getText().toLowerCase(Locale.ROOT)) + .field(INDEX_NAME, jobMetadata.getIndexName()) + .endObject(); + return builder; + } + + @Override + @SneakyThrows + public AsyncQueryJobMetadata fromXContent(XContentParser parser, long seqNo, long primaryTerm) { + AsyncQueryId queryId = null; + String jobId = null; + String applicationId = null; + String resultIndex = null; + String sessionId = null; + String datasourceName = null; + String jobTypeStr = null; + String indexName = null; + ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser); + while (!XContentParser.Token.END_OBJECT.equals(parser.nextToken())) { + String fieldName = parser.currentName(); + parser.nextToken(); + switch (fieldName) { + case QUERY_ID: + queryId = new AsyncQueryId(parser.textOrNull()); + break; + case "jobId": + jobId = parser.textOrNull(); + break; + case "applicationId": + applicationId = parser.textOrNull(); + break; + case "resultIndex": + resultIndex = parser.textOrNull(); + break; + case "sessionId": + sessionId = parser.textOrNull(); + break; + case DATASOURCE_NAME: + datasourceName = parser.textOrNull(); break; // was falling through and clobbering jobTypeStr/indexName + case JOB_TYPE: + jobTypeStr = parser.textOrNull(); break; + case INDEX_NAME: + indexName = parser.textOrNull(); break; + case "type": + break; + default: + throw new IllegalArgumentException("Unknown field: " + fieldName); + } + } + if (jobId == null || applicationId == null) {
throw new IllegalArgumentException("jobId and applicationId are required fields."); + } + return new AsyncQueryJobMetadata( + queryId, + applicationId, + jobId, + resultIndex, + sessionId, + datasourceName, + Strings.isNullOrEmpty(jobTypeStr) ? null : JobType.fromString(jobTypeStr), + indexName, + seqNo, + primaryTerm); + } +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/xcontent/FlintIndexStateModelXContentSerializer.java b/spark/src/main/java/org/opensearch/sql/spark/execution/xcontent/FlintIndexStateModelXContentSerializer.java new file mode 100644 index 0000000000..1d6698a90d --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/xcontent/FlintIndexStateModelXContentSerializer.java @@ -0,0 +1,83 @@ +package org.opensearch.sql.spark.execution.xcontent; + +import static org.opensearch.sql.spark.execution.session.SessionModel.APPLICATION_ID; +import static org.opensearch.sql.spark.execution.session.SessionModel.DATASOURCE_NAME; +import static org.opensearch.sql.spark.execution.session.SessionModel.JOB_ID; +import static org.opensearch.sql.spark.execution.statement.StatementModel.ERROR; +import static org.opensearch.sql.spark.execution.statement.StatementModel.VERSION; +import static org.opensearch.sql.spark.execution.statestore.StateModel.LAST_UPDATE_TIME; +import static org.opensearch.sql.spark.execution.statestore.StateModel.STATE; +import static org.opensearch.sql.spark.execution.statestore.StateModel.TYPE; +import static org.opensearch.sql.spark.execution.statestore.StateModel.VERSION_1_0; +import static org.opensearch.sql.spark.flint.FlintIndexStateModel.FLINT_INDEX_DOC_TYPE; +import static org.opensearch.sql.spark.flint.FlintIndexStateModel.LATEST_ID; + +import java.io.IOException; +import lombok.SneakyThrows; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; +import 
org.opensearch.core.xcontent.XContentParserUtils; +import org.opensearch.sql.spark.flint.FlintIndexState; +import org.opensearch.sql.spark.flint.FlintIndexStateModel; + +public class FlintIndexStateModelXContentSerializer + implements XContentSerializer { + @Override + public XContentBuilder toXContent( + FlintIndexStateModel flintIndexStateModel, XContentBuilder builder, ToXContent.Params params) + throws IOException { + // Emit all flint index state fields as a flat JSON object. + builder + .startObject() + .field(VERSION, VERSION_1_0) + .field(TYPE, FLINT_INDEX_DOC_TYPE) + .field(STATE, flintIndexStateModel.getIndexState().getState()) + .field(APPLICATION_ID, flintIndexStateModel.getApplicationId()) + .field(JOB_ID, flintIndexStateModel.getJobId()) + .field(LATEST_ID, flintIndexStateModel.getLatestId()) + .field(DATASOURCE_NAME, flintIndexStateModel.getDatasourceName()) + .field(LAST_UPDATE_TIME, flintIndexStateModel.getLastUpdateTime()) + .field(ERROR, flintIndexStateModel.getError()) + .endObject(); + return builder; + } + + @Override + @SneakyThrows + public FlintIndexStateModel fromXContent(XContentParser parser, long seqNo, long primaryTerm) { + // Rebuild the model field-by-field; fields not listed below are ignored. + FlintIndexStateModel.FlintIndexStateModelBuilder builder = FlintIndexStateModel.builder(); + XContentParserUtils.ensureExpectedToken( + XContentParser.Token.START_OBJECT, parser.currentToken(), parser); + while (!XContentParser.Token.END_OBJECT.equals(parser.nextToken())) { + String fieldName = parser.currentName(); + parser.nextToken(); + switch (fieldName) { + case STATE: + builder.indexState(FlintIndexState.fromString(parser.text())); break; // was falling through into APPLICATION_ID + case APPLICATION_ID: + builder.applicationId(parser.text()); + break; + case JOB_ID: + builder.jobId(parser.text()); + break; + case LATEST_ID: + builder.latestId(parser.text()); + break; + case DATASOURCE_NAME: + builder.datasourceName(parser.text()); + break; + case LAST_UPDATE_TIME: + builder.lastUpdateTime(parser.longValue()); + break; + case ERROR: +
builder.error(parser.text()); + break; + } + } + builder.seqNo(seqNo); + builder.primaryTerm(primaryTerm); + return builder.build(); + } +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/xcontent/IndexDMLResultXContentSerializer.java b/spark/src/main/java/org/opensearch/sql/spark/execution/xcontent/IndexDMLResultXContentSerializer.java new file mode 100644 index 0000000000..6b7df004c2 --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/xcontent/IndexDMLResultXContentSerializer.java @@ -0,0 +1,38 @@ +package org.opensearch.sql.spark.execution.xcontent; + +import static org.opensearch.sql.spark.dispatcher.model.IndexDMLResult.QUERY_RUNTIME; +import static org.opensearch.sql.spark.dispatcher.model.IndexDMLResult.UPDATE_TIME; +import static org.opensearch.sql.spark.execution.session.SessionModel.DATASOURCE_NAME; +import static org.opensearch.sql.spark.execution.statement.StatementModel.QUERY_ID; + +import com.google.common.collect.ImmutableList; +import java.io.IOException; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.sql.spark.dispatcher.model.IndexDMLResult; + +public class IndexDMLResultXContentSerializer implements XContentSerializer { + @Override + public XContentBuilder toXContent( + IndexDMLResult dmlResult, XContentBuilder builder, ToXContent.Params params) + throws IOException { + builder + .startObject() + .field(QUERY_ID, dmlResult.getQueryId()) + .field("status", dmlResult.getStatus()) + .field("error", dmlResult.getError()) + .field(DATASOURCE_NAME, dmlResult.getDatasourceName()) + .field(QUERY_RUNTIME, dmlResult.getQueryRunTime()) + .field(UPDATE_TIME, dmlResult.getUpdateTime()) + .field("result", ImmutableList.of()) + .field("schema", ImmutableList.of()) + .endObject(); + return builder; + } + + @Override + public IndexDMLResult fromXContent(XContentParser parser, long seqNo, 
long primaryTerm) { + throw new UnsupportedOperationException("IndexDMLResult to fromXContent Not supported"); + } +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/xcontent/SessionModelXContentSerializer.java b/spark/src/main/java/org/opensearch/sql/spark/execution/xcontent/SessionModelXContentSerializer.java new file mode 100644 index 0000000000..c255f0d944 --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/xcontent/SessionModelXContentSerializer.java @@ -0,0 +1,94 @@ +package org.opensearch.sql.spark.execution.xcontent; + +import static org.opensearch.sql.spark.execution.session.SessionModel.APPLICATION_ID; +import static org.opensearch.sql.spark.execution.session.SessionModel.DATASOURCE_NAME; +import static org.opensearch.sql.spark.execution.session.SessionModel.ERROR; +import static org.opensearch.sql.spark.execution.session.SessionModel.JOB_ID; +import static org.opensearch.sql.spark.execution.session.SessionModel.SESSION_DOC_TYPE; +import static org.opensearch.sql.spark.execution.session.SessionModel.SESSION_ID; +import static org.opensearch.sql.spark.execution.session.SessionModel.SESSION_STATE; +import static org.opensearch.sql.spark.execution.session.SessionModel.SESSION_TYPE; +import static org.opensearch.sql.spark.execution.session.SessionModel.VERSION; +import static org.opensearch.sql.spark.execution.statestore.StateModel.LAST_UPDATE_TIME; +import static org.opensearch.sql.spark.execution.statestore.StateModel.TYPE; + +import java.io.IOException; +import lombok.SneakyThrows; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.core.xcontent.XContentParserUtils; +import org.opensearch.sql.spark.execution.session.SessionId; +import org.opensearch.sql.spark.execution.session.SessionModel; +import org.opensearch.sql.spark.execution.session.SessionState; +import 
org.opensearch.sql.spark.execution.session.SessionType; + +public class SessionModelXContentSerializer implements XContentSerializer { + @Override + public XContentBuilder toXContent( + SessionModel sessionModel, XContentBuilder builder, ToXContent.Params params) + throws IOException { + builder + .startObject() + .field(VERSION, sessionModel.getVersion()) + .field(TYPE, SESSION_DOC_TYPE) + .field(SESSION_TYPE, sessionModel.getSessionType().getSessionType()) + .field(SESSION_ID, sessionModel.getSessionId().getSessionId()) + .field(SESSION_STATE, sessionModel.getSessionState().getSessionState()) + .field(DATASOURCE_NAME, sessionModel.getDatasourceName()) + .field(APPLICATION_ID, sessionModel.getApplicationId()) + .field(JOB_ID, sessionModel.getJobId()) + .field(LAST_UPDATE_TIME, sessionModel.getLastUpdateTime()) + .field(ERROR, sessionModel.getError()) + .endObject(); + return builder; + } + + @Override + @SneakyThrows + public SessionModel fromXContent(XContentParser parser, long seqNo, long primaryTerm) { + // Implement the fromXContent logic here + SessionModel.SessionModelBuilder builder = SessionModel.builder(); + XContentParserUtils.ensureExpectedToken( + XContentParser.Token.START_OBJECT, parser.currentToken(), parser); + while (!XContentParser.Token.END_OBJECT.equals(parser.nextToken())) { + String fieldName = parser.currentName(); + parser.nextToken(); + switch (fieldName) { + case VERSION: + builder.version(parser.text()); + break; + case SESSION_TYPE: + builder.sessionType(SessionType.fromString(parser.text())); + break; + case SESSION_ID: + builder.sessionId(new SessionId(parser.text())); + break; + case SESSION_STATE: + builder.sessionState(SessionState.fromString(parser.text())); + break; + case DATASOURCE_NAME: + builder.datasourceName(parser.text()); + break; + case ERROR: + builder.error(parser.text()); + break; + case APPLICATION_ID: + builder.applicationId(parser.text()); + break; + case JOB_ID: + builder.jobId(parser.text()); + break; + case 
LAST_UPDATE_TIME: + builder.lastUpdateTime(parser.longValue()); + break; + case TYPE: + // do nothing. + break; + } + } + builder.seqNo(seqNo); + builder.primaryTerm(primaryTerm); + return builder.build(); + } +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/xcontent/StatementModelXContentSerializer.java b/spark/src/main/java/org/opensearch/sql/spark/execution/xcontent/StatementModelXContentSerializer.java new file mode 100644 index 0000000000..1d0cf1b9d9 --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/xcontent/StatementModelXContentSerializer.java @@ -0,0 +1,109 @@ +package org.opensearch.sql.spark.execution.xcontent; + +import static org.opensearch.sql.spark.execution.session.SessionModel.APPLICATION_ID; +import static org.opensearch.sql.spark.execution.session.SessionModel.DATASOURCE_NAME; +import static org.opensearch.sql.spark.execution.session.SessionModel.JOB_ID; +import static org.opensearch.sql.spark.execution.statement.StatementModel.ERROR; +import static org.opensearch.sql.spark.execution.statement.StatementModel.LANG; +import static org.opensearch.sql.spark.execution.statement.StatementModel.QUERY; +import static org.opensearch.sql.spark.execution.statement.StatementModel.QUERY_ID; +import static org.opensearch.sql.spark.execution.statement.StatementModel.SESSION_ID; +import static org.opensearch.sql.spark.execution.statement.StatementModel.STATEMENT_DOC_TYPE; +import static org.opensearch.sql.spark.execution.statement.StatementModel.STATEMENT_ID; +import static org.opensearch.sql.spark.execution.statement.StatementModel.STATEMENT_STATE; +import static org.opensearch.sql.spark.execution.statement.StatementModel.SUBMIT_TIME; +import static org.opensearch.sql.spark.execution.statement.StatementModel.VERSION; +import static org.opensearch.sql.spark.execution.statestore.StateModel.TYPE; + +import java.io.IOException; +import lombok.SneakyThrows; +import org.opensearch.core.xcontent.ToXContent; +import 
org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.core.xcontent.XContentParserUtils; +import org.opensearch.sql.spark.execution.session.SessionId; +import org.opensearch.sql.spark.execution.statement.StatementId; +import org.opensearch.sql.spark.execution.statement.StatementModel; +import org.opensearch.sql.spark.execution.statement.StatementState; +import org.opensearch.sql.spark.rest.model.LangType; + +public class StatementModelXContentSerializer implements XContentSerializer { + @Override + public XContentBuilder toXContent( + StatementModel statementModel, XContentBuilder builder, ToXContent.Params params) + throws IOException { + builder + .startObject() + .field(VERSION, statementModel.getVersion()) + .field(TYPE, STATEMENT_DOC_TYPE) + .field(STATEMENT_STATE, statementModel.getStatementState().getState()) + .field(STATEMENT_ID, statementModel.getStatementId().getId()) + .field(SESSION_ID, statementModel.getSessionId().getSessionId()) + .field(APPLICATION_ID, statementModel.getApplicationId()) + .field(JOB_ID, statementModel.getJobId()) + .field(LANG, statementModel.getLangType().getText()) + .field(DATASOURCE_NAME, statementModel.getDatasourceName()) + .field(QUERY, statementModel.getQuery()) + .field(QUERY_ID, statementModel.getQueryId()) + .field(SUBMIT_TIME, statementModel.getSubmitTime()) + .field(ERROR, statementModel.getError()) + .endObject(); + return builder; + } + + @Override + @SneakyThrows + public StatementModel fromXContent(XContentParser parser, long seqNo, long primaryTerm) { + StatementModel.StatementModelBuilder builder = StatementModel.builder(); + XContentParserUtils.ensureExpectedToken( + XContentParser.Token.START_OBJECT, parser.currentToken(), parser); + while (!XContentParser.Token.END_OBJECT.equals(parser.nextToken())) { + String fieldName = parser.currentName(); + parser.nextToken(); + switch (fieldName) { + case VERSION: + builder.version(parser.text()); + 
break; + case TYPE: + // do nothing + break; + case STATEMENT_STATE: + builder.statementState(StatementState.fromString(parser.text())); + break; + case STATEMENT_ID: + builder.statementId(new StatementId(parser.text())); + break; + case SESSION_ID: + builder.sessionId(new SessionId(parser.text())); + break; + case APPLICATION_ID: + builder.applicationId(parser.text()); + break; + case JOB_ID: + builder.jobId(parser.text()); + break; + case LANG: + builder.langType(LangType.fromString(parser.text())); + break; + case DATASOURCE_NAME: + builder.datasourceName(parser.text()); + break; + case QUERY: + builder.query(parser.text()); + break; + case QUERY_ID: + builder.queryId(parser.text()); + break; + case SUBMIT_TIME: + builder.submitTime(parser.longValue()); + break; + case ERROR: + builder.error(parser.text()); + break; + } + } + builder.seqNo(seqNo); + builder.primaryTerm(primaryTerm); + return builder.build(); + } +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/xcontent/XContentSerializer.java b/spark/src/main/java/org/opensearch/sql/spark/execution/xcontent/XContentSerializer.java new file mode 100644 index 0000000000..e9897b270a --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/xcontent/XContentSerializer.java @@ -0,0 +1,14 @@ +package org.opensearch.sql.spark.execution.xcontent; + +import java.io.IOException; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; + +public interface XContentSerializer { + + XContentBuilder toXContent(T object, XContentBuilder builder, ToXContent.Params params) + throws IOException; + + T fromXContent(XContentParser parser, long seqNo, long primaryTerm); +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexStateModel.java b/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexStateModel.java index bb73f439a2..3a13805c0a 100644 --- 
a/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexStateModel.java +++ b/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexStateModel.java @@ -9,14 +9,11 @@ import static org.opensearch.sql.spark.execution.session.SessionModel.DATASOURCE_NAME; import static org.opensearch.sql.spark.execution.session.SessionModel.JOB_ID; import static org.opensearch.sql.spark.execution.statement.StatementModel.ERROR; -import static org.opensearch.sql.spark.execution.statement.StatementModel.VERSION; -import java.io.IOException; import lombok.Builder; import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.SneakyThrows; -import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.core.xcontent.XContentParser; import org.opensearch.core.xcontent.XContentParserUtils; import org.opensearch.sql.spark.execution.statestore.StateModel; @@ -130,21 +127,4 @@ public static FlintIndexStateModel fromXContent( public String getId() { return latestId; } - - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder - .startObject() - .field(VERSION, VERSION_1_0) - .field(TYPE, FLINT_INDEX_DOC_TYPE) - .field(STATE, indexState.getState()) - .field(APPLICATION_ID, applicationId) - .field(JOB_ID, jobId) - .field(LATEST_ID, latestId) - .field(DATASOURCE_NAME, datasourceName) - .field(LAST_UPDATE_TIME, lastUpdateTime) - .field(ERROR, error) - .endObject(); - return builder; - } }