
[VL] Result mismatch in CollectList when 'sort by' clause is involved #8227

Open · NEUpanning opened this issue Dec 13, 2024 · 8 comments
Labels: bug (Something isn't working), triage

NEUpanning (Contributor) commented Dec 13, 2024

Backend

VL (Velox)

Bug description

Reproducing SQL:

CREATE OR REPLACE TEMP VIEW temp_table AS
SELECT * FROM VALUES
  (1, 'a'), (1, 'b'), (1, 'c'),
  (2, 'd'), (2, 'e'), (2, 'f'),
  (3, 'g'), (3, 'h'), (3, 'i')
AS t(id, value);

SELECT 1 - id, collect_list(value) AS values_list
FROM (
  SELECT * FROM
    (SELECT id, value
     FROM temp_table
     DISTRIBUTE BY rand())
  DISTRIBUTE BY id SORT BY id, value
) t
GROUP BY 1;

Results:

The vanilla result is deterministic and values_list is sorted by the value column:

id  values_list
1   ["a", "b", "c"]
2   ["d", "e", "f"]
3   ["g", "h", "i"]

The Gluten result is non-deterministic and values_list is not sorted, e.g.:

id  values_list
1   ["a", "c", "b"]
3   ["g", "i", "h"]
2   ["f", "e", "d"]

Gluten physical plan:

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   VeloxColumnarToRowExec
   +- ^(4) HashAggregateTransformer(keys=[(1 - id#0)#15], functions=[velox_collect_list(value#1)], isStreamingAgg=false, output=[(1 - id)#4, values_list#2])
      +- ^(4) InputIteratorTransformer[(1 - id#0)#15, buffer#11]
         +- CustomShuffleReader coalesced
            +- ShuffleQueryStage 2
               +- ColumnarExchange hashpartitioning((1 - id#0)#15, 20), ENSURE_REQUIREMENTS, [(1 - id#0)#15, buffer#11], [id=#1137], [id=#1137], [OUTPUT] List((1 - id#0):IntegerType, buffer:ArrayType(StringType,false)), [OUTPUT] List((1 - id#0):IntegerType, buffer:ArrayType(StringType,false))
                  +- VeloxAppendBatches 3276
                     +- ^(3) ProjectExecTransformer [hash((1 - id#0)#15, 42) AS hash_partition_key#16, (1 - id#0)#15, buffer#11]
                        +- ^(3) FlushableHashAggregateTransformer(keys=[(1 - id#0)#15], functions=[partial_velox_collect_list(value#1)], isStreamingAgg=true, output=[(1 - id#0)#15, buffer#11])
                           +- ^(3) ProjectExecTransformer [id#0, value#1, (1 - id#0) AS (1 - id#0)#15]
                              +- ^(3) SortExecTransformer [(1 - id#0)#15 ASC NULLS FIRST], false, 0
                                 +- ^(3) ProjectExecTransformer [id#0, value#1, (1 - id#0) AS (1 - id#0)#15]
                                    +- ^(3) SortExecTransformer [id#0 ASC NULLS FIRST, value#1 ASC NULLS FIRST], false, 0
                                       +- ^(3) InputIteratorTransformer[id#0, value#1]
                                          +- ShuffleQueryStage 1
                                             +- ColumnarExchange hashpartitioning(id#0, 20), REPARTITION_WITH_NUM, [id#0, value#1], [id=#1015], [id=#1015], [OUTPUT] List(id:IntegerType, value:StringType), [OUTPUT] List(id:IntegerType, value:StringType)
                                                +- VeloxAppendBatches 3276
                                                   +- ^(2) ProjectExecTransformer [hash(id#0, 42) AS hash_partition_key#14, id#0, value#1]
                                                      +- ^(2) InputIteratorTransformer[id#0, value#1, _nondeterministic#5]
                                                         +- ShuffleQueryStage 0
                                                            +- ColumnarExchange hashpartitioning(_nondeterministic#5, 20), REPARTITION_WITH_NUM, [id#0, value#1, _nondeterministic#5], [id=#944], [id=#944], [OUTPUT] List(id:IntegerType, value:StringType, _nondeterministic:DoubleType), [OUTPUT] List(id:IntegerType, value:StringType, _nondeterministic:DoubleType)
                                                               +- VeloxAppendBatches 3276
                                                                  +- ^(1) ProjectExecTransformer [hash(_nondeterministic#5, 42) AS hash_partition_key#12, id#0, value#1, _nondeterministic#5]
                                                                     +- ^(1) InputIteratorTransformer[id#0, value#1, _nondeterministic#5]
                                                                        +- RowToVeloxColumnar
                                                                           +- LocalTableScan [id#0, value#1, _nondeterministic#5]

Vanilla Spark physical plan:

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- ObjectHashAggregate(keys=[(1 - id#0)#13], functions=[collect_list(value#1, 0, 0)], output=[(1 - id)#4, values_list#2])
   +- CustomShuffleReader coalesced
      +- ShuffleQueryStage 2
         +- Exchange hashpartitioning((1 - id#0)#13, 20), true, [id=#766]
            +- ObjectHashAggregate(keys=[(1 - id#0) AS (1 - id#0)#13], functions=[partial_collect_list(value#1, 0, 0)], output=[(1 - id#0)#13, buf#10])
               +- *(2) Sort [id#0 ASC NULLS FIRST, value#1 ASC NULLS FIRST], false, 0
                  +- ShuffleQueryStage 1
                     +- Exchange hashpartitioning(id#0, 20), false, [id=#739]
                        +- *(1) Project [id#0, value#1]
                           +- ShuffleQueryStage 0
                              +- Exchange hashpartitioning(_nondeterministic#5, 20), false, [id=#680]
                                 +- LocalTableScan [id#0, value#1, _nondeterministic#5]

Root cause

CollectList, a TypedImperativeAggregate function, is replaced during the logical optimization phase by VeloxCollectList, which is a DeclarativeAggregate. As a result, Gluten plans a SortAggregateExec for VeloxCollectList instead of an ObjectHashAggregateExec. The SortOrder of the SortExec that this requires (SortExecTransformer [(1 - id#0)#15 ASC NULLS FIRST] in the Gluten physical plan) differs from that of the SortExec added by the 'sort by' clause (SortExecTransformer [id#0 ASC NULLS FIRST, value#1 ASC NULLS FIRST]), so the second sort can reorder rows within each group and the result no longer matches vanilla Spark.
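To make the ordering hazard concrete, here is a toy sketch in plain Scala (made-up data, not Gluten code): the 'sort by' clause orders each partition by (id, value), the aggregate's required ordering then re-sorts by the grouping key (1 - id) alone, and only a stable sort would preserve the value order within each group.

// Rows as ordered by the 'sort by id, value' clause within a partition:
val afterSortBy = Seq((1, "a"), (1, "b"), (1, "c"), (2, "d"), (2, "e"), (2, "f"))

// SortAggregateExec then requires an ordering on the grouping key (1 - id) only.
// Scala's sortBy happens to be stable, so this toy keeps a, b, c in order:
val afterAggSort = afterSortBy.sortBy { case (id, _) => 1 - id }

// But the plan only promises ordering by (1 - id); a non-stable sort may emit
// (1, "c"), (1, "a"), (1, "b") instead, which is exactly the non-determinism
// seen in the Gluten results above.
afterAggSort
  .groupBy { case (id, _) => 1 - id }
  .map { case (key, rows) => key -> rows.map(_._2) }
  .foreach(println)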

Spark version

None

Gluten version

1.2.0

zhztheplayer (Member) commented Dec 16, 2024

@NEUpanning @kecookier

Thank you for reporting.

After going through the issue and the relevant code, I am exploring whether we could rework Velox's CollectListAggregate / CollectSetAggregate to make them match a Spark-side TypedImperativeAggregate. The existing function VeloxBloomFilterAggregate might be a good example of how to do this.

As you see, Spark's collect_list / collect_set are already TypedImperativeAggregates; however, since they use a Java-based serialization protocol (correct me if I am wrong), we may need to write our own TypedImperativeAggregate functions with another protocol that is compatible with the C++ code.
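For concreteness, a rough sketch of the shape such a function could take, against the Spark 3.x TypedImperativeAggregate API. The class name is hypothetical and this is not Gluten's implementation; the serde here writes UnsafeArrayData bytes as one candidate C++-readable protocol, deliberately avoiding Java serialization.

import scala.collection.mutable
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Expression, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate
import org.apache.spark.sql.catalyst.trees.UnaryLike
import org.apache.spark.sql.catalyst.util.GenericArrayData
import org.apache.spark.sql.types.{ArrayType, DataType}

// Hypothetical name; the buffer is a plain ArrayBuffer, as in Spark's own Collect.
case class VeloxCollectListTyped(
    child: Expression,
    mutableAggBufferOffset: Int = 0,
    inputAggBufferOffset: Int = 0)
  extends TypedImperativeAggregate[mutable.ArrayBuffer[Any]]
  with UnaryLike[Expression] {

  override def dataType: DataType = ArrayType(child.dataType, containsNull = false)
  override def nullable: Boolean = false
  override def prettyName: String = "velox_collect_list_typed"

  override def createAggregationBuffer(): mutable.ArrayBuffer[Any] =
    mutable.ArrayBuffer.empty

  override def update(
      buffer: mutable.ArrayBuffer[Any], input: InternalRow): mutable.ArrayBuffer[Any] = {
    val value = child.eval(input)
    if (value != null) buffer += InternalRow.copyValue(value)
    buffer
  }

  override def merge(
      buffer: mutable.ArrayBuffer[Any],
      other: mutable.ArrayBuffer[Any]): mutable.ArrayBuffer[Any] = buffer ++= other

  override def eval(buffer: mutable.ArrayBuffer[Any]): Any =
    new GenericArrayData(buffer.toArray)

  // The serde hooks are where the cross-language protocol lives: the buffer is
  // written as a single UnsafeRow holding one ARRAY column, a fixed byte layout
  // that C++ code could be taught to read.
  private lazy val projection = UnsafeProjection.create(
    Array[DataType](ArrayType(child.dataType, containsNull = false)))

  override def serialize(buffer: mutable.ArrayBuffer[Any]): Array[Byte] =
    projection(InternalRow(new GenericArrayData(buffer.toArray))).getBytes

  override def deserialize(bytes: Array[Byte]): mutable.ArrayBuffer[Any] = {
    val row = new UnsafeRow(1)
    row.pointTo(bytes, bytes.length)
    val buffer = createAggregationBuffer()
    row.getArray(0).foreach(child.dataType, (_, v: Any) => buffer += v)
    buffer
  }

  override def withNewMutableAggBufferOffset(newOffset: Int): VeloxCollectListTyped =
    copy(mutableAggBufferOffset = newOffset)
  override def withNewInputAggBufferOffset(newOffset: Int): VeloxCollectListTyped =
    copy(inputAggBufferOffset = newOffset)
  override protected def withNewChildInternal(newChild: Expression): VeloxCollectListTyped =
    copy(child = newChild)
}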

NEUpanning (Contributor, Author) commented Dec 20, 2024

@zhztheplayer

Thank you for your idea.

I'm trying to implement this solution, but I am struggling to design a new collect_list / collect_set that is both a TypedImperativeAggregate and compatible with Velox's collect_list / collect_set.

For Velox's BloomFilterAggregate, the intermediate data type is StringView, which is compatible with Gluten's intermediate data type, BINARY. As a result, Gluten's VeloxBloomFilterAggregate only needs to implement serialize and deserialize methods for the transition between the intermediate data (BINARY) and the buffer data (BloomFilter), in a manner similar to Velox's BloomFilterAggregate.

However, the intermediate data type of Velox's collect_list is ARRAY, which is not compatible with BINARY. It seems this solution won't work unless we change the intermediate data type of one of them, similar to the current workaround, where the intermediate data type of Gluten's VeloxCollectList is ARRAY.
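A small type-level sketch of the mismatch (Spark-side types only, assuming a string-typed input column):

import org.apache.spark.sql.types._

// VeloxBloomFilterAggregate's intermediate column is opaque bytes, which lines up
// with Velox's StringView-backed intermediate: serde is bytes in, bytes out.
val bloomFilterIntermediate: DataType = BinaryType

// Velox's collect_list exposes a structured ARRAY intermediate instead, so there
// is no byte column for a Spark-side serialize/deserialize pair to hook into.
val collectListIntermediate: DataType = ArrayType(StringType, containsNull = false)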

zhztheplayer (Member) commented:

@NEUpanning Thanks for the explanation, very helpful here.

However, the intermediate data type of Velox's collect_list is ARRAY, which is not compatible with BINARY.

That's right. I think a complete solution would involve changes to the Velox code to make Velox's collect functions use a binary intermediate buffer, which is reasonable given that it would align Velox's Spark functions more precisely with vanilla Spark.

Perhaps using a binary buffer is faster as well, at least in the fallback cases? velox_collect_set is currently slow when the resident aggregate operator falls back.

NEUpanning (Contributor, Author) commented:

@zhztheplayer
Thanks for your thoughts.

I think a complete solution would involve changes to the Velox code to make Velox's collect functions use a binary intermediate buffer

This solution sounds good to me. I think what we need to do is (correct me if I am wrong):
(a) change the IntermediateType and OutputType of collect_list/collect_set to VARBINARY
(b) utilize UnsafeRowDeserializer and UnsafeRowFast to do the SerDe
(c) remove the rewrite rules for collect_list/collect_set in Gluten

Should we open an issue in Velox?

velox_collect_set is currently slow when the resident aggregate operator falls back.

Could you provide more details about this? Thanks!

zhztheplayer (Member) commented Dec 24, 2024

@NEUpanning

Looks good to me, thanks for summarizing.

BTW, regarding (c):

remove the rewrite rules for collect_list/collect_set in Gluten

I wanted to hear your thoughts here, but I was thinking we may still need typed-imperative versions of velox_collect_list / velox_collect_set, because collect_list / collect_set will not be compatible with velox_collect_list / velox_collect_set on their intermediate buffer serde protocols.

Could you provide more details about this? Thanks!

See this code:

case class VeloxCollectSet(child: Expression) extends VeloxCollect(child) {
  override lazy val evaluateExpression: Expression =
    ArrayDistinct(buffer)
  override def prettyName: String = "velox_collect_set"
  override protected def withNewChildInternal(newChild: Expression): Expression =
    copy(child = newChild)
}

VeloxCollectSet actually does a distinct over a large array buffer when it falls back, which wastes memory if there are a lot of duplicated input records.
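To illustrate the memory point, a toy sketch in plain Scala (not Gluten code): a declarative array buffer keeps every input row until ArrayDistinct runs at evaluation time, whereas a typed-imperative set buffer could deduplicate on each update.

import scala.collection.mutable

// Heavily duplicated input, standing in for the fallback aggregation's input rows:
val inputs = Seq.fill(1000000)("dup")

// Declarative style (what VeloxCollectSet does on fallback): the array buffer
// holds one entry per input row; distinct only happens at evaluation time.
val arrayBuffer = mutable.ArrayBuffer[String]()
inputs.foreach(arrayBuffer += _)
val evaluated = arrayBuffer.distinct // 1 element, but only after buffering 1M entries

// Typed-imperative style: deduplicate on every update, so the buffer never
// grows past the number of distinct values.
val setBuffer = mutable.HashSet[String]()
inputs.foreach(setBuffer += _) // size stays 1 throughout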

NEUpanning (Contributor, Author) commented:

@zhztheplayer

I wanted to hear your thoughts here, but I was thinking we may still need typed-imperative versions of velox_collect_list / velox_collect_set, because collect_list / collect_set will not be compatible with velox_collect_list / velox_collect_set on their intermediate buffer serde protocols.

Currently, Spark's collect_list/collect_set uses UnsafeArrayData as the SerDe format. If we utilize UnsafeRowDeserializer and UnsafeRowFast, which are compatible with the UnsafeArrayData format (by the way, they are also used in R2C and C2R), to do the SerDe in Velox, we will no longer need to maintain the rewrite rules.
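As a minimal sketch of that format (assuming a Spark 3.x classpath; not Gluten code), the intermediate buffer round-trips through UnsafeArrayData bytes like this:

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.catalyst.util.GenericArrayData
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

// Serialize: array values -> a single UnsafeRow holding one ARRAY<STRING> column,
// whose bytes are the language-neutral layout a C++ reader would parse.
val projection = UnsafeProjection.create(Array[DataType](ArrayType(StringType, containsNull = false)))
val values = Array[Any](UTF8String.fromString("a"), UTF8String.fromString("b"), UTF8String.fromString("c"))
val bytes: Array[Byte] = projection(InternalRow(new GenericArrayData(values))).getBytes

// Deserialize: point an UnsafeRow at the bytes and read the array back.
val row = new UnsafeRow(1)
row.pointTo(bytes, bytes.length)
val back = row.getArray(0)
assert(back.numElements() == 3)
assert(back.getUTF8String(2).toString == "c")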

zhztheplayer (Member) commented Dec 24, 2024

Spark's collect_list/collect_set uses UnsafeArrayData as the SerDe format.

Aha. Sounds good to me if it's compatible with the Velox row SerDes. Let's remove the collect rewrite rules, then. Glad to see we can simplify our code in passing, thanks!
