Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/branch-24.04' into rm/pandas_cre
Browse files Browse the repository at this point in the history
  • Loading branch information
mroeschke committed Jan 31, 2024
2 parents d3ba9ab + bb59715 commit 86a5f0f
Show file tree
Hide file tree
Showing 125 changed files with 5,214 additions and 5,696 deletions.
6 changes: 6 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,12 @@ repos:
entry: '(category=|\s)DeprecationWarning[,)]'
language: pygrep
types_or: [python, cython]
# We need to exclude just the following file because a few APIs still need
# DeprecationWarning: https://github.com/pandas-dev/pandas/issues/54970
exclude: |
(?x)^(
^python/cudf/cudf/core/dtypes.py
)
- id: no-programmatic-xfail
name: no-programmatic-xfail
description: 'Enforce that pytest.xfail is not introduced (see dev docs for details)'
Expand Down
4 changes: 2 additions & 2 deletions conda/environments/all_cuda-118_arch-x86_64.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,13 @@ dependencies:
- ninja
- notebook
- numba>=0.57
- numpy>=1.21,<1.25
- numpy>=1.21
- numpydoc
- nvcc_linux-64=11.8
- nvcomp==3.0.5
- nvtx>=0.2.1
- packaging
- pandas>=1.3,<1.6.0dev0
- pandas>=2.0,<2.1.5dev0
- pandoc
- pip
- pre-commit
Expand Down
4 changes: 2 additions & 2 deletions conda/environments/all_cuda-120_arch-x86_64.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,12 @@ dependencies:
- ninja
- notebook
- numba>=0.57
- numpy>=1.21,<1.25
- numpy>=1.21
- numpydoc
- nvcomp==3.0.5
- nvtx>=0.2.1
- packaging
- pandas>=1.3,<1.6.0dev0
- pandas>=2.0,<2.1.5dev0
- pandoc
- pip
- pre-commit
Expand Down
5 changes: 2 additions & 3 deletions conda/recipes/cudf/meta.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,11 @@ requirements:
- {{ pin_compatible('protobuf', min_pin='x.x', max_pin='x') }}
- python
- typing_extensions >=4.0.0
- pandas >=1.3,<1.6.0dev0
- pandas >=2.0,<2.1.5dev0
- cupy >=12.0.0
# TODO: Pin to numba<0.58 until #14160 is resolved
- numba >=0.57,<0.58
# TODO: Pin to numpy<1.25 until cudf requires pandas 2
- numpy >=1.21,<1.25
- numpy >=1.21
- {{ pin_compatible('pyarrow', max_pin='x') }}
- libcudf ={{ version }}
- {{ pin_compatible('rmm', max_pin='x.x') }}
Expand Down
129 changes: 82 additions & 47 deletions cpp/src/io/parquet/page_enc.cu
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,27 @@ __device__ uint8_t const* delta_encode(page_enc_state_s<0>* s, uint64_t* buffer,
return packer.flush();
}

/**
 * @brief Positions `s->cur` at the first byte of encoded page data.
 *
 * For a V1 data page, page data begins directly after the encoded repetition/definition
 * level streams (each prefixed by an RLE length field when present). For a V2 data page,
 * page data begins at the next aligned offset past the level data; the padding keeps the
 * compressor input aligned. Other page types (e.g. dictionary pages) carry no level data,
 * so the start is immediately after the page header region.
 */
template <typename state_type>
inline void __device__ set_page_data_start(state_type* s)
{
  auto* data_start = s->page.page_data + s->page.max_hdr_size;
  if (s->page.page_type == PageType::DATA_PAGE_V2) {
    // V2: level data occupies a padded region of max_lvl_size bytes
    data_start += s->page.max_lvl_size;
  } else if (s->page.page_type == PageType::DATA_PAGE) {
    // V1: skip encoded level bytes plus the RLE length field preceding each
    // def/rep stream that is actually present
    data_start += s->page.level_bytes();
    if (s->col.num_def_level_bits() != 0) { data_start += RLE_LENGTH_FIELD_LEN; }
    if (s->col.num_rep_level_bits() != 0) { data_start += RLE_LENGTH_FIELD_LEN; }
  }
  s->cur = data_start;
}

} // anonymous namespace

