Skip to content

Commit

Permalink
Merge branch 'fea-json_spark_validation' of github.com:karthikeyann/c…
Browse files Browse the repository at this point in the history
…udf into fea-json_spark_validation
  • Loading branch information
revans2 committed Jun 28, 2024
2 parents 5640d4c + cd6a30f commit 3e4e073
Show file tree
Hide file tree
Showing 196 changed files with 1,944 additions and 1,194 deletions.
12 changes: 12 additions & 0 deletions .github/workflows/pr.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ jobs:
- docs-build
- wheel-build-cudf
- wheel-tests-cudf
- test-cudf-polars
- wheel-build-dask-cudf
- wheel-tests-dask-cudf
- devcontainer
Expand Down Expand Up @@ -132,6 +133,17 @@ jobs:
with:
build_type: pull-request
script: ci/test_wheel_cudf.sh
test-cudf-polars:
needs: wheel-build-cudf
secrets: inherit
uses: rapidsai/shared-workflows/.github/workflows/wheels-test.yaml@branch-24.08
with:
# This selects "ARCH=amd64 + the latest supported Python + CUDA".
matrix_filter: map(select(.ARCH == "amd64")) | group_by(.CUDA_VER|split(".")|map(tonumber)|.[0]) | map(max_by([(.PY_VER|split(".")|map(tonumber)), (.CUDA_VER|split(".")|map(tonumber))]))
build_type: pull-request
# This always runs, but only fails if this PR touches code in
# pylibcudf or cudf_polars
script: "ci/test_cudf_polars.sh"
wheel-build-dask-cudf:
needs: wheel-build-cudf
secrets: inherit
Expand Down
68 changes: 68 additions & 0 deletions ci/test_cudf_polars.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
#!/bin/bash
# Copyright (c) 2024, NVIDIA CORPORATION.

set -eou pipefail

# Test failures are only reported for PRs that modify pylibcudf or
# cudf_polars themselves.
# The three-dot range diffs HEAD against its merge-base with upstream,
# so the question being asked is "does _this branch_ touch files in
# cudf_polars/pylibcudf", not "do upstream and this branch differ in
# cudf_polars/pylibcudf".
# TODO: is the target branch exposed anywhere in an environment variable?
HAS_CHANGES=0
if [[ -n "$(git diff --name-only origin/branch-24.08...HEAD -- python/cudf_polars/ python/cudf/cudf/_lib/pylibcudf/)" ]]; then
    HAS_CHANGES=1
fi

# Build the wheel name "cudf_<suffix>" from the output of
# rapids-wheel-ctk-name-gen and hand it (through the environment of that one
# command) to rapids-download-wheels-from-s3, with ./dist as its argument.
RAPIDS_PY_CUDA_SUFFIX="$(rapids-wheel-ctk-name-gen ${RAPIDS_CUDA_VERSION})"
RAPIDS_PY_WHEEL_NAME="cudf_${RAPIDS_PY_CUDA_SUFFIX}" rapids-download-wheels-from-s3 ./dist

# If RAPIDS_TESTS_DIR is set and non-empty it is used as-is; otherwise a fresh
# temporary directory is created and results go to <tmpdir>/test-results.
# A trailing slash is appended in both cases.
RESULTS_DIR=${RAPIDS_TESTS_DIR:-"$(mktemp -d)"}
RAPIDS_TESTS_DIR=${RAPIDS_TESTS_DIR:-"${RESULTS_DIR}/test-results"}/
mkdir -p "${RAPIDS_TESTS_DIR}"

rapids-logger "Install cudf wheel"
# echo to expand wildcard before adding `[extra]` requires for pip
# (the command substitution is deliberately left unquoted)
python -m pip install $(echo ./dist/cudf*.whl)[test]

rapids-logger "Install polars (allow pre-release versions)"
# The "a0" suffix in the lower bound is what lets pip accept pre-releases.
python -m pip install 'polars>=1.0.0a0'

rapids-logger "Install cudf_polars"
# Installed from the source tree; --no-deps keeps pip from resolving or
# replacing the packages installed above.
python -m pip install --no-deps python/cudf_polars

rapids-logger "Run cudf_polars tests"

# While the tests run, errexit is switched off and an ERR trap records the
# status of the failing command in EXITCODE, so the script can log the outcome
# and decide afterwards whether the failure should be reported.
EXITCODE=0
function set_exitcode()
{
    EXITCODE=$?
}
trap set_exitcode ERR
set +e

