[Bug]: CoGroupByKey throws with large (10K+) output groups on Flink runner #31981

Open
2 of 17 tasks
a-kazakov opened this issue Jul 25, 2024 · 2 comments

a-kazakov commented Jul 25, 2024

What happened?

When run on the Flink runner with default settings, CoGroupByKey throws "IllegalStateException: GBK result is not re-iterable." when iterating over any of the CoGbkResult iterables. This only happens for large groups, when Beam does not load the CoGBK result into memory.

Minimal reproduction: https://github.com/a-kazakov/beam_flink_failure_repro/blob/main/src/main/kotlin/Main.kt (written in Kotlin for readability and to keep the project small). The example creates two collections of 20K KV pairs each, all with the same key, and then runs CoGBK on them. It runs fine on the DirectRunner but fails on the FlinkRunner.

Using --reIterableGroupByKeyResult, as the error message suggests, is not always an option, since it requires the collection to fit in memory, which is not guaranteed.

My preliminary investigation suggests that the bug may have been introduced by #30851. The problem is that, with the Flink runner, the GBK result is once-iterable: the .iterator() method can be called only once on the GBK value. However, CoGbkResult currently calls it twice.

  • The first call happens when checking whether the result fits in memory.
  • The second call happens when creating a RecordingFilteringIterator instance.
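A minimal Java sketch of the failure mode described above, under a simplified model. OnceIterable is a hypothetical stand-in for the Flink runner's once-iterable GBK value (it throws the same message on a second iterator() call), and consume is a hypothetical simplification of the two-call pattern, not Beam's actual CoGbkResult code. The 10,000-element threshold is an assumption chosen to match the issue title.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class OnceIterableDemo {

  /** Hypothetical stand-in for a Flink GBK value: iterator() may be called only once. */
  static final class OnceIterable<T> implements Iterable<T> {
    private final Iterable<T> delegate;
    private boolean used = false;

    OnceIterable(Iterable<T> delegate) {
      this.delegate = delegate;
    }

    @Override
    public Iterator<T> iterator() {
      if (used) {
        throw new IllegalStateException("GBK result is not re-iterable.");
      }
      used = true;
      return delegate.iterator();
    }
  }

  /**
   * Hypothetical simplification of the pattern in the issue: the first iterator() call
   * tries to buffer up to inMemoryLimit elements; if the group is larger, a second
   * iterator() call is made to stream it.
   */
  static <T> List<T> consume(Iterable<T> values, int inMemoryLimit) {
    List<T> buffered = new ArrayList<>();
    Iterator<T> first = values.iterator(); // first call: "does it fit in memory?"
    while (first.hasNext() && buffered.size() < inMemoryLimit) {
      buffered.add(first.next());
    }
    if (!first.hasNext()) {
      return buffered; // small group: fully loaded, no second call needed
    }
    List<T> all = new ArrayList<>();
    Iterator<T> second = values.iterator(); // second call: fails on a once-iterable value
    while (second.hasNext()) {
      all.add(second.next());
    }
    return all;
  }

  public static void main(String[] args) {
    List<Integer> small = List.of(1, 2, 3);
    System.out.println(consume(new OnceIterable<>(small), 10_000).size()); // prints 3

    List<Integer> large = new ArrayList<>();
    for (int i = 0; i < 20_000; i++) {
      large.add(i);
    }
    try {
      consume(new OnceIterable<>(large), 10_000);
      System.out.println("no exception");
    } catch (IllegalStateException e) {
      // prints "caught: GBK result is not re-iterable."
      System.out.println("caught: " + e.getMessage());
    }
  }
}
```

Passing a plain re-iterable list (as a DirectRunner-style value would be) to consume succeeds for both sizes, which matches the observation that only the once-iterable case fails.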

A possible fix would be to change RecordingFilteringIterator so that it accepts an Iterator instead of an Iterable. With this approach, the elements already retrieved from the iterator (during the attempt to load everything into memory) must also be accounted for, possibly by using iteratorChain or something similar to emit them ahead of the remaining elements.
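A minimal Java sketch of the proposed fix, under the same simplified model. openOnce, ChainedIterator and OnceIterable are hypothetical names, not Beam APIs, and the sketch covers only the "call iterator() once, then chain the buffered prefix in front of the rest" part, not the per-tag filtering that RecordingFilteringIterator performs inside CoGbkResult.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class SingleIteratorFix {

  /** Hypothetical stand-in for a once-iterable GBK value. */
  static final class OnceIterable<T> implements Iterable<T> {
    private final Iterable<T> delegate;
    private boolean used = false;

    OnceIterable(Iterable<T> delegate) {
      this.delegate = delegate;
    }

    @Override
    public Iterator<T> iterator() {
      if (used) {
        throw new IllegalStateException("GBK result is not re-iterable.");
      }
      used = true;
      return delegate.iterator();
    }
  }

  /** Yields every element of prefix, then every remaining element of rest. */
  static final class ChainedIterator<T> implements Iterator<T> {
    private final Iterator<T> prefix;
    private final Iterator<T> rest;

    ChainedIterator(Iterator<T> prefix, Iterator<T> rest) {
      this.prefix = prefix;
      this.rest = rest;
    }

    @Override
    public boolean hasNext() {
      return prefix.hasNext() || rest.hasNext();
    }

    @Override
    public T next() {
      // rest.next() throws NoSuchElementException once both parts are exhausted.
      return prefix.hasNext() ? prefix.next() : rest.next();
    }
  }

  /**
   * Calls values.iterator() exactly once. Buffers up to inMemoryLimit elements; if the
   * group is larger, re-emits the buffered prefix and then continues from the same iterator.
   */
  static <T> Iterator<T> openOnce(Iterable<T> values, int inMemoryLimit) {
    Iterator<T> source = values.iterator(); // the only iterator() call
    List<T> buffered = new ArrayList<>();
    while (source.hasNext() && buffered.size() < inMemoryLimit) {
      buffered.add(source.next());
    }
    if (!source.hasNext()) {
      return buffered.iterator(); // small group: fully in memory
    }
    return new ChainedIterator<>(buffered.iterator(), source);
  }

  public static void main(String[] args) {
    List<Integer> large = new ArrayList<>();
    for (int i = 0; i < 20_000; i++) {
      large.add(i);
    }
    Iterator<Integer> it = openOnce(new OnceIterable<>(large), 10_000);
    long count = 0;
    long sum = 0;
    while (it.hasNext()) {
      sum += it.next();
      count++;
    }
    System.out.println(count + " " + sum); // prints "20000 199990000"
  }
}
```

The returned iterator is still backed by the single underlying iterator for large groups, so it can be consumed only once; whatever wraps it must not call iterator() on the GBK value again. Guava's Iterators.concat(a, b) provides the same chaining behavior as the hand-written ChainedIterator.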

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Infrastructure
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner

yolgun commented Aug 1, 2024

We are encountering the same issue. I can confirm that some of our Beam pipelines on Flink work with Beam 2.55 but fail with Beam 2.56.

je-ik (Contributor) commented Aug 14, 2024

Does using the --reIterableGroupByKeyResult flag help?
