Add Pydantic-powered ORM models serialization for internal API. #29776

Merged

Conversation

potiuk (Member) commented Feb 26, 2023

Add basic serialization capabilities for the ORM SQLAlchemy models that we use on the client side of the Internal API. Serializing the whole ORM models is rather complex, so it is much more reasonable to convert the ORM models into a serializable form and use that, rather than the SQLAlchemy models themselves.

There are just a handful of models that we need to serialize, and it is important to maintain typing of the fields in the objects for MyPy verification, so we can allow some level of duplication and redefine the models as pure Python objects.

We only need one-way conversion (from database models to Python models), because all DB operations and modifications of the database entries will be done in the internal API server; the server side of any method will be able to use the primary key stored in the serializable object to retrieve the actual DB model to update.

We also need the serialization to work both ways - an easy way to convert such Python classes to JSON and back, including validation.

We could serialize those models manually, but that would be quite an overhead to develop and maintain - therefore we are harnessing the power of Pydantic, which already has ORM mapping to plain Python (Pydantic) classes built in.

This PR implements the definitions of the Pydantic classes and tests covering:

  • conversion of the ORM models to Pydantic objects
  • serialization of the Pydantic classes to JSON
  • deserialization of the JSON-serialized classes to Pydantic objects
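
To illustrate the approach, here is a minimal sketch of one such class (Pydantic v1 style, as used here; the field names are a small illustrative subset, not the real model definition):

```python
from datetime import datetime
from typing import Optional

from pydantic import BaseModel


class TaskInstancePydantic(BaseModel):
    """Serializable counterpart of the TaskInstance ORM model (illustrative subset)."""

    task_id: str
    dag_id: str
    run_id: str
    start_date: Optional[datetime]

    class Config:
        orm_mode = True  # enables TaskInstancePydantic.from_orm(orm_instance)


# The round trip exercised by the tests:
# ti = TaskInstancePydantic.from_orm(ti_orm_instance)     # ORM -> Pydantic
# json_string = ti.json()                                 # Pydantic -> JSON
# restored = TaskInstancePydantic.parse_raw(json_string)  # JSON -> Pydantic (validated)
```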

@boring-cyborg boring-cyborg bot added the area:Scheduler including HA (high availability) scheduler label Feb 26, 2023
@potiuk potiuk requested review from mhenc and uranusjr February 26, 2023 14:51
potiuk (Member Author) commented Feb 26, 2023

cc: @vincbeck

@potiuk potiuk added the AIP-44 Airflow Internal API label Feb 26, 2023
@potiuk potiuk force-pushed the add-basic-model-serialization-for-aip-44 branch 2 times, most recently from 486c762 to 2af3dfa Compare February 26, 2023 16:42
potiuk (Member Author) commented Feb 26, 2023

While working on the next step of the POC, I also added a task_instance field to BaseJobPydantic to show how nicely (and automatically) Pydantic takes care of relations - when you use `BaseJobPydantic.from_orm(local_task_job)`, it will automatically retrieve and convert the linked TaskInstance object to Pydantic as well, and when we serialize/deserialize the Pydantic object, all the information in the TaskInstance related to the LocalTaskJob is kept in the serialized form.

This is exactly what Pydantic's `from_orm` magic does.
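
A hedged sketch of what that looks like (the field names are illustrative, not the actual definitions from the PR):

```python
from typing import Optional

from pydantic import BaseModel


class TaskInstancePydantic(BaseModel):
    task_id: str
    dag_id: str

    class Config:
        orm_mode = True


class BaseJobPydantic(BaseModel):
    id: Optional[int]
    state: Optional[str]
    # Related ORM object: from_orm converts it to TaskInstancePydantic automatically.
    task_instance: Optional[TaskInstancePydantic]

    class Config:
        orm_mode = True


# job = BaseJobPydantic.from_orm(local_task_job)  # nested TaskInstance comes along
# job.json()                                      # ...and survives serialization
```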

@potiuk potiuk force-pushed the add-basic-model-serialization-for-aip-44 branch 2 times, most recently from ae98c57 to f134e19 Compare February 26, 2023 19:23
potiuk (Member Author) commented Feb 26, 2023

And now I have also added all the "references" in DatasetModel, with back-references, to show that Pydantic handles those well too.
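
A sketch of the shape this takes (names modeled on the dataset tables, but illustrative; the key assumption is that the reference class omits the field pointing back at the dataset, so from_orm does not recurse forever):

```python
from typing import List

from pydantic import BaseModel


class DagScheduleDatasetReferencePydantic(BaseModel):
    dag_id: str
    dataset_id: int
    # Deliberately no `dataset` back-reference here - that breaks the cycle.

    class Config:
        orm_mode = True


class DatasetPydantic(BaseModel):
    id: int
    uri: str
    consuming_dags: List[DagScheduleDatasetReferencePydantic]

    class Config:
        orm_mode = True
```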

@potiuk potiuk force-pushed the add-basic-model-serialization-for-aip-44 branch 2 times, most recently from 8e99ad3 to a1b42bf Compare February 26, 2023 19:40
pierrejeambrun (Member) left a comment

I like this, it keeps it simple and easy to maintain.

To avoid having to redeclare all fields in BaseModelPydantic, we can leverage auto-schema generation from the SQLAlchemy model with https://github.com/tiangolo/pydantic-sqlalchemy.

I don't have experience with this particular one, but the equivalent for marshmallow is really nice and flexible:
https://marshmallow-sqlalchemy.readthedocs.io/en/latest/
(We are already using the latter in the api_connexion schemas.)
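
For comparison, a minimal sketch of that auto-generation route (assuming pydantic-sqlalchemy's sqlalchemy_to_pydantic helper; untested against our models):

```python
# Illustrative only - sqlalchemy_to_pydantic generates a Pydantic model
# from a SQLAlchemy model class, instead of redeclaring every column by hand.
from pydantic_sqlalchemy import sqlalchemy_to_pydantic

from airflow.models.taskinstance import TaskInstance

TaskInstancePydantic = sqlalchemy_to_pydantic(TaskInstance)
```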

If I understand correctly, we should rewrite all the code that needs DB isolation to use this extra layer of objects and stop using SQLAlchemy model instances? (Makes sense if they don't have access to the DB anymore; using detached instances everywhere is not sustainable.)

```python
pydantic_task_instance = TaskInstancePydantic.from_orm(ti)

json_string = pydantic_task_instance.json()
print(json_string)
```
pierrejeambrun (Member) commented Feb 26, 2023

There are a few print statements in tests that we can remove.

potiuk (Member Author) commented Feb 27, 2023

Yeah, I looked at pydantic-sqlalchemy, and for now I decided not to use it. I will try it and see what the quality of the generated code is vs. the manually written one.

But I can see if it can be integrated to generate the Pydantic classes automatically - that would save some effort (though not a lot, to be honest). There are a few risks:

  • The 'still experimental' status of the project.

  • We likely do not want to convert all of the models and fields automatically - we will want to skip some fields or treat them a bit differently, and decide whether or not to add validations. We might also want to exclude certain models from serialization if the serialized form would drag in too much useless data. There are some back-references in our models, and they might cause recursive serialization and attempts to serialize more than we need.

  • The models we want to serialize change extremely rarely, so the benefit of automating the conversion is very small vs. the cost of fixing potential 'experimental' issues.

But I will try :)

The print statements are there for now to show the serialized form, so that you can see it manually when running - yes, we can remove them eventually; the asserts are good enough to show that serialization/deserialization works.

Re: how to use it - yes, in all internal_api calls where we need to pass the models, we should make sure that the DB model gets created or retrieved on the 'other' side of the call and returned in serialized/Pydantic form.

And we can even optimise it in the cases where we see an actual performance need. We can also decide to return the DB model instead (in regular DB mode), because they are technically equivalent and using them is the same, as long as the model would be detached anyway and is read-only (or until we implement sending modifications back). So if you look at context.pyi - we can have either the DB model passed to the Context, or the Pydantic equivalent in case DB access is disabled.

That is not 100% compatible, but for reading it should be (minus the fields we decide not to serialize), and we already know that the internal API case will be slightly less performant and not everything the users could do before will be possible - that's the whole point of the internal API, to limit them.

I have, for example, a WIP/POC where I slightly modify the LocalTaskJob interface and use this technique to send BaseJobPydantic back from the server after creating the LocalTaskJob instance (LocalTaskJob has no DB fields other than BaseJob's) and use it. Following the Pydantic classes, I think I will be able to do it with zero impact on the 'regular' case (LocalTaskJob will be used everywhere) - only in the 'internal API' case will we serialize/deserialize the created object. So performance and behaviour for the regular case will not change.

In those cases the return type of the methods will be LocalTaskJob | BaseJobPydantic, and we will have MyPy to make sure that we have all the fields in the Pydantic model. So, for example, when we try to use a new field that is not defined in the Pydantic version, MyPy will complain.
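
A small sketch of that MyPy guarantee (stand-in classes, not the real definitions):

```python
from typing import Union


class LocalTaskJob:  # stand-in for the ORM-backed job
    id: int
    state: str
    hostname: str  # present only on the DB model in this sketch


class BaseJobPydantic:  # stand-in for the Pydantic counterpart
    id: int
    state: str


def report(job: Union[LocalTaskJob, BaseJobPydantic]) -> str:
    # OK: both types define `id` and `state`.
    ok = f"{job.id}: {job.state}"
    # MyPy would reject `job.hostname` here, because BaseJobPydantic lacks it.
    return ok
```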

A contributor commented

Just wondering why the choice is Pydantic and not attrs or dataclasses, which we already use in Airflow and which are supported in SQLAlchemy.

potiuk (Member Author) commented Feb 27, 2023

I did try to make them (both) work. But either I am too stupid or the documentation is completely misleading - I simply don't know how to make sure classes and their related entities (if they have them) can be serialized in a single go with them. But if someone would like to make a parallel POC with them, I would love to see it. I think with this PR it's rather straightforward what we want to achieve and which entities should be made serializable.

If we can make them work without Pydantic - I am all ears (note that Pydantic just works by defining the entities and marking them to work in orm_mode). I wanted to avoid writing any code on our side to do the serialization, so if we can make it even simpler than that, I am all ears actually :)

The contributor replied

I did try to make them (both) work.

That's basically what I wanted to know. 🤣 I do not have any concerns about Pydantic, especially if it is more straightforward to configure than the other options.

vincbeck (Contributor) commented Feb 27, 2023

We only need one-way conversion (from database models to Python models), because all DB operations and modifications of the database entries will be done in the internal API server; the server side of any method will be able to use the primary key stored in the serializable object to retrieve the actual DB model to update.

Quick question: if we take this approach, why do we need to send the whole serialized object and not just the primary keys? Is it to avoid refactoring too many functions and to have a backward-compatible solution? I am not saying it is wrong, I am just curious :)

potiuk (Member Author) commented Feb 27, 2023

Quick question: if we take this approach, why do we need to send the whole serialized object and not just the primary keys? Is it to avoid refactoring too many functions and to have a backward-compatible solution? I am not saying it is wrong, I am just curious :)

Because much of our code that runs on the "client" side accesses those objects' fields, and even the fields of related instances. For example, when a local task job is run, it accesses a number of local task job fields. A good example is our "context" that each task's "execute" method gets: https://github.com/apache/airflow/blob/main/airflow/utils/context.pyi -> if you look there, the task_instance field is actually a TaskInstance object, and you would like all TaskInstance fields to be available in the operator's execute method.

vincbeck (Contributor) commented

I see, thanks!

@potiuk potiuk force-pushed the add-basic-model-serialization-for-aip-44 branch from a1b42bf to 958c9b9 Compare March 4, 2023 16:58
potiuk (Member Author) commented Mar 4, 2023

Any more comments, @mhenc? Can we merge this one and proceed with some refactors of existing methods?

mhenc (Contributor) commented Mar 6, 2023

Thank you! This looks really cool.

Just to confirm I understand it correctly:
After this PR is merged, we need to change methods using a TaskInstance parameter to use TaskInstance | TaskInstancePydantic,
e.g. in _run_raw_task:

```python
def _run_raw_task(args, ti: TaskInstance) -> None | TaskReturnCode:
```

I think ti._run_raw_task also needs to be updated (to a static method?) to work with both TaskInstance and TaskInstancePydantic.

potiuk (Member Author) commented Mar 7, 2023

Thank you! This looks really cool.

Just to confirm I understand it correctly: After this PR is merged, we need to change methods using a TaskInstance parameter to use TaskInstance | TaskInstancePydantic, e.g. in _run_raw_task:

```python
def _run_raw_task(args, ti: TaskInstance) -> None | TaskReturnCode:
```

I think ti._run_raw_task also needs to be updated (to a static method?) to work with both TaskInstance and TaskInstancePydantic.

Yeah, roughly speaking. This might be a bit more involved in a few places (for example, we will have to split some methods in two, etc.), but that's roughly it. I have an (unfinished) example of a local task job refactor that uses that approach: f11f5af

@potiuk potiuk force-pushed the add-basic-model-serialization-for-aip-44 branch from 958c9b9 to 0463b3f Compare March 7, 2023 21:38
potiuk (Member Author) commented Mar 7, 2023

If there are no more comments, I would love to merge this one (I need a committer approval), and then I could follow up with the example - and then a similar approach could be used by the other AIP-44 related PRs where we need to serialize DB models for the client API.

@potiuk potiuk merged commit f4ec389 into apache:main Mar 8, 2023
@potiuk potiuk deleted the add-basic-model-serialization-for-aip-44 branch March 8, 2023 00:27
potiuk (Member Author) commented Mar 8, 2023

Merged. I will try to complete my LocalTaskJob refactor with it - but feel free to use it to implement the other (simpler) parts of AIP-44 (happy to help with reviews).

@pierrejeambrun pierrejeambrun added this to the Airflow 2.6.0 milestone Mar 22, 2023
@pierrejeambrun pierrejeambrun added the changelog:skip Changes that should be skipped from the changelog (CI, tests, etc..) label Mar 22, 2023