Skip to content

Commit

Permalink
apacheGH-42222: [Python] Add bindings for CopyTo on RecordBatch and A…
Browse files Browse the repository at this point in the history
…rray classes
  • Loading branch information
jorisvandenbossche committed Jun 20, 2024
1 parent 89d6354 commit 942184b
Show file tree
Hide file tree
Showing 7 changed files with 91 additions and 50 deletions.
24 changes: 24 additions & 0 deletions python/pyarrow/array.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -1658,6 +1658,30 @@ cdef class Array(_PandasConvertible):
_append_array_buffers(self.sp_array.get().data().get(), res)
return res

def copy_to(self, MemoryManager memory_manager):
"""
Construct a copy of the array with all buffers on destination
Memory Manager
This method recursively copies the array's buffers and those of its
children onto the destination MemoryManager device and returns the
new Array.
Parameters
----------
memory_manager : pyarrow.MemoryManager
Returns
------
Array
"""
cdef:
shared_ptr[CArray] c_array

with nogil:
c_array = GetResultValue(self.ap.CopyTo(memory_manager.unwrap()))
return pyarrow_wrap_array(c_array)

def _export_to_c(self, out_ptr, out_schema_ptr=0):
"""
Export to a C ArrowArray struct, given its pointer.
Expand Down
3 changes: 3 additions & 0 deletions python/pyarrow/device.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,9 @@ cdef class MemoryManager(_Weakrefable):
self.init(mm)
return self

cdef inline shared_ptr[CMemoryManager] unwrap(self) nogil:
return self.memory_manager

def __repr__(self):
return "<pyarrow.MemoryManager device: {}>".format(
frombytes(self.memory_manager.get().device().get().ToString())
Expand Down
3 changes: 3 additions & 0 deletions python/pyarrow/includes/libarrow.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
CStatus Validate() const
CStatus ValidateFull() const
CResult[shared_ptr[CArray]] View(const shared_ptr[CDataType]& type)
CResult[shared_ptr[CArray]] CopyTo(const shared_ptr[CMemoryManager]& to) const

shared_ptr[CArray] MakeArray(const shared_ptr[CArrayData]& data)
CResult[shared_ptr[CArray]] MakeArrayOfNull(
Expand Down Expand Up @@ -1024,6 +1025,8 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
shared_ptr[CRecordBatch] Slice(int64_t offset)
shared_ptr[CRecordBatch] Slice(int64_t offset, int64_t length)

CResult[shared_ptr[CRecordBatch]] CopyTo(const shared_ptr[CMemoryManager]& to) const

CResult[shared_ptr[CTensor]] ToTensor(c_bool null_to_nan, c_bool row_major,
CMemoryPool* pool) const

Expand Down
2 changes: 2 additions & 0 deletions python/pyarrow/lib.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,8 @@ cdef class MemoryManager(_Weakrefable):
@staticmethod
cdef wrap(const shared_ptr[CMemoryManager]& mm)

cdef inline shared_ptr[CMemoryManager] unwrap(self) nogil


cdef class Buffer(_Weakrefable):
cdef:
Expand Down
23 changes: 23 additions & 0 deletions python/pyarrow/table.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -3549,6 +3549,29 @@ cdef class RecordBatch(_Tabular):
row_major, pool))
return pyarrow_wrap_tensor(c_tensor)

def copy_to(self, MemoryManager memory_manager):
"""
Copy the entire RecordBatch to destination MemoryManager.
This copies each column of the record batch to create
a new record batch where all underlying buffers for the columns have
been copied to the destination MemoryManager.
Parameters
----------
memory_manager : pyarrow.MemoryManager
Returns
------
RecordBatch
"""
cdef:
shared_ptr[CRecordBatch] c_batch

with nogil:
c_batch = GetResultValue(self.batch.CopyTo(memory_manager.unwrap()))
return pyarrow_wrap_batch(c_batch)

def _export_to_c(self, out_ptr, out_schema_ptr=0):
"""
Export to a C ArrowArray struct, given its pointer.
Expand Down
73 changes: 23 additions & 50 deletions python/pyarrow/tests/test_cuda.py
Original file line number Diff line number Diff line change
Expand Up @@ -794,21 +794,20 @@ def test_IPC(size):
assert p.exitcode == 0


def _arr_copy_to_host(carr):
# TODO replace below with copy to device when exposed in python
buffers = []
for cbuf in carr.buffers():
if cbuf is None:
buffers.append(None)
else:
buf = global_context.foreign_buffer(
cbuf.address, cbuf.size, cbuf
).copy_to_host()
buffers.append(buf)

child = pa.Array.from_buffers(carr.type.value_type, 3, buffers[2:])
new = pa.Array.from_buffers(carr.type, 2, buffers[:2], children=[child])
return new
def test_copy_to():
_, buf = make_random_buffer(size=10, target='device')
mm_cuda = buf.memory_manager

