diff --git a/.dockerignore b/.dockerignore
index dc16f10421..d09c3e155d 100644
--- a/.dockerignore
+++ b/.dockerignore
@@ -33,4 +33,5 @@ tox.ini
 vcm-ml-data
 dataflow/*/env
 external/vcm/venv
-Dockerfile
\ No newline at end of file
+Dockerfile
+outdir/
diff --git a/Makefile b/Makefile
index 03648b5493..bd904fd589 100644
--- a/Makefile
+++ b/Makefile
@@ -3,8 +3,7 @@
 #################################################################################
 # GLOBALS                                                                       #
 #################################################################################
-
-VERSION ?= v0.1.0
+VERSION ?= v0.1.0-a1
 ENVIRONMENT_SCRIPTS = .environment-scripts
 PROJECT_DIR := $(shell dirname $(realpath $(lastword $(MAKEFILE_LIST))))
 BUCKET = [OPTIONAL] your-bucket-for-syncing-data (do not include 's3://')
diff --git a/docker/prognostic_run/requirements.txt b/docker/prognostic_run/requirements.txt
index 629b2e3787..4be900d590 100644
--- a/docker/prognostic_run/requirements.txt
+++ b/docker/prognostic_run/requirements.txt
@@ -4,4 +4,5 @@ joblib
 zarr
 scikit-image
 google-cloud-logging
+gitpython
 backoff
diff --git a/environment.yml b/environment.yml
index b0dbed2127..3fa5a34004 100644
--- a/environment.yml
+++ b/environment.yml
@@ -49,5 +49,6 @@ dependencies:
   - pip:
     - gsutil
     - nc-time-axis>=1.2.0
+    - gitpython
     - bump2version>=0.5.11
     - yq
diff --git a/external/fv3config b/external/fv3config
index 6bde7b7354..bb1c1fa607 160000
--- a/external/fv3config
+++ b/external/fv3config
@@ -1 +1 @@
-Subproject commit 6bde7b7354fa3c4c512a7178ee9b55b45189d76e
+Subproject commit bb1c1fa6079e5bb08708071f7a642a1da614bc34
diff --git a/fv3net/pipelines/kube_jobs/one_step.py b/fv3net/pipelines/kube_jobs/one_step.py
index 3c6484f8a0..b1a65f24dd 100644
--- a/fv3net/pipelines/kube_jobs/one_step.py
+++ b/fv3net/pipelines/kube_jobs/one_step.py
@@ -1,12 +1,14 @@
 import logging
 import os
 import fsspec
+import pprint
+from toolz import assoc
 import uuid
 import yaml
 import re
 from copy import deepcopy
 from multiprocessing import Pool
-from typing import List, Tuple
+from typing import List, Dict
 
 import fv3config
 from . import utils
@@ -197,20 +199,16 @@ def _update_config(
     workflow_name: str,
     base_config_version: str,
     user_model_config: dict,
-    user_kubernetes_config: dict,
     input_url: str,
     config_url: str,
     timestep: str,
-) -> Tuple[dict]:
+) -> Dict:
     """
     Update kubernetes and fv3 configurations with user inputs
     to prepare for fv3gfs one-step runs.
     """
     base_model_config = utils.get_base_fv3config(base_config_version)
     model_config = utils.update_nested_dict(base_model_config, user_model_config)
-    kubernetes_config = utils.update_nested_dict(
-        deepcopy(KUBERNETES_CONFIG_DEFAULT), user_kubernetes_config
-    )
 
     model_config = fv3config.enable_restart(model_config)
     model_config["experiment_name"] = _get_experiment_name(workflow_name, timestep)
@@ -225,27 +223,19 @@ def _update_config(
         }
     )
 
-    return model_config, kubernetes_config
+    return model_config
 
 
 def _upload_config_files(
     model_config: dict,
-    kubernetes_config: dict,
     config_url: str,
-    local_vertical_grid_file=None,
+    local_vertical_grid_file,
     upload_config_filename="fv3config.yml",
-) -> Tuple[dict]:
+) -> str:
     """
     Upload any files to remote paths necessary for fv3config and the
     fv3gfs one-step runs.
     """
-
-    if "runfile" in kubernetes_config:
-        runfile_path = kubernetes_config["runfile"]
-        kubernetes_config["runfile"] = utils.transfer_local_to_remote(
-            runfile_path, config_url
-        )
-
     model_config["diag_table"] = utils.transfer_local_to_remote(
         model_config["diag_table"], config_url
     )
@@ -259,38 +249,22 @@ def _upload_config_files(
     with fsspec.open(config_path, "w") as config_file:
         config_file.write(yaml.dump(model_config))
 
-    return model_config, kubernetes_config
+    return config_path
 
 
-def prepare_and_upload_config(
-    workflow_name: str,
-    input_url: str,
-    config_url: str,
-    timestep: str,
-    one_step_config: dict,
-    base_config_version: str,
-    **kwargs,
-) -> Tuple[dict]:
-    """Update model and kubernetes configurations for this particular
-    timestep and upload necessary files to GCS"""
-
-    user_model_config = one_step_config["fv3config"]
-    user_kubernetes_config = one_step_config["kubernetes"]
-
-    model_config, kube_config = _update_config(
-        workflow_name,
-        base_config_version,
-        user_model_config,
-        user_kubernetes_config,
-        input_url,
-        config_url,
-        timestep,
-    )
-    model_config, kube_config = _upload_config_files(
-        model_config, kube_config, config_url, **kwargs
+def get_run_kubernetes_kwargs(user_kubernetes_config, config_url):
+
+    kubernetes_config = utils.update_nested_dict(
+        deepcopy(KUBERNETES_CONFIG_DEFAULT), user_kubernetes_config
     )
-    return model_config, kube_config
+
+    if "runfile" in kubernetes_config:
+        runfile_path = kubernetes_config["runfile"]
+        kubernetes_config["runfile"] = utils.transfer_local_to_remote(
+            runfile_path, config_url
+        )
+
+    return kubernetes_config
 
 
 def submit_jobs(
@@ -305,28 +279,59 @@ def submit_jobs(
     local_vertical_grid_file=None,
 ) -> None:
     """Submit one-step job for all timesteps in timestep_list"""
-    for timestep in timestep_list:
+    zarr_url = os.path.join(output_url, "big.zarr")
+
+    logger.info("Working on one-step jobs with arguments:")
+    logger.info(pprint.pformat(locals()))
+    # kube kwargs are shared by all jobs
+    kube_kwargs = get_run_kubernetes_kwargs(one_step_config["kubernetes"], config_url)
+
+    def config_factory(**kwargs):
+        timestep = timestep_list[kwargs["index"]]
         curr_input_url = os.path.join(input_url, timestep)
-        curr_output_url = os.path.join(output_url, timestep)
         curr_config_url = os.path.join(config_url, timestep)
 
-        model_config, kube_config = prepare_and_upload_config(
+        config = deepcopy(one_step_config)
+        kwargs["url"] = zarr_url
+        config["fv3config"]["one_step"] = kwargs
+
+        model_config = _update_config(
             workflow_name,
+            base_config_version,
+            config["fv3config"],
             curr_input_url,
             curr_config_url,
             timestep,
-            one_step_config,
-            base_config_version,
-            local_vertical_grid_file=local_vertical_grid_file,
         )
+        return _upload_config_files(
+            model_config, curr_config_url, local_vertical_grid_file
+        )
+
+    def run_job(wait=False, **kwargs):
+        """Run a run_kubernetes job
+
+        kwargs are passed to workflows/one_step_jobs/runfile.py:post_process
+
+        """
+        uid = str(uuid.uuid4())
+        labels = assoc(job_labels, "jobid", uid)
+        model_config_url = config_factory(**kwargs)
 
-        jobname = model_config["experiment_name"]
-        kube_config["jobname"] = jobname
+        # the one step workflow no longer needs to upload its run directories,
+        # since all the data is in the big zarr. Setting outdir to a pod-local
+        # path avoids this unnecessary upload step.
+        local_tmp_dir = "/tmp/null"
         fv3config.run_kubernetes(
-            os.path.join(curr_config_url, "fv3config.yml"),
-            curr_output_url,
-            job_labels=job_labels,
-            **kube_config,
+            model_config_url, local_tmp_dir, job_labels=labels, **kube_kwargs
         )
-        logger.info(f"Submitted job {jobname}")
+        if wait:
+            utils.wait_for_complete(job_labels, sleep_interval=10)
+
+    for k, timestep in enumerate(timestep_list):
+        if k == 0:
+            logger.info("Running the first time step to initialize the zarr store")
+            run_job(index=k, init=True, wait=True, timesteps=timestep_list)
+        else:
+            logger.info(f"Submitting job for timestep {timestep}")
+            run_job(index=k, init=False)
diff --git a/fv3net/runtime/__init__.py b/fv3net/runtime/__init__.py
index 5cb26a73c1..d8af81d7a0 100644
--- a/fv3net/runtime/__init__.py
+++ b/fv3net/runtime/__init__.py
@@ -1,3 +1,3 @@
 from . import sklearn_interface as sklearn
 from .state_io import init_writers, append_to_writers, CF_TO_RESTART_MAP
-from .config import get_runfile_config, get_namelist
+from .config import get_namelist, get_config
diff --git a/fv3net/runtime/config.py b/fv3net/runtime/config.py
index 39fe1794be..b78f4faf17 100644
--- a/fv3net/runtime/config.py
+++ b/fv3net/runtime/config.py
@@ -1,3 +1,4 @@
+from typing import Dict
 import yaml
 
 import f90nml
@@ -10,11 +11,11 @@ class dotdict(dict):
     __delattr__ = dict.__delitem__
 
 
-def get_runfile_config():
+def get_config() -> Dict:
     with open("fv3config.yml") as f:
         config = yaml.safe_load(f)
-    return dotdict(config["scikit_learn"])
+    return config
 
 
-def get_namelist():
+def get_namelist() -> f90nml.Namelist:
     return f90nml.read("input.nml")
diff --git a/workflows/end_to_end/full-workflow-config.yaml b/workflows/end_to_end/full-workflow-config.yaml
index b0ed9717b8..9959eb7de0 100644
--- a/workflows/end_to_end/full-workflow-config.yaml
+++ b/workflows/end_to_end/full-workflow-config.yaml
@@ -38,8 +38,9 @@ experiment:
       restart_data:
         from: coarsen_restarts
       experiment_yaml: ./workflows/one_step_jobs/all-physics-off.yml
-      docker_image: us.gcr.io/vcm-ml/prognostic-run-orchestration
-
+      docker_image: us.gcr.io/vcm-ml/prognostic_run:v0.1.0-a1
+      --config-version: v0.3
+
     create_training_data:
       command: python -m fv3net.pipelines.create_training_data
       args:
diff --git a/workflows/one_step_jobs/README.md b/workflows/one_step_jobs/README.md
index 8cef622d9f..2bcde6ca42 100644
--- a/workflows/one_step_jobs/README.md
+++ b/workflows/one_step_jobs/README.md
@@ -10,38 +10,36 @@ microphysics)
 Both of these configurations use a one-minute timestep with no dynamics substepping
 and have a total duration of 15 minutes.
 
-Workflow call signature:
-```
-$ python submit_jobs.py -h
-usage: submit_jobs.py [-h] INPUT_URL ONE_STEP_YAML OUTPUT_URL [--n-steps N_STEPS] [-o]
-
-  -h, --help         show this help message and exit
-  INPUT_URL          Remote url to initial conditions. Initial conditions
-                     are assumed to be stored as INPUT_URL/{timestamp}/{timestamp}.{restart_category}.tile*.nc
-  ONE_STEP_YAML      Path to local run configuration yaml.
-  DOCKER_IMAGE       fv3gfs-python model docker image.
-  OUTPUT_URL         Remote url where model configuration and output will
-                     be saved. Specifically, configuration files will be
-                     saved to OUTPUT_URL/one_step_config and model output
-                     to OUTPUT_URL/one_step_output
-  --n-steps N_STEPS  Number of timesteps to process. By default all
-                     timesteps found in INPUT_URL for which successful runs
-                     do not exist in OUTPUT_URL will be processed. Useful
-                     for testing.
-  -o, --overwrite    Overwrite successful timesteps in OUTPUT_URL.
-  --init-frequency INIT_FREQUENCY
-                     Frequency (in minutes) to initialize one-step jobs
-                     starting from the first available timestep.
-  --config-version CONFIG_VERSION
-                     Default fv3config.yml version to use as the base
-                     configuration. This should be consistent with the
-                     fv3gfs-python version in the specified docker image.
-                     Defaults to fv3gfs-python v0.2 style configuration.
+This workflow can be submitted with the [orchestrate_submit_jobs.py](./orchestrate_submit_jobs.py) script.
+This script is self-documenting and its help can be seen by running:
+
+    python orchestrate_submit_jobs.py -h
+
+
+# Minimal example
+
+Here is a minimal example of how to run this script on a small set of sample data.
+
+```sh
+workdir=$(pwd)
+src=gs://vcm-ml-data/orchestration-testing/test-andrep/coarsen_restarts_source-resolution_384_target-resolution_48/
+output=gs://vcm-ml-data/testing-noah/one-step
+VERSION=
+image=us.gcr.io/vcm-ml/prognostic_run:$VERSION
+yaml=$PWD/deep-conv-off.yml
+
+gsutil -m rm -r $output > /dev/null
+(
+    cd ../../
+    python $workdir/orchestrate_submit_jobs.py \
+        $src $yaml $image $output -o \
+        --config-version v0.3
+)
+```
 
-### Kubernetes VM access troubleshooting
+# Kubernetes VM access troubleshooting
 
 To process many (> around 40) runs at once, it is recommended to submit this
 workflow from a VM authorized with a service account. Users have had issues
 with API request errors
@@ -64,3 +62,13 @@ Use the following command to view your current configuration. It should point to
 ```
 kubectl config view
 ```
+
+# Out of Memory errors
+
+The one step jobs can fail with OOMKilled errors if too many dask workers
+are used. These errors can typically be avoided by using the single-threaded
+dask scheduler. You can enable this for debugging purposes by adding the
+following lines to the top of [runfile.py](./runfile.py):
+
+    import dask
+    dask.config.set(scheduler='single-threaded')
diff --git a/workflows/one_step_jobs/orchestrate_submit_jobs.py b/workflows/one_step_jobs/orchestrate_submit_jobs.py
index 421ea4b665..9b623252d5 100644
--- a/workflows/one_step_jobs/orchestrate_submit_jobs.py
+++ b/workflows/one_step_jobs/orchestrate_submit_jobs.py
@@ -11,6 +11,8 @@
 PWD = os.path.dirname(os.path.abspath(__file__))
 CONFIG_DIRECTORY_NAME = "one_step_config"
 
+RUNFILE = os.path.join(PWD, "runfile.py")
+
 
 def _create_arg_parser():
     parser = argparse.ArgumentParser()
@@ -83,7 +85,10 @@ def _create_arg_parser():
         one_step_config = yaml.load(file, Loader=yaml.FullLoader)
     workflow_name = Path(args.one_step_yaml).with_suffix("").name
     short_id = get_alphanumeric_unique_tag(8)
-    job_label = {"orchestrator-jobs": f"{workflow_name}-{short_id}"}
+    job_label = {
+        "orchestrator-jobs": f"{workflow_name}-{short_id}",
+        "workflow": "one_step_jobs",
+    }
 
     if not args.config_url:
         config_url = os.path.join(args.output_url, CONFIG_DIRECTORY_NAME)
@@ -98,6 +103,7 @@ def _create_arg_parser():
         subsample_frequency=args.init_frequency,
     )
 
+    one_step_config["kubernetes"]["runfile"] = RUNFILE
     one_step_config["kubernetes"]["docker_image"] = args.docker_image
 
     local_vgrid_file = os.path.join(PWD, one_step.VERTICAL_GRID_FILENAME)
diff --git a/workflows/one_step_jobs/runfile.py b/workflows/one_step_jobs/runfile.py
new file mode 100644
index 0000000000..4770e52185
--- /dev/null
+++ b/workflows/one_step_jobs/runfile.py
@@ -0,0 +1,283 @@
+import os
+from typing import Sequence, Mapping, cast, Hashable
+from fv3net import runtime
+import logging
+import time
+
+# avoid out of memory errors
+# dask.config.set(scheduler='single-threaded')
+
+import fsspec
+import zarr
+import xarray as xr
+import numpy as np
+
+
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+DELP = "pressure_thickness_of_atmospheric_layer"
+TIME = "time"
+
+TRACERS = (
+    "specific_humidity",
+    "cloud_water_mixing_ratio",
+    "rain_mixing_ratio",
+    "cloud_ice_mixing_ratio",
+    "snow_mixing_ratio",
+    "graupel_mixing_ratio",
+    "ozone_mixing_ratio",
+    "cloud_amount",
+)
+
+VARIABLES = (
+    "x_wind",
+    "y_wind",
+    "air_temperature",
+    "specific_humidity",
+    "pressure_thickness_of_atmospheric_layer",
+    "vertical_wind",
+    "vertical_thickness_of_atmospheric_layer",
+    "surface_geopotential",
+    "eastward_wind_at_surface",
+    "mean_cos_zenith_angle",
+    "sensible_heat_flux",
+    "latent_heat_flux",
+    "convective_cloud_fraction",
+    "convective_cloud_top_pressure",
+    "convective_cloud_bottom_pressure",
+    "land_sea_mask",
+    "surface_temperature",
+    "water_equivalent_of_accumulated_snow_depth",
+    "deep_soil_temperature",
+    "surface_roughness",
+    "mean_visible_albedo_with_strong_cosz_dependency",
+    "mean_visible_albedo_with_weak_cosz_dependency",
+    "mean_near_infrared_albedo_with_strong_cosz_dependency",
+    "mean_near_infrared_albedo_with_weak_cosz_dependency",
+    "fractional_coverage_with_strong_cosz_dependency",
+    "fractional_coverage_with_weak_cosz_dependency",
+    "vegetation_fraction",
+    "canopy_water",
+    "fm_at_10m",
+    "air_temperature_at_2m",
+    "specific_humidity_at_2m",
+    "vegetation_type",
+    "soil_type",
+    "friction_velocity",
+    "fm_parameter",
+    "fh_parameter",
+    "sea_ice_thickness",
+    "ice_fraction_over_open_water",
+    "surface_temperature_over_ice_fraction",
+    "total_precipitation",
+    "snow_rain_flag",
+    "snow_depth_water_equivalent",
+    "minimum_fractional_coverage_of_green_vegetation",
+    "maximum_fractional_coverage_of_green_vegetation",
+    "surface_slope_type",
+    "maximum_snow_albedo_in_fraction",
+    "snow_cover_in_fraction",
+    "soil_temperature",
+    "total_soil_moisture",
+    "liquid_soil_moisture",
+) + TRACERS
+
+SFC_VARIABLES = (
+    "DSWRFtoa",
+    "DSWRFsfc",
+    "USWRFtoa",
+    "USWRFsfc",
+    "DLWRFsfc",
+    "ULWRFtoa",
+    "ULWRFsfc",
+)
+
+
+def rename_sfc_dt_atmos(sfc: xr.Dataset) -> xr.Dataset:
+    DIMS = {"grid_xt": "x", "grid_yt": "y", "time": "forecast_time"}
+    return (
+        sfc[list(SFC_VARIABLES)]
+        .rename(DIMS)
+        .transpose("forecast_time", "tile", "y", "x")
+        .drop(["forecast_time", "y", "x"])
+    )
+
+
+def init_data_var(group: zarr.Group, array: xr.DataArray, nt: int):
+    logger.info(f"Initializing variable: {array.name}")
+    shape = (nt,) + array.data.shape
+    chunks = (1,) + tuple(size[0] for size in array.data.chunks)
+    out_array = group.empty(
+        name=array.name, shape=shape, chunks=chunks, dtype=array.dtype
+    )
+    out_array.attrs.update(array.attrs)
+    out_array.attrs["_ARRAY_DIMENSIONS"] = ["initial_time"] + list(array.dims)
+
+
+def init_coord(group: zarr.Group, coord):
+    logger.info(f"Initializing coordinate: {coord.name}")
+    # fill_value=NaN is needed below for xr.open_zarr to successfully load this
+    # coordinate if decode_cf=True. Otherwise, time=0 gets filled in as nan, which
+    # is very confusing.
+    out_array = group.array(name=coord.name, data=np.asarray(coord), fill_value="NaN")
+    out_array.attrs.update(coord.attrs)
+    out_array.attrs["_ARRAY_DIMENSIONS"] = list(coord.dims)
+
+
+def create_zarr_store(
+    timesteps: Sequence[str], group: zarr.Group, template: xr.Dataset
+):
+    logger.info("Creating group")
+    ds = template
+    group.attrs.update(ds.attrs)
+    nt = len(timesteps)
+    for name in ds:
+        init_data_var(group, ds[name], nt)
+
+    for name in ds.coords:
+        init_coord(group, ds[name])
+    dim = group.array("initial_time", data=timesteps)
+    dim.attrs["_ARRAY_DIMENSIONS"] = ["initial_time"]
+
+
+def _get_forecast_time(time) -> xr.DataArray:
+    dt = np.asarray(time - time[0])
+    return xr.DataArray(
+        _convert_time_delta_to_float_seconds(dt),
+        name="time",
+        dims=["time"],
+        attrs={"units": "s"},
+    )
+
+
+def _convert_time_delta_to_float_seconds(a):
+    ns_per_s = 1e9
+    return a.astype("timedelta64[ns]").astype(float) / ns_per_s
+
+
+def _merge_monitor_data(paths: Mapping[str, str]) -> xr.Dataset:
+    datasets = {key: xr.open_zarr(val) for key, val in paths.items()}
+    time = _get_forecast_time(datasets["begin"].time)
+    datasets_no_time = [val.drop("time") for val in datasets.values()]
+    steps = list(datasets.keys())
+    return xr.concat(datasets_no_time, dim="step").assign_coords(step=steps, time=time)
+
+
+def _write_to_store(group: zarr.ABSStore, index: int, ds: xr.Dataset):
+    for variable in ds:
+        logger.info(f"Writing {variable} to {group}")
+        dims = group[variable].attrs["_ARRAY_DIMENSIONS"][1:]
+        dask_arr = ds[variable].transpose(*dims).data
+        dask_arr.store(group[variable], regions=(index,))
+
+
+def _safe_get_variables(ds: xr.Dataset, variables: Sequence[Hashable]) -> xr.Dataset:
+    """ds[...] is a very confusing function from a typing perspective and should be
+    avoided in long-running pipeline codes. This function introduces a type-stable
+    alternative that works better with mypy.
+
+    In particular, ds[('a', 'b', 'c')] looks for a variable named ('a', 'b', 'c'),
+    which usually doesn't exist, so it causes a key error. But ds[['a', 'b', 'c']]
+    makes a dataset consisting only of the variables 'a', 'b', and 'c'. This causes
+    tons of hard-to-find errors.
+    """
+    variables = list(variables)
+    return cast(xr.Dataset, ds[variables])
+
+
+def post_process(
+    monitor_paths: Mapping[str, str],
+    sfc_pattern: str,
+    store_url: str,
+    index: int,
+    init: bool = False,
+    timesteps: Sequence = (),
+):
+
+    if init and len(timesteps) > 0 and index:
+        raise ValueError(
+            "Only the job with index 0 should initialize the zarr store."
+        )
+    logger.info("Post processing model outputs")
+
+    sfc = xr.open_mfdataset(sfc_pattern, concat_dim="tile", combine="nested").pipe(
+        rename_sfc_dt_atmos
+    )
+    sfc = _safe_get_variables(sfc, SFC_VARIABLES)
+
+    ds = (
+        _merge_monitor_data(monitor_paths)
+        .rename({"time": "forecast_time"})
+        .chunk({"forecast_time": 1, "tile": 6, "step": 3})
+    )
+
+    merged = xr.merge([sfc, ds])
+    mapper = fsspec.get_mapper(store_url)
+
+    if init:
+        logging.info("initializing zarr store")
+        group = zarr.open_group(mapper, mode="w")
+        create_zarr_store(timesteps, group, merged)
+
+    group = zarr.open_group(mapper, mode="a")
+    _write_to_store(group, index, merged)
+
+
+if __name__ == "__main__":
+    import fv3gfs
+    from mpi4py import MPI
+
+    RUN_DIR = os.path.dirname(os.path.realpath(__file__))
+
+    rank = MPI.COMM_WORLD.Get_rank()
+    size = MPI.COMM_WORLD.Get_size()
+    current_dir = os.getcwd()
+    config = runtime.get_config()
+    MPI.COMM_WORLD.barrier()  # wait for master rank to write run directory
+    logger = logging.getLogger(f"one_step:{rank}/{size}:{config['one_step']['index']}")
+
+    partitioner = fv3gfs.CubedSpherePartitioner.from_namelist(config["namelist"])
+
+    sfc_pattern = f"{RUN_DIR}/sfc_dt_atmos.tile?.nc"
+    paths = dict(
+        begin=os.path.join(RUN_DIR, "before_physics.zarr"),
+        after_physics=os.path.join(RUN_DIR, "after_physics.zarr"),
+        after_dynamics=os.path.join(RUN_DIR, "after_dynamics.zarr"),
+    )
+
+    monitors = {
+        key: fv3gfs.ZarrMonitor(path, partitioner, mode="w", mpi_comm=MPI.COMM_WORLD,)
+        for key, path in paths.items()
+    }
+
+    fv3gfs.initialize()
+    state = fv3gfs.get_state(names=VARIABLES + (TIME,))
+    if rank == 0:
+        logger.info("Beginning steps")
+    for i in range(fv3gfs.get_step_count()):
+        if rank == 0:
+            logger.info(f"step {i}")
+        monitors["begin"].store(state)
+        fv3gfs.step_dynamics()
+        state = fv3gfs.get_state(names=VARIABLES + (TIME,))
+        monitors["after_dynamics"].store(state)
+        fv3gfs.step_physics()
+        state = fv3gfs.get_state(names=VARIABLES + (TIME,))
+        monitors["after_physics"].store(state)
+
+    # parallelize across variables
+    fv3gfs.cleanup()
+    del monitors
+
+    if rank == 0:
+        # TODO it would be much cleaner to call this in a separate script, but that
+        # would be incompatible with the run_k8s api
+        # sleep a little while to allow all processes to finish finalizing the netCDFs
+        time.sleep(2)
+        c = config["one_step"]
+        url = c.pop("url")
+        index = c.pop("index")
+        post_process(paths, sfc_pattern, url, index, **c)
+else:
+    logger = logging.getLogger(__name__)
diff --git a/workflows/one_step_jobs/submit_jobs.py b/workflows/one_step_jobs/submit_jobs.py
deleted file mode 100644
index 64da5d99c9..0000000000
--- a/workflows/one_step_jobs/submit_jobs.py
+++ /dev/null
@@ -1,89 +0,0 @@
-import logging
-import os
-import argparse
-import yaml
-from pathlib import Path
-import fv3net.pipelines.kube_jobs.one_step as one_step
-
-logger = logging.getLogger("run_jobs")
-
-RUNDIRS_DIRECTORY_NAME = "one_step_output"
-CONFIG_DIRECTORY_NAME = "one_step_config"
-PWD = Path(os.path.abspath(__file__)).parent
-LOCAL_VGRID_FILE = os.path.join(PWD, one_step.VERTICAL_GRID_FILENAME)
-
-
-def _get_arg_parser():
-    parser = argparse.ArgumentParser()
-    parser.add_argument(
-        "one-step-yaml", type=str, help="Path to local run configuration yaml.",
-    )
-    parser.add_argument(
-        "input-url",
-        type=str,
-        help="Remote url to initial conditions. Initial conditions are assumed to be "
-        "stored as INPUT_URL/{timestamp}/{timestamp}.{restart_category}.tile*.nc",
-    )
-    parser.add_argument(
-        "output-url",
-        type=str,
-        help="Remote url where model configuration and output will be saved. "
-        "Specifically, configuration files will be saved to OUTPUT_URL/"
-        f"{CONFIG_DIRECTORY_NAME} and model output to OUTPUT_URL/"
-        f"{RUNDIRS_DIRECTORY_NAME}",
-    )
-    parser.add_argument(
-        "--n-steps",
-        type=int,
-        default=None,
-        help="Number of timesteps to process. By default all timesteps "
-        "found in INPUT_URL for which successful runs do not exist in "
-        "OUTPUT_URL will be processed. Useful for testing.",
-    )
-    parser.add_argument(
-        "-o",
-        "--overwrite",
-        action="store_true",
-        help="Overwrite successful timesteps in OUTPUT_URL.",
-    )
-    parser.add_argument(
-        "--init-frequency",
-        type=int,
-        required=False,
-        help="Frequency (in minutes) to initialize one-step jobs starting from"
-        " the first available timestep.",
-    )
-
-    return parser
-
-
-if __name__ == "__main__":
-    logging.basicConfig(level=logging.INFO)
-
-    parser = _get_arg_parser()
-    args = parser.parse_args()
-
-    with open(args.one_step_yaml) as file:
-        one_step_config = yaml.load(file, Loader=yaml.FullLoader)
-    workflow_name = Path(args.one_step_yaml).with_suffix("").name
-
-    output_url = os.path.join(args.output_url, RUNDIRS_DIRECTORY_NAME)
-    config_url = os.path.join(args.output_url, CONFIG_DIRECTORY_NAME)
-
-    timestep_list = one_step.timesteps_to_process(
-        args.input_url,
-        args.output_url,
-        args.n_steps,
-        args.overwrite,
-        subsample_frequency=args.init_frequency,
-    )
-
-    one_step.submit_jobs(
-        timestep_list,
-        workflow_name,
-        one_step_config,
-        args.input_url,
-        output_url,
-        config_url,
-        local_vertical_grid_file=LOCAL_VGRID_FILE,
-    )
diff --git a/workflows/one_step_jobs/test_runfile.py b/workflows/one_step_jobs/test_runfile.py
new file mode 100644
index 0000000000..8f58533046
--- /dev/null
+++ b/workflows/one_step_jobs/test_runfile.py
@@ -0,0 +1,41 @@
+import runfile
+import zarr
+import xarray as xr
+import numpy as np
+
+
+def test_init_coord():
+
+    time = np.array(
+        [
+            "2016-08-01T00:16:00.000000000",
+            "2016-08-01T00:17:00.000000000",
+            "2016-08-01T00:18:00.000000000",
+            "2016-08-01T00:19:00.000000000",
+            "2016-08-01T00:20:00.000000000",
+            "2016-08-01T00:21:00.000000000",
+            "2016-08-01T00:22:00.000000000",
+            "2016-08-01T00:23:00.000000000",
+            "2016-08-01T00:24:00.000000000",
+            "2016-08-01T00:25:00.000000000",
+            "2016-08-01T00:26:00.000000000",
+            "2016-08-01T00:27:00.000000000",
+            "2016-08-01T00:28:00.000000000",
+            "2016-08-01T00:29:00.000000000",
+            "2016-08-01T00:30:00.000000000",
+        ],
+        dtype="datetime64[ns]",
+    )
+
+    ds = xr.Dataset(coords={"time": time})
+    time = runfile._get_forecast_time(ds.time)
+
+    ds_lead_time = ds.assign(time=time)
+
+    store = {}
+
+    group = zarr.open_group(store, mode="w")
+    runfile.init_coord(group, ds_lead_time["time"])
+
+    loaded = xr.open_zarr(store)
+    np.testing.assert_equal(loaded.time.values, ds_lead_time.time.values)
diff --git a/workflows/one_step_jobs/zarr_stat.py b/workflows/one_step_jobs/zarr_stat.py
new file mode 100644
index 0000000000..07352298e6
--- /dev/null
+++ b/workflows/one_step_jobs/zarr_stat.py
@@ -0,0 +1,30 @@
+import fsspec
+import xarray as xr
+
+print("output structure:")
+print()
+for root, dirname, filename in fsspec.filesystem("gs").walk(
+    "gs://vcm-ml-data/testing-noah/one-step"
+):
+    if "big.zarr" not in root:
+        for name in filename:
+            print(f"{root}/{name}")
+        for name in dirname:
+            print(f"{root}/{name}/")
+
+
+url = "gs://vcm-ml-data/testing-noah/one-step/big.zarr/"
+m = fsspec.get_mapper(url)
+ds = xr.open_zarr(m)
+
+print()
+print("big.zarr info:")
+print(ds)
+print()
+ds.info()
+
+print()
+print("data size:", ds.isel(initial_time=0).nbytes / 1e9, "GB/initial time")
+
+
+# print(ds.air_temperature.std().compute())
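For a quick look at the data this workflow produces, the following minimal sketch opens the `big.zarr` store and pulls out a single snapshot. It assumes the same store URL used in `zarr_stat.py` above and the variable and `step` names defined in `runfile.py`; adjust the URL to your own output bucket.

    import fsspec
    import xarray as xr

    url = "gs://vcm-ml-data/testing-noah/one-step/big.zarr/"
    ds = xr.open_zarr(fsspec.get_mapper(url))

    # one initial condition, post-physics state, first forecast time
    snapshot = ds["air_temperature"].sel(step="after_physics").isel(
        initial_time=0, forecast_time=0
    )
    print(snapshot.dims, float(snapshot.mean()))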