Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AIP-84: Migrate Dag Parsing endpoint to FastApi #44416

Merged
merged 8 commits into from
Nov 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions airflow/api_connexion/endpoints/dag_parsing.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from airflow.auth.managers.models.resource_details import DagDetails
from airflow.models.dag import DagModel
from airflow.models.dagbag import DagPriorityParsingRequest
from airflow.utils.api_migration import mark_fastapi_migration_done
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.www.extensions.init_auth_manager import get_auth_manager

Expand All @@ -38,6 +39,7 @@
from airflow.auth.managers.models.batch_apis import IsAuthorizedDagRequest


@mark_fastapi_migration_done
@security.requires_access_dag("PUT")
@provide_session
def reparse_dag_file(*, file_token: str, session: Session = NEW_SESSION) -> Response:
Expand Down
46 changes: 46 additions & 0 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5612,6 +5612,52 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/parseDagFile/{file_token}:
put:
tags:
- DAG Parsing
summary: Reparse Dag File
description: Request re-parsing a DAG file.
operationId: reparse_dag_file
parameters:
- name: file_token
in: path
required: true
schema:
type: string
title: File Token
responses:
'201':
description: Successful Response
content:
application/json:
schema:
type: 'null'
title: Response Reparse Dag File
'401':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unauthorized
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Forbidden
'404':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Not Found
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/monitor/health:
get:
tags:
Expand Down
2 changes: 2 additions & 0 deletions airflow/api_fastapi/core_api/routes/public/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from airflow.api_fastapi.core_api.routes.public.backfills import backfills_router
from airflow.api_fastapi.core_api.routes.public.config import config_router
from airflow.api_fastapi.core_api.routes.public.connections import connections_router
from airflow.api_fastapi.core_api.routes.public.dag_parsing import dag_parsing_router
from airflow.api_fastapi.core_api.routes.public.dag_run import dag_run_router
from airflow.api_fastapi.core_api.routes.public.dag_sources import dag_sources_router
from airflow.api_fastapi.core_api.routes.public.dag_stats import dag_stats_router
Expand Down Expand Up @@ -73,6 +74,7 @@
authenticated_router.include_router(tasks_router)
authenticated_router.include_router(variables_router)
authenticated_router.include_router(task_instances_log_router)
authenticated_router.include_router(dag_parsing_router)


# Include authenticated router in public router
Expand Down
66 changes: 66 additions & 0 deletions airflow/api_fastapi/core_api/routes/public/dag_parsing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from collections.abc import Sequence
from typing import TYPE_CHECKING, Annotated

from fastapi import Depends, HTTPException, Request, status
from itsdangerous import BadSignature, URLSafeSerializer
from sqlalchemy import select
from sqlalchemy.orm import Session

from airflow.api_fastapi.common.db.common import get_session
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.auth.managers.models.resource_details import DagDetails
from airflow.models.dag import DagModel
from airflow.models.dagbag import DagPriorityParsingRequest

if TYPE_CHECKING:
from airflow.auth.managers.models.batch_apis import IsAuthorizedDagRequest

dag_parsing_router = AirflowRouter(tags=["DAG Parsing"], prefix="/parseDagFile/{file_token}")


@dag_parsing_router.put(
"",
responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
prabhusneha marked this conversation as resolved.
Show resolved Hide resolved
status_code=status.HTTP_201_CREATED,
)
def reparse_dag_file(
file_token: str,
session: Annotated[Session, Depends(get_session)],
request: Request,
) -> None:
"""Request re-parsing a DAG file."""
secret_key = request.app.state.secret_key
auth_s = URLSafeSerializer(secret_key)
try:
path = auth_s.loads(file_token)
except BadSignature:
raise HTTPException(status.HTTP_404_NOT_FOUND, "File not found")

requests: Sequence[IsAuthorizedDagRequest] = [
{"method": "PUT", "details": DagDetails(id=dag_id)}
for dag_id in session.scalars(select(DagModel.dag_id).where(DagModel.fileloc == path))
]
if not requests:
raise HTTPException(status.HTTP_404_NOT_FOUND, "File not found")

parsing_request = DagPriorityParsingRequest(fileloc=path)
session.add(parsing_request)
4 changes: 4 additions & 0 deletions airflow/ui/openapi-gen/queries/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
BackfillService,
ConfigService,
ConnectionService,
DagParsingService,
DagRunService,
DagService,
DagSourceService,
Expand Down Expand Up @@ -1629,6 +1630,9 @@ export type BackfillServiceUnpauseBackfillMutationResult = Awaited<
export type BackfillServiceCancelBackfillMutationResult = Awaited<
ReturnType<typeof BackfillService.cancelBackfill>
>;
export type DagParsingServiceReparseDagFileMutationResult = Awaited<
ReturnType<typeof DagParsingService.reparseDagFile>
>;
export type ConnectionServicePatchConnectionMutationResult = Awaited<
ReturnType<typeof ConnectionService.patchConnection>
>;
Expand Down
40 changes: 40 additions & 0 deletions airflow/ui/openapi-gen/queries/queries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
BackfillService,
ConfigService,
ConnectionService,
DagParsingService,
DagRunService,
DagService,
DagSourceService,
Expand Down Expand Up @@ -3175,6 +3176,45 @@ export const useBackfillServiceCancelBackfill = <
}) as unknown as Promise<TData>,
...options,
});
/**
* Reparse Dag File
* Request re-parsing a DAG file.
* @param data The data for the request.
* @param data.fileToken
* @returns null Successful Response
* @throws ApiError
*/
export const useDagParsingServiceReparseDagFile = <
TData = Common.DagParsingServiceReparseDagFileMutationResult,
TError = unknown,
TContext = unknown,
>(
options?: Omit<
UseMutationOptions<
TData,
TError,
{
fileToken: string;
},
TContext
>,
"mutationFn"
>,
) =>
useMutation<
TData,
TError,
{
fileToken: string;
},
TContext
>({
mutationFn: ({ fileToken }) =>
DagParsingService.reparseDagFile({
fileToken,
}) as unknown as Promise<TData>,
...options,
});
/**
* Patch Connection
* Update a connection entry.
Expand Down
30 changes: 30 additions & 0 deletions airflow/ui/openapi-gen/requests/services.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,8 @@ import type {
GetVariablesResponse,
PostVariableData,
PostVariableResponse,
ReparseDagFileData,
ReparseDagFileResponse,
GetHealthResponse,
GetVersionResponse,
} from "./types.gen";
Expand Down Expand Up @@ -2964,6 +2966,34 @@ export class VariableService {
}
}

export class DagParsingService {
/**
* Reparse Dag File
* Request re-parsing a DAG file.
* @param data The data for the request.
* @param data.fileToken
* @returns null Successful Response
* @throws ApiError
*/
public static reparseDagFile(
data: ReparseDagFileData,
): CancelablePromise<ReparseDagFileResponse> {
return __request(OpenAPI, {
method: "PUT",
url: "/public/parseDagFile/{file_token}",
path: {
file_token: data.fileToken,
},
errors: {
401: "Unauthorized",
403: "Forbidden",
404: "Not Found",
422: "Validation Error",
},
});
}
}

export class MonitorService {
/**
* Get Health
Expand Down
33 changes: 33 additions & 0 deletions airflow/ui/openapi-gen/requests/types.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2034,6 +2034,12 @@ export type PostVariableData = {

export type PostVariableResponse = VariableResponse;

export type ReparseDagFileData = {
fileToken: string;
};

export type ReparseDagFileResponse = null;

export type GetHealthResponse = HealthInfoSchema;

export type GetVersionResponse = VersionInfo;
Expand Down Expand Up @@ -4315,6 +4321,33 @@ export type $OpenApiTs = {
};
};
};
"/public/parseDagFile/{file_token}": {
put: {
req: ReparseDagFileData;
res: {
/**
* Successful Response
*/
201: null;
/**
* Unauthorized
*/
401: HTTPExceptionResponse;
/**
* Forbidden
*/
403: HTTPExceptionResponse;
/**
* Not Found
*/
404: HTTPExceptionResponse;
/**
* Validation Error
*/
422: HTTPValidationError;
};
};
};
"/public/monitor/health": {
get: {
res: {
Expand Down
80 changes: 80 additions & 0 deletions tests/api_fastapi/core_api/routes/public/test_dag_parsing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import os
from typing import TYPE_CHECKING

import pytest
from sqlalchemy import select

from airflow.models import DagBag
from airflow.models.dagbag import DagPriorityParsingRequest
from airflow.utils.session import provide_session

from tests_common.test_utils.db import clear_db_dag_parsing_requests

pytestmark = pytest.mark.db_test

if TYPE_CHECKING:
from airflow.models.dag import DAG


class TestDagParsingEndpoint:
ROOT_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir, os.pardir))
EXAMPLE_DAG_FILE = os.path.join("airflow", "example_dags", "example_bash_operator.py")
EXAMPLE_DAG_ID = "example_bash_operator"
TEST_DAG_ID = "latest_only"
NOT_READABLE_DAG_ID = "latest_only_with_trigger"
TEST_MULTIPLE_DAGS_ID = "asset_produces_1"

@staticmethod
def clear_db():
clear_db_dag_parsing_requests()

@provide_session
@pytest.fixture(autouse=True)
def setup(self, session=None) -> None:
self.clear_db()

def teardown_method(self) -> None:
self.clear_db()

def test_201_and_400_requests(self, url_safe_serializer, session, test_client):
dagbag = DagBag(dag_folder=self.EXAMPLE_DAG_FILE)
dagbag.sync_to_db()
test_dag: DAG = dagbag.dags[self.TEST_DAG_ID]

url = f"/public/parseDagFile/{url_safe_serializer.dumps(test_dag.fileloc)}"
response = test_client.put(url, headers={"Accept": "application/json"})
assert response.status_code == 201
parsing_requests = session.scalars(select(DagPriorityParsingRequest)).all()
assert parsing_requests[0].fileloc == test_dag.fileloc

# Duplicate file parsing request
response = test_client.put(url, headers={"Accept": "application/json"})
assert response.status_code == 409
parsing_requests = session.scalars(select(DagPriorityParsingRequest)).all()
assert parsing_requests[0].fileloc == test_dag.fileloc

def test_bad_file_request(self, url_safe_serializer, session, test_client):
url = f"/public/parseDagFile/{url_safe_serializer.dumps('/some/random/file.py')}"
response = test_client.put(url, headers={"Accept": "application/json"})
assert response.status_code == 404

parsing_requests = session.scalars(select(DagPriorityParsingRequest)).all()
assert parsing_requests == []