Add code for generating dot file visualizations #2449

Merged (11 commits, merged May 26, 2021)
rapids-4-spark-tools/pom.xml (2 changes: 1 addition & 1 deletion)
@@ -132,7 +132,7 @@
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
-          <mainClass>org.apache.spark.sql.rapids.tool.profiling.ProfileMain</mainClass>
+          <mainClass>com.nvidia.spark.rapids.tool.profiling.ProfileMain</mainClass>
</transformer>
</transformers>
<createDependencyReducedPom>false</createDependencyReducedPom>
CollectInformation.scala (com.nvidia.spark.rapids.tool.profiling)
@@ -15,10 +15,16 @@
*/
package com.nvidia.spark.rapids.tool.profiling

import java.io.File
import java.util.concurrent.TimeUnit

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.rapids.tool.profiling.ApplicationInfo


/**
 * CollectInformation mainly prints information based on the event log,
 * such as executors, parameters, etc.
@@ -72,4 +78,39 @@ class CollectInformation(apps: ArrayBuffer[ApplicationInfo]) {
messageHeader = messageHeader)
}
}

def generateDot(): Unit = {
for (app <- apps) {
      val requiredDataFrames = Seq("sqlMetricsDF", "driverAccumDF", "taskStageAccumDF")
.map(name => s"${name}_${app.index}")
if (requiredDataFrames.forall(app.allDataFrames.contains)) {
val accums = app.runQuery(app.generateSQLAccums)
val start = System.nanoTime()
val accumSummary = accums
.select(col("sqlId"), col("accumulatorId"), col("max_value"))
.collect()
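        // Group the collected rows by SQL ID: sqlId -> [(accumulatorId, max_value)]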
        val map = new mutable.HashMap[Long, ArrayBuffer[(Long, Long)]]()
for (row <- accumSummary) {
val list = map.getOrElseUpdate(row.getLong(0), new ArrayBuffer[(Long, Long)]())
list += row.getLong(1) -> row.getLong(2)
}
val outDir = new File(app.args.outputDirectory())
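        // One subdirectory per query: <appId>-query-<sqlID>/<sqlID>.dot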
for ((sqlID, planInfo) <- app.sqlPlan) {
val fileDir = new File(outDir, s"${app.appId}-query-$sqlID")
fileDir.mkdirs()
val metrics = map.getOrElse(sqlID, Seq.empty).toMap
GenerateDot.generateDotGraph(
QueryPlanWithMetrics(planInfo, metrics), None, fileDir, sqlID + ".dot")
}
val duration = TimeUnit.SECONDS.convert(System.nanoTime() - start, TimeUnit.NANOSECONDS)
fileWriter.write(s"Generated DOT graphs for app ${app.appId} " +
s"to ${outDir.getAbsolutePath} in $duration second(s)\n")
} else {
val missingDataFrames = requiredDataFrames.filterNot(app.allDataFrames.contains)
fileWriter.write(s"Could not generate DOT graph for app ${app.appId} " +
s"because of missing data frames: ${missingDataFrames.mkString(", ")}\n")
}
}
}
}
GenerateDot.scala (new file, com.nvidia.spark.rapids.tool.profiling)
@@ -0,0 +1,198 @@
/*
* Copyright (c) 2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.nvidia.spark.rapids.tool.profiling

import java.io.{File, FileWriter}
import java.util.concurrent.TimeUnit

import org.apache.spark.sql.execution.SparkPlanInfo
import org.apache.spark.sql.execution.metric.SQLMetricInfo

/**
 * Generate a DOT graph for one query plan, or show the differences between two query plans.
*
* Diff mode is intended for comparing query plans that are expected to have the same
* structure, such as two different runs of the same query but with different tuning options.
*
 * When running in diff mode, any differences in SQL metrics are shown. Also, if the plans
 * start to diverge, the graph shows where they diverge and does not recurse
 * further.
*
* Graphviz and other tools can be used to generate images from DOT files.
*
* See https://graphviz.org/pdf/dotguide.pdf for a description of DOT files.
*/
object GenerateDot {
private val GPU_COLOR = "#76b900" // NVIDIA Green
private val CPU_COLOR = "#0071c5"
private val TRANSITION_COLOR = "red"

/**
* Generate a query plan visualization in dot format.
*
* @param plan First query plan and metrics
* @param comparisonPlan Optional second query plan and metrics
 * @param dir Directory to write the dot file into
 * @param filename Filename to write the dot graph to
 * @param includeCodegen Include WholeStageCodegen and InputAdapter nodes, if true
 */

Review thread (resolved):

Collaborator: why would we not want these?

Contributor Author:
> I assume this would be pretty hard to test as far as comparing to the sql ui to make sure we are getting the same thing?
For testing, we could generate a dot file from a plan that we read from an event log, and then confirm that we see some expected nodes. I think comparing to the Spark UI functionality would be tricky.

andygrove (Contributor Author), May 19, 2021:
> why would we not want these?
The original version excluded them, then I made it optional. I don't have a strong opinion on whether it is worth maintaining this option. It would probably have been better just to have a subgraph for the WholeStageCodegen part, like Spark does. I could look at this as a follow-on PR.

Collaborator: So for my understanding: if this is true it would include all nodes in the WholeStageCodegen, but if it's false it just has one box saying WholeStageCodegen without any details?

Contributor Author: No, all of the operators are always included. If this option is true then we also show separate nodes for InputAdapter and WholeStageCodegen. If false, we remove those nodes.

Contributor Author: I looked at this some more and I do think it would make sense to remove this option now. We want to see the duration metric for WholeStageCodegen.

Contributor Author: I pushed a commit to remove the option.

Collaborator: Need to remove the javadoc for includeCodegen.
def generateDotGraph(
plan: QueryPlanWithMetrics,
comparisonPlan: Option[QueryPlanWithMetrics],
dir: File,
filename: String): Unit = {

var nextId = 1

def isGpuPlan(plan: SparkPlanInfo): Boolean = {
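      // QueryStage and ReusedExchange are wrapper nodes: classify them by their
      // first child. Everything else is classified by the "Gpu" name prefix.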
plan.nodeName match {
case name if name contains "QueryStage" =>
plan.children.isEmpty || isGpuPlan(plan.children.head)
case name if name == "ReusedExchange" =>
plan.children.isEmpty || isGpuPlan(plan.children.head)
case name =>
name.startsWith("Gpu")
}
}

def formatMetric(m: SQLMetricInfo, value: Long): String = {
val formatter = java.text.NumberFormat.getIntegerInstance
m.metricType match {
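        // "timing" metrics are already in milliseconds; "nsTiming" metrics arrive in
        // nanoseconds and are converted to milliseconds before formatting.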
case "timing" =>
val ms = value
s"${formatter.format(ms)} ms"
case "nsTiming" =>
val ms = TimeUnit.NANOSECONDS.toMillis(value)
s"${formatter.format(ms)} ms"
case _ =>
s"${formatter.format(value)}"
}
}

Review thread (resolved):

Collaborator: Consider splitting writeGraph into smaller functions. It looks too long.

Contributor Author: I made one small improvement here but don't want to go too far, because I can't actually test any of this code in this repo until other PRs are merged in.

    /** Recursively graph the operator nodes in the Spark plan */
    def writeGraph(
w: FileWriter,
node: QueryPlanWithMetrics,
comparisonNode: QueryPlanWithMetrics,
id: Int = 0): Unit = {

val nodePlan = node.plan
val comparisonPlan = comparisonNode.plan
if (nodePlan.nodeName == comparisonPlan.nodeName &&
nodePlan.children.length == comparisonPlan.children.length) {

val metricNames = (nodePlan.metrics.map(_.name) ++
comparisonPlan.metrics.map(_.name)).distinct.sorted

val metrics = metricNames.flatMap(name => {
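        // Only metrics with recorded values in both plans are rendered: equal values
        // are printed once, differing values side by side (with a percent delta for
        // timing metrics).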
val l = nodePlan.metrics.find(_.name == name)
val r = comparisonPlan.metrics.find(_.name == name)
(l, r) match {
case (Some(metric1), Some(metric2)) =>
(node.metrics.get(metric1.accumulatorId),
                comparisonNode.metrics.get(metric2.accumulatorId)) match {
case (Some(value1), Some(value2)) =>
if (value1 == value2) {
Some(s"$name: ${formatMetric(metric1, value1)}")
} else {
metric1.metricType match {
case "nsTiming" | "timing" =>
val pctStr = createPercentDiffString(value1, value2)
Some(s"$name: ${formatMetric(metric1, value1)} / " +
s"${formatMetric(metric2, value2)} ($pctStr %)")
case _ =>
Some(s"$name: ${formatMetric(metric1, value1)} / " +
s"${formatMetric(metric2, value2)}")
}
}
case _ => None
}
case _ => None
}
}).mkString("\n")

val color = if (isGpuPlan(nodePlan)) { GPU_COLOR } else { CPU_COLOR }

val label = if (nodePlan.nodeName.contains("QueryStage")) {
nodePlan.simpleString
} else {
nodePlan.nodeName
}

val nodeText =
s"""node$id [shape=box,color="$color",style="filled",
|label = "$label\n
|$metrics"];
|""".stripMargin

w.write(nodeText)
nodePlan.children.indices.foreach(i => {
val childId = nextId
nextId += 1
writeGraph(
w,
QueryPlanWithMetrics(nodePlan.children(i), node.metrics),
QueryPlanWithMetrics(comparisonPlan.children(i), comparisonNode.metrics),
          childId)

val style = (isGpuPlan(nodePlan), isGpuPlan(nodePlan.children(i))) match {
case (true, true) => s"""color="$GPU_COLOR""""
case (false, false) => s"""color="$CPU_COLOR""""
case _ =>
// show emphasis on transitions between CPU and GPU
s"color=$TRANSITION_COLOR, style=bold"
}
w.write(s"node$childId -> node$id [$style];\n")
})
} else {
// plans have diverged - cannot recurse further
w.write(
s"""node$id [shape=box, color=red,
|label = "plans diverge here:
|${nodePlan.nodeName} vs ${comparisonPlan.nodeName}"];\n""".stripMargin)
}
}

// write the dot graph to a file
val file = new File(dir, filename)
println(s"Writing ${file.getAbsolutePath}")
val w = new FileWriter(file)
try {
w.write("digraph G {\n")
writeGraph(w, plan, comparisonPlan.getOrElse(plan), 0)
w.write("}\n")
} finally {
w.close()
}
}

private def createPercentDiffString(n1: Long, n2: Long) = {
val pct = (n2 - n1) * 100.0 / n1
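    // e.g. n1 = 200, n2 = 300 => "+50.0" (the comparison value is 50% larger)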
val pctStr = if (pct < 0) {
f"$pct%.1f"
} else {
f"+$pct%.1f"
}
pctStr
}
}

/**
* Query plan with metrics.
*
* @param plan Query plan.
* @param metrics Map of accumulatorId to metric.
*/
case class QueryPlanWithMetrics(plan: SparkPlanInfo, metrics: Map[Long, Long])
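For orientation, a hand-written sketch of the DOT output shape that generateDotGraph emits for a two-node plan, assembled from the format strings above; the operator names and metric values are illustrative, not taken from a real event log:

    digraph G {
    node0 [shape=box,color="#0071c5",style="filled",
    label = "Exchange

    shuffle write time: 12 ms"];
    node1 [shape=box,color="#76b900",style="filled",
    label = "GpuFilter

    number of output rows: 1,000"];
    node1 -> node0 [color=red, style=bold];
    }

The child-to-parent edge is bold red because it crosses a GPU/CPU boundary; edges between nodes with the same placement reuse that placement's color.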
ProfileArgs.scala (com.nvidia.spark.rapids.tool.profiling)
@@ -59,5 +59,9 @@ For usage see below:
val numOutputRows: ScallopOption[Int] =
opt[Int](required = false,
descr = "Number of output rows for each Application. Default is 1000")
+  val generateDot: ScallopOption[Boolean] =
+    opt[Boolean](required = false,
+      descr = "Generate query visualizations in DOT format. Default is false")

verify()
}
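A hedged sketch of how the new option is consumed. This assumes Scallop's usual derivation of a --generate-dot flag from the generateDot option name, and that the event log path is accepted as a trailing argument; neither detail is shown in this diff:

    // Hypothetical invocation, for illustration only.
    val appArgs = new ProfileArgs(Array("--generate-dot", "/path/to/eventlog"))
    // generateDot() is false unless the flag was supplied on the command line.
    if (appArgs.generateDot()) {
      println("DOT generation requested")
    }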
ProfileMain.scala (com.nvidia.spark.rapids.tool.profiling)
@@ -17,32 +17,41 @@
package com.nvidia.spark.rapids.tool.profiling

import java.io.FileWriter

import scala.collection.mutable.ArrayBuffer

import org.apache.hadoop.fs.Path

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.rapids.tool.profiling._

/**
* A profiling tool to parse Spark Event Log
* This is the Main function.
*/
object ProfileMain extends Logging {
/**
* Entry point from spark-submit running this as the driver.
*/
def main(args: Array[String]) {
val sparkSession = ProfileUtils.createSparkSession
val exitCode = mainInternal(sparkSession, new ProfileArgs(args))
if (exitCode != 0) {
System.exit(exitCode)
}
}

/**
* Entry point for tests
*/
def mainInternal(sparkSession: SparkSession, appArgs: ProfileArgs): Int = {

// This tool's output log file name
val logFileName = "rapids_4_spark_tools_output.log"

// Parsing args
-    val appArgs = new ProfileArgs(args)
val eventlogPaths = appArgs.eventlog()
val outputDirectory = appArgs.outputDirectory().stripSuffix("/")

// Create the FileWriter and sparkSession used for ALL Applications.
val fileWriter = new FileWriter(s"$outputDirectory/$logFileName")
-    val sparkSession = ProfileUtils.createSparkSession
logInfo(s"Output directory: $outputDirectory")

// Convert the input path string to Path(s)
@@ -68,9 +77,9 @@ object ProfileMain extends Logging {
    // Exit if there are no applications to process.
    if (apps.isEmpty) {
      logInfo("No application to process. Exiting")
-      System.exit(0)
+      return 0
}
-    processApps(apps)
+    processApps(apps, generateDot = false)
// Show the application Id <-> appIndex mapping.
for (app <- apps) {
logApplicationInfo(app)
@@ -84,7 +93,7 @@
val app = new ApplicationInfo(appArgs, sparkSession, fileWriter, path, index)
apps += app
logApplicationInfo(app)
-      processApps(apps)
+      processApps(apps, appArgs.generateDot())
app.dropAllTempViews()
index += 1
}
@@ -100,7 +109,7 @@
* evaluated at once and the output is one row per application. Else each eventlog is parsed one
* at a time.
*/
-  def processApps(apps: ArrayBuffer[ApplicationInfo]): Unit = {
+  def processApps(apps: ArrayBuffer[ApplicationInfo], generateDot: Boolean): Unit = {
if (appArgs.compare()) { // Compare Applications
logInfo(s"### A. Compare Information Collected ###")
val compare = new CompareApplications(apps)
@@ -113,6 +122,9 @@
collect.printAppInfo()
collect.printExecutorInfo()
collect.printRapidsProperties()
+      if (generateDot) {
+        collect.generateDot()
+      }
}

logInfo(s"### B. Analysis ###")
@@ -133,5 +145,7 @@
logInfo(s"============== ${app.appId} (index=${app.index}) ==============")
logInfo("========================================================================")
}

+    0
}
}
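A minimal sketch of driving the tool from a test through the new mainInternal entry point; the local-mode SparkSession settings and the paths are assumptions for illustration:

    import org.apache.spark.sql.SparkSession

    // mainInternal returns an exit code instead of calling System.exit,
    // which is what makes it callable from tests.
    val sparkSession = SparkSession.builder()
      .master("local[*]")
      .appName("profile-tool-test")
      .getOrCreate()
    val appArgs = new ProfileArgs(Array("--generate-dot", "/path/to/eventlog"))
    val exitCode = ProfileMain.mainInternal(sparkSession, appArgs)
    assert(exitCode == 0)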