diff --git a/.circleci/config.yml b/.circleci/config.yml
index c10d98868..2e23523f5 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -195,6 +195,7 @@ jobs:
     working_directory: ~/refinebio
     steps:
       - checkout
+      - run: bash .circleci/verify_tag.sh
       - run: bash .circleci/git_decrypt.sh
       - run: bash .circleci/update_docker_img.sh
       - run: bash .circleci/run_terraform.sh
diff --git a/.circleci/verify_tag.sh b/.circleci/verify_tag.sh
new file mode 100755
index 000000000..e1a2bb693
--- /dev/null
+++ b/.circleci/verify_tag.sh
@@ -0,0 +1,19 @@
+#!/bin/bash
+
+# This script verifies that the tag triggering this deploy was signed
+# by a trusted member of the CCDL.
+
+dongbo_key="138F2211AC85BFB74DB4F59405153C3530E360A7"
+rich_key="65EC6E219A655CA8CAC3096890D90E5F0A053C53"
+casey_key="DFAA02F5552C553B00CC3DCC31D330047976BAA1"
+kurt_key="B539C2CA0D4424660876A9381E1C8D1C2A663250"
+
+trusted_keys="$dongbo_key $rich_key $casey_key $kurt_key"
+
+for key in $trusted_keys; do
+    gpg --keyserver pgp.mit.edu --recv-keys $key
+done
+
+# If it is not a good key then the exit code is 1, which will cause
+# the deploy to fail.
+git tag --verify $CIRCLE_TAG
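
For reproducing the tag check outside CI, the following is a minimal, hedged sketch in Python (it is not part of this patch). It assumes git and gpg are on PATH and that the trusted keys have already been imported into the local keyring, and it is slightly stricter than the script above: instead of relying only on the exit code of git tag --verify, it also matches the signing key's fingerprint against the allowlist.

# Hypothetical helper, not part of this PR: verify that a tag is signed by
# one of the trusted CCDL keys. Assumes git/gpg are installed and the keys
# above are already in the local keyring.
import subprocess
import sys

TRUSTED_FINGERPRINTS = {
    "138F2211AC85BFB74DB4F59405153C3530E360A7",  # dongbo
    "65EC6E219A655CA8CAC3096890D90E5F0A053C53",  # rich
    "DFAA02F5552C553B00CC3DCC31D330047976BAA1",  # casey
    "B539C2CA0D4424660876A9381E1C8D1C2A663250",  # kurt
}

def tag_is_trusted(tag: str) -> bool:
    """Return True if `tag` carries a good signature from a trusted key."""
    # --raw asks git to print the GnuPG status lines (GOODSIG, VALIDSIG, ...)
    # on stderr; the last field of VALIDSIG is the primary key fingerprint.
    result = subprocess.run(
        ["git", "verify-tag", "--raw", tag],
        capture_output=True, text=True,
    )
    if result.returncode != 0:
        return False
    for line in result.stderr.splitlines():
        fields = line.split()
        if len(fields) >= 2 and fields[1] == "VALIDSIG":
            return fields[-1] in TRUSTED_FINGERPRINTS
    return False

if __name__ == "__main__":
    sys.exit(0 if tag_is_trusted(sys.argv[1]) else 1)

In CI the tag name would come from $CIRCLE_TAG, just as in the bash script; locally one could pass any tag name as the first argument.
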
diff --git a/foreman/data_refinery_foreman/foreman/main.py b/foreman/data_refinery_foreman/foreman/main.py
index eb4b89f0b..c51fb4e01 100644
--- a/foreman/data_refinery_foreman/foreman/main.py
+++ b/foreman/data_refinery_foreman/foreman/main.py
@@ -18,10 +18,11 @@ from data_refinery_common.message_queue import send_job
 from data_refinery_common.job_lookup import ProcessorPipeline, Downloaders
 from data_refinery_common.logging import get_and_configure_logger
-from data_refinery_common.utils import get_env_variable
+from data_refinery_common.utils import get_env_variable, get_env_variable_gracefully
 
 logger = get_and_configure_logger(__name__)
+RUNNING_IN_CLOUD = get_env_variable_gracefully("RUNNING_IN_CLOUD", False)
 
 # Maximum number of retries, so the number of attempts will be one
 # greater than this because of the first attempt
 
@@ -375,15 +376,31 @@ def retry_lost_processor_jobs() -> None:
 
 def monitor_jobs():
     """Runs a thread for each job monitoring loop."""
-    functions = [retry_failed_downloader_jobs,
-                 retry_hung_downloader_jobs,
-                 retry_lost_downloader_jobs,
-                 retry_failed_processor_jobs,
-                 retry_hung_processor_jobs,
-                 retry_lost_processor_jobs]
+    processor_functions = [retry_failed_processor_jobs,
+                           retry_hung_processor_jobs,
+                           retry_lost_processor_jobs]
 
     threads = []
-    for f in functions:
+    for f in processor_functions:
+        thread = Thread(target=f, name=f.__name__)
+        thread.start()
+        threads.append(thread)
+        logger.info("Thread started for monitoring function: %s", f.__name__)
+
+    # This is only a concern when running at scale.
+    if RUNNING_IN_CLOUD:
+        # We start the processor threads first so that we don't
+        # accidentally queue too many downloader jobs and knock down our
+        # source databases. They may take a while to run, and this
+        # function only runs once per deploy, so give a generous amount of
+        # time, say 5 minutes:
+        time.sleep(60*5)
+
+    downloader_functions = [retry_failed_downloader_jobs,
+                            retry_hung_downloader_jobs,
+                            retry_lost_downloader_jobs]
+
+    for f in downloader_functions:
         thread = Thread(target=f, name=f.__name__)
         thread.start()
         threads.append(thread)
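
The comment inside monitor_jobs explains the ordering: processor monitoring threads start first, and the downloader threads only start after a delay so that a fresh deploy does not flood the source databases with downloader jobs. A minimal, standalone sketch of that staggered-startup pattern follows; the names here are placeholders, not the real foreman monitoring loops.

# Sketch of the staggered thread startup used above (placeholder names,
# not the actual refinebio monitoring functions).
import time
from threading import Thread


def start_threads(functions):
    """Start one thread per monitoring function and return the threads."""
    threads = []
    for f in functions:
        thread = Thread(target=f, name=f.__name__)
        thread.start()
        threads.append(thread)
    return threads


def monitor(processor_loops, downloader_loops, running_in_cloud):
    threads = start_threads(processor_loops)

    # Give the processor queues a head start before queueing downloads,
    # so the source databases are not hammered right after a deploy.
    if running_in_cloud:
        time.sleep(60 * 5)

    threads += start_threads(downloader_loops)
    return threads

The only moving parts are the two thread groups and the sleep between them; the logging and the RUNNING_IN_CLOUD lookup in the real function are orthogonal to the pattern.
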
diff --git a/infrastructure/environments/prod.tfvars b/infrastructure/environments/prod.tfvars
index 3b7b448b8..9fd640e60 100644
Binary files a/infrastructure/environments/prod.tfvars and b/infrastructure/environments/prod.tfvars differ
diff --git a/infrastructure/instances.tf b/infrastructure/instances.tf
index ebd787847..349658908 100644
--- a/infrastructure/instances.tf
+++ b/infrastructure/instances.tf
@@ -281,9 +281,13 @@ resource "aws_autoscaling_group" "clients" {
   name = "asg-clients-${var.user}-${var.stage}"
   max_size = "${var.max_clients}"
   min_size = "0"
+  desired_capacity = "4"
   health_check_grace_period = 300
   health_check_type = "EC2"
-  default_cooldown = 0
+
+  # 300 seconds so we don't start more than an instance worth of
+  # downloader jobs at once as we scale up.
+  default_cooldown = 300
 
   # Super important flag. Makes it so that terraform doesn't fail
   # every time because it can't acquire spot instances fast enough
diff --git a/workers/data_refinery_workers/processors/salmon.py b/workers/data_refinery_workers/processors/salmon.py
index edf664b8f..cb289c850 100644
--- a/workers/data_refinery_workers/processors/salmon.py
+++ b/workers/data_refinery_workers/processors/salmon.py
@@ -972,7 +972,7 @@ def salmon(job_id: int) -> None:
                         _run_fastqc,
                         _run_salmon,
-                        _run_salmontools,
+                        # _run_salmontools,
                         _run_multiqc,
                         utils.end_job])
 
     return final_context
diff --git a/workers/data_refinery_workers/processors/test_salmon.py b/workers/data_refinery_workers/processors/test_salmon.py
index 4d82dc549..81a381fe9 100644
--- a/workers/data_refinery_workers/processors/test_salmon.py
+++ b/workers/data_refinery_workers/processors/test_salmon.py
@@ -433,78 +433,78 @@ def test_fastqc(self):
         self.assertFalse(fail['success'])
 
 
-class SalmonToolsTestCase(TestCase):
-    """Test SalmonTools command."""
-
-    def setUp(self):
-        self.test_dir = '/home/user/data_store/salmontools/'
-
-    @tag('salmon')
-    def test_double_reads(self):
-        """Test outputs when the sample has both left and right reads."""
-        job_context = {
-            'job_id': 123,
-            'job': ProcessorJob(),
-            'pipeline': Pipeline(name="Salmon"),
-            'input_file_path': self.test_dir + 'double_input/reads_1.fastq',
-            'input_file_path_2': self.test_dir + 'double_input/reads_2.fastq',
-            'salmontools_directory': self.test_dir + 'double_salmontools/',
-            'salmontools_archive': self.test_dir + 'salmontools-result.tar.gz',
-            'output_directory': self.test_dir + 'double_output/',
-            'computed_files': []
-        }
-        os.makedirs(job_context["salmontools_directory"], exist_ok=True)
-
-        homo_sapiens = Organism.get_object_for_name("HOMO_SAPIENS")
-        sample = Sample()
-        sample.organism = homo_sapiens
-        sample.save()
-        job_context["sample"] = sample
-
-        salmon._run_salmontools(job_context)
-
-        # Confirm job status
-        self.assertTrue(job_context["success"])
-
-        # Check two output files
-        output_file1 = job_context['salmontools_directory'] + 'unmapped_by_salmon_1.fa'
-        expected_output_file1 = self.test_dir + 'expected_double_output/unmapped_by_salmon_1.fa'
-        self.assertTrue(identical_checksum(output_file1, expected_output_file1))
-
-        output_file2 = job_context['salmontools_directory'] + 'unmapped_by_salmon_2.fa'
-        expected_output_file2 = self.test_dir + 'expected_double_output/unmapped_by_salmon_2.fa'
-        self.assertTrue(identical_checksum(output_file2, expected_output_file2))
-
-    @tag('salmon')
-    def test_single_read(self):
-        """Test outputs when the sample has one read only."""
-        job_context = {
-            'job_id': 456,
-            'job': ProcessorJob(),
-            'pipeline': Pipeline(name="Salmon"),
-            'input_file_path': self.test_dir + 'single_input/single_read.fastq',
-            'output_directory': self.test_dir + 'single_output/',
-            'salmontools_directory': self.test_dir + 'single_salmontools/',
-            'salmontools_archive': self.test_dir + 'salmontools-result.tar.gz',
-            'computed_files': []
-        }
-        os.makedirs(job_context["salmontools_directory"], exist_ok=True)
-
-        homo_sapiens = Organism.get_object_for_name("HOMO_SAPIENS")
-        sample = Sample()
-        sample.organism = homo_sapiens
-        sample.save()
-        job_context["sample"] = sample
-
-        salmon._run_salmontools(job_context)
-
-        # Confirm job status
-        self.assertTrue(job_context["success"])
-
-        # Check output file
-        output_file = job_context['salmontools_directory'] + 'unmapped_by_salmon.fa'
-        expected_output_file = self.test_dir + 'expected_single_output/unmapped_by_salmon.fa'
-        self.assertTrue(identical_checksum(output_file, expected_output_file))
+# class SalmonToolsTestCase(TestCase):
+#     """Test SalmonTools command."""
+
+#     def setUp(self):
+#         self.test_dir = '/home/user/data_store/salmontools/'
+
+#     @tag('salmon')
+#     def test_double_reads(self):
+#         """Test outputs when the sample has both left and right reads."""
+#         job_context = {
+#             'job_id': 123,
+#             'job': ProcessorJob(),
+#             'pipeline': Pipeline(name="Salmon"),
+#             'input_file_path': self.test_dir + 'double_input/reads_1.fastq',
+#             'input_file_path_2': self.test_dir + 'double_input/reads_2.fastq',
+#             'salmontools_directory': self.test_dir + 'double_salmontools/',
+#             'salmontools_archive': self.test_dir + 'salmontools-result.tar.gz',
+#             'output_directory': self.test_dir + 'double_output/',
+#             'computed_files': []
+#         }
+#         os.makedirs(job_context["salmontools_directory"], exist_ok=True)
+
+#         homo_sapiens = Organism.get_object_for_name("HOMO_SAPIENS")
+#         sample = Sample()
+#         sample.organism = homo_sapiens
+#         sample.save()
+#         job_context["sample"] = sample
+
+#         salmon._run_salmontools(job_context)
+
+#         # Confirm job status
+#         self.assertTrue(job_context["success"])
+
+#         # Check two output files
+#         output_file1 = job_context['salmontools_directory'] + 'unmapped_by_salmon_1.fa'
+#         expected_output_file1 = self.test_dir + 'expected_double_output/unmapped_by_salmon_1.fa'
+#         self.assertTrue(identical_checksum(output_file1, expected_output_file1))
+
+#         output_file2 = job_context['salmontools_directory'] + 'unmapped_by_salmon_2.fa'
+#         expected_output_file2 = self.test_dir + 'expected_double_output/unmapped_by_salmon_2.fa'
+#         self.assertTrue(identical_checksum(output_file2, expected_output_file2))
+
+#     @tag('salmon')
+#     def test_single_read(self):
+#         """Test outputs when the sample has one read only."""
+#         job_context = {
+#             'job_id': 456,
+#             'job': ProcessorJob(),
+#             'pipeline': Pipeline(name="Salmon"),
+#             'input_file_path': self.test_dir + 'single_input/single_read.fastq',
+#             'output_directory': self.test_dir + 'single_output/',
+#             'salmontools_directory': self.test_dir + 'single_salmontools/',
+#             'salmontools_archive': self.test_dir + 'salmontools-result.tar.gz',
+#             'computed_files': []
+#         }
+#         os.makedirs(job_context["salmontools_directory"], exist_ok=True)
+
+#         homo_sapiens = Organism.get_object_for_name("HOMO_SAPIENS")
+#         sample = Sample()
+#         sample.organism = homo_sapiens
+#         sample.save()
+#         job_context["sample"] = sample
+
+#         salmon._run_salmontools(job_context)
+
+#         # Confirm job status
+#         self.assertTrue(job_context["success"])
+
+#         # Check output file
+#         output_file = job_context['salmontools_directory'] + 'unmapped_by_salmon.fa'
+#         expected_output_file = self.test_dir + 'expected_single_output/unmapped_by_salmon.fa'
+#         self.assertTrue(identical_checksum(output_file, expected_output_file))
 
 
 class DetermineIndexLengthTestCase(TestCase):
     """Test salmon._determine_index_length function, which gets the salmon index length of a sample.