diff --git a/cpp/include/cudf/io/json.hpp b/cpp/include/cudf/io/json.hpp index b662b660557..7c1d2c8fd9a 100644 --- a/cpp/include/cudf/io/json.hpp +++ b/cpp/include/cudf/io/json.hpp @@ -125,6 +125,8 @@ class json_reader_options { // Normalize unquoted spaces and tabs bool _normalize_whitespace = false; + bool _nullify_empty_lines = false; + // Whether to recover after an invalid JSON line json_recovery_mode_t _recovery_mode = json_recovery_mode_t::FAIL; @@ -313,6 +315,13 @@ class json_reader_options { */ [[nodiscard]] bool is_enabled_normalize_whitespace() const { return _normalize_whitespace; } + /** + * @brief Whether the reader should nullify empty lines for json lines format with recovery mode + * + * @returns true if the reader should nullify empty lines, false otherwise + */ + [[nodiscard]] bool is_nullify_empty_lines() const { return _nullify_empty_lines; } + /** * @brief Queries the JSON reader's behavior on invalid JSON lines. * @@ -412,13 +421,15 @@ class json_reader_options { void set_byte_range_size(size_t size) { _byte_range_size = size; } /** - * @brief Set delimiter separating records in JSON lines + * @brief Function to decide if passed argument is a valid delimiter * - * @param delimiter Delimiter separating records in JSON lines + * @param c Character to test as valid delimiter + * @return Boolean value indicating if passed character can be used as delimiter */ - void set_delimiter(char delimiter) + static constexpr bool can_be_delimiter(char c) { - switch (delimiter) { + // The character list below is from `json_reader_options.set_delimiter`. + switch (c) { case '{': case '[': case '}': @@ -430,8 +441,20 @@ class json_reader_options { case '\\': case ' ': case '\t': - case '\r': CUDF_FAIL("Unsupported delimiter character.", std::invalid_argument); break; + case '\r': return false; + default: return true; } + } + + /** + * @brief Set delimiter separating records in JSON lines + * + * @param delimiter Delimiter separating records in JSON lines + */ + void set_delimiter(char delimiter) + { + CUDF_EXPECTS( + can_be_delimiter(delimiter), "Unsupported delimiter character", std::invalid_argument); _delimiter = delimiter; } @@ -502,6 +525,14 @@ class json_reader_options { */ void enable_normalize_whitespace(bool val) { _normalize_whitespace = val; } + /** + * @brief Set whether the reader should nullify empty lines for json lines format with recovery + * mode + * + * @param val Boolean value to indicate whether the reader should nullify empty lines + */ + void nullify_empty_lines(bool val) { _nullify_empty_lines = val; } + /** * @brief Specifies the JSON reader's behavior on invalid JSON lines. * @@ -779,6 +810,19 @@ class json_reader_options_builder { return *this; } + /** + * @brief Set whether the reader should nullify empty lines for json lines format with recovery + * mode + * + * @param val Boolean value to indicate whether the reader should nullify empty lines + * @return this for chaining + */ + json_reader_options_builder& nullify_empty_lines(bool val) + { + options._nullify_empty_lines = val; + return *this; + } + /** * @brief Specifies the JSON reader's behavior on invalid JSON lines. * diff --git a/cpp/src/io/json/nested_json.hpp b/cpp/src/io/json/nested_json.hpp index f6be4539d7f..48bf558a182 100644 --- a/cpp/src/io/json/nested_json.hpp +++ b/cpp/src/io/json/nested_json.hpp @@ -265,6 +265,7 @@ void get_stack_context(device_span json_in, * * @param tokens The tokens to be post-processed * @param token_indices The tokens' corresponding indices that are post-processed + * @param nullify_empty_lines Whether to nullify empty lines * @param stream The cuda stream to dispatch GPU kernels to * @return Returns the post-processed token stream */ @@ -272,6 +273,7 @@ CUDF_EXPORT std::pair, rmm::device_uvector> process_token_stream( device_span tokens, device_span token_indices, + bool nullify_empty_lines, rmm::cuda_stream_view stream); /** diff --git a/cpp/src/io/json/nested_json_gpu.cu b/cpp/src/io/json/nested_json_gpu.cu index 534b30a6089..84823b18665 100644 --- a/cpp/src/io/json/nested_json_gpu.cu +++ b/cpp/src/io/json/nested_json_gpu.cu @@ -195,11 +195,12 @@ using SymbolGroupT = uint8_t; /** * @brief Definition of the DFA's states */ -enum class dfa_states : StateT { VALID, INVALID, NUM_STATES }; +enum class dfa_states : StateT { START, VALID, INVALID, NUM_STATES }; // Aliases for readability of the transition table -constexpr auto TT_INV = dfa_states::INVALID; -constexpr auto TT_VLD = dfa_states::VALID; +constexpr auto TT_START = dfa_states::START; +constexpr auto TT_INV = dfa_states::INVALID; +constexpr auto TT_VLD = dfa_states::VALID; /** * @brief Definition of the symbol groups @@ -238,14 +239,17 @@ struct UnwrapTokenFromSymbolOp { * invalid lines. */ struct TransduceToken { + bool nullify_empty_lines; template constexpr CUDF_HOST_DEVICE SymbolT operator()(StateT const state_id, SymbolGroupT const match_id, RelativeOffsetT const relative_offset, SymbolT const read_symbol) const { + bool const is_empty_invalid = + (nullify_empty_lines && state_id == static_cast(TT_START)); bool const is_end_of_invalid_line = - (state_id == static_cast(TT_INV) && + ((state_id == static_cast(TT_INV) or is_empty_invalid) && match_id == static_cast(dfa_symbol_group_id::DELIMITER)); if (is_end_of_invalid_line) { @@ -265,14 +269,17 @@ struct TransduceToken { constexpr int32_t num_inv_tokens = 2; bool const is_delimiter = match_id == static_cast(dfa_symbol_group_id::DELIMITER); + bool const is_empty_invalid = + (nullify_empty_lines && state_id == static_cast(TT_START)); // If state is either invalid or we're entering an invalid state, we discard tokens bool const is_part_of_invalid_line = (match_id != static_cast(dfa_symbol_group_id::ERROR) && - state_id == static_cast(TT_VLD)); + (state_id == static_cast(TT_VLD) or state_id == static_cast(TT_START))); // Indicates whether we transition from an invalid line to a potentially valid line - bool const is_end_of_invalid_line = (state_id == static_cast(TT_INV) && is_delimiter); + bool const is_end_of_invalid_line = + ((state_id == static_cast(TT_INV) or is_empty_invalid) && is_delimiter); int32_t const emit_count = is_end_of_invalid_line ? num_inv_tokens : (is_part_of_invalid_line && !is_delimiter ? 1 : 0); @@ -283,11 +290,12 @@ struct TransduceToken { // Transition table std::array, TT_NUM_STATES> const transition_table{ {/* IN_STATE ERROR DELIM OTHER */ - /* VALID */ {{TT_INV, TT_VLD, TT_VLD}}, - /* INVALID */ {{TT_INV, TT_VLD, TT_INV}}}}; + /* START */ {{TT_INV, TT_START, TT_VLD}}, + /* VALID */ {{TT_INV, TT_START, TT_VLD}}, + /* INVALID */ {{TT_INV, TT_START, TT_INV}}}}; // The DFA's starting state -constexpr auto start_state = static_cast(TT_VLD); +constexpr auto start_state = static_cast(TT_START); } // namespace token_filter // JSON to stack operator DFA (Deterministic Finite Automata) @@ -1506,17 +1514,19 @@ void get_stack_context(device_span json_in, std::pair, rmm::device_uvector> process_token_stream( device_span tokens, device_span token_indices, + bool nullify_empty_lines, rmm::cuda_stream_view stream) { // Instantiate FST for post-processing the token stream to remove all tokens that belong to an // invalid JSON line token_filter::UnwrapTokenFromSymbolOp sgid_op{}; - using symbol_t = thrust::tuple; - auto filter_fst = fst::detail::make_fst( - fst::detail::make_symbol_group_lut(token_filter::symbol_groups, sgid_op), - fst::detail::make_transition_table(token_filter::transition_table), - fst::detail::make_translation_functor(token_filter::TransduceToken{}), - stream); + using symbol_t = thrust::tuple; + auto filter_fst = + fst::detail::make_fst(fst::detail::make_symbol_group_lut(token_filter::symbol_groups, sgid_op), + fst::detail::make_transition_table(token_filter::transition_table), + fst::detail::make_translation_functor( + token_filter::TransduceToken{nullify_empty_lines}), + stream); auto const mr = cudf::get_current_device_resource_ref(); cudf::detail::device_scalar d_num_selected_tokens(stream, mr); @@ -1663,7 +1673,7 @@ std::pair, rmm::device_uvector> ge tokens.set_element(0, token_t::LineEnd, stream); validate_token_stream(json_in, tokens, tokens_indices, options, stream); auto [filtered_tokens, filtered_tokens_indices] = - process_token_stream(tokens, tokens_indices, stream); + process_token_stream(tokens, tokens_indices, options.is_nullify_empty_lines(), stream); tokens = std::move(filtered_tokens); tokens_indices = std::move(filtered_tokens_indices); } diff --git a/cpp/src/io/json/read_json.cu b/cpp/src/io/json/read_json.cu index c424d2b3b62..ffd95cab084 100644 --- a/cpp/src/io/json/read_json.cu +++ b/cpp/src/io/json/read_json.cu @@ -18,12 +18,14 @@ #include "io/json/nested_json.hpp" #include "read_json.hpp" +#include #include #include #include #include #include #include +#include #include #include #include @@ -31,8 +33,11 @@ #include #include +#include +#include #include #include +#include #include #include @@ -442,4 +447,118 @@ table_with_metadata read_json(host_span> sources, {partial_tables[0].metadata.schema_info}}; } +std::tuple preprocess(cudf::strings_column_view const& input, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + auto constexpr num_levels = 256; + auto constexpr lower_level = std::numeric_limits::min(); + auto constexpr upper_level = std::numeric_limits::max(); + auto const num_chars = input.chars_size(stream); + + char delimiter; + { + auto histogram = + cudf::detail::make_zeroed_device_uvector_async(num_levels, stream, mr); + size_t temp_storage_bytes = 0; + cub::DeviceHistogram::HistogramEven(nullptr, + temp_storage_bytes, + input.chars_begin(stream), + histogram.begin(), + num_levels, + lower_level, + upper_level, + num_chars, + stream.value()); + rmm::device_buffer d_temp(temp_storage_bytes, stream); + cub::DeviceHistogram::HistogramEven(d_temp.data(), + temp_storage_bytes, + input.chars_begin(stream), + histogram.begin(), + num_levels, + lower_level, + upper_level, + num_chars, + stream.value()); + + auto const zero_level_idx = -lower_level; // the bin storing count for character `\0` + auto const first_zero_count_pos = thrust::find_if( + rmm::exec_policy_nosync(stream), + thrust::make_counting_iterator(0) + zero_level_idx, // ignore the negative characters + thrust::make_counting_iterator(0) + num_levels, + [zero_level_idx, counts = histogram.begin()] __device__(auto idx) -> bool { + auto const count = counts[idx]; + if (count > 0) { return false; } + auto const first_non_existing_char = static_cast(idx - zero_level_idx); + return json_reader_options::can_be_delimiter(first_non_existing_char); + }); + + // This should never happen since the input should never cover the entire char range. + if (first_zero_count_pos == thrust::make_counting_iterator(0) + num_levels) { + throw std::logic_error( + "Cannot find any character suitable as delimiter during joining json strings."); + } + delimiter = static_cast( + thrust::distance(thrust::make_counting_iterator(0) + zero_level_idx, first_zero_count_pos)); + } + + auto d_offsets_colview = input.offsets(); + device_span d_offsets(d_offsets_colview.data(), + d_offsets_colview.size()); + + rmm::device_buffer concatenated_buffer(num_chars + d_offsets.size() - 2, stream); + + thrust::scatter( + rmm::exec_policy_nosync(stream), + thrust::make_constant_iterator(delimiter), + thrust::make_constant_iterator(delimiter) + d_offsets.size() - 2, + thrust::make_transform_iterator( + thrust::make_counting_iterator(1), + cuda::proclaim_return_type( + [d_offsets = d_offsets.begin()] __device__(cudf::size_type idx) -> cudf::size_type { + return d_offsets[idx] + idx - 1; + })), + reinterpret_cast(concatenated_buffer.data())); + + { + // cub device batched copy + auto input_it = thrust::make_transform_iterator( + thrust::make_counting_iterator(0), + cuda::proclaim_return_type( + [input = input.chars_begin(stream), d_offsets = d_offsets.begin()] __device__( + cudf::size_type idx) -> char const* { return input + d_offsets[idx]; })); + auto output_it = thrust::make_transform_iterator( + thrust::make_counting_iterator(0), + cuda::proclaim_return_type( + [output = reinterpret_cast(concatenated_buffer.data()), + d_offsets = d_offsets.begin()] __device__(cudf::size_type idx) -> char* { + return output + d_offsets[idx] + idx; + })); + auto sizes_it = thrust::make_transform_iterator( + thrust::make_counting_iterator(0), + cuda::proclaim_return_type( + [d_offsets = d_offsets.begin()] __device__(cudf::size_type idx) -> cudf::size_type { + return d_offsets[idx + 1] - d_offsets[idx]; + })); + size_t temp_storage_bytes = 0; + cub::DeviceCopy::Batched(nullptr, + temp_storage_bytes, + input_it, + output_it, + sizes_it, + static_cast(d_offsets.size() - 1), + stream.value()); + rmm::device_buffer temp_storage(temp_storage_bytes, stream); + cub::DeviceCopy::Batched(temp_storage.data(), + temp_storage_bytes, + input_it, + output_it, + sizes_it, + static_cast(d_offsets.size() - 1), + stream.value()); + } + + return std::tuple{std::move(concatenated_buffer), delimiter}; +} + } // namespace cudf::io::json::detail diff --git a/cpp/src/io/json/read_json.hpp b/cpp/src/io/json/read_json.hpp index 982190eecb5..72ca3848bcc 100644 --- a/cpp/src/io/json/read_json.hpp +++ b/cpp/src/io/json/read_json.hpp @@ -18,6 +18,7 @@ #include #include +#include #include #include #include @@ -73,5 +74,9 @@ table_with_metadata read_json(host_span> sources, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr); +std::tuple preprocess(cudf::strings_column_view const& input, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr); + } // namespace io::json::detail } // namespace CUDF_EXPORT cudf diff --git a/cpp/tests/io/json/json_test.cpp b/cpp/tests/io/json/json_test.cpp index cb6716f4a18..d8cac4c0579 100644 --- a/cpp/tests/io/json/json_test.cpp +++ b/cpp/tests/io/json/json_test.cpp @@ -14,6 +14,8 @@ * limitations under the License. */ +#include "io/json/read_json.hpp" + #include #include #include @@ -2975,4 +2977,53 @@ TEST_F(JsonReaderTest, JsonDtypeSchema) cudf::test::debug_output_level::ALL_ERRORS); } +TEST_F(JsonReaderTest, PreprocessAndNullifyEmptyRows) +{ + // Test input + std::string const row_string = R"({"A":"TEST"})"; + auto string_col = cudf::test::strings_column_wrapper( + {row_string, row_string, "", row_string, row_string, row_string, ""}, + {true, true, true, true, true, true, true}) + .release(); + rmm::cuda_stream stream{}; + rmm::cuda_stream_view stream_view(stream); + auto [processed_buffer, delim] = cudf::io::json::detail::preprocess( + cudf::strings_column_view(*string_col), stream_view, cudf::get_current_device_resource_ref()); + + /* + std::printf("delimiter = %d\n", delim); + std::string h_processed_buffer; + h_processed_buffer.resize(processed_buffer.size()); + CUDF_CUDA_TRY(cudaMemcpyAsync(h_processed_buffer.data(), + reinterpret_cast(processed_buffer.data()), + processed_buffer.size() * sizeof(char), + cudaMemcpyDeviceToHost, + stream.value())); + std::cout << "processed_buffer = \n" << h_processed_buffer; + std::cout << std::endl; + */ + + cudf::io::json_reader_options input_options = + cudf::io::json_reader_options::builder( + cudf::io::source_info{reinterpret_cast(processed_buffer.data()), + processed_buffer.size()}) + .lines(true) + .delimiter(delim) + .nullify_empty_lines(true) + .recovery_mode(cudf::io::json_recovery_mode_t::RECOVER_WITH_NULL); + + cudf::io::table_with_metadata result = cudf::io::read_json(input_options); + + EXPECT_EQ(result.tbl->num_columns(), 1); + EXPECT_EQ(result.tbl->num_rows(), 7); + EXPECT_EQ(result.tbl->get_column(0).type().id(), cudf::type_id::STRING); + EXPECT_EQ(result.metadata.schema_info[0].name, "A"); + + CUDF_TEST_EXPECT_COLUMNS_EQUAL( + result.tbl->get_column(0), + cudf::test::strings_column_wrapper({"TEST", "TEST", "", "TEST", "TEST", "TEST", ""}, + {true, true, false, true, true, true, false}), + cudf::test::debug_output_level::ALL_ERRORS); +} + CUDF_TEST_PROGRAM_MAIN() diff --git a/cpp/tests/io/json/nested_json_test.cpp b/cpp/tests/io/json/nested_json_test.cpp index f32aba0e632..8481cbb8483 100644 --- a/cpp/tests/io/json/nested_json_test.cpp +++ b/cpp/tests/io/json/nested_json_test.cpp @@ -864,7 +864,7 @@ TEST_F(JsonTest, PostProcessTokenStream) // Run system-under-test auto [d_filtered_tokens, d_filtered_indices] = - cuio_json::detail::process_token_stream(d_tokens, d_offsets, stream); + cuio_json::detail::process_token_stream(d_tokens, d_offsets, false, stream); auto const filtered_tokens = cudf::detail::make_std_vector_async(d_filtered_tokens, stream); auto const filtered_indices = cudf::detail::make_std_vector_async(d_filtered_indices, stream);