[WIP] Initial work on supporting DecimalType #1063
@@ -158,6 +158,15 @@ private static final DType toRapidsOrNull(DataType type) {
      return DType.TIMESTAMP_MICROSECONDS;
    } else if (type instanceof StringType) {
      return DType.STRING;
    } else if (type instanceof DecimalType) {
      DecimalType decType = (DecimalType) type;
      if (decType.precision() <= DType.DECIMAL32_MAX_PRECISION) {
Spark is kind of split on how decimals are stored internally. UnsafeRow supports only a long value or a binary representation for BigDecimal. The built-in ColumnVector implementation distinguishes between int-backed and long-backed storage.
If we only support DECIMAL64 then it would potentially reduce complexity by a lot. But if we also support DECIMAL32 we could save on GPU memory.
The main reason I ask is that I just merged in some code that speeds up the transfer of fixed-width data types. I have not tested it with decimal in any way, but there is code in there that makes some assumptions about the sizes of the data types, and I think it is wrong right now. It should be simple to fix, but I wanted to call this out and be sure we are all on the same page.
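To make the trade-off concrete, here is a minimal sketch of picking a storage width from precision. The class and method names are hypothetical, and the limits of 9 and 18 digits are assumptions based on what fits in a signed 32-bit and 64-bit integer; they are not read from cuDF.

```java
/** Hypothetical helper for illustration; not part of cuDF or the plugin. */
final class DecimalStorage {
    // Assumed limits: every 9-digit unscaled value fits in a signed int,
    // and every 18-digit unscaled value fits in a signed long.
    static final int DECIMAL32_MAX_PRECISION = 9;
    static final int DECIMAL64_MAX_PRECISION = 18;

    /** Returns bytes per value for the given precision, or -1 if unsupported. */
    static int bytesPerValue(int precision, boolean allowDecimal32) {
        if (precision > DECIMAL64_MAX_PRECISION) {
            return -1; // too wide for 64-bit storage
        }
        if (allowDecimal32 && precision <= DECIMAL32_MAX_PRECISION) {
            return Integer.BYTES; // 4 bytes: saves memory for narrow decimals
        }
        return Long.BYTES; // 8 bytes: the only width if DECIMAL32 is disabled
    }

    public static void main(String[] args) {
        System.out.println(bytesPerValue(7, true));
        System.out.println(bytesPerValue(7, false));
    }
}
```

With allowDecimal32 set to false there is a single fixed width, which is the simplification discussed above; any code that assumes a data type's size would then only need to handle 8 bytes for decimals.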
Maybe we can start by supporting only DECIMAL64 to reduce complexity?
I am fine either way, and it is not a big deal. I was just thinking out loud.
@@ -450,6 +450,7 @@ object GpuOverrides {
      case DateType => true
      case TimestampType => ZoneId.systemDefault().normalized() == GpuOverrides.UTC_TIMEZONE_ID
      case StringType => true
      case dt: DecimalType if dt.precision <= ai.rapids.cudf.DType.DECIMAL64_MAX_PRECISION => true
Do we have a list of all of the operators we currently support that also support Decimal in Spark? We have to be sure that if we are allowing decimal for all operators, like this is doing, that we have disallowed it everywhere that we do not have automated tests for.
The only tests I see in this patch are unit tests for Scalars.
I've rewritten the isSupportedType part. In the latest version, the general method isSupportedType remains unchanged, and a decimal-specialized method isSupportedTypeWithDecimal was added for GpuExpressions (operators) that support Decimal. For now, only a very limited set is decimal-enabled (with test cases), such as GpuLiteral and GpuAlias. This collection will be expanded only as the decimal-related dependencies become ready.
I found it is not easy to add DecimalType to the supported types while staying compatible with the existing RapidsMetas. I tried to solve this problem with two actions:
- Add a method isSupportedTypeWithDecimal for RapidsMeta classes that override the areAllSupportedTypes interface, such as Literal and Cast.
- Refactor GpuOverrides.areAllSupportedTypes so that it receives the RapidsMeta itself to determine whether it supports decimal. This covers RapidsMeta classes that do not override areAllSupportedTypes.
@@ -201,6 +210,10 @@ static final DataType getSparkType(DType type) {
        return DataTypes.TimestampType;
      case STRING:
        return DataTypes.StringType;
      case DECIMAL32:
        return new DecimalType(DType.DECIMAL32_MAX_PRECISION, -type.getScale());
Do we need to track the effective precision of the value as it progresses through operations in order to be compatible with how Spark operates on Decimal? For example, take this simple Spark shell session:
scala> val x = Decimal(5.43)
x: org.apache.spark.sql.types.Decimal = 5.43
scala> (x.precision, x.scale)
res2: (Int, Int) = (3,2)
scala> val y = x * x
y: org.apache.spark.sql.types.Decimal = 29.4849
scala> (y.precision, y.scale)
res3: (Int, Int) = (6,4)
If we perform the same operation with DECIMAL32 we will lose track of the precision, and when we cast back to a SparkType, rather than getting DecimalType(6,4) as we should, we'll get DecimalType(9,4).
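The same effect can be reproduced outside Spark. The sketch below uses java.math.BigDecimal as a stand-in for Spark's Decimal, which is an assumption made for illustration; the class name, the helper methods, and the constant 9 for the 32-bit maximum precision are hypothetical as well.

```java
import java.math.BigDecimal;

/** Hypothetical illustration of precision tracking; not plugin code. */
final class PrecisionTracking {
    // Assumed maximum precision of 32-bit decimal storage.
    static final int DECIMAL32_MAX_PRECISION = 9;

    /** Returns {precision, scale} of the exact product a * b. */
    static int[] productPrecisionAndScale(String a, String b) {
        BigDecimal product = new BigDecimal(a).multiply(new BigDecimal(b));
        return new int[] {product.precision(), product.scale()};
    }

    /**
     * What a mapping driven only by the storage type can report: the scale
     * is known, but the precision falls back to the storage maximum.
     */
    static int[] fromStorageTypeOnly(int scale) {
        return new int[] {DECIMAL32_MAX_PRECISION, scale};
    }

    public static void main(String[] args) {
        int[] tracked = productPrecisionAndScale("5.43", "5.43");
        int[] inferred = fromStorageTypeOnly(tracked[1]);
        System.out.println("tracked:  (" + tracked[0] + "," + tracked[1] + ")");
        System.out.println("inferred: (" + inferred[0] + "," + inferred[1] + ")");
    }
}
```

The tracked result matches the (6,4) from the shell session, while the storage-only mapping can only say (9,4), which is the mismatch described above.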
Spark should already be doing that for us, which is another reason why I don't think we want to rely on this API much longer. We are in the process of moving away from it for nested types and I think it would be best to just not support it for decimal at all.
@revans2 @jlowe The problem is that GpuColumnVector.from(cudf.ColumnVector cudfCv) relies on the method getSparkType. If getSparkType doesn't support DecimalTypes, we won't be able to create a decimal-typed GpuColumnVector from a cudf.ColumnVector through GpuColumnVector.from(cudf.ColumnVector cudfCv). That method is referenced in many places despite being deprecated.
I am in the process of updating the plugin so GpuColumnVector.from(cudf.ColumnVector cudfCv) goes away and is replaced with GpuColumnVector.from(cudf.ColumnVector cudfCv, DataType), similar to what I have been doing for Table to ColumnarBatch conversion. I have a patch working, just need to finish testing it and a bit more cleanup. Because no one has taken a look at #1078 yet, I'll likely just update it instead of making a separate PR.
The goal eventually is to have getSparkType go away entirely because we cannot support it when there is not a 1:1 mapping between cudf types and spark types.
I've reworked this PR on top of patch #1078 and added a decimal type check to typeConversionAllowed.
withResource(GpuColumnVector.from(ColumnVector.decimalFromInts(0, 1),
  DecimalType(DType.DECIMAL32_MAX_PRECISION + 1, 0))) { _ => }
}
// TODO: support fromScalar(cudf.ColumnVector cv, int rows) for fixed-point decimal in cuDF
This is going to be a big problem. We need this for a project and other operations to work.
I will support this feature ASAP. I would also really like to know what other essentials are missing from this PR. Thanks!
What we really have to have/test is
- Shuffle decimal types including Contiguous Split, UCX Headers Updated, JCudfSerialization. I think all of these have been updated, but not sure if they have been tested yet.
- CoalesceBatch for decimal types. This includes Contiguous Split and Table.concat. Again I think these are in place, but it needs to be tested.
- ColumnarToRow and RowToColumnar. I think that is covered here for the most part, but it is not tested. I would like to have a follow on issue to try and "fix" some of the issues with the fast fixed width transfers, which I am happy to handle.
- fromScalar support. This is something that we are missing but I didn't realize how important it is until recently. We have a lot of Expressions that require this so we can take a constant value and turn it into a column, which happens regularly.
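As a rough picture of what turning a constant into a column means for a fixed-point decimal, here is a plain-Java sketch on host memory. The class and method are hypothetical and do not reflect the cuDF fromScalar API; the column is modelled as an array of unscaled long values that share a single scale.

```java
import java.util.Arrays;

/** Hypothetical host-side model of expanding a scalar into a column. */
final class ScalarToColumn {
    /**
     * Repeats one unscaled decimal value for every row. The scale is carried
     * once as column metadata rather than stored per row.
     */
    static long[] fromUnscaledScalar(long unscaled, int rows) {
        long[] column = new long[rows];
        Arrays.fill(column, unscaled);
        return column;
    }

    public static void main(String[] args) {
        // The literal 5.43 with scale 2 is the unscaled value 543.
        long[] column = fromUnscaledScalar(543L, 4);
        System.out.println(Arrays.toString(column));
    }
}
```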
Hi @revans2, thanks for outlining all the essential work for basic decimal support. I have been trying to address these items, but I found it is not easy to accomplish all of them at once.
Here is a summary of current progress:
- Enable decimal type for valid RapidsMeta in GpuOverrides (according to the latest supportedType mechanism).
- Support decimal for GpuColumnVector (with test cases in DecimalUnitTest.scala).
- Support decimal for GpuScalar (with test cases in DecimalUnitTest.scala).
- Decimal converters for GpuRowToColumnarExec (with test cases in GpuBatchUtilsSuite.scala).
- Decimal columnar copy for HostColumnarToGpu (with test cases in DecimalUnitTest.scala).
- Test cases for decimal in GpuCoalesceBatchesSuite. (Some cases fail because contiguous split of decimal columns is not yet supported by cuDF. I've created a PR to solve this problem.)
- Creating a decimal column vector from a scalar will be supported after this PR is merged (which is close).
Could we get this PR merged first, so that it does not block other work relying on it (such as binary operation support)? Then we can address the unresolved issues in other PRs. Perhaps we need to create an issue to summarize the progress of all these items?
I am fine with breaking things up, but I really want to see something that is self-contained and tested.
If we drop support for decimal in literal values then we don't need fromScalar to work yet. But any code that we do put in I want to see at least minimal automated tests that it is working. I trust that you have run tests manually, but there is a lot that is changing right now with nested types etc, and I want to be sure that we have tests to verify that someone else didn't break something.
Sorry, I'm afraid I don't understand what the "minimal automated tests" should be, beyond the existing unit tests for decimal. Does it refer to tests with mixed data types, such as GpuSinglePartitioningSuite? If so, I am working on them. I've just put up a PR to enable creating decimal columns with Table.TestBuilder.
I've just added tests for binaryOps and project.
Signed-off-by: sperlingxx <[email protected]>
sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
    row: SpecializedGetters,
    column: Int,
    builder: ai.rapids.cudf.HostColumnVector.ColumnBuilder): Double = {
  builder.append(row.getDecimal(column, precision, scale).toJavaBigDecimal)
I would prefer to see us use toUnscaledLong to avoid any extra object creation, it is also what we are going to ultimately store the data as. This might mean that we need different classes for DECIMAL64 and DECIMAL32 too.
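For readers unfamiliar with the unscaled representation, the sketch below shows the relationship between a decimal value and the integer that ends up in storage. It uses java.math.BigDecimal purely to illustrate the arithmetic; the class and method names are hypothetical, and on the real data path the point of the suggestion is to read the long directly rather than build a BigDecimal first.

```java
import java.math.BigDecimal;

/** Hypothetical illustration of unscaled decimal values; not plugin code. */
final class UnscaledDecimal {
    /**
     * Returns value * 10^scale as a long. setScale without a rounding mode
     * throws ArithmeticException if digits would be lost, and longValueExact
     * throws if the result does not fit in 64 bits.
     */
    static long toUnscaledLong(BigDecimal value, int scale) {
        return value.setScale(scale).unscaledValue().longValueExact();
    }

    /** Same idea for 32-bit storage; throws if the value does not fit. */
    static int toUnscaledInt(BigDecimal value, int scale) {
        return value.setScale(scale).unscaledValue().intValueExact();
    }

    public static void main(String[] args) {
        System.out.println(toUnscaledLong(new BigDecimal("5.43"), 2));
        System.out.println(toUnscaledInt(new BigDecimal("-1.5"), 1));
    }
}
```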
if (row.isNullAt(column)) {
  builder.appendNull()
} else {
  new NotNullDecimalConverter(precision, scale).append(row, column, builder)
This is on the data path. I would like us to avoid object creation if at all possible to speed up the data path. Please make a static method instead, or use inheritance, which I think is less ideal.
    builder: ai.rapids.cudf.HostColumnVector.ColumnBuilder): Double = {
  builder.append(row.getDecimal(column, precision, scale).toJavaBigDecimal)
  // Infer the storage type via precision, because we can't access DType of builder.
  if (precision > ai.rapids.cudf.DType.DECIMAL32_MAX_PRECISION) 8 else 4
Here too it would be great to avoid conditionals in the data path. I would prefer it if we either passed in the size or had separate implementations for DECIMAL32 and DECIMAL64
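One possible shape for this, sketched in plain Java: two static append methods, one per storage width, so the per-row path allocates nothing and has no precision check. The ByteBuffer stands in for the real column builder, and all names here are hypothetical.

```java
import java.nio.ByteBuffer;

/** Hypothetical per-width appenders; the ByteBuffer stands in for a builder. */
final class DecimalAppenders {
    /** DECIMAL32 path: writes 4 bytes and reports that fixed size. */
    static int appendDecimal32(ByteBuffer out, long unscaled) {
        // toIntExact throws ArithmeticException if the value needs more than 32 bits.
        out.putInt(Math.toIntExact(unscaled));
        return Integer.BYTES;
    }

    /** DECIMAL64 path: writes 8 bytes and reports that fixed size. */
    static int appendDecimal64(ByteBuffer out, long unscaled) {
        out.putLong(unscaled);
        return Long.BYTES;
    }

    public static void main(String[] args) {
        ByteBuffer out = ByteBuffer.allocate(12);
        int written = appendDecimal32(out, 543L) + appendDecimal64(out, 294849L);
        System.out.println(written);
    }
}
```

The choice between the two methods would be made once per column when the converters are set up, so neither a conditional nor an object allocation remains in the per-row call.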
// We assume that all common plans are decimal supportable by default, considering
// whether decimal allowable is mainly determined in expression-level.
override def isSupportedType(t: DataType): Boolean =
  GpuOverrides.isSupportedType(t, allowDecimal = true)
Do we have tests that verify that we can support decimal for all top level spark operations? Have we tested join, expand, generate, filter, project, union, window, sort, or hash aggregate? What about all of the arrow python UDF code where we go to/from arrow?
I think it would be much better if we split this big PR up into smaller pieces and put each piece in separately with corresponding tests to show that it works, and we only add decimal to the allow list for those things that we know it works for because we have tested it. If you want me to help with this I am happy to do it. I am already in the middle of doing it for Lists. I am going to add in structs, maps, binary, null type and finally calendar interval, based on how much time I have and priorities. Some of these we will only be able to do very basic things with, but that should be enough to unblock others for using them for more complicated processing.
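A minimal sketch of the allow-list idea, assuming operators and types are identified by name. The class, the string-based lookup, and the blanket acceptance of non-decimal types are simplifications invented for illustration; only the GpuLiteral and GpuAlias names come from this thread.

```java
import java.util.Set;

/** Hypothetical allow list: decimal is opt-in per operator, off by default. */
final class DecimalAllowList {
    // Operators that have automated decimal tests (names taken from this thread).
    private static final Set<String> DECIMAL_TESTED = Set.of("GpuLiteral", "GpuAlias");

    /**
     * Decimal is accepted only for operators on the allow list. Other types
     * are accepted unconditionally here, which is a simplification of the
     * real type checks.
     */
    static boolean isSupportedType(String operator, String typeName) {
        if ("decimal".equals(typeName)) {
            return DECIMAL_TESTED.contains(operator);
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(isSupportedType("GpuLiteral", "decimal"));
        System.out.println(isSupportedType("GpuHashJoin", "decimal"));
    }
}
```

Under this scheme an operator gains decimal support by being added to the set in the same change that adds its tests, rather than by a global default of allowDecimal = true.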
@@ -144,6 +144,8 @@ object GpuDivModLike {
      case DType.INT64 => Scalar.fromLong(0L)
      case DType.FLOAT32 => Scalar.fromFloat(0f)
      case DType.FLOAT64 => Scalar.fromDouble(0)
      case dt if dt.isDecimalType && dt.isBackedByInt => Scalar.fromDecimal(0, 0)
I'm not sure this is going to work in all cases. I think this might be another place where we have some tech debt to pay off and need to pass in a DataType instead of a DType.
@@ -144,6 +144,8 @@ object GpuDivModLike {
      case DType.INT64 => Scalar.fromLong(0L)
      case DType.FLOAT32 => Scalar.fromFloat(0f)
      case DType.FLOAT64 => Scalar.fromDouble(0)
      case dt if dt.isDecimalType && dt.isBackedByInt => Scalar.fromDecimal(0, 0)
      case dt if dt.isDecimalType && dt.isBackedByLong => Scalar.fromDecimal(0, 0L)
For the div/mod to work properly we also need to update isScalarZero, or we are going to miss a divide by zero case in decimal.
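The subtlety with a decimal zero check is that zero can appear at any scale. The sketch below illustrates this with java.math.BigDecimal; it is not the plugin's isScalarZero, and the class and method names are hypothetical.

```java
import java.math.BigDecimal;

/** Hypothetical zero checks for decimal scalars; not the plugin's isScalarZero. */
final class DecimalZeroCheck {
    /** True for 0, 0.0, 0.00 and so on; signum ignores the scale. */
    static boolean isZero(BigDecimal value) {
        return value.signum() == 0;
    }

    /** With unscaled storage the check is a plain integer compare at any scale. */
    static boolean isZeroUnscaled(long unscaled) {
        return unscaled == 0L;
    }

    public static void main(String[] args) {
        BigDecimal zeroWithScale = new BigDecimal("0.00");
        // equals compares scale as well as value, so it is false here.
        System.out.println(zeroWithScale.equals(BigDecimal.ZERO));
        System.out.println(isZero(zeroWithScale));
    }
}
```

A check written with equals against a fixed zero constant would miss divisors such as 0.00, which is the kind of divide-by-zero case that could slip through.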
 *  - GpuEqualNullSafe
 *  - GpuDivModLike (GpuDivide/GpuIntegralDivide/GpuPmod/GpuRemainder)
 */
class DecimalBinaryOpSuite extends GpuExpressionTestSuite {
I personally would prefer to see us update the python tests. They tend to be more complete. These tests are not covering the case where you need to take a literal and turn it into a column because rapidsai/cudf#6723 has not been merged in yet.
This is another reason to break this down into smaller pieces. I cannot let us put in support for binary ops without the above PR being merged in and tested. If we break this apart we can get some of the functionality in, and then we can look at what we need to do to get the rest of it in.
This PR enables DecimalType in the Spark GPU runtime.
Because many dependencies in cuDF are not yet ready, the primary intention of this PR is to introduce DecimalType as a supported type under the GPU runtime. Along with support in the supported-type system, we enabled the creation of GpuColumnVector and GpuScalar with DecimalType.
In terms of tests, we test creating decimals as GpuScalar and GpuColumnVector, along with some expressions on decimal-type column vectors (such as GpuIsNull).
Here is the summary of progress:
- GpuProject with decimal columns.