Deferrable mode for ECS operators #31881
Conversation
@@ -67,6 +71,15 @@ def execute(self, context: Context):
        """Must overwrite in child classes."""
        raise NotImplementedError("Please implement execute() in subclass")

    def _complete_exec_with_cluster_desc(self, context, event=None):
this callback is shared between the create and delete cluster operators, so I put it here. It felt like a better solution than copy-pasting it into both.
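For illustration, a minimal sketch of what such a shared callback could look like on the common base class. The `hook` attribute, method body, and class shape are assumptions, not the PR's exact code; only the event shape (`{"status": "success", "arn": ...}`) is taken from the trigger shown later in this thread:

```python
from airflow.exceptions import AirflowException


class EcsBaseOperator:  # hypothetical common base of the create/delete cluster operators
    def _complete_exec_with_cluster_desc(self, context, event=None):
        """Shared execute_complete callback: validate the trigger event,
        then return the final cluster description, as both operators need."""
        if event is None or event.get("status") != "success":
            raise AirflowException(f"Error while waiting on cluster: {event}")
        # self.hook is assumed to expose a boto3 ECS client as .conn
        response = self.hook.conn.describe_clusters(clusters=[event["arn"]])
        return response["clusters"][0]
```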
        ...  # TODO return last log line but task_log_fetcher will always be None here

    @provide_session
    def _after_execution(self, session=None):
I wanted to extract this to reuse it in execute and execute_complete, but I couldn't find a great name for it.
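A hedged sketch of the extraction being described, assuming the surrounding helpers exist roughly under these names (the stubs are illustrative, not the PR's code):

```python
from airflow.utils.session import provide_session


class EcsRunTaskOperator:  # abbreviated, hypothetical shape
    def _start_wait_task(self, context): ...  # starts the ECS task, waits if configured
    def _check_success_task(self): ...        # raises if the task ended in failure

    @provide_session
    def _after_execution(self, session=None):
        # Everything that must run after the task has finished, whether we
        # waited synchronously (execute) or were resumed by the triggerer
        # (execute_complete): the final status check lives here now.
        self._check_success_task()

    def execute(self, context):
        self._start_wait_task(context)
        self._after_execution()

    def execute_complete(self, context, event=None):
        self._after_execution()
```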
-    def _start_wait_check_task(self, context):
+    def _start_wait_task(self, context):
the check went to _after_execution
        yield TriggerEvent({"status": "success", "task_arn": self.task_arn})
    async def _forward_logs(self, logs_client, next_token: str | None = None) -> str | None:
this code is heavily inspired by https://github.com/apache/airflow/blob/main/airflow/providers/amazon/aws/hooks/logs.py#L53, but since I need to make an async call in the middle, refactoring the existing code to allow that seemed like it would add a lot of complexity.
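For reference, a sketch of the token-driven loop such a coroutine could use, assuming `logs_client` is an aiobotocore-style async CloudWatch Logs client and that `self.log_group`, `self.log_stream`, and `self.log` exist. This mirrors the hook code linked above rather than reproducing the PR verbatim:

```python
async def _forward_logs(self, logs_client, next_token: str | None = None) -> str | None:
    """Print any new log events since next_token and return the new token."""
    while True:
        kwargs = {
            "logGroupName": self.log_group,
            "logStreamName": self.log_stream,
            "startFromHead": True,
        }
        if next_token is not None:
            kwargs["nextToken"] = next_token
        response = await logs_client.get_log_events(**kwargs)
        for event in response["events"]:
            self.log.info(event["message"])
        # CloudWatch returns the same forward token once the stream is drained,
        # so that's the signal to stop and hand the token back for next time.
        if response["nextForwardToken"] == next_token:
            return next_token
        next_token = response["nextForwardToken"]
```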
should this be part of the hook implementation?
You mean the logs hook? I don't know; I wasn't sure it was a good idea to duplicate the code in the hook, and I found it easier to write something fitting exactly my need here.
        # In some circumstances the ECS Cluster is deleted immediately,
        # so there is no reason to wait for completion.
        # if the cluster doesn't have capacity providers that are associated with it,
        # the deletion is instantaneous, and we don't need to wait for it.
cluster_details has the capacityProviders associated with the nodegroup. Would that be a better way to decide whether we want to wait for completion or not?
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ecs/client/delete_cluster.html
hmm, we could do that, but the status check above already takes care of that. We could write a different check, but the result would be the same.
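A sketch of the status short-circuit being discussed, assuming boto3's ECS client and illustrative attribute names:

```python
def execute(self, context):
    cluster = self.client.delete_cluster(cluster=self.cluster_name)["cluster"]
    if cluster["status"] == "INACTIVE":
        # No capacity providers were attached, so the deletion completed
        # immediately and there is nothing to defer or wait on.
        return cluster
    # ...otherwise defer to the trigger / start the waiter as usual.
```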
        )
        # we reach this point only if the waiter met a success criterion
        yield TriggerEvent({"status": "success", "arn": self.cluster_arn})
        return
I feel that, in this case, by using a generic Trigger for both the cluster_active and cluster_inactive, we are simplifying the code, but at the expense of user experience. Specifically, I think the exceptions raised should tell the user what the specific error was (i.e. create cluster failed etc.)
I don't know if this is a moot point because of the EMR Serverless custom waiters PR, which will clean up a lot of this code.
I'd vote for not modifying that code now since we can replace it with the common helper as soon as it's merged
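To make the trade-off concrete, one way a generic trigger could still surface operation-specific errors is to accept a failure message from each operator. Everything below is a hypothetical sketch, not the PR's code; the polling body is elided and the classpath in `serialize` is made up:

```python
from airflow.triggers.base import BaseTrigger, TriggerEvent


class ClusterWaiterTrigger(BaseTrigger):
    def __init__(self, waiter_name: str, cluster_arn: str, failure_message: str):
        super().__init__()
        self.waiter_name = waiter_name          # "cluster_active" or "cluster_inactive"
        self.cluster_arn = cluster_arn
        self.failure_message = failure_message  # e.g. "Create cluster failed"

    def serialize(self):
        return (
            "example.triggers.ClusterWaiterTrigger",  # hypothetical import path
            {
                "waiter_name": self.waiter_name,
                "cluster_arn": self.cluster_arn,
                "failure_message": self.failure_message,
            },
        )

    async def run(self):
        try:
            ...  # poll the named waiter here
        except Exception as e:
            # The event now says *which* operation failed, not just
            # that some waiter errored out.
            yield TriggerEvent({"status": "failure", "message": f"{self.failure_message}: {e}"})
            return
        yield TriggerEvent({"status": "success", "arn": self.cluster_arn})
```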
I believe this will close #31636
LGTM
This method is just causing trouble by handling several things; it's hiding the logic. A bug fixed in #31838 was reintroduced in #31881, because the check that used to be skipped when `wait_for_completion` was off was no longer skipped. The bug is that checking the status will always fail when not waiting for completion, because obviously the task is not ready just after creation.
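In code terms, the guard the follow-up fix restores looks roughly like this (method names are illustrative, not the exact ones in the operator):

```python
def execute(self, context):
    self._start_task(context)
    if not self.wait_for_completion:
        # The task was only just created, so any status check would fail;
        # skip both the wait and the check entirely.
        return
    self._wait_for_task_ended()
    self._check_success_task()
```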
Add deferrable mode to ECS operators that can make use of it:
The trickiest one is the Run Task operator, because it used a thread to fetch logs while waiting. I implemented this in the triggerer by staying in the same thread but pulling logs between waiter checks, as sketched below.
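A sketch of that single-threaded alternation, assuming an async botocore waiter and the `_forward_logs` coroutine sketched earlier; `waiter` and `logs_client` are assumed to have been created above from async clients, and error handling is simplified:

```python
import asyncio

from botocore.exceptions import WaiterError
from airflow.triggers.base import TriggerEvent


async def run(self):  # the trigger's async generator, abbreviated
    next_token = None
    while True:
        try:
            # A single waiter attempt per loop iteration, instead of
            # letting the waiter block through all of its retries.
            await waiter.wait(
                cluster=self.cluster,
                tasks=[self.task_arn],
                WaiterConfig={"MaxAttempts": 1},
            )
            break  # the waiter met its success criterion
        except WaiterError as error:
            if "terminal failure" in str(error):
                raise
        # Between checks we stay in the same coroutine and drain new logs.
        next_token = await self._forward_logs(logs_client, next_token)
        await asyncio.sleep(self.waiter_delay)
    yield TriggerEvent({"status": "success", "task_arn": self.task_arn})
```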
closes #31636
cc @syedahsn