Skip to content

Commit

Permalink
Merge pull request #196 from shellshocked2003/frontera
Browse files Browse the repository at this point in the history
Frontera
  • Loading branch information
dimtsap authored Feb 22, 2023
2 parents 1bfec7c + 0fcdb28 commit 291bd55
Show file tree
Hide file tree
Showing 39 changed files with 438 additions and 1,026 deletions.
25 changes: 25 additions & 0 deletions docs/code/RunModel/ClusterScript_Example/add_numbers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import sys
import os
import json
import numpy as np

def addNumbers(input_path=None, output_path=None):
    """Read two numbers from a JSON input file and write their sum to a file.

    The paths are optional for backward compatibility: when omitted they are
    taken from the command line (``sys.argv[1]`` and ``sys.argv[2]``), which
    matches how the cluster script invokes this file.

    :param input_path: path to a JSON file containing keys "number1" and "number2"
    :param output_path: path of the text file the sum is written to
    """
    # Fall back to command-line arguments so the script still works when
    # launched as: python add_numbers.py <input.json> <output.txt>
    if input_path is None:
        input_path = sys.argv[1]
    if output_path is None:
        output_path = sys.argv[2]

    # Open JSON file and parse the generated realization
    with open(input_path, "r") as json_file:
        data = json.load(json_file)

    # Read generated numbers
    number1 = data["number1"]
    number2 = data["number2"]

    random_addition = number1 + number2

    # Write the sum to the output file (single line, newline-terminated)
    with open(output_path, 'w') as output_file:
        output_file.write('{}\n'.format(random_addition))

if __name__ == '__main__':
    addNumbers()
18 changes: 18 additions & 0 deletions docs/code/RunModel/ClusterScript_Example/addition_run.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import os
import shutil
import fire

def runAddition(index):
    """Per-sample pre-processing hook executed before the cluster run.

    Builds the paths that a real workflow would hand to its solver, then runs
    a placeholder shell command standing in for actual pre-processing.

    :param index: zero-based run number (accepted as str or int)
    """
    run_number = int(index)

    # Paths a real pre-processing step would operate on; the placeholder
    # command below does not actually consume them.
    input_realization_path = os.path.join(
        os.getcwd(), 'run_' + str(run_number), 'InputFiles',
        'inputRealization_' + str(run_number) + ".json")
    output_path = os.path.join(os.getcwd(), 'OutputFiles')

    # This is where pre-processing commands would be executed prior to
    # running the cluster script.
    os.system("echo \"This is where pre-processing would be happening\"")

if __name__ == '__main__':
    # Expose runAddition as a command-line entry point via python-fire.
    fire.Fire(runAddition)
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"number1" : <var_1>,
"number2" : <var_2>
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import numpy as np
from pathlib import Path

class OutputProcessor:
    """Collect the quantity of interest produced by one model run.

    Reads ``./OutputFiles/qoiFile_<index>.txt`` relative to the current
    working directory and stacks its non-empty lines (split on whitespace,
    kept as strings) into ``self.qoi``; an empty (0, 0) array is stored when
    the file is missing or contains no data.
    """

    def __init__(self, index):
        qoi_path = Path("./OutputFiles/qoiFile_" + str(index) + ".txt")
        self.numberOfColumns = 0
        self.numberOfLines = 0
        collected_rows = []

        # Only attempt to read when the run actually produced an output file.
        if qoi_path.is_file():
            with open(qoi_path) as qoi_file:
                for raw_line in qoi_file:
                    tokens = raw_line.split()
                    # Skip blank lines; keep a copy of every populated row.
                    if tokens:
                        collected_rows.append(tokens[:])

        # Stack the rows, or fall back to an empty array for downstream code.
        self.qoi = np.vstack(collected_rows) if collected_rows else np.empty(shape=(0, 0))
57 changes: 57 additions & 0 deletions docs/code/RunModel/ClusterScript_Example/run_script.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
#!/bin/bash

