Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Enable PPL lang and add datasource to async query API #2195

Merged
merged 1 commit into from
Oct 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/user/interfaces/asyncqueryinterface.rst
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ Sample Request::
curl --location 'http://localhost:9200/_plugins/_async_query' \
--header 'Content-Type: application/json' \
--data '{
"datasource" : "my_glue",
"lang" : "sql",
"query" : "select * from my_glue.default.http_logs limit 10"
}'
Expand Down
5 changes: 2 additions & 3 deletions spark/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@ tasks.register('downloadG4Files', Exec) {

executable 'curl'

// Need to add these back once the grammar issues with indexName and tableName is addressed in flint integration jar.
// args '-o', 'src/main/antlr/FlintSparkSqlExtensions.g4', 'https://raw.githubusercontent.com/opensearch-project/opensearch-spark/main/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4'
// args '-o', 'src/main/antlr/SparkSqlBase.g4', 'https://raw.githubusercontent.com/opensearch-project/opensearch-spark/main/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4'
args '-o', 'src/main/antlr/FlintSparkSqlExtensions.g4', 'https://raw.githubusercontent.com/opensearch-project/opensearch-spark/main/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4'
args '-o', 'src/main/antlr/SparkSqlBase.g4', 'https://raw.githubusercontent.com/opensearch-project/opensearch-spark/main/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4'
args '-o', 'src/main/antlr/SqlBaseParser.g4', 'https://raw.githubusercontent.com/apache/spark/master/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4'
args '-o', 'src/main/antlr/SqlBaseLexer.g4', 'https://raw.githubusercontent.com/apache/spark/master/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4'
}
Expand Down
6 changes: 4 additions & 2 deletions spark/src/main/antlr/FlintSparkSqlExtensions.g4
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ skippingIndexStatement
;

createSkippingIndexStatement
: CREATE SKIPPING INDEX ON tableName
: CREATE SKIPPING INDEX (IF NOT EXISTS)?
ON tableName
LEFT_PAREN indexColTypeList RIGHT_PAREN
(WITH LEFT_PAREN propertyList RIGHT_PAREN)?
;
Expand All @@ -53,7 +54,8 @@ coveringIndexStatement
;

createCoveringIndexStatement
: CREATE INDEX indexName ON tableName
: CREATE INDEX (IF NOT EXISTS)? indexName
ON tableName
LEFT_PAREN indexColumns=multipartIdentifierPropertyList RIGHT_PAREN
(WITH LEFT_PAREN propertyList RIGHT_PAREN)?
;
Expand Down
11 changes: 10 additions & 1 deletion spark/src/main/antlr/SparkSqlBase.g4
Original file line number Diff line number Diff line change
Expand Up @@ -158,14 +158,16 @@ CREATE: 'CREATE';
DESC: 'DESC';
DESCRIBE: 'DESCRIBE';
DROP: 'DROP';
EXISTS: 'EXISTS';
FALSE: 'FALSE';
IF: 'IF';
INDEX: 'INDEX';
INDEXES: 'INDEXES';
NOT: 'NOT';
ON: 'ON';
PARTITION: 'PARTITION';
REFRESH: 'REFRESH';
SHOW: 'SHOW';
STRING: 'STRING';
TRUE: 'TRUE';
WITH: 'WITH';

Expand All @@ -174,6 +176,13 @@ EQ : '=' | '==';
MINUS: '-';


STRING
: '\'' ( ~('\''|'\\') | ('\\' .) )* '\''
| '"' ( ~('"'|'\\') | ('\\' .) )* '"'
| 'R\'' (~'\'')* '\''
| 'R"'(~'"')* '"'
;

