Fix Databricks 13.3 udf_test failures #9493

Closed
5 tasks
razajafri opened this issue Oct 19, 2023 · 4 comments · Fixed by #9833

@razajafri
Collaborator

razajafri commented Oct 19, 2023

Tasks

@sameerz sameerz added the task Work required that improves the product but is not user facing label Oct 24, 2023
@razajafri
Collaborator Author

This is the exception that I see when I run either of the tests above.

23/10/27 03:12:01 ERROR SQLUsageLogging: Exception while logging query profile
org.apache.spark.SparkException: [INTERNAL_ERROR] UnevaluableAggregateFunc.aggBufferSchema should not be called.
        at org.apache.spark.SparkException$.internalError(SparkException.scala:85)
        at org.apache.spark.SparkException$.internalError(SparkException.scala:89)
        at org.apache.spark.sql.catalyst.expressions.UnevaluableAggregateFunc.aggBufferSchema(PythonUDF.scala:109)
        at org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction.hasFixedSizeBuffer(interfaces.scala:359)
        at com.databricks.sql.serialization.marshalling.FallbackAggregateFunctionMarshaller$.tryToProto(ExpressionMarshaller.scala:231)
        at com.databricks.sql.serialization.marshalling.FallbackAggregateFunctionMarshaller$.tryToProto(ExpressionMarshaller.scala:220)
        at com.databricks.sql.serialization.marshalling.PartialMarshaller$$anon$1.$anonfun$tryToProto$1(interfaces.scala:154)
        at scala.Option.orElse(Option.scala:447)
        at com.databricks.sql.serialization.marshalling.PartialMarshaller$$anon$1.tryToProto(interfaces.scala:154)
        at com.databricks.sql.serialization.marshalling.PartialMarshaller$$anon$2.toProto(interfaces.scala:173)
        at com.databricks.sql.serialization.marshalling.ExpressionMarshaller$.toProto(ExpressionMarshaller.scala:57)
        at com.databricks.sql.serialization.marshalling.MarshallingContextImpl.toProto(interfaces.scala:96)
        at com.databricks.sql.serialization.marshalling.ExpressionChildWriter.writeExpression(childUtils.scala:300)
        at com.databricks.sql.serialization.marshalling.AggregateExpressionMarshaller$.buildProto(singleExpressionTypeMarshallers.scala:249)
        at com.databricks.sql.serialization.marshalling.AggregateExpressionMarshaller$.buildProto(singleExpressionTypeMarshallers.scala:245)
        at com.databricks.sql.serialization.marshalling.SingleTagExpressionMarshaller.toProto(ExpressionMarshaller.scala:111)
        at com.databricks.sql.serialization.marshalling.SingleTagExpressionMarshaller.toProto$(ExpressionMarshaller.scala:72)
        at com.databricks.sql.serialization.marshalling.SingleExpressionTypeMarshaller.toProto(ExpressionMarshaller.scala:280)
        at com.databricks.sql.serialization.marshalling.SingleExpressionTypeMarshaller.toProto(ExpressionMarshaller.scala:280)
        at com.databricks.sql.serialization.marshalling.SingleTypeMarshallerRegistry.$anonfun$tryToProto$1(singleTypeMarshallers.scala:100)
        at scala.Option.map(Option.scala:230)
        at com.databricks.sql.serialization.marshalling.SingleTypeMarshallerRegistry.tryToProto(singleTypeMarshallers.scala:100)
        at com.databricks.sql.serialization.marshalling.PartialMarshaller$$anon$1.tryToProto(interfaces.scala:154)
        at com.databricks.sql.serialization.marshalling.PartialMarshaller$$anon$1.tryToProto(interfaces.scala:154)
        at com.databricks.sql.serialization.marshalling.PartialMarshaller$$anon$2.toProto(interfaces.scala:173)
        at com.databricks.sql.serialization.marshalling.ExpressionMarshaller$.toProto(ExpressionMarshaller.scala:57)
        at com.databricks.sql.serialization.marshalling.MarshallingContextImpl.toProto(interfaces.scala:96)
        at com.databricks.sql.serialization.marshalling.ExpressionChildWriter.$anonfun$writeExpressions$1(childUtils.scala:313)
        at scala.collection.immutable.Stream.$anonfun$map$1(Stream.scala:418)
        at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1173)
        at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1163)
        at scala.collection.immutable.StreamIterator.$anonfun$next$1(Stream.scala:1061)
        at scala.collection.immutable.StreamIterator$LazyCell.v$lzycompute(Stream.scala:1050)
        at scala.collection.immutable.StreamIterator$LazyCell.v(Stream.scala:1050)
        at scala.collection.immutable.StreamIterator.hasNext(Stream.scala:1055)
        at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:32)
        at com.google.protobuf.AbstractMessageLite$Builder.checkForNullValues(AbstractMessageLite.java:348)
        at com.google.protobuf.AbstractMessageLite$Builder.addAll(AbstractMessageLite.java:335)
        at com.databricks.sql.serialization.protobuf.ChildExpressionList$Builder.addAllExpressions(ChildExpressionList.java:1047)
        at com.databricks.sql.serialization.marshalling.ExpressionChildWriter.writeExpressions(childUtils.scala:313)
        at com.databricks.sql.serialization.marshalling.UnknownSparkPlanMarshaller.buildProto(UnknownSparkPlanMarshaller.scala:38)
        at com.databricks.sql.serialization.marshalling.UnknownSparkPlanMarshaller.buildProto$(UnknownSparkPlanMarshaller.scala:33)
        at com.databricks.sql.serialization.marshalling.FallbackSparkPlanMarshaller$.buildProto(SparkPlanMarshaller.scala:603)
        at com.databricks.sql.serialization.marshalling.FallbackSparkPlanMarshaller$.toProto(SparkPlanMarshaller.scala:612)
        at com.databricks.sql.serialization.marshalling.FallbackSparkPlanMarshaller$.toProto(SparkPlanMarshaller.scala:603)
        at com.databricks.sql.serialization.marshalling.PartialMarshaller$$anon$2.$anonfun$toProto$1(interfaces.scala:173)
        at scala.Option.getOrElse(Option.scala:189)
        at com.databricks.sql.serialization.marshalling.PartialMarshaller$$anon$2.toProto(interfaces.scala:173)
        at com.databricks.sql.serialization.marshalling.SparkPlanMarshaller$.toProto(SparkPlanMarshaller.scala:55)
        at com.databricks.sql.serialization.marshalling.MarshallingContextImpl.toProto(interfaces.scala:94)
        at com.databricks.sql.serialization.marshalling.SparkPlanChildWriter.writePlan(childUtils.scala:339)
        at com.databricks.sql.serialization.marshalling.ShuffleExchangeExecMarshaller$.buildProto(singleSparkPlanTypeMarshallers.scala:941)
        at com.databricks.sql.serialization.marshalling.ShuffleExchangeExecMarshaller$.buildProto(singleSparkPlanTypeMarshallers.scala:932)
        at com.databricks.sql.serialization.marshalling.SingleSparkPlanTypeMarshaller.toProto(SparkPlanMarshaller.scala:670)
        at com.databricks.sql.serialization.marshalling.SingleSparkPlanTypeMarshaller.toProto(SparkPlanMarshaller.scala:662)
        at com.databricks.sql.serialization.marshalling.SingleTypeMarshallerRegistry.$anonfun$tryToProto$1(singleTypeMarshallers.scala:100)
        at scala.Option.map(Option.scala:230)
        at com.databricks.sql.serialization.marshalling.SingleTypeMarshallerRegistry.tryToProto(singleTypeMarshallers.scala:100)
        at com.databricks.sql.serialization.marshalling.PartialMarshaller$$anon$1.tryToProto(interfaces.scala:154)
        at com.databricks.sql.serialization.marshalling.PartialMarshaller$$anon$1.tryToProto(interfaces.scala:154)
        at com.databricks.sql.serialization.marshalling.PartialMarshaller$$anon$1.tryToProto(interfaces.scala:154)
        at com.databricks.sql.serialization.marshalling.PartialMarshaller$$anon$1.tryToProto(interfaces.scala:154)
        at com.databricks.sql.serialization.marshalling.PartialMarshaller$$anon$2.toProto(interfaces.scala:173)
        at com.databricks.sql.serialization.marshalling.SparkPlanMarshaller$.toProto(SparkPlanMarshaller.scala:55)
        at com.databricks.sql.serialization.marshalling.MarshallingContextImpl.toProto(interfaces.scala:94)
        at com.databricks.sql.serialization.marshalling.SparkPlanChildWriter.writePlan(childUtils.scala:339)
        at com.databricks.sql.serialization.marshalling.SortExecMarshaller$.buildProto(singleSparkPlanTypeMarshallers.scala:244)
        at com.databricks.sql.serialization.marshalling.SortExecMarshaller$.buildProto(singleSparkPlanTypeMarshallers.scala:239)
        at com.databricks.sql.serialization.marshalling.SingleSparkPlanTypeMarshaller.toProto(SparkPlanMarshaller.scala:670)
        at com.databricks.sql.serialization.marshalling.SingleSparkPlanTypeMarshaller.toProto(SparkPlanMarshaller.scala:662)
        at com.databricks.sql.serialization.marshalling.SingleTypeMarshallerRegistry.$anonfun$tryToProto$1(singleTypeMarshallers.scala:100)
        at scala.Option.map(Option.scala:230)
        at com.databricks.sql.serialization.marshalling.SingleTypeMarshallerRegistry.tryToProto(singleTypeMarshallers.scala:100)
        at com.databricks.sql.serialization.marshalling.PartialMarshaller$$anon$1.tryToProto(interfaces.scala:154)
        at com.databricks.sql.serialization.marshalling.PartialMarshaller$$anon$1.tryToProto(interfaces.scala:154)
        at com.databricks.sql.serialization.marshalling.PartialMarshaller$$anon$1.tryToProto(interfaces.scala:154)
        at com.databricks.sql.serialization.marshalling.PartialMarshaller$$anon$1.tryToProto(interfaces.scala:154)
        at com.databricks.sql.serialization.marshalling.PartialMarshaller$$anon$2.toProto(interfaces.scala:173)
        at com.databricks.sql.serialization.marshalling.SparkPlanMarshaller$.toProto(SparkPlanMarshaller.scala:55)
        at com.databricks.sql.serialization.marshalling.MarshallingContextImpl.toProto(interfaces.scala:94)
        at com.databricks.sql.logging.QueryProfile$.toProto(QueryProfile.scala:570)
        at com.databricks.sql.logging.QueryProfile$.toProtoBase64(QueryProfile.scala:702)
        at com.databricks.sql.logging.SQLUsageLogging$.logQueryProfile(SQLUsageLogging.scala:251)
        at com.databricks.sql.logging.SQLUsageLogging$.tryLogQueryProfile(SQLUsageLogging.scala:301)
        at com.databricks.sql.logging.SQLUsageLogging$.$anonfun$tryLogQueryProfileWrapper$1(SQLUsageLogging.scala:384)
        at org.apache.spark.util.threads.SparkThreadLocalCapturingRunnable.$anonfun$run$1(SparkThreadLocalForwardingThreadPoolExecutor.scala:118)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at com.databricks.spark.util.IdentityClaim$.withClaim(IdentityClaim.scala:48)
        at org.apache.spark.util.threads.SparkThreadLocalCapturingHelper.$anonfun$runWithCaptured$4(SparkThreadLocalForwardingThreadPoolExecutor.scala:81)
        at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:41)
        at org.apache.spark.util.threads.SparkThreadLocalCapturingHelper.runWithCaptured(SparkThreadLocalForwardingThreadPoolExecutor.scala:80)
        at org.apache.spark.util.threads.SparkThreadLocalCapturingHelper.runWithCaptured$(SparkThreadLocalForwardingThreadPoolExecutor.scala:66)
        at org.apache.spark.util.threads.SparkThreadLocalCapturingRunnable.runWithCaptured(SparkThreadLocalForwardingThreadPoolExecutor.scala:115)
        at org.apache.spark.util.threads.SparkThreadLocalCapturingRunnable.run(SparkThreadLocalForwardingThreadPoolExecutor.scala:118)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)


@razajafri razajafri self-assigned this Oct 27, 2023
@jlowe jlowe changed the title from "Fix Integration Test Failures related to udf_test.py" to "Fix Databricks 13.3 udf_test failures" Nov 16, 2023
@jlowe jlowe self-assigned this Nov 16, 2023
@jlowe
Contributor

jlowe commented Nov 16, 2023

I was not able to reproduce the exception listed in the description, but I did see hangs. It appears Databricks 13.3 has pulled in SPARK-44705 which changes the Python worker handling to use a single thread rather than separate reader and writer threads. That leads to hangs in UDF tests because our support assumes there are reader and writer threads.
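
The hang described above can be illustrated with a minimal Python sketch. This is not the spark-rapids code: the function name `read_then_write_on_one_thread` and the `pending` queue are hypothetical, and a short timeout stands in for what would otherwise be an indefinite wait. It assumes a reader that blocks until a writer has queued a batch, with both steps running on one thread and the read coming first.

```python
import queue


def read_then_write_on_one_thread(timeout_s=0.05):
    """Run the read step before the write step on a single thread.

    Hypothetical illustration: the reader waits on a queue that only the
    later write step fills, so a wait without a timeout would never return.
    """
    pending = queue.Queue()  # batches handed over by the writer
    events = []
    try:
        events.append("read: waiting for a batch")
        # Nothing has been written yet, so this can only time out.
        pending.get(timeout=timeout_s)
        events.append("read: got a batch")
    except queue.Empty:
        events.append("read: timed out (a wait without timeout would hang here)")
    # The write step runs only after the read step has returned.
    pending.put("batch-0")
    events.append("write: queued batch-0")
    return events


for event in read_then_write_on_one_thread():
    print(event)
```

With separate reader and writer threads the `put` would run concurrently and unblock the `get`; on a single thread it cannot.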

@firestarman
Collaborator

firestarman commented Nov 21, 2023

I think I am quite close to the root cause. The fix is likely to be a big one and take two or three days, because I have to deal with all the existing shims and prepare for the upcoming 4.0, which uses the new PartitionEvaluator and ArrowPythonWithNamedArgumentRunner.

@firestarman
Collaborator

firestarman commented Nov 22, 2023

I have fixed the hangs, but some tests are still failing. I will continue debugging. The current changes are at https://github.com/firestarman/spark-rapids/tree/udf-db133

firestarman added a commit that referenced this issue Nov 30, 2023
fix #9493
fix #9844

The Python runner normally uses two separate threads to write data to and read data from the Python processes.
On DB 13.3, however, it becomes single-threaded, which means reading and writing run on the same thread.
As a result, the first read always happens before the first write. But the original BatchQueue blocks
on the first read until the first write is done, so it waits forever.

Changes made:

- Update the BatchQueue to support asking for a batch instead of waiting until one is inserted into the queue.
   This eliminates the ordering requirement between reading and writing.
- Introduce a new class named BatchProducer that works with the new BatchQueue to let the reader
   peek at the row count on demand.
- Apply this new BatchQueue to the relevant plans.
- Update the Python runners to support writing one batch at a time for the single-threaded model.
- Found an issue with PythonUDAF and RunningWindowFunctionExec that may be a bug specific to DB 13.3,
   and added a test (test_window_aggregate_udf_on_cpu) for it.
- Other small refactors
---------

Signed-off-by: Firestarman <[email protected]>
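
The idea behind the first two changes in this commit message can be sketched in Python as follows. This is only an illustration under assumptions, not the actual spark-rapids implementation (which is Scala): the method names `next_to_write`, `peek_num_rows`, and `remove` are hypothetical, batches are modeled as plain lists, and `None` is assumed never to be a valid batch. The point it shows is that the reader can ask for a batch on demand instead of waiting for the writer to insert one.

```python
from collections import deque


class BatchProducer:
    """Hypothetical pull-based producer shared by a reader and a writer."""

    def __init__(self, batches):
        self._input = iter(batches)
        self._for_reader = deque()  # batches the reader has not consumed yet
        self._for_writer = deque()  # batches pulled early, not yet written

    def _pull(self):
        # Take one batch from the input and remember it for the reader.
        batch = next(self._input, None)
        if batch is not None:
            self._for_reader.append(batch)
        return batch

    def next_to_write(self):
        """Writer side: next batch to send, or None when input is exhausted."""
        if self._for_writer:
            return self._for_writer.popleft()
        return self._pull()

    def peek_num_rows(self):
        """Reader side: row count of the next batch, pulled on demand."""
        if not self._for_reader:
            batch = self._pull()
            if batch is None:
                return 0
            # The reader got here first, so keep the batch for the writer.
            self._for_writer.append(batch)
        return len(self._for_reader[0])

    def remove(self):
        """Reader side: take the next original batch, or None if none is queued."""
        return self._for_reader.popleft() if self._for_reader else None


producer = BatchProducer([[1, 2, 3], [4, 5]])
# The read step runs first, as in the single-threaded model, and does not block.
print(producer.peek_num_rows())   # 3
print(producer.next_to_write())   # [1, 2, 3]
print(producer.remove())          # [1, 2, 3]
```

Because either side may trigger the pull, the order in which the read and write steps run no longer matters.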
razajafri pushed a commit to razajafri/spark-rapids that referenced this issue Jan 25, 2024
fix NVIDIA#9493
fix NVIDIA#9844

razajafri added a commit that referenced this issue Jan 26, 2024
* Download Maven from apache.org archives (#10225)

Fixes #10224 

Replace broken install using apt by downloading Maven from apache.org.

Signed-off-by: Gera Shegalov <[email protected]>

* Fix a hang for Pandas UDFs on DB 13.3[databricks] (#9833)

fix #9493
fix #9844

The Python runner normally uses two separate threads to write data to and read data from the Python processes.
On DB 13.3, however, it becomes single-threaded, which means reading and writing run on the same thread.
As a result, the first read always happens before the first write. But the original BatchQueue blocks
on the first read until the first write is done, so it waits forever.

Changes made:

- Update the BatchQueue to support asking for a batch instead of waiting until one is inserted into the queue.
   This eliminates the ordering requirement between reading and writing.
- Introduce a new class named BatchProducer that works with the new BatchQueue to let the reader
   peek at the row count on demand.
- Apply this new BatchQueue to the relevant plans.
- Update the Python runners to support writing one batch at a time for the single-threaded model.
- Found an issue with PythonUDAF and RunningWindowFunctionExec that may be a bug specific to DB 13.3,
   and added a test (test_window_aggregate_udf_on_cpu) for it.
- Other small refactors
---------

Signed-off-by: Firestarman <[email protected]>

* Fix a potential data corruption for Pandas UDF (#9942)

This PR moves the BatchQueue into the DataProducer so that it shares the same lock as the output iterator
returned by asIterator, and makes moving a batch from the input iterator to the batch queue
an atomic operation, which eliminates the race when appending batches to the queue.
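
A rough Python sketch of that locking idea, under stated assumptions: the real DataProducer and asIterator are Scala and are not shown in this thread, so the class `LockedBatchProducer` and its methods are hypothetical, batches are modeled as integers, and `None` is assumed never to be a valid batch. Taking a batch from the input and appending it to the queue happen under one lock, so the queue order always matches the input order.

```python
import threading
from collections import deque


class LockedBatchProducer:
    """Hypothetical producer that owns both the input iterator and the queue."""

    def __init__(self, batches):
        self._lock = threading.Lock()  # shared by the writer and reader paths
        self._input = iter(batches)
        self._queue = deque()

    def next_to_write(self):
        # Pull and append form one critical section: no other thread can
        # run between taking a batch and recording it in the queue.
        with self._lock:
            batch = next(self._input, None)
            if batch is not None:
                self._queue.append(batch)
            return batch

    def remove(self):
        # The reader path takes the same lock before touching the queue.
        with self._lock:
            return self._queue.popleft() if self._queue else None


producer = LockedBatchProducer(range(5))
while producer.next_to_write() is not None:
    pass
print(producer.remove())  # 0
```

If the pull and the append used separate locks, or none, two threads could take batches in one order and append them in another.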

* Refactor the Python UDF code to reduce duplicate code. (#9902)

Signed-off-by: Firestarman <[email protected]>

* Fixed 330db Shims to Adopt the PythonRunner Changes [databricks] (#10232)

This PR removes the old 330db shims in favor of the new Shims, similar to the one in 341db. 

**Tests:**
Ran udf_test.py on Databricks 11.3; all tests passed.

fixes #10228 

---------

Signed-off-by: raza jafri <[email protected]>

---------

Signed-off-by: Gera Shegalov <[email protected]>
Signed-off-by: Firestarman <[email protected]>
Signed-off-by: raza jafri <[email protected]>
Co-authored-by: Gera Shegalov <[email protected]>
Co-authored-by: Liangcai Li <[email protected]>