diff --git a/parm/config/gefs/config.base b/parm/config/gefs/config.base
index 735743b568..5b03efa04e 100644
--- a/parm/config/gefs/config.base
+++ b/parm/config/gefs/config.base
@@ -228,9 +228,19 @@ export FHOUT_ICE=3
 export gfs_cyc=@gfs_cyc@ # 0: no GFS cycle, 1: 00Z only, 2: 00Z and 12Z only, 4: all 4 cycles.
 
 # GFS output and frequency
-export FHMIN_GFS=0
-export FHMIN=${FHMIN_GFS}
-export FHMAX_GFS=@FHMAX_GFS@
+# Forecast hour intervals to run the forecast over
+# For a single-segment forecast, this is simply "$FHMIN_GFS,$FHMAX_GFS"
+export FCST_SEGMENTS_STR_GFS="@FCST_SEGMENTS_GFS@"
+IFS=', ' read -ra FCST_SEGMENTS_GFS <<< "${FCST_SEGMENTS_STR_GFS}"
+if (( ${FCST_SEGMENT:- -1} < 0 )); then
+  # Jobs other than the forecast don't care about segments, only the
+  # absolute start and end
+  declare -x FHMIN_GFS=${FCST_SEGMENTS_GFS[0]}
+  declare -x FHMAX_GFS=${FCST_SEGMENTS_GFS[-1]}
+else
+  declare -x FHMIN_GFS=${FCST_SEGMENTS_GFS[${FCST_SEGMENT}]}
+  declare -x FHMAX_GFS=${FCST_SEGMENTS_GFS[${FCST_SEGMENT}+1]}
+fi
 export FHOUT_GFS=6
 export FHMAX_HF_GFS=@FHMAX_HF_GFS@
 export FHOUT_HF_GFS=1
diff --git a/parm/config/gefs/config.fcst b/parm/config/gefs/config.fcst
index e66fc15f87..07cf2a9f2b 100644
--- a/parm/config/gefs/config.fcst
+++ b/parm/config/gefs/config.fcst
@@ -30,6 +30,7 @@ string="--fv3 ${CASE}"
 # shellcheck disable=SC2086
 source "${EXPDIR}/config.ufs" ${string}
 
+export FHMIN=${FHMIN_GFS}
 # shellcheck disable=SC2153
 export FHMAX=${FHMAX_GFS}
 # shellcheck disable=SC2153
diff --git a/parm/config/gefs/yaml/defaults.yaml b/parm/config/gefs/yaml/defaults.yaml
index d2b486e7ca..17a78ea53d 100644
--- a/parm/config/gefs/yaml/defaults.yaml
+++ b/parm/config/gefs/yaml/defaults.yaml
@@ -9,7 +9,7 @@ base:
   DO_AWIPS: "NO"
   KEEPDATA: "NO"
   DO_EXTRACTVARS: "NO"
-  FHMAX_GFS: 120
+  FCST_SEGMENTS_GFS: "0,48,120"
   FHMAX_HF_GFS: 0
   REPLAY_ICS: "NO"
   USE_OCN_PERTURB_FILES: "false"
diff --git a/parm/config/gfs/config.base b/parm/config/gfs/config.base
index 56005199aa..873a12a644 100644
--- a/parm/config/gfs/config.base
+++ b/parm/config/gfs/config.base
@@ -284,8 +284,19 @@ export EUPD_CYC="@EUPD_CYC@"
 export gfs_cyc=@gfs_cyc@ # 0: no GFS cycle, 1: 00Z only, 2: 00Z and 12Z only, 4: all 4 cycles.
 
 # GFS output and frequency
