Skip to content

Commit

Permalink
AIP-84: Migrating GET queued asset events for assets to FASTAPI (#44139)
Browse files Browse the repository at this point in the history
* AIP84-get-assets-queued-events endpoint migration to fastapi

* Apply suggestions from code review

Co-authored-by: Pierre Jeambrun <[email protected]>

* updating typo in endpoint name

* address review comments

---------

Co-authored-by: Pierre Jeambrun <[email protected]>
  • Loading branch information
vatsrahul1001 and pierrejeambrun authored Nov 19, 2024
1 parent e8fe1bd commit 39042c8
Show file tree
Hide file tree
Showing 10 changed files with 441 additions and 137 deletions.
1 change: 1 addition & 0 deletions airflow/api_connexion/endpoints/asset_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ def delete_dag_asset_queued_events(
)


@mark_fastapi_migration_done
@security.requires_access_asset("GET")
@provide_session
def get_asset_queued_events(
Expand Down
150 changes: 101 additions & 49 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,107 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/assets/queuedEvent/{uri}:
get:
tags:
- Asset
summary: Get Asset Queued Events
description: Get queued asset events for an asset.
operationId: get_asset_queued_events
parameters:
- name: uri
in: path
required: true
schema:
type: string
title: Uri
- name: before
in: query
required: false
schema:
anyOf:
- type: string
- type: 'null'
title: Before
responses:
'200':
description: Successful Response
content:
application/json:
schema:
$ref: '#/components/schemas/QueuedEventCollectionResponse'
'401':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unauthorized
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Forbidden
'404':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Not Found
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
delete:
tags:
- Asset
summary: Delete Asset Queued Events
description: Delete queued asset events for an asset.
operationId: delete_asset_queued_events
parameters:
- name: uri
in: path
required: true
schema:
type: string
title: Uri
- name: before
in: query
required: false
schema:
anyOf:
- type: string
- type: 'null'
title: Before
responses:
'204':
description: Successful Response
'401':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unauthorized
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Forbidden
'404':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Not Found
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/assets/{uri}:
get:
tags:
Expand Down Expand Up @@ -665,55 +766,6 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/assets/queuedEvent/{uri}:
delete:
tags:
- Asset
summary: Delete Asset Queued Events
description: Delete queued asset events for an asset.
operationId: delete_asset_queued_events
parameters:
- name: uri
in: path
required: true
schema:
type: string
title: Uri
- name: before
in: query
required: false
schema:
anyOf:
- type: string
- type: 'null'
title: Before
responses:
'204':
description: Successful Response
'401':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unauthorized
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Forbidden
'404':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Not Found
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/backfills/:
get:
tags:
Expand Down
42 changes: 42 additions & 0 deletions airflow/api_fastapi/core_api/routes/public/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,48 @@ def create_asset_event(
return AssetEventResponse.model_validate(assets_event, from_attributes=True)


@assets_router.get(
"/assets/queuedEvent/{uri:path}",
responses=create_openapi_http_exception_doc(
[
status.HTTP_404_NOT_FOUND,
]
),
)
def get_asset_queued_events(
uri: str,
session: Annotated[Session, Depends(get_session)],
before: OptionalDateTimeQuery = None,
) -> QueuedEventCollectionResponse:
"""Get queued asset events for an asset."""
print(f"uri: {uri}")
where_clause = _generate_queued_event_where_clause(uri=uri, before=before)
query = (
select(AssetDagRunQueue, AssetModel.uri)
.join(AssetModel, AssetDagRunQueue.asset_id == AssetModel.id)
.where(*where_clause)
)

dag_asset_queued_events_select, total_entries = paginated_select(
query,
[],
)
adrqs = session.execute(dag_asset_queued_events_select).all()

if not adrqs:
raise HTTPException(status.HTTP_404_NOT_FOUND, f"Queue event with uri: `{uri}` was not found")

queued_events = [
QueuedEventResponse(created_at=adrq.created_at, dag_id=adrq.target_dag_id, uri=uri)
for adrq, uri in adrqs
]

return QueuedEventCollectionResponse(
queued_events=queued_events,
total_entries=total_entries,
)


@assets_router.get(
"/assets/{uri:path}",
responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
Expand Down
28 changes: 25 additions & 3 deletions airflow/ui/openapi-gen/queries/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,28 @@ export const UseAssetServiceGetAssetEventsKeyFn = (
},
]),
];
export type AssetServiceGetAssetQueuedEventsDefaultResponse = Awaited<
ReturnType<typeof AssetService.getAssetQueuedEvents>
>;
export type AssetServiceGetAssetQueuedEventsQueryResult<
TData = AssetServiceGetAssetQueuedEventsDefaultResponse,
TError = unknown,
> = UseQueryResult<TData, TError>;
export const useAssetServiceGetAssetQueuedEventsKey =
"AssetServiceGetAssetQueuedEvents";
export const UseAssetServiceGetAssetQueuedEventsKeyFn = (
{
before,
uri,
}: {
before?: string;
uri: string;
},
queryKey?: Array<unknown>,
) => [
useAssetServiceGetAssetQueuedEventsKey,
...(queryKey ?? [{ before, uri }]),
];
export type AssetServiceGetAssetDefaultResponse = Awaited<
ReturnType<typeof AssetService.getAsset>
>;
Expand Down Expand Up @@ -1211,15 +1233,15 @@ export type PoolServicePatchPoolMutationResult = Awaited<
export type VariableServicePatchVariableMutationResult = Awaited<
ReturnType<typeof VariableService.patchVariable>
>;
export type AssetServiceDeleteAssetQueuedEventsMutationResult = Awaited<
ReturnType<typeof AssetService.deleteAssetQueuedEvents>
>;
export type AssetServiceDeleteDagAssetQueuedEventsMutationResult = Awaited<
ReturnType<typeof AssetService.deleteDagAssetQueuedEvents>
>;
export type AssetServiceDeleteDagAssetQueuedEventMutationResult = Awaited<
ReturnType<typeof AssetService.deleteDagAssetQueuedEvent>
>;
export type AssetServiceDeleteAssetQueuedEventsMutationResult = Awaited<
ReturnType<typeof AssetService.deleteAssetQueuedEvents>
>;
export type ConnectionServiceDeleteConnectionMutationResult = Awaited<
ReturnType<typeof ConnectionService.deleteConnection>
>;
Expand Down
23 changes: 23 additions & 0 deletions airflow/ui/openapi-gen/queries/prefetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,29 @@ export const prefetchUseAssetServiceGetAssetEvents = (
sourceTaskId,
}),
});
/**
* Get Asset Queued Events
* Get queued asset events for an asset.
* @param data The data for the request.
* @param data.uri
* @param data.before
* @returns QueuedEventCollectionResponse Successful Response
* @throws ApiError
*/
export const prefetchUseAssetServiceGetAssetQueuedEvents = (
queryClient: QueryClient,
{
before,
uri,
}: {
before?: string;
uri: string;
},
) =>
queryClient.prefetchQuery({
queryKey: Common.UseAssetServiceGetAssetQueuedEventsKeyFn({ before, uri }),
queryFn: () => AssetService.getAssetQueuedEvents({ before, uri }),
});
/**
* Get Asset
* Get an asset.
Expand Down
Loading

0 comments on commit 39042c8

Please sign in to comment.