// blockDim {512,1,1}
Expand Down Expand Up @@ -594,8 +615,13 @@ CUDF_KERNEL void __launch_bounds__(128)
page_g.chunk = &chunks[blockIdx.y][blockIdx.x];
page_g.chunk_id = blockIdx.y * num_columns + blockIdx.x;
page_g.hdr_size = 0;
page_g.def_lvl_bytes = 0;
page_g.rep_lvl_bytes = 0;
page_g.max_lvl_size = 0;
page_g.comp_data_size = 0;
page_g.max_hdr_size = MAX_V1_HDR_SIZE;
page_g.max_data_size = ck_g.uniq_data_size;
page_g.data_size = ck_g.uniq_data_size;
page_g.start_row = cur_row;
page_g.num_rows = ck_g.num_dict_entries;
page_g.num_leaf_values = ck_g.num_dict_entries;
Expand Down Expand Up @@ -689,12 +715,17 @@ CUDF_KERNEL void __launch_bounds__(128)
page_size = 1 + max_RLE_page_size(ck_g.dict_rle_bits, values_in_page);
}
if (!t) {
page_g.num_fragments = fragments_in_chunk - page_start;
page_g.chunk = &chunks[blockIdx.y][blockIdx.x];
page_g.chunk_id = blockIdx.y * num_columns + blockIdx.x;
page_g.page_type = data_page_type;
page_g.hdr_size = 0;
page_g.max_hdr_size = max_data_page_hdr_size; // Max size excluding statistics
page_g.num_fragments = fragments_in_chunk - page_start;
page_g.chunk = &chunks[blockIdx.y][blockIdx.x];
page_g.chunk_id = blockIdx.y * num_columns + blockIdx.x;
page_g.page_type = data_page_type;
page_g.hdr_size = 0;
page_g.def_lvl_bytes = 0;
page_g.rep_lvl_bytes = 0;
page_g.max_lvl_size = 0;
page_g.data_size = 0;
page_g.comp_data_size = 0;
page_g.max_hdr_size = max_data_page_hdr_size; // Max size excluding statistics
if (ck_g.stats) {
uint32_t stats_hdr_len = 16;
if (col_g.stats_dtype == dtype_string || col_g.stats_dtype == dtype_byte_array) {
Expand All @@ -716,13 +747,19 @@ CUDF_KERNEL void __launch_bounds__(128)
page_g.num_valid = num_valid;
auto const def_level_size = max_RLE_page_size(col_g.num_def_level_bits(), values_in_page);
auto const rep_level_size = max_RLE_page_size(col_g.num_rep_level_bits(), values_in_page);
if (write_v2_headers) {
page_g.max_lvl_size =
util::round_up_unsafe(def_level_size + rep_level_size, page_align);
}
// get a different bound if using delta encoding
if (is_use_delta) {
auto const delta_len =
delta_data_len(physical_type, type_id, page_g.num_leaf_values, page_size);
page_size = max(page_size, delta_len);
}
auto const max_data_size = page_size + def_level_size + rep_level_size + rle_pad;
auto const max_data_size =
page_size + rle_pad +
(write_v2_headers ? page_g.max_lvl_size : def_level_size + rep_level_size);
// page size must fit in 32-bit signed integer
if (max_data_size > std::numeric_limits<int32_t>::max()) {
CUDF_UNREACHABLE("page size exceeds maximum for i32");
Expand All @@ -739,7 +776,9 @@ CUDF_KERNEL void __launch_bounds__(128)
page_offset +=
util::round_up_unsafe(page_g.max_hdr_size + page_g.max_data_size, page_align);
if (not comp_page_sizes.empty()) {
comp_page_offset += page_g.max_hdr_size + comp_page_sizes[ck_g.first_page + num_pages];
// V2 does not include level data in compressed size estimate
comp_page_offset += page_g.max_hdr_size + page_g.max_lvl_size +
comp_page_sizes[ck_g.first_page + num_pages];
}
page_headers_size += page_g.max_hdr_size;
max_page_data_size = max(max_page_data_size, page_g.max_data_size);
Expand Down Expand Up @@ -774,8 +813,10 @@ CUDF_KERNEL void __launch_bounds__(128)
}
pages[ck_g.first_page + num_pages] = page_g;
}
// page_sizes should be the number of bytes to be compressed, so don't include level
// data for V2.
if (not page_sizes.empty()) {
page_sizes[ck_g.first_page + num_pages] = page_g.max_data_size;
page_sizes[ck_g.first_page + num_pages] = page_g.max_data_size - page_g.max_lvl_size;
}
if (page_grstats) { page_grstats[ck_g.first_page + num_pages] = pagestats_g; }
}
Expand Down Expand Up @@ -1429,10 +1470,6 @@ __device__ void finish_page_encode(state_buf* s,
return thrust::reduce(thrust::seq, hist_start, hist_end, 0U);
};

// V2 does not compress rep and def level data
size_t const skip_comp_size =
write_v2_headers ? s->page.def_lvl_bytes + s->page.rep_lvl_bytes : 0;

// this will be true if max_rep > 0 (i.e. there are lists)
if (s->page.rep_histogram != nullptr) {
// for repetition we get hist[0] from num_rows, and can derive hist[max_rep_level]
Expand Down Expand Up @@ -1489,10 +1526,17 @@ __device__ void finish_page_encode(state_buf* s,
// FIXME(ets): this needs to do error propagation back to the host
CUDF_UNREACHABLE("detected possible page data corruption");
}
s->page.max_data_size = actual_data_size;
if (s->page.is_v2()) {
auto const d_base = base + s->page.max_lvl_size;
s->page.data_size = static_cast<uint32_t>(end_ptr - d_base) + s->page.level_bytes();
} else {
s->page.data_size = actual_data_size;
}
if (not comp_in.empty()) {
comp_in[blockIdx.x] = {base + skip_comp_size, actual_data_size - skip_comp_size};
comp_out[blockIdx.x] = {s->page.compressed_data + s->page.max_hdr_size + skip_comp_size,
auto const c_base = base + s->page.max_lvl_size;
auto const bytes_to_compress = static_cast<uint32_t>(end_ptr - c_base);
comp_in[blockIdx.x] = {c_base, bytes_to_compress};
comp_out[blockIdx.x] = {s->page.compressed_data + s->page.max_hdr_size + s->page.max_lvl_size,
0}; // size is unused
}
pages[blockIdx.x] = s->page;
Expand All @@ -1503,10 +1547,10 @@ __device__ void finish_page_encode(state_buf* s,
}

// copy uncompressed bytes over
if (skip_comp_size != 0 && not comp_in.empty()) {
if (s->page.is_v2() and not comp_in.empty()) {
uint8_t* const src = s->page.page_data + s->page.max_hdr_size;
uint8_t* const dst = s->page.compressed_data + s->page.max_hdr_size;
for (int i = t; i < skip_comp_size; i += block_size) {
for (int i = t; i < s->page.level_bytes(); i += block_size) {
dst[i] = src[i];
}
}
Expand Down Expand Up @@ -1536,13 +1580,7 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8)
s->col = *s->ck.col_desc;
s->rle_len_pos = nullptr;
// get s->cur back to where it was at the end of encoding the rep and def level data
s->cur =
s->page.page_data + s->page.max_hdr_size + s->page.def_lvl_bytes + s->page.rep_lvl_bytes;
// if V1 data page, need space for the RLE length fields
if (s->page.page_type == PageType::DATA_PAGE) {
if (s->col.num_def_level_bits() != 0) { s->cur += RLE_LENGTH_FIELD_LEN; }
if (s->col.num_rep_level_bits() != 0) { s->cur += RLE_LENGTH_FIELD_LEN; }
}
set_page_data_start(s);
}
__syncthreads();

Expand Down Expand Up @@ -1771,13 +1809,7 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8)
s->col = *s->ck.col_desc;
s->rle_len_pos = nullptr;
// get s->cur back to where it was at the end of encoding the rep and def level data
s->cur =
s->page.page_data + s->page.max_hdr_size + s->page.def_lvl_bytes + s->page.rep_lvl_bytes;
// if V1 data page, need space for the RLE length fields
if (s->page.page_type == PageType::DATA_PAGE) {
if (s->col.num_def_level_bits() != 0) { s->cur += RLE_LENGTH_FIELD_LEN; }
if (s->col.num_rep_level_bits() != 0) { s->cur += RLE_LENGTH_FIELD_LEN; }
}
set_page_data_start(s);
}
__syncthreads();

Expand Down Expand Up @@ -1908,8 +1940,7 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8)
s->col = *s->ck.col_desc;
s->rle_len_pos = nullptr;
// get s->cur back to where it was at the end of encoding the rep and def level data
s->cur =
s->page.page_data + s->page.max_hdr_size + s->page.def_lvl_bytes + s->page.rep_lvl_bytes;
set_page_data_start(s);
}
__syncthreads();

