Skip to content

Commit

Permalink
Try using asyncio Events to coordinate coroutine stopping. Also, hand…
Browse files Browse the repository at this point in the history
…ling some errors seems not to work, so just bomb out and have an external thing restart the whole script.
  • Loading branch information
kdknigga committed Oct 25, 2024
1 parent 011bede commit dc80fa6
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 21 deletions.
6 changes: 5 additions & 1 deletion rflying_tower_bot/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,12 @@ async def main() -> None:
post_stream = PostStream(bot_config)
inbox = Inbox(bot_config)

stop_event = asyncio.Event()

await asyncio.gather(
modlog.watch_modlog(), post_stream.watch_poststream(), inbox.watch_inbox()
modlog.watch_modlog(stop_event),
post_stream.watch_poststream(stop_event),
inbox.watch_inbox(stop_event),
)


Expand Down
15 changes: 9 additions & 6 deletions rflying_tower_bot/inbox.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""A module to react to private messages."""

import asyncio
import logging
import time
from os import PathLike

from asyncpraw.models import Subreddit
Expand Down Expand Up @@ -46,12 +46,12 @@ async def do_dump_current_config(subreddit: Subreddit, path: PathLike) -> None:
"""
await dump_current_settings(subreddit, str(path))

async def watch_inbox(self) -> None:
async def watch_inbox(self, stop_event: asyncio.Event) -> None:
"""Watch the private message inbox and react to new messages."""
self.log.info("Watching the inbox for new messages")
subreddit = await self.config.reddit.subreddit(self.config.subreddit_name)
moderators = [moderator async for moderator in subreddit.moderator]
while True:
while not stop_event.is_set():
try:
# Skip existing messages to avoid processing the same message multiple times
# This is different from other streams, which do not skip existing items
Expand Down Expand Up @@ -98,12 +98,15 @@ async def watch_inbox(self) -> None:

except (RequestException, ServerError) as e:
self.log.warning(
"Server error in post stream watcher: %s. Sleeping for a bit.", e
"Server error in post stream watcher: %s. Exiting.", e
)
# Yes, I know a blocking sleep in async code is bad, but if Reddit is having a problem might as well pause the whole bot
time.sleep(60)
stop_event.set()
break
except KeyboardInterrupt:
self.log.info("Caught keyboard interrupt, exiting inbox watcher")
stop_event.set()
break
except Exception as e:
self.log.error("Error in inbox watcher: %s", e)
stop_event.set()
break
15 changes: 9 additions & 6 deletions rflying_tower_bot/modlog.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""A module to react to moderator log events."""

import asyncio
import logging
import time

from asyncpraw.models import Comment, Submission, Subreddit
from asyncprawcore.exceptions import RequestException, ServerError
Expand Down Expand Up @@ -133,13 +133,13 @@ async def check_post_flair(self, post: Submission) -> None:
else:
await func(post)

async def watch_modlog(self) -> None:
async def watch_modlog(self, stop_event: asyncio.Event) -> None:
"""Watch for modlog entries and act on them when they match a rule, in an infinite loop."""
subreddit: Subreddit = await self.config.reddit.subreddit(
self.config.subreddit_name
)
self.log.info("Starting watch of %s's mod log", subreddit)
while True:
while not stop_event.is_set():
try:
async for modlog_entry in subreddit.mod.stream.log(skip_existing=True):
self.log.info(
Expand Down Expand Up @@ -169,12 +169,15 @@ async def watch_modlog(self) -> None:

except (RequestException, ServerError) as e:
self.log.warning(
"Server error in post stream watcher: %s. Sleeping for a bit.", e
"Server error in post stream watcher: %s. Exiting.", e
)
# Yes, I know a blocking sleep in async code is bad, but if Reddit is having a problem might as well pause the whole bot
time.sleep(60)
stop_event.set()
break
except KeyboardInterrupt:
self.log.info("Caught keyboard interrupt, exiting modlog watcher")
stop_event.set()
break
except Exception as e:
self.log.error("Error in modlog watcher: %s", e)
stop_event.set()
break
19 changes: 11 additions & 8 deletions rflying_tower_bot/post_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import asyncio
import logging
import time

from asyncpraw.exceptions import RedditAPIException
from asyncpraw.models import Comment, Subreddit
Expand Down Expand Up @@ -30,30 +29,34 @@ def __init__(self, config: BotConfig) -> None:
self.config = config
self.utilities = Utilities(config)

async def watch_poststream(self) -> None:
async def watch_poststream(self, stop_event: asyncio.Event) -> None:
"""Watch the post stream and react to new posts."""
subreddit: Subreddit = await self.config.reddit.subreddit(
self.config.subreddit_name
)
self.log.info("Watching the post stream for new posts in %s", subreddit)
while True:
while not stop_event.is_set():
try:
await self._watch_submissions(subreddit)
except (RequestException, ServerError) as e:
self.log.warning(
"Server error in post stream watcher: %s. Sleeping for a bit.", e
"Server error in post stream watcher: %s. Exiting.", e
)
# Yes, I know a blocking sleep in async code is bad, but if Reddit is having a problem might as well pause the whole bot
time.sleep(60)
break
except asyncio.CancelledError:
self.log.info("Post stream watcher cancelled, exiting")
stop_event.set()
break
except KeyboardInterrupt:
self.log.info("Caught keyboard interrupt, exiting post stream watcher")
stop_event.set()
break
except Exception as e:
self.log.error("Error in post stream watcher: %s", e, exc_info=True)
await asyncio.sleep(60)
self.log.error(
"Error in post stream watcher: %s. Exiting.", e, exc_info=True
)
stop_event.set()
break

async def _watch_submissions(self, subreddit: Subreddit) -> None:
"""Watch submissions in the subreddit."""
Expand Down

0 comments on commit dc80fa6

Please sign in to comment.