[develop] Introduce DA data preprocessing to workflow (#778)

This is a set of contributions as part of the RRFS/dev merge. The major capabilities include:

* A new task (get_da_obs) that retrieves and stages observation files for the subsequent data processing tasks. Most of this logic existed already in the process* tasks; it has been consolidated and moved out of them.
* Add workflow entries for running the get_da_obs, process_radarref, process_lightning, and process_bufrobs tasks
* Add a new config.rrfs.yaml file for RRFS-specific options; this will be expanded over time as more capabilities are added.
* Add a new test (process_obs) that exercises these initial observation processing tasks. Note that in order to run this test, the app must be built with rrfs_utils and gsi, which are not part of the default build at this time.
* More verbose error messages for run_WE2E_tests.py
* Improvements to retrieve_data.py
1) Rename the --file_type and --external_model arguments to --file_fmt and --data_type, respectively. These are less confusing option names, since both model data and obs can be retrieved with this script (see the usage sketch after this list).
2) Fix a bug where the script did not look for files in additional locations when they exist
3) Fix a bug where the script failed if any files were missing from a zip archive; this should only be a failure condition if all of them are missing
4) Make some arguments optional when it is possible to continue without them
5) Give more helpful, more verbose error messages when an invalid or unavailable data store is selected
6) Add new unit tests "test_rap_obs_from_hpss" and "test_rap_e_obs_from_hpss"
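
A hypothetical usage sketch with the renamed options follows. Only --data_type (formerly --external_model) and --file_fmt (formerly --file_type) are established by this change; the remaining flag names are assumptions, to be verified against "python ush/retrieve_data.py --help".

    # Hypothetical sketch of the renamed retrieve_data.py options; only
    # --data_type and --file_fmt are confirmed by this PR. The other flag
    # names are assumptions -- verify with: python ush/retrieve_data.py --help
    python ush/retrieve_data.py \
      --data_type RAP_obs \
      --file_fmt obs \
      --cycle_date 2023051906 \
      --data_stores hpss \
      --output_path /tmp/rap_obs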

---------

Co-authored-by: Christina.Holt <[email protected]>
mkavulich and christinaholtNOAA authored May 19, 2023
1 parent ab11918 commit 4892952
Showing 27 changed files with 741 additions and 166 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/python_linter.yaml
@@ -34,4 +34,4 @@ jobs:
- name: Lint the test directory
run: |
export PYTHONPATH=${PWD}/ush
pylint --ignore-imports=yes tests/test_python/
pylint --min-similarity-lines=15 --ignore-imports=yes tests/test_python/
89 changes: 89 additions & 0 deletions jobs/JREGIONAL_GET_DA_OBS
@@ -0,0 +1,89 @@
#!/bin/bash

#
#-----------------------------------------------------------------------
#
# Source the variable definitions file and the bash utility functions.
#
#-----------------------------------------------------------------------
#
. $USHdir/source_util_funcs.sh
source_config_for_task "task_run_anl|task_run_enkf" ${GLOBAL_VAR_DEFNS_FP}
. $USHdir/job_preamble.sh
#
#-----------------------------------------------------------------------
#
# Save current shell options (in a global array). Then set new options
# for this script/function.
#
#-----------------------------------------------------------------------
#
{ save_shell_opts; . $USHdir/preamble.sh; } > /dev/null 2>&1
#
#-----------------------------------------------------------------------
#
# Get the full path to the file in which this script/function is located
# (scrfunc_fp), the name of that file (scrfunc_fn), and the directory in
# which the file is located (scrfunc_dir).
#
#-----------------------------------------------------------------------
#
scrfunc_fp=$( $READLINK -f "${BASH_SOURCE[0]}" )
scrfunc_fn=$( basename "${scrfunc_fp}" )
scrfunc_dir=$( dirname "${scrfunc_fp}" )
#
#-----------------------------------------------------------------------
#
# Print message indicating entry into script.
#
#-----------------------------------------------------------------------
#
print_info_msg "
========================================================================
Entering script: \"${scrfunc_fn}\"
In directory: \"${scrfunc_dir}\"
This script retrieves observation data for RRFS data assimilation tasks.
========================================================================"

#
#-----------------------------------------------------------------------
#
# Create the directory where the GSI obs files should be stored
#
#-----------------------------------------------------------------------
#
export DATA="${COMIN}/obs"
mkdir_vrfy -p "${DATA}"

# Set needed date/time variables
export START_DATE=$(echo "${PDY} ${cyc}")
export YYYYMMDDHH=$(date +%Y%m%d%H -d "${START_DATE}")

#
#-----------------------------------------------------------------------
#
# Call the ex-script for this J-job
#
#-----------------------------------------------------------------------
#
$SCRIPTSdir/exregional_get_da_obs.sh || \
print_err_msg_exit "\
Call to ex-script corresponding to J-job \"${scrfunc_fn}\" failed."
#
#-----------------------------------------------------------------------
#
# Run job postamble.
#
#-----------------------------------------------------------------------
#
job_postamble
#
#-----------------------------------------------------------------------
#
# Restore the shell options saved at the beginning of this script/function.
#
#-----------------------------------------------------------------------
#
{ restore_shell_opts; } > /dev/null 2>&1
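
For context, the save_shell_opts/restore_shell_opts pair bracketing this J-job comes from the bash utilities sourced via source_util_funcs.sh. A minimal sketch of the idiom, assuming a simplified single-level version of those helpers:

    # Minimal sketch of the save/restore idiom (hypothetical simplified
    # version of the helpers sourced via source_util_funcs.sh).
    save_shell_opts()    { _saved_opts=$(set +o); }   # record current set -o state
    restore_shell_opts() { eval "${_saved_opts}"; }   # replay it to restore

    save_shell_opts
    set -u                # preamble.sh enables stricter options like this one
    restore_shell_opts    # caller's original option settings are back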

2 changes: 1 addition & 1 deletion jobs/JREGIONAL_GET_EXTRN_MDL_FILES
@@ -216,7 +216,7 @@ esac
#
#-----------------------------------------------------------------------
#
# Create the directory where the exetrnal model files should be stored
# Create the directory where the external model files should be stored
#
#-----------------------------------------------------------------------
#
13 changes: 9 additions & 4 deletions jobs/JREGIONAL_PROCESS_BUFR → jobs/JREGIONAL_PROCESS_BUFROBS
@@ -29,7 +29,7 @@
#-----------------------------------------------------------------------
#
. $USHdir/source_util_funcs.sh
source_config_for_task "task_process_bufr" ${GLOBAL_VAR_DEFNS_FP}
source_config_for_task "task_process_bufrobs" ${GLOBAL_VAR_DEFNS_FP}
. $USHdir/job_preamble.sh "TRUE"
#
#-----------------------------------------------------------------------
@@ -75,11 +75,16 @@ the specified cycle.
#-----------------------------------------------------------------------
#
if [ ${CYCLE_TYPE} == "spinup" ]; then
DATA="${DATA:-${COMIN}/process_bufr_spinup}"
DATA="${DATA:-${COMIN}/process_bufrobs_spinup}"
else
DATA="${DATA:-${COMIN}/process_bufr}"
DATA="${DATA:-${COMIN}/process_bufrobs}"
fi
mkdir_vrfy -p ${DATA}

# Set needed date/time variables
export START_DATE=$(echo "${PDY} ${cyc}")
export YYYYMMDDHH=$(date +%Y%m%d%H -d "${START_DATE}")

#
#-----------------------------------------------------------------------
#
@@ -88,7 +93,7 @@ mkdir_vrfy -p ${DATA}
#
#-----------------------------------------------------------------------
#
$SCRIPTSdir/exregional_process_bufr.sh || print_err_msg_exit "\
$SCRIPTSdir/exregional_process_bufrobs.sh || print_err_msg_exit "\
Call to ex-script corresponding to J-job \"${scrfunc_fn}\" failed."
#
#-----------------------------------------------------------------------
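
The date/time block added to this J-job (and to JREGIONAL_GET_DA_OBS above) relies on GNU date accepting a "YYYYMMDD HH" string, for example:

    # Illustration of the date handling added above (requires GNU date;
    # the values are hypothetical).
    PDY=20230519
    cyc=06
    START_DATE="${PDY} ${cyc}"                       # "20230519 06"
    YYYYMMDDHH=$(date +%Y%m%d%H -d "${START_DATE}")
    echo "${YYYYMMDDHH}"                             # prints 2023051906
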
2 changes: 1 addition & 1 deletion jobs/JREGIONAL_PROCESS_LIGHTNING
@@ -16,7 +16,7 @@
#-----------------------------------------------------------------------
#
. ${USHdir}/source_util_funcs.sh
source_config_for_task "task_process_lightning" ${GLOBAL_VAR_DEFNS_FP}
source_config_for_task "task_process_bufrobs" ${GLOBAL_VAR_DEFNS_FP}
. ${USHdir}/job_preamble.sh "TRUE"
#
#-----------------------------------------------------------------------
2 changes: 2 additions & 0 deletions modulefiles/tasks/hera/get_da_obs.local.lua
@@ -0,0 +1,2 @@
load("hpss")
load("python_srw")
2 changes: 2 additions & 0 deletions modulefiles/tasks/jet/get_da_obs.local.lua
@@ -0,0 +1,2 @@
load("hpss")
load("python_srw")
11 changes: 10 additions & 1 deletion parm/data_locations.yml
@@ -322,10 +322,12 @@ RAP_obs:
archive_format: zip
archive_path:
- /BMC/fdr/Permanent/{yyyy}/{mm}/{dd}/data/grids/rap/obs
- /BMC/fdr/Permanent/{yyyy}/{mm}/{dd}/data/grids/rap/prepbufr
archive_internal_dir:
- ./
- ""
archive_file_names:
- "{yyyymmddhh}00.zip"
- "{yyyymmddhh}00.zip"
file_names:
obs:
- "{yyyymmddhh}.rap.t{hh}z.prepbufr.tm00"
@@ -358,6 +360,13 @@ RAP_obs:
- "{yyyymmddhh}.rap.t{hh}z.satwnd.tm00.bufr_d"
- "{yyyymmddhh}.rap.t{hh}z.sevasr.tm00.bufr_d"
- "{yyyymmddhh}.rap.t{hh}z.ssmisu.tm00.bufr_d"
- "{yyyymmddhh}.rap_e.t{hh}z.prepbufr.tm00"
- "{yyyymmddhh}.rap_e.t{hh}z.1bamua.tm00.bufr_d"
- "{yyyymmddhh}.rap_e.t{hh}z.1bhrs4.tm00.bufr_d"
- "{yyyymmddhh}.rap_e.t{hh}z.1bmhs.tm00.bufr_d"
- "{yyyymmddhh}.rap_e.t{hh}z.lgycld.tm00.bufr_d"
- "{yyyymmddhh}.rap_e.t{hh}z.nexrad.tm00.bufr_d"
- "{yyyymmddhh}.rap_e.t{hh}z.satwnd.tm00.bufr_d"
aws:
protocol: download
url: https://noaa-rap-pds.s3.amazonaws.com/rap.{yyyymmdd}
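
The file_names entries above are templates whose braced fields retrieve_data.py fills from the cycle time. As an illustration with hypothetical values, the new rap_e prepbufr entry for a 2023-05-19 06Z cycle expands as:

    # Hypothetical expansion of one file-name template above.
    yyyymmddhh=2023051906
    hh=06
    echo "${yyyymmddhh}.rap_e.t${hh}z.prepbufr.tm00"
    # -> 2023051906.rap_e.t06z.prepbufr.tm00
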
1 change: 0 additions & 1 deletion parm/wflow/coldstart.yaml
@@ -213,4 +213,3 @@ metatask_run_ensemble:
taskdep:
attrs:
task: aqm_lbcs

102 changes: 102 additions & 0 deletions parm/wflow/da_data_preproc.yaml
@@ -0,0 +1,102 @@
# This group contains all the tasks needed for preprocessing tasks for
# RRFS DA.

default_data_preproc_task: &default_preproc
account: '&ACCOUNT;'
attrs:
cycledefs: #cycledefs_type#
maxtries: '1'
envars: &default_envars
GLOBAL_VAR_DEFNS_FP: '&GLOBAL_VAR_DEFNS_FP;'
USHdir: '&USHdir;'
PDY: !cycstr "@Y@m@d"
cyc: !cycstr "@H"
subcyc: !cycstr "@M"
LOGDIR: !cycstr "&LOGDIR;"
CYCLE_TYPE: '#cycle_type#'
native: '{{ platform.SCHED_NATIVE_CMD }}'
nodes: '{{ nnodes }}:ppn={{ ppn }}'
nnodes: 1
nodesize: "&NCORES_PER_NODE;"
ppn: 1
partition: '{% if platform.get("PARTITION_DEFAULT") %}&PARTITION_DEFAULT;{% else %}None{% endif %}'
queue: '&QUEUE_DEFAULT;'
walltime: 00:25:00


task_get_da_obs:
<<: *default_preproc
command: '&LOAD_MODULES_RUN_TASK_FP; "get_da_obs" "&JOBSdir;/JREGIONAL_GET_DA_OBS"'
attrs:
cycledefs: forecast
maxtries: '1'
join: !cycstr '&LOGDIR;/{{ jobname }}_@Y@m@d@H&LOGEXT;'
partition: '&PARTITION_HPSS;'
queue: '&QUEUE_HPSS;'
dependency:
timedep: '<cyclestr offset="&START_TIME_CONVENTIONAL;">@Y@m@d@H@M00</cyclestr>'

metatask_process_obs_cycle_type:
var:
cycledefs_type: forecast,long_forecast
cycle_type: prod

task_process_radarref_#cycle_type#:
<<: *default_preproc
command: '&LOAD_MODULES_RUN_TASK_FP; "process_obs" "&JOBSdir;/JREGIONAL_PROCESS_RADARREF"'
ppn: 24
join: !cycstr '&LOGDIR;/{{ jobname }}_@Y@m@d@H&LOGEXT;'
dependency:
or:
taskdep:
attrs:
task: get_da_obs
and:
not:
taskvalid:
attrs:
task: get_da_obs
streq:
left: do_real_time
right: '{% if workflow.DO_REAL_TIME %}do_real_time{% endif %}'
timedep: '<cyclestr offset="&START_TIME_NSSLMOSAIC;">@Y@m@d@H@M00</cyclestr>'

task_process_lightning_#cycle_type#:
<<: *default_preproc
command: '&LOAD_MODULES_RUN_TASK_FP; "process_obs" "&JOBSdir;/JREGIONAL_PROCESS_LIGHTNING"'
join: !cycstr '&LOGDIR;/{{ jobname }}_@Y@m@d@H&LOGEXT;'
dependency:
or:
taskdep:
attrs:
task: get_da_obs
and:
not:
taskvalid:
attrs:
task: get_da_obs
streq:
left: do_real_time
right: '{% if workflow.DO_REAL_TIME %}do_real_time{% endif %}'
timedep: '<cyclestr offset="&START_TIME_LIGHTNING;">@Y@m@d@H@M00</cyclestr>'


task_process_bufrobs_#cycle_type#:
<<: *default_preproc
command: '&LOAD_MODULES_RUN_TASK_FP; "process_obs" "&JOBSdir;/JREGIONAL_PROCESS_BUFROBS"'
join: !cycstr '&LOGDIR;/{{ jobname }}_@Y@m@d@H&LOGEXT;'
dependency:
or:
taskdep:
attrs:
task: get_da_obs
and:
not:
taskvalid:
attrs:
task: get_da_obs
streq:
left: do_real_time
right: '{% if workflow.DO_REAL_TIME %}do_real_time{% endif %}'
timedep: '<cyclestr offset="&START_TIME_CONVENTIONAL;">@Y@m@d@H@M00</cyclestr>'
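
Each process_* task above shares the same dependency pattern: run as soon as get_da_obs succeeds, or, when get_da_obs is not defined in the workflow and DO_REAL_TIME is set, fall back to the wall-clock (timedep) trigger. An illustrative-only shell rendering of that logic, where all three predicates are hypothetical stand-ins for Rocoto's taskdep, taskvalid, and timedep checks:

    # Illustrative-only rendering of the dependency logic; Rocoto, not bash,
    # evaluates the real thing. All three predicates are hypothetical.
    task_succeeded()   { [ "${GET_DA_OBS_STATE:-}" = "SUCCEEDED" ]; }  # taskdep
    task_in_workflow() { [ "${GET_DA_OBS_DEFINED:-}" = "TRUE" ]; }     # taskvalid
    time_reached()     { [ "${PAST_TIME_OFFSET:-}" = "TRUE" ]; }       # timedep

    if task_succeeded || { ! task_in_workflow \
         && [ "${DO_REAL_TIME:-FALSE}" = "TRUE" ] && time_reached; }; then
      echo "process task is ready to run"
    fi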
