Skip to content

Commit

Permalink
ARROW-305: Add compression and use_dictionary options to Parquet inte…
Browse files Browse the repository at this point in the history
…rface

Change-Id: If9092030768265e7fb437dc9972461f96b438b72
  • Loading branch information
xhochy committed Sep 27, 2016
1 parent 45d8832 commit 93d653b
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 1 deletion.
12 changes: 12 additions & 0 deletions python/pyarrow/includes/parquet.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ cdef extern from "parquet/api/schema.h" namespace "parquet" nogil:
PARQUET_1_0" parquet::ParquetVersion::PARQUET_1_0"
PARQUET_2_0" parquet::ParquetVersion::PARQUET_2_0"

enum Compression" parquet::Compression::type":
UNCOMPRESSED" parquet::Compression::UNCOMPRESSED"
SNAPPY" parquet::Compression::SNAPPY"
GZIP" parquet::Compression::GZIP"
LZO" parquet::Compression::LZO"
BROTLI" parquet::Compression::BROTLI"

cdef cppclass SchemaDescriptor:
shared_ptr[Node] schema()
GroupNode* group()
Expand Down Expand Up @@ -90,6 +97,11 @@ cdef extern from "parquet/api/writer.h" namespace "parquet" nogil:
cdef cppclass WriterProperties:
cppclass Builder:
Builder* version(ParquetVersion version)
Builder* compression(Compression codec)
Builder* compression(const c_string& path, Compression codec)
Builder* disable_dictionary()
Builder* enable_dictionary()
Builder* enable_dictionary(const c_string& path)
shared_ptr[WriterProperties] build()


Expand Down
49 changes: 48 additions & 1 deletion python/pyarrow/parquet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ def read_table(source, columns=None):
return reader.read_all()


def write_table(table, filename, chunk_size=None, version=None):
def write_table(table, filename, chunk_size=None, version=None,
use_dictionary=True, compression=None):
"""
Write a Table to Parquet format
Expand All @@ -102,6 +103,11 @@ def write_table(table, filename, chunk_size=None, version=None):
The maximum number of rows in each Parquet RowGroup
version : {"1.0", "2.0"}, default "1.0"
The Parquet format version, defaults to 1.0
use_dictionary : bool or list
Specify if we should use dictionary encoding in general or only for
some columns.
compression : str or dict
Specify the compression codec, either on a general basis or per-column.
"""
cdef Table table_ = table
cdef CTable* ctable_ = table_.table
Expand All @@ -121,6 +127,47 @@ def write_table(table, filename, chunk_size=None, version=None):
else:
raise ArrowException("Unsupported Parquet format version")

if isinstance(use_dictionary, bool):
if use_dictionary:
properties_builder.enable_dictionary()
else:
properties_builder.disable_dictionary()
else:
# Deactivate dictionary encoding by default
properties_builder.disable_dictionary()
for column in use_dictionary:
properties_builder.enable_dictionary(column)

if isinstance(compression, basestring):
if compression == "NONE":
properties_builder.compression(UNCOMPRESSED)
elif compression == "SNAPPY":
properties_builder.compression(SNAPPY)
elif compression == "GZIP":
properties_builder.compression(GZIP)
elif compression == "LZO":
properties_builder.compression(LZO)
elif compression == "BROTLI":
properties_builder.compression(BROTLI)
else:
raise ArrowException("Unsupport compression codec")
elif compression is not None:
# Deactivate dictionary encoding by default
properties_builder.disable_dictionary()
for column, codec in compression.iteritems():
if codec == "NONE":
properties_builder.compression(column, UNCOMPRESSED)
elif codec == "SNAPPY":
properties_builder.compression(column, SNAPPY)
elif codec == "GZIP":
properties_builder.compression(column, GZIP)
elif codec == "LZO":
properties_builder.compression(column, LZO)
elif codec == "BROTLI":
properties_builder.compression(column, BROTLI)
else:
raise ArrowException("Unsupport compression codec")

sink.reset(new LocalFileOutputStream(tobytes(filename)))
with nogil:
check_cstatus(WriteFlatTable(ctable_, default_memory_pool(), sink,
Expand Down
40 changes: 40 additions & 0 deletions python/pyarrow/tests/test_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,3 +110,43 @@ def test_pandas_parquet_1_0_rountrip(tmpdir):
df['uint32'] = df['uint32'].values.astype(np.int64)

pdt.assert_frame_equal(df, df_read)

@parquet
def test_pandas_parquet_configuration_options(tmpdir):
size = 10000
np.random.seed(0)
df = pd.DataFrame({
'uint8': np.arange(size, dtype=np.uint8),
'uint16': np.arange(size, dtype=np.uint16),
'uint32': np.arange(size, dtype=np.uint32),
'uint64': np.arange(size, dtype=np.uint64),
'int8': np.arange(size, dtype=np.int16),
'int16': np.arange(size, dtype=np.int16),
'int32': np.arange(size, dtype=np.int32),
'int64': np.arange(size, dtype=np.int64),
'float32': np.arange(size, dtype=np.float32),
'float64': np.arange(size, dtype=np.float64),
'bool': np.random.randn(size) > 0
})
filename = tmpdir.join('pandas_rountrip.parquet')
arrow_table = A.from_pandas_dataframe(df)

for use_dictionary in [True, False]:
A.parquet.write_table(
arrow_table,
filename.strpath,
version="2.0",
use_dictionary=use_dictionary)
table_read = pq.read_table(filename.strpath)
df_read = table_read.to_pandas()
pdt.assert_frame_equal(df, df_read)

for compression in ['NONE', 'SNAPPY', 'GZIP']:
A.parquet.write_table(
arrow_table,
filename.strpath,
version="2.0",
compression=compression)
table_read = pq.read_table(filename.strpath)
df_read = table_read.to_pandas()
pdt.assert_frame_equal(df, df_read)

0 comments on commit 93d653b

Please sign in to comment.