From 79cb85110d018f3a493dee062f4db21b1ae44c4f Mon Sep 17 00:00:00 2001 From: seidl Date: Thu, 25 Apr 2024 23:39:50 +0000 Subject: [PATCH 1/5] build string dictionaries for FLBA --- cpp/src/io/parquet/reader_impl_preprocess.cu | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/cpp/src/io/parquet/reader_impl_preprocess.cu b/cpp/src/io/parquet/reader_impl_preprocess.cu index 4b7a64ac6ab..c3fb4b1a301 100644 --- a/cpp/src/io/parquet/reader_impl_preprocess.cu +++ b/cpp/src/io/parquet/reader_impl_preprocess.cu @@ -15,6 +15,7 @@ */ #include "error.hpp" +#include "io/parquet/parquet_common.hpp" #include "reader_impl.hpp" #include @@ -636,6 +637,15 @@ void decode_page_headers(pass_intermediate_data& pass, stream.synchronize(); } +constexpr bool is_string_chunk(ColumnChunkDesc const& chunk) +{ + auto const is_decimal = + chunk.logical_type.has_value() and chunk.logical_type->type == LogicalType::DECIMAL; + auto const is_binary = + chunk.physical_type == BYTE_ARRAY or chunk.physical_type == FIXED_LEN_BYTE_ARRAY; + return is_binary and not is_decimal; +} + struct set_str_dict_index_count { device_span str_dict_index_count; device_span chunks; @@ -643,8 +653,8 @@ struct set_str_dict_index_count { __device__ void operator()(PageInfo const& page) { auto const& chunk = chunks[page.chunk_idx]; - if ((page.flags & PAGEINFO_FLAGS_DICTIONARY) && chunk.physical_type == BYTE_ARRAY && - (chunk.num_dict_pages > 0)) { + if ((page.flags & PAGEINFO_FLAGS_DICTIONARY) != 0 and chunk.num_dict_pages > 0 and + is_string_chunk(chunk)) { // there is only ever one dictionary page per chunk, so this is safe to do in parallel. str_dict_index_count[page.chunk_idx] = page.num_input_values; } @@ -659,7 +669,7 @@ struct set_str_dict_index_ptr { __device__ void operator()(size_t i) { auto& chunk = chunks[i]; - if (chunk.physical_type == BYTE_ARRAY && (chunk.num_dict_pages > 0)) { + if (chunk.num_dict_pages > 0 and is_string_chunk(chunk)) { chunk.str_dict_index = base + str_dict_index_offsets[i]; } } From 0b311e02bf8c13696fbd4e73376299078a9629f7 Mon Sep 17 00:00:00 2001 From: seidl Date: Thu, 25 Apr 2024 23:56:38 +0000 Subject: [PATCH 2/5] take FLBA into account when building string dict index --- cpp/src/io/parquet/page_hdr.cu | 21 ++++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/cpp/src/io/parquet/page_hdr.cu b/cpp/src/io/parquet/page_hdr.cu index 6c6afde29e4..92aa1a077a1 100644 --- a/cpp/src/io/parquet/page_hdr.cu +++ b/cpp/src/io/parquet/page_hdr.cu @@ -538,17 +538,28 @@ CUDF_KERNEL void __launch_bounds__(128) int pos = 0, cur = 0; for (int i = 0; i < num_entries; i++) { int len = 0; - if (cur + 4 <= dict_size) { - len = dict[cur + 0] | (dict[cur + 1] << 8) | (dict[cur + 2] << 16) | (dict[cur + 3] << 24); - if (len >= 0 && cur + 4 + len <= dict_size) { + if (ck->physical_type == FIXED_LEN_BYTE_ARRAY) { + if (cur < dict_size) { + len = ck->type_length; pos = cur; - cur = cur + 4 + len; + cur += len; } else { cur = dict_size; } + } else { + if (cur + 4 <= dict_size) { + len = + dict[cur + 0] | (dict[cur + 1] << 8) | (dict[cur + 2] << 16) | (dict[cur + 3] << 24); + if (len >= 0 && cur + 4 + len <= dict_size) { + pos = cur + 4; + cur = cur + 4 + len; + } else { + cur = dict_size; + } + } } // TODO: Could store 8 entries in shared mem, then do a single warp-wide store - dict_index[i].first = reinterpret_cast(dict + pos + 4); + dict_index[i].first = reinterpret_cast(dict + pos); dict_index[i].second = len; } } From 99e0cfa28e6402198b19e12cd93a2ae9f0c36aa1 Mon Sep 17 00:00:00 2001 From: seidl Date: Fri, 26 Apr 2024 00:45:21 +0000 Subject: [PATCH 3/5] add test and fix initialization of dict_base --- cpp/src/io/parquet/page_decode.cuh | 12 +++++++++--- python/cudf/cudf/tests/test_parquet.py | 19 +++++++++++++++++++ 2 files changed, 28 insertions(+), 3 deletions(-) diff --git a/cpp/src/io/parquet/page_decode.cuh b/cpp/src/io/parquet/page_decode.cuh index 0c139fced24..6b8992e47e0 100644 --- a/cpp/src/io/parquet/page_decode.cuh +++ b/cpp/src/io/parquet/page_decode.cuh @@ -17,7 +17,9 @@ #pragma once #include "error.hpp" +#include "io/parquet/parquet.hpp" #include "io/utilities/block_utils.cuh" +#include "parquet_common.hpp" #include "parquet_gpu.hpp" #include "rle_stream.cuh" @@ -1298,9 +1300,13 @@ inline __device__ bool setupLocalPageInfo(page_state_s* const s, // be made to is_supported_encoding() in reader_impl_preprocess.cu switch (s->page.encoding) { case Encoding::PLAIN_DICTIONARY: - case Encoding::RLE_DICTIONARY: + case Encoding::RLE_DICTIONARY: { // RLE-packed dictionary indices, first byte indicates index length in bits - if (s->col.physical_type == BYTE_ARRAY && s->col.str_dict_index != nullptr) { + auto const is_decimal = + s->col.logical_type.has_value() and s->col.logical_type->type == LogicalType::DECIMAL; + if ((s->col.physical_type == BYTE_ARRAY or + s->col.physical_type == FIXED_LEN_BYTE_ARRAY) and + not is_decimal and s->col.str_dict_index != nullptr) { // String dictionary: use index s->dict_base = reinterpret_cast(s->col.str_dict_index); s->dict_size = s->col.dict_page->num_input_values * sizeof(string_index_pair); @@ -1314,7 +1320,7 @@ inline __device__ bool setupLocalPageInfo(page_state_s* const s, if (s->dict_bits > 32 || (!s->dict_base && s->col.dict_page->num_input_values > 0)) { s->set_error_code(decode_error::INVALID_DICT_WIDTH); } - break; + } break; case Encoding::PLAIN: case Encoding::BYTE_STREAM_SPLIT: s->dict_size = static_cast(end - cur); diff --git a/python/cudf/cudf/tests/test_parquet.py b/python/cudf/cudf/tests/test_parquet.py index 56a4281aad9..b2e28d4d933 100644 --- a/python/cudf/cudf/tests/test_parquet.py +++ b/python/cudf/cudf/tests/test_parquet.py @@ -2,6 +2,7 @@ import datetime import glob +import hashlib import math import os import pathlib @@ -2807,6 +2808,24 @@ def test_parquet_reader_fixed_bin(datadir): assert_eq(expect, got) +def test_parquet_reader_fixed_len_with_dict(tmpdir): + def flba(i): + hasher = hashlib.sha256() + hasher.update(i.to_bytes(4, "little")) + return hasher.digest() + + # use pyarrow to write table of fixed_len_byte_array + num_rows = 200 + data = pa.array([flba(i) for i in range(num_rows)], type=pa.binary(32)) + padf = pa.Table.from_arrays([data], names=["flba"]) + padf_fname = tmpdir.join("padf.parquet") + pq.write_table(padf, padf_fname, use_dictionary=True) + + expect = pd.read_parquet(padf_fname) + got = cudf.read_parquet(padf_fname) + assert_eq(expect, got) + + def test_parquet_reader_rle_boolean(datadir): fname = datadir / "rle_boolean_encoding.parquet" From ce99d2e068b515f6c0a700c54671237947e8ce49 Mon Sep 17 00:00:00 2001 From: Ed Seidl Date: Thu, 25 Apr 2024 20:23:29 -0700 Subject: [PATCH 4/5] clean up test of dict length --- cpp/src/io/parquet/page_decode.cuh | 2 -- cpp/src/io/parquet/page_hdr.cu | 2 +- cpp/src/io/parquet/reader_impl_preprocess.cu | 1 - 3 files changed, 1 insertion(+), 4 deletions(-) diff --git a/cpp/src/io/parquet/page_decode.cuh b/cpp/src/io/parquet/page_decode.cuh index 6b8992e47e0..6c9ee3cf648 100644 --- a/cpp/src/io/parquet/page_decode.cuh +++ b/cpp/src/io/parquet/page_decode.cuh @@ -17,9 +17,7 @@ #pragma once #include "error.hpp" -#include "io/parquet/parquet.hpp" #include "io/utilities/block_utils.cuh" -#include "parquet_common.hpp" #include "parquet_gpu.hpp" #include "rle_stream.cuh" diff --git a/cpp/src/io/parquet/page_hdr.cu b/cpp/src/io/parquet/page_hdr.cu index 92aa1a077a1..28a4c053151 100644 --- a/cpp/src/io/parquet/page_hdr.cu +++ b/cpp/src/io/parquet/page_hdr.cu @@ -539,7 +539,7 @@ CUDF_KERNEL void __launch_bounds__(128) for (int i = 0; i < num_entries; i++) { int len = 0; if (ck->physical_type == FIXED_LEN_BYTE_ARRAY) { - if (cur < dict_size) { + if (cur + ck->type_length <= dict_size) { len = ck->type_length; pos = cur; cur += len; diff --git a/cpp/src/io/parquet/reader_impl_preprocess.cu b/cpp/src/io/parquet/reader_impl_preprocess.cu index c3fb4b1a301..2bcc16a8a08 100644 --- a/cpp/src/io/parquet/reader_impl_preprocess.cu +++ b/cpp/src/io/parquet/reader_impl_preprocess.cu @@ -15,7 +15,6 @@ */ #include "error.hpp" -#include "io/parquet/parquet_common.hpp" #include "reader_impl.hpp" #include From fc1bda0780a9aba647145a62dd6a1475ffdf8a57 Mon Sep 17 00:00:00 2001 From: Ed Seidl Date: Thu, 25 Apr 2024 22:32:43 -0700 Subject: [PATCH 5/5] simplification --- cpp/src/io/parquet/page_hdr.cu | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/io/parquet/page_hdr.cu b/cpp/src/io/parquet/page_hdr.cu index 28a4c053151..cf0dd85e490 100644 --- a/cpp/src/io/parquet/page_hdr.cu +++ b/cpp/src/io/parquet/page_hdr.cu @@ -552,7 +552,7 @@ CUDF_KERNEL void __launch_bounds__(128) dict[cur + 0] | (dict[cur + 1] << 8) | (dict[cur + 2] << 16) | (dict[cur + 3] << 24); if (len >= 0 && cur + 4 + len <= dict_size) { pos = cur + 4; - cur = cur + 4 + len; + cur = pos + len; } else { cur = dict_size; }