ARROW-13763: [Python] Close files in ParquetFile & ParquetDatasetPiece #13821

Merged: 12 commits, Aug 17, 2022
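This change gives ParquetFile (and the underlying ParquetReader) a close() method, a closed property, and context-manager support: handles that pyarrow opened itself are released after use, while caller-supplied handles are left open. A minimal usage sketch of the behavior added here (the file name is hypothetical; the assertions mirror the new test at the bottom of the diff):

    import pyarrow.parquet as pq

    # Path input: pyarrow opens the handle itself and now closes it on exit.
    with pq.ParquetFile("example.parquet") as pf:
        table = pf.read()
    assert pf.closed

    # Caller-owned handle: left open for the caller to close.
    with open("example.parquet", "rb") as f:
        with pq.ParquetFile(f) as pf:
            table = pf.read()
        assert not f.closed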
19 changes: 16 additions & 3 deletions python/pyarrow/_parquet.pyx
@@ -1159,6 +1159,7 @@ cdef class ParquetReader(_Weakrefable):
        CMemoryPool* pool
        unique_ptr[FileReader] reader
        FileMetaData _metadata
        shared_ptr[CRandomAccessFile] rd_handle

    cdef public:
        _column_idx_map
@@ -1175,7 +1176,6 @@ cdef class ParquetReader(_Weakrefable):
             thrift_string_size_limit=None,
             thrift_container_size_limit=None):
        cdef:
            shared_ptr[CRandomAccessFile] rd_handle
            shared_ptr[CFileMetaData] c_metadata
            CReaderProperties properties = default_reader_properties()
            ArrowReaderProperties arrow_props = (
@@ -1221,10 +1221,10 @@ cdef class ParquetReader(_Weakrefable):
                string_to_timeunit(coerce_int96_timestamp_unit))

        self.source = source
        get_reader(source, use_memory_map, &self.rd_handle)

        get_reader(source, use_memory_map, &rd_handle)
        with nogil:
            check_status(builder.Open(rd_handle, properties, c_metadata))
            check_status(builder.Open(self.rd_handle, properties, c_metadata))

        # Set up metadata
        with nogil:
@@ -1435,6 +1435,19 @@ cdef class ParquetReader(_Weakrefable):
                         .ReadColumn(column_index, &out))
        return pyarrow_wrap_chunked_array(out)

    def close(self):
        if not self.closed:
            with nogil:
                check_status(self.rd_handle.get().Close())

    @property
    def closed(self):
        if self.rd_handle == NULL:
            return True
        with nogil:
            closed = self.rd_handle.get().closed()
        return closed
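The read handle is now kept on the reader itself (rd_handle was promoted from a local in open() to a class member) so that close() and closed can reach it after open() returns. One consequence, mirrored by the new test further down: the closed property simply reflects the state of the underlying handle. A hedged sketch (file name hypothetical):

    import pyarrow.parquet as pq

    with open("data.parquet", "rb") as f:
        pf = pq.ParquetFile(f)
    # Once the caller's file is closed, the reader reports closed as well.
    assert pf.closed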


cdef shared_ptr[WriterProperties] _create_writer_properties(
        use_dictionary=None,
25 changes: 22 additions & 3 deletions python/pyarrow/parquet/core.py
@@ -293,9 +293,16 @@ def __init__(self, source, *, metadata=None, common_metadata=None,
            thrift_string_size_limit=thrift_string_size_limit,
            thrift_container_size_limit=thrift_container_size_limit,
        )
        self._close_source = getattr(source, 'closed', True)
        self.common_metadata = common_metadata
        self._nested_paths_by_prefix = self._build_nested_paths()

    def __enter__(self):
        return self

    def __exit__(self, *args, **kwargs):
        self.close()

    def _build_nested_paths(self):
        paths = self.reader.column_paths

@@ -375,6 +382,14 @@ def num_row_groups(self):
"""
return self.reader.num_row_groups

    def close(self, force: bool = False):
        if self._close_source or force:
            self.reader.close()

    @property
    def closed(self) -> bool:
        return self.reader.closed
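A short sketch of how close() and closed interact with the _close_source flag set in __init__ above (illustrative only; the file name is hypothetical, and the force-close behavior is inferred from the logic in this diff):

    import pyarrow.parquet as pq

    f = open("data.parquet", "rb")   # caller-owned handle
    pf = pq.ParquetFile(f)
    pf.close()                       # no-op: the source arrived already open
    assert not pf.closed
    pf.close(force=True)             # explicitly release the handle anyway
    assert pf.closed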

    def read_row_group(self, i, columns=None, use_threads=True,
                       use_pandas_metadata=False):
        """
@@ -1128,8 +1143,8 @@ def get_metadata(self):
        -------
        metadata : FileMetaData
        """
        f = self.open()
        return f.metadata
        with self.open() as parquet:
            return parquet.metadata

    def open(self):
        """
@@ -1203,6 +1218,9 @@ def read(self, columns=None, use_threads=True, partitions=None,
                arr = pa.DictionaryArray.from_arrays(indices, dictionary)
                table = table.append_column(name, arr)

        # To ParquetFile the source looked like it was already open, so won't
        # actually close it without overriding.
        reader.close(force=True)
        return table
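force=True is needed here because ParquetDatasetPiece opens the underlying file itself (via self.open()), so ParquetFile sees a source that is already open and a plain close() would leave that handle dangling. A simplified sketch of the pattern, not the exact upstream code:

    # The piece creates its own handle, so it must force-close the reader.
    reader = piece.open()            # a ParquetFile over a freshly opened file
    try:
        table = reader.read()
    finally:
        reader.close(force=True)     # plain close() would be a no-op here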


@@ -1889,7 +1907,8 @@ def read(self, columns=None, use_threads=True, use_pandas_metadata=False):
"""
tables = []
for piece in self._pieces:
table = piece.read(columns=columns, use_threads=use_threads,
table = piece.read(columns=columns,
use_threads=use_threads,
partitions=self._partitions,
use_pandas_metadata=use_pandas_metadata)
tables.append(table)
52 changes: 52 additions & 0 deletions python/pyarrow/tests/parquet/test_parquet_file.py
@@ -17,6 +17,7 @@

import io
import os
from unittest import mock

import pytest

@@ -277,3 +278,54 @@ def test_pre_buffer(pre_buffer):
    buf.seek(0)
    pf = pq.ParquetFile(buf, pre_buffer=pre_buffer)
    assert pf.read().num_rows == N


def test_parquet_file_explicitly_closed(tempdir):
    """
    Unopened files should be closed explicitly after use,
    and previously opened files should be left open.
    Applies to read_table, ParquetDataset, and ParquetFile
    """
    # create test parquet file
    fn = tempdir.joinpath('file.parquet')
    table = pa.table({'col1': [0, 1], 'col2': [0, 1]})
    pq.write_table(table, fn)

    # read_table (legacy) with opened file (will leave open)
    with open(fn, 'rb') as f:
        pq.read_table(f, use_legacy_dataset=True)
        assert not f.closed  # Didn't close it internally after read_table

    # read_table (legacy) with unopened file (will close)
    with mock.patch.object(pq.ParquetFile, "close") as mock_close:
        pq.read_table(fn, use_legacy_dataset=True)
        mock_close.assert_called()

    # ParquetDataset test (legacy) with unopened file (will close)
    with mock.patch.object(pq.ParquetFile, "close") as mock_close:
        pq.ParquetDataset(fn, use_legacy_dataset=True).read()
        mock_close.assert_called()

    # ParquetDataset test (legacy) with opened file (will leave open)
    with open(fn, 'rb') as f:
        # ARROW-8075: support ParquetDataset from file-like, not just path-like
        with pytest.raises(TypeError, match='not a path-like object'):
            pq.ParquetDataset(f, use_legacy_dataset=True).read()
        assert not f.closed

    # ParquetFile with opened file (will leave open)
    with open(fn, 'rb') as f:
        with pq.ParquetFile(f) as p:
            p.read()
            assert not f.closed
            assert not p.closed
        assert not f.closed  # opened input file was not closed
        assert not p.closed  # parquet file obj reports as not closed
    assert f.closed
    assert p.closed  # parquet file being closed reflects underlying file

    # ParquetFile with unopened file (will close)
    with pq.ParquetFile(fn) as p:
        p.read()
        assert not p.closed
    assert p.closed  # parquet file obj reports as closed