Skip to content

Commit

Permalink
Partial clean up of ORC writer (#7324)
Browse files Browse the repository at this point in the history
Issue #6763

Clean up of the code surrounding the column data encode in the ORC writer:
1. Add a 2D version of `hostdevice_vector` (single allocation);
2. Add 2D versions of `host_span` and `device_span`;
3. Add implicit conversions from `hostdevice_vector` to `host_span` and `device_span`;
4. Use the new types to represent collections that currently use flattened `hostdevice_vector`s;
5. Separate the part of `EncChunk` that is still used after data encode into its own class, `encoder_chunk_streams`;
6. Add `orc_streams` to represent per-column streams and compute offsets;
7. Partial `writer_impl.cu` code "modernization";
8. Remove redundant size parameters (since the 2D span and 2D vector hold the size info);
9. Use `device_uvector` instead of `device_vector`.

Authors:
  - Vukasin Milovanovic (@vuule)

Approvers:
  - Jake Hemstad (@jrhemstad)
  - Kumar Aatish (@kaatish)
  - Ram (Ramakrishna Prabhu) (@rgsl888prabhu)

URL: #7324
  • Loading branch information
vuule authored Mar 4, 2021
1 parent 72438d8 commit d619f77
Show file tree
Hide file tree
Showing 11 changed files with 965 additions and 765 deletions.
87 changes: 87 additions & 0 deletions cpp/include/cudf/utilities/span.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,16 @@ struct host_span : public cudf::detail::span_base<T, Extent, host_span<T, Extent
constexpr host_span(C const& in) : base(in.data(), in.size())
{
}

template <typename OtherT,
std::size_t OtherExtent,
typename std::enable_if<(Extent == OtherExtent || Extent == dynamic_extent) &&
std::is_convertible<OtherT (*)[], T (*)[]>::value,
void>::type* = nullptr>
constexpr host_span(const host_span<OtherT, OtherExtent>& other) noexcept
: base(other.data(), other.size())
{
}
};

// ===== device_span ===============================================================================
Expand Down Expand Up @@ -174,6 +184,83 @@ struct device_span : public cudf::detail::span_base<T, Extent, device_span<T, Ex
constexpr device_span(C const& in) : base(thrust::raw_pointer_cast(in.data()), in.size())
{
}

template <typename OtherT,
std::size_t OtherExtent,
typename std::enable_if<(Extent == OtherExtent || Extent == dynamic_extent) &&
std::is_convertible<OtherT (*)[], T (*)[]>::value,
void>::type* = nullptr>
constexpr device_span(const device_span<OtherT, OtherExtent>& other) noexcept
: base(other.data(), other.size())
{
}
};

namespace detail {

/**
 * @brief Generic class for row-major 2D spans. Not compliant with STL container semantics/syntax.
 *
 * Non-owning view of `rows * columns` elements addressed through a single base pointer; element
 * `(row, column)` lives at flat index `row * columns + column`. The index operator returns the
 * corresponding row as a `RowType<T, dynamic_extent>`.
 *
 * @tparam T Element type
 * @tparam RowType 1D span template used to represent a single row
 */
template <typename T, template <typename, std::size_t> typename RowType>
class base_2dspan {
 public:
  /// Dimensions of the span, stored as a (rows, columns) pair.
  using size_type = std::pair<size_t, size_t>;

  /// Creates an empty span (null data pointer, zero rows and columns).
  constexpr base_2dspan() noexcept = default;

  /// Creates a span over `rows * columns` elements starting at `data`.
  constexpr base_2dspan(T* data, size_t rows, size_t columns) noexcept
    : _data{data}, _size{rows, columns}
  {
  }

  /// Creates a span over `size.first * size.second` elements starting at `data`.
  // constexpr for parity with the other constructors, so every way of building the span is
  // usable in constant expressions.
  constexpr base_2dspan(T* data, size_type size) noexcept : _data{data}, _size{size} {}

  /// Returns the pointer to the first element.
  constexpr auto data() const noexcept { return _data; }
  /// Returns the (rows, columns) pair.
  constexpr auto size() const noexcept { return _size; }
  /// Returns the total number of elements (rows times columns).
  constexpr auto count() const noexcept { return size().first * size().second; }
  /// Returns true if the span holds no elements.
  constexpr bool is_empty() const noexcept { return count() == 0; }

