-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: redesign SPARQLModelAdapter class
The redesign introduces major class API changes: 1. Initialization now takes the endpoint, query and model class directly; this simplifies class usage and allows for better state retention in the instance. 2. Functionality previously defined in SPARQLModelAdapter.__call__ is moved to SPARQLModelAdapter.query. Closes: #38.
- Loading branch information
Showing
9 changed files
with
407 additions
and
101 deletions.
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,87 +1,200 @@ | ||
"""SPARQLModelAdapter class for QueryResult to Pydantic model conversions.""" | ||
"""SPARQLModelAdapter class for SPARQL query result set to Pydantic model conversions.""" | ||
|
||
from collections.abc import Iterable | ||
from typing import cast | ||
from collections import defaultdict | ||
from collections.abc import Iterator | ||
import math | ||
from typing import Any, Generic, overload | ||
|
||
from SPARQLWrapper import JSON, QueryResult, SPARQLWrapper | ||
from pydantic import BaseModel | ||
from rdfproxy.utils._types import _TModelConstructorCallable, _TModelInstance | ||
from rdfproxy.utils._exceptions import ( | ||
InterdependentParametersException, | ||
UndefinedBindingException, | ||
) | ||
from rdfproxy.utils._types import _TModelInstance | ||
from rdfproxy.utils.models import Page | ||
from rdfproxy.utils.sparql.sparql_templates import ungrouped_pagination_base_query | ||
from rdfproxy.utils.sparql.sparql_utils import ( | ||
calculate_offset, | ||
construct_count_query, | ||
construct_grouped_count_query, | ||
construct_grouped_pagination_query, | ||
query_with_wrapper, | ||
temporary_query_override, | ||
) | ||
from rdfproxy.utils.utils import ( | ||
get_bindings_from_query_result, | ||
instantiate_model_from_kwargs, | ||
) | ||
|
||
|
||
class SPARQLModelAdapter: | ||
"""Adapter/Mapper for QueryResult to Pydantic model conversions. | ||
The rdfproxy.SPARQLModelAdapter class allows to run a query against an endpoint | ||
and map a flat SPARQL query result set to a potentially nested Pydantic model. | ||
Example: | ||
from SPARQLWrapper import SPARQLWrapper | ||
from pydantic import BaseModel | ||
from rdfproxy import SPARQLModelAdapter, _TModelInstance | ||
class SimpleModel(BaseModel): | ||
x: int | ||
y: int | ||
class NestedModel(BaseModel): | ||
a: str | ||
b: SimpleModel | ||
class ComplexModel(BaseModel): | ||
p: str | ||
q: NestedModel | ||
sparql_wrapper = SPARQLWrapper("https://query.wikidata.org/bigdata/namespace/wdq/sparql") | ||
class SPARQLModelAdapter(Generic[_TModelInstance]): | ||
"""Adapter/Mapper for SPARQL query result set to Pydantic model conversions. | ||
query = ''' | ||
select ?x ?y ?a ?p | ||
where { | ||
values (?x ?y ?a ?p) { | ||
(1 2 "a value" "p value") | ||
} | ||
} | ||
''' | ||
adapter = SPARQLModelAdapter(sparql_wrapper=sparql_wrapper) | ||
models: list[_TModelInstance] = adapter(query=query, model_constructor=ComplexModel) | ||
The rdfproxy.SPARQLModelAdapter class allows to run a query against an endpoint, | ||
map a flat SPARQL query result set to a potentially nested Pydantic model and | ||
optionally paginate and/or group the results by a SPARQL binding. | ||
""" | ||
|
||
def __init__(self, sparql_wrapper: SPARQLWrapper) -> None: | ||
self.sparql_wrapper = sparql_wrapper | ||
|
||
if self.sparql_wrapper.returnFormat != "json": | ||
self.sparql_wrapper.setReturnFormat(JSON) | ||
def __init__( | ||
self, target: str | SPARQLWrapper, query: str, model: type[_TModelInstance] | ||
) -> None: | ||
self._query = query | ||
self._model = model | ||
|
||
def __call__( | ||
self, | ||
query: str, | ||
model_constructor: type[_TModelInstance] | _TModelConstructorCallable, | ||
) -> Iterable[_TModelInstance]: | ||
self.sparql_wrapper: SPARQLWrapper = ( | ||
SPARQLWrapper(target) if isinstance(target, str) else target | ||
) | ||
self.sparql_wrapper.setReturnFormat(JSON) | ||
self.sparql_wrapper.setQuery(query) | ||
query_result: QueryResult = self.sparql_wrapper.query() | ||
|
||
if isinstance(model_constructor, type(BaseModel)): | ||
model_constructor = cast(type[_TModelInstance], model_constructor) | ||
@overload | ||
def query(self) -> list[_TModelInstance]: ... | ||
|
||
bindings = get_bindings_from_query_result(query_result) | ||
models: list[_TModelInstance] = [ | ||
instantiate_model_from_kwargs(model_constructor, **binding) | ||
for binding in bindings | ||
] | ||
@overload | ||
def query( | ||
self, | ||
*, | ||
group_by: str, | ||
) -> dict[str, list[_TModelInstance]]: ... | ||
|
||
elif isinstance(model_constructor, _TModelConstructorCallable): | ||
models: Iterable[_TModelInstance] = model_constructor(query_result) | ||
@overload | ||
def query( | ||
self, | ||
*, | ||
page: int, | ||
size: int, | ||
) -> Page[_TModelInstance]: ... | ||
|
||
else: | ||
raise TypeError( | ||
"Argument 'model_constructor' must be a model class " | ||
"or a model constructor callable." | ||
) | ||
@overload | ||
def query( | ||
self, | ||
*, | ||
page: int, | ||
size: int, | ||
group_by: str, | ||
) -> Page[_TModelInstance]: ... | ||
|
||
return models | ||
def query( | ||
self, | ||
*, | ||
page: int | None = None, | ||
size: int | None = None, | ||
group_by: str | None = None, | ||
) -> ( | ||
list[_TModelInstance] | dict[str, list[_TModelInstance]] | Page[_TModelInstance] | ||
): | ||
"""Run query against endpoint and map the SPARQL query result set to a Pydantic model. | ||
Optional pagination and/or grouping by a SPARQL binding is avaible by | ||
supplying the group_by and/or page/size parameters. | ||
""" | ||
match page, size, group_by: | ||
case None, None, None: | ||
return self._query_collect_models() | ||
case int(), int(), None: | ||
return self._query_paginate_ungrouped(page=page, size=size) | ||
case None, None, str(): | ||
return self._query_group_by(group_by=group_by) | ||
case int(), int(), str(): | ||
return self._query_paginate_grouped( | ||
page=page, size=size, group_by=group_by | ||
) | ||
case (None, int(), Any()) | (int(), None, Any()): | ||
raise InterdependentParametersException( | ||
"Parameters 'page' and 'size' are mutually dependent." | ||
) | ||
case _: | ||
raise Exception("This should never happen.") | ||
|
||
def _query_generate_model_bindings_mapping(
    self, query: str | None = None
) -> Iterator[tuple[_TModelInstance, dict[str, Any]]]:
    """Run a query and lazily yield (model instance, flat bindings) pairs.

    When no query is passed, the query already set on the adapter's
    SPARQLWrapper is executed. Otherwise the given query is set and run
    inside temporary_query_override (presumably restoring the wrapper's
    original query afterwards; confirm against that helper).

    Each result binding is turned into an instance of the adapter's model
    and yielded together with the binding it was built from, so that
    callers can group models by values of the flat SPARQL result.

    Note: this is a generator; nothing is sent to the endpoint until the
    first item is requested.
    """
    wrapper = self.sparql_wrapper

    if query is not None:
        with temporary_query_override(wrapper):
            wrapper.setQuery(query)
            query_result: QueryResult = wrapper.query()
    else:
        query_result = wrapper.query()

    for bindings in get_bindings_from_query_result(query_result):
        yield instantiate_model_from_kwargs(self._model, **bindings), bindings
|
||
def _query_collect_models(self, query: str | None = None) -> list[_TModelInstance]: | ||
"""Run query against endpoint and collect model instances.""" | ||
return [ | ||
model | ||
for model, _ in self._query_generate_model_bindings_mapping(query=query) | ||
] | ||
|
||
def _query_group_by( | ||
self, group_by: str, query: str | None = None | ||
) -> dict[str, list[_TModelInstance]]: | ||
"""Run query against endpoint and group results by a SPARQL binding.""" | ||
group = defaultdict(list) | ||
|
||
for model, bindings in self._query_generate_model_bindings_mapping(query): | ||
try: | ||
key = bindings[group_by] | ||
except KeyError: | ||
raise UndefinedBindingException( | ||
f"SPARQL binding '{group_by}' requested for grouping " | ||
f"not in query projection '{bindings}'." | ||
) | ||
|
||
group[str(key)].append(model) | ||
|
||
return group | ||
|
||
def _get_count(self, query: str) -> int:
    """Run the given count query and return its count as an int.

    The query is executed through query_with_wrapper using the adapter's
    SPARQLWrapper; the count is read from the 'cnt' binding of the first
    result row.
    """
    rows = query_with_wrapper(query=query, sparql_wrapper=self.sparql_wrapper)
    first_row = next(rows)
    return int(first_row["cnt"])
|
||
def _query_paginate_ungrouped(self, page: int, size: int) -> Page[_TModelInstance]:
    """Return one page of model instances for the adapter's query.

    The initial query is extended with limit (size) and offset (derived
    from page and size via calculate_offset) and run with
    SPARQLModelAdapter._query_collect_models. A separate count query
    supplies the total, from which the number of pages is computed.
    """
    offset = calculate_offset(page, size)
    page_query = ungrouped_pagination_base_query.substitute(
        query=self._query, offset=offset, limit=size
    )
    count_query = construct_count_query(self._query)

    # The page query is sent before the count query.
    page_items = self._query_collect_models(query=page_query)
    total_count = self._get_count(count_query)

    return Page(
        items=page_items,
        page=page,
        size=size,
        total=total_count,
        pages=math.ceil(total_count / size),
    )
|
||
def _query_paginate_grouped(
    self, page: int, size: int, group_by: str
) -> Page[_TModelInstance]:
    """Return one page of results grouped by a SPARQL binding.

    A grouped pagination query is built from the initial query, page, size
    and group_by, and run with SPARQLModelAdapter._query_group_by. A
    grouped count query supplies the total, from which the number of pages
    is computed. The page's items are a dict of group key to model instances.
    """
    page_query = construct_grouped_pagination_query(
        query=self._query, page=page, size=size, group_by=group_by
    )
    count_query = construct_grouped_count_query(
        query=self._query, group_by=group_by
    )

    # The page query is sent before the count query.
    grouped_items = self._query_group_by(group_by=group_by, query=page_query)
    total_count = self._get_count(count_query)

    return Page(
        items=grouped_items,
        page=page,
        size=size,
        total=total_count,
        pages=math.ceil(total_count / size),
    )
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
"""Custom exceptions for RDFProxy.""" | ||
|
||
|
||
class UndefinedBindingException(KeyError):
    """Signal that a requested key is missing from a SPARQL binding mapping.

    Subclasses KeyError, so handlers written for KeyError also catch it.
    """
|
||
|
||
class InterdependentParametersException(Exception):
    """Signal that two or more parameters depend on each other.

    Used when interdependent parameters are not supplied together.
    """
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
"""Pydantic Model definitions for rdfproxy.""" | ||
|
||
from typing import Generic | ||
|
||
from pydantic import BaseModel | ||
from rdfproxy.utils._types import _TModelInstance | ||
|
||
|
||
class Page(BaseModel, Generic[_TModelInstance]):
    """Generic Pydantic model describing one page of paginated results.

    Loosely modelled on the Page class of fastapi-pagination
    (https://github.com/uriyyo/fastapi-pagination). For generic Pydantic
    models in general see
    https://docs.pydantic.dev/latest/concepts/models/#generic-models.
    """

    # Models on this page: a flat list, or a dict of group key to models.
    items: list[_TModelInstance] | dict[str, list[_TModelInstance]]
    # Requested page number.
    page: int
    # Requested page size.
    size: int
    # Total count across all pages.
    total: int
    # Number of pages available for this total and size.
    pages: int
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
"""SPARQL Query templates for RDFProxy paginations.""" | ||
|
||
from string import Template | ||
|
||
|
||
# Wraps an arbitrary query with a result window: $query is the original
# query text, $limit the page size and $offset the number of rows to skip.
ungrouped_pagination_base_query = Template("""
$query
limit $limit
offset $offset
""")
Oops, something went wrong.