Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Call the right method to convert table from row major <=> col major [databricks] #3762

Merged
merged 10 commits into from
Jan 13, 2022
22 changes: 20 additions & 2 deletions integration_tests/src/main/python/row_conversion_test.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2020-2021, NVIDIA CORPORATION.
# Copyright (c) 2020-2022, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -45,12 +45,30 @@ def test_row_conversions():

def test_row_conversions_fixed_width():
    # Schema of only fixed-width-friendly columns (no string column, which is
    # variable-width) so the row <=> columnar transition can use the
    # fixed-width optimized kernel path — presumably why "g"/string_gen was
    # dropped in this change; TODO confirm against the kernel's constraints.
    # "a" is re-selected as "a_again" to exercise duplicate-column handling.
    gens = [["a", byte_gen], ["b", short_gen], ["c", int_gen], ["d", long_gen],
            ["e", float_gen], ["f", double_gen], ["h", boolean_gen],
            ["i", timestamp_gen], ["j", date_gen], ["k", decimal_gen_64bit],
            ["l", decimal_gen_scale_precision]]
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark: gen_df(spark, gens).selectExpr("*", "a as a_again"))

def test_row_conversions_fixed_width_wide():
    # Wide fixed-width schema: 10 columns of each supported type (110 columns
    # total), built table-driven instead of concatenating one comprehension
    # per type. Each entry pairs a column-name prefix with a factory so a
    # fresh generator instance is created per column, matching the original.
    col_specs = [
        ("a", lambda: ByteGen(nullable=True)),
        ("b", lambda: ShortGen(nullable=True)),
        ("c", lambda: IntegerGen(nullable=True)),
        ("d", lambda: LongGen(nullable=True)),
        ("e", lambda: FloatGen(nullable=True)),
        ("f", lambda: DoubleGen(nullable=True)),
        ("h", lambda: BooleanGen(nullable=True)),
        ("i", lambda: TimestampGen(nullable=True)),
        ("j", lambda: DateGen(nullable=True)),
        ("k", lambda: DecimalGen(precision=12, scale=2, nullable=True)),
        ("l", lambda: DecimalGen(precision=7, scale=3, nullable=True)),
    ]
    gens = [["{}{}".format(prefix, i), make_gen()]
            for prefix, make_gen in col_specs
            for i in range(10)]

    def do_it(spark):
        # length=1 keeps the data tiny; the point of this test is the column
        # count, not the row count. "a0" is re-selected to add a duplicate.
        df = gen_df(spark, gens, length=1).selectExpr("*", "a0 as a_again")
        debug_df(df)
        return df

    assert_gpu_and_cpu_are_equal_collect(do_it)

# Test handling of transitions when the data is already columnar on the host
# Note that Apache Spark will automatically convert a load of nested types to rows, so
# the nested types will not test a host columnar transition in that case.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2021, NVIDIA CORPORATION.
* Copyright (c) 2020-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -55,6 +55,7 @@ public abstract class UnsafeRowToColumnarBatchIterator implements Iterator<Colum
protected final GpuMetric numInputRows;
protected final GpuMetric numOutputRows;
protected final GpuMetric numOutputBatches;
protected final boolean isOptimizedForFixedWidth;

