Set Limit To Maximum Concurrent Survey Jobs #838
@@ -139,12 +139,31 @@ def requeue_downloader_job(last_job: DownloaderJob) -> None:

def handle_downloader_jobs(jobs: List[DownloaderJob]) -> None:
    """For each job in jobs, either retry it or log it."""
+    nomad_host = get_env_variable("NOMAD_HOST")
+    nomad_port = get_env_variable("NOMAD_PORT", "4646")
+    nomad_client = Nomad(nomad_host, port=int(nomad_port), timeout=30)
+    # Maximum number of total jobs running at a time.
+    # We do this now rather than import time for testing purposes.
+    MAX_TOTAL_JOBS = int(get_env_variable_gracefully("MAX_TOTAL_JOBS", 1000))
+    all_jobs = nomad_client.jobs.get_jobs()
+    if len(all_jobs) >= MAX_TOTAL_JOBS:
+        logger.info("Not requeuing job until we're running fewer jobs.")
+        return False
+
+    jobs_dispatched = 0
Review comment: This should start at
    for job in jobs:
        if job.num_retries < MAX_NUM_RETRIES:
            requeue_downloader_job(job)
+            jobs_dispatched = jobs_dispatched + 1
        else:
            handle_repeated_failure(job)
+
+        if jobs_dispatched >= MAX_TOTAL_JOBS:
+            logger.info("We hit the maximum total jobs ceiling, so we're not handling any more downloader jobs now.")
+            return False

    return True

@do_forever(MIN_LOOP_TIME)
def retry_failed_downloader_jobs() -> None:
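The in-function lookup of MAX_TOTAL_JOBS ("rather than import time") is about testability: because the value is read each time the handler runs, a test can patch the environment and exercise the ceiling without reloading the module. A minimal sketch of that idea, assuming the env helpers read from os.environ; the import and patch paths are hypothetical, and the Nomad client is mocked so no cluster is needed:

    import os
    from unittest.mock import patch

    from foreman.main import handle_downloader_jobs  # import path is an assumption

    # Hypothetical test sketch: MAX_TOTAL_JOBS is read at call time, so patching
    # the environment is enough; no module reload is required.
    @patch("foreman.main.Nomad")  # patch target is an assumption
    def test_ceiling_blocks_requeue(mock_nomad):
        # Pretend Nomad already reports five jobs.
        mock_nomad.return_value.jobs.get_jobs.return_value = ["job"] * 5
        with patch.dict(os.environ, {"NOMAD_HOST": "localhost", "MAX_TOTAL_JOBS": "5"}):
            # Five jobs already meets the ceiling of five, so the handler
            # should bail out before requeuing anything.
            assert handle_downloader_jobs([]) is False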
@@ -306,12 +325,31 @@ def requeue_processor_job(last_job: ProcessorJob) -> None:

def handle_processor_jobs(jobs: List[ProcessorJob]) -> None:
    """For each job in jobs, either retry it or log it."""
+    nomad_host = get_env_variable("NOMAD_HOST")
+    nomad_port = get_env_variable("NOMAD_PORT", "4646")
+    nomad_client = Nomad(nomad_host, port=int(nomad_port), timeout=30)
+    # Maximum number of total jobs running at a time.
+    # We do this now rather than import time for testing purposes.
+    MAX_TOTAL_JOBS = int(get_env_variable_gracefully("MAX_TOTAL_JOBS", 1000))
+    all_jobs = nomad_client.jobs.get_jobs()
+    if len(all_jobs) >= MAX_TOTAL_JOBS:
+        logger.info("Not requeuing job until we're running fewer jobs.")
+        return False
+
+    jobs_dispatched = 0
Review comment: Same.
    for job in jobs:
        if job.num_retries < MAX_NUM_RETRIES:
            requeue_processor_job(job)
+            jobs_dispatched = jobs_dispatched + 1
        else:
            handle_repeated_failure(job)
+
+        if jobs_dispatched >= MAX_TOTAL_JOBS:
+            logger.info("We hit the maximum total jobs ceiling, so we're not handling any more processor jobs now.")
+            return False

    return True

@do_forever(MIN_LOOP_TIME)
def retry_failed_processor_jobs() -> None:
@@ -425,20 +463,8 @@ def requeue_survey_job(last_job: SurveyJob) -> None:
    The new survey job will have num_retries one greater than
    last_job.num_retries.
    """
-    nomad_host = get_env_variable("NOMAD_HOST")
-    nomad_port = get_env_variable("NOMAD_PORT", "4646")
-    nomad_client = Nomad(nomad_host, port=int(nomad_port), timeout=30)
-    # Maximum number of total jobs running at a time.
-    # We do this now rather than import time for testing purposes.
-    MAX_TOTAL_JOBS = int(get_env_variable_gracefully("MAX_TOTAL_JOBS", 1000))
-
-    lost_jobs = []
-    all_jobs = nomad_client.jobs.get_jobs()
-
-    if len(all_jobs) >= MAX_TOTAL_JOBS:
-        logger.info("Not requeuing job until we're running fewer jobs.")
-        return False

    num_retries = last_job.num_retries + 1

    new_job = SurveyJob(num_retries=num_retries,
@@ -484,14 +510,34 @@ def requeue_survey_job(last_job: SurveyJob) -> None:

    return True


def handle_survey_jobs(jobs: List[SurveyJob]) -> None:
    """For each job in jobs, either retry it or log it."""
+    nomad_host = get_env_variable("NOMAD_HOST")
+    nomad_port = get_env_variable("NOMAD_PORT", "4646")
+    nomad_client = Nomad(nomad_host, port=int(nomad_port), timeout=30)
+    # Maximum number of total jobs running at a time.
+    # We do this now rather than import time for testing purposes.
+    MAX_TOTAL_JOBS = int(get_env_variable_gracefully("MAX_TOTAL_JOBS", 1000))
+    all_jobs = nomad_client.jobs.get_jobs()
+    if len(all_jobs) >= MAX_TOTAL_JOBS:
+        logger.info("Not requeuing job until we're running fewer jobs.")
+        return False
+
+    jobs_dispatched = 0
Review comment: Same.
    for job in jobs:
        if job.num_retries < MAX_NUM_RETRIES:
            requeue_survey_job(job)
+            jobs_dispatched = jobs_dispatched + 1
        else:
            handle_repeated_failure(job)
+
+        if jobs_dispatched >= MAX_TOTAL_JOBS:
+            logger.info("We hit the maximum total jobs ceiling, so we're not handling any more survey jobs now.")
+            return False

    return True

@do_forever(MIN_LOOP_TIME)
def retry_failed_survey_jobs() -> None:
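All three handlers now repeat the same preamble: build a Nomad client, read MAX_TOTAL_JOBS at call time, and bail out when Nomad already reports at least that many jobs. A condensed, hypothetical sketch of that shared ceiling check (this helper is not part of the diff; it reuses Nomad, get_env_variable, and get_env_variable_gracefully, which the module already imports):

    # Hypothetical helper condensing the ceiling check repeated in
    # handle_downloader_jobs, handle_processor_jobs, and handle_survey_jobs.
    def under_job_ceiling() -> bool:
        """Return True when Nomad is running fewer jobs than MAX_TOTAL_JOBS."""
        nomad_host = get_env_variable("NOMAD_HOST")
        nomad_port = get_env_variable("NOMAD_PORT", "4646")
        nomad_client = Nomad(nomad_host, port=int(nomad_port), timeout=30)

        # Read the ceiling at call time so tests can override it via the environment.
        max_total_jobs = int(get_env_variable_gracefully("MAX_TOTAL_JOBS", 1000))

        # Note: get_jobs() returns every job Nomad knows about, including dead
        # ones (see the review comment at the end of this diff).
        return len(nomad_client.jobs.get_jobs()) < max_total_jobs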
@@ -599,7 +645,7 @@ def retry_lost_survey_jobs() -> None:
    handle_survey_jobs(lost_jobs)

##
-# Main loop
+# Janitor
##

@do_forever(JANITOR_DISPATCH_TIME)
@@ -626,6 +672,9 @@ def send_janitor_jobs():
            # If we can't dispatch this job, something else has gone wrong.
            continue

+##
+# Main loop
+##
+
def monitor_jobs():
    """Runs a thread for each job monitoring loop."""
Review comment: I just realized that this will include dead jobs, we only want to consider pending and running jobs.
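The review note points out that nomad_client.jobs.get_jobs() returns every job Nomad knows about, including dead ones, so the ceiling check can trip on jobs that are no longer consuming resources. One way to count only live jobs, sketched here under the assumption that each job stub from the Nomad jobs API carries a Status field:

    # Hypothetical adjustment for the review comment: count only jobs that are
    # still pending or running when comparing against MAX_TOTAL_JOBS, so
    # finished ("dead") jobs no longer trip the ceiling.
    def count_live_jobs(nomad_client) -> int:
        all_jobs = nomad_client.jobs.get_jobs()
        # The assumption here is that only "pending" and "running" jobs should
        # count against the ceiling.
        return len([job for job in all_jobs if job.get("Status") in ("pending", "running")])

The handlers would then compare count_live_jobs(nomad_client) against MAX_TOTAL_JOBS instead of len(all_jobs).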