Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EventHub AAD authentication with Service Principal and Secret . Class Loader could not find AuthBySecretCallBackWithParams class #627

Open
seblea opened this issue Feb 22, 2022 · 3 comments · May be fixed by #685

Comments

@seblea
Copy link

seblea commented Feb 22, 2022

Thanks for filing an issue with us! Below are some guidelines when filing an issue. In general, the more detail the better!

Feature Requests:

  • What issue are you trying to solve? With Service Principal and Secret to authenticate EventHub using Scala
  • How do you want to solve it?
  • What is your use case for this feature? Wherever package I have for AuthBySecretCallBackWithParams class, Based on Event Hub AAD documentation, Scala code has to Authenticate EventHub and should get token and have EventHubConf . I am using Maven Scala .I confirmed that AuthBySecretCallBackWithParams class is in my Jar . I put AuthBySecretCallBackWithParams in the same package with EventHubsConf

//import org.apache.spark.eventhubs.{AuthBySecretCallBackWithParams, ConnectionStringBuilder, EventHubsConf, EventPosition}

//import java.net.URI

//eventHubConfigurations holds all parameters

val params: Map[String,Object ] = Map("authority" ->  eventHubConfigurations.tenantId,
  "clientId" -> eventHubConfigurations.clientId,
  "clientSecret" -> eventHubConfigurations.clientSecret)

val connectionString = ConnectionStringBuilder()
  .setAadAuthConnectionString(new URI(s"sb://${eventHubConfigurations.nameSpace}.servicebus.windows.net/"), 
    eventHubConfigurations.eventHubName)
  .build

val eventHubsConf = EventHubsConf(connectionString)
  .setConsumerGroup(eventHubConfigurations.consumerGroup)
  .setAadAuthCallback(callclass)
  .setAadAuthCallbackParams(params)

sparkSession.readStream.format("org.apache.spark.sql.eventhubs.EventHubsSourceProvider").options(eventHubsConf.toMap).load().writeStream.format("delta")
.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
batchDF.withColumn("IngestionTimeStamp", lit(TimeStampFunction()))
.write.format("delta").mode("append")
.option("checkpointLocation", eventHubcheckpointPath).save(eventHubSinkPath)

  }
  .start().awaitTermination()

