Use cramjam and py3 speed for string decode #580

Merged
6 commits merged on Apr 19, 2021
4 changes: 2 additions & 2 deletions .github/workflows/main.yaml
@@ -13,10 +13,10 @@ jobs:
strategy:
fail-fast: false
matrix:
CONDA_ENV: [py37, py38, py37z]
CONDA_ENV: [py37, py38, py39]
steps:
- name: APT
run: sudo apt-get install liblzo2-dev libsnappy-dev
run: sudo apt-get install liblzo2-dev

- name: Checkout
uses: actions/checkout@v2
1 change: 1 addition & 0 deletions ci/environment-py37.yml
@@ -20,3 +20,4 @@ dependencies:
- numpy
- packaging
- python-snappy
- cramjam
1 change: 1 addition & 0 deletions ci/environment-py38.yml
@@ -20,3 +20,4 @@ dependencies:
- numpy
- packaging
- python-snappy
- cramjam
5 changes: 3 additions & 2 deletions ci/environment-py37z.yml → ci/environment-py39.yml
@@ -3,13 +3,13 @@ channels:
- conda-forge
- defaults
dependencies:
- python=3.7
- python=3.9
- brotli
- bson
- lz4
- lzo
- snappy
- zstd
- zstandard
- pytest
- numba
- cython
@@ -20,3 +20,4 @@ dependencies:
- numpy
- packaging
- python-snappy
- cramjam
9 changes: 6 additions & 3 deletions docs/source/install.rst
@@ -10,12 +10,15 @@ Required:
- numpy
- pandas
- pytest
- cramjam

Optional (compression algorithms; gzip is always available):
`cramjam`_ provides compression codecs: gzip, snappy, lz4, brotli, zstd

.. _cramjam: https://github.com/milesgranger/pyrus-cramjam

Optional compression codec:

- python-snappy
- python-lzo
- brotli

Installation
------------
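
Since cramjam is now a required dependency, the codecs available at import time can be checked from the registries built in fastparquet/compression.py (changed below); a minimal sketch, assuming a standard install of this branch:

    from fastparquet import compression

    # GZIP, SNAPPY, BROTLI, LZ4, LZ4_RAW and ZSTD all come from cramjam;
    # LZO appears only if the optional python-lzo package is importable.
    print(sorted(compression.compressions))
    print(sorted(compression.decompressions))
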
8 changes: 4 additions & 4 deletions fastparquet/api.py
@@ -662,12 +662,12 @@ def filter_out_stats(rg, filters, schema):
s = column.meta_data.statistics
if s.max is not None:
b = ensure_bytes(s.max)
vmax = encoding.read_plain(b, column.meta_data.type, 1)
vmax = encoding.read_plain(b, column.meta_data.type, 1, stat=True)
if se.converted_type is not None:
vmax = converted_types.convert(vmax, se)
if s.min is not None:
b = ensure_bytes(s.min)
vmin = encoding.read_plain(b, column.meta_data.type, 1)
vmin = encoding.read_plain(b, column.meta_data.type, 1, stat=True)
if se.converted_type is not None:
vmin = converted_types.convert(vmin, se)
if filter_val(op, val, vmin, vmax):
@@ -708,7 +708,7 @@ def statistics(obj):
rv['max'] = ensure_bytes(s.max)
else:
rv['max'] = encoding.read_plain(ensure_bytes(s.max),
md.type, 1)[0]
md.type, 1, stat=True)[0]
except:
rv['max'] = None
if s.min is not None:
@@ -717,7 +717,7 @@
rv['min'] = ensure_bytes(s.min)
else:
rv['min'] = encoding.read_plain(ensure_bytes(s.min),
md.type, 1)[0]
md.type, 1, stat=True)[0]
except:
rv['min'] = None
if s.null_count is not None:
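
For context, these stat=True calls feed the row-group statistics that fastparquet exposes; a hedged usage sketch (the file name is hypothetical):

    from fastparquet import ParquetFile
    from fastparquet.api import statistics

    pf = ParquetFile("example.parquet")   # hypothetical path
    stats = statistics(pf)                # per-column min/max/null_count, per row group
    # String columns come back decoded because read_plain(..., stat=True)
    # now handles the single-value statistics case explicitly.
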
8 changes: 4 additions & 4 deletions fastparquet/benchmarks/columns.py
@@ -37,7 +37,7 @@ def time_column():
df = d[[col]]
write(fn, df)
with measure('%s: write, no nulls' % d.dtypes[col], result):
write(fn, df, has_nulls=False)
write(fn, df, has_nulls=False)#, compression="SNAPPY")

pf = ParquetFile(fn)
pf.to_pandas() # warm-up
@@ -46,7 +46,7 @@ def time_column():
pf.to_pandas()

with measure('%s: write, no nulls, has_null=True' % d.dtypes[col], result):
write(fn, df, has_nulls=True)
write(fn, df, has_nulls=True)#, compression="SNAPPY")

pf = ParquetFile(fn)
pf.to_pandas() # warm-up
Expand All @@ -63,7 +63,7 @@ def time_column():
else:
d.loc[n//2, col] = None
with measure('%s: write, with null, has_null=True' % d.dtypes[col], result):
write(fn, df, has_nulls=True)
write(fn, df, has_nulls=True)#, compression="SNAPPY")

pf = ParquetFile(fn)
pf.to_pandas() # warm-up
@@ -72,7 +72,7 @@ def time_column():
pf.to_pandas()

with measure('%s: write, with null, has_null=False' % d.dtypes[col], result):
write(fn, df, has_nulls=False)
write(fn, df, has_nulls=False)#, compression="SNAPPY")

pf = ParquetFile(fn)
pf.to_pandas() # warm-up
85 changes: 29 additions & 56 deletions fastparquet/compression.py
@@ -1,5 +1,5 @@

import gzip
import cramjam
from .thrift_structures import parquet_thrift

# TODO: use stream/direct-to-buffer conversions instead of memcopy
@@ -16,24 +16,23 @@


def gzip_compress_v3(data, compresslevel=COMPRESSION_LEVEL):
return gzip.compress(data, compresslevel=compresslevel)
return cramjam.gzip.compress(data, level=compresslevel)


def gzip_decompress(data, uncompressed_size):
return gzip.decompress(data)
return cramjam.gzip.decompress(data, output_len=uncompressed_size)


compressions['GZIP'] = gzip_compress_v3
decompressions['GZIP'] = gzip_decompress

try:
import snappy
def snappy_decompress(data, uncompressed_size):
return snappy.decompress(data)
compressions['SNAPPY'] = snappy.compress
decompressions['SNAPPY'] = snappy_decompress
except ImportError:
pass

def snappy_decompress(data, uncompressed_size):
return cramjam.snappy.decompress_raw(data)


compressions['SNAPPY'] = cramjam.snappy.compress_raw
decompressions['SNAPPY'] = snappy_decompress
try:
import lzo
def lzo_decompress(data, uncompressed_size):
@@ -42,51 +41,24 @@ def lzo_decompress(data, uncompressed_size):
decompressions['LZO'] = lzo_decompress
except ImportError:
pass
try:
import brotli
def brotli_decompress(data, uncompressed_size):
return brotli.decompress(data)
compressions['BROTLI'] = brotli.compress
decompressions['BROTLI'] = brotli_decompress
except ImportError:
pass
try:
import lz4.block
def lz4_compress(data, **kwargs):
kwargs['store_size'] = False
return lz4.block.compress(data, **kwargs)
def lz4_decompress(data, uncompressed_size):
return lz4.block.decompress(data, uncompressed_size=uncompressed_size)
compressions['LZ4'] = lz4_compress
decompressions['LZ4'] = lz4_decompress
except ImportError:
pass
try:
import zstandard
def zstd_compress(data, **kwargs):
kwargs['write_content_size'] = False
cctx = zstandard.ZstdCompressor(**kwargs)
return cctx.compress(data)
def zstd_decompress(data, uncompressed_size):
dctx = zstandard.ZstdDecompressor()
return dctx.decompress(data, max_output_size=uncompressed_size)
compressions['ZSTD'] = zstd_compress
decompressions['ZSTD'] = zstd_decompress
except ImportError:
pass
if 'ZSTD' not in compressions:
try:
import zstd
def zstd_compress(data, level=None):
if level is not None:
return zstd.compress(data, level)
return zstd.compress(data)
def zstd_decompress(data, _uncompressed_size=None):
return zstd.decompress(data)
compressions['ZSTD'] = zstd_compress
decompressions['ZSTD'] = zstd_decompress
except ImportError:
pass
compressions['BROTLI'] = cramjam.brotli.compress
decompressions['BROTLI'] = cramjam.brotli.decompress


def lz4_compress(data, **kwargs):
kwargs['store_size'] = False
return cramjam.lz4.compress_block(data, **kwargs)


compressions['LZ4'] = lz4_compress
decompressions['LZ4'] = cramjam.lz4.decompress_block

# LZ4 is actually LZ4 block, aka "raw", see
# https://github.com/apache/parquet-format/commit/7f06e838cbd1b7dbd722ff2580b9c2525e37fc46
compressions['LZ4_RAW'] = lz4_compress
decompressions['LZ4_RAW'] = cramjam.lz4.decompress_block
compressions['ZSTD'] = cramjam.zstd.compress
decompressions['ZSTD'] = cramjam.zstd.decompress

compressions = {k.upper(): v for k, v in compressions.items()}
decompressions = {k.upper(): v for k, v in decompressions.items()}
@@ -119,6 +91,7 @@ def compress_data(data, compression='gzip'):
raise ValueError("args dict entry is not a dict")
return compressions[algorithm.upper()](data, **args)


def decompress_data(data, uncompressed_size, algorithm='gzip'):
if isinstance(algorithm, int):
algorithm = rev_map[algorithm]
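
A minimal round-trip sketch of the cramjam calls this module now relies on (assuming a recent cramjam; its functions return buffer-like objects, so bytes() is applied before comparing):

    import cramjam

    data = b"hello parquet" * 100

    gz = bytes(cramjam.gzip.compress(data, level=6))
    assert bytes(cramjam.gzip.decompress(gz, output_len=len(data))) == data

    # Parquet's SNAPPY and LZ4 codecs are the raw/block variants, hence the
    # *_raw and *_block functions rather than the framed ones.
    sn = bytes(cramjam.snappy.compress_raw(data))
    assert bytes(cramjam.snappy.decompress_raw(sn)) == data

    lz = bytes(cramjam.lz4.compress_block(data, store_size=False))
    # uncompressed size passed the same way decompress_data does above
    assert bytes(cramjam.lz4.decompress_block(lz, len(data))) == data
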
8 changes: 4 additions & 4 deletions fastparquet/converted_types.py
@@ -15,7 +15,6 @@
import sys

from .thrift_structures import parquet_thrift
from .speedups import array_decode_utf8

logger = logging.getLogger('parquet') # pylint: disable=invalid-name

@@ -93,9 +92,10 @@ def convert(data, se, timestamp96=True):
if ctype is None:
return data
if ctype == parquet_thrift.ConvertedType.UTF8:
if isinstance(data, list) or data.dtype != "O":
data = np.asarray(data, dtype="O")
return array_decode_utf8(data)
if data.dtype != "O":
# stats pairs
return np.array([o.decode() for o in data])
return np.array(data) # was already converted in speedups
if ctype == parquet_thrift.ConvertedType.DECIMAL:
scale_factor = 10**-se.scale
if data.dtype.kind in ['i', 'f']:
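
The UTF8 branch now distinguishes bytes values with a non-object dtype (as produced for min/max statistics, decoded here) from page data that the utf=True path in the speedups module has already turned into str; a small illustration with made-up values:

    import numpy as np

    stats_pair = np.array([b"alpha", b"omega"])           # dtype "S5", from min/max statistics
    page_data = np.array(["alpha", "omega"], dtype="O")   # already str, decoded earlier

    decoded_stats = np.array([o.decode() for o in stats_pair])  # first branch above
    passthrough = np.array(page_data)                            # second branch above
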
20 changes: 11 additions & 9 deletions fastparquet/core.py
@@ -110,18 +110,20 @@ def read_data_page(f, helper, header, metadata, skip_nulls=False,
definition_levels, num_nulls = read_def(io_obj, daph, helper, metadata)

nval = daph.num_values - num_nulls
se = helper.schema_element(metadata.path_in_schema)
if daph.encoding == parquet_thrift.Encoding.PLAIN:

width = helper.schema_element(metadata.path_in_schema).type_length
values = encoding.read_plain(raw_bytes[io_obj.loc:],
values = encoding.read_plain(bytearray(raw_bytes)[io_obj.loc:],
metadata.type,
int(daph.num_values - num_nulls),
width=width)
width=width,
utf=se.converted_type == 0)
elif daph.encoding in [parquet_thrift.Encoding.PLAIN_DICTIONARY,
parquet_thrift.Encoding.RLE]:
# bit_width is stored as single byte.
if daph.encoding == parquet_thrift.Encoding.RLE:
bit_width = helper.schema_element(
metadata.path_in_schema).type_length
bit_width = se.type_length
else:
bit_width = io_obj.read_byte()
if bit_width in [8, 16, 32] and selfmade:
@@ -149,15 +151,15 @@ def skip_definition_bytes(io_obj, num):
n //= 128


def read_dictionary_page(file_obj, schema_helper, page_header, column_metadata):
def read_dictionary_page(file_obj, schema_helper, page_header, column_metadata, utf=False):
"""Read a page containing dictionary data.

Consumes data using the plain encoding and returns an array of values.
"""
raw_bytes = _read_page(file_obj, page_header, column_metadata)
if column_metadata.type == parquet_thrift.Type.BYTE_ARRAY:
values = np.array(unpack_byte_array(raw_bytes,
page_header.dictionary_page_header.num_values), dtype='object')
values = np.array(unpack_byte_array(bytearray(raw_bytes),
page_header.dictionary_page_header.num_values, utf=utf), dtype='object')
else:
width = schema_helper.schema_element(
column_metadata.path_in_schema).type_length
@@ -196,7 +198,7 @@ def read_col(column, schema_helper, infile, use_cat=False,

dic = None
if ph.type == parquet_thrift.PageType.DICTIONARY_PAGE:
dic = read_dictionary_page(infile, schema_helper, ph, cmd)
dic = read_dictionary_page(infile, schema_helper, ph, cmd, utf=se.converted_type == 0)
ph = read_thrift(infile, parquet_thrift.PageHeader)
dic = convert(dic, se)
if grab_dict:
@@ -229,7 +231,7 @@
row_idx = 0
while True:
if ph.type == parquet_thrift.PageType.DICTIONARY_PAGE:
dic2 = np.array(read_dictionary_page(infile, schema_helper, ph, cmd))
dic2 = np.array(read_dictionary_page(infile, schema_helper, ph, cmd, utf=se.converted_type == 0))
dic2 = convert(dic2, se)
if use_cat and (dic2 != dic).any():
raise RuntimeError("Attempt to read as categorical a column"
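
The repeated `utf=se.converted_type == 0` comparison relies on UTF8 being value 0 in the Thrift ConvertedType enum; spelled out explicitly, as a sketch:

    from fastparquet.thrift_structures import parquet_thrift

    def is_utf8(schema_element):
        # Same test as the inline ``se.converted_type == 0`` used above.
        return schema_element.converted_type == parquet_thrift.ConvertedType.UTF8
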
18 changes: 8 additions & 10 deletions fastparquet/encoding.py
@@ -35,7 +35,7 @@ def read_plain_boolean(raw_bytes, count):
}


def read_plain(raw_bytes, type_, count, width=0):
def read_plain(raw_bytes, type_, count, width=0, utf=False, stat=False):
if type_ in DECODE_TYPEMAP:
dtype = DECODE_TYPEMAP[type_]
return np.frombuffer(memoryview(raw_bytes), dtype=dtype, count=count)
@@ -46,15 +46,13 @@ def read_plain(raw_bytes, type_, count, width=0):
return np.frombuffer(memoryview(raw_bytes), dtype=dtype, count=count)
if type_ == parquet_thrift.Type.BOOLEAN:
return read_plain_boolean(raw_bytes, count)
# variable byte arrays (rare)
try:
return np.array(unpack_byte_array(raw_bytes, count), dtype='O')
except RuntimeError:
if count == 1:
# e.g., for statistics
return np.array([raw_bytes], dtype='O')
else:
raise
if type_ == parquet_thrift.Type.BYTE_ARRAY:
if stat:
if utf:
return np.array([bytes(raw_bytes).decode()], dtype='O')
else:
return np.array([bytes(raw_bytes)], dtype='O')
return np.array(unpack_byte_array(raw_bytes, count, utf=utf))


@numba.jit(nogil=True)
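
A hedged sketch of how the new stat/utf flags behave for BYTE_ARRAY input, assuming this branch of fastparquet is importable:

    from fastparquet import encoding
    from fastparquet.thrift_structures import parquet_thrift

    raw = b"some-utf8-statistic"

    # A single statistics value is returned whole, decoded when utf=True.
    as_stat = encoding.read_plain(raw, parquet_thrift.Type.BYTE_ARRAY, 1, stat=True, utf=True)
    # -> array(['some-utf8-statistic'], dtype=object)

    # Regular page data instead goes through unpack_byte_array, which now
    # decodes to str when called with utf=True.
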
1 change: 1 addition & 0 deletions fastparquet/parquet.thrift
@@ -366,6 +366,7 @@ enum CompressionCodec {
BROTLI = 4;
LZ4 = 5;
ZSTD = 6;
LZ4_RAW = 7;
}

enum PageType {
3 changes: 3 additions & 0 deletions fastparquet/parquet_thrift/parquet/ttypes.py

(Generated file; diff not rendered by default.)