-export FHMIN_GFS=0
-export FHMAX_GFS=@FHMAX_GFS@
+# Comma-separated forecast hour intervals to run the forecast over
+# For a single-segment forecast, this is simply "$FHMIN_GFS,$FHMAX_GFS"
+export FCST_SEGMENTS_STR_GFS="@FCST_SEGMENTS_GFS@"
+IFS=', ' read -ra FCST_SEGMENTS_GFS <<< "${FCST_SEGMENTS_STR_GFS}"
+if (( ${FCST_SEGMENT:- -1} < 0 )); then
+  # Jobs other than the forecast don't care about segments, only the
+  # absolute start and end
+  declare -x FHMIN_GFS=${FCST_SEGMENTS_GFS[0]}
+  declare -x FHMAX_GFS=${FCST_SEGMENTS_GFS[-1]}
+else
+  declare -x FHMIN_GFS=${FCST_SEGMENTS_GFS[${FCST_SEGMENT}]}
+  declare -x FHMAX_GFS=${FCST_SEGMENTS_GFS[${FCST_SEGMENT}+1]}
+fi
 export FHOUT_GFS=3 # 3 for ops
 export FHMAX_HF_GFS=@FHMAX_HF_GFS@
 export FHOUT_HF_GFS=1
diff --git a/parm/config/gfs/config.fcst b/parm/config/gfs/config.fcst
index 4982b8f6e6..f75762fa7b 100644
--- a/parm/config/gfs/config.fcst
+++ b/parm/config/gfs/config.fcst
@@ -33,6 +33,7 @@ source "${EXPDIR}/config.ufs" ${string}
 # Forecast length for GFS forecast
 case ${RUN} in
   *gfs)
+    export FHMIN=${FHMIN_GFS}
     # shellcheck disable=SC2153
     export FHMAX=${FHMAX_GFS}
     # shellcheck disable=SC2153
diff --git a/parm/config/gfs/yaml/defaults.yaml b/parm/config/gfs/yaml/defaults.yaml
index da4d587dff..3a4cb0f457 100644
--- a/parm/config/gfs/yaml/defaults.yaml
+++ b/parm/config/gfs/yaml/defaults.yaml
@@ -14,7 +14,7 @@ base:
   DO_GENESIS: "YES"
   DO_GENESIS_FSU: "NO"
   DO_METP: "YES"
-  FHMAX_GFS: 120
+  FCST_SEGMENTS_GFS: "0,120"
   FHMAX_HF_GFS: 0
   DO_VRFY_OCEANDA: "NO"
   GSI_SOILANAL: "NO"
