diff --git a/integration_tests/src/main/python/row_conversion_test.py b/integration_tests/src/main/python/row_conversion_test.py index d3b03e84cee..2240aa27928 100644 --- a/integration_tests/src/main/python/row_conversion_test.py +++ b/integration_tests/src/main/python/row_conversion_test.py @@ -1,4 +1,4 @@ -# Copyright (c) 2020-2021, NVIDIA CORPORATION. +# Copyright (c) 2020-2022, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -45,12 +45,30 @@ def test_row_conversions(): def test_row_conversions_fixed_width(): gens = [["a", byte_gen], ["b", short_gen], ["c", int_gen], ["d", long_gen], - ["e", float_gen], ["f", double_gen], ["g", string_gen], ["h", boolean_gen], + ["e", float_gen], ["f", double_gen], ["h", boolean_gen], ["i", timestamp_gen], ["j", date_gen], ["k", decimal_gen_64bit], ["l", decimal_gen_scale_precision]] assert_gpu_and_cpu_are_equal_collect( lambda spark : gen_df(spark, gens).selectExpr("*", "a as a_again")) +def test_row_conversions_fixed_width_wide(): + gens = [["a{}".format(i), ByteGen(nullable=True)] for i in range(10)] + \ + [["b{}".format(i), ShortGen(nullable=True)] for i in range(10)] + \ + [["c{}".format(i), IntegerGen(nullable=True)] for i in range(10)] + \ + [["d{}".format(i), LongGen(nullable=True)] for i in range(10)] + \ + [["e{}".format(i), FloatGen(nullable=True)] for i in range(10)] + \ + [["f{}".format(i), DoubleGen(nullable=True)] for i in range(10)] + \ + [["h{}".format(i), BooleanGen(nullable=True)] for i in range(10)] + \ + [["i{}".format(i), TimestampGen(nullable=True)] for i in range(10)] + \ + [["j{}".format(i), DateGen(nullable=True)] for i in range(10)] + \ + [["k{}".format(i), DecimalGen(precision=12, scale=2, nullable=True)] for i in range(10)] + \ + [["l{}".format(i), DecimalGen(precision=7, scale=3, nullable=True)] for i in range(10)] + def do_it(spark): + df=gen_df(spark, gens, length=1).selectExpr("*", "a0 as 
a_again") + debug_df(df) + return df + assert_gpu_and_cpu_are_equal_collect(do_it) + # Test handling of transitions when the data is already columnar on the host # Note that Apache Spark will automatically convert a load of nested types to rows, so # the nested types will not test a host columnar transition in that case. diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/UnsafeRowToColumnarBatchIterator.java b/sql-plugin/src/main/java/com/nvidia/spark/rapids/UnsafeRowToColumnarBatchIterator.java index daa146f1486..1c9a8bed0ec 100644 --- a/sql-plugin/src/main/java/com/nvidia/spark/rapids/UnsafeRowToColumnarBatchIterator.java +++ b/sql-plugin/src/main/java/com/nvidia/spark/rapids/UnsafeRowToColumnarBatchIterator.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2021, NVIDIA CORPORATION. + * Copyright (c) 2020-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -163,7 +163,13 @@ public ColumnarBatch next() { } try (NvtxRange ignored = buildRange; ColumnVector cv = devColumn; - Table tab = Table.convertFromRows(cv, rapidsTypes)) { + Table tab = rapidsTypes.length < 100 ? + // The fixed-width optimized cudf kernel only supports up to 1.5 KB per row which means + // at most 184 double/long values. We are branching over the size of the output to + // know which kernel to call. 
If rapidsTypes.length < 100 we call the fixed-width + // optimized version, otherwise the generic one + Table.convertFromRowsFixedWidthOptimized(cv, rapidsTypes) : + Table.convertFromRows(cv, rapidsTypes)) { return GpuColumnVector.from(tab, outputTypes); } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarToRowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarToRowExec.scala index 0582ccfa6bf..7ffe9c3d14d 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarToRowExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarToRowExec.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2021, NVIDIA CORPORATION. + * Copyright (c) 2019-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -108,7 +108,17 @@ class AcceleratedColumnarToRowIterator( if (cb.numRows() > 0) { withResource(new NvtxWithMetrics("ColumnarToRow: batch", NvtxColor.RED, opTime)) { _ => withResource(rearrangeRows(cb)) { table => - withResource(table.convertToRows()) { rowsCvList => + // The fixed-width optimized cudf kernel only supports up to 1.5 KB per row which means at + // most 184 double/long values. Spark by default limits codegen to 100 fields + // "spark.sql.codegen.maxFields". So, we are going to be cautious and start with that + // until we have tested it more. We branch on the size of the output to know which + // kernel to call. 
If schema.length < 100 we call the fixed-width optimized version, + // otherwise the generic one + withResource(if (schema.length < 100) { + table.convertToRowsFixedWidthOptimized() + } else { + table.convertToRows() + }) { rowsCvList => rowsCvList.foreach { rowsCv => pendingCvs += rowsCv.copyToHost() } @@ -342,15 +352,16 @@ object GpuColumnarToRowExecParent { if (CudfRowTransitions.areAllSupported(output) && // For a small number of columns it is still best to do it the original way output.length > 4 && - // The cudf kernel only supports up to 1.5 KB per row which means at most 184 double/long - // values. Spark by default limits codegen to 100 fields "spark.sql.codegen.maxFields". - // So, we are going to be cautious and start with that until we have tested it more. - output.length < 100) { + // We can support up to 2^31 bytes per row. That is ~250M columns of 64-bit fixed-width data. + // This number includes the 1-bit validity per column, but doesn't include padding. + // We are being conservative by only allowing 100M columns until we feel the need to + // increase this number + output.length <= 100000000) { (batches: Iterator[ColumnarBatch]) => { // UnsafeProjection is not serializable so do it on the executor side val toUnsafe = UnsafeProjection.create(output, output) - new AcceleratedColumnarToRowIterator(output, - batches, numInputBatches, numOutputRows, opTime, streamTime).map(toUnsafe) + new AcceleratedColumnarToRowIterator(output, batches, numInputBatches, numOutputRows, + opTime, streamTime).map(toUnsafe) } } else { (batches: Iterator[ColumnarBatch]) => { diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRowToColumnarExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRowToColumnarExec.scala index deb018c666a..9d971b1d18d 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRowToColumnarExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRowToColumnarExec.scala @@ -1,5 +1,5 @@ /* - * Copyright 
(c) 2019-2021, NVIDIA CORPORATION. + * Copyright (c) 2019-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -767,7 +767,7 @@ object GeneratedUnsafeRowToCudfRowIterator extends Logging { | (com.nvidia.spark.rapids.GpuMetric)references[6], | (com.nvidia.spark.rapids.GpuMetric)references[7], | (com.nvidia.spark.rapids.GpuMetric)references[8]); - | ${ctx.initMutableStates()} + | ${ctx.initMutableStates()} | } | | // Avoid virtual function calls by copying the data in a batch at a time instead @@ -780,8 +780,8 @@ object GeneratedUnsafeRowToCudfRowIterator extends Logging { | | int dataOffset = 0; | int currentRow = 0; + | int offsetIndex = 0; | - | offsetsBuffer.setInt(0, dataOffset); | // If we are here we have at least one row to process, so don't bother checking yet | boolean done = false; | while (!done) { @@ -793,6 +793,8 @@ object GeneratedUnsafeRowToCudfRowIterator extends Logging { | row = (UnsafeRow)input.next(); | } | int numBytesUsedByRow = copyInto(row, dataBaseAddress + dataOffset, endDataAddress); + | offsetsBuffer.setInt(offsetIndex, dataOffset); + | offsetIndex += 4; | if (numBytesUsedByRow < 0) { | pending = row; | done = true; @@ -892,10 +894,12 @@ case class GpuRowToColumnarExec(child: SparkPlan, // cache in a local to avoid serializing the plan val localSchema = schema - // The cudf kernel only supports up to 1.5 KB per row which means at most 184 double/long - // values. Spark by default limits codegen to 100 fields "spark.sql.codegen.maxFields". - // So, we are going to be cautious and start with that until we have tested it more. - if ((1 until 100).contains(output.length) && + // We can support up to 2^31 bytes per row. That is ~250M columns of 64-bit fixed-width data. + // This number includes the 1-bit validity per column, but doesn't include padding. 
+ // We are being conservative by only allowing 100M columns until we feel the need to + // increase this number. Spark by default limits codegen to 100 fields + // "spark.sql.codegen.maxFields". + if ((1 until 100000000).contains(output.length) && CudfRowTransitions.areAllSupported(output)) { val localOutput = output rowBased.mapPartitions(rowIter => GeneratedUnsafeRowToCudfRowIterator(