diff --git a/python/pyarrow/includes/parquet.pxd b/python/pyarrow/includes/parquet.pxd
index 57c35ba89445b..cb791e16f926d 100644
--- a/python/pyarrow/includes/parquet.pxd
+++ b/python/pyarrow/includes/parquet.pxd
@@ -19,7 +19,7 @@ from pyarrow.includes.common cimport *
 
 from pyarrow.includes.libarrow cimport CArray, CSchema, CStatus, CTable, MemoryPool
 
-from pyarrow.includes.libarrow_io cimport ReadableFileInterface
+from pyarrow.includes.libarrow_io cimport ReadableFileInterface, OutputStream
 
 
 cdef extern from "parquet/api/schema.h" namespace "parquet::schema" nogil:
@@ -131,6 +131,9 @@ cdef extern from "parquet/arrow/io.h" namespace "parquet::arrow" nogil:
         ParquetReadSource(ParquetAllocator* allocator)
         Open(const shared_ptr[ReadableFileInterface]& file)
 
+    cdef cppclass ParquetWriteSink:
+        ParquetWriteSink(const shared_ptr[OutputStream]& file)
+
 
 cdef extern from "parquet/arrow/reader.h" namespace "parquet::arrow" nogil:
     CStatus OpenFile(const shared_ptr[ReadableFileInterface]& file,
@@ -154,6 +157,6 @@ cdef extern from "parquet/arrow/schema.h" namespace "parquet::arrow" nogil:
 cdef extern from "parquet/arrow/writer.h" namespace "parquet::arrow" nogil:
     cdef CStatus WriteFlatTable(
         const CTable* table, MemoryPool* pool,
-        const shared_ptr[ParquetOutputStream]& sink,
+        const shared_ptr[ParquetWriteSink]& sink,
         int64_t chunk_size,
         const shared_ptr[WriterProperties]& properties)
diff --git a/python/pyarrow/parquet.pyx b/python/pyarrow/parquet.pyx
index a6e3ac30684b4..83fddb287a3f1 100644
--- a/python/pyarrow/parquet.pyx
+++ b/python/pyarrow/parquet.pyx
@@ -21,7 +21,7 @@ from pyarrow.includes.libarrow cimport *
 from pyarrow.includes.parquet cimport *
-from pyarrow.includes.libarrow_io cimport ReadableFileInterface
+from pyarrow.includes.libarrow_io cimport ReadableFileInterface, OutputStream, FileOutputStream
 
 cimport pyarrow.includes.pyarrow as pyarrow
 
 from pyarrow.array cimport Array
 
@@ -151,7 +151,7 @@ def read_table(source, columns=None):
     return Table.from_arrays(columns, arrays)
 
 
-def write_table(table, filename, chunk_size=None, version=None,
+def write_table(table, sink, chunk_size=None, version=None,
                 use_dictionary=True, compression=None):
     """
     Write a Table to Parquet format
@@ -159,7 +159,7 @@ def write_table(table, filename, chunk_size=None, version=None,
     Parameters
     ----------
     table : pyarrow.Table
-    filename : string
+    sink : string or pyarrow.io.NativeFile
     chunk_size : int
         The maximum number of rows in each Parquet RowGroup. As a default,
         we will write a single RowGroup per file.
@@ -173,7 +173,8 @@ def write_table(table, filename, chunk_size=None, version=None,
     """
     cdef Table table_ = table
    cdef CTable* ctable_ = table_.table
-    cdef shared_ptr[ParquetOutputStream] sink
+    cdef shared_ptr[ParquetWriteSink] sink_
+    cdef shared_ptr[FileOutputStream] filesink_
     cdef WriterProperties.Builder properties_builder
     cdef int64_t chunk_size_ = 0
     if chunk_size is None:
@@ -230,7 +231,12 @@ def write_table(table, filename, chunk_size=None, version=None,
         else:
             raise ArrowException("Unsupported compression codec")
 
-    sink.reset(new LocalFileOutputStream(tobytes(filename)))
+    if isinstance(sink, six.string_types):
+        check_status(FileOutputStream.Open(tobytes(sink), &filesink_))
+        sink_.reset(new ParquetWriteSink(<shared_ptr[OutputStream]> filesink_))
+    elif isinstance(sink, NativeFile):
+        sink_.reset(new ParquetWriteSink((<NativeFile> sink).wr_file))
+
     with nogil:
-        check_status(WriteFlatTable(ctable_, default_memory_pool(), sink,
+        check_status(WriteFlatTable(ctable_, default_memory_pool(), sink_,
                                     chunk_size_, properties_builder.build()))
diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py
index c1d44ce0d4230..841830f6fba3b 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -18,6 +18,7 @@
 import pytest
 
 import pyarrow as A
+import pyarrow.io as paio
 
 import numpy as np
 import pandas as pd
@@ -131,6 +132,32 @@ def test_pandas_column_selection(tmpdir):
     pdt.assert_frame_equal(df[['uint8']], df_read)
 
 
+@parquet
+def test_pandas_parquet_native_file_roundtrip(tmpdir):
+    size = 10000
+    np.random.seed(0)
+    df = pd.DataFrame({
+        'uint8': np.arange(size, dtype=np.uint8),
+        'uint16': np.arange(size, dtype=np.uint16),
+        'uint32': np.arange(size, dtype=np.uint32),
+        'uint64': np.arange(size, dtype=np.uint64),
+        'int8': np.arange(size, dtype=np.int8),
+        'int16': np.arange(size, dtype=np.int16),
+        'int32': np.arange(size, dtype=np.int32),
+        'int64': np.arange(size, dtype=np.int64),
+        'float32': np.arange(size, dtype=np.float32),
+        'float64': np.arange(size, dtype=np.float64),
+        'bool': np.random.randn(size) > 0
+    })
+    arrow_table = A.from_pandas_dataframe(df)
+    imos = paio.InMemoryOutputStream()
+    pq.write_table(arrow_table, imos, version="2.0")
+    buf = imos.get_result()
+    reader = paio.BufferReader(buf)
+    df_read = pq.read_table(reader).to_pandas()
+    pdt.assert_frame_equal(df, df_read)
+
+
 @parquet
 def test_pandas_parquet_configuration_options(tmpdir):
     size = 10000
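
With this change, `write_table` accepts either a file path or a writable `pyarrow.io.NativeFile`, so a Parquet file can be written to and read back from memory without touching disk. Below is a minimal sketch of both call styles, using only the APIs exercised in the diff above (`InMemoryOutputStream`, `get_result`, `BufferReader`, `from_pandas_dataframe`); the output path is an arbitrary example, and these names reflect the pyarrow module layout at the time of this patch:

```python
import numpy as np
import pandas as pd

import pyarrow as A
import pyarrow.io as paio
import pyarrow.parquet as pq

df = pd.DataFrame({'x': np.arange(100, dtype=np.int64)})
table = A.from_pandas_dataframe(df)

# Path sink: write_table opens a FileOutputStream internally.
pq.write_table(table, '/tmp/example.parquet', version='2.0')  # hypothetical path

# In-memory sink: any writable NativeFile works.
sink = paio.InMemoryOutputStream()
pq.write_table(table, sink, version='2.0')

# Read back from the in-memory buffer, no filesystem round trip.
buf = sink.get_result()
df_roundtrip = pq.read_table(paio.BufferReader(buf)).to_pandas()
assert df.equals(df_roundtrip)
```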