Add DatasetAlias to support dynamic Dataset Event Emission and Dataset Creation #40478
Finally got the CI green. Will continue working on unit tests and docs.
Added test cases to cover the changes. Will work on the documentation next.
…to_uri and rename it as extract_event_key
… and forbid passing list of data alias
… to be added in a single task
… only one dataset event
Co-authored-by: Tzu-ping Chung <[email protected]>
…t Creation (apache#40478)
* feat(dataset_alias)
* add DatasetAlias class
* support yield dataset alias through datasets.Metadata
* allow only one dataset event to be triggered for the same dataset with the same extra in a single task
* dynamically adding dataset through dataset_alias
* feat(datasets): add optional alias argument to dataset metadata
* feat(dag): add dataset aliases defined to db during dag parsing
* feat(datasets): register dataset change through dataset alias in outlet event
- dataset_outlets = [x for x in task.outlets or [] if isinstance(x, Dataset)]
+ dataset_outlets: list[Dataset] = []
+ dataset_alias_outlets: list[DatasetAlias] = []
+ for outlet in task.outlets:
Hi @Lee-W, we just started testing our upgrade from 2.9.1 to 2.10.4 and spent a while debugging a `'NoneType' object is not iterable` error that showed up when serializing our DAGs. We had a function that created tasks and defaulted the `outlets` kwarg to `None` instead of `[]`:
def make_task(task_name, outlets=None):
Changing the default kwarg value to an empty list fixed it, but I wanted to mention that it might be helpful to fall back to an empty list, as the previous version did:
for outlet in task.outlets or []:
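The failure and the suggested guard can be reproduced without Airflow. The following is only a minimal sketch: `Dataset`, `DatasetAlias`, `FakeTask`, `split_outlets` and `iterate_unguarded` are hypothetical stand-ins, not Airflow's own classes or functions, and only the `task.outlets or []` expression is taken from the comment above.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Optional


@dataclass(frozen=True)
class Dataset:  # hypothetical stand-in, not airflow.datasets.Dataset
    uri: str


@dataclass(frozen=True)
class DatasetAlias:  # hypothetical stand-in, not airflow.datasets.DatasetAlias
    name: str


@dataclass
class FakeTask:  # hypothetical stand-in for a task whose outlets ended up as None
    outlets: Optional[list[Any]] = None


def split_outlets(task: FakeTask) -> tuple[list[Dataset], list[DatasetAlias]]:
    """Partition outlets by type, treating a missing list as empty."""
    dataset_outlets: list[Dataset] = []
    dataset_alias_outlets: list[DatasetAlias] = []
    # `or []` keeps the loop safe when outlets is None.
    for outlet in task.outlets or []:
        if isinstance(outlet, Dataset):
            dataset_outlets.append(outlet)
        elif isinstance(outlet, DatasetAlias):
            dataset_alias_outlets.append(outlet)
    return dataset_outlets, dataset_alias_outlets


def iterate_unguarded(task: FakeTask) -> str:
    """Iterate without the guard and return the error message, if any."""
    try:
        for _ in task.outlets:  # raises TypeError when outlets is None
            pass
    except TypeError as exc:
        return str(exc)
    return "ok"
```

With `FakeTask()` (outlets left as `None`), `iterate_unguarded` reports the same `TypeError` message quoted in the comment, while `split_outlets` returns two empty lists.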
I think Airflow is not expecting `outlets` to be used this way.
airflow/airflow/models/baseoperator.py
Lines 1103 to 1121 in c083e45
self.outlets: list = []
if inlets:
    self.inlets = (
        inlets
        if isinstance(inlets, list)
        else [
            inlets,
        ]
    )
if outlets:
    self.outlets = (
        outlets
        if isinstance(outlets, list)
        else [
            outlets,
        ]
    )
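Read in isolation, the quoted constructor logic normalizes whatever is passed in: a falsy value such as `None` leaves the default empty list, a single object is wrapped in a list, and a list is kept as-is. A minimal sketch of that behavior, using a hypothetical `normalize_outlets` helper rather than Airflow's own code:

```python
from typing import Any


def normalize_outlets(outlets: Any = None) -> list:
    """Mirror the quoted branch: default to [], wrap a single value, keep a list."""
    result: list = []  # default used when outlets is None or otherwise falsy
    if outlets:
        result = outlets if isinstance(outlets, list) else [outlets]
    return result
```

Under this logic, passing `outlets=None` to the constructor still yields an empty list, so a `None` value on `task.outlets` would presumably have to come from somewhere else, such as the attribute being assigned after construction.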
@uranusjr WDYT?
Why this change?
Related: #40039, which was inspired by #34206.
We want to allow `DatasetEvent` and `Dataset` to be created dynamically in a task.
What's changed?
Introduced `DatasetAlias`, which supports creating a `Dataset` and a `DatasetEvent` dynamically.
Note: This PR only supports part of #40039. It does not yet support scheduling based on `DatasetAlias` (I will create another PR for that).