arr = pa.array([0, 1, 2])
arr_cuda = arr.copy_to(mm_cuda)
assert not arr_cuda.buffers()[1].is_cpu
assert arr_cuda.buffers()[1].device_type == pa.DeviceAllocationType.CUDA

batch = pa.record_batch({"col": arr})
batch_cuda = batch.copy_to(mm_cuda)
buf_cuda = batch_cuda["col"].buffers()[1]
assert not buf_cuda.is_cpu
assert buf_cuda.device_type == pa.DeviceAllocationType.CUDA


def test_device_interface_array():
Expand All @@ -823,19 +822,10 @@ def test_device_interface_array():
typ = pa.list_(pa.int32())
arr = pa.array([[1], [2, 42]], type=typ)

# TODO replace below with copy to device when exposed in python
cbuffers = []
for buf in arr.buffers():
if buf is None:
cbuffers.append(None)
else:
cbuf = global_context.new_buffer(buf.size)
cbuf.copy_from_host(buf, position=0, nbytes=buf.size)
cbuffers.append(cbuf)

carr = pa.Array.from_buffers(typ, 2, cbuffers[:2], children=[
pa.Array.from_buffers(typ.value_type, 3, cbuffers[2:])
])
# copy to device
_, buf = make_random_buffer(size=10, target='device')
mm_cuda = buf.memory_manager
carr = arr.copy_to(mm_cuda)

# Type is known up front
carr._export_to_c_device(ptr_array)
Expand All @@ -849,7 +839,7 @@ def test_device_interface_array():
del carr
carr_new = pa.Array._import_from_c_device(ptr_array, typ)
assert carr_new.type == pa.list_(pa.int32())
arr_new = _arr_copy_to_host(carr_new)
arr_new = carr_new.copy_to(pa.default_cpu_memory_manager())
assert arr_new.equals(arr)

del carr_new
Expand All @@ -858,15 +848,13 @@ def test_device_interface_array():
pa.Array._import_from_c_device(ptr_array, typ)

# Schema is exported and imported at the same time
carr = pa.Array.from_buffers(typ, 2, cbuffers[:2], children=[
pa.Array.from_buffers(typ.value_type, 3, cbuffers[2:])
])
carr = arr.copy_to(mm_cuda)
carr._export_to_c_device(ptr_array, ptr_schema)
# Delete and recreate C++ objects from exported pointers
del carr
carr_new = pa.Array._import_from_c_device(ptr_array, ptr_schema)
assert carr_new.type == pa.list_(pa.int32())
arr_new = _arr_copy_to_host(carr_new)
arr_new = carr_new.copy_to(pa.default_cpu_memory_manager())
assert arr_new.equals(arr)

del carr_new
Expand All @@ -875,21 +863,6 @@ def test_device_interface_array():
pa.Array._import_from_c_device(ptr_array, ptr_schema)


def _batch_copy_to_host(cbatch):
# TODO replace below with copy to device when exposed in python
arrs = []
for col in cbatch.columns:
buffers = [
global_context.foreign_buffer(buf.address, buf.size, buf).copy_to_host()
if buf is not None else None
for buf in col.buffers()
]
new = pa.Array.from_buffers(col.type, len(col), buffers)
arrs.append(new)

return pa.RecordBatch.from_arrays(arrs, schema=cbatch.schema)


def test_device_interface_batch_array():
cffi = pytest.importorskip("pyarrow.cffi")
ffi = cffi.ffi
Expand All @@ -916,7 +889,7 @@ def test_device_interface_batch_array():
del cbatch
cbatch_new = pa.RecordBatch._import_from_c_device(ptr_array, schema)
assert cbatch_new.schema == schema
batch_new = _batch_copy_to_host(cbatch_new)
batch_new = cbatch_new.copy_to(pa.default_cpu_memory_manager())
assert batch_new.equals(batch)

del cbatch_new
Expand All @@ -931,7 +904,7 @@ def test_device_interface_batch_array():
del cbatch
cbatch_new = pa.RecordBatch._import_from_c_device(ptr_array, ptr_schema)
assert cbatch_new.schema == schema
batch_new = _batch_copy_to_host(cbatch_new)
batch_new = cbatch_new.copy_to(pa.default_cpu_memory_manager())
assert batch_new.equals(batch)

del cbatch_new
Expand Down
13 changes: 13 additions & 0 deletions python/pyarrow/tests/test_device.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,16 @@ def test_buffer_device():
assert buf.device.is_cpu
assert buf.device == pa.default_cpu_memory_manager().device
assert buf.memory_manager.is_cpu


def test_copy_to():
mm = pa.default_cpu_memory_manager()
arr = pa.array([0, 1, 2])
arr_copied = arr.copy_to(mm)
assert arr.equals(arr_copied)
assert arr.buffers()[1].address != arr_copied.buffers()[1].address

batch = pa.record_batch({"col": arr})
batch_copied = batch.copy_to(mm)
assert batch.equals(batch_copied)
assert arr.buffers()[1].address != batch_copied["col"].buffers()[1].address

0 comments on commit 942184b

Please sign in to comment.