From 9c72e56837ddfb3fb9b3d1111cdd08e1f53595c4 Mon Sep 17 00:00:00 2001 From: Christopher Harris Date: Sat, 14 Aug 2021 05:08:16 -0500 Subject: [PATCH 01/21] simplify io/functions.cpp data source/sink factories --- cpp/src/io/functions.cpp | 126 +++++++++++++++++++++------------------ 1 file changed, 67 insertions(+), 59 deletions(-) diff --git a/cpp/src/io/functions.cpp b/cpp/src/io/functions.cpp index bf51012211c..e080ea3a2ca 100644 --- a/cpp/src/io/functions.cpp +++ b/cpp/src/io/functions.cpp @@ -106,67 +106,56 @@ chunked_parquet_writer_options_builder chunked_parquet_writer_options::builder( } namespace { -template -std::unique_ptr make_reader(source_info const& src_info, - reader_options const& options, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr) -{ - if (src_info.type == io_type::FILEPATH) { - return std::make_unique(src_info.filepaths, options, stream, mr); - } - std::vector> datasources; - if (src_info.type == io_type::HOST_BUFFER) { - datasources = cudf::io::datasource::create(src_info.buffers); - } else if (src_info.type == io_type::USER_IMPLEMENTED) { - datasources = cudf::io::datasource::create(src_info.user_sources); - } else { - CUDF_FAIL("Unsupported source type"); +std::vector> make_datasources(source_info const& info) +{ + switch (info.type) { + case io_type::FILEPATH: return cudf::io::datasource::create(info.filepaths); + case io_type::HOST_BUFFER: return cudf::io::datasource::create(info.buffers); + case io_type::USER_IMPLEMENTED: return cudf::io::datasource::create(info.user_sources); + default: CUDF_FAIL("Unsupported source type"); } - - return std::make_unique(std::move(datasources), options, stream, mr); } -template -std::unique_ptr make_writer(sink_info const& sink, Ts&&... args) +std::unique_ptr make_datasink(sink_info const& info) { - if (sink.type == io_type::FILEPATH) { - return std::make_unique(cudf::io::data_sink::create(sink.filepath), - std::forward(args)...); - } - if (sink.type == io_type::HOST_BUFFER) { - return std::make_unique(cudf::io::data_sink::create(sink.buffer), - std::forward(args)...); + switch (info.type) { + case io_type::FILEPATH: return cudf::io::data_sink::create(info.filepath); + case io_type::HOST_BUFFER: return cudf::io::data_sink::create(info.buffer); + case io_type::VOID: return cudf::io::data_sink::create(); + case io_type::USER_IMPLEMENTED: return cudf::io::data_sink::create(info.user_sink); + default: CUDF_FAIL("Unsupported sink type"); } - if (sink.type == io_type::VOID) { - return std::make_unique(cudf::io::data_sink::create(), std::forward(args)...); - } - if (sink.type == io_type::USER_IMPLEMENTED) { - return std::make_unique(cudf::io::data_sink::create(sink.user_sink), - std::forward(args)...); - } - CUDF_FAIL("Unsupported sink type"); } } // namespace -table_with_metadata read_avro(avro_reader_options const& opts, rmm::mr::device_memory_resource* mr) +table_with_metadata read_avro(avro_reader_options const& options, + rmm::mr::device_memory_resource* mr) { namespace avro = cudf::io::detail::avro; CUDF_FUNC_RANGE(); - auto reader = make_reader(opts.get_source(), opts, rmm::cuda_stream_default, mr); - return reader->read(opts); + + auto datasources = make_datasources(options.get_source()); + auto reader = + std::make_unique(std::move(datasources), options, rmm::cuda_stream_default, mr); + + return reader->read(options); } -table_with_metadata read_json(json_reader_options const& opts, rmm::mr::device_memory_resource* mr) +table_with_metadata read_json(json_reader_options const& options, + 
rmm::mr::device_memory_resource* mr) { namespace json = cudf::io::detail::json; CUDF_FUNC_RANGE(); - auto reader = make_reader(opts.get_source(), opts, rmm::cuda_stream_default, mr); - return reader->read(opts); + + auto datasources = make_datasources(options.get_source()); + auto reader = + std::make_unique(std::move(datasources), options, rmm::cuda_stream_default, mr); + + return reader->read(options); } table_with_metadata read_csv(csv_reader_options const& options, rmm::mr::device_memory_resource* mr) @@ -174,8 +163,10 @@ table_with_metadata read_csv(csv_reader_options const& options, rmm::mr::device_ namespace csv = cudf::io::detail::csv; CUDF_FUNC_RANGE(); + + auto datasources = make_datasources(options.get_source()); auto reader = - make_reader(options.get_source(), options, rmm::cuda_stream_default, mr); + std::make_unique(std::move(datasources), options, rmm::cuda_stream_default, mr); return reader->read(); } @@ -185,7 +176,9 @@ void write_csv(csv_writer_options const& options, rmm::mr::device_memory_resourc { using namespace cudf::io::detail; - auto writer = make_writer(options.get_sink(), options, rmm::cuda_stream_default, mr); + auto sink = make_datasink(options.get_sink()); + auto writer = + std::make_unique(std::move(sink), options, rmm::cuda_stream_default, mr); writer->write(options.get_table(), options.get_metadata()); } @@ -294,8 +287,10 @@ parsed_orc_statistics read_parsed_orc_statistics(source_info const& src_info) table_with_metadata read_orc(orc_reader_options const& options, rmm::mr::device_memory_resource* mr) { CUDF_FUNC_RANGE(); - auto reader = - make_reader(options.get_source(), options, rmm::cuda_stream_default, mr); + + auto datasources = make_datasources(options.get_source()); + auto reader = std::make_unique( + std::move(datasources), options, rmm::cuda_stream_default, mr); return reader->read(options); } @@ -305,11 +300,13 @@ table_with_metadata read_orc(orc_reader_options const& options, rmm::mr::device_ */ void write_orc(orc_writer_options const& options, rmm::mr::device_memory_resource* mr) { + namespace io_detail = cudf::io::detail; + CUDF_FUNC_RANGE(); - namespace io_detail = cudf::io::detail; - auto writer = make_writer( - options.get_sink(), options, io_detail::SingleWriteMode::YES, rmm::cuda_stream_default, mr); + auto sink = make_datasink(options.get_sink()); + auto writer = std::make_unique( + std::move(sink), options, io_detail::SingleWriteMode::YES, rmm::cuda_stream_default, mr); writer->write(options.get_table()); } @@ -317,12 +314,15 @@ void write_orc(orc_writer_options const& options, rmm::mr::device_memory_resourc /** * @copydoc cudf::io::orc_chunked_writer::orc_chunked_writer */ -orc_chunked_writer::orc_chunked_writer(chunked_orc_writer_options const& op, +orc_chunked_writer::orc_chunked_writer(chunked_orc_writer_options const& options, rmm::mr::device_memory_resource* mr) { namespace io_detail = cudf::io::detail; - writer = make_writer( - op.get_sink(), op, io_detail::SingleWriteMode::NO, rmm::cuda_stream_default, mr); + + auto sink = make_datasink(options.get_sink()); + + writer = std::make_unique( + std::move(sink), options, io_detail::SingleWriteMode::NO, rmm::cuda_stream_default, mr); } /** @@ -354,8 +354,10 @@ table_with_metadata read_parquet(parquet_reader_options const& options, rmm::mr::device_memory_resource* mr) { CUDF_FUNC_RANGE(); - auto reader = make_reader( - options.get_source(), options, rmm::cuda_stream_default, mr); + + auto datasources = make_datasources(options.get_source()); + auto reader = std::make_unique( + 
std::move(datasources), options, rmm::cuda_stream_default, mr); return reader->read(options); } @@ -392,25 +394,31 @@ table_input_metadata::table_input_metadata(table_view const& table, std::unique_ptr> write_parquet(parquet_writer_options const& options, rmm::mr::device_memory_resource* mr) { - CUDF_FUNC_RANGE(); namespace io_detail = cudf::io::detail; - auto writer = make_writer( - options.get_sink(), options, io_detail::SingleWriteMode::YES, rmm::cuda_stream_default, mr); + CUDF_FUNC_RANGE(); + + auto sink = make_datasink(options.get_sink()); + auto writer = std::make_unique( + std::move(sink), options, io_detail::SingleWriteMode::YES, rmm::cuda_stream_default, mr); writer->write(options.get_table()); + return writer->close(options.get_column_chunks_file_path()); } /** * @copydoc cudf::io::parquet_chunked_writer::parquet_chunked_writer */ -parquet_chunked_writer::parquet_chunked_writer(chunked_parquet_writer_options const& op, +parquet_chunked_writer::parquet_chunked_writer(chunked_parquet_writer_options const& options, rmm::mr::device_memory_resource* mr) { namespace io_detail = cudf::io::detail; - writer = make_writer( - op.get_sink(), op, io_detail::SingleWriteMode::NO, rmm::cuda_stream_default, mr); + + auto sink = make_datasink(options.get_sink()); + + writer = std::make_unique( + std::move(sink), options, io_detail::SingleWriteMode::NO, rmm::cuda_stream_default, mr); } /** From 88e23990151c737dcb4a22a5d6454ef8893285c4 Mon Sep 17 00:00:00 2001 From: Christopher Harris Date: Tue, 17 Aug 2021 00:53:48 -0500 Subject: [PATCH 02/21] remove filepath-related logic from csv and json readers --- cpp/include/cudf/io/csv.hpp | 2 +- cpp/include/cudf/io/json.hpp | 2 +- cpp/src/io/comp/io_uncomp.h | 7 +++-- cpp/src/io/comp/uncomp.cpp | 19 ++++++------ cpp/src/io/csv/reader_impl.cu | 38 ++++++------------------ cpp/src/io/csv/reader_impl.hpp | 4 --- cpp/src/io/functions.cpp | 40 ++++++++++++++++++++++++-- cpp/src/io/json/reader_impl.cu | 31 ++------------------ cpp/src/io/json/reader_impl.hpp | 1 - cpp/src/io/utilities/parsing_utils.cu | 34 ---------------------- cpp/src/io/utilities/parsing_utils.cuh | 18 ------------ python/cudf/cudf/_lib/csv.pyx | 2 +- python/cudf/cudf/tests/test_csv.py | 14 --------- 13 files changed, 66 insertions(+), 146 deletions(-) diff --git a/cpp/include/cudf/io/csv.hpp b/cpp/include/cudf/io/csv.hpp index d4a21b2e98c..c807f189aac 100644 --- a/cpp/include/cudf/io/csv.hpp +++ b/cpp/include/cudf/io/csv.hpp @@ -1199,7 +1199,7 @@ class csv_reader_options_builder { * @return The set of columns along with metadata. */ table_with_metadata read_csv( - csv_reader_options const& options, + csv_reader_options options, rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); /** @} */ // end of group diff --git a/cpp/include/cudf/io/json.hpp b/cpp/include/cudf/io/json.hpp index 8954f7dcab1..bca60f76260 100644 --- a/cpp/include/cudf/io/json.hpp +++ b/cpp/include/cudf/io/json.hpp @@ -364,7 +364,7 @@ class json_reader_options_builder { * @return The set of columns along with metadata. 
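// Hedged usage sketch (not part of the patch): calling the read_json overload
// declared just below, which now takes its options by value so the implementation
// can resolve settings such as compression_type::AUTO before constructing the
// reader. The builder calls and the source_info constructor are assumptions based
// on the public cuDF API; only read_json itself comes from this diff.
#include <cudf/io/json.hpp>
#include <string>

cudf::io::table_with_metadata read_json_lines_sketch(std::string const& path)
{
  auto options = cudf::io::json_reader_options::builder(cudf::io::source_info{path})
                   .lines(true)  // only JSON Lines input is supported by this reader
                   .build();
  return cudf::io::read_json(options);
}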
*/ table_with_metadata read_json( - json_reader_options const& options, + json_reader_options options, rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); /** @} */ // end of group diff --git a/cpp/src/io/comp/io_uncomp.h b/cpp/src/io/comp/io_uncomp.h index 8daf73ecd0c..7b1feb84813 100644 --- a/cpp/src/io/comp/io_uncomp.h +++ b/cpp/src/io/comp/io_uncomp.h @@ -16,12 +16,13 @@ #pragma once +#include +#include + #include #include #include -#include - using cudf::host_span; namespace cudf { @@ -42,7 +43,7 @@ enum { std::vector io_uncompress_single_h2d(void const* src, size_t src_size, int stream_type); -std::vector get_uncompressed_data(host_span data, std::string const& compression); +std::vector get_uncompressed_data(host_span data, compression_type compression); class HostDecompressor { public: diff --git a/cpp/src/io/comp/uncomp.cpp b/cpp/src/io/comp/uncomp.cpp index 2cb99d897fe..ee451d04dbb 100644 --- a/cpp/src/io/comp/uncomp.cpp +++ b/cpp/src/io/comp/uncomp.cpp @@ -369,6 +369,7 @@ std::vector io_uncompress_single_h2d(const void* src, size_t src_size, int // Unsupported format break; } + CUDF_EXPECTS(comp_data != nullptr, "Unsupported compressed stream type"); CUDF_EXPECTS(comp_len > 0, "Unsupported compressed stream type"); @@ -422,17 +423,17 @@ std::vector io_uncompress_single_h2d(const void* src, size_t src_size, int * @return Vector containing the output uncompressed data */ std::vector get_uncompressed_data(host_span const data, - std::string const& compression) + compression_type compression) { int comp_type = IO_UNCOMP_STREAM_TYPE_INFER; - if (compression == "gzip") - comp_type = IO_UNCOMP_STREAM_TYPE_GZIP; - else if (compression == "zip") - comp_type = IO_UNCOMP_STREAM_TYPE_ZIP; - else if (compression == "bz2") - comp_type = IO_UNCOMP_STREAM_TYPE_BZIP2; - else if (compression == "xz") - comp_type = IO_UNCOMP_STREAM_TYPE_XZ; + + switch (compression) { + case compression_type::GZIP: comp_type = IO_UNCOMP_STREAM_TYPE_GZIP; break; + case compression_type::ZIP: comp_type = IO_UNCOMP_STREAM_TYPE_ZIP; break; + case compression_type::BZIP2: comp_type = IO_UNCOMP_STREAM_TYPE_BZIP2; break; + case compression_type::XZ: comp_type = IO_UNCOMP_STREAM_TYPE_XZ; break; + default: break; + } return io_uncompress_single_h2d(data.data(), data.size(), comp_type); } diff --git a/cpp/src/io/csv/reader_impl.cu b/cpp/src/io/csv/reader_impl.cu index 549b0474fe1..a85a610962e 100644 --- a/cpp/src/io/csv/reader_impl.cu +++ b/cpp/src/io/csv/reader_impl.cu @@ -206,10 +206,12 @@ reader::impl::select_data_and_row_offsets(rmm::cuda_stream_view stream) auto num_rows = opts_.get_nrows(); if (range_offset > 0 || range_size > 0) { - CUDF_EXPECTS(compression_type_ == "none", + CUDF_EXPECTS(opts_.get_compression() == compression_type::NONE, "Reading compressed data using `byte range` is unsupported"); } + size_t map_range_size = 0; + if (range_size != 0) { auto num_given_dtypes = std::visit([](const auto& dtypes) { return dtypes.size(); }, opts_.get_dtypes()); @@ -217,12 +219,7 @@ reader::impl::select_data_and_row_offsets(rmm::cuda_stream_view stream) map_range_size = range_size + calculateMaxRowSize(num_columns); } - // Support delayed opening of the file if using memory mapping datasource - // This allows only mapping of a subset of the file if using byte range - if (source_ == nullptr) { - assert(!filepath_.empty()); - source_ = datasource::create(filepath_, range_offset, map_range_size); - } + // TODO: provide hint to datasource that we should memory map any underlying file. 
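// Illustrative aside, not part of the patch: a minimal sketch of the padded
// byte-range read performed just below, using only datasource calls that already
// appear in this diff (size, host_read, and the buffer's data/size accessors).
// The helper name and the std::vector<char> return type are hypothetical.
#include <cudf/io/datasource.hpp>
#include <cstddef>
#include <vector>

std::vector<char> read_padded_range_sketch(cudf::io::datasource& source,
                                           std::size_t offset,
                                           std::size_t padded_size)
{
  // Fall back to the full source size when no byte range was requested.
  auto const read_size = (padded_size != 0) ? padded_size : source.size();
  auto const buffer    = source.host_read(offset, read_size);
  return std::vector<char>(buffer->data(), buffer->data() + buffer->size());
}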
// Transfer source data to GPU if (!source_->is_empty()) { @@ -235,10 +232,11 @@ reader::impl::select_data_and_row_offsets(rmm::cuda_stream_view stream) std::vector h_uncomp_data_owner; - if (compression_type_ != "none") { - h_uncomp_data_owner = get_uncompressed_data(h_data, compression_type_); + if (opts_.get_compression() != compression_type::NONE) { + h_uncomp_data_owner = get_uncompressed_data(h_data, opts_.get_compression()); h_data = h_uncomp_data_owner; } + // None of the parameters for row selection is used, we are parsing the entire file const bool load_whole_file = range_offset == 0 && range_size == 0 && skip_rows <= 0 && skip_end_rows <= 0 && num_rows == -1; @@ -927,35 +925,17 @@ parse_options make_parse_options(csv_reader_options const& reader_opts, } reader::impl::impl(std::unique_ptr source, - std::string filepath, csv_reader_options const& options, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr) - : mr_(mr), source_(std::move(source)), filepath_(filepath), opts_(options) + : mr_(mr), source_(std::move(source)), opts_(options) { num_actual_cols_ = opts_.get_names().size(); num_active_cols_ = num_actual_cols_; - compression_type_ = - infer_compression_type(opts_.get_compression(), - filepath, - {{"gz", "gzip"}, {"zip", "zip"}, {"bz2", "bz2"}, {"xz", "xz"}}); - opts = make_parse_options(options, stream); } -// Forward to implementation -reader::reader(std::vector const& filepaths, - csv_reader_options const& options, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr) -{ - CUDF_EXPECTS(filepaths.size() == 1, "Only a single source is currently supported."); - // Delay actual instantiation of data source until read to allow for - // partial memory mapping of file using byte ranges - _impl = std::make_unique(nullptr, filepaths[0], options, stream, mr); -} - // Forward to implementation reader::reader(std::vector>&& sources, csv_reader_options const& options, @@ -963,7 +943,7 @@ reader::reader(std::vector>&& sources, rmm::mr::device_memory_resource* mr) { CUDF_EXPECTS(sources.size() == 1, "Only a single source is currently supported."); - _impl = std::make_unique(std::move(sources[0]), "", options, stream, mr); + _impl = std::make_unique(std::move(sources[0]), options, stream, mr); } // Destructor within this translation unit diff --git a/cpp/src/io/csv/reader_impl.hpp b/cpp/src/io/csv/reader_impl.hpp index 36c2bf4f9e7..beaa9b816cb 100644 --- a/cpp/src/io/csv/reader_impl.hpp +++ b/cpp/src/io/csv/reader_impl.hpp @@ -72,13 +72,11 @@ class reader::impl { * @brief Constructor from a dataset source with reader options. 
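// Hedged sketch (not part of the patch): how a file extension maps to a
// compression_type, mirroring the infer_compression_type helper this patch adds
// to functions.cpp. Case normalization is omitted for brevity; the function name
// is hypothetical and only enumerators used in this diff appear here.
#include <cudf/io/types.hpp>
#include <string>

cudf::io::compression_type compression_from_extension_sketch(std::string const& path)
{
  auto const pos = path.find_last_of('.');
  if (pos == std::string::npos) { return cudf::io::compression_type::NONE; }
  auto const ext = path.substr(pos + 1);
  if (ext == "gz") { return cudf::io::compression_type::GZIP; }
  if (ext == "zip") { return cudf::io::compression_type::ZIP; }
  if (ext == "bz2") { return cudf::io::compression_type::BZIP2; }
  if (ext == "xz") { return cudf::io::compression_type::XZ; }
  return cudf::io::compression_type::NONE;
}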
* * @param source Dataset source - * @param filepath Filepath if reading dataset from a file * @param options Settings for controlling reading behavior * @param stream CUDA stream used for device memory operations and kernel launches * @param mr Device memory resource to use for device memory allocation */ explicit impl(std::unique_ptr source, - std::string filepath, csv_reader_options const& options, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr); @@ -222,8 +220,6 @@ class reader::impl { private: rmm::mr::device_memory_resource* mr_ = nullptr; std::unique_ptr source_; - std::string filepath_; - std::string compression_type_; const csv_reader_options opts_; cudf::size_type num_records_ = 0; // Number of rows with actual data diff --git a/cpp/src/io/functions.cpp b/cpp/src/io/functions.cpp index e080ea3a2ca..ccc2eef56c7 100644 --- a/cpp/src/io/functions.cpp +++ b/cpp/src/io/functions.cpp @@ -144,27 +144,61 @@ table_with_metadata read_avro(avro_reader_options const& options, return reader->read(options); } -table_with_metadata read_json(json_reader_options const& options, - rmm::mr::device_memory_resource* mr) +compression_type infer_compression_type(compression_type compression, source_info const& info) +{ + if (compression != compression_type::AUTO) { return compression; } + + if (info.type != io_type::FILEPATH) { return compression_type::NONE; } + + auto filepath = info.filepaths[0]; + + // Attempt to infer from the file extension + const auto pos = filepath.find_last_of('.'); + + if (pos == std::string::npos) { return {}; } + + auto str_tolower = [](const auto& begin, const auto& end) { + std::string out; + std::transform(begin, end, std::back_inserter(out), ::tolower); + return out; + }; + + const auto ext = str_tolower(filepath.begin() + pos + 1, filepath.end()); + + if (ext == "gz") { return compression_type::GZIP; } + if (ext == "zip") { return compression_type::ZIP; } + if (ext == "bz2") { return compression_type::BZIP2; } + if (ext == "xz") { return compression_type::XZ; } + + return compression_type::NONE; +} + +table_with_metadata read_json(json_reader_options options, rmm::mr::device_memory_resource* mr) { namespace json = cudf::io::detail::json; CUDF_FUNC_RANGE(); auto datasources = make_datasources(options.get_source()); + + options.set_compression(infer_compression_type(options.get_compression(), options.get_source())); + auto reader = std::make_unique(std::move(datasources), options, rmm::cuda_stream_default, mr); return reader->read(options); } -table_with_metadata read_csv(csv_reader_options const& options, rmm::mr::device_memory_resource* mr) +table_with_metadata read_csv(csv_reader_options options, rmm::mr::device_memory_resource* mr) { namespace csv = cudf::io::detail::csv; CUDF_FUNC_RANGE(); auto datasources = make_datasources(options.get_source()); + + options.set_compression(infer_compression_type(options.get_compression(), options.get_source())); + auto reader = std::make_unique(std::move(datasources), options, rmm::cuda_stream_default, mr); diff --git a/cpp/src/io/json/reader_impl.cu b/cpp/src/io/json/reader_impl.cu index a8f117c22bf..bae7471e307 100644 --- a/cpp/src/io/json/reader_impl.cu +++ b/cpp/src/io/json/reader_impl.cu @@ -241,15 +241,6 @@ void reader::impl::ingest_raw_input(size_t range_offset, size_t range_size) map_range_size = range_size + calculate_max_row_size(dtype_option_size); } - // Support delayed opening of the file if using memory mapping datasource - // This allows only mapping of a subset of the file if using byte range - 
if (sources_.empty()) { - assert(!filepaths_.empty()); - for (const auto& path : filepaths_) { - sources_.emplace_back(datasource::create(path, range_offset, map_range_size)); - } - } - // Iterate through the user defined sources and read the contents into the local buffer CUDF_EXPECTS(!sources_.empty(), "No sources were defined"); size_t total_source_size = 0; @@ -280,11 +271,7 @@ void reader::impl::ingest_raw_input(size_t range_offset, size_t range_size) */ void reader::impl::decompress_input(rmm::cuda_stream_view stream) { - const auto compression_type = - infer_compression_type(options_.get_compression(), - filepaths_.size() > 0 ? filepaths_[0] : "", - {{"gz", "gzip"}, {"zip", "zip"}, {"bz2", "bz2"}, {"xz", "xz"}}); - if (compression_type == "none") { + if (options_.get_compression() == compression_type::NONE) { // Do not use the owner vector here to avoid extra copy uncomp_data_ = reinterpret_cast(buffer_.data()); uncomp_size_ = buffer_.size(); @@ -293,7 +280,7 @@ void reader::impl::decompress_input(rmm::cuda_stream_view stream) host_span( // reinterpret_cast(buffer_.data()), buffer_.size()), - compression_type); + options_.get_compression()); uncomp_data_ = uncomp_data_owner_.data(); uncomp_size_ = uncomp_data_owner_.size(); @@ -665,7 +652,7 @@ reader::impl::impl(std::vector>&& sources, json_reader_options const& options, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr) - : options_(options), mr_(mr), sources_(std::move(sources)), filepaths_(filepaths) + : options_(options), mr_(mr), sources_(std::move(sources)) { CUDF_EXPECTS(options_.is_enabled_lines(), "Only JSON Lines format is currently supported.\n"); @@ -713,18 +700,6 @@ table_with_metadata reader::impl::read(json_reader_options const& options, return convert_data_to_table(rec_starts, stream); } -// Forward to implementation -reader::reader(std::vector const& filepaths, - json_reader_options const& options, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr) -{ - // Delay actual instantiation of data source until read to allow for - // partial memory mapping of file using byte ranges - std::vector> src = {}; // Empty datasources - _impl = std::make_unique(std::move(src), filepaths, options, stream, mr); -} - // Forward to implementation reader::reader(std::vector>&& sources, json_reader_options const& options, diff --git a/cpp/src/io/json/reader_impl.hpp b/cpp/src/io/json/reader_impl.hpp index 5cf51369cdf..f7af55b2b90 100644 --- a/cpp/src/io/json/reader_impl.hpp +++ b/cpp/src/io/json/reader_impl.hpp @@ -57,7 +57,6 @@ class reader::impl { rmm::mr::device_memory_resource* mr_ = nullptr; std::vector> sources_; - std::vector filepaths_; std::vector buffer_; const char* uncomp_data_ = nullptr; diff --git a/cpp/src/io/utilities/parsing_utils.cu b/cpp/src/io/utilities/parsing_utils.cu index 6c8f01111e5..ba62238c5d3 100644 --- a/cpp/src/io/utilities/parsing_utils.cu +++ b/cpp/src/io/utilities/parsing_utils.cu @@ -209,39 +209,5 @@ cudf::size_type count_all_from_set(const char* h_data, return find_all_from_set(h_data, h_size, keys, 0, nullptr, stream); } -std::string infer_compression_type( - const compression_type& compression_arg, - const std::string& filename, - const std::vector>& ext_to_comp_map) -{ - auto str_tolower = [](const auto& begin, const auto& end) { - std::string out; - std::transform(begin, end, std::back_inserter(out), ::tolower); - return out; - }; - - // Attempt to infer from user-supplied argument - if (compression_arg != compression_type::AUTO) { - switch (compression_arg) { - 
case compression_type::GZIP: return "gzip"; - case compression_type::BZIP2: return "bz2"; - case compression_type::ZIP: return "zip"; - case compression_type::XZ: return "xz"; - default: break; - } - } - - // Attempt to infer from the file extension - const auto pos = filename.find_last_of('.'); - if (pos != std::string::npos) { - const auto ext = str_tolower(filename.begin() + pos + 1, filename.end()); - for (const auto& mapping : ext_to_comp_map) { - if (mapping.first == ext) { return mapping.second; } - } - } - - return "none"; -} - } // namespace io } // namespace cudf diff --git a/cpp/src/io/utilities/parsing_utils.cuh b/cpp/src/io/utilities/parsing_utils.cuh index 88297423b9b..daf23de7eb2 100644 --- a/cpp/src/io/utilities/parsing_utils.cuh +++ b/cpp/src/io/utilities/parsing_utils.cuh @@ -454,24 +454,6 @@ cudf::size_type count_all_from_set(const char* h_data, const std::vector& keys, rmm::cuda_stream_view stream); -/** - * @brief Infer file compression type based on user supplied arguments. - * - * If the user specifies a valid compression_type for compression arg, - * compression type will be computed based on that. Otherwise the filename - * and ext_to_comp_map will be used. - * - * @param[in] compression_arg User specified compression type (if any) - * @param[in] filename Filename to base compression type (by extension) on - * @param[in] ext_to_comp_map User supplied mapping of file extension to compression type - * - * @return string representing compression type ("gzip, "bz2", etc) - */ -std::string infer_compression_type( - const compression_type& compression_arg, - const std::string& filename, - const std::vector>& ext_to_comp_map); - /** * @brief Checks whether the given character is a whitespace character. * diff --git a/python/cudf/cudf/_lib/csv.pyx b/python/cudf/cudf/_lib/csv.pyx index a15a180d466..7a54ccac197 100644 --- a/python/cudf/cudf/_lib/csv.pyx +++ b/python/cudf/cudf/_lib/csv.pyx @@ -101,7 +101,7 @@ cdef csv_reader_options make_csv_reader_options( bool na_filter, object prefix, object index_col, -) except +: +) except *: cdef source_info c_source_info = make_source_info([datasource]) cdef compression_type c_compression cdef size_type c_header diff --git a/python/cudf/cudf/tests/test_csv.py b/python/cudf/cudf/tests/test_csv.py index 5511a65d0a4..8fb5d7cc9eb 100644 --- a/python/cudf/cudf/tests/test_csv.py +++ b/python/cudf/cudf/tests/test_csv.py @@ -1069,20 +1069,6 @@ def test_csv_reader_byte_range(tmpdir, segment_bytes): assert list(df["int2"]) == list(ref_df["int2"]) -def test_csv_reader_byte_range_type_corner_case(tmpdir): - fname = tmpdir.mkdir("gdf_csv").join("tmp_csvreader_file17.csv") - - cudf.datasets.timeseries( - start="2000-01-01", - end="2000-01-02", - dtypes={"name": str, "id": int, "x": float, "y": float}, - ).to_csv(fname, chunksize=100000) - - byte_range = (2_147_483_648, 0) - with pytest.raises(RuntimeError, match="Offset is past end of file"): - cudf.read_csv(fname, byte_range=byte_range, header=None) - - @pytest.mark.parametrize("segment_bytes", [10, 19, 31, 36]) def test_csv_reader_byte_range_strings(segment_bytes): names = ["strings"] From 62b95202d9b1db14f765ef45644d9cf91f782ea7 Mon Sep 17 00:00:00 2001 From: Christopher Harris Date: Tue, 17 Aug 2021 02:19:27 -0500 Subject: [PATCH 03/21] remove filepath logic from avro, parquet, orc readers --- cpp/include/cudf/io/detail/avro.hpp | 13 ------------- cpp/include/cudf/io/detail/orc.hpp | 13 ------------- cpp/include/cudf/io/detail/parquet.hpp | 13 ------------- cpp/src/io/avro/reader_impl.cu | 10 
---------- cpp/src/io/orc/reader_impl.cu | 9 --------- cpp/src/io/parquet/reader_impl.cu | 9 --------- 6 files changed, 67 deletions(-) diff --git a/cpp/include/cudf/io/detail/avro.hpp b/cpp/include/cudf/io/detail/avro.hpp index 98483d1c03e..306c15dcb72 100644 --- a/cpp/include/cudf/io/detail/avro.hpp +++ b/cpp/include/cudf/io/detail/avro.hpp @@ -38,19 +38,6 @@ class reader { std::unique_ptr _impl; public: - /** - * @brief Constructor from an array of file paths - * - * @param filepaths Paths to the files containing the input dataset - * @param options Settings for controlling reading behavior - * @param stream CUDA stream used for device memory operations and kernel launches - * @param mr Device memory resource to use for device memory allocation - */ - explicit reader(std::vector const& filepaths, - avro_reader_options const& options, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr); - /** * @brief Constructor from an array of datasources * diff --git a/cpp/include/cudf/io/detail/orc.hpp b/cpp/include/cudf/io/detail/orc.hpp index ab26c01db74..2174b688da2 100644 --- a/cpp/include/cudf/io/detail/orc.hpp +++ b/cpp/include/cudf/io/detail/orc.hpp @@ -47,19 +47,6 @@ class reader { std::unique_ptr _impl; public: - /** - * @brief Constructor from an array of file paths - * - * @param filepaths Paths to the files containing the input dataset - * @param options Settings for controlling reading behavior - * @param stream CUDA stream used for device memory operations and kernel launches - * @param mr Device memory resource to use for device memory allocation - */ - explicit reader(std::vector const& filepaths, - orc_reader_options const& options, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr); - /** * @brief Constructor from an array of datasources * diff --git a/cpp/include/cudf/io/detail/parquet.hpp b/cpp/include/cudf/io/detail/parquet.hpp index d95af7a11da..14f27ef8eef 100644 --- a/cpp/include/cudf/io/detail/parquet.hpp +++ b/cpp/include/cudf/io/detail/parquet.hpp @@ -49,19 +49,6 @@ class reader { std::unique_ptr _impl; public: - /** - * @brief Constructor from an array of file paths - * - * @param filepaths Paths to the files containing the input dataset - * @param options Settings for controlling reading behavior - * @param stream CUDA stream used for device memory operations and kernel launches - * @param mr Device memory resource to use for device memory allocation - */ - explicit reader(std::vector const& filepaths, - parquet_reader_options const& options, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr); - /** * @brief Constructor from an array of datasources * diff --git a/cpp/src/io/avro/reader_impl.cu b/cpp/src/io/avro/reader_impl.cu index f6ffdd99d35..08ea96139a1 100644 --- a/cpp/src/io/avro/reader_impl.cu +++ b/cpp/src/io/avro/reader_impl.cu @@ -474,16 +474,6 @@ table_with_metadata reader::impl::read(avro_reader_options const& options, return {std::make_unique(std::move(out_columns)), std::move(metadata_out)}; } -// Forward to implementation -reader::reader(std::vector const& filepaths, - avro_reader_options const& options, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr) -{ - CUDF_EXPECTS(filepaths.size() == 1, "Only a single source is currently supported."); - _impl = std::make_unique(datasource::create(filepaths[0]), options, mr); -} - // Forward to implementation reader::reader(std::vector>&& sources, avro_reader_options const& options, diff --git a/cpp/src/io/orc/reader_impl.cu 
b/cpp/src/io/orc/reader_impl.cu
index 033a2d9aff5..5d62c45df83 100644
--- a/cpp/src/io/orc/reader_impl.cu
+++ b/cpp/src/io/orc/reader_impl.cu
@@ -1383,15 +1383,6 @@ table_with_metadata reader::impl::read(size_type skip_rows,
   return {std::make_unique<table>(std::move(out_columns)), std::move(out_metadata)};
 }
 
-// Forward to implementation
-reader::reader(std::vector<std::string> const& filepaths,
-               orc_reader_options const& options,
-               rmm::cuda_stream_view stream,
-               rmm::mr::device_memory_resource* mr)
-{
-  _impl = std::make_unique<impl>(datasource::create(filepaths), options, mr);
-}
-
 // Forward to implementation
 reader::reader(std::vector<std::unique_ptr<cudf::io::datasource>>&& sources,
                orc_reader_options const& options,
diff --git a/cpp/src/io/parquet/reader_impl.cu b/cpp/src/io/parquet/reader_impl.cu
index 9f9bdfd4755..31ae763d9ff 100644
--- a/cpp/src/io/parquet/reader_impl.cu
+++ b/cpp/src/io/parquet/reader_impl.cu
@@ -1608,15 +1608,6 @@ table_with_metadata reader::impl::read(size_type skip_rows,
   return {std::make_unique<table>
(std::move(out_columns)), std::move(out_metadata)}; } -// Forward to implementation -reader::reader(std::vector const& filepaths, - parquet_reader_options const& options, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr) - : _impl(std::make_unique(datasource::create(filepaths), options, mr)) -{ -} - // Forward to implementation reader::reader(std::vector>&& sources, parquet_reader_options const& options, From fb0129433bdd2dd264105ba172d96f2a310d8d8d Mon Sep 17 00:00:00 2001 From: Christopher Harris Date: Wed, 18 Aug 2021 15:19:11 -0500 Subject: [PATCH 04/21] move range size padding calculation out of json/csv reader and in to json/csv options --- cpp/include/cudf/io/csv.hpp | 34 +++++++++++++++++++++ cpp/include/cudf/io/json.hpp | 32 +++++++++++++++++++ cpp/src/io/csv/reader_impl.cu | 49 +++++------------------------- cpp/src/io/functions.cpp | 24 +++++++++++---- cpp/src/io/json/reader_impl.cu | 46 ++++++---------------------- cpp/src/io/json/reader_impl.hpp | 3 +- python/cudf/cudf/tests/test_csv.py | 14 +++++++++ 7 files changed, 116 insertions(+), 86 deletions(-) diff --git a/cpp/include/cudf/io/csv.hpp b/cpp/include/cudf/io/csv.hpp index c807f189aac..1aa6e3bea29 100644 --- a/cpp/include/cudf/io/csv.hpp +++ b/cpp/include/cudf/io/csv.hpp @@ -177,6 +177,40 @@ class csv_reader_options { */ std::size_t get_byte_range_size() const { return _byte_range_size; } + /** + * @brief Returns number of bytes to read with padding. + */ + std::size_t get_byte_range_size_with_padding() const + { + if (_byte_range_size == 0) { + return 0; + } else { + return _byte_range_size + get_byte_range_padding(); + } + } + + /** + * @brief Returns number of bytes to pad when reading. + */ + std::size_t get_byte_range_padding() const + { + auto const num_names = _names.size(); + auto const num_dtypes = std::visit([](const auto& dtypes) { return dtypes.size(); }, _dtypes); + auto const num_columns = std::max(num_dtypes, num_names); + + auto const max_row_bytes = 16 * 1024; // 16KB + auto const column_bytes = 64; + auto const base_padding = 1024; // 1KB + + if (num_columns == 0) { + // Use flat size if the number of columns is not known + return max_row_bytes; + } + + // Expand the size based on the number of columns, if available + return base_padding + num_columns * column_bytes; + } + /** * @brief Returns names of the columns. */ diff --git a/cpp/include/cudf/io/json.hpp b/cpp/include/cudf/io/json.hpp index bca60f76260..5d2a4f6fcd1 100644 --- a/cpp/include/cudf/io/json.hpp +++ b/cpp/include/cudf/io/json.hpp @@ -140,6 +140,38 @@ class json_reader_options { */ size_t get_byte_range_size() const { return _byte_range_size; } + /** + * @brief Returns number of bytes to read with padding. + */ + size_t get_byte_range_size_with_padding() const + { + if (_byte_range_size == 0) { + return 0; + } else { + return _byte_range_size + get_byte_range_padding(); + } + } + + /** + * @brief Returns number of bytes to pad when reading. + */ + size_t get_byte_range_padding() const + { + auto const num_columns = std::visit([](const auto& dtypes) { return dtypes.size(); }, _dtypes); + + auto const max_row_bytes = 16 * 1024; // 16KB + auto const column_bytes = 64; + auto const base_padding = 1024; // 1KB + + if (num_columns == 0) { + // Use flat size if the number of columns is not known + return max_row_bytes; + } + + // Expand the size based on the number of columns, if available + return base_padding + num_columns * column_bytes; + } + /** * @brief Whether to read the file as a json object per line. 
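// Hedged sketch (not part of the patch): the padding arithmetic implemented by
// the get_byte_range_padding()/get_byte_range_size_with_padding() methods added
// above, written as a standalone function. The constants come directly from the
// added code; the function name is hypothetical.
#include <cstddef>

std::size_t byte_range_padding_sketch(std::size_t num_columns)
{
  constexpr std::size_t max_row_bytes = 16 * 1024;  // flat 16KB when the column count is unknown
  constexpr std::size_t column_bytes  = 64;
  constexpr std::size_t base_padding  = 1024;  // 1KB
  return num_columns == 0 ? max_row_bytes : base_padding + num_columns * column_bytes;
}
// Example: a 10-column table pads the requested byte range by 1024 + 10 * 64 = 1664
// bytes, so a row that straddles the end of the range can still be read completely.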
*/ diff --git a/cpp/src/io/csv/reader_impl.cu b/cpp/src/io/csv/reader_impl.cu index a85a610962e..c61cc26800e 100644 --- a/cpp/src/io/csv/reader_impl.cu +++ b/cpp/src/io/csv/reader_impl.cu @@ -57,31 +57,6 @@ namespace csv { using namespace cudf::io::csv; using namespace cudf::io; -/** - * @brief Estimates the maximum expected length or a row, based on the number - * of columns - * - * If the number of columns is not available, it will return a value large - * enough for most use cases - * - * @param[in] num_columns Number of columns in the CSV file (optional) - * - * @return Estimated maximum size of a row, in bytes - */ -constexpr size_t calculateMaxRowSize(int num_columns = 0) noexcept -{ - constexpr size_t max_row_bytes = 16 * 1024; // 16KB - constexpr size_t column_bytes = 64; - constexpr size_t base_padding = 1024; // 1KB - if (num_columns == 0) { - // Use flat size if the number of columns is not known - return max_row_bytes; - } else { - // Expand the size based on the number of columns, if available - return base_padding + num_columns * column_bytes; - } -} - /** * @brief Translates a dtype string and returns its dtype enumeration and any * extended dtype flags that are supported by cuIO. Often, this is a column @@ -199,31 +174,21 @@ void erase_except_last(C& container, rmm::cuda_stream_view stream) std::pair, reader::impl::selected_rows_offsets> reader::impl::select_data_and_row_offsets(rmm::cuda_stream_view stream) { - auto range_offset = opts_.get_byte_range_offset(); - auto range_size = opts_.get_byte_range_size(); - auto skip_rows = opts_.get_skiprows(); - auto skip_end_rows = opts_.get_skipfooter(); - auto num_rows = opts_.get_nrows(); + auto range_offset = opts_.get_byte_range_offset(); + auto range_size = opts_.get_byte_range_size(); + auto range_size_padded = opts_.get_byte_range_size_with_padding(); + auto skip_rows = opts_.get_skiprows(); + auto skip_end_rows = opts_.get_skipfooter(); + auto num_rows = opts_.get_nrows(); if (range_offset > 0 || range_size > 0) { CUDF_EXPECTS(opts_.get_compression() == compression_type::NONE, "Reading compressed data using `byte range` is unsupported"); } - size_t map_range_size = 0; - - if (range_size != 0) { - auto num_given_dtypes = - std::visit([](const auto& dtypes) { return dtypes.size(); }, opts_.get_dtypes()); - const auto num_columns = std::max(opts_.get_names().size(), num_given_dtypes); - map_range_size = range_size + calculateMaxRowSize(num_columns); - } - - // TODO: provide hint to datasource that we should memory map any underlying file. - // Transfer source data to GPU if (!source_->is_empty()) { - auto data_size = (map_range_size != 0) ? map_range_size : source_->size(); + auto data_size = (range_size_padded != 0) ? 
range_size_padded : source_->size(); auto buffer = source_->host_read(range_offset, data_size); auto h_data = host_span( // diff --git a/cpp/src/io/functions.cpp b/cpp/src/io/functions.cpp index ccc2eef56c7..438cb1762c6 100644 --- a/cpp/src/io/functions.cpp +++ b/cpp/src/io/functions.cpp @@ -107,10 +107,18 @@ chunked_parquet_writer_options_builder chunked_parquet_writer_options::builder( namespace { -std::vector> make_datasources(source_info const& info) +std::vector> make_datasources(source_info const& info, + size_t range_offset = 0, + size_t range_size = 0) { switch (info.type) { - case io_type::FILEPATH: return cudf::io::datasource::create(info.filepaths); + case io_type::FILEPATH: { + auto sources = std::vector>(); + for (auto const& filepath : info.filepaths) { + sources.emplace_back(cudf::io::datasource::create(filepath, range_offset, range_size)); + } + return sources; + } case io_type::HOST_BUFFER: return cudf::io::datasource::create(info.buffers); case io_type::USER_IMPLEMENTED: return cudf::io::datasource::create(info.user_sources); default: CUDF_FAIL("Unsupported source type"); @@ -179,10 +187,12 @@ table_with_metadata read_json(json_reader_options options, rmm::mr::device_memor CUDF_FUNC_RANGE(); - auto datasources = make_datasources(options.get_source()); - options.set_compression(infer_compression_type(options.get_compression(), options.get_source())); + auto datasources = make_datasources(options.get_source(), + options.get_byte_range_offset(), + options.get_byte_range_size_with_padding()); + auto reader = std::make_unique(std::move(datasources), options, rmm::cuda_stream_default, mr); @@ -195,10 +205,12 @@ table_with_metadata read_csv(csv_reader_options options, rmm::mr::device_memory_ CUDF_FUNC_RANGE(); - auto datasources = make_datasources(options.get_source()); - options.set_compression(infer_compression_type(options.get_compression(), options.get_source())); + auto datasources = make_datasources(options.get_source(), + options.get_byte_range_offset(), + options.get_byte_range_size_with_padding()); + auto reader = std::make_unique(std::move(datasources), options, rmm::cuda_stream_default, mr); diff --git a/cpp/src/io/json/reader_impl.cu b/cpp/src/io/json/reader_impl.cu index bae7471e307..0618f02e98f 100644 --- a/cpp/src/io/json/reader_impl.cu +++ b/cpp/src/io/json/reader_impl.cu @@ -50,31 +50,6 @@ namespace detail { namespace json { using namespace cudf::io; -namespace { -/** - * @brief Estimates the maximum expected length or a row, based on the number - * of columns - * - * If the number of columns is not available, it will return a value large - * enough for most use cases - * - * @param[in] num_columns Number of columns in the JSON file (optional) - * - * @return Estimated maximum size of a row, in bytes - */ -constexpr size_t calculate_max_row_size(int num_columns = 0) noexcept -{ - constexpr size_t max_row_bytes = 16 * 1024; // 16KB - constexpr size_t column_bytes = 64; - constexpr size_t base_padding = 1024; // 1KB - return num_columns == 0 - ? max_row_bytes // Use flat size if the # of columns is not known - : base_padding + - num_columns * column_bytes; // Expand size based on the # of columns, if available -} - -} // anonymous namespace - /** * @brief Aggregate the table containing keys info by their hash values. 
* @@ -231,16 +206,12 @@ std::pair, col_map_ptr_type> reader::impl::get_json_obj * * @param[in] range_offset Number of bytes offset from the start * @param[in] range_size Bytes to read; use `0` for all remaining data + * @param[in] range_size_padded Bytes to read with padding; use `0` for all remaining data */ -void reader::impl::ingest_raw_input(size_t range_offset, size_t range_size) +void reader::impl::ingest_raw_input(size_t range_offset, + size_t range_size, + size_t range_size_padded) { - size_t map_range_size = 0; - if (range_size != 0) { - auto const dtype_option_size = - std::visit([](const auto& dtypes) { return dtypes.size(); }, options_.get_dtypes()); - map_range_size = range_size + calculate_max_row_size(dtype_option_size); - } - // Iterate through the user defined sources and read the contents into the local buffer CUDF_EXPECTS(!sources_.empty(), "No sources were defined"); size_t total_source_size = 0; @@ -253,7 +224,7 @@ void reader::impl::ingest_raw_input(size_t range_offset, size_t range_size) size_t bytes_read = 0; for (const auto& source : sources_) { if (!source->is_empty()) { - auto data_size = (map_range_size != 0) ? map_range_size : source->size(); + auto data_size = (range_size_padded != 0) ? range_size_padded : source->size(); bytes_read += source->host_read(range_offset, data_size, &buffer_[bytes_read]); } } @@ -675,10 +646,11 @@ reader::impl::impl(std::vector>&& sources, table_with_metadata reader::impl::read(json_reader_options const& options, rmm::cuda_stream_view stream) { - auto range_offset = options.get_byte_range_offset(); - auto range_size = options.get_byte_range_size(); + auto range_offset = options.get_byte_range_offset(); + auto range_size = options.get_byte_range_size(); + auto range_size_padded = options.get_byte_range_size_with_padding(); - ingest_raw_input(range_offset, range_size); + ingest_raw_input(range_offset, range_size, range_size_padded); CUDF_EXPECTS(buffer_.size() != 0, "Ingest failed: input data is null.\n"); decompress_input(stream); diff --git a/cpp/src/io/json/reader_impl.hpp b/cpp/src/io/json/reader_impl.hpp index f7af55b2b90..d01f2e8677e 100644 --- a/cpp/src/io/json/reader_impl.hpp +++ b/cpp/src/io/json/reader_impl.hpp @@ -109,8 +109,9 @@ class reader::impl { * * @param[in] range_offset Number of bytes offset from the start * @param[in] range_size Bytes to read; use `0` for all remaining data + * @param[in] range_size_padded Bytes to read with padding; use `0` for all remaining data */ - void ingest_raw_input(size_t range_offset, size_t range_size); + void ingest_raw_input(size_t range_offset, size_t range_size, size_t range_size_padded); /** * @brief Extract the JSON objects keys from the input file with object rows. 
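// Hedged sketch (not part of the patch): the multi-source ingest pattern used by
// ingest_raw_input above: size every source first, then host_read each one into
// the correct offset of a single host buffer. Only datasource calls that appear
// in this diff are used; the helper name and uint8_t buffer type are assumptions.
#include <cudf/io/datasource.hpp>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <vector>

std::vector<uint8_t> ingest_all_sources_sketch(
  std::vector<std::unique_ptr<cudf::io::datasource>> const& sources)
{
  std::size_t total_size = 0;
  for (auto const& source : sources) { total_size += source->size(); }

  std::vector<uint8_t> buffer(total_size);
  std::size_t bytes_read = 0;
  for (auto const& source : sources) {
    if (!source->is_empty()) {
      bytes_read += source->host_read(0, source->size(), buffer.data() + bytes_read);
    }
  }
  buffer.resize(bytes_read);
  return buffer;
}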
diff --git a/python/cudf/cudf/tests/test_csv.py b/python/cudf/cudf/tests/test_csv.py index 8fb5d7cc9eb..5511a65d0a4 100644 --- a/python/cudf/cudf/tests/test_csv.py +++ b/python/cudf/cudf/tests/test_csv.py @@ -1069,6 +1069,20 @@ def test_csv_reader_byte_range(tmpdir, segment_bytes): assert list(df["int2"]) == list(ref_df["int2"]) +def test_csv_reader_byte_range_type_corner_case(tmpdir): + fname = tmpdir.mkdir("gdf_csv").join("tmp_csvreader_file17.csv") + + cudf.datasets.timeseries( + start="2000-01-01", + end="2000-01-02", + dtypes={"name": str, "id": int, "x": float, "y": float}, + ).to_csv(fname, chunksize=100000) + + byte_range = (2_147_483_648, 0) + with pytest.raises(RuntimeError, match="Offset is past end of file"): + cudf.read_csv(fname, byte_range=byte_range, header=None) + + @pytest.mark.parametrize("segment_bytes", [10, 19, 31, 36]) def test_csv_reader_byte_range_strings(segment_bytes): names = ["strings"] From d422aebbe62d7e9915af93f474563e6e1c571e97 Mon Sep 17 00:00:00 2001 From: Christopher Harris Date: Wed, 18 Aug 2021 15:30:38 -0500 Subject: [PATCH 05/21] remove filepaths from json reader --- cpp/src/io/json/reader_impl.cu | 12 +++++------- cpp/src/io/json/reader_impl.hpp | 3 +-- 2 files changed, 6 insertions(+), 9 deletions(-) diff --git a/cpp/src/io/json/reader_impl.cu b/cpp/src/io/json/reader_impl.cu index 0618f02e98f..2964a12568f 100644 --- a/cpp/src/io/json/reader_impl.cu +++ b/cpp/src/io/json/reader_impl.cu @@ -231,7 +231,7 @@ void reader::impl::ingest_raw_input(size_t range_offset, byte_range_offset_ = range_offset; byte_range_size_ = range_size; - load_whole_file_ = byte_range_offset_ == 0 && byte_range_size_ == 0; + load_whole_source_ = byte_range_offset_ == 0 && byte_range_size_ == 0; } /** @@ -256,7 +256,7 @@ void reader::impl::decompress_input(rmm::cuda_stream_view stream) uncomp_data_ = uncomp_data_owner_.data(); uncomp_size_ = uncomp_data_owner_.size(); } - if (load_whole_file_) data_ = rmm::device_buffer(uncomp_data_, uncomp_size_, stream); + if (load_whole_source_) data_ = rmm::device_buffer(uncomp_data_, uncomp_size_, stream); } rmm::device_uvector reader::impl::find_record_starts(rmm::cuda_stream_view stream) @@ -268,7 +268,7 @@ rmm::device_uvector reader::impl::find_record_starts(rmm::cuda_stream_ if (allow_newlines_in_strings_) { chars_to_count.push_back('\"'); } // If not starting at an offset, add an extra row to account for the first row in the file cudf::size_type prefilter_count = ((byte_range_offset_ == 0) ? 
1 : 0); - if (load_whole_file_) { + if (load_whole_source_) { prefilter_count += count_all_from_set(data_, chars_to_count, stream); } else { prefilter_count += count_all_from_set(uncomp_data_, uncomp_size_, chars_to_count, stream); @@ -286,7 +286,7 @@ rmm::device_uvector reader::impl::find_record_starts(rmm::cuda_stream_ std::vector chars_to_find{'\n'}; if (allow_newlines_in_strings_) { chars_to_find.push_back('\"'); } // Passing offset = 1 to return positions AFTER the found character - if (load_whole_file_) { + if (load_whole_source_) { find_all_from_set(data_, chars_to_find, 1, find_result_ptr, stream); } else { find_all_from_set(uncomp_data_, uncomp_size_, chars_to_find, 1, find_result_ptr, stream); @@ -619,7 +619,6 @@ table_with_metadata reader::impl::convert_data_to_table(device_span>&& sources, - std::vector const& filepaths, json_reader_options const& options, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr) @@ -678,8 +677,7 @@ reader::reader(std::vector>&& sources, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr) { - std::vector file_paths = {}; // Empty filepaths - _impl = std::make_unique(std::move(sources), file_paths, options, stream, mr); + _impl = std::make_unique(std::move(sources), options, stream, mr); } // Destructor within this translation unit diff --git a/cpp/src/io/json/reader_impl.hpp b/cpp/src/io/json/reader_impl.hpp index d01f2e8677e..d910cce2d72 100644 --- a/cpp/src/io/json/reader_impl.hpp +++ b/cpp/src/io/json/reader_impl.hpp @@ -68,7 +68,7 @@ class reader::impl { size_t byte_range_offset_ = 0; size_t byte_range_size_ = 0; - bool load_whole_file_ = true; + bool load_whole_source_ = true; table_metadata metadata_; std::vector dtypes_; @@ -186,7 +186,6 @@ class reader::impl { * @brief Constructor from a dataset source with reader options. */ explicit impl(std::vector>&& sources, - std::vector const& filepaths, json_reader_options const& options, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr); From 9107f2466d7b5a95458a0cfd0e73b57bacc6c759 Mon Sep 17 00:00:00 2001 From: Christopher Harris Date: Sat, 21 Aug 2021 13:25:38 -0500 Subject: [PATCH 06/21] remove unncessary avro reader class --- cpp/include/cudf/io/detail/avro.hpp | 49 +++++++++-------------------- cpp/src/io/avro/reader_impl.cu | 47 +++++++++++---------------- cpp/src/io/avro/reader_impl.hpp | 8 ++--- cpp/src/io/functions.cpp | 6 ++-- 4 files changed, 40 insertions(+), 70 deletions(-) diff --git a/cpp/include/cudf/io/detail/avro.hpp b/cpp/include/cudf/io/detail/avro.hpp index 306c15dcb72..2d30b3df8e0 100644 --- a/cpp/include/cudf/io/detail/avro.hpp +++ b/cpp/include/cudf/io/detail/avro.hpp @@ -29,44 +29,23 @@ namespace cudf { namespace io { namespace detail { namespace avro { + /** - * @brief Class to read Avro dataset data into columns. + * @brief Reads the entire dataset. 
+ * + * @param source Input `datasource` object to read the dataset from + * @param options Settings for controlling reading behavior + * @param stream CUDA stream used for device memory operations and kernel launches + * @param mr Device memory resource to use for device memory allocation + * + * @return The set of columns along with table metadata */ -class reader { - private: - class impl; - std::unique_ptr _impl; - - public: - /** - * @brief Constructor from an array of datasources - * - * @param sources Input `datasource` objects to read the dataset from - * @param options Settings for controlling reading behavior - * @param stream CUDA stream used for device memory operations and kernel launches - * @param mr Device memory resource to use for device memory allocation - */ - explicit reader(std::vector>&& sources, - avro_reader_options const& options, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr); - - /** - * @brief Destructor explicitly-declared to avoid inlined in header - */ - ~reader(); +table_with_metadata read_avro( + std::unique_ptr&& source, + avro_reader_options const& options, + rmm::cuda_stream_view stream = rmm::cuda_stream_default, + rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); - /** - * @brief Reads the entire dataset. - * - * @param options Settings for controlling reading behavior - * @param stream CUDA stream used for device memory operations and kernel launches. - * - * @return The set of columns along with table metadata - */ - table_with_metadata read(avro_reader_options const& options, - rmm::cuda_stream_view stream = rmm::cuda_stream_default); -}; } // namespace avro } // namespace detail } // namespace io diff --git a/cpp/src/io/avro/reader_impl.cu b/cpp/src/io/avro/reader_impl.cu index 08ea96139a1..b266c12bdb7 100644 --- a/cpp/src/io/avro/reader_impl.cu +++ b/cpp/src/io/avro/reader_impl.cu @@ -138,8 +138,8 @@ class metadata : public file_metadata { datasource* const source; }; -rmm::device_buffer reader::impl::decompress_data(const rmm::device_buffer& comp_block_data, - rmm::cuda_stream_view stream) +rmm::device_buffer reader_impl::decompress_data(const rmm::device_buffer& comp_block_data, + rmm::cuda_stream_view stream) { size_t uncompressed_data_size = 0; hostdevice_vector inflate_in(_metadata->block_list.size()); @@ -235,13 +235,13 @@ rmm::device_buffer reader::impl::decompress_data(const rmm::device_buffer& comp_ return decomp_block_data; } -void reader::impl::decode_data(const rmm::device_buffer& block_data, - const std::vector>& dict, - device_span global_dictionary, - size_t num_rows, - std::vector> selection, - std::vector& out_buffers, - rmm::cuda_stream_view stream) +void reader_impl::decode_data(const rmm::device_buffer& block_data, + const std::vector>& dict, + device_span global_dictionary, + size_t num_rows, + std::vector> selection, + std::vector& out_buffers, + rmm::cuda_stream_view stream) { // Build gpu schema hostdevice_vector schema_desc(_metadata->schema.size()); @@ -334,17 +334,17 @@ void reader::impl::decode_data(const rmm::device_buffer& block_data, } } -reader::impl::impl(std::unique_ptr source, - avro_reader_options const& options, - rmm::mr::device_memory_resource* mr) +reader_impl::reader_impl(std::unique_ptr source, + avro_reader_options const& options, + rmm::mr::device_memory_resource* mr) : _mr(mr), _source(std::move(source)), _columns(options.get_columns()) { // Open the source Avro dataset metadata _metadata = std::make_unique(_source.get()); } -table_with_metadata 
reader::impl::read(avro_reader_options const& options, - rmm::cuda_stream_view stream) +table_with_metadata reader_impl::read(avro_reader_options const& options, + rmm::cuda_stream_view stream) { auto skip_rows = options.get_skip_rows(); auto num_rows = options.get_num_rows(); @@ -475,23 +475,14 @@ table_with_metadata reader::impl::read(avro_reader_options const& options, } // Forward to implementation -reader::reader(std::vector>&& sources, - avro_reader_options const& options, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr) +table_with_metadata read_avro(std::unique_ptr&& source, + avro_reader_options const& options, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) { - CUDF_EXPECTS(sources.size() == 1, "Only a single source is currently supported."); - _impl = std::make_unique(std::move(sources[0]), options, mr); + return reader_impl(std::move(source), options, mr).read(options, stream); } -// Destructor within this translation unit -reader::~reader() = default; - -// Forward to implementation -table_with_metadata reader::read(avro_reader_options const& options, rmm::cuda_stream_view stream) -{ - return _impl->read(options, stream); -} } // namespace avro } // namespace detail } // namespace io diff --git a/cpp/src/io/avro/reader_impl.hpp b/cpp/src/io/avro/reader_impl.hpp index 9af32ed88a0..0bd9c446a96 100644 --- a/cpp/src/io/avro/reader_impl.hpp +++ b/cpp/src/io/avro/reader_impl.hpp @@ -51,7 +51,7 @@ class metadata; /** * @brief Implementation for Avro reader */ -class reader::impl { +class reader_impl { public: /** * @brief Constructor from a dataset source with reader options. @@ -60,9 +60,9 @@ class reader::impl { * @param options Settings for controlling reading behavior * @param mr Device memory resource to use for device memory allocation */ - explicit impl(std::unique_ptr source, - avro_reader_options const& options, - rmm::mr::device_memory_resource* mr); + explicit reader_impl(std::unique_ptr source, + avro_reader_options const& options, + rmm::mr::device_memory_resource* mr); /** * @brief Read an entire set or a subset of data and returns a set of columns diff --git a/cpp/src/io/functions.cpp b/cpp/src/io/functions.cpp index 438cb1762c6..511a1a22ee7 100644 --- a/cpp/src/io/functions.cpp +++ b/cpp/src/io/functions.cpp @@ -146,10 +146,10 @@ table_with_metadata read_avro(avro_reader_options const& options, CUDF_FUNC_RANGE(); auto datasources = make_datasources(options.get_source()); - auto reader = - std::make_unique(std::move(datasources), options, rmm::cuda_stream_default, mr); - return reader->read(options); + CUDF_EXPECTS(datasources.size() == 1, "Only a single source is currently supported."); + + return avro::read_avro(std::move(datasources[0]), options, rmm::cuda_stream_default, mr); } compression_type infer_compression_type(compression_type compression, source_info const& info) From f15622ebfe23c3b4ea316f0f9829e7231fa01077 Mon Sep 17 00:00:00 2001 From: Christopher Harris Date: Sat, 21 Aug 2021 13:44:29 -0500 Subject: [PATCH 07/21] replace avro::reader_impl mr member with local variable --- cpp/src/io/avro/reader_impl.cu | 15 +++++++-------- cpp/src/io/avro/reader_impl.hpp | 11 +++++------ 2 files changed, 12 insertions(+), 14 deletions(-) diff --git a/cpp/src/io/avro/reader_impl.cu b/cpp/src/io/avro/reader_impl.cu index b266c12bdb7..11c24fc54b5 100644 --- a/cpp/src/io/avro/reader_impl.cu +++ b/cpp/src/io/avro/reader_impl.cu @@ -334,17 +334,16 @@ void reader_impl::decode_data(const rmm::device_buffer& block_data, } } 
-reader_impl::reader_impl(std::unique_ptr source, - avro_reader_options const& options, - rmm::mr::device_memory_resource* mr) - : _mr(mr), _source(std::move(source)), _columns(options.get_columns()) +reader_impl::reader_impl(std::unique_ptr source, avro_reader_options const& options) + : _source(std::move(source)), _columns(options.get_columns()) { // Open the source Avro dataset metadata _metadata = std::make_unique(_source.get()); } table_with_metadata reader_impl::read(avro_reader_options const& options, - rmm::cuda_stream_view stream) + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) { auto skip_rows = options.get_skip_rows(); auto num_rows = options.get_num_rows(); @@ -447,13 +446,13 @@ table_with_metadata reader_impl::read(avro_reader_options const& options, for (size_t i = 0; i < column_types.size(); ++i) { auto col_idx = selected_columns[i].first; bool is_nullable = (_metadata->columns[col_idx].schema_null_idx >= 0); - out_buffers.emplace_back(column_types[i], num_rows, is_nullable, stream, _mr); + out_buffers.emplace_back(column_types[i], num_rows, is_nullable, stream, mr); } decode_data(block_data, dict, d_global_dict, num_rows, selected_columns, out_buffers, stream); for (size_t i = 0; i < column_types.size(); ++i) { - out_columns.emplace_back(make_column(out_buffers[i], nullptr, stream, _mr)); + out_columns.emplace_back(make_column(out_buffers[i], nullptr, stream, mr)); } } else { // Create empty columns @@ -480,7 +479,7 @@ table_with_metadata read_avro(std::unique_ptr&& source, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr) { - return reader_impl(std::move(source), options, mr).read(options, stream); + return reader_impl(std::move(source), options).read(options, stream, mr); } } // namespace avro diff --git a/cpp/src/io/avro/reader_impl.hpp b/cpp/src/io/avro/reader_impl.hpp index 0bd9c446a96..1aa07e3cf9c 100644 --- a/cpp/src/io/avro/reader_impl.hpp +++ b/cpp/src/io/avro/reader_impl.hpp @@ -58,21 +58,21 @@ class reader_impl { * * @param source Dataset source * @param options Settings for controlling reading behavior - * @param mr Device memory resource to use for device memory allocation */ - explicit reader_impl(std::unique_ptr source, - avro_reader_options const& options, - rmm::mr::device_memory_resource* mr); + explicit reader_impl(std::unique_ptr source, avro_reader_options const& options); /** * @brief Read an entire set or a subset of data and returns a set of columns * * @param options Settings for controlling reading behavior * @param stream CUDA stream used for device memory operations and kernel launches. 
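// Hedged usage sketch (not part of the patch): calling the free read_avro
// function that patch 06 of this series introduces in place of the
// detail::avro::reader class. The builder call and the source_info constructor
// are assumptions based on the public cuDF API; read_avro's signature and its
// defaulted stream/mr arguments come from the declaration added in patch 06.
#include <cudf/io/avro.hpp>
#include <cudf/io/datasource.hpp>
#include <cudf/io/detail/avro.hpp>
#include <string>
#include <utility>

cudf::io::table_with_metadata read_avro_file_sketch(std::string const& path)
{
  auto source  = cudf::io::datasource::create(path);
  auto options = cudf::io::avro_reader_options::builder(cudf::io::source_info{path}).build();
  return cudf::io::detail::avro::read_avro(std::move(source), options);
}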
+ * @param mr Device memory resource to use for device memory allocation * * @return The set of columns along with metadata */ - table_with_metadata read(avro_reader_options const& options, rmm::cuda_stream_view stream); + table_with_metadata read(avro_reader_options const& options, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr); private: /** @@ -104,7 +104,6 @@ class reader_impl { rmm::cuda_stream_view stream); private: - rmm::mr::device_memory_resource* _mr = nullptr; std::unique_ptr _source; std::unique_ptr _metadata; From 4cf9f32fb5e4cc94f396b215a945bc03e399aa60 Mon Sep 17 00:00:00 2001 From: Christopher Harris Date: Sat, 21 Aug 2021 13:55:17 -0500 Subject: [PATCH 08/21] replace avro::reader_impl source member with local variable --- cpp/src/io/avro/reader_impl.cu | 34 ++++++++++++++++----------------- cpp/src/io/avro/reader_impl.hpp | 12 +++++++----- 2 files changed, 24 insertions(+), 22 deletions(-) diff --git a/cpp/src/io/avro/reader_impl.cu b/cpp/src/io/avro/reader_impl.cu index 11c24fc54b5..b75b99ca46a 100644 --- a/cpp/src/io/avro/reader_impl.cu +++ b/cpp/src/io/avro/reader_impl.cu @@ -138,7 +138,8 @@ class metadata : public file_metadata { datasource* const source; }; -rmm::device_buffer reader_impl::decompress_data(const rmm::device_buffer& comp_block_data, +rmm::device_buffer reader_impl::decompress_data(datasource* source, + rmm::device_buffer const& comp_block_data, rmm::cuda_stream_view stream) { size_t uncompressed_data_size = 0; @@ -155,7 +156,7 @@ rmm::device_buffer reader_impl::decompress_data(const rmm::device_buffer& comp_b } else if (_metadata->codec == "snappy") { // Extract the uncompressed length from the snappy stream for (size_t i = 0; i < _metadata->block_list.size(); i++) { - const auto buffer = _source->host_read(_metadata->block_list[i].offset, 4); + const auto buffer = source->host_read(_metadata->block_list[i].offset, 4); const uint8_t* blk = buffer->data(); uint32_t blk_len = blk[0]; if (blk_len > 0x7f) { @@ -334,14 +335,10 @@ void reader_impl::decode_data(const rmm::device_buffer& block_data, } } -reader_impl::reader_impl(std::unique_ptr source, avro_reader_options const& options) - : _source(std::move(source)), _columns(options.get_columns()) -{ - // Open the source Avro dataset metadata - _metadata = std::make_unique(_source.get()); -} +reader_impl::reader_impl(avro_reader_options const& options) : _columns(options.get_columns()) {} -table_with_metadata reader_impl::read(avro_reader_options const& options, +table_with_metadata reader_impl::read(datasource* source, + avro_reader_options const& options, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr) { @@ -351,6 +348,9 @@ table_with_metadata reader_impl::read(avro_reader_options const& options, std::vector> out_columns; table_metadata metadata_out; + // Open the source Avro dataset metadata + _metadata = std::make_unique(source); + // Select and read partial metadata / schema within the subset of rows _metadata->init_and_select_rows(skip_rows, num_rows); @@ -369,21 +369,21 @@ table_with_metadata reader_impl::read(avro_reader_options const& options, if (_metadata->total_data_size > 0) { rmm::device_buffer block_data; - if (_source->is_device_read_preferred(_metadata->total_data_size)) { + if (source->is_device_read_preferred(_metadata->total_data_size)) { block_data = rmm::device_buffer{_metadata->total_data_size, stream}; - auto read_bytes = _source->device_read(_metadata->block_list[0].offset, - _metadata->total_data_size, - static_cast(block_data.data()), - 
stream); + auto read_bytes = source->device_read(_metadata->block_list[0].offset, + _metadata->total_data_size, + static_cast(block_data.data()), + stream); block_data.resize(read_bytes, stream); } else { const auto buffer = - _source->host_read(_metadata->block_list[0].offset, _metadata->total_data_size); + source->host_read(_metadata->block_list[0].offset, _metadata->total_data_size); block_data = rmm::device_buffer{buffer->data(), buffer->size(), stream}; } if (_metadata->codec != "" && _metadata->codec != "null") { - auto decomp_block_data = decompress_data(block_data, stream); + auto decomp_block_data = decompress_data(source, block_data, stream); block_data = std::move(decomp_block_data); } else { auto dst_ofs = _metadata->block_list[0].offset; @@ -479,7 +479,7 @@ table_with_metadata read_avro(std::unique_ptr&& source, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr) { - return reader_impl(std::move(source), options).read(options, stream, mr); + return reader_impl(options).read(source.get(), options, stream, mr); } } // namespace avro diff --git a/cpp/src/io/avro/reader_impl.hpp b/cpp/src/io/avro/reader_impl.hpp index 1aa07e3cf9c..b558ed5b18d 100644 --- a/cpp/src/io/avro/reader_impl.hpp +++ b/cpp/src/io/avro/reader_impl.hpp @@ -56,21 +56,22 @@ class reader_impl { /** * @brief Constructor from a dataset source with reader options. * - * @param source Dataset source * @param options Settings for controlling reading behavior */ - explicit reader_impl(std::unique_ptr source, avro_reader_options const& options); + explicit reader_impl(avro_reader_options const& options); /** * @brief Read an entire set or a subset of data and returns a set of columns * + * @param source Dataset source * @param options Settings for controlling reading behavior * @param stream CUDA stream used for device memory operations and kernel launches. * @param mr Device memory resource to use for device memory allocation * * @return The set of columns along with metadata */ - table_with_metadata read(avro_reader_options const& options, + table_with_metadata read(datasource* source, + avro_reader_options const& options, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr); @@ -78,12 +79,14 @@ class reader_impl { /** * @brief Decompresses the block data. * + * @param source Dataset source * @param comp_block_data Compressed block data * @param stream CUDA stream used for device memory operations and kernel launches. 
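// [Editor's aside: minimal sketch, not part of the patch.] decompress_data() now takes the
// datasource because, for snappy, it re-reads the first few bytes of each block to recover the
// uncompressed length. Throughout this series the reader only relies on host_read(),
// device_read() and is_device_read_preferred(). The host path returns an owning buffer; the
// helper below (names are illustrative) shows the ownership contract:

#include <cudf/io/datasource.hpp>
#include <cstdint>
#include <vector>

std::vector<uint8_t> read_host_bytes(cudf::io::datasource* source, size_t offset, size_t len)
{
  auto buffer = source->host_read(offset, len);               // std::unique_ptr<datasource::buffer>
  return {buffer->data(), buffer->data() + buffer->size()};   // copy out before the buffer is freed
}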
* * @return Device buffer to decompressed block data */ - rmm::device_buffer decompress_data(const rmm::device_buffer& comp_block_data, + rmm::device_buffer decompress_data(datasource* source, + rmm::device_buffer const& comp_block_data, rmm::cuda_stream_view stream); /** @@ -104,7 +107,6 @@ class reader_impl { rmm::cuda_stream_view stream); private: - std::unique_ptr _source; std::unique_ptr _metadata; std::vector _columns; From ea2c219a20e0717e6eeec3b24c760a7e2eb58a2c Mon Sep 17 00:00:00 2001 From: Christopher Harris Date: Sat, 21 Aug 2021 13:58:41 -0500 Subject: [PATCH 09/21] remove unneccessary avro::reader_impl _columns member --- cpp/src/io/avro/reader_impl.cu | 6 ++---- cpp/src/io/avro/reader_impl.hpp | 9 --------- 2 files changed, 2 insertions(+), 13 deletions(-) diff --git a/cpp/src/io/avro/reader_impl.cu b/cpp/src/io/avro/reader_impl.cu index b75b99ca46a..27e3f347712 100644 --- a/cpp/src/io/avro/reader_impl.cu +++ b/cpp/src/io/avro/reader_impl.cu @@ -335,8 +335,6 @@ void reader_impl::decode_data(const rmm::device_buffer& block_data, } } -reader_impl::reader_impl(avro_reader_options const& options) : _columns(options.get_columns()) {} - table_with_metadata reader_impl::read(datasource* source, avro_reader_options const& options, rmm::cuda_stream_view stream, @@ -355,7 +353,7 @@ table_with_metadata reader_impl::read(datasource* source, _metadata->init_and_select_rows(skip_rows, num_rows); // Select only columns required by the options - auto selected_columns = _metadata->select_columns(_columns); + auto selected_columns = _metadata->select_columns(options.get_columns()); if (selected_columns.size() != 0) { // Get a list of column data types std::vector column_types; @@ -479,7 +477,7 @@ table_with_metadata read_avro(std::unique_ptr&& source, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr) { - return reader_impl(options).read(source.get(), options, stream, mr); + return reader_impl().read(source.get(), options, stream, mr); } } // namespace avro diff --git a/cpp/src/io/avro/reader_impl.hpp b/cpp/src/io/avro/reader_impl.hpp index b558ed5b18d..aac60c8a7e5 100644 --- a/cpp/src/io/avro/reader_impl.hpp +++ b/cpp/src/io/avro/reader_impl.hpp @@ -53,13 +53,6 @@ class metadata; */ class reader_impl { public: - /** - * @brief Constructor from a dataset source with reader options. 
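// [Editor's aside: sketch, not part of the patch.] Caching the column list as `_columns` was
// redundant: the selection already lives in avro_reader_options and is re-evaluated on every
// read via select_columns(options.get_columns()). On the user side the subset is chosen when
// the options are built; the column names here are hypothetical:

auto options = cudf::io::avro_reader_options::builder(cudf::io::source_info{"example.avro"})
                 .columns({"id", "name"})
                 .build();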
- * - * @param options Settings for controlling reading behavior - */ - explicit reader_impl(avro_reader_options const& options); - /** * @brief Read an entire set or a subset of data and returns a set of columns * @@ -108,8 +101,6 @@ class reader_impl { private: std::unique_ptr _metadata; - - std::vector _columns; }; } // namespace avro From 2c16c19e6f27bf05b9aa589e7363bffc45345072 Mon Sep 17 00:00:00 2001 From: Christopher Harris Date: Sat, 21 Aug 2021 14:08:50 -0500 Subject: [PATCH 10/21] replace avro::reader_impl metadata member with local variable --- cpp/src/io/avro/reader_impl.cu | 142 +++++++++++++++++--------------- cpp/src/io/avro/reader_impl.hpp | 7 +- 2 files changed, 78 insertions(+), 71 deletions(-) diff --git a/cpp/src/io/avro/reader_impl.cu b/cpp/src/io/avro/reader_impl.cu index 27e3f347712..22c290242e5 100644 --- a/cpp/src/io/avro/reader_impl.cu +++ b/cpp/src/io/avro/reader_impl.cu @@ -139,24 +139,25 @@ class metadata : public file_metadata { }; rmm::device_buffer reader_impl::decompress_data(datasource* source, + metadata* meta, rmm::device_buffer const& comp_block_data, rmm::cuda_stream_view stream) { size_t uncompressed_data_size = 0; - hostdevice_vector inflate_in(_metadata->block_list.size()); - hostdevice_vector inflate_out(_metadata->block_list.size()); + hostdevice_vector inflate_in(meta->block_list.size()); + hostdevice_vector inflate_out(meta->block_list.size()); - if (_metadata->codec == "deflate") { + if (meta->codec == "deflate") { // Guess an initial maximum uncompressed block size - uint32_t initial_blk_len = (_metadata->max_block_size * 2 + 0xfff) & ~0xfff; - uncompressed_data_size = initial_blk_len * _metadata->block_list.size(); + uint32_t initial_blk_len = (meta->max_block_size * 2 + 0xfff) & ~0xfff; + uncompressed_data_size = initial_blk_len * meta->block_list.size(); for (size_t i = 0; i < inflate_in.size(); ++i) { inflate_in[i].dstSize = initial_blk_len; } - } else if (_metadata->codec == "snappy") { + } else if (meta->codec == "snappy") { // Extract the uncompressed length from the snappy stream - for (size_t i = 0; i < _metadata->block_list.size(); i++) { - const auto buffer = source->host_read(_metadata->block_list[i].offset, 4); + for (size_t i = 0; i < meta->block_list.size(); i++) { + const auto buffer = source->host_read(meta->block_list[i].offset, 4); const uint8_t* blk = buffer->data(); uint32_t blk_len = blk[0]; if (blk_len > 0x7f) { @@ -175,28 +176,28 @@ rmm::device_buffer reader_impl::decompress_data(datasource* source, rmm::device_buffer decomp_block_data(uncompressed_data_size, stream); - const auto base_offset = _metadata->block_list[0].offset; - for (size_t i = 0, dst_pos = 0; i < _metadata->block_list.size(); i++) { - const auto src_pos = _metadata->block_list[i].offset - base_offset; + const auto base_offset = meta->block_list[0].offset; + for (size_t i = 0, dst_pos = 0; i < meta->block_list.size(); i++) { + const auto src_pos = meta->block_list[i].offset - base_offset; inflate_in[i].srcDevice = static_cast(comp_block_data.data()) + src_pos; - inflate_in[i].srcSize = _metadata->block_list[i].size; + inflate_in[i].srcSize = meta->block_list[i].size; inflate_in[i].dstDevice = static_cast(decomp_block_data.data()) + dst_pos; // Update blocks offsets & sizes to refer to uncompressed data - _metadata->block_list[i].offset = dst_pos; - _metadata->block_list[i].size = static_cast(inflate_in[i].dstSize); - dst_pos += _metadata->block_list[i].size; + meta->block_list[i].offset = dst_pos; + meta->block_list[i].size = 
static_cast(inflate_in[i].dstSize); + dst_pos += meta->block_list[i].size; } for (int loop_cnt = 0; loop_cnt < 2; loop_cnt++) { inflate_in.host_to_device(stream); CUDA_TRY( cudaMemsetAsync(inflate_out.device_ptr(), 0, inflate_out.memory_size(), stream.value())); - if (_metadata->codec == "deflate") { + if (meta->codec == "deflate") { CUDA_TRY(gpuinflate( inflate_in.device_ptr(), inflate_out.device_ptr(), inflate_in.size(), 0, stream)); - } else if (_metadata->codec == "snappy") { + } else if (meta->codec == "snappy") { CUDA_TRY( gpu_unsnap(inflate_in.device_ptr(), inflate_out.device_ptr(), inflate_in.size(), stream)); } else { @@ -205,9 +206,9 @@ rmm::device_buffer reader_impl::decompress_data(datasource* source, inflate_out.device_to_host(stream, true); // Check if larger output is required, as it's not known ahead of time - if (_metadata->codec == "deflate" && !loop_cnt) { + if (meta->codec == "deflate" && !loop_cnt) { size_t actual_uncompressed_size = 0; - for (size_t i = 0; i < _metadata->block_list.size(); i++) { + for (size_t i = 0; i < meta->block_list.size(); i++) { // If error status is 1 (buffer too small), the `bytes_written` field // is actually contains the uncompressed data size if (inflate_out[i].status == 1 && inflate_out[i].bytes_written > inflate_in[i].dstSize) { @@ -217,13 +218,13 @@ rmm::device_buffer reader_impl::decompress_data(datasource* source, } if (actual_uncompressed_size > uncompressed_data_size) { decomp_block_data.resize(actual_uncompressed_size, stream); - for (size_t i = 0, dst_pos = 0; i < _metadata->block_list.size(); i++) { + for (size_t i = 0, dst_pos = 0; i < meta->block_list.size(); i++) { auto dst_base = static_cast(decomp_block_data.data()); inflate_in[i].dstDevice = dst_base + dst_pos; - _metadata->block_list[i].offset = dst_pos; - _metadata->block_list[i].size = static_cast(inflate_in[i].dstSize); - dst_pos += _metadata->block_list[i].size; + meta->block_list[i].offset = dst_pos; + meta->block_list[i].size = static_cast(inflate_in[i].dstSize); + dst_pos += meta->block_list[i].size; } } else { break; @@ -236,7 +237,9 @@ rmm::device_buffer reader_impl::decompress_data(datasource* source, return decomp_block_data; } -void reader_impl::decode_data(const rmm::device_buffer& block_data, +void reader_impl::decode_data(metadata* meta, + const rmm::device_buffer& block_data, + const std::vector>& dict, device_span global_dictionary, size_t num_rows, @@ -245,19 +248,19 @@ void reader_impl::decode_data(const rmm::device_buffer& block_data, rmm::cuda_stream_view stream) { // Build gpu schema - hostdevice_vector schema_desc(_metadata->schema.size()); + hostdevice_vector schema_desc(meta->schema.size()); uint32_t min_row_data_size = 0; int skip_field_cnt = 0; - for (size_t i = 0; i < _metadata->schema.size(); i++) { - type_kind_e kind = _metadata->schema[i].kind; + for (size_t i = 0; i < meta->schema.size(); i++) { + type_kind_e kind = meta->schema[i].kind; if (skip_field_cnt != 0) { // Exclude union and array members from min_row_data_size - skip_field_cnt += _metadata->schema[i].num_children - 1; + skip_field_cnt += meta->schema[i].num_children - 1; } else { switch (kind) { case type_union: case type_array: - skip_field_cnt = _metadata->schema[i].num_children; + skip_field_cnt = meta->schema[i].num_children; // fall through case type_boolean: case type_int: @@ -270,21 +273,20 @@ void reader_impl::decode_data(const rmm::device_buffer& block_data, default: break; } } - if (kind == type_enum && !_metadata->schema[i].symbols.size()) { kind = type_int; } + if 
(kind == type_enum && !meta->schema[i].symbols.size()) { kind = type_int; } schema_desc[i].kind = kind; - schema_desc[i].count = (kind == type_enum) ? 0 : (uint32_t)_metadata->schema[i].num_children; + schema_desc[i].count = (kind == type_enum) ? 0 : (uint32_t)meta->schema[i].num_children; schema_desc[i].dataptr = nullptr; - CUDF_EXPECTS( - kind != type_union || _metadata->schema[i].num_children < 2 || - (_metadata->schema[i].num_children == 2 && (_metadata->schema[i + 1].kind == type_null || - _metadata->schema[i + 2].kind == type_null)), - "Union with non-null type not currently supported"); + CUDF_EXPECTS(kind != type_union || meta->schema[i].num_children < 2 || + (meta->schema[i].num_children == 2 && (meta->schema[i + 1].kind == type_null || + meta->schema[i + 2].kind == type_null)), + "Union with non-null type not currently supported"); } std::vector valid_alias(out_buffers.size(), nullptr); for (size_t i = 0; i < out_buffers.size(); i++) { const auto col_idx = selection[i].first; - int schema_data_idx = _metadata->columns[col_idx].schema_data_idx; - int schema_null_idx = _metadata->columns[col_idx].schema_null_idx; + int schema_data_idx = meta->columns[col_idx].schema_data_idx; + int schema_null_idx = meta->columns[col_idx].schema_null_idx; schema_desc[schema_data_idx].dataptr = out_buffers[i].data(); if (schema_null_idx >= 0) { @@ -294,7 +296,7 @@ void reader_impl::decode_data(const rmm::device_buffer& block_data, valid_alias[i] = schema_desc[schema_null_idx].dataptr; } } - if (_metadata->schema[schema_data_idx].kind == type_enum) { + if (meta->schema[schema_data_idx].kind == type_enum) { schema_desc[schema_data_idx].count = dict[i].first; } if (out_buffers[i].null_mask_size()) { @@ -302,17 +304,17 @@ void reader_impl::decode_data(const rmm::device_buffer& block_data, } } rmm::device_buffer block_list( - _metadata->block_list.data(), _metadata->block_list.size() * sizeof(block_desc_s), stream); + meta->block_list.data(), meta->block_list.size() * sizeof(block_desc_s), stream); schema_desc.host_to_device(stream); gpu::DecodeAvroColumnData(static_cast(block_list.data()), schema_desc.device_ptr(), global_dictionary, static_cast(block_data.data()), - static_cast(_metadata->block_list.size()), + static_cast(meta->block_list.size()), static_cast(schema_desc.size()), - _metadata->num_rows, - _metadata->skip_rows, + meta->num_rows, + meta->skip_rows, min_row_data_size, stream); @@ -330,7 +332,7 @@ void reader_impl::decode_data(const rmm::device_buffer& block_data, for (size_t i = 0; i < out_buffers.size(); i++) { const auto col_idx = selection[i].first; - const auto schema_null_idx = _metadata->columns[col_idx].schema_null_idx; + const auto schema_null_idx = meta->columns[col_idx].schema_null_idx; out_buffers[i].null_count() = (schema_null_idx >= 0) ? 
schema_desc[schema_null_idx].count : 0; } } @@ -347,46 +349,45 @@ table_with_metadata reader_impl::read(datasource* source, table_metadata metadata_out; // Open the source Avro dataset metadata - _metadata = std::make_unique(source); + auto meta = std::make_unique(source); // Select and read partial metadata / schema within the subset of rows - _metadata->init_and_select_rows(skip_rows, num_rows); + meta->init_and_select_rows(skip_rows, num_rows); // Select only columns required by the options - auto selected_columns = _metadata->select_columns(options.get_columns()); + auto selected_columns = meta->select_columns(options.get_columns()); if (selected_columns.size() != 0) { // Get a list of column data types std::vector column_types; for (const auto& col : selected_columns) { - auto& col_schema = _metadata->schema[_metadata->columns[col.first].schema_data_idx]; + auto& col_schema = meta->schema[meta->columns[col.first].schema_data_idx]; auto col_type = to_type_id(&col_schema); CUDF_EXPECTS(col_type != type_id::EMPTY, "Unknown type"); column_types.emplace_back(col_type); } - if (_metadata->total_data_size > 0) { + if (meta->total_data_size > 0) { rmm::device_buffer block_data; - if (source->is_device_read_preferred(_metadata->total_data_size)) { - block_data = rmm::device_buffer{_metadata->total_data_size, stream}; - auto read_bytes = source->device_read(_metadata->block_list[0].offset, - _metadata->total_data_size, + if (source->is_device_read_preferred(meta->total_data_size)) { + block_data = rmm::device_buffer{meta->total_data_size, stream}; + auto read_bytes = source->device_read(meta->block_list[0].offset, + meta->total_data_size, static_cast(block_data.data()), stream); block_data.resize(read_bytes, stream); } else { - const auto buffer = - source->host_read(_metadata->block_list[0].offset, _metadata->total_data_size); - block_data = rmm::device_buffer{buffer->data(), buffer->size(), stream}; + const auto buffer = source->host_read(meta->block_list[0].offset, meta->total_data_size); + block_data = rmm::device_buffer{buffer->data(), buffer->size(), stream}; } - if (_metadata->codec != "" && _metadata->codec != "null") { - auto decomp_block_data = decompress_data(source, block_data, stream); + if (meta->codec != "" && meta->codec != "null") { + auto decomp_block_data = decompress_data(source, meta.get(), block_data, stream); block_data = std::move(decomp_block_data); } else { - auto dst_ofs = _metadata->block_list[0].offset; - for (size_t i = 0; i < _metadata->block_list.size(); i++) { - _metadata->block_list[i].offset -= dst_ofs; + auto dst_ofs = meta->block_list[0].offset; + for (size_t i = 0; i < meta->block_list.size(); i++) { + meta->block_list[i].offset -= dst_ofs; } } @@ -395,7 +396,7 @@ table_with_metadata reader_impl::read(datasource* source, std::vector> dict(column_types.size()); for (size_t i = 0; i < column_types.size(); ++i) { auto col_idx = selected_columns[i].first; - auto& col_schema = _metadata->schema[_metadata->columns[col_idx].schema_data_idx]; + auto& col_schema = meta->schema[meta->columns[col_idx].schema_data_idx]; dict[i].first = static_cast(total_dictionary_entries); dict[i].second = static_cast(col_schema.symbols.size()); total_dictionary_entries += dict[i].second; @@ -411,8 +412,8 @@ table_with_metadata reader_impl::read(datasource* source, std::vector h_global_dict_data(dictionary_data_size); size_t dict_pos = 0; for (size_t i = 0; i < column_types.size(); ++i) { - auto const col_idx = selected_columns[i].first; - auto const& col_schema = 
_metadata->schema[_metadata->columns[col_idx].schema_data_idx]; + auto const col_idx = selected_columns[i].first; + auto const& col_schema = meta->schema[meta->columns[col_idx].schema_data_idx]; auto const col_dict_entries = &(h_global_dict[dict[i].first]); for (size_t j = 0; j < dict[i].second; j++) { auto const& symbols = col_schema.symbols[j]; @@ -443,11 +444,18 @@ table_with_metadata reader_impl::read(datasource* source, std::vector out_buffers; for (size_t i = 0; i < column_types.size(); ++i) { auto col_idx = selected_columns[i].first; - bool is_nullable = (_metadata->columns[col_idx].schema_null_idx >= 0); + bool is_nullable = (meta->columns[col_idx].schema_null_idx >= 0); out_buffers.emplace_back(column_types[i], num_rows, is_nullable, stream, mr); } - decode_data(block_data, dict, d_global_dict, num_rows, selected_columns, out_buffers, stream); + decode_data(meta.get(), + block_data, + dict, + d_global_dict, + num_rows, + selected_columns, + out_buffers, + stream); for (size_t i = 0; i < column_types.size(); ++i) { out_columns.emplace_back(make_column(out_buffers[i], nullptr, stream, mr)); @@ -466,7 +474,7 @@ table_with_metadata reader_impl::read(datasource* source, metadata_out.column_names[i] = selected_columns[i].second; } // Return user metadata - metadata_out.user_data = _metadata->user_data; + metadata_out.user_data = meta->user_data; return {std::make_unique
(std::move(out_columns)), std::move(metadata_out)}; } diff --git a/cpp/src/io/avro/reader_impl.hpp b/cpp/src/io/avro/reader_impl.hpp index aac60c8a7e5..bad3095f28f 100644 --- a/cpp/src/io/avro/reader_impl.hpp +++ b/cpp/src/io/avro/reader_impl.hpp @@ -79,6 +79,7 @@ class reader_impl { * @return Device buffer to decompressed block data */ rmm::device_buffer decompress_data(datasource* source, + metadata* metadata, rmm::device_buffer const& comp_block_data, rmm::cuda_stream_view stream); @@ -91,16 +92,14 @@ class reader_impl { * @param out_buffers Output columns' device buffers * @param stream CUDA stream used for device memory operations and kernel launches. */ - void decode_data(const rmm::device_buffer& block_data, + void decode_data(metadata* metadata, + const rmm::device_buffer& block_data, const std::vector>& dict, cudf::device_span global_dictionary, size_t num_rows, std::vector> columns, std::vector& out_buffers, rmm::cuda_stream_view stream); - - private: - std::unique_ptr _metadata; }; } // namespace avro From 4188c2b3468b2dee6650a49060a73aa09e7fe66d Mon Sep 17 00:00:00 2001 From: Christopher Harris Date: Sat, 21 Aug 2021 14:15:13 -0500 Subject: [PATCH 11/21] delete unncessarry avro::reader_impl class --- cpp/src/io/avro/reader_impl.cu | 65 +++++++++---------- cpp/src/io/avro/reader_impl.hpp | 108 -------------------------------- 2 files changed, 31 insertions(+), 142 deletions(-) delete mode 100644 cpp/src/io/avro/reader_impl.hpp diff --git a/cpp/src/io/avro/reader_impl.cu b/cpp/src/io/avro/reader_impl.cu index 22c290242e5..4cee6c8751a 100644 --- a/cpp/src/io/avro/reader_impl.cu +++ b/cpp/src/io/avro/reader_impl.cu @@ -14,30 +14,37 @@ * limitations under the License. */ -/** - * @file reader_impl.cu - * @brief cuDF-IO Avro reader class implementation - */ - -#include "reader_impl.hpp" +#include "avro.h" +#include "avro_gpu.h" #include +#include +#include #include +#include +#include #include #include +#include #include #include #include #include +#include +#include +#include +#include + using cudf::device_span; namespace cudf { namespace io { namespace detail { namespace avro { + // Import functionality that's independent of legacy code using namespace cudf::io::avro; using namespace cudf::io; @@ -138,10 +145,10 @@ class metadata : public file_metadata { datasource* const source; }; -rmm::device_buffer reader_impl::decompress_data(datasource* source, - metadata* meta, - rmm::device_buffer const& comp_block_data, - rmm::cuda_stream_view stream) +rmm::device_buffer decompress_data(datasource* source, + metadata* meta, + rmm::device_buffer const& comp_block_data, + rmm::cuda_stream_view stream) { size_t uncompressed_data_size = 0; hostdevice_vector inflate_in(meta->block_list.size()); @@ -237,15 +244,14 @@ rmm::device_buffer reader_impl::decompress_data(datasource* source, return decomp_block_data; } -void reader_impl::decode_data(metadata* meta, - const rmm::device_buffer& block_data, - - const std::vector>& dict, - device_span global_dictionary, - size_t num_rows, - std::vector> selection, - std::vector& out_buffers, - rmm::cuda_stream_view stream) +void decode_data(metadata* meta, + const rmm::device_buffer& block_data, + const std::vector>& dict, + device_span global_dictionary, + size_t num_rows, + std::vector> selection, + std::vector& out_buffers, + rmm::cuda_stream_view stream) { // Build gpu schema hostdevice_vector schema_desc(meta->schema.size()); @@ -337,10 +343,10 @@ void reader_impl::decode_data(metadata* meta, } } -table_with_metadata reader_impl::read(datasource* 
source, - avro_reader_options const& options, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr) +table_with_metadata read_avro(std::unique_ptr&& source, + avro_reader_options const& options, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) { auto skip_rows = options.get_skip_rows(); auto num_rows = options.get_num_rows(); @@ -349,7 +355,7 @@ table_with_metadata reader_impl::read(datasource* source, table_metadata metadata_out; // Open the source Avro dataset metadata - auto meta = std::make_unique(source); + auto meta = std::make_unique(source.get()); // Select and read partial metadata / schema within the subset of rows meta->init_and_select_rows(skip_rows, num_rows); @@ -382,7 +388,7 @@ table_with_metadata reader_impl::read(datasource* source, } if (meta->codec != "" && meta->codec != "null") { - auto decomp_block_data = decompress_data(source, meta.get(), block_data, stream); + auto decomp_block_data = decompress_data(source.get(), meta.get(), block_data, stream); block_data = std::move(decomp_block_data); } else { auto dst_ofs = meta->block_list[0].offset; @@ -479,15 +485,6 @@ table_with_metadata reader_impl::read(datasource* source, return {std::make_unique
(std::move(out_columns)), std::move(metadata_out)}; } -// Forward to implementation -table_with_metadata read_avro(std::unique_ptr&& source, - avro_reader_options const& options, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr) -{ - return reader_impl().read(source.get(), options, stream, mr); -} - } // namespace avro } // namespace detail } // namespace io diff --git a/cpp/src/io/avro/reader_impl.hpp b/cpp/src/io/avro/reader_impl.hpp deleted file mode 100644 index bad3095f28f..00000000000 --- a/cpp/src/io/avro/reader_impl.hpp +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Copyright (c) 2020-2021, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * @file reader_impl.hpp - * @brief cuDF-IO Avro reader class implementation header - */ - -#pragma once - -#include "avro.h" -#include "avro_gpu.h" - -#include -#include -#include - -#include -#include - -#include - -#include -#include -#include -#include - -namespace cudf { -namespace io { -namespace detail { -namespace avro { -using namespace cudf::io::avro; -using namespace cudf::io; - -// Forward declarations -class metadata; - -/** - * @brief Implementation for Avro reader - */ -class reader_impl { - public: - /** - * @brief Read an entire set or a subset of data and returns a set of columns - * - * @param source Dataset source - * @param options Settings for controlling reading behavior - * @param stream CUDA stream used for device memory operations and kernel launches. - * @param mr Device memory resource to use for device memory allocation - * - * @return The set of columns along with metadata - */ - table_with_metadata read(datasource* source, - avro_reader_options const& options, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr); - - private: - /** - * @brief Decompresses the block data. - * - * @param source Dataset source - * @param comp_block_data Compressed block data - * @param stream CUDA stream used for device memory operations and kernel launches. - * - * @return Device buffer to decompressed block data - */ - rmm::device_buffer decompress_data(datasource* source, - metadata* metadata, - rmm::device_buffer const& comp_block_data, - rmm::cuda_stream_view stream); - - /** - * @brief Convert the avro row-based block data and outputs to columns - * - * @param block_data Uncompressed block data - * @param dict Dictionary entries - * @param global_dictionary Dictionary allocation - * @param out_buffers Output columns' device buffers - * @param stream CUDA stream used for device memory operations and kernel launches. 
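// [Editor's aside: shape of the end state, not part of the patch.] With reader_impl.hpp gone,
// the Avro reader becomes a set of stateless free functions living entirely in reader_impl.cu:
//
//   read_avro(source, options, stream, mr)          // entry point used by io/functions.cpp
//     -> metadata(source), init_and_select_rows()   // parse the Avro header, pick the row range
//     -> decompress_data(source, meta, blocks, ...) // only for the deflate/snappy codecs
//     -> decode_data(meta, blocks, dicts, ...)      // GPU decode into column_buffers/columns
//
// Later commits keep this structure and only adjust the parameter spellings (references instead
// of raw pointers, decode_data returning its buffers), so treat the argument lists above as a
// rough outline rather than exact signatures.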
- */ - void decode_data(metadata* metadata, - const rmm::device_buffer& block_data, - const std::vector>& dict, - cudf::device_span global_dictionary, - size_t num_rows, - std::vector> columns, - std::vector& out_buffers, - rmm::cuda_stream_view stream); -}; - -} // namespace avro -} // namespace detail -} // namespace io -} // namespace cudf From cdbdb5ecdcc38edc52d61857f44f7ee54d6d6af8 Mon Sep 17 00:00:00 2001 From: Christopher Harris Date: Sat, 21 Aug 2021 15:25:14 -0500 Subject: [PATCH 12/21] read_avro: replace explicit cudamemcpyasync with uvector factor calls --- cpp/src/io/avro/reader_impl.cu | 30 ++++++++++++++---------------- 1 file changed, 14 insertions(+), 16 deletions(-) diff --git a/cpp/src/io/avro/reader_impl.cu b/cpp/src/io/avro/reader_impl.cu index 4cee6c8751a..a7579e16e35 100644 --- a/cpp/src/io/avro/reader_impl.cu +++ b/cpp/src/io/avro/reader_impl.cu @@ -22,6 +22,7 @@ #include #include +#include #include #include #include @@ -399,7 +400,9 @@ table_with_metadata read_avro(std::unique_ptr&& source, size_t total_dictionary_entries = 0; size_t dictionary_data_size = 0; - std::vector> dict(column_types.size()); + + auto dict = std::vector>(column_types.size()); + for (size_t i = 0; i < column_types.size(); ++i) { auto col_idx = selected_columns[i].first; auto& col_schema = meta->schema[meta->columns[col_idx].schema_data_idx]; @@ -411,12 +414,14 @@ table_with_metadata read_avro(std::unique_ptr&& source, } } - rmm::device_uvector d_global_dict(total_dictionary_entries, stream); - rmm::device_uvector d_global_dict_data(dictionary_data_size, stream); + auto d_global_dict = rmm::device_uvector(0, stream); + auto d_global_dict_data = rmm::device_uvector(0, stream); + if (total_dictionary_entries > 0) { - std::vector h_global_dict(total_dictionary_entries); - std::vector h_global_dict_data(dictionary_data_size); - size_t dict_pos = 0; + auto h_global_dict = std::vector(total_dictionary_entries); + auto h_global_dict_data = std::vector(dictionary_data_size); + size_t dict_pos = 0; + for (size_t i = 0; i < column_types.size(); ++i) { auto const col_idx = selected_columns[i].first; auto const& col_schema = meta->schema[meta->columns[col_idx].schema_data_idx]; @@ -434,16 +439,9 @@ table_with_metadata read_avro(std::unique_ptr&& source, } } - CUDA_TRY(cudaMemcpyAsync(d_global_dict.data(), - h_global_dict.data(), - h_global_dict.size() * sizeof(string_index_pair), - cudaMemcpyDefault, - stream.value())); - CUDA_TRY(cudaMemcpyAsync(d_global_dict_data.data(), - h_global_dict_data.data(), - h_global_dict_data.size() * sizeof(char), - cudaMemcpyDefault, - stream.value())); + d_global_dict = cudf::detail::make_device_uvector_async(h_global_dict, stream); + d_global_dict_data = cudf::detail::make_device_uvector_async(h_global_dict_data, stream); + stream.synchronize(); } From 85b84bddd8cb7d0874799f9512de8958fb742570 Mon Sep 17 00:00:00 2001 From: Christopher Harris Date: Wed, 25 Aug 2021 00:27:23 -0500 Subject: [PATCH 13/21] read_avro: treat source and metadata as refernces, not raw pointers --- cpp/src/io/avro/reader_impl.cu | 136 ++++++++++++++++----------------- 1 file changed, 65 insertions(+), 71 deletions(-) diff --git a/cpp/src/io/avro/reader_impl.cu b/cpp/src/io/avro/reader_impl.cu index a7579e16e35..728a938c1ef 100644 --- a/cpp/src/io/avro/reader_impl.cu +++ b/cpp/src/io/avro/reader_impl.cu @@ -146,26 +146,26 @@ class metadata : public file_metadata { datasource* const source; }; -rmm::device_buffer decompress_data(datasource* source, - metadata* meta, +rmm::device_buffer 
decompress_data(datasource& source, + metadata& meta, rmm::device_buffer const& comp_block_data, rmm::cuda_stream_view stream) { size_t uncompressed_data_size = 0; - hostdevice_vector inflate_in(meta->block_list.size()); - hostdevice_vector inflate_out(meta->block_list.size()); + hostdevice_vector inflate_in(meta.block_list.size()); + hostdevice_vector inflate_out(meta.block_list.size()); - if (meta->codec == "deflate") { + if (meta.codec == "deflate") { // Guess an initial maximum uncompressed block size - uint32_t initial_blk_len = (meta->max_block_size * 2 + 0xfff) & ~0xfff; - uncompressed_data_size = initial_blk_len * meta->block_list.size(); + uint32_t initial_blk_len = (meta.max_block_size * 2 + 0xfff) & ~0xfff; + uncompressed_data_size = initial_blk_len * meta.block_list.size(); for (size_t i = 0; i < inflate_in.size(); ++i) { inflate_in[i].dstSize = initial_blk_len; } - } else if (meta->codec == "snappy") { + } else if (meta.codec == "snappy") { // Extract the uncompressed length from the snappy stream - for (size_t i = 0; i < meta->block_list.size(); i++) { - const auto buffer = source->host_read(meta->block_list[i].offset, 4); + for (size_t i = 0; i < meta.block_list.size(); i++) { + const auto buffer = source.host_read(meta.block_list[i].offset, 4); const uint8_t* blk = buffer->data(); uint32_t blk_len = blk[0]; if (blk_len > 0x7f) { @@ -184,28 +184,28 @@ rmm::device_buffer decompress_data(datasource* source, rmm::device_buffer decomp_block_data(uncompressed_data_size, stream); - const auto base_offset = meta->block_list[0].offset; - for (size_t i = 0, dst_pos = 0; i < meta->block_list.size(); i++) { - const auto src_pos = meta->block_list[i].offset - base_offset; + const auto base_offset = meta.block_list[0].offset; + for (size_t i = 0, dst_pos = 0; i < meta.block_list.size(); i++) { + const auto src_pos = meta.block_list[i].offset - base_offset; inflate_in[i].srcDevice = static_cast(comp_block_data.data()) + src_pos; - inflate_in[i].srcSize = meta->block_list[i].size; + inflate_in[i].srcSize = meta.block_list[i].size; inflate_in[i].dstDevice = static_cast(decomp_block_data.data()) + dst_pos; // Update blocks offsets & sizes to refer to uncompressed data - meta->block_list[i].offset = dst_pos; - meta->block_list[i].size = static_cast(inflate_in[i].dstSize); - dst_pos += meta->block_list[i].size; + meta.block_list[i].offset = dst_pos; + meta.block_list[i].size = static_cast(inflate_in[i].dstSize); + dst_pos += meta.block_list[i].size; } for (int loop_cnt = 0; loop_cnt < 2; loop_cnt++) { inflate_in.host_to_device(stream); CUDA_TRY( cudaMemsetAsync(inflate_out.device_ptr(), 0, inflate_out.memory_size(), stream.value())); - if (meta->codec == "deflate") { + if (meta.codec == "deflate") { CUDA_TRY(gpuinflate( inflate_in.device_ptr(), inflate_out.device_ptr(), inflate_in.size(), 0, stream)); - } else if (meta->codec == "snappy") { + } else if (meta.codec == "snappy") { CUDA_TRY( gpu_unsnap(inflate_in.device_ptr(), inflate_out.device_ptr(), inflate_in.size(), stream)); } else { @@ -214,9 +214,9 @@ rmm::device_buffer decompress_data(datasource* source, inflate_out.device_to_host(stream, true); // Check if larger output is required, as it's not known ahead of time - if (meta->codec == "deflate" && !loop_cnt) { + if (meta.codec == "deflate" && !loop_cnt) { size_t actual_uncompressed_size = 0; - for (size_t i = 0; i < meta->block_list.size(); i++) { + for (size_t i = 0; i < meta.block_list.size(); i++) { // If error status is 1 (buffer too small), the `bytes_written` field // is actually 
contains the uncompressed data size if (inflate_out[i].status == 1 && inflate_out[i].bytes_written > inflate_in[i].dstSize) { @@ -226,13 +226,13 @@ rmm::device_buffer decompress_data(datasource* source, } if (actual_uncompressed_size > uncompressed_data_size) { decomp_block_data.resize(actual_uncompressed_size, stream); - for (size_t i = 0, dst_pos = 0; i < meta->block_list.size(); i++) { + for (size_t i = 0, dst_pos = 0; i < meta.block_list.size(); i++) { auto dst_base = static_cast(decomp_block_data.data()); inflate_in[i].dstDevice = dst_base + dst_pos; - meta->block_list[i].offset = dst_pos; - meta->block_list[i].size = static_cast(inflate_in[i].dstSize); - dst_pos += meta->block_list[i].size; + meta.block_list[i].offset = dst_pos; + meta.block_list[i].size = static_cast(inflate_in[i].dstSize); + dst_pos += meta.block_list[i].size; } } else { break; @@ -245,7 +245,7 @@ rmm::device_buffer decompress_data(datasource* source, return decomp_block_data; } -void decode_data(metadata* meta, +void decode_data(metadata& meta, const rmm::device_buffer& block_data, const std::vector>& dict, device_span global_dictionary, @@ -255,19 +255,19 @@ void decode_data(metadata* meta, rmm::cuda_stream_view stream) { // Build gpu schema - hostdevice_vector schema_desc(meta->schema.size()); + hostdevice_vector schema_desc(meta.schema.size()); uint32_t min_row_data_size = 0; int skip_field_cnt = 0; - for (size_t i = 0; i < meta->schema.size(); i++) { - type_kind_e kind = meta->schema[i].kind; + for (size_t i = 0; i < meta.schema.size(); i++) { + type_kind_e kind = meta.schema[i].kind; if (skip_field_cnt != 0) { // Exclude union and array members from min_row_data_size - skip_field_cnt += meta->schema[i].num_children - 1; + skip_field_cnt += meta.schema[i].num_children - 1; } else { switch (kind) { case type_union: case type_array: - skip_field_cnt = meta->schema[i].num_children; + skip_field_cnt = meta.schema[i].num_children; // fall through case type_boolean: case type_int: @@ -280,20 +280,20 @@ void decode_data(metadata* meta, default: break; } } - if (kind == type_enum && !meta->schema[i].symbols.size()) { kind = type_int; } + if (kind == type_enum && !meta.schema[i].symbols.size()) { kind = type_int; } schema_desc[i].kind = kind; - schema_desc[i].count = (kind == type_enum) ? 0 : (uint32_t)meta->schema[i].num_children; + schema_desc[i].count = (kind == type_enum) ? 
0 : (uint32_t)meta.schema[i].num_children; schema_desc[i].dataptr = nullptr; - CUDF_EXPECTS(kind != type_union || meta->schema[i].num_children < 2 || - (meta->schema[i].num_children == 2 && (meta->schema[i + 1].kind == type_null || - meta->schema[i + 2].kind == type_null)), + CUDF_EXPECTS(kind != type_union || meta.schema[i].num_children < 2 || + (meta.schema[i].num_children == 2 && + (meta.schema[i + 1].kind == type_null || meta.schema[i + 2].kind == type_null)), "Union with non-null type not currently supported"); } std::vector valid_alias(out_buffers.size(), nullptr); for (size_t i = 0; i < out_buffers.size(); i++) { const auto col_idx = selection[i].first; - int schema_data_idx = meta->columns[col_idx].schema_data_idx; - int schema_null_idx = meta->columns[col_idx].schema_null_idx; + int schema_data_idx = meta.columns[col_idx].schema_data_idx; + int schema_null_idx = meta.columns[col_idx].schema_null_idx; schema_desc[schema_data_idx].dataptr = out_buffers[i].data(); if (schema_null_idx >= 0) { @@ -303,7 +303,7 @@ void decode_data(metadata* meta, valid_alias[i] = schema_desc[schema_null_idx].dataptr; } } - if (meta->schema[schema_data_idx].kind == type_enum) { + if (meta.schema[schema_data_idx].kind == type_enum) { schema_desc[schema_data_idx].count = dict[i].first; } if (out_buffers[i].null_mask_size()) { @@ -311,17 +311,17 @@ void decode_data(metadata* meta, } } rmm::device_buffer block_list( - meta->block_list.data(), meta->block_list.size() * sizeof(block_desc_s), stream); + meta.block_list.data(), meta.block_list.size() * sizeof(block_desc_s), stream); schema_desc.host_to_device(stream); gpu::DecodeAvroColumnData(static_cast(block_list.data()), schema_desc.device_ptr(), global_dictionary, static_cast(block_data.data()), - static_cast(meta->block_list.size()), + static_cast(meta.block_list.size()), static_cast(schema_desc.size()), - meta->num_rows, - meta->skip_rows, + meta.num_rows, + meta.skip_rows, min_row_data_size, stream); @@ -339,7 +339,7 @@ void decode_data(metadata* meta, for (size_t i = 0; i < out_buffers.size(); i++) { const auto col_idx = selection[i].first; - const auto schema_null_idx = meta->columns[col_idx].schema_null_idx; + const auto schema_null_idx = meta.columns[col_idx].schema_null_idx; out_buffers[i].null_count() = (schema_null_idx >= 0) ? 
schema_desc[schema_null_idx].count : 0; } } @@ -356,45 +356,45 @@ table_with_metadata read_avro(std::unique_ptr&& source, table_metadata metadata_out; // Open the source Avro dataset metadata - auto meta = std::make_unique(source.get()); + auto meta = metadata(source.get()); // Select and read partial metadata / schema within the subset of rows - meta->init_and_select_rows(skip_rows, num_rows); + meta.init_and_select_rows(skip_rows, num_rows); // Select only columns required by the options - auto selected_columns = meta->select_columns(options.get_columns()); + auto selected_columns = meta.select_columns(options.get_columns()); if (selected_columns.size() != 0) { // Get a list of column data types std::vector column_types; for (const auto& col : selected_columns) { - auto& col_schema = meta->schema[meta->columns[col.first].schema_data_idx]; + auto& col_schema = meta.schema[meta.columns[col.first].schema_data_idx]; auto col_type = to_type_id(&col_schema); CUDF_EXPECTS(col_type != type_id::EMPTY, "Unknown type"); column_types.emplace_back(col_type); } - if (meta->total_data_size > 0) { + if (meta.total_data_size > 0) { rmm::device_buffer block_data; - if (source->is_device_read_preferred(meta->total_data_size)) { - block_data = rmm::device_buffer{meta->total_data_size, stream}; - auto read_bytes = source->device_read(meta->block_list[0].offset, - meta->total_data_size, + if (source->is_device_read_preferred(meta.total_data_size)) { + block_data = rmm::device_buffer{meta.total_data_size, stream}; + auto read_bytes = source->device_read(meta.block_list[0].offset, + meta.total_data_size, static_cast(block_data.data()), stream); block_data.resize(read_bytes, stream); } else { - const auto buffer = source->host_read(meta->block_list[0].offset, meta->total_data_size); + const auto buffer = source->host_read(meta.block_list[0].offset, meta.total_data_size); block_data = rmm::device_buffer{buffer->data(), buffer->size(), stream}; } - if (meta->codec != "" && meta->codec != "null") { - auto decomp_block_data = decompress_data(source.get(), meta.get(), block_data, stream); + if (meta.codec != "" && meta.codec != "null") { + auto decomp_block_data = decompress_data(*source, meta, block_data, stream); block_data = std::move(decomp_block_data); } else { - auto dst_ofs = meta->block_list[0].offset; - for (size_t i = 0; i < meta->block_list.size(); i++) { - meta->block_list[i].offset -= dst_ofs; + auto dst_ofs = meta.block_list[0].offset; + for (size_t i = 0; i < meta.block_list.size(); i++) { + meta.block_list[i].offset -= dst_ofs; } } @@ -405,7 +405,7 @@ table_with_metadata read_avro(std::unique_ptr&& source, for (size_t i = 0; i < column_types.size(); ++i) { auto col_idx = selected_columns[i].first; - auto& col_schema = meta->schema[meta->columns[col_idx].schema_data_idx]; + auto& col_schema = meta.schema[meta.columns[col_idx].schema_data_idx]; dict[i].first = static_cast(total_dictionary_entries); dict[i].second = static_cast(col_schema.symbols.size()); total_dictionary_entries += dict[i].second; @@ -424,7 +424,7 @@ table_with_metadata read_avro(std::unique_ptr&& source, for (size_t i = 0; i < column_types.size(); ++i) { auto const col_idx = selected_columns[i].first; - auto const& col_schema = meta->schema[meta->columns[col_idx].schema_data_idx]; + auto const& col_schema = meta.schema[meta.columns[col_idx].schema_data_idx]; auto const col_dict_entries = &(h_global_dict[dict[i].first]); for (size_t j = 0; j < dict[i].second; j++) { auto const& symbols = col_schema.symbols[j]; @@ -448,18 +448,12 @@ 
table_with_metadata read_avro(std::unique_ptr&& source, std::vector out_buffers; for (size_t i = 0; i < column_types.size(); ++i) { auto col_idx = selected_columns[i].first; - bool is_nullable = (meta->columns[col_idx].schema_null_idx >= 0); + bool is_nullable = (meta.columns[col_idx].schema_null_idx >= 0); out_buffers.emplace_back(column_types[i], num_rows, is_nullable, stream, mr); } - decode_data(meta.get(), - block_data, - dict, - d_global_dict, - num_rows, - selected_columns, - out_buffers, - stream); + decode_data( + meta, block_data, dict, d_global_dict, num_rows, selected_columns, out_buffers, stream); for (size_t i = 0; i < column_types.size(); ++i) { out_columns.emplace_back(make_column(out_buffers[i], nullptr, stream, mr)); @@ -478,7 +472,7 @@ table_with_metadata read_avro(std::unique_ptr&& source, metadata_out.column_names[i] = selected_columns[i].second; } // Return user metadata - metadata_out.user_data = meta->user_data; + metadata_out.user_data = meta.user_data; return {std::make_unique
(std::move(out_columns)), std::move(metadata_out)}; } From 1d0b01d99c49c5745a5fba0d48450231ca69e40f Mon Sep 17 00:00:00 2001 From: Christopher Harris Date: Wed, 25 Aug 2021 00:37:21 -0500 Subject: [PATCH 14/21] enforce constness on avro_decode_row dictionary argument --- cpp/src/io/avro/avro_gpu.cu | 21 +++++++++++---------- cpp/src/io/avro/avro_gpu.h | 2 +- cpp/src/io/avro/reader_impl.cu | 8 ++++---- 3 files changed, 16 insertions(+), 15 deletions(-) diff --git a/cpp/src/io/avro/avro_gpu.cu b/cpp/src/io/avro/avro_gpu.cu index 6fabcf00b8f..07cc202c73d 100644 --- a/cpp/src/io/avro/avro_gpu.cu +++ b/cpp/src/io/avro/avro_gpu.cu @@ -65,14 +65,15 @@ static inline int64_t __device__ avro_decode_zigzag_varint(const uint8_t*& cur, * * @return data pointer at the end of the row (start of next row) */ -static const uint8_t* __device__ avro_decode_row(const schemadesc_s* schema, - schemadesc_s* schema_g, - uint32_t schema_len, - size_t row, - size_t max_rows, - const uint8_t* cur, - const uint8_t* end, - device_span global_dictionary) +static const uint8_t* __device__ +avro_decode_row(const schemadesc_s* schema, + schemadesc_s* schema_g, + uint32_t schema_len, + size_t row, + size_t max_rows, + const uint8_t* cur, + const uint8_t* end, + device_span global_dictionary) { uint32_t array_start = 0, array_repeat_count = 0; int array_children = 0; @@ -230,7 +231,7 @@ static const uint8_t* __device__ avro_decode_row(const schemadesc_s* schema, extern "C" __global__ void __launch_bounds__(num_warps * 32, 2) gpuDecodeAvroColumnData(block_desc_s* blocks, schemadesc_s* schema_g, - device_span global_dictionary, + device_span global_dictionary, const uint8_t* avro_data, uint32_t num_blocks, uint32_t schema_len, @@ -313,7 +314,7 @@ extern "C" __global__ void __launch_bounds__(num_warps * 32, 2) */ void DecodeAvroColumnData(block_desc_s* blocks, schemadesc_s* schema, - device_span global_dictionary, + device_span global_dictionary, const uint8_t* avro_data, uint32_t num_blocks, uint32_t schema_len, diff --git a/cpp/src/io/avro/avro_gpu.h b/cpp/src/io/avro/avro_gpu.h index a895d1bea02..85f446e228a 100644 --- a/cpp/src/io/avro/avro_gpu.h +++ b/cpp/src/io/avro/avro_gpu.h @@ -52,7 +52,7 @@ struct schemadesc_s { */ void DecodeAvroColumnData(block_desc_s* blocks, schemadesc_s* schema, - cudf::device_span global_dictionary, + cudf::device_span global_dictionary, const uint8_t* avro_data, uint32_t num_blocks, uint32_t schema_len, diff --git a/cpp/src/io/avro/reader_impl.cu b/cpp/src/io/avro/reader_impl.cu index 728a938c1ef..c29ddd7086e 100644 --- a/cpp/src/io/avro/reader_impl.cu +++ b/cpp/src/io/avro/reader_impl.cu @@ -246,11 +246,11 @@ rmm::device_buffer decompress_data(datasource& source, } void decode_data(metadata& meta, - const rmm::device_buffer& block_data, - const std::vector>& dict, - device_span global_dictionary, + rmm::device_buffer const& block_data, + std::vector> const& dict, + device_span global_dictionary, size_t num_rows, - std::vector> selection, + std::vector> const& selection, std::vector& out_buffers, rmm::cuda_stream_view stream) { From cf9eecf02d336dff20db006666fdae238e98eee4 Mon Sep 17 00:00:00 2001 From: Christopher Harris Date: Wed, 25 Aug 2021 00:38:24 -0500 Subject: [PATCH 15/21] adjust copyright in avro.hpp --- cpp/include/cudf/io/detail/avro.hpp | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/cpp/include/cudf/io/detail/avro.hpp b/cpp/include/cudf/io/detail/avro.hpp index 2d30b3df8e0..62d97081b75 100644 --- a/cpp/include/cudf/io/detail/avro.hpp +++ 
b/cpp/include/cudf/io/detail/avro.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020, NVIDIA CORPORATION. + * Copyright (c) 2020-2021, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -14,11 +14,6 @@ * limitations under the License. */ -/** - * @file avro.hpp - * @brief cuDF-IO reader classes API - */ - #pragma once #include From 5ceee8ebf44ae2469f59d3d5304e94123226c43e Mon Sep 17 00:00:00 2001 From: Christopher Harris Date: Wed, 25 Aug 2021 00:42:30 -0500 Subject: [PATCH 16/21] read_avro: return out_buffers rather than mutating in function --- cpp/src/io/avro/reader_impl.cu | 45 +++++++++++++++++++++------------- 1 file changed, 28 insertions(+), 17 deletions(-) diff --git a/cpp/src/io/avro/reader_impl.cu b/cpp/src/io/avro/reader_impl.cu index c29ddd7086e..f75ad49d4b4 100644 --- a/cpp/src/io/avro/reader_impl.cu +++ b/cpp/src/io/avro/reader_impl.cu @@ -245,15 +245,24 @@ rmm::device_buffer decompress_data(datasource& source, return decomp_block_data; } -void decode_data(metadata& meta, - rmm::device_buffer const& block_data, - std::vector> const& dict, - device_span global_dictionary, - size_t num_rows, - std::vector> const& selection, - std::vector& out_buffers, - rmm::cuda_stream_view stream) +std::vector decode_data(metadata& meta, + rmm::device_buffer const& block_data, + std::vector> const& dict, + device_span global_dictionary, + size_t num_rows, + std::vector> const& selection, + std::vector const& column_types, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) { + auto out_buffers = std::vector(); + + for (size_t i = 0; i < column_types.size(); ++i) { + auto col_idx = selection[i].first; + bool is_nullable = (meta.columns[col_idx].schema_null_idx >= 0); + out_buffers.emplace_back(column_types[i], num_rows, is_nullable, stream, mr); + } + // Build gpu schema hostdevice_vector schema_desc(meta.schema.size()); uint32_t min_row_data_size = 0; @@ -342,6 +351,8 @@ void decode_data(metadata& meta, const auto schema_null_idx = meta.columns[col_idx].schema_null_idx; out_buffers[i].null_count() = (schema_null_idx >= 0) ? 
schema_desc[schema_null_idx].count : 0; } + + return out_buffers; } table_with_metadata read_avro(std::unique_ptr&& source, @@ -445,15 +456,15 @@ table_with_metadata read_avro(std::unique_ptr&& source, stream.synchronize(); } - std::vector out_buffers; - for (size_t i = 0; i < column_types.size(); ++i) { - auto col_idx = selected_columns[i].first; - bool is_nullable = (meta.columns[col_idx].schema_null_idx >= 0); - out_buffers.emplace_back(column_types[i], num_rows, is_nullable, stream, mr); - } - - decode_data( - meta, block_data, dict, d_global_dict, num_rows, selected_columns, out_buffers, stream); + auto out_buffers = decode_data(meta, + block_data, + dict, + d_global_dict, + num_rows, + selected_columns, + column_types, + stream, + mr); for (size_t i = 0; i < column_types.size(); ++i) { out_columns.emplace_back(make_column(out_buffers[i], nullptr, stream, mr)); From 4a2cfcc2709929460e7f6451d7239d2afbf6ff34 Mon Sep 17 00:00:00 2001 From: Christopher Harris Date: Wed, 25 Aug 2021 00:50:33 -0500 Subject: [PATCH 17/21] use device_span for block list in DecodeAvroColumnData --- cpp/src/io/avro/avro_gpu.cu | 25 +++++++------------------ cpp/src/io/avro/avro_gpu.h | 4 +--- cpp/src/io/avro/reader_impl.cu | 8 ++++---- 3 files changed, 12 insertions(+), 25 deletions(-) diff --git a/cpp/src/io/avro/avro_gpu.cu b/cpp/src/io/avro/avro_gpu.cu index 07cc202c73d..6d6271a2515 100644 --- a/cpp/src/io/avro/avro_gpu.cu +++ b/cpp/src/io/avro/avro_gpu.cu @@ -221,7 +221,6 @@ avro_decode_row(const schemadesc_s* schema, * @param[in] schema Schema description * @param[in] global_Dictionary Global dictionary entries * @param[in] avro_data Raw block data - * @param[in] num_blocks Number of blocks * @param[in] schema_len Number of entries in schema * @param[in] min_row_size Minimum size in bytes of a row * @param[in] max_rows Maximum number of rows to load @@ -229,11 +228,10 @@ avro_decode_row(const schemadesc_s* schema, */ // blockDim {32,num_warps,1} extern "C" __global__ void __launch_bounds__(num_warps * 32, 2) - gpuDecodeAvroColumnData(block_desc_s* blocks, + gpuDecodeAvroColumnData(device_span blocks, schemadesc_s* schema_g, device_span global_dictionary, const uint8_t* avro_data, - uint32_t num_blocks, uint32_t schema_len, uint32_t min_row_size, size_t max_rows, @@ -259,9 +257,9 @@ extern "C" __global__ void __launch_bounds__(num_warps * 32, 2) } else { schema = schema_g; } - if (block_id < num_blocks and threadIdx.x == 0) { *blk = blocks[block_id]; } + if (block_id < blocks.size() and threadIdx.x == 0) { *blk = blocks[block_id]; } __syncthreads(); - if (block_id >= num_blocks) { return; } + if (block_id >= blocks.size()) { return; } cur_row = blk->first_row; rows_remaining = blk->num_rows; cur = avro_data + blk->offset; @@ -305,18 +303,16 @@ extern "C" __global__ void __launch_bounds__(num_warps * 32, 2) * @param[in] schema Schema description * @param[in] global_dictionary Global dictionary entries * @param[in] avro_data Raw block data - * @param[in] num_blocks Number of blocks * @param[in] schema_len Number of entries in schema * @param[in] max_rows Maximum number of rows to load * @param[in] first_row Crop all rows below first_row * @param[in] min_row_size Minimum size in bytes of a row * @param[in] stream CUDA stream to use, default 0 */ -void DecodeAvroColumnData(block_desc_s* blocks, +void DecodeAvroColumnData(device_span blocks, schemadesc_s* schema, device_span global_dictionary, const uint8_t* avro_data, - uint32_t num_blocks, uint32_t schema_len, size_t max_rows, size_t first_row, @@ -326,17 
+322,10 @@ void DecodeAvroColumnData(block_desc_s* blocks, // num_warps warps per threadblock dim3 const dim_block(32, num_warps); // 1 warp per datablock, num_warps datablocks per threadblock - dim3 const dim_grid((num_blocks + num_warps - 1) / num_warps, 1); + dim3 const dim_grid((blocks.size() + num_warps - 1) / num_warps, 1); - gpuDecodeAvroColumnData<<>>(blocks, - schema, - global_dictionary, - avro_data, - num_blocks, - schema_len, - min_row_size, - max_rows, - first_row); + gpuDecodeAvroColumnData<<>>( + blocks, schema, global_dictionary, avro_data, schema_len, min_row_size, max_rows, first_row); } } // namespace gpu diff --git a/cpp/src/io/avro/avro_gpu.h b/cpp/src/io/avro/avro_gpu.h index 85f446e228a..c7dcb0bb4c4 100644 --- a/cpp/src/io/avro/avro_gpu.h +++ b/cpp/src/io/avro/avro_gpu.h @@ -43,18 +43,16 @@ struct schemadesc_s { * @param[in] schema Schema description * @param[in] global_dictionary Global dictionary entries * @param[in] avro_data Raw block data - * @param[in] num_blocks Number of blocks * @param[in] schema_len Number of entries in schema * @param[in] max_rows Maximum number of rows to load * @param[in] first_row Crop all rows below first_row * @param[in] min_row_size Minimum size in bytes of a row * @param[in] stream CUDA stream to use, default 0 */ -void DecodeAvroColumnData(block_desc_s* blocks, +void DecodeAvroColumnData(cudf::device_span blocks, schemadesc_s* schema, cudf::device_span global_dictionary, const uint8_t* avro_data, - uint32_t num_blocks, uint32_t schema_len, size_t max_rows = ~0, size_t first_row = 0, diff --git a/cpp/src/io/avro/reader_impl.cu b/cpp/src/io/avro/reader_impl.cu index f75ad49d4b4..0a92b59c26f 100644 --- a/cpp/src/io/avro/reader_impl.cu +++ b/cpp/src/io/avro/reader_impl.cu @@ -319,15 +319,15 @@ std::vector decode_data(metadata& meta, cudf::detail::set_null_mask(out_buffers[i].null_mask(), 0, num_rows, true, stream); } } - rmm::device_buffer block_list( - meta.block_list.data(), meta.block_list.size() * sizeof(block_desc_s), stream); + + auto block_list = cudf::detail::make_device_uvector_async(meta.block_list, stream); + schema_desc.host_to_device(stream); - gpu::DecodeAvroColumnData(static_cast(block_list.data()), + gpu::DecodeAvroColumnData(block_list, schema_desc.device_ptr(), global_dictionary, static_cast(block_data.data()), - static_cast(meta.block_list.size()), static_cast(schema_desc.size()), meta.num_rows, meta.skip_rows, From 619a87f138d3fca952e5fe3c7ec68ac79f99953a Mon Sep 17 00:00:00 2001 From: Christopher Harris Date: Wed, 25 Aug 2021 01:05:40 -0500 Subject: [PATCH 18/21] read_avro: style changes --- cpp/src/io/avro/reader_impl.cu | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/cpp/src/io/avro/reader_impl.cu b/cpp/src/io/avro/reader_impl.cu index 0a92b59c26f..8f431b3087d 100644 --- a/cpp/src/io/avro/reader_impl.cu +++ b/cpp/src/io/avro/reader_impl.cu @@ -152,8 +152,9 @@ rmm::device_buffer decompress_data(datasource& source, rmm::cuda_stream_view stream) { size_t uncompressed_data_size = 0; - hostdevice_vector inflate_in(meta.block_list.size()); - hostdevice_vector inflate_out(meta.block_list.size()); + + auto inflate_in = hostdevice_vector(meta.block_list.size()); + auto inflate_out = hostdevice_vector(meta.block_list.size()); if (meta.codec == "deflate") { // Guess an initial maximum uncompressed block size @@ -264,9 +265,11 @@ std::vector decode_data(metadata& meta, } // Build gpu schema - hostdevice_vector schema_desc(meta.schema.size()); + auto schema_desc = 
hostdevice_vector(meta.schema.size()); + uint32_t min_row_data_size = 0; int skip_field_cnt = 0; + for (size_t i = 0; i < meta.schema.size(); i++) { type_kind_e kind = meta.schema[i].kind; if (skip_field_cnt != 0) { @@ -290,8 +293,9 @@ std::vector decode_data(metadata& meta, } } if (kind == type_enum && !meta.schema[i].symbols.size()) { kind = type_int; } - schema_desc[i].kind = kind; - schema_desc[i].count = (kind == type_enum) ? 0 : (uint32_t)meta.schema[i].num_children; + schema_desc[i].kind = kind; + schema_desc[i].count = + (kind == type_enum) ? 0 : static_cast(meta.schema[i].num_children); schema_desc[i].dataptr = nullptr; CUDF_EXPECTS(kind != type_union || meta.schema[i].num_children < 2 || (meta.schema[i].num_children == 2 && From 247f2641592d980d2512704b771a590c1bdcf295 Mon Sep 17 00:00:00 2001 From: Christopher Harris Date: Wed, 25 Aug 2021 22:09:02 -0500 Subject: [PATCH 19/21] read_avro: update const positions --- cpp/src/io/avro/avro_gpu.cu | 6 +++--- cpp/src/io/avro/reader_impl.cu | 32 ++++++++++++++++---------------- 2 files changed, 19 insertions(+), 19 deletions(-) diff --git a/cpp/src/io/avro/avro_gpu.cu b/cpp/src/io/avro/avro_gpu.cu index 6d6271a2515..9b6aa24bb18 100644 --- a/cpp/src/io/avro/avro_gpu.cu +++ b/cpp/src/io/avro/avro_gpu.cu @@ -66,13 +66,13 @@ static inline int64_t __device__ avro_decode_zigzag_varint(const uint8_t*& cur, * @return data pointer at the end of the row (start of next row) */ static const uint8_t* __device__ -avro_decode_row(const schemadesc_s* schema, +avro_decode_row(schemadesc_s const* schema, schemadesc_s* schema_g, uint32_t schema_len, size_t row, size_t max_rows, - const uint8_t* cur, - const uint8_t* end, + uint8_t const* cur, + uint8_t const* end, device_span global_dictionary) { uint32_t array_start = 0, array_repeat_count = 0; diff --git a/cpp/src/io/avro/reader_impl.cu b/cpp/src/io/avro/reader_impl.cu index 8f431b3087d..aa3bab2d877 100644 --- a/cpp/src/io/avro/reader_impl.cu +++ b/cpp/src/io/avro/reader_impl.cu @@ -54,7 +54,7 @@ namespace { /** * @brief Function that translates Avro data kind to cuDF type enum */ -type_id to_type_id(const avro::schema_entry* col) +type_id to_type_id(avro::schema_entry const* col) { switch (col->kind) { case avro::type_boolean: return type_id::BOOL8; @@ -87,7 +87,7 @@ class metadata : public file_metadata { */ void init_and_select_rows(int& row_start, int& row_count) { - const auto buffer = source->host_read(0, source->size()); + auto const buffer = source->host_read(0, source->size()); avro::container pod(buffer->data(), buffer->size()); CUDF_EXPECTS(pod.parse(this, row_count, row_start), "Cannot parse metadata"); row_start = skip_rows; @@ -105,10 +105,10 @@ class metadata : public file_metadata { { std::vector> selection; - const auto num_avro_columns = static_cast(columns.size()); + auto const num_avro_columns = static_cast(columns.size()); if (!use_names.empty()) { int index = 0; - for (const auto& use_name : use_names) { + for (auto const& use_name : use_names) { for (int i = 0; i < num_avro_columns; ++i, ++index) { if (index >= num_avro_columns) { index = 0; } if (columns[index].name == use_name && @@ -166,8 +166,8 @@ rmm::device_buffer decompress_data(datasource& source, } else if (meta.codec == "snappy") { // Extract the uncompressed length from the snappy stream for (size_t i = 0; i < meta.block_list.size(); i++) { - const auto buffer = source.host_read(meta.block_list[i].offset, 4); - const uint8_t* blk = buffer->data(); + auto const buffer = source.host_read(meta.block_list[i].offset, 4); + 
uint8_t const* blk = buffer->data(); uint32_t blk_len = blk[0]; if (blk_len > 0x7f) { blk_len = (blk_len & 0x7f) | (blk[1] << 7); @@ -185,11 +185,11 @@ rmm::device_buffer decompress_data(datasource& source, rmm::device_buffer decomp_block_data(uncompressed_data_size, stream); - const auto base_offset = meta.block_list[0].offset; + auto const base_offset = meta.block_list[0].offset; for (size_t i = 0, dst_pos = 0; i < meta.block_list.size(); i++) { - const auto src_pos = meta.block_list[i].offset - base_offset; + auto const src_pos = meta.block_list[i].offset - base_offset; - inflate_in[i].srcDevice = static_cast(comp_block_data.data()) + src_pos; + inflate_in[i].srcDevice = static_cast(comp_block_data.data()) + src_pos; inflate_in[i].srcSize = meta.block_list[i].size; inflate_in[i].dstDevice = static_cast(decomp_block_data.data()) + dst_pos; @@ -304,7 +304,7 @@ std::vector decode_data(metadata& meta, } std::vector valid_alias(out_buffers.size(), nullptr); for (size_t i = 0; i < out_buffers.size(); i++) { - const auto col_idx = selection[i].first; + auto const col_idx = selection[i].first; int schema_data_idx = meta.columns[col_idx].schema_data_idx; int schema_null_idx = meta.columns[col_idx].schema_null_idx; @@ -331,7 +331,7 @@ std::vector decode_data(metadata& meta, gpu::DecodeAvroColumnData(block_list, schema_desc.device_ptr(), global_dictionary, - static_cast(block_data.data()), + static_cast(block_data.data()), static_cast(schema_desc.size()), meta.num_rows, meta.skip_rows, @@ -351,8 +351,8 @@ std::vector decode_data(metadata& meta, schema_desc.device_to_host(stream, true); for (size_t i = 0; i < out_buffers.size(); i++) { - const auto col_idx = selection[i].first; - const auto schema_null_idx = meta.columns[col_idx].schema_null_idx; + auto const col_idx = selection[i].first; + auto const schema_null_idx = meta.columns[col_idx].schema_null_idx; out_buffers[i].null_count() = (schema_null_idx >= 0) ? 
schema_desc[schema_null_idx].count : 0; } @@ -381,7 +381,7 @@ table_with_metadata read_avro(std::unique_ptr&& source, if (selected_columns.size() != 0) { // Get a list of column data types std::vector column_types; - for (const auto& col : selected_columns) { + for (auto const& col : selected_columns) { auto& col_schema = meta.schema[meta.columns[col.first].schema_data_idx]; auto col_type = to_type_id(&col_schema); @@ -399,7 +399,7 @@ table_with_metadata read_avro(std::unique_ptr&& source, stream); block_data.resize(read_bytes, stream); } else { - const auto buffer = source->host_read(meta.block_list[0].offset, meta.total_data_size); + auto const buffer = source->host_read(meta.block_list[0].offset, meta.total_data_size); block_data = rmm::device_buffer{buffer->data(), buffer->size(), stream}; } @@ -424,7 +424,7 @@ table_with_metadata read_avro(std::unique_ptr&& source, dict[i].first = static_cast(total_dictionary_entries); dict[i].second = static_cast(col_schema.symbols.size()); total_dictionary_entries += dict[i].second; - for (const auto& sym : col_schema.symbols) { + for (auto const& sym : col_schema.symbols) { dictionary_data_size += sym.length(); } } From 543b25eb20718162e391cd1f05ee1c3bee461583 Mon Sep 17 00:00:00 2001 From: Christopher Harris Date: Wed, 25 Aug 2021 22:26:27 -0500 Subject: [PATCH 20/21] adjust const positions --- cpp/src/io/avro/avro_gpu.cu | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cpp/src/io/avro/avro_gpu.cu b/cpp/src/io/avro/avro_gpu.cu index 9b6aa24bb18..cb1c32458a3 100644 --- a/cpp/src/io/avro/avro_gpu.cu +++ b/cpp/src/io/avro/avro_gpu.cu @@ -65,7 +65,7 @@ static inline int64_t __device__ avro_decode_zigzag_varint(const uint8_t*& cur, * * @return data pointer at the end of the row (start of next row) */ -static const uint8_t* __device__ +static uint8_t const* __device__ avro_decode_row(schemadesc_s const* schema, schemadesc_s* schema_g, uint32_t schema_len, @@ -231,7 +231,7 @@ extern "C" __global__ void __launch_bounds__(num_warps * 32, 2) gpuDecodeAvroColumnData(device_span blocks, schemadesc_s* schema_g, device_span global_dictionary, - const uint8_t* avro_data, + uint8_t const* avro_data, uint32_t schema_len, uint32_t min_row_size, size_t max_rows, @@ -312,7 +312,7 @@ extern "C" __global__ void __launch_bounds__(num_warps * 32, 2) void DecodeAvroColumnData(device_span blocks, schemadesc_s* schema, device_span global_dictionary, - const uint8_t* avro_data, + uint8_t const* avro_data, uint32_t schema_len, size_t max_rows, size_t first_row, From 98ab9c9ead869a04172ba91983a5926d75ec7181 Mon Sep 17 00:00:00 2001 From: Christopher Harris Date: Wed, 25 Aug 2021 22:27:09 -0500 Subject: [PATCH 21/21] adjust const positions --- cpp/src/io/avro/avro_gpu.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/io/avro/avro_gpu.h b/cpp/src/io/avro/avro_gpu.h index c7dcb0bb4c4..c87ac8afb13 100644 --- a/cpp/src/io/avro/avro_gpu.h +++ b/cpp/src/io/avro/avro_gpu.h @@ -52,7 +52,7 @@ struct schemadesc_s { void DecodeAvroColumnData(cudf::device_span blocks, schemadesc_s* schema, cudf::device_span global_dictionary, - const uint8_t* avro_data, + uint8_t const* avro_data, uint32_t schema_len, size_t max_rows = ~0, size_t first_row = 0,
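
The sketches below are illustrative only and are not part of the patch series above. First, the tail of patch 16 visible at the top of this section moves construction of the output buffers into decode_data and returns them by value instead of filling a caller-provided vector. A minimal sketch of that ownership pattern, using placeholder types (buffer_t and decode_data_sketch are hypothetical stand-ins, not cuDF's column_buffer machinery):

#include <cstddef>
#include <memory>
#include <vector>

struct buffer_t {                // stand-in for a move-only column buffer
  std::unique_ptr<char[]> data;  // owning, non-copyable payload
  std::size_t size = 0;
};

std::vector<buffer_t> decode_data_sketch(std::vector<std::size_t> const& column_sizes)
{
  std::vector<buffer_t> out_buffers;
  out_buffers.reserve(column_sizes.size());
  for (auto const n : column_sizes) {
    // the decode step now owns allocation of its outputs...
    out_buffers.push_back(buffer_t{std::make_unique<char[]>(n), n});
  }
  // ...and hands them back by value: NRVO or a move, never an element-wise copy,
  // even though the element type itself cannot be copied
  return out_buffers;
}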
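
Patch 17's core change is replacing the raw block pointer plus num_blocks pair with a cudf::device_span<block_desc_s const>, so both the kernel and its launcher read the count from the view itself. A self-contained CUDA sketch of that shape, with span_sketch and block_desc_t as hypothetical stand-ins for cudf::device_span and block_desc_s:

#include <cstddef>

// hypothetical stand-in for cudf::device_span<T>; the real code simply
// includes <cudf/utilities/span.hpp> instead of defining its own view
template <typename T>
class span_sketch {
 public:
  __host__ __device__ span_sketch(T* data, std::size_t size) : data_{data}, size_{size} {}
  __device__ T& operator[](std::size_t i) const { return data_[i]; }
  __host__ __device__ std::size_t size() const { return size_; }

 private:
  T* data_;
  std::size_t size_;
};

struct block_desc_t {  // placeholder for avro's block_desc_s
  std::size_t offset, size, first_row, num_rows;
};

constexpr unsigned int num_warps = 16;

// The span carries its own length, so there is no separate num_blocks
// parameter to keep in sync; out-of-range warps simply return.
__global__ void decode_blocks(span_sketch<block_desc_t const> blocks)
{
  auto const block_id = blockIdx.x * blockDim.y + threadIdx.y;
  if (block_id >= blocks.size()) { return; }
  block_desc_t const blk = blocks[block_id];
  (void)blk;  // a real kernel would decode rows [blk.first_row, blk.first_row + blk.num_rows)
}

void launch_decode(span_sketch<block_desc_t const> blocks, cudaStream_t stream)
{
  // one warp per data block, num_warps data blocks per thread block
  dim3 const dim_block(32, num_warps);
  dim3 const dim_grid(static_cast<unsigned int>((blocks.size() + num_warps - 1) / num_warps), 1);
  decode_blocks<<<dim_grid, dim_block, 0, stream>>>(blocks);
}

Passing the span by value works because the view is trivially copyable; only the pointer and length cross the launch boundary, not the block descriptors themselves.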
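
On the host side, the same patch swaps the manual rmm::device_buffer copy plus static_cast for cudf::detail::make_device_uvector_async, whose result converts to the span the kernel wrapper expects. A hedged sketch of the idea, assuming the factory overload taking a host container and a stream as on this branch (block_desc, consume, and copy_and_consume are placeholder names, not reader_impl.cu code):

#include <cudf/detail/utilities/vector_factories.hpp>
#include <cudf/utilities/span.hpp>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_uvector.hpp>

#include <cstddef>
#include <vector>

struct block_desc {  // stand-in for avro's block_desc_s
  std::size_t offset, size, first_row, num_rows;
};

// the consumer takes a typed, sized device view; no pointer/count pair to keep in sync
void consume(cudf::device_span<block_desc const> blocks, rmm::cuda_stream_view stream)
{
  // kernel launch elided in this sketch
}

void copy_and_consume(std::vector<block_desc> const& host_blocks, rmm::cuda_stream_view stream)
{
  // one call performs the typed host-to-device copy; the device_uvector owns both the
  // memory and its element count, so no static_cast<block_desc*> on a raw device_buffer
  auto d_blocks = cudf::detail::make_device_uvector_async(host_blocks, stream);
  consume(d_blocks, stream);  // implicit conversion to device_span<block_desc const>
}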