From e8d26159ce96f26643480189557685fe614f4eb5 Mon Sep 17 00:00:00 2001
From: Matthew Roeschke <10647082+mroeschke@users.noreply.github.com>
Date: Tue, 5 Nov 2024 19:33:43 -0800
Subject: [PATCH 1/2] Add write_parquet to pylibcudf

---
 python/cudf/cudf/_lib/parquet.pyx         | 121 ++----
 python/pylibcudf/pylibcudf/io/parquet.pxd |  65 +++-
 python/pylibcudf/pylibcudf/io/parquet.pyx | 368 +++++++++++++++++-
 python/pylibcudf/pylibcudf/io/types.pxd   |  35 ++
 python/pylibcudf/pylibcudf/io/types.pyx   | 204 ++++++++++
 .../pylibcudf/libcudf/io/parquet.pxd      |  22 +-
 6 files changed, 711 insertions(+), 104 deletions(-)

diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx
index 1212637d330..546fd0698cd 100644
--- a/python/cudf/cudf/_lib/parquet.pyx
+++ b/python/cudf/cudf/_lib/parquet.pyx
@@ -14,7 +14,6 @@ except ImportError:
 
 import numpy as np
 
-from cython.operator cimport dereference
 
 from cudf.api.types import is_list_like
 
@@ -38,8 +37,6 @@ from pylibcudf.libcudf.io.parquet cimport (
     chunked_parquet_writer_options,
     merge_row_group_metadata as parquet_merge_metadata,
     parquet_chunked_writer as cpp_parquet_chunked_writer,
-    parquet_writer_options,
-    write_parquet as parquet_writer,
 )
+from pylibcudf.io.parquet cimport BufferArrayFromVector
 from pylibcudf.libcudf.io.parquet_metadata cimport (
     parquet_metadata,
@@ -73,46 +70,6 @@ from pylibcudf cimport Table
 
 from cudf.utils.ioutils import _ROW_GROUP_SIZE_BYTES_DEFAULT
 
-cdef class BufferArrayFromVector:
-    cdef Py_ssize_t length
-    cdef unique_ptr[vector[uint8_t]] in_vec
-
-    # these two things declare part of the buffer interface
-    cdef Py_ssize_t shape[1]
-    cdef Py_ssize_t strides[1]
-
-    @staticmethod
-    cdef BufferArrayFromVector from_unique_ptr(
-        unique_ptr[vector[uint8_t]] in_vec
-    ):
-        cdef BufferArrayFromVector buf = BufferArrayFromVector()
-        buf.in_vec = move(in_vec)
-        buf.length = dereference(buf.in_vec).size()
-        return buf
-
-    def __getbuffer__(self, Py_buffer *buffer, int flags):
-        cdef Py_ssize_t itemsize = sizeof(uint8_t)
-
-        self.shape[0] = self.length
-        self.strides[0] = 1
-
-        buffer.buf = dereference(self.in_vec).data()
-
-        buffer.format = NULL  # byte
-        buffer.internal = NULL
-        buffer.itemsize = itemsize
-        buffer.len = self.length * itemsize  # product(shape) * itemsize
-        buffer.ndim = 1
-        buffer.obj = self
-        buffer.readonly = 0
-        buffer.shape = self.shape
-        buffer.strides = self.strides
-        buffer.suboffsets = NULL
-
-    def __releasebuffer__(self, Py_buffer *buffer):
-        pass
-
-
 def _parse_metadata(meta):
     file_is_range_index = False
     file_index_cols = None
@@ -442,7 +399,7 @@ def write_parquet(
     object compression="snappy",
     object statistics="ROWGROUP",
     object metadata_file_path=None,
-    object int96_timestamps=False,
+    bool int96_timestamps=False,
     object row_group_size_bytes=None,
     object row_group_size_rows=None,
     object max_page_size_bytes=None,
@@ -469,7 +426,6 @@ def write_parquet(
 
     # Create the write options
     cdef table_input_metadata tbl_meta
-    cdef vector[map[string, string]] user_data
 
     cdef table_view tv
     cdef vector[unique_ptr[data_sink]] _data_sinks
     cdef sink_info sink = make_sinks_info(
        filepaths_or_buffers, _data_sinks
@@ -515,20 +471,18 @@ def write_parquet(
         output_as_binary
     )
 
-    cdef map[string, string] tmp_user_data
     if partitions_info is not None:
+        user_data = []
         for start_row, num_row in partitions_info:
+            # TODO: Avoid shallow copy?
            partitioned_df = table.iloc[start_row: start_row + num_row].copy(
                deep=False
            )
            pandas_metadata = generate_pandas_metadata(partitioned_df, index)
-            tmp_user_data[str.encode("pandas")] = str.encode(pandas_metadata)
-            user_data.push_back(tmp_user_data)
-            tmp_user_data.clear()
+            user_data.append({"pandas": pandas_metadata})
     else:
         pandas_metadata = generate_pandas_metadata(table, index)
-        tmp_user_data[str.encode("pandas")] = str.encode(pandas_metadata)
-        user_data.push_back(tmp_user_data)
+        user_data = [{"pandas": pandas_metadata}]
 
     if header_version not in ("1.0", "2.0"):
         raise ValueError(
@@ -536,30 +490,22 @@ def write_parquet(
             "Valid values are '1.0' and '2.0'"
         )
 
-    dict_policy = (
-        plc.io.types.DictionaryPolicy.ADAPTIVE
-        if use_dictionary
-        else plc.io.types.DictionaryPolicy.NEVER
-    )
-
-    comp_type = _get_comp_type(compression)
-    stat_freq = _get_stat_freq(statistics)
-
-    cdef unique_ptr[vector[uint8_t]] out_metadata_c
-    cdef vector[string] c_column_chunks_file_paths
-    cdef bool _int96_timestamps = int96_timestamps
     cdef vector[partition_info] partitions
 
     # Perform write
-    cdef parquet_writer_options args = move(
-        parquet_writer_options.builder(sink, tv)
+    options = (
+        plc.io.parquet.ParquetWriterOptions.builder(sink, tv)
         .metadata(tbl_meta)
-        .key_value_metadata(move(user_data))
-        .compression(comp_type)
-        .stats_level(stat_freq)
-        .int96_timestamps(_int96_timestamps)
+        .key_value_metadata(user_data)
+        .compression(_get_comp_type(compression))
+        .stats_level(_get_stat_freq(statistics))
+        .int96_timestamps(int96_timestamps)
         .write_v2_headers(header_version == "2.0")
-        .dictionary_policy(dict_policy)
+        .dictionary_policy(
+            plc.io.types.DictionaryPolicy.ADAPTIVE
+            if use_dictionary
+            else plc.io.types.DictionaryPolicy.NEVER
+        )
         .utc_timestamps(False)
         .write_arrow_schema(write_arrow_schema)
         .build()
     )
     if partitions_info is not None:
         partitions.push_back(
             partition_info(part[0], part[1])
         )
-        args.set_partitions(move(partitions))
+        options.set_partitions(move(partitions))
     if metadata_file_path is not None:
-        if is_list_like(metadata_file_path):
-            for path in metadata_file_path:
-                c_column_chunks_file_paths.push_back(str.encode(path))
+        if isinstance(metadata_file_path, str):
+            options.set_column_chunks_file_paths([metadata_file_path])
         else:
-            c_column_chunks_file_paths.push_back(
-                str.encode(metadata_file_path)
-            )
-        args.set_column_chunks_file_paths(move(c_column_chunks_file_paths))
+            options.set_column_chunks_file_paths(list(metadata_file_path))
     if row_group_size_bytes is not None:
-        args.set_row_group_size_bytes(row_group_size_bytes)
+        options.set_row_group_size_bytes(row_group_size_bytes)
     if row_group_size_rows is not None:
-        args.set_row_group_size_rows(row_group_size_rows)
+        options.set_row_group_size_rows(row_group_size_rows)
     if max_page_size_bytes is not None:
-        args.set_max_page_size_bytes(max_page_size_bytes)
+        options.set_max_page_size_bytes(max_page_size_bytes)
     if max_page_size_rows is not None:
-        args.set_max_page_size_rows(max_page_size_rows)
+        options.set_max_page_size_rows(max_page_size_rows)
     if max_dictionary_size is not None:
-        args.set_max_dictionary_size(max_dictionary_size)
+        options.set_max_dictionary_size(max_dictionary_size)
 
-    with nogil:
-        out_metadata_c = move(parquet_writer(args))
+    out_metadata = plc.io.parquet.write_parquet(options)
 
     if metadata_file_path is not None:
-        out_metadata_py = BufferArrayFromVector.from_unique_ptr(
-            move(out_metadata_c)
-        )
-        return np.asarray(out_metadata_py)
+        # write_parquet already returns a BufferArrayFromVector (now
+        # cimported above), so no re-wrapping is needed here.
+        return np.asarray(out_metadata)
     else:
diff --git a/python/pylibcudf/pylibcudf/io/parquet.pxd b/python/pylibcudf/pylibcudf/io/parquet.pxd
index 9c476030ded..cbd10024224 100644
--- a/python/pylibcudf/pylibcudf/io/parquet.pxd
+++ b/python/pylibcudf/pylibcudf/io/parquet.pxd
@@ -1,14 +1,16 @@
 # Copyright (c) 2024, NVIDIA CORPORATION.
-from libc.stdint cimport int64_t
+from libc.stdint cimport int64_t, uint8_t
 from libcpp cimport bool
 from libcpp.memory cimport unique_ptr
+from libcpp.vector cimport vector
 from pylibcudf.expressions cimport Expression
-from pylibcudf.io.types cimport SourceInfo, TableWithMetadata
+from pylibcudf.io.types cimport (
+    CompressionType,
+    DictionaryPolicy,
+    SinkInfo,
+    SourceInfo,
+    StatisticsFreq,
+    TableInputMetadata,
+    TableWithMetadata,
+)
 from pylibcudf.libcudf.io.parquet cimport (
     chunked_parquet_reader as cpp_chunked_parquet_reader,
+    parquet_writer_options,
+    parquet_writer_options_builder,
 )
 from pylibcudf.libcudf.types cimport size_type
+from pylibcudf.table cimport Table
 from pylibcudf.types cimport DataType
 
 
@@ -33,3 +35,62 @@ cpdef read_parquet(
     # ReaderColumnSchema reader_column_schema = *,
     # DataType timestamp_type = *
 )
+
+cdef class ParquetWriterOptions:
+    cdef parquet_writer_options options
+
+    @staticmethod
+    cpdef ParquetWriterOptionsBuilder builder(SinkInfo sink, Table table)
+
+    cpdef void set_partitions(self, list partitions)
+
+    cpdef void set_column_chunks_file_paths(self, list file_paths)
+
+    cpdef void set_row_group_size_bytes(self, int size_bytes)
+
+    cpdef void set_row_group_size_rows(self, int size_rows)
+
+    cpdef void set_max_page_size_bytes(self, int size_bytes)
+
+    cpdef void set_max_page_size_rows(self, int size_rows)
+
+    cpdef void set_max_dictionary_size(self, int size_bytes)
+
+cdef class ParquetWriterOptionsBuilder:
+    cdef parquet_writer_options_builder builder
+
+    cpdef ParquetWriterOptionsBuilder metadata(self, TableInputMetadata metadata)
+
+    cpdef ParquetWriterOptionsBuilder key_value_metadata(self, list metadata)
+
+    cpdef ParquetWriterOptionsBuilder compression(self, CompressionType compression)
+
+    cpdef ParquetWriterOptionsBuilder stats_level(self, StatisticsFreq sf)
+
+    cpdef ParquetWriterOptionsBuilder int96_timestamps(self, bool enabled)
+
+    cpdef ParquetWriterOptionsBuilder write_v2_headers(self, bool enabled)
+
+    cpdef ParquetWriterOptionsBuilder dictionary_policy(self, DictionaryPolicy val)
+
+    cpdef ParquetWriterOptionsBuilder utc_timestamps(self, bool enabled)
+
+    cpdef ParquetWriterOptionsBuilder write_arrow_schema(self, bool enabled)
+
+    cpdef ParquetWriterOptions build(self)
+
+
+cdef class BufferArrayFromVector:
+    cdef Py_ssize_t length
+    cdef unique_ptr[vector[uint8_t]] in_vec
+
+    # these two things declare part of the buffer interface
+    cdef Py_ssize_t shape[1]
+    cdef Py_ssize_t strides[1]
+
+    @staticmethod
+    cdef BufferArrayFromVector from_unique_ptr(
+        unique_ptr[vector[uint8_t]] in_vec
+    )
+
+cpdef BufferArrayFromVector write_parquet(ParquetWriterOptions options)
diff --git a/python/pylibcudf/pylibcudf/io/parquet.pyx b/python/pylibcudf/pylibcudf/io/parquet.pyx
index 981ca7b8159..2baabb9cd7b 100644
--- a/python/pylibcudf/pylibcudf/io/parquet.pyx
+++ b/python/pylibcudf/pylibcudf/io/parquet.pyx
@@ -1,17 +1,26 @@
 # Copyright (c) 2024, NVIDIA CORPORATION.
 from cython.operator cimport dereference
-from libc.stdint cimport int64_t
+from libc.stdint cimport int64_t, uint8_t
 from libcpp cimport bool
 from libcpp.string cimport string
 from libcpp.utility cimport move
 from libcpp.vector cimport vector
 from pylibcudf.expressions cimport Expression
-from pylibcudf.io.types cimport SourceInfo, TableWithMetadata
+from pylibcudf.io.types cimport (
+    CompressionType,
+    DictionaryPolicy,
+    SinkInfo,
+    SourceInfo,
+    StatisticsFreq,
+    TableInputMetadata,
+    TableWithMetadata,
+)
 from pylibcudf.libcudf.expressions cimport expression
 from pylibcudf.libcudf.io.parquet cimport (
     chunked_parquet_reader as cpp_chunked_parquet_reader,
     parquet_reader_options,
     read_parquet as cpp_read_parquet,
+    write_parquet as cpp_write_parquet,
+    parquet_writer_options,
 )
 from pylibcudf.libcudf.io.types cimport table_with_metadata
 from pylibcudf.libcudf.types cimport size_type
@@ -217,3 +226,358 @@ cpdef read_parquet(
     c_result = move(cpp_read_parquet(opts))
 
     return TableWithMetadata.from_libcudf(c_result)
+
+
+cdef class ParquetWriterOptions:
+
+    @staticmethod
+    cpdef ParquetWriterOptionsBuilder builder(SinkInfo sink, Table table):
+        """
+        Create a builder for ParquetWriterOptions.
+
+        Parameters
+        ----------
+        sink : SinkInfo
+            The sink used for writer output
+
+        table : Table
+            Table to be written to output
+
+        Returns
+        -------
+        ParquetWriterOptionsBuilder
+        """
+        cdef ParquetWriterOptionsBuilder bldr = ParquetWriterOptionsBuilder.__new__(
+            ParquetWriterOptionsBuilder
+        )
+        bldr.builder = parquet_writer_options.builder(sink.c_obj, table.view())
+        return bldr
+
+    cpdef void set_partitions(self, list partitions):
+        """
+        Sets partitions.
+
+        Parameters
+        ----------
+        partitions : list[PartitionInfo]
+            Partitions of input table in {start_row, num_rows} pairs.
+
+        Returns
+        -------
+        None
+        """
+        self.options.set_partitions(partitions)
+
+    cpdef void set_column_chunks_file_paths(self, list file_paths):
+        """
+        Sets column chunks file paths to be set in the raw output metadata.
+
+        Parameters
+        ----------
+        file_paths : list[str]
+            List of strings which indicate the file paths.
+
+        Returns
+        -------
+        None
+        """
+        self.options.set_column_chunks_file_paths([fp.encode() for fp in file_paths])
+
+    cpdef void set_row_group_size_bytes(self, int size_bytes):
+        """
+        Sets the maximum row group size, in bytes.
+
+        Parameters
+        ----------
+        size_bytes : int
+            Maximum row group size, in bytes, to set
+
+        Returns
+        -------
+        None
+        """
+        self.options.set_row_group_size_bytes(size_bytes)
+
+    cpdef void set_row_group_size_rows(self, int size_rows):
+        """
+        Sets the maximum row group size, in rows.
+
+        Parameters
+        ----------
+        size_rows : int
+            Maximum row group size, in rows, to set
+
+        Returns
+        -------
+        None
+        """
+        self.options.set_row_group_size_rows(size_rows)
+
+    cpdef void set_max_page_size_bytes(self, int size_bytes):
+        """
+        Sets the maximum uncompressed page size, in bytes.
+
+        Parameters
+        ----------
+        size_bytes : int
+            Maximum uncompressed page size, in bytes, to set
+
+        Returns
+        -------
+        None
+        """
+        self.options.set_max_page_size_bytes(size_bytes)
+
+    cpdef void set_max_page_size_rows(self, int size_rows):
+        """
+        Sets the maximum page size, in rows.
+
+        Parameters
+        ----------
+        size_rows : int
+            Maximum page size, in rows, to set.
+
+        Returns
+        -------
+        None
+        """
+        self.options.set_max_page_size_rows(size_rows)
+
+    cpdef void set_max_dictionary_size(self, int size_bytes):
+        """
+        Sets the maximum dictionary size, in bytes.
+
+        Parameters
+        ----------
+        size_bytes : int
+            Maximum dictionary size, in bytes, to set.
+
+        Returns
+        -------
+        None
+        """
+        self.options.set_max_dictionary_size(size_bytes)
+
+
+cdef class ParquetWriterOptionsBuilder:
+
+    cpdef ParquetWriterOptionsBuilder metadata(self, TableInputMetadata metadata):
+        """
+        Sets metadata.
+
+        Parameters
+        ----------
+        metadata : TableInputMetadata
+            Associated metadata
+
+        Returns
+        -------
+        Self
+        """
+        self.builder.metadata(metadata.metadata)
+        return self
+
+    cpdef ParquetWriterOptionsBuilder key_value_metadata(self, list metadata):
+        """
+        Sets Key-Value footer metadata.
+
+        Parameters
+        ----------
+        metadata : list[dict[str, str]]
+            Key-Value footer metadata
+
+        Returns
+        -------
+        Self
+        """
+        self.builder.key_value_metadata(
+            [
+                {key.encode(): value.encode() for key, value in mapping.items()}
+                for mapping in metadata
+            ]
+        )
+        return self
+
+    cpdef ParquetWriterOptionsBuilder compression(self, CompressionType compression):
+        """
+        Sets the compression type.
+
+        Parameters
+        ----------
+        compression : CompressionType
+            The compression type to use
+
+        Returns
+        -------
+        Self
+        """
+        self.builder.compression(compression)
+        return self
+
+    cpdef ParquetWriterOptionsBuilder stats_level(self, StatisticsFreq sf):
+        """
+        Sets the level of statistics.
+
+        Parameters
+        ----------
+        sf : StatisticsFreq
+            Level of statistics requested in the output file
+
+        Returns
+        -------
+        Self
+        """
+        self.builder.stats_level(sf)
+        return self
+
+    cpdef ParquetWriterOptionsBuilder int96_timestamps(self, bool enabled):
+        """
+        Sets whether int96 timestamps are written or not.
+
+        Parameters
+        ----------
+        enabled : bool
+            Boolean value to enable/disable int96 timestamps
+
+        Returns
+        -------
+        Self
+        """
+        self.builder.int96_timestamps(enabled)
+        return self
+
+    cpdef ParquetWriterOptionsBuilder write_v2_headers(self, bool enabled):
+        """
+        Set to true if V2 page headers are to be written.
+
+        Parameters
+        ----------
+        enabled : bool
+            Boolean value to enable/disable writing of V2 page headers.
+
+        Returns
+        -------
+        Self
+        """
+        self.builder.write_v2_headers(enabled)
+        return self
+
+    cpdef ParquetWriterOptionsBuilder dictionary_policy(self, DictionaryPolicy val):
+        """
+        Sets the policy for dictionary use.
+
+        Parameters
+        ----------
+        val : DictionaryPolicy
+            Policy for dictionary use.
+
+        Returns
+        -------
+        Self
+        """
+        self.builder.dictionary_policy(val)
+        return self
+
+    cpdef ParquetWriterOptionsBuilder utc_timestamps(self, bool enabled):
+        """
+        Set to true if timestamps are to be written as UTC.
+
+        Parameters
+        ----------
+        enabled : bool
+            Boolean value to enable/disable writing of timestamps as UTC.
+
+        Returns
+        -------
+        Self
+        """
+        self.builder.utc_timestamps(enabled)
+        return self
+
+    cpdef ParquetWriterOptionsBuilder write_arrow_schema(self, bool enabled):
+        """
+        Set to true if arrow schema is to be written.
+
+        Parameters
+        ----------
+        enabled : bool
+            Boolean value to enable/disable writing of arrow schema.
+
+        Returns
+        -------
+        Self
+        """
+        self.builder.write_arrow_schema(enabled)
+        return self
+
+    cpdef ParquetWriterOptions build(self):
+        """
+        Create a ParquetWriterOptions object from the set options.
+
+        Returns
+        -------
+        ParquetWriterOptions
+        """
+        cdef ParquetWriterOptions parquet_options = ParquetWriterOptions.__new__(
+            ParquetWriterOptions
+        )
+        parquet_options.options = move(self.builder.build())
+        return parquet_options
+
+
+cdef class BufferArrayFromVector:
+    @staticmethod
+    cdef BufferArrayFromVector from_unique_ptr(
+        unique_ptr[vector[uint8_t]] in_vec
+    ):
+        cdef BufferArrayFromVector buf = BufferArrayFromVector()
+        buf.in_vec = move(in_vec)
+        buf.length = dereference(buf.in_vec).size()
+        return buf
+
+    def __getbuffer__(self, Py_buffer *buffer, int flags):
+        cdef Py_ssize_t itemsize = sizeof(uint8_t)
+
+        self.shape[0] = self.length
+        self.strides[0] = 1
+
+        buffer.buf = dereference(self.in_vec).data()
+
+        buffer.format = NULL  # byte
+        buffer.internal = NULL
+        buffer.itemsize = itemsize
+        buffer.len = self.length * itemsize  # product(shape) * itemsize
+        buffer.ndim = 1
+        buffer.obj = self
+        buffer.readonly = 0
+        buffer.shape = self.shape
+        buffer.strides = self.strides
+        buffer.suboffsets = NULL
+
+    def __releasebuffer__(self, Py_buffer *buffer):
+        pass
+
+
+cpdef BufferArrayFromVector write_parquet(ParquetWriterOptions options):
+    """
+    Writes a set of columns to parquet format.
+
+    Parameters
+    ----------
+    options : ParquetWriterOptions
+        Settings for controlling writing behavior
+
+    Returns
+    -------
+    BufferArrayFromVector
+        A blob that contains the file metadata
+        (parquet FileMetadata thrift message) if requested in
+        parquet_writer_options (empty blob otherwise).
+    """
+    cdef parquet_writer_options c_options = options.options
+    cdef unique_ptr[vector[uint8_t]] c_result
+
+    with nogil:
+        c_result = cpp_write_parquet(c_options)
+
+    return BufferArrayFromVector.from_unique_ptr(move(c_result))
diff --git a/python/pylibcudf/pylibcudf/io/types.pxd b/python/pylibcudf/pylibcudf/io/types.pxd
index 0ab28cb0973..64a98eca7ca 100644
--- a/python/pylibcudf/pylibcudf/io/types.pxd
+++ b/python/pylibcudf/pylibcudf/io/types.pxd
@@ -19,8 +19,43 @@ from pylibcudf.libcudf.io.types cimport (
     table_with_metadata,
 )
 from pylibcudf.table cimport Table
+from pylibcudf.libcudf.io.types cimport column_encoding as ColumnEncoding
 
+cdef class ColumnInMetadata:
+    cdef column_in_metadata metadata
+
+    cpdef ColumnInMetadata set_name(self, str name)
+
+    cpdef ColumnInMetadata set_nullability(self, bool nullable)
+
+    cpdef ColumnInMetadata set_list_column_as_map(self)
+
+    cpdef ColumnInMetadata set_int96_timestamps(self, bool req)
+
+    cpdef ColumnInMetadata set_decimal_precision(self, int precision)
+
+    cpdef ColumnInMetadata child(self, int i)
+
+    cpdef ColumnInMetadata set_output_as_binary(self, bool binary)
+
+    cpdef ColumnInMetadata set_type_length(self, int type_length)
+
+    cpdef ColumnInMetadata set_skip_compression(self, bool skip)
+
+    cpdef ColumnInMetadata set_encoding(self, ColumnEncoding encoding)
+
+    cpdef str get_name(self)
+
+cdef class TableInputMetadata:
+    cdef public Table table
+    cdef table_input_metadata metadata
+
+    @property
+    cpdef list column_metadata(self)
+
 cdef class TableWithMetadata:
     cdef public Table tbl
     cdef table_metadata metadata
diff --git a/python/pylibcudf/pylibcudf/io/types.pyx b/python/pylibcudf/pylibcudf/io/types.pyx
index 967d05e7057..e83abc372a4 100644
--- a/python/pylibcudf/pylibcudf/io/types.pyx
+++ b/python/pylibcudf/pylibcudf/io/types.pyx
@@ -10,9 +10,11 @@ from pylibcudf.io.datasource cimport Datasource
 from pylibcudf.libcudf.io.data_sink cimport data_sink
 from pylibcudf.libcudf.io.datasource cimport datasource
 from pylibcudf.libcudf.io.types cimport (
+    column_in_metadata,
     column_name_info,
     host_buffer,
     source_info,
+    table_input_metadata,
     table_with_metadata,
 )
 
@@ -30,6 +32,208 @@ from pylibcudf.libcudf.io.types import (
     statistics_freq as StatisticsFreq,  # no-cython-lint
 )
 
+cdef class ColumnInMetadata:
+    """
+    Metadata for a column
+
+    Parameters
+    ----------
+    metadata : column_in_metadata
+    """
+
+    def __init__(self, column_in_metadata metadata):
+        self.metadata = metadata
+
+    cpdef ColumnInMetadata set_name(self, str name):
+        """
+        Set the name of this column.
+
+        Parameters
+        ----------
+        name : str
+            Name of the column
+
+        Returns
+        -------
+        Self
+        """
+        self.metadata.set_name(name.encode())
+        return self
+
+    cpdef ColumnInMetadata set_nullability(self, bool nullable):
+        """
+        Set the nullability of this column.
+
+        Parameters
+        ----------
+        nullable : bool
+            Whether this column is nullable
+
+        Returns
+        -------
+        Self
+        """
+        self.metadata.set_nullability(nullable)
+        return self
+
+    cpdef ColumnInMetadata set_list_column_as_map(self):
+        """
+        Specify that this list column should be encoded as a map in the
+        written file.
+
+        Returns
+        -------
+        Self
+        """
+        self.metadata.set_list_column_as_map()
+        return self
+
+    cpdef ColumnInMetadata set_int96_timestamps(self, bool req):
+        """
+        Specifies whether this timestamp column should be encoded using
+        the deprecated int96.
+
+        Parameters
+        ----------
+        req : bool
+            True = use int96 physical type. False = use int64 physical type.
+
+        Returns
+        -------
+        Self
+        """
+        self.metadata.set_int96_timestamps(req)
+        return self
+
+    cpdef ColumnInMetadata set_decimal_precision(self, int precision):
+        """
+        Set the decimal precision of this column.
+        Only valid if this column is a decimal (fixed-point) type.
+
+        Parameters
+        ----------
+        precision : int
+            The integer precision to set for this decimal column
+
+        Returns
+        -------
+        Self
+        """
+        self.metadata.set_decimal_precision(precision)
+        return self
+
+    cpdef ColumnInMetadata child(self, int i):
+        """
+        Get reference to a child of this column.
+
+        Parameters
+        ----------
+        i : int
+            Index of the child to get.
+
+        Returns
+        -------
+        Self
+        """
+        self.metadata.child(i)
+        return self
+
+    cpdef ColumnInMetadata set_output_as_binary(self, bool binary):
+        """
+        Specifies whether this column should be written as binary or string data.
+
+        Parameters
+        ----------
+        binary : bool
+            True = use binary data type. False = use string data type
+
+        Returns
+        -------
+        Self
+        """
+        self.metadata.set_output_as_binary(binary)
+        return self
+
+    cpdef ColumnInMetadata set_type_length(self, int type_length):
+        """
+        Sets the length of fixed length data.
+
+        Parameters
+        ----------
+        type_length : int
+            Size of the data type in bytes
+
+        Returns
+        -------
+        Self
+        """
+        self.metadata.set_type_length(type_length)
+        return self
+
+    cpdef ColumnInMetadata set_skip_compression(self, bool skip):
+        """
+        Specifies whether this column should not be compressed
+        regardless of the compression.
+
+        Parameters
+        ----------
+        skip : bool
+            If `true` do not compress this column
+
+        Returns
+        -------
+        Self
+        """
+        self.metadata.set_skip_compression(skip)
+        return self
+
+    cpdef ColumnInMetadata set_encoding(self, ColumnEncoding encoding):
+        """
+        Sets the encoding to use for this column.
+ + Parameters + ---------- + encoding : ColumnEncoding + The encoding to use + + Returns + ------- + ColumnInMetadata + """ + self.metadata.set_encoding(encoding) + return self + + cpdef str get_name(self): + """ + Get the name of this column. + + Returns + ------- + str + The name of this column + """ + return self.metadata.get_name() + + +cdef class TableInputMetadata: + """ + Metadata for a table + + Parameters + ---------- + table : Table + The Table to construct metadata for + """ + def __init__(self, Table table): + self.metadata = table_input_metadata(table.view()) + + @property + cpdef list column_metadata(self): + return [ + ColumnInMetadata(metadata) for metadata in self.metadata.column_metadata + ] + cdef class TableWithMetadata: """A container holding a table and its associated metadata diff --git a/python/pylibcudf/pylibcudf/libcudf/io/parquet.pxd b/python/pylibcudf/pylibcudf/libcudf/io/parquet.pxd index de6a6c1e82d..4e96ff94a0b 100644 --- a/python/pylibcudf/pylibcudf/libcudf/io/parquet.pxd +++ b/python/pylibcudf/pylibcudf/libcudf/io/parquet.pxd @@ -86,7 +86,7 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil: cdef cppclass parquet_writer_options_base: parquet_writer_options_base() except + - sink_info get_sink_info() except + + sink_info get_sink() except + compression_type get_compression() except + statistics_freq get_stats_level() except + const optional[table_input_metadata]& get_metadata( @@ -116,11 +116,11 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil: void set_utc_timestamps( bool enabled ) except + - void set_row_group_size_bytes(size_t val) except + - void set_row_group_size_rows(size_type val) except + - void set_max_page_size_bytes(size_t val) except + - void set_max_page_size_rows(size_type val) except + - void set_max_dictionary_size(size_t val) except + + void set_row_group_size_bytes(size_t size_bytes) except + + void set_row_group_size_rows(size_type size_rows) except + + void set_max_page_size_bytes(size_t size_bytes) except + + void set_max_page_size_rows(size_type size_rows) except + + void set_max_dictionary_size(size_t size_bytes) except + void enable_write_v2_headers(bool val) except + void enable_write_arrow_schema(bool val) except + void set_dictionary_policy(dictionary_policy policy) except + @@ -133,7 +133,7 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil: vector[partition_info] partitions ) except + void set_column_chunks_file_paths( - vector[string] column_chunks_file_paths + vector[string] file_paths ) except + @staticmethod @@ -146,10 +146,10 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil: parquet_writer_options_builder_base() except + BuilderT& metadata( - table_input_metadata m + table_input_metadata metadata ) except + BuilderT& key_value_metadata( - vector[map[string, string]] kvm + vector[map[string, string]] metadata ) except + BuilderT& stats_level( statistics_freq sf @@ -182,7 +182,7 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil: size_t val ) except + BuilderT& write_v2_headers( - bool val + bool enabled ) except + BuilderT& dictionary_policy( dictionary_policy val @@ -205,7 +205,7 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil: ) except + cdef unique_ptr[vector[uint8_t]] write_parquet( - parquet_writer_options args + parquet_writer_options options ) except + cdef cppclass chunked_parquet_writer_options(parquet_writer_options_base): From 22ba40064b22b0b544b79a5a4aad4a28a535656f Mon Sep 17 00:00:00 2001 From: 
Matthew Roeschke <10647082+mroeschke@users.noreply.github.com>
Date: Wed, 6 Nov 2024 14:27:03 -0800
Subject: [PATCH 2/2] Start factoring out chunked reader

---
 python/cudf/cudf/_lib/parquet.pyx         | 113 +++++++++++-----------
 python/pylibcudf/pylibcudf/io/parquet.pyx |   2 +-
 python/pylibcudf/pylibcudf/io/types.pxd   |   9 +-
 python/pylibcudf/pylibcudf/io/types.pyx   |  53 ++++++----
 4 files changed, 96 insertions(+), 81 deletions(-)

diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx
index 546fd0698cd..06585dfa5d7 100644
--- a/python/cudf/cudf/_lib/parquet.pyx
+++ b/python/cudf/cudf/_lib/parquet.pyx
@@ -1,6 +1,7 @@
 # Copyright (c) 2019-2024, NVIDIA CORPORATION.
 
 import io
+import itertools
 
 import pyarrow as pa
 
@@ -45,7 +46,6 @@ from pylibcudf.libcudf.io.parquet_metadata cimport (
+from pylibcudf.io.types cimport ColumnInMetadata
 from pylibcudf.libcudf.io.types cimport (
     source_info,
     sink_info,
-    column_in_metadata,
     table_input_metadata,
     partition_info,
     statistics_freq,
@@ -424,44 +424,45 @@ def write_parquet(
     """
 
     # Create the write options
-    cdef table_input_metadata tbl_meta
-
-    cdef table_view tv
-    cdef vector[unique_ptr[data_sink]] _data_sinks
-    cdef sink_info sink = make_sinks_info(
-        filepaths_or_buffers, _data_sinks
-    )
+    sink = plc.io.SinkInfo(filepaths_or_buffers)
 
     if index is True or (
         index is None and not isinstance(table._index, cudf.RangeIndex)
     ):
-        tv = table_view_from_table(table)
-        tbl_meta = table_input_metadata(tv)
-        for level, idx_name in enumerate(table._index.names):
+        plc_table = plc.Table(
+            [
+                col.to_pylibcudf(mode="read")
+                for col in itertools.chain(table.index._columns, table._columns)
+            ]
+        )
+        tbl_meta = plc.io.types.TableInputMetadata(plc_table)
+        for level, idx_name in enumerate(table.index.names):
             tbl_meta.column_metadata[level].set_name(
-                str.encode(
-                    _index_level_name(idx_name, level, table._column_names)
-                )
+                _index_level_name(idx_name, level, table._column_names)
             )
-        num_index_cols_meta = len(table._index.names)
+        num_index_cols_meta = len(table.index.names)
     else:
-        tv = table_view_from_table(table, ignore_index=True)
-        tbl_meta = table_input_metadata(tv)
+        plc_table = plc.Table(
+            [col.to_pylibcudf(mode="read") for col in table._columns]
+        )
+        tbl_meta = plc.io.types.TableInputMetadata(plc_table)
         num_index_cols_meta = 0
 
-    for i, name in enumerate(table._column_names, num_index_cols_meta):
+    for i, (name, col) in enumerate(
+        table._column_labels_and_values, start=num_index_cols_meta
+    ):
         if not isinstance(name, str):
             if cudf.get_option("mode.pandas_compatible"):
-                tbl_meta.column_metadata[i].set_name(str(name).encode())
+                tbl_meta.column_metadata[i].set_name(str(name))
             else:
                 raise ValueError(
                     "Writing a Parquet file requires string column names"
                 )
         else:
-            tbl_meta.column_metadata[i].set_name(name.encode())
+            tbl_meta.column_metadata[i].set_name(name)
 
         _set_col_metadata(
-            table[name]._column,
+            col,
             tbl_meta.column_metadata[i],
             force_nullable_schema,
             None,
@@ -490,11 +491,9 @@ def write_parquet(
             "Valid values are '1.0' and '2.0'"
         )
 
-    cdef vector[partition_info] partitions
-
     # Perform write
     options = (
-        plc.io.parquet.ParquetWriterOptions.builder(sink, tv)
+        plc.io.parquet.ParquetWriterOptions.builder(sink, plc_table)
         .metadata(tbl_meta)
@@ -511,12 +510,12 @@ def write_parquet(
         .build()
     )
    if partitions_info is not None:
-        partitions.reserve(len(partitions_info))
-        for part in partitions_info:
-            partitions.push_back(
-                partition_info(part[0], part[1])
-            )
-        options.set_partitions(move(partitions))
+        partitions = [
+            plc.io.types.PartitionInfo(start, num)
+            for (start, num) in partitions_info
+        ]
+        options.set_partitions(partitions)
 
     if metadata_file_path is not None:
         if isinstance(metadata_file_path, str):
             options.set_column_chunks_file_paths([metadata_file_path])
@@ -696,34 +695,38 @@ cdef class ParquetWriter:
     def _initialize_chunked_state(self, table, num_partitions=1):
         """ Prepares all the values required to build the
         chunked_parquet_writer_options and creates a writer"""
-        cdef table_view tv
 
         # Set the table_metadata
-        num_index_cols_meta = 0
-        self.tbl_meta = table_input_metadata(
-            table_view_from_table(table, ignore_index=True))
         if self.index is not False:
-            if isinstance(table._index, cudf.core.multiindex.MultiIndex):
-                tv = table_view_from_table(table)
-                self.tbl_meta = table_input_metadata(tv)
-                for level, idx_name in enumerate(table._index.names):
-                    self.tbl_meta.column_metadata[level].set_name(
-                        (str.encode(idx_name))
-                    )
-                num_index_cols_meta = len(table._index.names)
-            else:
-                if table._index.name is not None:
-                    tv = table_view_from_table(table)
-                    self.tbl_meta = table_input_metadata(tv)
-                    self.tbl_meta.column_metadata[0].set_name(
-                        str.encode(table._index.name)
-                    )
-                    num_index_cols_meta = 1
+            plc_table = plc.Table(
+                [
+                    col.to_pylibcudf(mode="read") for col in
+                    itertools.chain(table.index._columns, table._columns)
+                ]
+            )
+            self.tbl_meta = plc.io.types.TableInputMetadata(plc_table)
+            if isinstance(table.index, cudf.core.multiindex.MultiIndex):
+                for level, idx_name in enumerate(table.index.names):
+                    self.tbl_meta.column_metadata[level].set_name(
+                        idx_name
+                    )
+                num_index_cols_meta = len(table.index.names)
+            elif table.index.name is not None:
+                self.tbl_meta.column_metadata[0].set_name(
+                    table.index.name
+                )
+                num_index_cols_meta = 1
+            else:
+                # Unnamed non-MultiIndex: the index column is still present
+                num_index_cols_meta = 1
+        else:
+            plc_table = plc.Table(
+                [col.to_pylibcudf(mode="read") for col in table._columns]
+            )
+            self.tbl_meta = plc.io.types.TableInputMetadata(plc_table)
+            num_index_cols_meta = 0
 
-        for i, name in enumerate(table._column_names, num_index_cols_meta):
-            self.tbl_meta.column_metadata[i].set_name(name.encode())
+        for i, (name, col) in enumerate(
+            table._column_labels_and_values, start=num_index_cols_meta
+        ):
+            self.tbl_meta.column_metadata[i].set_name(name)
             _set_col_metadata(
-                table[name]._column,
+                col,
                 self.tbl_meta.column_metadata[i],
             )
@@ -815,7 +818,7 @@ cdef compression_type _get_comp_type(object compression):
 
 cdef _set_col_metadata(
     Column col,
-    column_in_metadata& col_meta,
+    ColumnInMetadata col_meta,
     bool force_nullable_schema=False,
     str path=None,
     object skip_compression=None,
@@ -825,7 +828,7 @@ cdef _set_col_metadata(
     need_path = (skip_compression is not None or column_encoding is not None or
                  column_type_length is not None or output_as_binary is not None)
-    name = col_meta.get_name().decode('UTF-8') if need_path else None
+    name = col_meta.get_name() if need_path else None
     full_path = path + "." + name if path is not None else name
 
     if force_nullable_schema:
@@ -858,7 +861,7 @@ cdef _set_col_metadata(
         for i, (child_col, name) in enumerate(
            zip(col.children, list(col.dtype.fields))
         ):
-            col_meta.child(i).set_name(name.encode())
+            col_meta.child(i).set_name(name)
             _set_col_metadata(
                 child_col,
                 col_meta.child(i),
@@ -872,7 +875,7 @@ cdef _set_col_metadata(
     elif isinstance(col.dtype, cudf.ListDtype):
         if full_path is not None:
             full_path = full_path + ".list"
-            col_meta.child(1).set_name("element".encode())
+            col_meta.child(1).set_name("element")
         _set_col_metadata(
             col.children[1],
             col_meta.child(1),
diff --git a/python/pylibcudf/pylibcudf/io/parquet.pyx b/python/pylibcudf/pylibcudf/io/parquet.pyx
index 2baabb9cd7b..62322a40d55 100644
--- a/python/pylibcudf/pylibcudf/io/parquet.pyx
+++ b/python/pylibcudf/pylibcudf/io/parquet.pyx
@@ -8,6 +8,7 @@ from pylibcudf.expressions cimport Expression
 from pylibcudf.io.types cimport (
     CompressionType,
     DictionaryPolicy,
+    PartitionInfo,
     SinkInfo,
     SourceInfo,
     StatisticsFreq,
@@ -22,7 +23,7 @@ from pylibcudf.libcudf.io.parquet cimport (
     write_parquet as cpp_write_parquet,
     parquet_writer_options,
 )
-from pylibcudf.libcudf.io.types cimport table_with_metadata
+from pylibcudf.libcudf.io.types cimport partition_info, table_with_metadata
 from pylibcudf.libcudf.types cimport size_type
@@ -266,7 +268,11 @@ cdef class ParquetWriterOptions:
         -------
         None
         """
-        self.options.set_partitions(partitions)
+        cdef vector[partition_info] c_partitions
+        cdef PartitionInfo partition
+        for partition in partitions:
+            c_partitions.push_back(partition.c_obj)
+        self.options.set_partitions(move(c_partitions))
 
     cpdef void set_column_chunks_file_paths(self, list file_paths):
@@ -397,7 +401,7 @@ cdef class ParquetWriterOptionsBuilder:
         Returns
         -------
         Self
         """
-        self.builder.metadata(metadata.metadata)
+        self.builder.metadata(metadata.c_obj)
         return self
diff --git a/python/pylibcudf/pylibcudf/io/types.pxd b/python/pylibcudf/pylibcudf/io/types.pxd
index 64a98eca7ca..bb0a310cd10 100644
--- a/python/pylibcudf/pylibcudf/io/types.pxd
+++ b/python/pylibcudf/pylibcudf/io/types.pxd
@@ -21,9 +21,11 @@ from pylibcudf.libcudf.io.types cimport (
 from pylibcudf.table cimport Table
-from pylibcudf.libcudf.io.types cimport column_encoding as ColumnEncoding
+from pylibcudf.libcudf.io.types cimport (
+    column_encoding as ColumnEncoding,
+    partition_info,
+)
 
+cdef class PartitionInfo:
+    cdef partition_info c_obj
 
 cdef class ColumnInMetadata:
-    cdef column_in_metadata metadata
+    cdef column_in_metadata c_obj
 
     cpdef ColumnInMetadata set_name(self, str name)
 
@@ -51,10 +53,7 @@ cdef class ColumnInMetadata:
 
 cdef class TableInputMetadata:
     cdef public Table table
-    cdef table_input_metadata metadata
-
-    @property
-    cpdef list column_metadata(self)
+    cdef table_input_metadata c_obj
+    cdef public list column_metadata
 
 cdef class TableWithMetadata:
     cdef public Table tbl
diff --git a/python/pylibcudf/pylibcudf/io/types.pyx b/python/pylibcudf/pylibcudf/io/types.pyx
index e83abc372a4..ec1bc5097d6 100644
--- a/python/pylibcudf/pylibcudf/io/types.pyx
+++ b/python/pylibcudf/pylibcudf/io/types.pyx
@@ -13,6 +13,7 @@ from pylibcudf.libcudf.io.types cimport (
     column_in_metadata,
     column_name_info,
     host_buffer,
+    partition_info,
     source_info,
     table_input_metadata,
     table_with_metadata,
 )
@@ -32,6 +33,22 @@ from pylibcudf.libcudf.io.types import (
     statistics_freq as StatisticsFreq,  # no-cython-lint
 )
 
+cdef class PartitionInfo:
+    """
+    Information used while writing partitioned datasets.
+
+    Parameters
+    ----------
+    start_row : int
+        The start row of the partition.
+
+    num_rows : int
+        The number of rows in the partition.
+    """
+    def __init__(self, int start_row, int num_rows):
+        self.c_obj = partition_info(start_row, num_rows)
+
 
 cdef class ColumnInMetadata:
     """
     Metadata for a column
@@ -42,7 +59,7 @@ cdef class ColumnInMetadata:
     """
 
     def __init__(self, column_in_metadata metadata):
-        self.metadata = metadata
+        self.c_obj = metadata
 
     cpdef ColumnInMetadata set_name(self, str name):
         """
@@ -57,7 +74,7 @@ cdef class ColumnInMetadata:
         -------
         Self
         """
-        self.metadata.set_name(name.encode())
+        self.c_obj.set_name(name.encode())
         return self
 
     cpdef ColumnInMetadata set_nullability(self, bool nullable):
@@ -73,7 +90,7 @@ cdef class ColumnInMetadata:
         -------
         Self
         """
-        self.metadata.set_nullability(nullable)
+        self.c_obj.set_nullability(nullable)
         return self
 
     cpdef ColumnInMetadata set_list_column_as_map(self):
@@ -85,7 +102,7 @@ cdef class ColumnInMetadata:
         -------
         Self
         """
-        self.metadata.set_list_column_as_map()
+        self.c_obj.set_list_column_as_map()
         return self
 
     cpdef ColumnInMetadata set_int96_timestamps(self, bool req):
@@ -102,7 +119,7 @@ cdef class ColumnInMetadata:
         -------
         Self
         """
-        self.metadata.set_int96_timestamps(req)
+        self.c_obj.set_int96_timestamps(req)
         return self
 
     cpdef ColumnInMetadata set_decimal_precision(self, int precision):
@@ -119,7 +136,7 @@ cdef class ColumnInMetadata:
         -------
         Self
         """
-        self.metadata.set_decimal_precision(precision)
+        self.c_obj.set_decimal_precision(precision)
         return self
 
     cpdef ColumnInMetadata child(self, int i):
@@ -133,10 +150,9 @@ cdef class ColumnInMetadata:
 
         Returns
         -------
-        Self
+        ColumnInMetadata
         """
-        self.metadata.child(i)
-        return self
+        return ColumnInMetadata(self.c_obj.child(i))
 
     cpdef ColumnInMetadata set_output_as_binary(self, bool binary):
@@ -151,7 +167,7 @@ cdef class ColumnInMetadata:
         -------
         Self
         """
-        self.metadata.set_output_as_binary(binary)
+        self.c_obj.set_output_as_binary(binary)
         return self
 
     cpdef ColumnInMetadata set_type_length(self, int type_length):
@@ -167,7 +183,7 @@ cdef class ColumnInMetadata:
         -------
         Self
         """
-        self.metadata.set_type_length(type_length)
+        self.c_obj.set_type_length(type_length)
         return self
 
     cpdef ColumnInMetadata set_skip_compression(self, bool skip):
@@ -184,7 +200,7 @@ cdef class ColumnInMetadata:
         -------
         Self
         """
-        self.metadata.set_skip_compression(skip)
+        self.c_obj.set_skip_compression(skip)
         return self
 
     cpdef ColumnInMetadata set_encoding(self, ColumnEncoding encoding):
@@ -201,7 +217,7 @@ cdef class ColumnInMetadata:
         -------
         ColumnInMetadata
         """
-        self.metadata.set_encoding(encoding)
+        self.c_obj.set_encoding(encoding)
         return self
 
     cpdef str get_name(self):
@@ -213,7 +229,7 @@ cdef class ColumnInMetadata:
         str
             The name of this column
         """
-        return self.metadata.get_name()
+        return self.c_obj.get_name().decode()
 
 
 cdef class TableInputMetadata:
@@ -226,12 +242,9 @@ cdef class TableInputMetadata:
         The Table to construct metadata for
     """
     def __init__(self, Table table):
-        self.metadata = table_input_metadata(table.view())
-
-    @property
-    cpdef list column_metadata(self):
-        return [
-            ColumnInMetadata(metadata) for metadata in self.metadata.column_metadata
+        self.c_obj = table_input_metadata(table.view())
+        self.column_metadata = [
+            ColumnInMetadata(metadata) for metadata in self.c_obj.column_metadata
         ]
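
Below is a minimal usage sketch of the writer API these two patches add, for
reviewers who want to try it. It is not part of the patch itself: the file
name, the column contents, and the pyarrow-based table construction are
illustrative assumptions; only the SinkInfo, TableInputMetadata,
ParquetWriterOptions builder, and write_parquet calls come from the code above.

    import pyarrow as pa
    import pylibcudf as plc

    # Build a pylibcudf Table (via pyarrow interop for convenience).
    table = plc.interop.from_arrow(
        pa.table({"a": [1, 2, 3], "b": ["x", "y", "z"]})
    )

    # Describe where to write and how to name the columns.
    sink = plc.io.SinkInfo(["example.parquet"])
    metadata = plc.io.types.TableInputMetadata(table)
    for col_meta, name in zip(metadata.column_metadata, ["a", "b"]):
        col_meta.set_name(name)

    # Assemble options through the builder, then write.
    options = (
        plc.io.parquet.ParquetWriterOptions.builder(sink, table)
        .metadata(metadata)
        .key_value_metadata([{"source": "pylibcudf sketch"}])
        .compression(plc.io.types.CompressionType.SNAPPY)
        .stats_level(plc.io.types.StatisticsFreq.STATISTICS_ROWGROUP)
        .build()
    )
    # Returns the parquet file metadata blob (BufferArrayFromVector).
    blob = plc.io.parquet.write_parquet(options)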