-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Revert three commits related to supporting custom coder in reshuffle #33414
Conversation
- Fix custom coder not being used in Reshuffle (global window) (apache#33339) - Fix custom coders not being used in Reshuffle (non global window) apache#33363 - Add missing to_type_hint to WindowedValueCoder apache#33403
LGTM. Thanks. |
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment |
Failed tests are unrelated to the changes. |
Just a thought, as this changes coders in some cases, should this be guarded by the update compatibility flag? https://github.com/apache/beam/blob/master/sdks/python/apache_beam/options/pipeline_options.py#L592 |
As the flag is defined in "StreamingOptions", is it previously designed for using in streaming case? |
Yes, it is designed for "streaming update" where you may have an in-progress aggregation in a shuffle when you do a pipeline update. Then you need the state to be compatible. |
And by "state" here this includes the in-flight encoded elements that were written by the pre-udpate version of the pipeline and will be read by the post-update code. Irrelevant for batch pipelines, but may become so if a runner supports some kind of a resume (from pause or failure) where the code might be updated. |
I see. Thank you both for the clarification! Regarding the possibly breaking changes that could be introduced by reverting this reverted PR, shall we add a new pipeline option rather than overloading this existing flag? Something like "use_legacy_reshuffle" can allow users to switch back to the previous reshuffle code path, where basically FastPrimitivesCoder are used inside regardless of coders/typehints specified by cx. |
I don't think we want to introduce a new flag. The point of the |
I am fine with using a flag like that to avoid adding more options, as I don't like too many options to remember too. However, I cannot deny that both the naming and where it is defined are a little bit confusing to me. We are somehow overloading "update" to both streaming and batch in this context. For batch, cx may only want to "create" a pipeline with existed code that works as before. There is no "update" on the pipeline from their perspective, only an update of Beam version. :) |
It is causing some internal test failure so we revert it for now.