diff --git a/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/SparkSubmitParameters.java b/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/SparkSubmitParameters.java index 627d6cfcc5..f5ef155c9c 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/SparkSubmitParameters.java +++ b/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/SparkSubmitParameters.java @@ -155,9 +155,10 @@ private URI parseUri(String opensearchUri, String datasourceName) { } } - public Builder structuredStreaming() { - config.put("spark.flint.job.type", "streaming"); - + public Builder structuredStreaming(Boolean isStructuredStreaming) { + if (isStructuredStreaming) { + config.put("spark.flint.job.type", "streaming"); + } return this; } diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java index 1fdc391c85..2749d7c934 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java @@ -123,11 +123,11 @@ private StartJobRequest getStartJobRequestForIndexRequest( .dataSource( dataSourceService.getRawDataSourceMetadata( dispatchQueryRequest.getDatasource())) - .structuredStreaming() + .structuredStreaming(indexDetails.getAutoRefresh()) .build() .toString(), tags, - true); + indexDetails.getAutoRefresh()); return startJobRequest; } diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexDetails.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexDetails.java index 5067439061..86fca60525 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexDetails.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexDetails.java @@ -12,4 +12,6 @@ public class IndexDetails { private String indexName; private FullyQualifiedTableName fullyQualifiedTableName; + // by default, auto_refresh = false; + private Boolean autoRefresh = false; } diff --git a/spark/src/main/java/org/opensearch/sql/spark/utils/SQLQueryUtils.java b/spark/src/main/java/org/opensearch/sql/spark/utils/SQLQueryUtils.java index 2ddc34af5a..481591a4f0 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/utils/SQLQueryUtils.java +++ b/spark/src/main/java/org/opensearch/sql/spark/utils/SQLQueryUtils.java @@ -5,6 +5,7 @@ package org.opensearch.sql.spark.utils; +import java.util.Locale; import lombok.Getter; import lombok.experimental.UtilityClass; import org.antlr.v4.runtime.CommonTokenStream; @@ -132,5 +133,56 @@ public Void visitTableName(FlintSparkSqlExtensionsParser.TableNameContext ctx) { indexDetails.setFullyQualifiedTableName(new FullyQualifiedTableName(ctx.getText())); return super.visitTableName(ctx); } + + @Override + public Void visitCreateSkippingIndexStatement( + FlintSparkSqlExtensionsParser.CreateSkippingIndexStatementContext ctx) { + visitPropertyList(ctx.propertyList()); + return super.visitCreateSkippingIndexStatement(ctx); + } + + @Override + public Void visitCreateCoveringIndexStatement( + FlintSparkSqlExtensionsParser.CreateCoveringIndexStatementContext ctx) { + visitPropertyList(ctx.propertyList()); + return super.visitCreateCoveringIndexStatement(ctx); + } + + @Override + public Void visitPropertyList(FlintSparkSqlExtensionsParser.PropertyListContext ctx) { + if (ctx != null) { + ctx.property() + .forEach( + property -> { + // todo. Currently, we use contains() api to avoid unescape string. In future, we + // should leverage + // https://github.com/apache/spark/blob/v3.5.0/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkParserUtils.scala#L35 to unescape string literal + if (propertyKey(property.key).toLowerCase(Locale.ROOT).contains("auto_refresh")) { + if (propertyValue(property.value).toLowerCase(Locale.ROOT).contains("true")) { + indexDetails.setAutoRefresh(true); + } + } + }); + } + return null; + } + + private String propertyKey(FlintSparkSqlExtensionsParser.PropertyKeyContext key) { + if (key.STRING() != null) { + return key.STRING().getText(); + } else { + return key.getText(); + } + } + + private String propertyValue(FlintSparkSqlExtensionsParser.PropertyValueContext value) { + if (value.STRING() != null) { + return value.STRING().getText(); + } else if (value.booleanValue() != null) { + return value.getText(); + } else { + return value.getText(); + } + } } } diff --git a/spark/src/test/java/org/opensearch/sql/spark/utils/SQLQueryUtilsTest.java b/spark/src/test/java/org/opensearch/sql/spark/utils/SQLQueryUtilsTest.java index 91b5befe88..af892fa097 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/utils/SQLQueryUtilsTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/utils/SQLQueryUtilsTest.java @@ -5,6 +5,10 @@ package org.opensearch.sql.spark.utils; +import static org.opensearch.sql.spark.utils.SQLQueryUtilsTest.IndexQuery.index; +import static org.opensearch.sql.spark.utils.SQLQueryUtilsTest.IndexQuery.skippingIndex; + +import lombok.Getter; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -107,4 +111,74 @@ void testExtractionFromFlintIndexQueries() { Assertions.assertEquals("default", fullyQualifiedTableName.getSchemaName()); Assertions.assertEquals("alb_logs", fullyQualifiedTableName.getTableName()); } + + /** https://github.com/opensearch-project/sql/issues/2206 */ + @Test + void testAutoRefresh() { + Assertions.assertFalse( + SQLQueryUtils.extractIndexDetails(skippingIndex().getQuery()).getAutoRefresh()); + + Assertions.assertFalse( + SQLQueryUtils.extractIndexDetails( + skippingIndex().withProperty("auto_refresh", "false").getQuery()) + .getAutoRefresh()); + + Assertions.assertTrue( + SQLQueryUtils.extractIndexDetails( + skippingIndex().withProperty("auto_refresh", "true").getQuery()) + .getAutoRefresh()); + + Assertions.assertTrue( + SQLQueryUtils.extractIndexDetails( + skippingIndex().withProperty("\"auto_refresh\"", "true").getQuery()) + .getAutoRefresh()); + + Assertions.assertTrue( + SQLQueryUtils.extractIndexDetails( + skippingIndex().withProperty("\"auto_refresh\"", "\"true\"").getQuery()) + .getAutoRefresh()); + + Assertions.assertFalse( + SQLQueryUtils.extractIndexDetails( + skippingIndex().withProperty("auto_refresh", "1").getQuery()) + .getAutoRefresh()); + + Assertions.assertFalse( + SQLQueryUtils.extractIndexDetails(skippingIndex().withProperty("interval", "1").getQuery()) + .getAutoRefresh()); + + Assertions.assertFalse(SQLQueryUtils.extractIndexDetails(index().getQuery()).getAutoRefresh()); + + Assertions.assertFalse( + SQLQueryUtils.extractIndexDetails(index().withProperty("auto_refresh", "false").getQuery()) + .getAutoRefresh()); + + Assertions.assertTrue( + SQLQueryUtils.extractIndexDetails(index().withProperty("auto_refresh", "true").getQuery()) + .getAutoRefresh()); + } + + @Getter + protected static class IndexQuery { + private String query; + + private IndexQuery(String query) { + this.query = query; + } + + public static IndexQuery skippingIndex() { + return new IndexQuery( + "CREATE SKIPPING INDEX ON myS3.default.alb_logs" + "(l_orderkey VALUE_SET)"); + } + + public static IndexQuery index() { + return new IndexQuery( + "CREATE INDEX elb_and_requestUri ON myS3.default.alb_logs(l_orderkey, " + "l_quantity)"); + } + + public IndexQuery withProperty(String key, String value) { + query = String.format("%s with (%s = %s)", query, key, value); + return this; + } + } }