Skip to content

Commit

Permalink
Merge pull request #778 from AlexsLemonade/kurtwheeler/fix-janitor
Browse files Browse the repository at this point in the history
Makes the janitor use the correct id so it doesn't delete literally e…
  • Loading branch information
kurtwheeler authored Nov 6, 2018
2 parents d55616d + f291de2 commit 7a1e964
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 20 deletions.
28 changes: 15 additions & 13 deletions workers/data_refinery_workers/processors/janitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,24 +51,26 @@ def _find_and_remove_expired_jobs(job_context):
# Okay, does this job exist?
try:
job = ProcessorJob.objects.get(id=job_id)
except Exception:
pass

# Is this job running?
try:
job_status = nomad_client.job.get_job(job_id)["Status"]
# Is this job running?
try:
job_status = nomad_client.job.get_job(job.nomad_job_id)["Status"]

# This job is running, don't delete the working directory.
if job_status == "running":
# This job is running, don't delete the working directory.
if job_status == "running":
continue
except BaseNomadException as e:
# If we can't currently access Nomad,
# just continue until we can again.
continue
except BaseNomadException as e:
# If we can't currently access Nomad,
# just continue until we can again.
continue
except Exception as e:
# This job is likely vanished. No need for this directory.
except Exception as e:
# This job is likely vanished. No need for this directory.
pass
except Exception:
# If we don't have a record of the job, we don't need the directory.
pass


# Delete it!
try:
to_delete = LOCAL_ROOT_DIR + '/' + item
Expand Down
56 changes: 49 additions & 7 deletions workers/data_refinery_workers/processors/test_janitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import zipfile

from io import StringIO
from unittest.mock import patch, call
from unittest.mock import patch, call, MagicMock
from nomad import Nomad
from nomad.api.exceptions import URLNotFoundNomadException

Expand Down Expand Up @@ -38,8 +38,27 @@

def prepare_job():

# Create 10 job directories
for i in range(0, JOBS):
os.makedirs(LOCAL_ROOT_DIR + '/' + 'processor_job_' + str(i), exist_ok=True)
os.makedirs(LOCAL_ROOT_DIR + '/processor_job_' + str(i), exist_ok=True)

# Create a job out of the range with index in it to make sure we
# don't delete index directories since that's where transcriptome
# indices get downloaded to.
os.makedirs(LOCAL_ROOT_DIR + '/processor_job_' + str(JOBS+1) + '_index', exist_ok=True)

# Save two jobs so that we trigger two special circumstances, one
# where the job is still running and the other where querying
# nomad raises an exception.
pj = ProcessorJob()
pj.pipeline_applied = "SALMON"
pj.nomad_job_id = "running_job"
pj.save()

pj = ProcessorJob()
pj.pipeline_applied = "SALMON"
pj.nomad_job_id = "missing_job"
pj.save()

pj = ProcessorJob()
pj.pipeline_applied = "JANITOR"
Expand All @@ -50,13 +69,36 @@ def prepare_job():
class JanitorTestCase(TestCase):

@tag("janitor")
@patch.object(Nomad, 'job')
def test_janitor(self, mock_get_job):
@patch('data_refinery_workers.processors.janitor.Nomad')
def test_janitor(self, mock_nomad):
""" Main tester. """
def mock_get_job(job_id: str):
if job_id == "running_job":
return {"Status": "running"}
elif job_id == "missing_job":
raise URLNotFoundNomadException()
else:
return {"Status": "dead"}

def mock_init_nomad(host, port=0, timeout=0):
ret_value = MagicMock()
ret_value.job = MagicMock()
ret_value.job.get_job = MagicMock()
ret_value.job.get_job.side_effect = mock_get_job
return ret_value

mock_nomad.side_effect = mock_init_nomad
job = prepare_job()
final_context = janitor.run_janitor(job.pk)

for i in range(0, JOBS):
self.assertFalse(os.path.exists(LOCAL_ROOT_DIR + '/' + 'processor_job_' + str(i)))
# The job with id 1 should appear running.
if i == 1:
self.assertTrue(os.path.exists(LOCAL_ROOT_DIR + '/processor_job_' + str(i)))
else:
self.assertFalse(os.path.exists(LOCAL_ROOT_DIR + '/processor_job_' + str(i)))

self.assertTrue(os.path.exists(LOCAL_ROOT_DIR + '/processor_job_11_index'))

self.assertEqual(len(final_context['deleted_items']), 10)
# Deleted all the working directories except for the one that's still running.
self.assertEqual(len(final_context['deleted_items']), JOBS-1)

0 comments on commit 7a1e964

Please sign in to comment.