Set Limit To Maximum Concurrent Survey Jobs #838

Merged
merged 17 commits into from Nov 15, 2018

Changes from 13 commits
50 changes: 30 additions & 20 deletions common/data_refinery_common/message_queue.py
@@ -32,6 +32,7 @@ def send_job(job_type: Enum, job) -> bool:
nomad_port = get_env_variable("NOMAD_PORT", "4646")
nomad_client = nomad.Nomad(nomad_host, port=int(nomad_port), timeout=30)

is_processor = True
if job_type is ProcessorPipeline.TRANSCRIPTOME_INDEX_LONG \
or job_type is ProcessorPipeline.TRANSCRIPTOME_INDEX_SHORT:
nomad_job = NOMAD_TRANSCRIPTOME_JOB
@@ -60,16 +61,18 @@ def send_job(job_type: Enum, job) -> bool:
elif job_type is ProcessorPipeline.AGILENT_TWOCOLOR_TO_PCL:
# Agilent twocolor uses the same job specification as Affy.
nomad_job = ProcessorPipeline.AFFY_TO_PCL.value
elif job_type in list(Downloaders):
nomad_job = NOMAD_DOWNLOADER_JOB
is_processor = False
elif job_type in list(SurveyJobTypes):
nomad_job = job_type.value
is_processor = False
elif job_type is Downloaders.NONE:
logger.warn("Not queuing %s job.", job_type, job_id=job_id)
raise ValueError(NONE_JOB_ERROR_TEMPLATE.format(job_type.value, "Downloader", job_id))
elif job_type is ProcessorPipeline.NONE:
logger.warn("Not queuing %s job.", job_type, job_id=job_id)
raise ValueError(NONE_JOB_ERROR_TEMPLATE.format(job_type.value, "Processor", job_id))
elif job_type in list(Downloaders):
nomad_job = NOMAD_DOWNLOADER_JOB
elif job_type in list(SurveyJobTypes):
nomad_job = job_type.value
else:
raise ValueError("Invalid job_type: {}".format(job_type.value))

@@ -93,20 +96,27 @@ def send_job(job_type: Enum, job) -> bool:
elif isinstance(job, SurveyJob):
nomad_job = nomad_job + "_" + str(job.ram_amount)

try:
nomad_response = nomad_client.job.dispatch_job(nomad_job, meta={"JOB_NAME": job_type.value,
"JOB_ID": str(job.id)})
job.nomad_job_id = nomad_response["DispatchedJobID"]
# We only want to dispatch processor jobs directly.
# Everything else will be handled by the Foreman, which will increment the retry counter.
if is_processor:
try:
nomad_response = nomad_client.job.dispatch_job(nomad_job, meta={"JOB_NAME": job_type.value,
"JOB_ID": str(job.id)})
job.nomad_job_id = nomad_response["DispatchedJobID"]
job.save()
return True
except URLNotFoundNomadException:
logger.error("Dispatching Nomad job of type %s for job spec %s to host %s and port %s failed.",
job_type, nomad_job, nomad_host, nomad_port, job=str(job.id))
return False
except Exception as e:
logger.exception('Unable to Dispatch Nomad Job.',
job_name=job_type.value,
job_id=str(job.id),
reason=str(e)
)
raise
else:
job.num_retries = job.num_retries - 1
job.save()
return True
except URLNotFoundNomadException:
logger.error("Dispatching Nomad job of type %s for job spec %s to host %s and port %s failed.",
job_type, nomad_job, nomad_host, nomad_port, job=str(job.id))
return False
except Exception as e:
logger.exception('Unable to Dispatch Nomad Job.',
job_name=job_type.value,
job_id=str(job.id),
reason=str(e)
)
raise
return True
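
For callers, the upshot of the is_processor change above is that send_job now only dispatches processor jobs to Nomad directly; downloader and survey jobs just get their retry counter decremented and are left for the Foreman to requeue. A minimal sketch of that contract, assuming the helper name and import paths below (they are illustrative, not part of this PR):

from data_refinery_common.job_lookup import ProcessorPipeline, Downloaders
from data_refinery_common.message_queue import send_job
from data_refinery_common.models import ProcessorJob, DownloaderJob

def queue_example_jobs():
    # Hypothetical illustration; model fields are omitted for brevity.
    processor_job = ProcessorJob()
    processor_job.save()
    # Processor jobs are dispatched to Nomad immediately and get a nomad_job_id.
    send_job(ProcessorPipeline.AFFY_TO_PCL, processor_job)

    downloader_job = DownloaderJob()
    downloader_job.save()
    # Non-processor jobs are not dispatched here: num_retries is decremented and
    # the job saved, so the Foreman's requeue path (which increments the counter)
    # can dispatch it later without inflating the retry count.
    send_job(Downloaders.ARRAY_EXPRESS, downloader_job)
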
101 changes: 89 additions & 12 deletions foreman/data_refinery_foreman/foreman/main.py
@@ -32,6 +32,9 @@
# greater than this because of the first attempt
MAX_NUM_RETRIES = 2

# This can be overridden by the env var "MAX_TOTAL_JOBS"
DEFAULT_MAX_JOBS = 10000

# The fastest each thread will repeat its checks.
# Could be slower if the thread takes longer than this to check its jobs.
MIN_LOOP_TIME = timedelta(minutes=2)
@@ -139,12 +142,34 @@ def requeue_downloader_job(last_job: DownloaderJob) -> None:

def handle_downloader_jobs(jobs: List[DownloaderJob]) -> None:
"""For each job in jobs, either retry it or log it."""
for job in jobs:

nomad_host = get_env_variable("NOMAD_HOST")
nomad_port = get_env_variable("NOMAD_PORT", "4646")
nomad_client = Nomad(nomad_host, port=int(nomad_port), timeout=30)
# Maximum number of total jobs running at a time.
# We do this now rather than import time for testing purposes.
MAX_TOTAL_JOBS = int(get_env_variable_gracefully("MAX_TOTAL_JOBS", DEFAULT_MAX_JOBS))
len_all_jobs = len(nomad_client.jobs.get_jobs())
if len_all_jobs >= MAX_TOTAL_JOBS:
logger.info("Not requeuing job until we're running fewer jobs.")
return False

jobs_dispatched = 0
Contributor comment:
This should start at len(all_jobs) not 0.

for count, job in enumerate(jobs):
if job.num_retries < MAX_NUM_RETRIES:
requeue_downloader_job(job)
jobs_dispatched = jobs_dispatched + 1
else:
handle_repeated_failure(job)

if (count % 100) == 0:
len_all_jobs = len(nomad_client.jobs.get_jobs())

if (jobs_dispatched + len_all_jobs) >= MAX_TOTAL_JOBS:
logger.info("We hit the maximum total jobs ceiling, so we're not handling any more downloader jobs now.")
return False

return True
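
The ceiling check above is duplicated verbatim in handle_processor_jobs and handle_survey_jobs below. A hedged sketch of how the shared piece could be pulled into one helper; the helper name and import paths are assumptions, not part of this PR:

from nomad import Nomad
from data_refinery_common.utils import get_env_variable, get_env_variable_gracefully

DEFAULT_MAX_JOBS = 10000

def get_queue_capacity():
    """Return (max_total_jobs, len_all_jobs) for the ceiling check.

    MAX_TOTAL_JOBS is read at call time rather than at import time so tests
    can override it with EnvironmentVarGuard, matching the comment in this diff.
    """
    nomad_host = get_env_variable("NOMAD_HOST")
    nomad_port = get_env_variable("NOMAD_PORT", "4646")
    nomad_client = Nomad(nomad_host, port=int(nomad_port), timeout=30)
    max_total_jobs = int(get_env_variable_gracefully("MAX_TOTAL_JOBS", DEFAULT_MAX_JOBS))
    len_all_jobs = len(nomad_client.jobs.get_jobs())
    return max_total_jobs, len_all_jobs
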

@do_forever(MIN_LOOP_TIME)
def retry_failed_downloader_jobs() -> None:
@@ -166,7 +191,7 @@ def retry_hung_downloader_jobs() -> None:

nomad_host = get_env_variable("NOMAD_HOST")
nomad_port = get_env_variable("NOMAD_PORT", "4646")
nomad_client = Nomad(nomad_host, port=int(nomad_port), timeout=5)
nomad_client = Nomad(nomad_host, port=int(nomad_port), timeout=30)
hung_jobs = []
for job in potentially_hung_jobs:
try:
@@ -205,7 +230,7 @@ def retry_lost_downloader_jobs() -> None:

nomad_host = get_env_variable("NOMAD_HOST")
nomad_port = get_env_variable("NOMAD_PORT", "4646")
nomad_client = Nomad(nomad_host, port=int(nomad_port), timeout=5)
nomad_client = Nomad(nomad_host, port=int(nomad_port), timeout=30)
lost_jobs = []
for job in potentially_lost_jobs:
try:
@@ -306,12 +331,34 @@ def requeue_processor_job(last_job: ProcessorJob) -> None:

def handle_processor_jobs(jobs: List[ProcessorJob]) -> None:
"""For each job in jobs, either retry it or log it."""
for job in jobs:

nomad_host = get_env_variable("NOMAD_HOST")
nomad_port = get_env_variable("NOMAD_PORT", "4646")
nomad_client = Nomad(nomad_host, port=int(nomad_port), timeout=30)
# Maximum number of total jobs running at a time.
# We do this now rather than import time for testing purposes.
MAX_TOTAL_JOBS = int(get_env_variable_gracefully("MAX_TOTAL_JOBS", DEFAULT_MAX_JOBS))
len_all_jobs = len(nomad_client.jobs.get_jobs())
if len_all_jobs >= MAX_TOTAL_JOBS:
logger.info("Not requeuing job until we're running fewer jobs.")
return False

jobs_dispatched = 0
Contributor comment:
Same.

for count, job in enumerate(jobs):
if job.num_retries < MAX_NUM_RETRIES:
requeue_processor_job(job)
jobs_dispatched = jobs_dispatched + 1
else:
handle_repeated_failure(job)

if (count % 100) == 0:
len_all_jobs = len(nomad_client.jobs.get_jobs())

if (jobs_dispatched + len_all_jobs) >= MAX_TOTAL_JOBS:
logger.info("We hit the maximum total jobs ceiling, so we're not handling any more processor jobs now.")
return False

return True

@do_forever(MIN_LOOP_TIME)
def retry_failed_processor_jobs() -> None:
@@ -338,7 +385,7 @@ def retry_hung_processor_jobs() -> None:

nomad_host = get_env_variable("NOMAD_HOST")
nomad_port = get_env_variable("NOMAD_PORT", "4646")
nomad_client = Nomad(nomad_host, port=int(nomad_port), timeout=5)
nomad_client = Nomad(nomad_host, port=int(nomad_port), timeout=30)
hung_jobs = []
for job in potentially_hung_jobs:
try:
@@ -425,6 +472,8 @@ def requeue_survey_job(last_job: SurveyJob) -> None:
The new survey job will have num_retries one greater than
last_job.num_retries.
"""

lost_jobs = []
num_retries = last_job.num_retries + 1

new_job = SurveyJob(num_retries=num_retries,
@@ -468,20 +517,44 @@ def requeue_survey_job(last_job: SurveyJob) -> None:
# Can't communicate with nomad just now, leave the job for a later loop.
new_job.delete()

return True


def handle_survey_jobs(jobs: List[SurveyJob]) -> None:
"""For each job in jobs, either retry it or log it."""
for job in jobs:

nomad_host = get_env_variable("NOMAD_HOST")
nomad_port = get_env_variable("NOMAD_PORT", "4646")
nomad_client = Nomad(nomad_host, port=int(nomad_port), timeout=30)
# Maximum number of total jobs running at a time.
# We do this now rather than import time for testing purposes.
MAX_TOTAL_JOBS = int(get_env_variable_gracefully("MAX_TOTAL_JOBS", DEFAULT_MAX_JOBS))
len_all_jobs = len(nomad_client.jobs.get_jobs())
if len_all_jobs >= MAX_TOTAL_JOBS:
logger.info("Not requeuing job until we're running fewer jobs.")
return False

jobs_dispatched = 0
Contributor comment:
Same.

for count, job in enumerate(jobs):
if job.num_retries < MAX_NUM_RETRIES:
requeue_survey_job(job)
jobs_dispatched = jobs_dispatched + 1
else:
handle_repeated_failure(job)

if (count % 100) == 0:
len_all_jobs = len(nomad_client.jobs.get_jobs())

if (jobs_dispatched + len_all_jobs) >= MAX_TOTAL_JOBS:
logger.info("We hit the maximum total jobs ceiling, so we're not handling any more survey jobs now.")
return False

return True

@do_forever(MIN_LOOP_TIME)
def retry_failed_survey_jobs() -> None:
"""Handle survey jobs that were marked as a failure."""
failed_jobs = SurveyJob.objects.filter(success=False, retried=False)
failed_jobs = SurveyJob.objects.filter(success=False, retried=False).order_by('pk')
if failed_jobs:
logger.info(
"Handling failed (explicitly-marked-as-failure) jobs!",
@@ -499,11 +572,11 @@ def retry_hung_survey_jobs() -> None:
end_time=None,
start_time__isnull=False,
no_retry=False
)
).order_by('pk')

nomad_host = get_env_variable("NOMAD_HOST")
nomad_port = get_env_variable("NOMAD_PORT", "4646")
nomad_client = Nomad(nomad_host, port=int(nomad_port), timeout=5)
nomad_client = Nomad(nomad_host, port=int(nomad_port), timeout=30)
hung_jobs = []
for job in potentially_hung_jobs:
try:
@@ -541,12 +614,13 @@ def retry_lost_survey_jobs() -> None:
start_time=None,
end_time=None,
no_retry=False
)
).order_by('pk')

nomad_host = get_env_variable("NOMAD_HOST")
nomad_port = get_env_variable("NOMAD_PORT", "4646")
nomad_client = Nomad(nomad_host, port=int(nomad_port), timeout=5)
nomad_client = Nomad(nomad_host, port=int(nomad_port), timeout=30)
lost_jobs = []

for job in potentially_lost_jobs:
try:
# Surveyor jobs didn't always have nomad_job_ids. If they
Expand Down Expand Up @@ -583,7 +657,7 @@ def retry_lost_survey_jobs() -> None:
handle_survey_jobs(lost_jobs)

##
# Main loop
# Janitor
##

@do_forever(JANITOR_DISPATCH_TIME)
@@ -610,6 +684,9 @@ def send_janitor_jobs():
# If we can't dispatch this job, something else has gone wrong.
continue

##
# Main loop
##

def monitor_jobs():
"""Runs a thread for each job monitoring loop."""
16 changes: 15 additions & 1 deletion foreman/data_refinery_foreman/foreman/test_main.py
@@ -14,7 +14,7 @@
ProcessorJobOriginalFileAssociation,
ProcessorJobDatasetAssociation,
)

from test.support import EnvironmentVarGuard # Python >=3

class ForemanTestCase(TestCase):
def create_downloader_job(self):
@@ -605,6 +605,20 @@ def test_repeated_survey_failures(self, mock_send_job):
self.assertEqual(last_job.num_retries, main.MAX_NUM_RETRIES)
self.assertFalse(last_job.success)

# MAX TOTAL tests
self.env = EnvironmentVarGuard()
self.env.set('MAX_TOTAL_JOBS', '0')
with self.env:
job = self.create_survey_job()
result = main.handle_survey_jobs([job])
self.assertFalse(result)

self.env.set('MAX_TOTAL_JOBS', '1000')
with self.env:
job = self.create_survey_job()
result = main.requeue_survey_job(job)
self.assertTrue(result)

@patch('data_refinery_foreman.foreman.main.send_job')
def test_retrying_failed_survey_jobs(self, mock_send_job):
mock_send_job.return_value = True
@@ -66,6 +66,9 @@ def queue_surveyor_for_accession(accession: str) -> None:
"""Dispatches a surveyor job for the accession code."""
# Start at 256MB of RAM for surveyor jobs.
survey_job = SurveyJob(ram_amount=256)
# Survey Jobs will actually be dispatched by the foreman retry logic as capacity allows,
# so we set num_retries to -1 for right now.
survey_job.num_retries = -1

set_source_type_for_accession(survey_job, accession)

@@ -74,11 +77,8 @@ def queue_surveyor_for_accession(accession: str) -> None:
value=accession)
key_value_pair.save()

try:
send_job(SurveyJobTypes.SURVEYOR, survey_job)
except:
# If we can't dispatch this, then let the foreman retry it late.
pass
# We don't actually send the job here, we just create it.
# The foreman will pick it up and dispatch it when the time is appropriate.

class Command(BaseCommand):
def add_arguments(self, parser):
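
The command above now only creates the SurveyJob; with num_retries set to -1 and no start or end time, it is later picked up by retry_lost_survey_jobs in foreman/main.py, which requeues it (bringing num_retries back to 0) once the MAX_TOTAL_JOBS ceiling has headroom. A hedged sketch of that pick-up query, following the filter fields visible in the foreman diff (the import path and variable name are assumptions, and the real query may include additional filters not shown in this hunk):

from data_refinery_common.models import SurveyJob

# Jobs created by the surveyor command but never dispatched look "lost":
# no start or end time, and retries still allowed.
never_dispatched = SurveyJob.objects.filter(
    start_time=None,
    end_time=None,
    no_retry=False,
).order_by('pk')
# handle_survey_jobs() then requeues each of these while num_retries stays
# below MAX_NUM_RETRIES and the total-job ceiling is not hit.
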
2 changes: 1 addition & 1 deletion workers/illumina_dependencies.R
@@ -7,7 +7,7 @@ devtools::install_version('data.table', version='1.11.0')
devtools::install_version('optparse', version='1.4.4')
devtools::install_version('lazyeval', version='0.2.1')
devtools::install_version('tidyverse', version='1.2.1')
devtools::install_version('rlang', version='0.2.2')
devtools::install_version('rlang', version='0.3.0')

# devtools::install_url() requires BiocInstaller
install.packages('https://bioconductor.org/packages/3.6/bioc/src/contrib/BiocInstaller_1.28.0.tar.gz')
2 changes: 1 addition & 1 deletion workers/install_gene_convert.R
@@ -4,7 +4,7 @@ devtools::install_version('data.table', version='1.11.0')
devtools::install_version('optparse', version='1.4.4')

devtools::install_version('tidyverse', version='1.2.1')
devtools::install_version('rlang', version='0.2.2')
devtools::install_version('rlang', version='0.3.0')

# devtools::install_url() requires BiocInstaller
install.packages('https://bioconductor.org/packages/3.6/bioc/src/contrib/BiocInstaller_1.28.0.tar.gz')