Skip to content

Commit

Permalink
Migrate public xcom endpoint get entry to fastapi (apache#43521)
Browse files Browse the repository at this point in the history
* Migrate public xcom get entry to fastapi

* Update date parse call

* add migration decorator, update params and test

* Minor adjustment

* Fix import and api spec

---------

Co-authored-by: pierrejeambrun <[email protected]>
  • Loading branch information
2 people authored and Lefteris Gilmaz committed Jan 5, 2025
1 parent 946b217 commit 2a1912e
Show file tree
Hide file tree
Showing 6 changed files with 530 additions and 0 deletions.
2 changes: 2 additions & 0 deletions airflow/api_connexion/endpoints/xcom_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from airflow.auth.managers.models.resource_details import DagAccessEntity
from airflow.models import DagRun as DR, XCom
from airflow.settings import conf
from airflow.utils.api_migration import mark_fastapi_migration_done
from airflow.utils.db import get_query_count
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.www.extensions.init_auth_manager import get_auth_manager
Expand Down Expand Up @@ -83,6 +84,7 @@ def get_xcom_entries(
return xcom_collection_schema.dump(XComCollection(xcom_entries=query, total_entries=total_entries))


@mark_fastapi_migration_done
@security.requires_access_dag("GET", DagAccessEntity.XCOM)
@provide_session
def get_xcom_entry(
Expand Down
49 changes: 49 additions & 0 deletions airflow/api_fastapi/core_api/datamodels/xcom.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from datetime import datetime
from typing import Any

from pydantic import BaseModel, field_validator


class XComResponse(BaseModel):
"""Serializer for a xcom item."""

key: str
timestamp: datetime
execution_date: datetime
map_index: int
task_id: str
dag_id: str


class XComResponseNative(XComResponse):
"""XCom response serializer with native return type."""

value: Any


class XComResponseString(XComResponse):
"""XCom response serializer with string return type."""

value: Any

@field_validator("value")
def value_to_string(cls, v):
return str(v) if v is not None else None
163 changes: 163 additions & 0 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3104,6 +3104,99 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries/{xcom_key}:
get:
tags:
- XCom
summary: Get Xcom Entry
description: Get an XCom entry.
operationId: get_xcom_entry
parameters:
- name: dag_id
in: path
required: true
schema:
type: string
title: Dag Id
- name: task_id
in: path
required: true
schema:
type: string
title: Task Id
- name: dag_run_id
in: path
required: true
schema:
type: string
title: Dag Run Id
- name: xcom_key
in: path
required: true
schema:
type: string
title: Xcom Key
- name: map_index
in: query
required: false
schema:
type: integer
default: -1
title: Map Index
- name: deserialize
in: query
required: false
schema:
type: boolean
default: false
title: Deserialize
- name: stringify
in: query
required: false
schema:
type: boolean
default: true
title: Stringify
responses:
'200':
description: Successful Response
content:
application/json:
schema:
anyOf:
- $ref: '#/components/schemas/XComResponseNative'
- $ref: '#/components/schemas/XComResponseString'
title: Response Get Xcom Entry
'400':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Bad Request
'401':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unauthorized
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Forbidden
'404':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Not Found
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
components:
schemas:
AppBuilderMenuItemResponse:
Expand Down Expand Up @@ -5134,3 +5227,73 @@ components:
- git_version
title: VersionInfo
description: Version information serializer for responses.
XComResponseNative:
properties:
key:
type: string
title: Key
timestamp:
type: string
format: date-time
title: Timestamp
execution_date:
type: string
format: date-time
title: Execution Date
map_index:
type: integer
title: Map Index
task_id:
type: string
title: Task Id
dag_id:
type: string
title: Dag Id
value:
title: Value
type: object
required:
- key
- timestamp
- execution_date
- map_index
- task_id
- dag_id
- value
title: XComResponseNative
description: XCom response serializer with native return type.
XComResponseString:
properties:
key:
type: string
title: Key
timestamp:
type: string
format: date-time
title: Timestamp
execution_date:
type: string
format: date-time
title: Execution Date
map_index:
type: integer
title: Map Index
task_id:
type: string
title: Task Id
dag_id:
type: string
title: Dag Id
value:
title: Value
type: object
required:
- key
- timestamp
- execution_date
- map_index
- task_id
- dag_id
- value
title: XComResponseString
description: XCom response serializer with string return type.
2 changes: 2 additions & 0 deletions airflow/api_fastapi/core_api/routes/public/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from airflow.api_fastapi.core_api.routes.public.task_instances import task_instances_router
from airflow.api_fastapi.core_api.routes.public.variables import variables_router
from airflow.api_fastapi.core_api.routes.public.version import version_router
from airflow.api_fastapi.core_api.routes.public.xcom import xcom_router

public_router = AirflowRouter(prefix="/public")

Expand All @@ -56,3 +57,4 @@
public_router.include_router(variables_router)
public_router.include_router(version_router)
public_router.include_router(dag_stats_router)
public_router.include_router(xcom_router)
88 changes: 88 additions & 0 deletions airflow/api_fastapi/core_api/routes/public/xcom.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import copy

from fastapi import Depends, HTTPException
from sqlalchemy import and_, select
from sqlalchemy.orm import Session
from typing_extensions import Annotated

from airflow.api_fastapi.common.db.common import get_session
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.datamodels.xcom import (
XComResponseNative,
XComResponseString,
)
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.models import DagRun as DR, XCom
from airflow.settings import conf

xcom_router = AirflowRouter(
tags=["XCom"], prefix="/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries"
)


@xcom_router.get(
"/{xcom_key}",
responses=create_openapi_http_exception_doc([400, 401, 403, 404]),
)
def get_xcom_entry(
dag_id: str,
task_id: str,
dag_run_id: str,
xcom_key: str,
session: Annotated[Session, Depends(get_session)],
map_index: int = -1,
deserialize: bool = False,
stringify: bool = True,
) -> XComResponseNative | XComResponseString:
"""Get an XCom entry."""
if deserialize:
if not conf.getboolean("api", "enable_xcom_deserialize_support", fallback=False):
raise HTTPException(400, "XCom deserialization is disabled in configuration.")
query = select(XCom, XCom.value)
else:
query = select(XCom)
print()

query = query.where(
XCom.dag_id == dag_id, XCom.task_id == task_id, XCom.key == xcom_key, XCom.map_index == map_index
)
query = query.join(DR, and_(XCom.dag_id == DR.dag_id, XCom.run_id == DR.run_id))
query = query.where(DR.run_id == dag_run_id)

if deserialize:
item = session.execute(query).one_or_none()
else:
item = session.scalars(query).one_or_none()

if item is None:
raise HTTPException(404, f"XCom entry with key: `{xcom_key}` not found")

if deserialize:
xcom, value = item
xcom_stub = copy.copy(xcom)
xcom_stub.value = value
xcom_stub.value = XCom.deserialize_value(xcom_stub)
item = xcom_stub

if stringify or conf.getboolean("core", "enable_xcom_pickling"):
return XComResponseString.model_validate(item, from_attributes=True)

return XComResponseNative.model_validate(item, from_attributes=True)
Loading

0 comments on commit 2a1912e

Please sign in to comment.