diff --git a/src/main/cpp/src/GpuTimeZoneDBJni.cpp b/src/main/cpp/src/GpuTimeZoneDBJni.cpp index 55639853de..8f96612f32 100644 --- a/src/main/cpp/src/GpuTimeZoneDBJni.cpp +++ b/src/main/cpp/src/GpuTimeZoneDBJni.cpp @@ -1,4 +1,4 @@ -/* Copyright (c) 2023, NVIDIA CORPORATION. +/* Copyright (c) 2023-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -50,4 +50,50 @@ Java_com_nvidia_spark_rapids_jni_GpuTimeZoneDB_convertUTCTimestampColumnToTimeZo } CATCH_STD(env, 0); } + +JNIEXPORT jlong JNICALL +Java_com_nvidia_spark_rapids_jni_GpuTimeZoneDB_timeAddCS(JNIEnv* env, + jclass, + jlong input_handle, + jlong duration_handle, + jlong transitions_handle, + jint tz_index) +{ + JNI_NULL_CHECK(env, input_handle, "column is null", 0); + JNI_NULL_CHECK(env, duration_handle, "column is null", 0); + JNI_NULL_CHECK(env, transitions_handle, "column is null", 0); + try { + cudf::jni::auto_set_device(env); + auto const input = reinterpret_cast(input_handle); + auto const duration = reinterpret_cast(duration_handle); + auto const transitions = reinterpret_cast(transitions_handle); + auto const index = static_cast(tz_index); + return cudf::jni::ptr_as_jlong( + spark_rapids_jni::time_add(*input, *duration, *transitions, index).release()); + } + CATCH_STD(env, 0); +} + +JNIEXPORT jlong JNICALL +Java_com_nvidia_spark_rapids_jni_GpuTimeZoneDB_timeAddCC(JNIEnv* env, + jclass, + jlong input_handle, + jlong duration_handle, + jlong transitions_handle, + jint tz_index) +{ + JNI_NULL_CHECK(env, input_handle, "column is null", 0); + JNI_NULL_CHECK(env, duration_handle, "column is null", 0); + JNI_NULL_CHECK(env, transitions_handle, "column is null", 0); + try { + cudf::jni::auto_set_device(env); + auto const input = reinterpret_cast(input_handle); + auto const duration = reinterpret_cast(duration_handle); + auto const transitions = reinterpret_cast(transitions_handle); + auto const index = static_cast(tz_index); + return cudf::jni::ptr_as_jlong( + spark_rapids_jni::time_add(*input, *duration, *transitions, index).release()); + } + CATCH_STD(env, 0); +} } diff --git a/src/main/cpp/src/timezones.cu b/src/main/cpp/src/timezones.cu index 43fff55b3b..02a28ca5a2 100644 --- a/src/main/cpp/src/timezones.cu +++ b/src/main/cpp/src/timezones.cu @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023, NVIDIA CORPORATION. + * Copyright (c) 2023-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,6 +22,7 @@ #include #include #include +#include #include #include @@ -79,7 +80,7 @@ struct convert_timestamp_tz_functor { (to_utc ? tz_instants : utc_instants).data() + tz_transitions.element_offset(0), static_cast(list_size)); - auto const it = thrust::upper_bound( + auto const it = thrust::lower_bound( thrust::seq, transition_times.begin(), transition_times.end(), epoch_seconds); auto const idx = static_cast(thrust::distance(transition_times.begin(), it)); auto const list_offset = tz_transitions.element_offset(idx - 1); @@ -118,6 +119,222 @@ auto convert_timestamp_tz(column_view const& input, return results; } +struct time_add_functor { + using duration_type = typename cudf::timestamp_us::duration; + + lists_column_device_view const transitions; + + size_type const tz_index; + + cudf::duration_us const duration_scalar; + + __device__ inline int64_t get_utc_offset_seconds( + cudf::timestamp_us const& timestamp, + cudf::device_span const& transition_times_utc, + cudf::device_span const& transition_times_tz, + cudf::device_span const& transition_offsets) const + { + auto const epoch_seconds_utc = static_cast( + cuda::std::chrono::duration_cast(timestamp.time_since_epoch()).count()); + // Binary search on utc to find the correct offset to convert utc to local + auto const utc_it = thrust::upper_bound( + thrust::seq, transition_times_utc.begin(), transition_times_utc.end(), epoch_seconds_utc); + auto const utc_idx = + static_cast(thrust::distance(transition_times_utc.begin(), utc_it)); + auto const to_local_offset = transition_offsets[utc_idx - 1]; + return to_local_offset; + } + + __device__ inline cudf::timestamp_us convert_ts_to_utc_tz_overlap_prefers_original( + cudf::timestamp_us const& timestamp, + int64_t const prefered_offset, + cudf::device_span const& transition_times_utc, + cudf::device_span const& transition_times_tz, + cudf::device_span const& transition_offsets) const + { + auto const result_epoch_seconds = static_cast( + cuda::std::chrono::duration_cast(timestamp.time_since_epoch()).count()); + + // Binary search on local to find the correct offset to convert local to utc + auto const local_it = thrust::lower_bound( + thrust::seq, transition_times_tz.begin(), transition_times_tz.end(), result_epoch_seconds); + auto local_idx = + static_cast(thrust::distance(transition_times_tz.begin(), local_it)); + + // In GpuTimeZoneDB, we build the transition list with + // (utcInstant, utcInstant + OffsetBefore, OffsetAfter) if it is a overlap. + // But we actually need to binary search on the utcInstant + OffsetBefore here + // to find the correct offset to convert local to utc. To reuse the data, we need to + // add a special post processing here, to make sure we get the correct id that + // utcInstant + OffsetBefore is larger than the result_epoch_seconds. + auto const temp_offset = transition_offsets[local_idx]; + + // We don't want to check this if the idx is the last because they are just endpoints + bool in_overlap = false; + if (transition_times_utc[local_idx] != std::numeric_limits::max() && + transition_times_utc[local_idx] + temp_offset <= result_epoch_seconds) { + local_idx += 1; + // This if only happens when the result is in the overlap + in_overlap = true; + } + auto to_utc_offset = transition_offsets[local_idx - 1]; + auto const upper_bound_epoch = transition_times_tz[local_idx - 1]; + auto const upper_bound_utc = transition_times_utc[local_idx - 1]; + + // If the result is in the overlap, try to select the original offset if possible + auto const early_offset = static_cast(upper_bound_epoch - upper_bound_utc); + bool const is_gap = (upper_bound_utc + to_utc_offset == upper_bound_epoch); + if (!is_gap && upper_bound_utc != std::numeric_limits::min() && + upper_bound_utc != std::numeric_limits::max()) { // overlap + if (in_overlap) { + // By default, to_utc_offset is the offsetAfter, so unless to_utc_offset is equal to + // early_offset, we need to use the original offsetBefore as the default offset. + if (to_utc_offset != prefered_offset) { to_utc_offset = early_offset; } + } + } + + auto const to_utc_offset_duration = cuda::std::chrono::duration_cast( + cudf::duration_s{static_cast(to_utc_offset)}); + + // subtract the offset to convert local to utc + return timestamp - to_utc_offset_duration; + } + + __device__ inline cudf::timestamp_us plus_with_tz(cudf::timestamp_us const& timestamp, + cudf::duration_us const& duration) const + { + if (duration == cudf::duration_us{0}) { return timestamp; } + + auto const utc_instants = transitions.child().child(0); + auto const tz_instants = transitions.child().child(1); + auto const utc_offsets = transitions.child().child(2); + + auto const tz_transitions = cudf::list_device_view{transitions, tz_index}; + auto const list_size = tz_transitions.size(); + + auto const transition_times_utc = cudf::device_span( + utc_instants.data() + tz_transitions.element_offset(0), + static_cast(list_size)); + + auto const transition_times_tz = cudf::device_span( + tz_instants.data() + tz_transitions.element_offset(0), + static_cast(list_size)); + + auto const transition_offsets = cudf::device_span( + utc_offsets.data() + tz_transitions.element_offset(0), + static_cast(list_size)); + + // In Spark, timeAdd will add the days of the duration to the timestamp first with `plusDays`, + // resolve the offset if the result is in the overlap, and then add the remainder microseconds + // to the results with `plus`, then resolve again. In java, the `plusDays` adds the days to the + // localDateTime in day's field, so we need take transitions into account. While the `plus` step + // adds the remainder microseconds to the localDateTime in microsecond's field, so it is + // equivalent to add the remainder to the utc result without resolving the offset. + auto const duration_value = + static_cast(cuda::std::chrono::duration_cast(duration).count()); + auto const microseconds_per_day = 86400000000ll; + auto const duration_days = (duration_value / microseconds_per_day) * microseconds_per_day; + auto const duration_remainder = duration_value - duration_days; + + // It will be the preferred offset when convert the result back to utc after adding the days + auto const to_local_offset = get_utc_offset_seconds( + timestamp, transition_times_utc, transition_times_tz, transition_offsets); + + auto const to_local_offset_duration = cuda::std::chrono::duration_cast( + cudf::duration_s{static_cast(to_local_offset)}); + + // Add the duration days to the local timestamp + auto const local_timestamp_res = + timestamp + to_local_offset_duration + cudf::duration_us{duration_days}; + + auto const result_epoch_seconds = static_cast( + cuda::std::chrono::duration_cast(local_timestamp_res.time_since_epoch()) + .count()); + + auto const res_utc_with_days = + convert_ts_to_utc_tz_overlap_prefers_original(local_timestamp_res, + to_local_offset, + transition_times_utc, + transition_times_tz, + transition_offsets); + + // Add the remainder duration to the result + return res_utc_with_days + cudf::duration_us{duration_remainder}; + } + + __device__ cudf::timestamp_us operator()(cudf::timestamp_us const& timestamp) const + { + return plus_with_tz(timestamp, duration_scalar); + } + + __device__ cudf::timestamp_us operator()(cudf::timestamp_us const& timestamp, + cudf::duration_us const& interval) const + { + return plus_with_tz(timestamp, interval); + } +}; + +auto time_add_with_tz(column_view const& input, + cudf::duration_scalar const& duration, + table_view const& transitions, + size_type tz_index, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) +{ + // get the fixed transitions + auto const ft_cdv_ptr = column_device_view::create(transitions.column(0), stream); + auto const fixed_transitions = lists_column_device_view{*ft_cdv_ptr}; + + if (!duration.is_valid()) { + // return a column of nulls + auto results = cudf::make_timestamp_column( + input.type(), input.size(), cudf::mask_state::ALL_NULL, stream, mr); + return results; + } + + auto results = cudf::make_timestamp_column(input.type(), + input.size(), + cudf::detail::copy_bitmask(input, stream, mr), + input.null_count(), + stream, + mr); + + thrust::transform(rmm::exec_policy(stream), + input.begin(), + input.end(), + results->mutable_view().begin(), + time_add_functor{fixed_transitions, tz_index, duration.value()}); + + return results; +} + +auto time_add_with_tz(column_view const& input, + column_view const& duration, + table_view const& transitions, + size_type tz_index, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) +{ + // get the fixed transitions + auto const ft_cdv_ptr = column_device_view::create(transitions.column(0), stream); + auto const fixed_transitions = lists_column_device_view{*ft_cdv_ptr}; + + auto [null_mask, null_count] = + cudf::detail::bitmask_and(cudf::table_view{{input, duration}}, stream, mr); + + auto results = cudf::make_timestamp_column( + input.type(), input.size(), rmm::device_buffer(null_mask, stream), null_count, stream, mr); + + thrust::transform(rmm::exec_policy(stream), + input.begin(), + input.end(), + duration.begin(), + results->mutable_view().begin(), + time_add_functor{fixed_transitions, tz_index, cudf::duration_us{0}}); + + return results; +} + } // namespace namespace spark_rapids_jni { @@ -163,4 +380,38 @@ std::unique_ptr convert_utc_timestamp_to_timezone(column_view const& inp return convert_timestamp(input, transitions, tz_index, false, stream, mr); } +std::unique_ptr time_add(column_view const& input, + cudf::scalar const& duration, + table_view const& transitions, + size_type tz_index, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) +{ + if (input.type().id() != cudf::type_id::TIMESTAMP_MICROSECONDS) { + CUDF_FAIL("Unsupported timestamp unit for time add with timezone"); + } + if (duration.type().id() != cudf::type_id::DURATION_MICROSECONDS) { + CUDF_FAIL("Unsupported duration unit for time add with timezone"); + } + auto const duration_scalar = + dynamic_cast const&>(duration); + return time_add_with_tz(input, duration_scalar, transitions, tz_index, stream, mr); +} + +std::unique_ptr time_add(column_view const& input, + column_view const& duration, + table_view const& transitions, + size_type tz_index, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) +{ + if (input.type().id() != cudf::type_id::TIMESTAMP_MICROSECONDS) { + CUDF_FAIL("Unsupported timestamp unit for time add with timezone"); + } + if (duration.type().id() != cudf::type_id::DURATION_MICROSECONDS) { + CUDF_FAIL("Unsupported duration unit for time add with timezone"); + } + return time_add_with_tz(input, duration, transitions, tz_index, stream, mr); +} + } // namespace spark_rapids_jni diff --git a/src/main/cpp/src/timezones.hpp b/src/main/cpp/src/timezones.hpp index c7ab3c0cc8..a7c3212f98 100644 --- a/src/main/cpp/src/timezones.hpp +++ b/src/main/cpp/src/timezones.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023, NVIDIA CORPORATION. + * Copyright (c) 2023-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -66,4 +66,33 @@ std::unique_ptr convert_utc_timestamp_to_timezone( rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); -} // namespace spark_rapids_jni \ No newline at end of file +/** + * @brief Add intervals to a column of timestamps. + * + * The transition rules are in enclosed in a table, and the index corresponding to the + * specific timezone is given. + * + * @param input the column of input timestamps in UTC + * @param intervals the column of intervals to add to the input timestamps + * @param transitions the table of transitions for all timezones + * @param tz_index the index of the row in `transitions` corresponding to the specific timezone + * @param stream CUDA stream used for device memory operations and kernel launches. + * @param mr Device memory resource used to allocate the returned timestamp column's memory + */ +std::unique_ptr time_add( + cudf::column_view const& input, + cudf::scalar const& duration, + cudf::table_view const& transitions, + cudf::size_type tz_index, + rmm::cuda_stream_view stream = cudf::get_default_stream(), + rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); + +std::unique_ptr time_add( + cudf::column_view const& input, + cudf::column_view const& intervals, + cudf::table_view const& transitions, + cudf::size_type tz_index, + rmm::cuda_stream_view stream = cudf::get_default_stream(), + rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); + +} // namespace spark_rapids_jni diff --git a/src/main/cpp/tests/timezones.cpp b/src/main/cpp/tests/timezones.cpp index 9801a3c0a4..7b212835ff 100644 --- a/src/main/cpp/tests/timezones.cpp +++ b/src/main/cpp/tests/timezones.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023, NVIDIA CORPORATION. + * Copyright (c) 2023-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/com/nvidia/spark/rapids/jni/GpuTimeZoneDB.java b/src/main/java/com/nvidia/spark/rapids/jni/GpuTimeZoneDB.java index 643db278df..c036c2f5e2 100644 --- a/src/main/java/com/nvidia/spark/rapids/jni/GpuTimeZoneDB.java +++ b/src/main/java/com/nvidia/spark/rapids/jni/GpuTimeZoneDB.java @@ -17,8 +17,11 @@ package com.nvidia.spark.rapids.jni; import ai.rapids.cudf.ColumnVector; +import ai.rapids.cudf.ColumnView; +import ai.rapids.cudf.CudfAccessor; import ai.rapids.cudf.DType; import ai.rapids.cudf.HostColumnVector; +import ai.rapids.cudf.Scalar; import ai.rapids.cudf.Table; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -202,12 +205,7 @@ private void closeResources() { } public static ColumnVector fromTimestampToUtcTimestamp(ColumnVector input, ZoneId currentTimeZone) { - // TODO: Remove this check when all timezones are supported - // (See https://github.com/NVIDIA/spark-rapids/issues/6840) - if (!isSupportedTimeZone(currentTimeZone)) { - throw new IllegalArgumentException(String.format("Unsupported timezone: %s", - currentTimeZone.toString())); - } + assertTimeZoneSupported(currentTimeZone); cacheDatabase(); Integer tzIndex = instance.getZoneIDMap().get(currentTimeZone.normalized().toString()); try (Table transitions = instance.getTransitions()) { @@ -217,12 +215,7 @@ public static ColumnVector fromTimestampToUtcTimestamp(ColumnVector input, ZoneI } public static ColumnVector fromUtcTimestampToTimestamp(ColumnVector input, ZoneId desiredTimeZone) { - // TODO: Remove this check when all timezones are supported - // (See https://github.com/NVIDIA/spark-rapids/issues/6840) - if (!isSupportedTimeZone(desiredTimeZone)) { - throw new IllegalArgumentException(String.format("Unsupported timezone: %s", - desiredTimeZone.toString())); - } + assertTimeZoneSupported(desiredTimeZone); cacheDatabase(); Integer tzIndex = instance.getZoneIDMap().get(desiredTimeZone.normalized().toString()); try (Table transitions = instance.getTransitions()) { @@ -230,6 +223,35 @@ public static ColumnVector fromUtcTimestampToTimestamp(ColumnVector input, ZoneI transitions.getNativeView(), tzIndex)); } } + + public static ColumnVector timeAdd(ColumnVector input, Scalar duration, ZoneId currentTimeZone) { + assertTimeZoneSupported(currentTimeZone); + cacheDatabase(); + Integer tzIndex = instance.getZoneIDMap().get(currentTimeZone.normalized().toString()); + try (Table transitions = instance.getTransitions()) { + return new ColumnVector(timeAddCS(input.getNativeView(), CudfAccessor.getScalarHandle(duration), + transitions.getNativeView(), tzIndex)); + } + } + + public static ColumnVector timeAdd(ColumnVector input, ColumnView duration, ZoneId currentTimeZone) { + assertTimeZoneSupported(currentTimeZone); + cacheDatabase(); + Integer tzIndex = instance.getZoneIDMap().get(currentTimeZone.normalized().toString()); + try (Table transitions = instance.getTransitions()) { + return new ColumnVector(timeAddCC(input.getNativeView(), duration.getNativeView(), + transitions.getNativeView(), tzIndex)); + } + } + + private static void assertTimeZoneSupported(ZoneId zoneId) { + // TODO: Remove this check when all timezones are supported + // (See https://github.com/NVIDIA/spark-rapids/issues/6840) + if (!isSupportedTimeZone(zoneId)) { + throw new IllegalArgumentException(String.format("Unsupported timezone: %s", + zoneId.toString())); + } + } // TODO: Deprecate this API when we support all timezones // (See https://github.com/NVIDIA/spark-rapids/issues/6840) @@ -316,6 +338,12 @@ private void loadData() { ); } }); + ZoneOffsetTransition last = transitions.get(transitions.size() - 1); + // Add Long max and the last offset at the end so binary search always finds a value. + data.add( + new HostColumnVector.StructData(Long.MAX_VALUE, Long.MAX_VALUE, + last.getOffsetAfter().getTotalSeconds()) + ); } masterTransitions.add(data); zoneIdToTable.put(zoneId.getId(), idx); @@ -371,4 +399,8 @@ List getHostFixedTransitions(String zoneId) { private static native long convertTimestampColumnToUTC(long input, long transitions, int tzIndex); private static native long convertUTCTimestampColumnToTimeZone(long input, long transitions, int tzIndex); + + private static native long timeAddCS(long input, long duration, long transitions, int tzIndex); + + private static native long timeAddCC(long input, long duration, long transitions, int tzIndex); } diff --git a/src/test/java/com/nvidia/spark/rapids/jni/TimeZoneTest.java b/src/test/java/com/nvidia/spark/rapids/jni/TimeZoneTest.java index 7aaec496de..524dfbf664 100644 --- a/src/test/java/com/nvidia/spark/rapids/jni/TimeZoneTest.java +++ b/src/test/java/com/nvidia/spark/rapids/jni/TimeZoneTest.java @@ -1,5 +1,5 @@ /* -* Copyright (c) 2023, NVIDIA CORPORATION. +* Copyright (c) 2023-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,6 +24,8 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import ai.rapids.cudf.ColumnVector; +import ai.rapids.cudf.DType; +import ai.rapids.cudf.Scalar; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; @@ -51,7 +53,7 @@ void databaseLoadedTest() { transitions = instance.getHostFixedTransitions("Asia/Shanghai"); assertNotNull(transitions); ZoneId shanghai = ZoneId.of("Asia/Shanghai").normalized(); - assertEquals(shanghai.getRules().getTransitions().size() + 1, transitions.size()); + assertEquals(shanghai.getRules().getTransitions().size() + 2, transitions.size()); } @Test @@ -227,5 +229,76 @@ void convertFromUtcMicroSecondsTest() { assertColumnsAreEqual(expected, actual); } } + + @Test + void timeAddCCTest() { + // Some edge cases related to overlap transitions + try (ColumnVector input = ColumnVector.timestampMicroSecondsFromBoxedLongs( + -57954592249912415L, + -2177453143500000L, + -43013395848980300L, + -2177485200000000L, + -2177481695679933L, + -2177481944610644L, + 0L, + -2177481944610644L, + -2177481944610644L, + -44623990055559136L, + -44623987543559136L); + ColumnVector duration = ColumnVector.durationMicroSecondsFromBoxedLongs( + 56087020233685111L, + 1000000L, + 173001810506226873L, + 1000000L, + 1000000L, + 1000000L, + 173001810506226873L, + 86399999999L, + 86400000000L, + 43890193694846543L, + 43890193694846543L + ); + ColumnVector expected = ColumnVector.timestampMicroSecondsFromBoxedLongs( + -1867571673227304L, + -2177453142500000L, + 129988415000246573L, + -2177485199000000L, + -2177481694679933L, + -2177481943610644L, + 173001810506226873L, + -2177395544610645L, + -2177395201610644L, + -733799617712593L, + -733797105712593L); + ColumnVector actual = GpuTimeZoneDB.timeAdd(input, duration, + ZoneId.of("Asia/Shanghai"))) { + assertColumnsAreEqual(expected, actual); + } + } + + @Test + void timeAddCSTest() { + try (ColumnVector input = ColumnVector.timestampMicroSecondsFromBoxedLongs( + -57954592249912415L, + -2177453143500000L, + -43013395848980300L, + -2177485200000000L, + -2177481695679933L, + -2177481944610644L, + 0L); + Scalar duration = Scalar.durationFromLong(DType.DURATION_MICROSECONDS, 1800000000L); + ColumnVector expected = ColumnVector.timestampMicroSecondsFromBoxedLongs( + -57954590449912415L, + -2177451343500000L, + -43013394048980300L, + -2177483400000000L, + -2177479895679933L, + -2177480144610644L, + 1800000000L); + ColumnVector actual = GpuTimeZoneDB.timeAdd(input, duration, + ZoneId.of("Asia/Shanghai"))) { + assertColumnsAreEqual(expected, actual); + } + } }