Skip to content

Commit

Permalink
fsapp: add clean_pidbox for cleaning up unack'd replies
Browse files Browse the repository at this point in the history
  • Loading branch information
pmrowla committed Apr 27, 2023
1 parent 358a100 commit e07e276
Showing 1 changed file with 25 additions and 4 deletions.
29 changes: 25 additions & 4 deletions src/dvc_task/app/filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,10 +244,10 @@ def _delete_expired(
cache: Dict[str, str],
include_tickets: bool = False,
):
assert isinstance(msg.properties, dict)
properties = cast(Dict[str, Any], msg.properties)
delivery_info: Dict[str, str] = properties.get("delivery_info", {})
if queues:
assert isinstance(msg.properties, dict)
properties = cast(Dict[str, Any], msg.properties)
delivery_info: Dict[str, str] = properties.get("delivery_info", {})
routing_key = delivery_info.get("routing_key")
if routing_key and routing_key in queues:
return
Expand All @@ -256,7 +256,10 @@ def _delete_expired(
ticket = msg.headers.get("ticket")
if include_tickets and ticket or (expires is not None and expires <= now):
assert msg.delivery_tag
self._delete_msg(msg.delivery_tag, [], cache)
try:
self._delete_msg(msg.delivery_tag, [], cache)
except ValueError:
pass

queues = set(exclude) if exclude else set()
now = datetime.now().timestamp()
Expand All @@ -270,3 +273,21 @@ def _delete_expired(
def clean(self):
"""Clean extraneous celery messages from this FSApp."""
self._gc(exclude=[self.conf.task_default_queue])
self._clean_pidbox(f"reply.{self.conf.task_default_queue}.pidbox")

def _clean_pidbox(self, exchange: str):
"""Clean pidbox replies for the specified exchange."""

def _delete_replies(msg: Message, exchange: str, cache: Dict[str, str]):
assert isinstance(msg.properties, dict)
properties = cast(Dict[str, Any], msg.properties)
delivery_info: Dict[str, str] = properties.get("delivery_info", {})
if delivery_info.get("exchange", "") == exchange:
assert msg.delivery_tag
try:
self._delete_msg(msg.delivery_tag, [], cache)
except ValueError:
pass

for msg in self._iter_data_folder():
_delete_replies(msg, exchange, self._queued_msg_path_cache)

0 comments on commit e07e276

Please sign in to comment.