Skip to content

Commit

Permalink
Add missing tags and MV support
Browse files Browse the repository at this point in the history
Signed-off-by: Vamsi Manohar <[email protected]>
  • Loading branch information
vmmusings committed Oct 20, 2023
1 parent f835112 commit b00b4c8
Show file tree
Hide file tree
Showing 9 changed files with 295 additions and 179 deletions.
34 changes: 34 additions & 0 deletions spark/src/main/antlr/FlintSparkSqlExtensions.g4
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ singleStatement
statement
: skippingIndexStatement
| coveringIndexStatement
| materializedViewStatement
;

skippingIndexStatement
Expand Down Expand Up @@ -76,6 +77,39 @@ dropCoveringIndexStatement
: DROP INDEX indexName ON tableName
;

materializedViewStatement
: createMaterializedViewStatement
| showMaterializedViewStatement
| describeMaterializedViewStatement
| dropMaterializedViewStatement
;

createMaterializedViewStatement
: CREATE MATERIALIZED VIEW (IF NOT EXISTS)? mvName=multipartIdentifier
AS query=materializedViewQuery
(WITH LEFT_PAREN propertyList RIGHT_PAREN)?
;

showMaterializedViewStatement
: SHOW MATERIALIZED (VIEW | VIEWS) IN catalogDb=multipartIdentifier
;

describeMaterializedViewStatement
: (DESC | DESCRIBE) MATERIALIZED VIEW mvName=multipartIdentifier
;

dropMaterializedViewStatement
: DROP MATERIALIZED VIEW mvName=multipartIdentifier
;

/*
* Match all remaining tokens in non-greedy way
* so WITH clause won't be captured by this rule.
*/
materializedViewQuery
: .+?
;

indexColTypeList
: indexColType (COMMA indexColType)*
;
Expand Down
5 changes: 5 additions & 0 deletions spark/src/main/antlr/SparkSqlBase.g4
Original file line number Diff line number Diff line change
Expand Up @@ -154,21 +154,26 @@ COMMA: ',';
DOT: '.';


