Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add mostRecentAccessTime attribute to streams #4752

Original file line number Diff line number Diff line change
Expand Up @@ -186,10 +186,12 @@ function li_core_init(domain, startTimeStamp) result(err)
if (alarm_cursor % isRecurring) then
! Force a read of this stream, and use the latest before time available in the file
call mpas_stream_mgr_read(domain % streamManager, streamID = stream_cursor % name, rightNow = .true., &
whence = MPAS_STREAM_LATEST_BEFORE, actualWhen=actualWhen, ierr=err_tmp)
whence = MPAS_STREAM_LATEST_BEFORE, saveActualWhen=.true., ierr=err_tmp)
err = ior(err, err_tmp)
call mpas_get_time(stream_cursor%mostRecentAccessTime, dateTimeString=actualWhen, ierr=err_tmp)
err = ior(err, err_tmp)
call mpas_log_write(" * Forced an initial read of input stream '" // trim(stream_cursor%name) // &
"' from time: " // trim(actualWhen))
err = ior(err, err_tmp)
endif
exit ! skip the rest of this loop - we processed the alarm we were looking for
endif
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
type (mpas_pool_type), pointer :: field_pkg_pool => null()
type (mpas_pool_type), pointer :: pkg_pool => null()
type (MPAS_Time_type), pointer :: referenceTime => null()
type (MPAS_Time_type), pointer :: mostRecentAccessTime => null()

! Used by alarms
type (MPAS_stream_list_type), pointer :: streamList => null()
Expand Down
66 changes: 56 additions & 10 deletions components/mpas-framework/src/framework/mpas_stream_manager.F
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,7 @@ subroutine MPAS_stream_mgr_create_stream(manager, streamID, direction, filename,
integer, intent(out), optional :: ierr

type (MPAS_stream_list_type), pointer :: new_stream
type (MPAS_Time_type) :: mostRecentAccessTime
integer :: err_local, threadNum


Expand Down Expand Up @@ -380,6 +381,11 @@ subroutine MPAS_stream_mgr_create_stream(manager, streamID, direction, filename,
if (present(realPrecision)) then
new_stream % precision = realPrecision
end if
allocate(new_stream % mostRecentAccessTime)
call mpas_set_time(mostRecentAccessTime, dateTimeString = '9999-12-31_23:59:59')
new_stream % mostRecentAccessTime = mostRecentAccessTime
! Not sure what the best value to init to is. Need undefined value?

call MPAS_stream_list_create(new_stream % alarmList_in, ierr=err_local)
if (err_local /= MPAS_STREAM_LIST_NOERR) then
if (present(ierr)) ierr = MPAS_STREAM_MGR_ERROR
Expand Down Expand Up @@ -2798,6 +2804,7 @@ subroutine MPAS_stream_mgr_write(manager, streamID, timeLevel, mgLevel, forceWri
wroteStreams = .true.
stream_cursor % blockWrite = .false.
call write_stream(manager, stream_cursor, blockID, local_timeLevel, local_mgLevel, local_forceWrite, local_writeTime, local_ierr)
stream_cursor % mostRecentAccessTime = local_writeTime
end if
end do

Expand All @@ -2818,6 +2825,7 @@ subroutine MPAS_stream_mgr_write(manager, streamID, timeLevel, mgLevel, forceWri

stream_cursor % blockWrite = .false.
call write_stream(manager, stream_cursor, blockID, local_timeLevel, local_mgLevel, local_forceWrite, local_writeTime, temp_ierr)
stream_cursor % mostRecentAccessTime = local_writeTime
if (temp_ierr /= MPAS_STREAM_MGR_NOERR) then
local_ierr = temp_ierr
end if
Expand Down Expand Up @@ -2972,6 +2980,7 @@ subroutine MPAS_stream_mgr_block_write(manager, writeBlock, streamID, timeLevel,
stream_cursor % blockWrite = .true.
call write_stream(manager, stream_cursor, writeBlock % blockID, local_timeLevel, local_mgLevel, &
local_forceWrite, local_writeTime, local_ierr)
stream_cursor % mostRecentAccessTime = local_writeTime
stream_cursor % blockWrite = .false.
if ( associated(stream_cursor % stream) ) then
stream_cursor % valid = .false.
Expand Down Expand Up @@ -2999,6 +3008,7 @@ subroutine MPAS_stream_mgr_block_write(manager, writeBlock, streamID, timeLevel,
stream_cursor % blockWrite = .true.
call write_stream(manager, stream_cursor, writeBlock % blockID, local_timeLevel, local_mgLevel, &
local_forceWrite, local_writeTime, temp_ierr)
stream_cursor % mostRecentAccessTime = local_writeTime
stream_cursor % blockWrite = .false.
if ( associated(stream_cursor % stream) ) then
stream_cursor % valid = .false.
Expand Down Expand Up @@ -3417,12 +3427,20 @@ end subroutine write_stream !}}}
!> MPAS_STREAM_NEAREST, MPAS_STREAM_LATEST_BEFORE,
!> MPAS_STREAM_LATEST_STRICTLY_BEFORE, MPAS_STREAM_EARLIEST_AFTER, or
!> MPAS_STREAM_EARLIEST_STRICTLY_AFTER.
!> The optional output argument "actualWhen" returns the actual time read
!> from a stream in case an exact match for the "when" time is not found,
!> and a nearby time is selected using the "whence" argument.
!> The optional output argument "saveActualWhen" will save the actual time read
!> from a stream in case an exact match for the "when" time is not found,
!> and a nearby time is selected using the "whence" argument. This value
!> is stored in the streams "mostRecentAccessTime" attribute.
!> An error will occur if saveActualWhen==.true. for a stream that does not
!> include the xtime variable. That is why saveActualWhen is an optional
!> argument that defaults to .false. Thus, it is the responsibility of the
!> calling code to include saveActualWhen for calls to MPAS_stream_mgr_read
!> that require saving of the actualWhen time. Attempts to use i/o error
!> codes to ignore streams without xtime were found to be unreliable due to
!> pervasive occurrence of the generic MPAS_IO_ERR_PIO error.
!
!-----------------------------------------------------------------------
subroutine MPAS_stream_mgr_read(manager, streamID, timeLevel, mgLevel, rightNow, when, whence, actualWhen, ierr) !{{{
subroutine MPAS_stream_mgr_read(manager, streamID, timeLevel, mgLevel, rightNow, when, whence, saveActualWhen, ierr) !{{{

implicit none

Expand All @@ -3433,7 +3451,7 @@ subroutine MPAS_stream_mgr_read(manager, streamID, timeLevel, mgLevel, rightNow,
logical, intent(in), optional :: rightNow
character (len=*), intent(in), optional :: when
integer, intent(in), optional :: whence
character (len=*), intent(out), optional :: actualWhen
logical, intent(in), optional :: saveActualWhen
integer, intent(out), optional :: ierr

type (MPAS_stream_list_type), pointer :: stream_cursor
Expand All @@ -3445,6 +3463,9 @@ subroutine MPAS_stream_mgr_read(manager, streamID, timeLevel, mgLevel, rightNow,
integer :: local_ierr
integer :: temp_ierr
type (MPAS_Time_type) :: now_time
character (len=StrKIND) :: actualWhen_local
logical :: local_saveActualWhen
type (MPAS_Time_type) :: actualWhen_time
integer :: threadNum
logical :: readStreams

Expand All @@ -3453,7 +3474,7 @@ subroutine MPAS_stream_mgr_read(manager, streamID, timeLevel, mgLevel, rightNow,
STREAM_DEBUG_WRITE('-- Called MPAS_stream_mgr_read()')

if (present(ierr)) ierr = MPAS_STREAM_MGR_NOERR
if (present(actualWhen)) write(actualWhen,'(a)') '0000-01-01_00:00:00'
write(actualWhen_local,'(a)') 'UNKNOWN'

!
! Use optional arguments or set defaults
Expand Down Expand Up @@ -3489,6 +3510,11 @@ subroutine MPAS_stream_mgr_read(manager, streamID, timeLevel, mgLevel, rightNow,
local_whence = MPAS_STREAM_EXACT_TIME
end if

if (present(saveActualWhen)) then
local_saveActualWhen = saveActualWhen
else
local_saveActualWhen = .false.
end if

if ( threadNum == 0 ) then
!
Expand All @@ -3504,8 +3530,18 @@ subroutine MPAS_stream_mgr_read(manager, streamID, timeLevel, mgLevel, rightNow,
! Verify that the stream is an input stream
if (stream_cursor % direction == MPAS_STREAM_INPUT .or. stream_cursor % direction == MPAS_STREAM_INPUT_OUTPUT) then
readStreams = .true.
call read_stream(manager, stream_cursor, local_timeLevel, local_mgLevel, local_rightNow, local_when, &
local_whence, actualWhen, local_ierr)
if (local_saveActualWhen) then
call read_stream(manager, stream_cursor, local_timeLevel, local_mgLevel, local_rightNow, local_when, &
local_whence, actualWhen_local, local_ierr)
if (trim(actualWhen_local) /= 'UNKNOWN') then
! Only set mostRecentAccessTime if the stream was successfully read
call mpas_set_time(actualWhen_time, dateTimeString = actualWhen_local)
stream_cursor % mostRecentAccessTime = actualWhen_time
endif
else
call read_stream(manager, stream_cursor, local_timeLevel, local_mgLevel, local_rightNow, local_when, &
local_whence, ierr=local_ierr)
endif
end if
end do

Expand All @@ -3527,8 +3563,18 @@ subroutine MPAS_stream_mgr_read(manager, streamID, timeLevel, mgLevel, rightNow,
!
! What should be the meaning of actualWhen if we read multiple streams in this call?
!
call read_stream(manager, stream_cursor, local_timeLevel, local_mgLevel, local_rightNow, &
local_when, local_whence, actualWhen, temp_ierr)
if (local_saveActualWhen) then
call read_stream(manager, stream_cursor, local_timeLevel, local_mgLevel, local_rightNow, &
local_when, local_whence, actualWhen_local, temp_ierr)
if (trim(actualWhen_local) /= 'UNKNOWN') then
! Only set mostRecentAccessTime if the stream was successfully read
call mpas_set_time(actualWhen_time, dateTimeString = actualWhen_local)
stream_cursor % mostRecentAccessTime = actualWhen_time
endif
else
call read_stream(manager, stream_cursor, local_timeLevel, local_mgLevel, local_rightNow, &
local_when, local_whence, ierr=temp_ierr)
endif
if (temp_ierr /= MPAS_STREAM_MGR_NOERR) then
local_ierr = MPAS_STREAM_MGR_ERROR
end if
Expand Down