Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add config to disable the 'deserialize' XCom API flag #32176

Merged
merged 1 commit into from
Jul 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion airflow/api_connexion/endpoints/xcom_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@
from sqlalchemy.orm import Session

from airflow.api_connexion import security
from airflow.api_connexion.exceptions import NotFound
from airflow.api_connexion.exceptions import BadRequest, NotFound
from airflow.api_connexion.parameters import check_limit, format_parameters
from airflow.api_connexion.schemas.xcom_schema import XComCollection, xcom_collection_schema, xcom_schema
from airflow.api_connexion.types import APIResponse
from airflow.models import DagRun as DR, XCom
from airflow.security import permissions
from airflow.settings import conf
from airflow.utils.airflow_flask_app import get_airflow_app
from airflow.utils.session import NEW_SESSION, provide_session

Expand Down Expand Up @@ -93,6 +94,8 @@ def get_xcom_entry(
) -> APIResponse:
"""Get an XCom entry."""
if deserialize:
if not conf.getboolean("api", "enable_xcom_deserialize_support", fallback=False):
raise BadRequest(detail="XCom deserialization is disabled in configuration.")
query = select(XCom, XCom.value)
else:
query = select(XCom)
Expand Down
9 changes: 9 additions & 0 deletions airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1178,6 +1178,15 @@ api:
version_added: 2.2.0
example: ~
default: ""
enable_xcom_deserialize_support:
description: |
Indicates whether the *xcomEntries* endpoint supports the *deserialize*
flag. If set to False, any request that sets this flag is rejected with a
400 Bad Request error.
type: boolean
version_added: 2.7.0
example: ~
default: "False"
lineage:
description: ~
options:
Expand Down
5 changes: 5 additions & 0 deletions airflow/config_templates/default_airflow.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -640,6 +640,11 @@ access_control_allow_methods =
# Separate URLs with space.
access_control_allow_origins =

# Indicates whether the *xcomEntries* endpoint supports the *deserialize*
# flag. If set to False, any request that sets this flag is rejected with a
# 400 Bad Request error.
enable_xcom_deserialize_support = False

[lineage]
# what lineage backend to use
backend =
Expand Down
10 changes: 10 additions & 0 deletions newsfragments/32176.significant.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
The ``xcomEntries`` API disables support for the ``deserialize`` flag by default

For security reasons, the ``/dags/*/dagRuns/*/taskInstances/*/xcomEntries/*``
API endpoint now rejects the ``deserialize`` option by default, so that
arbitrary XCom values are no longer deserialized in the webserver. Server
admins who rely on the previous behavior may set the
``[api] enable_xcom_deserialize_support`` config to *True* to re-enable the flag.

However, it is strongly advised **not** to enable the feature, and to perform
deserialization at the client side instead.
55 changes: 46 additions & 9 deletions tests/api_connexion/endpoints/test_xcom_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,24 +183,61 @@ def _create_xcom_entry(self, dag_id, run_id, execution_date, task_id, xcom_key,
)

@pytest.mark.parametrize(
"query, expected_value",
"allowed, query, expected_status_or_value",
[
pytest.param("?deserialize=true", "real deserialized TEST_VALUE", id="true"),
pytest.param("?deserialize=false", "orm deserialized TEST_VALUE", id="false"),
pytest.param("", "orm deserialized TEST_VALUE", id="default"),
pytest.param(
True,
"?deserialize=true",
"real deserialized TEST_VALUE",
id="true",
),
pytest.param(
False,
"?deserialize=true",
400,
id="disallowed",
),
pytest.param(
True,
"?deserialize=false",
"orm deserialized TEST_VALUE",
id="false-irrelevant",
),
pytest.param(
False,
"?deserialize=false",
"orm deserialized TEST_VALUE",
id="false",
),
pytest.param(
True,
"",
"orm deserialized TEST_VALUE",
id="default-irrelevant",
),
pytest.param(
False,
"",
"orm deserialized TEST_VALUE",
id="default",
),
],
)
@conf_vars({("core", "xcom_backend"): "tests.api_connexion.endpoints.test_xcom_endpoint.CustomXCom"})
def test_custom_xcom_deserialize(self, query, expected_value):
def test_custom_xcom_deserialize(self, allowed: bool, query: str, expected_status_or_value: int | str):
XCom = resolve_xcom_backend()
self._create_xcom_entry("dag", "run", utcnow(), "task", "key", backend=XCom)

url = f"/api/v1/dags/dag/dagRuns/run/taskInstances/task/xcomEntries/key{query}"
with mock.patch("airflow.api_connexion.endpoints.xcom_endpoint.XCom", XCom):
response = self.client.get(url, environ_overrides={"REMOTE_USER": "test"})

assert response.status_code == 200
assert response.json["value"] == expected_value
with conf_vars({("api", "enable_xcom_deserialize_support"): str(allowed)}):
response = self.client.get(url, environ_overrides={"REMOTE_USER": "test"})

if isinstance(expected_status_or_value, int):
assert response.status_code == expected_status_or_value
else:
assert response.status_code == 200
assert response.json["value"] == expected_status_or_value


class TestGetXComEntries(TestXComEndpoint):
Expand Down