Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/branch-24.04' into p22/test_ro…
Browse files Browse the repository at this point in the history
…lling
  • Loading branch information
mroeschke committed Feb 14, 2024
2 parents 336d646 + 82d1772 commit 02403c8
Show file tree
Hide file tree
Showing 24 changed files with 397 additions and 80 deletions.
1 change: 1 addition & 0 deletions cpp/benchmarks/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ ConfigureNVBench(
string/case.cpp
string/char_types.cpp
string/contains.cpp
string/copy_if_else.cpp
string/count.cpp
string/extract.cpp
string/gather.cpp
Expand Down
62 changes: 62 additions & 0 deletions cpp/benchmarks/string/copy_if_else.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <benchmarks/common/generate_input.hpp>

#include <cudf/copying.hpp>
#include <cudf/strings/strings_column_view.hpp>
#include <cudf/utilities/default_stream.hpp>

#include <nvbench/nvbench.cuh>

#include <cstddef>
#include <limits>

/**
 * @brief Benchmarks cudf::copy_if_else on two randomly generated strings columns.
 *
 * Reads the `num_rows` and `row_width` axes from the benchmark state, generates two
 * random STRING columns and one BOOL8 column (built with the `no_validity()` profile)
 * used as the selector, and measures a call to cudf::copy_if_else.
 *
 * Axis combinations whose `num_rows * row_width` product reaches the maximum
 * cudf::size_type value are skipped without generating any data.
 *
 * @param state nvbench state that supplies the axis values and collects the measurements
 */
static void bench_copy(nvbench::state& state)
{
  auto const num_rows  = static_cast<cudf::size_type>(state.get_int64("num_rows"));
  auto const row_width = static_cast<cudf::size_type>(state.get_int64("row_width"));

  // Widen to std::size_t before multiplying so the product itself cannot overflow size_type.
  if (static_cast<std::size_t>(num_rows) * static_cast<std::size_t>(row_width) >=
      static_cast<std::size_t>(std::numeric_limits<cudf::size_type>::max())) {
    state.skip("Skip benchmarks greater than size_type limit");
    // skip() does not leave this function on its own; return explicitly so that no
    // oversized input is generated and the benchmark body below is never reached.
    return;
  }

  // String lengths follow a normal distribution over [0, row_width].
  data_profile const str_profile = data_profile_builder().distribution(
    cudf::type_id::STRING, distribution_id::NORMAL, 0, row_width);
  auto const source_table =
    create_random_table({cudf::type_id::STRING}, row_count{num_rows}, str_profile);
  auto const target_table =
    create_random_table({cudf::type_id::STRING}, row_count{num_rows}, str_profile);
  data_profile const bool_profile = data_profile_builder().no_validity();
  auto const booleans =
    create_random_table({cudf::type_id::BOOL8}, row_count{num_rows}, bool_profile);

  auto const source     = source_table->view().column(0);
  auto const target     = target_table->view().column(0);
  auto const left_right = booleans->view().column(0);

  state.set_cuda_stream(nvbench::make_cuda_stream_view(cudf::get_default_stream().value()));
  // Only the target column is measured; both inputs share the same profile, so its
  // character count is used as the estimate for bytes read and bytes written.
  auto const chars_size = cudf::strings_column_view(target).chars_size(cudf::get_default_stream());
  state.add_global_memory_reads<nvbench::int8_t>(chars_size);   // all bytes are read;
  state.add_global_memory_writes<nvbench::int8_t>(chars_size);  // both columns are similar size

  state.exec(nvbench::exec_tag::sync, [&](nvbench::launch& launch) {
    // The result is discarded; it only needs to live until the end of the timed region.
    [[maybe_unused]] auto result = cudf::copy_if_else(source, target, left_right);
  });
}

// Register bench_copy with nvbench under the name "copy_if_else". The two int64 axes
// declared here are the "row_width" and "num_rows" values that bench_copy reads from
// its state; the largest combinations exceed the size_type limit checked in bench_copy.
NVBENCH_BENCH(bench_copy)
.set_name("copy_if_else")
.add_int64_axis("row_width", {32, 64, 128, 256, 512, 1024, 2048, 4096})
.add_int64_axis("num_rows", {4096, 32768, 262144, 2097152, 16777216});
15 changes: 15 additions & 0 deletions cpp/cmake/Modules/JitifyPreprocessKernels.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,18 @@ add_custom_target(
DEPENDS ${JIT_PREPROCESSED_FILES}
COMMENT "Target representing jitified files."
)

# When a user requests CMake to clean the build directory
#
# * `cmake --build <dir> --target clean`
# * `cmake --build <dir> --clean-first`
# * ninja clean
#
# we also remove the jitify2 program cache. This ensures that we don't keep older versions
# of the programs in the cache.
set(cache_path "$ENV{HOME}/.cudf")
# Environment variables cannot be tested with a bare `if(ENV{...})` (that form always evaluates
# to false), so test with `DEFINED ENV{...}`. An empty value is ignored so that the default
# location above is kept.
if(DEFINED ENV{LIBCUDF_KERNEL_CACHE_PATH} AND NOT "$ENV{LIBCUDF_KERNEL_CACHE_PATH}" STREQUAL "")
  set(cache_path "$ENV{LIBCUDF_KERNEL_CACHE_PATH}")
endif()
cmake_path(APPEND cache_path "${CUDF_VERSION}/")
set_target_properties(jitify_preprocess_run PROPERTIES ADDITIONAL_CLEAN_FILES "${cache_path}")
63 changes: 21 additions & 42 deletions cpp/include/cudf/strings/detail/copy_if_else.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,16 @@
#pragma once

#include <cudf/column/column.hpp>
#include <cudf/column/column_device_view.cuh>
#include <cudf/detail/valid_if.cuh>
#include <cudf/strings/detail/strings_children.cuh>
#include <cudf/strings/strings_column_view.hpp>
#include <cudf/strings/detail/strings_column_factories.cuh>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_uvector.hpp>
#include <rmm/exec_policy.hpp>

#include <thrust/for_each.h>
#include <thrust/iterator/counting_iterator.h>
#include <thrust/iterator/transform_iterator.h>
#include <thrust/optional.h>
#include <thrust/transform.h>

#include <cuda/functional>

Expand Down Expand Up @@ -65,55 +63,36 @@ std::unique_ptr<cudf::column> copy_if_else(StringIterLeft lhs_begin,
rmm::mr::device_memory_resource* mr)
{
auto strings_count = std::distance(lhs_begin, lhs_end);
if (strings_count == 0) return make_empty_column(type_id::STRING);
if (strings_count == 0) { return make_empty_column(type_id::STRING); }

// create null mask
auto valid_mask = cudf::detail::valid_if(
auto [null_mask, null_count] = cudf::detail::valid_if(
thrust::make_counting_iterator<size_type>(0),
thrust::make_counting_iterator<size_type>(strings_count),
[lhs_begin, rhs_begin, filter_fn] __device__(size_type idx) {
return filter_fn(idx) ? lhs_begin[idx].has_value() : rhs_begin[idx].has_value();
},
stream,
mr);
size_type null_count = valid_mask.second;
auto null_mask = (null_count > 0) ? std::move(valid_mask.first) : rmm::device_buffer{};
if (null_count == 0) { null_mask = rmm::device_buffer{}; }

// build offsets column
auto offsets_transformer = cuda::proclaim_return_type<size_type>(
[lhs_begin, rhs_begin, filter_fn] __device__(size_type idx) {
auto const result = filter_fn(idx) ? lhs_begin[idx] : rhs_begin[idx];
return result.has_value() ? result->size_bytes() : 0;
});

auto offsets_transformer_itr = thrust::make_transform_iterator(
thrust::make_counting_iterator<size_type>(0), offsets_transformer);
auto [offsets_column, bytes] = cudf::detail::make_offsets_child_column(
offsets_transformer_itr, offsets_transformer_itr + strings_count, stream, mr);
auto d_offsets = offsets_column->view().template data<int32_t>();
// build vector of strings
rmm::device_uvector<string_index_pair> indices(strings_count, stream);
thrust::transform(rmm::exec_policy_nosync(stream),
thrust::make_counting_iterator<size_type>(0),
thrust::make_counting_iterator<size_type>(strings_count),
indices.begin(),
[lhs_begin, rhs_begin, filter_fn] __device__(size_type idx) {
auto const result = filter_fn(idx) ? lhs_begin[idx] : rhs_begin[idx];
auto const d_str = result.has_value() ? *result : string_view{"", 0};
return string_index_pair{d_str.data(), d_str.size_bytes()};
});

// build chars column
auto chars_column = create_chars_child_column(bytes, stream, mr);
auto d_chars = chars_column->mutable_view().template data<char>();
// fill in chars
thrust::for_each_n(
rmm::exec_policy(stream),
thrust::make_counting_iterator<size_type>(0),
strings_count,
[lhs_begin, rhs_begin, filter_fn, d_offsets, d_chars] __device__(size_type idx) {
auto const result = filter_fn(idx) ? lhs_begin[idx] : rhs_begin[idx];
if (!result.has_value()) return;
auto const d_str = *result;
memcpy(d_chars + d_offsets[idx], d_str.data(), d_str.size_bytes());
});

return make_strings_column(strings_count,
std::move(offsets_column),
std::move(chars_column->release().data.release()[0]),
null_count,
std::move(null_mask));
// convert vector into strings column
auto result = make_strings_column(indices.begin(), indices.end(), stream, mr);
result->set_null_mask(std::move(null_mask), null_count);
return result;
}

} // namespace detail
} // namespace strings
} // namespace cudf
50 changes: 50 additions & 0 deletions cpp/src/io/comp/nvcomp_adapter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <cudf/utilities/error.hpp>
#include <io/utilities/config_utils.hpp>

