Skip to content

Commit

Permalink
Call the right method to convert table from row major <=> col major […
Browse files Browse the repository at this point in the history
…databricks] (#3762)

* Call the appropriate method for row<->col conversion

Signed-off-by: Raza Jafri <[email protected]>

* copy offsets data per row

Signed-off-by: Raza Jafri <[email protected]>

* debug

Signed-off-by: Raza Jafri <[email protected]>

* Revert "debug"

This reverts commit 7b310e9.

Signed-off-by: Raza Jafri <[email protected]>

* remove println

Signed-off-by: Raza Jafri <[email protected]>

* fixed typo

Signed-off-by: Raza Jafri <[email protected]>

* Insert a check for rows < 1

Signed-off-by: Raza Jafri <[email protected]>

* update copyrights

Signed-off-by: Raza Jafri <[email protected]>

* removed the extra parameter and calculate the branch variable right before the branching

Signed-off-by: Raza Jafri <[email protected]>

Co-authored-by: Raza Jafri <[email protected]>
  • Loading branch information
razajafri and razajafri authored Jan 13, 2022
1 parent 0d89e41 commit f1be1a5
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 19 deletions.
22 changes: 20 additions & 2 deletions integration_tests/src/main/python/row_conversion_test.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2020-2021, NVIDIA CORPORATION.
# Copyright (c) 2020-2022, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -45,12 +45,30 @@ def test_row_conversions():

def test_row_conversions_fixed_width():
gens = [["a", byte_gen], ["b", short_gen], ["c", int_gen], ["d", long_gen],
["e", float_gen], ["f", double_gen], ["g", string_gen], ["h", boolean_gen],
["e", float_gen], ["f", double_gen], ["h", boolean_gen],
["i", timestamp_gen], ["j", date_gen], ["k", decimal_gen_64bit],
["l", decimal_gen_scale_precision]]
assert_gpu_and_cpu_are_equal_collect(
lambda spark : gen_df(spark, gens).selectExpr("*", "a as a_again"))

def test_row_conversions_fixed_width_wide():
gens = [["a{}".format(i), ByteGen(nullable=True)] for i in range(10)] + \
[["b{}".format(i), ShortGen(nullable=True)] for i in range(10)] + \
[["c{}".format(i), IntegerGen(nullable=True)] for i in range(10)] + \
[["d{}".format(i), LongGen(nullable=True)] for i in range(10)] + \
[["e{}".format(i), FloatGen(nullable=True)] for i in range(10)] + \
[["f{}".format(i), DoubleGen(nullable=True)] for i in range(10)] + \
[["h{}".format(i), BooleanGen(nullable=True)] for i in range(10)] + \
[["i{}".format(i), TimestampGen(nullable=True)] for i in range(10)] + \
[["j{}".format(i), DateGen(nullable=True)] for i in range(10)] + \
[["k{}".format(i), DecimalGen(precision=12, scale=2, nullable=True)] for i in range(10)] + \
[["l{}".format(i), DecimalGen(precision=7, scale=3, nullable=True)] for i in range(10)]
def do_it(spark):
df=gen_df(spark, gens, length=1).selectExpr("*", "a0 as a_again")
debug_df(df)
return df
assert_gpu_and_cpu_are_equal_collect(do_it)

# Test handling of transitions when the data is already columnar on the host
# Note that Apache Spark will automatically convert a load of nested types to rows, so
# the nested types will not test a host columnar transition in that case.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2021, NVIDIA CORPORATION.
* Copyright (c) 2020-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -163,7 +163,13 @@ public ColumnarBatch next() {
}
try (NvtxRange ignored = buildRange;
ColumnVector cv = devColumn;
Table tab = Table.convertFromRows(cv, rapidsTypes)) {
Table tab = rapidsTypes.length < 100 ?
// The fixed-width optimized cudf kernel only supports up to 1.5 KB per row which means
// at most 184 double/long values. We are branching over the size of the output to
// know which kernel to call. If rapidsTypes.length < 100 we call the fixed-width
// optimized version, otherwise the generic one
Table.convertFromRowsFixedWidthOptimized(cv, rapidsTypes) :
Table.convertFromRows(cv, rapidsTypes)) {
return GpuColumnVector.from(tab, outputTypes);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2021, NVIDIA CORPORATION.
* Copyright (c) 2019-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -108,7 +108,17 @@ class AcceleratedColumnarToRowIterator(
if (cb.numRows() > 0) {
withResource(new NvtxWithMetrics("ColumnarToRow: batch", NvtxColor.RED, opTime)) { _ =>
withResource(rearrangeRows(cb)) { table =>
withResource(table.convertToRows()) { rowsCvList =>
// The fixed-width optimized cudf kernel only supports up to 1.5 KB per row which means at
// most 184 double/long values. Spark by default limits codegen to 100 fields
// "spark.sql.codegen.maxFields". So, we are going to be cautious and start with that
// until we have tested it more. We branching over the size of the output to know which
// kernel to call. If schema.length < 100 we call the fixed-width optimized version,
// otherwise the generic one
withResource(if (schema.length < 100) {
table.convertToRowsFixedWidthOptimized()
} else {
table.convertToRows()
}) { rowsCvList =>
rowsCvList.foreach { rowsCv =>
pendingCvs += rowsCv.copyToHost()
}
Expand Down Expand Up @@ -342,15 +352,16 @@ object GpuColumnarToRowExecParent {
if (CudfRowTransitions.areAllSupported(output) &&
// For a small number of columns it is still best to do it the original way
output.length > 4 &&
// The cudf kernel only supports up to 1.5 KB per row which means at most 184 double/long
// values. Spark by default limits codegen to 100 fields "spark.sql.codegen.maxFields".
// So, we are going to be cautious and start with that until we have tested it more.
output.length < 100) {
// We can support upto 2^31 bytes per row. That is ~250M columns of 64-bit fixed-width data.
// This number includes the 1-bit validity per column, but doesn't include padding.
// We are being conservative by only allowing 100M columns until we feel the need to
// increase this number
output.length <= 100000000) {
(batches: Iterator[ColumnarBatch]) => {
// UnsafeProjection is not serializable so do it on the executor side
val toUnsafe = UnsafeProjection.create(output, output)
new AcceleratedColumnarToRowIterator(output,
batches, numInputBatches, numOutputRows, opTime, streamTime).map(toUnsafe)
new AcceleratedColumnarToRowIterator(output, batches, numInputBatches, numOutputRows,
opTime, streamTime).map(toUnsafe)
}
} else {
(batches: Iterator[ColumnarBatch]) => {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2021, NVIDIA CORPORATION.
* Copyright (c) 2019-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -767,7 +767,7 @@ object GeneratedUnsafeRowToCudfRowIterator extends Logging {
| (com.nvidia.spark.rapids.GpuMetric)references[6],
| (com.nvidia.spark.rapids.GpuMetric)references[7],
| (com.nvidia.spark.rapids.GpuMetric)references[8]);
| ${ctx.initMutableStates()}
| ${ctx.initMutableStates()}
| }
|
| // Avoid virtual function calls by copying the data in a batch at a time instead
Expand All @@ -780,8 +780,8 @@ object GeneratedUnsafeRowToCudfRowIterator extends Logging {
|
| int dataOffset = 0;
| int currentRow = 0;
| int offsetIndex = 0;
|
| offsetsBuffer.setInt(0, dataOffset);
| // If we are here we have at least one row to process, so don't bother checking yet
| boolean done = false;
| while (!done) {
Expand All @@ -793,6 +793,8 @@ object GeneratedUnsafeRowToCudfRowIterator extends Logging {
| row = (UnsafeRow)input.next();
| }
| int numBytesUsedByRow = copyInto(row, dataBaseAddress + dataOffset, endDataAddress);
| offsetsBuffer.setInt(offsetIndex, dataOffset);
| offsetIndex += 4;
| if (numBytesUsedByRow < 0) {
| pending = row;
| done = true;
Expand Down Expand Up @@ -892,10 +894,12 @@ case class GpuRowToColumnarExec(child: SparkPlan,
// cache in a local to avoid serializing the plan
val localSchema = schema

// The cudf kernel only supports up to 1.5 KB per row which means at most 184 double/long
// values. Spark by default limits codegen to 100 fields "spark.sql.codegen.maxFields".
// So, we are going to be cautious and start with that until we have tested it more.
if ((1 until 100).contains(output.length) &&
// We can support upto 2^31 bytes per row. That is ~250M columns of 64-bit fixed-width data.
// This number includes the 1-bit validity per column, but doesn't include padding.
// We are being conservative by only allowing 100M columns until we feel the need to
// increase this number. Spark by default limits codegen to 100 fields
// "spark.sql.codegen.maxFields".
if ((1 until 100000000).contains(output.length) &&
CudfRowTransitions.areAllSupported(output)) {
val localOutput = output
rowBased.mapPartitions(rowIter => GeneratedUnsafeRowToCudfRowIterator(
Expand Down

0 comments on commit f1be1a5

Please sign in to comment.