Add column indexes to Parquet writer #11302

Merged 85 commits on Jul 26, 2022

Commits
- 7892c5a  fix CheckPageRows to use datasources (etseidl, Jun 29, 2022)
- 2ed90a0  add thrift support for parquet column and offset indexes (etseidl, Jun 30, 2022)
- 5303443  fix a bug in writing of min/max statistics for decimal128 types in pa… (etseidl, Jun 30, 2022)
- 7349adb  forgot to replace one fp_scratch (etseidl, Jun 30, 2022)
- 617faf3  Merge remote-tracking branch 'origin/feature/parquet-serde' into feat… (etseidl, Jun 30, 2022)
- 8fba754  Merge remote-tracking branch 'origin/feature/decimal128_stats' into f… (etseidl, Jun 30, 2022)
- 2a77e5b  modify parquet writer to add column indexes (etseidl, Jun 30, 2022)
- 65ea003  change scratch to void* (etseidl, Jun 30, 2022)
- 6ef2f2b  fix suggested by reviewer (etseidl, Jul 1, 2022)
- 80ec547  better documentation for read_footer function, and change return (etseidl, Jul 1, 2022)
- c4f0f9c  change typing in aggregation_type to match extrema_type (etseidl, Jul 2, 2022)
- 1591bdd  Merge branch 'rapidsai:branch-22.08' into feature/colidx (etseidl, Jul 4, 2022)
- 7e8d038  update copyright (etseidl, Jul 4, 2022)
- f142141  Merge branch 'rapidsai:branch-22.08' into feature/colidx (etseidl, Jul 5, 2022)
- b88807e  use CUDF_EXPECTS rather than the EXPECT_XX macros when testing for (etseidl, Jul 5, 2022)
- edb7f86  Merge remote-tracking branch 'origin/feature/11038' into feature/colidx (etseidl, Jul 5, 2022)
- eed2920  switch to using CUDF_EXPECTS more often, clean up debug statements (etseidl, Jul 5, 2022)
- ba6b9ac  add some comments to test code (etseidl, Jul 5, 2022)
- 47de717  Merge branch 'rapidsai:branch-22.08' into feature/colidx (etseidl, Jul 5, 2022)
- 06822a6  Merge remote-tracking branch 'origin/feature/decimal128_stats' into f… (etseidl, Jul 5, 2022)
- 5c4b50e  Merge branch 'rapidsai:branch-22.08' into feature/colidx (etseidl, Jul 6, 2022)
- 646135b  Merge branch 'rapidsai:branch-22.08' into feature/colidx (etseidl, Jul 6, 2022)
- 646d934  Merge branch 'rapidsai:branch-22.08' into feature/colidx (etseidl, Jul 6, 2022)
- ef3997f  Merge branch 'branch-22.08' into feature/colidx (etseidl, Jul 6, 2022)
- 2133fda  formatting (etseidl, Jul 6, 2022)
- e1f451c  add read_column_index and read_offset_index methods (etseidl, Jul 7, 2022)
- 18f041b  add read_page_header (etseidl, Jul 7, 2022)
- b330680  add function to parse statistics (etseidl, Jul 7, 2022)
- 562bf89  delete commented out line (etseidl, Jul 7, 2022)
- aae5aa9  Merge branch 'rapidsai:branch-22.08' into feature/colidx (etseidl, Jul 7, 2022)
- 5ab7e19  formatting (etseidl, Jul 7, 2022)
- af4b4bd  Merge remote-tracking branch 'github/feature/decimal128_stats' into f… (etseidl, Jul 7, 2022)
- abe98a4  only allocate memory for column indexes if needed (etseidl, Jul 7, 2022)
- 9542388  add some comments and get rid of magic number (etseidl, Jul 7, 2022)
- 088672b  Merge branch 'rapidsai:branch-22.08' into feature/colidx (etseidl, Jul 7, 2022)
- 2147971  Merge branch 'branch-22.08' into feature/decimal128_stats (etseidl, Jul 8, 2022)
- f206255  add test of decimal128 stats (etseidl, Jul 8, 2022)
- 7be2705  Merge branch 'rapidsai:branch-22.08' into feature/colidx (etseidl, Jul 8, 2022)
- 9f0fa88  ensure stats will be written (etseidl, Jul 8, 2022)
- be73d05  Merge branch 'rapidsai:branch-22.08' into feature/colidx (etseidl, Jul 8, 2022)
- b031393  Merge branch 'feature/decimal128_stats' of github.com:etseidl/cudf in… (etseidl, Jul 8, 2022)
- 7139f51  Merge branch 'rapidsai:branch-22.08' into feature/colidx (etseidl, Jul 8, 2022)
- 722cf34  Merge branch 'rapidsai:branch-22.08' into feature/colidx (etseidl, Jul 8, 2022)
- c676cb3  Merge branch 'rapidsai:branch-22.08' into feature/colidx (etseidl, Jul 8, 2022)
- e11de2d  Merge branch 'rapidsai:branch-22.08' into feature/colidx (etseidl, Jul 8, 2022)
- b6b85d3  Merge branch 'rapidsai:branch-22.08' into feature/colidx (etseidl, Jul 8, 2022)
- 17f71de  Merge branch 'rapidsai:branch-22.08' into feature/colidx (etseidl, Jul 8, 2022)
- b530bee  changes from review (etseidl, Jul 11, 2022)
- 7d9b8ae  Merge branch 'rapidsai:branch-22.08' into feature/colidx (etseidl, Jul 11, 2022)
- 025e6a0  more test clean up (etseidl, Jul 11, 2022)
- 0f2f5bf  Apply suggestions from code review (etseidl, Jul 11, 2022)
- 48ec76a  fix formatting (etseidl, Jul 11, 2022)
- a96ecb7  Merge branch 'rapidsai:branch-22.08' into feature/colidx (etseidl, Jul 11, 2022)
- 934fc76  Merge branch 'rapidsai:branch-22.08' into feature/colidx (etseidl, Jul 11, 2022)
- f31f867  more changes from review (etseidl, Jul 12, 2022)
- a07dac1  Merge branch 'rapidsai:branch-22.08' into feature/colidx (etseidl, Jul 12, 2022)
- 2b6e915  Merge branch 'rapidsai:branch-22.08' into feature/colidx (etseidl, Jul 13, 2022)
- 7085234  Merge branch 'rapidsai:branch-22.08' into feature/colidx (etseidl, Jul 13, 2022)
- 591b847  Merge branch 'rapidsai:branch-22.08' into feature/colidx (etseidl, Jul 14, 2022)
- 2547da7  Merge branch 'rapidsai:branch-22.08' into feature/colidx (etseidl, Jul 14, 2022)
- f8a35a7  Merge branch 'feature/decimal128_stats' into feature/colidx (etseidl, Jul 14, 2022)
- f929745  fix some things missed in merge (etseidl, Jul 14, 2022)
- ad621e9  rework metadata readers (etseidl, Jul 14, 2022)
- 9ffd509  Merge branch 'rapidsai:branch-22.08' into feature/colidx (etseidl, Jul 14, 2022)
- 656b826  Merge branch 'rapidsai:branch-22.08' into feature/colidx (etseidl, Jul 15, 2022)
- 6a48bc6  Merge branch 'rapidsai:branch-22.08' into feature/colidx (etseidl, Jul 19, 2022)
- e57331a  change some function names to snake case (etseidl, Jul 19, 2022)
- ed9c38a  replace get_min_max with get_extremum (etseidl, Jul 19, 2022)
- 2f52654  format fixes (etseidl, Jul 19, 2022)
- c7f1f9c  Merge branch 'branch-22.08' into feature/colidx (etseidl, Jul 19, 2022)
- f2d439d  change some refs to pointers (etseidl, Jul 19, 2022)
- 1ecab10  clean up some comments and add unreachable message to compare_values() (etseidl, Jul 21, 2022)
- 372f64a  changes from review (etseidl, Jul 21, 2022)
- 82543af  use proper types for physical and converted type args (etseidl, Jul 21, 2022)
- 770338b  formatting fixes (etseidl, Jul 21, 2022)
- fabfcbe  more const and data type cleanup (etseidl, Jul 21, 2022)
- 30b81bb  remove magic numbers and add more explanation to column_index_buffer_… (etseidl, Jul 22, 2022)
- 5c0e93c  rename put_byte per suggestion (etseidl, Jul 22, 2022)
- 7cac483  do not call functions with side effects from macros (etseidl, Jul 22, 2022)
- 383925b  refactor get_extremum (etseidl, Jul 22, 2022)
- d814f6b  pass statistics_dtype rather than uint8_t (etseidl, Jul 22, 2022)
- f74d185  change converted_type to have enum type in parquet_column_device_view (etseidl, Jul 22, 2022)
- 82df8a9  add more consts (etseidl, Jul 22, 2022)
- c6f3750  add a little more clarification to column_index_buffer_size() (etseidl, Jul 22, 2022)
- 17b3389  make compare constexpr per suggestion (etseidl, Jul 25, 2022)
1 change: 1 addition & 0 deletions cpp/include/cudf/io/types.hpp
@@ -94,6 +94,7 @@ enum statistics_freq {
STATISTICS_NONE = 0, ///< No column statistics
STATISTICS_ROWGROUP = 1, ///< Per-Rowgroup column statistics
STATISTICS_PAGE = 2, ///< Per-page column statistics
STATISTICS_COLUMN = 3, ///< Full column and offset indices. Implies STATISTICS_ROWGROUP
};

/**
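For reference, requesting the new level goes through the writer options. A minimal host-side sketch, assuming the public cudf::io API (the sink path, table, and function name are placeholders):

#include <cudf/io/parquet.hpp>
#include <cudf/io/types.hpp>
#include <cudf/table/table_view.hpp>

// Write `tbl` with full column/offset indexes. STATISTICS_COLUMN implies
// per-rowgroup statistics as well (see the enum above).
void write_with_column_indexes(cudf::table_view const& tbl)
{
  auto opts = cudf::io::parquet_writer_options::builder(
                cudf::io::sink_info{"example.parquet"}, tbl)
                .stats_level(cudf::io::statistics_freq::STATISTICS_COLUMN)
                .build();
  cudf::io::write_parquet(opts);
}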
276 changes: 246 additions & 30 deletions cpp/src/io/parquet/page_enc.cu
@@ -1171,6 +1171,12 @@ __global__ void __launch_bounds__(128) gpuDecideCompression(device_span<EncColum…
/**
* Minimal thrift compact protocol support
*/
inline __device__ uint8_t* cpw_put_uint8(uint8_t* p, uint8_t v)
{
*p++ = v;
return p;
}

inline __device__ uint8_t* cpw_put_uint32(uint8_t* p, uint32_t v)
{
while (v > 0x7f) {
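The body of cpw_put_uint32 is cut off at this point in the diff; it emits the standard ULEB128 varint that thrift's compact protocol uses for lengths, field deltas, and integer values. A self-contained host-side sketch of the same encoding (the name put_uleb128 is illustrative, not from the patch):

#include <cstdint>
#include <cstdio>

// ULEB128: emit the value 7 bits at a time, low bits first; the high bit of
// each byte flags that more bytes follow.
uint8_t* put_uleb128(uint8_t* p, uint32_t v)
{
  while (v > 0x7f) {
    *p++ = static_cast<uint8_t>(v) | 0x80;  // low 7 bits plus continuation bit
    v >>= 7;
  }
  *p++ = static_cast<uint8_t>(v);  // final byte, continuation bit clear
  return p;
}

int main()
{
  uint8_t buf[5];
  auto* end = put_uleb128(buf, 300);  // 300 = 0b10'0101100 encodes to 0xac 0x02
  for (auto* q = buf; q != end; ++q) { printf("%02x ", *q); }
  return 0;
}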
@@ -1237,6 +1243,35 @@ class header_encoder {
current_field_index = field;
}

inline __device__ void field_list_begin(int field, size_t len, int type)
{
current_header_ptr = cpw_put_fldh(current_header_ptr, field, current_field_index, ST_FLD_LIST);
current_header_ptr = cpw_put_uint8(
current_header_ptr, static_cast<uint8_t>((std::min(len, size_t{0xfu}) << 4) | type));
if (len >= 0xf) { current_header_ptr = cpw_put_uint32(current_header_ptr, len); }
current_field_index = 0;
}
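  // Compact-protocol list header, as emitted by field_list_begin() above: one
  // byte holding (size << 4) | element_type. A size of 15 or more does not fit
  // in the nibble, so 0xf is stored and the true length follows as a ULEB128
  // varint; that is the `len >= 0xf` branch.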

inline __device__ void field_list_end(int field) { current_field_index = field; }

inline __device__ void put_bool(bool value)
{
current_header_ptr = cpw_put_uint8(current_header_ptr, value ? ST_FLD_TRUE : ST_FLD_FALSE);
}

inline __device__ void put_binary(const void* value, uint32_t length)
{
current_header_ptr = cpw_put_uint32(current_header_ptr, length);
memcpy(current_header_ptr, value, length);
current_header_ptr += length;
}

template <typename T>
inline __device__ void put_int64(T value)
{
current_header_ptr = cpw_put_int64(current_header_ptr, static_cast<int64_t>(value));
}

template <typename T>
inline __device__ void field_int32(int field, T value)
{
@@ -1286,12 +1321,13 @@ static __device__ void byte_reverse128(__int128_t v, void* dst)
d_char_ptr);
}

-__device__ uint8_t* EncodeStatistics(uint8_t* start,
-                                     const statistics_chunk* s,
-                                     uint8_t dtype,
-                                     void* scratch)
+__device__ void get_extremum(const statistics_val* stats_val,
+                             statistics_dtype dtype,
+                             void* scratch,
+                             const void** val,
+                             uint32_t* len)
 {
-  uint8_t *end, dtype_len;
+  uint8_t dtype_len;
switch (dtype) {
case dtype_bool: dtype_len = 1; break;
case dtype_int8:
@@ -1307,37 +1343,40 @@ __device__ uint8_t* EncodeStatistics(uint8_t* start,
case dtype_string:
default: dtype_len = 0; break;
}

if (dtype == dtype_string) {
*len = stats_val->str_val.length;
*val = stats_val->str_val.ptr;
} else {
*len = dtype_len;
if (dtype == dtype_float32) { // Convert from double to float32
auto const fp_scratch = static_cast<float*>(scratch);
fp_scratch[0] = stats_val->fp_val;
*val = scratch;
} else if (dtype == dtype_decimal128) {
byte_reverse128(stats_val->d128_val, scratch);
*val = scratch;
} else {
*val = stats_val;
}
}
}
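// Note on scratch: page statistics hold floating-point extrema as doubles, so
// FLOAT columns are narrowed back to 4 bytes through scratch before being
// serialized, and decimal128 extrema are byte-reversed into scratch because
// Parquet stores FIXED_LEN_BYTE_ARRAY decimals in big-endian order.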

__device__ uint8_t* EncodeStatistics(uint8_t* start,
const statistics_chunk* s,
statistics_dtype dtype,
void* scratch)
{
uint8_t* end;
header_encoder encoder(start);
encoder.field_int64(3, s->null_count);
if (s->has_minmax) {
const void *vmin, *vmax;
uint32_t lmin, lmax;

-    if (dtype == dtype_string) {
-      lmin = s->min_value.str_val.length;
-      vmin = s->min_value.str_val.ptr;
-      lmax = s->max_value.str_val.length;
-      vmax = s->max_value.str_val.ptr;
-    } else {
-      lmin = lmax = dtype_len;
-      if (dtype == dtype_float32) {  // Convert from double to float32
-        auto const fp_scratch = static_cast<float*>(scratch);
-        fp_scratch[0]         = s->min_value.fp_val;
-        fp_scratch[1]         = s->max_value.fp_val;
-        vmin                  = &fp_scratch[0];
-        vmax                  = &fp_scratch[1];
-      } else if (dtype == dtype_decimal128) {
-        auto const d128_scratch = static_cast<uint8_t*>(scratch);
-        byte_reverse128(s->min_value.d128_val, d128_scratch);
-        byte_reverse128(s->max_value.d128_val, &d128_scratch[16]);
-        vmin = &d128_scratch[0];
-        vmax = &d128_scratch[16];
-      } else {
-        vmin = &s->min_value;
-        vmax = &s->max_value;
-      }
-    }
+    get_extremum(&s->max_value, dtype, scratch, &vmax, &lmax);
     encoder.field_binary(5, vmax, lmax);
+    get_extremum(&s->min_value, dtype, scratch, &vmin, &lmin);
     encoder.field_binary(6, vmin, lmin);
}
encoder.end(&end);
Expand All @@ -1355,7 +1394,7 @@ __global__ void __launch_bounds__(128)
__shared__ __align__(8) parquet_column_device_view col_g;
__shared__ __align__(8) EncColumnChunk ck_g;
__shared__ __align__(8) EncPage page_g;
-  __shared__ __align__(8) unsigned char scratch[32];
+  __shared__ __align__(8) unsigned char scratch[16];
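  // (16 bytes now suffices: get_extremum stages one extremum at a time, so at
  // most a single decimal128 value occupies scratch, where the old code staged
  // min and max together.)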

uint32_t t = threadIdx.x;

@@ -1479,6 +1518,176 @@ __global__ void __launch_bounds__(1024)
}
}

/**
* @brief Tests if statistics are comparable given the column's
* physical and converted types
*/
static __device__ bool is_comparable(Type ptype, ConvertedType ctype)
{
switch (ptype) {
case Type::BOOLEAN:
case Type::INT32:
case Type::INT64:
case Type::FLOAT:
case Type::DOUBLE:
case Type::BYTE_ARRAY: return true;
case Type::FIXED_LEN_BYTE_ARRAY:
if (ctype == ConvertedType::DECIMAL) { return true; }
[[fallthrough]];
default: return false;
}
}

/**
* @brief Compares two values.
* @return -1 if v1 < v2, 0 if v1 == v2, 1 if v1 > v2
*/
template <typename T>
constexpr __device__ int32_t compare(T& v1, T& v2)
{
return (v1 > v2) - (v1 < v2);
}
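// Subtracting the two predicates yields -1, 0, or 1 without a data-dependent
// branch, e.g. v1 = 3, v2 = 7 gives 0 - 1 == -1.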

/**
* @brief Compares two statistics_val structs.
* @return < 0 if v1 < v2, 0 if v1 == v2, > 0 if v1 > v2
*/
static __device__ int32_t compare_values(Type ptype,
ConvertedType ctype,
const statistics_val& v1,
const statistics_val& v2)
{
switch (ptype) {
case Type::BOOLEAN: return compare(v1.u_val, v2.u_val);
case Type::INT32:
case Type::INT64:
switch (ctype) {
case ConvertedType::UINT_8:
case ConvertedType::UINT_16:
case ConvertedType::UINT_32:
case ConvertedType::UINT_64: return compare(v1.u_val, v2.u_val);
default: // assume everything else is signed
return compare(v1.i_val, v2.i_val);
}
case Type::FLOAT:
case Type::DOUBLE: return compare(v1.fp_val, v2.fp_val);
case Type::BYTE_ARRAY: return static_cast<string_view>(v1.str_val).compare(v2.str_val);
[Review thread]
Contributor: The race is on to see who can merge first!
Author (etseidl): LOL
Author (etseidl): I think you're going to win...there's so much to do still. >.<
Contributor: you haven't seen the state of my PRs yet!

case Type::FIXED_LEN_BYTE_ARRAY:
if (ctype == ConvertedType::DECIMAL) { return compare(v1.d128_val, v2.d128_val); }
}
// calling is_comparable() should prevent reaching here
CUDF_UNREACHABLE("Trying to compare non-comparable type");
return 0;
}
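// The unsigned branches above matter: e.g. a UINT_64 value of 2^63 read
// through i_val would compare as negative and invert the page ordering.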

/**
* @brief Determine if a set of statistics is in ascending order.
*/
static __device__ bool is_ascending(const statistics_chunk* s,
Type ptype,
ConvertedType ctype,
uint32_t num_pages)
{
for (uint32_t i = 1; i < num_pages; i++) {
if (compare_values(ptype, ctype, s[i - 1].min_value, s[i].min_value) > 0 ||
compare_values(ptype, ctype, s[i - 1].max_value, s[i].max_value) > 0) {
return false;
}
}
return true;
}

/**
* @brief Determine if a set of statistics is in descending order.
*/
static __device__ bool is_descending(const statistics_chunk* s,
Type ptype,
ConvertedType ctype,
uint32_t num_pages)
{
for (uint32_t i = 1; i < num_pages; i++) {
if (compare_values(ptype, ctype, s[i - 1].min_value, s[i].min_value) < 0 ||
compare_values(ptype, ctype, s[i - 1].max_value, s[i].max_value) < 0) {
return false;
}
}
return true;
}

/**
* @brief Determine the ordering of a set of statistics.
*/
static __device__ int32_t calculate_boundary_order(const statistics_chunk* s,
Type ptype,
ConvertedType ctype,
uint32_t num_pages)
{
if (not is_comparable(ptype, ctype)) { return BoundaryOrder::UNORDERED; }
if (is_ascending(s, ptype, ctype, num_pages)) {
return BoundaryOrder::ASCENDING;
} else if (is_descending(s, ptype, ctype, num_pages)) {
return BoundaryOrder::DESCENDING;
}
return BoundaryOrder::UNORDERED;
}
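// Readers can binary-search the per-page min/max lists when boundary_order is
// ASCENDING or DESCENDING; UNORDERED (also the required answer for types with
// no defined ordering) forces a linear scan over the pages.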

// blockDim(1, 1, 1)
__global__ void __launch_bounds__(1)
gpuEncodeColumnIndexes(device_span<EncColumnChunk> chunks,
device_span<statistics_chunk const> column_stats)
[Review thread on this kernel]
Contributor: So this kernel gets invoked as a single thread, but it seems to be doing per-page work. Is the number of pages ever large enough that it might make sense to parallelize this work a bit? Like maybe:

  • 1024 threads in 1 block, 1 thread per page storing its result in shared memory, with t0 writing things out at the end
  • A 2-kernel arrangement that goes wide on the pages, stores intermediates in global memory, and then does a second pass for the writing?

It doesn't look like there's too much work being done per page, but maybe if we have a zillion pages there are some wins.

Author (etseidl): This is a bit like gpuEncodePageHeaders, where t0 does all the work and the other threads just wait for the sync at the end. In the profiling I've done, the cost of this step is very small (1 ms vs. 37 ms for encode pages and 400 ms for snap_kernel). I don't know if the juice would be worth the squeeze. Can definitely revisit later if need be.

Contributor: thumbs up for the phrase juice worth the squeeze...
{
const void *vmin, *vmax;
uint32_t lmin, lmax;
uint8_t* col_idx_end;
unsigned char scratch[16];

if (column_stats.empty()) { return; }

EncColumnChunk* ck_g = &chunks[blockIdx.x];
uint32_t num_pages = ck_g->num_pages;
parquet_column_device_view col_g = *ck_g->col_desc;
size_t first_data_page = ck_g->use_dictionary ? 1 : 0;
uint32_t pageidx = ck_g->first_page;

header_encoder encoder(ck_g->column_index_blob);

// null_pages
encoder.field_list_begin(1, num_pages - first_data_page, ST_FLD_TRUE);
for (uint32_t page = first_data_page; page < num_pages; page++) {
encoder.put_bool(column_stats[pageidx + page].non_nulls == 0);
}
encoder.field_list_end(1);
// min_values
encoder.field_list_begin(2, num_pages - first_data_page, ST_FLD_BINARY);
for (uint32_t page = first_data_page; page < num_pages; page++) {
get_extremum(&column_stats[pageidx + page].min_value, col_g.stats_dtype, scratch, &vmin, &lmin);
encoder.put_binary(vmin, lmin);
}
encoder.field_list_end(2);
// max_values
encoder.field_list_begin(3, num_pages - first_data_page, ST_FLD_BINARY);
for (uint32_t page = first_data_page; page < num_pages; page++) {
get_extremum(&column_stats[pageidx + page].max_value, col_g.stats_dtype, scratch, &vmax, &lmax);
encoder.put_binary(vmax, lmax);
}
encoder.field_list_end(3);
// boundary_order
encoder.field_int32(4,
calculate_boundary_order(&column_stats[first_data_page + pageidx],
col_g.physical_type,
col_g.converted_type,
num_pages - first_data_page));
// null_counts
encoder.field_list_begin(5, num_pages - first_data_page, ST_FLD_I64);
for (uint32_t page = first_data_page; page < num_pages; page++) {
encoder.put_int64(column_stats[pageidx + page].null_count);
}
encoder.field_list_end(5);
encoder.end(&col_idx_end, false);

ck_g->column_index_size = static_cast<uint32_t>(col_idx_end - ck_g->column_index_blob);
}
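// The field ids above follow the parquet-format ColumnIndex thrift struct:
//   1: list<bool>   null_pages      2: list<binary>  min_values
//   3: list<binary> max_values      4: BoundaryOrder boundary_order
//   5: list<i64>    null_counts
// Dictionary pages carry no page-level statistics, which is why encoding
// starts at first_data_page when the chunk uses a dictionary.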

/**
* @brief Functor to get definition level value for a nested struct column until the leaf level or
* the first list level.
@@ -2067,6 +2276,13 @@ void GatherPages(device_span<EncColumnChunk> chunks,
gpuGatherPages<<<chunks.size(), 1024, 0, stream.value()>>>(chunks, pages);
}

void EncodeColumnIndexes(device_span<EncColumnChunk> chunks,
device_span<statistics_chunk const> column_stats,
rmm::cuda_stream_view stream)
{
gpuEncodeColumnIndexes<<<chunks.size(), 1, 0, stream.value()>>>(chunks, column_stats);
}
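// Launch shape: one block per column chunk, a single thread per block. Per the
// review discussion above, this step profiled at roughly 1 ms, so a wider
// launch was left as possible future work.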

} // namespace gpu
} // namespace parquet
} // namespace io
17 changes: 15 additions & 2 deletions cpp/src/io/parquet/parquet_gpu.hpp
@@ -237,8 +237,8 @@ struct ColumnChunkDesc {
* @brief Struct describing an encoder column
*/
struct parquet_column_device_view : stats_column_desc {
-  Type physical_type;        //!< physical data type
-  uint8_t converted_type;    //!< logical data type
+  Type physical_type;            //!< physical data type
+  ConvertedType converted_type;  //!< logical data type
uint8_t level_bits; //!< bits to encode max definition (lower nibble) & repetition (upper nibble)
//!< levels
constexpr uint8_t num_def_level_bits() { return level_bits & 0xf; }
@@ -358,6 +358,8 @@ struct EncColumnChunk {
uint16_t* dict_index; //!< Index of value in dictionary page. column[dict_data[dict_index[row]]]
uint8_t dict_rle_bits; //!< Bit size for encoding dictionary indices
bool use_dictionary; //!< True if the chunk uses dictionary encoding
uint8_t* column_index_blob; //!< Binary blob containing encoded column index for this chunk
uint32_t column_index_size; //!< Size of column index blob
};

/**
@@ -632,6 +634,17 @@ void GatherPages(device_span<EncColumnChunk> chunks,
device_span<gpu::EncPage const> pages,
rmm::cuda_stream_view stream);

/**
* @brief Launches kernel to calculate ColumnIndex information per chunk
*
* @param[in,out] chunks Column chunks
* @param[in] column_stats Page-level statistics to be encoded
* @param[in] stream CUDA stream to use
*/
void EncodeColumnIndexes(device_span<EncColumnChunk> chunks,
device_span<statistics_chunk const> column_stats,
rmm::cuda_stream_view stream);

} // namespace gpu
} // namespace parquet
} // namespace io