#include <nvcomp/lz4.h>
#include <nvcomp/snappy.h>

#include <mutex>
Expand Down Expand Up @@ -65,6 +66,8 @@ std::optional<nvcompStatus_t> batched_decompress_get_temp_size_ex(compression_ty
#else
return std::nullopt;
#endif
case compression_type::LZ4:
return nvcompBatchedLZ4DecompressGetTempSizeEx(std::forward<Args>(args)...);
case compression_type::DEFLATE: [[fallthrough]];
default: return std::nullopt;
}
Expand Down Expand Up @@ -93,6 +96,8 @@ auto batched_decompress_get_temp_size(compression_type compression, Args&&... ar
CUDF_FAIL("Decompression error: " +
nvcomp::is_decompression_disabled(nvcomp::compression_type::DEFLATE).value());
#endif
case compression_type::LZ4:
return nvcompBatchedLZ4DecompressGetTempSize(std::forward<Args>(args)...);
default: CUDF_FAIL("Unsupported compression type");
}
}
Expand All @@ -118,6 +123,7 @@ auto batched_decompress_async(compression_type compression, Args&&... args)
CUDF_FAIL("Decompression error: " +
nvcomp::is_decompression_disabled(nvcomp::compression_type::DEFLATE).value());
#endif
case compression_type::LZ4: return nvcompBatchedLZ4DecompressAsync(std::forward<Args>(args)...);
default: CUDF_FAIL("Unsupported compression type");
}
}
Expand All @@ -128,6 +134,7 @@ std::string compression_type_name(compression_type compression)
case compression_type::SNAPPY: return "Snappy";
case compression_type::ZSTD: return "Zstandard";
case compression_type::DEFLATE: return "Deflate";
case compression_type::LZ4: return "LZ4";
}
return "compression_type(" + std::to_string(static_cast<int>(compression)) + ")";
}
Expand Down Expand Up @@ -217,6 +224,10 @@ auto batched_compress_get_temp_size(compression_type compression,
CUDF_FAIL("Compression error: " +
nvcomp::is_compression_disabled(nvcomp::compression_type::ZSTD).value());
#endif
case compression_type::LZ4:
nvcomp_status = nvcompBatchedLZ4CompressGetTempSize(
batch_size, max_uncompressed_chunk_bytes, nvcompBatchedLZ4DefaultOpts, &temp_size);
break;
default: CUDF_FAIL("Unsupported compression type");
}

