From ad9a072e9c7d3ca7a8c0f98e86c4c46a03cb26c9 Mon Sep 17 00:00:00 2001 From: Raza Jafri Date: Wed, 6 Oct 2021 16:01:14 -0700 Subject: [PATCH 1/9] Call the appropriate method for row<->col conversion Signed-off-by: Raza Jafri --- .../src/main/python/row_conversion_test.py | 17 ++++++++++- .../UnsafeRowToColumnarBatchIterator.java | 16 +++++++++-- .../spark/rapids/GpuColumnarToRowExec.scala | 28 +++++++++++++------ .../spark/rapids/GpuRowToColumnarExec.scala | 22 +++++++++------ 4 files changed, 64 insertions(+), 19 deletions(-) diff --git a/integration_tests/src/main/python/row_conversion_test.py b/integration_tests/src/main/python/row_conversion_test.py index a0525fc92a1..13b80437c8a 100644 --- a/integration_tests/src/main/python/row_conversion_test.py +++ b/integration_tests/src/main/python/row_conversion_test.py @@ -43,8 +43,23 @@ def test_row_conversions(): def test_row_conversions_fixed_width(): gens = [["a", byte_gen], ["b", short_gen], ["c", int_gen], ["d", long_gen], - ["e", float_gen], ["f", double_gen], ["g", string_gen], ["h", boolean_gen], + ["e", float_gen], ["f", double_gen], ["h", boolean_gen], ["i", timestamp_gen], ["j", date_gen], ["k", decimal_gen_64bit], ["l", decimal_gen_scale_precision]] assert_gpu_and_cpu_are_equal_collect( lambda spark : gen_df(spark, gens).selectExpr("*", "a as a_again")) + +def test_row_conversions_fixed_width_200(): + gens = [["a{}".format(i), byte_gen] for i in range(10)] + \ + [["b{}".format(i), short_gen] for i in range(10)] + \ + [["c{}".format(i), int_gen] for i in range(10)] + \ + [["d{}".format(i), long_gen] for i in range(10)] + \ + [["e{}".format(i), float_gen] for i in range(10)] + \ + [["f{}".format(i), double_gen] for i in range(10)] + \ + [["h{}".format(i), boolean_gen] for i in range(10)] + \ + [["i{}".format(i), timestamp_gen] for i in range(10)] + \ + [["j{}".format(i), date_gen] for i in range(10)] + \ + [["k{}".format(i), decimal_gen_64bit] for i in range(10)] + \ + [["l{}".format(i), decimal_gen_scale_precision] for i in range(10)] + assert_gpu_and_cpu_are_equal_collect( + lambda spark: gen_df(spark, gens).selectExpr("*", "a0 as a_again")) diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/UnsafeRowToColumnarBatchIterator.java b/sql-plugin/src/main/java/com/nvidia/spark/rapids/UnsafeRowToColumnarBatchIterator.java index daa146f1486..a08a76d20ee 100644 --- a/sql-plugin/src/main/java/com/nvidia/spark/rapids/UnsafeRowToColumnarBatchIterator.java +++ b/sql-plugin/src/main/java/com/nvidia/spark/rapids/UnsafeRowToColumnarBatchIterator.java @@ -55,6 +55,7 @@ public abstract class UnsafeRowToColumnarBatchIterator implements Iterator input, @@ -65,7 +66,8 @@ protected UnsafeRowToColumnarBatchIterator( GpuMetric opTime, GpuMetric numInputRows, GpuMetric numOutputRows, - GpuMetric numOutputBatches) { + GpuMetric numOutputBatches, + Boolean isOptimizedForFixedWidth) { this.input = input; int sizePerRowEstimate = CudfUnsafeRow.getRowSizeEstimate(schema); numRowsEstimate = (int)Math.max(1, @@ -84,6 +86,7 @@ protected UnsafeRowToColumnarBatchIterator( this.numInputRows = numInputRows; this.numOutputRows = numOutputRows; this.numOutputBatches = numOutputBatches; + this.isOptimizedForFixedWidth = isOptimizedForFixedWidth; } @Override @@ -163,7 +166,16 @@ public ColumnarBatch next() { } try (NvtxRange ignored = buildRange; ColumnVector cv = devColumn; - Table tab = Table.convertFromRows(cv, rapidsTypes)) { + // We are branching over the output.length to know which kernel to call. 
+ // If output.length < 100 we call the fixed-width optimized version, otherwise the + // generic one + Table tab = isOptimizedForFixedWidth ? + // The fixed-width optimized cudf kernel only supports up to 1.5 KB per row which means + // at most 184 double/long values. We are branching over the output.length to know + // which kernel to call. If output.length < 100 we call the fixed-width optimized + // version, otherwise the generic one + Table.convertFromRowsFixedWidthOptimized(cv, rapidsTypes) : + Table.convertFromRows(cv, rapidsTypes)) { return GpuColumnVector.from(tab, outputTypes); } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarToRowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarToRowExec.scala index bc922d48003..d31a977855f 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarToRowExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarToRowExec.scala @@ -42,7 +42,8 @@ class AcceleratedColumnarToRowIterator( numInputBatches: GpuMetric, numOutputRows: GpuMetric, opTime: GpuMetric, - streamTime: GpuMetric) extends Iterator[InternalRow] with Arm with Serializable { + streamTime: GpuMetric, + isFixedWidthOptimized: Boolean) extends Iterator[InternalRow] with Arm with Serializable { @transient private var pendingCvs: Queue[HostColumnVector] = Queue.empty // GPU batches read in must be closed by the receiver (us) @transient private var currentCv: Option[HostColumnVector] = None @@ -108,7 +109,17 @@ class AcceleratedColumnarToRowIterator( if (cb.numRows() > 0) { withResource(new NvtxWithMetrics("ColumnarToRow: batch", NvtxColor.RED, opTime)) { _ => withResource(rearrangeRows(cb)) { table => - withResource(table.convertToRows()) { rowsCvList => + // The fixed-width optimized cudf kernel only supports up to 1.5 KB per row which means at + // most 184 double/long values. Spark by default limits codegen to 100 fields + // "spark.sql.codegen.maxFields". So, we are going to be cautious and start with that + // until we have tested it more. We branching over the output.length to know which kernel + // to call. If output.length < 100 we call the fixed-width optimized version, otherwise + // the generic one + withResource(if (isFixedWidthOptimized) { + table.convertToRowsFixedWidthOptimized() + } else { + table.convertToRows() + }) { rowsCvList => rowsCvList.foreach { rowsCv => pendingCvs += rowsCv.copyToHost() } @@ -334,15 +345,16 @@ object GpuColumnarToRowExecParent { if (CudfRowTransitions.areAllSupported(output) && // For a small number of columns it is still best to do it the original way output.length > 4 && - // The cudf kernel only supports up to 1.5 KB per row which means at most 184 double/long - // values. Spark by default limits codegen to 100 fields "spark.sql.codegen.maxFields". - // So, we are going to be cautious and start with that until we have tested it more. - output.length < 100) { + // We can support upto 2^31 bytes per row. That is ~250M columns of 64-bit fixed-width data. + // This number includes the 1-bit validity per column, but doesn't include padding. 
+ // We are being conservative by only allowing 100M columns until we feel the need to + // increase this number + output.length <= 100000000) { (batches: Iterator[ColumnarBatch]) => { // UnsafeProjection is not serializable so do it on the executor side val toUnsafe = UnsafeProjection.create(output, output) - new AcceleratedColumnarToRowIterator(output, - batches, numInputBatches, numOutputRows, opTime, streamTime).map(toUnsafe) + new AcceleratedColumnarToRowIterator(output, batches, numInputBatches, numOutputRows, + opTime, streamTime, output.length < 100).map(toUnsafe) } } else { (batches: Iterator[ColumnarBatch]) => { diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRowToColumnarExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRowToColumnarExec.scala index a542a9d5d14..927495d25df 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRowToColumnarExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRowToColumnarExec.scala @@ -680,7 +680,8 @@ object GeneratedUnsafeRowToCudfRowIterator extends Logging { opTime: GpuMetric, numInputRows: GpuMetric, numOutputRows: GpuMetric, - numOutputBatches: GpuMetric): UnsafeRowToColumnarBatchIterator = { + numOutputBatches: GpuMetric, + isOptimizedForFixedWidth: Boolean): UnsafeRowToColumnarBatchIterator = { val ctx = new CodegenContext ctx.addReferenceObj("iter", input, classOf[Iterator[UnsafeRow]].getName) @@ -692,6 +693,8 @@ object GeneratedUnsafeRowToCudfRowIterator extends Logging { ctx.addReferenceObj("numInputRows", numInputRows, classOf[GpuMetric].getName) ctx.addReferenceObj("numOutputRows", numOutputRows, classOf[GpuMetric].getName) ctx.addReferenceObj("numOutputBatches", numOutputBatches, classOf[GpuMetric].getName) + ctx.addReferenceObj("isOptimizedForFixedWidth", + isOptimizedForFixedWidth, classOf[Boolean].getName) val rowBaseObj = ctx.freshName("rowBaseObj") val rowBaseOffset = ctx.freshName("rowBaseOffset") @@ -760,8 +763,9 @@ object GeneratedUnsafeRowToCudfRowIterator extends Logging { | (com.nvidia.spark.rapids.GpuMetric)references[5], | (com.nvidia.spark.rapids.GpuMetric)references[6], | (com.nvidia.spark.rapids.GpuMetric)references[7], - | (com.nvidia.spark.rapids.GpuMetric)references[8]); - | ${ctx.initMutableStates()} + | (com.nvidia.spark.rapids.GpuMetric)references[8], + | (java.lang.Boolean)references[9]); + | ${ctx.initMutableStates()} | } | | // Avoid virtual function calls by copying the data in a batch at a time instead @@ -886,16 +890,18 @@ case class GpuRowToColumnarExec(child: SparkPlan, // cache in a local to avoid serializing the plan val localSchema = schema - // The cudf kernel only supports up to 1.5 KB per row which means at most 184 double/long - // values. Spark by default limits codegen to 100 fields "spark.sql.codegen.maxFields". - // So, we are going to be cautious and start with that until we have tested it more. - if ((1 until 100).contains(output.length) && + // We can support upto 2^31 bytes per row. That is ~250M columns of 64-bit fixed-width data. + // This number includes the 1-bit validity per column, but doesn't include padding. + // We are being conservative by only allowing 100M columns until we feel the need to + // increase this number. Spark by default limits codegen to 100 fields + // "spark.sql.codegen.maxFields". 
+ if (output.length < 100000000 && CudfRowTransitions.areAllSupported(output)) { val localOutput = output rowBased.mapPartitions(rowIter => GeneratedUnsafeRowToCudfRowIterator( rowIter.asInstanceOf[Iterator[UnsafeRow]], localOutput.toArray, localGoal, semaphoreWaitTime, streamTime, opTime, - numInputRows, numOutputRows, numOutputBatches)) + numInputRows, numOutputRows, numOutputBatches, output.length < 100)) } else { val converters = new GpuRowToColumnConverter(localSchema) rowBased.mapPartitions(rowIter => new RowToColumnarIterator(rowIter, From 7c0b88a7f16c24e2ecca46e8267ff5a67172eb33 Mon Sep 17 00:00:00 2001 From: Raza Jafri Date: Fri, 15 Oct 2021 16:05:37 -0700 Subject: [PATCH 2/9] copy offsets data per row Signed-off-by: Raza Jafri --- .../scala/com/nvidia/spark/rapids/GpuRowToColumnarExec.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRowToColumnarExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRowToColumnarExec.scala index 927495d25df..11651bae0e8 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRowToColumnarExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRowToColumnarExec.scala @@ -778,8 +778,8 @@ object GeneratedUnsafeRowToCudfRowIterator extends Logging { | | int dataOffset = 0; | int currentRow = 0; + | int offsetIndex = 0; | - | offsetsBuffer.setInt(0, dataOffset); | // If we are here we have at least one row to process, so don't bother checking yet | boolean done = false; | while (!done) { @@ -791,6 +791,8 @@ object GeneratedUnsafeRowToCudfRowIterator extends Logging { | row = (UnsafeRow)input.next(); | } | int numBytesUsedByRow = copyInto(row, dataBaseAddress + dataOffset, endDataAddress); + | offsetsBuffer.setInt(offsetIndex, dataOffset); + | offsetIndex += 4; | if (numBytesUsedByRow < 0) { | pending = row; | done = true; From 7b310e91d4e1e10c6dfcb0fbd21046d5f36b2e54 Mon Sep 17 00:00:00 2001 From: Raza Jafri Date: Wed, 20 Oct 2021 15:31:18 -0700 Subject: [PATCH 3/9] debug Signed-off-by: Raza Jafri --- .../rapids/UnsafeRowToColumnarBatchIterator.java | 4 ++-- .../nvidia/spark/rapids/GpuColumnarToRowExec.scala | 11 +++++------ 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/UnsafeRowToColumnarBatchIterator.java b/sql-plugin/src/main/java/com/nvidia/spark/rapids/UnsafeRowToColumnarBatchIterator.java index a08a76d20ee..755cf92493a 100644 --- a/sql-plugin/src/main/java/com/nvidia/spark/rapids/UnsafeRowToColumnarBatchIterator.java +++ b/sql-plugin/src/main/java/com/nvidia/spark/rapids/UnsafeRowToColumnarBatchIterator.java @@ -169,12 +169,12 @@ public ColumnarBatch next() { // We are branching over the output.length to know which kernel to call. // If output.length < 100 we call the fixed-width optimized version, otherwise the // generic one - Table tab = isOptimizedForFixedWidth ? + Table tab = //isOptimizedForFixedWidth ? // The fixed-width optimized cudf kernel only supports up to 1.5 KB per row which means // at most 184 double/long values. We are branching over the output.length to know // which kernel to call. 
If output.length < 100 we call the fixed-width optimized // version, otherwise the generic one - Table.convertFromRowsFixedWidthOptimized(cv, rapidsTypes) : +// Table.convertFromRowsFixedWidthOptimized(cv, rapidsTypes) : Table.convertFromRows(cv, rapidsTypes)) { return GpuColumnVector.from(tab, outputTypes); } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarToRowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarToRowExec.scala index d31a977855f..304b3eed576 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarToRowExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarToRowExec.scala @@ -115,11 +115,10 @@ class AcceleratedColumnarToRowIterator( // until we have tested it more. We branching over the output.length to know which kernel // to call. If output.length < 100 we call the fixed-width optimized version, otherwise // the generic one - withResource(if (isFixedWidthOptimized) { - table.convertToRowsFixedWidthOptimized() - } else { - table.convertToRows() - }) { rowsCvList => + withResource( // if (isFixedWidthOptimized) { +// table.convertToRowsFixedWidthOptimized() +// } else { + table.convertToRows()) { rowsCvList => rowsCvList.foreach { rowsCv => pendingCvs += rowsCv.copyToHost() } @@ -350,7 +349,7 @@ object GpuColumnarToRowExecParent { // We are being conservative by only allowing 100M columns until we feel the need to // increase this number output.length <= 100000000) { - (batches: Iterator[ColumnarBatch]) => { + batches: Iterator[ColumnarBatch] => { // UnsafeProjection is not serializable so do it on the executor side val toUnsafe = UnsafeProjection.create(output, output) new AcceleratedColumnarToRowIterator(output, batches, numInputBatches, numOutputRows, From 8eca9c8ec5a6df9b092fe4b2768760de8a8ecfc1 Mon Sep 17 00:00:00 2001 From: Raza Jafri Date: Wed, 20 Oct 2021 20:34:19 -0700 Subject: [PATCH 4/9] Revert "debug" This reverts commit 7b310e91d4e1e10c6dfcb0fbd21046d5f36b2e54. Signed-off-by: Raza Jafri --- .../rapids/UnsafeRowToColumnarBatchIterator.java | 4 ++-- .../nvidia/spark/rapids/GpuColumnarToRowExec.scala | 11 ++++++----- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/UnsafeRowToColumnarBatchIterator.java b/sql-plugin/src/main/java/com/nvidia/spark/rapids/UnsafeRowToColumnarBatchIterator.java index 755cf92493a..a08a76d20ee 100644 --- a/sql-plugin/src/main/java/com/nvidia/spark/rapids/UnsafeRowToColumnarBatchIterator.java +++ b/sql-plugin/src/main/java/com/nvidia/spark/rapids/UnsafeRowToColumnarBatchIterator.java @@ -169,12 +169,12 @@ public ColumnarBatch next() { // We are branching over the output.length to know which kernel to call. // If output.length < 100 we call the fixed-width optimized version, otherwise the // generic one - Table tab = //isOptimizedForFixedWidth ? + Table tab = isOptimizedForFixedWidth ? // The fixed-width optimized cudf kernel only supports up to 1.5 KB per row which means // at most 184 double/long values. We are branching over the output.length to know // which kernel to call. 
If output.length < 100 we call the fixed-width optimized // version, otherwise the generic one -// Table.convertFromRowsFixedWidthOptimized(cv, rapidsTypes) : + Table.convertFromRowsFixedWidthOptimized(cv, rapidsTypes) : Table.convertFromRows(cv, rapidsTypes)) { return GpuColumnVector.from(tab, outputTypes); } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarToRowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarToRowExec.scala index 304b3eed576..d31a977855f 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarToRowExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarToRowExec.scala @@ -115,10 +115,11 @@ class AcceleratedColumnarToRowIterator( // until we have tested it more. We branching over the output.length to know which kernel // to call. If output.length < 100 we call the fixed-width optimized version, otherwise // the generic one - withResource( // if (isFixedWidthOptimized) { -// table.convertToRowsFixedWidthOptimized() -// } else { - table.convertToRows()) { rowsCvList => + withResource(if (isFixedWidthOptimized) { + table.convertToRowsFixedWidthOptimized() + } else { + table.convertToRows() + }) { rowsCvList => rowsCvList.foreach { rowsCv => pendingCvs += rowsCv.copyToHost() } @@ -349,7 +350,7 @@ object GpuColumnarToRowExecParent { // We are being conservative by only allowing 100M columns until we feel the need to // increase this number output.length <= 100000000) { - batches: Iterator[ColumnarBatch] => { + (batches: Iterator[ColumnarBatch]) => { // UnsafeProjection is not serializable so do it on the executor side val toUnsafe = UnsafeProjection.create(output, output) new AcceleratedColumnarToRowIterator(output, batches, numInputBatches, numOutputRows, From c3dc5bd95aa65cc1f1a3de190b1e8546767c1b5c Mon Sep 17 00:00:00 2001 From: Raza Jafri Date: Wed, 20 Oct 2021 20:35:06 -0700 Subject: [PATCH 5/9] remove println Signed-off-by: Raza Jafri --- .../src/main/python/row_conversion_test.py | 32 +++++++++++-------- 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/integration_tests/src/main/python/row_conversion_test.py b/integration_tests/src/main/python/row_conversion_test.py index 13b80437c8a..045dbdbf09a 100644 --- a/integration_tests/src/main/python/row_conversion_test.py +++ b/integration_tests/src/main/python/row_conversion_test.py @@ -49,17 +49,21 @@ def test_row_conversions_fixed_width(): assert_gpu_and_cpu_are_equal_collect( lambda spark : gen_df(spark, gens).selectExpr("*", "a as a_again")) -def test_row_conversions_fixed_width_200(): - gens = [["a{}".format(i), byte_gen] for i in range(10)] + \ - [["b{}".format(i), short_gen] for i in range(10)] + \ - [["c{}".format(i), int_gen] for i in range(10)] + \ - [["d{}".format(i), long_gen] for i in range(10)] + \ - [["e{}".format(i), float_gen] for i in range(10)] + \ - [["f{}".format(i), double_gen] for i in range(10)] + \ - [["h{}".format(i), boolean_gen] for i in range(10)] + \ - [["i{}".format(i), timestamp_gen] for i in range(10)] + \ - [["j{}".format(i), date_gen] for i in range(10)] + \ - [["k{}".format(i), decimal_gen_64bit] for i in range(10)] + \ - [["l{}".format(i), decimal_gen_scale_precision] for i in range(10)] - assert_gpu_and_cpu_are_equal_collect( - lambda spark: gen_df(spark, gens).selectExpr("*", "a0 as a_again")) +def test_row_conversions_fixe_width_wide(): + gens = [["a{}".format(i), ByteGen(nullable=True)] for i in range(10)] + \ + [["b{}".format(i), ShortGen(nullable=True)] for i in range(10)] + \ + 
[["c{}".format(i), IntegerGen(nullable=True)] for i in range(10)] + \ + [["d{}".format(i), LongGen(nullable=True)] for i in range(10)] + \ + [["e{}".format(i), FloatGen(nullable=True)] for i in range(10)] + \ + [["f{}".format(i), DoubleGen(nullable=True)] for i in range(10)] + \ + [["h{}".format(i), BooleanGen(nullable=True)] for i in range(10)] + \ + [["i{}".format(i), TimestampGen(nullable=True)] for i in range(10)] + \ + [["j{}".format(i), DateGen(nullable=True)] for i in range(10)] + \ + [["k{}".format(i), DecimalGen(precision=12, scale=2, nullable=True)] for i in range(10)] + \ + [["l{}".format(i), DecimalGen(precision=7, scale=3, nullable=True)] for i in range(10)] + def do_it(spark): + df=gen_df(spark, gens, length=1).selectExpr("*", "a0 as a_again") + debug_df(df) + return df + assert_gpu_and_cpu_are_equal_collect(do_it) + From fbb0a8feaf8c9a618c3ea971f1c458b1095d66e2 Mon Sep 17 00:00:00 2001 From: Raza Jafri Date: Thu, 21 Oct 2021 09:13:22 -0700 Subject: [PATCH 6/9] fixed typo Signed-off-by: Raza Jafri --- integration_tests/src/main/python/row_conversion_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration_tests/src/main/python/row_conversion_test.py b/integration_tests/src/main/python/row_conversion_test.py index 045dbdbf09a..fb277403cd3 100644 --- a/integration_tests/src/main/python/row_conversion_test.py +++ b/integration_tests/src/main/python/row_conversion_test.py @@ -49,7 +49,7 @@ def test_row_conversions_fixed_width(): assert_gpu_and_cpu_are_equal_collect( lambda spark : gen_df(spark, gens).selectExpr("*", "a as a_again")) -def test_row_conversions_fixe_width_wide(): +def test_row_conversions_fixed_width_wide(): gens = [["a{}".format(i), ByteGen(nullable=True)] for i in range(10)] + \ [["b{}".format(i), ShortGen(nullable=True)] for i in range(10)] + \ [["c{}".format(i), IntegerGen(nullable=True)] for i in range(10)] + \ From 390ec7d3e24ffe39489bf355e06f91d3ead79831 Mon Sep 17 00:00:00 2001 From: Raza Jafri Date: Mon, 10 Jan 2022 20:37:56 -0800 Subject: [PATCH 7/9] Insert a check for rows < 1 Signed-off-by: Raza Jafri --- .../scala/com/nvidia/spark/rapids/GpuRowToColumnarExec.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRowToColumnarExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRowToColumnarExec.scala index bacbde9ad78..d60af5bf4b2 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRowToColumnarExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRowToColumnarExec.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2021, NVIDIA CORPORATION. + * Copyright (c) 2019-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -903,7 +903,7 @@ case class GpuRowToColumnarExec(child: SparkPlan, // We are being conservative by only allowing 100M columns until we feel the need to // increase this number. Spark by default limits codegen to 100 fields // "spark.sql.codegen.maxFields". 
- if (output.length < 100000000 && + if ((1 until 100000000).contains(output.length) && CudfRowTransitions.areAllSupported(output)) { val localOutput = output rowBased.mapPartitions(rowIter => GeneratedUnsafeRowToCudfRowIterator( From e88170bcdc829a5fa895565d833120cb9f4ac9bb Mon Sep 17 00:00:00 2001 From: Raza Jafri Date: Tue, 11 Jan 2022 18:14:20 -0800 Subject: [PATCH 8/9] update copyrights Signed-off-by: Raza Jafri --- integration_tests/src/main/python/row_conversion_test.py | 2 +- .../nvidia/spark/rapids/UnsafeRowToColumnarBatchIterator.java | 2 +- .../scala/com/nvidia/spark/rapids/GpuColumnarToRowExec.scala | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/integration_tests/src/main/python/row_conversion_test.py b/integration_tests/src/main/python/row_conversion_test.py index 3f28ce0838c..2240aa27928 100644 --- a/integration_tests/src/main/python/row_conversion_test.py +++ b/integration_tests/src/main/python/row_conversion_test.py @@ -1,4 +1,4 @@ -# Copyright (c) 2020-2021, NVIDIA CORPORATION. +# Copyright (c) 2020-2022, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/UnsafeRowToColumnarBatchIterator.java b/sql-plugin/src/main/java/com/nvidia/spark/rapids/UnsafeRowToColumnarBatchIterator.java index a08a76d20ee..9ab8bfa2776 100644 --- a/sql-plugin/src/main/java/com/nvidia/spark/rapids/UnsafeRowToColumnarBatchIterator.java +++ b/sql-plugin/src/main/java/com/nvidia/spark/rapids/UnsafeRowToColumnarBatchIterator.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2021, NVIDIA CORPORATION. + * Copyright (c) 2020-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarToRowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarToRowExec.scala index c466a8347ae..890295d9ca8 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarToRowExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarToRowExec.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2021, NVIDIA CORPORATION. + * Copyright (c) 2019-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
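The final patch below (PATCH 9/9) drops the extra constructor parameter and computes the branch condition right where the conversion happens, so the kernel choice depends only on the number of columns being converted. As a rough summary of where the series ends up, the selection logic on both sides of the transition reduces to something like the following sketch, assuming the ai.rapids.cudf Java API already used in these diffs; the names RowConversionSketch, toRows and fromRows are illustrative only and are not part of the patches.

import ai.rapids.cudf.{ColumnVector, DType, Table}

// Illustrative sketch: use the fixed-width optimized cudf kernels only for narrow
// schemas (fewer than 100 columns) and fall back to the generic kernels otherwise.
object RowConversionSketch {
  // Columnar -> packed rows: the caller owns the returned vectors and must close them.
  def toRows(table: Table): Array[ColumnVector] =
    if (table.getNumberOfColumns < 100) {
      table.convertToRowsFixedWidthOptimized()
    } else {
      table.convertToRows()
    }

  // Packed rows -> columnar: rebuild a Table given the expected cudf column types.
  def fromRows(rows: ColumnVector, types: Array[DType]): Table =
    if (types.length < 100) {
      Table.convertFromRowsFixedWidthOptimized(rows, types: _*)
    } else {
      Table.convertFromRows(rows, types: _*)
    }
}

The 100-column cutoff mirrors Spark's default "spark.sql.codegen.maxFields" and keeps rows comfortably inside the optimized kernel's 1.5 KB-per-row limit (at most 184 long/double values), per the comments in the diffs above.
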
From e3726fc5f6c2ebcc0007bc400a6d03e8233362e0 Mon Sep 17 00:00:00 2001 From: Raza Jafri Date: Wed, 12 Jan 2022 11:24:44 -0800 Subject: [PATCH 9/9] removed the extra parameter and calculate the branch variable right before the branching Signed-off-by: Raza Jafri --- .../rapids/UnsafeRowToColumnarBatchIterator.java | 16 +++++----------- .../spark/rapids/GpuColumnarToRowExec.scala | 13 ++++++------- .../spark/rapids/GpuRowToColumnarExec.scala | 10 +++------- 3 files changed, 14 insertions(+), 25 deletions(-) diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/UnsafeRowToColumnarBatchIterator.java b/sql-plugin/src/main/java/com/nvidia/spark/rapids/UnsafeRowToColumnarBatchIterator.java index 9ab8bfa2776..1c9a8bed0ec 100644 --- a/sql-plugin/src/main/java/com/nvidia/spark/rapids/UnsafeRowToColumnarBatchIterator.java +++ b/sql-plugin/src/main/java/com/nvidia/spark/rapids/UnsafeRowToColumnarBatchIterator.java @@ -55,7 +55,6 @@ public abstract class UnsafeRowToColumnarBatchIterator implements Iterator input, @@ -66,8 +65,7 @@ protected UnsafeRowToColumnarBatchIterator( GpuMetric opTime, GpuMetric numInputRows, GpuMetric numOutputRows, - GpuMetric numOutputBatches, - Boolean isOptimizedForFixedWidth) { + GpuMetric numOutputBatches) { this.input = input; int sizePerRowEstimate = CudfUnsafeRow.getRowSizeEstimate(schema); numRowsEstimate = (int)Math.max(1, @@ -86,7 +84,6 @@ protected UnsafeRowToColumnarBatchIterator( this.numInputRows = numInputRows; this.numOutputRows = numOutputRows; this.numOutputBatches = numOutputBatches; - this.isOptimizedForFixedWidth = isOptimizedForFixedWidth; } @Override @@ -166,14 +163,11 @@ public ColumnarBatch next() { } try (NvtxRange ignored = buildRange; ColumnVector cv = devColumn; - // We are branching over the output.length to know which kernel to call. - // If output.length < 100 we call the fixed-width optimized version, otherwise the - // generic one - Table tab = isOptimizedForFixedWidth ? + Table tab = rapidsTypes.length < 100 ? // The fixed-width optimized cudf kernel only supports up to 1.5 KB per row which means - // at most 184 double/long values. We are branching over the output.length to know - // which kernel to call. If output.length < 100 we call the fixed-width optimized - // version, otherwise the generic one + // at most 184 double/long values. We are branching over the size of the output to + // know which kernel to call. 
If rapidsTypes.length < 100 we call the fixed-width + // optimized version, otherwise the generic one Table.convertFromRowsFixedWidthOptimized(cv, rapidsTypes) : Table.convertFromRows(cv, rapidsTypes)) { return GpuColumnVector.from(tab, outputTypes); diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarToRowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarToRowExec.scala index 890295d9ca8..7ffe9c3d14d 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarToRowExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarToRowExec.scala @@ -42,8 +42,7 @@ class AcceleratedColumnarToRowIterator( numInputBatches: GpuMetric, numOutputRows: GpuMetric, opTime: GpuMetric, - streamTime: GpuMetric, - isFixedWidthOptimized: Boolean) extends Iterator[InternalRow] with Arm with Serializable { + streamTime: GpuMetric) extends Iterator[InternalRow] with Arm with Serializable { @transient private var pendingCvs: Queue[HostColumnVector] = Queue.empty // GPU batches read in must be closed by the receiver (us) @transient private var currentCv: Option[HostColumnVector] = None @@ -112,10 +111,10 @@ class AcceleratedColumnarToRowIterator( // The fixed-width optimized cudf kernel only supports up to 1.5 KB per row which means at // most 184 double/long values. Spark by default limits codegen to 100 fields // "spark.sql.codegen.maxFields". So, we are going to be cautious and start with that - // until we have tested it more. We branching over the output.length to know which kernel - // to call. If output.length < 100 we call the fixed-width optimized version, otherwise - // the generic one - withResource(if (isFixedWidthOptimized) { + // until we have tested it more. We branching over the size of the output to know which + // kernel to call. 
If schema.length < 100 we call the fixed-width optimized version, + // otherwise the generic one + withResource(if (schema.length < 100) { table.convertToRowsFixedWidthOptimized() } else { table.convertToRows() @@ -362,7 +361,7 @@ object GpuColumnarToRowExecParent { // UnsafeProjection is not serializable so do it on the executor side val toUnsafe = UnsafeProjection.create(output, output) new AcceleratedColumnarToRowIterator(output, batches, numInputBatches, numOutputRows, - opTime, streamTime, output.length < 100).map(toUnsafe) + opTime, streamTime).map(toUnsafe) } } else { (batches: Iterator[ColumnarBatch]) => { diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRowToColumnarExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRowToColumnarExec.scala index d60af5bf4b2..9d971b1d18d 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRowToColumnarExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRowToColumnarExec.scala @@ -685,8 +685,7 @@ object GeneratedUnsafeRowToCudfRowIterator extends Logging { opTime: GpuMetric, numInputRows: GpuMetric, numOutputRows: GpuMetric, - numOutputBatches: GpuMetric, - isOptimizedForFixedWidth: Boolean): UnsafeRowToColumnarBatchIterator = { + numOutputBatches: GpuMetric): UnsafeRowToColumnarBatchIterator = { val ctx = new CodegenContext ctx.addReferenceObj("iter", input, classOf[Iterator[UnsafeRow]].getName) @@ -698,8 +697,6 @@ object GeneratedUnsafeRowToCudfRowIterator extends Logging { ctx.addReferenceObj("numInputRows", numInputRows, classOf[GpuMetric].getName) ctx.addReferenceObj("numOutputRows", numOutputRows, classOf[GpuMetric].getName) ctx.addReferenceObj("numOutputBatches", numOutputBatches, classOf[GpuMetric].getName) - ctx.addReferenceObj("isOptimizedForFixedWidth", - isOptimizedForFixedWidth, classOf[Boolean].getName) val rowBaseObj = ctx.freshName("rowBaseObj") val rowBaseOffset = ctx.freshName("rowBaseOffset") @@ -769,8 +766,7 @@ object GeneratedUnsafeRowToCudfRowIterator extends Logging { | (com.nvidia.spark.rapids.GpuMetric)references[5], | (com.nvidia.spark.rapids.GpuMetric)references[6], | (com.nvidia.spark.rapids.GpuMetric)references[7], - | (com.nvidia.spark.rapids.GpuMetric)references[8], - | (java.lang.Boolean)references[9]); + | (com.nvidia.spark.rapids.GpuMetric)references[8]); | ${ctx.initMutableStates()} | } | @@ -909,7 +905,7 @@ case class GpuRowToColumnarExec(child: SparkPlan, rowBased.mapPartitions(rowIter => GeneratedUnsafeRowToCudfRowIterator( rowIter.asInstanceOf[Iterator[UnsafeRow]], localOutput.toArray, localGoal, semaphoreWaitTime, streamTime, opTime, - numInputRows, numOutputRows, numOutputBatches, output.length < 100)) + numInputRows, numOutputRows, numOutputBatches)) } else { val converters = new GpuRowToColumnConverter(localSchema) rowBased.mapPartitions(rowIter => new RowToColumnarIterator(rowIter,