diff --git a/airflow/api_connexion/endpoints/xcom_endpoint.py b/airflow/api_connexion/endpoints/xcom_endpoint.py index 830cedb51c840..81c0fe8f1237e 100644 --- a/airflow/api_connexion/endpoints/xcom_endpoint.py +++ b/airflow/api_connexion/endpoints/xcom_endpoint.py @@ -23,12 +23,13 @@ from sqlalchemy.orm import Session from airflow.api_connexion import security -from airflow.api_connexion.exceptions import NotFound +from airflow.api_connexion.exceptions import BadRequest, NotFound from airflow.api_connexion.parameters import check_limit, format_parameters from airflow.api_connexion.schemas.xcom_schema import XComCollection, xcom_collection_schema, xcom_schema from airflow.api_connexion.types import APIResponse from airflow.models import DagRun as DR, XCom from airflow.security import permissions +from airflow.settings import conf from airflow.utils.airflow_flask_app import get_airflow_app from airflow.utils.session import NEW_SESSION, provide_session @@ -93,6 +94,8 @@ def get_xcom_entry( ) -> APIResponse: """Get an XCom entry.""" if deserialize: + if not conf.getboolean("api", "enable_xcom_deserialize_support", fallback=False): + raise BadRequest(detail="XCom deserialization is disabled in configuration.") query = select(XCom, XCom.value) else: query = select(XCom) diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index b6b160326523a..13f1926ae4e01 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -1178,6 +1178,15 @@ api: version_added: 2.2.0 example: ~ default: "" + enable_xcom_deserialize_support: + description: | + Indicates whether the *xcomEntries* endpoint supports the *deserialize* + flag. If set to False, setting this flag in a request would result in a + 400 Bad Request error. 
+ type: boolean + version_added: 2.7.0 + example: ~ + default: "False" lineage: description: ~ options: diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index 1fbf58f7bccde..701e9e8f77694 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -640,6 +640,11 @@ access_control_allow_methods = # Separate URLs with space. access_control_allow_origins = +# Indicates whether the *xcomEntries* endpoint supports the *deserialize* +# flag. If set to False, setting this flag in a request would result in a +# 400 Bad Request error. +enable_xcom_deserialize_support = False + [lineage] # what lineage backend to use backend = diff --git a/newsfragments/32176.significant.rst b/newsfragments/32176.significant.rst new file mode 100644 index 0000000000000..492d5f955d05a --- /dev/null +++ b/newsfragments/32176.significant.rst @@ -0,0 +1,10 @@ +The ``xcomEntries`` API disables support for the ``deserialize`` flag by default + +For security reasons, the ``/dags/*/dagRuns/*/taskInstances/*/xcomEntries/*`` +API endpoint now disables the ``deserialize`` option to deserialize arbitrary +XCom values in the webserver. For backward compatibility, server admins may set +the ``[api] enable_xcom_deserialize_support`` config to *True* to re-enable the +flag and restore the previous behavior. + +However, it is strongly advised **not** to enable the feature, and to perform +deserialization at the client side instead. 
diff --git a/tests/api_connexion/endpoints/test_xcom_endpoint.py b/tests/api_connexion/endpoints/test_xcom_endpoint.py index 45a5894ec6936..7cd82fdaa3897 100644 --- a/tests/api_connexion/endpoints/test_xcom_endpoint.py +++ b/tests/api_connexion/endpoints/test_xcom_endpoint.py @@ -183,24 +183,61 @@ def _create_xcom_entry(self, dag_id, run_id, execution_date, task_id, xcom_key, ) @pytest.mark.parametrize( - "query, expected_value", + "allowed, query, expected_status_or_value", [ - pytest.param("?deserialize=true", "real deserialized TEST_VALUE", id="true"), - pytest.param("?deserialize=false", "orm deserialized TEST_VALUE", id="false"), - pytest.param("", "orm deserialized TEST_VALUE", id="default"), + pytest.param( + True, + "?deserialize=true", + "real deserialized TEST_VALUE", + id="true", + ), + pytest.param( + False, + "?deserialize=true", + 400, + id="disallowed", + ), + pytest.param( + True, + "?deserialize=false", + "orm deserialized TEST_VALUE", + id="false-irrelevant", + ), + pytest.param( + False, + "?deserialize=false", + "orm deserialized TEST_VALUE", + id="false", + ), + pytest.param( + True, + "", + "orm deserialized TEST_VALUE", + id="default-irrelevant", + ), + pytest.param( + False, + "", + "orm deserialized TEST_VALUE", + id="default", + ), ], ) @conf_vars({("core", "xcom_backend"): "tests.api_connexion.endpoints.test_xcom_endpoint.CustomXCom"}) - def test_custom_xcom_deserialize(self, query, expected_value): + def test_custom_xcom_deserialize(self, allowed: bool, query: str, expected_status_or_value: int | str): XCom = resolve_xcom_backend() self._create_xcom_entry("dag", "run", utcnow(), "task", "key", backend=XCom) url = f"/api/v1/dags/dag/dagRuns/run/taskInstances/task/xcomEntries/key{query}" with mock.patch("airflow.api_connexion.endpoints.xcom_endpoint.XCom", XCom): - response = self.client.get(url, environ_overrides={"REMOTE_USER": "test"}) - - assert response.status_code == 200 - assert response.json["value"] == expected_value + with 
conf_vars({("api", "enable_xcom_deserialize_support"): str(allowed)}): + response = self.client.get(url, environ_overrides={"REMOTE_USER": "test"}) + + if isinstance(expected_status_or_value, int): + assert response.status_code == expected_status_or_value + else: + assert response.status_code == 200 + assert response.json["value"] == expected_status_or_value class TestGetXComEntries(TestXComEndpoint):