undo non-test changes + try import pyarrow.cuda instead to register
jorisvandenbossche committed Mar 27, 2024
1 parent 3667071 commit 1bfc05c
Showing 8 changed files with 52 additions and 141 deletions.
7 changes: 7 additions & 0 deletions python/pyarrow/__init__.py
@@ -281,6 +281,13 @@ def print_entry(label, value):

import pyarrow.types as types

try:
# Try importing the cuda module to ensure libarrow_cuda gets loaded
# to register the CUDA device for the C Data Interface import
import pyarrow.cuda
except ImportError:
pass


# ----------------------------------------------------------------------
# Deprecations
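Note: the hunk above relies on an import side effect — loading pyarrow.cuda pulls in libarrow_cuda, whose initialization registers the CUDA device with the C Data Interface import machinery, so no CUDA-specific Python code path is needed. A standalone sketch of the same guarded-import pattern (the HAVE_CUDA flag and printout are illustrative, not part of this commit):

    # Load an optional extension purely for its registration side effect,
    # degrading gracefully when the CUDA build is not installed.
    try:
        import pyarrow.cuda  # noqa: F401  (side effect: loads libarrow_cuda)
        HAVE_CUDA = True
    except ImportError:
        HAVE_CUDA = False

    print("pyarrow CUDA extension loaded:", HAVE_CUDA)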
50 changes: 0 additions & 50 deletions python/pyarrow/_cuda.pyx
@@ -965,56 +965,6 @@ def read_record_batch(object buffer, object schema, *,
return pyarrow_wrap_batch(batch)


def _import_device_array_cuda(in_ptr, type):
# equivalent to the definition in array.pxi but using CudaDefaultMemoryMapper
cdef:
void* c_ptr = _as_c_pointer(in_ptr)
void* c_type_ptr
shared_ptr[CArray] c_array

c_type = pyarrow_unwrap_data_type(type)
if c_type == nullptr:
# Not a DataType object, perhaps a raw ArrowSchema pointer
c_type_ptr = _as_c_pointer(type)
with nogil:
c_array = GetResultValue(
ImportDeviceArray(<ArrowDeviceArray*> c_ptr,
<ArrowSchema*> c_type_ptr,
CudaDefaultMemoryMapper)
)
else:
with nogil:
c_array = GetResultValue(
ImportDeviceArray(<ArrowDeviceArray*> c_ptr, c_type,
CudaDefaultMemoryMapper)
)
return pyarrow_wrap_array(c_array)


def _import_device_recordbatch_cuda(in_ptr, schema):
# equivalent to the definition in table.pxi but using CudaDefaultMemoryMapper
cdef:
void* c_ptr = _as_c_pointer(in_ptr)
void* c_schema_ptr
shared_ptr[CRecordBatch] c_batch

c_schema = pyarrow_unwrap_schema(schema)
if c_schema == nullptr:
# Not a Schema object, perhaps a raw ArrowSchema pointer
c_schema_ptr = _as_c_pointer(schema, allow_null=True)
with nogil:
c_batch = GetResultValue(ImportDeviceRecordBatch(
<ArrowDeviceArray*> c_ptr, <ArrowSchema*> c_schema_ptr,
CudaDefaultMemoryMapper)
)
else:
with nogil:
c_batch = GetResultValue(ImportDeviceRecordBatch(
<ArrowDeviceArray*> c_ptr, c_schema, CudaDefaultMemoryMapper)
)
return pyarrow_wrap_batch(c_batch)


# Public API


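With these CUDA-specific import helpers deleted, device mapping is resolved inside libarrow_cuda itself, and the module's public APIs remain the way to produce device-resident data from Python. A minimal sketch using read_record_batch (visible in the context above) together with serialize_record_batch; this assumes a CUDA-enabled build and a visible GPU:

    import pyarrow as pa
    from pyarrow import cuda  # raises ImportError without a CUDA build

    ctx = cuda.Context(0)
    batch = pa.record_batch([pa.array([1, 2, 3])], names=["a"])

    # Serialize the batch into device memory, then read it back as a
    # RecordBatch backed by device buffers.
    cbuf = cuda.serialize_record_batch(batch, ctx)
    device_batch = cuda.read_record_batch(cbuf, batch.schema)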
54 changes: 20 additions & 34 deletions python/pyarrow/array.pxi
@@ -22,39 +22,6 @@ import warnings
from cython import sizeof


def _import_device_array_cpu(in_ptr, type):
cdef:
void* c_ptr = _as_c_pointer(in_ptr)
void* c_type_ptr
shared_ptr[CArray] c_array

c_type = pyarrow_unwrap_data_type(type)
if c_type == nullptr:
# Not a DataType object, perhaps a raw ArrowSchema pointer
c_type_ptr = _as_c_pointer(type)
with nogil:
c_array = GetResultValue(
ImportDeviceArray(<ArrowDeviceArray*> c_ptr,
<ArrowSchema*> c_type_ptr,
DefaultDeviceMapper)
)
else:
with nogil:
c_array = GetResultValue(
ImportDeviceArray(<ArrowDeviceArray*> c_ptr, c_type,
DefaultDeviceMapper)
)
return pyarrow_wrap_array(c_array)


try:
from pyarrow._cuda import _import_device_array_cuda

_import_device_array = _import_device_array_cuda
except ImportError:
_import_device_array = _import_device_array_cpu


cdef _sequence_to_array(object sequence, object mask, object size,
DataType type, CMemoryPool* pool, c_bool from_pandas):
cdef:
@@ -1867,7 +1834,26 @@ cdef class Array(_PandasConvertible):
This is a low-level function intended for expert users.
"""
return _import_device_array(in_ptr, type)
cdef:
void* c_ptr = _as_c_pointer(in_ptr)
void* c_type_ptr
shared_ptr[CArray] c_array

c_type = pyarrow_unwrap_data_type(type)
if c_type == nullptr:
# Not a DataType object, perhaps a raw ArrowSchema pointer
c_type_ptr = _as_c_pointer(type)
with nogil:
c_array = GetResultValue(
ImportDeviceArray(<ArrowDeviceArray*> c_ptr,
<ArrowSchema*> c_type_ptr)
)
else:
with nogil:
c_array = GetResultValue(
ImportDeviceArray(<ArrowDeviceArray*> c_ptr, c_type)
)
return pyarrow_wrap_array(c_array)

def __dlpack__(self, stream=None):
"""Export a primitive array as a DLPack capsule.
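The hunk above replaces the module-level _import_device_array dispatch with the import logic inlined into the method body (the hunk header only shows the enclosing class; the method name is not visible here). A sketch of how this path can be exercised, assuming the method is _import_from_c_device, that the matching export method is _export_to_c_device, and that pyarrow.cffi declares struct ArrowDeviceArray — all assumptions from the surrounding bindings, not shown in this diff:

    import pyarrow as pa
    from pyarrow.cffi import ffi  # assumed to declare struct ArrowDeviceArray

    arr = pa.array([1, 2, 3], type=pa.int64())

    # Allocate C struct holders and pass their addresses as Python ints.
    c_array = ffi.new("struct ArrowDeviceArray*")
    c_schema = ffi.new("struct ArrowSchema*")
    array_ptr = int(ffi.cast("uintptr_t", c_array))
    schema_ptr = int(ffi.cast("uintptr_t", c_schema))

    arr._export_to_c_device(array_ptr, schema_ptr)

    # Passing a raw ArrowSchema pointer takes the second branch above;
    # a DataType object would take the first.
    result = pa.Array._import_from_c_device(array_ptr, schema_ptr)
    assert result.equals(arr)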
20 changes: 5 additions & 15 deletions python/pyarrow/includes/libarrow.pxd
@@ -346,9 +346,6 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
CResult[unique_ptr[CResizableBuffer]] AllocateResizableBuffer(
const int64_t size, CMemoryPool* pool)

cdef cppclass CMemoryManager" arrow::MemoryManager":
pass

cdef cppclass CSyncEvent" arrow::Device::SyncEvent":
pass

@@ -2919,8 +2916,6 @@ cdef extern from "arrow/c/abi.h":
cdef struct ArrowArrayStream:
void (*release)(ArrowArrayStream*) noexcept nogil

ctypedef int32_t ArrowDeviceType

cdef struct ArrowDeviceArray:
pass

@@ -2956,25 +2951,20 @@ cdef extern from "arrow/c/bridge.h" namespace "arrow" nogil:
CStatus ExportChunkedArray(shared_ptr[CChunkedArray], ArrowArrayStream*)
CResult[shared_ptr[CChunkedArray]] ImportChunkedArray(ArrowArrayStream*)

ctypedef CResult[shared_ptr[CMemoryManager]] CDeviceMemoryMapper(
ArrowDeviceType, int64_t)

CResult[shared_ptr[CMemoryManager]] DefaultDeviceMapper(
ArrowDeviceType device_type, int64_t device_id)

CStatus ExportDeviceArray(const CArray&, shared_ptr[CSyncEvent],
ArrowDeviceArray* out, ArrowSchema*)
CResult[shared_ptr[CArray]] ImportDeviceArray(
ArrowDeviceArray*, shared_ptr[CDataType], const CDeviceMemoryMapper&)
ArrowDeviceArray*, shared_ptr[CDataType])
CResult[shared_ptr[CArray]] ImportDeviceArray(
ArrowDeviceArray*, ArrowSchema*, const CDeviceMemoryMapper&)
ArrowDeviceArray*, ArrowSchema*)

CStatus ExportDeviceRecordBatch(const CRecordBatch&, shared_ptr[CSyncEvent],
ArrowDeviceArray* out, ArrowSchema*)
CResult[shared_ptr[CRecordBatch]] ImportDeviceRecordBatch(
ArrowDeviceArray*, shared_ptr[CSchema], const CDeviceMemoryMapper&)
ArrowDeviceArray*, shared_ptr[CSchema])
CResult[shared_ptr[CRecordBatch]] ImportDeviceRecordBatch(
ArrowDeviceArray*, ArrowSchema*, const CDeviceMemoryMapper&)
ArrowDeviceArray*, ArrowSchema*)


cdef extern from "arrow/util/byte_size.h" namespace "arrow::util" nogil:
CResult[int64_t] ReferencedBufferSize(const CArray& array_data)
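Dropping the const CDeviceMemoryMapper& parameter works here presumably because the underlying C++ functions in arrow/c/bridge.h default that argument to the registered device mapper; the Cython declarations simply use the mapper-less form. The remaining pair of overloads per function distinguishes a wrapped DataType/Schema from a raw ArrowSchema pointer. A sketch of the DataType-object path from Python, under the same assumed method names as above:

    import pyarrow as pa
    from pyarrow.cffi import ffi

    arr = pa.array(["x", "y"])

    # DataType-object overload: only the ArrowDeviceArray crosses the
    # C boundary; the type is supplied as a pyarrow DataType.
    c_array = ffi.new("struct ArrowDeviceArray*")
    array_ptr = int(ffi.cast("uintptr_t", c_array))
    arr._export_to_c_device(array_ptr)
    roundtripped = pa.Array._import_from_c_device(array_ptr, arr.type)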
4 changes: 0 additions & 4 deletions python/pyarrow/includes/libarrow_cuda.pxd
@@ -93,10 +93,6 @@ cdef extern from "arrow/gpu/cuda_api.h" namespace "arrow::cuda" nogil:
CResult[shared_ptr[CCudaHostBuffer]] AllocateCudaHostBuffer(
int device_number, const int64_t size)

CResult[shared_ptr[CMemoryManager]] \
CudaDefaultMemoryMapper" arrow::cuda::DefaultMemoryMapper"(
ArrowDeviceType device_type, int64_t device_id)

# Cuda prefix is added to avoid picking up arrow::cuda functions
# from arrow namespace.
CResult[shared_ptr[CCudaBuffer]] \
2 changes: 0 additions & 2 deletions python/pyarrow/lib.pxd
@@ -643,8 +643,6 @@ cdef shared_ptr[const CKeyValueMetadata] pyarrow_unwrap_metadata(
cdef object pyarrow_wrap_metadata(
const shared_ptr[const CKeyValueMetadata]& meta)

cdef void* _as_c_pointer(v, allow_null=*) except *

#
# Public Cython API for 3rd party code
#
8 changes: 3 additions & 5 deletions python/pyarrow/lib.pyx
@@ -26,8 +26,6 @@ import os
import sys

from cython.operator cimport dereference as deref
from cython cimport binding

from pyarrow.includes.libarrow cimport *
from pyarrow.includes.libarrow_python cimport *
from pyarrow.includes.common cimport PyObject_to_object
@@ -164,9 +162,6 @@ include "pandas-shim.pxi"
# Memory pools and allocation
include "memory.pxi"

# File IO
include "io.pxi"

# DataType, Field, Schema
include "types.pxi"

@@ -188,6 +183,9 @@ include "tensor.pxi"
# DLPack
include "_dlpack.pxi"

# File IO
include "io.pxi"

# IPC / Messaging
include "ipc.pxi"

48 changes: 17 additions & 31 deletions python/pyarrow/table.pxi
@@ -20,36 +20,6 @@ from cpython.pycapsule cimport PyCapsule_CheckExact, PyCapsule_GetPointer, PyCap
import warnings
from cython import sizeof


def _import_device_recordbatch_cpu(in_ptr, schema):
cdef:
void* c_ptr = _as_c_pointer(in_ptr)
void* c_schema_ptr
shared_ptr[CRecordBatch] c_batch

c_schema = pyarrow_unwrap_schema(schema)
if c_schema == nullptr:
# Not a Schema object, perhaps a raw ArrowSchema pointer
c_schema_ptr = _as_c_pointer(schema, allow_null=True)
with nogil:
c_batch = GetResultValue(ImportDeviceRecordBatch(
<ArrowDeviceArray*> c_ptr, <ArrowSchema*> c_schema_ptr,
DefaultDeviceMapper))
else:
with nogil:
c_batch = GetResultValue(ImportDeviceRecordBatch(
<ArrowDeviceArray*> c_ptr, c_schema, DefaultDeviceMapper))
return pyarrow_wrap_batch(c_batch)


try:
from pyarrow._cuda import _import_device_recordbatch_cuda

_import_device_recordbatch = _import_device_recordbatch_cuda
except ImportError:
_import_device_recordbatch = _import_device_recordbatch_cpu


cdef class ChunkedArray(_PandasConvertible):
"""
An array-like composed from a (possibly empty) collection of pyarrow.Arrays
@@ -3638,7 +3608,23 @@ cdef class RecordBatch(_Tabular):
This is a low-level function intended for expert users.
"""
return _import_device_recordbatch(in_ptr, schema)
cdef:
void* c_ptr = _as_c_pointer(in_ptr)
void* c_schema_ptr
shared_ptr[CRecordBatch] c_batch

c_schema = pyarrow_unwrap_schema(schema)
if c_schema == nullptr:
# Not a Schema object, perhaps a raw ArrowSchema pointer
c_schema_ptr = _as_c_pointer(schema, allow_null=True)
with nogil:
c_batch = GetResultValue(ImportDeviceRecordBatch(
<ArrowDeviceArray*> c_ptr, <ArrowSchema*> c_schema_ptr))
else:
with nogil:
c_batch = GetResultValue(ImportDeviceRecordBatch(
<ArrowDeviceArray*> c_ptr, c_schema))
return pyarrow_wrap_batch(c_batch)


def _reconstruct_record_batch(columns, schema):
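A matching sketch for the RecordBatch path, under the same assumptions as the Array example earlier (_export_to_c_device / _import_from_c_device method names, and struct ArrowDeviceArray available via pyarrow.cffi):

    import pyarrow as pa
    from pyarrow.cffi import ffi

    batch = pa.record_batch([pa.array([1, 2, 3])], names=["a"])

    c_array = ffi.new("struct ArrowDeviceArray*")
    c_schema = ffi.new("struct ArrowSchema*")
    array_ptr = int(ffi.cast("uintptr_t", c_array))
    schema_ptr = int(ffi.cast("uintptr_t", c_schema))

    batch._export_to_c_device(array_ptr, schema_ptr)

    # A raw ArrowSchema pointer takes the second branch above; a Schema
    # object would take the first.
    result = pa.RecordBatch._import_from_c_device(array_ptr, schema_ptr)
    assert result.equals(batch)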