protected UnsafeRowToColumnarBatchIterator(
Iterator<UnsafeRow> input,
Expand All @@ -65,7 +66,8 @@ protected UnsafeRowToColumnarBatchIterator(
GpuMetric opTime,
GpuMetric numInputRows,
GpuMetric numOutputRows,
GpuMetric numOutputBatches) {
GpuMetric numOutputBatches,
Boolean isOptimizedForFixedWidth) {
this.input = input;
int sizePerRowEstimate = CudfUnsafeRow.getRowSizeEstimate(schema);
numRowsEstimate = (int)Math.max(1,
Expand All @@ -84,6 +86,7 @@ protected UnsafeRowToColumnarBatchIterator(
this.numInputRows = numInputRows;
this.numOutputRows = numOutputRows;
this.numOutputBatches = numOutputBatches;
this.isOptimizedForFixedWidth = isOptimizedForFixedWidth;
}

@Override
Expand Down Expand Up @@ -163,7 +166,16 @@ public ColumnarBatch next() {
}
try (NvtxRange ignored = buildRange;
ColumnVector cv = devColumn;
Table tab = Table.convertFromRows(cv, rapidsTypes)) {
// We are branching over the output.length to know which kernel to call.
      // If output.length < 100 we call the fixed-width optimized version, otherwise the
      // generic one
Table tab = isOptimizedForFixedWidth ?
// The fixed-width optimized cudf kernel only supports up to 1.5 KB per row which means
// at most 184 double/long values. We are branching over the output.length to know
// which kernel to call. If output.length < 100 we call the fixed-width optimized
// version, otherwise the generic one
Table.convertFromRowsFixedWidthOptimized(cv, rapidsTypes) :
Table.convertFromRows(cv, rapidsTypes)) {
return GpuColumnVector.from(tab, outputTypes);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2021, NVIDIA CORPORATION.
* Copyright (c) 2019-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -42,7 +42,8 @@ class AcceleratedColumnarToRowIterator(
numInputBatches: GpuMetric,
numOutputRows: GpuMetric,
opTime: GpuMetric,
streamTime: GpuMetric) extends Iterator[InternalRow] with Arm with Serializable {
streamTime: GpuMetric,
isFixedWidthOptimized: Boolean) extends Iterator[InternalRow] with Arm with Serializable {
@transient private var pendingCvs: Queue[HostColumnVector] = Queue.empty
// GPU batches read in must be closed by the receiver (us)
@transient private var currentCv: Option[HostColumnVector] = None
Expand Down Expand Up @@ -108,7 +109,17 @@ class AcceleratedColumnarToRowIterator(
if (cb.numRows() > 0) {
withResource(new NvtxWithMetrics("ColumnarToRow: batch", NvtxColor.RED, opTime)) { _ =>
withResource(rearrangeRows(cb)) { table =>
withResource(table.convertToRows()) { rowsCvList =>
// The fixed-width optimized cudf kernel only supports up to 1.5 KB per row which means at
// most 184 double/long values. Spark by default limits codegen to 100 fields
// "spark.sql.codegen.maxFields". So, we are going to be cautious and start with that
          // until we have tested it more. We are branching over the output.length to know which kernel
// to call. If output.length < 100 we call the fixed-width optimized version, otherwise
// the generic one
withResource(if (isFixedWidthOptimized) {
table.convertToRowsFixedWidthOptimized()
} else {
table.convertToRows()
}) { rowsCvList =>
rowsCvList.foreach { rowsCv =>
pendingCvs += rowsCv.copyToHost()
}
Expand Down Expand Up @@ -342,15 +353,16 @@ object GpuColumnarToRowExecParent {
if (CudfRowTransitions.areAllSupported(output) &&
// For a small number of columns it is still best to do it the original way
output.length > 4 &&
// The cudf kernel only supports up to 1.5 KB per row which means at most 184 double/long
// values. Spark by default limits codegen to 100 fields "spark.sql.codegen.maxFields".
// So, we are going to be cautious and start with that until we have tested it more.
output.length < 100) {
        // We can support up to 2^31 bytes per row. That is ~250M columns of 64-bit fixed-width data.
// This number includes the 1-bit validity per column, but doesn't include padding.
// We are being conservative by only allowing 100M columns until we feel the need to
// increase this number
output.length <= 100000000) {
(batches: Iterator[ColumnarBatch]) => {
// UnsafeProjection is not serializable so do it on the executor side
val toUnsafe = UnsafeProjection.create(output, output)
new AcceleratedColumnarToRowIterator(output,
batches, numInputBatches, numOutputRows, opTime, streamTime).map(toUnsafe)
new AcceleratedColumnarToRowIterator(output, batches, numInputBatches, numOutputRows,
opTime, streamTime, output.length < 100).map(toUnsafe)
}
} else {
(batches: Iterator[ColumnarBatch]) => {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2021, NVIDIA CORPORATION.
* Copyright (c) 2019-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -685,7 +685,8 @@ object GeneratedUnsafeRowToCudfRowIterator extends Logging {
opTime: GpuMetric,
numInputRows: GpuMetric,
numOutputRows: GpuMetric,
numOutputBatches: GpuMetric): UnsafeRowToColumnarBatchIterator = {
numOutputBatches: GpuMetric,
isOptimizedForFixedWidth: Boolean): UnsafeRowToColumnarBatchIterator = {
val ctx = new CodegenContext

ctx.addReferenceObj("iter", input, classOf[Iterator[UnsafeRow]].getName)
Expand All @@ -697,6 +698,8 @@ object GeneratedUnsafeRowToCudfRowIterator extends Logging {
ctx.addReferenceObj("numInputRows", numInputRows, classOf[GpuMetric].getName)
ctx.addReferenceObj("numOutputRows", numOutputRows, classOf[GpuMetric].getName)
ctx.addReferenceObj("numOutputBatches", numOutputBatches, classOf[GpuMetric].getName)
ctx.addReferenceObj("isOptimizedForFixedWidth",
isOptimizedForFixedWidth, classOf[Boolean].getName)

val rowBaseObj = ctx.freshName("rowBaseObj")
val rowBaseOffset = ctx.freshName("rowBaseOffset")
Expand Down Expand Up @@ -766,8 +769,9 @@ object GeneratedUnsafeRowToCudfRowIterator extends Logging {
| (com.nvidia.spark.rapids.GpuMetric)references[5],
| (com.nvidia.spark.rapids.GpuMetric)references[6],
| (com.nvidia.spark.rapids.GpuMetric)references[7],
| (com.nvidia.spark.rapids.GpuMetric)references[8]);
| ${ctx.initMutableStates()}
| (com.nvidia.spark.rapids.GpuMetric)references[8],
| (java.lang.Boolean)references[9]);
| ${ctx.initMutableStates()}
| }
|
| // Avoid virtual function calls by copying the data in a batch at a time instead
Expand All @@ -780,8 +784,8 @@ object GeneratedUnsafeRowToCudfRowIterator extends Logging {
|
| int dataOffset = 0;
| int currentRow = 0;
| int offsetIndex = 0;
|
| offsetsBuffer.setInt(0, dataOffset);
| // If we are here we have at least one row to process, so don't bother checking yet
| boolean done = false;
| while (!done) {
Expand All @@ -793,6 +797,8 @@ object GeneratedUnsafeRowToCudfRowIterator extends Logging {
| row = (UnsafeRow)input.next();
| }
| int numBytesUsedByRow = copyInto(row, dataBaseAddress + dataOffset, endDataAddress);
| offsetsBuffer.setInt(offsetIndex, dataOffset);
| offsetIndex += 4;
| if (numBytesUsedByRow < 0) {
| pending = row;
| done = true;
Expand Down Expand Up @@ -892,16 +898,18 @@ case class GpuRowToColumnarExec(child: SparkPlan,
// cache in a local to avoid serializing the plan
val localSchema = schema

// The cudf kernel only supports up to 1.5 KB per row which means at most 184 double/long
// values. Spark by default limits codegen to 100 fields "spark.sql.codegen.maxFields".
// So, we are going to be cautious and start with that until we have tested it more.
if ((1 until 100).contains(output.length) &&
    // We can support up to 2^31 bytes per row. That is ~250M columns of 64-bit fixed-width data.
// This number includes the 1-bit validity per column, but doesn't include padding.
// We are being conservative by only allowing 100M columns until we feel the need to
// increase this number. Spark by default limits codegen to 100 fields
// "spark.sql.codegen.maxFields".
if ((1 until 100000000).contains(output.length) &&
CudfRowTransitions.areAllSupported(output)) {
val localOutput = output
rowBased.mapPartitions(rowIter => GeneratedUnsafeRowToCudfRowIterator(
rowIter.asInstanceOf[Iterator[UnsafeRow]],
localOutput.toArray, localGoal, semaphoreWaitTime, streamTime, opTime,
numInputRows, numOutputRows, numOutputBatches))
numInputRows, numOutputRows, numOutputBatches, output.length < 100))
} else {
val converters = new GpuRowToColumnConverter(localSchema)
rowBased.mapPartitions(rowIter => new RowToColumnarIterator(rowIter,
Expand Down