Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Simpler FFI
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Jun 25, 2022
1 parent 88f05bb commit 69a9f9b
Show file tree
Hide file tree
Showing 9 changed files with 85 additions and 123 deletions.
9 changes: 4 additions & 5 deletions arrow-pyarrow-integration-testing/src/c_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub fn to_rust_iterator(ob: PyObject, py: Python) -> PyResult<Vec<PyObject>> {

let mut arrays = vec![];
while let Some(array) = unsafe { iter.next() } {
let py_array = to_py_array(array.unwrap().into(), py)?;
let py_array = to_py_array(array.map_err(PyO3Error::from)?, py)?;
arrays.push(py_array)
}
Ok(arrays)
Expand All @@ -38,24 +38,23 @@ pub fn from_rust_iterator(py: Python) -> PyResult<PyObject> {
None,
)
.boxed();

// and a field with its datatype
let field = Field::new("a", array.data_type().clone(), true);

// create an iterator of arrays
let arrays = vec![array.clone(), array.clone(), array];
let iter = Box::new(arrays.clone().into_iter().map(Ok)) as _;
let iter = Box::new(arrays.into_iter().map(Ok)) as _;

// create an [`ArrowArrayStream`] based on this iterator and field
let mut stream = Box::new(ffi::ArrowArrayStream::empty());
unsafe { ffi::export_iterator(iter, field, &mut *stream) };
let stream = Box::new(ffi::export_iterator(iter, field));

// call pyarrow's interface to read this stream
let pa = py.import("pyarrow.ipc")?;
let py_stream = pa.getattr("RecordBatchReader")?.call_method1(
"_import_from_c",
((&*stream as *const ffi::ArrowArrayStream) as Py_uintptr_t,),
)?;
Box::leak(stream); // this is not ideal => the struct should be allocated by pyarrow

Ok(py_stream.to_object(py))
}
36 changes: 12 additions & 24 deletions arrow-pyarrow-integration-testing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,22 +67,21 @@ fn to_rust_array(ob: PyObject, py: Python) -> PyResult<Box<dyn Array>> {

let field = unsafe { ffi::import_field_from_c(schema.as_ref()).map_err(PyO3Error::from)? };
let array =
unsafe { ffi::import_array_from_c(array, field.data_type).map_err(PyO3Error::from)? };
unsafe { ffi::import_array_from_c(*array, field.data_type).map_err(PyO3Error::from)? };

Ok(array.into())
Ok(array)
}

fn to_py_array(array: Box<dyn Array>, py: Python) -> PyResult<PyObject> {
let array_ptr = Box::new(ffi::ArrowArray::empty());
let schema_ptr = Box::new(ffi::ArrowSchema::empty());
let schema = Box::new(ffi::export_field_to_c(&Field::new(
"",
array.data_type().clone(),
true,
)));
let array = Box::new(ffi::export_array_to_c(array));

let array_ptr = Box::into_raw(array_ptr);
let schema_ptr = Box::into_raw(schema_ptr);

unsafe {
ffi::export_field_to_c(&Field::new("", array.data_type().clone(), true), schema_ptr);
ffi::export_array_to_c(array, array_ptr);
};
let schema_ptr: *const arrow2::ffi::ArrowSchema = &*schema;
let array_ptr: *const arrow2::ffi::ArrowArray = &*array;

let pa = py.import("pyarrow")?;

Expand All @@ -91,11 +90,6 @@ fn to_py_array(array: Box<dyn Array>, py: Python) -> PyResult<PyObject> {
(array_ptr as Py_uintptr_t, schema_ptr as Py_uintptr_t),
)?;

unsafe {
Box::from_raw(array_ptr);
Box::from_raw(schema_ptr);
};

Ok(array.to_object(py))
}

Expand All @@ -115,21 +109,15 @@ fn to_rust_field(ob: PyObject, py: Python) -> PyResult<Field> {
}

