diff --git a/airflow/providers/google/cloud/hooks/bigquery.py b/airflow/providers/google/cloud/hooks/bigquery.py
index ae6770b8b02f3..b3ac366d2cf02 100644
--- a/airflow/providers/google/cloud/hooks/bigquery.py
+++ b/airflow/providers/google/cloud/hooks/bigquery.py
@@ -20,10 +20,13 @@
 This module contains a BigQuery Hook, as well as a very basic PEP 249
 implementation for BigQuery.
 """
+import hashlib
+import json
 import logging
 import time
 import warnings
 from copy import deepcopy
+from datetime import timedelta, datetime
 from typing import Any, Dict, Iterable, List, Mapping, NoReturn, Optional, Sequence, Tuple, Type, Union

 from google.api_core.retry import Retry
@@ -1443,6 +1446,15 @@ def get_job(
         job = client.get_job(job_id=job_id, project=project_id, location=location)
         return job

+    @staticmethod
+    def _custom_job_id(configuration: Dict[str, Any]) -> str:
+        hash_base = json.dumps(configuration, sort_keys=True)
+        uniqueness_suffix = hashlib.md5(hash_base.encode()).hexdigest()
+        microseconds_from_epoch = int(
+            (datetime.now() - datetime.fromtimestamp(0)) / timedelta(microseconds=1)
+        )
+        return f"airflow_{microseconds_from_epoch}_{uniqueness_suffix}"
+
     @GoogleBaseHook.fallback_to_default_project_id
     def insert_job(
         self,
@@ -1472,7 +1484,7 @@ def insert_job(
         :type location: str
         """
         location = location or self.location
-        job_id = job_id or f"airflow_{int(time.time())}"
+        job_id = job_id or self._custom_job_id(configuration)
         client = self.get_client(project_id=project_id, location=location)

         job_data = {
diff --git a/airflow/providers/google/cloud/operators/bigquery.py b/airflow/providers/google/cloud/operators/bigquery.py
index f4590023d68aa..93e7db043b18a 100644
--- a/airflow/providers/google/cloud/operators/bigquery.py
+++ b/airflow/providers/google/cloud/operators/bigquery.py
@@ -2068,8 +2068,9 @@ def _job_id(self, context):
         if self.job_id:
             return f"{self.job_id}_{uniqueness_suffix}"

-        exec_date = re.sub(r"\:|-|\+", "_", context['execution_date'].isoformat())
-        return f"airflow_{self.dag_id}_{self.task_id}_{exec_date}_{uniqueness_suffix}"
+        exec_date = context['execution_date'].isoformat()
+        job_id = f"airflow_{self.dag_id}_{self.task_id}_{exec_date}_{uniqueness_suffix}"
+        return re.sub(r"\:|-|\+|\.", "_", job_id)

     def execute(self, context: Any):
         hook = BigQueryHook(
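
Reviewer note: a minimal standalone sketch of the new job-id scheme, for trying it outside Airflow. The function below mirrors _custom_job_id from the hook patch; the sample configuration and the printed value are illustrative only, not taken from the patch.

    import hashlib
    import json
    from datetime import datetime, timedelta

    def custom_job_id(configuration: dict) -> str:
        # Deterministic part: identical configurations hash to the same
        # suffix, so the id is reproducible for a given query config.
        hash_base = json.dumps(configuration, sort_keys=True)
        uniqueness_suffix = hashlib.md5(hash_base.encode()).hexdigest()
        # Time part: microsecond resolution instead of the old
        # int(time.time()), so two jobs submitted within the same
        # second no longer collide on the default job_id.
        microseconds_from_epoch = int(
            (datetime.now() - datetime.fromtimestamp(0)) / timedelta(microseconds=1)
        )
        return f"airflow_{microseconds_from_epoch}_{uniqueness_suffix}"

    config = {"query": {"query": "SELECT 1", "useLegacySql": False}}
    print(custom_job_id(config))
    # -> e.g. "airflow_<microseconds>_<md5-hex>"; the timestamp varies
    #    per run, the hash is stable for a given configuration.

The operator change can be checked the same way. execution_date.isoformat() carries ":", "-", "+" and, once microseconds are present, "."; BigQuery job ids only allow letters, digits, underscores, and dashes, so the substitution now runs over the whole id (dag_id and task_id may contain dots too) and covers the dot. Sample values below are made up:

    import re

    exec_date = "2021-06-10T12:34:56.789012+00:00"  # sample isoformat()
    job_id = f"airflow_my_dag_my_task_{exec_date}_0123abcd"
    print(re.sub(r"\:|-|\+|\.", "_", job_id))
    # airflow_my_dag_my_task_2021_06_10T12_34_56_789012_00_00_0123abcd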