diff --git a/sorc/wxflow b/sorc/wxflow
index d314e06510..264e8f3a50 160000
--- a/sorc/wxflow
+++ b/sorc/wxflow
@@ -1 +1 @@
-Subproject commit d314e065101041a4d45e5a11ec19cd2dc5f38c67
+Subproject commit 264e8f3a508493c98e9ecad28a2a0346c40c0f63
diff --git a/ush/forecast_predet.sh b/ush/forecast_predet.sh
index ebf7cfd282..6b72f574d8 100755
--- a/ush/forecast_predet.sh
+++ b/ush/forecast_predet.sh
@@ -77,6 +77,7 @@ common_predet(){
 
   CDATE=${CDATE:-"${PDY}${cyc}"}
   ENSMEM=${ENSMEM:-000}
+  MEMBER=$(( 10#${ENSMEM:-"-1"} )) # -1: control, 0: ensemble mean, >0: ensemble member $MEMBER
 
   # Define significant cycles
   half_window=$(( assim_freq / 2 ))
@@ -154,7 +155,6 @@ FV3_predet(){
   FV3_OUTPUT_FH="${FV3_OUTPUT_FH} $(seq -s ' ' "${fhr}" "${FHOUT}" "${FHMAX}")"
 
   # Other options
-  MEMBER=$(( 10#${ENSMEM:-"-1"} )) # -1: control, 0: ensemble mean, >0: ensemble member $MEMBER
   PREFIX_ATMINC=${PREFIX_ATMINC:-""} # allow ensemble to use recentered increment
 
   # IAU options
diff --git a/workflow/applications/applications.py b/workflow/applications/applications.py
index 97a77c2c21..0624c988d2 100644
--- a/workflow/applications/applications.py
+++ b/workflow/applications/applications.py
@@ -75,6 +75,7 @@ def __init__(self, conf: Configuration) -> None:
         self.do_hpssarch = _base.get('HPSSARCH', False)
 
         self.nens = _base.get('NMEM_ENS', 0)
+        self.fcst_segments = _base.get('FCST_SEGMENTS_STR_GFS', None)
 
         self.wave_runs = None
         if self.do_wave:
diff --git a/workflow/rocoto/gefs_tasks.py b/workflow/rocoto/gefs_tasks.py
index e78ac96d83..5c4f431856 100644
--- a/workflow/rocoto/gefs_tasks.py
+++ b/workflow/rocoto/gefs_tasks.py
@@ -138,19 +138,35 @@ def fcst(self):
 
         dependencies = rocoto.create_dependency(dep_condition='and', dep=dependencies)
 
+        num_fcst_segments = len(self.app_config.fcst_segments) - 1
+
+        fcst_vars = self.envars.copy()
+        fcst_envars_dict = {'FCST_SEGMENT': '#seg#'
+                            }
+        for key, value in fcst_envars_dict.items():
+            fcst_vars.append(rocoto.create_envar(name=key, value=str(value)))
+
         resources = self.get_resource('fcst')
-        task_name = f'fcst_mem000'
+        task_name = f'fcst_mem000_seg#seg#'
         task_dict = {'task_name': task_name,
                      'resources': resources,
                      'dependency': dependencies,
-                     'envars': self.envars,
+                     'envars': fcst_vars,
                      'cycledef': 'gefs',
                      'command': f'{self.HOMEgfs}/jobs/rocoto/fcst.sh',
                      'job_name': f'{self.pslot}_{task_name}_@H',
                      'log': f'{self.rotdir}/logs/@Y@m@d@H/{task_name}.log',
                      'maxtries': '&MAXTRIES;'
                      }
-        task = rocoto.create_task(task_dict)
+
+        seg_var_dict = {'seg': ' '.join([f"{seg}" for seg in range(0, num_fcst_segments)])}
+        metatask_dict = {'task_name': 'fcst_mem000',
+                         'is_serial': True,
+                         'var_dict': seg_var_dict,
+                         'task_dict': task_dict
+                         }
+
+        task = rocoto.create_task(metatask_dict)
 
         return task
 
@@ -169,36 +185,60 @@ def efcs(self):
 
         dependencies = rocoto.create_dependency(dep_condition='and', dep=dependencies)
 
-        efcsenvars = self.envars.copy()
-        efcsenvars_dict = {'ENSMEM': '#member#',
-                           'MEMDIR': 'mem#member#'
-                           }
-        for key, value in efcsenvars_dict.items():
-            efcsenvars.append(rocoto.create_envar(name=key, value=str(value)))
-
+        num_fcst_segments = len(self.app_config.fcst_segments) - 1
         resources = self.get_resource('efcs')
 
-        task_name = f'fcst_mem#member#'
-        task_dict = {'task_name': task_name,
-                     'resources': resources,
-                     'dependency': dependencies,
-                     'envars': efcsenvars,
-                     'cycledef': 'gefs',
-                     'command': f'{self.HOMEgfs}/jobs/rocoto/fcst.sh',
-                     'job_name': f'{self.pslot}_{task_name}_@H',
-                     'log': f'{self.rotdir}/logs/@Y@m@d@H/{task_name}.log',
-                     'maxtries': '&MAXTRIES;'
-                     }
-
-        member_var_dict = {'member': ' '.join([f"{mem:03d}" for mem in range(1, self.nmem + 1)])}
-        metatask_dict = {'task_name': 'fcst_ens',
-                         'var_dict': member_var_dict,
-                         'task_dict': task_dict
+        # Kludge to work around bug in rocoto with serial metatasks nested
+        # in a parallel one (see christopherwharrop/rocoto#109). For now,
+        # loop over member to create a separate metatask for each instead
+        # of a metatask of a metatask.
+        #
+        tasks = []
+        for member in [f"{mem:03d}" for mem in range(1, self.nmem + 1)]:
+
+            efcsenvars = self.envars.copy()
+            efcsenvars_dict = {'ENSMEM': f'{member}',
+                               'MEMDIR': f'mem{member}',
+                               'FCST_SEGMENT': '#seg#'
+                               }
+            for key, value in efcsenvars_dict.items():
+                efcsenvars.append(rocoto.create_envar(name=key, value=str(value)))
+
+            task_name = f'fcst_mem{member}_seg#seg#'
+            task_dict = {'task_name': task_name,
+                         'resources': resources,
+                         'dependency': dependencies,
+                         'envars': efcsenvars,
+                         'cycledef': 'gefs',
+                         'command': f'{self.HOMEgfs}/jobs/rocoto/fcst.sh',
+                         'job_name': f'{self.pslot}_{task_name}_@H',
+                         'log': f'{self.rotdir}/logs/@Y@m@d@H/{task_name}.log',
+                         'maxtries': '&MAXTRIES;'
                          }
 
-        task = rocoto.create_task(metatask_dict)
+            seg_var_dict = {'seg': ' '.join([f"{seg}" for seg in range(0, num_fcst_segments)])}
+            seg_metatask_dict = {'task_name': f'fcst_mem{member}',
+                                 'is_serial': True,
+                                 'var_dict': seg_var_dict,
+                                 'task_dict': task_dict
+                                 }
 
-        return task
+            tasks.append(rocoto.create_task(seg_metatask_dict))
+
+        return '\n'.join(tasks)
+
+        # Keeping this in hopes the kludge is no longer necessary at some point
+        #
+        # member_var_dict = {'member': ' '.join([f"{mem:03d}" for mem in range(1, self.nmem + 1)])}
+        # mem_metatask_dict = {'task_name': 'fcst_ens',
+        #                      'is_serial': False,
+        #                      'var_dict': member_var_dict,
+        #                      'task_dict': seg_metatask_dict
+        #                      }
+
+        # task = rocoto.create_task(mem_metatask_dict)
+
+        # return task
 
     def atmos_prod(self):
         return self._atmosoceaniceprod('atmos')
diff --git a/workflow/rocoto/gfs_tasks.py b/workflow/rocoto/gfs_tasks.py
index 960a7548ab..b43e11fc90 100644
--- a/workflow/rocoto/gfs_tasks.py
+++ b/workflow/rocoto/gfs_tasks.py
@@ -880,12 +880,23 @@ def _fcst_forecast_only(self):
 
         dependencies = rocoto.create_dependency(dep_condition='and', dep=dependencies)
 
+        if self.run in ['gfs']:
+            num_fcst_segments = len(self.app_config.fcst_segments) - 1
+        else:
+            num_fcst_segments = 1
+
+        fcst_vars = self.envars.copy()
+        fcst_envars_dict = {'FCST_SEGMENT': '#seg#'
+                            }
+        for key, value in fcst_envars_dict.items():
+            fcst_vars.append(rocoto.create_envar(name=key, value=str(value)))
+
         resources = self.get_resource('fcst')
-        task_name = f'{self.run}fcst'
+        task_name = f'{self.run}fcst_seg#seg#'
         task_dict = {'task_name': task_name,
                      'resources': resources,
                      'dependency': dependencies,
-                     'envars': self.envars,
+                     'envars': fcst_vars,
                      'cycledef': self.run.replace('enkf', ''),
                      'command': f'{self.HOMEgfs}/jobs/rocoto/fcst.sh',
                      'job_name': f'{self.pslot}_{task_name}_@H',
@@ -893,7 +904,14 @@ def _fcst_forecast_only(self):
                      'maxtries': '&MAXTRIES;'
                      }
 
-        task = rocoto.create_task(task_dict)
+        seg_var_dict = {'seg': ' '.join([f"{seg}" for seg in range(0, num_fcst_segments)])}
+        metatask_dict = {'task_name': f'{self.run}fcst',
+                         'is_serial': True,
+                         'var_dict': seg_var_dict,
+                         'task_dict': task_dict
+                         }
+
+        task = rocoto.create_task(metatask_dict)
 
         return task
 
@@ -929,12 +947,23 @@ def _fcst_cycled(self):
 
         cycledef = 'gdas_half,gdas' if self.run in ['gdas'] else self.run
 
+        if self.run in ['gfs']:
+            num_fcst_segments = len(self.app_config.fcst_segments) - 1
+        else:
+            num_fcst_segments = 1
+
+        fcst_vars = self.envars.copy()
+        fcst_envars_dict = {'FCST_SEGMENT': '#seg#'
+                            }
+        for key, value in fcst_envars_dict.items():
+            fcst_vars.append(rocoto.create_envar(name=key, value=str(value)))
+
         resources = self.get_resource('fcst')
-        task_name = f'{self.run}fcst'
+        task_name = f'{self.run}fcst_seg#seg#'
         task_dict = {'task_name': task_name,
                      'resources': resources,
                      'dependency': dependencies,
-                     'envars': self.envars,
+                     'envars': fcst_vars,
                      'cycledef': cycledef,
                      'command': f'{self.HOMEgfs}/jobs/rocoto/fcst.sh',
                      'job_name': f'{self.pslot}_{task_name}_@H',
@@ -942,7 +971,14 @@ def _fcst_cycled(self):
                      'maxtries': '&MAXTRIES;'
                      }
 
-        task = rocoto.create_task(task_dict)
+        seg_var_dict = {'seg': ' '.join([f"{seg}" for seg in range(0, num_fcst_segments)])}
+        metatask_dict = {'task_name': f'{self.run}fcst',
+                         'is_serial': True,
+                         'var_dict': seg_var_dict,
+                         'task_dict': task_dict
+                         }
+
+        task = rocoto.create_task(metatask_dict)
 
         return task
 
@@ -1169,7 +1205,7 @@ def wavepostsbs(self):
 
     def wavepostbndpnt(self):
         deps = []
-        dep_dict = {'type': 'task', 'name': f'{self.run}fcst'}
+        dep_dict = {'type': 'metatask', 'name': f'{self.run}fcst'}
         deps.append(rocoto.add_dependency(dep_dict))
         dependencies = rocoto.create_dependency(dep=deps)
 
@@ -1221,7 +1257,7 @@ def wavepostbndpntbll(self):
 
     def wavepostpnt(self):
         deps = []
-        dep_dict = {'type': 'task', 'name': f'{self.run}fcst'}
+        dep_dict = {'type': 'metatask', 'name': f'{self.run}fcst'}
         deps.append(rocoto.add_dependency(dep_dict))
         if self.app_config.do_wave_bnd:
             dep_dict = {'type': 'task', 'name': f'{self.run}wavepostbndpntbll'}
@@ -1318,7 +1354,7 @@ def waveawipsgridded(self):
 
     def postsnd(self):
         deps = []
-        dep_dict = {'type': 'task', 'name': f'{self.run}fcst'}
+        dep_dict = {'type': 'metatask', 'name': f'{self.run}fcst'}
         deps.append(rocoto.add_dependency(dep_dict))
         dependencies = rocoto.create_dependency(dep=deps)
 
diff --git a/workflow/rocoto/rocoto.py b/workflow/rocoto/rocoto.py
index 0abb56cafb..2a20820da8 100644
--- a/workflow/rocoto/rocoto.py
+++ b/workflow/rocoto/rocoto.py
@@ -56,9 +56,10 @@ def create_task(task_dict: Dict[str, Any]) -> List[str]:
     else:
         # There is a nested task_dict, so this is a metatask
         metataskname = f"{task_dict.get('task_name', 'demometatask')}"
+        metataskmode = 'serial' if task_dict.get('is_serial', False) else 'parallel'
         var_dict = task_dict.get('var_dict', None)
 
-        strings = [f'<metatask name="{metataskname}">\n',
+        strings = [f'<metatask name="{metataskname}" mode="{metataskmode}">\n',
                    '\n']
 
         if var_dict is None: