From b35722e68385c131d941fea836b6e388976911bf Mon Sep 17 00:00:00 2001 From: liferoad Date: Mon, 20 May 2024 15:53:34 -0400 Subject: [PATCH] Retry when the BQ job error can be retried --- sdks/python/apache_beam/io/gcp/bigquery_tools.py | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py index fab5a8611591..67fd5d47fb52 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py @@ -651,9 +651,19 @@ def wait_for_bq_job(self, job_reference, sleep_duration_sec=5, max_retries=0): job_reference.projectId, job_reference.jobId, job_reference.location) logging.info('Job %s status: %s', job.id, job.status.state) if job.status.state == 'DONE' and job.status.errorResult: - raise RuntimeError( - 'BigQuery job {} failed. Error Result: {}'.format( - job_reference.jobId, job.status.errorResult)) + if 'Retrying may solve the problem.' in job.status.errorResult.message: + logging.info( + 'BigQuery job %s failed but can be retried. ' + 'Error Result: %s', + job_reference.jobId, + job.status.errorResult) + time.sleep(sleep_duration_sec) + if max_retries != 0 and retry >= max_retries: + raise RuntimeError('The maximum number of retries has been reached') + else: + raise RuntimeError( + 'BigQuery job {} failed. Error Result: {}'.format( + job_reference.jobId, job.status.errorResult)) elif job.status.state == 'DONE': return True else: