OpenSearch and Spark Integration P1 Demo #317
dai-chen started this conversation in Show and tell
Replies: 2 comments
demo.mov
Flint Covering Index Acceleration Demo
VID_20240426173659784.mov
Test Table
Make use of the same
Creating Covering Index
Create covering index with all required columns indexed:
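To illustrate what "all required columns indexed" means, here is a minimal Python sketch of the coverage check: a query can be answered from the index alone only if every column it touches is in the index. The index column list is read off the FlintScan output in the query plans in this post, and the query column lists are read off the two test queries. The helper `is_covered` is hypothetical and is not part of Flint or Spark; `l_tax` is used only as an example of a lineitem column that is not in the index.

```python
def is_covered(query_columns: set, index_columns: set) -> bool:
    """Return True if every column the query needs is present in the index."""
    return query_columns <= index_columns


# Columns that appear in the FlintScan output in this post.
index_columns = {
    "l_quantity", "l_shipdate", "l_extendedprice", "l_discount", "l_orderkey",
}

# Columns referenced by the Q6-style test query (SELECT list and WHERE clause).
q6_columns = {"l_extendedprice", "l_discount", "l_shipdate", "l_quantity"}

# lineitem columns referenced by the Q3 test query.
q3_lineitem_columns = {"l_orderkey", "l_extendedprice", "l_discount", "l_shipdate"}

q6_covered = is_covered(q6_columns, index_columns)
q3_covered = is_covered(q3_lineitem_columns, index_columns)
# Adding a column that is not indexed breaks coverage.
extra_covered = is_covered(q6_columns | {"l_tax"}, index_columns)

print(q6_covered, q3_covered, extra_covered)  # True True False
```

When the check fails, the query would presumably have to fall back to scanning the source table, which is why the index is created with every column the test queries need.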
Query Test
# Same test query as above
EXPLAIN
SELECT SUM(l_extendedprice*l_discount) AS revenue
FROM ds_tables.lineitem_tiny
WHERE l_shipdate = '1997-02-06'
  AND l_discount between 0.02 - 0.01 and 0.02 + 0.01
  AND l_quantity < 24;

# Without covering index acceleration:
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[], functions=[sum((l_extendedprice#15 * l_discount#16))])
   +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=213]
      +- HashAggregate(keys=[], functions=[partial_sum((l_extendedprice#15 * l_discount#16))])
         +- Project [l_extendedprice#15, l_discount#16]
            +- Filter ((((((isnotnull(l_shipdate#25) AND isnotnull(l_discount#16)) AND isnotnull(l_quantity#14)) AND (l_shipdate#25 = 1997-02-06)) AND (l_discount#16 > 0.01)) AND (l_discount#16 <= 0.03)) AND (l_quantity#14 < 24.0))
               +- FileScan json ds_tables.lineitem_tiny[l_quantity#14,l_extendedprice#15,l_discount#16,l_shipdate#25] Batched: false, DataFilters: [isnotnull(l_shipdate#25), isnotnull(l_discount#16), isnotnull(l_quantity#14), (l_shipdate#25 = 1..., Format: JSON, Location: InMemoryFileIndex(1 paths)[s3://.../tpch-lineitem-tiny], PartitionFilters: [], PushedFilters: [IsNotNull(l_shipdate), IsNotNull(l_discount), IsNotNull(l_quantity), EqualTo(l_shipdate,1997-02-..., ReadSchema: struct

# With covering index acceleration:
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[], functions=[sum((l_extendedprice#15 * l_discount#16))])
   +- HashAggregate(keys=[], functions=[partial_sum((l_extendedprice#15 * l_discount#16))])
      +- Project [l_extendedprice#15, l_discount#16]
         +- BatchScan[l_quantity#14, l_shipdate#25, l_extendedprice#15, l_discount#16, l_orderkey#10L] class org.apache.spark.sql.flint.FlintScan, PushedPredicates: [l_shipdate IS NOT NULL, l_discount IS NOT NULL, l_quantity IS NOT NULL, l_shipdate = 9898, l_discount > 0.01, l_discount <= 0.03, l_quantity < 24.0] RuntimeFilters: []

# TPC-H Q3 query test:
EXPLAIN
SELECT l_orderkey,
       SUM(l_extendedprice * (1 - l_discount)) AS revenue,
       o_orderdate,
       o_shippriority
FROM ds_tables.orders_tiny AS o
JOIN ds_tables.lineitem_tiny AS l ON o.o_orderkey = l.l_orderkey
WHERE l_shipdate = '1997-02-06'
GROUP BY l_orderkey, o_orderdate, o_shippriority
ORDER BY revenue DESC, o_orderdate;

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [revenue#157 DESC NULLS LAST, o_orderdate#70 ASC NULLS FIRST], true, 0
   +- HashAggregate(keys=[l_orderkey#10L, o_orderdate#70, o_shippriority#72], functions=[sum((l_extendedprice#15 * (1.0 - l_discount#16)))])
      +- HashAggregate(keys=[l_orderkey#10L, o_orderdate#70, o_shippriority#72], functions=[partial_sum((l_extendedprice#15 * (1.0 - l_discount#16)))])
         +- Project [o_orderdate#70, o_shippriority#72, l_orderkey#10L, l_extendedprice#15, l_discount#16]
            +- BroadcastHashJoin [o_orderkey#69L], [l_orderkey#10L], Inner, BuildLeft, false
               :- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [plan_id=286]
               :  +- Filter isnotnull(o_orderkey#69L)
               :     +- FileScan json ds_tables.orders_tiny[o_orderkey#69L,o_orderdate#70,o_shippriority#72] Batched: false, DataFilters: [isnotnull(o_orderkey#69L)], Format: JSON, Location: InMemoryFileIndex(1 paths)[s3://.../tpch-orders-tiny], PartitionFilters: [], PushedFilters: [IsNotNull(o_orderkey)], ReadSchema: struct
               +- Project [l_orderkey#10L, l_extendedprice#15, l_discount#16]
                  +- BatchScan[l_quantity#14, l_shipdate#25, l_extendedprice#15, l_discount#16, l_orderkey#10L] class org.apache.spark.sql.flint.FlintScan, PushedPredicates: [l_shipdate IS NOT NULL, l_shipdate = 9898, l_orderkey IS NOT NULL] RuntimeFilters: []
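A note on reading the FlintScan plans: the pushed predicate shows `l_shipdate = 9898` rather than `'1997-02-06'`. This is consistent with Spark's internal representation of DATE values as a count of days since the Unix epoch (1970-01-01). The short Python check below reproduces the number; the helper name `to_epoch_days` is made up for this illustration.

```python
from datetime import date

# Spark stores DATE values internally as days since this date.
EPOCH = date(1970, 1, 1)


def to_epoch_days(d: date) -> int:
    """Convert a calendar date to the number of days since 1970-01-01."""
    return (d - EPOCH).days


shipdate_days = to_epoch_days(date(1997, 2, 6))
print(shipdate_days)  # 9898
```

So the filter pushed down to the index is the same `l_shipdate = '1997-02-06'` condition from the query, just shown in its internal form.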
Here is a follow-up demo after https://github.com/opensearch-project/sql/discussions/1465. The demo covers how to use our Spark extension to create a skipping index in OpenSearch, which can accelerate queries that have applicable filter conditions. Please find more details in https://github.com/opensearch-project/sql/blob/feature/flint/flint/docs/index.md.
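For readers new to the idea, here is a minimal Python sketch of how a skipping index can speed up a filtered query: it keeps a small summary per source file and uses it to rule out files that cannot contain matching rows. The sketch assumes a min/max summary on `l_shipdate`; the post does not say which summary structure the demo uses, and the class, function, file names, and date ranges below are all invented for illustration. This is not Flint's implementation.

```python
from dataclasses import dataclass
from datetime import date
from typing import List


@dataclass(frozen=True)
class FileStats:
    """Per-file summary a skipping index might keep (hypothetical layout)."""
    path: str
    min_shipdate: date
    max_shipdate: date


def files_to_scan(index: List[FileStats], shipdate: date) -> List[str]:
    """Return only the files whose [min, max] range can contain shipdate."""
    return [
        f.path
        for f in index
        if f.min_shipdate <= shipdate <= f.max_shipdate
    ]


# Made-up file names and date ranges, for illustration only.
index = [
    FileStats("part-0.json", date(1996, 1, 1), date(1996, 12, 31)),
    FileStats("part-1.json", date(1997, 1, 1), date(1997, 6, 30)),
    FileStats("part-2.json", date(1997, 7, 1), date(1997, 12, 31)),
]

# Filter equivalent to: WHERE l_shipdate = '1997-02-06'
selected = files_to_scan(index, date(1997, 2, 6))
print(selected)  # ['part-1.json']
```

Only one of the three files needs to be read for this filter. A filter on a column with no summary in the index would not benefit, which is what "applicable" filter conditions refers to.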
Demo Video
Flint.skipping.index.demo.P1.-.Part1.mov
Flint.skipping.index.demo.P1.-.Part2.mov
Flint.skipping.index.demo.P1.-.Part3.mov
Demo Setup
Demo Data
Use the lineitem dataset in TPC-H:
Launch SparkSQL CLI
Log in to the EMR primary node and start spark-sql:
Create Skipping Index
You can check what the OpenSearch index looks like:
Query Test
Use TPC-H Q6: