diff --git a/airflow/api_connexion/endpoints/asset_endpoint.py b/airflow/api_connexion/endpoints/asset_endpoint.py index cbbe542ea7987..1ea1db2b3bbb8 100644 --- a/airflow/api_connexion/endpoints/asset_endpoint.py +++ b/airflow/api_connexion/endpoints/asset_endpoint.py @@ -133,7 +133,7 @@ def get_asset_events( query = select(AssetEvent) if asset_id: - query = query.where(AssetEvent.dataset_id == asset_id) + query = query.where(AssetEvent.asset_id == asset_id) if source_dag_id: query = query.where(AssetEvent.source_dag_id == source_dag_id) if source_task_id: @@ -166,7 +166,7 @@ def _generate_queued_event_where_clause( where_clause.append(AssetDagRunQueue.target_dag_id == dag_id) if uri is not None: where_clause.append( - AssetDagRunQueue.dataset_id.in_( + AssetDagRunQueue.asset_id.in_( select(AssetModel.id).where(AssetModel.uri == uri), ), ) @@ -187,7 +187,7 @@ def get_dag_asset_queued_event( where_clause = _generate_queued_event_where_clause(dag_id=dag_id, uri=uri, before=before) adrq = session.scalar( select(AssetDagRunQueue) - .join(AssetModel, AssetDagRunQueue.dataset_id == AssetModel.id) + .join(AssetModel, AssetDagRunQueue.asset_id == AssetModel.id) .where(*where_clause) ) if adrq is None: @@ -228,7 +228,7 @@ def get_dag_asset_queued_events( where_clause = _generate_queued_event_where_clause(dag_id=dag_id, before=before) query = ( select(AssetDagRunQueue, AssetModel.uri) - .join(AssetModel, AssetDagRunQueue.dataset_id == AssetModel.id) + .join(AssetModel, AssetDagRunQueue.asset_id == AssetModel.id) .where(*where_clause) ) result = session.execute(query).all() @@ -278,7 +278,7 @@ def get_asset_queued_events( ) query = ( select(AssetDagRunQueue, AssetModel.uri) - .join(AssetModel, AssetDagRunQueue.dataset_id == AssetModel.id) + .join(AssetModel, AssetDagRunQueue.asset_id == AssetModel.id) .where(*where_clause) ) total_entries = get_query_count(query, session=session) diff --git a/airflow/api_connexion/endpoints/dag_run_endpoint.py b/airflow/api_connexion/endpoints/dag_run_endpoint.py index 74eae13ddd4d0..e0d83575611b0 100644 --- a/airflow/api_connexion/endpoints/dag_run_endpoint.py +++ b/airflow/api_connexion/endpoints/dag_run_endpoint.py @@ -130,7 +130,7 @@ def get_upstream_asset_events(*, dag_id: str, dag_run_id: str, session: Session "DAGRun not found", detail=f"DAGRun with DAG ID: '{dag_id}' and DagRun ID: '{dag_run_id}' not found", ) - events = dag_run.consumed_dataset_events + events = dag_run.consumed_asset_events return asset_event_collection_schema.dump( AssetEventCollection(asset_events=events, total_entries=len(events)) ) diff --git a/airflow/api_connexion/openapi/v1.yaml b/airflow/api_connexion/openapi/v1.yaml index e99f91639c49e..af38326489a4a 100644 --- a/airflow/api_connexion/openapi/v1.yaml +++ b/airflow/api_connexion/openapi/v1.yaml @@ -1146,6 +1146,7 @@ paths: Get asset for a dag run. *New in version 2.4.0* + *Changed in 3.0.0*: The endpoint value was renamed from "/dags/{dag_id}/dagRuns/{dag_run_id}/upstreamDatasetEvents" x-openapi-router-controller: airflow.api_connexion.endpoints.dag_run_endpoint operationId: get_upstream_asset_events tags: [DAGRun, Asset] @@ -1211,6 +1212,7 @@ paths: Get a queued asset event for a DAG. *New in version 2.9.0* + *Changed in 3.0.0*: The endpoint value was renamed from "/dags/{dag_id}/datasets/queuedEvent/{uri}" x-openapi-router-controller: airflow.api_connexion.endpoints.asset_endpoint operationId: get_dag_asset_queued_event parameters: @@ -1236,6 +1238,7 @@ paths: Delete a queued Asset event for a DAG. 
*New in version 2.9.0* + *Changed in 3.0.0*: The endpoint value was renamed from "/dags/{dag_id}/datasets/queuedEvent/{uri}" x-openapi-router-controller: airflow.api_connexion.endpoints.asset_endpoint operationId: delete_dag_asset_queued_event parameters: @@ -1263,6 +1266,7 @@ paths: Get queued Asset events for a DAG. *New in version 2.9.0* + *Changed in 3.0.0*: The endpoint value was renamed from "/dags/{dag_id}/datasets/queuedEvent" x-openapi-router-controller: airflow.api_connexion.endpoints.asset_endpoint operationId: get_dag_asset_queued_events parameters: @@ -1288,6 +1292,7 @@ paths: Delete queued Asset events for a DAG. *New in version 2.9.0* + *Changed in 3.0.0*: The endpoint value was renamed from "/dags/{dag_id}/datasets/queuedEvent" x-openapi-router-controller: airflow.api_connexion.endpoints.asset_endpoint operationId: delete_dag_asset_queued_events parameters: @@ -1336,6 +1341,7 @@ paths: Get queued Asset events for an Asset *New in version 2.9.0* + *Changed in 3.0.0*: The endpoint value was renamed from "/assets/queuedEvent/{uri}" x-openapi-router-controller: airflow.api_connexion.endpoints.asset_endpoint operationId: get_asset_queued_events parameters: @@ -1361,6 +1367,7 @@ paths: Delete queued Asset events for a Asset. *New in version 2.9.0* + *Changed in 3.0.0*: The endpoint value was renamed from "/assets/queuedEvent/{uri}" x-openapi-router-controller: airflow.api_connexion.endpoints.asset_endpoint operationId: delete_asset_queued_events parameters: @@ -2480,6 +2487,8 @@ paths: x-openapi-router-controller: airflow.api_connexion.endpoints.asset_endpoint operationId: get_assets tags: [Asset] + description: | + *Changed in 3.0.0*: The endpoint value was renamed from "/datasets" parameters: - $ref: "#/components/parameters/PageLimit" - $ref: "#/components/parameters/PageOffset" @@ -2517,7 +2526,10 @@ paths: - $ref: "#/components/parameters/AssetURI" get: summary: Get an asset - description: Get an asset by uri. + description: | + Get an asset by uri. + + *Changed in 3.0.0*: The endpoint value was renamed from "/datasets/{uri}" x-openapi-router-controller: airflow.api_connexion.endpoints.asset_endpoint operationId: get_asset tags: [Asset] @@ -2538,7 +2550,10 @@ paths: /assets/events: get: summary: Get asset events - description: Get asset events + description: | + Get asset events + + *Changed in 3.0.0*: The endpoint value was renamed from "/datasets/events" x-openapi-router-controller: airflow.api_connexion.endpoints.asset_endpoint operationId: get_asset_events tags: [Asset] @@ -2566,7 +2581,10 @@ paths: $ref: "#/components/responses/NotFound" post: summary: Create asset event - description: Create asset event + description: | + Create asset event + + *Changed in 3.0.0*: The endpoint value was renamed from "/datasets/events" x-openapi-router-controller: airflow.api_connexion.endpoints.asset_endpoint operationId: create_asset_event tags: [Asset] @@ -3299,7 +3317,9 @@ components: - backfill - manual - scheduled - - dataset_triggered + - asset_triggered + description: | + *Changed in 3.0.0*: The asset_triggered value was renamed from dataset_triggered. state: $ref: "#/components/schemas/DagState" external_trigger: @@ -4088,9 +4108,12 @@ components: dag_run_timeout: $ref: "#/components/schemas/TimeDelta" nullable: true - dataset_expression: + asset_expression: type: object - description: Nested asset any/all conditions + description: | + Nested asset any/all conditions + + *Changed in 3.0.0*: The asset_expression value was renamed from dataset_expression. 
nullable: true doc_md: type: string @@ -4475,6 +4498,7 @@ components: An asset item. *New in version 2.4.0* + *Changed in 3.0.0*: This was renamed from Dataset. type: object properties: id: @@ -4510,6 +4534,7 @@ components: An asset reference to an upstream task. *New in version 2.4.0* + *Changed in 3.0.0*: This was renamed from TaskOutletDatasetReference. type: object properties: dag_id: @@ -4534,6 +4559,7 @@ components: An asset reference to a downstream DAG. *New in version 2.4.0* + *Changed in 3.0.0*: This was renamed from DagScheduleDatasetReference. type: object properties: dag_id: @@ -4554,6 +4580,7 @@ components: A collection of assets. *New in version 2.4.0* + *Changed in 3.0.0*: This was renamed from DatasetCollection. type: object allOf: - type: object @@ -4562,6 +4589,8 @@ components: type: array items: $ref: "#/components/schemas/Asset" + description: | + *Changed in 3.0.0*: This was renamed from datasets. - $ref: "#/components/schemas/CollectionInfo" AssetEvent: @@ -4569,14 +4598,21 @@ components: An asset event. *New in version 2.4.0* + *Changed in 3.0.0*: This was renamed from DatasetEvent. type: object properties: - dataset_id: + asset_id: type: integer - description: The asset id - dataset_uri: + description: | + The asset id + + *Changed in 3.0.0*: This was renamed from dataset_id. + asset_uri: type: string - description: The URI of the asset + description: | + The URI of the asset + + *Changed in 3.0.0*: This was renamed from dataset_uri. nullable: false extra: type: object @@ -4611,10 +4647,15 @@ components: type: object required: - asset_uri + description: | + *Changed in 3.0.0*: This was renamed from CreateDatasetEvent. properties: asset_uri: type: string - description: The URI of the asset + description: | + The URI of the asset + + *Changed in 3.0.0*: This was renamed from dataset_uri. nullable: false extra: type: object @@ -4705,12 +4746,15 @@ components: A collection of asset events. *New in version 2.4.0* + *Changed in 3.0.0*: This was renamed from DatasetEventCollection. type: object allOf: - type: object properties: asset_events: type: array + description: | + *Changed in 3.0.0*: This was renamed from dataset_events. items: $ref: "#/components/schemas/AssetEvent" - $ref: "#/components/schemas/CollectionInfo" @@ -5515,7 +5559,10 @@ components: type: string format: path required: true - description: The encoded Asset URI + description: | + The encoded Asset URI + + *Changed in 3.0.0*: This was renamed from DatasetURI. PoolName: in: path @@ -5701,7 +5748,10 @@ components: name: asset_id schema: type: integer - description: The Asset ID that updated the asset. + description: | + The Asset ID that updated the asset. + + *Changed in 3.0.0*: This was renamed from FilterDatasetID. 
FilterSourceDAGID: in: query diff --git a/airflow/api_connexion/schemas/asset_schema.py b/airflow/api_connexion/schemas/asset_schema.py index 662f73a50d8b9..7f84b799d1a77 100644 --- a/airflow/api_connexion/schemas/asset_schema.py +++ b/airflow/api_connexion/schemas/asset_schema.py @@ -136,8 +136,8 @@ class Meta: model = AssetEvent id = auto_field() - dataset_id = auto_field() - dataset_uri = fields.String(attribute="dataset.uri", dump_only=True) + asset_id = auto_field() + asset_uri = fields.String(attribute="asset.uri", dump_only=True) extra = JsonObjectField() source_task_id = auto_field() source_dag_id = auto_field() diff --git a/airflow/api_connexion/schemas/dag_schema.py b/airflow/api_connexion/schemas/dag_schema.py index 6c7ff6fdc30e0..3ca650ac9a5e9 100644 --- a/airflow/api_connexion/schemas/dag_schema.py +++ b/airflow/api_connexion/schemas/dag_schema.py @@ -98,7 +98,7 @@ class DAGDetailSchema(DAGSchema): catchup = fields.Boolean(dump_only=True) orientation = fields.String(dump_only=True) max_active_tasks = fields.Integer(dump_only=True) - dataset_expression = fields.Dict(allow_none=True) + asset_expression = fields.Dict(allow_none=True) start_date = fields.DateTime(dump_only=True) dag_run_timeout = fields.Nested(TimeDeltaSchema, attribute="dagrun_timeout", dump_only=True) doc_md = fields.String(dump_only=True) diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index 325a6354de2b2..ebe209334d5ce 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -1377,11 +1377,11 @@ components: format: duration - type: 'null' title: Dag Run Timeout - dataset_expression: + asset_expression: anyOf: - type: object - type: 'null' - title: Dataset Expression + title: Asset Expression doc_md: anyOf: - type: string @@ -1472,7 +1472,7 @@ components: - owners - catchup - dag_run_timeout - - dataset_expression + - asset_expression - doc_md - start_date - end_date @@ -1764,15 +1764,15 @@ components: manual: type: integer title: Manual - dataset_triggered: + asset_triggered: type: integer - title: Dataset Triggered + title: Asset Triggered type: object required: - backfill - scheduled - manual - - dataset_triggered + - asset_triggered title: DAGRunTypes description: DAG Run Types for responses. DAGTagCollectionResponse: @@ -1844,7 +1844,7 @@ components: - backfill - scheduled - manual - - dataset_triggered + - asset_triggered title: DagRunType description: Class with DagRun types. 
DagTagPydantic: diff --git a/airflow/api_fastapi/core_api/routes/ui/assets.py b/airflow/api_fastapi/core_api/routes/ui/assets.py index b8a17c7398424..6786bc30ae680 100644 --- a/airflow/api_fastapi/core_api/routes/ui/assets.py +++ b/airflow/api_fastapi/core_api/routes/ui/assets.py @@ -56,11 +56,11 @@ async def next_run_assets( AssetModel.uri, func.max(AssetEvent.timestamp).label("lastUpdate"), ) - .join(DagScheduleAssetReference, DagScheduleAssetReference.dataset_id == AssetModel.id) + .join(DagScheduleAssetReference, DagScheduleAssetReference.asset_id == AssetModel.id) .join( AssetDagRunQueue, and_( - AssetDagRunQueue.dataset_id == AssetModel.id, + AssetDagRunQueue.asset_id == AssetModel.id, AssetDagRunQueue.target_dag_id == DagScheduleAssetReference.dag_id, ), isouter=True, @@ -68,7 +68,7 @@ async def next_run_assets( .join( AssetEvent, and_( - AssetEvent.dataset_id == AssetModel.id, + AssetEvent.asset_id == AssetModel.id, ( AssetEvent.timestamp >= latest_run.execution_date if latest_run and latest_run.execution_date @@ -82,5 +82,5 @@ async def next_run_assets( .order_by(AssetModel.uri) ) ] - data = {"dataset_expression": dag_model.dataset_expression, "events": events} + data = {"asset_expression": dag_model.asset_expression, "events": events} return data diff --git a/airflow/api_fastapi/core_api/serializers/dags.py b/airflow/api_fastapi/core_api/serializers/dags.py index 39e85ea8c6f0e..a3e11de36039b 100644 --- a/airflow/api_fastapi/core_api/serializers/dags.py +++ b/airflow/api_fastapi/core_api/serializers/dags.py @@ -112,7 +112,7 @@ class DAGDetailsResponse(DAGResponse): catchup: bool dag_run_timeout: timedelta | None - dataset_expression: dict | None + asset_expression: dict | None doc_md: str | None start_date: datetime | None end_date: datetime | None diff --git a/airflow/api_fastapi/core_api/serializers/dashboard.py b/airflow/api_fastapi/core_api/serializers/dashboard.py index ee31a812945ec..66adc8ed3df92 100644 --- a/airflow/api_fastapi/core_api/serializers/dashboard.py +++ b/airflow/api_fastapi/core_api/serializers/dashboard.py @@ -25,7 +25,7 @@ class DAGRunTypes(BaseModel): backfill: int scheduled: int manual: int - dataset_triggered: int + asset_triggered: int class DAGRunStates(BaseModel): diff --git a/airflow/assets/__init__.py b/airflow/assets/__init__.py index 4dbc35fb95397..dcc5484656eb7 100644 --- a/airflow/assets/__init__.py +++ b/airflow/assets/__init__.py @@ -169,7 +169,7 @@ def expand_alias_to_assets(alias: str | AssetAlias, *, session: Session = NEW_SE select(AssetAliasModel).where(AssetAliasModel.name == alias_name).limit(1) ) if asset_alias_obj: - return [asset.to_public() for asset in asset_alias_obj.datasets] + return [asset.to_public() for asset in asset_alias_obj.assets] return [] diff --git a/airflow/assets/manager.py b/airflow/assets/manager.py index cd4d72e633a8e..a06c7c31786f5 100644 --- a/airflow/assets/manager.py +++ b/airflow/assets/manager.py @@ -138,7 +138,7 @@ def register_asset_change( cls._add_asset_alias_association({alias.name for alias in aliases}, asset_model, session=session) event_kwargs = { - "dataset_id": asset_model.id, + "asset_id": asset_model.id, "extra": extra, } if task_instance: @@ -167,7 +167,7 @@ def register_asset_change( ).unique() for asset_alias_model in asset_alias_models: - asset_alias_model.dataset_events.append(asset_event) + asset_alias_model.asset_events.append(asset_event) session.add(asset_alias_model) dags_to_queue_from_asset_alias |= { @@ -224,7 +224,7 @@ def _queue_dagruns(cls, asset_id: int, dags_to_queue: 
set[DagModel], session: Se @classmethod def _slow_path_queue_dagruns(cls, asset_id: int, dags_to_queue: set[DagModel], session: Session) -> None: def _queue_dagrun_if_needed(dag: DagModel) -> str | None: - item = AssetDagRunQueue(target_dag_id=dag.dag_id, dataset_id=asset_id) + item = AssetDagRunQueue(target_dag_id=dag.dag_id, asset_id=asset_id) # Don't error whole transaction when a single RunQueue item conflicts. # https://docs.sqlalchemy.org/en/14/orm/session_transaction.html#using-savepoint try: @@ -243,7 +243,7 @@ def _postgres_queue_dagruns(cls, asset_id: int, dags_to_queue: set[DagModel], se from sqlalchemy.dialects.postgresql import insert values = [{"target_dag_id": dag.dag_id} for dag in dags_to_queue] - stmt = insert(AssetDagRunQueue).values(dataset_id=asset_id).on_conflict_do_nothing() + stmt = insert(AssetDagRunQueue).values(asset_id=asset_id).on_conflict_do_nothing() session.execute(stmt, values) @classmethod diff --git a/airflow/dag_processing/collection.py b/airflow/dag_processing/collection.py index 068a3727d04c0..034c9c0540125 100644 --- a/airflow/dag_processing/collection.py +++ b/airflow/dag_processing/collection.py @@ -67,9 +67,9 @@ def _find_orm_dags(dag_ids: Iterable[str], *, session: Session) -> dict[str, Dag select(DagModel) .options(joinedload(DagModel.tags, innerjoin=False)) .where(DagModel.dag_id.in_(dag_ids)) - .options(joinedload(DagModel.schedule_dataset_references)) - .options(joinedload(DagModel.schedule_dataset_alias_references)) - .options(joinedload(DagModel.task_outlet_dataset_references)) + .options(joinedload(DagModel.schedule_asset_references)) + .options(joinedload(DagModel.schedule_asset_alias_references)) + .options(joinedload(DagModel.task_outlet_asset_references)) ) stmt = with_row_locks(stmt, of=DagModel, session=session) return {dm.dag_id: dm for dm in session.scalars(stmt).unique()} @@ -223,7 +223,7 @@ def update_dags( ) dm.timetable_summary = dag.timetable.summary dm.timetable_description = dag.timetable.description - dm.dataset_expression = dag.timetable.asset_condition.as_expression() + dm.asset_expression = dag.timetable.asset_condition.as_expression() dm.processor_subdir = processor_subdir last_automated_run: DagRun | None = run_info.latest_runs.get(dag.dag_id) @@ -237,8 +237,8 @@ def update_dags( dm.calculate_dagrun_date_fields(dag, last_automated_data_interval) if not dag.timetable.asset_condition: - dm.schedule_dataset_references = [] - dm.schedule_dataset_alias_references = [] + dm.schedule_asset_references = [] + dm.schedule_asset_alias_references = [] # FIXME: STORE NEW REFERENCES. if dag.tags: @@ -367,15 +367,15 @@ def add_dag_asset_references( for dag_id, references in self.schedule_asset_references.items(): # Optimization: no references at all; this is faster than repeated delete(). 
if not references: - dags[dag_id].schedule_dataset_references = [] + dags[dag_id].schedule_asset_references = [] continue referenced_asset_ids = {asset.id for asset in (assets[r.uri] for r in references)} - orm_refs = {r.dataset_id: r for r in dags[dag_id].schedule_dataset_references} + orm_refs = {r.asset_id: r for r in dags[dag_id].schedule_asset_references} for asset_id, ref in orm_refs.items(): if asset_id not in referenced_asset_ids: session.delete(ref) session.bulk_save_objects( - DagScheduleAssetReference(dataset_id=asset_id, dag_id=dag_id) + DagScheduleAssetReference(asset_id=asset_id, dag_id=dag_id) for asset_id in referenced_asset_ids if asset_id not in orm_refs ) @@ -393,10 +393,10 @@ def add_dag_asset_alias_references( for dag_id, references in self.schedule_asset_alias_references.items(): # Optimization: no references at all; this is faster than repeated delete(). if not references: - dags[dag_id].schedule_dataset_alias_references = [] + dags[dag_id].schedule_asset_alias_references = [] continue referenced_alias_ids = {alias.id for alias in (aliases[r.name] for r in references)} - orm_refs = {a.alias_id: a for a in dags[dag_id].schedule_dataset_alias_references} + orm_refs = {a.alias_id: a for a in dags[dag_id].schedule_asset_alias_references} for alias_id, ref in orm_refs.items(): if alias_id not in referenced_alias_ids: session.delete(ref) @@ -419,18 +419,18 @@ def add_task_asset_references( for dag_id, references in self.outlet_references.items(): # Optimization: no references at all; this is faster than repeated delete(). if not references: - dags[dag_id].task_outlet_dataset_references = [] + dags[dag_id].task_outlet_asset_references = [] continue referenced_outlets = { (task_id, asset.id) for task_id, asset in ((task_id, assets[d.uri]) for task_id, d in references) } - orm_refs = {(r.task_id, r.dataset_id): r for r in dags[dag_id].task_outlet_dataset_references} + orm_refs = {(r.task_id, r.asset_id): r for r in dags[dag_id].task_outlet_asset_references} for key, ref in orm_refs.items(): if key not in referenced_outlets: session.delete(ref) session.bulk_save_objects( - TaskOutletAssetReference(dataset_id=asset_id, dag_id=dag_id, task_id=task_id) + TaskOutletAssetReference(asset_id=asset_id, dag_id=dag_id, task_id=task_id) for task_id, asset_id in referenced_outlets if (task_id, asset_id) not in orm_refs ) diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py index a052bf700db7d..233b687d1069f 100644 --- a/airflow/jobs/scheduler_job_runner.py +++ b/airflow/jobs/scheduler_job_runner.py @@ -1272,15 +1272,15 @@ def _do_scheduling(self, session: Session) -> int: @retry_db_transaction def _create_dagruns_for_dags(self, guard: CommitProhibitorGuard, session: Session) -> None: """Find Dag Models needing DagRuns and Create Dag Runs with retries in case of OperationalError.""" - query, dataset_triggered_dag_info = DagModel.dags_needing_dagruns(session) + query, asset_triggered_dag_info = DagModel.dags_needing_dagruns(session) all_dags_needing_dag_runs = set(query.all()) - dataset_triggered_dags = [ - dag for dag in all_dags_needing_dag_runs if dag.dag_id in dataset_triggered_dag_info + asset_triggered_dags = [ + dag for dag in all_dags_needing_dag_runs if dag.dag_id in asset_triggered_dag_info ] - non_dataset_dags = all_dags_needing_dag_runs.difference(dataset_triggered_dags) - self._create_dag_runs(non_dataset_dags, session) - if dataset_triggered_dags: - self._create_dag_runs_asset_triggered(dataset_triggered_dags, dataset_triggered_dag_info, 
session) + non_asset_dags = all_dags_needing_dag_runs.difference(asset_triggered_dags) + self._create_dag_runs(non_asset_dags, session) + if asset_triggered_dags: + self._create_dag_runs_asset_triggered(asset_triggered_dags, asset_triggered_dag_info, session) # commit the session - Release the write lock on DagModel table. guard.commit() @@ -1391,7 +1391,7 @@ def _create_dag_runs(self, dag_models: Collection[DagModel], session: Session) - def _create_dag_runs_asset_triggered( self, dag_models: Collection[DagModel], - dataset_triggered_dag_info: dict[str, tuple[datetime, datetime]], + asset_triggered_dag_info: dict[str, tuple[datetime, datetime]], session: Session, ) -> None: """For DAGs that are triggered by assets, create dag runs.""" @@ -1401,7 +1401,7 @@ def _create_dag_runs_asset_triggered( # duplicate dag runs exec_dates = { dag_id: timezone.coerce_datetime(last_time) - for dag_id, (_, last_time) in dataset_triggered_dag_info.items() + for dag_id, (_, last_time) in asset_triggered_dag_info.items() } existing_dagruns: set[tuple[str, timezone.DateTime]] = set( session.execute( @@ -1441,7 +1441,7 @@ def _create_dag_runs_asset_triggered( .where( DagRun.dag_id == dag.dag_id, DagRun.execution_date < exec_date, - DagRun.run_type == DagRunType.DATASET_TRIGGERED, + DagRun.run_type == DagRunType.ASSET_TRIGGERED, ) .order_by(DagRun.execution_date.desc()) .limit(1) @@ -1457,14 +1457,14 @@ def _create_dag_runs_asset_triggered( select(AssetEvent) .join( DagScheduleAssetReference, - AssetEvent.dataset_id == DagScheduleAssetReference.dataset_id, + AssetEvent.asset_id == DagScheduleAssetReference.asset_id, ) .where(*asset_event_filters) ).all() data_interval = dag.timetable.data_interval_for_events(exec_date, asset_events) run_id = dag.timetable.generate_run_id( - run_type=DagRunType.DATASET_TRIGGERED, + run_type=DagRunType.ASSET_TRIGGERED, logical_date=exec_date, data_interval=data_interval, session=session, @@ -1473,7 +1473,7 @@ def _create_dag_runs_asset_triggered( dag_run = dag.create_dagrun( run_id=run_id, - run_type=DagRunType.DATASET_TRIGGERED, + run_type=DagRunType.ASSET_TRIGGERED, execution_date=exec_date, data_interval=data_interval, state=DagRunState.QUEUED, @@ -1484,7 +1484,7 @@ def _create_dag_runs_asset_triggered( triggered_by=DagRunTriggeredByType.DATASET, ) Stats.incr("asset.triggered_dagruns") - dag_run.consumed_dataset_events.extend(asset_events) + dag_run.consumed_asset_events.extend(asset_events) session.execute( delete(AssetDagRunQueue).where(AssetDagRunQueue.target_dag_id == dag_run.dag_id) ) diff --git a/airflow/listeners/spec/asset.py b/airflow/listeners/spec/asset.py index 78b14c8b10aeb..dba9ac700e415 100644 --- a/airflow/listeners/spec/asset.py +++ b/airflow/listeners/spec/asset.py @@ -33,8 +33,8 @@ def on_asset_created(asset: Asset): @hookspec -def on_asset_alias_created(dataset_alias: AssetAlias): - """Execute when a new dataset alias is created.""" +def on_asset_alias_created(asset_alias: AssetAlias): + """Execute when a new asset alias is created.""" @hookspec diff --git a/airflow/migrations/versions/0040_3_0_0_rename_dataset_as_asset.py b/airflow/migrations/versions/0040_3_0_0_rename_dataset_as_asset.py new file mode 100644 index 0000000000000..70261b75c7b56 --- /dev/null +++ b/airflow/migrations/versions/0040_3_0_0_rename_dataset_as_asset.py @@ -0,0 +1,693 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Rename dataset as asset. + +Revision ID: 05234396c6fc +Revises: 3a8972ecb8f9 +Create Date: 2024-10-02 08:10:01.697128 +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +import sqlalchemy as sa +import sqlalchemy_jsonfield +from alembic import op + +from airflow.settings import json + +# revision identifiers, used by Alembic. +revision = "05234396c6fc" +down_revision = "3a8972ecb8f9" +branch_labels = None +depends_on = None +airflow_version = "3.0.0" + +if TYPE_CHECKING: + from alembic.operations.base import BatchOperations + from sqlalchemy.sql.elements import conv + + +def _rename_index( + *, batch_op: BatchOperations, original_name: str, new_name: str, columns: list[str], unique: bool +) -> None: + batch_op.drop_index(original_name) + batch_op.create_index(new_name, columns, unique=unique) + + +def _rename_fk_constraint( + *, + batch_op: BatchOperations, + original_name: str | conv, + new_name: str | conv, + referent_table: str, + local_cols: list[str], + remote_cols: list[str], + ondelete: str, +) -> None: + batch_op.drop_constraint(original_name, type_="foreignkey") + batch_op.create_foreign_key( + constraint_name=new_name, + referent_table=referent_table, + local_cols=local_cols, + remote_cols=remote_cols, + ondelete=ondelete, + ) + + +def _rename_pk_constraint( + *, batch_op: BatchOperations, original_name: str, new_name: str, columns: list[str] +) -> None: + if batch_op.get_bind().dialect.name in ("postgresql", "mysql"): + batch_op.drop_constraint(original_name, type_="primary") + batch_op.create_primary_key(constraint_name=new_name, columns=columns) + + +# original table name to new table name +table_name_mappings = ( + ("dataset_alias_dataset", "asset_alias_asset"), + ("dataset_alias_dataset_event", "asset_alias_asset_event"), + ("dataset_alias", "asset_alias"), + ("dataset", "asset"), + ("dag_schedule_dataset_alias_reference", "dag_schedule_asset_alias_reference"), + ("dag_schedule_dataset_reference", "dag_schedule_asset_reference"), + ("task_outlet_dataset_reference", "task_outlet_asset_reference"), + ("dataset_dag_run_queue", "asset_dag_run_queue"), + ("dagrun_dataset_event", "dagrun_asset_event"), + ("dataset_event", "asset_event"), +) + + +def upgrade(): + """Rename dataset as asset.""" + # Rename tables + for original_name, new_name in table_name_mappings: + op.rename_table(original_name, new_name) + + with op.batch_alter_table("asset_active", schema=None) as batch_op: + batch_op.drop_constraint("asset_active_asset_name_uri_fkey", type_="foreignkey") + + with op.batch_alter_table("asset", schema=None) as batch_op: + _rename_index( + batch_op=batch_op, + original_name="idx_dataset_name_uri_unique", + new_name="idx_asset_name_uri_unique", + columns=["name", "uri"], + unique=True, + ) + + with op.batch_alter_table("asset_active", schema=None) as batch_op: + 
batch_op.create_foreign_key( + constraint_name="asset_active_asset_name_uri_fkey", + referent_table="asset", + local_cols=["name", "uri"], + remote_cols=["name", "uri"], + ondelete="CASCADE", + ) + + with op.batch_alter_table("asset_alias_asset", schema=None) as batch_op: + batch_op.alter_column("dataset_id", new_column_name="asset_id", type_=sa.Integer(), nullable=False) + + with op.batch_alter_table("asset_alias_asset", schema=None) as batch_op: + batch_op.drop_constraint(op.f("dataset_alias_dataset_alias_id_fkey"), type_="foreignkey") + _rename_index( + batch_op=batch_op, + original_name="idx_dataset_alias_dataset_alias_id", + new_name="idx_asset_alias_asset_alias_id", + columns=["alias_id"], + unique=False, + ) + batch_op.create_foreign_key( + constraint_name="asset_alias_asset_alias_id_fk_key", + referent_table="asset_alias", + local_cols=["alias_id"], + remote_cols=["id"], + ondelete="CASCADE", + ) + + batch_op.drop_constraint(op.f("dataset_alias_dataset_dataset_id_fkey"), type_="foreignkey") + _rename_index( + batch_op=batch_op, + original_name="idx_dataset_alias_dataset_alias_dataset_id", + new_name="idx_asset_alias_asset_asset_id", + columns=["asset_id"], + unique=False, + ) + batch_op.create_foreign_key( + constraint_name="asset_alias_asset_asset_id_fk_key", + referent_table="asset", + local_cols=["asset_id"], + remote_cols=["id"], + ondelete="CASCADE", + ) + + with op.batch_alter_table("asset_alias_asset_event", schema=None) as batch_op: + batch_op.drop_constraint(op.f("dataset_alias_dataset_event_alias_id_fkey"), type_="foreignkey") + _rename_index( + batch_op=batch_op, + original_name="idx_dataset_alias_dataset_event_alias_id", + new_name="idx_asset_alias_asset_event_alias_id", + columns=["alias_id"], + unique=False, + ) + batch_op.create_foreign_key( + constraint_name=op.f("asset_alias_asset_event_asset_id_fkey"), + referent_table="asset_alias", + local_cols=["alias_id"], + remote_cols=["id"], + ondelete="CASCADE", + ) + + batch_op.drop_constraint(op.f("dataset_alias_dataset_event_event_id_fkey"), type_="foreignkey") + _rename_index( + batch_op=batch_op, + original_name="idx_dataset_alias_dataset_event_event_id", + new_name="idx_asset_alias_asset_event_event_id", + columns=["event_id"], + unique=False, + ) + batch_op.create_foreign_key( + constraint_name=op.f("asset_alias_asset_event_event_id_fk_key"), + referent_table="asset_event", + local_cols=["event_id"], + remote_cols=["id"], + ondelete="CASCADE", + ) + + with op.batch_alter_table("dag_schedule_asset_alias_reference", schema=None) as batch_op: + batch_op.drop_constraint("dsdar_dataset_fkey", type_="foreignkey") + batch_op.drop_constraint("dsdar_dag_id_fkey", type_="foreignkey") + + _rename_pk_constraint( + batch_op=batch_op, + original_name="dsdar_pkey", + new_name="asaar_pkey", + columns=["alias_id", "dag_id"], + ) + _rename_index( + batch_op=batch_op, + original_name="idx_dag_schedule_dataset_alias_reference_dag_id", + new_name="idx_dag_schedule_asset_alias_reference_dag_id", + columns=["dag_id"], + unique=False, + ) + + batch_op.create_foreign_key( + constraint_name="dsaar_asset_alias_fkey", + referent_table="asset_alias", + local_cols=["alias_id"], + remote_cols=["id"], + ondelete="CASCADE", + ) + batch_op.create_foreign_key( + constraint_name="dsaar_dag_fkey", + referent_table="dag", + local_cols=["dag_id"], + remote_cols=["dag_id"], + ondelete="CASCADE", + ) + + with op.batch_alter_table("dag_schedule_asset_reference", schema=None) as batch_op: + batch_op.alter_column("dataset_id", new_column_name="asset_id", 
type_=sa.Integer(), nullable=False) + + with op.batch_alter_table("dag_schedule_asset_reference", schema=None) as batch_op: + batch_op.drop_constraint("dsdr_dag_id_fkey", type_="foreignkey") + if op.get_bind().dialect.name in ("postgresql", "mysql"): + batch_op.drop_constraint("dsdr_dataset_fkey", type_="foreignkey") + + _rename_pk_constraint( + batch_op=batch_op, + original_name="dsdr_pkey", + new_name="dsar_pkey", + columns=["asset_id", "dag_id"], + ) + _rename_index( + batch_op=batch_op, + original_name="idx_dag_schedule_dataset_reference_dag_id", + new_name="idx_dag_schedule_asset_reference_dag_id", + columns=["dag_id"], + unique=False, + ) + + batch_op.create_foreign_key( + constraint_name="dsar_dag_id_fkey", + referent_table="dag", + local_cols=["dag_id"], + remote_cols=["dag_id"], + ondelete="CASCADE", + ) + batch_op.create_foreign_key( + constraint_name="dsar_asset_fkey", + referent_table="asset", + local_cols=["asset_id"], + remote_cols=["id"], + ondelete="CASCADE", + ) + + with op.batch_alter_table("task_outlet_asset_reference", schema=None) as batch_op: + batch_op.alter_column("dataset_id", new_column_name="asset_id", type_=sa.Integer(), nullable=False) + + batch_op.drop_constraint("todr_dag_id_fkey", type_="foreignkey") + if op.get_bind().dialect.name in ("postgresql", "mysql"): + batch_op.drop_constraint("todr_dataset_fkey", type_="foreignkey") + + _rename_pk_constraint( + batch_op=batch_op, + original_name="todr_pkey", + new_name="toar_pkey", + columns=["asset_id", "dag_id", "task_id"], + ) + + _rename_index( + batch_op=batch_op, + original_name="idx_task_outlet_dataset_reference_dag_id", + new_name="idx_task_outlet_asset_reference_dag_id", + columns=["dag_id"], + unique=False, + ) + + batch_op.create_foreign_key( + constraint_name="toar_asset_fkey", + referent_table="asset", + local_cols=["asset_id"], + remote_cols=["id"], + ondelete="CASCADE", + ) + batch_op.create_foreign_key( + constraint_name="toar_dag_id_fkey", + referent_table="dag", + local_cols=["dag_id"], + remote_cols=["dag_id"], + ondelete="CASCADE", + ) + + with op.batch_alter_table("asset_dag_run_queue", schema=None) as batch_op: + batch_op.alter_column("dataset_id", new_column_name="asset_id", type_=sa.Integer(), nullable=False) + + batch_op.drop_constraint("ddrq_dag_fkey", type_="foreignkey") + if op.get_bind().dialect.name in ("postgresql", "mysql"): + batch_op.drop_constraint("ddrq_dataset_fkey", type_="foreignkey") + + _rename_pk_constraint( + batch_op=batch_op, + original_name="datasetdagrunqueue_pkey", + new_name="assetdagrunqueue_pkey", + columns=["asset_id", "target_dag_id"], + ) + _rename_index( + batch_op=batch_op, + original_name="idx_dataset_dag_run_queue_target_dag_id", + new_name="idx_asset_dag_run_queue_target_dag_id", + columns=["target_dag_id"], + unique=False, + ) + + batch_op.create_foreign_key( + constraint_name="adrq_asset_fkey", + referent_table="asset", + local_cols=["asset_id"], + remote_cols=["id"], + ondelete="CASCADE", + ) + batch_op.create_foreign_key( + constraint_name="adrq_dag_fkey", + referent_table="dag", + local_cols=["target_dag_id"], + remote_cols=["dag_id"], + ondelete="CASCADE", + ) + + with op.batch_alter_table("dagrun_asset_event", schema=None) as batch_op: + batch_op.drop_constraint("dagrun_dataset_event_event_id_fkey", type_="foreignkey") + _rename_index( + batch_op=batch_op, + original_name="idx_dagrun_dataset_events_dag_run_id", + new_name="idx_dagrun_asset_events_dag_run_id", + columns=["dag_run_id"], + unique=False, + ) + batch_op.create_foreign_key( +
constraint_name="dagrun_asset_event_dag_run_id_fkey", + referent_table="dag_run", + local_cols=["dag_run_id"], + remote_cols=["id"], + ondelete="CASCADE", + ) + + batch_op.drop_constraint("dagrun_dataset_event_dag_run_id_fkey", type_="foreignkey") + _rename_index( + batch_op=batch_op, + original_name="idx_dagrun_dataset_events_event_id", + new_name="idx_dagrun_asset_events_event_id", + columns=["event_id"], + unique=False, + ) + batch_op.create_foreign_key( + constraint_name="dagrun_asset_event_event_id_fkey", + referent_table="asset_event", + local_cols=["event_id"], + remote_cols=["id"], + ondelete="CASCADE", + ) + + with op.batch_alter_table("asset_event", schema=None) as batch_op: + batch_op.alter_column("dataset_id", new_column_name="asset_id", type_=sa.Integer(), nullable=False) + + with op.batch_alter_table("asset_event", schema=None) as batch_op: + _rename_index( + batch_op=batch_op, + original_name="idx_dataset_id_timestamp", + new_name="idx_asset_id_timestamp", + columns=["asset_id", "timestamp"], + unique=False, + ) + + with op.batch_alter_table("asset_alias", schema=None) as batch_op: + _rename_index( + batch_op=batch_op, + original_name="idx_dataset_alias_name_unique", + new_name="idx_asset_alias_name_unique", + columns=["name"], + unique=True, + ) + + with op.batch_alter_table("dag", schema=None) as batch_op: + batch_op.alter_column( + "dataset_expression", + new_column_name="asset_expression", + type_=sqlalchemy_jsonfield.JSONField(json=json), + ) + + +def downgrade(): + """Unapply Rename dataset as asset.""" + # Rename tables + for original_name, new_name in table_name_mappings: + op.rename_table(new_name, original_name) + + with op.batch_alter_table("asset_active", schema=None) as batch_op: + batch_op.drop_constraint("asset_active_asset_name_uri_fkey", type_="foreignkey") + + with op.batch_alter_table("dataset", schema=None) as batch_op: + _rename_index( + batch_op=batch_op, + original_name="idx_asset_name_uri_unique", + new_name="idx_dataset_name_uri_unique", + columns=["name", "uri"], + unique=True, + ) + + with op.batch_alter_table("asset_active", schema=None) as batch_op: + batch_op.create_foreign_key( + constraint_name="asset_active_asset_name_uri_fkey", + referent_table="dataset", + local_cols=["name", "uri"], + remote_cols=["name", "uri"], + ondelete="CASCADE", + ) + + with op.batch_alter_table("dataset_alias_dataset", schema=None) as batch_op: + batch_op.alter_column("asset_id", new_column_name="dataset_id", type_=sa.Integer(), nullable=False) + + with op.batch_alter_table("dataset_alias_dataset", schema=None) as batch_op: + batch_op.drop_constraint(op.f("asset_alias_asset_alias_id_fkey"), type_="foreignkey") + _rename_index( + batch_op=batch_op, + original_name="idx_asset_alias_asset_alias_id", + new_name="idx_dataset_alias_dataset_alias_id", + columns=["alias_id"], + unique=False, + ) + batch_op.create_foreign_key( + constraint_name=op.f("dataset_alias_dataset_alias_id_fkey"), + referent_table="dataset_alias", + local_cols=["alias_id"], + remote_cols=["id"], + ondelete="CASCADE", + ) + + batch_op.drop_constraint(op.f("asset_alias_asset_asset_id_fkey"), type_="foreignkey") + _rename_index( + batch_op=batch_op, + original_name="idx_asset_alias_asset_asset_id", + new_name="idx_dataset_alias_dataset_alias_dataset_id", + columns=["dataset_id"], + unique=False, + ) + batch_op.create_foreign_key( + constraint_name=op.f("dataset_alias_dataset_dataset_id_fkey"), + referent_table="dataset", + local_cols=["dataset_id"], + remote_cols=["id"], + ondelete="CASCADE", + ) + + 
with op.batch_alter_table("dataset_alias_dataset_event", schema=None) as batch_op: + batch_op.drop_constraint(op.f("asset_alias_asset_event_alias_id_fkey"), type_="foreignkey") + _rename_index( + batch_op=batch_op, + original_name="idx_asset_alias_asset_event_alias_id", + new_name="idx_dataset_alias_dataset_event_alias_id", + columns=["alias_id"], + unique=False, + ) + batch_op.create_foreign_key( + constraint_name=op.f("dataset_alias_dataset_event_alias_id_fkey"), + referent_table="dataset_alias", + local_cols=["alias_id"], + remote_cols=["id"], + ondelete="CASCADE", + ) + + batch_op.drop_constraint(op.f("asset_alias_asset_event_event_id_fkey"), type_="foreignkey") + _rename_index( + batch_op=batch_op, + original_name="idx_asset_alias_asset_event_event_id", + new_name="idx_dataset_alias_dataset_event_event_id", + columns=["event_id"], + unique=False, + ) + batch_op.create_foreign_key( + constraint_name=op.f("dataset_alias_dataset_event_event_id_fkey"), + referent_table="dataset_event", + local_cols=["event_id"], + remote_cols=["id"], + ondelete="CASCADE", + ) + + with op.batch_alter_table("dag_schedule_dataset_alias_reference", schema=None) as batch_op: + batch_op.drop_constraint("dsaar_asset_alias_fkey", type_="foreignkey") + batch_op.drop_constraint("dsaar_dag_fkey", type_="foreignkey") + + _rename_pk_constraint( + batch_op=batch_op, + original_name="asaar_pkey", + new_name="dsdar_pkey", + columns=["alias_id", "dag_id"], + ) + _rename_index( + batch_op=batch_op, + original_name="idx_dag_schedule_asset_alias_reference_dag_id", + new_name="idx_dag_schedule_dataset_alias_reference_dag_id", + columns=["dag_id"], + unique=False, + ) + + batch_op.create_foreign_key( + constraint_name="dsdar_dataset_fkey", + referent_table="dataset_alias", + local_cols=["alias_id"], + remote_cols=["id"], + ondelete="CASCADE", + ) + batch_op.create_foreign_key( + constraint_name="dsdar_dag_id_fkey", + referent_table="dag", + local_cols=["dag_id"], + remote_cols=["dag_id"], + ondelete="CASCADE", + ) + + with op.batch_alter_table("dag_schedule_dataset_reference", schema=None) as batch_op: + batch_op.alter_column("asset_id", new_column_name="dataset_id", type_=sa.Integer(), nullable=False) + + batch_op.drop_constraint("dsar_dag_id_fkey", type_="foreignkey") + batch_op.drop_constraint("dsar_asset_fkey", type_="foreignkey") + + _rename_index( + batch_op=batch_op, + original_name="idx_dag_schedule_asset_reference_dag_id", + new_name="idx_dag_schedule_dataset_reference_dag_id", + columns=["dag_id"], + unique=False, + ) + _rename_pk_constraint( + batch_op=batch_op, + original_name="dsar_pkey", + new_name="dsdr_pkey", + columns=["dataset_id", "dag_id"], + ) + + batch_op.create_foreign_key( + constraint_name="dsdr_dag_id_fkey", + referent_table="dag", + local_cols=["dag_id"], + remote_cols=["dag_id"], + ondelete="CASCADE", + ) + batch_op.create_foreign_key( + constraint_name="dsdr_dataset_fkey", + referent_table="dataset", + local_cols=["dataset_id"], + remote_cols=["id"], + ondelete="CASCADE", + ) + + with op.batch_alter_table("task_outlet_dataset_reference", schema=None) as batch_op: + batch_op.alter_column("asset_id", new_column_name="dataset_id", type_=sa.Integer(), nullable=False) + + batch_op.drop_constraint("toar_asset_fkey", type_="foreignkey") + batch_op.drop_constraint("toar_dag_id_fkey", type_="foreignkey") + + _rename_index( + batch_op=batch_op, + original_name="idx_task_outlet_asset_reference_dag_id", + new_name="idx_task_outlet_dataset_reference_dag_id", + columns=["dag_id"], + unique=False, + ) + 
_rename_pk_constraint( + batch_op=batch_op, + original_name="toar_pkey", + new_name="todr_pkey", + columns=["dataset_id", "dag_id", "task_id"], + ) + + batch_op.create_foreign_key( + constraint_name="todr_dataset_fkey", + referent_table="dataset", + local_cols=["dataset_id"], + remote_cols=["id"], + ondelete="CASCADE", + ) + batch_op.create_foreign_key( + constraint_name="todr_dag_id_fkey", + referent_table="dag", + local_cols=["dag_id"], + remote_cols=["dag_id"], + ondelete="CASCADE", + ) + + with op.batch_alter_table("dataset_dag_run_queue", schema=None) as batch_op: + batch_op.alter_column("asset_id", new_column_name="dataset_id", type_=sa.Integer(), nullable=False) + + batch_op.drop_constraint("adrq_asset_fkey", type_="foreignkey") + batch_op.drop_constraint("adrq_dag_fkey", type_="foreignkey") + + _rename_pk_constraint( + batch_op=batch_op, + original_name="assetdagrunqueue_pkey", + new_name="datasetdagrunqueue_pkey", + columns=["dataset_id", "target_dag_id"], + ) + _rename_index( + batch_op=batch_op, + original_name="idx_asset_dag_run_queue_target_dag_id", + new_name="idx_dataset_dag_run_queue_target_dag_id", + columns=["target_dag_id"], + unique=False, + ) + + batch_op.create_foreign_key( + constraint_name="ddrq_dataset_fkey", + referent_table="dataset", + local_cols=["dataset_id"], + remote_cols=["id"], + ondelete="CASCADE", + ) + batch_op.create_foreign_key( + constraint_name="ddrq_dag_fkey", + referent_table="dag", + local_cols=["target_dag_id"], + remote_cols=["dag_id"], + ondelete="CASCADE", + ) + + with op.batch_alter_table("dagrun_dataset_event", schema=None) as batch_op: + batch_op.drop_constraint(op.f("dagrun_asset_event_event_id_fkey"), type_="foreignkey") + _rename_index( + batch_op=batch_op, + original_name="idx_dagrun_asset_events_event_id", + new_name="idx_dagrun_dataset_events_event_id", + columns=["event_id"], + unique=False, + ) + batch_op.create_foreign_key( + constraint_name="dagrun_dataset_event_event_id_fkey", + referent_table="dataset_event", + local_cols=["event_id"], + remote_cols=["id"], + ondelete="CASCADE", + ) + + batch_op.drop_constraint(op.f("dagrun_asset_event_dag_run_id_fkey"), type_="foreignkey") + _rename_index( + batch_op=batch_op, + original_name="idx_dagrun_asset_events_dag_run_id", + new_name="idx_dagrun_dataset_events_dag_run_id", + columns=["dag_run_id"], + unique=False, + ) + batch_op.create_foreign_key( + constraint_name="dagrun_dataset_event_dag_run_id_fkey", + referent_table="dag_run", + local_cols=["dag_run_id"], + remote_cols=["id"], + ondelete="CASCADE", + ) + + with op.batch_alter_table("dataset_event", schema=None) as batch_op: + batch_op.alter_column("asset_id", new_column_name="dataset_id", type_=sa.Integer(), nullable=False) + + with op.batch_alter_table("dataset_event", schema=None) as batch_op: + _rename_index( + batch_op=batch_op, + original_name="idx_asset_id_timestamp", + new_name="idx_dataset_id_timestamp", + columns=["dataset_id", "timestamp"], + unique=False, + ) + + with op.batch_alter_table("dataset_alias", schema=None) as batch_op: + _rename_index( + batch_op=batch_op, + original_name="idx_asset_alias_name_unique", + new_name="idx_dataset_alias_name_unique", + columns=["name"], + unique=True, + ) + + with op.batch_alter_table("dag", schema=None) as batch_op: + batch_op.alter_column( + "asset_expression", + new_column_name="dataset_expression", + type_=sqlalchemy_jsonfield.JSONField(json=json), + ) diff --git a/airflow/models/asset.py b/airflow/models/asset.py index 79f9b7389439d..2b810e3d0f641 100644 --- 
a/airflow/models/asset.py +++ b/airflow/models/asset.py @@ -40,45 +40,21 @@ from airflow.utils.sqlalchemy import UtcDateTime alias_association_table = Table( - "dataset_alias_dataset", + "asset_alias_asset", Base.metadata, - Column("alias_id", ForeignKey("dataset_alias.id", ondelete="CASCADE"), primary_key=True), - Column("dataset_id", ForeignKey("dataset.id", ondelete="CASCADE"), primary_key=True), - Index("idx_dataset_alias_dataset_alias_id", "alias_id"), - Index("idx_dataset_alias_dataset_alias_dataset_id", "dataset_id"), - ForeignKeyConstraint( - ("alias_id",), - ["dataset_alias.id"], - name="ds_dsa_alias_id", - ondelete="CASCADE", - ), - ForeignKeyConstraint( - ("dataset_id",), - ["dataset.id"], - name="ds_dsa_dataset_id", - ondelete="CASCADE", - ), + Column("alias_id", ForeignKey("asset_alias.id", ondelete="CASCADE"), primary_key=True), + Column("asset_id", ForeignKey("asset.id", ondelete="CASCADE"), primary_key=True), + Index("idx_asset_alias_asset_alias_id", "alias_id"), + Index("idx_asset_alias_asset_asset_id", "asset_id"), ) -dataset_alias_dataset_event_assocation_table = Table( - "dataset_alias_dataset_event", +asset_alias_asset_event_assocation_table = Table( + "asset_alias_asset_event", Base.metadata, - Column("alias_id", ForeignKey("dataset_alias.id", ondelete="CASCADE"), primary_key=True), - Column("event_id", ForeignKey("dataset_event.id", ondelete="CASCADE"), primary_key=True), - Index("idx_dataset_alias_dataset_event_alias_id", "alias_id"), - Index("idx_dataset_alias_dataset_event_event_id", "event_id"), - ForeignKeyConstraint( - ("alias_id",), - ["dataset_alias.id"], - name="dss_de_alias_id", - ondelete="CASCADE", - ), - ForeignKeyConstraint( - ("event_id",), - ["dataset_event.id"], - name="dss_de_event_id", - ondelete="CASCADE", - ), + Column("alias_id", ForeignKey("asset_alias.id", ondelete="CASCADE"), primary_key=True), + Column("event_id", ForeignKey("asset_event.id", ondelete="CASCADE"), primary_key=True), + Index("idx_asset_alias_asset_event_alias_id", "alias_id"), + Index("idx_asset_alias_asset_event_event_id", "event_id"), ) @@ -116,23 +92,23 @@ class AssetAliasModel(Base): nullable=False, ) - __tablename__ = "dataset_alias" + __tablename__ = "asset_alias" __table_args__ = ( - Index("idx_dataset_alias_name_unique", name, unique=True), + Index("idx_asset_alias_name_unique", name, unique=True), {"sqlite_autoincrement": True}, # ensures PK values not reused ) - datasets = relationship( + assets = relationship( "AssetModel", secondary=alias_association_table, backref="aliases", ) - dataset_events = relationship( + asset_events = relationship( "AssetEvent", - secondary=dataset_alias_dataset_event_assocation_table, + secondary=asset_alias_asset_event_assocation_table, back_populates="source_aliases", ) - consuming_dags = relationship("DagScheduleAssetAliasReference", back_populates="dataset_alias") + consuming_dags = relationship("DagScheduleAssetAliasReference", back_populates="asset_alias") @classmethod def from_public(cls, obj: AssetAlias) -> AssetAliasModel: @@ -207,12 +183,12 @@ class AssetModel(Base): active = relationship("AssetActive", uselist=False, viewonly=True) - consuming_dags = relationship("DagScheduleAssetReference", back_populates="dataset") - producing_tasks = relationship("TaskOutletAssetReference", back_populates="dataset") + consuming_dags = relationship("DagScheduleAssetReference", back_populates="asset") + producing_tasks = relationship("TaskOutletAssetReference", back_populates="asset") - __tablename__ = "dataset" + __tablename__ = "asset" 
__table_args__ = ( - Index("idx_dataset_name_uri_unique", name, uri, unique=True), + Index("idx_asset_name_uri_unique", name, uri, unique=True), {"sqlite_autoincrement": True}, # ensures PK values not reused ) @@ -293,7 +269,7 @@ class AssetActive(Base): PrimaryKeyConstraint(name, uri, name="asset_active_pkey"), ForeignKeyConstraint( columns=[name, uri], - refcolumns=["dataset.name", "dataset.uri"], + refcolumns=["asset.name", "asset.uri"], name="asset_active_asset_name_uri_fkey", ondelete="CASCADE", ), @@ -314,25 +290,25 @@ class DagScheduleAssetAliasReference(Base): created_at = Column(UtcDateTime, default=timezone.utcnow, nullable=False) updated_at = Column(UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, nullable=False) - dataset_alias = relationship("AssetAliasModel", back_populates="consuming_dags") - dag = relationship("DagModel", back_populates="schedule_dataset_alias_references") + asset_alias = relationship("AssetAliasModel", back_populates="consuming_dags") + dag = relationship("DagModel", back_populates="schedule_asset_alias_references") - __tablename__ = "dag_schedule_dataset_alias_reference" + __tablename__ = "dag_schedule_asset_alias_reference" __table_args__ = ( - PrimaryKeyConstraint(alias_id, dag_id, name="dsdar_pkey"), + PrimaryKeyConstraint(alias_id, dag_id, name="asaar_pkey"), ForeignKeyConstraint( (alias_id,), - ["dataset_alias.id"], - name="dsdar_dataset_alias_fkey", + ["asset_alias.id"], + name="dsaar_asset_alias_fkey", ondelete="CASCADE", ), ForeignKeyConstraint( columns=(dag_id,), refcolumns=["dag.dag_id"], - name="dsdar_dag_fkey", + name="dsaar_dag_fkey", ondelete="CASCADE", ), - Index("idx_dag_schedule_dataset_alias_reference_dag_id", dag_id), + Index("idx_dag_schedule_asset_alias_reference_dag_id", dag_id), ) def __eq__(self, other): @@ -344,104 +320,99 @@ def __hash__(self): return hash(self.__mapper__.primary_key) def __repr__(self): - args = [] - for attr in [x.name for x in self.__mapper__.primary_key]: - args.append(f"{attr}={getattr(self, attr)!r}") + args = [f"{x.name}={getattr(self, x.name)!r}" for x in self.__mapper__.primary_key] return f"{self.__class__.__name__}({', '.join(args)})" class DagScheduleAssetReference(Base): """References from a DAG to an asset of which it is a consumer.""" - dataset_id = Column(Integer, primary_key=True, nullable=False) + asset_id = Column(Integer, primary_key=True, nullable=False) dag_id = Column(StringID(), primary_key=True, nullable=False) created_at = Column(UtcDateTime, default=timezone.utcnow, nullable=False) updated_at = Column(UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, nullable=False) - dataset = relationship("AssetModel", back_populates="consuming_dags") - dag = relationship("DagModel", back_populates="schedule_dataset_references") + asset = relationship("AssetModel", back_populates="consuming_dags") + dag = relationship("DagModel", back_populates="schedule_asset_references") queue_records = relationship( "AssetDagRunQueue", primaryjoin="""and_( - DagScheduleAssetReference.dataset_id == foreign(AssetDagRunQueue.dataset_id), + DagScheduleAssetReference.asset_id == foreign(AssetDagRunQueue.asset_id), DagScheduleAssetReference.dag_id == foreign(AssetDagRunQueue.target_dag_id), )""", cascade="all, delete, delete-orphan", ) - __tablename__ = "dag_schedule_dataset_reference" + __tablename__ = "dag_schedule_asset_reference" __table_args__ = ( - PrimaryKeyConstraint(dataset_id, dag_id, name="dsdr_pkey"), + PrimaryKeyConstraint(asset_id, dag_id, name="dsar_pkey"), 
ForeignKeyConstraint( - (dataset_id,), - ["dataset.id"], - name="dsdr_dataset_fkey", + (asset_id,), + ["asset.id"], + name="dsar_asset_fkey", ondelete="CASCADE", ), ForeignKeyConstraint( columns=(dag_id,), refcolumns=["dag.dag_id"], - name="dsdr_dag_id_fkey", + name="dsar_dag_id_fkey", ondelete="CASCADE", ), - Index("idx_dag_schedule_dataset_reference_dag_id", dag_id), + Index("idx_dag_schedule_asset_reference_dag_id", dag_id), ) def __eq__(self, other): if isinstance(other, self.__class__): - return self.dataset_id == other.dataset_id and self.dag_id == other.dag_id - else: - return NotImplemented + return self.asset_id == other.asset_id and self.dag_id == other.dag_id + return NotImplemented def __hash__(self): return hash(self.__mapper__.primary_key) def __repr__(self): - args = [] - for attr in [x.name for x in self.__mapper__.primary_key]: - args.append(f"{attr}={getattr(self, attr)!r}") + args = [f"{attr}={getattr(self, attr)!r}" for attr in [x.name for x in self.__mapper__.primary_key]] return f"{self.__class__.__name__}({', '.join(args)})" class TaskOutletAssetReference(Base): """References from a task to an asset that it updates / produces.""" - dataset_id = Column(Integer, primary_key=True, nullable=False) + asset_id = Column(Integer, primary_key=True, nullable=False) dag_id = Column(StringID(), primary_key=True, nullable=False) task_id = Column(StringID(), primary_key=True, nullable=False) created_at = Column(UtcDateTime, default=timezone.utcnow, nullable=False) updated_at = Column(UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, nullable=False) - dataset = relationship("AssetModel", back_populates="producing_tasks") + asset = relationship("AssetModel", back_populates="producing_tasks") - __tablename__ = "task_outlet_dataset_reference" + __tablename__ = "task_outlet_asset_reference" __table_args__ = ( ForeignKeyConstraint( - (dataset_id,), - ["dataset.id"], - name="todr_dataset_fkey", + (asset_id,), + ["asset.id"], + name="toar_asset_fkey", ondelete="CASCADE", ), - PrimaryKeyConstraint(dataset_id, dag_id, task_id, name="todr_pkey"), + PrimaryKeyConstraint(asset_id, dag_id, task_id, name="toar_pkey"), ForeignKeyConstraint( columns=(dag_id,), refcolumns=["dag.dag_id"], - name="todr_dag_id_fkey", + name="toar_dag_id_fkey", ondelete="CASCADE", ), - Index("idx_task_outlet_dataset_reference_dag_id", dag_id), + Index("idx_task_outlet_asset_reference_dag_id", dag_id), ) def __eq__(self, other): if isinstance(other, self.__class__): return ( - self.dataset_id == other.dataset_id + self.asset_id == other.asset_id and self.dag_id == other.dag_id and self.task_id == other.task_id ) - else: - return NotImplemented + + return NotImplemented def __hash__(self): return hash(self.__mapper__.primary_key) @@ -456,31 +427,32 @@ def __repr__(self): class AssetDagRunQueue(Base): """Model for storing asset events that need processing.""" - dataset_id = Column(Integer, primary_key=True, nullable=False) + asset_id = Column(Integer, primary_key=True, nullable=False) target_dag_id = Column(StringID(), primary_key=True, nullable=False) created_at = Column(UtcDateTime, default=timezone.utcnow, nullable=False) - dataset = relationship("AssetModel", viewonly=True) - __tablename__ = "dataset_dag_run_queue" + asset = relationship("AssetModel", viewonly=True) + + __tablename__ = "asset_dag_run_queue" __table_args__ = ( - PrimaryKeyConstraint(dataset_id, target_dag_id, name="datasetdagrunqueue_pkey"), + PrimaryKeyConstraint(asset_id, target_dag_id, name="assetdagrunqueue_pkey"), 
ForeignKeyConstraint( - (dataset_id,), - ["dataset.id"], - name="ddrq_dataset_fkey", + (asset_id,), + ["asset.id"], + name="adrq_asset_fkey", ondelete="CASCADE", ), ForeignKeyConstraint( (target_dag_id,), ["dag.dag_id"], - name="ddrq_dag_fkey", + name="adrq_dag_fkey", ondelete="CASCADE", ), - Index("idx_dataset_dag_run_queue_target_dag_id", target_dag_id), + Index("idx_asset_dag_run_queue_target_dag_id", target_dag_id), ) def __eq__(self, other): if isinstance(other, self.__class__): - return self.dataset_id == other.dataset_id and self.target_dag_id == other.target_dag_id + return self.asset_id == other.asset_id and self.target_dag_id == other.target_dag_id else: return NotImplemented @@ -495,12 +467,12 @@ def __repr__(self): association_table = Table( - "dagrun_dataset_event", + "dagrun_asset_event", Base.metadata, Column("dag_run_id", ForeignKey("dag_run.id", ondelete="CASCADE"), primary_key=True), - Column("event_id", ForeignKey("dataset_event.id", ondelete="CASCADE"), primary_key=True), - Index("idx_dagrun_dataset_events_dag_run_id", "dag_run_id"), - Index("idx_dagrun_dataset_events_event_id", "event_id"), + Column("event_id", ForeignKey("asset_event.id", ondelete="CASCADE"), primary_key=True), + Index("idx_dagrun_asset_events_dag_run_id", "dag_run_id"), + Index("idx_dagrun_asset_events_event_id", "event_id"), ) @@ -508,7 +480,7 @@ class AssetEvent(Base): """ A table to store assets events. - :param dataset_id: reference to AssetModel record + :param asset_id: reference to AssetModel record :param extra: JSON field for arbitrary extra info :param source_task_id: the task_id of the TI which updated the asset :param source_dag_id: the dag_id of the TI which updated the asset @@ -521,7 +493,7 @@ class AssetEvent(Base): """ id = Column(Integer, primary_key=True, autoincrement=True) - dataset_id = Column(Integer, nullable=False) + asset_id = Column(Integer, nullable=False) extra = Column(sqlalchemy_jsonfield.JSONField(json=json), nullable=False, default={}) source_task_id = Column(StringID(), nullable=True) source_dag_id = Column(StringID(), nullable=True) @@ -529,22 +501,22 @@ class AssetEvent(Base): source_map_index = Column(Integer, nullable=True, server_default=text("-1")) timestamp = Column(UtcDateTime, default=timezone.utcnow, nullable=False) - __tablename__ = "dataset_event" + __tablename__ = "asset_event" __table_args__ = ( - Index("idx_dataset_id_timestamp", dataset_id, timestamp), + Index("idx_asset_id_timestamp", asset_id, timestamp), {"sqlite_autoincrement": True}, # ensures PK values not reused ) created_dagruns = relationship( "DagRun", secondary=association_table, - backref="consumed_dataset_events", + backref="consumed_asset_events", ) source_aliases = relationship( "AssetAliasModel", - secondary=dataset_alias_dataset_event_assocation_table, - back_populates="dataset_events", + secondary=asset_alias_asset_event_assocation_table, + back_populates="asset_events", ) source_task_instance = relationship( @@ -569,9 +541,9 @@ class AssetEvent(Base): lazy="select", uselist=False, ) - dataset = relationship( + asset = relationship( AssetModel, - primaryjoin="AssetEvent.dataset_id == foreign(AssetModel.id)", + primaryjoin="AssetEvent.asset_id == foreign(AssetModel.id)", viewonly=True, lazy="select", uselist=False, @@ -579,13 +551,13 @@ class AssetEvent(Base): @property def uri(self): - return self.dataset.uri + return self.asset.uri def __repr__(self) -> str: args = [] for attr in [ "id", - "dataset_id", + "asset_id", "extra", "source_task_id", "source_dag_id", diff --git 
a/airflow/models/dag.py b/airflow/models/dag.py index f0a7d7f56be2c..00943ec2ee262 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -268,12 +268,12 @@ def get_asset_triggered_next_run_info( .join( ADRQ, and_( - ADRQ.dataset_id == DagScheduleAssetReference.dataset_id, + ADRQ.asset_id == DagScheduleAssetReference.asset_id, ADRQ.target_dag_id == DagScheduleAssetReference.dag_id, ), isouter=True, ) - .join(AssetModel, AssetModel.id == DagScheduleAssetReference.dataset_id) + .join(AssetModel, AssetModel.id == DagScheduleAssetReference.asset_id) .group_by(DagScheduleAssetReference.dag_id) .where(DagScheduleAssetReference.dag_id.in_(dag_ids)) ).all() @@ -322,7 +322,7 @@ def _create_orm_dagrun( ) # Load defaults into the following two fields to ensure result can be serialized detached run.log_template_id = int(session.scalar(select(func.max(LogTemplate.__table__.c.id)))) - run.consumed_dataset_events = [] + run.consumed_asset_events = [] session.add(run) session.flush() run.dag = dag @@ -376,7 +376,7 @@ class DAG(LoggingMixin): .. versionadded:: 2.4 The *schedule* argument to specify either time-based scheduling logic - (timetable), or dataset-driven triggers. + (timetable), or asset-driven triggers. .. versionchanged:: 3.0 The default value of *schedule* has been changed to *None* (no schedule). @@ -2675,9 +2675,9 @@ def get_serialized_fields(cls): """Stringified DAGs and operators contain exactly these fields.""" if not cls.__serialized_fields: exclusion_list = { - "schedule_dataset_references", - "schedule_dataset_alias_references", - "task_outlet_dataset_references", + "schedule_asset_references", + "schedule_asset_alias_references", + "task_outlet_asset_references", "_old_context_manager_dags", "safe_dag_id", "last_loaded", @@ -2842,8 +2842,8 @@ class DagModel(Base): timetable_summary = Column(Text, nullable=True) # Timetable description timetable_description = Column(String(1000), nullable=True) - # Dataset expression based on dataset triggers - dataset_expression = Column(sqlalchemy_jsonfield.JSONField(json=json), nullable=True) + # Asset expression based on asset triggers + asset_expression = Column(sqlalchemy_jsonfield.JSONField(json=json), nullable=True) # Tags for view filter tags = relationship("DagTag", cascade="all, delete, delete-orphan", backref=backref("dag")) # Dag owner links for DAGs view @@ -2870,18 +2870,18 @@ class DagModel(Base): __table_args__ = (Index("idx_next_dagrun_create_after", next_dagrun_create_after, unique=False),) - schedule_dataset_references = relationship( + schedule_asset_references = relationship( "DagScheduleAssetReference", back_populates="dag", cascade="all, delete, delete-orphan", ) - schedule_dataset_alias_references = relationship( + schedule_asset_alias_references = relationship( "DagScheduleAssetAliasReference", back_populates="dag", cascade="all, delete, delete-orphan", ) - schedule_datasets = association_proxy("schedule_dataset_references", "dataset") - task_outlet_dataset_references = relationship( + schedule_assets = association_proxy("schedule_asset_references", "asset") + task_outlet_asset_references = relationship( "TaskOutletAssetReference", cascade="all, delete, delete-orphan", ) @@ -3093,7 +3093,7 @@ def dag_ready(dag_id: str, cond: BaseAsset, statuses: dict) -> bool | None: del all_records dag_statuses = {} for dag_id, records in by_dag.items(): - dag_statuses[dag_id] = {x.dataset.uri: True for x in records} + dag_statuses[dag_id] = {x.asset.uri: True for x in records} ser_dags = session.scalars( 
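For illustration of the consumer-side rename (DagScheduleAssetReference, DagModel.asset_expression, schedule_asset_references), a DAG scheduled on assets would be declared roughly as below. This is a sketch, not part of the patch: the airflow.assets and airflow.operators.empty import paths are assumptions based on this rename series, and Asset(uri=...) follows the constructor usage visible in the taskinstance.py hunk further down.

# illustrative sketch -- import paths assumed, not confirmed by this hunk
from datetime import datetime, timezone

from airflow.assets import Asset
from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator

example_asset = Asset(uri="s3://bucket/example.csv")

with DAG(
    dag_id="asset_consumer_example",
    start_date=datetime(2024, 1, 1, tzinfo=timezone.utc),
    schedule=[example_asset],  # serialized into DagModel.asset_expression and DagScheduleAssetReference rows
):
    EmptyOperator(task_id="downstream_of_asset")
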
select(SerializedDagModel).where(SerializedDagModel.dag_id.in_(dag_statuses.keys())) ).all() @@ -3105,27 +3105,27 @@ def dag_ready(dag_id: str, cond: BaseAsset, statuses: dict) -> bool | None: del by_dag[dag_id] del dag_statuses[dag_id] del dag_statuses - dataset_triggered_dag_info = {} + asset_triggered_dag_info = {} for dag_id, records in by_dag.items(): times = sorted(x.created_at for x in records) - dataset_triggered_dag_info[dag_id] = (times[0], times[-1]) + asset_triggered_dag_info[dag_id] = (times[0], times[-1]) del by_dag - dataset_triggered_dag_ids = set(dataset_triggered_dag_info.keys()) - if dataset_triggered_dag_ids: + asset_triggered_dag_ids = set(asset_triggered_dag_info.keys()) + if asset_triggered_dag_ids: exclusion_list = set( session.scalars( select(DagModel.dag_id) .join(DagRun.dag_model) .where(DagRun.state.in_((DagRunState.QUEUED, DagRunState.RUNNING))) - .where(DagModel.dag_id.in_(dataset_triggered_dag_ids)) + .where(DagModel.dag_id.in_(asset_triggered_dag_ids)) .group_by(DagModel.dag_id) .having(func.count() >= func.max(DagModel.max_active_runs)) ) ) if exclusion_list: - dataset_triggered_dag_ids -= exclusion_list - dataset_triggered_dag_info = { - k: v for k, v in dataset_triggered_dag_info.items() if k not in exclusion_list + asset_triggered_dag_ids -= exclusion_list + asset_triggered_dag_info = { + k: v for k, v in asset_triggered_dag_info.items() if k not in exclusion_list } # We limit so that _one_ scheduler doesn't try to do all the creation of dag runs @@ -3137,7 +3137,7 @@ def dag_ready(dag_id: str, cond: BaseAsset, statuses: dict) -> bool | None: cls.has_import_errors == expression.false(), or_( cls.next_dagrun_create_after <= func.now(), - cls.dag_id.in_(dataset_triggered_dag_ids), + cls.dag_id.in_(asset_triggered_dag_ids), ), ) .order_by(cls.next_dagrun_create_after) @@ -3146,7 +3146,7 @@ def dag_ready(dag_id: str, cond: BaseAsset, statuses: dict) -> bool | None: return ( session.scalars(with_row_locks(query, of=cls, session=session, skip_locked=True)), - dataset_triggered_dag_info, + asset_triggered_dag_info, ) def calculate_dagrun_date_fields( @@ -3186,7 +3186,7 @@ def calculate_dagrun_date_fields( @provide_session def get_asset_triggered_next_run_info(self, *, session=NEW_SESSION) -> dict[str, int | str] | None: - if self.dataset_expression is None: + if self.asset_expression is None: return None # When an asset alias does not resolve into assets, get_asset_triggered_next_run_info returns diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py index 20ec12b9915e7..5de0466a6be0e 100644 --- a/airflow/models/dagrun.py +++ b/airflow/models/dagrun.py @@ -91,7 +91,7 @@ CreatedTasks = TypeVar("CreatedTasks", Iterator["dict[str, Any]"], Iterator[TI]) -RUN_ID_REGEX = r"^(?:manual|scheduled|dataset_triggered)__(?:\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\+00:00)$" +RUN_ID_REGEX = r"^(?:manual|scheduled|asset_triggered)__(?:\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\+00:00)$" class TISchedulingDecision(NamedTuple): diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index 1dbe299f25b39..b5fcac30a9ad5 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -1086,11 +1086,11 @@ def get_triggering_events() -> dict[str, list[AssetEvent | AssetEventPydantic]]: nonlocal dag_run if dag_run not in session: dag_run = session.merge(dag_run, load=False) - asset_events = dag_run.consumed_dataset_events + asset_events = dag_run.consumed_asset_events triggering_events: dict[str, list[AssetEvent | AssetEventPydantic]] = 
defaultdict(list) for event in asset_events: - if event.dataset: - triggering_events[event.dataset.uri].append(event) + if event.asset: + triggering_events[event.asset.uri].append(event) return triggering_events @@ -2911,22 +2911,22 @@ def _register_asset_changes(self, *, events: OutletEventAccessors, session: Sess frozen_extra = frozenset(asset_alias_event["extra"].items()) asset_alias_names[(asset_uri, frozen_extra)].add(asset_alias_name) - dataset_models: dict[str, AssetModel] = { - dataset_obj.uri: dataset_obj - for dataset_obj in session.scalars( + asset_models: dict[str, AssetModel] = { + asset_obj.uri: asset_obj + for asset_obj in session.scalars( select(AssetModel).where(AssetModel.uri.in_(uri for uri, _ in asset_alias_names)) ) } - if missing_datasets := [Asset(uri=u) for u, _ in asset_alias_names if u not in dataset_models]: - dataset_models.update( - (dataset_obj.uri, dataset_obj) - for dataset_obj in asset_manager.create_assets(missing_datasets, session=session) + if missing_assets := [Asset(uri=u) for u, _ in asset_alias_names if u not in asset_models]: + asset_models.update( + (asset_obj.uri, asset_obj) + for asset_obj in asset_manager.create_assets(missing_assets, session=session) ) - self.log.warning("Created new datasets for alias reference: %s", missing_datasets) + self.log.warning("Created new assets for alias reference: %s", missing_assets) session.flush() # Needed because we need the id for fk. for (uri, extra_items), alias_names in asset_alias_names.items(): - asset_obj = dataset_models[uri] + asset_obj = asset_models[uri] self.log.info( 'Creating event for %r through aliases "%s"', asset_obj, @@ -2935,7 +2935,7 @@ def _register_asset_changes(self, *, events: OutletEventAccessors, session: Sess asset_manager.register_asset_change( task_instance=self, asset=asset_obj, - aliases=[AssetAlias(name) for name in alias_names], + aliases=[AssetAlias(name=name) for name in alias_names], extra=dict(extra_items), session=session, source_alias_names=alias_names, diff --git a/airflow/serialization/pydantic/asset.py b/airflow/serialization/pydantic/asset.py index 4cd264902091a..611730dd92e47 100644 --- a/airflow/serialization/pydantic/asset.py +++ b/airflow/serialization/pydantic/asset.py @@ -23,7 +23,7 @@ class DagScheduleAssetReferencePydantic(BaseModelPydantic): """Serializable version of the DagScheduleAssetReference ORM SqlAlchemyModel used by internal API.""" - dataset_id: int + asset_id: int dag_id: str created_at: datetime updated_at: datetime @@ -34,7 +34,7 @@ class DagScheduleAssetReferencePydantic(BaseModelPydantic): class TaskOutletAssetReferencePydantic(BaseModelPydantic): """Serializable version of the TaskOutletAssetReference ORM SqlAlchemyModel used by internal API.""" - dataset_id: int + asset_id: int dag_id: str task_id: str created_at: datetime @@ -62,13 +62,13 @@ class AssetEventPydantic(BaseModelPydantic): """Serializable representation of the AssetEvent ORM SqlAlchemyModel used by internal API.""" id: int - dataset_id: Optional[int] + asset_id: Optional[int] extra: dict source_task_id: Optional[str] source_dag_id: Optional[str] source_run_id: Optional[str] source_map_index: Optional[int] timestamp: datetime - dataset: Optional[AssetPydantic] + asset: Optional[AssetPydantic] model_config = ConfigDict(from_attributes=True, arbitrary_types_allowed=True) diff --git a/airflow/serialization/pydantic/dag_run.py b/airflow/serialization/pydantic/dag_run.py index 86857452e8310..fd12ca12c0184 100644 --- a/airflow/serialization/pydantic/dag_run.py +++ 
b/airflow/serialization/pydantic/dag_run.py @@ -55,7 +55,7 @@ class DagRunPydantic(BaseModelPydantic): dag_hash: Optional[str] updated_at: Optional[datetime] dag: Optional[PydanticDag] - consumed_dataset_events: List[AssetEventPydantic] # noqa: UP006 + consumed_asset_events: List[AssetEventPydantic] # noqa: UP006 log_template_id: Optional[int] triggered_by: Optional[DagRunTriggeredByType] diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py index 9f180c2a5deac..14528f62bddf1 100644 --- a/airflow/serialization/serialized_objects.py +++ b/airflow/serialization/serialized_objects.py @@ -265,7 +265,7 @@ def encode_asset_condition(var: BaseAsset) -> dict[str, Any]: def decode_asset_condition(var: dict[str, Any]) -> BaseAsset: """ - Decode a previously serialized dataset condition. + Decode a previously serialized asset condition. :meta private: """ @@ -740,8 +740,8 @@ def serialize( elif isinstance(var, LazySelectSequence): return cls.serialize(list(var)) elif isinstance(var, BaseAsset): - serialized_dataset = encode_asset_condition(var) - return cls._encode(serialized_dataset, type_=serialized_dataset.pop("__type")) + serialized_asset = encode_asset_condition(var) + return cls._encode(serialized_asset, type_=serialized_asset.pop("__type")) elif isinstance(var, SimpleTaskInstance): return cls._encode( cls.serialize(var.__dict__, strict=strict, use_pydantic_models=use_pydantic_models), diff --git a/airflow/timetables/assets.py b/airflow/timetables/assets.py index b158555590ad5..d69a8e4d80cc0 100644 --- a/airflow/timetables/assets.py +++ b/airflow/timetables/assets.py @@ -92,6 +92,6 @@ def next_dagrun_info( ) def generate_run_id(self, *, run_type: DagRunType, **kwargs: typing.Any) -> str: - if run_type != DagRunType.DATASET_TRIGGERED: + if run_type != DagRunType.ASSET_TRIGGERED: return self.timetable.generate_run_id(run_type=run_type, **kwargs) return super().generate_run_id(run_type=run_type, **kwargs) diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts index aa6437118eeea..79a55e7b08dcc 100644 --- a/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -383,7 +383,7 @@ export const $DAGDetailsResponse = { ], title: "Dag Run Timeout", }, - dataset_expression: { + asset_expression: { anyOf: [ { type: "object", @@ -392,7 +392,7 @@ export const $DAGDetailsResponse = { type: "null", }, ], - title: "Dataset Expression", + title: "Asset Expression", }, doc_md: { anyOf: [ @@ -538,7 +538,7 @@ export const $DAGDetailsResponse = { "owners", "catchup", "dag_run_timeout", - "dataset_expression", + "asset_expression", "doc_md", "start_date", "end_date", @@ -1002,13 +1002,13 @@ export const $DAGRunTypes = { type: "integer", title: "Manual", }, - dataset_triggered: { + asset_triggered: { type: "integer", - title: "Dataset Triggered", + title: "Asset Triggered", }, }, type: "object", - required: ["backfill", "scheduled", "manual", "dataset_triggered"], + required: ["backfill", "scheduled", "manual", "asset_triggered"], title: "DAGRunTypes", description: "DAG Run Types for responses.", } as const; @@ -1093,7 +1093,7 @@ export const $DagRunTriggeredByType = { export const $DagRunType = { type: "string", - enum: ["backfill", "scheduled", "manual", "dataset_triggered"], + enum: ["backfill", "scheduled", "manual", "asset_triggered"], title: "DagRunType", description: "Class with DagRun types.", } as const; diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts 
b/airflow/ui/openapi-gen/requests/types.gen.ts index c795fce3e540d..83030dfb0d168 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -68,7 +68,7 @@ export type DAGDetailsResponse = { owners: Array; catchup: boolean; dag_run_timeout: string | null; - dataset_expression: { + asset_expression: { [key: string]: unknown; } | null; doc_md: string | null; @@ -174,7 +174,7 @@ export type DAGRunTypes = { backfill: number; scheduled: number; manual: number; - dataset_triggered: number; + asset_triggered: number; }; /** @@ -222,7 +222,7 @@ export type DagRunType = | "backfill" | "scheduled" | "manual" - | "dataset_triggered"; + | "asset_triggered"; /** * Serializable representation of the DagTag ORM SqlAlchemyModel used by internal API. diff --git a/airflow/utils/context.py b/airflow/utils/context.py index 4f308a6f9e060..9af1486f914e0 100644 --- a/airflow/utils/context.py +++ b/airflow/utils/context.py @@ -283,7 +283,7 @@ def __getitem__(self, key: int | str | Asset | AssetAlias) -> LazyAssetEventSele where_clause = AssetAliasModel.name == asset_alias.name elif isinstance(obj, (Asset, str)): asset = self._assets[extract_event_key(obj)] - join_clause = AssetEvent.dataset + join_clause = AssetEvent.asset where_clause = AssetModel.uri == asset.uri else: raise ValueError(key) diff --git a/airflow/utils/db.py b/airflow/utils/db.py index d6a367c4d91fe..8579be74ff4bd 100644 --- a/airflow/utils/db.py +++ b/airflow/utils/db.py @@ -96,7 +96,7 @@ class MappedClassProtocol(Protocol): "2.9.0": "1949afb29106", "2.9.2": "686269002441", "2.10.0": "22ed7efa9da2", - "3.0.0": "3a8972ecb8f9", + "3.0.0": "05234396c6fc", } diff --git a/airflow/utils/db_cleanup.py b/airflow/utils/db_cleanup.py index f98ef40c0af63..4fc387617be87 100644 --- a/airflow/utils/db_cleanup.py +++ b/airflow/utils/db_cleanup.py @@ -108,7 +108,7 @@ def readable_config(self): keep_last_filters=[column("external_trigger") == false()], keep_last_group_by=["dag_id"], ), - _TableConfig(table_name="dataset_event", recency_column_name="timestamp"), + _TableConfig(table_name="asset_event", recency_column_name="timestamp"), _TableConfig(table_name="import_error", recency_column_name="timestamp"), _TableConfig(table_name="log", recency_column_name="dttm"), _TableConfig(table_name="sla_miss", recency_column_name="timestamp"), diff --git a/airflow/utils/types.py b/airflow/utils/types.py index 80ee1d644d4d2..e012d9f1963cb 100644 --- a/airflow/utils/types.py +++ b/airflow/utils/types.py @@ -86,7 +86,7 @@ class DagRunType(str, enum.Enum): BACKFILL_JOB = "backfill" SCHEDULED = "scheduled" MANUAL = "manual" - DATASET_TRIGGERED = "dataset_triggered" + ASSET_TRIGGERED = "asset_triggered" def __str__(self) -> str: return self.value @@ -118,5 +118,5 @@ class DagRunTriggeredByType(enum.Enum): UI = "ui" # for clicking the `Trigger DAG` button TEST = "test" # for dag.test() TIMETABLE = "timetable" # for timetable based triggering - DATASET = "dataset" # for dataset_triggered run type + DATASET = "dataset" # for asset_triggered run type BACKFILL = "backfill" diff --git a/airflow/www/jest-setup.js b/airflow/www/jest-setup.js index ecf79db5cb19f..0c5ebf5c67586 100644 --- a/airflow/www/jest-setup.js +++ b/airflow/www/jest-setup.js @@ -61,7 +61,7 @@ global.defaultDagRunDisplayNumber = 245; global.filtersOptions = { // Must stay in sync with airflow/www/static/js/types/index.ts dagStates: ["success", "running", "queued", "failed"], - runTypes: ["manual", "backfill", "scheduled", "dataset_triggered"], + runTypes: 
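With DagRunType.DATASET_TRIGGERED renamed to ASSET_TRIGGERED, both the enum value and the run_id prefix produced for such runs change; a minimal sketch using the enum from the utils/types.py hunk above (the run_id shape follows the updated RUN_ID_REGEX in dagrun.py):

# minimal sketch of the renamed run type
from airflow.utils.types import DagRunType

run_type = DagRunType.ASSET_TRIGGERED
assert run_type.value == "asset_triggered"          # was "dataset_triggered"
assert DagRunType("asset_triggered") is run_type    # lookup by value still works
# run_ids for these runs now start with "asset_triggered__<logical date>"

The renamed asset_event table is likewise what database cleanup now targets, per the _TableConfig change in db_cleanup.py above.
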
["manual", "backfill", "scheduled", "asset_triggered"], }; global.moment = moment; diff --git a/airflow/www/static/js/cluster-activity/index.test.tsx b/airflow/www/static/js/cluster-activity/index.test.tsx index 3381a5be87325..4a162e12bdf7e 100644 --- a/airflow/www/static/js/cluster-activity/index.test.tsx +++ b/airflow/www/static/js/cluster-activity/index.test.tsx @@ -37,7 +37,7 @@ const mockHistoricalMetricsData = { dag_run_states: { failed: 0, queued: 0, running: 0, success: 306 }, dag_run_types: { backfill: 0, - dataset_triggered: 0, + asset_triggered: 0, manual: 14, scheduled: 292, }, diff --git a/airflow/www/static/js/components/DatasetEventCard.tsx b/airflow/www/static/js/components/DatasetEventCard.tsx index 9dd1ee91e3731..d3931bf3c1de9 100644 --- a/airflow/www/static/js/components/DatasetEventCard.tsx +++ b/airflow/www/static/js/components/DatasetEventCard.tsx @@ -69,7 +69,7 @@ const DatasetEventCard = ({ - {assetEvent.datasetUri && assetEvent.datasetUri !== selectedUri ? ( + {assetEvent.assetUri && assetEvent.assetUri !== selectedUri ? ( - {assetEvent.datasetUri} + {assetEvent.assetUri} ) : ( - {assetEvent.datasetUri} + {assetEvent.assetUri} )} diff --git a/airflow/www/static/js/components/RunTypeIcon.tsx b/airflow/www/static/js/components/RunTypeIcon.tsx index 19e59babd3157..5d5be01663385 100644 --- a/airflow/www/static/js/components/RunTypeIcon.tsx +++ b/airflow/www/static/js/components/RunTypeIcon.tsx @@ -42,7 +42,7 @@ const DagRunTypeIcon = ({ runType, ...rest }: Props) => { return ; case "scheduled": return ; - case "dataset_triggered": + case "asset_triggered": return ; default: return null; diff --git a/airflow/www/static/js/dag/details/Header.tsx b/airflow/www/static/js/dag/details/Header.tsx index 172443ca47b36..f881a48213b9d 100644 --- a/airflow/www/static/js/dag/details/Header.tsx +++ b/airflow/www/static/js/dag/details/Header.tsx @@ -66,7 +66,7 @@ const Header = ({ mapIndex }: Props) => { runId.includes("manual__") || runId.includes("scheduled__") || runId.includes("backfill__") || - runId.includes("dataset_triggered__") ? ( + runId.includes("asset_triggered__") ? (