Skip to content

Commit

Permalink
Merge pull request #9043 from rouault/python_arrow_c_stream
Browse files Browse the repository at this point in the history
Python bindings: implement __arrow_c_stream__() interface for ogr.Layer
  • Loading branch information
rouault authored Jan 11, 2024
2 parents 045d8b8 + e4f78be commit f8270f8
Show file tree
Hide file tree
Showing 3 changed files with 173 additions and 0 deletions.
48 changes: 48 additions & 0 deletions autotest/ogr/ogr_mem.py
Original file line number Diff line number Diff line change
Expand Up @@ -709,6 +709,54 @@ def test_ogr_mem_alter_geom_field_defn():
assert lyr.GetSpatialRef() is None


###############################################################################
# Test ogr.Layer.__arrow_c_stream__() interface.
# Cf https://arrow.apache.org/docs/format/CDataInterface/PyCapsuleInterface.html


@gdaltest.enable_exceptions()
def test_ogr_mem_arrow_stream_pycapsule_interface():
import ctypes

ds = ogr.GetDriverByName("Memory").CreateDataSource("")
lyr = ds.CreateLayer("foo")

stream = lyr.__arrow_c_stream__()
assert stream
t = type(stream)
assert t.__module__ == "builtins"
assert t.__name__ == "PyCapsule"
capsule_get_name = ctypes.pythonapi.PyCapsule_GetName
capsule_get_name.argtypes = [ctypes.py_object]
capsule_get_name.restype = ctypes.c_char_p
assert capsule_get_name(ctypes.py_object(stream)) == b"arrow_array_stream"

with pytest.raises(
Exception, match="An arrow Arrow Stream is in progress on that layer"
):
lyr.__arrow_c_stream__()

del stream

stream = lyr.__arrow_c_stream__()
assert stream
del stream

with pytest.raises(Exception, match="requested_schema != None not implemented"):
# "something" should rather by a PyCapsule with an ArrowSchema...
lyr.__arrow_c_stream__(requested_schema="something")

# Also test GetArrowArrayStreamInterface() to be able to specify options
stream = lyr.GetArrowArrayStreamInterface(
{"INCLUDE_FID": "NO"}
).__arrow_c_stream__()
assert stream
t = type(stream)
assert t.__module__ == "builtins"
assert t.__name__ == "PyCapsule"
del stream


###############################################################################


Expand Down
41 changes: 41 additions & 0 deletions swig/include/ogr.i
Original file line number Diff line number Diff line change
Expand Up @@ -1143,6 +1143,22 @@ public:
}; /* class ArrowArrayStream */
#endif

#ifdef SWIGPYTHON
// Implements __arrow_c_stream__ export interface:
// https://arrow.apache.org/docs/format/CDataInterface/PyCapsuleInterface.html#create-a-pycapsule
%{
static void ReleaseArrowArrayStreamPyCapsule(PyObject* capsule) {
struct ArrowArrayStream* stream =
(struct ArrowArrayStream*)PyCapsule_GetPointer(capsule, "arrow_array_stream");
if (stream->release != NULL) {
stream->release(stream);
}
CPLFree(stream);
}
%}

#endif

/************************************************************************/
/* OGRLayer */
/************************************************************************/
Expand Down Expand Up @@ -1507,6 +1523,31 @@ public:

#ifdef SWIGPYTHON

PyObject* ExportArrowArrayStreamPyCapsule(char** options = NULL)
{
struct ArrowArrayStream* stream =
(struct ArrowArrayStream*)CPLMalloc(sizeof(struct ArrowArrayStream));

const int success = OGR_L_GetArrowStream(self, stream, options);

PyObject* ret;
SWIG_PYTHON_THREAD_BEGIN_BLOCK;
if( success )
{
ret = PyCapsule_New(stream, "arrow_array_stream", ReleaseArrowArrayStreamPyCapsule);
}
else
{
CPLFree(stream);
Py_INCREF(Py_None);
ret = Py_None;
}

SWIG_PYTHON_THREAD_END_BLOCK;

return ret;
}

%newobject GetArrowStream;
ArrowArrayStream* GetArrowStream(char** options = NULL) {
struct ArrowArrayStream* stream = (struct ArrowArrayStream* )malloc(sizeof(struct ArrowArrayStream));
Expand Down
84 changes: 84 additions & 0 deletions swig/include/python/ogr_python.i
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,90 @@ def ReleaseResultSet(self, sql_lyr):
schema = property(schema)


def __arrow_c_stream__(self, requested_schema=None):
"""
Export to a C ArrowArrayStream PyCapsule, according to
https://arrow.apache.org/docs/format/CDataInterface/PyCapsuleInterface.html
Also note that only one active stream can be queried at a time for a
given layer.
To specify options how the ArrowStream should be generated, use
the GetArrowArrayStreamInterface(self, options) method
Parameters
----------
requested_schema : PyCapsule, default None
The schema to which the stream should be casted, passed as a
PyCapsule containing a C ArrowSchema representation of the
requested schema.
Currently, this is not supported and will raise a
NotImplementedError if the schema is not None
Returns
-------
PyCapsule
A capsule containing a C ArrowArrayStream struct.
"""

if requested_schema is not None:
raise NotImplementedError("requested_schema != None not implemented")

return self.ExportArrowArrayStreamPyCapsule()


def GetArrowArrayStreamInterface(self, options = []):
"""
Return a proxy object that implements the __arrow_c_stream__() method,
but allows the user to pass options.
Parameters
----------
options : List of strings or dict with options such as INCLUDE_FID=NO, MAX_FEATURES_IN_BATCH=<number>, etc.
Returns
-------
a proxy object which implements the __arrow_c_stream__() method
"""

class ArrowArrayStreamInterface:
def __init__(self, lyr, options):
self.lyr = lyr
self.options = options

def __arrow_c_stream__(self, requested_schema=None):
"""
Export to a C ArrowArrayStream PyCapsule, according to
https://arrow.apache.org/docs/format/CDataInterface/PyCapsuleInterface.html

Also note that only one active stream can be queried at a time for a
given layer.

To specify options how the ArrowStream should be generated, use
the GetArrowArrayStreamInterface(self, options) method

Parameters
----------
requested_schema : PyCapsule, default None
The schema to which the stream should be casted, passed as a
PyCapsule containing a C ArrowSchema representation of the
requested schema.
Currently, this is not supported and will raise a
NotImplementedError if the schema is not None

Returns
-------
PyCapsule
A capsule containing a C ArrowArrayStream struct.
"""
if requested_schema is not None:
raise NotImplementedError("requested_schema != None not implemented")

return self.lyr.ExportArrowArrayStreamPyCapsule(self.options)

return ArrowArrayStreamInterface(self, options)


def GetArrowStreamAsPyArrow(self, options = []):
""" Return an ArrowStream as PyArrow Schema and Array objects """

Expand Down

0 comments on commit f8270f8

Please sign in to comment.