Skip to content

Commit

Permalink
Rename Dataset database tables as Asset (#42023)
Browse files Browse the repository at this point in the history
* feat(models/asset): rename index idx_dataset_alias_dataset_event_alias_id as idx_asset_alias_asset_event_alias_id
* feat(models/asset): rename index idx_dataset_alias_dataset_event_event_id as idx_asset_alias_asset_event_event_id
* feat(models/asset): rename table dataset_alias_dataset_event as asset_alias_asset_event
* feat(models): rename dataset_alias as asset_alias
* rename table "dataset_alias" as "asset_alias"
* rename table "dataset_alias_dataset" as "asset_alias_dataset"
    * rename index "idx_dataset_alias_dataset_alias_id" as "idx_asset_alias_asset_alias_id"
    * rename index "idx_dataset_alias_dataset_alias_dataset_id" as "idx_asset_alias_asset_dataset_id"
    * rename fk constraint "ds_dsa_alias_id" as "a_aa_alias_id""
        * change reference column from "dataset_alias.id" to "asset_alias.id"
* rename table "dag_schedule_dataset_alias_reference" as "dag_schedule_asset_alias_reference"
    * rename column "dataset_alias" as "asset_alias"
    * rename fk constraint "dsdar_dataset_alias_fkey" as "dsaar_asset_alias_fkey"
        * change reference column from "dataset_alias.id" to "asset_alias.id"
    * rename index "idx_dag_schedule_dataset_alias_reference_dag_id" as "idx_dag_schedule_asset_alias_reference_dag_id"
* in table "asset_alias_asset_event"
    * change fk alias_id reference from "dataset_alias.id" to "asset_alias.id"
    * in fk constraint "dss_de_alias_id"
        * change reference column from "dataset_alias.id" to "asset_alias.id"
* feat(models/asset): rename table "dag_schedule_dataset_reference" as "dag_schedule_asset_reference"
* rename fk constraint "dsdr_dataset_fkey" as "dsar_dataset_fkey"
* rename fk constraint "dsdr_dag_id_fkey" as "dsar_dag_id_fkey"
* rename index "idx_dag_schedule_dataset_reference_dag_id" as "idx_dag_schedule_asset_reference_dag_id"
* feat(models): rename table "task_outlet_dataset_references" as "task_outlet_asset_references"
* rename fk constraint "todr_dataset_fkey" as "todr_asset_fkey"
* rename index "idx_task_outlet_dataset_reference_dag_id" as "todr_dataset_fkey"
* feat(models/asset): rename table "dagrun_dataset_event" as "dagrun_asset_event"
* rename index "idx_dagrun_dataset_events_dag_run_id" as "idx_dagrun_asset_events_dag_run_id"
* rename index "idx_dagrun_dataset_events_event_id" as "idx_dagrun_asset_events_event_id"
* feat(models/dag): rename column "schedule_dataset_references" as "schedule_asset_references" in table "dag"
* feat(models/asset): rename table "dataset_dag_run_queue" as "asset_dag_run_queue"
* rename index "idx_dataset_dag_run_queue_target_dag_id" as "idx_asset_dag_run_queue_target_dag_id"
* feat(models/dag): rename consumed_dataset_events as consumed_asset_events
* feat(models/asset): rename table "dataset_event" as "asset_event"
* feat(models/asset): rename index idx_dataset_id_timestamp as idx_asset_id_timestamp
* feat(models/asset): rename idx_asset_alias_asset_dataset_id as idx_asset_alias_asset_asset_id
* feat(models/asset): rename dataset as asset in fks
* feat(models/asset): rename asset_alias_dataset as asset_alias_asset
* feat(models/asset): rename index idx_dataset_alias_name_unique as idx_asset_alias_name_unique
* feat(models/asset): rename idx_dataset_name_uri_unique as idx_asset_name_uri_unique
* feat(models/asset): rename dsar_dataset_fkey as dsar_asset_fkey
* feat(models/asset): rename datasetdagrunqueue_pkey as assetdagrunqueue_pkey
* feat(models/asset): rename ddrq_dataset_fkey as ddrq_asset_fkey
* feat(models/asset): rename ddrq fks as adrq fks
* feat(models/asset): rename dsdr keys as dsar keys
* feat(models/asset): rename todr keys as toar keys
* feat(migrations): initial migration files
* feat(migrations): add utility functions _rename_index and _rename_fk_constraint
* feat(migrations): add _rename_pk_constraint utility function
* feat(migrations): update migration files to reflect current model change (except for fk reference table update)
* feat: rename dataset_id as asset_id
* feat(api_connexion): rename dataset_id as asset_id
* feat: rename dataset.uri as asset.uri
* feat: rename dataset_expression as asset_expression
* feat(listeners): rename argument dataset_alias as asset_alias
* feat(utils/types): rename DagRunType.DATASET_TRIGGERED as DagRunType.ASSET_TRIGGERED
* feat: rename dataset_triggered_dag_info as asset_triggered_dag_info
* feat: rename dataset as asset
* feat(models/asset): rename dss_de as aa_ae
* feat(models/asset): rename dsdar as dsaar
* feat(migrations): wrap up the upgrade part (except for fk upgrade)
* feat(migrations): rename migration file
* docs(newsfragment): add place holder newsfragment
* build: generate migration related files
* feat(migrations): wrap up upgrade script
* feat(migrations): wrap up downgrade script
* fix: fix missing frontend change
* fix(migrations): fix fk_constraint rename for association tables
* fix: fix missing dataset related changes due to rebasing
* feat(migrations): reorder migrations
* feat(migrations): fix error for sqlite
* feat(models/asset): remove redundant fk constraint aa_ae_alias_id and aa_ae_event_id
* feat(migrations): remove redundant fk constraint a_aa_asset_id and a_aa_alias_id
* build: generate db migration side files
* feat(migrations): add postgresql support
* feat(migrations): add mysql support for downgrade
* fix: rename reference_table as referent_table
* refactor: reorganize 0038 migration
* docs: improve endpoint asset_triggered change description
* docs: add description to endpoint renaming and note it in newsfragments
  • Loading branch information
Lee-W authored Oct 21, 2024
1 parent a7b1aa4 commit dc4def7
Show file tree
Hide file tree
Showing 79 changed files with 3,176 additions and 2,344 deletions.
10 changes: 5 additions & 5 deletions airflow/api_connexion/endpoints/asset_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ def get_asset_events(
query = select(AssetEvent)

if asset_id:
query = query.where(AssetEvent.dataset_id == asset_id)
query = query.where(AssetEvent.asset_id == asset_id)
if source_dag_id:
query = query.where(AssetEvent.source_dag_id == source_dag_id)
if source_task_id:
Expand Down Expand Up @@ -166,7 +166,7 @@ def _generate_queued_event_where_clause(
where_clause.append(AssetDagRunQueue.target_dag_id == dag_id)
if uri is not None:
where_clause.append(
AssetDagRunQueue.dataset_id.in_(
AssetDagRunQueue.asset_id.in_(
select(AssetModel.id).where(AssetModel.uri == uri),
),
)
Expand All @@ -187,7 +187,7 @@ def get_dag_asset_queued_event(
where_clause = _generate_queued_event_where_clause(dag_id=dag_id, uri=uri, before=before)
adrq = session.scalar(
select(AssetDagRunQueue)
.join(AssetModel, AssetDagRunQueue.dataset_id == AssetModel.id)
.join(AssetModel, AssetDagRunQueue.asset_id == AssetModel.id)
.where(*where_clause)
)
if adrq is None:
Expand Down Expand Up @@ -228,7 +228,7 @@ def get_dag_asset_queued_events(
where_clause = _generate_queued_event_where_clause(dag_id=dag_id, before=before)
query = (
select(AssetDagRunQueue, AssetModel.uri)
.join(AssetModel, AssetDagRunQueue.dataset_id == AssetModel.id)
.join(AssetModel, AssetDagRunQueue.asset_id == AssetModel.id)
.where(*where_clause)
)
result = session.execute(query).all()
Expand Down Expand Up @@ -278,7 +278,7 @@ def get_asset_queued_events(
)
query = (
select(AssetDagRunQueue, AssetModel.uri)
.join(AssetModel, AssetDagRunQueue.dataset_id == AssetModel.id)
.join(AssetModel, AssetDagRunQueue.asset_id == AssetModel.id)
.where(*where_clause)
)
total_entries = get_query_count(query, session=session)
Expand Down
2 changes: 1 addition & 1 deletion airflow/api_connexion/endpoints/dag_run_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ def get_upstream_asset_events(*, dag_id: str, dag_run_id: str, session: Session
"DAGRun not found",
detail=f"DAGRun with DAG ID: '{dag_id}' and DagRun ID: '{dag_run_id}' not found",
)
events = dag_run.consumed_dataset_events
events = dag_run.consumed_asset_events
return asset_event_collection_schema.dump(
AssetEventCollection(asset_events=events, total_entries=len(events))
)
Expand Down
76 changes: 63 additions & 13 deletions airflow/api_connexion/openapi/v1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1146,6 +1146,7 @@ paths:
Get asset for a dag run.
*New in version 2.4.0*
*Changed in 3.0.0*: The endpoint value was renamed from "/dags/{dag_id}/dagRuns/{dag_run_id}/upstreamDatasetEvents"
x-openapi-router-controller: airflow.api_connexion.endpoints.dag_run_endpoint
operationId: get_upstream_asset_events
tags: [DAGRun, Asset]
Expand Down Expand Up @@ -1211,6 +1212,7 @@ paths:
Get a queued asset event for a DAG.
*New in version 2.9.0*
*Changed in 3.0.0*: The endpoint value was renamed from "/dags/{dag_id}/datasets/queuedEvent/{uri}"
x-openapi-router-controller: airflow.api_connexion.endpoints.asset_endpoint
operationId: get_dag_asset_queued_event
parameters:
Expand All @@ -1236,6 +1238,7 @@ paths:
Delete a queued Asset event for a DAG.
*New in version 2.9.0*
*Changed in 3.0.0*: The endpoint value was renamed from "/dags/{dag_id}/datasets/queuedEvent/{uri}"
x-openapi-router-controller: airflow.api_connexion.endpoints.asset_endpoint
operationId: delete_dag_asset_queued_event
parameters:
Expand Down Expand Up @@ -1263,6 +1266,7 @@ paths:
Get queued Asset events for a DAG.
*New in version 2.9.0*
*Changed in 3.0.0*: The endpoint value was renamed from "/dags/{dag_id}/datasets/queuedEvent"
x-openapi-router-controller: airflow.api_connexion.endpoints.asset_endpoint
operationId: get_dag_asset_queued_events
parameters:
Expand All @@ -1288,6 +1292,7 @@ paths:
Delete queued Asset events for a DAG.
*New in version 2.9.0*
*Changed in 3.0.0*: The endpoint value was renamed from "/dags/{dag_id}/datasets/queuedEvent"
x-openapi-router-controller: airflow.api_connexion.endpoints.asset_endpoint
operationId: delete_dag_asset_queued_events
parameters:
Expand Down Expand Up @@ -1336,6 +1341,7 @@ paths:
Get queued Asset events for an Asset
*New in version 2.9.0*
*Changed in 3.0.0*: The endpoint value was renamed from "/assets/queuedEvent/{uri}"
x-openapi-router-controller: airflow.api_connexion.endpoints.asset_endpoint
operationId: get_asset_queued_events
parameters:
Expand All @@ -1361,6 +1367,7 @@ paths:
Delete queued Asset events for a Asset.
*New in version 2.9.0*
*Changed in 3.0.0*: The endpoint value was renamed from "/assets/queuedEvent/{uri}"
x-openapi-router-controller: airflow.api_connexion.endpoints.asset_endpoint
operationId: delete_asset_queued_events
parameters:
Expand Down Expand Up @@ -2480,6 +2487,8 @@ paths:
x-openapi-router-controller: airflow.api_connexion.endpoints.asset_endpoint
operationId: get_assets
tags: [Asset]
description: |
*Changed in 3.0.0*: The endpoint value was renamed from "/datasets"
parameters:
- $ref: "#/components/parameters/PageLimit"
- $ref: "#/components/parameters/PageOffset"
Expand Down Expand Up @@ -2517,7 +2526,10 @@ paths:
- $ref: "#/components/parameters/AssetURI"
get:
summary: Get an asset
description: Get an asset by uri.
description: |
Get an asset by uri.
*Changed in 3.0.0*: The endpoint value was renamed from "/datasets/{uri}"
x-openapi-router-controller: airflow.api_connexion.endpoints.asset_endpoint
operationId: get_asset
tags: [Asset]
Expand All @@ -2538,7 +2550,10 @@ paths:
/assets/events:
get:
summary: Get asset events
description: Get asset events
description: |
Get asset events
*Changed in 3.0.0*: The endpoint value was renamed from "/datasets/events"
x-openapi-router-controller: airflow.api_connexion.endpoints.asset_endpoint
operationId: get_asset_events
tags: [Asset]
Expand Down Expand Up @@ -2566,7 +2581,10 @@ paths:
$ref: "#/components/responses/NotFound"
post:
summary: Create asset event
description: Create asset event
description: |
Create asset event
*Changed in 3.0.0*: The endpoint value was renamed from "/datasets/events"
x-openapi-router-controller: airflow.api_connexion.endpoints.asset_endpoint
operationId: create_asset_event
tags: [Asset]
Expand Down Expand Up @@ -3299,7 +3317,9 @@ components:
- backfill
- manual
- scheduled
- dataset_triggered
- asset_triggered
description: |
*Changed in 3.0.0*: The asset_triggered value was renamed from dataset_triggered.
state:
$ref: "#/components/schemas/DagState"
external_trigger:
Expand Down Expand Up @@ -4088,9 +4108,12 @@ components:
dag_run_timeout:
$ref: "#/components/schemas/TimeDelta"
nullable: true
dataset_expression:
asset_expression:
type: object
description: Nested asset any/all conditions
description: |
Nested asset any/all conditions
*Changed in 3.0.0*: The asset_expression value was renamed from dataset_expression.
nullable: true
doc_md:
type: string
Expand Down Expand Up @@ -4475,6 +4498,7 @@ components:
An asset item.
*New in version 2.4.0*
*Changed in 3.0.0*: This was renamed from Dataset.
type: object
properties:
id:
Expand Down Expand Up @@ -4510,6 +4534,7 @@ components:
An asset reference to an upstream task.
*New in version 2.4.0*
*Changed in 3.0.0*: This was renamed from TaskOutletDatasetReference.
type: object
properties:
dag_id:
Expand All @@ -4534,6 +4559,7 @@ components:
An asset reference to a downstream DAG.
*New in version 2.4.0*
*Changed in 3.0.0*: This was renamed from DagScheduleDatasetReference.
type: object
properties:
dag_id:
Expand All @@ -4554,6 +4580,7 @@ components:
A collection of assets.
*New in version 2.4.0*
*Changed in 3.0.0*: This was renamed from DatasetCollection.
type: object
allOf:
- type: object
Expand All @@ -4562,21 +4589,30 @@ components:
type: array
items:
$ref: "#/components/schemas/Asset"
description: |
*Changed in 3.0.0*: This was renamed from datasets.
- $ref: "#/components/schemas/CollectionInfo"

AssetEvent:
description: |
An asset event.
*New in version 2.4.0*
*Changed in 3.0.0*: This was renamed from DatasetEvent.
type: object
properties:
dataset_id:
asset_id:
type: integer
description: The asset id
dataset_uri:
description: |
The asset id
*Changed in 3.0.0*: This was renamed from dataset_id.
asset_uri:
type: string
description: The URI of the asset
description: |
The URI of the asset
*Changed in 3.0.0*: This was renamed from dataset_uri.
nullable: false
extra:
type: object
Expand Down Expand Up @@ -4611,10 +4647,15 @@ components:
type: object
required:
- asset_uri
description: |
*Changed in 3.0.0*: This was renamed from CreateDatasetEvent.
properties:
asset_uri:
type: string
description: The URI of the asset
description: |
The URI of the asset
*Changed in 3.0.0*: This was renamed from dataset_uri.
nullable: false
extra:
type: object
Expand Down Expand Up @@ -4705,12 +4746,15 @@ components:
A collection of asset events.
*New in version 2.4.0*
*Changed in 3.0.0*: This was renamed from DatasetEventCollection.
type: object
allOf:
- type: object
properties:
asset_events:
type: array
description: |
*Changed in 3.0.0*: This was renamed from dataset_events.
items:
$ref: "#/components/schemas/AssetEvent"
- $ref: "#/components/schemas/CollectionInfo"
Expand Down Expand Up @@ -5515,7 +5559,10 @@ components:
type: string
format: path
required: true
description: The encoded Asset URI
description: |
The encoded Asset URI
*Changed in 3.0.0*: This was renamed from DatasetURI.
PoolName:
in: path
Expand Down Expand Up @@ -5701,7 +5748,10 @@ components:
name: asset_id
schema:
type: integer
description: The Asset ID that updated the asset.
description: |
The Asset ID that updated the asset.
*Changed in 3.0.0*: This was renamed from FilterDatasetID.
FilterSourceDAGID:
in: query
Expand Down
4 changes: 2 additions & 2 deletions airflow/api_connexion/schemas/asset_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,8 @@ class Meta:
model = AssetEvent

id = auto_field()
dataset_id = auto_field()
dataset_uri = fields.String(attribute="dataset.uri", dump_only=True)
asset_id = auto_field()
asset_uri = fields.String(attribute="asset.uri", dump_only=True)
extra = JsonObjectField()
source_task_id = auto_field()
source_dag_id = auto_field()
Expand Down
2 changes: 1 addition & 1 deletion airflow/api_connexion/schemas/dag_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ class DAGDetailSchema(DAGSchema):
catchup = fields.Boolean(dump_only=True)
orientation = fields.String(dump_only=True)
max_active_tasks = fields.Integer(dump_only=True)
dataset_expression = fields.Dict(allow_none=True)
asset_expression = fields.Dict(allow_none=True)
start_date = fields.DateTime(dump_only=True)
dag_run_timeout = fields.Nested(TimeDeltaSchema, attribute="dagrun_timeout", dump_only=True)
doc_md = fields.String(dump_only=True)
Expand Down
14 changes: 7 additions & 7 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1377,11 +1377,11 @@ components:
format: duration
- type: 'null'
title: Dag Run Timeout
dataset_expression:
asset_expression:
anyOf:
- type: object
- type: 'null'
title: Dataset Expression
title: Asset Expression
doc_md:
anyOf:
- type: string
Expand Down Expand Up @@ -1472,7 +1472,7 @@ components:
- owners
- catchup
- dag_run_timeout
- dataset_expression
- asset_expression
- doc_md
- start_date
- end_date
Expand Down Expand Up @@ -1764,15 +1764,15 @@ components:
manual:
type: integer
title: Manual
dataset_triggered:
asset_triggered:
type: integer
title: Dataset Triggered
title: Asset Triggered
type: object
required:
- backfill
- scheduled
- manual
- dataset_triggered
- asset_triggered
title: DAGRunTypes
description: DAG Run Types for responses.
DAGTagCollectionResponse:
Expand Down Expand Up @@ -1844,7 +1844,7 @@ components:
- backfill
- scheduled
- manual
- dataset_triggered
- asset_triggered
title: DagRunType
description: Class with DagRun types.
DagTagPydantic:
Expand Down
Loading

0 comments on commit dc4def7

Please sign in to comment.