add async-query-core module
Signed-off-by: Vamsi Manohar <[email protected]>
vmmusings committed Apr 18, 2024 · 1 parent 5978512 · commit ef8307b
Showing 29 changed files with 192 additions and 200 deletions.
settings.gradle (2 changes: 1 addition & 1 deletion)
@@ -21,4 +21,4 @@ include 'prometheus'
include 'benchmarks'
include 'datasources'
include 'spark'
-
+include 'async-query-core'
spark/src/main/antlr/SqlBaseLexer.g4 (1 change: 1 addition & 0 deletions)
@@ -182,6 +182,7 @@ ELSE: 'ELSE';
END: 'END';
ESCAPE: 'ESCAPE';
ESCAPED: 'ESCAPED';
+EVOLUTION: 'EVOLUTION';
EXCEPT: 'EXCEPT';
EXCHANGE: 'EXCHANGE';
EXCLUDE: 'EXCLUDE';
spark/src/main/antlr/SqlBaseParser.g4 (4 changes: 3 additions & 1 deletion)
@@ -480,7 +480,7 @@ dmlStatementNoWith
| fromClause multiInsertQueryBody+ #multiInsertQuery
| DELETE FROM identifierReference tableAlias whereClause? #deleteFromTable
| UPDATE identifierReference tableAlias setClause whereClause? #updateTable
-| MERGE INTO target=identifierReference targetAlias=tableAlias
+| MERGE (WITH SCHEMA EVOLUTION)? INTO target=identifierReference targetAlias=tableAlias
USING (source=identifierReference |
LEFT_PAREN sourceQuery=query RIGHT_PAREN) sourceAlias=tableAlias
ON mergeCondition=booleanExpression
@@ -1399,6 +1399,7 @@ ansiNonReserved
| DOUBLE
| DROP
| ESCAPED
+| EVOLUTION
| EXCHANGE
| EXCLUDE
| EXISTS
@@ -1715,6 +1716,7 @@ nonReserved
| END
| ESCAPE
| ESCAPED
+| EVOLUTION
| EXCHANGE
| EXCLUDE
| EXECUTE
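These grammar changes track upstream Apache Spark's SqlBase grammar: MERGE gains an optional WITH SCHEMA EVOLUTION clause, and EVOLUTION is kept out of the reserved words. A minimal sketch of a statement the updated parser should now accept (table, alias, and column names are hypothetical):

// Hypothetical MERGE statement exercising the new optional clause.
String mergeSql =
    "MERGE WITH SCHEMA EVOLUTION INTO target t "
        + "USING source s "
        + "ON t.id = s.id "
        + "WHEN MATCHED THEN UPDATE SET * "
        + "WHEN NOT MATCHED THEN INSERT *";

Because EVOLUTION lands in both ansiNonReserved and nonReserved, it stays usable as an ordinary identifier outside this clause.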
AsyncQueryJobMetadata.java
@@ -25,7 +25,7 @@ public class AsyncQueryJobMetadata extends StateModel {
private final AsyncQueryId queryId;
private final String applicationId;
private final String jobId;
-private final String resultIndex;
+private final String resultLocation;
// optional sessionId.
private final String sessionId;
// since 2.13
@@ -41,12 +41,12 @@ public class AsyncQueryJobMetadata extends StateModel {
@EqualsAndHashCode.Exclude private final long primaryTerm;

public AsyncQueryJobMetadata(
-    AsyncQueryId queryId, String applicationId, String jobId, String resultIndex) {
+    AsyncQueryId queryId, String applicationId, String jobId, String resultLocation) {
this(
queryId,
applicationId,
jobId,
-        resultIndex,
+        resultLocation,
null,
null,
JobType.INTERACTIVE,
@@ -59,13 +59,13 @@ public AsyncQueryJobMetadata(
AsyncQueryId queryId,
String applicationId,
String jobId,
-      String resultIndex,
+      String resultLocation,
String sessionId) {
this(
queryId,
applicationId,
jobId,
-        resultIndex,
+        resultLocation,
sessionId,
null,
JobType.INTERACTIVE,
@@ -78,7 +78,7 @@ public AsyncQueryJobMetadata(
AsyncQueryId queryId,
String applicationId,
String jobId,
-      String resultIndex,
+      String resultLocation,
String sessionId,
String datasourceName,
JobType jobType,
@@ -87,7 +87,7 @@ public AsyncQueryJobMetadata(
queryId,
applicationId,
jobId,
-        resultIndex,
+        resultLocation,
sessionId,
datasourceName,
jobType,
@@ -100,7 +100,7 @@ public AsyncQueryJobMetadata(
AsyncQueryId queryId,
String applicationId,
String jobId,
-      String resultIndex,
+      String resultLocation,
String sessionId,
String datasourceName,
JobType jobType,
@@ -110,7 +110,7 @@ public AsyncQueryJobMetadata(
this.queryId = queryId;
this.applicationId = applicationId;
this.jobId = jobId;
-    this.resultIndex = resultIndex;
+    this.resultLocation = resultLocation;
this.sessionId = sessionId;
this.datasourceName = datasourceName;
this.jobType = jobType;
@@ -131,7 +131,7 @@ public static AsyncQueryJobMetadata copy(
copy.getQueryId(),
copy.getApplicationId(),
copy.getJobId(),
-        copy.getResultIndex(),
+        copy.getResultLocation(),
copy.getSessionId(),
copy.datasourceName,
copy.jobType,
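Throughout AsyncQueryJobMetadata, resultIndex becomes resultLocation: a storage-neutral name that fits the new async-query-core module, which should not assume results live in an OpenSearch index. A hedged construction sketch using the four-argument convenience constructor above (the AsyncQueryId factory and all identifier values are assumptions):

AsyncQueryJobMetadata metadata =
    new AsyncQueryJobMetadata(
        AsyncQueryId.newAsyncQueryId("mys3"), // assumed factory; datasource name is made up
        "application-00abc123",               // EMR Serverless application id (illustrative)
        "job-00xyz789",                       // EMR Serverless job run id (illustrative)
        "query_execution_result_mys3");       // resultLocation; today this is an index name
String resultLocation = metadata.getResultLocation();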
BatchQueryHandler.java
@@ -38,8 +38,8 @@ public class BatchQueryHandler extends AsyncQueryHandler {
protected JSONObject getResponseFromResultIndex(AsyncQueryJobMetadata asyncQueryJobMetadata) {
// either empty json when the result is not available or data with status
// Fetch from Result Index
-    return jobExecutionResponseReader.getResultFromOpensearchIndex(
-        asyncQueryJobMetadata.getJobId(), asyncQueryJobMetadata.getResultIndex());
+    return jobExecutionResponseReader.getResultWithJobId(
+        asyncQueryJobMetadata.getJobId(), asyncQueryJobMetadata.getResultLocation());
}

@Override
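The companion rename on the reader side, getResultFromOpensearchIndex to getResultWithJobId, keeps BatchQueryHandler's lookup keyed by job id while dropping the OpenSearch-specific wording. A sketch of the contract these call sites imply (whether JobExecutionResponseReader is an interface or a class is not visible in this diff, and the comments are mine):

public interface JobExecutionResponseReader {
  // Returns the result document keyed by the EMR-S job id at the given result
  // location, or an empty JSONObject while the result is still pending.
  JSONObject getResultWithJobId(String jobId, String resultLocation);

  // Returns the result document keyed by the async query id.
  JSONObject getResultWithQueryId(String queryId, String resultLocation);
}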
IndexDMLHandler.java
@@ -15,7 +15,6 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.json.JSONObject;
-import org.opensearch.client.Client;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryId;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata;
@@ -54,8 +53,6 @@ public class IndexDMLHandler extends AsyncQueryHandler {
private final FlintIndexStateModelService flintIndexStateModelService;
private final IndexDMLResultStorageService indexDMLResultStorageService;

-private final Client client;

public static boolean isIndexDMLQuery(String jobId) {
return DROP_INDEX_JOB_ID.equalsIgnoreCase(jobId) || DML_QUERY_JOB_ID.equalsIgnoreCase(jobId);
}
@@ -137,7 +134,9 @@ private void executeIndexOp(
case VACUUM:
FlintIndexOp indexVacuumOp =
new FlintIndexOpVacuum(
-            flintIndexStateModelService, dispatchQueryRequest.getDatasource(), client);
+            flintIndexStateModelService,
+            dispatchQueryRequest.getDatasource(),
+            flintIndexMetadataService);
indexVacuumOp.apply(indexMetadata);
break;
default:
@@ -163,7 +162,7 @@ private FlintIndexMetadata getFlintIndexMetadata(IndexQueryDetails indexDetails)
protected JSONObject getResponseFromResultIndex(AsyncQueryJobMetadata asyncQueryJobMetadata) {
String queryId = asyncQueryJobMetadata.getQueryId().getId();
return jobExecutionResponseReader.getResultWithQueryId(
-        queryId, asyncQueryJobMetadata.getResultIndex());
+        queryId, asyncQueryJobMetadata.getResultLocation());
}

@Override
InteractiveQueryHandler.java
@@ -45,7 +45,7 @@ public class InteractiveQueryHandler extends AsyncQueryHandler {
protected JSONObject getResponseFromResultIndex(AsyncQueryJobMetadata asyncQueryJobMetadata) {
String queryId = asyncQueryJobMetadata.getQueryId().getId();
return jobExecutionResponseReader.getResultWithQueryId(
-        queryId, asyncQueryJobMetadata.getResultIndex());
+        queryId, asyncQueryJobMetadata.getResultLocation());
}

@Override
SparkQueryDispatcher.java
@@ -9,7 +9,6 @@
import java.util.Map;
import lombok.AllArgsConstructor;
import org.json.JSONObject;
-import org.opensearch.client.Client;
import org.opensearch.sql.datasource.DataSourceService;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryId;
@@ -48,8 +47,6 @@ public class SparkQueryDispatcher {

private FlintIndexMetadataService flintIndexMetadataService;

-private Client client;

private SessionManager sessionManager;

private LeaseManager leaseManager;
@@ -166,8 +163,7 @@ private IndexDMLHandler createIndexDMLHandler(EMRServerlessClient emrServerlessC
jobExecutionResponseReader,
flintIndexMetadataService,
flintIndexStateModelService,
-        indexDMLResultStorageService,
-        client);
+        indexDMLResultStorageService);
}

// TODO: Revisit this logic.
InteractiveSession.java
@@ -24,7 +24,6 @@
import org.opensearch.sql.spark.execution.statement.StatementId;
import org.opensearch.sql.spark.execution.statement.StatementStorageService;
import org.opensearch.sql.spark.execution.statestore.SessionStorageService;
-import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.rest.model.LangType;
import org.opensearch.sql.spark.utils.TimeProvider;

@@ -41,7 +40,6 @@ public class InteractiveSession implements Session {
public static final String SESSION_ID_TAG_KEY = "sid";

private final SessionId sessionId;
-private final StateStore stateStore;
private final StatementStorageService statementStorageService;
private final SessionStorageService sessionStorageService;
private final EMRServerlessClient serverlessClient;
@@ -103,7 +101,6 @@ public StatementId submit(QueryRequest request) {
.sessionId(sessionId)
.applicationId(sessionModel.getApplicationId())
.jobId(sessionModel.getJobId())
-            .stateStore(stateStore)
.statementStorageService(statementStorageService)
.statementId(statementId)
.langType(LangType.SQL)
@@ -139,7 +136,6 @@ public Optional<Statement> get(StatementId stID) {
.langType(model.getLangType())
.query(model.getQuery())
.queryId(model.getQueryId())
-            .stateStore(stateStore)
.statementStorageService(statementStorageService)
.statementModel(model)
.build());
SessionManager.java
@@ -13,7 +13,6 @@
import org.opensearch.sql.spark.client.EMRServerlessClientFactory;
import org.opensearch.sql.spark.execution.statement.StatementStorageService;
import org.opensearch.sql.spark.execution.statestore.SessionStorageService;
-import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.utils.RealTimeProvider;

/**
@@ -22,20 +21,16 @@
* <p>todo. add Session cache and Session sweeper.
*/
public class SessionManager {
-private final StateStore stateStore;
private final StatementStorageService statementStorageService;

private final SessionStorageService sessionStorageService;
private final EMRServerlessClientFactory emrServerlessClientFactory;
private Settings settings;

public SessionManager(
-      StateStore stateStore,
StatementStorageService statementStorageService,
SessionStorageService sessionStorageService,
EMRServerlessClientFactory emrServerlessClientFactory,
Settings settings) {
-    this.stateStore = stateStore;
this.statementStorageService = statementStorageService;
this.sessionStorageService = sessionStorageService;
this.emrServerlessClientFactory = emrServerlessClientFactory;
@@ -46,7 +41,6 @@ public Session createSession(CreateSessionRequest request) {
InteractiveSession session =
InteractiveSession.builder()
.sessionId(newSessionId(request.getDatasourceName()))
-            .stateStore(stateStore)
.statementStorageService(statementStorageService)
.sessionStorageService(sessionStorageService)
.serverlessClient(emrServerlessClientFactory.getClient())
@@ -80,7 +74,6 @@ public Optional<Session> getSession(SessionId sid, String dataSourceName) {
InteractiveSession session =
InteractiveSession.builder()
.sessionId(sid)
-            .stateStore(stateStore)
.statementStorageService(statementStorageService)
.sessionStorageService(sessionStorageService)
.serverlessClient(emrServerlessClientFactory.getClient())
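With statement and session persistence already delegated to StatementStorageService and SessionStorageService, the raw StateStore handle is dead weight, so it is dropped from InteractiveSession, SessionManager, and Statement alike. Wiring a SessionManager after this change would look roughly like this sketch (the storage services, client factory, and settings are assumed to exist in scope):

SessionManager sessionManager =
    new SessionManager(
        statementStorageService,     // persists Statement state
        sessionStorageService,       // persists Session state
        emrServerlessClientFactory,  // supplies an EMR-S client on demand
        settings);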
Statement.java
@@ -15,7 +15,6 @@
import org.opensearch.index.engine.DocumentMissingException;
import org.opensearch.index.engine.VersionConflictEngineException;
import org.opensearch.sql.spark.execution.session.SessionId;
-import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.rest.model.LangType;

/** Statement represent query to execute in session. One statement map to one session. */
@@ -32,7 +31,6 @@ public class Statement {
private final String datasourceName;
private final String query;
private final String queryId;
-private final StateStore stateStore;
private final StatementStorageService statementStorageService;

@Setter private StatementModel statementModel;
@@ -35,7 +35,7 @@ public XContentBuilder toXContent(AsyncQueryJobMetadata jobMetadata, ToXContent.
.field("type", TYPE_JOBMETA)
.field("jobId", jobMetadata.getJobId())
.field("applicationId", jobMetadata.getApplicationId())
-        .field("resultIndex", jobMetadata.getResultIndex())
+        .field("resultIndex", jobMetadata.getResultLocation())
.field("sessionId", jobMetadata.getSessionId())
.field(DATASOURCE_NAME, jobMetadata.getDatasourceName())
.field(JOB_TYPE, jobMetadata.getJobType().getText().toLowerCase(Locale.ROOT))
FlintIndexMetadataService.java
@@ -27,4 +27,11 @@ public interface FlintIndexMetadataService {
* @param flintIndexOptions flintIndexOptions.
*/
  void updateIndexToManualRefresh(String indexName, FlintIndexOptions flintIndexOptions);
+
+  /**
+   * Deletes FlintIndex.
+   *
+   * @param indexName indexName.
+   */
+  void deleteFlintIndex(String indexName);
}
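The new deleteFlintIndex method moves physical index deletion behind the FlintIndexMetadataService abstraction, so callers such as the vacuum operation no longer need an OpenSearch Client. A hedged call-site sketch (the helper method is hypothetical):

// Hypothetical helper: drops the OpenSearch index that backs a Flint index.
void vacuum(FlintIndexMetadataService metadataService, FlintIndexMetadata metadata) {
  metadataService.deleteFlintIndex(metadata.getOpensearchIndexName());
}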
@@ -31,7 +31,9 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.admin.indices.mapping.get.GetMappingsResponse;
+import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.client.Client;
import org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions;

@@ -87,6 +89,14 @@ public void updateIndexToManualRefresh(String indexName, FlintIndexOptions flint
client.admin().indices().preparePutMapping(indexName).setSource(flintMetadataMap).get();
}

+  @Override
+  public void deleteFlintIndex(String indexName) {
+    LOGGER.info("Vacuuming Flint index {}", indexName);
+    DeleteIndexRequest request = new DeleteIndexRequest().indices(indexName);
+    AcknowledgedResponse response = client.admin().indices().delete(request).actionGet();
+    LOGGER.info("OpenSearch index delete result: {}", response.isAcknowledged());
+  }

private void validateFlintIndexOptions(
String kind, Map<String, Object> existingOptions, Map<String, String> newOptions) {
if ((newOptions.containsKey(INCREMENTAL_REFRESH)
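The OpenSearch-backed implementation above logs the acknowledgement flag but does not act on it. A stricter variant could fail fast instead; whether to throw here is a design choice, not something this commit decides:

AcknowledgedResponse response = client.admin().indices().delete(request).actionGet();
if (!response.isAcknowledged()) {
  // Surface the failure instead of only logging it (illustrative alternative).
  throw new IllegalStateException("Delete was not acknowledged for index " + indexName);
}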
FlintIndexOpVacuum.java
@@ -7,10 +7,8 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
-import org.opensearch.action.support.master.AcknowledgedResponse;
-import org.opensearch.client.Client;
import org.opensearch.sql.spark.flint.FlintIndexMetadata;
+import org.opensearch.sql.spark.flint.FlintIndexMetadataService;
import org.opensearch.sql.spark.flint.FlintIndexState;
import org.opensearch.sql.spark.flint.FlintIndexStateModel;
import org.opensearch.sql.spark.flint.FlintIndexStateModelService;
@@ -21,14 +19,14 @@ public class FlintIndexOpVacuum extends FlintIndexOp {
private static final Logger LOG = LogManager.getLogger();

/** OpenSearch client. */
-  private final Client client;
+  private final FlintIndexMetadataService flintIndexMetadataService;

public FlintIndexOpVacuum(
FlintIndexStateModelService flintIndexStateModelService,
String datasourceName,
-      Client client) {
+      FlintIndexMetadataService flintIndexMetadataService) {
super(flintIndexStateModelService, datasourceName);
-    this.client = client;
+    this.flintIndexMetadataService = flintIndexMetadataService;
}

@Override
@@ -43,11 +41,7 @@ FlintIndexState transitioningState() {

@Override
public void runOp(FlintIndexMetadata flintIndexMetadata, FlintIndexStateModel flintIndex) {
-    LOG.info("Vacuuming Flint index {}", flintIndexMetadata.getOpensearchIndexName());
-    DeleteIndexRequest request =
-        new DeleteIndexRequest().indices(flintIndexMetadata.getOpensearchIndexName());
-    AcknowledgedResponse response = client.admin().indices().delete(request).actionGet();
-    LOG.info("OpenSearch index delete result: {}", response.isAcknowledged());
+    flintIndexMetadataService.deleteFlintIndex(flintIndexMetadata.getOpensearchIndexName());
}

@Override
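Swapping the OpenSearch Client for FlintIndexMetadataService leaves FlintIndexOpVacuum free of cluster plumbing, which makes it easy to unit test. A hedged Mockito sketch (the metadata and state-model fixtures are assumed to be prepared by the test):

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

FlintIndexMetadataService metadataService = mock(FlintIndexMetadataService.class);
FlintIndexOpVacuum vacuumOp =
    new FlintIndexOpVacuum(
        mock(FlintIndexStateModelService.class), "mys3", metadataService);

vacuumOp.runOp(flintIndexMetadata, flintIndexStateModel);
verify(metadataService).deleteFlintIndex(flintIndexMetadata.getOpensearchIndexName());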
[Diff view truncated: the remaining changed files were not loaded.]