From aab8a76b532b46713b9784302ffd202586ecb5cc Mon Sep 17 00:00:00 2001
From: Elias Stehle <3958403+elstehle@users.noreply.github.com>
Date: Tue, 2 Apr 2024 02:14:01 +0200
Subject: [PATCH] Fixes potential race in JSON parser when parsing JSON lines
 format and when recovering from invalid lines (#15419)

This PR adds a missing stream synchronization that must complete before the destructor runs for the FST that cleans up excess characters following the first valid record on a JSON line.

Without this synchronization, the FST's destructor could free memory that the still-running FST instance has yet to use.


Closes https://github.com/rapidsai/cudf/issues/15409

Authors:
  - Elias Stehle (https://github.com/elstehle)

Approvers:
  - Alessandro Bellina (https://github.com/abellina)
  - Shruti Shivakumar (https://github.com/shrshi)
  - Vukasin Milovanovic (https://github.com/vuule)

URL: https://github.com/rapidsai/cudf/pull/15419
---
 cpp/src/io/json/nested_json_gpu.cu |   3 +
 cpp/tests/io/json_test.cpp         | 107 +++++++++++++++++++++++++++++
 2 files changed, 110 insertions(+)

diff --git a/cpp/src/io/json/nested_json_gpu.cu b/cpp/src/io/json/nested_json_gpu.cu
index a6a57c36b08..4ddbe735963 100644
--- a/cpp/src/io/json/nested_json_gpu.cu
+++ b/cpp/src/io/json/nested_json_gpu.cu
@@ -1583,6 +1583,9 @@ std::pair<rmm::device_uvector<PdaTokenT>, rmm::device_uvector<SymbolOffsetT>> ge
                                         thrust::make_discard_iterator(),
                                         fix_stack_of_excess_chars::start_state,
                                         stream);
+
+    // Make sure memory of the FST's lookup tables isn't freed before the FST completes
+    stream.synchronize();
   }
 
   constexpr auto max_translation_table_size =
diff --git a/cpp/tests/io/json_test.cpp b/cpp/tests/io/json_test.cpp
index 0b70e5e3f93..bae71d3c2a8 100644
--- a/cpp/tests/io/json_test.cpp
+++ b/cpp/tests/io/json_test.cpp
@@ -28,6 +28,7 @@
 #include <cudf/detail/iterator.cuh>
 #include <cudf/io/arrow_io_source.hpp>
 #include <cudf/io/json.hpp>
+#include <cudf/io/memory_resource.hpp>
 #include <cudf/strings/convert/convert_fixed_point.hpp>
 #include <cudf/strings/repeat_strings.hpp>
 #include <cudf/strings/strings_column_view.hpp>
@@ -35,12 +36,15 @@
 #include <cudf/table/table_view.hpp>
 #include <cudf/types.hpp>
 
+#include <rmm/mr/pinned_host_memory_resource.hpp>
+
 #include <thrust/iterator/constant_iterator.h>
 
 #include <arrow/io/api.h>
 
 #include <fstream>
 #include <limits>
+#include <memory>
 #include <type_traits>
 
 #define wrapper cudf::test::fixed_width_column_wrapper
@@ -2050,6 +2054,109 @@ TEST_F(JsonReaderTest, JSONLinesRecoveringIgnoreExcessChars)
     float64_wrapper{{0.0, 0.0, 0.0, 1.2, 0.0, 0.0, 0.0, 0.0}, c_validity.cbegin()});
 }
 
+// Sanity test that checks whether there's a race on the FST destructor by parsing a large JSON
+// lines input in recovery mode while host allocations come from a pinned memory pool
+TEST_F(JsonReaderTest, JSONLinesRecoveringSync)
+{
+  // Set up host pinned memory pool to avoid implicit synchronizations to test for any potential
+  // races due to missing host-device synchronizations. The upstream resource is a named local
+  // because the pool is only handed a raw pointer to it, so it has to outlive the pool
+  rmm::mr::pinned_host_memory_resource pinned_mr{};
+  using host_pooled_mr = rmm::mr::pool_memory_resource<rmm::mr::pinned_host_memory_resource>;
+  host_pooled_mr mr{&pinned_mr, size_t{128} * 1024 * 1024};
+
+  // Set new resource
+  auto last_mr = cudf::io::set_host_memory_resource(mr);
+
+  // Spark has the specific need to ignore extra characters that come after the first record
+  // on a JSON line
+  std::string data =
+    // 0 -> a: -2 (valid)
+    R"({"a":-2}{})"
+    "\n"
+    // 1 -> (invalid)
+    R"({"b":{}should_be_invalid})"
+    "\n"
+    // 2 -> b (valid)
+    R"({"b":{"a":3} })"
+    "\n"
+    // 3 -> c: (valid)
+    R"({"c":1.2 } )"
+    "\n"
+    "\n"
+    // 4 -> (valid)
+    R"({"a":4} 123)"
+    "\n"
+    // 5 -> (valid)
+    R"({"a":5}//Comment after record)"
+    "\n"
+    // 6 -> (valid)
+    R"({"a":6} //Comment after whitespace)"
+    "\n"
+    // 7 -> (invalid)
+    R"({"a":5 //Invalid Comment within record})";
+
+  // Create input of a certain size to potentially reveal a missing host/device sync
+  std::size_t const target_size = 40000000;
+  auto const repetitions_log2 =
+    static_cast<std::size_t>(std::ceil(std::log2(target_size / data.size())));
+  auto const repetitions = 1ULL << repetitions_log2;
+
+  for (std::size_t i = 0; i < repetitions_log2; ++i) {
+    data = data + "\n" + data;
+  }
+
+  auto filepath = temp_env->get_temp_dir() + "RecoveringLinesExcessChars.json";
+  {
+    std::ofstream outfile(filepath, std::ofstream::out);
+    outfile << data;
+  }
+
+  cudf::io::json_reader_options in_options =
+    cudf::io::json_reader_options::builder(cudf::io::source_info{filepath})
+      .lines(true)
+      .recovery_mode(cudf::io::json_recovery_mode_t::RECOVER_WITH_NULL);
+
+  cudf::io::table_with_metadata result = cudf::io::read_json(in_options);
+
+  EXPECT_EQ(result.tbl->num_columns(), 3);
+  EXPECT_EQ(result.tbl->num_rows(), 8 * repetitions);
+  EXPECT_EQ(result.tbl->get_column(0).type().id(), cudf::type_id::INT64);
+  EXPECT_EQ(result.tbl->get_column(1).type().id(), cudf::type_id::STRUCT);
+  EXPECT_EQ(result.tbl->get_column(2).type().id(), cudf::type_id::FLOAT64);
+
+  std::vector<bool> a_validity{true, false, false, false, true, true, true, false};
+  std::vector<bool> b_validity{false, false, true, false, false, false, false, false};
+  std::vector<bool> c_validity{false, false, false, true, false, false, false, false};
+
+  std::vector<std::int32_t> a_data{-2, 0, 0, 0, 4, 5, 6, 0};
+  std::vector<std::int32_t> b_a_data{0, 0, 3, 0, 0, 0, 0, 0};
+  std::vector<double> c_data{0.0, 0.0, 0.0, 1.2, 0.0, 0.0, 0.0, 0.0};
+
+  for (std::size_t i = 0; i < repetitions_log2; ++i) {
+    a_validity.insert(a_validity.end(), a_validity.cbegin(), a_validity.cend());
+    b_validity.insert(b_validity.end(), b_validity.cbegin(), b_validity.cend());
+    c_validity.insert(c_validity.end(), c_validity.cbegin(), c_validity.cend());
+    a_data.insert(a_data.end(), a_data.cbegin(), a_data.cend());
+    b_a_data.insert(b_a_data.end(), b_a_data.cbegin(), b_a_data.cend());
+    c_data.insert(c_data.end(), c_data.cbegin(), c_data.cend());
+  }
+
+  // Child column b->a
+  auto b_a_col = int64_wrapper(b_a_data.cbegin(), b_a_data.cend());
+
+  CUDF_TEST_EXPECT_COLUMNS_EQUAL(
+    result.tbl->get_column(0), int64_wrapper{a_data.cbegin(), a_data.cend(), a_validity.cbegin()});
+  CUDF_TEST_EXPECT_COLUMNS_EQUAL(
+    result.tbl->get_column(1), cudf::test::structs_column_wrapper({b_a_col}, b_validity.cbegin()));
+  CUDF_TEST_EXPECT_COLUMNS_EQUAL(
+    result.tbl->get_column(2),
+    float64_wrapper{c_data.cbegin(), c_data.cend(), c_validity.cbegin()});
+
+  // Restore original memory source
+  cudf::io::set_host_memory_resource(last_mr);
+}
+
 TEST_F(JsonReaderTest, MixedTypes)
 {
   using LCWS    = cudf::test::lists_column_wrapper<cudf::string_view>;