Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate ORC Writer to pylibcudf #17310

Merged
merged 25 commits into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
4e2f60c
[WIP] Migrate ORC Writer to pylibcudf
Matt711 Nov 13, 2024
3648886
Merge branch 'branch-24.12' into pylibcudf-io-orc
Matt711 Nov 15, 2024
763b870
add orc chuncked writer
Matt711 Nov 18, 2024
120ec07
check style
Matt711 Nov 18, 2024
db110c2
Merge branch 'branch-24.12' into pylibcudf-io-orc
Matt711 Nov 19, 2024
9f20820
add a test
Matt711 Nov 19, 2024
b904747
clean up, address review
Matt711 Nov 19, 2024
0819020
Merge branch 'branch-24.12' into pylibcudf-io-orc
Matt711 Nov 19, 2024
5fc0eec
use a pointer instead
Matt711 Nov 19, 2024
866fdc2
add doc strings
Matt711 Nov 20, 2024
734bbcd
Merge branch 'branch-24.12' into pylibcudf-io-orc
Matt711 Nov 20, 2024
d094bcf
Merge branch 'pylibcudf-io-orc' of github.com:Matt711/cudf into pylib…
Matt711 Nov 20, 2024
50e5e16
skip test if pandas version < 2.2.3
Matt711 Nov 20, 2024
52b1c77
address review
Matt711 Nov 20, 2024
fd4c6cd
address review
Matt711 Nov 21, 2024
960b7d4
merge conflict and add gc test
Matt711 Nov 21, 2024
826f8c8
merge conflict
Matt711 Nov 25, 2024
3bd27a7
clean up
Matt711 Nov 25, 2024
31853a8
try a different approach in gc test
Matt711 Nov 25, 2024
b2a154a
enable diable gc in test
Matt711 Nov 25, 2024
8ae013f
address review
Matt711 Nov 26, 2024
1cf3078
Merge branch 'branch-25.02' into pylibcudf-io-orc
Matt711 Nov 26, 2024
b51c926
Update python/pylibcudf/pylibcudf/tests/io/test_types.py
Matt711 Nov 26, 2024
49953e5
Merge branch 'branch-25.02' into pylibcudf-io-orc
Matt711 Nov 26, 2024
5b12ef5
address review
Matt711 Nov 26, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 83 additions & 84 deletions python/cudf/cudf/_lib/orc.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@
from libc.stdint cimport int64_t
from libcpp cimport bool, int
from libcpp.map cimport map
from libcpp.memory cimport unique_ptr
from libcpp.string cimport string
from libcpp.utility cimport move
from libcpp.vector cimport vector

import itertools
from collections import OrderedDict

try:
Expand All @@ -16,31 +14,19 @@ except ImportError:
import json

cimport pylibcudf.libcudf.lists.lists_column_view as cpp_lists_column_view
from pylibcudf.libcudf.io.data_sink cimport data_sink
from pylibcudf.libcudf.io.orc cimport (
chunked_orc_writer_options,
orc_chunked_writer,
orc_writer_options,
write_orc as libcudf_write_orc,
)
from pylibcudf.libcudf.io.types cimport (
column_in_metadata,
sink_info,
table_input_metadata,
)
from pylibcudf.libcudf.table.table_view cimport table_view

from cudf._lib.column cimport Column
from cudf._lib.io.utils cimport make_sink_info, update_col_struct_field_names
from cudf._lib.utils cimport data_from_pylibcudf_io, table_view_from_table
from cudf._lib.io.utils cimport update_col_struct_field_names
from cudf._lib.utils cimport data_from_pylibcudf_io

import pylibcudf as plc

import cudf
from cudf._lib.types import SUPPORTED_NUMPY_TO_PYLIBCUDF_TYPES
from cudf._lib.utils import _index_level_name, generate_pandas_metadata
from cudf.core.buffer import acquire_spill_lock

from pylibcudf.io.types cimport TableInputMetadata, SinkInfo, ColumnInMetadata
from pylibcudf.io.orc cimport OrcChunkedWriter

