Skip to content

Commit

Permalink
Support non literal position and length for substring (#7051)
Browse files Browse the repository at this point in the history
* Support non literal position and length for substring

Signed-off-by: Firestarman <[email protected]>
  • Loading branch information
firestarman authored Nov 17, 2022
1 parent 458d417 commit f4f09a8
Show file tree
Hide file tree
Showing 5 changed files with 213 additions and 42 deletions.
4 changes: 2 additions & 2 deletions docs/supported_ops.md
Original file line number Diff line number Diff line change
Expand Up @@ -13705,7 +13705,7 @@ are limited.
<td> </td>
<td> </td>
<td> </td>
<td><em>PS<br/>Literal value only</em></td>
<td>S</td>
<td> </td>
<td> </td>
<td> </td>
Expand All @@ -13726,7 +13726,7 @@ are limited.
<td> </td>
<td> </td>
<td> </td>
<td><em>PS<br/>Literal value only</em></td>
<td>S</td>
<td> </td>
<td> </td>
<td> </td>
Expand Down
32 changes: 32 additions & 0 deletions integration_tests/src/main/python/string_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,38 @@ def test_substring():
'SUBSTRING(a, 0, 10)',
'SUBSTRING(a, 0, 0)'))

def test_substring_column():
str_gen = mk_str_gen('.{0,30}')
assert_gpu_and_cpu_are_equal_collect(
lambda spark: three_col_df(spark, str_gen, int_gen, int_gen).selectExpr(
'SUBSTRING(a, b, c)',
'SUBSTRING(a, b, 0)',
'SUBSTRING(a, b, 5)',
'SUBSTRING(a, b, -5)',
'SUBSTRING(a, b, 100)',
'SUBSTRING(a, b, -100)',
'SUBSTRING(a, b, NULL)',
'SUBSTRING(a, 0, c)',
'SUBSTRING(a, 5, c)',
'SUBSTRING(a, -5, c)',
'SUBSTRING(a, 100, c)',
'SUBSTRING(a, -100, c)',
'SUBSTRING(a, NULL, c)',
'SUBSTRING(\'abc\', b, c)',
'SUBSTRING(\'abc\', 1, c)',
'SUBSTRING(\'abc\', 0, c)',
'SUBSTRING(\'abc\', 5, c)',
'SUBSTRING(\'abc\', -1, c)',
'SUBSTRING(\'abc\', -5, c)',
'SUBSTRING(\'abc\', NULL, c)',
'SUBSTRING(\'abc\', b, 10)',
'SUBSTRING(\'abc\', b, -10)',
'SUBSTRING(\'abc\', b, 2)',
'SUBSTRING(\'abc\', b, 0)',
'SUBSTRING(\'abc\', b, NULL)',
'SUBSTRING(\'abc\', b)',
'SUBSTRING(a, b)'))

