From 6fd888eda7cdc5acbb95d78bdf8f5aedb36defcc Mon Sep 17 00:00:00 2001 From: Kim Date: Sun, 13 Oct 2024 23:43:47 -0600 Subject: [PATCH] Docs: Add templating info to TaskFlow tutorial (#42992) (cherry picked from commit 437616df18d0be3644a88d24c684b50afe3933ba) --- .../sql/tutorial_taskflow_template.sql | 23 ++++ .../tutorial_taskflow_templates.py | 107 ++++++++++++++++++ docs/apache-airflow/tutorial/taskflow.rst | 56 +++++++++ 3 files changed, 186 insertions(+) create mode 100644 airflow/example_dags/sql/tutorial_taskflow_template.sql create mode 100644 airflow/example_dags/tutorial_taskflow_templates.py diff --git a/airflow/example_dags/sql/tutorial_taskflow_template.sql b/airflow/example_dags/sql/tutorial_taskflow_template.sql new file mode 100644 index 0000000000000..375c39eac610b --- /dev/null +++ b/airflow/example_dags/sql/tutorial_taskflow_template.sql @@ -0,0 +1,23 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +*/ + +select * from test_data +where 1=1 + and run_id = '{{ run_id }}' + and something_else = '{{ params.foobar }}' diff --git a/airflow/example_dags/tutorial_taskflow_templates.py b/airflow/example_dags/tutorial_taskflow_templates.py new file mode 100644 index 0000000000000..925f60524b5ea --- /dev/null +++ b/airflow/example_dags/tutorial_taskflow_templates.py @@ -0,0 +1,107 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +# [START tutorial] +# [START import_module] +import pendulum + +from airflow.decorators import dag, task +from airflow.operators.python import get_current_context + +# [END import_module] + + +# [START instantiate_dag] +@dag( + schedule="@daily", + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), + catchup=False, + tags=["example"], + params={"foobar": "param_from_dag", "other_param": "from_dag"}, +) +def tutorial_taskflow_templates(): + """ + ### TaskFlow API Tutorial Documentation + This is a simple data pipeline example which demonstrates the use of + the templates in the TaskFlow API. + Documentation that goes along with the Airflow TaskFlow API tutorial is + located + [here](https://airflow.apache.org/docs/apache-airflow/stable/tutorial_taskflow_api.html) + """ + # [END instantiate_dag] + + # [START template_test] + @task( + # Causes variables that end with `.sql` to be read and templates + # within to be rendered. + templates_exts=[".sql"], + ) + def template_test(sql, test_var, data_interval_end): + context = get_current_context() + + # Will print... + # select * from test_data + # where 1=1 + # and run_id = 'scheduled__2024-10-09T00:00:00+00:00' + # and something_else = 'param_from_task' + print(f"sql: {sql}") + + # Will print `scheduled__2024-10-09T00:00:00+00:00` + print(f"test_var: {test_var}") + + # Will print `2024-10-10 00:00:00+00:00`. + # Note how we didn't pass this value when calling the task. Instead + # it was passed by the decorator from the context + print(f"data_interval_end: {data_interval_end}") + + # Will print... + # run_id: scheduled__2024-10-09T00:00:00+00:00; params.other_param: from_dag + template_str = "run_id: {{ run_id }}; params.other_param: {{ params.other_param }}" + rendered_template = context["task"].render_template( + template_str, + context, + ) + print(f"rendered template: {rendered_template}") + + # Will print the full context dict + print(f"context: {context}") + + # [END template_test] + + # [START main_flow] + template_test.override( + # Will be merged with the dict defined in the dag + # and override existing parameters. + # + # Must be passed into the decorator's parameters + # through `.override()` not into the actual task + # function + params={"foobar": "param_from_task"}, + )( + sql="sql/test.sql", + test_var="{{ run_id }}", + ) + # [END main_flow] + + +# [START dag_invocation] +tutorial_taskflow_templates() +# [END dag_invocation] + +# [END tutorial] diff --git a/docs/apache-airflow/tutorial/taskflow.rst b/docs/apache-airflow/tutorial/taskflow.rst index c77debab8f328..892c3bc4635e4 100644 --- a/docs/apache-airflow/tutorial/taskflow.rst +++ b/docs/apache-airflow/tutorial/taskflow.rst @@ -629,6 +629,62 @@ method. Current context is accessible only during the task execution. The context is not accessible during ``pre_execute`` or ``post_execute``. Calling this method outside execution context will raise an error. +Using templates in decorated tasks +---------------------------------------------- + +Arguments passed to your decorated function are automatically templated. + +You can also use the ``templates_exts`` parameter to template entire files. + +.. code-block:: python + + @task(templates_exts=[".sql"]) + def template_test(sql): + print(f"sql: {sql}") + + + template_test(sql="sql/test.sql") + +This will read the content of ``sql/test.sql`` and replace all template variables. You can also pass a list of files and all of them will be templated. + +You can pass additional parameters to the template engine through `the params parameter `_. + +However, the ``params`` parameter must be passed to the decorator and not to your function directly, such as ``@task(templates_exts=['.sql'], params={'my_param'})`` and can then be used with ``{{ params.my_param }}`` in your templated files and function parameters. + +Alternatively, you can also pass it using the ``.override()`` method: + +.. code-block:: python + + @task() + def template_test(input_var): + print(f"input_var: {input_var}") + + + template_test.override(params={"my_param": "wow"})( + input_var="my param is: {{ params.my_param }}", + ) + +Finally, you can also manually render templates: + +.. code-block:: python + + @task(params={"my_param": "wow"}) + def template_test(): + template_str = "run_id: {{ run_id }}; params.my_param: {{ params.my_param }}" + + context = get_current_context() + rendered_template = context["task"].render_template( + template_str, + context, + ) + +Here is a full example that demonstrates everything above: + +.. exampleinclude:: /../../airflow/example_dags/tutorial_taskflow_templates.py + :language: python + :start-after: [START tutorial] + :end-before: [END tutorial] + Conditionally skipping tasks ----------------------------