Skip to content

Commit

Permalink
✨ Adds support for paginated storage queries, and implements pagination for the wallets_list endpoint (#3000)
Browse files Browse the repository at this point in the history

* 🎨 name the scan args

Signed-off-by: ff137 <[email protected]>

* 🎨 add return types and arg docstrings

Signed-off-by: ff137 <[email protected]>

* ✅ fix assertion

Signed-off-by: ff137 <[email protected]>

* ✨ implement `find_paginated_records`

Signed-off-by: ff137 <[email protected]>

* ✨ add `limit` and `offset` to BaseRecord.query, calling `find_paginated_records` if requested

Signed-off-by: ff137 <[email protected]>

* ✨ add `limit` and `offset` to wallet list query. define `MAXIMUM_PAGE_SIZE` and use in validation. use `limit` and `offset` in records query

Signed-off-by: ff137 <[email protected]>

* 🎨 organise imports

Signed-off-by: ff137 <[email protected]>

* ✅ fix assertion

Signed-off-by: ff137 <[email protected]>

* 🎨 correction: call `scan` on `store`

Signed-off-by: ff137 <[email protected]>

* 👷 Don't run integration tests when PR is in draft

Signed-off-by: ff137 <[email protected]>

* 🎨 call `scan` on `profile.opened.store`

Signed-off-by: ff137 <[email protected]>

* 🎨 remove unused options arg

Signed-off-by: ff137 <[email protected]>

* 🎨 NB: remove unused option `retrieveTags`. Unimplemented / has no effect

Signed-off-by: ff137 <[email protected]>

* 🎨 fix accessor for the store instance

Signed-off-by: ff137 <[email protected]>

* 🎨 add direct instantiation of `self._profile`, so that `self._profile.store` reference is resolvable

Signed-off-by: ff137 <[email protected]>

* ✨ add optional limit and offset to AskarStorageSearchSession.fetch

Signed-off-by: ff137 <[email protected]>

* ✨ implement PaginatedQuerySchema class for deduplication

Signed-off-by: ff137 <[email protected]>

* ✅ add tests for BaseRecord.query when called with limit or offset

Signed-off-by: ff137 <[email protected]>

* 🎨 organise imports

Signed-off-by: ff137 <[email protected]>

* 🎨 efficient subset of OrderedDict values

Signed-off-by: ff137 <[email protected]>

* ✅ test coverage for find_paginated_records

Signed-off-by: ff137 <[email protected]>

* 🎨 organise imports

Signed-off-by: ff137 <[email protected]>

* ✅ test coverage for PaginatedQuerySchema

Signed-off-by: ff137 <[email protected]>

* 🎨 fix marshmallow warning

Signed-off-by: ff137 <[email protected]>

* 🐛 fix reference to store, and add _profile to init for type hint

Signed-off-by: ff137 <[email protected]>

* Update lock file

Signed-off-by: ff137 <[email protected]>

---------

Signed-off-by: ff137 <[email protected]>
Co-authored-by: jamshale <[email protected]>
  • Loading branch information
ff137 and jamshale authored Jun 11, 2024
1 parent db54bf7 commit 586a30d
Show file tree
Hide file tree
Showing 16 changed files with 541 additions and 218 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/integrationtests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ defaults:
jobs:
test:
runs-on: ubuntu-latest
if: (github.event_name == 'pull_request' && github.repository == 'hyperledger/aries-cloudagent-python') || (github.event_name != 'pull_request')
if: (github.event_name == 'pull_request' && !github.event.pull_request.draft && github.repository == 'hyperledger/aries-cloudagent-python') || (github.event_name != 'pull_request')
outputs:
is_release: ${{ steps.check_if_release.outputs.is_release }}
steps:
Expand Down
20 changes: 10 additions & 10 deletions aries_cloudagent/anoncreds/holder.py
Original file line number Diff line number Diff line change
Expand Up @@ -350,11 +350,11 @@ async def get_credentials(self, start: int, count: int, wql: dict):

try:
rows = self.profile.store.scan(
CATEGORY_CREDENTIAL,
wql,
start,
count,
self.profile.settings.get("wallet.askar_profile"),
category=CATEGORY_CREDENTIAL,
tag_filter=wql,
offset=start,
limit=count,
profile=self.profile.settings.get("wallet.askar_profile"),
)
async for row in rows:
cred = Credential.load(row.raw_value)
Expand Down Expand Up @@ -424,11 +424,11 @@ async def get_credentials_for_presentation_request_by_referent(
tag_filter = {"$and": [tag_filter, extra_query]}

rows = self.profile.store.scan(
CATEGORY_CREDENTIAL,
tag_filter,
start,
count,
self.profile.settings.get("wallet.askar_profile"),
category=CATEGORY_CREDENTIAL,
tag_filter=tag_filter,
offset=start,
limit=count,
profile=self.profile.settings.get("wallet.askar_profile"),
)
async for row in rows:
if row.name in creds:
Expand Down
3 changes: 2 additions & 1 deletion aries_cloudagent/askar/profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ def __init__(
self._opener = self.profile.store.transaction(profile.profile_id)
else:
self._opener = self.profile.store.session(profile.profile_id)
self._profile = profile
self._handle: Session = None
self._acquire_start: float = None
self._acquire_end: float = None
Expand All @@ -219,7 +220,7 @@ def handle(self) -> Session:
@property
def store(self) -> Store:
"""Accessor for the Store instance."""
return self._handle and self._handle.store
return self._profile and self._profile.store

@property
def is_transaction(self) -> bool:
Expand Down
3 changes: 2 additions & 1 deletion aries_cloudagent/askar/profile_anon.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ def __init__(
self._opener = self.profile.store.transaction(profile.profile_id)
else:
self._opener = self.profile.store.session(profile.profile_id)
self._profile = profile
self._handle: Session = None
self._acquire_start: float = None
self._acquire_end: float = None
Expand All @@ -214,7 +215,7 @@ def handle(self) -> Session:
@property
def store(self) -> Store:
"""Accessor for the Store instance."""
return self._handle and self._handle.store
return self._profile and self._profile.store

@property
def is_transaction(self) -> bool:
Expand Down
20 changes: 10 additions & 10 deletions aries_cloudagent/indy/credx/holder.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,11 +250,11 @@ async def get_credentials(self, start: int, count: int, wql: dict):

try:
rows = self._profile.store.scan(
CATEGORY_CREDENTIAL,
wql,
start,
count,
self._profile.settings.get("wallet.askar_profile"),
category=CATEGORY_CREDENTIAL,
tag_filter=wql,
offset=start,
limit=count,
profile=self._profile.settings.get("wallet.askar_profile"),
)
async for row in rows:
cred = Credential.load(row.raw_value)
Expand Down Expand Up @@ -321,11 +321,11 @@ async def get_credentials_for_presentation_request_by_referent(
tag_filter = {"$and": [tag_filter, extra_query]}

rows = self._profile.store.scan(
CATEGORY_CREDENTIAL,
tag_filter,
start,
count,
self._profile.settings.get("wallet.askar_profile"),
category=CATEGORY_CREDENTIAL,
tag_filter=tag_filter,
offset=start,
limit=count,
profile=self._profile.settings.get("wallet.askar_profile"),
)
async for row in rows:
if row.name in creds:
Expand Down
35 changes: 27 additions & 8 deletions aries_cloudagent/messaging/models/base_record.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,12 @@
from ...cache.base import BaseCache
from ...config.settings import BaseSettings
from ...core.profile import ProfileSession
from ...storage.base import BaseStorage, StorageDuplicateError, StorageNotFoundError
from ...storage.base import (
DEFAULT_PAGE_SIZE,
BaseStorage,
StorageDuplicateError,
StorageNotFoundError,
)
from ...storage.record import StorageRecord
from ..util import datetime_to_str, time_now
from ..valid import INDY_ISO8601_DATETIME_EXAMPLE, INDY_ISO8601_DATETIME_VALIDATE
Expand Down Expand Up @@ -228,7 +233,7 @@ async def retrieve_by_id(

storage = session.inject(BaseStorage)
result = await storage.get_record(
cls.RECORD_TYPE, record_id, {"forUpdate": for_update, "retrieveTags": False}
cls.RECORD_TYPE, record_id, options={"forUpdate": for_update}
)
vals = json.loads(result.value)
return cls.from_storage(record_id, vals)
Expand All @@ -255,7 +260,7 @@ async def retrieve_by_tag_filter(
rows = await storage.find_all_records(
cls.RECORD_TYPE,
cls.prefix_tag_filter(tag_filter),
options={"forUpdate": for_update, "retrieveTags": False},
options={"forUpdate": for_update},
)
found = None
for record in rows:
Expand Down Expand Up @@ -284,6 +289,8 @@ async def query(
session: ProfileSession,
tag_filter: dict = None,
*,
limit: Optional[int] = None,
offset: Optional[int] = None,
post_filter_positive: dict = None,
post_filter_negative: dict = None,
alt: bool = False,
Expand All @@ -293,18 +300,30 @@ async def query(
Args:
session: The profile session to use
tag_filter: An optional dictionary of tag filter clauses
limit: The maximum number of records to retrieve
offset: The offset to start retrieving records from
post_filter_positive: Additional value filters to apply matching positively
post_filter_negative: Additional value filters to apply matching negatively
alt: set to match any (positive=True) value or miss all (positive=False)
values in post_filter
"""

storage = session.inject(BaseStorage)
rows = await storage.find_all_records(
cls.RECORD_TYPE,
cls.prefix_tag_filter(tag_filter),
options={"retrieveTags": False},
)

tag_query = cls.prefix_tag_filter(tag_filter)
if limit is not None or offset is not None:
rows = await storage.find_paginated_records(
type_filter=cls.RECORD_TYPE,
tag_query=tag_query,
limit=limit or DEFAULT_PAGE_SIZE,
offset=offset or 0,
)
else:
rows = await storage.find_all_records(
type_filter=cls.RECORD_TYPE,
tag_query=tag_query,
)

result = []
for record in rows:
vals = json.loads(record.value)
Expand Down
31 changes: 31 additions & 0 deletions aries_cloudagent/messaging/models/paginated_query.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
"""Class for paginated query parameters."""

from marshmallow import fields

from aries_cloudagent.storage.base import DEFAULT_PAGE_SIZE, MAXIMUM_PAGE_SIZE

from ...messaging.models.openapi import OpenAPISchema


class PaginatedQuerySchema(OpenAPISchema):
"""Parameters for paginated queries."""

limit = fields.Int(
required=False,
load_default=DEFAULT_PAGE_SIZE,
validate=lambda x: x > 0 and x <= MAXIMUM_PAGE_SIZE,
metadata={"description": "Number of results to return", "example": 50},
error_messages={
"validator_failed": (
"Value must be greater than 0 and "
f"less than or equal to {MAXIMUM_PAGE_SIZE}"
)
},
)
offset = fields.Int(
required=False,
load_default=0,
validate=lambda x: x >= 0,
metadata={"description": "Offset for pagination", "example": 0},
error_messages={"validator_failed": "Value must be 0 or greater"},
)
110 changes: 100 additions & 10 deletions aries_cloudagent/messaging/models/tests/test_base_record.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
import json

from aries_cloudagent.tests import mock
from unittest import IsolatedAsyncioTestCase

from marshmallow import EXCLUDE, fields

from aries_cloudagent.tests import mock

from ....cache.base import BaseCache
from ....core.event_bus import EventBus, MockEventBus, Event
from ....core.event_bus import Event, EventBus, MockEventBus
from ....core.in_memory import InMemoryProfile
from ....messaging.models.base import BaseModelError
from ....storage.base import (
DEFAULT_PAGE_SIZE,
BaseStorage,
StorageDuplicateError,
StorageRecord,
)
from ....messaging.models.base import BaseModelError

from ...util import time_now

from ..base_record import BaseRecord, BaseRecordSchema


Expand Down Expand Up @@ -170,7 +170,8 @@ async def test_query(self):
mock_storage.find_all_records.return_value = [stored]
result = await BaseRecordImpl.query(session, tag_filter)
mock_storage.find_all_records.assert_awaited_once_with(
BaseRecordImpl.RECORD_TYPE, tag_filter, options={"retrieveTags": False}
type_filter=BaseRecordImpl.RECORD_TYPE,
tag_query=tag_filter,
)
assert result and isinstance(result[0], BaseRecordImpl)
assert result[0]._id == record_id
Expand Down Expand Up @@ -220,9 +221,8 @@ async def test_query_post_filter(self):
session, tag_filter, post_filter_positive={"a": "one"}
)
mock_storage.find_all_records.assert_awaited_once_with(
ARecordImpl.RECORD_TYPE,
tag_filter,
options={"retrieveTags": False},
type_filter=ARecordImpl.RECORD_TYPE,
tag_query=tag_filter,
)
assert result and isinstance(result[0], ARecordImpl)
assert result[0]._id == record_id
Expand Down Expand Up @@ -323,3 +323,93 @@ async def test_tag_prefix(self):
assert UnencTestImpl.prefix_tag_filter(tags) == {
"$or": [{"~a": "x"}, {"c": "z"}]
}

async def test_query_with_limit(self):
session = InMemoryProfile.test_session()
mock_storage = mock.MagicMock(BaseStorage, autospec=True)
session.context.injector.bind_instance(BaseStorage, mock_storage)
record_id = "record_id"
a_record = ARecordImpl(ident=record_id, a="one", b="two", code="red")
record_value = a_record.record_value
record_value.update({"created_at": time_now(), "updated_at": time_now()})
tag_filter = {"code": "red"}
stored = StorageRecord(
ARecordImpl.RECORD_TYPE,
json.dumps(record_value),
{"code": "red"},
record_id,
)
mock_storage.find_paginated_records.return_value = [stored]

# Query with limit
result = await ARecordImpl.query(session, tag_filter, limit=10)
mock_storage.find_paginated_records.assert_awaited_once_with(
type_filter=ARecordImpl.RECORD_TYPE,
tag_query=tag_filter,
limit=10,
offset=0,
)
assert result and isinstance(result[0], ARecordImpl)
assert result[0]._id == record_id
assert result[0].value == record_value
assert result[0].a == "one"

async def test_query_with_offset(self):
session = InMemoryProfile.test_session()
mock_storage = mock.MagicMock(BaseStorage, autospec=True)
session.context.injector.bind_instance(BaseStorage, mock_storage)
record_id = "record_id"
a_record = ARecordImpl(ident=record_id, a="one", b="two", code="red")
record_value = a_record.record_value
record_value.update({"created_at": time_now(), "updated_at": time_now()})
tag_filter = {"code": "red"}
stored = StorageRecord(
ARecordImpl.RECORD_TYPE,
json.dumps(record_value),
{"code": "red"},
record_id,
)
mock_storage.find_paginated_records.return_value = [stored]

# Query with offset
result = await ARecordImpl.query(session, tag_filter, offset=10)
mock_storage.find_paginated_records.assert_awaited_once_with(
type_filter=ARecordImpl.RECORD_TYPE,
tag_query=tag_filter,
limit=DEFAULT_PAGE_SIZE,
offset=10,
)
assert result and isinstance(result[0], ARecordImpl)
assert result[0]._id == record_id
assert result[0].value == record_value
assert result[0].a == "one"

async def test_query_with_limit_and_offset(self):
session = InMemoryProfile.test_session()
mock_storage = mock.MagicMock(BaseStorage, autospec=True)
session.context.injector.bind_instance(BaseStorage, mock_storage)
record_id = "record_id"
a_record = ARecordImpl(ident=record_id, a="one", b="two", code="red")
record_value = a_record.record_value
record_value.update({"created_at": time_now(), "updated_at": time_now()})
tag_filter = {"code": "red"}
stored = StorageRecord(
ARecordImpl.RECORD_TYPE,
json.dumps(record_value),
{"code": "red"},
record_id,
)
mock_storage.find_paginated_records.return_value = [stored]

# Query with limit and offset
result = await ARecordImpl.query(session, tag_filter, limit=10, offset=5)
mock_storage.find_paginated_records.assert_awaited_once_with(
type_filter=ARecordImpl.RECORD_TYPE,
tag_query=tag_filter,
limit=10,
offset=5,
)
assert result and isinstance(result[0], ARecordImpl)
assert result[0]._id == record_id
assert result[0].value == record_value
assert result[0].a == "one"
Loading

0 comments on commit 586a30d

Please sign in to comment.