From ee4a6c4fb8e8507b91f107b596c3eac80d069db6 Mon Sep 17 00:00:00 2001 From: Nghia Truong <7416935+ttnghia@users.noreply.github.com> Date: Tue, 5 Nov 2024 09:39:39 -0800 Subject: [PATCH] Improve `concat_json` (#2557) * Add `nullify_invalid_rows` parameter to `concat_json` Signed-off-by: Nghia Truong * Cleanup and add more docs Signed-off-by: Nghia Truong * Rename variable Signed-off-by: Nghia Truong --------- Signed-off-by: Nghia Truong --- src/main/cpp/src/JSONUtilsJni.cpp | 9 +++-- src/main/cpp/src/json_utils.cu | 62 +++++++++++++------------------ src/main/cpp/src/json_utils.hpp | 31 +++++++++++++--- 3 files changed, 58 insertions(+), 44 deletions(-) diff --git a/src/main/cpp/src/JSONUtilsJni.cpp b/src/main/cpp/src/JSONUtilsJni.cpp index 67758e8595..c8d15aba4e 100644 --- a/src/main/cpp/src/JSONUtilsJni.cpp +++ b/src/main/cpp/src/JSONUtilsJni.cpp @@ -163,8 +163,11 @@ JNIEXPORT jlongArray JNICALL Java_com_nvidia_spark_rapids_jni_JSONUtils_concaten try { cudf::jni::auto_set_device(env); auto const input_cv = reinterpret_cast(j_input); - auto [is_valid, joined_strings, delimiter] = - spark_rapids_jni::concat_json(cudf::strings_column_view{*input_cv}); + + // Currently, set `nullify_invalid_rows = false` as `concatenateJsonStrings` is used only for + // `from_json` with struct schema. 
+ auto [joined_strings, delimiter, should_be_nullify] = spark_rapids_jni::concat_json( + cudf::strings_column_view{*input_cv}, /*nullify_invalid_rows*/ false); // The output array contains 5 elements: // [0]: address of the cudf::column object `is_valid` in host memory @@ -173,7 +176,7 @@ JNIEXPORT jlongArray JNICALL Java_com_nvidia_spark_rapids_jni_JSONUtils_concaten // [3]: address of the rmm::device_buffer object (of the concatenated strings) in host memory // [4]: delimiter char auto out_handles = cudf::jni::native_jlongArray(env, 5); - out_handles[0] = reinterpret_cast(is_valid.release()); + out_handles[0] = reinterpret_cast(should_be_nullify.release()); out_handles[1] = reinterpret_cast(joined_strings->data()); out_handles[2] = static_cast(joined_strings->size()); out_handles[3] = reinterpret_cast(joined_strings.release()); diff --git a/src/main/cpp/src/json_utils.cu b/src/main/cpp/src/json_utils.cu index 85b2dc9301..db0606c38c 100644 --- a/src/main/cpp/src/json_utils.cu +++ b/src/main/cpp/src/json_utils.cu @@ -67,29 +67,40 @@ constexpr bool can_be_delimiter(char c) } // namespace -std::tuple, std::unique_ptr, char> concat_json( +std::tuple, char, std::unique_ptr> concat_json( cudf::strings_column_view const& input, + bool nullify_invalid_rows, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { + if (input.is_empty()) { + return {std::make_unique(0, stream, mr), + '\n', + std::make_unique( + rmm::device_uvector{0, stream, mr}, rmm::device_buffer{}, 0)}; + } + auto const d_input_ptr = cudf::column_device_view::create(input.parent(), stream); auto const default_mr = rmm::mr::get_current_device_resource(); - // Check if the input rows are either null, equal to `null` string literal, or empty. - // This will be used for masking out the input when doing string concatenation. + // Check if the input rows are null, empty (containing only whitespaces), or invalid JSON. 
+ // This will be used for masking out the null/empty/invalid input rows when doing string + // concatenation. rmm::device_uvector is_valid_input(input.size(), stream, default_mr); - // Check if the input rows are either null or empty. - // This will be returned to the caller. - rmm::device_uvector is_null_or_empty(input.size(), stream, mr); + // Check if the input rows are null, empty (containing only whitespaces), and may also check + // for invalid JSON strings. + // This will be returned to the caller to create null mask for the final output. + rmm::device_uvector should_be_nullified(input.size(), stream, mr); thrust::for_each( rmm::exec_policy_nosync(stream), thrust::make_counting_iterator(0L), thrust::make_counting_iterator(input.size() * static_cast(cudf::detail::warp_size)), - [input = *d_input_ptr, + [nullify_invalid_rows, + input = *d_input_ptr, output = thrust::make_zip_iterator(thrust::make_tuple( - is_valid_input.begin(), is_null_or_empty.begin()))] __device__(int64_t tidx) { + is_valid_input.begin(), should_be_nullified.begin()))] __device__(int64_t tidx) { // Execute one warp per row to minimize thread divergence. if ((tidx % cudf::detail::warp_size) != 0) { return; } auto const idx = tidx / cudf::detail::warp_size; @@ -110,28 +121,6 @@ std::tuple, std::unique_ptr, c if (not_whitespace(ch)) { break; } } - if (i + 3 < size && - (d_str[i] == 'n' && d_str[i + 1] == 'u' && d_str[i + 2] == 'l' && d_str[i + 3] == 'l')) { - i += 4; - - // Skip the very last whitespace characters. - bool is_null_literal{true}; - for (; i < size; ++i) { - ch = d_str[i]; - if (not_whitespace(ch)) { - is_null_literal = false; - break; - } - } - - // The current row contains only `null` string literal and not any other non-whitespace - // characters. Such rows need to be masked out as null when doing concatenation. 
- if (is_null_literal) { - output[idx] = thrust::make_tuple(false, false); - return; - } - } - auto const not_eol = i < size; // If the current row is not null or empty, it should start with `{`. Otherwise, we need to @@ -139,7 +128,7 @@ std::tuple, std::unique_ptr, c // Note that if we want to support ARRAY schema, we need to check for `[` instead. auto constexpr start_character = '{'; if (not_eol && ch != start_character) { - output[idx] = thrust::make_tuple(false, false); + output[idx] = thrust::make_tuple(false, nullify_invalid_rows); return; } @@ -221,9 +210,9 @@ std::tuple, std::unique_ptr, c stream, mr); - return {std::make_unique(std::move(is_null_or_empty), rmm::device_buffer{}, 0), - std::move(concat_strings->release().data), - delimiter}; + return {std::move(concat_strings->release().data), + delimiter, + std::make_unique(std::move(should_be_nullified), rmm::device_buffer{}, 0)}; } std::unique_ptr make_structs(std::vector const& children, @@ -254,13 +243,14 @@ std::unique_ptr make_structs(std::vector const& } // namespace detail -std::tuple, std::unique_ptr, char> concat_json( +std::tuple, char, std::unique_ptr> concat_json( cudf::strings_column_view const& input, + bool nullify_invalid_rows, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { CUDF_FUNC_RANGE(); - return detail::concat_json(input, stream, mr); + return detail::concat_json(input, nullify_invalid_rows, stream, mr); } std::unique_ptr make_structs(std::vector const& children, diff --git a/src/main/cpp/src/json_utils.hpp b/src/main/cpp/src/json_utils.hpp index 5671a7329a..04530bed1b 100644 --- a/src/main/cpp/src/json_utils.hpp +++ b/src/main/cpp/src/json_utils.hpp @@ -32,15 +32,36 @@ std::unique_ptr from_json_to_raw_map( rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource()); -std::tuple, std::unique_ptr, char> concat_json( - cudf::strings_column_view const& input, - rmm::cuda_stream_view stream = 
cudf::get_default_stream(), - rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource()); - std::unique_ptr make_structs( std::vector const& input, cudf::column_view const& is_null, rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource()); +/** + * @brief Concatenate the JSON objects given by a strings column into one single character buffer, + * in which each JSON object is delimited by a special character that does not exist in the input. + * + * Beyond returning the concatenated buffer with delimiter, the function also returns a BOOL8 + * column indicating which rows should be nullified after parsing the concatenated buffer. Each + * row of this column is a `true` value if the corresponding input row is either empty, containing + * only whitespaces, or an invalid JSON object, depending on the `nullify_invalid_rows` parameter. + * + * Note that an invalid JSON object in this context is a string that does not start with the `{` + * character after whitespaces. + * + * @param input The strings column containing input JSON objects + * @param nullify_invalid_rows Whether to nullify rows containing invalid JSON objects + * @param stream The CUDA stream used for device memory operations and kernel launches + * @param mr Device memory resource used to allocate device memory of the returned buffer and column + * @return A tuple containing the concatenated JSON objects as a single buffer, the delimiter + * character, and a BOOL8 column indicating which rows should be nullified after parsing + * the concatenated buffer + */ +std::tuple, char, std::unique_ptr> concat_json( + cudf::strings_column_view const& input, + bool nullify_invalid_rows = false, + rmm::cuda_stream_view stream = cudf::get_default_stream(), + rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource()); + } // namespace spark_rapids_jni