diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 98f6ed40502..2ec22edc491 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -247,6 +247,7 @@ add_library(cudf src/io/parquet/reader_impl.cu src/io/parquet/writer_impl.cu src/io/statistics/column_stats.cu + src/io/utilities/column_buffer.cpp src/io/utilities/data_sink.cpp src/io/utilities/datasource.cpp src/io/utilities/file_io_utilities.cpp diff --git a/cpp/src/io/avro/avro_common.h b/cpp/src/io/avro/avro_common.h index 509eca41e61..3ef36863cd2 100644 --- a/cpp/src/io/avro/avro_common.h +++ b/cpp/src/io/avro/avro_common.h @@ -18,6 +18,7 @@ #include #include +#include namespace cudf { namespace io { @@ -56,6 +57,8 @@ enum type_kind_e { type_array, }; +using cudf::io::detail::string_index_pair; + } // namespace avro } // namespace io } // namespace cudf diff --git a/cpp/src/io/avro/avro_gpu.cu b/cpp/src/io/avro/avro_gpu.cu index 321f5ee8963..ebd7f51a08a 100644 --- a/cpp/src/io/avro/avro_gpu.cu +++ b/cpp/src/io/avro/avro_gpu.cu @@ -72,7 +72,7 @@ static const uint8_t *__device__ avro_decode_row(const schemadesc_s *schema, size_t max_rows, const uint8_t *cur, const uint8_t *end, - device_span global_dictionary) + device_span global_dictionary) { uint32_t array_start = 0, array_repeat_count = 0; int array_children = 0; @@ -123,8 +123,8 @@ static const uint8_t *__device__ avro_decode_row(const schemadesc_s *schema, if (kind == type_enum) { // dictionary size_t idx = schema[i].count + v; if (idx < global_dictionary.size()) { - ptr = global_dictionary[idx].ptr; - count = global_dictionary[idx].count; + ptr = global_dictionary[idx].first; + count = global_dictionary[idx].second; } } else if (v >= 0 && cur + v <= end) { // string ptr = reinterpret_cast(cur); @@ -132,8 +132,8 @@ static const uint8_t *__device__ avro_decode_row(const schemadesc_s *schema, cur += count; } if (dataptr != nullptr && row < max_rows) { - static_cast(dataptr)[row].ptr = ptr; - static_cast(dataptr)[row].count = count; + static_cast(dataptr)[row].first = ptr; + static_cast(dataptr)[row].second = count; } } } break; @@ -230,7 +230,7 @@ static const uint8_t *__device__ avro_decode_row(const schemadesc_s *schema, extern "C" __global__ void __launch_bounds__(num_warps * 32, 2) gpuDecodeAvroColumnData(block_desc_s *blocks, schemadesc_s *schema_g, - device_span global_dictionary, + device_span global_dictionary, const uint8_t *avro_data, uint32_t num_blocks, uint32_t schema_len, @@ -313,7 +313,7 @@ extern "C" __global__ void __launch_bounds__(num_warps * 32, 2) */ void DecodeAvroColumnData(block_desc_s *blocks, schemadesc_s *schema, - device_span global_dictionary, + device_span global_dictionary, const uint8_t *avro_data, uint32_t num_blocks, uint32_t schema_len, diff --git a/cpp/src/io/avro/avro_gpu.h b/cpp/src/io/avro/avro_gpu.h index 95b6e13d3f6..a82d3604d02 100644 --- a/cpp/src/io/avro/avro_gpu.h +++ b/cpp/src/io/avro/avro_gpu.h @@ -25,13 +25,6 @@ namespace cudf { namespace io { namespace avro { namespace gpu { -/** - * @brief Struct to describe the output of a string datatype - */ -struct nvstrdesc_s { - const char *ptr; - size_t count; -}; /** * @brief Struct to describe the avro schema @@ -59,7 +52,7 @@ struct schemadesc_s { */ void DecodeAvroColumnData(block_desc_s *blocks, schemadesc_s *schema, - cudf::device_span global_dictionary, + cudf::device_span global_dictionary, const uint8_t *avro_data, uint32_t num_blocks, uint32_t schema_len, diff --git a/cpp/src/io/avro/reader_impl.cu b/cpp/src/io/avro/reader_impl.cu index 42035687750..600633f0ed8 100644 --- a/cpp/src/io/avro/reader_impl.cu +++ b/cpp/src/io/avro/reader_impl.cu @@ -235,7 +235,7 @@ rmm::device_buffer reader::impl::decompress_data(const rmm::device_buffer &comp_ void reader::impl::decode_data(const rmm::device_buffer &block_data, const std::vector> &dict, - device_span global_dictionary, + device_span global_dictionary, size_t num_rows, std::vector> selection, std::vector &out_buffers, @@ -393,10 +393,10 @@ table_with_metadata reader::impl::read(avro_reader_options const &options, for (const auto &sym : col_schema.symbols) { dictionary_data_size += sym.length(); } } - rmm::device_uvector d_global_dict(total_dictionary_entries, stream); + rmm::device_uvector d_global_dict(total_dictionary_entries, stream); rmm::device_uvector d_global_dict_data(dictionary_data_size, stream); if (total_dictionary_entries > 0) { - std::vector h_global_dict(total_dictionary_entries); + std::vector h_global_dict(total_dictionary_entries); std::vector h_global_dict_data(dictionary_data_size); size_t dict_pos = 0; for (size_t i = 0; i < column_types.size(); ++i) { @@ -406,10 +406,10 @@ table_with_metadata reader::impl::read(avro_reader_options const &options, for (size_t j = 0; j < dict[i].second; j++) { auto const &symbols = col_schema.symbols[j]; - auto const data_dst = h_global_dict_data.data() + dict_pos; - auto const len = symbols.length(); - col_dict_entries[j].ptr = data_dst; - col_dict_entries[j].count = len; + auto const data_dst = h_global_dict_data.data() + dict_pos; + auto const len = symbols.length(); + col_dict_entries[j].first = data_dst; + col_dict_entries[j].second = len; std::copy(symbols.c_str(), symbols.c_str() + len, data_dst); dict_pos += len; @@ -418,7 +418,7 @@ table_with_metadata reader::impl::read(avro_reader_options const &options, CUDA_TRY(cudaMemcpyAsync(d_global_dict.data(), h_global_dict.data(), - h_global_dict.size() * sizeof(gpu::nvstrdesc_s), + h_global_dict.size() * sizeof(string_index_pair), cudaMemcpyDefault, stream.value())); CUDA_TRY(cudaMemcpyAsync(d_global_dict_data.data(), diff --git a/cpp/src/io/avro/reader_impl.hpp b/cpp/src/io/avro/reader_impl.hpp index 22fa1aaa760..8e09da03563 100644 --- a/cpp/src/io/avro/reader_impl.hpp +++ b/cpp/src/io/avro/reader_impl.hpp @@ -97,7 +97,7 @@ class reader::impl { */ void decode_data(const rmm::device_buffer &block_data, const std::vector> &dict, - cudf::device_span global_dictionary, + cudf::device_span global_dictionary, size_t num_rows, std::vector> columns, std::vector &out_buffers, diff --git a/cpp/src/io/csv/csv.h b/cpp/src/io/csv/csv.h deleted file mode 100644 index b20ca4222b2..00000000000 --- a/cpp/src/io/csv/csv.h +++ /dev/null @@ -1,19 +0,0 @@ -/* - * Copyright (c) 2019, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include "csv_common.h" diff --git a/cpp/src/io/csv/datetime.cuh b/cpp/src/io/csv/datetime.cuh index 7f3c2ab4942..4e4ddd09a9f 100644 --- a/cpp/src/io/csv/datetime.cuh +++ b/cpp/src/io/csv/datetime.cuh @@ -16,7 +16,7 @@ #pragma once -#include "thrust/reduce.h" +#include #include #include @@ -435,4 +435,4 @@ __inline__ __device__ int64_t to_time_delta(char const* begin, char const* end) } } // namespace io -} // namespace cudf \ No newline at end of file +} // namespace cudf diff --git a/cpp/src/io/csv/reader_impl.hpp b/cpp/src/io/csv/reader_impl.hpp index 2764eb0980c..d61c2847b7e 100644 --- a/cpp/src/io/csv/reader_impl.hpp +++ b/cpp/src/io/csv/reader_impl.hpp @@ -16,7 +16,7 @@ #pragma once -#include "csv.h" +#include "csv_common.h" #include "csv_gpu.h" #include diff --git a/cpp/src/io/csv/writer_impl.hpp b/cpp/src/io/csv/writer_impl.hpp index 9c42a3666fb..965c036dc75 100644 --- a/cpp/src/io/csv/writer_impl.hpp +++ b/cpp/src/io/csv/writer_impl.hpp @@ -16,7 +16,7 @@ #pragma once -#include "csv.h" +#include "csv_common.h" #include "csv_gpu.h" #include diff --git a/cpp/src/io/json/json.h b/cpp/src/io/json/json.h deleted file mode 100644 index 0c2309d9d64..00000000000 --- a/cpp/src/io/json/json.h +++ /dev/null @@ -1,19 +0,0 @@ -/* - * Copyright (c) 2020, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include "json_common.h" diff --git a/cpp/src/io/json/json_common.h b/cpp/src/io/json/json_common.h index 0bcd4e95f9a..f33435c1673 100644 --- a/cpp/src/io/json/json_common.h +++ b/cpp/src/io/json/json_common.h @@ -17,6 +17,8 @@ #pragma once #include +#include #include class SerialTrieNode; +using cudf::io::detail::string_index_pair; diff --git a/cpp/src/io/json/json_gpu.cu b/cpp/src/io/json/json_gpu.cu index 75910ae6b5b..b9ced355107 100644 --- a/cpp/src/io/json/json_gpu.cu +++ b/cpp/src/io/json/json_gpu.cu @@ -46,8 +46,6 @@ namespace json { namespace gpu { using namespace ::cudf; -using string_pair = std::pair; - namespace { /** * @brief CUDA Kernel that adjusts the row range to exclude the character outside of the top level @@ -516,7 +514,7 @@ __global__ void convert_data_to_columns_kernel(parse_options_view opts, if (!serialized_trie_contains(opts.trie_na, {desc.value_begin, value_len})) { // Type dispatcher does not handle strings if (column_types[desc.column].id() == type_id::STRING) { - auto str_list = static_cast(output_columns[desc.column]); + auto str_list = static_cast(output_columns[desc.column]); str_list[rec_id].first = desc.value_begin; str_list[rec_id].second = value_len; @@ -537,7 +535,7 @@ __global__ void convert_data_to_columns_kernel(parse_options_view opts, } } } else if (column_types[desc.column].id() == type_id::STRING) { - auto str_list = static_cast(output_columns[desc.column]); + auto str_list = static_cast(output_columns[desc.column]); str_list[rec_id].first = nullptr; str_list[rec_id].second = 0; } diff --git a/cpp/src/io/json/json_gpu.h b/cpp/src/io/json/json_gpu.h index fb8d7b2c7ab..4a68ce48f20 100644 --- a/cpp/src/io/json/json_gpu.h +++ b/cpp/src/io/json/json_gpu.h @@ -16,8 +16,8 @@ #pragma once -#include #include +#include "json_common.h" #include diff --git a/cpp/src/io/json/reader_impl.hpp b/cpp/src/io/json/reader_impl.hpp index ffd3dc58fe7..e6df503619f 100644 --- a/cpp/src/io/json/reader_impl.hpp +++ b/cpp/src/io/json/reader_impl.hpp @@ -21,7 +21,7 @@ #pragma once -#include "json.h" +#include "json_common.h" #include "json_gpu.h" #include diff --git a/cpp/src/io/orc/orc.cpp b/cpp/src/io/orc/orc.cpp index 9abaabace4f..bef6bd56cba 100644 --- a/cpp/src/io/orc/orc.cpp +++ b/cpp/src/io/orc/orc.cpp @@ -14,10 +14,10 @@ * limitations under the License. */ -#include -#include -#include +#include "orc.h" #include +#include "orc_field_reader.hpp" +#include "orc_field_writer.hpp" namespace cudf { namespace io { diff --git a/cpp/src/io/orc/orc_field_reader.hpp b/cpp/src/io/orc/orc_field_reader.hpp index 9bb1ff4310b..8e9bca44340 100644 --- a/cpp/src/io/orc/orc_field_reader.hpp +++ b/cpp/src/io/orc/orc_field_reader.hpp @@ -15,8 +15,8 @@ */ #pragma once -#include #include +#include "orc.h" /** * @file orc_field_reader.hpp diff --git a/cpp/src/io/orc/orc_field_writer.hpp b/cpp/src/io/orc/orc_field_writer.hpp index c60e5cbd23c..13c7befa3a1 100644 --- a/cpp/src/io/orc/orc_field_writer.hpp +++ b/cpp/src/io/orc/orc_field_writer.hpp @@ -15,9 +15,9 @@ */ #pragma once -#include #include #include +#include "orc.h" /** * @file orc_field_writer.hpp diff --git a/cpp/src/io/orc/orc_gpu.h b/cpp/src/io/orc/orc_gpu.h index 55df0adf95b..dadc8a06281 100644 --- a/cpp/src/io/orc/orc_gpu.h +++ b/cpp/src/io/orc/orc_gpu.h @@ -19,10 +19,12 @@ #include "timezone.cuh" #include -#include #include #include #include +#include +#include +#include "orc_common.h" #include @@ -30,12 +32,15 @@ namespace cudf { namespace io { namespace orc { namespace gpu { + +using cudf::detail::device_2dspan; + struct CompressedStreamInfo { CompressedStreamInfo() = default; explicit constexpr CompressedStreamInfo(const uint8_t *compressed_data_, size_t compressed_size_) : compressed_data(compressed_data_), - compressed_data_size(compressed_size_), uncompressed_data(nullptr), + compressed_data_size(compressed_size_), decctl(nullptr), decstatus(nullptr), copyctl(nullptr), @@ -67,14 +72,6 @@ enum StreamIndexType { CI_NUM_STREAMS }; -/** - * @brief Struct to describe the output of a string datatype - */ -struct nvstrdesc_s { - const char *ptr; - size_t count; -}; - /** * @brief Struct to describe a single entry in the global dictionary */ @@ -292,8 +289,8 @@ void DecodeOrcColumnData(ColumnDesc const *chunks, * @param[in, out] streams chunk streams device array [column][rowgroup] * @param[in] stream CUDA stream to use, default `rmm::cuda_stream_default` */ -void EncodeOrcColumnData(detail::device_2dspan chunks, - detail::device_2dspan streams, +void EncodeOrcColumnData(device_2dspan chunks, + device_2dspan streams, rmm::cuda_stream_view stream = rmm::cuda_stream_default); /** @@ -307,10 +304,10 @@ void EncodeOrcColumnData(detail::device_2dspan chunks, * @param[in] stream CUDA stream to use, default `rmm::cuda_stream_default` */ void EncodeStripeDictionaries(StripeDictionary *stripes, - detail::device_2dspan chunks, + device_2dspan chunks, uint32_t num_string_columns, uint32_t num_stripes, - detail::device_2dspan enc_streams, + device_2dspan enc_streams, rmm::cuda_stream_view stream = rmm::cuda_stream_default); /** @@ -321,7 +318,7 @@ void EncodeStripeDictionaries(StripeDictionary *stripes, * @param[in] stream CUDA stream to use, default `rmm::cuda_stream_default` */ void set_chunk_columns(const table_device_view &view, - detail::device_2dspan chunks, + device_2dspan chunks, rmm::cuda_stream_view stream); /** @@ -331,8 +328,8 @@ void set_chunk_columns(const table_device_view &view, * @param[in,out] enc_streams chunk streams device array [column][rowgroup] * @param[in] stream CUDA stream to use, default `rmm::cuda_stream_default` */ -void CompactOrcDataStreams(detail::device_2dspan strm_desc, - detail::device_2dspan enc_streams, +void CompactOrcDataStreams(device_2dspan strm_desc, + device_2dspan enc_streams, rmm::cuda_stream_view stream = rmm::cuda_stream_default); /** @@ -352,8 +349,8 @@ void CompressOrcDataStreams(uint8_t *compressed_data, uint32_t num_compressed_blocks, CompressionKind compression, uint32_t comp_blk_size, - detail::device_2dspan strm_desc, - detail::device_2dspan enc_streams, + device_2dspan strm_desc, + device_2dspan enc_streams, gpu_inflate_input_s *comp_in, gpu_inflate_status_s *comp_out, rmm::cuda_stream_view stream = rmm::cuda_stream_default); diff --git a/cpp/src/io/orc/reader_impl.cu b/cpp/src/io/orc/reader_impl.cu index dd4972ee8f8..63f184a9bff 100644 --- a/cpp/src/io/orc/reader_impl.cu +++ b/cpp/src/io/orc/reader_impl.cu @@ -23,7 +23,7 @@ #include "timezone.cuh" #include -#include +#include "orc.h" #include #include diff --git a/cpp/src/io/orc/stripe_data.cu b/cpp/src/io/orc/stripe_data.cu index 6206d98773f..cd031af0dfb 100644 --- a/cpp/src/io/orc/stripe_data.cu +++ b/cpp/src/io/orc/stripe_data.cu @@ -25,6 +25,8 @@ namespace io { namespace orc { namespace gpu { +using cudf::io::detail::string_index_pair; + // Must be able to handle 512x 8-byte values. These values are base 128 encoded // so 8 byte value is expanded to 10 bytes. constexpr int bytestream_buffer_size = 512 * 8 * 2; @@ -1683,9 +1685,9 @@ __global__ void __launch_bounds__(block_size) case BINARY: case VARCHAR: case CHAR: { - nvstrdesc_s *strdesc = &static_cast(data_out)[row]; - void const *ptr = nullptr; - uint32_t count = 0; + string_index_pair *strdesc = &static_cast(data_out)[row]; + void const *ptr = nullptr; + uint32_t count = 0; if (is_dictionary(s->chunk.encoding_kind)) { auto const dict_idx = s->vals.u32[t + vals_skipped]; if (dict_idx < s->chunk.dict_len) { @@ -1703,8 +1705,8 @@ __global__ void __launch_bounds__(block_size) count = secondary_val; } } - strdesc->ptr = static_cast(ptr); - strdesc->count = count; + strdesc->first = static_cast(ptr); + strdesc->second = count; break; } case TIMESTAMP: { diff --git a/cpp/src/io/orc/stripe_enc.cu b/cpp/src/io/orc/stripe_enc.cu index 10932d36309..6ed9071f5b7 100644 --- a/cpp/src/io/orc/stripe_enc.cu +++ b/cpp/src/io/orc/stripe_enc.cu @@ -27,7 +27,7 @@ namespace io { namespace orc { namespace gpu { -using detail::device_2dspan; +using cudf::detail::device_2dspan; constexpr int scratch_buffer_size = 512 * 4; @@ -1226,8 +1226,8 @@ void CompressOrcDataStreams(uint8_t *compressed_data, uint32_t num_compressed_blocks, CompressionKind compression, uint32_t comp_blk_size, - detail::device_2dspan strm_desc, - detail::device_2dspan enc_streams, + device_2dspan strm_desc, + device_2dspan enc_streams, gpu_inflate_input_s *comp_in, gpu_inflate_status_s *comp_out, rmm::cuda_stream_view stream) diff --git a/cpp/src/io/parquet/compact_protocol_writer.cpp b/cpp/src/io/parquet/compact_protocol_writer.cpp index ddb5006098d..a9b8eb0ac6b 100644 --- a/cpp/src/io/parquet/compact_protocol_writer.cpp +++ b/cpp/src/io/parquet/compact_protocol_writer.cpp @@ -14,7 +14,7 @@ * limitations under the License. */ -#include +#include "compact_protocol_writer.hpp" namespace cudf { namespace io { diff --git a/cpp/src/io/parquet/compact_protocol_writer.hpp b/cpp/src/io/parquet/compact_protocol_writer.hpp index 680ea078a2f..2ce9245490e 100644 --- a/cpp/src/io/parquet/compact_protocol_writer.hpp +++ b/cpp/src/io/parquet/compact_protocol_writer.hpp @@ -16,8 +16,8 @@ #pragma once -#include -#include +#include "parquet.hpp" +#include "parquet_common.hpp" #include #include diff --git a/cpp/src/io/parquet/page_data.cu b/cpp/src/io/parquet/page_data.cu index 538e238b5ea..dfd9c1384c5 100644 --- a/cpp/src/io/parquet/page_data.cu +++ b/cpp/src/io/parquet/page_data.cu @@ -14,9 +14,9 @@ * limitations under the License. */ -#include #include #include +#include "parquet_gpu.hpp" #include #include @@ -518,13 +518,14 @@ inline __device__ void gpuOutputString(volatile page_state_s *s, int src_pos, vo if (s->dict_base) { // String dictionary - uint32_t dict_pos = (s->dict_bits > 0) - ? s->dict_idx[src_pos & (non_zero_buffer_size - 1)] * sizeof(nvstrdesc_s) - : 0; + uint32_t dict_pos = (s->dict_bits > 0) ? s->dict_idx[src_pos & (non_zero_buffer_size - 1)] * + sizeof(string_index_pair) + : 0; if (dict_pos < (uint32_t)s->dict_size) { - const nvstrdesc_s *src = reinterpret_cast(s->dict_base + dict_pos); - ptr = src->ptr; - len = src->count; + const string_index_pair *src = + reinterpret_cast(s->dict_base + dict_pos); + ptr = src->first; + len = src->second; } } else { // Plain encoding @@ -539,9 +540,9 @@ inline __device__ void gpuOutputString(volatile page_state_s *s, int src_pos, vo *static_cast(dstv) = device_str2hash32(ptr, len); } else { // Output string descriptor - nvstrdesc_s *dst = static_cast(dstv); - dst->ptr = ptr; - dst->count = len; + string_index_pair *dst = static_cast(dstv); + dst->first = ptr; + dst->second = len; } } @@ -1010,7 +1011,7 @@ static __device__ bool setupLocalPageInfo(page_state_s *const s, // Fall through to DOUBLE case DOUBLE: s->dtype_len = 8; break; case INT96: s->dtype_len = 12; break; - case BYTE_ARRAY: s->dtype_len = sizeof(nvstrdesc_s); break; + case BYTE_ARRAY: s->dtype_len = sizeof(string_index_pair); break; default: // FIXED_LEN_BYTE_ARRAY: s->dtype_len = dtype_len_out; s->error |= (s->dtype_len <= 0); @@ -1094,7 +1095,7 @@ static __device__ bool setupLocalPageInfo(page_state_s *const s, if (((s->col.data_type & 7) == BYTE_ARRAY) && (s->col.str_dict_index)) { // String dictionary: use index s->dict_base = reinterpret_cast(s->col.str_dict_index); - s->dict_size = s->col.page_info[0].num_input_values * sizeof(nvstrdesc_s); + s->dict_size = s->col.page_info[0].num_input_values * sizeof(string_index_pair); } else { s->dict_base = s->col.page_info[0].page_data; // dictionary is always stored in the first page diff --git a/cpp/src/io/parquet/page_dict.cu b/cpp/src/io/parquet/page_dict.cu index 2676f30474d..30842820448 100644 --- a/cpp/src/io/parquet/page_dict.cu +++ b/cpp/src/io/parquet/page_dict.cu @@ -14,8 +14,8 @@ * limitations under the License. */ -#include #include +#include "parquet_gpu.hpp" #include diff --git a/cpp/src/io/parquet/page_enc.cu b/cpp/src/io/parquet/page_enc.cu index 51ec0013f1a..6c31605887a 100644 --- a/cpp/src/io/parquet/page_enc.cu +++ b/cpp/src/io/parquet/page_enc.cu @@ -13,8 +13,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -#include #include +#include "parquet_gpu.hpp" #include #include diff --git a/cpp/src/io/parquet/page_hdr.cu b/cpp/src/io/parquet/page_hdr.cu index 34f5ee6fb1a..bc10fd92566 100644 --- a/cpp/src/io/parquet/page_hdr.cu +++ b/cpp/src/io/parquet/page_hdr.cu @@ -15,8 +15,8 @@ */ #include -#include #include +#include "parquet_gpu.hpp" #include @@ -447,10 +447,10 @@ extern "C" __global__ void __launch_bounds__(128) if (chunk >= num_chunks) { return; } if (!lane_id && ck->num_dict_pages > 0 && ck->str_dict_index) { // Data type to describe a string - nvstrdesc_s *dict_index = ck->str_dict_index; - const uint8_t *dict = ck->page_info[0].page_data; - int dict_size = ck->page_info[0].uncompressed_page_size; - int num_entries = ck->page_info[0].num_input_values; + string_index_pair *dict_index = ck->str_dict_index; + const uint8_t *dict = ck->page_info[0].page_data; + int dict_size = ck->page_info[0].uncompressed_page_size; + int num_entries = ck->page_info[0].num_input_values; int pos = 0, cur = 0; for (int i = 0; i < num_entries; i++) { int len = 0; @@ -464,8 +464,8 @@ extern "C" __global__ void __launch_bounds__(128) } } // TODO: Could store 8 entries in shared mem, then do a single warp-wide store - dict_index[i].ptr = reinterpret_cast(dict + pos + 4); - dict_index[i].count = len; + dict_index[i].first = reinterpret_cast(dict + pos + 4); + dict_index[i].second = len; } } } diff --git a/cpp/src/io/parquet/parquet.cpp b/cpp/src/io/parquet/parquet.cpp index 40ce222825b..2a1bd0d5a18 100644 --- a/cpp/src/io/parquet/parquet.cpp +++ b/cpp/src/io/parquet/parquet.cpp @@ -14,8 +14,8 @@ * limitations under the License. */ +#include "parquet.hpp" #include -#include namespace cudf { namespace io { diff --git a/cpp/src/io/parquet/parquet.hpp b/cpp/src/io/parquet/parquet.hpp index 6c1c6209266..eefff518a9a 100644 --- a/cpp/src/io/parquet/parquet.hpp +++ b/cpp/src/io/parquet/parquet.hpp @@ -16,7 +16,7 @@ #pragma once -#include +#include "parquet_common.hpp" #include #include diff --git a/cpp/src/io/parquet/parquet_gpu.hpp b/cpp/src/io/parquet/parquet_gpu.hpp index 555259c443d..a7698ea8a78 100644 --- a/cpp/src/io/parquet/parquet_gpu.hpp +++ b/cpp/src/io/parquet/parquet_gpu.hpp @@ -18,9 +18,9 @@ #include #include -#include #include #include +#include "parquet_common.hpp" #include #include @@ -39,6 +39,8 @@ namespace cudf { namespace io { namespace parquet { +using cudf::io::detail::string_index_pair; + /** * @brief Struct representing an input column in the file. */ @@ -70,14 +72,6 @@ enum level_type { NUM_LEVEL_TYPES }; -/** - * @brief Struct to describe the output of a string datatype - */ -struct nvstrdesc_s { - const char *ptr; - size_t count; -}; - /** * @brief Nesting information */ @@ -211,7 +205,7 @@ struct ColumnChunkDesc { int32_t max_num_pages; // size of page_info array PageInfo *page_info; // output page info for up to num_dict_pages + // num_data_pages (dictionary pages first) - nvstrdesc_s *str_dict_index; // index for string dictionary + string_index_pair *str_dict_index; // index for string dictionary uint32_t **valid_map_base; // base pointers of valid bit map for this column void **column_data_base; // base pointers of column data int8_t codec; // compressed codec enum diff --git a/cpp/src/io/parquet/reader_impl.cu b/cpp/src/io/parquet/reader_impl.cu index 743f5e8f80a..363a90522f5 100644 --- a/cpp/src/io/parquet/reader_impl.cu +++ b/cpp/src/io/parquet/reader_impl.cu @@ -1199,7 +1199,7 @@ void reader::impl::decode_page_data(hostdevice_vector &chu // Build index for string dictionaries since they can't be indexed // directly due to variable-sized elements - rmm::device_vector str_dict_index; + rmm::device_vector str_dict_index; if (total_str_dict_indexes > 0) { str_dict_index.resize(total_str_dict_indexes); } // TODO (dm): hd_vec should have begin and end iterator members diff --git a/cpp/src/io/parquet/reader_impl.hpp b/cpp/src/io/parquet/reader_impl.hpp index ca200936134..ffd8975a8d2 100644 --- a/cpp/src/io/parquet/reader_impl.hpp +++ b/cpp/src/io/parquet/reader_impl.hpp @@ -21,8 +21,8 @@ #pragma once -#include -#include +#include "parquet.hpp" +#include "parquet_gpu.hpp" #include #include diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu index f1743e86826..b5700af2d6e 100644 --- a/cpp/src/io/parquet/writer_impl.cu +++ b/cpp/src/io/parquet/writer_impl.cu @@ -21,8 +21,8 @@ #include "writer_impl.hpp" -#include #include +#include "compact_protocol_writer.hpp" #include #include diff --git a/cpp/src/io/parquet/writer_impl.hpp b/cpp/src/io/parquet/writer_impl.hpp index b8532d755eb..e5103122033 100644 --- a/cpp/src/io/parquet/writer_impl.hpp +++ b/cpp/src/io/parquet/writer_impl.hpp @@ -21,8 +21,8 @@ #pragma once -#include -#include +#include "parquet.hpp" +#include "parquet_gpu.hpp" #include #include diff --git a/cpp/src/io/utilities/block_utils.cuh b/cpp/src/io/utilities/block_utils.cuh index ec5177d70c3..759aa2517b6 100644 --- a/cpp/src/io/utilities/block_utils.cuh +++ b/cpp/src/io/utilities/block_utils.cuh @@ -187,60 +187,5 @@ inline __device__ void memcpy_block(void *dstv, const void *srcv, uint32_t len, } } -/** - * @brief Compares two strings - */ -template -inline __device__ T nvstr_compare(const char *as, uint32_t alen, const char *bs, uint32_t blen) -{ - uint32_t len = min(alen, blen); - uint32_t i = 0; - if (len >= 4) { - uint32_t align_a = 3 & reinterpret_cast(as); - uint32_t align_b = 3 & reinterpret_cast(bs); - const uint32_t *as32 = reinterpret_cast(as - align_a); - const uint32_t *bs32 = reinterpret_cast(bs - align_b); - uint32_t ofsa = align_a * 8; - uint32_t ofsb = align_b * 8; - do { - uint32_t a = *as32++; - uint32_t b = *bs32++; - if (ofsa) a = __funnelshift_r(a, *as32, ofsa); - if (ofsb) b = __funnelshift_r(b, *bs32, ofsb); - if (a != b) { - return (lesser == greater || __byte_perm(a, 0, 0x0123) < __byte_perm(b, 0, 0x0123)) - ? lesser - : greater; - } - i += 4; - } while (i + 4 <= len); - } - while (i < len) { - uint8_t a = as[i]; - uint8_t b = bs[i]; - if (a != b) { return (a < b) ? lesser : greater; } - ++i; - } - return (alen == blen) ? equal : (alen < blen) ? lesser : greater; -} - -inline __device__ bool nvstr_is_lesser(const char *as, uint32_t alen, const char *bs, uint32_t blen) -{ - return nvstr_compare(as, alen, bs, blen); -} - -inline __device__ bool nvstr_is_greater(const char *as, - uint32_t alen, - const char *bs, - uint32_t blen) -{ - return nvstr_compare(as, alen, bs, blen); -} - -inline __device__ bool nvstr_is_equal(const char *as, uint32_t alen, const char *bs, uint32_t blen) -{ - return nvstr_compare(as, alen, bs, blen); -} - } // namespace io } // namespace cudf diff --git a/cpp/src/io/utilities/column_buffer.cpp b/cpp/src/io/utilities/column_buffer.cpp new file mode 100644 index 00000000000..9170a9016c4 --- /dev/null +++ b/cpp/src/io/utilities/column_buffer.cpp @@ -0,0 +1,144 @@ +/* + * Copyright (c) 2021, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * @file column_buffer.cpp + * @brief cuDF-IO column_buffer class implementation + */ + +#include "column_buffer.hpp" +#include + +namespace cudf { +namespace io { +namespace detail { + +void column_buffer::create(size_type _size, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) +{ + size = _size; + + switch (type.id()) { + case type_id::STRING: + _strings = std::make_unique>( + cudf::detail::make_zeroed_device_uvector_async(size, stream)); + break; + + // list columns store a buffer of int32's as offsets to represent + // their individual rows + case type_id::LIST: _data = create_data(data_type{type_id::INT32}, size, stream, mr); break; + + // struct columns store no data themselves. just validity and children. + case type_id::STRUCT: break; + + default: _data = create_data(type, size, stream, mr); break; + } + if (is_nullable) { + _null_mask = + cudf::detail::create_null_mask(size, mask_state::ALL_NULL, rmm::cuda_stream_view(stream), mr); + } +} + +/** + * @brief Creates a column from an existing set of device memory buffers. + * + * @throws std::bad_alloc if device memory allocation fails + * + * @param buffer Column buffer descriptors + * @param stream CUDA stream used for device memory operations and kernel launches. + * @param mr Device memory resource used to allocate the returned column's device memory + * + * @return `std::unique_ptr` Column from the existing device data + */ +std::unique_ptr make_column(column_buffer& buffer, + column_name_info* schema_info, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) +{ + if (schema_info != nullptr) { schema_info->name = buffer.name; } + + switch (buffer.type.id()) { + case type_id::STRING: + if (schema_info != nullptr) { + schema_info->children.push_back(column_name_info{"offsets"}); + schema_info->children.push_back(column_name_info{"chars"}); + } + return make_strings_column(*buffer._strings, stream, mr); + + case type_id::LIST: { + // make offsets column + auto offsets = + std::make_unique(data_type{type_id::INT32}, buffer.size, std::move(buffer._data)); + + column_name_info* child_info = nullptr; + if (schema_info != nullptr) { + schema_info->children.push_back(column_name_info{"offsets"}); + schema_info->children.push_back(column_name_info{""}); + child_info = &schema_info->children.back(); + } + + // make child column + CUDF_EXPECTS(buffer.children.size() > 0, "Encountered malformed column_buffer"); + auto child = make_column(buffer.children[0], child_info, stream, mr); + + // make the final list column (note : size is the # of offsets, so our actual # of rows is 1 + // less) + return make_lists_column(buffer.size - 1, + std::move(offsets), + std::move(child), + buffer._null_count, + std::move(buffer._null_mask), + stream, + mr); + } break; + + case type_id::STRUCT: { + std::vector> output_children; + output_children.reserve(buffer.children.size()); + std::transform(buffer.children.begin(), + buffer.children.end(), + std::back_inserter(output_children), + [&](column_buffer& col) { + column_name_info* child_info = nullptr; + if (schema_info != nullptr) { + schema_info->children.push_back(column_name_info{""}); + child_info = &schema_info->children.back(); + } + return make_column(col, child_info, stream, mr); + }); + + return make_structs_column(buffer.size, + std::move(output_children), + buffer._null_count, + std::move(buffer._null_mask), + stream, + mr); + } break; + + default: { + return std::make_unique(buffer.type, + buffer.size, + std::move(buffer._data), + std::move(buffer._null_mask), + buffer._null_count); + } + } +} + +} // namespace detail +} // namespace io +} // namespace cudf diff --git a/cpp/src/io/utilities/column_buffer.hpp b/cpp/src/io/utilities/column_buffer.hpp index 75e9a4c18df..5da4b7a873b 100644 --- a/cpp/src/io/utilities/column_buffer.hpp +++ b/cpp/src/io/utilities/column_buffer.hpp @@ -60,18 +60,13 @@ inline rmm::device_buffer create_data( return data; } +using string_index_pair = thrust::pair; + /** * @brief Class for holding device memory buffers to column data that eventually * will be used to create a column. */ struct column_buffer { - // there is a potential bug here. In the decoding step, the buffer of - // data holding these pairs is cast to an nvstrdesc_s, which is a struct - // containing . So there is a mismatch between the - // size_type and the size_t. I believe this works because the str_pair is - // aligned out to 8 bytes anyway. - using str_pair = thrust::pair; - column_buffer() = default; // construct without a known size. call create() later to actually @@ -84,7 +79,7 @@ struct column_buffer { bool _is_nullable = true, rmm::cuda_stream_view stream = rmm::cuda_stream_default, rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()) - : type(_type), is_nullable(_is_nullable), _null_count(0) + : type(_type), is_nullable(_is_nullable) { create(_size, stream, mr); } @@ -101,30 +96,7 @@ struct column_buffer { // preprocessing steps such as in the Parquet reader void create(size_type _size, rmm::cuda_stream_view stream = rmm::cuda_stream_default, - rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()) - { - size = _size; - - switch (type.id()) { - case type_id::STRING: - _strings = std::make_unique>(size, stream); - cudaMemsetAsync(_strings->data(), 0, size * sizeof(str_pair), stream.value()); - break; - - // list columns store a buffer of int32's as offsets to represent - // their individual rows - case type_id::LIST: _data = create_data(data_type{type_id::INT32}, size, stream, mr); break; - - // struct columns store no data themselves. just validity and children. - case type_id::STRUCT: break; - - default: _data = create_data(type, size, stream, mr); break; - } - if (is_nullable) { - _null_mask = cudf::detail::create_null_mask( - size, mask_state::ALL_NULL, rmm::cuda_stream_view(stream), mr); - } - } + rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); auto data() { return _strings ? _strings->data() : _data.data(); } auto data_size() const { return _strings ? _strings->size() : _data.size(); } @@ -138,110 +110,24 @@ struct column_buffer { auto& null_count() { return _null_count; } - std::unique_ptr> _strings; + std::unique_ptr> _strings; rmm::device_buffer _data{}; rmm::device_buffer _null_mask{}; size_type _null_count{0}; - bool is_nullable{false}; data_type type{type_id::EMPTY}; + bool is_nullable{false}; size_type size{0}; std::vector children; uint32_t user_data{0}; // arbitrary user data std::string name; }; -namespace { -/** - * @brief Creates a column from an existing set of device memory buffers. - * - * @throws std::bad_alloc if device memory allocation fails - * - * @param buffer Column buffer descriptors - * @param stream CUDA stream used for device memory operations and kernel launches. - * @param mr Device memory resource used to allocate the returned column's device memory - * - * @return `std::unique_ptr` Column from the existing device data - */ std::unique_ptr make_column( column_buffer& buffer, column_name_info* schema_info = nullptr, rmm::cuda_stream_view stream = rmm::cuda_stream_default, - rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()) -{ - using str_pair = thrust::pair; - - if (schema_info != nullptr) { schema_info->name = buffer.name; } - - switch (buffer.type.id()) { - case type_id::STRING: - if (schema_info != nullptr) { - schema_info->children.push_back(column_name_info{"offsets"}); - schema_info->children.push_back(column_name_info{"chars"}); - } - return make_strings_column(*buffer._strings, stream, mr); - - case type_id::LIST: { - // make offsets column - auto offsets = - std::make_unique(data_type{type_id::INT32}, buffer.size, std::move(buffer._data)); - - column_name_info* child_info = nullptr; - if (schema_info != nullptr) { - schema_info->children.push_back(column_name_info{"offsets"}); - schema_info->children.push_back(column_name_info{""}); - child_info = &schema_info->children.back(); - } - - // make child column - CUDF_EXPECTS(buffer.children.size() > 0, "Encountered malformed column_buffer"); - auto child = make_column(buffer.children[0], child_info, stream, mr); - - // make the final list column (note : size is the # of offsets, so our actual # of rows is 1 - // less) - return make_lists_column(buffer.size - 1, - std::move(offsets), - std::move(child), - buffer._null_count, - std::move(buffer._null_mask), - stream, - mr); - } break; - - case type_id::STRUCT: { - std::vector> output_children; - output_children.reserve(buffer.children.size()); - std::transform(buffer.children.begin(), - buffer.children.end(), - std::back_inserter(output_children), - [&](column_buffer& col) { - column_name_info* child_info = nullptr; - if (schema_info != nullptr) { - schema_info->children.push_back(column_name_info{""}); - child_info = &schema_info->children.back(); - } - return make_column(col, child_info, stream, mr); - }); - - return make_structs_column(buffer.size, - std::move(output_children), - buffer._null_count, - std::move(buffer._null_mask), - stream, - mr); - } break; - - default: { - return std::make_unique(buffer.type, - buffer.size, - std::move(buffer._data), - std::move(buffer._null_mask), - buffer._null_count); - } - } -} - -} // namespace + rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); } // namespace detail } // namespace io diff --git a/cpp/src/io/utilities/data_sink.cpp b/cpp/src/io/utilities/data_sink.cpp index 10af7bcb0bd..d133d813ab3 100644 --- a/cpp/src/io/utilities/data_sink.cpp +++ b/cpp/src/io/utilities/data_sink.cpp @@ -18,7 +18,7 @@ #include #include -#include +#include "file_io_utilities.hpp" #include diff --git a/cpp/src/io/utilities/datasource.cpp b/cpp/src/io/utilities/datasource.cpp index 8f2a5389b4d..ac8deccd078 100644 --- a/cpp/src/io/utilities/datasource.cpp +++ b/cpp/src/io/utilities/datasource.cpp @@ -21,7 +21,7 @@ #include #include -#include +#include "file_io_utilities.hpp" namespace cudf { namespace io { diff --git a/cpp/src/io/utilities/file_io_utilities.cpp b/cpp/src/io/utilities/file_io_utilities.cpp index 322296715fc..abf3a3fdef0 100644 --- a/cpp/src/io/utilities/file_io_utilities.cpp +++ b/cpp/src/io/utilities/file_io_utilities.cpp @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -#include +#include "file_io_utilities.hpp" #include diff --git a/cpp/src/io/utilities/parsing_utils.cuh b/cpp/src/io/utilities/parsing_utils.cuh index b7719cba580..9cfa46aaf11 100644 --- a/cpp/src/io/utilities/parsing_utils.cuh +++ b/cpp/src/io/utilities/parsing_utils.cuh @@ -20,7 +20,7 @@ #include #include -#include +#include "column_type_histogram.hpp" #include