python -m pytest \
    --cache-clear \
    --cov cudf_polars \
    --cov-fail-under=100 \
    --cov-config=python/cudf_polars/pyproject.toml \
    --junitxml="${RAPIDS_TESTS_DIR}/junit-cudf_polars.xml" \
    python/cudf_polars/tests

# Restore the default ERR handling and errexit.
trap ERR
set -e

if [[ "${EXITCODE}" -eq 0 ]]; then
    rapids-logger "Testing PASSED"
else
    rapids-logger "Testing FAILED: exitcode ${EXITCODE}"
fi

# Propagate the test status only when this branch touched the relevant code;
# otherwise the job always succeeds.
if [[ "${HAS_CHANGES}" -eq 1 ]]; then
    exit "${EXITCODE}"
fi
exit 0
1 change: 1 addition & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -663,6 +663,7 @@ add_library(
src/unary/math_ops.cu
src/unary/nan_ops.cu
src/unary/null_ops.cu
src/utilities/cuda_memcpy.cu
src/utilities/default_stream.cpp
src/utilities/linked_column.cpp
src/utilities/logger.cpp
Expand Down
5 changes: 2 additions & 3 deletions cpp/benchmarks/io/text/multibyte_split.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,7 @@ static cudf::string_scalar create_random_input(int32_t num_chars,

// extract the chars from the returned strings column.
auto input_column_contents = input_column->release();
auto chars_column_contents = input_column_contents.children[1]->release();
auto chars_buffer = chars_column_contents.data.release();
auto chars_buffer = input_column_contents.data.release();

// turn the chars into a string scalar.
return cudf::string_scalar(std::move(*chars_buffer));
Expand Down Expand Up @@ -218,7 +217,7 @@ NVBENCH_BENCH_TYPES(bench_multibyte_split,
NVBENCH_BENCH_TYPES(bench_multibyte_split, NVBENCH_TYPE_AXES(source_type_list))
.set_name("multibyte_split_source")
.set_min_samples(4)
.add_int64_axis("strip_delimiters", {1})
.add_int64_axis("strip_delimiters", {0, 1})
.add_int64_axis("delim_size", {1})
.add_int64_axis("delim_percent", {1})
.add_int64_power_of_two_axis("size_approx", {15, 30})
Expand Down
53 changes: 53 additions & 0 deletions cpp/include/cudf/detail/utilities/cuda_memcpy.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <rmm/cuda_stream_view.hpp>

#include <cstddef>
#include <cstdint>

namespace cudf::detail {

/**
 * @brief Kind of host memory taking part in a host/device copy.
 */
enum class host_memory_kind : std::uint8_t { PINNED, PAGEABLE };

/**
 * @brief Asynchronously copies data between the host and device.
 *
 * Implementation may use different strategies depending on the size and type of host data.
 *
 * @param dst Destination memory address
 * @param src Source memory address
 * @param size Number of bytes to copy
 * @param kind Type of host memory
 * @param stream CUDA stream used for the copy
 */
void cuda_memcpy_async(void* dst,
                       void const* src,
                       std::size_t size,
                       host_memory_kind kind,
                       rmm::cuda_stream_view stream);

/**
 * @brief Synchronously copies data between the host and device.
 *
 * Implementation may use different strategies depending on the size and type of host data.
 *
 * @param dst Destination memory address
 * @param src Source memory address
 * @param size Number of bytes to copy
 * @param kind Type of host memory
 * @param stream CUDA stream used for the copy
 */
void cuda_memcpy(void* dst,
                 void const* src,
                 std::size_t size,
                 host_memory_kind kind,
                 rmm::cuda_stream_view stream);

}  // namespace cudf::detail
6 changes: 5 additions & 1 deletion cpp/include/cudf/io/json.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,10 @@ class json_reader_options {
*
* @return true if leading zeros are allowed in numeric values
*/
[[nodiscard]] bool is_allowed_numeric_leading_zeros() const { return _allow_numeric_leading_zeros; }
[[nodiscard]] bool is_allowed_numeric_leading_zeros() const
{
return _allow_numeric_leading_zeros;
}

/**
* @brief Whether unquoted number values should be allowed NaN, +INF, -INF, +Infinity, Infinity, and
Expand Down Expand Up @@ -745,6 +748,7 @@ class json_reader_options_builder {
* be enabled for this to have any effect.
*
* @param val Boolean value to indicate whether leading zeros are allowed in numeric values
* @return this for chaining
*/
json_reader_options_builder& numeric_leading_zeros(bool val)
{
Expand Down
16 changes: 16 additions & 0 deletions cpp/include/cudf/utilities/pinned_memory.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,20 @@ struct pinned_mr_options {
*/
bool config_default_pinned_memory_resource(pinned_mr_options const& opts);

/**
* @brief Set the threshold size for using kernels for pinned memory copies.
*
* @param threshold The threshold size in bytes. If the size of the copy is less than this
* threshold, the copy will be done using kernels. If the size is greater than or equal to this
* threshold, the copy will be done using cudaMemcpyAsync.
*/
void set_kernel_pinned_copy_threshold(size_t threshold);

/**
* @brief Get the threshold size for using kernels for pinned memory copies.
*
* @return The threshold size in bytes.
*/
size_t get_kernel_pinned_copy_threshold();

} // namespace cudf
7 changes: 6 additions & 1 deletion cpp/include/cudf_test/column_wrapper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,12 @@ auto make_chars_and_offsets(StringsIterator begin, StringsIterator end, Validity
for (auto str = begin; str < end; ++str) {
std::string tmp = (*v++) ? std::string(*str) : std::string{};
chars.insert(chars.end(), std::cbegin(tmp), std::cend(tmp));
offsets.push_back(offsets.back() + tmp.length());
auto const last_offset = static_cast<std::size_t>(offsets.back());
auto const next_offset = last_offset + tmp.length();
CUDF_EXPECTS(
next_offset < static_cast<std::size_t>(std::numeric_limits<cudf::size_type>::max()),
"Cannot use strings_column_wrapper to build a large strings column");
offsets.push_back(static_cast<cudf::size_type>(next_offset));
}
return std::pair(std::move(chars), std::move(offsets));
};
Expand Down
13 changes: 6 additions & 7 deletions cpp/src/io/json/nested_json.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -223,19 +223,18 @@ std::pair<rmm::device_uvector<PdaTokenT>, rmm::device_uvector<SymbolOffsetT>> pr

/**
* @brief Validate the tokens conforming to behavior given in options.
*
*
* @param d_input The string of input characters
* @param tokens The tokens to be post-processed
* @param token_indices The tokens' corresponding indices that are post-processed
* @param options Parsing options specifying the parsing behaviour
* @param stream The cuda stream to dispatch GPU kernels to
*/
void validate_token_stream(
device_span<char const> d_input,
device_span<PdaTokenT> tokens,
device_span<SymbolOffsetT> token_indices,
cudf::io::json_reader_options const& options,
rmm::cuda_stream_view stream);
void validate_token_stream(device_span<char const> d_input,
device_span<PdaTokenT> tokens,
device_span<SymbolOffsetT> token_indices,
cudf::io::json_reader_options const& options,
rmm::cuda_stream_view stream);

/**
* @brief Parses the given JSON string and generates a tree representation of the given input.
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/io/json/nested_json_gpu.cu
Original file line number Diff line number Diff line change
Expand Up @@ -2082,7 +2082,7 @@ cudf::io::parse_options parsing_options(cudf::io::json_reader_options const& opt
parse_opts.trie_false = cudf::detail::create_serialized_trie({"false"}, stream);
std::vector<std::string> na_values{"", "null"};
na_values.insert(na_values.end(), options.get_na_values().begin(), options.get_na_values().end());
parse_opts.trie_na = cudf::detail::create_serialized_trie(na_values, stream);
parse_opts.trie_na = cudf::detail::create_serialized_trie(na_values, stream);
return parse_opts;
}

Expand Down
7 changes: 3 additions & 4 deletions cpp/src/io/json/output_writer_iterator.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021, NVIDIA CORPORATION.
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -46,8 +46,7 @@ class output_writer_iterator_proxy {
// type_traits to enable its use with algorithms.
template <class BinaryFunction, class Iterator>
struct is_proxy_reference<output_writer_iterator_proxy<BinaryFunction, Iterator>>
: public thrust::detail::true_type {
};
: public thrust::detail::true_type {};

} // namespace detail

Expand Down Expand Up @@ -137,4 +136,4 @@ make_output_writer_iterator(Iterator out, BinaryFunction fun)
{
return output_writer_iterator<BinaryFunction, Iterator>(out, fun);
} // end make_output_writer_iterator
THRUST_NAMESPACE_END
THRUST_NAMESPACE_END
13 changes: 8 additions & 5 deletions cpp/src/io/parquet/reader_impl_chunking.cu
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,8 @@ int64_t find_next_split(int64_t cur_pos,
size_t cur_row_index,
size_t cur_cumulative_size,
cudf::host_span<cumulative_page_info const> sizes,
size_t size_limit)
size_t size_limit,
size_t min_row_count)
{
auto const start = thrust::make_transform_iterator(
sizes.begin(),
Expand All @@ -357,7 +358,7 @@ int64_t find_next_split(int64_t cur_pos,
// this guarantees that even if we cannot fit the set of rows represented by where our cur_pos
// is, we will still move forward instead of failing.
while (split_pos < (static_cast<int64_t>(sizes.size()) - 1) &&
(sizes[split_pos].end_row_index == cur_row_index)) {
(sizes[split_pos].end_row_index - cur_row_index < min_row_count)) {
split_pos++;
}

Expand Down Expand Up @@ -657,8 +658,10 @@ std::tuple<rmm::device_uvector<page_span>, size_t, size_t> compute_next_subpass(
auto const start_index = find_start_index(h_aggregated_info, start_row);
auto const cumulative_size =
start_row == 0 || start_index == 0 ? 0 : h_aggregated_info[start_index - 1].size_bytes;
// when choosing subpasses, we need to guarantee at least 2 rows in the included pages so that all
// list columns have a clear start and end.
auto const end_index =
find_next_split(start_index, start_row, cumulative_size, h_aggregated_info, size_limit);
find_next_split(start_index, start_row, cumulative_size, h_aggregated_info, size_limit, 2);
auto const end_row = h_aggregated_info[end_index].end_row_index;

// for each column, collect the set of pages that spans start_row / end_row
Expand Down Expand Up @@ -703,8 +706,8 @@ std::vector<row_range> compute_page_splits_by_row(device_span<cumulative_page_in
size_t cur_cumulative_size = 0;
auto const max_row = min(skip_rows + num_rows, h_aggregated_info.back().end_row_index);
while (cur_row_index < max_row) {
auto const split_pos =
find_next_split(cur_pos, cur_row_index, cur_cumulative_size, h_aggregated_info, size_limit);
auto const split_pos = find_next_split(
cur_pos, cur_row_index, cur_cumulative_size, h_aggregated_info, size_limit, 1);

auto const start_row = cur_row_index;
cur_row_index = min(max_row, h_aggregated_info[split_pos].end_row_index);
Expand Down
3 changes: 2 additions & 1 deletion cpp/src/io/parquet/reader_impl_preprocess.cu
Original file line number Diff line number Diff line change
Expand Up @@ -1436,7 +1436,8 @@ void reader::impl::preprocess_subpass_pages(read_mode mode, size_t chunk_read_li
// subpass since we know that will safely complete.
bool const is_list = chunk.max_level[level_type::REPETITION] > 0;
if (is_list && max_col_row < last_pass_row) {
size_t const min_col_row = static_cast<size_t>(chunk.start_row + last_page.chunk_row);
auto const& first_page = subpass.pages[page_index];
size_t const min_col_row = static_cast<size_t>(chunk.start_row + first_page.chunk_row);
CUDF_EXPECTS((max_col_row - min_col_row) > 1, "Unexpected short subpass");
max_col_row--;
}
Expand Down
8 changes: 6 additions & 2 deletions cpp/src/io/text/data_chunk_source_factories.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,11 @@ class istream_data_chunk_reader : public data_chunk_reader {
{
}

void skip_bytes(std::size_t size) override { _datastream->ignore(size); };
/**
 * @brief Advances the stream's read position by `size` bytes.
 *
 * @param size Number of bytes to skip
 */
void skip_bytes(std::size_t size) override
{
  // Seek to (current position + size) rather than discarding characters;
  // 20% faster than _datastream->ignore(size) for large files
  auto const current_pos = _datastream->tellg();
  auto const advance     = static_cast<std::ifstream::pos_type>(size);
  _datastream->seekg(current_pos + advance);
}

std::unique_ptr<device_data_chunk> get_next_chunk(std::size_t read_size,
rmm::cuda_stream_view stream) override
Expand Down Expand Up @@ -265,7 +269,7 @@ class file_data_chunk_source : public data_chunk_source {
[[nodiscard]] std::unique_ptr<data_chunk_reader> create_reader() const override
{
return std::make_unique<istream_data_chunk_reader>(
std::make_unique<std::ifstream>(_filename, std::ifstream::in));
std::make_unique<std::ifstream>(_filename, std::ifstream::in | std::ifstream::binary));
}

private:
Expand Down
Loading

0 comments on commit 3e4e073

Please sign in to comment.