Skip to content

Commit

Permalink
feat: redesign SPARQLModelAdapter class
Browse files Browse the repository at this point in the history
The redesign introduces major class API changes:

1. Initialization now takes the endpoint, query and model class directly, this simplifies
class usage and allows for better state retention in the instance.

2. functionality previously defined in
SPARQLModelAdapter.__call__ is transposed to SPARQLModelAdapter.query.

Closes: #38.
  • Loading branch information
lu-pl committed Aug 20, 2024
1 parent 00f5043 commit 1a3e657
Show file tree
Hide file tree
Showing 9 changed files with 519 additions and 171 deletions.
191 changes: 109 additions & 82 deletions poetry.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ readme = "README.md"
[tool.poetry.dependencies]
python = "^3.11"
sparqlwrapper = "^2.0.0"
toolz = "^0.12.1"
pydantic = "^2.8.2"
typeguard = "^4.3.0"


[tool.poetry.group.dev.dependencies]
Expand Down
252 changes: 184 additions & 68 deletions rdfproxy/adapter.py
Original file line number Diff line number Diff line change
@@ -1,87 +1,203 @@
"""SPARQLModelAdapter class for QueryResult to Pydantic model conversions."""
"""SPARQLModelAdapter class for SPARQL query result set to Pydantic model conversions."""

from collections.abc import Iterable
from typing import cast
from collections import defaultdict
from collections.abc import Iterator
import math
from typing import Any, Generic, overload

from typeguard import typechecked

from SPARQLWrapper import JSON, QueryResult, SPARQLWrapper
from pydantic import BaseModel
from rdfproxy.utils._types import _TModelConstructorCallable, _TModelInstance
from rdfproxy.utils._exceptions import (
InterdependentParametersException,
UndefinedBindingException,
)
from rdfproxy.utils._types import _TModelInstance
from rdfproxy.utils.models import Page
from rdfproxy.utils.sparql.sparql_templates import ungrouped_pagination_base_query
from rdfproxy.utils.sparql.sparql_utils import (
calculate_offset,
construct_count_query,
construct_grouped_count_query,
construct_grouped_pagination_query,
query_with_wrapper,
temporary_query_override,
)
from rdfproxy.utils.utils import (
get_bindings_from_query_result,
instantiate_model_from_kwargs,
)


