Commit b7788a1

temp snapshot

Signed-off-by: Peng Huo <[email protected]>
penghuo committed Oct 24, 2023
1 parent 877c9c3

Showing 12 changed files with 508 additions and 30 deletions.
@@ -320,7 +320,8 @@ private AsyncQueryExecutorService createAsyncQueryExecutorService(
         jobExecutionResponseReader,
         new FlintIndexMetadataReaderImpl(client),
         client,
-        new SessionManager(stateStore, emrServerlessClient, pluginSettings));
+        new SessionManager(stateStore, emrServerlessClient, pluginSettings),
+        stateStore);
     return new AsyncQueryExecutorServiceImpl(
         asyncQueryJobMetadataStorageService,
         sparkQueryDispatcher,

SparkQueryDispatcher.java
@@ -9,14 +9,14 @@
 import static org.opensearch.sql.spark.data.constants.SparkConstants.ERROR_FIELD;
 import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_SESSION_CLASS_NAME;
 import static org.opensearch.sql.spark.data.constants.SparkConstants.STATUS_FIELD;
+import static org.opensearch.sql.spark.execution.statestore.StateStore.createFlintIndexState;
 
 import com.amazonaws.services.emrserverless.model.JobRunState;
 import java.nio.charset.StandardCharsets;
 import java.util.Base64;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
-import java.util.concurrent.ExecutionException;
 import lombok.AllArgsConstructor;
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
@@ -25,8 +25,6 @@
 import org.apache.logging.log4j.Logger;
 import org.json.JSONArray;
 import org.json.JSONObject;
-import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
-import org.opensearch.action.support.master.AcknowledgedResponse;
 import org.opensearch.client.Client;
 import org.opensearch.sql.datasource.DataSourceService;
 import org.opensearch.sql.datasource.model.DataSourceMetadata;
@@ -45,8 +43,13 @@
 import org.opensearch.sql.spark.execution.session.SessionId;
 import org.opensearch.sql.spark.execution.session.SessionManager;
 import org.opensearch.sql.spark.execution.statement.QueryRequest;
+import org.opensearch.sql.spark.execution.statestore.StateStore;
 import org.opensearch.sql.spark.flint.FlintIndexMetadata;
 import org.opensearch.sql.spark.flint.FlintIndexMetadataReader;
+import org.opensearch.sql.spark.flint.FlintIndexState;
+import org.opensearch.sql.spark.flint.FlintIndexStateModel;
+import org.opensearch.sql.spark.flint.operation.FlintIndexOp;
+import org.opensearch.sql.spark.flint.operation.FlintIndexOpCancel;
+import org.opensearch.sql.spark.flint.operation.FlintIndexOpDelete;
 import org.opensearch.sql.spark.response.JobExecutionResponseReader;
 import org.opensearch.sql.spark.rest.model.LangType;
 import org.opensearch.sql.spark.utils.SQLQueryUtils;
@@ -76,6 +79,8 @@ public class SparkQueryDispatcher {

   private SessionManager sessionManager;
 
+  private StateStore stateStore;
+
   public DispatchQueryResponse dispatch(DispatchQueryRequest dispatchQueryRequest) {
     if (LangType.SQL.equals(dispatchQueryRequest.getLangType())) {
       return handleSQLQuery(dispatchQueryRequest);
@@ -162,6 +167,9 @@ private DispatchQueryResponse handleIndexQuery(
         tags,
         indexDetails.isAutoRefresh(),
         dataSourceMetadata.getResultIndex());
+
+    // WIP (temp snapshot): record an initial CREATING state before launching the job;
+    // FlintIndexStateModel is @Builder-only, so a no-arg constructor would not compile.
+    createFlintIndexState(stateStore, dispatchQueryRequest.getDatasource())
+        .apply(
+            FlintIndexStateModel.builder()
+                .indexState(FlintIndexState.CREATING)
+                .applicationId(dispatchQueryRequest.getApplicationId())
+                .datasourceName(dispatchQueryRequest.getDatasource())
+                .lastUpdateTime(System.currentTimeMillis())
+                .build());
+
     String jobId = emrServerlessClient.startJobRun(startJobRequest);
     return new DispatchQueryResponse(
         AsyncQueryId.newAsyncQueryId(dataSourceMetadata.getName()),
@@ -250,22 +258,22 @@ private DispatchQueryResponse handleDropIndexQuery(
     // If the index was created without auto refresh, there is no job to cancel.
     String status = JobRunState.FAILED.toString();
     try {
-      if (indexMetadata.isAutoRefresh()) {
-        emrServerlessClient.cancelJobRun(
-            dispatchQueryRequest.getApplicationId(), indexMetadata.getJobId());
-      }
-    } finally {
-      String indexName = indexDetails.openSearchIndexName();
-      try {
-        AcknowledgedResponse response =
-            client.admin().indices().delete(new DeleteIndexRequest().indices(indexName)).get();
-        if (!response.isAcknowledged()) {
-          LOG.error("failed to delete index");
-        }
-        status = JobRunState.SUCCESS.toString();
-      } catch (InterruptedException | ExecutionException e) {
-        LOG.error("failed to delete index");
-      }
-    }
+      FlintIndexOp jobCancelOp =
+          new FlintIndexOpCancel(
+              stateStore,
+              dispatchQueryRequest.getDatasource(),
+              emrServerlessClient,
+              dispatchQueryRequest.getApplicationId(),
+              indexMetadata.getJobId());
+      jobCancelOp.apply(indexMetadata);
+    } catch (Exception e) {
+      LOG.error(e);
+    }
+    try {
+      FlintIndexOp indexDeleteOp =
+          new FlintIndexOpDelete(
+              stateStore, dispatchQueryRequest.getDatasource(), client, indexDetails);
+      indexDeleteOp.apply(indexMetadata);
+      status = JobRunState.SUCCESS.toString();
+    } catch (Exception e) {
+      LOG.error(e);
+    }
     return new DispatchQueryResponse(
         AsyncQueryId.newAsyncQueryId(dataSourceMetadata.getName()),
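
Note: the cancel and delete paths now delegate to FlintIndexOp subclasses that this commit adds in files not captured by this snapshot. Inferring only from the two call sites above, each op wraps one guarded state transition; a hypothetical Java sketch, not the commit's actual implementation:

    // Hypothetical shape inferred from the call sites above; the real
    // FlintIndexOp added by this commit is not shown in this snapshot.
    package org.opensearch.sql.spark.flint.operation;

    import org.opensearch.sql.spark.flint.FlintIndexMetadata;

    public abstract class FlintIndexOp {
      // Expected flow: validate the current state doc, move it to a transitioning
      // state, perform the side effect (cancel the EMR-S job or delete the
      // OpenSearch index), then commit the stable end state.
      public abstract void apply(FlintIndexMetadata metadata);
    }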

StateModel.java
@@ -9,6 +9,10 @@
 import org.opensearch.core.xcontent.XContentParser;
 
 public abstract class StateModel implements ToXContentObject {
+  public static final String VERSION_1_0 = "1.0";
+  public static final String TYPE = "type";
+  public static final String STATE = "state";
+  public static final String LAST_UPDATE_TIME = "lastUpdateTime";
 
   public abstract String getId();
 

StateStore.java
@@ -43,6 +43,8 @@
 import org.opensearch.sql.spark.execution.session.SessionState;
 import org.opensearch.sql.spark.execution.statement.StatementModel;
 import org.opensearch.sql.spark.execution.statement.StatementState;
+import org.opensearch.sql.spark.flint.FlintIndexState;
+import org.opensearch.sql.spark.flint.FlintIndexStateModel;
 
 /**
  * State Store maintains the state of Session and Statement. State Store creates/updates/gets docs on
@@ -253,4 +255,28 @@ public static Function<String, Optional<AsyncQueryJobMetadata>> getJobMetaData(
         AsyncQueryJobMetadata::fromXContent,
         DATASOURCE_TO_REQUEST_INDEX.apply(datasourceName));
   }
+
+  public static BiFunction<FlintIndexStateModel, FlintIndexState, FlintIndexStateModel>
+      updateFlintIndexState(StateStore stateStore, String datasourceName) {
+    return (old, state) ->
+        stateStore.updateState(
+            old,
+            state,
+            FlintIndexStateModel::copyWithState,
+            DATASOURCE_TO_REQUEST_INDEX.apply(datasourceName));
+  }
+
+  public static Function<String, Optional<FlintIndexStateModel>> getFlintIndexState(
+      StateStore stateStore, String datasourceName) {
+    return (docId) ->
+        stateStore.get(
+            docId,
+            FlintIndexStateModel::fromXContent,
+            DATASOURCE_TO_REQUEST_INDEX.apply(datasourceName));
+  }
+
+  public static Function<FlintIndexStateModel, FlintIndexStateModel> createFlintIndexState(
+      StateStore stateStore, String datasourceName) {
+    return (st) ->
+        stateStore.create(
+            st, FlintIndexStateModel::copy, DATASOURCE_TO_REQUEST_INDEX.apply(datasourceName));
+  }
 }
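
Note: these helpers mirror the existing session/statement accessors: each binds a StateStore and the datasource-scoped request index and returns a function, keeping call sites one-liners. A usage sketch, assuming an already-built initialModel and a placeholder datasource name "mys3":

    // Create, read back, then transition a Flint index state doc.
    FlintIndexStateModel created =
        createFlintIndexState(stateStore, "mys3").apply(initialModel);
    FlintIndexStateModel fetched =
        getFlintIndexState(stateStore, "mys3").apply(created.getId()).orElseThrow();
    updateFlintIndexState(stateStore, "mys3").apply(fetched, FlintIndexState.REFRESHING);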

FlintIndexMetadata.java
@@ -7,6 +7,7 @@

 import java.util.Locale;
 import java.util.Map;
+import java.util.Optional;
 import lombok.Data;
 
 @Data
@@ -19,8 +20,13 @@ public class FlintIndexMetadata {
public static final String AUTO_REFRESH = "auto_refresh";
public static final String AUTO_REFRESH_DEFAULT = "false";

public static final String APP_ID = "SERVERLESS_EMR_VIRTUAL_CLUSTER_ID";
public static final String FLINT_INDEX_STATE_DOC_ID = "FLINT_INDEX_STATE_DOC_ID";

private final String jobId;
private final boolean autoRefresh;
private final String appId;
private final String flintIndexStateDocId;

public static FlintIndexMetadata fromMetatdata(Map<String, Object> metaMap) {
Map<String, Object> propertiesMap = (Map<String, Object>) metaMap.get(PROPERTIES_KEY);
@@ -32,6 +38,12 @@ public static FlintIndexMetadata fromMetatdata(Map<String, Object> metaMap) {
         !((String) options.getOrDefault(AUTO_REFRESH, AUTO_REFRESH_DEFAULT))
             .toLowerCase(Locale.ROOT)
             .equalsIgnoreCase(AUTO_REFRESH_DEFAULT);
-    return new FlintIndexMetadata(jobId, autoRefresh);
+    String appId = (String) envMap.getOrDefault(APP_ID, null);
+    String flintIndexStateDocId = (String) envMap.getOrDefault(FLINT_INDEX_STATE_DOC_ID, null);
+    return new FlintIndexMetadata(jobId, autoRefresh, appId, flintIndexStateDocId);
   }
 
+  public Optional<String> getFlintIndexStateDocId() {
+    // The doc id may be absent from older mappings, so guard against null.
+    return Optional.ofNullable(flintIndexStateDocId);
+  }
 }
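
Note: fromMetatdata (the method's existing name) now also lifts the EMR-S application id and the state-doc id out of the mapping's env map; envMap itself is extracted in the lines elided above. A hedged sketch of the expected _meta shape; the "env" nesting and the SERVERLESS_EMR_JOB_ID key are assumptions, not shown in this diff:

    // Hypothetical _meta payload for a Flint index; ids are placeholders.
    Map<String, Object> meta =
        Map.of(
            "properties",
            Map.of(
                "env",
                Map.of(
                    "SERVERLESS_EMR_JOB_ID", "job-id-placeholder",
                    "SERVERLESS_EMR_VIRTUAL_CLUSTER_ID", "app-id-placeholder"),
                "options",
                Map.of("auto_refresh", "true")));
    FlintIndexMetadata metadata = FlintIndexMetadata.fromMetatdata(meta);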

FlintIndexState.java (new file)
@@ -0,0 +1,52 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.sql.spark.flint;
+
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.Map;
+import java.util.stream.Collectors;
+import lombok.Getter;
+
+/**
+ * Flint index state.
+ */
+@Getter
+public enum FlintIndexState {
+  // stable state
+  EMPTY("empty"),
+  // transitioning state
+  CREATING("creating"),
+  // transitioning state
+  REFRESHING("refreshing"),
+  // transitioning state
+  CANCELLING("cancelling"),
+  // stable state
+  ACTIVE("active"),
+  // transitioning state
+  DELETING("deleting"),
+  // stable state
+  DELETED("deleted");
+
+  private final String state;
+
+  FlintIndexState(String state) {
+    this.state = state;
+  }
+
+  private static final Map<String, FlintIndexState> STATES =
+      Arrays.stream(FlintIndexState.values())
+          .collect(Collectors.toMap(t -> t.state.toLowerCase(Locale.ROOT), t -> t));
+
+  public static FlintIndexState fromString(String key) {
+    // Use the prebuilt lookup table instead of scanning values() on each call.
+    FlintIndexState state = STATES.get(key);
+    if (state == null) {
+      throw new IllegalArgumentException("Invalid flint index state: " + key);
+    }
+    return state;
+  }
+}
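
Note: the comments above split states into stable (EMPTY, ACTIVE, DELETED) and transitioning (CREATING, REFRESHING, CANCELLING, DELETING) buckets. fromString round-trips the lowercase string that FlintIndexStateModel.toXContent persists:

    // Round-trip the serialized form; the lookup keys are the lowercase state values.
    String doc = FlintIndexState.REFRESHING.getState();       // "refreshing"
    FlintIndexState parsed = FlintIndexState.fromString(doc); // REFRESHING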

FlintIndexStateModel.java (new file)
@@ -0,0 +1,128 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.sql.spark.flint;
+
+import static org.opensearch.sql.spark.execution.session.SessionModel.APPLICATION_ID;
+import static org.opensearch.sql.spark.execution.session.SessionModel.DATASOURCE_NAME;
+import static org.opensearch.sql.spark.execution.session.SessionModel.JOB_ID;
+import static org.opensearch.sql.spark.execution.statement.StatementModel.ERROR;
+import static org.opensearch.sql.spark.execution.statement.StatementModel.VERSION;
+
+import java.io.IOException;
+import lombok.Builder;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.SneakyThrows;
+import org.opensearch.core.xcontent.XContentBuilder;
+import org.opensearch.core.xcontent.XContentParser;
+import org.opensearch.core.xcontent.XContentParserUtils;
+import org.opensearch.sql.spark.execution.statestore.StateModel;
+
+/**
+ * Flint Index Model maintains the index state.
+ */
+@Getter
+@Builder
+@EqualsAndHashCode(callSuper = false)
+public class FlintIndexStateModel extends StateModel {
+  public static final String FLINT_INDEX_DOC_TYPE = "flintindexstate";
+  public static final String DOC_ID_PREFIX = "flint";
+
+  private final FlintIndexState indexState;
+  private final String applicationId;
+  private final String jobId;
+  private final String datasourceName;
+  private final long lastUpdateTime;
+  private final String error;
+
+  @EqualsAndHashCode.Exclude private final long seqNo;
+  @EqualsAndHashCode.Exclude private final long primaryTerm;
+
+  public FlintIndexStateModel(
+      FlintIndexState indexState,
+      String applicationId,
+      String jobId,
+      String datasourceName,
+      long lastUpdateTime,
+      String error,
+      long seqNo,
+      long primaryTerm) {
+    this.indexState = indexState;
+    this.applicationId = applicationId;
+    this.jobId = jobId;
+    this.datasourceName = datasourceName;
+    this.lastUpdateTime = lastUpdateTime;
+    this.error = error;
+    this.seqNo = seqNo;
+    this.primaryTerm = primaryTerm;
+  }
+
+  public static FlintIndexStateModel copy(FlintIndexStateModel copy, long seqNo, long primaryTerm) {
+    return new FlintIndexStateModel(
+        copy.indexState,
+        copy.applicationId,
+        copy.jobId,
+        copy.datasourceName,
+        copy.lastUpdateTime,
+        copy.error,
+        seqNo,
+        primaryTerm);
+  }
+
+  public static FlintIndexStateModel copyWithState(
+      FlintIndexStateModel copy, FlintIndexState state, long seqNo, long primaryTerm) {
+    return new FlintIndexStateModel(
+        state,
+        copy.applicationId,
+        copy.jobId,
+        copy.datasourceName,
+        copy.lastUpdateTime,
+        copy.error,
+        seqNo,
+        primaryTerm);
+  }
+
+  @SneakyThrows
+  public static FlintIndexStateModel fromXContent(
+      XContentParser parser, long seqNo, long primaryTerm) {
+    FlintIndexStateModelBuilder builder = FlintIndexStateModel.builder();
+    XContentParserUtils.ensureExpectedToken(
+        XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
+    while (!XContentParser.Token.END_OBJECT.equals(parser.nextToken())) {
+      String fieldName = parser.currentName();
+      parser.nextToken();
+      switch (fieldName) {
+        case VERSION:
+        case TYPE:
+          // do nothing
+          break;
+        case STATE:
+          builder.indexState(FlintIndexState.fromString(parser.text()));
+          break;
+        case APPLICATION_ID:
+          builder.applicationId(parser.text());
+          break;
+        case JOB_ID:
+          builder.jobId(parser.text());
+          break;
+        case DATASOURCE_NAME:
+          builder.datasourceName(parser.text());
+          break;
+        case LAST_UPDATE_TIME:
+          builder.lastUpdateTime(parser.longValue());
+          break;
+        case ERROR:
+          builder.error(parser.text());
+          break;
+      }
+    }
+    builder.seqNo(seqNo);
+    builder.primaryTerm(primaryTerm);
+    return builder.build();
+  }
+
+  @Override
+  public String getId() {
+    return DOC_ID_PREFIX + jobId;
+  }
+
+  @Override
+  public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+    builder
+        .startObject()
+        .field(VERSION, VERSION_1_0)
+        .field(TYPE, FLINT_INDEX_DOC_TYPE)
+        .field(STATE, indexState.getState())
+        .field(APPLICATION_ID, applicationId)
+        .field(JOB_ID, jobId)
+        .field(DATASOURCE_NAME, datasourceName)
+        .field(LAST_UPDATE_TIME, lastUpdateTime)
+        .field(ERROR, error)
+        .endObject();
+    return builder;
+  }
+}
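
Note: seqNo and primaryTerm carry OpenSearch's optimistic-concurrency tokens: copy and copyWithState rebuild the immutable model with fresh tokens after each write, so a stale writer fails instead of silently clobbering a concurrent transition. A sketch using the StateStore helper above, with "mys3" again a placeholder datasource:

    // Each successful update returns a model stamped with the new seqNo/primaryTerm;
    // reusing the stale 'model' afterwards would make the next conditional write fail.
    FlintIndexStateModel cancelling =
        updateFlintIndexState(stateStore, "mys3").apply(model, FlintIndexState.CANCELLING);
    updateFlintIndexState(stateStore, "mys3").apply(cancelling, FlintIndexState.DELETED);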