feat(python/adbc_driver_manager): export handles and ingest data through python Arrow PyCapsule interface #1346
Changes from 3 commits
@@ -24,10 +24,13 @@ import threading
import typing
from typing import List, Tuple

cimport cpython
import cython
from cpython.bytes cimport PyBytes_FromStringAndSize
from cpython.pycapsule cimport PyCapsule_GetPointer, PyCapsule_New
from libc.stdint cimport int32_t, int64_t, uint8_t, uint32_t, uintptr_t
-from libc.string cimport memset
+from libc.stdlib cimport malloc, free
+from libc.string cimport memcpy, memset
from libcpp.vector cimport vector as c_vector

if typing.TYPE_CHECKING:
@@ -304,9 +307,38 @@ cdef class _AdbcHandle:
                f"with open {self._child_type}")


cdef void pycapsule_schema_deleter(object capsule) noexcept:
    cdef CArrowSchema* allocated = <CArrowSchema*> PyCapsule_GetPointer(
        capsule, "arrow_schema"
    )
    if allocated.release != NULL:
        allocated.release(allocated)
    free(allocated)


cdef void pycapsule_array_deleter(object capsule) noexcept:
    cdef CArrowArray* allocated = <CArrowArray*> PyCapsule_GetPointer(
        capsule, "arrow_array"
    )
    if allocated.release != NULL:
        allocated.release(allocated)
    free(allocated)


cdef void pycapsule_stream_deleter(object capsule) noexcept:
    cdef CArrowArrayStream* allocated = <CArrowArrayStream*> PyCapsule_GetPointer(
        capsule, "arrow_array_stream"
    )
    if allocated.release != NULL:
        allocated.release(allocated)
    free(allocated)

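The three deleters above cover both ends of a capsule's life: if a consumer has already moved the Arrow data out (setting the release callback to NULL), only the malloc'd wrapper struct needs to be freed; if the capsule is dropped without ever being consumed, the deleter itself calls release so the driver-allocated data does not leak. A minimal sketch of the two paths from Python, assuming pyarrow >= 14.0.1 and that handle and handle2 are stream handles returned by two separate AdbcStatement.execute_query() calls (as in the tests further down):

import pyarrow

# Path 1: the capsule is consumed. The consumer moves the stream out and sets its
# release callback to NULL, so the capsule deleter only frees the malloc'd wrapper.
reader = pyarrow.RecordBatchReader._import_from_c_capsule(handle.__arrow_c_stream__())

# Path 2: the capsule is never consumed. Its deleter must call release() on the
# still-live ArrowArrayStream before freeing the wrapper, as the smoke tests check.
capsule = handle2.__arrow_c_stream__()
del capsule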
cdef class ArrowSchemaHandle:
    """
    A wrapper for an allocated ArrowSchema.

    This object implements the Arrow PyCapsule interface.
    """
    cdef:
        CArrowSchema schema

@@ -316,23 +348,56 @@ cdef class ArrowSchemaHandle:
        """The address of the ArrowSchema."""
        return <uintptr_t> &self.schema

    def __arrow_c_schema__(self) -> object:
        """Consume this object to get a PyCapsule."""
        # Reference:
        # https://arrow.apache.org/docs/dev/format/CDataInterface/PyCapsuleInterface.html#create-a-pycapsule
        cdef CArrowSchema* allocated = <CArrowSchema*> malloc(sizeof(CArrowSchema))
        allocated.release = NULL
        capsule = PyCapsule_New(
            <void*>allocated, "arrow_schema", &pycapsule_schema_deleter,
        )
        memcpy(allocated, &self.schema, sizeof(CArrowSchema))
        self.schema.release = NULL
        return capsule

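As a consumer-side sketch (not part of the diff; assuming pyarrow >= 14.0.1 and an open AdbcConnection conn with a table "foo", as in the tests below): the schema handle can be passed directly to pyarrow.schema(), and because the export moves the ArrowSchema out of the handle, a second conversion fails.

import pyarrow

handle = conn.get_table_schema(catalog=None, db_schema=None, table_name="foo")
schema = pyarrow.schema(handle)  # consumes the capsule from __arrow_c_schema__
# The handle is now spent; calling pyarrow.schema(handle) again raises
# "Cannot import released ArrowSchema".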
cdef class ArrowArrayHandle:
    """
    A wrapper for an allocated ArrowArray.

    This object implements the Arrow PyCapsule interface.
    """
    cdef:
        CArrowArray array

    @property
    def address(self) -> int:
-        """The address of the ArrowArray."""
+        """
+        The address of the ArrowArray.
+        """
        return <uintptr_t> &self.array

    def __arrow_c_array__(self, requested_schema=None) -> object:
        """Consume this object to get a PyCapsule."""
        if requested_schema is not None:
            raise NotImplementedError("requested_schema")

        cdef CArrowArray* allocated = <CArrowArray*> malloc(sizeof(CArrowArray))
        allocated.release = NULL
        capsule = PyCapsule_New(
            <void*>allocated, "arrow_array", pycapsule_array_deleter,
        )
        memcpy(allocated, &self.array, sizeof(CArrowArray))
        self.array.release = NULL
        return capsule
Review comment: This is actually not being used at the moment, because I think none of the ADBC APIs return an ArrowArray (only an ArrowSchema or an ArrowArrayStream), so I could also remove it.

Review comment: And realizing now, this implementation is actually also wrong: it needs to return two capsules, one for the ArrowArray but also one for the ArrowSchema, and this handle only has the array. So I don't think we can add this dunder here.
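To make that last point concrete (an illustration, not part of the diff): the PyCapsule interface expects __arrow_c_array__ to return a pair of capsules, which pyarrow's own arrays demonstrate (assuming pyarrow >= 14.0):

import pyarrow

# pyarrow arrays implement the same dunder; it returns *two* capsules, one
# describing the type ("arrow_schema") and one holding the data ("arrow_array").
arr = pyarrow.array([1, 2, 3])
schema_capsule, array_capsule = arr.__arrow_c_array__()
print(schema_capsule)  # <capsule object "arrow_schema" at 0x...>
print(array_capsule)   # <capsule object "arrow_array" at 0x...>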
cdef class ArrowArrayStreamHandle:
    """
    A wrapper for an allocated ArrowArrayStream.

    This object implements the Arrow PyCapsule interface.
    """
    cdef:
        CArrowArrayStream stream

@@ -342,6 +407,21 @@ cdef class ArrowArrayStreamHandle:
        """The address of the ArrowArrayStream."""
        return <uintptr_t> &self.stream

    def __arrow_c_stream__(self, requested_schema=None) -> object:
        """Consume this object to get a PyCapsule."""
        if requested_schema is not None:
            raise NotImplementedError("requested_schema")

        cdef CArrowArrayStream* allocated = \
            <CArrowArrayStream*> malloc(sizeof(CArrowArrayStream))
        allocated.release = NULL
        capsule = PyCapsule_New(
            <void*>allocated, "arrow_array_stream", &pycapsule_stream_deleter,
        )
        memcpy(allocated, &self.stream, sizeof(CArrowArrayStream))
        self.stream.release = NULL
        return capsule

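Consumer-side sketch for the stream handle (mirroring the tests below; assuming pyarrow >= 14.0.1 and an AdbcStatement stmt with a query already set):

import pyarrow

handle, _ = stmt.execute_query()
table = pyarrow.table(handle)  # consumes the capsule from __arrow_c_stream__
# The ArrowArrayStream was moved out of the handle; consuming it a second time
# raises "Cannot import released ArrowArrayStream".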
class GetObjectsDepth(enum.IntEnum):
    ALL = ADBC_OBJECT_DEPTH_ALL
@@ -25,8 +25,8 @@ requires-python = ">=3.9"
dynamic = ["version"]

[project.optional-dependencies]
-dbapi = ["pandas", "pyarrow>=8.0.0"]
-test = ["duckdb", "pandas", "pyarrow>=8.0.0", "pytest"]
+dbapi = ["pandas", "pyarrow>=14.0.1"]
+test = ["duckdb", "pandas", "pyarrow>=14.0.1", "pytest"]

Review comment: Are we OK with bumping this requirement? (I don't know who the existing users of the Python ADBC packages are that might be affected.)

Review comment: I figured we should bump it just because our official guidance is to upgrade.

Review comment: Not a huge deal now, but if usage grows over time this could be a pain point for pandas.

Review comment: We can remove it. I'm just not sure if we can express in the requirements that you need the fix package if you're on pyarrow < 14.0.1.

Review comment: I reverted the change to the minimum requirement. This PR doesn't strictly speaking need it, so let's keep the discussion about bumping the minimum requirement separate.
[project.urls]
homepage = "https://arrow.apache.org/adbc/"
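On expressing "the fix package" for older pyarrow: a static optional-dependency list cannot make one requirement conditional on another package's version, so it would have to be a runtime check instead. A minimal sketch, assuming the fix package in question is pyarrow_hotfix (the standalone patch for CVE-2023-47248, which pyarrow 14.0.1 ships itself):

from packaging.version import Version

import pyarrow

# Hypothetical runtime guard for consumers that stay on an older pyarrow:
# releases before 14.0.1 need the standalone hotfix applied at import time.
if Version(pyarrow.__version__) < Version("14.0.1"):
    import pyarrow_hotfix  # noqa: F401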
@@ -390,3 +390,57 @@ def test_child_tracking(sqlite):
        RuntimeError, match="Cannot close AdbcDatabase with open AdbcConnection"
    ):
        db.close()


@pytest.mark.sqlite
def test_pycapsule(sqlite):
Review comment: I further expanded the specific test that David started, but in addition (at least if requiring pyarrow >= 14 for testing) I could also update the other tests above to do the import/export using capsules instead of the current calls to
    _, conn = sqlite
    handle = conn.get_table_types()
    with pyarrow.RecordBatchReader._import_from_c_capsule(handle.__arrow_c_stream__()) as reader:
        reader.read_all()

    # set up some data
    data = pyarrow.record_batch(
        [
            [1, 2, 3, 4],
            ["a", "b", "c", "d"],
        ],
        names=["ints", "strs"],
    )
    with adbc_driver_manager.AdbcStatement(conn) as stmt:
        stmt.set_options(**{adbc_driver_manager.INGEST_OPTION_TARGET_TABLE: "foo"})
        _bind(stmt, data)
        stmt.execute_update()

    # importing a schema

    handle = conn.get_table_schema(catalog=None, db_schema=None, table_name="foo")
    assert data.schema == pyarrow.schema(handle)
    # ensure consumed schema was marked as such
    with pytest.raises(ValueError, match="Cannot import released ArrowSchema"):
        pyarrow.schema(handle)

    # smoke test for the capsule calling release
    capsule = conn.get_table_schema(catalog=None, db_schema=None, table_name="foo").__arrow_c_schema__()
    del capsule

    # importing a stream

    with adbc_driver_manager.AdbcStatement(conn) as stmt:
        stmt.set_sql_query("SELECT * FROM foo")
        handle, _ = stmt.execute_query()

    result = pyarrow.table(handle)
    assert result.to_batches()[0] == data

    # ensure consumed stream was marked as such
    with pytest.raises(ValueError, match="Cannot import released ArrowArrayStream"):
        pyarrow.table(handle)

    # smoke test for the capsule calling release
    with adbc_driver_manager.AdbcStatement(conn) as stmt:
        stmt.set_sql_query("SELECT * FROM foo")
        capsule = stmt.execute_query()[0].__arrow_c_stream__()
    del capsule

    # TODO: also need to import from things supporting protocol
Review comment: We are "moving" the schema here, while in nanoarrow I opted for a hard copy of the schema (using nanoarrow's ArrowSchemaDeepCopy). But I think the only advantage of a hard copy is that you can consume it multiple times (or, in the case of nanoarrow-python, that the nanoarrow Schema object is still valid and inspectable after it has been converted to e.g. a pyarrow.Schema). For ADBC, I think the use case will be much more "receive the handle and convert it directly, once", given that the handle object itself isn't useful at all (in contrast to nanoarrow.Schema), so moving here is probably fine?

Review comment: I think moving makes sense here.
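To make the move-versus-copy trade-off concrete (an illustration, not part of the diff; assuming pyarrow >= 14.0.1 and the conn / "foo" table from the test above): a copying producer such as pyarrow.Schema can be consumed repeatedly, while the move-based ADBC handle is single-use.

import pyarrow

# Copy-style producer: pyarrow exports a fresh ArrowSchema on every call,
# so the Schema object stays valid and can be consumed any number of times.
pa_schema = pyarrow.schema([("ints", pyarrow.int64())])
capsule_one = pa_schema.__arrow_c_schema__()
capsule_two = pa_schema.__arrow_c_schema__()

# Move-style producer: the ADBC handle gives up its ArrowSchema on first export,
# so consuming it a second time raises "Cannot import released ArrowSchema".
handle = conn.get_table_schema(catalog=None, db_schema=None, table_name="foo")
schema = pyarrow.schema(handle)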