Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix "run_id" k8s and elasticsearch compatibility with Airflow 2.1 #22385

Merged
merged 1 commit into from
Mar 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,14 @@ repos:
pass_filenames: true
entry: ./scripts/ci/pre_commit/pre_commit_check_xcom_get_value.py
additional_dependencies: ['rich']
# This check might be removed when min-airflow-version in providers is 2.2
- id: check-ti-run-id-in-providers
name: Check that run_id is not accessed in providers
entry: ./scripts/ci/pre_commit/pre_commit_check_no_ti_run_id.py
language: python
pass_filenames: true
files: ^airflow/providers/.*\.py$
additional_dependencies: ['rich']
- id: update-breeze-file
name: Update output of breeze command in BREEZE.rst
entry: ./scripts/ci/pre_commit/pre_commit_breeze_cmd_line.sh
Expand Down
18 changes: 9 additions & 9 deletions BREEZE.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2292,15 +2292,15 @@ This is the current syntax for `./breeze <./breeze>`_:
build-providers-dependencies chart-schema-lint capitalized-breeze
changelog-duplicates check-apache-license check-builtin-literals
check-executables-have-shebangs check-extras-order check-hooks-apply
check-integrations check-merge-conflict check-xml daysago-import-check
debug-statements detect-private-key docstring-params doctoc dont-use-safe-filter
end-of-file-fixer fix-encoding-pragma flake8 flynt forbidden-xcom-get-value
codespell forbid-tabs helm-lint identity incorrect-use-of-LoggingMixin
insert-license isort json-schema language-matters lint-dockerfile lint-openapi
markdownlint mermaid migration-reference mixed-line-ending mypy mypy-helm
no-providers-in-core-examples no-relative-imports persist-credentials-disabled
pre-commit-descriptions pre-commit-hook-names pretty-format-json
provide-create-sessions providers-changelogs providers-init-file
check-integrations check-merge-conflict check-ti-run-id-in-providers check-xml
daysago-import-check debug-statements detect-private-key docstring-params doctoc
dont-use-safe-filter end-of-file-fixer fix-encoding-pragma flake8 flynt
forbidden-xcom-get-value codespell forbid-tabs helm-lint identity
incorrect-use-of-LoggingMixin insert-license isort json-schema language-matters
lint-dockerfile lint-openapi markdownlint mermaid migration-reference
mixed-line-ending mypy mypy-helm no-providers-in-core-examples no-relative-imports
persist-credentials-disabled pre-commit-descriptions pre-commit-hook-names
pretty-format-json provide-create-sessions providers-changelogs providers-init-file
providers-subpackages-init-file provider-yamls pydevd pydocstyle python-no-log-warn
pyupgrade restrict-start_date rst-backticks setup-order setup-extra-packages
shellcheck sort-in-the-wild sort-spelling-wordlist stylelint trailing-whitespace
Expand Down
2 changes: 2 additions & 0 deletions STATIC_CODE_CHECKS.rst
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,8 @@ require Breeze Docker images to be installed locally.
------------------------------------ ---------------------------------------------------------------- ------------
``check-merge-conflicts`` Checks that merge conflicts are not being committed
------------------------------------ ---------------------------------------------------------------- ------------
``check-ti-run-id-in-providers`` Check that run_id is not accessed in providers
------------------------------------ ---------------------------------------------------------------- ------------
``check-xml`` Checks XML files with xmllint
------------------------------------ ---------------------------------------------------------------- ------------
``daysago-import-check`` Checks if daysago is properly imported
Expand Down
15 changes: 12 additions & 3 deletions airflow/models/xcom.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,10 @@ def set(
from airflow.models.dagrun import DagRun

if not exactly_one(execution_date is not None, run_id is not None):
raise ValueError("Exactly one of run_id or execution_date must be passed")
raise ValueError(
potiuk marked this conversation as resolved.
Show resolved Hide resolved
f"Exactly one of run_id or execution_date must be passed. "
f"Passed execution_date={execution_date}, run_id={run_id}"
)

if run_id is None:
message = "Passing 'execution_date' to 'XCom.set()' is deprecated. Use 'run_id' instead."
Expand Down Expand Up @@ -426,7 +429,10 @@ def get_many(
from airflow.models.dagrun import DagRun

if not exactly_one(execution_date is not None, run_id is not None):
raise ValueError("Exactly one of run_id or execution_date must be passed")
raise ValueError(
f"Exactly one of run_id or execution_date must be passed. "
f"Passed execution_date={execution_date}, run_id={run_id}"
)
if execution_date is not None:
message = "Passing 'execution_date' to 'XCom.get_many()' is deprecated. Use 'run_id' instead."
warnings.warn(message, PendingDeprecationWarning, stacklevel=3)
Expand Down Expand Up @@ -538,7 +544,10 @@ def clear(
raise TypeError("clear() missing required argument: task_id")

if not exactly_one(execution_date is not None, run_id is not None):
raise ValueError("Exactly one of run_id or execution_date must be passed")
raise ValueError(
f"Exactly one of run_id or execution_date must be passed. "
f"Passed execution_date={execution_date}, run_id={run_id}"
)

if execution_date is not None:
message = "Passing 'execution_date' to 'XCom.clear()' is deprecated. Use 'run_id' instead."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ def _get_ti_pod_labels(context: Optional[dict] = None, include_try_number: bool
return {}

ti = context['ti']
run_id = getattr(ti, 'run_id') or context['run_id']
run_id = context['run_id']

labels = {'dag_id': ti.dag_id, 'task_id': ti.task_id, 'run_id': run_id}

Expand Down
2 changes: 1 addition & 1 deletion airflow/providers/elasticsearch/log/es_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ def _render_log_id(self, ti: TaskInstance, try_number: int) -> str:
return self.log_id_template.format(
dag_id=ti.dag_id,
task_id=ti.task_id,
run_id=ti.run_id,
run_id=getattr(ti, "run_id", ""),
Copy link
Member

@kaxil kaxil Mar 21, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this work with an empty string as a default?

cc @jedcunningham @uranusjr

Copy link
Member Author

@potiuk potiuk Mar 21, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's only formatting the log entry so I "guess" it produces proper log entry. But good point - I do not know if the es logs will be properly parsed by ES engine then (though having unparseable logs is still infinitely better than crashing airflow in this case :D)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jedcunningham @uranusjr 🙏 - I'd love to re-release providers asap as the ones we have install gitpython and wheel due to my sloppiness :(

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @jedcunningham @uranusjr @kaxil -> I'd really love to merge it and re-release the providers soon. The "extra packages" in the last list need to go away and the cncf.kubernetes starts to be a problem for users of Airflow 2.1 (likely this one https://apache-airflow.slack.com/archives/CCV3FV9KL/p1647936437392429)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Approved the PR but I still don't know about this and haven't dug deep

Copy link
Member Author

@potiuk potiuk Mar 24, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well. It certainly won't be worse than crashing when you try to send an elasticsearch log :)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is what 2.1 users would experience if they run this provider version.

data_interval_start=data_interval_start,
data_interval_end=data_interval_end,
execution_date=execution_date,
Expand Down
1 change: 1 addition & 0 deletions breeze-complete
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ check-extras-order
check-hooks-apply
check-integrations
check-merge-conflict
check-ti-run-id-in-providers
check-xml
daysago-import-check
debug-statements
Expand Down
1 change: 1 addition & 0 deletions dev/breeze/src/airflow_breeze/pre_commit_ids.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
'check-hooks-apply',
'check-integrations',
'check-merge-conflict',
'check-ti-run-id-in-providers',
'check-xml',
'codespell',
'daysago-import-check',
Expand Down
64 changes: 64 additions & 0 deletions scripts/ci/pre_commit/pre_commit_check_no_ti_run_id.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
#!/usr/bin/env python3
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import re
import sys
from pathlib import Path
from typing import List

from rich.console import Console

# Refuse to be imported: this script keeps its state in module-level globals and is
# only meant to be run as a program ("__mp_main__" is also accepted as a module name).
if __name__ not in ("__main__", "__mp_main__"):
    raise SystemExit(
        "This file is intended to be executed as an executable program. You cannot use it as a module."
        # The leading space keeps the two sentences apart once the literals are concatenated.
        f" To run this script, run the ./{__file__} command [FILE] ..."
    )


# Shared rich console used to print the error report at the end of the run
# (standard color system, fixed 200-column width).
console = Console(color_system="standard", width=200)

# Rich-markup error reports accumulated across every file checked in this run.
errors: List[str] = []

# Matches ``getattr(ti, 'run_id')`` / ``getattr(TI, "run_id")`` calls that pass no default value.
GET_ATTR_MATCHER = re.compile(r".*getattr\((ti|TI), ['\"]run_id['\"]\).*")
# Matches direct attribute access such as ``ti.run_id`` / ``TI.run_id``.
TI_RUN_ID_MATCHER = re.compile(r".*(ti|TI)\.run_id.*")


def _check_file(_file: Path):
    """
    Scan a single file for forbidden accesses to the Task Instance's ``run_id``.

    Every line matching ``GET_ATTR_MATCHER`` or ``TI_RUN_ID_MATCHER`` adds one
    rich-markup report to the module-level ``errors`` list. Nothing is printed
    and nothing is returned; the caller decides what to do with ``errors``.

    :param _file: path of the file to scan
    """
    lines = _file.read_text().splitlines()

    # Number lines from 1 so that the reported "file:line" location matches what
    # editors show (a 0-based index would point one line above the offending one).
    for line_number, line in enumerate(lines, start=1):
        if GET_ATTR_MATCHER.match(line) or TI_RUN_ID_MATCHER.match(line):
            errors.append(
                f"[red]In {_file}:{line_number} there is a forbidden construct "
                f"(Airflow 2.2+ only):[/]\n\n"
                f"{line}\n\n"
                f"[yellow]You should not retrieve run_id from Task Instance in providers as it "
                f"is not available in Airflow 2.1[/]\n\n"
                f"Use one of: \n\n"
                f"     context['run_id']\n\n"
                f"     getattr(ti, 'run_id', '<DEFAULT>')\n\n"
            )


if __name__ == '__main__':
    # Every command-line argument is a path to check; findings pile up in ``errors``.
    for file in map(Path, sys.argv[1:]):
        _check_file(file)
    # Any finding fails the run: print a header, then each report, and exit with status 1.
    if len(errors) > 0:
        console.print("[red]Found forbidden usage of TaskInstance's run_id in providers:[/]\n")
        for error in errors:
            console.print(f"{error}")
        sys.exit(1)
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ def create_context(task):
"task": task,
"ti": task_instance,
"task_instance": task_instance,
"run_id": "test",
}


Expand Down