Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add option to nullify empty lines #17028

Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions cpp/include/cudf/io/json.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ class json_reader_options {
// Normalize unquoted spaces and tabs
bool _normalize_whitespace = false;

bool _nullify_empty_lines = false;

// Whether to recover after an invalid JSON line
json_recovery_mode_t _recovery_mode = json_recovery_mode_t::FAIL;

Expand Down Expand Up @@ -313,6 +315,13 @@ class json_reader_options {
*/
[[nodiscard]] bool is_enabled_normalize_whitespace() const { return _normalize_whitespace; }

/**
* @brief Whether the reader should nullify empty lines for json lines format with recovery mode
*
* @returns true if the reader should nullify empty lines, false otherwise
*/
[[nodiscard]] bool is_nullify_empty_lines() const { return _nullify_empty_lines; }

/**
* @brief Queries the JSON reader's behavior on invalid JSON lines.
*
Expand Down Expand Up @@ -502,6 +511,14 @@ class json_reader_options {
*/
void enable_normalize_whitespace(bool val) { _normalize_whitespace = val; }

/**
* @brief Set whether the reader should nullify empty lines for json lines format with recovery
* mode
*
* @param val Boolean value to indicate whether the reader should nullify empty lines
*/
void nullify_empty_lines(bool val) { _nullify_empty_lines = val; }

/**
* @brief Specifies the JSON reader's behavior on invalid JSON lines.
*
Expand Down Expand Up @@ -779,6 +796,19 @@ class json_reader_options_builder {
return *this;
}

/**
* @brief Set whether the reader should nullify empty lines for json lines format with recovery
* mode
*
* @param val Boolean value to indicate whether the reader should nullify empty lines
* @return this for chaining
*/
json_reader_options_builder& nullify_empty_lines(bool val)
{
options._nullify_empty_lines = val;
return *this;
}

/**
* @brief Specifies the JSON reader's behavior on invalid JSON lines.
*
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/io/json/nested_json.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -265,13 +265,15 @@ void get_stack_context(device_span<SymbolT const> json_in,
*
* @param tokens The tokens to be post-processed
* @param token_indices The tokens' corresponding indices that are post-processed
* @param nullify_empty_lines Whether to nullify empty lines
* @param stream The cuda stream to dispatch GPU kernels to
* @return Returns the post-processed token stream
*/
CUDF_EXPORT
std::pair<rmm::device_uvector<PdaTokenT>, rmm::device_uvector<SymbolOffsetT>> process_token_stream(
device_span<PdaTokenT const> tokens,
device_span<SymbolOffsetT const> token_indices,
bool nullify_empty_lines,
rmm::cuda_stream_view stream);

/**
Expand Down
42 changes: 26 additions & 16 deletions cpp/src/io/json/nested_json_gpu.cu
Original file line number Diff line number Diff line change
Expand Up @@ -196,11 +196,12 @@ using SymbolGroupT = uint8_t;
/**
* @brief Definition of the DFA's states
*/
enum class dfa_states : StateT { VALID, INVALID, NUM_STATES };
enum class dfa_states : StateT { START, VALID, INVALID, NUM_STATES };

// Aliases for readability of the transition table
constexpr auto TT_INV = dfa_states::INVALID;
constexpr auto TT_VLD = dfa_states::VALID;
constexpr auto TT_START = dfa_states::START;
constexpr auto TT_INV = dfa_states::INVALID;
constexpr auto TT_VLD = dfa_states::VALID;

/**
* @brief Definition of the symbol groups
Expand Down Expand Up @@ -239,14 +240,17 @@ struct UnwrapTokenFromSymbolOp {
* invalid lines.
*/
struct TransduceToken {
bool nullify_empty_lines;
Comment on lines 241 to +242
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this imply any performance hit? Please run benchmark with this. If there is any slowdown, we probably need to make this as a template argument (with sacrificing compile time) so we can optimize the code out if it is false.

template <typename RelativeOffsetT, typename SymbolT>
constexpr CUDF_HOST_DEVICE SymbolT operator()(StateT const state_id,
SymbolGroupT const match_id,
RelativeOffsetT const relative_offset,
SymbolT const read_symbol) const
{
bool const is_empty_invalid =
(nullify_empty_lines && state_id == static_cast<StateT>(TT_START));
bool const is_end_of_invalid_line =
(state_id == static_cast<StateT>(TT_INV) &&
((state_id == static_cast<StateT>(TT_INV) or is_empty_invalid) &&
match_id == static_cast<SymbolGroupT>(dfa_symbol_group_id::DELIMITER));

if (is_end_of_invalid_line) {
Expand All @@ -266,14 +270,17 @@ struct TransduceToken {
constexpr int32_t num_inv_tokens = 2;

bool const is_delimiter = match_id == static_cast<SymbolGroupT>(dfa_symbol_group_id::DELIMITER);
bool const is_empty_invalid =
(nullify_empty_lines && state_id == static_cast<StateT>(TT_START));

// If state is either invalid or we're entering an invalid state, we discard tokens
bool const is_part_of_invalid_line =
(match_id != static_cast<SymbolGroupT>(dfa_symbol_group_id::ERROR) &&
state_id == static_cast<StateT>(TT_VLD));
(state_id == static_cast<StateT>(TT_VLD) or state_id == static_cast<StateT>(TT_START)));

// Indicates whether we transition from an invalid line to a potentially valid line
bool const is_end_of_invalid_line = (state_id == static_cast<StateT>(TT_INV) && is_delimiter);
bool const is_end_of_invalid_line =
((state_id == static_cast<StateT>(TT_INV) or is_empty_invalid) && is_delimiter);

int32_t const emit_count =
is_end_of_invalid_line ? num_inv_tokens : (is_part_of_invalid_line && !is_delimiter ? 1 : 0);
Expand All @@ -284,11 +291,12 @@ struct TransduceToken {
// Transition table
std::array<std::array<dfa_states, NUM_SYMBOL_GROUPS>, TT_NUM_STATES> const transition_table{
{/* IN_STATE ERROR DELIM OTHER */
/* VALID */ {{TT_INV, TT_VLD, TT_VLD}},
/* INVALID */ {{TT_INV, TT_VLD, TT_INV}}}};
/* START */ {{TT_INV, TT_START, TT_VLD}},
/* VALID */ {{TT_INV, TT_START, TT_VLD}},
/* INVALID */ {{TT_INV, TT_START, TT_INV}}}};

// The DFA's starting state
constexpr auto start_state = static_cast<StateT>(TT_VLD);
constexpr auto start_state = static_cast<StateT>(TT_START);
} // namespace token_filter

// JSON to stack operator DFA (Deterministic Finite Automata)
Expand Down Expand Up @@ -1507,17 +1515,19 @@ void get_stack_context(device_span<SymbolT const> json_in,
std::pair<rmm::device_uvector<PdaTokenT>, rmm::device_uvector<SymbolOffsetT>> process_token_stream(
device_span<PdaTokenT const> tokens,
device_span<SymbolOffsetT const> token_indices,
bool nullify_empty_lines,
rmm::cuda_stream_view stream)
{
// Instantiate FST for post-processing the token stream to remove all tokens that belong to an
// invalid JSON line
token_filter::UnwrapTokenFromSymbolOp sgid_op{};
using symbol_t = thrust::tuple<PdaTokenT, SymbolOffsetT>;
auto filter_fst = fst::detail::make_fst(
fst::detail::make_symbol_group_lut(token_filter::symbol_groups, sgid_op),
fst::detail::make_transition_table(token_filter::transition_table),
fst::detail::make_translation_functor<symbol_t, 0, 2>(token_filter::TransduceToken{}),
stream);
using symbol_t = thrust::tuple<PdaTokenT, SymbolOffsetT>;
auto filter_fst =
fst::detail::make_fst(fst::detail::make_symbol_group_lut(token_filter::symbol_groups, sgid_op),
fst::detail::make_transition_table(token_filter::transition_table),
fst::detail::make_translation_functor<symbol_t, 0, 2>(
token_filter::TransduceToken{nullify_empty_lines}),
stream);

auto const mr = cudf::get_current_device_resource_ref();
rmm::device_scalar<SymbolOffsetT> d_num_selected_tokens(stream, mr);
Expand Down Expand Up @@ -1664,7 +1674,7 @@ std::pair<rmm::device_uvector<PdaTokenT>, rmm::device_uvector<SymbolOffsetT>> ge
tokens.set_element(0, token_t::LineEnd, stream);
validate_token_stream(json_in, tokens, tokens_indices, options, stream);
auto [filtered_tokens, filtered_tokens_indices] =
process_token_stream(tokens, tokens_indices, stream);
process_token_stream(tokens, tokens_indices, options.is_nullify_empty_lines(), stream);
tokens = std::move(filtered_tokens);
tokens_indices = std::move(filtered_tokens_indices);
}
Expand Down
2 changes: 1 addition & 1 deletion cpp/tests/io/json/nested_json_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -864,7 +864,7 @@ TEST_F(JsonTest, PostProcessTokenStream)

// Run system-under-test
auto [d_filtered_tokens, d_filtered_indices] =
cuio_json::detail::process_token_stream(d_tokens, d_offsets, stream);
cuio_json::detail::process_token_stream(d_tokens, d_offsets, false, stream);

auto const filtered_tokens = cudf::detail::make_std_vector_async(d_filtered_tokens, stream);
auto const filtered_indices = cudf::detail::make_std_vector_async(d_filtered_indices, stream);
Expand Down
Loading