class SPARQLModelAdapter:
"""Adapter/Mapper for QueryResult to Pydantic model conversions.
The rdfproxy.SPARQLModelAdapter class allows to run a query against an endpoint
and map a flat SPARQL query result set to a potentially nested Pydantic model.
Example:
from SPARQLWrapper import SPARQLWrapper
from pydantic import BaseModel
from rdfproxy import SPARQLModelAdapter, _TModelInstance
class SimpleModel(BaseModel):
x: int
y: int
class NestedModel(BaseModel):
a: str
b: SimpleModel
class ComplexModel(BaseModel):
p: str
q: NestedModel
@typechecked
class SPARQLModelAdapter(Generic[_TModelInstance]):
"""Adapter/Mapper for SPARQL query result set to Pydantic model conversions.
sparql_wrapper = SPARQLWrapper("https://query.wikidata.org/bigdata/namespace/wdq/sparql")
query = '''
select ?x ?y ?a ?p
where {
values (?x ?y ?a ?p) {
(1 2 "a value" "p value")
}
}
'''
adapter = SPARQLModelAdapter(sparql_wrapper=sparql_wrapper)
models: list[_TModelInstance] = adapter(query=query, model_constructor=ComplexModel)
The rdfproxy.SPARQLModelAdapter class allows to run a query against an endpoint,
map a flat SPARQL query result set to a potentially nested Pydantic model and
optionally paginate and/or group the results by a SPARQL binding.
"""

def __init__(self, sparql_wrapper: SPARQLWrapper) -> None:
self.sparql_wrapper = sparql_wrapper

if self.sparql_wrapper.returnFormat != "json":
self.sparql_wrapper.setReturnFormat(JSON)
def __init__(
self, target: str | SPARQLWrapper, query: str, model: type[_TModelInstance]
) -> None:
self._query = query
self._model = model

def __call__(
self,
query: str,
model_constructor: type[_TModelInstance] | _TModelConstructorCallable,
) -> Iterable[_TModelInstance]:
self.sparql_wrapper: SPARQLWrapper = (
SPARQLWrapper(target) if isinstance(target, str) else target
)
self.sparql_wrapper.setReturnFormat(JSON)
self.sparql_wrapper.setQuery(query)
query_result: QueryResult = self.sparql_wrapper.query()

if isinstance(model_constructor, type(BaseModel)):
model_constructor = cast(type[_TModelInstance], model_constructor)
@overload
def query(self) -> list[_TModelInstance]: ...

@overload
def query(
self,
*,
group_by: str,
) -> dict[str, list[_TModelInstance]]: ...

bindings = get_bindings_from_query_result(query_result)
models: list[_TModelInstance] = [
instantiate_model_from_kwargs(model_constructor, **binding)
for binding in bindings
]
@overload
def query(
self,
*,
page: int,
size: int,
) -> Page[_TModelInstance]: ...

elif isinstance(model_constructor, _TModelConstructorCallable):
models: Iterable[_TModelInstance] = model_constructor(query_result)
@overload
def query(
self,
*,
page: int,
size: int,
group_by: str,
) -> Page[_TModelInstance]: ...

def query(
self,
*,
page: int | None = None,
size: int | None = None,
group_by: str | None = None,
) -> (
list[_TModelInstance] | dict[str, list[_TModelInstance]] | Page[_TModelInstance]
):
"""Run query against endpoint and map the SPARQL query result set to a Pydantic model.
Optional pagination and/or grouping by a SPARQL binding is avaible by
supplying the group_by and/or page/size parameters.
"""
match page, size, group_by:
case None, None, None:
return self._query_collect_models()
case int(), int(), None:
return self._query_paginate_ungrouped(page=page, size=size)
case None, None, str():
return self._query_group_by(group_by=group_by)
case int(), int(), str():
return self._query_paginate_grouped(
page=page, size=size, group_by=group_by
)
case (None, int(), Any()) | (int(), None, Any()):
raise InterdependentParametersException(
"Parameters 'page' and 'size' are mutually dependent."
)
case _:
raise Exception("This should never happen.")

def _query_generate_model_bindings_mapping(
self, query: str | None = None
) -> Iterator[tuple[_TModelInstance, dict[str, Any]]]:
"""Run query, construct model instances and generate a model-bindings mapping.
The query parameter defaults to the initially defined query and
is run against the endpoint defined in the SPARQLModelAdapter instance.
Note: The coupling of model instances with flat SPARQL results
allows for easier and more efficient grouping operations (see grouping functionality).
"""
if query is None:
query_result: QueryResult = self.sparql_wrapper.query()
else:
raise TypeError(
"Argument 'model_constructor' must be a model class "
"or a model constructor callable."
)

return models
with temporary_query_override(self.sparql_wrapper):
self.sparql_wrapper.setQuery(query)
query_result: QueryResult = self.sparql_wrapper.query()

_bindings = get_bindings_from_query_result(query_result)

for bindings in _bindings:
model = instantiate_model_from_kwargs(self._model, **bindings)
yield model, bindings

def _query_collect_models(self, query: str | None = None) -> list[_TModelInstance]:
"""Run query against endpoint and collect model instances."""
return [
model
for model, _ in self._query_generate_model_bindings_mapping(query=query)
]

def _query_group_by(
self, group_by: str, query: str | None = None
) -> dict[str, list[_TModelInstance]]:
"""Run query against endpoint and group results by a SPARQL binding."""
group = defaultdict(list)

for model, bindings in self._query_generate_model_bindings_mapping(query):
try:
key = bindings[group_by]
except KeyError:
raise UndefinedBindingException(
f"SPARQL binding '{group_by}' requested for grouping "
f"not in query projection '{bindings}'."
)

group[str(key)].append(model)

return group

def _get_count(self, query: str) -> int:
"""Construct a count query from the initialized query, run it and return the count result."""
result = query_with_wrapper(query=query, sparql_wrapper=self.sparql_wrapper)
return int(next(result)["cnt"])

def _query_paginate_ungrouped(self, page: int, size: int) -> Page[_TModelInstance]:
"""Run query with pagination according to page and size.
The internal query is dynamically modified according to page (offset)/size (limit)
and run with SPARQLModelAdapter._query_collect_models.
"""
paginated_query = ungrouped_pagination_base_query.substitute(
query=self._query, offset=calculate_offset(page, size), limit=size
)
count_query = construct_count_query(self._query)

items = self._query_collect_models(query=paginated_query)
total = self._get_count(count_query)
pages = math.ceil(total / size)

return Page(items=items, page=page, size=size, total=total, pages=pages)

def _query_paginate_grouped(
self, page: int, size: int, group_by: str
) -> Page[_TModelInstance]:
"""Run query with pagination according to page/size and group result by a SPARQL binding.
The internal query is dynamically modified according to page (offset)/size (limit)
and run with SPARQLModelAdapter._query_group_by.
"""
grouped_paginated_query = construct_grouped_pagination_query(
query=self._query, page=page, size=size, group_by=group_by
)
grouped_count_query = construct_grouped_count_query(
query=self._query, group_by=group_by
)

items = self._query_group_by(group_by=group_by, query=grouped_paginated_query)
total = self._get_count(grouped_count_query)
pages = math.ceil(total / size)

return Page(items=items, page=page, size=size, total=total, pages=pages)
9 changes: 9 additions & 0 deletions rdfproxy/utils/_exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
"""Custom exceptions for RDFProxy."""


class UndefinedBindingException(KeyError):
"""Exception for indicating that a requested key could not be retrieved from a SPARQL binding mapping."""


class InterdependentParametersException(Exception):
"""Exceptiono for indicating that two or more parameters are interdependent."""
21 changes: 21 additions & 0 deletions rdfproxy/utils/_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,24 @@ class _TModelConstructorCallable(Protocol[_TModelInstance]):
"""Callback protocol for model constructor callables."""

def __call__(self, query_result: QueryResult) -> Iterable[_TModelInstance]: ...


class SPARQLBinding(str):
"""SPARQLBinding type for explicit SPARQL binding to model field allocation.
This type's intended use is with typing.Annotated in the context of a Pyantic field definition.
Example:
class Work(BaseModel):
name: Annotated[str, SPARQLBinding("title")]
class Person(BaseModel):
name: str
work: Work
This signals to the RDFProxy SPARQL-to-model mapping logic
to use the "title" SPARQL binding (not the "name" binding) to populate the Work.name field.
"""

...
23 changes: 23 additions & 0 deletions rdfproxy/utils/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
"""Pydantic Model definitions for rdfproxy."""

from typing import Generic

from pydantic import BaseModel
from rdfproxy.utils._types import _TModelInstance


class Page(BaseModel, Generic[_TModelInstance]):
"""Page model for rdfproxy pagination functionality.
This model is loosely inspired by the fastapi-pagination Page class,
see https://github.com/uriyyo/fastapi-pagination.
Also see https://docs.pydantic.dev/latest/concepts/models/#generic-models
for Generic Pydantic models.
"""

items: list[_TModelInstance] | dict[str, list[_TModelInstance]]
page: int
size: int
total: int
pages: int
10 changes: 10 additions & 0 deletions rdfproxy/utils/sparql/sparql_templates.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
"""SPARQL Query templates for RDFProxy paginations."""

from string import Template


ungrouped_pagination_base_query = Template("""
$query
limit $limit
offset $offset
""")
Loading

0 comments on commit 1a3e657

Please sign in to comment.