Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updated logic to allow AWS Batch Hook get_job_description retries to be more effective #37552

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions airflow/providers/amazon/aws/hooks/batch_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,13 @@ def get_job_description(self, job_id: str) -> dict:

try:
response = self.get_conn().describe_jobs(jobs=[job_id])
return self.parse_job_description(job_id, response)
job_description = self.parse_job_description(job_id, response)
# allow us to retry getting the job description in case
# we called it before AWS could register the job
if job_description:
return job_description
else:
continue
Comment on lines +403 to +404
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like this `continue` is redundant.

except botocore.exceptions.ClientError as err:
# Allow it to retry in case of exceeded quota limit of requests to AWS API
if err.response.get("Error", {}).get("Code") != "TooManyRequestsException":
Expand All @@ -413,22 +419,22 @@ def get_job_description(self, job_id: str) -> dict:
)

@staticmethod
def parse_job_description(job_id: str, response: dict) -> dict:
def parse_job_description(job_id: str, response: dict) -> dict | None:
"""
Parse job description to extract description for job_id.

:param job_id: a Batch job ID

:param response: an API response for describe jobs

:return: an API response to describe job_id
:return: an API response to describe job_id or None if job_id not found in response

:raises: AirflowException
:raises:
"""
jobs = response.get("jobs", [])
matching_jobs = [job for job in jobs if job.get("jobId") == job_id]
if len(matching_jobs) != 1:
raise AirflowException(f"AWS Batch job ({job_id}) description error: response: {response}")
return None

return matching_jobs[0]

Expand Down
Loading