Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Read FIXED_LEN_BYTE_ARRAY as binary in parquet reader #13437

Merged
merged 34 commits into from
Aug 24, 2023
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
138cf3f
Read fixed len byte array as string
PointKernel May 24, 2023
f39a02f
Merge remote-tracking branch 'upstream/branch-23.08' into parquet-fix…
PointKernel May 24, 2023
64783d2
Merge with upstream
PointKernel Jul 3, 2023
fdc8b7e
Merge remote-tracking branch 'upstream/branch-23.08' into parquet-fix…
PointKernel Jul 3, 2023
c7b32b2
TEST
PointKernel Jul 5, 2023
2c685a2
Cleanups
PointKernel Jul 5, 2023
cb6825f
Revert test changes
PointKernel Jul 5, 2023
c75675b
Cleanups
PointKernel Jul 5, 2023
fac2eff
Set up dtype_len properly
PointKernel Jul 5, 2023
86e41a7
updates
PointKernel Jul 5, 2023
7f363b1
Cleanup
PointKernel Jul 5, 2023
70ef046
Use switch instead of if conditions
PointKernel Jul 5, 2023
8eee919
Merge remote-tracking branch 'upstream/branch-23.08' into parquet-fix…
PointKernel Jul 5, 2023
de90782
Merge remote-tracking branch 'upstream/branch-23.10' into parquet-fix…
PointKernel Aug 10, 2023
b393b7b
Fix a bug in string col determination
PointKernel Aug 10, 2023
aab1ced
Fix a bug in string col determination
PointKernel Aug 10, 2023
eeafbf7
Fix a bug: break non-default switchcase
PointKernel Aug 10, 2023
7209027
Cleanups: shortcircuit for FIXED_LEN_BYTE_ARRAY
PointKernel Aug 10, 2023
ac3b7a4
Read FIXED_LEN_BYTE_ARRAY always as binary
PointKernel Aug 11, 2023
a2a622d
Add FIXED_LEN_BYTE_ARRAY pytest
PointKernel Aug 11, 2023
5d01183
Minor fix: compare the lower 3 bits only
PointKernel Aug 11, 2023
9f5ff20
Merge branch 'branch-23.10' into parquet-fixed-len-binary
PointKernel Aug 14, 2023
50ec387
Use a smaller test file
PointKernel Aug 16, 2023
f756e56
Merge remote-tracking branch 'upstream/branch-23.10' into parquet-fix…
PointKernel Aug 16, 2023
6304001
Merge branch 'parquet-fixed-len-binary' of github.com:PointKernel/cud…
PointKernel Aug 16, 2023
a969c00
Minor cleanup: init right after declaration to get rid of if branches
PointKernel Aug 17, 2023
277557c
Cleanups: descriptive name for intermediate vars
PointKernel Aug 17, 2023
b712f5f
Add comment
PointKernel Aug 17, 2023
efc495d
Merge remote-tracking branch 'upstream/branch-23.10' into parquet-fix…
PointKernel Aug 17, 2023
2427416
Update cpp/src/io/parquet/reader_impl.cpp
PointKernel Aug 17, 2023
77b3d38
Merge branch 'branch-23.10' into parquet-fixed-len-binary
PointKernel Aug 21, 2023
6453662
Merge branch 'branch-23.10' into parquet-fixed-len-binary
vuule Aug 23, 2023
dfdcb76
Merge branch 'branch-23.10' into parquet-fixed-len-binary
vuule Aug 24, 2023
ed1c81c
Update python/cudf/cudf/tests/test_parquet.py
PointKernel Aug 24, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions cpp/src/io/parquet/page_data.cu
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ inline __device__ void gpuOutputString(volatile page_state_s* s,
void* dstv)
{
auto [ptr, len] = gpuGetStringData(s, sb, src_pos);
if (s->dtype_len == 4) {
if (s->dtype_len == 4 and (s->col.data_type & 7) == BYTE_ARRAY) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this really be a logical and? It seems in the past we would take this path if the data size was 4 bytes, but now we only do it if the data size if 4 bytes AND it is a byte array.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Before this PR, only BYTE_ARRAY invokes gpuOutputString and the input length cannot be 4 in that case. Now with function being potentially invoked by FIXED_LEN_BYTE_ARRAY where the length could be 4, this and logic is needed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe a comment along the lines of "// make sure to only hash variable length byte arrays when specified with the output type size"?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not really related to this PR, but we have this dtype_len == 4 condition as a stand-in for "output hash" in a few places and it really requires detailed knowledge of the code to understand. Nothing actionable, I just don't like it :D

// Output hash. This hash value is used if the option to convert strings to
// categoricals is enabled. The seed value is chosen arbitrarily.
uint32_t constexpr hash_seed = 33;
Expand Down Expand Up @@ -774,8 +774,12 @@ __global__ void __launch_bounds__(decode_block_size) gpuDecodePageData(
if (s->dict_base) {
out_thread0 = (s->dict_bits > 0) ? 64 : 32;
} else {
out_thread0 =
((s->col.data_type & 7) == BOOLEAN || (s->col.data_type & 7) == BYTE_ARRAY) ? 64 : 32;
switch (s->col.data_type & 7) {
case BOOLEAN: [[fallthrough]];
case BYTE_ARRAY: [[fallthrough]];
case FIXED_LEN_BYTE_ARRAY: out_thread0 = 64; break;
default: out_thread0 = 32;
}
}

PageNestingDecodeInfo* nesting_info_base = s->nesting_info;
Expand Down Expand Up @@ -812,7 +816,8 @@ __global__ void __launch_bounds__(decode_block_size) gpuDecodePageData(
src_target_pos = gpuDecodeDictionaryIndices<false>(s, sb, src_target_pos, t & 0x1f).first;
} else if ((s->col.data_type & 7) == BOOLEAN) {
src_target_pos = gpuDecodeRleBooleans(s, sb, src_target_pos, t & 0x1f);
} else if ((s->col.data_type & 7) == BYTE_ARRAY) {
} else if ((s->col.data_type & 7) == BYTE_ARRAY or
(s->col.data_type & 7) == FIXED_LEN_BYTE_ARRAY) {
gpuInitStringDescriptors<false>(s, sb, src_target_pos, t & 0x1f);
}
if (t == 32) { *(volatile int32_t*)&s->dict_pos = src_target_pos; }
Expand Down Expand Up @@ -882,6 +887,8 @@ __global__ void __launch_bounds__(decode_block_size) gpuDecodePageData(
}
break;
}
} else if (dtype == FIXED_LEN_BYTE_ARRAY) {
gpuOutputString(s, sb, val_src_pos, dst);
} else if (dtype == INT96) {
gpuOutputInt96Timestamp(s, sb, val_src_pos, static_cast<int64_t*>(dst));
} else if (dtype_len == 8) {
Expand Down
44 changes: 28 additions & 16 deletions cpp/src/io/parquet/page_decode.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -418,12 +418,20 @@ __device__ size_type gpuInitStringDescriptors(page_state_s volatile* s,

while (pos < target_pos) {
int len;
PointKernel marked this conversation as resolved.
Show resolved Hide resolved
if (k + 4 <= dict_size) {
len = (cur[k]) | (cur[k + 1] << 8) | (cur[k + 2] << 16) | (cur[k + 3] << 24);
k += 4;
if (k + len > dict_size) { len = 0; }
if ((s->col.data_type & 7) == FIXED_LEN_BYTE_ARRAY) {
if (k < dict_size) {
len = s->dtype_len_in;
} else {
len = 0;
}
} else {
len = 0;
if (k + 4 <= dict_size) {
len = (cur[k]) | (cur[k + 1] << 8) | (cur[k + 2] << 16) | (cur[k + 3] << 24);
k += 4;
if (k + len > dict_size) { len = 0; }
} else {
len = 0;
}
}
if constexpr (!sizes_only) {
sb->dict_idx[rolling_index(pos)] = k;
Expand Down Expand Up @@ -1147,16 +1155,20 @@ inline __device__ bool setupLocalPageInfo(page_state_s* const s,
}
// Special check for downconversions
s->dtype_len_in = s->dtype_len;
if (s->col.converted_type == DECIMAL && data_type == FIXED_LEN_BYTE_ARRAY) {
s->dtype_len = [dtype_len = s->dtype_len]() {
if (dtype_len <= sizeof(int32_t)) {
return sizeof(int32_t);
} else if (dtype_len <= sizeof(int64_t)) {
return sizeof(int64_t);
} else {
return sizeof(__int128_t);
}
}();
if (data_type == FIXED_LEN_BYTE_ARRAY) {
if (s->col.converted_type == DECIMAL) {
s->dtype_len = [dtype_len = s->dtype_len]() {
if (dtype_len <= sizeof(int32_t)) {
return sizeof(int32_t);
} else if (dtype_len <= sizeof(int64_t)) {
return sizeof(int64_t);
} else {
return sizeof(__int128_t);
}
}();
} else {
s->dtype_len = sizeof(string_index_pair);
vuule marked this conversation as resolved.
Show resolved Hide resolved
}
} else if (data_type == INT32) {
if (dtype_len_out == 1) {
// INT8 output
Expand Down Expand Up @@ -1212,7 +1224,7 @@ inline __device__ bool setupLocalPageInfo(page_state_s* const s,
uint32_t len = idx < max_depth - 1 ? sizeof(cudf::size_type) : s->dtype_len;
// if this is a string column, then dtype_len is a lie. data will be offsets rather
// than (ptr,len) tuples.
if (data_type == BYTE_ARRAY && s->dtype_len != 4) { len = sizeof(cudf::size_type); }
if (is_string_col(s->col)) { len = sizeof(cudf::size_type); }
nesting_info->data_out += (output_offset * len);
}
if (nesting_info->string_out != nullptr) {
Expand Down
69 changes: 37 additions & 32 deletions cpp/src/io/parquet/page_string_decode.cu
Original file line number Diff line number Diff line change
Expand Up @@ -595,39 +595,44 @@ __global__ void __launch_bounds__(preprocess_block_size) gpuComputePageStringSiz
pp->num_valids = s->page.num_valids;
}

// now process string info in the range [start_value, end_value)
// set up for decoding strings...can be either plain or dictionary
auto const& col = s->col;
uint8_t const* data = s->data_start;
uint8_t const* const end = s->data_end;
uint8_t const* dict_base = nullptr;
int dict_size = 0;
size_t str_bytes = 0;

switch (pp->encoding) {
case Encoding::PLAIN_DICTIONARY:
case Encoding::RLE_DICTIONARY:
// RLE-packed dictionary indices, first byte indicates index length in bits
if (col.str_dict_index) {
// String dictionary: use index
dict_base = reinterpret_cast<const uint8_t*>(col.str_dict_index);
dict_size = col.page_info[0].num_input_values * sizeof(string_index_pair);
} else {
dict_base = col.page_info[0].page_data; // dictionary is always stored in the first page
dict_size = col.page_info[0].uncompressed_page_size;
}
auto const& col = s->col;
size_t str_bytes = 0;
// short circuit for FIXED_LEN_BYTE_ARRAY
if ((col.data_type & 7) == FIXED_LEN_BYTE_ARRAY) {
str_bytes = pp->num_valids * s->dtype_len_in;
} else {
// now process string info in the range [start_value, end_value)
// set up for decoding strings...can be either plain or dictionary
uint8_t const* data = s->data_start;
uint8_t const* const end = s->data_end;
uint8_t const* dict_base = nullptr;
int dict_size = 0;

switch (pp->encoding) {
case Encoding::PLAIN_DICTIONARY:
case Encoding::RLE_DICTIONARY:
// RLE-packed dictionary indices, first byte indicates index length in bits
if (col.str_dict_index) {
// String dictionary: use index
dict_base = reinterpret_cast<const uint8_t*>(col.str_dict_index);
dict_size = col.page_info[0].num_input_values * sizeof(string_index_pair);
} else {
dict_base = col.page_info[0].page_data; // dictionary is always stored in the first page
dict_size = col.page_info[0].uncompressed_page_size;
}

// FIXME: need to return an error condition...this won't actually do anything
if (s->dict_bits > 32 || !dict_base) { CUDF_UNREACHABLE("invalid dictionary bit size"); }

str_bytes = totalDictEntriesSize(
data, dict_base, s->dict_bits, dict_size, (end - data), start_value, end_value);
break;
case Encoding::PLAIN:
dict_size = static_cast<int32_t>(end - data);
str_bytes = is_bounds_pg ? totalPlainEntriesSize(data, dict_size, start_value, end_value)
: dict_size - sizeof(int) * (pp->num_input_values - pp->num_nulls);
break;
// FIXME: need to return an error condition...this won't actually do anything
if (s->dict_bits > 32 || !dict_base) { CUDF_UNREACHABLE("invalid dictionary bit size"); }

str_bytes = totalDictEntriesSize(
data, dict_base, s->dict_bits, dict_size, (end - data), start_value, end_value);
break;
case Encoding::PLAIN:
dict_size = static_cast<int32_t>(end - data);
str_bytes = is_bounds_pg ? totalPlainEntriesSize(data, dict_size, start_value, end_value)
: dict_size - sizeof(int) * pp->num_valids;
break;
}
}

if (t == 0) {
Expand Down
5 changes: 3 additions & 2 deletions cpp/src/io/parquet/parquet_gpu.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -432,8 +432,9 @@ struct EncPage {
*/
constexpr bool is_string_col(ColumnChunkDesc const& chunk)
{
return (chunk.data_type & 7) == BYTE_ARRAY and (chunk.data_type >> 3) != 4 and
chunk.converted_type != DECIMAL;
return ((chunk.data_type & 7) == BYTE_ARRAY and (chunk.data_type >> 3) != 4 and
PointKernel marked this conversation as resolved.
Show resolved Hide resolved
chunk.converted_type != DECIMAL) or
((chunk.data_type & 7) == FIXED_LEN_BYTE_ARRAY and chunk.converted_type != DECIMAL);
}

/**
Expand Down
12 changes: 9 additions & 3 deletions cpp/src/io/parquet/reader_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -382,9 +382,15 @@ table_with_metadata reader::impl::read_chunk_internal(

// Create the final output cudf columns.
for (size_t i = 0; i < _output_buffers.size(); ++i) {
auto const metadata = _reader_column_schema.has_value()
? std::make_optional<reader_column_schema>((*_reader_column_schema)[i])
: std::nullopt;
auto metadata = _reader_column_schema.has_value()
? std::make_optional<reader_column_schema>((*_reader_column_schema)[i])
: std::nullopt;
auto const& schema = _metadata->get_schema(_output_column_schemas[i]);
// FIXED_LEN_BYTE_ARRAY always read as binary
PointKernel marked this conversation as resolved.
Show resolved Hide resolved
if (schema.type == FIXED_LEN_BYTE_ARRAY and schema.converted_type != DECIMAL) {
metadata = std::make_optional<reader_column_schema>();
metadata->set_convert_binary_to_strings(false);
}
// Only construct `out_metadata` if `_output_metadata` has not been cached.
if (!_output_metadata) {
column_name_info& col_name = out_metadata.schema_info[i];
Expand Down
PointKernel marked this conversation as resolved.
Show resolved Hide resolved
Binary file not shown.
9 changes: 9 additions & 0 deletions python/cudf/cudf/tests/test_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -2518,6 +2518,15 @@ def test_parquet_reader_binary_decimal(datadir):
assert_eq(expect, got)


def test_parquet_reader_fixed_bin(datadir):
fname = datadir / "fixed_len_byte_array.parquet"

expect = pd.read_parquet(fname)
got = cudf.read_parquet(fname).to_pandas()
PointKernel marked this conversation as resolved.
Show resolved Hide resolved

assert_eq(expect, got)


def test_parquet_reader_rle_boolean(datadir):
fname = datadir / "rle_boolean_encoding.parquet"

Expand Down