Expand Down Expand Up @@ -2017,8 +2048,7 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8)
s->col = *s->ck.col_desc;
s->rle_len_pos = nullptr;
// get s->cur back to where it was at the end of encoding the rep and def level data
s->cur =
s->page.page_data + s->page.max_hdr_size + s->page.def_lvl_bytes + s->page.rep_lvl_bytes;
set_page_data_start(s);
}
__syncthreads();

Expand Down Expand Up @@ -2142,11 +2172,10 @@ CUDF_KERNEL void __launch_bounds__(decide_compression_block_size)
auto const num_pages = ck_g[warp_id].num_pages;
for (auto page_id = lane_id; page_id < num_pages; page_id += cudf::detail::warp_size) {
auto const& curr_page = ck_g[warp_id].pages[page_id];
auto const page_data_size = curr_page.max_data_size;
auto const is_v2 = curr_page.page_type == PageType::DATA_PAGE_V2;
auto const lvl_bytes = is_v2 ? curr_page.def_lvl_bytes + curr_page.rep_lvl_bytes : 0;
auto const page_data_size = curr_page.data_size;
uncompressed_data_size += page_data_size;
if (auto comp_res = curr_page.comp_res; comp_res != nullptr) {
auto const lvl_bytes = curr_page.is_v2() ? curr_page.level_bytes() : 0;
compressed_data_size += comp_res->bytes_written + lvl_bytes;
if (comp_res->status != compression_status::SUCCESS) {
atomicOr(&compression_error[warp_id], 1);
Expand Down Expand Up @@ -2614,14 +2643,13 @@ CUDF_KERNEL void __launch_bounds__(128)
EncodeStatistics(hdr_start, &chunk_stats[page_g.chunk_id], col_g.stats_dtype, scratch);
page_g.chunk->ck_stat_size = static_cast<uint32_t>(hdr_end - hdr_start);
}
uncompressed_page_size = page_g.max_data_size;
uncompressed_page_size = page_g.data_size;
if (ck_g.is_compressed) {
auto const is_v2 = page_g.page_type == PageType::DATA_PAGE_V2;
auto const lvl_bytes = is_v2 ? page_g.def_lvl_bytes + page_g.rep_lvl_bytes : 0;
auto const lvl_bytes = page_g.is_v2() ? page_g.level_bytes() : 0;
hdr_start = page_g.compressed_data;
compressed_page_size =
static_cast<uint32_t>(comp_results[blockIdx.x].bytes_written) + lvl_bytes;
page_g.max_data_size = compressed_page_size;
page_g.comp_data_size = compressed_page_size;
} else {
hdr_start = page_g.page_data;
compressed_page_size = uncompressed_page_size;
Expand Down Expand Up @@ -2708,19 +2736,26 @@ CUDF_KERNEL void __launch_bounds__(1024)
if (t == 0) { page_g = first_page[page]; }
__syncthreads();

src = (ck_g.is_compressed) ? page_g.compressed_data : page_g.page_data;
src = ck_g.is_compressed ? page_g.compressed_data : page_g.page_data;
// Copy page header
hdr_len = page_g.hdr_size;
memcpy_block<1024, true>(dst, src, hdr_len, t);
src += page_g.max_hdr_size;
dst += hdr_len;
// Copy page data
uncompressed_size += hdr_len;
data_len = page_g.max_data_size;
data_len = ck_g.is_compressed ? page_g.comp_data_size : page_g.data_size;
// Copy page data. For V2, the level data and page data are disjoint.
if (page_g.is_v2()) {
auto const lvl_len = page_g.level_bytes();
memcpy_block<1024, true>(dst, src, lvl_len, t);
src += page_g.max_lvl_size;
dst += lvl_len;
data_len -= lvl_len;
}
memcpy_block<1024, true>(dst, src, data_len, t);
dst += data_len;
__syncthreads();
if (!t && page == 0 && ck_g.use_dictionary) { ck_g.dictionary_size = hdr_len + data_len; }
if (t == 0 && page == 0 && ck_g.use_dictionary) { ck_g.dictionary_size = hdr_len + data_len; }
}
if (t == 0) {
chunks[blockIdx.x].bfr_size = uncompressed_size;
Expand Down
43 changes: 27 additions & 16 deletions cpp/src/io/parquet/parquet_gpu.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -560,30 +560,41 @@ struct EncColumnChunk {
* @brief Struct describing an encoder data page
*/
struct EncPage {
uint8_t* page_data; //!< Ptr to uncompressed page
uint8_t* compressed_data; //!< Ptr to compressed page
uint16_t num_fragments; //!< Number of fragments in page
PageType page_type; //!< Page type
Encoding encoding; //!< Encoding used for page data
EncColumnChunk* chunk; //!< Chunk that this page belongs to
// all pointers at the top to keep things properly aligned
uint8_t* page_data; //!< Ptr to uncompressed page
uint8_t* compressed_data; //!< Ptr to compressed page
EncColumnChunk* chunk; //!< Chunk that this page belongs to
compression_result* comp_res; //!< Ptr to compression result
uint32_t* def_histogram; //!< Histogram of counts for each definition level
uint32_t* rep_histogram; //!< Histogram of counts for each repetition level
// put this here in case it's ever made 64-bit
encode_kernel_mask kernel_mask; //!< Mask used to control which encoding kernels to run
// the rest can be 4 byte aligned
uint32_t chunk_id; //!< Index in chunk array
uint32_t hdr_size; //!< Size of page header
uint32_t hdr_size; //!< Actual size of encoded page header
uint32_t max_hdr_size; //!< Maximum size of page header
uint32_t max_data_size; //!< Maximum size of coded page data (excluding header)
uint32_t max_data_size; //!< Maximum size of encoded page data (excluding header)
uint32_t data_size; //!< Actual size of encoded page data (includes level data)
uint32_t comp_data_size; //!< Actual size of compressed page data
uint32_t start_row; //!< First row of page
uint32_t num_rows; //!< Rows in page
uint32_t num_leaf_values; //!< Values in page. Different from num_rows in case of nested types
uint32_t num_values; //!< Number of def/rep level values in page. Includes null/empty elements in
//!< non-leaf levels
uint32_t def_lvl_bytes; //!< Number of bytes of encoded definition level data (V2 only)
uint32_t rep_lvl_bytes; //!< Number of bytes of encoded repetition level data (V2 only)
compression_result* comp_res; //!< Ptr to compression result
uint32_t num_nulls; //!< Number of null values (V2 only) (down here for alignment)
encode_kernel_mask kernel_mask; //!< Mask used to control which encoding kernels to run
uint32_t* def_histogram; //!< Histogram of counts for each definition level
uint32_t* rep_histogram; //!< Histogram of counts for each repetition level
uint32_t var_bytes_size; //!< Number of variable length bytes in the page (byte arrays only)
uint32_t def_lvl_bytes; //!< Number of bytes of encoded definition level data
uint32_t rep_lvl_bytes; //!< Number of bytes of encoded repetition level data
uint32_t max_lvl_size; //!< Maximum size of level data (V2 only, 0 for V1)
uint32_t num_nulls; //!< Number of null values
uint32_t num_valid; //!< Number of valid leaf values
uint32_t var_bytes_size; //!< Number of variable length bytes in the page (byte arrays only)
// enums and smaller stuff down here
PageType page_type; //!< Page type
Encoding encoding; //!< Encoding used for page data
uint16_t num_fragments; //!< Number of fragments in page

constexpr bool is_v2() const { return page_type == PageType::DATA_PAGE_V2; }

constexpr auto level_bytes() const { return def_lvl_bytes + rep_lvl_bytes; }
};

/**
Expand Down
Loading

0 comments on commit 86a5f0f

Please sign in to comment.