From a488eeede47518781f284c5ecb2a257fb2e6cf1a Mon Sep 17 00:00:00 2001 From: "Documenter.jl" Date: Mon, 29 Jan 2024 21:24:08 +0000 Subject: [PATCH] build based on d1117d8 --- previews/PR39/index.html | 18 +++++++++--------- previews/PR39/search/index.html | 2 +- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/previews/PR39/index.html b/previews/PR39/index.html index e0b0cfb..d61bbb2 100644 --- a/previews/PR39/index.html +++ b/previews/PR39/index.html @@ -2,24 +2,24 @@ Documentation · OndaBatches

Watch our JuliaCon2023 talk on OndaBatches.jl! Slides (and source + demo)

Public API

Labeled signals

OndaBatches.LabeledSignalV2Type
@version LabeledSignalV2 > SignalV2 begin
     label_span::TimeSpan
     labels::Union{Samples,SignalV2}
-end

Legolas.jl record type that represents one Onda signal with associated labels. Labels must be dense and contiguous, and are represented as Onda.Samples or an Onda.Signal that refers to Onda.Samples serialized as LPCM. label_span corresponds to the time span (relative to the recording) spanned by the labels.

Note that the signal span and labels' label_span are both relative to the start of the recording.

source
OndaBatches.sub_label_spanFunction
sub_label_span(labeled_signal, new_label_span)

Select a sub-span of labeled signals labeled_signal (with schema "labeled.signal@2"), returning a new labeled signal with updated labels and label_span.

The new_label_span should be relative to the start of the recording (like the signal's span and the current label_span).

source
OndaBatches.label_signalsFunction
label_signals(signals, annotations;
+end

Legolas.jl record type that represents one Onda signal with associated labels. Labels must be dense and contiguous, and are represented as Onda.Samples or an Onda.Signal that refers to Onda.Samples serialized as LPCM. label_span corresponds to the time span (relative to the recording) spanned by the labels.

Note that the signal span and labels' label_span are both relative to the start of the recording.

source
OndaBatches.sub_label_spanFunction
sub_label_span(labeled_signal, new_label_span)

Select a sub-span of labeled signals labeled_signal (with schema "labeled.signal@2"), returning a new labeled signal with updated labels and label_span.

The new_label_span should be relative to the start of the recording (like the signal's span and the current label_span).

source
OndaBatches.label_signalsFunction
label_signals(signals, annotations;
                 groups=:recording,
                 labels_column,
                 epoch,
                 encoding,
-                roundto=nothing)

Create a "labeled signals" table from a signals table and a table of annotations containing labels.

Annotations will be passed to labels_to_samples_table, as well as kwargs. labels_to_samples_table requires these keyword arguments:

  • groups: the column to group over, defaults to :recording.
  • labels_column: the column in the annotations table containing the labels.
  • epoch: the sampling period of the labels.
  • encoding::Dict: the label -> UInt8 mapping to use for encoding the labels.
  • roundto: controls rounding of "shaggy spans", defaults to nothing for no rounding.

Annotations must be

  • contiguous and non-overlapping (withing groups)
  • regularly sampled, with spans an even integer multiple of the epoch kwarg.

Returns a LabeledSignalV2 table (e.g., with schema "labeled.signal@2"), with labels in :labels and the signal spans occupied by these labels in :label_span. Like the signal :span, the :label_span is relative to the start of the recording, not necessarily to the start of the data represented by the signal.

If any label span is not entirely contained within the corresponding signal span, this will throw an ArgumentError.

source
OndaBatches.load_labeled_signalFunction
function load_labeled_signal(labeled_signal, samples_eltype::Type=Float64)

Load signal data as Onda.Samples from a labeled segment of an Onda.SignalV2 (i.e., a LabeledSignalV2 or row with schema "labeled.signal@2"), and returns the portion of the samples data corresponding to labeled_signal.label_span, along with the corresponding labels (as another Onda.Samples object).

If possible, this will only retrieve the bytes corresponding to labeled_signal.label_span.

The eltype of the returned Samples is samples_eltype, which defaults to Float64.

Note

The handling of samples eltype is different than Onda.load, for which the eltype depends on the resolution/offset specified in the samples info: when they are 1/0 respectively, the underlying encoded data is always returned exactly as-is, even if the type differs from the requested eltype. This allows for some optimizations in such cases, but is a potential footgun when a particular eltype is actually required. We work around this inconsistency here by always allocating a new array with the requested eltype to hold the decoded samples.

Returns a samples, labels tuple.

source
OndaBatches.store_labelsFunction
store_labels(labeled_signals, root; format="lpcm")

Store labels to root, replacing the Onda.Samples in the labels column of labeled_signals with Onda.Signals.

source
store_labels(labeled_signal::LabeledSignalV2, root; format="lpcm")

Store a single set of labels to root, replacing the Onda.Samples in the labels column of labeled_signals with Onda.SignalV2s. A single updated LabeledSignalV2 row is returned.

The filepath of the stored labels' Signal is the basename of labeled_signal.file_path with "labels_" prepended.

source

Batch sampling

OndaBatches.BatchItemV2Type
@version BatchItemV2{T} > LabeledSignalV2 begin
+                roundto=nothing)

Create a "labeled signals" table from a signals table and a table of annotations containing labels.

Annotations will be passed to labels_to_samples_table, as well as kwargs. labels_to_samples_table requires these keyword arguments:

  • groups: the column to group over, defaults to :recording.
  • labels_column: the column in the annotations table containing the labels.
  • epoch: the sampling period of the labels.
  • encoding::Dict: the label -> UInt8 mapping to use for encoding the labels.
  • roundto: controls rounding of "shaggy spans", defaults to nothing for no rounding.

Annotations must be

  • contiguous and non-overlapping (withing groups)
  • regularly sampled, with spans an even integer multiple of the epoch kwarg.

Returns a LabeledSignalV2 table (e.g., with schema "labeled.signal@2"), with labels in :labels and the signal spans occupied by these labels in :label_span. Like the signal :span, the :label_span is relative to the start of the recording, not necessarily to the start of the data represented by the signal.

If any label span is not entirely contained within the corresponding signal span, this will throw an ArgumentError.

source
OndaBatches.load_labeled_signalFunction
function load_labeled_signal(labeled_signal, samples_eltype::Type=Float64)

Load signal data as Onda.Samples from a labeled segment of an Onda.SignalV2 (i.e., a LabeledSignalV2 or row with schema "labeled.signal@2"), and returns the portion of the samples data corresponding to labeled_signal.label_span, along with the corresponding labels (as another Onda.Samples object).

If possible, this will only retrieve the bytes corresponding to labeled_signal.label_span.

The eltype of the returned Samples is samples_eltype, which defaults to Float64.

Note

The handling of samples eltype is different than Onda.load, for which the eltype depends on the resolution/offset specified in the samples info: when they are 1/0 respectively, the underlying encoded data is always returned exactly as-is, even if the type differs from the requested eltype. This allows for some optimizations in such cases, but is a potential footgun when a particular eltype is actually required. We work around this inconsistency here by always allocating a new array with the requested eltype to hold the decoded samples.

Returns a samples, labels tuple.

source
OndaBatches.store_labelsFunction
store_labels(labeled_signals, root; format="lpcm")

Store labels to root, replacing the Onda.Samples in the labels column of labeled_signals with Onda.Signals.

source
store_labels(labeled_signal::LabeledSignalV2, root; format="lpcm")

Store a single set of labels to root, replacing the Onda.Samples in the labels column of labeled_signals with Onda.SignalV2s. A single updated LabeledSignalV2 row is returned.

The filepath of the stored labels' Signal is the basename of labeled_signal.file_path with "labels_" prepended.

source

Batch sampling

OndaBatches.BatchItemV2Type
@version BatchItemV2{T} > LabeledSignalV2 begin
     batch_channels::T
-end

Legolas record type representing a single batch item. Fields are inherited from LabeledSignalV2 > SignalV2, and an additional batch_channels field gives a channel selector for this batch. A "channel selector" is anything that can be used as a channel index for Onda.Samples, or missing (in which case, all channels will be used in the order they occur in the Samples).

Columns include:

  • columns from Onda.SignalV2 (everything required to Onda.load the segment)
  • labels and label_span from LabeledSignalV2
  • batch_channels
source
OndaBatches.RandomBatchesType
RandomBatches

An iterator of pseudo-randomly sampled batches derived from a table of densely labeled signals (a labeled.signal@2 table). Batches consist of batch_size "batch items". A single batch item consists of batch_duration * label_sample_rate labels, and batch_duration * signal_sample_rate samples of multichannel data.

Batch items are sampled according to the following procedure:

  1. A single labeled signal is sampled (optionally with weights)
  2. A single label from that signal is sampled (optionally with weights)
  3. One or more channels is selected, optionally randomly.

Each batch item is sampled independently, and in particular different batch items in a given batch can have different channels included (although the same number of them, n_channels).

The functions iterate_batch_item and iterate_batch sample a single batch item and a full batch, respectively.

Fields

  • labeled_signals::DataFrame: the table of labeled signals that batches are sampled from.
  • signal_weights::AbstractWeights: weights for individual signals (unweighted by default). May be nothing duration construction, in which case unit weights are created.
  • label_weights::Vector{AbstractWeights}: weights for individual labels of each labeled signal (unweighted by default). May be nothing during construction, in which case unit weights will be created for each labeled signal.
  • n_channels::Union{Nothing,Int}: the number of channels each batch item should have; this many channels are sampled without replacement, unless n_channels === nothing in which case all channels are included.
  • batch_size::Int: the number of items that make one complete batch
  • batch_duration::TimePeriod: the duration of the window for a single batch.
source
OndaBatches.iterate_batchFunction
iterate_batch(batches::Batches, rng)

Return a "batch listing" that can be materialized into model training/evaluation input.

A batch is a table that has one row per batch item, and follows the "batch-item@2" schema.

This is consumed by a materialize_batch function that can be run on a remote worker, so this sends just the minimum of information necessary to load the batch signal data, the stage labels, and the spans that say how they line up.

source

Batch materialization

OndaBatches.materialize_batch_itemFunction
materialize_batch_item(batch_item, samples_eltype::Type=Float64)

Load the signal data for a single BatchItemV2, selecting only the channels specified in the batch_channels field (using all channels if the field is missing).

Returns a signal_data, label_data tuple, which is the contents of the data field of the signals and labels Samples objects returned by [load_labeled_signal](@ref), after the signals data by batch_channels.

The eltype of signal_data will be samples_eltype; the eltype of label_data is whatever is returned by get_labels.

source
OndaBatches.materialize_batchFunction
materialize_batch(batch, samples_eltype::Type=Float64)

Materialize an entire batch, which is a table of BatchItemV2 rows. Each row is materialized concurrently by materialize_batch_item, and the resulting signals and labels arrays are concatenated on dimension ndims(x) + 1 respectively.

Returns a signal_data, label_data tuple. The dimensionality of these arrays depends on the dimensionality of the results of materialize_batch_item, but will in general be ndims(x) + 1.

The eltype of signal_data will be samples_eltype; the eltype of label_data is whatever is returned by get_labels.

source
OndaBatches.get_channel_dataFunction
get_channel_data(samples, channels)

Get the data associated with the specified channels. Default fallback simply calls samples[channels, :].data. But custom channel selectors can be used to implement more exotic featurization schemes, (see tests for examples).

source

Batching service

OndaBatches.BatcherType

A struct that provides control of batching process on one or more remote workers. This struct keeps track of

  • manager::Int the PID where start_batching will be called.
  • workers an AbstractWorkerPool for the worker process(es).
  • channel::RemoteChannel the channel that batches are loaded into.
  • status::Future the return value of the start_batching function as a Future; see get_status for a convenient accessor.
  • batches the iterator of batches that will be materialized; only requirement is that iterate_batch be defined; see RandomBatches for an example
  • state::Any batcher state (passed to iterate_batch, updated with each new batch that's yielded by the batcher.
  • buffer::Int the size of the batch buffer to keep locally (e.g., the capacity of channel).

Use start! to start the batching service, stop! to stop it, and get_status to check the status.

Once the batcher is started, the sequence of materialized batches (the output of materialize_batch) and corresponding batcher states can be retrieved by take!.

Architecture

A Batcher is meant to run in an architecture where remote workers are created with a Distributed.jl cluster manager. We use the following terminology to describe the roles these different processes play:

  • "Batch worker": one or more processes that are used to actually load batches (via materialize_batch)

  • "Batch manager": the process which coordinates the loading of batches, ensuring consistent iteration order, distributing work to the batch workers, and populating the output channel. start_batching runs on this process.

  • "Client": the process which is consuming batches via take!(::Batcher, state) (which OndaBatches.jl is generally agnostic about and does not manage)

  • "Manager": the process on which the Batcher is initially created, and holds the reference for the worker pool (for multi-worker batching).

Note

We try hard to make Batchers relocatable to other processes (e.g., serializing to the Client after initialization on the Manager). However, since a new RemoteChannel is created each time the batcher is started (including when the desired state does not match the Batcher's current state), some care needs to be taken if it matters where that channel is hosted (although this behavior may change in the future).

Also note that while a running (i.e. start!ed) Batcher can be relocated to another process, the status and channel fields are not guaranteed to stay in sync on the two copies.

source
OndaBatches.BatcherMethod
Batcher([manager::Int,] workers::Vector{Int}, batches; start=true, state=nothing, buffer=2 * length(workers) + 1)
-Batcher(manager::Int, workers::AbstractWorkerPool, batches; start=true, state=nothing, buffer=2 * length(workers) + 1)

Construct a new Batcher, using worker IDs, batches, and initial state. The batcher's channel and status will be initialized.

The workers may be specified as an AbstractWorkerPool or a vector of PIDs (in which case a WorkerPool will be constructed).

Warning

If workers are supplied as an AbstractWorkerPool, it is assumed that all workers managed by the pool are available for loading batches. Whenever the batcher is stopped, the worker pool is reset, and all managed workers are returned to the channel of available workers.

See RandomBatches for an example of creation of batches.

The initial state is the state that is used by iterate_batch, e.g., the RNG used by RandomBatches.

If start=true, batching is start!ed. The state keyword argument must be supplied in this case to provide an initial state.

The buffer controls the capacity of the batch channel; a value greater than or equal to the number of workers is recommended so that batch loading workers do not block waiting for batches to be taken off the channel.

source
Base.take!Method
Base.take!(batcher::Batcher, state)

Take one batch + state pair from the batcher, starting at the specified state. If the requested state does not match the batcher's current state, then the batching process will be restarted with the new state. If the batcher is not running (as indicated by get_status), it will be started with start!.

If an error has occurred on any of the batch loading workers, the next call to take! will immediately throw the wrapped RemoteException, even if there are still good batches on the channel.

Returns an (x, y), state tuple, where x is the batch signal data, y is the label data (see materialize_batch), and state is the next batch iterator state.

Runs on the Client.

source
OndaBatches.start!Function
start!(batcher::Batcher, state)

Start the remote process that loads batches into the batcher's channel. A new channel is created since the old one cannot always be re-used.

This invokes start_batching on batcher.manager with remotecall.

The (modified) batcher is returned.

If the batcher is already running (as indicated by get_status == :running), a warning is raised and the batcher is returned.

Runs on the Client.

source
OndaBatches.stop!Function
stop!(batcher::Batcher)

Close batcher.channel to stop the remote batching. This blocks on fetch(batcher.status) to wait for channel closure. If an error is thrown on the remote worker that is not caught, it will be rethrown here.

The batcher is returned.

Runs on the Client.

source
OndaBatches.get_statusFunction
get_status(batcher::Batcher)

Check the status of a remote batcher.

Possible return values are

  • :stopped: the batcher was created but not started
  • :running: the batching loop is still running
  • :closed: the batch channel was closed and the batch loop has terminated
  • :done: the infinite loop in start_batching has terminated without error (not expected)
  • a RemoteException that wraps an error thrown by start_batching on the batch manager (which may further wrap an exception thrown on a batch worker
source

Internal utilities

Warning

None of the following are meant to be called by users, are not part of the API for semantic versioning purposes, and can change at any time.

OndaBatches.labels_to_samples_tableFunction
labels_to_samples_table(labels::AbstractDataFrame; labels_column,
-                        groups=:recording, epoch, kwargs...)

Convert annotations table into a table of labels as Samples. This groups by groups (defaults to :recording), and then applies int_encode_labels to the labels_column and :span columns from each group, and converts the resulting UInt8 labels to Onda.Samples via labels_to_samples. The sampling rate for the resulting labels is 1 / epoch. The samples are returned in the :labels column.

Along with epoch, additional kwargs are forwarded to int_encode_labels:

  • encoding::Dict the label -> UInt8 mapping to use for encoding
  • roundto controls rounding of "shaggy spans" (defaults to nothing for no rounding)

The span corresponding to these labels is determined by floor_containing and returned in the :label_span column.

A DataFrame is returned with the :labels and :label_span per group, as well as the groups variables.

source
OndaBatches.labels_to_samplesFunction
labels_to_samples(labels::AbstractVector{UInt8}; epoch)
+end

Legolas record type representing a single batch item. Fields are inherited from LabeledSignalV2 > SignalV2, and an additional batch_channels field gives a channel selector for this batch. A "channel selector" is anything that can be used as a channel index for Onda.Samples, or missing (in which case, all channels will be used in the order they occur in the Samples).

Columns include:

  • columns from Onda.SignalV2 (everything required to Onda.load the segment)
  • labels and label_span from LabeledSignalV2
  • batch_channels
source
OndaBatches.RandomBatchesType
RandomBatches

An iterator of pseudo-randomly sampled batches derived from a table of densely labeled signals (a labeled.signal@2 table). Batches consist of batch_size "batch items". A single batch item consists of batch_duration * label_sample_rate labels, and batch_duration * signal_sample_rate samples of multichannel data.

Batch items are sampled according to the following procedure:

  1. A single labeled signal is sampled (optionally with weights)
  2. A single label from that signal is sampled (optionally with weights)
  3. One or more channels is selected, optionally randomly.

Each batch item is sampled independently, and in particular different batch items in a given batch can have different channels included (although the same number of them, n_channels).

The functions iterate_batch_item and iterate_batch sample a single batch item and a full batch, respectively.

Fields

  • labeled_signals::DataFrame: the table of labeled signals that batches are sampled from.
  • signal_weights::AbstractWeights: weights for individual signals (unweighted by default). May be nothing duration construction, in which case unit weights are created.
  • label_weights::Vector{AbstractWeights}: weights for individual labels of each labeled signal (unweighted by default). May be nothing during construction, in which case unit weights will be created for each labeled signal.
  • n_channels::Union{Nothing,Int}: the number of channels each batch item should have; this many channels are sampled without replacement, unless n_channels === nothing in which case all channels are included.
  • batch_size::Int: the number of items that make one complete batch
  • batch_duration::TimePeriod: the duration of the window for a single batch.
source
OndaBatches.iterate_batchFunction
iterate_batch(batches::Batches, rng)

Return a "batch listing" that can be materialized into model training/evaluation input.

A batch is a table that has one row per batch item, and follows the "batch-item@2" schema.

This is consumed by a materialize_batch function that can be run on a remote worker, so this sends just the minimum of information necessary to load the batch signal data, the stage labels, and the spans that say how they line up.

source

Batch materialization

OndaBatches.materialize_batch_itemFunction
materialize_batch_item(batch_item, samples_eltype::Type=Float64)

Load the signal data for a single BatchItemV2, selecting only the channels specified in the batch_channels field (using all channels if the field is missing).

Returns a signal_data, label_data tuple, which is the contents of the data field of the signals and labels Samples objects returned by [load_labeled_signal](@ref), after the signals data by batch_channels.

The eltype of signal_data will be samples_eltype; the eltype of label_data is whatever is returned by get_labels.

source
OndaBatches.materialize_batchFunction
materialize_batch(batch, samples_eltype::Type=Float64)

Materialize an entire batch, which is a table of BatchItemV2 rows. Each row is materialized concurrently by materialize_batch_item, and the resulting signals and labels arrays are concatenated on dimension ndims(x) + 1 respectively.

Returns a signal_data, label_data tuple. The dimensionality of these arrays depends on the dimensionality of the results of materialize_batch_item, but will in general be ndims(x) + 1.

The eltype of signal_data will be samples_eltype; the eltype of label_data is whatever is returned by get_labels.

source
OndaBatches.get_channel_dataFunction
get_channel_data(samples, channels)

Get the data associated with the specified channels. Default fallback simply calls samples[channels, :].data. But custom channel selectors can be used to implement more exotic featurization schemes, (see tests for examples).

source

Batching service

OndaBatches.BatcherType

A struct that provides control of batching process on one or more remote workers. This struct keeps track of

  • manager::Int the PID where start_batching will be called.
  • workers an AbstractWorkerPool for the worker process(es).
  • channel::RemoteChannel the channel that batches are loaded into.
  • status::Future the return value of the start_batching function as a Future; see get_status for a convenient accessor.
  • batches the iterator of batches that will be materialized; only requirement is that iterate_batch be defined; see RandomBatches for an example
  • state::Any batcher state (passed to iterate_batch, updated with each new batch that's yielded by the batcher.
  • buffer::Int the size of the batch buffer to keep locally (e.g., the capacity of channel).

Use start! to start the batching service, stop! to stop it, and get_status to check the status.

Once the batcher is started, the sequence of materialized batches (the output of materialize_batch) and corresponding batcher states can be retrieved by take!.

Architecture

A Batcher is meant to run in an architecture where remote workers are created with a Distributed.jl cluster manager. We use the following terminology to describe the roles these different processes play:

  • "Batch worker": one or more processes that are used to actually load batches (via materialize_batch)

  • "Batch manager": the process which coordinates the loading of batches, ensuring consistent iteration order, distributing work to the batch workers, and populating the output channel. start_batching runs on this process.

  • "Client": the process which is consuming batches via take!(::Batcher, state) (which OndaBatches.jl is generally agnostic about and does not manage)

  • "Manager": the process on which the Batcher is initially created, and holds the reference for the worker pool (for multi-worker batching).

Note

We try hard to make Batchers relocatable to other processes (e.g., serializing to the Client after initialization on the Manager). However, since a new RemoteChannel is created each time the batcher is started (including when the desired state does not match the Batcher's current state), some care needs to be taken if it matters where that channel is hosted (although this behavior may change in the future).

Also note that while a running (i.e. start!ed) Batcher can be relocated to another process, the status and channel fields are not guaranteed to stay in sync on the two copies.

source
OndaBatches.BatcherMethod
Batcher([manager::Int,] workers::Vector{Int}, batches; start=true, state=nothing, buffer=2 * length(workers) + 1)
+Batcher(manager::Int, workers::AbstractWorkerPool, batches; start=true, state=nothing, buffer=2 * length(workers) + 1)

Construct a new Batcher, using worker IDs, batches, and initial state. The batcher's channel and status will be initialized.

The workers may be specified as an AbstractWorkerPool or a vector of PIDs (in which case a WorkerPool will be constructed).

Warning

If workers are supplied as an AbstractWorkerPool, it is assumed that all workers managed by the pool are available for loading batches. Whenever the batcher is stopped, the worker pool is reset, and all managed workers are returned to the channel of available workers.

See RandomBatches for an example of creation of batches.

The initial state is the state that is used by iterate_batch, e.g., the RNG used by RandomBatches.

If start=true, batching is start!ed. The state keyword argument must be supplied in this case to provide an initial state.

The buffer controls the capacity of the batch channel; a value greater than or equal to the number of workers is recommended so that batch loading workers do not block waiting for batches to be taken off the channel.

source
Base.take!Method
Base.take!(batcher::Batcher, state)

Take one batch + state pair from the batcher, starting at the specified state. If the requested state does not match the batcher's current state, then the batching process will be restarted with the new state. If the batcher is not running (as indicated by get_status), it will be started with start!.

If an error has occurred on any of the batch loading workers, the next call to take! will immediately throw the wrapped RemoteException, even if there are still good batches on the channel.

Returns an (x, y), state tuple, where x is the batch signal data, y is the label data (see materialize_batch), and state is the next batch iterator state.

Runs on the Client.

source
OndaBatches.start!Function
start!(batcher::Batcher, state)

Start the remote process that loads batches into the batcher's channel. A new channel is created since the old one cannot always be re-used.

This invokes start_batching on batcher.manager with remotecall.

The (modified) batcher is returned.

If the batcher is already running (as indicated by get_status == :running), a warning is raised and the batcher is returned.

Runs on the Client.

source
OndaBatches.stop!Function
stop!(batcher::Batcher)

Close batcher.channel to stop the remote batching. This blocks on fetch(batcher.status) to wait for channel closure. If an error is thrown on the remote worker that is not caught, it will be rethrown here.

The batcher is returned.

Runs on the Client.

source
OndaBatches.get_statusFunction
get_status(batcher::Batcher)

Check the status of a remote batcher.

Possible return values are

  • :stopped: the batcher was created but not started
  • :running: the batching loop is still running
  • :closed: the batch channel was closed and the batch loop has terminated
  • :done: the infinite loop in start_batching has terminated without error (not expected)
  • a RemoteException that wraps an error thrown by start_batching on the batch manager (which may further wrap an exception thrown on a batch worker
source

Internal utilities

Warning

None of the following are meant to be called by users, are not part of the API for semantic versioning purposes, and can change at any time.

OndaBatches.labels_to_samples_tableFunction
labels_to_samples_table(labels::AbstractDataFrame; labels_column,
+                        groups=:recording, epoch, kwargs...)

Convert annotations table into a table of labels as Samples. This groups by groups (defaults to :recording), and then applies int_encode_labels to the labels_column and :span columns from each group, and converts the resulting UInt8 labels to Onda.Samples via labels_to_samples. The sampling rate for the resulting labels is 1 / epoch. The samples are returned in the :labels column.

Along with epoch, additional kwargs are forwarded to int_encode_labels:

  • encoding::Dict the label -> UInt8 mapping to use for encoding
  • roundto controls rounding of "shaggy spans" (defaults to nothing for no rounding)

The span corresponding to these labels is determined by floor_containing and returned in the :label_span column.

A DataFrame is returned with the :labels and :label_span per group, as well as the groups variables.

source
OndaBatches.labels_to_samplesFunction
labels_to_samples(labels::AbstractVector{UInt8}; epoch)
 labels_to_samples(; epoch)

Convert a vector of UInt8 stage labels sampled evenly at intervals of epoch into Onda.Samples with samples rate of 1/epoch.

The kwarg only form returns a closure that captures the epoch.

The returned samples have samples info:

SamplesInfoV2(; sensor_type="label",
               channels=["label"],
               sample_unit="label",
               sample_resolution_in_unit=1,
               sample_offset_in_unit=0,
               sample_type=UInt8,
-              sample_rate=Second(1) / epoch)
source
OndaBatches.get_labelsFunction
get_labels(labels::Samples, span)
-get_labels(labels::SignalV2, span)

Return labels as Samples, deserializing with Onda.load if necessary. span is the span relative to the start of the recording that should be loaded.

This function is meant for internal use only; users should instead use load_labeled_signal and sub_label_span.

source
OndaBatches.get_labelsFunction
get_labels(labels::Samples, span)
+get_labels(labels::SignalV2, span)

Return labels as Samples, deserializing with Onda.load if necessary. span is the span relative to the start of the recording that should be loaded.

This function is meant for internal use only; users should instead use load_labeled_signal and sub_label_span.

source
OndaBatches.int_encode_labelsFunction
int_encode_labels(stages, spans; epoch, encoding::Dict,
                   roundto=nothing)
-int_encode_labels(; epoch, encoding, roundto)

Return a Vector{UInt8} of stage labels, using encoding to look up each stage label in stages, sampled evenly at intervals of epoch. spans are expanded into non-overlapping, contiguous spans of duration epoch; spans must be contiguous and with durations evenly divisible by epoch, except for the final span which will be truncated. spans durations will be rounded to the nearest roundto (can be a TimePeriod subtype or instance, such as Millisecond(100), or nothing) before division into epochs to accommodate minor errors in stage label durations; if roundto=nothing (the default) no rounding will be performed.

The Vector{UInt8} of labels that is returned will have length floor(duration(shortest_timespan_containing(spans)), epoch)

The encoding is used to map the values in stages to UInt8s, and should be provided in the form of a Dict{eltype(stages), UInt8}.

int_encode_labels(; epoch, encoding, roundto) will return a closure which captures the configuration options.

source
OndaBatches.floor_containingFunction
floor_containing(spans; epoch)
-floor_containing(; epoch)

Compute the shortest timespan containing contiguous spans, rounded down to the nearest multiple of epoch.

Note that this function will not check whether spans are contiguous.

The kwarg-only method returns a closure which captures the epoch.

source
OndaBatches.is_epoch_divisibleFunction
is_epoch_divisible(span::TimeSpan, epoch; roundto=nothing)

Tests whether span is evenly divisible into contiguous sub-spans of length epoch, after optionally rounding to roundto (by default, no rounding is performed).

source
OndaBatches.sample_label_spanFunction
sample_label_span(rng, labels, label_span, labels_weight, batch_duration)

Return a TimeSpan sampled from labels. First, an epoch is sampled according to labels_weight. Next, the position of this epoch in a window of batch_duration is sampled with uniform probability, with the constraint that the window must lie completely within labels.

The returned TimeSpan will have duration equal to batch_duration and will be relative to the start of the recording. The earliest possible return span starts at start(label_span), and the latest possible span stops at stop(label_span).

source
OndaBatches.start_batchingFunction
start_batching(channel::RemoteChannel, batches, state)

Begin loading batches onto a RemoteChannel based on batches (e.g., RandomBatches) and initial state.

This will run an infinite loop which loads one batch at a time with iterate_batch and materialize_batch, and put!s the resulting (x, y) and state values into the channel.

Batching continues until the channel is closed or an error is encountered. When the channel is closed, the InvalidStateException is caught and :closed is returned from the function. Other errors are rethrown. If somehow the loop is exited without an error (honestly not sure how this would happen), :done is returned.

This function is intended to used with @async or remotecall (e.g., in a Batcher); the Future that remotecall returns can be monitored with get_status.

Calls to materialize_batch are wrapped in Base.retry to add some measure of resiliency to transient network interruptions.

Runs on the batching manager (i.e. Batcher.manager), but only when Batcher.workers is empty.

source
start_batching(channel::RemoteChannel, batches, state, workers)

Start batching loop, utilizing multiple workers to load batches in parallel. This method will yield batches in the same order that start_batching without workers will, using a _feed_jobs! feed batch materialization jobs to an internal channel (maintaining iteration order while distributing work across workers).

Runs on the batching manager (i.e. Batcher.manager)

source
OndaBatches._feed_jobs!Function
_feed_jobs!(jobs::Channel, batches, state, workers)

Function that iterates batches starting from state, creating a BatchJob to materialize each one using the pool of workers. Each job holds is put onto the jobs channel in the order they were iterated, and is a struct with fields

  • worker PID of the worker loading this batch
  • batch_future a Future containing the output of materialize_batch
  • state the iteration state after iterating this batch
  • prev_state the iteration state before iterating this batch (i.e., the input to iterate_batch(batches, state) required to reproduce this batch

When batch iteration is complete (as indicated by iterate_batch returning nothing, a final placeholder job will be placed on the jobs channel, with values of nothing everywhere except for prev_state, which is required to support synchronization on the client end (i.e., to confirm that the user really did ask for the final batch with take!).

Returns nothing.

Runs on the batching manager (i.e., Batcher.manager), in an async Task created in start_batching.

source
OndaBatches.reset!Function
reset!(pool::AbstractWorkerPool)

Restore worker pool to something like the state it would be in after construction, with the channel populated with one instance of each worker managed by the pool.

This has two phases: first, the contents of the channel are cleared out to avoid double-adding workers to the channel. Second, the contents of pool.workers is sorted, checked against the list of active processes with procs(), and then live PIDs put! into the pool one-by-one. Dead workers are removed from the set of workers held by the pool.

For a WorkerPool, this operation is forwarded to the process holding the original pool (as with put!, take!, etc.) so it is safe to call on serialized copies of the pool.

nothing is returned.

source
OndaBatches.with_channelFunction
with_channel(f, channel; closed_msg="channel close, stopping")

Run f(channel), handling channel closure gracefully and closing the channel if an error is caught.

If the channel is closed, the InvalidStateException is caught, the closed_msg is logged as @info, and :closed is returned.

If any other error occurs, the channel is closed before rethrowing (with a @debug log message reporting the error + stacktrace).

Otherwise, the return value is f(channel).

source
+int_encode_labels(; epoch, encoding, roundto)

Return a Vector{UInt8} of stage labels, using encoding to look up each stage label in stages, sampled evenly at intervals of epoch. spans are expanded into non-overlapping, contiguous spans of duration epoch; spans must be contiguous and with durations evenly divisible by epoch, except for the final span which will be truncated. spans durations will be rounded to the nearest roundto (can be a TimePeriod subtype or instance, such as Millisecond(100), or nothing) before division into epochs to accommodate minor errors in stage label durations; if roundto=nothing (the default) no rounding will be performed.

The Vector{UInt8} of labels that is returned will have length floor(duration(shortest_timespan_containing(spans)), epoch)

The encoding is used to map the values in stages to UInt8s, and should be provided in the form of a Dict{eltype(stages), UInt8}.

int_encode_labels(; epoch, encoding, roundto) will return a closure which captures the configuration options.

source
OndaBatches.floor_containingFunction
floor_containing(spans; epoch)
+floor_containing(; epoch)

Compute the shortest timespan containing contiguous spans, rounded down to the nearest multiple of epoch.

Note that this function will not check whether spans are contiguous.

The kwarg-only method returns a closure which captures the epoch.

source
OndaBatches.is_epoch_divisibleFunction
is_epoch_divisible(span::TimeSpan, epoch; roundto=nothing)

Tests whether span is evenly divisible into contiguous sub-spans of length epoch, after optionally rounding to roundto (by default, no rounding is performed).

source
OndaBatches.check_epoch_divisibleFunction
check_epoch_divisible(spans, epoch; roundto=Second)

Throw an ArgumentError if any of spans are not evenly divisible into contiguous sub-spans of length epoch, according to is_epoch_divisible.

source
OndaBatches.all_contiguousFunction
all_contiguous(spans)

Returns true if all spans are contiguous. Assumes spans are sorted by start time.

source
OndaBatches.sample_label_spanFunction
sample_label_span(rng, labels, label_span, labels_weight, batch_duration)

Return a TimeSpan sampled from labels. First, an epoch is sampled according to labels_weight. Next, the position of this epoch in a window of batch_duration is sampled with uniform probability, with the constraint that the window must lie completely within labels.

The returned TimeSpan will have duration equal to batch_duration and will be relative to the start of the recording. The earliest possible return span starts at start(label_span), and the latest possible span stops at stop(label_span).

source
OndaBatches.start_batchingFunction
start_batching(channel::RemoteChannel, batches, state)

Begin loading batches onto a RemoteChannel based on batches (e.g., RandomBatches) and initial state.

This will run an infinite loop which loads one batch at a time with iterate_batch and materialize_batch, and put!s the resulting (x, y) and state values into the channel.

Batching continues until the channel is closed or an error is encountered. When the channel is closed, the InvalidStateException is caught and :closed is returned from the function. Other errors are rethrown. If somehow the loop is exited without an error (honestly not sure how this would happen), :done is returned.

This function is intended to used with @async or remotecall (e.g., in a Batcher); the Future that remotecall returns can be monitored with get_status.

Calls to materialize_batch are wrapped in Base.retry to add some measure of resiliency to transient network interruptions.

Runs on the batching manager (i.e. Batcher.manager), but only when Batcher.workers is empty.

source
start_batching(channel::RemoteChannel, batches, state, workers)

Start batching loop, utilizing multiple workers to load batches in parallel. This method will yield batches in the same order that start_batching without workers will, using a _feed_jobs! feed batch materialization jobs to an internal channel (maintaining iteration order while distributing work across workers).

Runs on the batching manager (i.e. Batcher.manager)

source
OndaBatches._feed_jobs!Function
_feed_jobs!(jobs::Channel, batches, state, workers)

Function that iterates batches starting from state, creating a BatchJob to materialize each one using the pool of workers. Each job holds is put onto the jobs channel in the order they were iterated, and is a struct with fields

  • worker PID of the worker loading this batch
  • batch_future a Future containing the output of materialize_batch
  • state the iteration state after iterating this batch
  • prev_state the iteration state before iterating this batch (i.e., the input to iterate_batch(batches, state) required to reproduce this batch

When batch iteration is complete (as indicated by iterate_batch returning nothing, a final placeholder job will be placed on the jobs channel, with values of nothing everywhere except for prev_state, which is required to support synchronization on the client end (i.e., to confirm that the user really did ask for the final batch with take!).

Returns nothing.

Runs on the batching manager (i.e., Batcher.manager), in an async Task created in start_batching.

source
OndaBatches.reset!Function
reset!(pool::AbstractWorkerPool)

Restore worker pool to something like the state it would be in after construction, with the channel populated with one instance of each worker managed by the pool.

This has two phases: first, the contents of the channel are cleared out to avoid double-adding workers to the channel. Second, the contents of pool.workers is sorted, checked against the list of active processes with procs(), and then live PIDs put! into the pool one-by-one. Dead workers are removed from the set of workers held by the pool.

For a WorkerPool, this operation is forwarded to the process holding the original pool (as with put!, take!, etc.) so it is safe to call on serialized copies of the pool.

nothing is returned.

source
OndaBatches.with_channelFunction
with_channel(f, channel; closed_msg="channel close, stopping")

Run f(channel), handling channel closure gracefully and closing the channel if an error is caught.

If the channel is closed, the InvalidStateException is caught, the closed_msg is logged as @info, and :closed is returned.

If any other error occurs, the channel is closed before rethrowing (with a @debug log message reporting the error + stacktrace).

Otherwise, the return value is f(channel).

source
diff --git a/previews/PR39/search/index.html b/previews/PR39/search/index.html index 56f9b63..a05ee99 100644 --- a/previews/PR39/search/index.html +++ b/previews/PR39/search/index.html @@ -1,2 +1,2 @@ -Search · OndaBatches

Loading search...

    +Search · OndaBatches

    Loading search...