Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AIP-84 Migrate Modify Dag Run endpoint to FastAPI #42973

Merged
merged 24 commits into from
Oct 29, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
e50c43e
add modify_dag_run
rawwar Oct 13, 2024
4a62ed5
add tests
rawwar Oct 13, 2024
c668f3d
Merge branch 'main' of https://github.com/apache/airflow into kalyan/…
rawwar Oct 13, 2024
e1fa0d5
Merge branch 'main' of https://github.com/apache/airflow into kalyan/…
rawwar Oct 15, 2024
abf01d9
Merge branch 'main' of https://github.com/apache/airflow into kalyan/…
rawwar Oct 15, 2024
444956d
Update airflow/api_fastapi/views/public/dag_run.py
rawwar Oct 15, 2024
69b1439
fix
rawwar Oct 15, 2024
1b93b83
merge conflict fix
rawwar Oct 16, 2024
521c6fc
Update airflow/api_fastapi/routes/public/dag_run.py
rawwar Oct 16, 2024
7e00f2f
Update airflow/api_fastapi/serializers/dag_run.py
rawwar Oct 16, 2024
8956fb9
Merge branch 'main' of https://github.com/apache/airflow into kalyan/…
rawwar Oct 16, 2024
e37b2b5
Merge branch 'main' of https://github.com/apache/airflow into kalyan/…
rawwar Oct 18, 2024
70b7ea5
Merge branch 'main' of https://github.com/apache/airflow into kalyan/…
rawwar Oct 18, 2024
2614e45
Merge branch 'main' of https://github.com/apache/airflow into kalyan/…
rawwar Oct 18, 2024
816dcb8
use dagbag
rawwar Oct 19, 2024
f1dad1c
replace patch with put
rawwar Oct 19, 2024
28b6db8
refactor
rawwar Oct 19, 2024
7d66c48
use put in tests
rawwar Oct 19, 2024
07090a9
Merge branch 'main' of https://github.com/apache/airflow into kalyan/…
rawwar Oct 29, 2024
63943b5
modify to patch
rawwar Oct 29, 2024
7802d8f
add update_mask
rawwar Oct 29, 2024
da21c06
refactor update to patch
rawwar Oct 29, 2024
a23fe1f
Merge branch 'main' of https://github.com/apache/airflow into kalyan/…
rawwar Oct 29, 2024
afdffbd
Merge branch 'main' into kalyan/AIP-84/modify_dag_run
rawwar Oct 29, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions airflow/api_connexion/endpoints/dag_run_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,7 @@ def post_dag_run(*, dag_id: str, session: Session = NEW_SESSION) -> APIResponse:
raise AlreadyExists(detail=f"DAGRun with DAG ID: '{dag_id}' and DAGRun ID: '{run_id}' already exists")


@mark_fastapi_migration_done
@security.requires_access_dag("PUT", DagAccessEntity.RUN)
@provide_session
@action_logging
Expand Down
79 changes: 79 additions & 0 deletions airflow/api_fastapi/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -944,6 +944,68 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
patch:
tags:
- DagRun
summary: Modify Dag Run
description: Modify a DAG Run.
operationId: modify_dag_run
parameters:
- name: dag_id
in: path
required: true
schema:
type: string
title: Dag Id
- name: dag_run_id
in: path
required: true
schema:
type: string
title: Dag Run Id
requestBody:
required: true
content:
application/json:
schema:
$ref: '#/components/schemas/DAGRunPatchBody'
responses:
'200':
description: Successful Response
content:
application/json:
schema:
$ref: '#/components/schemas/DAGRunResponse'
'400':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Bad Request
'401':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unauthorized
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Forbidden
'404':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Not Found
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/monitor/health:
get:
tags:
Expand Down Expand Up @@ -1453,6 +1515,23 @@ components:
- file_token
title: DAGResponse
description: DAG serializer for responses.
DAGRunModifyFormStates:
type: string
enum:
- queued
- success
- failed
title: DAGRunModifyFormStates
description: Enum for DAG Run states when updating a DAG Run.
DAGRunPatchBody:
properties:
state:
$ref: '#/components/schemas/DAGRunModifyFormStates'
type: object
required:
- state
title: DAGRunPatchBody
description: DAG Run Serializer for PATCH requests.
DAGRunResponse:
properties:
run_id:
Expand Down
18 changes: 17 additions & 1 deletion airflow/api_fastapi/routes/public/dag_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from airflow.api_fastapi.db.common import get_session
from airflow.api_fastapi.openapi.exceptions import create_openapi_http_exception_doc
from airflow.api_fastapi.routes.router import AirflowRouter
from airflow.api_fastapi.serializers.dag_run import DAGRunResponse
from airflow.api_fastapi.serializers.dag_run import DAGRunPatchBody, DAGRunResponse
from airflow.models import DagRun

