diff --git a/rapids-4-spark-tools/README.md b/rapids-4-spark-tools/README.md index 5b835c01cb1..56eaeb2cc48 100644 --- a/rapids-4-spark-tools/README.md +++ b/rapids-4-spark-tools/README.md @@ -25,30 +25,30 @@ org.apache.spark.sql.rapids.tool.profiling.ProfileMain.main(Array("/path/to/even ## How to compile and use from command-line 1. `mvn clean package` -2. `cd $SPARK_HOME (Download Apache Spark if reequired)` -3. `./bin/spark-submit --class org.apache.spark.sql.rapids.tool.profiling.ProfileMain /workload_profiling/target/rapids-4-spark-tools-.jar /path/to/eventlog1` +2. `cd $SPARK_HOME (Download Apache Spark if required)` +3. `./bin/spark-submit --class org.apache.spark.sql.rapids.tool.profiling.ProfileMain /rapids-4-spark-tools/target/rapids-4-spark-tools-.jar /path/to/eventlog1` ## Options ``` -$ ./bin/spark-submit --class org.apache.spark.sql.rapids.tool.profiling.ProfileMain /workload_profiling/target/rapids-4-spark-tools-.jar --help +$ ./bin/spark-submit --class org.apache.spark.sql.rapids.tool.profiling.ProfileMain /rapids-4-spark-tools/target/rapids-4-spark-tools-.jar --help Spark event log profiling tool Example: # Input 1 or more event logs from local path: -./bin/spark-submit --class com.nvidia.spark.rapids.tool.profiling.ProfileMain /workload_profiling/target/rapids-4-spark-tools-.jar /path/to/eventlog1 /path/to/eventlog2 +./bin/spark-submit --class com.nvidia.spark.rapids.tool.profiling.ProfileMain /rapids-4-spark-tools/target/rapids-4-spark-tools-.jar /path/to/eventlog1 /path/to/eventlog2 # If event log is from S3: export AWS_ACCESS_KEY_ID=xxx export AWS_SECRET_ACCESS_KEY=xxx -./bin/spark-submit --class com.nvidia.spark.rapids.tool.profiling.ProfileMain /workload_profiling/target/rapids-4-spark-tools-.jar s3a:///eventlog1 /path/to/eventlog2 +./bin/spark-submit --class com.nvidia.spark.rapids.tool.profiling.ProfileMain /rapids-4-spark-tools/target/rapids-4-spark-tools-.jar s3a:///eventlog1 /path/to/eventlog2 # If event log is from hdfs: -./bin/spark-submit --class com.nvidia.spark.rapids.tool.profiling.ProfileMain /workload_profiling/target/rapids-4-spark-tools-.jar hdfs:///eventlog1 /path/to/eventlog2 +./bin/spark-submit --class com.nvidia.spark.rapids.tool.profiling.ProfileMain /rapids-4-spark-tools/target/rapids-4-spark-tools-.jar hdfs:///eventlog1 /path/to/eventlog2 # Change output directory to /tmp. It outputs as "rapids_4_spark_tools_output.log" in the local directory if the output directory is not specified. -./bin/spark-submit --class com.nvidia.spark.rapids.tool.profiling.ProfileMain /workload_profiling/target/rapids-4-spark-tools-.jar -o /tmp /path/to/eventlog1 +./bin/spark-submit --class com.nvidia.spark.rapids.tool.profiling.ProfileMain /rapids-4-spark-tools/target/rapids-4-spark-tools-.jar -o /tmp /path/to/eventlog1 For usage see below: -o, --output-directory Output directory. Default is current directory diff --git a/rapids-4-spark-tools/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Analysis.scala b/rapids-4-spark-tools/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Analysis.scala new file mode 100644 index 00000000000..7bad3f9081a --- /dev/null +++ b/rapids-4-spark-tools/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Analysis.scala @@ -0,0 +1,124 @@ + + +/* + * Copyright (c) 2021, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.tool.profiling + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.rapids.tool.profiling._ + +/** + * Does analysis on the DataFrames + * from object of ApplicationInfo + */ +class Analysis(apps: ArrayBuffer[ApplicationInfo]) { + + require(apps.nonEmpty) + private val fileWriter = apps.head.fileWriter + + // Job Level TaskMetrics Aggregation + def jobMetricsAggregation(): Unit = { + if (apps.size == 1) { + fileWriter.write("Job level aggregated task metrics:") + apps.head.runQuery(apps.head.jobMetricsAggregationSQL + " order by Duration desc") + } else { + var query = "" + for (app <- apps) { + if (query.isEmpty) { + query += app.jobMetricsAggregationSQL + } else { + query += " union " + app.jobMetricsAggregationSQL + } + } + fileWriter.write("Job level aggregated task metrics:") + apps.head.runQuery(query + " order by appIndex, Duration desc") + } + } + + // Stage Level TaskMetrics Aggregation + def stageMetricsAggregation(): Unit = { + if (apps.size == 1) { + fileWriter.write("Stage level aggregated task metrics:") + apps.head.runQuery(apps.head.stageMetricsAggregationSQL + " order by Duration desc") + } else { + var query = "" + for (app <- apps) { + if (query.isEmpty) { + query += app.stageMetricsAggregationSQL + } else { + query += " union " + app.stageMetricsAggregationSQL + } + } + fileWriter.write("Stage level aggregated task metrics:") + apps.head.runQuery(query + " order by appIndex, Duration desc") + } + } + + // Job + Stage Level TaskMetrics Aggregation + def jobAndStageMetricsAggregation(): Unit = { + if (apps.size == 1) { + val messageHeader = "Job + Stage level aggregated task metrics:" + apps.head.runQuery(apps.head.jobAndStageMetricsAggregationSQL + " order by Duration desc") + } else { + var query = "" + for (app <- apps) { + if (query.isEmpty) { + query += app.jobAndStageMetricsAggregationSQL + } else { + query += " union " + app.jobAndStageMetricsAggregationSQL + } + } + fileWriter.write("Job + Stage level aggregated task metrics:") + apps.head.runQuery(query + " order by appIndex, Duration desc") + } + } + + // SQL Level TaskMetrics Aggregation(Only when SQL exists) + def sqlMetricsAggregation(): DataFrame = { + if (apps.size == 1) { + if (apps.head.allDataFrames.contains(s"sqlDF_${apps.head.index}")) { + val messageHeader = "SQL level aggregated task metrics:" + apps.head.runQuery(apps.head.sqlMetricsAggregationSQL + " order by Duration desc") + } else { + apps.head.sparkSession.emptyDataFrame + } + } else { + var query = "" + val appsWithSQL = apps.filter(p => p.allDataFrames.contains(s"sqlDF_${p.index}")) + for (app <- appsWithSQL) { + if (query.isEmpty) { + query += app.sqlMetricsAggregationSQL + } else { + query += " union " + app.sqlMetricsAggregationSQL + } + } + val messageHeader = "SQL level aggregated task metrics:" + apps.head.runQuery(query + " order by appIndex, Duration desc") + } + } + + // custom query execution. Normally for debugging use. 
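+  // Runs a hard-coded sample query (the first stageId from this application's
+  // stageDF temp view); swap in whatever SQL is being investigated before calling this.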
+ def customQueryExecution(app: ApplicationInfo): Unit = { + fileWriter.write("Custom query execution:") + val customQuery = + s"""select stageId from stageDF_${app.index} limit 1 + |""".stripMargin + app.runQuery(customQuery) + } +} \ No newline at end of file diff --git a/rapids-4-spark-tools/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ClassWarehouse.scala b/rapids-4-spark-tools/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ClassWarehouse.scala index 3580587df1c..6d046f71b1b 100644 --- a/rapids-4-spark-tools/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ClassWarehouse.scala +++ b/rapids-4-spark-tools/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ClassWarehouse.scala @@ -44,4 +44,115 @@ case class ExecutorCase( executorID: String, host: String, totalCores: Int, resourceProfileId: Int) case class ExecutorRemovedCase( - executorID: String, reason: String, time: Long) \ No newline at end of file + executorID: String, + reason: String, + time: Long) + +case class SQLExecutionCase( + sqlID: Long, + description: String, + details: String, + startTime: Long, + endTime: Option[Long], + duration: Option[Long], + durationStr: String) + +case class SQLPlanMetricsCase( + sqlID: Long, + name: String, + accumulatorId: Long, + metricType: String) + +case class PlanNodeAccumCase( + sqlID: Long, + nodeID: Long, + nodeName:String, + nodeDesc: String, + accumulatorId: Long) + +case class DriverAccumCase( + sqlID: Long, + accumulatorId: Long, + value: Long) + +case class TaskStageAccumCase( + stageId: Int, + attemptId: Int, + taskId: Option[Long], + accumulatorId: Long, + name: Option[String], + value: Option[Long], + isInternal: Boolean) + +case class JobCase( + jobID: Int, + stageIds: Seq[Int], + sqlID: Option[Long], + properties: scala.collection.Map[String, String], + startTime: Long, + endTime: Option[Long], + jobResult: Option[String], + failedReason: String, + duration: Option[Long], + durationStr: String, + gpuMode: Boolean) + +case class StageCase( + stageId: Int, + attemptId: Int, + name: String, + numTasks: Int, + numRDD: Int, + parentIds: Seq[Int], + details: String, + properties: scala.collection.Map[String, String], + submissionTime: Option[Long], + completionTime: Option[Long], + failureReason: Option[String], + duration: Option[Long], + durationStr: String, gpuMode: Boolean) + +// Note: sr = Shuffle Read; sw = Shuffle Write +// Totally 39 columns +case class TaskCase( + stageId: Int, + stageAttemptId: Int, + taskType: String, + endReason: String, + taskId: Long, + attempt: Int, + launchTime: Long, + finishTime: Long, + duration: Long, + successful: Boolean, + executorId: String, + host: String, + taskLocality: String, + speculative: Boolean, + gettingResultTime: Long, + executorDeserializeTime: Long, + executorDeserializeCPUTime: Long, + executorRunTime: Long, + executorCPUTime: Long, + peakExecutionMemory: Long, + resultSize: Long, + jvmGCTime: Long, + resultSerializationTime: Long, + memoryBytesSpilled: Long, + diskBytesSpilled: Long, + sr_remoteBlocksFetched: Long, + sr_localBlocksFetched: Long, + sr_fetchWaitTime: Long, + sr_remoteBytesRead: Long, + sr_remoteBytesReadToDisk: Long, + sr_localBytesRead: Long, + sr_totalBytesRead: Long, + sw_bytesWritten: Long, + sw_writeTime: Long, + sw_recordsWritten: Long, + input_bytesRead: Long, + input_recordsRead: Long, + output_bytesWritten: Long, + output_recordsWritten: Long) + +case class ProblematicSQLCase(sqlID: Long) \ No newline at end of file diff --git 
a/rapids-4-spark-tools/src/main/scala/com/nvidia/spark/rapids/tool/profiling/CollectInformation.scala b/rapids-4-spark-tools/src/main/scala/com/nvidia/spark/rapids/tool/profiling/CollectInformation.scala index 32909b20824..c6742b11d69 100644 --- a/rapids-4-spark-tools/src/main/scala/com/nvidia/spark/rapids/tool/profiling/CollectInformation.scala +++ b/rapids-4-spark-tools/src/main/scala/com/nvidia/spark/rapids/tool/profiling/CollectInformation.scala @@ -30,9 +30,9 @@ class CollectInformation(apps: ArrayBuffer[ApplicationInfo]) { // Print Application Information def printAppInfo(): Unit = { - fileWriter.write("Application Information:\n") + val messageHeader = "Application Information:\n" for (app <- apps) { - app.runQuery(app.generateAppInfo) + app.runQuery(query = app.generateAppInfo, writeToFile = true, messageHeader = messageHeader) } } @@ -57,17 +57,19 @@ class CollectInformation(apps: ArrayBuffer[ApplicationInfo]) { // Print executor related information def printExecutorInfo(): Unit = { - fileWriter.write("\n\nExecutor Information:\n") + val messageHeader = "\n\nExecutor Information:\n" for (app <- apps) { - app.runQuery(app.generateExecutorInfo + " order by cast(executorID as long)") + app.runQuery(query = app.generateExecutorInfo + " order by cast(executorID as long)", + writeToFile = true, messageHeader = messageHeader) } } // Print Rapids related Spark Properties def printRapidsProperties(): Unit = { - fileWriter.write("\n\nSpark Rapids parameters set explicitly:\n") + val messageHeader = "\n\nSpark Rapids parameters set explicitly:\n" for (app <- apps) { - app.runQuery(app.generateRapidsProperties + " order by key") + app.runQuery(query = app.generateRapidsProperties + " order by key", writeToFile = true, + messageHeader = messageHeader) } } } \ No newline at end of file diff --git a/rapids-4-spark-tools/src/main/scala/com/nvidia/spark/rapids/tool/profiling/CompareApplications.scala b/rapids-4-spark-tools/src/main/scala/com/nvidia/spark/rapids/tool/profiling/CompareApplications.scala index e14b2a095f9..4f3d2ca00a6 100644 --- a/rapids-4-spark-tools/src/main/scala/com/nvidia/spark/rapids/tool/profiling/CompareApplications.scala +++ b/rapids-4-spark-tools/src/main/scala/com/nvidia/spark/rapids/tool/profiling/CompareApplications.scala @@ -30,7 +30,7 @@ class CompareApplications(apps: ArrayBuffer[ApplicationInfo]) extends Logging { // Compare the App Information. 
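+  // Builds a single query covering every loaded application and runs it once via
+  // apps.head, writing the result to the output file under messageHeader.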
def compareAppInfo(): Unit = { - fileWriter.append("Compare Application Information:\n") + val messageHeader = "Compare Application Information:\n" var query = "" var i = 1 for (app <- apps) { @@ -42,12 +42,12 @@ class CompareApplications(apps: ArrayBuffer[ApplicationInfo]) extends Logging { } i += 1 } - apps.head.runQuery(query) + apps.head.runQuery(query = query, writeToFile = true, messageHeader = messageHeader) } // Compare Executors information def compareExecutorInfo(): Unit = { - fileWriter.write("\n\nCompare Executor Information:\n") + val messageHeader = "\n\nCompare Executor Information:\n" var query = "" var i = 1 for (app <- apps) { @@ -59,12 +59,12 @@ class CompareApplications(apps: ArrayBuffer[ApplicationInfo]) extends Logging { } i += 1 } - apps.head.runQuery(query) + apps.head.runQuery(query = query, writeToFile = true, messageHeader = messageHeader) } // Compare Rapids Properties which are set explicitly def compareRapidsProperties(): Unit ={ - fileWriter.write("\n\nCompare Rapids Properties which are set explicitly:\n") + val messageHeader = "\n\nCompare Rapids Properties which are set explicitly:\n" var withClauseAllKeys = "with allKeys as \n (" val selectKeyPart = "select allKeys.key" var selectValuePart = "" @@ -97,6 +97,6 @@ class CompareApplications(apps: ArrayBuffer[ApplicationInfo]) extends Logging { query = withClauseAllKeys + selectKeyPart + selectValuePart + " from (\n" + query + "\n) order by key" logDebug("Running query " + query) - apps.head.runQuery(query) + apps.head.runQuery(query = query, writeToFile = true, messageHeader = messageHeader) } } \ No newline at end of file diff --git a/rapids-4-spark-tools/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileArgs.scala b/rapids-4-spark-tools/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileArgs.scala index c4df78d42af..00adbaae687 100644 --- a/rapids-4-spark-tools/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileArgs.scala +++ b/rapids-4-spark-tools/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileArgs.scala @@ -26,19 +26,19 @@ Example: # Input 1 or more event logs from local path: ./bin/spark-submit --class com.nvidia.spark.rapids.tool.profiling.ProfileMain -/workload_profiling/target/rapids-4-spark-tools-.jar +/rapids-4-spark-tools/target/rapids-4-spark-tools-.jar /path/to/eventlog1 /path/to/eventlog2 # If any event log is from S3: export AWS_ACCESS_KEY_ID=xxx export AWS_SECRET_ACCESS_KEY=xxx ./bin/spark-submit --class com.nvidia.spark.rapids.tool.profiling.ProfileMain -/workload_profiling/target/rapids-4-spark-tools-.jar +/rapids-4-spark-tools/target/rapids-4-spark-tools-.jar s3a:///eventlog1 /path/to/eventlog2 # Change output directory to /tmp ./bin/spark-submit --class com.nvidia.spark.rapids.tool.profiling.ProfileMain - /workload_profiling/target/rapids-4-spark-tools-.jar + /rapids-4-spark-tools/target/rapids-4-spark-tools-.jar -o /tmp /path/to/eventlog1 For usage see below: @@ -52,6 +52,12 @@ For usage see below: trailArg[List[String]](required = true, descr = "Event log filenames(space separated). " + "eg: s3a:///eventlog1 /path/to/eventlog2") + val compare: ScallopOption[Boolean] = + opt[Boolean](required = false, + descr = "Compare Applications (Recommended to compare less than 10 applications)." + + " Default is false") + val numOutputRows: ScallopOption[Int] = + opt[Int](required = false, + descr = "Number of output rows for each Application. 
Default is 1000") verify() -} - +} \ No newline at end of file diff --git a/rapids-4-spark-tools/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileMain.scala b/rapids-4-spark-tools/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileMain.scala index 4a554d0e032..65a74ba76e5 100644 --- a/rapids-4-spark-tools/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileMain.scala +++ b/rapids-4-spark-tools/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileMain.scala @@ -20,8 +20,10 @@ import java.io.FileWriter import scala.collection.mutable.ArrayBuffer +import org.apache.hadoop.fs.Path + import org.apache.spark.internal.Logging -import org.apache.spark.sql.rapids.tool.profiling.{_} +import org.apache.spark.sql.rapids.tool.profiling._ /** * A profiling tool to parse Spark Event Log @@ -43,38 +45,93 @@ object ProfileMain extends Logging { val sparkSession = ProfileUtils.createSparkSession logInfo(s"Output directory: $outputDirectory") - // Create an Array of Applications(with an index starting from 1) - val apps: ArrayBuffer[ApplicationInfo] = ArrayBuffer[ApplicationInfo]() - var index: Int = 1 - for (path <- eventlogPaths) { - apps += new ApplicationInfo(appArgs, sparkSession, fileWriter, path, index) - index += 1 + // Convert the input path string to Path(s) + val allPaths: ArrayBuffer[Path] = ArrayBuffer[Path]() + for (pathString <- eventlogPaths) { + val paths = ProfileUtils.stringToPath(pathString) + if (paths.nonEmpty) { + allPaths ++= paths + } } - require(apps.nonEmpty) - - // If only 1 Application, collect: - // A. Information Collected - val collect = new CollectInformation(apps) - if (apps.size == 1) { - logInfo(s"### A. Information Collected ###") - collect.printAppInfo() - collect.printExecutorInfo() - collect.printRapidsProperties() + + // If compare mode is on, we need lots of memory to cache all applications then compare. + // Suggest only enable compare mode if there is no more than 10 applications as input. + if (appArgs.compare()) { + // Create an Array of Applications(with an index starting from 1) + val apps: ArrayBuffer[ApplicationInfo] = ArrayBuffer[ApplicationInfo]() + var index: Int = 1 + for (path <- allPaths.filter(p => !p.getName.contains("."))) { + apps += new ApplicationInfo(appArgs, sparkSession, fileWriter, path, index) + index += 1 + } + + //Exit if there are no applications to process. + if (apps.isEmpty) { + logInfo("No application to process. Exiting") + System.exit(0) + } + processApps(apps) + // Show the application Id <-> appIndex mapping. + for (app <- apps) { + logApplicationInfo(app) + } } else { - val compare = new CompareApplications(apps) - // Compare Applications - logInfo(s"### A. Compare Information Collected ###") - compare.compareAppInfo() - compare.compareExecutorInfo() - compare.compareRapidsProperties() + // This mode is to process one application at one time. + var index: Int = 1 + for (path <- allPaths.filter(p => !p.getName.contains("."))) { + // This apps only contains 1 app in each loop. + val apps: ArrayBuffer[ApplicationInfo] = ArrayBuffer[ApplicationInfo]() + val app = new ApplicationInfo(appArgs, sparkSession, fileWriter, path, index) + apps += app + logApplicationInfo(app) + processApps(apps) + app.dropAllTempViews() + index += 1 + } } - for (app <- apps) { + logInfo(s"Output log location: $outputDirectory/$logFileName") + + fileWriter.flush() + fileWriter.close() + + /** + * Function to process ApplicationInfo. 
If it is in compare mode, then all the eventlogs are + * evaluated at once and the output is one row per application. Else each eventlog is parsed one + * at a time. + */ + def processApps(apps: ArrayBuffer[ApplicationInfo]): Unit = { + if (appArgs.compare()) { // Compare Applications + logInfo(s"### A. Compare Information Collected ###") + val compare = new CompareApplications(apps) + compare.compareAppInfo() + compare.compareExecutorInfo() + compare.compareRapidsProperties() + } else { + val collect = new CollectInformation(apps) + logInfo(s"### A. Information Collected ###") + collect.printAppInfo() + collect.printExecutorInfo() + collect.printRapidsProperties() + } + + logInfo(s"### B. Analysis ###") + val analysis = new Analysis(apps) + analysis.jobAndStageMetricsAggregation() + val sqlAggMetricsDF = analysis.sqlMetricsAggregation() + + if (!sqlAggMetricsDF.isEmpty) { + fileWriter.write(s"### C. Qualification ###\n") + new Qualification(apps, sqlAggMetricsDF) + } else { + logInfo(s"Skip qualification part because no sqlAggMetrics DataFrame is detected.") + } + } + + def logApplicationInfo(app: ApplicationInfo) = { logInfo("========================================================================") logInfo(s"============== ${app.appId} (index=${app.index}) ==============") logInfo("========================================================================") } - fileWriter.flush() - fileWriter.close() } -} +} \ No newline at end of file diff --git a/rapids-4-spark-tools/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileUtils.scala b/rapids-4-spark-tools/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileUtils.scala index d569b928b38..96b35d0b9fe 100644 --- a/rapids-4-spark-tools/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileUtils.scala +++ b/rapids-4-spark-tools/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileUtils.scala @@ -16,6 +16,11 @@ package com.nvidia.spark.rapids.tool.profiling +import scala.collection.mutable.ArrayBuffer + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, FileUtil, Path} + import org.apache.spark.sql.SparkSession import org.apache.spark.sql.rapids.tool.profiling.ToolUtils @@ -63,4 +68,13 @@ object ProfileUtils { try Some(a.get - b) catch { case _: NoSuchElementException => None } -} + + // Return an Array(Path) based on input path string + def stringToPath(pathString: String): ArrayBuffer[Path] = { + val inputPath = new Path(pathString) + val uri = inputPath.toUri + val fs = FileSystem.get(uri, new Configuration()) + val allStatus = fs.listStatus(inputPath).filter(s => s.isFile) + ArrayBuffer(FileUtil.stat2Paths(allStatus): _*) + } +} \ No newline at end of file diff --git a/rapids-4-spark-tools/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Qualification.scala b/rapids-4-spark-tools/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Qualification.scala new file mode 100644 index 00000000000..c1461a52a63 --- /dev/null +++ b/rapids-4-spark-tools/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Qualification.scala @@ -0,0 +1,72 @@ +/* + * Copyright (c) 2021, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.nvidia.spark.rapids.tool.profiling + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.rapids.tool.profiling._ + +/** + * Qualifies or disqualifies an application for GPU acceleration. + */ +class Qualification( + apps: ArrayBuffer[ApplicationInfo], + sqlAggMetricsDF: DataFrame) extends Logging { + + require(apps.nonEmpty) + require(!sqlAggMetricsDF.isEmpty) + private val fileWriter = apps.head.fileWriter + + // Register sqlAggMetricsDF as a temp view + sqlAggMetricsDF.createOrReplaceTempView("sqlAggMetricsDF") + + // Qualify each App + for (app <- apps) { + qualifyApp(app) + } + + // Function to qualify an application. Below criteria is used to decide if the application can + // be qualified. + // 1. If the application doesn't contain SQL, then it is disqualified. + // 2. If the application has SQL, below 2 conditions have to be met to mark it as qualified: + // a. SQL duration is greater than 30 seconds. + // b. executorCPUTime_sum/executorRunTime_sum > 30 ( atleast 30%) + def qualifyApp(app: ApplicationInfo): Boolean = { + + // If this application does not have SQL + if (!app.allDataFrames.contains(s"sqlDF_${app.index}")) { + logInfo(s"${app.appId} (index=${app.index}) is disqualified because no SQL is inside.") + fileWriter.write(s"${app.appId} (index=${app.index}) is " + + s"disqualified because no SQL is inside.\n") + return false + } + + val df = app.queryToDF(app.qualificationSQL) + if (df.isEmpty) { + logInfo(s"${app.appId} (index=${app.index}) is disqualified because no SQL is qualified.") + fileWriter.write(s"${app.appId} (index=${app.index}) is " + + s"disqualified because no SQL is qualified.\n") + false + } else { + fileWriter.write(s"${app.appId} (index=${app.index}) " + + s"is qualified with below qualified SQL(s):\n") + fileWriter.write("\n" + ToolUtils.showString(df, app.args.numOutputRows.getOrElse(1000))) + true + } + } +} diff --git a/rapids-4-spark-tools/src/main/scala/org/apache/spark/sql/rapids/tool/profiling/ApplicationInfo.scala b/rapids-4-spark-tools/src/main/scala/org/apache/spark/sql/rapids/tool/profiling/ApplicationInfo.scala index e859ec84a0d..62721190654 100644 --- a/rapids-4-spark-tools/src/main/scala/org/apache/spark/sql/rapids/tool/profiling/ApplicationInfo.scala +++ b/rapids-4-spark-tools/src/main/scala/org/apache/spark/sql/rapids/tool/profiling/ApplicationInfo.scala @@ -17,7 +17,6 @@ package org.apache.spark.sql.rapids.tool.profiling import java.io.FileWriter -import java.net.URI import com.nvidia.spark.rapids.tool.profiling._ import org.apache.hadoop.conf.Configuration @@ -31,6 +30,8 @@ import org.apache.spark.deploy.history.EventLogFileReader import org.apache.spark.internal.Logging import org.apache.spark.scheduler._ import org.apache.spark.sql.{DataFrame, SparkSession} +import org.apache.spark.sql.execution.SparkPlanInfo +import org.apache.spark.sql.execution.ui.{SparkPlanGraph, SparkPlanGraphNode} import org.apache.spark.ui.UIUtils import org.apache.spark.util._ @@ -42,7 +43,7 @@ class ApplicationInfo( val 
args: ProfileArgs, val sparkSession: SparkSession, val fileWriter: FileWriter, - val eventlog: String, + val eventlog: Path, val index: Int) extends Logging { // From SparkListenerLogStart @@ -57,6 +58,10 @@ class ApplicationInfo( // 4. executorsDF (Must exist, otherwise fail!) // 5. propertiesDF (Must exist, otherwise fail!) // 6. blockManagersRemoved (Optional) + // 7. sqlDF (Could be missing) + // 8. jobDF (Must exist, otherwise fail!) + // 9. stageDF (Must exist, otherwise fail!) + // 10. taskDF (Must exist, otherwise fail!) val allDataFrames: HashMap[String, DataFrame] = HashMap.empty[String, DataFrame] // From SparkListenerResourceProfileAdded @@ -86,16 +91,97 @@ class ApplicationInfo( var executors: ArrayBuffer[ExecutorCase] = ArrayBuffer[ExecutorCase]() var executorsRemoved: ArrayBuffer[ExecutorRemovedCase] = ArrayBuffer[ExecutorRemovedCase]() + // From SparkListenerSQLExecutionStart and SparkListenerSQLExecutionEnd + var sqlStart: ArrayBuffer[SQLExecutionCase] = ArrayBuffer[SQLExecutionCase]() + val sqlEndTime: HashMap[Long, Long] = HashMap.empty[Long, Long] + + // From SparkListenerSQLExecutionStart and SparkListenerSQLAdaptiveExecutionUpdate + // sqlPlan stores HashMap (sqlID <-> SparkPlanInfo) + var sqlPlan: HashMap[Long, SparkPlanInfo] = HashMap.empty[Long, SparkPlanInfo] + // physicalPlanDescription stores HashMap (sqlID <-> physicalPlanDescription) + var physicalPlanDescription: HashMap[Long, String] = HashMap.empty[Long, String] + + // From SparkListenerSQLExecutionStart and SparkListenerSQLAdaptiveExecutionUpdate + var sqlPlanMetrics: ArrayBuffer[SQLPlanMetricsCase] = ArrayBuffer[SQLPlanMetricsCase]() + var planNodeAccum: ArrayBuffer[PlanNodeAccumCase] = ArrayBuffer[PlanNodeAccumCase]() + // From SparkListenerSQLAdaptiveSQLMetricUpdates + var sqlPlanMetricsAdaptive: ArrayBuffer[SQLPlanMetricsCase] = ArrayBuffer[SQLPlanMetricsCase]() + + // From SparkListenerDriverAccumUpdates + var driverAccum: ArrayBuffer[DriverAccumCase] = ArrayBuffer[DriverAccumCase]() + // From SparkListenerTaskEnd and SparkListenerTaskEnd + var taskStageAccum: ArrayBuffer[TaskStageAccumCase] = ArrayBuffer[TaskStageAccumCase]() + + // From SparkListenerJobStart and SparkListenerJobEnd + // JobStart contains mapping relationship for JobID -> StageID(s) + var jobStart: ArrayBuffer[JobCase] = ArrayBuffer[JobCase]() + val jobEndTime: HashMap[Int, Long] = HashMap.empty[Int, Long] + val jobEndResult: HashMap[Int, String] = HashMap.empty[Int, String] + val jobFailedReason: HashMap[Int, String] = HashMap.empty[Int, String] + + // From SparkListenerStageSubmitted and SparkListenerStageCompleted + // stageSubmitted contains mapping relationship for Stage -> RDD(s) + var stageSubmitted: ArrayBuffer[StageCase] = ArrayBuffer[StageCase]() + val stageCompletionTime: HashMap[Int, Option[Long]] = HashMap.empty[Int, Option[Long]] + val stageFailureReason: HashMap[Int, Option[String]] = HashMap.empty[Int, Option[String]] + + // From SparkListenerTaskStart & SparkListenerTaskEnd + // taskEnd contains task level metrics + var taskStart: ArrayBuffer[SparkListenerTaskStart] = ArrayBuffer[SparkListenerTaskStart]() + var taskEnd: ArrayBuffer[TaskCase] = ArrayBuffer[TaskCase]() + + // From SparkListenerTaskGettingResult + var taskGettingResult: ArrayBuffer[SparkListenerTaskGettingResult] = + ArrayBuffer[SparkListenerTaskGettingResult]() + // From all other events var otherEvents: ArrayBuffer[SparkListenerEvent] = ArrayBuffer[SparkListenerEvent]() // Generated warnings by predefined checks for this Application var warnings: 
ArrayBuffer[String] = ArrayBuffer[String]() + // All the metrics column names in Task Metrics with the aggregation type + val taskMetricsColumns: scala.collection.mutable.SortedMap[String, String] + = scala.collection.mutable.SortedMap( + "duration" -> "all", + "gettingResultTime" -> "sum", + "executorDeserializeTime" -> "sum", + "executorDeserializeCPUTime" -> "sum", + "executorRunTime" -> "sum", + "executorCPUTime" -> "sum", + "peakExecutionMemory" -> "max", + "resultSize" -> "max", + "jvmGCTime" -> "sum", + "resultSerializationTime" -> "sum", + "memoryBytesSpilled" -> "sum", + "diskBytesSpilled" -> "sum", + "sr_remoteBlocksFetched" -> "sum", + "sr_localBlocksFetched" -> "sum", + "sr_fetchWaitTime" -> "sum", + "sr_remoteBytesRead" -> "sum", + "sr_remoteBytesReadToDisk" -> "sum", + "sr_localBytesRead" -> "sum", + "sr_totalBytesRead" -> "sum", + "sw_bytesWritten" -> "sum", + "sw_writeTime" -> "sum", + "sw_recordsWritten" -> "sum", + "input_bytesRead" -> "sum", + "input_recordsRead" -> "sum", + "output_bytesWritten" -> "sum", + "output_recordsWritten" -> "sum" + ) + + // By looping through SQL Plan nodes to find out the problematic SQLs. Currently we define + // problematic SQL's as those which have RowToColumnar, ColumnarToRow transitions and Lambda's in + // the Spark plan. + var problematicSQL: ArrayBuffer[ProblematicSQLCase] = ArrayBuffer[ProblematicSQLCase]() + // Process all events processEvents() // Process all properties after all events are processed processAllProperties() + // Process SQL Plan Metrics after all events are processed + processSQLPlanMetrics() // Create Spark DataFrame(s) based on ArrayBuffer(s) arraybufferToDF() @@ -103,15 +189,12 @@ class ApplicationInfo( * Functions to process all the events */ def processEvents(): Unit = { - logInfo("Parsing Event Log File: " + eventlog) - // Convert a String to org.apache.hadoop.fs.Path - val uri = URI.create(eventlog) - val config = new Configuration() - val fs = FileSystem.get(uri, config) - val path = new Path(eventlog) + logInfo("Parsing Event Log File: " + eventlog.toString) + + val fs = FileSystem.get(eventlog.toUri,new Configuration()) var totalNumEvents = 0 - Utils.tryWithResource(EventLogFileReader.openEventLog(path, fs)) { in => + Utils.tryWithResource(EventLogFileReader.openEventLog(eventlog, fs)) { in => val lines = Source.fromInputStream(in)(Codec.UTF8).getLines().toList totalNumEvents = lines.size lines.foreach { line => @@ -155,6 +238,37 @@ class ApplicationInfo( } } + /** + * Function to process SQL Plan Metrics after all events are processed + */ + def processSQLPlanMetrics(): Unit ={ + for ((sqlID, planInfo) <- sqlPlan){ + val planGraph = SparkPlanGraph(planInfo) + // SQLPlanMetric is a case Class of + // (name: String,accumulatorId: Long,metricType: String) + val allnodes = planGraph.allNodes + for (node <- allnodes){ + // Firstly identify problematic SQLs if there is any + if (isProblematicPlan(node)){ + problematicSQL += ProblematicSQLCase(sqlID) + } + // Then process SQL plan metric type + for (metric <- node.metrics){ + val thisMetric = SQLPlanMetricsCase(sqlID,metric.name, + metric.accumulatorId,metric.metricType) + sqlPlanMetrics += thisMetric + val thisNode = PlanNodeAccumCase(sqlID, node.id, + node.name, node.desc, metric.accumulatorId) + planNodeAccum += thisNode + } + } + } + if (this.sqlPlanMetricsAdaptive.nonEmpty){ + logInfo(s"Merging ${sqlPlanMetricsAdaptive.size} SQL Metrics(Adaptive) for appID=$appId") + sqlPlanMetrics = sqlPlanMetrics.union(sqlPlanMetricsAdaptive).distinct + } + } + /** * 
Functions to convert ArrayBuffer to DataFrame * and then create a view for each of them @@ -165,7 +279,6 @@ class ApplicationInfo( // For resourceProfilesDF if (this.resourceProfiles.nonEmpty) { this.allDataFrames += (s"resourceProfilesDF_$index" -> this.resourceProfiles.toDF) - this.resourceProfiles.clear() } else { logWarning("resourceProfiles is empty!") } @@ -173,7 +286,6 @@ class ApplicationInfo( // For blockManagersDF if (this.blockManagers.nonEmpty) { this.allDataFrames += (s"blockManagersDF_$index" -> this.blockManagers.toDF) - this.blockManagers.clear() } else { logWarning("blockManagers is empty!") } @@ -189,7 +301,6 @@ class ApplicationInfo( // For propertiesDF if (this.allProperties.nonEmpty) { this.allDataFrames += (s"propertiesDF_$index" -> this.allProperties.toDF) - this.allProperties.clear() } else { logError("propertiesDF is empty! Existing...") System.exit(1) @@ -211,7 +322,6 @@ class ApplicationInfo( appStartNew += newApp } this.allDataFrames += (s"appDF_$index" -> appStartNew.toDF) - this.appStart.clear() } else { logError("Application is empty! Exiting...") System.exit(1) @@ -220,7 +330,6 @@ class ApplicationInfo( // For executorsDF if (this.executors.nonEmpty) { this.allDataFrames += (s"executorsDF_$index" -> this.executors.toDF) - this.executors.clear() } else { logError("executors is empty! Exiting...") System.exit(1) @@ -229,23 +338,153 @@ class ApplicationInfo( // For executorsRemovedDF if (this.executorsRemoved.nonEmpty) { this.allDataFrames += (s"executorsRemovedDF_$index" -> this.executorsRemoved.toDF) - this.executorsRemoved.clear() } else { logDebug("executorsRemoved is empty!") } + // For sqlDF + if (sqlStart.nonEmpty) { + val sqlStartNew: ArrayBuffer[SQLExecutionCase] = ArrayBuffer[SQLExecutionCase]() + for (res <- sqlStart) { + val thisEndTime = sqlEndTime.get(res.sqlID) + val durationResult = ProfileUtils.OptionLongMinusLong(thisEndTime, res.startTime) + val durationString = durationResult match { + case Some(i) => UIUtils.formatDuration(i) + case None => "" + } + val sqlExecutionNew = res.copy(endTime = thisEndTime, + duration = durationResult, + durationStr = durationString) + sqlStartNew += sqlExecutionNew + } + allDataFrames += (s"sqlDF_$index" -> sqlStartNew.toDF) + } else { + logInfo("No SQL Execution Found. Skipping generating SQL Execution DataFrame.") + } + + // For jobDF + if (jobStart.nonEmpty) { + val jobStartNew: ArrayBuffer[JobCase] = ArrayBuffer[JobCase]() + for (res <- jobStart) { + val thisEndTime = jobEndTime.get(res.jobID) + val durationResult = ProfileUtils.OptionLongMinusLong(thisEndTime, res.startTime) + val durationString = durationResult match { + case Some(i) => UIUtils.formatDuration(i) + case None => "" + } + + val jobNew = res.copy(endTime = thisEndTime, + duration = durationResult, + durationStr = durationString, + jobResult = Some(jobEndResult(res.jobID)), + failedReason = jobFailedReason(res.jobID) + ) + jobStartNew += jobNew + } + allDataFrames += (s"jobDF_$index" -> jobStartNew.toDF) + } else { + logError("No Job Found. 
Exiting.") + System.exit(1) + } + + // For stageDF + if (stageSubmitted.nonEmpty) { + val stageSubmittedNew: ArrayBuffer[StageCase] = ArrayBuffer[StageCase]() + for (res <- stageSubmitted) { + val thisEndTime = stageCompletionTime(res.stageId) + val thisFailureReason = stageFailureReason(res.stageId) + + val durationResult = ProfileUtils.optionLongMinusOptionLong(thisEndTime, res.submissionTime) + val durationString = durationResult match { + case Some(i) => UIUtils.formatDuration(i) + case None => "" + } + + val stageNew = res.copy(completionTime = thisEndTime, + failureReason = thisFailureReason, + duration = durationResult, + durationStr = durationString) + stageSubmittedNew += stageNew + } + allDataFrames += (s"stageDF_$index" -> stageSubmittedNew.toDF) + } else { + logError("No Stage Found. Exiting.") + System.exit(1) + } + + // For taskDF + if (taskEnd.nonEmpty) { + allDataFrames += (s"taskDF_$index" -> taskEnd.toDF) + } else { + logError("task is empty! Exiting...") + System.exit(1) + } + + // For sqlMetricsDF + if (sqlPlanMetrics.nonEmpty) { + logInfo(s"Total ${sqlPlanMetrics.size} SQL Metrics for appID=$appId") + allDataFrames += (s"sqlMetricsDF_$index" -> sqlPlanMetrics.toDF) + } else { + logInfo("No SQL Metrics Found. Skipping generating SQL Metrics DataFrame.") + } + + // For driverAccumDF + allDataFrames += (s"driverAccumDF_$index" -> driverAccum.toDF) + if (driverAccum.nonEmpty) { + logInfo(s"Total ${driverAccum.size} driver accums for appID=$appId") + } else { + logInfo("No Driver accum Found. Create an empty driver accum DataFrame.") + } + + // For taskStageAccumDF + allDataFrames += (s"taskStageAccumDF_$index" -> taskStageAccum.toDF) + if (taskStageAccum.nonEmpty) { + logInfo(s"Total ${taskStageAccum.size} task&stage accums for appID=$appId") + } else { + logInfo("No task&stage accums Found.Create an empty task&stage accum DataFrame.") + } + + // For planNodeAccumDF + allDataFrames += (s"planNodeAccumDF_$index" -> planNodeAccum.toDF) + if (planNodeAccum.nonEmpty) { + logInfo(s"Total ${planNodeAccum.size} Plan node accums for appID=$appId") + } else { + logInfo("No Plan node accums Found. Create an empty Plan node accums DataFrame.") + } + + // For problematicSQLDF + allDataFrames += (s"problematicSQLDF_$index" -> problematicSQL.toDF) + for ((name, df) <- this.allDataFrames) { df.createOrReplaceTempView(name) sparkSession.table(name).cache } } - // Function to run a query and show the result -- limit 1000 rows. - def runQuery(query: String, vertical: Boolean = false): DataFrame = { + // Function to drop all temp views of this application. + def dropAllTempViews(): Unit ={ + for ((name,_) <- this.allDataFrames) { + sparkSession.catalog.dropTempView(name+"_"+index) + } + // Clear all cached tables as well. + sparkSession.catalog.clearCache() + } + + // Function to run a query and print the result to the file. + // Limit to 1000 rows if the output number of rows is not mentioned in the command line. 
+ def runQuery( + query: String, + vertical: Boolean = false, + writeToFile: Boolean = false, + messageHeader: String = ""): DataFrame = { logDebug("Running:" + query) + val numRows = args.numOutputRows.getOrElse(1000) val df = sparkSession.sql(query) - logInfo("\n" + df.showString(1000, 0, vertical)) - fileWriter.write(df.showString(1000, 0, vertical)) + logInfo("\n" + df.showString(numRows, 0, vertical)) + if (writeToFile) { + fileWriter.write(messageHeader) + fileWriter.write(df.showString(numRows, 0, vertical)) + } df } @@ -311,4 +550,123 @@ class ApplicationInfo( |where source ='spark' |and key like 'spark.rapids%' |""".stripMargin + + // Function to generate the SQL string for aggregating task metrics columns. + def generateAggSQLString: String = { + var resultString = "" + + // Function to concat the Aggregation column string + // eg: ",\n round(sum(column),1) as column_sum" + def concatAggCol(col: String, aggType: String): Unit = { + val colString = "round(" + aggType + "(t." + col + ")" + ",1)" + resultString += ",\n" + colString + " as " + col + "_" + aggType + } + + for ((col, aggType) <- this.taskMetricsColumns) { + // If aggType=all, it means all 4 aggregation: sum, max, min, avg. + if (aggType == "all") { + concatAggCol(col, "sum") + concatAggCol(col, "max") + concatAggCol(col, "min") + concatAggCol(col, "avg") + } + else { + concatAggCol(col, aggType) + } + } + resultString + } + + // Function to generate a query for job level Task Metrics aggregation + def jobMetricsAggregationSQL: String = { + s"""select $index as appIndex, concat('job_',j.jobID) as ID, + |count(*) as numTasks, max(j.duration) as Duration + |$generateAggSQLString + |from taskDF_$index t, stageDF_$index s, jobDF_$index j + |where t.stageId=s.stageId + |and array_contains(j.stageIds, s.stageId) + |group by j.jobID + |""".stripMargin + } + + // Function to generate a query for stage level Task Metrics aggregation + def stageMetricsAggregationSQL: String = { + s"""select $index as appIndex, concat('stage_',s.stageId) as ID, + |count(*) as numTasks, max(s.duration) as Duration + |$generateAggSQLString + |from taskDF_$index t, stageDF_$index s + |where t.stageId=s.stageId + |group by s.stageId + |""".stripMargin + } + + // Function to generate a query for job+stage level Task Metrics aggregation + def jobAndStageMetricsAggregationSQL: String = { + jobMetricsAggregationSQL + " union " + stageMetricsAggregationSQL + } + + // Function to generate a query for SQL level Task Metrics aggregation + def sqlMetricsAggregationSQL: String = { + s"""select $index as appIndex, '$appId' as appID, + |sq.sqlID, sq.description, + |count(*) as numTasks, max(sq.duration) as Duration, + |round(sum(executorCPUTime)/sum(executorRunTime)*100,2) executorCPURatio + |$generateAggSQLString + |from taskDF_$index t, stageDF_$index s, + |jobDF_$index j, sqlDF_$index sq + |where t.stageId=s.stageId + |and array_contains(j.stageIds, s.stageId) + |and sq.sqlID=j.sqlID + |group by sq.sqlID,sq.description + |""".stripMargin + } + + // Function to generate a query for printing SQL metrics(accumulables) + def generateSQLAccums: String = { + s"""with allaccums as + |( + |select s.sqlID, p.nodeID, p.nodeName, + |s.accumulatorId, s.name, d.value, s.metricType + |from sqlMetricsDF_$index s, driverAccumDF_$index d, + |planNodeAccumDF_$index p + |where s.sqlID = d.sqlID and s.accumulatorId=d.accumulatorId + |and s.sqlID=p.sqlID and s.accumulatorId=p.accumulatorId + |union + |select s.sqlID, p.nodeID, p.nodeName, + |s.accumulatorId, s.name, t.value, 
s.metricType + |from jobDF_$index j, sqlDF_$index sq , + |taskStageAccumDF_$index t, sqlMetricsDF_$index s, + |planNodeAccumDF_$index p + |where array_contains(j.stageIds, t.stageId) + |and sq.sqlID=j.sqlID + |and s.sqlID = sq.sqlID + |and s.accumulatorId=t.accumulatorId + |and s.sqlID=p.sqlID and s.accumulatorId=p.accumulatorId + |) + |select sqlID, nodeID, nodeName, + |accumulatorId, name, max(value) as max_value, metricType + |from allaccums + |group by sqlID, nodeID, nodeName, accumulatorId, name, metricType + |order by sqlID, nodeID, nodeName, accumulatorId, name, metricType + |""".stripMargin + } + + // Function to generate a query for qualification + def qualificationSQL: String = { + s"""select $index as appIndex, '$appId' as appID, + |sq.sqlID, sq.description, + |sq.duration, m.executorCPURatio + |from sqlDF_$index sq , sqlAggMetricsDF m + |where $index = m.appIndex and sq.sqlID = m.sqlID + |and sq.sqlID not in (select sqlID from problematicSQLDF_$index) + |and sq.duration > 30000 + |and m.executorCPURatio > 30 + |""".stripMargin + } + + // Function to determine if a SparkPlanGraphNode could be problematic. + def isProblematicPlan(node: SparkPlanGraphNode): Boolean = { + node.name == "GpuColumnarToRow" || node.name == "GpuRowToColumnar" || + (node.desc matches ".*\\$Lambda\\$.*") + } } \ No newline at end of file diff --git a/rapids-4-spark-tools/src/main/scala/org/apache/spark/sql/rapids/tool/profiling/EventsProcessor.scala b/rapids-4-spark-tools/src/main/scala/org/apache/spark/sql/rapids/tool/profiling/EventsProcessor.scala index 4ce524e1005..837230ca513 100644 --- a/rapids-4-spark-tools/src/main/scala/org/apache/spark/sql/rapids/tool/profiling/EventsProcessor.scala +++ b/rapids-4-spark-tools/src/main/scala/org/apache/spark/sql/rapids/tool/profiling/EventsProcessor.scala @@ -16,11 +16,14 @@ package org.apache.spark.sql.rapids.tool.profiling +import scala.collection.JavaConverters._ + import com.nvidia.spark.rapids.tool.profiling._ import org.apache.spark.internal.Logging import org.apache.spark.resource.ResourceProfile import org.apache.spark.scheduler._ +import org.apache.spark.sql.execution.ui.{SparkListenerDriverAccumUpdates, SparkListenerSQLAdaptiveExecutionUpdate, SparkListenerSQLAdaptiveSQLMetricUpdates, SparkListenerSQLExecutionEnd, SparkListenerSQLExecutionStart} /** * This object is to process all events and do validation in the end. 
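The event handlers added in the next hunk are what ultimately feed the qualification decision defined above: per-task metrics roll up into the executorCPUTime/executorRunTime sums that qualificationSQL compares against its thresholds. As a plain restatement of those thresholds (a sketch only; the `isQualifiedSql` helper name is ours and does not exist in this patch):

```scala
// Restates the criteria encoded in Qualification.scala / qualificationSQL in this patch:
// the SQL must not appear in problematicSQLDF (GpuColumnarToRow / GpuRowToColumnar nodes
// or $Lambda$ expressions in the plan), must run longer than 30 seconds, and must spend
// more than 30% of executor run time on CPU.
def isQualifiedSql(
    hasProblematicNode: Boolean,
    durationMs: Long,
    executorCPURatio: Double): Boolean =
  !hasProblematicNode && durationMs > 30000 && executorCPURatio > 30
```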
@@ -55,6 +58,42 @@ object EventsProcessor extends Logging { case _: SparkListenerExecutorRemoved => doSparkListenerExecutorRemoved(app, event.asInstanceOf[SparkListenerExecutorRemoved]) + case _: SparkListenerTaskStart => + doSparkListenerTaskStart(app, + event.asInstanceOf[SparkListenerTaskStart]) + case _: SparkListenerTaskEnd => + doSparkListenerTaskEnd(app, + event.asInstanceOf[SparkListenerTaskEnd]) + case _: SparkListenerSQLExecutionStart => + doSparkListenerSQLExecutionStart(app, + event.asInstanceOf[SparkListenerSQLExecutionStart]) + case _: SparkListenerSQLExecutionEnd => + doSparkListenerSQLExecutionEnd(app, + event.asInstanceOf[SparkListenerSQLExecutionEnd]) + case _: SparkListenerDriverAccumUpdates => + doSparkListenerDriverAccumUpdates(app, + event.asInstanceOf[SparkListenerDriverAccumUpdates]) + case _: SparkListenerJobStart => + doSparkListenerJobStart(app, + event.asInstanceOf[SparkListenerJobStart]) + case _: SparkListenerJobEnd => + doSparkListenerJobEnd(app, + event.asInstanceOf[SparkListenerJobEnd]) + case _: SparkListenerStageSubmitted => + doSparkListenerStageSubmitted(app, + event.asInstanceOf[SparkListenerStageSubmitted]) + case _: SparkListenerStageCompleted => + doSparkListenerStageCompleted(app, + event.asInstanceOf[SparkListenerStageCompleted]) + case _: SparkListenerTaskGettingResult => + doSparkListenerTaskGettingResult(app, + event.asInstanceOf[SparkListenerTaskGettingResult]) + case _: SparkListenerSQLAdaptiveExecutionUpdate => + doSparkListenerSQLAdaptiveExecutionUpdate(app, + event.asInstanceOf[SparkListenerSQLAdaptiveExecutionUpdate]) + case _: SparkListenerSQLAdaptiveSQLMetricUpdates => + doSparkListenerSQLAdaptiveSQLMetricUpdates(app, + event.asInstanceOf[SparkListenerSQLAdaptiveSQLMetricUpdates]) case _ => doOtherEvent(app, event) } } @@ -180,6 +219,239 @@ object EventsProcessor extends Logging { app.executorsRemoved += thisExecutorRemoved } + def doSparkListenerTaskStart( + app: ApplicationInfo, + event: SparkListenerTaskStart): Unit = { + logDebug("Processing event: " + event.getClass) + app.taskStart += event + } + + def doSparkListenerTaskEnd( + app: ApplicationInfo, + event: SparkListenerTaskEnd): Unit = { + logDebug("Processing event: " + event.getClass) + + // Parse task accumulables + for (res <- event.taskInfo.accumulables) { + try { + val value = res.value.getOrElse("").toString.toLong + val thisMetric = TaskStageAccumCase( + event.stageId, event.stageAttemptId, Some(event.taskInfo.taskId), + res.id, res.name, Some(value), res.internal) + app.taskStageAccum += thisMetric + } catch { + case e: ClassCastException => + logWarning("ClassCastException when parsing accumulables for task " + + "stageID=" + event.stageId + ",taskId=" + event.taskInfo.taskId + + ": ") + logWarning(e.toString) + logWarning("The problematic accumulable is: name=" + + res.name + ",value=" + res.value) + } + } + + val thisTask = TaskCase( + event.stageId, + event.stageAttemptId, + event.taskType, + event.reason.toString, + event.taskInfo.taskId, + event.taskInfo.attemptNumber, + event.taskInfo.launchTime, + event.taskInfo.finishTime, + event.taskInfo.duration, + event.taskInfo.successful, + event.taskInfo.executorId, + event.taskInfo.host, + event.taskInfo.taskLocality.toString, + event.taskInfo.speculative, + event.taskInfo.gettingResultTime, + event.taskMetrics.executorDeserializeTime, + event.taskMetrics.executorDeserializeCpuTime / 1000000, + event.taskMetrics.executorRunTime, + event.taskMetrics.executorCpuTime / 1000000, + event.taskMetrics.peakExecutionMemory, + 
event.taskMetrics.resultSize, + event.taskMetrics.jvmGCTime, + event.taskMetrics.resultSerializationTime, + event.taskMetrics.memoryBytesSpilled, + event.taskMetrics.diskBytesSpilled, + event.taskMetrics.shuffleReadMetrics.remoteBlocksFetched, + event.taskMetrics.shuffleReadMetrics.localBlocksFetched, + event.taskMetrics.shuffleReadMetrics.fetchWaitTime, + event.taskMetrics.shuffleReadMetrics.remoteBytesRead, + event.taskMetrics.shuffleReadMetrics.remoteBytesReadToDisk, + event.taskMetrics.shuffleReadMetrics.localBytesRead, + event.taskMetrics.shuffleReadMetrics.totalBytesRead, + event.taskMetrics.shuffleWriteMetrics.bytesWritten, + event.taskMetrics.shuffleWriteMetrics.writeTime / 1000000, + event.taskMetrics.shuffleWriteMetrics.recordsWritten, + event.taskMetrics.inputMetrics.bytesRead, + event.taskMetrics.inputMetrics.recordsRead, + event.taskMetrics.outputMetrics.bytesWritten, + event.taskMetrics.outputMetrics.recordsWritten + ) + app.taskEnd += thisTask + } + + def doSparkListenerSQLExecutionStart( + app: ApplicationInfo, + event: SparkListenerSQLExecutionStart): Unit = { + logDebug("Processing event: " + event.getClass) + val sqlExecution = SQLExecutionCase( + event.executionId, + event.description, + event.details, + event.time, + None, + None, + "" + ) + app.sqlStart += sqlExecution + app.sqlPlan += (event.executionId -> event.sparkPlanInfo) + app.physicalPlanDescription += (event.executionId -> event.physicalPlanDescription) + } + + def doSparkListenerSQLExecutionEnd( + app: ApplicationInfo, + event: SparkListenerSQLExecutionEnd): Unit = { + logDebug("Processing event: " + event.getClass) + app.sqlEndTime += (event.executionId -> event.time) + } + + def doSparkListenerDriverAccumUpdates( + app: ApplicationInfo, + event: SparkListenerDriverAccumUpdates): Unit = { + logDebug("Processing event: " + event.getClass) + + val SparkListenerDriverAccumUpdates(sqlID, accumUpdates) = event + accumUpdates.foreach(accum => + app.driverAccum += DriverAccumCase(sqlID, accum._1, accum._2) + ) + logDebug("Current driverAccum has " + app.driverAccum.size + " accums.") + } + + def doSparkListenerJobStart( + app: ApplicationInfo, + event: SparkListenerJobStart): Unit = { + logDebug("Processing event: " + event.getClass) + val sqlIDString = event.properties.getProperty("spark.sql.execution.id") + val sqlID = ProfileUtils.stringToLong(sqlIDString) + val thisJob = JobCase( + event.jobId, + event.stageIds, + sqlID, + event.properties.asScala, + event.time, + None, + None, + "", + None, + "", + ProfileUtils.isGPUMode(event.properties.asScala) || app.gpuMode + ) + app.jobStart += thisJob + } + + def doSparkListenerJobEnd( + app: ApplicationInfo, + event: SparkListenerJobEnd): Unit = { + logDebug("Processing event: " + event.getClass) + app.jobEndTime += (event.jobId -> event.time) + + // Parse jobResult + val thisJobResult = event.jobResult match { + case JobSucceeded => "JobSucceeded" + case _: JobFailed => "JobFailed" + case _ => "Unknown" + } + app.jobEndResult += (event.jobId -> thisJobResult) + + val thisFailedReason = event.jobResult match { + case JobSucceeded => "" + case jobFailed: JobFailed => jobFailed.exception.toString + case _ => "" + } + app.jobFailedReason += (event.jobId -> thisFailedReason) + } + + def doSparkListenerStageSubmitted( + app: ApplicationInfo, + event: SparkListenerStageSubmitted): Unit = { + logDebug("Processing event: " + event.getClass) + val thisStage = StageCase( + event.stageInfo.stageId, + event.stageInfo.attemptNumber(), + event.stageInfo.name, + 
event.stageInfo.numTasks, + event.stageInfo.rddInfos.size, + event.stageInfo.parentIds, + event.stageInfo.details, + event.properties.asScala, + event.stageInfo.submissionTime, + None, + None, + None, + "", + ProfileUtils.isGPUMode(event.properties.asScala) || app.gpuMode + ) + app.stageSubmitted += thisStage + } + + def doSparkListenerStageCompleted( + app: ApplicationInfo, + event: SparkListenerStageCompleted): Unit = { + logDebug("Processing event: " + event.getClass) + app.stageCompletionTime += (event.stageInfo.stageId -> event.stageInfo.completionTime) + app.stageFailureReason += (event.stageInfo.stageId -> event.stageInfo.failureReason) + + // Parse stage accumulables + for (res <- event.stageInfo.accumulables) { + try { + val value = res._2.value.getOrElse("").toString.toLong + val thisMetric = TaskStageAccumCase( + event.stageInfo.stageId, event.stageInfo.attemptNumber(), + None, res._2.id, res._2.name, Some(value), res._2.internal) + app.taskStageAccum += thisMetric + } catch { + case e: ClassCastException => + logWarning("ClassCastException when parsing accumulables for task " + + "stageID=" + event.stageInfo.stageId + ": ") + logWarning(e.toString) + logWarning("The problematic accumulable is: name=" + + res._2.name + ",value=" + res._2.value) + } + } + } + + def doSparkListenerTaskGettingResult( + app: ApplicationInfo, + event: SparkListenerTaskGettingResult): Unit = { + logDebug("Processing event: " + event.getClass) + app.taskGettingResult += event + } + + def doSparkListenerSQLAdaptiveExecutionUpdate( + app: ApplicationInfo, + event: SparkListenerSQLAdaptiveExecutionUpdate): Unit = { + logDebug("Processing event: " + event.getClass) + // AQE plan can override the ones got from SparkListenerSQLExecutionStart + app.sqlPlan += (event.executionId -> event.sparkPlanInfo) + app.physicalPlanDescription += (event.executionId -> event.physicalPlanDescription) + } + + def doSparkListenerSQLAdaptiveSQLMetricUpdates( + app: ApplicationInfo, + event: SparkListenerSQLAdaptiveSQLMetricUpdates): Unit = { + logDebug("Processing event: " + event.getClass) + val SparkListenerSQLAdaptiveSQLMetricUpdates(sqlID, sqlPlanMetrics) = event + val metrics = sqlPlanMetrics.map { metric => + SQLPlanMetricsCase(sqlID, metric.name, + metric.accumulatorId, metric.metricType) + } + app.sqlPlanMetricsAdaptive ++= metrics + } + // To process all other unknown events def doOtherEvent(app: ApplicationInfo, event: SparkListenerEvent): Unit = { logInfo("Processing other event: " + event.getClass) diff --git a/rapids-4-spark-tools/src/main/scala/org/apache/spark/sql/rapids/tool/profiling/ToolUtils.scala b/rapids-4-spark-tools/src/main/scala/org/apache/spark/sql/rapids/tool/profiling/ToolUtils.scala index ba912effa36..b828fb0498c 100644 --- a/rapids-4-spark-tools/src/main/scala/org/apache/spark/sql/rapids/tool/profiling/ToolUtils.scala +++ b/rapids-4-spark-tools/src/main/scala/org/apache/spark/sql/rapids/tool/profiling/ToolUtils.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.rapids.tool.profiling import org.apache.spark.internal.config +import org.apache.spark.sql.DataFrame object ToolUtils { @@ -24,4 +25,8 @@ object ToolUtils { (properties.getOrElse(config.PLUGINS.key, "").contains("com.nvidia.spark.SQLPlugin") && properties.getOrElse("spark.rapids.sql.enabled", "true").toBoolean) } + + def showString(df: DataFrame, numRows: Int) = { + df.showString(numRows, 0) + } } diff --git a/rapids-4-spark-tools/src/test/scala/com/nvidia/spark/rapids/tool/profiling/ApplicationInfoSuite.scala 
b/rapids-4-spark-tools/src/test/scala/com/nvidia/spark/rapids/tool/profiling/ApplicationInfoSuite.scala index 4fec7577ee9..e5f1a480b29 100644 --- a/rapids-4-spark-tools/src/test/scala/com/nvidia/spark/rapids/tool/profiling/ApplicationInfoSuite.scala +++ b/rapids-4-spark-tools/src/test/scala/com/nvidia/spark/rapids/tool/profiling/ApplicationInfoSuite.scala @@ -44,13 +44,18 @@ class ApplicationInfoSuite extends FunSuite with Logging { var index: Int = 1 val eventlogPaths = appArgs.eventlog() for (path <- eventlogPaths) { - apps += new ApplicationInfo(appArgs, sparkSession, fileWriter, path, index) + apps += new ApplicationInfo(appArgs, sparkSession, fileWriter, + ProfileUtils.stringToPath(path)(0), index) index += 1 } assert(apps.size == 1) assert(apps.head.sparkVersion.equals("3.1.1")) assert(apps.head.gpuMode.equals(true)) - + assert(apps.head.jobStart(apps.head.index).jobID.equals(1)) + assert(apps.head.stageSubmitted(apps.head.index).numTasks.equals(1)) + assert(apps.head.stageSubmitted(2).stageId.equals(2)) + assert(apps.head.taskEnd(apps.head.index).successful.equals(true)) + assert(apps.head.taskEnd(apps.head.index).endReason.equals("Success")) } finally { fileWriter.close() tempFile.deleteOnExit()
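Taken together, the new ProfileArgs options and the compare path in ProfileMain can also be driven from code, in the same style as the README's first example. This is a sketch only: the flag spellings assume Scallop's default camelCase-to-kebab-case name derivation for the new `compare` and `numOutputRows` options, and the event log paths are placeholders.

```scala
// Compare two applications, capping each printed table at 100 rows and writing
// rapids_4_spark_tools_output.log under /tmp.
com.nvidia.spark.rapids.tool.profiling.ProfileMain.main(Array(
  "--compare",
  "--num-output-rows", "100",
  "-o", "/tmp",
  "/path/to/eventlog1", "/path/to/eventlog2"))
```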