Profiling tool: Print potential problems #3933

Merged · 3 commits · Oct 27, 2021
10 changes: 5 additions & 5 deletions docs/spark-profiling-tool.md
@@ -386,11 +386,11 @@ SQL level aggregated task metrics:

```
SQL Duration and Executor CPU Time Percent
+--------+------------------------------+-----+------------+--------------------------+------------+------------------+-------------------------+
|appIndex|App ID |sqlID|SQL Duration|Contains Dataset or RDD Op|App Duration|Potential Problems|Executor CPU Time Percent|
+--------+------------------------------+-----+------------+--------------------------+------------+------------------+-------------------------+
|1 |application_1603128018386_7759|0 |11042 |false |119990 |null |68.48 |
+--------+------------------------------+-----+------------+--------------------------+------------+------------------+-------------------------+
+--------+-------------------+-----+------------+--------------------------+------------+---------------------------+-------------------------+
|appIndex|App ID |sqlID|SQL Duration|Contains Dataset or RDD Op|App Duration|Potential Problems |Executor CPU Time Percent|
+--------+-------------------+-----+------------+--------------------------+------------+---------------------------+-------------------------+
|1 |local-1626104300434|0 |1260 |false |131104 |DECIMAL:NESTED COMPLEX TYPE|92.65 |
|1 |local-1626104300434|1 |259 |false |131104 |DECIMAL:NESTED COMPLEX TYPE|76.79 |
```

- Shuffle Skew Check:
@@ -318,7 +318,6 @@ class Analysis(apps: Seq[ApplicationInfo]) {
def sqlMetricsAggregationDurationAndCpuTime(): Seq[SQLDurationExecutorTimeProfileResult] = {
val allRows = apps.flatMap { app =>
app.sqlIdToInfo.map { case (sqlId, sqlCase) =>
// Potential problems not properly track, add it later
SQLDurationExecutorTimeProfileResult(app.index, app.appId, sqlId, sqlCase.duration,
sqlCase.hasDatasetOrRDD, app.appInfo.duration, sqlCase.problematic,
sqlCase.sqlCpuTimePercent)
139 changes: 138 additions & 1 deletion tools/src/main/scala/org/apache/spark/sql/rapids/tool/AppBase.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.rapids.tool
import java.io.InputStream
import java.util.zip.GZIPInputStream

import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.io.{Codec, Source}

import com.nvidia.spark.rapids.tool.{DatabricksEventLog, DatabricksRollingEventLogFilesFileReader, EventLogInfo}
@@ -44,6 +44,10 @@ abstract class AppBase(
// The data source information
val dataSourceInfo: ArrayBuffer[DataSourceCase] = ArrayBuffer[DataSourceCase]()

// SQL containing any Dataset operation or RDD to DataSet/DataFrame operation
val sqlIDToDataSetOrRDDCase: HashSet[Long] = HashSet[Long]()
val sqlIDtoProblematic: HashMap[Long, Set[String]] = HashMap[Long, Set[String]]()

def processEvent(event: SparkListenerEvent): Boolean

private def openEventLogInternal(log: Path, fs: FileSystem): InputStream = {
@@ -231,4 +235,137 @@ abstract class AppBase(
)
}
}

protected def reportComplexTypes: (String, String) = {
if (dataSourceInfo.size != 0) {
val schema = dataSourceInfo.map { ds => ds.schema }
AppBase.parseReadSchemaForNestedTypes(schema)
} else {
("", "")
}
}

protected def probNotDataset: HashMap[Long, Set[String]] = {
sqlIDtoProblematic.filterNot { case (sqlID, _) => sqlIDToDataSetOrRDDCase.contains(sqlID) }
}

protected def getPotentialProblemsForDf: String = {
probNotDataset.values.flatten.toSet.mkString(":")
}

// This is to append potential issues such as UDF, decimal type determined from
// SparkGraphPlan Node description and nested complex type determined from reading the
// event logs. If there are any complex nested types, then `NESTED COMPLEX TYPE` is mentioned
// in the `Potential Problems` section in the csv file. Section `Unsupported Nested Complex
// Types` has information on the exact nested complex types which are not supported for a
// particular application.
protected def getAllPotentialProblems(dFPotentialProb: String, nestedComplex: String): String = {
val nestedComplexType = if (nestedComplex.nonEmpty) "NESTED COMPLEX TYPE" else ""
val result = if (dFPotentialProb.nonEmpty) {
if (nestedComplex.nonEmpty) {
s"$dFPotentialProb:$nestedComplexType"
} else {
dFPotentialProb
}
} else {
nestedComplexType
}
result
}
}
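
For illustration only (not part of this change): a standalone sketch that mirrors the protected helper above and shows the strings that end up in the `Potential Problems` column of the profiling CSV. The name `combineProblems` is hypothetical.

```scala
// Hypothetical standalone sketch mirroring getAllPotentialProblems above.
def combineProblems(dfProblems: String, nestedComplexTypes: String): String = {
  val flag = if (nestedComplexTypes.nonEmpty) "NESTED COMPLEX TYPE" else ""
  if (dfProblems.nonEmpty && flag.nonEmpty) s"$dfProblems:$flag"
  else if (dfProblems.nonEmpty) dfProblems
  else flag
}

combineProblems("DECIMAL", "array<struct<k:string,v:string>>") // "DECIMAL:NESTED COMPLEX TYPE"
combineProblems("UDF", "")                                      // "UDF"
combineProblems("", "")                                         // ""
```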

object AppBase extends Logging {

def parseReadSchemaForNestedTypes(
schema: ArrayBuffer[String]): (String, String) = {
val tempStringBuilder = new StringBuilder()
val individualSchema: ArrayBuffer[String] = new ArrayBuffer()
var angleBracketsCount = 0
var parenthesesCount = 0
val distinctSchema = schema.distinct.filter(_.nonEmpty).mkString(",")

// Get the nested types i.e everything between < >
for (char <- distinctSchema) {
char match {
case '<' => angleBracketsCount += 1
case '>' => angleBracketsCount -= 1
// If the schema has decimals, Example decimal(6,2) then we have to make sure it has both
// opening and closing parentheses(unless the string is incomplete due to V2 reader).
case '(' => parenthesesCount += 1
case ')' => parenthesesCount -= 1
case _ =>
}
if (angleBracketsCount == 0 && parenthesesCount == 0 && char.equals(',')) {
individualSchema += tempStringBuilder.toString
tempStringBuilder.setLength(0)
} else {
tempStringBuilder.append(char);
}
}
if (!tempStringBuilder.isEmpty) {
individualSchema += tempStringBuilder.toString
}

// If DataSource V2 is used, then Schema may be incomplete with ... appended at the end.
// We determine complex types and nested complex types until ...
val incompleteSchema = individualSchema.filter(x => x.contains("..."))
val completeSchema = individualSchema.filterNot(x => x.contains("..."))

// Check if it has types
val incompleteTypes = incompleteSchema.map { x =>
if (x.contains("...") && x.contains(":")) {
val schemaTypes = x.split(":", 2)
if (schemaTypes.size == 2) {
val partialSchema = schemaTypes(1).split("\\.\\.\\.")
if (partialSchema.size == 1) {
partialSchema(0)
} else {
""
}
} else {
""
}
} else {
""
}
}
// Omit columnName and get only schemas
val completeTypes = completeSchema.map { x =>
val schemaTypes = x.split(":", 2)
if (schemaTypes.size == 2) {
schemaTypes(1)
} else {
""
}
}
val schemaTypes = completeTypes ++ incompleteTypes

// Filter only complex types.
// Example: array<string>, array<struct<string, string>>
val complexTypes = schemaTypes.filter(x =>
x.startsWith("array<") || x.startsWith("map<") || x.startsWith("struct<"))

// Determine nested complex types from complex types
// Example: array<struct<string, string>> is nested complex type.
val nestedComplexTypes = complexTypes.filter(complexType => {
val startIndex = complexType.indexOf('<')
val closedBracket = complexType.lastIndexOf('>')
// If String is incomplete due to dsv2, then '>' may not be present. In that case traverse
// until length of the incomplete string
val lastIndex = if (closedBracket == -1) {
complexType.length - 1
} else {
closedBracket
}
val string = complexType.substring(startIndex, lastIndex + 1)
string.contains("array<") || string.contains("struct<") || string.contains("map<")
})

// Since it is saved as csv, replace commas with ;
val complexTypesResult = complexTypes.filter(_.nonEmpty).mkString(";").replace(",", ";")
val nestedComplexTypesResult = nestedComplexTypes.filter(
_.nonEmpty).mkString(";").replace(",", ";")

(complexTypesResult, nestedComplexTypesResult)
}
}
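
For illustration only (not part of this change): a minimal usage sketch of the relocated `AppBase.parseReadSchemaForNestedTypes`, assuming a single read-schema string; the schema value is made up.

```scala
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.rapids.tool.AppBase

// Hypothetical read schema: one primitive, one complex type, one nested complex type.
val schemas = ArrayBuffer("name:string,scores:array<int>,tags:array<struct<k:string,v:string>>")

val (complexTypes, nestedComplexTypes) = AppBase.parseReadSchemaForNestedTypes(schemas)
// complexTypes       == "array<int>;array<struct<k:string;v:string>>"
// nestedComplexTypes == "array<struct<k:string;v:string>>"   (commas become ';' for CSV output)
```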
@@ -262,7 +262,7 @@ class ApplicationInfo(
* Function to process SQL Plan Metrics after all events are processed
*/
def processSQLPlanMetrics(): Unit = {
for ((sqlID, planInfo) <- sqlPlan){
for ((sqlID, planInfo) <- sqlPlan) {
checkMetadataForReadSchema(sqlID, planInfo)
val planGraph = SparkPlanGraph(planInfo)
// SQLPlanMetric is a case Class of
@@ -272,6 +272,7 @@
checkGraphNodeForBatchScan(sqlID, node)
if (isDataSetOrRDDPlan(node.desc)) {
sqlIdToInfo.get(sqlID).foreach { sql =>
sqlIDToDataSetOrRDDCase += sqlID
sql.hasDatasetOrRDD = true
}
if (gpuMode) {
@@ -280,6 +281,19 @@
unsupportedSQLplan += thisPlan
}
}

// find potential problems
val issues = findPotentialIssues(node.desc)
if (issues.nonEmpty) {
val existingIssues = sqlIDtoProblematic.getOrElse(sqlID, Set.empty[String])
sqlIDtoProblematic(sqlID) = existingIssues ++ issues
}
val (_, nestedComplexTypes) = reportComplexTypes
val potentialProbs = getAllPotentialProblems(getPotentialProblemsForDf, nestedComplexTypes)
sqlIdToInfo.get(sqlID).foreach { sql =>
sql.problematic = potentialProbs
}

// Then process SQL plan metric type
for (metric <- node.metrics) {
val allMetric = SQLMetricInfoCase(sqlID, metric.name,
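
For illustration only (not part of this change): the accumulation pattern used in `processSQLPlanMetrics` above, written as a self-contained sketch; `addIssues` and the issue strings are hypothetical.

```scala
import scala.collection.mutable.HashMap

// Per-SQL set of potential-problem strings, keyed by sqlID (mirrors sqlIDtoProblematic).
val sqlIDtoProblematic = HashMap[Long, Set[String]]()

def addIssues(sqlID: Long, issues: Set[String]): Unit = {
  if (issues.nonEmpty) {
    val existing = sqlIDtoProblematic.getOrElse(sqlID, Set.empty[String])
    sqlIDtoProblematic(sqlID) = existing ++ issues
  }
}

// Issues reported by successive plan nodes of the same SQL query accumulate into one set.
addIssues(0L, Set("DECIMAL"))
addIssues(0L, Set("UDF"))
// sqlIDtoProblematic(0L) == Set("DECIMAL", "UDF")
```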
@@ -16,7 +16,7 @@

package org.apache.spark.sql.rapids.tool.qualification

import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.collection.mutable.{ArrayBuffer, HashMap}

import com.nvidia.spark.rapids.tool.EventLogInfo
import com.nvidia.spark.rapids.tool.profiling._
@@ -55,11 +55,6 @@ class QualificationAppInfo(
val jobIdToSqlID: HashMap[Int, Long] = HashMap.empty[Int, Long]
val sqlIDtoJobFailures: HashMap[Long, ArrayBuffer[Int]] = HashMap.empty[Long, ArrayBuffer[Int]]

val sqlIDtoProblematic: HashMap[Long, Set[String]] = HashMap[Long, Set[String]]()

// SQL containing any Dataset operation or RDD to DataSet/DataFrame operation
val sqlIDToDataSetOrRDDCase: HashSet[Long] = HashSet[Long]()

val notSupportFormatAndTypes: HashMap[String, Set[String]] = HashMap[String, Set[String]]()

private lazy val eventProcessor = new QualificationEventProcessor(this)
@@ -135,10 +130,6 @@ class QualificationAppInfo(
}.values.sum
}

private def probNotDataset: HashMap[Long, Set[String]] = {
sqlIDtoProblematic.filterNot { case (sqlID, _) => sqlIDToDataSetOrRDDCase.contains(sqlID) }
}

// The total task time for all tasks that ran during SQL dataframe
// operations. if the SQL contains a dataset, it isn't counted.
private def calculateTaskDataframeDuration: Long = {
@@ -148,30 +139,6 @@
validSums.values.map(dur => dur.totalTaskDuration).sum
}

private def getPotentialProblemsForDf: String = {
probNotDataset.values.flatten.toSet.mkString(":")
}

// This is to append potential issues such as UDF, decimal type determined from
// SparkGraphPlan Node description and nested complex type determined from reading the
// event logs. If there are any complex nested types, then `NESTED COMPLEX TYPE` is mentioned
// in the `Potential Problems` section in the csv file. Section `Unsupported Nested Complex
// Types` has information on the exact nested complex types which are not supported for a
// particular application.
private def getAllPotentialProblems(dFPotentialProb: String, nestedComplex: String): String = {
val nestedComplexType = if (nestedComplex.nonEmpty) "NESTED COMPLEX TYPE" else ""
val result = if (dFPotentialProb.nonEmpty) {
if (nestedComplex.nonEmpty) {
s"$dFPotentialProb:$nestedComplexType"
} else {
dFPotentialProb
}
} else {
nestedComplexType
}
result
}

private def getSQLDurationProblematic: Long = {
probNotDataset.keys.map { sqlId =>
sqlDurationTime.getOrElse(sqlId, 0L)
@@ -221,15 +188,6 @@
}.getOrElse(1.0)
}

private def reportComplexTypes: (String, String) = {
if (dataSourceInfo.size != 0) {
val schema = dataSourceInfo.map { ds => ds.schema }
QualificationAppInfo.parseReadSchemaForNestedTypes(schema)
} else {
("", "")
}
}

/**
* Aggregate and process the application after reading the events.
* @return Option of QualificationSummaryInfo, Some if we were able to process the application
@@ -370,97 +328,4 @@ object QualificationAppInfo extends Logging {
}
app
}

def parseReadSchemaForNestedTypes(
schema: ArrayBuffer[String]): (String, String) = {
val tempStringBuilder = new StringBuilder()
val individualSchema: ArrayBuffer[String] = new ArrayBuffer()
var angleBracketsCount = 0
var parenthesesCount = 0
val distinctSchema = schema.distinct.filter(_.nonEmpty).mkString(",")

// Get the nested types i.e everything between < >
for (char <- distinctSchema) {
char match {
case '<' => angleBracketsCount += 1
case '>' => angleBracketsCount -= 1
// If the schema has decimals, Example decimal(6,2) then we have to make sure it has both
// opening and closing parentheses(unless the string is incomplete due to V2 reader).
case '(' => parenthesesCount += 1
case ')' => parenthesesCount -= 1
case _ =>
}
if (angleBracketsCount == 0 && parenthesesCount == 0 && char.equals(',')) {
individualSchema += tempStringBuilder.toString
tempStringBuilder.setLength(0)
} else {
tempStringBuilder.append(char);
}
}
if (!tempStringBuilder.isEmpty) {
individualSchema += tempStringBuilder.toString
}

// If DataSource V2 is used, then Schema may be incomplete with ... appended at the end.
// We determine complex types and nested complex types until ...
val incompleteSchema = individualSchema.filter(x => x.contains("..."))
val completeSchema = individualSchema.filterNot(x => x.contains("..."))

// Check if it has types
val incompleteTypes = incompleteSchema.map { x =>
if (x.contains("...") && x.contains(":")) {
val schemaTypes = x.split(":", 2)
if (schemaTypes.size == 2) {
val partialSchema = schemaTypes(1).split("\\.\\.\\.")
if (partialSchema.size == 1) {
partialSchema(0)
} else {
""
}
} else {
""
}
} else {
""
}
}
// Omit columnName and get only schemas
val completeTypes = completeSchema.map { x =>
val schemaTypes = x.split(":", 2)
if (schemaTypes.size == 2) {
schemaTypes(1)
} else {
""
}
}
val schemaTypes = completeTypes ++ incompleteTypes

// Filter only complex types.
// Example: array<string>, array<struct<string, string>>
val complexTypes = schemaTypes.filter(x =>
x.startsWith("array<") || x.startsWith("map<") || x.startsWith("struct<"))

// Determine nested complex types from complex types
// Example: array<struct<string, string>> is nested complex type.
val nestedComplexTypes = complexTypes.filter(complexType => {
val startIndex = complexType.indexOf('<')
val closedBracket = complexType.lastIndexOf('>')
// If String is incomplete due to dsv2, then '>' may not be present. In that case traverse
// until length of the incomplete string
val lastIndex = if (closedBracket == -1) {
complexType.length - 1
} else {
closedBracket
}
val string = complexType.substring(startIndex, lastIndex + 1)
string.contains("array<") || string.contains("struct<") || string.contains("map<")
})

// Since it is saved as csv, replace commas with ;
val complexTypesResult = complexTypes.filter(_.nonEmpty).mkString(";").replace(",", ";")
val nestedComplexTypesResult = nestedComplexTypes.filter(
_.nonEmpty).mkString(";").replace(",", ";")

(complexTypesResult, nestedComplexTypesResult)
}
}