ARROW-389: Python: Write Parquet files to pyarrow.io.NativeFile objects
Change-Id: I34e3826164a29b85de6ada6c4692a6a2bbb3aa76
xhochy committed Dec 1, 2016
1 parent 3b946b8 commit 541b2eb
Showing 3 changed files with 44 additions and 8 deletions.
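
In short, write_table() can now target any writable pyarrow.io.NativeFile as well as a plain file path. A minimal usage sketch of what the commit enables, modeled on the round-trip test added below (the DataFrame contents are illustrative and the module aliases follow the test file):

    import pandas as pd
    import pyarrow as A
    import pyarrow.io as paio
    import pyarrow.parquet as pq

    # Build an Arrow table from a small, illustrative pandas DataFrame.
    table = A.from_pandas_dataframe(pd.DataFrame({'x': [1, 2, 3]}))

    # Write the Parquet data into an in-memory NativeFile instead of a path on disk.
    sink = paio.InMemoryOutputStream()
    pq.write_table(table, sink, version="2.0")

    # Read the table back from the resulting buffer.
    reader = paio.BufferReader(sink.get_result())
    df_read = pq.read_table(reader).to_pandas()
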
7 changes: 5 additions & 2 deletions python/pyarrow/includes/parquet.pxd
@@ -19,7 +19,7 @@

from pyarrow.includes.common cimport *
from pyarrow.includes.libarrow cimport CArray, CSchema, CStatus, CTable, MemoryPool
-from pyarrow.includes.libarrow_io cimport ReadableFileInterface
+from pyarrow.includes.libarrow_io cimport ReadableFileInterface, OutputStream


cdef extern from "parquet/api/schema.h" namespace "parquet::schema" nogil:
@@ -131,6 +131,9 @@ cdef extern from "parquet/arrow/io.h" namespace "parquet::arrow" nogil:
        ParquetReadSource(ParquetAllocator* allocator)
        Open(const shared_ptr[ReadableFileInterface]& file)

+    cdef cppclass ParquetWriteSink:
+        ParquetWriteSink(const shared_ptr[OutputStream]& file)


cdef extern from "parquet/arrow/reader.h" namespace "parquet::arrow" nogil:
CStatus OpenFile(const shared_ptr[ReadableFileInterface]& file,
@@ -154,6 +157,6 @@ cdef extern from "parquet/arrow/schema.h" namespace "parquet::arrow" nogil:
cdef extern from "parquet/arrow/writer.h" namespace "parquet::arrow" nogil:
cdef CStatus WriteFlatTable(
const CTable* table, MemoryPool* pool,
const shared_ptr[ParquetOutputStream]& sink,
const shared_ptr[ParquetWriteSink]& sink,
int64_t chunk_size,
const shared_ptr[WriterProperties]& properties)
18 changes: 12 additions & 6 deletions python/pyarrow/parquet.pyx
@@ -21,7 +21,7 @@

from pyarrow.includes.libarrow cimport *
from pyarrow.includes.parquet cimport *
-from pyarrow.includes.libarrow_io cimport ReadableFileInterface
+from pyarrow.includes.libarrow_io cimport ReadableFileInterface, OutputStream, FileOutputStream
cimport pyarrow.includes.pyarrow as pyarrow

from pyarrow.array cimport Array
@@ -151,15 +151,15 @@ def read_table(source, columns=None):
    return Table.from_arrays(columns, arrays)


-def write_table(table, filename, chunk_size=None, version=None,
+def write_table(table, sink, chunk_size=None, version=None,
                use_dictionary=True, compression=None):
    """
    Write a Table to Parquet format
    Parameters
    ----------
    table : pyarrow.Table
-    filename : string
+    sink: string or pyarrow.io.NativeFile
    chunk_size : int
        The maximum number of rows in each Parquet RowGroup. As a default,
        we will write a single RowGroup per file.
@@ -173,7 +173,8 @@ def write_table(table, filename, chunk_size=None, version=None,
"""
cdef Table table_ = table
cdef CTable* ctable_ = table_.table
cdef shared_ptr[ParquetOutputStream] sink
cdef shared_ptr[ParquetWriteSink] sink_
cdef shared_ptr[FileOutputStream] filesink_
cdef WriterProperties.Builder properties_builder
cdef int64_t chunk_size_ = 0
if chunk_size is None:
@@ -230,7 +231,12 @@ def write_table(table, filename, chunk_size=None, version=None,
    else:
        raise ArrowException("Unsupport compression codec")

-    sink.reset(new LocalFileOutputStream(tobytes(filename)))
+    if isinstance(sink, six.string_types):
+        check_status(FileOutputStream.Open(tobytes(sink), &filesink_))
+        sink_.reset(new ParquetWriteSink(<shared_ptr[OutputStream]>filesink_))
+    elif isinstance(sink, NativeFile):
+        sink_.reset(new ParquetWriteSink((<NativeFile>sink).wr_file))

    with nogil:
-        check_status(WriteFlatTable(ctable_, default_memory_pool(), sink,
+        check_status(WriteFlatTable(ctable_, default_memory_pool(), sink_,
                                    chunk_size_, properties_builder.build()))
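
With this dispatch, a string sink is still opened internally as a FileOutputStream, while any writable pyarrow.io.NativeFile is wrapped in a ParquetWriteSink directly. A compact sketch of the two call forms (the on-disk path is hypothetical; module aliases follow the test file):

    import pandas as pd
    import pyarrow as A
    import pyarrow.io as paio
    import pyarrow.parquet as pq

    table = A.from_pandas_dataframe(pd.DataFrame({'x': [1, 2, 3]}))

    # Sink given as a string: opened internally via FileOutputStream.
    pq.write_table(table, '/tmp/example.parquet')  # hypothetical local path

    # Sink given as a NativeFile: used directly through ParquetWriteSink.
    pq.write_table(table, paio.InMemoryOutputStream())
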
27 changes: 27 additions & 0 deletions python/pyarrow/tests/test_parquet.py
@@ -18,6 +18,7 @@
import pytest

import pyarrow as A
+import pyarrow.io as paio

import numpy as np
import pandas as pd
@@ -131,6 +132,32 @@ def test_pandas_column_selection(tmpdir):

pdt.assert_frame_equal(df[['uint8']], df_read)

+@parquet
+def test_pandas_parquet_native_file_roundtrip(tmpdir):
+    size = 10000
+    np.random.seed(0)
+    df = pd.DataFrame({
+        'uint8': np.arange(size, dtype=np.uint8),
+        'uint16': np.arange(size, dtype=np.uint16),
+        'uint32': np.arange(size, dtype=np.uint32),
+        'uint64': np.arange(size, dtype=np.uint64),
+        'int8': np.arange(size, dtype=np.int16),
+        'int16': np.arange(size, dtype=np.int16),
+        'int32': np.arange(size, dtype=np.int32),
+        'int64': np.arange(size, dtype=np.int64),
+        'float32': np.arange(size, dtype=np.float32),
+        'float64': np.arange(size, dtype=np.float64),
+        'bool': np.random.randn(size) > 0
+    })
+    arrow_table = A.from_pandas_dataframe(df)
+    imos = paio.InMemoryOutputStream()
+    pq.write_table(arrow_table, imos, version="2.0")
+    buf = imos.get_result()
+    reader = paio.BufferReader(buf)
+    df_read = pq.read_table(reader).to_pandas()
+    pdt.assert_frame_equal(df, df_read)


@parquet
def test_pandas_parquet_configuration_options(tmpdir):
size = 10000
