From a5aaa52a06b83e01357169a5110cbdae65888e6d Mon Sep 17 00:00:00 2001 From: David Wendt <45795991+davidwendt@users.noreply.github.com> Date: Tue, 1 Nov 2022 09:15:59 -0400 Subject: [PATCH 1/4] Remove default parameters for cudf::dictionary::detail functions (#12006) Removes default parameters from the `cudf::dictionary::detail` functions. Most of these were allowing for the default memory-resource which is unnecessary. One non-stream, non-mr parameter was defaulted but the default was never used. Reference #11967 Authors: - David Wendt (https://github.com/davidwendt) Approvers: - Tobias Ribizel (https://github.com/upsj) - Yunsong Wang (https://github.com/PointKernel) - Nghia Truong (https://github.com/ttnghia) URL: https://github.com/rapidsai/cudf/pull/12006 --- cpp/include/cudf/detail/scatter.cuh | 3 +- .../cudf/dictionary/detail/concatenate.hpp | 7 ++-- cpp/include/cudf/dictionary/detail/encode.hpp | 16 ++++---- .../cudf/dictionary/detail/replace.hpp | 18 ++++----- cpp/include/cudf/dictionary/detail/search.hpp | 18 ++++----- .../cudf/dictionary/detail/update_keys.hpp | 38 +++++++++---------- cpp/src/copying/copy_range.cu | 3 +- cpp/src/copying/scatter.cu | 5 ++- cpp/src/dictionary/add_keys.cu | 9 ++--- cpp/src/dictionary/remove_keys.cu | 25 ++++++------ cpp/src/dictionary/replace.cu | 7 ++-- cpp/src/dictionary/set_keys.cu | 9 ++--- cpp/src/filling/fill.cu | 3 +- cpp/src/replace/clamp.cu | 11 ++++-- cpp/src/replace/replace.cu | 7 ++-- cpp/src/search/contains_column.cu | 8 ++-- cpp/src/search/contains_scalar.cu | 3 +- cpp/src/search/search_ordered.cu | 3 +- cpp/tests/dictionary/search_test.cpp | 18 ++++++--- cpp/tests/replace/replace_nulls_tests.cpp | 18 +++------ 20 files changed, 112 insertions(+), 117 deletions(-) diff --git a/cpp/include/cudf/detail/scatter.cuh b/cpp/include/cudf/detail/scatter.cuh index 88babe2f397..ad5a2134afe 100644 --- a/cpp/include/cudf/detail/scatter.cuh +++ b/cpp/include/cudf/detail/scatter.cuh @@ -218,7 +218,8 @@ struct column_scatterer_impl { // first combine keys so both dictionaries have the same set auto target_matched = dictionary::detail::add_keys(target, source.keys(), stream, mr); auto const target_view = dictionary_column_view(target_matched->view()); - auto source_matched = dictionary::detail::set_keys(source, target_view.keys(), stream); + auto source_matched = dictionary::detail::set_keys( + source, target_view.keys(), stream, rmm::mr::get_current_device_resource()); auto const source_view = dictionary_column_view(source_matched->view()); // now build the new indices by doing a scatter on just the matched indices diff --git a/cpp/include/cudf/dictionary/detail/concatenate.hpp b/cpp/include/cudf/dictionary/detail/concatenate.hpp index 716caa3e304..d74429484ce 100644 --- a/cpp/include/cudf/dictionary/detail/concatenate.hpp +++ b/cpp/include/cudf/dictionary/detail/concatenate.hpp @@ -37,10 +37,9 @@ namespace detail { * @param mr Device memory resource used to allocate the returned column's device memory. * @return New column with concatenated results. */ -std::unique_ptr concatenate( - host_span columns, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); +std::unique_ptr concatenate(host_span columns, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr); } // namespace detail } // namespace dictionary diff --git a/cpp/include/cudf/dictionary/detail/encode.hpp b/cpp/include/cudf/dictionary/detail/encode.hpp index a16d518dd0d..2aad7dd80ed 100644 --- a/cpp/include/cudf/dictionary/detail/encode.hpp +++ b/cpp/include/cudf/dictionary/detail/encode.hpp @@ -51,11 +51,10 @@ namespace detail { * @param mr Device memory resource used to allocate the returned column's device memory. * @return Returns a dictionary column. */ -std::unique_ptr encode( - column_view const& column, - data_type indices_type = data_type{type_id::UINT32}, - rmm::cuda_stream_view stream = cudf::get_default_stream(), - rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); +std::unique_ptr encode(column_view const& column, + data_type indices_type, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr); /** * @brief Create a column by gathering the keys from the provided @@ -72,10 +71,9 @@ std::unique_ptr encode( * @param mr Device memory resource used to allocate the returned column's device memory. * @return New column with type matching the dictionary_column's keys. */ -std::unique_ptr decode( - dictionary_column_view const& dictionary_column, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); +std::unique_ptr decode(dictionary_column_view const& dictionary_column, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr); /** * @brief Return minimal integer type for the given number of elements. diff --git a/cpp/include/cudf/dictionary/detail/replace.hpp b/cpp/include/cudf/dictionary/detail/replace.hpp index 85e2d9a3a85..0778baa84d6 100644 --- a/cpp/include/cudf/dictionary/detail/replace.hpp +++ b/cpp/include/cudf/dictionary/detail/replace.hpp @@ -39,11 +39,10 @@ namespace detail { * @param mr Device memory resource used to allocate the returned column's device memory. * @return New dictionary column with null rows replaced. */ -std::unique_ptr replace_nulls( - dictionary_column_view const& input, - dictionary_column_view const& replacement, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); +std::unique_ptr replace_nulls(dictionary_column_view const& input, + dictionary_column_view const& replacement, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr); /** * @brief Create a new dictionary column by replacing nulls with a @@ -57,11 +56,10 @@ std::unique_ptr replace_nulls( * @param mr Device memory resource used to allocate the returned column's device memory. * @return New dictionary column with null rows replaced. */ -std::unique_ptr replace_nulls( - dictionary_column_view const& input, - scalar const& replacement, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); +std::unique_ptr replace_nulls(dictionary_column_view const& input, + scalar const& replacement, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr); } // namespace detail } // namespace dictionary diff --git a/cpp/include/cudf/dictionary/detail/search.hpp b/cpp/include/cudf/dictionary/detail/search.hpp index 2d65b561cd3..62059306b9a 100644 --- a/cpp/include/cudf/dictionary/detail/search.hpp +++ b/cpp/include/cudf/dictionary/detail/search.hpp @@ -31,11 +31,10 @@ namespace detail { * * @param stream CUDA stream used for device memory operations and kernel launches. */ -std::unique_ptr get_index( - dictionary_column_view const& dictionary, - scalar const& key, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); +std::unique_ptr get_index(dictionary_column_view const& dictionary, + scalar const& key, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr); /** * @brief Get the index for a key if it were added to the given dictionary. @@ -56,11 +55,10 @@ std::unique_ptr get_index( * @param mr Device memory resource used to allocate the returned column's device memory. * @return Numeric scalar index value of the key within the dictionary */ -std::unique_ptr get_insert_index( - dictionary_column_view const& dictionary, - scalar const& key, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); +std::unique_ptr get_insert_index(dictionary_column_view const& dictionary, + scalar const& key, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr); } // namespace detail } // namespace dictionary diff --git a/cpp/include/cudf/dictionary/detail/update_keys.hpp b/cpp/include/cudf/dictionary/detail/update_keys.hpp index 7f78effdd05..6fd743ad526 100644 --- a/cpp/include/cudf/dictionary/detail/update_keys.hpp +++ b/cpp/include/cudf/dictionary/detail/update_keys.hpp @@ -32,11 +32,10 @@ namespace detail { * * @param stream CUDA stream used for device memory operations and kernel launches. */ -std::unique_ptr add_keys( - dictionary_column_view const& dictionary_column, - column_view const& new_keys, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); +std::unique_ptr add_keys(dictionary_column_view const& dictionary_column, + column_view const& new_keys, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr); /** * @copydoc cudf::dictionary::remove_keys(dictionary_column_view const&,column_view @@ -44,11 +43,10 @@ std::unique_ptr add_keys( * * @param stream CUDA stream used for device memory operations and kernel launches. */ -std::unique_ptr remove_keys( - dictionary_column_view const& dictionary_column, - column_view const& keys_to_remove, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); +std::unique_ptr remove_keys(dictionary_column_view const& dictionary_column, + column_view const& keys_to_remove, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr); /** * @copydoc cudf::dictionary::remove_unused_keys(dictionary_column_view @@ -56,10 +54,9 @@ std::unique_ptr remove_keys( * * @param stream CUDA stream used for device memory operations and kernel launches. */ -std::unique_ptr remove_unused_keys( - dictionary_column_view const& dictionary_column, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); +std::unique_ptr remove_unused_keys(dictionary_column_view const& dictionary_column, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr); /** * @copydoc cudf::dictionary::set_keys(dictionary_column_view @@ -67,11 +64,10 @@ std::unique_ptr remove_unused_keys( * * @param stream CUDA stream used for device memory operations and kernel launches. */ -std::unique_ptr set_keys( - dictionary_column_view const& dictionary_column, - column_view const& keys, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); +std::unique_ptr set_keys(dictionary_column_view const& dictionary_column, + column_view const& keys, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr); /** * @copydoc @@ -82,7 +78,7 @@ std::unique_ptr set_keys( std::vector> match_dictionaries( cudf::host_span input, rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); + rmm::mr::device_memory_resource* mr); /** * @brief Create new dictionaries that have keys merged from dictionary columns @@ -106,7 +102,7 @@ std::vector> match_dictionaries( std::pair>, std::vector> match_dictionaries( std::vector tables, rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); + rmm::mr::device_memory_resource* mr); } // namespace detail } // namespace dictionary diff --git a/cpp/src/copying/copy_range.cu b/cpp/src/copying/copy_range.cu index c5fa3a73e1a..dbcae354384 100644 --- a/cpp/src/copying/copy_range.cu +++ b/cpp/src/copying/copy_range.cu @@ -172,7 +172,8 @@ std::unique_ptr out_of_place_copy_range_dispatch::operator()view()); - auto source_matched = cudf::dictionary::detail::set_keys(dict_source, target_view.keys(), stream); + auto source_matched = cudf::dictionary::detail::set_keys( + dict_source, target_view.keys(), stream, rmm::mr::get_current_device_resource()); auto const source_view = cudf::dictionary_column_view(source_matched->view()); // build the new indices by calling in_place_copy_range on just the indices diff --git a/cpp/src/copying/scatter.cu b/cpp/src/copying/scatter.cu index 7b6ff80e3e4..4ebe465b945 100644 --- a/cpp/src/copying/scatter.cu +++ b/cpp/src/copying/scatter.cu @@ -184,8 +184,9 @@ struct column_scalar_scatterer_impl { stream, mr); auto dict_view = dictionary_column_view(dict_target->view()); - auto scalar_index = dictionary::detail::get_index(dict_view, source.get(), stream); - auto scalar_iter = thrust::make_permutation_iterator( + auto scalar_index = dictionary::detail::get_index( + dict_view, source.get(), stream, rmm::mr::get_current_device_resource()); + auto scalar_iter = thrust::make_permutation_iterator( indexalator_factory::make_input_iterator(*scalar_index), thrust::make_constant_iterator(0)); auto new_indices = std::make_unique(dict_view.get_indices_annotated(), stream, mr); auto target_iter = indexalator_factory::make_output_iterator(new_indices->mutable_view()); diff --git a/cpp/src/dictionary/add_keys.cu b/cpp/src/dictionary/add_keys.cu index 0c4e20aa97f..486e7d2d24b 100644 --- a/cpp/src/dictionary/add_keys.cu +++ b/cpp/src/dictionary/add_keys.cu @@ -44,11 +44,10 @@ namespace detail { * d2 is now {[a, b, c, d, e, f], [5, 0, 3, 1, 2, 2, 2, 5, 0]} * ``` */ -std::unique_ptr add_keys( - dictionary_column_view const& dictionary_column, - column_view const& new_keys, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()) +std::unique_ptr add_keys(dictionary_column_view const& dictionary_column, + column_view const& new_keys, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) { CUDF_EXPECTS(!new_keys.has_nulls(), "Keys must not have nulls"); auto old_keys = dictionary_column.keys(); // [a,b,c,d,f] diff --git a/cpp/src/dictionary/remove_keys.cu b/cpp/src/dictionary/remove_keys.cu index 8a703959d9e..dcb877da686 100644 --- a/cpp/src/dictionary/remove_keys.cu +++ b/cpp/src/dictionary/remove_keys.cu @@ -56,11 +56,10 @@ namespace { * @param mr Device memory resource used to allocate the returned column's device memory. */ template -std::unique_ptr remove_keys_fn( - dictionary_column_view const& dictionary_column, - KeysKeeper keys_to_keep_fn, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()) +std::unique_ptr remove_keys_fn(dictionary_column_view const& dictionary_column, + KeysKeeper keys_to_keep_fn, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) { auto const keys_view = dictionary_column.keys(); auto const indices_type = dictionary_column.indices().type(); @@ -148,11 +147,10 @@ std::unique_ptr remove_keys_fn( } // namespace -std::unique_ptr remove_keys( - dictionary_column_view const& dictionary_column, - column_view const& keys_to_remove, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()) +std::unique_ptr remove_keys(dictionary_column_view const& dictionary_column, + column_view const& keys_to_remove, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) { CUDF_EXPECTS(!keys_to_remove.has_nulls(), "keys_to_remove must not have nulls"); auto const keys_view = dictionary_column.keys(); @@ -166,10 +164,9 @@ std::unique_ptr remove_keys( return remove_keys_fn(dictionary_column, key_matcher, stream, mr); } -std::unique_ptr remove_unused_keys( - dictionary_column_view const& dictionary_column, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()) +std::unique_ptr remove_unused_keys(dictionary_column_view const& dictionary_column, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) { // locate the keys to remove auto const keys_size = dictionary_column.keys_size(); diff --git a/cpp/src/dictionary/replace.cu b/cpp/src/dictionary/replace.cu index 4acc2d124b2..7069993866c 100644 --- a/cpp/src/dictionary/replace.cu +++ b/cpp/src/dictionary/replace.cu @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2021, NVIDIA CORPORATION. + * Copyright (c) 2020-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -123,8 +123,9 @@ std::unique_ptr replace_nulls(dictionary_column_view const& input, // first add the replacement to the keys so only the indices need to be processed auto input_matched = dictionary::detail::add_keys( input, make_column_from_scalar(replacement, 1, stream)->view(), stream, mr); - auto const input_view = dictionary_column_view(input_matched->view()); - auto const scalar_index = get_index(input_view, replacement, stream); + auto const input_view = dictionary_column_view(input_matched->view()); + auto const scalar_index = + get_index(input_view, replacement, stream, rmm::mr::get_current_device_resource()); // now build the new indices by doing replace-null on the updated indices auto const input_indices = input_view.get_indices_annotated(); diff --git a/cpp/src/dictionary/set_keys.cu b/cpp/src/dictionary/set_keys.cu index db0c4937582..075fb6115e3 100644 --- a/cpp/src/dictionary/set_keys.cu +++ b/cpp/src/dictionary/set_keys.cu @@ -116,11 +116,10 @@ struct dispatch_compute_indices { } // namespace // -std::unique_ptr set_keys( - dictionary_column_view const& dictionary_column, - column_view const& new_keys, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()) +std::unique_ptr set_keys(dictionary_column_view const& dictionary_column, + column_view const& new_keys, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) { CUDF_EXPECTS(!new_keys.has_nulls(), "keys parameter must not have nulls"); auto keys = dictionary_column.keys(); diff --git a/cpp/src/filling/fill.cu b/cpp/src/filling/fill.cu index 290fff33cf6..dac36032583 100644 --- a/cpp/src/filling/fill.cu +++ b/cpp/src/filling/fill.cu @@ -171,7 +171,8 @@ std::unique_ptr out_of_place_fill_range_dispatch::operator()view()).get_indices_annotated(); // get the index of the key just added - auto index_of_value = cudf::dictionary::detail::get_index(target_matched->view(), value, stream); + auto index_of_value = cudf::dictionary::detail::get_index( + target_matched->view(), value, stream, rmm::mr::get_current_device_resource()); // now call fill using just the indices column and the new index auto new_indices = cudf::type_dispatcher(target_indices.type(), diff --git a/cpp/src/replace/clamp.cu b/cpp/src/replace/clamp.cu index 24822cc6c65..d54ebf25494 100644 --- a/cpp/src/replace/clamp.cu +++ b/cpp/src/replace/clamp.cu @@ -300,14 +300,17 @@ std::unique_ptr dispatch_clamp::operator()( return result; }(); auto matched_view = dictionary_column_view(matched_column->view()); + auto default_mr = rmm::mr::get_current_device_resource(); // get the indexes for lo_replace and for hi_replace - auto lo_replace_index = dictionary::detail::get_index(matched_view, lo_replace, stream); - auto hi_replace_index = dictionary::detail::get_index(matched_view, hi_replace, stream); + auto lo_replace_index = + dictionary::detail::get_index(matched_view, lo_replace, stream, default_mr); + auto hi_replace_index = + dictionary::detail::get_index(matched_view, hi_replace, stream, default_mr); // get the closest indexes for lo and for hi - auto lo_index = dictionary::detail::get_insert_index(matched_view, lo, stream); - auto hi_index = dictionary::detail::get_insert_index(matched_view, hi, stream); + auto lo_index = dictionary::detail::get_insert_index(matched_view, lo, stream, default_mr); + auto hi_index = dictionary::detail::get_insert_index(matched_view, hi, stream, default_mr); // call clamp with the scalar indexes and the matched indices auto matched_indices = matched_view.get_indices_annotated(); diff --git a/cpp/src/replace/replace.cu b/cpp/src/replace/replace.cu index 2a675c00b48..b3ee6e069ed 100644 --- a/cpp/src/replace/replace.cu +++ b/cpp/src/replace/replace.cu @@ -457,9 +457,10 @@ std::unique_ptr replace_kernel_forwarder::operator()view(), stream, mr); }(); auto matched_view = cudf::dictionary_column_view(matched_input->view()); - auto matched_values = cudf::dictionary::detail::set_keys(values, matched_view.keys(), stream); - auto matched_replacements = - cudf::dictionary::detail::set_keys(replacements, matched_view.keys(), stream); + auto matched_values = cudf::dictionary::detail::set_keys( + values, matched_view.keys(), stream, rmm::mr::get_current_device_resource()); + auto matched_replacements = cudf::dictionary::detail::set_keys( + replacements, matched_view.keys(), stream, rmm::mr::get_current_device_resource()); auto indices_type = matched_view.indices().type(); auto new_indices = cudf::type_dispatcher( diff --git a/cpp/src/search/contains_column.cu b/cpp/src/search/contains_column.cu index 31edf88a8cf..08bcf8d48d8 100644 --- a/cpp/src/search/contains_column.cu +++ b/cpp/src/search/contains_column.cu @@ -119,9 +119,11 @@ std::unique_ptr contains_column_dispatch::operator()( dictionary_column_view const haystack(haystack_in); dictionary_column_view const needles(needles_in); // first combine keys so both dictionaries have the same set - auto needles_matched = dictionary::detail::add_keys(needles, haystack.keys(), stream); - auto const needles_view = dictionary_column_view(needles_matched->view()); - auto haystack_matched = dictionary::detail::set_keys(haystack, needles_view.keys(), stream); + auto needles_matched = dictionary::detail::add_keys( + needles, haystack.keys(), stream, rmm::mr::get_current_device_resource()); + auto const needles_view = dictionary_column_view(needles_matched->view()); + auto haystack_matched = dictionary::detail::set_keys( + haystack, needles_view.keys(), stream, rmm::mr::get_current_device_resource()); auto const haystack_view = dictionary_column_view(haystack_matched->view()); // now just use the indices for the contains diff --git a/cpp/src/search/contains_scalar.cu b/cpp/src/search/contains_scalar.cu index 59c7a86d29c..8c500e1e757 100644 --- a/cpp/src/search/contains_scalar.cu +++ b/cpp/src/search/contains_scalar.cu @@ -128,7 +128,8 @@ bool contains_scalar_dispatch::operator()(column_view const& { auto const dict_col = cudf::dictionary_column_view(haystack); // first, find the needle in the dictionary's key set - auto const index = cudf::dictionary::detail::get_index(dict_col, needle, stream); + auto const index = cudf::dictionary::detail::get_index( + dict_col, needle, stream, rmm::mr::get_current_device_resource()); // if found, check the index is actually in the indices column return index->is_valid(stream) && cudf::type_dispatcher(dict_col.indices().type(), contains_scalar_dispatch{}, diff --git a/cpp/src/search/search_ordered.cu b/cpp/src/search/search_ordered.cu index 1da8d2313e6..bf0eb8d46f8 100644 --- a/cpp/src/search/search_ordered.cu +++ b/cpp/src/search/search_ordered.cu @@ -61,7 +61,8 @@ std::unique_ptr search_ordered(table_view const& haystack, // This utility will ensure all corresponding dictionary columns have matching keys. // It will return any new dictionary columns created as well as updated table_views. - auto const matched = dictionary::detail::match_dictionaries({haystack, needles}, stream); + auto const matched = dictionary::detail::match_dictionaries( + {haystack, needles}, stream, rmm::mr::get_current_device_resource()); auto const& matched_haystack = matched.second.front(); auto const& matched_needles = matched.second.back(); diff --git a/cpp/tests/dictionary/search_test.cpp b/cpp/tests/dictionary/search_test.cpp index 8b77d71593d..11cafa7dd8e 100644 --- a/cpp/tests/dictionary/search_test.cpp +++ b/cpp/tests/dictionary/search_test.cpp @@ -35,8 +35,10 @@ TEST_F(DictionarySearchTest, StringsColumn) result = cudf::dictionary::get_index(dictionary, cudf::string_scalar("eee")); EXPECT_FALSE(result->is_valid()); - result = cudf::dictionary::detail::get_insert_index( - dictionary, cudf::string_scalar("eee"), cudf::get_default_stream()); + result = cudf::dictionary::detail::get_insert_index(dictionary, + cudf::string_scalar("eee"), + cudf::get_default_stream(), + rmm::mr::get_current_device_resource()); n_result = dynamic_cast*>(result.get()); EXPECT_EQ(uint32_t{5}, n_result->value()); } @@ -52,8 +54,10 @@ TEST_F(DictionarySearchTest, WithNulls) result = cudf::dictionary::get_index(dictionary, cudf::numeric_scalar(5)); EXPECT_FALSE(result->is_valid()); - result = cudf::dictionary::detail::get_insert_index( - dictionary, cudf::numeric_scalar(5), cudf::get_default_stream()); + result = cudf::dictionary::detail::get_insert_index(dictionary, + cudf::numeric_scalar(5), + cudf::get_default_stream(), + rmm::mr::get_current_device_resource()); n_result = dynamic_cast*>(result.get()); EXPECT_EQ(uint32_t{1}, n_result->value()); } @@ -64,7 +68,8 @@ TEST_F(DictionarySearchTest, EmptyColumn) cudf::numeric_scalar key(7); auto result = cudf::dictionary::get_index(dictionary, key); EXPECT_FALSE(result->is_valid()); - result = cudf::dictionary::detail::get_insert_index(dictionary, key, cudf::get_default_stream()); + result = cudf::dictionary::detail::get_insert_index( + dictionary, key, cudf::get_default_stream(), rmm::mr::get_current_device_resource()); EXPECT_FALSE(result->is_valid()); } @@ -74,6 +79,7 @@ TEST_F(DictionarySearchTest, Errors) cudf::numeric_scalar key(7); EXPECT_THROW(cudf::dictionary::get_index(dictionary, key), cudf::logic_error); EXPECT_THROW( - cudf::dictionary::detail::get_insert_index(dictionary, key, cudf::get_default_stream()), + cudf::dictionary::detail::get_insert_index( + dictionary, key, cudf::get_default_stream(), rmm::mr::get_current_device_resource()), cudf::logic_error); } diff --git a/cpp/tests/replace/replace_nulls_tests.cpp b/cpp/tests/replace/replace_nulls_tests.cpp index 2c751a67a63..616ba9d2f64 100644 --- a/cpp/tests/replace/replace_nulls_tests.cpp +++ b/cpp/tests/replace/replace_nulls_tests.cpp @@ -21,7 +21,6 @@ #include -#include #include #include #include @@ -679,32 +678,25 @@ TEST_F(ReplaceDictionaryTest, ReplaceNullsError) auto input_one = cudf::dictionary::encode(input_one_w); auto dict_input = cudf::dictionary_column_view(input_one->view()); auto dict_repl = cudf::dictionary_column_view(replacement->view()); - EXPECT_THROW( - cudf::dictionary::detail::replace_nulls(dict_input, dict_repl, cudf::get_default_stream()), - cudf::logic_error); + EXPECT_THROW(cudf::replace_nulls(input->view(), replacement->view()), cudf::logic_error); } TEST_F(ReplaceDictionaryTest, ReplaceNullsEmpty) { cudf::test::fixed_width_column_wrapper input_empty_w({}); auto input_empty = cudf::dictionary::encode(input_empty_w); - auto dict_input = cudf::dictionary_column_view(input_empty->view()); - auto result = - cudf::dictionary::detail::replace_nulls(dict_input, dict_input, cudf::get_default_stream()); + auto result = cudf::replace_nulls(input_empty->view(), input_empty->view()); CUDF_TEST_EXPECT_COLUMNS_EQUAL(result->view(), input_empty->view()); } TEST_F(ReplaceDictionaryTest, ReplaceNullsNoNulls) { cudf::test::fixed_width_column_wrapper input_w({1, 1, 1}); - auto input = cudf::dictionary::encode(input_w); - auto dict_input = cudf::dictionary_column_view(input->view()); - auto result = - cudf::dictionary::detail::replace_nulls(dict_input, dict_input, cudf::get_default_stream()); + auto input = cudf::dictionary::encode(input_w); + auto result = cudf::replace_nulls(input->view(), input->view()); CUDF_TEST_EXPECT_COLUMNS_EQUAL(result->view(), input->view()); - result = cudf::dictionary::detail::replace_nulls( - dict_input, cudf::numeric_scalar(0, false), cudf::get_default_stream()); + result = cudf::replace_nulls(input->view(), cudf::numeric_scalar(0, false)); CUDF_TEST_EXPECT_COLUMNS_EQUAL(result->view(), input->view()); } From 991c86b13acdbc28ab60609bee6eba2f9eac1ecc Mon Sep 17 00:00:00 2001 From: David Wendt <45795991+davidwendt@users.noreply.github.com> Date: Tue, 1 Nov 2022 10:00:36 -0400 Subject: [PATCH 2/4] Remove default parameters for nvtext::detail functions (#12007) Removes default parameters from the `nvtext::detail` functions. Most of these were internal default parameters which were unnecessary. The nvtext detail functions are only used within nvtext APIs. Reference #11967 Authors: - David Wendt (https://github.com/davidwendt) Approvers: - Bradley Dice (https://github.com/bdice) - Nghia Truong (https://github.com/ttnghia) URL: https://github.com/rapidsai/cudf/pull/12007 --- cpp/include/nvtext/detail/tokenize.hpp | 38 +++++++++++--------------- cpp/src/text/generate_ngrams.cu | 11 ++++---- cpp/src/text/ngrams_tokenize.cu | 13 ++++----- cpp/src/text/normalize.cu | 7 ++--- 4 files changed, 30 insertions(+), 39 deletions(-) diff --git a/cpp/include/nvtext/detail/tokenize.hpp b/cpp/include/nvtext/detail/tokenize.hpp index 9c1cdbd6310..38b49e63590 100644 --- a/cpp/include/nvtext/detail/tokenize.hpp +++ b/cpp/include/nvtext/detail/tokenize.hpp @@ -35,12 +35,10 @@ namespace detail { * @param mr Device memory resource used to allocate the returned column's device memory. * @return New strings columns of tokens. */ -std::unique_ptr tokenize( - cudf::strings_column_view const& strings, - cudf::string_scalar const& delimiter = cudf::string_scalar{""}, - // Move before delimiter? - rmm::cuda_stream_view stream = cudf::get_default_stream(), - rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); +std::unique_ptr tokenize(cudf::strings_column_view const& strings, + cudf::string_scalar const& delimiter, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr); /** * @copydoc nvtext::tokenize(strings_column_view const&,strings_column_view @@ -52,11 +50,10 @@ std::unique_ptr tokenize( * @param mr Device memory resource used to allocate the returned column's device memory. * @return New strings columns of tokens. */ -std::unique_ptr tokenize( - cudf::strings_column_view const& strings, - cudf::strings_column_view const& delimiters, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); +std::unique_ptr tokenize(cudf::strings_column_view const& strings, + cudf::strings_column_view const& delimiters, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr); /** * @copydoc nvtext::count_tokens(strings_column_view const&, string_scalar @@ -69,12 +66,10 @@ std::unique_ptr tokenize( * @param mr Device memory resource used to allocate the returned column's device memory. * @return New INT32 column of token counts. */ -std::unique_ptr count_tokens( - cudf::strings_column_view const& strings, - cudf::string_scalar const& delimiter = cudf::string_scalar{""}, - // Move before delimiter? - rmm::cuda_stream_view stream = cudf::get_default_stream(), - rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); +std::unique_ptr count_tokens(cudf::strings_column_view const& strings, + cudf::string_scalar const& delimiter, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr); /** * @copydoc nvtext::count_tokens(strings_column_view const&,strings_column_view @@ -86,11 +81,10 @@ std::unique_ptr count_tokens( * @param mr Device memory resource used to allocate the returned column's device memory. * @return New INT32 column of token counts. */ -std::unique_ptr count_tokens( - cudf::strings_column_view const& strings, - cudf::strings_column_view const& delimiters, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); +std::unique_ptr count_tokens(cudf::strings_column_view const& strings, + cudf::strings_column_view const& delimiters, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr); } // namespace detail } // namespace nvtext diff --git a/cpp/src/text/generate_ngrams.cu b/cpp/src/text/generate_ngrams.cu index d5ff7b99344..be50ece28d5 100644 --- a/cpp/src/text/generate_ngrams.cu +++ b/cpp/src/text/generate_ngrams.cu @@ -84,12 +84,11 @@ struct ngram_generator_fn { } // namespace -std::unique_ptr generate_ngrams( - cudf::strings_column_view const& strings, - cudf::size_type ngrams = 2, - cudf::string_scalar const& separator = cudf::string_scalar{"_"}, - rmm::cuda_stream_view stream = cudf::get_default_stream(), - rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()) +std::unique_ptr generate_ngrams(cudf::strings_column_view const& strings, + cudf::size_type ngrams, + cudf::string_scalar const& separator, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) { CUDF_EXPECTS(separator.is_valid(stream), "Parameter separator must be valid"); cudf::string_view const d_separator(separator.data(), separator.size()); diff --git a/cpp/src/text/ngrams_tokenize.cu b/cpp/src/text/ngrams_tokenize.cu index b0071ed9e88..f1ddcfdc6f8 100644 --- a/cpp/src/text/ngrams_tokenize.cu +++ b/cpp/src/text/ngrams_tokenize.cu @@ -134,13 +134,12 @@ struct ngram_builder_fn { // detail APIs -std::unique_ptr ngrams_tokenize( - cudf::strings_column_view const& strings, - cudf::size_type ngrams = 2, - cudf::string_scalar const& delimiter = cudf::string_scalar(""), - cudf::string_scalar const& separator = cudf::string_scalar{"_"}, - rmm::cuda_stream_view stream = cudf::get_default_stream(), - rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()) +std::unique_ptr ngrams_tokenize(cudf::strings_column_view const& strings, + cudf::size_type ngrams, + cudf::string_scalar const& delimiter, + cudf::string_scalar const& separator, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) { CUDF_EXPECTS(delimiter.is_valid(stream), "Parameter delimiter must be valid"); cudf::string_view d_delimiter(delimiter.data(), delimiter.size()); diff --git a/cpp/src/text/normalize.cu b/cpp/src/text/normalize.cu index 2d5dd0ebbf8..2931370ac02 100644 --- a/cpp/src/text/normalize.cu +++ b/cpp/src/text/normalize.cu @@ -170,10 +170,9 @@ struct codepoint_to_utf8_fn { } // namespace // detail API -std::unique_ptr normalize_spaces( - cudf::strings_column_view const& strings, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()) +std::unique_ptr normalize_spaces(cudf::strings_column_view const& strings, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) { if (strings.is_empty()) return cudf::make_empty_column(cudf::data_type{cudf::type_id::STRING}); From 7af461cd0a4b616932e2766050a87d4cc82cd963 Mon Sep 17 00:00:00 2001 From: GALI PREM SAGAR Date: Tue, 1 Nov 2022 10:03:17 -0500 Subject: [PATCH 3/4] Update cuda-python dependency to 11.7.1 (#12030) This is a mirror PR of https://github.com/rapidsai/cudf/pull/11994 to unblock gpu-ci which is currently blocked. Authors: - GALI PREM SAGAR (https://github.com/galipremsagar) - Ray Douglass (https://github.com/raydouglass) - Ashwin Srinath (https://github.com/shwina) - Bradley Dice (https://github.com/bdice) - Jordan Jacobelli (https://github.com/Ethyling) Approvers: - AJ Schmidt (https://github.com/ajschmidt8) URL: https://github.com/rapidsai/cudf/pull/12030 --- README.md | 29 ++++++++---------------- conda/environments/cudf_dev_cuda11.5.yml | 4 ++-- conda/recipes/cudf/meta.yaml | 2 +- conda/recipes/strings_udf/meta.yaml | 2 +- 4 files changed, 13 insertions(+), 24 deletions(-) diff --git a/README.md b/README.md index 175f5e7efa8..641ce1316b3 100644 --- a/README.md +++ b/README.md @@ -65,32 +65,21 @@ Please see the [Demo Docker Repository](https://hub.docker.com/r/rapidsai/rapids cuDF can be installed with conda ([miniconda](https://conda.io/miniconda.html), or the full [Anaconda distribution](https://www.anaconda.com/download)) from the `rapidsai` channel: -For `cudf version == 22.06` : ```bash -# for CUDA 11.0 -conda install -c rapidsai -c nvidia -c numba -c conda-forge \ - cudf=22.06 python=3.9 cudatoolkit=11.0 - -# or, for CUDA 11.2 -conda install -c rapidsai -c nvidia -c numba -c conda-forge \ - cudf=22.06 python=3.9 cudatoolkit=11.2 - +# for CUDA 11.5 +conda install -c rapidsai -c conda-forge -c nvidia \ + cudf=22.10 python=3.9 cudatoolkit=11.5 +# for CUDA 11.2 +conda install -c rapidsai -c conda-forge -c nvidia \ + cudf=22.10 python=3.9 cudatoolkit=11.2 ``` -For the nightly version of `cudf` : -```bash -# for CUDA 11.0 -conda install -c rapidsai-nightly -c nvidia -c numba -c conda-forge \ - cudf python=3.9 cudatoolkit=11.0 - -# or, for CUDA 11.2 -conda install -c rapidsai-nightly -c nvidia -c numba -c conda-forge \ - cudf python=3.9 cudatoolkit=11.2 -``` +We also provide [nightly Conda packages](https://anaconda.org/rapidsai-nightly) built from the HEAD +of our latest development branch. Note: cuDF is supported only on Linux, and with Python versions 3.8 and later. -See the [Get RAPIDS version picker](https://rapids.ai/start.html) for more OS and version info. +See the [Get RAPIDS version picker](https://rapids.ai/start.html) for more OS and version info. ## Build/Install from Source See build [instructions](CONTRIBUTING.md#setting-up-your-build-environment). diff --git a/conda/environments/cudf_dev_cuda11.5.yml b/conda/environments/cudf_dev_cuda11.5.yml index d7178198358..2cad2002456 100644 --- a/conda/environments/cudf_dev_cuda11.5.yml +++ b/conda/environments/cudf_dev_cuda11.5.yml @@ -3,10 +3,10 @@ name: cudf_dev channels: - rapidsai - - nvidia - rapidsai-nightly - dask/label/dev - conda-forge + - nvidia dependencies: - c-compiler - cxx-compiler @@ -38,7 +38,7 @@ dependencies: - ipython - pandoc<=2.0.0 - cudatoolkit=11.5 - - cuda-python>=11.5,<11.7.1 + - cuda-python>=11.7.1,<12.0 - pip - doxygen=1.8.20 - typing_extensions diff --git a/conda/recipes/cudf/meta.yaml b/conda/recipes/cudf/meta.yaml index 9b8e379b25e..380b3652fbb 100644 --- a/conda/recipes/cudf/meta.yaml +++ b/conda/recipes/cudf/meta.yaml @@ -62,7 +62,7 @@ requirements: - packaging - cachetools - cubinlinker # [linux64] # CUDA enhanced compatibility. - - cuda-python >=11.5,<11.7.1 + - cuda-python >=11.7.1,<12.0 test: # [linux64] requires: # [linux64] - cudatoolkit {{ cuda_version }}.* # [linux64] diff --git a/conda/recipes/strings_udf/meta.yaml b/conda/recipes/strings_udf/meta.yaml index e29fb55ce63..a736edef24d 100644 --- a/conda/recipes/strings_udf/meta.yaml +++ b/conda/recipes/strings_udf/meta.yaml @@ -40,7 +40,7 @@ requirements: - numba >=0.54 - libcudf ={{ version }} - cudf ={{ version }} - - cudatoolkit ={{ cuda_version }} + - cudatoolkit {{ cuda_version }}.* run: - python - typing_extensions From d2367790852905a3daa0a940bb3b3f68c9f0b720 Mon Sep 17 00:00:00 2001 From: GALI PREM SAGAR Date: Tue, 1 Nov 2022 11:22:52 -0500 Subject: [PATCH 4/4] Reduce/Remove reliance on `**kwargs` and `*args` in `IO` readers & writers (#12025) Resolves: #11780 This PR: - [x] Reduces reliance on `args` & `kwargs` for readers and writers when `cudf` engine is selected. However, these will have to stay for the purpose of other engines we support in few readers & writers such as `pandas` & `pyarrow` engines. - [x] Fixes some bugs where dead parameters were still being used. - [x] Fixes some bugs where parameters weren't being passed until the cython later in the first place. - [x] Updates docs related to newly exposed parameters. Authors: - GALI PREM SAGAR (https://github.com/galipremsagar) - Bradley Dice (https://github.com/bdice) Approvers: - Lawrence Mitchell (https://github.com/wence-) - Bradley Dice (https://github.com/bdice) URL: https://github.com/rapidsai/cudf/pull/12025 --- python/cudf/cudf/_lib/orc.pyx | 23 +- python/cudf/cudf/_lib/parquet.pyx | 6 +- python/cudf/cudf/core/dataframe.py | 70 +++++- python/cudf/cudf/io/avro.py | 16 +- python/cudf/cudf/io/csv.py | 7 +- python/cudf/cudf/io/json.py | 74 +++--- python/cudf/cudf/io/orc.py | 19 +- python/cudf/cudf/io/parquet.py | 220 ++++++++++++----- python/cudf/cudf/io/text.py | 4 +- python/cudf/cudf/tests/test_orc.py | 21 +- python/cudf/cudf/tests/test_s3.py | 1 - python/cudf/cudf/utils/ioutils.py | 291 ++++++++++++++++------- python/dask_cudf/dask_cudf/io/parquet.py | 41 +++- 13 files changed, 576 insertions(+), 217 deletions(-) diff --git a/python/cudf/cudf/_lib/orc.pyx b/python/cudf/cudf/_lib/orc.pyx index f57e4e8f281..cb364c86dd6 100644 --- a/python/cudf/cudf/_lib/orc.pyx +++ b/python/cudf/cudf/_lib/orc.pyx @@ -120,15 +120,15 @@ cpdef read_orc(object filepaths_or_buffers, c_result = move(libcudf_read_orc(c_orc_reader_options)) names = [name.decode() for name in c_result.metadata.column_names] - actual_index_names, names, is_range_index, reset_index_name, range_idx = \ - _get_index_from_metadata(c_result.metadata.user_data, - names, - skip_rows, - num_rows) + actual_index_names, col_names, is_range_index, reset_index_name, \ + range_idx = _get_index_from_metadata(c_result.metadata.user_data, + names, + skip_rows, + num_rows) data, index = data_from_unique_ptr( move(c_result.tbl), - names, + col_names if columns is None else names, actual_index_names ) @@ -238,9 +238,10 @@ cpdef write_orc(table, object stripe_size_bytes=None, object stripe_size_rows=None, object row_index_stride=None, - object cols_as_map_type=None): + object cols_as_map_type=None, + object index=None): """ - Cython function to call into libcudf API, see `write_orc`. + Cython function to call into libcudf API, see `cudf::io::write_orc`. See Also -------- @@ -252,10 +253,12 @@ cpdef write_orc(table, cdef unique_ptr[table_input_metadata] tbl_meta cdef map[string, string] user_data user_data[str.encode("pandas")] = str.encode(generate_pandas_metadata( - table, None) + table, index) ) - if not isinstance(table._index, cudf.RangeIndex): + if index is True or ( + index is None and not isinstance(table._index, cudf.RangeIndex) + ): tv = table_view_from_table(table) tbl_meta = make_unique[table_input_metadata](tv) for level, idx_name in enumerate(table._index.names): diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx index 6de84ce90c3..2667279e205 100644 --- a/python/cudf/cudf/_lib/parquet.pyx +++ b/python/cudf/cudf/_lib/parquet.pyx @@ -63,6 +63,8 @@ from cudf._lib.utils cimport table_view_from_table from pyarrow.lib import NativeFile +from cudf.utils.ioutils import _ROW_GROUP_SIZE_BYTES_DEFAULT + cdef class BufferArrayFromVector: cdef Py_ssize_t length @@ -312,7 +314,7 @@ cpdef write_parquet( object statistics="ROWGROUP", object metadata_file_path=None, object int96_timestamps=False, - object row_group_size_bytes=None, + object row_group_size_bytes=_ROW_GROUP_SIZE_BYTES_DEFAULT, object row_group_size_rows=None, object max_page_size_bytes=None, object max_page_size_rows=None, @@ -481,7 +483,7 @@ cdef class ParquetWriter: def __cinit__(self, object filepath_or_buffer, object index=None, object compression="snappy", str statistics="ROWGROUP", - int row_group_size_bytes=134217728, + int row_group_size_bytes=_ROW_GROUP_SIZE_BYTES_DEFAULT, int row_group_size_rows=1000000, int max_page_size_bytes=524288, int max_page_size_rows=20000): diff --git a/python/cudf/cudf/core/dataframe.py b/python/cudf/cudf/core/dataframe.py index 92ca5148c1e..82a4a4a8b65 100644 --- a/python/cudf/cudf/core/dataframe.py +++ b/python/cudf/cudf/core/dataframe.py @@ -6019,11 +6019,51 @@ def select_dtypes(self, include=None, exclude=None): return df @ioutils.doc_to_parquet() - def to_parquet(self, path, *args, **kwargs): + def to_parquet( + self, + path, + engine="cudf", + compression="snappy", + index=None, + partition_cols=None, + partition_file_name=None, + partition_offsets=None, + statistics="ROWGROUP", + metadata_file_path=None, + int96_timestamps=False, + row_group_size_bytes=ioutils._ROW_GROUP_SIZE_BYTES_DEFAULT, + row_group_size_rows=None, + max_page_size_bytes=None, + max_page_size_rows=None, + storage_options=None, + return_metadata=False, + *args, + **kwargs, + ): """{docstring}""" from cudf.io import parquet - return parquet.to_parquet(self, path, *args, **kwargs) + return parquet.to_parquet( + self, + path=path, + engine=engine, + compression=compression, + index=index, + partition_cols=partition_cols, + partition_file_name=partition_file_name, + partition_offsets=partition_offsets, + statistics=statistics, + metadata_file_path=metadata_file_path, + int96_timestamps=int96_timestamps, + row_group_size_bytes=row_group_size_bytes, + row_group_size_rows=row_group_size_rows, + max_page_size_bytes=max_page_size_bytes, + max_page_size_rows=max_page_size_rows, + storage_options=storage_options, + return_metadata=return_metadata, + *args, + **kwargs, + ) @ioutils.doc_to_feather() def to_feather(self, path, *args, **kwargs): @@ -6066,11 +6106,33 @@ def to_csv( ) @ioutils.doc_to_orc() - def to_orc(self, fname, compression="snappy", *args, **kwargs): + def to_orc( + self, + fname, + compression="snappy", + statistics="ROWGROUP", + stripe_size_bytes=None, + stripe_size_rows=None, + row_index_stride=None, + cols_as_map_type=None, + storage_options=None, + index=None, + ): """{docstring}""" from cudf.io import orc - orc.to_orc(self, fname, compression, *args, **kwargs) + return orc.to_orc( + df=self, + fname=fname, + compression=compression, + statistics=statistics, + stripe_size_bytes=stripe_size_bytes, + stripe_size_rows=stripe_size_rows, + row_index_stride=row_index_stride, + cols_as_map_type=cols_as_map_type, + storage_options=storage_options, + index=index, + ) @_cudf_nvtx_annotate def stack(self, level=-1, dropna=True): diff --git a/python/cudf/cudf/io/avro.py b/python/cudf/cudf/io/avro.py index 66c5c1c5a56..aaafe60d03f 100644 --- a/python/cudf/cudf/io/avro.py +++ b/python/cudf/cudf/io/avro.py @@ -1,4 +1,7 @@ # Copyright (c) 2019-2022, NVIDIA CORPORATION. + +import warnings + import cudf from cudf import _lib as libcudf from cudf.utils import ioutils @@ -11,13 +14,13 @@ def read_avro( columns=None, skiprows=None, num_rows=None, - **kwargs, + storage_options=None, ): """{docstring}""" is_single_filepath_or_buffer = ioutils.ensure_single_filepath_or_buffer( path_or_data=filepath_or_buffer, - **kwargs, + storage_options=storage_options, ) if not is_single_filepath_or_buffer: raise NotImplementedError( @@ -25,12 +28,19 @@ def read_avro( ) filepath_or_buffer, compression = ioutils.get_reader_filepath_or_buffer( - path_or_data=filepath_or_buffer, compression=None, **kwargs + path_or_data=filepath_or_buffer, + compression=None, + storage_options=storage_options, ) if compression is not None: ValueError("URL content-encoding decompression is not supported") if engine == "cudf": + warnings.warn( + "The `engine` parameter is deprecated and will be removed in a " + "future release", + FutureWarning, + ) return cudf.DataFrame._from_data( *libcudf.avro.read_avro( filepath_or_buffer, columns, skiprows, num_rows diff --git a/python/cudf/cudf/io/csv.py b/python/cudf/cudf/io/csv.py index 0adf432c31d..1eacbbb4458 100644 --- a/python/cudf/cudf/io/csv.py +++ b/python/cudf/cudf/io/csv.py @@ -61,6 +61,9 @@ def read_csv( "`use_python_file_object=False`" ) + if bytes_per_thread is None: + bytes_per_thread = ioutils._BYTES_PER_THREAD_DEFAULT + is_single_filepath_or_buffer = ioutils.ensure_single_filepath_or_buffer( path_or_data=filepath_or_buffer, storage_options=storage_options, @@ -76,9 +79,7 @@ def read_csv( iotypes=(BytesIO, StringIO, NativeFile), use_python_file_object=use_python_file_object, storage_options=storage_options, - bytes_per_thread=256_000_000 - if bytes_per_thread is None - else bytes_per_thread, + bytes_per_thread=bytes_per_thread, ) if na_values is not None and is_scalar(na_values): diff --git a/python/cudf/cudf/io/json.py b/python/cudf/cudf/io/json.py index 2a0ae565974..0ae02dcb62b 100644 --- a/python/cudf/cudf/io/json.py +++ b/python/cudf/cudf/io/json.py @@ -1,4 +1,5 @@ # Copyright (c) 2019-2022, NVIDIA CORPORATION. + import warnings from collections import abc from io import BytesIO, StringIO @@ -17,22 +18,23 @@ def read_json( path_or_buf, engine="auto", - dtype=True, + orient=None, + dtype=None, lines=False, compression="infer", byte_range=None, keep_quotes=False, + storage_options=None, *args, **kwargs, ): """{docstring}""" - if not isinstance(dtype, (abc.Mapping, bool)): - warnings.warn( - "passing 'dtype' as list is deprecated, instead pass " - "a dict of column name and types key-value paris." - "in future versions 'dtype' can only be a dict or bool", - FutureWarning, + if dtype is not None and not isinstance(dtype, (abc.Mapping, bool)): + raise TypeError( + "'dtype' parameter only supports " + "a dict of column names and types as key-value pairs, " + f"or a bool, or None. Got {type(dtype)}" ) if engine == "cudf" and not lines: @@ -45,6 +47,20 @@ def read_json( if engine == "auto": engine = "cudf" if lines else "pandas" if engine == "cudf" or engine == "cudf_experimental": + if dtype is None: + dtype = True + + if kwargs: + raise ValueError( + "cudf engine doesn't support the " + f"following keyword arguments: {list(kwargs.keys())}" + ) + if args: + raise ValueError( + "cudf engine doesn't support the " + f"following positional arguments: {list(args)}" + ) + # Multiple sources are passed as a list. If a single source is passed, # wrap it in a list for unified processing downstream. if not is_list_like(path_or_buf): @@ -52,9 +68,13 @@ def read_json( filepaths_or_buffers = [] for source in path_or_buf: - if ioutils.is_directory(source, **kwargs): + if ioutils.is_directory( + path_or_data=source, storage_options=storage_options + ): fs = ioutils._ensure_filesystem( - passed_filesystem=None, path=source, **kwargs + passed_filesystem=None, + path=source, + storage_options=storage_options, ) source = ioutils.stringify_pathlike(source) source = fs.sep.join([source, "*.json"]) @@ -64,7 +84,7 @@ def read_json( compression=compression, iotypes=(BytesIO, StringIO), allow_raw_text_input=True, - **kwargs, + storage_options=storage_options, ) if isinstance(tmp_source, list): filepaths_or_buffers.extend(tmp_source) @@ -88,7 +108,7 @@ def read_json( if not ioutils.ensure_single_filepath_or_buffer( path_or_data=path_or_buf, - **kwargs, + storage_options=storage_options, ): raise NotImplementedError( "`read_json` does not yet support reading " @@ -100,28 +120,24 @@ def read_json( compression=compression, iotypes=(BytesIO, StringIO), allow_raw_text_input=True, - **kwargs, + storage_options=storage_options, ) - if kwargs.get("orient") == "table": - pd_value = pd.read_json( - path_or_buf, - lines=lines, - compression=compression, - *args, - **kwargs, - ) - else: - pd_value = pd.read_json( - path_or_buf, - lines=lines, - dtype=dtype, - compression=compression, - *args, - **kwargs, - ) + pd_value = pd.read_json( + path_or_buf, + lines=lines, + dtype=dtype, + compression=compression, + storage_options=storage_options, + orient=orient, + *args, + **kwargs, + ) df = cudf.from_pandas(pd_value) + if dtype is None: + dtype = True + if dtype is True or isinstance(dtype, abc.Mapping): # There exists some dtypes in the result columns that is inferred. # Find them and map them to the default dtypes. diff --git a/python/cudf/cudf/io/orc.py b/python/cudf/cudf/io/orc.py index b9ce07466e5..8865bdd9d33 100644 --- a/python/cudf/cudf/io/orc.py +++ b/python/cudf/cudf/io/orc.py @@ -289,7 +289,8 @@ def read_orc( use_index=True, timestamp_type=None, use_python_file_object=True, - **kwargs, + storage_options=None, + bytes_per_thread=None, ): """{docstring}""" from cudf import DataFrame @@ -326,11 +327,13 @@ def read_orc( filepaths_or_buffers = [] for source in filepath_or_buffer: - if ioutils.is_directory(source, **kwargs): + if ioutils.is_directory( + path_or_data=source, storage_options=storage_options + ): fs = ioutils._ensure_filesystem( passed_filesystem=None, path=source, - **kwargs, + storage_options=storage_options, ) source = stringify_path(source) source = fs.sep.join([source, "*.orc"]) @@ -339,7 +342,8 @@ def read_orc( path_or_data=source, compression=None, use_python_file_object=use_python_file_object, - **kwargs, + storage_options=storage_options, + bytes_per_thread=bytes_per_thread, ) if compression is not None: raise ValueError( @@ -413,7 +417,8 @@ def to_orc( stripe_size_rows=None, row_index_stride=None, cols_as_map_type=None, - **kwargs, + storage_options=None, + index=None, ): """{docstring}""" @@ -434,7 +439,7 @@ def to_orc( raise TypeError("cols_as_map_type must be a list of column names.") path_or_buf = ioutils.get_writer_filepath_or_buffer( - path_or_data=fname, mode="wb", **kwargs + path_or_data=fname, mode="wb", storage_options=storage_options ) if ioutils.is_fsspec_open_file(path_or_buf): with path_or_buf as file_obj: @@ -448,6 +453,7 @@ def to_orc( stripe_size_rows, row_index_stride, cols_as_map_type, + index, ) else: liborc.write_orc( @@ -459,6 +465,7 @@ def to_orc( stripe_size_rows, row_index_stride, cols_as_map_type, + index, ) diff --git a/python/cudf/cudf/io/parquet.py b/python/cudf/cudf/io/parquet.py index 7ac391c5f3d..ceb08cb8058 100644 --- a/python/cudf/cudf/io/parquet.py +++ b/python/cudf/cudf/io/parquet.py @@ -9,7 +9,6 @@ from typing import Dict, List, Tuple from uuid import uuid4 -import numpy as np from pyarrow import dataset as ds, parquet as pq import cudf @@ -54,12 +53,12 @@ def _write_parquet( statistics="ROWGROUP", metadata_file_path=None, int96_timestamps=False, - row_group_size_bytes=None, + row_group_size_bytes=ioutils._ROW_GROUP_SIZE_BYTES_DEFAULT, row_group_size_rows=None, max_page_size_bytes=None, max_page_size_rows=None, partitions_info=None, - **kwargs, + storage_options=None, ): if is_list_like(paths) and len(paths) > 1: if partitions_info is None: @@ -73,7 +72,9 @@ def _write_parquet( ValueError("paths must be list-like when partitions_info provided") paths_or_bufs = [ - ioutils.get_writer_filepath_or_buffer(path, mode="wb", **kwargs) + ioutils.get_writer_filepath_or_buffer( + path_or_data=path, mode="wb", storage_options=storage_options + ) for path in paths ] common_args = { @@ -111,12 +112,19 @@ def _write_parquet( def write_to_dataset( df, root_path, + compression="snappy", filename=None, partition_cols=None, fs=None, preserve_index=False, return_metadata=False, - **kwargs, + statistics="ROWGROUP", + int96_timestamps=False, + row_group_size_bytes=ioutils._ROW_GROUP_SIZE_BYTES_DEFAULT, + row_group_size_rows=None, + max_page_size_bytes=None, + max_page_size_rows=None, + storage_options=None, ): """Wraps `to_parquet` to write partitioned Parquet datasets. For each combination of partition group and value, @@ -136,25 +144,51 @@ def write_to_dataset( df : cudf.DataFrame root_path : string, The root directory of the dataset + compression : {'snappy', 'ZSTD', None}, default 'snappy' + Name of the compression to use. Use ``None`` for no compression. filename : string, default None The file name to use (within each partition directory). If None, a random uuid4 hex string will be used for each file name. + partition_cols : list, + Column names by which to partition the dataset. + Columns are partitioned in the order they are given. fs : FileSystem, default None If nothing passed, paths assumed to be found in the local on-disk filesystem preserve_index : bool, default False Preserve index values in each parquet file. - partition_cols : list, - Column names by which to partition the dataset - Columns are partitioned in the order they are given return_metadata : bool, default False Return parquet metadata for written data. Returned metadata will include the file-path metadata (relative to `root_path`). - **kwargs : dict, - kwargs for to_parquet function. + int96_timestamps : bool, default False + If ``True``, write timestamps in int96 format. This will convert + timestamps from timestamp[ns], timestamp[ms], timestamp[s], and + timestamp[us] to the int96 format, which is the number of Julian + days and the number of nanoseconds since midnight of 1970-01-01. + If ``False``, timestamps will not be altered. + row_group_size_bytes: integer or None, default None + Maximum size of each stripe of the output. + If None, 134217728 (128MB) will be used. + row_group_size_rows: integer or None, default None + Maximum number of rows of each stripe of the output. + If None, 1000000 will be used. + max_page_size_bytes: integer or None, default None + Maximum uncompressed size of each page of the output. + If None, 524288 (512KB) will be used. + max_page_size_rows: integer or None, default None + Maximum number of rows of each page of the output. + If None, 20000 will be used. + + storage_options : dict, optional, default None + Extra options that make sense for a particular storage connection, + e.g. host, port, username, password, etc. For HTTP(S) URLs the + key-value pairs are forwarded to ``urllib.request.Request`` as + header options. For other URLs (e.g. starting with "s3://", and + "gcs://") the key-value pairs are forwarded to ``fsspec.open``. + Please see ``fsspec`` and ``urllib`` for more details. """ - fs = ioutils._ensure_filesystem(fs, root_path, **kwargs) + fs = ioutils._ensure_filesystem(fs, root_path, storage_options) fs.mkdirs(root_path, exist_ok=True) if partition_cols is not None and len(partition_cols) > 0: @@ -166,31 +200,50 @@ def write_to_dataset( part_offsets, _, ) = _get_partitioned( - df, - root_path, - partition_cols, - filename, - fs, - preserve_index, - **kwargs, + df=df, + root_path=root_path, + partition_cols=partition_cols, + filename=filename, + fs=fs, + preserve_index=preserve_index, + storage_options=storage_options, ) - - if return_metadata: - kwargs["metadata_file_path"] = metadata_file_paths + metadata_file_path = metadata_file_paths if return_metadata else None metadata = to_parquet( - grouped_df, - full_paths, + df=grouped_df, + path=full_paths, + compression=compression, index=preserve_index, partition_offsets=part_offsets, - **kwargs, + storage_options=storage_options, + metadata_file_path=metadata_file_path, + statistics=statistics, + int96_timestamps=int96_timestamps, + row_group_size_bytes=row_group_size_bytes, + row_group_size_rows=row_group_size_rows, + max_page_size_bytes=max_page_size_bytes, + max_page_size_rows=max_page_size_rows, ) else: filename = filename or _generate_filename() full_path = fs.sep.join([root_path, filename]) - if return_metadata: - kwargs["metadata_file_path"] = filename - metadata = df.to_parquet(full_path, index=preserve_index, **kwargs) + + metadata_file_path = filename if return_metadata else None + + metadata = df.to_parquet( + path=full_path, + compression=compression, + index=preserve_index, + storage_options=storage_options, + metadata_file_path=metadata_file_path, + statistics=statistics, + int96_timestamps=int96_timestamps, + row_group_size_bytes=row_group_size_bytes, + row_group_size_rows=row_group_size_rows, + max_page_size_bytes=max_page_size_bytes, + max_page_size_rows=max_page_size_rows, + ) return metadata @@ -361,6 +414,7 @@ def read_parquet( filepath_or_buffer, engine="cudf", columns=None, + storage_options=None, filters=None, row_groups=None, strings_to_categorical=False, @@ -368,6 +422,7 @@ def read_parquet( use_python_file_object=True, categorical_partitions=True, open_file_options=None, + bytes_per_thread=None, *args, **kwargs, ): @@ -383,6 +438,9 @@ def read_parquet( ) open_file_options = {} + if bytes_per_thread is None: + bytes_per_thread = ioutils._BYTES_PER_THREAD_DEFAULT + # Multiple sources are passed as a list. If a single source is passed, # wrap it in a list for unified processing downstream. if not is_list_like(filepath_or_buffer): @@ -403,7 +461,9 @@ def read_parquet( # Start by trying construct a filesystem object, so we # can apply filters on remote file-systems - fs, paths = ioutils._get_filesystem_and_paths(filepath_or_buffer, **kwargs) + fs, paths = ioutils._get_filesystem_and_paths( + path_or_data=filepath_or_buffer, storage_options=storage_options + ) # Use pyarrow dataset to detect/process directory-partitioned # data and apply filters. Note that we can only support partitioned @@ -418,8 +478,8 @@ def read_parquet( partition_keys, partition_categories, ) = _process_dataset( - paths, - fs, + paths=paths, + fs=fs, filters=filters, row_groups=row_groups, categorical_partitions=categorical_partitions, @@ -431,19 +491,20 @@ def read_parquet( filepaths_or_buffers = [] if use_python_file_object: open_file_options = _default_open_file_options( - open_file_options, - columns, - row_groups, + open_file_options=open_file_options, + columns=columns, + row_groups=row_groups, fs=fs, ) - for i, source in enumerate(filepath_or_buffer): + for source in filepath_or_buffer: tmp_source, compression = ioutils.get_reader_filepath_or_buffer( path_or_data=source, compression=None, fs=fs, use_python_file_object=use_python_file_object, open_file_options=open_file_options, - **kwargs, + storage_options=storage_options, + bytes_per_thread=bytes_per_thread, ) if compression is not None: @@ -571,6 +632,16 @@ def _read_parquet( # Simple helper function to dispatch between # cudf and pyarrow to read parquet data if engine == "cudf": + if kwargs: + raise ValueError( + "cudf engine doesn't support the " + f"following keyword arguments: {list(kwargs.keys())}" + ) + if args: + raise ValueError( + "cudf engine doesn't support the " + f"following positional arguments: {list(args)}" + ) return libparquet.read_parquet( filepaths_or_buffers, columns=columns, @@ -600,16 +671,28 @@ def to_parquet( statistics="ROWGROUP", metadata_file_path=None, int96_timestamps=False, - row_group_size_bytes=None, + row_group_size_bytes=ioutils._ROW_GROUP_SIZE_BYTES_DEFAULT, row_group_size_rows=None, max_page_size_bytes=None, max_page_size_rows=None, + storage_options=None, + return_metadata=False, *args, **kwargs, ): """{docstring}""" if engine == "cudf": + if kwargs: + raise ValueError( + "cudf engine doesn't support the " + f"following keyword arguments: {list(kwargs.keys())}" + ) + if args: + raise ValueError( + "cudf engine doesn't support the " + f"following positional arguments: {list(args)}" + ) # Ensure that no columns dtype is 'category' for col in df._column_names: if partition_cols is None or col not in partition_cols: @@ -626,34 +709,32 @@ def to_parquet( "partition_cols are provided. To request returning the " "metadata binary blob, pass `return_metadata=True`" ) - kwargs.update( - { - "compression": compression, - "statistics": statistics, - "int96_timestamps": int96_timestamps, - "row_group_size_bytes": row_group_size_bytes, - "row_group_size_rows": row_group_size_rows, - "max_page_size_bytes": max_page_size_bytes, - "max_page_size_rows": max_page_size_rows, - } - ) + return write_to_dataset( df, filename=partition_file_name, partition_cols=partition_cols, root_path=path, preserve_index=index, - **kwargs, + compression=compression, + statistics=statistics, + int96_timestamps=int96_timestamps, + row_group_size_bytes=row_group_size_bytes, + row_group_size_rows=row_group_size_rows, + max_page_size_bytes=max_page_size_bytes, + max_page_size_rows=max_page_size_rows, + return_metadata=return_metadata, + storage_options=storage_options, ) - if partition_offsets: - kwargs["partitions_info"] = list( - zip( - partition_offsets, - np.roll(partition_offsets, -1) - partition_offsets, - ) - )[:-1] - + partition_info = ( + [ + (i, j - i) + for i, j in zip(partition_offsets, partition_offsets[1:]) + ] + if partition_offsets is not None + else None + ) return _write_parquet( df, paths=path if is_list_like(path) else [path], @@ -666,7 +747,8 @@ def to_parquet( row_group_size_rows=row_group_size_rows, max_page_size_bytes=max_page_size_bytes, max_page_size_rows=max_page_size_rows, - **kwargs, + partitions_info=partition_info, + storage_options=storage_options, ) else: @@ -730,9 +812,11 @@ def _get_partitioned( filename=None, fs=None, preserve_index=False, - **kwargs, + storage_options=None, ): - fs = ioutils._ensure_filesystem(fs, root_path, **kwargs) + fs = ioutils._ensure_filesystem( + fs, root_path, storage_options=storage_options + ) fs.mkdirs(root_path, exist_ok=True) part_names, grouped_df, part_offsets = _get_groups_and_offsets( @@ -872,6 +956,13 @@ class ParquetDatasetWriter: file_name_prefix : str This is a prefix to file names generated only when `max_file_size` is specified. + storage_options : dict, optional, default None + Extra options that make sense for a particular storage connection, + e.g. host, port, username, password, etc. For HTTP(S) URLs the + key-value pairs are forwarded to ``urllib.request.Request`` as + header options. For other URLs (e.g. starting with "s3://", and + "gcs://") the key-value pairs are forwarded to ``fsspec.open``. + Please see ``fsspec`` and ``urllib`` for more details. Examples @@ -915,7 +1006,7 @@ def __init__( statistics="ROWGROUP", max_file_size=None, file_name_prefix=None, - **kwargs, + storage_options=None, ) -> None: if isinstance(path, str) and path.startswith("s3://"): self.fs_meta = {"is_s3": True, "actual_path": path} @@ -938,7 +1029,7 @@ def __init__( # Map of partition_col values to their ParquetWriter's index # in self._chunked_writers for reverse lookup self.path_cw_map: Dict[str, int] = {} - self.kwargs = kwargs + self.storage_options = storage_options self.filename = file_name_prefix self.max_file_size = max_file_size if max_file_size is not None: @@ -961,7 +1052,7 @@ def write_table(self, df): partition_cols=self.partition_cols, preserve_index=self.common_args["index"], ) - fs = ioutils._ensure_filesystem(None, self.path) + fs = ioutils._ensure_filesystem(None, self.path, None) fs.mkdirs(self.path, exist_ok=True) full_paths = [] @@ -1044,10 +1135,11 @@ def write_table(self, df): ) existing_cw_batch = defaultdict(dict) new_cw_paths = [] + partition_info = [(i, j - i) for i, j in zip(offsets, offsets[1:])] for path, part_info, meta_path in zip( paths, - zip(offsets, np.roll(offsets, -1) - offsets), + partition_info, metadata_file_paths, ): if path in self.path_cw_map: # path is a currently open file @@ -1097,7 +1189,7 @@ def close(self, return_metadata=False): local_path = self.path s3_path = self.fs_meta["actual_path"] s3_file, _ = ioutils._get_filesystem_and_paths( - s3_path, **self.kwargs + s3_path, storage_options=self.storage_options ) s3_file.put(local_path, s3_path, recursive=True) shutil.rmtree(self.path) diff --git a/python/cudf/cudf/io/text.py b/python/cudf/cudf/io/text.py index f341edbf6c1..eb2c7fa7ef6 100644 --- a/python/cudf/cudf/io/text.py +++ b/python/cudf/cudf/io/text.py @@ -17,7 +17,7 @@ def read_text( strip_delimiters=False, compression=None, compression_offsets=None, - **kwargs, + storage_options=None, ): """{docstring}""" @@ -28,7 +28,7 @@ def read_text( path_or_data=filepath_or_buffer, compression=None, iotypes=(BytesIO, StringIO), - **kwargs, + storage_options=storage_options, ) return cudf.Series._from_data( diff --git a/python/cudf/cudf/tests/test_orc.py b/python/cudf/cudf/tests/test_orc.py index 5aa049db31a..fbd9b83330e 100644 --- a/python/cudf/cudf/tests/test_orc.py +++ b/python/cudf/cudf/tests/test_orc.py @@ -1759,11 +1759,26 @@ def test_orc_writer_nvcomp(compression): assert_eq(expected, got) +@pytest.mark.parametrize("index_obj", [None, [10, 11, 12], ["x", "y", "z"]]) @pytest.mark.parametrize("index", [True, False, None]) -@pytest.mark.parametrize("columns", [None, [], ["b", "a"]]) -def test_orc_columns_and_index_param(index, columns): +@pytest.mark.parametrize( + "columns", + [ + None, + [], + pytest.param( + ["b", "a"], + marks=pytest.mark.xfail( + reason="https://github.com/rapidsai/cudf/issues/12026" + ), + ), + ], +) +def test_orc_columns_and_index_param(index_obj, index, columns): buffer = BytesIO() - df = cudf.DataFrame({"a": [1, 2, 3], "b": ["a", "b", "c"]}) + df = cudf.DataFrame( + {"a": [1, 2, 3], "b": ["a", "b", "c"]}, index=index_obj + ) df.to_orc(buffer, index=index) expected = pd.read_orc(buffer, columns=columns) diff --git a/python/cudf/cudf/tests/test_s3.py b/python/cudf/cudf/tests/test_s3.py index 5c06dea4ca6..d2339930b91 100644 --- a/python/cudf/cudf/tests/test_s3.py +++ b/python/cudf/cudf/tests/test_s3.py @@ -298,7 +298,6 @@ def test_read_parquet_ext( f"s3://{bucket}/{fname}", storage_options=s3so, bytes_per_thread=bytes_per_thread, - footer_sample_size=3200, columns=columns, ) if index: diff --git a/python/cudf/cudf/utils/ioutils.py b/python/cudf/cudf/utils/ioutils.py index 5298e470a91..ebb73ba0ca6 100644 --- a/python/cudf/cudf/utils/ioutils.py +++ b/python/cudf/cudf/utils/ioutils.py @@ -25,6 +25,9 @@ fsspec_parquet = None +_BYTES_PER_THREAD_DEFAULT = 256 * 1024 * 1024 +_ROW_GROUP_SIZE_BYTES_DEFAULT = 128 * 1024 * 1024 + _docstring_remote_sources = """ - cuDF supports local and remote data stores. See configuration details for available sources @@ -43,12 +46,20 @@ (such as builtin `open()` file handler function or `BytesIO`). engine : ['cudf'], default 'cudf' Parser engine to use. + This parameter is deprecated. columns : list, default None If not None, only these columns will be read. skiprows : int, default None If not None, the number of rows to skip from the start of the file. num_rows : int, default None If not None, the total number of rows to read. +storage_options : dict, optional, default None + Extra options that make sense for a particular storage connection, + e.g. host, port, username, password, etc. For HTTP(S) URLs the key-value + pairs are forwarded to ``urllib.request.Request`` as header options. + For other URLs (e.g. starting with "s3://", and "gcs://") the key-value + pairs are forwarded to ``fsspec.open``. Please see ``fsspec`` and + ``urllib`` for more details. Returns ------- @@ -132,6 +143,13 @@ Parser engine to use. columns : list, default None If not None, only these columns will be read. +storage_options : dict, optional, default None + Extra options that make sense for a particular storage connection, + e.g. host, port, username, password, etc. For HTTP(S) URLs the key-value + pairs are forwarded to ``urllib.request.Request`` as header options. + For other URLs (e.g. starting with "s3://", and "gcs://") the key-value + pairs are forwarded to ``fsspec.open``. Please see ``fsspec`` and + ``urllib`` for more details. filters : list of tuple, list of lists of tuples default None If not None, specifies a filter predicate used to filter out row groups using statistics stored for each row group as Parquet metadata. Row groups @@ -170,6 +188,13 @@ deactivate optimized precaching, set the "method" to `None` under the "precache_options" key. Note that the `open_file_func` key can also be used to specify a custom file-open function. +bytes_per_thread : int, default None + Determines the number of bytes to be allocated per thread to read the + files in parallel. When there is a file of large size, we get slightly + better throughput by decomposing it and transferring multiple "blocks" + in parallel (using a python thread pool). Default allocation is + {bytes_per_thread} bytes. + This parameter is functional only when `use_python_file_object=False`. Returns ------- @@ -195,7 +220,8 @@ cudf.DataFrame.to_parquet cudf.read_orc """.format( - remote_data_sources=_docstring_remote_sources + remote_data_sources=_docstring_remote_sources, + bytes_per_thread=_BYTES_PER_THREAD_DEFAULT, ) doc_read_parquet = docfmt_partial(docstring=_docstring_read_parquet) @@ -208,14 +234,15 @@ File path or Root Directory path. Will be used as Root Directory path while writing a partitioned dataset. Use list of str with partition_offsets to write parts of the dataframe to different files. -compression : {'snappy', 'ZSTD', None}, default 'snappy' +compression : {{'snappy', 'ZSTD', None}}, default 'snappy' Name of the compression to use. Use ``None`` for no compression. index : bool, default None - If ``True``, include the dataframe's index(es) in the file output. If - ``False``, they will not be written to the file. If ``None``, the - engine's default behavior will be used. However, instead of being saved - as values, the ``RangeIndex`` will be stored as a range in the metadata - so it doesn’t require much space and is faster. Other indexes will + If ``True``, include the dataframe's index(es) in the file output. + If ``False``, they will not be written to the file. + If ``None``, similar to ``True`` the dataframe's index(es) will + be saved, however, instead of being saved as values any + ``RangeIndex`` will be stored as a range in the metadata so it + doesn't require much space and is faster. Other indexes will be included as columns in the file output. partition_cols : list, optional, default None Column names by which to partition the dataset @@ -228,7 +255,7 @@ partition_offsets : list, optional, default None Offsets to partition the dataframe by. Should be used when path is list of str. Should be a list of integers of size ``len(path) + 1`` -statistics : {'ROWGROUP', 'PAGE', 'COLUMN', 'NONE'}, default 'ROWGROUP' +statistics : {{'ROWGROUP', 'PAGE', 'COLUMN', 'NONE'}}, default 'ROWGROUP' Level at which column statistics should be included in file. metadata_file_path : str, optional, default None If specified, this function will return a binary blob containing the footer @@ -239,11 +266,12 @@ If ``True``, write timestamps in int96 format. This will convert timestamps from timestamp[ns], timestamp[ms], timestamp[s], and timestamp[us] to the int96 format, which is the number of Julian - days and the number of nanoseconds since midnight. If ``False``, - timestamps will not be altered. -row_group_size_bytes: integer or None, default None + days and the number of nanoseconds since midnight of 1970-01-01. + If ``False``, timestamps will not be altered. +row_group_size_bytes: integer, default {row_group_size_bytes_val} Maximum size of each stripe of the output. - If None, 134217728 (128MB) will be used. + If None, {row_group_size_bytes_val} + ({row_group_size_bytes_val_in_mb} MB) will be used. row_group_size_rows: integer or None, default None Maximum number of rows of each stripe of the output. If None, 1000000 will be used. @@ -253,15 +281,30 @@ max_page_size_rows: integer or None, default None Maximum number of rows of each page of the output. If None, 20000 will be used. -**kwargs +storage_options : dict, optional, default None + Extra options that make sense for a particular storage connection, + e.g. host, port, username, password, etc. For HTTP(S) URLs the key-value + pairs are forwarded to ``urllib.request.Request`` as header options. + For other URLs (e.g. starting with "s3://", and "gcs://") the key-value + pairs are forwarded to ``fsspec.open``. Please see ``fsspec`` and + ``urllib`` for more details. +return_metadata : bool, default False + Return parquet metadata for written data. Returned metadata will + include the file path metadata (relative to `root_path`). To request metadata binary blob when using with ``partition_cols``, Pass ``return_metadata=True`` instead of specifying ``metadata_file_path`` +**kwargs + Additional parameters will be passed to execution engines other + than ``cudf``. See Also -------- cudf.read_parquet -""" +""".format( + row_group_size_bytes_val=_ROW_GROUP_SIZE_BYTES_DEFAULT, + row_group_size_bytes_val_in_mb=_ROW_GROUP_SIZE_BYTES_DEFAULT / 1024 / 1024, +) doc_to_parquet = docfmt_partial(docstring=_docstring_to_parquet) _docstring_merge_parquet_filemetadata = """ @@ -392,7 +435,20 @@ If True, Arrow-backed PythonFile objects will be used in place of fsspec AbstractBufferedFile objects at IO time. This option is likely to improve performance when making small reads from larger ORC files. -kwargs are passed to the engine +storage_options : dict, optional, default None + Extra options that make sense for a particular storage connection, + e.g. host, port, username, password, etc. For HTTP(S) URLs the key-value + pairs are forwarded to ``urllib.request.Request`` as header options. + For other URLs (e.g. starting with "s3://", and "gcs://") the key-value + pairs are forwarded to ``fsspec.open``. Please see ``fsspec`` and + ``urllib`` for more details. +bytes_per_thread : int, default None + Determines the number of bytes to be allocated per thread to read the + files in parallel. When there is a file of large size, we get slightly + better throughput by decomposing it and transferring multiple "blocks" + in parallel (using a python thread pool). Default allocation is + {bytes_per_thread} bytes. + This parameter is functional only when `use_python_file_object=False`. Returns ------- @@ -416,7 +472,8 @@ -------- cudf.DataFrame.to_orc """.format( - remote_data_sources=_docstring_remote_sources + remote_data_sources=_docstring_remote_sources, + bytes_per_thread=_BYTES_PER_THREAD_DEFAULT, ) doc_read_orc = docfmt_partial(docstring=_docstring_read_orc) @@ -429,8 +486,9 @@ File path or object where the ORC dataset will be stored. compression : {{ 'snappy', 'ZSTD', None }}, default 'snappy' Name of the compression to use. Use None for no compression. -enable_statistics: boolean, default True - Enable writing column statistics. +statistics: str {{ "ROWGROUP", "STRIPE", None }}, default "ROWGROUP" + The granularity with which column statistics must + be written to the file. stripe_size_bytes: integer or None, default None Maximum size of each stripe of the output. If None, 67108864 (64MB) will be used. @@ -444,6 +502,21 @@ A list of column names which should be written as map type in the ORC file. Note that this option only affects columns of ListDtype. Names of other column types will be ignored. +storage_options : dict, optional, default None + Extra options that make sense for a particular storage connection, + e.g. host, port, username, password, etc. For HTTP(S) URLs the key-value + pairs are forwarded to ``urllib.request.Request`` as header options. + For other URLs (e.g. starting with "s3://", and "gcs://") the key-value + pairs are forwarded to ``fsspec.open``. Please see ``fsspec`` and + ``urllib`` for more details. +index : bool, default None + If ``True``, include the dataframe's index(es) in the file output. + If ``False``, they will not be written to the file. + If ``None``, similar to ``True`` the dataframe's index(es) will + be saved, however, instead of being saved as values any + ``RangeIndex`` will be stored as a range in the metadata so it + doesn’t require much space and is faster. Other indexes will + be included as columns in the file output. See Also -------- @@ -504,10 +577,11 @@ ``'columns'``, and ``'records'``. typ : type of object to recover (series or frame), default 'frame' With cudf engine, only frame output is supported. -dtype : boolean or dict, default True +dtype : boolean or dict, default None If True, infer dtypes for all columns; if False, then don't infer dtypes at all, if a dict, provide a mapping from column names to their respective dtype (any missing columns will have their dtype inferred). Applies only to the data. + For all ``orient`` values except ``'table'``, default is ``True``. convert_axes : boolean, default True .. admonition:: Not GPU-accelerated @@ -613,6 +687,13 @@ If `True`, any string values are read literally (and wrapped in an additional set of quotes). If `False` string values are parsed into Python strings. +storage_options : dict, optional, default None + Extra options that make sense for a particular storage connection, + e.g. host, port, username, password, etc. For HTTP(S) URLs the key-value + pairs are forwarded to ``urllib.request.Request`` as header options. + For other URLs (e.g. starting with "s3://", and "gcs://") the key-value + pairs are forwarded to ``fsspec.open``. Please see ``fsspec`` and + ``urllib`` for more details. Returns ------- @@ -1043,7 +1124,7 @@ Extra options that make sense for a particular storage connection, e.g. host, port, username, password, etc. For HTTP(S) URLs the key-value pairs are forwarded to ``urllib.request.Request`` as header options. - For other URLs (e.g. starting with “s3://”, and “gcs://”) the key-value + For other URLs (e.g. starting with "s3://", and "gcs://") the key-value pairs are forwarded to ``fsspec.open``. Please see ``fsspec`` and ``urllib`` for more details. bytes_per_thread : int, default None @@ -1051,7 +1132,7 @@ files in parallel. When there is a file of large size, we get slightly better throughput by decomposing it and transferring multiple "blocks" in parallel (using a python thread pool). Default allocation is - 256_000_000 bytes. + {bytes_per_thread} bytes. This parameter is functional only when `use_python_file_object=False`. Returns ------- @@ -1089,7 +1170,8 @@ -------- cudf.DataFrame.to_csv """.format( - remote_data_sources=_docstring_remote_sources + remote_data_sources=_docstring_remote_sources, + bytes_per_thread=_BYTES_PER_THREAD_DEFAULT, ) doc_read_csv = docfmt_partial(docstring=_docstring_read_csv) @@ -1139,7 +1221,7 @@ Extra options that make sense for a particular storage connection, e.g. host, port, username, password, etc. For HTTP(S) URLs the key-value pairs are forwarded to ``urllib.request.Request`` as header options. - For other URLs (e.g. starting with “s3://”, and “gcs://”) the key-value + For other URLs (e.g. starting with "s3://", and "gcs://") the key-value pairs are forwarded to ``fsspec.open``. Please see ``fsspec`` and ``urllib`` for more details. Returns @@ -1215,6 +1297,12 @@ delimiter : string, default None The delimiter that should be used for splitting text chunks into separate cudf column rows. The delimiter may be one or more characters. +byte_range : list or tuple, default None + Byte range within the input file to be read. The first number is the + offset in bytes, the second number is the range size in bytes. + The output contains all rows that start inside the byte range + (i.e. at or after the offset, and before the end at `offset + size`), + which may include rows that continue past the end. strip_delimiters : boolean, default False Unlike the `str.split()` function, `read_text` preserves the delimiter at the end of a field in output by default, meaning `a;b;c` will turn into @@ -1222,12 +1310,6 @@ Setting this option to `True` will strip these trailing delimiters, leaving only the contents between delimiters in the resulting column: `['a','b','c']` -byte_range : list or tuple, default None - Byte range within the input file to be read. The first number is the - offset in bytes, the second number is the range size in bytes. - The output contains all rows that start inside the byte range - (i.e. at or after the offset, and before the end at `offset + size`), - which may include rows that continue past the end. compression : string, default None Which compression type is the input compressed with. Currently supports only `bgzip`, and requires the path to a file as input. @@ -1238,6 +1320,13 @@ compressed file (upper 48 bits). The start offset points to the first byte to be read, the end offset points one past the last byte to be read. +storage_options : dict, optional, default None + Extra options that make sense for a particular storage connection, + e.g. host, port, username, password, etc. For HTTP(S) URLs the key-value + pairs are forwarded to ``urllib.request.Request`` as header options. + For other URLs (e.g. starting with "s3://", and "gcs://") the key-value + pairs are forwarded to ``fsspec.open``. Please see ``fsspec`` and + ``urllib`` for more details. Returns ------- @@ -1247,6 +1336,66 @@ doc_read_text = docfmt_partial(docstring=_docstring_text_datasource) +_docstring_get_reader_filepath_or_buffer = """ +Return either a filepath string to data, or a memory buffer of data. +If filepath, then the source filepath is expanded to user's environment. +If buffer, then data is returned in-memory as bytes or a ByteIO object. + +Parameters +---------- +path_or_data : str, file-like object, bytes, ByteIO + Path to data or the data itself. +compression : str + Type of compression algorithm for the content +mode : str + Mode in which file is opened +iotypes : (), default (BytesIO) + Object type to exclude from file-like check +use_python_file_object : boolean, default False + If True, Arrow-backed PythonFile objects will be used in place + of fsspec AbstractBufferedFile objects. +open_file_options : dict, optional + Optional dictionary of keyword arguments to pass to + `_open_remote_files` (used for remote storage only). +allow_raw_text_input : boolean, default False + If True, this indicates the input `path_or_data` could be a raw text + input and will not check for its existence in the filesystem. If False, + the input must be a path and an error will be raised if it does not + exist. +storage_options : dict, optional + Extra options that make sense for a particular storage connection, e.g. + host, port, username, password, etc. For HTTP(S) URLs the key-value + pairs are forwarded to ``urllib.request.Request`` as header options. + For other URLs (e.g. starting with "s3://", and "gcs://") the key-value + pairs are forwarded to ``fsspec.open``. Please see ``fsspec`` and + ``urllib`` for more details, and for more examples on storage options + refer `here `__. +bytes_per_thread : int, default None + Determines the number of bytes to be allocated per thread to read the + files in parallel. When there is a file of large size, we get slightly + better throughput by decomposing it and transferring multiple "blocks" + in parallel (using a Python thread pool). Default allocation is + {bytes_per_thread} bytes. + This parameter is functional only when `use_python_file_object=False`. + +Returns +------- +filepath_or_buffer : str, bytes, BytesIO, list + Filepath string or in-memory buffer of data or a + list of Filepath strings or in-memory buffers of data. +compression : str + Type of compression algorithm for the content + """.format( + bytes_per_thread=_BYTES_PER_THREAD_DEFAULT +) + + +doc_get_reader_filepath_or_buffer = docfmt_partial( + docstring=_docstring_get_reader_filepath_or_buffer +) + + def is_url(url): """Check if a string is a valid URL to a network location. @@ -1295,13 +1444,12 @@ def _is_local_filesystem(fs): return isinstance(fs, fsspec.implementations.local.LocalFileSystem) -def ensure_single_filepath_or_buffer(path_or_data, **kwargs): +def ensure_single_filepath_or_buffer(path_or_data, storage_options=None): """Return False if `path_or_data` resolves to multiple filepaths or buffers. """ path_or_data = stringify_pathlike(path_or_data) if isinstance(path_or_data, str): - storage_options = kwargs.get("storage_options") path_or_data = os.path.expanduser(path_or_data) try: fs, _, paths = get_fs_token_paths( @@ -1321,11 +1469,10 @@ def ensure_single_filepath_or_buffer(path_or_data, **kwargs): return True -def is_directory(path_or_data, **kwargs): +def is_directory(path_or_data, storage_options=None): """Returns True if the provided filepath is a directory""" path_or_data = stringify_pathlike(path_or_data) if isinstance(path_or_data, str): - storage_options = kwargs.get("storage_options") path_or_data = os.path.expanduser(path_or_data) try: fs = get_fs_token_paths( @@ -1342,7 +1489,7 @@ def is_directory(path_or_data, **kwargs): return False -def _get_filesystem_and_paths(path_or_data, **kwargs): +def _get_filesystem_and_paths(path_or_data, storage_options): # Returns a filesystem object and the filesystem-normalized # paths. If `path_or_data` does not correspond to a path or # list of paths (or if the protocol is not supported), the @@ -1355,7 +1502,6 @@ def _get_filesystem_and_paths(path_or_data, **kwargs): and isinstance(stringify_pathlike(path_or_data[0]), str) ): # Ensure we are always working with a list - storage_options = kwargs.get("storage_options") if isinstance(path_or_data, list): path_or_data = [ os.path.expanduser(stringify_pathlike(source)) @@ -1472,54 +1618,21 @@ def _open_remote_files( ] +@doc_get_reader_filepath_or_buffer() def get_reader_filepath_or_buffer( path_or_data, compression, mode="rb", fs=None, iotypes=(BytesIO, NativeFile), - byte_ranges=None, use_python_file_object=False, open_file_options=None, allow_raw_text_input=False, - **kwargs, + storage_options=None, + bytes_per_thread=_BYTES_PER_THREAD_DEFAULT, ): - """Return either a filepath string to data, or a memory buffer of data. - If filepath, then the source filepath is expanded to user's environment. - If buffer, then data is returned in-memory as bytes or a ByteIO object. - - Parameters - ---------- - path_or_data : str, file-like object, bytes, ByteIO - Path to data or the data itself. - compression : str - Type of compression algorithm for the content - mode : str - Mode in which file is opened - iotypes : (), default (BytesIO) - Object type to exclude from file-like check - byte_ranges : list, optional - List of known byte ranges that will be read from path_or_data - use_python_file_object : boolean, default False - If True, Arrow-backed PythonFile objects will be used in place - of fsspec AbstractBufferedFile objects. - open_file_options : dict, optional - Optional dictionary of key-word arguments to pass to - `_open_remote_files` (used for remote storage only). - allow_raw_text_input : boolean, default False - If True, this indicates the input `path_or_data` could be a raw text - input and will not check for its existence in the filesystem. If False, - the input must be a path and an error will be raised if it does not - exist. + """{docstring}""" - Returns - ------- - filepath_or_buffer : str, bytes, BytesIO, list - Filepath string or in-memory buffer of data or a - list of Filepath strings or in-memory buffers of data. - compression : str - Type of compression algorithm for the content - """ path_or_data = stringify_pathlike(path_or_data) if isinstance(path_or_data, str): @@ -1527,7 +1640,9 @@ def get_reader_filepath_or_buffer( # Get a filesystem object if one isn't already available paths = [path_or_data] if fs is None: - fs, paths = _get_filesystem_and_paths(path_or_data, **kwargs) + fs, paths = _get_filesystem_and_paths( + path_or_data, storage_options + ) if fs is None: return path_or_data, compression @@ -1560,7 +1675,7 @@ def get_reader_filepath_or_buffer( fpath, fs=fs, mode=mode, - **kwargs, + bytes_per_thread=bytes_per_thread, ) ) for fpath in paths @@ -1575,13 +1690,15 @@ def get_reader_filepath_or_buffer( path_or_data = ArrowPythonFile(path_or_data) else: path_or_data = BytesIO( - _fsspec_data_transfer(path_or_data, mode=mode, **kwargs) + _fsspec_data_transfer( + path_or_data, mode=mode, bytes_per_thread=bytes_per_thread + ) ) return path_or_data, compression -def get_writer_filepath_or_buffer(path_or_data, mode, **kwargs): +def get_writer_filepath_or_buffer(path_or_data, mode, storage_options=None): """ Return either a filepath string to data, or a open file object to the output filesystem @@ -1592,14 +1709,23 @@ def get_writer_filepath_or_buffer(path_or_data, mode, **kwargs): Path to data or the data itself. mode : str Mode in which file is opened + storage_options : dict, optional, default None + Extra options that make sense for a particular storage connection, + e.g. host, port, username, password, etc. For HTTP(S) URLs the + key-value pairs are forwarded to ``urllib.request.Request`` as + header options. For other URLs (e.g. starting with "s3://", and + "gcs://") the key-value pairs are forwarded to ``fsspec.open``. + Please see ``fsspec`` and ``urllib`` for more details. Returns ------- filepath_or_buffer : str, Filepath string or buffer of data """ + if storage_options is None: + storage_options = {} + if isinstance(path_or_data, str): - storage_options = kwargs.get("storage_options", {}) path_or_data = os.path.expanduser(path_or_data) fs = get_fs_token_paths( path_or_data, mode=mode or "w", storage_options=storage_options @@ -1793,11 +1919,11 @@ def _prepare_filters(filters): return filters -def _ensure_filesystem(passed_filesystem, path, **kwargs): +def _ensure_filesystem(passed_filesystem, path, storage_options): if passed_filesystem is None: return get_fs_token_paths( path[0] if isinstance(path, list) else path, - storage_options=kwargs.get("storage_options", {}), + storage_options={} if storage_options is None else storage_options, )[0] return passed_filesystem @@ -1811,11 +1937,12 @@ def _fsspec_data_transfer( path_or_fob, fs=None, file_size=None, - bytes_per_thread=256_000_000, + bytes_per_thread=_BYTES_PER_THREAD_DEFAULT, max_gap=64_000, mode="rb", - **kwargs, ): + if bytes_per_thread is None: + bytes_per_thread = _BYTES_PER_THREAD_DEFAULT # Require `fs` if `path_or_fob` is not file-like file_like = is_file_like(path_or_fob) @@ -1848,7 +1975,6 @@ def _fsspec_data_transfer( byte_ranges, buf, fs=fs, - **kwargs, ) return buf.tobytes() @@ -1898,7 +2024,6 @@ def _read_byte_ranges( ranges, local_buffer, fs=None, - **kwargs, ): # Simple utility to copy remote byte ranges # into a local buffer for IO in libcudf diff --git a/python/dask_cudf/dask_cudf/io/parquet.py b/python/dask_cudf/dask_cudf/io/parquet.py index e64847948cf..bd398cb9607 100644 --- a/python/dask_cudf/dask_cudf/io/parquet.py +++ b/python/dask_cudf/dask_cudf/io/parquet.py @@ -22,7 +22,11 @@ from cudf.io import write_to_dataset from cudf.io.parquet import _default_open_file_options from cudf.utils.dtypes import cudf_dtype_from_pa_type -from cudf.utils.ioutils import _is_local_filesystem, _open_remote_files +from cudf.utils.ioutils import ( + _ROW_GROUP_SIZE_BYTES_DEFAULT, + _is_local_filesystem, + _open_remote_files, +) class CudfEngine(ArrowDatasetEngine): @@ -292,24 +296,47 @@ def write_partition( preserve_index = True if partition_on: md = write_to_dataset( - df, - path, + df=df, + root_path=path, + compression=compression, filename=filename, partition_cols=partition_on, fs=fs, preserve_index=preserve_index, return_metadata=return_metadata, - **kwargs, + statistics=kwargs.get("statistics", "ROWGROUP"), + int96_timestamps=kwargs.get("int96_timestamps", False), + row_group_size_bytes=kwargs.get( + "row_group_size_bytes", _ROW_GROUP_SIZE_BYTES_DEFAULT + ), + row_group_size_rows=kwargs.get("row_group_size_rows", None), + max_page_size_bytes=kwargs.get("max_page_size_bytes", None), + max_page_size_rows=kwargs.get("max_page_size_rows", None), + storage_options=kwargs.get("storage_options", None), ) else: with fs.open(fs.sep.join([path, filename]), mode="wb") as out_file: if not isinstance(out_file, IOBase): out_file = BufferedWriter(out_file) md = df.to_parquet( - out_file, - compression=compression, + path=out_file, + engine=kwargs.get("engine", "cudf"), + index=kwargs.get("index", None), + partition_cols=kwargs.get("partition_cols", None), + partition_file_name=kwargs.get( + "partition_file_name", None + ), + partition_offsets=kwargs.get("partition_offsets", None), + statistics=kwargs.get("statistics", "ROWGROUP"), + int96_timestamps=kwargs.get("int96_timestamps", False), + row_group_size_bytes=kwargs.get( + "row_group_size_bytes", _ROW_GROUP_SIZE_BYTES_DEFAULT + ), + row_group_size_rows=kwargs.get( + "row_group_size_rows", None + ), + storage_options=kwargs.get("storage_options", None), metadata_file_path=filename if return_metadata else None, - **kwargs, ) # Return the schema needed to write the metadata if return_metadata: