From dc9c1db63b992018b005d1a1d63b76495faeff90 Mon Sep 17 00:00:00 2001 From: vatsrahul1001 Date: Mon, 18 Nov 2024 18:52:07 +0530 Subject: [PATCH 1/4] AIP84-get-assets-queued-events endpoint migration to fastapi --- .../api_connexion/endpoints/asset_endpoint.py | 1 + .../core_api/openapi/v1-generated.yaml | 53 +++++++++++++++++++ .../core_api/routes/public/assets.py | 44 +++++++++++++++ airflow/ui/openapi-gen/queries/common.ts | 22 ++++++++ airflow/ui/openapi-gen/queries/prefetch.ts | 23 ++++++++ airflow/ui/openapi-gen/queries/queries.ts | 32 +++++++++++ airflow/ui/openapi-gen/queries/suspense.ts | 32 +++++++++++ .../ui/openapi-gen/requests/services.gen.ts | 32 +++++++++++ airflow/ui/openapi-gen/requests/types.gen.ts | 34 ++++++++++++ .../core_api/routes/public/test_assets.py | 36 +++++++++++++ 10 files changed, 309 insertions(+) diff --git a/airflow/api_connexion/endpoints/asset_endpoint.py b/airflow/api_connexion/endpoints/asset_endpoint.py index ff47db8838799..6837b1eb446e2 100644 --- a/airflow/api_connexion/endpoints/asset_endpoint.py +++ b/airflow/api_connexion/endpoints/asset_endpoint.py @@ -271,6 +271,7 @@ def delete_dag_asset_queued_events( ) +@mark_fastapi_migration_done @security.requires_access_asset("GET") @provide_session def get_asset_queued_events( diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index 8678b21c9b0d3..e70bf1432bbc6 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -487,6 +487,59 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' + /public/assets/queuedEvent/{uri}: + get: + tags: + - Asset + summary: Get Asset Queued Events + description: Get queued asset events for an asset. + operationId: get_asset_queued_events + parameters: + - name: uri + in: path + required: true + schema: + type: string + title: Uri + - name: before + in: query + required: false + schema: + anyOf: + - type: string + - type: 'null' + title: Before + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/QueuedEventCollectionResponse' + '401': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Unauthorized + '403': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Forbidden + '404': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Not Found + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' /public/backfills/: get: tags: diff --git a/airflow/api_fastapi/core_api/routes/public/assets.py b/airflow/api_fastapi/core_api/routes/public/assets.py index 37fbb6e58783f..c2dd69347eeba 100644 --- a/airflow/api_fastapi/core_api/routes/public/assets.py +++ b/airflow/api_fastapi/core_api/routes/public/assets.py @@ -252,3 +252,47 @@ def get_dag_asset_queued_events( ], total_entries=total_entries, ) + + +@assets_router.get( + "/assets/queuedEvent/{uri:path}", + responses=create_openapi_http_exception_doc( + [ + status.HTTP_404_NOT_FOUND, + ] + ), +) +def get_asset_queued_events( + uri: str, + session: Annotated[Session, Depends(get_session)], + before: OptionalDateTimeQuery = None, +) -> QueuedEventCollectionResponse: + """Get queued asset events for an asset.""" + where_clause = _generate_queued_event_where_clause(uri=uri, before=before) + query = ( + select(AssetDagRunQueue, AssetModel.uri) + .join(AssetModel, AssetDagRunQueue.asset_id == AssetModel.id) + .where(*where_clause) + ) + + dag_asset_queued_events_select, total_entries = paginated_select( + query, + [], + ) + adrqs = session.execute(dag_asset_queued_events_select).all() + + if not adrqs: + raise HTTPException(status.HTTP_404_NOT_FOUND, f"Queue event with uri: `{uri}` was not found") + + queued_events = [ + QueuedEventResponse(created_at=adrq.created_at, dag_id=adrq.target_dag_id, uri=uri) + for adrq, uri in adrqs + ] + + return QueuedEventCollectionResponse( + queued_events=[ + QueuedEventResponse.model_validate(queued_event, from_attributes=True) + for queued_event in queued_events + ], + total_entries=total_entries, + ) diff --git a/airflow/ui/openapi-gen/queries/common.ts b/airflow/ui/openapi-gen/queries/common.ts index 7b23e33f0ab4a..305f1c42fb26a 100644 --- a/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow/ui/openapi-gen/queries/common.ts @@ -151,6 +151,28 @@ export const UseAssetServiceGetDagAssetQueuedEventsKeyFn = ( useAssetServiceGetDagAssetQueuedEventsKey, ...(queryKey ?? [{ before, dagId }]), ]; +export type AssetServiceGetAssetQueuedEventsDefaultResponse = Awaited< + ReturnType +>; +export type AssetServiceGetAssetQueuedEventsQueryResult< + TData = AssetServiceGetAssetQueuedEventsDefaultResponse, + TError = unknown, +> = UseQueryResult; +export const useAssetServiceGetAssetQueuedEventsKey = + "AssetServiceGetAssetQueuedEvents"; +export const UseAssetServiceGetAssetQueuedEventsKeyFn = ( + { + before, + uri, + }: { + before?: string; + uri: string; + }, + queryKey?: Array, +) => [ + useAssetServiceGetAssetQueuedEventsKey, + ...(queryKey ?? [{ before, uri }]), +]; export type DashboardServiceHistoricalMetricsDefaultResponse = Awaited< ReturnType >; diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow/ui/openapi-gen/queries/prefetch.ts index 0c522f36e4330..736e539f87088 100644 --- a/airflow/ui/openapi-gen/queries/prefetch.ts +++ b/airflow/ui/openapi-gen/queries/prefetch.ts @@ -191,6 +191,29 @@ export const prefetchUseAssetServiceGetDagAssetQueuedEvents = ( }), queryFn: () => AssetService.getDagAssetQueuedEvents({ before, dagId }), }); +/** + * Get Asset Queued Events + * Get queued asset events for an asset. + * @param data The data for the request. + * @param data.uri + * @param data.before + * @returns QueuedEventCollectionResponse Successful Response + * @throws ApiError + */ +export const prefetchUseAssetServiceGetAssetQueuedEvents = ( + queryClient: QueryClient, + { + before, + uri, + }: { + before?: string; + uri: string; + }, +) => + queryClient.prefetchQuery({ + queryKey: Common.UseAssetServiceGetAssetQueuedEventsKeyFn({ before, uri }), + queryFn: () => AssetService.getAssetQueuedEvents({ before, uri }), + }); /** * Historical Metrics * Return cluster activity historical metrics. diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts index 8ec0ea9234aca..b5f65fc42fbd0 100644 --- a/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow/ui/openapi-gen/queries/queries.ts @@ -246,6 +246,38 @@ export const useAssetServiceGetDagAssetQueuedEvents = < AssetService.getDagAssetQueuedEvents({ before, dagId }) as TData, ...options, }); +/** + * Get Asset Queued Events + * Get queued asset events for an asset. + * @param data The data for the request. + * @param data.uri + * @param data.before + * @returns QueuedEventCollectionResponse Successful Response + * @throws ApiError + */ +export const useAssetServiceGetAssetQueuedEvents = < + TData = Common.AssetServiceGetAssetQueuedEventsDefaultResponse, + TError = unknown, + TQueryKey extends Array = unknown[], +>( + { + before, + uri, + }: { + before?: string; + uri: string; + }, + queryKey?: TQueryKey, + options?: Omit, "queryKey" | "queryFn">, +) => + useQuery({ + queryKey: Common.UseAssetServiceGetAssetQueuedEventsKeyFn( + { before, uri }, + queryKey, + ), + queryFn: () => AssetService.getAssetQueuedEvents({ before, uri }) as TData, + ...options, + }); /** * Historical Metrics * Return cluster activity historical metrics. diff --git a/airflow/ui/openapi-gen/queries/suspense.ts b/airflow/ui/openapi-gen/queries/suspense.ts index 1b81422281535..15bf4e144003d 100644 --- a/airflow/ui/openapi-gen/queries/suspense.ts +++ b/airflow/ui/openapi-gen/queries/suspense.ts @@ -228,6 +228,38 @@ export const useAssetServiceGetDagAssetQueuedEventsSuspense = < AssetService.getDagAssetQueuedEvents({ before, dagId }) as TData, ...options, }); +/** + * Get Asset Queued Events + * Get queued asset events for an asset. + * @param data The data for the request. + * @param data.uri + * @param data.before + * @returns QueuedEventCollectionResponse Successful Response + * @throws ApiError + */ +export const useAssetServiceGetAssetQueuedEventsSuspense = < + TData = Common.AssetServiceGetAssetQueuedEventsDefaultResponse, + TError = unknown, + TQueryKey extends Array = unknown[], +>( + { + before, + uri, + }: { + before?: string; + uri: string; + }, + queryKey?: TQueryKey, + options?: Omit, "queryKey" | "queryFn">, +) => + useSuspenseQuery({ + queryKey: Common.UseAssetServiceGetAssetQueuedEventsKeyFn( + { before, uri }, + queryKey, + ), + queryFn: () => AssetService.getAssetQueuedEvents({ before, uri }) as TData, + ...options, + }); /** * Historical Metrics * Return cluster activity historical metrics. diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts index 361cde79fc5df..805995d0510c6 100644 --- a/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow/ui/openapi-gen/requests/services.gen.ts @@ -15,6 +15,8 @@ import type { GetAssetResponse, GetDagAssetQueuedEventsData, GetDagAssetQueuedEventsResponse, + GetAssetQueuedEventsData, + GetAssetQueuedEventsResponse, HistoricalMetricsData, HistoricalMetricsResponse, RecentDagRunsData, @@ -306,6 +308,36 @@ export class AssetService { }, }); } + + /** + * Get Asset Queued Events + * Get queued asset events for an asset. + * @param data The data for the request. + * @param data.uri + * @param data.before + * @returns QueuedEventCollectionResponse Successful Response + * @throws ApiError + */ + public static getAssetQueuedEvents( + data: GetAssetQueuedEventsData, + ): CancelablePromise { + return __request(OpenAPI, { + method: "GET", + url: "/public/assets/queuedEvent/{uri}", + path: { + uri: data.uri, + }, + query: { + before: data.before, + }, + errors: { + 401: "Unauthorized", + 403: "Forbidden", + 404: "Not Found", + 422: "Validation Error", + }, + }); + } } export class DashboardService { diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index 948b3149c4fbe..5a59f4e0fae7f 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -1069,6 +1069,13 @@ export type GetDagAssetQueuedEventsData = { export type GetDagAssetQueuedEventsResponse = QueuedEventCollectionResponse; +export type GetAssetQueuedEventsData = { + before?: string | null; + uri: string; +}; + +export type GetAssetQueuedEventsResponse = QueuedEventCollectionResponse; + export type HistoricalMetricsData = { endDate: string; startDate: string; @@ -1692,6 +1699,33 @@ export type $OpenApiTs = { }; }; }; + "/public/assets/queuedEvent/{uri}": { + get: { + req: GetAssetQueuedEventsData; + res: { + /** + * Successful Response + */ + 200: QueuedEventCollectionResponse; + /** + * Unauthorized + */ + 401: HTTPExceptionResponse; + /** + * Forbidden + */ + 403: HTTPExceptionResponse; + /** + * Not Found + */ + 404: HTTPExceptionResponse; + /** + * Validation Error + */ + 422: HTTPValidationError; + }; + }; + }; "/ui/dashboard/historical_metrics_data": { get: { req: HistoricalMetricsData; diff --git a/tests/api_fastapi/core_api/routes/public/test_assets.py b/tests/api_fastapi/core_api/routes/public/test_assets.py index 4e127ffd0c7f3..19413934e2629 100644 --- a/tests/api_fastapi/core_api/routes/public/test_assets.py +++ b/tests/api_fastapi/core_api/routes/public/test_assets.py @@ -549,3 +549,39 @@ def test_invalid_attr_not_allowed(self, test_client, session): response = test_client.post("/public/assets/events", json=event_invalid_payload) assert response.status_code == 422 + + +class TestGetAssetQueuedEvents(TestQueuedEventEndpoint): + @pytest.mark.usefixtures("time_freezer") + def test_should_respond_200(self, test_client, session, create_dummy_dag): + dag, _ = create_dummy_dag() + dag_id = dag.dag_id + self.create_assets(session=session, num=1) + uri = "s3://bucket/key/1" + asset_id = 1 + self._create_asset_dag_run_queues(dag_id, asset_id, session) + + response = test_client.get( + f"/public/assets/queuedEvent/{uri}", + ) + assert response.status_code == 200 + assert response.json() == { + "queued_events": [ + { + "created_at": self.default_time.replace("+00:00", "Z"), + "uri": "s3://bucket/key/1", + "dag_id": "dag", + } + ], + "total_entries": 1, + } + + def test_should_respond_404(self, test_client): + uri = "not_exists" + + response = test_client.get( + f"/public/assets/queuedEvent/{uri}", + ) + + assert response.status_code == 404 + assert response.json()["detail"] == "Queue event with uri: `not_exists` was not found" From 3148957d4817d45e04ebf2db71ebe67fb60e571a Mon Sep 17 00:00:00 2001 From: vatsrahul1001 <43964496+vatsrahul1001@users.noreply.github.com> Date: Mon, 18 Nov 2024 20:30:37 +0530 Subject: [PATCH 2/4] Apply suggestions from code review Co-authored-by: Pierre Jeambrun --- airflow/api_fastapi/core_api/routes/public/assets.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/airflow/api_fastapi/core_api/routes/public/assets.py b/airflow/api_fastapi/core_api/routes/public/assets.py index c2dd69347eeba..09c89d5a7af82 100644 --- a/airflow/api_fastapi/core_api/routes/public/assets.py +++ b/airflow/api_fastapi/core_api/routes/public/assets.py @@ -255,7 +255,7 @@ def get_dag_asset_queued_events( @assets_router.get( - "/assets/queuedEvent/{uri:path}", + "/assets/queuedEvents/{uri:path}", responses=create_openapi_http_exception_doc( [ status.HTTP_404_NOT_FOUND, @@ -290,9 +290,6 @@ def get_asset_queued_events( ] return QueuedEventCollectionResponse( - queued_events=[ - QueuedEventResponse.model_validate(queued_event, from_attributes=True) - for queued_event in queued_events - ], + queued_events=queued_events, total_entries=total_entries, ) From 4ea66ed2d70760f00adc80e80321c81f7b710e03 Mon Sep 17 00:00:00 2001 From: vatsrahul1001 Date: Mon, 18 Nov 2024 22:40:28 +0530 Subject: [PATCH 3/4] updating typo in endpoint name --- airflow/api_fastapi/core_api/routes/public/assets.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/api_fastapi/core_api/routes/public/assets.py b/airflow/api_fastapi/core_api/routes/public/assets.py index cc563a0faeb54..181fe86e70141 100644 --- a/airflow/api_fastapi/core_api/routes/public/assets.py +++ b/airflow/api_fastapi/core_api/routes/public/assets.py @@ -279,7 +279,7 @@ def delete_dag_asset_queued_events( @assets_router.get( - "/assets/queuedEvents/{uri:path}", + "/assets/queuedEvent/{uri:path}", responses=create_openapi_http_exception_doc( [ status.HTTP_404_NOT_FOUND, From ce4cb7db87b9f4a468d9591e509f3a799fe759ab Mon Sep 17 00:00:00 2001 From: vatsrahul1001 Date: Tue, 19 Nov 2024 15:58:58 +0530 Subject: [PATCH 4/4] address review comments --- .../core_api/openapi/v1-generated.yaml | 144 +++++++++--------- .../core_api/routes/public/assets.py | 83 +++++----- airflow/ui/openapi-gen/queries/common.ts | 50 +++--- airflow/ui/openapi-gen/queries/prefetch.ts | 46 +++--- airflow/ui/openapi-gen/queries/queries.ts | 114 +++++++------- airflow/ui/openapi-gen/queries/suspense.ts | 64 ++++---- .../ui/openapi-gen/requests/services.gen.ts | 128 ++++++++-------- airflow/ui/openapi-gen/requests/types.gen.ts | 80 +++++----- 8 files changed, 355 insertions(+), 354 deletions(-) diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index ca5126a526fdc..8b0a0e040dcb3 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -395,13 +395,13 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' - /public/assets/{uri}: + /public/assets/queuedEvent/{uri}: get: tags: - Asset - summary: Get Asset - description: Get an asset. - operationId: get_asset + summary: Get Asset Queued Events + description: Get queued asset events for an asset. + operationId: get_asset_queued_events parameters: - name: uri in: path @@ -409,13 +409,21 @@ paths: schema: type: string title: Uri + - name: before + in: query + required: false + schema: + anyOf: + - type: string + - type: 'null' + title: Before responses: '200': description: Successful Response content: application/json: schema: - $ref: '#/components/schemas/AssetResponse' + $ref: '#/components/schemas/QueuedEventCollectionResponse' '401': content: application/json: @@ -440,20 +448,19 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' - /public/dags/{dag_id}/assets/queuedEvent: - get: + delete: tags: - Asset - summary: Get Dag Asset Queued Events - description: Get queued asset events for a DAG. - operationId: get_dag_asset_queued_events + summary: Delete Asset Queued Events + description: Delete queued asset events for an asset. + operationId: delete_asset_queued_events parameters: - - name: dag_id + - name: uri in: path required: true schema: type: string - title: Dag Id + title: Uri - name: before in: query required: false @@ -463,12 +470,8 @@ paths: - type: 'null' title: Before responses: - '200': + '204': description: Successful Response - content: - application/json: - schema: - $ref: '#/components/schemas/QueuedEventCollectionResponse' '401': content: application/json: @@ -493,29 +496,27 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' - delete: + /public/assets/{uri}: + get: tags: - Asset - summary: Delete Dag Asset Queued Events - operationId: delete_dag_asset_queued_events + summary: Get Asset + description: Get an asset. + operationId: get_asset parameters: - - name: dag_id + - name: uri in: path required: true schema: type: string - title: Dag Id - - name: before - in: query - required: false - schema: - anyOf: - - type: string - - type: 'null' - title: Before + title: Uri responses: - '204': + '200': description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/AssetResponse' '401': content: application/json: @@ -528,12 +529,6 @@ paths: schema: $ref: '#/components/schemas/HTTPExceptionResponse' description: Forbidden - '400': - content: - application/json: - schema: - $ref: '#/components/schemas/HTTPExceptionResponse' - description: Bad Request '404': content: application/json: @@ -546,13 +541,13 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' - /public/dags/{dag_id}/assets/queuedEvent/{uri}: + /public/dags/{dag_id}/assets/queuedEvent: get: tags: - Asset - summary: Get Dag Asset Queued Event - description: Get a queued asset event for a DAG. - operationId: get_dag_asset_queued_event + summary: Get Dag Asset Queued Events + description: Get queued asset events for a DAG. + operationId: get_dag_asset_queued_events parameters: - name: dag_id in: path @@ -560,12 +555,6 @@ paths: schema: type: string title: Dag Id - - name: uri - in: path - required: true - schema: - type: string - title: Uri - name: before in: query required: false @@ -580,7 +569,7 @@ paths: content: application/json: schema: - $ref: '#/components/schemas/QueuedEventResponse' + $ref: '#/components/schemas/QueuedEventCollectionResponse' '401': content: application/json: @@ -608,9 +597,8 @@ paths: delete: tags: - Asset - summary: Delete Dag Asset Queued Event - description: Delete a queued asset event for a DAG. - operationId: delete_dag_asset_queued_event + summary: Delete Dag Asset Queued Events + operationId: delete_dag_asset_queued_events parameters: - name: dag_id in: path @@ -618,12 +606,6 @@ paths: schema: type: string title: Dag Id - - name: uri - in: path - required: true - schema: - type: string - title: Uri - name: before in: query required: false @@ -665,14 +647,20 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' - /public/assets/queuedEvent/{uri}: - delete: + /public/dags/{dag_id}/assets/queuedEvent/{uri}: + get: tags: - Asset - summary: Delete Asset Queued Events - description: Delete queued asset events for an asset. - operationId: delete_asset_queued_events + summary: Get Dag Asset Queued Event + description: Get a queued asset event for a DAG. + operationId: get_dag_asset_queued_event parameters: + - name: dag_id + in: path + required: true + schema: + type: string + title: Dag Id - name: uri in: path required: true @@ -688,8 +676,12 @@ paths: - type: 'null' title: Before responses: - '204': + '200': description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/QueuedEventResponse' '401': content: application/json: @@ -714,13 +706,19 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' - get: + delete: tags: - Asset - summary: Get Asset Queued Events - description: Get queued asset events for an asset. - operationId: get_asset_queued_events + summary: Delete Dag Asset Queued Event + description: Delete a queued asset event for a DAG. + operationId: delete_dag_asset_queued_event parameters: + - name: dag_id + in: path + required: true + schema: + type: string + title: Dag Id - name: uri in: path required: true @@ -736,12 +734,8 @@ paths: - type: 'null' title: Before responses: - '200': + '204': description: Successful Response - content: - application/json: - schema: - $ref: '#/components/schemas/QueuedEventCollectionResponse' '401': content: application/json: @@ -754,6 +748,12 @@ paths: schema: $ref: '#/components/schemas/HTTPExceptionResponse' description: Forbidden + '400': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Bad Request '404': content: application/json: diff --git a/airflow/api_fastapi/core_api/routes/public/assets.py b/airflow/api_fastapi/core_api/routes/public/assets.py index 9e67cfee3cd3b..8388c5fcf7bfe 100644 --- a/airflow/api_fastapi/core_api/routes/public/assets.py +++ b/airflow/api_fastapi/core_api/routes/public/assets.py @@ -189,6 +189,48 @@ def create_asset_event( return AssetEventResponse.model_validate(assets_event, from_attributes=True) +@assets_router.get( + "/assets/queuedEvent/{uri:path}", + responses=create_openapi_http_exception_doc( + [ + status.HTTP_404_NOT_FOUND, + ] + ), +) +def get_asset_queued_events( + uri: str, + session: Annotated[Session, Depends(get_session)], + before: OptionalDateTimeQuery = None, +) -> QueuedEventCollectionResponse: + """Get queued asset events for an asset.""" + print(f"uri: {uri}") + where_clause = _generate_queued_event_where_clause(uri=uri, before=before) + query = ( + select(AssetDagRunQueue, AssetModel.uri) + .join(AssetModel, AssetDagRunQueue.asset_id == AssetModel.id) + .where(*where_clause) + ) + + dag_asset_queued_events_select, total_entries = paginated_select( + query, + [], + ) + adrqs = session.execute(dag_asset_queued_events_select).all() + + if not adrqs: + raise HTTPException(status.HTTP_404_NOT_FOUND, f"Queue event with uri: `{uri}` was not found") + + queued_events = [ + QueuedEventResponse(created_at=adrq.created_at, dag_id=adrq.target_dag_id, uri=uri) + for adrq, uri in adrqs + ] + + return QueuedEventCollectionResponse( + queued_events=queued_events, + total_entries=total_entries, + ) + + @assets_router.get( "/assets/{uri:path}", responses=create_openapi_http_exception_doc([401, 403, 404]), @@ -358,44 +400,3 @@ def delete_dag_asset_queued_event( status.HTTP_404_NOT_FOUND, detail=f"Queued event with dag_id: `{dag_id}` and asset uri: `{uri}` was not found", ) - - -@assets_router.get( - "/assets/queuedEvent/{uri:path}", - responses=create_openapi_http_exception_doc( - [ - status.HTTP_404_NOT_FOUND, - ] - ), -) -def get_asset_queued_events( - uri: str, - session: Annotated[Session, Depends(get_session)], - before: OptionalDateTimeQuery = None, -) -> QueuedEventCollectionResponse: - """Get queued asset events for an asset.""" - where_clause = _generate_queued_event_where_clause(uri=uri, before=before) - query = ( - select(AssetDagRunQueue, AssetModel.uri) - .join(AssetModel, AssetDagRunQueue.asset_id == AssetModel.id) - .where(*where_clause) - ) - - dag_asset_queued_events_select, total_entries = paginated_select( - query, - [], - ) - adrqs = session.execute(dag_asset_queued_events_select).all() - - if not adrqs: - raise HTTPException(status.HTTP_404_NOT_FOUND, f"Queue event with uri: `{uri}` was not found") - - queued_events = [ - QueuedEventResponse(created_at=adrq.created_at, dag_id=adrq.target_dag_id, uri=uri) - for adrq, uri in adrqs - ] - - return QueuedEventCollectionResponse( - queued_events=queued_events, - total_entries=total_entries, - ) diff --git a/airflow/ui/openapi-gen/queries/common.ts b/airflow/ui/openapi-gen/queries/common.ts index 8b1b692e63d42..803f13fbe4e6d 100644 --- a/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow/ui/openapi-gen/queries/common.ts @@ -113,6 +113,28 @@ export const UseAssetServiceGetAssetEventsKeyFn = ( }, ]), ]; +export type AssetServiceGetAssetQueuedEventsDefaultResponse = Awaited< + ReturnType +>; +export type AssetServiceGetAssetQueuedEventsQueryResult< + TData = AssetServiceGetAssetQueuedEventsDefaultResponse, + TError = unknown, +> = UseQueryResult; +export const useAssetServiceGetAssetQueuedEventsKey = + "AssetServiceGetAssetQueuedEvents"; +export const UseAssetServiceGetAssetQueuedEventsKeyFn = ( + { + before, + uri, + }: { + before?: string; + uri: string; + }, + queryKey?: Array, +) => [ + useAssetServiceGetAssetQueuedEventsKey, + ...(queryKey ?? [{ before, uri }]), +]; export type AssetServiceGetAssetDefaultResponse = Awaited< ReturnType >; @@ -175,28 +197,6 @@ export const UseAssetServiceGetDagAssetQueuedEventKeyFn = ( useAssetServiceGetDagAssetQueuedEventKey, ...(queryKey ?? [{ before, dagId, uri }]), ]; -export type AssetServiceGetAssetQueuedEventsDefaultResponse = Awaited< - ReturnType ->; -export type AssetServiceGetAssetQueuedEventsQueryResult< - TData = AssetServiceGetAssetQueuedEventsDefaultResponse, - TError = unknown, -> = UseQueryResult; -export const useAssetServiceGetAssetQueuedEventsKey = - "AssetServiceGetAssetQueuedEvents"; -export const UseAssetServiceGetAssetQueuedEventsKeyFn = ( - { - before, - uri, - }: { - before?: string; - uri: string; - }, - queryKey?: Array, -) => [ - useAssetServiceGetAssetQueuedEventsKey, - ...(queryKey ?? [{ before, uri }]), -]; export type DashboardServiceHistoricalMetricsDefaultResponse = Awaited< ReturnType >; @@ -1204,15 +1204,15 @@ export type PoolServicePatchPoolMutationResult = Awaited< export type VariableServicePatchVariableMutationResult = Awaited< ReturnType >; +export type AssetServiceDeleteAssetQueuedEventsMutationResult = Awaited< + ReturnType +>; export type AssetServiceDeleteDagAssetQueuedEventsMutationResult = Awaited< ReturnType >; export type AssetServiceDeleteDagAssetQueuedEventMutationResult = Awaited< ReturnType >; -export type AssetServiceDeleteAssetQueuedEventsMutationResult = Awaited< - ReturnType ->; export type ConnectionServiceDeleteConnectionMutationResult = Awaited< ReturnType >; diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow/ui/openapi-gen/queries/prefetch.ts index 76aacbca24780..8ebc69e02cd9c 100644 --- a/airflow/ui/openapi-gen/queries/prefetch.ts +++ b/airflow/ui/openapi-gen/queries/prefetch.ts @@ -145,6 +145,29 @@ export const prefetchUseAssetServiceGetAssetEvents = ( sourceTaskId, }), }); +/** + * Get Asset Queued Events + * Get queued asset events for an asset. + * @param data The data for the request. + * @param data.uri + * @param data.before + * @returns QueuedEventCollectionResponse Successful Response + * @throws ApiError + */ +export const prefetchUseAssetServiceGetAssetQueuedEvents = ( + queryClient: QueryClient, + { + before, + uri, + }: { + before?: string; + uri: string; + }, +) => + queryClient.prefetchQuery({ + queryKey: Common.UseAssetServiceGetAssetQueuedEventsKeyFn({ before, uri }), + queryFn: () => AssetService.getAssetQueuedEvents({ before, uri }), + }); /** * Get Asset * Get an asset. @@ -221,29 +244,6 @@ export const prefetchUseAssetServiceGetDagAssetQueuedEvent = ( }), queryFn: () => AssetService.getDagAssetQueuedEvent({ before, dagId, uri }), }); -/** - * Get Asset Queued Events - * Get queued asset events for an asset. - * @param data The data for the request. - * @param data.uri - * @param data.before - * @returns QueuedEventCollectionResponse Successful Response - * @throws ApiError - */ -export const prefetchUseAssetServiceGetAssetQueuedEvents = ( - queryClient: QueryClient, - { - before, - uri, - }: { - before?: string; - uri: string; - }, -) => - queryClient.prefetchQuery({ - queryKey: Common.UseAssetServiceGetAssetQueuedEventsKeyFn({ before, uri }), - queryFn: () => AssetService.getAssetQueuedEvents({ before, uri }), - }); /** * Historical Metrics * Return cluster activity historical metrics. diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts index 55311f2a262af..05268e7b54bed 100644 --- a/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow/ui/openapi-gen/queries/queries.ts @@ -187,6 +187,38 @@ export const useAssetServiceGetAssetEvents = < }) as TData, ...options, }); +/** + * Get Asset Queued Events + * Get queued asset events for an asset. + * @param data The data for the request. + * @param data.uri + * @param data.before + * @returns QueuedEventCollectionResponse Successful Response + * @throws ApiError + */ +export const useAssetServiceGetAssetQueuedEvents = < + TData = Common.AssetServiceGetAssetQueuedEventsDefaultResponse, + TError = unknown, + TQueryKey extends Array = unknown[], +>( + { + before, + uri, + }: { + before?: string; + uri: string; + }, + queryKey?: TQueryKey, + options?: Omit, "queryKey" | "queryFn">, +) => + useQuery({ + queryKey: Common.UseAssetServiceGetAssetQueuedEventsKeyFn( + { before, uri }, + queryKey, + ), + queryFn: () => AssetService.getAssetQueuedEvents({ before, uri }) as TData, + ...options, + }); /** * Get Asset * Get an asset. @@ -282,38 +314,6 @@ export const useAssetServiceGetDagAssetQueuedEvent = < AssetService.getDagAssetQueuedEvent({ before, dagId, uri }) as TData, ...options, }); -/** - * Get Asset Queued Events - * Get queued asset events for an asset. - * @param data The data for the request. - * @param data.uri - * @param data.before - * @returns QueuedEventCollectionResponse Successful Response - * @throws ApiError - */ -export const useAssetServiceGetAssetQueuedEvents = < - TData = Common.AssetServiceGetAssetQueuedEventsDefaultResponse, - TError = unknown, - TQueryKey extends Array = unknown[], ->( - { - before, - uri, - }: { - before?: string; - uri: string; - }, - queryKey?: TQueryKey, - options?: Omit, "queryKey" | "queryFn">, -) => - useQuery({ - queryKey: Common.UseAssetServiceGetAssetQueuedEventsKeyFn( - { before, uri }, - queryKey, - ), - queryFn: () => AssetService.getAssetQueuedEvents({ before, uri }) as TData, - ...options, - }); /** * Historical Metrics * Return cluster activity historical metrics. @@ -2620,15 +2620,16 @@ export const useVariableServicePatchVariable = < ...options, }); /** - * Delete Dag Asset Queued Events + * Delete Asset Queued Events + * Delete queued asset events for an asset. * @param data The data for the request. - * @param data.dagId + * @param data.uri * @param data.before * @returns void Successful Response * @throws ApiError */ -export const useAssetServiceDeleteDagAssetQueuedEvents = < - TData = Common.AssetServiceDeleteDagAssetQueuedEventsMutationResult, +export const useAssetServiceDeleteAssetQueuedEvents = < + TData = Common.AssetServiceDeleteAssetQueuedEventsMutationResult, TError = unknown, TContext = unknown, >( @@ -2638,7 +2639,7 @@ export const useAssetServiceDeleteDagAssetQueuedEvents = < TError, { before?: string; - dagId: string; + uri: string; }, TContext >, @@ -2650,29 +2651,27 @@ export const useAssetServiceDeleteDagAssetQueuedEvents = < TError, { before?: string; - dagId: string; + uri: string; }, TContext >({ - mutationFn: ({ before, dagId }) => - AssetService.deleteDagAssetQueuedEvents({ + mutationFn: ({ before, uri }) => + AssetService.deleteAssetQueuedEvents({ before, - dagId, + uri, }) as unknown as Promise, ...options, }); /** - * Delete Dag Asset Queued Event - * Delete a queued asset event for a DAG. + * Delete Dag Asset Queued Events * @param data The data for the request. * @param data.dagId - * @param data.uri * @param data.before * @returns void Successful Response * @throws ApiError */ -export const useAssetServiceDeleteDagAssetQueuedEvent = < - TData = Common.AssetServiceDeleteDagAssetQueuedEventMutationResult, +export const useAssetServiceDeleteDagAssetQueuedEvents = < + TData = Common.AssetServiceDeleteDagAssetQueuedEventsMutationResult, TError = unknown, TContext = unknown, >( @@ -2683,7 +2682,6 @@ export const useAssetServiceDeleteDagAssetQueuedEvent = < { before?: string; dagId: string; - uri: string; }, TContext >, @@ -2696,29 +2694,28 @@ export const useAssetServiceDeleteDagAssetQueuedEvent = < { before?: string; dagId: string; - uri: string; }, TContext >({ - mutationFn: ({ before, dagId, uri }) => - AssetService.deleteDagAssetQueuedEvent({ + mutationFn: ({ before, dagId }) => + AssetService.deleteDagAssetQueuedEvents({ before, dagId, - uri, }) as unknown as Promise, ...options, }); /** - * Delete Asset Queued Events - * Delete queued asset events for an asset. + * Delete Dag Asset Queued Event + * Delete a queued asset event for a DAG. * @param data The data for the request. + * @param data.dagId * @param data.uri * @param data.before * @returns void Successful Response * @throws ApiError */ -export const useAssetServiceDeleteAssetQueuedEvents = < - TData = Common.AssetServiceDeleteAssetQueuedEventsMutationResult, +export const useAssetServiceDeleteDagAssetQueuedEvent = < + TData = Common.AssetServiceDeleteDagAssetQueuedEventMutationResult, TError = unknown, TContext = unknown, >( @@ -2728,6 +2725,7 @@ export const useAssetServiceDeleteAssetQueuedEvents = < TError, { before?: string; + dagId: string; uri: string; }, TContext @@ -2740,13 +2738,15 @@ export const useAssetServiceDeleteAssetQueuedEvents = < TError, { before?: string; + dagId: string; uri: string; }, TContext >({ - mutationFn: ({ before, uri }) => - AssetService.deleteAssetQueuedEvents({ + mutationFn: ({ before, dagId, uri }) => + AssetService.deleteDagAssetQueuedEvent({ before, + dagId, uri, }) as unknown as Promise, ...options, diff --git a/airflow/ui/openapi-gen/queries/suspense.ts b/airflow/ui/openapi-gen/queries/suspense.ts index a8e6bda9265c9..7034fd52bc4b0 100644 --- a/airflow/ui/openapi-gen/queries/suspense.ts +++ b/airflow/ui/openapi-gen/queries/suspense.ts @@ -169,6 +169,38 @@ export const useAssetServiceGetAssetEventsSuspense = < }) as TData, ...options, }); +/** + * Get Asset Queued Events + * Get queued asset events for an asset. + * @param data The data for the request. + * @param data.uri + * @param data.before + * @returns QueuedEventCollectionResponse Successful Response + * @throws ApiError + */ +export const useAssetServiceGetAssetQueuedEventsSuspense = < + TData = Common.AssetServiceGetAssetQueuedEventsDefaultResponse, + TError = unknown, + TQueryKey extends Array = unknown[], +>( + { + before, + uri, + }: { + before?: string; + uri: string; + }, + queryKey?: TQueryKey, + options?: Omit, "queryKey" | "queryFn">, +) => + useSuspenseQuery({ + queryKey: Common.UseAssetServiceGetAssetQueuedEventsKeyFn( + { before, uri }, + queryKey, + ), + queryFn: () => AssetService.getAssetQueuedEvents({ before, uri }) as TData, + ...options, + }); /** * Get Asset * Get an asset. @@ -264,38 +296,6 @@ export const useAssetServiceGetDagAssetQueuedEventSuspense = < AssetService.getDagAssetQueuedEvent({ before, dagId, uri }) as TData, ...options, }); -/** - * Get Asset Queued Events - * Get queued asset events for an asset. - * @param data The data for the request. - * @param data.uri - * @param data.before - * @returns QueuedEventCollectionResponse Successful Response - * @throws ApiError - */ -export const useAssetServiceGetAssetQueuedEventsSuspense = < - TData = Common.AssetServiceGetAssetQueuedEventsDefaultResponse, - TError = unknown, - TQueryKey extends Array = unknown[], ->( - { - before, - uri, - }: { - before?: string; - uri: string; - }, - queryKey?: TQueryKey, - options?: Omit, "queryKey" | "queryFn">, -) => - useSuspenseQuery({ - queryKey: Common.UseAssetServiceGetAssetQueuedEventsKeyFn( - { before, uri }, - queryKey, - ), - queryFn: () => AssetService.getAssetQueuedEvents({ before, uri }) as TData, - ...options, - }); /** * Historical Metrics * Return cluster activity historical metrics. diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts index 165b7a9ffe3f2..7300afb74c073 100644 --- a/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow/ui/openapi-gen/requests/services.gen.ts @@ -11,6 +11,10 @@ import type { GetAssetEventsResponse, CreateAssetEventData, CreateAssetEventResponse, + GetAssetQueuedEventsData, + GetAssetQueuedEventsResponse, + DeleteAssetQueuedEventsData, + DeleteAssetQueuedEventsResponse, GetAssetData, GetAssetResponse, GetDagAssetQueuedEventsData, @@ -21,10 +25,6 @@ import type { GetDagAssetQueuedEventResponse, DeleteDagAssetQueuedEventData, DeleteDagAssetQueuedEventResponse, - DeleteAssetQueuedEventsData, - DeleteAssetQueuedEventsResponse, - GetAssetQueuedEventsData, - GetAssetQueuedEventsResponse, HistoricalMetricsData, HistoricalMetricsResponse, RecentDagRunsData, @@ -261,6 +261,66 @@ export class AssetService { }); } + /** + * Get Asset Queued Events + * Get queued asset events for an asset. + * @param data The data for the request. + * @param data.uri + * @param data.before + * @returns QueuedEventCollectionResponse Successful Response + * @throws ApiError + */ + public static getAssetQueuedEvents( + data: GetAssetQueuedEventsData, + ): CancelablePromise { + return __request(OpenAPI, { + method: "GET", + url: "/public/assets/queuedEvent/{uri}", + path: { + uri: data.uri, + }, + query: { + before: data.before, + }, + errors: { + 401: "Unauthorized", + 403: "Forbidden", + 404: "Not Found", + 422: "Validation Error", + }, + }); + } + + /** + * Delete Asset Queued Events + * Delete queued asset events for an asset. + * @param data The data for the request. + * @param data.uri + * @param data.before + * @returns void Successful Response + * @throws ApiError + */ + public static deleteAssetQueuedEvents( + data: DeleteAssetQueuedEventsData, + ): CancelablePromise { + return __request(OpenAPI, { + method: "DELETE", + url: "/public/assets/queuedEvent/{uri}", + path: { + uri: data.uri, + }, + query: { + before: data.before, + }, + errors: { + 401: "Unauthorized", + 403: "Forbidden", + 404: "Not Found", + 422: "Validation Error", + }, + }); + } + /** * Get Asset * Get an asset. @@ -411,66 +471,6 @@ export class AssetService { }, }); } - - /** - * Delete Asset Queued Events - * Delete queued asset events for an asset. - * @param data The data for the request. - * @param data.uri - * @param data.before - * @returns void Successful Response - * @throws ApiError - */ - public static deleteAssetQueuedEvents( - data: DeleteAssetQueuedEventsData, - ): CancelablePromise { - return __request(OpenAPI, { - method: "DELETE", - url: "/public/assets/queuedEvent/{uri}", - path: { - uri: data.uri, - }, - query: { - before: data.before, - }, - errors: { - 401: "Unauthorized", - 403: "Forbidden", - 404: "Not Found", - 422: "Validation Error", - }, - }); - } - - /** - * Get Asset Queued Events - * Get queued asset events for an asset. - * @param data The data for the request. - * @param data.uri - * @param data.before - * @returns QueuedEventCollectionResponse Successful Response - * @throws ApiError - */ - public static getAssetQueuedEvents( - data: GetAssetQueuedEventsData, - ): CancelablePromise { - return __request(OpenAPI, { - method: "GET", - url: "/public/assets/queuedEvent/{uri}", - path: { - uri: data.uri, - }, - query: { - before: data.before, - }, - errors: { - 401: "Unauthorized", - 403: "Forbidden", - 404: "Not Found", - 422: "Validation Error", - }, - }); - } } export class DashboardService { diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index 893d3738244e5..163fc5598093c 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -1056,6 +1056,20 @@ export type CreateAssetEventData = { export type CreateAssetEventResponse = AssetEventResponse; +export type GetAssetQueuedEventsData = { + before?: string | null; + uri: string; +}; + +export type GetAssetQueuedEventsResponse = QueuedEventCollectionResponse; + +export type DeleteAssetQueuedEventsData = { + before?: string | null; + uri: string; +}; + +export type DeleteAssetQueuedEventsResponse = void; + export type GetAssetData = { uri: string; }; @@ -1092,20 +1106,6 @@ export type DeleteDagAssetQueuedEventData = { export type DeleteDagAssetQueuedEventResponse = void; -export type DeleteAssetQueuedEventsData = { - before?: string | null; - uri: string; -}; - -export type DeleteAssetQueuedEventsResponse = void; - -export type GetAssetQueuedEventsData = { - before?: string | null; - uri: string; -}; - -export type GetAssetQueuedEventsResponse = QueuedEventCollectionResponse; - export type HistoricalMetricsData = { endDate: string; startDate: string; @@ -1675,14 +1675,14 @@ export type $OpenApiTs = { }; }; }; - "/public/assets/{uri}": { + "/public/assets/queuedEvent/{uri}": { get: { - req: GetAssetData; + req: GetAssetQueuedEventsData; res: { /** * Successful Response */ - 200: AssetResponse; + 200: QueuedEventCollectionResponse; /** * Unauthorized */ @@ -1701,15 +1701,13 @@ export type $OpenApiTs = { 422: HTTPValidationError; }; }; - }; - "/public/dags/{dag_id}/assets/queuedEvent": { - get: { - req: GetDagAssetQueuedEventsData; + delete: { + req: DeleteAssetQueuedEventsData; res: { /** * Successful Response */ - 200: QueuedEventCollectionResponse; + 204: void; /** * Unauthorized */ @@ -1728,17 +1726,15 @@ export type $OpenApiTs = { 422: HTTPValidationError; }; }; - delete: { - req: DeleteDagAssetQueuedEventsData; + }; + "/public/assets/{uri}": { + get: { + req: GetAssetData; res: { /** * Successful Response */ - 204: void; - /** - * Bad Request - */ - 400: HTTPExceptionResponse; + 200: AssetResponse; /** * Unauthorized */ @@ -1758,14 +1754,14 @@ export type $OpenApiTs = { }; }; }; - "/public/dags/{dag_id}/assets/queuedEvent/{uri}": { + "/public/dags/{dag_id}/assets/queuedEvent": { get: { - req: GetDagAssetQueuedEventData; + req: GetDagAssetQueuedEventsData; res: { /** * Successful Response */ - 200: QueuedEventResponse; + 200: QueuedEventCollectionResponse; /** * Unauthorized */ @@ -1785,7 +1781,7 @@ export type $OpenApiTs = { }; }; delete: { - req: DeleteDagAssetQueuedEventData; + req: DeleteDagAssetQueuedEventsData; res: { /** * Successful Response @@ -1814,14 +1810,14 @@ export type $OpenApiTs = { }; }; }; - "/public/assets/queuedEvent/{uri}": { - delete: { - req: DeleteAssetQueuedEventsData; + "/public/dags/{dag_id}/assets/queuedEvent/{uri}": { + get: { + req: GetDagAssetQueuedEventData; res: { /** * Successful Response */ - 204: void; + 200: QueuedEventResponse; /** * Unauthorized */ @@ -1840,13 +1836,17 @@ export type $OpenApiTs = { 422: HTTPValidationError; }; }; - get: { - req: GetAssetQueuedEventsData; + delete: { + req: DeleteDagAssetQueuedEventData; res: { /** * Successful Response */ - 200: QueuedEventCollectionResponse; + 204: void; + /** + * Bad Request + */ + 400: HTTPExceptionResponse; /** * Unauthorized */