Skip to content

Commit

Permalink
feat: add into_arrow to IntoCanonicalVTable (#1604)
Browse files Browse the repository at this point in the history
Historically, we've gated the ability to go from Vortex -> Arrow arrays
behind the `Canonical` type, which picks one "blessed" Arrow encoding
for each of our DTypes.

Since the introduction of VarBinView in #1082, we are in a position
where there are now 2 Vortex string encodings that can each be directly
converted to Arrow.

What's more, FSSTArray internally uses a `VarBin` array to encode the
FSST-compressed strings. In its CompareFn implementation, it delegates
the comparison to its values array, which is a `VarBin` that falls back
to the default `compare` codepath: that codepath does
`into_canonical()?.into_arrow()?` and then uses the Arrow codec.

This is now slow, because VarBin.into_canonical() will iterate over all
the strings to build a canonical `VarBinView`. This requires a full
decompression, which makes the pushdown pointless.

This PR augments the existing `IntoCanonicalVTable`, allowing encodings
to implement their own `into_arrow()` method. The default implementation
continues to call `into_canonical().into_arrow()`, but we implement a
fast version for VarBin.
  • Loading branch information
a10y authored Dec 6, 2024
1 parent 6a11488 commit 6e9d779
Show file tree
Hide file tree
Showing 11 changed files with 47 additions and 32 deletions.
2 changes: 1 addition & 1 deletion bench-vortex/benches/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ fn vortex_decompress_read(runtime: &Runtime, buf: Buffer) -> VortexResult<Vec<Ar
let mut batches = vec![];
let mut stream = builder.build().await?;
while let Some(batch) = stream.next().await {
batches.push(batch?.into_canonical()?.into_arrow()?);
batches.push(batch?.into_arrow()?);
}
Ok(batches)
}
Expand Down
4 changes: 2 additions & 2 deletions bench-vortex/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ mod test {
let struct_arrow: ArrowStructArray = record_batch.into();
let arrow_array: ArrowArrayRef = Arc::new(struct_arrow);
let vortex_array = ArrayData::from_arrow(arrow_array.clone(), false);
let vortex_as_arrow = vortex_array.into_canonical().unwrap().into_arrow().unwrap();
let vortex_as_arrow = vortex_array.into_arrow().unwrap();
assert_eq!(vortex_as_arrow.deref(), arrow_array.deref());
}
}
Expand All @@ -373,7 +373,7 @@ mod test {
let vortex_array = ArrayData::from_arrow(arrow_array.clone(), false);

let compressed = compressor.compress(&vortex_array).unwrap();
let compressed_as_arrow = compressed.into_canonical().unwrap().into_arrow().unwrap();
let compressed_as_arrow = compressed.into_arrow().unwrap();
assert_eq!(compressed_as_arrow.deref(), arrow_array.deref());
}
}
Expand Down
8 changes: 2 additions & 6 deletions pyvortex/src/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,7 @@ impl PyArray {
if let Ok(chunked_array) = ChunkedArray::try_from(vortex.clone()) {
let chunks: Vec<ArrayRef> = chunked_array
.chunks()
.map(|chunk| -> PyResult<ArrayRef> {
let canonical = chunk.into_canonical()?;
Ok(canonical.into_arrow()?)
})
.map(|chunk| -> PyResult<ArrayRef> { Ok(chunk.into_arrow()?) })
.collect::<PyResult<Vec<ArrayRef>>>()?;
if chunks.is_empty() {
return Err(PyValueError::new_err("No chunks in array"));
Expand All @@ -145,8 +142,7 @@ impl PyArray {
} else {
Ok(vortex
.clone()
.into_canonical()
.and_then(|arr| arr.into_arrow())?
.into_arrow()?
.into_data()
.to_pyarrow(py)?
.into_bound(py))
Expand Down
7 changes: 7 additions & 0 deletions vortex-array/src/array/varbin/flatten.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use arrow_array::ArrayRef;
use arrow_schema::DataType;
use vortex_dtype::DType;
use vortex_error::VortexResult;
Expand All @@ -21,6 +22,12 @@ impl IntoCanonical for VarBinArray {

VarBinViewArray::try_from(ArrayData::from_arrow(array, nullable)).map(Canonical::VarBinView)
}

fn into_arrow(self) -> VortexResult<ArrayRef> {
// Specialized implementation of `into_arrow` for VarBin since it has a direct
// Arrow representation.
varbin_to_arrow(&self)
}
}

#[cfg(test)]
Expand Down
4 changes: 2 additions & 2 deletions vortex-array/src/arrow/datum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ impl TryFrom<ArrayData> for Datum {
.unwrap_or_default()
{
Ok(Self {
array: slice(array, 0, 1)?.into_canonical()?.into_arrow()?,
array: slice(array, 0, 1)?.into_arrow()?,
is_scalar: true,
})
} else {
Ok(Self {
array: array.into_canonical()?.into_arrow()?,
array: array.into_arrow()?,
is_scalar: false,
})
}
Expand Down
28 changes: 17 additions & 11 deletions vortex-array/src/canonical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ impl Canonical {
// Convert storage array directly into arrow, losing type information
// that will let us round-trip.
// TODO(aduffy): https://github.com/spiraldb/vortex/issues/1167
a.storage().into_canonical()?.into_arrow()?
a.storage().into_arrow()?
}
}
})
Expand Down Expand Up @@ -234,7 +234,7 @@ fn list_to_arrow(list: ListArray) -> VortexResult<ArrayRef> {
list.validity().nullability().into(),
));

