Skip to content

Commit

Permalink
Fix dag run conf encoding with non-JSON serializable values (#28777)
Browse files Browse the repository at this point in the history
---------

Co-authored-by: Tzu-ping Chung <[email protected]>
Co-authored-by: eladkal <[email protected]>
  • Loading branch information
3 people authored Apr 3, 2023
1 parent 82c5a5f commit 8069b50
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 8 deletions.
15 changes: 12 additions & 3 deletions airflow/utils/json.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,12 @@ def loads(self, s: str | bytes, **kwargs):


class WebEncoder(json.JSONEncoder):
"""This encodes values into a web understandable format. There is no deserializer"""
"""This encodes values into a web understandable format. There is no deserializer.
This encodes datetime, dates, Decimal and bytes. Custom classes and any other
types are returned as repr(object), since the result is only displayed in the UI.
"""

def default(self, o: Any) -> Any:
if isinstance(o, datetime):
Expand All @@ -59,7 +64,11 @@ def default(self, o: Any) -> Any:
data = serialize(o)
if isinstance(data, dict) and DATA in data:
return data[DATA]

if isinstance(o, bytes):
try:
return o.decode("unicode_escape")
except UnicodeDecodeError:
return repr(o)
try:
data = serialize(o)
if isinstance(data, dict) and CLASSNAME in data:
Expand All @@ -71,7 +80,7 @@ def default(self, o: Any) -> Any:
return data[DATA]
return data
except TypeError:
raise
return repr(o)


class XComEncoder(json.JSONEncoder):
Expand Down
12 changes: 8 additions & 4 deletions airflow/www/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,24 +134,28 @@ def get_mapped_summary(parent_instance, task_instances):
}


def get_dag_run_conf(
    dag_run_conf: Any, *, json_encoder: type[json.JSONEncoder] = json.JSONEncoder
) -> tuple[str | None, bool]:
    """Render a DAG run ``conf`` value as a string.

    :param dag_run_conf: The conf value to render. Strings are passed through
        unchanged; non-empty dicts and lists are dumped as JSON; anything else
        (including empty containers and ``None``) yields no conf.
    :param json_encoder: ``json.JSONEncoder`` subclass handed to ``json.dumps``
        as ``cls``. Defaults to the standard encoder.
    :return: A ``(conf, conf_is_json)`` tuple. ``conf`` is the rendered string or
        ``None``; ``conf_is_json`` is ``True`` only when ``conf`` was produced by
        ``json.dumps``.
    :raises TypeError: Propagated from ``json.dumps`` when the encoder cannot
        serialize a value.
    """
    if isinstance(dag_run_conf, str):
        # Already a string: returned as-is and not flagged as JSON.
        return dag_run_conf, False

    # Test for non-emptiness rather than ``any(...)``: ``any`` inspects the
    # truthiness of the keys/elements, so confs such as ``[0]`` or ``{"": 1}``
    # would wrongly be treated as absent.
    if isinstance(dag_run_conf, (dict, list)) and dag_run_conf:
        encoded = json.dumps(dag_run_conf, sort_keys=True, cls=json_encoder, ensure_ascii=False)
        return encoded, True

    return None, False


def encode_dag_run(dag_run: DagRun | None) -> dict[str, Any] | None:
def encode_dag_run(
dag_run: DagRun | None, *, json_encoder: type[json.JSONEncoder] = json.JSONEncoder
) -> dict[str, Any] | None:
if not dag_run:
return None

conf, conf_is_json = get_dag_run_conf(dag_run.conf)
conf, conf_is_json = get_dag_run_conf(dag_run.conf, json_encoder=json_encoder)

return {
"run_id": dag_run.run_id,
Expand Down
4 changes: 3 additions & 1 deletion airflow/www/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -3642,7 +3642,9 @@ def grid_data(self):
query = query.filter(DagRun.state == run_state)

dag_runs = wwwutils.sorted_dag_runs(query, ordering=dag.timetable.run_ordering, limit=num_runs)
encoded_runs = [wwwutils.encode_dag_run(dr) for dr in dag_runs]
encoded_runs = [
wwwutils.encode_dag_run(dr, json_encoder=utils_json.WebEncoder) for dr in dag_runs
]
data = {
"groups": dag_to_grid(dag, dag_runs, session),
"dag_runs": encoded_runs,
Expand Down
17 changes: 17 additions & 0 deletions tests/www/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

from bs4 import BeautifulSoup

from airflow.utils import json as utils_json
from airflow.www import utils
from airflow.www.utils import wrapped_markdown

Expand Down Expand Up @@ -226,6 +227,22 @@ def test_markdown_none(self):
rendered = self.attr_renderer["doc_md"](None)
assert rendered is None

def test_get_dag_run_conf(self):
    """Check ``get_dag_run_conf`` with ``WebEncoder`` on a conf holding non-JSON-native values.

    The conf mixes a plain string, ASCII bytes, an int, latin-1 encoded bytes
    and a datetime; the expected output is the key-sorted JSON string with
    non-ASCII characters left unescaped.
    """
    dag_run_conf = {
        "1": "string",
        "2": b"bytes",
        "3": 123,
        "4": "à".encode("latin"),
        "5": datetime(2023, 1, 1),
    }
    expected_encoded_dag_run_conf = (
        '{"1": "string", "2": "bytes", "3": 123, "4": "à", "5": "2023-01-01T00:00:00+00:00"}'
    )
    encoded_dag_run_conf, conf_is_json = utils.get_dag_run_conf(
        dag_run_conf, json_encoder=utils_json.WebEncoder
    )
    assert expected_encoded_dag_run_conf == encoded_dag_run_conf
    # A non-empty dict is rendered through json.dumps, so the JSON flag must be set.
    assert conf_is_json is True


class TestWrappedMarkdown:
def test_wrapped_markdown_with_docstring_curly_braces(self):
Expand Down

0 comments on commit 8069b50

Please sign in to comment.