Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix arrays_zip to not rely on broken segmented gather #7484

Merged
merged 3 commits into from
Jan 10, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions integration_tests/src/main/python/array_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,6 @@ def do_it(spark):


@pytest.mark.parametrize('data_gen', array_zips_gen, ids=idfn)
@pytest.mark.xfail(reason="https://github.com/NVIDIA/spark-rapids/issues/7469")
def test_arrays_zip(data_gen):
gen = StructGen(
[('a', data_gen), ('b', data_gen), ('c', data_gen), ('d', data_gen)], nullable=False)
Expand All @@ -430,7 +429,6 @@ def test_arrays_zip(data_gen):
)


@pytest.mark.xfail(reason="https://github.com/NVIDIA/spark-rapids/issues/7469")
def test_arrays_zip_corner_cases():
assert_gpu_and_cpu_are_equal_collect(
lambda spark: unary_op_df(spark, ArrayGen(int_gen), length=100).selectExpr(
Expand Down Expand Up @@ -660,4 +658,4 @@ def test_array_remove(data_gen):
lambda spark: gen_df(spark, gen).selectExpr(
'array_remove(a, b)',
'array_remove(a, null)')
)
)
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
 * Copyright (c) 2021-2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -729,62 +729,66 @@ case class GpuArraysZip(children: Seq[Expression]) extends GpuExpression with Sh
GpuExpressionsUtils.columnarEvalToColumn(expr, batch).getBase
}

