Feature/one step save baseline (#193)
This adds several features to the one-step pipeline:

- big zarr: everything is stored as one zarr file
- saves physics outputs
- some refactoring of the job submission

Sample output: https://gist.github.com/nbren12/84536018dafef01ba5eac0354869fb67
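
For orientation, the consolidated store can be opened directly with xarray. Below is a minimal sketch, assuming the `OUTPUT_URL/big.zarr` layout created by `submit_jobs` in this commit; the bucket path is a placeholder and variable names depend on the run configuration:

```python
# Sketch: inspect the consolidated one-step output store.
# The bucket path is a placeholder; variable names depend on the run config.
import fsspec
import xarray as xr

mapper = fsspec.get_mapper("gs://<output-url>/big.zarr")
ds = xr.open_zarr(mapper)
print(ds)  # one dataset spanning all timesteps, physics outputs included
```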
nbren12 authored Mar 26, 2020
1 parent 530ee87 commit f929f75
Showing 15 changed files with 475 additions and 187 deletions.
3 changes: 2 additions & 1 deletion .dockerignore
@@ -33,4 +33,5 @@ tox.ini
vcm-ml-data
dataflow/*/env
external/vcm/venv
Dockerfile
Dockerfile
outdir/
3 changes: 1 addition & 2 deletions Makefile
@@ -3,8 +3,7 @@
#################################################################################
# GLOBALS #
#################################################################################

VERSION ?= v0.1.0
VERSION ?= v0.1.0-a1
ENVIRONMENT_SCRIPTS = .environment-scripts
PROJECT_DIR := $(shell dirname $(realpath $(lastword $(MAKEFILE_LIST))))
BUCKET = [OPTIONAL] your-bucket-for-syncing-data (do not include 's3://')
1 change: 1 addition & 0 deletions docker/prognostic_run/requirements.txt
@@ -4,4 +4,5 @@ joblib
zarr
scikit-image
google-cloud-logging
gitpython
backoff
1 change: 1 addition & 0 deletions environment.yml
@@ -49,5 +49,6 @@ dependencies:
- pip:
- gsutil
- nc-time-axis>=1.2.0
- gitpython
- bump2version>=0.5.11
- yq
2 changes: 1 addition & 1 deletion external/fv3config
121 changes: 63 additions & 58 deletions fv3net/pipelines/kube_jobs/one_step.py
@@ -1,12 +1,14 @@
import logging
import os
import fsspec
import pprint
from toolz import assoc
import uuid
import yaml
import re
from copy import deepcopy
from multiprocessing import Pool
from typing import List, Tuple
from typing import List, Dict

import fv3config
from . import utils
@@ -197,20 +199,16 @@ def _update_config(
workflow_name: str,
base_config_version: str,
user_model_config: dict,
user_kubernetes_config: dict,
input_url: str,
config_url: str,
timestep: str,
) -> Tuple[dict]:
) -> Dict:
"""
Update kubernetes and fv3 configurations with user inputs
to prepare for fv3gfs one-step runs.
"""
base_model_config = utils.get_base_fv3config(base_config_version)
model_config = utils.update_nested_dict(base_model_config, user_model_config)
kubernetes_config = utils.update_nested_dict(
deepcopy(KUBERNETES_CONFIG_DEFAULT), user_kubernetes_config
)

model_config = fv3config.enable_restart(model_config)
model_config["experiment_name"] = _get_experiment_name(workflow_name, timestep)
@@ -225,27 +223,19 @@ def _update_config(
}
)

return model_config, kubernetes_config
return model_config


def _upload_config_files(
model_config: dict,
kubernetes_config: dict,
config_url: str,
local_vertical_grid_file=None,
local_vertical_grid_file,
upload_config_filename="fv3config.yml",
) -> Tuple[dict]:
) -> str:
"""
Upload any files to remote paths necessary for fv3config and the
fv3gfs one-step runs.
"""

if "runfile" in kubernetes_config:
runfile_path = kubernetes_config["runfile"]
kubernetes_config["runfile"] = utils.transfer_local_to_remote(
runfile_path, config_url
)

model_config["diag_table"] = utils.transfer_local_to_remote(
model_config["diag_table"], config_url
)
@@ -259,38 +249,22 @@ def _upload_config_files(
with fsspec.open(config_path, "w") as config_file:
config_file.write(yaml.dump(model_config))

return model_config, kubernetes_config
return config_path


def prepare_and_upload_config(
workflow_name: str,
input_url: str,
config_url: str,
timestep: str,
one_step_config: dict,
base_config_version: str,
**kwargs,
) -> Tuple[dict]:
"""Update model and kubernetes configurations for this particular
timestep and upload necessary files to GCS"""

user_model_config = one_step_config["fv3config"]
user_kubernetes_config = one_step_config["kubernetes"]

model_config, kube_config = _update_config(
workflow_name,
base_config_version,
user_model_config,
user_kubernetes_config,
input_url,
config_url,
timestep,
)
model_config, kube_config = _upload_config_files(
model_config, kube_config, config_url, **kwargs
def get_run_kubernetes_kwargs(user_kubernetes_config, config_url):

kubernetes_config = utils.update_nested_dict(
deepcopy(KUBERNETES_CONFIG_DEFAULT), user_kubernetes_config
)

return model_config, kube_config
if "runfile" in kubernetes_config:
runfile_path = kubernetes_config["runfile"]
kubernetes_config["runfile"] = utils.transfer_local_to_remote(
runfile_path, config_url
)

return kubernetes_config


def submit_jobs(
@@ -305,28 +279,59 @@ def submit_jobs(
local_vertical_grid_file=None,
) -> None:
"""Submit one-step job for all timesteps in timestep_list"""
for timestep in timestep_list:

zarr_url = os.path.join(output_url, "big.zarr")

logger.info("Working on one-step jobs with arguments:")
logger.info(pprint.pformat(locals()))
# kube kwargs are shared by all jobs
kube_kwargs = get_run_kubernetes_kwargs(one_step_config["kubernetes"], config_url)

def config_factory(**kwargs):
timestep = timestep_list[kwargs["index"]]
curr_input_url = os.path.join(input_url, timestep)
curr_output_url = os.path.join(output_url, timestep)
curr_config_url = os.path.join(config_url, timestep)

model_config, kube_config = prepare_and_upload_config(
config = deepcopy(one_step_config)
kwargs["url"] = zarr_url
config["fv3config"]["one_step"] = kwargs

model_config = _update_config(
workflow_name,
base_config_version,
config["fv3config"],
curr_input_url,
curr_config_url,
timestep,
one_step_config,
base_config_version,
local_vertical_grid_file=local_vertical_grid_file,
)
return _upload_config_files(
model_config, curr_config_url, local_vertical_grid_file
)

def run_job(wait=False, **kwargs):
"""Run a run_kubernetes job
kwargs are passed to workflows/one_step_jobs/runfile.py:post_process
"""
uid = str(uuid.uuid4())
labels = assoc(job_labels, "jobid", uid)
model_config_url = config_factory(**kwargs)

jobname = model_config["experiment_name"]
kube_config["jobname"] = jobname
# the one step workflow doesn't need to upload its run directories any longer
# since all the data is in the big zarr. Setting outdir to a pod-local path
# avoids this unnecessary upload step.
local_tmp_dir = "/tmp/null"
fv3config.run_kubernetes(
os.path.join(curr_config_url, "fv3config.yml"),
curr_output_url,
job_labels=job_labels,
**kube_config,
model_config_url, local_tmp_dir, job_labels=labels, **kube_kwargs
)
logger.info(f"Submitted job {jobname}")
if wait:
utils.wait_for_complete(job_labels, sleep_interval=10)

for k, timestep in enumerate(timestep_list):
if k == 0:
logger.info("Running the first time step to initialize the zarr store")
run_job(index=k, init=True, wait=True, timesteps=timestep_list)
else:
logger.info(f"Submitting job for timestep {timestep}")
run_job(index=k, init=False)
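
The loop above follows an init-then-fill pattern: the first job creates the full zarr store and must complete before the remaining jobs write their slices into it concurrently. Here is a minimal sketch of that idea using the zarr library directly; it is an illustration only, not the actual runfile implementation (runfile.py is not shown in this diff):

```python
# Sketch of the init-then-fill pattern behind the one-step jobs.
# Array names, shapes, and dtypes are illustrative only.
import numpy as np
import zarr

n_timesteps, nx = 4, 10

# init job: allocate the full store up front
group = zarr.open_group("big.zarr", mode="w")
temperature = group.create_dataset(
    "T", shape=(n_timesteps, nx), chunks=(1, nx), dtype="f4"
)

# each later job (index=k) fills in only its own slice
k = 2
temperature[k, :] = np.random.rand(nx)
```

Because each chunk covers a single timestep, concurrent jobs never write to the same chunk, so the parallel writes need no further coordination.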
2 changes: 1 addition & 1 deletion fv3net/runtime/__init__.py
@@ -1,3 +1,3 @@
from . import sklearn_interface as sklearn
from .state_io import init_writers, append_to_writers, CF_TO_RESTART_MAP
from .config import get_runfile_config, get_namelist
from .config import get_namelist, get_config
7 changes: 4 additions & 3 deletions fv3net/runtime/config.py
@@ -1,3 +1,4 @@
from typing import Dict
import yaml
import f90nml

@@ -10,11 +11,11 @@ class dotdict(dict):
__delattr__ = dict.__delitem__


def get_runfile_config():
def get_config() -> Dict:
with open("fv3config.yml") as f:
config = yaml.safe_load(f)
return dotdict(config["scikit_learn"])
return config


def get_namelist():
def get_namelist() -> f90nml.Namelist:
return f90nml.read("input.nml")
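
With this change a runfile reads the entire configuration rather than only the `scikit_learn` block. A hedged usage sketch, assuming the runfile executes in the run directory where `fv3config.yml` and `input.nml` are present:

```python
# Sketch: consuming the runtime config API from a runfile.
from fv3net import runtime

config = runtime.get_config()      # entire fv3config.yml as a plain dict
namelist = runtime.get_namelist()  # input.nml parsed as an f90nml.Namelist

# submit_jobs above injects per-job kwargs under a "one_step" key
one_step_kwargs = config.get("one_step", {})
```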
5 changes: 3 additions & 2 deletions workflows/end_to_end/full-workflow-config.yaml
@@ -38,8 +38,9 @@ experiment:
restart_data:
from: coarsen_restarts
experiment_yaml: ./workflows/one_step_jobs/all-physics-off.yml
docker_image: us.gcr.io/vcm-ml/prognostic-run-orchestration

docker_image: us.gcr.io/vcm-ml/prognostic_run:v0.1.0-a1
--config-version: v0.3

create_training_data:
command: python -m fv3net.pipelines.create_training_data
args:
66 changes: 37 additions & 29 deletions workflows/one_step_jobs/README.md
@@ -10,38 +10,36 @@ microphysics)
Both of these configurations use a one-minute timestep with no dynamics substepping and
have a total duration of 15 minutes.

Workflow call signature:
```
$ python submit_jobs.py -h
usage: submit_jobs.py [-h] INPUT_URL ONE_STEP_YAML OUTPUT_URL [--n-steps N_STEPS] [-o]
-h, --help show this help message and exit
INPUT_URL Remote url to initial conditions. Initial conditions
are assumed to be stored as INPUT_URL/{timestamp}/{tim
estamp}.{restart_category}.tile*.nc
ONE_STEP_YAML Path to local run configuration yaml.
DOCKER_IMAGE fv3gfs-python model docker image.
OUTPUT_URL Remote url where model configuration and output will
be saved. Specifically, configuration files will be
saved to OUTPUT_URL/one_step_config and model output
to OUTPUT_URL/one_step_output
--n-steps N_STEPS Number of timesteps to process. By default all
timesteps found in INPUT_URL for which successful runs
do not exist in OUTPUT_URL will be processed. Useful
for testing.
-o, --overwrite Overwrite successful timesteps in OUTPUT_URL.
--init-frequency INIT_FREQUENCY
Frequency (in minutes) to initialize one-step jobs
starting from the first available timestep.
--config-version CONFIG_VERSION
Default fv3config.yml version to use as the base
configuration. This should be consistent with the
fv3gfs-python version in the specified docker image.
Defaults to fv3gfs-python v0.2 style configuration.
This workflow can be submitted with the [orchestrate_submit_jobs.py] script.
This script is self-documenting and its help can be seen by running:

python orchestrate_submit_jobs.py -h


# Minimal example

Here is a minimal example of how to run this script on a small set of sample data.

```sh
workdir=$(pwd)
src=gs://vcm-ml-data/orchestration-testing/test-andrep/coarsen_restarts_source-resolution_384_target-resolution_48/
output=gs://vcm-ml-data/testing-noah/one-step
VERSION=<image version>
image=us.gcr.io/vcm-ml/prognostic_run:$VERSION
yaml=$PWD/deep-conv-off.yml

gsutil -m rm -r $output > /dev/null
(
cd ../../
python $workdir/orchestrate_submit_jobs.py \
$src $yaml $image $output -o \
--config-version v0.3
)

```


### Kubernetes VM access troubleshooting
# Kubernetes VM access troubleshooting

To process many (roughly 40 or more) runs at once, it is recommended to submit this workflow
from a VM authorized with a service account. Users have had issues with API request errors
@@ -64,3 +62,13 @@ Use the following command to view your current configuration. It should point to
```
kubectl config view
```

# Out of Memory errors

The one-step jobs can fail with OOMKilled errors if too many dask workers
are used. These errors can typically be avoided by using the single-threaded
dask scheduler. You can enable it for debugging purposes by adding the
following lines to the top of [runfile.py](./runfile.py):

import dask
dask.config.set(scheduler='single-threaded')
8 changes: 7 additions & 1 deletion workflows/one_step_jobs/orchestrate_submit_jobs.py
@@ -11,6 +11,8 @@
PWD = os.path.dirname(os.path.abspath(__file__))
CONFIG_DIRECTORY_NAME = "one_step_config"

RUNFILE = os.path.join(PWD, "runfile.py")


def _create_arg_parser():
parser = argparse.ArgumentParser()
@@ -83,7 +85,10 @@ def _create_arg_parser():
one_step_config = yaml.load(file, Loader=yaml.FullLoader)
workflow_name = Path(args.one_step_yaml).with_suffix("").name
short_id = get_alphanumeric_unique_tag(8)
job_label = {"orchestrator-jobs": f"{workflow_name}-{short_id}"}
job_label = {
"orchestrator-jobs": f"{workflow_name}-{short_id}",
"workflow": "one_step_jobs",
}

if not args.config_url:
config_url = os.path.join(args.output_url, CONFIG_DIRECTORY_NAME)
@@ -98,6 +103,7 @@ def _create_arg_parser():
subsample_frequency=args.init_frequency,
)

one_step_config["kubernetes"]["runfile"] = RUNFILE
one_step_config["kubernetes"]["docker_image"] = args.docker_image

local_vgrid_file = os.path.join(PWD, one_step.VERTICAL_GRID_FILENAME)