Skip to content

Commit

Permalink
Merge branch 'branch-24.06' of github.com:rapidsai/cudf into limit-py…
Browse files Browse the repository at this point in the history
…arrow-16.0
  • Loading branch information
jameslamb committed May 20, 2024
2 parents ff82c0e + 9ce1721 commit fa996d3
Show file tree
Hide file tree
Showing 5 changed files with 476 additions and 149 deletions.
45 changes: 45 additions & 0 deletions cpp/include/cudf/io/json.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ class json_reader_options {
bool _lines = false;
// Parse mixed types as a string column
bool _mixed_types_as_string = false;
// Delimiter separating records in JSON lines
char _delimiter = '\n';
// Prune columns on read, selected based on the _dtypes option
bool _prune_columns = false;

Expand Down Expand Up @@ -229,6 +231,13 @@ class json_reader_options {
return base_padding + num_columns * column_bytes;
}

/**
* @brief Returns delimiter separating records in JSON lines
*
* @return Delimiter separating records in JSON lines
*/
char get_delimiter() const { return _delimiter; }

/**
* @brief Whether to read the file as a json object per line.
*
Expand Down Expand Up @@ -340,6 +349,30 @@ class json_reader_options {
*/
void set_byte_range_size(size_type size) { _byte_range_size = size; }

/**
* @brief Set delimiter separating records in JSON lines
*
* @param delimiter Delimiter separating records in JSON lines
*/
void set_delimiter(char delimiter)
{
switch (delimiter) {
case '{':
case '[':
case '}':
case ']':
case ',':
case ':':
case '"':
case '\'':
case '\\':
case ' ':
case '\t':
case '\r': CUDF_FAIL("Unsupported delimiter character.", std::invalid_argument); break;
}
_delimiter = delimiter;
}

/**
* @brief Set whether to read the file as a json object per line.
*
Expand Down Expand Up @@ -507,6 +540,18 @@ class json_reader_options_builder {
return *this;
}

/**
* @brief Set delimiter separating records in JSON lines
*
* @param delimiter Delimiter separating records in JSON lines
* @return this for chaining
*/
json_reader_options_builder& delimiter(char delimiter)
{
options.set_delimiter(delimiter);
return *this;
}

/**
* @brief Set whether to read the file as a json object per line.
*
Expand Down
6 changes: 4 additions & 2 deletions cpp/src/io/json/nested_json.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ enum class stack_behavior_t : char {
PushPopWithoutReset,

/// Opening brackets and braces, [, {, push onto the stack, closing brackets and braces, ], }, pop
/// from the stack. Newline characters are considered delimiters and therefore reset to an empty
/// stack.
/// from the stack. Delimiter characters are passed when the stack context is constructed to
/// reset to an empty stack.
ResetOnDelimiter
};

Expand Down Expand Up @@ -198,11 +198,13 @@ namespace detail {
* within the context of a struct, a '[' represents that it is within the context of an array, and a
* '_' symbol that it is at the root of the JSON.
* @param[in] stack_behavior Specifies the stack's behavior
* @param[in] delimiter Specifies the delimiter to use as separator for JSON lines input
* @param[in] stream The cuda stream to dispatch GPU kernels to
*/
void get_stack_context(device_span<SymbolT const> json_in,
SymbolT* d_top_of_stack,
stack_behavior_t stack_behavior,
SymbolT delimiter,
rmm::cuda_stream_view stream);

/**
Expand Down
144 changes: 84 additions & 60 deletions cpp/src/io/json/nested_json_gpu.cu
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,13 @@ constexpr auto NUM_SYMBOL_GROUPS = static_cast<uint32_t>(dfa_symbol_group_id::NU
* @brief Function object to map (input_symbol,stack_context) tuples to a symbol group.
*/
struct SymbolPairToSymbolGroupId {
SymbolT delimiter = '\n';
CUDF_HOST_DEVICE SymbolGroupT operator()(thrust::tuple<SymbolT, StackSymbolT> symbol) const
{
auto const input_symbol = thrust::get<0>(symbol);
auto const stack_symbol = thrust::get<1>(symbol);
return static_cast<SymbolGroupT>(
input_symbol == '\n'
input_symbol == delimiter
? dfa_symbol_group_id::DELIMITER
: (stack_symbol == '_' ? dfa_symbol_group_id::ROOT : dfa_symbol_group_id::OTHER));
}
Expand Down Expand Up @@ -331,50 +332,72 @@ enum class dfa_symbol_group_id : uint8_t {
CLOSING_BRACKET, ///< Closing bracket SG: ]
QUOTE_CHAR, ///< Quote character SG: "
ESCAPE_CHAR, ///< Escape character SG: '\'
NEWLINE_CHAR, ///< Newline character SG: '\n'
DELIMITER_CHAR, ///< Delimiter character SG
OTHER_SYMBOLS, ///< SG implicitly matching all other characters
NUM_SYMBOL_GROUPS ///< Total number of symbol groups
};

constexpr auto TT_NUM_STATES = static_cast<StateT>(dfa_states::TT_NUM_STATES);
constexpr auto NUM_SYMBOL_GROUPS = static_cast<uint32_t>(dfa_symbol_group_id::NUM_SYMBOL_GROUPS);

// The i-th string representing all the characters of a symbol group
std::array<std::string, NUM_SYMBOL_GROUPS - 1> const symbol_groups{
{{"{"}, {"["}, {"}"}, {"]"}, {"\""}, {"\\"}, {"\n"}}};
// The DFA's starting state
constexpr auto start_state = static_cast<StateT>(TT_OOS);

// Transition table for the default JSON and JSON lines formats
std::array<std::array<dfa_states, NUM_SYMBOL_GROUPS>, TT_NUM_STATES> const transition_table{
{/* IN_STATE { [ } ] " \ \n OTHER */
/* TT_OOS */ {{TT_OOS, TT_OOS, TT_OOS, TT_OOS, TT_STR, TT_OOS, TT_OOS, TT_OOS}},
/* TT_STR */ {{TT_STR, TT_STR, TT_STR, TT_STR, TT_OOS, TT_ESC, TT_STR, TT_STR}},
/* TT_ESC */ {{TT_STR, TT_STR, TT_STR, TT_STR, TT_STR, TT_STR, TT_STR, TT_STR}}}};

// Transition table for the JSON lines format that recovers from invalid JSON lines
std::array<std::array<dfa_states, NUM_SYMBOL_GROUPS>, TT_NUM_STATES> const
resetting_transition_table{
template <typename SymbolT>
auto get_sgid_lut(SymbolT delim)
{
// The i-th string representing all the characters of a symbol group
std::array<std::vector<SymbolT>, NUM_SYMBOL_GROUPS - 1> symbol_groups{
{{'{'}, {'['}, {'}'}, {']'}, {'"'}, {'\\'}, {delim}}};

return symbol_groups;
}

auto get_transition_table(stack_behavior_t stack_behavior)
{
// Transition table for the default JSON and JSON lines formats
std::array<std::array<dfa_states, NUM_SYMBOL_GROUPS>, TT_NUM_STATES> const transition_table{
{/* IN_STATE { [ } ] " \ \n OTHER */
/* TT_OOS */ {{TT_OOS, TT_OOS, TT_OOS, TT_OOS, TT_STR, TT_OOS, TT_OOS, TT_OOS}},
/* TT_STR */ {{TT_STR, TT_STR, TT_STR, TT_STR, TT_OOS, TT_ESC, TT_OOS, TT_STR}},
/* TT_ESC */ {{TT_STR, TT_STR, TT_STR, TT_STR, TT_STR, TT_STR, TT_OOS, TT_STR}}}};

// Translation table for the default JSON and JSON lines formats
std::array<std::array<std::vector<char>, NUM_SYMBOL_GROUPS>, TT_NUM_STATES> const translation_table{
{/* IN_STATE { [ } ] " \ \n OTHER */
/* TT_OOS */ {{{'{'}, {'['}, {'}'}, {']'}, {}, {}, {}, {}}},
/* TT_STR */ {{{}, {}, {}, {}, {}, {}, {}, {}}},
/* TT_ESC */ {{{}, {}, {}, {}, {}, {}, {}, {}}}}};

// Translation table for the JSON lines format that recovers from invalid JSON lines
std::array<std::array<std::vector<char>, NUM_SYMBOL_GROUPS>, TT_NUM_STATES> const
resetting_translation_table{
{/* IN_STATE { [ } ] " \ \n OTHER */
/* TT_OOS */ {{{'{'}, {'['}, {'}'}, {']'}, {}, {}, {'\n'}, {}}},
/* TT_STR */ {{{}, {}, {}, {}, {}, {}, {'\n'}, {}}},
/* TT_ESC */ {{{}, {}, {}, {}, {}, {}, {'\n'}, {}}}}};
/* TT_STR */ {{TT_STR, TT_STR, TT_STR, TT_STR, TT_OOS, TT_ESC, TT_STR, TT_STR}},
/* TT_ESC */ {{TT_STR, TT_STR, TT_STR, TT_STR, TT_STR, TT_STR, TT_STR, TT_STR}}}};

// Transition table for the JSON lines format that recovers from invalid JSON lines
std::array<std::array<dfa_states, NUM_SYMBOL_GROUPS>, TT_NUM_STATES> const
resetting_transition_table{
{/* IN_STATE { [ } ] " \ \n OTHER */
/* TT_OOS */ {{TT_OOS, TT_OOS, TT_OOS, TT_OOS, TT_STR, TT_OOS, TT_OOS, TT_OOS}},
/* TT_STR */ {{TT_STR, TT_STR, TT_STR, TT_STR, TT_OOS, TT_ESC, TT_OOS, TT_STR}},
/* TT_ESC */ {{TT_STR, TT_STR, TT_STR, TT_STR, TT_STR, TT_STR, TT_OOS, TT_STR}}}};

// Transition table specialized on the choice of whether to reset on newlines
return (stack_behavior == stack_behavior_t::ResetOnDelimiter) ? resetting_transition_table
: transition_table;
}

auto get_translation_table(stack_behavior_t stack_behavior)
{
// Translation table for the default JSON and JSON lines formats
std::array<std::array<std::vector<char>, NUM_SYMBOL_GROUPS>, TT_NUM_STATES> const
translation_table{
{/* IN_STATE { [ } ] " \ <delim> OTHER */
/* TT_OOS */ {{{'{'}, {'['}, {'}'}, {']'}, {}, {}, {}, {}}},
/* TT_STR */ {{{}, {}, {}, {}, {}, {}, {}, {}}},
/* TT_ESC */ {{{}, {}, {}, {}, {}, {}, {}, {}}}}};

// Translation table for the JSON lines format that recovers from invalid JSON lines
std::array<std::array<std::vector<char>, NUM_SYMBOL_GROUPS>, TT_NUM_STATES> const
resetting_translation_table{
{/* IN_STATE { [ } ] " \ <delim> OTHER */
/* TT_OOS */ {{{'{'}, {'['}, {'}'}, {']'}, {}, {}, {'\n'}, {}}},
/* TT_STR */ {{{}, {}, {}, {}, {}, {}, {'\n'}, {}}},
/* TT_ESC */ {{{}, {}, {}, {}, {}, {}, {'\n'}, {}}}}};

// Translation table specialized on the choice of whether to reset on newlines
return stack_behavior == stack_behavior_t::ResetOnDelimiter ? resetting_translation_table
: translation_table;
}

// The DFA's starting state
constexpr auto start_state = static_cast<StateT>(TT_OOS);
} // namespace to_stack_op

