
Scala products are used but Scala API is not on the classpath #177

Open

xandril opened this issue Nov 14, 2024 · 27 comments

@xandril

xandril commented Nov 14, 2024

Hello! I am not entirely sure if this relates to the flink-scala-api, but I don't know who else can help with the interaction between Flink and Scala.
Currently, I am trying to integrate flink-scala-api using Scala 2.13.
As a base image I am using this Docker image: https://hub.docker.com/layers/apache/flink/1.20-scala_2.12-java8/images/sha256-37750cd7c7f1f33f295cb9415393276b343c72200860863024973466b54cac03?context=explore
In my own image I remove flink-scala_2.12-1.20.0.jar and add scala-library-2.13.15.jar to the lib directory, and then remove 'scala.' from classloader.parent-first-patterns.default in the configuration. As a result, when attempting to submit a JAR with Scala code, I get an exception with the message: "Scala products are used but Scala API is not on the classpath."
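
Roughly, the modifications look like this (a sketch; exact jar names, paths, and the config file location are assumptions and may differ from my actual image):

# Sketch of the image changes described above
FROM apache/flink:1.20-scala_2.12-java8
RUN rm /opt/flink/lib/flink-scala_2.12-1.20.0.jar
ADD https://repo1.maven.org/maven2/org/scala-lang/scala-library/2.13.15/scala-library-2.13.15.jar /opt/flink/lib/
# plus: drop 'scala.' from classloader.parent-first-patterns.default in the Flink configuration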

Can you suggest what I might be missing in this scenario? @novakov-alexey

@novakov-alexey
Collaborator

@xandril I think this is because there is no flink-scala-api in the Flink classpath. The error probably comes from here: https://github.com/apache/flink/blob/release-1.20/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/typeutils/FieldAccessorFactory.java#L100

Did you add this library to your job classpath? If yes, the error "Scala products are used but Scala API is not on the classpath" should then disappear.

@novakov-alexey
Collaborator

@xandril is the issue solved? Did you try my suggestion?

@xandril
Author

xandril commented Nov 26, 2024

Sorry for taking so long to respond. Yes, I tried to add it to the classpath in various ways and found this thread: https://github.com/flink-extended/flink-scala-api/issues?q=classpath. I tried all the options from it, but the problem remained. I'll prepare a Docker container and repository closer to the weekend (probably on Saturday) to have some reproducibility.

@novakov-alexey
Collaborator

I see. Perhaps an example based on the Ververica Flink image would be useful:

FROM registry.ververica.com/v2.13/flink:1.19.0-stream2-scala_2.12-java11
RUN rm /flink/lib/flink-scala_2.12-1.19.0-stream2.jar
# Note: if you plan to use Scala 3, it also requires scala-library 2.13.x due to some deep dependencies between Scalas
ADD https://repo1.maven.org/maven2/org/scala-lang/scala3-library_3/3.6.1/scala3-library_3-3.6.1.jar /flink/lib/
ADD https://repo1.maven.org/maven2/org/scala-lang/scala-library/2.13.15/scala-library-2.13.15.jar /flink/lib/

@xandril
Author

xandril commented Nov 30, 2024

@novakov-alexey I uploaded a repository with a simple job - https://github.com/xandril/scala-flink-playground - built it using Gradle, and also prepared a ready-to-use image based on your example with Ververica Flink and uploaded it to GitHub Packages - https://github.com/xandril/scala-flink-playground/pkgs/container/scala-flink-1.20.0-test.

When I run a local cluster inside the container and submit a job via flink run ./usrlib/test_job.jar, I get the following exception

WARNING: Unknown module: jdk.compiler specified to --add-exports
WARNING: Unknown module: jdk.compiler specified to --add-exports
WARNING: Unknown module: jdk.compiler specified to --add-exports
WARNING: Unknown module: jdk.compiler specified to --add-exports
WARNING: Unknown module: jdk.compiler specified to --add-exports

------------------------------------------------------------
 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Scala products are used but Scala API is not on the classpath.
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:373)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:223)
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:113)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:1026)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:247)
        at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1270)
        at org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$10(CliFrontend.java:1367)
        at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
        at org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1367)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1335)
Caused by: java.lang.IllegalStateException: Scala products are used but Scala API is not on the classpath.
        at org.apache.flink.streaming.util.typeutils.FieldAccessorFactory.getAccessor(FieldAccessorFactory.java:99)
        at org.apache.flink.streaming.api.functions.aggregation.SumAggregator.<init>(SumAggregator.java:41)
        at org.apache.flinkx.api.KeyedStream.aggregate(KeyedStream.scala:387)
        at org.apache.flinkx.api.KeyedStream.sum(KeyedStream.scala:317)
        at org.example.WordCount$.main(WordCount.scala:20)
        at org.example.WordCount.main(WordCount.scala)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
        at java.base/java.lang.reflect.Method.invoke(Unknown Source)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:356)
        ... 9 more

@novakov-alexey
Collaborator

novakov-alexey commented Nov 30, 2024

Thanks. I can reproduce the error as well.

Although flink-scala-api is inside the fat JAR, "flink run" and its way of launching the Flink job can't load the necessary class, such as DefaultScalaProductFieldAccessorFactory.

Here is why I believe that happens:

"flink run" submits the job to an existing standalone/session cluster, if one is available. It must be using Session Mode.

Solution:

In Session Mode, you need to have all dependencies in the Flink cluster. You need to upload the library JAR https://central.sonatype.com/artifact/org.flinkextended/flink-scala-api_2.13/1.20.0_1.2.0 to the flink/lib folder. Then it will work. I expected that the class DefaultScalaProductFieldAccessorFactory would be loaded from the user JAR as well, but it seems the package prefix "org/apache/flink" makes Flink use the parent classloader, i.e. load everything from flink/lib.
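
For a session cluster that means something like the following (a sketch; the Maven Central download path is an assumption derived from the coordinates above):

cd $FLINK_HOME/lib
wget https://repo1.maven.org/maven2/org/flinkextended/flink-scala-api_2.13/1.20.0_1.2.0/flink-scala-api_2.13-1.20.0_1.2.0.jar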

My approach:
I usually use Application Mode with Ververica Platform. In this mode the fat JAR approach works fine, especially if you use Kubernetes with a Flink image that contains your user JAR with all dependencies inside.

What I do not understand is why you need to run flink run via a Docker image. Is it just to show the problem here, or are you going to use this Docker image further in some Flink platform?

@xandril
Author

xandril commented Nov 30, 2024

Yes, I use the flink run command via a Docker image only to reproduce and demonstrate the error. The problem also occurs when using Application Mode and deploying in Kubernetes in native mode. I tested it at work, in a company test environment K8s cluster, with an image based on apache/flink:1.20.0.

@xandril
Author

xandril commented Nov 30, 2024

I also tried to add the Scala library and flink-scala-api to the lib folder, but I got an error that usually appears when multiple Scala versions are mixed up - missing collection or Seq classes.

@novakov-alexey
Collaborator

If you add flink-scala-api and Scala 2.13 to the lib folder AND remove Scala 2.12 from the same lib folder, then it should work.
Please check locally using a Flink standalone cluster with a single Job Manager and a single Task Manager.
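
For example, roughly (a sketch; the fat JAR path is a placeholder, and the main class is the one from the stack trace above):

# start a local standalone cluster (one Job Manager + one Task Manager) and submit the job
./bin/start-cluster.sh
./bin/flink run -c org.example.WordCount /path/to/your-job-fat.jar
./bin/stop-cluster.sh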

@novakov-alexey
Collaborator

 > flink run -c com.example.wordCount target/scala-3.3.3/word-count-assembly-0.1.0-SNAPSHOT.jar
Job has been submitted with JobID 714586f237bcd31fa03ae1f78a46ea9f
Program execution finished
Job with JobID 714586f237bcd31fa03ae1f78a46ea9f has finished.
Job Runtime: 686 ms
[Screenshot 2024-11-30 at 23 02 06]
% ls ../flink/current/lib
flink-cep-1.18.1.jar			flink-dist-1.18.1.jar			flink-table-api-java-uber-1.18.1.jar	log4j-1.2-api-2.17.1.jar		log4j-slf4j-impl-2.17.1.jar
flink-connector-files-1.18.1.jar	flink-json-1.18.1.jar			flink-table-planner-loader-1.18.1.jar	log4j-api-2.17.1.jar			scala-library-2.13.15.jar
flink-csv-1.18.1.jar			flink-scala-api_3-1.18.1_1.2.0.jar	flink-table-runtime-1.18.1.jar		log4j-core-2.17.1.jar			scala3-library_3-3.6.1.jar

@novakov-alexey
Collaborator

novakov-alexey commented Nov 30, 2024

[Screenshot 2024-11-30 at 23 05 02]

@xandril
Author

xandril commented Dec 2, 2024

I'll try to submit the job to a Docker Compose session cluster and, if I run into any issues, attach the Docker Compose file for reproducibility. Also, could it be a problem that I'm not using the Ververica Flink image?

@novakov-alexey
Collaborator

I would first try without Docker, just using your local computer with a single Job Manager and Task Manager. This should be enough to test serialization of data between the job tasks.
No, I do not think the Ververica Flink image makes a difference.

@novakov-alexey
Collaborator

If there are no updates on this issue, I will close it.

@xandril
Author

xandril commented Jan 13, 2025

Sorry, I just came back from a long vacation. This week I will try to run it locally as we discussed earlier.

@xandril
Author

xandril commented Jan 18, 2025

Local cluster, the scala-api library in the shadowJar, and the Scala standard library in the lib directory:

[attached image]

@novakov-alexey
Collaborator

novakov-alexey commented Jan 18, 2025

What if you also put the library https://central.sonatype.com/artifact/org.flinkextended/flink-scala-api_2.13/1.20.0_1.2.0 into the lib directory of the local cluster?

@xandril
Author

xandril commented Jan 19, 2025

With the JAR in the lib dir:

[attached image]

[attached image]

@novakov-alexey
Collaborator

I see. I am not using Scala 2.13 any more, which is why I have not seen such an issue before. But we will find the problem.

This Stack Overflow answer says that "org.scala-lang" % "scala-reflect" % "2.13.x" might be required in your classpath:
https://stackoverflow.com/a/37506742

Can you add this module as well and try again?

The Scala API uses the Scala reflection package for Scala 2 in LowPrioImplicits.scala.

@xandril
Author

xandril commented Jan 21, 2025

I added two dependencies to the lib directory - scala-reflect and https://mvnrepository.com/artifact/com.softwaremill.magnolia1_2/magnolia_2.13/1.1.10 - and now it works correctly.

@xandril
Author

xandril commented Jan 21, 2025

@novakov-alexey The solution was to put Magnolia, the Scala library, and the Scala API into the lib folder of the Flink directory; with that it works for Scala 2.13.
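
Putting it together for Scala 2.13, the flink/lib changes described in this thread amount to roughly the following (a sketch; scala-reflect's exact version is an assumption, and the Maven Central paths are derived from the links above):

# remove the bundled Scala 2.12 runtime first, then add the Scala 2.13 jars
rm $FLINK_HOME/lib/flink-scala_2.12-*.jar
cd $FLINK_HOME/lib
wget https://repo1.maven.org/maven2/org/scala-lang/scala-library/2.13.15/scala-library-2.13.15.jar
wget https://repo1.maven.org/maven2/org/scala-lang/scala-reflect/2.13.15/scala-reflect-2.13.15.jar
wget https://repo1.maven.org/maven2/com/softwaremill/magnolia1_2/magnolia_2.13/1.1.10/magnolia_2.13-1.1.10.jar
wget https://repo1.maven.org/maven2/org/flinkextended/flink-scala-api_2.13/1.20.0_1.2.0/flink-scala-api_2.13-1.20.0_1.2.0.jar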

@novakov-alexey
Collaborator

@xandril Great. As I understand it, Gradle did not package the non-provided dependencies into the fat JAR by default?

I use sbt only; it offers the assembly plugin, which builds a fat JAR including all non-provided dependencies. The Magnolia and Scala Reflect libraries are both transitive dependencies with respect to the user project, so assembly takes them into the fat JAR as well.
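
For reference, a minimal build.sbt sketch of that setup (the coordinates are the ones linked in this thread; treat the exact version strings as placeholders):

// Flink itself stays Provided, flink-scala-api is a normal dependency, so sbt-assembly
// packs it together with its transitive deps (Magnolia, scala-reflect) into the fat JAR.
libraryDependencies ++= Seq(
  "org.apache.flink"  % "flink-streaming-java" % "1.20.0" % Provided,
  "org.flinkextended" %% "flink-scala-api"     % "1.20.0_1.2.0"
)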

@novakov-alexey
Collaborator

@xandril do you want to propose a short documentation summary, which we could then include in the library README file?

@xandril
Author

xandril commented Jan 22, 2025

@novakov-alexey Sure! I also want to check Scala 3 with Gradle and Scala 2.13 with SBT on my local machine and then make a short documentation summary.

@xandril
Author

xandril commented Jan 30, 2025

@novakov-alexey
I've been running jobs with Scala 2 and 3, using both sbt and Gradle - everything works fine. As I understand it, the scala-api has to be placed in flink/lib in any case, otherwise the required factory won't be loaded from the Flink code. Maybe some classpath tricks could help, I don't know.
Also, with Gradle it works fine if I pack the Scala API into a shadow JAR, but then I still need to put the Scala API into the lib folder as well - this applies to sbt too.

@xandril
Author

xandril commented Jan 30, 2025

So, if that's correct, there are two options:
1. Put the Scala API into the lib folder and into the shadowJar.
2. Put Scala Reflect and Magnolia (for Scala 2.13), or Magnolia only (for Scala 3), into the lib folder, and exclude the Scala API from the shadow JAR.

I can make a merge request to extend the Usage section of the README with that information.

@novakov-alexey
Collaborator

  1. If we put the Scala API into the flink/lib folder on every cluster node, then we do not need to pack it into the user JAR (I guess you call it the shadowJar, as per Gradle terminology). Including the Scala API in both places should not be required.
  2. Scala Reflect and Magnolia are dependencies of the Scala API. If you build your user JAR (generally called a fat JAR or uber JAR), then no additional work is needed. Whether to exclude the Scala API from the shadow JAR is up to the user: it either has to be on the Flink cluster node classpath (i.e. in flink/lib) or it should be put inside the fat JAR. There is no third option.

I think what we should add to the README is:

  • a note that when including the Scala API either in a fat JAR or in the flink/lib folder, a user should not forget the Scala API's dependencies, such as Scala Reflect and Magnolia.

P.S. As for the last two libraries, sbt assembly does include all transitive dependencies automatically, i.e. Scala Reflect and Magnolia are taken into the fat JAR because they are dependencies of the Scala API library; that is why I was confused about what we are trying to debug here :-)
