Skip to content

Commit

Permalink
ENH: enable support for writing to memory (#397)
Browse files Browse the repository at this point in the history
  • Loading branch information
brendan-ward authored May 2, 2024
1 parent 246ca84 commit 6b3d3dc
Show file tree
Hide file tree
Showing 13 changed files with 697 additions and 81 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
the previous behaviour of returning a `pyarrow.RecordBatchReader`, specify
`use_pyarrow=True` (#349).
- Warn when reading from a multilayer file without specifying a layer (#362).
- Allow writing to a new in-memory datasource using io.BytesIO object (#397).

### Bug fixes

Expand Down
208 changes: 185 additions & 23 deletions pyogrio/_io.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,19 @@

import contextlib
import datetime
from io import BytesIO
import locale
import logging
import math
import os
from pathlib import Path
import sys
from uuid import uuid4
import warnings

from libc.stdint cimport uint8_t, uintptr_t
from libc.stdlib cimport malloc, free
from libc.string cimport strlen
from libc.string cimport memcpy, strlen
from libc.math cimport isnan
from cpython.pycapsule cimport PyCapsule_GetPointer

Expand All @@ -29,7 +32,7 @@ from pyogrio._err cimport *
from pyogrio._err import CPLE_BaseError, CPLE_NotSupportedError, NullPointerError
from pyogrio._geometry cimport get_geometry_type, get_geometry_type_code
from pyogrio.errors import CRSError, DataSourceError, DataLayerError, GeometryError, FieldError, FeatureError

from pyogrio._ogr import _get_driver_metadata_item

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -173,6 +176,17 @@ cdef const char* override_threadlocal_config_option(str key, str value):


cdef void* ogr_open(const char* path_c, int mode, char** options) except NULL:
"""Open an existing OGR data source
Parameters
----------
path_c : char *
input path, including an in-memory path (/vsimem/...)
mode : int
set to 1 to allow updating data source
options : char **, optional
dataset open options
"""
cdef void* ogr_dataset = NULL

# Force linear approximations in all cases
Expand Down Expand Up @@ -1976,12 +1990,105 @@ cdef infer_field_types(list dtypes):

return field_types

cdef str get_ogr_vsimem_write_path(object path_or_fp, str driver):
""" Return the original path or a /vsimem/ path
If passed a io.BytesIO object, this will return a /vsimem/ path that can be
used to create a new in-memory file with an extension inferred from the driver
if possible. Path will be contained in an in-memory directory to contain
sibling files (though drivers that create sibling files are not supported for
in-memory files).
Caller is responsible for deleting the directory via delete_vsimem_file()
Parameters
----------
path_or_fp : str or io.BytesIO object
driver : str
"""

if not isinstance(path_or_fp, BytesIO):
return path_or_fp

# Create in-memory directory to contain auxiliary files
memfilename = uuid4().hex
VSIMkdir(f"/vsimem/{memfilename}".encode("utf-8"), 0666)

# file extension is required for some drivers, set it based on driver metadata
ext = ''
recommended_ext = _get_driver_metadata_item(driver, "DMD_EXTENSIONS")
if recommended_ext is not None:
ext = "." + recommended_ext.split(' ')[0]

path = f"/vsimem/{memfilename}/{memfilename}{ext}"

# check for existing bytes
if path_or_fp.getbuffer().nbytes > 0:
raise NotImplementedError("writing to existing in-memory object is not supported")

return path


cdef read_vsimem_to_buffer(str path, object out_buffer):
"""Copy bytes from in-memory file to buffer
This will automatically unlink the in-memory file pointed to by path; caller
is still responsible for calling delete_vsimem_file() to cleanup any other
files contained in the in-memory directory.
Parameters:
-----------
path : str
path to in-memory file
buffer : BytesIO object
"""

cdef unsigned char *vsi_buffer = NULL
cdef vsi_l_offset vsi_buffer_size = 0

try:
# Take ownership of the buffer to avoid a copy; GDAL will automatically
# unlink the memory file
vsi_buffer = VSIGetMemFileBuffer(path.encode("UTF-8"), &vsi_buffer_size, 1)
if vsi_buffer == NULL:
raise RuntimeError("could not read bytes from in-memory file")

# write bytes to buffer
out_buffer.write(<bytes>vsi_buffer[:vsi_buffer_size])
# rewind to beginning to allow caller to read
out_buffer.seek(0)

finally:
if vsi_buffer != NULL:
CPLFree(vsi_buffer)


cdef delete_vsimem_file(str path):
""" Delete in-memory directory containing path
Parameters:
-----------
path : str
path to in-memory file
"""
VSIRmdirRecursive(str(Path(path).parent).encode("UTF-8"))


cdef create_ogr_dataset_layer(
str path, str layer, str driver, str crs, str geometry_type, str encoding,
object dataset_kwargs, object layer_kwargs, bint append,
dataset_metadata, layer_metadata,
OGRDataSourceH* ogr_dataset_out, OGRLayerH* ogr_layer_out,
str path,
bint is_vsi,
str layer,
str driver,
str crs,
str geometry_type,
str encoding,
object dataset_kwargs,
object layer_kwargs,
bint append,
dataset_metadata,
layer_metadata,
OGRDataSourceH* ogr_dataset_out,
OGRLayerH* ogr_layer_out,
):
"""
Construct the OGRDataSource and OGRLayer objects based on input
Expand Down Expand Up @@ -2030,18 +2137,22 @@ cdef create_ogr_dataset_layer(
driver_b = driver.encode('UTF-8')
driver_c = driver_b

# in-memory dataset is always created from scratch
path_exists = os.path.exists(path) if not is_vsi else False

if not layer:
layer = os.path.splitext(os.path.split(path)[1])[0]

# if shapefile, GeoJSON, or FlatGeobuf, always delete first
# for other types, check if we can create layers
# GPKG might be the only multi-layer writeable type. TODO: check this
if driver in ('ESRI Shapefile', 'GeoJSON', 'GeoJSONSeq', 'FlatGeobuf') and os.path.exists(path):
if driver in ('ESRI Shapefile', 'GeoJSON', 'GeoJSONSeq', 'FlatGeobuf') and path_exists:
if not append:
os.unlink(path)
path_exists = False

layer_exists = False
if os.path.exists(path):
if path_exists:
try:
ogr_dataset = ogr_open(path_c, 1, NULL)

Expand All @@ -2063,7 +2174,11 @@ cdef create_ogr_dataset_layer(
raise exc

# otherwise create from scratch
os.unlink(path)
if is_vsi:
VSIUnlink(path_c)
else:
os.unlink(path)

ogr_dataset = NULL

# either it didn't exist or could not open it in write mode
Expand Down Expand Up @@ -2154,15 +2269,29 @@ cdef create_ogr_dataset_layer(

ogr_dataset_out[0] = ogr_dataset
ogr_layer_out[0] = ogr_layer

return create_layer


# TODO: set geometry and field data as memory views?
def ogr_write(
str path, str layer, str driver, geometry, fields, field_data, field_mask,
str crs, str geometry_type, str encoding, object dataset_kwargs,
object layer_kwargs, bint promote_to_multi=False, bint nan_as_null=True,
bint append=False, dataset_metadata=None, layer_metadata=None,
object path_or_fp,
str layer,
str driver,
geometry,
fields,
field_data,
field_mask,
str crs,
str geometry_type,
str encoding,
object dataset_kwargs,
object layer_kwargs,
bint promote_to_multi=False,
bint nan_as_null=True,
bint append=False,
dataset_metadata=None,
layer_metadata=None,
gdal_tz_offsets=None
):
cdef OGRDataSourceH ogr_dataset = NULL
Expand All @@ -2179,6 +2308,7 @@ def ogr_write(
cdef int num_records = -1
cdef int num_field_data = len(field_data) if field_data is not None else 0
cdef int num_fields = len(fields) if fields is not None else 0
cdef bint is_vsi = False

if num_fields != num_field_data:
raise ValueError("field_data array needs to be same length as fields array")
Expand Down Expand Up @@ -2218,9 +2348,13 @@ def ogr_write(
gdal_tz_offsets = {}

try:
### Setup up dataset and layer
# Setup in-memory handler if needed
path = get_ogr_vsimem_write_path(path_or_fp, driver)
is_vsi = path.startswith('/vsimem/')

# Setup dataset and layer
layer_created = create_ogr_dataset_layer(
path, layer, driver, crs, geometry_type, encoding,
path, is_vsi, layer, driver, crs, geometry_type, encoding,
dataset_kwargs, layer_kwargs, append,
dataset_metadata, layer_metadata,
&ogr_dataset, &ogr_layer,
Expand Down Expand Up @@ -2418,6 +2552,16 @@ def ogr_write(

log.info(f"Created {num_records:,} records" )

# close dataset to force driver to flush data
exc = ogr_close(ogr_dataset)
ogr_dataset = NULL
if exc:
raise DataSourceError(f"Failed to write features to dataset {path}; {exc}")

# copy in-memory file back to path_or_fp object
if is_vsi:
read_vsimem_to_buffer(path, path_or_fp)

finally:
### Final cleanup
# make sure that all objects allocated above are released if exceptions
Expand All @@ -2434,13 +2578,15 @@ def ogr_write(
OGR_G_DestroyGeometry(ogr_geometry)
ogr_geometry = NULL

exc = ogr_close(ogr_dataset)
if exc:
raise DataSourceError(f"Failed to write features to dataset {path}; {exc}")
if ogr_dataset != NULL:
ogr_close(ogr_dataset)

if is_vsi:
delete_vsimem_file(path)


def ogr_write_arrow(
str path,
object path_or_fp,
str layer,
str driver,
object arrow_obj,
Expand All @@ -2460,6 +2606,7 @@ def ogr_write_arrow(
cdef OGRDataSourceH ogr_dataset = NULL
cdef OGRLayerH ogr_layer = NULL
cdef char **options = NULL
cdef bint is_vsi = False
cdef ArrowArrayStream* stream = NULL
cdef ArrowSchema schema
cdef ArrowArray array
Expand All @@ -2468,8 +2615,11 @@ def ogr_write_arrow(
array.release = NULL

try:
path = get_ogr_vsimem_write_path(path_or_fp, driver)
is_vsi = path.startswith('/vsimem/')

layer_created = create_ogr_dataset_layer(
path, layer, driver, crs, geometry_type, encoding,
path, is_vsi, layer, driver, crs, geometry_type, encoding,
dataset_kwargs, layer_kwargs, append,
dataset_metadata, layer_metadata,
&ogr_dataset, &ogr_layer,
Expand Down Expand Up @@ -2523,6 +2673,16 @@ def ogr_write_arrow(
if array.release != NULL:
array.release(&array)

# close dataset to force driver to flush data
exc = ogr_close(ogr_dataset)
ogr_dataset = NULL
if exc:
raise DataSourceError(f"Failed to write features to dataset {path}; {exc}")

# copy in-memory file back to path_or_fp object
if is_vsi:
read_vsimem_to_buffer(path, path_or_fp)

finally:
if stream != NULL and stream.release != NULL:
stream.release(stream)
Expand All @@ -2537,9 +2697,11 @@ def ogr_write_arrow(
CSLDestroy(options)
options = NULL

exc = ogr_close(ogr_dataset)
if exc:
raise DataSourceError(f"Failed to write features to dataset {path}; {exc}")
if ogr_dataset != NULL:
ogr_close(ogr_dataset)

if is_vsi:
delete_vsimem_file(path)


cdef get_arrow_extension_metadata(const ArrowSchema* schema):
Expand Down
20 changes: 15 additions & 5 deletions pyogrio/_ogr.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,23 @@ cdef extern from "cpl_string.h":


cdef extern from "cpl_vsi.h" nogil:

int VSI_STAT_EXISTS_FLAG
ctypedef int vsi_l_offset
ctypedef FILE VSILFILE
ctypedef struct VSIStatBufL:
long st_size
long st_mode
int st_mtime

int VSIFCloseL(VSILFILE *fp)
int VSIFFlushL(VSILFILE *fp)
int VSIUnlink(const char *path)

VSILFILE *VSIFileFromMemBuffer(const char *path, void *data, vsi_l_offset data_len, int take_ownership)
unsigned char *VSIGetMemFileBuffer(const char *path, vsi_l_offset *data_len, int take_ownership)

VSILFILE *VSIFileFromMemBuffer(const char *path, void *data,
int data_len, int take_ownership)
int VSIFCloseL(VSILFILE *fp)
int VSIUnlink(const char *path)
int VSIMkdir(const char *path, long mode)
int VSIRmdirRecursive(const char *pszDirname)


cdef extern from "ogr_core.h":
Expand Down
8 changes: 8 additions & 0 deletions pyogrio/_ogr.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,14 @@ def ogr_driver_supports_write(driver):
return False


def ogr_driver_supports_vsi(driver):
# check metadata for driver to see if it supports write
if _get_driver_metadata_item(driver, "DCAP_VIRTUALIO") == 'YES':
return True

return False


def ogr_list_drivers():
cdef OGRSFDriverH driver = NULL
cdef int i
Expand Down
Loading

0 comments on commit 6b3d3dc

Please sign in to comment.