Add return_immediately parameter to PubSubPullSensor class #41842

Contributor

@arnaubadia arnaubadia commented Aug 28, 2024

Closes: #41838

Add optional return_immediately parameter to PubSubPullSensor (default: True for backwards compatibility). This allows setting return_immediately=False to resolve issue #41838 and improve performance. The parameter was originally removed from the class but hard-coded to True in the PubSubHook.pull call in the poke function in PR #23231.

Note: SubscriberClient.pull (called by PubSubHook.pull) warns that setting return_immediately=True is discouraged due to performance impact.

return_immediately (bool):
    Optional. If this field set to true, the system will
    respond immediately even if it there are no messages
    available to return in the ``Pull`` response. Otherwise,
    the system may wait (for a bounded amount of time) until
    at least one message is available, rather than returning
    no messages. Warning: setting this field to ``true`` is
    discouraged because it adversely impacts the performance
    of ``Pull`` operations. We recommend that users do not
    set this field.
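To make the difference concrete, here is a minimal sketch of the two behaviours described in the docstring above. It is a toy in-memory model, not the real Pub/Sub client: `FakeSubscription`, `publish`, and the `max_wait` argument are hypothetical names introduced only for illustration, and the real service chooses its own bounded wait.

```python
import queue
import threading
from typing import List


class FakeSubscription:
    """Toy stand-in for a Pub/Sub subscription (hypothetical, not the real client)."""

    def __init__(self) -> None:
        self._messages = queue.Queue()

    def publish(self, data: str) -> None:
        self._messages.put(data)

    def pull(self, max_messages: int = 1, return_immediately: bool = True,
             max_wait: float = 1.0) -> List[str]:
        """Return up to max_messages; block (bounded by max_wait) only when
        return_immediately is False."""
        received: List[str] = []
        try:
            if return_immediately:
                # Respond at once, even if nothing is available.
                received.append(self._messages.get_nowait())
            else:
                # Wait for a bounded time until at least one message arrives.
                received.append(self._messages.get(timeout=max_wait))
        except queue.Empty:
            return received
        # Pick up any further messages that are already waiting.
        while len(received) < max_messages:
            try:
                received.append(self._messages.get_nowait())
            except queue.Empty:
                break
        return received


sub = FakeSubscription()
# Simulate a message that becomes available shortly after the pull starts.
threading.Timer(0.2, sub.publish, args=("hello",)).start()

immediate = sub.pull(return_immediately=True)                # nothing there yet
waited = sub.pull(return_immediately=False, max_wait=2.0)    # blocks until delivery
print(immediate, waited)
```

In this model the first call comes back empty because the message has not arrived yet, while the second call waits and picks it up. Whether the real service behaves exactly this way in the case from #41838 is an assumption; the sketch only mirrors the wording of the docstring.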


@boring-cyborg boring-cyborg bot added area:providers provider:google Google (including GCP) related issues labels Aug 28, 2024

boring-cyborg bot commented Aug 28, 2024

Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contributors' Guide (https://github.com/apache/airflow/blob/main/contributing-docs/README.rst).
Here are some useful points:

  • Pay attention to the quality of your code (ruff, mypy and type annotations). Our pre-commits will help you with that.
  • In case of a new feature, add useful documentation (in docstrings or in the docs/ directory). Adding a new operator? Check this short guide. Consider adding an example DAG that shows how users should use it.
  • Consider using Breeze environment for testing locally, it's a heavy docker but it ships with a working Airflow and a lot of integrations.
  • Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
  • Please follow ASF Code of Conduct for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
  • Be sure to read the Airflow Coding style.
  • Always keep your Pull Requests rebased, otherwise your build might fail due to changes not related to your commits.
    Apache Airflow is a community-driven project and together we are making it better 🚀.
    In case of doubts contact the developers at:
    Mailing List: [email protected]
    Slack: https://s.apache.org/airflow-slack

@arnaubadia arnaubadia changed the title Add return_immediately as argument to PubSubPullSensor Add return_immediately as parameter to PubSubPullSensor class Aug 28, 2024
@arnaubadia arnaubadia changed the title Add return_immediately as parameter to PubSubPullSensor class Add return_immediately parameter to PubSubPullSensor class Aug 28, 2024
@romsharon98 romsharon98 requested a review from eladkal August 29, 2024 05:07
@eladkal
Contributor

eladkal commented Aug 29, 2024

The parameter was originally removed from the class but hard-coded to True in the PubSubHook.pull call in the poke function in PR #23231.

My PR only removed the deprecated code. The parameter was deprecated 4 years ago in #7766.

The original PR claims

    :param return_immediately:
        (Deprecated) This is an underlying PubSub API implementation detail.
        It has no real effect on Sensor behaviour other than some internal wait time before retrying
        on empty queue.
        The Sensor task will (by definition) always wait for a message, regardless of this argument value.
        If you want a non-blocking task that does not to wait for messages, please use
        :class:`airflow.providers.google.cloud.operators.PubSubPullOperator`
        instead.

You didn't explain why it's not valid any more. Specifically about underlying PubSub API implementation detail and why PubSubPullOperator isn't covering the use case (operator instead of sensor)

@eladkal eladkal closed this Aug 29, 2024
@eladkal eladkal reopened this Aug 29, 2024
@arnaubadia
Contributor Author

arnaubadia commented Aug 29, 2024

@eladkal

You didn't explain why it's not valid any more. Specifically about underlying PubSub API implementation detail

The removed code in PR #23231 claims that return_immediately "has no effect on PubSubPullSensor behaviour" and "It should be left as default value of True."

I claim this is not true, for two reasons. The first is empirical evidence: the sensor sometimes didn't pull messages even though unacked messages were available. The second is the docstring of the pull function of SubscriberClient (from google.cloud.pubsub_v1 import SubscriberClient), which is called by PubSubHook.pull in the poke function of the sensor:

return_immediately (bool):
    Optional. If this field set to true, the system will
    respond immediately even if it there are no messages
    available to return in the ``Pull`` response. Otherwise,
    the system may wait (for a bounded amount of time) until
    at least one message is available, rather than returning
    no messages. Warning: setting this field to ``true`` is
    discouraged because it adversely impacts the performance
    of ``Pull`` operations. We recommend that users do not
    set this field.

why PubSubPullOperator isn't covering the use case (operator instead of sensor)

The PubSubPullOperator doesn't cover my use case because I don't want to continue with the DAG run if there are no messages available. I want to wait until there's a new message available and use the contents of the message in the DAG run.
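A rough sketch of the distinction I mean, using plain functions rather than the actual Airflow classes: `pull_once` and `wait_for_messages` are hypothetical helpers that only imitate an operator-style single pull and a sensor-style poke loop, and `fake_pull` stands in for a subscription that is empty at first.

```python
import time
from typing import Callable, List


def pull_once(pull: Callable[[], List[str]]) -> List[str]:
    """Operator-like: pull a single time and return whatever came back,
    even if that is an empty list."""
    return pull()


def wait_for_messages(pull: Callable[[], List[str]],
                      poke_interval: float = 0.01,
                      timeout: float = 1.0) -> List[str]:
    """Sensor-like: keep poking until at least one message is returned,
    or give up once the timeout has elapsed."""
    deadline = time.monotonic() + timeout
    while True:
        messages = pull()
        if messages:
            return messages
        if time.monotonic() >= deadline:
            raise TimeoutError("no messages arrived before the timeout")
        time.sleep(poke_interval)


# Hypothetical subscription: empty for the first two pulls, then one message.
responses = iter([[], [], ["order-42"]])


def fake_pull() -> List[str]:
    return next(responses, [])


operator_result = pull_once(fake_pull)        # downstream tasks would see no data
sensor_result = wait_for_messages(fake_pull)  # only continues once data exists
print(operator_result, sensor_result)
```

With the single pull, the rest of the DAG run would start with an empty result; with the poke loop, it starts only after a message is available, which is the behaviour I need.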

@eladkal
Contributor

eladkal commented Aug 29, 2024

Let's wait for a review from someone on the Google team. cc @VladaZakharova

@potiuk potiuk merged commit 1a255f5 into apache:main Sep 9, 2024
58 checks passed

boring-cyborg bot commented Sep 9, 2024

Awesome work, congrats on your first merged pull request! You are invited to check our Issue Tracker for additional contributions.

Successfully merging this pull request may close these issues.

Sometimes PubSubPullSensor doesn't pull messages even if the PubSub subscription has unacked messages