Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Parquet kernel_error #14464

Merged
merged 7 commits on
Nov 29, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion cpp/src/io/parquet/error.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,12 @@ namespace cudf::io::parquet {
* the object's lifetime.
*/
class kernel_error {
public:
using error_type = uint32_t;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm trying to come up with a consistent API. We currently expose data() and value(), i.e. the device_scalar API. If that does not change, we should probably stay consistent and declare value_type and pointer aliases.
https://docs.rapids.ai/api/librmm/stable/classrmm_1_1device__scalar.html

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

using pointer_type = error_type*;

private:
rmm::device_scalar<int32_t> _error_code;
rmm::device_scalar<error_type> _error_code;

public:
/**
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/io/parquet/page_data.cu
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ __global__ void __launch_bounds__(decode_block_size)
device_span<ColumnChunkDesc const> chunks,
size_t min_row,
size_t num_rows,
int32_t* error_code)
kernel_error::pointer_type error_code)
{
__shared__ __align__(16) page_state_s state_g;
__shared__ __align__(16)
Expand Down Expand Up @@ -622,7 +622,7 @@ void __host__ DecodePageData(cudf::detail::hostdevice_vector<PageInfo>& pages,
size_t num_rows,
size_t min_row,
int level_type_size,
int32_t* error_code,
kernel_error::pointer_type error_code,
rmm::cuda_stream_view stream)
{
CUDF_EXPECTS(pages.size() > 0, "There is no page to decode");
Expand Down
9 changes: 5 additions & 4 deletions cpp/src/io/parquet/page_decode.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#pragma once

#include "error.hpp"
#include "parquet_gpu.hpp"
#include "rle_stream.cuh"

Expand Down Expand Up @@ -44,7 +45,7 @@ struct page_state_s {
int32_t dict_val{};
uint32_t initial_rle_run[NUM_LEVEL_TYPES]{}; // [def,rep]
int32_t initial_rle_value[NUM_LEVEL_TYPES]{}; // [def,rep]
int32_t error{};
kernel_error::error_type error{};
PageInfo page{};
ColumnChunkDesc col{};

Expand Down Expand Up @@ -73,13 +74,13 @@ struct page_state_s {

inline __device__ void set_error_code(decode_error err)
{
cuda::atomic_ref<int32_t, cuda::thread_scope_block> ref{error};
ref.fetch_or(static_cast<int32_t>(err), cuda::std::memory_order_relaxed);
cuda::atomic_ref<kernel_error::error_type, cuda::thread_scope_block> ref{error};
ref.fetch_or(static_cast<kernel_error::error_type>(err), cuda::std::memory_order_relaxed);
}

inline __device__ void reset_error_code()
{
cuda::atomic_ref<int32_t, cuda::thread_scope_block> ref{error};
cuda::atomic_ref<kernel_error::error_type, cuda::thread_scope_block> ref{error};
ref.store(0, cuda::std::memory_order_release);
}
};
Expand Down
22 changes: 11 additions & 11 deletions cpp/src/io/parquet/page_delta_decode.cu
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ __global__ void __launch_bounds__(96)
device_span<ColumnChunkDesc const> chunks,
size_t min_row,
size_t num_rows,
int32_t* error_code)
kernel_error::pointer_type error_code)
{
using cudf::detail::warp_size;
__shared__ __align__(16) delta_binary_decoder db_state;
Expand Down Expand Up @@ -346,7 +346,8 @@ __global__ void __launch_bounds__(96)

auto const batch_size = db->values_per_mb;
if (batch_size > max_delta_mini_block_size) {
set_error(static_cast<int32_t>(decode_error::DELTA_PARAMS_UNSUPPORTED), error_code);
set_error(static_cast<kernel_error::error_type>(decode_error::DELTA_PARAMS_UNSUPPORTED),
error_code);
return;
}

Expand Down Expand Up @@ -428,7 +429,7 @@ __global__ void __launch_bounds__(decode_block_size)
device_span<ColumnChunkDesc const> chunks,
size_t min_row,
size_t num_rows,
int32_t* error_code)
kernel_error::pointer_type error_code)
{
using cudf::detail::warp_size;
__shared__ __align__(16) delta_byte_array_decoder db_state;
Expand Down Expand Up @@ -475,7 +476,8 @@ __global__ void __launch_bounds__(decode_block_size)
if (prefix_db->values_per_mb != suffix_db->values_per_mb or
prefix_db->block_size != suffix_db->block_size or
prefix_db->value_count != suffix_db->value_count) {
set_error(static_cast<int32_t>(decode_error::DELTA_PARAM_MISMATCH), error_code);
set_error(static_cast<kernel_error::error_type>(decode_error::DELTA_PARAM_MISMATCH),
error_code);
return;
}

Expand All @@ -485,7 +487,8 @@ __global__ void __launch_bounds__(decode_block_size)

auto const batch_size = prefix_db->values_per_mb;
if (batch_size > max_delta_mini_block_size) {
set_error(static_cast<int32_t>(decode_error::DELTA_PARAMS_UNSUPPORTED), error_code);
set_error(static_cast<kernel_error::error_type>(decode_error::DELTA_PARAMS_UNSUPPORTED),
error_code);
return;
}

Expand Down Expand Up @@ -567,10 +570,7 @@ __global__ void __launch_bounds__(decode_block_size)
auto const offptr = reinterpret_cast<size_type*>(nesting_info_base[leaf_level_index].data_out);
block_excl_sum<decode_block_size>(offptr, value_count, s->page.str_offset);

if (t == 0 and s->error != 0) {
cuda::atomic_ref<int32_t, cuda::thread_scope_device> ref{*error_code};
ref.fetch_or(s->error, cuda::std::memory_order_relaxed);
}
if (t == 0 and s->error != 0) { set_error(s->error, error_code); }
}

} // anonymous namespace
Expand All @@ -583,7 +583,7 @@ void __host__ DecodeDeltaBinary(cudf::detail::hostdevice_vector<PageInfo>& pages
size_t num_rows,
size_t min_row,
int level_type_size,
int32_t* error_code,
kernel_error::pointer_type error_code,
rmm::cuda_stream_view stream)
{
CUDF_EXPECTS(pages.size() > 0, "There is no page to decode");
Expand All @@ -608,7 +608,7 @@ void __host__ DecodeDeltaByteArray(cudf::detail::hostdevice_vector<PageInfo>& pa
size_t num_rows,
size_t min_row,
int level_type_size,
int32_t* error_code,
kernel_error::pointer_type error_code,
rmm::cuda_stream_view stream)
{
CUDF_EXPECTS(pages.size() > 0, "There is no page to decode");
Expand Down
14 changes: 9 additions & 5 deletions cpp/src/io/parquet/page_hdr.cu
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* limitations under the License.
*/

#include "error.hpp"
#include "parquet_gpu.hpp"
#include <io/utilities/block_utils.cuh>

Expand Down Expand Up @@ -345,14 +346,16 @@ struct gpuParsePageHeader {
* @param[in] num_chunks Number of column chunks
*/
// blockDim {128,1,1}
__global__ void __launch_bounds__(128)
gpuDecodePageHeaders(ColumnChunkDesc* chunks, int32_t num_chunks, int32_t* error_code)
__global__ void __launch_bounds__(128) gpuDecodePageHeaders(ColumnChunkDesc* chunks,
int32_t num_chunks,
kernel_error::pointer_type error_code)
{
using cudf::detail::warp_size;
gpuParsePageHeader parse_page_header;
__shared__ byte_stream_s bs_g[4];

int32_t error[4] = {0};
kernel_error::error_type error[4] = {0};

auto const lane_id = threadIdx.x % warp_size;
auto const warp_id = threadIdx.x / warp_size;
auto const chunk = (blockIdx.x * 4) + warp_id;
Expand Down Expand Up @@ -440,7 +443,8 @@ __global__ void __launch_bounds__(128)
bs->page.page_data = const_cast<uint8_t*>(bs->cur);
bs->cur += bs->page.compressed_page_size;
if (bs->cur > bs->end) {
error[warp_id] |= static_cast<int32_t>(decode_error::DATA_STREAM_OVERRUN);
error[warp_id] |=
static_cast<kernel_error::error_type>(decode_error::DATA_STREAM_OVERRUN);
}
bs->page.kernel_mask = kernel_mask_for_page(bs->page, bs->ck);
} else {
Expand Down Expand Up @@ -513,7 +517,7 @@ __global__ void __launch_bounds__(128)

void __host__ DecodePageHeaders(ColumnChunkDesc* chunks,
int32_t num_chunks,
int32_t* error_code,
kernel_error::pointer_type error_code,
rmm::cuda_stream_view stream)
{
dim3 dim_block(128, 1);
Expand Down
5 changes: 3 additions & 2 deletions cpp/src/io/parquet/page_string_decode.cu
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/

#include "delta_binary.cuh"
#include "error.hpp"
#include "page_decode.cuh"
#include "page_string_utils.cuh"

Expand Down Expand Up @@ -784,7 +785,7 @@ __global__ void __launch_bounds__(decode_block_size)
device_span<ColumnChunkDesc const> chunks,
size_t min_row,
size_t num_rows,
int32_t* error_code)
kernel_error::pointer_type error_code)
{
using cudf::detail::warp_size;
__shared__ __align__(16) page_state_s state_g;
Expand Down Expand Up @@ -1057,7 +1058,7 @@ void __host__ DecodeStringPageData(cudf::detail::hostdevice_vector<PageInfo>& pa
size_t num_rows,
size_t min_row,
int level_type_size,
int32_t* error_code,
kernel_error::pointer_type error_code,
rmm::cuda_stream_view stream)
{
CUDF_EXPECTS(pages.size() > 0, "There is no page to decode");
Expand Down
18 changes: 10 additions & 8 deletions cpp/src/io/parquet/parquet_gpu.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

#pragma once

#include "error.hpp"

#include "io/comp/gpuinflate.hpp"
#include "io/parquet/parquet.hpp"
#include "io/parquet/parquet_common.hpp"
Expand Down Expand Up @@ -74,10 +76,10 @@ constexpr bool is_supported_encoding(Encoding enc)
/**
* @brief Atomically OR `error` into `error_code`.
*/
constexpr void set_error(int32_t error, int32_t* error_code)
constexpr void set_error(kernel_error::error_type error, kernel_error::pointer_type error_code)
{
if (error != 0) {
cuda::atomic_ref<int32_t, cuda::thread_scope_device> ref{*error_code};
cuda::atomic_ref<kernel_error::error_type, cuda::thread_scope_device> ref{*error_code};
ref.fetch_or(error, cuda::std::memory_order_relaxed);
}
}
Expand All @@ -87,7 +89,7 @@ constexpr void set_error(int32_t error, int32_t* error_code)
*
* These values are used as bitmasks, so they must be powers of 2.
*/
enum class decode_error : int32_t {
enum class decode_error : uint32_t {
vuule marked this conversation as resolved.
Show resolved Hide resolved
DATA_STREAM_OVERRUN = 0x1,
LEVEL_STREAM_OVERRUN = 0x2,
UNSUPPORTED_ENCODING = 0x4,
Expand Down Expand Up @@ -549,7 +551,7 @@ constexpr bool is_string_col(ColumnChunkDesc const& chunk)
*/
void DecodePageHeaders(ColumnChunkDesc* chunks,
int32_t num_chunks,
int32_t* error_code,
kernel_error::pointer_type error_code,
rmm::cuda_stream_view stream);

/**
Expand Down Expand Up @@ -655,7 +657,7 @@ void DecodePageData(cudf::detail::hostdevice_vector<PageInfo>& pages,
size_t num_rows,
size_t min_row,
int level_type_size,
int32_t* error_code,
kernel_error::pointer_type error_code,
rmm::cuda_stream_view stream);

/**
Expand All @@ -677,7 +679,7 @@ void DecodeStringPageData(cudf::detail::hostdevice_vector<PageInfo>& pages,
size_t num_rows,
size_t min_row,
int level_type_size,
int32_t* error_code,
kernel_error::pointer_type error_code,
rmm::cuda_stream_view stream);

/**
Expand All @@ -699,7 +701,7 @@ void DecodeDeltaBinary(cudf::detail::hostdevice_vector<PageInfo>& pages,
size_t num_rows,
size_t min_row,
int level_type_size,
int32_t* error_code,
kernel_error::pointer_type error_code,
rmm::cuda_stream_view stream);

/**
Expand All @@ -721,7 +723,7 @@ void DecodeDeltaByteArray(cudf::detail::hostdevice_vector<PageInfo>& pages,
size_t num_rows,
size_t min_row,
int level_type_size,
int32_t* error_code,
kernel_error::pointer_type error_code,
rmm::cuda_stream_view stream);

/**
Expand Down