Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add deferrable mode to AWS glue operators (Job & Crawl) #30948

Merged
merged 20 commits into from
May 31, 2023

Conversation

vandonr-amz
Copy link
Contributor

Note: The Glue Job operator has a "verbose" feature that regularly polls logs, so we cannot really convert it to a waiter.

cc @syedahsn

@boring-cyborg boring-cyborg bot added area:providers provider:amazon AWS/Amazon - related issues labels Apr 28, 2023
airflow/providers/amazon/aws/triggers/glue.py Show resolved Hide resolved
self.verbose = verbose
self.aws_conn_id = aws_conn_id

def serialize(self) -> tuple[str, dict[str, Any]]:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add a test for this method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I posted a comment about it here #30928 (comment)
I'd like to do a serialize/deserialize test, but I don't know how to deserialize from the tuple returned here 🤔

Simply testing that variables have the value I set them to is a bit pointless imho. It's not testing the behavior.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, I see. But that test at least prevents accidentally updating the args like CI will catch if job_name get removed from dict??

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what I'm thinking is that no one is going to remove job_name entirely "by mistake". They'd do it because they think it's useless for instance. And if the test is not "showing" the behavior expected (but just repeating what's written in the production code), it'd be very natural for that same person to remove it in the test as well.
My opinion is that such tests don't "catch" anything.

A good test here would be serialize, deserialize, and then assert that all fields of the object are the same, with an exclusion list.
If you remove a field you'd understand why it was there. If you add a field but forget the serialize method it'd catch it.

@vandonr-amz
Copy link
Contributor Author

converting to draft until I add some logging providing info on the state being seen by the waiter

@vandonr-amz vandonr-amz marked this pull request as ready for review May 19, 2023 00:02
@vandonr-amz
Copy link
Contributor Author

I tested this by running the system test / example dag in a local airflow instance, modifying it to use deferrable mode

vandonr-amz and others added 2 commits May 30, 2023 13:54
Co-authored-by: Niko Oliveira <[email protected]>
Co-authored-by: Niko Oliveira <[email protected]>

async def run(self) -> AsyncIterator[TriggerEvent]:
hook = GlueJobHook(aws_conn_id=self.aws_conn_id)
await hook.async_job_completion(self.job_name, self.run_id, self.verbose)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is more of a comment than any actionable item, but I wonder if it is possible to include job completion logic here rather than everything in the hook? Is there a benefit to keeping the completion logic in the hook? This ends up being a very thin trigger otherwise. But I understand that that is sometimes necessary.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just wanted to have the async method next to the non-async, in the hook. If we move the logic here, it'd make sense to move the logic of job_completion in the operator as well.
Also, I'm fine with thin triggers. It's one more place to read the code to understand what's happening (after the operator and the hook), so I'd be happy keeping it as thin as possible.

self.crawler_name = crawler_name
self.poll_interval = poll_interval
self.aws_conn_id = aws_conn_id

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This applies to both Triggers, but is there something that is tracking the number of attempts the Trigger makes? The timeout is not set, which means that there is no timeout. What is the exit criteria if the service is indefinitely down?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there was no limit to the number of tries if wait_for_completion before, so I kept the behavior 🤷

Copy link
Contributor

@syedahsn syedahsn left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good. Just a few comments to address.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area:providers provider:amazon AWS/Amazon - related issues
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants