diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 7999ada9282..903cff27be4 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -409,6 +409,7 @@ add_library( src/io/orc/stripe_init.cu src/datetime/timezone.cpp src/io/orc/writer_impl.cu + src/io/parquet/arrow_schema_writer.cpp src/io/parquet/compact_protocol_reader.cpp src/io/parquet/compact_protocol_writer.cpp src/io/parquet/decode_preprocess.cu @@ -425,6 +426,7 @@ add_library( src/io/parquet/reader_impl_helpers.cpp src/io/parquet/reader_impl_preprocess.cu src/io/parquet/writer_impl.cu + src/io/parquet/writer_impl_helpers.cpp src/io/parquet/decode_fixed.cu src/io/statistics/orc_column_statistics.cu src/io/statistics/parquet_column_statistics.cu diff --git a/cpp/include/cudf/io/parquet.hpp b/cpp/include/cudf/io/parquet.hpp index 431f14af522..4d98cae73a7 100644 --- a/cpp/include/cudf/io/parquet.hpp +++ b/cpp/include/cudf/io/parquet.hpp @@ -597,6 +597,8 @@ class parquet_writer_options_base { // Parquet writer can write timestamps as UTC // Defaults to true because libcudf timestamps are implicitly UTC bool _write_timestamps_as_UTC = true; + // Whether to write ARROW schema + bool _write_arrow_schema = false; // Maximum size of each row group (unless smaller than a single page) size_t _row_group_size_bytes = default_row_group_size_bytes; // Maximum number of rows in row group (unless smaller than a single page) @@ -689,6 +691,13 @@ class parquet_writer_options_base { */ [[nodiscard]] auto is_enabled_utc_timestamps() const { return _write_timestamps_as_UTC; } + /** + * @brief Returns `true` if arrow schema will be written + * + * @return `true` if arrow schema will be written + */ + [[nodiscard]] auto is_enabled_write_arrow_schema() const { return _write_arrow_schema; } + /** * @brief Returns maximum row group size, in bytes. * @@ -824,6 +833,13 @@ class parquet_writer_options_base { */ void enable_utc_timestamps(bool val); + /** + * @brief Sets preference for writing arrow schema. Write arrow schema if set to `true`. + * + * @param val Boolean value to enable/disable writing of arrow schema. + */ + void enable_write_arrow_schema(bool val); + /** * @brief Sets the maximum row group size, in bytes. * @@ -1084,6 +1100,15 @@ class parquet_writer_options_builder_base { * @return this for chaining */ BuilderT& utc_timestamps(bool enabled); + + /** + * @brief Set to true if arrow schema is to be written + * + * @param enabled Boolean value to enable/disable writing of arrow schema + * @return this for chaining + */ + BuilderT& write_arrow_schema(bool enabled); + /** * @brief Set to true if V2 page headers are to be written. * diff --git a/cpp/src/io/functions.cpp b/cpp/src/io/functions.cpp index 5daa55d4552..b4ece9cec66 100644 --- a/cpp/src/io/functions.cpp +++ b/cpp/src/io/functions.cpp @@ -762,6 +762,9 @@ void parquet_writer_options_base::set_compression(compression_type compression) void parquet_writer_options_base::enable_int96_timestamps(bool req) { + CUDF_EXPECTS(not req or not is_enabled_write_arrow_schema(), + "INT96 timestamps and arrow schema cannot be simultaneously " + "enabled as INT96 timestamps are deprecated in Arrow."); _write_timestamps_as_int96 = req; } @@ -770,6 +773,14 @@ void parquet_writer_options_base::enable_utc_timestamps(bool val) _write_timestamps_as_UTC = val; } +void parquet_writer_options_base::enable_write_arrow_schema(bool val) +{ + CUDF_EXPECTS(not val or not is_enabled_int96_timestamps(), + "arrow schema and INT96 timestamps cannot be simultaneously " + "enabled as INT96 timestamps are deprecated in Arrow."); + _write_arrow_schema = val; +} + void parquet_writer_options_base::set_row_group_size_bytes(size_t size_bytes) { CUDF_EXPECTS( @@ -974,6 +985,13 @@ BuilderT& parquet_writer_options_builder_base::utc_timestamp return static_cast(*this); } +template +BuilderT& parquet_writer_options_builder_base::write_arrow_schema(bool enabled) +{ + _options.enable_write_arrow_schema(enabled); + return static_cast(*this); +} + template BuilderT& parquet_writer_options_builder_base::write_v2_headers(bool enabled) { diff --git a/cpp/src/io/parquet/arrow_schema_writer.cpp b/cpp/src/io/parquet/arrow_schema_writer.cpp new file mode 100644 index 00000000000..ddf65e9020f --- /dev/null +++ b/cpp/src/io/parquet/arrow_schema_writer.cpp @@ -0,0 +1,388 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * @file arrow_schema_writer.cpp + * @brief Arrow IPC schema writer implementation + */ + +#include "arrow_schema_writer.hpp" + +#include "io/parquet/parquet_common.hpp" +#include "io/utilities/base64_utilities.hpp" +#include "ipc/Message_generated.h" +#include "ipc/Schema_generated.h" +#include "writer_impl_helpers.hpp" + +#include +#include +#include + +namespace cudf::io::parquet::detail { + +using namespace cudf::io::detail; + +namespace { + +// Copied over from arrow source for better code readability +namespace flatbuf = cudf::io::parquet::flatbuf; +using FlatBufferBuilder = flatbuffers::FlatBufferBuilder; +using DictionaryOffset = flatbuffers::Offset; +using FieldOffset = flatbuffers::Offset; +using Offset = flatbuffers::Offset; +using FBString = flatbuffers::Offset; + +/** + * @brief Recursively construct the arrow schema (fields) tree + * + * @param fbb The root flatbuffer builder object instance + * @param column A view of the column + * @param column_metadata Metadata of the column + * @param write_mode Flag to indicate that we are guaranteeing a single table write + * @param utc_timestamps Flag to indicate if timestamps are UTC + * + * @return Flatbuffer offset to the constructed field + */ +FieldOffset make_arrow_schema_fields(FlatBufferBuilder& fbb, + cudf::detail::LinkedColPtr const& column, + column_in_metadata const& column_metadata, + single_write_mode const write_mode, + bool const utc_timestamps); + +/** + * @brief Functor to convert cudf column metadata to arrow schema field metadata + */ +struct dispatch_to_flatbuf { + FlatBufferBuilder& fbb; + cudf::detail::LinkedColPtr const& col; + column_in_metadata const& col_meta; + single_write_mode const write_mode; + bool const utc_timestamps; + Offset& field_offset; + flatbuf::Type& field_type_id; + std::vector& children; + + template + std::enable_if_t, void> operator()() + { + field_type_id = flatbuf::Type_Bool; + field_offset = flatbuf::CreateBool(fbb).Union(); + } + + template + std::enable_if_t, void> operator()() + { + field_type_id = flatbuf::Type_Int; + field_offset = flatbuf::CreateInt(fbb, 8, std::numeric_limits::is_signed).Union(); + } + + template + std::enable_if_t, void> operator()() + { + field_type_id = flatbuf::Type_Int; + field_offset = flatbuf::CreateInt(fbb, 16, std::numeric_limits::is_signed).Union(); + } + + template + std::enable_if_t, void> operator()() + { + field_type_id = flatbuf::Type_Int; + field_offset = flatbuf::CreateInt(fbb, 32, std::numeric_limits::is_signed).Union(); + } + + template + std::enable_if_t, void> operator()() + { + field_type_id = flatbuf::Type_Int; + field_offset = flatbuf::CreateInt(fbb, 64, std::numeric_limits::is_signed).Union(); + } + + template + std::enable_if_t, void> operator()() + { + field_type_id = flatbuf::Type_Int; + field_offset = flatbuf::CreateInt(fbb, 8, std::numeric_limits::is_signed).Union(); + } + + template + std::enable_if_t, void> operator()() + { + field_type_id = flatbuf::Type_Int; + field_offset = flatbuf::CreateInt(fbb, 16, std::numeric_limits::is_signed).Union(); + } + + template + std::enable_if_t, void> operator()() + { + field_type_id = flatbuf::Type_Int; + field_offset = flatbuf::CreateInt(fbb, 32, std::numeric_limits::is_signed).Union(); + } + + template + std::enable_if_t, void> operator()() + { + field_type_id = flatbuf::Type_Int; + field_offset = flatbuf::CreateInt(fbb, 64, std::numeric_limits::is_signed).Union(); + } + + template + std::enable_if_t, void> operator()() + { + field_type_id = flatbuf::Type_FloatingPoint; + field_offset = flatbuf::CreateFloatingPoint(fbb, flatbuf::Precision::Precision_SINGLE).Union(); + } + + template + std::enable_if_t, void> operator()() + { + field_type_id = flatbuf::Type_FloatingPoint; + field_offset = flatbuf::CreateFloatingPoint(fbb, flatbuf::Precision::Precision_DOUBLE).Union(); + } + + template + std::enable_if_t, void> operator()() + { + field_type_id = flatbuf::Type_Utf8View; + field_offset = flatbuf::CreateUtf8View(fbb).Union(); + } + + template + std::enable_if_t, void> operator()() + { + field_type_id = flatbuf::Type_Date; + // Date type (Set unit type to DAY for arrows's Date32) + field_offset = flatbuf::CreateDate(fbb, flatbuf::DateUnit_DAY).Union(); + } + + template + std::enable_if_t, void> operator()() + { + field_type_id = flatbuf::Type_Timestamp; + // Use one of the strings: "UTC", "Etc/UTC" or "+00:00" to indicate a native UTC timestamp + field_offset = flatbuf::CreateTimestamp( + fbb, flatbuf::TimeUnit_SECOND, (utc_timestamps) ? fbb.CreateString("UTC") : 0) + .Union(); + } + + template + std::enable_if_t, void> operator()() + { + field_type_id = flatbuf::Type_Timestamp; + // Use one of the strings: "UTC", "Etc/UTC" or "+00:00" to indicate a native UTC timestamp + field_offset = + flatbuf::CreateTimestamp( + fbb, flatbuf::TimeUnit_MILLISECOND, (utc_timestamps) ? fbb.CreateString("UTC") : 0) + .Union(); + } + + template + std::enable_if_t, void> operator()() + { + field_type_id = flatbuf::Type_Timestamp; + // Use one of the strings: "UTC", "Etc/UTC" or "+00:00" to indicate a native UTC timestamp + field_offset = + flatbuf::CreateTimestamp( + fbb, flatbuf::TimeUnit_MICROSECOND, (utc_timestamps) ? fbb.CreateString("UTC") : 0) + .Union(); + } + + template + std::enable_if_t, void> operator()() + { + field_type_id = flatbuf::Type_Timestamp; + // Use one of the strings: "UTC", "Etc/UTC" or "+00:00" to indicate a native UTC timestamp + field_offset = + flatbuf::CreateTimestamp( + fbb, flatbuf::TimeUnit_NANOSECOND, (utc_timestamps) ? fbb.CreateString("UTC") : 0) + .Union(); + } + + template + std::enable_if_t, void> operator()() + { + // `duration_D` is written as TimeType as `duration_D` is not a valid arrow type. + // This also allows for easy and faithful roundtripping with cudf. + field_type_id = flatbuf::Type_Time; + field_offset = flatbuf::CreateTime(fbb, flatbuf::TimeUnit_MILLISECOND).Union(); + } + + template + std::enable_if_t, void> operator()() + { + field_type_id = flatbuf::Type_Duration; + field_offset = flatbuf::CreateDuration(fbb, flatbuf::TimeUnit_SECOND).Union(); + } + + template + std::enable_if_t, void> operator()() + { + field_type_id = flatbuf::Type_Duration; + field_offset = flatbuf::CreateDuration(fbb, flatbuf::TimeUnit_MILLISECOND).Union(); + } + + template + std::enable_if_t, void> operator()() + { + field_type_id = flatbuf::Type_Duration; + field_offset = flatbuf::CreateDuration(fbb, flatbuf::TimeUnit_MICROSECOND).Union(); + } + + template + std::enable_if_t, void> operator()() + { + field_type_id = flatbuf::Type_Duration; + field_offset = flatbuf::CreateDuration(fbb, flatbuf::TimeUnit_NANOSECOND).Union(); + } + + template + std::enable_if_t(), void> operator()() + { + field_type_id = flatbuf::Type_Decimal; + field_offset = flatbuf::CreateDecimal(fbb, + (col_meta.is_decimal_precision_set()) + ? col_meta.get_decimal_precision() + : MAX_DECIMAL128_PRECISION, + col->type().scale(), + 128) + .Union(); + } + + template + std::enable_if_t(), void> operator()() + { + // Lists are represented differently in arrow and cuDF. + // cuDF representation: List: "col_name" : { "list", "element:int" } (2 children) + // arrow schema representation: List: "col_name" : { "list" } (1 child) + // Hence, we only need to process the second child of the list. + if constexpr (std::is_same_v) { + children.emplace_back(make_arrow_schema_fields( + fbb, col->children[1], col_meta.child(1), write_mode, utc_timestamps)); + field_type_id = flatbuf::Type_List; + field_offset = flatbuf::CreateList(fbb).Union(); + } + + // Traverse the struct in DFS manner and process children fields. + else if constexpr (std::is_same_v) { + std::transform(thrust::make_counting_iterator(0UL), + thrust::make_counting_iterator(col->children.size()), + std::back_inserter(children), + [&](auto const idx) { + return make_arrow_schema_fields( + fbb, col->children[idx], col_meta.child(idx), write_mode, utc_timestamps); + }); + field_type_id = flatbuf::Type_Struct_; + field_offset = flatbuf::CreateStruct_(fbb).Union(); + } + } + + template + std::enable_if_t(), void> operator()() + { + // `dictionary32` columns are not written to parquet by cudf. + CUDF_FAIL("Dictionary columns are not supported for writing"); + } +}; + +FieldOffset make_arrow_schema_fields(FlatBufferBuilder& fbb, + cudf::detail::LinkedColPtr const& column, + column_in_metadata const& column_metadata, + single_write_mode const write_mode, + bool const utc_timestamps) +{ + // Variables to be set by the dispatch_to_flatbuf functor + Offset field_offset = 0; + flatbuf::Type field_type_id = flatbuf::Type_NONE; + std::vector children; + + cudf::type_dispatcher(column->type(), + dispatch_to_flatbuf{fbb, + column, + column_metadata, + write_mode, + utc_timestamps, + field_offset, + field_type_id, + children}); + + // push to field offsets vector + return flatbuf::CreateField( + fbb, + fbb.CreateString(column_metadata.get_name()), // name + is_output_column_nullable(column, column_metadata, write_mode), // nullable + field_type_id, // type id + field_offset, // field offset + {0}, // DictionaryOffset + fbb.CreateVector(children.data(), children.size())); // children vector +} + +} // namespace + +std::string construct_arrow_schema_ipc_message(cudf::detail::LinkedColVector const& linked_columns, + table_input_metadata const& metadata, + single_write_mode const write_mode, + bool const utc_timestamps) +{ + // Lambda function to convert int32 to a string of uint8 bytes + auto const convert_int32_to_byte_string = [&](int32_t const value) { + std::array buffer; + std::memcpy(buffer.data(), &value, sizeof(int32_t)); + return std::string(reinterpret_cast(buffer.data()), buffer.size()); + }; + + // Instantiate a flatbuffer builder + FlatBufferBuilder fbb; + + // Create an empty field offset vector and reserve space for linked columns + std::vector field_offsets; + field_offsets.reserve(linked_columns.size()); + + // populate field offsets (aka schema fields) + std::transform(thrust::make_zip_iterator( + thrust::make_tuple(linked_columns.begin(), metadata.column_metadata.begin())), + thrust::make_zip_iterator( + thrust::make_tuple(linked_columns.end(), metadata.column_metadata.end())), + std::back_inserter(field_offsets), + [&](auto const& elem) { + return make_arrow_schema_fields( + fbb, thrust::get<0>(elem), thrust::get<1>(elem), write_mode, utc_timestamps); + }); + + // Build an arrow:schema flatbuffer using the field offset vector and use it as the header to + // create an ipc message flatbuffer + fbb.Finish(flatbuf::CreateMessage( + fbb, + flatbuf::MetadataVersion_V5, // Metadata version V5 (latest) + flatbuf::MessageHeader_Schema, // Schema type message header + flatbuf::CreateSchema(fbb, + flatbuf::Endianness::Endianness_Little, + fbb.CreateVector(field_offsets)) + .Union(), // arrow:schema built from the field vector + SCHEMA_HEADER_TYPE_IPC_MESSAGE_BODYLENGTH // Body length is zero for schema type ipc message + )); + + // Construct the final string and store it here to use its view in base64_encode + std::string const ipc_message = + convert_int32_to_byte_string(IPC_CONTINUATION_TOKEN) + + // Since the schema type ipc message doesn't have a body, the flatbuffer size is equal to the + // ipc message's metadata length + convert_int32_to_byte_string(fbb.GetSize()) + + std::string(reinterpret_cast(fbb.GetBufferPointer()), fbb.GetSize()); + + // Encode the final ipc message string to base64 and return + return cudf::io::detail::base64_encode(ipc_message); +} + +} // namespace cudf::io::parquet::detail diff --git a/cpp/src/io/parquet/arrow_schema_writer.hpp b/cpp/src/io/parquet/arrow_schema_writer.hpp new file mode 100644 index 00000000000..9bc435bf6c8 --- /dev/null +++ b/cpp/src/io/parquet/arrow_schema_writer.hpp @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * @file arrow_schema_writer.hpp + * @brief Arrow IPC schema writer implementation + */ + +#pragma once + +#include +#include +#include +#include +#include + +namespace cudf::io::parquet::detail { + +/** + * @brief Construct and return arrow schema from input parquet schema + * + * Recursively traverses through parquet schema to construct the arrow schema tree. + * Serializes the arrow schema tree and stores it as the header (or metadata) of + * an otherwise empty ipc message using flatbuffers. The ipc message is then prepended + * with header size (padded for 16 byte alignment) and a continuation string. The final + * string is base64 encoded and returned. + * + * @param linked_columns Vector of table column views + * @param metadata Metadata of the columns of the table + * @param write_mode Flag to indicate that we are guaranteeing a single table write + * @param utc_timestamps Flag to indicate if timestamps are UTC + * + * @return The constructed arrow ipc message string + */ +std::string construct_arrow_schema_ipc_message(cudf::detail::LinkedColVector const& linked_columns, + table_input_metadata const& metadata, + cudf::io::detail::single_write_mode const write_mode, + bool const utc_timestamps); + +} // namespace cudf::io::parquet::detail diff --git a/cpp/src/io/parquet/parquet_common.hpp b/cpp/src/io/parquet/parquet_common.hpp index 8507eca047e..e42c259b1bf 100644 --- a/cpp/src/io/parquet/parquet_common.hpp +++ b/cpp/src/io/parquet/parquet_common.hpp @@ -17,6 +17,7 @@ #pragma once #include +#include namespace cudf::io::parquet::detail { @@ -26,6 +27,15 @@ auto constexpr MAX_DECIMAL32_PRECISION = 9; auto constexpr MAX_DECIMAL64_PRECISION = 18; auto constexpr MAX_DECIMAL128_PRECISION = 38; // log10(2^(sizeof(int128_t) * 8 - 1) - 1) +// Constants copied from arrow source and renamed to match the case +int32_t constexpr MESSAGE_DECODER_NEXT_REQUIRED_SIZE_INITIAL = sizeof(int32_t); +int32_t constexpr MESSAGE_DECODER_NEXT_REQUIRED_SIZE_METADATA_LENGTH = sizeof(int32_t); +int32_t constexpr IPC_CONTINUATION_TOKEN = -1; +std::string const ARROW_SCHEMA_KEY = "ARROW:schema"; + +// Schema type ipc message has zero length body +int64_t constexpr SCHEMA_HEADER_TYPE_IPC_MESSAGE_BODYLENGTH = 0; + /** * @brief Basic data types in Parquet, determines how data is physically stored */ diff --git a/cpp/src/io/parquet/reader_impl_helpers.cpp b/cpp/src/io/parquet/reader_impl_helpers.cpp index ebd4affd099..d1e9a823d3b 100644 --- a/cpp/src/io/parquet/reader_impl_helpers.cpp +++ b/cpp/src/io/parquet/reader_impl_helpers.cpp @@ -564,14 +564,14 @@ aggregate_reader_metadata::aggregate_reader_metadata( // Collect and apply arrow:schema from Parquet's key value metadata section if (use_arrow_schema) { apply_arrow_schema(); } - // Erase "ARROW:schema" from the output pfm if exists + // Erase ARROW_SCHEMA_KEY from the output pfm if exists std::for_each( - keyval_maps.begin(), keyval_maps.end(), [](auto& pfm) { pfm.erase("ARROW:schema"); }); + keyval_maps.begin(), keyval_maps.end(), [](auto& pfm) { pfm.erase(ARROW_SCHEMA_KEY); }); } arrow_schema_data_types aggregate_reader_metadata::collect_arrow_schema() const { - // Check the key_value metadata for ARROW:schema, decode and walk it + // Check the key_value metadata for arrow schema, decode and walk it // Function to convert from flatbuf::duration type to cudf::type_id auto const duration_from_flatbuffer = [](flatbuf::Duration const* duration) { // TODO: we only need this for arrow::DurationType for now. Else, we can take in a @@ -645,9 +645,7 @@ arrow_schema_data_types aggregate_reader_metadata::collect_arrow_schema() const return true; }; - // TODO: Should we check if any file has the "ARROW:schema" key - // Or if all files have the same "ARROW:schema"? - auto const it = keyval_maps[0].find("ARROW:schema"); + auto const it = keyval_maps[0].find(ARROW_SCHEMA_KEY); if (it == keyval_maps[0].end()) { return {}; } // Decode the base64 encoded ipc message string @@ -788,11 +786,6 @@ void aggregate_reader_metadata::apply_arrow_schema() std::optional aggregate_reader_metadata::decode_ipc_message( std::string_view const serialized_message) const { - // Constants copied from arrow source and renamed to match the case - constexpr int32_t MESSAGE_DECODER_NEXT_REQUIRED_SIZE_INITIAL = sizeof(int32_t); - constexpr int32_t MESSAGE_DECODER_NEXT_REQUIRED_SIZE_METADATA_LENGTH = sizeof(int32_t); - constexpr int32_t IPC_CONTINUATION_TOKEN = -1; - // message buffer auto message_buf = serialized_message.data(); // current message (buffer) size diff --git a/cpp/src/io/parquet/reader_impl_helpers.hpp b/cpp/src/io/parquet/reader_impl_helpers.hpp index 9aeb19a7723..6bfa8519c76 100644 --- a/cpp/src/io/parquet/reader_impl_helpers.hpp +++ b/cpp/src/io/parquet/reader_impl_helpers.hpp @@ -117,6 +117,9 @@ struct metadata : public FileMetaData { void sanitize_schema(); }; +/** + * @brief Class to extract data types from arrow schema tree + */ struct arrow_schema_data_types { std::vector children; data_type type{type_id::EMPTY}; @@ -142,7 +145,7 @@ class aggregate_reader_metadata { const; /** - * @brief Decodes and constructs the arrow schema from the "ARROW:schema" IPC message + * @brief Decodes and constructs the arrow schema from the ARROW_SCHEMA_KEY IPC message * in key value metadata section of Parquet file footer */ [[nodiscard]] arrow_schema_data_types collect_arrow_schema() const; diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu index bed4dbc5a66..66b4fce16fe 100644 --- a/cpp/src/io/parquet/writer_impl.cu +++ b/cpp/src/io/parquet/writer_impl.cu @@ -19,6 +19,7 @@ * @brief cuDF-IO parquet writer class implementation */ +#include "arrow_schema_writer.hpp" #include "compact_protocol_reader.hpp" #include "compact_protocol_writer.hpp" #include "io/comp/nvcomp_adapter.hpp" @@ -30,6 +31,7 @@ #include "parquet_common.hpp" #include "parquet_gpu.cuh" #include "writer_impl.hpp" +#include "writer_impl_helpers.hpp" #include #include @@ -39,9 +41,6 @@ #include #include #include -#include -#include -#include #include #include @@ -70,7 +69,8 @@ struct aggregate_writer_metadata { host_span const> kv_md, host_span tbl_schema, size_type num_columns, - statistics_freq stats_granularity) + statistics_freq stats_granularity, + std::string const arrow_schema_ipc_message) : version(1), schema(std::vector(tbl_schema.begin(), tbl_schema.end())), files(partitions.size()) @@ -92,6 +92,13 @@ struct aggregate_writer_metadata { return KeyValue{kv.first, kv.second}; }); } + + // Append arrow schema to the key-value metadata + if (not arrow_schema_ipc_message.empty()) { + std::for_each(this->files.begin(), this->files.end(), [&](auto& file) { + file.key_value_metadata.emplace_back(KeyValue{ARROW_SCHEMA_KEY, arrow_schema_ipc_message}); + }); + } } aggregate_writer_metadata(aggregate_writer_metadata const&) = default; @@ -182,26 +189,6 @@ struct aggregate_writer_metadata { namespace { -/** - * @brief Function that translates GDF compression to parquet compression. - * - * @param compression The compression type - * @return The supported Parquet compression - */ -Compression to_parquet_compression(compression_type compression) -{ - switch (compression) { - case compression_type::AUTO: - case compression_type::SNAPPY: return Compression::SNAPPY; - case compression_type::ZSTD: return Compression::ZSTD; - case compression_type::LZ4: - // Parquet refers to LZ4 as "LZ4_RAW"; Parquet's "LZ4" is not standard LZ4 - return Compression::LZ4_RAW; - case compression_type::NONE: return Compression::UNCOMPRESSED; - default: CUDF_FAIL("Unsupported compression type"); - } -} - /** * @brief Convert a mask of encodings to a vector. * @@ -326,6 +313,7 @@ struct leaf_schema_fn { column_in_metadata const& col_meta; bool timestamp_is_int96; bool timestamp_is_utc; + bool write_arrow_schema; template std::enable_if_t, void> operator()() @@ -493,10 +481,11 @@ struct leaf_schema_fn { } } - // unsupported outside cudf for parquet 1.0. template std::enable_if_t, void> operator()() { + // duration_D is based on int32_t and not a valid arrow duration type so simply convert to + // time32(ms). col_schema.type = Type::INT32; col_schema.converted_type = ConvertedType::TIME_MILLIS; col_schema.stats_dtype = statistics_dtype::dtype_int32; @@ -507,62 +496,86 @@ struct leaf_schema_fn { template std::enable_if_t, void> operator()() { - col_schema.type = Type::INT32; - col_schema.converted_type = ConvertedType::TIME_MILLIS; - col_schema.stats_dtype = statistics_dtype::dtype_int32; - col_schema.ts_scale = 1000; - col_schema.logical_type = LogicalType{TimeType{timestamp_is_utc, TimeUnit::MILLIS}}; + // If writing arrow schema, no logical type nor converted type is necessary + if (write_arrow_schema) { + col_schema.type = Type::INT64; + col_schema.stats_dtype = statistics_dtype::dtype_int64; + } else { + // Write as Time32 logical type otherwise. Parquet TIME_MILLIS annotates INT32 + col_schema.type = Type::INT32; + col_schema.stats_dtype = statistics_dtype::dtype_int32; + col_schema.converted_type = ConvertedType::TIME_MILLIS; + col_schema.logical_type = LogicalType{TimeType{timestamp_is_utc, TimeUnit::MILLIS}}; + col_schema.ts_scale = 1000; + } } template std::enable_if_t, void> operator()() { - col_schema.type = Type::INT32; - col_schema.converted_type = ConvertedType::TIME_MILLIS; - col_schema.stats_dtype = statistics_dtype::dtype_int32; - col_schema.logical_type = LogicalType{TimeType{timestamp_is_utc, TimeUnit::MILLIS}}; + // If writing arrow schema, no logical type nor converted type is necessary + if (write_arrow_schema) { + col_schema.type = Type::INT64; + col_schema.stats_dtype = statistics_dtype::dtype_int64; + } else { + // Write as Time32 logical type otherwise. Parquet TIME_MILLIS annotates INT32 + col_schema.type = Type::INT32; + col_schema.stats_dtype = statistics_dtype::dtype_int32; + col_schema.converted_type = ConvertedType::TIME_MILLIS; + col_schema.logical_type = LogicalType{TimeType{timestamp_is_utc, TimeUnit::MILLIS}}; + } } template std::enable_if_t, void> operator()() { - col_schema.type = Type::INT64; - col_schema.converted_type = ConvertedType::TIME_MICROS; - col_schema.stats_dtype = statistics_dtype::dtype_int64; - col_schema.logical_type = LogicalType{TimeType{timestamp_is_utc, TimeUnit::MICROS}}; + col_schema.type = Type::INT64; + col_schema.stats_dtype = statistics_dtype::dtype_int64; + // Only write as time64 logical type if not writing arrow schema + if (not write_arrow_schema) { + col_schema.converted_type = ConvertedType::TIME_MICROS; + col_schema.logical_type = LogicalType{TimeType{timestamp_is_utc, TimeUnit::MICROS}}; + } } - // unsupported outside cudf for parquet 1.0. template std::enable_if_t, void> operator()() { - col_schema.type = Type::INT64; - col_schema.stats_dtype = statistics_dtype::dtype_int64; - col_schema.logical_type = LogicalType{TimeType{timestamp_is_utc, TimeUnit::NANOS}}; + col_schema.type = Type::INT64; + col_schema.stats_dtype = statistics_dtype::dtype_int64; + // Only write as time64 logical type if not writing arrow schema + if (not write_arrow_schema) { + col_schema.logical_type = LogicalType{TimeType{timestamp_is_utc, TimeUnit::NANOS}}; + } } template std::enable_if_t(), void> operator()() { - if (std::is_same_v) { - col_schema.type = Type::INT32; - col_schema.stats_dtype = statistics_dtype::dtype_int32; - col_schema.decimal_precision = MAX_DECIMAL32_PRECISION; - col_schema.logical_type = LogicalType{DecimalType{0, MAX_DECIMAL32_PRECISION}}; - } else if (std::is_same_v) { - col_schema.type = Type::INT64; - col_schema.stats_dtype = statistics_dtype::dtype_decimal64; - col_schema.decimal_precision = MAX_DECIMAL64_PRECISION; - col_schema.logical_type = LogicalType{DecimalType{0, MAX_DECIMAL64_PRECISION}}; - } else if (std::is_same_v) { + // If writing arrow schema, then convert d32 and d64 to d128 + if (write_arrow_schema or std::is_same_v) { col_schema.type = Type::FIXED_LEN_BYTE_ARRAY; col_schema.type_length = sizeof(__int128_t); col_schema.stats_dtype = statistics_dtype::dtype_decimal128; col_schema.decimal_precision = MAX_DECIMAL128_PRECISION; col_schema.logical_type = LogicalType{DecimalType{0, MAX_DECIMAL128_PRECISION}}; } else { - CUDF_FAIL("Unsupported fixed point type for parquet writer"); + if (std::is_same_v) { + col_schema.type = Type::INT32; + col_schema.stats_dtype = statistics_dtype::dtype_int32; + col_schema.decimal_precision = MAX_DECIMAL32_PRECISION; + col_schema.logical_type = LogicalType{DecimalType{0, MAX_DECIMAL32_PRECISION}}; + } else if (std::is_same_v) { + col_schema.type = Type::INT64; + col_schema.stats_dtype = statistics_dtype::dtype_decimal64; + col_schema.decimal_precision = MAX_DECIMAL64_PRECISION; + col_schema.logical_type = LogicalType{DecimalType{0, MAX_DECIMAL64_PRECISION}}; + } else { + CUDF_FAIL("Unsupported fixed point type for parquet writer"); + } } + + // Write logical and converted types, decimal scale and precision col_schema.converted_type = ConvertedType::DECIMAL; col_schema.decimal_scale = -col->type().scale(); // parquet and cudf disagree about scale signs col_schema.logical_type->decimal_type->scale = -col->type().scale(); @@ -590,33 +603,19 @@ struct leaf_schema_fn { } }; -inline bool is_col_nullable(cudf::detail::LinkedColPtr const& col, - column_in_metadata const& col_meta, - single_write_mode write_mode) -{ - if (col_meta.is_nullability_defined()) { - CUDF_EXPECTS(col_meta.nullable() or col->null_count() == 0, - "Mismatch in metadata prescribed nullability and input column. " - "Metadata for input column with nulls cannot prescribe nullability = false"); - return col_meta.nullable(); - } - // For chunked write, when not provided nullability, we assume the worst case scenario - // that all columns are nullable. - return write_mode == single_write_mode::NO or col->nullable(); -} - /** * @brief Construct schema from input columns and per-column input options * * Recursively traverses through linked_columns and corresponding metadata to construct schema tree. * The resulting schema tree is stored in a vector in pre-order traversal order. */ -std::vector construct_schema_tree( +std::vector construct_parquet_schema_tree( cudf::detail::LinkedColVector const& linked_columns, table_input_metadata& metadata, single_write_mode write_mode, bool int96_timestamps, - bool utc_timestamps) + bool utc_timestamps, + bool write_arrow_schema) { std::vector schema; schema_tree_node root{}; @@ -629,7 +628,7 @@ std::vector construct_schema_tree( std::function add_schema = [&](cudf::detail::LinkedColPtr const& col, column_in_metadata& col_meta, size_t parent_idx) { - bool const col_nullable = is_col_nullable(col, col_meta, write_mode); + bool const col_nullable = is_output_column_nullable(col, col_meta, write_mode); auto set_field_id = [&schema, parent_idx](schema_tree_node& s, column_in_metadata const& col_meta) { @@ -854,7 +853,7 @@ std::vector construct_schema_tree( right_child_meta.set_name("value"); // check the repetition type of key is required i.e. the col should be non-nullable auto key_col = col->children[lists_column_view::child_column_index]->children[0]; - CUDF_EXPECTS(!is_col_nullable(key_col, left_child_meta, write_mode), + CUDF_EXPECTS(!is_output_column_nullable(key_col, left_child_meta, write_mode), "key column cannot be nullable. For chunked writing, explicitly set the " "nullability to false in metadata"); // process key @@ -886,7 +885,8 @@ std::vector construct_schema_tree( cudf::type_dispatcher( col->type(), - leaf_schema_fn{col_schema, col, col_meta, timestamp_is_int96, utc_timestamps}); + leaf_schema_fn{ + col_schema, col, col_meta, timestamp_is_int96, utc_timestamps, write_arrow_schema}); col_schema.repetition_type = col_nullable ? OPTIONAL : REQUIRED; col_schema.name = (schema[parent_idx].name == "list") ? "element" : col_meta.get_name(); @@ -1148,7 +1148,6 @@ void calculate_page_fragments(device_span frag, * * @param frag_stats output statistics * @param frags Input page fragments - * @param int96_timestamps Flag to indicate if timestamps will be written as INT96 * @param stream CUDA stream used for device memory operations and kernel launches */ void gather_fragment_statistics(device_span frag_stats, @@ -1164,32 +1163,6 @@ void gather_fragment_statistics(device_span frag_stats, stream.synchronize(); } -auto to_nvcomp_compression_type(Compression codec) -{ - if (codec == Compression::SNAPPY) return nvcomp::compression_type::SNAPPY; - if (codec == Compression::ZSTD) return nvcomp::compression_type::ZSTD; - // Parquet refers to LZ4 as "LZ4_RAW"; Parquet's "LZ4" is not standard LZ4 - if (codec == Compression::LZ4_RAW) return nvcomp::compression_type::LZ4; - CUDF_FAIL("Unsupported compression type"); -} - -auto page_alignment(Compression codec) -{ - if (codec == Compression::UNCOMPRESSED or - nvcomp::is_compression_disabled(to_nvcomp_compression_type(codec))) { - return 1u; - } - - return 1u << nvcomp::compress_input_alignment_bits(to_nvcomp_compression_type(codec)); -} - -size_t max_compression_output_size(Compression codec, uint32_t compression_blocksize) -{ - if (codec == Compression::UNCOMPRESSED) return 0; - - return compress_max_output_chunk_size(to_nvcomp_compression_type(codec), compression_blocksize); -} - auto init_page_sizes(hostdevice_2dvector& chunks, device_span col_desc, uint32_t num_columns, @@ -1629,23 +1602,127 @@ size_t column_index_buffer_size(EncColumnChunk* ck, } /** - * @brief Fill the table metadata with default column names. + * @brief Convert decimal32 and decimal64 data to decimal128 and return the device vector * - * @param table_meta The table metadata to fill + * @tparam DecimalType to convert from + * + * @param column A view of the input columns + * @param stream CUDA stream used for device memory operations and kernel launches + * + * @return A device vector containing the converted decimal128 data */ -void fill_table_meta(std::unique_ptr const& table_meta) +template +rmm::device_uvector<__int128_t> convert_data_to_decimal128(column_view const& column, + rmm::cuda_stream_view stream) { - // Fill unnamed columns' names in table_meta - std::function add_default_name = - [&](column_in_metadata& col_meta, std::string default_name) { - if (col_meta.get_name().empty()) col_meta.set_name(default_name); - for (size_type i = 0; i < col_meta.num_children(); ++i) { - add_default_name(col_meta.child(i), col_meta.get_name() + "_" + std::to_string(i)); - } - }; - for (size_t i = 0; i < table_meta->column_metadata.size(); ++i) { - add_default_name(table_meta->column_metadata[i], "_col" + std::to_string(i)); - } + size_type constexpr BIT_WIDTH_RATIO = sizeof(__int128_t) / sizeof(DecimalType); + + rmm::device_uvector<__int128_t> d128_buffer(column.size(), stream); + + thrust::for_each(rmm::exec_policy_nosync(stream), + thrust::make_counting_iterator(0), + thrust::make_counting_iterator(column.size()), + [in = column.begin(), + out = reinterpret_cast(d128_buffer.data()), + BIT_WIDTH_RATIO] __device__(auto in_idx) { + auto const out_idx = in_idx * BIT_WIDTH_RATIO; + // The lowest order bits are the value, the remainder + // simply matches the sign bit to satisfy the two's + // complement integer representation of negative numbers. + out[out_idx] = in[in_idx]; +#pragma unroll BIT_WIDTH_RATIO - 1 + for (auto i = 1; i < BIT_WIDTH_RATIO; ++i) { + out[out_idx + i] = in[in_idx] < 0 ? -1 : 0; + } + }); + + return d128_buffer; +} + +/** + * @brief Function to convert decimal32 and decimal64 columns to decimal128 data, + * update the input table metadata, and return a new vector of column views. + * + * @param[in,out] table_meta The table metadata + * @param[in,out] d128_vectors Vector containing the computed decimal128 data buffers. + * @param input The input table + * @param stream CUDA stream used for device memory operations and kernel launches + * + * @return A device vector containing the converted decimal128 data + */ +std::vector convert_decimal_columns_and_metadata( + table_input_metadata& table_meta, + std::vector>& d128_vectors, + table_view const& table, + rmm::cuda_stream_view stream) +{ + // Lambda function to convert each decimal32/decimal64 column to decimal128. + std::function convert_column = + [&](column_view column, column_in_metadata& metadata) -> column_view { + // Vector of passable-by-reference children column views + std::vector converted_children; + + // Process children column views first + std::transform( + thrust::make_counting_iterator(0), + thrust::make_counting_iterator(column.num_children()), + std::back_inserter(converted_children), + [&](auto const idx) { return convert_column(column.child(idx), metadata.child(idx)); }); + + // Process this column view. Only convert if decimal32 and decimal64 column. + switch (column.type().id()) { + case type_id::DECIMAL32: + // Convert data to decimal128 type + d128_vectors.emplace_back(convert_data_to_decimal128(column, stream)); + // Update metadata + metadata.set_decimal_precision(MAX_DECIMAL32_PRECISION); + metadata.set_type_length(size_of(data_type{type_id::DECIMAL128, column.type().scale()})); + // Create a new column view from the d128 data vector + return {data_type{type_id::DECIMAL128, column.type().scale()}, + column.size(), + d128_vectors.back().data(), + column.null_mask(), + column.null_count(), + column.offset(), + converted_children}; + case type_id::DECIMAL64: + // Convert data to decimal128 type + d128_vectors.emplace_back(convert_data_to_decimal128(column, stream)); + // Update metadata + metadata.set_decimal_precision(MAX_DECIMAL64_PRECISION); + metadata.set_type_length(size_of(data_type{type_id::DECIMAL128, column.type().scale()})); + // Create a new column view from the d128 data vector + return {data_type{type_id::DECIMAL128, column.type().scale()}, + column.size(), + d128_vectors.back().data(), + column.null_mask(), + column.null_count(), + column.offset(), + converted_children}; + default: + // Update the children vector keeping everything else the same + return {column.type(), + column.size(), + column.head(), + column.null_mask(), + column.null_count(), + column.offset(), + converted_children}; + } + }; + + // Vector of converted column views + std::vector converted_column_views; + + // Convert each column view + std::transform( + thrust::make_zip_iterator( + thrust::make_tuple(table.begin(), table_meta.column_metadata.begin())), + thrust::make_zip_iterator(thrust::make_tuple(table.end(), table_meta.column_metadata.end())), + std::back_inserter(converted_column_views), + [&](auto elem) { return convert_column(thrust::get<0>(elem), thrust::get<1>(elem)); }); + + return converted_column_views; } /** @@ -1698,12 +1775,22 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta, bool int96_timestamps, bool utc_timestamps, bool write_v2_headers, + bool write_arrow_schema, host_span const> out_sink, rmm::cuda_stream_view stream) { - auto vec = table_to_linked_columns(input); - auto schema_tree = - construct_schema_tree(vec, table_meta, write_mode, int96_timestamps, utc_timestamps); + // Container to store decimal128 converted data if needed + std::vector> d128_vectors; + + // Convert decimal32/decimal64 data to decimal128 if writing arrow schema + // and initialize LinkedColVector + auto vec = table_to_linked_columns( + (write_arrow_schema) + ? table_view({convert_decimal_columns_and_metadata(table_meta, d128_vectors, input, stream)}) + : input); + + auto schema_tree = construct_parquet_schema_tree( + vec, table_meta, write_mode, int96_timestamps, utc_timestamps, write_arrow_schema); // Construct parquet_column_views from the schema tree leaf nodes. std::vector parquet_columns; @@ -1826,7 +1913,14 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta, std::unique_ptr agg_meta; if (!curr_agg_meta) { agg_meta = std::make_unique( - partitions, kv_meta, this_table_schema, num_columns, stats_granularity); + partitions, + kv_meta, + this_table_schema, + num_columns, + stats_granularity, + (write_arrow_schema) + ? construct_arrow_schema_ipc_message(vec, table_meta, write_mode, utc_timestamps) + : ""); } else { agg_meta = std::make_unique(*curr_agg_meta); @@ -2307,6 +2401,7 @@ writer::impl::impl(std::vector> sinks, _int96_timestamps(options.is_enabled_int96_timestamps()), _utc_timestamps(options.is_enabled_utc_timestamps()), _write_v2_headers(options.is_enabled_write_v2_headers()), + _write_arrow_schema(options.is_enabled_write_arrow_schema()), _sorting_columns(options.get_sorting_columns()), _column_index_truncate_length(options.get_column_index_truncate_length()), _kv_meta(options.get_key_value_metadata()), @@ -2337,6 +2432,7 @@ writer::impl::impl(std::vector> sinks, _int96_timestamps(options.is_enabled_int96_timestamps()), _utc_timestamps(options.is_enabled_utc_timestamps()), _write_v2_headers(options.is_enabled_write_v2_headers()), + _write_arrow_schema(options.is_enabled_write_arrow_schema()), _sorting_columns(options.get_sorting_columns()), _column_index_truncate_length(options.get_column_index_truncate_length()), _kv_meta(options.get_key_value_metadata()), @@ -2378,7 +2474,7 @@ void writer::impl::write(table_view const& input, std::vector co CUDF_EXPECTS(not _closed, "Data has already been flushed to out and closed"); if (not _table_meta) { _table_meta = std::make_unique(input); } - fill_table_meta(_table_meta); + fill_table_meta(*_table_meta); // All kinds of memory allocation and data compressions/encoding are performed here. // If any error occurs, such as out-of-memory exception, the internal state of the current @@ -2415,6 +2511,7 @@ void writer::impl::write(table_view const& input, std::vector co _int96_timestamps, _utc_timestamps, _write_v2_headers, + _write_arrow_schema, _out_sink, _stream); } catch (...) { // catch any exception type diff --git a/cpp/src/io/parquet/writer_impl.hpp b/cpp/src/io/parquet/writer_impl.hpp index 784f78f06d5..63128faf993 100644 --- a/cpp/src/io/parquet/writer_impl.hpp +++ b/cpp/src/io/parquet/writer_impl.hpp @@ -156,6 +156,7 @@ class writer::impl { bool const _int96_timestamps; bool const _utc_timestamps; bool const _write_v2_headers; + bool const _write_arrow_schema; std::optional> _sorting_columns; int32_t const _column_index_truncate_length; std::vector> const _kv_meta; // Optional user metadata. diff --git a/cpp/src/io/parquet/writer_impl_helpers.cpp b/cpp/src/io/parquet/writer_impl_helpers.cpp new file mode 100644 index 00000000000..e2f09f872d3 --- /dev/null +++ b/cpp/src/io/parquet/writer_impl_helpers.cpp @@ -0,0 +1,131 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * @file writer_impl_helpers.cpp + * @brief Helper function implementation for Parquet writer + */ + +#include "writer_impl_helpers.hpp" + +#include +#include +#include +#include + +namespace cudf::io::parquet::detail { + +using namespace cudf::io::detail; + +Compression to_parquet_compression(compression_type compression) +{ + switch (compression) { + case compression_type::AUTO: + case compression_type::SNAPPY: return Compression::SNAPPY; + case compression_type::ZSTD: return Compression::ZSTD; + case compression_type::LZ4: + // Parquet refers to LZ4 as "LZ4_RAW"; Parquet's "LZ4" is not standard LZ4 + return Compression::LZ4_RAW; + case compression_type::NONE: return Compression::UNCOMPRESSED; + default: CUDF_FAIL("Unsupported compression type"); + } +} + +nvcomp::compression_type to_nvcomp_compression_type(Compression codec) +{ + switch (codec) { + case Compression::SNAPPY: return nvcomp::compression_type::SNAPPY; + case Compression::ZSTD: return nvcomp::compression_type::ZSTD; + // Parquet refers to LZ4 as "LZ4_RAW"; Parquet's "LZ4" is not standard LZ4 + case Compression::LZ4_RAW: return nvcomp::compression_type::LZ4; + default: CUDF_FAIL("Unsupported compression type"); + } +} + +uint32_t page_alignment(Compression codec) +{ + if (codec == Compression::UNCOMPRESSED or + nvcomp::is_compression_disabled(to_nvcomp_compression_type(codec))) { + return 1u; + } + + return 1u << nvcomp::compress_input_alignment_bits(to_nvcomp_compression_type(codec)); +} + +size_t max_compression_output_size(Compression codec, uint32_t compression_blocksize) +{ + if (codec == Compression::UNCOMPRESSED) return 0; + + return compress_max_output_chunk_size(to_nvcomp_compression_type(codec), compression_blocksize); +} + +void fill_table_meta(table_input_metadata& table_meta) +{ + // Fill unnamed columns' names in table_meta + std::function add_default_name = + [&](column_in_metadata& col_meta, std::string default_name) { + if (col_meta.get_name().empty()) col_meta.set_name(default_name); + for (size_type i = 0; i < col_meta.num_children(); ++i) { + add_default_name(col_meta.child(i), col_meta.get_name() + "_" + std::to_string(i)); + } + }; + for (size_t i = 0; i < table_meta.column_metadata.size(); ++i) { + add_default_name(table_meta.column_metadata[i], "_col" + std::to_string(i)); + } +} + +[[nodiscard]] size_t column_size(column_view const& column, rmm::cuda_stream_view stream) +{ + if (column.is_empty()) { return 0; } + + if (is_fixed_width(column.type())) { + return size_of(column.type()) * column.size(); + } else if (column.type().id() == type_id::STRING) { + auto const scol = strings_column_view(column); + return cudf::strings::detail::get_offset_value( + scol.offsets(), column.size() + column.offset(), stream) - + cudf::strings::detail::get_offset_value(scol.offsets(), column.offset(), stream); + } else if (column.type().id() == type_id::STRUCT) { + auto const scol = structs_column_view(column); + size_t ret = 0; + for (int i = 0; i < scol.num_children(); i++) { + ret += column_size(scol.get_sliced_child(i, stream), stream); + } + return ret; + } else if (column.type().id() == type_id::LIST) { + auto const lcol = lists_column_view(column); + return column_size(lcol.get_sliced_child(stream), stream); + } + + CUDF_FAIL("Unexpected compound type"); +} + +[[nodiscard]] bool is_output_column_nullable(cudf::detail::LinkedColPtr const& column, + column_in_metadata const& column_metadata, + single_write_mode write_mode) +{ + if (column_metadata.is_nullability_defined()) { + CUDF_EXPECTS(column_metadata.nullable() or column->null_count() == 0, + "Mismatch in metadata prescribed nullability and input column. " + "Metadata for input column with nulls cannot prescribe nullability = false"); + return column_metadata.nullable(); + } + // For chunked write, when not provided nullability, we assume the worst case scenario + // that all columns are nullable. + return write_mode == single_write_mode::NO or column->nullable(); +} + +} // namespace cudf::io::parquet::detail diff --git a/cpp/src/io/parquet/writer_impl_helpers.hpp b/cpp/src/io/parquet/writer_impl_helpers.hpp new file mode 100644 index 00000000000..a85411594e9 --- /dev/null +++ b/cpp/src/io/parquet/writer_impl_helpers.hpp @@ -0,0 +1,97 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * @file writer_impl_helpers.hpp + * @brief Helper function implementation for Parquet writer + */ + +#pragma once +#include "io/comp/nvcomp_adapter.hpp" +#include "parquet_common.hpp" + +#include +#include + +namespace cudf::io::parquet::detail { + +/** + * @brief Function that translates GDF compression to parquet compression. + * + * @param compression The compression type + * @return The supported Parquet compression + */ +Compression to_parquet_compression(compression_type compression); + +/** + * @brief Function that translates the given compression codec to nvcomp compression type. + * + * @param codec Compression codec + * @return Translated nvcomp compression type + */ +nvcomp::compression_type to_nvcomp_compression_type(Compression codec); + +/** + * @brief Function that computes input alignment requirements for the given compression type. + * + * @param codec Compression codec + * @return Required alignment + */ +uint32_t page_alignment(Compression codec); + +/** + * @brief Gets the maximum compressed chunk size for the largest chunk uncompressed chunk in the + * batch. + * + * @param codec Compression codec + * @param compression_blocksize Size of the largest uncompressed chunk in the batch + * @return Maximum compressed chunk size + */ +size_t max_compression_output_size(Compression codec, uint32_t compression_blocksize); + +/** + * @brief Fill the table metadata with default column names. + * + * @param table_meta The table metadata to fill + */ +void fill_table_meta(table_input_metadata& table_meta); + +/** + * @brief Compute size (in bytes) of the data stored in the given column. + * + * @param column The input column + * @param stream CUDA stream used for device memory operations and kernel launches + * @return The data size of the input + */ +[[nodiscard]] size_t column_size(column_view const& column, rmm::cuda_stream_view stream); + +/** + * @brief Indicates if the column should be marked as nullable in the output schema + * + * Returns `true` if the input column is nullable or if the write mode is not set to + * write the table all at once instead of chunked. + * + * @param column A view of the (linked) column + * @param column_metadata Metadata of the column + * @param write_mode Flag to indicate that we are guaranteeing a single table write + * + * @return Whether the column is nullable. + */ +[[nodiscard]] bool is_output_column_nullable(cudf::detail::LinkedColPtr const& column, + column_in_metadata const& column_metadata, + ::cudf::io::detail::single_write_mode write_mode); + +} // namespace cudf::io::parquet::detail diff --git a/cpp/tests/io/parquet_writer_test.cpp b/cpp/tests/io/parquet_writer_test.cpp index a1f4c7b81d8..e07ebe25322 100644 --- a/cpp/tests/io/parquet_writer_test.cpp +++ b/cpp/tests/io/parquet_writer_test.cpp @@ -35,7 +35,7 @@ using cudf::test::iterators::no_nulls; template -void test_durations(mask_op_t mask_op, bool use_byte_stream_split) +void test_durations(mask_op_t mask_op, bool use_byte_stream_split, bool arrow_schema) { std::default_random_engine generator; std::uniform_int_distribution distribution_d(0, 30); @@ -76,20 +76,27 @@ void test_durations(mask_op_t mask_op, bool use_byte_stream_split) auto filepath = temp_env->get_temp_filepath("Durations.parquet"); cudf::io::parquet_writer_options out_opts = - cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, expected); + cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, expected) + .write_arrow_schema(arrow_schema); + cudf::io::write_parquet(out_opts); cudf::io::parquet_reader_options in_opts = - cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepath}); + cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepath}) + .use_arrow_schema(arrow_schema); auto result = cudf::io::read_parquet(in_opts); auto durations_d_got = cudf::cast(result.tbl->view().column(0), cudf::data_type{cudf::type_id::DURATION_DAYS}); CUDF_TEST_EXPECT_COLUMNS_EQUAL(durations_d, durations_d_got->view()); - auto durations_s_got = - cudf::cast(result.tbl->view().column(1), cudf::data_type{cudf::type_id::DURATION_SECONDS}); - CUDF_TEST_EXPECT_COLUMNS_EQUAL(durations_s, durations_s_got->view()); + if (arrow_schema) { + CUDF_TEST_EXPECT_COLUMNS_EQUAL(durations_s, result.tbl->view().column(1)); + } else { + auto durations_s_got = + cudf::cast(result.tbl->view().column(1), cudf::data_type{cudf::type_id::DURATION_SECONDS}); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(durations_s, durations_s_got->view()); + } CUDF_TEST_EXPECT_COLUMNS_EQUAL(durations_ms, result.tbl->view().column(2)); CUDF_TEST_EXPECT_COLUMNS_EQUAL(durations_us, result.tbl->view().column(3)); @@ -98,10 +105,15 @@ void test_durations(mask_op_t mask_op, bool use_byte_stream_split) TEST_F(ParquetWriterTest, Durations) { - test_durations([](auto i) { return true; }, false); - test_durations([](auto i) { return (i % 2) != 0; }, false); - test_durations([](auto i) { return (i % 3) != 0; }, false); - test_durations([](auto i) { return false; }, false); + test_durations([](auto i) { return true; }, false, false); + test_durations([](auto i) { return (i % 2) != 0; }, false, false); + test_durations([](auto i) { return (i % 3) != 0; }, false, false); + test_durations([](auto i) { return false; }, false, false); + + test_durations([](auto i) { return true; }, false, true); + test_durations([](auto i) { return (i % 2) != 0; }, false, true); + test_durations([](auto i) { return (i % 3) != 0; }, false, true); + test_durations([](auto i) { return false; }, false, true); } TEST_F(ParquetWriterTest, MultiIndex) @@ -493,6 +505,50 @@ TEST_F(ParquetWriterTest, DecimalWrite) CUDF_TEST_EXPECT_TABLES_EQUAL(*result.tbl, table); } +TEST_F(ParquetWriterTest, DecimalWriteWithArrowSchema) +{ + constexpr cudf::size_type num_rows = 500; + auto seq_col0 = random_values(num_rows); + auto seq_col1 = random_values(num_rows); + + auto valids = + cudf::detail::make_counting_transform_iterator(0, [](auto i) { return i % 2 == 0; }); + + auto col0 = cudf::test::fixed_point_column_wrapper{ + seq_col0.begin(), seq_col0.end(), valids, numeric::scale_type{5}}; + auto col1 = cudf::test::fixed_point_column_wrapper{ + seq_col1.begin(), seq_col1.end(), valids, numeric::scale_type{-9}}; + + auto table = table_view({col0, col1}); + + auto filepath = temp_env->get_temp_filepath("DecimalWriteWithArrowSchema.parquet"); + cudf::io::parquet_writer_options args = + cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, table) + .write_arrow_schema(true); + + cudf::io::table_input_metadata expected_metadata(table); + // verify success if equal precision is given + expected_metadata.column_metadata[0].set_decimal_precision( + cudf::io::parquet::detail::MAX_DECIMAL32_PRECISION); + expected_metadata.column_metadata[1].set_decimal_precision( + cudf::io::parquet::detail::MAX_DECIMAL64_PRECISION); + args.set_metadata(std::move(expected_metadata)); + cudf::io::write_parquet(args); + + auto expected_col0 = cudf::test::fixed_point_column_wrapper<__int128_t>{ + seq_col0.begin(), seq_col0.end(), valids, numeric::scale_type{5}}; + auto expected_col1 = cudf::test::fixed_point_column_wrapper<__int128_t>{ + seq_col1.begin(), seq_col1.end(), valids, numeric::scale_type{-9}}; + + auto expected_table = table_view({expected_col0, expected_col1}); + + cudf::io::parquet_reader_options read_opts = + cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepath}); + auto result = cudf::io::read_parquet(read_opts); + + CUDF_TEST_EXPECT_TABLES_EQUAL(*result.tbl, expected_table); +} + TEST_F(ParquetWriterTest, RowGroupSizeInvalid) { auto const unused_table = std::make_unique(); @@ -1935,10 +1991,15 @@ TEST_F(ParquetWriterTest, DecimalByteStreamSplit) TEST_F(ParquetWriterTest, DurationByteStreamSplit) { - test_durations([](auto i) { return true; }, true); - test_durations([](auto i) { return (i % 2) != 0; }, true); - test_durations([](auto i) { return (i % 3) != 0; }, true); - test_durations([](auto i) { return false; }, true); + test_durations([](auto i) { return true; }, true, false); + test_durations([](auto i) { return (i % 2) != 0; }, true, false); + test_durations([](auto i) { return (i % 3) != 0; }, true, false); + test_durations([](auto i) { return false; }, true, false); + + test_durations([](auto i) { return true; }, true, true); + test_durations([](auto i) { return (i % 2) != 0; }, true, true); + test_durations([](auto i) { return (i % 3) != 0; }, true, true); + test_durations([](auto i) { return false; }, true, true); } TEST_F(ParquetWriterTest, WriteFixedLenByteArray) diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx index d1ec5be9e62..158fb6051c3 100644 --- a/python/cudf/cudf/_lib/parquet.pyx +++ b/python/cudf/cudf/_lib/parquet.pyx @@ -440,6 +440,7 @@ def write_parquet( object column_encoding=None, object column_type_length=None, object output_as_binary=None, + write_arrow_schema=False, ): """ Cython function to call into libcudf API, see `write_parquet`. @@ -544,6 +545,7 @@ def write_parquet( .write_v2_headers(header_version == "2.0") .dictionary_policy(dict_policy) .utc_timestamps(False) + .write_arrow_schema(write_arrow_schema) .build() ) if partitions_info is not None: @@ -623,6 +625,9 @@ cdef class ParquetWriter: If ``True``, enable dictionary encoding for Parquet page data subject to ``max_dictionary_size`` constraints. If ``False``, disable dictionary encoding for Parquet page data. + store_schema : bool, default False + If ``True``, enable computing and writing arrow schema to Parquet + file footer's key-value metadata section for faithful round-tripping. See Also -------- cudf.io.parquet.write_parquet @@ -641,6 +646,7 @@ cdef class ParquetWriter: cdef size_type max_page_size_rows cdef size_t max_dictionary_size cdef cudf_io_types.dictionary_policy dict_policy + cdef bool write_arrow_schema def __cinit__(self, object filepath_or_buffer, object index=None, object compression="snappy", str statistics="ROWGROUP", @@ -649,7 +655,8 @@ cdef class ParquetWriter: int max_page_size_bytes=524288, int max_page_size_rows=20000, int max_dictionary_size=1048576, - bool use_dictionary=True): + bool use_dictionary=True, + bool store_schema=False): filepaths_or_buffers = ( list(filepath_or_buffer) if is_list_like(filepath_or_buffer) @@ -670,6 +677,7 @@ cdef class ParquetWriter: if use_dictionary else cudf_io_types.dictionary_policy.NEVER ) + self.write_arrow_schema = store_schema def write_table(self, table, object partitions_info=None): """ Writes a single table to the file """ @@ -788,6 +796,7 @@ cdef class ParquetWriter: .max_page_size_bytes(self.max_page_size_bytes) .max_page_size_rows(self.max_page_size_rows) .max_dictionary_size(self.max_dictionary_size) + .write_arrow_schema(self.write_arrow_schema) .build() ) args.set_dictionary_policy(self.dict_policy) diff --git a/python/cudf/cudf/_lib/pylibcudf/libcudf/io/parquet.pxd b/python/cudf/cudf/_lib/pylibcudf/libcudf/io/parquet.pxd index 0ef6553db56..c38f39f7749 100644 --- a/python/cudf/cudf/_lib/pylibcudf/libcudf/io/parquet.pxd +++ b/python/cudf/cudf/_lib/pylibcudf/libcudf/io/parquet.pxd @@ -78,6 +78,7 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil: size_t get_max_page_size_bytes() except + size_type get_max_page_size_rows() except + size_t get_max_dictionary_size() except + + bool is_enabled_write_arrow_schema() except + void set_metadata( cudf_io_types.table_input_metadata m @@ -103,6 +104,7 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil: void set_max_page_size_rows(size_type val) except + void set_max_dictionary_size(size_t val) except + void enable_write_v2_headers(bool val) except + + void enable_write_arrow_schema(bool val) except + void set_dictionary_policy(cudf_io_types.dictionary_policy policy) except + cdef cppclass parquet_writer_options(parquet_writer_options_base): @@ -143,6 +145,9 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil: BuilderT& utc_timestamps( bool enabled ) except + + BuilderT& write_arrow_schema( + bool enabled + ) except + BuilderT& row_group_size_bytes( size_t val ) except + diff --git a/python/cudf/cudf/io/parquet.py b/python/cudf/cudf/io/parquet.py index 7733e770d99..fd0792b5edb 100644 --- a/python/cudf/cudf/io/parquet.py +++ b/python/cudf/cudf/io/parquet.py @@ -73,6 +73,7 @@ def _write_parquet( column_encoding=None, column_type_length=None, output_as_binary=None, + write_arrow_schema=True, ): if is_list_like(paths) and len(paths) > 1: if partitions_info is None: @@ -110,6 +111,7 @@ def _write_parquet( "column_encoding": column_encoding, "column_type_length": column_type_length, "output_as_binary": output_as_binary, + "write_arrow_schema": write_arrow_schema, } if all(ioutils.is_fsspec_open_file(buf) for buf in paths_or_bufs): with ExitStack() as stack: @@ -154,6 +156,7 @@ def write_to_dataset( column_encoding=None, column_type_length=None, output_as_binary=None, + store_schema=False, ): """Wraps `to_parquet` to write partitioned Parquet datasets. For each combination of partition group and value, @@ -242,6 +245,9 @@ def write_to_dataset( output_as_binary : set, optional, default None If a column name is present in the set, that column will be output as unannotated binary, rather than the default 'UTF-8'. + store_schema : bool, default False + If ``True``, enable computing and writing arrow schema to Parquet + file footer's key-value metadata section for faithful round-tripping. """ fs = ioutils._ensure_filesystem(fs, root_path, storage_options) @@ -285,6 +291,7 @@ def write_to_dataset( column_encoding=column_encoding, column_type_length=column_type_length, output_as_binary=output_as_binary, + store_schema=store_schema, ) else: @@ -312,6 +319,7 @@ def write_to_dataset( column_encoding=column_encoding, column_type_length=column_type_length, output_as_binary=output_as_binary, + store_schema=store_schema, ) return metadata @@ -968,6 +976,7 @@ def to_parquet( column_encoding=None, column_type_length=None, output_as_binary=None, + store_schema=False, *args, **kwargs, ): @@ -1023,6 +1032,7 @@ def to_parquet( column_encoding=column_encoding, column_type_length=column_type_length, output_as_binary=output_as_binary, + store_schema=store_schema, ) partition_info = ( @@ -1055,6 +1065,7 @@ def to_parquet( column_encoding=column_encoding, column_type_length=column_type_length, output_as_binary=output_as_binary, + write_arrow_schema=store_schema, ) else: diff --git a/python/cudf/cudf/tests/test_parquet.py b/python/cudf/cudf/tests/test_parquet.py index 588bc87d268..ff0c9040737 100644 --- a/python/cudf/cudf/tests/test_parquet.py +++ b/python/cudf/cudf/tests/test_parquet.py @@ -1617,7 +1617,11 @@ def test_parquet_writer_int96_timestamps(tmpdir, pdf, gdf): assert_eq(pdf, gdf) # Write out the gdf using the GPU accelerated writer with INT96 timestamps - gdf.to_parquet(gdf_fname.strpath, index=None, int96_timestamps=True) + gdf.to_parquet( + gdf_fname.strpath, + index=None, + int96_timestamps=True, + ) assert os.path.exists(gdf_fname) @@ -1789,10 +1793,11 @@ def test_parquet_write_bytes_io(simple_gdf): assert_eq(cudf.read_parquet(output), simple_gdf) -def test_parquet_writer_bytes_io(simple_gdf): +@pytest.mark.parametrize("store_schema", [True, False]) +def test_parquet_writer_bytes_io(simple_gdf, store_schema): output = BytesIO() - writer = ParquetWriter(output) + writer = ParquetWriter(output, store_schema=store_schema) writer.write_table(simple_gdf) writer.write_table(simple_gdf) writer.close() @@ -2124,7 +2129,8 @@ def test_parquet_writer_chunked_partitioned_context(tmpdir_factory): @pytest.mark.parametrize("cols", [None, ["b"]]) -def test_parquet_write_to_dataset(tmpdir_factory, cols): +@pytest.mark.parametrize("store_schema", [True, False]) +def test_parquet_write_to_dataset(tmpdir_factory, cols, store_schema): dir1 = tmpdir_factory.mktemp("dir1") dir2 = tmpdir_factory.mktemp("dir2") if cols is None: @@ -2140,7 +2146,7 @@ def test_parquet_write_to_dataset(tmpdir_factory, cols): "b": np.random.choice(np.arange(4), size=size), } ) - gdf.to_parquet(dir1, partition_cols=cols) + gdf.to_parquet(dir1, partition_cols=cols, store_schema=store_schema) cudf.io.write_to_dataset(gdf, dir2, partition_cols=cols) # Read back with cudf @@ -2156,7 +2162,7 @@ def test_parquet_write_to_dataset(tmpdir_factory, cols): } ) with pytest.raises(ValueError): - gdf.to_parquet(dir1, partition_cols=cols) + gdf.to_parquet(dir1, partition_cols=cols, store_schema=store_schema) @pytest.mark.parametrize( @@ -2386,7 +2392,8 @@ def test_parquet_writer_list_large_mixed(tmpdir): assert_eq(expect, got) -def test_parquet_writer_list_chunked(tmpdir): +@pytest.mark.parametrize("store_schema", [True, False]) +def test_parquet_writer_list_chunked(tmpdir, store_schema): table1 = cudf.DataFrame( { "a": list_gen(string_gen, 128, 80, 50), @@ -2407,7 +2414,7 @@ def test_parquet_writer_list_chunked(tmpdir): expect = cudf.concat([table1, table2]) expect = expect.reset_index(drop=True) - writer = ParquetWriter(fname) + writer = ParquetWriter(fname, store_schema=store_schema) writer.write_table(table1) writer.write_table(table2) writer.close() @@ -2542,6 +2549,10 @@ def normalized_equals(value1, value2): value1 = None if value2 is pd.NA or value2 is pd.NaT: value2 = None + if isinstance(value1, np.datetime64): + value1 = pd.Timestamp(value1).to_pydatetime() + if isinstance(value2, np.datetime64): + value2 = pd.Timestamp(value2).to_pydatetime() if isinstance(value1, pd.Timestamp): value1 = value1.to_pydatetime() if isinstance(value2, pd.Timestamp): @@ -2550,6 +2561,9 @@ def normalized_equals(value1, value2): value1 = value1.replace(tzinfo=None) if isinstance(value2, datetime.datetime): value2 = value2.replace(tzinfo=None) + if isinstance(value1, pd.Timedelta): + unit = "ms" if value1.unit == "s" else value1.unit + value2 = pd.Timedelta(value2, unit=unit) # if one is datetime then both values are datetimes now if isinstance(value1, datetime.datetime): @@ -2563,7 +2577,8 @@ def normalized_equals(value1, value2): @pytest.mark.parametrize("add_nulls", [True, False]) -def test_parquet_writer_statistics(tmpdir, pdf, add_nulls): +@pytest.mark.parametrize("store_schema", [True, False]) +def test_parquet_writer_statistics(tmpdir, pdf, add_nulls, store_schema): file_path = tmpdir.join("cudf.parquet") if "col_category" in pdf.columns: pdf = pdf.drop(columns=["col_category", "col_bool"]) @@ -2580,7 +2595,7 @@ def test_parquet_writer_statistics(tmpdir, pdf, add_nulls): if add_nulls: for col in gdf: set_random_null_mask_inplace(gdf[col]) - gdf.to_parquet(file_path, index=False) + gdf.to_parquet(file_path, index=False, store_schema=store_schema) # Read back from pyarrow pq_file = pq.ParquetFile(file_path) @@ -3205,7 +3220,8 @@ def test_parquet_writer_zstd(): assert_eq(expected, got) -def test_parquet_writer_time_delta_physical_type(): +@pytest.mark.parametrize("store_schema", [True, False]) +def test_parquet_writer_time_delta_physical_type(store_schema): df = cudf.DataFrame( { "s": cudf.Series([1], dtype="timedelta64[s]"), @@ -3217,22 +3233,35 @@ def test_parquet_writer_time_delta_physical_type(): } ) buffer = BytesIO() - df.to_parquet(buffer) + df.to_parquet(buffer, store_schema=store_schema) got = pd.read_parquet(buffer) - expected = pd.DataFrame( - { - "s": ["00:00:01"], - "ms": ["00:00:00.002000"], - "us": ["00:00:00.000003"], - "ns": ["00:00:00.000004"], - }, - dtype="str", - ) + + if store_schema: + expected = pd.DataFrame( + { + "s": ["0 days 00:00:01"], + "ms": ["0 days 00:00:00.002000"], + "us": ["0 days 00:00:00.000003"], + "ns": ["0 days 00:00:00.000004"], + }, + dtype="str", + ) + else: + expected = pd.DataFrame( + { + "s": ["00:00:01"], + "ms": ["00:00:00.002000"], + "us": ["00:00:00.000003"], + "ns": ["00:00:00.000004"], + }, + dtype="str", + ) assert_eq(got.astype("str"), expected) -def test_parquet_roundtrip_time_delta(): +@pytest.mark.parametrize("store_schema", [True, False]) +def test_parquet_roundtrip_time_delta(store_schema): num_rows = 12345 df = cudf.DataFrame( { @@ -3255,10 +3284,11 @@ def test_parquet_roundtrip_time_delta(): } ) buffer = BytesIO() - df.to_parquet(buffer) - # TODO: Remove `check_dtype` once following issue is fixed in arrow: - # https://github.com/apache/arrow/issues/33321 + df.to_parquet(buffer, store_schema=store_schema) + # `check_dtype` cannot be removed here as timedelta64[s] will change to `timedelta[ms]` assert_eq(df, cudf.read_parquet(buffer), check_dtype=False) + if store_schema: + assert_eq(df, pd.read_parquet(buffer)) def test_parquet_reader_malformed_file(datadir): @@ -3420,35 +3450,87 @@ def test_parquet_reader_roundtrip_with_arrow_schema(): # Check results for reader with schema assert_eq(expected, got) + # Reset buffer + buffer = BytesIO() -def test_parquet_reader_roundtrip_structs_with_arrow_schema(): - # Ensure that the structs with duration types are faithfully being - # roundtripped across Parquet with arrow schema - pdf = pd.DataFrame( - { - "struct": { - "payload": { - "Domain": { - "Name": "abc", - "Id": {"Name": "host", "Value": "127.0.0.8"}, - "Duration": datetime.timedelta(minutes=12), - }, - "StreamId": "12345678", - "Duration": datetime.timedelta(minutes=4), - "Offset": None, - "Resource": [ - { - "Name": "ZoneName", - "Value": "RAPIDS", - "Duration": datetime.timedelta(seconds=1), - } - ], + # Write to buffer with cudf + expected.to_parquet(buffer, store_schema=True) + + # Read parquet with arrow schema + got = cudf.read_parquet(buffer) + # Convert to cudf table for an apple to apple comparison + expected = cudf.from_pandas(pdf) + + +@pytest.mark.parametrize( + "data", + [ + # struct + [ + {"a": 1, "b": 2}, + {"a": 10, "b": 20}, + {"a": None, "b": 22}, + {"a": None, "b": None}, + {"a": 15, "b": None}, + ], + # struct-of-list + [ + {"a": 1, "b": 2, "c": [1, 2, 3]}, + {"a": 10, "b": 20, "c": [4, 5]}, + {"a": None, "b": 22, "c": [6]}, + {"a": None, "b": None, "c": None}, + {"a": 15, "b": None, "c": [-1, -2]}, + None, + {"a": 100, "b": 200, "c": [-10, None, -20]}, + ], + # list-of-struct + [ + [{"a": 1, "b": 2}, {"a": 2, "b": 3}, {"a": 4, "b": 5}], + None, + [{"a": 10, "b": 20}], + [{"a": 100, "b": 200}, {"a": None, "b": 300}, None], + ], + # struct-of-struct + [ + {"a": 1, "b": {"inner_a": 10, "inner_b": 20}, "c": 2}, + {"a": 3, "b": {"inner_a": 30, "inner_b": 40}, "c": 4}, + {"a": 5, "b": {"inner_a": 50, "inner_b": None}, "c": 6}, + {"a": 7, "b": None, "c": 8}, + {"a": None, "b": {"inner_a": None, "inner_b": None}, "c": None}, + None, + {"a": None, "b": {"inner_a": None, "inner_b": 100}, "c": 10}, + ], + # struct-with-mixed-types + [ + { + "struct": { + "payload": { + "Domain": { + "Name": "abc", + "Id": {"Name": "host", "Value": "127.0.0.8"}, + "Duration": datetime.timedelta(minutes=12), + }, + "StreamId": "12345678", + "Duration": datetime.timedelta(minutes=4), + "Offset": None, + "Resource": [ + { + "Name": "ZoneName", + "Value": "RAPIDS", + "Duration": datetime.timedelta(seconds=1), + } + ], + } } } - } - ) + ], + ], +) +def test_parquet_reader_roundtrip_structs_with_arrow_schema(tmpdir, data): + # Ensure that the structs with duration types are faithfully being + # roundtripped across Parquet with arrow schema + pdf = pd.DataFrame({"struct": pd.Series(data)}) - # Reset the buffer and write parquet with arrow buffer = BytesIO() pdf.to_parquet(buffer, engine="pyarrow") @@ -3460,6 +3542,203 @@ def test_parquet_reader_roundtrip_structs_with_arrow_schema(): # Check results assert_eq(expected, got) + # Reset buffer + buffer = BytesIO() + + # Write to buffer with cudf + expected.to_parquet(buffer, store_schema=True) + + # Read parquet with arrow schema + got = cudf.read_parquet(buffer) + # Convert to cudf table for an apple to apple comparison + expected = cudf.from_pandas(pdf) + + # Check results + assert_eq(expected, got) + + +@pytest.mark.parametrize("index", [None, True, False]) +def test_parquet_writer_roundtrip_with_arrow_schema(index): + # Ensure that the concrete and nested types are faithfully being roundtripped + # across Parquet with arrow schema + expected = cudf.DataFrame( + { + "s": cudf.Series([None, None, None], dtype="timedelta64[s]"), + "us": cudf.Series([None, 3456, None], dtype="timedelta64[us]"), + "duration_list": list( + [ + [ + datetime.timedelta(minutes=7, seconds=4), + datetime.timedelta(minutes=7), + ], + [ + None, + None, + ], + [ + datetime.timedelta(minutes=7, seconds=4), + None, + ], + ] + ), + "int64": cudf.Series([-1234, 123, 4123], dtype="int64"), + "uint32": cudf.Series([1234, 123, 4123], dtype="uint32"), + "list": list([[1, 2], [1, 2], [1, 2]]), + "bool": cudf.Series([True, None, False], dtype=bool), + "fixed32": cudf.Series([0.00, 1.0, None]).astype( + cudf.Decimal32Dtype(7, 2) + ), + "fixed64": cudf.Series([0.00, 1.0, None]).astype( + cudf.Decimal64Dtype(7, 2) + ), + "fixed128": cudf.Series([0.00, 1.0, None]).astype( + cudf.Decimal128Dtype(7, 2) + ), + "datetime": cudf.Series([1234, 123, 4123], dtype="datetime64[ms]"), + "map": cudf.Series(["cat", "dog", "lion"]).map( + {"cat": "kitten", "dog": "puppy", "lion": "cub"} + ), + } + ) + + # Write to Parquet with arrow schema for faithful roundtrip + buffer = BytesIO() + expected.to_parquet(buffer, store_schema=True, index=index) + + # Convert decimal types to d128 + expected = expected.astype({"fixed32": cudf.Decimal128Dtype(9, 2)}) + expected = expected.astype({"fixed64": cudf.Decimal128Dtype(18, 2)}) + + # Read parquet with pyarrow, pandas and cudf readers + got = cudf.DataFrame.from_arrow(pq.read_table(buffer)) + got2 = cudf.DataFrame.from_pandas(pd.read_parquet(buffer)) + got3 = cudf.read_parquet(buffer) + + # drop the index column for comparison: __index_level_0__ + if index: + got.drop(columns="__index_level_0__", inplace=True) + got2.drop(columns="__index_level_0__", inplace=True) + + # Check results + assert_eq(expected, got) + assert_eq(expected, got2) + assert_eq(expected, got3) + + +def test_parquet_writer_int96_timestamps_and_arrow_schema(): + df = cudf.DataFrame( + { + "timestamp": cudf.Series( + [1234, 123, 4123], dtype="datetime64[ms]" + ), + } + ) + + # Output buffer + buffer = BytesIO() + + # Writing out parquet with both INT96 timestamps and arrow_schema + # enabled should throw an exception. + with pytest.raises(RuntimeError): + df.to_parquet(buffer, int96_timestamps=True, store_schema=True) + + +@pytest.mark.parametrize( + "data", + [ + # struct + [ + {"a": 1, "b": 2}, + {"a": 10, "b": 20}, + {"a": None, "b": 22}, + {"a": None, "b": None}, + {"a": 15, "b": None}, + ], + # struct-of-list + [ + {"a": 1, "b": 2, "c": [1, 2, 3]}, + {"a": 10, "b": 20, "c": [4, 5]}, + {"a": None, "b": 22, "c": [6]}, + {"a": None, "b": None, "c": None}, + {"a": 15, "b": None, "c": [-1, -2]}, + None, + {"a": 100, "b": 200, "c": [-10, None, -20]}, + ], + # list-of-struct + [ + [{"a": 1, "b": 2}, {"a": 2, "b": 3}, {"a": 4, "b": 5}], + None, + [{"a": 10, "b": 20}], + [{"a": 100, "b": 200}, {"a": None, "b": 300}, None], + ], + # struct-of-struct + [ + {"a": 1, "b": {"inner_a": 10, "inner_b": 20}, "c": 2}, + {"a": 3, "b": {"inner_a": 30, "inner_b": 40}, "c": 4}, + {"a": 5, "b": {"inner_a": 50, "inner_b": None}, "c": 6}, + {"a": 7, "b": None, "c": 8}, + {"a": None, "b": {"inner_a": None, "inner_b": None}, "c": None}, + None, + {"a": None, "b": {"inner_a": None, "inner_b": 100}, "c": 10}, + ], + # struct-with-mixed-types + [ + { + "struct": { + "payload": { + "Domain": { + "Name": "abc", + "Id": {"Name": "host", "Value": "127.0.0.8"}, + "Duration": datetime.timedelta(minutes=12), + }, + "StreamId": "12345678", + "Duration": datetime.timedelta(minutes=4), + "Offset": None, + "Resource": [ + { + "Name": "ZoneName", + "Value": "RAPIDS", + "Duration": datetime.timedelta(seconds=1), + } + ], + } + } + } + ], + ], +) +@pytest.mark.parametrize("index", [None, True, False]) +def test_parquet_writer_roundtrip_structs_with_arrow_schema( + tmpdir, data, index +): + # Ensure that the structs are faithfully being roundtripped across + # Parquet with arrow schema + pa_expected = pa.Table.from_pydict({"struct": data}) + + expected = cudf.DataFrame.from_arrow(pa_expected) + + # Write expected data frame to Parquet with arrow schema + buffer = BytesIO() + expected.to_parquet(buffer, store_schema=True, index=index) + + # Read Parquet with pyarrow + pa_got = pq.read_table(buffer) + + # drop the index column for comparison: __index_level_0__ + if index: + pa_got = pa_got.drop(columns="__index_level_0__") + + # Check results + assert_eq(pa_expected, pa_got) + + # Convert to cuDF table and also read Parquet with cuDF reader + got = cudf.DataFrame.from_arrow(pa_got) + got2 = cudf.read_parquet(buffer) + + # Check results + assert_eq(expected, got) + assert_eq(expected, got2) + @pytest.mark.parametrize("chunk_read_limit", [0, 240, 1024000000]) @pytest.mark.parametrize("pass_read_limit", [0, 240, 1024000000]) diff --git a/python/cudf/cudf/utils/ioutils.py b/python/cudf/cudf/utils/ioutils.py index 0209c692935..76c7f2bfdb8 100644 --- a/python/cudf/cudf/utils/ioutils.py +++ b/python/cudf/cudf/utils/ioutils.py @@ -322,6 +322,12 @@ output_as_binary : set, optional, default None If a column name is present in the set, that column will be output as unannotated binary, rather than the default 'UTF-8'. +store_schema : bool, default False + If ``True``, writes arrow schema to Parquet file footer's key-value + metadata section to faithfully round-trip ``duration`` types with arrow. + This cannot be used with ``int96_timestamps`` enabled as int96 timestamps + are deprecated in arrow. Also, all decimal32 and decimal64 columns will be + converted to decimal128 as arrow only supports decimal128 and decimal256 types. **kwargs Additional parameters will be passed to execution engines other than ``cudf``.