-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: redesign SPARQLModelAdapter class
The redesign introduces major class API changes: 1. Initialization now takes the endpoint, query and model class directly, this simplifies class usage and allows for better state retention in the instance. 2. functionality previously defined in SPARQLModelAdapter.__call__ is transposed to SPARQLModelAdapter.query. Closes: #38.
- Loading branch information
Showing
7 changed files
with
409 additions
and
88 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,87 +1,203 @@ | ||
"""SPARQLModelAdapter class for QueryResult to Pydantic model conversions.""" | ||
"""SPARQLModelAdapter class for SPARQL query result set to Pydantic model conversions.""" | ||
|
||
from collections.abc import Iterable | ||
from typing import cast | ||
from collections import defaultdict | ||
from collections.abc import Iterator | ||
import math | ||
from typing import Any, Generic, overload | ||
|
||
from typeguard import typechecked | ||
|
||
from SPARQLWrapper import JSON, QueryResult, SPARQLWrapper | ||
from pydantic import BaseModel | ||
from rdfproxy.utils._types import _TModelConstructorCallable, _TModelInstance | ||
from rdfproxy.utils._exceptions import ( | ||
InterdependentParametersException, | ||
UndefinedBindingException, | ||
) | ||
from rdfproxy.utils._types import _TModelInstance | ||
from rdfproxy.utils.models import Page | ||
from rdfproxy.utils.sparql.sparql_templates import ungrouped_pagination_base_query | ||
from rdfproxy.utils.sparql.sparql_utils import ( | ||
calculate_offset, | ||
construct_count_query, | ||
construct_grouped_count_query, | ||
construct_grouped_pagination_query, | ||
query_with_wrapper, | ||
temporary_query_override, | ||
) | ||
from rdfproxy.utils.utils import ( | ||
get_bindings_from_query_result, | ||
instantiate_model_from_kwargs, | ||
) | ||
|
||
|
||
class SPARQLModelAdapter: | ||
"""Adapter/Mapper for QueryResult to Pydantic model conversions. | ||
The rdfproxy.SPARQLModelAdapter class allows to run a query against an endpoint | ||
and map a flat SPARQL query result set to a potentially nested Pydantic model. | ||
Example: | ||
from SPARQLWrapper import SPARQLWrapper | ||
from pydantic import BaseModel | ||
from rdfproxy import SPARQLModelAdapter, _TModelInstance | ||
class SimpleModel(BaseModel): | ||
x: int | ||
y: int | ||
class NestedModel(BaseModel): | ||
a: str | ||
b: SimpleModel | ||
class ComplexModel(BaseModel): | ||
p: str | ||
q: NestedModel | ||
@typechecked | ||
class SPARQLModelAdapter(Generic[_TModelInstance]): | ||
"""Adapter/Mapper for SPARQL query result set to Pydantic model conversions. | ||
sparql_wrapper = SPARQLWrapper("https://query.wikidata.org/bigdata/namespace/wdq/sparql") | ||
query = ''' | ||
select ?x ?y ?a ?p | ||
where { | ||
values (?x ?y ?a ?p) { | ||
(1 2 "a value" "p value") | ||
} | ||
} | ||
''' | ||
adapter = SPARQLModelAdapter(sparql_wrapper=sparql_wrapper) | ||
models: list[_TModelInstance] = adapter(query=query, model_constructor=ComplexModel) | ||
The rdfproxy.SPARQLModelAdapter class allows to run a query against an endpoint, | ||
map a flat SPARQL query result set to a potentially nested Pydantic model and | ||
optionally paginate and/or group the results by a SPARQL binding. | ||
""" | ||
|
||
def __init__(self, sparql_wrapper: SPARQLWrapper) -> None: | ||
self.sparql_wrapper = sparql_wrapper | ||
|
||
if self.sparql_wrapper.returnFormat != "json": | ||
self.sparql_wrapper.setReturnFormat(JSON) | ||
def __init__( | ||
self, target: str | SPARQLWrapper, query: str, model: type[_TModelInstance] | ||
) -> None: | ||
self._query = query | ||
self._model = model | ||
|
||
def __call__( | ||
self, | ||
query: str, | ||
model_constructor: type[_TModelInstance] | _TModelConstructorCallable, | ||
) -> Iterable[_TModelInstance]: | ||
self.sparql_wrapper: SPARQLWrapper = ( | ||
SPARQLWrapper(target) if isinstance(target, str) else target | ||
) | ||
self.sparql_wrapper.setReturnFormat(JSON) | ||
self.sparql_wrapper.setQuery(query) | ||
query_result: QueryResult = self.sparql_wrapper.query() | ||
|
||
if isinstance(model_constructor, type(BaseModel)): | ||
model_constructor = cast(type[_TModelInstance], model_constructor) | ||
@overload | ||
def query(self) -> list[_TModelInstance]: ... | ||
|
||
@overload | ||
def query( | ||
self, | ||
*, | ||
group_by: str, | ||
) -> dict[str, list[_TModelInstance]]: ... | ||
|
||
bindings = get_bindings_from_query_result(query_result) | ||
models: list[_TModelInstance] = [ | ||
instantiate_model_from_kwargs(model_constructor, **binding) | ||
for binding in bindings | ||
] | ||
@overload | ||
def query( | ||
self, | ||
*, | ||
page: int, | ||
size: int, | ||
) -> Page[_TModelInstance]: ... | ||
|
||
elif isinstance(model_constructor, _TModelConstructorCallable): | ||
models: Iterable[_TModelInstance] = model_constructor(query_result) | ||
@overload | ||
def query( | ||
self, | ||
*, | ||
page: int, | ||
size: int, | ||
group_by: str, | ||
) -> Page[_TModelInstance]: ... | ||
|
||
def query( | ||
self, | ||
*, | ||
page: int | None = None, | ||
size: int | None = None, | ||
group_by: str | None = None, | ||
) -> ( | ||
list[_TModelInstance] | dict[str, list[_TModelInstance]] | Page[_TModelInstance] | ||
): | ||
"""Run query against endpoint and map the SPARQL query result set to a Pydantic model. | ||
Optional pagination and/or grouping by a SPARQL binding is avaible by | ||
supplying the group_by and/or page/size parameters. | ||
""" | ||
match page, size, group_by: | ||
case None, None, None: | ||
return self._query_collect_models() | ||
case int(), int(), None: | ||
return self._query_paginate_ungrouped(page=page, size=size) | ||
case None, None, str(): | ||
return self._query_group_by(group_by=group_by) | ||
case int(), int(), str(): | ||
return self._query_paginate_grouped( | ||
page=page, size=size, group_by=group_by | ||
) | ||
case (None, int(), Any()) | (int(), None, Any()): | ||
raise InterdependentParametersException( | ||
"Parameters 'page' and 'size' are mutually dependent." | ||
) | ||
case _: | ||
raise Exception("This should never happen.") | ||
|
||
def _query_generate_model_bindings_mapping( | ||
self, query: str | None = None | ||
) -> Iterator[tuple[_TModelInstance, dict[str, Any]]]: | ||
"""Run query, construct model instances and generate a model-bindings mapping. | ||
The query parameter defaults to the initially defined query and | ||
is run against the endpoint defined in the SPARQLModelAdapter instance. | ||
Note: The coupling of model instances with flat SPARQL results | ||
allows for easier and more efficient grouping operations (see grouping functionality). | ||
""" | ||
if query is None: | ||
query_result: QueryResult = self.sparql_wrapper.query() | ||
else: | ||
raise TypeError( | ||
"Argument 'model_constructor' must be a model class " | ||
"or a model constructor callable." | ||
) | ||
|
||
return models | ||
with temporary_query_override(self.sparql_wrapper): | ||
self.sparql_wrapper.setQuery(query) | ||
query_result: QueryResult = self.sparql_wrapper.query() | ||
|
||
_bindings = get_bindings_from_query_result(query_result) | ||
|
||
for bindings in _bindings: | ||
model = instantiate_model_from_kwargs(self._model, **bindings) | ||
yield model, bindings | ||
|
||
def _query_collect_models(self, query: str | None = None) -> list[_TModelInstance]: | ||
"""Run query against endpoint and collect model instances.""" | ||
return [ | ||
model | ||
for model, _ in self._query_generate_model_bindings_mapping(query=query) | ||
] | ||
|
||
def _query_group_by( | ||
self, group_by: str, query: str | None = None | ||
) -> dict[str, list[_TModelInstance]]: | ||
"""Run query against endpoint and group results by a SPARQL binding.""" | ||
group = defaultdict(list) | ||
|
||
for model, bindings in self._query_generate_model_bindings_mapping(query): | ||
try: | ||
key = bindings[group_by] | ||
except KeyError: | ||
raise UndefinedBindingException( | ||
f"SPARQL binding '{group_by}' requested for grouping " | ||
f"not in query projection '{bindings}'." | ||
) | ||
|
||
group[str(key)].append(model) | ||
|
||
return group | ||
|
||
def _get_count(self, query: str) -> int:
    """Run the given count query against the endpoint and return the count as int.

    The count is read from the 'cnt' binding of the first result row.
    """
    rows = query_with_wrapper(query=query, sparql_wrapper=self.sparql_wrapper)
    first_row = next(rows)
    return int(first_row["cnt"])
|
||
def _query_paginate_ungrouped(self, page: int, size: int) -> Page[_TModelInstance]:
    """Run the initialized query as a single page and return a Page model.

    The initialized query is wrapped in the ungrouped pagination template with
    size as limit and an offset calculated from page and size; the models for
    that page come from SPARQLModelAdapter._query_collect_models. A count query
    constructed from the initialized query supplies the total, from which the
    number of pages is derived.
    """
    offset = calculate_offset(page, size)
    page_query = ungrouped_pagination_base_query.substitute(
        query=self._query, limit=size, offset=offset
    )
    count_query = construct_count_query(self._query)

    # Fetch the page first, then the total count.
    models = self._query_collect_models(query=page_query)
    total_count = self._get_count(count_query)

    return Page(
        items=models,
        page=page,
        size=size,
        total=total_count,
        pages=math.ceil(total_count / size),
    )
|
||
def _query_paginate_grouped(
    self, page: int, size: int, group_by: str
) -> Page[_TModelInstance]:
    """Run the initialized query as a single grouped page and return a Page model.

    A grouped pagination query and a grouped count query are constructed from
    the initialized query; the page query is run with
    SPARQLModelAdapter._query_group_by, so the Page items are model instance
    lists keyed by the group_by binding. The count result supplies the total,
    from which the number of pages is derived.
    """
    page_query = construct_grouped_pagination_query(
        query=self._query, page=page, size=size, group_by=group_by
    )
    count_query = construct_grouped_count_query(query=self._query, group_by=group_by)

    # Fetch the grouped page first, then the total count.
    grouped_models = self._query_group_by(group_by=group_by, query=page_query)
    total_count = self._get_count(count_query)

    return Page(
        items=grouped_models,
        page=page,
        size=size,
        total=total_count,
        pages=math.ceil(total_count / size),
    )
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
"""Custom exceptions for RDFProxy.""" | ||
|
||
|
||
class UndefinedBindingException(KeyError): | ||
"""Exception for indicating that a requested key could not be retrieved from a SPARQL binding mapping.""" | ||
|
||
|
||
class InterdependentParametersException(Exception): | ||
"""Exception for indicating that two or more parameters are interdependent.""" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
"""Pydantic Model definitions for rdfproxy.""" | ||
|
||
from typing import Generic | ||
|
||
from pydantic import BaseModel | ||
from rdfproxy.utils._types import _TModelInstance | ||
|
||
|
||
class Page(BaseModel, Generic[_TModelInstance]): | ||
"""Page model for rdfproxy pagination functionality. | ||
This model is loosely inspired by the fastapi-pagination Page class, | ||
see https://github.com/uriyyo/fastapi-pagination. | ||
Also see https://docs.pydantic.dev/latest/concepts/models/#generic-models | ||
for Generic Pydantic models. | ||
""" | ||
|
||
items: list[_TModelInstance] | dict[str, list[_TModelInstance]] | ||
page: int | ||
size: int | ||
total: int | ||
pages: int |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
"""SPARQL Query templates for RDFProxy paginations.""" | ||
|
||
from string import Template | ||
|
||
|
||
ungrouped_pagination_base_query = Template(""" | ||
$query | ||
limit $limit | ||
offset $offset | ||
""") |
Oops, something went wrong.