From d192fc4c0ae894b58a55df6265b2e582a1fe4e31 Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb Date: Fri, 31 May 2024 19:09:35 +0000 Subject: [PATCH 01/12] Deprecate INT96 timestamps --- cpp/include/cudf/io/parquet.hpp | 64 ---------------- cpp/include/cudf/io/parquet_metadata.hpp | 1 - cpp/include/cudf/io/types.hpp | 29 +------ cpp/src/io/parquet/chunk_dict.cu | 1 - cpp/src/io/parquet/decode_fixed.cu | 2 - cpp/src/io/parquet/page_data.cu | 2 - cpp/src/io/parquet/page_data.cuh | 75 ------------------- cpp/src/io/parquet/page_decode.cuh | 3 - cpp/src/io/parquet/page_enc.cu | 60 --------------- cpp/src/io/parquet/parquet_common.hpp | 1 - cpp/src/io/parquet/predicate_pushdown.cpp | 4 - cpp/src/io/parquet/reader_impl_helpers.cpp | 3 - cpp/src/io/parquet/writer_impl.cu | 73 ++++++------------ cpp/src/io/parquet/writer_impl.hpp | 1 - cpp/src/io/statistics/column_statistics.cuh | 43 +++-------- .../io/statistics/orc_column_statistics.cu | 3 +- .../statistics/parquet_column_statistics.cu | 3 +- .../statistics_type_identification.cuh | 19 +---- cpp/tests/io/parquet_writer_test.cpp | 24 ------ python/cudf/cudf/_lib/parquet.pyx | 3 - .../_lib/pylibcudf/libcudf/io/parquet.pxd | 12 --- .../cudf/_lib/pylibcudf/libcudf/io/types.pxd | 1 - python/cudf/cudf/core/dataframe.py | 2 - python/cudf/cudf/io/parquet.py | 14 ---- python/cudf/cudf/tests/test_parquet.py | 27 ------- python/cudf/cudf/utils/ioutils.py | 6 -- python/dask_cudf/dask_cudf/io/parquet.py | 2 - 27 files changed, 40 insertions(+), 438 deletions(-) diff --git a/cpp/include/cudf/io/parquet.hpp b/cpp/include/cudf/io/parquet.hpp index b2f949cdcee..a4d05d5d73f 100644 --- a/cpp/include/cudf/io/parquet.hpp +++ b/cpp/include/cudf/io/parquet.hpp @@ -596,9 +596,6 @@ class parquet_writer_options { std::optional _metadata; // Optional footer key_value_metadata std::vector> _user_data; - // Parquet writer can write INT96 or TIMESTAMP_MICROS. Defaults to TIMESTAMP_MICROS. - // If true then overrides any per-column setting in _metadata. - bool _write_timestamps_as_int96 = false; // Parquet writer can write timestamps as UTC // Defaults to true because libcudf timestamps are implicitly UTC bool _write_timestamps_as_UTC = true; @@ -717,13 +714,6 @@ class parquet_writer_options { return _user_data; } - /** - * @brief Returns `true` if timestamps will be written as INT96 - * - * @return `true` if timestamps will be written as INT96 - */ - bool is_enabled_int96_timestamps() const { return _write_timestamps_as_int96; } - /** * @brief Returns `true` if timestamps will be written as UTC * @@ -867,14 +857,6 @@ class parquet_writer_options { */ void set_compression(compression_type compression) { _compression = compression; } - /** - * @brief Sets timestamp writing preferences. INT96 timestamps will be written - * if `true` and TIMESTAMP_MICROS will be written if `false`. - * - * @param req Boolean value to enable/disable writing of INT96 timestamps - */ - void enable_int96_timestamps(bool req) { _write_timestamps_as_int96 = req; } - /** * @brief Sets preference for writing timestamps as UTC. Write timestamps as UTC if set to `true`. * @@ -1191,18 +1173,6 @@ class parquet_writer_options_builder { return *this; } - /** - * @brief Sets whether int96 timestamps are written or not in parquet_writer_options. - * - * @param enabled Boolean value to enable/disable int96 timestamps - * @return this for chaining - */ - parquet_writer_options_builder& int96_timestamps(bool enabled) - { - options._write_timestamps_as_int96 = enabled; - return *this; - } - /** * @brief Set to true if timestamps are to be written as UTC. * @@ -1293,9 +1263,6 @@ class chunked_parquet_writer_options { std::optional _metadata; // Optional footer key_value_metadata std::vector> _user_data; - // Parquet writer can write INT96 or TIMESTAMP_MICROS. Defaults to TIMESTAMP_MICROS. - // If true then overrides any per-column setting in _metadata. - bool _write_timestamps_as_int96 = false; // Parquet writer can write timestamps as UTC. Defaults to true. bool _write_timestamps_as_UTC = true; // Maximum size of each row group (unless smaller than a single page) @@ -1376,13 +1343,6 @@ class chunked_parquet_writer_options { return _user_data; } - /** - * @brief Returns `true` if timestamps will be written as INT96 - * - * @return `true` if timestamps will be written as INT96 - */ - bool is_enabled_int96_timestamps() const { return _write_timestamps_as_int96; } - /** * @brief Returns `true` if timestamps will be written as UTC * @@ -1509,15 +1469,6 @@ class chunked_parquet_writer_options { */ void set_compression(compression_type compression) { _compression = compression; } - /** - * @brief Sets timestamp writing preferences. - * - * INT96 timestamps will be written if `true` and TIMESTAMP_MICROS will be written if `false`. - * - * @param req Boolean value to enable/disable writing of INT96 timestamps - */ - void enable_int96_timestamps(bool req) { _write_timestamps_as_int96 = req; } - /** * @brief Sets preference for writing timestamps as UTC. Write timestamps as UTC if set to `true`. * @@ -1684,21 +1635,6 @@ class chunked_parquet_writer_options_builder { return *this; } - /** - * @brief Set to true if timestamps should be written as - * int96 types instead of int64 types. Even though int96 is deprecated and is - * not an internal type for cudf, it needs to be written for backwards - * compatibility reasons. - * - * @param enabled Boolean value to enable/disable int96 timestamps - * @return this for chaining - */ - chunked_parquet_writer_options_builder& int96_timestamps(bool enabled) - { - options._write_timestamps_as_int96 = enabled; - return *this; - } - /** * @brief Set to true if timestamps are to be written as UTC. * diff --git a/cpp/include/cudf/io/parquet_metadata.hpp b/cpp/include/cudf/io/parquet_metadata.hpp index e0c406c180c..99eda1df404 100644 --- a/cpp/include/cudf/io/parquet_metadata.hpp +++ b/cpp/include/cudf/io/parquet_metadata.hpp @@ -46,7 +46,6 @@ enum class TypeKind : int8_t { BOOLEAN = 0, INT32 = 1, INT64 = 2, - INT96 = 3, // Deprecated FLOAT = 4, DOUBLE = 5, BYTE_ARRAY = 6, diff --git a/cpp/include/cudf/io/types.hpp b/cpp/include/cudf/io/types.hpp index 150e997f533..32f37f95e7f 100644 --- a/cpp/include/cudf/io/types.hpp +++ b/cpp/include/cudf/io/types.hpp @@ -605,10 +605,9 @@ class column_in_metadata { friend table_input_metadata; std::string _name = ""; std::optional _nullable; - bool _list_column_is_map = false; - bool _use_int96_timestamp = false; - bool _output_as_binary = false; - bool _skip_compression = false; + bool _list_column_is_map = false; + bool _output_as_binary = false; + bool _skip_compression = false; std::optional _decimal_precision; std::optional _parquet_field_id; std::optional _type_length; @@ -672,20 +671,6 @@ class column_in_metadata { return *this; } - /** - * @brief Specifies whether this timestamp column should be encoded using the deprecated int96 - * physical type. Only valid for the following column types: - * timestamp_s, timestamp_ms, timestamp_us, timestamp_ns - * - * @param req True = use int96 physical type. False = use int64 physical type - * @return this for chaining - */ - column_in_metadata& set_int96_timestamps(bool req) noexcept - { - _use_int96_timestamp = req; - return *this; - } - /** * @brief Set the decimal precision of this column. Only valid if this column is a decimal * (fixed-point) type @@ -818,14 +803,6 @@ class column_in_metadata { */ [[nodiscard]] bool is_map() const noexcept { return _list_column_is_map; } - /** - * @brief Get whether to encode this timestamp column using deprecated int96 physical type - * - * @return Boolean indicating whether to encode this timestamp column using deprecated int96 - * physical type - */ - [[nodiscard]] bool is_enabled_int96_timestamps() const noexcept { return _use_int96_timestamp; } - /** * @brief Get whether precision has been set for this decimal column * diff --git a/cpp/src/io/parquet/chunk_dict.cu b/cpp/src/io/parquet/chunk_dict.cu index a43c6d4cbb6..6270da87188 100644 --- a/cpp/src/io/parquet/chunk_dict.cu +++ b/cpp/src/io/parquet/chunk_dict.cu @@ -145,7 +145,6 @@ CUDF_KERNEL void __launch_bounds__(block_size) switch (col->physical_type) { case Type::INT32: return 4; case Type::INT64: return 8; - case Type::INT96: return 12; case Type::FLOAT: return 4; case Type::DOUBLE: return 8; case Type::BYTE_ARRAY: { diff --git a/cpp/src/io/parquet/decode_fixed.cu b/cpp/src/io/parquet/decode_fixed.cu index bfd89200786..1a256445bd1 100644 --- a/cpp/src/io/parquet/decode_fixed.cu +++ b/cpp/src/io/parquet/decode_fixed.cu @@ -193,8 +193,6 @@ __device__ inline void gpuDecodeValues( } break; } - } else if (dtype == INT96) { - gpuOutputInt96Timestamp(s, sb, src_pos, static_cast(dst)); } else if (dtype_len == 8) { if (s->dtype_len_in == 4) { // Reading INT32 TIME_MILLIS into 64-bit DURATION_MILLISECONDS diff --git a/cpp/src/io/parquet/page_data.cu b/cpp/src/io/parquet/page_data.cu index 7207173b82f..3e95018d1dc 100644 --- a/cpp/src/io/parquet/page_data.cu +++ b/cpp/src/io/parquet/page_data.cu @@ -371,8 +371,6 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) } } else if (dtype == FIXED_LEN_BYTE_ARRAY) { gpuOutputString(s, sb, val_src_pos, dst); - } else if (dtype == INT96) { - gpuOutputInt96Timestamp(s, sb, val_src_pos, static_cast(dst)); } else if (dtype_len == 8) { if (s->dtype_len_in == 4) { // Reading INT32 TIME_MILLIS into 64-bit DURATION_MILLISECONDS diff --git a/cpp/src/io/parquet/page_data.cuh b/cpp/src/io/parquet/page_data.cuh index f182747650e..c94bc2009ec 100644 --- a/cpp/src/io/parquet/page_data.cuh +++ b/cpp/src/io/parquet/page_data.cuh @@ -122,81 +122,6 @@ inline __device__ void gpuStoreOutput(uint2* dst, *dst = v; } -/** - * @brief Convert an INT96 Spark timestamp to 64-bit timestamp - * - * @param[in,out] s Page state input/output - * @param[out] sb Page state buffer output - * @param[in] src_pos Source position - * @param[out] dst Pointer to row output data - */ -template -inline __device__ void gpuOutputInt96Timestamp(page_state_s* s, - state_buf* sb, - int src_pos, - int64_t* dst) -{ - using cuda::std::chrono::duration_cast; - - uint8_t const* src8; - uint32_t dict_pos, dict_size = s->dict_size, ofs; - - if (s->dict_base) { - // Dictionary - dict_pos = - (s->dict_bits > 0) ? sb->dict_idx[rolling_index(src_pos)] : 0; - src8 = s->dict_base; - } else { - // Plain - dict_pos = src_pos; - src8 = s->data_start; - } - dict_pos *= (uint32_t)s->dtype_len_in; - ofs = 3 & reinterpret_cast(src8); - src8 -= ofs; // align to 32-bit boundary - ofs <<= 3; // bytes -> bits - - if (dict_pos + 4 >= dict_size) { - *dst = 0; - return; - } - - uint3 v; - int64_t nanos, days; - v.x = *reinterpret_cast(src8 + dict_pos + 0); - v.y = *reinterpret_cast(src8 + dict_pos + 4); - v.z = *reinterpret_cast(src8 + dict_pos + 8); - if (ofs) { - uint32_t next = *reinterpret_cast(src8 + dict_pos + 12); - v.x = __funnelshift_r(v.x, v.y, ofs); - v.y = __funnelshift_r(v.y, v.z, ofs); - v.z = __funnelshift_r(v.z, next, ofs); - } - nanos = v.y; - nanos <<= 32; - nanos |= v.x; - // Convert from Julian day at noon to UTC seconds - days = static_cast(v.z); - cudf::duration_D d_d{ - days - 2440588}; // TBD: Should be noon instead of midnight, but this matches pyarrow - - *dst = [&]() { - switch (s->col.ts_clock_rate) { - case 1: // seconds - return duration_cast(d_d).count() + - duration_cast(duration_ns{nanos}).count(); - case 1'000: // milliseconds - return duration_cast(d_d).count() + - duration_cast(duration_ns{nanos}).count(); - case 1'000'000: // microseconds - return duration_cast(d_d).count() + - duration_cast(duration_ns{nanos}).count(); - case 1'000'000'000: // nanoseconds - default: return duration_cast(d_d).count() + nanos; - } - }(); -} - /** * @brief Output a 64-bit timestamp * diff --git a/cpp/src/io/parquet/page_decode.cuh b/cpp/src/io/parquet/page_decode.cuh index b1f8e6dd5fe..79e84efe9cc 100644 --- a/cpp/src/io/parquet/page_decode.cuh +++ b/cpp/src/io/parquet/page_decode.cuh @@ -1176,7 +1176,6 @@ inline __device__ bool setupLocalPageInfo(page_state_s* const s, } [[fallthrough]]; case DOUBLE: s->dtype_len = 8; break; - case INT96: s->dtype_len = 12; break; case BYTE_ARRAY: if (is_decimal) { auto const decimal_precision = s->col.logical_type->precision(); @@ -1227,8 +1226,6 @@ inline __device__ bool setupLocalPageInfo(page_state_s* const s, } } else if (data_type == BYTE_ARRAY && s->col.is_strings_to_cat) { s->dtype_len = 4; // HASH32 output - } else if (data_type == INT96) { - s->dtype_len = 8; // Convert to 64-bit timestamp } // during the decoding step we need to offset the global output buffers diff --git a/cpp/src/io/parquet/page_enc.cu b/cpp/src/io/parquet/page_enc.cu index e9558735929..13f9cf2f126 100644 --- a/cpp/src/io/parquet/page_enc.cu +++ b/cpp/src/io/parquet/page_enc.cu @@ -115,7 +115,6 @@ constexpr uint32_t physical_type_len(Type physical_type, type_id id, int type_le return id == type_id::DECIMAL128 ? sizeof(__int128_t) : type_length; } switch (physical_type) { - case INT96: return 12u; case INT64: case DOUBLE: return sizeof(int64_t); case BOOLEAN: return 1u; @@ -545,7 +544,6 @@ __device__ size_t delta_data_len(Type physical_type, auto const dtype_len_out = physical_type_len(physical_type, type_id, sizeof(int32_t)); auto const dtype_len = [&]() -> uint32_t { if (physical_type == INT32) { return int32_logical_len(type_id); } - if (physical_type == INT96) { return sizeof(int64_t); } return dtype_len_out; }(); @@ -1324,27 +1322,6 @@ constexpr auto julian_calendar_epoch_diff() return sys_days{January / 1 / 1970} - (sys_days{November / 24 / -4713} + 12h); } -/** - * @brief Converts number `v` of periods of type `PeriodT` into a pair with nanoseconds since - * midnight and number of Julian days. Does not deal with time zones. Used by INT96 code. - * - * @tparam PeriodT a ratio representing the tick period in duration - * @param v count of ticks since epoch - * @return A pair of (nanoseconds, days) where nanoseconds is the number of nanoseconds - * elapsed in the day and days is the number of days from Julian epoch. - */ -template -__device__ auto julian_days_with_time(int64_t v) -{ - using namespace cuda::std::chrono; - auto const dur_total = duration{v}; - auto const dur_days = floor(dur_total); - auto const dur_time_of_day = dur_total - dur_days; - auto const dur_time_of_day_nanos = duration_cast(dur_time_of_day); - auto const julian_days = dur_days + ceil(julian_calendar_epoch_diff()); - return std::make_pair(dur_time_of_day_nanos, julian_days); -} - // this has been split out into its own kernel because of the amount of shared memory required // for the state buffer. encode kernels that don't use the RLE buffer can get started while // the level data is encoded. @@ -1666,7 +1643,6 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8) auto const dtype_len_out = physical_type_len(physical_type, type_id, s->col.type_length); auto const dtype_len_in = [&]() -> uint32_t { if (physical_type == INT32) { return int32_logical_len(type_id); } - if (physical_type == INT96) { return sizeof(int64_t); } return dtype_len_out; }(); @@ -1770,40 +1746,6 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8) } encode_value(dst + pos, v, stride); } break; - case INT96: { - // only PLAIN encoding is supported - int64_t v = s->col.leaf_column->element(val_idx); - int32_t ts_scale = s->col.ts_scale; - if (ts_scale != 0) { - if (ts_scale < 0) { - v /= -ts_scale; - } else { - v *= ts_scale; - } - } - - auto const [last_day_nanos, julian_days] = [&] { - using namespace cuda::std::chrono; - switch (s->col.leaf_column->type().id()) { - case type_id::TIMESTAMP_SECONDS: - case type_id::TIMESTAMP_MILLISECONDS: { - return julian_days_with_time(v); - } break; - case type_id::TIMESTAMP_MICROSECONDS: - case type_id::TIMESTAMP_NANOSECONDS: { - return julian_days_with_time(v); - } break; - } - return julian_days_with_time(0); - }(); - - // the 12 bytes of fixed length data. - v = last_day_nanos.count(); - encode_value(dst + pos, v, 1); - uint32_t w = julian_days.count(); - encode_value(dst + pos + 8, w, 1); - } break; - case BYTE_ARRAY: { // only PLAIN encoding is supported auto const bytes = [](cudf::type_id const type_id, @@ -1901,7 +1843,6 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8) auto const dtype_len_out = physical_type_len(physical_type, type_id, s->col.type_length); auto const dtype_len_in = [&]() -> uint32_t { if (physical_type == INT32) { return int32_logical_len(type_id); } - if (physical_type == INT96) { return sizeof(int64_t); } return dtype_len_out; }(); @@ -2033,7 +1974,6 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8) auto const dtype_len_out = physical_type_len(physical_type, type_id, s->col.type_length); auto const dtype_len_in = [&]() -> uint32_t { if (physical_type == INT32) { return int32_logical_len(type_id); } - if (physical_type == INT96) { return sizeof(int64_t); } return dtype_len_out; }(); diff --git a/cpp/src/io/parquet/parquet_common.hpp b/cpp/src/io/parquet/parquet_common.hpp index 8507eca047e..d2535cbf90f 100644 --- a/cpp/src/io/parquet/parquet_common.hpp +++ b/cpp/src/io/parquet/parquet_common.hpp @@ -34,7 +34,6 @@ enum Type : int8_t { BOOLEAN = 0, INT32 = 1, INT64 = 2, - INT96 = 3, // Deprecated FLOAT = 4, DOUBLE = 5, BYTE_ARRAY = 6, diff --git a/cpp/src/io/parquet/predicate_pushdown.cpp b/cpp/src/io/parquet/predicate_pushdown.cpp index 0109be661a7..c23530e9b9a 100644 --- a/cpp/src/io/parquet/predicate_pushdown.cpp +++ b/cpp/src/io/parquet/predicate_pushdown.cpp @@ -86,10 +86,6 @@ struct stats_caster { switch (type) { case INT32: return targetType(*reinterpret_cast(stats_val)); case INT64: return targetType(*reinterpret_cast(stats_val)); - case INT96: // Deprecated in parquet specification - return targetType(static_cast<__int128_t>(reinterpret_cast(stats_val)[0]) - << 32 | - reinterpret_cast(stats_val)[2]); case BYTE_ARRAY: [[fallthrough]]; case FIXED_LEN_BYTE_ARRAY: if (stats_size == sizeof(T)) { diff --git a/cpp/src/io/parquet/reader_impl_helpers.cpp b/cpp/src/io/parquet/reader_impl_helpers.cpp index eb653c6b9ac..79374746563 100644 --- a/cpp/src/io/parquet/reader_impl_helpers.cpp +++ b/cpp/src/io/parquet/reader_impl_helpers.cpp @@ -190,9 +190,6 @@ type_id to_type_id(SchemaElement const& schema, if (strings_to_categorical) { return type_id::INT32; } [[fallthrough]]; case FIXED_LEN_BYTE_ARRAY: return type_id::STRING; - case INT96: - return (timestamp_type_id != type_id::EMPTY) ? timestamp_type_id - : type_id::TIMESTAMP_NANOSECONDS; default: break; } diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu index 1dfced94f5b..a2ecd9c3b27 100644 --- a/cpp/src/io/parquet/writer_impl.cu +++ b/cpp/src/io/parquet/writer_impl.cu @@ -338,7 +338,6 @@ struct leaf_schema_fn { schema_tree_node& col_schema; cudf::detail::LinkedColPtr const& col; column_in_metadata const& col_meta; - bool timestamp_is_int96; bool timestamp_is_utc; template @@ -461,50 +460,42 @@ struct leaf_schema_fn { template std::enable_if_t, void> operator()() { - col_schema.type = (timestamp_is_int96) ? Type::INT96 : Type::INT64; + col_schema.type = Type::INT64; col_schema.stats_dtype = statistics_dtype::dtype_timestamp64; col_schema.ts_scale = 1000; - if (not timestamp_is_int96) { - col_schema.converted_type = ConvertedType::TIMESTAMP_MILLIS; - col_schema.logical_type = LogicalType{TimestampType{timestamp_is_utc, TimeUnit::MILLIS}}; - } + + col_schema.converted_type = ConvertedType::TIMESTAMP_MILLIS; + col_schema.logical_type = LogicalType{TimestampType{timestamp_is_utc, TimeUnit::MILLIS}}; } template std::enable_if_t, void> operator()() { - col_schema.type = (timestamp_is_int96) ? Type::INT96 : Type::INT64; + col_schema.type = Type::INT64; col_schema.stats_dtype = statistics_dtype::dtype_timestamp64; - if (not timestamp_is_int96) { - col_schema.converted_type = ConvertedType::TIMESTAMP_MILLIS; - col_schema.logical_type = LogicalType{TimestampType{timestamp_is_utc, TimeUnit::MILLIS}}; - } + + col_schema.converted_type = ConvertedType::TIMESTAMP_MILLIS; + col_schema.logical_type = LogicalType{TimestampType{timestamp_is_utc, TimeUnit::MILLIS}}; } template std::enable_if_t, void> operator()() { - col_schema.type = (timestamp_is_int96) ? Type::INT96 : Type::INT64; + col_schema.type = Type::INT64; col_schema.stats_dtype = statistics_dtype::dtype_timestamp64; - if (not timestamp_is_int96) { - col_schema.converted_type = ConvertedType::TIMESTAMP_MICROS; - col_schema.logical_type = LogicalType{TimestampType{timestamp_is_utc, TimeUnit::MICROS}}; - } + + col_schema.converted_type = ConvertedType::TIMESTAMP_MICROS; + col_schema.logical_type = LogicalType{TimestampType{timestamp_is_utc, TimeUnit::MICROS}}; } template std::enable_if_t, void> operator()() { - col_schema.type = (timestamp_is_int96) ? Type::INT96 : Type::INT64; + col_schema.type = Type::INT64; col_schema.converted_type = thrust::nullopt; col_schema.stats_dtype = statistics_dtype::dtype_timestamp64; - if (timestamp_is_int96) { - col_schema.ts_scale = -1000; // negative value indicates division by absolute value - } - // set logical type if it's not int96 - else { - col_schema.logical_type = LogicalType{TimestampType{timestamp_is_utc, TimeUnit::NANOS}}; - } + + col_schema.logical_type = LogicalType{TimestampType{timestamp_is_utc, TimeUnit::NANOS}}; } // unsupported outside cudf for parquet 1.0. @@ -629,7 +620,6 @@ std::vector construct_schema_tree( cudf::detail::LinkedColVector const& linked_columns, table_input_metadata& metadata, single_write_mode write_mode, - bool int96_timestamps, bool utc_timestamps) { std::vector schema; @@ -716,12 +706,6 @@ std::vector construct_schema_tree( "requested encoding will be ignored"); return; } - if (s.type == Type::INT96) { - CUDF_LOG_WARN( - "BYTE_STREAM_SPLIT encoding is not supported for INT96 columns; the " - "requested encoding will be ignored"); - return; - } break; // supported parquet encodings @@ -896,11 +880,8 @@ std::vector construct_schema_tree( schema_tree_node col_schema{}; - bool timestamp_is_int96 = int96_timestamps or col_meta.is_enabled_int96_timestamps(); - - cudf::type_dispatcher( - col->type(), - leaf_schema_fn{col_schema, col, col_meta, timestamp_is_int96, utc_timestamps}); + cudf::type_dispatcher(col->type(), + leaf_schema_fn{col_schema, col, col_meta, utc_timestamps}); col_schema.repetition_type = col_nullable ? OPTIONAL : REQUIRED; col_schema.name = (schema[parent_idx].name == "list") ? "element" : col_meta.get_name(); @@ -1153,19 +1134,17 @@ void calculate_page_fragments(device_span frag, * * @param frag_stats output statistics * @param frags Input page fragments - * @param int96_timestamps Flag to indicate if timestamps will be written as INT96 * @param stream CUDA stream used for device memory operations and kernel launches */ void gather_fragment_statistics(device_span frag_stats, device_span frags, - bool int96_timestamps, rmm::cuda_stream_view stream) { rmm::device_uvector frag_stats_group(frag_stats.size(), stream); InitFragmentStatistics(frag_stats_group, frags, stream); detail::calculate_group_statistics( - frag_stats.data(), frag_stats_group.data(), frag_stats.size(), stream, int96_timestamps); + frag_stats.data(), frag_stats_group.data(), frag_stats.size(), stream); stream.synchronize(); } @@ -1675,7 +1654,6 @@ void fill_table_meta(std::unique_ptr const& table_meta) * @param dict_policy Policy for dictionary use * @param max_dictionary_size Maximum dictionary size, in bytes * @param single_write_mode Flag to indicate that we are guaranteeing a single table write - * @param int96_timestamps Flag to indicate if timestamps will be written as INT96 * @param utc_timestamps Flag to indicate if timestamps are UTC * @param write_v2_headers True if V2 page headers are to be written * @param out_sink Sink for checking if device write is supported, should not be used to write any @@ -1700,15 +1678,13 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta, dictionary_policy dict_policy, size_t max_dictionary_size, single_write_mode write_mode, - bool int96_timestamps, bool utc_timestamps, bool write_v2_headers, host_span const> out_sink, rmm::cuda_stream_view stream) { - auto vec = table_to_linked_columns(input); - auto schema_tree = - construct_schema_tree(vec, table_meta, write_mode, int96_timestamps, utc_timestamps); + auto vec = table_to_linked_columns(input); + auto schema_tree = construct_schema_tree(vec, table_meta, write_mode, utc_timestamps); // Construct parquet_column_views from the schema tree leaf nodes. std::vector parquet_columns; @@ -2002,10 +1978,8 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta, // and gather fragment statistics if (not frag_stats.is_empty()) { - gather_fragment_statistics(frag_stats, - {page_fragments.device_ptr(), static_cast(total_frags)}, - int96_timestamps, - stream); + gather_fragment_statistics( + frag_stats, {page_fragments.device_ptr(), static_cast(total_frags)}, stream); } } @@ -2309,7 +2283,6 @@ writer::impl::impl(std::vector> sinks, _dict_policy(options.get_dictionary_policy()), _max_dictionary_size(options.get_max_dictionary_size()), _max_page_fragment_size(options.get_max_page_fragment_size()), - _int96_timestamps(options.is_enabled_int96_timestamps()), _utc_timestamps(options.is_enabled_utc_timestamps()), _write_v2_headers(options.is_enabled_write_v2_headers()), _sorting_columns(options.get_sorting_columns()), @@ -2339,7 +2312,6 @@ writer::impl::impl(std::vector> sinks, _dict_policy(options.get_dictionary_policy()), _max_dictionary_size(options.get_max_dictionary_size()), _max_page_fragment_size(options.get_max_page_fragment_size()), - _int96_timestamps(options.is_enabled_int96_timestamps()), _utc_timestamps(options.is_enabled_utc_timestamps()), _write_v2_headers(options.is_enabled_write_v2_headers()), _sorting_columns(options.get_sorting_columns()), @@ -2417,7 +2389,6 @@ void writer::impl::write(table_view const& input, std::vector co _dict_policy, _max_dictionary_size, _single_write_mode, - _int96_timestamps, _utc_timestamps, _write_v2_headers, _out_sink, diff --git a/cpp/src/io/parquet/writer_impl.hpp b/cpp/src/io/parquet/writer_impl.hpp index 784f78f06d5..12273b9fa4d 100644 --- a/cpp/src/io/parquet/writer_impl.hpp +++ b/cpp/src/io/parquet/writer_impl.hpp @@ -153,7 +153,6 @@ class writer::impl { dictionary_policy const _dict_policy; size_t const _max_dictionary_size; std::optional const _max_page_fragment_size; - bool const _int96_timestamps; bool const _utc_timestamps; bool const _write_v2_headers; std::optional> _sorting_columns; diff --git a/cpp/src/io/statistics/column_statistics.cuh b/cpp/src/io/statistics/column_statistics.cuh index b2cabe24a50..57167265746 100644 --- a/cpp/src/io/statistics/column_statistics.cuh +++ b/cpp/src/io/statistics/column_statistics.cuh @@ -56,9 +56,7 @@ using block_reduce_storage = detail::block_reduce_storage; * @tparam block_size Dimension of the block * @tparam IO File format for which statistics calculation is being done */ -template +template struct calculate_group_statistics_functor { block_reduce_storage& temp_storage; @@ -99,16 +97,9 @@ struct calculate_group_statistics_functor { !std::is_same_v)>* = nullptr> __device__ void operator()(stats_state_s& s, uint32_t t) { - // Temporarily disable stats writing for int96 timestamps - // TODO: https://github.com/rapidsai/cudf/issues/10438 - if constexpr (cudf::is_timestamp() and IO == detail::io_file_format::PARQUET and - INT96 == detail::is_int96_timestamp::YES) { - return; - } - detail::storage_wrapper storage(temp_storage); - using type_convert = detail::type_conversion>; + using type_convert = detail::type_conversion>; using CT = typename type_convert::template type; typed_statistics_chunk::include_aggregate> chunk; @@ -288,9 +279,7 @@ __device__ void cooperative_load(T& destination, T const* source = nullptr) */ template CUDF_KERNEL void __launch_bounds__(block_size, 1) - gpu_calculate_group_statistics(statistics_chunk* chunks, - statistics_group const* groups, - bool const int96_timestamps) + gpu_calculate_group_statistics(statistics_chunk* chunks, statistics_group const* groups) { __shared__ __align__(8) stats_state_s state; __shared__ block_reduce_storage storage; @@ -305,23 +294,10 @@ CUDF_KERNEL void __launch_bounds__(block_size, 1) // Calculate statistics if constexpr (IO == detail::io_file_format::PARQUET) { // Do not convert ns to us for int64 timestamps - if (not int96_timestamps) { - type_dispatcher( - state.col.leaf_column->type(), - calculate_group_statistics_functor(storage), - state, - threadIdx.x); - } - // Temporarily disable stats writing for int96 timestamps - // TODO: https://github.com/rapidsai/cudf/issues/10438 - else { - type_dispatcher( - state.col.leaf_column->type(), - calculate_group_statistics_functor( - storage), - state, - threadIdx.x); - } + type_dispatcher(state.col.leaf_column->type(), + calculate_group_statistics_functor(storage), + state, + threadIdx.x); } else { type_dispatcher(state.col.leaf_column->type(), calculate_group_statistics_functor(storage), @@ -348,12 +324,11 @@ template void calculate_group_statistics(statistics_chunk* chunks, statistics_group const* groups, uint32_t num_chunks, - rmm::cuda_stream_view stream, - bool const int96_timestamps = false) + rmm::cuda_stream_view stream) { constexpr int block_size = 256; gpu_calculate_group_statistics - <<>>(chunks, groups, int96_timestamps); + <<>>(chunks, groups); } /** diff --git a/cpp/src/io/statistics/orc_column_statistics.cu b/cpp/src/io/statistics/orc_column_statistics.cu index f3356b3a331..9595cc36214 100644 --- a/cpp/src/io/statistics/orc_column_statistics.cu +++ b/cpp/src/io/statistics/orc_column_statistics.cu @@ -35,8 +35,7 @@ template <> void calculate_group_statistics(statistics_chunk* chunks, statistics_group const* groups, uint32_t num_chunks, - rmm::cuda_stream_view stream, - bool int96_timestamp); + rmm::cuda_stream_view stream); } // namespace detail } // namespace io diff --git a/cpp/src/io/statistics/parquet_column_statistics.cu b/cpp/src/io/statistics/parquet_column_statistics.cu index 091f08d8fee..97ae2115f4b 100644 --- a/cpp/src/io/statistics/parquet_column_statistics.cu +++ b/cpp/src/io/statistics/parquet_column_statistics.cu @@ -35,8 +35,7 @@ template <> void calculate_group_statistics(statistics_chunk* chunks, statistics_group const* groups, uint32_t num_chunks, - rmm::cuda_stream_view stream, - bool int96_timestamp); + rmm::cuda_stream_view stream); } // namespace detail } // namespace io diff --git a/cpp/src/io/statistics/statistics_type_identification.cuh b/cpp/src/io/statistics/statistics_type_identification.cuh index 5e11646be6b..ca10e0459ed 100644 --- a/cpp/src/io/statistics/statistics_type_identification.cuh +++ b/cpp/src/io/statistics/statistics_type_identification.cuh @@ -39,14 +39,13 @@ namespace detail { using cudf::io::statistics::byte_array_view; enum class io_file_format { ORC, PARQUET }; -enum class is_int96_timestamp { YES, NO }; -template +template struct conversion_map; // Every timestamp or duration type is converted to nanoseconds in ORC statistics -template -struct conversion_map { +template <> +struct conversion_map { using types = std::tuple, std::pair, std::pair, @@ -55,19 +54,9 @@ struct conversion_map { std::pair>; }; -// In Parquet timestamps and durations with second resolution are converted to -// milliseconds. Timestamps and durations with nanosecond resolution are -// converted to microseconds. -template <> -struct conversion_map { - using types = std::tuple, - std::pair, - std::pair, - std::pair>; -}; // int64 nanosecond timestamp won't be converted template <> -struct conversion_map { +struct conversion_map { using types = std::tuple, std::pair>; }; diff --git a/cpp/tests/io/parquet_writer_test.cpp b/cpp/tests/io/parquet_writer_test.cpp index ad0860e265e..9148a5a00e1 100644 --- a/cpp/tests/io/parquet_writer_test.cpp +++ b/cpp/tests/io/parquet_writer_test.cpp @@ -1391,30 +1391,6 @@ TEST_F(ParquetWriterTest, NoNullsAsNonNullable) EXPECT_NO_THROW(cudf::io::write_parquet(out_opts)); } -TEST_F(ParquetWriterTest, TimestampMicrosINT96NoOverflow) -{ - using namespace cuda::std::chrono; - using namespace cudf::io; - - column_wrapper big_ts_col{ - sys_days{year{3023} / month{7} / day{14}} + 7h + 38min + 45s + 418688us, - sys_days{year{723} / month{3} / day{21}} + 14h + 20min + 13s + microseconds{781ms}}; - - table_view expected({big_ts_col}); - auto filepath = temp_env->get_temp_filepath("BigINT96Timestamp.parquet"); - - auto const out_opts = - parquet_writer_options::builder(sink_info{filepath}, expected).int96_timestamps(true).build(); - write_parquet(out_opts); - - auto const in_opts = parquet_reader_options::builder(source_info(filepath)) - .timestamp_type(cudf::data_type(cudf::type_id::TIMESTAMP_MICROSECONDS)) - .build(); - auto const result = read_parquet(in_opts); - - CUDF_TEST_EXPECT_TABLES_EQUAL(expected, result.tbl->view()); -} - TEST_F(ParquetWriterTest, PreserveNullability) { constexpr auto num_rows = 100; diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx index f0eef9be124..5184e0f9063 100644 --- a/python/cudf/cudf/_lib/parquet.pyx +++ b/python/cudf/cudf/_lib/parquet.pyx @@ -398,7 +398,6 @@ def write_parquet( object compression="snappy", object statistics="ROWGROUP", object metadata_file_path=None, - object int96_timestamps=False, object row_group_size_bytes=_ROW_GROUP_SIZE_BYTES_DEFAULT, object row_group_size_rows=None, object max_page_size_bytes=None, @@ -502,7 +501,6 @@ def write_parquet( cdef unique_ptr[vector[uint8_t]] out_metadata_c cdef vector[string] c_column_chunks_file_paths - cdef bool _int96_timestamps = int96_timestamps cdef vector[cudf_io_types.partition_info] partitions # Perform write @@ -512,7 +510,6 @@ def write_parquet( .key_value_metadata(move(user_data)) .compression(comp_type) .stats_level(stat_freq) - .int96_timestamps(_int96_timestamps) .write_v2_headers(header_version == "2.0") .dictionary_policy(dict_policy) .utc_timestamps(False) diff --git a/python/cudf/cudf/_lib/pylibcudf/libcudf/io/parquet.pxd b/python/cudf/cudf/_lib/pylibcudf/libcudf/io/parquet.pxd index 33a594b432f..5fd72eee311 100644 --- a/python/cudf/cudf/_lib/pylibcudf/libcudf/io/parquet.pxd +++ b/python/cudf/cudf/_lib/pylibcudf/libcudf/io/parquet.pxd @@ -99,9 +99,6 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil: void set_column_chunks_file_paths( vector[string] column_chunks_file_paths ) except + - void set_int96_timestamps( - bool enabled - ) except + void set_utc_timestamps( bool enabled ) except + @@ -144,9 +141,6 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil: parquet_writer_options_builder& column_chunks_file_paths( vector[string] column_chunks_file_paths ) except + - parquet_writer_options_builder& int96_timestamps( - bool enabled - ) except + parquet_writer_options_builder& utc_timestamps( bool enabled ) except + @@ -203,9 +197,6 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil: void set_compression( cudf_io_types.compression_type compression ) except + - void set_int96_timestamps( - bool enabled - ) except + void set_utc_timestamps( bool enabled ) except + @@ -239,9 +230,6 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil: chunked_parquet_writer_options_builder& compression( cudf_io_types.compression_type compression ) except + - chunked_parquet_writer_options_builder& int96_timestamps( - bool enabled - ) except + chunked_parquet_writer_options_builder& utc_timestamps( bool enabled ) except + diff --git a/python/cudf/cudf/_lib/pylibcudf/libcudf/io/types.pxd b/python/cudf/cudf/_lib/pylibcudf/libcudf/io/types.pxd index 38fae1df1e5..2e3e3ddf2f8 100644 --- a/python/cudf/cudf/_lib/pylibcudf/libcudf/io/types.pxd +++ b/python/cudf/cudf/_lib/pylibcudf/libcudf/io/types.pxd @@ -90,7 +90,6 @@ cdef extern from "cudf/io/types.hpp" \ column_in_metadata& set_name(const string& name) column_in_metadata& set_nullability(bool nullable) column_in_metadata& set_list_column_as_map() - column_in_metadata& set_int96_timestamps(bool req) column_in_metadata& set_decimal_precision(uint8_t precision) column_in_metadata& child(size_type i) column_in_metadata& set_output_as_binary(bool binary) diff --git a/python/cudf/cudf/core/dataframe.py b/python/cudf/cudf/core/dataframe.py index acfc2d781a7..c56eb4618df 100644 --- a/python/cudf/cudf/core/dataframe.py +++ b/python/cudf/cudf/core/dataframe.py @@ -6697,7 +6697,6 @@ def to_parquet( partition_offsets=None, statistics="ROWGROUP", metadata_file_path=None, - int96_timestamps=False, row_group_size_bytes=ioutils._ROW_GROUP_SIZE_BYTES_DEFAULT, row_group_size_rows=None, max_page_size_bytes=None, @@ -6727,7 +6726,6 @@ def to_parquet( partition_offsets=partition_offsets, statistics=statistics, metadata_file_path=metadata_file_path, - int96_timestamps=int96_timestamps, row_group_size_bytes=row_group_size_bytes, row_group_size_rows=row_group_size_rows, max_page_size_bytes=max_page_size_bytes, diff --git a/python/cudf/cudf/io/parquet.py b/python/cudf/cudf/io/parquet.py index dbdb2093b72..44666489a74 100644 --- a/python/cudf/cudf/io/parquet.py +++ b/python/cudf/cudf/io/parquet.py @@ -58,7 +58,6 @@ def _write_parquet( index=None, statistics="ROWGROUP", metadata_file_path=None, - int96_timestamps=False, row_group_size_bytes=ioutils._ROW_GROUP_SIZE_BYTES_DEFAULT, row_group_size_rows=None, max_page_size_bytes=None, @@ -96,7 +95,6 @@ def _write_parquet( "compression": compression, "statistics": statistics, "metadata_file_path": metadata_file_path, - "int96_timestamps": int96_timestamps, "row_group_size_bytes": row_group_size_bytes, "row_group_size_rows": row_group_size_rows, "max_page_size_bytes": max_page_size_bytes, @@ -141,7 +139,6 @@ def write_to_dataset( preserve_index=False, return_metadata=False, statistics="ROWGROUP", - int96_timestamps=False, row_group_size_bytes=ioutils._ROW_GROUP_SIZE_BYTES_DEFAULT, row_group_size_rows=None, max_page_size_bytes=None, @@ -189,12 +186,6 @@ def write_to_dataset( return_metadata : bool, default False Return parquet metadata for written data. Returned metadata will include the file-path metadata (relative to `root_path`). - int96_timestamps : bool, default False - If ``True``, write timestamps in int96 format. This will convert - timestamps from timestamp[ns], timestamp[ms], timestamp[s], and - timestamp[us] to the int96 format, which is the number of Julian - days and the number of nanoseconds since midnight of 1970-01-01. - If ``False``, timestamps will not be altered. row_group_size_bytes: integer or None, default None Maximum size of each stripe of the output. If None, 134217728 (128MB) will be used. @@ -273,7 +264,6 @@ def write_to_dataset( storage_options=storage_options, metadata_file_path=metadata_file_path, statistics=statistics, - int96_timestamps=int96_timestamps, row_group_size_bytes=row_group_size_bytes, row_group_size_rows=row_group_size_rows, max_page_size_bytes=max_page_size_bytes, @@ -300,7 +290,6 @@ def write_to_dataset( storage_options=storage_options, metadata_file_path=metadata_file_path, statistics=statistics, - int96_timestamps=int96_timestamps, row_group_size_bytes=row_group_size_bytes, row_group_size_rows=row_group_size_rows, max_page_size_bytes=max_page_size_bytes, @@ -945,7 +934,6 @@ def to_parquet( partition_offsets=None, statistics="ROWGROUP", metadata_file_path=None, - int96_timestamps=False, row_group_size_bytes=ioutils._ROW_GROUP_SIZE_BYTES_DEFAULT, row_group_size_rows=None, max_page_size_bytes=None, @@ -1001,7 +989,6 @@ def to_parquet( preserve_index=index, compression=compression, statistics=statistics, - int96_timestamps=int96_timestamps, row_group_size_bytes=row_group_size_bytes, row_group_size_rows=row_group_size_rows, max_page_size_bytes=max_page_size_bytes, @@ -1032,7 +1019,6 @@ def to_parquet( index=index, statistics=statistics, metadata_file_path=metadata_file_path, - int96_timestamps=int96_timestamps, row_group_size_bytes=row_group_size_bytes, row_group_size_rows=row_group_size_rows, max_page_size_bytes=max_page_size_bytes, diff --git a/python/cudf/cudf/tests/test_parquet.py b/python/cudf/cudf/tests/test_parquet.py index e32fdacd8d6..8232b42e3c6 100644 --- a/python/cudf/cudf/tests/test_parquet.py +++ b/python/cudf/cudf/tests/test_parquet.py @@ -1604,33 +1604,6 @@ def clone_field(table, name, datatype): assert_eq(expect, got) -@pytest.mark.filterwarnings("ignore:Using CPU") -def test_parquet_writer_int96_timestamps(tmpdir, pdf, gdf): - gdf_fname = tmpdir.join("gdf.parquet") - - if len(pdf) == 0: - pdf = pdf.reset_index(drop=True) - gdf = gdf.reset_index(drop=True) - - if "col_category" in pdf.columns: - pdf = pdf.drop(columns=["col_category"]) - if "col_category" in gdf.columns: - gdf = gdf.drop(columns=["col_category"]) - - assert_eq(pdf, gdf) - - # Write out the gdf using the GPU accelerated writer with INT96 timestamps - gdf.to_parquet(gdf_fname.strpath, index=None, int96_timestamps=True) - - assert os.path.exists(gdf_fname) - - expect = pdf - got = pd.read_parquet(gdf_fname) - - # verify INT96 timestamps were converted back to the same data. - assert_eq(expect, got, check_categorical=False, check_dtype=False) - - def test_multifile_parquet_folder(tmpdir): test_pdf1 = make_pdf(nrows=10, nvalids=10 // 2, dtype="float64") test_pdf2 = make_pdf(nrows=20, dtype="float64") diff --git a/python/cudf/cudf/utils/ioutils.py b/python/cudf/cudf/utils/ioutils.py index 0209c692935..50e0efc0ccf 100644 --- a/python/cudf/cudf/utils/ioutils.py +++ b/python/cudf/cudf/utils/ioutils.py @@ -259,12 +259,6 @@ metadata of the written parquet file. The returned blob will have the ``chunk.file_path`` field set to the ``metadata_file_path`` for each chunk. When using with ``partition_offsets``, should be same size as ``len(path)`` -int96_timestamps : bool, default False - If ``True``, write timestamps in int96 format. This will convert - timestamps from timestamp[ns], timestamp[ms], timestamp[s], and - timestamp[us] to the int96 format, which is the number of Julian - days and the number of nanoseconds since midnight of 1970-01-01. - If ``False``, timestamps will not be altered. row_group_size_bytes: integer, default {row_group_size_bytes_val} Maximum size of each stripe of the output. If None, {row_group_size_bytes_val} diff --git a/python/dask_cudf/dask_cudf/io/parquet.py b/python/dask_cudf/dask_cudf/io/parquet.py index fc962670c47..cec620d9c77 100644 --- a/python/dask_cudf/dask_cudf/io/parquet.py +++ b/python/dask_cudf/dask_cudf/io/parquet.py @@ -354,7 +354,6 @@ def write_partition( preserve_index=preserve_index, return_metadata=return_metadata, statistics=kwargs.get("statistics", "ROWGROUP"), - int96_timestamps=kwargs.get("int96_timestamps", False), row_group_size_bytes=kwargs.get( "row_group_size_bytes", _ROW_GROUP_SIZE_BYTES_DEFAULT ), @@ -377,7 +376,6 @@ def write_partition( ), partition_offsets=kwargs.get("partition_offsets", None), statistics=kwargs.get("statistics", "ROWGROUP"), - int96_timestamps=kwargs.get("int96_timestamps", False), row_group_size_bytes=kwargs.get( "row_group_size_bytes", _ROW_GROUP_SIZE_BYTES_DEFAULT ), From 38bd46fb24a3d105f64062bbfdef4f5604d6863c Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb Date: Fri, 31 May 2024 19:20:47 +0000 Subject: [PATCH 02/12] update copyrights --- cpp/src/io/statistics/orc_column_statistics.cu | 2 +- cpp/src/io/statistics/parquet_column_statistics.cu | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/io/statistics/orc_column_statistics.cu b/cpp/src/io/statistics/orc_column_statistics.cu index 9595cc36214..62a9f2003b1 100644 --- a/cpp/src/io/statistics/orc_column_statistics.cu +++ b/cpp/src/io/statistics/orc_column_statistics.cu @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021-2023, NVIDIA CORPORATION. + * Copyright (c) 2021-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/cpp/src/io/statistics/parquet_column_statistics.cu b/cpp/src/io/statistics/parquet_column_statistics.cu index 97ae2115f4b..a87cd882f23 100644 --- a/cpp/src/io/statistics/parquet_column_statistics.cu +++ b/cpp/src/io/statistics/parquet_column_statistics.cu @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021-2023, NVIDIA CORPORATION. + * Copyright (c) 2021-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. From 68f3db51687e0441efc822753e46ba11102fbc3c Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb Date: Fri, 31 May 2024 22:58:32 +0000 Subject: [PATCH 03/12] Revert reading int96 timestamps --- cpp/include/cudf/io/parquet_metadata.hpp | 1 + cpp/src/io/parquet/decode_fixed.cu | 33 +++++----- cpp/src/io/parquet/page_data.cuh | 75 ++++++++++++++++++++++ cpp/src/io/parquet/page_decode.cuh | 3 + cpp/src/io/parquet/parquet_common.hpp | 1 + cpp/src/io/parquet/reader_impl_helpers.cpp | 3 + 6 files changed, 101 insertions(+), 15 deletions(-) diff --git a/cpp/include/cudf/io/parquet_metadata.hpp b/cpp/include/cudf/io/parquet_metadata.hpp index 99eda1df404..e0c406c180c 100644 --- a/cpp/include/cudf/io/parquet_metadata.hpp +++ b/cpp/include/cudf/io/parquet_metadata.hpp @@ -46,6 +46,7 @@ enum class TypeKind : int8_t { BOOLEAN = 0, INT32 = 1, INT64 = 2, + INT96 = 3, // Deprecated FLOAT = 4, DOUBLE = 5, BYTE_ARRAY = 6, diff --git a/cpp/src/io/parquet/decode_fixed.cu b/cpp/src/io/parquet/decode_fixed.cu index 1a256445bd1..6dc08f64ebc 100644 --- a/cpp/src/io/parquet/decode_fixed.cu +++ b/cpp/src/io/parquet/decode_fixed.cu @@ -193,26 +193,29 @@ __device__ inline void gpuDecodeValues( } break; } - } else if (dtype_len == 8) { - if (s->dtype_len_in == 4) { - // Reading INT32 TIME_MILLIS into 64-bit DURATION_MILLISECONDS - // TIME_MILLIS is the only duration type stored as int32: - // https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#deprecated-time-convertedtype - gpuOutputFast(s, sb, src_pos, static_cast(dst)); - } else if (s->ts_scale) { - gpuOutputInt64Timestamp(s, sb, src_pos, static_cast(dst)); - } else { - gpuOutputFast(s, sb, src_pos, static_cast(dst)); - } - } else if (dtype_len == 4) { + } + } else if (dtype == INT96) { + gpuOutputInt96Timestamp(s, sb, src_pos, static_cast(dst)); + } else if (dtype_len == 8) { + if (s->dtype_len_in == 4) { + // Reading INT32 TIME_MILLIS into 64-bit DURATION_MILLISECONDS + // TIME_MILLIS is the only duration type stored as int32: + // https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#deprecated-time-convertedtype gpuOutputFast(s, sb, src_pos, static_cast(dst)); + } else if (s->ts_scale) { + gpuOutputInt64Timestamp(s, sb, src_pos, static_cast(dst)); } else { - gpuOutputGeneric(s, sb, src_pos, static_cast(dst), dtype_len); + gpuOutputFast(s, sb, src_pos, static_cast(dst)); } + } else if (dtype_len == 4) { + gpuOutputFast(s, sb, src_pos, static_cast(dst)); + } else { + gpuOutputGeneric(s, sb, src_pos, static_cast(dst), dtype_len); } - - pos += batch_size; } + + pos += batch_size; +} } template diff --git a/cpp/src/io/parquet/page_data.cuh b/cpp/src/io/parquet/page_data.cuh index c94bc2009ec..f182747650e 100644 --- a/cpp/src/io/parquet/page_data.cuh +++ b/cpp/src/io/parquet/page_data.cuh @@ -122,6 +122,81 @@ inline __device__ void gpuStoreOutput(uint2* dst, *dst = v; } +/** + * @brief Convert an INT96 Spark timestamp to 64-bit timestamp + * + * @param[in,out] s Page state input/output + * @param[out] sb Page state buffer output + * @param[in] src_pos Source position + * @param[out] dst Pointer to row output data + */ +template +inline __device__ void gpuOutputInt96Timestamp(page_state_s* s, + state_buf* sb, + int src_pos, + int64_t* dst) +{ + using cuda::std::chrono::duration_cast; + + uint8_t const* src8; + uint32_t dict_pos, dict_size = s->dict_size, ofs; + + if (s->dict_base) { + // Dictionary + dict_pos = + (s->dict_bits > 0) ? sb->dict_idx[rolling_index(src_pos)] : 0; + src8 = s->dict_base; + } else { + // Plain + dict_pos = src_pos; + src8 = s->data_start; + } + dict_pos *= (uint32_t)s->dtype_len_in; + ofs = 3 & reinterpret_cast(src8); + src8 -= ofs; // align to 32-bit boundary + ofs <<= 3; // bytes -> bits + + if (dict_pos + 4 >= dict_size) { + *dst = 0; + return; + } + + uint3 v; + int64_t nanos, days; + v.x = *reinterpret_cast(src8 + dict_pos + 0); + v.y = *reinterpret_cast(src8 + dict_pos + 4); + v.z = *reinterpret_cast(src8 + dict_pos + 8); + if (ofs) { + uint32_t next = *reinterpret_cast(src8 + dict_pos + 12); + v.x = __funnelshift_r(v.x, v.y, ofs); + v.y = __funnelshift_r(v.y, v.z, ofs); + v.z = __funnelshift_r(v.z, next, ofs); + } + nanos = v.y; + nanos <<= 32; + nanos |= v.x; + // Convert from Julian day at noon to UTC seconds + days = static_cast(v.z); + cudf::duration_D d_d{ + days - 2440588}; // TBD: Should be noon instead of midnight, but this matches pyarrow + + *dst = [&]() { + switch (s->col.ts_clock_rate) { + case 1: // seconds + return duration_cast(d_d).count() + + duration_cast(duration_ns{nanos}).count(); + case 1'000: // milliseconds + return duration_cast(d_d).count() + + duration_cast(duration_ns{nanos}).count(); + case 1'000'000: // microseconds + return duration_cast(d_d).count() + + duration_cast(duration_ns{nanos}).count(); + case 1'000'000'000: // nanoseconds + default: return duration_cast(d_d).count() + nanos; + } + }(); +} + /** * @brief Output a 64-bit timestamp * diff --git a/cpp/src/io/parquet/page_decode.cuh b/cpp/src/io/parquet/page_decode.cuh index 79e84efe9cc..b1f8e6dd5fe 100644 --- a/cpp/src/io/parquet/page_decode.cuh +++ b/cpp/src/io/parquet/page_decode.cuh @@ -1176,6 +1176,7 @@ inline __device__ bool setupLocalPageInfo(page_state_s* const s, } [[fallthrough]]; case DOUBLE: s->dtype_len = 8; break; + case INT96: s->dtype_len = 12; break; case BYTE_ARRAY: if (is_decimal) { auto const decimal_precision = s->col.logical_type->precision(); @@ -1226,6 +1227,8 @@ inline __device__ bool setupLocalPageInfo(page_state_s* const s, } } else if (data_type == BYTE_ARRAY && s->col.is_strings_to_cat) { s->dtype_len = 4; // HASH32 output + } else if (data_type == INT96) { + s->dtype_len = 8; // Convert to 64-bit timestamp } // during the decoding step we need to offset the global output buffers diff --git a/cpp/src/io/parquet/parquet_common.hpp b/cpp/src/io/parquet/parquet_common.hpp index d2535cbf90f..8507eca047e 100644 --- a/cpp/src/io/parquet/parquet_common.hpp +++ b/cpp/src/io/parquet/parquet_common.hpp @@ -34,6 +34,7 @@ enum Type : int8_t { BOOLEAN = 0, INT32 = 1, INT64 = 2, + INT96 = 3, // Deprecated FLOAT = 4, DOUBLE = 5, BYTE_ARRAY = 6, diff --git a/cpp/src/io/parquet/reader_impl_helpers.cpp b/cpp/src/io/parquet/reader_impl_helpers.cpp index 79374746563..eb653c6b9ac 100644 --- a/cpp/src/io/parquet/reader_impl_helpers.cpp +++ b/cpp/src/io/parquet/reader_impl_helpers.cpp @@ -190,6 +190,9 @@ type_id to_type_id(SchemaElement const& schema, if (strings_to_categorical) { return type_id::INT32; } [[fallthrough]]; case FIXED_LEN_BYTE_ARRAY: return type_id::STRING; + case INT96: + return (timestamp_type_id != type_id::EMPTY) ? timestamp_type_id + : type_id::TIMESTAMP_NANOSECONDS; default: break; } From 85503ec69d5792a40c22a2a28f21321467c9dd62 Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb Date: Fri, 31 May 2024 23:08:12 +0000 Subject: [PATCH 04/12] revert int96 into the PQ reader's path --- cpp/src/io/parquet/decode_fixed.cu | 35 +++++++++++++++--------------- cpp/src/io/parquet/page_data.cu | 2 ++ 2 files changed, 19 insertions(+), 18 deletions(-) diff --git a/cpp/src/io/parquet/decode_fixed.cu b/cpp/src/io/parquet/decode_fixed.cu index 6dc08f64ebc..bfd89200786 100644 --- a/cpp/src/io/parquet/decode_fixed.cu +++ b/cpp/src/io/parquet/decode_fixed.cu @@ -193,29 +193,28 @@ __device__ inline void gpuDecodeValues( } break; } - } - } else if (dtype == INT96) { - gpuOutputInt96Timestamp(s, sb, src_pos, static_cast(dst)); - } else if (dtype_len == 8) { - if (s->dtype_len_in == 4) { - // Reading INT32 TIME_MILLIS into 64-bit DURATION_MILLISECONDS - // TIME_MILLIS is the only duration type stored as int32: - // https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#deprecated-time-convertedtype + } else if (dtype == INT96) { + gpuOutputInt96Timestamp(s, sb, src_pos, static_cast(dst)); + } else if (dtype_len == 8) { + if (s->dtype_len_in == 4) { + // Reading INT32 TIME_MILLIS into 64-bit DURATION_MILLISECONDS + // TIME_MILLIS is the only duration type stored as int32: + // https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#deprecated-time-convertedtype + gpuOutputFast(s, sb, src_pos, static_cast(dst)); + } else if (s->ts_scale) { + gpuOutputInt64Timestamp(s, sb, src_pos, static_cast(dst)); + } else { + gpuOutputFast(s, sb, src_pos, static_cast(dst)); + } + } else if (dtype_len == 4) { gpuOutputFast(s, sb, src_pos, static_cast(dst)); - } else if (s->ts_scale) { - gpuOutputInt64Timestamp(s, sb, src_pos, static_cast(dst)); } else { - gpuOutputFast(s, sb, src_pos, static_cast(dst)); + gpuOutputGeneric(s, sb, src_pos, static_cast(dst), dtype_len); } - } else if (dtype_len == 4) { - gpuOutputFast(s, sb, src_pos, static_cast(dst)); - } else { - gpuOutputGeneric(s, sb, src_pos, static_cast(dst), dtype_len); } - } - pos += batch_size; -} + pos += batch_size; + } } template diff --git a/cpp/src/io/parquet/page_data.cu b/cpp/src/io/parquet/page_data.cu index 3e95018d1dc..7207173b82f 100644 --- a/cpp/src/io/parquet/page_data.cu +++ b/cpp/src/io/parquet/page_data.cu @@ -371,6 +371,8 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) } } else if (dtype == FIXED_LEN_BYTE_ARRAY) { gpuOutputString(s, sb, val_src_pos, dst); + } else if (dtype == INT96) { + gpuOutputInt96Timestamp(s, sb, val_src_pos, static_cast(dst)); } else if (dtype_len == 8) { if (s->dtype_len_in == 4) { // Reading INT32 TIME_MILLIS into 64-bit DURATION_MILLISECONDS From 8148b2fcc0a4d58109d97d55926d9e88a85038b8 Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb <14217455+mhaseeb123@users.noreply.github.com> Date: Fri, 31 May 2024 18:43:21 -0700 Subject: [PATCH 05/12] Update column stats check to take care of the index column --- python/cudf/cudf/tests/test_parquet.py | 49 ++++++++++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/python/cudf/cudf/tests/test_parquet.py b/python/cudf/cudf/tests/test_parquet.py index 8232b42e3c6..85d1a53e48b 100644 --- a/python/cudf/cudf/tests/test_parquet.py +++ b/python/cudf/cudf/tests/test_parquet.py @@ -1604,6 +1604,55 @@ def clone_field(table, name, datatype): assert_eq(expect, got) +@pytest.mark.filterwarnings("ignore:Using CPU") +def test_parquet_writer_column_stats(tmpdir, pdf, gdf, store_schema): + gdf_fname = tmpdir.join("gdf.parquet") + + if len(pdf) == 0: + pdf = pdf.reset_index(drop=True) + gdf = gdf.reset_index(drop=True) + + if "col_category" in pdf.columns: + pdf = pdf.drop(columns=["col_category"]) + if "col_category" in gdf.columns: + gdf = gdf.drop(columns=["col_category"]) + + assert_eq(pdf, gdf) + + # Write out the gdf using the GPU accelerated writer + gdf.to_parquet(gdf_fname.strpath, index=None) + + assert os.path.exists(gdf_fname) + + expect = pdf.reset_index(drop=True) + got = pd.read_parquet(gdf_fname).reset_index(drop=True) + + # verify timestamps were converted back to the same data. + assert_eq(expect, got, check_categorical=False, check_dtype=False) + + # Read back from pyarrow + pq_file = pq.ParquetFile(gdf_fname) + + # verify each row group's statistics + for rg in range(0, pq_file.num_row_groups): + pd_slice = pq_file.read_row_group(rg).to_pandas() + + # statistics are per-column. So need to verify independently + for i, col in enumerate(pd_slice): + stats = pq_file.metadata.row_group(rg).column(i+1).statistics + + if (col == "col_datetime64[ms]"): + print(i+1, col, stats) + + actual_min = cudf.Series(pd_slice[col].explode().explode()).min() + stats_min = stats.min + assert normalized_equals(actual_min, stats_min) + + actual_max = cudf.Series(pd_slice[col].explode().explode()).max() + stats_max = stats.max + assert normalized_equals(actual_max, stats_max) + + def test_multifile_parquet_folder(tmpdir): test_pdf1 = make_pdf(nrows=10, nvalids=10 // 2, dtype="float64") test_pdf2 = make_pdf(nrows=20, dtype="float64") From a24fdf37ff3abb46a1c84ed15d2ca7899ffae40d Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb Date: Sat, 1 Jun 2024 01:46:43 +0000 Subject: [PATCH 06/12] ruff format the updated test --- python/cudf/cudf/tests/test_parquet.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/python/cudf/cudf/tests/test_parquet.py b/python/cudf/cudf/tests/test_parquet.py index 85d1a53e48b..4428314ca10 100644 --- a/python/cudf/cudf/tests/test_parquet.py +++ b/python/cudf/cudf/tests/test_parquet.py @@ -1639,10 +1639,10 @@ def test_parquet_writer_column_stats(tmpdir, pdf, gdf, store_schema): # statistics are per-column. So need to verify independently for i, col in enumerate(pd_slice): - stats = pq_file.metadata.row_group(rg).column(i+1).statistics + stats = pq_file.metadata.row_group(rg).column(i + 1).statistics - if (col == "col_datetime64[ms]"): - print(i+1, col, stats) + if col == "col_datetime64[ms]": + print(i + 1, col, stats) actual_min = cudf.Series(pd_slice[col].explode().explode()).min() stats_min = stats.min From a99d088a6ae49ae54c2e851930eed72af5e5e465 Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb Date: Sat, 1 Jun 2024 01:53:57 +0000 Subject: [PATCH 07/12] remove erroneous print --- python/cudf/cudf/tests/test_parquet.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/python/cudf/cudf/tests/test_parquet.py b/python/cudf/cudf/tests/test_parquet.py index 4428314ca10..728263859b0 100644 --- a/python/cudf/cudf/tests/test_parquet.py +++ b/python/cudf/cudf/tests/test_parquet.py @@ -1641,9 +1641,6 @@ def test_parquet_writer_column_stats(tmpdir, pdf, gdf, store_schema): for i, col in enumerate(pd_slice): stats = pq_file.metadata.row_group(rg).column(i + 1).statistics - if col == "col_datetime64[ms]": - print(i + 1, col, stats) - actual_min = cudf.Series(pd_slice[col].explode().explode()).min() stats_min = stats.min assert normalized_equals(actual_min, stats_min) From 75ab3b784b4e22d4e7f756fed246a1fc4cd739c8 Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb <14217455+mhaseeb123@users.noreply.github.com> Date: Fri, 31 May 2024 19:31:26 -0700 Subject: [PATCH 08/12] Remove erroneous store_schema argument --- python/cudf/cudf/tests/test_parquet.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/cudf/cudf/tests/test_parquet.py b/python/cudf/cudf/tests/test_parquet.py index 728263859b0..73bc7f4beb8 100644 --- a/python/cudf/cudf/tests/test_parquet.py +++ b/python/cudf/cudf/tests/test_parquet.py @@ -1605,7 +1605,7 @@ def clone_field(table, name, datatype): @pytest.mark.filterwarnings("ignore:Using CPU") -def test_parquet_writer_column_stats(tmpdir, pdf, gdf, store_schema): +def test_parquet_writer_column_stats(tmpdir, pdf, gdf): gdf_fname = tmpdir.join("gdf.parquet") if len(pdf) == 0: From ccf56e0f2605b65258eea1f5ea0ed062ff40e4c0 Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb Date: Sat, 1 Jun 2024 21:08:55 +0000 Subject: [PATCH 09/12] remove int96 timestamps from cudf java tests --- java/src/main/native/src/TableJni.cpp | 19 ------------------- 1 file changed, 19 deletions(-) diff --git a/java/src/main/native/src/TableJni.cpp b/java/src/main/native/src/TableJni.cpp index e411b1d5362..05fbf286ac5 100644 --- a/java/src/main/native/src/TableJni.cpp +++ b/java/src/main/native/src/TableJni.cpp @@ -583,7 +583,6 @@ namespace { int set_column_metadata(cudf::io::column_in_metadata& column_metadata, std::vector& col_names, cudf::jni::native_jbooleanArray& nullability, - cudf::jni::native_jbooleanArray& is_int96, cudf::jni::native_jintArray& precisions, cudf::jni::native_jbooleanArray& is_map, cudf::jni::native_jbooleanArray& hasParquetFieldIds, @@ -598,7 +597,6 @@ int set_column_metadata(cudf::io::column_in_metadata& column_metadata, cudf::io::column_in_metadata child; child.set_name(col_names[read_index]).set_nullability(nullability[read_index]); if (precisions[read_index] > -1) { child.set_decimal_precision(precisions[read_index]); } - if (!is_int96.is_null()) { child.set_int96_timestamps(is_int96[read_index]); } if (!is_binary.is_null()) { child.set_output_as_binary(is_binary[read_index]); } if (is_map[read_index]) { child.set_list_column_as_map(); } if (!parquetFieldIds.is_null() && hasParquetFieldIds[read_index]) { @@ -610,7 +608,6 @@ int set_column_metadata(cudf::io::column_in_metadata& column_metadata, read_index = set_column_metadata(column_metadata.child(write_index), col_names, nullability, - is_int96, precisions, is_map, hasParquetFieldIds, @@ -629,7 +626,6 @@ void createTableMetaData(JNIEnv* env, jobjectArray& j_col_names, jintArray& j_children, jbooleanArray& j_col_nullability, - jbooleanArray& j_is_int96, jintArray& j_precisions, jbooleanArray& j_is_map, cudf::io::table_input_metadata& metadata, @@ -640,7 +636,6 @@ void createTableMetaData(JNIEnv* env, cudf::jni::auto_set_device(env); cudf::jni::native_jstringArray col_names(env, j_col_names); cudf::jni::native_jbooleanArray col_nullability(env, j_col_nullability); - cudf::jni::native_jbooleanArray is_int96(env, j_is_int96); cudf::jni::native_jintArray precisions(env, j_precisions); cudf::jni::native_jbooleanArray hasParquetFieldIds(env, j_hasParquetFieldIds); cudf::jni::native_jintArray parquetFieldIds(env, j_parquetFieldIds); @@ -661,9 +656,6 @@ void createTableMetaData(JNIEnv* env, if (precisions[read_index] > -1) { metadata.column_metadata[write_index].set_decimal_precision(precisions[read_index]); } - if (!is_int96.is_null()) { - metadata.column_metadata[write_index].set_int96_timestamps(is_int96[read_index]); - } if (!is_binary.is_null()) { metadata.column_metadata[write_index].set_output_as_binary(is_binary[read_index]); } @@ -676,7 +668,6 @@ void createTableMetaData(JNIEnv* env, read_index = set_column_metadata(metadata.column_metadata[write_index], cpp_names, col_nullability, - is_int96, precisions, is_map, hasParquetFieldIds, @@ -2098,7 +2089,6 @@ Java_ai_rapids_cudf_Table_writeParquetBufferBegin(JNIEnv* env, jobjectArray j_metadata_values, jint j_compression, jint j_stats_freq, - jbooleanArray j_isInt96, jintArray j_precisions, jbooleanArray j_is_map, jbooleanArray j_is_binary, @@ -2125,7 +2115,6 @@ Java_ai_rapids_cudf_Table_writeParquetBufferBegin(JNIEnv* env, j_col_names, j_children, j_col_nullability, - j_isInt96, j_precisions, j_is_map, metadata, @@ -2175,7 +2164,6 @@ Java_ai_rapids_cudf_Table_writeParquetFileBegin(JNIEnv* env, jobjectArray j_metadata_values, jint j_compression, jint j_stats_freq, - jbooleanArray j_isInt96, jintArray j_precisions, jbooleanArray j_is_map, jbooleanArray j_is_binary, @@ -2199,7 +2187,6 @@ Java_ai_rapids_cudf_Table_writeParquetFileBegin(JNIEnv* env, j_col_names, j_children, j_col_nullability, - j_isInt96, j_precisions, j_is_map, metadata, @@ -2392,8 +2379,6 @@ Java_ai_rapids_cudf_Table_writeORCBufferBegin(JNIEnv* env, using namespace cudf::io; using namespace cudf::jni; table_input_metadata metadata; - // ORC has no `j_is_int96`, but `createTableMetaData` needs a lvalue. - jbooleanArray j_is_int96 = NULL; // ORC has no `j_parquetFieldIds`, but `createTableMetaData` needs a lvalue. jbooleanArray j_hasParquetFieldIds = NULL; jintArray j_parquetFieldIds = NULL; @@ -2405,7 +2390,6 @@ Java_ai_rapids_cudf_Table_writeORCBufferBegin(JNIEnv* env, j_col_names, j_children, j_col_nullability, - j_is_int96, j_precisions, j_is_map, metadata, @@ -2467,8 +2451,6 @@ JNIEXPORT long JNICALL Java_ai_rapids_cudf_Table_writeORCFileBegin(JNIEnv* env, using namespace cudf::jni; cudf::jni::native_jstring output_path(env, j_output_path); table_input_metadata metadata; - // ORC has no `j_is_int96`, but `createTableMetaData` needs a lvalue. - jbooleanArray j_is_int96 = NULL; // ORC has no `j_parquetFieldIds`, but `createTableMetaData` needs a lvalue. jbooleanArray j_hasParquetFieldIds = NULL; jintArray j_parquetFieldIds = NULL; @@ -2479,7 +2461,6 @@ JNIEXPORT long JNICALL Java_ai_rapids_cudf_Table_writeORCFileBegin(JNIEnv* env, j_col_names, j_children, j_col_nullability, - j_is_int96, j_precisions, j_is_map, metadata, From 239101ffb8ba06f04e14dd67dd200bfa3914dd39 Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb Date: Sun, 2 Jun 2024 19:40:18 +0000 Subject: [PATCH 10/12] remove int96 timestamps from java --- .../ai/rapids/cudf/ColumnWriterOptions.java | 49 ++++++------------- .../CompressionMetadataWriterOptions.java | 5 -- java/src/main/java/ai/rapids/cudf/Table.java | 6 --- 3 files changed, 15 insertions(+), 45 deletions(-) diff --git a/java/src/main/java/ai/rapids/cudf/ColumnWriterOptions.java b/java/src/main/java/ai/rapids/cudf/ColumnWriterOptions.java index a95c5f58f09..51bf478875c 100644 --- a/java/src/main/java/ai/rapids/cudf/ColumnWriterOptions.java +++ b/java/src/main/java/ai/rapids/cudf/ColumnWriterOptions.java @@ -27,8 +27,6 @@ * The native also uses the same "column_in_metadata" for both Parquet and ORC. */ public class ColumnWriterOptions { - // `isTimestampTypeInt96` is ignored in ORC - private boolean isTimestampTypeInt96; private int precision; private boolean isNullable; private boolean isMap = false; @@ -134,14 +132,14 @@ protected ColumnWriterOptions withDecimal(String name, int precision, return new ColumnWriterOptions(name, false, precision, isNullable, parquetFieldId); } - protected ColumnWriterOptions withTimestamp(String name, boolean isInt96, + protected ColumnWriterOptions withTimestamp(String name, boolean isNullable) { - return new ColumnWriterOptions(name, isInt96, UNKNOWN_PRECISION, isNullable); + return new ColumnWriterOptions(name, UNKNOWN_PRECISION, isNullable); } - protected ColumnWriterOptions withTimestamp(String name, boolean isInt96, + protected ColumnWriterOptions withTimestamp(String name, boolean isNullable, int parquetFieldId) { - return new ColumnWriterOptions(name, isInt96, UNKNOWN_PRECISION, isNullable, parquetFieldId); + return new ColumnWriterOptions(name, UNKNOWN_PRECISION, isNullable, parquetFieldId); } protected ColumnWriterOptions withBinary(String name, boolean isNullable) { @@ -301,8 +299,8 @@ public T withBinaryColumn(String name, boolean nullable) { * Set a timestamp child meta data * @return this for chaining. */ - public T withTimestampColumn(String name, boolean isInt96, boolean nullable, int parquetFieldId) { - children.add(withTimestamp(name, isInt96, nullable, parquetFieldId)); + public T withTimestampColumn(String name, boolean nullable, int parquetFieldId) { + children.add(withTimestamp(name, nullable, parquetFieldId)); return (T) this; } @@ -310,8 +308,8 @@ public T withTimestampColumn(String name, boolean isInt96, boolean nullable, int * Set a timestamp child meta data * @return this for chaining. */ - public T withTimestampColumn(String name, boolean isInt96, boolean nullable) { - children.add(withTimestamp(name, isInt96, nullable)); + public T withTimestampColumn(String name, boolean nullable) { + children.add(withTimestamp(name, nullable)); return (T) this; } @@ -319,8 +317,8 @@ public T withTimestampColumn(String name, boolean isInt96, boolean nullable) { * Set a timestamp child meta data * @return this for chaining. */ - public T withTimestampColumn(String name, boolean isInt96) { - withTimestampColumn(name, isInt96, false); + public T withTimestampColumn(String name) { + withTimestampColumn(name, false); return (T) this; } @@ -328,31 +326,29 @@ public T withTimestampColumn(String name, boolean isInt96) { * Set a timestamp child meta data * @return this for chaining. */ - public T withNullableTimestampColumn(String name, boolean isInt96) { - withTimestampColumn(name, isInt96, true); + public T withNullableTimestampColumn(String name) { + withTimestampColumn(name, true); return (T) this; } public abstract V build(); } - public ColumnWriterOptions(String columnName, boolean isTimestampTypeInt96, + public ColumnWriterOptions(String columnName, int precision, boolean isNullable) { - this.isTimestampTypeInt96 = isTimestampTypeInt96; this.precision = precision; this.isNullable = isNullable; this.columnName = columnName; } - public ColumnWriterOptions(String columnName, boolean isTimestampTypeInt96, + public ColumnWriterOptions(String columnName, int precision, boolean isNullable, int parquetFieldId) { - this(columnName, isTimestampTypeInt96, precision, isNullable); + this(columnName, precision, isNullable); this.hasParquetFieldId = true; this.parquetFieldId = parquetFieldId; } public ColumnWriterOptions(String columnName, boolean isNullable) { - this.isTimestampTypeInt96 = false; this.precision = UNKNOWN_PRECISION; this.isNullable = isNullable; this.columnName = columnName; @@ -378,15 +374,6 @@ protected interface IntArrayProducer { int[] apply(ColumnWriterOptions opt); } - boolean[] getFlatIsTimeTypeInt96() { - boolean[] ret = {isTimestampTypeInt96}; - if (childColumnOptions.length > 0) { - return getFlatBooleans(ret, (opt) -> opt.getFlatIsTimeTypeInt96()); - } else { - return ret; - } - } - protected boolean[] getFlatBooleans(boolean[] ret, ByteArrayProducer producer) { boolean[][] childResults = new boolean[childColumnOptions.length][]; int totalChildrenFlatLength = ret.length; @@ -623,12 +610,6 @@ public int getPrecision() { return precision; } - /** - * Returns true if the writer is expected to write timestamps in INT96 - */ - public boolean isTimestampTypeInt96() { - return isTimestampTypeInt96; - } /** * Return the child columnOptions for this column diff --git a/java/src/main/java/ai/rapids/cudf/CompressionMetadataWriterOptions.java b/java/src/main/java/ai/rapids/cudf/CompressionMetadataWriterOptions.java index 27eb1be565a..bddf71914c7 100644 --- a/java/src/main/java/ai/rapids/cudf/CompressionMetadataWriterOptions.java +++ b/java/src/main/java/ai/rapids/cudf/CompressionMetadataWriterOptions.java @@ -31,11 +31,6 @@ protected CompressionMetadataWriterOptions(Builder builder) { this.metadata = builder.metadata; } - @Override - boolean[] getFlatIsTimeTypeInt96() { - return super.getFlatBooleans(new boolean[]{}, (opt) -> opt.getFlatIsTimeTypeInt96()); - } - @Override int[] getFlatPrecision() { return super.getFlatInts(new int[]{}, (opt) -> opt.getFlatPrecision()); diff --git a/java/src/main/java/ai/rapids/cudf/Table.java b/java/src/main/java/ai/rapids/cudf/Table.java index 4e737451ed6..4eb2bb8482a 100644 --- a/java/src/main/java/ai/rapids/cudf/Table.java +++ b/java/src/main/java/ai/rapids/cudf/Table.java @@ -323,7 +323,6 @@ private static native long[] readAvroFromDataSource(String[] filterColumnNames, * @param metadataValues Metadata values corresponding to metadataKeys * @param compression native compression codec ID * @param statsFreq native statistics frequency ID - * @param isInt96 true if timestamp type is int96 * @param precisions precision list containing all the precisions of the decimal types in * the columns * @param isMapValues true if a column is a map @@ -339,7 +338,6 @@ private static native long writeParquetFileBegin(String[] columnNames, String[] metadataValues, int compression, int statsFreq, - boolean[] isInt96, int[] precisions, boolean[] isMapValues, boolean[] isBinaryValues, @@ -357,7 +355,6 @@ private static native long writeParquetFileBegin(String[] columnNames, * @param metadataValues Metadata values corresponding to metadataKeys * @param compression native compression codec ID * @param statsFreq native statistics frequency ID - * @param isInt96 true if timestamp type is int96 * @param precisions precision list containing all the precisions of the decimal types in * the columns * @param isMapValues true if a column is a map @@ -373,7 +370,6 @@ private static native long writeParquetBufferBegin(String[] columnNames, String[] metadataValues, int compression, int statsFreq, - boolean[] isInt96, int[] precisions, boolean[] isMapValues, boolean[] isBinaryValues, @@ -1753,7 +1749,6 @@ private ParquetTableWriter(ParquetWriterOptions options, File outputFile) { options.getMetadataValues(), options.getCompressionType().nativeId, options.getStatisticsFrequency().nativeId, - options.getFlatIsTimeTypeInt96(), options.getFlatPrecision(), options.getFlatIsMap(), options.getFlatIsBinary(), @@ -1773,7 +1768,6 @@ private ParquetTableWriter(ParquetWriterOptions options, HostBufferConsumer cons options.getMetadataValues(), options.getCompressionType().nativeId, options.getStatisticsFrequency().nativeId, - options.getFlatIsTimeTypeInt96(), options.getFlatPrecision(), options.getFlatIsMap(), options.getFlatIsBinary(), From f16f3e3cf61be85c91f5cef214bc03f69ef648de Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb Date: Mon, 3 Jun 2024 18:34:53 +0000 Subject: [PATCH 11/12] fix the constructor for ColumnWriterOptions numeric and timestamp types --- .../java/ai/rapids/cudf/ColumnWriterOptions.java | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/java/src/main/java/ai/rapids/cudf/ColumnWriterOptions.java b/java/src/main/java/ai/rapids/cudf/ColumnWriterOptions.java index 51bf478875c..9e5103c9b3c 100644 --- a/java/src/main/java/ai/rapids/cudf/ColumnWriterOptions.java +++ b/java/src/main/java/ai/rapids/cudf/ColumnWriterOptions.java @@ -124,22 +124,22 @@ protected ColumnWriterOptions withColumn(String name, boolean isNullable, int pa protected ColumnWriterOptions withDecimal(String name, int precision, boolean isNullable) { - return new ColumnWriterOptions(name, false, precision, isNullable); + return new ColumnWriterOptions(name, precision, isNullable); } protected ColumnWriterOptions withDecimal(String name, int precision, boolean isNullable, int parquetFieldId) { - return new ColumnWriterOptions(name, false, precision, isNullable, parquetFieldId); + return new ColumnWriterOptions(name, precision, isNullable, parquetFieldId); } protected ColumnWriterOptions withTimestamp(String name, boolean isNullable) { - return new ColumnWriterOptions(name, UNKNOWN_PRECISION, isNullable); + return new ColumnWriterOptions(name, isNullable); } protected ColumnWriterOptions withTimestamp(String name, boolean isNullable, int parquetFieldId) { - return new ColumnWriterOptions(name, UNKNOWN_PRECISION, isNullable, parquetFieldId); + return new ColumnWriterOptions(name, isNullable, parquetFieldId); } protected ColumnWriterOptions withBinary(String name, boolean isNullable) { @@ -334,15 +334,13 @@ public T withNullableTimestampColumn(String name) { public abstract V build(); } - public ColumnWriterOptions(String columnName, - int precision, boolean isNullable) { + public ColumnWriterOptions(String columnName, int precision, boolean isNullable) { this.precision = precision; this.isNullable = isNullable; this.columnName = columnName; } - public ColumnWriterOptions(String columnName, - int precision, boolean isNullable, int parquetFieldId) { + public ColumnWriterOptions(String columnName, int precision, boolean isNullable, int parquetFieldId) { this(columnName, precision, isNullable); this.hasParquetFieldId = true; this.parquetFieldId = parquetFieldId; From 51e785b4264e05f3a908d02bb507d5adbfe3ec02 Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb Date: Tue, 4 Jun 2024 08:38:17 +0000 Subject: [PATCH 12/12] hopefully the final fix for java tests --- java/src/test/java/ai/rapids/cudf/TableTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/java/src/test/java/ai/rapids/cudf/TableTest.java b/java/src/test/java/ai/rapids/cudf/TableTest.java index dc6eb55fc6a..044610d377e 100644 --- a/java/src/test/java/ai/rapids/cudf/TableTest.java +++ b/java/src/test/java/ai/rapids/cudf/TableTest.java @@ -9031,7 +9031,7 @@ void testParquetWriteWithFieldId() throws IOException { .withColumn(true, "c1", -1) .withDecimalColumn("c2", 9, true, 2) .withStructColumn(sBuilder.build()) - .withTimestampColumn("c4", true, true, -4) + .withTimestampColumn("c4", true, -4) .withColumns( true, "c5") .build();