Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BUG] Slow/no progress with cascaded pandas udfs/mapInPandas in Databricks #10770

Closed
eordentlich opened this issue May 7, 2024 · 8 comments · Fixed by #11395
Closed

[BUG] Slow/no progress with cascaded pandas udfs/mapInPandas in Databricks #10770

eordentlich opened this issue May 7, 2024 · 8 comments · Fixed by #11395
Assignees
Labels
bug Something isn't working

Comments

@eordentlich
Copy link
Contributor

Describe the bug
Successively applied Pandas UDFs and MapInPandas make no progress in Databricks.

Steps/Code to reproduce bug

import pyspark.sql.functions as F
import numpy as np
import pandas as pd
transformed_df = spark.range(1000000) 
from pyspark.sql.functions import pandas_udf

@pandas_udf("int")
def rand_label(col: pd.Series) -> pd.Series: 
  import logging
  logger = logging.getLogger('rand_label')
  logger.info("in rand label")
  print("in rand label 1")
  return pd.Series(np.random.randint(0,999,size=col.shape[0]))

@pandas_udf("int")
def rand_label2(col: pd.Series) -> pd.Series: 
  import logging
  logger = logging.getLogger('rand_label')
  logger.info("in rand label")
  print("in rand label 2")
  return pd.Series(np.random.randint(0,999,size=col.shape[0]))

transformed_df_w_label_2 = transformed_df.withColumn("label", rand_label(F.lit(0)))

The following then is problematic.

transformed_df = spark.read.parquet("s3a://spark-rapids-ml-bm-datasets-public/pca/1m_3k_singlecol_float32_50_files.parquet")
features_col = 'feature_array'
prediction_col = 'label'
centers = np.random.rand(1000,3000)
from pyspark.sql.types import StructType, StructField, DoubleType

sc = transformed_df.rdd.context
centers_bc = sc.broadcast(centers)

def partition_score_udf(
    pdf_iter
) :
    local_centers = centers_bc.value.astype(np.float64)
    partition_score = 0.0
    import logging
    logger = logging.getLogger('partition_score_udf')
    logger.info("in partition score udf")
    for pdf in pdf_iter:
        print("in partition score udf")
        input_vecs = np.array(list(pdf[features_col]), dtype=np.float64)
        predictions = list(pdf[prediction_col])
        center_vecs = local_centers[predictions, :]
        partition_score += np.sum((input_vecs - center_vecs) ** 2)
    yield pd.DataFrame({"partition_score": [partition_score]})

total_score = (
  # the below is extremely slow
  # if instead of transformed_df_w_label_2 we apply to transformed_df_w_label it runs fine
  # one difference is that transformed_df_ws_label_2 is itself the output of another pandas udf
  # so data for this case is passing back and forth between jvm and python workers multiple times
    transformed_df_w_label_2.mapInPandas(
        partition_score_udf,  # type: ignore
        StructType([StructField("partition_score", DoubleType(), True)]),
    )
    .agg(F.sum("partition_score").alias("total_score"))
    .toPandas()
)  # type: ignore
total_score = total_score["total_score"][0]  # type: ignore

In this case, at least in 13.3ML, the computation slows dramatically and may be deadlocked.

Expected behavior
No slowdowns, like with baseline Spark without the plugin.

Environment details (please complete the following information)

  • Environment location: Second example is slow only in Databricks 13.3ML
  • Spark configuration settings related to the issue
spark.task.resource.gpu.amount 1
spark.task.cpus 1
spark.databricks.delta.preview.enabled true
spark.python.worker.reuse true
spark.executorEnv.PYTHONPATH /databricks/jars/rapids-4-spark_2.12-24.04.0.jar:/databricks/spark/python
spark.sql.files.minPartitionNum 2
spark.sql.execution.arrow.maxRecordsPerBatch 10000
spark.executor.cores 8
spark.rapids.memory.gpu.minAllocFraction 0.0001
spark.plugins com.nvidia.spark.SQLPlugin
spark.locality.wait 0s
spark.sql.cache.serializer com.nvidia.spark.ParquetCachedBatchSerializer
spark.rapids.memory.gpu.pooling.enabled false
spark.rapids.sql.explain ALL
spark.sql.execution.sortBeforeRepartition false
spark.rapids.sql.python.gpu.enabled true
spark.rapids.memory.pinnedPool.size 2G
spark.python.daemon.module rapids.daemon_databricks
spark.rapids.sql.batchSizeBytes 512m
spark.sql.adaptive.enabled false
spark.sql.execution.arrow.pyspark.enabled true
spark.sql.files.maxPartitionBytes 2000000000000
spark.databricks.delta.optimizeWrite.enabled false
spark.rapids.sql.concurrentGpuTasks 2