dag_run_router = AirflowRouter(tags=["DagRun"], prefix="/dags/{dag_id}/dagRuns")
Expand Down Expand Up @@ -57,3 +57,19 @@ async def delete_dag_run(dag_id: str, dag_run_id: str, session: Annotated[Sessio
)

session.delete(dag_run)


@dag_run_router.patch("/{dag_run_id}", responses=create_openapi_http_exception_doc([400, 401, 403, 404]))
pierrejeambrun marked this conversation as resolved.
Show resolved Hide resolved
async def modify_dag_run(
rawwar marked this conversation as resolved.
Show resolved Hide resolved
dag_id: str, dag_run_id: str, state: DAGRunPatchBody, session: Annotated[Session, Depends(get_session)]
) -> DAGRunResponse:
"""Modify a DAG Run."""
dag_run = session.scalar(select(DagRun).filter_by(dag_id=dag_id, run_id=dag_run_id))
if dag_run is None:
raise HTTPException(
404, f"The DagRun with dag_id: `{dag_id}` and run_id: `{dag_run_id}` was not found"
)

setattr(dag_run, "state", state.state)

pierrejeambrun marked this conversation as resolved.
Show resolved Hide resolved
return DAGRunResponse.model_validate(dag_run, from_attributes=True)
15 changes: 15 additions & 0 deletions airflow/api_fastapi/serializers/dag_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,28 @@
from __future__ import annotations

from datetime import datetime
from enum import Enum

from pydantic import BaseModel, Field

from airflow.utils.state import DagRunState
from airflow.utils.types import DagRunTriggeredByType, DagRunType


class DAGRunModifyFormStates(str, Enum):
rawwar marked this conversation as resolved.
Show resolved Hide resolved
"""Enum for DAG Run states when updating a DAG Run."""

QUEUED = DagRunState.QUEUED
SUCCESS = DagRunState.SUCCESS
FAILED = DagRunState.FAILED


class DAGRunPatchBody(BaseModel):
"""DAG Run Serializer for PATCH requests."""

state: DAGRunModifyFormStates


class DAGRunResponse(BaseModel):
"""DAG Run serializer for responses."""

Expand Down
3 changes: 3 additions & 0 deletions airflow/ui/openapi-gen/queries/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,9 @@ export type DagServicePatchDagMutationResult = Awaited<
export type VariableServicePatchVariableMutationResult = Awaited<
ReturnType<typeof VariableService.patchVariable>
>;
export type DagRunServiceModifyDagRunMutationResult = Awaited<
ReturnType<typeof DagRunService.modifyDagRun>
>;
export type DagServiceDeleteDagMutationResult = Awaited<
ReturnType<typeof DagService.deleteDag>
>;
Expand Down
54 changes: 53 additions & 1 deletion airflow/ui/openapi-gen/queries/queries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,12 @@ import {
MonitorService,
VariableService,
} from "../requests/services.gen";
import { DAGPatchBody, DagRunState, VariableBody } from "../requests/types.gen";
import {
DAGPatchBody,
DAGRunPatchBody,
DagRunState,
VariableBody,
} from "../requests/types.gen";
import * as Common from "./common";

/**
Expand Down Expand Up @@ -569,6 +574,53 @@ export const useVariableServicePatchVariable = <
}) as unknown as Promise<TData>,
...options,
});
/**
* Modify Dag Run
* Modify a DAG Run.
* @param data The data for the request.
* @param data.dagId
* @param data.dagRunId
* @param data.requestBody
* @returns DAGRunResponse Successful Response
* @throws ApiError
*/
export const useDagRunServiceModifyDagRun = <
TData = Common.DagRunServiceModifyDagRunMutationResult,
TError = unknown,
TContext = unknown,
>(
options?: Omit<
UseMutationOptions<
TData,
TError,
{
dagId: string;
dagRunId: string;
requestBody: DAGRunPatchBody;
},
TContext
>,
"mutationFn"
>,
) =>
useMutation<
TData,
TError,
{
dagId: string;
dagRunId: string;
requestBody: DAGRunPatchBody;
},
TContext
>({
mutationFn: ({ dagId, dagRunId, requestBody }) =>
DagRunService.modifyDagRun({
dagId,
dagRunId,
requestBody,
}) as unknown as Promise<TData>,
...options,
});
/**
* Delete Dag
* Delete the specific DAG.
Expand Down
19 changes: 19 additions & 0 deletions airflow/ui/openapi-gen/requests/schemas.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -824,6 +824,25 @@ export const $DAGResponse = {
description: "DAG serializer for responses.",
} as const;

export const $DAGRunModifyFormStates = {
type: "string",
enum: ["queued", "success", "failed"],
title: "DAGRunModifyFormStates",
description: "Enum for DAG Run states when updating a DAG Run.",
} as const;

export const $DAGRunPatchBody = {
properties: {
state: {
$ref: "#/components/schemas/DAGRunModifyFormStates",
},
},
type: "object",
required: ["state"],
title: "DAGRunPatchBody",
description: "DAG Run Serializer for PATCH requests.",
} as const;

export const $DAGRunResponse = {
properties: {
run_id: {
Expand Down
34 changes: 34 additions & 0 deletions airflow/ui/openapi-gen/requests/services.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ import type {
GetDagRunResponse,
DeleteDagRunData,
DeleteDagRunResponse,
ModifyDagRunData,
ModifyDagRunResponse,
GetHealthResponse,
} from "./types.gen";

Expand Down Expand Up @@ -546,6 +548,38 @@ export class DagRunService {
},
});
}

/**
* Modify Dag Run
* Modify a DAG Run.
* @param data The data for the request.
* @param data.dagId
* @param data.dagRunId
* @param data.requestBody
* @returns DAGRunResponse Successful Response
* @throws ApiError
*/
public static modifyDagRun(
data: ModifyDagRunData,
): CancelablePromise<ModifyDagRunResponse> {
return __request(OpenAPI, {
method: "PATCH",
url: "/public/dags/{dag_id}/dagRuns/{dag_run_id}",
path: {
dag_id: data.dagId,
dag_run_id: data.dagRunId,
},
body: data.requestBody,
mediaType: "application/json",
errors: {
400: "Bad Request",
401: "Unauthorized",
403: "Forbidden",
404: "Not Found",
422: "Validation Error",
},
});
}
}

