Skip to content

Commit

Permalink
Refactor multibyte_split output_builder (#11945)
Browse files — browse the repository at this point in the history
This PR moves the `output_builder` and `split_device_span` classes out of `multibyte_split` and adds an iterator for the `split_device_span`, enabling it to be used directly in Thrust algorithms.

I also included a fix from #11875 to make the integration easier once that is merged.

Authors:
  - Tobias Ribizel (https://github.com/upsj)

Approvers:
  - Bradley Dice (https://github.com/bdice)
  - Mike Wilson (https://github.com/hyperbolic2346)

URL: #11945
  • Loading branch information
upsj authored Oct 27, 2022
1 parent b4ca894 commit 43eb7a0
Show file tree
Hide file tree
Showing 3 changed files with 423 additions and 215 deletions.
217 changes: 2 additions & 215 deletions cpp/src/io/text/multibyte_split.cu
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,7 @@
* limitations under the License.
*/

// Can be removed once we use Thrust 1.16+
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wpragmas"
#pragma GCC diagnostic ignored "-Wsizeof-array-div"
#include <io/utilities/output_builder.cuh>

#include <cudf/column/column.hpp>
#include <cudf/column/column_factories.hpp>
Expand Down Expand Up @@ -48,54 +45,12 @@
#include <cub/block/block_load.cuh>
#include <cub/block/block_scan.cuh>

#pragma GCC diagnostic pop

#include <cstdint>
#include <limits>
#include <memory>
#include <numeric>
#include <optional>

namespace cudf {

/**
* @brief A device span consisting of two separate device_spans acting as if they were part of a
* single span. The first head.size() entries are served from the first span, the remaining
* tail.size() entries are served from the second span.
*
* @tparam T The type of elements in the span.
*/
template <typename T>
class split_device_span {
public:
explicit constexpr split_device_span(device_span<T> head, device_span<T> tail = {})
: _head{head}, _tail{tail}
{
}

[[nodiscard]] constexpr T& operator[](size_type i)
{
return i < _head.size() ? _head[i] : _tail[i - _head.size()];
}

[[nodiscard]] constexpr const T& operator[](size_type i) const
{
return i < _head.size() ? _head[i] : _tail[i - _head.size()];
}

[[nodiscard]] constexpr size_type size() const { return _head.size() + _tail.size(); }

[[nodiscard]] constexpr device_span<T> head() const { return _head; }

[[nodiscard]] constexpr device_span<T> tail() const { return _tail; }

private:
device_span<T> _head;
device_span<T> _tail;
};

} // namespace cudf

namespace {

using cudf::io::text::detail::multistate;
Expand Down Expand Up @@ -385,172 +340,6 @@ std::vector<rmm::cuda_stream_view> get_streams(int32_t count, rmm::cuda_stream_p
return streams;
}

/**
 * @brief A chunked storage class that provides preallocated memory for algorithms with known
 * worst-case output size. It provides functionality to retrieve the next chunk to write to, for
 * reporting how much memory was actually written and for gathering all previously written outputs
 * into a single contiguous vector.
 *
 * Storage grows by appending new `rmm::device_uvector` chunks; previously written chunks are
 * never reallocated, so pointers returned by earlier `next_output` calls stay valid.
 *
 * @tparam T The output element type.
 */
template <typename T>
class output_builder {
 public:
  using size_type = typename rmm::device_uvector<T>::size_type;

  /**
   * @brief Initializes an output builder with given worst-case output size and stream.
   *
   * @param max_write_size the maximum number of elements that will be written into a
   * split_device_span returned from `next_output`.
   * @param max_growth caps the size of any newly allocated chunk at
   * `max_growth * max_write_size` elements (see `next_output`).
   * @param stream the stream used to allocate the first chunk of memory.
   * @param mr optional, the memory resource to use for allocation.
   */
  output_builder(size_type max_write_size,
                 size_type max_growth,
                 rmm::cuda_stream_view stream,
                 rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource())
    : _size{0}, _max_write_size{max_write_size}, _max_growth{max_growth}
  {
    CUDF_EXPECTS(max_write_size > 0, "Internal error");
    // first chunk starts empty; reserve twice the write size so at least two
    // full writes fit before a second chunk must be allocated
    _chunks.emplace_back(0, stream, mr);
    _chunks.back().reserve(max_write_size * 2, stream);
  }

  // neither copyable nor movable: consumers hold spans into the owned chunks
  output_builder(output_builder&&) = delete;
  output_builder(const output_builder&) = delete;
  output_builder& operator=(output_builder&&) = delete;
  output_builder& operator=(const output_builder&) = delete;

  /**
   * @brief Returns the next free chunk of `max_write_size` elements from the underlying storage.
   * Must be followed by a call to `advance_output` after the memory has been written to.
   *
   * @param stream The stream to allocate a new chunk of memory with, if necessary.
   * This should be the stream that will write to the `split_device_span`.
   * @return A `split_device_span` starting directly after the last output and providing at least
   * `max_write_size` entries of storage.
   */
  [[nodiscard]] split_device_span<T> next_output(rmm::cuda_stream_view stream)
  {
    // if the last chunk was allocated but never written to, the logical "head" of the
    // output is still the chunk before it
    auto head_it = _chunks.end() - (_chunks.size() > 1 and _chunks.back().is_empty() ? 2 : 1);
    auto head_span = get_free_span(*head_it);
    // fast path: the current chunk alone can absorb a full write
    if (head_span.size() >= _max_write_size) { return split_device_span<T>{head_span}; }
    if (head_it == _chunks.end() - 1) {
      // insert a new vector of double size
      auto const next_chunk_size =
        std::min(_max_growth * _max_write_size, 2 * _chunks.back().capacity());
      _chunks.emplace_back(0, stream, _chunks.back().memory_resource());
      _chunks.back().reserve(next_chunk_size, stream);
    }
    // serve the write from the remainder of the head chunk plus the start of the new chunk
    auto tail_span = get_free_span(_chunks.back());
    CUDF_EXPECTS(head_span.size() + tail_span.size() >= _max_write_size, "Internal error");
    return split_device_span<T>{head_span, tail_span};
  }

  /**
   * @brief Advances the output sizes after a `split_device_span` returned from `next_output` was
   * written to.
   *
   * @param actual_size The number of elements that were written to the result of the previous
   * `next_output` call.
   * @param stream The stream on which the resize bookkeeping is performed.
   */
  void advance_output(size_type actual_size, rmm::cuda_stream_view stream)
  {
    CUDF_EXPECTS(actual_size <= _max_write_size, "Internal error");
    if (_chunks.size() < 2) {
      // single chunk: the whole write landed in it
      auto const new_size = _chunks.back().size() + actual_size;
      inplace_resize(_chunks.back(), new_size, stream);
    } else {
      // the write may have straddled the last two chunks: fill the previous chunk
      // up to its capacity first, the remainder went into the tail chunk
      auto& tail = _chunks.back();
      auto& prev = _chunks.rbegin()[1];
      auto const prev_advance = std::min(actual_size, prev.capacity() - prev.size());
      auto const tail_advance = actual_size - prev_advance;
      inplace_resize(prev, prev.size() + prev_advance, stream);
      inplace_resize(tail, tail.size() + tail_advance, stream);
    }
    _size += actual_size;
  }

  /**
   * @brief Returns the first element that was written to the output.
   * Requires a previous call to `next_output` and `advance_output` and `size() > 0`.
   * @param stream The stream used to access the element.
   * @return The first element that was written to the output.
   */
  [[nodiscard]] T front_element(rmm::cuda_stream_view stream) const
  {
    return _chunks.front().front_element(stream);
  }

  /**
   * @brief Returns the last element that was written to the output.
   * Requires a previous call to `next_output` and `advance_output` and `size() > 0`.
   * @param stream The stream used to access the element.
   * @return The last element that was written to the output.
   */
  [[nodiscard]] T back_element(rmm::cuda_stream_view stream) const
  {
    // skip a trailing chunk that was allocated by next_output but never written to
    auto const& last_nonempty_chunk =
      _chunks.size() > 1 and _chunks.back().is_empty() ? _chunks.rbegin()[1] : _chunks.back();
    return last_nonempty_chunk.back_element(stream);
  }

  /// @brief Total number of elements written across all chunks.
  [[nodiscard]] size_type size() const { return _size; }

  /**
   * @brief Gathers all previously written outputs into a single contiguous vector.
   *
   * @param stream The stream used to allocate and gather the output vector. All previous write
   * operations to the output buffer must have finished or happened on this stream.
   * @param mr The memory resource used to allocate the output vector.
   * @return The output vector.
   */
  rmm::device_uvector<T> gather(rmm::cuda_stream_view stream,
                                rmm::mr::device_memory_resource* mr) const
  {
    rmm::device_uvector<T> output{size(), stream, mr};
    auto output_it = output.begin();
    for (auto const& chunk : _chunks) {
      output_it = thrust::copy(
        rmm::exec_policy_nosync(stream), chunk.begin(), chunk.begin() + chunk.size(), output_it);
    }
    return output;
  }

 private:
  /**
   * @brief Resizes a vector without reallocating
   *
   * @param vector The vector
   * @param new_size The new size. Must be smaller than the vector's capacity
   * @param stream The stream used for the resize.
   */
  static void inplace_resize(rmm::device_uvector<T>& vector,
                             size_type new_size,
                             rmm::cuda_stream_view stream)
  {
    // guarantees resize() never reallocates, so spans handed out earlier stay valid
    CUDF_EXPECTS(new_size <= vector.capacity(), "Internal error");
    vector.resize(new_size, stream);
  }

  /**
   * @brief Returns the span consisting of all currently unused elements in the vector
   * (`i >= size() and i < capacity()`).
   *
   * @param vector The vector.
   * @return The span of unused elements.
   */
  static device_span<T> get_free_span(rmm::device_uvector<T>& vector)
  {
    return device_span<T>{vector.data() + vector.size(), vector.capacity() - vector.size()};
  }

  size_type _size;            // total elements written so far (sum of all chunk sizes)
  size_type _max_write_size;  // worst-case size of a single write
  size_type _max_growth;      // cap on chunk size, in multiples of _max_write_size
  std::vector<rmm::device_uvector<T>> _chunks;  // append-only list of storage chunks
};

std::unique_ptr<cudf::column> multibyte_split(cudf::io::text::data_chunk_source const& source,
std::string const& delimiter,
byte_range_info byte_range,
Expand Down Expand Up @@ -732,9 +521,7 @@ std::unique_ptr<cudf::column> multibyte_split(cudf::io::text::data_chunk_source
chunk->data() + std::min<byte_offset>(sentinel - chunk_offset, chunk->size());
auto const output_size = end - begin;
auto char_output = char_storage.next_output(scan_stream);
auto const split = begin + std::min<byte_offset>(output_size, char_output.head().size());
thrust::copy(rmm::exec_policy_nosync(scan_stream), begin, split, char_output.head().begin());
thrust::copy(rmm::exec_policy_nosync(scan_stream), split, end, char_output.tail().begin());
thrust::copy(rmm::exec_policy_nosync(scan_stream), begin, end, char_output.begin());
char_storage.advance_output(output_size, scan_stream);
}

Expand Down
Loading

0 comments on commit 43eb7a0

Please sign in to comment.