Skip to content

Commit

Permalink
Ensure that _external_ids keys are strings
Browse files Browse the repository at this point in the history
We had a lot of `Failed to find external id for job_id` messages on
rockfish, and I think that might have been part of the issue.
  • Loading branch information
mvdbeek committed Jun 18, 2024
1 parent 12f1349 commit 60abc18
Showing 1 changed file with 9 additions and 5 deletions.
14 changes: 9 additions & 5 deletions pulsar/managers/base/external.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
import logging
from string import Template
from typing import (
Dict,
Any,
)

from pulsar.managers import status
from .directory import DirectoryBaseManager
Expand All @@ -18,7 +22,7 @@ class ExternalBaseManager(DirectoryBaseManager):

def __init__(self, name, app, **kwds):
super().__init__(name, app, **kwds)
self._external_ids = {}
self._external_ids: Dict[str, Any] = {}
self.job_name_template = kwds.get('job_name_template', DEFAULT_JOB_NAME_TEMPLATE)

def clean(self, job_id):
Expand Down Expand Up @@ -46,11 +50,11 @@ def _register_external_id(self, job_id, external_id):
if isinstance(external_id, bytes):
external_id = external_id.decode("utf-8")
self._job_directory(job_id).store_metadata(JOB_FILE_EXTERNAL_ID, external_id)
self._external_ids[job_id] = external_id
self._external_ids[str(job_id)] = external_id
return external_id

def _external_id(self, job_id):
return self._external_ids.get(job_id, None)
return self._external_ids.get(str(job_id), None)

def _job_name(self, job_id):
env = self._job_template_env(job_id)
Expand All @@ -59,9 +63,9 @@ def _job_name(self, job_id):
def _recover_active_job(self, job_id):
external_id = self._job_directory(job_id).load_metadata(JOB_FILE_EXTERNAL_ID, FAILED_TO_LOAD_EXTERNAL_ID)
if external_id and external_id is not FAILED_TO_LOAD_EXTERNAL_ID:
self._external_ids[job_id] = external_id
self._external_ids[str(job_id)] = external_id
else:
raise Exception("Could not determine external ID for job_id [%s]" % job_id)

def _deactivate_job(self, job_id):
del self._external_ids[job_id]
del self._external_ids[str(job_id)]

0 comments on commit 60abc18

Please sign in to comment.