Skip to content

Commit

Permalink
Merge branch 'branch-24.08' of github.com:rapidsai/cudf into pylibcud…
Browse files Browse the repository at this point in the history
…f-io-writers
  • Loading branch information
lithomas1 committed Jun 27, 2024
2 parents aff6178 + 5d49fe6 commit c5a3fbe
Show file tree
Hide file tree
Showing 63 changed files with 1,273 additions and 549 deletions.
12 changes: 12 additions & 0 deletions .github/workflows/pr.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ jobs:
- docs-build
- wheel-build-cudf
- wheel-tests-cudf
- test-cudf-polars
- wheel-build-dask-cudf
- wheel-tests-dask-cudf
- devcontainer
Expand Down Expand Up @@ -132,6 +133,17 @@ jobs:
with:
build_type: pull-request
script: ci/test_wheel_cudf.sh
test-cudf-polars:
needs: wheel-build-cudf
secrets: inherit
uses: rapidsai/shared-workflows/.github/workflows/[email protected]
with:
# This selects "ARCH=amd64 + the latest supported Python + CUDA".
matrix_filter: map(select(.ARCH == "amd64")) | group_by(.CUDA_VER|split(".")|map(tonumber)|.[0]) | map(max_by([(.PY_VER|split(".")|map(tonumber)), (.CUDA_VER|split(".")|map(tonumber))]))
build_type: pull-request
# This always runs, but only fails if this PR touches code in
# pylibcudf or cudf_polars
script: "ci/test_cudf_polars.sh"
wheel-build-dask-cudf:
needs: wheel-build-cudf
secrets: inherit
Expand Down
68 changes: 68 additions & 0 deletions ci/test_cudf_polars.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
#!/bin/bash
# Copyright (c) 2024, NVIDIA CORPORATION.

set -eou pipefail

# We will only fail these tests if the PR touches code in pylibcudf
# or cudf_polars itself.
# Note, the three dots mean we are doing diff between the merge-base
# of upstream and HEAD. So this is asking, "does _this branch_ touch
# files in cudf_polars/pylibcudf", rather than "are there changes
# between upstream and this branch which touch cudf_polars/pylibcudf"
# TODO: is the target branch exposed anywhere in an environment variable?
if [ -n "$(git diff --name-only origin/branch-24.08...HEAD -- python/cudf_polars/ python/cudf/cudf/_lib/pylibcudf/)" ];
then
HAS_CHANGES=1
else
HAS_CHANGES=0
fi

RAPIDS_PY_CUDA_SUFFIX="$(rapids-wheel-ctk-name-gen ${RAPIDS_CUDA_VERSION})"
RAPIDS_PY_WHEEL_NAME="cudf_${RAPIDS_PY_CUDA_SUFFIX}" rapids-download-wheels-from-s3 ./dist

RESULTS_DIR=${RAPIDS_TESTS_DIR:-"$(mktemp -d)"}
RAPIDS_TESTS_DIR=${RAPIDS_TESTS_DIR:-"${RESULTS_DIR}/test-results"}/
mkdir -p "${RAPIDS_TESTS_DIR}"

rapids-logger "Install cudf wheel"
# echo to expand wildcard before adding `[extra]` requires for pip
python -m pip install $(echo ./dist/cudf*.whl)[test]

rapids-logger "Install polars (allow pre-release versions)"
python -m pip install 'polars>=1.0.0a0'

rapids-logger "Install cudf_polars"
python -m pip install --no-deps python/cudf_polars

rapids-logger "Run cudf_polars tests"

function set_exitcode()
{
EXITCODE=$?
}
EXITCODE=0
trap set_exitcode ERR
set +e

python -m pytest \
--cache-clear \
--cov cudf_polars \
--cov-fail-under=100 \
--cov-config=python/cudf_polars/pyproject.toml \
--junitxml="${RAPIDS_TESTS_DIR}/junit-cudf_polars.xml" \
python/cudf_polars/tests

trap ERR
set -e

if [ ${EXITCODE} != 0 ]; then
rapids-logger "Testing FAILED: exitcode ${EXITCODE}"
else
rapids-logger "Testing PASSED"
fi

