Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix bug in recovering invalid lines in JSONL inputs #17098

Merged
merged 41 commits into from
Oct 30, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
9ff3129
add option to nullify empty lines
karthikeyann Oct 9, 2024
624743b
printf debugging
shrshi Oct 11, 2024
bcecb25
Merge branch 'branch-24.12' into json-quote-char-parsing-fix
shrshi Oct 14, 2024
f9b7e08
Merge branch 'enh-json_nullify_empty_lines' into json-quote-char-pars…
shrshi Oct 15, 2024
55c13a0
added test; fixed small bug in nullifying empty rows
shrshi Oct 16, 2024
9d2a2f0
formatting
shrshi Oct 16, 2024
3d0a51d
removing from modifications to dfa
shrshi Oct 16, 2024
911e065
remove hardcoding of delimiter
shrshi Oct 16, 2024
ab7659b
Merge branch 'branch-24.12' into enh-json_nullify_empty_lines
karthikeyann Oct 17, 2024
0ef5108
Merge branch 'branch-24.12' into enh-json_nullify_empty_lines
shrshi Oct 18, 2024
1dffbf0
Merge branch 'enh-json_nullify_empty_lines' of github.com:karthikeyan…
shrshi Oct 18, 2024
293521f
Update cpp/tests/io/json/json_test.cpp
shrshi Oct 21, 2024
ca8ee32
Merge branch 'branch-24.12' into json-quote-char-parsing-fix
ttnghia Oct 21, 2024
ebc5275
pre-process concat
shrshi Oct 21, 2024
679833b
formatting
shrshi Oct 21, 2024
b192fd2
Merge branch 'branch-24.12' into enh-json_nullify_empty_lines
shrshi Oct 21, 2024
31d5cab
some logic fixes
shrshi Oct 22, 2024
7c3e0f0
formatting
shrshi Oct 22, 2024
35b7177
test
shrshi Oct 22, 2024
9370dc5
formatting
shrshi Oct 22, 2024
6d87031
test cleanup
shrshi Oct 22, 2024
b9005ae
formatting
shrshi Oct 22, 2024
4382ef8
pr reviews
shrshi Oct 22, 2024
f75d8ee
formatting
shrshi Oct 22, 2024
bb9584e
formatting fix
shrshi Oct 22, 2024
6ad06ca
Merge branch 'branch-24.12' into enh-json_nullify_empty_lines
shrshi Oct 22, 2024
424f90f
pr reviews
shrshi Oct 24, 2024
8b48297
Merge branch 'enh-json_nullify_empty_lines' of github.com:karthikeyan…
shrshi Oct 24, 2024
f651087
merge
shrshi Oct 24, 2024
dfba4cd
Merge branch 'json-quote-char-parsing-fix' of github.com:shrshi/cudf …
shrshi Oct 24, 2024
eb82450
Merge branch 'branch-24.12' into json-quote-char-parsing-fix
shrshi Oct 29, 2024
d3193e3
Merge branch 'branch-24.12' into json-quote-char-parsing-fix
shrshi Oct 29, 2024
18f1a6e
Merge branch 'branch-24.12' into json-quote-char-parsing-fix
shrshi Oct 29, 2024
96dce9d
pr reviews
shrshi Oct 29, 2024
f8c5de3
formatting
shrshi Oct 29, 2024
c0d0b3e
Merge branch 'json-quote-char-parsing-fix' of github.com:shrshi/cudf …
shrshi Oct 29, 2024
234c19d
Merge branch 'branch-24.12' into json-quote-char-parsing-fix
shrshi Oct 29, 2024
77b2f99
oops, undoing accidental merge
shrshi Oct 29, 2024
2e37ed4
Merge branch 'json-quote-char-parsing-fix' of github.com:shrshi/cudf …
shrshi Oct 29, 2024
3784be9
Merge branch 'branch-24.12' into json-quote-char-parsing-fix
shrshi Oct 29, 2024
f351242
Merge branch 'branch-24.12' into json-quote-char-parsing-fix
shrshi Oct 30, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 44 additions & 12 deletions cpp/src/io/json/read_json.cu
Original file line number Diff line number Diff line change
Expand Up @@ -127,18 +127,19 @@ datasource::owning_buffer<rmm::device_buffer> get_record_range_raw_input(

std::size_t const total_source_size = sources_size(sources, 0, 0);
auto constexpr num_delimiter_chars = 1;
auto const num_extra_delimiters = num_delimiter_chars * (sources.size() - 1);
auto const delimiter = reader_opts.get_delimiter();
auto const num_extra_delimiters = num_delimiter_chars * sources.size();
compression_type const reader_compression = reader_opts.get_compression();
std::size_t const chunk_offset = reader_opts.get_byte_range_offset();
std::size_t chunk_size = reader_opts.get_byte_range_size();

CUDF_EXPECTS(total_source_size ? chunk_offset < total_source_size : !chunk_offset,
"Invalid offsetting",
std::invalid_argument);
auto should_load_all_sources = !chunk_size || chunk_size >= total_source_size - chunk_offset;
chunk_size = should_load_all_sources ? total_source_size - chunk_offset : chunk_size;
auto should_load_till_last_source = !chunk_size || chunk_size >= total_source_size - chunk_offset;
chunk_size = should_load_till_last_source ? total_source_size - chunk_offset : chunk_size;

int num_subchunks_prealloced = should_load_all_sources ? 0 : max_subchunks_prealloced;
int num_subchunks_prealloced = should_load_till_last_source ? 0 : max_subchunks_prealloced;
std::size_t const size_per_subchunk = estimate_size_per_subchunk(chunk_size);

// The allocation for single source compressed input is estimated by assuming a ~4:1
Expand All @@ -155,17 +156,17 @@ datasource::owning_buffer<rmm::device_buffer> get_record_range_raw_input(

// Offset within buffer indicating first read position
std::int64_t buffer_offset = 0;
auto readbufspan =
ingest_raw_input(bufspan, sources, reader_compression, chunk_offset, chunk_size, stream);
auto readbufspan = ingest_raw_input(
bufspan, sources, reader_compression, chunk_offset, chunk_size, delimiter, stream);

auto const shift_for_nonzero_offset = std::min<std::int64_t>(chunk_offset, 1);
auto const first_delim_pos =
chunk_offset == 0 ? 0 : find_first_delimiter(readbufspan, '\n', stream);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this mean it was a long-standing bug until now? Since we already supported customized delimiter for a long time.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this has been bug until now. I suspect that when we enable recover_with_null, the FST removing excess characters after the delimiter in each line fixes the error in partial lines read due to the hard-coded \n delimiter, preventing us from encountering an error. But I think this bug would have caused lines in the input spanning byte ranges to be skipped.
Also, if the size of the input file is less than 2GB and we always read the whole file i.e. not in byte ranges, then again we would not encounter this bug.

chunk_offset == 0 ? 0 : find_first_delimiter(readbufspan, delimiter, stream);
if (first_delim_pos == -1) {
// return empty owning datasource buffer
auto empty_buf = rmm::device_buffer(0, stream);
return datasource::owning_buffer<rmm::device_buffer>(std::move(empty_buf));
} else if (!should_load_all_sources) {
} else if (!should_load_till_last_source) {
// Find next delimiter
std::int64_t next_delim_pos = -1;
std::size_t next_subchunk_start = chunk_offset + chunk_size;
Expand All @@ -180,14 +181,15 @@ datasource::owning_buffer<rmm::device_buffer> get_record_range_raw_input(
reader_compression,
next_subchunk_start,
size_per_subchunk,
delimiter,
stream);
next_delim_pos = find_first_delimiter(readbufspan, '\n', stream) + buffer_offset;
next_delim_pos = find_first_delimiter(readbufspan, delimiter, stream) + buffer_offset;
next_subchunk_start += size_per_subchunk;
}
if (next_delim_pos < buffer_offset) {
if (next_subchunk_start >= total_source_size) {
// If we have reached the end of source list but the source does not terminate with a
// newline character
// delimiter character
next_delim_pos = buffer_offset + readbufspan.size();
} else {
// Our buffer_size estimate is insufficient to read until the end of the line! We need to
Expand All @@ -209,10 +211,37 @@ datasource::owning_buffer<rmm::device_buffer> get_record_range_raw_input(
reinterpret_cast<uint8_t*>(buffer.data()) + first_delim_pos + shift_for_nonzero_offset,
next_delim_pos - first_delim_pos - shift_for_nonzero_offset);
}

// Add delimiter to end of buffer iff
// (i) We are reading till the end of the last source i.e. should_load_till_last_source is
// true (ii) The last character in bufspan is not delimiter.
// For (ii) in the case of Spark, if the last character is not a delimiter, it could be the case
// that there are characters after the delimiter in the last record. We then consider those
// characters to be a part of a new (possibly empty) line.
size_t num_chars = readbufspan.size() - first_delim_pos - shift_for_nonzero_offset;
if (num_chars) {
char last_char;
CUDF_CUDA_TRY(cudaMemcpyAsync(&last_char,
reinterpret_cast<char*>(buffer.data()) + readbufspan.size() - 1,
sizeof(char),
cudaMemcpyDeviceToHost,
stream.value()));
stream.synchronize();
shrshi marked this conversation as resolved.
Show resolved Hide resolved
if (last_char != delimiter) {
last_char = delimiter;
CUDF_CUDA_TRY(cudaMemcpyAsync(reinterpret_cast<char*>(buffer.data()) + readbufspan.size(),
&last_char,
sizeof(char),
cudaMemcpyHostToDevice,
stream.value()));
num_chars++;
}
}

return datasource::owning_buffer<rmm::device_buffer>(
std::move(buffer),
reinterpret_cast<uint8_t*>(buffer.data()) + first_delim_pos + shift_for_nonzero_offset,
readbufspan.size() - first_delim_pos - shift_for_nonzero_offset);
num_chars);
}

// Helper function to read the current batch using byte range offsets and size
Expand All @@ -223,6 +252,7 @@ table_with_metadata read_batch(host_span<std::unique_ptr<datasource>> sources,
rmm::device_async_resource_ref mr)
{
CUDF_FUNC_RANGE();

datasource::owning_buffer<rmm::device_buffer> bufview =
get_record_range_raw_input(sources, reader_opts, stream);

Expand All @@ -235,6 +265,7 @@ table_with_metadata read_batch(host_span<std::unique_ptr<datasource>> sources,
auto buffer =
cudf::device_span<char const>(reinterpret_cast<char const*>(bufview.data()), bufview.size());
stream.synchronize();

return device_parse_nested_json(buffer, reader_opts, stream, mr);
}

Expand All @@ -245,6 +276,7 @@ device_span<char> ingest_raw_input(device_span<char> buffer,
compression_type compression,
std::size_t range_offset,
std::size_t range_size,
char delimiter,
rmm::cuda_stream_view stream)
{
CUDF_FUNC_RANGE();
Expand Down Expand Up @@ -296,7 +328,7 @@ device_span<char> ingest_raw_input(device_span<char> buffer,
if (sources.size() > 1) {
static_assert(num_delimiter_chars == 1,
"Currently only single-character delimiters are supported");
auto const delimiter_source = thrust::make_constant_iterator('\n');
auto const delimiter_source = thrust::make_constant_iterator(delimiter);
auto const d_delimiter_map = cudf::detail::make_device_uvector_async(
delimiter_map, stream, cudf::get_current_device_resource_ref());
thrust::scatter(rmm::exec_policy_nosync(stream),
Expand Down
1 change: 1 addition & 0 deletions cpp/src/io/json/read_json.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ device_span<char> ingest_raw_input(device_span<char> buffer,
compression_type compression,
size_t range_offset,
size_t range_size,
char delimiter,
rmm::cuda_stream_view stream);

/**
Expand Down
24 changes: 24 additions & 0 deletions cpp/tests/io/json/json_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <cudf_test/column_utilities.hpp>
#include <cudf_test/column_wrapper.hpp>
#include <cudf_test/cudf_gtest.hpp>
#include <cudf_test/debug_utilities.hpp>
shrshi marked this conversation as resolved.
Show resolved Hide resolved
#include <cudf_test/default_stream.hpp>
#include <cudf_test/iterator_utilities.hpp>
#include <cudf_test/random.hpp>
Expand Down Expand Up @@ -2975,4 +2976,27 @@ TEST_F(JsonReaderTest, JsonDtypeSchema)
cudf::test::debug_output_level::ALL_ERRORS);
}

TEST_F(JsonReaderTest, LastRecordInvalid)
{
std::string data = R"({"key": "1"}
{"key": "})";
std::map<std::string, cudf::io::schema_element> schema{{"key", {dtype<cudf::string_view>()}}};
auto opts =
cudf::io::json_reader_options::builder(cudf::io::source_info{data.data(), data.size()})
.dtypes(schema)
.lines(true)
.recovery_mode(cudf::io::json_recovery_mode_t::RECOVER_WITH_NULL)
.build();
auto const result = cudf::io::read_json(opts);

EXPECT_EQ(result.tbl->num_columns(), 1);
EXPECT_EQ(result.tbl->get_column(0).type().id(), cudf::type_id::STRING);
EXPECT_EQ(result.metadata.schema_info[0].name, "key");
auto const result_view = result.tbl->view().column(0);

EXPECT_EQ(result.tbl->num_rows(), 2);
cudf::test::strings_column_wrapper expected{{"1", ""}, cudf::test::iterators::nulls_at({1})};
CUDF_TEST_EXPECT_COLUMNS_EQUAL(result_view, expected);
shrshi marked this conversation as resolved.
Show resolved Hide resolved
}

CUDF_TEST_PROGRAM_MAIN()
1 change: 1 addition & 0 deletions cpp/tests/io/json/json_utils.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ std::vector<cudf::io::table_with_metadata> split_byte_range_reading(
reader_opts.get_compression(),
reader_opts.get_byte_range_offset(),
reader_opts.get_byte_range_size(),
reader_opts.get_delimiter(),
stream);
// Note: we cannot reuse cudf::io::json::detail::find_first_delimiter since the
// return type of that function is size_type. However, when the chunk_size is
Expand Down
Loading