# NOTE: The job configuration etc. would be in the batch script that launches
# your python script that uses UQpy. This script would then utilize those
# resources by using the appropriate commands here to launch parallel jobs. For
# example, TACC uses slurm and ibrun, so you would launch your python script in
# the slurm batch script and then use ibrun here to tile parallel runs.
#
# Invoked by UQpy's cluster execution as:
#   run_script.sh <cores_per_task> <n_new_simulations> <n_existing_simulations>

# This function is where you can define all the parts of a single tiled job:
# $1 = cores per process, $2 = run number, $3 = host to launch on.
taskFunction(){
coresPerProc=$1
runNumber=$2
host=$3

# Core offset of this run within the node. Sometimes this might be necessary
# to pass as an argument to launch jobs. Not used here.
let offset=$coresPerProc*$runNumber

cd run_$runNumber
# Here, we launch a parallel job. The example uses multiple cores to add numbers,
# which is somewhat pointless. This is just to illustrate the process for how tiled
# parallel jobs are launched and where MPI-capable applications would be initiated
mkdir -p ./OutputFiles
mpirun -n $coresPerProc --host $host:$coresPerProc python3 ../add_numbers.py ./InputFiles/inputRealization_$runNumber.json ./OutputFiles/qoiFile_$runNumber.txt
cd ..
}

# Get list of hosts assigned by the scheduler
echo $SLURM_NODELIST > hostfile

# Split the comma-separated node list into the HOSTS array
IFS="," read -ra HOSTS < hostfile

# This is the loop that launches taskFunction in parallel
coresPerProcess=$1
numberOfJobs=$2
# This number will vary depending on the number of cores per node. In this case, it is 32.
N=32

echo
echo "Starting parallel job launch"

declare -i index=0

for i in $(seq 0 $((numberOfJobs-1)))
do
# Launch task function and put into the background
echo "Launching job number ${i} on ${HOSTS[$index]}"
taskFunction $coresPerProcess $i ${HOSTS[$index]}&

# Increment host when all nodes allocated on current node
# (i.e. after every N background jobs, move to the next entry in HOSTS;
# index is declared integer, so the assignment below is arithmetic)
if !((${i}%N)) && [ $i -ne 0 ]
then
index=${index}+1
fi
done

wait # This wait call is necessary so that loop above completes before script returns
echo "Analyses done!"
78 changes: 78 additions & 0 deletions docs/code/RunModel/cluster_script_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
"""
Cluster Script Example for Third-party
======================================
"""

# %% md
#
# In this case, we're just running a simple addition of random numbers, but
# the process is exactly the same for more complicated workflows. The pre-
# and post-processing are handled through `model_script` and `output_script`
# respectively, while the computationally intensive portion of the workflow
# is launched in `cluster_script`. The example below provides a minimal
# framework from which more complex cases can be constructed.
#
# Import the necessary libraries

# %%
from UQpy.sampling import LatinHypercubeSampling
from UQpy.run_model.RunModel import RunModel
from UQpy.run_model.model_execution.ThirdPartyModel import ThirdPartyModel
from UQpy.distributions import Uniform
import numpy as np
import time
import csv

# %% md
#
# Define the distribution objects.

# %%

var_names = ["var_1", "var_2"]
distributions = [Uniform(250.0, 40.0), Uniform(66.0, 24.0)]

# %% md
#
# Draw the samples using Latin Hypercube Sampling.

# %%

x_lhs = LatinHypercubeSampling(distributions, nsamples=64)

# %% md
#
# Run the model.

# %%

model = ThirdPartyModel(var_names=var_names,
                        input_template='inputRealization.json',
                        model_script='addition_run.py',
                        output_script='process_addition_output.py',
                        output_object_name='OutputProcessor',
                        model_dir='AdditionRuns')

