diff --git a/conda/recipes/libcudf/meta.yaml b/conda/recipes/libcudf/meta.yaml index 76115362b6c..ad2e840c71d 100644 --- a/conda/recipes/libcudf/meta.yaml +++ b/conda/recipes/libcudf/meta.yaml @@ -86,6 +86,9 @@ outputs: {% else %} - {{ compiler('cuda') }} {% endif %} + # TODO: start taking libarrow's run exports again when they're correct for 16.0 + # ref: https://github.com/conda-forge/arrow-cpp-feedstock/issues/1418 + - libarrow requirements: build: - cmake {{ cmake_version }} @@ -105,6 +108,12 @@ outputs: - librmm ={{ minor_version }} - libkvikio ={{ minor_version }} - dlpack {{ dlpack_version }} + # TODO: start taking libarrow's run exports again when they're correct for 16.0 + # ref: https://github.com/conda-forge/arrow-cpp-feedstock/issues/1418 + - libarrow>=16.0.0,<16.1.0a0 + - libarrow-acero>=16.0.0,<16.1.0a0 + - libarrow-dataset>=16.0.0,<16.1.0a0 + - libparquet>=16.0.0,<16.1.0a0 test: commands: - test -f $PREFIX/lib/libcudf.so diff --git a/cpp/include/cudf/detail/utilities/stream_pool.hpp b/cpp/include/cudf/detail/utilities/stream_pool.hpp index 19ef26a10cb..e19cc3ec2f7 100644 --- a/cpp/include/cudf/detail/utilities/stream_pool.hpp +++ b/cpp/include/cudf/detail/utilities/stream_pool.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023, NVIDIA CORPORATION. + * Copyright (c) 2023-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -81,6 +81,11 @@ class cuda_stream_pool { */ cuda_stream_pool* create_global_cuda_stream_pool(); +/** + * @brief Get the global stream pool. + */ +cuda_stream_pool& global_cuda_stream_pool(); + /** * @brief Acquire a set of `cuda_stream_view` objects and synchronize them to an event on another * stream. diff --git a/cpp/include/cudf/io/json.hpp b/cpp/include/cudf/io/json.hpp index 7374ffc37e6..aa4bee4fb5e 100644 --- a/cpp/include/cudf/io/json.hpp +++ b/cpp/include/cudf/io/json.hpp @@ -101,6 +101,8 @@ class json_reader_options { bool _lines = false; // Parse mixed types as a string column bool _mixed_types_as_string = false; + // Delimiter separating records in JSON lines + char _delimiter = '\n'; // Prune columns on read, selected based on the _dtypes option bool _prune_columns = false; @@ -229,6 +231,13 @@ class json_reader_options { return base_padding + num_columns * column_bytes; } + /** + * @brief Returns delimiter separating records in JSON lines + * + * @return Delimiter separating records in JSON lines + */ + char get_delimiter() const { return _delimiter; } + /** * @brief Whether to read the file as a json object per line. * @@ -340,6 +349,30 @@ class json_reader_options { */ void set_byte_range_size(size_type size) { _byte_range_size = size; } + /** + * @brief Set delimiter separating records in JSON lines + * + * @param delimiter Delimiter separating records in JSON lines + */ + void set_delimiter(char delimiter) + { + switch (delimiter) { + case '{': + case '[': + case '}': + case ']': + case ',': + case ':': + case '"': + case '\'': + case '\\': + case ' ': + case '\t': + case '\r': CUDF_FAIL("Unsupported delimiter character.", std::invalid_argument); break; + } + _delimiter = delimiter; + } + /** + * @brief Set whether to read the file as a json object per line.
* @@ -507,6 +540,18 @@ class json_reader_options_builder { return *this; } + /** + * @brief Set delimiter separating records in JSON lines + * + * @param delimiter Delimiter separating records in JSON lines + * @return this for chaining + */ + json_reader_options_builder& delimiter(char delimiter) + { + options.set_delimiter(delimiter); + return *this; + } + /** * @brief Set whether to read the file as a json object per line. * diff --git a/cpp/include/cudf/io/memory_resource.hpp b/cpp/include/cudf/io/memory_resource.hpp index ea79d6a3029..e31ebce4b1f 100644 --- a/cpp/include/cudf/io/memory_resource.hpp +++ b/cpp/include/cudf/io/memory_resource.hpp @@ -18,6 +18,8 @@ #include +#include + namespace cudf::io { /** @@ -41,4 +43,21 @@ rmm::host_async_resource_ref set_host_memory_resource(rmm::host_async_resource_r */ rmm::host_async_resource_ref get_host_memory_resource(); +/** + * @brief Options to configure the default host memory resource + */ +struct host_mr_options { + std::optional pool_size; ///< The size of the pool to use for the default host memory + ///< resource. If not set, the default pool size is used. +}; + +/** + * @brief Configure the size of the default host memory resource. + * + * @throws cudf::logic_error if called after the default host memory resource has been created + * + * @param opts Options to configure the default host memory resource + */ +void config_default_host_memory_resource(host_mr_options const& opts); + } // namespace cudf::io diff --git a/cpp/src/io/json/nested_json.hpp b/cpp/src/io/json/nested_json.hpp index 5817a01c21f..e12892a2d50 100644 --- a/cpp/src/io/json/nested_json.hpp +++ b/cpp/src/io/json/nested_json.hpp @@ -59,8 +59,8 @@ enum class stack_behavior_t : char { PushPopWithoutReset, /// Opening brackets and braces, [, {, push onto the stack, closing brackets and braces, ], }, pop - /// from the stack. Newline characters are considered delimiters and therefore reset to an empty - /// stack. + /// from the stack. Delimiter characters are passed when the stack context is constructed to + /// reset to an empty stack. ResetOnDelimiter }; @@ -198,11 +198,13 @@ namespace detail { * within the context of a struct, a '[' represents that it is within the context of an array, and a * '_' symbol that it is at the root of the JSON. * @param[in] stack_behavior Specifies the stack's behavior + * @param[in] delimiter Specifies the delimiter to use as separator for JSON lines input * @param[in] stream The cuda stream to dispatch GPU kernels to */ void get_stack_context(device_span json_in, SymbolT* d_top_of_stack, stack_behavior_t stack_behavior, + SymbolT delimiter, rmm::cuda_stream_view stream); /** diff --git a/cpp/src/io/json/nested_json_gpu.cu b/cpp/src/io/json/nested_json_gpu.cu index 8da1bb3ddfc..b243e4ba006 100644 --- a/cpp/src/io/json/nested_json_gpu.cu +++ b/cpp/src/io/json/nested_json_gpu.cu @@ -131,12 +131,13 @@ constexpr auto NUM_SYMBOL_GROUPS = static_cast(dfa_symbol_group_id::NU * @brief Function object to map (input_symbol,stack_context) tuples to a symbol group. */ struct SymbolPairToSymbolGroupId { + SymbolT delimiter = '\n'; CUDF_HOST_DEVICE SymbolGroupT operator()(thrust::tuple symbol) const { auto const input_symbol = thrust::get<0>(symbol); auto const stack_symbol = thrust::get<1>(symbol); return static_cast( - input_symbol == '\n' + input_symbol == delimiter ? dfa_symbol_group_id::DELIMITER : (stack_symbol == '_' ? 
dfa_symbol_group_id::ROOT : dfa_symbol_group_id::OTHER)); } @@ -331,7 +332,7 @@ enum class dfa_symbol_group_id : uint8_t { CLOSING_BRACKET, ///< Closing bracket SG: ] QUOTE_CHAR, ///< Quote character SG: " ESCAPE_CHAR, ///< Escape character SG: '\' - NEWLINE_CHAR, ///< Newline character SG: '\n' + DELIMITER_CHAR, ///< Delimiter character SG OTHER_SYMBOLS, ///< SG implicitly matching all other characters NUM_SYMBOL_GROUPS ///< Total number of symbol groups }; @@ -339,42 +340,64 @@ enum class dfa_symbol_group_id : uint8_t { constexpr auto TT_NUM_STATES = static_cast(dfa_states::TT_NUM_STATES); constexpr auto NUM_SYMBOL_GROUPS = static_cast(dfa_symbol_group_id::NUM_SYMBOL_GROUPS); -// The i-th string representing all the characters of a symbol group -std::array const symbol_groups{ - {{"{"}, {"["}, {"}"}, {"]"}, {"\""}, {"\\"}, {"\n"}}}; +// The DFA's starting state +constexpr auto start_state = static_cast(TT_OOS); -// Transition table for the default JSON and JSON lines formats -std::array, TT_NUM_STATES> const transition_table{ - {/* IN_STATE { [ } ] " \ \n OTHER */ - /* TT_OOS */ {{TT_OOS, TT_OOS, TT_OOS, TT_OOS, TT_STR, TT_OOS, TT_OOS, TT_OOS}}, - /* TT_STR */ {{TT_STR, TT_STR, TT_STR, TT_STR, TT_OOS, TT_ESC, TT_STR, TT_STR}}, - /* TT_ESC */ {{TT_STR, TT_STR, TT_STR, TT_STR, TT_STR, TT_STR, TT_STR, TT_STR}}}}; - -// Transition table for the JSON lines format that recovers from invalid JSON lines -std::array, TT_NUM_STATES> const - resetting_transition_table{ +template +auto get_sgid_lut(SymbolT delim) +{ + // The i-th string representing all the characters of a symbol group + std::array, NUM_SYMBOL_GROUPS - 1> symbol_groups{ + {{'{'}, {'['}, {'}'}, {']'}, {'"'}, {'\\'}, {delim}}}; + + return symbol_groups; +} + +auto get_transition_table(stack_behavior_t stack_behavior) +{ + // Transition table for the default JSON and JSON lines formats + std::array, TT_NUM_STATES> const transition_table{ {/* IN_STATE { [ } ] " \ \n OTHER */ /* TT_OOS */ {{TT_OOS, TT_OOS, TT_OOS, TT_OOS, TT_STR, TT_OOS, TT_OOS, TT_OOS}}, - /* TT_STR */ {{TT_STR, TT_STR, TT_STR, TT_STR, TT_OOS, TT_ESC, TT_OOS, TT_STR}}, - /* TT_ESC */ {{TT_STR, TT_STR, TT_STR, TT_STR, TT_STR, TT_STR, TT_OOS, TT_STR}}}}; - -// Translation table for the default JSON and JSON lines formats -std::array, NUM_SYMBOL_GROUPS>, TT_NUM_STATES> const translation_table{ - {/* IN_STATE { [ } ] " \ \n OTHER */ - /* TT_OOS */ {{{'{'}, {'['}, {'}'}, {']'}, {}, {}, {}, {}}}, - /* TT_STR */ {{{}, {}, {}, {}, {}, {}, {}, {}}}, - /* TT_ESC */ {{{}, {}, {}, {}, {}, {}, {}, {}}}}}; - -// Translation table for the JSON lines format that recovers from invalid JSON lines -std::array, NUM_SYMBOL_GROUPS>, TT_NUM_STATES> const - resetting_translation_table{ - {/* IN_STATE { [ } ] " \ \n OTHER */ - /* TT_OOS */ {{{'{'}, {'['}, {'}'}, {']'}, {}, {}, {'\n'}, {}}}, - /* TT_STR */ {{{}, {}, {}, {}, {}, {}, {'\n'}, {}}}, - /* TT_ESC */ {{{}, {}, {}, {}, {}, {}, {'\n'}, {}}}}}; + /* TT_STR */ {{TT_STR, TT_STR, TT_STR, TT_STR, TT_OOS, TT_ESC, TT_STR, TT_STR}}, + /* TT_ESC */ {{TT_STR, TT_STR, TT_STR, TT_STR, TT_STR, TT_STR, TT_STR, TT_STR}}}}; + + // Transition table for the JSON lines format that recovers from invalid JSON lines + std::array, TT_NUM_STATES> const + resetting_transition_table{ + {/* IN_STATE { [ } ] " \ \n OTHER */ + /* TT_OOS */ {{TT_OOS, TT_OOS, TT_OOS, TT_OOS, TT_STR, TT_OOS, TT_OOS, TT_OOS}}, + /* TT_STR */ {{TT_STR, TT_STR, TT_STR, TT_STR, TT_OOS, TT_ESC, TT_OOS, TT_STR}}, + /* TT_ESC */ {{TT_STR, TT_STR, TT_STR, TT_STR, TT_STR, TT_STR, TT_OOS, 
TT_STR}}}}; + + // Transition table specialized on the choice of whether to reset on newlines + return (stack_behavior == stack_behavior_t::ResetOnDelimiter) ? resetting_transition_table + : transition_table; +} + +auto get_translation_table(stack_behavior_t stack_behavior) +{ + // Translation table for the default JSON and JSON lines formats + std::array, NUM_SYMBOL_GROUPS>, TT_NUM_STATES> const + translation_table{ + {/* IN_STATE { [ } ] " \ OTHER */ + /* TT_OOS */ {{{'{'}, {'['}, {'}'}, {']'}, {}, {}, {}, {}}}, + /* TT_STR */ {{{}, {}, {}, {}, {}, {}, {}, {}}}, + /* TT_ESC */ {{{}, {}, {}, {}, {}, {}, {}, {}}}}}; + + // Translation table for the JSON lines format that recovers from invalid JSON lines + std::array, NUM_SYMBOL_GROUPS>, TT_NUM_STATES> const + resetting_translation_table{ + {/* IN_STATE { [ } ] " \ OTHER */ + /* TT_OOS */ {{{'{'}, {'['}, {'}'}, {']'}, {}, {}, {'\n'}, {}}}, + /* TT_STR */ {{{}, {}, {}, {}, {}, {}, {'\n'}, {}}}, + /* TT_ESC */ {{{}, {}, {}, {}, {}, {}, {'\n'}, {}}}}}; + + // Translation table specialized on the choice of whether to reset on newlines + return stack_behavior == stack_behavior_t::ResetOnDelimiter ? resetting_translation_table + : translation_table; +} -// The DFA's starting state -constexpr auto start_state = static_cast(TT_OOS); } // namespace to_stack_op // JSON tokenizer pushdown automaton @@ -572,6 +595,7 @@ static __constant__ PdaSymbolGroupIdT tos_sg_to_pda_sgid[] = { * visibly pushdown automaton (DVPA) */ struct PdaSymbolToSymbolGroupId { + SymbolT delimiter = '\n'; template __device__ __forceinline__ PdaSymbolGroupIdT operator()(thrust::tuple symbol_pair) const @@ -593,8 +617,15 @@ struct PdaSymbolToSymbolGroupId { // The relative symbol group id of the current input symbol constexpr auto pda_sgid_lookup_size = static_cast(sizeof(tos_sg_to_pda_sgid) / sizeof(tos_sg_to_pda_sgid[0])); + // We map the delimiter character to LINE_BREAK symbol group id, and the newline character + // to OTHER. Note that delimiter cannot be any of opening(closing) brace, bracket, quote, + // escape, comma, colon or whitespace characters. + auto const symbol_position = + symbol == delimiter + ? static_cast('\n') + : (symbol == '\n' ? static_cast(delimiter) : static_cast(symbol)); PdaSymbolGroupIdT symbol_gid = - tos_sg_to_pda_sgid[min(static_cast(symbol), pda_sgid_lookup_size - 1)]; + tos_sg_to_pda_sgid[min(symbol_position, pda_sgid_lookup_size - 1)]; return stack_idx * static_cast(symbol_group_id::NUM_PDA_INPUT_SGS) + symbol_gid; } @@ -1398,6 +1429,7 @@ namespace detail { void get_stack_context(device_span json_in, SymbolT* d_top_of_stack, stack_behavior_t stack_behavior, + SymbolT delimiter, rmm::cuda_stream_view stream) { check_input_size(json_in.size()); @@ -1423,20 +1455,11 @@ void get_stack_context(device_span json_in, constexpr auto max_translation_table_size = to_stack_op::NUM_SYMBOL_GROUPS * to_stack_op::TT_NUM_STATES; - // Transition table specialized on the choice of whether to reset on newlines - const auto transition_table = (stack_behavior == stack_behavior_t::ResetOnDelimiter) - ? to_stack_op::resetting_transition_table - : to_stack_op::transition_table; - - // Translation table specialized on the choice of whether to reset on newlines - const auto translation_table = (stack_behavior == stack_behavior_t::ResetOnDelimiter) - ? 
to_stack_op::resetting_translation_table - : to_stack_op::translation_table; - auto json_to_stack_ops_fst = fst::detail::make_fst( - fst::detail::make_symbol_group_lut(to_stack_op::symbol_groups), - fst::detail::make_transition_table(transition_table), - fst::detail::make_translation_table(translation_table), + fst::detail::make_symbol_group_lut(to_stack_op::get_sgid_lut(delimiter)), + fst::detail::make_transition_table(to_stack_op::get_transition_table(stack_behavior)), + fst::detail::make_translation_table( + to_stack_op::get_translation_table(stack_behavior)), stream); // "Search" for relevant occurrence of brackets and braces that indicate the beginning/end @@ -1539,16 +1562,16 @@ std::pair, rmm::device_uvector> ge // Range of encapsulating function that parses to internal columnar data representation CUDF_FUNC_RANGE(); - auto const new_line_delimited_json = options.is_enabled_lines(); + auto const delimited_json = options.is_enabled_lines(); + auto const delimiter = options.get_delimiter(); - // (!new_line_delimited_json) => JSON - // (new_line_delimited_json and recover_from_error) => JSON_LINES_RECOVER - // (new_line_delimited_json and !recover_from_error) => JSON_LINES - auto format = new_line_delimited_json - ? (options.recovery_mode() == json_recovery_mode_t::RECOVER_WITH_NULL - ? tokenizer_pda::json_format_cfg_t::JSON_LINES_RECOVER - : tokenizer_pda::json_format_cfg_t::JSON_LINES) - : tokenizer_pda::json_format_cfg_t::JSON; + // (!delimited_json) => JSON + // (delimited_json and recover_from_error) => JSON_LINES_RECOVER + // (delimited_json and !recover_from_error) => JSON_LINES + auto format = delimited_json ? (options.recovery_mode() == json_recovery_mode_t::RECOVER_WITH_NULL + ? tokenizer_pda::json_format_cfg_t::JSON_LINES_RECOVER + : tokenizer_pda::json_format_cfg_t::JSON_LINES) + : tokenizer_pda::json_format_cfg_t::JSON; // Prepare for PDA transducer pass, merging input symbols with stack symbols auto const recover_from_error = (format == tokenizer_pda::json_format_cfg_t::JSON_LINES_RECOVER); @@ -1559,7 +1582,7 @@ std::pair, rmm::device_uvector> ge // Identify what is the stack context for each input character (JSON-root, struct, or list) auto const stack_behavior = recover_from_error ? 
stack_behavior_t::ResetOnDelimiter : stack_behavior_t::PushPopWithoutReset; - get_stack_context(json_in, stack_symbols.data(), stack_behavior, stream); + get_stack_context(json_in, stack_symbols.data(), stack_behavior, delimiter, stream); // Input to the full pushdown automaton finite-state transducer, where a input symbol comprises // the combination of a character from the JSON input together with the stack context for that @@ -1573,7 +1596,7 @@ std::pair, rmm::device_uvector> ge if (recover_from_error) { auto fix_stack_of_excess_chars = fst::detail::make_fst( fst::detail::make_symbol_group_lookup_op( - fix_stack_of_excess_chars::SymbolPairToSymbolGroupId{}), + fix_stack_of_excess_chars::SymbolPairToSymbolGroupId{delimiter}), fst::detail::make_transition_table(fix_stack_of_excess_chars::transition_table), fst::detail::make_translation_functor(fix_stack_of_excess_chars::TransduceInputOp{}), stream); @@ -1592,8 +1615,9 @@ std::pair, rmm::device_uvector> ge constexpr auto max_translation_table_size = tokenizer_pda::NUM_PDA_SGIDS * static_cast(tokenizer_pda::pda_state_t::PD_NUM_STATES); + auto json_to_tokens_fst = fst::detail::make_fst( - fst::detail::make_symbol_group_lookup_op(tokenizer_pda::PdaSymbolToSymbolGroupId{}), + fst::detail::make_symbol_group_lookup_op(tokenizer_pda::PdaSymbolToSymbolGroupId{delimiter}), fst::detail::make_transition_table(tokenizer_pda::get_transition_table(format)), fst::detail::make_translation_table( tokenizer_pda::get_translation_table(recover_from_error)), diff --git a/cpp/src/io/utilities/config_utils.cpp b/cpp/src/io/utilities/config_utils.cpp index 2f7a6131e3d..7720c073a97 100644 --- a/cpp/src/io/utilities/config_utils.cpp +++ b/cpp/src/io/utilities/config_utils.cpp @@ -16,10 +16,13 @@ #include "config_utils.hpp" +#include +#include #include #include #include +#include #include #include @@ -87,38 +90,198 @@ bool is_stable_enabled() { return is_all_enabled() or get_env_policy() == usage_ } // namespace nvcomp_integration -inline std::mutex& host_mr_lock() +} // namespace detail + +namespace { +class fixed_pinned_pool_memory_resource { + using upstream_mr = rmm::mr::pinned_host_memory_resource; + using host_pooled_mr = rmm::mr::pool_memory_resource; + + private: + upstream_mr upstream_mr_{}; + size_t pool_size_{0}; + // Raw pointer to avoid a segfault when the pool is destroyed on exit + host_pooled_mr* pool_{nullptr}; + void* pool_begin_{nullptr}; + void* pool_end_{nullptr}; + cuda::stream_ref stream_{cudf::detail::global_cuda_stream_pool().get_stream().value()}; + + public: + fixed_pinned_pool_memory_resource(size_t size) + : pool_size_{size}, pool_{new host_pooled_mr(upstream_mr_, size, size)} + { + if (pool_size_ == 0) { return; } + + // Allocate full size from the pinned pool to figure out the beginning and end address + pool_begin_ = pool_->allocate_async(pool_size_, stream_); + pool_end_ = static_cast(static_cast(pool_begin_) + pool_size_); + pool_->deallocate_async(pool_begin_, pool_size_, stream_); + } + + void* do_allocate_async(std::size_t bytes, std::size_t alignment, cuda::stream_ref stream) + { + if (bytes <= pool_size_) { + try { + return pool_->allocate_async(bytes, alignment, stream); + } catch (...) 
{ + // If the pool is exhausted, fall back to the upstream memory resource + } + } + + return upstream_mr_.allocate_async(bytes, alignment, stream); + } + + void do_deallocate_async(void* ptr, + std::size_t bytes, + std::size_t alignment, + cuda::stream_ref stream) noexcept + { + if (bytes <= pool_size_ && ptr >= pool_begin_ && ptr <= pool_end_) { + pool_->deallocate_async(ptr, bytes, alignment, stream); + } else { + upstream_mr_.deallocate_async(ptr, bytes, alignment, stream); + } + } + + void* allocate_async(std::size_t bytes, cuda::stream_ref stream) + { + return do_allocate_async(bytes, rmm::RMM_DEFAULT_HOST_ALIGNMENT, stream); + } + + void* allocate_async(std::size_t bytes, std::size_t alignment, cuda::stream_ref stream) + { + return do_allocate_async(bytes, alignment, stream); + } + + void* allocate(std::size_t bytes, std::size_t alignment = rmm::RMM_DEFAULT_HOST_ALIGNMENT) + { + auto const result = do_allocate_async(bytes, alignment, stream_); + stream_.wait(); + return result; + } + + void deallocate_async(void* ptr, std::size_t bytes, cuda::stream_ref stream) noexcept + { + return do_deallocate_async(ptr, bytes, rmm::RMM_DEFAULT_HOST_ALIGNMENT, stream); + } + + void deallocate_async(void* ptr, + std::size_t bytes, + std::size_t alignment, + cuda::stream_ref stream) noexcept + { + return do_deallocate_async(ptr, bytes, alignment, stream); + } + + void deallocate(void* ptr, + std::size_t bytes, + std::size_t alignment = rmm::RMM_DEFAULT_HOST_ALIGNMENT) noexcept + { + deallocate_async(ptr, bytes, alignment, stream_); + stream_.wait(); + } + + bool operator==(fixed_pinned_pool_memory_resource const& other) const + { + return pool_ == other.pool_ and stream_ == other.stream_; + } + + bool operator!=(fixed_pinned_pool_memory_resource const& other) const + { + return !operator==(other); + } + + [[maybe_unused]] friend void get_property(fixed_pinned_pool_memory_resource const&, + cuda::mr::device_accessible) noexcept + { + } + + [[maybe_unused]] friend void get_property(fixed_pinned_pool_memory_resource const&, + cuda::mr::host_accessible) noexcept + { + } +}; + +static_assert(cuda::mr::resource_with, + ""); + +} // namespace + +CUDF_EXPORT rmm::host_async_resource_ref& make_default_pinned_mr(std::optional config_size) +{ + static fixed_pinned_pool_memory_resource mr = [config_size]() { + auto const size = [&config_size]() -> size_t { + if (auto const env_val = getenv("LIBCUDF_PINNED_POOL_SIZE"); env_val != nullptr) { + return std::atol(env_val); + } + + if (config_size.has_value()) { return *config_size; } + + size_t free{}, total{}; + CUDF_CUDA_TRY(cudaMemGetInfo(&free, &total)); + // 0.5% of the total device memory, capped at 100MB + return std::min(total / 200, size_t{100} * 1024 * 1024); + }(); + + // rmm requires the pool size to be a multiple of 256 bytes + auto const aligned_size = (size + 255) & ~255; + CUDF_LOG_INFO("Pinned pool size = {}", aligned_size); + + // make the pool with max size equal to the initial size + return fixed_pinned_pool_memory_resource{aligned_size}; + }(); + + static rmm::host_async_resource_ref mr_ref{mr}; + return mr_ref; +} + +CUDF_EXPORT std::mutex& host_mr_mutex() { static std::mutex map_lock; return map_lock; } -inline rmm::host_async_resource_ref default_pinned_mr() +// Must be called with the host_mr_mutex mutex held +CUDF_EXPORT rmm::host_async_resource_ref& make_host_mr(std::optional const& opts) { - static rmm::mr::pinned_host_memory_resource default_mr{}; - return default_mr; + static rmm::host_async_resource_ref* mr_ref = nullptr; + if (mr_ref 
== nullptr) { + mr_ref = &make_default_pinned_mr(opts ? opts->pool_size : std::nullopt); + } else { + // Throw an error if the user tries to reconfigure the default host resource + CUDF_EXPECTS(opts == std::nullopt, "The default host memory resource has already been created"); + } + + return *mr_ref; } -CUDF_EXPORT inline auto& host_mr() +// Must be called with the host_mr_mutex mutex held +CUDF_EXPORT rmm::host_async_resource_ref& host_mr() { - static rmm::host_async_resource_ref host_mr = default_pinned_mr(); - return host_mr; + static rmm::host_async_resource_ref mr_ref = make_host_mr(std::nullopt); + return mr_ref; } -} // namespace detail - rmm::host_async_resource_ref set_host_memory_resource(rmm::host_async_resource_ref mr) { - std::lock_guard lock{detail::host_mr_lock()}; - auto last_mr = detail::host_mr(); - detail::host_mr() = mr; + std::scoped_lock lock{host_mr_mutex()}; + auto last_mr = host_mr(); + host_mr() = mr; return last_mr; } rmm::host_async_resource_ref get_host_memory_resource() { - std::lock_guard lock{detail::host_mr_lock()}; - return detail::host_mr(); + std::scoped_lock lock{host_mr_mutex()}; + return host_mr(); +} + +void config_default_host_memory_resource(host_mr_options const& opts) +{ + std::scoped_lock lock{host_mr_mutex()}; + make_host_mr(opts); } } // namespace cudf::io diff --git a/cpp/tests/io/json_test.cpp b/cpp/tests/io/json_test.cpp index b25822f6613..35e6adf20e7 100644 --- a/cpp/tests/io/json_test.cpp +++ b/cpp/tests/io/json_test.cpp @@ -2434,6 +2434,84 @@ TEST_F(JsonReaderTest, MapTypes) {type_id::LIST, type_id::STRING, type_id::STRING}); } +/** + * @brief Test fixture for parametrized JSON reader tests + */ +struct JsonDelimiterParamTest : public cudf::test::BaseFixture, + public testing::WithParamInterface {}; + +// Parametrize qualifying JSON tests for executing both nested reader and legacy JSON lines reader +INSTANTIATE_TEST_SUITE_P(JsonDelimiterParamTest, + JsonDelimiterParamTest, + ::testing::Values('\n', '\b', '\v', '\f', 'h')); + +TEST_P(JsonDelimiterParamTest, JsonLinesDelimiter) +{ + using SymbolT = char; + + SymbolT const random_delimiter = GetParam(); + + // Test input + std::string input = R"({"col1":100, "col2":1.1, "col3":"aaa"})"; + std::size_t const string_size = 400; + /* + * We are constructing a JSON lines string where each row is {"col1":100, "col2":1.1, + * "col3":"aaa"} and rows are separated by random_delimiter. Instead of concatenating lines + * linearly in O(n), we can do it in O(log n) by doubling the input in each iteration. The total + * number of such iterations is log_repetitions. 
+ */ + std::size_t const log_repetitions = + static_cast(std::ceil(std::log2(string_size / input.size()))); + std::size_t const repetitions = 1UL << log_repetitions; + for (std::size_t i = 0; i < log_repetitions; i++) { + input = input + random_delimiter + input; + } + + cudf::io::json_reader_options json_parser_options = + cudf::io::json_reader_options::builder(cudf::io::source_info{input.c_str(), input.size()}) + .lines(true) + .delimiter(random_delimiter); + + cudf::io::table_with_metadata result = cudf::io::read_json(json_parser_options); + + EXPECT_EQ(result.tbl->num_columns(), 3); + EXPECT_EQ(result.tbl->num_rows(), repetitions); + + EXPECT_EQ(result.tbl->get_column(0).type().id(), cudf::type_id::INT64); + EXPECT_EQ(result.tbl->get_column(1).type().id(), cudf::type_id::FLOAT64); + EXPECT_EQ(result.tbl->get_column(2).type().id(), cudf::type_id::STRING); + + EXPECT_EQ(result.metadata.schema_info[0].name, "col1"); + EXPECT_EQ(result.metadata.schema_info[1].name, "col2"); + EXPECT_EQ(result.metadata.schema_info[2].name, "col3"); + + auto col1_iterator = thrust::constant_iterator(100); + auto col2_iterator = thrust::constant_iterator(1.1); + auto col3_iterator = thrust::constant_iterator("aaa"); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(result.tbl->get_column(0), + int64_wrapper(col1_iterator, col1_iterator + repetitions)); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(result.tbl->get_column(1), + float64_wrapper(col2_iterator, col2_iterator + repetitions)); + CUDF_TEST_EXPECT_COLUMNS_EQUAL( + result.tbl->get_column(2), + cudf::test::strings_column_wrapper(col3_iterator, col3_iterator + repetitions)); +} + +TEST_F(JsonReaderTest, ViableDelimiter) +{ + // Test input + std::string input = R"({"col1":100, "col2":1.1, "col3":"aaa"})"; + + cudf::io::json_reader_options json_parser_options = + cudf::io::json_reader_options::builder(cudf::io::source_info{input.c_str(), input.size()}) + .lines(true); + + json_parser_options.set_delimiter('\f'); + CUDF_EXPECT_NO_THROW(cudf::io::read_json(json_parser_options)); + + EXPECT_THROW(json_parser_options.set_delimiter('\t'), std::invalid_argument); +} + // Test case for dtype prune: // all paths, only one. 
// one present, another not present, nothing present diff --git a/cpp/tests/io/nested_json_test.cpp b/cpp/tests/io/nested_json_test.cpp index 112ee8fb57b..d6f800cce8b 100644 --- a/cpp/tests/io/nested_json_test.cpp +++ b/cpp/tests/io/nested_json_test.cpp @@ -148,6 +148,7 @@ TEST_F(JsonTest, StackContext) auto const stream = cudf::get_default_stream(); // Test input + char const delimiter = 'h'; std::string const input = R"( [{)" R"("category": "reference",)" R"("index:": [4,12,42],)" @@ -171,7 +172,8 @@ TEST_F(JsonTest, StackContext) // Run algorithm constexpr auto stack_behavior = cuio_json::stack_behavior_t::PushPopWithoutReset; - cuio_json::detail::get_stack_context(d_input, stack_context.device_ptr(), stack_behavior, stream); + cuio_json::detail::get_stack_context( + d_input, stack_context.device_ptr(), stack_behavior, delimiter, stream); // Copy back the results stack_context.device_to_host_async(stream); @@ -210,6 +212,7 @@ TEST_F(JsonTest, StackContextUtf8) auto const stream = cudf::get_default_stream(); // Test input + char const delimiter = 'h'; std::string const input = R"([{"a":{"year":1882,"author": "Bharathi"}, {"a":"filip ʒakotɛ"}}])"; // Prepare input & output buffers @@ -220,7 +223,8 @@ TEST_F(JsonTest, StackContextUtf8) // Run algorithm constexpr auto stack_behavior = cuio_json::stack_behavior_t::PushPopWithoutReset; - cuio_json::detail::get_stack_context(d_input, stack_context.device_ptr(), stack_behavior, stream); + cuio_json::detail::get_stack_context( + d_input, stack_context.device_ptr(), stack_behavior, delimiter, stream); // Copy back the results stack_context.device_to_host_async(stream); @@ -238,7 +242,18 @@ TEST_F(JsonTest, StackContextUtf8) CUDF_TEST_EXPECT_VECTOR_EQUAL(golden_stack_context, stack_context, stack_context.size()); } -TEST_F(JsonTest, StackContextRecovering) +/** + * @brief Test fixture for parametrized JSON reader tests + */ +struct JsonDelimiterParamTest : public cudf::test::BaseFixture, + public testing::WithParamInterface {}; + +// Parametrize qualifying JSON tests for executing both nested reader and legacy JSON lines reader +INSTANTIATE_TEST_SUITE_P(JsonDelimiterParamTest, + JsonDelimiterParamTest, + ::testing::Values('\n', '\b', '\v', '\f', 'h')); + +TEST_P(JsonDelimiterParamTest, StackContextRecovering) { // Type used to represent the atomic symbol type used within the finite-state machine using SymbolT = char; @@ -248,13 +263,15 @@ TEST_F(JsonTest, StackContextRecovering) auto const stream = cudf::get_default_stream(); // JSON lines input that recovers on invalid lines - std::string const input = R"({"a":-2}, + char const delimiter = GetParam(); + std::string input = R"({"a":-2}, {"a": {"a":{"a":[321 {"a":[1]} {"b":123} )"; + std::replace(input.begin(), input.end(), '\n', delimiter); // Expected stack context (including stack context of the newline characters) std::string const golden_stack_context = @@ -274,7 +291,8 @@ TEST_F(JsonTest, StackContextRecovering) // Run algorithm constexpr auto stack_behavior = cuio_json::stack_behavior_t::ResetOnDelimiter; - cuio_json::detail::get_stack_context(d_input, stack_context.device_ptr(), stack_behavior, stream); + cuio_json::detail::get_stack_context( + d_input, stack_context.device_ptr(), stack_behavior, delimiter, stream); // Copy back the results stack_context.device_to_host_async(stream); @@ -287,15 +305,16 @@ TEST_F(JsonTest, StackContextRecovering) CUDF_TEST_EXPECT_VECTOR_EQUAL(golden_stack_context, stack_context, stack_context.size()); } -TEST_F(JsonTest, StackContextRecoveringFuzz) 
+TEST_P(JsonDelimiterParamTest, StackContextRecoveringFuzz) { // Type used to represent the atomic symbol type used within the finite-state machine using SymbolT = char; using StackSymbolT = char; - std::random_device rd; + char const delimiter = GetParam(); std::mt19937 gen(42); std::uniform_int_distribution distribution(0, 4); + constexpr std::size_t input_length = 1024 * 1024; std::string input{}; input.reserve(input_length); @@ -313,36 +332,29 @@ TEST_F(JsonTest, StackContextRecoveringFuzz) case 1: current = '['; break; case 2: current = '}'; break; case 3: current = '"'; break; - case 4: current = '\n'; break; + case 4: current = delimiter; break; } - switch (current) { - case '"': inside_quotes = !inside_quotes; break; - case '{': - if (!inside_quotes) { host_stack.push('{'); } - break; - case '[': - if (!inside_quotes) { host_stack.push('['); } - break; - case '}': - if (!inside_quotes) { - if (host_stack.size() > 0) { - // Get the proper 'pop' stack symbol - current = (host_stack.top() == '{' ? '}' : ']'); - host_stack.pop(); - } else - is_ok = false; - } - break; - case '\n': - // Increase chance to have longer lines - if (distribution(gen) == 0) { - is_ok = false; - break; - } else { - host_stack = {}; - inside_quotes = false; - break; - } + if (current == '"') + inside_quotes = !inside_quotes; + else if (current == '{' && !inside_quotes) + host_stack.push('{'); + else if (current == '[' && !inside_quotes) + host_stack.push('['); + else if (current == '}' && !inside_quotes) { + if (host_stack.size() > 0) { + // Get the proper 'pop' stack symbol + current = (host_stack.top() == '{' ? '}' : ']'); + host_stack.pop(); + } else + is_ok = false; + } else if (current == delimiter) { + // Increase chance to have longer lines + if (distribution(gen) == 0) { + is_ok = false; + } else { + host_stack = {}; + inside_quotes = false; + } } } while (!is_ok); input += current; @@ -360,24 +372,19 @@ TEST_F(JsonTest, StackContextRecoveringFuzz) expected_stack_context += host_stack.top(); } - switch (current) { - case '"': inside_quotes = !inside_quotes; break; - case '{': - if (!inside_quotes) { host_stack.push('{'); } - break; - case '[': - if (!inside_quotes) { host_stack.push('['); } - break; - case '}': - if (!inside_quotes && host_stack.size() > 0) { host_stack.pop(); } - break; - case ']': - if (!inside_quotes && host_stack.size() > 0) { host_stack.pop(); } - break; - case '\n': - host_stack = {}; - inside_quotes = false; - break; + if (current == '"') + inside_quotes = !inside_quotes; + else if (current == '{' && !inside_quotes) + host_stack.push('{'); + else if (current == '[' && !inside_quotes) + host_stack.push('['); + else if (current == '}' && !inside_quotes && host_stack.size() > 0) + host_stack.pop(); + else if (current == ']' && !inside_quotes && host_stack.size() > 0) + host_stack.pop(); + else if (current == delimiter) { + host_stack = {}; + inside_quotes = false; } } @@ -392,7 +399,8 @@ TEST_F(JsonTest, StackContextRecoveringFuzz) // Run algorithm constexpr auto stack_behavior = cuio_json::stack_behavior_t::ResetOnDelimiter; - cuio_json::detail::get_stack_context(d_input, stack_context.device_ptr(), stack_behavior, stream); + cuio_json::detail::get_stack_context( + d_input, stack_context.device_ptr(), stack_behavior, delimiter, stream); // Copy back the results stack_context.device_to_host_async(stream); @@ -404,7 +412,9 @@ TEST_F(JsonTest, StackContextRecoveringFuzz) CUDF_TEST_EXPECT_VECTOR_EQUAL(expected_stack_context, stack_context, stack_context.size()); } -TEST_F(JsonTest, 
TokenStream) +struct JsonNewlineDelimiterTest : public cudf::test::BaseFixture {}; + +TEST_F(JsonNewlineDelimiterTest, TokenStream) { using cuio_json::PdaTokenT; using cuio_json::SymbolOffsetT; @@ -549,7 +559,7 @@ TEST_F(JsonTest, TokenStream) } } -TEST_F(JsonTest, TokenStream2) +TEST_F(JsonNewlineDelimiterTest, TokenStream2) { using cuio_json::PdaTokenT; using cuio_json::SymbolOffsetT; @@ -653,29 +663,32 @@ TEST_F(JsonParserTest, ExtractColumn) CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_col2, parsed_col2); } -TEST_F(JsonTest, RecoveringTokenStream) +TEST_P(JsonDelimiterParamTest, RecoveringTokenStream) { // Test input. Inline comments used to indicate character indexes // 012345678 <= line 0 - std::string const input = R"({"a":2 {})" - // 9 - "\n" - // 01234 <= line 1 - R"({"a":)" - // 5 - "\n" - // 67890123456789 <= line 2 - R"({"a":{"a":[321)" - // 0 - "\n" - // 123456789 <= line 3 - R"({"a":[1]})" - // 0 - "\n" - // 1 <= line 4 - "\n" - // 23456789 <= line 5 - R"({"b":123})"; + char const delimiter = GetParam(); + + std::string input = R"({"a":2 {})" + // 9 + "\n" + // 01234 <= line 1 + R"({"a":)" + // 5 + "\n" + // 67890123456789 <= line 2 + R"({"a":{"a":[321)" + // 0 + "\n" + // 123456789 <= line 3 + R"({"a":[1]})" + // 0 + "\n" + // 1 <= line 4 + "\n" + // 23456789 <= line 5 + R"({"b":123})"; + std::replace(input.begin(), input.end(), '\n', delimiter); // Golden token stream sample using token_t = cuio_json::token_t; @@ -717,6 +730,7 @@ TEST_F(JsonTest, RecoveringTokenStream) cudf::io::json_reader_options default_options{}; default_options.set_recovery_mode(cudf::io::json_recovery_mode_t::RECOVER_WITH_NULL); default_options.enable_lines(true); + default_options.set_delimiter(delimiter); // Prepare input & output buffers cudf::string_scalar const d_scalar(input, true, stream); @@ -730,6 +744,7 @@ TEST_F(JsonTest, RecoveringTokenStream) auto const tokens_gpu = cudf::detail::make_std_vector_async(d_tokens_gpu, stream); auto const token_indices_gpu = cudf::detail::make_std_vector_async(d_token_indices_gpu, stream); + stream.synchronize(); // Verify the number of tokens matches ASSERT_EQ(golden_token_stream.size(), tokens_gpu.size()); ASSERT_EQ(golden_token_stream.size(), token_indices_gpu.size()); @@ -864,25 +879,29 @@ TEST_F(JsonTest, PostProcessTokenStream) } } -TEST_F(JsonParserTest, UTF_JSON) +TEST_P(JsonDelimiterParamTest, UTF_JSON) { // Prepare cuda stream for data transfers & kernels - auto const stream = cudf::get_default_stream(); - auto mr = rmm::mr::get_current_device_resource(); - auto json_parser = cuio_json::detail::device_parse_nested_json; + auto const stream = cudf::get_default_stream(); + auto mr = rmm::mr::get_current_device_resource(); + auto json_parser = cuio_json::detail::device_parse_nested_json; + char const delimiter = GetParam(); // Default parsing options cudf::io::json_reader_options default_options{}; + default_options.set_delimiter(delimiter); // Only ASCII string - std::string const ascii_pass = R"([ + std::string ascii_pass = R"([ {"a":1,"b":2,"c":[3], "d": {}}, {"a":1,"b":4.0,"c":[], "d": {"year":1882,"author": "Bharathi"}}, {"a":1,"b":6.0,"c":[5, 7], "d": null}, {"a":1,"b":8.0,"c":null, "d": {}}, {"a":1,"b":null,"c":null}, {"a":1,"b":Infinity,"c":[null], "d": {"year":-600,"author": "Kaniyan"}}])"; - auto const d_ascii_pass = cudf::detail::make_device_uvector_sync( + std::replace(ascii_pass.begin(), ascii_pass.end(), '\n', delimiter); + + auto const d_ascii_pass = cudf::detail::make_device_uvector_sync( cudf::host_span{ascii_pass.c_str(), 
ascii_pass.size()}, stream, rmm::mr::get_current_device_resource()); @@ -890,21 +909,23 @@ TEST_F(JsonParserTest, UTF_JSON) CUDF_EXPECT_NO_THROW(json_parser(d_ascii_pass, default_options, stream, mr)); // utf-8 string that fails parsing. - std::string const utf_failed = R"([ + std::string utf_failed = R"([ {"a":1,"b":2,"c":[3], "d": {}}, {"a":1,"b":4.0,"c":[], "d": {"year":1882,"author": "Bharathi"}}, {"a":1,"b":6.0,"c":[5, 7], "d": null}, {"a":1,"b":8.0,"c":null, "d": {}}, {"a":1,"b":null,"c":null}, {"a":1,"b":Infinity,"c":[null], "d": {"year":-600,"author": "filip ʒakotɛ"}}])"; - auto const d_utf_failed = cudf::detail::make_device_uvector_sync( + std::replace(utf_failed.begin(), utf_failed.end(), '\n', delimiter); + + auto const d_utf_failed = cudf::detail::make_device_uvector_sync( cudf::host_span{utf_failed.c_str(), utf_failed.size()}, stream, rmm::mr::get_current_device_resource()); CUDF_EXPECT_NO_THROW(json_parser(d_utf_failed, default_options, stream, mr)); // utf-8 string that passes parsing. - std::string const utf_pass = R"([ + std::string utf_pass = R"([ {"a":1,"b":2,"c":[3], "d": {}}, {"a":1,"b":4.0,"c":[], "d": {"year":1882,"author": "Bharathi"}}, {"a":1,"b":6.0,"c":[5, 7], "d": null}, @@ -912,7 +933,9 @@ TEST_F(JsonParserTest, UTF_JSON) {"a":1,"b":null,"c":null}, {"a":1,"b":Infinity,"c":[null], "d": {"year":-600,"author": "Kaniyan"}}, {"a":1,"b":NaN,"c":[null, null], "d": {"year": 2, "author": "filip ʒakotɛ"}}])"; - auto const d_utf_pass = cudf::detail::make_device_uvector_sync( + std::replace(utf_pass.begin(), utf_pass.end(), '\n', delimiter); + + auto const d_utf_pass = cudf::detail::make_device_uvector_sync( cudf::host_span{utf_pass.c_str(), utf_pass.size()}, stream, rmm::mr::get_current_device_resource()); @@ -1017,4 +1040,159 @@ TEST_F(JsonParserTest, EmptyString) EXPECT_EQ(cudf_table.tbl->num_columns(), expected_col_count); } +TEST_P(JsonDelimiterParamTest, RecoveringTokenStreamNewlineAndDelimiter) +{ + // Test input. 
Inline comments used to indicate character indexes + // 012345678 <= line 0 + char const delimiter = GetParam(); + + /* Input: + * {"a":2} + * {"a":{"a":{"a":[321{"a":[1]} + * + * {"b":123} + * {"b":123} + */ + std::string input = R"({"a":2})" + "\n"; + // starting position 8 (zero indexed) + input += R"({"a":)" + std::string(1, delimiter); + // starting position 14 (zero indexed) + input += R"({"a":{"a":[321)" + std::string(1, delimiter); + // starting position 29 (zero indexed) + input += R"({"a":[1]})" + std::string("\n\n") + std::string(1, delimiter); + // starting position 41 (zero indexed) + input += R"({"b":123})" + "\n"; + // starting position 51 (zero indexed) + input += R"({"b":123})"; + + // Golden token stream sample + using token_t = cuio_json::token_t; + std::vector> golden_token_stream; + if (delimiter != '\n') { + golden_token_stream.resize(28); + golden_token_stream = {// Line 0 (valid) + {0, token_t::StructBegin}, + {1, token_t::StructMemberBegin}, + {1, token_t::FieldNameBegin}, + {3, token_t::FieldNameEnd}, + {5, token_t::ValueBegin}, + {6, token_t::ValueEnd}, + {6, token_t::StructMemberEnd}, + {6, token_t::StructEnd}, + // Line 1 (invalid) + {0, token_t::StructBegin}, + {0, token_t::StructEnd}, + // Line 2 (valid) + {29, token_t::StructBegin}, + {30, token_t::StructMemberBegin}, + {30, token_t::FieldNameBegin}, + {32, token_t::FieldNameEnd}, + {34, token_t::ListBegin}, + {35, token_t::ValueBegin}, + {36, token_t::ValueEnd}, + {36, token_t::ListEnd}, + {37, token_t::StructMemberEnd}, + {37, token_t::StructEnd}, + // Line 3 (valid) + {41, token_t::StructBegin}, + {42, token_t::StructMemberBegin}, + {42, token_t::FieldNameBegin}, + {44, token_t::FieldNameEnd}, + {46, token_t::ValueBegin}, + {49, token_t::ValueEnd}, + {49, token_t::StructMemberEnd}, + {49, token_t::StructEnd}}; + } else { + /* Input: + * {"a":2} + * {"a": + * {"a":{"a":[321 + * {"a":[1]} + * + * + * {"b":123} + * {"b":123} + */ + golden_token_stream.resize(38); + golden_token_stream = {// Line 0 (valid) + {0, token_t::StructBegin}, + {1, token_t::StructMemberBegin}, + {1, token_t::FieldNameBegin}, + {3, token_t::FieldNameEnd}, + {5, token_t::ValueBegin}, + {6, token_t::ValueEnd}, + {6, token_t::StructMemberEnd}, + {6, token_t::StructEnd}, + // Line 1 (invalid) + {0, token_t::StructBegin}, + {0, token_t::StructEnd}, + // Line 2 (invalid) + {0, token_t::StructBegin}, + {0, token_t::StructEnd}, + // Line 3 (valid) + {29, token_t::StructBegin}, + {30, token_t::StructMemberBegin}, + {30, token_t::FieldNameBegin}, + {32, token_t::FieldNameEnd}, + {34, token_t::ListBegin}, + {35, token_t::ValueBegin}, + {36, token_t::ValueEnd}, + {36, token_t::ListEnd}, + {37, token_t::StructMemberEnd}, + {37, token_t::StructEnd}, + // Line 4 (valid) + {41, token_t::StructBegin}, + {42, token_t::StructMemberBegin}, + {42, token_t::FieldNameBegin}, + {44, token_t::FieldNameEnd}, + {46, token_t::ValueBegin}, + {49, token_t::ValueEnd}, + {49, token_t::StructMemberEnd}, + {49, token_t::StructEnd}, + // Line 5 (valid) + {51, token_t::StructBegin}, + {52, token_t::StructMemberBegin}, + {52, token_t::FieldNameBegin}, + {54, token_t::FieldNameEnd}, + {56, token_t::ValueBegin}, + {59, token_t::ValueEnd}, + {59, token_t::StructMemberEnd}, + {59, token_t::StructEnd}}; + } + + auto const stream = cudf::get_default_stream(); + + // Default parsing options + cudf::io::json_reader_options default_options{}; + default_options.set_recovery_mode(cudf::io::json_recovery_mode_t::RECOVER_WITH_NULL); + default_options.enable_lines(true); + 
default_options.set_delimiter(delimiter); + + // Prepare input & output buffers + cudf::string_scalar const d_scalar(input, true, stream); + auto const d_input = cudf::device_span{ + d_scalar.data(), static_cast(d_scalar.size())}; + + // Parse the JSON and get the token stream + auto [d_tokens_gpu, d_token_indices_gpu] = cuio_json::detail::get_token_stream( + d_input, default_options, stream, rmm::mr::get_current_device_resource()); + // Copy back the number of tokens that were written + auto const tokens_gpu = cudf::detail::make_std_vector_async(d_tokens_gpu, stream); + auto const token_indices_gpu = cudf::detail::make_std_vector_async(d_token_indices_gpu, stream); + + stream.synchronize(); + // Verify the number of tokens matches + ASSERT_EQ(golden_token_stream.size(), tokens_gpu.size()); + ASSERT_EQ(golden_token_stream.size(), token_indices_gpu.size()); + + for (std::size_t i = 0; i < tokens_gpu.size(); i++) { + // Ensure the index the tokens are pointing to do match + EXPECT_EQ(golden_token_stream[i].first, token_indices_gpu[i]) << "Mismatch at #" << i; + // Ensure the token category is correct + EXPECT_EQ(golden_token_stream[i].second, tokens_gpu[i]) << "Mismatch at #" << i; + } +} + CUDF_TEST_PROGRAM_MAIN() diff --git a/dependencies.yaml b/dependencies.yaml index 4f8f3c16ea1..f20c1591e73 100644 --- a/dependencies.yaml +++ b/dependencies.yaml @@ -341,10 +341,10 @@ dependencies: - output_types: conda packages: # Allow runtime version to float up to minor version - - libarrow-acero>=16.0.0,<17.0.0a0 - - libarrow-dataset>=16.0.0,<17.0.0a0 - - libarrow>=16.0.0,<17.0.0a0 - - libparquet>=16.0.0,<17.0.0a0 + - libarrow-acero>=16.0.0,<16.1.0a0 + - libarrow-dataset>=16.0.0,<16.1.0a0 + - libarrow>=16.0.0,<16.1.0a0 + - libparquet>=16.0.0,<16.1.0a0 pyarrow_run: common: - output_types: [conda, requirements, pyproject] diff --git a/python/dask_cudf/dask_cudf/expr/_collection.py b/python/dask_cudf/dask_cudf/expr/_collection.py index d50dfb24256..926b7cfaf0e 100644 --- a/python/dask_cudf/dask_cudf/expr/_collection.py +++ b/python/dask_cudf/dask_cudf/expr/_collection.py @@ -15,6 +15,7 @@ from dask import config from dask.dataframe.core import is_dataframe_like +from dask.dataframe.dispatch import is_categorical_dtype import cudf @@ -81,6 +82,24 @@ def from_dict(cls, *args, **kwargs): with config.set({"dataframe.backend": "cudf"}): return DXDataFrame.from_dict(*args, **kwargs) + def sort_values( + self, + by, + **kwargs, + ): + # Raise if the first column is categorical, otherwise the + # upstream divisions logic may produce errors + # (See: https://github.com/rapidsai/cudf/issues/11795) + check_by = by[0] if isinstance(by, list) else by + if is_categorical_dtype(self.dtypes.get(check_by, None)): + raise NotImplementedError( + "Dask-cudf does not support sorting on categorical " + "columns when query-planning is enabled. Please use " + "the legacy API for now." 
+ f"\n{_LEGACY_WORKAROUND}", + ) + return super().sort_values(by, **kwargs) + def groupby( self, by, diff --git a/python/dask_cudf/dask_cudf/tests/test_sort.py b/python/dask_cudf/dask_cudf/tests/test_sort.py index 400600a1598..9d9fe297248 100644 --- a/python/dask_cudf/dask_cudf/tests/test_sort.py +++ b/python/dask_cudf/dask_cudf/tests/test_sort.py @@ -10,7 +10,7 @@ import cudf import dask_cudf -from dask_cudf.tests.utils import xfail_dask_expr +from dask_cudf.tests.utils import QUERY_PLANNING_ON, xfail_dask_expr @pytest.mark.parametrize("ascending", [True, False]) @@ -23,7 +23,7 @@ pytest.param( "d", marks=xfail_dask_expr( - "Dask-expr fails to sort by categorical column." + "Possible segfault when sorting by categorical column.", ), ), ["a", "b"], @@ -47,6 +47,20 @@ def test_sort_values(nelem, nparts, by, ascending): dd.assert_eq(got, expect, check_index=False) +@pytest.mark.parametrize("by", ["b", ["b", "a"]]) +def test_sort_values_categorical_raises(by): + df = cudf.DataFrame() + df["a"] = np.ascontiguousarray(np.arange(10)[::-1]) + df["b"] = df["a"].astype("category") + ddf = dd.from_pandas(df, npartitions=10) + + if QUERY_PLANNING_ON: + with pytest.raises( + NotImplementedError, match="sorting on categorical" + ): + ddf.sort_values(by=by) + + @pytest.mark.parametrize("ascending", [True, False]) @pytest.mark.parametrize("by", ["a", "b", ["a", "b"]]) def test_sort_values_single_partition(by, ascending):