-
-
Notifications
You must be signed in to change notification settings - Fork 396
/
creator.py
504 lines (419 loc) · 18.5 KB
/
creator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
import inspect
from base64 import b32encode
from hashlib import sha3_224
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Type, cast
import pydantic
from pydantic.main import BaseConfig as PydanticBaseConfig
from tortoise import fields
from tortoise.contrib.pydantic.base import PydanticListModel, PydanticModel
from tortoise.contrib.pydantic.utils import get_annotations
if TYPE_CHECKING: # pragma: nocoverage
from tortoise.models import Model
_MODEL_INDEX: Dict[str, Type[PydanticModel]] = {}
class PydanticMeta:
"""
The ``PydanticMeta`` class is used to configure metadata for generating the pydantic Model.
Usage:
.. code-block:: python3
class Foo(Model):
...
class PydanticMeta:
exclude = ("foo", "baa")
computed = ("count_peanuts", )
"""
#: If not empty, only fields this property contains will be in the pydantic model
include: Tuple[str, ...] = ()
#: Fields listed in this property will be excluded from pydantic model
exclude: Tuple[str, ...] = ()
#: Computed fields can be listed here to use in pydantic model
computed: Tuple[str, ...] = ()
#: Use backward relations without annotations - not recommended, it can be huge data
#: without control
backward_relations: bool = True
#: Maximum recursion level allowed
max_recursion: int = 3
#: Allow cycles in recursion - This can result in HUGE data - Be careful!
#: Please use this with ``exclude``/``include`` and sane ``max_recursion``
allow_cycles: bool = False
#: If we should exclude raw fields (the ones have _id suffixes) of relations
exclude_raw_fields: bool = True
#: Sort fields alphabetically.
#: If not set (or ``False``) then leave fields in declaration order
sort_alphabetically: bool = False
#: Allows user to specify custom config class for generated model
config_class: Optional[Type[PydanticBaseConfig]] = None
def _br_it(val: str) -> str:
return val.replace("\n", "<br/>").strip()
def _cleandoc(obj: Any) -> str:
return _br_it(inspect.cleandoc(obj.__doc__ or ""))
def _pydantic_recursion_protector(
cls: "Type[Model]",
*,
stack: tuple,
exclude: Tuple[str, ...] = (),
include: Tuple[str, ...] = (),
computed: Tuple[str, ...] = (),
name=None,
allow_cycles: bool = False,
sort_alphabetically: Optional[bool] = None,
) -> Optional[Type[PydanticModel]]:
"""
It is an inner function to protect pydantic model creator against cyclic recursion
"""
if not allow_cycles and cls in (c[0] for c in stack[:-1]):
return None
caller_fname = stack[0][1]
prop_path = [caller_fname] # It stores the fields in the hierarchy
level = 1
for _, parent_fname, parent_max_recursion in stack[1:]:
# Check recursion level
prop_path.insert(0, parent_fname)
if level >= parent_max_recursion:
# This is too verbose, Do we even need a way of reporting truncated models?
# tortoise.logger.warning(
# "Recursion level %i has reached for model %s",
# level,
# parent_cls.__qualname__ + "." + ".".join(prop_path),
# )
return None
level += 1
return pydantic_model_creator(
cls,
exclude=exclude,
include=include,
computed=computed,
name=name,
_stack=stack,
allow_cycles=allow_cycles,
sort_alphabetically=sort_alphabetically,
)
def pydantic_model_creator(
cls: "Type[Model]",
*,
name=None,
exclude: Tuple[str, ...] = (),
include: Tuple[str, ...] = (),
computed: Tuple[str, ...] = (),
optional: Tuple[str, ...] = (),
allow_cycles: Optional[bool] = None,
sort_alphabetically: Optional[bool] = None,
_stack: tuple = (),
exclude_readonly: bool = False,
meta_override: Optional[Type] = None,
config_class: Optional[Type[PydanticBaseConfig]] = None,
) -> Type[PydanticModel]:
"""
Function to build `Pydantic Model <https://pydantic-docs.helpmanual.io/usage/models/>`__ off Tortoise Model.
:param cls: The Tortoise Model
:param name: Specify a custom name explicitly, instead of a generated name.
:param exclude: Extra fields to exclude from the provided model.
:param include: Extra fields to include from the provided model.
:param computed: Extra computed fields to include from the provided model.
:param optional: Extra optional fields for the provided model.
:param allow_cycles: Do we allow any cycles in the generated model?
This is only useful for recursive/self-referential models.
A value of ``False`` (the default) will prevent any and all backtracking.
:param sort_alphabetically: Sort the parameters alphabetically instead of Field-definition order.
The default order would be:
* Field definition order +
* order of reverse relations (as discovered) +
* order of computed functions (as provided).
:param exclude_readonly: Build a subset model that excludes any readonly fields
:param meta_override: A PydanticMeta class to override model's values.
:param config_class: A custom config class to use as pydantic config.
Note: Created pydantic model uses config_class parameter and PydanticMeta's
config_class as its Config class's bases(Only if provided!), but it
ignores ``fields`` config. pydantic_model_creator will generate fields by
include/exclude/computed parameters automatically.
"""
# Fully qualified class name
fqname = cls.__module__ + "." + cls.__qualname__
postfix = ""
def get_name() -> str:
# If arguments are specified (different from the defaults), we append a hash to the
# class name, to make it unique
# We don't check by stack, as cycles get explicitly renamed.
# When called later, include is explicitly set, so fence passes.
nonlocal postfix
is_default = (
exclude == ()
and include == ()
and computed == ()
and sort_alphabetically is None
and allow_cycles is None
)
hashval = (
f"{fqname};{exclude};{include};{computed};{_stack}:{sort_alphabetically}:{allow_cycles}"
)
postfix = (
"." + b32encode(sha3_224(hashval.encode("utf-8")).digest()).decode("utf-8").lower()[:6]
if not is_default
else ""
)
return fqname + postfix
# We need separate model class for different exclude, include and computed parameters
_name = name or get_name()
has_submodel = False
# Get settings and defaults
meta = getattr(cls, "PydanticMeta", PydanticMeta)
def get_param(attr: str) -> Any:
if meta_override:
return getattr(meta_override, attr, getattr(meta, attr, getattr(PydanticMeta, attr)))
return getattr(meta, attr, getattr(PydanticMeta, attr))
default_include: Tuple[str, ...] = tuple(get_param("include"))
default_exclude: Tuple[str, ...] = tuple(get_param("exclude"))
default_computed: Tuple[str, ...] = tuple(get_param("computed"))
default_config_class: Optional[Type[PydanticBaseConfig]] = get_param("config_class")
backward_relations: bool = bool(get_param("backward_relations"))
max_recursion: int = int(get_param("max_recursion"))
exclude_raw_fields: bool = bool(get_param("exclude_raw_fields"))
_sort_fields: bool = (
bool(get_param("sort_alphabetically"))
if sort_alphabetically is None
else sort_alphabetically
)
_allow_cycles: bool = bool(get_param("allow_cycles") if allow_cycles is None else allow_cycles)
# Update parameters with defaults
include = tuple(include) + default_include
exclude = tuple(exclude) + default_exclude
computed = tuple(computed) + default_computed
# Get all annotations
annotations = get_annotations(cls)
# Note: First ones override next ones' attributes
pconfig_bases: list[Type] = [PydanticModel.Config]
# If default config class is specified, we add it as first item of bases
if default_config_class:
pconfig_bases.insert(0, default_config_class)
# If config class is specified, we add it as first item of bases
if config_class:
pconfig_bases.insert(0, config_class)
# fields will be filled BY using include/exclude/computed
pconfig_attrs: Dict[Any, Any] = {"fields": {}}
# If at least one of default_config_class or config_class have title,
# we don't add title automatically.
if not hasattr(default_config_class, "title") and not hasattr(config_class, "title"):
pconfig_attrs["title"] = name or cls.__name__
# If at least one of default_config_class or config_class have extra,
# we don't add extra automatically.
if not hasattr(default_config_class, "extra") and not hasattr(config_class, "extra"):
pconfig_attrs["extra"] = pydantic.main.Extra.forbid
# Properties and their annotations` store
pconfig: Type[pydantic.main.BaseConfig] = type(
"Config",
tuple(pconfig_bases),
pconfig_attrs,
)
pannotations: Dict[str, Optional[Type]] = {}
properties: Dict[str, Any] = {"__annotations__": pannotations, "Config": pconfig}
# Get model description
model_description = cls.describe(serializable=False)
# Field map we use
field_map: Dict[str, dict] = {}
pk_raw_field: str = ""
def field_map_update(keys: tuple, is_relation=True) -> None:
nonlocal pk_raw_field
for key in keys:
fds = model_description[key]
if isinstance(fds, dict):
fds = [fds]
for fd in fds:
n = fd["name"]
if key == "pk_field":
pk_raw_field = n
# Include or exclude field
if (include and n not in include) or n in exclude:
continue
# Remove raw fields
raw_field = fd.get("raw_field", None)
if raw_field is not None and exclude_raw_fields and raw_field != pk_raw_field:
del field_map[raw_field]
field_map[n] = fd
# Update field definitions from description
if not exclude_readonly:
field_map_update(("pk_field",), is_relation=False)
field_map_update(("data_fields",), is_relation=False)
if not exclude_readonly:
included_fields: tuple = (
"fk_fields",
"o2o_fields",
"m2m_fields",
)
if backward_relations:
included_fields = (
*included_fields,
"backward_fk_fields",
"backward_o2o_fields",
)
field_map_update(included_fields)
# Add possible computed fields
field_map.update(
{
k: {"field_type": callable, "function": getattr(cls, k), "description": None}
for k in computed
}
)
# Sort field map (Python 3.7+ has guaranteed ordered dictionary keys)
if _sort_fields:
# Sort Alphabetically
field_map = {k: field_map[k] for k in sorted(field_map)}
else:
# Sort to definition order
field_map = {
k: field_map[k] for k in tuple(cls._meta.fields_map.keys()) + computed if k in field_map
}
# Process fields
for fname, fdesc in field_map.items():
comment = ""
fconfig: Dict[str, Any] = {}
field_type = fdesc["field_type"]
field_default = fdesc.get("default")
def get_submodel(_model: "Type[Model]") -> Optional[Type[PydanticModel]]:
"""Get Pydantic model for the submodel"""
nonlocal exclude, _name, has_submodel
if _model:
new_stack = _stack + ((cls, fname, max_recursion),)
# Get pydantic schema for the submodel
prefix_len = len(fname) + 1
pmodel = _pydantic_recursion_protector(
_model,
exclude=tuple(
str(v[prefix_len:]) for v in exclude if v.startswith(fname + ".")
),
include=tuple(
str(v[prefix_len:]) for v in include if v.startswith(fname + ".")
),
computed=tuple(
str(v[prefix_len:]) for v in computed if v.startswith(fname + ".")
),
stack=new_stack,
allow_cycles=_allow_cycles,
sort_alphabetically=sort_alphabetically,
)
else:
pmodel = None
# If the result is None it has been excluded and we need to exclude the field
if pmodel is None:
exclude += (fname,)
else:
has_submodel = True
# We need to rename if there are duplicate instances of this model
if cls in (c[0] for c in _stack):
_name = name or get_name()
return pmodel
# Foreign keys and OneToOne fields are embedded schemas
if (
field_type is fields.relational.ForeignKeyFieldInstance
or field_type is fields.relational.OneToOneFieldInstance
or field_type is fields.relational.BackwardOneToOneRelation
):
model = get_submodel(fdesc["python_type"])
if model:
if fdesc.get("nullable"):
fconfig["nullable"] = True
if fdesc.get("nullable") or field_default is not None:
model = Optional[model] # type: ignore
pannotations[fname] = model
# Backward FK and ManyToMany fields are list of embedded schemas
elif (
field_type is fields.relational.BackwardFKRelation
or field_type is fields.relational.ManyToManyFieldInstance
):
model = get_submodel(fdesc["python_type"])
if model:
pannotations[fname] = List[model] # type: ignore
# Computed fields as methods
elif field_type is callable:
func = fdesc["function"]
annotation = get_annotations(cls, func).get("return", None)
comment = _cleandoc(func)
if annotation is not None:
pannotations[fname] = annotation
# Json fields
elif field_type is fields.JSONField:
pannotations[fname] = Any # type: ignore
# Any other tortoise fields
else:
annotation = annotations.get(fname, None)
fconfig.update(fdesc["constraints"])
ptype = fdesc["python_type"]
if fdesc.get("nullable"):
fconfig["nullable"] = True
if fdesc.get("nullable") or field_default is not None or fname in optional:
ptype = Optional[ptype]
if not (exclude_readonly and fdesc["constraints"].get("readOnly") is True):
pannotations[fname] = annotation or ptype
# Create a schema for the field
if fname in pannotations:
# Use comment if we have and enabled or use the field description if specified
description = comment or _br_it(fdesc.get("docstring") or fdesc["description"] or "")
fconfig["description"] = description
fconfig["title"] = fname.replace("_", " ").title()
if field_default is not None and not callable(field_default):
properties[fname] = field_default
pconfig.fields[fname] = fconfig
# Here we endure that the name is unique, but complete objects are still labeled verbatim
if not has_submodel:
_name = name or f"{fqname}.leaf"
elif has_submodel:
_name = name or get_name()
# Here we de-dup to ensure that a uniquely named object is a unique object
# This fixes some Pydantic constraints.
if _name in _MODEL_INDEX:
return _MODEL_INDEX[_name]
# Creating Pydantic class for the properties generated before
model = cast(Type[PydanticModel], type(_name, (PydanticModel,), properties))
# Copy the Model docstring over
model.__doc__ = _cleandoc(cls)
# Store the base class
setattr(model.__config__, "orig_model", cls)
# Store model reference so we can de-dup it later on if needed.
_MODEL_INDEX[_name] = model
return model
def pydantic_queryset_creator(
cls: "Type[Model]",
*,
name=None,
exclude: Tuple[str, ...] = (),
include: Tuple[str, ...] = (),
computed: Tuple[str, ...] = (),
allow_cycles: Optional[bool] = None,
sort_alphabetically: Optional[bool] = None,
) -> Type[PydanticListModel]:
"""
Function to build a `Pydantic Model <https://pydantic-docs.helpmanual.io/usage/models/>`__ list off Tortoise Model.
:param cls: The Tortoise Model to put in a list.
:param name: Specify a custom name explicitly, instead of a generated name.
The list generated name is currently naive and merely adds a "s" to the end
of the singular name.
:param exclude: Extra fields to exclude from the provided model.
:param include: Extra fields to include from the provided model.
:param computed: Extra computed fields to include from the provided model.
:param allow_cycles: Do we allow any cycles in the generated model?
This is only useful for recursive/self-referential models.
A value of ``False`` (the default) will prevent any and all backtracking.
:param sort_alphabetically: Sort the parameters alphabetically instead of Field-definition order.
The default order would be:
* Field definition order +
* order of reverse relations (as discovered) +
* order of computed functions (as provided).
"""
submodel = pydantic_model_creator(
cls,
exclude=exclude,
include=include,
computed=computed,
allow_cycles=allow_cycles,
sort_alphabetically=sort_alphabetically,
name=name,
)
lname = name or f"{submodel.__name__}_list"
properties = {"__annotations__": {"__root__": List[submodel]}} # type: ignore
# Creating Pydantic class for the properties generated before
model = cast(Type[PydanticListModel], type(lname, (PydanticListModel,), properties))
# Copy the Model docstring over
model.__doc__ = _cleandoc(cls)
# The title of the model to hide the hash postfix
setattr(model.__config__, "title", name or f"{getattr(submodel.__config__,'title')}_list")
# Store the base class & submodel
setattr(model.__config__, "submodel", submodel)
return model