Skip to content

Commit

Permalink
Testing stream pool implementation (#14437)
Browse files Browse the repository at this point in the history
The goal of this PR is to create a global pool containing only the cudf test stream which is to be used all stream tests that invoke `fork_streams` in their execution path. The stream pool is constructed by `create_global_cuda_stream_pool()` in `identify_stream_usage.cpp`, and overrides the function implementation in `utilities/stream_pool.cpp` when preloaded. 

The test checks for only `cudaLaunchKernel` being invoked with the wrong stream.

Authors:
  - Shruti Shivakumar (https://github.com/shrshi)

Approvers:
  - Vyas Ramasubramani (https://github.com/vyasr)
  - Vukasin Milovanovic (https://github.com/vuule)
  - Nghia Truong (https://github.com/ttnghia)
  - Mark Harris (https://github.com/harrism)

URL: #14437
  • Loading branch information
shrshi authored Dec 19, 2023
1 parent 6b134dd commit 5dfafaf
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 63 deletions.
56 changes: 56 additions & 0 deletions cpp/include/cudf/detail/utilities/stream_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,62 @@

namespace cudf::detail {

class cuda_stream_pool {
public:
// matching type used in rmm::cuda_stream_pool::get_stream(stream_id)
using stream_id_type = std::size_t;

virtual ~cuda_stream_pool() = default;

/**
* @brief Get a `cuda_stream_view` of a stream in the pool.
*
* This function is thread safe with respect to other calls to the same function.
*
* @return Stream view.
*/
virtual rmm::cuda_stream_view get_stream() = 0;

/**
* @brief Get a `cuda_stream_view` of the stream associated with `stream_id`.
*
* Equivalent values of `stream_id` return a `cuda_stream_view` to the same underlying stream.
* This function is thread safe with respect to other calls to the same function.
*
* @param stream_id Unique identifier for the desired stream
* @return Requested stream view.
*/
virtual rmm::cuda_stream_view get_stream(stream_id_type stream_id) = 0;

/**
* @brief Get a set of `cuda_stream_view` objects from the pool.
*
* An attempt is made to ensure that the returned vector does not contain duplicate
* streams, but this cannot be guaranteed if `count` is greater than the value returned by
* `get_stream_pool_size()`.
*
* This function is thread safe with respect to other calls to the same function.
*
* @param count The number of stream views to return.
* @return Vector containing `count` stream views.
*/
virtual std::vector<rmm::cuda_stream_view> get_streams(std::size_t count) = 0;

/**
* @brief Get the number of unique stream objects in the pool.
*
* This function is thread safe with respect to other calls to the same function.
*
* @return the number of stream objects in the pool
*/
virtual std::size_t get_stream_pool_size() const = 0;
};

/**
* @brief Initialize global stream pool.
*/
cuda_stream_pool* create_global_cuda_stream_pool();

/**
* @brief Acquire a set of `cuda_stream_view` objects and synchronize them to an event on another
* stream.
Expand Down
59 changes: 0 additions & 59 deletions cpp/src/utilities/stream_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@

namespace cudf::detail {

namespace {

// TODO: what is a good number here. what's the penalty for making it larger?
// Dave Baranec rule of thumb was max_streams_needed * num_concurrent_threads,
// where num_concurrent_threads was estimated to be 4. so using 32 will allow
Expand Down Expand Up @@ -58,57 +56,6 @@ std::size_t constexpr STREAM_POOL_SIZE = 32;
} while (0)
#endif

class cuda_stream_pool {
public:
// matching type used in rmm::cuda_stream_pool::get_stream(stream_id)
using stream_id_type = std::size_t;

virtual ~cuda_stream_pool() = default;

/**
* @brief Get a `cuda_stream_view` of a stream in the pool.
*
* This function is thread safe with respect to other calls to the same function.
*
* @return Stream view.
*/
virtual rmm::cuda_stream_view get_stream() = 0;

/**
* @brief Get a `cuda_stream_view` of the stream associated with `stream_id`.
*
* Equivalent values of `stream_id` return a `cuda_stream_view` to the same underlying stream.
* This function is thread safe with respect to other calls to the same function.
*
* @param stream_id Unique identifier for the desired stream
* @return Requested stream view.
*/
virtual rmm::cuda_stream_view get_stream(stream_id_type stream_id) = 0;

/**
* @brief Get a set of `cuda_stream_view` objects from the pool.
*
* An attempt is made to ensure that the returned vector does not contain duplicate
* streams, but this cannot be guaranteed if `count` is greater than the value returned by
* `get_stream_pool_size()`.
*
* This function is thread safe with respect to other calls to the same function.
*
* @param count The number of stream views to return.
* @return Vector containing `count` stream views.
*/
virtual std::vector<rmm::cuda_stream_view> get_streams(std::size_t count) = 0;

/**
* @brief Get the number of stream objects in the pool.
*
* This function is thread safe with respect to other calls to the same function.
*
* @return the number of stream objects in the pool
*/
virtual std::size_t get_stream_pool_size() const = 0;
};

/**
* @brief Implementation of `cuda_stream_pool` that wraps an `rmm::cuda_stram_pool`.
*/
Expand Down Expand Up @@ -157,13 +104,9 @@ class debug_cuda_stream_pool : public cuda_stream_pool {
std::size_t get_stream_pool_size() const override { return 1UL; }
};

/**
* @brief Initialize global stream pool.
*/
cuda_stream_pool* create_global_cuda_stream_pool()
{
if (getenv("LIBCUDF_USE_DEBUG_STREAM_POOL")) return new debug_cuda_stream_pool();

return new rmm_cuda_stream_pool();
}

Expand Down Expand Up @@ -231,8 +174,6 @@ cuda_stream_pool& global_cuda_stream_pool()
return *pools[device_id.value()];
}

} // anonymous namespace

std::vector<rmm::cuda_stream_view> fork_streams(rmm::cuda_stream_view stream, std::size_t count)
{
auto const streams = global_cuda_stream_pool().get_streams(count);
Expand Down
1 change: 1 addition & 0 deletions cpp/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,7 @@ ConfigureTest(STREAM_INTEROP_TEST streams/interop_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_JSONIO_TEST streams/io/json_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_LISTS_TEST streams/lists_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_NULL_MASK_TEST streams/null_mask_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_POOL_TEST streams/pool_test.cu STREAM_MODE testing)
ConfigureTest(STREAM_REPLACE_TEST streams/replace_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_SEARCH_TEST streams/search_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_SORTING_TEST streams/sorting_test.cpp STREAM_MODE testing)
Expand Down
33 changes: 33 additions & 0 deletions cpp/tests/streams/pool_test.cu
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <cudf_test/base_fixture.hpp>
#include <cudf_test/default_stream.hpp>

#include <cudf/detail/utilities/stream_pool.hpp>
#include <rmm/cuda_stream_view.hpp>

class StreamPoolTest : public cudf::test::BaseFixture {};

__global__ void do_nothing_kernel() {}

TEST_F(StreamPoolTest, ForkStreams)
{
auto streams = cudf::detail::fork_streams(cudf::test::get_default_stream(), 2);
for (auto& stream : streams) {
do_nothing_kernel<<<1, 32, 0, stream.value()>>>();
}
}
41 changes: 37 additions & 4 deletions cpp/tests/utilities/identify_stream_usage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/

#include <cudf/detail/utilities/stacktrace.hpp>
#include <cudf/detail/utilities/stream_pool.hpp>

#include <rmm/cuda_stream.hpp>
#include <rmm/cuda_stream_view.hpp>
Expand All @@ -31,10 +32,14 @@
#include <string>
#include <unordered_map>

// This file is compiled into a separate library that is dynamically loaded with LD_PRELOAD at
// runtime to libcudf to override some stream-related symbols in libcudf. The goal of such a library
// is to verify if the stream/stream pool is being correctly forwarded between API calls.
//
// We control whether to override cudf::test::get_default_stream or
// cudf::get_default_stream with a compile-time flag. Thesee are the two valid
// options:
// 1. STREAM_MODE_TESTING=OFF: In this mode, cudf::get_default_stream will
// cudf::get_default_stream with a compile-time flag. The behaviour of tests
// depend on whether STREAM_MODE_TESTING is defined:
// 1. If STREAM_MODE_TESTING is not defined, cudf::get_default_stream will
// return a custom stream and stream_is_invalid will return true if any CUDA
// API is called using any of CUDA's default stream constants
// (cudaStreamLegacy, cudaStreamDefault, or cudaStreamPerThread). This check
Expand All @@ -44,7 +49,7 @@
// is not sufficient to guarantee a stream-ordered API because it will not
// identify places in the code that use cudf::get_default_stream instead of
// properly forwarding along a user-provided stream.
// 2. STREAM_MODE_TESTING=ON: In this mode, cudf::test::get_default_stream
// 2. If STREAM_MODE_TESTING compiler option is defined, cudf::test::get_default_stream
// returns a custom stream and stream_is_invalid returns true if any CUDA
// API is called using any stream other than cudf::test::get_default_stream.
// This is a necessary and sufficient condition to ensure that libcudf is
Expand All @@ -66,6 +71,34 @@ rmm::cuda_stream_view const get_default_stream()
} // namespace test
#endif

#ifdef STREAM_MODE_TESTING
namespace detail {

/**
* @brief Implementation of `cuda_stream_pool` that always returns the
* `cudf::test::get_default_stream()`
*/
class test_cuda_stream_pool : public cuda_stream_pool {
public:
rmm::cuda_stream_view get_stream() override { return cudf::test::get_default_stream(); }
[[maybe_unused]] rmm::cuda_stream_view get_stream(stream_id_type stream_id) override
{
return cudf::test::get_default_stream();
}

std::vector<rmm::cuda_stream_view> get_streams(std::size_t count) override
{
return std::vector<rmm::cuda_stream_view>(count, cudf::test::get_default_stream());
}

std::size_t get_stream_pool_size() const override { return 1UL; }
};

cuda_stream_pool* create_global_cuda_stream_pool() { return new test_cuda_stream_pool(); }

} // namespace detail
#endif

} // namespace cudf

bool stream_is_invalid(cudaStream_t stream)
Expand Down

0 comments on commit 5dfafaf

Please sign in to comment.