Skip to content

Commit

Permalink
Fixes potential race in JSON parser when parsing JSON lines format an…
Browse files Browse the repository at this point in the history
…d when recovering from invalid lines (rapidsai#15419)

PR adds a missing synchronization before the FST destructor of the FST used for cleaning excess characters following the first valid record on a JSON line.

The problem is that the FST's destructor could otherwise free memory that is yet to be used by the still running FST instance.


Closes rapidsai#15409

Authors:
  - Elias Stehle (https://github.com/elstehle)

Approvers:
  - Alessandro Bellina (https://github.com/abellina)
  - Shruti Shivakumar (https://github.com/shrshi)
  - Vukasin Milovanovic (https://github.com/vuule)

URL: rapidsai#15419
  • Loading branch information
elstehle authored Apr 2, 2024
1 parent 268996a commit aab8a76
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 0 deletions.
3 changes: 3 additions & 0 deletions cpp/src/io/json/nested_json_gpu.cu
Original file line number Diff line number Diff line change
Expand Up @@ -1583,6 +1583,9 @@ std::pair<rmm::device_uvector<PdaTokenT>, rmm::device_uvector<SymbolOffsetT>> ge
thrust::make_discard_iterator(),
fix_stack_of_excess_chars::start_state,
stream);

// Make sure memory of the FST's lookup tables isn't freed before the FST completes
stream.synchronize();
}

constexpr auto max_translation_table_size =
Expand Down
107 changes: 107 additions & 0 deletions cpp/tests/io/json_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,23 @@
#include <cudf/detail/iterator.cuh>
#include <cudf/io/arrow_io_source.hpp>
#include <cudf/io/json.hpp>
#include <cudf/io/memory_resource.hpp>
#include <cudf/strings/convert/convert_fixed_point.hpp>
#include <cudf/strings/repeat_strings.hpp>
#include <cudf/strings/strings_column_view.hpp>
#include <cudf/table/table.hpp>
#include <cudf/table/table_view.hpp>
#include <cudf/types.hpp>

#include <rmm/mr/pinned_host_memory_resource.hpp>

#include <thrust/iterator/constant_iterator.h>

#include <arrow/io/api.h>

#include <fstream>
#include <limits>
#include <memory>
#include <type_traits>

#define wrapper cudf::test::fixed_width_column_wrapper
Expand Down Expand Up @@ -2050,6 +2054,109 @@ TEST_F(JsonReaderTest, JSONLinesRecoveringIgnoreExcessChars)
float64_wrapper{{0.0, 0.0, 0.0, 1.2, 0.0, 0.0, 0.0, 0.0}, c_validity.cbegin()});
}

// Sanity test that checks whether there's a race on the FST destructor
TEST_F(JsonReaderTest, JSONLinesRecoveringSync)
{
// Set up host pinned memory pool to avoid implicit synchronizations to test for any potential
// races due to missing host-device synchronizations
using host_pooled_mr = rmm::mr::pool_memory_resource<rmm::mr::pinned_host_memory_resource>;
host_pooled_mr mr{std::make_shared<rmm::mr::pinned_host_memory_resource>().get(),
size_t{128} * 1024 * 1024};

// Set new resource
auto last_mr = cudf::io::set_host_memory_resource(mr);

/**
* @brief Spark has the specific need to ignore extra characters that come after the first record
* on a JSON line
*/
std::string data =
// 0 -> a: -2 (valid)
R"({"a":-2}{})"
"\n"
// 1 -> (invalid)
R"({"b":{}should_be_invalid})"
"\n"
// 2 -> b (valid)
R"({"b":{"a":3} })"
"\n"
// 3 -> c: (valid)
R"({"c":1.2 } )"
"\n"
"\n"
// 4 -> (valid)
R"({"a":4} 123)"
"\n"
// 5 -> (valid)
R"({"a":5}//Comment after record)"
"\n"
// 6 -> (valid)
R"({"a":6} //Comment after whitespace)"
"\n"
// 7 -> (invalid)
R"({"a":5 //Invalid Comment within record})";

// Create input of a certain size to potentially reveal a missing host/device sync
std::size_t const target_size = 40000000;
auto const repetitions_log2 =
static_cast<std::size_t>(std::ceil(std::log2(target_size / data.size())));
auto const repetitions = 1ULL << repetitions_log2;

for (std::size_t i = 0; i < repetitions_log2; ++i) {
data = data + "\n" + data;
}

auto filepath = temp_env->get_temp_dir() + "RecoveringLinesExcessChars.json";
{
std::ofstream outfile(filepath, std::ofstream::out);
outfile << data;
}

cudf::io::json_reader_options in_options =
cudf::io::json_reader_options::builder(cudf::io::source_info{filepath})
.lines(true)
.recovery_mode(cudf::io::json_recovery_mode_t::RECOVER_WITH_NULL);

cudf::io::table_with_metadata result = cudf::io::read_json(in_options);

EXPECT_EQ(result.tbl->num_columns(), 3);
EXPECT_EQ(result.tbl->num_rows(), 8 * repetitions);
EXPECT_EQ(result.tbl->get_column(0).type().id(), cudf::type_id::INT64);
EXPECT_EQ(result.tbl->get_column(1).type().id(), cudf::type_id::STRUCT);
EXPECT_EQ(result.tbl->get_column(2).type().id(), cudf::type_id::FLOAT64);

std::vector<bool> a_validity{true, false, false, false, true, true, true, false};
std::vector<bool> b_validity{false, false, true, false, false, false, false, false};
std::vector<bool> c_validity{false, false, false, true, false, false, false, false};

std::vector<std::int32_t> a_data{-2, 0, 0, 0, 4, 5, 6, 0};
std::vector<std::int32_t> b_a_data{0, 0, 3, 0, 0, 0, 0, 0};
std::vector<double> c_data{0.0, 0.0, 0.0, 1.2, 0.0, 0.0, 0.0, 0.0};

for (std::size_t i = 0; i < repetitions_log2; ++i) {
a_validity.insert(a_validity.end(), a_validity.cbegin(), a_validity.cend());
b_validity.insert(b_validity.end(), b_validity.cbegin(), b_validity.cend());
c_validity.insert(c_validity.end(), c_validity.cbegin(), c_validity.cend());
a_data.insert(a_data.end(), a_data.cbegin(), a_data.cend());
b_a_data.insert(b_a_data.end(), b_a_data.cbegin(), b_a_data.cend());
c_data.insert(c_data.end(), c_data.cbegin(), c_data.cend());
}

// Child column b->a
auto b_a_col = int64_wrapper(b_a_data.cbegin(), b_a_data.cend());

CUDF_TEST_EXPECT_COLUMNS_EQUAL(
result.tbl->get_column(0), int64_wrapper{a_data.cbegin(), a_data.cend(), a_validity.cbegin()});
CUDF_TEST_EXPECT_COLUMNS_EQUAL(
result.tbl->get_column(1), cudf::test::structs_column_wrapper({b_a_col}, b_validity.cbegin()));
CUDF_TEST_EXPECT_COLUMNS_EQUAL(
result.tbl->get_column(2),
float64_wrapper{c_data.cbegin(), c_data.cend(), c_validity.cbegin()});

// Restore original memory source
cudf::io::set_host_memory_resource(last_mr);
}

TEST_F(JsonReaderTest, MixedTypes)
{
using LCWS = cudf::test::lists_column_wrapper<cudf::string_view>;
Expand Down

0 comments on commit aab8a76

Please sign in to comment.