let values = list.elements().into_canonical()?.into_arrow()?;
let values = list.elements().into_arrow()?;
let nulls = list.logical_validity().to_null_buffer()?;

Ok(match offsets.ptype() {
Expand Down Expand Up @@ -330,7 +330,7 @@ fn temporal_to_arrow(temporal_array: TemporalArray) -> VortexResult<ArrayRef> {
})
}

/// Support trait for transmuting an array into its [vortex_dtype::DType]'s canonical encoding.
/// Support trait for transmuting an array into the canonical encoding for its [vortex_dtype::DType].
///
/// This conversion ensures that the array's encoding matches one of the builtin canonical
/// encodings, each of which has a corresponding [Canonical] variant.
Expand All @@ -340,12 +340,21 @@ fn temporal_to_arrow(temporal_array: TemporalArray) -> VortexResult<ArrayRef> {
/// The DType of the array will be unchanged by canonicalization.
pub trait IntoCanonical {
fn into_canonical(self) -> VortexResult<Canonical>;

fn into_arrow(self) -> VortexResult<ArrayRef>
where
Self: Sized,
{
self.into_canonical()?.into_arrow()
}
}

/// Encoding VTable for canonicalizing an array.
#[allow(clippy::wrong_self_convention)]
pub trait IntoCanonicalVTable {
fn into_canonical(&self, array: ArrayData) -> VortexResult<Canonical>;

fn into_arrow(&self, array: ArrayData) -> VortexResult<ArrayRef>;
}

/// Implement the [IntoCanonicalVTable] for all encodings with arrays implementing [IntoCanonical].
Expand All @@ -359,6 +368,10 @@ where
canonical.inherit_statistics(data.statistics());
Ok(canonical)
}

fn into_arrow(&self, array: ArrayData) -> VortexResult<ArrayRef> {
E::Array::try_from(array)?.into_arrow()
}
}

/// Trait for types that can be converted from an owned type into an owned array variant.
Expand Down Expand Up @@ -525,8 +538,6 @@ mod test {
.unwrap();

let arrow_struct = nested_struct_array
.into_canonical()
.unwrap()
.into_arrow()
.unwrap()
.as_any()
Expand Down Expand Up @@ -597,12 +608,7 @@ mod test {

assert_eq!(
&arrow_struct,
vortex_struct
.into_canonical()
.unwrap()
.into_arrow()
.unwrap()
.as_struct()
vortex_struct.into_arrow().unwrap().as_struct()
);
}
}
2 changes: 1 addition & 1 deletion vortex-array/src/compute/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ pub fn filter(array: &ArrayData, mask: FilterMask) -> VortexResult<ArrayData> {
array.encoding().id(),
);

let array_ref = array.clone().into_canonical()?.into_arrow()?;
let array_ref = array.clone().into_arrow()?;
let mask_array = BooleanArray::new(mask.to_boolean_buffer()?, None);
let filtered = arrow_select::filter::filter(array_ref.as_ref(), &mask_array)?;

