Skip to content

Commit

Permalink
Merge branch 'branch-24.02' into refactor_orc_reader
Browse files Browse the repository at this point in the history
  • Loading branch information
ttnghia authored Jan 9, 2024
2 parents 087357b + 3a1601d commit 68f7d2d
Show file tree
Hide file tree
Showing 31 changed files with 560 additions and 290 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/pr.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ jobs:
build_type: pull-request
script: ci/test_wheel_cudf.sh
wheel-build-dask-cudf:
needs: wheel-tests-cudf
needs: wheel-build-cudf
secrets: inherit
uses: rapidsai/shared-workflows/.github/workflows/[email protected]
with:
Expand Down
6 changes: 3 additions & 3 deletions ci/build_wheel.sh
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#!/bin/bash
# Copyright (c) 2023, NVIDIA CORPORATION.
# Copyright (c) 2023-2024, NVIDIA CORPORATION.

set -euo pipefail

Expand All @@ -23,7 +23,7 @@ pyproject_file="${package_dir}/pyproject.toml"

sed -i "s/^name = \"${package_name}\"/name = \"${package_name}${PACKAGE_CUDA_SUFFIX}\"/g" ${pyproject_file}
echo "${version}" > VERSION
sed -i "/^__git_commit__/ s/= .*/= \"${commit}\"/g" "${package_dir}/${package_name}/_version.py"
sed -i "/^__git_commit__/ s/= .*/= \"${commit}\"/g" "${package_dir}/${package_name//-/_}/_version.py"

# For nightlies we want to ensure that we're pulling in alphas as well. The
# easiest way to do so is to augment the spec with a constraint containing a
Expand All @@ -34,7 +34,7 @@ if ! rapids-is-release-build; then
alpha_spec=',>=0.0.0a0'
fi

if [[ ${package_name} == "dask_cudf" ]]; then
if [[ ${package_name} == "dask-cudf" ]]; then
sed -r -i "s/cudf==(.*)\"/cudf${PACKAGE_CUDA_SUFFIX}==\1${alpha_spec}\"/g" ${pyproject_file}
sed -r -i "s/dask-cuda==(.*)\"/dask-cuda==\1${alpha_spec}\"/g" ${pyproject_file}
sed -r -i "s/rapids-dask-dependency==(.*)\"/rapids-dask-dependency==\1${alpha_spec}\"/g" ${pyproject_file}
Expand Down
4 changes: 2 additions & 2 deletions ci/build_wheel_dask_cudf.sh
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
#!/bin/bash
# Copyright (c) 2023, NVIDIA CORPORATION.
# Copyright (c) 2023-2024, NVIDIA CORPORATION.

set -euo pipefail

package_dir="python/dask_cudf"

./ci/build_wheel.sh dask_cudf ${package_dir}
./ci/build_wheel.sh dask-cudf ${package_dir}