export class MonitorService {
Expand Down
49 changes: 49 additions & 0 deletions airflow/ui/openapi-gen/requests/types.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,18 @@ export type DAGResponse = {
readonly file_token: string;
};

/**
* Enum for DAG Run states when updating a DAG Run.
*/
export type DAGRunModifyFormStates = "queued" | "success" | "failed";

/**
* DAG Run Serializer for PATCH requests.
*/
export type DAGRunPatchBody = {
state: DAGRunModifyFormStates;
};

/**
* DAG Run serializer for responses.
*/
Expand Down Expand Up @@ -448,6 +460,14 @@ export type DeleteDagRunData = {

export type DeleteDagRunResponse = void;

export type ModifyDagRunData = {
dagId: string;
dagRunId: string;
requestBody: DAGRunPatchBody;
};

export type ModifyDagRunResponse = DAGRunResponse;

export type GetHealthResponse = HealthInfoSchema;

export type $OpenApiTs = {
Expand Down Expand Up @@ -889,6 +909,35 @@ export type $OpenApiTs = {
422: HTTPValidationError;
};
};
patch: {
req: ModifyDagRunData;
res: {
/**
* Successful Response
*/
200: DAGRunResponse;
/**
* Bad Request
*/
400: HTTPExceptionResponse;
/**
* Unauthorized
*/
401: HTTPExceptionResponse;
/**
* Forbidden
*/
403: HTTPExceptionResponse;
/**
* Not Found
*/
404: HTTPExceptionResponse;
/**
* Validation Error
*/
422: HTTPValidationError;
};
};
};
"/public/monitor/health": {
get: {
Expand Down
Loading