# TODO: Consider inlining this function since it seems to only be used in one place.
cpdef read_parsed_orc_statistics(filepath_or_buffer):
Expand Down Expand Up @@ -246,61 +232,58 @@ def write_orc(
--------
cudf.read_orc
"""
cdef unique_ptr[data_sink] data_sink_c
cdef sink_info sink_info_c = make_sink_info(path_or_buf, data_sink_c)
cdef table_input_metadata tbl_meta
cdef map[string, string] user_data
user_data[str.encode("pandas")] = str.encode(generate_pandas_metadata(
table, index)
)

user_data = {}
user_data["pandas"] = generate_pandas_metadata(table, index)
if index is True or (
index is None and not isinstance(table._index, cudf.RangeIndex)
):
tv = table_view_from_table(table)
tbl_meta = table_input_metadata(tv)
columns = table._columns if table._index is None else [
*table.index._columns, *table._columns
]
plc_table = plc.Table([col.to_pylibcudf(mode="read") for col in columns])
tbl_meta = TableInputMetadata(plc_table)
for level, idx_name in enumerate(table._index.names):
tbl_meta.column_metadata[level].set_name(
str.encode(
_index_level_name(idx_name, level, table._column_names)
)
_index_level_name(idx_name, level, table._column_names)
)
num_index_cols_meta = len(table._index.names)
else:
tv = table_view_from_table(table, ignore_index=True)
tbl_meta = table_input_metadata(tv)
plc_table = plc.Table(
[col.to_pylibcudf(mode="read") for col in table._columns]
)
tbl_meta = TableInputMetadata(plc_table)
num_index_cols_meta = 0

if cols_as_map_type is not None:
cols_as_map_type = set(cols_as_map_type)

for i, name in enumerate(table._column_names, num_index_cols_meta):
tbl_meta.column_metadata[i].set_name(name.encode())
tbl_meta.column_metadata[i].set_name(name)
_set_col_children_metadata(
table[name]._column,
tbl_meta.column_metadata[i],
(cols_as_map_type is not None)
and (name in cols_as_map_type),
)

cdef orc_writer_options c_orc_writer_options = move(
orc_writer_options.builder(
sink_info_c, tv
).metadata(tbl_meta)
.key_value_metadata(move(user_data))
options = (
plc.io.orc.OrcWriterOptions.builder(
plc.io.SinkInfo([path_or_buf]), plc_table
)
.metadata(tbl_meta)
.key_value_metadata(user_data)
.compression(_get_comp_type(compression))
.enable_statistics(_get_orc_stat_freq(statistics))
.build()
)
if stripe_size_bytes is not None:
c_orc_writer_options.set_stripe_size_bytes(stripe_size_bytes)
options.set_stripe_size_bytes(stripe_size_bytes)
if stripe_size_rows is not None:
c_orc_writer_options.set_stripe_size_rows(stripe_size_rows)
options.set_stripe_size_rows(stripe_size_rows)
if row_index_stride is not None:
c_orc_writer_options.set_row_index_stride(row_index_stride)
options.set_row_index_stride(row_index_stride)

with nogil:
libcudf_write_orc(c_orc_writer_options)
plc.io.orc.write_orc(options)


cdef int64_t get_skiprows_arg(object arg) except*:
Expand All @@ -326,13 +309,12 @@ cdef class ORCWriter:
cudf.io.orc.to_orc
"""
cdef bool initialized
cdef unique_ptr[orc_chunked_writer] writer
cdef sink_info sink
cdef unique_ptr[data_sink] _data_sink
cdef OrcChunkedWriter writer
cdef SinkInfo sink
cdef str statistics
cdef object compression
cdef object index
cdef table_input_metadata tbl_meta
cdef TableInputMetadata tbl_meta
cdef object cols_as_map_type
cdef object stripe_size_bytes
cdef object stripe_size_rows
Expand All @@ -347,8 +329,7 @@ cdef class ORCWriter:
object stripe_size_bytes=None,
object stripe_size_rows=None,
object row_index_stride=None):