AS: 'AS';
CREATE: 'CREATE';
DESC: 'DESC';
DESCRIBE: 'DESCRIBE';
DROP: 'DROP';
EXISTS: 'EXISTS';
FALSE: 'FALSE';
IF: 'IF';
IN: 'IN';
INDEX: 'INDEX';
INDEXES: 'INDEXES';
MATERIALIZED: 'MATERIALIZED';
NOT: 'NOT';
ON: 'ON';
PARTITION: 'PARTITION';
REFRESH: 'REFRESH';
SHOW: 'SHOW';
TRUE: 'TRUE';
VIEW: 'VIEW';
VIEWS: 'VIEWS';
WITH: 'WITH';


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.opensearch.sql.spark.client.StartJobRequest;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse;
import org.opensearch.sql.spark.dispatcher.model.FullyQualifiedTableName;
import org.opensearch.sql.spark.dispatcher.model.IndexDetails;
import org.opensearch.sql.spark.execution.session.CreateSessionRequest;
import org.opensearch.sql.spark.execution.session.Session;
Expand All @@ -62,9 +61,8 @@ public class SparkQueryDispatcher {

public static final String INDEX_TAG_KEY = "index";
public static final String DATASOURCE_TAG_KEY = "datasource";
public static final String SCHEMA_TAG_KEY = "schema";
public static final String TABLE_TAG_KEY = "table";
public static final String CLUSTER_NAME_TAG_KEY = "cluster";
public static final String DEFAULT_SCHEMA_NAME = "schema";

private EMRServerlessClient emrServerlessClient;

Expand Down Expand Up @@ -182,6 +180,8 @@ private DispatchQueryResponse handleSQLQuery(DispatchQueryRequest dispatchQueryR
if (SQLQueryUtils.isIndexQuery(dispatchQueryRequest.getQuery())) {
IndexDetails indexDetails =
SQLQueryUtils.extractIndexDetails(dispatchQueryRequest.getQuery());
fillMissingDetails(dispatchQueryRequest, indexDetails);

if (indexDetails.isDropIndex()) {
return handleDropIndexQuery(dispatchQueryRequest, indexDetails);
} else {
Expand All @@ -192,17 +192,28 @@ private DispatchQueryResponse handleSQLQuery(DispatchQueryRequest dispatchQueryR
}
}

// TODO: Revisit this logic.
// Currently, Spark if datasource is not provided in query.
// Spark Assumes the datasource to be catalog.
// This is required to handle drop index case properly when datasource name is not provided.
private static void fillMissingDetails(
DispatchQueryRequest dispatchQueryRequest, IndexDetails indexDetails) {
if (indexDetails.getFullyQualifiedTableName() != null
&& indexDetails.getFullyQualifiedTableName().getDatasourceName() == null) {
indexDetails
.getFullyQualifiedTableName()
.setDatasourceName(dispatchQueryRequest.getDatasource());
}
}

private DispatchQueryResponse handleIndexQuery(
DispatchQueryRequest dispatchQueryRequest, IndexDetails indexDetails) {
FullyQualifiedTableName fullyQualifiedTableName = indexDetails.getFullyQualifiedTableName();
DataSourceMetadata dataSourceMetadata =
this.dataSourceService.getRawDataSourceMetadata(dispatchQueryRequest.getDatasource());
dataSourceUserAuthorizationHelper.authorizeDataSource(dataSourceMetadata);
String jobName = dispatchQueryRequest.getClusterName() + ":" + "index-query";
Map<String, String> tags = getDefaultTagsForJobSubmission(dispatchQueryRequest);
tags.put(INDEX_TAG_KEY, indexDetails.getIndexName());
tags.put(TABLE_TAG_KEY, fullyQualifiedTableName.getTableName());
tags.put(SCHEMA_TAG_KEY, fullyQualifiedTableName.getSchemaName());
tags.put(INDEX_TAG_KEY, indexDetails.openSearchIndexName());
StartJobRequest startJobRequest =
new StartJobRequest(
dispatchQueryRequest.getQuery(),
Expand All @@ -213,12 +224,12 @@ private DispatchQueryResponse handleIndexQuery(
.dataSource(
dataSourceService.getRawDataSourceMetadata(
dispatchQueryRequest.getDatasource()))
.structuredStreaming(indexDetails.getAutoRefresh())
.structuredStreaming(indexDetails.isAutoRefresh())
.extraParameters(dispatchQueryRequest.getExtraSparkSubmitParams())
.build()
.toString(),
tags,
indexDetails.getAutoRefresh(),
indexDetails.isAutoRefresh(),
dataSourceMetadata.getResultIndex());
String jobId = emrServerlessClient.startJobRun(startJobRequest);
return new DispatchQueryResponse(jobId, false, dataSourceMetadata.getResultIndex(), null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,56 +5,126 @@

package org.opensearch.sql.spark.dispatcher.model;

import lombok.AllArgsConstructor;
import lombok.Data;
import com.google.common.base.Preconditions;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import lombok.Getter;
import org.apache.commons.lang3.StringUtils;
import org.opensearch.sql.spark.flint.FlintIndexType;

/** Index details in an async query. */
@Data
@AllArgsConstructor
@NoArgsConstructor
@Getter
@EqualsAndHashCode
public class IndexDetails {
private String indexName;
private FullyQualifiedTableName fullyQualifiedTableName;
// by default, auto_refresh = false;
private Boolean autoRefresh = false;
private boolean autoRefresh;
private boolean isDropIndex;
// materialized view special case where
// table name and mv name are combined.
private String mvName;
private FlintIndexType indexType;

private IndexDetails() {}

public static IndexDetailsBuilder builder() {
return new IndexDetailsBuilder();
}

// Builder class
public static class IndexDetailsBuilder {
private final IndexDetails indexDetails;

public IndexDetailsBuilder() {
indexDetails = new IndexDetails();
}

public IndexDetailsBuilder indexName(String indexName) {
indexDetails.indexName = indexName;
return this;
}

public IndexDetailsBuilder fullyQualifiedTableName(FullyQualifiedTableName tableName) {
indexDetails.fullyQualifiedTableName = tableName;
return this;
}

public IndexDetailsBuilder autoRefresh(Boolean autoRefresh) {
indexDetails.autoRefresh = autoRefresh;
return this;
}

public IndexDetailsBuilder isDropIndex(boolean isDropIndex) {
indexDetails.isDropIndex = isDropIndex;
return this;
}

public IndexDetailsBuilder mvName(String mvName) {
indexDetails.mvName = mvName;
return this;
}

public IndexDetailsBuilder indexType(FlintIndexType indexType) {
indexDetails.indexType = indexType;
return this;
}

public IndexDetails build() {
Preconditions.checkNotNull(indexDetails.indexType, "Index Type can't be null");
switch (indexDetails.indexType) {
case COVERING:
Preconditions.checkNotNull(
indexDetails.indexName, "IndexName can't be null for Covering Index.");
Preconditions.checkNotNull(
indexDetails.fullyQualifiedTableName, "TableName can't be null for Covering Index.");
break;
case SKIPPING:
Preconditions.checkNotNull(
indexDetails.fullyQualifiedTableName, "TableName can't be null for Skipping Index.");
break;
case MATERIALIZED_VIEW:
Preconditions.checkNotNull(indexDetails.mvName, "Materialized view name can't be null");
break;
}

return indexDetails;
}
}

public String openSearchIndexName() {
FullyQualifiedTableName fullyQualifiedTableName = getFullyQualifiedTableName();
if (FlintIndexType.SKIPPING.equals(getIndexType())) {
String indexName =
"flint"
+ "_"
+ fullyQualifiedTableName.getDatasourceName()
+ "_"
+ fullyQualifiedTableName.getSchemaName()
+ "_"
+ fullyQualifiedTableName.getTableName()
+ "_"
+ getIndexType().getSuffix();
return indexName.toLowerCase();
} else if (FlintIndexType.COVERING.equals(getIndexType())) {
String indexName =
"flint"
+ "_"
+ fullyQualifiedTableName.getDatasourceName()
+ "_"
+ fullyQualifiedTableName.getSchemaName()
+ "_"
+ fullyQualifiedTableName.getTableName()
+ "_"
+ getIndexName()
+ "_"
+ getIndexType().getSuffix();
return indexName.toLowerCase();
} else {
throw new UnsupportedOperationException(
String.format("Unsupported Index Type : %s", getIndexType()));
String indexName = StringUtils.EMPTY;
switch (getIndexType()) {
case COVERING:
indexName =
"flint"
+ "_"
+ fullyQualifiedTableName.getDatasourceName()
+ "_"
+ fullyQualifiedTableName.getSchemaName()
+ "_"
+ fullyQualifiedTableName.getTableName()
+ "_"
+ getIndexName()
+ "_"
+ getIndexType().getSuffix();
break;
case SKIPPING:
indexName =
"flint"
+ "_"
+ fullyQualifiedTableName.getDatasourceName()
+ "_"
+ fullyQualifiedTableName.getSchemaName()
+ "_"
+ fullyQualifiedTableName.getTableName()
+ "_"
+ getIndexType().getSuffix();
break;
case MATERIALIZED_VIEW:
indexName = "flint" + "_" + getMvName();
break;
}
return indexName.toLowerCase();
}
}
Loading

0 comments on commit b00b4c8

Please sign in to comment.