Commit
Merge remote-tracking branch 'upstream/branch-21.06' into align-gds
rongou committed May 24, 2021
2 parents 5762934 + 22a0ad6 commit 606eac0
Showing 35 changed files with 1,181 additions and 96 deletions.
25 changes: 21 additions & 4 deletions integration_tests/src/main/python/hash_aggregate_test.py
@@ -20,7 +20,7 @@
from pyspark.sql.types import *
from marks import *
import pyspark.sql.functions as f
from spark_session import is_before_spark_311
from spark_session import is_before_spark_311, with_cpu_session

_no_nans_float_conf = {'spark.rapids.sql.variableFloatAgg.enabled': 'true',
'spark.rapids.sql.hasNans': 'false',
@@ -281,7 +281,6 @@ def test_hash_grpby_pivot(data_gen, conf):
.agg(f.sum('c')),
conf=conf)


@approximate_float
@ignore_order(local=True)
@allow_non_gpu('HashAggregateExec', 'PivotFirst', 'AggregateExpression', 'Alias', 'GetArrayItem',
@@ -290,10 +289,19 @@ def test_hash_grpby_pivot(data_gen, conf):
@incompat
@pytest.mark.parametrize('data_gen', [_grpkey_floats_with_nulls_and_nans], ids=idfn)
def test_hash_pivot_groupby_nan_fallback(data_gen):
# Collect the values to pivot beforehand, so this preparation job is not captured by the fallback check
def fetch_pivot_values(spark):
max_values = spark._jsparkSession.sessionState().conf().dataFramePivotMaxValues()
df = gen_df(spark, data_gen, length=100)
df = df.select('b').distinct().limit(max_values + 1).sort('b')
return [row[0] for row in df.collect()]

pivot_values = with_cpu_session(fetch_pivot_values)

assert_gpu_fallback_collect(
lambda spark: gen_df(spark, data_gen, length=100)
.groupby('a')
.pivot('b')
.pivot('b', pivot_values)
.agg(f.sum('c')),
"PivotFirst",
conf=_nans_float_conf)
@@ -333,10 +341,19 @@ def test_hash_multiple_grpby_pivot(data_gen, conf):
@incompat
@pytest.mark.parametrize('data_gen', [_grpkey_floats_with_nulls_and_nans], ids=idfn)
def test_hash_pivot_reduction_nan_fallback(data_gen):
# Collect the values to pivot beforehand, so this preparation job is not captured by the fallback check
def fetch_pivot_values(spark):
max_values = spark._jsparkSession.sessionState().conf().dataFramePivotMaxValues()
df = gen_df(spark, data_gen, length=100)
df = df.select('b').distinct().limit(max_values + 1).sort('b')
return [row[0] for row in df.collect()]

pivot_values = with_cpu_session(fetch_pivot_values)

assert_gpu_fallback_collect(
lambda spark: gen_df(spark, data_gen, length=100)
.groupby()
.pivot('b')
.pivot('b', pivot_values)
.agg(f.sum('c')),
"PivotFirst",
conf=_nans_float_conf)
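
Both pivot fallback tests above follow the same pattern: the distinct pivot values are collected in a separate CPU session first, so Spark's internal value-collection job never runs under the GPU fallback assertion. The sketch below shows the underlying idea against plain PySpark only; it is illustrative and assumes nothing from the spark-rapids test harness (the session and DataFrame here are made up for the example).

```python
# Minimal sketch, assuming only plain PySpark (no spark-rapids test harness).
from pyspark.sql import SparkSession
import pyspark.sql.functions as f

spark = SparkSession.builder.master("local[*]").getOrCreate()

df = spark.createDataFrame(
    [("x", 1, 1.0), ("x", 2, 2.0), ("y", 1, 3.0)],
    ["a", "b", "c"])

# Job 1: collect the distinct pivot values explicitly, mirroring fetch_pivot_values above.
pivot_values = [row[0] for row in df.select("b").distinct().sort("b").collect()]

# Job 2: pivot with the pre-computed values; Spark skips its own
# "collect distinct pivot values" job because the values are supplied.
result = df.groupby("a").pivot("b", pivot_values).agg(f.sum("c"))
result.show()
```

Passing an explicit value list to `pivot` is what keeps the fallback check focused on `PivotFirst` itself rather than on the value-collection job.
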
4 changes: 1 addition & 3 deletions integration_tests/src/main/python/udf_cudf_test.py
@@ -14,7 +14,7 @@

import pytest

from conftest import is_at_least_precommit_run, is_databricks_runtime
from conftest import is_at_least_precommit_run

from pyspark.sql.pandas.utils import require_minimum_pyarrow_version, require_minimum_pandas_version
try:
@@ -275,8 +275,6 @@ def gpu_run(spark):

# ======= Test Window In Pandas =======
@cudf_udf
@pytest.mark.xfail(condition=is_databricks_runtime(),
reason='https://github.com/NVIDIA/spark-rapids/issues/2372')
def test_window(enable_cudf_udf):
@pandas_udf("int")
def _sum_cpu_func(v: pd.Series) -> int:
14 changes: 7 additions & 7 deletions rapids-4-spark-tools/README.md
@@ -25,30 +25,30 @@ org.apache.spark.sql.rapids.tool.profiling.ProfileMain.main(Array("/path/to/even

## How to compile and use from command-line
1. `mvn clean package`
2. `cd $SPARK_HOME (Download Apache Spark if reequired)`
3. `./bin/spark-submit --class org.apache.spark.sql.rapids.tool.profiling.ProfileMain <Spark-Rapids-Repo>/workload_profiling/target/rapids-4-spark-tools-<version>.jar /path/to/eventlog1`
2. `cd $SPARK_HOME (Download Apache Spark if required)`
3. `./bin/spark-submit --class org.apache.spark.sql.rapids.tool.profiling.ProfileMain <Spark-Rapids-Repo>/rapids-4-spark-tools/target/rapids-4-spark-tools-<version>.jar /path/to/eventlog1`

## Options
```
$ ./bin/spark-submit --class org.apache.spark.sql.rapids.tool.profiling.ProfileMain <Spark-Rapids-Repo>/workload_profiling/target/rapids-4-spark-tools-<version>.jar --help
$ ./bin/spark-submit --class org.apache.spark.sql.rapids.tool.profiling.ProfileMain <Spark-Rapids-Repo>/rapids-4-spark-tools/target/rapids-4-spark-tools-<version>.jar --help
Spark event log profiling tool
Example:
# Input 1 or more event logs from local path:
./bin/spark-submit --class com.nvidia.spark.rapids.tool.profiling.ProfileMain <Spark-Rapids-Repo>/workload_profiling/target/rapids-4-spark-tools-<version>.jar /path/to/eventlog1 /path/to/eventlog2
./bin/spark-submit --class com.nvidia.spark.rapids.tool.profiling.ProfileMain <Spark-Rapids-Repo>/rapids-4-spark-tools/target/rapids-4-spark-tools-<version>.jar /path/to/eventlog1 /path/to/eventlog2
# If event log is from S3:
export AWS_ACCESS_KEY_ID=xxx
export AWS_SECRET_ACCESS_KEY=xxx
./bin/spark-submit --class com.nvidia.spark.rapids.tool.profiling.ProfileMain <Spark-Rapids-Repo>/workload_profiling/target/rapids-4-spark-tools-<version>.jar s3a://<BUCKET>/eventlog1 /path/to/eventlog2
./bin/spark-submit --class com.nvidia.spark.rapids.tool.profiling.ProfileMain <Spark-Rapids-Repo>/rapids-4-spark-tools/target/rapids-4-spark-tools-<version>.jar s3a://<BUCKET>/eventlog1 /path/to/eventlog2
# If event log is from hdfs:
./bin/spark-submit --class com.nvidia.spark.rapids.tool.profiling.ProfileMain <Spark-Rapids-Repo>/workload_profiling/target/rapids-4-spark-tools-<version>.jar hdfs://<BUCKET>/eventlog1 /path/to/eventlog2
./bin/spark-submit --class com.nvidia.spark.rapids.tool.profiling.ProfileMain <Spark-Rapids-Repo>/rapids-4-spark-tools/target/rapids-4-spark-tools-<version>.jar hdfs://<BUCKET>/eventlog1 /path/to/eventlog2
# Change output directory to /tmp. It outputs as "rapids_4_spark_tools_output.log" in the local directory if the output directory is not specified.
./bin/spark-submit --class com.nvidia.spark.rapids.tool.profiling.ProfileMain <Spark-Rapids-Repo>/workload_profiling/target/rapids-4-spark-tools-<version>.jar -o /tmp /path/to/eventlog1
./bin/spark-submit --class com.nvidia.spark.rapids.tool.profiling.ProfileMain <Spark-Rapids-Repo>/rapids-4-spark-tools/target/rapids-4-spark-tools-<version>.jar -o /tmp /path/to/eventlog1
For usage see below:
-o, --output-directory <arg> Output directory. Default is current directory
@@ -0,0 +1,124 @@


/*
* Copyright (c) 2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.tool.profiling

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.rapids.tool.profiling._

/**
* Performs analysis on the DataFrames
* from ApplicationInfo objects.
*/
class Analysis(apps: ArrayBuffer[ApplicationInfo]) {

require(apps.nonEmpty)
private val fileWriter = apps.head.fileWriter

// Job Level TaskMetrics Aggregation
def jobMetricsAggregation(): Unit = {
if (apps.size == 1) {
fileWriter.write("Job level aggregated task metrics:")
apps.head.runQuery(apps.head.jobMetricsAggregationSQL + " order by Duration desc")
} else {
var query = ""
for (app <- apps) {
if (query.isEmpty) {
query += app.jobMetricsAggregationSQL
} else {
query += " union " + app.jobMetricsAggregationSQL
}
}
fileWriter.write("Job level aggregated task metrics:")
apps.head.runQuery(query + " order by appIndex, Duration desc")
}
}

// Stage Level TaskMetrics Aggregation
def stageMetricsAggregation(): Unit = {
if (apps.size == 1) {
fileWriter.write("Stage level aggregated task metrics:")
apps.head.runQuery(apps.head.stageMetricsAggregationSQL + " order by Duration desc")
} else {
var query = ""
for (app <- apps) {
if (query.isEmpty) {
query += app.stageMetricsAggregationSQL
} else {
query += " union " + app.stageMetricsAggregationSQL
}
}
fileWriter.write("Stage level aggregated task metrics:")
apps.head.runQuery(query + " order by appIndex, Duration desc")
}
}

// Job + Stage Level TaskMetrics Aggregation
def jobAndStageMetricsAggregation(): Unit = {
if (apps.size == 1) {
val messageHeader = "Job + Stage level aggregated task metrics:"
apps.head.runQuery(apps.head.jobAndStageMetricsAggregationSQL + " order by Duration desc")
} else {
var query = ""
for (app <- apps) {
if (query.isEmpty) {
query += app.jobAndStageMetricsAggregationSQL
} else {
query += " union " + app.jobAndStageMetricsAggregationSQL
}
}
fileWriter.write("Job + Stage level aggregated task metrics:")
apps.head.runQuery(query + " order by appIndex, Duration desc")
}
}

// SQL Level TaskMetrics Aggregation (only when SQL exists)
def sqlMetricsAggregation(): DataFrame = {
if (apps.size == 1) {
if (apps.head.allDataFrames.contains(s"sqlDF_${apps.head.index}")) {
val messageHeader = "SQL level aggregated task metrics:"
apps.head.runQuery(apps.head.sqlMetricsAggregationSQL + " order by Duration desc")
} else {
apps.head.sparkSession.emptyDataFrame
}
} else {
var query = ""
val appsWithSQL = apps.filter(p => p.allDataFrames.contains(s"sqlDF_${p.index}"))
for (app <- appsWithSQL) {
if (query.isEmpty) {
query += app.sqlMetricsAggregationSQL
} else {
query += " union " + app.sqlMetricsAggregationSQL
}
}
val messageHeader = "SQL level aggregated task metrics:"
apps.head.runQuery(query + " order by appIndex, Duration desc")
}
}

// Custom query execution, normally used for debugging.
def customQueryExecution(app: ApplicationInfo): Unit = {
fileWriter.write("Custom query execution:")
val customQuery =
s"""select stageId from stageDF_${app.index} limit 1
|""".stripMargin
app.runQuery(customQuery)
}
}
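
A side note on the `Analysis` methods above: each one builds the multi-application SQL the same way, starting from an empty string and appending every app's query with ` union ` before adding an ordering clause. The snippet below isolates that string-building pattern; `build_union_query` is a hypothetical helper for illustration, not part of the tool.

```python
# Hypothetical helper mirroring the loop-and-append logic used by
# jobMetricsAggregation, stageMetricsAggregation, and sqlMetricsAggregation.
def build_union_query(per_app_queries, order_by):
    # Joining with " union " is equivalent to the append-in-a-loop approach.
    return " union ".join(per_app_queries) + " order by " + order_by

print(build_union_query(
    ["select 1 as appIndex, 10 as Duration",
     "select 2 as appIndex, 20 as Duration"],
    "appIndex, Duration desc"))
```
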
@@ -44,4 +44,115 @@ case class ExecutorCase(
executorID: String, host: String, totalCores: Int, resourceProfileId: Int)

case class ExecutorRemovedCase(
executorID: String, reason: String, time: Long)
executorID: String,
reason: String,
time: Long)

case class SQLExecutionCase(
sqlID: Long,
description: String,
details: String,
startTime: Long,
endTime: Option[Long],
duration: Option[Long],
durationStr: String)

case class SQLPlanMetricsCase(
sqlID: Long,
name: String,
accumulatorId: Long,
metricType: String)

case class PlanNodeAccumCase(
sqlID: Long,
nodeID: Long,
nodeName: String,
nodeDesc: String,
accumulatorId: Long)

case class DriverAccumCase(
sqlID: Long,
accumulatorId: Long,
value: Long)

case class TaskStageAccumCase(
stageId: Int,
attemptId: Int,
taskId: Option[Long],
accumulatorId: Long,
name: Option[String],
value: Option[Long],
isInternal: Boolean)

case class JobCase(
jobID: Int,
stageIds: Seq[Int],
sqlID: Option[Long],
properties: scala.collection.Map[String, String],
startTime: Long,
endTime: Option[Long],
jobResult: Option[String],
failedReason: String,
duration: Option[Long],
durationStr: String,
gpuMode: Boolean)

case class StageCase(
stageId: Int,
attemptId: Int,
name: String,
numTasks: Int,
numRDD: Int,
parentIds: Seq[Int],
details: String,
properties: scala.collection.Map[String, String],
submissionTime: Option[Long],
completionTime: Option[Long],
failureReason: Option[String],
duration: Option[Long],
durationStr: String,
gpuMode: Boolean)

// Note: sr = Shuffle Read; sw = Shuffle Write
// 39 columns in total
case class TaskCase(
stageId: Int,
stageAttemptId: Int,
taskType: String,
endReason: String,
taskId: Long,
attempt: Int,
launchTime: Long,
finishTime: Long,
duration: Long,
successful: Boolean,
executorId: String,
host: String,
taskLocality: String,
speculative: Boolean,
gettingResultTime: Long,
executorDeserializeTime: Long,
executorDeserializeCPUTime: Long,
executorRunTime: Long,
executorCPUTime: Long,
peakExecutionMemory: Long,
resultSize: Long,
jvmGCTime: Long,
resultSerializationTime: Long,
memoryBytesSpilled: Long,
diskBytesSpilled: Long,
sr_remoteBlocksFetched: Long,
sr_localBlocksFetched: Long,
sr_fetchWaitTime: Long,
sr_remoteBytesRead: Long,
sr_remoteBytesReadToDisk: Long,
sr_localBytesRead: Long,
sr_totalBytesRead: Long,
sw_bytesWritten: Long,
sw_writeTime: Long,
sw_recordsWritten: Long,
input_bytesRead: Long,
input_recordsRead: Long,
output_bytesWritten: Long,
output_recordsWritten: Long)

case class ProblematicSQLCase(sqlID: Long)
@@ -30,9 +30,9 @@ class CollectInformation(apps: ArrayBuffer[ApplicationInfo]) {

// Print Application Information
def printAppInfo(): Unit = {
fileWriter.write("Application Information:\n")
val messageHeader = "Application Information:\n"
for (app <- apps) {
app.runQuery(app.generateAppInfo)
app.runQuery(query = app.generateAppInfo, writeToFile = true, messageHeader = messageHeader)
}
}

@@ -57,17 +57,19 @@ class CollectInformation(apps: ArrayBuffer[ApplicationInfo]) {

// Print executor related information
def printExecutorInfo(): Unit = {
fileWriter.write("\n\nExecutor Information:\n")
val messageHeader = "\n\nExecutor Information:\n"
for (app <- apps) {
app.runQuery(app.generateExecutorInfo + " order by cast(executorID as long)")
app.runQuery(query = app.generateExecutorInfo + " order by cast(executorID as long)",
writeToFile = true, messageHeader = messageHeader)
}
}

// Print Rapids related Spark Properties
def printRapidsProperties(): Unit = {
fileWriter.write("\n\nSpark Rapids parameters set explicitly:\n")
val messageHeader = "\n\nSpark Rapids parameters set explicitly:\n"
for (app <- apps) {
app.runQuery(app.generateRapidsProperties + " order by key")
app.runQuery(query = app.generateRapidsProperties + " order by key", writeToFile = true,
messageHeader = messageHeader)
}
}
}