Watch our JuliaCon2023 talk on OndaBatches.jl! Slides (and source + demo)
Public API
Labeled signals
OndaBatches.LabeledSignalV2 — Type
@version LabeledSignalV2 > SignalV2 begin
    label_span::TimeSpan
    labels::Union{Samples,SignalV2}
end
Legolas.jl record type that represents one Onda signal with associated labels. Labels must be dense and contiguous, and are represented as Onda.Samples or an Onda.Signal that refers to Onda.Samples serialized as LPCM. label_span corresponds to the time span (relative to the recording) spanned by the labels.
Note that the signal span and labels' label_span are both relative to the start of the recording.
OndaBatches.sub_label_span — Function
sub_label_span(labeled_signal, new_label_span)
Select a sub-span of the labeled signal labeled_signal (with schema "labeled.signal@2"), returning a new labeled signal with updated labels and label_span.
The new_label_span should be relative to the start of the recording (like the signal's span and the current label_span).
OndaBatches.label_signals — Function
label_signals(signals, annotations;
              groups=:recording,
              labels_column,
              epoch,
              encoding,
              roundto=nothing)
Create a "labeled signals" table from a signals table and a table of annotations containing labels.
Annotations will be passed to labels_to_samples_table, as well as kwargs. labels_to_samples_table requires these keyword arguments:
- groups: the column to group over, defaults to :recording.
- labels_column: the column in the annotations table containing the labels.
- epoch: the sampling period of the labels.
- encoding::Dict: the label -> UInt8 mapping to use for encoding the labels.
- roundto: controls rounding of "shaggy spans", defaults to nothing for no rounding.
Annotations must be
- contiguous and non-overlapping (within groups)
- regularly sampled, with spans an even integer multiple of the epoch kwarg.
Returns a LabeledSignalV2 table (e.g., with schema "labeled.signal@2"), with labels in :labels and the signal spans occupied by these labels in :label_span. Like the signal :span, the :label_span is relative to the start of the recording, not necessarily to the start of the data represented by the signal.
If any label span is not entirely contained within the corresponding signal span, this will throw an ArgumentError.
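The requirements on annotations listed above can be illustrated with a small stand-alone check. This is only a sketch: it uses Dates and plain (start, stop) tuples instead of TimeSpan, and check_annotation_spans is a hypothetical helper, not a function provided by OndaBatches.

```julia
using Dates

# Nanosecond count of a fixed period, so different period types can be compared.
ns(p) = Dates.value(convert(Nanosecond, p))

# Hypothetical helper (not part of OndaBatches): validate annotation spans given
# as (start, stop) tuples of Dates periods relative to the start of the recording.
function check_annotation_spans(spans, signal_span; epoch)
    for (i, (start, stop)) in enumerate(spans)
        # each span must be a whole number of epochs long
        ns(stop - start) % ns(epoch) == 0 ||
            throw(ArgumentError("span $i is not a multiple of the epoch"))
        # contiguous and non-overlapping: each span starts where the previous one stops
        i == 1 || start == spans[i - 1][2] ||
            throw(ArgumentError("span $i is not contiguous with span $(i - 1)"))
    end
    # the labels must lie entirely within the signal span
    label_start, label_stop = first(spans)[1], last(spans)[2]
    (signal_span[1] <= label_start && label_stop <= signal_span[2]) ||
        throw(ArgumentError("label span is not contained in the signal span"))
    return (label_start, label_stop)
end

spans = [(Second(0), Second(30)), (Second(30), Second(90))]
label_span = check_annotation_spans(spans, (Second(0), Second(120)); epoch=Second(30))
```

Here the two annotations cover 0 to 90 seconds in 30-second epochs, inside a 120-second signal, so the check passes and returns the combined label span.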
OndaBatches.load_labeled_signal — Function
load_labeled_signal(labeled_signal, samples_eltype::Type=Float64)
Load signal data as Onda.Samples from a labeled segment of an Onda.SignalV2 (i.e., a LabeledSignalV2 or row with schema "labeled.signal@2"), and return the portion of the samples data corresponding to labeled_signal.label_span, along with the corresponding labels (as another Onda.Samples object).
If possible, this will only retrieve the bytes corresponding to labeled_signal.label_span.
The eltype of the returned Samples is samples_eltype, which defaults to Float64.
The handling of samples eltype differs from Onda.load, for which the eltype depends on the resolution/offset specified in the samples info: when they are 1/0 respectively, the underlying encoded data is always returned exactly as-is, even if the type differs from the requested eltype. This allows for some optimizations in such cases, but is a potential footgun when a particular eltype is actually required. We work around this inconsistency here by always allocating a new array with the requested eltype to hold the decoded samples.
Returns a samples, labels tuple.
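The eltype behavior described above can be illustrated with two toy decoders. This is a sketch under stated assumptions: decode_as_is and decode_with_eltype are hypothetical functions written for this example and are not the Onda.jl or OndaBatches.jl implementations; they only mimic the "return as-is when resolution/offset are 1/0" shortcut versus "always allocate with the requested eltype".

```julia
# Toy decoders (assumptions for illustration): decoded = encoded * resolution + offset

function decode_as_is(encoded, resolution, offset)
    # shortcut: with resolution 1 and offset 0 the encoded array is returned unchanged
    (resolution == 1 && offset == 0) && return encoded
    return encoded .* resolution .+ offset
end

function decode_with_eltype(encoded, resolution, offset, ::Type{T}=Float64) where {T}
    # always allocate a fresh array with the requested eltype
    decoded = similar(encoded, T)
    decoded .= encoded .* resolution .+ offset
    return decoded
end

encoded = Int16[1, 2, 3]
eltype(decode_as_is(encoded, 1, 0))        # Int16: the encoded data comes back as-is
eltype(decode_with_eltype(encoded, 1, 0))  # Float64: the requested eltype is honored
```

The second variant costs an allocation but gives callers a predictable element type, which is the trade-off the docstring describes.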
OndaBatches.store_labels — Function
store_labels(labeled_signals, root; format="lpcm")
Store labels to root, replacing the Onda.Samples in the labels column of labeled_signals with Onda.Signals.
store_labels(labeled_signal::LabeledSignalV2, root; format="lpcm")
Store a single set of labels to root, replacing the Onda.Samples in the labels field of labeled_signal with an Onda.SignalV2. A single updated LabeledSignalV2 row is returned.
The filepath of the stored labels' Signal is the basename of labeled_signal.file_path with "labels_" prepended.
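As a minimal sketch of the naming rule just described, using a made-up file path (the path is an assumption for illustration; how the name is combined with root is not shown here):

```julia
# Hypothetical signal file path; only the naming rule comes from the docstring.
file_path = "/data/recordings/rec-01.eeg.lpcm"

# basename drops the directory part; "labels_" is prepended to what remains.
labels_name = "labels_" * basename(file_path)
```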
Batch sampling
OndaBatches.BatchItemV2 — Type
@version BatchItemV2{T} > LabeledSignalV2 begin
    batch_channels::T
end
Legolas record type representing a single batch item. Fields are inherited from LabeledSignalV2 > SignalV2, and an additional batch_channels field gives a channel selector for this batch. A "channel selector" is anything that can be used as a channel index for Onda.Samples, or missing (in which case, all channels will be used in the order they occur in the Samples).
Columns include:
- columns from Onda.SignalV2 (everything required to Onda.load the segment)
- labels and label_span from LabeledSignalV2
- batch_channels
OndaBatches.RandomBatches — Type
RandomBatches
An iterator of pseudo-randomly sampled batches derived from a table of densely labeled signals (a labeled.signal@2 table). Batches consist of batch_size "batch items". A single batch item consists of batch_duration * label_sample_rate labels, and batch_duration * signal_sample_rate samples of multichannel data.
Batch items are sampled according to the following procedure:
- A single labeled signal is sampled (optionally with weights)
- A single label from that signal is sampled (optionally with weights)
- One or more channels are selected, optionally randomly.
Each batch item is sampled independently, and in particular different batch items in a given batch can have different channels included (although the same number of them, n_channels).
The functions iterate_batch_item and iterate_batch sample a single batch item and a full batch, respectively.
Fields
- labeled_signals::DataFrame: the table of labeled signals that batches are sampled from.
- signal_weights::AbstractWeights: weights for individual signals (unweighted by default). May be nothing during construction, in which case unit weights are created.
- label_weights::Vector{AbstractWeights}: weights for individual labels of each labeled signal (unweighted by default). May be nothing during construction, in which case unit weights will be created for each labeled signal.
- n_channels::Union{Nothing,Int}: the number of channels each batch item should have; this many channels are sampled without replacement, unless n_channels === nothing, in which case all channels are included.
- batch_size::Int: the number of items that make one complete batch.
- batch_duration::TimePeriod: the duration of the window for a single batch.
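The three-step sampling procedure can be sketched with the Random standard library alone. Everything below is an assumption for illustration: the signals are toy named tuples rather than rows of a labeled.signal@2 table, and weighted_index and toy_batch_item are hypothetical helpers, not the OndaBatches implementation (which uses AbstractWeights).

```julia
using Random

# Toy stand-ins: each "signal" carries its channel names and its number of labels.
signals = [(channels=["c3", "c4", "o1"], n_labels=10),
           (channels=["c3", "c4", "o1", "o2"], n_labels=20)]

# Draw an index with probability proportional to `weights` (inverse-CDF sampling).
function weighted_index(rng, weights)
    u = rand(rng) * sum(weights)
    acc = 0.0
    for (i, w) in enumerate(weights)
        acc += w
        u < acc && return i
    end
    return lastindex(weights)
end

function toy_batch_item(rng, signals; signal_weights, n_channels)
    # 1. sample a single labeled signal
    signal = signals[weighted_index(rng, signal_weights)]
    # 2. sample a single label from that signal (uniform weights in this sketch)
    label_index = rand(rng, 1:signal.n_labels)
    # 3. sample n_channels channels without replacement, or keep them all
    channels = n_channels === nothing ? signal.channels :
               shuffle(rng, signal.channels)[1:n_channels]
    return (; signal, label_index, channels)
end

rng = MersenneTwister(1)
batch = [toy_batch_item(rng, signals; signal_weights=[1.0, 1.0], n_channels=2) for _ in 1:4]
```

Each item is drawn independently, so items in the same batch may come from different signals and use different channels, while always having the same number of channels.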
OndaBatches.iterate_batch_item — Function
iterate_batch_item(batches::RandomBatches, rng)
Yields a single "batch item". See documentation for RandomBatches for the details on the sampling scheme.
Individual batch items are rows of a batch table with schema "batch-item@2", and are consumed by materialize_batch_item.
OndaBatches.iterate_batch — Function
iterate_batch(batches::Batches, rng)
Return a "batch listing" that can be materialized into model training/evaluation input.
A batch is a table that has one row per batch item, and follows the "batch-item@2" schema.
This is consumed by a materialize_batch function that can be run on a remote worker, so this sends just the minimum of information necessary to load the batch signal data, the stage labels, and the spans that say how they line up.
Batch materialization
OndaBatches.materialize_batch_item — Function
materialize_batch_item(batch_item, samples_eltype::Type=Float64)
Load the signal data for a single BatchItemV2, selecting only the channels specified in the batch_channels field (using all channels if the field is missing).
Returns a signal_data, label_data tuple, which is the contents of the data field of the signals and labels Samples objects returned by load_labeled_signal, after selecting the signals data by batch_channels.
The eltype of signal_data will be samples_eltype; the eltype of label_data is whatever is returned by get_labels.
OndaBatches.materialize_batch — Function
materialize_batch(batch, samples_eltype::Type=Float64)
Materialize an entire batch, which is a table of BatchItemV2 rows. Each row is materialized concurrently by materialize_batch_item, and the resulting signals and labels arrays are concatenated on dimension ndims(x) + 1 respectively.
Returns a signal_data, label_data tuple. The dimensionality of these arrays depends on the dimensionality of the results of materialize_batch_item, but will in general be ndims(x) + 1.
The eltype of signal_data will be samples_eltype; the eltype of label_data is whatever is returned by get_labels.
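Concatenating on dimension ndims(x) + 1 means stacking the per-item arrays along a new trailing "batch" dimension. A minimal sketch with toy arrays follows; the array shapes (channels by samples for signals, 1 by labels for labels) and the helper stack_items are assumptions made for this example.

```julia
# Toy batch items: each is a (signal_data, label_data) tuple.
items = [(rand(2, 8), fill(UInt8(i), 1, 4)) for i in 1:3]

# Concatenate along a new trailing dimension, i.e. ndims(x) + 1.
stack_items(arrays) = cat(arrays...; dims=ndims(first(arrays)) + 1)

signal_data = stack_items(first.(items))  # size (2, 8, 3)
label_data = stack_items(last.(items))    # size (1, 4, 3)
```

With matrices as inputs, the results are three-dimensional arrays whose last index selects the batch item.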
OndaBatches.get_channel_data — Function
get_channel_data(samples, channels)
Get the data associated with the specified channels. The default fallback simply calls samples[channels, :].data, but custom channel selectors can be used to implement more exotic featurization schemes (see tests for examples).
Batching service
OndaBatches.Batcher — Type
A struct that provides control of the batching process on one or more remote workers. This struct keeps track of
- manager::Int: the PID where start_batching will be called.
- workers: an AbstractWorkerPool for the worker process(es).
- channel::RemoteChannel: the channel that batches are loaded into.
- status::Future: the return value of the start_batching function as a Future; see get_status for a convenient accessor.
- batches: the iterator of batches that will be materialized; the only requirement is that iterate_batch be defined; see RandomBatches for an example.
- state::Any: batcher state (passed to iterate_batch), updated with each new batch that's yielded by the batcher.
- buffer::Int: the size of the batch buffer to keep locally (e.g., the capacity of channel).
Use start! to start the batching service, stop! to stop it, and get_status to check the status.
Once the batcher is started, the sequence of materialized batches (the output of materialize_batch) and corresponding batcher states can be retrieved by take!.
Architecture
A Batcher is meant to run in an architecture where remote workers are created with a Distributed.jl cluster manager. We use the following terminology to describe the roles these different processes play:
- "Batch worker": one or more processes that are used to actually load batches (via materialize_batch)
- "Batch manager": the process which coordinates the loading of batches, ensuring consistent iteration order, distributing work to the batch workers, and populating the output channel. start_batching runs on this process.
- "Client": the process which is consuming batches via take!(::Batcher, state) (which OndaBatches.jl is generally agnostic about and does not manage)
- "Manager": the process on which the Batcher is initially created, and which holds the reference for the worker pool (for multi-worker batching).
We try hard to make Batchers relocatable to other processes (e.g., serializing to the Client after initialization on the Manager). However, since a new RemoteChannel is created each time the batcher is started (including when the desired state does not match the Batcher's current state), some care needs to be taken if it matters where that channel is hosted (although this behavior may change in the future).
Also note that while a running (i.e. start!ed) Batcher can be relocated to another process, the status and channel fields are not guaranteed to stay in sync on the two copies.
OndaBatches.Batcher — Method
Batcher([manager::Int,] workers::Vector{Int}, batches; start=true, state=nothing, buffer=2 * length(workers) + 1)
Batcher(manager::Int, workers::AbstractWorkerPool, batches; start=true, state=nothing, buffer=2 * length(workers) + 1)
Construct a new Batcher, using worker IDs, batches, and initial state. The batcher's channel and status will be initialized.
The workers may be specified as an AbstractWorkerPool or a vector of PIDs (in which case a WorkerPool will be constructed).
If workers are supplied as an AbstractWorkerPool, it is assumed that all workers managed by the pool are available for loading batches. Whenever the batcher is stopped, the worker pool is reset, and all managed workers are returned to the channel of available workers.
See RandomBatches for an example of creation of batches.
The initial state is the state that is used by iterate_batch, e.g., the RNG used by RandomBatches.
If start=true, batching is start!ed. The state keyword argument must be supplied in this case to provide an initial state.
The buffer controls the capacity of the batch channel; a value greater than or equal to the number of workers is recommended so that batch loading workers do not block waiting for batches to be taken off the channel.
Base.take! — Method
Base.take!(batcher::Batcher, state)
Take one batch + state pair from the batcher, starting at the specified state. If the requested state does not match the batcher's current state, then the batching process will be restarted with the new state. If the batcher is not running (as indicated by get_status), it will be started with start!.
If an error has occurred on any of the batch loading workers, the next call to take! will immediately throw the wrapped RemoteException, even if there are still good batches on the channel.
Returns an (x, y), state tuple, where x is the batch signal data, y is the label data (see materialize_batch), and state is the next batch iterator state.
Runs on the Client.
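The explicit state threading means that asking for the same state again should reproduce the same batch. The sketch below illustrates that contract with an RNG as the state, as with RandomBatches. toy_iterate_batch is a hypothetical stand-in written for this example, not the OndaBatches implementation, and whether the real iterator copies the RNG is an assumption here.

```julia
using Random

# Toy stand-in: consume a copy of the RNG state and return the "batch"
# together with the next state, leaving the input state untouched.
function toy_iterate_batch(state::AbstractRNG)
    next_state = copy(state)
    batch = rand(next_state, 1:100, 3)
    return batch, next_state
end

state0 = MersenneTwister(42)
batch1, state1 = toy_iterate_batch(state0)
batch2, state2 = toy_iterate_batch(state1)

# Requesting the initial state again replays the first batch.
replayed, _ = toy_iterate_batch(state0)
replayed == batch1
```

A training loop built on this pattern can checkpoint the state alongside the model and resume from exactly the same sequence of batches.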
OndaBatches.start! — Function
start!(batcher::Batcher, state)
Start the remote process that loads batches into the batcher's channel. A new channel is created since the old one cannot always be re-used.
This invokes start_batching on batcher.manager with remotecall.
The (modified) batcher is returned.
If the batcher is already running (as indicated by get_status(batcher) == :running), a warning is raised and the batcher is returned.
Runs on the Client.
OndaBatches.stop! — Function
stop!(batcher::Batcher)
Close batcher.channel to stop the remote batching. This blocks on fetch(batcher.status) to wait for channel closure. If an error is thrown on the remote worker that is not caught, it will be rethrown here.
The batcher is returned.
Runs on the Client.
OndaBatches.get_status — Function
get_status(batcher::Batcher)
Check the status of a remote batcher.
Possible return values are
- :stopped: the batcher was created but not started
- :running: the batching loop is still running
- :closed: the batch channel was closed and the batch loop has terminated
- :done: the infinite loop in start_batching has terminated without error (not expected)
- a RemoteException that wraps an error thrown by start_batching on the batch manager (which may further wrap an exception thrown on a batch worker)
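Because the status is either one of four symbols or an exception object, client code typically needs to branch on both cases. A minimal sketch follows, where describe_status is a hypothetical helper (not part of OndaBatches) and the messages are illustrative only; a RemoteException is handled by the generic Exception branch.

```julia
# Hypothetical helper: map each documented status value to a short description.
function describe_status(status)
    status === :stopped && return "not started; call start!"
    status === :running && return "running; batches can be taken"
    status === :closed && return "channel closed; batching loop has terminated"
    status === :done && return "loop ended without error (not expected)"
    status isa Exception && return "failed: " * sprint(showerror, status)
    return "unknown status"
end

describe_status(:running)
describe_status(ErrorException("worker died"))
```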
Internal utilities
None of the following are meant to be called by users; they are not part of the API for semantic versioning purposes and can change at any time.
OndaBatches.labels_to_samples_table — Function
labels_to_samples_table(labels::AbstractDataFrame; labels_column,
                        groups=:recording, epoch, kwargs...)
Convert annotations table into a table of labels as Samples. This groups by groups (defaults to :recording), and then applies int_encode_labels to the labels_column and :span columns from each group, and converts the resulting UInt8 labels to Onda.Samples via labels_to_samples. The sampling rate for the resulting labels is 1 / epoch. The samples are returned in the :labels column.
Along with epoch, additional kwargs are forwarded to int_encode_labels:
- encoding::Dict: the label -> UInt8 mapping to use for encoding
- roundto: controls rounding of "shaggy spans" (defaults to nothing for no rounding)
The span corresponding to these labels is determined by floor_containing and returned in the :label_span column.
A DataFrame is returned with the :labels and :label_span per group, as well as the groups variables.
OndaBatches.labels_to_samples — Function
labels_to_samples(labels::AbstractVector{UInt8}; epoch)
labels_to_samples(; epoch)
Convert a vector of UInt8 stage labels sampled evenly at intervals of epoch into Onda.Samples with a sample rate of 1/epoch.
The kwarg-only form returns a closure that captures the epoch.
The returned samples have samples info:
SamplesInfoV2(; sensor_type="label",
              channels=["label"],
              sample_unit="label",
              sample_resolution_in_unit=1,
              sample_offset_in_unit=0,
              sample_type=UInt8,
              sample_rate=Second(1) / epoch)
OndaBatches.get_labels — Function
get_labels(labels::Samples, span)
get_labels(labels::SignalV2, span)
Return labels as Samples, deserializing with Onda.load if necessary. span is the span relative to the start of the recording that should be loaded.
This function is meant for internal use only; users should instead use load_labeled_signal and sub_label_span.
OndaBatches.int_encode_labels — Function
int_encode_labels(stages, spans; epoch, encoding::Dict,
roundto=nothing)
int_encode_labels(; epoch, encoding, roundto)
Return a Vector{UInt8} of stage labels, using encoding to look up each stage label in stages, sampled evenly at intervals of epoch. spans are expanded into non-overlapping, contiguous spans of duration epoch; spans must be contiguous and have durations evenly divisible by epoch, except for the final span, which will be truncated. The durations of spans will be rounded to the nearest roundto (can be a TimePeriod subtype or instance, such as Millisecond(100), or nothing) before division into epochs to accommodate minor errors in stage label durations; if roundto=nothing (the default), no rounding will be performed.
The Vector{UInt8} of labels that is returned will have length floor(duration(shortest_timespan_containing(spans)), epoch).
The encoding is used to map the values in stages to UInt8s, and should be provided in the form of a Dict{eltype(stages), UInt8}.
int_encode_labels(; epoch, encoding, roundto) will return a closure which captures the configuration options.
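The expansion of spans into per-epoch labels can be sketched in plain Julia. This is an illustrative sketch under stated assumptions, not the OndaBatches source: spans are modelled as `(start, stop)` tuples of Dates periods rather than `TimeSpan`s, rounding via `roundto` is not modelled, and `encode_epochs` is a hypothetical name.

```julia
using Dates

# Hypothetical stand-in for `int_encode_labels`. Spans are `(start, stop)`
# tuples, assumed sorted and contiguous. `roundto` is not modelled.
function encode_epochs(stages, spans; epoch, encoding::Dict)
    to_ns(p) = Dates.value(convert(Nanosecond, p))
    epoch_ns = to_ns(epoch)
    first_start = to_ns(first(spans)[1])
    last_stop = to_ns(last(spans)[2])
    # Number of whole epochs in the containing span; a trailing partial epoch is dropped.
    n_epochs = fld(last_stop - first_start, epoch_ns)
    labels = Vector{UInt8}(undef, n_epochs)
    for (stage, (span_start, span_stop)) in zip(stages, spans)
        code = encoding[stage]
        first_epoch = fld(to_ns(span_start) - first_start, epoch_ns) + 1
        last_epoch = min(fld(to_ns(span_stop) - first_start, epoch_ns), n_epochs)
        # An empty range here means the span was truncated away entirely.
        labels[first_epoch:last_epoch] .= code
    end
    return labels
end

encoding = Dict("wake" => 0x00, "nrem" => 0x01)
stages = ["wake", "nrem", "wake"]
spans = [(Second(0), Second(60)), (Second(60), Second(90)), (Second(90), Second(100))]
labels = encode_epochs(stages, spans; epoch=Second(30), encoding)
```

Here the final 10-second span is shorter than one epoch and is truncated, so the 100 seconds of annotations yield three 30-second labels.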
OndaBatches.floor_containing
— Functionfloor_containing(spans; epoch)
floor_containing(; epoch)
Compute the shortest timespan containing contiguous spans, rounded down to the nearest multiple of epoch.
Note that this function will not check whether spans are contiguous.
The kwarg-only method returns a closure which captures the epoch.
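A minimal sketch of the rounding-down behaviour, assuming spans are represented as `(start, stop)` tuples of Dates periods instead of `TimeSpan`s. `floor_containing_sketch` is a hypothetical name and does not reproduce the package's implementation.

```julia
using Dates

# Hypothetical stand-in for `floor_containing`; spans are `(start, stop)` tuples.
function floor_containing_sketch(spans; epoch)
    to_ns(p) = Dates.value(convert(Nanosecond, p))
    start_ns = minimum(s -> to_ns(s[1]), spans)
    stop_ns = maximum(s -> to_ns(s[2]), spans)
    epoch_ns = to_ns(epoch)
    # Keep only as many whole epochs as fit into the containing span.
    floored_ns = fld(stop_ns - start_ns, epoch_ns) * epoch_ns
    return (Nanosecond(start_ns), Nanosecond(start_ns + floored_ns))
end

spans = [(Second(0), Second(60)), (Second(60), Second(100))]
containing = floor_containing_sketch(spans; epoch=Second(30))
```

The spans cover 100 seconds, which holds three whole 30-second epochs, so the result stops at 90 seconds.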
OndaBatches.is_epoch_divisible
— Functionis_epoch_divisible(span::TimeSpan, epoch; roundto=nothing)
Tests whether span is evenly divisible into contiguous sub-spans of length epoch, after optionally rounding to roundto (by default, no rounding is performed).
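The effect of rounding on the divisibility test can be sketched as follows. This is an assumption-laden illustration: `divisible_sketch` is a hypothetical name, it takes a duration rather than a `TimeSpan`, it accepts only a period instance (or `nothing`) for `roundto`, and its tie-breaking rule may differ from the package's.

```julia
using Dates

# Hypothetical stand-in for `is_epoch_divisible`: takes a duration rather than
# a TimeSpan, and `roundto` must be a period instance (or `nothing`).
function divisible_sketch(duration, epoch; roundto=nothing)
    to_ns(p) = Dates.value(convert(Nanosecond, p))
    duration_ns = to_ns(duration)
    if roundto !== nothing
        unit_ns = to_ns(roundto)
        # Round to the nearest multiple of `roundto` (ties round up).
        duration_ns = fld(2 * duration_ns + unit_ns, 2 * unit_ns) * unit_ns
    end
    return duration_ns % to_ns(epoch) == 0
end

unrounded = divisible_sketch(Millisecond(29_960), Second(30))
rounded = divisible_sketch(Millisecond(29_960), Second(30); roundto=Millisecond(100))
```

A span of 29.96 seconds is not divisible into 30-second epochs as-is, but it is after rounding to the nearest 100 milliseconds.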
OndaBatches.check_epoch_divisible
— Functioncheck_epoch_divisible(spans, epoch; roundto=Second)
Throw an ArgumentError if any of spans are not evenly divisible into contiguous sub-spans of length epoch, according to is_epoch_divisible.
OndaBatches.all_contiguous
— Functionall_contiguous(spans)
Returns true if all spans are contiguous. Assumes spans are sorted by start time.
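One way to express the contiguity check is to compare each span's stop with the next span's start. The sketch below assumes `(start, stop)` tuples sorted by start time; `all_contiguous_sketch` is a hypothetical name, not the package function.

```julia
using Dates

# Hypothetical stand-in for `all_contiguous`; spans are `(start, stop)` tuples
# already sorted by start time.
function all_contiguous_sketch(spans)
    return all(spans[i][2] == spans[i + 1][1] for i in 1:(length(spans) - 1))
end

contiguous = all_contiguous_sketch([(Second(0), Second(30)), (Second(30), Second(90))])
gapped = all_contiguous_sketch([(Second(0), Second(30)), (Second(45), Second(90))])
```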
OndaBatches.sample_label_span
— Functionsample_label_span(rng, labels, label_span, labels_weight, batch_duration)
Return a TimeSpan sampled from labels. First, an epoch is sampled according to labels_weight. Next, the position of this epoch in a window of batch_duration is sampled with uniform probability, with the constraint that the window must lie completely within labels.
The returned TimeSpan will have duration equal to batch_duration and will be relative to the start of the recording. The earliest possible return span starts at start(label_span), and the latest possible span stops at stop(label_span).
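The two-stage sampling can be sketched with epoch indices instead of time spans. This is a simplified illustration, not the package's algorithm: `sample_window` is a hypothetical name, weights are given per epoch, the window length is a whole number of epochs no greater than `n_epochs`, and window positions are restricted to epoch boundaries.

```julia
using Random

# Hypothetical sketch: everything is measured in whole epochs. To map back to
# time, the window would start at start(label_span) + (first(window) - 1) * epoch.
function sample_window(rng, n_epochs, weights, window_epochs)
    # Stage 1: sample an epoch index with probability proportional to its weight.
    threshold = rand(rng) * sum(weights)
    epoch = something(findfirst(>(threshold), cumsum(weights)), n_epochs)
    # Stage 2: sample the window start uniformly among positions that contain
    # `epoch` and keep the window inside 1:n_epochs.
    earliest = max(1, epoch - window_epochs + 1)
    latest = min(epoch, n_epochs - window_epochs + 1)
    window_start = rand(rng, earliest:latest)
    return window_start:(window_start + window_epochs - 1)
end

rng = MersenneTwister(42)
weights = zeros(10)
weights[7] = 1.0  # only epoch 7 can be selected in stage 1
window = sample_window(rng, 10, weights, 4)
```

Because all weight sits on epoch 7, every sampled window contains epoch 7 and stays within the ten available epochs.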
OndaBatches.start_batching
— Functionstart_batching(channel::RemoteChannel, batches, state)
Begin loading batches onto a RemoteChannel based on batches (e.g., RandomBatches) and initial state.
This will run an infinite loop which loads one batch at a time with iterate_batch and materialize_batch, and put!s the resulting (x, y) and state values into the channel.
Batching continues until the channel is closed or an error is encountered. When the channel is closed, the InvalidStateException is caught and :closed is returned from the function. Other errors are rethrown. If somehow the loop is exited without an error (honestly not sure how this would happen), :done is returned.
This function is intended to be used with @async or remotecall (e.g., in a Batcher); the Future that remotecall returns can be monitored with get_status.
Calls to materialize_batch are wrapped in Base.retry to add some measure of resiliency to transient network interruptions.
Runs on the batching manager (i.e., Batcher.manager), but only when Batcher.workers is empty.
start_batching(channel::RemoteChannel, batches, state, workers)
Start batching loop, utilizing multiple workers to load batches in parallel. This method will yield batches in the same order that start_batching without workers will, using _feed_jobs! to feed batch materialization jobs to an internal channel (maintaining iteration order while distributing work across workers).
Runs on the batching manager (i.e., Batcher.manager).
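The order-preserving pattern can be illustrated with local Tasks standing in for remote workers and Futures. This is a sketch of the general technique, not the OndaBatches implementation; `ordered_parallel` is a hypothetical name. Handles to in-flight jobs are put on a channel in iteration order, and the consumer fetches them in that same order, so results come out in iteration order even when later jobs finish first.

```julia
# Hypothetical sketch using local Tasks in place of Distributed workers.
function ordered_parallel(f, inputs)
    jobs = Channel{Task}(length(inputs))
    # Feeder: start each job and enqueue its handle in iteration order.
    feeder = @async begin
        for x in inputs
            put!(jobs, @async f(x))
        end
        close(jobs)
    end
    # Consumer: fetch results in the order the jobs were enqueued.
    results = [fetch(job) for job in jobs]
    wait(feeder)
    return results
end

# Earlier inputs sleep longer, so they finish last but are still returned first.
results = ordered_parallel([3, 2, 1]) do x
    sleep(0.01 * x)
    return x^2
end
```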
OndaBatches._feed_jobs!
— Function_feed_jobs!(jobs::Channel, batches, state, workers)
Function that iterates batches starting from state, creating a BatchJob to materialize each one using the pool of workers. Each job is put onto the jobs channel in the order it was iterated, and is a struct with fields
worker: PID of the worker loading this batch
batch_future: a Future containing the output of materialize_batch
state: the iteration state after iterating this batch
prev_state: the iteration state before iterating this batch (i.e., the input to iterate_batch(batches, state) required to reproduce this batch)
When batch iteration is complete (as indicated by iterate_batch returning nothing), a final placeholder job will be placed on the jobs channel, with values of nothing everywhere except for prev_state, which is required to support synchronization on the client end (i.e., to confirm that the user really did ask for the final batch with take!).
Returns nothing.
Runs on the batching manager (i.e., Batcher.manager), in an async Task created in start_batching.
OndaBatches.reset!
— Functionreset!(pool::AbstractWorkerPool)
Restore worker pool to something like the state it would be in after construction, with the channel populated with one instance of each worker managed by the pool.
This has two phases: first, the contents of the channel are cleared out to avoid double-adding workers to the channel. Second, the contents of pool.workers are sorted and checked against the list of active processes with procs(), and then live PIDs are put! into the pool one by one. Dead workers are removed from the set of workers held by the pool.
For a WorkerPool, this operation is forwarded to the process holding the original pool (as with put!, take!, etc.), so it is safe to call on serialized copies of the pool.
nothing is returned.
OndaBatches.with_channel
— Functionwith_channel(f, channel; closed_msg="channel close, stopping")
Run f(channel), handling channel closure gracefully and closing the channel if an error is caught.
If the channel is closed, the InvalidStateException is caught, the closed_msg is logged as @info, and :closed is returned.
If any other error occurs, the channel is closed before rethrowing (with a @debug log message reporting the error + stacktrace).
Otherwise, the return value is f(channel).
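The three outcomes described above can be reproduced with a small wrapper around try/catch. This is a hypothetical re-implementation for illustration, named `with_channel_sketch`, and may differ from the package source in its details (for example, in how it decides that an InvalidStateException came from the channel).

```julia
# Hypothetical re-implementation for illustration; not the OndaBatches source.
function with_channel_sketch(f, channel; closed_msg="channel close, stopping")
    try
        return f(channel)
    catch e
        if e isa InvalidStateException
            # The channel was closed underneath us: report and stop quietly.
            @info closed_msg
            return :closed
        end
        @debug "error while using channel" exception = (e, catch_backtrace())
        close(channel)
        rethrow()
    end
end

# Case 1: the channel is already closed, so put! throws InvalidStateException.
closed_channel = Channel{Int}(1)
close(closed_channel)
status = with_channel_sketch(ch -> put!(ch, 1), closed_channel)

# Case 2: no error, so the return value of f(channel) is passed through.
ok = with_channel_sketch(ch -> :finished, Channel{Int}(1))

# Case 3: any other error closes the channel and is rethrown.
errored_channel = Channel{Int}(1)
rethrown = false
try
    with_channel_sketch(ch -> error("boom"), errored_channel)
catch
    global rethrown = true
end
```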