Add documentation for task group mapping #28001

Merged Dec 16, 2022
167 changes: 131 additions & 36 deletions docs/apache-airflow/concepts/dynamic-task-mapping.rst
@@ -99,8 +99,32 @@ The grid view also provides visibility into your mapped tasks in the details panel

Although we show a "reduce" task here (``sum_it``), you don't have to have one; the mapped tasks will still be executed even if they have no downstream tasks.

Task-generated Mapping
----------------------

The above examples could all be achieved with a ``for`` loop in the DAG file, but the real power of dynamic task mapping comes from being able to have a task generate the list to iterate over.

.. code-block:: python

    from datetime import datetime

    from airflow import DAG
    from airflow.decorators import task


    @task
    def make_list():
        # This can also come from an API call, checking a database -- almost anything you
        # like, as long as the resulting list/dictionary can be stored in the current XCom backend.
        return [1, 2, {"a": "b"}, "str"]


    @task
    def consumer(arg):
        print(arg)


    with DAG(dag_id="dynamic-map", start_date=datetime(2022, 4, 2)) as dag:
        consumer.expand(arg=make_list())

The ``make_list`` task runs as a normal task and must return a list or dict (see `What data types can be expanded?`_), and then the ``consumer`` task will be called four times, once with each value in the return of ``make_list``.

Repeated mapping
----------------

The result of one mapped task can also be used as input to the next mapped task.

@@ -117,10 +141,10 @@
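A minimal sketch consistent with the result below (the ``add_one`` task name and body are assumptions, as the example itself is collapsed in this diff):

.. code-block:: python

    @task
    def add_one(x):
        # Runs once per mapped input value.
        return x + 1


    # The first expansion maps over a literal list and yields [2, 3, 4];
    # the second maps over that output and yields [3, 4, 5].
    first = add_one.expand(x=[1, 2, 3])
    second = add_one.expand(x=first)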

This would have a result of ``[3, 4, 5]``.

Adding parameters that do not expand
------------------------------------

As well as passing arguments that get expanded at run-time, it is possible to pass arguments that don't change. To clearly differentiate between the two kinds, we use different functions: ``expand()`` for mapped arguments, and ``partial()`` for unmapped ones.

.. code-block:: python
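
    # Hedged reconstruction of the collapsed example: ``partial()`` pins the
    # constant argument while ``expand()`` maps over the rest. The ``add`` task
    # is named by the surrounding text; its exact body is an assumption.
    @task
    def add(x, y):
        return x + y


    added_values = add.partial(y=10).expand(x=[1, 2, 3])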

@@ -140,7 +164,7 @@

This would result in values of 11, 12, and 13.
This is also useful for passing things such as connection IDs, database table names, or bucket names to tasks.

Mapping over multiple parameters
--------------------------------

As well as a single parameter, it is possible to pass multiple parameters to ``expand``. This will have the effect of creating a "cross product", calling the mapped task with each combination of parameters.

@@ -162,30 +186,6 @@
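A sketch consistent with the six calls described below, reusing the ``add`` task sketched earlier:

.. code-block:: python

    # Three values of x times two values of y = six mapped task instances.
    add.expand(x=[1, 2, 3], y=[10, 20])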

This would result in the ``add`` task being called six times. Note, however, that the order of expansion is not guaranteed.


Mapping with non-TaskFlow operators
===================================

@@ -216,9 +216,9 @@

If you want to map over the result of a classic operator, you should explicitly reference the *output*, instead of the operator itself:
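A hedged sketch of the idea: a classic operator exposes an ``output`` property (an ``XComArg``), and it is that reference, not the operator object, that should be expanded. The operator choice here is illustrative, not the collapsed example's code:

.. code-block:: python

    from airflow.operators.python import PythonOperator

    # A classic (non-TaskFlow) operator whose return value is pushed to XCom.
    make_numbers = PythonOperator(
        task_id="make_numbers",
        python_callable=lambda: [1, 2, 3],
    )

    # Map over the operator's XCom output, not the operator itself.
    consumer.expand(arg=make_numbers.output)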


Mixing TaskFlow and classic operators
-------------------------------------

In this example, you have a regular data delivery to an S3 bucket and want to apply the same processing to every file that arrives, no matter how many arrive each time.

.. code-block:: python

@@ -296,8 +296,103 @@

Similar to ``expand``, you can also map against an XCom that returns a list of dicts:
task_id="copy_files", source_bucket_name=list_filenames.bucket
).expand_kwargs(copy_kwargs)

Mapping over a task group
=========================

Similar to a TaskFlow task, you can also call either ``expand`` or ``expand_kwargs`` on a ``@task_group``-decorated function to create a mapped task group:

.. note:: Implementations of individual tasks in this section are omitted for brevity.

.. code-block:: python

    @task_group
    def file_transforms(filename):
        return convert_to_yaml(filename)


    file_transforms.expand(filename=["data1.json", "data2.json"])

In the above example, task ``convert_to_yaml`` is expanded into two task instances at runtime. The first expanded instance receives ``"data1.json"`` as input, and the second ``"data2.json"``.

Value references in a task group function
-----------------------------------------

One important distinction between a task function (``@task``) and a task *group* function (``@task_group``) is that, since a task group does not have an associated worker, code in a task group function cannot resolve arguments passed into it; the real value is only resolved when the reference is passed into a task.

For example, this code will *not* work:

.. code-block:: python

    @task
    def my_task(value):
        print(value)


    @task_group
    def my_group(value):
        if not value:  # DOES NOT work as you'd expect!
            task_a = EmptyOperator(...)
        else:
            task_a = PythonOperator(...)
        task_a << my_task(value)


    my_group.expand(value=[0, 1, 2])

When code in ``my_group`` is executed, ``value`` would still only be a reference, not the real value, so the ``if not value`` branch will not work as you likely want. However, if you pass that reference into a task, it will become resolved when the task is executed, and the three ``my_task`` instances will therefore receive 0, 1, and 2, respectively.

It is therefore important to remember that, if you intend to perform any logic on a value passed into a task group function, you must always use a task to run that logic, such as ``@task.branch`` (or ``BranchPythonOperator``) for conditions and task mapping methods for loops, as sketched below.
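For example, a working variant of the snippet above moves the check into a task, where the value is resolved (``ensure_value`` is a hypothetical helper, not part of the original example):

.. code-block:: python

    @task
    def ensure_value(value):
        # Runs on a worker, so ``value`` here is the real element, not a reference.
        return value if value else -1


    @task_group
    def my_group(value):
        checked = ensure_value(value)
        my_task(checked)


    my_group.expand(value=[0, 1, 2])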

.. note:: Task-mapping in a mapped task group is not permitted

   It is not currently permitted to nest task mapping inside a mapped task group. While the technical aspect of this feature is not particularly difficult, we have decided to intentionally omit it since it adds considerable UI complexity and may not be necessary for general use cases. This restriction may be revisited in the future depending on user feedback.

Depth-first execution
---------------------

If a mapped task group contains multiple tasks, all tasks in the group are expanded "together" against the same inputs. For example:

.. code-block:: python

    @task_group
    def file_transforms(filename):
        converted = convert_to_yaml(filename)
        return replace_defaults(converted)


    file_transforms.expand(filename=["data1.json", "data2.json"])

Since the group ``file_transforms`` is expanded into two instances, the tasks ``convert_to_yaml`` and ``replace_defaults`` will each become two instances at runtime.

A similar effect can be achieved by expanding the two tasks separately like so:

.. code-block:: python

    converted = convert_to_yaml.expand(filename=["data1.json", "data2.json"])
    replace_defaults.expand(filename=converted)

The difference, however, is that a task group allows each task inside to depend only on its "relevant inputs". In the above example, ``replace_defaults`` would depend only on the ``convert_to_yaml`` instance in the same expanded group, not on instances of that task in other groups. This strategy, called *depth-first execution* (in contrast to the simple group-less *breadth-first execution*), allows for more logical task separation, fine-grained dependency rules, and accurate resource allocation. Using the above example, the first ``replace_defaults`` can run before ``convert_to_yaml("data2.json")`` is done, and does not need to care whether that instance succeeds or fails.

Depending on a mapped task group's output
-----------------------------------------

Similar to depending on a mapped task's output, depending on a mapped task group's output also automatically aggregates the group's results:

.. code-block:: python

    @task_group
    def add_to(value):
        value = add_one(value)
        return double(value)


    results = add_to.expand(value=[1, 2, 3])
    consumer(results)  # Will receive [4, 6, 8].

It is also possible to perform any operation on the aggregated result, just as with the output of a normal mapped task.
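A hedged follow-on, reusing ``double`` and ``results`` from the sketch above:

.. code-block:: python

    # Each element of the aggregated [4, 6, 8] becomes one new task instance.
    double.expand(value=results)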

Filtering items from a mapped task
==================================

A mapped task can remove any elements from being passed on to its downstream tasks by returning ``None``. For example, if we want to *only* copy files from an S3 bucket to another with certain extensions, we could implement ``create_copy_kwargs`` like this instead:

@@ -319,8 +414,8 @@
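A hedged reconstruction of the collapsed example (the kwargs are simplified from the earlier S3 snippet, not the original listing):

.. code-block:: python

    @task
    def create_copy_kwargs(filename):
        # Returning None drops this element from the downstream expansion.
        if not filename.endswith((".json", ".yml")):
            return None
        return {"source_bucket_key": filename, "dest_bucket_key": filename}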

This makes ``copy_files`` only expand against ``.json`` and ``.yml`` files, while ignoring the rest.

Transforming expanding data
===========================

Since it is common to want to transform the output data format for task mapping, especially from a non-TaskFlow operator where the output format is pre-determined and cannot be easily converted (such as ``create_copy_kwargs`` in the above example), a special ``map()`` function can be used to perform this kind of transformation easily. The above example can therefore be modified accordingly.
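A sketch of that usage, assuming ``list_filenames`` is the task from the earlier S3 example and ``S3CopyObjectOperator`` stands in for whatever operator the collapsed example maps. The transformation is a plain function, not a task; ``map()`` applies it lazily to each item:

.. code-block:: python

    from airflow.providers.amazon.aws.operators.s3 import S3CopyObjectOperator

    filenames = list_filenames()


    def create_copy_kwargs(filename):
        # A plain Python function; called per element when the value is consumed.
        return {"source_bucket_key": filename, "dest_bucket_key": filename}


    copy_kwargs = filenames.map(create_copy_kwargs)

    # The mapped operator then expands over the transformed values as before.
    copy_files = S3CopyObjectOperator.partial(task_id="copy_files").expand_kwargs(copy_kwargs)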
