-
Notifications
You must be signed in to change notification settings - Fork 14.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix BigQueryValueCheckOperator deferrable mode optimisation #34018
Fix BigQueryValueCheckOperator deferrable mode optimisation #34018
Conversation
PR apache#31872 tried to optimise the deferrable mode in BigQueryValueCheckOperator. However for deciding on whether to defer it just checked the job status but did not actually verified the passed value to check for and returned a success prematurely. This PR adds on the missing logic with the optimisation to check and compare the pass value and tolerations. closes: apache#34010
231c9e0
to
f340b32
Compare
# job.result() returns a RowIterator. Mypy expects an instance of SupportsNext[Any] for | ||
# the next() call which the RowIterator does not resemble to. Hence, ignore the arg-type error. | ||
records = next(job.result()) # type: ignore[arg-type] | ||
self.check_value(records) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we're checking the values here, can we remove the check_value
call in the BigQueryValueCheckTrigger
?
hook.value_check(self.sql, self.pass_value, records, self.tolerance) |
I'm not really sure why the check_value there wasn't picking up the failed status 🤔 I'd assume that if the check failed it would raise an exception which would then be caught and returned as a failed trigger status:
airflow/airflow/providers/google/cloud/triggers/bigquery.py
Lines 451 to 454 in 81b85eb
except Exception as e: | |
self.log.exception("Exception occurred while checking for query completion") | |
yield TriggerEvent({"status": "error", "message": str(e)}) | |
return |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I gave the DAG a local run and saw that the task was not getting deferred at all, which means it does not defer after setting deferrrable=True.
This happens because of
if job.running(): |
The running state of the job evaluates to False and it does not defer. Had it deferred, the trigger would check it well and pass on the execution to execute_complete in the operator.
But in scenarios where it does not defer, I have added this check later to actually achive what the operator is meant to do :)
Sorry I missed explaining this earlier, hope it makes sense now.
And yes, we cannot remove the check from the Trigger code as when it defers it goes to Triggererer where it checks and then returns to execute_complete
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh interesting, nice catch!
With PR apache#34018, the google provider depends on the common-sql provider changes that are getting released in the 1.7.2 version of the common-sql provider. Hence, bump the minimum common-sql provider version to 1.7.2 in the Google provider dependencies.
With PR #34018, the google provider depends on the common-sql provider changes that are getting released in the 1.7.2 version of the common-sql provider. Hence, bump the minimum common-sql provider version to 1.7.2 in the Google provider dependencies.
@pankajkoti I just tried out the new provider with the example code I provided on the original issue. However, the result doesn't seem to be quite right as the error is reporting an attribute error on
|
Hi @nathadfield you would also need to upgrade the apache-airflow-providers-common-sql to the latest release 1.7.2 Because the check_value function was added in the same PR and has been added in the latest release of common-sql provider |
@pankajkoti Is that not a requirement of Google provider 10.8? |
@nathadfield I am not exactly sure if that should be a requirement or not. I tried to add it as a requirement while testing the RC in PR #34257 Could you please check the discussion on the PR and see if it sounds alright? cc: @eladkal |
Instinctively it feels like it should be a requirement otherwise how would anyone know that they also need to install the common-sql provider? In fact, for us, we don't even specify the common-sql provider and just accept what get's delivered as part of the Astro runtime image. |
Maybe I should add |
You have three options:
This is when the dependency is "crucial" for the package to work.
https://github.com/apache/airflow/blob/main/airflow/providers/amazon/provider.yaml#L702 In this case amazon providers has The last one is pretty "soft" - i.e. it only works when you use the extra directly - but it is a way to store and present the dependency on min-version and even automate it when you use extra. And this is the Maybe some day it will be added, but currently there is no way to force min version for optional dependency. |
BTW. If someone would like to take what I just described above and find the right place in our contributing documentation to add it and describe in better words - absolutely do not hesitate @pankajkoti @nathadfield -- it seems that we need some kind of description about it, as examples we have in current providers are not enough, it's just a matter of finding the right place and wording it in the way that it will be easy for others to follow it :). Feel free :) |
Actually, it looks like the dependencies in https://github.com/apache/airflow/blob/providers-google/10.8.0/airflow/providers/google/provider.yaml#L78 |
Thank you @potiuk Last week and this week has been real tight due to a team offsite, personal and office work, I have added it to my backlog and soon will try to enhance the docs around this. |
PR #31872 tried to optimise the deferrable mode in
BigQueryValueCheckOperator. However for deciding on
whether to defer it just checked the job status but did not
actually verified the passed value to check for and
returned a success prematurely. This PR adds on the missing
logic with the optimisation to check and compare the pass
value and tolerations.
closes: #34010
^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named
{pr_number}.significant.rst
or{issue_number}.significant.rst
, in newsfragments.