Skip to content

Commit

Permalink
Add config to disable the *deserialize* API flag
Browse files Browse the repository at this point in the history
This flag is also *disabled* by default intentionally since it does not
always work and every Airflow setup should review the payload manually
to decide whether it's a good idea to enable the feature (it's usually
not).
  • Loading branch information
uranusjr committed Jun 29, 2023
1 parent 6b4350e commit 55ffffd
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 10 deletions.
5 changes: 4 additions & 1 deletion airflow/api_connexion/endpoints/xcom_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@
from sqlalchemy.orm import Session

from airflow.api_connexion import security
from airflow.api_connexion.exceptions import NotFound
from airflow.api_connexion.exceptions import BadRequest, NotFound
from airflow.api_connexion.parameters import check_limit, format_parameters
from airflow.api_connexion.schemas.xcom_schema import XComCollection, xcom_collection_schema, xcom_schema
from airflow.api_connexion.types import APIResponse
from airflow.models import DagRun as DR, XCom
from airflow.security import permissions
from airflow.settings import conf
from airflow.utils.airflow_flask_app import get_airflow_app
from airflow.utils.session import NEW_SESSION, provide_session

Expand Down Expand Up @@ -93,6 +94,8 @@ def get_xcom_entry(
) -> APIResponse:
"""Get an XCom entry."""
if deserialize:
if not conf.getboolean("api", "enable_xcom_deserialize_support", fallback=False):
raise BadRequest(detail="XCom deserialization is disabled in configuration.")
query = select(XCom, XCom.value)
else:
query = select(XCom)
Expand Down
9 changes: 9 additions & 0 deletions airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1178,6 +1178,15 @@ api:
version_added: 2.2.0
example: ~
default: ""
enable_xcom_deserialize_support:
description: |
Indicates whether the *xcomEntries* endpoint supports the *deserialize*
flag. If set to False, setting this flag in a request would result in a
400 Bad Request error.
type: boolean
version_added: 2.7.0
example: ~
default: "False"
lineage:
description: ~
options:
Expand Down
5 changes: 5 additions & 0 deletions airflow/config_templates/default_airflow.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -640,6 +640,11 @@ access_control_allow_methods =
# Separate URLs with space.
access_control_allow_origins =

# Indicates whether the *xcomEntries* endpoint supports the *deserialize*
# flag. If set to False, setting this flag in a request would result in a
# 400 Bad Request error.
enable_xcom_deserialize_support = False

[lineage]
# what lineage backend to use
backend =
Expand Down
10 changes: 10 additions & 0 deletions newsfragments/32176.significant.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
The ``xcomEntries`` API disables support for the ``deserialize`` flag by default

For security reasons, the ``/dags/*/dagRuns/*/taskInstances/*/xcomEntries/*``
API endpoint now disables the ``deserialize`` option to deserialize arbitrary
XCom values in the webserver. For backward compatibility, server admins may set
the ``[api] enable_xcom_deserialize_support`` config to *True* to enable the
flag and restore backward compatibility.

However, it is strongly advised to **not** enable the feature, and perform
deserialization at the client side instead.
55 changes: 46 additions & 9 deletions tests/api_connexion/endpoints/test_xcom_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,24 +183,61 @@ def _create_xcom_entry(self, dag_id, run_id, execution_date, task_id, xcom_key,
)

@pytest.mark.parametrize(
"query, expected_value",
"allowed, query, expected_status_or_value",
[
pytest.param("?deserialize=true", "real deserialized TEST_VALUE", id="true"),
pytest.param("?deserialize=false", "orm deserialized TEST_VALUE", id="false"),
pytest.param("", "orm deserialized TEST_VALUE", id="default"),
pytest.param(
True,
"?deserialize=true",
"real deserialized TEST_VALUE",
id="true",
),
pytest.param(
False,
"?deserialize=true",
400,
id="disallowed",
),
pytest.param(
True,
"?deserialize=false",
"orm deserialized TEST_VALUE",
id="false-irrelevant",
),
pytest.param(
False,
"?deserialize=false",
"orm deserialized TEST_VALUE",
id="false",
),
pytest.param(
True,
"",
"orm deserialized TEST_VALUE",
id="default-irrelevant",
),
pytest.param(
False,
"",
"orm deserialized TEST_VALUE",
id="default",
),
],
)
@conf_vars({("core", "xcom_backend"): "tests.api_connexion.endpoints.test_xcom_endpoint.CustomXCom"})
def test_custom_xcom_deserialize(self, query, expected_value):
def test_custom_xcom_deserialize(self, allowed: bool, query: str, expected_status_or_value: int | str):
XCom = resolve_xcom_backend()
self._create_xcom_entry("dag", "run", utcnow(), "task", "key", backend=XCom)

url = f"/api/v1/dags/dag/dagRuns/run/taskInstances/task/xcomEntries/key{query}"
with mock.patch("airflow.api_connexion.endpoints.xcom_endpoint.XCom", XCom):
response = self.client.get(url, environ_overrides={"REMOTE_USER": "test"})

assert response.status_code == 200
assert response.json["value"] == expected_value
with conf_vars({("api", "enable_xcom_deserialize_support"): str(allowed)}):
response = self.client.get(url, environ_overrides={"REMOTE_USER": "test"})

if isinstance(expected_status_or_value, int):
assert response.status_code == expected_status_or_value
else:
assert response.status_code == 200
assert response.json["value"] == expected_status_or_value


class TestGetXComEntries(TestXComEndpoint):
Expand Down

0 comments on commit 55ffffd

Please sign in to comment.