Skip to content

Commit

Permalink
add clear_dag_run
Browse files Browse the repository at this point in the history
  • Loading branch information
rawwar committed Oct 13, 2024
1 parent f1735b4 commit a418bcf
Show file tree
Hide file tree
Showing 9 changed files with 240 additions and 2 deletions.
1 change: 1 addition & 0 deletions airflow/api_connexion/endpoints/dag_run_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,7 @@ def update_dag_run_state(*, dag_id: str, dag_run_id: str, session: Session = NEW
return dagrun_schema.dump(dag_run)


@mark_fastapi_migration_done
@security.requires_access_dag("PUT", DagAccessEntity.RUN)
@action_logging
@provide_session
Expand Down
64 changes: 64 additions & 0 deletions airflow/api_fastapi/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -679,6 +679,61 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/dags/{dag_id}/dagRuns/{dag_run_id}/clear:
post:
tags:
- DagRun
summary: Clear Dag Run
operationId: clear_dag_run
parameters:
- name: dag_id
in: path
required: true
schema:
type: string
title: Dag Id
- name: dag_run_id
in: path
required: true
schema:
type: string
title: Dag Run Id
requestBody:
required: true
content:
application/json:
schema:
$ref: '#/components/schemas/DAGRunClearBody'
responses:
'200':
description: Successful Response
content:
application/json:
schema: {}
'401':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unauthorized
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Forbidden
'404':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Not Found
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
components:
schemas:
ConnectionResponse:
Expand Down Expand Up @@ -1147,6 +1202,15 @@ components:
- file_token
title: DAGResponse
description: DAG serializer for responses.
DAGRunClearBody:
properties:
dry_run:
type: boolean
title: Dry Run
default: true
type: object
title: DAGRunClearBody
description: DAG Run serializer for clear endpoint body.
DAGRunResponse:
properties:
run_id:
Expand Down
6 changes: 6 additions & 0 deletions airflow/api_fastapi/serializers/dag_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,9 @@ class DAGRunResponse(BaseModel):
triggered_by: DagRunTriggeredByType
conf: dict
note: str | None


class DAGRunClearBody(BaseModel):
"""DAG Run serializer for clear endpoint body."""

dry_run: bool = True
29 changes: 28 additions & 1 deletion airflow/api_fastapi/views/public/dag_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

from airflow.api_fastapi.db.common import get_session
from airflow.api_fastapi.openapi.exceptions import create_openapi_http_exception_doc
from airflow.api_fastapi.serializers.dag_run import DAGRunResponse
from airflow.api_fastapi.serializers.dag_run import DAGRunClearBody, DAGRunResponse
from airflow.api_fastapi.views.router import AirflowRouter
from airflow.models import DagRun

Expand All @@ -42,3 +42,30 @@ async def get_dag_run(
)

return DAGRunResponse.model_validate(dag_run, from_attributes=True)


@dag_run_router.post("/{dag_run_id}/clear", responses=create_openapi_http_exception_doc([401, 403, 404]))
async def clear_dag_run(
dag_id: str, dag_run_id: str, dry_run: DAGRunClearBody, session: Annotated[Session, Depends(get_session)]
):
dag_run = session.scalar(select(DagRun).filter_by(dag_id=dag_id, run_id=dag_run_id))
if dag_run is None:
raise HTTPException(
404, f"The DagRun with dag_id: `{dag_id}` and run_id: `{dag_run_id}` was not found"
)

if dry_run.dry_run:
task_instances = dag_run.clear(
start_date=dag_run.start_date,
end_date=dag_run.end_date,
task_ids=None,
only_failed=False,
dry_run=True,
)
return task_instances # Need to create TaskInstance serializer
else:
dag_run.clear(
start_date=dag_run.start_date, end_date=dag_run.end_date, task_ids=None, only_failed=False
)
dag_run_cleared = session.scalar(select(DagRun).filter_by(dag_id=dag_id, run_id=dag_run_id))
return DAGRunResponse.model_validate(dag_run_cleared, from_attributes=True)
3 changes: 3 additions & 0 deletions airflow/ui/openapi-gen/queries/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,9 @@ export const UseDagRunServiceGetDagRunKeyFn = (
},
queryKey?: Array<unknown>,
) => [useDagRunServiceGetDagRunKey, ...(queryKey ?? [{ dagId, dagRunId }])];
export type DagRunServiceClearDagRunMutationResult = Awaited<
ReturnType<typeof DagRunService.clearDagRun>
>;
export type DagServicePatchDagsMutationResult = Awaited<
ReturnType<typeof DagService.patchDags>
>;
Expand Down
52 changes: 51 additions & 1 deletion airflow/ui/openapi-gen/queries/queries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@ import {
DashboardService,
VariableService,
} from "../requests/services.gen";
import { DAGPatchBody, DagRunState } from "../requests/types.gen";
import {
DAGPatchBody,
DAGRunClearBody,
DagRunState,
} from "../requests/types.gen";
import * as Common from "./common";

