Feature/use onestep zarr train data #207
Conversation
Thanks Anna. Overall it looks like a good implementation of the approach you outlined to me yesterday. I think the handling of variable names could be improved substantially, though. See my comments below.
Also, did you intentionally include the changes to the fv3config submodule? If not, please revert them to whatever is on develop-one-steps. There are also a bunch of formatting changes showing up in the diff, which I would revert.
Thanks Anna. There's a lot here I am trying to use and be consistent with in the one-step diags step. I had one suggestion/question about sharing code and names across multiple steps.
@@ -0,0 +1,69 @@
# suffixes that denote whether diagnostic variable is from the coarsened
This is pretty much exactly the list of var names that the one-step diags will use. If we're in agreement that fv3net.pipelines.common is a good idea for sharing/consistency across workflow steps, then this file (and the yaml, if it's being used) seems like a good candidate to reside there.
Referencing this comment from @nbren12's review: #207 (comment)
Since in a previous discussion we decided against the "import names from a common .py" route, to avoid linking the workflows in that manner, if we're using a lot of common var names across the workflows then I think we should go with option (2) and pass the variable name information to the workflows' respective main/run functions.
@brianhenn As discussed offline, I'll change the var names to be read in from a shared source and passed to the run function, so that a common list can be provided to both workflows.
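(For concreteness, a minimal sketch of that wiring; the config file name, keys, and argument handling here are assumptions, not what is actually in the PR.)

```python
# Sketch only: variable names live in a shared YAML file and are passed into
# run() as a plain dict, instead of being imported from a common module.
import argparse

import yaml


def run(args, pipeline_args, names):
    # names is e.g. {"one_step_vars": [...], "target_vars": [...]}
    ...


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("gcs_input_data_path")  # as in the diff below
    parser.add_argument("--var-names-yaml", default="variable_names.yml")  # illustrative
    args, pipeline_args = parser.parse_known_args()
    with open(args.var_names_yaml) as f:
        names = yaml.safe_load(f)
    run(args, pipeline_args, names)
```

This keeps the workflows decoupled at the code level while still letting them share a single source of truth for the names.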
In particular, this commit should address your comment: 354d675
I'm not actually reviewing this, but I somehow started a review by responding to Brian's comment.
Addressed PR comments, ready for re-review. Following our earlier discussion about where the various workflows would get variable names from, the module-level global vars are replaced with a dict that gets read and passed to the run function.
Thanks for making all those changes! I made some suggestions below for how to reduce the complexity of _create_train_cols, but they probably aren't strictly necessary for this PR. I do, however, feel a little more strongly about not changing vcm.apparent_source. This is almost there. FWIW, I think we should keep using this pipeline instead of dask jobs, since Beam is very robust for this kind of big, complicated calculation.
try:
    names = yaml.safe_load(stream)
except yaml.YAMLError as exc:
    raise ValueError(f"Bad yaml config: {exc}")
This try/except seems redundant, since you re-raise the error raised by yaml and don't have any special error-handling logic. The traceback should make it pretty clear where the error came from.
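(Sketch of the suggested simplification, with an illustrative path name.)

```python
import yaml

var_names_path = "variable_names.yml"  # illustrative path

# Let yaml raise its own YAMLError; its traceback already points at the
# offending config, so there is no need to catch and re-raise.
with open(var_names_path) as stream:
    names = yaml.safe_load(stream)
```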
@@ -19,6 +19,11 @@
logger.setLevel(logging.INFO)


def convert_forecast_time_to_timedelta(ds, forecast_time_dim):
is this a public function?
No, added _ to name
logger.info(f"Processing {len(data_batch_urls)} subsets...")

def run(args, pipeline_args, names):
    fs = get_fs(args.gcs_input_data_path)
I think this could use a docstring or at least some type hints.
Added
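(For reference, one way the hinted signature could look; the specific types here are guesses based on the surrounding diff, not the ones actually committed.)

```python
from typing import Any, Dict, List, Sequence


def run(args: Any, pipeline_args: List[str], names: Dict[str, Sequence[str]]) -> None:
    """Run the create-training-data pipeline.

    Args:
        args: parsed command-line arguments for this workflow
        pipeline_args: extra arguments forwarded to the Beam pipeline
        names: variable and dimension names loaded from the shared YAML config
    """
    ...
```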
fs: GCSFileSystem
run_dirs: list of GCS urls to open

ds (xarray dataset): must have the specified feature vars in cols_to_keep
This docstring seems largely out of date. Since this is a helper function, you could probably just delete all the argument information below.
ds[VAR_Q_U_WIND_ML] = apparent_source(ds.u)
ds[VAR_Q_V_WIND_ML] = apparent_source(ds.v)
ds[VAR_Q_HEATING_ML] = apparent_source(ds.T)
ds[VAR_Q_MOISTENING_ML] = apparent_source(ds.sphum)
ds = (
Is this not already done by the apparent_source function you use above?
The other feature variables that aren't created by the loop above still have values at coordinates that we want to drop.
]
features_diags_c48 = diags_c48.rename(RENAMED_PROG_DIAG_VARS)
features_diags_c48 = diags_c48.rename(renamed_high_res_vars)
return xr.merge([ds_run, features_diags_c48])
except (KeyError, AttributeError, ValueError, TypeError) as e:
    logger.error(f"Failed to merge in features from high res diagnostics: {e}")
Shouldn't the job fail in this case?
Yes, but the problem used to be that the LoadCloudData step would often fail when open_restarts was called, and if I did not catch that exception the entire pipeline would stop after hitting the limit of 4 failed jobs. But if that exception is caught, then downstream steps will fail, and if those exceptions aren't caught we'll just end up with the whole pipeline stopping again.
I had a discussion with @spencerkclark about this in the initial PR, because this goes against the Dataflow guidelines: there's not really a way to increase the number of failures allowed before the whole pipeline stops, since the input is expected to always be clean/valid. We decided that this was OK as a temporary solution, but we should add some kind of step after the one-step jobs that writes out which run dirs are good to use in this step, which could then be used to filter out the bad data before it enters the pipeline.
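(A sketch of the interim pattern described above: catch the exception in the load step, emit None, and filter before any downstream transforms. The function and step names are stand-ins, and open_restarts stands for the real loader mentioned in this thread.)

```python
import logging

import apache_beam as beam

logger = logging.getLogger(__name__)


def _try_load_run_dir(url, opener):
    """Return the opened dataset, or None if the run dir is unusable.

    `opener` would be the real loading function (e.g. open_restarts).
    """
    try:
        return opener(url)
    except (KeyError, AttributeError, ValueError, TypeError) as e:
        logger.error(f"Failed to load {url}: {e}")
        return None


# Inside the pipeline definition (step names are placeholders):
# ...
# | "LoadCloudData" >> beam.Map(_try_load_run_dir, opener=open_restarts)
# | "DropFailedRuns" >> beam.Filter(lambda ds: ds is not None)
# ...
```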
        dict of train and test timesteps
    )
    """
    init_datetime_coords = [
This datetime parsing seems out of place. Please lift it to the calling function.
Moved
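(For what it's worth, a sketch of that lift; the timestep string format is assumed to be the usual YYYYMMDD.HHMMSS, and the function names are illustrative.)

```python
from datetime import datetime
from typing import List


def _parse_timestep(timestep: str) -> datetime:
    # Assumes FV3GFS-style timestep strings, e.g. "20160801.003000".
    return datetime.strptime(timestep, "%Y%m%d.%H%M%S")


def calling_function(timesteps: List[str]):
    # Parse once here, then hand plain datetimes to the helper that
    # assigns the initialization-time coordinate.
    init_datetimes = [_parse_timestep(t) for t in timesteps]
    return init_datetimes
```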
| "CreateTrainingCols" | ||
>> beam.Map( | ||
_create_train_cols, | ||
cols_to_keep=names["one_step_vars"] + names["target_vars"], |
As far as I can tell, you only ever use one_step_vars + target_vars, and never each individually. If that's true I would just make them one list in the yaml.
Condensed to single list
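(Illustration of the condensed config; the key and variable names are examples only, not the ones in the PR.)

```python
import yaml

example_config = """
train_vars:
  - u
  - v
  - T
  - sphum
  - Q1
  - Q2
"""

names = yaml.safe_load(example_config)
# The CreateTrainingCols step can now read a single key:
cols_to_keep = names["train_vars"]
```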
ds[VAR_Q_U_WIND_ML] = apparent_source(ds.u)
ds[VAR_Q_V_WIND_ML] = apparent_source(ds.v)
ds[VAR_Q_HEATING_ML] = apparent_source(ds.T)
ds[VAR_Q_MOISTENING_ML] = apparent_source(ds.sphum)
ds = (
    ds[cols_to_keep]
You can get rid of the cols_to_keep argument too, by lifting it into a beam.Map as well, e.g.:

...
| "SelectVariables" >> beam.Map(lambda x: x[list(cols_to_keep)])
...

Also be sure to use list. I have been burned by passing tuples or other iterables to Dataset.__getitem__ many times.
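(A self-contained illustration of the list-vs-tuple point, using a toy dataset and made-up variable names.)

```python
import numpy as np
import xarray as xr

ds = xr.Dataset(
    {
        "T": ("x", np.zeros(3)),
        "sphum": ("x", np.zeros(3)),
        "u": ("x", np.zeros(3)),
    }
)

cols_to_keep = ("T", "sphum")   # e.g. read from config as a tuple

subset = ds[list(cols_to_keep)]  # OK: a list selects a sub-Dataset
# ds[cols_to_keep] raises KeyError: a tuple is hashable, so xarray treats it
# as the name of a single variable rather than as a list of variables.
```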
Done
Moved
Addressed PR comments, ready for re-review. I also saw that a couple of parts had been overlooked with regard to renaming to, or using, the newer dim names: i) centering vars on cell edges and ii) renaming high-res variables. Those changes were also added.
Thanks for all the hard work!
* Feature/one step save baseline (#193): adds several features to the one-step pipeline:
  - big zarr: everything is stored as one zarr file
  - saves physics outputs
  - some refactoring of the job submission
  Sample output: https://gist.github.com/nbren12/84536018dafef01ba5eac0354869fb67
* Save lat/lon grid variables from sfc_dt_atmos (#204)
* Feature/use onestep zarr train data (#207): use the big zarr from the one-step workflow as input to the create training data pipeline
* One-step sfc variables time alignment (#214): makes the diagnostics variables appended to the big zarr have the appropriate step and forecast_time dimensions, just as the variables extracted by the wrapper do
* One step zarr fill values (#223): accomplishes two things: 1) prevents true model 0s from being cast to NaNs in the one-step big zarr output, and 2) initializes big zarr arrays with NaNs via full, so that if they are not filled in due to a failed timestep or other reason it is more apparent than using empty, which produces arbitrary output
* Adjustments to be able to run workflows in dev branch (#218): removes references to hard-coded dims and data variables and imports from vcm.cubedsphere.constants, replacing them with arguments; coords and dims can be provided as args for the mappable var
* One steps start index (#231): allows starting the one-step jobs at a specified index in the timestep list, for testing and avoiding spinup timesteps
* Dev fix/integration tests (#234):
  - change order of required args so output is last
  - fix arg for onestep input to be the dir containing the big zarr
  - update end-to-end integration test ymls
  - prognostic run adjustments
* Improved fv3 logging (#225): introduces several improvements to the logging capability of our prognostic run image:
  - include upstream changes to disable output capturing in `fv3config.fv3run`
  - add a `capture_fv3gfs_func` function; when called, this captures the raw fv3gfs output and re-emits it as DEBUG-level logging statements that can more easily be filtered
  - refactor `runtime` to `external/runtime/runtime`; this was easy since it did not depend on any other module in fv3net (except implicitly the code in `fv3net.regression`, which is imported when loading the sklearn model with pickle)
  - update fv3config to master
* Manually merge in the refactor from master while keeping new names from develop (#237)
* lint
* remove logging from testing
* Dev fix/arg order (#238): update history, fix positional args, fix function args
* update history
* linting

Co-authored-by: Anna Kwa <[email protected]>
Co-authored-by: brianhenn <[email protected]>
Changes the training data pipeline to use the single big zarr output from the one-step workflow. Adds an option to specify the variable names from a file, in case they change in the one-step data.