def test_repeat_scalar_and_column():
gen_s = StringGen(nullable=False)
gen_r = IntegerGen(min_val=-100, max_val=100, special_cases=[0], nullable=True)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2958,8 +2958,8 @@ object GpuOverrides extends Logging {
"Substring operator",
ExprChecks.projectOnly(TypeSig.STRING, TypeSig.STRING + TypeSig.BINARY,
Seq(ParamCheck("str", TypeSig.STRING, TypeSig.STRING + TypeSig.BINARY),
ParamCheck("pos", TypeSig.lit(TypeEnum.INT), TypeSig.INT),
ParamCheck("len", TypeSig.lit(TypeEnum.INT), TypeSig.INT))),
ParamCheck("pos", TypeSig.INT, TypeSig.INT),
ParamCheck("len", TypeSig.INT, TypeSig.INT))),
(in, conf, p, r) => new TernaryExprMeta[Substring](in, conf, p, r) {
override def convertToGpu(
column: Expression,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@
package org.apache.spark.sql.rapids

import java.nio.charset.Charset
import java.util.Optional

import scala.collection.mutable.ArrayBuffer

import ai.rapids.cudf.{BinaryOp, ColumnVector, ColumnView, DType, PadSide, Scalar, Table}
import ai.rapids.cudf.{BinaryOp, BinaryOperable, ColumnVector, ColumnView, DType, PadSide, Scalar, Table}
import com.nvidia.spark.rapids._
import com.nvidia.spark.rapids.RapidsPluginImplicits._
import com.nvidia.spark.rapids.shims.{RegExpShim, ShimExpression}
Expand Down Expand Up @@ -460,34 +461,179 @@ case class GpuSubstring(str: Expression, pos: Expression, len: Expression)
this(str, pos, GpuLiteral(Integer.MAX_VALUE, IntegerType))
}

override def doColumnar(
val0: GpuColumnVector,
val1: GpuColumnVector,
val2: GpuColumnVector): ColumnVector =
throw new UnsupportedOperationException(s"Cannot columnar evaluate expression: $this")
private[this] def computeStarts(strs: ColumnView, poses: ColumnView): ColumnVector = {
// CPU:
// start = (pos < 0) ? pos + str_size : ((pos > 0) ? pos - 1 : 0)
// cudf `substring(column, column, column)` treats negative start always as 0, so
// need to do the similar calculation as CPU here.

override def doColumnar(
val0: GpuScalar,
val1: GpuColumnVector,
val2: GpuColumnVector): ColumnVector =
throw new UnsupportedOperationException(s"Cannot columnar evaluate expression: $this")
// 1) pos + str_size
val negConvertedPoses = withResource(strs.getCharLengths) { strSizes =>
poses.add(strSizes, DType.INT32)
}
withResource(negConvertedPoses) { _ =>
withResource(Scalar.fromInt(0)) { zero =>
// 2) (pos > 0) ? pos - 1 : 0
val subOnePoses = withResource(Scalar.fromInt(1)) { one =>
poses.sub(one, DType.INT32)
}
val zeroBasedPoses = withResource(subOnePoses) { _ =>
withResource(poses.greaterThan(zero)) { posPosFlags =>
// Use "poses" here instead of "zero" as the false path to keep the nulls,
// since "ifElse" will erase the null mask of "poses".
posPosFlags.ifElse(subOnePoses, poses)
}
}

override def doColumnar(val0: GpuScalar, val1: GpuScalar, val2: GpuColumnVector): ColumnVector =
throw new UnsupportedOperationException(s"Cannot columnar evaluate expression: $this")
withResource(zeroBasedPoses) { _ =>
withResource(poses.lessThan(zero)) { negPosFlags =>
negPosFlags.ifElse(negConvertedPoses, zeroBasedPoses)
}
}
}
}
}

override def doColumnar(val0: GpuScalar, val1: GpuColumnVector, val2: GpuScalar): ColumnVector =
throw new UnsupportedOperationException(s"Cannot columnar evaluate expression: $this")
private[this] def computeEnds(starts: BinaryOperable, lens: BinaryOperable): ColumnVector = {
// CPU:
// end = start + length
// , along with integer overflow check
val endLongCol = withResource(starts.add(lens, DType.INT64)) { endColAsLong =>
// If (end < 0), end = 0, let cudf return empty string.
// If (end > Int.MaxValue), end = Int.MaxValue, let cudf return string
// from start until the string end.
// To align with the CPU's behavior.
withResource(Scalar.fromLong(0L)) { zero =>
withResource(Scalar.fromLong(Int.MaxValue.toLong)) { maxInt =>
endColAsLong.clamp(zero, maxInt)
}
}
}
withResource(endLongCol) { _ =>
endLongCol.castTo(DType.INT32)
}
}

override def doColumnar(
val0: GpuColumnVector,
val1: GpuScalar,
val2: GpuColumnVector): ColumnVector =
throw new UnsupportedOperationException(s"Cannot columnar evaluate expression: $this")
private[this] def substringColumn(strs: ColumnView, starts: ColumnView,
ends: ColumnView): ColumnVector = {
// cudf does not allow nulls in starts and ends.
val noNullStarts = new ColumnView(starts.getType, starts.getRowCount, Optional.of(0L),
starts.getData, null)
withResource(noNullStarts) { _ =>
val noNullEnds = new ColumnView(ends.getType, ends.getRowCount, Optional.of(0L),
ends.getData, null)
withResource(noNullEnds) { _ =>
// Spark returns null if any of (str, pos, len) is null, and `ends`'s null mask
// should already cover pos and len.
withResource(strs.mergeAndSetValidity(BinaryOp.BITWISE_AND, strs, ends)) { rets =>
rets.substring(noNullStarts, noNullEnds)
}
}
}
}

override def doColumnar(strCol: GpuColumnVector, posCol: GpuColumnVector,
lenCol: GpuColumnVector): ColumnVector = {
val strs = strCol.getBase
val poses = posCol.getBase
val lens = lenCol.getBase
withResource(computeStarts(strs, poses)) { starts =>
withResource(computeEnds(starts, lens)) { ends =>
substringColumn(strs, starts, ends)
}
}
}

override def doColumnar(column: GpuColumnVector, position: GpuScalar,
length: GpuScalar): ColumnVector = {
val pos = position.getValue.asInstanceOf[Int]
val len = length.getValue.asInstanceOf[Int]
override def doColumnar(strS: GpuScalar, posCol: GpuColumnVector,
lenCol: GpuColumnVector): ColumnVector = {
val numRows = posCol.getRowCount.toInt
withResource(GpuColumnVector.from(strS, numRows, strS.dataType)) { strCol =>
doColumnar(strCol, posCol, lenCol)
}
}

override def doColumnar(strCol: GpuColumnVector, posS: GpuScalar,
lenCol: GpuColumnVector): ColumnVector = {
val strs = strCol.getBase
val lens = lenCol.getBase
val pos = posS.getValue.asInstanceOf[Int]
// CPU:
// start = (pos < 0) ? pos + str_size : ((pos > 0) ? pos - 1 : 0)
val starts = if (pos < 0) {
withResource(strs.getCharLengths) { strSizes =>
withResource(Scalar.fromInt(pos)) { posS =>
posS.add(strSizes, DType.INT32)
}
}
} else { // pos >= 0
val start = if (pos > 0) pos - 1 else 0
withResource(Scalar.fromInt(start)) { startS =>
ColumnVector.fromScalar(startS, strs.getRowCount.toInt)
}
}

withResource(starts) { _ =>
withResource(computeEnds(starts, lens)) { ends =>
substringColumn(strs, starts, ends)
}
}
}

override def doColumnar (strS: GpuScalar, posS: GpuScalar,
lenCol: GpuColumnVector): ColumnVector = {
val strValue = strS.getValue.asInstanceOf[UTF8String]
val pos = posS.getValue.asInstanceOf[Int]
val lens = lenCol.getBase
val numRows = lenCol.getRowCount.toInt
// CPU:
// start = (pos < 0) ? pos + str_size : ((pos > 0) ? pos - 1 : 0)
val start = if (pos < 0) {
pos + strValue.numChars()
} else if (pos > 0) {
pos - 1
} else 0

val starts = withResource(Scalar.fromInt(start)) { startS =>
ColumnVector.fromScalar(startS, numRows)
}

withResource(starts) { _ =>
withResource(computeEnds(starts, lens)) { ends =>
withResource(ColumnVector.fromScalar(strS.getBase, numRows)) { strs =>
substringColumn(strs, starts, ends)
}
}
}
}

override def doColumnar(strCol: GpuColumnVector, posCol: GpuColumnVector,
lenS: GpuScalar): ColumnVector = {
val strs = strCol.getBase
val poses = posCol.getBase
val numRows = strCol.getRowCount.toInt
withResource(computeStarts(strs, poses)) { starts =>
val ends = withResource(ColumnVector.fromScalar(lenS.getBase, numRows)) { lens =>
computeEnds(starts, lens)
}
withResource(ends) { _ =>
substringColumn(strs, starts, ends)
}
}
}

override def doColumnar(strS: GpuScalar, posCol: GpuColumnVector,
lenS: GpuScalar): ColumnVector = {
val numRows = posCol.getRowCount.toInt
withResource(GpuColumnVector.from(strS, numRows, strS.dataType)) { strCol =>
doColumnar(strCol, posCol, lenS)
}
}

override def doColumnar(strCol: GpuColumnVector, posS: GpuScalar,
lenS: GpuScalar): ColumnVector = {
val strs = strCol.getBase
val pos = posS.getValue.asInstanceOf[Int]
val len = lenS.getValue.asInstanceOf[Int]
val (start, endOpt) = if (len <= 0) {
// Spark returns empty string if length is negative or zero
(0, Some(0))
Expand All @@ -511,22 +657,15 @@ case class GpuSubstring(str: Expression, pos: Expression, len: Expression)
// e.g. `substring("abc", -3, 4)` outputs "abc".
(pos, None)
}
val col = column.getBase
endOpt.map(col.substring(start, _)).getOrElse(col.substring(start))
endOpt.map(strs.substring(start, _)).getOrElse(strs.substring(start))
}

override def doColumnar(numRows: Int, val0: GpuScalar, val1: GpuScalar,
val2: GpuScalar): ColumnVector = {
withResource(GpuColumnVector.from(val0, numRows, str.dataType)) { val0Col =>
doColumnar(val0Col, val1, val2)
override def doColumnar(numRows: Int, strS: GpuScalar, posS: GpuScalar,
lenS: GpuScalar): ColumnVector = {
withResource(GpuColumnVector.from(strS, numRows, strS.dataType)) { strCol =>
doColumnar(strCol, posS, lenS)
}
}

override def doColumnar(
val0: GpuColumnVector,
val1: GpuColumnVector,
val2: GpuScalar): ColumnVector =
throw new UnsupportedOperationException(s"Cannot columnar evaluate expression: $this")
}

case class GpuInitCap(child: Expression) extends GpuUnaryExpression with ImplicitCastInputTypes {
Expand Down
4 changes: 2 additions & 2 deletions tools/src/main/resources/supportedExprs.csv
Original file line number Diff line number Diff line change
Expand Up @@ -506,8 +506,8 @@ StringTrimRight,S,`rtrim`,None,project,src,NA,NA,NA,NA,NA,NA,NA,NA,NA,S,NA,NA,NA
StringTrimRight,S,`rtrim`,None,project,trimStr,NA,NA,NA,NA,NA,NA,NA,NA,NA,PS,NA,NA,NA,NA,NA,NA,NA,NA
StringTrimRight,S,`rtrim`,None,project,result,NA,NA,NA,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA
Substring,S,`substr`; `substring`,None,project,str,NA,NA,NA,NA,NA,NA,NA,NA,NA,S,NA,NA,NS,NA,NA,NA,NA,NA
Substring,S,`substr`; `substring`,None,project,pos,NA,NA,NA,PS,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA
Substring,S,`substr`; `substring`,None,project,len,NA,NA,NA,PS,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA
Substring,S,`substr`; `substring`,None,project,pos,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA
Substring,S,`substr`; `substring`,None,project,len,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA
Substring,S,`substr`; `substring`,None,project,result,NA,NA,NA,NA,NA,NA,NA,NA,NA,S,NA,NA,NS,NA,NA,NA,NA,NA
SubstringIndex,S,`substring_index`,None,project,str,NA,NA,NA,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA
SubstringIndex,S,`substring_index`,None,project,delim,NA,NA,NA,NA,NA,NA,NA,NA,NA,PS,NA,NA,NA,NA,NA,NA,NA,NA
Expand Down

0 comments on commit f4f09a8

Please sign in to comment.