  /**
   * @brief Maps a 2D coordinate to the index in the flattened, row-major storage.
   *
   * @param row Row index
   * @param column Column index
   * @param size (rows, columns) pair of the 2D range; only the column count is used
   * @return Flat index of the element
   */
  static constexpr size_t flatten_index(size_t row, size_t column, size_type size) noexcept
  {
    return row * size.second + column;
  }

  /**
   * @brief Returns the given row as a 1D span.
   *
   * `const`-qualified because it does not modify the 2D span itself (the span is a view);
   * this allows indexing through a `const` 2D span. No bounds checking is performed.
   *
   * @param row Row index
   * @return Span over the `size().second` elements of the row
   */
  constexpr RowType<T, dynamic_extent> operator[](size_t row) const
  {
    return {this->data() + flatten_index(row, 0, this->size()), this->size().second};
  }

  /**
   * @brief Converting constructor from a compatible 2D span.
   *
   * Enabled only when the row type of `other` is convertible to this span's row type
   * (e.g. from a span of `T` to a span of `T const`).
   *
   * @param other The 2D span to view; the new span refers to the same elements
   */
  template <typename OtherT,
            template <typename, size_t>
            typename OtherRowType,
            typename std::enable_if<std::is_convertible<OtherRowType<OtherT, dynamic_extent>,
                                                        RowType<T, dynamic_extent>>::value,
                                    void>::type* = nullptr>
  constexpr base_2dspan(base_2dspan<OtherT, OtherRowType> const& other) noexcept
    : _data{other.data()}, _size{other.size()}
  {
  }