fn to_py_field(field: &Field, py: Python) -> PyResult<PyObject> {
let schema_ptr = Box::new(ffi::ArrowSchema::empty());
let schema_ptr = Box::into_raw(schema_ptr);

unsafe {
ffi::export_field_to_c(field, schema_ptr);
};
let schema = Box::new(ffi::export_field_to_c(field));
let schema_ptr: *const arrow2::ffi::ArrowSchema = &*schema;

let pa = py.import("pyarrow")?;

let array = pa
.getattr("Field")?
.call_method1("_import_from_c", (schema_ptr as Py_uintptr_t,))?;

unsafe { Box::from_raw(schema_ptr) };

Ok(array.to_object(py))
}

Expand Down
34 changes: 15 additions & 19 deletions examples/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,19 @@ use arrow2::datatypes::Field;
use arrow2::error::Result;
use arrow2::ffi;

unsafe fn export(
array: Box<dyn Array>,
array_ptr: *mut ffi::ArrowArray,
schema_ptr: *mut ffi::ArrowSchema,
) {
// exporting an array requires an associated field so that the consumer knows its datatype
fn export(array: Box<dyn Array>) -> (ffi::ArrowArray, ffi::ArrowSchema) {
// importing an array requires an associated field so that the consumer knows its datatype.
// Thus, we need to export both
let field = Field::new("a", array.data_type().clone(), true);
ffi::export_array_to_c(array, array_ptr);
ffi::export_field_to_c(&field, schema_ptr);
(
ffi::export_array_to_c(array),
ffi::export_field_to_c(&field),
)
}

unsafe fn import(array: Box<ffi::ArrowArray>, schema: &ffi::ArrowSchema) -> Result<Box<dyn Array>> {
/// # Safety
/// `ArrowArray` and `ArrowSchema` must be valid
unsafe fn import(array: ffi::ArrowArray, schema: &ffi::ArrowSchema) -> Result<Box<dyn Array>> {
let field = ffi::import_field_from_c(schema)?;
ffi::import_array_from_c(array, field.data_type)
}
Expand All @@ -23,19 +24,14 @@ fn main() -> Result<()> {
// let's assume that we have an array:
let array = PrimitiveArray::<i32>::from([Some(1), None, Some(123)]).boxed();

// the goal is to export this array and import it back via FFI.
// to import, we initialize the structs that will receive the data
let mut array_ptr = Box::new(ffi::ArrowArray::empty());
let mut schema_ptr = Box::new(ffi::ArrowSchema::empty());
// here we export - `array_ffi` and `schema_ffi` are the structs of the C data interface
let (array_ffi, schema_ffi) = export(array.clone());

// this is where a producer (in this case also us ^_^) writes to the pointers' location.
// `array` here could be anything or not even be available, if this was e.g. from Python.
// Safety: we just allocated the pointers
unsafe { export(array.clone(), &mut *array_ptr, &mut *schema_ptr) };
// here we import them. Often the structs are wrapped in a pointer. In that case you
// need to read the pointer to the stack.

// and finally interpret the written memory into a new array.
// Safety: we used `export`, which is a valid exporter to the C data interface
let new_array = unsafe { import(array_ptr, schema_ptr.as_ref())? };
let new_array = unsafe { import(array_ffi, &schema_ffi)? };

// which is equal to the exported array
assert_eq!(array.as_ref(), new_array.as_ref());
Expand Down
6 changes: 3 additions & 3 deletions src/buffer/bytes.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
use std::mem::ManuallyDrop;
use std::ops::{Deref, DerefMut};
use std::panic::RefUnwindSafe;
use std::{ptr::NonNull, sync::Arc};
use std::ptr::NonNull;

/// Mode of deallocating memory regions
enum Allocation {
/// Native allocation
Native,
// A foreign allocator and its ref count
Foreign(Arc<dyn RefUnwindSafe + Send + Sync>),
Foreign(Box<dyn RefUnwindSafe + Send + Sync>),
}

/// A continuous memory region that may be allocated externally.
Expand All @@ -35,7 +35,7 @@ impl<T> Bytes<T> {
pub unsafe fn from_owned(
ptr: std::ptr::NonNull<T>,
len: usize,
owner: Arc<dyn RefUnwindSafe + Send + Sync>,
owner: Box<dyn RefUnwindSafe + Send + Sync>,
) -> Self {
// This line is technically outside the assumptions of `Vec::from_raw_parts`, since
// `ptr` was not allocated by `Vec`. However, one of the invariants of this struct
Expand Down
42 changes: 24 additions & 18 deletions src/ffi/array.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! Contains functionality to load an ArrayData from the C Data Interface
use std::{ptr::NonNull, sync::Arc};
use std::ptr::NonNull;
use std::sync::Arc;

use crate::{
array::*,
Expand Down Expand Up @@ -178,7 +179,7 @@ impl ArrowArray {
unsafe fn create_buffer<T: NativeType>(
array: &ArrowArray,
data_type: &DataType,
owner: Arc<InternalArrowArray>,
owner: Box<InternalArrowArray>,
index: usize,
) -> Result<Buffer<T>> {
if array.buffers.is_null() {
Expand Down Expand Up @@ -209,7 +210,7 @@ unsafe fn create_buffer<T: NativeType>(
/// This function assumes that `ceil(self.length * bits, 8)` is the size of the buffer
unsafe fn create_bitmap(
array: &ArrowArray,
owner: Arc<InternalArrowArray>,
owner: Box<InternalArrowArray>,
index: usize,
) -> Result<Bitmap> {
if array.buffers.is_null() {
Expand Down Expand Up @@ -310,7 +311,7 @@ fn buffer_len(array: &ArrowArray, data_type: &DataType, i: usize) -> Result<usiz
fn create_child(
array: &ArrowArray,
field: &DataType,
parent: Arc<InternalArrowArray>,
parent: Box<InternalArrowArray>,
index: usize,
) -> Result<ArrowArrayChild<'static>> {
let data_type = get_child(field, index)?;
Expand All @@ -328,7 +329,7 @@ fn create_child(
fn create_dictionary(
array: &ArrowArray,
data_type: &DataType,
parent: Arc<InternalArrowArray>,
parent: Box<InternalArrowArray>,
) -> Result<Option<ArrowArrayChild<'static>>> {
if let DataType::Dictionary(_, values, _) = data_type {
let data_type = values.as_ref().clone();
Expand All @@ -341,8 +342,8 @@ fn create_dictionary(
}

pub trait ArrowArrayRef: std::fmt::Debug {
fn owner(&self) -> Arc<InternalArrowArray> {
self.parent().clone()
fn owner(&self) -> Box<InternalArrowArray> {
(*self.parent()).clone()
}

/// returns the null bit buffer.
Expand Down Expand Up @@ -385,7 +386,7 @@ pub trait ArrowArrayRef: std::fmt::Debug {

fn n_buffers(&self) -> usize;

fn parent(&self) -> &Arc<InternalArrowArray>;
fn parent(&self) -> &Box<InternalArrowArray>;
fn array(&self) -> &ArrowArray;
fn data_type(&self) -> &DataType;
}
Expand All @@ -409,25 +410,30 @@ pub trait ArrowArrayRef: std::fmt::Debug {
/// calling [ArrowArray::release] and [ArrowSchema::release] accordingly.
///
/// Furthermore, this struct assumes that the incoming data agrees with the C data interface.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct InternalArrowArray {
array: Box<ArrowArray>,
data_type: DataType,
// Arc is used for sharability since this is immutable
array: Arc<ArrowArray>,
// Arced to reduce cost of cloning
data_type: Arc<DataType>,
}

impl InternalArrowArray {
pub fn new(array: Box<ArrowArray>, data_type: DataType) -> Self {
Self { array, data_type }
pub fn new(array: ArrowArray, data_type: DataType) -> Self {
Self {
array: Arc::new(array),
data_type: Arc::new(data_type),
}
}
}

impl ArrowArrayRef for Arc<InternalArrowArray> {
impl ArrowArrayRef for Box<InternalArrowArray> {
/// the data_type as declared in the schema
fn data_type(&self) -> &DataType {
&self.data_type
}

fn parent(&self) -> &Arc<InternalArrowArray> {
fn parent(&self) -> &Box<InternalArrowArray> {
self
}

Expand All @@ -444,7 +450,7 @@ impl ArrowArrayRef for Arc<InternalArrowArray> {
pub struct ArrowArrayChild<'a> {
array: &'a ArrowArray,
data_type: DataType,
parent: Arc<InternalArrowArray>,
parent: Box<InternalArrowArray>,
}

impl<'a> ArrowArrayRef for ArrowArrayChild<'a> {
Expand All @@ -453,7 +459,7 @@ impl<'a> ArrowArrayRef for ArrowArrayChild<'a> {
&self.data_type
}

fn parent(&self) -> &Arc<InternalArrowArray> {
fn parent(&self) -> &Box<InternalArrowArray> {
&self.parent
}

Expand All @@ -470,7 +476,7 @@ impl<'a> ArrowArrayChild<'a> {
fn from_raw(
array: &'a ArrowArray,
data_type: DataType,
parent: Arc<InternalArrowArray>,
parent: Box<InternalArrowArray>,
) -> Self {
Self {
array,
Expand Down
24 changes: 8 additions & 16 deletions src/ffi/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ mod stream;
pub(crate) use array::try_from;
pub(crate) use array::{ArrowArrayRef, InternalArrowArray};

use std::sync::Arc;

use crate::array::Array;
use crate::datatypes::{DataType, Field};
use crate::error::Result;
Expand All @@ -21,36 +19,30 @@ pub use generated::{ArrowArray, ArrowArrayStream, ArrowSchema};
pub use stream::{export_iterator, ArrowArrayStreamReader};

/// Exports an [`Box<dyn Array>`] to the C data interface.
/// # Safety
/// The pointer `ptr` must be allocated and valid
pub unsafe fn export_array_to_c(array: Box<dyn Array>, ptr: *mut ArrowArray) {
let array = bridge::align_to_c_data_interface(array);

std::ptr::write_unaligned(ptr, ArrowArray::new(array));
pub fn export_array_to_c(array: Box<dyn Array>) -> ArrowArray {
ArrowArray::new(bridge::align_to_c_data_interface(array))
}

/// Exports a [`Field`] to the C data interface.
/// # Safety
/// The pointer `ptr` must be allocated and valid
pub unsafe fn export_field_to_c(field: &Field, ptr: *mut ArrowSchema) {
std::ptr::write_unaligned(ptr, ArrowSchema::new(field));
pub fn export_field_to_c(field: &Field) -> ArrowSchema {
ArrowSchema::new(field)
}

/// Imports a [`Field`] from the C data interface.
/// # Safety
/// This function is intrinsically `unsafe` and relies on a [`ArrowSchema`]
/// valid according to the [C data interface](https://arrow.apache.org/docs/format/CDataInterface.html) (FFI).
/// being valid according to the [C data interface](https://arrow.apache.org/docs/format/CDataInterface.html) (FFI).
pub unsafe fn import_field_from_c(field: &ArrowSchema) -> Result<Field> {
to_field(field)
}

/// Imports an [`Array`] from the C data interface.
/// # Safety
/// This function is intrinsically `unsafe` and relies on a [`ArrowArray`]
/// valid according to the [C data interface](https://arrow.apache.org/docs/format/CDataInterface.html) (FFI).
/// being valid according to the [C data interface](https://arrow.apache.org/docs/format/CDataInterface.html) (FFI).
pub unsafe fn import_array_from_c(
array: Box<ArrowArray>,
array: ArrowArray,
data_type: DataType,
) -> Result<Box<dyn Array>> {
try_from(Arc::new(InternalArrowArray::new(array, data_type)))
try_from(Box::new(InternalArrowArray::new(array, data_type)))
}
Loading

0 comments on commit 69a9f9b

Please sign in to comment.