Skip to content

Commit

Permalink
updating the code with exponential backoff for when it tries to retri…
Browse files Browse the repository at this point in the history
…eve a user
  • Loading branch information
modernNeo committed Mar 1, 2025
1 parent 9fbb527 commit bea3b20
Show file tree
Hide file tree
Showing 2 changed files with 122 additions and 96 deletions.
2 changes: 1 addition & 1 deletion .wall_e_models
216 changes: 121 additions & 95 deletions wall_e/extensions/leveling.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,15 @@
import discord
import pytz
from discord import NotFound, app_commands, Guild
from discord.errors import DiscordServerError, HTTPException
from discord.errors import DiscordException
from discord.ext import commands, tasks

from utilities.global_vars import bot, wall_e_config

from wall_e_models.models import Level, UserPoint, UpdatedUser, ProfileBucketInProgress

from utilities.embed import embed, COLOUR_MAPPING, WallEColour
from utilities.file_uploading import start_file_uploading
from utilities.global_vars import bot, wall_e_config
from utilities.paginate import paginate_embed
from utilities.setup_logger import Loggers

BUGGY_USER = [234501345749630976]
from wall_e_models.models import Level, UserPoint, UpdatedUser, ProfileBucketInProgress


class Leveling(commands.Cog):
Expand Down Expand Up @@ -53,6 +49,7 @@ def __init__(self):
self.council_channel = None
self.levelling_website_avatar_channel = None
self.bucket_update_in_progress = False
self.NUMBER_OF_RETRIEVAL_ATTEMPTS_PER_USER = 15
self.ensure_xp_roles_exist_and_have_right_users.start()
self.process_leveling_profile_data_for_lurkers.start()
self.process_outdated_profile_pics.start()
Expand All @@ -67,7 +64,8 @@ async def upload_debug_logs(self):
while self.guild is None:
await asyncio.sleep(2)
await start_file_uploading(
self.logger, self.guild, bot, wall_e_config, self.debug_log_file_absolute_path, "leveling_debug"
self.logger, self.guild, bot, wall_e_config, self.debug_log_file_absolute_path,
"leveling_debug"
)

@commands.Cog.listener(name="on_ready")
Expand All @@ -84,7 +82,8 @@ async def upload_error_logs(self):
while self.guild is None:
await asyncio.sleep(2)
await start_file_uploading(
self.logger, self.guild, bot, wall_e_config, self.error_log_file_absolute_path, "leveling_error"
self.logger, self.guild, bot, wall_e_config, self.error_log_file_absolute_path,
"leveling_error"
)

@commands.Cog.listener(name="on_ready")
Expand Down Expand Up @@ -558,7 +557,7 @@ async def process_leveling_profile_data_for_lurkers(self):
f"[Leveling process_leveling_profile_data_for_lurkers()] {user_ids_to_update} "
f"potential updates retrieved for bucket {entry.bucket_number_completed}"
)
await self._update_users(self.process_lurkers_logger, user_ids_to_update)
await self._update_users_with_given_ids(self.process_lurkers_logger, user_ids_to_update)
await ProfileBucketInProgress.async_save(entry)

async def _set_bucket_numbers(self, logger):
Expand Down Expand Up @@ -671,37 +670,44 @@ async def _get_current_bucket_number(self) -> ProfileBucketInProgress:
entry.bucket_number_completed = 1
return entry

