From f3755cbada044f3e3e9c5b065239285910e0c366 Mon Sep 17 00:00:00 2001
From: Josh Wills
Date: Mon, 20 Apr 2020 13:22:30 -0700
Subject: [PATCH 01/10] WIP for launching batch runs on AWS directly from
 COVIDScenarioPipeline

---
 batch/job_launcher.py | 103 ++++++++++++++++++++++++++++++++++++++++++
 batch/runner.sh       |  55 ++++++++++++++++++++++
 2 files changed, 158 insertions(+)
 create mode 100644 batch/job_launcher.py
 create mode 100644 batch/runner.sh

diff --git a/batch/job_launcher.py b/batch/job_launcher.py
new file mode 100644
index 000000000..faccf0c61
--- /dev/null
+++ b/batch/job_launcher.py
@@ -0,0 +1,103 @@
+#!/usr/bin/env python
+
+import boto3
+import click
+import glob
+import os
+import re
+import tarfile
+import time
+import yaml
+
+@click.command()
+@click.option("-p", "--job-prefix", type=str, required=True,
+              help="A short but descriptive string to use as an identifier for the job run")
+@click.option("-c", "--config", "config_file", envvar="CONFIG_PATH", type=click.Path(exists=True), required=True,
+              help="configuration file for this run")
+@click.option("-j", "--num-jobs", "num_jobs", type=click.IntRange(min=1), required=True,
+              help="total number of jobs to run in this batch")
+@click.option("-s", "--sims-per-job", "sims_per_job", type=click.IntRange(min=1), required=True,
+              help="how many sims each job should run")
+@click.option("-t", "--dvc-target", "dvc_target", type=click.Path(exists=True), required=True,
+              help="name of the .dvc file that is the last step in the pipeline")
+@click.option("-i", "--s3-input-bucket", "s3_input_bucket", type=str, default="idd-input-data-sets")
+@click.option("-o", "--s3-output-bucket", "s3_output_bucket", type=str, default="idd-pipeline-results")
+@click.option("-d", "--job-definition", "batch_job_definition", type=str, default="Batch-CovidPipeline-Job")
+@click.option("-q", "--job-queue", "batch_job_queue", type=str, default="Batch-CovidPipeline")
+def launch_batch(job_prefix, config_file, num_jobs, sims_per_job, dvc_target, s3_input_bucket, s3_output_bucket, batch_job_definition, batch_job_queue):
+
+    # A unique name for this job run, based on the job prefix and current time
+    job_name = "%s-%d" % (job_prefix, int(time.time()))
+    print("Preparing to run job: %s" % job_name)
+
+    # Update and save the config file with the number of sims to run
+    print("Updating config file %s to run %d simulations..." % (config_file, sims_per_job))
+    config = open(config_file).read()
+    config = re.sub("nsimulations: \d+", "nsimulations: %d" % sims_per_job, config)
+    with open(config_file, "w") as f:
+        f.write(config)
+
+    # Prepare to tar up the current directory, excluding any dvc outputs, so it
+    # can be shipped to S3
+    dvc_outputs = get_dvc_outputs()
+    tarfile_name = "%s.tar.gz" % job_name
+    tar = tarfile.open(tarfile_name, "w:gz")
+    for p in os.listdir('.'):
+        if not (p.startswith(".") or p.endswith("tar.gz") or p in dvc_outputs or p == "batch"):
+            tar.add(p)
+    tar.close()
+
+    # Upload the tar'd contents of this directory and the runner script to S3
+    runner_script_name = "%s-runner.sh" % job_name
+    s3_client = boto3.client('s3')
+    s3_client.upload_file("batch/runner.sh", s3_input_bucket, runner_script_name)
+    s3_client.upload_file(tarfile_name, s3_input_bucket, tarfile_name)
+    os.remove(tarfile_name)
+
+    # Prepare and launch the num_jobs via AWS Batch.
+    model_data_path = "s3://%s/%s" % (s3_input_bucket, tarfile_name)
+    results_path = "s3://%s/%s" % (s3_output_bucket, job_name)
+    env_vars = [
+        {"name": "CONFIG_PATH", "value": config_file},
+        {"name": "S3_MODEL_DATA_PATH", "value": model_data_path},
+        {"name": "DVC_TARGET", "value": dvc_target},
+        {"name": "DVC_OUTPUTS", "value": " ".join(dvc_outputs)},
+        {"name": "S3_RESULTS_PATH", "value": results_path}
+    ]
+    s3_cp_run_script = "aws s3 cp s3://%s/%s $PWD/run-covid-pipeline" % (s3_input_bucket, runner_script_name)
+    command = ["sh", "-c", "%s; /bin/bash $PWD/run-covid-pipeline" % s3_cp_run_script]
+    container_overrides = {
+        'vcpus': 72,
+        'memory': 184000,
+        'environment': env_vars,
+        'command': command
+    }
+
+    batch_client = boto3.client('batch')
+    if num_jobs > 1:
+        resp = batch_client.submit_job(
+            jobName=job_name,
+            jobQueue=batch_job_queue,
+            arrayProperties={'size': num_jobs},
+            jobDefinition=batch_job_definition,
+            containerOverrides=container_overrides)
+    else:
+        resp = batch_client.submit_job(
+            jobName=job_name,
+            jobQueue=batch_job_queue,
+            jobDefinition=batch_job_definition,
+            containerOverrides=container_overrides)
+
+
+def get_dvc_outputs():
+    ret = []
+    for dvc_file in glob.glob("*.dvc"):
+        with open(dvc_file) as df:
+            d = yaml.load(df, Loader=yaml.FullLoader)
+            if 'cmd' in d and 'outs' in d:
+                ret.extend([x['path'] for x in d['outs']])
+    return ret
+
+
+if __name__ == '__main__':
+    launch_batch()
diff --git a/batch/runner.sh b/batch/runner.sh
new file mode 100644
index 000000000..3f90e5c20
--- /dev/null
+++ b/batch/runner.sh
@@ -0,0 +1,55 @@
+#!/bin/bash
+
+# Expected environment variables from AWS Batch env
+# S3_MODEL_DATA_PATH location in S3 with the code, data, and dvc pipeline to run
+# DVC_TARGET the name of the dvc file in the model that should be reproduced locally.
+# DVC_OUTPUTS the names of the directories with outputs to save in S3, separated by a space
+# S3_RESULTS_PATH location in S3 to store the results
+
+# setup the python environment
+HOME=/home/app
+PYENV_ROOT=$HOME/.pyenv
+PYTHON_VERSION=3.7.6
+PYTHON_VENV_DIR=$HOME/python_venv
+PATH=$PYENV_ROOT/shims:$PYENV_ROOT/bin:$PATH
+. $PYTHON_VENV_DIR/bin/activate
+
+# set optimized S3 configuration
+aws configure set default.s3.max_concurrent_requests 100
+aws configure set default.s3.max_queue_size 100
+aws configure set default.s3.multipart_threshold 8MB
+aws configure set default.s3.multipart_chunksize 8MB
+
+# Copy the complete model + data package from S3 and
+# install the local R packages
+aws s3 cp $S3_MODEL_DATA_PATH model_data.tar.gz
+mkdir model_data
+tar -xvzf model_data.tar.gz -C model_data
+cd model_data
+Rscript local_install.R
+
+# Initialize dvc and run the pipeline to re-create the
+# dvc target
+dvc init --no-scm
+dvc repro $DVC_TARGET
+
+DVC_OUTPUTS_ARRAY=($DVC_OUTPUTS)
+if [ -z "$AWS_BATCH_JOB_ARRAY_INDEX" ]; then
+  echo "Compressing and uploading outputs from singleton batch job"
+  for output in "${DVC_OUTPUTS_ARRAY[@]}"
+  do
+    "Saving output $output"
+    tar cv --use-compress-program=pbzip2 -f $output.tar.bz2 $output
+    aws s3 cp $output.tar.bz2 $S3_RESULTS_PATH/
+  done
+else
+  echo "Saving outputs from array batch job"
+  for output in "${DVC_OUTPUTS_ARRAY[@]}"
+  do
+    echo "Saving output $output"
+    aws s3 cp --recursive $output $S3_RESULTS_PATH/$output-$AWS_BATCH_JOB_ID/
+    aws s3 sync $output $S3_RESULTS_PATH/$output-$AWS_BATCH_JOB_ID/ --delete
+  done
+fi
+
+echo "Done"
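For orientation, a hypothetical invocation of the launcher introduced in this patch might look like the following; every flag value below is a placeholder rather than something mandated by the patch, and the S3 buckets, job definition, and job queue simply fall back to the click defaults shown above.

    # Hypothetical example -- flag values are placeholders, not part of the patch.
    # -p names the run, -c points at the config whose nsimulations value is rewritten,
    # -j/-s set the Batch array size and sims per job, -t is the final .dvc stage to reproduce.
    python batch/job_launcher.py -p test-run -c config.yml -j 10 -s 100 -t hospitalization.dvc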
From 655b2d1b04a99ae42e0fd3a2159ac1440ce73a69 Mon Sep 17 00:00:00 2001
From: Josh Wills
Date: Mon, 20 Apr 2020 14:53:10 -0700
Subject: [PATCH 02/10] dvc init w/o usage tracking

---
 .dvc/.gitignore | 9 +++++++++
 .dvc/config     | 2 ++
 2 files changed, 11 insertions(+)
 create mode 100644 .dvc/.gitignore
 create mode 100644 .dvc/config

diff --git a/.dvc/.gitignore b/.dvc/.gitignore
new file mode 100644
index 000000000..9614e4e9d
--- /dev/null
+++ b/.dvc/.gitignore
@@ -0,0 +1,9 @@
+/config.local
+/updater
+/lock
+/updater.lock
+/tmp
+/state-journal
+/state-wal
+/state
+/cache
diff --git a/.dvc/config b/.dvc/config
new file mode 100644
index 000000000..c1372072b
--- /dev/null
+++ b/.dvc/config
@@ -0,0 +1,2 @@
+[core]
+    analytics = false

From dafe067b64a475d9ba825f999e7782f1097e8d31 Mon Sep 17 00:00:00 2001
From: Josh Wills
Date: Mon, 20 Apr 2020 15:01:22 -0700
Subject: [PATCH 03/10] Always ignore the data/ directory now

---
 .gitignore | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/.gitignore b/.gitignore
index 46ce9c10a..360d39e58 100644
--- a/.gitignore
+++ b/.gitignore
@@ -5,8 +5,7 @@
 COVID-19_California.Rproj
 __pycache__/
 COVID-19_California.code-workspace
-data/RAW/
-data/dataside/
+data/
 dev-versions/
 reports/
 doc/html

From 56667834997364a074ca1ee69f36095e285b8ddb Mon Sep 17 00:00:00 2001
From: Josh Wills
Date: Mon, 20 Apr 2020 15:02:38 -0700
Subject: [PATCH 04/10] git should ignore the standard output file directories

---
 .gitignore | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/.gitignore b/.gitignore
index 360d39e58..360de6ed8 100644
--- a/.gitignore
+++ b/.gitignore
@@ -6,6 +6,10 @@ COVID-19_California.Rproj
 __pycache__/
 COVID-19_California.code-workspace
 data/
+importation/
+model_output/
+model_parameters/
+hospitalization/
 dev-versions/
 reports/
 doc/html

From bc8a4dfeb6229838e1d3b8801f5639660ca83ed3 Mon Sep 17 00:00:00 2001
From: Josh Wills
Date: Mon, 20 Apr 2020 16:19:39 -0700
Subject: [PATCH 05/10] More batch/dvc workflow updates

---
 batch/{job_launcher.py => launch_job.py} | 11 +++++
 batch/prepare_repo.py                    | 53 ++++++++++++++++++++++++
 run_dvc.sh                               | 28 +++++++++++++
 3 files changed, 92 insertions(+)
 rename batch/{job_launcher.py => launch_job.py} (93%)
 create mode 100644 batch/prepare_repo.py
 create mode 100644 run_dvc.sh

diff --git a/batch/job_launcher.py b/batch/launch_job.py
similarity index 93%
rename from batch/job_launcher.py
rename to batch/launch_job.py
index faccf0c61..0f3746e67 100644
--- a/batch/job_launcher.py
+++ b/batch/launch_job.py
@@ -30,6 +30,13 @@ def launch_batch(job_prefix, config_file, num_jobs, sims_per_job, dvc_target, s3
     job_name = "%s-%d" % (job_prefix, int(time.time()))
     print("Preparing to run job: %s" % job_name)
 
+    print("Verifying that dvc target is up to date...")
+    exit_code, output = subprocess.getstatusoutput("dvc status")
+    if exit_code != 0:
+        print("dvc status is not up to date...")
+        print(output)
+        return 1
+
     # Update and save the config file with the number of sims to run
     print("Updating config file %s to run %d simulations..." % (config_file, sims_per_job))
     config = open(config_file).read()
@@ -88,6 +95,10 @@ def launch_batch(job_prefix, config_file, num_jobs, sims_per_job, dvc_target, s3
             jobDefinition=batch_job_definition,
             containerOverrides=container_overrides)
 
+    # TODO: record batch job info to a file so it can be tracked
+
+    return 0
+
 
 def get_dvc_outputs():
diff --git a/batch/prepare_repo.py b/batch/prepare_repo.py
new file mode 100644
index 000000000..a90689dc7
--- /dev/null
+++ b/batch/prepare_repo.py
@@ -0,0 +1,53 @@
+#!/usr/bin/env python
+
+import click
+from datetime import datetime
+import re
+import subprocess
+
+@click.command()
+@click.option("-d", "--data-repo", "data_repo", type=str, required=True,
+              help="The name of the HopkinsIDD/ repo whose data should be used for the run (e.g., COVID19_Minimal)")
+@click.option("-u", "--user", "user", envvar="USER", required=True,
+              help="The user who is kicking off this run")
+@click.option("-c", "--config", "config_file", type=str, default="config.yml",
+              help="The name of the config file in the data repo to use for the current run")
+@click.option("-n", "--num-test-sims", "num_test_sims", type=click.IntRange(min=1), default=15,
+              help="The number of local test simulations to run in the config")
+def prepare_repo(data_repo, user, config_file, num_test_sims):
+
+    # Create a new branch to track the run
+    branch_name = "run_%s_%s_%s" % (data_repo.lower(), user, datetime.today().strftime('%Y%m%d%H%M%S'))
+    print("Creating run branch named %s..." % branch_name)
+    branch_proc = subprocess.Popen(["git", "checkout", "-b", branch_name], stdout=subprocess.PIPE)
+    print(branch_proc.communicate()[0])
+
+    # Import the data/ directory from the data_repo with dvc
+    if not data_repo.endswith(".git"):
+        data_repo = "git@github.com:HopkinsIDD/%s.git" % data_repo
+    print("Importing data/ from %s..." % data_repo)
+    import_proc = subprocess.Popen(["dvc", "import", data_repo, "data"], stdout=subprocess.PIPE)
+    print(import_proc.communicate()[0])
+
+    # Get the config file for the run
+    print("Getting %s from %s..." % (config_file, data_repo))
+    import_proc = subprocess.Popen(["dvc", "get", data_repo, config_file], stdout=subprocess.PIPE)
+    print(import_proc.communicate()[0])
+
+    print("Updating config file %s to run %d simulations..." % (config_file, num_test_sims))
+    config = open(config_file).read()
+    config = re.sub("nsimulations: \d+", "nsimulations: %d" % num_test_sims, config)
+    with open(config_file, "w") as f:
+        f.write(config)
+
+    print("Committing data and config for run...")
+    add_proc = subprocess.Popen(["git", "add", "data.dvc", config_file], stdout=subprocess.PIPE)
+    print(add_proc.communicate()[0])
+    commit_proc = subprocess.Popen(["git", "commit", "-m", "'Commiting data and config for run'"], stdout=subprocess.PIPE)
+    print(commit_proc.communicate()[0])
+
+    print("Branch %s is ready; execute 'run_dvc.sh %s' to setup the commands for the batch run" % (branch_name, config_file))
+
+
+if __name__ == '__main__':
+    prepare_repo()
diff --git a/run_dvc.sh b/run_dvc.sh
new file mode 100644
index 000000000..be743fdfa
--- /dev/null
+++ b/run_dvc.sh
@@ -0,0 +1,28 @@
+#!/bin/bash
+
+if [[ $# -eq 0 ]] ; then
+  echo 'Usage: run_dvc.sh <config_file>'
+  exit 0
+fi
+
+CONFIG=$1
+
+if [[ ! -f model_output.dvc ]]; then
+  dvc run \
+    -d data -d $CONFIG \
+    -o model_output -o model_parameters \
+    python3 simulate.py -c $CONFIG
+  git add model_output.dvc
+  git commit -m "Commit model_output.dvc from run"
+fi
+
+if [[ ! -f hospitalization.dvc ]]; then
+  dvc run \
+    -d model_output \
+    -o hospitalization \
+    Rscript R/scripts/hosp_run.R -c $CONFIG -p .
+  git add hospitalization.dvc
+  got commit -m "Commit hospitalization.dvc from run"
+fi
+
+echo "dvc run commands are saved; batch job is ready to be launched on AWS via batch/launch_job.py"
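Taken together with the launcher, the intended flow appears to be: prepare_repo.py cuts a run branch and pulls in the data and config, run_dvc.sh registers the two pipeline stages, and launch_job.py ships everything to AWS Batch. A hypothetical local sequence, with the repo name, user, and test-sim count as placeholders (config.yml and 15 are just the script defaults), might be:

    # Hypothetical example -- repo name, user, and counts are placeholders.
    python batch/prepare_repo.py -d COVID19_Minimal -u jwills -c config.yml -n 15
    ./run_dvc.sh config.yml   # registers the model_output and hospitalization dvc stages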
job") +@click.option("-o", "--s3-output-bucket", "s3_output_bucket", type=str, default="idd-pipeline-results", show_default=True, + help="The S3 bucket for storing the job's outputs") +@click.option("-d", "--job-definition", "batch_job_definition", type=str, default="Batch-CovidPipeline-Job", show_default=True, + help="The name of the AWS Batch Job Definition to use for the job") +@click.option("-q", "--job-queue", "batch_job_queue", type=str, default="Batch-CovidPipeline", show_default=True, + help="The name of the AWS Batch Job Queue to use for the job") +def launch_batch(config_file, num_jobs, sims_per_job, dvc_target, s3_input_bucket, s3_output_bucket, batch_job_definition, batch_job_queue): - # A unique name for this job run, based on the job prefix and current time - job_name = "%s-%d" % (job_prefix, int(time.time())) + raw_config = None + with open(config_file) as f: + raw_config = f.read() + parsed_config = yaml.full_load(raw_config) + + # A unique name for this job run, based on the config name and current time + job_name = "%s-%d" % (parsed_config['name'], int(time.time())) print("Preparing to run job: %s" % job_name) print("Verifying that dvc target is up to date...") @@ -39,10 +46,9 @@ def launch_batch(job_prefix, config_file, num_jobs, sims_per_job, dvc_target, s3 # Update and save the config file with the number of sims to run print("Updating config file %s to run %d simulations..." % (config_file, sims_per_job)) - config = open(config_file).read() - config = re.sub("nsimulations: \d+", "nsimulations: %d" % sims_per_job, config) + raw_config = re.sub("nsimulations: \d+", "nsimulations: %d" % sims_per_job, raw_config) with open(config_file, "w") as f: - f.write(config) + f.write(raw_config) # Prepare to tar up the current directory, excluding any dvc outputs, so it # can be shipped to S3 From 179cc854a3b5853310947743fd8f4c3e99f2ddd3 Mon Sep 17 00:00:00 2001 From: Josh Wills Date: Mon, 20 Apr 2020 16:45:37 -0700 Subject: [PATCH 07/10] f-string all the things --- batch/launch_job.py | 18 ++++++------ batch/prepare_repo.py | 66 +++++++++++++++++++++---------------------- 2 files changed, 42 insertions(+), 42 deletions(-) diff --git a/batch/launch_job.py b/batch/launch_job.py index 033ba1c69..d181dddac 100644 --- a/batch/launch_job.py +++ b/batch/launch_job.py @@ -34,7 +34,7 @@ def launch_batch(config_file, num_jobs, sims_per_job, dvc_target, s3_input_bucke parsed_config = yaml.full_load(raw_config) # A unique name for this job run, based on the config name and current time - job_name = "%s-%d" % (parsed_config['name'], int(time.time())) + job_name = f"{parsed_config['name']}-{int(time.time())}" print("Preparing to run job: %s" % job_name) print("Verifying that dvc target is up to date...") @@ -45,7 +45,7 @@ def launch_batch(config_file, num_jobs, sims_per_job, dvc_target, s3_input_bucke return 1 # Update and save the config file with the number of sims to run - print("Updating config file %s to run %d simulations..." 
% (config_file, sims_per_job)) + print(f"Updating {config_file} to run {sims_per_job} simulations...") raw_config = re.sub("nsimulations: \d+", "nsimulations: %d" % sims_per_job, raw_config) with open(config_file, "w") as f: f.write(raw_config) @@ -53,7 +53,7 @@ def launch_batch(config_file, num_jobs, sims_per_job, dvc_target, s3_input_bucke # Prepare to tar up the current directory, excluding any dvc outputs, so it # can be shipped to S3 dvc_outputs = get_dvc_outputs() - tarfile_name = "%s.tar.gz" % job_name + tarfile_name = f"{job_name}.tar.gz" tar = tarfile.open(tarfile_name, "w:gz") for p in os.listdir('.'): if not (p.startswith(".") or p.endswith("tar.gz") or p in dvc_outputs or p == "batch"): @@ -61,15 +61,15 @@ def launch_batch(config_file, num_jobs, sims_per_job, dvc_target, s3_input_bucke tar.close() # Upload the tar'd contents of this directory and the runner script to S3 - runner_script_name = "%s-runner.sh" % job_name + runner_script_name = f"{job_name}-runner.sh" s3_client = boto3.client('s3') s3_client.upload_file("batch/runner.sh", s3_input_bucket, runner_script_name) s3_client.upload_file(tarfile_name, s3_input_bucket, tarfile_name) os.remove(tarfile_name) # Prepare and launch the num_jobs via AWS Batch. - model_data_path = "s3://%s/%s" % (s3_input_bucket, tarfile_name) - results_path = "s3://%s/%s" % (s3_output_bucket, job_name) + model_data_path = f"s3://{s3_input_bucket}/{tarfile_name}" + results_path = f"s3://{s3_output_bucket}/{job_name}" env_vars = [ {"name": "CONFIG_PATH", "value": config_file}, {"name": "S3_MODEL_DATA_PATH", "value": model_data_path}, @@ -77,8 +77,8 @@ def launch_batch(config_file, num_jobs, sims_per_job, dvc_target, s3_input_bucke {"name": "DVC_OUTPUTS", "value": " ".join(dvc_outputs)}, {"name": "S3_RESULTS_PATH", "value": results_path} ] - s3_cp_run_script = "aws s3 cp s3://%s/%s $PWD/run-covid-pipeline" % (s3_input_bucket, runner_script_name) - command = ["sh", "-c", "%s; /bin/bash $PWD/run-covid-pipeline" % s3_cp_run_script] + s3_cp_run_script = f"aws s3 cp s3://{s3_input_bucket}/{runner_script_name} $PWD/run-covid-pipeline" + command = ["sh", "-c", f"{s3_cp_run_script}; /bin/bash $PWD/run-covid-pipeline"] container_overrides = { 'vcpus': 72, 'memory': 184000, @@ -110,7 +110,7 @@ def get_dvc_outputs(): ret = [] for dvc_file in glob.glob("*.dvc"): with open(dvc_file) as df: - d = yaml.load(df, Loader=yaml.FullLoader) + d = yaml.full_load(df) if 'cmd' in d and 'outs' in d: ret.extend([x['path'] for x in d['outs']]) return ret diff --git a/batch/prepare_repo.py b/batch/prepare_repo.py index a90689dc7..6a3f69bb5 100644 --- a/batch/prepare_repo.py +++ b/batch/prepare_repo.py @@ -10,43 +10,43 @@ help="The name of the HopkinsIDD/ repo whose data should be used for the run (e.g., COVID19_Minimal)") @click.option("-u", "--user", "user", envvar="USER", required=True, help="The user who is kicking off this run") -@click.option("-c", "--config", "config_file", type=str, default="config.yml", +@click.option("-c", "--config", "config_file", type=str, default="config.yml", show_default=True, help="The name of the config file in the data repo to use for the current run") -@click.option("-n", "--num-test-sims", "num_test_sims", type=click.IntRange(min=1), default=15, +@click.option("-n", "--num-test-sims", "num_test_sims", type=click.IntRange(min=1), default=15, show_default=True, help="The number of local test simulations to run in the config") def prepare_repo(data_repo, user, config_file, num_test_sims): - # Create a new branch to track the run - branch_name 
= "run_%s_%s_%s" % (data_repo.lower(), user, datetime.today().strftime('%Y%m%d%H%M%S')) - print("Creating run branch named %s..." % branch_name) - branch_proc = subprocess.Popen(["git", "checkout", "-b", branch_name], stdout=subprocess.PIPE) - print(branch_proc.communicate()[0]) - - # Import the data/ directory from the data_repo with dvc - if not data_repo.endswith(".git"): - data_repo = "git@github.com:HopkinsIDD/%s.git" % data_repo - print("Importing data/ from %s..." % data_repo) - import_proc = subprocess.Popen(["dvc", "import", data_repo, "data"], stdout=subprocess.PIPE) - print(import_proc.communicate()[0]) - - # Get the config file for the run - print("Getting %s from %s..." % (config_file, data_repo)) - import_proc = subprocess.Popen(["dvc", "get", data_repo, config_file], stdout=subprocess.PIPE) - print(import_proc.communicate()[0]) - - print("Updating config file %s to run %d simulations..." % (config_file, num_test_sims)) - config = open(config_file).read() - config = re.sub("nsimulations: \d+", "nsimulations: %d" % num_test_sims, config) - with open(config_file, "w") as f: - f.write(config) - - print("Committing data and config for run...") - add_proc = subprocess.Popen(["git", "add", "data.dvc", config_file], stdout=subprocess.PIPE) - print(add_proc.communicate()[0]) - commit_proc = subprocess.Popen(["git", "commit", "-m", "'Commiting data and config for run'"], stdout=subprocess.PIPE) - print(commit_proc.communicate()[0]) - - print("Branch %s is ready; execute 'run_dvc.sh %s' to setup the commands for the batch run" % (branch_name, config_file)) + # Create a new branch to track the run + branch_name = f"run_{data_repo.lower()}_{user}_{datetime.today().strftime('%Y%m%d%H%M%S')}" + print(f"Creating run branch named {branch_name}...") + branch_proc = subprocess.Popen(["git", "checkout", "-b", branch_name], stdout=subprocess.PIPE) + print(branch_proc.communicate()[0]) + + # Import the data/ directory from the data_repo with dvc + if not data_repo.endswith(".git"): + data_repo = f"git@github.com:HopkinsIDD/{data_repo}.git" + print(f"Importing data/ from {data_repo}...") + import_proc = subprocess.Popen(["dvc", "import", data_repo, "data"], stdout=subprocess.PIPE) + print(import_proc.communicate()[0]) + + # Get the config file for the run + print(f"Getting {config_file} from {data_repo}...") + import_proc = subprocess.Popen(["dvc", "get", data_repo, config_file], stdout=subprocess.PIPE) + print(import_proc.communicate()[0]) + + print(f"Updating config file {config_file} to run {num_test_sims} simulations...") + config = open(config_file).read() + config = re.sub("nsimulations: \d+", "nsimulations: %d" % num_test_sims, config) + with open(config_file, "w") as f: + f.write(config) + + print("Committing data and config for run...") + add_proc = subprocess.Popen(["git", "add", "data.dvc", config_file], stdout=subprocess.PIPE) + print(add_proc.communicate()[0]) + commit_proc = subprocess.Popen(["git", "commit", "-m", "'Commiting data and config for run'"], stdout=subprocess.PIPE) + print(commit_proc.communicate()[0]) + + print(f"Branch {branch_name} is ready; execute 'run_dvc.sh {config_file}' to setup the commands for the batch run") if __name__ == '__main__': From 74167b5f93ad1aec41ada824dd8d4aaf6ede8b3d Mon Sep 17 00:00:00 2001 From: Josh Wills Date: Mon, 20 Apr 2020 22:57:12 -0700 Subject: [PATCH 08/10] Various fixes to the batch/dvc setup and run scripts I found while, ya know, actually running them --- batch/launch_job.py | 1 + batch/prepare_repo.py | 26 
From 74167b5f93ad1aec41ada824dd8d4aaf6ede8b3d Mon Sep 17 00:00:00 2001
From: Josh Wills
Date: Mon, 20 Apr 2020 22:57:12 -0700
Subject: [PATCH 08/10] Various fixes to the batch/dvc setup and run scripts I
 found while, ya know, actually running them

---
 batch/launch_job.py   |  1 +
 batch/prepare_repo.py | 26 ++++++++++----------------
 run_dvc.sh            |  2 +-
 3 files changed, 12 insertions(+), 17 deletions(-)
 mode change 100644 => 100755 batch/launch_job.py
 mode change 100644 => 100755 batch/prepare_repo.py
 mode change 100644 => 100755 run_dvc.sh

diff --git a/batch/launch_job.py b/batch/launch_job.py
old mode 100644
new mode 100755
index d181dddac..360d843e3
--- a/batch/launch_job.py
+++ b/batch/launch_job.py
@@ -5,6 +5,7 @@
 import glob
 import os
 import re
+import subprocess
 import tarfile
 import time
 import yaml
diff --git a/batch/prepare_repo.py b/batch/prepare_repo.py
old mode 100644
new mode 100755
index 6a3f69bb5..79c266261
--- a/batch/prepare_repo.py
+++ b/batch/prepare_repo.py
@@ -3,6 +3,7 @@
 import click
 from datetime import datetime
 import re
+import os
 import subprocess
 
 @click.command()
@@ -18,33 +19,26 @@ def prepare_repo(data_repo, user, config_file, num_test_sims):
 
     # Create a new branch to track the run
     branch_name = f"run_{data_repo.lower()}_{user}_{datetime.today().strftime('%Y%m%d%H%M%S')}"
-    print(f"Creating run branch named {branch_name}...")
-    branch_proc = subprocess.Popen(["git", "checkout", "-b", branch_name], stdout=subprocess.PIPE)
-    print(branch_proc.communicate()[0])
+    subprocess.run(["git", "checkout", "-b", branch_name])
 
-    # Import the data/ directory from the data_repo with dvc
+    # Import the data/ directory from the data_repo with dvc if it's not here already
     if not data_repo.endswith(".git"):
         data_repo = f"git@github.com:HopkinsIDD/{data_repo}.git"
-    print(f"Importing data/ from {data_repo}...")
-    import_proc = subprocess.Popen(["dvc", "import", data_repo, "data"], stdout=subprocess.PIPE)
-    print(import_proc.communicate()[0])
+    if not os.path.isfile("data.dvc"):
+        subprocess.run(["dvc", "import", data_repo, "data"])
+        subprocess.run(["git", "add", "data.dvc"])
+        subprocess.run(["git", "commit", "-m", "'Add data.dvc'"])
 
     # Get the config file for the run
-    print(f"Getting {config_file} from {data_repo}...")
-    import_proc = subprocess.Popen(["dvc", "get", data_repo, config_file], stdout=subprocess.PIPE)
-    print(import_proc.communicate()[0])
+    subprocess.run(["dvc", "get", data_repo, config_file])
 
     print(f"Updating config file {config_file} to run {num_test_sims} simulations...")
     config = open(config_file).read()
     config = re.sub("nsimulations: \d+", "nsimulations: %d" % num_test_sims, config)
     with open(config_file, "w") as f:
         f.write(config)
-
-    print("Committing data and config for run...")
-    add_proc = subprocess.Popen(["git", "add", "data.dvc", config_file], stdout=subprocess.PIPE)
-    print(add_proc.communicate()[0])
-    commit_proc = subprocess.Popen(["git", "commit", "-m", "'Commiting data and config for run'"], stdout=subprocess.PIPE)
-    print(commit_proc.communicate()[0])
+    subprocess.run(["git", "add", config_file])
+    subprocess.run(["git", "commit", "-m", "'Commiting config file for run'"])
 
     print(f"Branch {branch_name} is ready; execute 'run_dvc.sh {config_file}' to setup the commands for the batch run")
 
diff --git a/run_dvc.sh b/run_dvc.sh
old mode 100644
new mode 100755
index be743fdfa..790386177
--- a/run_dvc.sh
+++ b/run_dvc.sh
@@ -22,7 +22,7 @@ if [[ ! -f hospitalization.dvc ]]; then
     -o hospitalization \
     Rscript R/scripts/hosp_run.R -c $CONFIG -p .
   git add hospitalization.dvc
-  got commit -m "Commit hospitalization.dvc from run"
+  git commit -m "Commit hospitalization.dvc from run"
 fi
 
 echo "dvc run commands are saved; batch job is ready to be launched on AWS via batch/launch_job.py"

From 9b1379038142676425387e1ba35307736961aeb1 Mon Sep 17 00:00:00 2001
From: Josh Wills
Date: Mon, 20 Apr 2020 23:18:11 -0700
Subject: [PATCH 09/10] Add a forcing call for this since I always forget to
 run it

---
 run_dvc.sh | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/run_dvc.sh b/run_dvc.sh
index 790386177..35c9bfe5e 100755
--- a/run_dvc.sh
+++ b/run_dvc.sh
@@ -7,6 +7,9 @@ fi
 
 CONFIG=$1
 
+# First, make sure local R packages are up to date.
+Rscript local_install.R
+
 if [[ ! -f model_output.dvc ]]; then
   dvc run \
     -d data -d $CONFIG \

From f1930be2c42915e992e4e6bf0573cc0ecc7f1fb8 Mon Sep 17 00:00:00 2001
From: Josh Wills
Date: Tue, 21 Apr 2020 15:46:45 -0700
Subject: [PATCH 10/10] Some fixes to make the batch runner script more
 useful/debuggable

---
 batch/runner.sh | 11 +++++------
 1 file changed, 5 insertions(+), 6 deletions(-)

diff --git a/batch/runner.sh b/batch/runner.sh
index 3f90e5c20..1651c514d 100644
--- a/batch/runner.sh
+++ b/batch/runner.sh
@@ -1,5 +1,8 @@
 #!/bin/bash
 
+set -x
+set -e
+
 # Expected environment variables from AWS Batch env
 # S3_MODEL_DATA_PATH location in S3 with the code, data, and dvc pipeline to run
 # DVC_TARGET the name of the dvc file in the model that should be reproduced locally.
@@ -35,20 +38,16 @@ dvc repro $DVC_TARGET
 
 DVC_OUTPUTS_ARRAY=($DVC_OUTPUTS)
 if [ -z "$AWS_BATCH_JOB_ARRAY_INDEX" ]; then
-  echo "Compressing and uploading outputs from singleton batch job"
   for output in "${DVC_OUTPUTS_ARRAY[@]}"
   do
-    "Saving output $output"
     tar cv --use-compress-program=pbzip2 -f $output.tar.bz2 $output
     aws s3 cp $output.tar.bz2 $S3_RESULTS_PATH/
   done
 else
-  echo "Saving outputs from array batch job"
   for output in "${DVC_OUTPUTS_ARRAY[@]}"
  do
-    echo "Saving output $output"
-    aws s3 cp --recursive $output $S3_RESULTS_PATH/$output-$AWS_BATCH_JOB_ID/
-    aws s3 sync $output $S3_RESULTS_PATH/$output-$AWS_BATCH_JOB_ID/ --delete
+    aws s3 cp --recursive $output $S3_RESULTS_PATH/$AWS_BATCH_JOB_ID/$output/
+    aws s3 sync $output $S3_RESULTS_PATH/$AWS_BATCH_JOB_ID/$output/ --delete
   done
 fi
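Once a run completes, a hypothetical way to pull the outputs back down, assuming the default idd-pipeline-results bucket and the job-name/job-ID layout used by the launcher and runner above (the job name and timestamp shown are placeholders), might be:

    # Hypothetical example -- the job name/timestamp is a placeholder.
    aws s3 ls s3://idd-pipeline-results/
    aws s3 sync s3://idd-pipeline-results/minimal-1587500000/ ./results/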