Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding timeAdd with timezone kernel #1700

Closed
wants to merge 18 commits into from
Closed
48 changes: 47 additions & 1 deletion src/main/cpp/src/GpuTimeZoneDBJni.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/* Copyright (c) 2023, NVIDIA CORPORATION.
/* Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -50,4 +50,50 @@ Java_com_nvidia_spark_rapids_jni_GpuTimeZoneDB_convertUTCTimestampColumnToTimeZo
}
CATCH_STD(env, 0);
}

JNIEXPORT jlong JNICALL
res-life marked this conversation as resolved.
Show resolved Hide resolved
Java_com_nvidia_spark_rapids_jni_GpuTimeZoneDB_timeAddCS(JNIEnv* env,
jclass,
jlong input_handle,
jlong duration_handle,
jlong transitions_handle,
jint tz_index)
{
JNI_NULL_CHECK(env, input_handle, "column is null", 0);
JNI_NULL_CHECK(env, duration_handle, "column is null", 0);
JNI_NULL_CHECK(env, transitions_handle, "column is null", 0);
try {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need a null check on the duration handle too.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

cudf::jni::auto_set_device(env);
auto const input = reinterpret_cast<cudf::column_view const*>(input_handle);
auto const duration = reinterpret_cast<cudf::scalar*>(duration_handle);
auto const transitions = reinterpret_cast<cudf::table_view const*>(transitions_handle);
auto const index = static_cast<cudf::size_type>(tz_index);
return cudf::jni::ptr_as_jlong(
spark_rapids_jni::time_add(*input, *duration, *transitions, index).release());
}
CATCH_STD(env, 0);
}

JNIEXPORT jlong JNICALL
Java_com_nvidia_spark_rapids_jni_GpuTimeZoneDB_timeAddCC(JNIEnv* env,
jclass,
jlong input_handle,
jlong duration_handle,
jlong transitions_handle,
jint tz_index)
{
JNI_NULL_CHECK(env, input_handle, "column is null", 0);
JNI_NULL_CHECK(env, duration_handle, "column is null", 0);
JNI_NULL_CHECK(env, transitions_handle, "column is null", 0);
try {
cudf::jni::auto_set_device(env);
auto const input = reinterpret_cast<cudf::column_view const*>(input_handle);
auto const duration = reinterpret_cast<cudf::column_view const*>(duration_handle);
auto const transitions = reinterpret_cast<cudf::table_view const*>(transitions_handle);
auto const index = static_cast<cudf::size_type>(tz_index);
return cudf::jni::ptr_as_jlong(
spark_rapids_jni::time_add(*input, *duration, *transitions, index).release());
}
CATCH_STD(env, 0);
}
}
255 changes: 253 additions & 2 deletions src/main/cpp/src/timezones.cu
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -22,6 +22,7 @@
#include <cudf/detail/null_mask.hpp>
#include <cudf/lists/list_device_view.cuh>
#include <cudf/lists/lists_column_device_view.cuh>
#include <cudf/scalar/scalar.hpp>
#include <cudf/table/table.hpp>
#include <cudf/types.hpp>

Expand Down Expand Up @@ -79,7 +80,7 @@ struct convert_timestamp_tz_functor {
(to_utc ? tz_instants : utc_instants).data<int64_t>() + tz_transitions.element_offset(0),
static_cast<size_t>(list_size));

auto const it = thrust::upper_bound(
auto const it = thrust::lower_bound(
thrust::seq, transition_times.begin(), transition_times.end(), epoch_seconds);
auto const idx = static_cast<size_type>(thrust::distance(transition_times.begin(), it));
auto const list_offset = tz_transitions.element_offset(idx - 1);
Expand Down Expand Up @@ -118,6 +119,222 @@ auto convert_timestamp_tz(column_view const& input,
return results;
}

struct time_add_functor {
using duration_type = typename cudf::timestamp_us::duration;

lists_column_device_view const transitions;

size_type const tz_index;

cudf::duration_us const duration_scalar;

__device__ inline int64_t get_utc_offset_seconds(
cudf::timestamp_us const& timestamp,
cudf::device_span<const int64_t> const& transition_times_utc,
cudf::device_span<const int64_t> const& transition_times_tz,
cudf::device_span<const int32_t> const& transition_offsets) const
{
auto const epoch_seconds_utc = static_cast<int64_t>(
revans2 marked this conversation as resolved.
Show resolved Hide resolved
cuda::std::chrono::duration_cast<cudf::duration_s>(timestamp.time_since_epoch()).count());
// Binary search on utc to find the correct offset to convert utc to local
auto const utc_it = thrust::upper_bound(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we do this some place else? Shouldn't transitioning between time zones like this be a static inline function? And if not can you add comments as to why this is different from GpuTimeZoneDB.fromUtcTimestampToTimestamp?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The difference between this and convert_timestamp_tz_functor is that we want to keep the to_local_offset. I updated the comment.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still think there is a enough code to make it common. If we take the time to make the proper abstractions on the CUDA side then adding DST support should have code changes restricted primarily to one common place. But I will leave that up to you and the rest of the people working on timezone support.

thrust::seq, transition_times_utc.begin(), transition_times_utc.end(), epoch_seconds_utc);
auto const utc_idx =
static_cast<size_type>(thrust::distance(transition_times_utc.begin(), utc_it));
auto const to_local_offset = transition_offsets[utc_idx - 1];
return to_local_offset;
}

__device__ inline cudf::timestamp_us convert_ts_to_utc_tz_overlap_prefers_original(
cudf::timestamp_us const& timestamp,
int64_t const prefered_offset,
cudf::device_span<const int64_t> const& transition_times_utc,
cudf::device_span<const int64_t> const& transition_times_tz,
cudf::device_span<const int32_t> const& transition_offsets) const
{
auto const result_epoch_seconds = static_cast<int64_t>(
cuda::std::chrono::duration_cast<cudf::duration_s>(timestamp.time_since_epoch()).count());

// Binary search on local to find the correct offset to convert local to utc
auto const local_it = thrust::lower_bound(
thrust::seq, transition_times_tz.begin(), transition_times_tz.end(), result_epoch_seconds);
auto local_idx =
static_cast<size_type>(thrust::distance(transition_times_tz.begin(), local_it));

// In GpuTimeZoneDB, we build the transition list with
// (utcInstant, utcInstant + OffsetBefore, OffsetAfter) if it is a overlap.
// But we actually need to binary search on the utcInstant + OffsetBefore here
// to find the correct offset to convert local to utc. To reuse the data, we need to
// add a special post processing here, to make sure we get the correct id that
// utcInstant + OffsetBefore is larger than the result_epoch_seconds.
auto const temp_offset = transition_offsets[local_idx];

// We don't want to check this if the idx is the last because they are just endpoints
bool in_overlap = false;
if (transition_times_utc[local_idx] != std::numeric_limits<int64_t>::max() &&
transition_times_utc[local_idx] + temp_offset <= result_epoch_seconds) {
local_idx += 1;
// This if only happens when the result is in the overlap
in_overlap = true;
}
auto to_utc_offset = transition_offsets[local_idx - 1];
auto const upper_bound_epoch = transition_times_tz[local_idx - 1];
auto const upper_bound_utc = transition_times_utc[local_idx - 1];

// If the result is in the overlap, try to select the original offset if possible
auto const early_offset = static_cast<int64_t>(upper_bound_epoch - upper_bound_utc);
bool const is_gap = (upper_bound_utc + to_utc_offset == upper_bound_epoch);
if (!is_gap && upper_bound_utc != std::numeric_limits<int64_t>::min() &&
upper_bound_utc != std::numeric_limits<int64_t>::max()) { // overlap
if (in_overlap) {
// By default, to_utc_offset is the offsetAfter, so unless to_utc_offset is equal to
// early_offset, we need to use the original offsetBefore as the default offset.
if (to_utc_offset != prefered_offset) { to_utc_offset = early_offset; }
}
}

auto const to_utc_offset_duration = cuda::std::chrono::duration_cast<duration_type>(
cudf::duration_s{static_cast<int64_t>(to_utc_offset)});

// subtract the offset to convert local to utc
return timestamp - to_utc_offset_duration;
}

__device__ inline cudf::timestamp_us plus_with_tz(cudf::timestamp_us const& timestamp,
cudf::duration_us const& duration) const
{
if (duration == cudf::duration_us{0}) { return timestamp; }

auto const utc_instants = transitions.child().child(0);
auto const tz_instants = transitions.child().child(1);
auto const utc_offsets = transitions.child().child(2);

auto const tz_transitions = cudf::list_device_view{transitions, tz_index};
auto const list_size = tz_transitions.size();

auto const transition_times_utc = cudf::device_span<int64_t const>(
utc_instants.data<int64_t>() + tz_transitions.element_offset(0),
static_cast<size_t>(list_size));

auto const transition_times_tz = cudf::device_span<int64_t const>(
tz_instants.data<int64_t>() + tz_transitions.element_offset(0),
static_cast<size_t>(list_size));

auto const transition_offsets = cudf::device_span<int32_t const>(
utc_offsets.data<int32_t>() + tz_transitions.element_offset(0),
static_cast<size_t>(list_size));

// In Spark, timeAdd will add the days of the duration to the timestamp first with `plusDays`,
// resolve the offset if the result is in the overlap, and then add the remainder microseconds
// to the results with `plus`, then resolve again. In java, the `plusDays` adds the days to the
// localDateTime in day's field, so we need take transitions into account. While the `plus` step
// adds the remainder microseconds to the localDateTime in microsecond's field, so it is
// equivalent to add the remainder to the utc result without resolving the offset.
auto const duration_value =
static_cast<int64_t>(cuda::std::chrono::duration_cast<cudf::duration_us>(duration).count());
auto const microseconds_per_day = 86400000000ll;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have we tested with more microseconds than in a day, along with a days value? I know that this is an odd corner case, but if we have to apply days separately from microseconds wouldn't this result in the same kind of errors if we assume that we can always convert microseconds_per_day into days? It looks like for DayTimeIntervalType microsecond values outside of a day are not technically allowed, so we are probably okay there, but CalendarIntervalType does not indicate any limitations like that, and reading the code there are none.

We might be able to work around this by restricting the usage of CalendarInterval, but I'd prefer not to need to do that.

Copy link
Collaborator Author

@thirtiseven thirtiseven Jan 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Following test can pass with this pr:

@pytest.mark.parametrize('data_gen', [(0, 86400000001), (1, 86400000001), (1, 86400000000*100)], ids=idfn)
@allow_non_gpu(*non_supported_tz_allow)
def test_timeadd_debug(data_gen):
    days, microseconds = data_gen
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark: unary_op_df(spark, TimestampGen(), length=200000)
            .selectExpr("a + (interval {} days {} microseconds)".format(days, microseconds)))

It's an interesting case I hadn't thought of, I'll check the code to see if there is some validation in spark/pyspark to make this test pass.

If it is possible to pass an invalid CalendarInterval, we can have a simple workaround in plugin side to pass only the days to this kernel and then add the microseconds directly to the result.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, this case failed on 311. Spark seems to automatically use DayTimeIntervalType for interval literals in higher versions. Will match this behavior in plugin PR.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Matched this in plugin datetimeExpressionsUtils.scala

auto const duration_days = (duration_value / microseconds_per_day) * microseconds_per_day;
auto const duration_remainder = duration_value - duration_days;

// It will be the preferred offset when convert the result back to utc after adding the days
auto const to_local_offset = get_utc_offset_seconds(
timestamp, transition_times_utc, transition_times_tz, transition_offsets);

auto const to_local_offset_duration = cuda::std::chrono::duration_cast<duration_type>(
cudf::duration_s{static_cast<int64_t>(to_local_offset)});

// Add the duration days to the local timestamp
auto const local_timestamp_res =
timestamp + to_local_offset_duration + cudf::duration_us{duration_days};

auto const result_epoch_seconds = static_cast<int64_t>(
cuda::std::chrono::duration_cast<cudf::duration_s>(local_timestamp_res.time_since_epoch())
.count());

auto const res_utc_with_days =
convert_ts_to_utc_tz_overlap_prefers_original(local_timestamp_res,
to_local_offset,
transition_times_utc,
transition_times_tz,
transition_offsets);

// Add the remainder duration to the result
return res_utc_with_days + cudf::duration_us{duration_remainder};
}

__device__ cudf::timestamp_us operator()(cudf::timestamp_us const& timestamp) const
{
return plus_with_tz(timestamp, duration_scalar);
}

__device__ cudf::timestamp_us operator()(cudf::timestamp_us const& timestamp,
cudf::duration_us const& interval) const
{
return plus_with_tz(timestamp, interval);
}
};

auto time_add_with_tz(column_view const& input,
cudf::duration_scalar<cudf::duration_us> const& duration,
table_view const& transitions,
size_type tz_index,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
// get the fixed transitions
auto const ft_cdv_ptr = column_device_view::create(transitions.column(0), stream);
auto const fixed_transitions = lists_column_device_view{*ft_cdv_ptr};

if (!duration.is_valid()) {
// return a column of nulls
auto results = cudf::make_timestamp_column(
input.type(), input.size(), cudf::mask_state::ALL_NULL, stream, mr);
return results;
}

auto results = cudf::make_timestamp_column(input.type(),
input.size(),
cudf::detail::copy_bitmask(input, stream, mr),
input.null_count(),
stream,
mr);

thrust::transform(rmm::exec_policy(stream),
input.begin<cudf::timestamp_us>(),
input.end<cudf::timestamp_us>(),
results->mutable_view().begin<cudf::timestamp_us>(),
time_add_functor{fixed_transitions, tz_index, duration.value()});

return results;
}

auto time_add_with_tz(column_view const& input,
column_view const& duration,
table_view const& transitions,
size_type tz_index,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
// get the fixed transitions
auto const ft_cdv_ptr = column_device_view::create(transitions.column(0), stream);
auto const fixed_transitions = lists_column_device_view{*ft_cdv_ptr};

auto [null_mask, null_count] =
cudf::detail::bitmask_and(cudf::table_view{{input, duration}}, stream, mr);

auto results = cudf::make_timestamp_column(
input.type(), input.size(), rmm::device_buffer(null_mask, stream), null_count, stream, mr);

thrust::transform(rmm::exec_policy(stream),
input.begin<cudf::timestamp_us>(),
input.end<cudf::timestamp_us>(),
duration.begin<cudf::duration_us>(),
results->mutable_view().begin<cudf::timestamp_us>(),
time_add_functor{fixed_transitions, tz_index, cudf::duration_us{0}});

return results;
}

} // namespace

namespace spark_rapids_jni {
Expand Down Expand Up @@ -163,4 +380,38 @@ std::unique_ptr<column> convert_utc_timestamp_to_timezone(column_view const& inp
return convert_timestamp(input, transitions, tz_index, false, stream, mr);
}

std::unique_ptr<column> time_add(column_view const& input,
cudf::scalar const& duration,
table_view const& transitions,
size_type tz_index,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
if (input.type().id() != cudf::type_id::TIMESTAMP_MICROSECONDS) {
CUDF_FAIL("Unsupported timestamp unit for time add with timezone");
}
if (duration.type().id() != cudf::type_id::DURATION_MICROSECONDS) {
CUDF_FAIL("Unsupported duration unit for time add with timezone");
}
auto const duration_scalar =
dynamic_cast<cudf::duration_scalar<cudf::duration_us> const&>(duration);
return time_add_with_tz(input, duration_scalar, transitions, tz_index, stream, mr);
}

std::unique_ptr<column> time_add(column_view const& input,
column_view const& duration,
revans2 marked this conversation as resolved.
Show resolved Hide resolved
table_view const& transitions,
size_type tz_index,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
if (input.type().id() != cudf::type_id::TIMESTAMP_MICROSECONDS) {
CUDF_FAIL("Unsupported timestamp unit for time add with timezone");
}
if (duration.type().id() != cudf::type_id::DURATION_MICROSECONDS) {
CUDF_FAIL("Unsupported duration unit for time add with timezone");
}
return time_add_with_tz(input, duration, transitions, tz_index, stream, mr);
}

} // namespace spark_rapids_jni
33 changes: 31 additions & 2 deletions src/main/cpp/src/timezones.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -66,4 +66,33 @@ std::unique_ptr<cudf::column> convert_utc_timestamp_to_timezone(
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

} // namespace spark_rapids_jni
/**
* @brief Add intervals to a column of timestamps.
*
* The transition rules are in enclosed in a table, and the index corresponding to the
* specific timezone is given.
*
* @param input the column of input timestamps in UTC
* @param intervals the column of intervals to add to the input timestamps
* @param transitions the table of transitions for all timezones
* @param tz_index the index of the row in `transitions` corresponding to the specific timezone
* @param stream CUDA stream used for device memory operations and kernel launches.
* @param mr Device memory resource used to allocate the returned timestamp column's memory
*/
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: duration parameter has 2 types: numeric_scalar and column_view.
Extract a template for duration parameter to combine the two time_add functions to one function.

std::unique_ptr<cudf::column> time_add(
cudf::column_view const& input,
cudf::scalar const& duration,
cudf::table_view const& transitions,
cudf::size_type tz_index,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

std::unique_ptr<cudf::column> time_add(
cudf::column_view const& input,
cudf::column_view const& intervals,
cudf::table_view const& transitions,
cudf::size_type tz_index,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

} // namespace spark_rapids_jni
2 changes: 1 addition & 1 deletion src/main/cpp/tests/timezones.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
Loading
Loading