Skip to content

Commit

Permalink
feat: Implement ungrouped SPARQLModelAdapter.query_paginate
Browse files Browse the repository at this point in the history
Implement SPARQLModelAdapter.query_paginate for ungrouped pagination.
For ungrouped pagination the internal query is temporarily modified
with page/offset and size/limit according to the page and size
parameters.

For FastAPI and pagination see https://lewoudar.medium.com/fastapi-and-pagination-d27ad52983a.

Closes #43.
  • Loading branch information
lu-pl committed Aug 4, 2024
1 parent 7044bc1 commit 942865c
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 4 deletions.
55 changes: 51 additions & 4 deletions rdfproxy/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,17 @@

from collections import defaultdict
from collections.abc import Iterator
from typing import Any
from typing import Any, overload

from SPARQLWrapper import JSON, QueryResult, SPARQLWrapper
from pydantic import BaseModel
from rdfproxy.utils._exceptions import UndefinedBindingException
from rdfproxy.utils._types import _TModelInstance
from rdfproxy.utils.sparql_templates import ungrouped_pagination_base_query
from rdfproxy.utils.utils import (
get_bindings_from_query_result,
instantiate_model_from_kwargs,
temporary_query_override,
)


Expand Down Expand Up @@ -83,10 +85,9 @@ def _run_query(self) -> Iterator[tuple[BaseModel, dict[str, Any]]]:
model = instantiate_model_from_kwargs(self._model, **bindings)
yield model, bindings

def query(self) -> Iterator[BaseModel]:
def query(self) -> list[BaseModel]:
"""Run query against endpoint, map SPARQL result sets to model and return model instances."""
for model, _ in self._run_query():
yield model
return [model for model, _ in self._run_query()]

def query_group_by(self, group_by: str) -> dict[str, list[BaseModel]]:
"""Run query against endpoint like SPARQLModelAdapter.query but group results by a SPARQL binding.
Expand Down Expand Up @@ -130,3 +131,49 @@ def query_group_by(self, group_by: str) -> dict[str, list[BaseModel]]:
group[str(key)].append(model)

return group

@staticmethod
def _calculate_offset(page: int, size: int) -> int:
"""Calculate offset value for paginated SPARQL templates."""
match page:
case 1:
return 0
case 2:
return size
case _:
return size * page

def _query_paginate_ungrouped(self, page: int, size: int):
"""Run query with pagination according to page and size.
This method is intended to be part of the public SPARQLModelAdapter.query_paginate method.
The internal query is dynamically modified according to page/offset and size/limit
and run with SPARQLModelAdapter.query.
"""
paginated_query = ungrouped_pagination_base_query.substitute(
query=self._query, offset=self._calculate_offset(page, size), limit=size
)

with temporary_query_override(self.sparql_wrapper):
self.sparql_wrapper.setQuery(paginated_query)
return self.query()

@overload
def query_paginate(
self, page: int, size: int, group_by: None = None
) -> list[BaseModel]: ...

@overload
def query_paginate(
self, page: int, size: int, group_by: str
) -> dict[str, list[BaseModel]]: ...

def query_paginate(
self, page: int, size: int, group_by: str | None = None
) -> list[BaseModel] | dict[str, list[BaseModel]]:
"""Run query with pagination according to page and size and optional grouping."""
if group_by is None:
return self._query_paginate_ungrouped(page=page, size=size)
else:
raise NotImplementedError
10 changes: 10 additions & 0 deletions rdfproxy/utils/sparql_templates.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
"""SPARQL Query templates for RDFProxy paginations."""

from string import Template


ungrouped_pagination_base_query = Template("""
$query
limit $limit
offset $offset
""")

0 comments on commit 942865c

Please sign in to comment.