"@task.virtualenv" cannot appear in a comment inside a @task.virtualenv task #34154

Closed
1 of 2 tasks
mziwisky opened this issue Sep 7, 2023 · 2 comments · Fixed by #38649
Labels: affected_version:2.5, area:core, area:core-operators, good first issue, kind:bug


mziwisky commented Sep 7, 2023

Apache Airflow version

Other Airflow 2 version (please specify below)

What happened

On Airflow 2.5.1 (on AWS MWAA), I ran this DAG:

from datetime import datetime, timedelta
from airflow.decorators import dag, task

@task.virtualenv(system_site_packages=True)
def test():
    print('This line gets printed')
    # @task.virtualenv
    print('This line does not')
    raise Exception("The task will succeed because this line won't run either")

@dag(
    start_date=datetime(2023, 9, 6),
    schedule="10 * * * *",
)
def bug_test():
    test()

the_dag = bug_test()

The test task runs and succeeds, but it only executes the code that precedes the line # @task.virtualenv. Everything after that line is ignored.

In general, as long as "@task.virtualenv" is in the comment, it will kill all lines after it. The comment could be # we're inside an @task.virtualenv or # [email protected], same effect. If the comment does not have exactly that string in it, e.g. # we're inside a task.virtualenv (no "@") or # we're inside an @ task.virtualenv then the lines after it get executed.

What you think should happen instead

All of the code in the task should get executed. Here's a log dump for a run of that task:

[2023-09-07, 05:37:34 UTC] {{taskinstance.py:1083}} INFO - Dependencies all met for <TaskInstance: bug_test.test scheduled__2023-09-07T04:10:00+00:00 [queued]>
[2023-09-07, 05:37:34 UTC] {{taskinstance.py:1083}} INFO - Dependencies all met for <TaskInstance: bug_test.test scheduled__2023-09-07T04:10:00+00:00 [queued]>
[2023-09-07, 05:37:34 UTC] {{taskinstance.py:1279}} INFO - 
--------------------------------------------------------------------------------
[2023-09-07, 05:37:34 UTC] {{taskinstance.py:1280}} INFO - Starting attempt 5 of 5
[2023-09-07, 05:37:34 UTC] {{taskinstance.py:1281}} INFO - 
--------------------------------------------------------------------------------
[2023-09-07, 05:37:34 UTC] {{taskinstance.py:1300}} INFO - Executing <Task(_PythonVirtualenvDecoratedOperator): test> on 2023-09-07 04:10:00+00:00
[2023-09-07, 05:37:34 UTC] {{standard_task_runner.py:55}} INFO - Started process 10023 to run task
[2023-09-07, 05:37:34 UTC] {{standard_task_runner.py:82}} INFO - Running: ['airflow', 'tasks', 'run', 'bug_test', 'test', 'scheduled__2023-09-07T04:10:00+00:00', '--job-id', '2069253', '--raw', '--subdir', 'DAGS_FOLDER/bug.py', '--cfg-path', '/tmp/tmpz3vkh0z2']
[2023-09-07, 05:37:34 UTC] {{standard_task_runner.py:83}} INFO - Job 2069253: Subtask test
[2023-09-07, 05:37:34 UTC] {{task_command.py:388}} INFO - Running <TaskInstance: bug_test.test scheduled__2023-09-07T04:10:00+00:00 [running]> on host ip-10-42-8-187.us-west-2.compute.internal
[2023-09-07, 05:37:35 UTC] {{taskinstance.py:1507}} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=bug_test
AIRFLOW_CTX_TASK_ID=test
AIRFLOW_CTX_EXECUTION_DATE=2023-09-07T04:10:00+00:00
AIRFLOW_CTX_TRY_NUMBER=5
AIRFLOW_CTX_DAG_RUN_ID=scheduled__2023-09-07T04:10:00+00:00
[2023-09-07, 05:37:35 UTC] {{process_utils.py:179}} INFO - Executing cmd: /usr/bin/python3.10 -m virtualenv /tmp/venva253igp3 --system-site-packages
[2023-09-07, 05:37:35 UTC] {{process_utils.py:183}} INFO - Output:
[2023-09-07, 05:37:35 UTC] {{process_utils.py:187}} INFO - created virtual environment CPython3.10.8.final.0-64 in 387ms
[2023-09-07, 05:37:35 UTC] {{process_utils.py:187}} INFO -   creator CPython3Posix(dest=/tmp/venva253igp3, clear=False, no_vcs_ignore=False, global=True)
[2023-09-07, 05:37:35 UTC] {{process_utils.py:187}} INFO -   seeder FromAppData(download=False, pip=bundle, setuptools=bundle, wheel=bundle, via=copy, app_data_dir=/usr/local/airflow/.local/share/virtualenv)
[2023-09-07, 05:37:35 UTC] {{process_utils.py:187}} INFO -     added seed packages: pip==22.3.1, setuptools==65.6.3, wheel==0.38.4
[2023-09-07, 05:37:35 UTC] {{process_utils.py:187}} INFO -   activators BashActivator,CShellActivator,FishActivator,NushellActivator,PowerShellActivator,PythonActivator
[2023-09-07, 05:37:36 UTC] {{process_utils.py:179}} INFO - Executing cmd: /tmp/venva253igp3/bin/pip install -r /tmp/venva253igp3/requirements.txt
[2023-09-07, 05:37:36 UTC] {{process_utils.py:183}} INFO - Output:
[2023-09-07, 05:37:43 UTC] {{logging_mixin.py:137}} WARNING - /usr/local/airflow/.local/lib/python3.10/site-packages/watchtower/__init__.py:349 WatchtowerWarning: Received empty message. Empty messages cannot be sent to CloudWatch Logs
[2023-09-07, 05:37:43 UTC] {{logging_mixin.py:137}} WARNING - Traceback (most recent call last):
[2023-09-07, 05:37:43 UTC] {{logging_mixin.py:137}} WARNING -   File "/usr/local/airflow/config/cloudwatch_logging.py", line 161, in emit
    self.sniff_errors(record)
[2023-09-07, 05:37:43 UTC] {{logging_mixin.py:137}} WARNING -   File "/usr/local/airflow/config/cloudwatch_logging.py", line 212, in sniff_errors
    if pattern.search(record.message):
[2023-09-07, 05:37:43 UTC] {{logging_mixin.py:137}} WARNING - AttributeError: 'LogRecord' object has no attribute 'message'
[2023-09-07, 05:37:43 UTC] {{process_utils.py:187}} INFO - [notice] A new release of pip available: 22.3.1 -> 23.2.1
[2023-09-07, 05:37:43 UTC] {{process_utils.py:187}} INFO - [notice] To update, run: /tmp/venva253igp3/bin/python -m pip install --upgrade pip
[2023-09-07, 05:37:43 UTC] {{process_utils.py:179}} INFO - Executing cmd: /tmp/venva253igp3/bin/python /tmp/venva253igp3/script.py /tmp/venva253igp3/script.in /tmp/venva253igp3/script.out /tmp/venva253igp3/string_args.txt
[2023-09-07, 05:37:43 UTC] {{process_utils.py:183}} INFO - Output:
[2023-09-07, 05:37:46 UTC] {{process_utils.py:187}} INFO - This line gets printed
[2023-09-07, 05:37:46 UTC] {{python.py:177}} INFO - Done. Returned value was: None
[2023-09-07, 05:37:46 UTC] {{taskinstance.py:1318}} INFO - Marking task as SUCCESS. dag_id=bug_test, task_id=test, execution_date=20230907T041000, start_date=20230907T053734, end_date=20230907T053746
[2023-09-07, 05:37:46 UTC] {{local_task_job.py:208}} INFO - Task exited with return code 0

How to reproduce

Described above

Operating System

Linux? it's AWS MWAA

Versions of Apache Airflow Providers

No response

Deployment

Amazon (AWS) MWAA

Deployment details

No response

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@mziwisky mziwisky added area:core kind:bug This is a clearly a bug needs-triage label for new issues that we didn't triage yet labels Sep 7, 2023

boring-cyborg bot commented Sep 7, 2023

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

@potiuk potiuk added good first issue and removed needs-triage label for new issues that we didn't triage yet labels Sep 7, 2023
@eladkal eladkal added area:core-operators Operators, Sensors and hooks within Core Airflow affected_version:2.5 Issues Reported for 2.5 labels Sep 8, 2023
Contributor

SamWheating commented Sep 15, 2023

I was able to replicate this on main... this is a really funny bug which I think stems from this section of code:

def remove_task_decorator(python_source: str, task_decorator_name: str) -> str:
    """
    Remove @task or similar decorators as well as @setup and @teardown.

    :param python_source: The python source code
    :param task_decorator_name: the decorator name
    """

    def _remove_task_decorator(py_source, decorator_name):
        if decorator_name not in py_source:
            return python_source
        split = python_source.split(decorator_name)
        before_decorator, after_decorator = split[0], split[1]
        if after_decorator[0] == "(":
            after_decorator = _balance_parens(after_decorator)
        if after_decorator[0] == "\n":
            after_decorator = after_decorator[1:]
        return before_decorator + after_decorator

    decorators = ["@setup", "@teardown", task_decorator_name]
    for decorator in decorators:
        python_source = _remove_task_decorator(python_source, decorator)
    return python_source

If the decorator name also appears somewhere in the function body, the split produces more than 2 items. Only split[0] and split[1] are kept, so anything after the second occurrence of the decorator name is silently dropped.
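To make the truncation concrete, here is a minimal sketch of the same split logic. The helper name `naive_remove_decorator` is made up for illustration, and it leaves out the `_balance_parens` step, so the decorator is written without arguments. It is a simplified stand-in, not the Airflow function itself.

```python
def naive_remove_decorator(source: str, decorator_name: str) -> str:
    """Simplified stand-in for the helper quoted above (no paren balancing)."""
    if decorator_name not in source:
        return source
    parts = source.split(decorator_name)
    # Only the first two pieces are kept; any further pieces are silently lost.
    before, after = parts[0], parts[1]
    if after.startswith("\n"):
        after = after[1:]
    return before + after


source = (
    "@task.virtualenv\n"
    "def test():\n"
    "    print('This line gets printed')\n"
    "    # @task.virtualenv\n"
    "    print('This line does not')\n"
)

# The comment adds a second occurrence, so the split yields three pieces.
pieces = source.split("@task.virtualenv")
stripped = naive_remove_decorator(source, "@task.virtualenv")
print(stripped)

# Limiting the split to the first occurrence keeps the remainder intact.
_, remainder = source.split("@task.virtualenv", 1)
```

Passing a `maxsplit` of 1 is one possible mitigation for the truncation. Whether the linked fix takes that route is not stated in this thread.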

Knowing this, we can replicate the issue in other ways too - this task will also succeed:

def test_virtualenv():
    print('This line gets printed')
    # @setup @setup
    print('This line does not')
    raise Exception("The task will succeed because this line won't run either")

And I suspect other decorators whose operators subclass DecoratedOperator (@task.kubernetes, @task.docker) would act similarly.

We probably shouldn't be doing string manipulation on code anyways, could we maybe use some sort of actual AST parsing for this?
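As a rough sketch of what the AST idea could look like (an assumption on my part, not the approach taken in Airflow): parse the source, drop the matching entries from each function's `decorator_list`, and unparse. `strip_decorators` and `_dotted_name` are hypothetical names, and `ast.unparse` needs Python 3.9+.

```python
import ast


def _dotted_name(decorator: ast.expr) -> str:
    # `@task.virtualenv(...)` is a Call wrapping the Attribute `task.virtualenv`.
    target = decorator.func if isinstance(decorator, ast.Call) else decorator
    return ast.unparse(target)


def strip_decorators(source: str, names: set[str]) -> str:
    """Drop decorators whose dotted name is in `names` (hypothetical helper)."""
    tree = ast.parse(source)
    for node in ast.walk(tree):
        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
            node.decorator_list = [
                d for d in node.decorator_list if _dotted_name(d) not in names
            ]
    return ast.unparse(tree)


source = '''
@task.virtualenv(system_site_packages=True)
def test():
    print('This line gets printed')
    # @task.virtualenv
    print('This line does not')
'''

result = strip_decorators(source, {"task.virtualenv", "setup", "teardown"})
print(result)
```

One trade-off: `ast.unparse` regenerates the source, so comments and the original formatting are lost. That is probably acceptable for a generated script, but it is worth knowing before relying on it.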

Anyways, I will keep investigating, feel free to assign to me
