Skip to content

Commit

Permalink
Merge branch 'branch-23.12' into stream-json-get-object
Browse files Browse the repository at this point in the history
  • Loading branch information
davidwendt committed Oct 23, 2023
2 parents 0a37c0b + 253f6a6 commit a3b8a40
Show file tree
Hide file tree
Showing 39 changed files with 1,810 additions and 719 deletions.
6 changes: 3 additions & 3 deletions cpp/include/cudf/lists/detail/scatter.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
#include <cudf/column/column_factories.hpp>
#include <cudf/copying.hpp>
#include <cudf/detail/iterator.cuh>
#include <cudf/detail/null_mask.hpp>
#include <cudf/lists/detail/scatter_helper.cuh>
#include <cudf/lists/list_device_view.cuh>
#include <cudf/null_mask.hpp>
#include <cudf/strings/detail/strings_children.cuh>
#include <cudf/types.hpp>
#include <cudf/utilities/default_stream.hpp>
Expand Down Expand Up @@ -130,8 +130,8 @@ std::unique_ptr<column> scatter_impl(rmm::device_uvector<unbound_list_view> cons
std::vector<std::unique_ptr<column>> children;
children.emplace_back(std::move(offsets_column));
children.emplace_back(std::move(child_column));
auto null_mask =
target.has_nulls() ? copy_bitmask(target, stream, mr) : rmm::device_buffer{0, stream, mr};
auto null_mask = target.has_nulls() ? cudf::detail::copy_bitmask(target, stream, mr)
: rmm::device_buffer{0, stream, mr};

// The output column from this function only has null masks copied from the target columns.
// That is still not a correct final null mask for the scatter result.
Expand Down
24 changes: 22 additions & 2 deletions cpp/include/cudf/null_mask.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#pragma once

#include <cudf/types.hpp>
#include <cudf/utilities/default_stream.hpp>
#include <cudf/utilities/span.hpp>

#include <rmm/device_buffer.hpp>
Expand Down Expand Up @@ -80,13 +81,15 @@ size_type num_bitmask_words(size_type number_of_bits);
*
* @param size The number of elements to be represented by the mask
* @param state The desired state of the mask
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate the returned device_buffer
* @return A `device_buffer` for use as a null bitmask
* satisfying the desired size and state
*/
rmm::device_buffer create_null_mask(
size_type size,
mask_state state,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
Expand All @@ -100,8 +103,13 @@ rmm::device_buffer create_null_mask(
* @param begin_bit Index of the first bit to set (inclusive)
* @param end_bit Index of the last bit to set (exclusive)
* @param valid If true set all entries to valid; otherwise, set all to null
* @param stream CUDA stream used for device memory operations and kernel launches
*/
void set_null_mask(bitmask_type* bitmask, size_type begin_bit, size_type end_bit, bool valid);
void set_null_mask(bitmask_type* bitmask,
size_type begin_bit,
size_type end_bit,
bool valid,
rmm::cuda_stream_view stream = cudf::get_default_stream());

/**
* @brief Creates a `device_buffer` from a slice of bitmask defined by a range
Expand All @@ -115,6 +123,7 @@ void set_null_mask(bitmask_type* bitmask, size_type begin_bit, size_type end_bit
* @param mask Bitmask residing in device memory whose bits will be copied
* @param begin_bit Index of the first bit to be copied (inclusive)
* @param end_bit Index of the last bit to be copied (exclusive)
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate the returned device_buffer
* @return A `device_buffer` containing the bits
* `[begin_bit, end_bit)` from `mask`.
Expand All @@ -123,6 +132,7 @@ rmm::device_buffer copy_bitmask(
bitmask_type const* mask,
size_type begin_bit,
size_type end_bit,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
Expand All @@ -132,12 +142,14 @@ rmm::device_buffer copy_bitmask(
* Returns empty `device_buffer` if the column is not nullable
*
* @param view Column view whose bitmask needs to be copied
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate the returned device_buffer
* @return A `device_buffer` containing the bits
* `[view.offset(), view.offset() + view.size())` from `view`'s bitmask.
*/
rmm::device_buffer copy_bitmask(
column_view const& view,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
Expand All @@ -148,11 +160,13 @@ rmm::device_buffer copy_bitmask(
* If no column in the table is nullable, an empty bitmask is returned.
*
* @param view The table of columns
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate the returned device_buffer
* @return A pair of resulting bitmask and count of unset bits
*/
std::pair<rmm::device_buffer, size_type> bitmask_and(
table_view const& view,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
Expand All @@ -163,11 +177,13 @@ std::pair<rmm::device_buffer, size_type> bitmask_and(
* If no column in the table is nullable, an empty bitmask is returned.
*
* @param view The table of columns
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate the returned device_buffer
* @return A pair of resulting bitmask and count of unset bits
*/
std::pair<rmm::device_buffer, size_type> bitmask_or(
table_view const& view,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
Expand All @@ -183,8 +199,12 @@ std::pair<rmm::device_buffer, size_type> bitmask_or(
* @param bitmask Validity bitmask residing in device memory.
* @param start Index of the first bit to count (inclusive).
* @param stop Index of the last bit to count (exclusive).
* @param stream CUDA stream used for device memory operations and kernel launches
* @return The number of null elements in the specified range.
*/
cudf::size_type null_count(bitmask_type const* bitmask, size_type start, size_type stop);
cudf::size_type null_count(bitmask_type const* bitmask,
size_type start,
size_type stop,
rmm::cuda_stream_view stream = cudf::get_default_stream());
/** @} */ // end of group
} // namespace cudf
2 changes: 1 addition & 1 deletion cpp/src/binaryop/binaryop.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ std::unique_ptr<column> binary_operation(column_view const& lhs,

CUDF_EXPECTS((lhs.size() == rhs.size()), "Column sizes don't match");

auto [new_mask, null_count] = bitmask_and(table_view({lhs, rhs}), stream, mr);
auto [new_mask, null_count] = cudf::detail::bitmask_and(table_view({lhs, rhs}), stream, mr);
auto out =
make_fixed_width_column(output_type, lhs.size(), std::move(new_mask), null_count, stream, mr);

Expand Down
38 changes: 28 additions & 10 deletions cpp/src/bitmask/null_mask.cu
Original file line number Diff line number Diff line change
Expand Up @@ -157,16 +157,21 @@ void set_null_mask(bitmask_type* bitmask,
// Create a device_buffer for a null mask
rmm::device_buffer create_null_mask(size_type size,
mask_state state,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
return detail::create_null_mask(size, state, cudf::get_default_stream(), mr);
return detail::create_null_mask(size, state, stream, mr);
}

// Set pre-allocated null mask of given bit range [begin_bit, end_bit) to valid if valid==true,
// or to null otherwise.
void set_null_mask(bitmask_type* bitmask, size_type begin_bit, size_type end_bit, bool valid)
void set_null_mask(bitmask_type* bitmask,
size_type begin_bit,
size_type end_bit,
bool valid,
rmm::cuda_stream_view stream)
{
return detail::set_null_mask(bitmask, begin_bit, end_bit, valid, cudf::get_default_stream());
return detail::set_null_mask(bitmask, begin_bit, end_bit, valid, stream);
}

namespace detail {
Expand Down Expand Up @@ -511,33 +516,46 @@ std::pair<rmm::device_buffer, size_type> bitmask_or(table_view const& view,
rmm::device_buffer copy_bitmask(bitmask_type const* mask,
size_type begin_bit,
size_type end_bit,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
return detail::copy_bitmask(mask, begin_bit, end_bit, cudf::get_default_stream(), mr);
CUDF_FUNC_RANGE();
return detail::copy_bitmask(mask, begin_bit, end_bit, stream, mr);
}

// Create a bitmask from a column view
rmm::device_buffer copy_bitmask(column_view const& view, rmm::mr::device_memory_resource* mr)
rmm::device_buffer copy_bitmask(column_view const& view,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
return detail::copy_bitmask(view, cudf::get_default_stream(), mr);
CUDF_FUNC_RANGE();
return detail::copy_bitmask(view, stream, mr);
}

std::pair<rmm::device_buffer, size_type> bitmask_and(table_view const& view,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
return detail::bitmask_and(view, cudf::get_default_stream(), mr);
CUDF_FUNC_RANGE();
return detail::bitmask_and(view, stream, mr);
}

std::pair<rmm::device_buffer, size_type> bitmask_or(table_view const& view,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
return detail::bitmask_or(view, cudf::get_default_stream(), mr);
CUDF_FUNC_RANGE();
return detail::bitmask_or(view, stream, mr);
}

// Count null elements in the specified range
cudf::size_type null_count(bitmask_type const* bitmask, size_type start, size_type stop)
cudf::size_type null_count(bitmask_type const* bitmask,
size_type start,
size_type stop,
rmm::cuda_stream_view stream)
{
return detail::null_count(bitmask, start, stop, cudf::get_default_stream());
CUDF_FUNC_RANGE();
return detail::null_count(bitmask, start, stop, stream);
}

} // namespace cudf
2 changes: 1 addition & 1 deletion cpp/src/copying/concatenate.cu
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,7 @@ rmm::device_buffer concatenate_masks(host_span<column_view const> views,
});

rmm::device_buffer null_mask =
create_null_mask(total_element_count, mask_state::UNINITIALIZED, mr);
cudf::detail::create_null_mask(total_element_count, mask_state::UNINITIALIZED, stream, mr);

detail::concatenate_masks(views, static_cast<bitmask_type*>(null_mask.data()), stream);

Expand Down
5 changes: 3 additions & 2 deletions cpp/src/copying/scatter.cu
Original file line number Diff line number Diff line change
Expand Up @@ -268,8 +268,9 @@ struct column_scalar_scatterer_impl<struct_view, MapIterator> {

// Compute null mask
rmm::device_buffer null_mask =
target.nullable() ? copy_bitmask(target, stream, mr)
: create_null_mask(target.size(), mask_state::UNALLOCATED, stream, mr);
target.nullable()
? detail::copy_bitmask(target, stream, mr)
: detail::create_null_mask(target.size(), mask_state::UNALLOCATED, stream, mr);
column null_mask_stub(data_type{type_id::STRUCT},
target.size(),
rmm::device_buffer{},
Expand Down
3 changes: 2 additions & 1 deletion cpp/src/groupby/hash/groupby.cu
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,8 @@ void sparse_to_dense_results(table_view const& keys,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
auto row_bitmask = bitmask_and(keys, stream, rmm::mr::get_current_device_resource()).first;
auto row_bitmask =
cudf::detail::bitmask_and(keys, stream, rmm::mr::get_current_device_resource()).first;
bool skip_key_rows_with_nulls = keys_have_nulls and include_null_keys == null_policy::EXCLUDE;
bitmask_type const* row_bitmask_ptr =
skip_key_rows_with_nulls ? static_cast<bitmask_type*>(row_bitmask.data()) : nullptr;
Expand Down
95 changes: 22 additions & 73 deletions cpp/src/io/parquet/compact_protocol_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -339,61 +339,6 @@ struct parquet_field_struct_list : public parquet_field_list<T> {
}
};

// TODO(ets): replace current union handling (which mirrors thrift) to use std::optional fields
// in a struct
/**
 * @brief Functor that reads a single union member through a CompactProtocolReader
 *
 * Keeps a reference to the destination value and to the flag that records
 * whether the member was encountered.
 *
 * @tparam T Type of the union member to read
 * @tparam is_empty True if `T` is an empty type, else false. This primary
 * template is the one used when `is_empty` is false.
 *
 * @return True if the field type is not `ST_FLD_STRUCT` or if reading the
 * union member fails; false otherwise
 */
template <typename T, bool is_empty = false>
class ParquetFieldUnionFunctor : public parquet_field {
  bool& is_set;
  T& val;

 public:
  ParquetFieldUnionFunctor(int f, bool& b, T& v) : parquet_field(f), is_set(b), val(v) {}

  inline bool operator()(CompactProtocolReader* cpr, int field_type)
  {
    // Only struct-typed fields are accepted; any other field type is reported as an error.
    if (field_type != ST_FLD_STRUCT) { return true; }

    // The flag is raised before the read, so it stays set even when the read fails.
    is_set             = true;
    bool const read_ok = cpr->read(&val);
    return !read_ok;
  }
};

/**
 * @brief Specialization of `ParquetFieldUnionFunctor` selected when `is_empty` is true
 *
 * Nothing is read into `val`: the field is passed to `skip_struct_field` and
 * only the `is_set` flag is updated.
 *
 * @return True if the field type is not `ST_FLD_STRUCT`; false otherwise
 */
template <typename T>
class ParquetFieldUnionFunctor<T, true> : public parquet_field {
  bool& is_set;
  T& val;

 public:
  ParquetFieldUnionFunctor(int f, bool& b, T& v) : parquet_field(f), is_set(b), val(v) {}

  inline bool operator()(CompactProtocolReader* cpr, int field_type)
  {
    // Only struct-typed fields are accepted; any other field type is reported as an error.
    if (field_type != ST_FLD_STRUCT) { return true; }

    // Record that the member is present, then skip over the field.
    is_set = true;
    cpr->skip_struct_field(field_type);
    return false;
  }
};

/**
 * @brief Builds a `ParquetFieldUnionFunctor` for a union member of type `T`
 *
 * The functor's second template argument is `std::is_empty_v<T>`, so the
 * specialization is chosen from `T` itself.
 *
 * @param f Value forwarded to the functor's `parquet_field` base
 * @param b Flag the functor sets when the member is encountered
 * @param v Destination the functor holds a reference to
 * @return The constructed functor
 */
template <typename T>
ParquetFieldUnionFunctor<T, std::is_empty_v<T>> ParquetFieldUnion(int f, bool& b, T& v)
{
  using functor_type = ParquetFieldUnionFunctor<T, std::is_empty_v<T>>;
  return functor_type(f, b, v);
}

/**
* @brief Functor to read a binary from CompactProtocolReader
*
Expand Down Expand Up @@ -595,34 +540,38 @@ bool CompactProtocolReader::read(FileMetaData* f)

bool CompactProtocolReader::read(SchemaElement* s)
{
using optional_converted_type =
parquet_field_optional<ConvertedType, parquet_field_enum<ConvertedType>>;
using optional_logical_type =
parquet_field_optional<LogicalType, parquet_field_struct<LogicalType>>;
auto op = std::make_tuple(parquet_field_enum<Type>(1, s->type),
parquet_field_int32(2, s->type_length),
parquet_field_enum<FieldRepetitionType>(3, s->repetition_type),
parquet_field_string(4, s->name),
parquet_field_int32(5, s->num_children),
parquet_field_enum<ConvertedType>(6, s->converted_type),
optional_converted_type(6, s->converted_type),
parquet_field_int32(7, s->decimal_scale),
parquet_field_int32(8, s->decimal_precision),
parquet_field_optional<int32_t, parquet_field_int32>(9, s->field_id),
parquet_field_struct(10, s->logical_type));
optional_logical_type(10, s->logical_type));
return function_builder(this, op);
}

bool CompactProtocolReader::read(LogicalType* l)
{
auto op =
std::make_tuple(ParquetFieldUnion(1, l->isset.STRING, l->STRING),
ParquetFieldUnion(2, l->isset.MAP, l->MAP),
ParquetFieldUnion(3, l->isset.LIST, l->LIST),
ParquetFieldUnion(4, l->isset.ENUM, l->ENUM),
ParquetFieldUnion(5, l->isset.DECIMAL, l->DECIMAL), // read the struct
ParquetFieldUnion(6, l->isset.DATE, l->DATE),
ParquetFieldUnion(7, l->isset.TIME, l->TIME), // read the struct
ParquetFieldUnion(8, l->isset.TIMESTAMP, l->TIMESTAMP), // read the struct
ParquetFieldUnion(10, l->isset.INTEGER, l->INTEGER), // read the struct
ParquetFieldUnion(11, l->isset.UNKNOWN, l->UNKNOWN),
ParquetFieldUnion(12, l->isset.JSON, l->JSON),
ParquetFieldUnion(13, l->isset.BSON, l->BSON));
auto op = std::make_tuple(
parquet_field_union_enumerator(1, l->type),
parquet_field_union_enumerator(2, l->type),
parquet_field_union_enumerator(3, l->type),
parquet_field_union_enumerator(4, l->type),
parquet_field_union_struct<LogicalType::Type, DecimalType>(5, l->type, l->decimal_type),
parquet_field_union_enumerator(6, l->type),
parquet_field_union_struct<LogicalType::Type, TimeType>(7, l->type, l->time_type),
parquet_field_union_struct<LogicalType::Type, TimestampType>(8, l->type, l->timestamp_type),
parquet_field_union_struct<LogicalType::Type, IntType>(10, l->type, l->int_type),
parquet_field_union_enumerator(11, l->type),
parquet_field_union_enumerator(12, l->type),
parquet_field_union_enumerator(13, l->type));
return function_builder(this, op);
}

Expand All @@ -648,9 +597,9 @@ bool CompactProtocolReader::read(TimestampType* t)

bool CompactProtocolReader::read(TimeUnit* u)
{
auto op = std::make_tuple(ParquetFieldUnion(1, u->isset.MILLIS, u->MILLIS),
ParquetFieldUnion(2, u->isset.MICROS, u->MICROS),
ParquetFieldUnion(3, u->isset.NANOS, u->NANOS));
auto op = std::make_tuple(parquet_field_union_enumerator(1, u->type),
parquet_field_union_enumerator(2, u->type),
parquet_field_union_enumerator(3, u->type));
return function_builder(this, op);
}

Expand Down
Loading

0 comments on commit a3b8a40

Please sign in to comment.