self.sink = make_sink_info(path, self._data_sink)
self.sink = plc.io.SinkInfo([path])
self.statistics = statistics
self.compression = compression
self.index = index
Expand All @@ -368,17 +349,21 @@ cdef class ORCWriter:
table._index.name is not None or
isinstance(table._index, cudf.core.multiindex.MultiIndex)
)
tv = table_view_from_table(table, not keep_index)
if keep_index:
columns = [
col.to_pylibcudf(mode="read")
for col in itertools.chain(table.index._columns, table._columns)
]
else:
columns = [col.to_pylibcudf(mode="read") for col in table._columns]

with nogil:
self.writer.get()[0].write(tv)
self.writer.write(plc.Table(columns))

def close(self):
if not self.initialized:
return

with nogil:
self.writer.get()[0].close()
self.writer.close()

def __dealloc__(self):
self.close()
Expand All @@ -387,71 +372,85 @@ cdef class ORCWriter:
"""
Prepare all the values required to build the
chunked_orc_writer_options anb creates a writer"""
cdef table_view tv

num_index_cols_meta = 0
self.tbl_meta = table_input_metadata(
table_view_from_table(table, ignore_index=True),
plc_table = plc.Table(
[
col.to_pylibcudf(mode="read")
for col in table._columns
]
)
self.tbl_meta = TableInputMetadata(plc_table)
if self.index is not False:
if isinstance(table._index, cudf.core.multiindex.MultiIndex):
tv = table_view_from_table(table)
self.tbl_meta = table_input_metadata(tv)
plc_table = plc.Table(
[
col.to_pylibcudf(mode="read")
for col in itertools.chain(table.index._columns, table._columns)
]
)
self.tbl_meta = TableInputMetadata(plc_table)
for level, idx_name in enumerate(table._index.names):
self.tbl_meta.column_metadata[level].set_name(
(str.encode(idx_name))
idx_name
)
num_index_cols_meta = len(table._index.names)
else:
if table._index.name is not None:
tv = table_view_from_table(table)
self.tbl_meta = table_input_metadata(tv)
plc_table = plc.Table(
[
col.to_pylibcudf(mode="read")
for col in itertools.chain(
table.index._columns, table._columns
)
]
)
self.tbl_meta = TableInputMetadata(plc_table)
self.tbl_meta.column_metadata[0].set_name(
str.encode(table._index.name)
table._index.name
)
num_index_cols_meta = 1

for i, name in enumerate(table._column_names, num_index_cols_meta):
self.tbl_meta.column_metadata[i].set_name(name.encode())
self.tbl_meta.column_metadata[i].set_name(name)
_set_col_children_metadata(
table[name]._column,
self.tbl_meta.column_metadata[i],
(self.cols_as_map_type is not None)
and (name in self.cols_as_map_type),
)

cdef map[string, string] user_data
user_data = {}
pandas_metadata = generate_pandas_metadata(table, self.index)
user_data[str.encode("pandas")] = str.encode(pandas_metadata)

cdef chunked_orc_writer_options c_opts = move(
chunked_orc_writer_options.builder(self.sink)
.metadata(self.tbl_meta)
.key_value_metadata(move(user_data))
.compression(_get_comp_type(self.compression))
.enable_statistics(_get_orc_stat_freq(self.statistics))
.build()
)
user_data["pandas"] = pandas_metadata

options = (
plc.io.orc.ChunkedOrcWriterOptions.builder(self.sink)
.metadata(self.tbl_meta)
.key_value_metadata(user_data)
.compression(_get_comp_type(self.compression))
.enable_statistics(_get_orc_stat_freq(self.statistics))
.build()
)
if self.stripe_size_bytes is not None:
c_opts.set_stripe_size_bytes(self.stripe_size_bytes)
options.set_stripe_size_bytes(self.stripe_size_bytes)
if self.stripe_size_rows is not None:
c_opts.set_stripe_size_rows(self.stripe_size_rows)
options.set_stripe_size_rows(self.stripe_size_rows)
if self.row_index_stride is not None:
c_opts.set_row_index_stride(self.row_index_stride)
options.set_row_index_stride(self.row_index_stride)

with nogil:
self.writer.reset(new orc_chunked_writer(c_opts))
self.writer = plc.io.orc.OrcChunkedWriter.from_options(options)

self.initialized = True

cdef _set_col_children_metadata(Column col,
column_in_metadata& col_meta,
ColumnInMetadata col_meta,
list_column_as_map=False):
if isinstance(col.dtype, cudf.StructDtype):
for i, (child_col, name) in enumerate(
zip(col.children, list(col.dtype.fields))
):
col_meta.child(i).set_name(name.encode())
col_meta.child(i).set_name(name)
_set_col_children_metadata(
child_col, col_meta.child(i), list_column_as_map
)
Expand Down
65 changes: 63 additions & 2 deletions python/pylibcudf/pylibcudf/io/orc.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,33 @@ from libcpp cimport bool
from libcpp.optional cimport optional
from libcpp.string cimport string
from libcpp.vector cimport vector
from pylibcudf.io.types cimport SourceInfo, TableWithMetadata
from libcpp.memory cimport unique_ptr
from libcpp.map cimport map
from pylibcudf.io.types cimport (
SourceInfo,
SinkInfo,
TableWithMetadata,
TableInputMetadata,
)
from pylibcudf.libcudf.io.orc_metadata cimport (
column_statistics,
parsed_orc_statistics,
statistics_type,
)
from pylibcudf.libcudf.io.orc cimport (
orc_chunked_writer,
orc_writer_options,
orc_writer_options_builder,
chunked_orc_writer_options,
chunked_orc_writer_options_builder,
)
from pylibcudf.libcudf.types cimport size_type
from pylibcudf.types cimport DataType

from pylibcudf.table cimport Table
from pylibcudf.libcudf.io.types cimport (
compression_type,
statistics_freq,
)

cpdef TableWithMetadata read_orc(
SourceInfo source_info,
Expand Down Expand Up @@ -48,3 +66,46 @@ cdef class ParsedOrcStatistics:
cpdef ParsedOrcStatistics read_parsed_orc_statistics(
SourceInfo source_info
)

cdef class OrcWriterOptions:
cdef orc_writer_options c_obj
cdef Table table
cdef SinkInfo sink
cpdef void set_stripe_size_bytes(self, size_t size_bytes)
cpdef void set_stripe_size_rows(self, size_type size_rows)
cpdef void set_row_index_stride(self, size_type stride)

cdef class OrcWriterOptionsBuilder:
cdef orc_writer_options_builder c_obj
cdef Table table
cdef SinkInfo sink
cpdef OrcWriterOptionsBuilder compression(self, compression_type comp)
cpdef OrcWriterOptionsBuilder enable_statistics(self, statistics_freq val)
cpdef OrcWriterOptionsBuilder key_value_metadata(self, dict kvm)
cpdef OrcWriterOptionsBuilder metadata(self, TableInputMetadata meta)
cpdef OrcWriterOptions build(self)

cpdef void write_orc(OrcWriterOptions options)

cdef class OrcChunkedWriter:
cdef unique_ptr[orc_chunked_writer] c_obj
cpdef void close(self)
cpdef void write(self, Table table)

cdef class ChunkedOrcWriterOptions:
cdef chunked_orc_writer_options c_obj
cdef SinkInfo sink
cpdef void set_stripe_size_bytes(self, size_t size_bytes)
cpdef void set_stripe_size_rows(self, size_type size_rows)
cpdef void set_row_index_stride(self, size_type stride)

cdef class ChunkedOrcWriterOptionsBuilder:
cdef chunked_orc_writer_options_builder c_obj
cdef SinkInfo sink
cpdef ChunkedOrcWriterOptionsBuilder compression(self, compression_type comp)
cpdef ChunkedOrcWriterOptionsBuilder enable_statistics(self, statistics_freq val)
cpdef ChunkedOrcWriterOptionsBuilder key_value_metadata(
self, dict kvm
)
cpdef ChunkedOrcWriterOptionsBuilder metadata(self, TableInputMetadata meta)
cpdef ChunkedOrcWriterOptions build(self)
Loading
Loading