Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace default stream for scalars and column factories usages (because of defaulted arguments) #14354

Merged
merged 13 commits into from
Dec 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cpp/benchmarks/stream_compaction/unique.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ void nvbench_unique(nvbench::state& state, nvbench::type_list<Type, nvbench::enu
auto input_column = source_column->view();
auto input_table = cudf::table_view({input_column, input_column, input_column, input_column});

auto const run_bench = [&](auto const input) {
auto const run_bench = [&](auto const& input) {
state.set_cuda_stream(nvbench::make_cuda_stream_view(cudf::get_default_stream().value()));
state.exec(nvbench::exec_tag::sync, [&](nvbench::launch& launch) {
auto result = cudf::unique(input, {0}, Keep, cudf::null_equality::EQUAL);
Expand Down Expand Up @@ -120,7 +120,7 @@ void nvbench_unique_list(nvbench::state& state, nvbench::type_list<Type, nvbench
auto const input_table = create_random_table(
{dtype}, table_size_bytes{static_cast<size_t>(size)}, data_profile{builder}, 0);

auto const run_bench = [&](auto const input) {
auto const run_bench = [&](auto const& input) {
state.set_cuda_stream(nvbench::make_cuda_stream_view(cudf::get_default_stream().value()));
state.exec(nvbench::exec_tag::sync, [&](nvbench::launch& launch) {
auto result = cudf::unique(input, {0}, Keep, cudf::null_equality::EQUAL);
Expand Down
6 changes: 4 additions & 2 deletions cpp/include/cudf/scalar/scalar.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -664,9 +664,10 @@ class timestamp_scalar : public chrono_scalar<T> {

/**
* @brief Returns the duration in number of ticks since the UNIX epoch.
* @param stream CUDA stream used for device memory operations.
* @return The duration in number of ticks since the UNIX epoch
*/
rep_type ticks_since_epoch();
rep_type ticks_since_epoch(rmm::cuda_stream_view stream);
};

/**
Expand Down Expand Up @@ -716,9 +717,10 @@ class duration_scalar : public chrono_scalar<T> {

/**
* @brief Returns the duration in number of ticks.
* @param stream CUDA stream used for device memory operations.
* @return The duration in number of ticks
*/
rep_type count();
rep_type count(rmm::cuda_stream_view stream);
};

/**
Expand Down
10 changes: 8 additions & 2 deletions cpp/include/cudf/table/table.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,16 @@ class table {
/**
* @brief Construct a new table by copying the contents of another table.
*
* Uses the specified `stream` and device_memory_resource for all allocations
* and copies.
*
* @param other The table to copy
* @param stream CUDA stream used for device memory operations.
* @param mr Device memory resource to use for all device memory allocations
*/
table(table const& other);

explicit table(table const& other,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());
/**
* @brief Moves the contents from a vector of `unique_ptr`s to columns to
* construct a new table.
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/column/column_factories.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ std::unique_ptr<column> make_dictionary_from_scalar(scalar const& s,
CUDF_EXPECTS(s.is_valid(stream), "cannot create a dictionary with a null key");
return make_dictionary_column(
make_column_from_scalar(s, 1, stream, mr),
make_column_from_scalar(numeric_scalar<uint32_t>(0), size, stream, mr),
make_column_from_scalar(numeric_scalar<uint32_t>(0, true, stream), size, stream, mr),
rmm::device_buffer{0, stream, mr},
0);
}
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/filling/fill.cu
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ struct in_place_fill_range_dispatch {
{
auto unscaled = static_cast<cudf::fixed_point_scalar<T> const&>(value).value(stream);
using RepType = typename T::rep;
auto s = cudf::numeric_scalar<RepType>(unscaled, value.is_valid(stream));
auto s = cudf::numeric_scalar<RepType>(unscaled, value.is_valid(stream), stream);
in_place_fill<RepType>(destination, begin, end, s, stream);
}

Expand Down
11 changes: 7 additions & 4 deletions cpp/src/groupby/sort/aggregate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -569,11 +569,14 @@ void aggregate_result_functor::operator()<aggregation::MERGE_HISTOGRAM>(aggregat
*
* @param column_0 The first column
* @param column_1 The second column
* @param stream CUDA stream used for device memory operations and kernel launches
* @return tuple with new null mask (if null masks of input differ) and new column views
*/
auto column_view_with_common_nulls(column_view const& column_0, column_view const& column_1)
auto column_view_with_common_nulls(column_view const& column_0,
karthikeyann marked this conversation as resolved.
Show resolved Hide resolved
column_view const& column_1,
rmm::cuda_stream_view stream)
{
auto [new_nullmask, null_count] = cudf::bitmask_and(table_view{{column_0, column_1}});
auto [new_nullmask, null_count] = cudf::bitmask_and(table_view{{column_0, column_1}}, stream);
if (null_count == 0) { return std::make_tuple(std::move(new_nullmask), column_0, column_1); }
auto column_view_with_new_nullmask = [](auto const& col, void* nullmask, auto null_count) {
return column_view(col.type(),
Expand Down Expand Up @@ -610,7 +613,7 @@ void aggregate_result_functor::operator()<aggregation::COVARIANCE>(aggregation c
// Covariance only for valid values in both columns.
// in non-identical null mask cases, this prevents caching of the results - STD, MEAN, COUNT.
auto [_, values_child0, values_child1] =
column_view_with_common_nulls(values.child(0), values.child(1));
column_view_with_common_nulls(values.child(0), values.child(1), stream);

auto mean_agg = make_mean_aggregation();
aggregate_result_functor(values_child0, helper, cache, stream, mr).operator()<aggregation::MEAN>(*mean_agg);
Expand Down Expand Up @@ -659,7 +662,7 @@ void aggregate_result_functor::operator()<aggregation::CORRELATION>(aggregation
// Correlation only for valid values in both columns.
// in non-identical null mask cases, this prevents caching of the results - STD, MEAN, COUNT
auto [_, values_child0, values_child1] =
column_view_with_common_nulls(values.child(0), values.child(1));
column_view_with_common_nulls(values.child(0), values.child(1), stream);

auto std_agg = make_std_aggregation();
aggregate_result_functor(values_child0, helper, cache, stream, mr).operator()<aggregation::STD>(*std_agg);
Expand Down
6 changes: 3 additions & 3 deletions cpp/src/groupby/sort/sort_helper.cu
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ column_view sort_groupby_helper::key_sort_order(rmm::cuda_stream_view stream)

if (_keys_pre_sorted == sorted::YES) {
_key_sorted_order = cudf::detail::sequence(_keys.num_rows(),
numeric_scalar<size_type>(0),
numeric_scalar<size_type>(1),
numeric_scalar<size_type>(0, true, stream),
numeric_scalar<size_type>(1, true, stream),
stream,
rmm::mr::get_current_device_resource());
return sliced_key_sorted_order();
Expand Down Expand Up @@ -236,7 +236,7 @@ column_view sort_groupby_helper::keys_bitmask_column(rmm::cuda_stream_view strea
auto [row_bitmask, null_count] =
cudf::detail::bitmask_and(_keys, stream, rmm::mr::get_current_device_resource());

auto const zero = numeric_scalar<int8_t>(0);
auto const zero = numeric_scalar<int8_t>(0, true, stream);
// Create a temporary variable and only set _keys_bitmask_column right before the return.
// This way, a 2nd (parallel) call to this will not be given a partially created object.
auto keys_bitmask_column = cudf::detail::sequence(
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/hash/md5_hash.cu
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ std::unique_ptr<column> md5(table_view const& input,
{
if (input.num_columns() == 0 || input.num_rows() == 0) {
// Return the MD5 hash of a zero-length input.
string_scalar const string_128bit("d41d8cd98f00b204e9800998ecf8427e");
string_scalar const string_128bit("d41d8cd98f00b204e9800998ecf8427e", true, stream);
return make_column_from_scalar(string_128bit, input.num_rows(), stream, mr);
}

Expand Down
4 changes: 2 additions & 2 deletions cpp/src/io/csv/reader_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -861,8 +861,8 @@ table_with_metadata read_csv(cudf::io::datasource* source,
stream,
mr);

string_scalar quotechar_scalar(std::string(1, parse_opts.quotechar), true, stream);
string_scalar dblquotechar_scalar(std::string(2, parse_opts.quotechar), true, stream);
cudf::string_scalar quotechar_scalar(std::string(1, parse_opts.quotechar), true, stream);
cudf::string_scalar dblquotechar_scalar(std::string(2, parse_opts.quotechar), true, stream);
for (size_t i = 0; i < column_types.size(); ++i) {
metadata.schema_info.emplace_back(out_buffers[i].name);
if (column_types[i].id() == type_id::STRING && parse_opts.quotechar != '\0' &&
Expand Down
5 changes: 0 additions & 5 deletions cpp/src/io/csv/writer_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -154,11 +154,6 @@ struct column_to_strings_fn {

// Note: `null` replacement with `na_rep` deferred to `concatenate()`
// instead of column-wise; might be faster
//
// Note: Cannot pass `stream` to detail::<fname> version of <fname> calls below, because they are
// not exposed in header (see, for example, detail::concatenate(tbl_view, separator, na_rep,
// stream, mr) is declared and defined in combine.cu); Possible solution: declare `extern`, or
// just declare a prototype inside `namespace cudf::strings::detail`;

// bools:
//
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/io/parquet/writer_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ size_t column_size(column_view const& column, rmm::cuda_stream_view stream)
auto const scol = structs_column_view(column);
size_t ret = 0;
for (int i = 0; i < scol.num_children(); i++) {
ret += column_size(scol.get_sliced_child(i), stream);
ret += column_size(scol.get_sliced_child(i, stream), stream);
}
return ret;
} else if (column.type().id() == type_id::LIST) {
Expand Down
3 changes: 2 additions & 1 deletion cpp/src/io/utilities/column_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,8 @@ std::unique_ptr<column> make_column(column_buffer_base<string_policy>& buffer,
std::move(col_content.children[strings_column_view::offsets_column_index]),
std::move(uint8_col),
null_count,
std::move(*col_content.null_mask));
std::move(*col_content.null_mask),
stream);
}

case type_id::LIST: {
Expand Down
3 changes: 2 additions & 1 deletion cpp/src/lists/dremel.cu
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ dremel_data get_encoding(column_view h_col,
}
std::unique_ptr<column> empty_list_offset_col;
if (has_empty_list_offsets) {
empty_list_offset_col = make_fixed_width_column(data_type(type_id::INT32), 1);
empty_list_offset_col = make_fixed_width_column(
data_type(type_to_id<size_type>()), 1, mask_state::UNALLOCATED, stream);
CUDF_CUDA_TRY(cudaMemsetAsync(
empty_list_offset_col->mutable_view().head(), 0, sizeof(size_type), stream.value()));
std::function<column_view(column_view const&)> normalize_col = [&](column_view const& col) {
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/lists/interleave_columns.cu
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ struct interleave_list_entries_impl<T, std::enable_if_t<cudf::is_fixed_width<T>(
if (data_has_null_mask) {
auto [null_mask, null_count] = cudf::detail::valid_if(
validities.begin(), validities.end(), thrust::identity{}, stream, mr);
if (null_count > 0) { output->set_null_mask(null_mask, null_count); }
if (null_count > 0) { output->set_null_mask(std::move(null_mask), null_count); }
}

return output;
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/reductions/scan/scan_exclusive.cu
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ std::unique_ptr<column> scan_exclusive(column_view const& input,

auto output = scan_agg_dispatch<scan_dispatcher>(
input, agg, static_cast<bitmask_type*>(mask.data()), stream, mr);
output->set_null_mask(mask, null_count);
output->set_null_mask(std::move(mask), null_count);

return output;
}
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/reductions/scan/scan_inclusive.cu
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ std::unique_ptr<column> scan_inclusive(column_view const& input,

auto output = scan_agg_dispatch<scan_dispatcher>(
input, agg, static_cast<bitmask_type*>(mask.data()), stream, mr);
output->set_null_mask(mask, null_count);
output->set_null_mask(std::move(mask), null_count);

// If the input is a structs column, we also need to push down nulls from the parent output column
// into the children columns.
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/reductions/segmented/simple.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ std::unique_ptr<column> simple_segmented_reduction(

// Cast initial value
ResultType initial_value = [&] {
if (init.has_value() && init.value().get().is_valid()) {
if (init.has_value() && init.value().get().is_valid(stream)) {
using ScalarType = cudf::scalar_type_t<InputType>;
auto input_value = static_cast<ScalarType const*>(&init.value().get())->value(stream);
return static_cast<ResultType>(input_value);
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/reductions/segmented/update_validity.cu
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ void segmented_update_validity(column& result,
offsets.end() - 1,
offsets.begin() + 1,
null_handling,
init.has_value() ? std::optional(init.value().get().is_valid()) : std::nullopt,
init.has_value() ? std::optional(init.value().get().is_valid(stream)) : std::nullopt,
stream,
mr);
result.set_null_mask(std::move(output_null_mask), output_null_count);
Expand Down
11 changes: 7 additions & 4 deletions cpp/src/reductions/simple.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ std::unique_ptr<scalar> simple_reduction(column_view const& col,

// Cast initial value
std::optional<ResultType> const initial_value = [&] {
if (init.has_value() && init.value().get().is_valid()) {
if (init.has_value() && init.value().get().is_valid(stream)) {
using ScalarType = cudf::scalar_type_t<ElementType>;
auto input_value = static_cast<ScalarType const*>(&init.value().get())->value(stream);
return std::optional<ResultType>(static_cast<ResultType>(input_value));
Expand All @@ -89,7 +89,8 @@ std::unique_ptr<scalar> simple_reduction(column_view const& col,

// set scalar is valid
result->set_valid_async(
col.null_count() < col.size() && (!init.has_value() || init.value().get().is_valid()), stream);
col.null_count() < col.size() && (!init.has_value() || init.value().get().is_valid(stream)),
stream);
return result;
}

Expand Down Expand Up @@ -130,7 +131,8 @@ std::unique_ptr<scalar> fixed_point_reduction(
auto result_scalar =
cudf::make_fixed_point_scalar<DecimalXX>(val->value(stream), scale, stream, mr);
result_scalar->set_valid_async(
col.null_count() < col.size() && (!init.has_value() || init.value().get().is_valid()), stream);
col.null_count() < col.size() && (!init.has_value() || init.value().get().is_valid(stream)),
stream);
return result_scalar;
}

Expand Down Expand Up @@ -169,7 +171,8 @@ std::unique_ptr<scalar> dictionary_reduction(

// set scalar is valid
result->set_valid_async(
col.null_count() < col.size() && (!init.has_value() || init.value().get().is_valid()), stream);
col.null_count() < col.size() && (!init.has_value() || init.value().get().is_valid(stream)),
stream);
return result;
}

Expand Down
18 changes: 11 additions & 7 deletions cpp/src/scalar/scalar.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -419,9 +419,9 @@ duration_scalar<T>::duration_scalar(duration_scalar<T> const& other,
}

template <typename T>
typename duration_scalar<T>::rep_type duration_scalar<T>::count()
typename duration_scalar<T>::rep_type duration_scalar<T>::count(rmm::cuda_stream_view stream)
{
return this->value().count();
return this->value(stream).count();
}

/**
Expand All @@ -439,9 +439,10 @@ template class duration_scalar<duration_us>;
template class duration_scalar<duration_ns>;

template <typename T>
typename timestamp_scalar<T>::rep_type timestamp_scalar<T>::ticks_since_epoch()
typename timestamp_scalar<T>::rep_type timestamp_scalar<T>::ticks_since_epoch(
rmm::cuda_stream_view stream)
{
return this->value().time_since_epoch().count();
return this->value(stream).time_since_epoch().count();
}

/**
Expand Down Expand Up @@ -541,7 +542,7 @@ struct_scalar::struct_scalar(table_view const& data,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
: scalar(data_type(type_id::STRUCT), is_valid, stream, mr),
_data{init_data(table{data}, is_valid, stream, mr)}
_data{init_data(table{data, stream, mr}, is_valid, stream, mr)}
{
assert_valid_size();
}
Expand All @@ -551,8 +552,11 @@ struct_scalar::struct_scalar(host_span<column_view const> data,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
: scalar(data_type(type_id::STRUCT), is_valid, stream, mr),
_data{init_data(
table{table_view{std::vector<column_view>{data.begin(), data.end()}}}, is_valid, stream, mr)}
_data{
init_data(table{table_view{std::vector<column_view>{data.begin(), data.end()}}, stream, mr},
is_valid,
stream,
mr)}
{
assert_valid_size();
}
Expand Down
3 changes: 2 additions & 1 deletion cpp/src/sort/sort_impl.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ std::unique_ptr<column> sorted_order(table_view input,
rmm::mr::device_memory_resource* mr)
{
if (input.num_rows() == 0 or input.num_columns() == 0) {
return cudf::make_numeric_column(data_type(type_to_id<size_type>()), 0);
return cudf::make_numeric_column(
data_type(type_to_id<size_type>()), 0, mask_state::UNALLOCATED, stream);
}

if (not column_order.empty()) {
Expand Down
4 changes: 3 additions & 1 deletion cpp/src/strings/convert/convert_booleans.cu
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ std::unique_ptr<column> to_booleans(strings_column_view const& input,
rmm::mr::device_memory_resource* mr)
{
size_type strings_count = input.size();
if (strings_count == 0) return make_numeric_column(data_type{type_id::BOOL8}, 0);
if (strings_count == 0) {
return make_numeric_column(data_type{type_id::BOOL8}, 0, mask_state::UNALLOCATED, stream);
}

CUDF_EXPECTS(true_string.is_valid(stream) && true_string.size() > 0,
"Parameter true_string must not be empty.");
Expand Down
4 changes: 3 additions & 1 deletion cpp/src/strings/convert/convert_durations.cu
Original file line number Diff line number Diff line change
Expand Up @@ -697,7 +697,9 @@ std::unique_ptr<column> to_durations(strings_column_view const& input,
rmm::mr::device_memory_resource* mr)
{
size_type strings_count = input.size();
if (strings_count == 0) return make_duration_column(duration_type, 0);
if (strings_count == 0) {
return make_duration_column(duration_type, 0, mask_state::UNALLOCATED, stream);
}

CUDF_EXPECTS(!format.empty(), "Format parameter must not be empty.");

Expand Down
4 changes: 3 additions & 1 deletion cpp/src/strings/convert/convert_floats.cu
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,9 @@ std::unique_ptr<column> to_floats(strings_column_view const& input,
rmm::mr::device_memory_resource* mr)
{
size_type strings_count = input.size();
if (strings_count == 0) return make_numeric_column(output_type, 0);
if (strings_count == 0) {
return make_numeric_column(output_type, 0, mask_state::UNALLOCATED, stream);
}
auto strings_column = column_device_view::create(input.parent(), stream);
auto d_strings = *strings_column;
// create float output column copying the strings null-mask
Expand Down
4 changes: 3 additions & 1 deletion cpp/src/strings/convert/convert_integers.cu
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,9 @@ std::unique_ptr<column> to_integers(strings_column_view const& input,
rmm::mr::device_memory_resource* mr)
{
size_type strings_count = input.size();
if (strings_count == 0) return make_numeric_column(output_type, 0);
if (strings_count == 0) {
return make_numeric_column(output_type, 0, mask_state::UNALLOCATED, stream);
}

// Create integer output column copying the strings null-mask
auto results = make_numeric_column(output_type,
Expand Down
4 changes: 3 additions & 1 deletion cpp/src/strings/convert/convert_ipv4.cu
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,9 @@ std::unique_ptr<column> ipv4_to_integers(strings_column_view const& input,
rmm::mr::device_memory_resource* mr)
{
size_type strings_count = input.size();
if (strings_count == 0) return make_numeric_column(data_type{type_id::INT64}, 0);
if (strings_count == 0) {
return make_numeric_column(data_type{type_id::INT64}, 0, mask_state::UNALLOCATED, stream);
}

auto strings_column = column_device_view::create(input.parent(), stream);
// create output column copying the strings' null-mask
Expand Down
Loading