Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AIP-84 Migrate Modify Dag Run endpoint to FastAPI #42973

Merged
merged 24 commits into from
Oct 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
e50c43e
add modify_dag_run
rawwar Oct 13, 2024
4a62ed5
add tests
rawwar Oct 13, 2024
c668f3d
Merge branch 'main' of https://github.com/apache/airflow into kalyan/…
rawwar Oct 13, 2024
e1fa0d5
Merge branch 'main' of https://github.com/apache/airflow into kalyan/…
rawwar Oct 15, 2024
abf01d9
Merge branch 'main' of https://github.com/apache/airflow into kalyan/…
rawwar Oct 15, 2024
444956d
Update airflow/api_fastapi/views/public/dag_run.py
rawwar Oct 15, 2024
69b1439
fix
rawwar Oct 15, 2024
1b93b83
merge conflict fix
rawwar Oct 16, 2024
521c6fc
Update airflow/api_fastapi/routes/public/dag_run.py
rawwar Oct 16, 2024
7e00f2f
Update airflow/api_fastapi/serializers/dag_run.py
rawwar Oct 16, 2024
8956fb9
Merge branch 'main' of https://github.com/apache/airflow into kalyan/…
rawwar Oct 16, 2024
e37b2b5
Merge branch 'main' of https://github.com/apache/airflow into kalyan/…
rawwar Oct 18, 2024
70b7ea5
Merge branch 'main' of https://github.com/apache/airflow into kalyan/…
rawwar Oct 18, 2024
2614e45
Merge branch 'main' of https://github.com/apache/airflow into kalyan/…
rawwar Oct 18, 2024
816dcb8
use dagbag
rawwar Oct 19, 2024
f1dad1c
replace patch with put
rawwar Oct 19, 2024
28b6db8
refactor
rawwar Oct 19, 2024
7d66c48
use put in tests
rawwar Oct 19, 2024
07090a9
Merge branch 'main' of https://github.com/apache/airflow into kalyan/…
rawwar Oct 29, 2024
63943b5
modify to patch
rawwar Oct 29, 2024
7802d8f
add update_mask
rawwar Oct 29, 2024
da21c06
refactor update to patch
rawwar Oct 29, 2024
a23fe1f
Merge branch 'main' of https://github.com/apache/airflow into kalyan/…
rawwar Oct 29, 2024
afdffbd
Merge branch 'main' into kalyan/AIP-84/modify_dag_run
rawwar Oct 29, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions airflow/api_connexion/endpoints/dag_run_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,7 @@ def post_dag_run(*, dag_id: str, session: Session = NEW_SESSION) -> APIResponse:
raise AlreadyExists(detail=f"DAGRun with DAG ID: '{dag_id}' and DAGRun ID: '{run_id}' already exists")


@mark_fastapi_migration_done
@security.requires_access_dag("PUT", DagAccessEntity.RUN)
@provide_session
@action_logging
Expand Down
89 changes: 89 additions & 0 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1156,6 +1156,78 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
patch:
tags:
- DagRun
summary: Patch Dag Run State
description: Modify a DAG Run.
operationId: patch_dag_run_state
parameters:
- name: dag_id
in: path
required: true
schema:
type: string
title: Dag Id
- name: dag_run_id
in: path
required: true
schema:
type: string
title: Dag Run Id
- name: update_mask
in: query
required: false
schema:
anyOf:
- type: array
items:
type: string
- type: 'null'
title: Update Mask
requestBody:
required: true
content:
application/json:
schema:
$ref: '#/components/schemas/DAGRunPatchBody'
responses:
'200':
description: Successful Response
content:
application/json:
schema:
$ref: '#/components/schemas/DAGRunResponse'
'400':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Bad Request
'401':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unauthorized
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Forbidden
'404':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Not Found
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/monitor/health:
get:
tags:
Expand Down Expand Up @@ -2079,6 +2151,23 @@ components:
- file_token
title: DAGResponse
description: DAG serializer for responses.
DAGRunPatchBody:
properties:
state:
$ref: '#/components/schemas/DAGRunPatchStates'
type: object
required:
- state
title: DAGRunPatchBody
description: DAG Run Serializer for PATCH requests.
DAGRunPatchStates:
type: string
enum:
- queued
- success
- failed
title: DAGRunPatchStates
description: Enum for DAG Run states when updating a DAG Run.
DAGRunResponse:
properties:
run_id:
Expand Down
57 changes: 54 additions & 3 deletions airflow/api_fastapi/core_api/routes/public/dag_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,25 @@

