Skip to content

Commit

Permalink
fix(updater): fixed an issue with an excessively large context query …
Browse files Browse the repository at this point in the history
…for available tags in the hub.docker.com API
  • Loading branch information
orenlab committed Jan 13, 2025
1 parent 4ede5a3 commit 40f22b0
Show file tree
Hide file tree
Showing 2 changed files with 213 additions and 66 deletions.
199 changes: 142 additions & 57 deletions pytmbot/adapters/docker/updates.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,19 @@
UpdateResult: TypeAlias = Dict[str, Dict[str, List[dict]]]


class UpdaterStatus(Enum):
    """Outcome of an update check against the Docker Hub API."""
    SUCCESS = auto()       # Check completed; results are carried in UpdaterResponse.data.
    RATE_LIMITED = auto()  # Docker Hub returned HTTP 429; caller should retry later.
    ERROR = auto()         # Any other failure; details in UpdaterResponse.message.


@dataclass
class UpdaterResponse:
    """Result envelope returned by the updater's check operations."""
    # Overall outcome of the check (see UpdaterStatus).
    status: UpdaterStatus
    # Human-readable summary of the outcome, suitable for user-facing display.
    message: str
    # Payload: per-repository update info on SUCCESS, retry hints when
    # rate-limited, or None when there is nothing to report.
    data: Optional[Dict] = None


