Skip to content

Commit

Permalink
Update for timestamp type for Spark 31x; update for String type
Browse files Browse the repository at this point in the history
  • Loading branch information
Chong Gao committed Sep 12, 2023
1 parent 37a6d71 commit e99514b
Showing 1 changed file with 60 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class ParquetScaleTestSuite extends SparkQueryCompareTestSuite with Logging {
.set("spark.rapids.sql.explain", "ALL")

/**
* By default cuDF splits row groups by 1,000,000 rows, we expect one row group
* By default cuDF splits row groups by 1,000,000 rows, we expect one row group.
* Refer to cuDF parquet.hpp
* default_row_group_size_bytes = 128 * 1024 * 1024; ///< 128MB per row group
* default_row_group_size_rows = 1000000; ///< 1 million rows per row group
Expand Down Expand Up @@ -142,11 +142,48 @@ class ParquetScaleTestSuite extends SparkQueryCompareTestSuite with Logging {
val gpuStats = withGpuSparkSession(getStats(testDataFile), sparkConf)
val gpuFileSize = testDataFile.listFiles(f => f.getName.endsWith(".parquet"))(0).length()

// compare schema
if (!skipCheckSchema) {
assertResult(cpuStats.schema)(gpuStats.schema)
}

// compare stats
if (skipCheckSchema) {
assertResult(cpuStats.rowGroupStats)(gpuStats.rowGroupStats)
} else {
assertResult(cpuStats)(gpuStats)
assert(cpuStats.rowGroupStats.length == gpuStats.rowGroupStats.length)
cpuStats.rowGroupStats.zip(gpuStats.rowGroupStats).foreach {
case (cpuRowGroup, gpuRowGroup) => {
assert(cpuRowGroup.rowCount == gpuRowGroup.rowCount)
assert(cpuRowGroup.columnStats.length == gpuRowGroup.columnStats.length)
cpuRowGroup.columnStats.zip(gpuRowGroup.columnStats).foreach {
case (cpuColumnStat, gpuColumnStat) => {
assert(cpuColumnStat.hasNonNullValue == gpuColumnStat.hasNonNullValue)
if (cpuColumnStat.hasNonNullValue) {
// compare all the attributes
assertResult(cpuColumnStat)(gpuColumnStat)
} else {
// hasNonNullValue is false, which means the stats are invalid, so there is no need to
// compare the other attributes.
/**
* hasNonNullValue means:
*
* Returns whether there have been non-null values added to this statistics
*
* @return true if the values contained at least one non-null value
*
* Refer to link: https://github.com/apache/parquet-mr/blob/apache-parquet-1.10.1
* /parquet-column/src/main/java/org/apache/parquet/column/statistics
* /Statistics.java#L504-L506
*
* e.g. on Spark 31x, for the timestamp type:
* CPU: hasNonNullValue: false, isNumNullsSet: false, getNumNulls: -1
* GPU: hasNonNullValue: false, isNumNullsSet: true, getNumNulls: 0
*
* The differences above are expected.
*/
assertResult(cpuColumnStat.primitiveType)(gpuColumnStat.primitiveType)
}
}
}
}
}

// Check that the GPU file size is not too large.
Expand Down Expand Up @@ -462,14 +499,28 @@ class ParquetScaleTestSuite extends SparkQueryCompareTestSuite with Logging {
*/
// skip checking the schema
val (cpuStat, gpuStat) = checkStats(genDf(tab), skipCheckSchema = true)
val expectedCpuSchema = Seq(

val expectedCpuSchemaForSpark31x = Seq(
"[c1, list, element, key_value, key] required binary key (UTF8)",
"[c1, list, element, key_value, value] optional int64 value")

val expectedCpuSchemaForSpark320Plus = Seq(
"[c1, list, element, key_value, key] required binary key (STRING)",
"[c1, list, element, key_value, value] optional int64 value")
val expectedGpuSchema = Seq(

val expectedGpuSchemaForSpark31x = Seq(
"[c1, list, c1, key_value, key] required binary key (UTF8)",
"[c1, list, c1, key_value, value] optional int64 value")

val expectedGpuSchemaForSpark320Plus = Seq(
"[c1, list, c1, key_value, key] required binary key (STRING)",
"[c1, list, c1, key_value, value] optional int64 value")
assertResult(expectedCpuSchema)(cpuStat.schema)
assertResult(expectedGpuSchema)(gpuStat.schema)

assert(cpuStat.schema == expectedCpuSchemaForSpark31x ||
cpuStat.schema == expectedCpuSchemaForSpark320Plus)

assert(gpuStat.schema == expectedGpuSchemaForSpark31x ||
gpuStat.schema == expectedGpuSchemaForSpark320Plus)
}

test("Statistics tests for Parquet files written by GPU, map(map)") {
Expand Down

0 comments on commit e99514b

Please sign in to comment.