// JSON tokenizer pushdown automaton
Expand Down Expand Up @@ -572,6 +595,7 @@ static __constant__ PdaSymbolGroupIdT tos_sg_to_pda_sgid[] = {
* visibly pushdown automaton (DVPA)
*/
struct PdaSymbolToSymbolGroupId {
SymbolT delimiter = '\n';
template <typename SymbolT, typename StackSymbolT>
__device__ __forceinline__ PdaSymbolGroupIdT
operator()(thrust::tuple<SymbolT, StackSymbolT> symbol_pair) const
Expand All @@ -593,8 +617,15 @@ struct PdaSymbolToSymbolGroupId {
// The relative symbol group id of the current input symbol
constexpr auto pda_sgid_lookup_size =
static_cast<int32_t>(sizeof(tos_sg_to_pda_sgid) / sizeof(tos_sg_to_pda_sgid[0]));
// We map the delimiter character to LINE_BREAK symbol group id, and the newline character
// to OTHER. Note that delimiter cannot be any of opening(closing) brace, bracket, quote,
// escape, comma, colon or whitespace characters.
auto const symbol_position =
symbol == delimiter
? static_cast<int32_t>('\n')
: (symbol == '\n' ? static_cast<int32_t>(delimiter) : static_cast<int32_t>(symbol));
PdaSymbolGroupIdT symbol_gid =
tos_sg_to_pda_sgid[min(static_cast<int32_t>(symbol), pda_sgid_lookup_size - 1)];
tos_sg_to_pda_sgid[min(symbol_position, pda_sgid_lookup_size - 1)];
return stack_idx * static_cast<PdaSymbolGroupIdT>(symbol_group_id::NUM_PDA_INPUT_SGS) +
symbol_gid;
}
Expand Down Expand Up @@ -1398,6 +1429,7 @@ namespace detail {
void get_stack_context(device_span<SymbolT const> json_in,
SymbolT* d_top_of_stack,
stack_behavior_t stack_behavior,
SymbolT delimiter,
rmm::cuda_stream_view stream)
{
check_input_size(json_in.size());
Expand All @@ -1423,20 +1455,11 @@ void get_stack_context(device_span<SymbolT const> json_in,
constexpr auto max_translation_table_size =
to_stack_op::NUM_SYMBOL_GROUPS * to_stack_op::TT_NUM_STATES;

// Transition table specialized on the choice of whether to reset on newlines
const auto transition_table = (stack_behavior == stack_behavior_t::ResetOnDelimiter)
? to_stack_op::resetting_transition_table
: to_stack_op::transition_table;

// Translation table specialized on the choice of whether to reset on newlines
const auto translation_table = (stack_behavior == stack_behavior_t::ResetOnDelimiter)
? to_stack_op::resetting_translation_table
: to_stack_op::translation_table;

auto json_to_stack_ops_fst = fst::detail::make_fst(
fst::detail::make_symbol_group_lut(to_stack_op::symbol_groups),
fst::detail::make_transition_table(transition_table),
fst::detail::make_translation_table<max_translation_table_size>(translation_table),
fst::detail::make_symbol_group_lut(to_stack_op::get_sgid_lut(delimiter)),
fst::detail::make_transition_table(to_stack_op::get_transition_table(stack_behavior)),
fst::detail::make_translation_table<max_translation_table_size>(
to_stack_op::get_translation_table(stack_behavior)),
stream);

// "Search" for relevant occurrence of brackets and braces that indicate the beginning/end
Expand Down Expand Up @@ -1539,16 +1562,16 @@ std::pair<rmm::device_uvector<PdaTokenT>, rmm::device_uvector<SymbolOffsetT>> ge
// Range of encapsulating function that parses to internal columnar data representation
CUDF_FUNC_RANGE();

auto const new_line_delimited_json = options.is_enabled_lines();
auto const delimited_json = options.is_enabled_lines();
auto const delimiter = options.get_delimiter();

// (!new_line_delimited_json) => JSON
// (new_line_delimited_json and recover_from_error) => JSON_LINES_RECOVER
// (new_line_delimited_json and !recover_from_error) => JSON_LINES
auto format = new_line_delimited_json
? (options.recovery_mode() == json_recovery_mode_t::RECOVER_WITH_NULL
? tokenizer_pda::json_format_cfg_t::JSON_LINES_RECOVER
: tokenizer_pda::json_format_cfg_t::JSON_LINES)
: tokenizer_pda::json_format_cfg_t::JSON;
// (!delimited_json) => JSON
// (delimited_json and recover_from_error) => JSON_LINES_RECOVER
// (delimited_json and !recover_from_error) => JSON_LINES
auto format = delimited_json ? (options.recovery_mode() == json_recovery_mode_t::RECOVER_WITH_NULL
? tokenizer_pda::json_format_cfg_t::JSON_LINES_RECOVER
: tokenizer_pda::json_format_cfg_t::JSON_LINES)
: tokenizer_pda::json_format_cfg_t::JSON;

// Prepare for PDA transducer pass, merging input symbols with stack symbols
auto const recover_from_error = (format == tokenizer_pda::json_format_cfg_t::JSON_LINES_RECOVER);
Expand All @@ -1559,7 +1582,7 @@ std::pair<rmm::device_uvector<PdaTokenT>, rmm::device_uvector<SymbolOffsetT>> ge
// Identify what is the stack context for each input character (JSON-root, struct, or list)
auto const stack_behavior =
recover_from_error ? stack_behavior_t::ResetOnDelimiter : stack_behavior_t::PushPopWithoutReset;
get_stack_context(json_in, stack_symbols.data(), stack_behavior, stream);
get_stack_context(json_in, stack_symbols.data(), stack_behavior, delimiter, stream);

// Input to the full pushdown automaton finite-state transducer, where a input symbol comprises
// the combination of a character from the JSON input together with the stack context for that
Expand All @@ -1573,7 +1596,7 @@ std::pair<rmm::device_uvector<PdaTokenT>, rmm::device_uvector<SymbolOffsetT>> ge
if (recover_from_error) {
auto fix_stack_of_excess_chars = fst::detail::make_fst(
fst::detail::make_symbol_group_lookup_op(
fix_stack_of_excess_chars::SymbolPairToSymbolGroupId{}),
fix_stack_of_excess_chars::SymbolPairToSymbolGroupId{delimiter}),
fst::detail::make_transition_table(fix_stack_of_excess_chars::transition_table),
fst::detail::make_translation_functor(fix_stack_of_excess_chars::TransduceInputOp{}),
stream);
Expand All @@ -1592,8 +1615,9 @@ std::pair<rmm::device_uvector<PdaTokenT>, rmm::device_uvector<SymbolOffsetT>> ge
constexpr auto max_translation_table_size =
tokenizer_pda::NUM_PDA_SGIDS *
static_cast<tokenizer_pda::StateT>(tokenizer_pda::pda_state_t::PD_NUM_STATES);

auto json_to_tokens_fst = fst::detail::make_fst(
fst::detail::make_symbol_group_lookup_op(tokenizer_pda::PdaSymbolToSymbolGroupId{}),
fst::detail::make_symbol_group_lookup_op(tokenizer_pda::PdaSymbolToSymbolGroupId{delimiter}),
fst::detail::make_transition_table(tokenizer_pda::get_transition_table(format)),
fst::detail::make_translation_table<max_translation_table_size>(
tokenizer_pda::get_translation_table(recover_from_error)),
Expand Down
Loading

0 comments on commit fa996d3

Please sign in to comment.