Skip to content

Commit

Permalink
Fix email_on_failure with render_template_as_native_obj (#22770)
Browse files Browse the repository at this point in the history
Co-authored-by: andyhuang <[email protected]>
Co-authored-by: Tzu-ping Chung <[email protected]>
GitOrigin-RevId: d80d52acf14034b0adf00e45b0fbac6ac03ab593
  • Loading branch information
3 people authored and Cloud Composer Team committed Sep 12, 2024
1 parent 8faf9ab commit 26c8cc5
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 6 deletions.
2 changes: 1 addition & 1 deletion airflow/models/abstractoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ def get_template_env(self) -> "jinja2.Environment":

dag = self.get_dag()
if dag:
return dag.get_template_env()
return dag.get_template_env(force_sandboxed=False)
return SandboxedEnvironment(cache_size=0)

def prepare_template(self) -> None:
Expand Down
4 changes: 2 additions & 2 deletions airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -1258,7 +1258,7 @@ def resolve_template_files(self):
for t in self.tasks:
t.resolve_template_files()

def get_template_env(self) -> jinja2.Environment:
def get_template_env(self, *, force_sandboxed: bool = False) -> jinja2.Environment:
"""Build a Jinja2 environment."""
# Collect directories to search for template files
searchpath = [self.folder]
Expand All @@ -1275,7 +1275,7 @@ def get_template_env(self) -> jinja2.Environment:
if self.jinja_environment_kwargs:
jinja_env_options.update(self.jinja_environment_kwargs)
env: jinja2.Environment
if self.render_template_as_native_obj:
if self.render_template_as_native_obj and not force_sandboxed:
env = airflow.templates.NativeEnvironment(**jinja_env_options)
else:
env = airflow.templates.SandboxedEnvironment(**jinja_env_options)
Expand Down
10 changes: 9 additions & 1 deletion airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@
from airflow.plugins_manager import integrate_macros_plugins
from airflow.sentry import Sentry
from airflow.stats import Stats
from airflow.templates import SandboxedEnvironment
from airflow.ti_deps.dep_context import DepContext
from airflow.ti_deps.dependencies_deps import REQUEUEABLE_DEPS, RUNNING_DEPS
from airflow.timetables.base import DataInterval
Expand Down Expand Up @@ -2191,7 +2192,14 @@ def get_email_subject_content(self, exception: BaseException) -> Tuple[str, str,
html_content_err = jinja_env.from_string(default_html_content_err).render(**default_context)

else:
jinja_env = self.task.get_template_env()
# Use the DAG's get_template_env() to set force_sandboxed. Don't add
# the flag to the function on task object -- that function can be
# overridden, and adding a flag breaks backward compatibility.
dag = self.task.get_dag()
if dag:
jinja_env = dag.get_template_env(force_sandboxed=True)
else:
jinja_env = SandboxedEnvironment(cache_size=0)
jinja_context = self.get_template_context()
context_merge(jinja_context, additional_context)

Expand Down
14 changes: 14 additions & 0 deletions tests/models/test_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
from airflow.operators.dummy import DummyOperator
from airflow.operators.subdag import SubDagOperator
from airflow.security import permissions
from airflow.templates import NativeEnvironment, SandboxedEnvironment
from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable
from airflow.timetables.simple import NullTimetable, OnceTimetable
from airflow.utils import timezone
Expand Down Expand Up @@ -473,6 +474,19 @@ def test_template_undefined(self):
jinja_env = dag.get_template_env()
assert jinja_env.undefined is jinja2.Undefined

@parameterized.expand(
[
(False, True, SandboxedEnvironment),
(False, False, SandboxedEnvironment),
(True, False, NativeEnvironment),
(True, True, SandboxedEnvironment),
],
)
def test_template_env(self, use_native_obj, force_sandboxed, expected_env):
dag = DAG("test-dag", render_template_as_native_obj=use_native_obj)
jinja_env = dag.get_template_env(force_sandboxed=force_sandboxed)
assert isinstance(jinja_env, expected_env)

def test_resolve_template_files_value(self):

with NamedTemporaryFile(suffix='.template') as f:
Expand Down
5 changes: 3 additions & 2 deletions tests/models/test_taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -1317,9 +1317,10 @@ def test_overwrite_params_with_dag_run_conf_none(self, create_task_instance):

assert params["override"] is False

@pytest.mark.parametrize("use_native_obj", [True, False])
@patch('airflow.models.taskinstance.send_email')
def test_email_alert(self, mock_send_email, dag_maker):
with dag_maker(dag_id='test_failure_email'):
def test_email_alert(self, mock_send_email, dag_maker, use_native_obj):
with dag_maker(dag_id='test_failure_email', render_template_as_native_obj=use_native_obj):
task = BashOperator(task_id='test_email_alert', bash_command='exit 1', email='to')
ti = dag_maker.create_dagrun(execution_date=timezone.utcnow()).task_instances[0]
ti.task = task
Expand Down

0 comments on commit 26c8cc5

Please sign in to comment.