diff --git a/cpp/include/cudf/io/detail/avro.hpp b/cpp/include/cudf/io/detail/avro.hpp index 306c15dcb72..62d97081b75 100644 --- a/cpp/include/cudf/io/detail/avro.hpp +++ b/cpp/include/cudf/io/detail/avro.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020, NVIDIA CORPORATION. + * Copyright (c) 2020-2021, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -14,11 +14,6 @@ * limitations under the License. */ -/** - * @file avro.hpp - * @brief cuDF-IO reader classes API - */ - #pragma once #include @@ -29,44 +24,23 @@ namespace cudf { namespace io { namespace detail { namespace avro { + /** - * @brief Class to read Avro dataset data into columns. + * @brief Reads the entire dataset. + * + * @param source Input `datasource` object to read the dataset from + * @param options Settings for controlling reading behavior + * @param stream CUDA stream used for device memory operations and kernel launches + * @param mr Device memory resource to use for device memory allocation + * + * @return The set of columns along with table metadata */ -class reader { - private: - class impl; - std::unique_ptr _impl; - - public: - /** - * @brief Constructor from an array of datasources - * - * @param sources Input `datasource` objects to read the dataset from - * @param options Settings for controlling reading behavior - * @param stream CUDA stream used for device memory operations and kernel launches - * @param mr Device memory resource to use for device memory allocation - */ - explicit reader(std::vector>&& sources, - avro_reader_options const& options, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr); - - /** - * @brief Destructor explicitly-declared to avoid inlined in header - */ - ~reader(); +table_with_metadata read_avro( + std::unique_ptr&& source, + avro_reader_options const& options, + rmm::cuda_stream_view stream = rmm::cuda_stream_default, + rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); - /** - * @brief Reads the entire dataset. - * - * @param options Settings for controlling reading behavior - * @param stream CUDA stream used for device memory operations and kernel launches. 
- * - * @return The set of columns along with table metadata - */ - table_with_metadata read(avro_reader_options const& options, - rmm::cuda_stream_view stream = rmm::cuda_stream_default); -}; } // namespace avro } // namespace detail } // namespace io diff --git a/cpp/src/io/avro/avro_gpu.cu b/cpp/src/io/avro/avro_gpu.cu index 6fabcf00b8f..cb1c32458a3 100644 --- a/cpp/src/io/avro/avro_gpu.cu +++ b/cpp/src/io/avro/avro_gpu.cu @@ -65,14 +65,15 @@ static inline int64_t __device__ avro_decode_zigzag_varint(const uint8_t*& cur, * * @return data pointer at the end of the row (start of next row) */ -static const uint8_t* __device__ avro_decode_row(const schemadesc_s* schema, - schemadesc_s* schema_g, - uint32_t schema_len, - size_t row, - size_t max_rows, - const uint8_t* cur, - const uint8_t* end, - device_span global_dictionary) +static uint8_t const* __device__ +avro_decode_row(schemadesc_s const* schema, + schemadesc_s* schema_g, + uint32_t schema_len, + size_t row, + size_t max_rows, + uint8_t const* cur, + uint8_t const* end, + device_span global_dictionary) { uint32_t array_start = 0, array_repeat_count = 0; int array_children = 0; @@ -220,7 +221,6 @@ static const uint8_t* __device__ avro_decode_row(const schemadesc_s* schema, * @param[in] schema Schema description * @param[in] global_Dictionary Global dictionary entries * @param[in] avro_data Raw block data - * @param[in] num_blocks Number of blocks * @param[in] schema_len Number of entries in schema * @param[in] min_row_size Minimum size in bytes of a row * @param[in] max_rows Maximum number of rows to load @@ -228,11 +228,10 @@ static const uint8_t* __device__ avro_decode_row(const schemadesc_s* schema, */ // blockDim {32,num_warps,1} extern "C" __global__ void __launch_bounds__(num_warps * 32, 2) - gpuDecodeAvroColumnData(block_desc_s* blocks, + gpuDecodeAvroColumnData(device_span blocks, schemadesc_s* schema_g, - device_span global_dictionary, - const uint8_t* avro_data, - uint32_t num_blocks, + device_span global_dictionary, + uint8_t const* avro_data, uint32_t schema_len, uint32_t min_row_size, size_t max_rows, @@ -258,9 +257,9 @@ extern "C" __global__ void __launch_bounds__(num_warps * 32, 2) } else { schema = schema_g; } - if (block_id < num_blocks and threadIdx.x == 0) { *blk = blocks[block_id]; } + if (block_id < blocks.size() and threadIdx.x == 0) { *blk = blocks[block_id]; } __syncthreads(); - if (block_id >= num_blocks) { return; } + if (block_id >= blocks.size()) { return; } cur_row = blk->first_row; rows_remaining = blk->num_rows; cur = avro_data + blk->offset; @@ -304,18 +303,16 @@ extern "C" __global__ void __launch_bounds__(num_warps * 32, 2) * @param[in] schema Schema description * @param[in] global_dictionary Global dictionary entries * @param[in] avro_data Raw block data - * @param[in] num_blocks Number of blocks * @param[in] schema_len Number of entries in schema * @param[in] max_rows Maximum number of rows to load * @param[in] first_row Crop all rows below first_row * @param[in] min_row_size Minimum size in bytes of a row * @param[in] stream CUDA stream to use, default 0 */ -void DecodeAvroColumnData(block_desc_s* blocks, +void DecodeAvroColumnData(device_span blocks, schemadesc_s* schema, - device_span global_dictionary, - const uint8_t* avro_data, - uint32_t num_blocks, + device_span global_dictionary, + uint8_t const* avro_data, uint32_t schema_len, size_t max_rows, size_t first_row, @@ -325,17 +322,10 @@ void DecodeAvroColumnData(block_desc_s* blocks, // num_warps warps per threadblock dim3 const dim_block(32, 
num_warps); // 1 warp per datablock, num_warps datablocks per threadblock - dim3 const dim_grid((num_blocks + num_warps - 1) / num_warps, 1); + dim3 const dim_grid((blocks.size() + num_warps - 1) / num_warps, 1); - gpuDecodeAvroColumnData<<>>(blocks, - schema, - global_dictionary, - avro_data, - num_blocks, - schema_len, - min_row_size, - max_rows, - first_row); + gpuDecodeAvroColumnData<<>>( + blocks, schema, global_dictionary, avro_data, schema_len, min_row_size, max_rows, first_row); } } // namespace gpu diff --git a/cpp/src/io/avro/avro_gpu.h b/cpp/src/io/avro/avro_gpu.h index a895d1bea02..c87ac8afb13 100644 --- a/cpp/src/io/avro/avro_gpu.h +++ b/cpp/src/io/avro/avro_gpu.h @@ -43,18 +43,16 @@ struct schemadesc_s { * @param[in] schema Schema description * @param[in] global_dictionary Global dictionary entries * @param[in] avro_data Raw block data - * @param[in] num_blocks Number of blocks * @param[in] schema_len Number of entries in schema * @param[in] max_rows Maximum number of rows to load * @param[in] first_row Crop all rows below first_row * @param[in] min_row_size Minimum size in bytes of a row * @param[in] stream CUDA stream to use, default 0 */ -void DecodeAvroColumnData(block_desc_s* blocks, +void DecodeAvroColumnData(cudf::device_span blocks, schemadesc_s* schema, - cudf::device_span global_dictionary, - const uint8_t* avro_data, - uint32_t num_blocks, + cudf::device_span global_dictionary, + uint8_t const* avro_data, uint32_t schema_len, size_t max_rows = ~0, size_t first_row = 0, diff --git a/cpp/src/io/avro/reader_impl.cu b/cpp/src/io/avro/reader_impl.cu index 08ea96139a1..aa3bab2d877 100644 --- a/cpp/src/io/avro/reader_impl.cu +++ b/cpp/src/io/avro/reader_impl.cu @@ -14,30 +14,38 @@ * limitations under the License. */ -/** - * @file reader_impl.cu - * @brief cuDF-IO Avro reader class implementation - */ - -#include "reader_impl.hpp" +#include "avro.h" +#include "avro_gpu.h" #include +#include +#include #include +#include +#include +#include #include #include +#include #include #include #include #include +#include +#include +#include +#include + using cudf::device_span; namespace cudf { namespace io { namespace detail { namespace avro { + // Import functionality that's independent of legacy code using namespace cudf::io::avro; using namespace cudf::io; @@ -46,7 +54,7 @@ namespace { /** * @brief Function that translates Avro data kind to cuDF type enum */ -type_id to_type_id(const avro::schema_entry* col) +type_id to_type_id(avro::schema_entry const* col) { switch (col->kind) { case avro::type_boolean: return type_id::BOOL8; @@ -79,7 +87,7 @@ class metadata : public file_metadata { */ void init_and_select_rows(int& row_start, int& row_count) { - const auto buffer = source->host_read(0, source->size()); + auto const buffer = source->host_read(0, source->size()); avro::container pod(buffer->data(), buffer->size()); CUDF_EXPECTS(pod.parse(this, row_count, row_start), "Cannot parse metadata"); row_start = skip_rows; @@ -97,10 +105,10 @@ class metadata : public file_metadata { { std::vector> selection; - const auto num_avro_columns = static_cast(columns.size()); + auto const num_avro_columns = static_cast(columns.size()); if (!use_names.empty()) { int index = 0; - for (const auto& use_name : use_names) { + for (auto const& use_name : use_names) { for (int i = 0; i < num_avro_columns; ++i, ++index) { if (index >= num_avro_columns) { index = 0; } if (columns[index].name == use_name && @@ -138,25 +146,28 @@ class metadata : public file_metadata { datasource* const source; }; 
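Note on the avro_gpu.cu and avro_gpu.h hunks above: gpuDecodeAvroColumnData and DecodeAvroColumnData now receive the block descriptors as a device_span instead of a raw pointer plus a separate num_blocks count, so the element count travels with the pointer and both the grid size and the in-kernel bounds check come from blocks.size(). The stand-alone sketch below illustrates the shape of that interface change; the span and block_desc types and the num_warps value are simplified stand-ins chosen for illustration, not cudf's actual definitions.

#include <cstddef>
#include <cstdio>
#include <vector>

// Minimal non-owning view bundling a pointer with an element count,
// standing in for cudf::device_span in this sketch.
template <typename T>
struct span {
  T* ptr;
  std::size_t count;
  std::size_t size() const { return count; }
};

// Hypothetical stand-in for block_desc_s.
struct block_desc {
  std::size_t offset;
  std::size_t num_rows;
};

// With a span, callers no longer pass a separate num_blocks argument and the
// launch code derives the grid size directly from blocks.size(), mirroring
// (blocks.size() + num_warps - 1) / num_warps in the diff.
void launch_decode(span<block_desc const> blocks)
{
  constexpr std::size_t num_warps = 8;  // assumed value, for illustration only
  auto const grid_x = (blocks.size() + num_warps - 1) / num_warps;
  std::printf("launching %zu thread blocks for %zu data blocks\n", grid_x, blocks.size());
}

int main()
{
  std::vector<block_desc> blocks(20);
  launch_decode({blocks.data(), blocks.size()});
  return 0;
}

Inside the kernel the early-out becomes if (block_id >= blocks.size()) { return; }, which is why the extra num_blocks parameter could be dropped from both gpuDecodeAvroColumnData and its host-side wrapper.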
-rmm::device_buffer reader::impl::decompress_data(const rmm::device_buffer& comp_block_data, - rmm::cuda_stream_view stream) +rmm::device_buffer decompress_data(datasource& source, + metadata& meta, + rmm::device_buffer const& comp_block_data, + rmm::cuda_stream_view stream) { size_t uncompressed_data_size = 0; - hostdevice_vector inflate_in(_metadata->block_list.size()); - hostdevice_vector inflate_out(_metadata->block_list.size()); - if (_metadata->codec == "deflate") { + auto inflate_in = hostdevice_vector(meta.block_list.size()); + auto inflate_out = hostdevice_vector(meta.block_list.size()); + + if (meta.codec == "deflate") { // Guess an initial maximum uncompressed block size - uint32_t initial_blk_len = (_metadata->max_block_size * 2 + 0xfff) & ~0xfff; - uncompressed_data_size = initial_blk_len * _metadata->block_list.size(); + uint32_t initial_blk_len = (meta.max_block_size * 2 + 0xfff) & ~0xfff; + uncompressed_data_size = initial_blk_len * meta.block_list.size(); for (size_t i = 0; i < inflate_in.size(); ++i) { inflate_in[i].dstSize = initial_blk_len; } - } else if (_metadata->codec == "snappy") { + } else if (meta.codec == "snappy") { // Extract the uncompressed length from the snappy stream - for (size_t i = 0; i < _metadata->block_list.size(); i++) { - const auto buffer = _source->host_read(_metadata->block_list[i].offset, 4); - const uint8_t* blk = buffer->data(); + for (size_t i = 0; i < meta.block_list.size(); i++) { + auto const buffer = source.host_read(meta.block_list[i].offset, 4); + uint8_t const* blk = buffer->data(); uint32_t blk_len = blk[0]; if (blk_len > 0x7f) { blk_len = (blk_len & 0x7f) | (blk[1] << 7); @@ -174,28 +185,28 @@ rmm::device_buffer reader::impl::decompress_data(const rmm::device_buffer& comp_ rmm::device_buffer decomp_block_data(uncompressed_data_size, stream); - const auto base_offset = _metadata->block_list[0].offset; - for (size_t i = 0, dst_pos = 0; i < _metadata->block_list.size(); i++) { - const auto src_pos = _metadata->block_list[i].offset - base_offset; + auto const base_offset = meta.block_list[0].offset; + for (size_t i = 0, dst_pos = 0; i < meta.block_list.size(); i++) { + auto const src_pos = meta.block_list[i].offset - base_offset; - inflate_in[i].srcDevice = static_cast(comp_block_data.data()) + src_pos; - inflate_in[i].srcSize = _metadata->block_list[i].size; + inflate_in[i].srcDevice = static_cast(comp_block_data.data()) + src_pos; + inflate_in[i].srcSize = meta.block_list[i].size; inflate_in[i].dstDevice = static_cast(decomp_block_data.data()) + dst_pos; // Update blocks offsets & sizes to refer to uncompressed data - _metadata->block_list[i].offset = dst_pos; - _metadata->block_list[i].size = static_cast(inflate_in[i].dstSize); - dst_pos += _metadata->block_list[i].size; + meta.block_list[i].offset = dst_pos; + meta.block_list[i].size = static_cast(inflate_in[i].dstSize); + dst_pos += meta.block_list[i].size; } for (int loop_cnt = 0; loop_cnt < 2; loop_cnt++) { inflate_in.host_to_device(stream); CUDA_TRY( cudaMemsetAsync(inflate_out.device_ptr(), 0, inflate_out.memory_size(), stream.value())); - if (_metadata->codec == "deflate") { + if (meta.codec == "deflate") { CUDA_TRY(gpuinflate( inflate_in.device_ptr(), inflate_out.device_ptr(), inflate_in.size(), 0, stream)); - } else if (_metadata->codec == "snappy") { + } else if (meta.codec == "snappy") { CUDA_TRY( gpu_unsnap(inflate_in.device_ptr(), inflate_out.device_ptr(), inflate_in.size(), stream)); } else { @@ -204,9 +215,9 @@ rmm::device_buffer reader::impl::decompress_data(const 
rmm::device_buffer& comp_ inflate_out.device_to_host(stream, true); // Check if larger output is required, as it's not known ahead of time - if (_metadata->codec == "deflate" && !loop_cnt) { + if (meta.codec == "deflate" && !loop_cnt) { size_t actual_uncompressed_size = 0; - for (size_t i = 0; i < _metadata->block_list.size(); i++) { + for (size_t i = 0; i < meta.block_list.size(); i++) { // If error status is 1 (buffer too small), the `bytes_written` field // is actually contains the uncompressed data size if (inflate_out[i].status == 1 && inflate_out[i].bytes_written > inflate_in[i].dstSize) { @@ -216,13 +227,13 @@ rmm::device_buffer reader::impl::decompress_data(const rmm::device_buffer& comp_ } if (actual_uncompressed_size > uncompressed_data_size) { decomp_block_data.resize(actual_uncompressed_size, stream); - for (size_t i = 0, dst_pos = 0; i < _metadata->block_list.size(); i++) { + for (size_t i = 0, dst_pos = 0; i < meta.block_list.size(); i++) { auto dst_base = static_cast(decomp_block_data.data()); inflate_in[i].dstDevice = dst_base + dst_pos; - _metadata->block_list[i].offset = dst_pos; - _metadata->block_list[i].size = static_cast(inflate_in[i].dstSize); - dst_pos += _metadata->block_list[i].size; + meta.block_list[i].offset = dst_pos; + meta.block_list[i].size = static_cast(inflate_in[i].dstSize); + dst_pos += meta.block_list[i].size; } } else { break; @@ -235,28 +246,40 @@ rmm::device_buffer reader::impl::decompress_data(const rmm::device_buffer& comp_ return decomp_block_data; } -void reader::impl::decode_data(const rmm::device_buffer& block_data, - const std::vector>& dict, - device_span global_dictionary, - size_t num_rows, - std::vector> selection, - std::vector& out_buffers, - rmm::cuda_stream_view stream) +std::vector decode_data(metadata& meta, + rmm::device_buffer const& block_data, + std::vector> const& dict, + device_span global_dictionary, + size_t num_rows, + std::vector> const& selection, + std::vector const& column_types, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) { + auto out_buffers = std::vector(); + + for (size_t i = 0; i < column_types.size(); ++i) { + auto col_idx = selection[i].first; + bool is_nullable = (meta.columns[col_idx].schema_null_idx >= 0); + out_buffers.emplace_back(column_types[i], num_rows, is_nullable, stream, mr); + } + // Build gpu schema - hostdevice_vector schema_desc(_metadata->schema.size()); + auto schema_desc = hostdevice_vector(meta.schema.size()); + uint32_t min_row_data_size = 0; int skip_field_cnt = 0; - for (size_t i = 0; i < _metadata->schema.size(); i++) { - type_kind_e kind = _metadata->schema[i].kind; + + for (size_t i = 0; i < meta.schema.size(); i++) { + type_kind_e kind = meta.schema[i].kind; if (skip_field_cnt != 0) { // Exclude union and array members from min_row_data_size - skip_field_cnt += _metadata->schema[i].num_children - 1; + skip_field_cnt += meta.schema[i].num_children - 1; } else { switch (kind) { case type_union: case type_array: - skip_field_cnt = _metadata->schema[i].num_children; + skip_field_cnt = meta.schema[i].num_children; // fall through case type_boolean: case type_int: @@ -269,21 +292,21 @@ void reader::impl::decode_data(const rmm::device_buffer& block_data, default: break; } } - if (kind == type_enum && !_metadata->schema[i].symbols.size()) { kind = type_int; } - schema_desc[i].kind = kind; - schema_desc[i].count = (kind == type_enum) ? 
0 : (uint32_t)_metadata->schema[i].num_children; + if (kind == type_enum && !meta.schema[i].symbols.size()) { kind = type_int; } + schema_desc[i].kind = kind; + schema_desc[i].count = + (kind == type_enum) ? 0 : static_cast(meta.schema[i].num_children); schema_desc[i].dataptr = nullptr; - CUDF_EXPECTS( - kind != type_union || _metadata->schema[i].num_children < 2 || - (_metadata->schema[i].num_children == 2 && (_metadata->schema[i + 1].kind == type_null || - _metadata->schema[i + 2].kind == type_null)), - "Union with non-null type not currently supported"); + CUDF_EXPECTS(kind != type_union || meta.schema[i].num_children < 2 || + (meta.schema[i].num_children == 2 && + (meta.schema[i + 1].kind == type_null || meta.schema[i + 2].kind == type_null)), + "Union with non-null type not currently supported"); } std::vector valid_alias(out_buffers.size(), nullptr); for (size_t i = 0; i < out_buffers.size(); i++) { - const auto col_idx = selection[i].first; - int schema_data_idx = _metadata->columns[col_idx].schema_data_idx; - int schema_null_idx = _metadata->columns[col_idx].schema_null_idx; + auto const col_idx = selection[i].first; + int schema_data_idx = meta.columns[col_idx].schema_data_idx; + int schema_null_idx = meta.columns[col_idx].schema_null_idx; schema_desc[schema_data_idx].dataptr = out_buffers[i].data(); if (schema_null_idx >= 0) { @@ -293,25 +316,25 @@ void reader::impl::decode_data(const rmm::device_buffer& block_data, valid_alias[i] = schema_desc[schema_null_idx].dataptr; } } - if (_metadata->schema[schema_data_idx].kind == type_enum) { + if (meta.schema[schema_data_idx].kind == type_enum) { schema_desc[schema_data_idx].count = dict[i].first; } if (out_buffers[i].null_mask_size()) { cudf::detail::set_null_mask(out_buffers[i].null_mask(), 0, num_rows, true, stream); } } - rmm::device_buffer block_list( - _metadata->block_list.data(), _metadata->block_list.size() * sizeof(block_desc_s), stream); + + auto block_list = cudf::detail::make_device_uvector_async(meta.block_list, stream); + schema_desc.host_to_device(stream); - gpu::DecodeAvroColumnData(static_cast(block_list.data()), + gpu::DecodeAvroColumnData(block_list, schema_desc.device_ptr(), global_dictionary, - static_cast(block_data.data()), - static_cast(_metadata->block_list.size()), + static_cast(block_data.data()), static_cast(schema_desc.size()), - _metadata->num_rows, - _metadata->skip_rows, + meta.num_rows, + meta.skip_rows, min_row_data_size, stream); @@ -328,23 +351,18 @@ void reader::impl::decode_data(const rmm::device_buffer& block_data, schema_desc.device_to_host(stream, true); for (size_t i = 0; i < out_buffers.size(); i++) { - const auto col_idx = selection[i].first; - const auto schema_null_idx = _metadata->columns[col_idx].schema_null_idx; + auto const col_idx = selection[i].first; + auto const schema_null_idx = meta.columns[col_idx].schema_null_idx; out_buffers[i].null_count() = (schema_null_idx >= 0) ? 
schema_desc[schema_null_idx].count : 0; } -} -reader::impl::impl(std::unique_ptr source, - avro_reader_options const& options, - rmm::mr::device_memory_resource* mr) - : _mr(mr), _source(std::move(source)), _columns(options.get_columns()) -{ - // Open the source Avro dataset metadata - _metadata = std::make_unique(_source.get()); + return out_buffers; } -table_with_metadata reader::impl::read(avro_reader_options const& options, - rmm::cuda_stream_view stream) +table_with_metadata read_avro(std::unique_ptr&& source, + avro_reader_options const& options, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) { auto skip_rows = options.get_skip_rows(); auto num_rows = options.get_num_rows(); @@ -352,70 +370,76 @@ table_with_metadata reader::impl::read(avro_reader_options const& options, std::vector> out_columns; table_metadata metadata_out; + // Open the source Avro dataset metadata + auto meta = metadata(source.get()); + // Select and read partial metadata / schema within the subset of rows - _metadata->init_and_select_rows(skip_rows, num_rows); + meta.init_and_select_rows(skip_rows, num_rows); // Select only columns required by the options - auto selected_columns = _metadata->select_columns(_columns); + auto selected_columns = meta.select_columns(options.get_columns()); if (selected_columns.size() != 0) { // Get a list of column data types std::vector column_types; - for (const auto& col : selected_columns) { - auto& col_schema = _metadata->schema[_metadata->columns[col.first].schema_data_idx]; + for (auto const& col : selected_columns) { + auto& col_schema = meta.schema[meta.columns[col.first].schema_data_idx]; auto col_type = to_type_id(&col_schema); CUDF_EXPECTS(col_type != type_id::EMPTY, "Unknown type"); column_types.emplace_back(col_type); } - if (_metadata->total_data_size > 0) { + if (meta.total_data_size > 0) { rmm::device_buffer block_data; - if (_source->is_device_read_preferred(_metadata->total_data_size)) { - block_data = rmm::device_buffer{_metadata->total_data_size, stream}; - auto read_bytes = _source->device_read(_metadata->block_list[0].offset, - _metadata->total_data_size, - static_cast(block_data.data()), - stream); + if (source->is_device_read_preferred(meta.total_data_size)) { + block_data = rmm::device_buffer{meta.total_data_size, stream}; + auto read_bytes = source->device_read(meta.block_list[0].offset, + meta.total_data_size, + static_cast(block_data.data()), + stream); block_data.resize(read_bytes, stream); } else { - const auto buffer = - _source->host_read(_metadata->block_list[0].offset, _metadata->total_data_size); - block_data = rmm::device_buffer{buffer->data(), buffer->size(), stream}; + auto const buffer = source->host_read(meta.block_list[0].offset, meta.total_data_size); + block_data = rmm::device_buffer{buffer->data(), buffer->size(), stream}; } - if (_metadata->codec != "" && _metadata->codec != "null") { - auto decomp_block_data = decompress_data(block_data, stream); + if (meta.codec != "" && meta.codec != "null") { + auto decomp_block_data = decompress_data(*source, meta, block_data, stream); block_data = std::move(decomp_block_data); } else { - auto dst_ofs = _metadata->block_list[0].offset; - for (size_t i = 0; i < _metadata->block_list.size(); i++) { - _metadata->block_list[i].offset -= dst_ofs; + auto dst_ofs = meta.block_list[0].offset; + for (size_t i = 0; i < meta.block_list.size(); i++) { + meta.block_list[i].offset -= dst_ofs; } } size_t total_dictionary_entries = 0; size_t dictionary_data_size = 0; - std::vector> 
dict(column_types.size()); + + auto dict = std::vector>(column_types.size()); + for (size_t i = 0; i < column_types.size(); ++i) { auto col_idx = selected_columns[i].first; - auto& col_schema = _metadata->schema[_metadata->columns[col_idx].schema_data_idx]; + auto& col_schema = meta.schema[meta.columns[col_idx].schema_data_idx]; dict[i].first = static_cast(total_dictionary_entries); dict[i].second = static_cast(col_schema.symbols.size()); total_dictionary_entries += dict[i].second; - for (const auto& sym : col_schema.symbols) { + for (auto const& sym : col_schema.symbols) { dictionary_data_size += sym.length(); } } - rmm::device_uvector d_global_dict(total_dictionary_entries, stream); - rmm::device_uvector d_global_dict_data(dictionary_data_size, stream); + auto d_global_dict = rmm::device_uvector(0, stream); + auto d_global_dict_data = rmm::device_uvector(0, stream); + if (total_dictionary_entries > 0) { - std::vector h_global_dict(total_dictionary_entries); - std::vector h_global_dict_data(dictionary_data_size); - size_t dict_pos = 0; + auto h_global_dict = std::vector(total_dictionary_entries); + auto h_global_dict_data = std::vector(dictionary_data_size); + size_t dict_pos = 0; + for (size_t i = 0; i < column_types.size(); ++i) { - auto const col_idx = selected_columns[i].first; - auto const& col_schema = _metadata->schema[_metadata->columns[col_idx].schema_data_idx]; + auto const col_idx = selected_columns[i].first; + auto const& col_schema = meta.schema[meta.columns[col_idx].schema_data_idx]; auto const col_dict_entries = &(h_global_dict[dict[i].first]); for (size_t j = 0; j < dict[i].second; j++) { auto const& symbols = col_schema.symbols[j]; @@ -430,30 +454,24 @@ table_with_metadata reader::impl::read(avro_reader_options const& options, } } - CUDA_TRY(cudaMemcpyAsync(d_global_dict.data(), - h_global_dict.data(), - h_global_dict.size() * sizeof(string_index_pair), - cudaMemcpyDefault, - stream.value())); - CUDA_TRY(cudaMemcpyAsync(d_global_dict_data.data(), - h_global_dict_data.data(), - h_global_dict_data.size() * sizeof(char), - cudaMemcpyDefault, - stream.value())); - stream.synchronize(); - } + d_global_dict = cudf::detail::make_device_uvector_async(h_global_dict, stream); + d_global_dict_data = cudf::detail::make_device_uvector_async(h_global_dict_data, stream); - std::vector out_buffers; - for (size_t i = 0; i < column_types.size(); ++i) { - auto col_idx = selected_columns[i].first; - bool is_nullable = (_metadata->columns[col_idx].schema_null_idx >= 0); - out_buffers.emplace_back(column_types[i], num_rows, is_nullable, stream, _mr); + stream.synchronize(); } - decode_data(block_data, dict, d_global_dict, num_rows, selected_columns, out_buffers, stream); + auto out_buffers = decode_data(meta, + block_data, + dict, + d_global_dict, + num_rows, + selected_columns, + column_types, + stream, + mr); for (size_t i = 0; i < column_types.size(); ++i) { - out_columns.emplace_back(make_column(out_buffers[i], nullptr, stream, _mr)); + out_columns.emplace_back(make_column(out_buffers[i], nullptr, stream, mr)); } } else { // Create empty columns @@ -469,29 +487,11 @@ table_with_metadata reader::impl::read(avro_reader_options const& options, metadata_out.column_names[i] = selected_columns[i].second; } // Return user metadata - metadata_out.user_data = _metadata->user_data; + metadata_out.user_data = meta.user_data; return {std::make_unique(std::move(out_columns)), std::move(metadata_out)}; } -// Forward to implementation -reader::reader(std::vector>&& sources, - avro_reader_options const& 
options, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr) -{ - CUDF_EXPECTS(sources.size() == 1, "Only a single source is currently supported."); - _impl = std::make_unique(std::move(sources[0]), options, mr); -} - -// Destructor within this translation unit -reader::~reader() = default; - -// Forward to implementation -table_with_metadata reader::read(avro_reader_options const& options, rmm::cuda_stream_view stream) -{ - return _impl->read(options, stream); -} } // namespace avro } // namespace detail } // namespace io diff --git a/cpp/src/io/avro/reader_impl.hpp b/cpp/src/io/avro/reader_impl.hpp deleted file mode 100644 index 9af32ed88a0..00000000000 --- a/cpp/src/io/avro/reader_impl.hpp +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Copyright (c) 2020-2021, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * @file reader_impl.hpp - * @brief cuDF-IO Avro reader class implementation header - */ - -#pragma once - -#include "avro.h" -#include "avro_gpu.h" - -#include -#include -#include - -#include -#include - -#include - -#include -#include -#include -#include - -namespace cudf { -namespace io { -namespace detail { -namespace avro { -using namespace cudf::io::avro; -using namespace cudf::io; - -// Forward declarations -class metadata; - -/** - * @brief Implementation for Avro reader - */ -class reader::impl { - public: - /** - * @brief Constructor from a dataset source with reader options. - * - * @param source Dataset source - * @param options Settings for controlling reading behavior - * @param mr Device memory resource to use for device memory allocation - */ - explicit impl(std::unique_ptr source, - avro_reader_options const& options, - rmm::mr::device_memory_resource* mr); - - /** - * @brief Read an entire set or a subset of data and returns a set of columns - * - * @param options Settings for controlling reading behavior - * @param stream CUDA stream used for device memory operations and kernel launches. - * - * @return The set of columns along with metadata - */ - table_with_metadata read(avro_reader_options const& options, rmm::cuda_stream_view stream); - - private: - /** - * @brief Decompresses the block data. - * - * @param comp_block_data Compressed block data - * @param stream CUDA stream used for device memory operations and kernel launches. - * - * @return Device buffer to decompressed block data - */ - rmm::device_buffer decompress_data(const rmm::device_buffer& comp_block_data, - rmm::cuda_stream_view stream); - - /** - * @brief Convert the avro row-based block data and outputs to columns - * - * @param block_data Uncompressed block data - * @param dict Dictionary entries - * @param global_dictionary Dictionary allocation - * @param out_buffers Output columns' device buffers - * @param stream CUDA stream used for device memory operations and kernel launches. 
- */ - void decode_data(const rmm::device_buffer& block_data, - const std::vector>& dict, - cudf::device_span global_dictionary, - size_t num_rows, - std::vector> columns, - std::vector& out_buffers, - rmm::cuda_stream_view stream); - - private: - rmm::mr::device_memory_resource* _mr = nullptr; - std::unique_ptr _source; - std::unique_ptr _metadata; - - std::vector _columns; -}; - -} // namespace avro -} // namespace detail -} // namespace io -} // namespace cudf diff --git a/cpp/src/io/functions.cpp b/cpp/src/io/functions.cpp index 438cb1762c6..511a1a22ee7 100644 --- a/cpp/src/io/functions.cpp +++ b/cpp/src/io/functions.cpp @@ -146,10 +146,10 @@ table_with_metadata read_avro(avro_reader_options const& options, CUDF_FUNC_RANGE(); auto datasources = make_datasources(options.get_source()); - auto reader = - std::make_unique(std::move(datasources), options, rmm::cuda_stream_default, mr); - return reader->read(options); + CUDF_EXPECTS(datasources.size() == 1, "Only a single source is currently supported."); + + return avro::read_avro(std::move(datasources[0]), options, rmm::cuda_stream_default, mr); } compression_type infer_compression_type(compression_type compression, source_info const& info)
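A note on the snappy branch of decompress_data above: each compressed block starts with its uncompressed length encoded as a little-endian base-128 varint, and the reader peeks at the first four bytes of every block (source.host_read(offset, 4)) to size the output buffer before launching gpu_unsnap. The diff only shows the first two bytes of that decoder in context; the sketch below restates the pattern as a runnable function, with the third and fourth bytes handled the same way, which is an assumption about the elided context lines.

#include <cstdint>
#include <cstdio>

// Decode the uncompressed-length prefix of a snappy stream: 7 bits of length
// per byte, low bits first, with the high bit of each byte acting as a
// continuation flag (mirroring the blk_len logic in decompress_data).
uint32_t snappy_uncompressed_length(uint8_t const* blk)
{
  uint32_t len = blk[0];
  if (len > 0x7f) {
    len = (len & 0x7f) | (blk[1] << 7);
    if (len > 0x3fff) {
      len = (len & 0x3fff) | (blk[2] << 14);
      if (len > 0x1fffff) { len = (len & 0x1fffff) | (blk[3] << 21); }
    }
  }
  return len;
}

int main()
{
  // 300 encoded as a varint: 0xac = continuation bit + low 7 bits (0x2c), 0x02 = next 7 bits.
  uint8_t const prefix[] = {0xac, 0x02, 0x00, 0x00};
  std::printf("uncompressed length: %u\n", snappy_uncompressed_length(prefix));
  return 0;
}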
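Also worth noting from the reader_impl.cu hunks: the hand-rolled device uploads (a raw rmm::device_buffer for the block list and explicit cudaMemcpyAsync calls for the global dictionary) are replaced with cudf::detail::make_device_uvector_async, which allocates an rmm::device_uvector and enqueues the host-to-device copy on the given stream in a single call. A minimal sketch of that pattern, assuming the (container, stream) overload that the diff appears to use:

#include <cudf/detail/utilities/vector_factories.hpp>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_uvector.hpp>

#include <vector>

// Copy a host vector to device memory asynchronously on `stream`.  The
// returned device_uvector owns the allocation; the caller must keep the host
// vector alive (or synchronize the stream) until the copy has completed.
rmm::device_uvector<int> upload(std::vector<int> const& host_values, rmm::cuda_stream_view stream)
{
  return cudf::detail::make_device_uvector_async(host_values, stream);
}

This is why read_avro still calls stream.synchronize() after building d_global_dict and d_global_dict_data: the host-side staging vectors go out of scope at the end of that block, so the copies must have finished first.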
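Finally, the call path after this refactor: cudf::io::read_avro in functions.cpp builds the datasources, checks that exactly one source was supplied, and forwards directly to the new detail::avro::read_avro free function, so there is no longer a reader object to construct, hold, and destroy. A hedged usage sketch through the public API follows; the file path is a placeholder and the builder chain assumes the standard avro_reader_options::builder(source_info) entry point.

#include <cudf/io/avro.hpp>
#include <cudf/io/types.hpp>
#include <cudf/table/table.hpp>

int main()
{
  // Describe where the Avro data lives and how to read it.
  auto options =
    cudf::io::avro_reader_options::builder(cudf::io::source_info{"example.avro"}).build();

  // The public wrapper wraps the file in a datasource and forwards to
  // detail::avro::read_avro, which returns the decoded table plus metadata.
  cudf::io::table_with_metadata result = cudf::io::read_avro(options);

  return result.tbl->num_columns() > 0 ? 0 : 1;
}

Because detail::avro::read_avro takes the datasource by rvalue reference and defaults its stream and memory-resource arguments, the public entry point stays a thin wrapper around a single function call.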