Expand Down Expand Up @@ -256,6 +267,13 @@ auto batched_compress_get_temp_size_ex(compression_type compression,
&temp_size,
max_total_uncompressed_bytes);
break;
case compression_type::LZ4:
nvcomp_status = nvcompBatchedLZ4CompressGetTempSizeEx(batch_size,
max_uncompressed_chunk_bytes,
nvcompBatchedLZ4DefaultOpts,
&temp_size,
max_total_uncompressed_bytes);
break;
default: CUDF_FAIL("Unsupported compression type");
}

Expand Down Expand Up @@ -317,6 +335,10 @@ size_t compress_max_output_chunk_size(compression_type compression,
CUDF_FAIL("Compression error: " +
nvcomp::is_compression_disabled(nvcomp::compression_type::ZSTD).value());
#endif
case compression_type::LZ4:
status = nvcompBatchedLZ4CompressGetMaxOutputChunkSize(
capped_uncomp_bytes, nvcompBatchedLZ4DefaultOpts, &max_comp_chunk_size);
break;
default: CUDF_FAIL("Unsupported compression type");
}

Expand Down Expand Up @@ -385,6 +407,18 @@ static void batched_compress_async(compression_type compression,
CUDF_FAIL("Compression error: " +
nvcomp::is_compression_disabled(nvcomp::compression_type::ZSTD).value());
#endif
case compression_type::LZ4:
nvcomp_status = nvcompBatchedLZ4CompressAsync(device_uncompressed_ptrs,
device_uncompressed_bytes,
max_uncompressed_chunk_bytes,
batch_size,
device_temp_ptr,
temp_bytes,
device_compressed_ptrs,
device_compressed_bytes,
nvcompBatchedLZ4DefaultOpts,
stream.value());
break;
default: CUDF_FAIL("Unsupported compression type");
}
CUDF_EXPECTS(nvcomp_status == nvcompStatus_t::nvcompSuccess, "Error in compression");
Expand Down Expand Up @@ -494,6 +528,12 @@ std::optional<std::string> is_compression_disabled_impl(compression_type compres
}
return std::nullopt;
}
case compression_type::LZ4:
if (not params.are_stable_integrations_enabled) {
return "LZ4 compression has been disabled through the `LIBCUDF_NVCOMP_POLICY` "
"environment variable.";
}
return std::nullopt;
default: return "Unsupported compression type";
}
return "Unsupported compression type";
Expand Down Expand Up @@ -572,6 +612,13 @@ std::optional<std::string> is_decompression_disabled_impl(compression_type compr
return std::nullopt;
}
case compression_type::ZSTD: return is_zstd_decomp_disabled(params);
case compression_type::LZ4: {
if (not params.are_stable_integrations_enabled) {
return "LZ4 decompression has been disabled through the `LIBCUDF_NVCOMP_POLICY` "
"environment variable.";
}
return std::nullopt;
}
default: return "Unsupported compression type";
}
return "Unsupported compression type";
Expand Down Expand Up @@ -612,6 +659,7 @@ size_t compress_input_alignment_bits(compression_type compression)
case compression_type::DEFLATE: return 0;
case compression_type::SNAPPY: return 0;
case compression_type::ZSTD: return 2;
case compression_type::LZ4: return 2;
default: CUDF_FAIL("Unsupported compression type");
}
}
Expand All @@ -622,6 +670,7 @@ size_t compress_output_alignment_bits(compression_type compression)
case compression_type::DEFLATE: return 3;
case compression_type::SNAPPY: return 0;
case compression_type::ZSTD: return 0;
case compression_type::LZ4: return 2;
default: CUDF_FAIL("Unsupported compression type");
}
}
Expand All @@ -638,6 +687,7 @@ std::optional<size_t> compress_max_allowed_chunk_size(compression_type compressi
CUDF_FAIL("Compression error: " +
nvcomp::is_compression_disabled(nvcomp::compression_type::ZSTD).value());
#endif
case compression_type::LZ4: return 16 * 1024 * 1024;
default: return std::nullopt;
}
}
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/io/comp/nvcomp_adapter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

