This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

WIP for launching batch runs on AWS directly from COVIDScenarioPipeline #223

Closed
wants to merge 12 commits
103 changes: 103 additions & 0 deletions batch/job_launcher.py
@@ -0,0 +1,103 @@
#!/usr/bin/env python

import boto3
import click
import glob
import os
import re
import tarfile
import time
import yaml

@click.command()
@click.option("-p", "--job-prefix", type=str, required=True,
Collaborator:
nit: intelligent defaults to reduce the number of required params would be nice. e.g., job-prefix is the name in the config file plus a uniquifier, num-jobs is calculated based on # of scenarios/# of sims with some cap, etc.

help="A short but descriptive string to use as an identifier for the job run")
@click.option("-c", "--config", "config_file", envvar="CONFIG_PATH", type=click.Path(exists=True), required=True,
help="configuration file for this run")
@click.option("-j", "--num-jobs", "num_jobs", type=click.IntRange(min=1), required=True,
help="total number of jobs to run in this batch")
@click.option("-s", "--sims-per-job", "sims_per_job", type=click.IntRange(min=1), required=True,
help="how many sims each job should run")
@click.option("-t", "--dvc-target", "dvc_target", type=click.Path(exists=True), required=True,
help="name of the .dvc file that is the last step in the pipeline")
@click.option("-i", "--s3-input-bucket", "s3_input_bucket", type=str, default="idd-input-data-sets")
@click.option("-o", "--s3-output-bucket", "s3_output_bucket", type=str, default="idd-pipeline-results")
@click.option("-d", "--job-definition", "batch_job_definition", type=str, default="Batch-CovidPipeline-Job")
@click.option("-q", "--job-queue", "batch_job_queue", type=str, default="Batch-CovidPipeline")
Collaborator:
nit: for these params, i think you want show_default=True set so when you do a --help you'll see what the default value is

Collaborator:
also missing helpstrings :)

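For illustration, one of the defaulted options above might look like this with the suggested show_default and a help string (the wording here is just a sketch, not from the PR):

@click.option("-q", "--job-queue", "batch_job_queue", type=str,
              default="Batch-CovidPipeline", show_default=True,
              help="name of the AWS Batch job queue to submit jobs to")

A computed default for --job-prefix (e.g., the config's name plus a uniquifier) could be filled in inside launch_batch when the option is left unset.
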
def launch_batch(job_prefix, config_file, num_jobs, sims_per_job, dvc_target, s3_input_bucket, s3_output_bucket, batch_job_definition, batch_job_queue):

    # A unique name for this job run, based on the job prefix and current time
    job_name = "%s-%d" % (job_prefix, int(time.time()))
    print("Preparing to run job: %s" % job_name)

    # Update and save the config file with the number of sims to run
    print("Updating config file %s to run %d simulations..." % (config_file, sims_per_job))
    config = open(config_file).read()
    config = re.sub(r"nsimulations: \d+", "nsimulations: %d" % sims_per_job, config)
Collaborator:
would use pyyaml or the utils.config module to read-modify-write versus regex, this seems error-prone

Collaborator Author:
yeah, I did at first, but the output was a total re-ordering of the entries in the config file, which seemed not great; I wonder if there's a way to keep the ordering of the entries consistent on the read-edit-write cycle?

Collaborator:
According to this thread, you need to set sort_keys=False on the dump. Trying it locally, this preserves key order in the file, but strips comments and whitespace.

Collaborator Author:
I feel like we would be okay with that, right?

Collaborator:
yeah, i think it's fine

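A minimal sketch of the read-modify-write approach discussed above, assuming nsimulations is a top-level key in the config (on Python 3.7+ a plain dict preserves insertion order, and sort_keys=False keeps that order on dump, though comments and blank lines are dropped):

with open(config_file) as f:
    config_data = yaml.safe_load(f)
config_data["nsimulations"] = sims_per_job  # assumes a top-level nsimulations key
with open(config_file, "w") as f:
    yaml.dump(config_data, f, sort_keys=False)  # preserves key order; comments are lost
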
    with open(config_file, "w") as f:
        f.write(config)

    # Prepare to tar up the current directory, excluding any dvc outputs, so it
    # can be shipped to S3
    dvc_outputs = get_dvc_outputs()
    tarfile_name = "%s.tar.gz" % job_name
    tar = tarfile.open(tarfile_name, "w:gz")
    for p in os.listdir('.'):
        if not (p.startswith(".") or p.endswith("tar.gz") or p in dvc_outputs or p == "batch"):
            tar.add(p)
    tar.close()

    # Upload the tar'd contents of this directory and the runner script to S3
    runner_script_name = "%s-runner.sh" % job_name
    s3_client = boto3.client('s3')
    s3_client.upload_file("batch/runner.sh", s3_input_bucket, runner_script_name)
    s3_client.upload_file(tarfile_name, s3_input_bucket, tarfile_name)
    os.remove(tarfile_name)

    # Prepare and launch the num_jobs via AWS Batch.
    model_data_path = "s3://%s/%s" % (s3_input_bucket, tarfile_name)
    results_path = "s3://%s/%s" % (s3_output_bucket, job_name)
    env_vars = [
        {"name": "CONFIG_PATH", "value": config_file},
        {"name": "S3_MODEL_DATA_PATH", "value": model_data_path},
        {"name": "DVC_TARGET", "value": dvc_target},
        {"name": "DVC_OUTPUTS", "value": " ".join(dvc_outputs)},
        {"name": "S3_RESULTS_PATH", "value": results_path}
    ]
    s3_cp_run_script = "aws s3 cp s3://%s/%s $PWD/run-covid-pipeline" % (s3_input_bucket, runner_script_name)
Collaborator:
(aside, don't know if you know about f-strings in python 3.6+, they make subbing variables much easier/neater)

Collaborator Author:
I do not really; will take a look!

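For example, the line above rewritten with an f-string (behaviorally equivalent):

s3_cp_run_script = f"aws s3 cp s3://{s3_input_bucket}/{runner_script_name} $PWD/run-covid-pipeline"
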
command = ["sh", "-c", "%s; /bin/bash $PWD/run-covid-pipeline" % s3_cp_run_script]
container_overrides = {
'vcpus': 72,
'memory': 184000,
'environment': env_vars,
'command': command
}

batch_client = boto3.client('batch')
if num_jobs > 1:
resp = batch_client.submit_job(
jobName=job_name,
jobQueue=batch_job_queue,
arrayProperties={'size': num_jobs},
jobDefinition=batch_job_definition,
containerOverrides=container_overrides)
else:
resp = batch_client.submit_job(
jobName=job_name,
jobQueue=batch_job_queue,
jobDefinition=batch_job_definition,
containerOverrides=container_overrides)


def get_dvc_outputs():
    ret = []
    for dvc_file in glob.glob("*.dvc"):
        with open(dvc_file) as df:
            d = yaml.load(df, Loader=yaml.FullLoader)
            if 'cmd' in d and 'outs' in d:
                ret.extend([x['path'] for x in d['outs']])
    return ret


if __name__ == '__main__':
    launch_batch()
55 changes: 55 additions & 0 deletions batch/runner.sh
@@ -0,0 +1,55 @@
#!/bin/bash

# Expected environment variables from AWS Batch env
# S3_MODEL_DATA_PATH location in S3 with the code, data, and dvc pipeline to run
# DVC_TARGET the name of the dvc file in the model that should be reproduced locally.
# DVC_OUTPUTS the names of the directories with outputs to save in S3, separated by a space
# S3_RESULTS_PATH location in S3 to store the results

# setup the python environment
HOME=/home/app
PYENV_ROOT=$HOME/.pyenv
PYTHON_VERSION=3.7.6
PYTHON_VENV_DIR=$HOME/python_venv
PATH=$PYENV_ROOT/shims:$PYENV_ROOT/bin:$PATH
. $PYTHON_VENV_DIR/bin/activate

# set optimized S3 configuration
aws configure set default.s3.max_concurrent_requests 100
aws configure set default.s3.max_queue_size 100
aws configure set default.s3.multipart_threshold 8MB
aws configure set default.s3.multipart_chunksize 8MB

# Copy the complete model + data package from S3 and
# install the local R packages
aws s3 cp $S3_MODEL_DATA_PATH model_data.tar.gz
mkdir model_data
tar -xvzf model_data.tar.gz -C model_data
cd model_data
Rscript local_install.R

# Initialize dvc and run the pipeline to re-create the
# dvc target
dvc init --no-scm
dvc repro $DVC_TARGET

DVC_OUTPUTS_ARRAY=($DVC_OUTPUTS)
if [ -z "$AWS_BATCH_JOB_ARRAY_INDEX" ]; then
echo "Compressing and uploading outputs from singleton batch job"
for output in "${DVC_OUTPUTS_ARRAY[@]}"
do
"Saving output $output"
tar cv --use-compress-program=pbzip2 -f $output.tar.bz2 $output
Collaborator:
Is compression required? We took that route initially but found that just a sync was sufficient and faster overall.

        aws s3 cp $output.tar.bz2 $S3_RESULTS_PATH/
    done
else
    echo "Saving outputs from array batch job"
    for output in "${DVC_OUTPUTS_ARRAY[@]}"
    do
        echo "Saving output $output"
        aws s3 cp --recursive $output $S3_RESULTS_PATH/$output-$AWS_BATCH_JOB_ID/
        aws s3 sync $output $S3_RESULTS_PATH/$output-$AWS_BATCH_JOB_ID/ --delete
    done
fi

echo "Done"