Add option to sync NetCDF files
Sometimes, NetCDF files need syncing (force-writing to disk). This seems
to be particularly the case for GPU runs.

This PR adds an option to provide the `NetCDFWriter` with a schedule to
call `NCDatasets.sync` based on arbitrary conditions.

The default behavior depends on the context: on CPUs, we let `NetCDF`
manage its buffered writes; on GPUs, we call `sync` at the end of every time
step for those datasets that need to be synced (i.e., those that were
written to since the last sync).

In general, we keep track of which files have been recently
touched (adding them to a list internal to the `writer`), and whenever
`sync` is called, `NCDatasets.sync` is called on those files. This
ensures that only files that need to be synced are synced and that
calling `sync` twice in a row results in a no-op for the second `sync`.
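
As a rough sketch of that bookkeeping (not the actual implementation; `unsynced`, `record_write!`, and `sync_all!` are hypothetical names):

```julia
import NCDatasets

# Datasets written to since the last sync.
unsynced = Set{NCDatasets.NCDataset}()

# Record a dataset right after writing to it.
record_write!(nc::NCDatasets.NCDataset) = push!(unsynced, nc)

# Flush every recently written dataset, then clear the set so that an
# immediately repeated call is a no-op.
function sync_all!()
    foreach(NCDatasets.sync, unsynced)
    empty!(unsynced)
    return nothing
end
```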
Sbozzolo committed May 13, 2024
1 parent ebe2ee1 commit 88cad2a
Showing 8 changed files with 117 additions and 39 deletions.
15 changes: 15 additions & 0 deletions docs/src/writers.md
@@ -41,13 +41,28 @@ dimension is any dataset.

Do not forget to close your writers to avoid file corruption!

To help reduce data loss, `NetCDFWriter` can force __syncing__, i.e., flushing
the values to disk. Usually, NetCDF buffers writes to disk (because they are
expensive), meaning that values are not immediately written but are saved to
disk in batches. This can result in data loss, and it is often useful to force
NetCDF to write to disk (this is especially the case when working with GPUs).
To do so, you can pass a `sync_schedule` function to the constructor of
`NetCDFWriter`. When not `nothing`, `sync_schedule` is a callable that takes
one argument (the `integrator`) and returns a bool. When the bool is true, the
files that were modified since the last `sync` will be `sync`ed. For example,
to force a sync every 1000 steps, you can pass the
`ClimaDiagnostics.Schedules.DivisorSchedule(1000)` schedule. By default, on
GPUs, we call `sync` at the end of every time step for those files that need to
be synced.
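
For instance, a minimal usage sketch (assuming a `space` and an `output_dir` are already available; the schedule choice is just an example):

```julia
import ClimaDiagnostics.Writers: NetCDFWriter
import ClimaDiagnostics.Schedules: DivisorSchedule

# Force NCDatasets.sync every 1000 steps (on GPUs, the default is every step).
writer = NetCDFWriter(space, output_dir; sync_schedule = DivisorSchedule(1000))
```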

Variables are saved as datasets with attributes, where the attributes include
`long_name`, `standard_name`, `units`...

```@docs
ClimaDiagnostics.Writers.NetCDFWriter
ClimaDiagnostics.Writers.interpolate_field!
ClimaDiagnostics.Writers.write_field!
ClimaDiagnostics.Writers.sync
Base.close
```

23 changes: 22 additions & 1 deletion src/Writers.jl
@@ -7,16 +7,37 @@ Currently, it implements:
- `HDF5Writer`, to save raw `ClimaCore` `Fields` to HDF5 files
- `NetCDFWriter`, to save remapped `ClimaCore` `Fields` to NetCDF files
Writers are expected to implement:
Writers can implement:
- `interpolate_field!`
- `write_field!`
- `Base.close`
- `sync`
"""
module Writers

import ..AbstractWriter, ..ScheduledDiagnostic
import ..ScheduledDiagnostics: output_short_name, output_long_name

function write_field!(writer::AbstractWriter, field, diagnostic, u, p, t)
# Nothing to be done here :)
return nothing
end

function Base.close(writer::AbstractWriter)
# Nothing to be done here :)
return nothing
end

function interpolate_field!(writer::AbstractWriter, field, diagnostic, u, p, t)
# Nothing to be done here :)
return nothing
end

function sync(writer::AbstractWriter)
# Nothing to be done here :)
return nothing
end

include("dummy_writer.jl")
include("dict_writer.jl")
include("hdf5_writer.jl")
22 changes: 20 additions & 2 deletions src/clima_diagnostics.jl
Expand Up @@ -2,7 +2,7 @@ import Accessors
import SciMLBase

import .Schedules: DivisorSchedule, EveryDtSchedule
import .Writers: interpolate_field!, write_field!, AbstractWriter
import .Writers: interpolate_field!, write_field!, sync, AbstractWriter

# We define all the known identities in reduction_identities.jl
include("reduction_identities.jl")
@@ -113,6 +113,15 @@ function DiagnosticsHandler(scheduled_diagnostics, Y, p, t; dt = nothing)
)
end

# Does the writer associated with `diag` need to be synced?
# It does so only when it has a `sync_schedule` that is a callable and that
# callable returns true when called on the integrator
function _needs_sync(diag, integrator)
hasproperty(diag.output_writer, :sync_schedule) || return false
isnothing(diag.output_writer.sync_schedule) && return false
return diag.output_writer.sync_schedule(integrator)
end

"""
orchestrate_diagnostics(integrator, diagnostic_handler::DiagnosticsHandler)
@@ -126,9 +135,12 @@ function orchestrate_diagnostics(
scheduled_diagnostics = diagnostic_handler.scheduled_diagnostics
active_compute = Bool[]
active_output = Bool[]
active_sync = Bool[]

for diag in scheduled_diagnostics
push!(active_compute, diag.compute_schedule_func(integrator))
push!(active_output, diag.output_schedule_func(integrator))
push!(active_sync, _needs_sync(diag, integrator))
end

# Compute
@@ -202,9 +214,15 @@

# Post-output clean-up
for diag_index in 1:length(scheduled_diagnostics)
active_output[diag_index] || continue
diag = scheduled_diagnostics[diag_index]

# First, maybe call sync for the writer. This might happen regardless of
# whether the diagnostic was active or not (because diagnostics
# typically share writers)
active_sync[diag_index] && sync(diag.output_writer)

active_output[diag_index] || continue

# Reset accumulator
isa_time_reduction = !isnothing(diag.reduction_time_func)
if isa_time_reduction
10 changes: 0 additions & 10 deletions src/dict_writer.jl
@@ -56,13 +56,3 @@ end
function Base.getindex(writer::DictWriter, key)
return Base.getindex(writer.dict, key)
end

function Base.close(writer::DictWriter)
# Nothing to be done here :)
return nothing
end

function interpolate_field!(writer::DictWriter, field, diagnostic, u, p, t)
# Nothing to be done here :)
return nothing
end
15 changes: 1 addition & 14 deletions src/dummy_writer.jl
@@ -3,17 +3,4 @@ scheduled diagnostic. Mostly useful for testing and debugging ClimaDiagnostics.
"""
struct DummyWriter <: AbstractWriter end

function write_field!(writer::DummyWriter, field, diagnostic, u, p, t)
# Nothing to be done here :)
# return nothing
end

function Base.close(writer::DummyWriter)
# Nothing to be done here :)
return nothing
end

function interpolate_field!(writer::DummyWriter, field, diagnostic, u, p, t)
# Nothing to be done here :)
return nothing
end
# All the methods are the default methods for AbstractWriter
9 changes: 3 additions & 6 deletions src/hdf5_writer.jl
@@ -17,7 +17,9 @@ end
Close all the files open in `writer`. (Currently no-op.)
"""
Base.close(writer::HDF5Writer) = nothing
function Base.close(writer::HDF5Writer)
return nothing
end

"""
HDF5Writer(output_dir)
@@ -56,8 +58,3 @@ function write_field!(writer::HDF5Writer, field, diagnostic, u, p, t)
Base.close(hdfwriter)
return nothing
end

function interpolate_field!(writer::HDF5Writer, field, diagnostic, u, p, t)
# Nothing to be done here :)
return nothing
end
48 changes: 44 additions & 4 deletions src/netcdf_writer.jl
Expand Up @@ -2,6 +2,7 @@ import Dates

import ClimaCore: Domains, Geometry, Grids, Fields, Meshes, Spaces
import ClimaCore.Remapping: Remapper, interpolate, interpolate!
import ..Schedules: EveryStepSchedule

import NCDatasets

@@ -15,7 +16,7 @@ include("netcdf_writer_coordinates.jl")
A struct to remap `ClimaCore` `Fields` to rectangular grids and save the output to NetCDF
files.
"""
struct NetCDFWriter{T, TS, DI} <: AbstractWriter
struct NetCDFWriter{T, TS, DI, SYNC} <: AbstractWriter
"""The base folder where to save the files."""
output_dir::String

@@ -52,6 +53,17 @@ struct NetCDFWriter{T, TS, DI} <: AbstractWriter
"""Areas of memory preallocated where the interpolation output is saved. Only the root
process uses this."""
preallocated_output_arrays::DI

"""Callable that determines when to call NetCDF.sync. NetCDF.sync is needed to flush the
output to disk. Usually, the NetCDF is able to determine when to write the output to
disk, but this sometimes fails (e.g., on GPUs). In that case, it is convenient to force
NetCDF to write to disk. When `sync_schedule = nothing`, it is up to NetCDF to manage
when to write to disk. Alternatively, pass a `schedule`, a function that takes the
integrator as input and returns a boolean"""
sync_schedule::SYNC

"""Set of datasets that need to be synced. Useful when `sync_schedule` is not `nothing`."""
unsynced_datasets::Set{NCDatasets.NCDataset}
end

"""
Expand All @@ -65,7 +77,7 @@ function Base.close(writer::NetCDFWriter)
end

"""
NetCDFWriter(cspace, output_dir)
NetCDFWriter(space, output_dir)
Save a `ScheduledDiagnostic` to a NetCDF file inside the `output_dir` of the simulation by
performing a pointwise (non-conservative) remapping first.
@@ -82,15 +94,22 @@ Keyword arguments
at on levels. When disable_vertical_interpolation is true,
the num_points on the vertical direction is ignored.
- `compression_level`: How much to compress the output NetCDF file (0 is no compression, 9
is maximum compression).
is maximum compression).
- `sync_schedule`: Schedule that determines when to call `NCDatasets.sync` (to flush the output
to disk). When `NCDatasets.sync` is called, you can guarantee that the bits
are written to disk (instead of being buffered in memory). A schedule is
a boolean callable that takes the `integrator` as its single argument.
`sync_schedule` can also be set to `nothing`, in which case we let NetCDF
handle buffered writes to disk.
"""
function NetCDFWriter(
space,
output_dir;
num_points = (180, 90, 50),
disable_vertical_interpolation = false,
compression_level = 9,
sync_schedule = ClimaComms.device(space) isa ClimaComms.CUDADevice ?
EveryStepSchedule() : nothing,
)
horizontal_space = Spaces.horizontal_space(space)
is_horizontal_space = horizontal_space == space
@@ -145,10 +164,13 @@ function NetCDFWriter(
ClimaComms.iamroot(comms_ctx) ?
Dict{String, ClimaComms.array_type(space)}() : Dict{String, Nothing}()

unsynced_datasets = Set{NCDatasets.NCDataset}()

return NetCDFWriter{
typeof(num_points),
typeof(interpolated_physical_z),
typeof(preallocated_arrays),
typeof(sync_schedule),
}(
output_dir,
Dict{String, Remapper}(),
Expand All @@ -158,6 +180,8 @@ function NetCDFWriter(
Dict{String, NCDatasets.NCDataset}(),
disable_vertical_interpolation,
preallocated_arrays,
sync_schedule,
unsynced_datasets,
)
end

@@ -340,6 +364,22 @@ function write_field!(writer::NetCDFWriter, field, diagnostic, u, p, t)
elseif length(dim_names) == 1
v[time_index, :] = interpolated_field
end

# Add file to list of files that might need manual sync
push!(writer.unsynced_datasets, nc)

return nothing
end

"""
sync(writer::NetCDFWriter)
Call `NCDatasets.sync` on all the files in the `writer.unsynced_datasets` list.
`NCDatasets.sync` ensures that the values are written to file.
"""
function sync(writer::NetCDFWriter)
foreach(NCDatasets.sync, writer.unsynced_datasets)
empty!(writer.unsynced_datasets)
return nothing
end

14 changes: 12 additions & 2 deletions test/writers.jl
@@ -36,8 +36,12 @@ end
# Number of interpolation points
NUM = 100

writer =
Writers.NetCDFWriter(space, output_dir; num_points = (NUM, NUM, NUM))
writer = Writers.NetCDFWriter(
space,
output_dir;
num_points = (NUM, NUM, NUM),
sync_schedule = ClimaDiagnostics.Schedules.DivisorSchedule(2),
)

writer_no_vert_interpolation = Writers.NetCDFWriter(
space,
@@ -74,6 +78,12 @@ end
Writers.interpolate_field!(writer, field, diagnostic, u, p, t)
Writers.write_field!(writer, field, diagnostic, u, p, t)

@test writer.unsynced_datasets ==
Set((writer.open_files[joinpath(output_dir, "my_short_name.nc")],))

Writers.sync(writer)
@test writer.unsynced_datasets == Set{NCDatasets.NCDataset}()

NCDatasets.NCDataset(joinpath(output_dir, "my_short_name.nc")) do nc
@test nc["ABC"].attrib["short_name"] == "ABC"
@test nc["ABC"].attrib["long_name"] == "My Long Name"