start_time = time.time()
modelRunner = RunModel(model=model, samples=x_lhs.samples, ntasks=1,
                       cores_per_task=2, nodes=1, resume=False,
                       run_type='CLUSTER', cluster_script='./run_script.sh')
elapsed_time = time.time() - start_time

print("\nTotal time for all experiments:")
print(elapsed_time, "\n")

# %% md
#
# Print the model results -- this is just for illustration

# %%
for index, experiment in enumerate(modelRunner.qoi_list, 0):
    if len(experiment.qoi) != 0:
        for item in experiment.qoi:
            print("These are the random numbers for sample {}:".format(index))
            for sample in x_lhs.samples[index]:
                print("{}\t".format(sample))

            print("This is their sum:")
            for result in item:
                print("{}\t".format(result))
        print()
15 changes: 15 additions & 0 deletions docs/source/runmodel_doc.rst
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,21 @@ at `https://www.open-mpi.org/faq/?category=building <https://www.open-mpi.org/fa

3. Output processing in the parallel case is performed after every individual run.

Parallel Cluster Execution
^^^^^^^^^^^^^^^^^^^^^^^^^^
The :class:`.RunModel` class also supports launching jobs in parallel on HPC clusters. The setup for this execution model is the same for both the Python and
third-party model workflows; however, the user is also able to provide a cluster-specific script that launches the most computationally intensive portions
of the simulation using cluster and scheduler specific commands. The pre- and post-processing can be done outside of this cluster-specific portion of the workflow. In order to enable cluster execution, :class:`.RunModel` must be configured as follows:

1. The execution mode must be configured as parallel by setting any or all of ``ntasks``, ``cores_per_task``, or ``nodes`` to a value greater than 1.

2. The ``RunType`` must be input as ``CLUSTER``

3. The user must provide the ``cluster_script`` input

With this configuration, :class:`RunModel` will launch the computationally intensive portion of the workflow as specified in the ``cluster_script``. The ``cores_per_task``, ``n_new_simulations``, and ``n_existing_simulations`` are passed as command line arguments to
the ``cluster_script`` when it is launched, so it is the user's responsibility to use these inputs in the provided cluster script to configure the simulations for the specific HPC cluster and scheduler setup.

Directory Structure During Third-Party Model Evaluation
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Expand Down
4 changes: 2 additions & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ pytest == 6.1.2
coverage == 5.3
pytest-cov == 2.10.1
pylint == 2.6.0
wheel == 0.36.2
wheel == 0.38.1
pytest-azurepipelines == 0.8.0
twine == 3.4.1
pathlib~=1.0.1
beartype ==0.9.1
setuptools~=58.0.4
setuptools~=65.5.1
2 changes: 1 addition & 1 deletion src/UQpy/reliability/taylor_series/FORM.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ def run(self, seed_x: Union[list, np.ndarray] = None,
else:
k = k + 1

self.logger.error("Error: %s", error_record[-1])
self.logger.info("Error: %s", error_record[-1])

if converged is True or k > self.n_iterations:
break
Expand Down
31 changes: 26 additions & 5 deletions src/UQpy/run_model/RunModel.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,21 @@

import numpy as np
from beartype import beartype
from enum import Enum, auto

from UQpy.utilities.ValidationTypes import NumpyFloatArray

class RunType(Enum):
    """Where :class:`RunModel` executes the model: on the local machine or on
    an HPC cluster via a user-supplied cluster script."""

    # Explicit values equal to what auto() would assign, so member identity,
    # values, and RunType['NAME'] lookups are unchanged.
    LOCAL = 1
    CLUSTER = 2


class RunModel:
# Authors:
# B.S. Aakash, Lohit Vandanapu, Michael D.Shields
# B.S. Aakash, Lohit Vandanapu, Michael D.Shields, Michael H. Gardner
#
# Last
# modified: 5 / 8 / 2020 by Michael D.Shields
# modified: 8 / 31 / 2022 by Michael H. Gardner
@beartype
def __init__(
self,
Expand All @@ -41,6 +46,8 @@ def __init__(
cores_per_task: int = 1,
nodes: int = 1,
resume: bool = False,
run_type: str = 'LOCAL',
cluster_script: str = None
):
"""
Run a computational model at specified sample points.
Expand Down Expand Up @@ -85,6 +92,9 @@ def __init__(
self.model = model
# Save option for resuming parallel execution
self.resume = resume
# Set location for model runs
self.run_type = RunType[run_type]
self.cluster_script = cluster_script

self.nodes = nodes
self.ntasks = ntasks
Expand Down Expand Up @@ -189,9 +199,20 @@ def parallel_execution(self):
pickle.dump(self.model, filehandle)
with open('samples.pkl', 'wb') as filehandle:
pickle.dump(self.samples, filehandle)
os.system(f"mpirun python -m "
f"UQpy.run_model.model_execution.ParallelExecution {self.n_existing_simulations} "
f"{self.n_new_simulations}")

if self.run_type is RunType.LOCAL:
os.system(f"mpirun python -m "
f"UQpy.run_model.model_execution.ParallelExecution {self.n_existing_simulations} "
f"{self.n_new_simulations}")

elif self.run_type is RunType.CLUSTER:
if self.cluster_script is None:
raise ValueError("\nUQpy: User-provided slurm script not input, please provide this input\n")
os.system(f"python -m UQpy.run_model.model_execution.ClusterExecution {self.cores_per_task} "
f"{self.n_new_simulations} {self.n_existing_simulations} {self.cluster_script}")
else:
raise ValueError("\nUQpy: RunType is not in currently supported list of cluster types\n")

with open('qoi.pkl', 'rb') as filehandle:
results = pickle.load(filehandle)

Expand Down
69 changes: 69 additions & 0 deletions src/UQpy/run_model/model_execution/ClusterExecution.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
# pragma: no cover
from __future__ import print_function

import math
import sys

import numpy as np
import os
import pickle

try:
model = None
samples = None
samples_per_process = 0
samples_shape = None
samples_list = None
ranges_list = None
local_ranges = None
local_samples = None

cores_per_task = int(sys.argv[1])
n_new_simulations = int(sys.argv[2])
n_existing_simulations = int(sys.argv[3])
cluster_script = str(sys.argv[4])

with open('model.pkl', 'rb') as filehandle:
model = pickle.load(filehandle)

with open('samples.pkl', 'rb') as filehandle:
samples = pickle.load(filehandle)

# Loop over the number of samples and create input files in a folder in current directory
for i in range(len(samples)):
new_text = model._find_and_replace_var_names_with_values(samples[i])
folder_to_write = 'run_' + str(i+n_existing_simulations) + '/InputFiles'
# Write the new text to the input file
model._create_input_files(file_name=model.input_template, num=i+n_existing_simulations,
text=new_text, new_folder=folder_to_write)

# Use model script to perform necessary preprocessing prior to model execution
for i in range(len(samples)):
sample = 'sample' # Sample input in original third-party model, though doesn't seem to use it
model.execute_single_sample(i, sample)

# Run user-provided cluster script--for now, it is assumed the user knows how to
# tile jobs in the script
os.system(f"{cluster_script} {cores_per_task} {n_new_simulations} {n_existing_simulations}")

results = []

for i in range(len(samples)):
# Change current working directory to model run directory
work_dir = os.path.join(model.model_dir, "run_" + str(i))
# if model.verbose:
# print('\nUQpy: Changing to the following directory for output processing:\n' + work_dir)
os.chdir(work_dir)

output = model._output_serial(i)
results.append(output)

# Change back to model directory
os.chdir(model.model_dir)

with open('qoi.pkl', 'wb') as filehandle:
pickle.dump(results, filehandle)

except Exception as e:
print(e)

Loading

0 comments on commit 291bd55

Please sign in to comment.