Skip to content

Commit

Permalink
Add poll_until_no_more_changes to SubscriptionList (#35)
Browse files Browse the repository at this point in the history
Refactor SubscriptionList class to include a new method poll_until_no_more_changes in subscription_list.py
  • Loading branch information
mb-jp authored May 7, 2024
1 parent e953c30 commit 6b907ec
Showing 1 changed file with 45 additions and 4 deletions.
49 changes: 45 additions & 4 deletions macrobond_data_api/web/subscription_list.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import time
from datetime import datetime, timezone, timedelta
from typing import Sequence, List, Dict
from typing import Sequence, List, Dict, Iterator

from macrobond_data_api.common.types._parse_iso8601 import _parse_iso8601

Expand All @@ -13,7 +13,7 @@ class SubscriptionList:
This class shouldn't be instantiated directly, but instead should be retrieved from a web client through `subscription_list`
Examples
Examples - for a program that polls for updates continuously
--------
```python
from datetime import datetime, timezone
Expand All @@ -26,15 +26,39 @@ class SubscriptionList:
for key, date in result.items():
print(f'Series "{key}", last updated "{date}"')
```
Examples - for a program that retrieves all updates and then exits
--------
```python
from datetime import datetime, timezone
# this should be the last time you polled the subscription list.
last_modified = previous_last_modified # or datetime.now(timezone.utc) if it's the first time you poll.
with WebClient() as api:
subscription_list = api.subscription_list(last_modified)
for result in subscription_list.poll_until_no_more_changes():
for key, date in result.items():
print(f'Series "{key}", last updated "{date}"')
# all done polling, so we can now update the last_modified time for next itme we poll.
last_modified = subscription_list.last_modified
```
"""

def __init__(self, session: Session, last_modified: datetime, poll_interval: timedelta = None):
self._session = session

self.last_modified = last_modified - timedelta(seconds=5)
"""
Stores the date for when the subscription list was last modified.
"""

self.no_more_changes = False
"""
An indicator that there are no changes at the moment.
"""

if poll_interval is None:
poll_interval = timedelta(seconds=15)

Expand Down Expand Up @@ -122,7 +146,7 @@ def poll(self) -> Dict[str, datetime]:
Returns
-------
Optional[Dict[str, datetime]]
Dict[str, datetime]]
A dictionary of primary keys that has been updated, and the corresponding last update date.
"""
if not self._session._is_open:
Expand All @@ -136,12 +160,29 @@ def poll(self) -> Dict[str, datetime]:
"v1/subscriptionlist/getupdates", params={"ifModifiedSince": self.last_modified.isoformat()}
).json()

if data["noMoreChanges"]:
self.no_more_changes = data["noMoreChanges"]
if self.no_more_changes:
self._next_poll = datetime.now(timezone.utc) + self.poll_interval

self.last_modified = _parse_iso8601(data["timeStampForIfModifiedSince"])
return {entity["name"]: _parse_iso8601(entity["modified"]) for entity in data["entities"]}

def poll_until_no_more_changes(self) -> Iterator[Dict[str, datetime]]:
"""
Polls for any changes on the series in the subscription list until thers no more changes.
Returns
-------
Iterator[Dict[str, datetime]]]
A Iterator of dictionarys of primary keys that has been updated, and the corresponding last update date.
"""
while True:
changes = self.poll()
if len(changes) > 0:
yield changes
if self.no_more_changes:
break

def _call(self, endpoint: str, keys: Sequence[str]) -> None:
if not isinstance(keys, Sequence):
raise TypeError("keys is not a sequence")
Expand Down

0 comments on commit 6b907ec

Please sign in to comment.