Data aware scheduling docs edits (apache#38687)
lzdanski authored and Grégoire Rolland committed Apr 19, 2024
1 parent a06349e commit 4616ced
Showing 1 changed file with 32 additions and 33 deletions.
65 changes: 32 additions & 33 deletions docs/apache-airflow/authoring-and-scheduling/datasets.rst
@@ -23,7 +23,7 @@ Data-aware scheduling
Quickstart
----------

In addition to scheduling DAGs based upon time, they can also be scheduled based upon a task updating a dataset.
In addition to scheduling DAGs based on time, you can also schedule DAGs to run based on when a task updates a dataset.

.. code-block:: python
@@ -51,38 +51,38 @@ In addition to scheduling DAGs based upon time, they can also be scheduled based
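
The quickstart code itself is collapsed in this view. A minimal sketch of the producer/consumer pattern described above, using placeholder DAG IDs, a placeholder S3 URI, and ``BashOperator`` purely for illustration, might look like this:

.. code-block:: python

    import pendulum

    from airflow.datasets import Dataset
    from airflow.models.dag import DAG
    from airflow.operators.bash import BashOperator

    # One Dataset object shared by the producing and consuming DAGs.
    example_dataset = Dataset("s3://dataset-bucket/example.csv")

    with DAG(
        dag_id="producer",
        start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
        schedule="@daily",
        catchup=False,
    ):
        # Listing the dataset as an outlet makes a successful run emit a dataset event.
        BashOperator(task_id="update_dataset", outlets=[example_dataset], bash_command="echo produced")

    with DAG(
        dag_id="consumer",
        start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
        # Scheduled on the dataset rather than on time.
        schedule=[example_dataset],
        catchup=False,
    ):
        BashOperator(task_id="process_dataset", bash_command="echo consumed")
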
What is a "dataset"?
--------------------

An Airflow dataset is a stand-in for a logical grouping of data. Datasets may be updated by upstream "producer" tasks, and dataset updates contribute to scheduling downstream "consumer" DAGs.
An Airflow dataset is a logical grouping of data. Upstream producer tasks can update datasets, and dataset updates contribute to scheduling downstream consumer DAGs.

A dataset is defined by a Uniform Resource Identifier (URI):
A `Uniform Resource Identifier (URI) <https://en.wikipedia.org/wiki/Uniform_Resource_Identifier>`_ defines a dataset:

.. code-block:: python
from airflow.datasets import Dataset
example_dataset = Dataset("s3://dataset-bucket/example.csv")
Airflow makes no assumptions about the content or location of the data represented by the URI. It is treated as a string, so any use of regular expressions (eg ``input_\d+.csv``) or file glob patterns (eg ``input_2022*.csv``) as an attempt to create multiple datasets from one declaration will not work.
Airflow makes no assumptions about the content or location of the data represented by the URI, and treats the URI like a string. This means that Airflow treats any regular expressions, like ``input_\d+.csv``, or file glob patterns, such as ``input_2022*.csv``, as an attempt to create multiple datasets from one declaration, and they will not work.
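
As a hedged aside that is not in the original docs: if you need several related inputs, one way is to declare each dataset explicitly (the bucket and file names below are hypothetical):

.. code-block:: python

    from airflow.datasets import Dataset

    # Globs and regexes are not expanded, so enumerate the datasets one by one.
    input_datasets = [Dataset(f"s3://dataset-bucket/input_{i}.csv") for i in range(1, 4)]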

A dataset should be created with a valid URI. Airflow core and providers define various URI schemes that you can use, such as ``file`` (core), ``postgres`` (by the Postgres provider), and ``s3`` (by the Amazon provider). Third-party providers and plugins may also provide their own schemes. These pre-defined schemes have individual semantics that are expected to be followed.
You must create datasets with a valid URI. Airflow core and providers define various URI schemes that you can use, such as ``file`` (core), ``postgres`` (by the Postgres provider), and ``s3`` (by the Amazon provider). Third-party providers and plugins might also provide their own schemes. These pre-defined schemes have individual semantics that are expected to be followed.
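
As an illustration only (the URIs and their path layouts are placeholders; each provider documents its own expected format), datasets using some of these pre-defined schemes might look like:

.. code-block:: python

    from airflow.datasets import Dataset

    # Schemes defined by Airflow core and providers, each with its own semantics.
    local_file = Dataset("file://localhost/tmp/example.csv")
    pg_table = Dataset("postgres://dbserver:5432/database/schema/table")
    s3_object = Dataset("s3://dataset-bucket/example.csv")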

What is a valid URI?
--------------------

Technically, the URI must conform to the valid character set in RFC 3986. If you don't know what this means, that's basically ASCII alphanumeric characters, plus ``%``, ``-``, ``_``, ``.``, and ``~``. To identify a resource that cannot be represented by URI-safe characters, encode the resource name with `percent-encoding <https://en.wikipedia.org/wiki/Percent-encoding>`_.
Technically, the URI must conform to the valid character set in RFC 3986, which is basically ASCII alphanumeric characters, plus ``%``, ``-``, ``_``, ``.``, and ``~``. To identify a resource that cannot be represented by URI-safe characters, encode the resource name with `percent-encoding <https://en.wikipedia.org/wiki/Percent-encoding>`_.

The URI is also case sensitive, so ``s3://example/dataset`` and ``s3://Example/Dataset`` are considered different. Note that the *host* part of the URI is also case sensitive, which differs from RFC 3986.

Do not use the ``airflow`` scheme, which is reserved for Airflow's internals.

Airflow always prefers using lower cases in schemes, and case sensitivity is needed in the host part to correctly distinguish between resources.
Airflow always prefers using lowercase in schemes, and case sensitivity is needed in the host part of the URI to correctly distinguish between resources.

.. code-block:: python
# invalid datasets:
reserved = Dataset("airflow://example_dataset")
not_ascii = Dataset("èxample_datašet")
If you wish to define datasets with a scheme without additional semantic constraints, use a scheme with the prefix ``x-``. Airflow will skip any semantic validation on URIs with such schemes.
If you want to define datasets with a scheme that doesn't include additional semantic constraints, use a scheme with the prefix ``x-``. Airflow skips any semantic validation on URIs with these schemes.

.. code-block:: python
@@ -99,10 +99,10 @@ The identifier does not have to be absolute; it can be a scheme-less, relative U
Non-absolute identifiers are considered plain strings that do not carry any semantic meaning to Airflow.
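
A small sketch contrasting the two cases above, with made-up identifiers:

.. code-block:: python

    from airflow.datasets import Dataset

    # An ``x-`` prefixed scheme: Airflow skips semantic validation for it.
    custom_scheme = Dataset("x-my-format://my-team/nightly-export")

    # A scheme-less, relative identifier: treated as a plain string with no semantics.
    plain_string = Dataset("nightly-export-2024")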

Extra information on Dataset
Extra information on dataset
----------------------------

If needed, an extra dictionary can be included in a Dataset:
If needed, you can include an extra dictionary in a dataset:

.. code-block:: python
@@ -128,12 +128,12 @@ This can be used to supply custom description to the dataset, such as who has ow
...,
)
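
The full example is collapsed here; a hedged sketch of attaching an ``extra`` dictionary, with made-up keys and values, might be:

.. code-block:: python

    from airflow.datasets import Dataset

    example_dataset = Dataset(
        "s3://dataset-bucket/example.csv",
        # Static, descriptive metadata about the dataset itself.
        extra={"team": "trainees", "contains_pii": False},
    )
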
.. note:: **Security Note:** Dataset URI and extra fields are not encrypted, they are stored in cleartext, in Airflow's metadata database. Do NOT store any sensitive values, especially credentials, in dataset URIs or extra key values!
.. note:: **Security Note:** Dataset URI and extra fields are not encrypted; they are stored in cleartext in Airflow's metadata database. Do NOT store any sensitive values, especially credentials, in either dataset URIs or extra key values!

How to use datasets in your DAGs
--------------------------------

You can use datasets to specify data dependencies in your DAGs. Take the following example:
You can use datasets to specify data dependencies in your DAGs. The following example shows that after the ``producer`` task in the ``producer`` DAG successfully completes, Airflow schedules the ``consumer`` DAG. Airflow marks a dataset as ``updated`` only if the task completes successfully. If the task fails or if it is skipped, no update occurs, and Airflow doesn't schedule the ``consumer`` DAG.

.. code-block:: python
@@ -145,15 +145,14 @@ You can use datasets to specify data dependencies in your DAGs. Take the followi
with DAG(dag_id="consumer", schedule=[example_dataset], ...):
...
Once the ``producer`` task in the ``producer`` DAG has completed successfully, Airflow schedules the ``consumer`` DAG. A dataset will be marked as updated only if the task completes successfully — if the task fails or if it is skipped, no update occurs, and the ``consumer`` DAG will not be scheduled.
A listing of the relationships between datasets and DAGs can be found in the
You can find a listing of the relationships between datasets and DAGs in the
:ref:`Datasets View<ui:datasets-view>`.

Multiple Datasets
-----------------

As the ``schedule`` parameter is a list, DAGs can require multiple datasets, and the DAG will be scheduled once **all** datasets it consumes have been updated at least once since the last time it was run:
Because the ``schedule`` parameter is a list, DAGs can require multiple datasets. Airflow schedules a DAG after **all** datasets the DAG consumes have been updated at least once since the last time the DAG ran:

.. code-block:: python
@@ -169,7 +168,7 @@ As the ``schedule`` parameter is a list, DAGs can require multiple datasets, and
...
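
The full example is collapsed above; a minimal sketch of a DAG that consumes two datasets, with placeholder names, could be:

.. code-block:: python

    import pendulum

    from airflow.datasets import Dataset
    from airflow.models.dag import DAG

    dag1_dataset = Dataset("s3://dag1/output_1.txt")
    dag2_dataset = Dataset("s3://dag2/output_1.txt")

    with DAG(
        dag_id="multi_dataset_consumer",
        start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
        # Runs only after *both* datasets have received at least one update
        # since the last run of this DAG.
        schedule=[dag1_dataset, dag2_dataset],
        catchup=False,
    ):
        ...
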
If one dataset is updated multiple times before all consumed datasets have been updated, the downstream DAG will still only be run once, as shown in this illustration:
If one dataset is updated multiple times before all consumed datasets update, the downstream DAG still only runs once, as shown in this illustration:

.. ::
ASCII art representation of this diagram
@@ -224,13 +223,13 @@ If one dataset is updated multiple times before all consumed datasets have been

}

Attaching extra information to an emitting Dataset Event
Attaching extra information to an emitting dataset event
--------------------------------------------------------

.. versionadded:: 2.10.0

A task with a dataset outlet can optionally attach extra information before it emits a dataset event. This is different
from `Extra information on Dataset`_. Extra information on a dataset statically describes the entity pointed to by the dataset URI; extra information on the *dataset event* instead should be used to annotate the triggering data change, such as how many rows in the database are changed by the update, or the date range covered by it.
from `Extra information on dataset`_. Extra information on a dataset statically describes the entity pointed to by the dataset URI; extra information on the *dataset event* instead should be used to annotate the triggering data change, such as how many rows in the database are changed by the update, or the date range covered by it.

The easiest way to attach extra information to the dataset event is by ``yield``-ing a ``Metadata`` object from a task:

@@ -263,10 +262,10 @@ Another way to achieve the same is by accessing ``dataset_events`` in a task's e
There's minimal magic here---Airflow simply writes the yielded values to the exact same accessor. This also works in classic operators, including ``execute``, ``pre_execute``, and ``post_execute``.
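
Because the examples are collapsed in this view, here is a hedged sketch of the ``yield Metadata(...)`` approach, assuming the ``Metadata`` class is importable from ``airflow.datasets.metadata`` as in 2.10:

.. code-block:: python

    from airflow.datasets import Dataset
    from airflow.datasets.metadata import Metadata
    from airflow.decorators import task

    example_s3_dataset = Dataset("s3://dataset/example.csv")


    @task(outlets=[example_s3_dataset])
    def write_to_s3():
        n_rows = 42  # Placeholder for however many rows were actually written.
        # The extra dict annotates the dataset *event*, not the dataset itself.
        yield Metadata(example_s3_dataset, {"row_count": n_rows})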


Fetching information from a Triggering Dataset Event
Fetching information from a triggering dataset event
----------------------------------------------------

A triggered DAG can fetch information from the Dataset that triggered it using the ``triggering_dataset_events`` template or parameter.
A triggered DAG can fetch information from the dataset that triggered it using the ``triggering_dataset_events`` template or parameter.
See more at :ref:`templates-ref`.

Example:
@@ -300,27 +299,27 @@ Example:
print_triggering_dataset_events()
Note that this example is using `(.values() | first | first) <https://jinja.palletsprojects.com/en/3.1.x/templates/#jinja-filters.first>`_ to fetch the first of one Dataset given to the DAG, and the first of one DatasetEvent for that Dataset. An implementation may be quite complex if you have multiple Datasets, potentially with multiple DatasetEvents.
Note that this example is using `(.values() | first | first) <https://jinja.palletsprojects.com/en/3.1.x/templates/#jinja-filters.first>`_ to fetch the first of one dataset given to the DAG, and the first of one DatasetEvent for that dataset. An implementation can be quite complex if you have multiple datasets, potentially with multiple DatasetEvents.
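
The full example is collapsed above; a minimal sketch using the TaskFlow API, where the function body is illustrative, might be:

.. code-block:: python

    from airflow.decorators import task


    @task
    def print_triggering_dataset_events(triggering_dataset_events=None):
        # Maps each triggering dataset to the list of DatasetEvents emitted for it.
        # In a templated field the same data is available as ``{{ triggering_dataset_events }}``.
        for dataset, dataset_events in triggering_dataset_events.items():
            print(dataset, dataset_events)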

Advanced Dataset Scheduling with Conditional Expressions
Advanced dataset scheduling with conditional expressions
--------------------------------------------------------

Apache Airflow introduces advanced scheduling capabilities that leverage conditional expressions with datasets. This feature allows Airflow users to define complex dependencies for DAG executions based on dataset updates, using logical operators for more granular control over workflow triggers.
Apache Airflow includes advanced scheduling capabilities that use conditional expressions with datasets. This feature allows you to define complex dependencies for DAG executions based on dataset updates, using logical operators for more control over workflow triggers.

Logical Operators for Datasets
Logical operators for datasets
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Airflow supports two logical operators for combining dataset conditions:

- **AND (``&``)**: Specifies that the DAG should be triggered only after all of the specified datasets have been updated.
- **OR (``|``)**: Specifies that the DAG should be triggered when any one of the specified datasets is updated.
- **OR (``|``)**: Specifies that the DAG should be triggered when any of the specified datasets is updated.

These operators enable the expression of complex dataset update conditions, enhancing the dynamism and flexibility of Airflow workflows.
These operators enable you to configure your Airflow workflows to use more complex dataset update conditions, making them more dynamic and flexible.

Example Usage
Example usage
-------------

**Scheduling Based on Multiple Dataset Updates**
**Scheduling based on multiple dataset updates**

To schedule a DAG to run only when two specific datasets have both been updated, use the AND operator (``&``):

@@ -336,9 +335,9 @@ To schedule a DAG to run only when two specific datasets have both been updated,
):
...
**Scheduling Based on Any Dataset Update**
**Scheduling based on any dataset update**

To trigger a DAG execution when either of two datasets is updated, apply the OR operator (``|``):
To trigger a DAG execution when either one of two datasets is updated, apply the OR operator (``|``):

.. code-block:: python
@@ -364,11 +363,11 @@ For scenarios requiring more intricate conditions, such as triggering a DAG when
):
...
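
The full example is collapsed above; a hedged sketch of one such combined condition, with placeholder dataset URIs, might be:

.. code-block:: python

    import pendulum

    from airflow.datasets import Dataset
    from airflow.models.dag import DAG

    dag1_dataset = Dataset("s3://dag1/output_1.txt")
    dag2_dataset = Dataset("s3://dag2/output_1.txt")
    dag3_dataset = Dataset("s3://dag3/output_3.txt")

    with DAG(
        dag_id="conditional_consumer",
        start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
        # Runs when dag1_dataset is updated AND at least one of the other two is updated.
        schedule=(dag1_dataset & (dag2_dataset | dag3_dataset)),
        catchup=False,
    ):
        ...
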
Combining Dataset and Time-Based Schedules
Combining dataset and time-based schedules
------------------------------------------

DatasetTimetable Integration
~~~~~~~~~~~~~~~~~~~~~~~~~~~~
With the introduction of ``DatasetOrTimeSchedule``, it is now possible to schedule DAGs based on both dataset events and time-based schedules. This feature offers flexibility for scenarios where a DAG needs to be triggered by data updates as well as run periodically according to a fixed timetable.
You can schedule DAGs based on both dataset events and time-based schedules using ``DatasetOrTimeSchedule``. This allows you to create workflows where a DAG needs to be triggered both by data updates and periodically, according to a fixed timetable.
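
A hedged sketch, assuming the import paths ``airflow.timetables.datasets`` for ``DatasetOrTimeSchedule`` and ``airflow.timetables.trigger`` for ``CronTriggerTimetable``:

.. code-block:: python

    import pendulum

    from airflow.datasets import Dataset
    from airflow.models.dag import DAG
    from airflow.timetables.datasets import DatasetOrTimeSchedule
    from airflow.timetables.trigger import CronTriggerTimetable

    example_dataset = Dataset("s3://dataset-bucket/example.csv")

    with DAG(
        dag_id="dataset_or_time",
        start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
        # Runs when the dataset is updated *or* on the daily cron schedule.
        schedule=DatasetOrTimeSchedule(
            timetable=CronTriggerTimetable("0 1 * * *", timezone="UTC"),
            datasets=[example_dataset],
        ),
        catchup=False,
    ):
        ...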

For more detailed information on ``DatasetOrTimeSchedule`` and its usage, refer to the corresponding section in :ref:`DatasetOrTimeSchedule <dataset-timetable-section>`.
For more detailed information on ``DatasetOrTimeSchedule``, refer to the corresponding section in :ref:`DatasetOrTimeSchedule <dataset-timetable-section>`.