async def _update_users(self, logger, updated_user_ids):
async def _update_users_with_given_ids(self, logger, updated_user_ids):
"""
iterates through the given list of user_ids and updates them
:param updated_user_ids:
:return:
"""
total_number_of_updates_needed = len(updated_user_ids)
for index, user_id in enumerate(updated_user_ids):
if self.user_points[user_id].being_processed:
continue
self.user_points[user_id].being_processed = True
successful = False
logger.debug(
f"[Leveling _update_users()] attempting to get updated user_point profile data for member {user_id} "
f"{index + 1}/{total_number_of_updates_needed} "
f"[Leveling _update_users_with_given_ids()] attempt "
f"{self.user_points[user_id].leveling_update_attempt} to get updated user_point profile data for "
f"member {user_id} {index + 1}/{total_number_of_updates_needed} "
)
member = None
try:
member = await self.guild.fetch_member(user_id)
except (NotFound, DiscordServerError) as e:
try:
logger.info(
f"[Leveling _update_users()] got following error when fetching guild member {user_id}\n{e}"
)
member = await bot.fetch_user(user_id)
except DiscordServerError as e:
logger.error(
f"[Leveling _update_users()] got the following error when fetching member {user_id}\n{e}."
)
except HTTPException as e:
logger.warn(
f"[Leveling _update_users()] got the following error when fetching member {user_id}\n{e}."
while (
not successful and
self.user_points[user_id].leveling_update_attempt < self.NUMBER_OF_RETRIEVAL_ATTEMPTS_PER_USER
):
self.user_points[user_id].leveling_update_attempt += 1
random_number_milliseconds = random.randint(0, 1000) / 1000
sleep_seconds = math.pow(
2, self.user_points[user_id].leveling_update_attempt + random_number_milliseconds
)
logger.debug(
f"[Leveling _update_users_with_given_ids()] Sleeping for {sleep_seconds} before attempt "
f"{self.user_points[user_id].leveling_update_attempt} when trying to update user_point profile "
f"data for member {user_id}."
)
if member:
await self._update_member_profile_data(logger, member, user_id, index, total_number_of_updates_needed)
await asyncio.sleep(sleep_seconds)
member = await self.get_user(logger, user_id)
if member:
successful = await self._update_member_profile_data(
logger, member, user_id, index, total_number_of_updates_needed
)
self.user_points[user_id].being_processed = False

@tasks.loop(seconds=2)
async def process_leveling_profile_data_for_active_users(self):
Expand All @@ -715,21 +721,65 @@ async def process_leveling_profile_data_for_active_users(self):
updated_user_logs = await UpdatedUser.get_updated_user_logs()
total_number_of_updates_needed = len(updated_user_logs)
for index, update_user in enumerate(updated_user_logs):
updated_user_log_id = update_user[0] # noqa: F841
updated_user_id = update_user[1]
if self.user_points[updated_user_id].being_processed:
continue
self.user_points[updated_user_id].being_processed = True
updated_user_log_id = update_user[0] # noqa: F841
successful = False
self.logger.debug(
f"[Leveling process_leveling_profile_data_for_active_users()] attempting to get updated "
f"user_point profile data for member {updated_user_id} "
f"{index + 1}/{total_number_of_updates_needed} "
f"[Leveling process_leveling_profile_data_for_active_users()] attempt "
f"{self.user_points[updated_user_id].leveling_update_attempt} to get data for user in UpdatedUser"
f" with id {updated_user_id} {index + 1}/{total_number_of_updates_needed} "
)
while (
not successful and
self.user_points[updated_user_id].leveling_update_attempt <
self.NUMBER_OF_RETRIEVAL_ATTEMPTS_PER_USER
):
self.user_points[updated_user_id].leveling_update_attempt += 1
random_number_milliseconds = random.randint(0, 1000) / 1000
sleep_seconds = math.pow(
2, self.user_points[updated_user_id].leveling_update_attempt + random_number_milliseconds
)
self.logger.debug(
f"[Leveling process_leveling_profile_data_for_active_users()] Sleeping for {sleep_seconds} before"
f" attempt {self.user_points[updated_user_id].leveling_update_attempt} when trying to get data"
f" for user in UpdatedUser with id {updated_user_id}."
)
await asyncio.sleep(sleep_seconds)
member = await self.get_user(self.logger, updated_user_id)
if member:
# cannot call _update_users_with_given_ids like process_outdated_profile_pics and
# process_leveling_profile_data_for_lurkers because this task needs to be able to specify a
# updated_user_log_id that will be deleted update_leveling_profile_info when the user is processed
successful = await self._update_member_profile_data(
self.logger, member, updated_user_id, index, total_number_of_updates_needed,
updated_user_log_id=updated_user_log_id
)
self.user_points[updated_user_id].being_processed = False

async def get_user(self, logger, user_id):
user = None
try:
user = await self.guild.fetch_member(user_id)
except NotFound as e:
try:
member = await self.guild.fetch_member(updated_user_id)
except NotFound:
member = await bot.fetch_user(updated_user_id)
await self._update_member_profile_data(
self.logger, member, updated_user_id, index, total_number_of_updates_needed,
updated_user_log_id=updated_user_log_id
logger.info(
f"[Leveling get_user()] unable to find guild member {user_id}\n{e}"
)
user = await bot.fetch_user(user_id)
except DiscordException as e:
logger.error(
f"[Leveling get_user()] got the following error when fetching discord user "
f"{user_id}\n{e}."
)
except DiscordException as e:
logger.error(
f"[Leveling get_user()] got the following error when fetching guild member "
f"{user_id}\n{e}."
)
return user

@tasks.loop(seconds=5)
async def process_outdated_profile_pics(self):
Expand All @@ -747,7 +797,7 @@ async def process_outdated_profile_pics(self):
f"[Leveling process_outdated_profile_pics()] {number_of_users_to_update} users with outdated CDN links"
f" to update"
)
await self._update_users(self.update_outdated_profile_pics_logger, user_ids_to_update)
await self._update_users_with_given_ids(self.update_outdated_profile_pics_logger, user_ids_to_update)
self.process_outdated_profile_pics_in_progress = False

async def _update_member_profile_data(self, logger, member, updated_user_id, index,
Expand All @@ -763,62 +813,38 @@ async def _update_member_profile_data(self, logger, member, updated_user_id, ind
current loop
:param updated_user_log_id: the ID of the UpdatedUser record to delete if this function was called as a
result of member_update_listener's recording any detected changed in UpdatedUser
:return:
:return: True if member was successfully processed
"""
if member:
if member.id in BUGGY_USER:
return
try:
if self.user_points[member.id].leveling_update_attempt >= 5:
log_level = logger.error if updated_user_log_id is None else logger.debug
# wil print out this log line at ERROR only if its not part of the code that attempts to update
# active users returned by UpdatedUser.get_updated_user_logs()
# I am doing this only cause if you happen to have a backlog of users to work through, you will
# mistakenly get alerted about attempts to update the user info not being successful when in
# actuality what is happening is just the first process was all that was necessary and the
# rest of superfluous but I don't want to update the database to take in just 1 log for each
# user for the times when the task is active and therefore one log for each update would
# actually be useful
log_level(
f"[Leveling _update_member_profile_data()] "
f"attempt {self.user_points[member.id].leveling_update_attempt} to update the member profile"
f" data in the database for member {member} with id [{member.id}], expiry_date of "
f"[{self.user_points[member.id].discord_avatar_link_expiry_date.pst}] and a CDN link of "
f"<{self.user_points[member.id].leveling_message_avatar_url}> "
f"{index + 1}/{total_number_of_updates_needed}"
)
else:
# leveling_update_attempt is reset to 0 in update_leveling_profile_info if member is successfully
# updated THIS time
logger.debug(
f"[Leveling _update_member_profile_data()] attempt"
f" {self.user_points[member.id].leveling_update_attempt} to"
f" update the member profile data in the database for member {member} with"
f" id [{updated_user_id}] {index + 1}/{total_number_of_updates_needed}"
)
user_updated = await self.user_points[member.id].update_leveling_profile_info(
logger, self.guild.id, member, self.levelling_website_avatar_channel,
updated_user_log_id=updated_user_log_id
)
if user_updated:
logger.debug(
f"[Leveling _update_member_profile_data()] updated the member profile data"
f" in the database for member {member} with id [{updated_user_id}] "
f"{index + 1}/{total_number_of_updates_needed}"
)
except Exception as e:
logger.error(
f"[Leveling _update_member_profile_data()] unable to update the member profile"
f" data in the database for member {member} with id [{updated_user_id}]"
f" {index + 1}/{total_number_of_updates_needed} due to error:\n{e}"
user_updated = False
try:
logger.debug(
f"[Leveling _update_member_profile_data()] "
f"attempt {self.user_points[member.id].leveling_update_attempt} to update the member profile"
f"data in the database for member {member} with id [{member.id}], "
f"updated_user_log_id = {updated_user_log_id}, expiry_date of "
f"[{self.user_points[member.id].discord_avatar_link_expiry_date.pst}] and a CDN link of "
f"<{self.user_points[member.id].leveling_message_avatar_url}> "
f"{index + 1}/{total_number_of_updates_needed}"
)
# leveling_update_attempt is reset to 0 in update_leveling_profile_info if member is successfully
# updated THIS time
user_updated = await self.user_points[member.id].update_leveling_profile_info(
logger, self.guild.id, member, self.levelling_website_avatar_channel,
updated_user_log_id=updated_user_log_id
)
if user_updated:
logger.debug(
f"[Leveling _update_member_profile_data()] updated the member profile data"
f" in the database for member {member} with id [{updated_user_id}] "
f"{index + 1}/{total_number_of_updates_needed}"
)
else:
logger.warn(
f"[Leveling _update_member_profile_data()] attempt "
f"{self.user_points[member.id].leveling_update_attempt}: unable to update the member profile data"
f" in the database for member {updated_user_id} {index + 1}/{total_number_of_updates_needed}"
except Exception as e:
logger.error(
f"[Leveling _update_member_profile_data()] unable to update the member profile"
f" data in the database for member {member} with id [{updated_user_id}] and updated_user_log_id"
f" = {updated_user_log_id} {index + 1}/{total_number_of_updates_needed} due to error:\n{e}"
)
await self.user_points[member.id].async_save()
return user_updated

@app_commands.command(name="reset_attempts")
@app_commands.checks.has_any_role("Bot_manager")
Expand Down

0 comments on commit bea3b20

Please sign in to comment.