Add write_parquet to pylibcudf (#17263)
Broken off from #17252, since also replacing cudf Python's `write_parquet` usage there would have made that PR fairly large.

Authors:
  - Matthew Roeschke (https://github.com/mroeschke)
  - Lawrence Mitchell (https://github.com/wence-)
  - Matthew Murray (https://github.com/Matt711)

Approvers:
  - Lawrence Mitchell (https://github.com/wence-)
  - Matthew Murray (https://github.com/Matt711)
  - Bradley Dice (https://github.com/bdice)

URL: #17263
mroeschke authored Nov 22, 2024
1 parent 2827a03 commit 53e4525
Showing 10 changed files with 840 additions and 18 deletions.
13 changes: 13 additions & 0 deletions python/pylibcudf/pylibcudf/contiguous_split.pxd
@@ -1,12 +1,25 @@
# Copyright (c) 2024, NVIDIA CORPORATION.

from libc.stdint cimport uint8_t
from libcpp.memory cimport unique_ptr
from libcpp.vector cimport vector
from pylibcudf.libcudf.contiguous_split cimport packed_columns

from .gpumemoryview cimport gpumemoryview
from .table cimport Table


cdef class HostBuffer:
    cdef unique_ptr[vector[uint8_t]] c_obj
    cdef size_t nbytes
    cdef Py_ssize_t[1] shape
    cdef Py_ssize_t[1] strides

    @staticmethod
    cdef HostBuffer from_unique_ptr(
        unique_ptr[vector[uint8_t]] vec
    )

cdef class PackedColumns:
    cdef unique_ptr[packed_columns] c_obj

13 changes: 5 additions & 8 deletions python/pylibcudf/pylibcudf/contiguous_split.pyx
@@ -29,26 +29,23 @@ __all__ = [

cdef class HostBuffer:
"""Owning host buffer that implements the buffer protocol"""
cdef unique_ptr[vector[uint8_t]] c_obj
cdef size_t nbytes
cdef Py_ssize_t[1] shape
cdef Py_ssize_t[1] strides

@staticmethod
cdef HostBuffer from_unique_ptr(
unique_ptr[vector[uint8_t]] vec
):
cdef HostBuffer out = HostBuffer()
cdef HostBuffer out = HostBuffer.__new__(HostBuffer)
# Allow construction from nullptr
out.nbytes = 0 if vec.get() == NULL else dereference(vec).size()
out.c_obj = move(vec)
out.nbytes = dereference(out.c_obj).size()
out.shape[0] = out.nbytes
out.strides[0] = 1
return out

__hash__ = None

def __getbuffer__(self, Py_buffer *buffer, int flags):
buffer.buf = dereference(self.c_obj).data()
# Empty vec produces empty buffer
buffer.buf = NULL if self.nbytes == 0 else dereference(self.c_obj).data()
buffer.format = NULL # byte
buffer.internal = NULL
buffer.itemsize = 1
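Note (not part of the diff): with the empty-buffer handling above, a write that yields no metadata blob reaches Python as a zero-length memoryview rather than an invalid buffer. A minimal consumer-side sketch; save_metadata_blob is a hypothetical helper, and the options object is assumed to be built as in the sketch after the parquet.pyi stub below.

import pylibcudf as plc

def save_metadata_blob(options: plc.io.parquet.ParquetWriterOptions, path: str) -> None:
    # write_parquet (added below in io/parquet) returns a memoryview backed by
    # the HostBuffer defined above.
    blob = plc.io.parquet.write_parquet(options)
    if blob.nbytes == 0:
        # Empty HostBuffer: the writer produced no metadata blob to persist.
        return
    with open(path, "wb") as f:
        f.write(blob)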
62 changes: 60 additions & 2 deletions python/pylibcudf/pylibcudf/io/parquet.pxd
@@ -1,14 +1,26 @@
# Copyright (c) 2024, NVIDIA CORPORATION.

-from libc.stdint cimport int64_t
+from libc.stdint cimport int64_t, uint8_t
from libcpp cimport bool
from libcpp.memory cimport unique_ptr
from libcpp.vector cimport vector
from pylibcudf.expressions cimport Expression
-from pylibcudf.io.types cimport SourceInfo, TableWithMetadata
+from pylibcudf.io.types cimport (
+    compression_type,
+    dictionary_policy,
+    statistics_freq,
+    SinkInfo,
+    SourceInfo,
+    TableInputMetadata,
+    TableWithMetadata,
+)
from pylibcudf.libcudf.io.parquet cimport (
    chunked_parquet_reader as cpp_chunked_parquet_reader,
    parquet_writer_options,
    parquet_writer_options_builder,
)
from pylibcudf.libcudf.types cimport size_type
from pylibcudf.table cimport Table
from pylibcudf.types cimport DataType


@@ -33,3 +45,49 @@ cpdef read_parquet(
    # ReaderColumnSchema reader_column_schema = *,
    # DataType timestamp_type = *
)

cdef class ParquetWriterOptions:
    cdef parquet_writer_options c_obj
    cdef Table table_ref
    cdef SinkInfo sink_ref

    cpdef void set_partitions(self, list partitions)

    cpdef void set_column_chunks_file_paths(self, list file_paths)

    cpdef void set_row_group_size_bytes(self, size_t size_bytes)

    cpdef void set_row_group_size_rows(self, size_type size_rows)

    cpdef void set_max_page_size_bytes(self, size_t size_bytes)

    cpdef void set_max_page_size_rows(self, size_type size_rows)

    cpdef void set_max_dictionary_size(self, size_t size_bytes)

cdef class ParquetWriterOptionsBuilder:
    cdef parquet_writer_options_builder c_obj
    cdef Table table_ref
    cdef SinkInfo sink_ref

    cpdef ParquetWriterOptionsBuilder metadata(self, TableInputMetadata metadata)

    cpdef ParquetWriterOptionsBuilder key_value_metadata(self, list metadata)

    cpdef ParquetWriterOptionsBuilder compression(self, compression_type compression)

    cpdef ParquetWriterOptionsBuilder stats_level(self, statistics_freq sf)

    cpdef ParquetWriterOptionsBuilder int96_timestamps(self, bool enabled)

    cpdef ParquetWriterOptionsBuilder write_v2_headers(self, bool enabled)

    cpdef ParquetWriterOptionsBuilder dictionary_policy(self, dictionary_policy val)

    cpdef ParquetWriterOptionsBuilder utc_timestamps(self, bool enabled)

    cpdef ParquetWriterOptionsBuilder write_arrow_schema(self, bool enabled)

    cpdef ParquetWriterOptions build(self)

cpdef memoryview write_parquet(ParquetWriterOptions options)
46 changes: 45 additions & 1 deletion python/pylibcudf/pylibcudf/io/parquet.pyi
@@ -1,7 +1,20 @@
# Copyright (c) 2024, NVIDIA CORPORATION.

from collections.abc import Mapping
from typing import Self

from pylibcudf.expressions import Expression
-from pylibcudf.io.types import SourceInfo, TableWithMetadata
+from pylibcudf.io.types import (
+    CompressionType,
+    DictionaryPolicy,
+    PartitionInfo,
+    SinkInfo,
+    SourceInfo,
+    StatisticsFreq,
+    TableInputMetadata,
+    TableWithMetadata,
+)
from pylibcudf.table import Table

class ChunkedParquetReader:
    def __init__(
@@ -34,3 +47,34 @@ def read_parquet(
    # reader_column_schema: ReaderColumnSchema = *,
    # timestamp_type: DataType = *
) -> TableWithMetadata: ...

class ParquetWriterOptions:
    def __init__(self): ...
    @staticmethod
    def builder(
        sink: SinkInfo, table: Table
    ) -> ParquetWriterOptionsBuilder: ...
    def set_partitions(self, partitions: list[PartitionInfo]) -> None: ...
    def set_column_chunks_file_paths(self, file_paths: list[str]) -> None: ...
    def set_row_group_size_bytes(self, size_bytes: int) -> None: ...
    def set_row_group_size_rows(self, size_rows: int) -> None: ...
    def set_max_page_size_bytes(self, size_bytes: int) -> None: ...
    def set_max_page_size_rows(self, size_rows: int) -> None: ...
    def set_max_dictionary_size(self, size_bytes: int) -> None: ...

class ParquetWriterOptionsBuilder:
    def __init__(self): ...
    def metadata(self, metadata: TableInputMetadata) -> Self: ...
    def key_value_metadata(
        self, metadata: list[Mapping[str, str]]
    ) -> Self: ...
    def compression(self, compression: CompressionType) -> Self: ...
    def stats_level(self, sf: StatisticsFreq) -> Self: ...
    def int96_timestamps(self, enabled: bool) -> Self: ...
    def write_v2_headers(self, enabled: bool) -> Self: ...
    def dictionary_policy(self, val: DictionaryPolicy) -> Self: ...
    def utc_timestamps(self, enabled: bool) -> Self: ...
    def write_arrow_schema(self, enabled: bool) -> Self: ...
    def build(self) -> ParquetWriterOptions: ...

def write_parquet(options: ParquetWriterOptions) -> memoryview: ...
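Note (not part of the diff): a minimal end-to-end sketch of the writer API stubbed above. The SinkInfo construction, the plc.interop.from_arrow conversion, and the enum spellings are assumed from the existing pylibcudf.io.types interfaces and may differ in detail; only the builder chain and the write_parquet call come from this commit.

import pyarrow as pa
import pylibcudf as plc

# Build a small device table and point the writer at a local file sink.
table = plc.interop.from_arrow(pa.table({"a": [1, 2, 3]}))
sink = plc.io.types.SinkInfo(["out.parquet"])

# Chain the builder methods declared in the stub, then build the options.
options = (
    plc.io.parquet.ParquetWriterOptions.builder(sink, table)
    .compression(plc.io.types.CompressionType.SNAPPY)
    .stats_level(plc.io.types.StatisticsFreq.STATISTICS_ROWGROUP)
    .build()
)
# Post-build tuning through the cpdef setters declared in parquet.pxd.
options.set_row_group_size_rows(1_000_000)

# Write the table; the returned memoryview carries any metadata blob produced.
metadata_blob = plc.io.parquet.write_parquet(options)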