what should stringConcatenateListElements return when list only contains nulls? #9745

Closed
HaoYang670 opened this issue Nov 22, 2021 · 20 comments

@HaoYang670
Contributor

I am using the method stringConcatenateListElements(Scalar separator, Scalar narep, boolean separateNulls, boolean emptyStringOutputIfEmptyList) in Spark-Rapids to cast an Array-typed dataframe to string type. I find that when the input is Array(null, null, null), the output is always an empty string (I set narep="null", separateNulls=true). However, the expected output is "null, null, null".

Describe the solution you'd like
I added a test in ColumnVectorTest.java:

[screenshot of the proposed test]

This is what I expected.
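
A rough sketch in Scala of the kind of check the test makes (illustrative only; the list construction below is my own, not the exact code in the screenshot):

import ai.rapids.cudf.{ColumnVector, DType, HostColumnVector, Scalar}

// Build a one-row LIST<STRING> column whose only row is [null, null, null].
// (Resource closing is omitted for brevity.)
val listOfString = new HostColumnVector.ListType(true,
  new HostColumnVector.BasicType(true, DType.STRING))
val input = ColumnVector.fromLists(listOfString,
  java.util.Arrays.asList(null, null, null))

val sep = Scalar.fromString(", ")
val narep = Scalar.fromString("null")

// separateNulls = true, emptyStringOutputIfEmptyList = true
val result = input.stringConcatenateListElements(sep, narep, true, true)

// Expected (what this issue asks for): one row containing "null, null, null".
// Actual today: one row containing the empty string "".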

@HaoYang670 HaoYang670 added Needs Triage Need team to review and classify feature request New feature or request labels Nov 22, 2021
@sperlingxx sperlingxx assigned sperlingxx and ttnghia and unassigned sperlingxx Nov 22, 2021
@sperlingxx
Contributor

Hi @ttnghia, would you take a look when you have time?

@ttnghia
Contributor

ttnghia commented Nov 22, 2021

Yes, the result is correct. It's the desired behavior of Spark. When the input is all nulls, Spark will output null.

@ttnghia
Contributor

ttnghia commented Nov 22, 2021

You will find that behavior almost everywhere: nulls in => null out 😃

@ttnghia
Contributor

ttnghia commented Nov 22, 2021

FYI: here is the cudf doc for the case of all nulls in a list:

 * In the special case when the input list row contains all null elements, the output will be the
 * same as in case of empty input list regardless of @p string_narep and @p separate_nulls values.

So the output for an all-nulls input will depend on the parameter emptyStringOutputIfEmptyList. In your case, you set it to get an empty string (sorry, I said null before).

@ttnghia
Contributor

ttnghia commented Nov 22, 2021

That's interesting. @revans2 should know more about this, so can you comment on this, please? If necessary, we can change that in the cudf implementation.

@HaoYang670
Contributor Author

[screenshot of Spark 3.2.0 output]

@HaoYang670
Contributor Author

This is the result I get using Spark 3.2.0: when casting to string, Array(null, null) is cast to "[null, null]".

@revans2
Contributor

revans2 commented Nov 22, 2021

Sorry @ttnghia

I opened #7727 in March and it was closed in April, so I am having some trouble remembering all of the details from 7 months ago. Also, all of the places where we currently use this API have all of the parameters set to false, so there could be some oddness happening there too.

@ttnghia
Contributor

ttnghia commented Nov 22, 2021

I just realized that the example above is casting a struct to string, not concatenating strings. Did you try concatenating strings?

import org.apache.spark.sql.functions.{col, concat, lit}
import spark.sqlContext.implicits._

val data = Seq(("James","A","Smith","2018","M",3000), (null, null, null, null, null, 1000))
val columns = Seq("fname","mname","lname","dob_year","gender","salary")
val df = data.toDF(columns:_*)
df.show(false)

df.select(concat(col("fname"), lit(','), col("mname"), lit(','), col("lname")).as("FullName")).show(false)

(With concat, any null argument makes the whole result null, so the all-nulls row produces a null FullName.) The behavior for all-nulls input was requested to match the Python output. We can further modify the cudf API if we must support non-null, non-empty output for such cases.

@HaoYang670
Contributor Author

HaoYang670 commented Nov 23, 2021

scala> val data = Seq(Row(Array(null, null)))
data: Seq[org.apache.spark.sql.Row] = List([[Lscala.runtime.Null$;@39081953])

scala> val schema = StructType(Array(StructField("a", ArrayType(IntegerType, true))))
schema: org.apache.spark.sql.types.StructType = StructType(StructField(a,ArrayType(IntegerType,true),true))

scala> val df0 = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
df0: org.apache.spark.sql.DataFrame = [a: array<int>]

scala> df0.createOrReplaceTempView("df0")

scala> val df1 = spark.sql("select concat(a, a) as res from df0")
df1: org.apache.spark.sql.DataFrame = [res: array<int>]

scala> df1.collect
res1: Array[org.apache.spark.sql.Row] = Array([WrappedArray(null, null, null, null)])

scala> val df2 = df1.withColumn("res", col("res").cast(StringType))
df2: org.apache.spark.sql.DataFrame = [res: string]

scala> df2.collect
res2: Array[org.apache.spark.sql.Row] = Array([[null, null, null, null]])

scala> df2.show
+--------------------+
|                 res|
+--------------------+
|[null, null, null...|
+--------------------+


scala> val df3 = spark.sql("select concat_ws(', ', a, a) as res from df0")
df3: org.apache.spark.sql.DataFrame = [res: string]

scala> df3.collect
res4: Array[org.apache.spark.sql.Row] = Array([])

scala> df3.show
+---+
|res|
+---+
|   |
+---+

I reviewed #4728 and found the difference between concat and concat_ws in dealing with an all-nulls list (df2 and df3). The behavior of concatenate_list_elements in cudf is more similar to concat_ws.
However, in Spark, cast(StringType) deals with nulls in the same way as concat (converting each null to "null" in the list).
Could you add a new API in concatenate_list_elements that is consistent with concat and cast(StringType) in dealing with an all-nulls list? @ttnghia

@jrhemstad
Contributor

Could you add a new API in concatenate_list_elements that is consistent with concat and cast(StringType) in dealing with an all-nulls list?

@ttnghia before proceeding with anything along these lines, can someone please summarize what libcudf operation we are talking about and the missing behavior? I've seen both casting and concatenating discussed and those seem like very different operations, so I'm not clear what behavior we are discussing.

@ttnghia
Contributor

ttnghia commented Nov 23, 2021

The behavior we're talking about here is directly related to the API join_list_elements (formerly known as concatenate_list_elements):

std::unique_ptr<column> join_list_elements(

Previously, in order to match the Spark and Python behaviors, it was designed with a parameter to specify what the output will be in case of an all-nulls input list: either a null or an empty string:

 * In the special case when the input list row contains all null elements, the output will be the
 * same as in case of empty input list regardless of @p string_narep and @p separate_nulls values.

Now it seems that we need to have a third option to allow using string_narep.

@ttnghia
Contributor

ttnghia commented Nov 23, 2021

I've seen both casting and concatenating discussed and those seem like very different operations

We're talking about casting a struct to string (in Spark). That can be done by casting each child into a string and then concatenating those string results.

Yes, since those are different ops, we may see undesired behavior when using an API designed for one thing to do the other. Using join_list_elements to concatenate strings produces a null if the input list contains all nulls, which is the correct behavior there. However, casting a struct (containing all-null children) into a string produces a non-null result: a string like "[null, null, null]".

I'm not very familiar with the implementation details of such a casting op. Maybe we need to modify join_list_elements to get the desired behavior, or maybe we just need to modify the casting op to use the correct join parameter. Waiting for others who know more about this (the cast-struct-to-string op) to comment.

@jrhemstad
Contributor

jrhemstad commented Nov 23, 2021

Now it seems that we need to have a third option to allow using string_narep

I strongly dislike that idea. This API is already a nightmare of option flags with a complex cross product of behavior depending on the values of the various parameters. This API is trying to do too many things.

For example, why is the separators column allowed to contain null elements, necessitating a separator_narep to use in place of nulls in separators? This could be done much more cleanly by requiring separators.null_count() == 0. A user can perform replace_nulls on separators with separator_narep beforehand to achieve that behavior.

Likewise, empty_list_policy seems entirely unnecessary. An empty list should generate an empty string. If someone wants to turn empty strings into nulls, this can be achieved by post-processing empty strings to be null.

The relationship between string_narep and separate_nulls is also confusing. When would I specify a valid string_narep and not want separators? I would make string_narep an optional<string_scalar>. If not string_narep.has_value(), then nulls are ignored and filtered out. If string_narep.has_value(), then nulls are replaced and always separated. If the desired behavior is for a list containing a null to become null, then that should be a post-processing step.

In the special case when the input list row contains all null elements, the output will be the
same as in case of empty input list regardless of @p string_narep and @p separate_nulls values.

Why? This is very non-intuitive. If I have [null, null, null] with string_narep == 'n' and separator == ':' I expect to get "n:n:n".

It is clear this API was overfit to corner cases of Spark and/or Python. We should take a step back and ask, "What does it make sense for this function to do?", then work backwards from there and decide how the corner cases can be satisfied by pre-/post-processing as I described above.
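
To make that pre-/post-processing concrete, a sketch through the cudf Java binding (the replaceNulls, equalTo, and ifElse calls are assumed to behave as named; resource closing is omitted):

import ai.rapids.cudf.{ColumnVector, DType, Scalar}

// Pre-processing: instead of a separator_narep parameter, fill nulls in the
// separators column before calling the join.
def fillSeparators(separators: ColumnVector, separatorNarep: Scalar): ColumnVector =
  separators.replaceNulls(separatorNarep)

// Post-processing: instead of an empty-list policy inside the join, map empty
// output strings back to nulls afterwards.
def emptyToNull(joined: ColumnVector): ColumnVector = {
  val empty = Scalar.fromString("")
  val nullString = Scalar.fromNull(DType.STRING)
  val isEmpty = joined.equalTo(empty)   // boolean mask, one entry per row
  isEmpty.ifElse(nullString, joined)    // null where empty, original string elsewhere
}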

@ttnghia
Contributor

ttnghia commented Nov 23, 2021

I'm a bit confused why the casting op from struct to string can't produce the desired result (like "[null, null, null]"). If the struct members are cast to strings first, you (@HaoYang670) should get a list of strings like ["null", "null", "null"]. Concatenating these strings should then produce the correct result. The first step should produce a list of strings, not a list of nulls.

@HaoYang670
Contributor Author

HaoYang670 commented Nov 24, 2021


  private def castArrayToString(
      input: ColumnView,
      elementType: DataType,
      containsNull: Boolean,
      ansiMode: Boolean,
      legacyCastToString: Boolean,
      stringToDateAnsiModeEnabled: Boolean): ColumnVector = {
    val (leftStr, rightStr) = ("[", "]")
    val emptyStr = ""
    val nullStr = "null"
    val numRows = input.getRowCount.toInt

    // Cast all elements in arrays (lists) to string type.
    // Ex: [[1,2,3],[4,5]] => [["1","2","3"], ["4","5"]]
    withResource(doCast(input, ArrayType(elementType, containsNull),
        ArrayType(StringType, containsNull), ansiMode, legacyCastToString,
        stringToDateAnsiModeEnabled)) { strArr =>
      // Cast each array (list) to a string (without brackets).
      // Ex: [["1","2","3"], ["4","5"]] => ["1, 2, 3", "4, 5"]
      withResource(strArr.stringConcatenateListElements(
          Scalar.fromString(", "),
          Scalar.fromString("null"),
          true,
          true)) { withoutBrackets =>
        // Add brackets to each string. Ex: ["1, 2, 3", "4, 5"] => ["[1, 2, 3]", "[4, 5]"]
        withResource(Seq(leftStr, rightStr).safeMap(Scalar.fromString)
            .safeMap(s => ColumnVector.fromScalar(s, numRows))) {
          case Seq(leftColumn, rightColumn) =>
            withResource(ArrayBuffer.empty[ColumnVector]) { columns =>
              columns += leftColumn.incRefCount()
              columns += withoutBrackets.incRefCount()
              columns += rightColumn.incRefCount()

              withResource(ColumnVector.stringConcatenate(
                  Scalar.fromString(emptyStr), Scalar.fromString(nullStr),
                  columns.toArray))(
                _.mergeAndSetValidity(BinaryOp.BITWISE_AND, input)
              )
            }
        }
      }
    }
  }

Currently, I am developing the function of casting an ArrayType column to a StringType column (NVIDIA/spark-rapids#4028). First, I cast all elements in the array to string type. Second, I use stringConcatenateListElements to concatenate all the elements in one array into a single string.
To be consistent with Spark, this function should return "[null, null]" when the input array is Array(null, null). However, this is what I get when I run the code on my desktop:

scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._

scala> import org.apache.spark.sql._
import org.apache.spark.sql._

scala> val schema = StructType(Array(StructField("a", ArrayType(IntegerType, true))))
schema: org.apache.spark.sql.types.StructType = StructType(StructField(a,ArrayType(IntegerType,true),true))

scala> val data0 = Seq(Row(Array(null, null)))
data0: Seq[org.apache.spark.sql.Row] = List([[Lscala.runtime.Null$;@3b58d05f])

scala> val df0 = spark.createDataFrame(spark.sparkContext.parallelize(data0), schema)
df0: org.apache.spark.sql.DataFrame = [a: array<int>]

scala> df0.withColumn("a", col("a").cast(StringType)).collect
21/11/24 09:18:52 WARN GpuOverrides: 
*Exec <ProjectExec> will run on GPU
  *Expression <Alias> cast(a#1 as string) AS a#3 will run on GPU
    *Expression <Cast> cast(a#1 as string) will run on GPU
  !NOT_FOUND <RDDScanExec> cannot run on GPU because no GPU enabled version of operator class org.apache.spark.sql.execution.RDDScanExec could be found
    @Expression <AttributeReference> a#1 could run on GPU

res0: Array[org.apache.spark.sql.Row] = Array([[]])                             

scala> val data1 = Seq(Row(Array(null, 1)))
data1: Seq[org.apache.spark.sql.Row] = List([[Ljava.lang.Object;@40c925a1])

scala> val df1 = spark.createDataFrame(spark.sparkContext.parallelize(data1), schema)
df1: org.apache.spark.sql.DataFrame = [a: array<int>]

scala> df1.withColumn("a", col("a").cast(StringType)).collect
21/11/24 09:19:32 WARN GpuOverrides: 
*Exec <ProjectExec> will run on GPU
  *Expression <Alias> cast(a#8 as string) AS a#10 will run on GPU
    *Expression <Cast> cast(a#8 as string) will run on GPU
  !NOT_FOUND <RDDScanExec> cannot run on GPU because no GPU enabled version of operator class org.apache.spark.sql.execution.RDDScanExec could be found
    @Expression <AttributeReference> a#8 could run on GPU

res1: Array[org.apache.spark.sql.Row] = Array([[null, 1]])

When the input is an all-nulls array, the output is "[]" (when emptyStringOutputIfEmptyList is set to true) or "[null]" (when it is set to false); see df0. And when there is at least one element that is not null (such as Array(null, null, 1)), every null element in the array is converted to the string "null"; see df1.
In short, what I want is: when the input is an all-nulls array, the output should be "[null, null, ..., null]".

@HaoYang670
Contributor Author

scala> import org.apache.spark.sql._
import org.apache.spark.sql._

scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._

scala> val schema = StructType(Array(StructField("a", ArrayType(IntegerType, true))))
schema: org.apache.spark.sql.types.StructType = StructType(StructField(a,ArrayType(IntegerType,true),true))

scala> val data0 = Seq(Row(Array(null, null)))
data0: Seq[org.apache.spark.sql.Row] = List([[Lscala.runtime.Null$;@1177049c])

scala> val df0 = spark.createDataFrame(spark.sparkContext.parallelize(data0), schema)
df0: org.apache.spark.sql.DataFrame = [a: array<int>]

scala> df0.withColumn("a", col("a").cast(StringType)).collect
res0: Array[org.apache.spark.sql.Row] = Array([[null, null]])

scala> val data1 = Seq(Row(Array(null, 1)))
data1: Seq[org.apache.spark.sql.Row] = List([[Ljava.lang.Object;@4f00ee48])

scala> val df1 = spark.createDataFrame(spark.sparkContext.parallelize(data1), schema)
df1: org.apache.spark.sql.DataFrame = [a: array<int>]

scala> df1.withColumn("a", col("a").cast(StringType)).collect
res1: Array[org.apache.spark.sql.Row] = Array([[null, 1]])

This is the result when I run the same code on plain (CPU) Spark. You can see the difference between Spark and Spark-rapids when casting df0 to string type.

@ttnghia
Contributor

ttnghia commented Nov 24, 2021

A null element will result in a null after casting. You can then use GpuCoalesce to replace nulls with the string literal "null".
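
At the cudf level that amounts to a replaceNulls on the per-element string column before joining. A minimal sketch, assuming the cast results are available as a flat STRING column:

import ai.rapids.cudf.{ColumnVector, Scalar}

// castedElements: the per-element cast results as a STRING column, in which
// elements that were originally null are still null.
def nullsToLiteral(castedElements: ColumnVector): ColumnVector = {
  val lit = Scalar.fromString("null")
  try {
    // After this, an all-nulls row becomes ["null", "null", ...] and no longer
    // hits the all-nulls special case in stringConcatenateListElements.
    castedElements.replaceNulls(lit)
  } finally {
    lit.close()
  }
}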

@HaoYang670
Contributor Author

Thank you, I will try it.

@HaoYang670
Contributor Author

I currently use GpuNvl to convert all nulls to the string "null" before calling stringConcatenateListElements, so that it does not depend on cudf to process the all-nulls list. Thank you very much @ttnghia @jrhemstad @revans2 @sperlingxx

@bdice bdice removed the Needs Triage Need team to review and classify label Mar 4, 2024