diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 1abe45d8b31d0..69129f344e27a 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -318,6 +318,14 @@ repos: pass_filenames: true entry: ./scripts/ci/pre_commit/pre_commit_check_xcom_get_value.py additional_dependencies: ['rich'] + # This check might be removed when min-airflow-version in providers is 2.2 + - id: check-ti-run-id-in-providers + name: Check that run_id is not accessed in providers + entry: ./scripts/ci/pre_commit/pre_commit_check_no_ti_run_id.py + language: python + pass_filenames: true + files: ^airflow/providers/.*\.py$ + additional_dependencies: ['rich'] - id: update-breeze-file name: Update output of breeze command in BREEZE.rst entry: ./scripts/ci/pre_commit/pre_commit_breeze_cmd_line.sh diff --git a/BREEZE.rst b/BREEZE.rst index 8ddba88c6eba7..87f5a68e56c5e 100644 --- a/BREEZE.rst +++ b/BREEZE.rst @@ -2292,15 +2292,15 @@ This is the current syntax for `./breeze <./breeze>`_: build-providers-dependencies chart-schema-lint capitalized-breeze changelog-duplicates check-apache-license check-builtin-literals check-executables-have-shebangs check-extras-order check-hooks-apply - check-integrations check-merge-conflict check-xml daysago-import-check - debug-statements detect-private-key docstring-params doctoc dont-use-safe-filter - end-of-file-fixer fix-encoding-pragma flake8 flynt forbidden-xcom-get-value - codespell forbid-tabs helm-lint identity incorrect-use-of-LoggingMixin - insert-license isort json-schema language-matters lint-dockerfile lint-openapi - markdownlint mermaid migration-reference mixed-line-ending mypy mypy-helm - no-providers-in-core-examples no-relative-imports persist-credentials-disabled - pre-commit-descriptions pre-commit-hook-names pretty-format-json - provide-create-sessions providers-changelogs providers-init-file + check-integrations check-merge-conflict check-ti-run-id-in-providers check-xml + daysago-import-check debug-statements 
detect-private-key docstring-params doctoc + dont-use-safe-filter end-of-file-fixer fix-encoding-pragma flake8 flynt + forbidden-xcom-get-value codespell forbid-tabs helm-lint identity + incorrect-use-of-LoggingMixin insert-license isort json-schema language-matters + lint-dockerfile lint-openapi markdownlint mermaid migration-reference + mixed-line-ending mypy mypy-helm no-providers-in-core-examples no-relative-imports + persist-credentials-disabled pre-commit-descriptions pre-commit-hook-names + pretty-format-json provide-create-sessions providers-changelogs providers-init-file providers-subpackages-init-file provider-yamls pydevd pydocstyle python-no-log-warn pyupgrade restrict-start_date rst-backticks setup-order setup-extra-packages shellcheck sort-in-the-wild sort-spelling-wordlist stylelint trailing-whitespace diff --git a/STATIC_CODE_CHECKS.rst b/STATIC_CODE_CHECKS.rst index 6121d694b62a9..dc13c3cc6dc57 100644 --- a/STATIC_CODE_CHECKS.rst +++ b/STATIC_CODE_CHECKS.rst @@ -164,6 +164,8 @@ require Breeze Docker images to be installed locally. 
------------------------------------ ---------------------------------------------------------------- ------------ ``check-merge-conflicts`` Checks that merge conflicts are not being committed ------------------------------------ ---------------------------------------------------------------- ------------ +``check-ti-run-id-in-providers`` Check that run_id is not accessed in providers +------------------------------------ ---------------------------------------------------------------- ------------ ``check-xml`` Checks XML files with xmllint ------------------------------------ ---------------------------------------------------------------- ------------ ``daysago-import-check`` Checks if daysago is properly imported diff --git a/airflow/models/xcom.py b/airflow/models/xcom.py index dd6c045330ce0..f13ccef97b786 100644 --- a/airflow/models/xcom.py +++ b/airflow/models/xcom.py @@ -165,7 +165,10 @@ def set( from airflow.models.dagrun import DagRun if not exactly_one(execution_date is not None, run_id is not None): - raise ValueError("Exactly one of run_id or execution_date must be passed") + raise ValueError( + f"Exactly one of run_id or execution_date must be passed. " + f"Passed execution_date={execution_date}, run_id={run_id}" + ) if run_id is None: message = "Passing 'execution_date' to 'XCom.set()' is deprecated. Use 'run_id' instead." @@ -426,7 +429,10 @@ def get_many( from airflow.models.dagrun import DagRun if not exactly_one(execution_date is not None, run_id is not None): - raise ValueError("Exactly one of run_id or execution_date must be passed") + raise ValueError( + f"Exactly one of run_id or execution_date must be passed. " + f"Passed execution_date={execution_date}, run_id={run_id}" + ) if execution_date is not None: message = "Passing 'execution_date' to 'XCom.get_many()' is deprecated. Use 'run_id' instead." 
warnings.warn(message, PendingDeprecationWarning, stacklevel=3) @@ -538,7 +544,10 @@ def clear( raise TypeError("clear() missing required argument: task_id") if not exactly_one(execution_date is not None, run_id is not None): - raise ValueError("Exactly one of run_id or execution_date must be passed") + raise ValueError( + f"Exactly one of run_id or execution_date must be passed. " + f"Passed execution_date={execution_date}, run_id={run_id}" + ) if execution_date is not None: message = "Passing 'execution_date' to 'XCom.clear()' is deprecated. Use 'run_id' instead." diff --git a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py index 7ca930b3d8490..0aea4e1a2f22d 100644 --- a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py +++ b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py @@ -283,7 +283,7 @@ def _get_ti_pod_labels(context: Optional[dict] = None, include_try_number: bool return {} ti = context['ti'] - run_id = getattr(ti, 'run_id') or context['run_id'] + run_id = context['run_id'] labels = {'dag_id': ti.dag_id, 'task_id': ti.task_id, 'run_id': run_id} diff --git a/airflow/providers/elasticsearch/log/es_task_handler.py b/airflow/providers/elasticsearch/log/es_task_handler.py index 54f75b68aea68..11304d33392ca 100644 --- a/airflow/providers/elasticsearch/log/es_task_handler.py +++ b/airflow/providers/elasticsearch/log/es_task_handler.py @@ -129,7 +129,7 @@ def _render_log_id(self, ti: TaskInstance, try_number: int) -> str: return self.log_id_template.format( dag_id=ti.dag_id, task_id=ti.task_id, - run_id=ti.run_id, + run_id=getattr(ti, "run_id", ""), data_interval_start=data_interval_start, data_interval_end=data_interval_end, execution_date=execution_date, diff --git a/breeze-complete b/breeze-complete index 6ac632940c639..df9886c0d045f 100644 --- a/breeze-complete +++ b/breeze-complete @@ -97,6 +97,7 @@ check-extras-order check-hooks-apply check-integrations 
check-merge-conflict +check-ti-run-id-in-providers check-xml daysago-import-check debug-statements diff --git a/dev/breeze/src/airflow_breeze/pre_commit_ids.py b/dev/breeze/src/airflow_breeze/pre_commit_ids.py index ec5ee6e9dd1eb..208b5a1b3a83c 100644 --- a/dev/breeze/src/airflow_breeze/pre_commit_ids.py +++ b/dev/breeze/src/airflow_breeze/pre_commit_ids.py @@ -40,6 +40,7 @@ 'check-hooks-apply', 'check-integrations', 'check-merge-conflict', + 'check-ti-run-id-in-providers', 'check-xml', 'codespell', 'daysago-import-check', diff --git a/scripts/ci/pre_commit/pre_commit_check_no_ti_run_id.py b/scripts/ci/pre_commit/pre_commit_check_no_ti_run_id.py new file mode 100755 index 0000000000000..c69602f61a243 --- /dev/null +++ b/scripts/ci/pre_commit/pre_commit_check_no_ti_run_id.py @@ -0,0 +1,64 @@ +#!/usr/bin/env python3 +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import re +import sys +from pathlib import Path +from typing import List + +from rich.console import Console + +if __name__ not in ("__main__", "__mp_main__"): + raise SystemExit( + "This file is intended to be executed as an executable program. You cannot use it as a module. " + f"To run this script, run the ./{__file__} command [FILE] ..."
+ ) + + +console = Console(color_system="standard", width=200) + +errors: List[str] = [] + +GET_ATTR_MATCHER = re.compile(r".*getattr\((ti|TI), ['\"]run_id['\"]\).*") +TI_RUN_ID_MATCHER = re.compile(r".*(ti|TI)\.run_id.*") + + +def _check_file(_file: Path): + lines = _file.read_text().splitlines() + + for index, line in enumerate(lines, start=1):  # 1-based, to match editor line numbers + if GET_ATTR_MATCHER.match(line) or TI_RUN_ID_MATCHER.match(line): + errors.append( + f"[red]In {_file}:{index} there is a forbidden construct " + f"(Airflow 2.2+ only):[/]\n\n" + f"{line}\n\n" + f"[yellow]You should not retrieve run_id from Task Instance in providers as it " + f"is not available in Airflow 2.1[/]\n\n" + f"Use one of: \n\n" + f" context['run_id']\n\n" + f" getattr(ti, 'run_id', '')\n\n" + ) + + +if __name__ == '__main__': + for file in sys.argv[1:]: + _check_file(Path(file)) + if errors: + console.print("[red]Found forbidden usage of TaskInstance's run_id in providers:[/]\n") + for error in errors: + console.print(f"{error}") + sys.exit(1) diff --git a/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py b/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py index 1df7ea05e7a28..20a4d08be957b 100644 --- a/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py +++ b/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py @@ -47,6 +47,7 @@ def create_context(task): "task": task, "ti": task_instance, "task_instance": task_instance, + "run_id": "test", }