Skip to content

Commit

Permalink
Make XCom display as react json (#40640)
Browse files Browse the repository at this point in the history
* Enable proper JSON view in Xcom display as well

* Review feedback

* Implement a native endpoint for XCom API to prevent retrieval as Python JSON string

* Fix pytests
  • Loading branch information
jscheffl authored Jul 22, 2024
1 parent dc2f2dd commit 6c591e5
Show file tree
Hide file tree
Showing 8 changed files with 113 additions and 22 deletions.
13 changes: 11 additions & 2 deletions airflow/api_connexion/endpoints/xcom_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,12 @@
from airflow.api_connexion import security
from airflow.api_connexion.exceptions import BadRequest, NotFound
from airflow.api_connexion.parameters import check_limit, format_parameters
from airflow.api_connexion.schemas.xcom_schema import XComCollection, xcom_collection_schema, xcom_schema
from airflow.api_connexion.schemas.xcom_schema import (
XComCollection,
xcom_collection_schema,
xcom_schema_native,
xcom_schema_string,
)
from airflow.auth.managers.models.resource_details import DagAccessEntity
from airflow.models import DagRun as DR, XCom
from airflow.settings import conf
Expand Down Expand Up @@ -88,6 +93,7 @@ def get_xcom_entry(
xcom_key: str,
map_index: int = -1,
deserialize: bool = False,
stringify: bool = True,
session: Session = NEW_SESSION,
) -> APIResponse:
"""Get an XCom entry."""
Expand Down Expand Up @@ -119,4 +125,7 @@ def get_xcom_entry(
stub.value = XCom.deserialize_value(stub)
item = stub

return xcom_schema.dump(item)
if stringify:
return xcom_schema_string.dump(item)

return xcom_schema_native.dump(item)
24 changes: 22 additions & 2 deletions airflow/api_connexion/openapi/v1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1904,6 +1904,19 @@ paths:
This parameter is not meaningful when using the default XCom backend.
*New in version 2.4.0*
- in: query
name: stringify
schema:
type: boolean
default: true
required: false
description: |
Whether to convert the XCom value to be a string. XCom values can be of Any data type.
If set to true (default) the Any value will be returned as string, e.g. a Python representation
of a dict. If set to false it will return the raw data as dict, list, string or whatever was stored.
*New in version 2.10.0*
responses:
"200":
description: Success.
Expand Down Expand Up @@ -3922,8 +3935,15 @@ components:
- type: object
properties:
value:
type: string
description: The value
anyOf:
- type: string
- type: number
- type: integer
- type: boolean
- type: array
items: {}
- type: object
description: The value(s),

# Python objects
# Based on
Expand Down
15 changes: 11 additions & 4 deletions airflow/api_connexion/schemas/xcom_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,16 @@ class Meta:
dag_id = auto_field()


class XComSchema(XComCollectionItemSchema):
"""XCom schema."""
class XComSchemaNative(XComCollectionItemSchema):
"""XCom schema with native return type."""

value = auto_field()
value = fields.Raw()


class XComSchemaString(XComCollectionItemSchema):
"""XCom schema forced to be string."""

value = fields.String()


class XComCollection(NamedTuple):
Expand All @@ -60,6 +66,7 @@ class XComCollectionSchema(Schema):
total_entries = fields.Int()


xcom_schema = XComSchema()
xcom_schema_native = XComSchemaNative()
xcom_schema_string = XComSchemaString()
xcom_collection_item_schema = XComCollectionItemSchema()
xcom_collection_schema = XComCollectionSchema()
2 changes: 1 addition & 1 deletion airflow/www/static/js/api/useTaskXcom.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ export const useTaskXcomEntry = ({
.replace("_DAG_RUN_ID_", dagRunId)
.replace("_TASK_ID_", taskId)
.replace("_XCOM_KEY_", xcomKey),
{ params: { map_index: mapIndex } }
{ params: { map_index: mapIndex, stringify: false } }
),
{
enabled: !!xcomKey,
Expand Down
19 changes: 16 additions & 3 deletions airflow/www/static/js/dag/details/taskInstance/Xcom/XcomEntry.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@
* under the License.
*/

import { Alert, AlertIcon, Spinner, Td, Text, Tr } from "@chakra-ui/react";
import { Alert, AlertIcon, Spinner, Td, Tr } from "@chakra-ui/react";
import React from "react";

import { useTaskXcomEntry } from "src/api";
import ErrorAlert from "src/components/ErrorAlert";
import RenderedJsonField from "src/components/RenderedJsonField";
import type { Dag, DagRun, TaskInstance } from "src/types";

interface Props {
Expand Down Expand Up @@ -54,18 +55,30 @@ const XcomEntry = ({
tryNumber: tryNumber || 1,
});

let content = <Text fontFamily="monospace">{xcom?.value}</Text>;
let content = null;
if (isLoading) {
content = <Spinner />;
} else if (error) {
content = <ErrorAlert error={error} />;
} else if (!xcom) {
} else if (!xcom || !xcom.value) {
content = (
<Alert status="info">
<AlertIcon />
No value found for XCom key
</Alert>
);
} else {
let xcomString = "";
if (typeof xcom.value !== "string") {
try {
xcomString = JSON.stringify(xcom.value);
} catch (e) {
// skip
}
} else {
xcomString = xcom.value as string;
}
content = <RenderedJsonField content={xcomString} />;
}

return (
Expand Down
18 changes: 16 additions & 2 deletions airflow/www/static/js/types/api-generated.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1599,8 +1599,13 @@ export interface components {
} & components["schemas"]["CollectionInfo"];
/** @description Full representations of XCom entry. */
XCom: components["schemas"]["XComCollectionItem"] & {
/** @description The value */
value?: string;
/** @description The value(s), */
value?: Partial<string> &
Partial<number> &
Partial<number> &
Partial<boolean> &
Partial<unknown[]> &
Partial<{ [key: string]: unknown }>;
};
/**
* @description DAG details.
Expand Down Expand Up @@ -4439,6 +4444,15 @@ export interface operations {
* *New in version 2.4.0*
*/
deserialize?: boolean;
/**
* Whether to convert the XCom value to be a string. XCom values can be of Any data type.
*
* If set to true (default) the Any value will be returned as string, e.g. a Python representation
* of a dict. If set to false it will return the raw data as dict, list, string or whatever was stored.
*
* *New in version 2.10.0*
*/
stringify?: boolean;
};
};
responses: {
Expand Down
38 changes: 33 additions & 5 deletions tests/api_connexion/endpoints/test_xcom_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,14 +122,14 @@ def teardown_method(self) -> None:


class TestGetXComEntry(TestXComEndpoint):
def test_should_respond_200(self):
def test_should_respond_200_stringify(self):
dag_id = "test-dag-id"
task_id = "test-task-id"
execution_date = "2005-04-02T00:00:00+00:00"
xcom_key = "test-xcom-key"
execution_date_parsed = parse_execution_date(execution_date)
run_id = DagRun.generate_run_id(DagRunType.MANUAL, execution_date_parsed)
self._create_xcom_entry(dag_id, run_id, execution_date_parsed, task_id, xcom_key)
self._create_xcom_entry(dag_id, run_id, execution_date_parsed, task_id, xcom_key, {"key": "value"})
response = self.client.get(
f"/api/v1/dags/{dag_id}/dagRuns/{run_id}/taskInstances/{task_id}/xcomEntries/{xcom_key}",
environ_overrides={"REMOTE_USER": "test"},
Expand All @@ -145,7 +145,33 @@ def test_should_respond_200(self):
"task_id": task_id,
"map_index": -1,
"timestamp": "TIMESTAMP",
"value": "TEST_VALUE",
"value": "{'key': 'value'}",
}

def test_should_respond_200_native(self):
dag_id = "test-dag-id"
task_id = "test-task-id"
execution_date = "2005-04-02T00:00:00+00:00"
xcom_key = "test-xcom-key"
execution_date_parsed = parse_execution_date(execution_date)
run_id = DagRun.generate_run_id(DagRunType.MANUAL, execution_date_parsed)
self._create_xcom_entry(dag_id, run_id, execution_date_parsed, task_id, xcom_key, {"key": "value"})
response = self.client.get(
f"/api/v1/dags/{dag_id}/dagRuns/{run_id}/taskInstances/{task_id}/xcomEntries/{xcom_key}?stringify=false",
environ_overrides={"REMOTE_USER": "test"},
)
assert 200 == response.status_code

current_data = response.json
current_data["timestamp"] = "TIMESTAMP"
assert current_data == {
"dag_id": dag_id,
"execution_date": execution_date,
"key": xcom_key,
"task_id": task_id,
"map_index": -1,
"timestamp": "TIMESTAMP",
"value": {"key": "value"},
}

def test_should_raise_404_for_non_existent_xcom(self):
Expand Down Expand Up @@ -192,7 +218,9 @@ def test_should_raise_403_forbidden(self):
)
assert response.status_code == 403

def _create_xcom_entry(self, dag_id, run_id, execution_date, task_id, xcom_key, *, backend=XCom):
def _create_xcom_entry(
self, dag_id, run_id, execution_date, task_id, xcom_key, xcom_value="TEST_VALUE", *, backend=XCom
):
with create_session() as session:
dagrun = DagRun(
dag_id=dag_id,
Expand All @@ -207,7 +235,7 @@ def _create_xcom_entry(self, dag_id, run_id, execution_date, task_id, xcom_key,
session.add(ti)
backend.set(
key=xcom_key,
value="TEST_VALUE",
value=xcom_value,
run_id=run_id,
task_id=task_id,
dag_id=dag_id,
Expand Down
6 changes: 3 additions & 3 deletions tests/api_connexion/schemas/test_xcom_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
XComCollection,
xcom_collection_item_schema,
xcom_collection_schema,
xcom_schema,
xcom_schema_string,
)
from airflow.models import DagRun, XCom
from airflow.utils.dates import parse_execution_date
Expand Down Expand Up @@ -199,7 +199,7 @@ def test_serialize(self, create_xcom, session):
value=pickle.dumps(b"test_binary"),
)
xcom_model = session.query(XCom).first()
deserialized_xcom = xcom_schema.dump(xcom_model)
deserialized_xcom = xcom_schema_string.dump(xcom_model)
assert deserialized_xcom == {
"key": "test_key",
"timestamp": self.default_time,
Expand All @@ -220,7 +220,7 @@ def test_deserialize(self):
"dag_id": "test_dag",
"value": b"test_binary",
}
result = xcom_schema.load(xcom_dump)
result = xcom_schema_string.load(xcom_dump)
assert result == {
"key": "test_key",
"timestamp": self.default_time_parsed,
Expand Down

0 comments on commit 6c591e5

Please sign in to comment.