Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use logical types in Parquet reader #15365

Merged
merged 18 commits into from
Mar 27, 2024
Merged
4 changes: 2 additions & 2 deletions cpp/src/io/parquet/decode_fixed.cu
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ __device__ inline void gpuDecodeValues(
constexpr int max_batch_size = num_warps * cudf::detail::warp_size;

PageNestingDecodeInfo* nesting_info_base = s->nesting_info;
int const dtype = s->col.data_type & 7;
int const dtype = s->col.physical_type;

// decode values
int pos = start;
Expand All @@ -187,7 +187,7 @@ __device__ inline void gpuDecodeValues(
uint32_t dtype_len = s->dtype_len;
void* dst =
nesting_info_base[leaf_level_index].data_out + static_cast<size_t>(dst_pos) * dtype_len;
if (s->col.converted_type == DECIMAL) {
if (s->col.logical_type.has_value() && s->col.logical_type->type == LogicalType::DECIMAL) {
switch (dtype) {
case INT32: gpuOutputFast(s, sb, src_pos, static_cast<uint32_t*>(dst)); break;
case INT64: gpuOutputFast(s, sb, src_pos, static_cast<uint2*>(dst)); break;
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/io/parquet/decode_preprocess.cu
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ CUDF_KERNEL void __launch_bounds__(preprocess_block_size)
// we only need to preprocess hierarchies with repetition in them (ie, hierarchies
// containing lists anywhere within).
compute_string_sizes =
compute_string_sizes && ((s->col.data_type & 7) == BYTE_ARRAY && s->dtype_len != 4);
compute_string_sizes && s->col.physical_type == BYTE_ARRAY && !s->col.is_strings_to_cat;

// early out optimizations:

Expand Down
18 changes: 10 additions & 8 deletions cpp/src/io/parquet/page_data.cu
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size)
if (s->dict_base) {
out_thread0 = (s->dict_bits > 0) ? 64 : 32;
} else {
switch (s->col.data_type & 7) {
switch (s->col.physical_type) {
case BOOLEAN: [[fallthrough]];
case BYTE_ARRAY: [[fallthrough]];
case FIXED_LEN_BYTE_ARRAY: out_thread0 = 64; break;
Expand Down Expand Up @@ -123,16 +123,16 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size)
// be needed in the other DecodeXXX kernels.
if (s->dict_base) {
src_target_pos = gpuDecodeDictionaryIndices<false>(s, sb, src_target_pos, t & 0x1f).first;
} else if ((s->col.data_type & 7) == BOOLEAN) {
} else if (s->col.physical_type == BOOLEAN) {
src_target_pos = gpuDecodeRleBooleans(s, sb, src_target_pos, t & 0x1f);
} else if ((s->col.data_type & 7) == BYTE_ARRAY or
(s->col.data_type & 7) == FIXED_LEN_BYTE_ARRAY) {
} else if (s->col.physical_type == BYTE_ARRAY or
s->col.physical_type == FIXED_LEN_BYTE_ARRAY) {
gpuInitStringDescriptors<false>(s, sb, src_target_pos, t & 0x1f);
}
if (t == 32) { s->dict_pos = src_target_pos; }
} else {
// WARP1..WARP3: Decode values
int const dtype = s->col.data_type & 7;
int const dtype = s->col.physical_type;
src_pos += t - out_thread0;

// the position in the output column/buffer
Expand Down Expand Up @@ -166,10 +166,12 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size)
uint32_t dtype_len = s->dtype_len;
void* dst =
nesting_info_base[leaf_level_index].data_out + static_cast<size_t>(dst_pos) * dtype_len;
auto const is_decimal =
s->col.logical_type.has_value() and s->col.logical_type->type == LogicalType::DECIMAL;
if (dtype == BYTE_ARRAY) {
if (s->col.converted_type == DECIMAL) {
if (is_decimal) {
auto const [ptr, len] = gpuGetStringData(s, sb, val_src_pos);
auto const decimal_precision = s->col.decimal_precision;
auto const decimal_precision = s->col.logical_type->precision();
if (decimal_precision <= MAX_DECIMAL32_PRECISION) {
gpuOutputByteArrayAsInt(ptr, len, static_cast<int32_t*>(dst));
} else if (decimal_precision <= MAX_DECIMAL64_PRECISION) {
Expand All @@ -182,7 +184,7 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size)
}
} else if (dtype == BOOLEAN) {
gpuOutputBoolean(sb, val_src_pos, static_cast<uint8_t*>(dst));
} else if (s->col.converted_type == DECIMAL) {
} else if (is_decimal) {
switch (dtype) {
case INT32: gpuOutputFast(s, sb, val_src_pos, static_cast<uint32_t*>(dst)); break;
case INT64: gpuOutputFast(s, sb, val_src_pos, static_cast<uint2*>(dst)); break;
Expand Down
3 changes: 1 addition & 2 deletions cpp/src/io/parquet/page_data.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ template <typename state_buf>
inline __device__ void gpuOutputString(page_state_s* s, state_buf* sb, int src_pos, void* dstv)
{
auto [ptr, len] = gpuGetStringData(s, sb, src_pos);
// make sure to only hash `BYTE_ARRAY` when specified with the output type size
if (s->dtype_len == 4 and (s->col.data_type & 7) == BYTE_ARRAY) {
if (s->col.is_strings_to_cat and s->col.physical_type == BYTE_ARRAY) {
// Output hash. This hash value is used if the option to convert strings to
// categoricals is enabled. The seed value is chosen arbitrarily.
uint32_t constexpr hash_seed = 33;
Expand Down
58 changes: 30 additions & 28 deletions cpp/src/io/parquet/page_decode.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ gpuInitStringDescriptors(page_state_s* s, [[maybe_unused]] state_buf* sb, int ta

while (pos < target_pos) {
int len = 0;
if ((s->col.data_type & 7) == FIXED_LEN_BYTE_ARRAY) {
if (s->col.physical_type == FIXED_LEN_BYTE_ARRAY) {
if (k < dict_size) { len = s->dtype_len_in; }
} else {
if (k + 4 <= dict_size) {
Expand Down Expand Up @@ -1144,11 +1144,11 @@ inline __device__ bool setupLocalPageInfo(page_state_s* const s,
if (s->page.num_input_values > 0) {
uint8_t* cur = s->page.page_data;
uint8_t* end = cur + s->page.uncompressed_page_size;

uint32_t dtype_len_out = s->col.data_type >> 3;
s->ts_scale = 0;
s->ts_scale = 0;
// Validate data type
auto const data_type = s->col.data_type & 7;
auto const data_type = s->col.physical_type;
auto const is_decimal =
s->col.logical_type.has_value() and s->col.logical_type->type == LogicalType::DECIMAL;
switch (data_type) {
case BOOLEAN:
s->dtype_len = 1; // Boolean are stored as 1 byte on the output
Expand All @@ -1159,13 +1159,15 @@ inline __device__ bool setupLocalPageInfo(page_state_s* const s,
if (s->col.ts_clock_rate) {
int32_t units = 0;
// Duration types are not included because no scaling is done when reading
if (s->col.converted_type == TIMESTAMP_MILLIS) {
units = cudf::timestamp_ms::period::den;
} else if (s->col.converted_type == TIMESTAMP_MICROS) {
units = cudf::timestamp_us::period::den;
} else if (s->col.logical_type.has_value() and
s->col.logical_type->is_timestamp_nanos()) {
units = cudf::timestamp_ns::period::den;
if (s->col.logical_type.has_value()) {
auto const& lt = s->col.logical_type.value();
mythrocks marked this conversation as resolved.
Show resolved Hide resolved
if (lt.is_timestamp_millis()) {
units = cudf::timestamp_ms::period::den;
} else if (lt.is_timestamp_micros()) {
units = cudf::timestamp_us::period::den;
} else if (lt.is_timestamp_nanos()) {
units = cudf::timestamp_ns::period::den;
}
}
if (units and units != s->col.ts_clock_rate) {
s->ts_scale = (s->col.ts_clock_rate < units) ? -(units / s->col.ts_clock_rate)
Expand All @@ -1176,8 +1178,8 @@ inline __device__ bool setupLocalPageInfo(page_state_s* const s,
case DOUBLE: s->dtype_len = 8; break;
case INT96: s->dtype_len = 12; break;
case BYTE_ARRAY:
if (s->col.converted_type == DECIMAL) {
auto const decimal_precision = s->col.decimal_precision;
if (is_decimal) {
auto const decimal_precision = s->col.logical_type->precision();
s->dtype_len = [decimal_precision]() {
if (decimal_precision <= MAX_DECIMAL32_PRECISION) {
return sizeof(int32_t);
Expand All @@ -1192,14 +1194,14 @@ inline __device__ bool setupLocalPageInfo(page_state_s* const s,
}
break;
default: // FIXED_LEN_BYTE_ARRAY:
s->dtype_len = dtype_len_out;
s->dtype_len = s->col.type_length;
if (s->dtype_len <= 0) { s->set_error_code(decode_error::INVALID_DATA_TYPE); }
break;
}
// Special check for downconversions
s->dtype_len_in = s->dtype_len;
if (data_type == FIXED_LEN_BYTE_ARRAY) {
if (s->col.converted_type == DECIMAL) {
if (is_decimal) {
s->dtype_len = [dtype_len = s->dtype_len]() {
if (dtype_len <= sizeof(int32_t)) {
return sizeof(int32_t);
Expand All @@ -1213,17 +1215,17 @@ inline __device__ bool setupLocalPageInfo(page_state_s* const s,
s->dtype_len = sizeof(string_index_pair);
}
} else if (data_type == INT32) {
if (dtype_len_out == 1) {
// INT8 output
s->dtype_len = 1;
} else if (dtype_len_out == 2) {
// INT16 output
s->dtype_len = 2;
} else if (s->col.converted_type == TIME_MILLIS) {
// INT64 output
s->dtype_len = 8;
// check for smaller bitwidths
if (s->col.logical_type.has_value()) {
auto const& lt = s->col.logical_type.value();
if (lt.type == LogicalType::INTEGER) {
s->dtype_len = lt.bit_width() / 8;
} else if (lt.is_time_millis()) {
// cudf outputs as INT64
s->dtype_len = 8;
}
}
} else if (data_type == BYTE_ARRAY && dtype_len_out == 4) {
} else if (data_type == BYTE_ARRAY && s->col.is_strings_to_cat) {
vuule marked this conversation as resolved.
Show resolved Hide resolved
s->dtype_len = 4; // HASH32 output
} else if (data_type == INT96) {
s->dtype_len = 8; // Convert to 64-bit timestamp
Expand Down Expand Up @@ -1298,7 +1300,7 @@ inline __device__ bool setupLocalPageInfo(page_state_s* const s,
case Encoding::PLAIN_DICTIONARY:
case Encoding::RLE_DICTIONARY:
// RLE-packed dictionary indices, first byte indicates index length in bits
if (((s->col.data_type & 7) == BYTE_ARRAY) && (s->col.str_dict_index)) {
if (s->col.physical_type == BYTE_ARRAY && s->col.str_dict_index != nullptr) {
// String dictionary: use index
s->dict_base = reinterpret_cast<uint8_t const*>(s->col.str_dict_index);
s->dict_size = s->col.dict_page->num_input_values * sizeof(string_index_pair);
Expand All @@ -1316,7 +1318,7 @@ inline __device__ bool setupLocalPageInfo(page_state_s* const s,
case Encoding::PLAIN:
s->dict_size = static_cast<int32_t>(end - cur);
s->dict_val = 0;
if ((s->col.data_type & 7) == BOOLEAN) { s->dict_run = s->dict_size * 2 + 1; }
if (s->col.physical_type == BOOLEAN) { s->dict_run = s->dict_size * 2 + 1; }
break;
case Encoding::RLE: {
// first 4 bytes are length of RLE data
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/io/parquet/page_hdr.cu
Original file line number Diff line number Diff line change
Expand Up @@ -147,12 +147,12 @@ __device__ inline bool is_nested(ColumnChunkDesc const& chunk)

__device__ inline bool is_byte_array(ColumnChunkDesc const& chunk)
{
return (chunk.data_type & 7) == BYTE_ARRAY;
return chunk.physical_type == BYTE_ARRAY;
}

__device__ inline bool is_boolean(ColumnChunkDesc const& chunk)
{
return (chunk.data_type & 7) == BOOLEAN;
return chunk.physical_type == BOOLEAN;
}

/**
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/io/parquet/page_string_decode.cu
Original file line number Diff line number Diff line change
Expand Up @@ -689,7 +689,7 @@ CUDF_KERNEL void __launch_bounds__(delta_preproc_block_size) gpuComputeDeltaPage
auto const start_value = pp->start_val;

// if data size is known, can short circuit here
if ((chunks[pp->chunk_idx].data_type & 7) == FIXED_LEN_BYTE_ARRAY) {
if (chunks[pp->chunk_idx].physical_type == FIXED_LEN_BYTE_ARRAY) {
if (t == 0) {
pp->str_bytes = pp->num_valids * s->dtype_len_in;

Expand Down Expand Up @@ -881,7 +881,7 @@ CUDF_KERNEL void __launch_bounds__(preprocess_block_size) gpuComputePageStringSi
auto const& col = s->col;
size_t str_bytes = 0;
// short circuit for FIXED_LEN_BYTE_ARRAY
if ((col.data_type & 7) == FIXED_LEN_BYTE_ARRAY) {
if (col.physical_type == FIXED_LEN_BYTE_ARRAY) {
str_bytes = pp->num_valids * s->dtype_len_in;
} else {
// now process string info in the range [start_value, end_value)
Expand Down
41 changes: 23 additions & 18 deletions cpp/src/io/parquet/parquet_gpu.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -370,32 +370,32 @@ struct ColumnChunkDesc {
explicit ColumnChunkDesc(size_t compressed_size_,
uint8_t* compressed_data_,
size_t num_values_,
uint16_t datatype_,
uint16_t datatype_length_,
Type datatype_,
int32_t datatype_length_,
size_t start_row_,
uint32_t num_rows_,
int16_t max_definition_level_,
int16_t max_repetition_level_,
int16_t max_nesting_depth_,
uint8_t def_level_bits_,
uint8_t rep_level_bits_,
int8_t codec_,
int8_t converted_type_,
Compression codec_,
thrust::optional<LogicalType> logical_type_,
int8_t decimal_precision_,
int32_t ts_clock_rate_,
int32_t src_col_index_,
int32_t src_col_schema_,
column_chunk_info const* chunk_info_,
float list_bytes_per_row_est_)
float list_bytes_per_row_est_,
bool strings_to_categorical_)
: compressed_data(compressed_data_),
compressed_size(compressed_size_),
num_values(num_values_),
start_row(start_row_),
num_rows(num_rows_),
max_level{max_definition_level_, max_repetition_level_},
max_nesting_depth{max_nesting_depth_},
data_type(datatype_ | (datatype_length_ << 3)),
type_length(datatype_length_),
physical_type(datatype_),
level_bits{def_level_bits_, rep_level_bits_},
num_data_pages(0),
num_dict_pages(0),
Expand All @@ -405,14 +405,13 @@ struct ColumnChunkDesc {
column_data_base{nullptr},
column_string_base{nullptr},
codec(codec_),
converted_type(converted_type_),
logical_type(logical_type_),
decimal_precision(decimal_precision_),
ts_clock_rate(ts_clock_rate_),
src_col_index(src_col_index_),
src_col_schema(src_col_schema_),
h_chunk_info(chunk_info_),
list_bytes_per_row_est(list_bytes_per_row_est_)
list_bytes_per_row_est(list_bytes_per_row_est_),
is_strings_to_cat(strings_to_categorical_)
{
}

Expand All @@ -423,7 +422,8 @@ struct ColumnChunkDesc {
uint32_t num_rows{}; // number of rows in this chunk
int16_t max_level[level_type::NUM_LEVEL_TYPES]{}; // max definition/repetition level
int16_t max_nesting_depth{}; // max nesting depth of the output
uint16_t data_type{}; // basic column data type, ((type_length << 3) | // parquet::Type)
int32_t type_length{}; // type length from schema (for FLBA only)
Type physical_type{}; // parquet physical data type
uint8_t
level_bits[level_type::NUM_LEVEL_TYPES]{}; // bits to encode max definition/repetition levels
int32_t num_data_pages{}; // number of data pages
Expand All @@ -433,10 +433,8 @@ struct ColumnChunkDesc {
bitmask_type** valid_map_base{}; // base pointers of valid bit map for this column
void** column_data_base{}; // base pointers of column data
void** column_string_base{}; // base pointers of column string data
int8_t codec{}; // compressed codec enum
int8_t converted_type{}; // converted type enum
Compression codec{}; // compressed codec enum
thrust::optional<LogicalType> logical_type{}; // logical type
int8_t decimal_precision{}; // Decimal precision
int32_t ts_clock_rate{}; // output timestamp clock frequency (0=default, 1000=ms, 1000000000=ns)

int32_t src_col_index{}; // my input column index
Expand All @@ -446,6 +444,8 @@ struct ColumnChunkDesc {
column_chunk_info const* h_chunk_info{};

float list_bytes_per_row_est{}; // for LIST columns, an estimate on number of bytes per row

bool is_strings_to_cat{}; // convert strings to hashes
};

/**
Expand Down Expand Up @@ -615,11 +615,16 @@ struct EncPage {
*/
constexpr bool is_string_col(ColumnChunkDesc const& chunk)
{
auto const not_converted_to_decimal = chunk.converted_type != DECIMAL;
// return true for non-hashed byte_array and fixed_len_byte_array that isn't representing
// a decimal.
if (chunk.logical_type.has_value() and chunk.logical_type->type == LogicalType::DECIMAL) {
return false;
}

auto const non_hashed_byte_array =
(chunk.data_type & 7) == BYTE_ARRAY and (chunk.data_type >> 3) != 4;
auto const fixed_len_byte_array = (chunk.data_type & 7) == FIXED_LEN_BYTE_ARRAY;
return not_converted_to_decimal and (non_hashed_byte_array or fixed_len_byte_array);
chunk.physical_type == BYTE_ARRAY and not chunk.is_strings_to_cat;
auto const fixed_len_byte_array = chunk.physical_type == FIXED_LEN_BYTE_ARRAY;
return non_hashed_byte_array or fixed_len_byte_array;
}

/**
Expand Down
16 changes: 15 additions & 1 deletion cpp/src/io/parquet/reader_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,19 @@

namespace cudf::io::parquet::detail {

namespace {
// Decides whether a FIXED_LEN_BYTE_ARRAY column carrying the given logical type should be
// treated as a string. DECIMAL is currently the only logical type with special handling; a
// column with no logical type at all is treated as a string. Other valid types in the future
// would be UUID (still treated as a string) and FLOAT16 (which for now would also be treated
// as a string).
inline bool is_treat_fixed_length_as_string(thrust::optional<LogicalType> const& logical_type)
{
  auto const is_decimal =
    logical_type.has_value() and logical_type->type == LogicalType::DECIMAL;
  return not is_decimal;
}

} // namespace

void reader::impl::decode_page_data(bool uses_custom_row_bounds, size_t skip_rows, size_t num_rows)
{
auto& pass = *_pass_itm_data;
Expand Down Expand Up @@ -66,7 +79,8 @@ void reader::impl::decode_page_data(bool uses_custom_row_bounds, size_t skip_row
// TODO: we could probably dummy up size stats for FLBA data since we know the width
auto const has_flba =
std::any_of(pass.chunks.begin(), pass.chunks.end(), [](auto const& chunk) {
return (chunk.data_type & 7) == FIXED_LEN_BYTE_ARRAY && chunk.converted_type != DECIMAL;
return chunk.physical_type == FIXED_LEN_BYTE_ARRAY and
is_treat_fixed_length_as_string(chunk.logical_type);
});

if (!_has_page_index || uses_custom_row_bounds || has_flba) {
Expand Down
Loading
Loading