From a4091973d8bffa71933f1130733e5456eceba99f Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Tue, 9 Jul 2024 19:23:16 +0200 Subject: [PATCH 1/4] Enable proper JSON view in Xcom display as well --- .../js/dag/details/taskInstance/Xcom/XcomEntry.tsx | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/airflow/www/static/js/dag/details/taskInstance/Xcom/XcomEntry.tsx b/airflow/www/static/js/dag/details/taskInstance/Xcom/XcomEntry.tsx index 8523181ae2146..20e877e833207 100644 --- a/airflow/www/static/js/dag/details/taskInstance/Xcom/XcomEntry.tsx +++ b/airflow/www/static/js/dag/details/taskInstance/Xcom/XcomEntry.tsx @@ -17,11 +17,12 @@ * under the License. */ -import { Alert, AlertIcon, Spinner, Td, Text, Tr } from "@chakra-ui/react"; +import { Alert, AlertIcon, Spinner, Td, Tr } from "@chakra-ui/react"; import React from "react"; import { useTaskXcomEntry } from "src/api"; import ErrorAlert from "src/components/ErrorAlert"; +import RenderedJsonField from "src/components/RenderedJsonField"; import type { Dag, DagRun, TaskInstance } from "src/types"; interface Props { @@ -54,7 +55,7 @@ const XcomEntry = ({ tryNumber: tryNumber || 1, }); - let content = {xcom?.value}; + let content = null; if (isLoading) { content = ; } else if (error) { @@ -66,6 +67,10 @@ const XcomEntry = ({ No value found for XCom key ); + } else { + const xcomStr = xcom?.value as string; + const xcomStrR = xcomStr.replace(/'/g, '"'); + content = ; } return ( From d08c9409a5912eb05bcf6ff708d4afc9899a3d5f Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Fri, 12 Jul 2024 21:59:34 +0200 Subject: [PATCH 2/4] Review feedback --- .../dag/details/taskInstance/Xcom/XcomEntry.tsx | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/airflow/www/static/js/dag/details/taskInstance/Xcom/XcomEntry.tsx b/airflow/www/static/js/dag/details/taskInstance/Xcom/XcomEntry.tsx index 20e877e833207..6ba2800fa0578 100644 --- a/airflow/www/static/js/dag/details/taskInstance/Xcom/XcomEntry.tsx +++ b/airflow/www/static/js/dag/details/taskInstance/Xcom/XcomEntry.tsx @@ -60,7 +60,7 @@ const XcomEntry = ({ content = ; } else if (error) { content = ; - } else if (!xcom) { + } else if (!xcom || !xcom.value) { content = ( @@ -68,9 +68,17 @@ const XcomEntry = ({ ); } else { - const xcomStr = xcom?.value as string; - const xcomStrR = xcomStr.replace(/'/g, '"'); - content = ; + // Note: + // The Airflow API delivers the XCom value as Python JSON dump + // with Python style quotes - which can not be parsed in JavaScript + // by default. + // Example: {'key': 'value'} + // JavaScript expects: {"key": "value"} + // So we attempt to replaces string quotes which in 90% of cases will work + // It will fail if embedded quotes are in quotes. Then the XCom will be + // rendered as string as fallback. Better ideas welcome here. + const xcomString = xcom.value.replace(/'/g, '"'); + content = ; } return ( From 953852e8c234eb0403fbbe68acb9f618ff83a307 Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Tue, 16 Jul 2024 23:48:03 +0200 Subject: [PATCH 3/4] Implement a native endpoint for XCom API to prevent retrieval as Python JSON string --- .../api_connexion/endpoints/xcom_endpoint.py | 13 ++++++- airflow/api_connexion/openapi/v1.yaml | 24 +++++++++++- airflow/api_connexion/schemas/xcom_schema.py | 15 ++++++-- airflow/www/static/js/api/useTaskXcom.ts | 2 +- .../details/taskInstance/Xcom/XcomEntry.tsx | 20 +++++----- airflow/www/static/js/types/api-generated.ts | 18 ++++++++- .../endpoints/test_xcom_endpoint.py | 38 ++++++++++++++++--- 7 files changed, 104 insertions(+), 26 deletions(-) diff --git a/airflow/api_connexion/endpoints/xcom_endpoint.py b/airflow/api_connexion/endpoints/xcom_endpoint.py index 68a9ab2d45f52..59fa9f5acaaa5 100644 --- a/airflow/api_connexion/endpoints/xcom_endpoint.py +++ b/airflow/api_connexion/endpoints/xcom_endpoint.py @@ -25,7 +25,12 @@ from airflow.api_connexion import security from airflow.api_connexion.exceptions import BadRequest, NotFound from airflow.api_connexion.parameters import check_limit, format_parameters -from airflow.api_connexion.schemas.xcom_schema import XComCollection, xcom_collection_schema, xcom_schema +from airflow.api_connexion.schemas.xcom_schema import ( + XComCollection, + xcom_collection_schema, + xcom_schema_native, + xcom_schema_string, +) from airflow.auth.managers.models.resource_details import DagAccessEntity from airflow.models import DagRun as DR, XCom from airflow.settings import conf @@ -88,6 +93,7 @@ def get_xcom_entry( xcom_key: str, map_index: int = -1, deserialize: bool = False, + stringify: bool = True, session: Session = NEW_SESSION, ) -> APIResponse: """Get an XCom entry.""" @@ -119,4 +125,7 @@ def get_xcom_entry( stub.value = XCom.deserialize_value(stub) item = stub - return xcom_schema.dump(item) + if stringify: + return xcom_schema_string.dump(item) + + return xcom_schema_native.dump(item) diff --git a/airflow/api_connexion/openapi/v1.yaml b/airflow/api_connexion/openapi/v1.yaml index 6b5800f870413..ee4d82ea7815a 100644 --- a/airflow/api_connexion/openapi/v1.yaml +++ b/airflow/api_connexion/openapi/v1.yaml @@ -1904,6 +1904,19 @@ paths: This parameter is not meaningful when using the default XCom backend. *New in version 2.4.0* + - in: query + name: stringify + schema: + type: boolean + default: true + required: false + description: | + Whether to convert the XCom value to be a string. XCom values can be of Any data type. + + If set to true (default) the Any value will be returned as string, e.g. a Python representation + of a dict. If set to false it will return the raw data as dict, list, string or whatever was stored. + + *New in version 2.10.0* responses: "200": description: Success. @@ -3921,8 +3934,15 @@ components: - type: object properties: value: - type: string - description: The value + anyOf: + - type: string + - type: number + - type: integer + - type: boolean + - type: array + items: {} + - type: object + description: The value(s), # Python objects # Based on diff --git a/airflow/api_connexion/schemas/xcom_schema.py b/airflow/api_connexion/schemas/xcom_schema.py index 5894db8b1ad8b..625f05bd14597 100644 --- a/airflow/api_connexion/schemas/xcom_schema.py +++ b/airflow/api_connexion/schemas/xcom_schema.py @@ -40,10 +40,16 @@ class Meta: dag_id = auto_field() -class XComSchema(XComCollectionItemSchema): - """XCom schema.""" +class XComSchemaNative(XComCollectionItemSchema): + """XCom schema with native return type.""" - value = auto_field() + value = fields.Raw() + + +class XComSchemaString(XComCollectionItemSchema): + """XCom schema forced to be string.""" + + value = fields.String() class XComCollection(NamedTuple): @@ -60,6 +66,7 @@ class XComCollectionSchema(Schema): total_entries = fields.Int() -xcom_schema = XComSchema() +xcom_schema_native = XComSchemaNative() +xcom_schema_string = XComSchemaString() xcom_collection_item_schema = XComCollectionItemSchema() xcom_collection_schema = XComCollectionSchema() diff --git a/airflow/www/static/js/api/useTaskXcom.ts b/airflow/www/static/js/api/useTaskXcom.ts index 1faa19005a906..403233285eb11 100644 --- a/airflow/www/static/js/api/useTaskXcom.ts +++ b/airflow/www/static/js/api/useTaskXcom.ts @@ -63,7 +63,7 @@ export const useTaskXcomEntry = ({ .replace("_DAG_RUN_ID_", dagRunId) .replace("_TASK_ID_", taskId) .replace("_XCOM_KEY_", xcomKey), - { params: { map_index: mapIndex } } + { params: { map_index: mapIndex, stringify: false } } ), { enabled: !!xcomKey, diff --git a/airflow/www/static/js/dag/details/taskInstance/Xcom/XcomEntry.tsx b/airflow/www/static/js/dag/details/taskInstance/Xcom/XcomEntry.tsx index 6ba2800fa0578..2e9ba769ae099 100644 --- a/airflow/www/static/js/dag/details/taskInstance/Xcom/XcomEntry.tsx +++ b/airflow/www/static/js/dag/details/taskInstance/Xcom/XcomEntry.tsx @@ -68,16 +68,16 @@ const XcomEntry = ({ ); } else { - // Note: - // The Airflow API delivers the XCom value as Python JSON dump - // with Python style quotes - which can not be parsed in JavaScript - // by default. - // Example: {'key': 'value'} - // JavaScript expects: {"key": "value"} - // So we attempt to replaces string quotes which in 90% of cases will work - // It will fail if embedded quotes are in quotes. Then the XCom will be - // rendered as string as fallback. Better ideas welcome here. - const xcomString = xcom.value.replace(/'/g, '"'); + let xcomString = ""; + if (typeof xcom.value !== "string") { + try { + xcomString = JSON.stringify(xcom.value); + } catch (e) { + // skip + } + } else { + xcomString = xcom.value as string; + } content = ; } diff --git a/airflow/www/static/js/types/api-generated.ts b/airflow/www/static/js/types/api-generated.ts index 2a92c2c087a6d..97ec6a4e12a52 100644 --- a/airflow/www/static/js/types/api-generated.ts +++ b/airflow/www/static/js/types/api-generated.ts @@ -1599,8 +1599,13 @@ export interface components { } & components["schemas"]["CollectionInfo"]; /** @description Full representations of XCom entry. */ XCom: components["schemas"]["XComCollectionItem"] & { - /** @description The value */ - value?: string; + /** @description The value(s), */ + value?: Partial & + Partial & + Partial & + Partial & + Partial & + Partial<{ [key: string]: unknown }>; }; /** * @description DAG details. @@ -4439,6 +4444,15 @@ export interface operations { * *New in version 2.4.0* */ deserialize?: boolean; + /** + * Whether to convert the XCom value to be a string. XCom values can be of Any data type. + * + * If set to true (default) the Any value will be returned as string, e.g. a Python representation + * of a dict. If set to false it will return the raw data as dict, list, string or whatever was stored. + * + * *New in version 2.10.0* + */ + stringify?: boolean; }; }; responses: { diff --git a/tests/api_connexion/endpoints/test_xcom_endpoint.py b/tests/api_connexion/endpoints/test_xcom_endpoint.py index 1e4dbb56780cf..318e97842d9e6 100644 --- a/tests/api_connexion/endpoints/test_xcom_endpoint.py +++ b/tests/api_connexion/endpoints/test_xcom_endpoint.py @@ -122,14 +122,14 @@ def teardown_method(self) -> None: class TestGetXComEntry(TestXComEndpoint): - def test_should_respond_200(self): + def test_should_respond_200_stringify(self): dag_id = "test-dag-id" task_id = "test-task-id" execution_date = "2005-04-02T00:00:00+00:00" xcom_key = "test-xcom-key" execution_date_parsed = parse_execution_date(execution_date) run_id = DagRun.generate_run_id(DagRunType.MANUAL, execution_date_parsed) - self._create_xcom_entry(dag_id, run_id, execution_date_parsed, task_id, xcom_key) + self._create_xcom_entry(dag_id, run_id, execution_date_parsed, task_id, xcom_key, {"key": "value"}) response = self.client.get( f"/api/v1/dags/{dag_id}/dagRuns/{run_id}/taskInstances/{task_id}/xcomEntries/{xcom_key}", environ_overrides={"REMOTE_USER": "test"}, @@ -145,7 +145,33 @@ def test_should_respond_200(self): "task_id": task_id, "map_index": -1, "timestamp": "TIMESTAMP", - "value": "TEST_VALUE", + "value": "{'key': 'value'}", + } + + def test_should_respond_200_native(self): + dag_id = "test-dag-id" + task_id = "test-task-id" + execution_date = "2005-04-02T00:00:00+00:00" + xcom_key = "test-xcom-key" + execution_date_parsed = parse_execution_date(execution_date) + run_id = DagRun.generate_run_id(DagRunType.MANUAL, execution_date_parsed) + self._create_xcom_entry(dag_id, run_id, execution_date_parsed, task_id, xcom_key, {"key": "value"}) + response = self.client.get( + f"/api/v1/dags/{dag_id}/dagRuns/{run_id}/taskInstances/{task_id}/xcomEntries/{xcom_key}?stringify=false", + environ_overrides={"REMOTE_USER": "test"}, + ) + assert 200 == response.status_code + + current_data = response.json + current_data["timestamp"] = "TIMESTAMP" + assert current_data == { + "dag_id": dag_id, + "execution_date": execution_date, + "key": xcom_key, + "task_id": task_id, + "map_index": -1, + "timestamp": "TIMESTAMP", + "value": {"key": "value"}, } def test_should_raise_404_for_non_existent_xcom(self): @@ -192,7 +218,9 @@ def test_should_raise_403_forbidden(self): ) assert response.status_code == 403 - def _create_xcom_entry(self, dag_id, run_id, execution_date, task_id, xcom_key, *, backend=XCom): + def _create_xcom_entry( + self, dag_id, run_id, execution_date, task_id, xcom_key, xcom_value="TEST_VALUE", *, backend=XCom + ): with create_session() as session: dagrun = DagRun( dag_id=dag_id, @@ -207,7 +235,7 @@ def _create_xcom_entry(self, dag_id, run_id, execution_date, task_id, xcom_key, session.add(ti) backend.set( key=xcom_key, - value="TEST_VALUE", + value=xcom_value, run_id=run_id, task_id=task_id, dag_id=dag_id, From 5f5039a6531d9366ac7ffc9a05492b7a93bc8f87 Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Wed, 17 Jul 2024 18:58:12 +0200 Subject: [PATCH 4/4] Fix pytests --- tests/api_connexion/schemas/test_xcom_schema.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/api_connexion/schemas/test_xcom_schema.py b/tests/api_connexion/schemas/test_xcom_schema.py index eb3220626dcef..7a10b7e7a47b7 100644 --- a/tests/api_connexion/schemas/test_xcom_schema.py +++ b/tests/api_connexion/schemas/test_xcom_schema.py @@ -25,7 +25,7 @@ XComCollection, xcom_collection_item_schema, xcom_collection_schema, - xcom_schema, + xcom_schema_string, ) from airflow.models import DagRun, XCom from airflow.utils.dates import parse_execution_date @@ -199,7 +199,7 @@ def test_serialize(self, create_xcom, session): value=pickle.dumps(b"test_binary"), ) xcom_model = session.query(XCom).first() - deserialized_xcom = xcom_schema.dump(xcom_model) + deserialized_xcom = xcom_schema_string.dump(xcom_model) assert deserialized_xcom == { "key": "test_key", "timestamp": self.default_time, @@ -220,7 +220,7 @@ def test_deserialize(self): "dag_id": "test_dag", "value": b"test_binary", } - result = xcom_schema.load(xcom_dump) + result = xcom_schema_string.load(xcom_dump) assert result == { "key": "test_key", "timestamp": self.default_time_parsed,