diff --git a/.gitattributes b/.gitattributes index 0201d7cb5e4..fbfe7434d50 100644 --- a/.gitattributes +++ b/.gitattributes @@ -1,2 +1,4 @@ python/cudf/cudf/_version.py export-subst -CHANGELOG.md merge=union +python/cudf_kafka/cudf_kafka/_version.py export-subst +python/custreamz/custreamz/_version.py export-subst +python/dask_cudf/dask_cudf/_version.py export-subst diff --git a/.gitignore b/.gitignore index df266677373..29df683e9ec 100644 --- a/.gitignore +++ b/.gitignore @@ -24,16 +24,16 @@ cudf.egg-info/ python/build python/*/build python/cudf/cudf-coverage.xml -python/cudf/*/_lib/**/*\.cpp +python/cudf/*/_lib/**/*.cpp python/cudf/*/_lib/**/*.h python/cudf/*/_lib/.nfs* -python/cudf/*/_cuda/*\.cpp +python/cudf/*/_cuda/*.cpp python/cudf/*.ipynb python/cudf/.ipynb_checkpoints python/*/record.txt -python/cudf_kafka/*/_lib/**/*\.cpp +python/cudf_kafka/*/_lib/**/*.cpp python/cudf_kafka/*/_lib/**/*.h -python/custreamz/*/_lib/**/*\.cpp +python/custreamz/*/_lib/**/*.cpp python/custreamz/*/_lib/**/*.h .Python env/ diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 4f838ba3f45..a448b33a76d 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -26,10 +26,15 @@ repos: files: python/.*\.(py|pyx|pxd)$ types: [file] - repo: https://github.com/pre-commit/mirrors-mypy - rev: 'v0.782' + rev: 'v0.971' hooks: - id: mypy - args: ["--config-file=setup.cfg", "python/cudf/cudf", "python/dask_cudf/dask_cudf", "python/custreamz/custreamz", "python/cudf_kafka/cudf_kafka"] + additional_dependencies: [types-cachetools] + args: ["--config-file=setup.cfg", + "python/cudf/cudf", + "python/custreamz/custreamz", + "python/cudf_kafka/cudf_kafka", + "python/dask_cudf/dask_cudf"] pass_filenames: false - repo: https://github.com/PyCQA/pydocstyle rev: 6.1.1 @@ -88,6 +93,18 @@ repos: language: system pass_filenames: false verbose: true + - id: headers-recipe-check + name: headers-recipe-check + entry: ./ci/checks/headers_test.sh + files: | + (?x)^( + ^cpp/include/| + 
^conda/.*/meta.yaml + ) + types_or: [file] + language: system + pass_filenames: false + verbose: false default_language_version: python: python3 diff --git a/ci/checks/headers_test.sh b/ci/checks/headers_test.sh index ebfc4b2965e..502bdca0fa7 100755 --- a/ci/checks/headers_test.sh +++ b/ci/checks/headers_test.sh @@ -16,12 +16,9 @@ for DIRNAME in ${DIRNAMES[@]}; do LIB_RETVAL=$? if [ "$LIB_RETVAL" != "0" ]; then - echo -e "\n\n>>>> FAILED: lib${LIBNAME} header existence conda/recipes/lib${LIBNAME}/meta.yaml check; begin output\n\n" + echo -e ">>>> FAILED: lib${LIBNAME} has different headers in include/${DIRNAME}/ and conda/recipes/lib${LIBNAME}/meta.yaml. The diff is shown below:" echo -e "$HEADER_DIFF" - echo -e "\n\n>>>> FAILED: lib${LIBNAME} header existence conda/recipes/lib${LIBNAME}/meta.yaml check; end output\n\n" RETVAL=1 - else - echo -e "\n\n>>>> PASSED: lib${LIBNAME} header existence conda/recipes/lib${LIBNAME}/meta.yaml check\n\n" fi done diff --git a/ci/checks/style.sh b/ci/checks/style.sh index 3c632e03219..27f34dc335e 100755 --- a/ci/checks/style.sh +++ b/ci/checks/style.sh @@ -49,13 +49,8 @@ else echo -e "\n\n>>>> PASSED: clang format check\n\n" fi -# Run header meta.yml check and get results/return code -HEADER_META=`ci/checks/headers_test.sh` -HEADER_META_RETVAL=$? 
-echo -e "$HEADER_META" - RETVALS=( - $CR_RETVAL $PRE_COMMIT_RETVAL $CLANG_FORMAT_RETVAL $HEADER_META_RETVAL + $CR_RETVAL $PRE_COMMIT_RETVAL $CLANG_FORMAT_RETVAL ) IFS=$'\n' RETVAL=`echo "${RETVALS[*]}" | sort -nr | head -n1` diff --git a/ci/gpu/build.sh b/ci/gpu/build.sh index 08a3b70fe42..316dbcbaa1d 100755 --- a/ci/gpu/build.sh +++ b/ci/gpu/build.sh @@ -152,6 +152,13 @@ if [[ -z "$PROJECT_FLASH" || "$PROJECT_FLASH" == "0" ]]; then echo "Running GoogleTest $test_name" ${gt} --gtest_output=xml:"$WORKSPACE/test-results/" done + + # Test libcudf (csv, orc, and parquet) with `LIBCUDF_CUFILE_POLICY=KVIKIO` + for test_name in "CSV_TEST" "ORC_TEST" "PARQUET_TEST"; do + gt="$WORKSPACE/cpp/build/gtests/$test_name" + echo "Running GoogleTest $test_name (LIBCUDF_CUFILE_POLICY=KVIKIO)" + LIBCUDF_CUFILE_POLICY=KVIKIO ${gt} --gtest_output=xml:"$WORKSPACE/test-results/" + done fi else #Project Flash @@ -182,10 +189,18 @@ else gpuci_logger "GoogleTests" # Run libcudf and libcudf_kafka gtests from libcudf-tests package for gt in "$CONDA_PREFIX/bin/gtests/libcudf"*/* ; do + test_name=$(basename ${gt}) echo "Running GoogleTest $test_name" ${gt} --gtest_output=xml:"$WORKSPACE/test-results/" done + # Test libcudf (csv, orc, and parquet) with `LIBCUDF_CUFILE_POLICY=KVIKIO` + for test_name in "CSV_TEST" "ORC_TEST" "PARQUET_TEST"; do + gt="$CONDA_PREFIX/bin/gtests/libcudf/$test_name" + echo "Running GoogleTest $test_name (LIBCUDF_CUFILE_POLICY=KVIKIO)" + LIBCUDF_CUFILE_POLICY=KVIKIO ${gt} --gtest_output=xml:"$WORKSPACE/test-results/" + done + export LIB_BUILD_DIR="$WORKSPACE/ci/artifacts/cudf/cpu/libcudf_work/cpp/build" # Copy libcudf build time results echo "Checking for build time log $LIB_BUILD_DIR/ninja_log.xml" diff --git a/conda/environments/cudf_dev_cuda11.5.yml b/conda/environments/cudf_dev_cuda11.5.yml index a697194edba..fb47af9939e 100644 --- a/conda/environments/cudf_dev_cuda11.5.yml +++ b/conda/environments/cudf_dev_cuda11.5.yml @@ -36,14 +36,15 @@ dependencies: - nbsphinx - 
numpydoc - ipython - - pandoc=<2.0.0 + - pandoc<=2.0.0 - cudatoolkit=11.5 - cuda-python >=11.5,<11.7.1 - pip - flake8=3.8.3 - black=22.3.0 - isort=5.10.1 - - mypy=0.782 + - mypy=0.971 + - types-cachetools - doxygen=1.8.20 - pydocstyle=6.1.1 - typing_extensions diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 09a60836fef..69394a34624 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -749,6 +749,11 @@ if(CUDF_BUILD_BENCHMARKS) add_subdirectory(benchmarks) endif() +# build pretty-printer load script +if(Thrust_SOURCE_DIR AND rmm_SOURCE_DIR) + configure_file(scripts/load-pretty-printers.in load-pretty-printers @ONLY) +endif() + # ################################################################################################## # * install targets ------------------------------------------------------------------------------- rapids_cmake_install_lib_dir(lib_dir) diff --git a/cpp/benchmarks/CMakeLists.txt b/cpp/benchmarks/CMakeLists.txt index 6b3ecbfac1c..fb46d1b583e 100644 --- a/cpp/benchmarks/CMakeLists.txt +++ b/cpp/benchmarks/CMakeLists.txt @@ -165,7 +165,7 @@ ConfigureNVBench(SEARCH_NVBENCH search/contains.cpp) # ################################################################################################## # * sort benchmark -------------------------------------------------------------------------------- ConfigureBench(SORT_BENCH sort/rank.cpp sort/sort.cpp sort/sort_strings.cpp) -ConfigureNVBench(SORT_NVBENCH sort/sort_structs.cpp) +ConfigureNVBench(SORT_NVBENCH sort/sort_lists.cpp sort/sort_structs.cpp) # ################################################################################################## # * quantiles benchmark diff --git a/cpp/benchmarks/sort/sort_lists.cpp b/cpp/benchmarks/sort/sort_lists.cpp new file mode 100644 index 00000000000..dac865de479 --- /dev/null +++ b/cpp/benchmarks/sort/sort_lists.cpp @@ -0,0 +1,49 @@ +/* + * Copyright (c) 2022, NVIDIA CORPORATION. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include + +#include + +void nvbench_sort_lists(nvbench::state& state) +{ + cudf::rmm_pool_raii pool_raii; + + const size_t size_bytes(state.get_int64("size_bytes")); + const cudf::size_type depth{static_cast(state.get_int64("depth"))}; + auto const null_frequency{state.get_float64("null_frequency")}; + + data_profile table_profile; + table_profile.set_distribution_params(cudf::type_id::LIST, distribution_id::UNIFORM, 0, 5); + table_profile.set_list_depth(depth); + table_profile.set_null_probability(null_frequency); + auto const table = + create_random_table({cudf::type_id::LIST}, table_size_bytes{size_bytes}, table_profile); + + state.exec(nvbench::exec_tag::sync, [&](nvbench::launch& launch) { + rmm::cuda_stream_view stream_view{launch.get_stream()}; + cudf::detail::sorted_order(*table, {}, {}, stream_view, rmm::mr::get_current_device_resource()); + }); +} + +NVBENCH_BENCH(nvbench_sort_lists) + .set_name("sort_list") + .add_int64_power_of_two_axis("size_bytes", {10, 18, 24, 28}) + .add_int64_axis("depth", {1, 4}) + .add_float64_axis("null_frequency", {0, 0.2}); diff --git a/cpp/include/cudf/io/parquet.hpp b/cpp/include/cudf/io/parquet.hpp index 5d95bf9812f..ff5b9f5c457 100644 --- a/cpp/include/cudf/io/parquet.hpp +++ b/cpp/include/cudf/io/parquet.hpp @@ -57,6 +57,10 @@ class parquet_reader_options { // List of individual row groups to read (ignored if empty) std::vector> 
_row_groups; + // Number of rows to skip from the start + size_type _skip_rows = 0; + // Number of rows to read; -1 is all + size_type _num_rows = -1; // Whether to store string data as categorical type bool _convert_strings_to_categories = false; @@ -127,6 +131,20 @@ class parquet_reader_options { return _reader_column_schema; } + /** + * @brief Returns number of rows to skip from the start. + * + * @return Number of rows to skip from the start + */ + [[nodiscard]] size_type get_skip_rows() const { return _skip_rows; } + + /** + * @brief Returns number of rows to read. + * + * @return Number of rows to read + */ + [[nodiscard]] size_type get_num_rows() const { return _num_rows; } + /** * @brief Returns names of column to be read, if set. * @@ -162,6 +180,10 @@ class parquet_reader_options { */ void set_row_groups(std::vector> row_groups) { + if ((!row_groups.empty()) and ((_skip_rows != 0) or (_num_rows != -1))) { + CUDF_FAIL("row_groups can't be set along with skip_rows and num_rows"); + } + _row_groups = std::move(row_groups); } @@ -190,6 +212,34 @@ class parquet_reader_options { _reader_column_schema = std::move(val); } + /** + * @brief Sets number of rows to skip. + * + * @param val Number of rows to skip from start + */ + void set_skip_rows(size_type val) + { + if ((val != 0) and (!_row_groups.empty())) { + CUDF_FAIL("skip_rows can't be set along with a non-empty row_groups"); + } + + _skip_rows = val; + } + + /** + * @brief Sets number of rows to read. + * + * @param val Number of rows to read after skip + */ + void set_num_rows(size_type val) + { + if ((val != -1) and (!_row_groups.empty())) { + CUDF_FAIL("num_rows can't be set along with a non-empty row_groups"); + } + + _num_rows = val; + } + /** * @brief Sets timestamp_type used to cast timestamp columns. * @@ -279,6 +329,30 @@ class parquet_reader_options_builder { return *this; } + /** + * @brief Sets number of rows to skip. 
+ * + * @param val Number of rows to skip from start + * @return this for chaining + */ + parquet_reader_options_builder& skip_rows(size_type val) + { + options.set_skip_rows(val); + return *this; + } + + /** + * @brief Sets number of rows to read. + * + * @param val Number of rows to read after skip + * @return this for chaining + */ + parquet_reader_options_builder& num_rows(size_type val) + { + options.set_num_rows(val); + return *this; + } + /** * @brief timestamp_type used to cast timestamp columns. * diff --git a/cpp/include/cudf/lists/detail/dremel.hpp b/cpp/include/cudf/lists/detail/dremel.hpp index 4ddad4177be..4e3aeec2499 100644 --- a/cpp/include/cudf/lists/detail/dremel.hpp +++ b/cpp/include/cudf/lists/detail/dremel.hpp @@ -28,15 +28,11 @@ namespace cudf::detail { * @see the `dremel_data` struct for more info. */ struct dremel_device_view { - // TODO: These elements are default initializable to support default - // initialization of the object. This is currently exploited to create views - // that will never actually be used. We should consider whether this - // represents a serious issue that should be worked around more robustly. - size_type const* offsets{}; - uint8_t const* rep_levels{}; - uint8_t const* def_levels{}; - size_type const leaf_data_size{}; - uint8_t const max_def_level{}; + size_type const* offsets; + uint8_t const* rep_levels; + uint8_t const* def_levels; + size_type const leaf_data_size; + uint8_t const max_def_level; }; /** diff --git a/cpp/include/cudf/strings/detail/strip.cuh b/cpp/include/cudf/strings/detail/strip.cuh new file mode 100644 index 00000000000..533e76121b5 --- /dev/null +++ b/cpp/include/cudf/strings/detail/strip.cuh @@ -0,0 +1,69 @@ +/* + * Copyright (c) 2020-2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +namespace cudf { +namespace strings { +namespace detail { + +/** + * @brief Strips a specified character from the either or both ends of a string + * + * @param d_str Input string to strip + * @param d_to_strip String containing the character to strip; + * only the first character is used + * @param side Which ends of the input string to strip from + * @return New string excluding the stripped ends + */ +__device__ cudf::string_view strip(cudf::string_view const d_str, + cudf::string_view const d_to_strip, + side_type side = side_type::BOTH) +{ + auto is_strip_character = [d_to_strip](char_utf8 chr) -> bool { + if (d_to_strip.empty()) return chr <= ' '; // whitespace check + for (auto c : d_to_strip) { + if (c == chr) return true; + } + return false; + }; + + auto const left_offset = [&] { + if (side != side_type::LEFT && side != side_type::BOTH) return 0; + for (auto itr = d_str.begin(); itr < d_str.end(); ++itr) { + if (!is_strip_character(*itr)) return itr.byte_offset(); + } + return d_str.size_bytes(); + }(); + + auto const right_offset = [&] { + if (side != side_type::RIGHT && side != side_type::BOTH) return d_str.size_bytes(); + for (auto itr = d_str.end(); itr > d_str.begin(); --itr) { + if (!is_strip_character(*(itr - 1))) return itr.byte_offset(); + } + return 0; + }(); + + auto const bytes = (right_offset > left_offset) ? 
right_offset - left_offset : 0; + return cudf::string_view{d_str.data() + left_offset, bytes}; +} + +} // namespace detail +} // namespace strings +} // namespace cudf diff --git a/cpp/include/cudf/table/experimental/row_operators.cuh b/cpp/include/cudf/table/experimental/row_operators.cuh index aad882fcc9a..af7091fc00c 100644 --- a/cpp/include/cudf/table/experimental/row_operators.cuh +++ b/cpp/include/cudf/table/experimental/row_operators.cuh @@ -22,6 +22,7 @@ #include #include #include +#include #include #include #include @@ -210,6 +211,25 @@ struct sorting_physical_element_comparator { } }; +using optional_dremel_view = thrust::optional; + +// The has_nested_columns template parameter of the device_row_comparator is +// necessary to help the compiler optimize our code. Without it, the list and +// struct view specializations are present in the code paths used for primitive +// types, and the compiler fails to inline this nearly as well resulting in a +// significant performance drop. As a result, there is some minor tension in +// the current design between the presence of this parameter and the way that +// the Dremel data is passed around, first as a +// std::optional> in the +// preprocessed_table/device_row_comparator (which is always valid when +// has_nested_columns and is otherwise invalid) that is then unpacked to a +// thrust::optional at the element_comparator level (which +// is always valid for a list column and otherwise invalid). We cannot use an +// additional template parameter for the element_comparator on a per-column +// basis because we cannot conditionally define dremel_device_view member +// variables without jumping through extra hoops with inheritance, so the +// thrust::optional member must be an optional rather than +// a raw dremel_device_view. /** * @brief Computes the lexicographic comparison between 2 rows. 
* @@ -230,7 +250,8 @@ struct sorting_physical_element_comparator { * rather than logical elements, defaults to `NaN` aware relational comparator that evaluates `NaN` * as greater than all other values. */ -template class device_row_comparator { friend class self_comparator; ///< Allow self_comparator to access private members @@ -256,12 +277,16 @@ class device_row_comparator { device_row_comparator(Nullate check_nulls, table_device_view lhs, table_device_view rhs, + device_span l_dremel_device_views, + device_span r_dremel_device_views, std::optional> depth = std::nullopt, std::optional> column_order = std::nullopt, std::optional> null_precedence = std::nullopt, PhysicalElementComparator comparator = {}) noexcept : _lhs{lhs}, _rhs{rhs}, + _l_dremel(l_dremel_device_views), + _r_dremel(r_dremel_device_views), _check_nulls{check_nulls}, _depth{depth}, _column_order{column_order}, @@ -292,14 +317,18 @@ class device_row_comparator { __device__ element_comparator(Nullate check_nulls, column_device_view lhs, column_device_view rhs, - null_order null_precedence = null_order::BEFORE, - int depth = 0, - PhysicalElementComparator comparator = {}) + null_order null_precedence = null_order::BEFORE, + int depth = 0, + PhysicalElementComparator comparator = {}, + optional_dremel_view l_dremel_device_view = {}, + optional_dremel_view r_dremel_device_view = {}) : _lhs{lhs}, _rhs{rhs}, _check_nulls{check_nulls}, _null_precedence{null_precedence}, _depth{depth}, + _l_dremel_device_view{l_dremel_device_view}, + _r_dremel_device_view{r_dremel_device_view}, _comparator{comparator} { } @@ -333,14 +362,15 @@ class device_row_comparator { template () and - not std::is_same_v)> + (not has_nested_columns or not cudf::is_nested()))> __device__ cuda::std::pair operator()(size_type const, size_type const) const noexcept { CUDF_UNREACHABLE("Attempted to compare elements of uncomparable types."); } - template )> + template )> __device__ cuda::std::pair operator()( size_type const 
lhs_element_index, size_type const rhs_element_index) const noexcept { @@ -373,12 +403,103 @@ class device_row_comparator { rhs_element_index); } + template )> + __device__ cuda::std::pair operator()(size_type lhs_element_index, + size_type rhs_element_index) + { + // These are all the values from the Dremel encoding. + auto const l_max_def_level = _l_dremel_device_view->max_def_level; + auto const l_def_levels = _l_dremel_device_view->def_levels; + auto const r_def_levels = _r_dremel_device_view->def_levels; + auto const l_rep_levels = _l_dremel_device_view->rep_levels; + auto const r_rep_levels = _r_dremel_device_view->rep_levels; + + // Traverse the nested list hierarchy to get a column device view + // pointing to the underlying child data. + column_device_view lcol = _lhs.slice(lhs_element_index, 1); + column_device_view rcol = _rhs.slice(rhs_element_index, 1); + while (lcol.type().id() == type_id::LIST) { + lcol = detail::lists_column_device_view(lcol).get_sliced_child(); + rcol = detail::lists_column_device_view(rcol).get_sliced_child(); + } + + // These start and end values indicate the start and end points of all + // the elements of the lists in the current list element + // (`[lhs|rhs]_element_index`) that we are comparing. + auto const l_offsets = _l_dremel_device_view->offsets; + auto const r_offsets = _r_dremel_device_view->offsets; + auto l_start = l_offsets[lhs_element_index]; + auto l_end = l_offsets[lhs_element_index + 1]; + auto r_start = r_offsets[rhs_element_index]; + auto r_end = r_offsets[rhs_element_index + 1]; + + // This comparator will be used to compare leaf (non-nested) data types. + auto comparator = + element_comparator{_check_nulls, lcol, rcol, _null_precedence, _depth, _comparator}; + + // Loop over each element in the encoding. Note that this includes nulls + // and empty lists, so not every index corresponds to an actual element + // in the child column. 
The element_index is used to keep track of the current + // child element that we're actually comparing. + weak_ordering state{weak_ordering::EQUIVALENT}; + for (int l_dremel_index = l_start, r_dremel_index = r_start, element_index = 0; + l_dremel_index < l_end and r_dremel_index < r_end; + ++l_dremel_index, ++r_dremel_index) { + // First early exit: the definition levels do not match. + if (l_def_levels[l_dremel_index] != r_def_levels[r_dremel_index]) { + state = (l_def_levels[l_dremel_index] < r_def_levels[r_dremel_index]) + ? weak_ordering::LESS + : weak_ordering::GREATER; + return cuda::std::pair(state, _depth); + } + + // Second early exit: the repetition levels do not match. + if (l_rep_levels[l_dremel_index] != r_rep_levels[r_dremel_index]) { + state = (l_rep_levels[l_dremel_index] < r_rep_levels[r_dremel_index]) + ? weak_ordering::LESS + : weak_ordering::GREATER; + return cuda::std::pair(state, _depth); + } + + // Third early exit: This case has two branches. + // 1) If we are at the maximum definition level, then we actually have + // an underlying element to compare, not just an empty list or a + // null. Therefore, we access the element_index element of each list + // and compare the values. + // 2) If we are one level below the maximum definition level and the + // column is nullable, the current element must be a null in the + // leaf data. In this case we ignore the null and skip to the next + // element. 
+ if (l_def_levels[l_dremel_index] == l_max_def_level) { + int last_null_depth = _depth; + cuda::std::tie(state, last_null_depth) = cudf::type_dispatcher( + lcol.type(), comparator, element_index, element_index); + if (state != weak_ordering::EQUIVALENT) { return cuda::std::pair(state, _depth); } + ++element_index; + } else if (lcol.nullable() and l_def_levels[l_dremel_index] == l_max_def_level - 1) { + ++element_index; + } + } + + // If we have reached this stage, we know that definition levels, + // repetition levels, and actual elements are identical in both list + // columns up to the `min(l_end - l_start, r_end - r_start)` element of + // the Dremel encoding. However, two lists can only compare equivalent if + // they are of the same length. Otherwise, the shorter of the two is less + // than the longer. This final check determines the appropriate resulting + // ordering by checking how many total elements each list is composed of. + return cuda::std::pair(detail::compare_elements(l_end - l_start, r_end - r_start), _depth); + } + private: column_device_view const _lhs; column_device_view const _rhs; Nullate const _check_nulls; null_order const _null_precedence; int const _depth; + optional_dremel_view _l_dremel_device_view; + optional_dremel_view _r_dremel_device_view; PhysicalElementComparator const _comparator; }; @@ -396,6 +517,7 @@ class device_row_comparator { size_type const rhs_index) const noexcept { int last_null_depth = std::numeric_limits::max(); + size_type list_column_index{0}; for (size_type i = 0; i < _lhs.num_columns(); ++i) { int const depth = _depth.has_value() ? (*_depth)[i] : 0; if (depth > last_null_depth) { continue; } @@ -406,13 +528,30 @@ class device_row_comparator { null_order const null_precedence = _null_precedence.has_value() ? (*_null_precedence)[i] : null_order::BEFORE; + // TODO: At what point do we verify that the columns of lhs and rhs are + // all of the same types? 
I assume that it's already happened before + // here, otherwise the current code would be failing. + auto [l_dremel_i, r_dremel_i] = [&]() { + if (_lhs.column(i).type().id() == type_id::LIST) { + auto idx = list_column_index++; + return std::make_tuple(optional_dremel_view(_l_dremel[idx]), + optional_dremel_view(_r_dremel[idx])); + } else { + return std::make_tuple(optional_dremel_view{}, optional_dremel_view{}); + } + }(); + auto element_comp = element_comparator{_check_nulls, + _lhs.column(i), + _rhs.column(i), + null_precedence, + depth, + _comparator, + l_dremel_i, + r_dremel_i}; + weak_ordering state; - cuda::std::tie(state, last_null_depth) = cudf::type_dispatcher( - _lhs.column(i).type(), - element_comparator{ - _check_nulls, _lhs.column(i), _rhs.column(i), null_precedence, depth, _comparator}, - lhs_index, - rhs_index); + cuda::std::tie(state, last_null_depth) = + cudf::type_dispatcher(_lhs.column(i).type(), element_comp, lhs_index, rhs_index); if (state == weak_ordering::EQUIVALENT) { continue; } @@ -426,6 +565,8 @@ class device_row_comparator { private: table_device_view const _lhs; table_device_view const _rhs; + device_span const _l_dremel; + device_span const _r_dremel; Nullate const _check_nulls; std::optional> const _depth; std::optional> const _column_order; @@ -534,6 +675,41 @@ struct preprocessed_table { friend class self_comparator; ///< Allow self_comparator to access private members friend class two_table_comparator; ///< Allow two_table_comparator to access private members + /** + * @brief Construct a preprocessed table for use with lexicographical comparison + * + * Sets up the table for use with lexicographical comparison. The resulting preprocessed table can + * be passed to the constructor of `lexicographic::self_comparator` to avoid preprocessing again. 
+ * + * @param table The table to preprocess + * @param column_order Optional, device array the same length as a row that indicates the desired + * ascending/descending order of each column in a row. If empty, it is assumed all columns are + * sorted in ascending order. + * @param null_precedence Optional, device array the same length as a row that indicates how null + * values compare to all other values for every column. If it is nullptr, then null precedence would be + * `null_order::BEFORE` for all columns. + * @param depths The depths of each column resulting from decomposing struct columns. + * @param dremel_data The dremel data for each list column. The length of this object is the + * number of list columns in the table. + * @param dremel_device_views Device views into the dremel_data structs contained in the + * `dremel_data` parameter. For columns that are not list columns, this uvector should + * contain an empty `dremel_device_view`. As such, this uvector has as many elements as there are + * columns in the table (unlike the `dremel_data` parameter, which is only as long as the number + * of list columns). 
+ */ + preprocessed_table(table_device_view_owner&& table, + rmm::device_uvector&& column_order, + rmm::device_uvector&& null_precedence, + rmm::device_uvector&& depths, + std::vector&& dremel_data, + rmm::device_uvector&& dremel_device_views) + : _t(std::move(table)), + _column_order(std::move(column_order)), + _null_precedence(std::move(null_precedence)), + _depths(std::move(depths)), + _dremel_data(std::move(dremel_data)), + _dremel_device_views(std::move(dremel_device_views)){}; + preprocessed_table(table_device_view_owner&& table, rmm::device_uvector&& column_order, rmm::device_uvector&& null_precedence, @@ -541,7 +717,9 @@ struct preprocessed_table { : _t(std::move(table)), _column_order(std::move(column_order)), _null_precedence(std::move(null_precedence)), - _depths(std::move(depths)){}; + _depths(std::move(depths)), + _dremel_data{}, + _dremel_device_views{} {}; /** * @brief Implicit conversion operator to a `table_device_view` of the preprocessed table. @@ -590,11 +768,24 @@ struct preprocessed_table { return _depths.size() ? std::optional>(_depths) : std::nullopt; } + [[nodiscard]] device_span dremel_device_views() const + { + if (_dremel_device_views.has_value()) { + return device_span(*_dremel_device_views); + } else { + return {}; + } + } + private: table_device_view_owner const _t; rmm::device_uvector const _column_order; rmm::device_uvector const _null_precedence; rmm::device_uvector const _depths; + + // Dremel encoding of list columns used for the comparison algorithm + std::optional> _dremel_data; + std::optional> _dremel_device_views; }; /** @@ -660,22 +851,42 @@ class self_comparator { * @param comparator Physical element relational comparison functor. * @return A binary callable object. 
*/ - template auto less(Nullate nullate = {}, PhysicalElementComparator comparator = {}) const noexcept { - return less_comparator{device_row_comparator{ - nullate, *d_t, *d_t, d_t->depths(), d_t->column_order(), d_t->null_precedence(), comparator}}; + return less_comparator{ + device_row_comparator{ + nullate, + *d_t, + *d_t, + d_t->dremel_device_views(), + d_t->dremel_device_views(), + d_t->depths(), + d_t->column_order(), + d_t->null_precedence(), + comparator}}; } /// @copydoc less() - template auto less_equivalent(Nullate nullate = {}, PhysicalElementComparator comparator = {}) const noexcept { - return less_equivalent_comparator{device_row_comparator{ - nullate, *d_t, *d_t, d_t->depths(), d_t->column_order(), d_t->null_precedence(), comparator}}; + return less_equivalent_comparator{ + device_row_comparator{ + nullate, + *d_t, + *d_t, + d_t->dremel_device_views(), + d_t->dremel_device_views(), + d_t->depths(), + d_t->column_order(), + d_t->null_precedence(), + comparator}}; } private: @@ -792,34 +1003,42 @@ class two_table_comparator { * @param comparator Physical element relational comparison functor. * @return A binary callable object. 
*/ - template auto less(Nullate nullate = {}, PhysicalElementComparator comparator = {}) const noexcept { - return less_comparator{ - strong_index_comparator_adapter{device_row_comparator{nullate, - *d_left_table, - *d_right_table, - d_left_table->depths(), - d_left_table->column_order(), - d_left_table->null_precedence(), - comparator}}}; + return less_comparator{strong_index_comparator_adapter{ + device_row_comparator{ + nullate, + *d_left_table, + *d_right_table, + d_left_table->dremel_device_views(), + d_right_table->dremel_device_views(), + d_left_table->depths(), + d_left_table->column_order(), + d_left_table->null_precedence(), + comparator}}}; } /// @copydoc less() - template auto less_equivalent(Nullate nullate = {}, PhysicalElementComparator comparator = {}) const noexcept { - return less_equivalent_comparator{ - strong_index_comparator_adapter{device_row_comparator{nullate, - *d_left_table, - *d_right_table, - d_left_table->depths(), - d_left_table->column_order(), - d_left_table->null_precedence(), - comparator}}}; + return less_equivalent_comparator{strong_index_comparator_adapter{ + device_row_comparator{ + nullate, + *d_left_table, + *d_right_table, + d_left_table->dremel_device_views(), + d_right_table->dremel_device_views(), + d_left_table->depths(), + d_left_table->column_order(), + d_left_table->null_precedence(), + comparator}}}; } private: diff --git a/cpp/include/cudf/table/table_view.hpp b/cpp/include/cudf/table/table_view.hpp index 4d0aee292f6..8b520714b34 100644 --- a/cpp/include/cudf/table/table_view.hpp +++ b/cpp/include/cudf/table/table_view.hpp @@ -166,6 +166,14 @@ class table_view_base { */ table_view_base& operator=(table_view_base&&) = default; }; + +/** + * @brief Determine if any nested columns exist in a given table. 
+ * + * @param table The input table + * @return Whether nested columns exist in the input table + */ +bool has_nested_columns(table_view const& table); } // namespace detail /** diff --git a/cpp/scripts/gdb-pretty-printers.py b/cpp/scripts/gdb-pretty-printers.py new file mode 100644 index 00000000000..ebb56a8c9e6 --- /dev/null +++ b/cpp/scripts/gdb-pretty-printers.py @@ -0,0 +1,84 @@ +# Copyright (c) 2022, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import gdb + +global_locals = locals() +if not all( + name in global_locals + for name in ( + "HostIterator", + "DeviceIterator", + "is_template_type_not_alias", + "template_match", + ) +): + raise NameError( + "This file expects the RMM pretty-printers to be loaded already. 
" + "Either load them manually, or use the generated load-pretty-printers " + "script in the build directory" + ) + + +class CudfHostSpanPrinter(gdb.printing.PrettyPrinter): + """Print a cudf::host_span""" + + def __init__(self, val): + self.val = val + self.pointer = val["_data"] + self.size = int(val["_size"]) + + def children(self): + return HostIterator(self.pointer, self.size) + + def to_string(self): + return f"{self.val.type} of length {self.size} at {hex(self.pointer)}" + + def display_hint(self): + return "array" + + +class CudfDeviceSpanPrinter(gdb.printing.PrettyPrinter): + """Print a cudf::device_span""" + + def __init__(self, val): + self.val = val + self.pointer = val["_data"] + self.size = int(val["_size"]) + + def children(self): + return DeviceIterator(self.pointer, self.size) + + def to_string(self): + return f"{self.val.type} of length {self.size} at {hex(self.pointer)}" + + def display_hint(self): + return "array" + + +def lookup_cudf_type(val): + if not str(val.type.unqualified()).startswith("cudf::"): + return None + suffix = str(val.type.unqualified())[6:] + if not is_template_type_not_alias(suffix): + return None + if template_match(suffix, "host_span"): + return CudfHostSpanPrinter(val) + if template_match(suffix, "device_span"): + return CudfDeviceSpanPrinter(val) + return None + + +gdb.pretty_printers.append(lookup_cudf_type) diff --git a/cpp/scripts/load-pretty-printers.in b/cpp/scripts/load-pretty-printers.in new file mode 100644 index 00000000000..4c00384c878 --- /dev/null +++ b/cpp/scripts/load-pretty-printers.in @@ -0,0 +1,3 @@ +source @Thrust_SOURCE_DIR@/scripts/gdb-pretty-printers.py +source @rmm_SOURCE_DIR@/scripts/gdb-pretty-printers.py +source @PROJECT_SOURCE_DIR@/scripts/gdb-pretty-printers.py diff --git a/cpp/src/binaryop/compiled/struct_binary_ops.cuh b/cpp/src/binaryop/compiled/struct_binary_ops.cuh index 804b931fa5b..def9ebcef97 100644 --- a/cpp/src/binaryop/compiled/struct_binary_ops.cuh +++ 
b/cpp/src/binaryop/compiled/struct_binary_ops.cuh @@ -93,9 +93,17 @@ void apply_struct_binary_op(mutable_column_view& out, out.end(), device_comparison_functor{optional_iter, is_lhs_scalar, is_rhs_scalar, device_comparator}); }; - is_any_v - ? tabulate_device_operator(table_comparator.less_equivalent(comparator_nulls, comparator)) - : tabulate_device_operator(table_comparator.less(comparator_nulls, comparator)); + if (cudf::detail::has_nested_columns(tlhs) || cudf::detail::has_nested_columns(trhs)) { + is_any_v + ? tabulate_device_operator( + table_comparator.less_equivalent(comparator_nulls, comparator)) + : tabulate_device_operator(table_comparator.less(comparator_nulls, comparator)); + } else { + is_any_v + ? tabulate_device_operator( + table_comparator.less_equivalent(comparator_nulls, comparator)) + : tabulate_device_operator(table_comparator.less(comparator_nulls, comparator)); + } } template fused_concatenate(host_span views, // Allocate output auto const policy = has_nulls ? mask_policy::ALWAYS : mask_policy::NEVER; auto out_col = detail::allocate_like(views.front(), output_size, policy, stream, mr); - out_col->set_null_count(0); // prevent null count from being materialized - auto out_view = out_col->mutable_view(); - auto d_out_view = mutable_column_device_view::create(out_view, stream); + auto out_view = out_col->mutable_view(); + auto d_out_view = mutable_column_device_view::create(out_view, stream); rmm::device_scalar d_valid_count(0, stream); @@ -253,7 +252,11 @@ std::unique_ptr fused_concatenate(host_span views, *d_out_view, d_valid_count.data()); - if (has_nulls) { out_col->set_null_count(output_size - d_valid_count.value(stream)); } + if (has_nulls) { + out_col->set_null_count(output_size - d_valid_count.value(stream)); + } else { + out_col->set_null_count(0); // prevent null count from being materialized + } return out_col; } @@ -273,8 +276,7 @@ std::unique_ptr for_each_concatenate(host_span views, auto const policy = has_nulls ? 
mask_policy::ALWAYS : mask_policy::NEVER; auto col = cudf::detail::allocate_like(views.front(), total_element_count, policy, stream, mr); - col->set_null_count(0); // prevent null count from being materialized... - auto m_view = col->mutable_view(); // ...when we take a mutable view + auto m_view = col->mutable_view(); auto count = 0; for (auto& v : views) { @@ -285,6 +287,8 @@ std::unique_ptr for_each_concatenate(host_span views, // If concatenated column is nullable, proceed to calculate it if (has_nulls) { cudf::detail::concatenate_masks(views, (col->mutable_view()).null_mask(), stream); + } else { + col->set_null_count(0); // prevent null count from being materialized } return col; diff --git a/cpp/src/datetime/datetime_ops.cu b/cpp/src/datetime/datetime_ops.cu index 7a20ba8d9d4..ee026d6c395 100644 --- a/cpp/src/datetime/datetime_ops.cu +++ b/cpp/src/datetime/datetime_ops.cu @@ -259,6 +259,8 @@ struct dispatch_round { output->mutable_view().begin(), RoundingDispatcher{round_kind, component}); + output->set_null_count(column.null_count()); + return output; } diff --git a/cpp/src/filling/fill.cu b/cpp/src/filling/fill.cu index f7a5ac5bf26..2abb0cf9795 100644 --- a/cpp/src/filling/fill.cu +++ b/cpp/src/filling/fill.cu @@ -159,7 +159,7 @@ std::unique_ptr out_of_place_fill_range_dispatch::operator()(input, stream, mr); auto mview = result->mutable_view(); cudf::detail::set_null_mask(mview.null_mask(), begin, end, false, stream); - mview.set_null_count(input.null_count() + (end - begin)); + result->set_null_count(input.null_count() + (end - begin)); return result; } diff --git a/cpp/src/groupby/sort/group_correlation.cu b/cpp/src/groupby/sort/group_correlation.cu index 35eaf7fa0de..651c5235210 100644 --- a/cpp/src/groupby/sort/group_correlation.cu +++ b/cpp/src/groupby/sort/group_correlation.cu @@ -206,6 +206,9 @@ std::unique_ptr group_correlation(column_view const& covariance, [] __device__(auto const covariance, auto const stddev) { return covariance / 
thrust::get<0>(stddev) / thrust::get<1>(stddev); }); + + result->set_null_count(covariance.null_count()); + return result; } diff --git a/cpp/src/io/parquet/page_data.cu b/cpp/src/io/parquet/page_data.cu index 424882f45bf..531733a7df7 100644 --- a/cpp/src/io/parquet/page_data.cu +++ b/cpp/src/io/parquet/page_data.cu @@ -56,13 +56,15 @@ struct page_state_s { const uint8_t* data_start; const uint8_t* data_end; const uint8_t* lvl_end; - const uint8_t* dict_base; // ptr to dictionary page data - int32_t dict_size; // size of dictionary data - int32_t num_rows; // Rows in page to decode - int32_t num_input_values; // total # of input/level values in the page - int32_t dtype_len; // Output data type length - int32_t dtype_len_in; // Can be larger than dtype_len if truncating 32-bit into 8-bit - int32_t dict_bits; // # of bits to store dictionary indices + const uint8_t* dict_base; // ptr to dictionary page data + int32_t dict_size; // size of dictionary data + int32_t first_row; // First row in page to output + int32_t num_rows; // Rows in page to decode (including rows to be skipped) + int32_t first_output_value; // First value in page to output + int32_t num_input_values; // total # of input/level values in the page + int32_t dtype_len; // Output data type length + int32_t dtype_len_in; // Can be larger than dtype_len if truncating 32-bit into 8-bit + int32_t dict_bits; // # of bits to store dictionary indices uint32_t dict_run; int32_t dict_val; uint32_t initial_rle_run[NUM_LEVEL_TYPES]; // [def,rep] @@ -88,6 +90,7 @@ struct page_state_s { uint32_t def[non_zero_buffer_size]; // circular buffer of definition level values const uint8_t* lvl_start[NUM_LEVEL_TYPES]; // [def,rep] int32_t lvl_count[NUM_LEVEL_TYPES]; // how many of each of the streams we've decoded + int32_t row_index_lower_bound; // lower bound of row indices we should process }; /** @@ -811,14 +814,17 @@ static __device__ void gpuOutputGeneric(volatile page_state_s* s, * @param[in, out] s The local page 
state to be filled in * @param[in] p The global page to be copied from * @param[in] chunks The global list of chunks - * @param[in] num_rows Maximum number of rows to process + * @param[in] num_rows Maximum number of rows to read + * @param[in] min_row Crop all rows below min_row */ static __device__ bool setupLocalPageInfo(page_state_s* const s, PageInfo const* p, device_span chunks, + size_t min_row, size_t num_rows) { - int const t = threadIdx.x; + int t = threadIdx.x; + int chunk_idx; // Fetch page info if (t == 0) s->page = *p; @@ -826,7 +832,7 @@ static __device__ bool setupLocalPageInfo(page_state_s* const s, if (s->page.flags & PAGEINFO_FLAGS_DICTIONARY) { return false; } // Fetch column chunk info - int const chunk_idx = s->page.chunk_idx; + chunk_idx = s->page.chunk_idx; if (t == 0) { s->col = chunks[chunk_idx]; } // zero nested value and valid counts @@ -847,18 +853,19 @@ static __device__ bool setupLocalPageInfo(page_state_s* const s, // our starting row (absolute index) is // col.start_row == absolute row index // page.chunk-row == relative row index within the chunk - size_t const page_start_row = s->col.start_row + s->page.chunk_row; + size_t page_start_row = s->col.start_row + s->page.chunk_row; // IMPORTANT : nested schemas can have 0 rows in a page but still have // values. The case is: // - On page N-1, the last row starts, with 2/6 values encoded // - On page N, the remaining 4/6 values are encoded, but there are no new rows. 
+ // if (s->page.num_input_values > 0 && s->page.num_rows > 0) { if (s->page.num_input_values > 0) { - uint8_t const* cur = s->page.page_data; - uint8_t const* const end = cur + s->page.uncompressed_page_size; + uint8_t* cur = s->page.page_data; + uint8_t* end = cur + s->page.uncompressed_page_size; - uint32_t const dtype_len_out = s->col.data_type >> 3; - s->ts_scale = 0; + uint32_t dtype_len_out = s->col.data_type >> 3; + s->ts_scale = 0; // Validate data type auto const data_type = s->col.data_type & 7; switch (data_type) { @@ -907,10 +914,17 @@ static __device__ bool setupLocalPageInfo(page_state_s* const s, s->dtype_len = 8; // Convert to 64-bit timestamp } + // first row within the page to output + if (page_start_row >= min_row) { + s->first_row = 0; + } else { + s->first_row = (int32_t)min(min_row - page_start_row, (size_t)s->page.num_rows); + } // # of rows within the page to output s->num_rows = s->page.num_rows; - if (page_start_row + s->num_rows > num_rows) { - s->num_rows = (int32_t)max((int64_t)(num_rows - page_start_row), INT64_C(0)); + if ((page_start_row + s->first_row) + s->num_rows > min_row + num_rows) { + s->num_rows = + (int32_t)max((int64_t)(min_row + num_rows - (page_start_row + s->first_row)), INT64_C(0)); } // during the decoding step we need to offset the global output buffers @@ -919,18 +933,25 @@ static __device__ bool setupLocalPageInfo(page_state_s* const s, // - for flat schemas, we can do this directly by using row counts // - for nested schemas, these offsets are computed during the preprocess step if (s->col.column_data_base != nullptr) { - int const max_depth = s->col.max_nesting_depth; + int max_depth = s->col.max_nesting_depth; for (int idx = 0; idx < max_depth; idx++) { PageNestingInfo* pni = &s->page.nesting[idx]; - size_t const output_offset = - s->col.max_level[level_type::REPETITION] == 0 ? 
page_start_row : pni->page_start_value; + size_t output_offset; + // schemas without lists + if (s->col.max_level[level_type::REPETITION] == 0) { + output_offset = page_start_row >= min_row ? page_start_row - min_row : 0; + } + // for schemas with lists, we've already got the exact value precomputed + else { + output_offset = pni->page_start_value; + } pni->data_out = static_cast(s->col.column_data_base[idx]); if (pni->data_out != nullptr) { // anything below max depth with a valid data pointer must be a list, so the // element size is the size of the offset type. - uint32_t const len = idx < max_depth - 1 ? sizeof(cudf::size_type) : s->dtype_len; + uint32_t len = idx < max_depth - 1 ? sizeof(cudf::size_type) : s->dtype_len; pni->data_out += (output_offset * len); } pni->valid_map = s->col.valid_map_base[idx]; @@ -940,6 +961,7 @@ static __device__ bool setupLocalPageInfo(page_state_s* const s, } } } + s->first_output_value = 0; // Find the compressed size of repetition levels cur += InitLevelSection(s, cur, end, level_type::REPETITION); @@ -992,9 +1014,53 @@ static __device__ bool setupLocalPageInfo(page_state_s* const s, s->dict_pos = 0; s->src_pos = 0; - s->input_value_count = 0; - s->input_row_count = 0; - s->input_leaf_count = 0; + // for flat hierarchies, we can't know how many leaf values to skip unless we do a full + // preprocess of the definition levels (since nulls will have no actual decodable value, there + // is no direct correlation between # of rows and # of decodable values). so we will start + // processing at the beginning of the value stream and disregard any indices that start + // before the first row. 
+ if (s->col.max_level[level_type::REPETITION] == 0) { + s->page.skipped_values = 0; + s->page.skipped_leaf_values = 0; + s->input_value_count = 0; + s->input_row_count = 0; + + s->row_index_lower_bound = -1; + } + // for nested hierarchies, we have run a preprocess that lets us skip directly to the values + // we need to start decoding at + else { + // input_row_count translates to "how many rows we have processed so far", so since we are + // skipping directly to where we want to start decoding, set it to first_row + s->input_row_count = s->first_row; + + // return the lower bound to compare (page-relative) thread row index against. Explanation: + // In the case of nested schemas, rows can span page boundaries. That is to say, + // we can encounter the first value for row X on page M, but the last value for page M + // might not be the last value for row X. page M+1 (or further) may contain the last value. + // + // This means that the first values we encounter for a given page (M+1) may not belong to the + // row indicated by chunk_row, but to the row before it that spanned page boundaries. If that + // previous row is within the overall row bounds, include the values by allowing relative row + // index -1 + int const max_row = (min_row + num_rows) - 1; + if (min_row < page_start_row && max_row >= page_start_row - 1) { + s->row_index_lower_bound = -1; + } else { + s->row_index_lower_bound = s->first_row; + } + + // if we're in the decoding step, jump directly to the first + // value we care about + if (s->col.column_data_base != nullptr) { + s->input_value_count = s->page.skipped_values > -1 ? 
s->page.skipped_values : 0; + } else { + s->input_value_count = 0; + s->input_leaf_count = 0; + s->page.skipped_values = -1; + s->page.skipped_leaf_values = -1; + } + } __threadfence_block(); } @@ -1140,7 +1206,10 @@ static __device__ void gpuUpdateValidityOffsetsAndRowIndices(int32_t target_inpu input_row_count + ((__popc(warp_row_count_mask & ((1 << t) - 1)) + is_new_row) - 1); input_row_count += __popc(warp_row_count_mask); // is this thread within read row bounds? - int const in_row_bounds = thread_row_index < s->num_rows; + int const in_row_bounds = thread_row_index >= s->row_index_lower_bound && + thread_row_index < (s->first_row + s->num_rows) + ? 1 + : 0; // compute warp and thread value counts uint32_t const warp_count_mask = @@ -1215,7 +1284,9 @@ static __device__ void gpuUpdateValidityOffsetsAndRowIndices(int32_t target_inpu // the correct position to start reading. since we are about to write the validity vector here // we need to adjust our computed mask to take into account the write row bounds. int const in_write_row_bounds = - max_depth == 1 ? thread_row_index < s->num_rows : in_row_bounds; + max_depth == 1 + ? thread_row_index >= s->first_row && thread_row_index < (s->first_row + s->num_rows) + : in_row_bounds; int const first_thread_in_write_range = max_depth == 1 ? __ffs(ballot(in_write_row_bounds)) - 1 : 0; // # of bits to of the validity mask to write out @@ -1303,11 +1374,16 @@ __device__ void gpuDecodeLevels(page_state_s* s, int32_t target_leaf_count, int * @param[in] s The local page info * @param[in] target_input_value_count The # of repetition/definition levels to process up to * @param[in] t Thread index + * @param[in] bounds_set Whether or not s->row_index_lower_bound, s->first_row and s->num_rows + * have been computed for this page (they will only be set in the second/trim pass). 
*/ -static __device__ void gpuUpdatePageSizes(page_state_s* s, int32_t target_input_value_count, int t) +static __device__ void gpuUpdatePageSizes(page_state_s* s, + int32_t target_input_value_count, + int t, + bool bounds_set) { // max nesting depth of the column - int const max_depth = s->col.max_nesting_depth; + int max_depth = s->col.max_nesting_depth; // bool has_repetition = s->col.max_level[level_type::REPETITION] > 0 ? true : false; // how many input level values we've processed in the page so far int input_value_count = s->input_value_count; @@ -1322,23 +1398,44 @@ static __device__ void gpuUpdatePageSizes(page_state_s* s, int32_t target_input_ start_depth, end_depth, d, s, input_value_count, target_input_value_count, t); // count rows and leaf values - int const is_new_row = start_depth == 0 ? 1 : 0; - uint32_t const warp_row_count_mask = ballot(is_new_row); - int const is_new_leaf = (d >= s->page.nesting[max_depth - 1].max_def_level) ? 1 : 0; - uint32_t const warp_leaf_count_mask = ballot(is_new_leaf); - - // is this thread within row bounds? - int32_t const thread_row_index = - input_row_count + ((__popc(warp_row_count_mask & ((1 << t) - 1)) + is_new_row) - 1); - int const in_row_bounds = thread_row_index < s->num_rows; + int is_new_row = start_depth == 0 ? 1 : 0; + uint32_t warp_row_count_mask = ballot(is_new_row); + int is_new_leaf = (d >= s->page.nesting[max_depth - 1].max_def_level) ? 1 : 0; + uint32_t warp_leaf_count_mask = ballot(is_new_leaf); + + // is this thread within row bounds? on the first pass we don't know the bounds, so we will be + // computing the full size of the column. on the second pass, we will know our actual row + // bounds, so the computation will cap sizes properly. 
+ int in_row_bounds = 1; + if (bounds_set) { + // absolute row index + int32_t thread_row_index = + input_row_count + ((__popc(warp_row_count_mask & ((1 << t) - 1)) + is_new_row) - 1); + in_row_bounds = thread_row_index >= s->row_index_lower_bound && + thread_row_index < (s->first_row + s->num_rows) + ? 1 + : 0; + + uint32_t row_bounds_mask = ballot(in_row_bounds); + int first_thread_in_range = __ffs(row_bounds_mask) - 1; + + // if we've found the beginning of the first row, mark down the position + // in the def/repetition buffer (skipped_values) and the data buffer (skipped_leaf_values) + if (!t && first_thread_in_range >= 0 && s->page.skipped_values < 0) { + // how many values we've skipped in the rep/def levels + s->page.skipped_values = input_value_count + first_thread_in_range; + // how many values we've skipped in the actual data stream + s->page.skipped_leaf_values = + input_leaf_count + __popc(warp_leaf_count_mask & ((1 << first_thread_in_range) - 1)); + } + } // increment counts across all nesting depths for (int s_idx = 0; s_idx < max_depth; s_idx++) { // if we are within the range of nesting levels we should be adding value indices for - int const in_nesting_bounds = - (s_idx >= start_depth && s_idx <= end_depth && in_row_bounds) ? 1 : 0; + int in_nesting_bounds = (s_idx >= start_depth && s_idx <= end_depth && in_row_bounds) ? 1 : 0; - uint32_t const count_mask = ballot(in_nesting_bounds); + uint32_t count_mask = ballot(in_nesting_bounds); if (!t) { s->page.nesting[s_idx].size += __popc(count_mask); } } @@ -1362,18 +1459,29 @@ static __device__ void gpuUpdatePageSizes(page_state_s* s, int32_t target_input_ * * @param pages List of pages * @param chunks List of column chunks + * @param min_row Row index to start reading at + * @param num_rows Maximum number of rows to read. Pass as INT_MAX to guarantee reading all rows. + * @param trim_pass Whether or not this is the trim pass. 
We first have to compute + * the full size information of every page before we come through in a second (trim) pass + * to determine what subset of rows in this page we should be reading. */ __global__ void __launch_bounds__(block_size) - gpuComputePageSizes(PageInfo* pages, device_span chunks) + gpuComputePageSizes(PageInfo* pages, + device_span chunks, + size_t min_row, + size_t num_rows, + bool trim_pass) { __shared__ __align__(16) page_state_s state_g; page_state_s* const s = &state_g; - int const page_idx = blockIdx.x; - int const t = threadIdx.x; - PageInfo* const pp = &pages[page_idx]; + int page_idx = blockIdx.x; + int t = threadIdx.x; + PageInfo* pp = &pages[page_idx]; - if (!setupLocalPageInfo(s, pp, chunks, INT_MAX)) { return; } + if (!setupLocalPageInfo(s, pp, chunks, trim_pass ? min_row : 0, trim_pass ? num_rows : INT_MAX)) { + return; + } // zero sizes int d = 0; @@ -1382,12 +1490,21 @@ __global__ void __launch_bounds__(block_size) d += blockDim.x; } if (!t) { - s->input_row_count = 0; - s->input_value_count = 0; + s->page.skipped_values = -1; + s->page.skipped_leaf_values = -1; + s->input_row_count = 0; + s->input_value_count = 0; + + // if this isn't the trim pass, make sure we visit absolutely everything + if (!trim_pass) { + s->first_row = 0; + s->num_rows = INT_MAX; + s->row_index_lower_bound = -1; + } } __syncthreads(); - bool const has_repetition = s->col.max_level[level_type::REPETITION] > 0; + bool has_repetition = s->col.max_level[level_type::REPETITION] > 0; // optimization : it might be useful to have a version of gpuDecodeStream that could go wider than // 1 warp. Currently it only uses 1 warp so that it can overlap work with the value decoding step @@ -1406,18 +1523,22 @@ __global__ void __launch_bounds__(block_size) __syncwarp(); // we may have decoded different amounts from each stream, so only process what we've been - int const actual_input_count = has_repetition ? 
min(s->lvl_count[level_type::REPETITION], - s->lvl_count[level_type::DEFINITION]) - : s->lvl_count[level_type::DEFINITION]; + int actual_input_count = has_repetition ? min(s->lvl_count[level_type::REPETITION], + s->lvl_count[level_type::DEFINITION]) + : s->lvl_count[level_type::DEFINITION]; // process what we got back - gpuUpdatePageSizes(s, actual_input_count, t); + gpuUpdatePageSizes(s, actual_input_count, t, trim_pass); target_input_count = actual_input_count + batch_size; __syncwarp(); } } // update # rows in the actual page - if (!t) { pp->num_rows = s->page.nesting[0].size; } + if (!t) { + pp->num_rows = s->page.nesting[0].size; + pp->skipped_values = s->page.skipped_values; + pp->skipped_leaf_values = s->page.skipped_leaf_values; + } } /** @@ -1430,19 +1551,20 @@ __global__ void __launch_bounds__(block_size) * * @param pages List of pages * @param chunks List of column chunks + * @param min_row Row index to start reading at * @param num_rows Maximum number of rows to read */ -__global__ void __launch_bounds__(block_size) - gpuDecodePageData(PageInfo* pages, device_span chunks, size_t num_rows) +__global__ void __launch_bounds__(block_size) gpuDecodePageData( + PageInfo* pages, device_span chunks, size_t min_row, size_t num_rows) { __shared__ __align__(16) page_state_s state_g; page_state_s* const s = &state_g; - int const page_idx = blockIdx.x; - int const t = threadIdx.x; + int page_idx = blockIdx.x; + int t = threadIdx.x; int out_thread0; - if (!setupLocalPageInfo(s, &pages[page_idx], chunks, num_rows)) { return; } + if (!setupLocalPageInfo(s, &pages[page_idx], chunks, min_row, num_rows)) { return; } if (s->dict_base) { out_thread0 = (s->dict_bits > 0) ? 64 : 32; @@ -1451,6 +1573,8 @@ __global__ void __launch_bounds__(block_size) ((s->col.data_type & 7) == BOOLEAN || (s->col.data_type & 7) == BYTE_ARRAY) ? 64 : 32; } + // skipped_leaf_values will always be 0 for flat hierarchies. 
+ uint32_t skipped_leaf_values = s->page.skipped_leaf_values; while (!s->error && (s->input_value_count < s->num_input_values || s->src_pos < s->nz_count)) { int target_pos; int src_pos = s->src_pos; @@ -1470,7 +1594,8 @@ __global__ void __launch_bounds__(block_size) // - produces non-NULL value indices in s->nz_idx for subsequent decoding gpuDecodeLevels(s, target_pos, t); } else if (t < out_thread0) { - uint32_t src_target_pos = target_pos; + // skipped_leaf_values will always be 0 for flat hierarchies. + uint32_t src_target_pos = target_pos + skipped_leaf_values; // WARP1: Decode dictionary indices, booleans or string positions if (s->dict_base) { @@ -1483,51 +1608,70 @@ __global__ void __launch_bounds__(block_size) if (t == 32) { *(volatile int32_t*)&s->dict_pos = src_target_pos; } } else { // WARP1..WARP3: Decode values - int const dtype = s->col.data_type & 7; + int dtype = s->col.data_type & 7; src_pos += t - out_thread0; // the position in the output column/buffer - int const dst_pos = s->nz_idx[rolling_index(src_pos)]; + int dst_pos = s->nz_idx[rolling_index(src_pos)]; + + // for the flat hierarchy case we will be reading from the beginning of the value stream, + // regardless of the value of first_row. so adjust our destination offset accordingly. + // example: + // - user has passed skip_rows = 2, so our first_row to output is 2 + // - the row values we get from nz_idx will be + // 0, 1, 2, 3, 4 .... + // - by shifting these values by first_row, the sequence becomes + // -2, -1, 0, 1, 2 ... + // - so we will end up ignoring the first two input rows, and input rows 2..n will + // get written to the output starting at position 0. + // + if (s->col.max_nesting_depth == 1) { dst_pos -= s->first_row; } // target_pos will always be properly bounded by num_rows, but dst_pos may be negative (values // before first_row) in the flat hierarchy case. if (src_pos < target_pos && dst_pos >= 0) { + // src_pos represents the logical row position we want to read from. 
But in the case of + // nested hierarchies, there is no 1:1 mapping of rows to values. So our true read position + // has to take into account the # of values we have to skip in the page to get to the + // desired logical row. For flat hierarchies, skipped_leaf_values will always be 0. + uint32_t val_src_pos = src_pos + skipped_leaf_values; + // nesting level that is storing actual leaf values - int const leaf_level_index = s->col.max_nesting_depth - 1; + int leaf_level_index = s->col.max_nesting_depth - 1; - uint32_t const dtype_len = s->dtype_len; + uint32_t dtype_len = s->dtype_len; void* dst = s->page.nesting[leaf_level_index].data_out + static_cast(dst_pos) * dtype_len; if (dtype == BYTE_ARRAY) { - gpuOutputString(s, src_pos, dst); + gpuOutputString(s, val_src_pos, dst); } else if (dtype == BOOLEAN) { - gpuOutputBoolean(s, src_pos, static_cast(dst)); + gpuOutputBoolean(s, val_src_pos, static_cast(dst)); } else if (s->col.converted_type == DECIMAL) { switch (dtype) { - case INT32: gpuOutputFast(s, src_pos, static_cast(dst)); break; - case INT64: gpuOutputFast(s, src_pos, static_cast(dst)); break; + case INT32: gpuOutputFast(s, val_src_pos, static_cast(dst)); break; + case INT64: gpuOutputFast(s, val_src_pos, static_cast(dst)); break; default: if (s->dtype_len_in <= sizeof(int32_t)) { - gpuOutputFixedLenByteArrayAsInt(s, src_pos, static_cast(dst)); + gpuOutputFixedLenByteArrayAsInt(s, val_src_pos, static_cast(dst)); } else if (s->dtype_len_in <= sizeof(int64_t)) { - gpuOutputFixedLenByteArrayAsInt(s, src_pos, static_cast(dst)); + gpuOutputFixedLenByteArrayAsInt(s, val_src_pos, static_cast(dst)); } else { - gpuOutputFixedLenByteArrayAsInt(s, src_pos, static_cast<__int128_t*>(dst)); + gpuOutputFixedLenByteArrayAsInt(s, val_src_pos, static_cast<__int128_t*>(dst)); } break; } } else if (dtype == INT96) { - gpuOutputInt96Timestamp(s, src_pos, static_cast(dst)); + gpuOutputInt96Timestamp(s, val_src_pos, static_cast(dst)); } else if (dtype_len == 8) { if (s->ts_scale) 
{ + gpuOutputInt64Timestamp(s, val_src_pos, static_cast(dst)); } else { - gpuOutputFast(s, src_pos, static_cast(dst)); + gpuOutputFast(s, val_src_pos, static_cast(dst)); } } else if (dtype_len == 4) { - gpuOutputFast(s, src_pos, static_cast(dst)); + gpuOutputFast(s, val_src_pos, static_cast(dst)); } else { - gpuOutputGeneric(s, src_pos, static_cast(dst), dtype_len); + gpuOutputGeneric(s, val_src_pos, static_cast(dst), dtype_len); } } @@ -1598,6 +1742,8 @@ void PreprocessColumnData(hostdevice_vector& pages, std::vector& input_columns, std::vector& output_columns, size_t num_rows, + size_t min_row, + bool uses_custom_row_bounds, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr) { @@ -1606,7 +1752,16 @@ void PreprocessColumnData(hostdevice_vector& pages, // computes: // PageNestingInfo::size for each level of nesting, for each page. - gpuComputePageSizes<<>>(pages.device_ptr(), chunks); + // This computes the size for the entire page, not taking row bounds into account. + // If uses_custom_row_bounds is set to true, we have to do a second pass later that "trims" + // the starting and ending read values to account for these bounds. + gpuComputePageSizes<<>>( + pages.device_ptr(), + chunks, + // if uses_custom_row_bounds is false, include all possible rows. + uses_custom_row_bounds ? min_row : 0, + uses_custom_row_bounds ? num_rows : INT_MAX, + !uses_custom_row_bounds); // computes: // PageInfo::chunk_row for all pages @@ -1620,6 +1775,16 @@ void PreprocessColumnData(hostdevice_vector& pages, page_input, chunk_row_output_iter{pages.device_ptr()}); + // computes: + // PageNestingInfo::size for each level of nesting, for each page, taking row bounds into account. + // PageInfo::skipped_values, which tells us where to start decoding in the input. + // It is only necessary to do this second pass if uses_custom_row_bounds is set (if the user has + // specified artificial bounds). 
+ if (uses_custom_row_bounds) { + gpuComputePageSizes<<>>( + pages.device_ptr(), chunks, min_row, num_rows, true); + } + // ordering of pages is by input column schema, repeated across row groups. so // if we had 3 columns, each with 2 pages, and 1 row group, our schema values might look like // @@ -1684,11 +1849,13 @@ void PreprocessColumnData(hostdevice_vector& pages, // Handle a specific corner case. It is possible to construct a parquet file such that // a column within a row group contains more rows than the row group itself. This may be // invalid, but we have seen instances of this in the wild, including how they were created - // using the apache parquet tools. So we need to cap the number of rows we will - // allocate/read from the file with the amount specified in the associated row group. This - // only applies to columns that are not children of lists as those may have an arbitrary - // number of rows in them. - if (!(out_buf.user_data & PARQUET_COLUMN_BUFFER_FLAG_HAS_LIST_PARENT) && + // using the apache parquet tools. Normally, the trim pass would handle this case quietly, + // but if we are not running the trim pass (which is most of the time) we need to cap the + // number of rows we will allocate/read from the file with the amount specified in the + // associated row group. This only applies to columns that are not children of lists as + // those may have an arbitrary number of rows in them. 
+ if (!uses_custom_row_bounds && + !(out_buf.user_data & PARQUET_COLUMN_BUFFER_FLAG_HAS_LIST_PARENT) && size > static_cast(num_rows)) { size = static_cast(num_rows); } @@ -1723,13 +1890,14 @@ void PreprocessColumnData(hostdevice_vector& pages, void __host__ DecodePageData(hostdevice_vector& pages, hostdevice_vector const& chunks, size_t num_rows, + size_t min_row, rmm::cuda_stream_view stream) { dim3 dim_block(block_size, 1); dim3 dim_grid(pages.size(), 1); // 1 threadblock per page gpuDecodePageData<<>>( - pages.device_ptr(), chunks, num_rows); + pages.device_ptr(), chunks, min_row, num_rows); } } // namespace gpu diff --git a/cpp/src/io/parquet/parquet_gpu.hpp b/cpp/src/io/parquet/parquet_gpu.hpp index ba8a5b0be4a..d0d367df962 100644 --- a/cpp/src/io/parquet/parquet_gpu.hpp +++ b/cpp/src/io/parquet/parquet_gpu.hpp @@ -135,6 +135,19 @@ struct PageInfo { Encoding definition_level_encoding; // Encoding used for definition levels (data page) Encoding repetition_level_encoding; // Encoding used for repetition levels (data page) + // for nested types, we run a preprocess step in order to determine output + // column sizes. Because of this, we can jump directly to the position in the + // input data to start decoding instead of reading all of the data and discarding + // rows we don't care about. + // + // NOTE: for flat hierarchies we do not do the preprocess step, so skipped_values and + // skipped_leaf_values will always be 0. + // + // # of values skipped in the repetition/definition level stream + int skipped_values; + // # of values skipped in the actual data stream. 
+ int skipped_leaf_values; + // nesting information (input/output) for each page int num_nesting_levels; PageNestingInfo* nesting; @@ -407,6 +420,9 @@ void BuildStringDictionaryIndex(ColumnChunkDesc* chunks, * @param input_columns Input column information * @param output_columns Output column information * @param num_rows Maximum number of rows to read + * @param min_rows crop all rows below min_row + * @param uses_custom_row_bounds Whether or not num_rows and min_rows represents user-specific + * bounds * @param stream Cuda stream */ void PreprocessColumnData(hostdevice_vector& pages, @@ -414,6 +430,8 @@ void PreprocessColumnData(hostdevice_vector& pages, std::vector& input_columns, std::vector& output_columns, size_t num_rows, + size_t min_row, + bool uses_custom_row_bounds, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr); @@ -426,11 +444,13 @@ void PreprocessColumnData(hostdevice_vector& pages, * @param[in,out] pages All pages to be decoded * @param[in] chunks All chunks to be decoded * @param[in] num_rows Total number of rows to read + * @param[in] min_row Minimum number of rows to read * @param[in] stream CUDA stream to use, default 0 */ void DecodePageData(hostdevice_vector& pages, hostdevice_vector const& chunks, size_t num_rows, + size_t min_row, rmm::cuda_stream_view stream); /** diff --git a/cpp/src/io/parquet/reader_impl.cu b/cpp/src/io/parquet/reader_impl.cu index d2598c79fda..59bef6f5600 100644 --- a/cpp/src/io/parquet/reader_impl.cu +++ b/cpp/src/io/parquet/reader_impl.cu @@ -542,14 +542,15 @@ class aggregate_reader_metadata { * @brief Filters and reduces down to a selection of row groups * * @param row_groups Lists of row groups to read, one per source + * @param row_start Starting row of the selection + * @param row_count Total number of rows selected * - * @return List of row group info structs and the total number of rows + * @return List of row group indexes and its starting row */ - [[nodiscard]] std::pair, size_type> 
select_row_groups( - std::vector> const& row_groups) const + [[nodiscard]] auto select_row_groups(std::vector> const& row_groups, + size_type& row_start, + size_type& row_count) const { - size_type row_count = 0; - if (!row_groups.empty()) { std::vector selection; CUDF_EXPECTS(row_groups.size() == per_file_metadata.size(), @@ -566,12 +567,17 @@ class aggregate_reader_metadata { row_count += get_row_group(rowgroup_idx, src_idx).num_rows; } } - return {selection, row_count}; + return selection; } - row_count = static_cast( - std::min(get_num_rows(), std::numeric_limits::max())); + row_start = std::max(row_start, 0); + if (row_count < 0) { + row_count = static_cast( + std::min(get_num_rows(), std::numeric_limits::max())); + } + row_count = min(row_count, get_num_rows() - row_start); CUDF_EXPECTS(row_count >= 0, "Invalid row count"); + CUDF_EXPECTS(row_start <= get_num_rows(), "Invalid row start"); std::vector selection; size_type count = 0; @@ -579,12 +585,14 @@ class aggregate_reader_metadata { for (size_t rg_idx = 0; rg_idx < per_file_metadata[src_idx].row_groups.size(); ++rg_idx) { auto const chunk_start_row = count; count += get_row_group(rg_idx, src_idx).num_rows; - selection.emplace_back(rg_idx, chunk_start_row, src_idx); - if (count >= row_count) { break; } + if (count > row_start || count == 0) { + selection.emplace_back(rg_idx, chunk_start_row, src_idx); + } + if (count >= row_start + row_count) { break; } } } - return {selection, row_count}; + return selection; } /** @@ -1343,7 +1351,9 @@ void reader::impl::allocate_nesting_info(hostdevice_vector */ void reader::impl::preprocess_columns(hostdevice_vector& chunks, hostdevice_vector& pages, - size_t num_rows, + size_t min_row, + size_t total_rows, + bool uses_custom_row_bounds, bool has_lists) { // TODO : we should be selectively preprocessing only columns that have @@ -1356,15 +1366,22 @@ void reader::impl::preprocess_columns(hostdevice_vector& c [&](std::vector& cols) { for (size_t idx = 0; idx < 
cols.size(); idx++) { auto& col = cols[idx]; - col.create(num_rows, _stream, _mr); + col.create(total_rows, _stream, _mr); create_columns(col.children); } }; create_columns(_output_columns); } else { // preprocess per-nesting level sizes by page - gpu::PreprocessColumnData( - pages, chunks, _input_columns, _output_columns, num_rows, _stream, _mr); + gpu::PreprocessColumnData(pages, + chunks, + _input_columns, + _output_columns, + total_rows, + min_row, + uses_custom_row_bounds, + _stream, + _mr); _stream.synchronize(); } } @@ -1375,6 +1392,7 @@ void reader::impl::preprocess_columns(hostdevice_vector& c void reader::impl::decode_page_data(hostdevice_vector& chunks, hostdevice_vector& pages, hostdevice_vector& page_nesting, + size_t min_row, size_t total_rows) { auto is_dict_chunk = [](const gpu::ColumnChunkDesc& chunk) { @@ -1496,7 +1514,7 @@ void reader::impl::decode_page_data(hostdevice_vector& chu gpu::BuildStringDictionaryIndex(chunks.device_ptr(), chunks.size(), _stream); } - gpu::DecodePageData(pages, chunks, total_rows, _stream); + gpu::DecodePageData(pages, chunks, total_rows, min_row, _stream); pages.device_to_host(_stream); page_nesting.device_to_host(_stream); _stream.synchronize(); @@ -1588,10 +1606,14 @@ reader::impl::impl(std::vector>&& sources, _timestamp_type.id()); } -table_with_metadata reader::impl::read(std::vector> const& row_group_list) +table_with_metadata reader::impl::read(size_type skip_rows, + size_type num_rows, + bool uses_custom_row_bounds, + std::vector> const& row_group_list) { // Select only row groups required - const auto [selected_row_groups, num_rows] = _metadata->select_row_groups(row_group_list); + const auto selected_row_groups = + _metadata->select_row_groups(row_group_list, skip_rows, num_rows); table_metadata out_metadata; @@ -1733,10 +1755,10 @@ table_with_metadata reader::impl::read(std::vector> const // // - for nested schemas, output buffer offset values per-page, per nesting-level for the // purposes of decoding. 
- preprocess_columns(chunks, pages, num_rows, has_lists); + preprocess_columns(chunks, pages, skip_rows, num_rows, uses_custom_row_bounds, has_lists); // decoding of column data itself - decode_page_data(chunks, pages, page_nesting_info, num_rows); + decode_page_data(chunks, pages, page_nesting_info, skip_rows, num_rows); // create the final output cudf columns for (size_t i = 0; i < _output_columns.size(); ++i) { @@ -1787,7 +1809,12 @@ reader::~reader() = default; // Forward to implementation table_with_metadata reader::read(parquet_reader_options const& options) { - return _impl->read(options.get_row_groups()); + // if the user has specified custom row bounds + bool const uses_custom_row_bounds = options.get_num_rows() >= 0 || options.get_skip_rows() != 0; + return _impl->read(options.get_skip_rows(), + options.get_num_rows(), + uses_custom_row_bounds, + options.get_row_groups()); } } // namespace parquet diff --git a/cpp/src/io/parquet/reader_impl.hpp b/cpp/src/io/parquet/reader_impl.hpp index b46fe042a13..e1f275bb8e8 100644 --- a/cpp/src/io/parquet/reader_impl.hpp +++ b/cpp/src/io/parquet/reader_impl.hpp @@ -69,11 +69,18 @@ class reader::impl { /** * @brief Read an entire set or a subset of data and returns a set of columns * + * @param skip_rows Number of rows to skip from the start + * @param num_rows Number of rows to read + * @param uses_custom_row_bounds Whether or not num_rows and min_rows represents user-specific + * bounds * @param row_group_indices Lists of row groups to read, one per source * * @return The set of columns along with metadata */ - table_with_metadata read(std::vector> const& row_group_indices); + table_with_metadata read(size_type skip_rows, + size_type num_rows, + bool uses_custom_row_bounds, + std::vector> const& row_group_indices); private: /** @@ -152,13 +159,18 @@ class reader::impl { * * @param chunks All chunks to be decoded * @param pages All pages to be decoded - * @param num_rows The number of rows to be decoded + * @param 
min_rows crop all rows below min_row + * @param total_rows Maximum number of rows to read + * @param uses_custom_row_bounds Whether or not num_rows and min_rows represents user-specific + * bounds * @param has_lists Whether or not this data contains lists and requires * a preprocess. */ void preprocess_columns(hostdevice_vector& chunks, hostdevice_vector& pages, - size_t num_rows, + size_t min_row, + size_t total_rows, + bool uses_custom_row_bounds, bool has_lists); /** @@ -167,11 +179,13 @@ class reader::impl { * @param chunks List of column chunk descriptors * @param pages List of page information * @param page_nesting Page nesting array + * @param min_row Minimum number of rows from start * @param total_rows Number of rows to output */ void decode_page_data(hostdevice_vector& chunks, hostdevice_vector& pages, hostdevice_vector& page_nesting, + size_t min_row, size_t total_rows); private: diff --git a/cpp/src/io/utilities/data_sink.cpp b/cpp/src/io/utilities/data_sink.cpp index 042afc01253..cba45f693f9 100644 --- a/cpp/src/io/utilities/data_sink.cpp +++ b/cpp/src/io/utilities/data_sink.cpp @@ -88,8 +88,7 @@ class file_sink : public data_sink { void device_write(void const* gpu_data, size_t size, rmm::cuda_stream_view stream) override { - if (!supports_device_write()) CUDF_FAIL("Device writes are not supported for this file."); - return device_write_async(gpu_data, _bytes_written, stream).get(); + return device_write_async(gpu_data, size, stream).get(); } private: diff --git a/cpp/src/lists/copying/scatter_helper.cu b/cpp/src/lists/copying/scatter_helper.cu index d8c481e38a9..cbb3aec76c5 100644 --- a/cpp/src/lists/copying/scatter_helper.cu +++ b/cpp/src/lists/copying/scatter_helper.cu @@ -203,6 +203,8 @@ struct list_child_constructor { return actual_list_row.template element(intra_index); }); + child_column->set_null_count(child_null_mask.second); + return child_column; } diff --git a/cpp/src/lists/set_operations.cu b/cpp/src/lists/set_operations.cu index 
865760618c8..00cdfcf7ff1 100644 --- a/cpp/src/lists/set_operations.cu +++ b/cpp/src/lists/set_operations.cu @@ -119,6 +119,8 @@ std::unique_ptr have_overlap(lists_column_view const& lhs, list_indices.begin(), result_begin); + result->set_null_count(null_count); + return result; } diff --git a/cpp/src/reductions/simple_segmented.cuh b/cpp/src/reductions/simple_segmented.cuh index 774b9075736..321ff0e7410 100644 --- a/cpp/src/reductions/simple_segmented.cuh +++ b/cpp/src/reductions/simple_segmented.cuh @@ -113,11 +113,11 @@ std::unique_ptr simple_segmented_reduction( } // Compute the output null mask - auto const bitmask = col.null_mask(); - auto const first_bit_indices_begin = offsets.begin(); - auto const first_bit_indices_end = offsets.end() - 1; - auto const last_bit_indices_begin = first_bit_indices_begin + 1; - auto const [output_null_mask, output_null_count] = cudf::detail::segmented_null_mask_reduction( + auto const bitmask = col.null_mask(); + auto const first_bit_indices_begin = offsets.begin(); + auto const first_bit_indices_end = offsets.end() - 1; + auto const last_bit_indices_begin = first_bit_indices_begin + 1; + auto [output_null_mask, output_null_count] = cudf::detail::segmented_null_mask_reduction( bitmask, first_bit_indices_begin, first_bit_indices_end, @@ -126,7 +126,7 @@ std::unique_ptr simple_segmented_reduction( init.has_value() ? 
std::optional(init.value().get().is_valid()) : std::nullopt, stream, mr); - result->set_null_mask(output_null_mask, output_null_count, stream); + result->set_null_mask(std::move(output_null_mask), output_null_count); return result; } @@ -187,7 +187,7 @@ std::unique_ptr string_segmented_reduction(column_view const& col, stream, mr) ->release()[0]); - auto const [segmented_null_mask, segmented_null_count] = + auto [segmented_null_mask, segmented_null_count] = cudf::detail::segmented_null_mask_reduction(col.null_mask(), offsets.begin(), offsets.end() - 1, @@ -202,7 +202,7 @@ std::unique_ptr string_segmented_reduction(column_view const& col, if (segmented_null_count > 0) { if (result->null_count() == 0) { // The result has no nulls. Use the segmented null mask. - result->set_null_mask(segmented_null_mask, segmented_null_count, stream); + result->set_null_mask(std::move(segmented_null_mask), segmented_null_count); } else { // Compute the logical AND of the segmented output null mask and the // result null mask to update the result null mask and null count. diff --git a/cpp/src/replace/nans.cu b/cpp/src/replace/nans.cu index a3cf7bced1f..47776422adb 100644 --- a/cpp/src/replace/nans.cu +++ b/cpp/src/replace/nans.cu @@ -203,6 +203,7 @@ std::unique_ptr normalize_nans_and_zeros(column_view const& input, // from device. unique_ptr which gets automatically cleaned up when we leave. 
auto out_view = out->mutable_view(); normalize_nans_and_zeros(out_view, stream); + out->set_null_count(input.null_count()); return out; } diff --git a/cpp/src/round/round.cu b/cpp/src/round/round.cu index 627ee931427..c60ce7295fb 100644 --- a/cpp/src/round/round.cu +++ b/cpp/src/round/round.cu @@ -228,6 +228,8 @@ std::unique_ptr round_with(column_view const& input, thrust::transform( rmm::exec_policy(stream), input.begin(), input.end(), out_view.begin(), Functor{n}); + result->set_null_count(input.null_count()); + return result; } @@ -277,6 +279,8 @@ std::unique_ptr round_with(column_view const& input, FixedPointRoundFunctor{n}); } + result->set_null_count(input.null_count()); + return result; } diff --git a/cpp/src/search/contains_column.cu b/cpp/src/search/contains_column.cu index 51d265263fb..c7631385270 100644 --- a/cpp/src/search/contains_column.cu +++ b/cpp/src/search/contains_column.cu @@ -86,6 +86,9 @@ struct contains_column_dispatch { out_begin, contains_fn{ haystack_set_dv, *needles_cdv_ptr, needles.has_nulls()}); + + result->set_null_count(needles.null_count()); + return result; } diff --git a/cpp/src/search/search_ordered.cu b/cpp/src/search/search_ordered.cu index e6bcdcedf64..8d3b0f97726 100644 --- a/cpp/src/search/search_ordered.cu +++ b/cpp/src/search/search_ordered.cu @@ -68,28 +68,49 @@ std::unique_ptr search_ordered(table_view const& haystack, auto const comparator = cudf::experimental::row::lexicographic::two_table_comparator( matched_haystack, matched_needles, column_order, null_precedence, stream); - auto const has_nulls = has_nested_nulls(matched_haystack) or has_nested_nulls(matched_needles); - auto const d_comparator = comparator.less(nullate::DYNAMIC{has_nulls}); + auto const has_nulls = has_nested_nulls(matched_haystack) or has_nested_nulls(matched_needles); auto const haystack_it = cudf::experimental::row::lhs_iterator(0); auto const needles_it = cudf::experimental::row::rhs_iterator(0); - if (find_first) { - 
thrust::lower_bound(rmm::exec_policy(stream), - haystack_it, - haystack_it + haystack.num_rows(), - needles_it, - needles_it + needles.num_rows(), - out_it, - d_comparator); + if (cudf::detail::has_nested_columns(haystack) || cudf::detail::has_nested_columns(needles)) { + auto const d_comparator = comparator.less(nullate::DYNAMIC{has_nulls}); + if (find_first) { + thrust::lower_bound(rmm::exec_policy(stream), + haystack_it, + haystack_it + haystack.num_rows(), + needles_it, + needles_it + needles.num_rows(), + out_it, + d_comparator); + } else { + thrust::upper_bound(rmm::exec_policy(stream), + haystack_it, + haystack_it + haystack.num_rows(), + needles_it, + needles_it + needles.num_rows(), + out_it, + d_comparator); + } } else { - thrust::upper_bound(rmm::exec_policy(stream), - haystack_it, - haystack_it + haystack.num_rows(), - needles_it, - needles_it + needles.num_rows(), - out_it, - d_comparator); + auto const d_comparator = comparator.less(nullate::DYNAMIC{has_nulls}); + if (find_first) { + thrust::lower_bound(rmm::exec_policy(stream), + haystack_it, + haystack_it + haystack.num_rows(), + needles_it, + needles_it + needles.num_rows(), + out_it, + d_comparator); + } else { + thrust::upper_bound(rmm::exec_policy(stream), + haystack_it, + haystack_it + haystack.num_rows(), + needles_it, + needles_it + needles.num_rows(), + out_it, + d_comparator); + } } return result; } diff --git a/cpp/src/sort/sort_impl.cuh b/cpp/src/sort/sort_impl.cuh index f98fda307b8..97fc8ac14cb 100644 --- a/cpp/src/sort/sort_impl.cuh +++ b/cpp/src/sort/sort_impl.cuh @@ -127,18 +127,32 @@ std::unique_ptr sorted_order(table_view input, auto comp = experimental::row::lexicographic::self_comparator(input, column_order, null_precedence, stream); - auto comparator = comp.less(nullate::DYNAMIC{has_nested_nulls(input)}); - - if (stable) { - thrust::stable_sort(rmm::exec_policy(stream), - mutable_indices_view.begin(), - mutable_indices_view.end(), - comparator); + if 
(cudf::detail::has_nested_columns(input)) { + auto comparator = comp.less(nullate::DYNAMIC{has_nested_nulls(input)}); + if (stable) { + thrust::stable_sort(rmm::exec_policy(stream), + mutable_indices_view.begin(), + mutable_indices_view.end(), + comparator); + } else { + thrust::sort(rmm::exec_policy(stream), + mutable_indices_view.begin(), + mutable_indices_view.end(), + comparator); + } } else { - thrust::sort(rmm::exec_policy(stream), - mutable_indices_view.begin(), - mutable_indices_view.end(), - comparator); + auto comparator = comp.less(nullate::DYNAMIC{has_nested_nulls(input)}); + if (stable) { + thrust::stable_sort(rmm::exec_policy(stream), + mutable_indices_view.begin(), + mutable_indices_view.end(), + comparator); + } else { + thrust::sort(rmm::exec_policy(stream), + mutable_indices_view.begin(), + mutable_indices_view.end(), + comparator); + } } // protection for temporary d_column_order and d_null_precedence stream.synchronize(); diff --git a/cpp/src/strings/capitalize.cu b/cpp/src/strings/capitalize.cu index dbe0d277033..4328765773f 100644 --- a/cpp/src/strings/capitalize.cu +++ b/cpp/src/strings/capitalize.cu @@ -278,6 +278,7 @@ std::unique_ptr is_title(strings_column_view const& input, thrust::make_counting_iterator(input.size()), results->mutable_view().data(), is_title_fn{get_character_flags_table(), *d_column}); + results->set_null_count(input.null_count()); return results; } diff --git a/cpp/src/strings/contains.cu b/cpp/src/strings/contains.cu index abe96d5ab34..431e9ae26e3 100644 --- a/cpp/src/strings/contains.cu +++ b/cpp/src/strings/contains.cu @@ -79,6 +79,8 @@ std::unique_ptr contains_impl(strings_column_view const& input, launch_transform_kernel( contains_fn{*d_strings, beginning_only}, *d_prog, d_results, input.size(), stream); + results->set_null_count(input.null_count()); + return results; } diff --git a/cpp/src/strings/like.cu b/cpp/src/strings/like.cu index 82c19796378..2d3a3d3d52a 100644 --- a/cpp/src/strings/like.cu +++ 
b/cpp/src/strings/like.cu @@ -132,6 +132,8 @@ std::unique_ptr like( results->mutable_view().data(), like_fn{*d_strings, d_pattern, d_escape}); + results->set_null_count(input.null_count()); + return results; } diff --git a/cpp/src/strings/regex/regcomp.cpp b/cpp/src/strings/regex/regcomp.cpp index c84b1e630c9..9b1013bae09 100644 --- a/cpp/src/strings/regex/regcomp.cpp +++ b/cpp/src/strings/regex/regcomp.cpp @@ -298,9 +298,6 @@ class regex_parser { if (!is_quoted && chr == '^') { type = NCCLASS; std::tie(is_quoted, chr) = next_char(); - // negated classes also don't match '\n' - literals.push_back('\n'); - literals.push_back('\n'); } // parse class into a set of spans @@ -559,6 +556,9 @@ class regex_parser { // are treated as regex expressions and sometimes they are not. if (_items.empty()) { CUDF_FAIL("invalid regex pattern: nothing to repeat at position 0"); } + // handle alternation instruction + if (chr == '|') return OR; + // Check that the previous item can be used with quantifiers. // If the previous item is a capture group, we need to check items inside the // capture group can be used with quantifiers too. @@ -679,7 +679,6 @@ class regex_parser { // otherwise, fixed counted quantifier return COUNTED; } - case '|': return OR; } _chr = chr; return CHAR; diff --git a/cpp/src/strings/strip.cu b/cpp/src/strings/strip.cu index 8f9794f6679..5d51a5a7bed 100644 --- a/cpp/src/strings/strip.cu +++ b/cpp/src/strings/strip.cu @@ -18,6 +18,7 @@ #include #include #include +#include #include #include #include @@ -28,11 +29,6 @@ #include #include -#include -#include -#include -#include - namespace cudf { namespace strings { namespace detail { @@ -59,38 +55,15 @@ struct strip_fn { if (!d_chars) d_offsets[idx] = 0; return; } + auto const d_str = d_strings.element(idx); - auto is_strip_character = [d_to_strip = d_to_strip] __device__(char_utf8 chr) -> bool { - return d_to_strip.empty() ? 
(chr <= ' ') : // whitespace check - thrust::any_of( - thrust::seq, d_to_strip.begin(), d_to_strip.end(), [chr] __device__(char_utf8 c) { - return c == chr; - }); - }; - - size_type const left_offset = [&] { - if (side != side_type::LEFT && side != side_type::BOTH) return 0; - auto const itr = - thrust::find_if_not(thrust::seq, d_str.begin(), d_str.end(), is_strip_character); - return itr != d_str.end() ? itr.byte_offset() : d_str.size_bytes(); - }(); - - size_type right_offset = d_str.size_bytes(); - if (side == side_type::RIGHT || side == side_type::BOTH) { - auto const length = d_str.length(); - auto itr = d_str.end(); - for (size_type n = 0; n < length; ++n) { - if (!is_strip_character(*(--itr))) break; - right_offset = itr.byte_offset(); - } + auto const d_stripped = strip(d_str, d_to_strip, side); + if (d_chars) { + copy_string(d_chars + d_offsets[idx], d_stripped); + } else { + d_offsets[idx] = d_stripped.size_bytes(); } - - auto const bytes = (right_offset > left_offset) ? right_offset - left_offset : 0; - if (d_chars) - memcpy(d_chars + d_offsets[idx], d_str.data() + left_offset, bytes); - else - d_offsets[idx] = bytes; } }; diff --git a/cpp/src/table/row_operators.cu b/cpp/src/table/row_operators.cu index af88d6776a4..05e8860d63d 100644 --- a/cpp/src/table/row_operators.cu +++ b/cpp/src/table/row_operators.cu @@ -255,6 +255,24 @@ auto decompose_structs(table_view table, std::move(verticalized_col_depths)); } +/* + * This helper function generates dremel data for any list-type columns in a + * table. This data is necessary for lexicographic comparisons. 
+ */ +auto list_lex_preprocess(table_view table, rmm::cuda_stream_view stream) +{ + std::vector dremel_data; + std::vector dremel_device_views; + for (auto const& col : table) { + if (col.type().id() == type_id::LIST) { + dremel_data.push_back(detail::get_dremel_data(col, {}, false, stream)); + dremel_device_views.push_back(dremel_data.back()); + } + } + auto d_dremel_device_views = detail::make_device_uvector_sync(dremel_device_views, stream); + return std::make_tuple(std::move(dremel_data), std::move(d_dremel_device_views)); +} + using column_checker_fn_t = std::function; /** @@ -264,18 +282,25 @@ using column_checker_fn_t = std::function; */ void check_lex_compatibility(table_view const& input) { - // Basically check if there's any LIST hiding anywhere in the table + // Basically check if there's any LIST of STRUCT or STRUCT of LIST hiding anywhere in the table column_checker_fn_t check_column = [&](column_view const& c) { - CUDF_EXPECTS(c.type().id() != type_id::LIST, - "Cannot lexicographic compare a table with a LIST column"); + if (c.type().id() == type_id::LIST) { + auto const& list_col = lists_column_view(c); + CUDF_EXPECTS(list_col.child().type().id() != type_id::STRUCT, + "Cannot lexicographic compare a table with a LIST of STRUCT column"); + check_column(list_col.child()); + } else if (c.type().id() == type_id::STRUCT) { + for (auto child = c.child_begin(); child < c.child_end(); ++child) { + CUDF_EXPECTS(child->type().id() != type_id::LIST, + "Cannot lexicographic compare a table with a STRUCT of LIST column"); + check_column(*child); + } + } if (not is_nested(c.type())) { CUDF_EXPECTS(is_relationally_comparable(c.type()), "Cannot lexicographic compare a table with a column of type " + jit::get_type_name(c.type())); } - for (auto child = c.child_begin(); child < c.child_end(); ++child) { - check_column(*child); - } }; for (column_view const& c : input) { check_column(c); @@ -336,8 +361,21 @@ std::shared_ptr preprocessed_table::create( auto 
d_null_precedence = detail::make_device_uvector_async(new_null_precedence, stream); auto d_depths = detail::make_device_uvector_async(verticalized_col_depths, stream); - return std::shared_ptr(new preprocessed_table( - std::move(d_t), std::move(d_column_order), std::move(d_null_precedence), std::move(d_depths))); + if (detail::has_nested_columns(t)) { + auto [dremel_data, d_dremel_device_view] = list_lex_preprocess(verticalized_lhs, stream); + return std::shared_ptr( + new preprocessed_table(std::move(d_t), + std::move(d_column_order), + std::move(d_null_precedence), + std::move(d_depths), + std::move(dremel_data), + std::move(d_dremel_device_view))); + } else { + return std::shared_ptr(new preprocessed_table(std::move(d_t), + std::move(d_column_order), + std::move(d_null_precedence), + std::move(d_depths))); + } } two_table_comparator::two_table_comparator(table_view const& left, diff --git a/cpp/src/table/table_view.cpp b/cpp/src/table/table_view.cpp index a413c8fe65b..0d1cabfd4f6 100644 --- a/cpp/src/table/table_view.cpp +++ b/cpp/src/table/table_view.cpp @@ -134,5 +134,11 @@ template bool is_relationally_comparable(table_view const& lhs, tabl template bool is_relationally_comparable(mutable_table_view const& lhs, mutable_table_view const& rhs); +bool has_nested_columns(table_view const& table) +{ + return std::any_of( + table.begin(), table.end(), [](column_view const& col) { return is_nested(col.type()); }); +} + } // namespace detail } // namespace cudf diff --git a/cpp/src/text/stemmer.cu b/cpp/src/text/stemmer.cu index 9995c5103de..cdf87967a0d 100644 --- a/cpp/src/text/stemmer.cu +++ b/cpp/src/text/stemmer.cu @@ -117,6 +117,7 @@ std::unique_ptr is_letter(cudf::strings_column_view const& strings thrust::make_counting_iterator(strings.size()), results->mutable_view().data(), is_letter_fn{*strings_column, ltype, position_itr}); + results->set_null_count(strings.null_count()); return results; } @@ -226,6 +227,7 @@ std::unique_ptr 
porter_stemmer_measure(cudf::strings_column_view c thrust::make_counting_iterator(strings.size()), results->mutable_view().data(), porter_stemmer_measure_fn{*strings_column}); + results->set_null_count(strings.null_count()); return results; } diff --git a/cpp/src/unary/math_ops.cu b/cpp/src/unary/math_ops.cu index 7b17002dcae..448ac01babb 100644 --- a/cpp/src/unary/math_ops.cu +++ b/cpp/src/unary/math_ops.cu @@ -303,6 +303,8 @@ std::unique_ptr unary_op_with(column_view const& input, out_view.begin(), FixedPointUnaryOpFunctor{n}); + result->set_null_count(input.null_count()); + return result; } @@ -327,6 +329,7 @@ std::unique_ptr transform_fn(InputIterator begin, auto output_view = output->mutable_view(); thrust::transform(rmm::exec_policy(stream), begin, end, output_view.begin(), UFN{}); + output->set_null_count(null_count); return output; } diff --git a/cpp/tests/io/parquet_test.cpp b/cpp/tests/io/parquet_test.cpp index b65488e9e43..c5000bc0add 100644 --- a/cpp/tests/io/parquet_test.cpp +++ b/cpp/tests/io/parquet_test.cpp @@ -2483,6 +2483,213 @@ TEST_F(ParquetWriterStressTest, DeviceWriteLargeTableWithValids) CUDF_TEST_EXPECT_TABLES_EQUAL(custom_tbl.tbl->view(), expected->view()); } +TEST_F(ParquetReaderTest, UserBounds) +{ + // trying to read more rows than there are should result in + // receiving the properly capped # of rows + { + srand(31337); + auto expected = create_random_fixed_table(4, 4, false); + + auto filepath = temp_env->get_temp_filepath("TooManyRows.parquet"); + cudf_io::parquet_writer_options args = + cudf_io::parquet_writer_options::builder(cudf_io::sink_info{filepath}, *expected); + cudf_io::write_parquet(args); + + // attempt to read more rows than there actually are + cudf_io::parquet_reader_options read_opts = + cudf_io::parquet_reader_options::builder(cudf_io::source_info{filepath}).num_rows(16); + auto result = cudf_io::read_parquet(read_opts); + + // we should only get back 4 rows + EXPECT_EQ(result.tbl->view().column(0).size(), 4); + } + 
+ // trying to read past the end of the # of actual rows should result + // in empty columns. + { + srand(31337); + auto expected = create_random_fixed_table(4, 4, false); + + auto filepath = temp_env->get_temp_filepath("PastBounds.parquet"); + cudf_io::parquet_writer_options args = + cudf_io::parquet_writer_options::builder(cudf_io::sink_info{filepath}, *expected); + cudf_io::write_parquet(args); + + // attempt to read more rows than there actually are + cudf_io::parquet_reader_options read_opts = + cudf_io::parquet_reader_options::builder(cudf_io::source_info{filepath}).skip_rows(4); + auto result = cudf_io::read_parquet(read_opts); + + // we should get empty columns back + EXPECT_EQ(result.tbl->view().num_columns(), 4); + EXPECT_EQ(result.tbl->view().column(0).size(), 0); + } + + // trying to read 0 rows should result in reading the whole file + // at the moment we get back 4. when that bug gets fixed, this + // test can be flipped. + { + srand(31337); + auto expected = create_random_fixed_table(4, 4, false); + + auto filepath = temp_env->get_temp_filepath("ZeroRows.parquet"); + cudf_io::parquet_writer_options args = + cudf_io::parquet_writer_options::builder(cudf_io::sink_info{filepath}, *expected); + cudf_io::write_parquet(args); + + // attempt to read more rows than there actually are + cudf_io::parquet_reader_options read_opts = + cudf_io::parquet_reader_options::builder(cudf_io::source_info{filepath}).num_rows(0); + auto result = cudf_io::read_parquet(read_opts); + + EXPECT_EQ(result.tbl->view().num_columns(), 4); + EXPECT_EQ(result.tbl->view().column(0).size(), 0); + } + + // trying to read 0 rows past the end of the # of actual rows should result + // in empty columns. 
+ { + srand(31337); + auto expected = create_random_fixed_table(4, 4, false); + + auto filepath = temp_env->get_temp_filepath("ZeroRowsPastBounds.parquet"); + cudf_io::parquet_writer_options args = + cudf_io::parquet_writer_options::builder(cudf_io::sink_info{filepath}, *expected); + cudf_io::write_parquet(args); + + // attempt to read more rows than there actually are + cudf_io::parquet_reader_options read_opts = + cudf_io::parquet_reader_options::builder(cudf_io::source_info{filepath}) + .skip_rows(4) + .num_rows(0); + auto result = cudf_io::read_parquet(read_opts); + + // we should get empty columns back + EXPECT_EQ(result.tbl->view().num_columns(), 4); + EXPECT_EQ(result.tbl->view().column(0).size(), 0); + } +} + +TEST_F(ParquetReaderTest, UserBoundsWithNulls) +{ + // clang-format off + cudf::test::fixed_width_column_wrapper col{{1,1,1,1,1,1,1,1, 2,2,2,2,2,2,2,2, 3,3,3,3,3,3,3,3, 4,4,4,4,4,4,4,4, 5,5,5,5,5,5,5,5, 6,6,6,6,6,6,6,6, 7,7,7,7,7,7,7,7, 8,8,8,8,8,8,8,8} + ,{1,1,1,0,0,0,1,1, 1,1,1,1,1,1,1,1, 0,0,0,0,0,0,0,0, 1,1,1,1,1,1,0,0, 1,0,1,1,1,1,1,1, 1,1,1,1,1,1,1,1, 1,1,1,1,1,1,1,1, 1,1,1,1,1,1,1,0}}; + // clang-format on + cudf::table_view tbl({col}); + auto filepath = temp_env->get_temp_filepath("UserBoundsWithNulls.parquet"); + cudf_io::parquet_writer_options out_args = + cudf_io::parquet_writer_options::builder(cudf_io::sink_info{filepath}, tbl); + cudf_io::write_parquet(out_args); + + // skip_rows / num_rows + // clang-format off + std::vector> params{ {-1, -1}, {1, 3}, {3, -1}, + {31, -1}, {32, -1}, {33, -1}, + {31, 5}, {32, 5}, {33, 5}, + {-1, 7}, {-1, 31}, {-1, 32}, {-1, 33}, + {62, -1}, {63, -1}, + {62, 2}, {63, 1}}; + // clang-format on + for (auto p : params) { + cudf_io::parquet_reader_options read_args = + cudf::io::parquet_reader_options::builder(cudf_io::source_info{filepath}); + if (p.first >= 0) { read_args.set_skip_rows(p.first); } + if (p.second >= 0) { read_args.set_num_rows(p.second); } + auto result = cudf_io::read_parquet(read_args); + + 
p.first = p.first < 0 ? 0 : p.first; + p.second = p.second < 0 ? static_cast(col).size() - p.first : p.second; + std::vector slice_indices{p.first, p.first + p.second}; + auto expected = cudf::slice(col, slice_indices); + + CUDF_TEST_EXPECT_COLUMNS_EQUAL(result.tbl->get_column(0), expected[0]); + } +} + +TEST_F(ParquetReaderTest, UserBoundsWithNullsLarge) +{ + constexpr int num_rows = 30 * 1000000; + + std::mt19937 gen(6747); + std::bernoulli_distribution bn(0.7f); + auto valids = + cudf::detail::make_counting_transform_iterator(0, [&](int index) { return bn(gen); }); + auto values = thrust::make_counting_iterator(0); + + cudf::test::fixed_width_column_wrapper col(values, values + num_rows, valids); + + // this file will have row groups of 1,000,000 each + cudf::table_view tbl({col}); + auto filepath = temp_env->get_temp_filepath("UserBoundsWithNullsLarge.parquet"); + cudf_io::parquet_writer_options out_args = + cudf_io::parquet_writer_options::builder(cudf_io::sink_info{filepath}, tbl); + cudf_io::write_parquet(out_args); + + // skip_rows / num_rows + // clang-format off + std::vector> params{ {-1, -1}, {31, -1}, {32, -1}, {33, -1}, {1613470, -1}, {1999999, -1}, + {31, 1}, {32, 1}, {33, 1}, + // deliberately span some row group boundaries + {999000, 1001}, {999000, 2000}, {2999999, 2}, {13999997, -1}, + {16785678, 3}, {22996176, 31}, + {24001231, 17}, {29000001, 989999}, {29999999, 1} }; + // clang-format on + for (auto p : params) { + cudf_io::parquet_reader_options read_args = + cudf::io::parquet_reader_options::builder(cudf_io::source_info{filepath}); + if (p.first >= 0) { read_args.set_skip_rows(p.first); } + if (p.second >= 0) { read_args.set_num_rows(p.second); } + auto result = cudf_io::read_parquet(read_args); + + p.first = p.first < 0 ? 0 : p.first; + p.second = p.second < 0 ? 
static_cast(col).size() - p.first : p.second; + std::vector slice_indices{p.first, p.first + p.second}; + auto expected = cudf::slice(col, slice_indices); + + CUDF_TEST_EXPECT_COLUMNS_EQUAL(result.tbl->get_column(0), expected[0]); + } +} + +TEST_F(ParquetReaderTest, ListUserBoundsWithNullsLarge) +{ + constexpr int num_rows = 5 * 1000000; + auto colp = make_parquet_list_col(0, num_rows, 5, 8, true); + cudf::column_view col = *colp; + + // this file will have row groups of 1,000,000 each + cudf::table_view tbl({col}); + auto filepath = temp_env->get_temp_filepath("ListUserBoundsWithNullsLarge.parquet"); + cudf_io::parquet_writer_options out_args = + cudf_io::parquet_writer_options::builder(cudf_io::sink_info{filepath}, tbl); + cudf_io::write_parquet(out_args); + + // skip_rows / num_rows + // clang-format off + std::vector> params{ {-1, -1}, {31, -1}, {32, -1}, {33, -1}, {161470, -1}, {4499997, -1}, + {31, 1}, {32, 1}, {33, 1}, + // deliberately span some row group boundaries + {999000, 1001}, {999000, 2000}, {2999999, 2}, + {1678567, 3}, {4299676, 31}, + {4001231, 17}, {1900000, 989999}, {4999999, 1} }; + // clang-format on + for (auto p : params) { + cudf_io::parquet_reader_options read_args = + cudf::io::parquet_reader_options::builder(cudf_io::source_info{filepath}); + if (p.first >= 0) { read_args.set_skip_rows(p.first); } + if (p.second >= 0) { read_args.set_num_rows(p.second); } + auto result = cudf_io::read_parquet(read_args); + + p.first = p.first < 0 ? 0 : p.first; + p.second = p.second < 0 ? 
static_cast(col).size() - p.first : p.second; + std::vector slice_indices{p.first, p.first + p.second}; + auto expected = cudf::slice(col, slice_indices); + + CUDF_TEST_EXPECT_COLUMNS_EQUAL(result.tbl->get_column(0), expected[0]); + } +} + TEST_F(ParquetReaderTest, ReorderedColumns) { { diff --git a/cpp/tests/rolling/collect_ops_test.cpp b/cpp/tests/rolling/collect_ops_test.cpp index a0af8f150e3..9dc13b2f9f7 100644 --- a/cpp/tests/rolling/collect_ops_test.cpp +++ b/cpp/tests/rolling/collect_ops_test.cpp @@ -2275,10 +2275,23 @@ TEST_F(CollectSetTest, ListTypeRollingWindow) auto const prev_column = fixed_width_column_wrapper{1, 2, 2, 2, 2}; auto const foll_column = fixed_width_column_wrapper{1, 1, 1, 1, 0}; - EXPECT_THROW(rolling_collect_set(input_column, - prev_column, - foll_column, - 1, - *make_collect_set_aggregation()), - cudf::logic_error); + auto const expected = [] { + auto data = fixed_width_column_wrapper{1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 6, 4, 5, + 6, 7, 8, 9, 6, 7, 8, 9, 10, 7, 8, 9, 10}; + auto inner_offsets = + fixed_width_column_wrapper{0, 3, 5, 8, 10, 11, 13, 14, 17, 18, 21, 22, 25, 26}; + auto outer_offsets = fixed_width_column_wrapper{0, 2, 5, 8, 11, 13}; + + auto inner_list = cudf::make_lists_column(13, inner_offsets.release(), data.release(), 0, {}); + + return cudf::make_lists_column(5, outer_offsets.release(), std::move(inner_list), 0, {}); + }(); + + auto const result = rolling_collect_set(input_column, + prev_column, + foll_column, + 1, + *make_collect_set_aggregation()); + + CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT(expected->view(), result->view()); } diff --git a/cpp/tests/sort/sort_test.cpp b/cpp/tests/sort/sort_test.cpp index 1dd7e21b821..4092597d8e3 100644 --- a/cpp/tests/sort/sort_test.cpp +++ b/cpp/tests/sort/sort_test.cpp @@ -17,6 +17,7 @@ #include #include #include +#include #include #include @@ -740,16 +741,100 @@ TYPED_TEST(Sort, ZeroSizedColumns) TYPED_TEST(Sort, WithListColumn) { - using T = int; - lists_column_wrapper lc{{1, 2, 3}, {4, 5, 
6}, {7, 8, 9}}; - CUDF_EXPECT_THROW_MESSAGE(cudf::sort(table_view({lc})), - "Cannot lexicographic compare a table with a LIST column"); - - std::vector> child_cols; - child_cols.push_back(lc.release()); - structs_column_wrapper sc{std::move(child_cols), {1, 0, 1}}; - CUDF_EXPECT_THROW_MESSAGE(cudf::sort(table_view({sc})), - "Cannot lexicographic compare a table with a LIST column"); + using T = TypeParam; + if (std::is_same_v) { GTEST_SKIP(); } + + using lcw = cudf::test::lists_column_wrapper; + lcw col{ + {{1, 2, 3}, {}, {4, 5}, {}, {0, 6, 0}}, + {{1, 2, 3}, {}, {4, 5}, {}, {0, 6, 0}}, + {{1, 2, 3}, {}, {4, 5}, {0, 6, 0}}, + {{1, 2}, {3}, {4, 5}, {0, 6, 0}}, + {{7, 8}, {}}, + lcw{lcw{}, lcw{}, lcw{}}, + lcw{lcw{}}, + {lcw{10}}, + lcw{}, + }; + + auto expect = cudf::test::fixed_width_column_wrapper{8, 6, 5, 3, 0, 1, 2, 4, 7}; + auto result = cudf::sorted_order(cudf::table_view({col})); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(expect, *result); +} + +TYPED_TEST(Sort, WithNullableListColumn) +{ + using T = TypeParam; + if (std::is_same_v) { GTEST_SKIP(); } + + using lcw = cudf::test::lists_column_wrapper; + using cudf::test::iterators::nulls_at; + lcw col{ + {{1, 2, 3}, {}, {4, 5}, {}, {0, 6, 0}}, // 0 + {{{1, 2, 3}, {}, {4, 5}, {}, {0, 6, 0}}, nulls_at({3})}, // 1 + {{1, 2, 3}, {}, {4, 5}, {0, 6, 0}}, // 2 + {{1, 2}, {3}, {4, 5}, {0, 6, 0}}, // 3 + {{1, 2}, {3}, {4, 5}, {{0, 6, 0}, nulls_at({0})}}, // 4 + {{7, 8}, {}}, // 5 + lcw{lcw{}, lcw{}, lcw{}}, // 6 + lcw{lcw{}}, // 7 + {lcw{10}}, // 8 + lcw{}, // 9 + {{1, 2}, {3}, {4, 5}, {{0, 6, 0}, nulls_at({0, 2})}}, // 10 + {{1, 2}, {3}, {4, 5}, {{0, 7}, nulls_at({0})}}, // 11 + }; + + auto expect = + cudf::test::fixed_width_column_wrapper{9, 7, 6, 10, 4, 11, 3, 1, 0, 2, 5, 8}; + auto result = cudf::sorted_order(cudf::table_view({col})); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(expect, *result); +} + +TYPED_TEST(Sort, WithSlicedListColumn) +{ + using T = TypeParam; + if (std::is_same_v) { GTEST_SKIP(); } + + using lcw = 
cudf::test::lists_column_wrapper; + using cudf::test::iterators::nulls_at; + lcw col{ + {{1, 2, 3}, {}, {4, 5}, {}, {0, 6, 0}}, // + {{{1, 2, 3}, {}, {4, 5}, {}, {0, 6, 0}}, nulls_at({3})}, // 0 + {{1, 2, 3}, {}, {4, 5}, {0, 6, 0}}, // 1 + {{1, 2}, {3}, {4, 5}, {0, 6, 0}}, // 2 + {{1, 2}, {3}, {4, 5}, {{0, 6, 0}, nulls_at({0})}}, // 3 + {{7, 8}, {}}, // 4 + lcw{lcw{}, lcw{}, lcw{}}, // 5 + lcw{lcw{}}, // 6 + {lcw{10}}, // 7 + lcw{}, // 8 + {{1, 2}, {3}, {4, 5}, {{0, 6, 0}, nulls_at({0, 2})}}, // 9 + {{1, 2}, {3}, {4, 5}, {{0, 7}, nulls_at({0})}}, // + }; + + auto sliced_col = cudf::slice(col, {1, 10}); + + auto expect = cudf::test::fixed_width_column_wrapper{8, 6, 5, 3, 2, 0, 1, 4, 7}; + auto result = cudf::sorted_order(cudf::table_view({sliced_col})); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(expect, *result); +} + +TYPED_TEST(Sort, WithEmptyListColumn) +{ + using T = TypeParam; + if (std::is_same_v) { GTEST_SKIP(); } + + auto L1 = cudf::make_lists_column(0, + cudf::make_empty_column(cudf::data_type(cudf::type_id::INT32)), + cudf::make_empty_column(cudf::data_type{cudf::type_id::INT64}), + 0, + {}); + auto L0 = cudf::make_lists_column( + 3, cudf::test::fixed_width_column_wrapper{0, 0, 0, 0}.release(), std::move(L1), 0, {}); + + auto expect = cudf::test::fixed_width_column_wrapper{0, 1, 2}; + auto result = cudf::sorted_order(cudf::table_view({*L0})); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(expect, *result); } struct SortByKey : public BaseFixture { diff --git a/cpp/tests/strings/contains_tests.cpp b/cpp/tests/strings/contains_tests.cpp index 960ccaec274..9ca4fbb6cb7 100644 --- a/cpp/tests/strings/contains_tests.cpp +++ b/cpp/tests/strings/contains_tests.cpp @@ -460,6 +460,23 @@ TEST_F(StringsContainsTests, OverlappedClasses) } } +TEST_F(StringsContainsTests, NegatedClasses) +{ + auto input = cudf::test::strings_column_wrapper({"abcdefg", "def\tghí", "", "éeé\néeé", "ABC"}); + auto sv = cudf::strings_column_view(input); + + { + auto results = cudf::strings::count_re(sv, "[^a-f]"); 
+ cudf::test::fixed_width_column_wrapper expected({1, 4, 0, 5, 3}); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(*results, expected); + } + { + auto results = cudf::strings::count_re(sv, "[^a-eá-é]"); + cudf::test::fixed_width_column_wrapper expected({2, 5, 0, 1, 3}); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(*results, expected); + } +} + TEST_F(StringsContainsTests, IncompleteClassesRange) { auto input = cudf::test::strings_column_wrapper({"abc-def", "---", "", "ghijkl", "-wxyz-"}); diff --git a/cpp/tests/strings/replace_regex_tests.cpp b/cpp/tests/strings/replace_regex_tests.cpp index eb15745e473..79d968b14ad 100644 --- a/cpp/tests/strings/replace_regex_tests.cpp +++ b/cpp/tests/strings/replace_regex_tests.cpp @@ -162,6 +162,20 @@ TEST_F(StringsReplaceRegexTest, WordBoundary) CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT(*results, expected); } +TEST_F(StringsReplaceRegexTest, Alternation) +{ + cudf::test::strings_column_wrapper input( + {"16 6 brr 232323 1 hello 90", "123 ABC 00 2022", "abé123 4567 89xyz"}); + auto results = cudf::strings::replace_re( + cudf::strings_column_view(input), "(^|\\s)\\d+(\\s|$)", cudf::string_scalar("_")); + auto expected = + cudf::test::strings_column_wrapper({"__ brr __ hello _", "_ABC_2022", "abé123 _ 89xyz"}); + CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT(*results, expected); + results = cudf::strings::replace_re( + cudf::strings_column_view(input), "(\\s|^)\\d+($|\\s)", cudf::string_scalar("_")); + CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT(*results, expected); +} + TEST_F(StringsReplaceRegexTest, ZeroLengthMatch) { cudf::test::strings_column_wrapper input({"DD", "zéz", "DsDs", ""}); diff --git a/cpp/tests/table/experimental_row_operator_tests.cu b/cpp/tests/table/experimental_row_operator_tests.cu index db5a064b1c2..0566f55e46d 100644 --- a/cpp/tests/table/experimental_row_operator_tests.cu +++ b/cpp/tests/table/experimental_row_operator_tests.cu @@ -54,17 +54,25 @@ auto self_comparison(cudf::table_view input, rmm::cuda_stream_view stream{cudf::default_stream_value}; auto 
const table_comparator = lexicographic::self_comparator{input, column_order, {}, stream}; - auto const less_comparator = table_comparator.less(cudf::nullate::NO{}, comparator); auto output = cudf::make_numeric_column( cudf::data_type(cudf::type_id::BOOL8), input.num_rows(), cudf::mask_state::UNALLOCATED); - thrust::transform(rmm::exec_policy(stream), - thrust::make_counting_iterator(0), - thrust::make_counting_iterator(input.num_rows()), - thrust::make_counting_iterator(0), - output->mutable_view().data(), - less_comparator); + if (cudf::detail::has_nested_columns(input)) { + thrust::transform(rmm::exec_policy(stream), + thrust::make_counting_iterator(0), + thrust::make_counting_iterator(input.num_rows()), + thrust::make_counting_iterator(0), + output->mutable_view().data(), + table_comparator.less(cudf::nullate::NO{}, comparator)); + } else { + thrust::transform(rmm::exec_policy(stream), + thrust::make_counting_iterator(0), + thrust::make_counting_iterator(input.num_rows()), + thrust::make_counting_iterator(0), + output->mutable_view().data(), + table_comparator.less(cudf::nullate::NO{}, comparator)); + } return output; } @@ -78,19 +86,27 @@ auto two_table_comparison(cudf::table_view lhs, auto const table_comparator = lexicographic::two_table_comparator{lhs, rhs, column_order, {}, stream}; - auto const less_comparator = table_comparator.less(cudf::nullate::NO{}, comparator); - auto const lhs_it = cudf::experimental::row::lhs_iterator(0); - auto const rhs_it = cudf::experimental::row::rhs_iterator(0); + auto const lhs_it = cudf::experimental::row::lhs_iterator(0); + auto const rhs_it = cudf::experimental::row::rhs_iterator(0); auto output = cudf::make_numeric_column( cudf::data_type(cudf::type_id::BOOL8), lhs.num_rows(), cudf::mask_state::UNALLOCATED); - thrust::transform(rmm::exec_policy(stream), - lhs_it, - lhs_it + lhs.num_rows(), - rhs_it, - output->mutable_view().data(), - less_comparator); + if (cudf::detail::has_nested_columns(lhs) || 
cudf::detail::has_nested_columns(rhs)) { + thrust::transform(rmm::exec_policy(stream), + lhs_it, + lhs_it + lhs.num_rows(), + rhs_it, + output->mutable_view().data(), + table_comparator.less(cudf::nullate::NO{}, comparator)); + } else { + thrust::transform(rmm::exec_policy(stream), + lhs_it, + lhs_it + lhs.num_rows(), + rhs_it, + output->mutable_view().data(), + table_comparator.less(cudf::nullate::NO{}, comparator)); + } return output; } diff --git a/python/cudf/cudf/_lib/cpp/io/types.pxd b/python/cudf/cudf/_lib/cpp/io/types.pxd index 8c8d2a2d7e4..21809ef7bd9 100644 --- a/python/cudf/cudf/_lib/cpp/io/types.pxd +++ b/python/cudf/cudf/_lib/cpp/io/types.pxd @@ -105,6 +105,7 @@ cdef extern from "cudf/io/types.hpp" \ source_info(const vector[string] &filepaths) except + source_info(const vector[host_buffer] &host_buffers) except + source_info(datasource *source) except + + source_info(const vector[datasource*] &datasources) except + cdef cppclass sink_info: io_type type diff --git a/python/cudf/cudf/_lib/interop.pyx b/python/cudf/cudf/_lib/interop.pyx index dece726270d..ee5ce165f95 100644 --- a/python/cudf/cudf/_lib/interop.pyx +++ b/python/cudf/cudf/_lib/interop.pyx @@ -87,56 +87,71 @@ cdef void dlmanaged_tensor_pycapsule_deleter(object pycap_obj): dlpack_tensor.deleter(dlpack_tensor) -cdef vector[column_metadata] gather_metadata(dict cols_dtypes) except *: +cdef vector[column_metadata] gather_metadata(object cols_dtypes) except *: """ Generates a column_metadata vector for each column. Parameters ---------- - cols_dtypes : dict - A dict mapping of column names & their dtypes. + cols_dtypes : iterable + An iterable of ``(column_name, dtype)`` pairs. 
""" cdef vector[column_metadata] cpp_metadata cpp_metadata.reserve(len(cols_dtypes)) if cols_dtypes is not None: - for idx, (col_name, col_dtype) in enumerate(cols_dtypes.items()): + for idx, (col_name, col_dtype) in enumerate(cols_dtypes): cpp_metadata.push_back(column_metadata(col_name.encode())) - if is_struct_dtype(col_dtype): + if is_struct_dtype(col_dtype) or is_list_dtype(col_dtype): _set_col_children_metadata(col_dtype, cpp_metadata[idx]) else: raise TypeError( - "A dictionary of column names and dtypes is required to " + "An iterable of (column_name, dtype) pairs is required to " "construct column_metadata" ) return cpp_metadata cdef _set_col_children_metadata(dtype, column_metadata& col_meta): + + cdef column_metadata element_metadata + if is_struct_dtype(dtype): - col_meta.children_meta.reserve(len(dtype.fields)) - for i, name in enumerate(dtype.fields): - value = dtype.fields[name] - col_meta.children_meta.push_back(column_metadata(name.encode())) + for name, value in dtype.fields.items(): + element_metadata = column_metadata(name.encode()) _set_col_children_metadata( - value, col_meta.children_meta[i] + value, element_metadata ) + col_meta.children_meta.push_back(element_metadata) + elif is_list_dtype(dtype): + col_meta.children_meta.reserve(2) + # Offsets - child 0 + col_meta.children_meta.push_back(column_metadata()) + + # Element column - child 1 + element_metadata = column_metadata() + _set_col_children_metadata( + dtype.element_type, element_metadata + ) + col_meta.children_meta.push_back(element_metadata) + else: + col_meta.children_meta.push_back(column_metadata()) -def to_arrow(list source_columns, dict cols_dtypes): +def to_arrow(list source_columns, object column_dtypes): """Convert a list of columns from cudf Frame to a PyArrow Table. Parameters ---------- source_columns : a list of columns to convert - cols_dtype : A dict mapping of column names & their dtypes. 
+ column_dtypes : Iterable of ``(column_name, column_dtype)`` pairs Returns ------- pyarrow table """ - cdef vector[column_metadata] cpp_metadata = gather_metadata(cols_dtypes) + cdef vector[column_metadata] cpp_metadata = gather_metadata(column_dtypes) cdef table_view input_table_view = table_view_from_columns(source_columns) cdef shared_ptr[CTable] cpp_arrow_table diff --git a/python/cudf/cudf/_lib/io/utils.pyx b/python/cudf/cudf/_lib/io/utils.pyx index 8e345bf969b..18b26bb5aa6 100644 --- a/python/cudf/cudf/_lib/io/utils.pyx +++ b/python/cudf/cudf/_lib/io/utils.pyx @@ -1,4 +1,4 @@ -# Copyright (c) 2020-2021, NVIDIA CORPORATION. +# Copyright (c) 2020-2022, NVIDIA CORPORATION. from cpython.buffer cimport PyBUF_READ from cpython.memoryview cimport PyMemoryView_FromMemory @@ -40,6 +40,7 @@ cdef source_info make_source_info(list src) except*: cdef vector[host_buffer] c_host_buffers cdef vector[string] c_files cdef Datasource csrc + cdef vector[datasource*] c_datasources empty_buffer = False if isinstance(src[0], bytes): empty_buffer = True @@ -58,8 +59,9 @@ cdef source_info make_source_info(list src) except*: # TODO (ptaylor): Might need to update this check if accepted input types # change when UCX and/or cuStreamz support is added. 
elif isinstance(src[0], Datasource): - csrc = src[0] - return source_info(csrc.get_datasource()) + for csrc in src: + c_datasources.push_back(csrc.get_datasource()) + return source_info(c_datasources) elif isinstance(src[0], (int, float, complex, basestring, os.PathLike)): # If source is a file, return source_info where type=FILEPATH if not all(os.path.isfile(file) for file in src): diff --git a/python/cudf/cudf/_lib/scalar.pyx b/python/cudf/cudf/_lib/scalar.pyx index e73e994a73d..9b422b77eeb 100644 --- a/python/cudf/cudf/_lib/scalar.pyx +++ b/python/cudf/cudf/_lib/scalar.pyx @@ -396,7 +396,7 @@ cdef _get_py_dict_from_struct(unique_ptr[scalar]& s, dtype): children=tuple(columns), size=1, ) - table = to_arrow([struct_col], {"None": dtype}) + table = to_arrow([struct_col], [("None", dtype)]) python_dict = table.to_pydict()["None"][0] return {k: _nested_na_replace([python_dict[k]])[0] for k in python_dict} @@ -428,14 +428,7 @@ cdef _get_py_list_from_list(unique_ptr[scalar]& s, dtype): cdef column_view list_col_view = (s.get()).view() cdef Column element_col = Column.from_column_view(list_col_view, None) - arrow_obj = to_arrow( - [element_col], - { - "None": dtype.element_type - if isinstance(element_col, cudf.core.column.StructColumn) - else dtype - } - )["None"] + arrow_obj = to_arrow([element_col], [("None", dtype.element_type)])["None"] result = arrow_obj.to_pylist() return _nested_na_replace(result) diff --git a/python/cudf/cudf/_typing.py b/python/cudf/cudf/_typing.py index 87988150fd3..e2ea12a0e4d 100644 --- a/python/cudf/cudf/_typing.py +++ b/python/cudf/cudf/_typing.py @@ -1,5 +1,6 @@ # Copyright (c) 2021-2022, NVIDIA CORPORATION. 
+import sys from typing import TYPE_CHECKING, Any, Callable, Dict, Iterable, TypeVar, Union import numpy as np @@ -9,6 +10,13 @@ if TYPE_CHECKING: import cudf +# Backwards compat: mypy >= 0.790 rejects Type[NotImplemented], but +# NotImplementedType is only introduced in 3.10 +if sys.version_info >= (3, 10): + from types import NotImplementedType +else: + NotImplementedType = Any + # Many of these are from # https://github.com/pandas-dev/pandas/blob/master/pandas/_typing.py diff --git a/python/cudf/cudf/core/_compat.py b/python/cudf/cudf/core/_compat.py index f30d229ee4e..5534d732f53 100644 --- a/python/cudf/cudf/core/_compat.py +++ b/python/cudf/cudf/core/_compat.py @@ -12,3 +12,4 @@ PANDAS_GE_133 = PANDAS_VERSION >= version.parse("1.3.3") PANDAS_GE_134 = PANDAS_VERSION >= version.parse("1.3.4") PANDAS_LT_140 = PANDAS_VERSION < version.parse("1.4.0") +PANDAS_GE_150 = PANDAS_VERSION >= version.parse("1.5.0") diff --git a/python/cudf/cudf/core/column/column.py b/python/cudf/cudf/core/column/column.py index 1be0190c94f..4fe365768ef 100644 --- a/python/cudf/cudf/core/column/column.py +++ b/python/cudf/cudf/core/column/column.py @@ -233,7 +233,7 @@ def to_arrow(self) -> pa.Array: 4 ] """ - return libcudf.interop.to_arrow([self], {"None": self.dtype})[ + return libcudf.interop.to_arrow([self], [("None", self.dtype)])[ "None" ].chunk(0) diff --git a/python/cudf/cudf/core/column/lists.py b/python/cudf/cudf/core/column/lists.py index 0d5b351f69e..cf5831465a4 100644 --- a/python/cudf/cudf/core/column/lists.py +++ b/python/cudf/cudf/core/column/lists.py @@ -1,7 +1,7 @@ # Copyright (c) 2020-2022, NVIDIA CORPORATION. 
from functools import cached_property -from typing import List, Optional, Sequence, Union +from typing import List, Optional, Sequence, Tuple, Union import numpy as np import pyarrow as pa @@ -164,6 +164,11 @@ def set_base_data(self, value): else: super().set_base_data(value) + def set_base_children(self, value: Tuple[ColumnBase, ...]): + super().set_base_children(value) + _, values = value + self._dtype = cudf.ListDtype(element_type=values.dtype) + @property def __cuda_array_interface__(self): raise NotImplementedError( diff --git a/python/cudf/cudf/core/column/string.py b/python/cudf/cudf/core/column/string.py index e64e5e50ae5..172a1ed9edc 100644 --- a/python/cudf/cudf/core/column/string.py +++ b/python/cudf/cudf/core/column/string.py @@ -34,6 +34,7 @@ ) from cudf.core.buffer import DeviceBufferLike from cudf.core.column import column, datetime +from cudf.core.column.column import ColumnBase from cudf.core.column.methods import ColumnMethods from cudf.utils.docutils import copy_docstring from cudf.utils.dtypes import can_convert_to_column @@ -3643,7 +3644,12 @@ def isempty(self) -> SeriesOrIndex: 4 False dtype: bool """ - return self._return_or_inplace((self._column == "").fillna(False)) + return self._return_or_inplace( + # mypy can't deduce that the return value of + # StringColumn.__eq__ is ColumnBase because the binops are + # dynamically added by a mixin class + cast(ColumnBase, self._column == "").fillna(False) + ) def isspace(self) -> SeriesOrIndex: r""" diff --git a/python/cudf/cudf/core/dataframe.py b/python/cudf/cudf/core/dataframe.py index 97f06efe642..c07a88e9396 100644 --- a/python/cudf/cudf/core/dataframe.py +++ b/python/cudf/cudf/core/dataframe.py @@ -21,7 +21,6 @@ Optional, Set, Tuple, - Type, TypeVar, Union, ) @@ -40,7 +39,7 @@ import cudf import cudf.core.common from cudf import _lib as libcudf -from cudf._typing import ColumnLike +from cudf._typing import ColumnLike, NotImplementedType from cudf.api.types import ( _is_scalar_or_zero_d_array, 
is_bool_dtype, @@ -305,7 +304,7 @@ def _getitem_tuple_arg(self, arg): start = arg[0].start if start is None: start = self._frame.index[0] - df.index = as_index(start) + df.index = as_index(start, name=self._frame.index.name) else: row_selection = as_column(arg[0]) if is_bool_dtype(row_selection.dtype): @@ -313,7 +312,9 @@ def _getitem_tuple_arg(self, arg): row_selection ) else: - df.index = as_index(row_selection) + df.index = as_index( + row_selection, name=self._frame.index.name + ) # Step 4: Downcast if self._can_downcast_to_series(df, arg): return self._downcast_to_series(df, arg) @@ -1934,7 +1935,7 @@ def _make_operands_and_index_for_binop( ) -> Tuple[ Union[ Dict[Optional[str], Tuple[ColumnBase, Any, bool, Any]], - Type[NotImplemented], + NotImplementedType, ], Optional[BaseIndex], ]: @@ -3836,7 +3837,7 @@ def groupby( level=None, as_index=True, sort=False, - group_keys=True, + group_keys=False, squeeze=False, observed=False, dropna=True, diff --git a/python/cudf/cudf/core/groupby/groupby.py b/python/cudf/cudf/core/groupby/groupby.py index 5bade63ddd3..dad0684b111 100644 --- a/python/cudf/cudf/core/groupby/groupby.py +++ b/python/cudf/cudf/core/groupby/groupby.py @@ -69,6 +69,12 @@ def _quantile_75(x): ``False`` for better performance. Note this does not influence the order of observations within each group. Groupby preserves the order of rows within each group. +group_keys : bool, optional + When calling apply and the ``by`` argument produces a like-indexed + result, add group keys to index to identify pieces. By default group + keys are not included when the result's index (and column) labels match + the inputs, and are included otherwise. This argument has no effect if + the result produced is not like-indexed with respect to the input. {ret} Examples -------- @@ -135,6 +141,32 @@ def _quantile_75(x): Type Wild 185.0 Captive 210.0 + +>>> df = cudf.DataFrame({{'A': 'a a b'.split(), +... 'B': [1,2,3], +... 
'C': [4,6,5]}}) +>>> g1 = df.groupby('A', group_keys=False) +>>> g2 = df.groupby('A', group_keys=True) + +Notice that ``g1`` and ``g2`` have two groups, ``a`` and ``b``, and only +differ in their ``group_keys`` argument. Calling `apply` in various ways, +we can get different grouping results: + +>>> g1[['B', 'C']].apply(lambda x: x / x.sum()) + B C +0 0.333333 0.4 +1 0.666667 0.6 +2 1.000000 1.0 + +In the above, the groups are not part of the index. We can have them included +by using ``g2`` where ``group_keys=True``: + +>>> g2[['B', 'C']].apply(lambda x: x / x.sum()) + B C +A +a 0 0.333333 0.4 + 1 0.666667 0.6 +b 2 1.000000 1.0 """ ) @@ -174,7 +206,14 @@ class GroupBy(Serializable, Reducible, Scannable): _MAX_GROUPS_BEFORE_WARN = 100 def __init__( - self, obj, by=None, level=None, sort=False, as_index=True, dropna=True + self, + obj, + by=None, + level=None, + sort=False, + as_index=True, + dropna=True, + group_keys=True, ): """ Group a DataFrame or Series by a set of columns. @@ -210,6 +249,7 @@ def __init__( self._level = level self._sort = sort self._dropna = dropna + self._group_keys = group_keys if isinstance(by, _Grouping): by._obj = self.obj @@ -544,7 +584,9 @@ def _grouped(self): grouped_key_cols, grouped_value_cols, offsets = self._groupby.groups( [*self.obj._index._columns, *self.obj._columns] ) - grouped_keys = cudf.core.index._index_from_columns(grouped_key_cols) + grouped_keys = cudf.core.index._index_from_columns( + grouped_key_cols, name=self.grouping.keys.name + ) grouped_values = self.obj._from_columns_like_self( grouped_value_cols, column_names=self.obj._column_names, @@ -707,7 +749,7 @@ def mult(df): """ if not callable(function): raise TypeError(f"type {type(function)} is not callable") - group_names, offsets, _, grouped_values = self._grouped() + group_names, offsets, group_keys, grouped_values = self._grouped() ngroups = len(offsets) - 1 if ngroups > self._MAX_GROUPS_BEFORE_WARN: @@ -726,14 +768,21 @@ def mult(df): if 
cudf.api.types.is_scalar(chunk_results[0]): result = cudf.Series(chunk_results, index=group_names) result.index.names = self.grouping.names - elif isinstance(chunk_results[0], cudf.Series): - if isinstance(self.obj, cudf.DataFrame): + else: + if isinstance(chunk_results[0], cudf.Series) and isinstance( + self.obj, cudf.DataFrame + ): result = cudf.concat(chunk_results, axis=1).T result.index.names = self.grouping.names else: result = cudf.concat(chunk_results) - else: - result = cudf.concat(chunk_results) + if self._group_keys: + result.index = cudf.MultiIndex._from_data( + { + group_keys.name: group_keys._column, + None: grouped_values.index._column, + } + ) if self._sort: result = result.sort_index() @@ -1582,7 +1631,10 @@ class DataFrameGroupBy(GroupBy, GetAttrGetItemMixin): def __getitem__(self, key): return self.obj[key].groupby( - by=self.grouping.keys, dropna=self._dropna, sort=self._sort + by=self.grouping.keys, + dropna=self._dropna, + sort=self._sort, + group_keys=self._group_keys, ) diff --git a/python/cudf/cudf/core/index.py b/python/cudf/cudf/core/index.py index 0fdcabc0e8b..d1995615e0c 100644 --- a/python/cudf/cudf/core/index.py +++ b/python/cudf/cudf/core/index.py @@ -231,7 +231,7 @@ def step(self): def _num_rows(self): return len(self) - @cached_property + @cached_property # type: ignore @_cudf_nvtx_annotate def _values(self): if len(self) > 0: diff --git a/python/cudf/cudf/core/indexed_frame.py b/python/cudf/cudf/core/indexed_frame.py index 0aac8b65fa8..9bda475589a 100644 --- a/python/cudf/cudf/core/indexed_frame.py +++ b/python/cudf/cudf/core/indexed_frame.py @@ -30,7 +30,7 @@ import cudf import cudf._lib as libcudf -from cudf._typing import ColumnLike, DataFrameOrSeries +from cudf._typing import ColumnLike, DataFrameOrSeries, NotImplementedType from cudf.api.types import ( _is_non_decimal_numeric_dtype, is_bool_dtype, @@ -2991,7 +2991,7 @@ def _make_operands_and_index_for_binop( ) -> Tuple[ Union[ Dict[Optional[str], Tuple[ColumnBase, Any, bool, 
Any]], - Type[NotImplemented], + NotImplementedType, ], Optional[cudf.BaseIndex], ]: @@ -3535,7 +3535,7 @@ def groupby( level=None, as_index=True, sort=False, - group_keys=True, + group_keys=False, squeeze=False, observed=False, dropna=True, @@ -3543,11 +3543,6 @@ def groupby( if axis not in (0, "index"): raise NotImplementedError("axis parameter is not yet implemented") - if group_keys is not True: - raise NotImplementedError( - "The group_keys keyword is not yet implemented" - ) - if squeeze is not False: raise NotImplementedError( "squeeze parameter is not yet implemented" @@ -3562,6 +3557,8 @@ def groupby( raise TypeError( "groupby() requires either by or level to be specified." ) + if group_keys is None: + group_keys = False return ( self.__class__._resampler(self, by=by) @@ -3573,6 +3570,7 @@ def groupby( as_index=as_index, dropna=dropna, sort=sort, + group_keys=group_keys, ) ) diff --git a/python/cudf/cudf/core/multiindex.py b/python/cudf/cudf/core/multiindex.py index 3fb4238a8b6..06a2cc33c1f 100644 --- a/python/cudf/cudf/core/multiindex.py +++ b/python/cudf/cudf/core/multiindex.py @@ -1458,7 +1458,7 @@ def from_pandas(cls, multiindex, nan_as_null=None): ) return cls.from_frame(df, names=multiindex.names) - @cached_property + @cached_property # type: ignore @_cudf_nvtx_annotate def is_unique(self): return len(self) == len(self.unique()) diff --git a/python/cudf/cudf/core/series.py b/python/cudf/cudf/core/series.py index 24f07e66be8..4ab28cab5a0 100644 --- a/python/cudf/cudf/core/series.py +++ b/python/cudf/cudf/core/series.py @@ -8,7 +8,7 @@ import textwrap from collections import abc from shutil import get_terminal_size -from typing import Any, Dict, MutableMapping, Optional, Set, Tuple, Type, Union +from typing import Any, Dict, MutableMapping, Optional, Set, Tuple, Union import cupy import numpy as np @@ -19,7 +19,12 @@ import cudf from cudf import _lib as libcudf from cudf._lib.scalar import _is_null_host_scalar -from cudf._typing import ColumnLike, 
DataFrameOrSeries, ScalarLike +from cudf._typing import ( + ColumnLike, + DataFrameOrSeries, + NotImplementedType, + ScalarLike, +) from cudf.api.types import ( _is_non_decimal_numeric_dtype, _is_scalar_or_zero_d_array, @@ -1289,7 +1294,7 @@ def _make_operands_and_index_for_binop( ) -> Tuple[ Union[ Dict[Optional[str], Tuple[ColumnBase, Any, bool, Any]], - Type[NotImplemented], + NotImplementedType, ], Optional[BaseIndex], ]: @@ -3075,7 +3080,7 @@ def groupby( level=None, as_index=True, sort=False, - group_keys=True, + group_keys=False, squeeze=False, observed=False, dropna=True, diff --git a/python/cudf/cudf/core/single_column_frame.py b/python/cudf/cudf/core/single_column_frame.py index 1dc7ae403c3..4d4d079cf41 100644 --- a/python/cudf/cudf/core/single_column_frame.py +++ b/python/cudf/cudf/core/single_column_frame.py @@ -4,14 +4,14 @@ from __future__ import annotations import warnings -from typing import Any, Dict, Optional, Tuple, Type, TypeVar, Union +from typing import Any, Dict, Optional, Tuple, TypeVar, Union import cupy import numpy as np import pandas as pd import cudf -from cudf._typing import Dtype, ScalarLike +from cudf._typing import Dtype, NotImplementedType, ScalarLike from cudf.api.types import ( _is_scalar_or_zero_d_array, is_bool_dtype, @@ -302,7 +302,7 @@ def _make_operands_for_binop( **kwargs, ) -> Union[ Dict[Optional[str], Tuple[ColumnBase, Any, bool, Any]], - Type[NotImplemented], + NotImplementedType, ]: """Generate the dictionary of operands used for a binary operation. 
diff --git a/python/cudf/cudf/tests/test_groupby.py b/python/cudf/cudf/tests/test_groupby.py index bd5e9fe017b..c1be9cdb290 100644 --- a/python/cudf/cudf/tests/test_groupby.py +++ b/python/cudf/cudf/tests/test_groupby.py @@ -14,7 +14,12 @@ import cudf from cudf import DataFrame, Series -from cudf.core._compat import PANDAS_GE_110, PANDAS_GE_130, PANDAS_LT_140 +from cudf.core._compat import ( + PANDAS_GE_110, + PANDAS_GE_130, + PANDAS_GE_150, + PANDAS_LT_140, +) from cudf.testing._utils import ( DATETIME_TYPES, SIGNED_TYPES, @@ -2677,3 +2682,31 @@ def test_groupby_pct_change_empty_columns(): expected = pdf.groupby("id").pct_change() assert_eq(expected, actual) + + +@pytest.mark.parametrize( + "group_keys", + [ + None, + pytest.param( + True, + marks=pytest.mark.xfail( + condition=not PANDAS_GE_150, + reason="https://github.com/pandas-dev/pandas/pull/34998", + ), + ), + False, + ], +) +def test_groupby_group_keys(group_keys): + gdf = cudf.DataFrame( + {"A": "a a b".split(), "B": [1, 2, 3], "C": [4, 6, 5]} + ) + pdf = gdf.to_pandas() + + g_group = gdf.groupby("A", group_keys=group_keys) + p_group = pdf.groupby("A", group_keys=group_keys) + + actual = g_group[["B", "C"]].apply(lambda x: x / x.sum()) + expected = p_group[["B", "C"]].apply(lambda x: x / x.sum()) + assert_eq(actual, expected) diff --git a/python/cudf/cudf/tests/test_indexing.py b/python/cudf/cudf/tests/test_indexing.py index dbeb1204c73..d726ba16e86 100644 --- a/python/cudf/cudf/tests/test_indexing.py +++ b/python/cudf/cudf/tests/test_indexing.py @@ -286,6 +286,7 @@ def test_dataframe_loc(scalar, step): "d": np.random.random(size).astype(np.float64), } ) + pdf.index.name = "index" df = cudf.DataFrame.from_pandas(pdf) @@ -629,6 +630,9 @@ def test_dataframe_iloc(nelem): pdf["a"] = ha pdf["b"] = hb + gdf.index.name = "index" + pdf.index.name = "index" + assert_eq(gdf.iloc[-1:1], pdf.iloc[-1:1]) assert_eq(gdf.iloc[nelem - 1 : -1], pdf.iloc[nelem - 1 : -1]) assert_eq(gdf.iloc[0 : nelem - 1], pdf.iloc[0 : nelem 
- 1]) diff --git a/python/cudf/cudf/tests/test_list.py b/python/cudf/cudf/tests/test_list.py index a321d2b430a..aa4e5393e5b 100644 --- a/python/cudf/cudf/tests/test_list.py +++ b/python/cudf/cudf/tests/test_list.py @@ -842,6 +842,7 @@ def test_memory_usage(): 0, ), ([[[[1, 2]], [[2], [3]]], [[[2]]], [[[3]]]], 2), + ([[[{"a": 1, "b": 2, "c": 10}]]], 0), ], ) def test_nested_list_extract_host_scalars(data, idx): diff --git a/python/cudf/cudf/tests/test_parquet.py b/python/cudf/cudf/tests/test_parquet.py index 11577fe5bb0..022f7cdd6f7 100644 --- a/python/cudf/cudf/tests/test_parquet.py +++ b/python/cudf/cudf/tests/test_parquet.py @@ -2547,6 +2547,29 @@ def test_parquet_columns_and_index_param(index, columns): assert_eq(expected, got, check_index_type=True) +def test_parquet_nested_struct_list(): + buffer = BytesIO() + data = { + "payload": { + "Domain": { + "Name": "abc", + "Id": {"Name": "host", "Value": "127.0.0.8"}, + }, + "StreamId": "12345678", + "Duration": 10, + "Offset": 12, + "Resource": [{"Name": "ZoneName", "Value": "RAPIDS"}], + } + } + df = cudf.DataFrame({"a": cudf.Series(data)}) + + df.to_parquet(buffer) + expected = pd.read_parquet(buffer) + actual = cudf.read_parquet(buffer) + assert_eq(expected, actual) + assert_eq(actual.a.dtype, df.a.dtype) + + def test_parquet_writer_zstd(): size = 12345 expected = cudf.DataFrame( diff --git a/python/cudf/cudf/tests/test_s3.py b/python/cudf/cudf/tests/test_s3.py index b754429555d..1fdd2dae31d 100644 --- a/python/cudf/cudf/tests/test_s3.py +++ b/python/cudf/cudf/tests/test_s3.py @@ -311,6 +311,38 @@ def test_read_parquet_ext( assert_eq(expect, got1) +def test_read_parquet_multi_file(s3_base, s3so, pdf): + fname_1 = "test_parquet_reader_multi_file_1.parquet" + buffer_1 = BytesIO() + pdf.to_parquet(path=buffer_1) + buffer_1.seek(0) + + fname_2 = "test_parquet_reader_multi_file_2.parquet" + buffer_2 = BytesIO() + pdf.to_parquet(path=buffer_2) + buffer_2.seek(0) + + bucket = "parquet" + with s3_context( + 
s3_base=s3_base, + bucket=bucket, + files={ + fname_1: buffer_1, + fname_2: buffer_2, + }, + ): + got = cudf.read_parquet( + [ + f"s3://{bucket}/{fname_1}", + f"s3://{bucket}/{fname_2}", + ], + storage_options=s3so, + ).reset_index(drop=True) + + expect = pd.concat([pdf, pdf], ignore_index=True) + assert_eq(expect, got) + + @pytest.mark.parametrize("columns", [None, ["Float", "String"]]) def test_read_parquet_arrow_nativefile(s3_base, s3so, pdf, columns): # Write to buffer diff --git a/setup.cfg b/setup.cfg index b5be5cda653..d196e8605b2 100644 --- a/setup.cfg +++ b/setup.cfg @@ -37,33 +37,16 @@ select = [mypy] ignore_missing_imports = True - -[mypy-cudf._lib.*] -ignore_errors = True - -[mypy-cudf._version] -ignore_errors = True - -[mypy-cudf.utils.metadata.orc_column_statistics_pb2] -ignore_errors = True - -[mypy-cudf.tests.*] -ignore_errors = True - -[mypy-dask_cudf._version] -ignore_errors = True - -[mypy-dask_cudf.tests.*] -ignore_errors = True - -[mypy-custreamz._version] -ignore_errors = True - -[mypy-custreamz.tests.*] -ignore_errors = True - -[mypy-cudf_kafka._version] -ignore_errors = True - -[mypy-cudf_kafka.tests.*] -ignore_errors = True +# If we don't specify this, then mypy will check excluded files if +# they are imported by a checked file. +follow_imports = skip +exclude = (?x)( + (cudf|custreamz|cudf_kafka|dask_cudf)/_version\.py + | cudf/_lib/ + | cudf/cudf/benchmarks/ + | cudf/cudf/tests/ + | cudf/cudf/utils/metadata/orc_column_statistics_pb2\.py + | custreamz/custreamz/tests/ + | dask_cudf/dask_cudf/tests/ + # This close paren cannot be in column zero; otherwise the config parser barfs + )