Cluster shape: 2x workers with g5.2xlarge and driver with g4dn.xlarge
Additional context
Also, based on print statement output in the logs, the first udf appears to complete fully before the second one starts. The batches should flow through both python udfs incrementally as is the case with baseline Spark.

Might be related to: #10751

@eordentlich eordentlich added ? - Needs Triage Need team to review and classify bug Something isn't working labels May 7, 2024
@mattahrens mattahrens removed the ? - Needs Triage Need team to review and classify label May 7, 2024
@firestarman
Copy link
Collaborator

could u first try to increase the value of "concurrentGpuTask" to see if we can get any better perf ?

@eordentlich
Copy link
Contributor Author

The computation gets pretty much stuck with essentially no progress. I don't think that will make a difference. Partial stack trace after reaching this point (might be from similar but not identical example to this repro):

Details

at sun.misc.Unsafe.copyMemory(Native Method)
at sun.misc.Unsafe.copyMemory(Unsafe.java:560)
at java.nio.DirectByteBuffer.put(DirectByteBuffer.java:331)
at org.apache.spark.util.DirectByteBufferOutputStream.grow(DirectByteBufferOutputStream.scala:63)
at org.apache.spark.util.DirectByteBufferOutputStream.ensureCapacity(DirectByteBufferOutputStream.scala:49)
at org.apache.spark.util.DirectByteBufferOutputStream.write(DirectByteBufferOutputStream.scala:44)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
- locked <0x0000000768a99630> (a java.io.DataOutputStream)
at org.apache.spark.sql.rapids.execution.python.BufferToStreamWriter.$anonfun$handleBuffer$1(GpuArrowWriter.scala:48)
at org.apache.spark.sql.rapids.execution.python.BufferToStreamWriter.$anonfun$handleBuffer$1$adapted(GpuArrowWriter.scala:42)
at org.apache.spark.sql.rapids.execution.python.BufferToStreamWriter$$Lambda$3498/1244767780.apply(Unknown Source)
at com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:30)
at org.apache.spark.sql.rapids.execution.python.BufferToStreamWriter.handleBuffer(GpuArrowWriter.scala:42)
at ai.rapids.cudf.Table.writeArrowIPCArrowChunk(Native Method)
at ai.rapids.cudf.Table.access$2000(Table.java:41)
at ai.rapids.cudf.Table$ArrowIPCTableWriter.write(Table.java:1739)
at org.apache.spark.sql.rapids.execution.python.GpuArrowWriter.$anonfun$write$1(GpuArrowWriter.scala:99)
at org.apache.spark.sql.rapids.execution.python.GpuArrowWriter.$anonfun$write$1$adapted(GpuArrowWriter.scala:97)
at org.apache.spark.sql.rapids.execution.python.GpuArrowWriter$$Lambda$3493/108528776.apply(Unknown Source)
at com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:30)
at org.apache.spark.sql.rapids.execution.python.GpuArrowWriter.write(GpuArrowWriter.scala:97)
at org.apache.spark.sql.rapids.execution.python.GpuArrowWriter.write$(GpuArrowWriter.scala:96)
at org.apache.spark.sql.rapids.execution.python.GpuArrowPythonWriter.write(GpuArrowWriter.scala:144)
at org.apache.spark.sql.rapids.execution.python.GpuArrowWriter.$anonfun$writeAndClose$1(GpuArrowWriter.scala:93)
at org.apache.spark.sql.rapids.execution.python.GpuArrowWriter.$anonfun$writeAndClose$1$adapted(GpuArrowWriter.scala:92)
at org.apache.spark.sql.rapids.execution.python.GpuArrowWriter$$Lambda$3492/1674125626.apply(Unknown Source)
at com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:30)
at org.apache.spark.sql.rapids.execution.python.GpuArrowWriter.writeAndClose(GpuArrowWriter.scala:92)
at org.apache.spark.sql.rapids.execution.python.GpuArrowWriter.writeAndClose$(GpuArrowWriter.scala:92)
at org.apache.spark.sql.rapids.execution.python.GpuArrowPythonWriter.writeAndClose(GpuArrowWriter.scala:144)
at org.apache.spark.sql.rapids.execution.python.shims.GpuArrowPythonRunner$$anon$1.writeNextInputToStream(GpuArrowPythonRunner.scala:74)
at org.apache.spark.api.python.BasePythonRunner$ReaderInputStream.writeAdditionalInputToPythonWorker(PythonRunner.scala:931)
at org.apache.spark.api.python.BasePythonRunner$ReaderInputStream.read(PythonRunner.scala:851)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
- locked <0x0000000765602c48> (a java.io.BufferedInputStream)
at java.io.DataInputStream.readInt(DataInputStream.java:387)
at org.apache.spark.sql.rapids.execution.python.shims.GpuArrowPythonOutput$$anon$1.read(GpuArrowPythonOutput.scala:71)
at org.apache.spark.sql.rapids.execution.python.shims.GpuArrowPythonOutput$$anon$1.read(GpuArrowPythonOutput.scala:48)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:635)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at com.nvidia.spark.rapids.AbstractProjectSplitIterator.hasNext(basicPhysicalOperators.scala:233)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at com.nvidia.spark.rapids.GpuMergeAggregateIterator.$anonfun$next$2(GpuAggregateExec.scala:751)
at com.nvidia.spark.rapids.GpuMergeAggregateIterator$$Lambda$3706/165795055.apply(Unknown Source)```

@firestarman
Copy link
Collaborator

firestarman commented Jun 6, 2024

Hi @eordentlich, where can i get the file s3a://spark-rapids-ml-bm-datasets-public/pca/1m_3k_singlecol_float32_50_files.parquet ?

@firestarman
Copy link
Collaborator

And another try is to set the spark.rapids.sql.python.gpu.enabled to false and remove this line spark.python.daemon.module rapids.daemon_databricks if no GPU is required in the UDFs.

@eordentlich
Copy link
Contributor Author

Hi @eordentlich, where can i get the file s3a://spark-rapids-ml-bm-datasets-public/pca/1m_3k_singlecol_float32_50_files.parquet ?

It's a public s3 bucket/file. Can you access via spark parquet reader or s3 cli?

@firestarman
Copy link
Collaborator

firestarman commented Jun 7, 2024

It's a public s3 bucket/file. Can you access via spark parquet reader or s3 cli?

I tried to reproduce this locally, but always getting the error as below, seems there is something I missed.

Caused by: org.apache.hadoop.fs.s3a.auth.NoAuthWithAWSException: No AWS Credentials provided by TemporaryAWSCredentialsProvider SimpleAWSCredentialsProvider EnvironmentVariableCredentialsProvider IAMInstanceCredentialsProvider : com.amazonaws.SdkClientException: Unable to load AWS credentials from environment variables (AWS_ACCESS_KEY_ID (or AWS_ACCESS_KEY) and AWS_SECRET_KEY (or AWS_SECRET_ACCESS_KEY))
	at org.apache.hadoop.fs.s3a.AWSCredentialProviderList.getCredentials(AWSCredentialProviderList.java:216)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.getCredentialsFromContext(AmazonHttpClient.java:1269)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.runBeforeRequestHandlers(AmazonHttpClient.java:845)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:794)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:781)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:755)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:715)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:697)
	at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:561)
	at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:541)
	at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5456)
	at com.amazonaws.services.s3.AmazonS3Client.getBucketRegionViaHeadRequest(AmazonS3Client.java:6431)
	at com.amazonaws.services.s3.AmazonS3Client.fetchRegionFromCache(AmazonS3Client.java:6404)
	at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5441)
	at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5403)
	at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1372)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$getObjectMetadata$10(S3AFileSystem.java:2545)
	at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:414)
	at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:377)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:2533)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:2513)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:3776)
	... 22 more
Caused by: com.amazonaws.SdkClientException: Unable to load AWS credentials from environment variables (AWS_ACCESS_KEY_ID (or AWS_ACCESS_KEY) and AWS_SECRET_KEY (or AWS_SECRET_ACCESS_KEY))
	at com.amazonaws.auth.EnvironmentVariableCredentialsProvider.getCredentials(EnvironmentVariableCredentialsProvider.java:50)
	at org.apache.hadoop.fs.s3a.AWSCredentialProviderList.getCredentials(AWSCredentialProviderList.java:177)
	... 43 more

@crajive
Copy link

crajive commented Jun 11, 2024

Caused by: org.apache.hadoop.fs.s3a.auth.NoAuthWithAWSException: No AWS Credentials provided by TemporaryAWSCredentialsProvider SimpleAWSCredentialsProvider EnvironmentVariableCredentialsProvider IAMInstanceCredentialsProvider : com.amazonaws.SdkClientException: Unable to load AWS credentials from environment variables (AWS_ACCESS_KEY_ID (or AWS_ACCESS_KEY) and AWS_SECRET_KEY (or AWS_SECRET_ACCESS_KEY))

Use AnonymousAWSCredentialsProvider.

Set fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider

eg:

hdfs dfs \
    -D fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider -ls \ 
    s3a://spark-rapids-ml-bm-datasets-public/pca/1m_3k_singlecol_float32_50_files.parquet/

or via aws cli

aws s3 --no-sign-request ls \
    s3://spark-rapids-ml-bm-datasets-public/pca/1m_3k_singlecol_float32_50_files.parquet/ 

@firestarman
Copy link
Collaborator

firestarman commented Jun 17, 2024

i can not reproduce this locally.
And another try is to

  • set the spark.rapids.sql.python.gpu.enabled to false, and
  • remove this line spark.python.daemon.module rapids.daemon_databricks if no GPU is required in the UDFs.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

Successfully merging a pull request may close this issue.

4 participants