From 0d5d1883ddb2789d4dd2d3671a560941bce42e62 Mon Sep 17 00:00:00 2001 From: YANGDB Date: Fri, 11 Oct 2024 16:23:15 -0700 Subject: [PATCH 1/3] backport 768 bug fix to 0.5 Signed-off-by: YANGDB --- .../spark/ppl/FlintSparkPPLBasicITSuite.scala | 97 ++++++++++++++++++- ppl-spark-integration/README.md | 6 +- .../org/opensearch/sql/ast/tree/Relation.java | 7 +- .../sql/ppl/CatalystQueryPlanVisitor.java | 24 +---- .../sql/ppl/utils/RelationUtils.java | 25 ++++- ...lPlanBasicQueriesTranslatorTestSuite.scala | 21 +++- 6 files changed, 150 insertions(+), 30 deletions(-) diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLBasicITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLBasicITSuite.scala index 6d9c3a5ab..0f2ef1861 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLBasicITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLBasicITSuite.scala @@ -5,12 +5,13 @@ package org.opensearch.flint.spark.ppl -import org.apache.spark.sql.{QueryTest, Row} +import org.apache.spark.sql.{AnalysisException, QueryTest, Row} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar} import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Descending, Literal, SortOrder} import org.apache.spark.sql.catalyst.plans.logical._ -import org.apache.spark.sql.execution.command.DescribeTableCommand +import org.apache.spark.sql.execution.ExplainMode +import org.apache.spark.sql.execution.command.{DescribeTableCommand, ExplainCommand} import org.apache.spark.sql.streaming.StreamTest class FlintSparkPPLBasicITSuite @@ -21,12 +22,20 @@ class FlintSparkPPLBasicITSuite /** Test table and index name */ private val testTable = "spark_catalog.default.flint_ppl_test" - + private val t1 = "`spark_catalog`.`default`.`flint_ppl_test1`" + private val t2 = "`spark_catalog`.default.`flint_ppl_test2`" + private val t3 = "spark_catalog.`default`.`flint_ppl_test3`" + private val t4 = "`spark_catalog`.`default`.flint_ppl_test4" + override def beforeAll(): Unit = { super.beforeAll() // Create test table createPartitionedStateCountryTable(testTable) + createPartitionedStateCountryTable(t1) + createPartitionedStateCountryTable(t2) + createPartitionedStateCountryTable(t3) + createPartitionedStateCountryTable(t4) } protected override def afterEach(): Unit = { @@ -118,6 +127,88 @@ class FlintSparkPPLBasicITSuite comparePlans(logicalPlan, expectedPlan, checkAnalysis = false) } + test("test backtick table names and name contains '.'") { + Seq(t1, t2, t3, t4).foreach { table => + val frame = sql( + s""" + | source = $table| head 2 + | """.stripMargin) + assert(frame.collect().length == 2) + } + // test read table which is unable to create + val t5 = "`spark_catalog`.default.`flint/ppl/test5.log`" + val t6 = "spark_catalog.default.`flint_ppl_test6.log`" + Seq(t5, t6).foreach { table => + val ex = intercept[AnalysisException](sql( + s""" + | source = $table| head 2 + | """.stripMargin)) + assert(ex.getMessage().contains("TABLE_OR_VIEW_NOT_FOUND")) + } + val t7 = "spark_catalog.default.flint_ppl_test7.log" + val ex = intercept[IllegalArgumentException](sql( + s""" + | source = $t7| head 2 + | """.stripMargin)) + assert(ex.getMessage().contains("Invalid table name")) + } + + test("test describe backtick table names and name contains '.'") { + Seq(t1, t2, t3, t4).foreach { table => + val frame = sql( + s""" + | describe $table + | """.stripMargin) + assert(frame.collect().length > 0) + } + // test read table which is unable to create + val t5 = "`spark_catalog`.default.`flint/ppl/test5.log`" + val t6 = "spark_catalog.default.`flint_ppl_test6.log`" + Seq(t5, t6).foreach { table => + val ex = intercept[AnalysisException](sql( + s""" + | describe $table + | """.stripMargin)) + assert(ex.getMessage().contains("TABLE_OR_VIEW_NOT_FOUND")) + } + val t7 = "spark_catalog.default.flint_ppl_test7.log" + val ex = intercept[IllegalArgumentException](sql( + s""" + | describe $t7 + | """.stripMargin)) + assert(ex.getMessage().contains("Invalid table name")) + } + + test("test explain backtick table names and name contains '.'") { + Seq(t1, t2, t3, t4).foreach { table => + val frame = sql( + s""" + | explain extended | source = $table + | """.stripMargin) + assert(frame.collect().length > 0) + } + // test read table which is unable to create + val table = "`spark_catalog`.default.`flint/ppl/test4.log`" + val frame = sql( + s""" + | explain extended | source = $table + | """.stripMargin) + val logicalPlan: LogicalPlan = frame.queryExecution.logical + val relation = UnresolvedRelation(Seq("spark_catalog", "default", "flint/ppl/test4.log")) + val expectedPlan: LogicalPlan = + ExplainCommand( + Project(Seq(UnresolvedStar(None)), relation), + ExplainMode.fromString("extended")) + comparePlans(logicalPlan, expectedPlan, checkAnalysis = false) + + val t7 = "spark_catalog.default.flint_ppl_test7.log" + val ex = intercept[IllegalArgumentException](sql( + s""" + | explain extended | source = $t7 + | """.stripMargin)) + assert(ex.getMessage().contains("Invalid table name")) + } + test("create ppl simple query test") { val testTableQuoted = "`spark_catalog`.`default`.`flint_ppl_test`" Seq(testTable, testTableQuoted).foreach { table => diff --git a/ppl-spark-integration/README.md b/ppl-spark-integration/README.md index ab16808a1..eb5ca2a13 100644 --- a/ppl-spark-integration/README.md +++ b/ppl-spark-integration/README.md @@ -226,7 +226,11 @@ See the next samples of PPL queries : **Describe** - `describe table` This command is equal to the `DESCRIBE EXTENDED table` SQL command - + - `describe schema.table` + - `` describe schema.`table` `` + - `describe catalog.schema.table` + - `` describe catalog.schema.`table` `` + - `` describe `catalog`.`schema`.`table` `` **Fields** - `source = table` - `source = table | fields a,b,c` diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/Relation.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/Relation.java index 6a482db67..da442ec01 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/Relation.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/Relation.java @@ -38,7 +38,6 @@ public Relation(UnresolvedExpression tableName, String alias) { private String alias; /** - * Return table name. * * @return table name */ @@ -46,7 +45,11 @@ public List getTableName() { return tableName.stream().map(Object::toString).collect(Collectors.toList()); } - + + public List getQualifiedNames() { + return tableName.stream().map(t -> (QualifiedName) t).collect(Collectors.toList()); + } + /** * Return alias. * diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java index be00796a8..adada043e 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java @@ -92,6 +92,7 @@ import static org.opensearch.sql.ppl.utils.DedupeTransformer.retainOneDuplicateEvent; import static org.opensearch.sql.ppl.utils.DedupeTransformer.retainOneDuplicateEventAndKeepEmpty; import static org.opensearch.sql.ppl.utils.JoinSpecTransformer.join; +import static org.opensearch.sql.ppl.utils.RelationUtils.getTableIdentifier; import static org.opensearch.sql.ppl.utils.RelationUtils.resolveField; import static org.opensearch.sql.ppl.utils.WindowSpecTransformer.window; @@ -126,22 +127,7 @@ public LogicalPlan visitExplain(Explain node, CatalystPlanContext context) { @Override public LogicalPlan visitRelation(Relation node, CatalystPlanContext context) { if (node instanceof DescribeRelation) { - TableIdentifier identifier; - if (node.getTableQualifiedName().getParts().size() == 1) { - identifier = new TableIdentifier(node.getTableQualifiedName().getParts().get(0)); - } else if (node.getTableQualifiedName().getParts().size() == 2) { - identifier = new TableIdentifier( - node.getTableQualifiedName().getParts().get(1), - Option$.MODULE$.apply(node.getTableQualifiedName().getParts().get(0))); - } else if (node.getTableQualifiedName().getParts().size() == 3) { - identifier = new TableIdentifier( - node.getTableQualifiedName().getParts().get(2), - Option$.MODULE$.apply(node.getTableQualifiedName().getParts().get(0)), - Option$.MODULE$.apply(node.getTableQualifiedName().getParts().get(1))); - } else { - throw new IllegalArgumentException("Invalid table name: " + node.getTableQualifiedName() - + " Syntax: [ database_name. ] table_name"); - } + TableIdentifier identifier = getTableIdentifier(node.getTableQualifiedName()); return context.with( new DescribeTableCommand( identifier, @@ -149,10 +135,10 @@ public LogicalPlan visitRelation(Relation node, CatalystPlanContext context) { true, DescribeRelation$.MODULE$.getOutputAttrs())); } - //regular sql algebraic relations - node.getTableName().forEach(t -> + //regular sql algebraic relations + node.getQualifiedNames().forEach(q -> // Resolving the qualifiedName which is composed of a datasource.schema.table - context.withRelation(new UnresolvedRelation(seq(of(t.split("\\."))), CaseInsensitiveStringMap.empty(), false)) + context.withRelation(new UnresolvedRelation(getTableIdentifier(q).nameParts(), CaseInsensitiveStringMap.empty(), false)) ); return context.getPlan(); } diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/RelationUtils.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/RelationUtils.java index 33cb5611d..c325f5185 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/RelationUtils.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/RelationUtils.java @@ -1,8 +1,10 @@ package org.opensearch.sql.ppl.utils; +import org.apache.spark.sql.catalyst.TableIdentifier; import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation; import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan; import org.opensearch.sql.ast.expression.QualifiedName; +import scala.Option$; import java.util.List; import java.util.Optional; @@ -15,7 +17,6 @@ public interface RelationUtils { * * @param relations * @param node - * @param contextRelations * @return */ static Optional resolveField(List relations, QualifiedName node, List tables) { @@ -29,4 +30,26 @@ static Optional resolveField(List relations, .findFirst() .map(rel -> node); } + + static TableIdentifier getTableIdentifier(QualifiedName qualifiedName) { + TableIdentifier identifier; + if (qualifiedName.getParts().isEmpty()) { + throw new IllegalArgumentException("Empty table name is invalid"); + } else if (qualifiedName.getParts().size() == 1) { + identifier = new TableIdentifier(qualifiedName.getParts().get(0)); + } else if (qualifiedName.getParts().size() == 2) { + identifier = new TableIdentifier( + qualifiedName.getParts().get(1), + Option$.MODULE$.apply(qualifiedName.getParts().get(0))); + } else if (qualifiedName.getParts().size() == 3) { + identifier = new TableIdentifier( + qualifiedName.getParts().get(2), + Option$.MODULE$.apply(qualifiedName.getParts().get(1)), + Option$.MODULE$.apply(qualifiedName.getParts().get(0))); + } else { + throw new IllegalArgumentException("Invalid table name: " + qualifiedName + + " Syntax: [ database_name. ] table_name"); + } + return identifier; + } } diff --git a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanBasicQueriesTranslatorTestSuite.scala b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanBasicQueriesTranslatorTestSuite.scala index 34de86d92..c65e65a3d 100644 --- a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanBasicQueriesTranslatorTestSuite.scala +++ b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanBasicQueriesTranslatorTestSuite.scala @@ -36,14 +36,27 @@ class PPLLogicalPlanBasicQueriesTranslatorTestSuite assert( thrown.getMessage === "Invalid table name: t.b.c.d Syntax: [ database_name. ] table_name") } + + test("test describe with backticks") { + val context = new CatalystPlanContext + val logPlan = + planTransformer.visit(plan(pplParser, "describe t.b.`c.d`", false), context) + val expectedPlan = DescribeTableCommand( + TableIdentifier("c.d", Option("b"), Option("t")), + Map.empty[String, String].empty, + isExtended = true, + output = DescribeRelation.getOutputAttrs) + comparePlans(expectedPlan, logPlan, false) + } + test("test describe FQN table clause") { val context = new CatalystPlanContext val logPlan = - planTransformer.visit(plan(pplParser, "describe schema.default.http_logs", false), context) + planTransformer.visit(plan(pplParser, "describe catalog.schema.http_logs", false), context) val expectedPlan = DescribeTableCommand( - TableIdentifier("http_logs", Option("schema"), Option("default")), + TableIdentifier("http_logs", Option("schema"), Option("catalog")), Map.empty[String, String].empty, isExtended = true, output = DescribeRelation.getOutputAttrs) @@ -64,10 +77,10 @@ class PPLLogicalPlanBasicQueriesTranslatorTestSuite test("test FQN table describe table clause") { val context = new CatalystPlanContext - val logPlan = planTransformer.visit(plan(pplParser, "describe catalog.t", false), context) + val logPlan = planTransformer.visit(plan(pplParser, "describe schema.t", false), context) val expectedPlan = DescribeTableCommand( - TableIdentifier("t", Option("catalog")), + TableIdentifier("t", Option("schema")), Map.empty[String, String].empty, isExtended = true, output = DescribeRelation.getOutputAttrs) From 3c868375dc971b015db8d52293433952306fcb65 Mon Sep 17 00:00:00 2001 From: YANGDB Date: Fri, 11 Oct 2024 16:38:43 -0700 Subject: [PATCH 2/3] remove explain test Signed-off-by: YANGDB --- .../spark/ppl/FlintSparkPPLBasicITSuite.scala | 30 ------------------- 1 file changed, 30 deletions(-) diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLBasicITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLBasicITSuite.scala index 0f2ef1861..4570153e8 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLBasicITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLBasicITSuite.scala @@ -179,36 +179,6 @@ class FlintSparkPPLBasicITSuite assert(ex.getMessage().contains("Invalid table name")) } - test("test explain backtick table names and name contains '.'") { - Seq(t1, t2, t3, t4).foreach { table => - val frame = sql( - s""" - | explain extended | source = $table - | """.stripMargin) - assert(frame.collect().length > 0) - } - // test read table which is unable to create - val table = "`spark_catalog`.default.`flint/ppl/test4.log`" - val frame = sql( - s""" - | explain extended | source = $table - | """.stripMargin) - val logicalPlan: LogicalPlan = frame.queryExecution.logical - val relation = UnresolvedRelation(Seq("spark_catalog", "default", "flint/ppl/test4.log")) - val expectedPlan: LogicalPlan = - ExplainCommand( - Project(Seq(UnresolvedStar(None)), relation), - ExplainMode.fromString("extended")) - comparePlans(logicalPlan, expectedPlan, checkAnalysis = false) - - val t7 = "spark_catalog.default.flint_ppl_test7.log" - val ex = intercept[IllegalArgumentException](sql( - s""" - | explain extended | source = $t7 - | """.stripMargin)) - assert(ex.getMessage().contains("Invalid table name")) - } - test("create ppl simple query test") { val testTableQuoted = "`spark_catalog`.`default`.`flint_ppl_test`" Seq(testTable, testTableQuoted).foreach { table => From 0875ed83953d4e98979e2eef177c7477b1a4edd0 Mon Sep 17 00:00:00 2001 From: YANGDB Date: Fri, 11 Oct 2024 16:50:42 -0700 Subject: [PATCH 3/3] fix scala format Signed-off-by: YANGDB --- .../spark/ppl/FlintSparkPPLBasicITSuite.scala | 20 +++++++------------ ...lPlanBasicQueriesTranslatorTestSuite.scala | 4 ++-- 2 files changed, 9 insertions(+), 15 deletions(-) diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLBasicITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLBasicITSuite.scala index 4570153e8..45f66bc24 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLBasicITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLBasicITSuite.scala @@ -26,7 +26,7 @@ class FlintSparkPPLBasicITSuite private val t2 = "`spark_catalog`.default.`flint_ppl_test2`" private val t3 = "spark_catalog.`default`.`flint_ppl_test3`" private val t4 = "`spark_catalog`.`default`.flint_ppl_test4" - + override def beforeAll(): Unit = { super.beforeAll() @@ -129,8 +129,7 @@ class FlintSparkPPLBasicITSuite test("test backtick table names and name contains '.'") { Seq(t1, t2, t3, t4).foreach { table => - val frame = sql( - s""" + val frame = sql(s""" | source = $table| head 2 | """.stripMargin) assert(frame.collect().length == 2) @@ -139,15 +138,13 @@ class FlintSparkPPLBasicITSuite val t5 = "`spark_catalog`.default.`flint/ppl/test5.log`" val t6 = "spark_catalog.default.`flint_ppl_test6.log`" Seq(t5, t6).foreach { table => - val ex = intercept[AnalysisException](sql( - s""" + val ex = intercept[AnalysisException](sql(s""" | source = $table| head 2 | """.stripMargin)) assert(ex.getMessage().contains("TABLE_OR_VIEW_NOT_FOUND")) } val t7 = "spark_catalog.default.flint_ppl_test7.log" - val ex = intercept[IllegalArgumentException](sql( - s""" + val ex = intercept[IllegalArgumentException](sql(s""" | source = $t7| head 2 | """.stripMargin)) assert(ex.getMessage().contains("Invalid table name")) @@ -155,8 +152,7 @@ class FlintSparkPPLBasicITSuite test("test describe backtick table names and name contains '.'") { Seq(t1, t2, t3, t4).foreach { table => - val frame = sql( - s""" + val frame = sql(s""" | describe $table | """.stripMargin) assert(frame.collect().length > 0) @@ -165,15 +161,13 @@ class FlintSparkPPLBasicITSuite val t5 = "`spark_catalog`.default.`flint/ppl/test5.log`" val t6 = "spark_catalog.default.`flint_ppl_test6.log`" Seq(t5, t6).foreach { table => - val ex = intercept[AnalysisException](sql( - s""" + val ex = intercept[AnalysisException](sql(s""" | describe $table | """.stripMargin)) assert(ex.getMessage().contains("TABLE_OR_VIEW_NOT_FOUND")) } val t7 = "spark_catalog.default.flint_ppl_test7.log" - val ex = intercept[IllegalArgumentException](sql( - s""" + val ex = intercept[IllegalArgumentException](sql(s""" | describe $t7 | """.stripMargin)) assert(ex.getMessage().contains("Invalid table name")) diff --git a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanBasicQueriesTranslatorTestSuite.scala b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanBasicQueriesTranslatorTestSuite.scala index c65e65a3d..054ab97db 100644 --- a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanBasicQueriesTranslatorTestSuite.scala +++ b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanBasicQueriesTranslatorTestSuite.scala @@ -36,7 +36,7 @@ class PPLLogicalPlanBasicQueriesTranslatorTestSuite assert( thrown.getMessage === "Invalid table name: t.b.c.d Syntax: [ database_name. ] table_name") } - + test("test describe with backticks") { val context = new CatalystPlanContext val logPlan = @@ -49,7 +49,7 @@ class PPLLogicalPlanBasicQueriesTranslatorTestSuite output = DescribeRelation.getOutputAttrs) comparePlans(expectedPlan, logPlan, false) } - + test("test describe FQN table clause") { val context = new CatalystPlanContext val logPlan =