INTEGER_VALUE
: DIGIT+
;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public CreateAsyncQueryResponse createAsyncQuery(
new DispatchQueryRequest(
sparkExecutionEngineConfig.getApplicationId(),
createAsyncQueryRequest.getQuery(),
createAsyncQueryRequest.getDatasource(),
createAsyncQueryRequest.getLang(),
sparkExecutionEngineConfig.getExecutionRoleARN(),
clusterName.value()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public class SparkQueryDispatcher {
public static final String TABLE_TAG_KEY = "table";
public static final String CLUSTER_NAME_TAG_KEY = "cluster";

private EMRServerlessClient EMRServerlessClient;
private EMRServerlessClient emrServerlessClient;

private DataSourceService dataSourceService;

Expand All @@ -57,12 +57,12 @@ public class SparkQueryDispatcher {
private JobExecutionResponseReader jobExecutionResponseReader;

public String dispatch(DispatchQueryRequest dispatchQueryRequest) {
return EMRServerlessClient.startJobRun(getStartJobRequest(dispatchQueryRequest));
return emrServerlessClient.startJobRun(getStartJobRequest(dispatchQueryRequest));
}

// TODO : Fetch from Result Index and then make call to EMR Serverless.
public JSONObject getQueryResponse(String applicationId, String queryId) {
GetJobRunResult getJobRunResult = EMRServerlessClient.getJobRunResult(applicationId, queryId);
GetJobRunResult getJobRunResult = emrServerlessClient.getJobRunResult(applicationId, queryId);
JSONObject result = new JSONObject();
if (getJobRunResult.getJobRun().getState().equals(JobRunState.SUCCESS.toString())) {
result = jobExecutionResponseReader.getResultFromOpensearchIndex(queryId);
Expand All @@ -72,20 +72,23 @@ public JSONObject getQueryResponse(String applicationId, String queryId) {
}

public String cancelJob(String applicationId, String jobId) {
CancelJobRunResult cancelJobRunResult = EMRServerlessClient.cancelJobRun(applicationId, jobId);
CancelJobRunResult cancelJobRunResult = emrServerlessClient.cancelJobRun(applicationId, jobId);
return cancelJobRunResult.getJobRunId();
}

// we currently don't support index queries in PPL language.
// so we are treating all of them as non-index queries which don't require any kind of query
// parsing.
private StartJobRequest getStartJobRequest(DispatchQueryRequest dispatchQueryRequest) {
if (LangType.SQL.equals(dispatchQueryRequest.getLangType())) {
if (SQLQueryUtils.isIndexQuery(dispatchQueryRequest.getQuery()))
return getStartJobRequestForIndexRequest(dispatchQueryRequest);
else {
return getStartJobRequestForNonIndexQueries(dispatchQueryRequest);
}
} else {
return getStartJobRequestForNonIndexQueries(dispatchQueryRequest);
}
throw new UnsupportedOperationException(
String.format("UnSupported Lang type:: %s", dispatchQueryRequest.getLangType()));
}

private String getDataSourceRoleARN(DataSourceMetadata dataSourceMetadata) {
Expand Down Expand Up @@ -133,27 +136,17 @@ private String constructSparkParameters(String datasourceName) {
private StartJobRequest getStartJobRequestForNonIndexQueries(
DispatchQueryRequest dispatchQueryRequest) {
StartJobRequest startJobRequest;
FullyQualifiedTableName fullyQualifiedTableName =
SQLQueryUtils.extractFullyQualifiedTableName(dispatchQueryRequest.getQuery());
if (fullyQualifiedTableName.getDatasourceName() == null) {
throw new UnsupportedOperationException("Missing datasource in the query syntax.");
}
dataSourceUserAuthorizationHelper.authorizeDataSource(
this.dataSourceService.getRawDataSourceMetadata(
fullyQualifiedTableName.getDatasourceName()));
String jobName =
dispatchQueryRequest.getClusterName()
+ ":"
+ fullyQualifiedTableName.getFullyQualifiedName();
Map<String, String> tags =
getDefaultTagsForJobSubmission(dispatchQueryRequest, fullyQualifiedTableName);
this.dataSourceService.getRawDataSourceMetadata(dispatchQueryRequest.getDatasource()));
String jobName = dispatchQueryRequest.getClusterName() + ":" + "non-index-query";
Map<String, String> tags = getDefaultTagsForJobSubmission(dispatchQueryRequest);
startJobRequest =
new StartJobRequest(
dispatchQueryRequest.getQuery(),
jobName,
dispatchQueryRequest.getApplicationId(),
dispatchQueryRequest.getExecutionRoleARN(),
constructSparkParameters(fullyQualifiedTableName.getDatasourceName()),
constructSparkParameters(dispatchQueryRequest.getDatasource()),
tags);
return startJobRequest;
}
Expand All @@ -163,46 +156,29 @@ private StartJobRequest getStartJobRequestForIndexRequest(
StartJobRequest startJobRequest;
IndexDetails indexDetails = SQLQueryUtils.extractIndexDetails(dispatchQueryRequest.getQuery());
FullyQualifiedTableName fullyQualifiedTableName = indexDetails.getFullyQualifiedTableName();
if (fullyQualifiedTableName.getDatasourceName() == null) {
throw new UnsupportedOperationException("Queries without a datasource are not supported");
}
dataSourceUserAuthorizationHelper.authorizeDataSource(
this.dataSourceService.getRawDataSourceMetadata(
fullyQualifiedTableName.getDatasourceName()));
String jobName =
getJobNameForIndexQuery(dispatchQueryRequest, indexDetails, fullyQualifiedTableName);
Map<String, String> tags =
getDefaultTagsForJobSubmission(dispatchQueryRequest, fullyQualifiedTableName);
this.dataSourceService.getRawDataSourceMetadata(dispatchQueryRequest.getDatasource()));
String jobName = dispatchQueryRequest.getClusterName() + ":" + "index-query";
Map<String, String> tags = getDefaultTagsForJobSubmission(dispatchQueryRequest);
tags.put(INDEX_TAG_KEY, indexDetails.getIndexName());
tags.put(TABLE_TAG_KEY, fullyQualifiedTableName.getTableName());
tags.put(SCHEMA_TAG_KEY, fullyQualifiedTableName.getSchemaName());
startJobRequest =
new StartJobRequest(
dispatchQueryRequest.getQuery(),
jobName,
dispatchQueryRequest.getApplicationId(),
dispatchQueryRequest.getExecutionRoleARN(),
constructSparkParameters(fullyQualifiedTableName.getDatasourceName()),
constructSparkParameters(dispatchQueryRequest.getDatasource()),
tags);
return startJobRequest;
}

private static Map<String, String> getDefaultTagsForJobSubmission(
DispatchQueryRequest dispatchQueryRequest, FullyQualifiedTableName fullyQualifiedTableName) {
DispatchQueryRequest dispatchQueryRequest) {
Map<String, String> tags = new HashMap<>();
tags.put(CLUSTER_NAME_TAG_KEY, dispatchQueryRequest.getClusterName());
tags.put(DATASOURCE_TAG_KEY, fullyQualifiedTableName.getDatasourceName());
tags.put(SCHEMA_TAG_KEY, fullyQualifiedTableName.getSchemaName());
tags.put(TABLE_TAG_KEY, fullyQualifiedTableName.getTableName());
tags.put(DATASOURCE_TAG_KEY, dispatchQueryRequest.getDatasource());
return tags;
}

private static String getJobNameForIndexQuery(
DispatchQueryRequest dispatchQueryRequest,
IndexDetails indexDetails,
FullyQualifiedTableName fullyQualifiedTableName) {
return dispatchQueryRequest.getClusterName()
+ ":"
+ fullyQualifiedTableName.getFullyQualifiedName()
+ "."
+ indexDetails.getIndexName();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
public class DispatchQueryRequest {
private final String applicationId;
private final String query;
private final String datasource;
private final LangType langType;
private final String executionRoleARN;
private final String clusterName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,36 +8,43 @@
import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;

import java.io.IOException;
import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.commons.lang3.Validate;
import org.opensearch.core.xcontent.XContentParser;

@Data
@AllArgsConstructor
public class CreateAsyncQueryRequest {

private String query;
private String datasource;
private LangType lang;

public CreateAsyncQueryRequest(String query, String datasource, LangType lang) {
this.query = Validate.notNull(query, "Query can't be null");
this.datasource = Validate.notNull(datasource, "Datasource can't be null");
this.lang = Validate.notNull(lang, "lang can't be null");
}

public static CreateAsyncQueryRequest fromXContentParser(XContentParser parser)
throws IOException {
String query = null;
LangType lang = null;
String datasource = null;
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
String fieldName = parser.currentName();
parser.nextToken();
if (fieldName.equals("query")) {
query = parser.textOrNull();
} else if (fieldName.equals("lang")) {
lang = LangType.fromString(parser.textOrNull());
String langString = parser.textOrNull();
lang = LangType.fromString(langString);
} else if (fieldName.equals("datasource")) {
datasource = parser.textOrNull();
} else {
throw new IllegalArgumentException("Unknown field: " + fieldName);
}
}
if (lang == null || query == null) {
throw new IllegalArgumentException("lang and query are required fields.");
}
return new CreateAsyncQueryRequest(query, lang);
return new CreateAsyncQueryRequest(query, datasource, lang);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ void testCreateAsyncQuery() {
new AsyncQueryExecutorServiceImpl(
asyncQueryJobMetadataStorageService, sparkQueryDispatcher, settings);
CreateAsyncQueryRequest createAsyncQueryRequest =
new CreateAsyncQueryRequest("select * from my_glue.default.http_logs", LangType.SQL);
new CreateAsyncQueryRequest(
"select * from my_glue.default.http_logs", "my_glue", LangType.SQL);
when(settings.getSettingValue(Settings.Key.SPARK_EXECUTION_ENGINE_CONFIG))
.thenReturn(
"{\"applicationId\":\"00fd775baqpu4g0p\",\"executionRoleARN\":\"arn:aws:iam::270824043731:role/emr-job-execution-role\",\"region\":\"eu-west-1\"}");
Expand All @@ -58,6 +59,7 @@ void testCreateAsyncQuery() {
new DispatchQueryRequest(
"00fd775baqpu4g0p",
"select * from my_glue.default.http_logs",
"my_glue",
LangType.SQL,
"arn:aws:iam::270824043731:role/emr-job-execution-role",
TEST_CLUSTER_NAME)))
Expand All @@ -73,6 +75,7 @@ void testCreateAsyncQuery() {
new DispatchQueryRequest(
"00fd775baqpu4g0p",
"select * from my_glue.default.http_logs",
"my_glue",
LangType.SQL,
"arn:aws:iam::270824043731:role/emr-job-execution-role",
TEST_CLUSTER_NAME));
Expand Down
Loading