This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

WIP for launching batch runs on AWS directly from COVIDScenarioPipeline #223

Closed
wants to merge 12 commits
9 changes: 9 additions & 0 deletions .dvc/.gitignore
@@ -0,0 +1,9 @@
/config.local
/updater
/lock
/updater.lock
/tmp
/state-journal
/state-wal
/state
/cache
2 changes: 2 additions & 0 deletions .dvc/config
@@ -0,0 +1,2 @@
[core]
analytics = false
7 changes: 5 additions & 2 deletions .gitignore
@@ -5,8 +5,11 @@
COVID-19_California.Rproj
__pycache__/
COVID-19_California.code-workspace
data/RAW/
data/dataside/
data/
importation/
model_output/
model_parameters/
hospitalization/
dev-versions/
reports/
doc/html
121 changes: 121 additions & 0 deletions batch/launch_job.py
@@ -0,0 +1,121 @@
#!/usr/bin/env python

import boto3
import click
import glob
import os
import re
import subprocess
import sys
import tarfile
import time
import yaml

@click.command()
@click.option("-c", "--config", "config_file", envvar="CONFIG_PATH", type=click.Path(exists=True), required=True,
help="configuration file for this run")
@click.option("-j", "--num-jobs", "num_jobs", type=click.IntRange(min=1), required=True,
help="total number of jobs to run in this batch")
@click.option("-s", "--sims-per-job", "sims_per_job", type=click.IntRange(min=1), required=True,
help="how many sims each job should run")
@click.option("-t", "--dvc-target", "dvc_target", type=click.Path(exists=True), required=True,
help="name of the .dvc file that is the last step in the dvc run pipeline")
@click.option("-i", "--s3-input-bucket", "s3_input_bucket", type=str, default="idd-input-data-sets", show_default=True,
help="The S3 bucket to use for uploading the code and configuration used by the batch job")
@click.option("-o", "--s3-output-bucket", "s3_output_bucket", type=str, default="idd-pipeline-results", show_default=True,
help="The S3 bucket for storing the job's outputs")
@click.option("-d", "--job-definition", "batch_job_definition", type=str, default="Batch-CovidPipeline-Job", show_default=True,
help="The name of the AWS Batch Job Definition to use for the job")
@click.option("-q", "--job-queue", "batch_job_queue", type=str, default="Batch-CovidPipeline", show_default=True,
help="The name of the AWS Batch Job Queue to use for the job")
def launch_batch(config_file, num_jobs, sims_per_job, dvc_target, s3_input_bucket, s3_output_bucket, batch_job_definition, batch_job_queue):

raw_config = None
with open(config_file) as f:
raw_config = f.read()
parsed_config = yaml.full_load(raw_config)

# A unique name for this job run, based on the config name and current time
job_name = f"{parsed_config['name']}-{int(time.time())}"
print("Preparing to run job: %s" % job_name)

print("Verifying that dvc target is up to date...")
exit_code, output = subprocess.getstatusoutput("dvc status")
if exit_code != 0:
print("dvc status is not up to date...")
print(output)
        sys.exit(1)

# Update and save the config file with the number of sims to run
print(f"Updating {config_file} to run {sims_per_job} simulations...")
    raw_config = re.sub(r"nsimulations: \d+", "nsimulations: %d" % sims_per_job, raw_config)
with open(config_file, "w") as f:
f.write(raw_config)

# Prepare to tar up the current directory, excluding any dvc outputs, so it
# can be shipped to S3
dvc_outputs = get_dvc_outputs()
tarfile_name = f"{job_name}.tar.gz"
tar = tarfile.open(tarfile_name, "w:gz")
for p in os.listdir('.'):
if not (p.startswith(".") or p.endswith("tar.gz") or p in dvc_outputs or p == "batch"):
tar.add(p)
tar.close()

# Upload the tar'd contents of this directory and the runner script to S3
runner_script_name = f"{job_name}-runner.sh"
s3_client = boto3.client('s3')
s3_client.upload_file("batch/runner.sh", s3_input_bucket, runner_script_name)
s3_client.upload_file(tarfile_name, s3_input_bucket, tarfile_name)
os.remove(tarfile_name)

# Prepare and launch the num_jobs via AWS Batch.
model_data_path = f"s3://{s3_input_bucket}/{tarfile_name}"
results_path = f"s3://{s3_output_bucket}/{job_name}"
env_vars = [
{"name": "CONFIG_PATH", "value": config_file},
{"name": "S3_MODEL_DATA_PATH", "value": model_data_path},
{"name": "DVC_TARGET", "value": dvc_target},
{"name": "DVC_OUTPUTS", "value": " ".join(dvc_outputs)},
{"name": "S3_RESULTS_PATH", "value": results_path}
]
s3_cp_run_script = f"aws s3 cp s3://{s3_input_bucket}/{runner_script_name} $PWD/run-covid-pipeline"
command = ["sh", "-c", f"{s3_cp_run_script}; /bin/bash $PWD/run-covid-pipeline"]
container_overrides = {
'vcpus': 72,
Collaborator

Should one allow an override on that? If the advanced user wants a different configuration?

'memory': 184000,
'environment': env_vars,
'command': command
}

batch_client = boto3.client('batch')
if num_jobs > 1:
resp = batch_client.submit_job(
jobName=job_name,
jobQueue=batch_job_queue,
arrayProperties={'size': num_jobs},
jobDefinition=batch_job_definition,
containerOverrides=container_overrides)
else:
resp = batch_client.submit_job(
jobName=job_name,
jobQueue=batch_job_queue,
jobDefinition=batch_job_definition,
containerOverrides=container_overrides)

# TODO: record batch job info to a file so it can be tracked

return 0


def get_dvc_outputs():
ret = []
for dvc_file in glob.glob("*.dvc"):
with open(dvc_file) as df:
d = yaml.full_load(df)
if 'cmd' in d and 'outs' in d:
ret.extend([x['path'] for x in d['outs']])
return ret


if __name__ == '__main__':
launch_batch()
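Regarding the collaborator's question above about the hard-coded 'vcpus' and 'memory' values: one way to support advanced users would be to expose them as command-line options with the current values as defaults. A minimal sketch, assuming hypothetical --vcpus and --memory options that are not part of this PR:

@click.option("-v", "--vcpus", "vcpus", type=click.IntRange(min=1), default=72, show_default=True,
              help="Number of vCPUs to request for each batch job")
@click.option("-m", "--memory", "memory", type=click.IntRange(min=1), default=184000, show_default=True,
              help="Memory (in MiB) to request for each batch job")

The two new parameters would then be added to the launch_batch signature and passed through to container_overrides:

container_overrides = {
    'vcpus': vcpus,
    'memory': memory,
    'environment': env_vars,
    'command': command
}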
47 changes: 47 additions & 0 deletions batch/prepare_repo.py
@@ -0,0 +1,47 @@
#!/usr/bin/env python

import click
from datetime import datetime
import re
import os
import subprocess

@click.command()
@click.option("-d", "--data-repo", "data_repo", type=str, required=True,
help="The name of the HopkinsIDD/ repo whose data should be used for the run (e.g., COVID19_Minimal)")
@click.option("-u", "--user", "user", envvar="USER", required=True,
help="The user who is kicking off this run")
@click.option("-c", "--config", "config_file", type=str, default="config.yml", show_default=True,
help="The name of the config file in the data repo to use for the current run")
@click.option("-n", "--num-test-sims", "num_test_sims", type=click.IntRange(min=1), default=15, show_default=True,
help="The number of local test simulations to run in the config")
def prepare_repo(data_repo, user, config_file, num_test_sims):

# Create a new branch to track the run
branch_name = f"run_{data_repo.lower()}_{user}_{datetime.today().strftime('%Y%m%d%H%M%S')}"
subprocess.run(["git", "checkout", "-b", branch_name])

# Import the data/ directory from the data_repo with dvc if it's not here already
if not data_repo.endswith(".git"):
data_repo = f"[email protected]:HopkinsIDD/{data_repo}.git"
if not os.path.isfile("data.dvc"):
subprocess.run(["dvc", "import", data_repo, "data"])
subprocess.run(["git", "add", "data.dvc"])
        subprocess.run(["git", "commit", "-m", "Add data.dvc"])

# Get the config file for the run
subprocess.run(["dvc", "get", data_repo, config_file])

print(f"Updating config file {config_file} to run {num_test_sims} simulations...")
config = open(config_file).read()
    config = re.sub(r"nsimulations: \d+", "nsimulations: %d" % num_test_sims, config)
with open(config_file, "w") as f:
f.write(config)
subprocess.run(["git", "add", config_file])
    subprocess.run(["git", "commit", "-m", "Committing config file for run"])

print(f"Branch {branch_name} is ready; execute 'run_dvc.sh {config_file}' to setup the commands for the batch run")


if __name__ == '__main__':
prepare_repo()
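For reference, a typical invocation of prepare_repo.py might look like the following (the user name is illustrative; the data repo is the example given in the option help text):

python batch/prepare_repo.py -d COVID19_Minimal -u jdoe -c config.yml -n 15

This checks out a new run_* branch, imports the data/ directory with dvc if data.dvc is not already present, fetches config.yml from the data repo, and caps nsimulations at 15 for local testing before the full batch run.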
54 changes: 54 additions & 0 deletions batch/runner.sh
@@ -0,0 +1,54 @@
#!/bin/bash

set -x
set -e

# Expected environment variables from AWS Batch env
# S3_MODEL_DATA_PATH location in S3 with the code, data, and dvc pipeline to run
# DVC_TARGET the name of the dvc file in the model that should be reproduced locally.
# DVC_OUTPUTS the names of the directories with outputs to save in S3, separated by a space
# S3_RESULTS_PATH location in S3 to store the results

# setup the python environment
HOME=/home/app
PYENV_ROOT=$HOME/.pyenv
PYTHON_VERSION=3.7.6
PYTHON_VENV_DIR=$HOME/python_venv
PATH=$PYENV_ROOT/shims:$PYENV_ROOT/bin:$PATH
. $PYTHON_VENV_DIR/bin/activate

# set optimized S3 configuration
aws configure set default.s3.max_concurrent_requests 100
aws configure set default.s3.max_queue_size 100
aws configure set default.s3.multipart_threshold 8MB
aws configure set default.s3.multipart_chunksize 8MB

# Copy the complete model + data package from S3 and
# install the local R packages
aws s3 cp $S3_MODEL_DATA_PATH model_data.tar.gz
mkdir model_data
tar -xvzf model_data.tar.gz -C model_data
cd model_data
Rscript local_install.R

# Initialize dvc and run the pipeline to re-create the
# dvc target
dvc init --no-scm
dvc repro $DVC_TARGET

DVC_OUTPUTS_ARRAY=($DVC_OUTPUTS)
if [ -z "$AWS_BATCH_JOB_ARRAY_INDEX" ]; then
for output in "${DVC_OUTPUTS_ARRAY[@]}"
do
tar cv --use-compress-program=pbzip2 -f $output.tar.bz2 $output
Collaborator

Is compression required? We took that route initially but found that just a sync was sufficient and faster overall.

aws s3 cp $output.tar.bz2 $S3_RESULTS_PATH/
done
else
for output in "${DVC_OUTPUTS_ARRAY[@]}"
do
aws s3 cp --recursive $output $S3_RESULTS_PATH/$AWS_BATCH_JOB_ID/$output/
aws s3 sync $output $S3_RESULTS_PATH/$AWS_BATCH_JOB_ID/$output/ --delete
done
fi

echo "Done"
31 changes: 31 additions & 0 deletions run_dvc.sh
@@ -0,0 +1,31 @@
#!/bin/bash

if [[ $# -eq 0 ]] ; then
echo 'Usage: run_dvc.sh <config_file.yml>'
    exit 1
fi

CONFIG=$1

# First, make sure local R packages are up to date.
Rscript local_install.R

if [[ ! -f model_output.dvc ]]; then
dvc run \
-d data -d $CONFIG \
-o model_output -o model_parameters \
python3 simulate.py -c $CONFIG
git add model_output.dvc
git commit -m "Commit model_output.dvc from run"
fi

if [[ ! -f hospitalization.dvc ]]; then
dvc run \
-d model_output \
-o hospitalization \
Rscript R/scripts/hosp_run.R -c $CONFIG -p .
git add hospitalization.dvc
git commit -m "Commit hospitalization.dvc from run"
fi

echo "dvc run commands are saved; batch job is ready to be launched on AWS via batch/launch_job.py"