Possible case class serialization/deserialization bug #106

Closed
BjarkeTornager opened this issue Apr 8, 2024 · 5 comments

@BjarkeTornager

I have been using your library for many months without any issues, and thanks for making this project available. However, I am now encountering a strange MatchError on a None that seems to be a library issue, maybe. I have tried to debug my Flink job, but I cannot see any issue with the application code: the error does not show up in my unit or integration tests, and it seems to happen only under substantial load on the production data. I can recreate the error both locally and in my Flink cluster when running on the production data coming from Kafka. I am using Flink's KeyedCoProcessFunction to do data enrichment, and I am getting the following error:

[info] [2024-04-08 10:06:42,187] WARN Co-Keyed-Process (1/1)#0 (5d5cf71827a90860134cddc3077e286b_e95aaff440415b0a80c0a4fa5a9ff133_0_0) switched from RUNNING to FAILED with failure cause: (org.apache.flink.runtime.taskmanager.Task)
[info] scala.MatchError: None (of class scala.None$)
[info] 	at org.apache.flinkx.api.serializer.OptionSerializer.serialize(OptionSerializer.scala:68)
[info] 	at org.apache.flinkx.api.serializer.OptionSerializer.serialize(OptionSerializer.scala:63)
[info] 	at org.apache.flinkx.api.serializer.CaseClassSerializer.serialize(CaseClassSerializer.scala:104)
[info] 	at org.apache.flinkx.api.serializer.CaseClassSerializer.serialize(CaseClassSerializer.scala:99)
[info] 	at org.apache.flinkx.api.serializer.ListSerializer.serialize$$anonfun$1(ListSerializer.scala:20)
[info] 	at scala.runtime.function.JProcedure1.apply(JProcedure1.java:15)
[info] 	at scala.runtime.function.JProcedure1.apply(JProcedure1.java:10)
[info] 	at scala.collection.immutable.List.foreach(List.scala:333)
[info] 	at org.apache.flinkx.api.serializer.ListSerializer.serialize(ListSerializer.scala:20)
[info] 	at org.apache.flinkx.api.serializer.ListSerializer.serialize(ListSerializer.scala:18)
[info] 	at org.apache.flinkx.api.serializer.OptionSerializer.serialize(OptionSerializer.scala:66)
[info] 	at org.apache.flinkx.api.serializer.OptionSerializer.serialize(OptionSerializer.scala:63)
[info] 	at org.apache.flinkx.api.serializer.CaseClassSerializer.serialize(CaseClassSerializer.scala:104)
[info] 	at org.apache.flinkx.api.serializer.CaseClassSerializer.serialize(CaseClassSerializer.scala:99)
[info] 	at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:173)
[info] 	at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:44)
[info] 	at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
[info] 	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.serializeRecord(RecordWriter.java:152)
[info] 	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:108)
[info] 	at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:55)
[info] 	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:140)
[info] 	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collectAndCheckIfChained(RecordWriterOutput.java:120)
[info] 	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:101)
[info] 	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:53)
[info] 	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:60)
[info] 	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:32)
[info] 	at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:52)
[info] 	at org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator.processElement1(KeyedCoProcessOperator.java:78)
[info] 	at org.apache.flink.streaming.runtime.io.RecordProcessorUtils.lambda$getRecordProcessor1$1(RecordProcessorUtils.java:88)
[info] 	at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:254)
[info] 	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:157)
[info] 	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:114)
[info] 	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
[info] 	at org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.processInput(StreamMultipleInputProcessor.java:85)
[info] 	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:579)
[info] 	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
[info] 	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:909)
[info] 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:858)
[info] 	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
[info] 	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
[info] 	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751)
[info] 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
[info] 	at java.base/java.lang.Thread.run(Thread.java:840)
[info] [2024-04-08 10:06:42,256] WARN Failed to trigger or complete checkpoint 1 for job 926262f3df68336e0973bba535007918. (0 consecutive failed attempts so far) (org.apache.flink.runtime.checkpoint.CheckpointFailureManager)
[info] org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint Coordinator is suspending.
[info] 	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.stopCheckpointScheduler(CheckpointCoordinator.java:2056)
[info] 	at org.apache.flink.runtime.checkpoint.CheckpointCoordinatorDeActivator.jobStatusChanges(CheckpointCoordinatorDeActivator.java:46)
[info] 	at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifyJobStatusChange(DefaultExecutionGraph.java:1580)
[info] 	at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.transitionState(DefaultExecutionGraph.java:1146)
[info] 	at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.transitionState(DefaultExecutionGraph.java:1118)
[info] 	at org.apache.flink.runtime.scheduler.SchedulerBase.transitionExecutionGraphState(SchedulerBase.java:600)
[info] 	at org.apache.flink.runtime.scheduler.DefaultScheduler.addVerticesToRestartPending(DefaultScheduler.java:382)
[info] 	at org.apache.flink.runtime.scheduler.DefaultScheduler.restartTasksWithDelay(DefaultScheduler.java:358)
[info] 	at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeRestartTasks(DefaultScheduler.java:326)
[info] 	at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:268)
[info] 	at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:261)
[info] 	at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:787)
[info] 	at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:764)
[info] 	at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:83)
[info] 	at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:488)
[info] 	at jdk.internal.reflect.GeneratedMethodAccessor20.invoke(Unknown Source)
[info] 	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
[info] 	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
[info] 	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:309)
[info] 	at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
[info] 	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:307)
[info] 	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:222)
[info] 	at org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85)
[info] 	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168)
[info] 	at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
[info] 	at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
[info] 	at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
[info] 	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
[info] 	at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
[info] 	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
[info] 	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
[info] 	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
[info] 	at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)
[info] 	at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545)
[info] 	at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229)
[info] 	at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
[info] 	at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
[info] 	at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280)
[info] 	at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241)
[info] 	at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253)
[info] 	at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
[info] 	at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
[info] 	at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
[info] 	at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
[info] 	at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
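
The serializer nesting in the trace (CaseClassSerializer → OptionSerializer → ListSerializer → CaseClassSerializer → OptionSerializer) suggests a record shape roughly like the sketch below. This is a hypothetical reconstruction with made-up names, not the actual types from my job:

```scala
// Hypothetical shape inferred from the serializer nesting in the stack trace.
// All names are made up for illustration.
case class Inner(
    id: String,
    score: Option[Double] // serializing a None here is where the MatchError is thrown
)

case class Outer(
    key: String,
    items: Option[List[Inner]] // Option -> List -> case class -> Option nesting
)
```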

I am using the following library versions:

  • scala -> 3.4.1
  • flink-scala-api -> 1.18.1_1.1.4
  • flink -> 1.19.0 (tried to downgrade to 1.18.1 but the same thing happens)

Any ideas what could cause this?

@novakov-alexey
Collaborator

Thanks for the feedback!

It is quite a weird error indeed. Perhaps it is related to the latest Scala version.
Scala 3.3.3 is the long-term-supported version, and the one you use is the latest one.
What if you compile your Flink job with Scala 3.3.3? Does the issue still arise?

Another guess I have is a Flink classloading issue. There can be two class loaders in your Flink JVM, each loading its own version of the Scala Option type, so that the Scala match operator blows up (could you have two different Scala libraries at runtime?). Some other effects of this issue were fixed in PR #102 (although that is a different area).
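
To illustrate the guess: an Option serializer typically branches on the record as sketched below. This is not the library's actual OptionSerializer source, just a minimal sketch of the pattern match involved and of how a second class loader could make it fail:

```scala
import org.apache.flink.core.memory.DataOutputView

// Minimal sketch (not the real OptionSerializer): branch on Some/None and
// write a presence flag before the value.
def serializeOption[T](record: Option[T], target: DataOutputView)(
    writeValue: (T, DataOutputView) => Unit
): Unit =
  record match {
    case Some(value) =>
      target.writeBoolean(true)
      writeValue(value, target)
    case None =>
      target.writeBoolean(false)
  }

// `case None` only matches the scala.None singleton visible to the class loader
// that loaded this code. If the user-code class loader ships its own copy of the
// Scala library, an incoming None is a different instance of a different (but
// identically named) scala.None$ class, neither branch matches, and the match
// could fail exactly like the reported
// "scala.MatchError: None (of class scala.None$)".
```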

@BjarkeTornager
Author

Sorry about the late reply.

I have tried downgrading to Scala 3.3.3, but I still get the same issue. I could not see that I have two different Scala versions on the classpath. However, I am running in a Flink k8s operator environment that I cannot replicate locally, so it is possible. From what I can tell, it seems to be related to using a List type in a case class, but I am not 100% sure at this point since I have tried a lot of things. When I remove the List type, the issue seems to go away.

However, when I use native Flink with POJOs instead of case classes, replicating the case class structure 1:1 as POJOs, I don't have any issues. Using POJOs with Scala is suboptimal compared to case classes, so I will try to rewrite my current job back to this API at a later stage.
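
For reference, "replicating the case class structure 1:1 as POJOs" means something roughly like the sketch below, based on the hypothetical Outer/Inner shape sketched earlier. Flink's POJO rules need a public no-arg constructor and Java-bean accessors, hence @BeanProperty; replacing the Option fields with nullable values is an assumption on my side:

```scala
import scala.beans.BeanProperty

// Hypothetical POJO equivalents of the (also hypothetical) case classes above,
// intended for Flink's built-in POJO serializer instead of flink-scala-api.
class InnerPojo(
    @BeanProperty var id: String,
    @BeanProperty var score: java.lang.Double // Option[Double] replaced by a nullable value
) {
  def this() = this(null, null) // no-arg constructor required for Flink POJOs
}

class OuterPojo(
    @BeanProperty var key: String,
    @BeanProperty var items: java.util.List[InnerPojo] // Option[List[...]] replaced by a nullable list
) {
  def this() = this(null, null)
}
```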

Thanks for your suggestions 👍

@novakov-alexey
Collaborator

Thanks for the reply.
It would be very helpful if you could share minimal example code reproducing the issue, so that we could debug it here.

@BjarkeTornager
Author

@novakov-alexey, I have not been able to make a minimal example that reproduces the issue above; the problem is that I cannot reproduce it locally, only in the Flink cluster under substantial load. Can we close this issue for now? I can open a new issue if I manage to reproduce it in a minimal example at a later time. Apologies for any inconvenience.

@novakov-alexey
Collaborator

@BjarkeTornager thanks for the update. This is definitely an interesting issue. Most likely it is a classloading issue.
We can close it for now and reopen it later if needed.
