Decimal128 support for Parquet #4362

Merged (12 commits, Jan 5, 2022)
18 changes: 9 additions & 9 deletions docs/supported_ops.md
@@ -626,7 +626,7 @@ Accelerator supports are described below.
<td>S</td>
<td><em>PS<br/>UTC is only supported TZ for TIMESTAMP</em></td>
<td>S</td>
-<td><em>PS<br/>128bit decimal only supported for Orc</em></td>
+<td><em>PS<br/>128bit decimal only supported for Orc and Parquet</em></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
@@ -17509,13 +17509,13 @@ dates or timestamps, or for a lack of type coercion support.
<td>S</td>
<td><em>PS<br/>UTC is only supported TZ for TIMESTAMP</em></td>
<td>S</td>
-<td><em>PS<br/>max DECIMAL precision of 18</em></td>
+<td>S</td>
<td> </td>
<td><b>NS</b></td>
<td> </td>
-<td><em>PS<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, UDT</em></td>
-<td><em>PS<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, UDT</em></td>
-<td><em>PS<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, UDT</em></td>
+<td><em>PS<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, UDT</em></td>
+<td><em>PS<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, UDT</em></td>
+<td><em>PS<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, UDT</em></td>
<td><b>NS</b></td>
</tr>
<tr>
@@ -17530,13 +17530,13 @@
<td>S</td>
<td><em>PS<br/>UTC is only supported TZ for TIMESTAMP</em></td>
<td>S</td>
-<td><em>PS<br/>max DECIMAL precision of 18</em></td>
+<td>S</td>
<td> </td>
<td><b>NS</b></td>
<td> </td>
-<td><em>PS<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, UDT</em></td>
-<td><em>PS<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, UDT</em></td>
-<td><em>PS<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, UDT</em></td>
+<td><em>PS<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, UDT</em></td>
+<td><em>PS<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, UDT</em></td>
+<td><em>PS<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, UDT</em></td>
<td><b>NS</b></td>
</tr>
</table>
1 change: 1 addition & 0 deletions integration_tests/src/main/python/data_gen.py
@@ -949,6 +949,7 @@ def gen_scalars_for_sql(data_gen, count, seed=0, force_no_nulls=False):
decimal_64_map_gens = [MapGen(key_gen=gen, value_gen=gen, nullable=False) for gen in [DecimalGen(7, 3, nullable=False), DecimalGen(12, 2, nullable=False), DecimalGen(18, -3, nullable=False)]]
decimal_128_map_gens = [MapGen(key_gen=gen, value_gen=gen, nullable=False) for gen in [DecimalGen(20, 2, nullable=False), DecimalGen(36, 5, nullable=False), DecimalGen(38, 38, nullable=False),
DecimalGen(36, -5, nullable=False)]]
+decimal_128_no_neg_map_gens = [MapGen(key_gen=gen, value_gen=gen, nullable=False) for gen in [DecimalGen(20, 2, nullable=False), DecimalGen(36, 5, nullable=False), DecimalGen(38, 38, nullable=False)]]

# Some map gens, but not all because of nesting
map_gens_sample = all_basic_map_gens + [MapGen(StringGen(pattern='key_[0-9]', nullable=False), ArrayGen(string_gen), max_length=10),
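For context, each of the new generators above produces a map column whose keys and values are 128-bit decimals (any precision above 18 no longer fits in a 64-bit integer). The no-neg variant drops the DecimalGen(36, -5) entry, likely because the Parquet format requires a non-negative decimal scale. A minimal sketch of how such a generator is typically consumed, assuming the integration_tests data_gen helpers and a SparkSession named spark are available; the gen_df call and column name are inferred from the surrounding suite, not part of this diff:

    from data_gen import DecimalGen, MapGen, gen_df

    # map<decimal(20,2), decimal(20,2)> -- precision 20 forces a 128-bit decimal
    gen = MapGen(key_gen=DecimalGen(20, 2, nullable=False),
                 value_gen=DecimalGen(20, 2, nullable=False),
                 nullable=False)
    df = gen_df(spark, [('m', gen)])  # hypothetical column name 'm'
    df.write.mode('overwrite').parquet('/tmp/decimal128_map')  # illustrative path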
60 changes: 32 additions & 28 deletions integration_tests/src/main/python/parquet_test.py

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions integration_tests/src/main/python/parquet_write_test.py
@@ -34,7 +34,8 @@
coalesce_parquet_file_reader_conf={'spark.rapids.sql.format.parquet.reader.type': 'COALESCING'}
reader_opt_confs = [original_parquet_file_reader_conf, multithreaded_parquet_file_reader_conf,
coalesce_parquet_file_reader_conf]
-parquet_decimal_gens=[decimal_gen_default, decimal_gen_scale_precision, decimal_gen_same_scale_precision, decimal_gen_64bit]
+parquet_decimal_gens=[decimal_gen_default, decimal_gen_scale_precision, decimal_gen_same_scale_precision, decimal_gen_64bit,
+                      decimal_gen_20_2, decimal_gen_36_5, decimal_gen_38_0, decimal_gen_38_10]
parquet_decimal_struct_gen= StructGen([['child'+str(ind), sub_gen] for ind, sub_gen in enumerate(parquet_decimal_gens)])
writer_confs={'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': 'CORRECTED',
'spark.sql.legacy.parquet.int96RebaseModeInWrite': 'CORRECTED'}
@@ -57,7 +58,7 @@ def limited_int96():

parquet_basic_map_gens = [MapGen(f(nullable=False), f()) for f in
[BooleanGen, ByteGen, ShortGen, IntegerGen, LongGen, FloatGen, DoubleGen, DateGen,
-                          limited_timestamp]] + [simple_string_to_string_map_gen]
+                          limited_timestamp]] + [simple_string_to_string_map_gen] + decimal_128_no_neg_map_gens

parquet_struct_gen = [StructGen([['child' + str(ind), sub_gen] for ind, sub_gen in enumerate(parquet_basic_gen)]),
StructGen([['child0', StructGen([['child1', byte_gen]])]]),
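The new decimal_gen_20_2 through decimal_gen_38_10 entries extend the write tests past 64-bit precision. Stripped of the test harness, the round trip these tests exercise looks roughly like the following plain-PySpark sketch; the session setup and path are illustrative assumptions:

    from decimal import Decimal
    from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType, StructField, DecimalType

    spark = SparkSession.builder.getOrCreate()
    # The same rebase confs the test file sets in writer_confs
    spark.conf.set('spark.sql.legacy.parquet.datetimeRebaseModeInWrite', 'CORRECTED')
    spark.conf.set('spark.sql.legacy.parquet.int96RebaseModeInWrite', 'CORRECTED')

    schema = StructType([StructField('d', DecimalType(38, 10))])  # needs 128 bits
    df = spark.createDataFrame([(Decimal('1234567890.0123456789'),)], schema)
    df.write.mode('overwrite').parquet('/tmp/dec128_roundtrip')
    assert spark.read.parquet('/tmp/dec128_roundtrip').collect() == df.collect()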
sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
@@ -824,9 +824,9 @@ object GpuOverrides extends Logging {
cudfWrite = TypeSig.none,
sparkSig = TypeSig.atomics)),
(ParquetFormatType, FileFormatChecks(
-      cudfRead = (TypeSig.commonCudfTypes + TypeSig.DECIMAL_64 + TypeSig.STRUCT + TypeSig.ARRAY +
-        TypeSig.MAP).nested(),
-      cudfWrite = (TypeSig.commonCudfTypes + TypeSig.DECIMAL_64 + TypeSig.STRUCT +
+      cudfRead = (TypeSig.commonCudfTypes + TypeSig.DECIMAL_128_FULL + TypeSig.STRUCT +
+        TypeSig.ARRAY + TypeSig.MAP).nested(),
+      cudfWrite = (TypeSig.commonCudfTypes + TypeSig.DECIMAL_128_FULL + TypeSig.STRUCT +
TypeSig.ARRAY + TypeSig.MAP).nested(),
sparkSig = (TypeSig.atomics + TypeSig.STRUCT + TypeSig.ARRAY + TypeSig.MAP +
TypeSig.UDT).nested())),
@@ -3480,7 +3480,7 @@ object GpuOverrides extends Logging {
exec[DataWritingCommandExec](
"Writing data",
ExecChecks((TypeSig.commonCudfTypes + TypeSig.DECIMAL_128_FULL.withPsNote(
TypeEnum.DECIMAL, "128bit decimal only supported for Orc") +
TypeEnum.DECIMAL, "128bit decimal only supported for Orc and Parquet") +
TypeSig.STRUCT.withPsNote(TypeEnum.STRUCT, "Only supported for Parquet") +
TypeSig.MAP.withPsNote(TypeEnum.MAP, "Only supported for Parquet") +
TypeSig.ARRAY.withPsNote(TypeEnum.ARRAY, "Only supported for Parquet")).nested(),
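The FileFormatChecks change above is what actually lifts Parquet reads and writes from DECIMAL_64 to DECIMAL_128_FULL, so scans over decimals with precision above 18 no longer force a CPU fallback. A sketch of how one might confirm this, assuming a session launched with the RAPIDS Accelerator; the jar, plugin conf, and the decimalType flag are deployment-specific assumptions:

    # Assumes spark was started with the RAPIDS jar and
    # --conf spark.plugins=com.nvidia.spark.SQLPlugin
    spark.conf.set('spark.rapids.sql.enabled', 'true')
    # Assumption: decimal support was still flag-gated in plugin builds of this era
    spark.conf.set('spark.rapids.sql.decimalType.enabled', 'true')

    df = spark.read.parquet('/tmp/dec128_roundtrip')  # decimal(38,10) column from above
    df.filter('d IS NOT NULL').explain()
    # Expect a GpuFileSourceScanExec (not FileSourceScanExec) in the physical plan.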
sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala
@@ -785,7 +785,8 @@ trait ParquetPartitionReaderBase extends Logging with Arm with ScanWithMetrics
field => {
if (field.isPrimitive) {
val t = field.getOriginalType
-          (t == OriginalType.UINT_8) || (t == OriginalType.UINT_16) || (t == OriginalType.UINT_32)
+          (t == OriginalType.UINT_8) || (t == OriginalType.UINT_16) ||
+            (t == OriginalType.UINT_32) || (t == OriginalType.UINT_64)
} else {
existsUnsignedType(field.asGroupType)
}
@@ -794,7 +795,12 @@
}

  def needDecimalCast(cv: ColumnView, dt: DataType): Boolean = {
-    cv.getType.isDecimalType && !GpuColumnVector.getNonNestedRapidsType(dt).equals(cv.getType())
+    // UINT64 is cast to Decimal(20,0) by Spark to accommodate the largest possible
+    // values this type can take. Other unsigned data types are converted to basic
+    // types like LongType; this is analogous to that, except we spill over to a
+    // large decimal.
+    cv.getType.isDecimalType && !GpuColumnVector.getNonNestedRapidsType(dt).equals(cv.getType()) ||
+      cv.getType.equals(DType.UINT64)

Review comment (Collaborator): Could you add some comments to elaborate why UINT64 needs to be cast to decimal here?

Reply (Author): Yes I can :) Coming up shortly.

}

def needUnsignedToSignedCast(cv: ColumnView, dt: DataType): Boolean = {
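The UINT64 branch in needDecimalCast mirrors what Spark itself does on the CPU: from Spark 3.2, a Parquet UINT_64 column is read as decimal(20,0), since the maximum value 18446744073709551615 has 20 digits and overflows a signed LongType. A sketch of that CPU-side behavior, assuming pyarrow is installed and Spark >= 3.2; the path is illustrative:

    import pyarrow as pa
    import pyarrow.parquet as pq

    table = pa.table({'u': pa.array([0, 2**64 - 1], type=pa.uint64())})
    pq.write_table(table, '/tmp/uint64.parquet')

    df = spark.read.parquet('/tmp/uint64.parquet')
    df.printSchema()          # u: decimal(20,0)
    df.show(truncate=False)   # 18446744073709551615 round-trips intact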
Binary file added: tests/src/test/resources/test_unsigned64.parquet (binary not shown)
tests/src/test/scala/com/nvidia/spark/rapids/ParquetScanSuite.scala
@@ -138,4 +138,18 @@ class ParquetScanSuite extends SparkQueryCompareTestSuite {
assumeCondition = (_ => (VersionUtils.isSpark320OrLater, "Spark version not 3.2.0+"))) {
frame => frame.select(col("*"))
}

+  /**
+   * Parquet file with 2 columns
+   * <simple_uint64, UINT64>
+   * <arr_uint64, array(UINT64)>
+   */
+  testSparkResultsAreEqual("Test Parquet unsigned int: uint64",
+    frameFromParquet("test_unsigned64.parquet"),
+    // CPU version throws an exception when Spark < 3.2, so skip when Spark < 3.2.
+    // The exception is like "Parquet type not supported: INT32 (UINT_8)"
+    assumeCondition = (_ => (VersionUtils.isSpark320OrLater, "Spark version not 3.2.0+"))) {
+    frame => frame.select(col("*"))
+  }
}
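For reference, a hedged sketch of how a fixture matching the documented <simple_uint64, UINT64> / <arr_uint64, array(UINT64)> schema could be generated with pyarrow; this is an assumption, and the checked-in binary may well have been produced by a different tool:

    import pyarrow as pa
    import pyarrow.parquet as pq

    table = pa.table({
        'simple_uint64': pa.array([1, 2**63, 2**64 - 1], type=pa.uint64()),
        'arr_uint64': pa.array([[1, 2], [2**64 - 1], []],
                               type=pa.list_(pa.uint64())),
    })
    pq.write_table(table, 'test_unsigned64.parquet')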
2 changes: 1 addition & 1 deletion tools/src/main/resources/supportedDataSource.csv
@@ -2,5 +2,5 @@ Format,Direction,BOOLEAN,BYTE,SHORT,INT,LONG,FLOAT,DOUBLE,DATE,TIMESTAMP,STRING,
CSV,read,CO,CO,CO,CO,CO,CO,CO,CO,CO,S,NS,NA,NS,NA,NA,NA,NA,NA
ORC,read,S,S,S,S,S,S,S,S,PS,S,S,NA,NS,NA,PS,PS,PS,NS
ORC,write,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA
-Parquet,read,S,S,S,S,S,S,S,S,PS,S,PS,NA,NS,NA,PS,PS,PS,NS
+Parquet,read,S,S,S,S,S,S,S,S,PS,S,S,NA,NS,NA,PS,PS,PS,NS
Parquet,write,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA