Commit

Use cramjam and py3 speed for string decode (#580)
* Use cramjam and py3 speed for string decode

* update env

* include the code

* Add cramjam LZ4 (block) codec

* Omitted ttype attribute

* Remove .c file

This used to be the done thing ... before wheels! Now it's just bloat.
martindurant authored Apr 19, 2021
1 parent 39a9d7c commit 2e65722
Showing 21 changed files with 109 additions and 8,647 deletions.
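A minimal sketch (not part of the commit) of the cramjam round-trips this change standardises on; the calls mirror the new fastparquet/compression.py below, and the sample data is made up:

import cramjam

data = b"some page bytes" * 100

# gzip: same role as the stdlib gzip module, but with level= and output_len=
gz = bytes(cramjam.gzip.compress(data, level=6))
assert bytes(cramjam.gzip.decompress(gz, output_len=len(data))) == data

# snappy: parquet pages use the raw (block) framing, hence the *_raw variants
sn = bytes(cramjam.snappy.compress_raw(data))
assert bytes(cramjam.snappy.decompress_raw(sn)) == data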
4 changes: 2 additions & 2 deletions .github/workflows/main.yaml
@@ -13,10 +13,10 @@ jobs:
strategy:
fail-fast: false
matrix:
CONDA_ENV: [py37, py38, py37z]
CONDA_ENV: [py37, py38, py39]
steps:
- name: APT
run: sudo apt-get install liblzo2-dev libsnappy-dev
run: sudo apt-get install liblzo2-dev

- name: Checkout
uses: actions/checkout@v2
1 change: 1 addition & 0 deletions ci/environment-py37.yml
@@ -20,3 +20,4 @@ dependencies:
- numpy
- packaging
- python-snappy
- cramjam
1 change: 1 addition & 0 deletions ci/environment-py38.yml
@@ -20,3 +20,4 @@ dependencies:
- numpy
- packaging
- python-snappy
- cramjam
5 changes: 3 additions & 2 deletions ci/environment-py37z.yml → ci/environment-py39.yml
@@ -3,13 +3,13 @@ channels:
- conda-forge
- defaults
dependencies:
- python=3.7
- python=3.9
- brotli
- bson
- lz4
- lzo
- snappy
- zstd
- zstandard
- pytest
- numba
- cython
@@ -20,3 +20,4 @@ dependencies:
- numpy
- packaging
- python-snappy
- cramjam
9 changes: 6 additions & 3 deletions docs/source/install.rst
@@ -10,12 +10,15 @@ Required:
- numpy
- pandas
- pytest
- cramjam

Optional (compression algorithms; gzip is always available):
`cramjam`_ provides compression codecs: gzip, snappy, lz4, brotli, zstd

.. _cramjam: https://github.com/milesgranger/pyrus-cramjam

Optional compression codec:

- python-snappy
- python-lzo
- brotli

Installation
------------
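A quick sketch (assuming the module layout in this commit) for checking which codecs end up registered after import; everything cramjam supplies is always present, while LZO still needs the optional python-lzo package:

from fastparquet import compression

print(sorted(compression.compressions))     # e.g. ['BROTLI', 'GZIP', 'LZ4', 'LZ4_RAW', 'SNAPPY', 'ZSTD', ...]
print(sorted(compression.decompressions))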
8 changes: 4 additions & 4 deletions fastparquet/api.py
@@ -662,12 +662,12 @@ def filter_out_stats(rg, filters, schema):
s = column.meta_data.statistics
if s.max is not None:
b = ensure_bytes(s.max)
vmax = encoding.read_plain(b, column.meta_data.type, 1)
vmax = encoding.read_plain(b, column.meta_data.type, 1, stat=True)
if se.converted_type is not None:
vmax = converted_types.convert(vmax, se)
if s.min is not None:
b = ensure_bytes(s.min)
vmin = encoding.read_plain(b, column.meta_data.type, 1)
vmin = encoding.read_plain(b, column.meta_data.type, 1, stat=True)
if se.converted_type is not None:
vmin = converted_types.convert(vmin, se)
if filter_val(op, val, vmin, vmax):
@@ -708,7 +708,7 @@ def statistics(obj):
rv['max'] = ensure_bytes(s.max)
else:
rv['max'] = encoding.read_plain(ensure_bytes(s.max),
md.type, 1)[0]
md.type, 1, stat=True)[0]
except:
rv['max'] = None
if s.min is not None:
@@ -717,7 +717,7 @@ def statistics(obj):
rv['min'] = ensure_bytes(s.min)
else:
rv['min'] = encoding.read_plain(ensure_bytes(s.min),
md.type, 1)[0]
md.type, 1, stat=True)[0]
except:
rv['min'] = None
if s.null_count is not None:
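The stat=True flag added above exists because a column-chunk statistic stores a BYTE_ARRAY value bare, while a PLAIN data page length-prefixes each value; a small illustration (plain Python, value made up), not fastparquet code:

import struct

value = b"hello"
in_data_page = struct.pack("<i", len(value)) + value   # 4-byte little-endian length, then the bytes
in_statistics = value                                   # just the bytes, hence the separate stat= path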
8 changes: 4 additions & 4 deletions fastparquet/benchmarks/columns.py
@@ -37,7 +37,7 @@ def time_column():
df = d[[col]]
write(fn, df)
with measure('%s: write, no nulls' % d.dtypes[col], result):
write(fn, df, has_nulls=False)
write(fn, df, has_nulls=False)#, compression="SNAPPY")

pf = ParquetFile(fn)
pf.to_pandas() # warm-up
@@ -46,7 +46,7 @@ def time_column():
pf.to_pandas()

with measure('%s: write, no nulls, has_null=True' % d.dtypes[col], result):
write(fn, df, has_nulls=True)
write(fn, df, has_nulls=True)#, compression="SNAPPY")

pf = ParquetFile(fn)
pf.to_pandas() # warm-up
@@ -63,7 +63,7 @@ def time_column():
else:
d.loc[n//2, col] = None
with measure('%s: write, with null, has_null=True' % d.dtypes[col], result):
write(fn, df, has_nulls=True)
write(fn, df, has_nulls=True)#, compression="SNAPPY")

pf = ParquetFile(fn)
pf.to_pandas() # warm-up
@@ -72,7 +72,7 @@ def time_column():
pf.to_pandas()

with measure('%s: write, with null, has_null=False' % d.dtypes[col], result):
write(fn, df, has_nulls=False)
write(fn, df, has_nulls=False)#, compression="SNAPPY")

pf = ParquetFile(fn)
pf.to_pandas() # warm-up
85 changes: 29 additions & 56 deletions fastparquet/compression.py
@@ -1,5 +1,5 @@

import gzip
import cramjam
from .thrift_structures import parquet_thrift

# TODO: use stream/direct-to-buffer conversions instead of memcopy
@@ -16,24 +16,23 @@


def gzip_compress_v3(data, compresslevel=COMPRESSION_LEVEL):
return gzip.compress(data, compresslevel=compresslevel)
return cramjam.gzip.compress(data, level=compresslevel)


def gzip_decompress(data, uncompressed_size):
return gzip.decompress(data)
return cramjam.gzip.decompress(data, output_len=uncompressed_size)


compressions['GZIP'] = gzip_compress_v3
decompressions['GZIP'] = gzip_decompress

try:
import snappy
def snappy_decompress(data, uncompressed_size):
return snappy.decompress(data)
compressions['SNAPPY'] = snappy.compress
decompressions['SNAPPY'] = snappy_decompress
except ImportError:
pass

def snappy_decompress(data, uncompressed_size):
return cramjam.snappy.decompress_raw(data)


compressions['SNAPPY'] = cramjam.snappy.compress_raw
decompressions['SNAPPY'] = snappy_decompress
try:
import lzo
def lzo_decompress(data, uncompressed_size):
@@ -42,51 +41,24 @@ def lzo_decompress(data, uncompressed_size):
decompressions['LZO'] = lzo_decompress
except ImportError:
pass
try:
import brotli
def brotli_decompress(data, uncompressed_size):
return brotli.decompress(data)
compressions['BROTLI'] = brotli.compress
decompressions['BROTLI'] = brotli_decompress
except ImportError:
pass
try:
import lz4.block
def lz4_compress(data, **kwargs):
kwargs['store_size'] = False
return lz4.block.compress(data, **kwargs)
def lz4_decompress(data, uncompressed_size):
return lz4.block.decompress(data, uncompressed_size=uncompressed_size)
compressions['LZ4'] = lz4_compress
decompressions['LZ4'] = lz4_decompress
except ImportError:
pass
try:
import zstandard
def zstd_compress(data, **kwargs):
kwargs['write_content_size'] = False
cctx = zstandard.ZstdCompressor(**kwargs)
return cctx.compress(data)
def zstd_decompress(data, uncompressed_size):
dctx = zstandard.ZstdDecompressor()
return dctx.decompress(data, max_output_size=uncompressed_size)
compressions['ZSTD'] = zstd_compress
decompressions['ZSTD'] = zstd_decompress
except ImportError:
pass
if 'ZSTD' not in compressions:
try:
import zstd
def zstd_compress(data, level=None):
if level is not None:
return zstd.compress(data, level)
return zstd.compress(data)
def zstd_decompress(data, _uncompressed_size=None):
return zstd.decompress(data)
compressions['ZSTD'] = zstd_compress
decompressions['ZSTD'] = zstd_decompress
except ImportError:
pass
compressions['BROTLI'] = cramjam.brotli.compress
decompressions['BROTLI'] = cramjam.brotli.decompress


def lz4_compress(data, **kwargs):
kwargs['store_size'] = False
return cramjam.lz4.compress_block(data, **kwargs)


compressions['LZ4'] = lz4_compress
decompressions['LZ4'] = cramjam.lz4.decompress_block

# LZ4 is actually LZ4 block, aka "raw", see
# https://github.com/apache/parquet-format/commit/7f06e838cbd1b7dbd722ff2580b9c2525e37fc46
compressions['LZ4_RAW'] = lz4_compress
decompressions['LZ4_RAW'] = cramjam.lz4.decompress_block
compressions['ZSTD'] = cramjam.zstd.compress
decompressions['ZSTD'] = cramjam.zstd.decompress

compressions = {k.upper(): v for k, v in compressions.items()}
decompressions = {k.upper(): v for k, v in decompressions.items()}
@@ -119,6 +91,7 @@ def compress_data(data, compression='gzip'):
raise ValueError("args dict entry is not a dict")
return compressions[algorithm.upper()](data, **args)


def decompress_data(data, uncompressed_size, algorithm='gzip'):
if isinstance(algorithm, int):
algorithm = rev_map[algorithm]
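A sketch of a round trip through the registry helpers defined in this file (codec names and sample data are arbitrary; the uncompressed size must be supplied on the way back because some codecs, LZ4 block in particular, do not record it):

from fastparquet.compression import compress_data, decompress_data

raw = b"abcd" * 1000
for codec in ("GZIP", "SNAPPY", "LZ4", "ZSTD"):
    packed = bytes(compress_data(raw, compression=codec))
    assert bytes(decompress_data(packed, len(raw), algorithm=codec)) == raw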
8 changes: 4 additions & 4 deletions fastparquet/converted_types.py
@@ -15,7 +15,6 @@
import sys

from .thrift_structures import parquet_thrift
from .speedups import array_decode_utf8

logger = logging.getLogger('parquet') # pylint: disable=invalid-name

@@ -93,9 +92,10 @@ def convert(data, se, timestamp96=True):
if ctype is None:
return data
if ctype == parquet_thrift.ConvertedType.UTF8:
if isinstance(data, list) or data.dtype != "O":
data = np.asarray(data, dtype="O")
return array_decode_utf8(data)
if data.dtype != "O":
# stats pairs
return np.array([o.decode() for o in data])
return np.array(data) # was already converted in speedups
if ctype == parquet_thrift.ConvertedType.DECIMAL:
scale_factor = 10**-se.scale
if data.dtype.kind in ['i', 'f']:
20 changes: 11 additions & 9 deletions fastparquet/core.py
@@ -110,18 +110,20 @@ def read_data_page(f, helper, header, metadata, skip_nulls=False,
definition_levels, num_nulls = read_def(io_obj, daph, helper, metadata)

nval = daph.num_values - num_nulls
se = helper.schema_element(metadata.path_in_schema)
if daph.encoding == parquet_thrift.Encoding.PLAIN:

width = helper.schema_element(metadata.path_in_schema).type_length
values = encoding.read_plain(raw_bytes[io_obj.loc:],
values = encoding.read_plain(bytearray(raw_bytes)[io_obj.loc:],
metadata.type,
int(daph.num_values - num_nulls),
width=width)
width=width,
utf=se.converted_type == 0)
elif daph.encoding in [parquet_thrift.Encoding.PLAIN_DICTIONARY,
parquet_thrift.Encoding.RLE]:
# bit_width is stored as single byte.
if daph.encoding == parquet_thrift.Encoding.RLE:
bit_width = helper.schema_element(
metadata.path_in_schema).type_length
bit_width = se.type_length
else:
bit_width = io_obj.read_byte()
if bit_width in [8, 16, 32] and selfmade:
@@ -149,15 +151,15 @@ def skip_definition_bytes(io_obj, num):
n //= 128


def read_dictionary_page(file_obj, schema_helper, page_header, column_metadata):
def read_dictionary_page(file_obj, schema_helper, page_header, column_metadata, utf=False):
"""Read a page containing dictionary data.
Consumes data using the plain encoding and returns an array of values.
"""
raw_bytes = _read_page(file_obj, page_header, column_metadata)
if column_metadata.type == parquet_thrift.Type.BYTE_ARRAY:
values = np.array(unpack_byte_array(raw_bytes,
page_header.dictionary_page_header.num_values), dtype='object')
values = np.array(unpack_byte_array(bytearray(raw_bytes),
page_header.dictionary_page_header.num_values, utf=utf), dtype='object')
else:
width = schema_helper.schema_element(
column_metadata.path_in_schema).type_length
@@ -196,7 +198,7 @@ def read_col(column, schema_helper, infile, use_cat=False,

dic = None
if ph.type == parquet_thrift.PageType.DICTIONARY_PAGE:
dic = read_dictionary_page(infile, schema_helper, ph, cmd)
dic = read_dictionary_page(infile, schema_helper, ph, cmd, utf=se.converted_type == 0)
ph = read_thrift(infile, parquet_thrift.PageHeader)
dic = convert(dic, se)
if grab_dict:
@@ -229,7 +231,7 @@ def read_col(column, schema_helper, infile, use_cat=False,
row_idx = 0
while True:
if ph.type == parquet_thrift.PageType.DICTIONARY_PAGE:
dic2 = np.array(read_dictionary_page(infile, schema_helper, ph, cmd))
dic2 = np.array(read_dictionary_page(infile, schema_helper, ph, cmd, utf=se.converted_type == 0))
dic2 = convert(dic2, se)
if use_cat and (dic2 != dic).any():
raise RuntimeError("Attempt to read as categorical a column"
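The utf=se.converted_type == 0 checks threaded through read_data_page and read_col above rely on UTF8 being value 0 of the Thrift ConvertedType enum; a one-line sanity check (a sketch, using the import path seen elsewhere in this diff):

from fastparquet.thrift_structures import parquet_thrift

assert parquet_thrift.ConvertedType.UTF8 == 0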
18 changes: 8 additions & 10 deletions fastparquet/encoding.py
@@ -35,7 +35,7 @@ def read_plain_boolean(raw_bytes, count):
}


def read_plain(raw_bytes, type_, count, width=0):
def read_plain(raw_bytes, type_, count, width=0, utf=False, stat=False):
if type_ in DECODE_TYPEMAP:
dtype = DECODE_TYPEMAP[type_]
return np.frombuffer(memoryview(raw_bytes), dtype=dtype, count=count)
@@ -46,15 +46,13 @@ def read_plain(raw_bytes, type_, count, width=0):
return np.frombuffer(memoryview(raw_bytes), dtype=dtype, count=count)
if type_ == parquet_thrift.Type.BOOLEAN:
return read_plain_boolean(raw_bytes, count)
# variable byte arrays (rare)
try:
return np.array(unpack_byte_array(raw_bytes, count), dtype='O')
except RuntimeError:
if count == 1:
# e.g., for statistics
return np.array([raw_bytes], dtype='O')
else:
raise
if type_ == parquet_thrift.Type.BYTE_ARRAY:
if stat:
if utf:
return np.array([bytes(raw_bytes).decode()], dtype='O')
else:
return np.array([bytes(raw_bytes)], dtype='O')
return np.array(unpack_byte_array(raw_bytes, count, utf=utf))


@numba.jit(nogil=True)
1 change: 1 addition & 0 deletions fastparquet/parquet.thrift
@@ -366,6 +366,7 @@ enum CompressionCodec {
BROTLI = 4;
LZ4 = 5;
ZSTD = 6;
LZ4_RAW = 7;
}

enum PageType {
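Both LZ4 entries are wired to the block ("raw") format, which carries no size header, so callers must pass the uncompressed length; a sketch of the round trip with the cramjam functions registered in compression.py (sample data made up):

import cramjam

raw = b"spam" * 1000
packed = bytes(cramjam.lz4.compress_block(raw, store_size=False))
assert bytes(cramjam.lz4.decompress_block(packed, len(raw))) == raw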
3 changes: 3 additions & 0 deletions fastparquet/parquet_thrift/parquet/ttypes.py

(Generated Thrift file; diff not rendered.)