from __future__ import annotations

from fastapi import Depends, HTTPException
from fastapi import Depends, HTTPException, Query, Request
from sqlalchemy import select
from sqlalchemy.orm import Session
from typing_extensions import Annotated

from airflow.api.common.mark_tasks import (
set_dag_run_state_to_failed,
set_dag_run_state_to_queued,
set_dag_run_state_to_success,
)
from airflow.api_fastapi.common.db.common import get_session
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.api_fastapi.core_api.serializers.dag_run import DAGRunResponse
from airflow.models import DagRun
from airflow.api_fastapi.core_api.serializers.dag_run import (
DAGRunPatchBody,
DAGRunPatchStates,
DAGRunResponse,
)
from airflow.models import DAG, DagRun

dag_run_router = AirflowRouter(tags=["DagRun"], prefix="/dags/{dag_id}/dagRuns")

Expand Down Expand Up @@ -57,3 +66,45 @@ async def delete_dag_run(dag_id: str, dag_run_id: str, session: Annotated[Sessio
)

session.delete(dag_run)


@dag_run_router.patch("/{dag_run_id}", responses=create_openapi_http_exception_doc([400, 401, 403, 404]))
async def patch_dag_run_state(
dag_id: str,
dag_run_id: str,
patch_body: DAGRunPatchBody,
session: Annotated[Session, Depends(get_session)],
request: Request,
update_mask: list[str] | None = Query(None),
) -> DAGRunResponse:
"""Modify a DAG Run."""
dag_run = session.scalar(select(DagRun).filter_by(dag_id=dag_id, run_id=dag_run_id))
if dag_run is None:
raise HTTPException(
404, f"The DagRun with dag_id: `{dag_id}` and run_id: `{dag_run_id}` was not found"
)

dag: DAG = request.app.state.dag_bag.get_dag(dag_id)

if not dag:
raise HTTPException(404, f"Dag with id {dag_id} was not found")

if update_mask:
if update_mask != ["state"]:
raise HTTPException(400, "Only `state` field can be updated through the REST API")
else:
update_mask = ["state"]

for attr_name in update_mask:
if attr_name == "state":
state = getattr(patch_body, attr_name)
if state == DAGRunPatchStates.SUCCESS:
set_dag_run_state_to_success(dag=dag, run_id=dag_run.run_id, commit=True)
elif state == DAGRunPatchStates.QUEUED:
set_dag_run_state_to_queued(dag=dag, run_id=dag_run.run_id, commit=True)
else:
set_dag_run_state_to_failed(dag=dag, run_id=dag_run.run_id, commit=True)

dag_run = session.get(DagRun, dag_run.id)

return DAGRunResponse.model_validate(dag_run, from_attributes=True)
15 changes: 15 additions & 0 deletions airflow/api_fastapi/core_api/serializers/dag_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,28 @@
from __future__ import annotations

from datetime import datetime
from enum import Enum

from pydantic import BaseModel, Field

from airflow.utils.state import DagRunState
from airflow.utils.types import DagRunTriggeredByType, DagRunType


class DAGRunPatchStates(str, Enum):
"""Enum for DAG Run states when updating a DAG Run."""

QUEUED = DagRunState.QUEUED
SUCCESS = DagRunState.SUCCESS
FAILED = DagRunState.FAILED


class DAGRunPatchBody(BaseModel):
"""DAG Run Serializer for PATCH requests."""

pierrejeambrun marked this conversation as resolved.
Show resolved Hide resolved
state: DAGRunPatchStates


class DAGRunResponse(BaseModel):
"""DAG Run serializer for responses."""

Expand Down
3 changes: 3 additions & 0 deletions airflow/ui/openapi-gen/queries/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,9 @@ export type DagServicePatchDagMutationResult = Awaited<
export type VariableServicePatchVariableMutationResult = Awaited<
ReturnType<typeof VariableService.patchVariable>
>;
export type DagRunServicePatchDagRunStateMutationResult = Awaited<
ReturnType<typeof DagRunService.patchDagRunState>
>;
export type PoolServicePatchPoolMutationResult = Awaited<
ReturnType<typeof PoolService.patchPool>
>;
Expand Down
52 changes: 52 additions & 0 deletions airflow/ui/openapi-gen/queries/queries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import {
} from "../requests/services.gen";
import {
DAGPatchBody,
DAGRunPatchBody,
DagRunState,
PoolPatchBody,
PoolPostBody,
Expand Down Expand Up @@ -948,6 +949,57 @@ export const useVariableServicePatchVariable = <
}) as unknown as Promise<TData>,
...options,
});
/**
* Patch Dag Run State
* Modify a DAG Run.
* @param data The data for the request.
* @param data.dagId
* @param data.dagRunId
* @param data.requestBody
* @param data.updateMask
* @returns DAGRunResponse Successful Response
* @throws ApiError
*/
export const useDagRunServicePatchDagRunState = <
TData = Common.DagRunServicePatchDagRunStateMutationResult,
TError = unknown,
TContext = unknown,
>(
options?: Omit<
UseMutationOptions<
TData,
TError,
{
dagId: string;
dagRunId: string;
requestBody: DAGRunPatchBody;
updateMask?: string[];
},
TContext
>,
"mutationFn"
>,
) =>
useMutation<
TData,
TError,
{
dagId: string;
dagRunId: string;
requestBody: DAGRunPatchBody;
updateMask?: string[];
},
TContext
>({
mutationFn: ({ dagId, dagRunId, requestBody, updateMask }) =>
DagRunService.patchDagRunState({
dagId,
dagRunId,
requestBody,
updateMask,
}) as unknown as Promise<TData>,
...options,
});
/**
* Patch Pool
* Update a Pool.
Expand Down
19 changes: 19 additions & 0 deletions airflow/ui/openapi-gen/requests/schemas.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -887,6 +887,25 @@ export const $DAGResponse = {
description: "DAG serializer for responses.",
} as const;

export const $DAGRunPatchBody = {
properties: {
state: {
$ref: "#/components/schemas/DAGRunPatchStates",
},
},
type: "object",
required: ["state"],
title: "DAGRunPatchBody",
description: "DAG Run Serializer for PATCH requests.",
} as const;

export const $DAGRunPatchStates = {
type: "string",
enum: ["queued", "success", "failed"],
title: "DAGRunPatchStates",
description: "Enum for DAG Run states when updating a DAG Run.",
} as const;

export const $DAGRunResponse = {
properties: {
run_id: {
Expand Down
38 changes: 38 additions & 0 deletions airflow/ui/openapi-gen/requests/services.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ import type {
GetDagRunResponse,
DeleteDagRunData,
DeleteDagRunResponse,
PatchDagRunStateData,
PatchDagRunStateResponse,
GetHealthResponse,
DeletePoolData,
DeletePoolResponse,
Expand Down Expand Up @@ -672,6 +674,42 @@ export class DagRunService {
},
});
}

/**
* Patch Dag Run State
* Modify a DAG Run.
* @param data The data for the request.
* @param data.dagId
* @param data.dagRunId
* @param data.requestBody
* @param data.updateMask
* @returns DAGRunResponse Successful Response
* @throws ApiError
*/
public static patchDagRunState(
data: PatchDagRunStateData,
): CancelablePromise<PatchDagRunStateResponse> {
return __request(OpenAPI, {
method: "PATCH",
url: "/public/dags/{dag_id}/dagRuns/{dag_run_id}",
path: {
dag_id: data.dagId,
dag_run_id: data.dagRunId,
},
query: {
update_mask: data.updateMask,
},
body: data.requestBody,
mediaType: "application/json",
errors: {
400: "Bad Request",
401: "Unauthorized",
403: "Forbidden",
404: "Not Found",
422: "Validation Error",
},
});
}
}

export class MonitorService {
Expand Down
Loading
Loading