From f22795dc142020dfbee4c392f1a1a022745a9293 Mon Sep 17 00:00:00 2001
From: Alfred Xu
Date: Mon, 7 Mar 2022 18:13:05 +0800
Subject: [PATCH] Speed up copying decimal columns from Parquet buffer to GPU
 buffer (#4872)

Signed-off-by: sperlingxx

Closes #4784

Adds specialized support for columnar copies of WritableColumnVector on
decimal types. The new implementation copies the unscaled values directly,
avoiding the round trip of Decimal encoding/decoding.
---
 .../spark/rapids/ColumnarCopyHelper.java | 26 ++++++++++++++++++-
 .../spark/rapids/HostColumnarToGpu.scala | 24 +++++++++++++----
 2 files changed, 44 insertions(+), 6 deletions(-)

diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/ColumnarCopyHelper.java b/sql-plugin/src/main/java/com/nvidia/spark/rapids/ColumnarCopyHelper.java
index 028691d28c6..f4d4342d487 100644
--- a/sql-plugin/src/main/java/com/nvidia/spark/rapids/ColumnarCopyHelper.java
+++ b/sql-plugin/src/main/java/com/nvidia/spark/rapids/ColumnarCopyHelper.java
@@ -18,6 +18,7 @@
 
 import ai.rapids.cudf.HostColumnVector.ColumnBuilder;
 
+import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
 import org.apache.spark.sql.vectorized.ColumnVector;
 
 /**
@@ -162,7 +163,30 @@ public static void stringCopy(ColumnVector cv, ColumnBuilder b, int rows) {
     }
   }
 
-  // TODO: https://github.com/NVIDIA/spark-rapids/issues/4784
+  public static void decimal32Copy(WritableColumnVector cv, ColumnBuilder b, int rows) {
+    intCopy(cv, b, rows);
+  }
+
+  public static void decimal64Copy(WritableColumnVector cv, ColumnBuilder b, int rows) {
+    longCopy(cv, b, rows);
+  }
+
+  public static void decimal128Copy(WritableColumnVector cv, ColumnBuilder b, int rows) {
+    if (!cv.hasNull()) {
+      for (int i = 0; i < rows; i++) {
+        b.appendDecimal128(cv.getBinary(i));
+      }
+      return;
+    }
+    for (int i = 0; i < rows; i++) {
+      if (cv.isNullAt(i)) {
+        b.appendNull();
+      } else {
+        b.appendDecimal128(cv.getBinary(i));
+      }
+    }
+  }
+
   public static void decimal32Copy(ColumnVector cv, ColumnBuilder b, int rows,
       int precision, int scale) {
     if (!cv.hasNull()) {
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostColumnarToGpu.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostColumnarToGpu.scala
index e926e844903..8a59926d40e 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostColumnarToGpu.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostColumnarToGpu.scala
@@ -32,6 +32,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.vectorized.WritableColumnVector
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnarBatch, ColumnVector}
 import org.apache.spark.sql.vectorized.rapids.AccessibleArrowColumnVector
@@ -128,12 +129,25 @@ object HostColumnarToGpu extends Logging {
         ColumnarCopyHelper.doubleCopy(cv, b, rows)
       case StringType =>
         ColumnarCopyHelper.stringCopy(cv, b, rows)
-      case dt: DecimalType if DecimalType.is32BitDecimalType(dt) =>
-        ColumnarCopyHelper.decimal32Copy(cv, b, rows, dt.precision, dt.scale)
-      case dt: DecimalType if DecimalType.is64BitDecimalType(dt) =>
-        ColumnarCopyHelper.decimal64Copy(cv, b, rows, dt.precision, dt.scale)
       case dt: DecimalType =>
-        ColumnarCopyHelper.decimal128Copy(cv, b, rows, dt.precision, dt.scale)
+        cv match {
+          case wcv: WritableColumnVector =>
+            if (DecimalType.is32BitDecimalType(dt)) {
+              ColumnarCopyHelper.decimal32Copy(wcv, b, rows)
+            } else if (DecimalType.is64BitDecimalType(dt)) {
+              ColumnarCopyHelper.decimal64Copy(wcv, b, rows)
+            } else {
+              ColumnarCopyHelper.decimal128Copy(wcv, b, rows)
+            }
+          case _ =>
+            if (DecimalType.is32BitDecimalType(dt)) {
+              ColumnarCopyHelper.decimal32Copy(cv, b, rows, dt.precision, dt.scale)
+            } else if (DecimalType.is64BitDecimalType(dt)) {
+              ColumnarCopyHelper.decimal64Copy(cv, b, rows, dt.precision, dt.scale)
+            } else {
+              ColumnarCopyHelper.decimal128Copy(cv, b, rows, dt.precision, dt.scale)
+            }
+        }
       case t => throw new UnsupportedOperationException(
         s"Converting to GPU for $t is not currently supported")
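
For context, a minimal sketch of the two copy paths the patch distinguishes,
using the decimal64 case to stand in for all three widths. The helper names
slowDecimal64Copy/fastDecimal64Copy are hypothetical, null handling is
omitted for brevity, and ColumnBuilder.append(long) is assumed to be the
overload that the existing longCopy helper delegates to.

import ai.rapids.cudf.HostColumnVector.ColumnBuilder;
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
import org.apache.spark.sql.vectorized.ColumnVector;

final class DecimalCopySketch {
  // Generic path: each row materializes a Spark Decimal object only to
  // unwrap it again immediately; this is the round trip the commit
  // message refers to.
  static void slowDecimal64Copy(ColumnVector cv, ColumnBuilder b, int rows,
      int precision, int scale) {
    for (int i = 0; i < rows; i++) {
      b.append(cv.getDecimal(i, precision, scale).toUnscaledLong());
    }
  }

  // Fast path: a WritableColumnVector filled by Spark's vectorized Parquet
  // reader already stores the unscaled long, so it is appended as-is.
  static void fastDecimal64Copy(WritableColumnVector wcv, ColumnBuilder b,
      int rows) {
    for (int i = 0; i < rows; i++) {
      b.append(wcv.getLong(i)); // no per-row Decimal object created
    }
  }
}

The decimal128 path in the patch is analogous: getBinary(i) yields the
unscaled big-endian bytes, which appendDecimal128 consumes directly.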