namespace cudf::io::nvcomp {

enum class compression_type { SNAPPY, ZSTD, DEFLATE };
enum class compression_type { SNAPPY, ZSTD, DEFLATE, LZ4 };

/**
* @brief Set of parameters that impact whether the use of nvCOMP features is enabled.
Expand Down
13 changes: 13 additions & 0 deletions cpp/src/io/orc/reader_impl_preprocess.cu
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,19 @@ rmm::device_buffer decompress_stripe_data(
total_decomp_size,
stream);
break;
case compression_type::LZ4:
if (auto const reason = nvcomp::is_decompression_disabled(nvcomp::compression_type::LZ4);
reason) {
CUDF_FAIL("Decompression error: " + reason.value());
}
nvcomp::batched_decompress(nvcomp::compression_type::LZ4,
inflate_in_view,
inflate_out_view,
inflate_res,
max_uncomp_block_size,
total_decomp_size,
stream);
break;
default: CUDF_FAIL("Unexpected decompression dispatch"); break;
}

Expand Down
6 changes: 6 additions & 0 deletions cpp/src/io/orc/stripe_enc.cu
Original file line number Diff line number Diff line change
Expand Up @@ -1390,6 +1390,12 @@ std::optional<writer_compression_statistics> CompressOrcDataStreams(
CUDF_FAIL("Compression error: " + reason.value());
}
nvcomp::batched_compress(nvcomp::compression_type::ZSTD, comp_in, comp_out, comp_res, stream);
} else if (compression == LZ4) {
if (auto const reason = nvcomp::is_compression_disabled(nvcomp::compression_type::LZ4);
reason) {
CUDF_FAIL("Compression error: " + reason.value());
}
nvcomp::batched_compress(nvcomp::compression_type::LZ4, comp_in, comp_out, comp_res, stream);
} else if (compression != NONE) {
CUDF_FAIL("Unsupported compression type");
}
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/io/orc/writer_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ auto to_nvcomp_compression_type(CompressionKind compression_kind)
if (compression_kind == SNAPPY) return nvcomp::compression_type::SNAPPY;
if (compression_kind == ZLIB) return nvcomp::compression_type::DEFLATE;
if (compression_kind == ZSTD) return nvcomp::compression_type::ZSTD;
if (compression_kind == LZ4) return nvcomp::compression_type::LZ4;
CUDF_FAIL("Unsupported compression type");
}

Expand All @@ -111,6 +112,7 @@ orc::CompressionKind to_orc_compression(compression_type compression)
case compression_type::SNAPPY: return orc::CompressionKind::SNAPPY;
case compression_type::ZLIB: return orc::CompressionKind::ZLIB;
case compression_type::ZSTD: return orc::CompressionKind::ZSTD;
case compression_type::LZ4: return orc::CompressionKind::LZ4;
case compression_type::NONE: return orc::CompressionKind::NONE;
default: CUDF_FAIL("Unsupported compression type");
}
Expand Down
Loading

0 comments on commit 02403c8

Please sign in to comment.