Add create and drop materialized view SQL support (#73)
* Add MV grammar with empty impl

Signed-off-by: Chen Dai <[email protected]>

* Find mv query in origin

Signed-off-by: Chen Dai <[email protected]>

* Implement create and drop statement in ast builder

Signed-off-by: Chen Dai <[email protected]>

* Add MV SQL IT

Signed-off-by: Chen Dai <[email protected]>

* Add more IT for create statement

Signed-off-by: Chen Dai <[email protected]>

* Add more IT for drop statement

Signed-off-by: Chen Dai <[email protected]>

* Update user manual with MV

Signed-off-by: Chen Dai <[email protected]>

* Update doc with MV index naming convention

Signed-off-by: Chen Dai <[email protected]>

---------

Signed-off-by: Chen Dai <[email protected]>
dai-chen authored Oct 19, 2023
1 parent 3fcf926 commit ea012af
Showing 7 changed files with 287 additions and 5 deletions.
42 changes: 38 additions & 4 deletions docs/index.md
@@ -15,6 +15,7 @@ A Flint index is ...
- MinMax: skip data scan by maintaining lower and upper bound of the indexed column per file.
- ValueSet: skip data scan by building a unique value set of the indexed column per file.
- Covering Index: create index for selected columns within the source dataset to improve query performance
- Materialized View: enhance query performance by storing precomputed and aggregated data from the source dataset

Please see the following example, in which the Index Building Logic and Query Rewrite Logic columns show the basic idea behind each skipping index implementation.

@@ -187,6 +188,30 @@ DESCRIBE INDEX elb_and_requestUri ON alb_logs
DROP INDEX elb_and_requestUri ON alb_logs
```

#### Materialized View

```sql
CREATE MATERIALIZED VIEW [IF NOT EXISTS] name
AS <query>
WITH ( options )

DROP MATERIALIZED VIEW name
```

Example:

```sql
CREATE MATERIALIZED VIEW alb_logs_metrics
AS
SELECT
window.start AS startTime,
COUNT(*) AS count
FROM alb_logs
GROUP BY TUMBLE(time, '1 Minute')

DROP MATERIALIZED VIEW alb_logs_metrics
```

#### Create Index Options

Users can provide the following options in the `WITH` clause of the create statement:
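
For illustration, a create statement combining the options exercised by the integration tests in this commit might look like the following sketch (the checkpoint path is a placeholder, and `spark` is assumed to be an active `SparkSession`):

```scala
// Hypothetical end-to-end example: an auto-refreshing materialized view
// with a refresh interval, a checkpoint location, and custom OpenSearch
// index settings. Option names are taken from the integration tests below.
spark.sql("""
  CREATE MATERIALIZED VIEW alb_logs_metrics
  AS
  SELECT
    window.start AS startTime,
    COUNT(*) AS count
  FROM alb_logs
  GROUP BY TUMBLE(time, '1 Minute')
  WITH (
    auto_refresh = true,
    refresh_interval = '5 Seconds',
    checkpoint_location = '/tmp/alb_logs_metrics_checkpoint',
    index_settings = '{"number_of_shards": 3, "number_of_replicas": 2}'
  )
""")
```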
@@ -226,6 +251,7 @@ OpenSearch index corresponding to the Flint index follows the naming convention

1. Skipping index: `flint_[catalog_database_table]_skipping_index`
2. Covering index: `flint_[catalog_database_table]_[index_name]_index`
3. Materialized view: `flint_[catalog_database_mv_name]`

It's important to note that any uppercase letters in the index name and table name (catalog, database and table) in the SQL statement will be automatically converted to lowercase due to restrictions imposed by OpenSearch.
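
As a minimal sketch of this convention (the `flintIndexName` helper below is hypothetical; the actual logic lives in methods such as `FlintSparkMaterializedView.getFlintIndexName`):

```scala
// Hypothetical helper mirroring the naming convention above: lowercase the
// fully qualified name and replace dots with underscores. Skipping and
// covering indexes additionally append their suffix to this base name.
def flintIndexName(fullyQualifiedName: String): String =
  s"flint_${fullyQualifiedName.toLowerCase.replace('.', '_')}"

// flintIndexName("spark_catalog.default.alb_logs_metrics")
//   == "flint_spark_catalog_default_alb_logs_metrics"
```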

@@ -345,23 +371,31 @@ val flint = new FlintSpark(spark)

// Skipping index
flint.skippingIndex()
-    .onTable("alb_logs")
+    .onTable("spark_catalog.default.alb_logs")
.filterBy("time > 2023-04-01 00:00:00")
.addPartitions("year", "month", "day")
.addValueSet("elb_status_code")
.addMinMax("request_processing_time")
.create()

-flint.refresh("flint_alb_logs_skipping_index", FULL)
+flint.refreshIndex("flint_spark_catalog_default_alb_logs_skipping_index", FULL)

// Covering index
flint.coveringIndex()
.name("elb_and_requestUri")
-    .onTable("alb_logs")
+    .onTable("spark_catalog.default.alb_logs")
.addIndexColumns("elb", "requestUri")
.create()

-flint.refresh("flint_alb_logs_elb_and_requestUri_index")
+flint.refreshIndex("flint_spark_catalog_default_alb_logs_elb_and_requestUri_index")
+
+// Materialized view
+flint.materializedView()
+    .name("spark_catalog.default.alb_logs_metrics")
+    .query("SELECT ...")
+    .create()
+
+flint.refreshIndex("flint_spark_catalog_default_alb_logs_metrics")
```

#### Skipping Index Provider SPI
24 changes: 24 additions & 0 deletions flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4
@@ -17,6 +17,7 @@ singleStatement
statement
: skippingIndexStatement
| coveringIndexStatement
| materializedViewStatement
;

skippingIndexStatement
@@ -76,6 +77,29 @@ dropCoveringIndexStatement
: DROP INDEX indexName ON tableName
;

materializedViewStatement
: createMaterializedViewStatement
| dropMaterializedViewStatement
;

createMaterializedViewStatement
: CREATE MATERIALIZED VIEW (IF NOT EXISTS)? mvName=multipartIdentifier
AS query=materializedViewQuery
(WITH LEFT_PAREN propertyList RIGHT_PAREN)?
;

dropMaterializedViewStatement
: DROP MATERIALIZED VIEW mvName=multipartIdentifier
;

/*
 * Match all remaining tokens in a non-greedy way
 * so the WITH clause won't be captured by this rule.
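 * For example (an illustrative statement), in
 * "CREATE MATERIALIZED VIEW mv AS SELECT * FROM tbl WITH (auto_refresh = true)"
 * only "SELECT * FROM tbl" is matched as the query.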
*/
materializedViewQuery
: .+?
;

indexColTypeList
: indexColType (COMMA indexColType)*
;
3 changes: 3 additions & 0 deletions flint-spark-integration/src/main/antlr4/SparkSqlBase.g4
@@ -154,6 +154,7 @@ COMMA: ',';
DOT: '.';


AS: 'AS';
CREATE: 'CREATE';
DESC: 'DESC';
DESCRIBE: 'DESCRIBE';
@@ -163,12 +164,14 @@ FALSE: 'FALSE';
IF: 'IF';
INDEX: 'INDEX';
INDEXES: 'INDEXES';
MATERIALIZED: 'MATERIALIZED';
NOT: 'NOT';
ON: 'ON';
PARTITION: 'PARTITION';
REFRESH: 'REFRESH';
SHOW: 'SHOW';
TRUE: 'TRUE';
VIEW: 'VIEW';
WITH: 'WITH';


flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala
@@ -8,6 +8,7 @@ package org.opensearch.flint.spark.sql
import org.antlr.v4.runtime.tree.{ParseTree, RuleNode}
import org.opensearch.flint.spark.FlintSpark
import org.opensearch.flint.spark.sql.covering.FlintSparkCoveringIndexAstBuilder
import org.opensearch.flint.spark.sql.mv.FlintSparkMaterializedViewAstBuilder
import org.opensearch.flint.spark.sql.skipping.FlintSparkSkippingIndexAstBuilder

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -21,6 +22,7 @@ class FlintSparkSqlAstBuilder
extends FlintSparkSqlExtensionsBaseVisitor[AnyRef]
with FlintSparkSkippingIndexAstBuilder
with FlintSparkCoveringIndexAstBuilder
with FlintSparkMaterializedViewAstBuilder
with SparkSqlAstBuilder {

override def visit(tree: ParseTree): LogicalPlan = {
flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlParser.scala
@@ -37,6 +37,7 @@ import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.parser._
import org.apache.spark.sql.catalyst.parser.ParserUtils.withOrigin
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.trees.Origin
import org.apache.spark.sql.types.{DataType, StructType}
@@ -54,7 +55,10 @@ class FlintSparkSqlParser(sparkParser: ParserInterface) extends ParserInterface

override def parsePlan(sqlText: String): LogicalPlan = parse(sqlText) { flintParser =>
try {
-      flintAstBuilder.visit(flintParser.singleStatement())
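+      // Preserve the raw SQL text in the parse tree Origin so that AST
+      // builders (e.g. the materialized view query extraction) can recover
+      // the exact query substring from token offsets.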
+      val ctx = flintParser.singleStatement()
+      withOrigin(ctx, Some(sqlText)) {
+        flintAstBuilder.visit(ctx)
+      }
} catch {
      // Fall back to the Spark parse plan logic if Flint cannot parse
case _: ParseException => sparkParser.parsePlan(sqlText)
flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/mv/FlintSparkMaterializedViewAstBuilder.scala
@@ -0,0 +1,70 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark.sql.mv

import org.antlr.v4.runtime.tree.RuleNode
import org.opensearch.flint.spark.FlintSpark
import org.opensearch.flint.spark.FlintSpark.RefreshMode
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, SparkSqlAstBuilder}
import org.opensearch.flint.spark.sql.FlintSparkSqlAstBuilder.getFullTableName
import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser.{CreateMaterializedViewStatementContext, DropMaterializedViewStatementContext, MaterializedViewQueryContext}

import org.apache.spark.sql.catalyst.trees.CurrentOrigin

/**
 * Flint Spark AST builder that builds Spark commands for Flint materialized view statements.
*/
trait FlintSparkMaterializedViewAstBuilder extends FlintSparkSqlExtensionsVisitor[AnyRef] {
self: SparkSqlAstBuilder =>

override def visitCreateMaterializedViewStatement(
ctx: CreateMaterializedViewStatementContext): AnyRef = {
FlintSparkSqlCommand() { flint =>
val mvName = getFullTableName(flint, ctx.mvName)
val query = getMvQuery(ctx.query)

val mvBuilder = flint
.materializedView()
.name(mvName)
.query(query)

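      // The optional EXISTS token is present iff IF NOT EXISTS was specified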
val ignoreIfExists = ctx.EXISTS() != null
val indexOptions = visitPropertyList(ctx.propertyList())
mvBuilder
.options(indexOptions)
.create(ignoreIfExists)

// Trigger auto refresh if enabled
if (indexOptions.autoRefresh()) {
val flintIndexName = getFlintIndexName(flint, ctx.mvName)
flint.refreshIndex(flintIndexName, RefreshMode.INCREMENTAL)
}
Seq.empty
}
}

override def visitDropMaterializedViewStatement(
ctx: DropMaterializedViewStatementContext): AnyRef = {
FlintSparkSqlCommand() { flint =>
flint.deleteIndex(getFlintIndexName(flint, ctx.mvName))
Seq.empty
}
}

private def getMvQuery(ctx: MaterializedViewQueryContext): String = {
    // Assumes the origin (with the raw SQL text) was preserved at the beginning of parsing; see FlintSparkSqlParser
val sqlText = CurrentOrigin.get.sqlText.get
val startIndex = ctx.getStart.getStartIndex
val stopIndex = ctx.getStop.getStopIndex
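    // Token start/stop indexes are character offsets into the original SQL
    // text, so the substring below recovers the query exactly as written.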
sqlText.substring(startIndex, stopIndex + 1)
}

private def getFlintIndexName(flint: FlintSpark, mvNameCtx: RuleNode): String = {
val fullMvName = getFullTableName(flint, mvNameCtx)
FlintSparkMaterializedView.getFlintIndexName(fullMvName)
}
}
integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala
@@ -0,0 +1,145 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark

import java.sql.Timestamp

import scala.Option.empty
import scala.collection.JavaConverters.mapAsJavaMapConverter

import org.json4s.{Formats, NoTypeHints}
import org.json4s.native.JsonMethods.parse
import org.json4s.native.Serialization
import org.opensearch.flint.core.FlintOptions
import org.opensearch.flint.core.storage.FlintOpenSearchClient
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.getFlintIndexName
import org.scalatest.matchers.must.Matchers.defined
import org.scalatest.matchers.should.Matchers.{convertToAnyShouldWrapper, the}

import org.apache.spark.sql.Row

class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite {

/** Test table, MV, index name and query */
private val testTable = "spark_catalog.default.mv_test"
private val testMvName = "spark_catalog.default.mv_test_metrics"
private val testFlintIndex = getFlintIndexName(testMvName)
private val testQuery =
s"""
| SELECT
| window.start AS startTime,
| COUNT(*) AS count
| FROM $testTable
| GROUP BY TUMBLE(time, '10 Minutes')
|""".stripMargin

override def beforeAll(): Unit = {
super.beforeAll()
createTimeSeriesTable(testTable)
}

override def afterEach(): Unit = {
super.afterEach()
flint.deleteIndex(testFlintIndex)
}

test("create materialized view with auto refresh") {
withTempDir { checkpointDir =>
sql(s"""
| CREATE MATERIALIZED VIEW $testMvName
| AS $testQuery
| WITH (
| auto_refresh = true,
| checkpoint_location = '${checkpointDir.getAbsolutePath}'
| )
|""".stripMargin)

      // Wait for the streaming job to complete the current micro batch
val job = spark.streams.active.find(_.name == testFlintIndex)
job shouldBe defined
failAfter(streamingTimeout) {
job.get.processAllAvailable()
}

flint.describeIndex(testFlintIndex) shouldBe defined
checkAnswer(
flint.queryIndex(testFlintIndex).select("startTime", "count"),
Seq(
Row(timestamp("2023-10-01 00:00:00"), 1),
Row(timestamp("2023-10-01 00:10:00"), 2),
Row(timestamp("2023-10-01 01:00:00"), 1)
/*
           * The last row is pending until the watermark advances:
* Row(timestamp("2023-10-01 02:00:00"), 1)
*/
))
}
}

test("create materialized view with streaming job options") {
withTempDir { checkpointDir =>
sql(s"""
| CREATE MATERIALIZED VIEW $testMvName
| AS $testQuery
| WITH (
| auto_refresh = true,
| refresh_interval = '5 Seconds',
| checkpoint_location = '${checkpointDir.getAbsolutePath}',
| index_settings = '{"number_of_shards": 3, "number_of_replicas": 2}'
| )
|""".stripMargin)

val index = flint.describeIndex(testFlintIndex)
index shouldBe defined
index.get.options.autoRefresh() shouldBe true
index.get.options.refreshInterval() shouldBe Some("5 Seconds")
index.get.options.checkpointLocation() shouldBe Some(checkpointDir.getAbsolutePath)
}
}

test("create materialized view with index settings") {
sql(s"""
| CREATE MATERIALIZED VIEW $testMvName
| AS $testQuery
| WITH (
| index_settings = '{"number_of_shards": 3, "number_of_replicas": 2}'
| )
|""".stripMargin)

    // Verify that the index_settings option was applied to the OpenSearch index settings
val flintClient = new FlintOpenSearchClient(new FlintOptions(openSearchOptions.asJava))

implicit val formats: Formats = Serialization.formats(NoTypeHints)
val settings = parse(flintClient.getIndexMetadata(testFlintIndex).indexSettings.get)
(settings \ "index.number_of_shards").extract[String] shouldBe "3"
(settings \ "index.number_of_replicas").extract[String] shouldBe "2"
}

test("create materialized view if not exists") {
sql(s"CREATE MATERIALIZED VIEW IF NOT EXISTS $testMvName AS $testQuery")
flint.describeIndex(testFlintIndex) shouldBe defined

    // Expect an error without IF NOT EXISTS; with it, the statement succeeds
the[IllegalStateException] thrownBy
sql(s"CREATE MATERIALIZED VIEW $testMvName AS $testQuery")

sql(s"CREATE MATERIALIZED VIEW IF NOT EXISTS $testMvName AS $testQuery")
}

test("drop materialized view") {
flint
.materializedView()
.name(testMvName)
.query(testQuery)
.create()

sql(s"DROP MATERIALIZED VIEW $testMvName")

flint.describeIndex(testFlintIndex) shouldBe empty
}

private def timestamp(ts: String): Timestamp = Timestamp.valueOf(ts)
}