 protected:
  T* _data = nullptr;    ///< Pointer to the first element; not owned
  size_type _size{0, 0};  ///< (rows, columns)
};

/**
 * @brief Row-major 2D span for host data.
 *
 * Indexing with `operator[]` yields the selected row as a `host_span`.
 */
template <typename T>
using host_2dspan = base_2dspan<T, host_span>;

/**
 * @brief Row-major 2D span for device data.
 *
 * Indexing with `operator[]` yields the selected row as a `device_span`.
 */
template <typename T>
using device_2dspan = base_2dspan<T, device_span>;

} // namespace detail
} // namespace cudf
4 changes: 2 additions & 2 deletions cpp/src/io/orc/dict_enc.cu
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ __global__ void __launch_bounds__(block_size)
* @param[in] chunks DictionaryChunk device array [rowgroup][column]
* @param[in] num_columns Number of columns
* @param[in] num_rowgroups Number of row groups
* @param[in] stream CUDA stream to use, default 0
* @param[in] stream CUDA stream to use, default `rmm::cuda_stream_default`
*/
void InitDictionaryIndices(DictionaryChunk *chunks,
uint32_t num_columns,
Expand All @@ -425,7 +425,7 @@ void InitDictionaryIndices(DictionaryChunk *chunks,
* @param[in] num_stripes Number of stripes
* @param[in] num_rowgroups Number of row groups
* @param[in] num_columns Number of columns
* @param[in] stream CUDA stream to use, default 0
* @param[in] stream CUDA stream to use, default `rmm::cuda_stream_default`
*/
void BuildStripeDictionaries(StripeDictionary *stripes,
StripeDictionary *stripes_host,
Expand Down
111 changes: 55 additions & 56 deletions cpp/src/io/orc/orc_gpu.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,19 +124,25 @@ struct RowGroup {
* @brief Struct to describe an encoder data chunk
*/
struct EncChunk {
uint8_t *streams[CI_NUM_STREAMS]; // encoded output
int32_t strm_id[CI_NUM_STREAMS]; // stream id or -1 if not present
uint32_t strm_len[CI_NUM_STREAMS]; // in: max length, out: actual length
const uint32_t *valid_map_base; // base ptr of input valid bit map
size_type column_offset; // index of the first element relative to the base memory
const void *column_data_base; // base ptr of input column data
uint32_t start_row; // start row of this chunk
uint32_t num_rows; // number of rows in this chunk
uint32_t valid_rows; // max number of valid rows
uint8_t encoding_kind; // column encoding kind (orc::ColumnEncodingKind)
uint8_t type_kind; // column data type (orc::TypeKind)
uint8_t dtype_len; // data type length
uint8_t scale; // scale for decimals or timestamps
const uint32_t *valid_map_base; // base ptr of input valid bit map
size_type column_offset; // index of the first element relative to the base memory
const void *column_data_base; // base ptr of input column data
uint32_t start_row; // start row of this chunk
uint32_t num_rows; // number of rows in this chunk
uint32_t valid_rows; // max number of valid rows
uint8_t encoding_kind; // column encoding kind (orc::ColumnEncodingKind)
uint8_t type_kind; // column data type (orc::TypeKind)
uint8_t dtype_len; // data type length
uint8_t scale; // scale for decimals or timestamps
};

/**
 * @brief Struct to describe the streams that correspond to a single `EncChunk`.
 *
 * Holds three arrays with `CI_NUM_STREAMS` entries each.
 * NOTE(review): presumably entry `i` of every array describes the same stream — confirm against
 * the encode kernels.
 */
struct encoder_chunk_streams {
uint8_t *data_ptrs[CI_NUM_STREAMS]; // encoded output of each stream
int32_t ids[CI_NUM_STREAMS]; // stream id; -1 if stream is not present
uint32_t lengths[CI_NUM_STREAMS]; // in: max length, out: actual length
};

/**
Expand Down Expand Up @@ -193,7 +199,7 @@ struct StripeDictionary {
* @param[in] compression_block_size maximum size of compressed blocks (up to 16M)
* @param[in] log2maxcr log2 of maximum compression ratio (used to infer max uncompressed size from
*compressed size)
* @param[in] stream CUDA stream to use, default 0
* @param[in] stream CUDA stream to use, default `rmm::cuda_stream_default`
*/
void ParseCompressedStripeData(CompressedStreamInfo *strm_info,
int32_t num_streams,
Expand All @@ -206,7 +212,7 @@ void ParseCompressedStripeData(CompressedStreamInfo *strm_info,
*
* @param[in] strm_info List of compressed streams
* @param[in] num_streams Number of compressed streams
* @param[in] stream CUDA stream to use, default 0
* @param[in] stream CUDA stream to use, default `rmm::cuda_stream_default`
*/
void PostDecompressionReassemble(CompressedStreamInfo *strm_info,
int32_t num_streams,
Expand All @@ -221,7 +227,7 @@ void PostDecompressionReassemble(CompressedStreamInfo *strm_info,
* @param[in] num_columns Number of columns
* @param[in] num_stripes Number of stripes
* @param[in] num_rowgroups Number of row groups
* @param[in] stream CUDA stream to use, default 0
* @param[in] stream CUDA stream to use, default `rmm::cuda_stream_default`
*/
void ParseRowGroupIndex(RowGroup *row_groups,
CompressedStreamInfo *strm_info,
Expand All @@ -241,7 +247,7 @@ void ParseRowGroupIndex(RowGroup *row_groups,
* @param[in] num_stripes Number of stripes
* @param[in] max_rows Maximum number of rows to load
* @param[in] first_row Crop all rows below first_row
* @param[in] stream CUDA stream to use, default 0
* @param[in] stream CUDA stream to use, default `rmm::cuda_stream_default`
*/
void DecodeNullsAndStringDictionaries(ColumnDesc *chunks,
DictionaryEntry *global_dictionary,
Expand All @@ -265,9 +271,9 @@ void DecodeNullsAndStringDictionaries(ColumnDesc *chunks,
* @param[in] row_groups Optional row index data
* @param[in] num_rowgroups Number of row groups in row index data
* @param[in] rowidx_stride Row index stride
* @param[in] stream CUDA stream to use, default 0
* @param[in] stream CUDA stream to use, default `rmm::cuda_stream_default`
*/
void DecodeOrcColumnData(ColumnDesc *chunks,
void DecodeOrcColumnData(ColumnDesc const *chunks,
DictionaryEntry *global_dictionary,
uint32_t num_columns,
uint32_t num_stripes,
Expand All @@ -282,79 +288,72 @@ void DecodeOrcColumnData(ColumnDesc *chunks,
/**
* @brief Launches kernel for encoding column data
*
* @param[in] chunks EncChunk device array [rowgroup][column]
* @param[in] num_columns Number of columns
* @param[in] num_rowgroups Number of row groups
* @param[in] stream CUDA stream to use, default 0
* @param[in] chunks encoder chunk device array [column][rowgroup]
* @param[in,out] streams chunk streams device array [column][rowgroup]
* @param[in] stream CUDA stream to use, default `rmm::cuda_stream_default`
*/
void EncodeOrcColumnData(EncChunk *chunks,
uint32_t num_columns,
uint32_t num_rowgroups,
void EncodeOrcColumnData(detail::device_2dspan<EncChunk const> chunks,
detail::device_2dspan<encoder_chunk_streams> streams,
rmm::cuda_stream_view stream = rmm::cuda_stream_default);

/**
* @brief Launches kernel for encoding column dictionaries
*
* @param[in] stripes Stripe dictionaries device array [stripe][string_column]
* @param[in] chunks EncChunk device array [rowgroup][column]
* @param[in] chunks encoder chunk device array [column][rowgroup]
* @param[in] num_string_columns Number of string columns
* @param[in] num_columns Number of columns
* @param[in] num_stripes Number of stripes
* @param[in] stream CUDA stream to use, default 0
* @param[in,out] enc_streams chunk streams device array [column][rowgroup]
* @param[in] stream CUDA stream to use, default `rmm::cuda_stream_default`
*/
void EncodeStripeDictionaries(StripeDictionary *stripes,
EncChunk *chunks,
detail::device_2dspan<EncChunk const> chunks,
uint32_t num_string_columns,
uint32_t num_columns,
uint32_t num_stripes,
detail::device_2dspan<encoder_chunk_streams> enc_streams,
rmm::cuda_stream_view stream = rmm::cuda_stream_default);

/**
* @brief Launches kernel for compacting chunked column data prior to compression
*
* @param[in] strm_desc StripeStream device array [stripe][stream]
* @param[in] chunks EncChunk device array [rowgroup][column]
* @param[in] num_stripe_streams Total number of streams
* @param[in] num_columns Number of columns
* @param[in] stream CUDA stream to use, default 0
* @param[in,out] strm_desc StripeStream device array [stripe][stream]
* @param[in,out] enc_streams chunk streams device array [column][rowgroup]
* @param[in] stream CUDA stream to use, default `rmm::cuda_stream_default`
*/
void CompactOrcDataStreams(StripeStream *strm_desc,
EncChunk *chunks,
uint32_t num_stripe_streams,
uint32_t num_columns,
void CompactOrcDataStreams(detail::device_2dspan<StripeStream> strm_desc,
detail::device_2dspan<encoder_chunk_streams> enc_streams,
rmm::cuda_stream_view stream = rmm::cuda_stream_default);

/**
* @brief Launches kernel(s) for compressing data streams
*
* @param[in] compressed_data Output compressed blocks
* @param[in] strm_desc StripeStream device array [stripe][stream]
* @param[in] chunks EncChunk device array [rowgroup][column]
* @param[in] num_compressed_blocks Total number of compressed blocks
* @param[in] compression Type of compression
* @param[in] comp_blk_size Compression block size
* @param[in] stream CUDA stream to use, default `rmm::cuda_stream_default`
* @param[in,out] strm_desc StripeStream device array [stripe][stream]
* @param[in,out] enc_streams chunk streams device array [column][rowgroup]
* @param[out] comp_in Per-block compression input parameters
* @param[out] comp_out Per-block compression status
* @param[in] num_stripe_streams Total number of streams
* @param[in] compression Type of compression
* @param[in] num_compressed_blocks Total number of compressed blocks
* @param[in] stream CUDA stream to use, default 0
*/
void CompressOrcDataStreams(uint8_t *compressed_data,
StripeStream *strm_desc,
EncChunk *chunks,
gpu_inflate_input_s *comp_in,
gpu_inflate_status_s *comp_out,
uint32_t num_stripe_streams,
uint32_t num_compressed_blocks,
CompressionKind compression,
uint32_t comp_blk_size,
detail::device_2dspan<StripeStream> strm_desc,
detail::device_2dspan<encoder_chunk_streams> enc_streams,
gpu_inflate_input_s *comp_in,
gpu_inflate_status_s *comp_out,
rmm::cuda_stream_view stream = rmm::cuda_stream_default);

/**
* @brief Launches kernel for initializing dictionary chunks
*
* @param[in] chunks DictionaryChunk device array [rowgroup][column]
* @param[in,out] chunks DictionaryChunk device array [rowgroup][column]
* @param[in] num_columns Number of columns
* @param[in] num_rowgroups Number of row groups
* @param[in] stream CUDA stream to use, default 0
* @param[in] stream CUDA stream to use, default `rmm::cuda_stream_default`
*/
void InitDictionaryIndices(DictionaryChunk *chunks,
uint32_t num_columns,
Expand All @@ -370,7 +369,7 @@ void InitDictionaryIndices(DictionaryChunk *chunks,
* @param[in] num_stripes Number of stripes
* @param[in] num_rowgroups Number of row groups
* @param[in] num_columns Number of columns
* @param[in] stream CUDA stream to use, default 0
* @param[in] stream CUDA stream to use, default `rmm::cuda_stream_default`
*/
void BuildStripeDictionaries(StripeDictionary *stripes_dev,
StripeDictionary *stripes_host,
Expand All @@ -388,7 +387,7 @@ void BuildStripeDictionaries(StripeDictionary *stripes_dev,
* @param[in] num_columns Number of columns
* @param[in] num_rowgroups Number of rowgroups
* @param[in] row_index_stride Rowgroup size in rows
* @param[in] stream CUDA stream to use, default 0
* @param[in] stream CUDA stream to use, default `rmm::cuda_stream_default`
*/
void orc_init_statistics_groups(statistics_group *groups,
const stats_column_desc *cols,
Expand All @@ -403,7 +402,7 @@ void orc_init_statistics_groups(statistics_group *groups,
* @param[in,out] groups Statistics merge groups
* @param[in] chunks Statistics chunks
* @param[in] statistics_count Number of statistics buffers to encode
* @param[in] stream CUDA stream to use, default 0
* @param[in] stream CUDA stream to use, default `rmm::cuda_stream_default`
*/
void orc_init_statistics_buffersize(statistics_merge_group *groups,
const statistics_chunk *chunks,
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/io/orc/stats_enc.cu
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ __global__ void __launch_bounds__(encode_threads_per_block)
* @param[in] num_columns Number of columns
* @param[in] num_rowgroups Number of rowgroups
* @param[in] row_index_stride Rowgroup size in rows
* @param[in] stream CUDA stream to use, default 0
* @param[in] stream CUDA stream to use, default `rmm::cuda_stream_default`
*/
void orc_init_statistics_groups(statistics_group *groups,
const stats_column_desc *cols,
Expand All @@ -392,7 +392,7 @@ void orc_init_statistics_groups(statistics_group *groups,
* @param[in,out] groups Statistics merge groups
* @param[in] chunks Statistics chunks
* @param[in] statistics_count Number of statistics buffers to encode
* @param[in] stream CUDA stream to use, default 0
* @param[in] stream CUDA stream to use, default `rmm::cuda_stream_default`
*/
void orc_init_statistics_buffersize(statistics_merge_group *groups,
const statistics_chunk *chunks,
Expand Down
8 changes: 4 additions & 4 deletions cpp/src/io/orc/stripe_data.cu
Original file line number Diff line number Diff line change
Expand Up @@ -1374,7 +1374,7 @@ static const __device__ __constant__ uint32_t kTimestampNanoScale[8] = {
// blockDim {block_size,1,1}
template <int block_size>
__global__ void __launch_bounds__(block_size)
gpuDecodeOrcColumnData(ColumnDesc *chunks,
gpuDecodeOrcColumnData(ColumnDesc const *chunks,
DictionaryEntry *global_dictionary,
timezone_table_view tz_table,
const RowGroup *row_groups,
Expand Down Expand Up @@ -1742,7 +1742,7 @@ __global__ void __launch_bounds__(block_size)
* @param[in] num_stripes Number of stripes
* @param[in] max_rows Maximum number of rows to load
* @param[in] first_row Crop all rows below first_row
* @param[in] stream CUDA stream to use, default 0
* @param[in] stream CUDA stream to use, default `rmm::cuda_stream_default`
*/
void __host__ DecodeNullsAndStringDictionaries(ColumnDesc *chunks,
DictionaryEntry *global_dictionary,
Expand Down Expand Up @@ -1771,9 +1771,9 @@ void __host__ DecodeNullsAndStringDictionaries(ColumnDesc *chunks,
* @param[in] row_groups Optional row index data
* @param[in] num_rowgroups Number of row groups in row index data
* @param[in] rowidx_stride Row index stride
* @param[in] stream CUDA stream to use, default 0
* @param[in] stream CUDA stream to use, default `rmm::cuda_stream_default`
*/
void __host__ DecodeOrcColumnData(ColumnDesc *chunks,
void __host__ DecodeOrcColumnData(ColumnDesc const *chunks,
DictionaryEntry *global_dictionary,
uint32_t num_columns,
uint32_t num_stripes,
Expand Down
Loading

0 comments on commit d619f77

Please sign in to comment.