class TagType(Enum):
SEMVER = auto()
DATE = auto()
Expand Down Expand Up @@ -182,37 +195,45 @@ async def _fetch_remote_tags(

tags_info: List[EnhancedTagInfo] = []

async def fetch_with_pagination(_url: str) -> None:
async def fetch_first_page(_url: str) -> Optional[bool]:
try:
while _url:
async with session.get(_url, timeout=10) as response:
if response.status == 429:
retry_after = int(response.headers.get("Retry-After", "5"))
self._log.warning(f"Rate limit hit, waiting {retry_after} seconds")
await asyncio.sleep(retry_after)
continue

response.raise_for_status()
data = await response.json()

new_tags = [
self.analyzer.analyze_tag(TagInfo(
name=result["name"],
created_at=result.get("tag_last_pushed", ""),
digest=result.get("digest")
))
for result in data.get("results", [])
]
tags_info.extend(new_tags)
self._log.debug(f"Fetched {len(new_tags)} tags from {_url}")

_url = data.get("next")
async with session.get(_url, timeout=10) as response:
if response.status == 429:
retry_after = int(response.headers.get("Retry-After", "3600"))
self._log.warning(f"Rate limit exceeded, retry after {retry_after} seconds")
raise aiohttp.ClientResponseError(
request_info=response.request_info,
history=response.history,
status=429,
message=f"Rate limit exceeded, retry after {retry_after} seconds"
)

response.raise_for_status()
data = await response.json()

new_tags = [
self.analyzer.analyze_tag(TagInfo(
name=result["name"],
created_at=result.get("tag_last_pushed", ""),
digest=result.get("digest")
))
for result in data.get("results", [])
]
tags_info.extend(new_tags)
self._log.debug(f"Fetched {len(new_tags)} tags from {_url}")
return False # Indicate successful fetch

except aiohttp.ClientResponseError as e:
if e.status == 429:
raise
self._log.warning(f"Unable to fetch tags from {_url}: {e}")
return None
except (aiohttp.ClientError, json.JSONDecodeError) as e:
self._log.warning(f"Unable to fetch tags from {_url}: {e}")
return None

for url in base_urls:
await fetch_with_pagination(url)
await fetch_first_page(url)
if tags_info:
self.tag_cache[repo] = tags_info
self._log.info(f"Successfully cached {len(tags_info)} tags for {repo}")
Expand Down Expand Up @@ -268,45 +289,109 @@ async def _find_compatible_updates(
self._log.debug(f"Found {len(updates)} compatible updates")
return updates

async def _check_updates(self) -> UpdateResult:
async def _check_updates(self) -> UpdaterResponse:
"""Check for updates across all repositories."""
with self._log.context(action="check_updates"):
updates: UpdateResult = {}

async with aiohttp.ClientSession() as session:
for repo, local_tags in self.local_images.items():
self._log.info(f"Checking updates for repository: {repo}")
remote_tags = await self._fetch_remote_tags(session, repo)

repo_updates = []
for local_tag_info in local_tags:
local_enhanced = self.analyzer.analyze_tag(TagInfo(
name=local_tag_info["tag"],
created_at=local_tag_info["created_at"],
digest=local_tag_info["digest"]
))

compatible_updates = await self._find_compatible_updates(
local_enhanced, remote_tags
)
repo_updates.extend(compatible_updates)

repo_updates.sort(
key=lambda x: isoparse(x.created_at_remote),
reverse=True
)
updates[repo] = {
"updates": [update.to_dict() for update in repo_updates[:5]]
}
self._log.info(f"Found {len(repo_updates)} updates for {repo}")
try:
async with aiohttp.ClientSession() as session:
updates = {}
for repo, local_tags in self.local_images.items():
self._log.info(f"Checking updates for repository: {repo}")
try:
remote_tags = await self._fetch_remote_tags(session, repo)
except aiohttp.ClientResponseError as e:
if e.status == 429:
return UpdaterResponse(
status=UpdaterStatus.RATE_LIMITED,
message="Docker Hub API rate limit exceeded. Please try again later.",
data={"retry_after": "3600"}
)
raise

repo_updates = []
for local_tag_info in local_tags:
local_enhanced = self.analyzer.analyze_tag(TagInfo(
name=local_tag_info["tag"],
created_at=local_tag_info["created_at"],
digest=local_tag_info["digest"]
))

compatible_updates = await self._find_compatible_updates(
local_enhanced, remote_tags
)
repo_updates.extend(compatible_updates)

return updates
repo_updates.sort(
key=lambda x: isoparse(x.created_at_remote),
reverse=True
)
updates[repo] = {
"updates": [update.to_dict() for update in repo_updates[:5]]
}
self._log.info(f"Found {len(repo_updates)} updates for {repo}")

return UpdaterResponse(
status=UpdaterStatus.SUCCESS,
message="Successfully checked for updates",
data=updates
)

except Exception as e:
self._log.error(f"Error checking for updates: {e}")
return UpdaterResponse(
status=UpdaterStatus.ERROR,
message=f"Error checking for updates: {str(e)}"
)

def to_json(self) -> str:
"""Returns the update check result in JSON format."""
"""
Returns the update check result in JSON format.
Returns:
str: JSON string containing the update check results or error information.
Example successful response:
{
"status": "SUCCESS",
"message": "Successfully checked for updates",
"data": {
"nginx": {
"updates": [
{
"current_tag": "1.24.0",
"newer_tag": "1.24.1",
"created_at_local": "2023-05-24T12:00:00Z",
"created_at_remote": "2023-06-01T15:30:00Z",
"current_digest": "sha256:abcd..."
}
]
}
}
}
Example rate limit response:
{
"status": "RATE_LIMITED",
"message": "Docker Hub API rate limit exceeded. Please try again later.",
"data": {
"retry_after": "3600"
}
}
Example error response:
{
"status": "ERROR",
"message": "Error checking for updates: Connection timeout",
"data": null
}
"""
with self._log.context(action="to_json"):
result = asyncio.run(self._check_updates())
return json.dumps(result, indent=4)
return json.dumps({
"status": result.status.name,
"message": result.message,
"data": result.data
}, indent=4)


if __name__ == "__main__":
Expand Down
80 changes: 71 additions & 9 deletions pytmbot/handlers/docker_handlers/inline/image_updates.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
import json
from typing import Dict, List, Union

from telebot import TeleBot
from telebot.types import CallbackQuery

from pytmbot.adapters.docker.updates import DockerImageUpdater
from pytmbot.adapters.docker.updates import DockerImageUpdater, UpdaterStatus
from pytmbot.logs import Logger
from pytmbot.parsers.compiler import Compiler

logger = Logger()


# func=lambda call: call.data.startswith('__check_updates__')
@logger.catch()
@logger.session_decorator
def handle_image_updates(call: CallbackQuery, bot: TeleBot):
"""
Handles the callback for images updates.
Handles the callback for Docker image updates.
Args:
call (CallbackQuery): The callback query object.
Expand All @@ -27,17 +27,37 @@ def handle_image_updates(call: CallbackQuery, bot: TeleBot):
updater = DockerImageUpdater()
updater.initialize()

context_json = updater.to_json()
context = json.loads(context_json)
response_json = updater.to_json()
response = json.loads(response_json)

if not context or all(not image_info["updates"] for image_info in context.values()):
# Handle rate limit
if response["status"] == UpdaterStatus.RATE_LIMITED.name:
return bot.answer_callback_query(
call.id,
text=f"Rate limit exceeded. Please try again in {response['data']['retry_after']} seconds.",
show_alert=True,
)

# Handle errors
if response["status"] == UpdaterStatus.ERROR.name:
return bot.answer_callback_query(
call.id,
text=f"Error checking updates: {response['message']}",
show_alert=True,
)

# Handle success with no updates
if not response["data"] or all(
not image_info["updates"] for image_info in response["data"].values()
):
return bot.answer_callback_query(
call.id,
text="No updates found for any images.",
show_alert=True,
)

prepared_context = prepare_context_for_render(context)
# Process updates
prepared_context = prepare_context_for_render(response["data"])

with Compiler(template_name="d_updates.jinja2", **prepared_context) as compiler:
formatted_context = compiler.compile()
Expand All @@ -50,11 +70,53 @@ def handle_image_updates(call: CallbackQuery, bot: TeleBot):
)


def prepare_context_for_render(context):
def prepare_context_for_render(
data: Dict[str, Dict[str, List[dict]]]
) -> Dict[str, Union[Dict[str, Dict], List[str]]]:
"""
Prepares the context for template rendering.
Args:
data: Dictionary containing update information for repositories
Returns:
Dictionary with prepared context for rendering
Example input:
{
"nginx": {
"updates": [
{
"current_tag": "1.24.0",
"newer_tag": "1.24.1",
"created_at_local": "2023-05-24T12:00:00Z",
"created_at_remote": "2023-06-01T15:30:00Z"
}
]
}
}
Example output:
{
"updates": {
"nginx": {
"current_tag": "1.24.0",
"created_at_local": "2023-05-24T12:00:00Z",
"updates": [
{
"newer_tag": "1.24.1",
"created_at_remote": "2023-06-01T15:30:00Z"
}
]
}
},
"no_updates": []
}
"""
updates = {}
no_updates = []

for repo, info in context.items():
for repo, info in data.items():
if not info["updates"]:
no_updates.append(repo)
continue
Expand Down

0 comments on commit 40f22b0

Please sign in to comment.