if [ ${HAS_CHANGES} == 1 ]; then
exit ${EXITCODE}
else
exit 0
fi
1 change: 1 addition & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -662,6 +662,7 @@ add_library(
src/unary/math_ops.cu
src/unary/nan_ops.cu
src/unary/null_ops.cu
src/utilities/cuda_memcpy.cu
src/utilities/default_stream.cpp
src/utilities/linked_column.cpp
src/utilities/logger.cpp
Expand Down
53 changes: 53 additions & 0 deletions cpp/include/cudf/detail/utilities/cuda_memcpy.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <rmm/cuda_stream_view.hpp>

namespace cudf::detail {

enum class host_memory_kind : uint8_t { PINNED, PAGEABLE };

/**
* @brief Asynchronously copies data between the host and device.
*
* Implementation may use different strategies depending on the size and type of host data.
*
* @param dst Destination memory address
* @param src Source memory address
* @param size Number of bytes to copy
* @param kind Type of host memory
* @param stream CUDA stream used for the copy
*/
void cuda_memcpy_async(
void* dst, void const* src, size_t size, host_memory_kind kind, rmm::cuda_stream_view stream);

/**
* @brief Synchronously copies data between the host and device.
*
* Implementation may use different strategies depending on the size and type of host data.
*
* @param dst Destination memory address
* @param src Source memory address
* @param size Number of bytes to copy
* @param kind Type of host memory
* @param stream CUDA stream used for the copy
*/
void cuda_memcpy(
void* dst, void const* src, size_t size, host_memory_kind kind, rmm::cuda_stream_view stream);

} // namespace cudf::detail
16 changes: 16 additions & 0 deletions cpp/include/cudf/utilities/pinned_memory.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,20 @@ struct pinned_mr_options {
*/
bool config_default_pinned_memory_resource(pinned_mr_options const& opts);

/**
* @brief Set the threshold size for using kernels for pinned memory copies.
*
* @param threshold The threshold size in bytes. If the size of the copy is less than this
* threshold, the copy will be done using kernels. If the size is greater than or equal to this
* threshold, the copy will be done using cudaMemcpyAsync.
*/
void set_kernel_pinned_copy_threshold(size_t threshold);

/**
* @brief Get the threshold size for using kernels for pinned memory copies.
*
* @return The threshold size in bytes.
*/
size_t get_kernel_pinned_copy_threshold();

} // namespace cudf
7 changes: 6 additions & 1 deletion cpp/include/cudf_test/column_wrapper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,12 @@ auto make_chars_and_offsets(StringsIterator begin, StringsIterator end, Validity
for (auto str = begin; str < end; ++str) {
std::string tmp = (*v++) ? std::string(*str) : std::string{};
chars.insert(chars.end(), std::cbegin(tmp), std::cend(tmp));
offsets.push_back(offsets.back() + tmp.length());
auto const last_offset = static_cast<std::size_t>(offsets.back());
auto const next_offset = last_offset + tmp.length();
CUDF_EXPECTS(
next_offset < static_cast<std::size_t>(std::numeric_limits<cudf::size_type>::max()),
"Cannot use strings_column_wrapper to build a large strings column");
offsets.push_back(static_cast<cudf::size_type>(next_offset));
}
return std::pair(std::move(chars), std::move(offsets));
};
Expand Down
13 changes: 8 additions & 5 deletions cpp/src/io/parquet/reader_impl_chunking.cu
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,8 @@ int64_t find_next_split(int64_t cur_pos,
size_t cur_row_index,
size_t cur_cumulative_size,
cudf::host_span<cumulative_page_info const> sizes,
size_t size_limit)
size_t size_limit,
size_t min_row_count)
{
auto const start = thrust::make_transform_iterator(
sizes.begin(),
Expand All @@ -357,7 +358,7 @@ int64_t find_next_split(int64_t cur_pos,
// this guarantees that even if we cannot fit the set of rows represented by our where our cur_pos
// is, we will still move forward instead of failing.
while (split_pos < (static_cast<int64_t>(sizes.size()) - 1) &&
(sizes[split_pos].end_row_index == cur_row_index)) {
(sizes[split_pos].end_row_index - cur_row_index < min_row_count)) {
split_pos++;
}

Expand Down Expand Up @@ -657,8 +658,10 @@ std::tuple<rmm::device_uvector<page_span>, size_t, size_t> compute_next_subpass(
auto const start_index = find_start_index(h_aggregated_info, start_row);
auto const cumulative_size =
start_row == 0 || start_index == 0 ? 0 : h_aggregated_info[start_index - 1].size_bytes;
// when choosing subpasses, we need to guarantee at least 2 rows in the included pages so that all
// list columns have a clear start and end.
auto const end_index =
find_next_split(start_index, start_row, cumulative_size, h_aggregated_info, size_limit);
find_next_split(start_index, start_row, cumulative_size, h_aggregated_info, size_limit, 2);
auto const end_row = h_aggregated_info[end_index].end_row_index;

// for each column, collect the set of pages that spans start_row / end_row
Expand Down Expand Up @@ -703,8 +706,8 @@ std::vector<row_range> compute_page_splits_by_row(device_span<cumulative_page_in
size_t cur_cumulative_size = 0;
auto const max_row = min(skip_rows + num_rows, h_aggregated_info.back().end_row_index);
while (cur_row_index < max_row) {
auto const split_pos =
find_next_split(cur_pos, cur_row_index, cur_cumulative_size, h_aggregated_info, size_limit);
auto const split_pos = find_next_split(
cur_pos, cur_row_index, cur_cumulative_size, h_aggregated_info, size_limit, 1);

auto const start_row = cur_row_index;
cur_row_index = min(max_row, h_aggregated_info[split_pos].end_row_index);
Expand Down
3 changes: 2 additions & 1 deletion cpp/src/io/parquet/reader_impl_preprocess.cu
Original file line number Diff line number Diff line change
Expand Up @@ -1436,7 +1436,8 @@ void reader::impl::preprocess_subpass_pages(read_mode mode, size_t chunk_read_li
// subpass since we know that will safely completed.
bool const is_list = chunk.max_level[level_type::REPETITION] > 0;
if (is_list && max_col_row < last_pass_row) {
size_t const min_col_row = static_cast<size_t>(chunk.start_row + last_page.chunk_row);
auto const& first_page = subpass.pages[page_index];
size_t const min_col_row = static_cast<size_t>(chunk.start_row + first_page.chunk_row);
CUDF_EXPECTS((max_col_row - min_col_row) > 1, "Unexpected short subpass");
max_col_row--;
}
Expand Down
13 changes: 5 additions & 8 deletions cpp/src/io/utilities/hostdevice_vector.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include "hostdevice_span.hpp"

#include <cudf/detail/utilities/cuda_memcpy.hpp>
#include <cudf/detail/utilities/host_vector.hpp>
#include <cudf/detail/utilities/vector_factories.hpp>
#include <cudf/utilities/default_stream.hpp>
Expand Down Expand Up @@ -124,26 +125,22 @@ class hostdevice_vector {

void host_to_device_async(rmm::cuda_stream_view stream)
{
CUDF_CUDA_TRY(
cudaMemcpyAsync(device_ptr(), host_ptr(), size_bytes(), cudaMemcpyDefault, stream.value()));
cuda_memcpy_async(device_ptr(), host_ptr(), size_bytes(), host_memory_kind::PINNED, stream);
}

void host_to_device_sync(rmm::cuda_stream_view stream)
{
host_to_device_async(stream);
stream.synchronize();
cuda_memcpy(device_ptr(), host_ptr(), size_bytes(), host_memory_kind::PINNED, stream);
}

void device_to_host_async(rmm::cuda_stream_view stream)
{
CUDF_CUDA_TRY(
cudaMemcpyAsync(host_ptr(), device_ptr(), size_bytes(), cudaMemcpyDefault, stream.value()));
cuda_memcpy_async(host_ptr(), device_ptr(), size_bytes(), host_memory_kind::PINNED, stream);
}

void device_to_host_sync(rmm::cuda_stream_view stream)
{
device_to_host_async(stream);
stream.synchronize();
cuda_memcpy(host_ptr(), device_ptr(), size_bytes(), host_memory_kind::PINNED, stream);
}

/**
Expand Down
13 changes: 3 additions & 10 deletions cpp/src/join/conditional_join.cu
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,7 @@ std::unique_ptr<rmm::device_uvector<size_type>> conditional_join_anti_semi(
{
if (right.num_rows() == 0) {
switch (join_type) {
case join_kind::LEFT_ANTI_JOIN:
return std::make_unique<rmm::device_uvector<size_type>>(left.num_rows(), stream, mr);
case join_kind::LEFT_ANTI_JOIN: return get_trivial_left_join_indices(left, stream, mr).first;
case join_kind::LEFT_SEMI_JOIN:
return std::make_unique<rmm::device_uvector<size_type>>(0, stream, mr);
default: CUDF_FAIL("Invalid join kind."); break;
Expand Down Expand Up @@ -96,10 +95,6 @@ std::unique_ptr<rmm::device_uvector<size_type>> conditional_join_anti_semi(
join_size = size.value(stream);
}

if (left.num_rows() == 0) {
return std::make_unique<rmm::device_uvector<size_type>>(0, stream, mr);
}

rmm::device_scalar<size_type> write_index(0, stream);

auto left_indices = std::make_unique<rmm::device_uvector<size_type>>(join_size, stream, mr);
Expand Down Expand Up @@ -149,8 +144,7 @@ conditional_join(table_view const& left,
// with a corresponding NULL from the right.
case join_kind::LEFT_JOIN:
case join_kind::LEFT_ANTI_JOIN:
case join_kind::FULL_JOIN:
return get_trivial_left_join_indices(left, stream, rmm::mr::get_current_device_resource());
case join_kind::FULL_JOIN: return get_trivial_left_join_indices(left, stream, mr);
// Inner and left semi joins return empty output because no matches can exist.
case join_kind::INNER_JOIN:
case join_kind::LEFT_SEMI_JOIN:
Expand All @@ -169,8 +163,7 @@ conditional_join(table_view const& left,
std::make_unique<rmm::device_uvector<size_type>>(0, stream, mr));
// Full joins need to return the trivial complement.
case join_kind::FULL_JOIN: {
auto ret_flipped =
get_trivial_left_join_indices(right, stream, rmm::mr::get_current_device_resource());
auto ret_flipped = get_trivial_left_join_indices(right, stream, mr);
return std::pair(std::move(ret_flipped.second), std::move(ret_flipped.first));
}
default: CUDF_FAIL("Invalid join kind."); break;
Expand Down
Loading

0 comments on commit c5a3fbe

Please sign in to comment.