[BUG] java.lang.RuntimeException: BinaryExpressions must override either eval or nullSafeEval #1073

Closed
martinstuder opened this issue Nov 5, 2020 · 9 comments · Fixed by #1084
Assignees
Labels
bug Something isn't working P0 Must have for release

Comments

@martinstuder

What is your question?

I'm running into the following exception and I'm wondering whether it indicates an unsupported binary expression type (I'm running rapids-4-spark v0.2.0) or whether it points to a bug:

Py4JJavaError: An error occurred while calling o29050.checkpoint.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 8 in stage 1630.0 failed 4 times, most recent failure: Lost task 8.3 in stage 1630.0 (TID 331268, 10.50.24.17, executor 6): java.lang.RuntimeException: BinaryExpressions must override either eval or nullSafeEval
	at scala.sys.package$.error(package.scala:30)
	at org.apache.spark.sql.catalyst.expressions.BinaryExpression.nullSafeEval(Expression.scala:621)
	at com.nvidia.spark.rapids.CudfBinaryOperator.com$nvidia$spark$rapids$GpuBinaryExpression$$super$nullSafeEval(GpuExpressions.scala:236)
	at com.nvidia.spark.rapids.GpuBinaryExpression.columnarEval(GpuExpressions.scala:187)
	at com.nvidia.spark.rapids.GpuBinaryExpression.columnarEval$(GpuExpressions.scala:164)
	at com.nvidia.spark.rapids.CudfBinaryOperator.columnarEval(GpuExpressions.scala:236)
	at com.nvidia.spark.rapids.RapidsPluginImplicits$ReallyAGpuExpression.columnarEval(implicits.scala:35)
	at com.nvidia.spark.rapids.GpuBinaryExpression.columnarEval(GpuExpressions.scala:169)
	at com.nvidia.spark.rapids.GpuBinaryExpression.columnarEval$(GpuExpressions.scala:164)
	at com.nvidia.spark.rapids.CudfBinaryOperator.columnarEval(GpuExpressions.scala:236)
	at com.nvidia.spark.rapids.RapidsPluginImplicits$ReallyAGpuExpression.columnarEval(implicits.scala:35)
	at com.nvidia.spark.rapids.GpuFilter$.apply(basicPhysicalOperators.scala:119)
	at com.nvidia.spark.rapids.GpuFilter$.$anonfun$apply$1(basicPhysicalOperators.scala:105)
	at com.nvidia.spark.rapids.Arm.withResource(Arm.scala:28)
	at com.nvidia.spark.rapids.Arm.withResource$(Arm.scala:26)
	at com.nvidia.spark.rapids.GpuFilter$.withResource(basicPhysicalOperators.scala:97)
	at com.nvidia.spark.rapids.GpuFilter$.apply(basicPhysicalOperators.scala:104)
	at com.nvidia.spark.rapids.GpuFilterExec.$anonfun$doExecuteColumnar$2(basicPhysicalOperators.scala:176)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
	at com.nvidia.spark.rapids.RemoveEmptyBatchIterator.hasNext(GpuCoalesceBatches.scala:126)
	at com.nvidia.spark.rapids.AbstractGpuCoalesceIterator.hasNext(GpuCoalesceBatches.scala:186)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExec$$anon$1.partNextBatch(GpuShuffleExchangeExec.scala:189)
	at org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExec$$anon$1.hasNext(GpuShuffleExchangeExec.scala:206)
	at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:177)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.doRunTask(Task.scala:144)
	at org.apache.spark.scheduler.Task.run(Task.scala:117)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$9(Executor.scala:639)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1559)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:642)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2478)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2427)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2426)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2426)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1131)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1131)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1131)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2678)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2625)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2613)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:917)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2307)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2328)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2347)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2372)
	at org.apache.spark.rdd.RDD.count(RDD.scala:1234)
	at org.apache.spark.sql.Dataset.$anonfun$checkpoint$1(Dataset.scala:700)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3682)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$5(SQLExecution.scala:115)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:247)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:100)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:828)
	at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:76)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:197)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3680)
	at org.apache.spark.sql.Dataset.checkpoint(Dataset.scala:691)
	at org.apache.spark.sql.Dataset.checkpoint(Dataset.scala:654)
	at sun.reflect.GeneratedMethodAccessor608.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
	at py4j.Gateway.invoke(Gateway.java:295)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:251)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: BinaryExpressions must override either eval or nullSafeEval
	at scala.sys.package$.error(package.scala:30)
	at org.apache.spark.sql.catalyst.expressions.BinaryExpression.nullSafeEval(Expression.scala:621)
	at com.nvidia.spark.rapids.CudfBinaryOperator.com$nvidia$spark$rapids$GpuBinaryExpression$$super$nullSafeEval(GpuExpressions.scala:236)
	at com.nvidia.spark.rapids.GpuBinaryExpression.columnarEval(GpuExpressions.scala:187)
	at com.nvidia.spark.rapids.GpuBinaryExpression.columnarEval$(GpuExpressions.scala:164)
	at com.nvidia.spark.rapids.CudfBinaryOperator.columnarEval(GpuExpressions.scala:236)
	at com.nvidia.spark.rapids.RapidsPluginImplicits$ReallyAGpuExpression.columnarEval(implicits.scala:35)
	at com.nvidia.spark.rapids.GpuBinaryExpression.columnarEval(GpuExpressions.scala:169)
	at com.nvidia.spark.rapids.GpuBinaryExpression.columnarEval$(GpuExpressions.scala:164)
	at com.nvidia.spark.rapids.CudfBinaryOperator.columnarEval(GpuExpressions.scala:236)
	at com.nvidia.spark.rapids.RapidsPluginImplicits$ReallyAGpuExpression.columnarEval(implicits.scala:35)
	at com.nvidia.spark.rapids.GpuFilter$.apply(basicPhysicalOperators.scala:119)
	at com.nvidia.spark.rapids.GpuFilter$.$anonfun$apply$1(basicPhysicalOperators.scala:105)
	at com.nvidia.spark.rapids.Arm.withResource(Arm.scala:28)
	at com.nvidia.spark.rapids.Arm.withResource$(Arm.scala:26)
	at com.nvidia.spark.rapids.GpuFilter$.withResource(basicPhysicalOperators.scala:97)
	at com.nvidia.spark.rapids.GpuFilter$.apply(basicPhysicalOperators.scala:104)
	at com.nvidia.spark.rapids.GpuFilterExec.$anonfun$doExecuteColumnar$2(basicPhysicalOperators.scala:176)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
	at com.nvidia.spark.rapids.RemoveEmptyBatchIterator.hasNext(GpuCoalesceBatches.scala:126)
	at com.nvidia.spark.rapids.AbstractGpuCoalesceIterator.hasNext(GpuCoalesceBatches.scala:186)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExec$$anon$1.partNextBatch(GpuShuffleExchangeExec.scala:189)
	at org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExec$$anon$1.hasNext(GpuShuffleExchangeExec.scala:206)
	at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:177)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.doRunTask(Task.scala:144)
	at org.apache.spark.scheduler.Task.run(Task.scala:117)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$9(Executor.scala:639)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1559)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:642)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more

Unfortunately I wasn't able to identify which concrete binary expression this refers to, since the failing stage/plan includes many different binary expressions.

Environment Details

  • Azure Databricks 7.0 with Databricks Container Services base image based on nvidia/cuda:10.2-cudnn7-runtime-ubuntu18.04
  • rapids-4-spark v0.2.0 with cudf-0.15-cuda10-2 (CUDA 10.2)

Spark Rapids configuration settings

"spark.plugins": "com.nvidia.spark.SQLPlugin",
"spark.sql.parquet.filterPushdown": "false",
"spark.rapids.sql.incompatibleOps.enabled": "true",
"spark.rapids.memory.pinnedPool.size": "2G",
"spark.task.resource.gpu.amount": 0.1,
"spark.rapids.sql.concurrentGpuTasks": 2,
"spark.locality.wait": "0s",
"spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version": 2,
"spark.kryo.registrator": "com.nvidia.spark.rapids.GpuKryoRegistrator"
@martinstuder martinstuder added ? - Needs Triage Need team to review and classify question Further information is requested labels Nov 5, 2020
@revans2
Collaborator

revans2 commented Nov 5, 2020

Wow, that is a bug in our code and something that we never expected to see happen. Typically in Spark, all binary expressions are foldable if their two inputs are scalar values, and as part of query planning they get replaced by a single scalar value. In this case it looks like that didn't happen. Can you share more of your query with us so that we can try to reproduce this failure? All I can tell from the stack trace is that you are in the middle of a Filter with a GpuBinaryExpression in the filter condition.
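
As an illustration of the folding described above (a minimal sketch, assuming a plain PySpark session; not taken from this issue): a binary expression whose inputs are both literals is replaced by a single literal during optimization, so it normally never reaches execution.

# Catalyst's ConstantFolding replaces 4 + 5 with the literal 9 in the optimized plan,
# which is why a binary expression over two scalars is normally never evaluated at runtime.
spark.sql("SELECT 4 + 5 AS x").explain(True)
# The "Optimized Logical Plan" section shows something like: Project [9 AS x#0]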

@revans2 revans2 changed the title [QST] java.lang.RuntimeException: BinaryExpressions must override either eval or nullSafeEval [BUG] java.lang.RuntimeException: BinaryExpressions must override either eval or nullSafeEval Nov 5, 2020
@revans2 revans2 added bug Something isn't working and removed question Further information is requested labels Nov 5, 2020
@martinstuder
Author

It looks like the source of the issue is the following pyspark expression:

import pyspark.sql.functions as F  # standard functions alias used in the snippet

factor_expr = F.when(
  (F.col('cf') == 1) & (F.col('rt') > 0),
  F.least(F.greatest(F.col('rt') - F.col('aapd'), F.lit(0.0)), F.col('aapl')) / F.col('rt')
).otherwise(F.lit(1.0))

df \
  .withColumn('factor', factor_expr) \
  .withColumn('result', F.col('factor') * F.col('value'))

I'm pretty sure we've used every single one of those binary expressions/operators in other places where they didn't cause any issues.

@revans2
Collaborator

revans2 commented Nov 5, 2020

@martinstuder But none of those operations will result in a filter happening; you are adding columns to a data frame, not dropping any rows. Does your query have a join, na, dropna, or filter in it? If you could send the result of an explain, that would be really helpful. If you cannot post it publicly, I am happy to see if we can look at the plan offline somehow.
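
For completeness, the plan being asked for can be captured right before the action runs (a sketch; df_out is a hypothetical name for the reporter's final DataFrame, not from the issue):

df_out.explain(True)  # prints the parsed, analyzed, optimized, and physical plans
plan_text = df_out._jdf.queryExecution().toString()  # same information as a string, convenient for attaching to an issue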

@martinstuder
Author

@revans2 I stripped down the PySpark expression too much. Yes, there is a join:

factor_expr = F.when(
  (F.col('cf') == 1) & (F.col('rt') > 0),
  F.least(F.greatest(F.col('rt') - F.col('aapd'), F.lit(0.0)), F.col('aapl')) / F.col('rt')
).otherwise(F.lit(1.0))

df \
  .withColumn('rt', F.sum('value').over(window)) \
  .join(F.broadcast(programs), on='program_id') \
  .withColumn('factor', factor_expr) \
  .withColumn('result', F.col('factor') * F.col('value')) \
  .select(...) \
  .persist() \
  .checkpoint()

Attached is the failing SQL query plan as shown in the Spark UI (with obfuscated variable names; I hope you don't mind).

failing_plan.txt

@revans2
Collaborator

revans2 commented Nov 5, 2020

That is really odd. The plan you posted has no GpuFilter in it, but the stack trace does. The stack trace clearly shows a shuffle writer is asking the GpuFilter for more data and the GpuFilter is blowing up.

@tgravescs Is it possible that the checkpoint is hiding part of the plan?

@tgravescs
Collaborator

This code all looks like it's before the checkpoint, and the plan shows all the other operators, so I wouldn't think it would just be hiding a Filter. But I haven't looked much at checkpoint on data frames.

Is this perhaps a filter that happens in df before this code?

@revans2
Collaborator

revans2 commented Nov 5, 2020

Yeah, that is all I could think of: something is happening where it goes to an RDD and then back to a data frame again.

@revans2
Collaborator

revans2 commented Nov 6, 2020

I found a few places in the code where we might actually hit this. So I will update the code to never call eval or nullSafeEval and instead force us to deal with this situation.

@revans2 revans2 self-assigned this Nov 6, 2020
@revans2 revans2 removed the ? - Needs Triage Need team to review and classify label Nov 6, 2020
@revans2 revans2 added this to the Nov 9 - Nov 20 milestone Nov 6, 2020
@revans2 revans2 added the P0 Must have for release label Nov 6, 2020
@revans2
Collaborator

revans2 commented Nov 6, 2020

So I was able to reproduce something similar using coalesce.

spark.range(1, 100).selectExpr("4 + coalesce(5, id)").collect

I am not 100% sure that this is what happened in your case, but I suspect it was something similar. Binary, unary, and ternary operators did not handle this properly: binary and ternary operators would try to call into nullSafeEval, which is the error from the stack trace, while unary operators just threw an exception. We eventually need to check all GpuExpressions to be sure they do the right thing when they get one of these; I will file a follow-on issue to make sure we cover all of those cases.
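
For anyone reproducing this from PySpark rather than the Scala shell, the equivalent of the snippet above would presumably be (the RAPIDS plugin has to be enabled so the addition is planned on the GPU):

# Hypothetical PySpark equivalent of the Scala repro above.
spark.range(1, 100).selectExpr("4 + coalesce(5, id)").collect()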
