Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adapt cudf::scalar classes to changes in rmm::device_scalar #8411

Merged
merged 19 commits into from
Jun 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cpp/include/cudf/detail/copy_if_else.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ std::unique_ptr<column> copy_if_else(
<<<grid.num_blocks, block_size, 0, stream.value()>>>(
lhs_begin, rhs, filter, *out_v, valid_count.data());

out->set_null_count(size - valid_count.value());
out->set_null_count(size - valid_count.value(stream));
} else {
// call the kernel
copy_if_else_kernel<block_size, Element, LeftIter, RightIter, FilterFn, false>
Expand Down
2 changes: 1 addition & 1 deletion cpp/include/cudf/detail/copy_range.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ void copy_range(SourceValueIterator source_value_begin,
target_end,
null_count.data());

target.set_null_count(null_count.value());
target.set_null_count(null_count.value(stream));
} else {
auto kernel =
copy_range_kernel<block_size, SourceValueIterator, SourceValidityIterator, T, false>;
Expand Down
31 changes: 16 additions & 15 deletions cpp/include/cudf/scalar/scalar.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ class scalar {
* @param is_valid true: set the value to valid. false: set it to null
* @param stream CUDA stream used for device memory operations.
*/
void set_valid(bool is_valid, rmm::cuda_stream_view stream = rmm::cuda_stream_default);
void set_valid_async(bool is_valid, rmm::cuda_stream_view stream = rmm::cuda_stream_default);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the r-value version of this function needs to be deleted?

Copy link
Member Author

@harrism harrism Jun 3, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I think it should not be deleted. Doing so results in ambiguity between the bool and bool&& versions of the function. The reason to delete would be to avoid the case where a reference to a temporary is passed to an async function, and the temporary is destroyed before the copy completes. But bool is a special case for which the lowest-level implementation in device_uvector uses cudaMemsetAsync. And since set_valid_async and cudaMemsetAsync both take non-reference parameters, the value of the temporary will be copied into cudaMemsetAsync before the temporary can be destroyed.


/**
* @brief Indicates whether the scalar contains a valid value
Expand All @@ -97,10 +97,10 @@ class scalar {
bool const* validity_data() const;

protected:
data_type _type{type_id::EMPTY}; ///< Logical type of value in the scalar
rmm::device_scalar<bool> _is_valid{}; ///< Device bool signifying validity
data_type _type{type_id::EMPTY}; ///< Logical type of value in the scalar
rmm::device_scalar<bool> _is_valid; ///< Device bool signifying validity

scalar() = default;
scalar() = delete;

/**
* @brief Construct a new scalar object
Expand Down Expand Up @@ -175,9 +175,9 @@ class fixed_width_scalar : public scalar {
T const* data() const;

protected:
rmm::device_scalar<T> _data{}; ///< device memory containing the value
rmm::device_scalar<T> _data; ///< device memory containing the value

fixed_width_scalar();
fixed_width_scalar() = delete;

/**
* @brief Construct a new fixed width scalar object
Expand Down Expand Up @@ -218,7 +218,7 @@ class numeric_scalar : public detail::fixed_width_scalar<T> {
static_assert(is_numeric<T>(), "Unexpected non-numeric type.");

public:
numeric_scalar() = default;
numeric_scalar() = delete;
~numeric_scalar() = default;
numeric_scalar(numeric_scalar&& other) = default;

Expand Down Expand Up @@ -276,7 +276,7 @@ class fixed_point_scalar : public scalar {
using rep_type = typename T::rep;
using value_type = T;

fixed_point_scalar();
fixed_point_scalar() = delete;
~fixed_point_scalar() = default;
fixed_point_scalar(fixed_point_scalar&& other) = default;

Expand Down Expand Up @@ -375,7 +375,7 @@ class fixed_point_scalar : public scalar {
rep_type const* data() const;

protected:
rmm::device_scalar<rep_type> _data{}; ///< device memory containing the value
rmm::device_scalar<rep_type> _data; ///< device memory containing the value
};

/**
Expand All @@ -385,10 +385,11 @@ class string_scalar : public scalar {
public:
using value_type = cudf::string_view;

string_scalar();
string_scalar() = delete;
~string_scalar() = default;
string_scalar(string_scalar&& other) = default;

// string_scalar(string_scalar const& other) = delete;
string_scalar& operator=(string_scalar const& other) = delete;
string_scalar& operator=(string_scalar&& other) = delete;

Expand Down Expand Up @@ -488,7 +489,7 @@ class chrono_scalar : public detail::fixed_width_scalar<T> {
static_assert(is_chrono<T>(), "Unexpected non-chrono type");

public:
chrono_scalar() = default;
chrono_scalar() = delete;
~chrono_scalar() = default;
chrono_scalar(chrono_scalar&& other) = default;

Expand Down Expand Up @@ -540,7 +541,7 @@ class timestamp_scalar : public chrono_scalar<T> {
using chrono_scalar<T>::chrono_scalar;
using rep_type = typename T::rep;

timestamp_scalar() = default;
timestamp_scalar() = delete;
timestamp_scalar(timestamp_scalar&& other) = default;

/**
Expand Down Expand Up @@ -583,7 +584,7 @@ class duration_scalar : public chrono_scalar<T> {
using chrono_scalar<T>::chrono_scalar;
using rep_type = typename T::rep;

duration_scalar() = default;
duration_scalar() = delete;
duration_scalar(duration_scalar&& other) = default;

/**
Expand Down Expand Up @@ -621,7 +622,7 @@ class duration_scalar : public chrono_scalar<T> {
*/
class list_scalar : public scalar {
public:
list_scalar();
list_scalar() = delete;
~list_scalar() = default;
list_scalar(list_scalar&& other) = default;

Expand Down Expand Up @@ -681,7 +682,7 @@ class list_scalar : public scalar {
*/
class struct_scalar : public scalar {
public:
struct_scalar();
struct_scalar() = delete;
~struct_scalar() = default;
struct_scalar(struct_scalar&& other) = default;
struct_scalar(struct_scalar const& other) = default;
Expand Down
10 changes: 5 additions & 5 deletions cpp/src/copying/get_element.cu
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ struct get_element_functor {
{
auto device_col = column_device_view::create(input, stream);

rmm::device_scalar<string_view> temp_data;
rmm::device_scalar<bool> temp_valid;
rmm::device_scalar<string_view> temp_data(stream, mr);
rmm::device_scalar<bool> temp_valid(stream, mr);

device_single_thread(
[buffer = temp_data.data(),
Expand Down Expand Up @@ -105,7 +105,7 @@ struct get_element_functor {

if (!key_index_scalar.is_valid(stream)) {
auto null_result = make_default_constructed_scalar(dict_view.keys().type(), stream, mr);
null_result->set_valid(false, stream);
null_result->set_valid_async(false, stream);
return null_result;
}

Expand Down Expand Up @@ -154,8 +154,8 @@ struct get_element_functor {

auto device_col = column_device_view::create(input, stream);

rmm::device_scalar<Type> temp_data;
rmm::device_scalar<bool> temp_valid;
rmm::device_scalar<Type> temp_data(stream, mr);
rmm::device_scalar<bool> temp_valid(stream, mr);

device_single_thread(
[buffer = temp_data.data(),
Expand Down
6 changes: 3 additions & 3 deletions cpp/src/io/json/reader_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ std::unique_ptr<table> create_json_keys_info_table(const parse_options_view &opt
options, data, row_offsets, key_counter.data(), {}, stream);

// Allocate columns to store hash value, length, and offset of each JSON object key in the input
auto const num_keys = key_counter.value();
auto const num_keys = key_counter.value(stream);
std::vector<std::unique_ptr<column>> info_columns;
info_columns.emplace_back(make_numeric_column(data_type(type_id::UINT64), num_keys));
info_columns.emplace_back(make_numeric_column(data_type(type_id::UINT16), num_keys));
Expand All @@ -157,7 +157,7 @@ std::unique_ptr<table> create_json_keys_info_table(const parse_options_view &opt
auto const info_table_mdv = mutable_table_device_view::create(info_table->mutable_view(), stream);

// Reset the key counter - now used for indexing
key_counter.set_value_zero(stream);
key_counter.set_value_to_zero_async(stream);
// Fill the allocated columns
cudf::io::json::gpu::collect_keys_info(
options, data, row_offsets, key_counter.data(), {*info_table_mdv}, stream);
Expand Down Expand Up @@ -433,7 +433,7 @@ void reader::impl::set_column_names(device_span<uint64_t const> rec_starts,
// use keys as column names if input rows are objects
auto keys_desc = get_json_object_keys_hashes(rec_starts, stream);
metadata_.column_names = keys_desc.first;
set_column_map(std::move(keys_desc.second));
set_column_map(std::move(keys_desc.second), stream);
} else {
int cols_found = 0;
bool quotation = false;
Expand Down
5 changes: 3 additions & 2 deletions cpp/src/io/json/reader_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,11 @@ class reader::impl {
* @brief Sets the column map data member and makes a device copy to be used as a kernel
* parameter.
*/
void set_column_map(col_map_ptr_type &&map)
void set_column_map(col_map_ptr_type &&map, rmm::cuda_stream_view stream)
{
key_to_col_idx_map_ = std::move(map);
d_key_col_map_ = std::make_unique<rmm::device_scalar<col_map_type>>(*key_to_col_idx_map_);
d_key_col_map_ =
std::make_unique<rmm::device_scalar<col_map_type>>(*key_to_col_idx_map_, stream);
}
/**
* @brief Gets the pointer to the column hash map in the device memory.
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/join/hash_join.cu
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ probe_join_hash_table(cudf::table_device_view build_table,

constexpr int block_size{DEFAULT_JOIN_BLOCK_SIZE};
detail::grid_1d config(probe_table.num_rows(), block_size);
write_index.set_value_zero(stream);
write_index.set_value_to_zero_async(stream);

row_hash hash_probe{probe_table};
row_equality equality{probe_table, build_table, compare_nulls == null_equality::EQUAL};
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/join/hash_join.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ std::size_t estimate_join_output_size(table_device_view build_table,
do {
sample_probe_num_rows = std::min(sample_probe_num_rows, probe_table_num_rows);

size_estimate.set_value_zero(stream);
size_estimate.set_value_to_zero_async(stream);

row_hash hash_probe{probe_table};
row_equality equality{probe_table, build_table, compare_nulls == null_equality::EQUAL};
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/reductions/compound.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ std::unique_ptr<scalar> compound_reduction(column_view const& col,
}

// set scalar is valid
result->set_valid(col.null_count() < col.size(), stream);
result->set_valid_async(col.null_count() < col.size(), stream);
return result;
};

Expand Down
2 changes: 1 addition & 1 deletion cpp/src/reductions/reductions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ std::unique_ptr<scalar> reduce(
rmm::mr::device_memory_resource *mr = rmm::mr::get_current_device_resource())
{
std::unique_ptr<scalar> result = make_default_constructed_scalar(output_dtype, stream, mr);
result->set_valid(false, stream);
result->set_valid_async(false, stream);

// check if input column is empty
if (col.size() <= col.null_count()) return result;
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/reductions/simple.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ std::unique_ptr<scalar> simple_reduction(column_view const& col,
}();

// set scalar is valid
result->set_valid((col.null_count() < col.size()), stream);
result->set_valid_async(col.null_count() < col.size(), stream);
return result;
}

Expand Down Expand Up @@ -147,7 +147,7 @@ std::unique_ptr<scalar> dictionary_reduction(column_view const& col,
}();

// set scalar is valid
result->set_valid((col.null_count() < col.size()), stream);
result->set_valid_async(col.null_count() < col.size(), stream);
return result;
}

Expand Down
29 changes: 8 additions & 21 deletions cpp/src/scalar/scalar.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ scalar::scalar(scalar const& other,

data_type scalar::type() const noexcept { return _type; }

void scalar::set_valid(bool is_valid, rmm::cuda_stream_view stream)
void scalar::set_valid_async(bool is_valid, rmm::cuda_stream_view stream)
{
_is_valid.set_value(is_valid, stream);
_is_valid.set_value_async(is_valid, stream);
}

bool scalar::is_valid(rmm::cuda_stream_view stream) const { return _is_valid.value(stream); }
Expand All @@ -57,8 +57,6 @@ bool* scalar::validity_data() { return _is_valid.data(); }

bool const* scalar::validity_data() const { return _is_valid.data(); }

string_scalar::string_scalar() : scalar(data_type(type_id::STRING)) {}

string_scalar::string_scalar(std::string const& string,
bool is_valid,
rmm::cuda_stream_view stream,
Expand Down Expand Up @@ -112,17 +110,14 @@ std::string string_scalar::to_string(rmm::cuda_stream_view stream) const
return result;
}

template <typename T>
fixed_point_scalar<T>::fixed_point_scalar() : scalar(data_type(type_to_id<T>())){};

template <typename T>
fixed_point_scalar<T>::fixed_point_scalar(rep_type value,
numeric::scale_type scale,
bool is_valid,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
: scalar{data_type{type_to_id<T>(), static_cast<int32_t>(scale)}, is_valid, stream, mr},
_data{value}
_data{value, stream, mr}
{
}

Expand All @@ -131,7 +126,7 @@ fixed_point_scalar<T>::fixed_point_scalar(rep_type value,
bool is_valid,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
: scalar{data_type{type_to_id<T>(), 0}, is_valid, stream, mr}, _data{value}
: scalar{data_type{type_to_id<T>(), 0}, is_valid, stream, mr}, _data{value, stream, mr}
{
}

Expand All @@ -140,7 +135,8 @@ fixed_point_scalar<T>::fixed_point_scalar(T value,
bool is_valid,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
: scalar{data_type{type_to_id<T>(), value.scale()}, is_valid, stream, mr}, _data{value.value()}
: scalar{data_type{type_to_id<T>(), value.scale()}, is_valid, stream, mr},
_data{value.value(), stream, mr}
{
}

Expand Down Expand Up @@ -202,11 +198,6 @@ template class fixed_point_scalar<numeric::decimal64>;

namespace detail {

template <typename T>
fixed_width_scalar<T>::fixed_width_scalar() : scalar(data_type(type_to_id<T>()))
{
}

template <typename T>
fixed_width_scalar<T>::fixed_width_scalar(T value,
bool is_valid,
Expand Down Expand Up @@ -237,8 +228,8 @@ fixed_width_scalar<T>::fixed_width_scalar(fixed_width_scalar<T> const& other,
template <typename T>
void fixed_width_scalar<T>::set_value(T value, rmm::cuda_stream_view stream)
{
_data.set_value(value, stream);
this->set_valid(true, stream);
_data.set_value_async(value, stream);
this->set_valid_async(true, stream);
}

template <typename T>
Expand Down Expand Up @@ -491,8 +482,6 @@ TS_CTOR(timestamp_ns, duration_us)
TS_CTOR(timestamp_ns, duration_ns)
TS_CTOR(timestamp_ns, int64_t)

list_scalar::list_scalar() : scalar(data_type(type_id::LIST)) {}

list_scalar::list_scalar(cudf::column_view const& data,
bool is_valid,
rmm::cuda_stream_view stream,
Expand All @@ -518,8 +507,6 @@ list_scalar::list_scalar(list_scalar const& other,

column_view list_scalar::view() const { return _data.view(); }

struct_scalar::struct_scalar() : scalar(data_type(type_id::STRUCT)) {}

struct_scalar::struct_scalar(table_view const& data,
bool is_valid,
rmm::cuda_stream_view stream,
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/strings/attributes.cu
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,8 @@ std::unique_ptr<column> code_points(
return length;
},
thrust::plus<size_type>());
size_type const zero = 0;
offsets.set_element_async(0, zero, stream);

offsets.set_element_to_zero_async(0, stream);

// the total size is the number of characters in the entire column
size_type num_characters = offsets.back_element(stream);
Expand Down
6 changes: 2 additions & 4 deletions cpp/src/strings/combine/join.cu
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,9 @@ std::unique_ptr<column> join_strings(strings_column_view const& strings,
return bytes;
},
thrust::plus<size_type>());
size_type const zero = 0;
output_offsets.set_element_async(0, zero, stream);

output_offsets.set_element_to_zero_async(0, stream);
// total size is the last entry
// Note this call does a synchronize on the stream and thereby also protects the
// set_element_async parameter from going out of scope before it is used.
size_type const bytes = output_offsets.back_element(stream);

// build offsets column (only 1 string so 2 offset entries)
Expand Down
Loading