Bug Report: Class Loader could not find AuthBySecretCallBackWithParams class

  • Actual behavior: Wherever Package AuthBySecretCallBackWithParams class is being getting ClassNotFoundException
    java.lang.ClassNotFoundException: org.apache.spark.eventhubs.AuthBySecretCallBackWithParams
    at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:264)
    at org.apache.spark.eventhubs.EventHubsConf$$anonfun$aadAuthCallback$2.apply(EventHubsConf.scala:640)
    at org.apache.spark.eventhubs.EventHubsConf$$anonfun$aadAuthCallback$2.apply(EventHubsConf.scala:638)
    at scala.Option.map(Option.scala:146)
    at org.apache.spark.eventhubs.EventHubsConf.aadAuthCallback(EventHubsConf.scala:638)
    at org.apache.spark.eventhubs.client.ClientConnectionPool$$anonfun$org$apache$spark$eventhubs$client$ClientConnectionPool$$borrowClient$3.apply(ClientConnectionPool.scala:73)
    at org.apache.spark.eventhubs.client.ClientConnectionPool$$anonfun$org$apache$spark$eventhubs$client$ClientConnectionPool$$borrowClient$3.apply(ClientConnectionPool.scala:71)
    at org.apache.spark.eventhubs.utils.RetryUtils$$anonfun$retryJava$1.apply(RetryUtils.scala:91)
    at org.apache.spark.eventhubs.utils.RetryUtils$$anonfun$retryJava$1.apply(RetryUtils.scala:91)
    at org.apache.spark.eventhubs.utils.RetryUtils$.org$apache$spark$eventhubs$utils$RetryUtils$$retryHelper$1(RetryUtils.scala:116)
    at org.apache.spark.eventhubs.utils.RetryUtils$.retryScala(RetryUtils.scala:149)
    at org.apache.spark.eventhubs.utils.RetryUtils$.retryJava(RetryUtils.scala:91)
    at org.apache.spark.eventhubs.client.ClientConnectionPool.org$apache$spark$eventhubs$client$ClientConnectionPool$$borrowClient(ClientConnectionPool.scala:69)
    at org.apache.spark.eventhubs.client.ClientConnectionPool$.borrowClient(ClientConnectionPool.scala:170)
    at org.apache.spark.eventhubs.client.EventHubsClient.org$apache$spark$eventhubs$client$EventHubsClient$$client(EventHubsClient.scala:62)
    at org.apache.spark.eventhubs.client.EventHubsClient.liftedTree1$1(EventHubsClient.scala:187)
    at org.apache.spark.eventhubs.client.EventHubsClient.partitionCountLazyVal$lzycompute(EventHubsClient.scala:184)
    at org.apache.spark.eventhubs.client.EventHubsClient.partitionCountLazyVal(EventHubsClient.scala:183)
    at org.apache.spark.eventhubs.client.EventHubsClient.partitionCount(EventHubsClient.scala:176)
    at org.apache.spark.sql.eventhubs.EventHubsSource.org$apache$spark$sql$eventhubs$EventHubsSource$$partitionCount(EventHubsSource.scala:81)
    at org.apache.spark.sql.eventhubs.EventHubsSource$$anonfun$1$$anonfun$apply$mcJ$sp$1.apply$mcJ$sp(EventHubsSource.scala:96)
    at org.apache.spark.sql.eventhubs.EventHubsSource$$anonfun$1$$anonfun$apply$mcJ$sp$1.apply(EventHubsSource.scala:96)
    at org.apache.spark.sql.eventhubs.EventHubsSource$$anonfun$1$$anonfun$apply$mcJ$sp$1.apply(EventHubsSource.scala:96)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.eventhubs.EventHubsSource$$anonfun$1.apply$mcJ$sp(EventHubsSource.scala:96)
    at org.apache.spark.sql.eventhubs.EventHubsSource$$anonfun$1.apply(EventHubsSource.scala:96)
    at org.apache.spark.sql.eventhubs.EventHubsSource$$anonfun$1.apply(EventHubsSource.scala:96)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.eventhubs.EventHubsSource.(EventHubsSource.scala:95)
    at org.apache.spark.sql.eventhubs.EventHubsSourceProvider.createSource(EventHubsSourceProvider.scala:84)
    at org.apache.spark.sql.execution.datasources.DataSource.createSource(DataSource.scala:268)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1$$anonfun$applyOrElse$1.apply(MicroBatchExecution.scala:88)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1$$anonfun$applyOrElse$1.apply(MicroBatchExecution.scala:85)
    at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:79)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:85)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:83)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:268)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:268)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$class.transformDown(AnalysisHelper.scala:149)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:273)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:273)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$6.apply(TreeNode.scala:343)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:196)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:341)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:273)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$class.transformDown(AnalysisHelper.scala:149)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:273)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:273)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$6.apply(TreeNode.scala:343)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:196)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:341)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:273)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$class.transformDown(AnalysisHelper.scala:149)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:257)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:83)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:65)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:269)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
    org.apache.spark.sql.streaming.StreamingQueryException: org.apache.spark.eventhubs.AuthBySecretCallBackWithParams
    === Streaming Query ===
  • Expected behavior : Running Spark job on Synapse Spark , Expected to stream from EventHub
  • Spark version : Apache Spark 2.4.0, Scala 2.11
  • spark-eventhubs artifactId and version : com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.21
@jeffco11
Copy link

@seblea, Did you find a solution to this?

@LHuang2019
Copy link

@seblea and @jeffco11 were you able to find a solution to this? also facing the same issue.

@KaiLiPlayground
Copy link

KaiLiPlayground commented Mar 12, 2024

@seblea here are what I did to tackle the "class not found issue" in Synapse Spark. Hopefully, this is helpful to you or others to some extent.

Solution 1

Download all dependencies jars after including only azure-{eventhubs, msal4j}:

  • Run mvn dependency:copy-dependencies -DoutputDirectory=<PATH>
    • 40+ jars at the time of testing.
  • Add all these jars to workspace library and upload to Spark-pool.
    • Caveat: (Jar conflicts) Some downloaded jars are the same as the existing ones in Spark-Pool.
    • Exception:
    Caused by: com.fasterxml.jackson.databind.JsonMappingException: Scala module 2.13.4 requires Jackson Databind version >= 2.13.0 and < 2.14.0 - Found jackson-databind version 2.12.1 at com.fasterxml.jackson.module.scala.JacksonModule.setupModule(JacksonModule.scala:61)
    
  • Workaround: remove the conflicted library one at the time.
    • Issue: require manual work and not ideal for package management.

Solution 2

Include azure-{eventhubs, msal4j}:

  • How to identify libraries' prefixes for renaming:
    • Find patterns in the console output of downloaded dependencies paths in IDE or mvn.
  • Shade specific patterns using relocation under Plugin (maven-shade-plugin):
    • shaded.com.fasterxml.jackson.databind
    • shaded.com.microsoft.azure
    • shaded.net.minidev
  • Exclude META-INF directory files since it can cause compilation issues (inform customer, used with discretion):
    • META-INF/*.SF
    • META-INF/*.DSA
    • META-INF/*.RSA
    • META-INF/*.MF

Issue:

  • This solution removes some of the metadata files from the final JAR file, which can cause issues with some tools or libraries.
    • It is recommended to keep the metadata files for better compatibility and troubleshooting.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging a pull request may close this issue.

4 participants