Benchmark utility to perform diff of output from benchmark runs, allowing for precision differences #782
@@ -26,7 +26,8 @@ import org.json4s.jackson.JsonMethods.parse
 import org.json4s.jackson.Serialization.writePretty

 import org.apache.spark.{SPARK_BUILD_USER, SPARK_VERSION}
-import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
+import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
+import org.apache.spark.sql.functions.col

 object BenchUtils {
@@ -246,6 +247,145 @@ object BenchUtils {
     }
   }

+  /**
+   * Perform a diff of the results collected from two DataFrames, allowing for differences in
+   * precision.
+   *
+   * The intended usage is to run timed benchmarks that write results to file and then separately
+   * use this utility to compare those result sets. This code performs a sort and a collect and
+   * is only suitable for data sets that can fit in the driver's memory. For larger datasets,
+   * a better approach would be to convert the results to single files, download them locally
+   * and adapt this Scala code to read those files directly (without using Spark).
+   *
+   * Example usage:
+   *
+   * <pre>
+   * scala> val cpu = spark.read.parquet("/data/q5-cpu")
+   * scala> val gpu = spark.read.parquet("/data/q5-gpu")
+   * scala> import com.nvidia.spark.rapids.tests.common._
+   * scala> BenchUtils.compareResults(cpu, gpu, ignoreOrdering=true, epsilon=0.0)
+   * Collecting rows from DataFrame
+   * Collected 989754 rows in 7.701 seconds
+   * Collecting rows from DataFrame
+   * Collected 989754 rows in 2.325 seconds
+   * Results match
+   * </pre>
+   *
+   * @param df1 DataFrame to compare.
+   * @param df2 DataFrame to compare.
+   * @param ignoreOrdering Sort the data collected from the DataFrames before comparing them.
+   * @param maxErrors Maximum number of differences to report.
+   * @param epsilon Allow for differences in precision when comparing floating point values.
+   */
+  def compareResults(
+      df1: DataFrame,
+      df2: DataFrame,
+      ignoreOrdering: Boolean,
+      maxErrors: Int = 10,
+      epsilon: Double = 0.00001): Unit = {
+
+    val result1: Seq[Seq[Any]] = collectResults(df1, ignoreOrdering)
+    val result2: Seq[Seq[Any]] = collectResults(df2, ignoreOrdering)
+
+    if (result1.length == result2.length) {
+      var errors = 0
+      var i = 0
+      while (i < result1.length && errors < maxErrors) {
+        val l = result1(i)
+        val r = result2(i)
+        if (!rowEqual(l, r, epsilon)) {
+          println(s"Row $i:\n${l.mkString(",")}\n${r.mkString(",")}\n")
+          errors += 1
+          if (errors == maxErrors) {
+            println(s"Aborting comparison after reaching maximum of $maxErrors errors")
+          }
+        }
+        i += 1
+      }
+
+      if (errors == 0) {
+        println("Results match")
+      }
+
+    } else {
+      println(s"Row counts do not match: ${result1.length} != ${result2.length}")
+    }
+  }
+
+  private def collectResults(df: DataFrame, ignoreOrdering: Boolean): Seq[Seq[Any]] = {
+    println("Collecting rows from DataFrame")
+    val t1 = System.currentTimeMillis()
+    val rows = if (ignoreOrdering) {
+      // let Spark do the sorting
+      df.sort(df.columns.map(col): _*).collect()
An inline review thread on the sort above:

the downside to this is if sort is broken, is this forcing cpu sort for instance?

The only known issue with GPU sort vs CPU sort is #84, which I don't think we will ever run into outside of artificially generated data sets, which is why we have not pushed to fix it.

The job config is outside of the tool. I think to verify the results we should let standard CPU Spark do it; nothing stops you from running this with the plugin, so it's up to the user at this point.

I guess we have other tests for correctness, so ignore my comment, and hopefully the GPU sort should be faster.
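For reference, a minimal sketch of pinning the verification to CPU Spark, assuming the RAPIDS plugin is loaded and using its runtime spark.rapids.sql.enabled switch (illustrative only; the cpu/gpu DataFrames are from the scaladoc example above):

    // Sketch: disable GPU execution for the comparison only, so the sort and
    // collect run on standard CPU Spark regardless of how the benchmarks ran.
    spark.conf.set("spark.rapids.sql.enabled", "false")
    BenchUtils.compareResults(cpu, gpu, ignoreOrdering = true)
    spark.conf.set("spark.rapids.sql.enabled", "true") // restore GPU execution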
+    } else {
+      df.collect()
+    }
+    val t2 = System.currentTimeMillis()
+    println(s"Collected ${rows.length} rows in ${(t2 - t1) / 1000.0} seconds")
+    rows.map(_.toSeq)
+  }
+
+  private def rowEqual(row1: Seq[Any], row2: Seq[Any], epsilon: Double): Boolean = {
+    row1.zip(row2).forall {
+      case (l, r) => compare(l, r, epsilon)
+    }
+  }
+
+  // this is copied from SparkQueryCompareTestSuite
||
private def compare(expected: Any, actual: Any, epsilon: Double = 0.0): Boolean = { | ||
def doublesAreEqualWithinPercentage(expected: Double, actual: Double): (String, Boolean) = { | ||
if (!compare(expected, actual)) { | ||
if (expected != 0) { | ||
val v = Math.abs((expected - actual) / expected) | ||
(s"\n\nABS($expected - $actual) / ABS($actual) == $v is not <= $epsilon ", v <= epsilon) | ||
} else { | ||
val v = Math.abs(expected - actual) | ||
(s"\n\nABS($expected - $actual) == $v is not <= $epsilon ", v <= epsilon) | ||
} | ||
} else { | ||
("SUCCESS", true) | ||
} | ||
} | ||
+    (expected, actual) match {
+      case (a: Float, b: Float) if a.isNaN && b.isNaN => true
+      case (a: Double, b: Double) if a.isNaN && b.isNaN => true
+      case (null, null) => true
+      case (null, _) => false
+      case (_, null) => false
+      case (a: Array[_], b: Array[_]) =>
+        a.length == b.length && a.zip(b).forall { case (l, r) => compare(l, r, epsilon) }
+      case (a: Map[_, _], b: Map[_, _]) =>
+        a.size == b.size && a.keys.forall { aKey =>
+          b.keys.find(bKey => compare(aKey, bKey))
+            .exists(bKey => compare(a(aKey), b(bKey)))
+        }
+      case (a: Iterable[_], b: Iterable[_]) =>
+        a.size == b.size && a.zip(b).forall { case (l, r) => compare(l, r, epsilon) }
+      case (a: Product, b: Product) =>
+        compare(a.productIterator.toSeq, b.productIterator.toSeq, epsilon)
+      case (a: Row, b: Row) =>
+        compare(a.toSeq, b.toSeq, epsilon)
+      // 0.0 == -0.0, turn float/double to bits before comparison, to distinguish 0.0 and -0.0.
+      case (a: Double, b: Double) if epsilon <= 0 =>
+        java.lang.Double.doubleToRawLongBits(a) == java.lang.Double.doubleToRawLongBits(b)
+      case (a: Double, b: Double) if epsilon > 0 =>
+        val ret = doublesAreEqualWithinPercentage(a, b)
+        if (!ret._2) {
+          System.err.println(ret._1 + " (double)")
+        }
+        ret._2
+      case (a: Float, b: Float) if epsilon <= 0 =>
+        java.lang.Float.floatToRawIntBits(a) == java.lang.Float.floatToRawIntBits(b)
+      case (a: Float, b: Float) if epsilon > 0 =>
+        val ret = doublesAreEqualWithinPercentage(a, b)
+        if (!ret._2) {
+          System.err.println(ret._1 + " (float)")
+        }
+        ret._2
+      case (a, b) => a == b
+    }
+  }
 }

 /** Top level benchmark report class */
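To make the epsilon semantics above concrete: the check is relative when the expected value is non-zero and absolute when it is zero. A small illustrative spark-shell run (hypothetical values, not from the PR):

    import com.nvidia.spark.rapids.tests.common.BenchUtils
    import spark.implicits._

    // 100.0 vs 100.0004 differ by a relative error of 4.0e-6, within the
    // default epsilon of 1.0e-5, so the rows compare as equal.
    val a = Seq((1, 100.0)).toDF("id", "v")
    val b = Seq((1, 100.0004)).toDF("id", "v")
    BenchUtils.compareResults(a, b, ignoreOrdering = true) // prints "Results match"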
A review thread on the collect-based approach:

considering this is a benchmarking utility, should we also provide an iterator in case the collected results are too big?

Spark's DataFrame.collect() loads the full result set into the driver and returns an Array. If (when) we need to handle comparisons of larger results, I think we would need an alternate approach, like converting the results to single files, downloading them, and using Scala code (without Spark) to perform the diff.

It is a relatively minor change to be able to do this with an iterator. We have that support in the python unit tests. The only issue is that you cannot compare the size of the results ahead of time, because you don't know it yet. But I agree until we hit a situation where we need it there is no point in doing it. Also, it is very slow to try and do a comparison like that single threaded. It might be better to truncate floating point values and do a few anti-joins to see if there is something in the left that is not in the right, and vice versa. This might not handle duplicate rows, so we might need something there too, but it would scale a lot better. A sketch of that idea follows.
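A rough sketch of the anti-join idea, under stated assumptions: antiJoinDiff is a hypothetical helper (not part of this PR) that rounds float/double columns to a fixed scale and anti-joins in both directions, so the diff runs distributed on the cluster instead of single threaded on the driver.

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions.{col, round}
    import org.apache.spark.sql.types.{DoubleType, FloatType}

    def antiJoinDiff(left: DataFrame, right: DataFrame, scale: Int = 5): (Long, Long) = {
      // round floating-point columns so small precision differences don't
      // register as mismatches
      def truncate(df: DataFrame): DataFrame =
        df.schema.fields.foldLeft(df) { (d, f) =>
          f.dataType match {
            case DoubleType | FloatType => d.withColumn(f.name, round(col(f.name), scale))
            case _ => d
          }
        }
      val (l, r) = (truncate(left), truncate(right))
      // rows present on one side but not the other; as noted above, this does
      // not detect differing duplicate counts
      (l.join(r, l.columns.toSeq, "left_anti").count(),
        r.join(l, r.columns.toSeq, "left_anti").count())
    }

A result of (0, 0) would indicate the truncated result sets contain the same distinct rows.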
I'd like to know more about the iterator approach. I did look at the toLocalIterator method, but this seems to load one partition at a time, so I don't think that would work when we need ordering across the result set. Could you point me to an example where we do this in the Python tests?

It loads one partition of the result at a time, so it does preserve ordering. It is just rather slow because we are doing the comparison single threaded.

checkout mortgage_test.py
I added the option to use an iterator and have been testing this out, and it seems to work well.
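A minimal sketch of what that iterator-based variant could look like (assumed shape; the option actually added to the PR may differ): toLocalIterator() pulls one partition at a time into the driver, preserving the global sort order, so only a single partition needs to fit in driver memory.

    import scala.collection.JavaConverters._
    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions.col

    // stream sorted rows one partition at a time
    def localRows(df: DataFrame): Iterator[Seq[Any]] =
      df.sort(df.columns.map(col): _*).toLocalIterator().asScala.map(_.toSeq)

    def countDiffs(df1: DataFrame, df2: DataFrame): Long = {
      // zipAll so extra rows on either side also count as differences;
      // a precision-aware comparison like rowEqual above could replace !=
      localRows(df1).zipAll(localRows(df2), null, null).count { case (l, r) => l != r }
    }

The trade-off discussed above still applies: the comparison itself is single threaded, so this favors memory footprint over speed.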