[FLINK-36851][table] Introduce TemporalProcessTimeJoinOperator in TemporalJoin with Async State API #25777

Status: Open, wants to merge 2 commits into base: master


Au-Miner

What is the purpose of the change

Introduce TemporalProcessTimeJoinOperator in TemporalJoin with async state api.

Brief change log

  • Introduce TemporalProcessTimeJoinOperator in TemporalJoin with async state api.
  • Add TemporalProcessTimeJoinOperatorTest and TemporalJoinITCase

Verifying this change

Existing tests and newly added tests verify this change.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

@flinkbot
Collaborator

flinkbot commented Dec 10, 2024

CI report:

Bot commands

The @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

maxRetentionTime,
isLeftOuterJoin);
if (config.get(ExecutionConfigOptions.TABLE_EXEC_ASYNC_STATE_ENABLED)) {
return new AsyncStateTemporalProcessTimeJoinOperator(
Contributor

@davidradl davidradl Dec 10, 2024


The Jira mentions SQL and Table, and the component in the PR title is [table], but the fix and test appear to be DataStream only. Is the intent that this affects the Table API and SQL?

Could you please update the documentation, as this is a new change to the API?

Author

@Au-Miner Au-Miner Dec 11, 2024


Thanks for the review. This PR completes part of FLIP-473. Its intention is similar to #25320.

The original API has not been changed. The PR only switches the TemporalProcessTimeJoinOperator in the flink-table module to AsyncStateTemporalProcessTimeJoinOperator when table.exec.async-state.enabled is set to true.

Is there anything else that needs to be modified? If so, please let me know. Thank you.
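
For readers following along, the switch described above can be sketched in plain Java. This is only an illustration, not the code in this PR: the `JoinOperator` interface, the two stand-in classes, the `Map`-based config, the exact option key string, and the choice to treat a missing key as `false` are all assumptions made so the example is self-contained.

```java
import java.util.Map;

public class OperatorSelectionSketch {
    // Assumed option key; the real constant is ExecutionConfigOptions.TABLE_EXEC_ASYNC_STATE_ENABLED.
    public static final String ASYNC_STATE_ENABLED = "table.exec.async-state.enabled";

    /** Hypothetical stand-in for the operator type; the real classes live in the Flink table runtime. */
    public interface JoinOperator {
        String name();
    }

    /** Stand-in for the existing synchronous-state operator. */
    public static final class SyncOperator implements JoinOperator {
        public String name() {
            return "TemporalProcessTimeJoinOperator";
        }
    }

    /** Stand-in for the async-state variant introduced by this PR. */
    public static final class AsyncOperator implements JoinOperator {
        public String name() {
            return "AsyncStateTemporalProcessTimeJoinOperator";
        }
    }

    /** Picks the async variant only when the flag is explicitly "true" (missing key = false is an assumption). */
    public static JoinOperator createOperator(Map<String, String> config) {
        boolean asyncEnabled =
                Boolean.parseBoolean(config.getOrDefault(ASYNC_STATE_ENABLED, "false"));
        return asyncEnabled ? new AsyncOperator() : new SyncOperator();
    }

    public static void main(String[] args) {
        System.out.println(createOperator(Map.of()).name());
        System.out.println(createOperator(Map.of(ASYNC_STATE_ENABLED, "true")).name());
    }
}
```

The point of the sketch is that the query-facing API stays the same; only the operator chosen at translation time differs.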

Contributor

ok thanks for the clarification

minRetentionTime,
maxRetentionTime,
isLeftOuterJoin);
if (config.get(ExecutionConfigOptions.TABLE_EXEC_ASYNC_STATE_ENABLED)) {
Contributor


As a heads-up from the other PR: changes like this should update consumedOptions for the JSON serialization.

(Sorry for splitting the conversation across PRs.)

Author

@Au-Miner Au-Miner Dec 12, 2024


The settings related to this configuration have been updated in #25320.

I don't quite understand what you mean by 'update consumedOptions for the JSON serialization'. I would greatly appreciate some more pointers.

Contributor


The annotations here: https://github.com/apache/flink/pull/25777/files#diff-76717e3a4ed9896387a0961a4ff297ccd8a7dd4ef19b49605cf0755fb1d0d1dbR73-R78 provide information about how to serialize this node to JSON.

Check out other examples and you'll see a field named consumedOptions, which lists the options used by the node.

If I understand, you are adding a new option which is used by this class. As such, you need to advertise it in the annotation.
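
To make the idea concrete, here is a minimal, self-contained sketch of a node class advertising its options through an annotation field. The `NodeMetadata` annotation, the `TemporalJoinNodeSketch` class, the node name, and the option key string are hypothetical stand-ins for illustration; the real annotation in Flink carries additional fields and is not reproduced here.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Arrays;
import java.util.List;

public class ConsumedOptionsSketch {
    /** Hypothetical stand-in for the metadata annotation placed on an exec node class. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    public @interface NodeMetadata {
        String name();

        int version();

        // Keys of the config options this node reads; these get persisted with the plan.
        String[] consumedOptions() default {};
    }

    /** Hypothetical node that declares the async-state flag as a consumed option. */
    @NodeMetadata(
            name = "temporal-join-sketch",
            version = 1,
            consumedOptions = {"table.exec.async-state.enabled"})
    public static class TemporalJoinNodeSketch {}

    /** Reads the advertised options reflectively, as a serializer might. */
    public static List<String> consumedOptionsOf(Class<?> nodeClass) {
        NodeMetadata meta = nodeClass.getAnnotation(NodeMetadata.class);
        return meta == null ? List.of() : Arrays.asList(meta.consumedOptions());
    }

    public static void main(String[] args) {
        System.out.println(consumedOptionsOf(TemporalJoinNodeSketch.class));
    }
}
```

If a node reads an option without listing it, a reflective reader like `consumedOptionsOf` has no way to know the option matters, which is why the new flag needs to be declared.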

Author


Okay, thanks. I have added the corresponding explanation for consumedOptions.

* <p>1) if your timers collide with clean up timers and you delete them, then state clean-up will
* not be performed, and
*
* <p>2) (this one is the reason why this class does not allow to override the onProcessingTime())
Contributor


nit: the bracketed phrase would sit better at the end of 1)

@davidradl
Contributor

Reviewed by Chi on 12/12/24. This PR appears to be progressing healthily.
