diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md new file mode 100644 index 0000000000..fea518f86f --- /dev/null +++ b/CONTRIBUTING.md @@ -0,0 +1,17 @@ +# Contributions Guides and Standards + + +## fv3net + +- Only imports to `vcm`, `vcm.cubedsphere`, and `vcm.cloud` are allowed. No + deeper nesting (e.g. `vcm.cloud.fsspec`) or imports to other modules are + allowed. + + +## vcm + +- The external interfaces are the modules `vcm`, `vcm.cubedsphere`, and + `vcm.cloud`. All routines to be used externally should be imported into one + of these namespaces. This rule could change pending future changes to the vcm API. + + diff --git a/Makefile b/Makefile index 716ce26156..2f19120644 100644 --- a/Makefile +++ b/Makefile @@ -3,7 +3,7 @@ ################################################################################# # GLOBALS # ################################################################################# -VERSION = 0.1.0 +VERSION = v0.1.0 ENVIRONMENT_SCRIPTS = .environment-scripts PROJECT_DIR := $(shell dirname $(realpath $(lastword $(MAKEFILE_LIST)))) BUCKET = [OPTIONAL] your-bucket-for-syncing-data (do not include 's3://') diff --git a/external/fv3config b/external/fv3config index e6dc95cef8..6bde7b7354 160000 --- a/external/fv3config +++ b/external/fv3config @@ -1 +1 @@ -Subproject commit e6dc95cef8c755e608dc432c97238907d2cdf558 +Subproject commit 6bde7b7354fa3c4c512a7178ee9b55b45189d76e diff --git a/external/vcm/tests/test_visualize.py b/external/vcm/tests/test_visualize.py index 5be6523ccb..1ee629eef5 100644 --- a/external/vcm/tests/test_visualize.py +++ b/external/vcm/tests/test_visualize.py @@ -269,7 +269,7 @@ def test_plot_cube_axes(sample_dataset, plotting_function): "plotting_function", [("pcolormesh"), ("contour"), ("contourf")] ) def test_plot_cube_with_facets(sample_dataset, plotting_function): - f, axes, hs, cbar = plot_cube( + f, axes, hs, cbar, facet_grid = plot_cube( mappable_var(sample_dataset, "t2m"), col="time", plotting_function=plotting_function, @@ -281,7 +281,7 @@ def test_plot_cube_with_facets(sample_dataset, plotting_function): ) def test_plot_cube_on_axis(sample_dataset, plotting_function): ax = plt.axes(projection=ccrs.Robinson()) - f, axes, hs, cbar = plot_cube( + f, axes, hs, cbar, facet_grid = plot_cube( mappable_var(sample_dataset, "t2m").isel(time=0), plotting_function=plotting_function, ax=ax, diff --git a/external/vcm/vcm/__init__.py b/external/vcm/vcm/__init__.py index 2d33a753e8..8783a91bc1 100644 --- a/external/vcm/vcm/__init__.py +++ b/external/vcm/vcm/__init__.py @@ -11,6 +11,14 @@ parse_timestep_str_from_path, parse_datetime_from_str, ) +from .calc import mass_integrate +from .calc.thermo import ( + net_heating, + net_precipitation, + pressure_at_midpoint_log, + potential_temperature, +) from .coarsen import coarsen_restarts_on_pressure, coarsen_restarts_on_sigma +from .select import mask_to_surface_type from .visualize import plot_cube, mappable_var, plot_cube_axes from .xarray_loaders import open_tiles, open_delayed diff --git a/external/vcm/vcm/calc/thermo.py b/external/vcm/vcm/calc/thermo.py index c1b0c02a96..b32e5c4eb4 100644 --- a/external/vcm/vcm/calc/thermo.py +++ b/external/vcm/vcm/calc/thermo.py @@ -2,6 +2,7 @@ import xarray as xr from ..cubedsphere.constants import COORD_Z_CENTER, COORD_Z_OUTER + # following are defined as in FV3GFS model (see FV3/fms/constants/constants.f90) _GRAVITY = 9.80665 # m /s2 _RDGAS = 287.05 # J / K / kg @@ -19,6 +20,8 @@ _REFERENCE_SURFACE_PRESSURE = 100000 # reference pressure for potential temp [Pa] _REVERSE = slice(None, None, -1) +_SEC_PER_DAY = 86400 + def potential_temperature(P, T): return T * (_REFERENCE_SURFACE_PRESSURE / P) ** _POISSON_CONST @@ -196,8 +199,7 @@ def net_heating( """ lv = latent_heat_vaporization(surface_temperature) - - return ( + da = ( -dlw_sfc - dsw_sfc + ulw_sfc @@ -208,6 +210,8 @@ def net_heating( + shf + surface_rain_rate * lv ) + da.attrs = {"long_name": "net heating from model physics", "units": "W/m^2"} + return da def latent_heat_flux_to_evaporation( @@ -226,35 +230,9 @@ def latent_heat_flux_to_evaporation( return lhf / latent_heat_vaporization(surface_temperature) -def net_heating_from_dataset(ds: xr.Dataset, suffix: str = None) -> xr.DataArray: - """Compute the net heating from a dataset of diagnostic output - - This should be equivalent to the vertical integral (i.e. <>) of Q1:: - - cp - - Args: - ds: a datasets with the names for the heat fluxes and precipitation used - by the ML pipeline - suffix: (optional) suffix of flux data vars if applicable. Will add "_" before - appending to variable names if not already in suffix. - - Returns: - the total net heating, the rate of change of the dry enthalpy - """ - if suffix and suffix[0] != "_": - suffix = "_" + suffix - elif not suffix or suffix == "": - suffix = "" - fluxes = ( - ds["DLWRFsfc" + suffix], - ds["DSWRFsfc" + suffix], - ds["ULWRFsfc" + suffix], - ds["ULWRFtoa" + suffix], - ds["USWRFsfc" + suffix], - ds["USWRFtoa" + suffix], - ds["DSWRFtoa" + suffix], - ds["SHTFLsfc" + suffix], - ds["PRATEsfc" + suffix], +def net_precipitation(lhf, prate): + da = (prate - latent_heat_flux_to_evaporation(lhf)) * _SEC_PER_DAY + da.attrs = ( + {"long_name": "net precipitation from model physics", "units": "mm/day"}, ) - return net_heating(*fluxes) + return da diff --git a/external/vcm/vcm/visualize/plot_cube.py b/external/vcm/vcm/visualize/plot_cube.py index 78fb6863c9..3c54d0b596 100644 --- a/external/vcm/vcm/visualize/plot_cube.py +++ b/external/vcm/vcm/visualize/plot_cube.py @@ -105,10 +105,13 @@ def plot_cube( cbar (obj): `plt.colorbar` object handle associated with figure, if `colorbar` arg is True, else None. + facet_grid (xarray.plot.facetgrid): + xarray plotting facetgrid for multi-axes case. In single-axes case, + retunrs None. Example: # plot diag winds at two times - fig, axes, hs, cbar = plot_cube( + fig, axes, hs, cbar, facet_grid = plot_cube( mappable_var(diag_ds, 'VGRD850').isel(time = slice(2, 4)), plotting_function = "contourf", col = "time", @@ -165,6 +168,7 @@ def plot_cube( handle = _plot_func_short(array, ax=ax) axes = np.array(ax) handles = [handle] + facet_grid = None if coastlines: coastlines_kwargs = dict() if not coastlines_kwargs else coastlines_kwargs @@ -186,7 +190,7 @@ def plot_cube( else: cbar = None - return fig, axes, handles, cbar + return fig, axes, handles, cbar, facet_grid def mappable_var(ds: xr.Dataset, var_name: str): diff --git a/external/vcm/vcm/visualize/plot_diagnostics.py b/external/vcm/vcm/visualize/plot_diagnostics.py index 6289bae0d7..ed3b5267ab 100644 --- a/external/vcm/vcm/visualize/plot_diagnostics.py +++ b/external/vcm/vcm/visualize/plot_diagnostics.py @@ -82,7 +82,9 @@ def plot_diag_var_single_map(da, grid, var_name, plot_cube_kwargs=None): """ da = da.rename(var_name) ds_mappable = mappable_var(xr.merge([da, grid]), var_name) - fig, axes, handles, cbar = plot_cube(ds_mappable, **(plot_cube_kwargs or {})) + fig, axes, handles, cbar, facet_grid = plot_cube( + ds_mappable, **(plot_cube_kwargs or {}) + ) return fig diff --git a/fv3net/diagnostics/data_funcs.py b/fv3net/diagnostics/data_funcs.py index 34445c9666..3621db3e1e 100644 --- a/fv3net/diagnostics/data_funcs.py +++ b/fv3net/diagnostics/data_funcs.py @@ -2,6 +2,7 @@ import pandas as pd import xarray as xr +import vcm from vcm.select import drop_nondim_coords, get_latlon_grid_coords # give as [lat, lon] @@ -12,10 +13,11 @@ "central_canada": [55.0, 258.0], "tropical_west_pacific": [-5.0, 165.0], } +_KG_M2S_TO_MM_DAY = 86400 # kg/m2/s same as mm/s. Using 1000 km/m3 for H20 density def merge_comparison_datasets( - var, datasets, dataset_labels, grid, additional_dataset=None + data_vars, datasets, dataset_labels, grid, additional_dataset=None ): """ Makes a comparison dataset out of multiple datasets that all have a common data variable. They are concatenated with a new dim "dataset" that can be used @@ -39,7 +41,7 @@ def merge_comparison_datasets( src_dim_index = pd.Index(dataset_labels, name="dataset") datasets = [drop_nondim_coords(ds) for ds in datasets] datasets_to_merge = [ - xr.concat([ds[var].squeeze(drop=True) for ds in datasets], src_dim_index), + xr.concat([ds[data_vars].squeeze(drop=True) for ds in datasets], src_dim_index), grid, ] if additional_dataset is not None: @@ -90,3 +92,37 @@ def _conditions(d): cond = np.vectorize(_conditions) return cond(phase) + + +def net_heating_from_dataset(ds: xr.Dataset, suffix: str = None) -> xr.DataArray: + """Compute the net heating from a dataset of diagnostic output + + This should be equivalent to the vertical integral (i.e. <>) of Q1:: + + cp + + Args: + ds: a datasets with the names for the heat fluxes and precipitation used + by the ML pipeline + suffix: (optional) suffix of flux data vars if applicable. Will add '_' before + appending to variable names if not already in suffix. + + Returns: + the total net heating, the rate of change of the dry enthalpy + """ + if suffix and suffix[0] != "_": + suffix = "_" + suffix + elif not suffix or suffix == "": + suffix = "" + fluxes = ( + ds["DLWRFsfc" + suffix], + ds["DSWRFsfc" + suffix], + ds["ULWRFsfc" + suffix], + ds["ULWRFtoa" + suffix], + ds["USWRFsfc" + suffix], + ds["USWRFtoa" + suffix], + ds["DSWRFtoa" + suffix], + ds["SHTFLsfc" + suffix], + ds["PRATEsfc" + suffix], + ) + return vcm.net_heating(*fluxes) diff --git a/fv3net/diagnostics/sklearn_model_performance/__main__.py b/fv3net/diagnostics/sklearn_model_performance/__main__.py index cae28b02fd..bf6c64afdc 100644 --- a/fv3net/diagnostics/sklearn_model_performance/__main__.py +++ b/fv3net/diagnostics/sklearn_model_performance/__main__.py @@ -36,8 +36,9 @@ help="Output dir to write results to. Can be local or a GCS path.", ) parser.add_argument( - "num_test_zarrs", + "--num_test_zarrs", type=int, + default=4, help="Number of zarrs to concat together for use as test set.", ) parser.add_argument( @@ -61,6 +62,7 @@ help="Factor by which to downsample test set time steps", ) args = parser.parse_args() + args.test_data_path = os.path.join(args.test_data_path, "test") # if output path is remote GCS location, save results to local output dir first proto = get_protocol(args.output_path) diff --git a/fv3net/diagnostics/sklearn_model_performance/data_funcs_sklearn.py b/fv3net/diagnostics/sklearn_model_performance/data_funcs_sklearn.py index 088366d9ae..4fc978fad6 100644 --- a/fv3net/diagnostics/sklearn_model_performance/data_funcs_sklearn.py +++ b/fv3net/diagnostics/sklearn_model_performance/data_funcs_sklearn.py @@ -1,14 +1,15 @@ -from scipy.interpolate import UnivariateSpline import os +from scipy.interpolate import UnivariateSpline import xarray as xr import fv3net +from ..data_funcs import net_heating_from_dataset from fv3net.pipelines.create_training_data import ( SUFFIX_COARSE_TRAIN_DIAG, VAR_Q_HEATING_ML, VAR_Q_MOISTENING_ML, ) -from vcm.calc import mass_integrate, thermo +import vcm from vcm.cloud.fsspec import get_fs from vcm.convenience import round_time from vcm.cubedsphere.constants import ( @@ -23,12 +24,24 @@ kg_m2_to_mm, SPECIFIC_HEAT_CONST_PRESSURE, GRAVITY, - SEC_PER_DAY, ) SAMPLE_DIM = "sample" STACK_DIMS = ["tile", INIT_TIME_DIM, COORD_X_CENTER, COORD_Y_CENTER] +THERMO_DATA_VAR_ATTRS = { + "net_precipitation": {"long_name": "net column precipitation", "units": "mm/day"}, + "net_heating": {"long_name": "net column heating", "units": "W/m^2"}, + "net_precipitation_ml": { + "long_name": "residual P-E predicted by ML model", + "units": "mm/day", + }, + "net_heating_ml": { + "long_name": "residual heating predicted by ML model", + "units": "W/m^2", + }, +} + def predict_on_test_data( test_data_path, @@ -79,11 +92,11 @@ def load_high_res_diag_dataset(coarsened_hires_diags_path, init_times): f"are not matched in high res dataset." ) - evaporation = thermo.latent_heat_flux_to_evaporation(ds_hires["LHTFLsfc_coarse"]) - ds_hires["P-E_total"] = SEC_PER_DAY * (ds_hires["PRATEsfc_coarse"] - evaporation) - ds_hires["heating_total"] = thermo.net_heating_from_dataset( - ds_hires, suffix="coarse" + ds_hires["net_precipitation"] = vcm.net_precipitation( + ds_hires[f"LHTFLsfc_coarse"], ds_hires[f"PRATEsfc_coarse"] ) + ds_hires["net_heating"] = net_heating_from_dataset(ds_hires, suffix="coarse") + return ds_hires @@ -95,17 +108,28 @@ def add_column_heating_moistening(ds): ds (xarray dataset): train/test or prediction dataset that has dQ1, dQ2, delp, precip and LHF data variables """ - ds["P-E_total"] = ( - mass_integrate(-ds[VAR_Q_MOISTENING_ML], ds.delp) - - thermo.latent_heat_flux_to_evaporation( - ds[f"LHTFLsfc_{SUFFIX_COARSE_TRAIN_DIAG}"] - ) - + ds[f"PRATEsfc_{SUFFIX_COARSE_TRAIN_DIAG}"] - ) * kg_m2s_to_mm_day - ds["heating_total"] = SPECIFIC_HEAT_CONST_PRESSURE * mass_integrate( + ds["net_precipitation_ml"] = ( + vcm.mass_integrate(-ds[VAR_Q_MOISTENING_ML], ds.delp) * kg_m2s_to_mm_day + ) + ds["net_precipitation_physics"] = vcm.net_precipitation( + ds[f"LHTFLsfc_{SUFFIX_COARSE_TRAIN_DIAG}"], + ds[f"PRATEsfc_{SUFFIX_COARSE_TRAIN_DIAG}"], + ) + + ds["net_precipitation"] = ( + ds["net_precipitation_ml"] + ds["net_precipitation_physics"] + ) + + ds["net_heating_ml"] = SPECIFIC_HEAT_CONST_PRESSURE * vcm.mass_integrate( ds[VAR_Q_HEATING_ML], ds.delp - ) + thermo.net_heating_from_dataset(ds, suffix=SUFFIX_COARSE_TRAIN_DIAG) + ) + ds["net_heating_physics"] = net_heating_from_dataset( + ds, suffix=SUFFIX_COARSE_TRAIN_DIAG + ) + ds["net_heating"] = ds["net_heating_ml"] + ds["net_heating_physics"] + for data_var, data_attrs in THERMO_DATA_VAR_ATTRS.items(): + ds[data_var].attrs = data_attrs def integrate_for_Q(P, sphum, lower_bound=55000, upper_bound=85000): @@ -114,7 +138,7 @@ def integrate_for_Q(P, sphum, lower_bound=55000, upper_bound=85000): def lower_tropospheric_stability(ds): - pressure = thermo.pressure_at_midpoint_log(ds.delp) + pressure = vcm.pressure_at_midpoint_log(ds.delp) T_at_700mb = ( regrid_to_shared_coords( ds["T"], @@ -126,5 +150,5 @@ def lower_tropospheric_stability(ds): .squeeze() .drop("p700mb") ) - theta_700mb = thermo.potential_temperature(70000, T_at_700mb) + theta_700mb = vcm.potential_temperature(70000, T_at_700mb) return theta_700mb - ds["tsea"] diff --git a/fv3net/diagnostics/sklearn_model_performance/plotting_sklearn.py b/fv3net/diagnostics/sklearn_model_performance/plotting_sklearn.py index 834f52498c..a09dc76ed9 100644 --- a/fv3net/diagnostics/sklearn_model_performance/plotting_sklearn.py +++ b/fv3net/diagnostics/sklearn_model_performance/plotting_sklearn.py @@ -15,9 +15,9 @@ VAR_LAT_CENTER, PRESSURE_GRID, ) +import vcm from vcm.cubedsphere.regridz import regrid_to_common_pressure from vcm.select import mask_to_surface_type -from vcm.calc.thermo import pressure_at_midpoint_log from vcm.visualize import plot_cube, mappable_var from vcm.visualize.plot_diagnostics import plot_diurnal_cycle @@ -81,29 +81,56 @@ def make_all_plots(ds_pred, ds_target, ds_hires, grid, output_dir): "slmsk" ) ds_pe = merge_comparison_datasets( - "P-E_total", + "net_precipitation", [ds_pred, ds_target, ds_hires], ["prediction", "target C48", "coarsened high res"], grid, slmsk, ) ds_heating = merge_comparison_datasets( - "heating_total", + "net_heating", [ds_pred, ds_target, ds_hires], ["prediction", "target C48", "coarsened high res"], grid, slmsk, ) + # , and as fraction of total 2D integrated vars + ds = merge_comparison_datasets( + data_vars=[ + "net_precipitation_ml", + "net_heating_ml", + "net_precipitation", + "net_heating", + ], + datasets=[ds_pred, ds_target], + dataset_labels=["prediction", "target C48"], + grid=grid, + ) + figs = map_plot_ml_frac_of_total(ds) + fig_pe_ml, fig_pe_ml_frac, fig_heating_ml, fig_heating_ml_frac = figs + fig_pe_ml.savefig(os.path.join(output_dir, "dQ2_vertical_integral_map.png")) + fig_pe_ml_frac.savefig(os.path.join(output_dir, "dQ2_frac_of_PE.png")) + fig_heating_ml.savefig(os.path.join(output_dir, "dQ1_vertical_integral_map.png")) + fig_heating_ml_frac.savefig(os.path.join(output_dir, "dQ1_frac_of_heating.png")) + report_sections["ML model contributions to Q1 and Q2"] = [ + "dQ2_vertical_integral_map.png", + "dQ2_frac_of_PE.png", + "dQ1_vertical_integral_map.png", + "dQ1_frac_of_heating.png", + ] + # LTS PE_pred = ( - mask_to_surface_type(ds_pe.sel(dataset="prediction"), "sea")["P-E_total"] + mask_to_surface_type(ds_pe.sel(dataset="prediction"), "sea")[ + "net_precipitation" + ] .squeeze() .drop("dataset") ) PE_hires = ( mask_to_surface_type(ds_pe.sel(dataset="coarsened high res"), "sea")[ - "P-E_total" + "net_precipitation" ] .squeeze() .drop("dataset") @@ -152,20 +179,23 @@ def make_all_plots(ds_pred, ds_target, ds_hires, grid, output_dir): ds_pe["local_time"] = local_time(ds_pe) ds_heating["local_time"] = local_time(ds_heating) plot_diurnal_cycle( - mask_to_surface_type(ds_pe, "sea"), "P-E_total", title="ocean" + mask_to_surface_type(ds_pe, "sea"), "net_precipitation", title="ocean" ).savefig( os.path.join(output_dir, "diurnal_cycle_P-E_sea.png"), dpi=DPI_FIGURES["diurnal_cycle"], ) plot_diurnal_cycle( - mask_to_surface_type(ds_pe, "land"), "P-E_total", title="land" + mask_to_surface_type(ds_pe, "land"), "net_precipitation", title="land" ).savefig( os.path.join(output_dir, "diurnal_cycle_P-E_land.png"), dpi=DPI_FIGURES["diurnal_cycle"], ) for location_name, coords in local_coords.items(): plot_diurnal_cycle( - ds_pe.sel(coords), "P-E_total", title=location_name, ylabel="P-E [mm]" + ds_pe.sel(coords), + "net_precipitation", + title=location_name, + ylabel="P-E [mm/day]", ).savefig( os.path.join(output_dir, f"diurnal_cycle_P-E_{location_name}.png"), dpi=DPI_FIGURES["diurnal_cycle"], @@ -177,13 +207,13 @@ def make_all_plots(ds_pred, ds_target, ds_hires, grid, output_dir): # plot column heating across the diurnal cycle plot_diurnal_cycle( - mask_to_surface_type(ds_heating, "sea"), "heating_total", title="sea" + mask_to_surface_type(ds_heating, "sea"), "net_heating", title="sea" ).savefig( os.path.join(output_dir, "diurnal_cycle_heating_sea.png"), dpi=DPI_FIGURES["diurnal_cycle"], ) plot_diurnal_cycle( - mask_to_surface_type(ds_heating, "land"), "heating_total", title="land" + mask_to_surface_type(ds_heating, "land"), "net_heating", title="land" ).savefig( os.path.join(output_dir, "diurnal_cycle_heating_land.png"), dpi=DPI_FIGURES["diurnal_cycle"], @@ -192,7 +222,7 @@ def make_all_plots(ds_pred, ds_target, ds_hires, grid, output_dir): for location_name, coords in local_coords.items(): plot_diurnal_cycle( ds_heating.sel(coords), - "heating_total", + "net_heating", title=location_name, ylabel="heating [W/m$^2$]", ).savefig( @@ -207,7 +237,7 @@ def make_all_plots(ds_pred, ds_target, ds_hires, grid, output_dir): # map plot variables and compare across prediction/ C48 /coarsened high res data _plot_comparison_maps( ds_pe, - "P-E_total", + "net_precipitation", time_index_selection=None, plot_cube_kwargs={"cbar_label": "time avg, P-E [mm/day]"}, ).savefig( @@ -215,7 +245,7 @@ def make_all_plots(ds_pred, ds_target, ds_hires, grid, output_dir): ) _plot_comparison_maps( ds_pe, - "P-E_total", + "net_precipitation", time_index_selection=[0, 2], plot_cube_kwargs={"cbar_label": "timestep snapshot, P-E [mm/day]"}, ).savefig( @@ -226,7 +256,7 @@ def make_all_plots(ds_pred, ds_target, ds_hires, grid, output_dir): _plot_comparison_maps( ds_heating, - "heating_total", + "net_heating", time_index_selection=None, plot_cube_kwargs={"cbar_label": "time avg, column heating [W/m$^2$]"}, ).savefig( @@ -235,7 +265,7 @@ def make_all_plots(ds_pred, ds_target, ds_hires, grid, output_dir): ) _plot_comparison_maps( ds_heating, - "heating_total", + "net_heating", time_index_selection=[0, -1], plot_cube_kwargs={"cbar_label": "timestep snapshot, column heating [W/m$^2$]"}, ).savefig( @@ -357,7 +387,10 @@ def _make_vertical_profile_plots(ds_pred, ds_target, var, units, title=None): plt.clf() fig = plt.figure() - pos_mask, neg_mask = ds_target["P-E_total"] > 0, ds_target["P-E_total"] < 0 + pos_mask, neg_mask = ( + ds_target["net_precipitation"] > 0, + ds_target["net_precipitation"] < 0, + ) ds_pred = regrid_to_common_pressure(ds_pred[var], ds_pred["delp"]) ds_target = regrid_to_common_pressure(ds_target[var], ds_target["delp"]) @@ -402,7 +435,7 @@ def _plot_lower_troposphere_stability(ds, PE_pred, PE_hires, lat_max=20): .dropna("sample") ) - ds["pressure"] = pressure_at_midpoint_log(ds["delp"]) + ds["pressure"] = vcm.pressure_at_midpoint_log(ds["delp"]) Q = [ integrate_for_Q(p, qt) for p, qt in zip(ds["pressure"].values.T, ds["sphum"].values.T) @@ -443,3 +476,50 @@ def _plot_lower_troposphere_stability(ds, PE_pred, PE_hires, lat_max=20): ax3.set_title("Avg P-E error (predicted - high res)") plt.show() return fig + + +def map_plot_ml_frac_of_total(ds): + """ Produces plots of the ML predicted components dQ of column heating + and moistening + + Args: + ds (xarray dataset): dataset with "dataset" dimension denoting whether + the dQ quantity was from the high-low res tendency or the ML model prediction + + Returns: + Figure objects for plots of ML predictions of {column heating, P-E} + for both the absolute ML prediction value as well as the ML + prediction as a fraction of the total quantity (ML + physics) + """ + ds = ds.assign( + { + "net_precipitation_ml_frac_of_total": ds["net_precipitation_ml"] + / ds["net_precipitation"], + "net_heating_ml_frac_of_total": ds["net_heating_ml"] / ds["net_heating"], + } + ) + fig_pe_ml = plot_cube( + mappable_var(ds, "net_precipitation_ml").mean(INIT_TIME_DIM), col="dataset" + )[0] + fig_pe_ml.suptitle("P-E [mm/d]: ML contribution") + fig_pe_ml_frac = plot_cube( + mappable_var(ds, "net_precipitation_ml_frac_of_total").mean(INIT_TIME_DIM), + col="dataset", + vmin=-1, + vmax=1, + )[0] + fig_pe_ml_frac.suptitle("P-E: ML prediction as fraction of total") + + fig_heating_ml = plot_cube( + mappable_var(ds, "net_heating_ml").mean(INIT_TIME_DIM), col="dataset" + )[0] + fig_heating_ml.suptitle("heating [W/m$^2$], ML contribution") + fig_heating_ml_frac = plot_cube( + mappable_var(ds, "net_heating_ml_frac_of_total").mean(INIT_TIME_DIM), + col="dataset", + vmin=-1, + vmax=1, + )[0] + fig_heating_ml_frac.suptitle("heating: ML prediction as fraction of total") + + return fig_pe_ml, fig_pe_ml_frac, fig_heating_ml, fig_heating_ml_frac diff --git a/fv3net/pipelines/coarsen_restarts/__main__.py b/fv3net/pipelines/coarsen_restarts/__main__.py index b7e332b7bd..157f7896c4 100644 --- a/fv3net/pipelines/coarsen_restarts/__main__.py +++ b/fv3net/pipelines/coarsen_restarts/__main__.py @@ -14,14 +14,6 @@ type=str, help="Full GCS path to input data for downloading timesteps.", ) - parser.add_argument( - "gcs_dst_dir", - type=str, - help=( - "Full GCS path to output coarsened timestep data. Defaults to input path" - "with target resolution appended as a directory" - ), - ) parser.add_argument( "gcs_grid_spec_path", type=str, @@ -36,6 +28,14 @@ parser.add_argument( "target_resolution", type=int, help="Target coarsening resolution to output.", ) + parser.add_argument( + "gcs_dst_dir", + type=str, + help=( + "Full GCS path to output coarsened timestep data. Defaults to input path" + "with target resolution appended as a directory" + ), + ) parser.add_argument( "--no-target-subdir", action="store_true", diff --git a/fv3net/pipelines/create_training_data/pipeline.py b/fv3net/pipelines/create_training_data/pipeline.py index e33730a384..7510fa35e6 100644 --- a/fv3net/pipelines/create_training_data/pipeline.py +++ b/fv3net/pipelines/create_training_data/pipeline.py @@ -230,8 +230,8 @@ def _open_cloud_data(run_dirs): ds_run = helpers._set_relative_forecast_time_coord(ds_run) ds_runs.append(ds_run) return xr.concat(ds_runs, INIT_TIME_DIM) - except (ValueError, TypeError, AttributeError, KeyError) as e: - logger.error(f"Failed to open restarts from cloud: {e}") + except (IndexError, ValueError, TypeError, AttributeError, KeyError) as e: + logger.error(f"Failed to open restarts from cloud for rundirs {run_dir}: {e}") def _create_train_cols(ds, cols_to_keep=RESTART_VARS + TARGET_VARS): @@ -275,7 +275,7 @@ def _merge_hires_data(ds_run, diag_c48_path): try: init_times = ds_run[INIT_TIME_DIM].values full_zarr_path = os.path.join(diag_c48_path, COARSENED_DIAGS_ZARR_NAME) - diags_c48 = helpers.load_prog_diag(full_zarr_path, init_times)[ + diags_c48 = helpers.load_hires_prog_diag(full_zarr_path, init_times)[ list(RENAMED_PROG_DIAG_VARS.keys()) ] features_diags_c48 = diags_c48.rename(RENAMED_PROG_DIAG_VARS) diff --git a/fv3net/regression/sklearn/test.py b/fv3net/regression/sklearn/test.py index 96605dd7fa..fccbf99653 100644 --- a/fv3net/regression/sklearn/test.py +++ b/fv3net/regression/sklearn/test.py @@ -1,8 +1,10 @@ from vcm.cloud import fsspec import joblib import xarray as xr +import os from ..dataset_handler import stack_and_drop_nan_samples +from .train import MODEL_FILENAME from vcm.convenience import round_time from vcm.cubedsphere.constants import INIT_TIME_DIM from fv3net.pipelines.create_training_data import ( @@ -82,7 +84,7 @@ def load_model(model_path): protocol = fsspec.get_protocol(model_path) if protocol == "gs": fs = fsspec.get_fs(model_path) - fs.get(model_path, "temp_model.pkl") + fs.get(os.path.join(model_path, MODEL_FILENAME), "temp_model.pkl") return joblib.load("temp_model.pkl") else: return joblib.load(model_path) diff --git a/fv3net/regression/sklearn/train.py b/fv3net/regression/sklearn/train.py index bb714cbb84..550a10b38f 100644 --- a/fv3net/regression/sklearn/train.py +++ b/fv3net/regression/sklearn/train.py @@ -160,6 +160,7 @@ def save_output(output_url, model, config): "remove local copy after upload.", ) args = parser.parse_args() + args.train_data_path = os.path.join(args.train_data_path, "train") train_config = load_model_training_config( args.train_config_file, args.train_data_path ) diff --git a/workflows/coarsen_c384_diagnostics/README.md b/workflows/coarsen_c384_diagnostics/README.md index 8e98ba2393..abdb6968e5 100644 --- a/workflows/coarsen_c384_diagnostics/README.md +++ b/workflows/coarsen_c384_diagnostics/README.md @@ -1,7 +1,7 @@ #### Usage Run `workflows/coarsen_c384_diagnostics/coarsen_c384_diagnostics.sh` -` {INPUT_LOCATION} {OUTPUT_LOCATION} {CONFIG_LOCATION}` +` {INPUT_LOCATION} {CONFIG_LOCATION} {OUTPUT_LOCATION}` #### Description This script coarsens a subset of diagnostic variables from the high resolution runs diff --git a/workflows/coarsen_c384_diagnostics/coarsen_c384_diagnostics.py b/workflows/coarsen_c384_diagnostics/coarsen_c384_diagnostics.py index d18c60163c..ae8793425f 100644 --- a/workflows/coarsen_c384_diagnostics/coarsen_c384_diagnostics.py +++ b/workflows/coarsen_c384_diagnostics/coarsen_c384_diagnostics.py @@ -3,7 +3,6 @@ import shutil import argparse import yaml -import fsspec import xarray as xr from vcm import coarsen @@ -18,7 +17,7 @@ VAR_LON_OUTER, VAR_LAT_OUTER, ) -from vcm.fv3_restarts import _split_url +from vcm.cloud.fsspec import get_fs from fv3net import COARSENED_DIAGS_ZARR_NAME logging.basicConfig(level=logging.INFO) @@ -79,8 +78,7 @@ def _get_config(config_path): def _get_remote_diags(diags_path): - proto, path = _split_url(diags_path) - fs = fsspec.filesystem(proto) + fs = get_fs(diags_path) mapper = fs.get_mapper(diags_path) return xr.open_zarr(mapper) @@ -90,13 +88,13 @@ def _get_remote_diags(diags_path): parser.add_argument( "input_path", type=str, help="GCS location of C384 diagnostics data zarrs." ) + parser.add_argument( + "config_path", type=str, help="Location of diagnostics coarsening config yaml." + ) parser.add_argument( "output_path", type=str, help="GCS location where= coarsened diagnostics zarrs will be written.", ) - parser.add_argument( - "config_path", type=str, help="Location of diagnostics coarsening config yaml." - ) args = parser.parse_args() coarsen_c384_diagnostics(args) diff --git a/workflows/coarsen_restarts/README.md b/workflows/coarsen_restarts/README.md index d13857789a..34386cc8a2 100644 --- a/workflows/coarsen_restarts/README.md +++ b/workflows/coarsen_restarts/README.md @@ -8,26 +8,20 @@ using pressure-level coarsening defined in `vcm.coarsen`. ```python fv3net.pipelines.coarsen_restarts -usage: __main__.py [-h] --gcs-src-dir GCS_SRC_DIR [--gcs-dst-dir GCS_DST_DIR] - --gcs-grid-spec-path GCS_GRID_SPEC_PATH --source-resolution - SOURCE_RESOLUTION --target_resolution TARGET_RESOLUTION +usage: __main__.py [-h] GCS_SRC_DIR GCS_GRID_SPEC_PATH SOURCE_RESOLUTION + TARGET_RESOLUTION GCS_DST_DIR -optional arguments: +positional arguments: -h, --help show this help message and exit - --gcs-src-dir GCS_SRC_DIR - Full GCS path to input data for downloading timesteps. - --gcs-dst-dir GCS_DST_DIR - Full GCS path to output coarsened timestep data. - Defaults to input pathwith target resolution appended - as a directory - --gcs-grid-spec-path GCS_GRID_SPEC_PATH - Full path with file wildcard 'grid_spec.tile*.nc' to + GCS_SRC_DIR Full GCS path to input data for downloading timesteps + GCS_GRID_SPEC_PATH Full path with file wildcard 'grid_spec.tile*.nc' to select grid spec files with same resolution as the source data - --source-resolution SOURCE_RESOLUTION - Source data cubed-sphere grid resolution. - --target_resolution TARGET_RESOLUTION - Target coarsening resolution to output + SOURCE_RESOLUTION Source data cubed-sphere grid resolution. + TARGET_RESOLUTION Target coarsening resolution to output + GCS_DST_DIR Full GCS path to output coarsened timestep data. + Defaults to input pathwith target resolution appended + as a directory ``` See `workflows/coarsen_restarts/submit_job.sh` to see an example of calling this diff --git a/workflows/coarsen_restarts/orchestrator_job.sh b/workflows/coarsen_restarts/orchestrator_job.sh deleted file mode 100755 index a31d833438..0000000000 --- a/workflows/coarsen_restarts/orchestrator_job.sh +++ /dev/null @@ -1,30 +0,0 @@ -#!/bin/bash - -INPUT_PATH=$1 -GCS_GRIDSPEC_PATH=$2 -OUTPUT_PATH=$3 -SRC_RESOLUTION=$4 -TARGET_RESOLUTION=$5 - -user=$(whoami) -user=${user,,} - -python -m fv3net.pipelines.coarsen_restarts\ - $INPUT_PATH \ - $OUTPUT_PATH \ - $GCS_GRIDSPEC_PATH \ - $SRC_RESOLUTION \ - $TARGET_RESOLUTION \ - --no-target-subdir \ - --runner DataflowRunner \ - --job_name coarsen-restarts-$user \ - --project vcm-ml \ - --region us-central1 \ - --temp_location gs://vcm-ml-data/tmp_dataflow \ - --num_workers 3 \ - --max_num_workers 50 \ - --disk_size_gb 50 \ - --worker_machine_type n1-highmem-4 \ - --setup_file ./setup.py \ - --extra_package external/vcm/dist/vcm-0.1.0.tar.gz \ - --extra_package external/vcm/external/mappm/dist/mappm-0.0.0.tar.gz \ No newline at end of file diff --git a/workflows/coarsen_restarts/orchestrator_job_directrunner.sh b/workflows/coarsen_restarts/orchestrator_job_directrunner.sh deleted file mode 100755 index ccf32a4842..0000000000 --- a/workflows/coarsen_restarts/orchestrator_job_directrunner.sh +++ /dev/null @@ -1,16 +0,0 @@ -#!/bin/bash - -INPUT_PATH=$1 -GRIDSPEC_PATH=$2 -OUTPUT_PATH=$3 -SRC_RESOLUTION=$4 -TARGET_RESOLUTION=$5 - -python -m fv3net.pipelines.coarsen_restarts\ - $INPUT_PATH \ - $OUTPUT_PATH \ - $GRIDSPEC_PATH \ - $SRC_RESOLUTION \ - $TARGET_RESOLUTION \ - --no-target-subdir \ - --runner DirectRunner \ No newline at end of file diff --git a/workflows/coarsen_restarts/submit_job.sh b/workflows/coarsen_restarts/submit_job.sh index 26f5d1cfd3..515f98152f 100755 --- a/workflows/coarsen_restarts/submit_job.sh +++ b/workflows/coarsen_restarts/submit_job.sh @@ -1,20 +1,20 @@ #!/bin/sh GCS_SRC="gs://vcm-ml-data/2019-12-02-40-day-X-SHiELD-simulation-C384-restart-files" -GCS_DST="gs://vcm-ml-data/2020-01-16-X-SHiELD-2019-12-02-pressure-coarsened-rundirs/restarts" GCS_GRIDSPEC="gs://vcm-ml-data/2020-01-06-C384-grid-spec-with-area-dx-dy" SRC_RESOLUTION=384 TARGET_RESOLUTION=48 +GCS_DST="gs://vcm-ml-data/2020-01-16-X-SHiELD-2019-12-02-pressure-coarsened-rundirs/restarts" user=$(whoami) user=${user,,} python -m fv3net.pipelines.coarsen_restarts\ $GCS_SRC \ - $GCS_DST \ $GCS_GRIDSPEC \ $SRC_RESOLUTION \ $TARGET_RESOLUTION \ + $GCS_DST \ --runner DataflowRunner \ --job_name coarsen-restarts-$user \ --project vcm-ml \ diff --git a/workflows/coarsen_restarts/submit_job_directrunner.sh b/workflows/coarsen_restarts/submit_job_directrunner.sh index 9eb0f9b7f8..1e0bc9f6dc 100755 --- a/workflows/coarsen_restarts/submit_job_directrunner.sh +++ b/workflows/coarsen_restarts/submit_job_directrunner.sh @@ -1,15 +1,15 @@ #!/bin/sh GCS_SRC="gs://vcm-ml-data/2019-12-02-40-day-X-SHiELD-simulation-C384-restart-files" -GCS_DST="gs://vcm-ml-data/2020-01-16-X-SHiELD-2019-12-02-pressure-coarsened-rundirs/restarts" GCS_GRIDSPEC="gs://vcm-ml-data/2020-01-06-C384-grid-spec-with-area-dx-dy" SRC_RESOLUTION=384 TARGET_RESOLUTION=48 +GCS_DST="gs://vcm-ml-data/2020-01-16-X-SHiELD-2019-12-02-pressure-coarsened-rundirs/restarts" python -m fv3net.pipelines.coarsen_restarts\ $GCS_SRC \ - $GCS_DST \ $GCS_GRIDSPEC \ $SRC_RESOLUTION \ $TARGET_RESOLUTION \ + $GCS_DST \ --runner DirectRunner \ No newline at end of file diff --git a/workflows/create_training_data/orchestrator_job.sh b/workflows/create_training_data/orchestrator_job.sh deleted file mode 100755 index 80bc09c8dd..0000000000 --- a/workflows/create_training_data/orchestrator_job.sh +++ /dev/null @@ -1,22 +0,0 @@ -DATA_PATH=$1 -DIAG_PATH=$2 -OUTPUT_PATH=$3 - -user=$(whoami) -user=${user,,} - -python -m fv3net.pipelines.create_training_data \ -${DATA_PATH} \ -${DIAG_PATH} \ -${OUTPUT_PATH} \ ---job_name create-training-data-${user} \ ---project vcm-ml \ ---region us-central1 \ ---runner DataflowRunner \ ---temp_location gs://vcm-ml-data/tmp_dataflow \ ---num_workers 4 \ ---max_num_workers 30 \ ---disk_size_gb 30 \ ---worker_machine_type n1-standard-1 \ ---setup_file ./setup.py \ ---extra_package external/vcm/dist/vcm-0.1.0.tar.gz diff --git a/workflows/create_training_data/orchestrator_job_directrunner_test.sh b/workflows/create_training_data/orchestrator_job_directrunner_test.sh deleted file mode 100755 index f90b6dcea6..0000000000 --- a/workflows/create_training_data/orchestrator_job_directrunner_test.sh +++ /dev/null @@ -1,13 +0,0 @@ -DATA_PATH=$1 -DIAG_PATH=$2 -OUTPUT_PATH=$3 -TIMESTEPS_PER_OUTPUT=$4 -TRAIN_FRACTION=$5 - -python -m fv3net.pipelines.create_training_data \ -${DATA_PATH} \ -${DIAG_PATH} \ -${OUTPUT_PATH} \ ---timesteps-per-output-file ${TIMESTEPS_PER_OUTPUT} \ ---train-fraction ${TRAIN_FRACTION} \ ---runner DirectRunner \ No newline at end of file diff --git a/workflows/create_training_data/submit_job.sh b/workflows/create_training_data/submit_job.sh index 4eeae3e93e..b2e794de7e 100755 --- a/workflows/create_training_data/submit_job.sh +++ b/workflows/create_training_data/submit_job.sh @@ -1,8 +1,8 @@ python -m fv3net.pipelines.create_training_data \ -gs://vcm-ml-data/2020-02-28-X-SHiELD-2019-12-02-deep-and-mp-off \ -gs://vcm-ml-data/orchestration-testing/shield-coarsened-diags-2019-12-04 \ -gs://vcm-ml-data/experiments-2020-03/deep-conv-mp-off/train-test-data \ ---job_name test-job-create-training-data-annak \ +gs://vcm-ml-data/2020-01-16-X-SHiELD-2019-12-02-pressure-coarsened-rundirs/one_step_output/C48 \ +gs://vcm-ml-data/2019-12-05-40-day-X-SHiELD-simulation-C384-diagnostics/C48_gfsphysics_15min_coarse.zarr \ +gs://vcm-ml-data/test-annak/2020-02-05_train_data_pipeline/ \ +--job_name test-job-create-training-data-brianh \ --project vcm-ml \ --region us-central1 \ --runner DataflowRunner \ diff --git a/workflows/end_to_end/README.md b/workflows/end_to_end/README.md index 9282442f24..068a8e8026 100644 --- a/workflows/end_to_end/README.md +++ b/workflows/end_to_end/README.md @@ -1,6 +1,6 @@ # End to End Workflow -February 2020 +Updated March 2020 This workflow serves to orchestrate the current set of VCM-ML workflows into one, allowing for going from a completed SHiELD run to a prognostic coarse run with ML @@ -15,6 +15,7 @@ The follwing steps are currently integrated into the workflow for use in experim - training an sklearn model - testing an sklearn model - a prognostic run of the coarse model using the trained sklearn parameterization +- generation of diagnostics for the prognostic run The workflow starting point is flexible, i.e., with any of the steps above, as is its endpoint. If starting at a SHiELD run coarsened to C384, it is required that @@ -22,6 +23,11 @@ the SHiELD C384 restart files and diagnostics are available (locally or remotely If starting at a later point, it is assumed that the outputs of all previous steps are available. +The workflow syntax, specified in the yaml, is designed to be flexible enough to allow +any set of positional and optional arguments to be sent to a `python argparse` script +interface. Thus, intermediate shell scripts to launch steps are unnecessary and +discouraged to reduce workflow maintenance. + ## Usage @@ -55,24 +61,27 @@ experiment: - train_sklearn_model - test_sklearn_model - prognostic_run + - diags_prognostic_run steps_config: coarsen_restarts: - command: workflows/coarsen_restarts/orchestrator_job.sh - inputs: + command: python -m fv3net.pipelines.coarsen_restarts + args: data_to_coarsen: location: gs://vcm-ml-data/orchestration-testing/shield-C384-restarts-2019-12-04 - extra_args: + grid_spec: + location: gs://vcm-ml-data/2020-01-06-C384-grid-spec-with-area-dx-dy source-resolution: 384 target-resolution: 48 + --runner: DataflowRunner one_step_run: - command: python ./workflows/one_step_jobs/orchestrate_submit_jobs.py - inputs: + command: python workflows/one_step_jobs/orchestrate_submit_jobs.py + args: restart_data: - from: coarsen_restarts - extra_args: - experiment_yaml: one_step_yaml - experiment_label: test-orchestration-group + from: coarsen_restart + experiment_yaml: workflows/one_step_jobs/all-physics-off.yml + docker_image: us.gcr.io/vcm-ml/prognostic-run-orchestration --n-steps: 50 + ... ``` #### Top-level arguments: @@ -88,14 +97,17 @@ experiment: #### Step level arguments: -For each step in `steps_to_run`, its configuration is set by its equivalently named block in `step_config`. Step configuration can still be defined here but it will not be executed if it is excluded from `steps_to_run`. *Note:* the order of parameters correspond to the positional commandline arguments appended to the job `command`. Parameters are translated to arguments in the order of _inputs_, _output_, _methodargs_. +For each step in `steps_to_run`, its configuration is set by its equivalently named block in `step_config`. Step configuration can still be defined here but it will not be executed if it is excluded from `steps_to_run`. *Note:* the order of parameters correspond to the positional commandline arguments appended to the job `command`. Parameters are translated to arguments in the order of _args_, _output_. - **command**: command to execute the step, e.g., a python or bash command and script that executes the step (without its arguments) -- **inputs**: a list of required input data sources from either previous steps or pre-existing sources; for each input data type, _one_ (but not both) of the following must be specified: - - **from**: name of the previous step which produces the input data for this step, e.g., `coarsen_restarts` is an input to `one_step_run`. _Note:_ The step referenced in `from` does not have to be included in the `steps_to_run`. Whatever the output location is from that step will be used. However, to make things explicit you should specify the input using `location` instead of `from`. - - **location**: explicit path to required input data for this step, which has been generated prior to the current experiment +- **args**: a dict of positional and optional argument key-value pairs for the _command_ above. Arguments are used to specify step input data, configuration files, and parameters. Values may be literals, names of previous steps which to use output as input for the current step, paths to pre-existing data sources, or lists of multiple values for a given key; each value in the args dict must be one of the following types: + - **value/hashable**: A single argument value + - **dict**: if the arg value is a dict, it is for specifying the source of input data to the step. Supported keys in the dict are as follows (only one may be used for a given step argument): + - **from**: name of the previous step which produces the input data for this step, e.g., `coarsen_restarts` is an input to `one_step_run`. Whatever the output location is from that step will be used. The previous step must be run as part of the experiment. + - **location**: explicit path to required input data for this step, which has been generated prior to the current experiment. + - **list**: multiple values to be passed to the command under the same argument name. This is useful for the `--extra-packages` argument in dataflow jobs which may need to be called on multiple packages. + Argument keys may begin with `--` to denote an optional argument. Optional arguments will be appended to the end of the command as --{param_keyname} {param_value}. Arguments without the leading `--` are treated as positional arguments. - **output_location**: (Optional) explicit path to store output from this step. This parameter is autogenerated if not provided and is used as the source for any input `from` refs. -- **extra_args**: extra key-value pair paremeters required for the workflow step. These are commonly used to reference step-specific configuration files or non-data related step parameters; each value is provided as a command-line argument appended to `command` after I/O arguments. You can also specify python optional command-line arguments here by prefixing the optional variable with "--". These optional arguments will be appended to the end of the command as --{param_keyname} {param_value}. ### Data output locations @@ -107,10 +119,15 @@ If no `output_location` is specified for a step, it will be output via the foll where `experiment_name` is the name plus the UUID (if added), and the `step_name` is defined as the name of the workflow step with the first 3 extra_args key/values appended to it. +### The dataflow `--runner` argument + +For steps using Apache Beam and Google Dataflow, the `--runner` optional argument can be passed. This is a reserved argument key that accepts only two values: `DirectRunner` and `DataflowRunner`. If `DirectRunner`is used, this is passed as the `--runner` value to launch a local job. If `DataflowRunner` is passed, a set of Dataflow arguments are appended to the job submission script to enable launching Dataflow jobs on GCS. Those arguments are stored for specific steps which require them in `workflows/end_to_end/dataflow.py`. + + ## Creating new workflow steps To write a new step in the workflow, create a CLI for the step that follows the following format: -```{COMMAND} {INPUT_1} {INPUT_2} ... {OUTPUT_PATH} {EXTRA_ARG_1} {EXTRA_ARG_2} {EXTRA_ARG_3} ... {--OPTIONAL_ARG} {OPTIONAL_ARG_VALUE} ...``` +```{COMMAND} {POSITIONAL_ARG1} {POSITIONAL_ARG2} ... {POSITIONAL_ARGN} {OUTPUT_PATH} {--OPTIONAL_ARG} {OPTIONAL_ARG_VALUE} ...``` -then add the step to the config YAML file, in both the `steps_to_run` list and the `steps_config` dict. At a minimum, the `command` and `inputs` values must be specified for the step configuration. Additionally, the step must be listed in `steps_to_run` in the order in which it is necessary, i.e., after any steps upon which it depends for input, and before any steps that depend on it for output. \ No newline at end of file +then add the step to the config YAML file, in both the `steps_to_run` list and the `steps_config` dict. At a minimum, the `command` and `args` values must be specified for the step configuration. Additionally, the step must be listed in `steps_to_run` in the order in which it is necessary, i.e., after any steps upon which it depends for input, and before any steps that depend on it for output. \ No newline at end of file diff --git a/workflows/end_to_end/dataflow.py b/workflows/end_to_end/dataflow.py new file mode 100644 index 0000000000..4ab0c94eef --- /dev/null +++ b/workflows/end_to_end/dataflow.py @@ -0,0 +1,35 @@ +from getpass import getuser +from fv3net.pipelines.common import get_alphanumeric_unique_tag + +COARSEN_RESTARTS_DATAFLOW_ARGS = { + "--job_name": ( + f"coarsen-restarts-{getuser().lower()}-{get_alphanumeric_unique_tag(7)}" + ), + "--project": "vcm-ml", + "--region": "us-central1", + "--temp_location": "gs://vcm-ml-data/tmp_dataflow", + "--num_workers": 3, + "--max_num_workers": 50, + "--disk_size_gb": 50, + "--worker_machine_type": "n1-highmem-4", + "--setup_file": "./setup.py", + "--extra_package": [ + "external/vcm/dist/vcm-0.1.0.tar.gz", + "external/vcm/external/mappm/dist/mappm-0.0.0.tar.gz", + ], +} + +CREATE_TRAINING_DATAFLOW_ARGS = COARSEN_RESTARTS_DATAFLOW_ARGS.copy() +CREATE_TRAINING_DATAFLOW_ARGS.update( + { + "--job_name": ( + f"create-training-data-{getuser().lower()}-" + f"{get_alphanumeric_unique_tag(7)}" + ), + "--num_workers": 4, + "--max_num_workers": 30, + "--disk_size_gb": 30, + "--worker_machine_type": "n1-standard-1", + "--extra_package": "external/vcm/dist/vcm-0.1.0.tar.gz", + } +) diff --git a/workflows/end_to_end/example-workflow-config.yaml b/workflows/end_to_end/full-workflow-config.yaml similarity index 65% rename from workflows/end_to_end/example-workflow-config.yaml rename to workflows/end_to_end/full-workflow-config.yaml index 1e899e5f26..b0ed9717b8 100644 --- a/workflows/end_to_end/example-workflow-config.yaml +++ b/workflows/end_to_end/full-workflow-config.yaml @@ -5,83 +5,83 @@ experiment: unique_id: True steps_to_run: - coarsen_restarts - - coarsen_diagnostics +# - coarsen_diagnostics - one_step_run - create_training_data - train_sklearn_model -# - test_sklearn_model + - test_sklearn_model - prognostic_run - - diags_prognostic_run +# - diags_prognostic_run steps_config: coarsen_restarts: - command: workflows/coarsen_restarts/orchestrator_job.sh - inputs: + command: python -m fv3net.pipelines.coarsen_restarts + args: data_to_coarsen: location: gs://vcm-ml-data/orchestration-testing/shield-C384-restarts-2019-12-04 grid_spec: location: gs://vcm-ml-data/2020-01-06-C384-grid-spec-with-area-dx-dy - extra_args: source-resolution: 384 target-resolution: 48 + --runner: DataflowRunner coarsen_diagnostics: command: python ./workflows/coarsen_c384_diagnostics/coarsen_c384_diagnostics.py - inputs: + args: c384_diagnostics: location: gs://vcm-ml-data/2019-12-05-40-day-X-SHiELD-simulation-C384-diagnostics/gfsphysics_15min_coarse.zarr - extra_args: coarsening_config: workflows/coarsen_c384_diagnostics/coarsen-c384-diagnostics.yml one_step_run: command: python workflows/one_step_jobs/orchestrate_submit_jobs.py - inputs: + args: restart_data: from: coarsen_restarts - extra_args: experiment_yaml: ./workflows/one_step_jobs/all-physics-off.yml docker_image: us.gcr.io/vcm-ml/prognostic-run-orchestration create_training_data: - command: workflows/create_training_data/orchestrator_job.sh - inputs: + command: python -m fv3net.pipelines.create_training_data + args: one_step_data: from: one_step_run diagnostics_data: - from: coarsen_diagnostics - extra_args: - timesteps-per-output-file: 2 - mask-to-surface-type: - train-fraction: 0.5 + location: gs://vcm-ml-data/orchestration-testing/shield-coarsened-diags-2019-12-04 + --train-fraction: 0.5 + --runner: DataflowRunner train_sklearn_model: - command: workflows/sklearn_regression/orchestrator_train_sklearn.sh - inputs: + command: python -m fv3net.regression.sklearn.train + args: training_data: from: create_training_data - extra_args: train-config-file: ./workflows/sklearn_regression/example_base_rf_training_config.yml test_sklearn_model: - command: workflows/sklearn_regression/orchestrator_test_sklearn.sh - inputs: + command: python -m fv3net.diagnostics.sklearn_model_performance + args: trained_model: from: train_sklearn_model testing_data: from: create_training_data diagnostics_data: - from: coarsen_diagnostics - extra_args: - num-test-zarrs: 4 + location: gs://vcm-ml-data/orchestration-testing/shield-coarsened-diags-2019-12-04 + --num_test_zarrs: 4 prognostic_run: command: python workflows/prognostic_c48_run/orchestrate_submit_job.py - inputs: - sklearn_model: - from: train_sklearn_model + args: restart_file_dir: - from: one_step_run - extra_args: - prognostic_yaml_adjust: workflows/prognostic_c48_run/prognostic_config.yml + from: one_step_run ic_timestep: "20160801.001500" docker_image: us.gcr.io/vcm-ml/prognostic_run:v0.1.0 + --model_url: + from: train_sklearn_model + --prog_config_yml: workflows/prognostic_c48_run/prognostic_config.yml + + + diags_prognostic_run: + command: bash workflows/prognostic_run_diags/run_all.sh + args: + fv3_output: + from: prognostic_run diff --git a/workflows/end_to_end/get_experiment_steps_and_args.py b/workflows/end_to_end/get_experiment_steps_and_args.py index 0859fcb143..d47ed9b2a2 100644 --- a/workflows/end_to_end/get_experiment_steps_and_args.py +++ b/workflows/end_to_end/get_experiment_steps_and_args.py @@ -3,7 +3,13 @@ import json import os import uuid -from typing import List, Mapping +from typing import List, Mapping, Any, Hashable +from dataflow import COARSEN_RESTARTS_DATAFLOW_ARGS, CREATE_TRAINING_DATAFLOW_ARGS + +DATAFLOW_ARGS_MAPPING = { + "coarsen_restarts": COARSEN_RESTARTS_DATAFLOW_ARGS, + "create_training_data": CREATE_TRAINING_DATAFLOW_ARGS, +} def get_experiment_steps_and_args(config_file: str): @@ -16,12 +22,18 @@ def get_experiment_steps_and_args(config_file: str): config = yaml.safe_load(f) # Resolve inputs, outputs, and other config parameters + workflow_steps = config["experiment"]["steps_to_run"] + if any( + [step not in config["experiment"]["steps_config"] for step in workflow_steps] + ): + raise KeyError( + "'steps_to_run' list contains a step not defined in 'steps_config'." + ) _apply_config_transforms(config) - workflow_steps_config = config["experiment"]["steps_to_run"] - all_step_arguments = _get_all_step_arguments(workflow_steps_config, config) + all_step_arguments = _get_all_step_arguments(config) experiment_steps_and_args = { "name": config["experiment"]["name"], - "workflow": " ".join([step for step in workflow_steps_config]), + "workflow": " ".join([step for step in workflow_steps]), "command_and_args": all_step_arguments, } return json.dumps(experiment_steps_and_args) @@ -32,10 +44,10 @@ def _apply_config_transforms(config: Mapping): Transforms to apply to the configuration dictionary. All transforms are assumed to be in-place. """ - _add_unique_id(config) _resolve_output_location(config) _resolve_input_from(config) + _resolve_dataflow_args(config) def _add_unique_id(config: Mapping): @@ -53,9 +65,9 @@ def _add_unique_id(config: Mapping): def _resolve_output_location(config: Mapping): """Get the step output location if one is not specified""" root_exp_path = _get_experiment_path(config) - all_steps_config = config["experiment"]["steps_config"] + steps_config = config["experiment"]["steps_config"] - for step_name, step_config in all_steps_config.items(): + for step_name, step_config in steps_config.items(): if "output_location" in step_config: continue @@ -71,29 +83,44 @@ def _resolve_input_from(config: Mapping): steps if the "from" keyword is used along with a step name. """ - all_steps_config = config["experiment"]["steps_config"] + steps_config = config["experiment"]["steps_config"] - for step_name, step_config in all_steps_config.items(): - input_config = step_config["inputs"] + for step_name, step_config in steps_config.items(): + args_config = step_config["args"] - for input_source, source_info in input_config.items(): - location = source_info.get("location", None) - from_key = source_info.get("from", None) + for arg, val in args_config.items(): + if isinstance(val, Mapping): + _resolve_input_mapping(val, steps_config, arg) - if location is not None and from_key is not None: - raise ValueError( - f"Ambiguous input location for {step_name}-{input_source}." - f" Both 'from' and 'location' were specified" - ) - if location is not None: - continue - elif from_key is not None: - source_info["location"] = all_steps_config[from_key]["output_location"] - else: - raise KeyError( - f"Input section of {step_name} should have either 'location' " - "or 'from' specified in the orchestration configuration" - ) + +def _resolve_input_mapping(input_mapping: Mapping, steps_config: Mapping, arg: str): + + location = input_mapping.get("location", None) + from_key = input_mapping.get("from", None) + + if location is not None and from_key is not None: + raise ValueError( + f"Ambiguous input location for {arg}." + f" Both 'from' and 'location' were specified" + ) + if location is not None: + return + elif from_key is not None: + previous_step = steps_config.get(from_key, None) + if previous_step is not None: + input_mapping["location"] = previous_step["output_location"] + else: + raise KeyError( + f"A step argument specified 'from' another step requires " + f"that the other step's cofiguration be specified. Add " + f"'{from_key}' to the configuration or specify '{arg}' " + f"with 'location' instead." + ) + else: + raise KeyError( + f"{arg} is provided as a key-value pair," + f" but only 'location' or 'from' may be specified." + ) def _get_experiment_path(config: Mapping): @@ -117,23 +144,43 @@ def _get_experiment_path(config: Mapping): return f"{proto}://{root}/{experiment_name}" -def _get_all_step_arguments(workflow_steps: List[str], config: Mapping): +def _resolve_dataflow_args(config: Mapping): + """Add dataflow arguments to step if it is the job runner""" + + steps_config = config["experiment"]["steps_config"] + for step, step_config in steps_config.items(): + dataflow_arg = step_config["args"].get("--runner", None) + if dataflow_arg == "DataflowRunner": + step_config["args"].update(DATAFLOW_ARGS_MAPPING[step]) + elif dataflow_arg == "DirectRunner": + continue + elif dataflow_arg is not None: + raise ValueError( + f"'runner' arg must be 'DataflowRunner' or 'DirectRunner'; " + f"instead received '{dataflow_arg}'." + ) + + +def _get_all_step_arguments(config: Mapping): """Get a dictionary of each step with i/o and methedological arguments""" steps_config = config["experiment"]["steps_config"] all_step_arguments = {} - for i, step in enumerate(workflow_steps): - curr_config = steps_config[step] - all_input_locations = [ - input_info["location"] for input_info in curr_config["inputs"].values() - ] - output_location = curr_config["output_location"] - command = curr_config["command"] - extra_args = _generate_args(curr_config) - - input_args = " ".join(all_input_locations) - step_args = " ".join([command, input_args, output_location, extra_args]) - all_step_arguments[step] = step_args + for step, step_config in steps_config.items(): + step_args = [step_config["command"]] + required_args = [] + optional_args = [] + for key, value in step_config["args"].items(): + arg_string = _resolve_arg_values(key, value) + if arg_string.startswith("--"): + optional_args.append(arg_string) + else: + required_args.append(arg_string) + output_location = step_config["output_location"] + step_args.extend(required_args) + step_args.append(output_location) + step_args.extend(optional_args) + all_step_arguments[step] = " ".join(step_args) return all_step_arguments @@ -144,48 +191,55 @@ def _generate_output_path_from_config( """generate an output location stub from a step's argument configuration""" output_str = step_name - arg_config = step_config.get("extra_args", None) - if arg_config is not None: - arg_strs = [] - for i, (key, val) in enumerate(arg_config.items()): - if i >= max_config_stubs: - break - val = str(val) - - # get last part of path so string isn't so long - if "/" in val: - val = val.split("/")[-1] - - key = key.strip("--") # remove prefix of optional argument - key_val = f"{key}_{val}" - arg_strs.append(key_val) - arg_output_stub = "_".join(arg_strs) - output_str += "_" + arg_output_stub + arg_config = step_config.get("args", None) + arg_strs = [] + non_map_args = { + key: val for key, val in arg_config.items() if not isinstance(val, Mapping) + } + for n_stubs, (key, val) in enumerate(non_map_args.items(), 1): + if n_stubs > max_config_stubs: + break + val = str(arg_config[key]) + + # get last part of path so string isn't so long + if "/" in val: + val = val.split("/")[-1] + + key = key.strip("--") # remove prefix of optional argument + key_val = f"{key}_{val}" + arg_strs.append(key_val) + arg_output_stub = "_".join(arg_strs) + output_str += "_" + arg_output_stub return output_str -def _generate_args(step_config: Mapping): +def _resolve_arg_values(key: Hashable, value: Any) -> Hashable: + """take a step args key-value pair and process into an appropriate arg string" """ - Generate the arguments for the step as positional arguments - in a string followed by optional arguments. - """ - arg_config = step_config.get("extra_args", None) - - if arg_config is not None: - optional_args = [] - required_args = [] - for arg_key, arg_value in arg_config.items(): - if arg_key[:2] == "--": - optional_args += [arg_key, str(arg_value)] + if isinstance(value, Mapping): + # case for when the arg is a dict {"location" : path} + location_value = value.get("location", None) + if location_value is None: + raise ValueError("Argument 'location' value not specified.") + else: + if key.startswith("--"): + arg_values = " ".join([key, str(location_value)]) else: - required_args.append(str(arg_value)) - - combined_args = " ".join(required_args + optional_args) + arg_values = str(location_value) + elif isinstance(value, List): + # case for when the arg is a list + # i.e., multiple optional args with same key, needed for dataflow packages + multiple_optional_args = [] + for item in value: + multiple_optional_args.extend([key, item]) + arg_values = " ".join(multiple_optional_args) else: - combined_args = "" - - return combined_args + if key.startswith("--"): + arg_values = " ".join([key, str(value)]) + else: + arg_values = str(value) + return arg_values if __name__ == "__main__": diff --git a/workflows/one_step_jobs/README.md b/workflows/one_step_jobs/README.md index 8c48731377..8cef622d9f 100644 --- a/workflows/one_step_jobs/README.md +++ b/workflows/one_step_jobs/README.md @@ -13,18 +13,15 @@ have a total duration of 15 minutes. Workflow call signature: ``` $ python submit_jobs.py -h -usage: submit_jobs.py [-h] --one-step-yaml ONE_STEP_YAML --input-url INPUT_URL - --output-url OUTPUT_URL [--n-steps N_STEPS] [-o] +usage: submit_jobs.py [-h] INPUT_URL ONE_STEP_YAML OUTPUT_URL [--n-steps N_STEPS] [-o] -h, --help show this help message and exit - --one-step-yaml ONE_STEP_YAML - Path to local run configuration yaml. - --input-url INPUT_URL - Remote url to initial conditions. Initial conditions + INPUT_URL Remote url to initial conditions. Initial conditions are assumed to be stored as INPUT_URL/{timestamp}/{tim estamp}.{restart_category}.tile*.nc - --output-url OUTPUT_URL - Remote url where model configuration and output will + ONE_STEP_YAML Path to local run configuration yaml. + DOCKER_IMAGE fv3gfs-python model docker image. + OUTPUT_URL Remote url where model configuration and output will be saved. Specifically, configuration files will be saved to OUTPUT_URL/one_step_config and model output to OUTPUT_URL/one_step_output diff --git a/workflows/one_step_jobs/all-physics-on.yml b/workflows/one_step_jobs/all-physics-on.yml new file mode 100644 index 0000000000..ebe24267b0 --- /dev/null +++ b/workflows/one_step_jobs/all-physics-on.yml @@ -0,0 +1,22 @@ +kubernetes: + docker_image: us.gcr.io/vcm-ml/fv3gfs-python:v0.2.1 +fv3config: + diag_table: workflows/one_step_jobs/diag_table_one_step + namelist: + atmos_model_nml: + fhout: 0.01666 + coupler_nml: + days: 0 + minutes: 15 + seconds: 0 + dt_atmos: 60 # seconds + dt_ocean: 60 # seconds + restart_secs: 60 + fv_core_nml: + external_eta: true + npz: 79 + k_split: 1 + n_split: 1 + gfs_physics_nml: + do_deep: true + fhzero: 0.01666 diff --git a/workflows/one_step_jobs/orchestrate_submit_jobs.py b/workflows/one_step_jobs/orchestrate_submit_jobs.py index a4fc482b9f..421ea4b665 100644 --- a/workflows/one_step_jobs/orchestrate_submit_jobs.py +++ b/workflows/one_step_jobs/orchestrate_submit_jobs.py @@ -20,9 +20,6 @@ def _create_arg_parser(): help="Remote url to initial conditions. Initial conditions are assumed to be " "stored as INPUT_URL/{timestamp}/{timestamp}.{restart_category}.tile*.nc", ) - parser.add_argument( - "output_url", type=str, help="Remote url where model output will be saved." - ) parser.add_argument( "one_step_yaml", type=str, help="Path to local run configuration yaml.", ) @@ -31,6 +28,9 @@ def _create_arg_parser(): type=str, help="Docker image to use for performing the one-step FV3GFS runs.", ) + parser.add_argument( + "output_url", type=str, help="Remote url where model output will be saved." + ) parser.add_argument( "-o", "--overwrite", diff --git a/workflows/prognostic_c48_run/_submit_baseline_jobs.sh b/workflows/prognostic_c48_run/_submit_baseline_jobs.sh new file mode 100644 index 0000000000..6b713956bc --- /dev/null +++ b/workflows/prognostic_c48_run/_submit_baseline_jobs.sh @@ -0,0 +1,16 @@ +#!/bin/bash + +set -x + +STAMP=$(date +%F)-$(uuid | head -c 6) +output=gs://vcm-ml-data/testing-2020-02 +urls=("gs://vcm-ml-data/2020-03-03-X-SHiELD-2019-12-02-deep-conv-off" "gs://vcm-ml-data/2020-02-26-X-SHiELD-2019-12-02-physics-off") +ic=20160803.061500 +image=us.gcr.io/vcm-ml/prognostic-run-orchestration:fv3py_v2.3-mp-off-switch + +for onestep_url in "${urls[@]}" +do + run=$(basename $onestep_url) + output_url=$output/$run/$STAMP/prognostic_run_baseline + python orchestrate_submit_job.py -d $onestep_url $output_url $ic $image +done \ No newline at end of file diff --git a/workflows/prognostic_c48_run/orchestrate_submit_job.py b/workflows/prognostic_c48_run/orchestrate_submit_job.py index 733431d9d1..61f9ce87f6 100644 --- a/workflows/prognostic_c48_run/orchestrate_submit_job.py +++ b/workflows/prognostic_c48_run/orchestrate_submit_job.py @@ -27,34 +27,44 @@ def _create_arg_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser() - parser.add_argument( - "model_url", type=str, help="Remote url to a trained sklearn model.", - ) parser.add_argument( "initial_condition_url", type=str, help="Remote url to directory holding timesteps with model initial conditions.", ) + parser.add_argument( + "ic_timestep", + type=str, + help="Time step to grab from the initial conditions url.", + ) + parser.add_argument( + "docker_image", + type=str, + help="Docker image to pull for the prognostic run kubernetes pod.", + ) parser.add_argument( "output_url", type=str, help="Remote storage location for prognostic run output.", ) parser.add_argument( - "prog_config_yml", + "--model_url", type=str, - help="Path to a config update YAML file specifying the changes (e.g., " - "diag_table, runtime, ...) from the one-step runs for the prognostic run.", + default=None, + help="Remote url to a trained sklearn model.", ) parser.add_argument( - "ic_timestep", + "--prog_config_yml", type=str, - help="Time step to grab from the initial conditions url.", + default="prognostic_config.yml", + help="Path to a config update YAML file specifying the changes (e.g., " + "diag_table, runtime, ...) from the one-step runs for the prognostic run.", ) parser.add_argument( - "docker_image", - type=str, - help="Docker image to pull for the prognostic run kubernetes pod.", + "-d", + "--detach", + action="store_true", + help="Do not wait for the k8s job to complete.", ) return parser @@ -114,26 +124,26 @@ def _update_with_prognostic_model_config(model_config, prognostic_config): ) # Add prognostic config section - model_config["scikit_learn"] = { - "model": os.path.join(args.model_url, MODEL_FILENAME), - "zarr_output": "diags.zarr", - } + if args.model_url: + model_config["scikit_learn"] = { + "model": os.path.join(args.model_url, MODEL_FILENAME), + "zarr_output": "diags.zarr", + } + kube_opts["runfile"] = kube_jobs.transfer_local_to_remote(RUNFILE, config_dir) # Upload the new prognostic config with fsspec.open(job_config_path, "w") as f: f.write(yaml.dump(model_config)) - remote_runfile_path = kube_jobs.transfer_local_to_remote(RUNFILE, config_dir) - fv3config.run_kubernetes( config_location=job_config_path, outdir=args.output_url, jobname=job_name, docker_image=args.docker_image, - runfile=remote_runfile_path, job_labels=job_label, **kube_opts, ) - successful, _ = kube_jobs.wait_for_complete(job_label) - kube_jobs.delete_job_pods(successful) + if not args.detach: + successful, _ = kube_jobs.wait_for_complete(job_label) + kube_jobs.delete_job_pods(successful) diff --git a/workflows/prognostic_c48_run/prognostic_config.yml b/workflows/prognostic_c48_run/prognostic_config.yml index 8397c2b8df..49d278bfc6 100644 --- a/workflows/prognostic_c48_run/prognostic_config.yml +++ b/workflows/prognostic_c48_run/prognostic_config.yml @@ -16,4 +16,4 @@ namelist: gfs_physics_nml: fhzero: 0.25 # hours - frequency at which precip is set back to zero fv_core_nml: - n_split: 6 # num dynamics steps per physics step \ No newline at end of file + n_split: 6 # num dynamics steps per physics step diff --git a/workflows/sklearn_regression/README.md b/workflows/sklearn_regression/README.md index 452c406b71..9684e68f59 100644 --- a/workflows/sklearn_regression/README.md +++ b/workflows/sklearn_regression/README.md @@ -8,9 +8,9 @@ the trained model output as well as a copy of the model configuration. Example shell script: ``` python -m fv3net.regression.sklearn.train \ - --train-config-file example_rf_training_config.yml \ - --output-dir-suffix sklearn_regression \ - --train-data-path gs://vcm-ml-data/test_annak/2020-02-05_train_data_pipeline/train + gs://vcm-ml-data/test_annak/2020-02-05_train_data_pipeline #input data path where "train" folder is located + example_rf_training_config.yml \ + {output_data_path} \ --delete-local-results-after-upload True ``` The last two arguments are optional and allow the user to save the output directory to @@ -51,8 +51,9 @@ a timestamped output directory. Example shell script: ``` python -m fv3net.regression.model_diagnostics \ - --test-data-path gs://vcm-ml-data/test-annak/2020-02-05_train_data_pipeline/test \ - --model-path 20200205.205016_model_training_files/20200205.205016_sklearn_model.pkl \ - --high-res-data-path gs://vcm-ml-data/2019-12-05-40-day-X-SHiELD-simulation-C384-diagnostics/C48_gfsphysics_15min_coarse.zarr \ + gs://vcm-ml-data/test-annak/2020-02-05_train_data_pipeline \ # location of "test" directory + 20200205.205016_model_training_files \ # location of "sklearn_model.pkl" file + gs://vcm-ml-data/2019-12-05-40-day-X-SHiELD-simulation-C384-diagnostics/C48_gfsphysics_15min_coarse.zarr \ + {output_path} \ --num-test-zarrs 8 ``` \ No newline at end of file diff --git a/workflows/sklearn_regression/orchestrator_test_sklearn.sh b/workflows/sklearn_regression/orchestrator_test_sklearn.sh deleted file mode 100755 index 9512aa034b..0000000000 --- a/workflows/sklearn_regression/orchestrator_test_sklearn.sh +++ /dev/null @@ -1,13 +0,0 @@ -MODEL_PATH=$1 -DATA_PATH=$2 -DIAGS_PATH=$3 -OUTPUT_PATH=$4 -NUM_TEST_ZARRS=$5 - - -python -m fv3net.diagnostics.sklearn_model_performance \ - ${MODEL_PATH}"/sklearn_model.pkl" \ - ${DATA_PATH}"/test" \ - ${DIAGS_PATH} \ - ${OUTPUT_PATH} \ - ${NUM_TEST_ZARRS} diff --git a/workflows/sklearn_regression/orchestrator_train_sklearn.sh b/workflows/sklearn_regression/orchestrator_train_sklearn.sh deleted file mode 100755 index 65254c61b4..0000000000 --- a/workflows/sklearn_regression/orchestrator_train_sklearn.sh +++ /dev/null @@ -1,8 +0,0 @@ -INPUT_DATA_PATH=$1 -OUTPUT_DATA_PATH=$2 -TRAINING_CONFIG_PATH=$3 - -python -m fv3net.regression.sklearn.train \ - ${INPUT_DATA_PATH}"/train" \ - ${TRAINING_CONFIG_PATH} \ - ${OUTPUT_DATA_PATH} diff --git a/workflows/sklearn_regression/test_sklearn.sh b/workflows/sklearn_regression/test_sklearn.sh index 924a3104a2..de58e6b1b0 100755 --- a/workflows/sklearn_regression/test_sklearn.sh +++ b/workflows/sklearn_regression/test_sklearn.sh @@ -1,6 +1,6 @@ python -m fv3net.diagnostics.sklearn_model_performance \ - 20200205.205016_model_training_files/20200205.205016_sklearn_model.pkl \ - gs://vcm-ml-data/test-annak/2020-02-20_train_data_pipeline_downsampled/test \ + gs://vcm-ml-data/orchestration-testing/test-experiment-7021f96d/train_sklearn_model_train-config-file_example_base_rf_training_config.yml \ + gs://vcm-ml-data/orchestration-testing/test-experiment-0ec4a4b1/create_training_data_train-fraction_0.5_runner_DataflowRunner \ gs://vcm-ml-data/2019-12-05-40-day-X-SHiELD-simulation-C384-diagnostics/C48_gfsphysics_15min_coarse.zarr \ gs://vcm-ml-public/test-annak/test-new-diags \ - 48 + --num_test_zarrs 4 diff --git a/workflows/sklearn_regression/train_sklearn.sh b/workflows/sklearn_regression/train_sklearn.sh index e23fd5b63f..07f5983442 100755 --- a/workflows/sklearn_regression/train_sklearn.sh +++ b/workflows/sklearn_regression/train_sklearn.sh @@ -1,4 +1,4 @@ python -m fv3net.regression.sklearn.train \ - gs://vcm-ml-data/test-annak/2020-02-05_train_data_pipeline/train/ \ + gs://vcm-ml-data/orchestration-testing/test-experiment-0ec4a4b1/create_training_data_train-fraction_0.5_runner_DataflowRunner \ workflows/sklearn_regression/example_rf_training_config.yml \ gs://vcm-ml-data/test_annak