ARROW-2627: [Python] Add option to pass memory_map argument to ParquetDataset

I'm hesitant to spend much more energy on this code before diving into a C++ porting effort around this.

Author: Wes McKinney <[email protected]>

Closes #3639 from wesm/ARROW-2627 and squashes the following commits:

e78f2c0 <Wes McKinney> Fix module name
59f0ce8 <Wes McKinney> code review feedback
88f45cd <Wes McKinney> Add option to pass memory_map argument to ParquetDataset. Modify some internal-ish APIs to not require passing in a file-open-function
wesm committed Feb 28, 2019
1 parent 2a14c7b commit f2fb02b
Showing 2 changed files with 54 additions and 25 deletions.
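
For orientation, a minimal usage sketch of the new option (editorial, not part of the diff below; the directory path and dataframe are made up):

    import os
    import pandas as pd
    import pyarrow as pa
    import pyarrow.parquet as pq

    # Hypothetical local directory holding one Parquet file
    os.makedirs('example_dir', exist_ok=True)
    df = pd.DataFrame({'x': list(range(10))})
    pq.write_table(pa.Table.from_pandas(df), 'example_dir/data.parquet')

    # New in this commit: memory_map=False disables memory-mapping when the
    # dataset opens its local pieces (the default remains True)
    dataset = pq.ParquetDataset('example_dir', memory_map=False)
    table = dataset.read()
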
63 changes: 38 additions & 25 deletions python/pyarrow/parquet.py
@@ -17,6 +17,7 @@

from collections import defaultdict
from concurrent import futures
from functools import partial

from six.moves.urllib.parse import urlparse
import json
@@ -432,14 +433,17 @@ class ParquetDatasetPiece(object):
----------
path : str or pathlib.Path
Path to file in the file system where this piece is located
open_file_func : callable
Function to use for obtaining file handle to dataset piece
partition_keys : list of tuples
[(column name, ordinal index)]
row_group : int, default None
Row group to load. By default, reads all row groups
"""

def __init__(self, path, row_group=None, partition_keys=None):
def __init__(self, path, open_file_func=partial(open, mode='rb'),
row_group=None, partition_keys=None):
self.path = _stringify_path(path)
self.open_file_func = open_file_func
self.row_group = row_group
self.partition_keys = partition_keys or []
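
Regarding the open_file_func parameter documented in the docstring above: any callable taking a path works, returning either a ParquetFile or an open binary file handle (which the piece wraps in a ParquetFile itself, as the _open change further down shows). A hedged sketch, reusing the hypothetical 'example_dir/data.parquet' file from the earlier example:

    import pyarrow.parquet as pq

    # Open pieces without memory mapping by supplying a custom opener
    def open_without_mmap(path):
        return pq.ParquetFile(path, memory_map=False)

    piece = pq.ParquetDatasetPiece('example_dir/data.parquet',
                                   open_file_func=open_without_mmap)
    print(piece.get_metadata().num_rows)
    table = piece.read()
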

@@ -474,24 +478,24 @@ def __str__(self):

return result

def get_metadata(self, open_file_func=None):
def get_metadata(self):

@kszucs (Member) commented on Mar 13, 2019:

@wesm this breaks dask: https://travis-ci.org/kszucs/crossbow/builds/505712978#L5878
I'm creating a PR for them.

@wesm (Author, Member) replied on Mar 13, 2019:

OK, we can also implement backwards compatibility here. More evidence we need to be running integration tests more often
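
One possible shape for the backwards-compatibility shim mentioned here (an editorial sketch, not part of this commit): re-accept the old open_file_func argument on get_metadata, warn, and otherwise fall back to the opener stored on the piece.

    import warnings
    import pyarrow.parquet as pq

    class BackwardCompatiblePiece(pq.ParquetDatasetPiece):
        # Sketch only: keeps older callers (dask is the one mentioned above)
        # working while they migrate to passing open_file_func to the
        # constructor instead
        def get_metadata(self, open_file_func=None):
            if open_file_func is not None:
                warnings.warn('Passing open_file_func to get_metadata is '
                              'deprecated; pass it to ParquetDatasetPiece',
                              DeprecationWarning)
                reader = open_file_func(self.path)
                if not isinstance(reader, pq.ParquetFile):
                    reader = pq.ParquetFile(reader)
                return reader.metadata
            return self.open().metadata
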

"""
Given a function that can create an open ParquetFile object, return the
file's metadata
"""
return self._open(open_file_func).metadata
return self.open().metadata

def _open(self, open_file_func=None):
def open(self):
"""
Returns instance of ParquetFile
"""
reader = open_file_func(self.path)
reader = self.open_file_func(self.path)
if not isinstance(reader, ParquetFile):
reader = ParquetFile(reader)
return reader

def read(self, columns=None, use_threads=True, partitions=None,
open_file_func=None, file=None, use_pandas_metadata=False):
file=None, use_pandas_metadata=False):
"""
Read this piece as a pyarrow.Table
@@ -511,8 +515,8 @@ def read(self, columns=None, use_threads=True, partitions=None,
-------
table : pyarrow.Table
"""
if open_file_func is not None:
reader = self._open(open_file_func)
if self.open_file_func is not None:
reader = self.open()
elif file is not None:
reader = ParquetFile(file)
else:
@@ -706,10 +710,11 @@ class ParquetManifest(object):
"""
"""
def __init__(self, dirpath, filesystem=None, pathsep='/',
partition_scheme='hive', metadata_nthreads=1):
def __init__(self, dirpath, open_file_func=None, filesystem=None,
pathsep='/', partition_scheme='hive', metadata_nthreads=1):
filesystem, dirpath = _get_filesystem_and_path(filesystem, dirpath)
self.filesystem = filesystem
self.open_file_func = open_file_func
self.pathsep = pathsep
self.dirpath = _stringify_path(dirpath)
self.partition_scheme = partition_scheme
@@ -803,7 +808,8 @@ def _parse_partition(self, dirname):

def _push_pieces(self, files, part_keys):
self.pieces.extend([
ParquetDatasetPiece(path, partition_keys=part_keys)
ParquetDatasetPiece(path, partition_keys=part_keys,
open_file_func=self.open_file_func)
for path in files
])

@@ -870,10 +876,13 @@ class ParquetDataset(object):
How many threads to allow the thread pool which is used to read the
dataset metadata. Increasing this is helpful to read partitioned
datasets.
memory_map : boolean, default True
If the source is a file path, use a memory map to read each file in the
dataset if possible, which can improve performance in some environments
"""
def __init__(self, path_or_paths, filesystem=None, schema=None,
metadata=None, split_row_groups=False, validate_schema=True,
filters=None, metadata_nthreads=1):
filters=None, metadata_nthreads=1, memory_map=True):
a_path = path_or_paths
if isinstance(a_path, list):
a_path = a_path[0]
@@ -884,21 +893,26 @@ def __init__(self, path_or_paths, filesystem=None, schema=None,
else:
self.paths = _parse_uri(path_or_paths)

self.memory_map = memory_map
self._open_file_func = self._get_open_file_func()

(self.pieces,
self.partitions,
self.common_metadata_path,
self.metadata_path) = _make_manifest(
path_or_paths, self.fs, metadata_nthreads=metadata_nthreads)
path_or_paths, self.fs, metadata_nthreads=metadata_nthreads,
open_file_func=self._open_file_func)

if self.common_metadata_path is not None:
with self.fs.open(self.common_metadata_path) as f:
self.common_metadata = ParquetFile(f).metadata
self.common_metadata = (ParquetFile(f, memory_map=memory_map)
.metadata)
else:
self.common_metadata = None

if metadata is None and self.metadata_path is not None:
with self.fs.open(self.metadata_path) as f:
self.metadata = ParquetFile(f).metadata
self.metadata = ParquetFile(f, memory_map=memory_map).metadata
else:
self.metadata = metadata

@@ -917,13 +931,11 @@ def __init__(self, path_or_paths, filesystem=None, schema=None,
self._filter(filters)

def validate_schemas(self):
open_file = self._get_open_file_func()

if self.metadata is None and self.schema is None:
if self.common_metadata is not None:
self.schema = self.common_metadata.schema
else:
self.schema = self.pieces[0].get_metadata(open_file).schema
self.schema = self.pieces[0].get_metadata().schema
elif self.schema is None:
self.schema = self.metadata.schema

@@ -938,7 +950,7 @@ def validate_schemas(self):
dataset_schema = dataset_schema.remove(field_idx)

for piece in self.pieces:
file_metadata = piece.get_metadata(open_file)
file_metadata = piece.get_metadata()
file_schema = file_metadata.schema.to_arrow_schema()
if not dataset_schema.equals(file_schema, check_metadata=False):
raise ValueError('Schema in {0!s} was different. \n'
@@ -964,13 +976,10 @@ def read(self, columns=None, use_threads=True, use_pandas_metadata=False):
pyarrow.Table
Content of the file as a table (of columns)
"""
open_file = self._get_open_file_func()

tables = []
for piece in self.pieces:
table = piece.read(columns=columns, use_threads=use_threads,
partitions=self.partitions,
open_file_func=open_file,
use_pandas_metadata=use_pandas_metadata)
tables.append(table)

@@ -1012,10 +1021,12 @@ def _get_open_file_func(self):
if self.fs is None or isinstance(self.fs, LocalFileSystem):
def open_file(path, meta=None):
return ParquetFile(path, metadata=meta,
memory_map=self.memory_map,
common_metadata=self.common_metadata)
else:
def open_file(path, meta=None):
return ParquetFile(self.fs.open(path, mode='rb'),
memory_map=self.memory_map,
metadata=meta,
common_metadata=self.common_metadata)
return open_file
@@ -1034,7 +1045,8 @@ def all_filters_accept(piece):
self.pieces = [p for p in self.pieces if all_filters_accept(p)]


def _make_manifest(path_or_paths, fs, pathsep='/', metadata_nthreads=1):
def _make_manifest(path_or_paths, fs, pathsep='/', metadata_nthreads=1,
open_file_func=None):
partitions = None
common_metadata_path = None
metadata_path = None
@@ -1045,6 +1057,7 @@ def _make_manifest(path_or_paths, fs, pathsep='/', metadata_nthreads=1):

if _is_path_like(path_or_paths) and fs.isdir(path_or_paths):
manifest = ParquetManifest(path_or_paths, filesystem=fs,
open_file_func=open_file_func,
pathsep=fs.pathsep,
metadata_nthreads=metadata_nthreads)
common_metadata_path = manifest.common_metadata_path
@@ -1064,7 +1077,7 @@ def _make_manifest(path_or_paths, fs, pathsep='/', metadata_nthreads=1):
if not fs.isfile(path):
raise IOError('Passed non-file path: {0}'
.format(path))
piece = ParquetDatasetPiece(path)
piece = ParquetDatasetPiece(path, open_file_func=open_file_func)
pieces.append(piece)

return pieces, partitions, common_metadata_path, metadata_path
16 changes: 16 additions & 0 deletions python/pyarrow/tests/test_parquet.py
@@ -1736,6 +1736,22 @@ def test_dataset_read_pandas(tempdir):
tm.assert_frame_equal(result, expected)


def test_dataset_no_memory_map(tempdir):
# ARROW-2627: Check that we can use ParquetDataset without memory-mapping
dirpath = tempdir / guid()
dirpath.mkdir()

df = _test_dataframe(10, seed=0)
path = dirpath / '{}.parquet'.format(0)
table = pa.Table.from_pandas(df)
_write_table(table, path, version='2.0')

# TODO(wesm): Not sure how to easily check that memory mapping is _not_
# used. Mocking is not especially easy for pa.memory_map
dataset = pq.ParquetDataset(dirpath, memory_map=False)
assert dataset.pieces[0].read().equals(table)
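
One way the TODO above could be tightened (an editorial sketch, not part of the commit): since the dataset opens every local piece through ParquetFile (see the _get_open_file_func change earlier in this diff), a recording subclass can verify that memory_map=False is what actually gets passed down. The test name is made up, and the sketch assumes pytest's monkeypatch fixture plus the helpers already used in this file:

    def test_dataset_memory_map_flag_propagates(tempdir, monkeypatch):
        dirpath = tempdir / guid()
        dirpath.mkdir()
        table = pa.Table.from_pandas(_test_dataframe(10, seed=0))
        _write_table(table, dirpath / 'part-0.parquet', version='2.0')

        seen = []

        class RecordingParquetFile(pq.ParquetFile):
            # Record the memory_map keyword each time a piece is opened
            def __init__(self, source, **kwargs):
                seen.append(kwargs.get('memory_map'))
                super(RecordingParquetFile, self).__init__(source, **kwargs)

        monkeypatch.setattr(pq, 'ParquetFile', RecordingParquetFile)

        dataset = pq.ParquetDataset(dirpath, memory_map=False)
        assert dataset.pieces[0].read().equals(table)
        assert seen and all(flag is False for flag in seen)
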


@pytest.mark.parametrize('preserve_index', [True, False])
def test_dataset_read_pandas_common_metadata(tempdir, preserve_index):
# ARROW-1103
