From a27feabf1a46fd3fcf388c7cbadb49831416f8ba Mon Sep 17 00:00:00 2001
From: Matthew Roeschke <10647082+mroeschke@users.noreply.github.com>
Date: Fri, 3 May 2024 01:31:58 -1000
Subject: [PATCH 1/6] Preserve column metadata during more DataFrame
 operations (#15519)

Supersedes https://github.com/rapidsai/cudf/pull/15410/, adding a
`ColumnAccessor._from_columns_like_self` that preserves column attributes
during DataFrame operations. This can wholly replace `_from_data_like_self`.

Authors:
  - Matthew Roeschke (https://github.com/mroeschke)

Approvers:
  - Lawrence Mitchell (https://github.com/wence-)

URL: https://github.com/rapidsai/cudf/pull/15519
---
 python/cudf/cudf/core/column_accessor.py | 33 ++++++++++++++++++++++++
 python/cudf/cudf/core/dataframe.py       | 16 +++++++-----
 python/cudf/cudf/core/frame.py           | 33 ++++++++++++++----------
 python/cudf/cudf/core/indexed_frame.py   | 16 +++++++-----
 python/cudf/cudf/core/multiindex.py      |  6 +++--
 python/cudf/cudf/core/series.py          |  4 ++-
 python/cudf/cudf/tests/test_dataframe.py | 20 ++++++++++++++
 7 files changed, 99 insertions(+), 29 deletions(-)

diff --git a/python/cudf/cudf/core/column_accessor.py b/python/cudf/cudf/core/column_accessor.py
index 33085bede78..fbce6e02330 100644
--- a/python/cudf/cudf/core/column_accessor.py
+++ b/python/cudf/cudf/core/column_accessor.py
@@ -3,6 +3,7 @@
 from __future__ import annotations
 
 import itertools
+import sys
 from collections import abc
 from functools import cached_property, reduce
 from typing import (
@@ -174,6 +175,38 @@ def __repr__(self) -> str:
         )
         return f"{type_info}\n{column_info}"
 
+    def _from_columns_like_self(
+        self, columns: abc.Iterable[ColumnBase], verify: bool = True
+    ):
+        """
+        Return a new ColumnAccessor with columns and the properties of self.
+
+        Parameters
+        ----------
+        columns : iterable of Columns
+            New columns for the ColumnAccessor.
+        verify : bool, optional
+            Whether to verify column length and type.
+        """
+        if sys.version_info.major >= 3 and sys.version_info.minor >= 10:
+            data = zip(self.names, columns, strict=True)
+        else:
+            columns = list(columns)
+            if len(columns) != len(self.names):
+                raise ValueError(
+                    f"The number of columns ({len(columns)}) must match "
+                    f"the number of existing column labels ({len(self.names)})."
+                )
+            data = zip(self.names, columns)
+        return type(self)(
+            data=dict(data),
+            multiindex=self.multiindex,
+            level_names=self.level_names,
+            rangeindex=self.rangeindex,
+            label_dtype=self.label_dtype,
+            verify=verify,
+        )
+
     @property
     def level_names(self) -> Tuple[Any, ...]:
         if self._level_names is None or len(self._level_names) == 0:
diff --git a/python/cudf/cudf/core/dataframe.py b/python/cudf/cudf/core/dataframe.py
index 1e6ae861679..bf8201e4dc1 100644
--- a/python/cudf/cudf/core/dataframe.py
+++ b/python/cudf/cudf/core/dataframe.py
@@ -3036,8 +3036,11 @@ def where(self, cond, other=None, inplace=False):
         # First process the condition.
         if isinstance(cond, Series):
-            cond = self._from_data_like_self(
-                {name: cond._column for name in self._column_names},
+            cond = self._from_data(
+                self._data._from_columns_like_self(
+                    itertools.repeat(cond._column, len(self._column_names)),
+                    verify=False,
+                )
             )
         elif hasattr(cond, "__cuda_array_interface__"):
             cond = DataFrame(
@@ -3078,7 +3081,7 @@ def where(self, cond, other=None, inplace=False):
                     should be equal to number of columns of self"""
                 )
-            out = {}
+            out = []
             for (name, col), other_col in zip(self._data.items(), other_cols):
                 col, other_col = _check_and_cast_columns_with_other(
                     source_col=col,
@@ -3091,16 +3094,17 @@ def where(self, cond, other=None, inplace=False):
                         col, other_col, cond_col
                     )
 
-                    out[name] = _make_categorical_like(result, self._data[name])
+                    out.append(_make_categorical_like(result, self._data[name]))
                 else:
                     out_mask = cudf._lib.null_mask.create_null_mask(
                         len(col),
                         state=cudf._lib.null_mask.MaskState.ALL_NULL,
                     )
-                    out[name] = col.set_mask(out_mask)
+                    out.append(col.set_mask(out_mask))
 
         return self._mimic_inplace(
-            self._from_data_like_self(out), inplace=inplace
+            self._from_data_like_self(self._data._from_columns_like_self(out)),
+            inplace=inplace,
         )
 
     @docutils.doc_apply(
diff --git a/python/cudf/cudf/core/frame.py b/python/cudf/cudf/core/frame.py
index cd42bf52ea1..017190ab5b4 100644
--- a/python/cudf/cudf/core/frame.py
+++ b/python/cudf/cudf/core/frame.py
@@ -1120,7 +1120,9 @@ def isna(self):
         array([False, False,  True,  True, False, False])
         """
         data_columns = (col.isnull() for col in self._columns)
-        return self._from_data_like_self(zip(self._column_names, data_columns))
+        return self._from_data_like_self(
+            self._data._from_columns_like_self(data_columns)
+        )
 
     # Alias for isna
     isnull = isna
@@ -1199,7 +1201,9 @@ def notna(self):
         array([ True,  True, False, False,  True,  True])
         """
         data_columns = (col.notnull() for col in self._columns)
-        return self._from_data_like_self(zip(self._column_names, data_columns))
+        return self._from_data_like_self(
+            self._data._from_columns_like_self(data_columns)
+        )
 
     # Alias for notna
     notnull = notna
@@ -1506,7 +1510,9 @@ def _encode(self):
     @_cudf_nvtx_annotate
     def _unaryop(self, op):
         data_columns = (col.unary_operator(op) for col in self._columns)
-        return self._from_data_like_self(zip(self._column_names, data_columns))
+        return self._from_data_like_self(
+            self._data._from_columns_like_self(data_columns)
+        )
 
     @classmethod
     @_cudf_nvtx_annotate
@@ -1638,12 +1644,14 @@ def _apply_cupy_ufunc_to_operands(
     def __neg__(self):
         """Negate for integral dtypes, logical NOT for bools."""
         return self._from_data_like_self(
-            {
-                name: col.unary_operator("not")
-                if is_bool_dtype(col.dtype)
-                else -1 * col
-                for name, col in self._data.items()
-            }
+            self._data._from_columns_like_self(
+                (
+                    col.unary_operator("not")
+                    if col.dtype.kind == "b"
+                    else -1 * col
+                    for col in self._data.columns
+                )
+            )
         )
 
     @_cudf_nvtx_annotate
@@ -1897,10 +1905,9 @@ def __copy__(self):
     def __invert__(self):
         """Bitwise invert (~) for integral dtypes, logical NOT for bools."""
         return self._from_data_like_self(
-            {
-                name: _apply_inverse_column(col)
-                for name, col in self._data.items()
-            }
+            self._data._from_columns_like_self(
+                (_apply_inverse_column(col) for col in self._data.columns)
+            )
         )
 
     @_cudf_nvtx_annotate
diff --git a/python/cudf/cudf/core/indexed_frame.py b/python/cudf/cudf/core/indexed_frame.py
index bec97bd3290..62ee780ebbb 100644
--- a/python/cudf/cudf/core/indexed_frame.py
+++ b/python/cudf/cudf/core/indexed_frame.py
@@ -1903,13 +1903,15 @@ def nans_to_nulls(self):
        1    3.14
        2    <NA>
        """
-        result_data = {}
-        for name, col in self._data.items():
-            try:
-                result_data[name] = col.nans_to_nulls()
-            except AttributeError:
-                result_data[name] = col.copy()
-        return self._from_data_like_self(result_data)
+        result = (
+            col.nans_to_nulls()
+            if isinstance(col, cudf.core.column.NumericalColumn)
+            else col.copy()
+            for col in self._data.columns
+        )
+        return self._from_data_like_self(
+            self._data._from_columns_like_self(result)
+        )
 
     def _copy_type_metadata(
         self,
diff --git a/python/cudf/cudf/core/multiindex.py b/python/cudf/cudf/core/multiindex.py
index 019daacddba..1ab42df111f 100644
--- a/python/cudf/cudf/core/multiindex.py
+++ b/python/cudf/cudf/core/multiindex.py
@@ -2088,6 +2088,8 @@ def _split_columns_by_levels(self, levels):
         return data_columns, index_columns, data_names, index_names
 
     def repeat(self, repeats, axis=None):
-        return self._from_columns_like_self(
-            Frame._repeat([*self._columns], repeats, axis), self._column_names
+        return self._from_data(
+            self._data._from_columns_like_self(
+                super()._repeat([*self._columns], repeats, axis)
+            )
         )
diff --git a/python/cudf/cudf/core/series.py b/python/cudf/cudf/core/series.py
index 275dc664175..b6ed28f9093 100644
--- a/python/cudf/cudf/core/series.py
+++ b/python/cudf/cudf/core/series.py
@@ -3654,7 +3654,9 @@ def pct_change(
     def where(self, cond, other=None, inplace=False):
         result_col = super().where(cond, other, inplace)
         return self._mimic_inplace(
-            self._from_data_like_self({self.name: result_col}),
+            self._from_data_like_self(
+                self._data._from_columns_like_self([result_col])
+            ),
             inplace=inplace,
         )
 
diff --git a/python/cudf/cudf/tests/test_dataframe.py b/python/cudf/cudf/tests/test_dataframe.py
index e287603de07..f52076407b5 100644
--- a/python/cudf/cudf/tests/test_dataframe.py
+++ b/python/cudf/cudf/tests/test_dataframe.py
@@ -10986,3 +10986,23 @@ def test_squeeze(axis, data):
     result = df.squeeze(axis=axis)
     expected = df.to_pandas().squeeze(axis=axis)
     assert_eq(result, expected)
+
+
+@pytest.mark.parametrize("column", [range(1), np.array([1], dtype=np.int8)])
+@pytest.mark.parametrize(
+    "operation",
+    [
+        lambda df: df.where(df < 2, 2),
+        lambda df: df.nans_to_nulls(),
+        lambda df: df.isna(),
+        lambda df: df.notna(),
+        lambda df: abs(df),
+        lambda df: -df,
+        lambda df: ~df,
+    ],
+)
+def test_op_preserves_column_metadata(column, operation):
+    df = cudf.DataFrame([1], columns=cudf.Index(column))
+    result = operation(df).columns
+    expected = pd.Index(column)
+    pd.testing.assert_index_equal(result, expected, exact=True)

From c60860dfb3ac78a8439966e0fa5c7282b9988b15 Mon Sep 17 00:00:00 2001
From: David Wendt <45795991+davidwendt@users.noreply.github.com>
Date: Fri, 3 May 2024 09:03:02 -0400
Subject: [PATCH 2/6] Fix make_offsets_child_column usage in
 cudf::strings::detail::shift (#15630)

Fixes the `cudf::strings::detail::shift()` function to use the correct
`make_offsets_child_column` function to support large strings.
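
As a quick illustration (not part of the patch itself), the user-visible
path the fix sits on can be exercised from the Python API; a minimal
sketch with made-up values:

```python
import cudf

# shift() on a strings column runs through cudf::strings::detail::shift,
# which now sizes its offsets child column with the large-strings-aware
# make_offsets_child_column.
s = cudf.Series(["a", "bb", None, "cccc"])
print(s.shift(1, fill_value="x").to_pandas())
```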
Authors:
  - David Wendt (https://github.com/davidwendt)

Approvers:
  - Muhammad Haseeb (https://github.com/mhaseeb123)
  - Bradley Dice (https://github.com/bdice)

URL: https://github.com/rapidsai/cudf/pull/15630
---
 cpp/src/strings/copying/shift.cu | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/cpp/src/strings/copying/shift.cu b/cpp/src/strings/copying/shift.cu
index 562ee6a7088..5bba4855390 100644
--- a/cpp/src/strings/copying/shift.cu
+++ b/cpp/src/strings/copying/shift.cu
@@ -19,8 +19,8 @@
 #include
 #include
 #include
-#include
 #include
+#include
 #include
 #include
@@ -104,8 +104,8 @@ std::unique_ptr shift(strings_column_view const& input,
   auto const d_input = column_device_view::create(input.parent(), stream);
   auto sizes_itr     = cudf::detail::make_counting_transform_iterator(
     0, output_sizes_fn{*d_input, d_fill_str, offset});
-  auto [offsets_column, total_bytes] =
-    cudf::detail::make_offsets_child_column(sizes_itr, sizes_itr + input.size(), stream, mr);
+  auto [offsets_column, total_bytes] = cudf::strings::detail::make_offsets_child_column(
+    sizes_itr, sizes_itr + input.size(), stream, mr);
   auto offsets_view = offsets_column->view();
 
   // compute the shift-offset for the output characters child column

From 18f2e7a84a03342bf6305f63ae1f8164ffbccd99 Mon Sep 17 00:00:00 2001
From: David Wendt <45795991+davidwendt@users.noreply.github.com>
Date: Fri, 3 May 2024 09:03:59 -0400
Subject: [PATCH 3/6] Large strings support in MD5 and SHA hashers (#15631)

Updates the hash functions for md5 and sha to support creating large
string results.

Authors:
  - David Wendt (https://github.com/davidwendt)

Approvers:
  - Vukasin Milovanovic (https://github.com/vuule)
  - Bradley Dice (https://github.com/bdice)

URL: https://github.com/rapidsai/cudf/pull/15631
---
 cpp/src/hash/md5_hash.cu  |  4 ++--
 cpp/src/hash/sha_hash.cuh | 29 +++++++++++++++--------------
 2 files changed, 17 insertions(+), 16 deletions(-)

diff --git a/cpp/src/hash/md5_hash.cu b/cpp/src/hash/md5_hash.cu
index 8f490ada8ff..0b559e8e86c 100644
--- a/cpp/src/hash/md5_hash.cu
+++ b/cpp/src/hash/md5_hash.cu
@@ -309,7 +309,7 @@ std::unique_ptr md5(table_view const& input,
   // Result column allocation and creation
   auto begin = thrust::make_constant_iterator(digest_size);
   auto [offsets_column, bytes] =
-    cudf::detail::make_offsets_child_column(begin, begin + input.num_rows(), stream, mr);
+    cudf::strings::detail::make_offsets_child_column(begin, begin + input.num_rows(), stream, mr);
 
   rmm::device_uvector chars(bytes, stream, mr);
   auto d_chars = chars.data();
@@ -322,7 +322,7 @@ std::unique_ptr md5(table_view const& input,
     thrust::make_counting_iterator(0),
     thrust::make_counting_iterator(input.num_rows()),
     [d_chars, device_input = *device_input] __device__(auto row_index) {
-      MD5Hasher hasher(d_chars + (row_index * digest_size));
+      MD5Hasher hasher(d_chars + (static_cast(row_index) * digest_size));
       for (auto const& col : device_input) {
         if (col.is_valid(row_index)) {
           if (col.type().id() == type_id::LIST) {
diff --git a/cpp/src/hash/sha_hash.cuh b/cpp/src/hash/sha_hash.cuh
index 005578cb2c2..6976241057e 100644
--- a/cpp/src/hash/sha_hash.cuh
+++ b/cpp/src/hash/sha_hash.cuh
@@ -518,7 +518,7 @@ std::unique_ptr sha_hash(table_view const& input,
   // Result column allocation and creation
   auto begin = thrust::make_constant_iterator(Hasher::digest_size);
   auto [offsets_column, bytes] =
-    cudf::detail::make_offsets_child_column(begin, begin + input.num_rows(), stream, mr);
+    cudf::strings::detail::make_offsets_child_column(begin, begin + input.num_rows(), stream, mr);
 
   auto chars   = rmm::device_uvector(bytes, stream, mr);
   auto d_chars = chars.data();
 
@@ -526,19 +526,20 @@ std::unique_ptr sha_hash(table_view const& input,
   auto const device_input = table_device_view::create(input, stream);
 
   // Hash each row, hashing each element sequentially left to right
-  thrust::for_each(rmm::exec_policy(stream),
-                   thrust::make_counting_iterator(0),
-                   thrust::make_counting_iterator(input.num_rows()),
-                   [d_chars, device_input = *device_input] __device__(auto row_index) {
-                     Hasher hasher(d_chars + (row_index * Hasher::digest_size));
-                     for (auto const& col : device_input) {
-                       if (col.is_valid(row_index)) {
-                         cudf::type_dispatcher(
-                           col.type(), HasherDispatcher(&hasher, col), row_index);
-                       }
-                     }
-                     hasher.finalize();
-                   });
+  thrust::for_each(
+    rmm::exec_policy(stream),
+    thrust::make_counting_iterator(0),
+    thrust::make_counting_iterator(input.num_rows()),
+    [d_chars, device_input = *device_input] __device__(auto row_index) {
+      Hasher hasher(d_chars + (static_cast(row_index) * Hasher::digest_size));
+      for (auto const& col : device_input) {
+        if (col.is_valid(row_index)) {
+          cudf::type_dispatcher(
+            col.type(), HasherDispatcher(&hasher, col), row_index);
+        }
+      }
+      hasher.finalize();
+    });
 
   return make_strings_column(input.num_rows(), std::move(offsets_column), chars.release(), 0, {});
 }

From 35d77afab14d4d5a5faec321bdb2d87112c07eb2 Mon Sep 17 00:00:00 2001
From: David Wendt <45795991+davidwendt@users.noreply.github.com>
Date: Fri, 3 May 2024 09:11:27 -0400
Subject: [PATCH 4/6] Use experimental make_strings_children for strings
 convert (#15629)

Updates strings convert functions to use the new experimental
`make_strings_children`, which supports building large strings.

Reference https://github.com/rapidsai/cudf/issues/15579
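
For orientation, these kernels back the string casts reachable from the
Python API; a minimal sketch (the values are illustrative):

```python
import cudf

# Each astype("str") routes through one of the updated convert kernels
# (booleans, integers, floats, ...), now built on the experimental
# make_strings_children.
print(cudf.Series([True, False, None]).astype("str").to_pandas())
print(cudf.Series([1, -20, 300]).astype("str").to_pandas())
print(cudf.Series([1.5, -0.25]).astype("str").to_pandas())
```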
Authors:
  - David Wendt (https://github.com/davidwendt)

Approvers:
  - Paul Mattione (https://github.com/pmattione-nvidia)
  - Bradley Dice (https://github.com/bdice)

URL: https://github.com/rapidsai/cudf/pull/15629
---
 cpp/src/strings/convert/convert_booleans.cu  | 17 +++++++--------
 cpp/src/strings/convert/convert_datetime.cu  | 12 +++++------
 cpp/src/strings/convert/convert_durations.cu | 21 +++++++++----------
 .../strings/convert/convert_fixed_point.cu   | 16 +++++++-------
 cpp/src/strings/convert/convert_floats.cu    | 15 +++++++------
 cpp/src/strings/convert/convert_hex.cu       | 11 +++++-----
 cpp/src/strings/convert/convert_integers.cu  | 15 +++++++------
 cpp/src/strings/convert/convert_ipv4.cu      | 11 +++++-----
 8 files changed, 56 insertions(+), 62 deletions(-)

diff --git a/cpp/src/strings/convert/convert_booleans.cu b/cpp/src/strings/convert/convert_booleans.cu
index bf73800ad06..6b64006fa24 100644
--- a/cpp/src/strings/convert/convert_booleans.cu
+++ b/cpp/src/strings/convert/convert_booleans.cu
@@ -16,23 +16,19 @@
 #include
 #include
-#include
 #include
 #include
 #include
 #include
-#include
+#include
 #include
 #include
 #include
-#include
-#include
 #include
 #include
 #include
-#include
 #include
 #include
@@ -99,13 +95,14 @@ struct from_booleans_fn {
   column_device_view const d_column;
   string_view d_true;
   string_view d_false;
-  size_type* d_offsets{};
+  size_type* d_sizes{};
   char* d_chars{};
+  cudf::detail::input_offsetalator d_offsets;
 
   __device__ void operator()(size_type idx) const
   {
     if (d_column.is_null(idx)) {
-      if (d_chars == nullptr) { d_offsets[idx] = 0; }
+      if (d_chars == nullptr) { d_sizes[idx] = 0; }
       return;
     }
 
@@ -113,7 +110,7 @@ struct from_booleans_fn {
       auto const result = d_column.element(idx) ? d_true : d_false;
       memcpy(d_chars + d_offsets[idx], result.data(), result.size_bytes());
     } else {
-      d_offsets[idx] = d_column.element(idx) ? d_true.size_bytes() : d_false.size_bytes();
+      d_sizes[idx] = d_column.element(idx) ? d_true.size_bytes() : d_false.size_bytes();
     }
   };
 };
@@ -143,8 +140,8 @@ std::unique_ptr from_booleans(column_view const& booleans,
   // copy null mask
   rmm::device_buffer null_mask = cudf::detail::copy_bitmask(booleans, stream, mr);
-  auto [offsets, chars] =
-    make_strings_children(from_booleans_fn{d_column, d_true, d_false}, strings_count, stream, mr);
+  auto [offsets, chars] = experimental::make_strings_children(
+    from_booleans_fn{d_column, d_true, d_false}, strings_count, stream, mr);
 
   return make_strings_column(strings_count,
                              std::move(offsets),
diff --git a/cpp/src/strings/convert/convert_datetime.cu b/cpp/src/strings/convert/convert_datetime.cu
index d6449fbb6c8..ddf68eae951 100644
--- a/cpp/src/strings/convert/convert_datetime.cu
+++ b/cpp/src/strings/convert/convert_datetime.cu
@@ -22,7 +22,7 @@
 #include
 #include
 #include
-#include
+#include
 #include
 #include
 #include
@@ -37,7 +37,6 @@
 #include
 #include
-#include
 #include
 #include
 #include
@@ -756,8 +755,9 @@ struct datetime_formatter_fn {
   column_device_view const d_timestamps;
   column_device_view const d_format_names;
   device_span const d_format_items;
-  size_type* d_offsets{};
+  size_type* d_sizes{};
   char* d_chars{};
+  cudf::detail::input_offsetalator d_offsets;
 
   /**
    * @brief Specialized modulo expression that handles negative values.
@@ -1087,14 +1087,14 @@ struct datetime_formatter_fn {
   __device__ void operator()(size_type idx) const
   {
     if (d_timestamps.is_null(idx)) {
-      if (!d_chars) { d_offsets[idx] = 0; }
+      if (!d_chars) { d_sizes[idx] = 0; }
       return;
     }
 
     auto const tstamp = d_timestamps.element(idx);
     if (d_chars) {
       timestamp_to_string(tstamp, d_chars + d_offsets[idx]);
     } else {
-      d_offsets[idx] = compute_output_size(tstamp);
+      d_sizes[idx] = compute_output_size(tstamp);
     }
   }
 };
@@ -1109,7 +1109,7 @@ struct dispatch_from_timestamps_fn {
                              rmm::cuda_stream_view stream,
                              rmm::device_async_resource_ref mr) const
   {
-    return make_strings_children(
+    return experimental::make_strings_children(
       datetime_formatter_fn{d_timestamps, d_format_names, d_format_items},
       d_timestamps.size(),
       stream,
diff --git a/cpp/src/strings/convert/convert_durations.cu b/cpp/src/strings/convert/convert_durations.cu
index 77c750848cf..faf9a83f016 100644
--- a/cpp/src/strings/convert/convert_durations.cu
+++ b/cpp/src/strings/convert/convert_durations.cu
@@ -17,7 +17,7 @@
 #include
 #include
 #include
-#include
+#include
 #include
 #include
@@ -26,10 +26,8 @@
 #include
 #include
-#include
 #include
 #include
-#include
 #include
 #include
@@ -192,8 +190,9 @@ struct from_durations_fn {
   column_device_view d_durations;
   format_item const* d_format_items;
   size_type items_count;
-  size_type* d_offsets{};
+  size_type* d_sizes{};
   char* d_chars{};
+  cudf::detail::input_offsetalator d_offsets;
 
   __device__ int8_t format_length(char format_char,
                                   duration_component const* const timeparts) const
   {
@@ -378,14 +377,14 @@ struct from_durations_fn {
   __device__ void operator()(size_type idx)
   {
     if (d_durations.is_null(idx)) {
-      if (d_chars == nullptr) { d_offsets[idx] = 0; }
+      if (d_chars == nullptr) { d_sizes[idx] = 0; }
       return;
     }
 
     if (d_chars != nullptr) {
       set_chars(idx);
     } else {
-      d_offsets[idx] = string_size(d_durations.template element(idx));
+      d_sizes[idx] = string_size(d_durations.template element(idx));
     }
   }
 };
@@ -415,11 +414,11 @@ struct dispatch_from_durations_fn {
     // copy null mask
     rmm::device_buffer null_mask = cudf::detail::copy_bitmask(durations, stream, mr);
-    auto [offsets, chars] =
-      make_strings_children(from_durations_fn{d_column, d_format_items, compiler.items_count()},
-                            strings_count,
-                            stream,
-                            mr);
+    auto [offsets, chars] = experimental::make_strings_children(
+      from_durations_fn{d_column, d_format_items, compiler.items_count()},
+      strings_count,
+      stream,
+      mr);
 
     return make_strings_column(strings_count,
                                std::move(offsets),
diff --git a/cpp/src/strings/convert/convert_fixed_point.cu b/cpp/src/strings/convert/convert_fixed_point.cu
index 446baa8dea9..34f81b8b407 100644
--- a/cpp/src/strings/convert/convert_fixed_point.cu
+++ b/cpp/src/strings/convert/convert_fixed_point.cu
@@ -23,7 +23,7 @@
 #include
 #include
 #include
-#include
+#include
 #include
 #include
 #include
@@ -37,10 +37,7 @@
 #include
 #include
 #include
-#include
-#include
 #include
-#include
 #include
@@ -198,8 +195,9 @@ namespace {
 template
 struct from_fixed_point_fn {
   column_device_view d_decimals;
-  size_type* d_offsets{};
+  size_type* d_sizes{};
   char* d_chars{};
+  cudf::detail::input_offsetalator d_offsets;
 
   /**
    * @brief Converts a decimal element into a string.
@@ -219,13 +217,13 @@ struct from_fixed_point_fn {
   __device__ void operator()(size_type idx)
   {
     if (d_decimals.is_null(idx)) {
-      if (d_chars == nullptr) { d_offsets[idx] = 0; }
+      if (d_chars == nullptr) { d_sizes[idx] = 0; }
       return;
     }
     if (d_chars != nullptr) {
       fixed_point_element_to_string(idx);
     } else {
-      d_offsets[idx] =
+      d_sizes[idx] =
         fixed_point_string_size(d_decimals.element(idx), d_decimals.type().scale());
     }
   }
@@ -244,8 +242,8 @@ struct dispatch_from_fixed_point_fn {
     auto const d_column = column_device_view::create(input, stream);
 
-    auto [offsets, chars] =
-      make_strings_children(from_fixed_point_fn{*d_column}, input.size(), stream, mr);
+    auto [offsets, chars] = experimental::make_strings_children(
+      from_fixed_point_fn{*d_column}, input.size(), stream, mr);
 
     return make_strings_column(input.size(),
                                std::move(offsets),
diff --git a/cpp/src/strings/convert/convert_floats.cu b/cpp/src/strings/convert/convert_floats.cu
index c6061f7d8e6..0ed80b976fd 100644
--- a/cpp/src/strings/convert/convert_floats.cu
+++ b/cpp/src/strings/convert/convert_floats.cu
@@ -21,7 +21,7 @@
 #include
 #include
 #include
-#include
+#include
 #include
 #include
 #include
@@ -32,9 +32,7 @@
 #include
 #include
-#include
 #include
-#include
 #include
 #include
@@ -356,8 +354,9 @@ struct ftos_converter {
 template
 struct from_floats_fn {
   column_device_view d_floats;
-  size_type* d_offsets;
+  size_type* d_sizes;
   char* d_chars;
+  cudf::detail::input_offsetalator d_offsets;
 
   __device__ size_type compute_output_size(FloatType value)
   {
@@ -375,13 +374,13 @@ struct from_floats_fn {
   __device__ void operator()(size_type idx)
   {
     if (d_floats.is_null(idx)) {
-      if (d_chars == nullptr) { d_offsets[idx] = 0; }
+      if (d_chars == nullptr) { d_sizes[idx] = 0; }
       return;
     }
     if (d_chars != nullptr) {
       float_to_string(idx);
     } else {
-      d_offsets[idx] = compute_output_size(d_floats.element(idx));
+      d_sizes[idx] = compute_output_size(d_floats.element(idx));
     }
   }
 };
@@ -404,8 +403,8 @@ struct dispatch_from_floats_fn {
     // copy the null mask
     rmm::device_buffer null_mask = cudf::detail::copy_bitmask(floats, stream, mr);
-    auto [offsets, chars] =
-      make_strings_children(from_floats_fn{d_column}, strings_count, stream, mr);
+    auto [offsets, chars] = experimental::make_strings_children(
+      from_floats_fn{d_column}, strings_count, stream, mr);
 
     return make_strings_column(strings_count,
                                std::move(offsets),
diff --git a/cpp/src/strings/convert/convert_hex.cu b/cpp/src/strings/convert/convert_hex.cu
index 95af378fc3f..1f9fc3858f8 100644
--- a/cpp/src/strings/convert/convert_hex.cu
+++ b/cpp/src/strings/convert/convert_hex.cu
@@ -19,7 +19,7 @@
 #include
 #include
 #include
-#include
+#include
 #include
 #include
 #include
@@ -123,8 +123,9 @@ struct dispatch_hex_to_integers_fn {
 template
 struct integer_to_hex_fn {
   column_device_view const d_column;
-  size_type* d_offsets{};
+  size_type* d_sizes{};
   char* d_chars{};
+  cudf::detail::input_offsetalator d_offsets;
 
   __device__ void byte_to_hex(uint8_t byte, char* hex)
   {
@@ -141,7 +142,7 @@ struct integer_to_hex_fn {
   __device__ void operator()(size_type idx)
   {
     if (d_column.is_null(idx)) {
-      if (!d_chars) { d_offsets[idx] = 0; }
+      if (!d_chars) { d_sizes[idx] = 0; }
       return;
     }
 
@@ -167,7 +168,7 @@ struct integer_to_hex_fn {
         --byte_index;
       }
     } else {
-      d_offsets[idx] = static_cast(bytes) * 2;  // 2 hex characters per byte
+      d_sizes[idx] = static_cast(bytes) * 2;  // 2 hex characters per byte
     }
   }
 };
@@ -181,7 +182,7 @@ struct dispatch_integers_to_hex_fn {
   {
     auto const d_column = column_device_view::create(input, stream);
 
-    auto [offsets_column, chars] = cudf::strings::detail::make_strings_children(
+    auto [offsets_column, chars] = experimental::make_strings_children(
       integer_to_hex_fn{*d_column}, input.size(), stream, mr);
 
     return make_strings_column(input.size(),
diff --git a/cpp/src/strings/convert/convert_integers.cu b/cpp/src/strings/convert/convert_integers.cu
index f3e639817a6..918369ead4d 100644
--- a/cpp/src/strings/convert/convert_integers.cu
+++ b/cpp/src/strings/convert/convert_integers.cu
@@ -23,7 +23,7 @@
 #include
 #include
 #include
-#include
+#include
 #include
 #include
 #include
@@ -34,9 +34,7 @@
 #include
 #include
-#include
 #include
-#include
 #include
 #include
 #include
@@ -314,8 +312,9 @@ namespace {
 template
 struct from_integers_fn {
   column_device_view d_integers;
-  size_type* d_offsets;
+  size_type* d_sizes;
   char* d_chars;
+  cudf::detail::input_offsetalator d_offsets;
 
   /**
    * @brief Converts an integer element into a string.
@@ -334,13 +333,13 @@ struct from_integers_fn {
   __device__ void operator()(size_type idx)
   {
     if (d_integers.is_null(idx)) {
-      if (d_chars == nullptr) { d_offsets[idx] = 0; }
+      if (d_chars == nullptr) { d_sizes[idx] = 0; }
       return;
     }
     if (d_chars != nullptr) {
       integer_element_to_string(idx);
     } else {
-      d_offsets[idx] = count_digits(d_integers.element(idx));
+      d_sizes[idx] = count_digits(d_integers.element(idx));
     }
   }
 };
@@ -363,8 +362,8 @@ struct dispatch_from_integers_fn {
     // copy the null mask
     rmm::device_buffer null_mask = cudf::detail::copy_bitmask(integers, stream, mr);
-    auto [offsets, chars] =
-      make_strings_children(from_integers_fn{d_column}, strings_count, stream, mr);
+    auto [offsets, chars] = experimental::make_strings_children(
+      from_integers_fn{d_column}, strings_count, stream, mr);
 
     return make_strings_column(strings_count,
                                std::move(offsets),
diff --git a/cpp/src/strings/convert/convert_ipv4.cu b/cpp/src/strings/convert/convert_ipv4.cu
index 3d259f0ab82..33f6c553001 100644
--- a/cpp/src/strings/convert/convert_ipv4.cu
+++ b/cpp/src/strings/convert/convert_ipv4.cu
@@ -20,7 +20,7 @@
 #include
 #include
 #include
-#include
+#include
 #include
 #include
 #include
@@ -124,13 +124,14 @@ namespace {
  */
 struct integers_to_ipv4_fn {
   column_device_view const d_column;
-  size_type* d_offsets{};
+  size_type* d_sizes{};
   char* d_chars{};
+  cudf::detail::input_offsetalator d_offsets;
 
   __device__ void operator()(size_type idx)
   {
     if (d_column.is_null(idx)) {
-      if (!d_chars) d_offsets[idx] = 0;
+      if (!d_chars) { d_sizes[idx] = 0; }
       return;
     }
 
@@ -151,7 +152,7 @@ struct integers_to_ipv4_fn {
       shift_bits -= 8;
     }
 
-    if (!d_chars) { d_offsets[idx] = bytes; }
+    if (!d_chars) { d_sizes[idx] = bytes; }
   }
 };
@@ -167,7 +168,7 @@ std::unique_ptr integers_to_ipv4(column_view const& integers,
   CUDF_EXPECTS(integers.type().id() == type_id::INT64, "Input column must be type_id::INT64 type");
 
   auto d_column = column_device_view::create(integers, stream);
-  auto [offsets_column, chars] = cudf::strings::detail::make_strings_children(
+  auto [offsets_column, chars] = experimental::make_strings_children(
     integers_to_ipv4_fn{*d_column}, integers.size(), stream, mr);
 
   return make_strings_column(integers.size(),

From b8503bc000f19b983b19292b16f0048254f2b3a9 Mon Sep 17 00:00:00 2001
From: Ed Seidl
Date: Fri, 3 May 2024 07:44:08 -0700
Subject: [PATCH 5/6] Add support for large string columns to Parquet reader
 and writer (#15632)

Part of #13733. Adds support for reading and writing cuDF string columns
where the string data exceeds 2GB.
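
To make the intent concrete before the implementation notes, a rough
sketch of the round trip from Python; the environment variable mirrors
the C++ test added below, and the file name and data are made up:

```python
import os

# Lower the large-strings threshold before cudf loads so that even a
# small column exercises the 64-bit offsets path (the C++ test in this
# patch does the same via setenv).
os.environ["LIBCUDF_LARGE_STRINGS_THRESHOLD"] = "8"

import cudf

df = cudf.DataFrame({"s": ["quick", "brown", "fox"] * 4})
df.to_parquet("large_strings.parquet")
roundtrip = cudf.read_parquet("large_strings.parquet")
assert roundtrip["s"].to_arrow().equals(df["s"].to_arrow())
```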
This is accomplished by skipping the final offsets calculation in the
string decoding kernel when the 2GB threshold is exceeded, and instead
using `cudf::strings::detail::make_offsets_child_column()`. This could
lead to increased overhead with many columns (see #13024), so this will
need some more benchmarking. But if there are many columns that exceed
the 2GB limit, it's likely reads will have to be chunked to stay within
the memory budget.

Authors:
  - Ed Seidl (https://github.com/etseidl)
  - Vukasin Milovanovic (https://github.com/vuule)

Approvers:
  - Bradley Dice (https://github.com/bdice)
  - Muhammad Haseeb (https://github.com/mhaseeb123)
  - David Wendt (https://github.com/davidwendt)
  - Vukasin Milovanovic (https://github.com/vuule)

URL: https://github.com/rapidsai/cudf/pull/15632
---
 cpp/CMakeLists.txt                            |  1 +
 cpp/src/io/parquet/page_delta_decode.cu       | 34 +++++----
 cpp/src/io/parquet/page_string_decode.cu      | 33 +++++----
 cpp/src/io/parquet/parquet_gpu.hpp            |  8 +-
 cpp/src/io/parquet/reader_impl.cpp            | 29 ++++++--
 cpp/src/io/parquet/reader_impl_preprocess.cu  |  8 +-
 cpp/src/io/parquet/writer_impl.cu             |  6 +-
 cpp/src/io/utilities/column_buffer.cpp        | 10 ---
 cpp/src/io/utilities/column_buffer_strings.cu | 53 ++++++++++++++
 cpp/tests/CMakeLists.txt                      |  2 +-
 cpp/tests/large_strings/parquet_tests.cpp     | 73 +++++++++++++++++++
 11 files changed, 200 insertions(+), 57 deletions(-)
 create mode 100644 cpp/src/io/utilities/column_buffer_strings.cu
 create mode 100644 cpp/tests/large_strings/parquet_tests.cpp

diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index 232a4f40d8e..f11f3fc3c9a 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -430,6 +430,7 @@ add_library(
   src/io/text/multibyte_split.cu
   src/io/utilities/arrow_io_source.cpp
   src/io/utilities/column_buffer.cpp
+  src/io/utilities/column_buffer_strings.cu
   src/io/utilities/config_utils.cpp
   src/io/utilities/data_casting.cu
   src/io/utilities/data_sink.cpp
diff --git a/cpp/src/io/parquet/page_delta_decode.cu b/cpp/src/io/parquet/page_delta_decode.cu
index da1bbaebd73..0c9d4e77f0c 100644
--- a/cpp/src/io/parquet/page_delta_decode.cu
+++ b/cpp/src/io/parquet/page_delta_decode.cu
@@ -579,15 +579,18 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size)
     __syncthreads();
   }
 
-  // now turn array of lengths into offsets
-  int value_count = nesting_info_base[leaf_level_index].value_count;
+  // Now turn the array of lengths into offsets, but skip if this is a large string column. In the
+  // latter case, offsets will be computed during string column creation.
+  if (not s->col.is_large_string_col) {
+    int value_count = nesting_info_base[leaf_level_index].value_count;
 
-  // if no repetition we haven't calculated start/end bounds and instead just skipped
-  // values until we reach first_row. account for that here.
+    if (!has_repetition) { value_count -= s->first_row; }
 
-  auto const offptr = reinterpret_cast(nesting_info_base[leaf_level_index].data_out);
-  block_excl_sum(offptr, value_count, s->page.str_offset);
+    auto const offptr = reinterpret_cast(nesting_info_base[leaf_level_index].data_out);
+    block_excl_sum(offptr, value_count, s->page.str_offset);
+  }
 
   if (t == 0 and s->error != 0) { set_error(s->error, error_code); }
 }
@@ -738,15 +741,18 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size)
     __syncthreads();
   }
 
-  // now turn array of lengths into offsets
-  int value_count = nesting_info_base[leaf_level_index].value_count;
+  // Now turn the array of lengths into offsets, but skip if this is a large string column. In the
+  // latter case, offsets will be computed during string column creation.
+  if (not s->col.is_large_string_col) {
+    int value_count = nesting_info_base[leaf_level_index].value_count;
 
-  // if no repetition we haven't calculated start/end bounds and instead just skipped
-  // values until we reach first_row. account for that here.
+    if (!has_repetition) { value_count -= s->first_row; }
 
-  auto const offptr = reinterpret_cast(nesting_info_base[leaf_level_index].data_out);
-  block_excl_sum(offptr, value_count, s->page.str_offset);
+    auto const offptr = reinterpret_cast(nesting_info_base[leaf_level_index].data_out);
+    block_excl_sum(offptr, value_count, s->page.str_offset);
+  }
 
   // finally, copy the string data into place
   auto const dst = nesting_info_base[leaf_level_index].string_out;
diff --git a/cpp/src/io/parquet/page_string_decode.cu b/cpp/src/io/parquet/page_string_decode.cu
index 5ba813f518f..cf1dc58b06a 100644
--- a/cpp/src/io/parquet/page_string_decode.cu
+++ b/cpp/src/io/parquet/page_string_decode.cu
@@ -955,7 +955,7 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size)
 {
   using cudf::detail::warp_size;
   __shared__ __align__(16) page_state_s state_g;
-  __shared__ __align__(4) size_type last_offset;
+  __shared__ size_t last_offset;
   __shared__ __align__(16) page_state_buffers_s state_buffers;
 
@@ -1054,9 +1054,9 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size)
                         ? gpuGetStringData(s, sb, src_pos + skipped_leaf_values + i)
                         : cuda::std::pair{nullptr, 0};
 
-    __shared__ cub::WarpScan::TempStorage temp_storage;
-    size_type offset, warp_total;
-    cub::WarpScan(temp_storage).ExclusiveSum(len, offset, warp_total);
+    __shared__ cub::WarpScan::TempStorage temp_storage;
+    size_t offset, warp_total;
+    cub::WarpScan(temp_storage).ExclusiveSum(len, offset, warp_total);
     offset += last_offset;
 
     // choose a character parallel string copy when the average string is longer than a warp
@@ -1075,10 +1075,10 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size)
       }
       __syncwarp();
     } else if (use_char_ll) {
-      __shared__ __align__(8) uint8_t const* pointers[warp_size];
-      __shared__ __align__(4) size_type offsets[warp_size];
-      __shared__ __align__(4) int dsts[warp_size];
-      __shared__ __align__(4) int lengths[warp_size];
+      __shared__ uint8_t const* pointers[warp_size];
+      __shared__ size_t offsets[warp_size];
+      __shared__ int dsts[warp_size];
+      __shared__ int lengths[warp_size];
 
       offsets[me]  = offset;
       pointers[me] = reinterpret_cast(ptr);
@@ -1119,15 +1119,18 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size)
     __syncthreads();
   }
 
-  // now turn array of lengths into offsets
-  int value_count = nesting_info_base[leaf_level_index].value_count;
+  // Now turn the array of lengths into offsets, but skip if this is a large string column. In the
+  // latter case, offsets will be computed during string column creation.
+  if (not s->col.is_large_string_col) {
+    int value_count = nesting_info_base[leaf_level_index].value_count;
 
-  // if no repetition we haven't calculated start/end bounds and instead just skipped
-  // values until we reach first_row. account for that here.
+    if (!has_repetition) { value_count -= s->first_row; }
 
-  auto const offptr = reinterpret_cast(nesting_info_base[leaf_level_index].data_out);
-  block_excl_sum(offptr, value_count, s->page.str_offset);
+    auto const offptr = reinterpret_cast(nesting_info_base[leaf_level_index].data_out);
+    block_excl_sum(offptr, value_count, s->page.str_offset);
+  }
 
   if (t == 0 and s->error != 0) { set_error(s->error, error_code); }
 }
diff --git a/cpp/src/io/parquet/parquet_gpu.hpp b/cpp/src/io/parquet/parquet_gpu.hpp
index c06fb63acda..3b18175dccd 100644
--- a/cpp/src/io/parquet/parquet_gpu.hpp
+++ b/cpp/src/io/parquet/parquet_gpu.hpp
@@ -326,8 +326,8 @@ struct PageInfo {
   int32_t skipped_leaf_values;
   // for string columns only, the size of all the chars in the string for
   // this page. only valid/computed during the base preprocess pass
+  size_t str_offset;   // offset into string data for this page
   int32_t str_bytes;
-  int32_t str_offset;  // offset into string data for this page
   bool has_page_index;  // true if str_bytes, num_valids, etc are derivable from page indexes
 
   // nesting information (input/output) for each page. this array contains
@@ -420,7 +420,8 @@ struct ColumnChunkDesc {
       src_col_schema(src_col_schema_),
       h_chunk_info(chunk_info_),
       list_bytes_per_row_est(list_bytes_per_row_est_),
-      is_strings_to_cat(strings_to_categorical_)
+      is_strings_to_cat(strings_to_categorical_),
+      is_large_string_col(false)
   {
   }
@@ -454,7 +455,8 @@ struct ColumnChunkDesc {
   float list_bytes_per_row_est{};  // for LIST columns, an estimate on number of bytes per row
 
-  bool is_strings_to_cat{};  // convert strings to hashes
+  bool is_strings_to_cat{};    // convert strings to hashes
+  bool is_large_string_col{};  // `true` if string data uses 64-bit offsets
 };
 
 /**
diff --git a/cpp/src/io/parquet/reader_impl.cpp b/cpp/src/io/parquet/reader_impl.cpp
index b7172f5ba67..0602b5ec007 100644
--- a/cpp/src/io/parquet/reader_impl.cpp
+++ b/cpp/src/io/parquet/reader_impl.cpp
@@ -22,6 +22,7 @@
 #include
 #include
 #include
+#include
 
 #include
 
@@ -99,11 +100,21 @@ void reader::impl::decode_page_data(bool uses_custom_row_bounds, size_t skip_row
     col_string_sizes = calculate_page_string_offsets();
 
     // check for overflow
-    if (std::any_of(col_string_sizes.cbegin(), col_string_sizes.cend(), [](std::size_t sz) {
-          return sz > std::numeric_limits::max();
-        })) {
+    auto const threshold = static_cast(strings::detail::get_offset64_threshold());
+    auto const has_large_strings = std::any_of(col_string_sizes.cbegin(),
+                                               col_string_sizes.cend(),
+                                               [=](std::size_t sz) { return sz > threshold; });
+    if (has_large_strings and not strings::detail::is_large_strings_enabled()) {
       CUDF_FAIL("String column exceeds the column size limit", std::overflow_error);
     }
+
+    // mark any chunks that are large string columns
+    if (has_large_strings) {
+      for (auto& chunk : pass.chunks) {
+        auto const idx = chunk.src_col_index;
+        if (col_string_sizes[idx] > threshold) { chunk.is_large_string_col = true; }
+      }
+    }
   }
 
   // In order to reduce the number of allocations of hostdevice_vector, we allocate a single vector
@@ -348,11 +359,13 @@ void reader::impl::decode_page_data(bool uses_custom_row_bounds, size_t skip_row
       } else if (out_buf.type.id() == type_id::STRING) {
         // need to cap off the string offsets column
         auto const sz = static_cast(col_string_sizes[idx]);
-        CUDF_CUDA_TRY(cudaMemcpyAsync(static_cast(out_buf.data()) + out_buf.size,
-                                      &sz,
-                                      sizeof(size_type),
-                                      cudaMemcpyDefault,
-                                      _stream.value()));
+        if (sz <= strings::detail::get_offset64_threshold()) {
+          CUDF_CUDA_TRY(cudaMemcpyAsync(static_cast(out_buf.data()) + out_buf.size,
+                                        &sz,
+                                        sizeof(size_type),
+                                        cudaMemcpyDefault,
+                                        _stream.value()));
+        }
       }
     }
   }
diff --git a/cpp/src/io/parquet/reader_impl_preprocess.cu b/cpp/src/io/parquet/reader_impl_preprocess.cu
index 4b7a64ac6ab..8c9b3c1a1e6 100644
--- a/cpp/src/io/parquet/reader_impl_preprocess.cu
+++ b/cpp/src/io/parquet/reader_impl_preprocess.cu
@@ -1169,10 +1169,10 @@ struct page_to_string_size {
 struct page_offset_output_iter {
   PageInfo* p;
 
-  using value_type        = size_type;
-  using difference_type   = size_type;
-  using pointer           = size_type*;
-  using reference         = size_type&;
+  using value_type        = size_t;
+  using difference_type   = size_t;
+  using pointer           = size_t*;
+  using reference         = size_t&;
   using iterator_category = thrust::output_device_iterator_tag;
 
   __host__ __device__ page_offset_output_iter operator+(int i) { return {p + i}; }
diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu
index 286c7b361a9..24aa630a05f 100644
--- a/cpp/src/io/parquet/writer_impl.cu
+++ b/cpp/src/io/parquet/writer_impl.cu
@@ -40,6 +40,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
@@ -278,8 +279,9 @@ size_t column_size(column_view const& column, rmm::cuda_stream_view stream)
     return size_of(column.type()) * column.size();
   } else if (column.type().id() == type_id::STRING) {
     auto const scol = strings_column_view(column);
-    return cudf::detail::get_value(scol.offsets(), column.size(), stream) -
-           cudf::detail::get_value(scol.offsets(), 0, stream);
+    return cudf::strings::detail::get_offset_value(
+             scol.offsets(), column.size() + column.offset(), stream) -
+           cudf::strings::detail::get_offset_value(scol.offsets(), column.offset(), stream);
   } else if (column.type().id() == type_id::STRUCT) {
     auto const scol = structs_column_view(column);
     size_t ret      = 0;
diff --git a/cpp/src/io/utilities/column_buffer.cpp b/cpp/src/io/utilities/column_buffer.cpp
index 5dc2291abdc..db84778edc6 100644
--- a/cpp/src/io/utilities/column_buffer.cpp
+++ b/cpp/src/io/utilities/column_buffer.cpp
@@ -69,16 +69,6 @@ void cudf::io::detail::inline_column_buffer::create_string_data(size_t num_bytes,
   _string_data = rmm::device_buffer(num_bytes, stream, _mr);
 }
 
-std::unique_ptr cudf::io::detail::inline_column_buffer::make_string_column_impl(
-  rmm::cuda_stream_view stream)
-{
-  // no need for copies, just transfer ownership of the data_buffers to the columns
-  auto offsets_col = std::make_unique(
-    data_type{type_to_id()}, size + 1, std::move(_data), rmm::device_buffer{}, 0);
-  return make_strings_column(
-    size, std::move(offsets_col), std::move(_string_data), null_count(), std::move(_null_mask));
-}
-
 namespace {
 
 /**
diff --git a/cpp/src/io/utilities/column_buffer_strings.cu b/cpp/src/io/utilities/column_buffer_strings.cu
new file mode 100644
index 00000000000..4bc303a34a5
--- /dev/null
+++ b/cpp/src/io/utilities/column_buffer_strings.cu
@@ -0,0 +1,53 @@
+/*
+ * Copyright (c) 2024, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "column_buffer.hpp"
+
+#include
+#include
+#include
+
+namespace cudf::io::detail {
+
+std::unique_ptr cudf::io::detail::inline_column_buffer::make_string_column_impl(
+  rmm::cuda_stream_view stream)
+{
+  // if the size of _string_data is over the threshold for 64bit size_type, _data will contain
+  // sizes rather than offsets. need special handling for that case.
+  auto const threshold = static_cast(strings::detail::get_offset64_threshold());
+  if (_string_data.size() > threshold) {
+    if (not strings::detail::is_large_strings_enabled()) {
+      CUDF_FAIL("String column exceeds the column size limit", std::overflow_error);
+    }
+    // create new offsets
+    auto const offsets_ptr = static_cast(_data.data());
+    auto offsets_col       = make_numeric_column(
+      data_type{type_id::INT64}, size + 1, mask_state::UNALLOCATED, stream, _mr);
+    auto d_offsets64 = offsets_col->mutable_view().template data();
+    // it's safe to call with size + 1 because _data is also sized that large
+    cudf::detail::sizes_to_offsets(offsets_ptr, offsets_ptr + size + 1, d_offsets64, stream);
+    return make_strings_column(
+      size, std::move(offsets_col), std::move(_string_data), null_count(), std::move(_null_mask));
+  } else {
+    // no need for copies, just transfer ownership of the data_buffers to the columns
+    auto offsets_col = std::make_unique(
+      data_type{type_to_id()}, size + 1, std::move(_data), rmm::device_buffer{}, 0);
+    return make_strings_column(
+      size, std::move(offsets_col), std::move(_string_data), null_count(), std::move(_null_mask));
+  }
+}
+
+}  // namespace cudf::io::detail
diff --git a/cpp/tests/CMakeLists.txt b/cpp/tests/CMakeLists.txt
index fa633dfa67b..bbb919aa2d1 100644
--- a/cpp/tests/CMakeLists.txt
+++ b/cpp/tests/CMakeLists.txt
@@ -572,7 +572,7 @@ ConfigureTest(
 # * large strings test ----------------------------------------------------------------------------
 ConfigureTest(
   LARGE_STRINGS_TEST large_strings/large_strings_fixture.cpp large_strings/merge_tests.cpp
-  large_strings/concatenate_tests.cpp
+  large_strings/concatenate_tests.cpp large_strings/parquet_tests.cpp
   GPUS 1
   PERCENT 100
 )
diff --git a/cpp/tests/large_strings/parquet_tests.cpp b/cpp/tests/large_strings/parquet_tests.cpp
new file mode 100644
index 00000000000..007c08ce0fb
--- /dev/null
+++ b/cpp/tests/large_strings/parquet_tests.cpp
@@ -0,0 +1,73 @@
+/*
+ * Copyright (c) 2024, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "large_strings_fixture.hpp"
+
+#include
+#include
+#include
+
+#include
+#include
+#include
+
+namespace {
+
+cudf::test::TempDirTestEnvironment* const g_temp_env =
+  static_cast(
+    ::testing::AddGlobalTestEnvironment(new cudf::test::TempDirTestEnvironment));
+
+}  // namespace
+
+struct ParquetStringsTest : public cudf::test::StringsLargeTest {};
+
+TEST_F(ParquetStringsTest, ReadLargeStrings)
+{
+  // need to create a string column larger than `threshold`
+  auto const col0        = this->long_column();
+  auto const column_size = cudf::strings_column_view(col0).chars_size(cudf::get_default_stream());
+  auto const threshold   = column_size - 1;
+  auto const expected    = cudf::table_view{{col0, col0, col0}};
+
+  auto expected_metadata = cudf::io::table_input_metadata{expected};
+  expected_metadata.column_metadata[1].set_encoding(
+    cudf::io::column_encoding::DELTA_LENGTH_BYTE_ARRAY);
+  expected_metadata.column_metadata[2].set_encoding(cudf::io::column_encoding::DELTA_BYTE_ARRAY);
+
+  // set smaller threshold to reduce file size and execution time
+  setenv("LIBCUDF_LARGE_STRINGS_THRESHOLD", std::to_string(threshold).c_str(), 1);
+
+  auto const filepath = g_temp_env->get_temp_filepath("ReadLargeStrings.parquet");
+  cudf::io::parquet_writer_options out_opts =
+    cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, expected)
+      .compression(cudf::io::compression_type::ZSTD)
+      .stats_level(cudf::io::STATISTICS_NONE)
+      .metadata(expected_metadata);
+  cudf::io::write_parquet(out_opts);
+
+  cudf::io::parquet_reader_options default_in_opts =
+    cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepath});
+  auto const result      = cudf::io::read_parquet(default_in_opts);
+  auto const result_view = result.tbl->view();
+  for (auto cv : result_view) {
+    auto const offsets = cudf::strings_column_view(cv).offsets();
+    EXPECT_EQ(offsets.type(), cudf::data_type{cudf::type_id::INT64});
+  }
+  CUDF_TEST_EXPECT_TABLES_EQUAL(result_view, expected);
+
+  // go back to normal threshold
+  unsetenv("LIBCUDF_LARGE_STRINGS_THRESHOLD");
+}

From ce6902f064e9c028aa97c4a7ec5f2eed1c0c9a90 Mon Sep 17 00:00:00 2001
From: Matthew Roeschke <10647082+mroeschke@users.noreply.github.com>
Date: Fri, 3 May 2024 08:21:25 -1000
Subject: [PATCH 6/6] Move timezone conversion logic to `DatetimeColumn`
 (#15545)

Moves methods/logic in `python/cudf/cudf/core/_internals/timezones.py` to
the newly created `DatetimeColumn.tz_localize` and
`DatetimeColumn.tz_convert`.
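
For orientation, the moved logic is exercised from the Python API like so
(a sketch; the timestamps are illustrative):

```python
import cudf

ser = cudf.Series(cudf.date_range("2024-01-01", periods=3, freq="D"))

# tz-naive -> tz-aware now lives on DatetimeColumn.tz_localize
localized = ser.dt.tz_localize("US/Eastern")

# tz-aware -> another zone via DatetimeColumn.tz_convert
print(localized.dt.tz_convert("UTC").to_pandas())

# tz_convert on tz-naive data raises TypeError, matching pandas
try:
    ser.dt.tz_convert("UTC")
except TypeError as err:
    print(err)
```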
Additionally, adds typing and improves the error message when calling
`tz_convert(None)` on a tz-naive Series/Index so that it raises a
`TypeError` (like pandas) instead of an `AttributeError`.

Authors:
  - Matthew Roeschke (https://github.com/mroeschke)
  - Lawrence Mitchell (https://github.com/wence-)

Approvers:
  - Lawrence Mitchell (https://github.com/wence-)

URL: https://github.com/rapidsai/cudf/pull/15545
---
 python/cudf/cudf/core/_internals/timezones.py | 201 +++---------------
 python/cudf/cudf/core/column/datetime.py      | 159 +++++++++++++-
 python/cudf/cudf/core/index.py                |  23 +-
 python/cudf/cudf/core/series.py               |  27 +--
 python/cudf/cudf/core/tools/datetimes.py      |   5 +-
 .../cudf/tests/series/test_datetimelike.py    |   5 +
 6 files changed, 206 insertions(+), 214 deletions(-)

diff --git a/python/cudf/cudf/core/_internals/timezones.py b/python/cudf/cudf/core/_internals/timezones.py
index 4888cdd9ac9..f04cae719c2 100644
--- a/python/cudf/cudf/core/_internals/timezones.py
+++ b/python/cudf/cudf/core/_internals/timezones.py
@@ -3,23 +3,18 @@
 import os
 import zoneinfo
 from functools import lru_cache
-from typing import Tuple, cast
+from typing import Literal, Tuple
 
 import numpy as np
-import pandas as pd
 
-import cudf
-from cudf._lib.labeling import label_bins
-from cudf._lib.search import search_sorted
 from cudf._lib.timezone import make_timezone_transition_table
-from cudf.core.column.column import as_column, build_column
-from cudf.core.column.datetime import DatetimeColumn, DatetimeTZColumn
-from cudf.core.dataframe import DataFrame
-from cudf.utils.dtypes import _get_base_dtype
+from cudf.core.column.column import as_column
+from cudf.core.column.datetime import DatetimeColumn
+from cudf.core.column.timedelta import TimeDeltaColumn
 
 
 @lru_cache(maxsize=20)
-def get_tz_data(zone_name):
+def get_tz_data(zone_name: str) -> Tuple[DatetimeColumn, TimeDeltaColumn]:
     """
     Return timezone data (transition times and UTC offsets) for the
     given IANA time zone.
@@ -31,8 +26,8 @@ def get_tz_data(zone_name):
 
     Returns
     -------
-    DataFrame with two columns containing the transition times
-    ("transition_times") and corresponding UTC offsets ("offsets").
+    Tuple with two columns containing the transition times
+    and corresponding UTC offsets.
""" try: # like zoneinfo, we first look in TZPATH @@ -43,19 +38,23 @@ def get_tz_data(zone_name): return tz_table -def _find_and_read_tzfile_tzpath(zone_name): +def _find_and_read_tzfile_tzpath( + zone_name: str, +) -> Tuple[DatetimeColumn, TimeDeltaColumn]: for search_path in zoneinfo.TZPATH: if os.path.isfile(os.path.join(search_path, zone_name)): - return _read_tzfile_as_frame(search_path, zone_name) + return _read_tzfile_as_columns(search_path, zone_name) raise zoneinfo.ZoneInfoNotFoundError(zone_name) -def _find_and_read_tzfile_tzdata(zone_name): +def _find_and_read_tzfile_tzdata( + zone_name: str, +) -> Tuple[DatetimeColumn, TimeDeltaColumn]: import importlib.resources package_base = "tzdata.zoneinfo" try: - return _read_tzfile_as_frame( + return _read_tzfile_as_columns( str(importlib.resources.files(package_base)), zone_name ) # TODO: make it so that the call to libcudf raises a @@ -77,7 +76,9 @@ def _find_and_read_tzfile_tzdata(zone_name): raise zoneinfo.ZoneInfoNotFoundError(zone_name) -def _read_tzfile_as_frame(tzdir, zone_name): +def _read_tzfile_as_columns( + tzdir, zone_name: str +) -> Tuple[DatetimeColumn, TimeDeltaColumn]: transition_times_and_offsets = make_timezone_transition_table( tzdir, zone_name ) @@ -85,91 +86,13 @@ def _read_tzfile_as_frame(tzdir, zone_name): if not transition_times_and_offsets: # this happens for UTC-like zones min_date = np.int64(np.iinfo("int64").min + 1).astype("M8[s]") - transition_times_and_offsets = ( - as_column([min_date]), - as_column([np.timedelta64(0, "s")]), - ) - - return DataFrame._from_data( - dict( - zip(["transition_times", "offsets"], transition_times_and_offsets) - ) - ) - + return (as_column([min_date]), as_column([np.timedelta64(0, "s")])) + return tuple(transition_times_and_offsets) # type: ignore[return-value] -def _find_ambiguous_and_nonexistent( - data: DatetimeColumn, zone_name: str -) -> Tuple: - """ - Recognize ambiguous and nonexistent timestamps for the given timezone. - - Returns a tuple of columns, both of "bool" dtype and of the same - size as `data`, that respectively indicate ambiguous and - nonexistent timestamps in `data` with the value `True`. - - Ambiguous and/or nonexistent timestamps are only possible if any - transitions occur in the time zone database for the given timezone. - If no transitions occur, the tuple `(False, False)` is returned. 
- """ - tz_data_for_zone = get_tz_data(zone_name) - transition_times = tz_data_for_zone["transition_times"] - offsets = tz_data_for_zone["offsets"].astype( - f"timedelta64[{data.time_unit}]" - ) - if len(offsets) == 1: # no transitions - return False, False - - transition_times, offsets, old_offsets = ( - transition_times[1:]._column, - offsets[1:]._column, - offsets[:-1]._column, - ) - - # Assume we have two clocks at the moment of transition: - # - Clock 1 is turned forward or backwards correctly - # - Clock 2 makes no changes - clock_1 = transition_times + offsets - clock_2 = transition_times + old_offsets - - # At the start of an ambiguous time period, Clock 1 (which has - # been turned back) reads less than Clock 2: - cond = clock_1 < clock_2 - ambiguous_begin = clock_1.apply_boolean_mask(cond) - - # The end of an ambiguous time period is what Clock 2 reads at - # the moment of transition: - ambiguous_end = clock_2.apply_boolean_mask(cond) - ambiguous = label_bins( - data, - left_edges=ambiguous_begin, - left_inclusive=True, - right_edges=ambiguous_end, - right_inclusive=False, - ).notnull() - - # At the start of a non-existent time period, Clock 2 reads less - # than Clock 1 (which has been turned forward): - cond = clock_1 > clock_2 - nonexistent_begin = clock_2.apply_boolean_mask(cond) - - # The end of the non-existent time period is what Clock 1 reads - # at the moment of transition: - nonexistent_end = clock_1.apply_boolean_mask(cond) - nonexistent = label_bins( - data, - left_edges=nonexistent_begin, - left_inclusive=True, - right_edges=nonexistent_end, - right_inclusive=False, - ).notnull() - - return ambiguous, nonexistent - - -def localize( - data: DatetimeColumn, zone_name: str, ambiguous, nonexistent -) -> DatetimeTZColumn: +def check_ambiguous_and_nonexistent( + ambiguous: Literal["NaT"], nonexistent: Literal["NaT"] +) -> Tuple[Literal["NaT"], Literal["NaT"]]: if ambiguous != "NaT": raise NotImplementedError( "Only ambiguous='NaT' is currently supported" @@ -178,80 +101,4 @@ def localize( raise NotImplementedError( "Only nonexistent='NaT' is currently supported" ) - if isinstance(data, DatetimeTZColumn): - raise ValueError( - "Already localized. " - "Use `tz_convert` to convert between time zones." - ) - dtype = pd.DatetimeTZDtype(data.time_unit, zone_name) - ambiguous, nonexistent = _find_ambiguous_and_nonexistent(data, zone_name) - localized = cast( - DatetimeColumn, - data._scatter_by_column( - data.isnull() | (ambiguous | nonexistent), - cudf.Scalar(cudf.NaT, dtype=data.dtype), - ), - ) - gmt_data = local_to_utc(localized, zone_name) - return cast( - DatetimeTZColumn, - build_column( - data=gmt_data.base_data, - dtype=dtype, - mask=localized.base_mask, - size=gmt_data.size, - offset=gmt_data.offset, - ), - ) - - -def delocalize(data: DatetimeColumn) -> DatetimeColumn: - """ - Convert a timezone-aware datetime column to a timezone-naive one. - If the column is already timezone-naive, return it as is. - """ - if isinstance(data, DatetimeTZColumn): - return data._local_time - # already timezone-naive: - return data - - -def convert(data: DatetimeTZColumn, zone_name: str) -> DatetimeTZColumn: - if not isinstance(data, DatetimeTZColumn): - raise TypeError( - "Cannot convert from timezone-naive timestamps to " - "timezone-aware timestamps. For that, " - "use `tz_localize`." 
-        )
-    if zone_name == str(data.dtype.tz):
-        return data.copy()
-    utc_time = data._utc_time
-    out = cast(
-        DatetimeTZColumn,
-        build_column(
-            data=utc_time.base_data,
-            dtype=pd.DatetimeTZDtype(data.time_unit, zone_name),
-            mask=utc_time.base_mask,
-            size=utc_time.size,
-            offset=utc_time.offset,
-        ),
-    )
-    return out
-
-
-def utc_to_local(data: DatetimeColumn, zone_name: str) -> DatetimeColumn:
-    tz_data_for_zone = get_tz_data(zone_name)
-    transition_times, offsets = tz_data_for_zone._columns
-    transition_times = transition_times.astype(_get_base_dtype(data.dtype))
-    indices = search_sorted([transition_times], [data], "right") - 1
-    offsets_from_utc = offsets.take(indices, nullify=True)
-    return data + offsets_from_utc
-
-
-def local_to_utc(data: DatetimeColumn, zone_name: str) -> DatetimeColumn:
-    tz_data_for_zone = get_tz_data(zone_name)
-    transition_times, offsets = tz_data_for_zone._columns
-    transition_times_local = (transition_times + offsets).astype(data.dtype)
-    indices = search_sorted([transition_times_local], [data], "right") - 1
-    offsets_to_utc = offsets.take(indices, nullify=True)
-    return data - offsets_to_utc
+    return ambiguous, nonexistent
diff --git a/python/cudf/cudf/core/column/datetime.py b/python/cudf/cudf/core/column/datetime.py
index 981ef738458..9fe4e5da96d 100644
--- a/python/cudf/cudf/core/column/datetime.py
+++ b/python/cudf/cudf/core/column/datetime.py
@@ -7,7 +7,7 @@
 import locale
 import re
 from locale import nl_langinfo
-from typing import Any, Optional, Sequence, cast
+from typing import TYPE_CHECKING, Any, Literal, Optional, Sequence, Tuple, cast
 
 import numpy as np
 import pandas as pd
@@ -16,6 +16,8 @@
 import cudf
 from cudf import _lib as libcudf
+from cudf._lib.labeling import label_bins
+from cudf._lib.search import search_sorted
 from cudf._typing import (
     ColumnBinaryOperand,
     DatetimeLikeScalar,
@@ -31,6 +33,9 @@
 from cudf.utils.dtypes import _get_base_dtype
 from cudf.utils.utils import _all_bools_with_nulls
 
+if TYPE_CHECKING:
+    from cudf.core.column.numerical import NumericalColumn
+
 if PANDAS_GE_220:
     _guess_datetime_format = pd.tseries.api.guess_datetime_format
 else:
@@ -665,6 +670,121 @@ def _with_type_metadata(self, dtype):
             )
         return self
 
+    def _find_ambiguous_and_nonexistent(
+        self, zone_name: str
+    ) -> Tuple[NumericalColumn, NumericalColumn] | Tuple[bool, bool]:
+        """
+        Recognize ambiguous and nonexistent timestamps for the given timezone.
+
+        Returns a tuple of columns, both of "bool" dtype and of the same
+        size as `self`, that respectively indicate ambiguous and
+        nonexistent timestamps in `self` with the value `True`.
+
+        Ambiguous and/or nonexistent timestamps are only possible if any
+        transitions occur in the time zone database for the given timezone.
+        If no transitions occur, the tuple `(False, False)` is returned.
+ """ + from cudf.core._internals.timezones import get_tz_data + + transition_times, offsets = get_tz_data(zone_name) + offsets = offsets.astype(f"timedelta64[{self.time_unit}]") # type: ignore[assignment] + + if len(offsets) == 1: # no transitions + return False, False + + transition_times, offsets, old_offsets = ( + transition_times.slice(1, len(transition_times)), + offsets.slice(1, len(offsets)), + offsets.slice(0, len(offsets) - 1), + ) + + # Assume we have two clocks at the moment of transition: + # - Clock 1 is turned forward or backwards correctly + # - Clock 2 makes no changes + clock_1 = transition_times + offsets + clock_2 = transition_times + old_offsets + + # At the start of an ambiguous time period, Clock 1 (which has + # been turned back) reads less than Clock 2: + cond = clock_1 < clock_2 + ambiguous_begin = clock_1.apply_boolean_mask(cond) + + # The end of an ambiguous time period is what Clock 2 reads at + # the moment of transition: + ambiguous_end = clock_2.apply_boolean_mask(cond) + ambiguous = label_bins( + self, + left_edges=ambiguous_begin, + left_inclusive=True, + right_edges=ambiguous_end, + right_inclusive=False, + ).notnull() + + # At the start of a non-existent time period, Clock 2 reads less + # than Clock 1 (which has been turned forward): + cond = clock_1 > clock_2 + nonexistent_begin = clock_2.apply_boolean_mask(cond) + + # The end of the non-existent time period is what Clock 1 reads + # at the moment of transition: + nonexistent_end = clock_1.apply_boolean_mask(cond) + nonexistent = label_bins( + self, + left_edges=nonexistent_begin, + left_inclusive=True, + right_edges=nonexistent_end, + right_inclusive=False, + ).notnull() + + return ambiguous, nonexistent + + def tz_localize( + self, + tz: str | None, + ambiguous: Literal["NaT"] = "NaT", + nonexistent: Literal["NaT"] = "NaT", + ): + from cudf.core._internals.timezones import ( + check_ambiguous_and_nonexistent, + get_tz_data, + ) + + if tz is None: + return self.copy() + ambiguous, nonexistent = check_ambiguous_and_nonexistent( + ambiguous, nonexistent + ) + dtype = pd.DatetimeTZDtype(self.time_unit, tz) + ambiguous_col, nonexistent_col = self._find_ambiguous_and_nonexistent( + tz + ) + localized = self._scatter_by_column( + self.isnull() | (ambiguous_col | nonexistent_col), + cudf.Scalar(cudf.NaT, dtype=self.dtype), + ) + + transition_times, offsets = get_tz_data(tz) + transition_times_local = (transition_times + offsets).astype( + localized.dtype + ) + indices = ( + search_sorted([transition_times_local], [localized], "right") - 1 + ) + offsets_to_utc = offsets.take(indices, nullify=True) + gmt_data = localized - offsets_to_utc + return DatetimeTZColumn( + data=gmt_data.base_data, + dtype=dtype, + mask=localized.base_mask, + size=gmt_data.size, + offset=gmt_data.offset, + ) + + def tz_convert(self, tz: str | None): + raise TypeError( + "Cannot convert tz-naive timestamps, use tz_localize to localize" + ) + class DatetimeTZColumn(DatetimeColumn): def __init__( @@ -731,9 +851,13 @@ def _utc_time(self): @property def _local_time(self): """Return the local time as naive timestamps.""" - from cudf.core._internals.timezones import utc_to_local + from cudf.core._internals.timezones import get_tz_data - return utc_to_local(self, str(self.dtype.tz)) + transition_times, offsets = get_tz_data(str(self.dtype.tz)) + transition_times = transition_times.astype(_get_base_dtype(self.dtype)) + indices = search_sorted([transition_times], [self], "right") - 1 + offsets_from_utc = offsets.take(indices, nullify=True) + 
return self + offsets_from_utc def as_string_column( self, dtype: Dtype, format: str | None = None @@ -756,3 +880,32 @@ def __repr__(self): f"{arr.to_string()}\n" f"dtype: {self.dtype}" ) + + def tz_localize(self, tz: str | None, ambiguous="NaT", nonexistent="NaT"): + from cudf.core._internals.timezones import ( + check_ambiguous_and_nonexistent, + ) + + if tz is None: + return self._local_time + ambiguous, nonexistent = check_ambiguous_and_nonexistent( + ambiguous, nonexistent + ) + raise ValueError( + "Already localized. " + "Use `tz_convert` to convert between time zones." + ) + + def tz_convert(self, tz: str | None): + if tz is None: + return self._utc_time + elif tz == str(self.dtype.tz): + return self.copy() + utc_time = self._utc_time + return type(self)( + data=utc_time.base_data, + dtype=pd.DatetimeTZDtype(self.time_unit, tz), + mask=utc_time.base_mask, + size=utc_time.size, + offset=utc_time.offset, + ) diff --git a/python/cudf/cudf/core/index.py b/python/cudf/cudf/core/index.py index f55fa4c05b5..583e5d74b56 100644 --- a/python/cudf/cudf/core/index.py +++ b/python/cudf/cudf/core/index.py @@ -2258,7 +2258,12 @@ def round(self, freq): return self.__class__._from_data({self.name: out_column}) - def tz_localize(self, tz, ambiguous="NaT", nonexistent="NaT"): + def tz_localize( + self, + tz: str | None, + ambiguous: Literal["NaT"] = "NaT", + nonexistent: Literal["NaT"] = "NaT", + ): """ Localize timezone-naive data to timezone-aware data. @@ -2300,17 +2305,12 @@ def tz_localize(self, tz, ambiguous="NaT", nonexistent="NaT"): ambiguous or nonexistent timestamps are converted to 'NaT'. """ # noqa: E501 - from cudf.core._internals.timezones import delocalize, localize - - if tz is None: - result_col = delocalize(self._column) - else: - result_col = localize(self._column, tz, ambiguous, nonexistent) + result_col = self._column.tz_localize(tz, ambiguous, nonexistent) return DatetimeIndex._from_data( {self.name: result_col}, freq=self._freq ) - def tz_convert(self, tz): + def tz_convert(self, tz: str | None): """ Convert tz-aware datetimes from one time zone to another. 
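For orientation, a minimal usage sketch of the Index-level entry points above, which after this change simply delegate to the column methods (the timestamps and zone names here are arbitrary illustrations, not taken from the patch):

    import cudf

    # Naive -> aware: DatetimeIndex.tz_localize delegates to
    # DatetimeColumn.tz_localize.
    idx = cudf.date_range("2018-03-01 09:00", periods=3, freq="D")
    aware = idx.tz_localize("Europe/London")

    # Aware -> aware: delegates to DatetimeTZColumn.tz_convert.
    utc = aware.tz_convert("UTC")

    # tz=None strips the zone via DatetimeTZColumn._local_time, while
    # tz_convert(None) converts to UTC and drops the zone information.
    naive_again = aware.tz_localize(None)
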
@@ -2342,12 +2342,7 @@ def tz_convert(self, tz): '2018-03-03 14:00:00+00:00'], dtype='datetime64[ns, Europe/London]') """ # noqa: E501 - from cudf.core._internals.timezones import convert - - if tz is None: - result_col = self._column._utc_time - else: - result_col = convert(self._column, tz) + result_col = self._column.tz_convert(tz) return DatetimeIndex._from_data({self.name: result_col}) diff --git a/python/cudf/cudf/core/series.py b/python/cudf/cudf/core/series.py index b6ed28f9093..c3d232aaa7c 100644 --- a/python/cudf/cudf/core/series.py +++ b/python/cudf/cudf/core/series.py @@ -4755,22 +4755,22 @@ def strftime(self, date_format, *args, **kwargs): ) @copy_docstring(DatetimeIndex.tz_localize) - def tz_localize(self, tz, ambiguous="NaT", nonexistent="NaT"): - from cudf.core._internals.timezones import delocalize, localize - - if tz is None: - result_col = delocalize(self.series._column) - else: - result_col = localize( - self.series._column, tz, ambiguous, nonexistent - ) + def tz_localize( + self, + tz: str | None, + ambiguous: Literal["NaT"] = "NaT", + nonexistent: Literal["NaT"] = "NaT", + ): + result_col = self.series._column.tz_localize( + tz, ambiguous, nonexistent + ) return Series._from_data( data={self.series.name: result_col}, index=self.series._index, ) @copy_docstring(DatetimeIndex.tz_convert) - def tz_convert(self, tz): + def tz_convert(self, tz: str | None): """ Parameters ---------- @@ -4780,12 +4780,7 @@ def tz_convert(self, tz): A `tz` of None will convert to UTC and remove the timezone information. """ - from cudf.core._internals.timezones import convert - - if tz is None: - result_col = self.series._column._utc_time - else: - result_col = convert(self.series._column, tz) + result_col = self.series._column.tz_convert(tz) return Series._from_data( {self.series.name: result_col}, index=self.series._index ) diff --git a/python/cudf/cudf/core/tools/datetimes.py b/python/cudf/cudf/core/tools/datetimes.py index 907f3b586d1..7f6ce1100ea 100644 --- a/python/cudf/cudf/core/tools/datetimes.py +++ b/python/cudf/cudf/core/tools/datetimes.py @@ -317,9 +317,6 @@ def _process_col( format: Optional[str], utc: bool, ): - # Causes circular import - from cudf.core._internals.timezones import localize - if col.dtype.kind == "f": if unit not in (None, "ns"): factor = cudf.Scalar( @@ -396,7 +393,7 @@ def _process_col( f"dtype {col.dtype} cannot be converted to {_unit_dtype_map[unit]}" ) if utc and not isinstance(col.dtype, pd.DatetimeTZDtype): - return localize(col, "UTC", ambiguous="NaT", nonexistent="NaT") + return col.tz_localize("UTC") return col diff --git a/python/cudf/cudf/tests/series/test_datetimelike.py b/python/cudf/cudf/tests/series/test_datetimelike.py index 6ee339ee3ea..7ef55761b2b 100644 --- a/python/cudf/cudf/tests/series/test_datetimelike.py +++ b/python/cudf/cudf/tests/series/test_datetimelike.py @@ -218,3 +218,8 @@ def test_contains_tz_aware(item, expected): dti = cudf.date_range("2020", periods=2, freq="D").tz_localize("UTC") result = item in dti assert result == expected + + +def test_tz_convert_naive_typeerror(): + with pytest.raises(TypeError): + cudf.date_range("2020", periods=2, freq="D").tz_convert(None)
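

As a sanity check on the "two clocks" rule that `_find_ambiguous_and_nonexistent` encodes, here is a minimal NumPy-only sketch on toy data (a single made-up US/Eastern-style fall-back transition; it mirrors the comparison but uses none of cudf's internals):

    import numpy as np

    # Toy fall-back transition: at 2020-11-01 06:00 UTC the offset changes
    # from UTC-4 (EDT) to UTC-5 (EST).
    transition_utc = np.datetime64("2020-11-01T06:00", "s")
    old_offset = np.timedelta64(-4 * 3600, "s")
    new_offset = np.timedelta64(-5 * 3600, "s")

    clock_1 = transition_utc + new_offset  # turned back correctly -> 01:00 local
    clock_2 = transition_utc + old_offset  # makes no changes      -> 02:00 local

    # clock_1 < clock_2, so local times in [clock_1, clock_2) occur twice;
    # the half-open interval matches label_bins' left_inclusive=True,
    # right_inclusive=False.
    local = np.array(
        ["2020-11-01T00:30", "2020-11-01T01:30"], dtype="datetime64[s]"
    )
    ambiguous = (local >= clock_1) & (local < clock_2)
    assert ambiguous.tolist() == [False, True]

    # For a spring-forward transition the comparison flips (clock_1 > clock_2)
    # and [clock_2, clock_1) is the window that never appears on local clocks.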
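
A similar sketch of the searchsorted arithmetic that `tz_localize` and `_local_time` now inline: again NumPy-only, with made-up EDT/EST transition data, where `offsets[i]` is the offset in effect from `transitions_utc[i]` onward, matching how the columns returned by `get_tz_data` are consumed. The real code also nullifies out-of-range indices via `take(indices, nullify=True)`, which this toy version skips.

    import numpy as np

    transitions_utc = np.array(
        ["2020-03-08T07:00", "2020-11-01T06:00"], dtype="datetime64[s]"
    )
    offsets = np.array([-4 * 3600, -5 * 3600], dtype="timedelta64[s]")

    # local -> UTC (tz_localize): search the transitions on the *local*
    # clock, take the last one at or before each timestamp, then subtract
    # that offset.
    transitions_local = transitions_utc + offsets
    local = np.array(
        ["2020-07-04T12:00", "2020-12-25T12:00"], dtype="datetime64[s]"
    )
    idx = np.searchsorted(transitions_local, local, side="right") - 1
    utc = local - offsets[idx]
    assert str(utc[0]) == "2020-07-04T16:00:00"  # EDT is UTC-4
    assert str(utc[1]) == "2020-12-25T17:00:00"  # EST is UTC-5

    # UTC -> local (_local_time) is the mirror image: search the
    # transitions on the UTC clock and add the offset instead.
    idx2 = np.searchsorted(transitions_utc, utc, side="right") - 1
    assert (utc + offsets[idx2] == local).all()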