RAPIDS_PY_CUDA_SUFFIX="$(rapids-wheel-ctk-name-gen ${RAPIDS_CUDA_VERSION})"
RAPIDS_PY_WHEEL_NAME="dask_cudf_${RAPIDS_PY_CUDA_SUFFIX}" rapids-upload-wheels-to-s3 ${package_dir}/dist
1 change: 0 additions & 1 deletion conda/environments/all_cuda-118_arch-x86_64.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ dependencies:
- librdkafka>=1.9.0,<1.10.0a0
- librmm==24.2.*
- make
- mimesis>=4.1.0
- moto>=4.0.8
- msgpack-python
- myst-nb
Expand Down
1 change: 0 additions & 1 deletion conda/environments/all_cuda-120_arch-x86_64.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ dependencies:
- librdkafka>=1.9.0,<1.10.0a0
- librmm==24.2.*
- make
- mimesis>=4.1.0
- moto>=4.0.8
- msgpack-python
- myst-nb
Expand Down
10 changes: 6 additions & 4 deletions cpp/include/cudf/detail/offsets_iterator.cuh
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -63,10 +63,11 @@ struct input_offsetalator : base_normalator<input_offsetalator, int64_t> {
*
* Use the indexalator_factory to create an iterator instance.
*
* @param data Pointer to an integer array in device memory.
* @param dtype Type of data in data
* @param data Pointer to an integer array in device memory
* @param dtype Type of data in data
* @param offset Index value within `offsets` to use as the beginning of the iterator
*/
CUDF_HOST_DEVICE input_offsetalator(void const* data, data_type dtype)
CUDF_HOST_DEVICE input_offsetalator(void const* data, data_type dtype, size_type offset = 0)
: base_normalator<input_offsetalator, int64_t>(
dtype, dtype.id() == type_id::INT32 ? sizeof(int32_t) : sizeof(int64_t)),
p_{static_cast<char const*>(data)}
Expand All @@ -78,6 +79,7 @@ struct input_offsetalator : base_normalator<input_offsetalator, int64_t> {
cudf_assert((dtype.id() == type_id::INT32 || dtype.id() == type_id::INT64) &&
"Unexpected offsets type");
#endif
p_ += (this->width_ * offset);
}

protected:
Expand Down
11 changes: 8 additions & 3 deletions cpp/include/cudf/detail/offsets_iterator_factory.cuh
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -28,14 +28,19 @@ namespace detail {
struct offsetalator_factory {
/**
* @brief Create an input offsetalator instance from an offsets column
*
* @param offsets Column to wrap with an offsetalator
* @param offset Index value within `offsets` to use as the beginning of the iterator
*/
static input_offsetalator make_input_iterator(column_view const& offsets)
static input_offsetalator make_input_iterator(column_view const& offsets, size_type offset = 0)
{
return input_offsetalator(offsets.head(), offsets.type());
return input_offsetalator(offsets.head(), offsets.type(), offset);
}

/**
* @brief Create an output offsetalator instance from an offsets column
*
* @param offsets Column to wrap with an offsetalator
*/
static output_offsetalator make_output_iterator(mutable_column_view const& offsets)
{
Expand Down
8 changes: 8 additions & 0 deletions cpp/include/cudf/io/detail/orc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,14 @@ class writer {
* @brief Finishes the chunked/streamed write process.
*/
void close();

/**
* @brief Skip work done in `close()`; should be called if `write()` failed.
*
* Calling skip_close() prevents the writer from writing the (invalid) file footer and the
* postscript.
*/
void skip_close();
};
} // namespace orc::detail
} // namespace cudf::io
41 changes: 21 additions & 20 deletions cpp/include/cudf/strings/detail/gather.cuh
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2023, NVIDIA CORPORATION.
* Copyright (c) 2019-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,6 +18,7 @@
#include <cudf/column/column.hpp>
#include <cudf/column/column_device_view.cuh>
#include <cudf/column/column_factories.hpp>
#include <cudf/detail/offsets_iterator_factory.cuh>
#include <cudf/detail/sizes_to_offsets_iterator.cuh>
#include <cudf/detail/utilities/cuda.cuh>
#include <cudf/strings/detail/utilities.hpp>
Expand Down Expand Up @@ -79,7 +80,7 @@ __forceinline__ __device__ uint4 load_uint4(char const* ptr)
template <typename StringIterator, typename MapIterator>
__global__ void gather_chars_fn_string_parallel(StringIterator strings_begin,
char* out_chars,
cudf::device_span<int32_t const> const out_offsets,
cudf::detail::input_offsetalator const out_offsets,
MapIterator string_indices,
size_type total_out_strings)
{
Expand Down Expand Up @@ -109,28 +110,25 @@ __global__ void gather_chars_fn_string_parallel(StringIterator strings_begin,
// between `[out_start_aligned, out_end_aligned)` will be copied using uint4.
// `out_start + 4` and `out_end - 4` are used instead of `out_start` and `out_end` to avoid
// `load_uint4` reading beyond string boundaries.
int32_t out_start_aligned =
int64_t const out_start_aligned =
(out_start + in_datatype_size + alignment_offset + out_datatype_size - 1) /
out_datatype_size * out_datatype_size -
alignment_offset;
int32_t out_end_aligned =
int64_t const out_end_aligned =
(out_end - in_datatype_size + alignment_offset) / out_datatype_size * out_datatype_size -
alignment_offset;

for (size_type ichar = out_start_aligned + warp_lane * out_datatype_size;
ichar < out_end_aligned;
for (int64_t ichar = out_start_aligned + warp_lane * out_datatype_size; ichar < out_end_aligned;
ichar += cudf::detail::warp_size * out_datatype_size) {
*(out_chars_aligned + (ichar + alignment_offset) / out_datatype_size) =
load_uint4(in_start + ichar - out_start);
}

// Tail logic: copy characters of the current string outside `[out_start_aligned,
// out_end_aligned)`.
// Copy characters of the current string outside [out_start_aligned, out_end_aligned)
if (out_end_aligned <= out_start_aligned) {
// In this case, `[out_start_aligned, out_end_aligned)` is an empty set, and we copy the
// entire string.
for (int32_t ichar = out_start + warp_lane; ichar < out_end;
ichar += cudf::detail::warp_size) {
for (auto ichar = out_start + warp_lane; ichar < out_end; ichar += cudf::detail::warp_size) {
out_chars[ichar] = in_start[ichar - out_start];
}
} else {
Expand All @@ -139,7 +137,7 @@ __global__ void gather_chars_fn_string_parallel(StringIterator strings_begin,
out_chars[out_start + warp_lane] = in_start[warp_lane];
}
// Copy characters in range `[out_end_aligned, out_end)`.
int32_t ichar = out_end_aligned + warp_lane;
auto const ichar = out_end_aligned + warp_lane;
if (ichar < out_end) { out_chars[ichar] = in_start[ichar - out_start]; }
}
}
Expand All @@ -164,11 +162,11 @@ __global__ void gather_chars_fn_string_parallel(StringIterator strings_begin,
template <int strings_per_threadblock, typename StringIterator, typename MapIterator>
__global__ void gather_chars_fn_char_parallel(StringIterator strings_begin,
char* out_chars,
cudf::device_span<int32_t const> const out_offsets,
cudf::detail::input_offsetalator const out_offsets,
MapIterator string_indices,
size_type total_out_strings)
{
__shared__ int32_t out_offsets_threadblock[strings_per_threadblock + 1];
__shared__ int64_t out_offsets_threadblock[strings_per_threadblock + 1];

// Current thread block will process output strings starting at `begin_out_string_idx`.
size_type begin_out_string_idx = blockIdx.x * strings_per_threadblock;
Expand All @@ -185,7 +183,7 @@ __global__ void gather_chars_fn_char_parallel(StringIterator strings_begin,
}
__syncthreads();

for (int32_t out_ibyte = threadIdx.x + out_offsets_threadblock[0];
for (int64_t out_ibyte = threadIdx.x + out_offsets_threadblock[0];
out_ibyte < out_offsets_threadblock[strings_current_threadblock];
out_ibyte += blockDim.x) {
// binary search for the string index corresponding to out_ibyte
Expand All @@ -197,7 +195,7 @@ __global__ void gather_chars_fn_char_parallel(StringIterator strings_begin,
size_type string_idx = thrust::distance(out_offsets_threadblock, string_idx_iter);

// calculate which character to load within the string
int32_t icharacter = out_ibyte - out_offsets_threadblock[string_idx];
auto const icharacter = out_ibyte - out_offsets_threadblock[string_idx];

size_type in_string_idx = string_indices[begin_out_string_idx + string_idx];
out_chars[out_ibyte] = strings_begin[in_string_idx].data()[icharacter];
Expand Down Expand Up @@ -227,7 +225,7 @@ template <typename StringIterator, typename MapIterator>
std::unique_ptr<cudf::column> gather_chars(StringIterator strings_begin,
MapIterator map_begin,
MapIterator map_end,
cudf::device_span<int32_t const> const offsets,
cudf::detail::input_offsetalator const offsets,
size_type chars_bytes,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
Expand Down Expand Up @@ -300,22 +298,25 @@ std::unique_ptr<cudf::column> gather(strings_column_view const& strings,
// build offsets column
auto const d_strings = column_device_view::create(strings.parent(), stream);
auto const d_in_offsets = !strings.is_empty() ? strings.offsets_begin() : nullptr;
auto const d_in_offsets = cudf::detail::offsetalator_factory::make_input_iterator(
strings.is_empty() ? make_empty_column(type_id::INT32)->view() : strings.offsets(),
strings.offset());
auto offsets_itr = thrust::make_transform_iterator(
begin,
cuda::proclaim_return_type<size_type>(
[d_strings = *d_strings, d_in_offsets] __device__(size_type idx) {
if (NullifyOutOfBounds && (idx < 0 || idx >= d_strings.size())) { return 0; }
if (not d_strings.is_valid(idx)) { return 0; }
return d_in_offsets[idx + 1] - d_in_offsets[idx];
return static_cast<size_type>(d_in_offsets[idx + 1] - d_in_offsets[idx]);
}));
auto [out_offsets_column, total_bytes] =
cudf::detail::make_offsets_child_column(offsets_itr, offsets_itr + output_count, stream, mr);
// build chars column
auto const offsets_view = out_offsets_column->view();
auto out_chars_column = gather_chars(
auto const offsets_view =
cudf::detail::offsetalator_factory::make_input_iterator(out_offsets_column->view());
auto out_chars_column = gather_chars(
d_strings->begin<string_view>(), begin, end, offsets_view, total_bytes, stream, mr);
return make_strings_column(output_count,
Expand Down
5 changes: 2 additions & 3 deletions cpp/include/cudf/strings/detail/strings_column_factories.cuh
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2023, NVIDIA CORPORATION.
* Copyright (c) 2020-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -103,9 +103,8 @@ std::unique_ptr<column> make_strings_column(IndexPairIterator begin,
auto const avg_bytes_per_row = bytes / std::max(strings_count - null_count, 1);
// use a character-parallel kernel for long string lengths
if (avg_bytes_per_row > FACTORY_BYTES_PER_ROW_THRESHOLD) {
auto const d_data = offsets_view.template data<size_type>();
auto const d_offsets =
device_span<size_type const>{d_data, static_cast<std::size_t>(offsets_view.size())};
cudf::detail::offsetalator_factory::make_input_iterator(offsets_view);
auto const str_begin = thrust::make_transform_iterator(
begin, cuda::proclaim_return_type<string_view>([] __device__(auto ip) {
return string_view{ip.first, ip.second};
Expand Down
16 changes: 15 additions & 1 deletion cpp/include/cudf/strings/detail/utilities.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2022, NVIDIA CORPORATION.
* Copyright (c) 2019-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -53,6 +53,20 @@ rmm::device_uvector<string_view> create_string_vector_from_column(
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr);

/**
* @brief Return a normalized offset value from a strings offsets column
*
* @throw std::invalid_argument if `offsets` is neither INT32 nor INT64
*
* @param offsets Input column of type INT32 or INT64
* @param index Row value to retrieve
* @param stream CUDA stream used for device memory operations and kernel launches
* @return Value at `offsets[index]`
*/
int64_t get_offset_value(cudf::column_view const& offsets,
size_type index,
rmm::cuda_stream_view stream);

} // namespace detail
} // namespace strings
} // namespace cudf
11 changes: 10 additions & 1 deletion cpp/src/io/functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,16 @@ void write_orc(orc_writer_options const& options)
auto writer = std::make_unique<orc::detail::writer>(
std::move(sinks[0]), options, io_detail::single_write_mode::YES, cudf::get_default_stream());

writer->write(options.get_table());
try {
writer->write(options.get_table());
} catch (...) {
// If an exception is thrown, the output is incomplete/corrupted.
// Make sure the writer will not close with such corrupted data.
// In addition, the writer may throw an exception while trying to close, which would terminate
// the process.
writer->skip_close();
throw;
}
}

/**
Expand Down
Loading

0 comments on commit 68f7d2d

Please sign in to comment.