From 055d1b0639834015ee9608a63584c7f3fe8a11cb Mon Sep 17 00:00:00 2001
From: Matthew Roeschke <10647082+mroeschke@users.noreply.github.com>
Date: Fri, 1 Nov 2024 11:53:15 -0700
Subject: [PATCH 1/3] Use public pylibcudf compression types

---
 python/cudf/cudf/_lib/csv.pyx | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/python/cudf/cudf/_lib/csv.pyx b/python/cudf/cudf/_lib/csv.pyx
index 9ad96f610b3..c09e06bfc59 100644
--- a/python/cudf/cudf/_lib/csv.pyx
+++ b/python/cudf/cudf/_lib/csv.pyx
@@ -28,7 +28,7 @@ from pylibcudf.libcudf.io.csv cimport (
     write_csv as cpp_write_csv,
 )
 from pylibcudf.libcudf.io.data_sink cimport data_sink
-from pylibcudf.libcudf.io.types cimport compression_type, sink_info
+from pylibcudf.libcudf.io.types cimport sink_info
 from pylibcudf.libcudf.table.table_view cimport table_view
 
 from cudf._lib.io.utils cimport make_sink_info
@@ -148,13 +148,13 @@ def read_csv(
         byte_range = (0, 0)
 
     if compression is None:
-        c_compression = compression_type.NONE
+        c_compression = plc.io.types.CompressionType.NONE
     else:
         compression_map = {
-            "infer": compression_type.AUTO,
-            "gzip": compression_type.GZIP,
-            "bz2": compression_type.BZIP2,
-            "zip": compression_type.ZIP,
+            "infer": plc.io.types.CompressionType.AUTO,
+            "gzip": plc.io.types.CompressionType.GZIP,
+            "bz2": plc.io.types.CompressionType.BZIP2,
+            "zip": plc.io.types.CompressionType.ZIP,
        }
         c_compression = compression_map[compression]
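Note: the hunk above is, in effect, the following pure-Python lookup. This is a
minimal sketch assuming pylibcudf is importable as plc; the resolve_compression
helper name is hypothetical, not part of the patch. As in read_csv itself, an
unrecognized key surfaces as a KeyError.

    import pylibcudf as plc

    def resolve_compression(compression):
        # None disables decompression entirely, matching read_csv.
        if compression is None:
            return plc.io.types.CompressionType.NONE
        # "infer" defers format detection to libcudf via AUTO.
        compression_map = {
            "infer": plc.io.types.CompressionType.AUTO,
            "gzip": plc.io.types.CompressionType.GZIP,
            "bz2": plc.io.types.CompressionType.BZIP2,
            "zip": plc.io.types.CompressionType.ZIP,
        }
        return compression_map[compression]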
From 47b833644cd07746d875ffd17b8825a4fa1af881 Mon Sep 17 00:00:00 2001
From: Matthew Roeschke <10647082+mroeschke@users.noreply.github.com>
Date: Fri, 1 Nov 2024 12:46:08 -0700
Subject: [PATCH 2/3] Use more type enums in json, parquet

---
 python/cudf/cudf/_lib/json.pyx          | 38 ++++++++++---------------
 python/cudf/cudf/_lib/parquet.pyx       |  6 ++--
 python/pylibcudf/pylibcudf/io/types.pyx |  6 ++--
 3 files changed, 22 insertions(+), 28 deletions(-)

diff --git a/python/cudf/cudf/_lib/json.pyx b/python/cudf/cudf/_lib/json.pyx
index 9bbbcf60dcf..fb149603960 100644
--- a/python/cudf/cudf/_lib/json.pyx
+++ b/python/cudf/cudf/_lib/json.pyx
@@ -9,10 +9,6 @@ from cudf.core.buffer import acquire_spill_lock
 
 from libcpp cimport bool
 
-cimport pylibcudf.libcudf.io.types as cudf_io_types
-from pylibcudf.io.types cimport compression_type
-from pylibcudf.libcudf.io.json cimport json_recovery_mode_t
-from pylibcudf.libcudf.io.types cimport compression_type
 from pylibcudf.libcudf.types cimport data_type, type_id
 from pylibcudf.types cimport DataType
@@ -24,15 +20,6 @@ from cudf._lib.utils cimport _data_from_columns, data_from_pylibcudf_io
 
 import pylibcudf as plc
 
-cdef json_recovery_mode_t _get_json_recovery_mode(object on_bad_lines):
-    if on_bad_lines.lower() == "error":
-        return json_recovery_mode_t.FAIL
-    elif on_bad_lines.lower() == "recover":
-        return json_recovery_mode_t.RECOVER_WITH_NULL
-    else:
-        raise TypeError(f"Invalid parameter for {on_bad_lines=}")
-
-
 cpdef read_json(object filepaths_or_buffers,
                 object dtype,
                 bool lines,
@@ -41,7 +28,7 @@ cpdef read_json(object filepaths_or_buffers,
                 bool keep_quotes,
                 bool mixed_types_as_string,
                 bool prune_columns,
-                object on_bad_lines):
+                str on_bad_lines):
     """
     Cython function to call into libcudf API, see `read_json`.
@@ -64,19 +51,24 @@ cpdef read_json(object filepaths_or_buffers,
             filepaths_or_buffers[idx] = filepaths_or_buffers[idx].encode()
 
     # Setup arguments
-    cdef cudf_io_types.compression_type c_compression
-
     if compression is not None:
         if compression == 'gzip':
-            c_compression = cudf_io_types.compression_type.GZIP
+            c_compression = plc.io.types.CompressionType.GZIP
         elif compression == 'bz2':
-            c_compression = cudf_io_types.compression_type.BZIP2
+            c_compression = plc.io.types.CompressionType.BZIP2
         elif compression == 'zip':
-            c_compression = cudf_io_types.compression_type.ZIP
+            c_compression = plc.io.types.CompressionType.ZIP
         else:
-            c_compression = cudf_io_types.compression_type.AUTO
+            c_compression = plc.io.types.CompressionType.AUTO
+    else:
+        c_compression = plc.io.types.CompressionType.NONE
+
+    if on_bad_lines.lower() == "error":
+        c_on_bad_lines = plc.io.types.JSONRecoveryMode.FAIL
+    elif on_bad_lines.lower() == "recover":
+        c_on_bad_lines = plc.io.types.JSONRecoveryMode.RECOVER_WITH_NULL
     else:
-        c_compression = cudf_io_types.compression_type.NONE
+        raise TypeError(f"Invalid parameter for {on_bad_lines=}")
 
     processed_dtypes = None
@@ -108,7 +100,7 @@ cpdef read_json(object filepaths_or_buffers,
                 keep_quotes = keep_quotes,
                 mixed_types_as_string = mixed_types_as_string,
                 prune_columns = prune_columns,
-                recovery_mode = _get_json_recovery_mode(on_bad_lines)
+                recovery_mode = c_on_bad_lines
             )
         df = cudf.DataFrame._from_data(
             *_data_from_columns(
@@ -130,7 +122,7 @@ cpdef read_json(object filepaths_or_buffers,
                 keep_quotes = keep_quotes,
                 mixed_types_as_string = mixed_types_as_string,
                 prune_columns = prune_columns,
-                recovery_mode = _get_json_recovery_mode(on_bad_lines)
+                recovery_mode = c_on_bad_lines
             )
 
         df = cudf.DataFrame._from_data(
diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx
index fa2690c7f21..faf8bf6ceaa 100644
--- a/python/cudf/cudf/_lib/parquet.pyx
+++ b/python/cudf/cudf/_lib/parquet.pyx
@@ -531,10 +531,10 @@ def write_parquet(
             "Valid values are '1.0' and '2.0'"
         )
 
-    cdef cudf_io_types.dictionary_policy dict_policy = (
-        cudf_io_types.dictionary_policy.ADAPTIVE
+    dict_policy = (
+        plc.io.types.DictionaryPolicy.ADAPTIVE
         if use_dictionary
-        else cudf_io_types.dictionary_policy.NEVER
+        else plc.io.types.DictionaryPolicy.NEVER
     )
 
     cdef cudf_io_types.compression_type comp_type = _get_comp_type(compression)
diff --git a/python/pylibcudf/pylibcudf/io/types.pyx b/python/pylibcudf/pylibcudf/io/types.pyx
index 563a02761da..167ec2a056d 100644
--- a/python/pylibcudf/pylibcudf/io/types.pyx
+++ b/python/pylibcudf/pylibcudf/io/types.pyx
@@ -23,8 +23,10 @@ import os
 
 from pylibcudf.libcudf.io.json import \
     json_recovery_mode_t as JSONRecoveryMode  # no-cython-lint
-from pylibcudf.libcudf.io.types import \
-    compression_type as CompressionType  # no-cython-lint
+from pylibcudf.libcudf.io.types import (
+    compression_type as CompressionType,  # no-cython-lint
+    dictionary_policy as DictionaryPolicy,  # no-cython-lint
+)
 
 
 cdef class TableWithMetadata:
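Note: with _get_json_recovery_mode removed, read_json resolves on_bad_lines
inline against the public enum. A standalone sketch of that mapping, assuming
pylibcudf is importable as plc; the resolve_on_bad_lines helper name is
illustrative, not part of the patch.

    import pylibcudf as plc

    def resolve_on_bad_lines(on_bad_lines: str):
        # "error" aborts the read on a malformed JSON line;
        # "recover" replaces the bad line with nulls instead.
        if on_bad_lines.lower() == "error":
            return plc.io.types.JSONRecoveryMode.FAIL
        elif on_bad_lines.lower() == "recover":
            return plc.io.types.JSONRecoveryMode.RECOVER_WITH_NULL
        else:
            raise TypeError(f"Invalid parameter for {on_bad_lines=}")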
From 45929e8ff48a68285c35f27d16758578ddbe5de0 Mon Sep 17 00:00:00 2001
From: Matthew Roeschke <10647082+mroeschke@users.noreply.github.com>
Date: Fri, 1 Nov 2024 15:17:37 -0700
Subject: [PATCH 3/3] Standardize plc type imports in orc and parquet

---
 python/cudf/cudf/_lib/orc.pyx           |  43 ++++----
 python/cudf/cudf/_lib/parquet.pyx       | 132 +++++++++++------------
 python/pylibcudf/pylibcudf/io/types.pyx |   2 +
 3 files changed, 83 insertions(+), 94 deletions(-)

diff --git a/python/cudf/cudf/_lib/orc.pyx b/python/cudf/cudf/_lib/orc.pyx
index f88c48ce989..32a5e463916 100644
--- a/python/cudf/cudf/_lib/orc.pyx
+++ b/python/cudf/cudf/_lib/orc.pyx
@@ -15,7 +15,6 @@ try:
 except ImportError:
     import json
 
-cimport pylibcudf.libcudf.io.types as cudf_io_types
 cimport pylibcudf.libcudf.lists.lists_column_view as cpp_lists_column_view
 from pylibcudf.libcudf.io.data_sink cimport data_sink
 from pylibcudf.libcudf.io.orc cimport (
@@ -26,7 +25,6 @@ from pylibcudf.libcudf.io.orc cimport (
 )
 from pylibcudf.libcudf.io.types cimport (
     column_in_metadata,
-    compression_type,
     sink_info,
     table_input_metadata,
 )
@@ -137,22 +135,23 @@ cpdef read_orc(object filepaths_or_buffers,
     return data, index
 
 
-cdef compression_type _get_comp_type(object compression):
+def _get_comp_type(object compression):
     if compression is None or compression is False:
-        return compression_type.NONE
+        return plc.io.types.CompressionType.NONE
 
     compression = str(compression).upper()
     if compression == "SNAPPY":
-        return compression_type.SNAPPY
+        return plc.io.types.CompressionType.SNAPPY
     elif compression == "ZLIB":
-        return compression_type.ZLIB
+        return plc.io.types.CompressionType.ZLIB
     elif compression == "ZSTD":
-        return compression_type.ZSTD
+        return plc.io.types.CompressionType.ZSTD
    elif compression == "LZ4":
-        return compression_type.LZ4
+        return plc.io.types.CompressionType.LZ4
     else:
         raise ValueError(f"Unsupported `compression` type {compression}")
 
+
 cdef tuple _get_index_from_metadata(
     vector[map[string, string]] user_data,
     object names,
@@ -210,7 +209,8 @@ cdef tuple _get_index_from_metadata(
         range_idx
     )
 
-cdef cudf_io_types.statistics_freq _get_orc_stat_freq(object statistics):
+
+def _get_orc_stat_freq(str statistics):
     """
     Convert ORC statistics terms to CUDF convention:
       - ORC "STRIPE" == CUDF "ROWGROUP"
@@ -218,11 +218,11 @@ def _get_orc_stat_freq(str statistics):
      - ORC "ROWGROUP" == CUDF "PAGE"
     """
     statistics = str(statistics).upper()
     if statistics == "NONE":
-        return cudf_io_types.statistics_freq.STATISTICS_NONE
+        return plc.io.types.StatisticsFreq.STATISTICS_NONE
     elif statistics == "STRIPE":
-        return cudf_io_types.statistics_freq.STATISTICS_ROWGROUP
+        return plc.io.types.StatisticsFreq.STATISTICS_ROWGROUP
     elif statistics == "ROWGROUP":
-        return cudf_io_types.statistics_freq.STATISTICS_PAGE
+        return plc.io.types.StatisticsFreq.STATISTICS_PAGE
     else:
         raise ValueError(f"Unsupported `statistics_freq` type {statistics}")
@@ -232,7 +232,7 @@ def write_orc(
     table,
     object path_or_buf,
     object compression="snappy",
-    object statistics="ROWGROUP",
+    str statistics="ROWGROUP",
     object stripe_size_bytes=None,
     object stripe_size_rows=None,
     object row_index_stride=None,
@@ -246,7 +246,6 @@ def write_orc(
     --------
     cudf.read_orc
     """
-    cdef compression_type compression_ = _get_comp_type(compression)
     cdef unique_ptr[data_sink] data_sink_c
     cdef sink_info sink_info_c = make_sink_info(path_or_buf, data_sink_c)
     cdef table_input_metadata tbl_meta
@@ -289,7 +288,7 @@ def write_orc(
             sink_info_c,
             tv
         ).metadata(tbl_meta)
         .key_value_metadata(move(user_data))
-        .compression(compression_)
+        .compression(_get_comp_type(compression))
         .enable_statistics(_get_orc_stat_freq(statistics))
         .build()
     )
@@ -330,8 +329,8 @@ cdef class ORCWriter:
     cdef unique_ptr[orc_chunked_writer] writer
     cdef sink_info sink
     cdef unique_ptr[data_sink] _data_sink
-    cdef cudf_io_types.statistics_freq stat_freq
-    cdef compression_type comp_type
+    cdef str statistics
+    cdef object compression
     cdef object index
     cdef table_input_metadata tbl_meta
     cdef object cols_as_map_type
@@ -343,15 +342,15 @@ cdef class ORCWriter:
                   object path,
                   object index=None,
                   object compression="snappy",
-                  object statistics="ROWGROUP",
+                  str statistics="ROWGROUP",
                   object cols_as_map_type=None,
                   object stripe_size_bytes=None,
                   object stripe_size_rows=None,
                   object row_index_stride=None):
 
         self.sink = make_sink_info(path, self._data_sink)
-        self.stat_freq = _get_orc_stat_freq(statistics)
-        self.comp_type = _get_comp_type(compression)
+        self.statistics = statistics
+        self.compression = compression
         self.index = index
         self.cols_as_map_type = cols_as_map_type \
             if cols_as_map_type is None else set(cols_as_map_type)
@@ -429,8 +428,8 @@ cdef class ORCWriter:
             chunked_orc_writer_options.builder(self.sink)
             .metadata(self.tbl_meta)
             .key_value_metadata(move(user_data))
-            .compression(self.comp_type)
-            .enable_statistics(self.stat_freq)
+            .compression(_get_comp_type(self.compression))
+            .enable_statistics(_get_orc_stat_freq(self.statistics))
             .build()
         )
         if self.stripe_size_bytes is not None:
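Note: ORCWriter now keeps the raw user inputs (statistics, compression) and
converts them to enums only when the chunked writer options are built. The
statistics mapping that _get_orc_stat_freq implements can be restated as a
dict lookup; a sketch assuming pylibcudf is importable as plc, with the
standalone orc_stat_freq name being hypothetical.

    import pylibcudf as plc

    def orc_stat_freq(statistics):
        # ORC "STRIPE" maps to CUDF "ROWGROUP", and ORC "ROWGROUP"
        # to CUDF "PAGE", per the docstring in the patch above.
        mapping = {
            "NONE": plc.io.types.StatisticsFreq.STATISTICS_NONE,
            "STRIPE": plc.io.types.StatisticsFreq.STATISTICS_ROWGROUP,
            "ROWGROUP": plc.io.types.StatisticsFreq.STATISTICS_PAGE,
        }
        try:
            return mapping[str(statistics).upper()]
        except KeyError:
            raise ValueError(
                f"Unsupported `statistics_freq` type {statistics}"
            )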
diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx
index faf8bf6ceaa..1212637d330 100644
--- a/python/cudf/cudf/_lib/parquet.pyx
+++ b/python/cudf/cudf/_lib/parquet.pyx
@@ -31,10 +31,9 @@ from libcpp.unordered_map cimport unordered_map
 from libcpp.utility cimport move
 from libcpp.vector cimport vector
 
-cimport pylibcudf.libcudf.io.data_sink as cudf_io_data_sink
-cimport pylibcudf.libcudf.io.types as cudf_io_types
 from pylibcudf.expressions cimport Expression
 from pylibcudf.io.parquet cimport ChunkedParquetReader
+from pylibcudf.libcudf.io.data_sink cimport data_sink
 from pylibcudf.libcudf.io.parquet cimport (
     chunked_parquet_writer_options,
     merge_row_group_metadata as parquet_merge_metadata,
@@ -47,8 +46,14 @@ from pylibcudf.libcudf.io.parquet_metadata cimport (
     read_parquet_metadata as parquet_metadata_reader,
 )
 from pylibcudf.libcudf.io.types cimport (
+    source_info,
+    sink_info,
     column_in_metadata,
     table_input_metadata,
+    partition_info,
+    statistics_freq,
+    compression_type,
+    dictionary_policy,
 )
 from pylibcudf.libcudf.table.table_view cimport table_view
 from pylibcudf.libcudf.types cimport size_type
@@ -377,7 +382,7 @@ cpdef read_parquet_metadata(filepaths_or_buffers):
     cudf.io.parquet.read_parquet
     cudf.io.parquet.to_parquet
     """
-    cdef cudf_io_types.source_info source = make_source_info(filepaths_or_buffers)
+    cdef source_info source = make_source_info(filepaths_or_buffers)
 
     args = move(source)
@@ -466,8 +471,8 @@ def write_parquet(
     cdef vector[map[string, string]] user_data
     cdef table_view tv
-    cdef vector[unique_ptr[cudf_io_data_sink.data_sink]] _data_sinks
-    cdef cudf_io_types.sink_info sink = make_sinks_info(
+    cdef vector[unique_ptr[data_sink]] _data_sinks
+    cdef sink_info sink = make_sinks_info(
         filepaths_or_buffers, _data_sinks
     )
@@ -537,13 +542,13 @@ def write_parquet(
         else plc.io.types.DictionaryPolicy.NEVER
     )
 
-    cdef cudf_io_types.compression_type comp_type = _get_comp_type(compression)
-    cdef cudf_io_types.statistics_freq stat_freq = _get_stat_freq(statistics)
+    comp_type = _get_comp_type(compression)
+    stat_freq = _get_stat_freq(statistics)
     cdef unique_ptr[vector[uint8_t]] out_metadata_c
     cdef vector[string] c_column_chunks_file_paths
     cdef bool _int96_timestamps = int96_timestamps
-    cdef vector[cudf_io_types.partition_info] partitions
+    cdef vector[partition_info] partitions
 
     # Perform write
     cdef parquet_writer_options args = move(
@@ -563,7 +568,7 @@ def write_parquet(
         partitions.reserve(len(partitions_info))
         for part in partitions_info:
             partitions.push_back(
-                cudf_io_types.partition_info(part[0], part[1])
+                partition_info(part[0], part[1])
             )
         args.set_partitions(move(partitions))
     if metadata_file_path is not None:
@@ -646,17 +651,17 @@ cdef class ParquetWriter:
     cdef bool initialized
     cdef unique_ptr[cpp_parquet_chunked_writer] writer
     cdef table_input_metadata tbl_meta
-    cdef cudf_io_types.sink_info sink
-    cdef vector[unique_ptr[cudf_io_data_sink.data_sink]] _data_sink
-    cdef cudf_io_types.statistics_freq stat_freq
-    cdef cudf_io_types.compression_type comp_type
+    cdef sink_info sink
+    cdef vector[unique_ptr[data_sink]] _data_sink
+    cdef str statistics
+    cdef object compression
     cdef object index
     cdef size_t row_group_size_bytes
     cdef size_type row_group_size_rows
     cdef size_t max_page_size_bytes
     cdef size_type max_page_size_rows
     cdef size_t max_dictionary_size
-    cdef cudf_io_types.dictionary_policy dict_policy
+    cdef bool use_dictionary
     cdef bool write_arrow_schema
 
     def __cinit__(self, object filepath_or_buffer, object index=None,
@@ -674,8 +679,8 @@ cdef class ParquetWriter:
             else [filepath_or_buffer]
         )
         self.sink = make_sinks_info(filepaths_or_buffers, self._data_sink)
-        self.stat_freq = _get_stat_freq(statistics)
-        self.comp_type = _get_comp_type(compression)
+        self.statistics = statistics
+        self.compression = compression
         self.index = index
         self.initialized = False
         self.row_group_size_bytes = row_group_size_bytes
@@ -683,11 +688,7 @@ cdef class ParquetWriter:
         self.max_page_size_bytes = max_page_size_bytes
         self.max_page_size_rows = max_page_size_rows
         self.max_dictionary_size = max_dictionary_size
-        self.dict_policy = (
-            cudf_io_types.dictionary_policy.ADAPTIVE
-            if use_dictionary
-            else cudf_io_types.dictionary_policy.NEVER
-        )
+        self.use_dictionary = use_dictionary
         self.write_arrow_schema = store_schema
 
     def write_table(self, table, object partitions_info=None):
@@ -706,11 +707,11 @@ cdef class ParquetWriter:
         else:
             tv = table_view_from_table(table, ignore_index=True)
 
-        cdef vector[cudf_io_types.partition_info] partitions
+        cdef vector[partition_info] partitions
         if partitions_info is not None:
             for part in partitions_info:
                 partitions.push_back(
                    partition_info(part[0], part[1])
                 )
 
         with nogil:
@@ -795,13 +796,20 @@ cdef class ParquetWriter:
             user_data = vector[map[string, string]](num_partitions, tmp_user_data)
 
         cdef chunked_parquet_writer_options args
+        cdef compression_type comp_type = _get_comp_type(self.compression)
+        cdef statistics_freq stat_freq = _get_stat_freq(self.statistics)
+        cdef dictionary_policy dict_policy = (
+            plc.io.types.DictionaryPolicy.ADAPTIVE
+            if self.use_dictionary
+            else plc.io.types.DictionaryPolicy.NEVER
+        )
         with nogil:
             args = move(
                 chunked_parquet_writer_options.builder(self.sink)
                 .metadata(self.tbl_meta)
                 .key_value_metadata(move(user_data))
-                .compression(self.comp_type)
-                .stats_level(self.stat_freq)
+                .compression(comp_type)
+                .stats_level(stat_freq)
                 .row_group_size_bytes(self.row_group_size_bytes)
                 .row_group_size_rows(self.row_group_size_rows)
                 .max_page_size_bytes(self.max_page_size_bytes)
@@ -810,7 +818,7 @@ cdef class ParquetWriter:
                 .write_arrow_schema(self.write_arrow_schema)
                 .build()
             )
-        args.set_dictionary_policy(self.dict_policy)
+        args.set_dictionary_policy(dict_policy)
         self.writer.reset(new cpp_parquet_chunked_writer(args))
         self.initialized = True
@@ -838,56 +846,28 @@ cpdef merge_filemetadata(object filemetadata_list):
     return np.asarray(out_metadata_py)
 
 
-cdef cudf_io_types.statistics_freq _get_stat_freq(object statistics):
-    statistics = str(statistics).upper()
-    if statistics == "NONE":
-        return cudf_io_types.statistics_freq.STATISTICS_NONE
-    elif statistics == "ROWGROUP":
-        return cudf_io_types.statistics_freq.STATISTICS_ROWGROUP
-    elif statistics == "PAGE":
-        return cudf_io_types.statistics_freq.STATISTICS_PAGE
-    elif statistics == "COLUMN":
-        return cudf_io_types.statistics_freq.STATISTICS_COLUMN
-    else:
+cdef statistics_freq _get_stat_freq(str statistics):
+    result = getattr(
+        plc.io.types.StatisticsFreq,
+        f"STATISTICS_{statistics.upper()}",
+        None
+    )
+    if result is None:
         raise ValueError("Unsupported `statistics_freq` type")
+    return result
 
 
-cdef cudf_io_types.compression_type _get_comp_type(object compression):
+cdef compression_type _get_comp_type(object compression):
     if compression is None:
-        return cudf_io_types.compression_type.NONE
-
-    compression = str(compression).upper()
-    if compression == "SNAPPY":
-        return cudf_io_types.compression_type.SNAPPY
-    elif compression == "ZSTD":
-        return cudf_io_types.compression_type.ZSTD
-    elif compression == "LZ4":
-        return cudf_io_types.compression_type.LZ4
-    else:
+        return plc.io.types.CompressionType.NONE
+    result = getattr(
+        plc.io.types.CompressionType,
+        str(compression).upper(),
+        None
+    )
+    if result is None:
         raise ValueError("Unsupported `compression` type")
-
-
-cdef cudf_io_types.column_encoding _get_encoding_type(object encoding):
-    if encoding is None:
-        return cudf_io_types.column_encoding.USE_DEFAULT
-
-    enc = str(encoding).upper()
-    if enc == "PLAIN":
-        return cudf_io_types.column_encoding.PLAIN
-    elif enc == "DICTIONARY":
-        return cudf_io_types.column_encoding.DICTIONARY
-    elif enc == "DELTA_BINARY_PACKED":
-        return cudf_io_types.column_encoding.DELTA_BINARY_PACKED
-    elif enc == "DELTA_LENGTH_BYTE_ARRAY":
-        return cudf_io_types.column_encoding.DELTA_LENGTH_BYTE_ARRAY
-    elif enc == "DELTA_BYTE_ARRAY":
-        return cudf_io_types.column_encoding.DELTA_BYTE_ARRAY
-    elif enc == "BYTE_STREAM_SPLIT":
-        return cudf_io_types.column_encoding.BYTE_STREAM_SPLIT
-    elif enc == "USE_DEFAULT":
-        return cudf_io_types.column_encoding.USE_DEFAULT
-    else:
-        raise ValueError("Unsupported `column_encoding` type")
+    return result
 
 
 cdef _set_col_metadata(
@@ -914,7 +894,15 @@ cdef _set_col_metadata(
             col_meta.set_skip_compression(True)
 
     if column_encoding is not None and full_path in column_encoding:
-        col_meta.set_encoding(_get_encoding_type(column_encoding[full_path]))
+        encoding = column_encoding[full_path]
+        if encoding is None:
+            c_encoding = plc.io.types.ColumnEncoding.USE_DEFAULT
+        else:
+            enc = str(encoding).upper()
+            c_encoding = getattr(plc.io.types.ColumnEncoding, enc, None)
+            if c_encoding is None:
+                raise ValueError("Unsupported `column_encoding` type")
+        col_meta.set_encoding(c_encoding)
 
     if column_type_length is not None and full_path in column_type_length:
         col_meta.set_output_as_binary(True)
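Note: the getattr pattern above replaces the if/elif ladders of _get_stat_freq,
_get_comp_type, and _get_encoding_type. Restated as plain Python, assuming
pylibcudf is importable as plc (the standalone stat_freq name is hypothetical):

    import pylibcudf as plc

    def stat_freq(statistics):
        # Build the member name ("page" -> "STATISTICS_PAGE") and let
        # getattr return the None default for anything the enum does
        # not define, instead of spelling out every branch.
        result = getattr(
            plc.io.types.StatisticsFreq,
            f"STATISTICS_{str(statistics).upper()}",
            None,
        )
        if result is None:
            raise ValueError("Unsupported `statistics_freq` type")
        return result

    stat_freq("page")   # -> StatisticsFreq.STATISTICS_PAGE
    stat_freq("bogus")  # -> raises ValueError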
diff --git a/python/pylibcudf/pylibcudf/io/types.pyx b/python/pylibcudf/pylibcudf/io/types.pyx
index 167ec2a056d..967d05e7057 100644
--- a/python/pylibcudf/pylibcudf/io/types.pyx
+++ b/python/pylibcudf/pylibcudf/io/types.pyx
@@ -25,7 +25,9 @@ from pylibcudf.libcudf.io.json import \
     json_recovery_mode_t as JSONRecoveryMode  # no-cython-lint
 from pylibcudf.libcudf.io.types import (
     compression_type as CompressionType,  # no-cython-lint
+    column_encoding as ColumnEncoding,  # no-cython-lint
     dictionary_policy as DictionaryPolicy,  # no-cython-lint
+    statistics_freq as StatisticsFreq,  # no-cython-lint
 )
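Note: after this series, all of the io type enums the cudf Cython layer needs
are reachable from the public pylibcudf namespace, so the cimports of the
libcudf enum types can be dropped. A quick smoke check, assuming a built
pylibcudf whose exported enums behave like standard Python enums:

    import pylibcudf as plc

    # CompressionType and JSONRecoveryMode predate this series;
    # DictionaryPolicy, ColumnEncoding, and StatisticsFreq are the
    # names exported by the patches above.
    for enum in (
        plc.io.types.CompressionType,
        plc.io.types.ColumnEncoding,
        plc.io.types.DictionaryPolicy,
        plc.io.types.StatisticsFreq,
        plc.io.types.JSONRecoveryMode,
    ):
        print(enum.__name__, list(enum)[:3])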