Adding additional functionalities to profiling tool (#2469)
* Adding additional functionalities to profiling tool

Signed-off-by: Niranjan Artal <[email protected]>

* addressed review comments

Signed-off-by: Niranjan Artal <[email protected]>
nartal1 authored May 24, 2021
1 parent a813edf commit 22a0ad6
Showing 13 changed files with 1,100 additions and 74 deletions.
14 changes: 7 additions & 7 deletions rapids-4-spark-tools/README.md
@@ -25,30 +25,30 @@ org.apache.spark.sql.rapids.tool.profiling.ProfileMain.main(Array("/path/to/even

## How to compile and use from command-line
1. `mvn clean package`
-2. `cd $SPARK_HOME (Download Apache Spark if reequired)`
-3. `./bin/spark-submit --class org.apache.spark.sql.rapids.tool.profiling.ProfileMain <Spark-Rapids-Repo>/workload_profiling/target/rapids-4-spark-tools-<version>.jar /path/to/eventlog1`
+2. `cd $SPARK_HOME (Download Apache Spark if required)`
+3. `./bin/spark-submit --class org.apache.spark.sql.rapids.tool.profiling.ProfileMain <Spark-Rapids-Repo>/rapids-4-spark-tools/target/rapids-4-spark-tools-<version>.jar /path/to/eventlog1`

## Options
```
-$ ./bin/spark-submit --class org.apache.spark.sql.rapids.tool.profiling.ProfileMain <Spark-Rapids-Repo>/workload_profiling/target/rapids-4-spark-tools-<version>.jar --help
+$ ./bin/spark-submit --class org.apache.spark.sql.rapids.tool.profiling.ProfileMain <Spark-Rapids-Repo>/rapids-4-spark-tools/target/rapids-4-spark-tools-<version>.jar --help
Spark event log profiling tool
Example:
# Input 1 or more event logs from local path:
-./bin/spark-submit --class com.nvidia.spark.rapids.tool.profiling.ProfileMain <Spark-Rapids-Repo>/workload_profiling/target/rapids-4-spark-tools-<version>.jar /path/to/eventlog1 /path/to/eventlog2
+./bin/spark-submit --class com.nvidia.spark.rapids.tool.profiling.ProfileMain <Spark-Rapids-Repo>/rapids-4-spark-tools/target/rapids-4-spark-tools-<version>.jar /path/to/eventlog1 /path/to/eventlog2
# If event log is from S3:
export AWS_ACCESS_KEY_ID=xxx
export AWS_SECRET_ACCESS_KEY=xxx
-./bin/spark-submit --class com.nvidia.spark.rapids.tool.profiling.ProfileMain <Spark-Rapids-Repo>/workload_profiling/target/rapids-4-spark-tools-<version>.jar s3a://<BUCKET>/eventlog1 /path/to/eventlog2
+./bin/spark-submit --class com.nvidia.spark.rapids.tool.profiling.ProfileMain <Spark-Rapids-Repo>/rapids-4-spark-tools/target/rapids-4-spark-tools-<version>.jar s3a://<BUCKET>/eventlog1 /path/to/eventlog2
# If event log is from hdfs:
-./bin/spark-submit --class com.nvidia.spark.rapids.tool.profiling.ProfileMain <Spark-Rapids-Repo>/workload_profiling/target/rapids-4-spark-tools-<version>.jar hdfs://<BUCKET>/eventlog1 /path/to/eventlog2
+./bin/spark-submit --class com.nvidia.spark.rapids.tool.profiling.ProfileMain <Spark-Rapids-Repo>/rapids-4-spark-tools/target/rapids-4-spark-tools-<version>.jar hdfs://<BUCKET>/eventlog1 /path/to/eventlog2
# Change output directory to /tmp. It outputs as "rapids_4_spark_tools_output.log" in the local directory if the output directory is not specified.
-./bin/spark-submit --class com.nvidia.spark.rapids.tool.profiling.ProfileMain <Spark-Rapids-Repo>/workload_profiling/target/rapids-4-spark-tools-<version>.jar -o /tmp /path/to/eventlog1
+./bin/spark-submit --class com.nvidia.spark.rapids.tool.profiling.ProfileMain <Spark-Rapids-Repo>/rapids-4-spark-tools/target/rapids-4-spark-tools-<version>.jar -o /tmp /path/to/eventlog1
For usage see below:
-o, --output-directory <arg> Output directory. Default is current directory
@@ -0,0 +1,124 @@


/*
* Copyright (c) 2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.tool.profiling

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.rapids.tool.profiling._

/**
 * Performs analysis on the DataFrames built from each ApplicationInfo object.
 */
class Analysis(apps: ArrayBuffer[ApplicationInfo]) {

require(apps.nonEmpty)
private val fileWriter = apps.head.fileWriter

// Job Level TaskMetrics Aggregation
def jobMetricsAggregation(): Unit = {
if (apps.size == 1) {
fileWriter.write("Job level aggregated task metrics:")
apps.head.runQuery(apps.head.jobMetricsAggregationSQL + " order by Duration desc")
} else {
var query = ""
for (app <- apps) {
if (query.isEmpty) {
query += app.jobMetricsAggregationSQL
} else {
query += " union " + app.jobMetricsAggregationSQL
}
}
fileWriter.write("Job level aggregated task metrics:")
apps.head.runQuery(query + " order by appIndex, Duration desc")
}
}

// Stage Level TaskMetrics Aggregation
def stageMetricsAggregation(): Unit = {
if (apps.size == 1) {
fileWriter.write("Stage level aggregated task metrics:")
apps.head.runQuery(apps.head.stageMetricsAggregationSQL + " order by Duration desc")
} else {
var query = ""
for (app <- apps) {
if (query.isEmpty) {
query += app.stageMetricsAggregationSQL
} else {
query += " union " + app.stageMetricsAggregationSQL
}
}
fileWriter.write("Stage level aggregated task metrics:")
apps.head.runQuery(query + " order by appIndex, Duration desc")
}
}

// Job + Stage Level TaskMetrics Aggregation
def jobAndStageMetricsAggregation(): Unit = {
if (apps.size == 1) {
val messageHeader = "Job + Stage level aggregated task metrics:"
apps.head.runQuery(apps.head.jobAndStageMetricsAggregationSQL + " order by Duration desc")
} else {
var query = ""
for (app <- apps) {
if (query.isEmpty) {
query += app.jobAndStageMetricsAggregationSQL
} else {
query += " union " + app.jobAndStageMetricsAggregationSQL
}
}
fileWriter.write("Job + Stage level aggregated task metrics:")
apps.head.runQuery(query + " order by appIndex, Duration desc")
}
}

// SQL Level TaskMetrics Aggregation (only when SQL exists)
def sqlMetricsAggregation(): DataFrame = {
if (apps.size == 1) {
if (apps.head.allDataFrames.contains(s"sqlDF_${apps.head.index}")) {
val messageHeader = "SQL level aggregated task metrics:"
apps.head.runQuery(apps.head.sqlMetricsAggregationSQL + " order by Duration desc")
} else {
apps.head.sparkSession.emptyDataFrame
}
} else {
var query = ""
val appsWithSQL = apps.filter(p => p.allDataFrames.contains(s"sqlDF_${p.index}"))
for (app <- appsWithSQL) {
if (query.isEmpty) {
query += app.sqlMetricsAggregationSQL
} else {
query += " union " + app.sqlMetricsAggregationSQL
}
}
val messageHeader = "SQL level aggregated task metrics:"
apps.head.runQuery(query + " order by appIndex, Duration desc")
}
}

// custom query execution. Normally for debugging use.
def customQueryExecution(app: ApplicationInfo): Unit = {
fileWriter.write("Custom query execution:")
val customQuery =
s"""select stageId from stageDF_${app.index} limit 1
|""".stripMargin
app.runQuery(customQuery)
}
}
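For context, here is a minimal usage sketch of the new `Analysis` class (not part of the commit). It assumes the profiling tool has already parsed the event logs into `ApplicationInfo` objects; the `apps` placeholder below is hypothetical.

```scala
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.rapids.tool.profiling.ApplicationInfo

import com.nvidia.spark.rapids.tool.profiling.Analysis

// Hypothetical: assumed to be populated by the tool's event-log parsing step.
val apps: ArrayBuffer[ApplicationInfo] = ???

val analysis = new Analysis(apps)
analysis.jobMetricsAggregation()          // job-level aggregated task metrics
analysis.stageMetricsAggregation()        // stage-level aggregated task metrics
analysis.jobAndStageMetricsAggregation()  // combined job + stage aggregation
val sqlMetrics = analysis.sqlMetricsAggregation()  // empty DataFrame when no SQL executions exist
```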
@@ -44,4 +44,115 @@ case class ExecutorCase(
executorID: String, host: String, totalCores: Int, resourceProfileId: Int)

case class ExecutorRemovedCase(
-    executorID: String, reason: String, time: Long)
+    executorID: String,
+    reason: String,
+    time: Long)

case class SQLExecutionCase(
sqlID: Long,
description: String,
details: String,
startTime: Long,
endTime: Option[Long],
duration: Option[Long],
durationStr: String)

case class SQLPlanMetricsCase(
sqlID: Long,
name: String,
accumulatorId: Long,
metricType: String)

case class PlanNodeAccumCase(
sqlID: Long,
nodeID: Long,
nodeName:String,
nodeDesc: String,
accumulatorId: Long)

case class DriverAccumCase(
sqlID: Long,
accumulatorId: Long,
value: Long)

case class TaskStageAccumCase(
stageId: Int,
attemptId: Int,
taskId: Option[Long],
accumulatorId: Long,
name: Option[String],
value: Option[Long],
isInternal: Boolean)

case class JobCase(
jobID: Int,
stageIds: Seq[Int],
sqlID: Option[Long],
properties: scala.collection.Map[String, String],
startTime: Long,
endTime: Option[Long],
jobResult: Option[String],
failedReason: String,
duration: Option[Long],
durationStr: String,
gpuMode: Boolean)

case class StageCase(
stageId: Int,
attemptId: Int,
name: String,
numTasks: Int,
numRDD: Int,
parentIds: Seq[Int],
details: String,
properties: scala.collection.Map[String, String],
submissionTime: Option[Long],
completionTime: Option[Long],
failureReason: Option[String],
duration: Option[Long],
durationStr: String, gpuMode: Boolean)

// Note: sr = Shuffle Read; sw = Shuffle Write
// Totally 39 columns
case class TaskCase(
stageId: Int,
stageAttemptId: Int,
taskType: String,
endReason: String,
taskId: Long,
attempt: Int,
launchTime: Long,
finishTime: Long,
duration: Long,
successful: Boolean,
executorId: String,
host: String,
taskLocality: String,
speculative: Boolean,
gettingResultTime: Long,
executorDeserializeTime: Long,
executorDeserializeCPUTime: Long,
executorRunTime: Long,
executorCPUTime: Long,
peakExecutionMemory: Long,
resultSize: Long,
jvmGCTime: Long,
resultSerializationTime: Long,
memoryBytesSpilled: Long,
diskBytesSpilled: Long,
sr_remoteBlocksFetched: Long,
sr_localBlocksFetched: Long,
sr_fetchWaitTime: Long,
sr_remoteBytesRead: Long,
sr_remoteBytesReadToDisk: Long,
sr_localBytesRead: Long,
sr_totalBytesRead: Long,
sw_bytesWritten: Long,
sw_writeTime: Long,
sw_recordsWritten: Long,
input_bytesRead: Long,
input_recordsRead: Long,
output_bytesWritten: Long,
output_recordsWritten: Long)

case class ProblematicSQLCase(sqlID: Long)
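For orientation, a hypothetical record built from the new case classes (field names and types come from the definitions above; the values are made up):

```scala
// A made-up accumulator update of the shape the profiling tool records per stage/task.
val accum = TaskStageAccumCase(
  stageId = 3,
  attemptId = 0,
  taskId = Some(42L),
  accumulatorId = 1001L,
  name = Some("internal.metrics.executorRunTime"),
  value = Some(1500L),
  isInternal = true)

// SQL executions flagged as problematic are tracked by ID only.
val problematic = ProblematicSQLCase(sqlID = 7L)
```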
@@ -30,9 +30,9 @@ class CollectInformation(apps: ArrayBuffer[ApplicationInfo]) {

// Print Application Information
def printAppInfo(): Unit = {
fileWriter.write("Application Information:\n")
val messageHeader = "Application Information:\n"
for (app <- apps) {
-      app.runQuery(app.generateAppInfo)
+      app.runQuery(query = app.generateAppInfo, writeToFile = true, messageHeader = messageHeader)
}
}

@@ -57,17 +57,19 @@ class CollectInformation(apps: ArrayBuffer[ApplicationInfo]) {

// Print executor related information
def printExecutorInfo(): Unit = {
fileWriter.write("\n\nExecutor Information:\n")
val messageHeader = "\n\nExecutor Information:\n"
for (app <- apps) {
-      app.runQuery(app.generateExecutorInfo + " order by cast(executorID as long)")
+      app.runQuery(query = app.generateExecutorInfo + " order by cast(executorID as long)",
+        writeToFile = true, messageHeader = messageHeader)
}
}

// Print Rapids related Spark Properties
def printRapidsProperties(): Unit = {
fileWriter.write("\n\nSpark Rapids parameters set explicitly:\n")
val messageHeader = "\n\nSpark Rapids parameters set explicitly:\n"
for (app <- apps) {
-      app.runQuery(app.generateRapidsProperties + " order by key")
+      app.runQuery(query = app.generateRapidsProperties + " order by key", writeToFile = true,
+        messageHeader = messageHeader)
}
}
}
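A short sketch (not part of the commit) of how the reworked `CollectInformation` printing is driven, again assuming `apps` holds the parsed `ApplicationInfo` objects:

```scala
val collect = new CollectInformation(apps)
collect.printAppInfo()           // application information
collect.printExecutorInfo()      // executor information, ordered by executor ID
collect.printRapidsProperties()  // explicitly-set Spark RAPIDS properties
```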
@@ -30,7 +30,7 @@ class CompareApplications(apps: ArrayBuffer[ApplicationInfo]) extends Logging {

// Compare the App Information.
def compareAppInfo(): Unit = {
fileWriter.append("Compare Application Information:\n")
val messageHeader = "Compare Application Information:\n"
var query = ""
var i = 1
for (app <- apps) {
@@ -42,12 +42,12 @@ class CompareApplications(apps: ArrayBuffer[ApplicationInfo]) extends Logging {
}
i += 1
}
-    apps.head.runQuery(query)
+    apps.head.runQuery(query = query, writeToFile = true, messageHeader = messageHeader)
}

// Compare Executors information
def compareExecutorInfo(): Unit = {
fileWriter.write("\n\nCompare Executor Information:\n")
val messageHeader = "\n\nCompare Executor Information:\n"
var query = ""
var i = 1
for (app <- apps) {
@@ -59,12 +59,12 @@ class CompareApplications(apps: ArrayBuffer[ApplicationInfo]) extends Logging {
}
i += 1
}
-    apps.head.runQuery(query)
+    apps.head.runQuery(query = query, writeToFile = true, messageHeader = messageHeader)
}

// Compare Rapids Properties which are set explicitly
def compareRapidsProperties(): Unit ={
fileWriter.write("\n\nCompare Rapids Properties which are set explicitly:\n")
val messageHeader = "\n\nCompare Rapids Properties which are set explicitly:\n"
var withClauseAllKeys = "with allKeys as \n ("
val selectKeyPart = "select allKeys.key"
var selectValuePart = ""
@@ -97,6 +97,6 @@ class CompareApplications(apps: ArrayBuffer[ApplicationInfo]) extends Logging {
query = withClauseAllKeys + selectKeyPart + selectValuePart +
" from (\n" + query + "\n) order by key"
logDebug("Running query " + query)
-    apps.head.runQuery(query)
+    apps.head.runQuery(query = query, writeToFile = true, messageHeader = messageHeader)
}
}
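And a similar hedged sketch for the comparison path across two or more applications (assuming the same `apps` collection):

```scala
val compare = new CompareApplications(apps)
compare.compareAppInfo()           // side-by-side application information
compare.compareExecutorInfo()      // side-by-side executor information
compare.compareRapidsProperties()  // explicitly-set RAPIDS properties across apps
```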