Merge pull request #838 from AlexsLemonade/mis/maxjobs
Set Limit To Maximum Concurrent Survey Jobs
Rich Jones authored Nov 15, 2018
2 parents dc1c2e0 + 775c299 commit 48784a6
Showing 5 changed files with 208 additions and 101 deletions.
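
The change gates every requeue path on a job ceiling: before dispatching more work, the foreman asks Nomad how many jobs it already knows about and compares that count to MAX_TOTAL_JOBS (default 10000). Below is a minimal sketch of that check; it uses os.environ directly instead of the project's get_env_variable_gracefully helper, which is an assumption made only to keep the sketch self-contained.

import os
import nomad

DEFAULT_MAX_JOBS = 10000  # fallback when MAX_TOTAL_JOBS is unset

def under_job_ceiling() -> bool:
    """Return True if Nomad is running fewer jobs than MAX_TOTAL_JOBS allows."""
    max_total_jobs = int(os.environ.get("MAX_TOTAL_JOBS", DEFAULT_MAX_JOBS))
    nomad_host = os.environ.get("NOMAD_HOST", "localhost")
    nomad_port = int(os.environ.get("NOMAD_PORT", "4646"))
    nomad_client = nomad.Nomad(nomad_host, port=nomad_port, timeout=30)
    # get_jobs() returns every job Nomad knows about; the foreman treats
    # that count as the measure of current load.
    return len(nomad_client.jobs.get_jobs()) < max_total_jobs
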
59 changes: 37 additions & 22 deletions common/data_refinery_common/message_queue.py
@@ -4,7 +4,7 @@
from enum import Enum
import nomad
from nomad.api.exceptions import URLNotFoundNomadException
from data_refinery_common.utils import get_env_variable, get_volume_index
from data_refinery_common.utils import get_env_variable, get_env_variable_gracefully, get_volume_index
from data_refinery_common.models import ProcessorJob, SurveyJob
from data_refinery_common.job_lookup import ProcessorPipeline, Downloaders, SurveyJobTypes
from data_refinery_common.logging import get_and_configure_logger
@@ -17,9 +17,14 @@
NOMAD_TRANSCRIPTOME_JOB = "TRANSCRIPTOME_INDEX"
NOMAD_DOWNLOADER_JOB = "DOWNLOADER"
NONE_JOB_ERROR_TEMPLATE = "send_job was called with NONE job_type: {} for {} job {}"
RUNNING_IN_CLOUD = get_env_variable_gracefully("RUNNING_IN_CLOUD", "False")
if RUNNING_IN_CLOUD == "False":
RUNNING_IN_CLOUD = False
else:
RUNNING_IN_CLOUD = True


def send_job(job_type: Enum, job) -> bool:
def send_job(job_type: Enum, job, is_dispatch=False) -> bool:
"""Queues a worker job by sending a Nomad Job dispatch message.
job_type must be a valid Enum for ProcessorPipelines or
@@ -32,6 +37,7 @@ def send_job(job_type: Enum, job) -> bool:
nomad_port = get_env_variable("NOMAD_PORT", "4646")
nomad_client = nomad.Nomad(nomad_host, port=int(nomad_port), timeout=30)

is_processor = True
if job_type is ProcessorPipeline.TRANSCRIPTOME_INDEX_LONG \
or job_type is ProcessorPipeline.TRANSCRIPTOME_INDEX_SHORT:
nomad_job = NOMAD_TRANSCRIPTOME_JOB
@@ -60,16 +66,18 @@ def send_job(job_type: Enum, job) -> bool:
elif job_type is ProcessorPipeline.AGILENT_TWOCOLOR_TO_PCL:
# Agilent twocolor uses the same job specification as Affy.
nomad_job = ProcessorPipeline.AFFY_TO_PCL.value
elif job_type in list(Downloaders):
nomad_job = NOMAD_DOWNLOADER_JOB
is_processor = False
elif job_type in list(SurveyJobTypes):
nomad_job = job_type.value
is_processor = False
elif job_type is Downloaders.NONE:
logger.warn("Not queuing %s job.", job_type, job_id=job_id)
raise ValueError(NONE_JOB_ERROR_TEMPLATE.format(job_type.value, "Downloader", job_id))
elif job_type is ProcessorPipeline.NONE:
logger.warn("Not queuing %s job.", job_type, job_id=job_id)
raise ValueError(NONE_JOB_ERROR_TEMPLATE.format(job_type.value, "Processor", job_id))
elif job_type in list(Downloaders):
nomad_job = NOMAD_DOWNLOADER_JOB
elif job_type in list(SurveyJobTypes):
nomad_job = job_type.value
else:
raise ValueError("Invalid job_type: {}".format(job_type.value))

@@ -93,20 +101,27 @@ def send_job(job_type: Enum, job) -> bool:
elif isinstance(job, SurveyJob):
nomad_job = nomad_job + "_" + str(job.ram_amount)

try:
nomad_response = nomad_client.job.dispatch_job(nomad_job, meta={"JOB_NAME": job_type.value,
"JOB_ID": str(job.id)})
job.nomad_job_id = nomad_response["DispatchedJobID"]
# We only want to dispatch processor jobs directly.
# Everything else will be handled by the Foreman, which will increment the retry counter.
if is_processor or is_dispatch or (not RUNNING_IN_CLOUD):
try:
nomad_response = nomad_client.job.dispatch_job(nomad_job, meta={"JOB_NAME": job_type.value,
"JOB_ID": str(job.id)})
job.nomad_job_id = nomad_response["DispatchedJobID"]
job.save()
return True
except URLNotFoundNomadException:
logger.error("Dispatching Nomad job of type %s for job spec %s to host %s and port %s failed.",
job_type, nomad_job, nomad_host, nomad_port, job=str(job.id))
return False
except Exception as e:
logger.exception('Unable to Dispatch Nomad Job.',
job_name=job_type.value,
job_id=str(job.id),
reason=str(e)
)
raise
else:
job.num_retries = job.num_retries - 1
job.save()
return True
except URLNotFoundNomadException:
logger.error("Dispatching Nomad job of type %s for job spec %s to host %s and port %s failed.",
job_type, nomad_job, nomad_host, nomad_port, job=str(job.id))
return False
except Exception as e:
logger.exception('Unable to Dispatch Nomad Job.',
job_name=job_type.value,
job_id=str(job.id),
reason=str(e)
)
raise
return True
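
To summarize the new send_job() behavior: processor jobs, explicit foreman dispatches (is_dispatch=True), and local (non-cloud) runs still go straight to Nomad, while downloader and surveyor jobs created in the cloud are left for the foreman, with num_retries pre-decremented so the foreman's eventual requeue leaves the count unchanged. A simplified, hedged sketch of just that branch follows; dispatch_fn stands in for the Nomad dispatch-and-save logic above and is not a real project function.

def dispatch_or_defer(job, is_processor, is_dispatch, running_in_cloud, dispatch_fn):
    """Dispatch immediately only when allowed; otherwise defer to the foreman."""
    if is_processor or is_dispatch or not running_in_cloud:
        # dispatch_fn wraps nomad_client.job.dispatch_job(), records the
        # DispatchedJobID on the job, and saves it, as in the diff above.
        return dispatch_fn(job)
    # The foreman will later requeue this job with num_retries + 1, so
    # decrement now to keep the retry count net-neutral.
    job.num_retries = job.num_retries - 1
    job.save()
    return True
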
109 changes: 93 additions & 16 deletions foreman/data_refinery_foreman/foreman/main.py
@@ -32,6 +32,9 @@
# greater than this because of the first attempt
MAX_NUM_RETRIES = 2

# This can be overridden by the env var "MAX_TOTAL_JOBS"
DEFAULT_MAX_JOBS = 10000

# The fastest each thread will repeat its checks.
# Could be slower if the thread takes longer than this to check its jobs.
MIN_LOOP_TIME = timedelta(minutes=2)
@@ -121,7 +124,7 @@ def requeue_downloader_job(last_job: DownloaderJob) -> None:
last_job.id,
new_job.id)
try:
if send_job(Downloaders[last_job.downloader_task], new_job):
if send_job(Downloaders[last_job.downloader_task], job=new_job, is_dispatch=True):
last_job.retried = True
last_job.success = False
last_job.retried_job = new_job
@@ -139,12 +142,34 @@ def requeue_downloader_job(last_job: DownloaderJob) -> None:

def handle_downloader_jobs(jobs: List[DownloaderJob]) -> None:
"""For each job in jobs, either retry it or log it."""
for job in jobs:

nomad_host = get_env_variable("NOMAD_HOST")
nomad_port = get_env_variable("NOMAD_PORT", "4646")
nomad_client = Nomad(nomad_host, port=int(nomad_port), timeout=30)
# Maximum number of total jobs running at a time.
# We do this now rather than at import time for testing purposes.
MAX_TOTAL_JOBS = int(get_env_variable_gracefully("MAX_TOTAL_JOBS", DEFAULT_MAX_JOBS))
len_all_jobs = len(nomad_client.jobs.get_jobs())
if len_all_jobs >= MAX_TOTAL_JOBS:
logger.info("Not requeuing job until we're running fewer jobs.")
return False

jobs_dispatched = 0
for count, job in enumerate(jobs):
if job.num_retries < MAX_NUM_RETRIES:
requeue_downloader_job(job)
jobs_dispatched = jobs_dispatched + 1
else:
handle_repeated_failure(job)

if (count % 100) == 0:
len_all_jobs = len(nomad_client.jobs.get_jobs())

if (jobs_dispatched + len_all_jobs) >= MAX_TOTAL_JOBS:
logger.info("We hit the maximum total jobs ceiling, so we're not handling any more downloader jobs now.")
return False

return True

@do_forever(MIN_LOOP_TIME)
def retry_failed_downloader_jobs() -> None:
@@ -166,7 +191,7 @@ def retry_hung_downloader_jobs() -> None:

nomad_host = get_env_variable("NOMAD_HOST")
nomad_port = get_env_variable("NOMAD_PORT", "4646")
nomad_client = Nomad(nomad_host, port=int(nomad_port), timeout=5)
nomad_client = Nomad(nomad_host, port=int(nomad_port), timeout=30)
hung_jobs = []
for job in potentially_hung_jobs:
try:
@@ -205,7 +230,7 @@ def retry_lost_downloader_jobs() -> None:

nomad_host = get_env_variable("NOMAD_HOST")
nomad_port = get_env_variable("NOMAD_PORT", "4646")
nomad_client = Nomad(nomad_host, port=int(nomad_port), timeout=5)
nomad_client = Nomad(nomad_host, port=int(nomad_port), timeout=30)
lost_jobs = []
for job in potentially_lost_jobs:
try:
@@ -288,7 +313,7 @@ def requeue_processor_job(last_job: ProcessorJob) -> None:
logger.info("Requeuing Processor Job which had ID %d with a new Processor Job with ID %d.",
last_job.id,
new_job.id)
if send_job(ProcessorPipeline[last_job.pipeline_applied], new_job):
if send_job(ProcessorPipeline[last_job.pipeline_applied], job=new_job, is_dispatch=True):
last_job.retried = True
last_job.success = False
last_job.retried_job = new_job
@@ -306,12 +331,34 @@ def requeue_processor_job(last_job: ProcessorJob) -> None:

def handle_processor_jobs(jobs: List[ProcessorJob]) -> None:
"""For each job in jobs, either retry it or log it."""
for job in jobs:

nomad_host = get_env_variable("NOMAD_HOST")
nomad_port = get_env_variable("NOMAD_PORT", "4646")
nomad_client = Nomad(nomad_host, port=int(nomad_port), timeout=30)
# Maximum number of total jobs running at a time.
# We do this now rather than at import time for testing purposes.
MAX_TOTAL_JOBS = int(get_env_variable_gracefully("MAX_TOTAL_JOBS", DEFAULT_MAX_JOBS))
len_all_jobs = len(nomad_client.jobs.get_jobs())
if len_all_jobs >= MAX_TOTAL_JOBS:
logger.info("Not requeuing job until we're running fewer jobs.")
return False

jobs_dispatched = 0
for count, job in enumerate(jobs):
if job.num_retries < MAX_NUM_RETRIES:
requeue_processor_job(job)
jobs_dispatched = jobs_dispatched + 1
else:
handle_repeated_failure(job)

if (count % 100) == 0:
len_all_jobs = len(nomad_client.jobs.get_jobs())

if (jobs_dispatched + len_all_jobs) >= MAX_TOTAL_JOBS:
logger.info("We hit the maximum total jobs ceiling, so we're not handling any more processor jobs now.")
return False

return True

@do_forever(MIN_LOOP_TIME)
def retry_failed_processor_jobs() -> None:
@@ -338,7 +385,7 @@ def retry_hung_processor_jobs() -> None:

nomad_host = get_env_variable("NOMAD_HOST")
nomad_port = get_env_variable("NOMAD_PORT", "4646")
nomad_client = Nomad(nomad_host, port=int(nomad_port), timeout=5)
nomad_client = Nomad(nomad_host, port=int(nomad_port), timeout=30)
hung_jobs = []
for job in potentially_hung_jobs:
try:
@@ -425,6 +472,8 @@ def requeue_survey_job(last_job: SurveyJob) -> None:
The new survey job will have num_retries one greater than
last_job.num_retries.
"""

lost_jobs = []
num_retries = last_job.num_retries + 1

new_job = SurveyJob(num_retries=num_retries,
@@ -453,7 +502,7 @@ def requeue_survey_job(last_job: SurveyJob) -> None:
new_job.id)

try:
if send_job(SurveyJobTypes.SURVEYOR, new_job):
if send_job(SurveyJobTypes.SURVEYOR, job=new_job, is_dispatch=True):
last_job.retried = True
last_job.success = False
last_job.retried_job = new_job
@@ -468,20 +517,44 @@ def requeue_survey_job(last_job: SurveyJob) -> None:
# Can't communicate with nomad just now, leave the job for a later loop.
new_job.delete()

return True


def handle_survey_jobs(jobs: List[SurveyJob]) -> None:
"""For each job in jobs, either retry it or log it."""
for job in jobs:

nomad_host = get_env_variable("NOMAD_HOST")
nomad_port = get_env_variable("NOMAD_PORT", "4646")
nomad_client = Nomad(nomad_host, port=int(nomad_port), timeout=30)
# Maximum number of total jobs running at a time.
# We do this now rather than at import time for testing purposes.
MAX_TOTAL_JOBS = int(get_env_variable_gracefully("MAX_TOTAL_JOBS", DEFAULT_MAX_JOBS))
len_all_jobs = len(nomad_client.jobs.get_jobs())
if len_all_jobs >= MAX_TOTAL_JOBS:
logger.info("Not requeuing job until we're running fewer jobs.")
return False

jobs_dispatched = 0
for count, job in enumerate(jobs):
if job.num_retries < MAX_NUM_RETRIES:
requeue_survey_job(job)
jobs_dispatched = jobs_dispatched + 1
else:
handle_repeated_failure(job)

if (count % 100) == 0:
len_all_jobs = len(nomad_client.jobs.get_jobs())

if (jobs_dispatched + len_all_jobs) >= MAX_TOTAL_JOBS:
logger.info("We hit the maximum total jobs ceiling, so we're not handling any more survey jobs now.")
return False

return True

@do_forever(MIN_LOOP_TIME)
def retry_failed_survey_jobs() -> None:
"""Handle survey jobs that were marked as a failure."""
failed_jobs = SurveyJob.objects.filter(success=False, retried=False)
failed_jobs = SurveyJob.objects.filter(success=False, retried=False).order_by('pk')
if failed_jobs:
logger.info(
"Handling failed (explicitly-marked-as-failure) jobs!",
@@ -499,11 +572,11 @@ def retry_hung_survey_jobs() -> None:
end_time=None,
start_time__isnull=False,
no_retry=False
)
).order_by('pk')

nomad_host = get_env_variable("NOMAD_HOST")
nomad_port = get_env_variable("NOMAD_PORT", "4646")
nomad_client = Nomad(nomad_host, port=int(nomad_port), timeout=5)
nomad_client = Nomad(nomad_host, port=int(nomad_port), timeout=30)
hung_jobs = []
for job in potentially_hung_jobs:
try:
@@ -541,12 +614,13 @@ def retry_lost_survey_jobs() -> None:
start_time=None,
end_time=None,
no_retry=False
)
).order_by('pk')

nomad_host = get_env_variable("NOMAD_HOST")
nomad_port = get_env_variable("NOMAD_PORT", "4646")
nomad_client = Nomad(nomad_host, port=int(nomad_port), timeout=5)
nomad_client = Nomad(nomad_host, port=int(nomad_port), timeout=30)
lost_jobs = []

for job in potentially_lost_jobs:
try:
# Surveyor jobs didn't always have nomad_job_ids. If they
@@ -583,7 +657,7 @@ def retry_lost_survey_jobs() -> None:
handle_survey_jobs(lost_jobs)

##
# Main loop
# Janitor
##

@do_forever(JANITOR_DISPATCH_TIME)
@@ -605,11 +679,14 @@ def send_janitor_jobs():
index=actual_index
)
try:
send_job(ProcessorPipeline["JANITOR"], new_job)
send_job(ProcessorPipeline["JANITOR"], job=new_job, is_dispatch=True)
except Exception as e:
# If we can't dispatch this job, something else has gone wrong.
continue

##
# Main loop
##

def monitor_jobs():
"""Runs a thread for each job monitoring loop."""
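
handle_downloader_jobs, handle_processor_jobs, and handle_survey_jobs now all open with the same ceiling check and run the same dispatch loop. One way the repeated pattern could be factored into a shared helper is sketched below; this function is illustrative only and is not part of the commit.

from typing import Callable, List

def handle_jobs_with_ceiling(jobs: List, requeue: Callable, handle_repeated_failure: Callable,
                             nomad_client, max_total_jobs: int, max_num_retries: int) -> bool:
    """Requeue jobs until doing so would push Nomad past the job ceiling."""
    len_all_jobs = len(nomad_client.jobs.get_jobs())
    if len_all_jobs >= max_total_jobs:
        return False

    jobs_dispatched = 0
    for count, job in enumerate(jobs):
        if job.num_retries < max_num_retries:
            requeue(job)
            jobs_dispatched += 1
        else:
            handle_repeated_failure(job)

        # Refresh the cluster-wide count only every 100 jobs to avoid
        # querying Nomad on every iteration.
        if count % 100 == 0:
            len_all_jobs = len(nomad_client.jobs.get_jobs())

        if jobs_dispatched + len_all_jobs >= max_total_jobs:
            return False

    return True
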
16 changes: 15 additions & 1 deletion foreman/data_refinery_foreman/foreman/test_main.py
@@ -14,7 +14,7 @@
ProcessorJobOriginalFileAssociation,
ProcessorJobDatasetAssociation,
)

from test.support import EnvironmentVarGuard # Python >=3

class ForemanTestCase(TestCase):
def create_downloader_job(self):
@@ -605,6 +605,20 @@ def test_repeated_survey_failures(self, mock_send_job):
self.assertEqual(last_job.num_retries, main.MAX_NUM_RETRIES)
self.assertFalse(last_job.success)

# MAX TOTAL tests
self.env = EnvironmentVarGuard()
self.env.set('MAX_TOTAL_JOBS', '0')
with self.env:
job = self.create_survey_job()
result = main.handle_survey_jobs([job])
self.assertFalse(result)

self.env.set('MAX_TOTAL_JOBS', '1000')
with self.env:
job = self.create_survey_job()
result = main.requeue_survey_job(job)
self.assertTrue(result)

@patch('data_refinery_foreman.foreman.main.send_job')
def test_retrying_failed_survey_jobs(self, mock_send_job):
mock_send_job.return_value = True
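
The new test overrides MAX_TOTAL_JOBS with EnvironmentVarGuard from CPython's test.support package, which sets environment variables on entry to the with-block and restores them on exit. A small sketch of that pattern, assuming a Python 3 version where EnvironmentVarGuard still lives in test.support (later releases move it to test.support.os_helper):

from test.support import EnvironmentVarGuard

def with_job_ceiling(ceiling, fn):
    """Run fn() with MAX_TOTAL_JOBS temporarily set to ceiling."""
    env = EnvironmentVarGuard()
    env.set("MAX_TOTAL_JOBS", ceiling)
    with env:  # variable is set on entry and restored on exit
        return fn()
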
@@ -66,19 +66,15 @@ def queue_surveyor_for_accession(accession: str) -> None:
"""Dispatches a surveyor job for the accession code."""
# Start at 256MB of RAM for surveyor jobs.
survey_job = SurveyJob(ram_amount=256)

set_source_type_for_accession(survey_job, accession)

key_value_pair = SurveyJobKeyValue(survey_job=survey_job,
key="experiment_accession_code",
value=accession)
key_value_pair.save()

try:
send_job(SurveyJobTypes.SURVEYOR, survey_job)
except:
# If we can't dispatch this, then let the foreman retry it late.
pass
# We don't actually send the job here, we just create it.
# The foreman will pick it up and dispatch it when the time is appropriate.

class Command(BaseCommand):
def add_arguments(self, parser):
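
With the dispatch call removed, queue_surveyor_for_accession only persists the SurveyJob and its accession key-value pair; the foreman's retry_lost_survey_jobs loop later finds jobs with no start_time and requeues them, subject to the same ceiling. A hedged sketch of that lifecycle follows; the explicit survey_job.save() and the import path for SurveyJobKeyValue are assumptions made for self-containment, not verbatim project code.

from data_refinery_common.models import SurveyJob, SurveyJobKeyValue

def create_survey_job_for_later(accession):
    """Create a SurveyJob record; the foreman dispatches it later."""
    survey_job = SurveyJob(ram_amount=256)
    survey_job.save()  # assumption: persist before attaching key-values
    SurveyJobKeyValue(survey_job=survey_job,
                      key="experiment_accession_code",
                      value=accession).save()
    # No send_job() call here: retry_lost_survey_jobs() picks up jobs whose
    # start_time is None and requeues them, respecting MAX_TOTAL_JOBS.
    return survey_job
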