Add column indexes to Parquet writer (#11302)
Closes #9268.

The column indexes are actually two different structures: the column index itself, which is essentially per-page min/max statistics, and the offset index, which stores each page's location, compressed size, and first row index. Since the column index contains information already in the EncColumnChunk structure, I calculate and encode the column index per chunk on device, storing the result in a blob I added to the EncColumnChunk struct. The offset index requires information available only after writing the file, so it is created on the CPU and stored in the aggregate_writer_metadata struct. The indexes themselves are then written to the file before the footer.
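
For reference, the Parquet specification defines the two structures roughly as follows (paraphrased here as C++ rather than thrift, for orientation only; the field numbers in comments match those used by the device encoder below):

struct ColumnIndex {
  std::vector<bool> null_pages;                  // (1) true if the page is all nulls
  std::vector<std::vector<uint8_t>> min_values;  // (2) per-page lower bounds
  std::vector<std::vector<uint8_t>> max_values;  // (3) per-page upper bounds
  BoundaryOrder boundary_order;                  // (4) ordering of the min/max values
  std::vector<int64_t> null_counts;              // (5) per-page null counts
};

struct PageLocation {
  int64_t offset;                // (1) file offset of the page header
  int32_t compressed_page_size;  // (2) compressed size, including the header
  int64_t first_row_index;       // (3) row-group-relative index of the page's first row
};

struct OffsetIndex {
  std::vector<PageLocation> page_locations;  // (1) one entry per data page
};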

The current implementation does not yet truncate the statistics, as the Parquet specification recommends. This will be addressed in a later PR.

Authors:
  - Ed Seidl (https://github.com/etseidl)

Approvers:
  - Yunsong Wang (https://github.com/PointKernel)
  - https://github.com/nvdbaranec
  - Mike Wilson (https://github.com/hyperbolic2346)

URL: #11302
etseidl authored Jul 26, 2022
1 parent 8faf2b0 commit 96f747b
Showing 6 changed files with 1,190 additions and 77 deletions.
1 change: 1 addition & 0 deletions cpp/include/cudf/io/types.hpp
@@ -94,6 +94,7 @@ enum statistics_freq {
STATISTICS_NONE = 0, ///< No column statistics
STATISTICS_ROWGROUP = 1, ///< Per-Rowgroup column statistics
STATISTICS_PAGE = 2, ///< Per-page column statistics
STATISTICS_COLUMN = 3, ///< Full column and offset indices. Implies STATISTICS_ROWGROUP
};

/**
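For callers, opting in is a one-line change. A minimal sketch, assuming the existing stats_level option on parquet_writer_options (the sink path and tbl are placeholders):

// Request full column/offset indexes when writing (hedged sketch).
auto sink    = cudf::io::sink_info{"out.parquet"};  // placeholder path
auto options = cudf::io::parquet_writer_options::builder(sink, tbl->view())
                 .stats_level(cudf::io::statistics_freq::STATISTICS_COLUMN)
                 .build();
cudf::io::write_parquet(options);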
276 changes: 246 additions & 30 deletions cpp/src/io/parquet/page_enc.cu
@@ -1157,6 +1157,12 @@ __global__ void __launch_bounds__(128) gpuDecideCompression(device_span<EncColumnChunk> chunks)
/**
* Minimal thrift compact protocol support
*/
inline __device__ uint8_t* cpw_put_uint8(uint8_t* p, uint8_t v)
{
*p++ = v;
return p;
}

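// Writes v as a thrift compact-protocol unsigned varint (ULEB128): 7 bits per
// byte, least-significant group first, high bit set on all but the last byte.
// For example, 300 (0b1'0010'1100) encodes as 0xAC 0x02.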
inline __device__ uint8_t* cpw_put_uint32(uint8_t* p, uint32_t v)
{
while (v > 0x7f) {
@@ -1223,6 +1229,35 @@ class header_encoder {
current_field_index = field;
}

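// Emits a compact-protocol list field header. For fewer than 15 elements the
// element count is packed into the high nibble of a single byte alongside the
// element type; otherwise the nibble is 0xF and the count follows as a varint.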
inline __device__ void field_list_begin(int field, size_t len, int type)
{
current_header_ptr = cpw_put_fldh(current_header_ptr, field, current_field_index, ST_FLD_LIST);
current_header_ptr = cpw_put_uint8(
current_header_ptr, static_cast<uint8_t>((std::min(len, size_t{0xfu}) << 4) | type));
if (len >= 0xf) { current_header_ptr = cpw_put_uint32(current_header_ptr, len); }
current_field_index = 0;
}

inline __device__ void field_list_end(int field) { current_field_index = field; }

inline __device__ void put_bool(bool value)
{
current_header_ptr = cpw_put_uint8(current_header_ptr, value ? ST_FLD_TRUE : ST_FLD_FALSE);
}

inline __device__ void put_binary(const void* value, uint32_t length)
{
current_header_ptr = cpw_put_uint32(current_header_ptr, length);
memcpy(current_header_ptr, value, length);
current_header_ptr += length;
}

template <typename T>
inline __device__ void put_int64(T value)
{
current_header_ptr = cpw_put_int64(current_header_ptr, static_cast<int64_t>(value));
}

template <typename T>
inline __device__ void field_int32(int field, T value)
{
@@ -1272,12 +1307,13 @@ static __device__ void byte_reverse128(__int128_t v, void* dst)
d_char_ptr);
}

__device__ uint8_t* EncodeStatistics(uint8_t* start,
const statistics_chunk* s,
uint8_t dtype,
void* scratch)
__device__ void get_extremum(const statistics_val* stats_val,
statistics_dtype dtype,
void* scratch,
const void** val,
uint32_t* len)
{
uint8_t *end, dtype_len;
uint8_t dtype_len;
switch (dtype) {
case dtype_bool: dtype_len = 1; break;
case dtype_int8:
@@ -1293,37 +1329,40 @@ __device__ uint8_t* EncodeStatistics(uint8_t* start,
case dtype_string:
default: dtype_len = 0; break;
}

if (dtype == dtype_string) {
*len = stats_val->str_val.length;
*val = stats_val->str_val.ptr;
} else {
*len = dtype_len;
if (dtype == dtype_float32) { // Convert from double to float32
auto const fp_scratch = static_cast<float*>(scratch);
fp_scratch[0] = stats_val->fp_val;
*val = scratch;
} else if (dtype == dtype_decimal128) {
byte_reverse128(stats_val->d128_val, scratch);
*val = scratch;
} else {
*val = stats_val;
}
}
}

__device__ uint8_t* EncodeStatistics(uint8_t* start,
const statistics_chunk* s,
statistics_dtype dtype,
void* scratch)
{
uint8_t* end;
header_encoder encoder(start);
encoder.field_int64(3, s->null_count);
if (s->has_minmax) {
const void *vmin, *vmax;
uint32_t lmin, lmax;

if (dtype == dtype_string) {
lmin = s->min_value.str_val.length;
vmin = s->min_value.str_val.ptr;
lmax = s->max_value.str_val.length;
vmax = s->max_value.str_val.ptr;
} else {
lmin = lmax = dtype_len;
if (dtype == dtype_float32) { // Convert from double to float32
auto const fp_scratch = static_cast<float*>(scratch);
fp_scratch[0] = s->min_value.fp_val;
fp_scratch[1] = s->max_value.fp_val;
vmin = &fp_scratch[0];
vmax = &fp_scratch[1];
} else if (dtype == dtype_decimal128) {
auto const d128_scratch = static_cast<uint8_t*>(scratch);
byte_reverse128(s->min_value.d128_val, d128_scratch);
byte_reverse128(s->max_value.d128_val, &d128_scratch[16]);
vmin = &d128_scratch[0];
vmax = &d128_scratch[16];
} else {
vmin = &s->min_value;
vmax = &s->max_value;
}
}
get_extremum(&s->max_value, dtype, scratch, &vmax, &lmax);
encoder.field_binary(5, vmax, lmax);
get_extremum(&s->min_value, dtype, scratch, &vmin, &lmin);
encoder.field_binary(6, vmin, lmin);
}
encoder.end(&end);
Expand All @@ -1341,7 +1380,7 @@ __global__ void __launch_bounds__(128)
__shared__ __align__(8) parquet_column_device_view col_g;
__shared__ __align__(8) EncColumnChunk ck_g;
__shared__ __align__(8) EncPage page_g;
__shared__ __align__(8) unsigned char scratch[32];
__shared__ __align__(8) unsigned char scratch[16];

uint32_t t = threadIdx.x;

@@ -1465,6 +1504,176 @@ __global__ void __launch_bounds__(1024)
}
}

/**
* @brief Tests if statistics are comparable given the column's
* physical and converted types
*/
static __device__ bool is_comparable(Type ptype, ConvertedType ctype)
{
switch (ptype) {
case Type::BOOLEAN:
case Type::INT32:
case Type::INT64:
case Type::FLOAT:
case Type::DOUBLE:
case Type::BYTE_ARRAY: return true;
case Type::FIXED_LEN_BYTE_ARRAY:
if (ctype == ConvertedType::DECIMAL) { return true; }
[[fallthrough]];
default: return false;
}
}

/**
* @brief Compares two values.
* @return -1 if v1 < v2, 0 if v1 == v2, 1 if v1 > v2
*/
template <typename T>
constexpr __device__ int32_t compare(T& v1, T& v2)
{
// Branchless three-way comparison; unlike v1 - v2, it cannot overflow.
return (v1 > v2) - (v1 < v2);
}

/**
* @brief Compares two statistics_val structs.
* @return < 0 if v1 < v2, 0 if v1 == v2, > 0 if v1 > v2
*/
static __device__ int32_t compare_values(Type ptype,
ConvertedType ctype,
const statistics_val& v1,
const statistics_val& v2)
{
switch (ptype) {
case Type::BOOLEAN: return compare(v1.u_val, v2.u_val);
case Type::INT32:
case Type::INT64:
switch (ctype) {
case ConvertedType::UINT_8:
case ConvertedType::UINT_16:
case ConvertedType::UINT_32:
case ConvertedType::UINT_64: return compare(v1.u_val, v2.u_val);
default: // assume everything else is signed
return compare(v1.i_val, v2.i_val);
}
case Type::FLOAT:
case Type::DOUBLE: return compare(v1.fp_val, v2.fp_val);
case Type::BYTE_ARRAY: return static_cast<string_view>(v1.str_val).compare(v2.str_val);
case Type::FIXED_LEN_BYTE_ARRAY:
if (ctype == ConvertedType::DECIMAL) { return compare(v1.d128_val, v2.d128_val); }
}
// calling is_comparable() should prevent reaching here
CUDF_UNREACHABLE("Trying to compare non-comparable type");
return 0;
}

/**
 * @brief Determine if a set of statistics is in ascending order.
 */
static __device__ bool is_ascending(const statistics_chunk* s,
Type ptype,
ConvertedType ctype,
uint32_t num_pages)
{
for (uint32_t i = 1; i < num_pages; i++) {
if (compare_values(ptype, ctype, s[i - 1].min_value, s[i].min_value) > 0 ||
compare_values(ptype, ctype, s[i - 1].max_value, s[i].max_value) > 0) {
return false;
}
}
return true;
}

/**
 * @brief Determine if a set of statistics is in descending order.
 */
static __device__ bool is_descending(const statistics_chunk* s,
Type ptype,
ConvertedType ctype,
uint32_t num_pages)
{
for (uint32_t i = 1; i < num_pages; i++) {
if (compare_values(ptype, ctype, s[i - 1].min_value, s[i].min_value) < 0 ||
compare_values(ptype, ctype, s[i - 1].max_value, s[i].max_value) < 0) {
return false;
}
}
return true;
}

/**
 * @brief Determine the ordering of a set of statistics.
 *
 * For example, page (min,max) pairs (1,5), (2,6), (3,7) yield ASCENDING and
 * the reversed sequence yields DESCENDING; mixed arrangements (and types that
 * are not comparable) yield UNORDERED.
 */
static __device__ int32_t calculate_boundary_order(const statistics_chunk* s,
Type ptype,
ConvertedType ctype,
uint32_t num_pages)
{
if (not is_comparable(ptype, ctype)) { return BoundaryOrder::UNORDERED; }
if (is_ascending(s, ptype, ctype, num_pages)) {
return BoundaryOrder::ASCENDING;
} else if (is_descending(s, ptype, ctype, num_pages)) {
return BoundaryOrder::DESCENDING;
}
return BoundaryOrder::UNORDERED;
}

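/**
 * @brief Encode the ColumnIndex for each column chunk.
 *
 * Launched with one single-threaded block per chunk; the thread serializes the
 * chunk's per-page statistics into the thrift-compact blob pointed to by
 * column_index_blob. The dictionary page, if present, is skipped since it
 * carries no row data.
 */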
// blockDim(1, 1, 1)
__global__ void __launch_bounds__(1)
gpuEncodeColumnIndexes(device_span<EncColumnChunk> chunks,
device_span<statistics_chunk const> column_stats)
{
const void *vmin, *vmax;
uint32_t lmin, lmax;
uint8_t* col_idx_end;
unsigned char scratch[16];

if (column_stats.empty()) { return; }

EncColumnChunk* ck_g = &chunks[blockIdx.x];
uint32_t num_pages = ck_g->num_pages;
parquet_column_device_view col_g = *ck_g->col_desc;
size_t first_data_page = ck_g->use_dictionary ? 1 : 0;
uint32_t pageidx = ck_g->first_page;

header_encoder encoder(ck_g->column_index_blob);

// null_pages
encoder.field_list_begin(1, num_pages - first_data_page, ST_FLD_TRUE);
for (uint32_t page = first_data_page; page < num_pages; page++) {
encoder.put_bool(column_stats[pageidx + page].non_nulls == 0);
}
encoder.field_list_end(1);
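// Worked example (assuming ST_FLD_TRUE == 1 and ST_FLD_FALSE == 2, per the
// compact protocol): a chunk with two data pages that both contain values
// emits 0x19 0x21 0x02 0x02 above: field header (id delta 1, type LIST), list
// header (size 2, element type BOOLEAN_TRUE), then two FALSE elements.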
// min_values
encoder.field_list_begin(2, num_pages - first_data_page, ST_FLD_BINARY);
for (uint32_t page = first_data_page; page < num_pages; page++) {
get_extremum(&column_stats[pageidx + page].min_value, col_g.stats_dtype, scratch, &vmin, &lmin);
encoder.put_binary(vmin, lmin);
}
encoder.field_list_end(2);
// max_values
encoder.field_list_begin(3, num_pages - first_data_page, ST_FLD_BINARY);
for (uint32_t page = first_data_page; page < num_pages; page++) {
get_extremum(&column_stats[pageidx + page].max_value, col_g.stats_dtype, scratch, &vmax, &lmax);
encoder.put_binary(vmax, lmax);
}
encoder.field_list_end(3);
// boundary_order
encoder.field_int32(4,
calculate_boundary_order(&column_stats[first_data_page + pageidx],
col_g.physical_type,
col_g.converted_type,
num_pages - first_data_page));
// null_counts
encoder.field_list_begin(5, num_pages - first_data_page, ST_FLD_I64);
for (uint32_t page = first_data_page; page < num_pages; page++) {
encoder.put_int64(column_stats[pageidx + page].null_count);
}
encoder.field_list_end(5);
encoder.end(&col_idx_end, false);

ck_g->column_index_size = static_cast<uint32_t>(col_idx_end - ck_g->column_index_blob);
}

/**
* @brief Functor to get definition level value for a nested struct column until the leaf level or
* the first list level.
@@ -2053,6 +2262,13 @@ void GatherPages(device_span<EncColumnChunk> chunks,
gpuGatherPages<<<chunks.size(), 1024, 0, stream.value()>>>(chunks, pages);
}

void EncodeColumnIndexes(device_span<EncColumnChunk> chunks,
device_span<statistics_chunk const> column_stats,
rmm::cuda_stream_view stream)
{
gpuEncodeColumnIndexes<<<chunks.size(), 1, 0, stream.value()>>>(chunks, column_stats);
}

} // namespace gpu
} // namespace parquet
} // namespace io
17 changes: 15 additions & 2 deletions cpp/src/io/parquet/parquet_gpu.hpp
@@ -236,8 +236,8 @@ struct ColumnChunkDesc {
* @brief Struct describing an encoder column
*/
struct parquet_column_device_view : stats_column_desc {
Type physical_type; //!< physical data type
uint8_t converted_type; //!< logical data type
Type physical_type; //!< physical data type
ConvertedType converted_type; //!< logical data type
uint8_t level_bits; //!< bits to encode max definition (lower nibble) & repetition (upper nibble)
//!< levels
constexpr uint8_t num_def_level_bits() { return level_bits & 0xf; }
@@ -357,6 +357,8 @@ struct EncColumnChunk {
size_type* dict_index; //!< Index of value in dictionary page. column[dict_data[dict_index[row]]]
uint8_t dict_rle_bits; //!< Bit size for encoding dictionary indices
bool use_dictionary; //!< True if the chunk uses dictionary encoding
uint8_t* column_index_blob; //!< Binary blob containing encoded column index for this chunk
uint32_t column_index_size; //!< Size of column index blob
};

/**
@@ -634,6 +636,17 @@ void GatherPages(device_span<EncColumnChunk> chunks,
device_span<gpu::EncPage const> pages,
rmm::cuda_stream_view stream);

/**
* @brief Launches kernel to calculate ColumnIndex information per chunk
*
* @param[in,out] chunks Column chunks
* @param[in] column_stats Page-level statistics to be encoded
* @param[in] stream CUDA stream to use
*/
void EncodeColumnIndexes(device_span<EncColumnChunk> chunks,
device_span<statistics_chunk const> column_stats,
rmm::cuda_stream_view stream);
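
// Hedged sketch of the host-side flow described in the PR, for orientation:
//   1. gather per-page statistics (column_stats)
//   2. EncodeColumnIndexes() fills each chunk's column_index_blob on device
//   3. after the pages are written, build the OffsetIndex on the CPU from the
//      recorded page locations (kept in aggregate_writer_metadata)
//   4. serialize both indexes into the file just before the footer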

} // namespace gpu
} // namespace parquet
} // namespace io
