Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add poll_until_no_more_changes to SubscriptionList #35

Merged
merged 2 commits into from
May 7, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 45 additions & 4 deletions macrobond_data_api/web/subscription_list.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import time
from datetime import datetime, timezone, timedelta
from typing import Sequence, List, Dict
from typing import Sequence, List, Dict, Iterator

from macrobond_data_api.common.types._parse_iso8601 import _parse_iso8601

Expand All @@ -13,7 +13,7 @@ class SubscriptionList:

This class shouldn't be instantiated directly, but instead should be retrieved from a web client through `subscription_list`

Examples
Examples - for a program that polls for updates continuously
--------
```python
from datetime import datetime, timezone
Expand All @@ -26,15 +26,39 @@ class SubscriptionList:
for key, date in result.items():
print(f'Series "{key}", last updated "{date}"')
```

Examples - for a program that retrieves all updates and then exits
--------
```python
from datetime import datetime, timezone

# this should be the last time you polled the subscription list.
last_modified = previous_last_modified # or datetime.now(timezone.utc) if it's the first time you poll.

with WebClient() as api:
subscription_list = api.subscription_list(last_modified)
for result in subscription_list.poll_until_no_more_changes():
for key, date in result.items():
print(f'Series "{key}", last updated "{date}"')

# all done polling, so we can now update the last_modified time for next itme we poll.
last_modified = subscription_list.last_modified
```
"""

def __init__(self, session: Session, last_modified: datetime, poll_interval: timedelta = None):
self._session = session

self.last_modified = last_modified - timedelta(seconds=5)
"""
Stores the date for when the subscription list was last modified.
"""

self.no_more_changes = False
"""
An indicator that there are no changes at the moment.
"""

if poll_interval is None:
poll_interval = timedelta(seconds=15)

Expand Down Expand Up @@ -122,7 +146,7 @@ def poll(self) -> Dict[str, datetime]:

Returns
-------
Optional[Dict[str, datetime]]
Dict[str, datetime]]
A dictionary of primary keys that has been updated, and the corresponding last update date.
"""
if not self._session._is_open:
Expand All @@ -136,12 +160,29 @@ def poll(self) -> Dict[str, datetime]:
"v1/subscriptionlist/getupdates", params={"ifModifiedSince": self.last_modified.isoformat()}
).json()

if data["noMoreChanges"]:
self.no_more_changes = data["noMoreChanges"]
if self.no_more_changes:
self._next_poll = datetime.now(timezone.utc) + self.poll_interval

self.last_modified = _parse_iso8601(data["timeStampForIfModifiedSince"])
return {entity["name"]: _parse_iso8601(entity["modified"]) for entity in data["entities"]}

def poll_until_no_more_changes(self) -> Iterator[Dict[str, datetime]]:
"""
Polls for any changes on the series in the subscription list until thers no more changes.

Returns
-------
Iterator[Dict[str, datetime]]]
A Iterator of dictionarys of primary keys that has been updated, and the corresponding last update date.
"""
while True:
changes = self.poll()
if len(changes) > 0:
yield changes
if self.no_more_changes:
break

def _call(self, endpoint: str, keys: Sequence[str]) -> None:
if not isinstance(keys, Sequence):
raise TypeError("keys is not a sequence")
Expand Down
Loading