From 9c988b01607b5ed8f9eab3e36f9ee5757f223503 Mon Sep 17 00:00:00 2001 From: Rupal Mahajan Date: Thu, 8 Jun 2023 06:11:02 +0000 Subject: [PATCH 01/11] Initial spark application draft Signed-off-by: Rupal Mahajan --- spark-sql-application/.gitignore | 14 +++ spark-sql-application/README.md | 1 + spark-sql-application/build.sbt | 22 +++++ .../project/build.properties | 1 + spark-sql-application/project/plugins.sbt | 1 + .../scala/org/opensearch/sql/SQLJob.scala | 90 +++++++++++++++++++ 6 files changed, 129 insertions(+) create mode 100644 spark-sql-application/.gitignore create mode 100644 spark-sql-application/README.md create mode 100644 spark-sql-application/build.sbt create mode 100644 spark-sql-application/project/build.properties create mode 100644 spark-sql-application/project/plugins.sbt create mode 100644 spark-sql-application/src/main/scala/org/opensearch/sql/SQLJob.scala diff --git a/spark-sql-application/.gitignore b/spark-sql-application/.gitignore new file mode 100644 index 0000000000..ec13a702be --- /dev/null +++ b/spark-sql-application/.gitignore @@ -0,0 +1,14 @@ +# Compiled output +target/ +project/target/ + +# sbt-specific files +.sbtserver +.sbt/ +.bsp/ + +# Miscellaneous +.DS_Store +*.class +*.log +*.zip \ No newline at end of file diff --git a/spark-sql-application/README.md b/spark-sql-application/README.md new file mode 100644 index 0000000000..cb657e65bb --- /dev/null +++ b/spark-sql-application/README.md @@ -0,0 +1 @@ +# spark-sql-application \ No newline at end of file diff --git a/spark-sql-application/build.sbt b/spark-sql-application/build.sbt new file mode 100644 index 0000000000..436cad8e66 --- /dev/null +++ b/spark-sql-application/build.sbt @@ -0,0 +1,22 @@ +name := "sql-job" + +version := "1.0" + +scalaVersion := "2.12.15" + +val sparkVersion = "3.3.1" + +mainClass := Some("org.opensearch.sql.SQLJob") + +artifactName := { (sv: ScalaVersion, module: ModuleID, artifact: Artifact) => + "sql-job.jar" +} + +resolvers ++= Seq( + ("apache-snapshots" at "http://repository.apache.org/snapshots/").withAllowInsecureProtocol(true) +) + +libraryDependencies ++= Seq( + "org.apache.spark" %% "spark-core" % sparkVersion % "provided", + "org.apache.spark" %% "spark-sql" % sparkVersion % "provided" +) diff --git a/spark-sql-application/project/build.properties b/spark-sql-application/project/build.properties new file mode 100644 index 0000000000..46e43a97ed --- /dev/null +++ b/spark-sql-application/project/build.properties @@ -0,0 +1 @@ +sbt.version=1.8.2 diff --git a/spark-sql-application/project/plugins.sbt b/spark-sql-application/project/plugins.sbt new file mode 100644 index 0000000000..784a39b267 --- /dev/null +++ b/spark-sql-application/project/plugins.sbt @@ -0,0 +1 @@ +addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.1.0") \ No newline at end of file diff --git a/spark-sql-application/src/main/scala/org/opensearch/sql/SQLJob.scala b/spark-sql-application/src/main/scala/org/opensearch/sql/SQLJob.scala new file mode 100644 index 0000000000..f4eb66d543 --- /dev/null +++ b/spark-sql-application/src/main/scala/org/opensearch/sql/SQLJob.scala @@ -0,0 +1,90 @@ +package org.opensearch.sql + +import org.apache.spark.sql.{DataFrame, SparkSession, SQLContext, Row} +import org.apache.spark.SparkConf +import org.apache.spark.sql.types._ +import org.apache.spark.sql.functions._ + +object SQLJob { + def main(args: Array[String]) { + // Get the SQL query and Opensearch Config from the command line arguments + val query = args(0) + val index = args(1) + val aos_host = args(2) + val 
aos_region = args(3) + + val aos = Map( + "host" -> aos_host, + "port" -> "-1", + "scheme" -> "https", + "auth" -> "sigv4", + "region" -> aos_region) + + // Create a SparkSession + val spark = SparkSession.builder().appName("SQLJob").getOrCreate() + + try { + // Create a tableschema for the DataFrame + val tableschema = StructType(Seq( + StructField("name", StringType, nullable = false), + StructField("age", IntegerType, nullable = false), + StructField("city", StringType, nullable = false) + )) + + // Create a sequence of Row objects representing the data + val tabledata = Seq( + Row("Tina", 29, "Bellevue"), + Row("Jane", 25, "London"), + Row("Mike", 35, "Paris") + ) + + // Create a DataFrame from the tableschema and data + val df = spark.createDataFrame(spark.sparkContext.parallelize(tabledata), tableschema) + + // Register the DataFrame as a temporary table/view + df.createOrReplaceTempView("my_table") // Replace "my_table" with your desired name + + // Execute SQL query + val result: DataFrame = spark.sql(query) + val resultJsonBlob = result.select(to_json(collect_list(struct(result.columns.map(col): _*)))).first().getString(0) + + // Convert the schema to a DataFrame + // Get the schema of the DataFrame + val resultschema = result.schema + val resultschemaRows = resultschema.fields.map { field => + Row(field.name, field.dataType.typeName) + } + val resultschemaDF = spark.createDataFrame(spark.sparkContext.parallelize(resultschemaRows), StructType(Seq( + StructField("column_name", StringType, nullable = false), + StructField("data_type", StringType, nullable = false) + ))) + val schemaJsonBlob = resultschemaDF.select(to_json(collect_list(struct(resultschemaDF.columns.map(col): _*)))).first().getString(0) + + // Create a step DataFrame from the tableschema and data + val envVarValue = sys.env.getOrElse("EMR_STEP_ID", "") + + val stepRows = Seq( + Row(envVarValue) + ) + + val stepDF = spark.createDataFrame(spark.sparkContext.parallelize(stepRows), StructType(Seq( + StructField("step_id", StringType, nullable = false) + ))) + + // Add JSON blob column to df1 + val stepWithSchema = stepDF.withColumn("schema", lit(schemaJsonBlob)).withColumn("result",lit(resultJsonBlob)) + stepWithSchema.show(false) + + stepWithSchema.write + .format("flint") + .options(aos) + .mode("append") + .save(index) + println("Result saved to OpenSearch index successfully.") + + } finally { + // Stop SparkSession + spark.stop() + } + } +} From e234a516277864fcd156b27587ef8080d1b3497b Mon Sep 17 00:00:00 2001 From: Rupal Mahajan Date: Thu, 8 Jun 2023 06:48:50 +0000 Subject: [PATCH 02/11] Remove temp table Signed-off-by: Rupal Mahajan --- .../scala/org/opensearch/sql/SQLJob.scala | 56 +++++-------------- 1 file changed, 15 insertions(+), 41 deletions(-) diff --git a/spark-sql-application/src/main/scala/org/opensearch/sql/SQLJob.scala b/spark-sql-application/src/main/scala/org/opensearch/sql/SQLJob.scala index f4eb66d543..bba21ad8cb 100644 --- a/spark-sql-application/src/main/scala/org/opensearch/sql/SQLJob.scala +++ b/spark-sql-application/src/main/scala/org/opensearch/sql/SQLJob.scala @@ -13,7 +13,7 @@ object SQLJob { val aos_host = args(2) val aos_region = args(3) - val aos = Map( + val aos = Map( "host" -> aos_host, "port" -> "-1", "scheme" -> "https", @@ -24,63 +24,37 @@ object SQLJob { val spark = SparkSession.builder().appName("SQLJob").getOrCreate() try { - // Create a tableschema for the DataFrame - val tableschema = StructType(Seq( - StructField("name", StringType, nullable = false), - StructField("age", 
IntegerType, nullable = false), - StructField("city", StringType, nullable = false) - )) - - // Create a sequence of Row objects representing the data - val tabledata = Seq( - Row("Tina", 29, "Bellevue"), - Row("Jane", 25, "London"), - Row("Mike", 35, "Paris") - ) - - // Create a DataFrame from the tableschema and data - val df = spark.createDataFrame(spark.sparkContext.parallelize(tabledata), tableschema) - - // Register the DataFrame as a temporary table/view - df.createOrReplaceTempView("my_table") // Replace "my_table" with your desired name - // Execute SQL query val result: DataFrame = spark.sql(query) - val resultJsonBlob = result.select(to_json(collect_list(struct(result.columns.map(col): _*)))).first().getString(0) + val resultJson = result.select(to_json(collect_list(struct(result.columns.map(col): _*)))).first().getString(0) // Convert the schema to a DataFrame - // Get the schema of the DataFrame - val resultschema = result.schema - val resultschemaRows = resultschema.fields.map { field => + val schema = result.schema + val schemaRow = schema.fields.map { field => Row(field.name, field.dataType.typeName) } - val resultschemaDF = spark.createDataFrame(spark.sparkContext.parallelize(resultschemaRows), StructType(Seq( + val schemaDF = spark.createDataFrame(spark.sparkContext.parallelize(schemaRow), StructType(Seq( StructField("column_name", StringType, nullable = false), StructField("data_type", StringType, nullable = false) ))) - val schemaJsonBlob = resultschemaDF.select(to_json(collect_list(struct(resultschemaDF.columns.map(col): _*)))).first().getString(0) - - // Create a step DataFrame from the tableschema and data - val envVarValue = sys.env.getOrElse("EMR_STEP_ID", "") + val schemaJson = schemaDF.select(to_json(collect_list(struct(schemaDF.columns.map(col): _*)))).first().getString(0) - val stepRows = Seq( - Row(envVarValue) + // Create a DataFrame with stepId, schema and result + val dataRow = Seq( + Row(sys.env.getOrElse("EMR_STEP_ID", "")) ) - - val stepDF = spark.createDataFrame(spark.sparkContext.parallelize(stepRows), StructType(Seq( - StructField("step_id", StringType, nullable = false) + val dataDF = spark.createDataFrame(spark.sparkContext.parallelize(dataRow), StructType(Seq( + StructField("stepId", StringType, nullable = false) ))) - // Add JSON blob column to df1 - val stepWithSchema = stepDF.withColumn("schema", lit(schemaJsonBlob)).withColumn("result",lit(resultJsonBlob)) - stepWithSchema.show(false) + val data = dataDF.withColumn("schema", lit(schemaJson)).withColumn("result",lit(resultJson)) - stepWithSchema.write + // Write data to OpenSearch index + data.write .format("flint") .options(aos) .mode("append") - .save(index) - println("Result saved to OpenSearch index successfully.") + .save(index) } finally { // Stop SparkSession From 25160fdd86e65439f51e067a61491804e5e9d4ff Mon Sep 17 00:00:00 2001 From: Rupal Mahajan Date: Thu, 8 Jun 2023 15:56:46 +0000 Subject: [PATCH 03/11] Add license header Signed-off-by: Rupal Mahajan --- spark-sql-application/build.sbt | 5 ++++ spark-sql-application/project/plugins.sbt | 7 ++++- .../scala/org/opensearch/sql/SQLJob.scala | 29 ++++++++++++------- 3 files changed, 29 insertions(+), 12 deletions(-) diff --git a/spark-sql-application/build.sbt b/spark-sql-application/build.sbt index 436cad8e66..2f109d66c8 100644 --- a/spark-sql-application/build.sbt +++ b/spark-sql-application/build.sbt @@ -1,3 +1,8 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + name := "sql-job" version := 
"1.0" diff --git a/spark-sql-application/project/plugins.sbt b/spark-sql-application/project/plugins.sbt index 784a39b267..06b2357652 100644 --- a/spark-sql-application/project/plugins.sbt +++ b/spark-sql-application/project/plugins.sbt @@ -1 +1,6 @@ -addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.1.0") \ No newline at end of file +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + + addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.1.0") \ No newline at end of file diff --git a/spark-sql-application/src/main/scala/org/opensearch/sql/SQLJob.scala b/spark-sql-application/src/main/scala/org/opensearch/sql/SQLJob.scala index bba21ad8cb..a4cbba4b78 100644 --- a/spark-sql-application/src/main/scala/org/opensearch/sql/SQLJob.scala +++ b/spark-sql-application/src/main/scala/org/opensearch/sql/SQLJob.scala @@ -1,7 +1,11 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + package org.opensearch.sql -import org.apache.spark.sql.{DataFrame, SparkSession, SQLContext, Row} -import org.apache.spark.SparkConf +import org.apache.spark.sql.{DataFrame, SparkSession, Row} import org.apache.spark.sql.types._ import org.apache.spark.sql.functions._ @@ -10,15 +14,11 @@ object SQLJob { // Get the SQL query and Opensearch Config from the command line arguments val query = args(0) val index = args(1) - val aos_host = args(2) - val aos_region = args(3) - - val aos = Map( - "host" -> aos_host, - "port" -> "-1", - "scheme" -> "https", - "auth" -> "sigv4", - "region" -> aos_region) + val host = args(2) + val port = args(3) + val scheme = args(4) + val auth = args(5) + val region = args(6) // Create a SparkSession val spark = SparkSession.builder().appName("SQLJob").getOrCreate() @@ -50,6 +50,13 @@ object SQLJob { val data = dataDF.withColumn("schema", lit(schemaJson)).withColumn("result",lit(resultJson)) // Write data to OpenSearch index + val aos = Map( + "host" -> host, + "port" -> port, + "scheme" -> scheme, + "auth" -> auth, + "region" -> region) + data.write .format("flint") .options(aos) From 6f95cf0c91f08c5c77dc7d461148fbf9f4f79886 Mon Sep 17 00:00:00 2001 From: Rupal Mahajan Date: Thu, 8 Jun 2023 13:38:00 -0700 Subject: [PATCH 04/11] Add scalastyle-config and update readme Signed-off-by: Rupal Mahajan --- spark-sql-application/README.md | 65 +++++++++- spark-sql-application/project/plugins.sbt | 3 +- spark-sql-application/scalastyle-config.xml | 117 ++++++++++++++++++ .../scala/org/opensearch/sql/SQLJob.scala | 46 ++++--- 4 files changed, 211 insertions(+), 20 deletions(-) create mode 100644 spark-sql-application/scalastyle-config.xml diff --git a/spark-sql-application/README.md b/spark-sql-application/README.md index cb657e65bb..548ca3560d 100644 --- a/spark-sql-application/README.md +++ b/spark-sql-application/README.md @@ -1 +1,64 @@ -# spark-sql-application \ No newline at end of file +# Spark SQL Application + +This application execute sql query and store the result in OpenSearch index in following format +``` +"stepId":"", +"schema": "json blob", +"result": "json blob" +``` + +## Prerequisites + ++ Spark 3.3.1 ++ Scala 2.12.15 ++ flint-spark-integration + +## Usage + +To use this application, you can run Spark with Flint extension: + +``` +./bin/spark-submit \ + --class org.opensearch.sql.SQLJob \ + --jars \ + sql-job.jar \ + \ + \ + \ + \ + \ + \ + \ +``` + +## Build + +To build and run this application with Spark, you can run: + +``` +sbt clean publishLocal +``` + +## Scalastyle + +To check code with scalastyle, you can 
+
+```
+sbt scalastyle
+```
+
+## Code of Conduct
+
+This project has adopted an [Open Source Code of Conduct](../CODE_OF_CONDUCT.md).
+
+## Security
+
+If you discover a potential security issue in this project we ask that you notify AWS/Amazon Security via our [vulnerability reporting page](http://aws.amazon.com/security/vulnerability-reporting/). Please do **not** create a public GitHub issue.
+
+## License
+
+See the [LICENSE](../LICENSE.txt) file for our project's licensing. We will ask you to confirm the licensing of your contribution.
+
+## Copyright
+
+Copyright OpenSearch Contributors. See [NOTICE](../NOTICE) for details.
\ No newline at end of file
diff --git a/spark-sql-application/project/plugins.sbt b/spark-sql-application/project/plugins.sbt
index 06b2357652..f587336319 100644
--- a/spark-sql-application/project/plugins.sbt
+++ b/spark-sql-application/project/plugins.sbt
@@ -3,4 +3,5 @@
  * SPDX-License-Identifier: Apache-2.0
  */
 
- addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.1.0")
\ No newline at end of file
+ addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.1.0")
+ addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % "1.0.0")
\ No newline at end of file
diff --git a/spark-sql-application/scalastyle-config.xml b/spark-sql-application/scalastyle-config.xml
new file mode 100644
index 0000000000..7e3596f126
--- /dev/null
+++ b/spark-sql-application/scalastyle-config.xml
@@ -0,0 +1,117 @@
+<!-- Scalastyle standard configuration: 117 lines of scalastyle rule definitions
+     (XML elements not preserved in this extract) -->
\ No newline at end of file
diff --git a/spark-sql-application/src/main/scala/org/opensearch/sql/SQLJob.scala b/spark-sql-application/src/main/scala/org/opensearch/sql/SQLJob.scala
index a4cbba4b78..4562f00954 100644
--- a/spark-sql-application/src/main/scala/org/opensearch/sql/SQLJob.scala
+++ b/spark-sql-application/src/main/scala/org/opensearch/sql/SQLJob.scala
@@ -26,28 +26,14 @@ object SQLJob {
     try {
       // Execute SQL query
       val result: DataFrame = spark.sql(query)
-      val resultJson = result.select(to_json(collect_list(struct(result.columns.map(col):
_*)))).first().getString(0) + } + + def schemaDF(spark: SparkSession, result: DataFrame): DataFrame = { + val schema = result.schema + val schemaRow = schema.fields.map { field => + Row(field.name, field.dataType.typeName) + } + spark.createDataFrame(spark.sparkContext.parallelize(schemaRow), StructType(Seq( + StructField("column_name", StringType, nullable = false), + StructField("data_type", StringType, nullable = false) + ))) + } + + def stepIdDF(spark: SparkSession): DataFrame = { + val dataRow = Seq( + Row(sys.env.getOrElse("EMR_STEP_ID", "")) + ) + spark.createDataFrame(spark.sparkContext.parallelize(dataRow), StructType(Seq( + StructField("stepId", StringType, nullable = false) + ))) + } } From 7e0216010d80983cd5b57a966758ffacae229b08 Mon Sep 17 00:00:00 2001 From: Rupal Mahajan Date: Thu, 8 Jun 2023 23:38:54 -0700 Subject: [PATCH 05/11] Fix datatype for result and schema Signed-off-by: Rupal Mahajan --- .../scala/org/opensearch/sql/SQLJob.scala | 34 ++++++++----------- 1 file changed, 14 insertions(+), 20 deletions(-) diff --git a/spark-sql-application/src/main/scala/org/opensearch/sql/SQLJob.scala b/spark-sql-application/src/main/scala/org/opensearch/sql/SQLJob.scala index 4562f00954..2f4ff56c50 100644 --- a/spark-sql-application/src/main/scala/org/opensearch/sql/SQLJob.scala +++ b/spark-sql-application/src/main/scala/org/opensearch/sql/SQLJob.scala @@ -26,14 +26,10 @@ object SQLJob { try { // Execute SQL query val result: DataFrame = spark.sql(query) - val resultJson = getJson(result) - - // Convert the schema to a DataFrame - val schema = schemaDF(spark, result) - val schemaJson = getJson(schema) - // Create a DataFrame with stepId, schema and result - val data = stepIdDF(spark).withColumn("schema", lit(schemaJson)).withColumn("result",lit(resultJson)) + val resultJson = getJson(result) + val schemaJson = getJson(getSchema(spark, result)) + val data = getList(spark, resultJson, "result").join(getList(spark, schemaJson, "schema")).withColumn("stepId", lit(sys.env.getOrElse("EMR_STEP_ID", ""))) // Write data to OpenSearch index val aos = Map( @@ -55,27 +51,25 @@ object SQLJob { } } - def getJson(df: DataFrame): String = { - df.select(to_json(collect_list(struct(df.columns.map(col): _*)))).first().getString(0) + def getJson(df: DataFrame): DataFrame = { + df.select(to_json(struct(df.columns.map(col): _*))).toDF("json") } - def schemaDF(spark: SparkSession, result: DataFrame): DataFrame = { - val schema = result.schema - val schemaRow = schema.fields.map { field => + def getSchema(spark: SparkSession, result: DataFrame): DataFrame = { + val resultschema = result.schema + val resultschemaRows = resultschema.fields.map { field => Row(field.name, field.dataType.typeName) } - spark.createDataFrame(spark.sparkContext.parallelize(schemaRow), StructType(Seq( + spark.createDataFrame(spark.sparkContext.parallelize(resultschemaRows), StructType(Seq( StructField("column_name", StringType, nullable = false), StructField("data_type", StringType, nullable = false) ))) } - def stepIdDF(spark: SparkSession): DataFrame = { - val dataRow = Seq( - Row(sys.env.getOrElse("EMR_STEP_ID", "")) - ) - spark.createDataFrame(spark.sparkContext.parallelize(dataRow), StructType(Seq( - StructField("stepId", StringType, nullable = false) - ))) + def getList(spark: SparkSession, resultJson: DataFrame, name:String): DataFrame = { + val list = resultJson.agg(collect_list("json").as(name)).head().getSeq[String](0) + val schema = StructType(Seq(StructField(name, ArrayType(StringType)))) + val rdd = 
spark.sparkContext.parallelize(Seq(Row(list))) + spark.createDataFrame(rdd, schema) } } From b72c5a318dc7bd93ad67c8b7becbaea3c1e3ad25 Mon Sep 17 00:00:00 2001 From: Rupal Mahajan Date: Fri, 9 Jun 2023 14:42:37 -0700 Subject: [PATCH 06/11] Add test Signed-off-by: Rupal Mahajan --- spark-sql-application/README.md | 8 ++ spark-sql-application/build.sbt | 5 +- .../scala/org/opensearch/sql/SQLJobTest.scala | 85 +++++++++++++++++++ 3 files changed, 96 insertions(+), 2 deletions(-) create mode 100644 spark-sql-application/src/test/scala/org/opensearch/sql/SQLJobTest.scala diff --git a/spark-sql-application/README.md b/spark-sql-application/README.md index 548ca3560d..e473701e19 100644 --- a/spark-sql-application/README.md +++ b/spark-sql-application/README.md @@ -39,6 +39,14 @@ To build and run this application with Spark, you can run: sbt clean publishLocal ``` +## Test + +To run tests, you can use: + +``` +sbt test +``` + ## Scalastyle To check code with scalastyle, you can run: diff --git a/spark-sql-application/build.sbt b/spark-sql-application/build.sbt index 2f109d66c8..79d69a30d1 100644 --- a/spark-sql-application/build.sbt +++ b/spark-sql-application/build.sbt @@ -9,7 +9,7 @@ version := "1.0" scalaVersion := "2.12.15" -val sparkVersion = "3.3.1" +val sparkVersion = "3.3.2" mainClass := Some("org.opensearch.sql.SQLJob") @@ -23,5 +23,6 @@ resolvers ++= Seq( libraryDependencies ++= Seq( "org.apache.spark" %% "spark-core" % sparkVersion % "provided", - "org.apache.spark" %% "spark-sql" % sparkVersion % "provided" + "org.apache.spark" %% "spark-sql" % sparkVersion % "provided", + "org.scalatest" %% "scalatest" % "3.2.15" % Test ) diff --git a/spark-sql-application/src/test/scala/org/opensearch/sql/SQLJobTest.scala b/spark-sql-application/src/test/scala/org/opensearch/sql/SQLJobTest.scala new file mode 100644 index 0000000000..576f255a19 --- /dev/null +++ b/spark-sql-application/src/test/scala/org/opensearch/sql/SQLJobTest.scala @@ -0,0 +1,85 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql + +import org.scalatest.funsuite.AnyFunSuite +import org.apache.spark.sql.{DataFrame, Row, SparkSession} +import org.apache.spark.sql.types.{ArrayType, IntegerType, StringType, StructField, StructType} + + +class SQLJobTest extends AnyFunSuite{ + + val spark = SparkSession.builder().appName("Test").master("local").getOrCreate() + + // Define input dataframe + val inputSchema = StructType(Seq( + StructField("Letter", StringType, nullable = false), + StructField("Number", IntegerType, nullable = false) + )) + val inputRows = Seq( + Row("A", 1), + Row("B", 2), + Row("C", 3) + ) + val inputDF: DataFrame = spark.createDataFrame(spark.sparkContext.parallelize(inputRows), inputSchema) + + test("Test getJson method") { + // Define expected dataframe + val expectedSchema = StructType(Seq( + StructField("json", StringType, nullable = true) + )) + val expectedRows = Seq( + Row("{\"Letter\":\"A\",\"Number\":1}"), + Row("{\"Letter\":\"B\",\"Number\":2}"), + Row("{\"Letter\":\"C\",\"Number\":3}") + ) + val expected: DataFrame = spark.createDataFrame(spark.sparkContext.parallelize(expectedRows), expectedSchema) + + // Compare the result + val result = SQLJob.getJson(inputDF) + assertEqualDataframe(expected, result) + } + + test("Test getSchema method") { + // Define expected dataframe + val expectedSchema = StructType(Seq( + StructField("column_name", StringType, nullable = false), + StructField("data_type", StringType, nullable = false) + )) + val 
expectedRows = Seq( + Row("Letter","string"), + Row("Number", "integer") + ) + val expected: DataFrame = spark.createDataFrame(spark.sparkContext.parallelize(expectedRows), expectedSchema) + + // Compare the result + val result = SQLJob.getSchema(spark, inputDF) + assertEqualDataframe(expected, result) + } + + test("Test getList method") { + val name = "list" + // Define expected dataframe + val expectedSchema = StructType(Seq( + StructField(name, ArrayType(StringType, containsNull = true), nullable = true) + )) + val expectedRows = Seq( + Row(Seq("{\"Letter\":\"A\",\"Number\":1}", + "{\"Letter\":\"B\",\"Number\":2}", + "{\"Letter\":\"C\",\"Number\":3}")) + ) + val expected: DataFrame = spark.createDataFrame(spark.sparkContext.parallelize(expectedRows), expectedSchema) + + // Compare the result + val result = SQLJob.getList(spark, SQLJob.getJson(inputDF), name) + assertEqualDataframe(expected, result) + } + + def assertEqualDataframe(expected: DataFrame, result: DataFrame): Unit ={ + assert(expected.schema === result.schema) + assert(expected.collect() === result.collect()) + } +} From 1883b256ac7377c097816a5207a6c5fa600d45d0 Mon Sep 17 00:00:00 2001 From: Rupal Mahajan Date: Mon, 12 Jun 2023 13:30:07 -0700 Subject: [PATCH 07/11] Simplify code using toJSON.collect.toList Signed-off-by: Rupal Mahajan --- spark-sql-application/project/plugins.sbt | 1 - spark-sql-application/scalastyle-config.xml | 19 ++----- .../scala/org/opensearch/sql/SQLJob.scala | 38 ++++++------- .../scala/org/opensearch/sql/SQLJobTest.scala | 53 ++++--------------- 4 files changed, 35 insertions(+), 76 deletions(-) diff --git a/spark-sql-application/project/plugins.sbt b/spark-sql-application/project/plugins.sbt index f587336319..4d14ba6c10 100644 --- a/spark-sql-application/project/plugins.sbt +++ b/spark-sql-application/project/plugins.sbt @@ -3,5 +3,4 @@ * SPDX-License-Identifier: Apache-2.0 */ - addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.1.0") addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % "1.0.0") \ No newline at end of file diff --git a/spark-sql-application/scalastyle-config.xml b/spark-sql-application/scalastyle-config.xml index 7e3596f126..37b1978cd7 100644 --- a/spark-sql-application/scalastyle-config.xml +++ b/spark-sql-application/scalastyle-config.xml @@ -8,21 +8,10 @@ - + diff --git a/spark-sql-application/src/main/scala/org/opensearch/sql/SQLJob.scala b/spark-sql-application/src/main/scala/org/opensearch/sql/SQLJob.scala index 2f4ff56c50..9bd21d05af 100644 --- a/spark-sql-application/src/main/scala/org/opensearch/sql/SQLJob.scala +++ b/spark-sql-application/src/main/scala/org/opensearch/sql/SQLJob.scala @@ -7,7 +7,6 @@ package org.opensearch.sql import org.apache.spark.sql.{DataFrame, SparkSession, Row} import org.apache.spark.sql.types._ -import org.apache.spark.sql.functions._ object SQLJob { def main(args: Array[String]) { @@ -27,9 +26,8 @@ object SQLJob { // Execute SQL query val result: DataFrame = spark.sql(query) - val resultJson = getJson(result) - val schemaJson = getJson(getSchema(spark, result)) - val data = getList(spark, resultJson, "result").join(getList(spark, schemaJson, "schema")).withColumn("stepId", lit(sys.env.getOrElse("EMR_STEP_ID", ""))) + // Get Data + val data = getData(result, spark) // Write data to OpenSearch index val aos = Map( @@ -51,25 +49,29 @@ object SQLJob { } } - def getJson(df: DataFrame): DataFrame = { - df.select(to_json(struct(df.columns.map(col): _*))).toDF("json") - } - - def getSchema(spark: SparkSession, result: DataFrame): DataFrame = { - 
val resultschema = result.schema - val resultschemaRows = resultschema.fields.map { field => + def getData(result: DataFrame, spark: SparkSession): DataFrame = { + // Create the schema dataframe + val schemaRows = result.schema.fields.map { field => Row(field.name, field.dataType.typeName) } - spark.createDataFrame(spark.sparkContext.parallelize(resultschemaRows), StructType(Seq( + val resultSchema = spark.createDataFrame(spark.sparkContext.parallelize(schemaRows), StructType(Seq( StructField("column_name", StringType, nullable = false), StructField("data_type", StringType, nullable = false) ))) - } - def getList(spark: SparkSession, resultJson: DataFrame, name:String): DataFrame = { - val list = resultJson.agg(collect_list("json").as(name)).head().getSeq[String](0) - val schema = StructType(Seq(StructField(name, ArrayType(StringType)))) - val rdd = spark.sparkContext.parallelize(Seq(Row(list))) - spark.createDataFrame(rdd, schema) + // Define the data schema + val schema = StructType(Seq( + StructField("result", ArrayType(StringType, containsNull = true), nullable = true), + StructField("schema", ArrayType(StringType, containsNull = true), nullable = true), + StructField("stepId", StringType, nullable = true) + )) + + // Create the data rows + val rows = Seq( + (result.toJSON.collect.toList, resultSchema.toJSON.collect.toList, sys.env.getOrElse("EMR_STEP_ID", "")) + ) + + // Create the DataFrame for data + spark.createDataFrame(rows).toDF(schema.fields.map(_.name): _*) } } diff --git a/spark-sql-application/src/test/scala/org/opensearch/sql/SQLJobTest.scala b/spark-sql-application/src/test/scala/org/opensearch/sql/SQLJobTest.scala index 576f255a19..e11606d5c9 100644 --- a/spark-sql-application/src/test/scala/org/opensearch/sql/SQLJobTest.scala +++ b/spark-sql-application/src/test/scala/org/opensearch/sql/SQLJobTest.scala @@ -24,57 +24,26 @@ class SQLJobTest extends AnyFunSuite{ Row("B", 2), Row("C", 3) ) - val inputDF: DataFrame = spark.createDataFrame(spark.sparkContext.parallelize(inputRows), inputSchema) + val input: DataFrame = spark.createDataFrame(spark.sparkContext.parallelize(inputRows), inputSchema) - test("Test getJson method") { + test("Test getData method") { // Define expected dataframe val expectedSchema = StructType(Seq( - StructField("json", StringType, nullable = true) + StructField("result", ArrayType(StringType, containsNull = true), nullable = true), + StructField("schema", ArrayType(StringType, containsNull = true), nullable = true), + StructField("stepId", StringType, nullable = true) )) val expectedRows = Seq( - Row("{\"Letter\":\"A\",\"Number\":1}"), - Row("{\"Letter\":\"B\",\"Number\":2}"), - Row("{\"Letter\":\"C\",\"Number\":3}") + Row( + Array("{\"Letter\":\"A\",\"Number\":1}","{\"Letter\":\"B\",\"Number\":2}", "{\"Letter\":\"C\",\"Number\":3}"), + Array("{\"column_name\":\"Letter\",\"data_type\":\"string\"}", "{\"column_name\":\"Number\",\"data_type\":\"integer\"}"), + "" + ) ) val expected: DataFrame = spark.createDataFrame(spark.sparkContext.parallelize(expectedRows), expectedSchema) // Compare the result - val result = SQLJob.getJson(inputDF) - assertEqualDataframe(expected, result) - } - - test("Test getSchema method") { - // Define expected dataframe - val expectedSchema = StructType(Seq( - StructField("column_name", StringType, nullable = false), - StructField("data_type", StringType, nullable = false) - )) - val expectedRows = Seq( - Row("Letter","string"), - Row("Number", "integer") - ) - val expected: DataFrame = 
spark.createDataFrame(spark.sparkContext.parallelize(expectedRows), expectedSchema) - - // Compare the result - val result = SQLJob.getSchema(spark, inputDF) - assertEqualDataframe(expected, result) - } - - test("Test getList method") { - val name = "list" - // Define expected dataframe - val expectedSchema = StructType(Seq( - StructField(name, ArrayType(StringType, containsNull = true), nullable = true) - )) - val expectedRows = Seq( - Row(Seq("{\"Letter\":\"A\",\"Number\":1}", - "{\"Letter\":\"B\",\"Number\":2}", - "{\"Letter\":\"C\",\"Number\":3}")) - ) - val expected: DataFrame = spark.createDataFrame(spark.sparkContext.parallelize(expectedRows), expectedSchema) - - // Compare the result - val result = SQLJob.getList(spark, SQLJob.getJson(inputDF), name) + val result = SQLJob.getData(input, spark) assertEqualDataframe(expected, result) } From 32610280791d7953fc635c7adca2af4a90cdcda3 Mon Sep 17 00:00:00 2001 From: Rupal Mahajan Date: Mon, 12 Jun 2023 14:54:47 -0700 Subject: [PATCH 08/11] Add example in readme Signed-off-by: Rupal Mahajan --- spark-sql-application/README.md | 35 +++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/spark-sql-application/README.md b/spark-sql-application/README.md index e473701e19..1cc8fee837 100644 --- a/spark-sql-application/README.md +++ b/spark-sql-application/README.md @@ -31,6 +31,41 @@ To use this application, you can run Spark with Flint extension: \ ``` +## Result Specifications + +Following example shows how the result is written to OpenSearch index after query execution. + +Let's assume sql query result is +``` ++------+------+ +|Letter|Number| ++------+------+ +|A |1 | +|B |2 | +|C |3 | ++------+------+ +``` +OpenSearch index document will look like +```json +{ + "_index" : ".query_execution_result", + "_id" : "A2WOsYgBMUoqCqlDJHrn", + "_score" : 1.0, + "_source" : { + "result" : [ + """{"Letter":"A","Number":1}""", + """{"Letter":"B","Number":2}""", + """{"Letter":"C","Number":3}""" + ], + "schema" : [ + """{"column_name":"Letter","data_type":"string"}""", + """{"column_name":"Number","data_type":"integer"}""" + ], + "stepId" : "s-JZSB1139WIVU" + } +} +``` + ## Build To build and run this application with Spark, you can run: From f8157099eed03c13debab67eebddb2a51258972c Mon Sep 17 00:00:00 2001 From: Rupal Mahajan Date: Mon, 12 Jun 2023 15:38:56 -0700 Subject: [PATCH 09/11] Fix triple quotes issue Signed-off-by: Rupal Mahajan --- spark-sql-application/README.md | 10 +++++----- .../src/main/scala/org/opensearch/sql/SQLJob.scala | 13 ++++++------- .../test/scala/org/opensearch/sql/SQLJobTest.scala | 4 ++-- 3 files changed, 13 insertions(+), 14 deletions(-) diff --git a/spark-sql-application/README.md b/spark-sql-application/README.md index 1cc8fee837..b0505282ab 100644 --- a/spark-sql-application/README.md +++ b/spark-sql-application/README.md @@ -53,13 +53,13 @@ OpenSearch index document will look like "_score" : 1.0, "_source" : { "result" : [ - """{"Letter":"A","Number":1}""", - """{"Letter":"B","Number":2}""", - """{"Letter":"C","Number":3}""" + "{'Letter':'A','Number':1}", + "{'Letter':'B','Number':2}", + "{'Letter':'C','Number':3}" ], "schema" : [ - """{"column_name":"Letter","data_type":"string"}""", - """{"column_name":"Number","data_type":"integer"}""" + "{'column_name':'Letter','data_type':'string'}", + "{'column_name':'Number','data_type':'integer'}" ], "stepId" : "s-JZSB1139WIVU" } diff --git a/spark-sql-application/src/main/scala/org/opensearch/sql/SQLJob.scala 
b/spark-sql-application/src/main/scala/org/opensearch/sql/SQLJob.scala index 9bd21d05af..3c667a25a2 100644 --- a/spark-sql-application/src/main/scala/org/opensearch/sql/SQLJob.scala +++ b/spark-sql-application/src/main/scala/org/opensearch/sql/SQLJob.scala @@ -56,20 +56,19 @@ object SQLJob { } val resultSchema = spark.createDataFrame(spark.sparkContext.parallelize(schemaRows), StructType(Seq( StructField("column_name", StringType, nullable = false), - StructField("data_type", StringType, nullable = false) - ))) + StructField("data_type", StringType, nullable = false)))) // Define the data schema val schema = StructType(Seq( StructField("result", ArrayType(StringType, containsNull = true), nullable = true), StructField("schema", ArrayType(StringType, containsNull = true), nullable = true), - StructField("stepId", StringType, nullable = true) - )) + StructField("stepId", StringType, nullable = true))) // Create the data rows - val rows = Seq( - (result.toJSON.collect.toList, resultSchema.toJSON.collect.toList, sys.env.getOrElse("EMR_STEP_ID", "")) - ) + val rows = Seq(( + result.toJSON.collect.toList.map(_.replaceAll("\"", "'")), + resultSchema.toJSON.collect.toList.map(_.replaceAll("\"", "'")), + sys.env.getOrElse("EMR_STEP_ID", ""))) // Create the DataFrame for data spark.createDataFrame(rows).toDF(schema.fields.map(_.name): _*) diff --git a/spark-sql-application/src/test/scala/org/opensearch/sql/SQLJobTest.scala b/spark-sql-application/src/test/scala/org/opensearch/sql/SQLJobTest.scala index e11606d5c9..8ff4b720e0 100644 --- a/spark-sql-application/src/test/scala/org/opensearch/sql/SQLJobTest.scala +++ b/spark-sql-application/src/test/scala/org/opensearch/sql/SQLJobTest.scala @@ -35,8 +35,8 @@ class SQLJobTest extends AnyFunSuite{ )) val expectedRows = Seq( Row( - Array("{\"Letter\":\"A\",\"Number\":1}","{\"Letter\":\"B\",\"Number\":2}", "{\"Letter\":\"C\",\"Number\":3}"), - Array("{\"column_name\":\"Letter\",\"data_type\":\"string\"}", "{\"column_name\":\"Number\",\"data_type\":\"integer\"}"), + Array("{'Letter':'A','Number':1}","{'Letter':'B','Number':2}", "{'Letter':'C','Number':3}"), + Array("{'column_name':'Letter','data_type':'string'}", "{'column_name':'Number','data_type':'integer'}"), "" ) ) From ec08d0b1ebc8b3a280f6c9f83db3dd82989db8c5 Mon Sep 17 00:00:00 2001 From: Rupal Mahajan Date: Wed, 14 Jun 2023 09:32:55 -0700 Subject: [PATCH 10/11] Update method name and description Signed-off-by: Rupal Mahajan --- .../scala/org/opensearch/sql/SQLJob.scala | 26 +++++++++++++++++-- .../scala/org/opensearch/sql/SQLJobTest.scala | 4 +-- 2 files changed, 26 insertions(+), 4 deletions(-) diff --git a/spark-sql-application/src/main/scala/org/opensearch/sql/SQLJob.scala b/spark-sql-application/src/main/scala/org/opensearch/sql/SQLJob.scala index 3c667a25a2..f2dd0c869c 100644 --- a/spark-sql-application/src/main/scala/org/opensearch/sql/SQLJob.scala +++ b/spark-sql-application/src/main/scala/org/opensearch/sql/SQLJob.scala @@ -8,6 +8,18 @@ package org.opensearch.sql import org.apache.spark.sql.{DataFrame, SparkSession, Row} import org.apache.spark.sql.types._ +/** + * Spark SQL Application entrypoint + * + * @param args(0) + * sql query + * @param args(1) + * opensearch index name + * @param args(2-6) + * opensearch connection values required for flint-integration jar. host, port, scheme, auth, region respectively. 
+ * @return
+ *   write sql query result to given opensearch index
+ */
 object SQLJob {
   def main(args: Array[String]) {
     // Get the SQL query and Opensearch Config from the command line arguments
@@ -27,7 +39,7 @@ object SQLJob {
       val result: DataFrame = spark.sql(query)
 
       // Get Data
-      val data = getData(result, spark)
+      val data = getFormattedData(result, spark)
 
       // Write data to OpenSearch index
       val aos = Map(
@@ -49,7 +61,17 @@ object SQLJob {
     }
   }
 
-  def getData(result: DataFrame, spark: SparkSession): DataFrame = {
+  /**
+   * Create a new formatted dataframe with json result, json schema and EMR_STEP_ID.
+   *
+   * @param result
+   *   sql query result dataframe
+   * @param spark
+   *   spark session
+   * @return
+   *   dataframe with result, schema and emr step id
+   */
+  def getFormattedData(result: DataFrame, spark: SparkSession): DataFrame = {
     // Create the schema dataframe
     val schemaRows = result.schema.fields.map { field =>
       Row(field.name, field.dataType.typeName)
     }
diff --git a/spark-sql-application/src/test/scala/org/opensearch/sql/SQLJobTest.scala b/spark-sql-application/src/test/scala/org/opensearch/sql/SQLJobTest.scala
index 8ff4b720e0..2cdb06d6ca 100644
--- a/spark-sql-application/src/test/scala/org/opensearch/sql/SQLJobTest.scala
+++ b/spark-sql-application/src/test/scala/org/opensearch/sql/SQLJobTest.scala
@@ -26,7 +26,7 @@ class SQLJobTest extends AnyFunSuite{
   )
   val input: DataFrame = spark.createDataFrame(spark.sparkContext.parallelize(inputRows), inputSchema)
 
-  test("Test getData method") {
+  test("Test getFormattedData method") {
     // Define expected dataframe
     val expectedSchema = StructType(Seq(
       StructField("result", ArrayType(StringType, containsNull = true), nullable = true),
@@ -43,7 +43,7 @@ class SQLJobTest extends AnyFunSuite{
     val expected: DataFrame = spark.createDataFrame(spark.sparkContext.parallelize(expectedRows), expectedSchema)
 
     // Compare the result
-    val result = SQLJob.getData(input, spark)
+    val result = SQLJob.getFormattedData(input, spark)
     assertEqualDataframe(expected, result)
   }
 
From 959d5b2d899317bb8007f767ef355cd9637c9d3b Mon Sep 17 00:00:00 2001
From: Rupal Mahajan
Date: Sun, 25 Jun 2023 14:58:27 -0700
Subject: [PATCH 11/11] Add applicationId

Signed-off-by: Rupal Mahajan
---
 spark-sql-application/README.md                        | 4 +++-
 .../src/main/scala/org/opensearch/sql/SQLJob.scala     | 6 ++++--
 .../src/test/scala/org/opensearch/sql/SQLJobTest.scala | 6 ++++--
 3 files changed, 11 insertions(+), 5 deletions(-)

diff --git a/spark-sql-application/README.md b/spark-sql-application/README.md
index b0505282ab..6422f294cd 100644
--- a/spark-sql-application/README.md
+++ b/spark-sql-application/README.md
@@ -3,6 +3,7 @@
 This application executes a SQL query and stores the result in an OpenSearch index in the following format
 ```
 "stepId":"<emr-step-id>",
+"applicationId":"<spark-application-id>"
 "schema": "json blob",
 "result": "json blob"
 ```
@@ -61,7 +62,8 @@ OpenSearch index document will look like
       "{'column_name':'Letter','data_type':'string'}",
       "{'column_name':'Number','data_type':'integer'}"
     ],
-    "stepId" : "s-JZSB1139WIVU"
+    "stepId" : "s-JZSB1139WIVU",
+    "applicationId" : "application_1687726870985_0003"
   }
 }
 ```
diff --git a/spark-sql-application/src/main/scala/org/opensearch/sql/SQLJob.scala b/spark-sql-application/src/main/scala/org/opensearch/sql/SQLJob.scala
index f2dd0c869c..04fa92b25b 100644
--- a/spark-sql-application/src/main/scala/org/opensearch/sql/SQLJob.scala
+++ b/spark-sql-application/src/main/scala/org/opensearch/sql/SQLJob.scala
@@ -84,13 +84,15 @@ object SQLJob {
     val schema = StructType(Seq(
       StructField("result",
ArrayType(StringType, containsNull = true), nullable = true), StructField("schema", ArrayType(StringType, containsNull = true), nullable = true), - StructField("stepId", StringType, nullable = true))) + StructField("stepId", StringType, nullable = true), + StructField("applicationId", StringType, nullable = true))) // Create the data rows val rows = Seq(( result.toJSON.collect.toList.map(_.replaceAll("\"", "'")), resultSchema.toJSON.collect.toList.map(_.replaceAll("\"", "'")), - sys.env.getOrElse("EMR_STEP_ID", ""))) + sys.env.getOrElse("EMR_STEP_ID", "unknown"), + spark.sparkContext.applicationId)) // Create the DataFrame for data spark.createDataFrame(rows).toDF(schema.fields.map(_.name): _*) diff --git a/spark-sql-application/src/test/scala/org/opensearch/sql/SQLJobTest.scala b/spark-sql-application/src/test/scala/org/opensearch/sql/SQLJobTest.scala index 2cdb06d6ca..7ec4e45450 100644 --- a/spark-sql-application/src/test/scala/org/opensearch/sql/SQLJobTest.scala +++ b/spark-sql-application/src/test/scala/org/opensearch/sql/SQLJobTest.scala @@ -31,13 +31,15 @@ class SQLJobTest extends AnyFunSuite{ val expectedSchema = StructType(Seq( StructField("result", ArrayType(StringType, containsNull = true), nullable = true), StructField("schema", ArrayType(StringType, containsNull = true), nullable = true), - StructField("stepId", StringType, nullable = true) + StructField("stepId", StringType, nullable = true), + StructField("applicationId", StringType, nullable = true) )) val expectedRows = Seq( Row( Array("{'Letter':'A','Number':1}","{'Letter':'B','Number':2}", "{'Letter':'C','Number':3}"), Array("{'column_name':'Letter','data_type':'string'}", "{'column_name':'Number','data_type':'integer'}"), - "" + "unknown", + spark.sparkContext.applicationId ) ) val expected: DataFrame = spark.createDataFrame(spark.sparkContext.parallelize(expectedRows), expectedSchema)
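
For reference, the behavior of the final `getFormattedData` can be checked without EMR or an OpenSearch cluster. This is a minimal local sketch, not part of the patch series: the `SQLJobLocalDemo` object is hypothetical, it assumes the `SQLJob` from the patches above is on the classpath, and it prints the formatted row instead of writing it through the Flint connector:

```scala
import org.apache.spark.sql.SparkSession
import org.opensearch.sql.SQLJob

object SQLJobLocalDemo {
  def main(args: Array[String]): Unit = {
    // Local session; EMR_STEP_ID is unset here, so stepId falls back to "unknown".
    val spark = SparkSession.builder()
      .appName("SQLJobLocalDemo")
      .master("local[*]")
      .getOrCreate()

    try {
      // A literal SELECT avoids needing any source tables.
      val result = spark.sql("SELECT 'A' AS Letter, 1 AS Number")

      // Produces one row with result, schema, stepId and applicationId: the same
      // document SQLJob writes to the OpenSearch index via the Flint data source.
      val data = SQLJob.getFormattedData(result, spark)
      data.show(false)
    } finally {
      spark.stop()
    }
  }
}
```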