withResource(inputs) { inputs =>

// Compute max size of input arrays for each row
//
// input1: [[A, B, C], [D, E], [F], [G]]
// input2: [[a, b], [c, d, e], null, [f, g]]
// max array size: [3, 3, 0, 2]
val maxArraySize = computeMaxArraySize(inputs)
val cleanedInputs = withResource(inputs) { inputs =>
normalizeNulls(inputs)
}

// Generate offsets from max array sizes
//
// [3, 3, 0, 2] => [0, 3, 6, 6, 8]
val offsets = maxArraySize.generateListOffsets()
val padded = withResource(cleanedInputs) { cleanedInputs =>
padArraysToMaxLength(cleanedInputs)
}

// Generate sequence indices for gathering children of input arrays
//
// [3, 3, 0, 2] => [[0, 1, 2], [0, 1, 2], [], [0, 1]]
val seqIndices = closeOnExcept(offsets) { _ =>
withResource(maxArraySize) { _ =>
generateSeqIndices(maxArraySize)
}
withResource(padded) { _ =>
closeOnExcept(zipArrays(padded)) { ret =>
GpuColumnVector.from(ret, dataType)
}
}
}

// Perform segment gather on input columns with indices covering each element
//
// input1: [[A, B, C], [D, E], [F], [G]]
// input2: [[a, b], [c, d, e], null, [f, g]]
// indices: [[0, 1, 2], [0, 1, 2], [], [0, 1]]
// children1: [A, B, C, D, E, null, G, null]
// children2: [a, b, null, c, d, e, f, g]
val gatheredChildren = closeOnExcept(offsets) { _ =>
withResource(seqIndices) { _ =>
inputs.safeMap { cv =>
withResource(cv.segmentedGather(seqIndices)) { gathered =>
gathered.getChildColumnView(0).copyToColumnVector()
}
/**
 * Segmented gather in CUDF produces a NULL output for a NULL input. But we need to produce
 * child columns that we can put together in a struct. This requires them all to have the
 * same length. To make this work we need to make sure all of the inputs have nulls in the
 * same places at the top level. That way when we gather things we get the same output
 * size for all of the children of the input columns. The result of this will have the same
 * validity for all top level columns, but possibly different offsets.
 */
private def normalizeNulls(inputs: Seq[cudf.ColumnVector]): Seq[ColumnVector] = {
  // First let's figure out if there are any nulls at all, because if there are not we don't
  // need to do anything.
  if (inputs.exists(_.hasNulls)) {
    // Combine the top-level validity of all of the inputs: a row is null in the output
    // if it is null in ANY of the inputs.
    var nullOutput = inputs.head.isNull
    try {
      inputs.slice(1, inputs.length).foreach { cv =>
        val combinedIsNull = withResource(cv.isNull) { tmpIsNull =>
          tmpIsNull.or(nullOutput)
        }
        closeOnExcept(combinedIsNull) { _ =>
          nullOutput.close()
          nullOutput = combinedIsNull
        }
      }

      // Replace every row that is null in any input with a null row in all of the inputs.
      //
      // input1: [[A, B, C], [D, E], [F], [G]]
      // input2: [[a, b], [c, d, e], null, [f, g]]
      // combinedIsNull: [false, false, true, false]
      //
      // output1: [[A, B, C], [D, E], null, [G]]
      // output2: [[a, b], [c, d, e], null, [f, g]]
      inputs.zip(children).safeMap { case (cv, child) =>
        withResource(GpuScalar.from(null, child.dataType)) { nullArray =>
          nullOutput.ifElse(nullArray, cv)
        }
      }
    } finally {
      nullOutput.close()
    }
  } else {
    // No nulls anywhere, so just pass the inputs through with an extra reference so the
    // caller can close the result uniformly.
    inputs.map(_.incRefCount())
  }
}


private def computeMaxArraySize(inputs: Seq[ColumnVector]): ColumnVector = {
// Compute array sizes of input arrays
val arraySizes = inputs.safeMap(_.countElements())
Expand All @@ -809,28 +813,65 @@ case class GpuArraysZip(children: Seq[Expression]) extends GpuExpression with Sh

/**
 * Generate a per-row index sequence [0, 1, ..., size-1] from a column of row sizes, e.g.
 * [3, 3, 0, 2] => [[0, 1, 2], [0, 1, 2], [], [0, 1]]. These are used later as segmented
 * gather indices so shorter rows get padded with nulls out to the max row size.
 */
private def generateSeqIndices(maxArraySize: ColumnVector): ColumnVector = {
  withResource(GpuScalar.from(0, IntegerType)) { s =>
    // Every sequence starts at zero; fromScalar avoids wrapping the column in a
    // GpuColumnVector just to immediately unwrap it.
    withResource(cudf.ColumnVector.fromScalar(s, maxArraySize.getRowCount.toInt)) { zero =>
      ColumnVector.sequence(zero, maxArraySize)
    }
  }
}

/**
 * Do a segmented gather on the inputs so that they are padded with nulls to make sure each LIST
 * on the same row, at the top level, has the same length. The columns returned should have the
 * same offsets and the same validity. This assumes that the validity on the inputs all match.
 */
private def padArraysToMaxLength(inputs: Seq[ColumnVector]): Seq[ColumnVector] = {
  // Compute max size of input arrays for each row, this is to know how we need to pad things.
  //
  // input1: [[A, B, C], [D, E], null, [G]]
  // input2: [[a, b], [c, d, e], null, [f, g]]
  // max array size: [3, 3, 0, 2]
  val seqIndices = withResource(computeMaxArraySize(inputs)) { maxArraySize =>
    // Generate sequence indices for gathering children of input arrays
    //
    // [3, 3, 0, 2] => [[0, 1, 2], [0, 1, 2], [], [0, 1]]
    generateSeqIndices(maxArraySize)
  }

  // Perform segmented gather on input columns with indices covering each element. Indices
  // that run past the end of a row gather nulls, which is exactly the padding we need.
  //
  // input1: [[A, B, C], [D, E], null, [G]]
  // input2: [[a, b], [c, d, e], null, [f, g]]
  // indices: [[0, 1, 2], [0, 1, 2], [], [0, 1]]
  // output1: [[A, B, C], [D, E, null], null, [G, null]]
  // output2: [[a, b, null], [c, d, e], null, [f, g]]
  withResource(seqIndices) { _ =>
    inputs.safeMap { cv =>
      cv.segmentedGather(seqIndices)
    }
  }
}

/**
 * This turns LIST[X], LIST[Y], ... into a LIST[STRUCT[X, Y, ...]] but requires that
 * the input LIST columns all have the same validity and offsets.
 */
private def zipArrays(padded: Seq[ColumnVector]) : ColumnVector = {
  // Get the data column from the children, without any offsets. These are views and
  // must be closed once the struct view built from them has been materialized.
  withResource(padded.safeMap(_.getChildColumnView(0))) { children =>
    // Put them into a struct column view
    withResource(ColumnView.makeStructView(children: _*)) { structView =>
      // Then make the struct a list using the input's offsets and validity
      // in the cheapest way possible.
      val proto = padded.head
      withResource(proto.getValid) { valid =>
        withResource(proto.getOffsets) { offsets =>
          withResource(new ColumnView(DType.LIST, proto.getRowCount,
            java.util.Optional.of[java.lang.Long](proto.getNullCount),
            valid, offsets, Array(structView))) { retView =>
            // Copy out to a real ColumnVector that owns its own memory.
            retView.copyToColumnVector()
          }
        }
      }
    }
  }
}
}
Expand Down