Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Added support for Extension (logical) type (#359)
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao authored Aug 31, 2021
1 parent 145b7c8 commit d2df4a5
Show file tree
Hide file tree
Showing 27 changed files with 478 additions and 334 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ venv/bin/python parquet_integration/write_parquet.py
* `MutableArray` API to work in-memory in-place.
* faster IPC reader (different design that avoids an extra copy of all data)
* IPC supports 2.0 (compression)
* Extension type supported
* All implemented arrow types pass FFI integration tests against pyarrow / C++
* All implemented arrow types pass IPC integration tests against other implementations

Expand All @@ -83,7 +84,7 @@ venv/bin/python parquet_integration/write_parquet.py
## Features in the original not available in this crate

* Parquet read and write of struct and nested lists.
* Map types
* Map type

## Features in this crate not in pyarrow

Expand Down
52 changes: 52 additions & 0 deletions examples/extension.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
use std::io::{Cursor, Seek, Write};
use std::sync::Arc;

use arrow2::array::*;
use arrow2::datatypes::*;
use arrow2::error::Result;
use arrow2::io::ipc::read;
use arrow2::io::ipc::write;
use arrow2::record_batch::RecordBatch;

/// Demonstrates declaring an extension type, attaching it to an array, and
/// checking that the type is preserved by a write/read round trip through IPC.
fn main() -> Result<()> {
    // The custom logical type: named "date16" and wrapping `DataType::UInt16`.
    let extension_type =
        DataType::Extension(String::from("date16"), Box::new(DataType::UInt16), None);

    // Build a plain `UInt16Array` first, then re-tag it with the extension type.
    let values = UInt16Array::from_slice([1, 2]);
    let array = values.to(extension_type.clone());

    // From this point on the array is handled like any other:
    // serialize it to IPC into an in-memory buffer ...
    let mut buffer = Cursor::new(vec![]);
    write_ipc(&mut buffer, array)?;

    // ... and deserialize it again from the bytes that were written.
    let bytes = buffer.into_inner();
    let batch = read_ipc(&bytes)?;

    // The first column must still report the extension type.
    let roundtripped = &batch.columns()[0];
    assert_eq!(roundtripped.data_type(), &extension_type);

    // see https://arrow.apache.org/docs/format/Columnar.html#extension-types
    // for consuming by other consumers.
    Ok(())
}

/// Writes `array` to `writer` through an IPC `FileWriter`, as a record batch
/// with a single non-nullable field named "a".
///
/// Returns the result of writing the batch; schema, writer and batch
/// construction errors are propagated with `?`.
// NOTE(review): the writer is never explicitly finished here; presumably
// `FileWriter` finalizes the file when dropped — confirm against its implementation.
fn write_ipc<W: Write + Seek>(writer: &mut W, array: impl Array + 'static) -> Result<()> {
    // The schema is derived from the array itself so the field carries the
    // array's exact data type.
    let field = Field::new("a", array.data_type().clone(), false);
    let schema = Schema::new(vec![field]);

    let mut file_writer = write::FileWriter::try_new(writer, &schema)?;

    let column: Arc<dyn Array> = Arc::new(array);
    let batch = RecordBatch::try_new(Arc::new(schema), vec![column])?;

    file_writer.write(&batch)
}

/// Reads the first record batch from the IPC file bytes in `reader`.
///
/// Errors from reading the file metadata or the batch are returned.
///
/// # Panics
///
/// Panics if the file reader yields no batch at all.
fn read_ipc(reader: &[u8]) -> Result<RecordBatch> {
    let mut cursor = Cursor::new(reader);
    let metadata = read::read_file_metadata(&mut cursor)?;
    let mut batches = read::FileReader::new(&mut cursor, metadata, None);
    let first = batches.next();
    first.unwrap()
}
1 change: 1 addition & 0 deletions guide/src/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
- [Compute](./compute.md)
- [Metadata](./metadata.md)
- [Foreign interfaces](./ffi.md)
- [Extension](./extension.md)
- [IO](./io/README.md)
- [Read CSV](./io/csv_reader.md)
- [Write CSV](./io/csv_write.md)
Expand Down
8 changes: 8 additions & 0 deletions guide/src/extension.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Extension types

This crate supports Arrow's ["extension type"](https://arrow.apache.org/docs/format/Columnar.html#extension-types), to declare, use, and share custom logical types.
The following example shows how to declare one:

```rust
{{#include ../../examples/extension.rs}}
```
30 changes: 12 additions & 18 deletions src/array/dictionary/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,25 +43,19 @@ pub struct DictionaryArray<K: DictionaryKey> {
impl<K: DictionaryKey> DictionaryArray<K> {
/// Returns a new empty [`DictionaryArray`].
pub fn new_empty(data_type: DataType) -> Self {
if let DataType::Dictionary(_, values) = data_type {
let values = new_empty_array(values.as_ref().clone()).into();
Self::from_data(PrimitiveArray::<K>::new_empty(K::DATA_TYPE), values)
} else {
panic!("DictionaryArray must be initialized with DataType::Dictionary");
}
let values = Self::get_child(&data_type);
let values = new_empty_array(values.clone()).into();
Self::from_data(PrimitiveArray::<K>::new_empty(K::DATA_TYPE), values)
}

/// Returns an [`DictionaryArray`] whose all elements are null
#[inline]
pub fn new_null(data_type: DataType, length: usize) -> Self {
if let DataType::Dictionary(_, values) = data_type {
Self::from_data(
PrimitiveArray::<K>::new_null(K::DATA_TYPE, length),
new_empty_array(values.as_ref().clone()).into(),
)
} else {
panic!("DictionaryArray must be initialized with DataType::Dictionary");
}
let values = Self::get_child(&data_type);
Self::from_data(
PrimitiveArray::<K>::new_null(K::DATA_TYPE, length),
new_empty_array(values.clone()).into(),
)
}

/// The canonical method to create a new [`DictionaryArray`].
Expand Down Expand Up @@ -112,10 +106,10 @@ impl<K: DictionaryKey> DictionaryArray<K> {

impl<K: DictionaryKey> DictionaryArray<K> {
pub(crate) fn get_child(data_type: &DataType) -> &DataType {
if let DataType::Dictionary(_, values) = data_type {
values.as_ref()
} else {
panic!("Wrong DataType")
match data_type {
DataType::Dictionary(_, values) => values.as_ref(),
DataType::Extension(_, inner, _) => Self::get_child(inner),
_ => panic!("DictionaryArray must be initialized with DataType::Dictionary"),
}
}
}
Expand Down
1 change: 1 addition & 0 deletions src/array/display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ pub fn get_value_display<'a>(array: &'a dyn Array) -> Box<dyn Fn(usize) -> Strin
displays[field](index)
})
}
Extension(_, _, _) => todo!(),
}
}

