From bc876cfd856096fa5c9fb48923412678cded56f6 Mon Sep 17 00:00:00 2001 From: Elias Stehle <3958403+elstehle@users.noreply.github.com> Date: Sun, 22 Oct 2023 08:05:44 -0700 Subject: [PATCH 1/3] fixes stack context for json lines recovering from errors --- cpp/src/io/fst/logical_stack.cuh | 136 +++++++++++++++++++++---- cpp/src/io/json/nested_json_gpu.cu | 31 ++++-- cpp/tests/io/fst/logical_stack_test.cu | 17 ++-- cpp/tests/io/nested_json_test.cpp | 117 +++++++++++++++++++++ 4 files changed, 264 insertions(+), 37 deletions(-) diff --git a/cpp/src/io/fst/logical_stack.cuh b/cpp/src/io/fst/logical_stack.cuh index c4f99736306..92aa55e646c 100644 --- a/cpp/src/io/fst/logical_stack.cuh +++ b/cpp/src/io/fst/logical_stack.cuh @@ -28,6 +28,7 @@ #include #include #include +#include #include #include @@ -48,6 +49,14 @@ enum class stack_op_type : int8_t { RESET = 3 ///< Operation popping all items currently on the stack }; +/** + * @brief Describes the kind of stack operations supported by the logical stack. + */ +enum class stack_op_support : bool { + NO_RESET_SUPPORT = false, ///< A stack that only supports push(x) and pop() operations + WITH_RESET_SUPPORT = true ///< A stack that supports push(x), pop(), and reset() operations +}; + namespace detail { /** @@ -130,6 +139,36 @@ struct StackSymbolToStackOp { StackSymbolToStackOpTypeT symbol_to_stack_op_type; }; +/** + * @brief Function object that maps a stack `reset` operation to `1`. + */ +template +struct NewlineToResetStackSegmentOp { + template + constexpr CUDF_HOST_DEVICE uint32_t operator()(StackSymbolT const& stack_symbol) const + { + stack_op_type stack_op = symbol_to_stack_op_type(stack_symbol); + + // Every reset operation marks the beginning of a new segment + return (stack_op == stack_op_type::RESET) ? 1 : 0; + } + + /// Function object returning a stack operation type for a given stack symbol + StackSymbolToStackOpTypeT symbol_to_stack_op_type; +}; + +/** + * @brief Function object that wraps around for values that exceed the largest value of `TargetT` + */ +template +struct ModToTargetTypeOpT { + template + constexpr CUDF_HOST_DEVICE uint32_t operator()(T const& val) const + { + return val % (static_cast(cuda::std::numeric_limits::max()) + static_cast(1)); + } +}; + /** * @brief Binary reduction operator to compute the absolute stack level from relative stack levels * (i.e., +1 for a PUSH, -1 for a POP operation). @@ -140,9 +179,7 @@ struct AddStackLevelFromStackOp { constexpr CUDF_HOST_DEVICE StackOp operator()( StackOp const& lhs, StackOp const& rhs) const { - StackLevelT new_level = (symbol_to_stack_op_type(rhs.value) == stack_op_type::RESET) - ? 0 - : (lhs.stack_level + rhs.stack_level); + StackLevelT new_level = lhs.stack_level + rhs.stack_level; return StackOp{new_level, rhs.value}; } @@ -230,6 +267,8 @@ struct RemapEmptyStack { * onto the stack or pop something from the stack and resolves the symbol that is on top of the * stack. * + * @tparam SupportResetOperation Whether the logical stack also supports `reset` operations that + * reset the stack to the empty stack * @tparam StackLevelT Signed integer type that must be sufficient to cover [-max_stack_level, * max_stack_level] for the given sequence of stack operations. Must be signed as it needs to cover * the stack level of any arbitrary subsequence of stack operations. @@ -261,7 +300,8 @@ struct RemapEmptyStack { * what-is-on-top-of-the-stack * @param[in] stream The cuda stream to which to dispatch the work */ -template ; + // Type used to mark *-by-key segments after `reset` operations + using StackSegmentT = uint8_t; + // The unsigned integer type that we use for radix sorting items of type StackOpT using StackOpUnsignedT = detail::UnsignedStackOpType; static_assert(!std::is_void(), "unsupported StackOpT size"); @@ -292,6 +335,8 @@ void sparse_stack_op_to_top_of_stack(StackSymbolItT d_symbols, using TransformInputItT = cub::TransformInputIterator; + constexpr bool supports_reset_op = SupportResetOperation == stack_op_support::WITH_RESET_SUPPORT; + auto const num_symbols_in = d_symbol_positions.size(); // Converting a stack symbol that may either push or pop to a stack operation: @@ -299,6 +344,10 @@ void sparse_stack_op_to_top_of_stack(StackSymbolItT d_symbols, StackSymbolToStackOpT stack_sym_to_kv_op{symbol_to_stack_op}; TransformInputItT stack_symbols_in(d_symbols, stack_sym_to_kv_op); + // Iterator that returns `1` for every symbol that corresponds to a `reset` operation + auto reset_segments_it = thrust::make_transform_iterator( + d_symbols, detail::NewlineToResetStackSegmentOp{symbol_to_stack_op}); + // Double-buffer for sorting along the given sequence of symbol positions (the sparse // representation) cub::DoubleBuffer d_symbol_positions_db{nullptr, nullptr}; @@ -330,14 +379,39 @@ void sparse_stack_op_to_top_of_stack(StackSymbolItT d_symbols, // Getting temporary storage requirements for the prefix sum of the stack level after each // operation - CUDF_CUDA_TRY(cub::DeviceScan::InclusiveScan( - nullptr, - stack_level_scan_bytes, - stack_symbols_in, - d_kv_operations.Current(), - detail::AddStackLevelFromStackOp{symbol_to_stack_op}, - num_symbols_in, - stream)); + if constexpr (supports_reset_op) { + auto const fake_key_segment_it = static_cast(nullptr); + std::size_t gen_segments_scan_bytes = 0; + std::size_t scan_by_key_bytes = 0; + CUDF_CUDA_TRY(cub::DeviceScan::InclusiveSum( + nullptr, + gen_segments_scan_bytes, + reset_segments_it, + thrust::make_transform_output_iterator(fake_key_segment_it, + detail::ModToTargetTypeOpT{}), + num_symbols_in, + stream)); + CUDF_CUDA_TRY(cub::DeviceScan::InclusiveScanByKey( + nullptr, + scan_by_key_bytes, + fake_key_segment_it, + stack_symbols_in, + d_kv_operations.Current(), + detail::AddStackLevelFromStackOp{symbol_to_stack_op}, + num_symbols_in, + cub::Equality{}, + stream)); + stack_level_scan_bytes = std::max(gen_segments_scan_bytes, scan_by_key_bytes); + } else { + CUDF_CUDA_TRY(cub::DeviceScan::InclusiveScan( + nullptr, + stack_level_scan_bytes, + stack_symbols_in, + d_kv_operations.Current(), + detail::AddStackLevelFromStackOp{symbol_to_stack_op}, + num_symbols_in, + stream)); + } // Getting temporary storage requirements for the stable radix sort (sorting by stack level of the // operations) @@ -401,14 +475,36 @@ void sparse_stack_op_to_top_of_stack(StackSymbolItT d_symbols, d_kv_operations = cub::DoubleBuffer{d_kv_ops_current.data(), d_kv_ops_alt.data()}; // Compute prefix sum of the stack level after each operation - CUDF_CUDA_TRY(cub::DeviceScan::InclusiveScan( - temp_storage.data(), - total_temp_storage_bytes, - stack_symbols_in, - d_kv_operations.Current(), - detail::AddStackLevelFromStackOp{symbol_to_stack_op}, - num_symbols_in, - stream)); + if constexpr (supports_reset_op) { + rmm::device_uvector key_segments{num_symbols_in, stream}; + CUDF_CUDA_TRY(cub::DeviceScan::InclusiveSum( + temp_storage.data(), + total_temp_storage_bytes, + reset_segments_it, + thrust::make_transform_output_iterator(key_segments.data(), + detail::ModToTargetTypeOpT{}), + num_symbols_in, + stream)); + CUDF_CUDA_TRY(cub::DeviceScan::InclusiveScanByKey( + temp_storage.data(), + total_temp_storage_bytes, + key_segments.data(), + stack_symbols_in, + d_kv_operations.Current(), + detail::AddStackLevelFromStackOp{symbol_to_stack_op}, + num_symbols_in, + cub::Equality{}, + stream)); + } else { + CUDF_CUDA_TRY(cub::DeviceScan::InclusiveScan( + temp_storage.data(), + total_temp_storage_bytes, + stack_symbols_in, + d_kv_operations.Current(), + detail::AddStackLevelFromStackOp{symbol_to_stack_op}, + num_symbols_in, + stream)); + } // Stable radix sort, sorting by stack level of the operations d_kv_operations_unsigned = cub::DoubleBuffer{ diff --git a/cpp/src/io/json/nested_json_gpu.cu b/cpp/src/io/json/nested_json_gpu.cu index 3702d94fd2b..496e5b25e60 100644 --- a/cpp/src/io/json/nested_json_gpu.cu +++ b/cpp/src/io/json/nested_json_gpu.cu @@ -343,27 +343,35 @@ constexpr auto NUM_SYMBOL_GROUPS = static_cast(dfa_symbol_group_id::NU std::array const symbol_groups{ {{"{"}, {"["}, {"}"}, {"]"}, {"\""}, {"\\"}, {"\n"}}}; -// Transition table +// Transition table for the default JSON and JSON lines formats std::array, TT_NUM_STATES> const transition_table{ {/* IN_STATE { [ } ] " \ \n OTHER */ /* TT_OOS */ {{TT_OOS, TT_OOS, TT_OOS, TT_OOS, TT_STR, TT_OOS, TT_OOS, TT_OOS}}, /* TT_STR */ {{TT_STR, TT_STR, TT_STR, TT_STR, TT_OOS, TT_ESC, TT_STR, TT_STR}}, /* TT_ESC */ {{TT_STR, TT_STR, TT_STR, TT_STR, TT_STR, TT_STR, TT_STR, TT_STR}}}}; -// Translation table (i.e., for each transition, what are the symbols that we output) +// Transition table for the JSON lines format that recovers from invalid JSON lines +std::array, TT_NUM_STATES> const + resetting_transition_table{ + {/* IN_STATE { [ } ] " \ \n OTHER */ + /* TT_OOS */ {{TT_OOS, TT_OOS, TT_OOS, TT_OOS, TT_STR, TT_OOS, TT_OOS, TT_OOS}}, + /* TT_STR */ {{TT_STR, TT_STR, TT_STR, TT_STR, TT_OOS, TT_ESC, TT_OOS, TT_STR}}, + /* TT_ESC */ {{TT_STR, TT_STR, TT_STR, TT_STR, TT_STR, TT_STR, TT_OOS, TT_STR}}}}; + +// Translation table for the default JSON and JSON lines formats std::array, NUM_SYMBOL_GROUPS>, TT_NUM_STATES> const translation_table{ {/* IN_STATE { [ } ] " \ \n OTHER */ /* TT_OOS */ {{{'{'}, {'['}, {'}'}, {']'}, {}, {}, {}, {}}}, /* TT_STR */ {{{}, {}, {}, {}, {}, {}, {}, {}}}, /* TT_ESC */ {{{}, {}, {}, {}, {}, {}, {}, {}}}}}; -// Translation table +// Translation table for the JSON lines format that recovers from invalid JSON lines std::array, NUM_SYMBOL_GROUPS>, TT_NUM_STATES> const resetting_translation_table{ {/* IN_STATE { [ } ] " \ \n OTHER */ /* TT_OOS */ {{{'{'}, {'['}, {'}'}, {']'}, {}, {}, {'\n'}, {}}}, - /* TT_STR */ {{{}, {}, {}, {}, {}, {}, {}, {}}}, - /* TT_ESC */ {{{}, {}, {}, {}, {}, {}, {}, {}}}}}; + /* TT_STR */ {{{}, {}, {}, {}, {}, {}, {'\n'}, {}}}, + /* TT_ESC */ {{{}, {}, {}, {}, {}, {}, {'\n'}, {}}}}}; // The DFA's starting state constexpr auto start_state = static_cast(TT_OOS); @@ -1415,14 +1423,19 @@ void get_stack_context(device_span json_in, constexpr auto max_translation_table_size = to_stack_op::NUM_SYMBOL_GROUPS * to_stack_op::TT_NUM_STATES; - // Translation table specialized on the choice of whether to reset on newlines outside of strings + // Transition table specialized on the choice of whether to reset on newlines + const auto transition_table = (stack_behavior == stack_behavior_t::ResetOnDelimiter) + ? to_stack_op::resetting_transition_table + : to_stack_op::transition_table; + + // Translation table specialized on the choice of whether to reset on newlines const auto translation_table = (stack_behavior == stack_behavior_t::ResetOnDelimiter) ? to_stack_op::resetting_translation_table : to_stack_op::translation_table; auto json_to_stack_ops_fst = fst::detail::make_fst( fst::detail::make_symbol_group_lut(to_stack_op::symbol_groups), - fst::detail::make_transition_table(to_stack_op::transition_table), + fst::detail::make_transition_table(transition_table), fst::detail::make_translation_table(translation_table), stream); @@ -1441,7 +1454,7 @@ void get_stack_context(device_span json_in, // Stack operations with indices are converted to top of the stack for each character in the input if (stack_behavior == stack_behavior_t::ResetOnDelimiter) { - fst::sparse_stack_op_to_top_of_stack( + fst::sparse_stack_op_to_top_of_stack( stack_ops.data(), device_span{stack_op_indices.data(), num_stack_ops}, JSONWithRecoveryToStackOp{}, @@ -1451,7 +1464,7 @@ void get_stack_context(device_span json_in, json_in.size(), stream); } else { - fst::sparse_stack_op_to_top_of_stack( + fst::sparse_stack_op_to_top_of_stack( stack_ops.data(), device_span{stack_op_indices.data(), num_stack_ops}, JSONToStackOp{}, diff --git a/cpp/tests/io/fst/logical_stack_test.cu b/cpp/tests/io/fst/logical_stack_test.cu index 3d6743702b8..20b8674a717 100644 --- a/cpp/tests/io/fst/logical_stack_test.cu +++ b/cpp/tests/io/fst/logical_stack_test.cu @@ -216,14 +216,15 @@ TEST_F(LogicalStackTest, GroundTruth) stream.value())); // Run algorithm - fst::sparse_stack_op_to_top_of_stack(d_stack_ops.data(), - d_stack_op_idx_span, - JSONToStackOp{}, - top_of_stack_gpu.device_ptr(), - empty_stack_symbol, - read_symbol, - string_size, - stream.value()); + fst::sparse_stack_op_to_top_of_stack( + d_stack_ops.data(), + d_stack_op_idx_span, + JSONToStackOp{}, + top_of_stack_gpu.device_ptr(), + empty_stack_symbol, + read_symbol, + string_size, + stream.value()); // Async copy results from device to host top_of_stack_gpu.device_to_host_async(stream_view); diff --git a/cpp/tests/io/nested_json_test.cpp b/cpp/tests/io/nested_json_test.cpp index 5f79d5b862b..b0ffbe3d154 100644 --- a/cpp/tests/io/nested_json_test.cpp +++ b/cpp/tests/io/nested_json_test.cpp @@ -285,6 +285,123 @@ TEST_F(JsonTest, StackContextRecovering) CUDF_TEST_EXPECT_VECTOR_EQUAL(golden_stack_context, stack_context, stack_context.size()); } +TEST_F(JsonTest, StackContextRecoveringFuzz) +{ + // Type used to represent the atomic symbol type used within the finite-state machine + using SymbolT = char; + using StackSymbolT = char; + + std::random_device rd; + std::mt19937 gen(42); + std::uniform_int_distribution distribution(0, 4); + constexpr std::size_t input_length = 1024 * 1024; + std::string input{}; + input.reserve(input_length); + + bool inside_quotes = false; + std::stack host_stack{}; + for (std::size_t i = 0; i < input_length; ++i) { + bool is_ok = true; + char current{}; + do { + int rand_char = distribution(gen); + is_ok = true; + switch (rand_char) { + case 0: current = '{'; break; + case 1: current = '['; break; + case 2: current = '}'; break; + case 3: current = '"'; break; + case 4: current = '\n'; break; + } + switch (current) { + case '"': inside_quotes = !inside_quotes; break; + case '{': + if (!inside_quotes) { host_stack.push('{'); } + break; + case '[': + if (!inside_quotes) { host_stack.push('['); } + break; + case '}': + if (!inside_quotes) { + if (host_stack.size() > 0) { + // Get the proper 'pop' stack symbol + current = (host_stack.top() == '{' ? '}' : ']'); + host_stack.pop(); + } else + is_ok = false; + } + break; + case '\n': + // Increase chance to have longer lines + if (distribution(gen) == 0) { + is_ok = false; + break; + } else { + host_stack = {}; + inside_quotes = false; + break; + } + } + } while (!is_ok); + input += current; + } + + std::string expected_stack_context{}; + expected_stack_context.reserve(input_length); + inside_quotes = false; + host_stack = std::stack{}; + for (auto const current : input) { + // Write the stack context for the current input symbol + if (host_stack.empty()) { + expected_stack_context += '_'; + } else { + expected_stack_context += host_stack.top(); + } + + switch (current) { + case '"': inside_quotes = !inside_quotes; break; + case '{': + if (!inside_quotes) { host_stack.push('{'); } + break; + case '[': + if (!inside_quotes) { host_stack.push('['); } + break; + case '}': + if (!inside_quotes && host_stack.size() > 0) { host_stack.pop(); } + break; + case ']': + if (!inside_quotes && host_stack.size() > 0) { host_stack.pop(); } + break; + case '\n': + host_stack = {}; + inside_quotes = false; + break; + } + } + + // Prepare cuda stream for data transfers & kernels + auto const stream = cudf::get_default_stream(); + + // Prepare input & output buffers + cudf::string_scalar const d_scalar(input, true, stream); + auto const d_input = + cudf::device_span{d_scalar.data(), static_cast(d_scalar.size())}; + cudf::detail::hostdevice_vector stack_context(input.size(), stream); + + // Run algorithm + constexpr auto stack_behavior = cuio_json::stack_behavior_t::ResetOnDelimiter; + cuio_json::detail::get_stack_context(d_input, stack_context.device_ptr(), stack_behavior, stream); + + // Copy back the results + stack_context.device_to_host_async(stream); + + // Make sure we copied back the stack context + stream.synchronize(); + + ASSERT_EQ(expected_stack_context.size(), stack_context.size()); + CUDF_TEST_EXPECT_VECTOR_EQUAL(expected_stack_context, stack_context, stack_context.size()); +} + TEST_F(JsonTest, TokenStream) { using cuio_json::PdaTokenT; From 40c6b550a45e3d6b2e6426cd2dc9d7898755bfa5 Mon Sep 17 00:00:00 2001 From: Elias Stehle <3958403+elstehle@users.noreply.github.com> Date: Sun, 22 Oct 2023 10:00:23 -0700 Subject: [PATCH 2/3] addresses unused var for if constexpr --- cpp/src/io/fst/logical_stack.cuh | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/cpp/src/io/fst/logical_stack.cuh b/cpp/src/io/fst/logical_stack.cuh index 92aa55e646c..26d2262b4cd 100644 --- a/cpp/src/io/fst/logical_stack.cuh +++ b/cpp/src/io/fst/logical_stack.cuh @@ -344,10 +344,6 @@ void sparse_stack_op_to_top_of_stack(StackSymbolItT d_symbols, StackSymbolToStackOpT stack_sym_to_kv_op{symbol_to_stack_op}; TransformInputItT stack_symbols_in(d_symbols, stack_sym_to_kv_op); - // Iterator that returns `1` for every symbol that corresponds to a `reset` operation - auto reset_segments_it = thrust::make_transform_iterator( - d_symbols, detail::NewlineToResetStackSegmentOp{symbol_to_stack_op}); - // Double-buffer for sorting along the given sequence of symbol positions (the sparse // representation) cub::DoubleBuffer d_symbol_positions_db{nullptr, nullptr}; @@ -380,6 +376,11 @@ void sparse_stack_op_to_top_of_stack(StackSymbolItT d_symbols, // Getting temporary storage requirements for the prefix sum of the stack level after each // operation if constexpr (supports_reset_op) { + // Iterator that returns `1` for every symbol that corresponds to a `reset` operation + auto reset_segments_it = thrust::make_transform_iterator( + d_symbols, + detail::NewlineToResetStackSegmentOp{symbol_to_stack_op}); + auto const fake_key_segment_it = static_cast(nullptr); std::size_t gen_segments_scan_bytes = 0; std::size_t scan_by_key_bytes = 0; @@ -476,6 +477,11 @@ void sparse_stack_op_to_top_of_stack(StackSymbolItT d_symbols, // Compute prefix sum of the stack level after each operation if constexpr (supports_reset_op) { + // Iterator that returns `1` for every symbol that corresponds to a `reset` operation + auto reset_segments_it = thrust::make_transform_iterator( + d_symbols, + detail::NewlineToResetStackSegmentOp{symbol_to_stack_op}); + rmm::device_uvector key_segments{num_symbols_in, stream}; CUDF_CUDA_TRY(cub::DeviceScan::InclusiveSum( temp_storage.data(), From 7109874e6b9312c0255206a2221fa63e5df1076c Mon Sep 17 00:00:00 2001 From: Elias Stehle <3958403+elstehle@users.noreply.github.com> Date: Mon, 30 Oct 2023 04:12:21 -0700 Subject: [PATCH 3/3] fixes return type --- cpp/src/io/fst/logical_stack.cuh | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/cpp/src/io/fst/logical_stack.cuh b/cpp/src/io/fst/logical_stack.cuh index 26d2262b4cd..22385d33c7b 100644 --- a/cpp/src/io/fst/logical_stack.cuh +++ b/cpp/src/io/fst/logical_stack.cuh @@ -163,9 +163,10 @@ struct NewlineToResetStackSegmentOp { template struct ModToTargetTypeOpT { template - constexpr CUDF_HOST_DEVICE uint32_t operator()(T const& val) const + constexpr CUDF_HOST_DEVICE TargetT operator()(T const& val) const { - return val % (static_cast(cuda::std::numeric_limits::max()) + static_cast(1)); + return static_cast( + val % (static_cast(cuda::std::numeric_limits::max()) + static_cast(1))); } };