Expand Down
8 changes: 8 additions & 0 deletions vortex-array/src/encoding/opaque.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::any::Any;
use std::fmt::{Debug, Display, Formatter};
use std::sync::Arc;

use arrow_array::ArrayRef;
use vortex_error::{vortex_bail, vortex_panic, VortexResult};

use crate::compute::ComputeVTable;
Expand Down Expand Up @@ -47,6 +48,13 @@ impl IntoCanonicalVTable for OpaqueEncoding {
self.0
)
}

fn into_arrow(&self, _array: ArrayData) -> VortexResult<ArrayRef> {
vortex_bail!(
"OpaqueEncoding: into_arrow cannot be called for opaque array ({})",
self.0
)
}
}

impl ComputeVTable for OpaqueEncoding {}
Expand Down
6 changes: 2 additions & 4 deletions vortex-datafusion/src/memory/plans.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,6 @@ impl Stream for RowIndicesStream {
.conjunction_expr
.evaluate(vortex_struct.as_ref())
.map_err(|e| DataFusionError::External(e.into()))?
.into_canonical()?
.into_arrow()?;

// Convert the `selection` BooleanArray into a UInt64Array of indices.
Expand Down Expand Up @@ -349,9 +348,8 @@ where
// We should find a way to avoid decoding the filter columns and only decode the other
// columns, then stitch the StructArray back together from those.
let projected_for_output = chunk.project(this.output_projection)?;
let decoded = take(projected_for_output, &row_indices, TakeOptions::default())?
.into_canonical()?
.into_arrow()?;
let decoded =
take(projected_for_output, &row_indices, TakeOptions::default())?.into_arrow()?;

// Send back a single record batch of the decoded data.
let output_batch = RecordBatch::from(decoded.as_struct());
Expand Down
8 changes: 4 additions & 4 deletions vortex-datafusion/src/persistent/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@ pub fn array_to_col_statistics(array: &StructArray) -> VortexResult<ColumnStatis
let mut stats = ColumnStatistics::new_unknown();

if let Some(null_count_array) = array.field_by_name(Stat::NullCount.name()) {
let array = null_count_array.into_canonical()?.into_arrow()?;
let array = null_count_array.into_arrow()?;
let array = array.as_primitive::<UInt64Type>();

let null_count = array.iter().map(|v| v.unwrap_or_default()).sum::<u64>();
stats.null_count = Precision::Exact(null_count as usize);
}

if let Some(max_value_array) = array.field_by_name(Stat::Max.name()) {
let array = max_value_array.into_canonical()?.into_arrow()?;
let array = max_value_array.into_arrow()?;
let mut acc = MaxAccumulator::try_new(array.data_type())?;
acc.update_batch(&[array])?;

Expand All @@ -31,7 +31,7 @@ pub fn array_to_col_statistics(array: &StructArray) -> VortexResult<ColumnStatis
}

if let Some(min_value_array) = array.field_by_name(Stat::Min.name()) {
let array = min_value_array.into_canonical()?.into_arrow()?;
let array = min_value_array.into_arrow()?;
let mut acc = MinAccumulator::try_new(array.data_type())?;
acc.update_batch(&[array])?;

Expand All @@ -46,7 +46,7 @@ pub fn uncompressed_col_size(array: &StructArray) -> VortexResult<Option<u64>> {
match array.field_by_name(Stat::UncompressedSizeInBytes.name()) {
None => Ok(None),
Some(array) => {
let array = array.into_canonical()?.into_arrow()?;
let array = array.into_arrow()?;
let array = array.as_primitive::<UInt64Type>();

let uncompressed_size = array.iter().map(|v| v.unwrap_or_default()).sum::<u64>();
Expand Down
2 changes: 1 addition & 1 deletion vortex-ipc/src/stream_writer/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,6 @@ async fn broken_data() {
.collect_chunked()
.await
.unwrap();
let round_tripped = arr.into_canonical().unwrap().into_arrow().unwrap();
let round_tripped = arr.into_arrow().unwrap();
assert_eq!(&arrow_arr, round_tripped.as_primitive::<Int32Type>());
}

0 comments on commit 6e9d779

Please sign in to comment.