Override default_args between Nested TaskGroups #31604

Closed
2 tasks done
hyeong10 opened this issue May 30, 2023 · 1 comment · Fixed by #31608
Labels
affected_version:main_branch, area:core, kind:bug

Comments


hyeong10 commented May 30, 2023

What do you see as an issue?

Hello!

I don't know if this is intended, but with nested TaskGroups the parent TaskGroup's default_args do not override the DAG's default_args for tasks inside the child TaskGroup.

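from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.context import Context
from airflow.utils.task_group import TaskGroup


def callback_in_dag(context: Context):
    print("DAG!")


def callback_in_task_group(context: Context):
    print("Parent TaskGroup!")


with DAG(
    "some_dag_id",
    default_args={"on_failure_callback": callback_in_dag},
    schedule=None,
    start_date=datetime(2023, 1, 1),
) as dag:
    with TaskGroup(
        "parent_tg",
        default_args={"on_failure_callback": callback_in_task_group},
    ) as parent_tg:
        # child_tg sets no default_args of its own
        with TaskGroup("child_tg") as child_tg:
            # The command does not exist, so the task fails and the callback fires
            BashOperator(task_id="task_1", bash_command="nooooo_command")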

I expect the failing task to print "Parent TaskGroup!", but it prints "DAG!".

[2023-05-30, 10:38:52 KST] {logging_mixin.py:137} INFO - DAG!
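The difference between the expected and the reported result can be modelled without Airflow. The sketch below is only an illustration: `resolve_default_args` is a hypothetical helper, not an Airflow function, and the callbacks are replaced by plain strings. It assumes the reported behaviour comes from consulting only the DAG and the innermost TaskGroup, which the log line above suggests but does not prove.

```python
from typing import Any, Dict, Optional

Args = Dict[str, Any]


def resolve_default_args(*scopes: Optional[Args]) -> Args:
    """Merge default_args from outermost to innermost scope; inner scopes win."""
    merged: Args = {}
    for scope in scopes:
        merged.update(scope or {})
    return merged


# Stand-ins for the three scopes in the example DAG (strings instead of callables).
dag_args: Args = {"on_failure_callback": "callback_in_dag"}
parent_tg_args: Args = {"on_failure_callback": "callback_in_task_group"}
child_tg_args: Args = {}  # child_tg defines no default_args

# Expected: every enclosing scope takes part in the merge.
expected = resolve_default_args(dag_args, parent_tg_args, child_tg_args)

# Assumed model of the reported behaviour: parent_tg is skipped entirely.
reported = resolve_default_args(dag_args, child_tg_args)

print(expected["on_failure_callback"])
print(reported["on_failure_callback"])
```

Under this model the parent group's callback is lost whenever the child group is the only group consulted, which matches the "DAG!" output.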

Solving the problem

Add an _update_default_args method to TaskGroup, similar to the linked code:

airflow/utils/task_group.py

...
class TaskGroup(DAGNode):
    def __init__(...):
        ...
        self.default_args = copy.deepcopy(default_args or {})

        # Inherit the parent group's default_args when a parent group exists
        if parent_group is not None:
            self._update_default_args(parent_group)
        ...
    ...
    def _update_default_args(self, parent_group: TaskGroup):
        # Parent values act as defaults; this group's own default_args take precedence
        if parent_group.default_args:
            self.default_args = {**parent_group.default_args, **self.default_args}
...
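The inheritance idea can be tried in isolation. The following is a minimal sketch, not Airflow code: `MiniTaskGroup` is a hypothetical stand-in for TaskGroup that keeps only `default_args` and the parent link. It assumes that a group's own `default_args` should win over inherited ones, so the parent's values are merged first and the group's own values are applied on top.

```python
import copy
from typing import Any, Dict, Optional


class MiniTaskGroup:
    """Hypothetical stand-in for TaskGroup; models default_args inheritance only."""

    def __init__(
        self,
        group_id: str,
        default_args: Optional[Dict[str, Any]] = None,
        parent_group: Optional["MiniTaskGroup"] = None,
    ) -> None:
        self.group_id = group_id
        self.default_args: Dict[str, Any] = copy.deepcopy(default_args or {})
        if parent_group is not None:
            self._update_default_args(parent_group)

    def _update_default_args(self, parent_group: "MiniTaskGroup") -> None:
        # Parent values are defaults; keys set on this group override them.
        if parent_group.default_args:
            self.default_args = {**parent_group.default_args, **self.default_args}


parent = MiniTaskGroup(
    "parent_tg",
    default_args={"on_failure_callback": "callback_in_task_group", "retries": 1},
)
child = MiniTaskGroup("child_tg", parent_group=parent)
grandchild = MiniTaskGroup("grandchild_tg", default_args={"retries": 3}, parent_group=child)

print(child.default_args)
print(grandchild.default_args)
```

Because each group merges its parent's already-merged `default_args` at construction time, values propagate through any depth of nesting. Note that calling `self.default_args.update(parent_group.default_args)` instead would let the parent overwrite keys the child set explicitly.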

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

hyeong10 added the kind:bug, kind:documentation, and needs-triage labels on May 30, 2023

boring-cyborg bot commented May 30, 2023

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.
