Evolution #818

Merged: 15 commits, Nov 14, 2022
1 change: 1 addition & 0 deletions .gitattributes
@@ -0,0 +1 @@
fastparquet/_version.py export-subst
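
The export-subst attribute lets a git-archive tarball carry version metadata even without a .git directory, which setuptools_scm (the generator of fastparquet/_version.py below) can fall back on. A minimal sketch of the version lookup, assuming a standard setuptools_scm setup; the arguments are illustrative, not taken from this PR:

    # Sketch only: how setuptools_scm derives a version string like the
    # '0.8.4.dev5+dirty' seen in _version.py below; assumes a git checkout
    # with tag history available.
    from setuptools_scm import get_version

    # Finds the latest tag (e.g. 0.8.3), counts commits since it ('dev5'),
    # and appends a local segment such as '+dirty' for uncommitted changes.
    print(get_version(root=".", relative_to=__file__))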
10 changes: 10 additions & 0 deletions .github/workflows/main.yaml
@@ -20,6 +20,8 @@ jobs:

- name: Checkout
uses: actions/checkout@v2
+ with:
+   fetch-depth: 0

- name: Setup Miniconda
uses: conda-incubator/setup-miniconda@v2
@@ -51,6 +53,8 @@ jobs:

- name: Checkout
uses: actions/checkout@v2
+ with:
+   fetch-depth: 0

- name: Setup Miniconda
uses: conda-incubator/setup-miniconda@v2
@@ -81,6 +85,8 @@ jobs:

- name: Checkout
uses: actions/checkout@v2
+ with:
+   fetch-depth: 0

- name: Setup Miniconda
uses: conda-incubator/setup-miniconda@v2
@@ -112,6 +118,8 @@ jobs:

- name: Checkout
uses: actions/checkout@v2
+ with:
+   fetch-depth: 0

- name: Setup Miniconda
uses: conda-incubator/setup-miniconda@v2
@@ -147,6 +155,8 @@ jobs:
steps:
- name: Checkout
uses: actions/checkout@v2
+ with:
+   fetch-depth: 0

- name: Setup Miniconda
uses: conda-incubator/setup-miniconda@v2
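Every job gains fetch-depth: 0 because actions/checkout defaults to a depth-1 clone with no tags, and setuptools_scm needs the tag history to compute a version. A hedged sketch of the failure mode the change avoids (function name and fallback string are illustrative):

    # Sketch: why a shallow CI checkout breaks setuptools_scm versioning.
    from setuptools_scm import get_version

    def detect_version(fallback="0+unknown"):
        try:
            # In a full clone (fetch-depth: 0) this returns e.g.
            # '0.8.4.dev5+dirty'.
            return get_version()
        except LookupError:
            # A depth-1 clone has no tags to describe, so setuptools_scm
            # raises LookupError; the workflow change avoids this branch.
            return fallback

    print(detect_version())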
2 changes: 2 additions & 0 deletions .github/workflows/test_wheel.yaml
@@ -34,6 +34,8 @@ jobs:
CIBW_MANYLINUX_X86_64_IMAGE: "manylinux2014"
steps:
- uses: actions/checkout@v2
+ with:
+   fetch-depth: 0

- name: Setup Python
uses: actions/setup-python@v2
10 changes: 10 additions & 0 deletions .github/workflows/wheel.yml
@@ -28,6 +28,8 @@ jobs:

steps:
- uses: actions/checkout@v2
+ with:
+   fetch-depth: 0

- name: Setup Python
uses: actions/setup-python@v2
@@ -81,6 +83,8 @@ jobs:

steps:
- uses: actions/checkout@v2
+ with:
+   fetch-depth: 0

- name: Setup Python
uses: actions/setup-python@v2
@@ -134,6 +138,8 @@ jobs:

steps:
- uses: actions/checkout@v2
+ with:
+   fetch-depth: 0

- name: Setup Python
uses: actions/setup-python@v2
@@ -186,6 +192,8 @@ jobs:

steps:
- uses: actions/checkout@v2
+ with:
+   fetch-depth: 0

- name: Setup Python
uses: actions/setup-python@v2
@@ -233,6 +241,8 @@ jobs:

steps:
- uses: actions/checkout@v2
+ with:
+   fetch-depth: 0

- name: Setup Python
uses: actions/setup-python@v2
3 changes: 2 additions & 1 deletion fastparquet/__init__.py
@@ -1,7 +1,8 @@
"""parquet - read parquet files."""
- __version__ = "0.8.3"

+ from ._version import __version__
from .writer import write, update_file_custom_metadata
from . import core, schema, converted_types, api
from .api import ParquetFile
from .util import ParquetException

5 changes: 5 additions & 0 deletions fastparquet/_version.py
@@ -0,0 +1,5 @@
# coding: utf-8
# file generated by setuptools_scm
# don't change, don't track in version control
__version__ = version = '0.8.4.dev5+dirty'
__version_tuple__ = version_tuple = (0, 8, 4, 'dev5', 'dirty')
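
The checked-in _version.py is just a snapshot: setuptools_scm rewrites it at build time, and the package version now flows from it rather than a hard-coded string. An illustrative check (the printed values mirror the snapshot above and are not guaranteed):

    # Illustrative only: the version comes from _version.py, not __init__.py.
    import fastparquet
    from fastparquet._version import version_tuple

    print(fastparquet.__version__)  # e.g. '0.8.4.dev5+dirty'
    # 'dev5' likely means five commits past the last tag; 'dirty' flags
    # uncommitted local changes.
    print(version_tuple)            # e.g. (0, 8, 4, 'dev5', 'dirty')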
39 changes: 25 additions & 14 deletions fastparquet/api.py
@@ -104,8 +104,10 @@ class ParquetFile(object):
_categories = None

def __init__(self, fn, verify=False, open_with=default_open, root=False,
- sep=None, fs=None, pandas_nulls=True):
+ sep=None, fs=None, pandas_nulls=True, dtypes=None):
self.pandas_nulls = pandas_nulls
+ self._base_dtype = dtypes
+ self.tz = None
if open_with is default_open and fs is None:
fs = fsspec.filesystem("file")
elif fs is not None:
@@ -139,6 +141,9 @@ def __init__(self, fn, verify=False, open_with=default_open, root=False,
self.fn = join_path(fn)
with open_with(fn, 'rb') as f:
self._parse_header(f, verify)
+ if root:
+     paths = [fn.replace(root, "")]
+     self.file_scheme, self.cats = paths_to_cats(paths, None)
elif "*" in fn or fs.isdir(fn):
fn2 = join_path(fn, '_metadata')
if fs.exists(fn2):
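
With the new root= branch above, a single file inside a partitioned dataset can be opened directly while still recovering partition columns from its path. A sketch under an assumed hive-style layout (path and column names are hypothetical):

    # Hypothetical layout: one file of a hive-partitioned dataset, opened
    # directly with the dataset root supplied.
    from fastparquet import ParquetFile

    pf = ParquetFile("data/year=2022/part.0.parquet", root="data")
    # paths_to_cats() recovers the partition column from the relative path,
    # so 'year' shows up in pf.cats even though only one file was parsed.
    print(pf.cats)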
@@ -168,6 +173,8 @@ def __init__(self, fn, verify=False, open_with=default_open, root=False,
self.fmd = fmd
self._set_attrs()
self.fs = fs
+ else:
+     raise FileNotFoundError(fn)
else:
done = False
try:
@@ -248,10 +255,7 @@ def helper(self):
@property
def columns(self):
""" Column names """
- return [c for c, i in self._schema[0]["children"].items()
-         if len(getattr(i, 'children', [])) == 0
-         or i.converted_type in [parquet_thrift.ConvertedType.LIST,
-         parquet_thrift.ConvertedType.MAP]]
+ return [_ for _ in self.dtypes if _ not in self.cats]

@property
def statistics(self):
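
The columns property above no longer walks the Thrift schema; it is now the dtype mapping minus partition (cats) columns, keeping it consistent with a user-supplied dtypes=. A pure-Python restatement (function and variable names are assumed for illustration):

    # Restatement of the new `columns` property logic.
    def data_columns(dtypes, cats):
        # Every dtype key that is not a partition (cats) column.
        return [name for name in dtypes if name not in cats]

    print(data_columns({"x": "float64", "year": "int64"}, {"year": [2022]}))
    # -> ['x']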
@@ -675,7 +679,7 @@ def _column_filter(self, df, filters):
return out

def to_pandas(self, columns=None, categories=None, filters=[],
- index=None, row_filter=False):
+ index=None, row_filter=False, dtypes=None):
"""
Read data from parquet into a Pandas dataframe.

@@ -752,7 +756,7 @@ def to_pandas(self, columns=None, categories=None, filters=[],
else:
size = sum(rg.num_rows for rg in rgs)
selected = [None] * len(rgs) # just to fill zip, below
- df, views = self.pre_allocate(size, columns, categories, index)
+ df, views = self.pre_allocate(size, columns, categories, index, dtypes=dtypes)
start = 0
if self.file_scheme == 'simple':
infile = self.open(self.fn, 'rb')
@@ -775,11 +779,15 @@
start += thislen
return df

- def pre_allocate(self, size, columns, categories, index):
+ def pre_allocate(self, size, columns, categories, index, dtypes=None):
+     if dtypes is not None:
+         columns = list(dtypes)
+     else:
+         dtypes = self._dtypes(categories)
categories = self.check_categories(categories)
cats = {k: v for k, v in self.cats.items() if k in columns}
df, arrs = _pre_allocate(size, columns, categories, index, cats,
- self._dtypes(categories), self.tz)
+ dtypes, self.tz)
i_no_name = re.compile(r"__index_level_\d+__")
if self.has_pandas_metadata:
md = self.pandas_metadata
@@ -913,7 +921,7 @@ def categories(self):
def _dtypes(self, categories=None):
""" Implied types of the columns in the schema """
import pandas as pd
- if not hasattr(self, "_base_dtype"):
+ if self._base_dtype is None:
if self.has_pandas_metadata:
md = self.pandas_metadata['columns']
md = {c['name']: c for c in md}
@@ -971,17 +979,20 @@ def _dtypes(self, categories=None):
return dtype

def __getstate__(self):
+ if self.fmd.row_groups is None:
+     self.fmd.row_groups = []
return {"fn": self.fn, "open": self.open, "fmd": self.fmd,
-        "pandas_nulls": self.pandas_nulls}
+        "pandas_nulls": self.pandas_nulls, "_base_dtype": self._base_dtype,
+        "tz": self.tz}

def __setstate__(self, state):
self.__dict__.update(state)
# Decode 'file_path'.
- rgs = self.fmd[4]
+ rgs = self.fmd[4] or []
# 4th condition should not be necessary, depends on 'deepcopy' version.
# https://github.com/dask/fastparquet/pull/731#issuecomment-1013507287
- if (rgs[0][1] and rgs[0][1][0] and rgs[0][1][0].get(1)
-         and isinstance(rgs[0][1][0].get(1), bytes)):
+ if (rgs and rgs[0][1] and rgs[0][1][0] and rgs[0][1][0].get(1)
+         and isinstance(rgs[0][1][0].get(1), bytes)):
# for rg in fmd.row_groups:
for rg in rgs:
# chunk = rg.columns[0]
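The __getstate__/__setstate__ changes make _base_dtype and tz survive pickling and tolerate a footer with no row groups (previously rgs[0] would raise IndexError on an empty list). A minimal round-trip sketch; the path is hypothetical:

    # Sketch: a ParquetFile round-trips through pickle, e.g. for dask workers.
    import pickle
    from fastparquet import ParquetFile

    pf = ParquetFile("example.parquet")
    pf2 = pickle.loads(pickle.dumps(pf))
    assert pf2.pandas_nulls == pf.pandas_nulls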
6 changes: 5 additions & 1 deletion fastparquet/core.py
@@ -571,6 +571,7 @@ def read_row_group_arrays(file, rg, columns, categories, schema_helper, cats,
are arrays.
"""
out = assign
+ remains = set(_ for _ in out if not _.endswith("-catdef") and not _ + "-catdef" in out)
maps = {}

for column in rg.columns:
@@ -580,8 +581,9 @@
name = ".".join(column.meta_data.path_in_schema[:-2])
else:
name = ".".join(column.meta_data.path_in_schema)
- if name not in columns:
+ if name not in columns or name in cats:
      continue
+ remains.discard(name)

read_col(column, schema_helper, file, use_cat=name+'-catdef' in out,
selfmade=selfmade, assign=out[name],
@@ -600,6 +602,8 @@
out[name][:] = [dict(zip(k, v)) if k is not None else None
for k, v in zip(key, value)]
del maps[name]
+ for k in remains:
+     out[k][:] = None


def read_row_group(file, rg, columns, categories, schema_helper, cats,
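
The remains bookkeeping above implements a simple schema-evolution rule: any requested column that no chunk in the row group supplies is filled with None rather than left as uninitialised memory. A self-contained restatement of that final loop (variable names assumed):

    # Restatement of the None-fill for columns absent from a row group.
    import numpy as np

    out = {"a": np.empty(3, dtype="float64"), "b": np.empty(3, dtype=object)}
    seen = {"a"}                  # columns actually found in this row group
    for k in set(out) - seen:     # the 'remains' set
        out[k][:] = None
    print(out["b"])               # [None None None]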