From 942184b03b2ba8ccccc5a1cceb026144bbbc555b Mon Sep 17 00:00:00 2001 From: Joris Van den Bossche Date: Thu, 20 Jun 2024 15:09:24 +0000 Subject: [PATCH] GH-42222: [Python] Add bindings for CopyTo on RecordBatch and Array classes --- python/pyarrow/array.pxi | 24 +++++++++ python/pyarrow/device.pxi | 3 ++ python/pyarrow/includes/libarrow.pxd | 3 ++ python/pyarrow/lib.pxd | 2 + python/pyarrow/table.pxi | 23 +++++++++ python/pyarrow/tests/test_cuda.py | 73 +++++++++------------------- python/pyarrow/tests/test_device.py | 13 +++++ 7 files changed, 91 insertions(+), 50 deletions(-) diff --git a/python/pyarrow/array.pxi b/python/pyarrow/array.pxi index 64a6ceaa6eaa4..d3cc595a10da3 100644 --- a/python/pyarrow/array.pxi +++ b/python/pyarrow/array.pxi @@ -1658,6 +1658,30 @@ cdef class Array(_PandasConvertible): _append_array_buffers(self.sp_array.get().data().get(), res) return res + def copy_to(self, MemoryManager memory_manager): + """ + Construct a copy of the array with all buffers on destination + Memory Manager + + This method recursively copies the array's buffers and those of its + children onto the destination MemoryManager device and returns the + new Array. + + Parameters + ---------- + memory_manager : pyarrow.MemoryManager + + Returns + ------ + Array + """ + cdef: + shared_ptr[CArray] c_array + + with nogil: + c_array = GetResultValue(self.ap.CopyTo(memory_manager.unwrap())) + return pyarrow_wrap_array(c_array) + def _export_to_c(self, out_ptr, out_schema_ptr=0): """ Export to a C ArrowArray struct, given its pointer. diff --git a/python/pyarrow/device.pxi b/python/pyarrow/device.pxi index 6e6034752085a..2710d18a74025 100644 --- a/python/pyarrow/device.pxi +++ b/python/pyarrow/device.pxi @@ -130,6 +130,9 @@ cdef class MemoryManager(_Weakrefable): self.init(mm) return self + cdef inline shared_ptr[CMemoryManager] unwrap(self) nogil: + return self.memory_manager + def __repr__(self): return "".format( frombytes(self.memory_manager.get().device().get().ToString()) diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index 53ad95f2430be..1bd2b044dd30f 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -234,6 +234,7 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil: CStatus Validate() const CStatus ValidateFull() const CResult[shared_ptr[CArray]] View(const shared_ptr[CDataType]& type) + CResult[shared_ptr[CArray]] CopyTo(const shared_ptr[CMemoryManager]& to) const shared_ptr[CArray] MakeArray(const shared_ptr[CArrayData]& data) CResult[shared_ptr[CArray]] MakeArrayOfNull( @@ -1024,6 +1025,8 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil: shared_ptr[CRecordBatch] Slice(int64_t offset) shared_ptr[CRecordBatch] Slice(int64_t offset, int64_t length) + CResult[shared_ptr[CRecordBatch]] CopyTo(const shared_ptr[CMemoryManager]& to) const + CResult[shared_ptr[CTensor]] ToTensor(c_bool null_to_nan, c_bool row_major, CMemoryPool* pool) const diff --git a/python/pyarrow/lib.pxd b/python/pyarrow/lib.pxd index 1bc639cc8d2ba..f5e2fba39c923 100644 --- a/python/pyarrow/lib.pxd +++ b/python/pyarrow/lib.pxd @@ -543,6 +543,8 @@ cdef class MemoryManager(_Weakrefable): @staticmethod cdef wrap(const shared_ptr[CMemoryManager]& mm) + cdef inline shared_ptr[CMemoryManager] unwrap(self) nogil + cdef class Buffer(_Weakrefable): cdef: diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi index 767e09004592c..b7a12c2378133 100644 --- a/python/pyarrow/table.pxi +++ b/python/pyarrow/table.pxi @@ -3549,6 +3549,29 @@ cdef class RecordBatch(_Tabular): row_major, pool)) return pyarrow_wrap_tensor(c_tensor) + def copy_to(self, MemoryManager memory_manager): + """ + Copy the entire RecordBatch to destination MemoryManager. + + This copies each column of the record batch to create + a new record batch where all underlying buffers for the columns have + been copied to the destination MemoryManager. + + Parameters + ---------- + memory_manager : pyarrow.MemoryManager + + Returns + ------ + RecordBatch + """ + cdef: + shared_ptr[CRecordBatch] c_batch + + with nogil: + c_batch = GetResultValue(self.batch.CopyTo(memory_manager.unwrap())) + return pyarrow_wrap_batch(c_batch) + def _export_to_c(self, out_ptr, out_schema_ptr=0): """ Export to a C ArrowArray struct, given its pointer. diff --git a/python/pyarrow/tests/test_cuda.py b/python/pyarrow/tests/test_cuda.py index 400db2643bd56..51cd7a05d5963 100644 --- a/python/pyarrow/tests/test_cuda.py +++ b/python/pyarrow/tests/test_cuda.py @@ -794,21 +794,20 @@ def test_IPC(size): assert p.exitcode == 0 -def _arr_copy_to_host(carr): - # TODO replace below with copy to device when exposed in python - buffers = [] - for cbuf in carr.buffers(): - if cbuf is None: - buffers.append(None) - else: - buf = global_context.foreign_buffer( - cbuf.address, cbuf.size, cbuf - ).copy_to_host() - buffers.append(buf) - - child = pa.Array.from_buffers(carr.type.value_type, 3, buffers[2:]) - new = pa.Array.from_buffers(carr.type, 2, buffers[:2], children=[child]) - return new +def test_copy_to(): + _, buf = make_random_buffer(size=10, target='device') + mm_cuda = buf.memory_manager + + arr = pa.array([0, 1, 2]) + arr_cuda = arr.copy_to(mm_cuda) + assert not arr_cuda.buffers()[1].is_cpu + assert arr_cuda.buffers()[1].device_type == pa.DeviceAllocationType.CUDA + + batch = pa.record_batch({"col": arr}) + batch_cuda = batch.copy_to(mm_cuda) + buf_cuda = batch_cuda["col"].buffers()[1] + assert not buf_cuda.is_cpu + assert buf_cuda.device_type == pa.DeviceAllocationType.CUDA def test_device_interface_array(): @@ -823,19 +822,10 @@ def test_device_interface_array(): typ = pa.list_(pa.int32()) arr = pa.array([[1], [2, 42]], type=typ) - # TODO replace below with copy to device when exposed in python - cbuffers = [] - for buf in arr.buffers(): - if buf is None: - cbuffers.append(None) - else: - cbuf = global_context.new_buffer(buf.size) - cbuf.copy_from_host(buf, position=0, nbytes=buf.size) - cbuffers.append(cbuf) - - carr = pa.Array.from_buffers(typ, 2, cbuffers[:2], children=[ - pa.Array.from_buffers(typ.value_type, 3, cbuffers[2:]) - ]) + # copy to device + _, buf = make_random_buffer(size=10, target='device') + mm_cuda = buf.memory_manager + carr = arr.copy_to(mm_cuda) # Type is known up front carr._export_to_c_device(ptr_array) @@ -849,7 +839,7 @@ def test_device_interface_array(): del carr carr_new = pa.Array._import_from_c_device(ptr_array, typ) assert carr_new.type == pa.list_(pa.int32()) - arr_new = _arr_copy_to_host(carr_new) + arr_new = carr_new.copy_to(pa.default_cpu_memory_manager()) assert arr_new.equals(arr) del carr_new @@ -858,15 +848,13 @@ def test_device_interface_array(): pa.Array._import_from_c_device(ptr_array, typ) # Schema is exported and imported at the same time - carr = pa.Array.from_buffers(typ, 2, cbuffers[:2], children=[ - pa.Array.from_buffers(typ.value_type, 3, cbuffers[2:]) - ]) + carr = arr.copy_to(mm_cuda) carr._export_to_c_device(ptr_array, ptr_schema) # Delete and recreate C++ objects from exported pointers del carr carr_new = pa.Array._import_from_c_device(ptr_array, ptr_schema) assert carr_new.type == pa.list_(pa.int32()) - arr_new = _arr_copy_to_host(carr_new) + arr_new = carr_new.copy_to(pa.default_cpu_memory_manager()) assert arr_new.equals(arr) del carr_new @@ -875,21 +863,6 @@ def test_device_interface_array(): pa.Array._import_from_c_device(ptr_array, ptr_schema) -def _batch_copy_to_host(cbatch): - # TODO replace below with copy to device when exposed in python - arrs = [] - for col in cbatch.columns: - buffers = [ - global_context.foreign_buffer(buf.address, buf.size, buf).copy_to_host() - if buf is not None else None - for buf in col.buffers() - ] - new = pa.Array.from_buffers(col.type, len(col), buffers) - arrs.append(new) - - return pa.RecordBatch.from_arrays(arrs, schema=cbatch.schema) - - def test_device_interface_batch_array(): cffi = pytest.importorskip("pyarrow.cffi") ffi = cffi.ffi @@ -916,7 +889,7 @@ def test_device_interface_batch_array(): del cbatch cbatch_new = pa.RecordBatch._import_from_c_device(ptr_array, schema) assert cbatch_new.schema == schema - batch_new = _batch_copy_to_host(cbatch_new) + batch_new = cbatch_new.copy_to(pa.default_cpu_memory_manager()) assert batch_new.equals(batch) del cbatch_new @@ -931,7 +904,7 @@ def test_device_interface_batch_array(): del cbatch cbatch_new = pa.RecordBatch._import_from_c_device(ptr_array, ptr_schema) assert cbatch_new.schema == schema - batch_new = _batch_copy_to_host(cbatch_new) + batch_new = cbatch_new.copy_to(pa.default_cpu_memory_manager()) assert batch_new.equals(batch) del cbatch_new diff --git a/python/pyarrow/tests/test_device.py b/python/pyarrow/tests/test_device.py index 6bdb015be1a95..e45fb3e2c6028 100644 --- a/python/pyarrow/tests/test_device.py +++ b/python/pyarrow/tests/test_device.py @@ -41,3 +41,16 @@ def test_buffer_device(): assert buf.device.is_cpu assert buf.device == pa.default_cpu_memory_manager().device assert buf.memory_manager.is_cpu + + +def test_copy_to(): + mm = pa.default_cpu_memory_manager() + arr = pa.array([0, 1, 2]) + arr_copied = arr.copy_to(mm) + assert arr.equals(arr_copied) + assert arr.buffers()[1].address != arr_copied.buffers()[1].address + + batch = pa.record_batch({"col": arr}) + batch_copied = batch.copy_to(mm) + assert batch.equals(batch_copied) + assert arr.buffers()[1].address != batch_copied["col"].buffers()[1].address