Skip to content

Commit

Permalink
updated monitor for start date
Browse files Browse the repository at this point in the history
  • Loading branch information
john-friedman committed Feb 11, 2025
1 parent 6639c8e commit c63c925
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 14 deletions.
69 changes: 57 additions & 12 deletions datamule/datamule/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,15 @@ def _get_current_eastern_date():
eastern = pytz.timezone('America/New_York')
return datetime.now(eastern)

def _parse_date(date_str):
"""Parse YYYY-MM-DD date string to datetime object in Eastern timezone"""
try:
date = datetime.strptime(date_str, '%Y-%m-%d')
eastern = pytz.timezone('America/New_York')
return eastern.localize(date)
except ValueError as e:
raise ValueError(f"Invalid date format. Please use YYYY-MM-DD. Error: {str(e)}")

class PreciseRateLimiter:
def __init__(self, rate, interval=1.0):
self.rate = rate # requests per interval
Expand Down Expand Up @@ -67,7 +76,8 @@ def get_current_rates(self):
class Monitor:
def __init__(self):
self.last_total = 0
self.last_date = _get_current_eastern_date()
self.last_date = None
self.current_monitor_date = None
self.submissions = []
self.max_hits = 10000
self.limiter = PreciseRateLimiter(5) # 5 requests per second
Expand All @@ -91,16 +101,29 @@ async def _poll(self, base_url, session, poll_interval, quiet):
"""Poll API until new submissions are found."""
while True:
current_date = _get_current_eastern_date()
date_str = current_date.strftime('%Y-%m-%d')
timestamp = int(time.time()) # Add this line

if self.last_date != current_date.strftime('%Y-%m-%d'):
print(f"New date: {date_str}")
# If we're caught up to current date, use it, otherwise use our tracking date
if self.current_monitor_date.date() >= current_date.date():
self.current_monitor_date = current_date
else:
# If we're behind current date and haven't finished current date's processing,
# continue with current date
if self.last_date == self.current_monitor_date.strftime('%Y-%m-%d'):
pass
else:
# Move to next day
self.current_monitor_date += timedelta(days=1)

date_str = self.current_monitor_date.strftime('%Y-%m-%d')
timestamp = int(time.time())

if self.last_date != date_str:
print(f"Processing date: {date_str}")
self.last_total = 0
self.submissions = []
self.last_date = date_str

poll_url = f"{base_url}&startdt={date_str}&enddt={date_str}&v={timestamp}" # Modified this line
poll_url = f"{base_url}&startdt={date_str}&enddt={date_str}&v={timestamp}"
if not quiet:
print(f"Polling {poll_url}")

Expand All @@ -109,18 +132,23 @@ async def _poll(self, base_url, session, poll_interval, quiet):
if data:
current_total = data['hits']['total']['value']
if current_total > self.last_total:
print(f"Found {current_total - self.last_total} new submissions")
print(f"Found {current_total - self.last_total} new submissions for {date_str}")
self.last_total = current_total
return current_total, data, poll_url
self.last_total = current_total

# If we have no hits and we're processing a past date,
# we can move to the next day immediately
if current_total == 0 and self.current_monitor_date.date() < current_date.date():
continue

except Exception as e:
print(f"Error in poll: {str(e)}")

await asyncio.sleep(poll_interval / 1000)

async def _retrieve_batch(self, session, poll_url, from_positions, quiet):
"""Retrieve a batch of submissions concurrently."""
# The poll_url already contains the timestamp from _poll
tasks = [
self._fetch_json(
session,
Expand Down Expand Up @@ -176,11 +204,17 @@ async def _retrieve(self, poll_url, initial_data, session, quiet):

return submissions

async def _monitor(self, callback, form=None, cik=None, ticker=None, poll_interval=1000, quiet=True):
async def _monitor(self, callback, form=None, cik=None, ticker=None, start_date=None, poll_interval=1000, quiet=True):
"""Main monitoring loop with parallel processing."""
if poll_interval < 100:
raise ValueError("SEC rate limit is 10 requests per second, set poll_interval to 100ms or higher")

# Set up initial monitoring date
if start_date:
self.current_monitor_date = _parse_date(start_date)
else:
self.current_monitor_date = _get_current_eastern_date()

# Handle form parameter
if form is None:
form = ['-0']
Expand Down Expand Up @@ -233,6 +267,17 @@ async def _monitor(self, callback, form=None, cik=None, ticker=None, poll_interv

await asyncio.sleep(poll_interval / 1000)

def monitor_submissions(self, callback=None, form=None, cik=None, ticker=None, poll_interval=1000, quiet=True):
"""Start the monitoring process."""
asyncio.run(self._monitor(callback, form, cik, ticker, poll_interval, quiet))
def monitor_submissions(self, callback=None, form=None, cik=None, ticker=None, start_date=None, poll_interval=1000, quiet=True):
"""
Start the monitoring process.
Parameters:
callback (callable, optional): Function to call when new submissions are found
form (str or list, optional): Form type(s) to monitor
cik (str or list, optional): CIK(s) to monitor
ticker (str, optional): Ticker symbol to monitor
start_date (str, optional): Start date in YYYY-MM-DD format
poll_interval (int, optional): Polling interval in milliseconds
quiet (bool, optional): Suppress verbose output
"""
asyncio.run(self._monitor(callback, form, cik, ticker, start_date, poll_interval, quiet))
2 changes: 1 addition & 1 deletion datamule/docs/source/changelog.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Changelog
=========
v01.0.3 2025-02-06
v01.0.3 2025-02-10
- modified item mapping dict regex to be more robust.

v01.0.2 2025-02-06
Expand Down
2 changes: 1 addition & 1 deletion datamule/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
setup(
name="datamule",
author="John Friedman",
version="1.0.2",
version="1.0.3",
description="Making it easier to use SEC filings.",
packages=find_namespace_packages(include=['datamule*']),
url="https://github.com/john-friedman/datamule-python",
Expand Down

0 comments on commit c63c925

Please sign in to comment.