
Commit

Merge branch 'master' into string-view-like
XiangpengHao committed Jun 24, 2024
2 parents f848b63 + 0c3a24d commit bc85501
Showing 24 changed files with 1,020 additions and 208 deletions.
14 changes: 9 additions & 5 deletions README.md
@@ -79,15 +79,19 @@ versions approximately every 2 months.

## Related Projects

There are two related crates in different repositories
There are several related crates in different repositories

| Crate | Description | Documentation |
| -------------- | --------------------------------------- | ----------------------------- |
| [`datafusion`] | In-memory query engine with SQL support | [(README)][datafusion-readme] |
| [`ballista`] | Distributed query execution | [(README)][ballista-readme] |
| Crate | Description | Documentation |
| ------------------------ | ------------------------------------------- | --------------------------------------- |
| [`datafusion`] | In-memory query engine with SQL support | [(README)][datafusion-readme] |
| [`ballista`] | Distributed query execution | [(README)][ballista-readme] |
| [`object_store_opendal`] | Use [`opendal`] as [`object_store`] backend | [(README)][object_store_opendal-readme] |

[`datafusion`]: https://crates.io/crates/datafusion
[`ballista`]: https://crates.io/crates/ballista
[`object_store_opendal`]: https://crates.io/crates/object_store_opendal
[`opendal`]: https://crates.io/crates/opendal
[object_store_opendal-readme]: https://github.com/apache/opendal/blob/main/integrations/object_store/README.md

Collectively, these crates support a wider array of functionality for analytic computations in Rust.

4 changes: 4 additions & 0 deletions arrow-array/Cargo.toml
@@ -66,3 +66,7 @@ harness = false
[[bench]]
name = "gc_view_types"
harness = false

[[bench]]
name = "fixed_size_list_array"
harness = false
51 changes: 51 additions & 0 deletions arrow-array/benches/fixed_size_list_array.rs
@@ -0,0 +1,51 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow_array::{Array, FixedSizeListArray, Int32Array};
use arrow_schema::Field;
use criterion::*;
use rand::{thread_rng, Rng};
use std::sync::Arc;

fn gen_fsl(len: usize, value_len: usize) -> FixedSizeListArray {
    let mut rng = thread_rng();
    let values = Arc::new(Int32Array::from(
        (0..len).map(|_| rng.gen::<i32>()).collect::<Vec<_>>(),
    ));
    let field = Arc::new(Field::new("item", values.data_type().clone(), true));
    FixedSizeListArray::new(field, value_len as i32, values, None)
}

fn criterion_benchmark(c: &mut Criterion) {
    let len = 4096;
    for value_len in [1, 32, 1024] {
        let fsl = gen_fsl(len, value_len);
        c.bench_function(
            &format!("fixed_size_list_array(len: {len}, value_len: {value_len})"),
            |b| {
                b.iter(|| {
                    for i in 0..len / value_len {
                        black_box(fsl.value(i));
                    }
                });
            },
        );
    }
}

criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);
54 changes: 51 additions & 3 deletions arrow-array/src/array/byte_view_array.rs
@@ -16,19 +16,22 @@
// under the License.

use crate::array::print_long_array;
use crate::builder::GenericByteViewBuilder;
use crate::builder::{ArrayBuilder, GenericByteViewBuilder};
use crate::iterator::ArrayIter;
use crate::types::bytes::ByteArrayNativeType;
use crate::types::{BinaryViewType, ByteViewType, StringViewType};
use crate::{Array, ArrayAccessor, ArrayRef, Scalar};
use arrow_buffer::{Buffer, NullBuffer, ScalarBuffer};
use crate::{Array, ArrayAccessor, ArrayRef, GenericByteArray, OffsetSizeTrait, Scalar};
use arrow_buffer::{ArrowNativeType, Buffer, NullBuffer, ScalarBuffer};
use arrow_data::{ArrayData, ArrayDataBuilder, ByteView};
use arrow_schema::{ArrowError, DataType};
use num::ToPrimitive;
use std::any::Any;
use std::fmt::Debug;
use std::marker::PhantomData;
use std::sync::Arc;

use super::ByteArrayType;

/// [Variable-size Binary View Layout]: An array of variable length bytes view arrays.
///
/// Different than [`crate::GenericByteArray`] as it stores both an offset and length
@@ -429,6 +432,51 @@ impl<T: ByteViewType + ?Sized> From<ArrayData> for GenericByteViewArray<T> {
}
}

/// Convert a [`GenericByteArray`] to a [`GenericByteViewArray`], reusing the underlying buffer where possible:
/// if all offsets are less than `u32::MAX`, the view array is built directly on top of the existing values buffer.
impl<FROM, V> From<&GenericByteArray<FROM>> for GenericByteViewArray<V>
where
    FROM: ByteArrayType,
    FROM::Offset: OffsetSizeTrait + ToPrimitive,
    V: ByteViewType<Native = FROM::Native>,
{
    fn from(byte_array: &GenericByteArray<FROM>) -> Self {
        let offsets = byte_array.offsets();

        let can_reuse_buffer = match offsets.last() {
            Some(offset) => offset.as_usize() < u32::MAX as usize,
            None => true,
        };

        if can_reuse_buffer {
            let len = byte_array.len();
            let mut views_builder = GenericByteViewBuilder::<V>::with_capacity(len);
            let str_values_buf = byte_array.values().clone();
            let block = views_builder.append_block(str_values_buf);
            for (i, w) in offsets.windows(2).enumerate() {
                let offset = w[0].as_usize();
                let end = w[1].as_usize();
                let length = end - offset;

                if byte_array.is_null(i) {
                    views_builder.append_null();
                } else {
                    // Safety: the input was a valid array, so it is valid UTF-8 (if string),
                    // and all offsets were valid
                    unsafe {
                        views_builder.append_view_unchecked(block, offset as u32, length as u32)
                    }
                }
            }
            assert_eq!(views_builder.len(), len);
            views_builder.finish()
        } else {
            // TODO: the first u32::MAX can still be reused
            GenericByteViewArray::<V>::from_iter(byte_array.iter())
        }
    }
}

impl<T: ByteViewType + ?Sized> From<GenericByteViewArray<T>> for ArrayData {
fn from(mut array: GenericByteViewArray<T>) -> Self {
let len = array.len();
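
With the `From<&GenericByteArray>` impl above, a `StringArray` (or `BinaryArray`) can be converted into its view counterpart, reusing the existing values buffer whenever the final offset fits in `u32`. A minimal usage sketch, not part of this commit; it only uses `StringArray`/`StringViewArray` from `arrow_array` plus the conversion added here:

use arrow_array::{Array, StringArray, StringViewArray};

fn main() {
    // Build a plain StringArray and convert it to a StringViewArray.
    // Because the last offset is well below u32::MAX, the values buffer is
    // appended as a single block and reused rather than copied.
    let strings = StringArray::from(vec![Some("hello"), None, Some("arrow-rs")]);
    let views = StringViewArray::from(&strings);

    assert_eq!(views.len(), 3);
    assert!(views.is_null(1));
    assert_eq!(views.value(2), "arrow-rs");
}
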
8 changes: 4 additions & 4 deletions arrow-array/src/array/fixed_size_list_array.rs
@@ -245,15 +245,15 @@ impl FixedSizeListArray {
/// Returns the ith value of this list array.
pub fn value(&self, i: usize) -> ArrayRef {
self.values
.slice(self.value_offset(i) as usize, self.value_length() as usize)
.slice(self.value_offset_at(i), self.value_length() as usize)
}

/// Returns the offset for value at index `i`.
///
/// Note this doesn't do any bound checking, for performance reasons.
#[inline]
pub fn value_offset(&self, i: usize) -> i32 {
self.value_offset_at(i)
self.value_offset_at(i) as i32
}

/// Returns the length for an element.
@@ -265,8 +265,8 @@ }
}

#[inline]
const fn value_offset_at(&self, i: usize) -> i32 {
i as i32 * self.value_length
const fn value_offset_at(&self, i: usize) -> usize {
i * self.value_length as usize
}

/// Returns a zero-copy slice of this array with the indicated offset and length.
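
For context, a short sketch (not part of this commit) of the `FixedSizeListArray::value` call that the new `fixed_size_list_array` benchmark above exercises; `FixedSizeListArray::new` is used exactly as in the benchmark:

use arrow_array::{Array, FixedSizeListArray, Int32Array};
use arrow_schema::{DataType, Field};
use std::sync::Arc;

fn main() {
    // Two lists of four Int32 values each; `value(i)` returns the i-th list
    // as a zero-copy slice of the child values array.
    let values = Int32Array::from((0..8).collect::<Vec<i32>>());
    let field = Arc::new(Field::new("item", DataType::Int32, true));
    let list = FixedSizeListArray::new(field, 4, Arc::new(values), None);

    assert_eq!(list.len(), 2);
    assert_eq!(list.value(1).len(), 4);
    assert_eq!(list.value_offset(1), 4);
}
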
56 changes: 56 additions & 0 deletions arrow-array/src/ffi.rs
@@ -1225,6 +1225,7 @@ mod tests_from_ffi {
use std::sync::Arc;

use arrow_buffer::{bit_util, buffer::Buffer, MutableBuffer, OffsetBuffer};
use arrow_data::transform::MutableArrayData;
use arrow_data::ArrayData;
use arrow_schema::{DataType, Field};

@@ -1234,6 +1235,7 @@
Int32Array, Int64Array, StringArray, StructArray, UInt32Array, UInt64Array,
},
ffi::{from_ffi, FFI_ArrowArray, FFI_ArrowSchema},
make_array, ArrayRef,
};

use super::{ImportedArrowArray, Result};
@@ -1458,4 +1460,58 @@

        test_round_trip(&imported_array.consume()?)
    }

    fn roundtrip_string_array(array: StringArray) -> StringArray {
        let data = array.into_data();

        let array = FFI_ArrowArray::new(&data);
        let schema = FFI_ArrowSchema::try_from(data.data_type()).unwrap();

        let array = unsafe { from_ffi(array, &schema) }.unwrap();
        StringArray::from(array)
    }

    fn extend_array(array: &dyn Array) -> ArrayRef {
        let len = array.len();
        let data = array.to_data();

        let mut mutable = MutableArrayData::new(vec![&data], false, len);
        mutable.extend(0, 0, len);
        make_array(mutable.freeze())
    }

    #[test]
    fn test_extend_imported_string_slice() {
        let mut strings = vec![];

        for i in 0..1000 {
            strings.push(format!("string: {}", i));
        }

        let string_array = StringArray::from(strings);

        let imported = roundtrip_string_array(string_array.clone());
        assert_eq!(imported.len(), 1000);
        assert_eq!(imported.value(0), "string: 0");
        assert_eq!(imported.value(499), "string: 499");

        let copied = extend_array(&imported);
        assert_eq!(
            copied.as_any().downcast_ref::<StringArray>().unwrap(),
            &imported
        );

        let slice = string_array.slice(500, 500);

        let imported = roundtrip_string_array(slice);
        assert_eq!(imported.len(), 500);
        assert_eq!(imported.value(0), "string: 500");
        assert_eq!(imported.value(499), "string: 999");

        let copied = extend_array(&imported);
        assert_eq!(
            copied.as_any().downcast_ref::<StringArray>().unwrap(),
            &imported
        );
    }
}
13 changes: 13 additions & 0 deletions arrow-buffer/src/buffer/immutable.rs
@@ -71,6 +71,19 @@ impl Buffer {
}
}

    /// Returns the offset, in bytes, of `Self::ptr` from the start of `Self::data`
    ///
    /// `self.ptr` and `self.data` can differ after slicing or advancing the buffer.
    pub fn ptr_offset(&self) -> usize {
        // Safety: `ptr` is always in bounds of `data`.
        unsafe { self.ptr.offset_from(self.data.ptr().as_ptr()) as usize }
    }

    /// Returns the pointer to the start of the buffer without the offset.
    pub fn data_ptr(&self) -> NonNull<u8> {
        self.data.ptr()
    }

/// Create a [`Buffer`] from the provided [`Vec`] without copying
#[inline]
pub fn from_vec<T: ArrowNativeType>(vec: Vec<T>) -> Self {
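
A short sketch (not part of this commit) of what the two new accessors report; `Buffer::from_vec` and `Buffer::slice` are existing APIs (`from_vec` appears in the context lines above):

use arrow_buffer::Buffer;

fn main() {
    // Slicing advances the read pointer but keeps the same underlying allocation,
    // so `ptr_offset` reports how far into the allocation the slice starts and
    // `data_ptr` still points at the start of that allocation.
    let buffer = Buffer::from_vec(vec![0u8, 1, 2, 3, 4, 5, 6, 7]);
    assert_eq!(buffer.ptr_offset(), 0);

    let sliced = buffer.slice(3);
    assert_eq!(sliced.ptr_offset(), 3);
    assert_eq!(sliced.data_ptr(), buffer.data_ptr());
}
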
74 changes: 74 additions & 0 deletions arrow-cast/src/cast/map.rs
@@ -0,0 +1,74 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::cast::*;

/// Helper function that takes a map container and casts the inner datatype.
pub(crate) fn cast_map_values(
    from: &MapArray,
    to_data_type: &DataType,
    cast_options: &CastOptions,
    to_ordered: bool,
) -> Result<ArrayRef, ArrowError> {
    let entries_field = if let DataType::Map(entries_field, _) = to_data_type {
        entries_field
    } else {
        return Err(ArrowError::CastError(
            "Internal Error: to_data_type is not a map type.".to_string(),
        ));
    };

    let key_field = key_field(entries_field).ok_or(ArrowError::CastError(
        "map is missing key field".to_string(),
    ))?;
    let value_field = value_field(entries_field).ok_or(ArrowError::CastError(
        "map is missing value field".to_string(),
    ))?;

    let key_array = cast_with_options(from.keys(), key_field.data_type(), cast_options)?;
    let value_array = cast_with_options(from.values(), value_field.data_type(), cast_options)?;

    Ok(Arc::new(MapArray::new(
        entries_field.clone(),
        from.offsets().clone(),
        StructArray::new(
            Fields::from(vec![key_field, value_field]),
            vec![key_array, value_array],
            from.entries().nulls().cloned(),
        ),
        from.nulls().cloned(),
        to_ordered,
    )))
}

/// Gets the key field from the entries of a map. For all other types returns None.
pub(crate) fn key_field(entries_field: &FieldRef) -> Option<FieldRef> {
    if let DataType::Struct(fields) = entries_field.data_type() {
        fields.first().cloned()
    } else {
        None
    }
}

/// Gets the value field from the entries of a map. For all other types returns None.
pub(crate) fn value_field(entries_field: &FieldRef) -> Option<FieldRef> {
    if let DataType::Struct(fields) = entries_field.data_type() {
        fields.get(1).cloned()
    } else {
        None
    }
}
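
`cast_map_values` is crate-private, so callers presumably reach it through `arrow_cast::cast` with a `Map` target type (the call site is in one of the files not shown above). A hedged sketch, not part of this commit; the builder defaults and field setup are assumptions and may need adjustment. It widens a map's value type from Int32 to Int64 while keeping Utf8 keys:

use std::sync::Arc;
use arrow_array::builder::{Int32Builder, MapBuilder, StringBuilder};
use arrow_array::Array;
use arrow_cast::cast;
use arrow_schema::{DataType, Field, Fields};

fn main() {
    // Build a one-row map {"a": 1} with Utf8 keys and Int32 values, relying on
    // MapBuilder's default child field names ("entries", "keys", "values").
    let mut builder = MapBuilder::new(None, StringBuilder::new(), Int32Builder::new());
    builder.keys().append_value("a");
    builder.values().append_value(1);
    builder.append(true).unwrap();
    let map = builder.finish();

    // Target type: same layout, but the value child is Int64.
    let entries = Field::new(
        "entries",
        DataType::Struct(Fields::from(vec![
            Field::new("keys", DataType::Utf8, false),
            Field::new("values", DataType::Int64, true),
        ])),
        false,
    );
    let casted = cast(&map, &DataType::Map(Arc::new(entries), false)).unwrap();
    assert_eq!(casted.len(), 1);
}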