[flink] #30402 restore upgradability #30403

Merged
merged 3 commits into from
Feb 26, 2024

je-ik
Contributor

@je-ik je-ik commented Feb 23, 2024

Closes #30402


@je-ik je-ik changed the title from "30402 flink upgrade" to "[flink] #30402 restore upgradability" Feb 23, 2024
Contributor Author

je-ik commented Feb 23, 2024

R: @JozoVilcek @robertwb

@@ -143,6 +143,9 @@
*/
public abstract class PTransform<InputT extends PInput, OutputT extends POutput>
implements Serializable /* See the note above */, HasDisplayData {

private static final long serialVersionUID = 3383862966597863311L;
Contributor Author

Unfortunately, Flink uses Java serialization (via SerializableCoder) to serialize checkpoints in the legacy Read transform (needed for KafkaIO), so without stabilizing this serialVersionUID we are unable to upgrade, even with SerializablePipelineOptions moved back.
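
For readers unfamiliar with the mechanism, here is a minimal sketch of what pinning a `serialVersionUID` does. The classes `StableTransform` and `UnpinnedTransform` are hypothetical stand-ins, not Beam code. Java serialization writes the class's UID into the stream and rejects the data on read if the local class has a different UID; when no UID is declared, the JVM derives one from the class structure, so an otherwise harmless change to the class can make old checkpoints unreadable.

```java
import java.io.ObjectStreamClass;
import java.io.Serializable;

public class SerialVersionUidSketch {

  // Hypothetical class for illustration only; not part of Beam.
  static class StableTransform implements Serializable {
    // Pinned UID: stays the same when methods or fields are added later.
    private static final long serialVersionUID = 1L;
    String name = "read";
  }

  // Hypothetical class without a pinned UID; the JVM computes one
  // from the class structure, so it can change between releases.
  static class UnpinnedTransform implements Serializable {
    String name = "read";
  }

  // Returns the UID that Java serialization records for the given class.
  static long uidOf(Class<?> cls) {
    return ObjectStreamClass.lookup(cls).getSerialVersionUID();
  }

  public static void main(String[] args) {
    System.out.println("pinned:   " + uidOf(StableTransform.class));
    System.out.println("computed: " + uidOf(UnpinnedTransform.class));
  }
}
```

The pinned class always reports the declared value, while the computed value depends on the exact shape of the class.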

Contributor

Ack. Hopefully we can clean this up in the future.

Contributor Author

The problem is that Flink needs to serialize the UnboundedSource (after split) into state to support the legacy Read. I'm not sure what the correct way to fix this is: we could use Kryo, but UnboundedSource is actually Serializable. How does Dataflow serialize the source? I suppose the same applies to ReadViaSDF as well.
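
As a rough illustration of the round trip described here, the sketch below writes a serializable object to bytes with plain Java serialization and reads it back. `DemoSource`, `toBytes`, and `fromBytes` are hypothetical names; this is not how the Flink runner or SerializableCoder is actually implemented, only the underlying JDK mechanism they are said to rely on.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class SourceRoundTripSketch {

  // Hypothetical stand-in for a split source; not a Beam class.
  static class DemoSource implements Serializable {
    private static final long serialVersionUID = 1L;
    final String topic;
    final int partition;

    DemoSource(String topic, int partition) {
      this.topic = topic;
      this.partition = partition;
    }
  }

  // Serializes the value with standard Java serialization.
  static byte[] toBytes(Serializable value) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(value);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  // Restores the value; fails if the class is missing or its UID changed.
  static Object fromBytes(byte[] data) {
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
      return in.readObject();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } catch (ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    byte[] checkpoint = toBytes(new DemoSource("events", 3));
    DemoSource restored = (DemoSource) fromBytes(checkpoint);
    System.out.println(restored.topic + "/" + restored.partition); // prints "events/3"
  }
}
```

Restoring succeeds only while the class on the reading side is compatible with the one that wrote the bytes, which is why a checkpoint taken on one release constrains the classes in the next.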

Contributor Author

je-ik commented Feb 23, 2024

I was able to upgrade from Beam 2.54.0 to 2.55.0-SNAPSHOT with this patch.

@je-ik je-ik force-pushed the 30402-flink-upgrade branch from 6ea2bfa to 7ba6d60 Compare February 23, 2024 13:04
Contributor

@robertwb robertwb left a comment

Rather than duplicating SerializablePipelineOptions, can we move it back (and change all the references)? (Seems like it better belongs in runners core anyway.)

Contributor Author

je-ik commented Feb 23, 2024

Makes sense. I removed the sdk.util version. PTAL.

@je-ik je-ik force-pushed the 30402-flink-upgrade branch from e7cebee to 8b2b9f7 Compare February 23, 2024 20:01
@je-ik je-ik force-pushed the 30402-flink-upgrade branch from 8b2b9f7 to 4982f6a Compare February 23, 2024 20:38
@@ -46,6 +46,8 @@ dependencies {
implementation library.java.joda_time
implementation library.java.vendored_grpc_1_60_1
implementation library.java.slf4j_api
implementation library.java.jackson_core
implementation library.java.jackson_databind
Contributor Author

@robertwb this is unfortunate. Do we have an API surface check that ensures this does not leak anywhere? In the old construction-java and in sdk-core this is shaded. I can shade it, but runners-core is currently "plain", so that could be an argument against placing it in runners-core. I'm not familiar enough with the details of the build system to make a decision myself.

Contributor

Ugh...

@kennknowles would be in a better position to answer this.

If we can't place it in runners-core itself, we could re-introduce runners-core-construction with this as the sole class.

Contributor Author

The shadow configuration apparently means the opposite of what I would expect. sdks-java-core already has Jackson as a runtime dependency, so this only stops checkstyle from complaining about an undeclared transitive dependency. Should be safe to merge.

@je-ik je-ik merged commit 907a451 into apache:master Feb 26, 2024
36 checks passed
@je-ik je-ik deleted the 30402-flink-upgrade branch February 26, 2024 08:05
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.sdk.util.construction;
+package org.apache.beam.runners.core.construction;
Member

What about just re-introducing this in the Flink runner? Yes, the package doesn't match, but at least it is isolated and not somewhere anyone else might use it.

Contributor Author

This was my initial line of thinking, but after the discussion on the ML it ended up as a generic class. It is actually true that the same problem might apply to other runners as well.

@@ -29,6 +29,7 @@
import java.util.HashMap;
import javax.annotation.Nullable;
import org.apache.beam.repackaged.core.org.apache.commons.lang3.ArrayUtils;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
Member

ugh

Contributor Author

This was not intended; probably a mistargeted replacement. Will fix that in a different PR.

Contributor Author

Ah, sorry, I forgot. The discussion with @robertwb concluded that we will remove SerializablePipelineOptions from sdks-java-core completely. That is why the packages changed in all affected runners. Actually, I think that is correct because of the potential serializability issue. SerializablePipelineOptions is intended for runners, so runners-core-java might actually be the correct place to put it.

Contributor

Yes.

SerializablePipelineOptions (per its name) is presumably intended for serialization, so it would be backwards incompatible to move it. Fortunately it's only referenced by runners, so probably makes sense as part of a runner support package. I agree further cleanup could be done here, but that would require resolving the shading/jackson issues too.
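
To make the backwards-incompatibility point concrete, the following sketch shows that a Java serialization stream records the fully qualified class name of the object it contains. `DemoOptions` and `streamContainsClassName` are hypothetical names used only for illustration. Because the name is embedded in the bytes, data written while a class lived in one package cannot be read after the class moves to another package, unless a class with the old name is still on the classpath.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

public class ClassNameInStreamSketch {

  // Hypothetical stand-in for a serializable options holder; not a Beam class.
  static class DemoOptions implements Serializable {
    private static final long serialVersionUID = 1L;
    String jobName = "demo";
  }

  // Serializes the value and checks whether its class name appears in the raw bytes.
  static boolean streamContainsClassName(Serializable value) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(value);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    // ISO-8859-1 maps every byte to one char, so ASCII names survive intact.
    String raw = new String(bytes.toByteArray(), StandardCharsets.ISO_8859_1);
    return raw.contains(value.getClass().getName());
  }

  public static void main(String[] args) {
    System.out.println(streamContainsClassName(new DemoOptions())); // prints "true"
  }
}
```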

@Abacn Abacn mentioned this pull request Apr 4, 2024
Successfully merging this pull request may close these issues.

[flink] restore SerializablePipelineOptions to enable Flink upgrade