Commit

Fix List offsets bug in mixed type list column in nested JSON reader (#12447)

Fixes a bug in the list offsets computed for mixed-type lists, where string/value elements appear alongside struct elements.

In the nested JSON reader, the following JSON string
`[{"a":[123, {"0": 123}], "b":1.0}, {"b":1.1}, {"b":2.1}]`
should produce the following first column:

```
List<Struct<float,>>:
Length : 3
Offsets : 0, 2, 2, 2
Null count: 2
001
   Struct<float,>:
   Length : 2
   Null count: 1
   10
      10
      NULL, 123
```
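
The fix can be exercised end to end from Python. Below is a minimal reproduction sketch, assuming a cudf build that exposes the experimental nested reader as `engine="cudf_experimental"` (the same engine name and call pattern used by the new tests in this PR):

```python
# Reproduction sketch; engine name and call pattern taken from the tests in this PR.
from io import StringIO

import cudf

json_string = '[{"a":[123, {"0": 123}], "b":1.0}, {"b":1.1}, {"b":2.1}]'
df = cudf.read_json(
    StringIO(json_string),
    engine="cudf_experimental",
    orient="records",
    lines=False,
)
# Column "a" mixes a scalar (123) and a struct ({"0": 123}), so the scalar is
# nulled out; the list offsets must still come out as 0, 2, 2, 2.
print(df["a"].to_arrow())
```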

Depends on #12330
Closes #12418

Authors:
  - Karthikeyan (https://github.com/karthikeyann)

Approvers:
  - Nghia Truong (https://github.com/ttnghia)
  - Bradley Dice (https://github.com/bdice)

URL: #12447
karthikeyann authored Jan 11, 2023
1 parent 6dda9d8 commit f990f93
Showing 3 changed files with 263 additions and 51 deletions.
169 changes: 119 additions & 50 deletions cpp/src/io/json/json_column.cu
@@ -21,6 +21,7 @@
#include <cudf/column/column_factories.hpp>
#include <cudf/detail/null_mask.hpp>
#include <cudf/detail/nvtx/ranges.hpp>
#include <cudf/detail/utilities/device_atomics.cuh>
#include <cudf/detail/utilities/vector_factories.hpp>
#include <cudf/detail/utilities/visitor_overload.hpp>
#include <cudf/io/detail/data_casting.cuh>
@@ -36,6 +37,7 @@
#include <thrust/count.h>
#include <thrust/for_each.h>
#include <thrust/functional.h>
#include <thrust/gather.h>
#include <thrust/iterator/counting_iterator.h>
#include <thrust/iterator/discard_iterator.h>
#include <thrust/iterator/permutation_iterator.h>
@@ -195,6 +197,45 @@ reduce_to_column_tree(tree_meta_t& tree,
: col_ids[parent_node_id];
});

// Mixed types among a list's children go to different columns,
// so all immediate children of a list column must share the same max_row_offsets.
// Create the list children's max_row_offsets array (initialized to zero),
// atomicMax each child's max_row_offset into it,
// then gather the shared maxima back into the children's max_row_offsets entries.
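// (Illustration: for `{"a":[123, {"0": 123}]}` the scalar 123 and the struct land
// in two different child columns of list "a"; both must report the same
// max_row_offsets for the list's offsets to cover every child.)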
{
rmm::device_uvector<NodeIndexT> list_parents_children_max_row_offsets(num_columns, stream);
thrust::fill(rmm::exec_policy(stream),
list_parents_children_max_row_offsets.begin(),
list_parents_children_max_row_offsets.end(),
0);
thrust::for_each(rmm::exec_policy(stream),
unique_col_ids.begin(),
unique_col_ids.end(),
[column_categories = column_categories.begin(),
parent_col_ids = parent_col_ids.begin(),
max_row_offsets = max_row_offsets.begin(),
list_parents_children_max_row_offsets =
list_parents_children_max_row_offsets.begin()] __device__(auto col_id) {
auto parent_col_id = parent_col_ids[col_id];
if (parent_col_id != parent_node_sentinel and
column_categories[parent_col_id] == node_t::NC_LIST) {
atomicMax(list_parents_children_max_row_offsets + parent_col_id,
max_row_offsets[col_id]);
}
});
thrust::gather_if(
rmm::exec_policy(stream),
parent_col_ids.begin(),
parent_col_ids.end(),
parent_col_ids.begin(),
list_parents_children_max_row_offsets.begin(),
max_row_offsets.begin(),
[column_categories = column_categories.begin()] __device__(size_type parent_col_id) {
return parent_col_id != parent_node_sentinel and
column_categories[parent_col_id] == node_t::NC_LIST;
});
}

// copy lists' max_row_offsets to children.
// all structs should have the same size.
thrust::transform_if(
@@ -465,47 +506,7 @@ void make_device_json_column(device_span<SymbolT const> input,
auto d_ignore_vals = cudf::detail::make_device_uvector_async(ignore_vals, stream);
auto d_columns_data = cudf::detail::make_device_uvector_async(columns_data, stream);

// 3. scatter List offset
// pre-condition: {node_id} is already sorted by {col_id}
// unique_copy_by_key {parent_node_id} {row_offset} to
// col[parent_col_id].child_offsets[row_offset[parent_node_id]]

auto ordered_parent_node_ids =
thrust::make_permutation_iterator(tree.parent_node_ids.begin(), node_ids.begin());
auto ordered_row_offsets =
thrust::make_permutation_iterator(row_offsets.begin(), node_ids.begin());
thrust::for_each_n(
rmm::exec_policy(stream),
thrust::counting_iterator<size_type>(0),
num_nodes,
[num_nodes,
ordered_parent_node_ids,
ordered_row_offsets,
col_ids = col_ids.begin(),
sorted_col_ids = sorted_col_ids.begin(),
row_offsets = row_offsets.begin(),
node_categories = tree.node_categories.begin(),
d_columns_data = d_columns_data.begin()] __device__(size_type i) {
auto parent_node_id = ordered_parent_node_ids[i];
if (parent_node_id != parent_node_sentinel and node_categories[parent_node_id] == NC_LIST) {
// unique item
if (i == 0 or (sorted_col_ids[i - 1] != sorted_col_ids[i] or
ordered_parent_node_ids[i - 1] != parent_node_id)) {
// scatter to list_offset
d_columns_data[col_ids[parent_node_id]].child_offsets[row_offsets[parent_node_id]] =
ordered_row_offsets[i];
}
// TODO: verify if this code is right. check with more test cases.
if (i == num_nodes - 1 or (sorted_col_ids[i] != sorted_col_ids[i + 1] or
ordered_parent_node_ids[i + 1] != parent_node_id)) {
// last value of list child_offset is its size.
d_columns_data[col_ids[parent_node_id]].child_offsets[row_offsets[parent_node_id] + 1] =
ordered_row_offsets[i] + 1;
}
}
});

// 4. scatter string offsets to respective columns, set validity bits
// 3. scatter string offsets to respective columns, set validity bits
thrust::for_each_n(
rmm::exec_policy(stream),
thrust::counting_iterator<size_type>(0),
@@ -531,6 +532,65 @@
}
});

// 4. scatter List offset
// copy_if only nodes whose parent is a list, as (node_id, parent_col_id) pairs.
// stable_sort the {node_id} values by parent_col_id.
// For each first child of a unique parent_node_id (i == 0 or i-1 != i), write the
// start offset; for each last child (i == last or i+1 != i), write the end offset.
// i.e. unique_copy_by_key {parent_node_id} {row_offset} to
// col[parent_col_id].child_offsets[row_offset[parent_node_id]]
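// (Because mixed-type children live in different columns, grouping by the parent
// list column, rather than iterating by each child's own col_id as before, is what
// makes the offsets account for all of a list's children at once.)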

auto& parent_col_ids = sorted_col_ids; // reuse sorted_col_ids
auto parent_col_id = thrust::make_transform_iterator(
thrust::make_counting_iterator<size_type>(0),
[col_ids = col_ids.begin(),
parent_node_ids = tree.parent_node_ids.begin()] __device__(size_type node_id) {
return parent_node_ids[node_id] == parent_node_sentinel ? parent_node_sentinel
: col_ids[parent_node_ids[node_id]];
});
auto const list_children_end = thrust::copy_if(
rmm::exec_policy(stream),
thrust::make_zip_iterator(thrust::make_counting_iterator<size_type>(0), parent_col_id),
thrust::make_zip_iterator(thrust::make_counting_iterator<size_type>(0), parent_col_id) +
num_nodes,
thrust::make_counting_iterator<size_type>(0),
thrust::make_zip_iterator(node_ids.begin(), parent_col_ids.begin()),
[node_categories = tree.node_categories.begin(),
parent_node_ids = tree.parent_node_ids.begin()] __device__(size_type node_id) {
auto parent_node_id = parent_node_ids[node_id];
return parent_node_id != parent_node_sentinel and node_categories[parent_node_id] == NC_LIST;
});

auto const num_list_children =
list_children_end - thrust::make_zip_iterator(node_ids.begin(), parent_col_ids.begin());
thrust::stable_sort_by_key(rmm::exec_policy(stream),
parent_col_ids.begin(),
parent_col_ids.begin() + num_list_children,
node_ids.begin());
thrust::for_each_n(
rmm::exec_policy(stream),
thrust::make_counting_iterator<size_type>(0),
num_list_children,
[node_ids = node_ids.begin(),
parent_node_ids = tree.parent_node_ids.begin(),
parent_col_ids = parent_col_ids.begin(),
row_offsets = row_offsets.begin(),
d_columns_data = d_columns_data.begin(),
num_list_children] __device__(size_type i) {
auto const node_id = node_ids[i];
auto const parent_node_id = parent_node_ids[node_id];
// scatter to list_offset
if (i == 0 or parent_node_ids[node_ids[i - 1]] != parent_node_id) {
d_columns_data[parent_col_ids[i]].child_offsets[row_offsets[parent_node_id]] =
row_offsets[node_id];
}
// last value of list child_offset is its size.
if (i == num_list_children - 1 or parent_node_ids[node_ids[i + 1]] != parent_node_id) {
d_columns_data[parent_col_ids[i]].child_offsets[row_offsets[parent_node_id] + 1] =
row_offsets[node_id] + 1;
}
});

// 5. scan on offsets.
for (auto& [id, col_ref] : columns) {
auto& col = col_ref.get();
@@ -691,7 +751,12 @@ std::pair<std::unique_ptr<column>, std::vector<column_name_info>> device_json_co
auto [child_column, names] =
json_col.child_columns.empty()
? std::pair<std::unique_ptr<column>,
std::vector<column_name_info>>{std::make_unique<column>(), {}}
// EMPTY type cannot be used because gather throws an exception on EMPTY type.
std::vector<column_name_info>>{std::make_unique<column>(
data_type{type_id::INT8},
0,
rmm::device_buffer{0, stream, mr}),
{}}
: device_json_column_to_cudf_column(
json_col.child_columns.begin()->second,
d_input,
Expand All @@ -701,14 +766,18 @@ std::pair<std::unique_ptr<column>, std::vector<column_name_info>> device_json_co
mr);
column_names.back().children = names;
auto [result_bitmask, null_count] = make_validity(json_col);
return {make_lists_column(num_rows,
std::move(offsets_column),
std::move(child_column),
null_count,
std::move(result_bitmask),
stream,
mr),
std::move(column_names)};
auto ret_col = make_lists_column(num_rows,
std::move(offsets_column),
std::move(child_column),
0,
rmm::device_buffer{0, stream, mr},
stream,
mr);
// The null_mask is set after creation of the list column to skip the purge_nonempty_nulls
// and null validation applied in the make_lists_column factory, which are not needed for
// JSON: a parent column cannot be null when its child is non-empty in JSON.
ret_col->set_null_mask(std::move(result_bitmask), null_count);
return {std::move(ret_col), std::move(column_names)};
}
default: CUDF_FAIL("Unsupported column type"); break;
}
13 changes: 12 additions & 1 deletion cpp/tests/io/json_test.cpp
@@ -1463,7 +1463,7 @@ TEST_P(JsonReaderParamTest, JsonDtypeSchema)
cudf::test::strings_column_wrapper({"aa ", " bbb"}));
}

TEST_F(JsonReaderTest, DISABLED_JsonNestedDtypeSchema)
TEST_F(JsonReaderTest, JsonNestedDtypeSchema)
{
std::string json_string = R"( [{"a":[123, {"0": 123}], "b":1.0}, {"b":1.1}, {"b":2.1}])";

@@ -1512,10 +1512,21 @@ TEST_F(JsonReaderTest, DISABLED_JsonNestedDtypeSchema)
// Verify column "b" is an int column
EXPECT_EQ(result.tbl->get_column(1).type().id(), cudf::type_id::INT32);

CUDF_TEST_EXPECT_COLUMNS_EQUAL(result.tbl->get_column(0).child(0), int_wrapper{{0, 2, 2, 2}});
CUDF_TEST_EXPECT_COLUMNS_EQUAL(result.tbl->get_column(0).child(1).child(0),
float_wrapper{{0.0, 123.0}, {false, true}});
CUDF_TEST_EXPECT_COLUMNS_EQUAL(result.tbl->get_column(1),
int_wrapper{{1, 1, 2}, {true, true, true}});
// List column expected
auto leaf_child = float_wrapper{{0.0, 123.0}, {false, true}};
auto const validity = {1, 0, 0};
auto expected = cudf::make_lists_column(
3,
int_wrapper{{0, 2, 2, 2}}.release(),
cudf::test::structs_column_wrapper{{leaf_child}, {false, true}}.release(),
2,
cudf::test::detail::make_null_mask(validity.begin(), validity.end()));
CUDF_TEST_EXPECT_COLUMNS_EQUAL(result.tbl->get_column(0), *expected);
}

TEST_P(JsonReaderParamTest, JsonDtypeParsing)
132 changes: 132 additions & 0 deletions python/cudf/cudf/tests/test_json.py
@@ -1006,3 +1006,135 @@ def test_json_round_trip_gzip():
fo.seek(loc)
written_df = cudf.read_json(fo, orient="records", lines=True)
assert_eq(written_df, df)


@pytest.mark.parametrize(
"jsonl_string",
[
# simple list with mixed types
"""{"a":[123, {}], "b":1.1}""",
"""{"a":[123, {"0": 123}], "b":1.0}\n {"b":1.1}\n {"b":2.1}""",
"""{"a":[{"0": 123}, 123], "b":1.0}\n {"b":1.1}\n {"b":2.1}""",
"""{"a":[123, {"0": 123}, 12.3], "b":1.0}\n {"b":1.1}\n {"b":2.1}""",
"""{"a":[123, {"0": 123}, null], "b":1.0}\n {"b":1.1}\n {"b":2.1}""",
"""{"a":["123", {"0": 123}], "b":1.0}\n {"b":1.1}\n {"b":2.1}""",
"""{"a":[{"0": 123}, "123"], "b":1.0}\n {"b":1.1}\n {"b":2.1}""",
"""{"a":["123", {"0": 123}, "123"], "b":1.0}\n {"b":1.1}""",
"""{"a":[123]}\n {"a":[{"0": 123}], "b":1.0}\n {"b":1.1}""",
"""{"a":[{"0": 123}]}\n {"a":[123], "b":1.0}\n {"b":1.1}""",
"""{"a":[{"0": 123}]}\n {"a": []}\n {"a":[123], "b":1.0}\n{"b":1.1}""",
"""{"b":1.0, "a":[{"0": 123}]}\n {"a":[123]}\n {"b":1.1}\n{"a": []}""",
"""{"a": []}\n {"a":[{"0": 123}]}\n {"a":[123], "b":1.0}\n{"b":1.1}""",
"""{"a": []}\n {"a":[123], "b":1.0}\n {"a":[{"0": 123}]}\n{"b":1.1}""",
# nested list with mixed types
"""{"a":[123, [{"0": 123}, {}]], "b":1.0}
{"b":1.1}
{"a":[]}
{"a":[123]}
{"a":[[123], []]}""",
"""{"a":[], "b":1.0}
{"a":[[[456]]]}
{"a":[[123]]}
{"a":[123]}""",
"""{"a":[123], "b":1.0}
{"b":1.1}
{"b":2.1}
{"a":[[[[[[]]]]]]}""",
"""{"a":[123], "b":1.0}
{"a":[[[[[[]]]]]]}
{"a":[[[[[[]]]]], [[[[[]]]]]]}
{"a":[[[[[[]]]], [[[[]]]]]]}
{"a":[[[[[[]]], [[[]]]]]]}
{"a":[[[[[[]], [[]]]]]]}
{"a":[[[[[[], 123, []]]]]]}""",
# mixed elements in multiple columns
"""{"a":[123, {"0": 123}], "b":1.0}
{"c": ["abc"], "b":1.1}
{"c": ["abc", []] }""",
],
)
def test_json_nested_mixed_types_in_list(jsonl_string):
# utility function for this test:
# replace list elements with None when a list mixes dict and non-dict items
# (None entries are ignored)
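# e.g. _replace_in_list([123, {"0": 123}], [123]) -> [None, {"0": 123}]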
def _replace_in_list(list_to_replace, replace_items):
return [
_replace_in_list(x, replace_items)
if isinstance(x, list)
else None
if x in replace_items
else x
for x in list_to_replace
]

def _replace_with_nulls(df, replace_items):
for col in df.columns:
if df[col].dtype == "object":
df[col] = df[col].apply(
lambda x: _replace_in_list(x, replace_items)
if isinstance(x, list)
else x
)
return df

# both JSON Lines and JSON string inputs are tested.
json_string = "[" + jsonl_string.replace("\n", ",") + "]"
pdf = pd.read_json(jsonl_string, orient="records", lines=True)
pdf2 = pd.read_json(json_string, orient="records", lines=False)
assert_eq(pdf, pdf2)
# replace list elements with None where they mix dict and non-dict;
# in the test cases above, these items appear alongside dict/list items,
# so replace them with None.
pdf = _replace_with_nulls(pdf, [123, "123", 12.3, "abc"])
gdf = cudf.read_json(
StringIO(jsonl_string),
engine="cudf_experimental",
orient="records",
lines=True,
)
gdf2 = cudf.read_json(
StringIO(json_string),
engine="cudf_experimental",
orient="records",
lines=False,
)
if """[{"0": 123}, {}]""" not in jsonl_string:
# {} in pandas is represented as {"0": None} in cudf
assert_eq(gdf, pdf)
assert_eq(gdf2, pdf)
pa_table_pdf = pa.Table.from_pandas(
pdf, schema=gdf.to_arrow().schema, safe=False
)
assert gdf.to_arrow().equals(pa_table_pdf)
assert gdf2.to_arrow().equals(pa_table_pdf)


@pytest.mark.parametrize(
"jsonl_string",
[
# mixed type in list (in different order)
"""{"a":[[{"0": 123}, {}], {"1": 321}], "b":1.0}""",
"""{"a":[{"1": 321}, [{"0": 123}, {}], ], "b":1.0}""",
"""{"a":[123, [{"0": 123}, {}], {"1": 321}], "b":1.0}""",
"""{"a":[null, [{"0": 123}, {}], {"1": 321}], "b":1.0}""",
# mixed type in struct (in different order)
"""{"a": {"b": {"0": 123}, "c": {"1": 321}}, "d":1.0}
{"a": {"b": {"0": 123}, "c": [123, 123]}, "d":1.0}""",
"""{"a": {"b": {"0": 123}, "c": [123, 123]}, "d":1.0}
{"a": {"b": {"0": 123}, "c": {"1": 321}}, "d":1.0}""",
"""{"a": {"b": {"0": 123}, "c": null}, "d":1.0}
{"a": {"b": {"0": 123}, "c": {"1": 321}}, "d":1.0}
{"a": {"b": {"0": 123}, "c": [123, 123]}, "d":1.0}""",
"""{"a": {"b": {"0": 123}, "c": 123}, "d":1.0}
{"a": {"b": {"0": 123}, "c": {"1": 321}}, "d":1.0}
{"a": {"b": {"0": 123}, "c": [123, 123]}, "d":1.0}""",
],
)
def test_json_nested_mixed_types_error(jsonl_string):
# mixing list and struct should raise an exception
with pytest.raises(RuntimeError):
cudf.read_json(
StringIO(jsonl_string),
engine="cudf_experimental",
orient="records",
lines=True,
)