Expand Down
1 change: 1 addition & 0 deletions src/array/fixed_size_binary/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ impl FixedSizeBinaryArray {
/// Returns the size stored in a `DataType::FixedSizeBinary`, looking through
/// any number of `DataType::Extension` wrappers.
///
/// # Panics
///
/// Panics if the (unwrapped) data type is not `DataType::FixedSizeBinary`.
pub(crate) fn get_size(data_type: &DataType) -> &i32 {
    let mut current = data_type;
    loop {
        match current {
            // An extension delegates to the type it wraps.
            DataType::Extension(_, inner, _) => current = inner.as_ref(),
            DataType::FixedSizeBinary(size) => return size,
            _ => panic!("Wrong DataType"),
        }
    }
}
Expand Down
7 changes: 4 additions & 3 deletions src/array/fixed_size_binary/mutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,11 @@ impl From<MutableFixedSizeBinaryArray> for FixedSizeBinaryArray {
impl MutableFixedSizeBinaryArray {
/// Canonical method to create a new [`MutableFixedSizeBinaryArray`].
pub fn from_data(
size: usize,
data_type: DataType,
values: MutableBuffer<u8>,
validity: Option<MutableBitmap>,
) -> Self {
let size = *FixedSizeBinaryArray::get_size(&data_type) as usize;
assert_eq!(
values.len() % size,
0,
Expand All @@ -52,7 +53,7 @@ impl MutableFixedSizeBinaryArray {
);
}
Self {
data_type: DataType::FixedSizeBinary(size as i32),
data_type,
size,
values,
validity,
Expand All @@ -67,7 +68,7 @@ impl MutableFixedSizeBinaryArray {
/// Creates a new [`MutableFixedSizeBinaryArray`] with capacity for `capacity` entries.
pub fn with_capacity(size: usize, capacity: usize) -> Self {
Self::from_data(
size,
DataType::FixedSizeBinary(size as i32),
MutableBuffer::<u8>::with_capacity(capacity * size),
None,
)
Expand Down
2 changes: 1 addition & 1 deletion src/array/fixed_size_list/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,12 @@ impl FixedSizeListArray {
/// Returns the child field and the size stored in a `DataType::FixedSizeList`,
/// looking through any number of `DataType::Extension` wrappers.
///
/// # Panics
///
/// Panics if the (unwrapped) data type is not `DataType::FixedSizeList`.
pub(crate) fn get_child_and_size(data_type: &DataType) -> (&Field, &i32) {
    let mut current = data_type;
    loop {
        match current {
            // An extension delegates to the type it wraps.
            DataType::Extension(_, inner, _) => current = inner.as_ref(),
            DataType::FixedSizeList(child, size) => return (child.as_ref(), size),
            _ => panic!("Wrong DataType"),
        }
    }
}

/// Returns a [`DataType`] consistent with this Array.
#[inline]
pub fn default_datatype(data_type: DataType, size: usize) -> DataType {
let field = Box::new(Field::new("item", data_type, true));
DataType::FixedSizeList(field, size as i32)
Expand Down
5 changes: 2 additions & 3 deletions src/array/list/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@ impl<O: Offset> ListArray<O> {
}

impl<O: Offset> ListArray<O> {
#[inline]
pub fn default_datatype(data_type: DataType) -> DataType {
let field = Box::new(Field::new("item", data_type, true));
if O::is_large() {
Expand All @@ -130,22 +129,22 @@ impl<O: Offset> ListArray<O> {
}
}

#[inline]
/// Returns the child field of the list data type matching this offset type:
/// `DataType::LargeList` when `O::is_large()`, `DataType::List` otherwise.
/// `DataType::Extension` wrappers are looked through.
///
/// # Panics
///
/// Panics if the (unwrapped) data type is not the list variant expected for `O`.
pub fn get_child_field(data_type: &DataType) -> &Field {
    let mut current = data_type;
    loop {
        match current {
            // An extension delegates to the type it wraps.
            DataType::Extension(_, inner, _) => current = inner.as_ref(),
            DataType::LargeList(child) if O::is_large() => return child.as_ref(),
            DataType::List(child) if !O::is_large() => return child.as_ref(),
            _ => panic!("Wrong DataType"),
        }
    }
}

#[inline]
/// Returns the data type of the child field of `data_type`, as resolved by
/// [`Self::get_child_field`] (and panicking whenever it does).
pub fn get_child_type(data_type: &DataType) -> &DataType {
    let child = Self::get_child_field(data_type);
    child.data_type()
}
Expand Down
33 changes: 15 additions & 18 deletions src/array/struct_.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,20 +70,17 @@ impl StructArray {
values: Vec<Arc<dyn Array>>,
validity: Option<Bitmap>,
) -> Self {
if let DataType::Struct(fields) = &data_type {
assert!(!fields.is_empty());
assert_eq!(fields.len(), values.len());
assert!(values.iter().all(|x| x.len() == values[0].len()));
if let Some(ref validity) = validity {
assert_eq!(values[0].len(), validity.len());
}
Self {
data_type,
values,
validity,
}
} else {
panic!("StructArray must be initialized with DataType::Struct");
let fields = Self::get_fields(&data_type);
assert!(!fields.is_empty());
assert_eq!(fields.len(), values.len());
assert!(values.iter().all(|x| x.len() == values[0].len()));
if let Some(ref validity) = validity {
assert_eq!(values[0].len(), validity.len());
}
Self {
data_type,
values,
validity,
}
}

Expand Down Expand Up @@ -134,10 +131,10 @@ impl StructArray {
impl StructArray {
/// Returns the fields the `DataType::Struct`.
pub fn get_fields(data_type: &DataType) -> &[Field] {
if let DataType::Struct(fields) = data_type {
fields
} else {
panic!("Wrong datatype passed to Struct.")
match data_type {
DataType::Struct(fields) => fields,
DataType::Extension(_, inner, _) => Self::get_fields(inner),
_ => panic!("Wrong datatype passed to Struct."),
}
}
}
Expand Down
71 changes: 35 additions & 36 deletions src/array/union/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,33 +89,30 @@ impl UnionArray {
fields: Vec<Arc<dyn Array>>,
offsets: Option<Buffer<i32>>,
) -> Self {
let fields_hash = if let DataType::Union(f, ids, is_sparse) = &data_type {
if f.len() != fields.len() {
panic!(
"The number of `fields` must equal the number of fields in the Union DataType"
)
};
let same_data_types = f
.iter()
.zip(fields.iter())
.all(|(f, array)| f.data_type() == array.data_type());
if !same_data_types {
panic!("All fields' datatype in the union must equal the datatypes on the fields.")
}
if offsets.is_none() != *is_sparse {
panic!("Sparsness flag must equal to noness of offsets in UnionArray")
}
ids.as_ref().map(|ids| {
ids.iter()
.map(|x| *x as i8)
.enumerate()
.zip(fields.iter().cloned())
.map(|((i, type_), field)| (type_, (i, field)))
.collect()
})
} else {
panic!("Union struct must be created with the corresponding Union DataType")
let (f, ids, is_sparse) = Self::get_all(&data_type);

if f.len() != fields.len() {
panic!("The number of `fields` must equal the number of fields in the Union DataType")
};
let same_data_types = f
.iter()
.zip(fields.iter())
.all(|(f, array)| f.data_type() == array.data_type());
if !same_data_types {
panic!("All fields' datatype in the union must equal the datatypes on the fields.")
}
if offsets.is_none() != is_sparse {
panic!("Sparsness flag must equal to noness of offsets in UnionArray")
}
let fields_hash = ids.as_ref().map(|ids| {
ids.iter()
.map(|x| *x as i8)
.enumerate()
.zip(fields.iter().cloned())
.map(|((i, type_), field)| (type_, (i, field)))
.collect()
});

// not validated:
// * `offsets` is valid
// * max id < fields.len()
Expand Down Expand Up @@ -218,20 +215,22 @@ impl Array for UnionArray {
}

impl UnionArray {
pub fn get_fields(data_type: &DataType) -> &[Field] {
if let DataType::Union(fields, _, _) = data_type {
fields
} else {
panic!("Wrong datatype passed to UnionArray.")
/// Returns the three components of a `DataType::Union` — its fields, its
/// optional ids, and its sparseness flag — looking through any number of
/// `DataType::Extension` wrappers.
///
/// # Panics
///
/// Panics if the (unwrapped) data type is not `DataType::Union`.
fn get_all(data_type: &DataType) -> (&[Field], Option<&[i32]>, bool) {
    let mut current = data_type;
    loop {
        match current {
            // An extension delegates to the type it wraps.
            DataType::Extension(_, inner, _) => current = inner.as_ref(),
            DataType::Union(fields, ids, is_sparse) => {
                let type_ids: Option<&[i32]> = ids.as_ref().map(|x| x.as_ref());
                return (fields, type_ids, *is_sparse);
            }
            _ => panic!("Wrong datatype passed to UnionArray."),
        }
    }
}

/// Returns the fields of the union described by `data_type`, as resolved by
/// `Self::get_all` (and panicking whenever it does).
pub fn get_fields(data_type: &DataType) -> &[Field] {
    let (fields, _, _) = Self::get_all(data_type);
    fields
}

pub fn is_sparse(data_type: &DataType) -> bool {
if let DataType::Union(_, _, is_sparse) = data_type {
*is_sparse
} else {
panic!("Wrong datatype passed to UnionArray.")
}
Self::get_all(data_type).2
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/datatypes/field.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use super::DataType;

/// A logical [`DataType`] and its associated metadata per
/// [Arrow specification](https://arrow.apache.org/docs/cpp/api/datatype.html)
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Field {
/// Its name
pub name: String,
Expand Down Expand Up @@ -255,6 +255,7 @@ impl Field {
| DataType::FixedSizeBinary(_)
| DataType::Utf8
| DataType::LargeUtf8
| DataType::Extension(_, _, _)
| DataType::Decimal(_, _) => {
if self.data_type != from.data_type {
return Err(ArrowError::Schema(
Expand Down
Loading

0 comments on commit d2df4a5

Please sign in to comment.