Add deferrable param in EmrContainerSensor #30945
Conversation
Looks good! Just a few minor comments.
async with self.hook.async_conn as client:
    waiter = self.hook.get_waiter("container_job_complete", deferrable=True, client=client)
    attempt = 0
    while attempt < self.max_attempts:
Though I have seen it elsewhere, to me, putting "max_attempts" as a trigger parameter doesn't make sense.
The "right" way to set a time limit on a trigger is the deferral timeout. IMO, if an existing operator has a max_attempts param, then we should just calculate the deferral timeout from that number and use it.
This avoids the added complexity in the trigger and the extra signature param, and it avoids the odd behaviour that, if max_attempts is used and a triggerer dies, the trigger picked up again on another machine will start counting from zero and then get killed by the timeout anyway.
What do you think about this @pankajastro?
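For illustration, a minimal sketch of what that suggestion could look like on the sensor side. This is not code from this PR: the trigger import path, its keyword arguments, and the max_retries/poll_interval attributes are assumptions used only to show the idea.

from datetime import timedelta

# Assumed import path for the trigger added by this PR.
from airflow.providers.amazon.aws.triggers.emr import EmrContainerTrigger

def execute(self, context):
    # Hypothetical sensor-side deferral: rather than passing max_attempts into
    # the trigger, derive a deferral timeout from the operator's own polling
    # parameters. If the triggerer dies and the trigger is picked up elsewhere,
    # this timeout still caps the total waiting time, even though any in-trigger
    # attempt counter would restart from zero.
    self.defer(
        trigger=EmrContainerTrigger(
            virtual_cluster_id=self.virtual_cluster_id,
            job_id=self.job_id,
            aws_conn_id=self.aws_conn_id,
            poll_interval=self.poll_interval,
        ),
        method_name="execute_complete",
        timeout=timedelta(seconds=self.max_retries * self.poll_interval),
    )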
This is a good point, thank you for raising it.
Honestly, I didn't think about the possibility that a trigger can restart while writing this. I'll fix it.
This was a point that was brought up in a previous PR (by @dstandish :D), and the solution we went with was essentially to use both: set a timeout at the operator level computed from the given parameters (plus a 60-second buffer), but also keep the number of attempts as a metric. I think it is beneficial to use the number of attempts as a metric in the triggers, but it is definitely a good idea to have the timeout set at the operator level in case of a trigger restart, as mentioned above.
Added the timeout while deferring, to handle the case where "a triggerer dies and the trigger is picked up again on another machine".
Removed max_attempts from the trigger to reduce code maintenance.
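A rough sketch, under assumptions, of what the trigger's run loop might look like once the attempt counter is dropped. This is not quoted from the merged code: the waiter argument names mirror boto3's emr-containers describe_job_run, and the error-message check relies on botocore's standard "terminal failure" wording for waiter failure states.

import asyncio

from botocore.exceptions import WaiterError

from airflow.triggers.base import TriggerEvent

async def run(self):
    async with self.hook.async_conn as client:
        waiter = self.hook.get_waiter("container_job_complete", deferrable=True, client=client)
        # No max_attempts here: the operator-level deferral timeout is what
        # ultimately bounds how long this loop is allowed to run.
        while True:
            try:
                await waiter.wait(
                    id=self.job_id,
                    virtualClusterId=self.virtual_cluster_id,
                    WaiterConfig={"Delay": int(self.poll_interval), "MaxAttempts": 1},
                )
                break  # the waiter returned: the job reached a success state
            except WaiterError as error:
                if "terminal failure" in str(error):
                    raise  # the job failed; surface the error to the task
                await asyncio.sleep(self.poll_interval)
    yield TriggerEvent({"status": "success", "job_id": self.job_id})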
Add the deferrable param in EmrContainerSensor.
This allows running EmrContainerSensor in an asynchronous way: the worker only submits the job and then defers to the trigger, which polls for the job status, so the worker slot is not occupied for the whole duration of task execution.
^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards-incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in newsfragments.
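As a usage illustration of the new parameter, a minimal DAG sketch: the virtual cluster and job run IDs below are placeholders, and the other arguments follow the Amazon provider's EmrContainerSensor signature.

from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.sensors.emr import EmrContainerSensor

with DAG(
    dag_id="emr_container_deferrable_example",
    start_date=datetime(2023, 1, 1),
    schedule=None,
    catchup=False,
):
    wait_for_container_job = EmrContainerSensor(
        task_id="wait_for_container_job",
        virtual_cluster_id="vc-1234567890abcdef0",  # placeholder EMR on EKS virtual cluster id
        job_id="jr-1234567890abcdef0",              # placeholder job run id
        aws_conn_id="aws_default",
        poll_interval=30,
        deferrable=True,  # poll from the triggerer instead of occupying a worker slot
    )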