/**
Expand Down Expand Up @@ -295,6 +299,52 @@ export const useDagRunServiceGetDagRun = <
queryFn: () => DagRunService.getDagRun({ dagId, dagRunId }) as TData,
...options,
});
/**
* Clear Dag Run
* @param data The data for the request.
* @param data.dagId
* @param data.dagRunId
* @param data.requestBody
* @returns unknown Successful Response
* @throws ApiError
*/
export const useDagRunServiceClearDagRun = <
TData = Common.DagRunServiceClearDagRunMutationResult,
TError = unknown,
TContext = unknown,
>(
options?: Omit<
UseMutationOptions<
TData,
TError,
{
dagId: string;
dagRunId: string;
requestBody: DAGRunClearBody;
},
TContext
>,
"mutationFn"
>,
) =>
useMutation<
TData,
TError,
{
dagId: string;
dagRunId: string;
requestBody: DAGRunClearBody;
},
TContext
>({
mutationFn: ({ dagId, dagRunId, requestBody }) =>
DagRunService.clearDagRun({
dagId,
dagRunId,
requestBody,
}) as unknown as Promise<TData>,
...options,
});
/**
* Patch Dags
* Patch multiple DAGs.
Expand Down
13 changes: 13 additions & 0 deletions airflow/ui/openapi-gen/requests/schemas.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -784,6 +784,19 @@ export const $DAGResponse = {
description: "DAG serializer for responses.",
} as const;

export const $DAGRunClearBody = {
properties: {
dry_run: {
type: "boolean",
title: "Dry Run",
default: true,
},
},
type: "object",
title: "DAGRunClearBody",
description: "DAG Run serializer for clear endpoint body.",
} as const;

export const $DAGRunResponse = {
properties: {
run_id: {
Expand Down
32 changes: 32 additions & 0 deletions airflow/ui/openapi-gen/requests/services.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import type {
GetVariableResponse,
GetDagRunData,
GetDagRunResponse,
ClearDagRunData,
ClearDagRunResponse,
} from "./types.gen";

export class AssetService {
Expand Down Expand Up @@ -391,4 +393,34 @@ export class DagRunService {
},
});
}

/**
* Clear Dag Run
* @param data The data for the request.
* @param data.dagId
* @param data.dagRunId
* @param data.requestBody
* @returns unknown Successful Response
* @throws ApiError
*/
public static clearDagRun(
data: ClearDagRunData,
): CancelablePromise<ClearDagRunResponse> {
return __request(OpenAPI, {
method: "POST",
url: "/public/dags/{dag_id}/dagRuns/{dag_run_id}/clear",
path: {
dag_id: data.dagId,
dag_run_id: data.dagRunId,
},
body: data.requestBody,
mediaType: "application/json",
errors: {
401: "Unauthorized",
403: "Forbidden",
404: "Not Found",
422: "Validation Error",
},
});
}
}
42 changes: 42 additions & 0 deletions airflow/ui/openapi-gen/requests/types.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,13 @@ export type DAGResponse = {
readonly file_token: string;
};

/**
* DAG Run serializer for clear endpoint body.
*/
export type DAGRunClearBody = {
dry_run?: boolean;
};

/**
* DAG Run serializer for responses.
*/
Expand Down Expand Up @@ -355,6 +362,14 @@ export type GetDagRunData = {

export type GetDagRunResponse = DAGRunResponse;

export type ClearDagRunData = {
dagId: string;
dagRunId: string;
requestBody: DAGRunClearBody;
};

export type ClearDagRunResponse = unknown;

export type $OpenApiTs = {
"/ui/next_run_assets/{dag_id}": {
get: {
Expand Down Expand Up @@ -658,4 +673,31 @@ export type $OpenApiTs = {
};
};
};
"/public/dags/{dag_id}/dagRuns/{dag_run_id}/clear": {
post: {
req: ClearDagRunData;
res: {
/**
* Successful Response
*/
200: unknown;
/**
* Unauthorized
*/
401: HTTPExceptionResponse;
/**
* Forbidden
*/
403: HTTPExceptionResponse;
/**
* Not Found
*/
404: HTTPExceptionResponse;
/**
* Validation Error
*/
422: HTTPValidationError;
};
};
};
};

0 comments on commit a418bcf

Please sign in to comment.